Refactoring: Move partition snapshotting to PPM #2303
Conversation
Force-pushed from 17e437f to a39b4bd
Force-pushed from 0a64a61 to 494bccc
    partition_id: PartitionId,
    archived_lsn: Lsn,
) -> anyhow::Result<()> {
    let mut guard = self.lookup.lock().await;
As above.
Force-pushed from 494bccc to c3f448b
Thanks for creating this PR @pcholakov. I think it already looks quite good. I've left a question about how blocking the export of a column family snapshot is and whether it needs to run outside of the Tokio threads. I also left a suggestion for how you could handle the error cases and respond to the caller in a more streamlined way.
crates/worker/src/partition/mod.rs
Outdated
- PartitionProcessorControlCommand::RunForLeader(leader_epoch) => {
+ RunForLeader(leader_epoch) => {
Personal taste: I do prefer the enum type being visible here because it tells me on this line where it is coming from.
Ack, I won't do this :-)
let snapshot_id = SnapshotId::new();
let snapshot = self
    .partition_store_manager
    .export_partition_snapshot(self.partition_id, snapshot_id, self.snapshot_base_path)
How much of a blocking operation is this one? Like how much does snapshotting a CF in RocksDB block?
Answered above - the only part that's blocking is already offloaded as a low-priority StorageTask further down in the RocksDb impl layer.
Force-pushed from e6ffde8 to 693c174
Thanks for updating this PR @pcholakov. I think it looks really good. The one thing that wasn't fully clear to me is the need for the explicit archived LSN watch. I think we can remove it and get the information from the returned PartitionSnapshotMetadata.
let mut partition_store = self
    .lookup
    .lock()
    .await
    .live
    .get_mut(&partition_id)
Could this be simplified via calling self.get_partition_store()?
Definitely! I completely missed that.
Thanks for your feedback, @tillrohrmann! I believe all the concerns you raised are addressed in the latest revision.
warn!(
    partition_id = %self.partition_id,
    "Failed to create partition snapshot: {}",
    err
);
Generally, this should be IO errors related to exporting the snapshot itself (something that bubbles up from RocksDB's export_column_family) or ancillaries (writing the metadata JSON header to disk, or in the future, uploading to the object store). Do you think we could do something more besides logging it? We're also responding to the caller, so I think it would generally be their responsibility to redrive and/or raise a flare.
I can also see introducing some metrics around snapshotting in the future - successes/errors/bytes uploaded. Maybe save that for when I introduce the object store integration?
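As a rough sketch of what such metrics could look like - the metric names and the use of the metrics crate are assumptions for illustration, not something this PR implements:

use metrics::counter;

// Hypothetical helper: count snapshot outcomes per partition so failures can be alerted on.
fn record_snapshot_outcome(partition_id: u64, success: bool) {
    let status = if success { "success" } else { "failure" };
    counter!(
        "partition_snapshots_total",
        "partition_id" => partition_id.to_string(),
        "status" => status
    )
    .increment(1);
}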
crates/worker/src/partition_processor_manager/processor_state.rs
Outdated
@@ -656,7 +696,7 @@ impl PartitionProcessorManager {
        }
    }

-    fn request_partition_snapshots(&mut self) {
+    fn trigger_periodic_partition_snapshots(&mut self) {
I had missed this in the initial revision, added now! (Line 748.)
Thanks for updating this PR @pcholakov. I think we are very close to merging it :-) I left a few comments/questions.
if self.pending_snapshots.contains_key(&partition_id) {
    warn!(%partition_id, "Partition processor stopped while snapshot task is still pending.");
}
The partition processor didn't stop yet. It is stopping. Once it has stopped, on_asynchronous_event will be called with EventKind::Stopped.
Updated the wording to be more precise. Just to test my understanding: while we've already called processor.cancel() at this point, that doesn't necessarily mean the cancellation token's effect has propagated yet?
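To illustrate the semantics in question - Restate uses its own task-center cancellation, but the behaviour mirrors tokio_util's CancellationToken shown in this sketch: returning from cancel() only requests cancellation; the task observes it at its next await on the token.

use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let child = token.child_token();

    let task = tokio::spawn(async move {
        tokio::select! {
            // Cancellation is only observed at this await point...
            _ = child.cancelled() => println!("observed cancellation"),
            _ = tokio::time::sleep(Duration::from_secs(60)) => println!("finished work"),
        }
    });

    // ...so the task may still be running for a short while after cancel() returns.
    token.cancel();
    task.await.unwrap();
}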
if let Some(pending) = self.pending_snapshots.remove(&partition_id) {
    let _ = pending.sender.send(response);
} else {
    error!("Snapshot task result received, but there was no pending sender found!")
error is usually used for situations where the cluster is at risk of failing. Maybe a lower log level is ok for this situation (even though I currently can't see how senders would disappear from pending_snapshots, so it probably shouldn't happen).
I went back and forth on this. It's definitely not critical to system stability, so I'll update this back to warn, but it does indicate a potential bug.
TaskKind::PartitionSnapshotProducer,
"create-snapshot",
Some(partition_id),
async move { create_snapshot_task.run().await }.instrument(snapshot_span),
nit: Instrumenting run via #[instrument()] would have the benefit that it's clearer what's being emitted via this span (snapshot_id and partition_id). Right now, if one touches the SnapshotPartitionTask, one needs to be aware of this detail (someone else attaches a span) that lives in a different file.
Huge improvement, thank you!
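A sketch of what this looks like with tracing's #[instrument] attribute; the struct fields are simplified placeholders (the real task uses SnapshotId and PartitionId).

use tracing::instrument;

struct SnapshotPartitionTask {
    snapshot_id: String, // simplified placeholder types
    partition_id: u64,
}

impl SnapshotPartitionTask {
    // The span is now defined next to the code it describes, and the fields it
    // emits (snapshot_id, partition_id) are visible at the definition site.
    #[instrument(level = "debug", skip(self), fields(snapshot_id = %self.snapshot_id, partition_id = %self.partition_id))]
    async fn run(self) {
        // ... export the snapshot ...
    }
}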
/// The Partition Processor is in a state in which it is acceptable to create and publish
/// snapshots. Since we don't want newer snapshots to move backwards in applied LSN, the current
/// implementation checks whether the processor is fully caught up with the log.
From a correctness POV, it wouldn't be a problem if newer snapshots went backwards in LSN, right?
No, definitely not! It's more a question of how aggressively we want to trim - if we wanted to leave precisely one snapshot in the repository and trim the log to its LSN, then we should make a much greater effort to ensure we don't move backwards. But I suspect that once you put a snapshot in the repository, it will probably stay there on the order of days to months before it gets pruned.
With some upcoming changes it will become cheap to check the latest snapshot LSN in the repository, so we'll easily be able to add a condition on the producer not to publish lower-LSN snapshots in the very near future, if we want this to be strictly monotonic.
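The guard described here could be as simple as the following sketch; the function name is hypothetical and LSNs are shown as plain u64 values.

// Skip publishing a snapshot whose applied LSN is not ahead of the newest
// snapshot already present in the repository.
fn should_publish(new_snapshot_lsn: u64, latest_repository_lsn: Option<u64>) -> bool {
    match latest_repository_lsn {
        Some(latest) => new_snapshot_lsn > latest,
        None => true, // empty repository: always publish
    }
}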
@@ -656,7 +696,7 @@ impl PartitionProcessorManager {
        }
    }

-    fn request_partition_snapshots(&mut self) {
+    fn trigger_periodic_partition_snapshots(&mut self) {
Is there a reason why trigger_periodic_partition_snapshots sends a ProcessorsManagerCommand::CreateSnapshot instead of directly calling spawn_create_snapshot_task?
let _ = self.result_sender.send(match result {
    Ok(metadata) => {
        debug!(
            archived_lsn = %metadata.min_applied_lsn,
            "Partition snapshot created"
        );
        Ok(metadata)
    }
    Err(err) => {
        warn!("Failed to create partition snapshot: {}", err);
        Err(err)
    }
});
Is it possible to directly return result? Then we don't have to introduce the oneshot that one needs to keep track of.
Yes! This is much cleaner; I tried this in an earlier iteration and hit some channel ownership issues which are now gone by virtue of other simplifications.
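Roughly, the resulting shape - types are simplified here to keep the sketch self-contained: the task logs the outcome and returns the Result directly, and the caller decides what to do with it.

async fn run_snapshot_task() -> Result<u64, String> {
    let result = do_export().await;
    match &result {
        Ok(lsn) => tracing::debug!(archived_lsn = %lsn, "Partition snapshot created"),
        Err(err) => tracing::warn!("Failed to create partition snapshot: {}", err),
    }
    // No oneshot needed inside the task: the result is simply returned.
    result
}

async fn do_export() -> Result<u64, String> {
    // placeholder for the actual export; returns the snapshot's applied LSN
    Ok(42)
}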
Some(result) = self.snapshot_export_tasks.next() => {
    if let Ok(result) = result {
        self.on_create_snapshot_task_completed(result);
    } else {
        debug!("Create snapshot task failed: {}", result.unwrap_err()); // shutting down
    }
}
I like polling TaskHandle a bit more than introducing a new indirection via the oneshots. One advantage of polling the TaskHandle is that one would also see panics that might crash the task; the oneshot would be closed in this case.
I can see how that is an improvement! I've switched to polling a FuturesUnordered<TaskHandle<_>> now :-)
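For readers unfamiliar with the pattern, a minimal sketch using tokio's JoinHandle in place of Restate's internal TaskHandle: a panicking task surfaces as an error from the handle, whereas a oneshot would simply be dropped.

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::task::JoinHandle;

async fn do_snapshot(partition_id: u64) -> Result<u64, String> {
    // placeholder export returning the snapshot's applied LSN
    Ok(partition_id * 100)
}

#[tokio::main]
async fn main() {
    let mut tasks: FuturesUnordered<JoinHandle<Result<u64, String>>> = FuturesUnordered::new();
    tasks.push(tokio::spawn(do_snapshot(1)));
    tasks.push(tokio::spawn(do_snapshot(2)));

    // Completed handles are yielded as they finish; a panic shows up as Err(JoinError).
    while let Some(joined) = tasks.next().await {
        match joined {
            Ok(result) => println!("snapshot task finished: {result:?}"),
            Err(join_err) => println!("snapshot task panicked or was aborted: {join_err}"),
        }
    }
}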
This change moves the responsibility for orchestrating snapshot creation to the PartitionProcessorManager, allowing the PartitionProcessor to be more focused on its core task of processing journal operations.
Force-pushed from 9f6d162 to d686e7e
Thanks for all the input, @tillrohrmann! :-) Ready for another pass whenever you get a chance.
Thanks for updating the PR @pcholakov. It looks really good. I have one question left, which is about the behavior of spawn_create_snapshot_task when we pass in None as the sender. Did I understand correctly that we won't remember a spawned snapshot task in this case? So if another snapshot request comes in while the previous task is still running, would we create another task?
pending_snapshots: HashMap<PartitionId, oneshot::Sender<SnapshotResult>>,
snapshot_export_tasks:
    FuturesUnordered<TaskHandle<Result<PartitionSnapshotMetadata, SnapshotError>>>,
nit: You could use SnapshotResultInternal here.
if let Ok(result) = result {
    self.on_create_snapshot_task_completed(result);
} else {
    debug!("Create snapshot task failed: {}", result.unwrap_err()); // shutting down
Suggested change:
- debug!("Create snapshot task failed: {}", result.unwrap_err()); // shutting down
+ debug!("Create snapshot task failed: {}", result.unwrap_err());
if let Some(sender) = sender {
    entry.insert(sender);
}
What if sender == None? Will this mean that we create a snapshot task but don't remember that it is in progress?
I now explicitly call with sender = None when requesting a snapshot from the PartitionProcessorManager. We still have a handle to the task in snapshot_export_tasks, but there's no one outside of the PPM to notify about the result. A oneshot channel is now only used to respond to a CreateSnapshot RPC.
Can't it then happen that we start multiple snapshot tasks for the same partition if snapshotting takes a long time, for example? I was under the impression that we wanted to allow only a single in-flight snapshot task per partition at any point in time to control the resource usage.
Doh, this was a silly bug to introduce at the last minute! Thanks for catching this.
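The invariant being re-established is sketched below; the names are hypothetical (the real code keys off pending_snapshots and snapshot_export_tasks), but the point is the same: at most one in-flight snapshot task per partition, regardless of whether a caller is waiting on the result.

use std::collections::HashSet;

struct SnapshotTracker {
    // stands in for tracking by PartitionId in the PartitionProcessorManager
    in_flight: HashSet<u64>,
}

impl SnapshotTracker {
    /// Returns true if a new snapshot task may be spawned for this partition.
    fn try_start(&mut self, partition_id: u64) -> bool {
        // insert() returns false when a snapshot for this partition is already running,
        // even if no external sender is registered for it.
        self.in_flight.insert(partition_id)
    }

    fn finish(&mut self, partition_id: u64) {
        self.in_flight.remove(&partition_id);
    }
}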
async fn create_snapshot_inner(
    snapshot_id: SnapshotId,
    partition_id: PartitionId,
    partition_store_manager: PartitionStoreManager,
    snapshot_base_path: PathBuf,
    cluster_name: String,
    node_name: String,
) -> Result<PartitionSnapshotMetadata, SnapshotError> {
Was there a reason to make this function not a method of SnapshotPartitionTask? If it were, then you wouldn't have to pass in all the parameters explicitly. Instead, it could accept self.
I think this is a leftover from the earlier channels-based communication, will revisit.
Moved this and the metadata-writing function to methods, and also got rid of some clones in the process! 👍
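The resulting shape is roughly the following; the field and helper names are assumptions based on the discussion above, not the merged code.

use std::path::PathBuf;

struct SnapshotPartitionTask {
    snapshot_id: String, // simplified placeholders for SnapshotId / PartitionId
    partition_id: u64,
    snapshot_base_path: PathBuf,
    cluster_name: String,
    node_name: String,
}

impl SnapshotPartitionTask {
    // A method borrowing self replaces the free function and its long parameter list.
    async fn create_snapshot_inner(&self) -> Result<(), String> {
        // ... export the column family, then write the metadata header next to it ...
        self.write_metadata().await
    }

    async fn write_metadata(&self) -> Result<(), String> {
        // placeholder for serializing the snapshot metadata to a JSON file on disk
        Ok(())
    }
}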
Thanks for putting up with all my requests @pcholakov. I think it looks really nice now. +1 for merging :-)
Not at all, @tillrohrmann - thanks for keeping the bar high and highlighting so many problems and improvements! 🔥
This change moves the responsibility for orchestrating snapshot creation to the PartitionProcessorManager, allowing the PartitionProcessor to be more focused on its core task of processing journal operations.
Based on: #2253