Skip to content

Commit

Permalink
Refactor request log
Browse files Browse the repository at this point in the history
  • Loading branch information
bubelov committed Nov 5, 2024
1 parent 4952ed2 commit 7451da7
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 92 deletions.
81 changes: 81 additions & 0 deletions src/log/middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use super::summary;
use crate::{data_dir_file, Result};
use actix_web::{
body::MessageBody,
dev::{ServiceRequest, ServiceResponse},
middleware::Next,
Error, HttpMessage, HttpRequest,
};
use rusqlite::Connection;
use std::time::Instant;
use time::OffsetDateTime;

thread_local! {
static CONN: Connection = open_conn().unwrap_or_else(|e| {
eprintln!("Failed to open logger connection: {e}");
std::process::exit(1)
});
}

fn open_conn() -> Result<Connection> {
let conn = Connection::open(data_dir_file("log.db")?)?;
conn.pragma_update(None, "journal_mode", "WAL")?;
conn.pragma_update(None, "synchronous", "NORMAL")?;
summary::init(&conn)?;
Ok(conn)
}

pub struct RequestExtension {
pub endpoint: String,
pub entities: i64,
}

impl RequestExtension {
pub fn new(endpoint: &str, entities: i64) -> Self {
RequestExtension {
endpoint: endpoint.into(),
entities,
}
}
}

pub async fn handle_request(
req: ServiceRequest,
next: Next<impl MessageBody>,
) -> Result<ServiceResponse<impl MessageBody>, Error> {
let started_at = Instant::now();
let res = next.call(req).await;
let Ok(res) = res else { return res };
let extensions = res.request().extensions();
let Some(extension) = extensions.get::<RequestExtension>() else {
drop(extensions);
return Ok(res);
};
let endpoint = extension.endpoint.clone();
let entities = extension.entities;
drop(extensions);
let time_ns = Instant::now().duration_since(started_at).as_nanos();
log_summary(res.request(), &endpoint, entities, time_ns as i64)?;
Ok(res)
}

fn log_summary(req: &HttpRequest, endpoint_id: &str, entities: i64, time_ns: i64) -> Result<()> {
let conn_info = req.connection_info();
let Some(addr) = conn_info.realip_remote_addr() else {
return Ok(());
};
let today = OffsetDateTime::now_utc().date().to_string();
CONN.with(|conn| {
match summary::select(&today, &addr, endpoint_id, &conn)? {
Some(entry) => summary::update(
entry.id,
entry.reqests + 1,
entry.entities + entities,
entry.time_ns + time_ns,
&conn,
),
None => summary::insert(&today, &addr, endpoint_id, 1, entities, time_ns, &conn),
}?;
Ok(())
})
}
4 changes: 4 additions & 0 deletions src/log/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mod middleware;
pub use middleware::handle_request as middleware;
pub use middleware::RequestExtension;
mod summary;
104 changes: 13 additions & 91 deletions src/log.rs → src/log/summary.rs
Original file line number Diff line number Diff line change
@@ -1,81 +1,11 @@
use crate::{data_dir_file, Result};
use actix_web::{
body::MessageBody,
dev::{ServiceRequest, ServiceResponse},
middleware::Next,
Error, HttpMessage, HttpRequest,
};
use crate::Result;
use rusqlite::{named_params, Connection, OptionalExtension, Row};
use std::time::Instant;
use time::OffsetDateTime;

thread_local! {
static CONN: Connection = open_conn().unwrap_or_else(|e| {
eprintln!("Failed to open logger connection: {e}");
std::process::exit(1)
});
}

struct LogEntry {
id: i64,
reqests: i64,
entities: i64,
time_ns: i64,
}

