Skip to content

Commit

Permalink
Refactor request log
Browse files Browse the repository at this point in the history
  • Loading branch information
bubelov committed Nov 4, 2024
1 parent 17ec46f commit 4952ed2
Showing 1 changed file with 90 additions and 84 deletions.
174 changes: 90 additions & 84 deletions src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ use rusqlite::{named_params, Connection, OptionalExtension, Row};
use std::time::Instant;
use time::OffsetDateTime;

#[allow(dead_code)]
struct UsageLog {
thread_local! {
static CONN: Connection = open_conn().unwrap_or_else(|e| {
eprintln!("Failed to open logger connection: {e}");
std::process::exit(1)
});
}

struct LogEntry {
id: i64,
date: String,
ip: String,
endpoint: String,
reqests: i64,
entities: i64,
time_ms: i64,
time_ns: i64,
}

pub struct RequestExtension {
Expand Down Expand Up @@ -49,115 +52,118 @@ pub async fn log(
let endpoint = extension.endpoint.clone();
let entities = extension.entities;
drop(extensions);
let time_ms = Instant::now().duration_since(started_at).as_millis() as i64;
log_sync_api_request(res.request(), &endpoint, entities, time_ms)?;
let time_ns = Instant::now().duration_since(started_at).as_nanos();
_log(res.request(), &endpoint, entities, time_ns as i64)?;
Ok(res)
}

thread_local! {
static CONN: Connection = open_conn().unwrap();
}

fn log_sync_api_request(
req: &HttpRequest,
endpoint_id: &str,
entities: i64,
time_ms: i64,
) -> Result<()> {
fn _log(req: &HttpRequest, endpoint_id: &str, entities: i64, time_ns: i64) -> Result<()> {
let conn_info = req.connection_info();
let Some(addr) = conn_info.realip_remote_addr() else {
return Ok(());
};
let today = OffsetDateTime::now_utc().date().to_string();
CONN.with(|conn| {
match select_usage_log(&today, &addr, endpoint_id, &conn)? {
Some(log) => update_usage_log(
log.id,
log.reqests + 1,
log.entities + entities,
log.time_ms + time_ms,
match select_entry(&today, &addr, endpoint_id, &conn)? {
Some(entry) => update_entry(
entry.id,
entry.reqests + 1,
entry.entities + entities,
entry.time_ns + time_ns,
&conn,
),
None => insert_usage_log(&today, &addr, endpoint_id, 1, entities, time_ms, &conn),
None => insert_entry(&today, &addr, endpoint_id, 1, entities, time_ns, &conn),
}?;
Ok(())
})
}

const TABLE_NAME: &str = "summary";
const COL_ID: &str = "id";
const COL_DATE: &str = "date";
const COL_IP: &str = "ip";
const COL_ENDPOINT: &str = "endpoint";
const COL_REQUESTS: &str = "requests";
const COL_ENTITIES: &str = "entities";
const COL_TIME_NS: &str = "time_ns";

pub fn open_conn() -> Result<Connection> {
let conn = Connection::open(data_dir_file("firewall-v1.db")?)?;
let conn = Connection::open(data_dir_file("log.db")?)?;
conn.pragma_update(None, "journal_mode", "WAL")?;
conn.pragma_update(None, "synchronous", "NORMAL")?;
conn.execute(
let query = format!(
r#"
CREATE TABLE IF NOT EXISTS usage_log (
id INTEGER PRIMARY KEY NOT NULL,
date TEXT NOT NULL,
ip TEXT NOT NULL,
endpoint TEXT NOT NULL,
requests INTEGER NOT NULL,
entities INTEGER NOT NULL,
time_ms INTEGER NOT NULL
CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
{COL_ID} INTEGER PRIMARY KEY NOT NULL,
{COL_DATE} TEXT NOT NULL,
{COL_IP} TEXT NOT NULL,
{COL_ENDPOINT} TEXT NOT NULL,
{COL_REQUESTS} INTEGER NOT NULL,
{COL_ENTITIES} INTEGER NOT NULL,
{COL_TIME_NS} INTEGER NOT NULL
) STRICT;
CREATE INDEX IF NOT EXISTS usage_log_date_ip_endpoint ON usage_log(date, ip, endpoint);
"#,
[],
)?;
CREATE UNIQUE INDEX IF NOT EXISTS {COL_DATE}_{COL_IP}_{COL_ENDPOINT} ON {TABLE_NAME}({COL_DATE}, {COL_IP}, {COL_ENDPOINT});
"#
);
conn.execute(&query, [])?;
Ok(conn)
}

fn insert_usage_log(
fn insert_entry(
date: &str,
ip: &str,
endpoint: &str,
requests: i64,
entities: i64,
time_ms: i64,
time_ns: i64,
conn: &Connection,
) -> Result<()> {
conn.execute(
let query = format!(
r#"
INSERT INTO usage_log (
date,
ip,
endpoint,
requests,
entities,
time_ms
) VALUES (
:date,
:ip,
:endpoint,
:requests,
:entities,
:time_ms
);
"#,
INSERT INTO {TABLE_NAME} (
{COL_DATE},
{COL_IP},
{COL_ENDPOINT},
{COL_REQUESTS},
{COL_ENTITIES},
{COL_TIME_NS}
) VALUES (
:{COL_DATE},
:{COL_IP},
:{COL_ENDPOINT},
:{COL_REQUESTS},
:{COL_ENTITIES},
:{COL_TIME_NS}
);
"#
);
conn.execute(
&query,
named_params! {
":date": date,
":ip": ip,
":endpoint": endpoint,
":requests": requests,
":entities": entities,
":time_ms": time_ms,
":time_ns": time_ns,
},
)?;
Ok(())
}

fn select_usage_log(
fn select_entry(
date: &str,
ip: &str,
endpoint: &str,
conn: &Connection,
) -> Result<Option<UsageLog>> {
let mut stmt = conn.prepare(
) -> Result<Option<LogEntry>> {
let mut stmt = conn.prepare(&format!(
r#"
SELECT id, date, ip, endpoint, requests, entities, time_ms
FROM usage_log
WHERE date = :date AND ip = :ip and endpoint = :endpoint
"#,
)?;
SELECT {COL_ID}, {COL_REQUESTS}, {COL_ENTITIES}, {COL_TIME_NS}
FROM {TABLE_NAME}
WHERE {COL_DATE} = :{COL_DATE} AND {COL_IP} = :{COL_IP} AND {COL_ENDPOINT} = :{COL_ENDPOINT}
"#
))?;
let res = stmt
.query_row(
named_params! {
Expand All @@ -171,39 +177,39 @@ fn select_usage_log(
Ok(res)
}

fn update_usage_log(
fn update_entry(
id: i64,
requests: i64,
entities: i64,
time_ms: i64,
time_ns: i64,
conn: &Connection,
) -> Result<()> {
conn.execute(
let query = format!(
r#"
UPDATE usage_log
SET requests = :requests, entities = :entities, time_ms = :time_ms
WHERE id = :id;
"#,
UPDATE {TABLE_NAME}
SET {COL_REQUESTS} = :{COL_REQUESTS}, {COL_ENTITIES} = :{COL_ENTITIES}, {COL_TIME_NS} = :{COL_TIME_NS}
WHERE {COL_ID} = :{COL_ID};
"#
);
conn.execute(
&query,
named_params! {
":id": id,
":requests": requests,
":entities": entities,
":time_ms": time_ms,
":time_ns": time_ns,
},
)?;
Ok(())
}

const fn mapper() -> fn(&Row) -> rusqlite::Result<UsageLog> {
|row: &Row| -> rusqlite::Result<UsageLog> {
Ok(UsageLog {
const fn mapper() -> fn(&Row) -> rusqlite::Result<LogEntry> {
|row: &Row| -> rusqlite::Result<LogEntry> {
Ok(LogEntry {
id: row.get(0)?,
date: row.get(1)?,
ip: row.get(2)?,
endpoint: row.get(3)?,
reqests: row.get(4)?,
entities: row.get(5)?,
time_ms: row.get(6)?,
reqests: row.get(1)?,
entities: row.get(2)?,
time_ns: row.get(3)?,
})
}
}

0 comments on commit 4952ed2

Please sign in to comment.