Skip to content

Commit

Permalink
Merge pull request #5044 from systeminit/fnichol/forklift-audit-inser…
Browse files Browse the repository at this point in the history
…t-concurrency

chore(forklift): add tuning for concurrent audit log database inserts
  • Loading branch information
fnichol authored Nov 29, 2024
2 parents 03ac579 + e201c54 commit af59e7c
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 6 deletions.
5 changes: 5 additions & 0 deletions lib/audit-logs/src/database/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ use si_data_pg::PgPoolConfig;
pub const DBNAME: &str = "si_audit";
const APPLICATION_NAME: &str = "si-audit";

const DEFAULT_INSERT_CONCURRENCY_LIMIT: usize = 64;

/// The configuration used for communicating with and setting up the audit database.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AuditDatabaseConfig {
/// The configuration for the PostgreSQL pool.
///
/// _Note:_ this is called "pg" for ease of use with layered load configuration files.
pub pg: PgPoolConfig,
/// The concurrency limit used when inserting events into the database store.
pub insert_concurrency_limit: usize,
}

impl Default for AuditDatabaseConfig {
Expand All @@ -22,6 +26,7 @@ impl Default for AuditDatabaseConfig {
application_name: APPLICATION_NAME.into(),
..Default::default()
},
insert_concurrency_limit: DEFAULT_INSERT_CONCURRENCY_LIMIT,
}
}
}
1 change: 0 additions & 1 deletion lib/forklift-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ impl Server {
jetstream_context.clone(),
DURABLE_CONSUMER_NAME.to_string(),
connection_metadata.clone(),
config.concurrency_limit(),
config.audit(),
token.clone(),
)
Expand Down
2 changes: 0 additions & 2 deletions lib/forklift-server/src/server/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@ pub(crate) async fn audit_logs(
jetstream_context: Context,
durable_consumer_name: String,
connection_metadata: Arc<ConnectionMetadata>,
concurrency_limit: usize,
audit_database_config: &AuditDatabaseConfig,
token: CancellationToken,
) -> Result<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
Ok(audit_logs::build_and_run(
jetstream_context,
durable_consumer_name,
connection_metadata,
concurrency_limit,
audit_database_config,
token,
)
Expand Down
6 changes: 3 additions & 3 deletions lib/forklift-server/src/server/app/audit_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ pub(crate) async fn build_and_run(
jetstream_context: Context,
durable_consumer_name: String,
connection_metadata: Arc<ConnectionMetadata>,
concurrency_limit: usize,
audit_database_config: &AuditDatabaseConfig,
config: &AuditDatabaseConfig,
token: CancellationToken,
) -> Result<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
nats_dead_letter_queue::create_stream(&jetstream_context).await?;
Expand All @@ -93,7 +92,8 @@ pub(crate) async fn build_and_run(
.await?
};

let context = AuditDatabaseContext::from_config(audit_database_config).await?;
let concurrency_limit = config.insert_concurrency_limit;
let context = AuditDatabaseContext::from_config(config).await?;
let state = AppState::new(context, connection_metadata.subject_prefix().is_some());

// NOTE(nick,fletcher): the "NatsMakeSpan" builder defaults to "info" level logging. Bump it down, if needed.
Expand Down

0 comments on commit af59e7c

Please sign in to comment.