diff --git a/firewood/src/db.rs b/firewood/src/db.rs index 5677b885c..5f8cd0b0f 100644 --- a/firewood/src/db.rs +++ b/firewood/src/db.rs @@ -45,7 +45,7 @@ use std::{ }; use tokio::task::block_in_place; -mod proposal; +pub mod proposal; use self::proposal::ProposalBase; diff --git a/firewood/src/db/proposal.rs b/firewood/src/db/proposal.rs index 009d0ee39..80dafa608 100644 --- a/firewood/src/db/proposal.rs +++ b/firewood/src/db/proposal.rs @@ -43,7 +43,7 @@ pub enum ProposalBase { } #[async_trait] -impl crate::v2::api::Proposal for Proposal { +impl api::Proposal for Proposal { type Proposal = Proposal; #[allow(clippy::unwrap_used)] diff --git a/firewood/src/merkle/stream.rs b/firewood/src/merkle/stream.rs index 69a12230b..0bbb9795b 100644 --- a/firewood/src/merkle/stream.rs +++ b/firewood/src/merkle/stream.rs @@ -29,6 +29,23 @@ enum IterationNode<'a> { }, } +impl<'a> std::fmt::Debug for IterationNode<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Unvisited { key, node } => f + .debug_struct("Unvisited") + .field("key", key) + .field("node", node) + .finish(), + Self::Visited { + key, + children_iter: _, + } => f.debug_struct("Visited").field("key", key).finish(), + } + } +} + +#[derive(Debug)] enum NodeStreamState<'a> { /// The iterator state is lazily initialized when poll_next is called /// for the first time. The iteration start key is stored here. @@ -49,6 +66,7 @@ impl NodeStreamState<'_> { } } +#[derive(Debug)] pub struct MerkleNodeStream<'a, S, T> { state: NodeStreamState<'a>, merkle_root: DiskAddress, @@ -271,6 +289,7 @@ fn get_iterator_intial_state<'a, S: ShaleStore + Send + Sync, T>( } } +#[derive(Debug)] enum MerkleKeyValueStreamState<'a, S, T> { /// The iterator state is lazily initialized when poll_next is called /// for the first time. The iteration start key is stored here. @@ -295,6 +314,7 @@ impl<'a, S, T> MerkleKeyValueStreamState<'a, S, T> { } } +#[derive(Debug)] pub struct MerkleKeyValueStream<'a, S, T> { state: MerkleKeyValueStreamState<'a, S, T>, merkle_root: DiskAddress, diff --git a/grpc-testtool/Cargo.toml b/grpc-testtool/Cargo.toml index f7a22fe29..af9ba1e5d 100644 --- a/grpc-testtool/Cargo.toml +++ b/grpc-testtool/Cargo.toml @@ -29,6 +29,7 @@ env_logger = "0.11.2" chrono = "0.4.34" serde_json = "1.0.113" serde = { version = "1.0.196", features = ["derive"] } +futures = "*" [build-dependencies] tonic-build = "0.11.0" diff --git a/grpc-testtool/build.rs b/grpc-testtool/build.rs index f546f152e..b313c974c 100644 --- a/grpc-testtool/build.rs +++ b/grpc-testtool/build.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; fn main() -> Result<(), Box> { // we want to import these proto files - let import_protos = ["sync", "rpcdb", "process-server"]; + let import_protos = ["sync", "rpcdb", "process-server", "merkle"]; let protos: Box<[PathBuf]> = import_protos .into_iter() diff --git a/grpc-testtool/proto/merkle/merkle.proto b/grpc-testtool/proto/merkle/merkle.proto index 94261d3ea..a894d5376 100644 --- a/grpc-testtool/proto/merkle/merkle.proto +++ b/grpc-testtool/proto/merkle/merkle.proto @@ -7,16 +7,22 @@ import "google/protobuf/empty.proto"; // Methods on this service return status code NOT_FOUND if a requested // view, iterator or root hash is not found. service Merkle { + // --- Proposals --- rpc NewProposal(NewProposalRequest) returns (NewProposalResponse); rpc ProposalCommit(ProposalCommitRequest) returns (google.protobuf.Empty); + // --- Views --- rpc NewView(NewViewRequest) returns (NewViewResponse); + + // --- Reads --- // The methods below may be called with an ID that corresponds to either a (committable) proposal // or (non-committable) historical view. rpc ViewHas(ViewHasRequest) returns (ViewHasResponse); rpc ViewGet(ViewGetRequest) returns (ViewGetResponse); + + // --- Iterators --- rpc ViewNewIteratorWithStartAndPrefix(ViewNewIteratorWithStartAndPrefixRequest) returns (ViewNewIteratorWithStartAndPrefixResponse); - // Returns status code NOT_FOUND when the iterator is done. + // Returns status code OUT_OF_RANGE when the iterator is done rpc IteratorNext(IteratorNextRequest) returns (IteratorNextResponse); rpc IteratorError(IteratorErrorRequest) returns (google.protobuf.Empty); // Iterator can't be used (even to check error) after release. @@ -67,17 +73,17 @@ message ViewGetResponse { } message ViewNewIteratorWithStartAndPrefixRequest { - uint32 id = 1; + uint64 id = 1; bytes start = 2; bytes prefix = 3; } message ViewNewIteratorWithStartAndPrefixResponse { - uint32 id = 1; + uint64 id = 1; } message IteratorNextRequest { - uint32 id = 1; + uint64 id = 1; } message IteratorNextResponse { @@ -85,11 +91,11 @@ message IteratorNextResponse { } message IteratorErrorRequest { - uint32 id = 1; + uint64 id = 1; } message IteratorReleaseRequest { - uint32 id = 1; + uint64 id = 1; } message ViewReleaseRequest { diff --git a/grpc-testtool/src/lib.rs b/grpc-testtool/src/lib.rs index 3819ac80a..953309573 100644 --- a/grpc-testtool/src/lib.rs +++ b/grpc-testtool/src/lib.rs @@ -16,6 +16,11 @@ pub mod process_server { tonic::include_proto!("process"); } +pub mod merkle { + #![allow(clippy::unwrap_used, clippy::missing_const_for_fn)] + tonic::include_proto!("merkle"); +} + pub mod service; pub use service::Database as DatabaseService; diff --git a/grpc-testtool/src/service.rs b/grpc-testtool/src/service.rs index b82ba676a..6af26e4fc 100644 --- a/grpc-testtool/src/service.rs +++ b/grpc-testtool/src/service.rs @@ -2,10 +2,14 @@ // See the file LICENSE.md for licensing terms. use firewood::db::{Db, DbConfig}; +use firewood::merkle::MerkleKeyValueStream; use firewood::storage::WalConfig; use firewood::v2::{api::Db as _, api::Error}; +use std::fmt::Debug; use std::path::Path; +use std::pin::Pin; +use std::sync::atomic::AtomicU32; use std::{ collections::HashMap, ops::Deref, @@ -19,6 +23,7 @@ use tonic::Status; pub mod database; pub mod db; +pub mod merkle; pub mod process; trait IntoStatusResultExt { @@ -44,6 +49,45 @@ impl IntoStatusResultExt for Result { pub struct Database { db: Db, iterators: Arc>, + views: Arc>, +} +#[derive(Default, Debug)] +struct Views { + map: HashMap, + next_id: AtomicU32, +} + +impl Views { + fn insert(&mut self, view: View) -> u32 { + let next_id = self.next_id.fetch_add(1, Ordering::Relaxed); + self.map.insert(next_id, view); + next_id + } + + fn delete(&mut self, view_id: u32) -> Option { + self.map.remove(&view_id) + } + + fn get(&self, view_id: u32) -> Option<&View> { + self.map.get(&view_id) + } +} + +use futures::Stream; + +enum View { + Historical(Arc<::Historical>), + Proposal(Arc<::Proposal>), +} + +// TODO: We manually implement Debug since Proposal does not, but probably should +impl Debug for View { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Historical(arg0) => f.debug_tuple("Historical").field(arg0).finish(), + Self::Proposal(_arg0) => f.debug_tuple("Proposal").finish(), + } + } } impl Database { @@ -62,6 +106,7 @@ impl Database { Ok(Self { db, iterators: Default::default(), + views: Default::default(), }) } } @@ -81,28 +126,35 @@ impl Database { } } -// TODO: implement Iterator -#[derive(Debug)] -struct Iter; +trait DebugStream: Stream + Debug + Send {} + +impl, T: Send + Sync + Debug> DebugStream for MerkleKeyValueStream<'_, S, T> {} +type Iter = dyn DebugStream, Vec), firewood::v2::api::Error>>; + +type IteratorID = u64; #[derive(Default, Debug)] struct Iterators { - map: HashMap, + map: HashMap>>, next_id: AtomicU64, } impl Iterators { - fn insert(&mut self, iter: Iter) -> u64 { + fn insert(&mut self, iter: Pin>) -> IteratorID { let id = self.next_id.fetch_add(1, Ordering::Relaxed); self.map.insert(id, iter); id } - fn _get(&self, id: u64) -> Option<&Iter> { + fn get(&self, id: IteratorID) -> Option<&Pin>> { self.map.get(&id) } - fn remove(&mut self, id: u64) { + fn get_mut(&mut self, id: IteratorID) -> Option<&mut Pin>> { + self.map.get_mut(&id) + } + + fn remove(&mut self, id: IteratorID) { self.map.remove(&id); } } diff --git a/grpc-testtool/src/service/database.rs b/grpc-testtool/src/service/database.rs index 7fc08ba05..857a6050a 100644 --- a/grpc-testtool/src/service/database.rs +++ b/grpc-testtool/src/service/database.rs @@ -1,7 +1,7 @@ // Copyright (C) 2023, Ava Labs, Inc. All rights reserved. // See the file LICENSE.md for licensing terms. -use super::{Database as DatabaseService, IntoStatusResultExt, Iter}; +use super::{Database as DatabaseService, IntoStatusResultExt}; use crate::rpcdb::{ database_server::Database, CloseRequest, CloseResponse, CompactRequest, CompactResponse, DeleteRequest, DeleteResponse, GetRequest, GetResponse, HasRequest, HasResponse, @@ -113,20 +113,13 @@ impl Database for DatabaseService { async fn new_iterator_with_start_and_prefix( &self, - request: Request, + req: Request, ) -> Result, Status> { let NewIteratorWithStartAndPrefixRequest { start: _, prefix: _, - } = request.into_inner(); - - // TODO: create the actual iterator - let id = { - let mut iters = self.iterators.lock().await; - iters.insert(Iter) - }; - - Ok(Response::new(NewIteratorWithStartAndPrefixResponse { id })) + } = req.into_inner(); + Err(Status::unimplemented("new_iterator_with_start_and_prefix not implemented")) } async fn iterator_next( diff --git a/grpc-testtool/src/service/merkle.rs b/grpc-testtool/src/service/merkle.rs new file mode 100644 index 000000000..29ae80bc2 --- /dev/null +++ b/grpc-testtool/src/service/merkle.rs @@ -0,0 +1,186 @@ +use std::sync::Arc; + +use crate::merkle::{ + merkle_server::Merkle as MerkleServiceTrait, IteratorErrorRequest, IteratorNextRequest, + IteratorNextResponse, IteratorReleaseRequest, NewProposalRequest, NewProposalResponse, + NewViewRequest, NewViewResponse, ProposalCommitRequest, PutRequest, ViewGetRequest, + ViewGetResponse, ViewHasRequest, ViewHasResponse, ViewNewIteratorWithStartAndPrefixRequest, + ViewNewIteratorWithStartAndPrefixResponse, ViewReleaseRequest, +}; +use firewood::{ + db::{BatchOp, Proposal}, + v2::api::{self, Db}, +}; +use tonic::{async_trait, Request, Response, Status}; + +use super::{IntoStatusResultExt, View}; + +use futures::StreamExt; + +#[async_trait] +impl MerkleServiceTrait for super::Database { + async fn new_proposal( + &self, + req: Request, + ) -> Result, Status> { + let req = req.into_inner(); + + // convert the provided data into a set of BatchOp Put and Delete operations + let mut data: Vec<_> = req + .puts + .into_iter() + .map(|put| BatchOp::Put { + key: put.key, + value: put.value, + }) + .collect(); + data.extend( + req.deletes + .into_iter() + .map(|del| BatchOp::Delete { key: del }), + ); + + let mut views = self.views.lock().await; + + // the proposal depends on the parent_view_id. If it's None, then we propose on the db itself + // Otherwise, we're provided a base proposal to base it off of, so go fetch that + // proposal from the views + let proposal = match req.parent_id { + None => self.db.propose(data).await, + Some(parent_id) => { + let view = views.map.get(&parent_id); + match view { + None => return Err(Status::not_found(format!("ID {parent_id} not found"))), + Some(View::Proposal(parent)) => { + firewood::v2::api::Proposal::propose(parent.clone(), data).await + } + Some(_) => { + return Err(Status::invalid_argument(format!( + "ID {parent_id} is not a commitable proposal" + ))) + } + } + } + } + .into_status_result()?; + + // compute the next view id + let id = views.insert(View::Proposal(Arc::new(proposal))); + + let resp = Response::new(NewProposalResponse { id }); + Ok(resp) + } + + async fn proposal_commit( + &self, + req: Request, + ) -> Result, Status> { + let mut views = self.views.lock().await; + let id = req.into_inner().id; + + match views.map.remove(&id) { + None => return Err(Status::not_found(format!("id {id} not found"))), + Some(View::Proposal(proposal)) => proposal.commit(), + Some(_) => { + return Err(Status::invalid_argument(format!( + "id {id} is not a commitable proposal" + ))) + } + } + .await + .into_status_result()?; + Ok(Response::new(())) + } + + async fn new_view( + &self, + req: Request, + ) -> Result, Status> { + let hash = std::convert::TryInto::<[u8; 32]>::try_into(req.into_inner().root_hash) + .map_err(|_| api::Error::InvalidProposal) // TODO: better error here? + .into_status_result()?; + let mut views = self.views.lock().await; + let view = self.db.revision(hash).await.into_status_result()?; + let id = views.insert(View::Historical(view)); + Ok(Response::new(NewViewResponse { id })) + } + + async fn view_has( + &self, + _req: Request, + ) -> Result, Status> { + todo!() + } + + async fn view_get( + &self, + _req: Request, + ) -> Result, Status> { + todo!() + } + + async fn view_new_iterator_with_start_and_prefix( + &self, + req: Request, + ) -> Result, Status> { + let req = req.into_inner(); + let id = req.id; + let key = req.start; + let views = self + .views + .lock() + .await; + let iter = views + .get(id as u32) + .ok_or_else(|| Status::not_found("id {id} not found")); + let iter = iter + .map(|view| match view { + View::Historical(historical) => historical.stream_from(key.into_boxed_slice()), + View::Proposal(_proposal) => todo!(), // proposal.stream_from(key), + })?; + let mut iterators = self.iterators.lock().await; + let id = iterators.insert(Box::pin(iter)); + Ok(Response::new(ViewNewIteratorWithStartAndPrefixResponse { id })) + } + + async fn view_release(&self, req: Request) -> Result, Status> { + let mut views = self.views.lock().await; + // we don't care if this works :/ + views.delete(req.into_inner().id); + Ok(Response::new(())) + } + + async fn iterator_next( + &self, + req: Request, + ) -> Result, Status> { + let id = req.into_inner().id; + let mut iterators = self.iterators.lock().await; + let view = iterators + .get_mut(id) + .ok_or_else(|| Status::not_found(format!("iterator {id} not found")))?; + + let (key, value) = view + .next() + .await + .ok_or_else(|| Status::out_of_range(format!("iterator {id} at end")))? + .into_status_result()?; + Ok(Response::new(IteratorNextResponse { + data: Some(PutRequest { key: key.to_vec(), value }), + })) + } + + async fn iterator_release( + &self, + _req: Request, + ) -> Result, Status> { + todo!() + } + + async fn iterator_error( + &self, + _req: Request, + ) -> Result, Status> { + todo!() + } +}