Skip to content

Commit

Permalink
fs_store and InMemory/repo.rs had different visions of what compact should do. I believe this rectifies them.
Browse files Browse the repository at this point in the history
  • Loading branch information
issackelly committed Sep 16, 2023
1 parent 6c221cd commit c9f2c90
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 36 deletions.
39 changes: 8 additions & 31 deletions src/fs_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,36 +151,30 @@ impl FsStore {
Ok(())
}

pub fn compact(&self, id: &DocumentId, full_doc: &[u8]) -> Result<(), Error> {
pub fn compact(
&self,
id: &DocumentId,
full_doc: &[u8],
new_heads: Vec<ChangeHash>,
) -> Result<(), Error> {
let paths = DocIdPaths::from(id);

// Load all the data we have into a doc
match Chunks::load(&self.root, id) {
Ok(Some(chunks)) => {
let doc = chunks
.to_doc()
.map_err(|e| Error(ErrorKind::LoadDocToCompact(e)))?;

// Write the snapshot
let output_chunk_name = SavedChunkName::new_snapshot(doc.get_heads());
let chunk = doc.save();
write_chunk(&self.root, &paths, &chunk, output_chunk_name.clone())?;
let output_chunk_name = SavedChunkName::new_snapshot(new_heads);
write_chunk(&self.root, &paths, full_doc, output_chunk_name.clone())?;

// Remove all the old data
for incremental in chunks.incrementals.keys() {
let path = paths.chunk_path(&self.root, incremental);
std::fs::remove_file(&path)
.map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
}
let just_wrote = paths.chunk_path(&self.root, &output_chunk_name);
for snapshot in chunks.snapshots.keys() {
let path = paths.chunk_path(&self.root, snapshot);

if path == just_wrote {
tracing::trace!("Somehow trying to delete the same path we just wrote to. Not today Satan");
continue;
}

std::fs::remove_file(&path)
.map_err(|e| Error(ErrorKind::DeleteChunk(path, e)))?;
}
Expand Down Expand Up @@ -441,21 +435,6 @@ impl Chunks {
incrementals,
}))
}

fn to_doc(&self) -> Result<automerge::Automerge, automerge::AutomergeError> {
let mut bytes = Vec::new();
for chunk in self.snapshots.values() {
bytes.extend(chunk);
}
for chunk in self.incrementals.values() {
bytes.extend(chunk);
}

automerge::Automerge::load_with_options(
&bytes,
automerge::LoadOptions::new().on_partial_load(automerge::OnPartialLoad::Ignore),
)
}
}

mod error {
Expand Down Expand Up @@ -499,8 +478,6 @@ mod error {
ErrReadingChunkFile(PathBuf, std::io::Error),
#[error("error creating level 2 path {0}: {1}")]
CreateLevel2Path(PathBuf, std::io::Error),
#[error("error loading doc to compact: {0}")]
LoadDocToCompact(automerge::AutomergeError),
#[error("error creating temp file: {0}")]
CreateTempFile(std::io::Error),
#[error("error writing temp file {0}: {1}")]
Expand Down
2 changes: 2 additions & 0 deletions src/interfaces.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use automerge::ChangeHash;
use futures::future::BoxFuture;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
Expand Down Expand Up @@ -113,5 +114,6 @@ pub trait Storage: Send {
&self,
_id: DocumentId,
_full_doc: Vec<u8>,
_new_heads: Vec<ChangeHash>,
) -> BoxFuture<'static, Result<(), StorageError>>;
}
5 changes: 4 additions & 1 deletion src/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,10 @@ impl DocumentInfo {
(doc.automerge.save(), doc.automerge.get_heads())
};
self.patches_since_last_compact = 0;
(storage.compact(document_id.clone(), to_save), new_heads)
(
storage.compact(document_id.clone(), to_save, new_heads.clone()),
new_heads,
)
} else {
let (to_save, new_heads) = {
let doc = self.document.read();
Expand Down
8 changes: 7 additions & 1 deletion src/tokio/fs_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,17 @@ impl Storage for FsStorage {
&self,
id: crate::DocumentId,
full_doc: Vec<u8>,
new_heads: Vec<automerge::ChangeHash>,
) -> BoxFuture<'static, Result<(), StorageError>> {
let inner = Arc::clone(&self.inner);
let inner_id = id.clone();
self.handle
.spawn_blocking(move || inner.lock().unwrap().compact(&inner_id, &full_doc))
.spawn_blocking(move || {
inner
.lock()
.unwrap()
.compact(&inner_id, &full_doc, &new_heads)
})
.map(handle_joinerror)
.map_err(move |e| {
tracing::error!(err=?e, doc=?id, "error compacting chunk to filesystem");
Expand Down
4 changes: 4 additions & 0 deletions test_utils/src/storage_utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use automerge::ChangeHash;
use automerge_repo::{DocumentId, Storage, StorageError};
use futures::future::{BoxFuture, TryFutureExt};
use futures::FutureExt;
Expand Down Expand Up @@ -30,6 +31,7 @@ impl Storage for SimpleStorage {
&self,
_id: DocumentId,
_chunk: Vec<u8>,
_new_heads: Vec<ChangeHash>,
) -> BoxFuture<'static, Result<(), StorageError>> {
futures::future::ready(Ok(())).boxed()
}
Expand Down Expand Up @@ -76,6 +78,7 @@ impl Storage for InMemoryStorage {
&self,
id: DocumentId,
full_doc: Vec<u8>,
_new_heads: Vec<ChangeHash>,
) -> BoxFuture<'static, Result<(), StorageError>> {
let mut documents = self.documents.lock();
documents.insert(id, full_doc);
Expand Down Expand Up @@ -202,6 +205,7 @@ impl Storage for AsyncInMemoryStorage {
&self,
id: DocumentId,
full_doc: Vec<u8>,
_new_heads: Vec<ChangeHash>,
) -> BoxFuture<'static, Result<(), StorageError>> {
let (tx, rx) = oneshot();
self.chan
Expand Down
1 change: 0 additions & 1 deletion tests/document_changed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ fn test_document_changed_over_sync() {

// Spawn a task that awaits the document change.
let (done_sync_sender, mut done_sync_receiver) = channel(1);
let repo_id = repo_handle_1.get_repo_id().clone();
rt.spawn(async move {
loop {
// Await changes until the edit comes through over sync.
Expand Down
5 changes: 3 additions & 2 deletions tests/fs_storage/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use automerge::transaction::Transactable;
use automerge_repo::fs_store;
use itertools::Itertools;
use uuid::Uuid;

/// Asserts that the &[u8] in `data` is some permutation of the chunks of Vec<&[u8]> in `expected`
macro_rules! assert_permutation_of {
Expand Down Expand Up @@ -45,7 +44,9 @@ fn fs_store_crud() {
assert_permutation_of!(result, vec![change1.bytes(), change2.bytes()]);

// now compact
store.compact(&doc_id, &[]).unwrap();
store
.compact(&doc_id, &doc.save(), doc.get_heads())
.unwrap();
let result = store.get(&doc_id).unwrap().unwrap();
let expected = doc.save();
assert_eq!(result, expected);
Expand Down

0 comments on commit c9f2c90

Please sign in to comment.