Simplify snapshot task tracking, spawn tasks directly for periodic snapshots
pcholakov committed Nov 20, 2024
1 parent 5fa2226 commit d686e7e
Showing 3 changed files with 61 additions and 78 deletions.
118 changes: 53 additions & 65 deletions crates/worker/src/partition_processor_manager/mod.rs
@@ -99,19 +99,13 @@ pub struct PartitionProcessorManager {

asynchronous_operations: JoinSet<AsynchronousEvent>,

pending_snapshots: HashMap<PartitionId, PendingSnapshotTask>,
snapshot_export_tasks: FuturesUnordered<oneshot::Receiver<SnapshotResultInternal>>,
pending_snapshots: HashMap<PartitionId, oneshot::Sender<SnapshotResult>>,
snapshot_export_tasks:
FuturesUnordered<TaskHandle<Result<PartitionSnapshotMetadata, SnapshotError>>>,
}

type SnapshotResultInternal = Result<PartitionSnapshotMetadata, SnapshotError>;

/// Handle to an outstanding [`SnapshotPartitionTask`] that has been spawned, including a reference
/// to notify the requester.
pub struct PendingSnapshotTask {
pub handle: TaskHandle<()>,
pub sender: oneshot::Sender<SnapshotResult>,
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
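
For orientation, here is a minimal standalone sketch of the tracking shape introduced above: spawned snapshot tasks live in a FuturesUnordered of task handles, so the same object is polled for completion and cancelled on shutdown, while callers that asked for a specific snapshot are remembered in a map of oneshot senders. This sketch deliberately uses plain tokio JoinHandles and stand-in PartitionId/SnapshotResult types, not Restate's TaskHandle or its real identifiers.

```rust
use std::collections::HashMap;

use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

type PartitionId = u64; // stand-in for Restate's PartitionId
type SnapshotResult = Result<String, String>; // stand-in for the real snapshot result

#[derive(Default)]
struct SnapshotTracker {
    // Every spawned snapshot task is tracked by its handle; polling the set
    // yields results as tasks finish, and the handles can be aborted on shutdown.
    tasks: FuturesUnordered<JoinHandle<(PartitionId, SnapshotResult)>>,
    // Only requests that want a reply register a sender, keyed by partition.
    pending: HashMap<PartitionId, oneshot::Sender<SnapshotResult>>,
}

impl SnapshotTracker {
    fn spawn(&mut self, partition_id: PartitionId, sender: Option<oneshot::Sender<SnapshotResult>>) {
        // Requires a running tokio runtime.
        self.tasks.push(tokio::spawn(async move {
            // Pretend snapshot work; the real task writes out a partition snapshot.
            (partition_id, Ok(format!("snapshot-of-{partition_id}")))
        }));
        if let Some(sender) = sender {
            self.pending.insert(partition_id, sender);
        }
    }

    // Roughly one branch of the manager's main select loop: take the next
    // completed task and route its result to the waiting requester, if any.
    async fn drive_one(&mut self) {
        if let Some(Ok((partition_id, result))) = self.tasks.next().await {
            if let Some(sender) = self.pending.remove(&partition_id) {
                let _ = sender.send(result);
            }
        }
    }
}
```

Keeping the handle set and the sender map as two flat collections removes the need for the PendingSnapshotTask wrapper that previously tied the two together.
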
@@ -284,8 +278,8 @@ impl PartitionProcessorManager {
}
_ = &mut shutdown => {
self.health_status.update(WorkerStatus::Unknown);
for task in self.pending_snapshots.values() {
task.handle.cancel();
for task in self.snapshot_export_tasks.iter() {
task.cancel();
}
return Ok(());
}
@@ -327,7 +321,8 @@
}
}

#[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)))]
#[instrument(level = "debug", skip_all, fields(partition_id = %event.partition_id, event = %<&'static str as From<&EventKind>>::from(&event.inner)
))]
fn on_asynchronous_event(&mut self, event: AsynchronousEvent) {
let AsynchronousEvent {
partition_id,
@@ -619,7 +614,7 @@ impl PartitionProcessorManager {
processor_state.stop();
}
if self.pending_snapshots.contains_key(&partition_id) {
warn!(%partition_id, "Partition processor requested to stop while a snapshot task is still outstanding.");
info!(%partition_id, "Partition processor stop requested with snapshot task result outstanding.");
}
}
ProcessorCommand::Follower | ProcessorCommand::Leader => {
@@ -703,7 +698,7 @@ impl PartitionProcessorManager {
return;
}

self.spawn_create_snapshot_task(partition_id, sender);
self.spawn_create_snapshot_task(partition_id, Some(sender));
}

fn on_create_snapshot_task_completed(&mut self, result: SnapshotResultInternal) {
@@ -723,10 +718,8 @@
Err(snapshot_error) => (snapshot_error.partition_id(), Err(snapshot_error)),
};

if let Some(pending) = self.pending_snapshots.remove(&partition_id) {
let _ = pending.sender.send(response);
} else {
warn!("Snapshot task result received, but there was no pending sender found!")
if let Some(sender) = self.pending_snapshots.remove(&partition_id) {
let _ = sender.send(response);
}
}

@@ -741,61 +734,54 @@ impl PartitionProcessorManager {
return;
};

for (partition_id, state) in self.processor_states.iter() {
let status = state.partition_processor_status();
match status {
Some(status)
if status.effective_mode == RunMode::Leader
&& status.replay_status == ReplayStatus::Active
&& status.last_applied_log_lsn.unwrap_or(Lsn::INVALID)
>= status
.last_archived_log_lsn
.unwrap_or(Lsn::OLDEST)
.add(Lsn::from(records_per_snapshot.get())) =>
{
debug!(
%partition_id,
last_archived_lsn = %status.last_archived_log_lsn.unwrap_or(SequenceNumber::OLDEST),
last_applied_lsn = %status.last_applied_log_lsn.unwrap_or(SequenceNumber::INVALID),
"Creating partition snapshot",
);
let (tx, _) = oneshot::channel();

// ignore errors and don't request further snapshots if internal queue is full; we will try again later
if self
.tx
.try_send(ProcessorsManagerCommand::CreateSnapshot(*partition_id, tx))
.is_err()
{
break;
}
}
_ => {
continue;
}
}
let snapshot_partitions: Vec<_> = self
.processor_states
.iter()
.filter_map(|(partition_id, state)| {
state
.partition_processor_status()
.map(|status| (*partition_id, status))
})
.filter(|(_, status)| {
status.effective_mode == RunMode::Leader
&& status.replay_status == ReplayStatus::Active
&& status.last_applied_log_lsn.unwrap_or(Lsn::INVALID)
>= status
.last_archived_log_lsn
.unwrap_or(Lsn::OLDEST)
.add(Lsn::from(records_per_snapshot.get()))
})
.collect();

for (partition_id, status) in snapshot_partitions {
debug!(
%partition_id,
last_archived_lsn = %status.last_archived_log_lsn.unwrap_or(SequenceNumber::OLDEST),
last_applied_lsn = %status.last_applied_log_lsn.unwrap_or(SequenceNumber::INVALID),
"Requesting partition snapshot",
);
self.spawn_create_snapshot_task(partition_id, None);
}
}

/// Spawn a task to create a snapshot of the given partition. Optionally, a sender will be
/// notified of the result on completion.
fn spawn_create_snapshot_task(
&mut self,
partition_id: PartitionId,
sender: oneshot::Sender<SnapshotResult>,
sender: Option<oneshot::Sender<SnapshotResult>>,
) {
if let Entry::Vacant(entry) = self.pending_snapshots.entry(partition_id) {
let config = self.updateable_config.live_load();

let snapshot_base_path = config.worker.snapshots.snapshots_dir(partition_id);

let snapshot_id = SnapshotId::new();
let (snapshot_metadata_tx, snapshot_metadata_rx) = oneshot::channel();

let create_snapshot_task = SnapshotPartitionTask {
snapshot_id,
partition_id,
partition_store_manager: self.partition_store_manager.clone(),
snapshot_base_path,
result_sender: snapshot_metadata_tx,
partition_store_manager: self.partition_store_manager.clone(),
cluster_name: config.common.cluster_name().into(),
node_name: config.common.node_name().into(),
};
@@ -809,19 +795,21 @@ impl PartitionProcessorManager {

match spawn_task_result {
Ok(handle) => {
self.snapshot_export_tasks.push(snapshot_metadata_rx);
entry.insert(PendingSnapshotTask { handle, sender });
self.snapshot_export_tasks.push(handle);
if let Some(sender) = sender {
entry.insert(sender);
}
}
Err(_) => {
sender
.send(Err(SnapshotError::InvalidState(partition_id)))
.ok();
Err(_shutdown) => {
if let Some(sender) = sender {
let _ = sender.send(Err(SnapshotError::InvalidState(partition_id)));
}
}
}
} else if let Some(sender) = sender {
let _ = sender.send(Err(SnapshotError::SnapshotInProgress(partition_id)));
} else {
sender
.send(Err(SnapshotError::SnapshotInProgress(partition_id)))
.ok();
warn!(%partition_id, "Snapshot task not started: another snapshot is already in progress")
}
}

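The periodic trigger above fires for partitions that are active leaders and have applied at least `records_per_snapshot` records beyond their last archived LSN. A small illustration of that threshold arithmetic, using plain u64 values as stand-ins for Lsn (the numbers are made up):

```rust
// Illustration only: plain u64s stand in for Restate's Lsn type, and 0 stands
// in for the "oldest" LSN used when a partition has never been archived.
fn snapshot_due(last_applied_lsn: u64, last_archived_lsn: Option<u64>, records_per_snapshot: u64) -> bool {
    last_applied_lsn >= last_archived_lsn.unwrap_or(0) + records_per_snapshot
}

fn main() {
    // With records_per_snapshot = 1_000 and the last archive at LSN 400:
    assert!(snapshot_due(1_400, Some(400), 1_000));  // applied past the threshold
    assert!(!snapshot_due(1_399, Some(400), 1_000)); // not yet
    assert!(snapshot_due(1_000, None, 1_000));       // never archived: threshold counts from the oldest LSN
}
```

Note that the periodic path calls spawn_create_snapshot_task with None, so no reply channel is registered, while RPC-initiated requests pass Some(sender) and the sender is held in pending_snapshots until the task completes.
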
@@ -353,8 +353,8 @@ impl ProcessorState {
}

/// The Partition Processor is in a state in which it is acceptable to create and publish
/// snapshots. Since we don't want newer snapshots to move backwards in applied LSN, the current
/// implementation checks whether the processor is fully caught up with the log.
/// snapshots. Since we generally don't want newer snapshots to move backwards in applied LSN,
/// the current implementation checks whether the processor is fully caught up with the log.
pub fn should_publish_snapshots(&self) -> bool {
match self {
ProcessorState::Started {
17 changes: 6 additions & 11 deletions crates/worker/src/partition_processor_manager/snapshot_task.rs
@@ -11,7 +11,6 @@
use std::path::PathBuf;
use std::time::SystemTime;

use tokio::sync::oneshot;
use tracing::{debug, instrument, warn};

use restate_core::worker_api::SnapshotError;
@@ -27,14 +26,13 @@ pub struct SnapshotPartitionTask {
pub partition_id: PartitionId,
pub snapshot_base_path: PathBuf,
pub partition_store_manager: PartitionStoreManager,
pub result_sender: oneshot::Sender<Result<PartitionSnapshotMetadata, SnapshotError>>,
pub cluster_name: String,
pub node_name: String,
}

impl SnapshotPartitionTask {
#[instrument(level = "info", skip_all, fields(snapshot_id = %self.snapshot_id, partition_id = %self.partition_id))]
pub async fn run(self) {
pub async fn run(self) -> Result<PartitionSnapshotMetadata, SnapshotError> {
debug!("Creating partition snapshot");

let result = create_snapshot_inner(
@@ -47,19 +45,16 @@
)
.await;

let _ = self.result_sender.send(match result {
Ok(metadata) => {
result
.inspect(|metadata| {
debug!(
archived_lsn = %metadata.min_applied_lsn,
"Partition snapshot created"
);
Ok(metadata)
}
Err(err) => {
})
.inspect_err(|err| {
warn!("Failed to create partition snapshot: {}", err);
Err(err)
}
});
})
}
}

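The snapshot task now hands its result back through the task handle rather than a dedicated oneshot channel, and logs via Result::inspect/inspect_err, which observe a value without consuming it. A tiny self-contained illustration of that logging pattern (the function and messages here are made up, not Restate code):

```rust
fn create_snapshot() -> Result<u64, String> {
    // Stand-in for the real snapshot work; returns the archived LSN on success.
    Ok(42)
}

fn run() -> Result<u64, String> {
    create_snapshot()
        .inspect(|lsn| println!("snapshot created, archived_lsn = {lsn}"))
        .inspect_err(|err| eprintln!("failed to create snapshot: {err}"))
}

fn main() {
    let _ = run();
}
```
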
