Skip to content

Commit

Permalink
Merge pull request #5035 from systeminit/nf/forklift-dead-letter-queue
Browse files Browse the repository at this point in the history
feat: add a NATS "dead letter queue" stream for failing messages
  • Loading branch information
fnichol authored Nov 27, 2024
2 parents f479b84 + a2cc8f8 commit f2d09cc
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 1 deletion.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ members = [
"lib/forklift-server",
"lib/module-index-client",
"lib/module-index-server",
"lib/nats-dead-letter-queue",
"lib/nats-multiplexer",
"lib/nats-multiplexer-client",
"lib/nats-multiplexer-core",
Expand All @@ -37,9 +38,9 @@ members = [
"lib/naxum-api-types",
"lib/object-tree",
"lib/pending-events",
"lib/permissions",
"lib/pinga-core",
"lib/pinga-server",
"lib/permissions",
"lib/rebaser-client",
"lib/rebaser-core",
"lib/rebaser-server",
Expand Down
1 change: 1 addition & 0 deletions lib/forklift-server/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ rust_library(
"//lib/billing-events:billing-events",
"//lib/buck2-resources:buck2-resources",
"//lib/data-warehouse-stream-client:data-warehouse-stream-client",
"//lib/nats-dead-letter-queue:nats-dead-letter-queue",
"//lib/naxum:naxum",
"//lib/si-data-nats:si-data-nats",
"//lib/si-events-rs:si-events",
Expand Down
1 change: 1 addition & 0 deletions lib/forklift-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ audit-logs = { path = "../../lib/audit-logs" }
billing-events = { path = "../../lib/billing-events" }
buck2-resources = { path = "../../lib/buck2-resources" }
data-warehouse-stream-client = { path = "../../lib/data-warehouse-stream-client" }
nats-dead-letter-queue = { path = "../../lib/nats-dead-letter-queue" }
naxum = { path = "../../lib/naxum" }
si-data-nats = { path = "../../lib/si-data-nats" }
si-events = { path = "../../lib/si-events-rs" }
Expand Down
12 changes: 12 additions & 0 deletions lib/forklift-server/src/server/app/audit_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use std::{
future::{Future, IntoFuture as _},
io,
sync::Arc,
time::Duration,
};

use app_state::AppState;
use audit_logs::{
database::{AuditDatabaseConfig, AuditDatabaseContext, AuditDatabaseContextError},
AuditLogsStream, AuditLogsStreamError,
};
use nats_dead_letter_queue::NatsDeadLetterQueueError;
use naxum::{
extract::MatchedSubject,
handler::Handler as _,
Expand Down Expand Up @@ -46,6 +48,8 @@ pub enum AuditLogsAppSetupError {
AuditDatabaseContext(#[from] AuditDatabaseContextError),
#[error("audit logs stream error: {0}")]
AuditLogsStream(#[from] AuditLogsStreamError),
#[error("failed to create dead letter stream: {0}")]
NatsDeadLetterQueue(#[from] NatsDeadLetterQueueError),
}

type Result<T> = std::result::Result<T, AuditLogsAppSetupError>;
Expand All @@ -65,6 +69,8 @@ pub(crate) async fn build_and_run(
audit_database_config: &AuditDatabaseConfig,
token: CancellationToken,
) -> Result<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
nats_dead_letter_queue::create_stream(&jetstream_context).await?;

let incoming = {
let stream = AuditLogsStream::get_or_create(jetstream_context).await?;
let consumer_subject = stream.consuming_subject_for_all_workspaces();
Expand All @@ -74,6 +80,12 @@ pub(crate) async fn build_and_run(
.create_consumer(async_nats::jetstream::consumer::pull::Config {
durable_name: Some(durable_consumer_name),
filter_subject: consumer_subject.into_string(),
max_deliver: 4,
backoff: vec![
Duration::from_secs(5),
Duration::from_secs(10),
Duration::from_secs(15),
],
..Default::default()
})
.await?
Expand Down
14 changes: 14 additions & 0 deletions lib/nats-dead-letter-queue/BUCK
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
load("@prelude-si//:macros.bzl", "rust_library")

rust_library(
name = "nats-dead-letter-queue",
deps = [
"//lib/si-data-nats:si-data-nats",
"//lib/telemetry-rs:telemetry",
"//third-party/rust:remain",
"//third-party/rust:thiserror",
],
srcs = glob([
"src/**/*.rs",
]),
)
15 changes: 15 additions & 0 deletions lib/nats-dead-letter-queue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "nats-dead-letter-queue"
version.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true
edition.workspace = true
rust-version.workspace = true
publish.workspace = true

[dependencies]
remain = { workspace = true }
si-data-nats = { path = "../../lib/si-data-nats" }
telemetry-nats = { path = "../../lib/telemetry-nats-rs" }
thiserror = { workspace = true }
59 changes: 59 additions & 0 deletions lib/nats-dead-letter-queue/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use si_data_nats::{
async_nats::jetstream::{
context::CreateStreamError,
stream::{Config, RetentionPolicy},
},
jetstream::Context,
};
use thiserror::Error;

const STREAM_NAME: &str = "DEAD_LETTER_QUEUES";
const STREAM_DESCRIPTION: &str = "Dead Letter Queues";
// Subscribe to *all* stream and consumer max deliveries events. This subject is of the form:
// `$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.<STREAM>.<CONSUMER>`
//
// See: https://docs.nats.io/running-a-nats-service/nats_admin/monitoring/monitoring_jetstream
const STREAM_SUBJECTS: &str = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.*.*";

#[allow(missing_docs)]
#[remain::sorted]
#[derive(Debug, Error)]
pub enum Error {
#[error("create stream error: {0}")]
CreateStream(#[from] CreateStreamError),
}

pub type NatsDeadLetterQueueError = Error;

type Result<T, E = Error> = std::result::Result<T, E>;

/// Ensures that the "dead letter queue" stream is created
pub async fn create_stream(context: &Context) -> Result<()> {
let prefix = context.metadata().subject_prefix();

context
.get_or_create_stream(Config {
name: prefixed_stream_name(prefix, STREAM_NAME),
description: Some(STREAM_DESCRIPTION.to_string()),
retention: RetentionPolicy::Limits,
subjects: vec![prefixed_subject(prefix, STREAM_SUBJECTS)],
..Default::default()
})
.await?;

Ok(())
}

fn prefixed_stream_name(prefix: Option<&str>, stream_name: &str) -> String {
match prefix {
Some(prefix) => format!("{prefix}_{stream_name}"),
None => stream_name.to_owned(),
}
}

fn prefixed_subject(prefix: Option<&str>, subject: &str) -> String {
match prefix {
Some(prefix) => format!("{prefix}.{subject}"),
None => subject.to_owned(),
}
}

0 comments on commit f2d09cc

Please sign in to comment.