Skip to content

Commit

Permalink
datalogger: basics of section handling & flushing
Browse files Browse the repository at this point in the history
  • Loading branch information
gbin committed May 29, 2024
1 parent 9817ed4 commit 5bc34b5
Showing 1 changed file with 158 additions and 31 deletions.
189 changes: 158 additions & 31 deletions copper_datalogger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ use bincode::Encode;
use bincode_derive::Decode as dDecode;
use bincode_derive::Encode as dEncode;

use bincode::config::Configuration;
use bincode::config::standard;
use memmap2::{MmapMut, RemapOptions};
use std::fs::{File, OpenOptions};
use std::io;
use std::path::Path;
use std::slice::from_raw_parts_mut;
use std::sync::{Arc, Mutex};

/// Magic bytes stored in the `magic` field of the main header that `DataLogger::new`
/// encodes at the very start of the mapped file.
const MAIN_MAGIC: [u8; 4] = [0xB4, 0xA5, 0x50, 0xFF];

Expand All @@ -23,7 +25,7 @@ struct MainHeader {
}

/// Tag describing what kind of entries a section carries.
///
/// It is handed to `stream` and `add_section` when a section is requested.
#[derive(dEncode, dDecode, Copy, Clone)]
enum EntryType {
pub enum EntryType {
/// Presumably a section of structured log lines (inferred from the name; confirm).
StructuredLogLine,
/// Presumably a section of serialized copper lists (inferred from the name; confirm).
CopperList,
}
Expand All @@ -35,48 +37,99 @@ struct SectionHeader {
section_size: u32, // offset of section_magic + section_size -> should be the index of the next section_magic
}

/// Writer that lays out sections inside a memory-mapped file.
pub struct DataLogger {
/// Backing file; its length is trimmed to the used size when the logger is dropped.
file: File,
/// Writable memory map of `file` that headers and sections are encoded into.
mmap_buffer: MmapMut,
/// Page size that the start of every section is aligned to.
page_size: usize,
/// Offset in the map just past the last allocated section (reported by `used`).
current_position: usize,
/// bincode configuration used when encoding headers and logged objects.
config: Configuration,
}

/// A destination that encodable objects can be logged into.
///
/// NOTE(review): `log` takes `&'a mut self`, i.e. the receiver is borrowed for the whole
/// lifetime parameter of the trait. With `Stream<'static>` (what `stream` returns) this looks
/// like it prevents calling `log` on a local value — confirm this is intended.
trait Stream<'a> {
pub trait Stream<'a> {
/// Encodes `obj` and appends it to the stream.
fn log(&'a mut self, obj: &impl Encode);
}

/// A `Stream` that writes encoded objects into a section of a `DataLogger`'s memory map.
struct MmapStream<'a> {
/// Kind of entries this stream logs; reused when the stream asks for another section.
entry_type: EntryType,
/// Logger that hands out sections and is told when this stream releases its section.
parent_logger: &'a mut DataLogger,
parent_logger: Arc<Mutex<DataLogger>>,
/// Section of the memory map this stream is currently writing into.
current_slice: &'a mut [u8],
/// Offset inside `current_slice` where the next object will be encoded.
current_position: usize,
/// Section size requested from the logger when the current section is full.
minimum_allocation_amount: usize,
}

impl<'a> MmapStream<'a> {
fn new(
entry_type: EntryType,
parent_logger: Arc<Mutex<DataLogger>>,
current_slice: &'a mut [u8],
minimum_allocation_amount: usize,
) -> Self {
Self {
entry_type,
parent_logger,
current_slice,
current_position: 0,
minimum_allocation_amount,
}
}
}