pub struct RequestExtension {
pub endpoint: String,
pub struct Summary {
pub id: i64,
pub reqests: i64,
pub entities: i64,
}

impl RequestExtension {
pub fn new(endpoint: &str, entities: i64) -> Self {
RequestExtension {
endpoint: endpoint.into(),
entities,
}
}
}

pub async fn log(
req: ServiceRequest,
next: Next<impl MessageBody>,
) -> Result<ServiceResponse<impl MessageBody>, Error> {
let started_at = Instant::now();
let res = next.call(req).await;
let Ok(res) = res else { return res };
let extensions = res.request().extensions();
let Some(extension) = extensions.get::<RequestExtension>() else {
drop(extensions);
return Ok(res);
};
let endpoint = extension.endpoint.clone();
let entities = extension.entities;
drop(extensions);
let time_ns = Instant::now().duration_since(started_at).as_nanos();
_log(res.request(), &endpoint, entities, time_ns as i64)?;
Ok(res)
}

fn _log(req: &HttpRequest, endpoint_id: &str, entities: i64, time_ns: i64) -> Result<()> {
let conn_info = req.connection_info();
let Some(addr) = conn_info.realip_remote_addr() else {
return Ok(());
};
let today = OffsetDateTime::now_utc().date().to_string();
CONN.with(|conn| {
match select_entry(&today, &addr, endpoint_id, &conn)? {
Some(entry) => update_entry(
entry.id,
entry.reqests + 1,
entry.entities + entities,
entry.time_ns + time_ns,
&conn,
),
None => insert_entry(&today, &addr, endpoint_id, 1, entities, time_ns, &conn),
}?;
Ok(())
})
pub time_ns: i64,
}

const TABLE_NAME: &str = "summary";
Expand All @@ -87,10 +17,7 @@ const COL_REQUESTS: &str = "requests";
const COL_ENTITIES: &str = "entities";
const COL_TIME_NS: &str = "time_ns";

pub fn open_conn() -> Result<Connection> {
let conn = Connection::open(data_dir_file("log.db")?)?;
conn.pragma_update(None, "journal_mode", "WAL")?;
conn.pragma_update(None, "synchronous", "NORMAL")?;
pub fn init(conn: &Connection) -> Result<()> {
let query = format!(
r#"
CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
Expand All @@ -106,10 +33,10 @@ pub fn open_conn() -> Result<Connection> {
"#
);
conn.execute(&query, [])?;
Ok(conn)
Ok(())
}

fn insert_entry(
pub fn insert(
date: &str,
ip: &str,
endpoint: &str,
Expand Down Expand Up @@ -151,12 +78,7 @@ fn insert_entry(
Ok(())
}

fn select_entry(
date: &str,
ip: &str,
endpoint: &str,
conn: &Connection,
) -> Result<Option<LogEntry>> {
pub fn select(date: &str, ip: &str, endpoint: &str, conn: &Connection) -> Result<Option<Summary>> {
let mut stmt = conn.prepare(&format!(
r#"
SELECT {COL_ID}, {COL_REQUESTS}, {COL_ENTITIES}, {COL_TIME_NS}
Expand All @@ -177,7 +99,7 @@ fn select_entry(
Ok(res)
}

fn update_entry(
pub fn update(
id: i64,
requests: i64,
entities: i64,
Expand All @@ -203,9 +125,9 @@ fn update_entry(
Ok(())
}

const fn mapper() -> fn(&Row) -> rusqlite::Result<LogEntry> {
|row: &Row| -> rusqlite::Result<LogEntry> {
Ok(LogEntry {
const fn mapper() -> fn(&Row) -> rusqlite::Result<Summary> {
|row: &Row| -> rusqlite::Result<Summary> {
Ok(Summary {
id: row.get(0)?,
reqests: row.get(1)?,
entities: row.get(2)?,
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn main() -> Result<()> {

HttpServer::new(move || {
App::new()
.wrap(from_fn(log::log))
.wrap(from_fn(log::middleware))
.wrap(NormalizePath::trim())
.wrap(Compress::default())
.app_data(Data::new(pool.clone()))
Expand Down

0 comments on commit 7451da7

Please sign in to comment.