Skip to content

Commit

Permalink
Merge pull request #422 from knowsys/issue-417
Browse files Browse the repository at this point in the history
Fix for issue #417
  • Loading branch information
mkroetzsch authored Nov 22, 2023
2 parents 6cf2200 + 5350a87 commit 97c0513
Showing 1 changed file with 82 additions and 41 deletions.
123 changes: 82 additions & 41 deletions nemo-physical/src/dictionary/hash_map_dictionary.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
use super::{AddResult, Dictionary, DictionaryString};

use std::{
cell::UnsafeCell,
collections::HashMap,
fmt::Display,
hash::{Hash, Hasher},
sync::atomic::{AtomicBool, Ordering},
};

use std::sync::atomic::{AtomicBool, Ordering};

/// Global string buffer for dictionary data.
/// This is global here to allow keys in the hashmap to access it for computing equality and hashes,
/// without the need to re-implement the whole hashmap to inject such an object.
static mut BUFFER: StringBuffer = StringBuffer::new();
// The following code is needed if allocations are done while constructing [StringBuffer]:
//use once_cell::sync::Lazy;
//static mut BUFFER: Lazy<StringBuffer> = Lazy::new(||StringBuffer::new());
// use once_cell::sync::Lazy;
// static mut BUFFER: Lazy<StringBuffer> = Lazy::new(||StringBuffer::new());

/// Address size of pages in the string buffer
const PAGE_ADDR_BITS: usize = 25; // 32MB
Expand All @@ -23,25 +23,42 @@ const PAGE_SIZE: usize = 1 << PAGE_ADDR_BITS;
/// Bit mask that keeps only the (lower) PAGE_ADDR_BITS-1 bits, for extracting a string's length
const LENGTH_BITS_MASK: u64 = (1 << (PAGE_ADDR_BITS - 1)) - 1;

/// A buffer for string data using compact memory regions that are managed in pages.
/// New buffers need to be initialized, upn which they will receive an identifying buffer id
/// that is used whenever the data accessed.
/// A manager for buffers for string data, using compact memory regions managed in pages.
/// New buffers need to be initialized, upon which they will receive an identifying buffer id
/// that is used whenever the data is accessed. Strings are always added to buffers, possibly
/// requiring new pages to be started. At each time, there is one (latest) active page per buffer.
/// Buffers might be dropped, upon which all of its pages will be freed. There is no other way
/// of removing contents from a buffer.
///
/// Individual pages have a size of at most [`PAGE_SIZE`] glyphs, so that [`PAGE_ADDR_BITS`]
/// are needed to specify a position within a page. References to buffered strings are represented
/// by [`StringRef`], which stores a starting address and length of the string. The `usize` starting
/// address is global (uniform all buffers), with the lower [`PAGE_ADDR_BITS`] bits encoding a position within a page,
/// and the remaining higher bits encoding the number of the page (whatever buffer it belongs to).
/// Since this address must fit into usize, 32bit platforms can only support 2 to the power of
/// (32-[`PAGE_ADDR_BITS`]) pages. Moreover, since [`StringRef`] combines the address and the string length
/// into a single `u64` (on all platforms), even 64bit platforms cannot use all 64bits for string addresses.
/// The number of bits reserved for length is [`STRINGREF_STRING_LENGTH_BITS`], which should always be less
/// than [`PAGE_ADDR_BITS`] since longer strings would not fit any buffer page anyway.
///
/// The implementaion is not fully thread-safe, but it is thread-safe as long as each buffer
/// is used in only one thread. That is, parallel threads can safely create buffers (which will
/// have different ids), as long as all their operations use the buffer id that they were given.
/// The implementaion can be used in multiple parallel threads.
///
/// Note: The multi-thrading support is based on aggressive locking of all major operations. It might be
/// possible to reduce the amount of locking by designing more careful data structures. For example, locking
/// could be limited to the rare page-writing operations if Vectors would not move existing entries on (some)
/// writes, which causes races that may lead to reading errors unless all reads are also locked.
struct StringBuffer {
/// Vector of buffer ids and string buffers
/// Vector of all string buffer pages with the id of the buffer they belong to.
pages: Vec<(usize, String)>,
/// Single temporary string per buffer. [StringRef] uses this for representing strings that are not in the buffer.
///
/// TODO: It would be possible and more elegant to have a special alternative key implementation for our hashmap,
/// where the key has the relevant data instead of pointing to a temporary buffer.
tmp_strings: Vec<String>,
/// Currently active page for each buffer
/// Currently active page for each buffer. This is always the last page that was allocated for the buffer.
cur_pages: Vec<usize>,
/// Lock to guard page assignment operations when using multiple threads
lock: AtomicBool,
lock: UnsafeCell<AtomicBool>,
}

