Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a way to fetch transactions in P2P without specifying a peer #2376

Merged
merged 8 commits into from
Oct 31, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Added
- [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`).
- [2376](https://github.com/FuelLabs/fuel-core/pull/2376): Add a way to fetch transactions in P2P without specifying a peer.

## [Version 0.40.0]

Expand Down
45 changes: 43 additions & 2 deletions crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ pub enum TaskRequest {
channel: OnResponse<Option<Vec<SealedBlockHeader>>>,
},
GetTransactions {
block_height_range: Range<u32>,
channel: OnResponse<Option<Vec<Transactions>>>,
AurelienFT marked this conversation as resolved.
Show resolved Hide resolved
},
GetTransactionsFromPeer {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove this variant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's useful in case of syncing when you already received the header from someone in the current syncing process and so you can directly ask him without having to redo peer selection. Maybe there is also othhers use cases but this is the only one I see.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that after your PR we don't need this variant anymore. But we can address it in your PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mhhh yeah let's discuss it in the other PR because IMO what I have written above still works in certain cases.

block_height_range: Range<u32>,
from_peer: PeerId,
channel: OnResponse<Option<Vec<Transactions>>>,
Expand Down Expand Up @@ -165,6 +169,9 @@ impl Debug for TaskRequest {
TaskRequest::GetTransactions { .. } => {
write!(f, "TaskRequest::GetTransactions")
}
TaskRequest::GetTransactionsFromPeer { .. } => {
write!(f, "TaskRequest::GetTransactionsFromPeer")
}
TaskRequest::TxPoolGetAllTxIds { .. } => {
write!(f, "TaskRequest::TxPoolGetAllTxIds")
}
Expand Down Expand Up @@ -856,7 +863,16 @@ where
tracing::warn!("No peers found for block at height {:?}", height);
}
}
Some(TaskRequest::GetTransactions { block_height_range, from_peer, channel }) => {
Some(TaskRequest::GetTransactions {block_height_range, channel }) => {
let channel = ResponseSender::Transactions(channel);
let request_msg = RequestMessage::Transactions(block_height_range.clone());
let height = BlockHeight::from(block_height_range.end.saturating_sub(1));
let peer = self.p2p_service.get_peer_id_with_height(&height);
if self.p2p_service.send_request_msg(peer, request_msg, channel).is_err() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should return error to the channel if peer is None, to allow proper handling of it by the caller.

Copy link
Contributor Author

@AurelienFT AurelienFT Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can change it, ok. However, I want to ask if we want the same in GetSealedHeaders (just above in the code) ? It has the same behavior and doesn't return the error for now.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, good catch, let's fix it as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. It complexify a bit the type in the channel because we have a two-stage error type but with unification of return type etc that we want to do in p2p maybe we could improve this in the future.

return Err(anyhow!("No peers found for block at height {:?}", height));
}
}
Some(TaskRequest::GetTransactionsFromPeer { block_height_range, from_peer, channel }) => {
let channel = ResponseSender::Transactions(channel);
let request_msg = RequestMessage::Transactions(block_height_range);
self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always a peer here, so send has a target");
Expand Down Expand Up @@ -1020,6 +1036,31 @@ impl SharedState {
Ok((peer_id.to_bytes(), data))
}

pub async fn get_transactions(
&self,
range: Range<u32>,
) -> anyhow::Result<(Vec<u8>, Option<Vec<Transactions>>)> {
let (sender, receiver) = oneshot::channel();

if range.is_empty() {
return Err(anyhow!(
"Cannot retrieve transactions for an empty range of block heights"
));
}

self.request_sender
.send(TaskRequest::GetTransactions {
block_height_range: range,
channel: sender,
})
.await?;
Comment on lines +1087 to +1092
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps we should continue to use the TaskRequest::GetTransactionsFromPeer and perform peer selection for a given block height instead -

fn get_peer_id_with_height(&self, height: &BlockHeight) -> Option<PeerId> {
self.peer_manager().get_peer_id_with_height(height)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally think that there is still use case where it's interesting to already give a peer that we know have the information, we use this un sync service.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, i agree ~ but this information is also valid for a peer returned by get_peer_id_with_height. perhaps we should not duplicate valid peers for a given height then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get your comment. My flow is that I ask for an header and so the peer is returned along with the header then I can directly ask to this peer about the transactions because in our network if you have the header we assume you have the transactions. It avoid running this get_peer_id_with_height function that can be costly.
Maybe I miss your point and in this case we can go over more voice discussion :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps we should continue to use the TaskRequest::GetTransactionsFromPeer and perform peer selection for a given block height instead

If we do peer selection outside of the run loop, we can face race condition were peer is disconnected already


let (peer_id, response) = receiver.await.map_err(|e| anyhow!("{e}"))?;

let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?;
Ok((peer_id.to_bytes(), data))
}

pub async fn get_transactions_from_peer(
&self,
peer_id: FuelPeerId,
Expand All @@ -1028,7 +1069,7 @@ impl SharedState {
let (sender, receiver) = oneshot::channel();
let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId");

let request = TaskRequest::GetTransactions {
let request = TaskRequest::GetTransactionsFromPeer {
block_height_range: range,
from_peer,
channel: sender,
Expand Down
Loading