Keep data in failure cases in sync service #2361
base: master
Conversation
I don't understand the import task well enough to approve right now. I need clarification on the following points:
- How do we ensure this cache doesn't grow forever? Is the Import task short-lived? While the import task launches short-lived streams, it seems like a long-living task to me.
- How can we be sure we'll query exactly the same ranges as we have cached? Where is that invariant maintained?

Let me know if you want to jump on a call to chat about this, or just write back if I'm missing something obvious here.
header_stream
    let ranges = range_chunks(range, params.header_batch_size);
    futures::stream::iter(ranges)
        .map({
While the pattern was established before this PR, I think it would be nice to use then instead of map here, and skip the .awaits. We'd be able to return just a Stream<Item = SealedBlockBatch> instead of having nested futures in the returned stream.
I agree, and there are a lot more things to improve in this service. I don't want to make this PR even bigger, so I created an issue for that: #2370
then resolves the future, while map allows us to create a stream of futures that we can parallelize later.
@netrome Thanks for taking the time to review this. Regarding your questions:
So far looks good, I need to have a deeper look at the tests though.
Converting to draft because of the big refactor.
…rs and blocks mixed)
…r and added a bunch of tests
Co-authored-by: Rafał Chabowski <[email protected]>
Had a chat about this. @xgreenx proposed we change the p2p interface to not require any peer ID when requesting transactions, but instead leave it up to the p2p implementation to decide which peer to request them from and return that peer ID in the response.
## Linked Issues/PRs

This is a requirement for #2361

## Description

This PR adds a way to fetch transactions over p2p without giving a specific peer, letting the p2p layer choose the one it prefers. This will be used in #2361

## Checklist
- [x] Breaking changes are clearly marked as such in the PR description and changelog
- [x] New behavior is reflected in tests
- [x] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed)

### Before requesting review
- [x] I have reviewed the code myself
- [x] I have created follow-up issues caused by this PR and linked them here

Co-authored-by: Green Baneling <[email protected]>
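A minimal sketch of the shape of the revised interface described above. All names here are illustrative, not the actual fuel-core API:

```rust
use std::ops::Range;

#[derive(Debug, Clone, PartialEq)]
struct PeerId(String);

// The response pairs the data with the peer the p2p layer chose.
#[derive(Debug, PartialEq)]
struct SourcePeer<T> {
    peer_id: PeerId,
    data: T,
}

trait PeerToPeerPort {
    // No peer id in the request: the p2p implementation picks the peer.
    fn get_transactions(&self, block_range: Range<u32>) -> Option<SourcePeer<Vec<String>>>;
}

// Dummy implementation standing in for the real p2p service.
struct DummyP2p;

impl PeerToPeerPort for DummyP2p {
    fn get_transactions(&self, block_range: Range<u32>) -> Option<SourcePeer<Vec<String>>> {
        let data = block_range.map(|h| format!("txs-for-block-{h}")).collect();
        Some(SourcePeer {
            peer_id: PeerId("chosen-by-p2p".into()),
            data,
        })
    }
}

fn main() {
    let resp = DummyP2p.get_transactions(0..2).unwrap();
    // The caller learns which peer served the request from the response.
    assert_eq!(resp.peer_id, PeerId("chosen-by-p2p".into()));
    assert_eq!(resp.data.len(), 2);
}
```

The key design point is that peer selection (and any scoring/reputation logic) stays inside the p2p implementation, while callers can still report the returned peer if the data turns out to be bad.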
Copilot reviewed 6 out of 10 changed files in this pull request and generated no suggestions.
Files not reviewed (4)
- crates/services/sync/src/import/test_helpers/pressure_peer_to_peer.rs: Evaluated as low risk
- crates/services/sync/src/import/tests.rs: Evaluated as low risk
- crates/services/sync/src/ports.rs: Evaluated as low risk
- CHANGELOG.md: Evaluated as low risk
The change looks really good=)
header_stream
    let ranges = range_chunks(range, params.header_batch_size);
    futures::stream::iter(ranges)
        .map({
then resolves the future, while map allows us to create a stream of futures that we can parallelize later.
crates/services/sync/src/import.rs (Outdated)
        }
    }
    BlockHeaderData::Cached(CachedDataBatch::None(_)) => {
        unreachable!()
While it is true, let's return an error and print a log that this place shouldn't be reachable
I have added a log and returned a malformed batch, which is used as an error in this whole process. I don't want to change the whole architecture of the module for this error. (The other solution is to panic, as is done here:
fuel-core/crates/services/sync/src/import.rs
Line 475 in 99135e3
.expect("We checked headers are not empty above"),
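The pattern being discussed can be sketched with hypothetical types (not the PR's actual ones): log the impossible state and surface it as an error instead of panicking, so the sync task stays alive:

```rust
#[derive(Debug, PartialEq)]
enum CachedDataBatch {
    Blocks(Vec<u64>),
    None,
}

fn take_blocks(batch: CachedDataBatch) -> Result<Vec<u64>, String> {
    match batch {
        CachedDataBatch::Blocks(blocks) => Ok(blocks),
        CachedDataBatch::None => {
            // A log line instead of `unreachable!()` keeps the service running
            // while still recording that the invariant was violated.
            eprintln!("warn: cached batch unexpectedly empty; this should be unreachable");
            Err("malformed batch".to_string())
        }
    }
}

fn main() {
    assert_eq!(take_blocks(CachedDataBatch::Blocks(vec![1, 2])), Ok(vec![1, 2]));
    assert!(take_blocks(CachedDataBatch::None).is_err());
}
```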
Some(peer_id) => {
    let source_peer = peer_id.clone().bind(range.clone());
    let Ok(Some(txs)) = p2p
        .get_transactions_from_peer(source_peer)
        .await
        .trace_err("Failed to get transactions")
    else {
        report_peer(
            p2p,
            Some(peer_id.clone()),
            PeerReportReason::MissingTransactions,
        );
        return None;
    };
    Some(SourcePeer { peer_id, data: txs })
}
Do we even need to support this case?=)
I think so. If we are in the case where we don't use the cache, we already fetched the header from a particular peer, and we have its peer_id, then it's more optimal to ask that peer directly for the transactions instead of running computation to find a peer that has this data (and probably ending up on the same one anyway).
CachedDataBatch::Headers(batch) => {
    if batch.results.len() >= max_chunk_size {
        chunks.push(CachedDataBatch::Headers(batch));
        CachedDataBatch::None(current_height..current_height)
I don't see why we want to return None instead of a new Headers with the remaining elements. I see that it was extracted from the loop, and there it makes sense, because None is the default value to start the next iteration of the loop. But here, it looks strange. I think if we had a function named truncate_chunk and did something like current_chunk = truncate_chunk(current_chunk, &mut chunks), it would be simpler to understand=)
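The suggested truncate_chunk shape can be sketched with a plain Vec<u64> standing in for the real chunk type (hypothetical, for illustration only):

```rust
// When the current chunk reaches the size limit, push it into the accumulator
// and start a fresh one; otherwise hand it back unchanged.
fn truncate_chunk(current: Vec<u64>, chunks: &mut Vec<Vec<u64>>, max: usize) -> Vec<u64> {
    if current.len() >= max {
        chunks.push(current);
        Vec::new()
    } else {
        current
    }
}

fn main() {
    let mut chunks = Vec::new();
    let mut current = Vec::new();
    for height in 0..7u64 {
        current.push(height);
        current = truncate_chunk(current, &mut chunks, 3);
    }
    // Flush the trailing partial chunk.
    if !current.is_empty() {
        chunks.push(current);
    }
    assert_eq!(chunks, vec![vec![0, 1, 2], vec![3, 4, 5], vec![6]]);
}
```

The helper keeps the "push and reset" decision in one named place, which is the readability win being asked for.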
I refactored this to split only when the chunk is inserted into the accumulator, and it really simplifies the whole code. I also added some comments.
p2p.expect_get_sealed_block_headers()
    .times(1)
    .in_sequence(&mut seq)
    .returning(|_| {
        Box::pin(async move {
            tokio::time::sleep(Duration::from_millis(300)).await;
            Err(anyhow::anyhow!("Some network error"))
        })
    });
p2p.expect_get_sealed_block_headers()
    .times(2)
    .in_sequence(&mut seq)
    .returning(|range| {
        Box::pin(async move {
            let peer = random_peer();
            let headers = Some(range.map(empty_header).collect());
            let headers = peer.bind(headers);
            Ok(headers)
        })
    });
// Then
// Reask only for block 4
p2p.expect_get_sealed_block_headers()
    .times(1)
    .in_sequence(&mut seq)
    .returning(|range| {
        Box::pin(async move {
            let peer = random_peer();
            let headers = Some(range.map(empty_header).collect());
            let headers = peer.bind(headers);
            Ok(headers)
        })
    });
Why is the sequence [fail, success, success]? Based on the comments I would expect either [fail, success] or [success (for the first 3 blocks), fail, success]. The same question for get transactions.
I have made the comments explicit: the expected sequence is indeed [fail (4), success (5), success (6)] and then [success (4)] only, for both of the tests. Tell me if it's clearer :)
@@ -14,7 +14,11 @@ use crate::{
},
It would be nice to see a test where execution fails, and we see that we will not call p2p because all the data is already fetched.
I made a test locally that is failing because this is not the behavior we decided on together. I think we said that if execution fails, we should remove the data from the cache because it would probably fail again. The line that clears the cache:
fuel-core/crates/services/sync/src/import.rs
Line 340 in a566ac5
cache.remove_element(&height);
@xgreenx Thanks for the kind comment. I have addressed all of your concerns; some may still need answers :)
Nice stuff! Some minor questions and comments from me, but overall looks good.
}

pub fn insert_blocks(&mut self, batch: Batch<SealedBlock>) {
    let mut lock = self.0.lock();
Oh didn't know the parking lot Mutex was infallible. So no poisoned mutexes to worry about, nice!
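For context, a small std-only illustration of the poisoning that parking_lot's Mutex avoids (parking_lot itself is an external crate and not shown): std's lock() returns a Result because a panic while holding the guard poisons the mutex, whereas parking_lot's lock() returns the guard directly.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Poison a std mutex by panicking while holding its guard.
fn poison(m: &Arc<Mutex<i32>>) {
    let m2 = Arc::clone(m);
    let _ = thread::spawn(move || {
        let _guard = m2.lock().unwrap();
        panic!("poison the mutex");
    })
    .join();
}

fn main() {
    let m = Arc::new(Mutex::new(0));
    assert!(m.lock().is_ok()); // healthy mutex: lock succeeds
    poison(&m);
    assert!(m.lock().is_err()); // poisoned: std's lock() now returns Err
}
```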
    ))
}
(CachedDataBatch::Headers(mut batch), CachedData::Header(data)) => {
    debug_assert_eq!(batch.range.end, height);
Should we perhaps log a warning if this isn't correct in production?
    CachedDataBatch::None(4..7),
    CachedDataBatch::None(7..10),
    CachedDataBatch::None(10..11),
]; "one header and empty ranges")]
Nit: The range exists but there are no blocks. I don't see any test cases with an empty range or a 0 max batch size. Would be interesting to add. Otherwise, love the test suite!
#[async_trait::async_trait]
#[cfg_attr(any(test, feature = "benchmarking"), mockall::automock)]
I assume the order here matters (i.e. the previous order didn't add the async_trait sugar to the mocks, right?) and that this is a broken-window fix, or how does this change relate to the current PR? It seems very orthogonal.
Linked Issues/PRs
Closes #2357

Description
This pull request introduces a caching mechanism to the sync service to avoid redundant data fetching from the network. The most important changes include adding a cache module, modifying the Import struct to include a cache, and updating related methods to utilize this cache.

Caching Mechanism:
crates/services/sync/src/import.rs: Added a new cache module and integrated it into the Import struct. Updated methods to use the cache for fetching and storing headers and blocks.

Test Updates:
Roughly 50% of the changes in this PR are test updates, including new tests for the cache.

Checklist
Before requesting review