impl StringBuffer {
Expand All @@ -51,7 +68,7 @@ impl StringBuffer {
pages: Vec::new(),
tmp_strings: Vec::new(),
cur_pages: Vec::new(),
lock: AtomicBool::new(false),
lock: UnsafeCell::new(AtomicBool::new(false)),
}
}

Expand Down Expand Up @@ -86,66 +103,90 @@ impl StringBuffer {
fn push_str(&mut self, buffer: usize, s: &str) -> StringRef {
let len = s.len();
assert!(len < PAGE_SIZE);

self.acquire_page_lock();
let mut page_num = self.cur_pages[buffer];
if self.pages[page_num].1.len() + len > PAGE_SIZE {
self.acquire_page_lock();
self.pages.push((buffer, String::with_capacity(PAGE_SIZE)));
page_num = self.pages.len() - 1;
self.cur_pages[buffer] = page_num;
self.release_page_lock();
}
let page_addr = self.pages[page_num].1.len();
let page_inner_addr = self.pages[page_num].1.len();
self.pages[page_num].1.push_str(s);
self.release_page_lock();

StringRef::new(page_num * PAGE_SIZE + page_addr, s.len())
StringRef::new(page_num * PAGE_SIZE + page_inner_addr, len)
}

/// Returns a direct string slice reference for this data.
/// This is a pointer to global mutable data, and cannot be used safely.
fn get_str(&self, address: usize, length: usize) -> &str {
let page_num = address >> PAGE_ADDR_BITS;
let page_addr = address % PAGE_SIZE;
let page_inner_addr = address % PAGE_SIZE;

unsafe {
self.pages[page_num]
.1
.get_unchecked(page_addr..page_addr + length)
self.get_page(page_num)
.get_unchecked(page_inner_addr..page_inner_addr + length)
}
}

/// Creates a reference to the given string without adding the string to the buffer.
fn get_tmp_string_ref(&mut self, buffer: usize, s: &str) -> StringRef {
self.acquire_page_lock();
self.tmp_strings[buffer].clear();
self.tmp_strings[buffer].push_str(s);
self.release_page_lock();
StringRef::new_tmp(buffer)
}

/// Returns the current contents of the temporary string.
fn get_tmp_string(&self, buffer: usize) -> &str {
self.tmp_strings[buffer].as_str()
self.acquire_page_lock();
let result = self.tmp_strings[buffer].as_str();
self.release_page_lock();
result
}

/// Acquire the lock that we use for operations that add new pages or change
/// the assignment of pages to buffers in any way.
fn acquire_page_lock(&mut self) {
while self
.lock
/// Acquire the lock that we use for operations that read or write any of the internal data
/// structures that multiple buffers might use.
fn acquire_page_lock(&self) {
let lock = unsafe { &mut *self.lock.get() };
while lock
.compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Acquire)
.is_err()
{}
}

/// Release the lock.
fn release_page_lock(&mut self) {
self.lock.store(false, Ordering::Release);
fn release_page_lock(&self) {
let lock = unsafe { &mut *self.lock.get() };
lock.store(false, Ordering::Release);
}

fn get_page(&self, page_num: usize) -> &String {
self.acquire_page_lock();
let result = &self.pages[page_num].1;
self.release_page_lock();
result
}
}

const STRINGREF_STRING_LEGHT_BITS: u64 = 24;
const STRINGREF_STARTING_ADDRESS_BITS: u64 = 40;
const MAX_STRINGREF_STRING_LEGHT: u64 = 1 << STRINGREF_STRING_LEGHT_BITS;
const MAX_STRINGREF_STARTING_ADDRESS: u64 = 1 << STRINGREF_STARTING_ADDRESS_BITS;
/// Number of bits reserved for encoding the length of referenced strings.
/// This should be at most [`PAGE_ADDR_BITS`], the maximal length in any page
/// in the [`StringBuffer`], but it could conceivably also be less.
const STRINGREF_STRING_LENGTH_BITS: u64 = 24;
/// Number of bits reserved for the starting address of a string.
const STRINGREF_STARTING_ADDRESS_BITS: u64 = 64 - STRINGREF_STRING_LENGTH_BITS;
/// Largest number that can specify the length of a string in a [`StringRef`].
const MAX_STRINGREF_STRING_LENGTH: u64 = (1 << STRINGREF_STRING_LENGTH_BITS) - 1;
/// Largest number that can specify the starting address of a string in a [`StringRef`].
const MAX_STRINGREF_STARTING_ADDRESS: u64 = (1 << STRINGREF_STARTING_ADDRESS_BITS) - 1;

/// Memory-optimized reference to a string in the dictionary.
///
/// Internally, a single u64 number is used to combine the starting address of a
/// string in the [`StringBuffer`] and its length.
/// See [`StringBuffer`] for a discussion of the resulting constraints.
#[derive(Clone, Copy, Debug, Default)]
struct StringRef {
/// The 64bits reference consists of 40bits that encode a starting address within
Expand All @@ -159,29 +200,29 @@ impl StringRef {
/// Creates an object that refers to the current contents of the
/// buffer's temporary String.
fn new_tmp(buffer: usize) -> Self {
assert!(u64::try_from(buffer).unwrap() < MAX_STRINGREF_STRING_LEGHT);
assert!(u64::try_from(buffer).unwrap() <= MAX_STRINGREF_STRING_LENGTH);
let u64buffer: u64 = buffer.try_into().unwrap();
StringRef {
reference: (u64::MAX << STRINGREF_STRING_LEGHT_BITS) + u64buffer,
reference: (u64::MAX << STRINGREF_STRING_LENGTH_BITS) + u64buffer,
}
}

/// Creates a reference to the specific string slice in the buffer.
/// It is not checked if that slice is allocated.
fn new(address: usize, len: usize) -> Self {
assert!(u64::try_from(len).unwrap() < MAX_STRINGREF_STRING_LEGHT);
assert!(u64::try_from(address).unwrap() < MAX_STRINGREF_STARTING_ADDRESS);
assert!(u64::try_from(len).unwrap() <= MAX_STRINGREF_STRING_LENGTH);
assert!(u64::try_from(address).unwrap() <= MAX_STRINGREF_STARTING_ADDRESS);
let u64add: u64 = address.try_into().unwrap();
let u64len: u64 = len.try_into().unwrap();
StringRef {
reference: (u64add << STRINGREF_STRING_LEGHT_BITS) + u64len,
reference: (u64add << STRINGREF_STRING_LENGTH_BITS) + u64len,
}
}

/// Returns the stored start address for the string that this refers to.
/// For temporary references that do not point to the buffer, the result is meaningless.
fn address(&self) -> usize {
(self.reference >> STRINGREF_STRING_LEGHT_BITS)
(self.reference >> STRINGREF_STRING_LENGTH_BITS)
.try_into()
.unwrap()
}
Expand All @@ -195,7 +236,7 @@ impl StringRef {
/// Returns a direct string slice reference for this data.
/// This is a pointer to global mutable data, and cannot be used safely.
fn as_str(&self) -> &str {
if ((!self.reference) >> STRINGREF_STRING_LEGHT_BITS) != 0 {
if ((!self.reference) >> STRINGREF_STRING_LENGTH_BITS) != 0 {
unsafe { BUFFER.get_str(self.address(), self.len()) }
} else {
unsafe { BUFFER.get_tmp_string(self.len()) }
Expand Down

0 comments on commit 97c0513

Please sign in to comment.