diff --git a/CHANGELOG.md b/CHANGELOG.md index a6ed7c7bb58..3060675e042 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2347](https://github.com/FuelLabs/fuel-core/pull/2364): Add activity concept in order to protect against infinitely increasing DA gas price scenarios - [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future. - [2386](https://github.com/FuelLabs/fuel-core/pull/2386): Add a flag to define the maximum number of file descriptors that RocksDB can use. By default it's half of the OS limit. +- [2376](https://github.com/FuelLabs/fuel-core/pull/2376): Add a way to fetch transactions in P2P without specifying a peer. ## [Version 0.40.0] diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 05dd2ec1b38..47d189ae85f 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -677,17 +677,31 @@ impl FuelP2PService { V2ResponseMessage::SealedHeaders(v) => { // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 // Change type of ResponseSender and remove the .ok() here - c.send((peer, Ok(v.ok()))).is_ok() + c.send(Ok((peer, Ok(v.ok())))).is_ok() } _ => { warn!( "Invalid response type received for request {:?}", request_id ); - c.send((peer, Err(ResponseError::TypeMismatch))).is_ok() + c.send(Ok((peer, Err(ResponseError::TypeMismatch)))) + .is_ok() } }, ResponseSender::Transactions(c) => match response { + V2ResponseMessage::Transactions(v) => { + c.send(Ok((peer, Ok(v.ok())))).is_ok() + } + _ => { + warn!( + "Invalid response type received for request {:?}", + request_id + ); + c.send(Ok((peer, Err(ResponseError::TypeMismatch)))) + .is_ok() + } + }, + ResponseSender::TransactionsFromPeer(c) => match response { V2ResponseMessage::Transactions(v) => { c.send((peer, Ok(v.ok()))).is_ok() } @@ -750,9 +764,12 @@ impl FuelP2PService { if let Some(channel) = self.outbound_requests_table.remove(&request_id) { match channel { ResponseSender::SealedHeaders(c) => { - let _ = c.send((peer, Err(ResponseError::P2P(error)))); + let _ = c.send(Ok((peer, Err(ResponseError::P2P(error))))); } ResponseSender::Transactions(c) => { + let _ = c.send(Ok((peer, Err(ResponseError::P2P(error))))); + } + ResponseSender::TransactionsFromPeer(c) => { let _ = c.send((peer, Err(ResponseError::P2P(error)))); } ResponseSender::TxPoolAllTransactionsIds(c) => { @@ -1700,9 +1717,25 @@ mod tests { let expected = arbitrary_headers_for_range(range.clone()); - if let Ok((_, Ok(sealed_headers))) = response_message { - let check = expected.iter().zip(sealed_headers.unwrap().iter()).all(|(a, b)| eq_except_metadata(a, b)); - let _ = tx_test_end.send(check).await; + if let Ok(response) = response_message { + match response { + Ok((_, Ok(Some(sealed_headers)))) => { + let check = expected.iter().zip(sealed_headers.iter()).all(|(a, b)| eq_except_metadata(a, b)); + let _ = tx_test_end.send(check).await; + }, + Ok((_, Ok(None))) => { + tracing::error!("Node A did not return any headers"); + let _ = tx_test_end.send(false).await; + }, + Ok((_, Err(e))) => { + tracing::error!("Error in P2P communication: {:?}", e); + let _ = tx_test_end.send(false).await; + }, + Err(e) => { + tracing::error!("Error in P2P before sending message: {:?}", e); + let _ = tx_test_end.send(false).await; + }, + } } else { tracing::error!("Orchestrator failed to receive a message: {:?}", response_message); let _ = tx_test_end.send(false).await; @@ -1717,9 +1750,25 @@ mod tests { tokio::spawn(async move { let response_message = rx_orchestrator.await; - if let Ok((_, Ok(Some(transactions)))) = response_message { - let check = transactions.len() == 1 && transactions[0].0.len() == 5; - let _ = tx_test_end.send(check).await; + if let Ok(response) = response_message { + match response { + Ok((_, Ok(Some(transactions)))) => { + let check = transactions.len() == 1 && transactions[0].0.len() == 5; + let _ = tx_test_end.send(check).await; + }, + Ok((_, Ok(None))) => { + tracing::error!("Node A did not return any transactions"); + let _ = tx_test_end.send(false).await; + }, + Ok((_, Err(e))) => { + tracing::error!("Error in P2P communication: {:?}", e); + let _ = tx_test_end.send(false).await; + }, + Err(e) => { + tracing::error!("Error in P2P before sending message: {:?}", e); + let _ = tx_test_end.send(false).await; + }, + } } else { tracing::error!("Orchestrator failed to receive a message: {:?}", response_message); let _ = tx_test_end.send(false).await; @@ -1878,23 +1927,28 @@ mod tests { tokio::spawn(async move { let response_message = rx_orchestrator.await; - match response_message { - Ok((_, Ok(_))) => { - let _ = tx_test_end.send(false).await; - panic!("Request succeeded unexpectedly"); - }, - Ok((_, Err(ResponseError::TypeMismatch))) => { - // Got Invalid Response Type as expected, so end test - let _ = tx_test_end.send(true).await; - }, - Ok((_, Err(err))) => { - let _ = tx_test_end.send(false).await; - panic!("Unexpected error: {:?}", err); - }, - Err(_) => { - let _ = tx_test_end.send(false).await; - panic!("Channel closed unexpectedly"); - }, + if let Ok(response) = response_message { + match response { + Ok((_, Ok(_))) => { + let _ = tx_test_end.send(false).await; + panic!("Request succeeded unexpectedly"); + }, + Ok((_, Err(ResponseError::TypeMismatch))) => { + // Got Invalid Response Type as expected, so end test + let _ = tx_test_end.send(true).await; + }, + Ok((_, Err(err))) => { + let _ = tx_test_end.send(false).await; + panic!("Unexpected error in P2P communication: {:?}", err); + }, + Err(e) => { + let _ = tx_test_end.send(false).await; + panic!("Error in P2P before sending message: {:?}", e); + }, + } + } else { + let _ = tx_test_end.send(false).await; + panic!("Orchestrator failed to receive a message: {:?}", response_message); } }); } @@ -1964,21 +2018,29 @@ mod tests { tokio::spawn(async move { // 3. Simulating NetworkOrchestrator receiving a Timeout Error Message! - match rx_orchestrator.await { - Ok((_, Ok(_))) => { - let _ = tx_test_end.send(false).await; - panic!("Request succeeded unexpectedly")}, - Ok((_, Err(ResponseError::P2P(_)))) => { - // Got timeout as expected, so end test - let _ = tx_test_end.send(true).await; - }, - Ok((_, Err(err))) => { - let _ = tx_test_end.send(false).await; - panic!("Unexpected error: {:?}", err); - }, - Err(e) => { - let _ = tx_test_end.send(false).await; - panic!("Channel closed unexpectedly: {:?}", e)}, + let response_message = rx_orchestrator.await; + if let Ok(response) = response_message { + match response { + Ok((_, Ok(_))) => { + let _ = tx_test_end.send(false).await; + panic!("Request succeeded unexpectedly"); + }, + Ok((_, Err(ResponseError::P2P(_)))) => { + // Got Invalid Response Type as expected, so end test + let _ = tx_test_end.send(true).await; + }, + Ok((_, Err(err))) => { + let _ = tx_test_end.send(false).await; + panic!("Unexpected error in P2P communication: {:?}", err); + }, + Err(e) => { + let _ = tx_test_end.send(false).await; + panic!("Error in P2P before sending message: {:?}", e); + }, + } + } else { + let _ = tx_test_end.send(false).await; + panic!("Orchestrator failed to receive a message: {:?}", response_message); } }); } diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 2a0e03ba2cd..f1c3b176f4b 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -18,6 +18,8 @@ use std::ops::Range; use thiserror::Error; use tokio::sync::oneshot; +use crate::service::TaskError; + pub(crate) const V1_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; pub(crate) const V2_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.2"; @@ -104,11 +106,16 @@ impl From for V1ResponseMessage { } pub type OnResponse = oneshot::Sender<(PeerId, Result)>; +// This type is more complex because it's used in tasks that need to select a peer to send the request and this +// can cause errors where the peer is not defined. +pub type OnResponseWithPeerSelection = + oneshot::Sender), TaskError>>; #[derive(Debug)] pub enum ResponseSender { - SealedHeaders(OnResponse>>), - Transactions(OnResponse>>), + SealedHeaders(OnResponseWithPeerSelection>>), + Transactions(OnResponseWithPeerSelection>>), + TransactionsFromPeer(OnResponse>>), TxPoolAllTransactionsIds(OnResponse>>), TxPoolFullTransactions(OnResponse>>>), } diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index c85e1e3a6c8..30c5dd9310a 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -21,6 +21,7 @@ use crate::{ }, request_response::messages::{ OnResponse, + OnResponseWithPeerSelection, RequestMessage, ResponseMessageErrorCode, ResponseSender, @@ -84,6 +85,7 @@ use std::{ ops::Range, sync::Arc, }; +use thiserror::Error; use tokio::{ sync::{ broadcast, @@ -104,6 +106,12 @@ const CHANNEL_SIZE: usize = 1024 * 10; pub type Service = ServiceRunner>; +#[derive(Debug, Error)] +pub enum TaskError { + #[error("No peer found to send request to")] + NoPeerFound, +} + pub enum TaskRequest { // Broadcast requests to p2p network BroadcastTransaction(Arc), @@ -113,9 +121,13 @@ pub enum TaskRequest { }, GetSealedHeaders { block_height_range: Range, - channel: OnResponse>>, + channel: OnResponseWithPeerSelection>>, }, GetTransactions { + block_height_range: Range, + channel: OnResponseWithPeerSelection>>, + }, + GetTransactionsFromPeer { block_height_range: Range, from_peer: PeerId, channel: OnResponse>>, @@ -167,6 +179,9 @@ impl Debug for TaskRequest { TaskRequest::GetTransactions { .. } => { write!(f, "TaskRequest::GetTransactions") } + TaskRequest::GetTransactionsFromPeer { .. } => { + write!(f, "TaskRequest::GetTransactionsFromPeer") + } TaskRequest::TxPoolGetAllTxIds { .. } => { write!(f, "TaskRequest::TxPoolGetAllTxIds") } @@ -869,19 +884,29 @@ where } } Some(TaskRequest::GetSealedHeaders { block_height_range, channel}) => { - let channel = ResponseSender::SealedHeaders(channel); - let request_msg = RequestMessage::SealedHeaders(block_height_range.clone()); - // Note: this range has already been checked for // validity in `SharedState::get_sealed_block_headers`. let height = BlockHeight::from(block_height_range.end.saturating_sub(1)); - let peer = self.p2p_service.get_peer_id_with_height(&height); - if self.p2p_service.send_request_msg(peer, request_msg, channel).is_err() { - tracing::warn!("No peers found for block at height {:?}", height); - } + let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else { + let _ = channel.send(Err(TaskError::NoPeerFound)); + return Ok(should_continue); + }; + let channel = ResponseSender::SealedHeaders(channel); + let request_msg = RequestMessage::SealedHeaders(block_height_range.clone()); + self.p2p_service.send_request_msg(Some(peer), request_msg, channel).expect("We always have a peer here, so send has a target"); } - Some(TaskRequest::GetTransactions { block_height_range, from_peer, channel }) => { + Some(TaskRequest::GetTransactions {block_height_range, channel }) => { + let height = BlockHeight::from(block_height_range.end.saturating_sub(1)); + let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else { + let _ = channel.send(Err(TaskError::NoPeerFound)); + return Ok(should_continue); + }; let channel = ResponseSender::Transactions(channel); + let request_msg = RequestMessage::Transactions(block_height_range.clone()); + self.p2p_service.send_request_msg(Some(peer), request_msg, channel).expect("We always have a peer here, so send has a target"); + } + Some(TaskRequest::GetTransactionsFromPeer { block_height_range, from_peer, channel }) => { + let channel = ResponseSender::TransactionsFromPeer(channel); let request_msg = RequestMessage::Transactions(block_height_range); self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always a peer here, so send has a target"); } @@ -1038,7 +1063,38 @@ impl SharedState { }) .await?; - let (peer_id, response) = receiver.await.map_err(|e| anyhow!("{e}"))?; + let (peer_id, response) = receiver + .await + .map_err(|e| anyhow!("{e}"))? + .map_err(|e| anyhow!("{e}"))?; + + let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?; + Ok((peer_id.to_bytes(), data)) + } + + pub async fn get_transactions( + &self, + range: Range, + ) -> anyhow::Result<(Vec, Option>)> { + let (sender, receiver) = oneshot::channel(); + + if range.is_empty() { + return Err(anyhow!( + "Cannot retrieve transactions for an empty range of block heights" + )); + } + + self.request_sender + .send(TaskRequest::GetTransactions { + block_height_range: range, + channel: sender, + }) + .await?; + + let (peer_id, response) = receiver + .await + .map_err(|e| anyhow!("{e}"))? + .map_err(|e| anyhow!("{e}"))?; let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?; Ok((peer_id.to_bytes(), data)) @@ -1052,7 +1108,7 @@ impl SharedState { let (sender, receiver) = oneshot::channel(); let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId"); - let request = TaskRequest::GetTransactions { + let request = TaskRequest::GetTransactionsFromPeer { block_height_range: range, from_peer, channel: sender,