Commit f1364cb

Incomplete attempt at reading from audit database

Signed-off-by: Nick Gerace <[email protected]>
nickgerace committed Dec 2, 2024
1 parent f96e434 commit f1364cb

Showing 24 changed files with 617 additions and 433 deletions.
2 changes: 1 addition & 1 deletion .ci/docker-compose.test-integration.yml
@@ -51,7 +51,7 @@ services:
- "PGPASSWORD=bugbear"
- "POSTGRES_USER=si_test"
- "POSTGRES_DB=si_test"
- "POSTGRES_MULTIPLE_DBS=si_test_dal,si_test_sdf_server,si_test_layer_db"
- "POSTGRES_MULTIPLE_DBS=si_test_dal,si_test_sdf_server,si_test_layer_db,si_test_audit"
command:
- "-c"
- "fsync=off"
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

254 changes: 185 additions & 69 deletions lib/audit-logs/src/database.rs
@@ -1,13 +1,20 @@
//! Contains functionality for setting up and communicating with the audit database.
use std::str::FromStr;

use chrono::DateTime;
use chrono::Utc;
use serde::Deserialize;
use serde::Serialize;
use si_data_pg::PgError;
use si_data_pg::PgPoolError;
use si_data_pg::PgRow;
use si_events::audit_log::AuditLogKind;
use si_events::audit_log::AuditLogMetadata;
use si_events::ulid;
use si_events::Actor;
use si_events::ChangeSetId;
use si_events::UserPk;
use si_events::WorkspacePk;
use telemetry::prelude::*;
use thiserror::Error;
@@ -33,77 +40,186 @@ pub enum AuditDatabaseError {
PgPool(#[from] PgPoolError),
#[error("serde json error: {0}")]
SerdeJson(#[from] serde_json::Error),
#[error("ulid decode error: {0}")]
UlidDecode(#[from] ulid::DecodeError),
}

type Result<T> = std::result::Result<T, AuditDatabaseError>;

#[allow(clippy::too_many_arguments, missing_docs)]
#[instrument(
name = "audit_log.insert",
level = "debug",
skip_all,
fields(
si.workspace.id = %workspace_id,
),
)]
pub async fn insert(
context: &AuditDatabaseContext,
workspace_id: WorkspacePk,
kind: AuditLogKind,
timestamp: String,
change_set_id: Option<ChangeSetId>,
actor: Actor,
entity_name: Option<String>,
) -> Result<()> {
let kind_as_string = kind.to_string();
let user_id = match actor {
Actor::System => None,
Actor::User(user_id) => Some(user_id),
};

let metadata = AuditLogMetadata::from(kind);
let (title, entity_type) = metadata.title_and_entity_type();
let serialized_metadata = serde_json::to_value(metadata)?;
let timestamp: DateTime<Utc> = timestamp.parse()?;

context
.pg_pool()
.get()
.await?
.query_one(
"INSERT INTO audit_logs (
workspace_id,
kind,
timestamp,
title,
change_set_id,
user_id,
entity_name,
entity_type,
metadata
) VALUES (
$1,
$2,
$3,
$4,
$5,
$6,
$7,
$8,
$9
) RETURNING *",
&[
&workspace_id,
&kind_as_string,
&timestamp,
&title,
&change_set_id.map(|id| id.to_string()),
&user_id.map(|id| id.to_string()),
&entity_name,
&entity_type,
&serialized_metadata,
],
)
.await?;
Ok(())
/// A row in the audit logs table of the audit database.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AuditLogRow {
/// Indicates the workspace that the row belongs to.
pub workspace_id: WorkspacePk,
/// The [kind](AuditLogKind) of the [`AuditLog`] (converted into a string because enum discriminants are not
/// serializable).
pub kind: String,
/// The timestamp that can be used in ISO RFC 3339 format.
pub timestamp: DateTime<Utc>,
/// The title of the [`AuditLog`]. It will likely be combined with the `entity_type` to make a full display name.
pub title: String,
/// The identifier of the change set, which will only be empty for actions taken outside of the workspace.
pub change_set_id: Option<ChangeSetId>,
/// The identifier of the user. If this is empty, it is the system user.
pub user_id: Option<UserPk>,
/// The entity name.
pub entity_name: Option<String>,
/// The entity type.
pub entity_type: Option<String>,
/// Serialized version of [`AuditLogMetadata`](si_events::audit_log::AuditLogMetadata), which is an
/// untagged version of the specific [`AuditLogKind`](si_events::audit_log::AuditLogKind).
pub metadata: Option<serde_json::Value>,
}

impl AuditLogRow {
/// Inserts a new row into the audit logs table of the audit database.
#[allow(clippy::too_many_arguments)]
#[instrument(
name = "audit_log.database.insert",
level = "debug",
skip_all,
fields(
si.workspace.id = %workspace_id,
),
)]
pub async fn insert(
context: &AuditDatabaseContext,
workspace_id: WorkspacePk,
kind: AuditLogKind,
timestamp: String,
change_set_id: Option<ChangeSetId>,
actor: Actor,
entity_name: Option<String>,
) -> Result<()> {
let kind_as_string = kind.to_string();
let user_id = match actor {
Actor::System => None,
Actor::User(user_id) => Some(user_id),
};

let metadata = AuditLogMetadata::from(kind);
let (title, entity_type) = metadata.title_and_entity_type();
let serialized_metadata = serde_json::to_value(metadata)?;
let timestamp: DateTime<Utc> = timestamp.parse()?;

context
.pg_pool()
.get()
.await?
.query_one(
"INSERT INTO audit_logs (
workspace_id,
kind,
timestamp,
title,
change_set_id,
user_id,
entity_name,
entity_type,
metadata
) VALUES (
$1,
$2,
$3,
$4,
$5,
$6,
$7,
$8,
$9
) RETURNING *",
&[
&workspace_id.to_string(),
&kind_as_string,
&timestamp,
&title,
&change_set_id.map(|id| id.to_string()),
&user_id.map(|id| id.to_string()),
&entity_name,
&entity_type,
&serialized_metadata,
],
)
.await?;
Ok(())
}

/// Lists rows of the audit logs table in the audit database.
#[instrument(
name = "audit_log.database.list",
level = "debug",
skip_all,
fields(
si.workspace.id = %workspace_id,
),
)]
pub async fn list(
context: &AuditDatabaseContext,
workspace_id: WorkspacePk,
change_set_ids: Vec<ChangeSetId>,
size: usize,
) -> Result<(Vec<Self>, bool)> {
let size = size as i64;
let change_set_ids: Vec<String> = change_set_ids.iter().map(|id| id.to_string()).collect();

let client = context.pg_pool().get().await?;
let row = client
.query_one(
"SELECT COUNT(*) from audit_logs WHERE workspace_id = $1 AND change_set_id = ANY($2)",
&[&workspace_id, &change_set_ids],
)
.await?;
let count: i64 = row.try_get("count")?;
let can_load_more = count > size;

let mut result = Vec::new();
let rows = client
.query(
"SELECT * from audit_logs WHERE workspace_id = $1 AND change_set_id = ANY($2) ORDER BY timestamp DESC LIMIT $3",
&[&workspace_id, &change_set_ids, &size],
)
.await?;
for row in rows {
result.push(Self::try_from(row)?);
}

Ok((result, can_load_more))
}
}

impl TryFrom<PgRow> for AuditLogRow {
type Error = AuditDatabaseError;

fn try_from(value: PgRow) -> std::result::Result<Self, Self::Error> {
let workspace_id = {
let inner: String = value.try_get("workspace_id")?;
WorkspacePk::from_str(&inner)?
};
let change_set_id = {
let maybe_inner: Option<String> = value.try_get("change_set_id")?;
match maybe_inner {
Some(inner) => Some(ChangeSetId::from_str(&inner)?),
None => None,
}
};
let user_id = {
let maybe_inner: Option<String> = value.try_get("user_id")?;
match maybe_inner {
Some(inner) => Some(UserPk::from_str(&inner)?),
None => None,
}
};

Ok(Self {
workspace_id,
kind: value.try_get("kind")?,
timestamp: value.try_get("timestamp")?,
title: value.try_get("title")?,
change_set_id,
user_id,
entity_name: value.try_get("entity_name")?,
entity_type: value.try_get("entity_type")?,
metadata: value.try_get("metadata")?,
})
}
}
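
For orientation, here is a minimal sketch of how a caller might exercise the new `AuditLogRow::insert`/`AuditLogRow::list` pair. The wrapper function name, entity name, page size, and the caller's use of `chrono` are illustrative assumptions, not part of this commit.

use audit_logs::database::{AuditDatabaseContext, AuditLogRow};
use si_events::{audit_log::AuditLogKind, Actor, ChangeSetId, WorkspacePk};

/// Hypothetical caller: writes one audit log row, then pages back the most
/// recent rows for the same workspace and change set.
async fn write_then_read(
    context: &AuditDatabaseContext,
    workspace_id: WorkspacePk,
    change_set_id: ChangeSetId,
    kind: AuditLogKind,
) -> Result<(), Box<dyn std::error::Error>> {
    // Insert takes the timestamp as a string and parses it into a chrono
    // `DateTime<Utc>` before writing the row.
    AuditLogRow::insert(
        context,
        workspace_id,
        kind,
        chrono::Utc::now().to_rfc3339(),
        Some(change_set_id),
        Actor::System,
        Some("example-entity".to_string()),
    )
    .await?;

    // List returns the newest rows first, plus a flag indicating whether more
    // rows exist beyond the requested page size.
    let (rows, can_load_more) =
        AuditLogRow::list(context, workspace_id, vec![change_set_id], 200).await?;
    println!("fetched {} rows (more available: {can_load_more})", rows.len());
    Ok(())
}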
1 change: 1 addition & 0 deletions lib/audit-logs/src/database/config.rs
@@ -3,6 +3,7 @@ use si_data_pg::PgPoolConfig;

/// The name of the audit database.
pub const DBNAME: &str = "si_audit";
// pub const DBNAME: &str = "si_test_audit";
const APPLICATION_NAME: &str = "si-audit";

const DEFAULT_INSERT_CONCURRENCY_LIMIT: usize = 64;
2 changes: 2 additions & 0 deletions lib/dal-test/BUCK
@@ -3,6 +3,7 @@ load("@prelude-si//:macros.bzl", "rust_library")
rust_library(
name = "dal-test",
deps = [
"//lib/audit-logs:audit-logs",
"//lib/buck2-resources:buck2-resources",
"//lib/dal:dal",
"//lib/forklift-server:forklift-server",
@@ -12,6 +13,7 @@ rust_library(
"//lib/si-crypto:si-crypto",
"//lib/si-data-nats:si-data-nats",
"//lib/si-data-pg:si-data-pg",
"//lib/si-events-rs:si-events",
"//lib/si-layer-cache:si-layer-cache",
"//lib/si-pkg:si-pkg",
"//lib/si-runtime-rs:si-runtime",
2 changes: 2 additions & 0 deletions lib/dal-test/Cargo.toml
@@ -10,6 +10,7 @@ publish.workspace = true

[dependencies]
async-recursion = { workspace = true }
audit-logs = { path = "../../lib/audit-logs" }
base64 = { workspace = true }
buck2-resources = { path = "../../lib/buck2-resources" }
color-eyre = { workspace = true }
@@ -31,6 +32,7 @@ serde_json = { workspace = true }
si-crypto = { path = "../../lib/si-crypto" }
si-data-nats = { path = "../../lib/si-data-nats" }
si-data-pg = { path = "../../lib/si-data-pg" }
si-events = { path = "../../lib/si-events-rs" }
si-layer-cache = { path = "../../lib/si-layer-cache" }
si-pkg = { path = "../../lib/si-pkg" }
si-runtime = { path = "../../lib/si-runtime-rs" }
30 changes: 30 additions & 0 deletions lib/dal-test/src/helpers.rs
@@ -2,6 +2,7 @@
use std::time::Duration;

use audit_logs::database::{AuditDatabaseContext, AuditLogRow};
use color_eyre::eyre::eyre;
use color_eyre::Result;
use dal::component::socket::{ComponentInputSocket, ComponentOutputSocket};
@@ -409,3 +410,32 @@ pub async fn confirm_jetstream_stream_has_no_messages(
"hit timeout and stream still has at least one message: {message_count}"
))
}

/// Retries listing audit logs until the expected number of rows are returned.
pub async fn list_audit_logs_until_expected_number_of_rows(
ctx: &DalContext,
context: &AuditDatabaseContext,
size: usize,
expected_number_of_rows: usize,
timeout_seconds: u64,
interval_milliseconds: u64,
) -> Result<Vec<AuditLogRow>> {
let timeout = Duration::from_secs(timeout_seconds);
let interval = Duration::from_millis(interval_milliseconds);

let start = Instant::now();
let mut actual_number_of_rows = 0;

while start.elapsed() < timeout {
let (audit_logs, _) = audit_logging::list(ctx, context, size).await?;
actual_number_of_rows = audit_logs.len();
if actual_number_of_rows == expected_number_of_rows {
return Ok(audit_logs);
}
tokio::time::sleep(interval).await;
}

Err(eyre!(
"hit timeout before audit logs query returns expected number of rows (expected: {expected_number_of_rows}, actual: {actual_number_of_rows})"
))
}
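
As a usage sketch, an integration test might assert on the audit database through this retry helper. This assumes the function is exposed as `dal_test::helpers::list_audit_logs_until_expected_number_of_rows` and that the test already holds a `DalContext` and `AuditDatabaseContext`; the expected row count and polling values are illustrative.

use audit_logs::database::AuditDatabaseContext;
use color_eyre::Result;
use dal::DalContext;
use dal_test::helpers::list_audit_logs_until_expected_number_of_rows;

/// Hypothetical test body: poll for up to 10 seconds (every 100 ms) until the
/// audit database reports exactly three rows, then inspect the newest one.
async fn assert_three_audit_rows(
    ctx: &DalContext,
    context: &AuditDatabaseContext,
) -> Result<()> {
    let rows =
        list_audit_logs_until_expected_number_of_rows(ctx, context, 200, 3, 10, 100).await?;
    assert_eq!(3, rows.len());
    // Rows come back ordered by timestamp descending, so the first is newest.
    if let Some(newest) = rows.first() {
        println!("newest audit log kind: {}", newest.kind);
    }
    Ok(())
}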