// Stream implementation that encodes objects straight into the current section of the map.
impl<'a> Stream<'a> for MmapStream<'a> {
/// Encodes `obj` at `current_position` in the current section and, on success, advances
/// the position by the number of bytes written.
///
/// When the object does not fit, a new section is requested from the parent logger and
/// becomes the current section.
///
/// # Panics
/// Panics on any encoding error other than `EncodeError::UnexpectedEnd`, and when locking
/// the parent logger fails.
fn log(&'a mut self, obj: &impl Encode) {
let result = encode_into_slice(
obj,
&mut self.current_slice[self.current_position..],
self.parent_logger.config,
bincode::config::standard(),
);
match result {
Ok(nb_bytes) => {
self.current_position += nb_bytes;
}
Err(e) => match e {
// The remaining space of the current section is too small for `obj`.
EncodeError::UnexpectedEnd => {
self.parent_logger.add_section(self.entry_type, 1024);
let mut logger_guard = self.parent_logger.lock().unwrap();
// TODO: derive a reasonable section size from timing instead of the fixed minimum.
let underlying_slice =
logger_guard.add_section(self.entry_type, self.minimum_allocation_amount);

// The slice is detached from the guard's borrow so it can be stored in `self`.
// The author's stated invariant is that this stream has exclusive access to that
// memory, which the borrow checker cannot verify — NOTE(review): confirm this still
// holds if the logger ever remaps its buffer.
self.current_slice = unsafe {
from_raw_parts_mut(underlying_slice.as_mut_ptr(), underlying_slice.len())
};
// NOTE(review): in this arm `current_position` is not reset for the new slice, `obj`
// is not encoded again, and the previous section is not passed to `unlock_section` —
// confirm whether these are still to be implemented.
}
_ => panic!("Unexpected error while encoding object: {:?}", e),
},
}
}
}

impl Drop for MmapStream<'_> {
    /// Hands the stream's current section back to the parent logger so it can be flushed.
    ///
    /// A poisoned logger mutex is recovered instead of unwrapped. A stream is frequently
    /// dropped while its thread is already unwinding (for instance after a failed assertion
    /// made while the logger guard was held); panicking again inside `drop` at that point
    /// would abort the whole process and hide the original failure.
    fn drop(&mut self) {
        let mut logger_guard = self
            .parent_logger
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        logger_guard.unlock_section(self.current_slice);
    }
}

/// Opens a new section of `entry_type` entries in `logger` and returns a stream writing into it.
///
/// `minimum_allocation_amount` is the size requested for this first section and is also
/// passed on to the stream for the sections it requests later.
///
/// # Panics
/// Panics if the logger mutex cannot be locked or if the logger fails to allocate the section.
pub fn stream(
    logger: Arc<Mutex<DataLogger>>,
    entry_type: EntryType,
    minimum_allocation_amount: usize,
) -> impl Stream<'static> {
    // Hold the lock only while the section is carved out; keep just its raw extent.
    let (section_start, section_len) = {
        let mut guard = logger.lock().unwrap();
        let section = guard.add_section(entry_type, minimum_allocation_amount);
        (section.as_mut_ptr(), section.len())
    };

    // SAFETY (author's stated invariant): the section was handed out to this stream only, so
    // it has exclusive access to that memory; the borrow checker cannot see this through the
    // mutex guard. NOTE(review): confirm the memory stays valid if the logger remaps its buffer.
    let section = unsafe { from_raw_parts_mut(section_start, section_len) };

    MmapStream::new(entry_type, logger, section, minimum_allocation_amount)
}

/// Length the backing file is set to by the fallback branch of `DataLogger::new`
/// (presumably when no preallocated size is supplied — confirm against the full function).
const DEFAULT_LOGGER_SIZE: usize = 1024 * 1024 * 1024; // 1 GiB

/// Writer that lays out sections inside a memory-mapped file and tracks which of them are
/// still being written so that completed regions can be flushed.
pub struct DataLogger {
/// Backing file; its length is trimmed to the used size when the logger is dropped.
file: File,
/// Writable memory map of `file` that headers and sections are encoded into.
mmap_buffer: MmapMut,
/// Page size that the start of every section is aligned to.
page_size: usize,
/// Offset in the map just past the last allocated section (reported by `used`).
current_global_position: usize,
/// Offsets (relative to the start of the map) of the user buffers of sections that were
/// handed out by `add_section` and not yet released through `unlock_section`.
sections_in_flight: Vec<usize>,
/// Offset up to which the map has already been submitted for flushing.
flushed_until: usize,
}

impl DataLogger {
pub fn new(file_path: &Path, preallocated_size: Option<usize>) -> std::io::Result<Self> {
let file = OpenOptions::new()
Expand All @@ -90,22 +143,22 @@ impl DataLogger {
} else {
file.set_len(DEFAULT_LOGGER_SIZE as u64)?;
}
let config = bincode::config::standard();
let mut mmap = unsafe { MmapMut::map_mut(&file)? };
let main_header = MainHeader {
magic: MAIN_MAGIC,
first_section_offset: page_size as u16,
};
let nb_bytes = encode_into_slice(&main_header, &mut mmap[..], config)
let nb_bytes = encode_into_slice(&main_header, &mut mmap[..], standard())
.expect("Failed to encode main header");
assert!(nb_bytes < page_size);

Ok(Self {
file,
mmap_buffer: mmap,
page_size,
current_position: page_size,
config,
current_global_position: page_size,
sections_in_flight: Vec::with_capacity(16),
flushed_until: 0,
})
}

Expand All @@ -124,14 +177,34 @@ impl DataLogger {
Ok(())
}

/// Asynchronously flushes the not-yet-flushed bytes up to `position` and records
/// `position` as the new flushed mark.
///
/// Does nothing when `position` is not beyond what has already been flushed.
///
/// # Panics
/// Panics if the memory map reports a flush failure.
fn flush_until(&mut self, position: usize) {
    if position <= self.flushed_until {
        // Nothing new to flush.
        return;
    }
    // `flush_async_range` takes an (offset, length) pair, not (start, end): passing the end
    // position as the length would flush too much and could reach past the mapping.
    let len = position - self.flushed_until;
    self.mmap_buffer
        .flush_async_range(self.flushed_until, len)
        .expect("Failed to flush memory map");
    self.flushed_until = position;
}

/// Marks `section` as no longer in flight and flushes the part of the map that no
/// remaining in-flight section can still write to.
///
/// `section` is identified by the offset of its first byte relative to the start of the map.
fn unlock_section(&mut self, section: &mut [u8]) {
    let map_start = self.mmap_buffer.as_mut_ptr() as usize;
    self.sections_in_flight
        .retain(|&offset| offset != section.as_mut_ptr() as usize - map_start);

    match self.sections_in_flight.first().copied() {
        // No writer left: everything allocated so far can be flushed.
        None => self.flush_until(self.current_global_position),
        // Flush up to the start of the earliest section that is still being written.
        Some(oldest) if self.flushed_until < oldest => self.flush_until(oldest),
        Some(_) => {}
    }
}

/// The returned slice is section_size or greater.
pub fn add_section(&mut self, entry_type: EntryType, section_size: usize) -> &mut [u8] {
fn add_section(&mut self, entry_type: EntryType, section_size: usize) -> &mut [u8] {
// align current_global_position up to the next page boundary
self.current_position =
(self.current_position + self.page_size - 1) & !(self.page_size - 1);
self.current_global_position =
(self.current_global_position + self.page_size - 1) & !(self.page_size - 1);

// We have the assumption here that the section header fits into a page.
self.unsure_size(self.current_position + section_size + self.page_size)
self.unsure_size(self.current_global_position + section_size + self.page_size)
.expect("Failed to resize memory map");

let section_header = SectionHeader {
Expand All @@ -142,15 +215,20 @@ impl DataLogger {

let nb_bytes = encode_into_slice(
&section_header,
&mut self.mmap_buffer[self.current_position..],
self.config,
&mut self.mmap_buffer[self.current_global_position..],
standard(),
)
.expect("Failed to encode section header");
assert!(nb_bytes < self.page_size);
self.current_position += nb_bytes;
let end_of_section = self.current_position + section_size;
let user_buffer = &mut self.mmap_buffer[self.current_position..end_of_section];
self.current_position = end_of_section;

self.current_global_position += nb_bytes;
let end_of_section = self.current_global_position + section_size;
let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];

// save the position to keep track for in flight sections
self.sections_in_flight.push(self.current_global_position);
self.current_global_position = end_of_section;

user_buffer
}

Expand All @@ -161,14 +239,14 @@ impl DataLogger {

#[inline]
fn used(&self) -> usize {
self.current_position
self.current_global_position
}
}

// Trims the backing file down to the bytes actually used once the logger goes away.
impl Drop for DataLogger {
/// Sets the length of the backing file to the logger's current position.
///
/// # Panics
/// Panics if the file cannot be resized.
fn drop(&mut self) {
self.file
.set_len(self.current_position as u64)
.set_len(self.current_global_position as u64)
.expect("Failed to trim datalogger file");
}
}
Expand All @@ -177,6 +255,13 @@ impl Drop for DataLogger {
mod tests {
use super::*;
use tempfile::TempDir;
/// Builds a shared logger backed by a `test.bin` file inside a fresh temporary directory,
/// preallocated to 100000 bytes.
///
/// NOTE(review): the `TempDir` handle is dropped when this function returns, which normally
/// removes the directory while the logger still holds the file — confirm this is intended.
fn make_a_logger() -> Arc<Mutex<DataLogger>> {
    let scratch_dir = TempDir::new().expect("could not create a tmp dir");
    let log_path = scratch_dir.path().join("test.bin");
    let logger = DataLogger::new(&log_path, Some(100000)).expect("Failed to create logger");
    Arc::new(Mutex::new(logger))
}

#[test]
fn test_truncation_and_sections_creations() {
Expand All @@ -197,8 +282,50 @@ mod tests {
let file = OpenOptions::new()
.read(true)
.open(file_path)
.expect("Could not reopne en file");
.expect("Could not reopen the file");
// Check if we have correctly truncated the file
assert_eq!(file.metadata().unwrap().len(), used as u64);
}

/// A single stream registers one in-flight section and releases it when dropped, after
/// which everything allocated so far has been flushed.
#[test]
fn test_one_section_self_cleaning() {
    let logger = make_a_logger();
    {
        // The stream lives only for this scope; leaving the scope drops it.
        let _stream = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
        assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 1);
    }
    let guard = logger.lock().unwrap();
    assert_eq!(guard.sections_in_flight.len(), 0);
    assert_eq!(guard.flushed_until, guard.current_global_position);
}

/// Two streams are opened and then released newest-first; once both are gone no section is
/// in flight and everything allocated so far has been flushed.
#[test]
fn test_two_sections_self_cleaning_in_order() {
    let logger = make_a_logger();
    let in_flight = || logger.lock().unwrap().sections_in_flight.len();

    let first = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
    assert_eq!(in_flight(), 1);
    let second = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
    assert_eq!(in_flight(), 2);

    // Release the most recently opened stream first, then the earlier one.
    drop(second);
    assert_eq!(in_flight(), 1);
    drop(first);

    let guard = logger.lock().unwrap();
    assert_eq!(guard.sections_in_flight.len(), 0);
    assert_eq!(guard.flushed_until, guard.current_global_position);
}

/// Two streams are opened and then released oldest-first; once both are gone no section is
/// in flight and everything allocated so far has been flushed.
#[test]
fn test_two_sections_self_cleaning_out_of_order() {
    let logger = make_a_logger();
    let in_flight = || logger.lock().unwrap().sections_in_flight.len();

    let first = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
    assert_eq!(in_flight(), 1);
    let second = stream(logger.clone(), EntryType::StructuredLogLine, 1024);
    assert_eq!(in_flight(), 2);

    // Release the earlier stream while the later one is still open.
    drop(first);
    assert_eq!(in_flight(), 1);
    drop(second);

    let guard = logger.lock().unwrap();
    assert_eq!(guard.sections_in_flight.len(), 0);
    assert_eq!(guard.flushed_until, guard.current_global_position);
}
}

0 comments on commit 5bc34b5

Please sign in to comment.