From bee6c15d7be07c2b92903f58e2295d3c4733f1a5 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Thu, 15 Feb 2024 17:27:22 -0500 Subject: [PATCH 01/12] rename proto --- grpc-testtool/proto/merkle/merkle.proto | 102 ++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 grpc-testtool/proto/merkle/merkle.proto diff --git a/grpc-testtool/proto/merkle/merkle.proto b/grpc-testtool/proto/merkle/merkle.proto new file mode 100644 index 000000000..910246a22 --- /dev/null +++ b/grpc-testtool/proto/merkle/merkle.proto @@ -0,0 +1,102 @@ +syntax = "proto3"; + +package merkle; + +import "google/protobuf/empty.proto"; + +service Merkle { + rpc NewProposal(NewProposalRequest) returns (NewProposalResponse); + rpc ProposalCommit(ProposalCommitRequest) returns (ProposalCommitResponse); + + rpc NewView(NewViewRequest) returns (NewViewResponse); + // The methods below may be called with a view ID that corresponds to either a (committable) proposal + // or (non-committable) historical view. + rpc ViewHas(ViewHasRequest) returns (ViewHasResponse); + rpc ViewGet(ViewGetRequest) returns (ViewGetResponse); + rpc ViewNewIteratorWithStartAndPrefix(ViewNewIteratorWithStartAndPrefixRequest) returns (ViewNewIteratorWithStartAndPrefixResponse); + rpc ViewRelease(ViewReleaseRequest) returns (google.protobuf.Empty); +} + +message NewProposalRequest { + // If not given, the parent view is the current database revision. + optional uint32 parent_view_id = 1; + repeated PutRequest puts = 2; + repeated bytes deletes = 3; +} + +message NewProposalResponse { + uint32 proposal_id = 1; +} + +message ProposalCommitRequest { + uint32 proposal_id = 1; +} + +message ProposalCommitResponse { + CommitError err = 1; +} + +message NewViewRequest { + bytes root_id = 1; +} + +message NewViewResponse { + // If not given, the parent view is the current database revision. + optional uint32 view_id = 1; +} + +message ViewHasRequest { + uint32 view_id = 1; + bytes key = 2; +} + +message ViewHasResponse { + bool has = 1; +} + +message ViewGetRequest { + uint32 view_id = 1; + bytes key = 2; +} + +message ViewGetResponse { + bytes value = 1; + GetError err = 2; +} + +message ViewNewIteratorWithStartAndPrefixRequest { + uint32 view_id = 1; + bytes start = 2; + bytes prefix = 3; +} + +message ViewNewIteratorWithStartAndPrefixResponse { + uint32 iterator_id = 1; +} + +message ViewReleaseRequest { + uint32 view_id = 1; +} + +// TODO import this from the rpcdb package. +message PutRequest { + bytes key = 1; + bytes value = 2; +} + +enum GetError { + // ERROR_UNSPECIFIED_GET is used to indicate that no error occurred. + ERROR_UNSPECIFIED_GET = 0; + ERROR_CLOSED_GET = 1; + ERROR_NOT_FOUND = 2; +} + +enum CommitError { + // ERROR_UNSPECIFIED_COMMIT is used to indicate that no error occurred. + ERROR_UNSPECIFIED_COMMIT = 0; + ERROR_CLOSED_COMMIT = 1; + ERROR_INVALID = 2; + ERROR_COMMITTED = 3; + ERROR_PARENT_NOT_DATABASE = 4; + ERROR_NON_PROPOSAL_ID = 5; +} \ No newline at end of file From a21c2553586335d1fdb3ec5ac90b4dc8c64581aa Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Thu, 15 Feb 2024 17:33:22 -0500 Subject: [PATCH 02/12] nit newline --- grpc-testtool/proto/merkle/merkle.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grpc-testtool/proto/merkle/merkle.proto b/grpc-testtool/proto/merkle/merkle.proto index 910246a22..1757d6ac2 100644 --- a/grpc-testtool/proto/merkle/merkle.proto +++ b/grpc-testtool/proto/merkle/merkle.proto @@ -99,4 +99,4 @@ enum CommitError { ERROR_COMMITTED = 3; ERROR_PARENT_NOT_DATABASE = 4; ERROR_NON_PROPOSAL_ID = 5; -} \ No newline at end of file +} From 3ebbb71473ca7d2d16408c249aaceb2a9bf44baa Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Fri, 16 Feb 2024 08:37:42 -0500 Subject: [PATCH 03/12] fix NewViewResponse --- grpc-testtool/proto/merkle/merkle.proto | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/grpc-testtool/proto/merkle/merkle.proto b/grpc-testtool/proto/merkle/merkle.proto index 1757d6ac2..7723cd4d3 100644 --- a/grpc-testtool/proto/merkle/merkle.proto +++ b/grpc-testtool/proto/merkle/merkle.proto @@ -41,8 +41,7 @@ message NewViewRequest { } message NewViewResponse { - // If not given, the parent view is the current database revision. - optional uint32 view_id = 1; + uint32 view_id = 1; } message ViewHasRequest { From 6d2e47ca03c0ea8bb07a0480cf56b8ebc8e87aad Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Fri, 16 Feb 2024 18:19:23 +0000 Subject: [PATCH 04/12] Framework for MerkleService Still todo!() in all implementations Fixed up build.rs so that it only rebuilds the tonic things when the specs change, which is rare, speeding up incremental builds. --- grpc-testtool/build.rs | 20 ++++++++-- grpc-testtool/src/lib.rs | 5 +++ grpc-testtool/src/service.rs | 1 + grpc-testtool/src/service/merkle.rs | 59 +++++++++++++++++++++++++++++ 4 files changed, 82 insertions(+), 3 deletions(-) create mode 100644 grpc-testtool/src/service/merkle.rs diff --git a/grpc-testtool/build.rs b/grpc-testtool/build.rs index 1c3c17ee3..89a9d39cc 100644 --- a/grpc-testtool/build.rs +++ b/grpc-testtool/build.rs @@ -1,10 +1,24 @@ // Copyright (C) 2023, Ava Labs, Inc. All rights reserved. // See the file LICENSE.md for licensing terms. +use std::path::PathBuf; + fn main() -> Result<(), Box> { - tonic_build::compile_protos("proto/sync/sync.proto")?; - tonic_build::compile_protos("proto/rpcdb/rpcdb.proto")?; - tonic_build::compile_protos("proto/process-server/process-server.proto")?; + // we want to import these 4 proto files + let import_protos = ["sync", "rpcdb", "process-server", "merkle"]; + + let protos: Box<[PathBuf]> = import_protos + .into_iter() + .map(|proto| PathBuf::from(format!("proto/{proto}/{proto}.proto"))) + .collect(); + + // go through each proto and build it, also let cargo know we rerun this if the file changes + for proto in protos.iter() { + tonic_build::compile_protos(proto)?; + + // this improves recompile times; we only rerun tonic if any of these files change + println!("cargo:rerun-if-changed={}", proto.display()); + } Ok(()) } diff --git a/grpc-testtool/src/lib.rs b/grpc-testtool/src/lib.rs index 3819ac80a..953309573 100644 --- a/grpc-testtool/src/lib.rs +++ b/grpc-testtool/src/lib.rs @@ -16,6 +16,11 @@ pub mod process_server { tonic::include_proto!("process"); } +pub mod merkle { + #![allow(clippy::unwrap_used, clippy::missing_const_for_fn)] + tonic::include_proto!("merkle"); +} + pub mod service; pub use service::Database as DatabaseService; diff --git a/grpc-testtool/src/service.rs b/grpc-testtool/src/service.rs index b82ba676a..1778b38fa 100644 --- a/grpc-testtool/src/service.rs +++ b/grpc-testtool/src/service.rs @@ -19,6 +19,7 @@ use tonic::Status; pub mod database; pub mod db; +pub mod merkle; pub mod process; trait IntoStatusResultExt { diff --git a/grpc-testtool/src/service/merkle.rs b/grpc-testtool/src/service/merkle.rs new file mode 100644 index 000000000..e91e43030 --- /dev/null +++ b/grpc-testtool/src/service/merkle.rs @@ -0,0 +1,59 @@ +use crate::merkle::{ + merkle_server::Merkle as MerkleServiceTrait, NewProposalRequest, NewProposalResponse, + NewViewRequest, NewViewResponse, ProposalCommitRequest, ProposalCommitResponse, ViewGetRequest, + ViewGetResponse, ViewHasRequest, ViewHasResponse, ViewNewIteratorWithStartAndPrefixRequest, + ViewNewIteratorWithStartAndPrefixResponse, ViewReleaseRequest, +}; +use tonic::{async_trait, Request, Response, Status}; + +#[async_trait] +impl MerkleServiceTrait for super::Db { + async fn new_proposal( + &self, + _req: Request, + ) -> Result, Status> { + todo!() + } + + async fn proposal_commit( + &self, + _req: Request, + ) -> Result, Status> { + todo!() + } + + async fn new_view( + &self, + _req: Request, + ) -> Result, Status> { + todo!() + } + + async fn view_has( + &self, + _req: Request, + ) -> Result, Status> { + todo!() + } + + async fn view_get( + &self, + _req: Request, + ) -> Result, Status> { + todo!() + } + + async fn view_new_iterator_with_start_and_prefix( + &self, + _req: Request, + ) -> Result, Status> { + todo!() + } + + async fn view_release( + &self, + _req: Request, + ) -> Result, Status> { + todo!() + } +} From 7d3046b8993db91c23af56bfcacfb04a68ba7881 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Fri, 16 Feb 2024 20:16:21 +0000 Subject: [PATCH 05/12] Improve build.rs Made it a bit easier to add more proto files, and also report to cargo that these are the only files that cause the build.rs to run again --- grpc-testtool/build.rs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/grpc-testtool/build.rs b/grpc-testtool/build.rs index 1c3c17ee3..f546f152e 100644 --- a/grpc-testtool/build.rs +++ b/grpc-testtool/build.rs @@ -1,10 +1,24 @@ // Copyright (C) 2023, Ava Labs, Inc. All rights reserved. // See the file LICENSE.md for licensing terms. +use std::path::PathBuf; + fn main() -> Result<(), Box> { - tonic_build::compile_protos("proto/sync/sync.proto")?; - tonic_build::compile_protos("proto/rpcdb/rpcdb.proto")?; - tonic_build::compile_protos("proto/process-server/process-server.proto")?; + // we want to import these proto files + let import_protos = ["sync", "rpcdb", "process-server"]; + + let protos: Box<[PathBuf]> = import_protos + .into_iter() + .map(|proto| PathBuf::from(format!("proto/{proto}/{proto}.proto"))) + .collect(); + + // go through each proto and build it, also let cargo know we rerun this if the file changes + for proto in protos.iter() { + tonic_build::compile_protos(proto)?; + + // this improves recompile times; we only rerun tonic if any of these files change + println!("cargo:rerun-if-changed={}", proto.display()); + } Ok(()) } From f6320c0be16be4af942ace5f1fc3b8111f5de9a6 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Fri, 16 Feb 2024 22:28:17 +0000 Subject: [PATCH 06/12] More implementation of interface Still need the reader functions --- firewood/src/db.rs | 2 +- firewood/src/db/proposal.rs | 2 +- grpc-testtool/src/service.rs | 32 +++++++++++ grpc-testtool/src/service/merkle.rs | 87 ++++++++++++++++++++++++++--- 4 files changed, 114 insertions(+), 9 deletions(-) diff --git a/firewood/src/db.rs b/firewood/src/db.rs index b278b564a..bf36f7594 100644 --- a/firewood/src/db.rs +++ b/firewood/src/db.rs @@ -45,7 +45,7 @@ use std::{ }; use tokio::task::block_in_place; -mod proposal; +pub mod proposal; use self::proposal::ProposalBase; diff --git a/firewood/src/db/proposal.rs b/firewood/src/db/proposal.rs index e842c1087..12967c3af 100644 --- a/firewood/src/db/proposal.rs +++ b/firewood/src/db/proposal.rs @@ -43,7 +43,7 @@ pub enum ProposalBase { } #[async_trait] -impl crate::v2::api::Proposal for Proposal { +impl api::Proposal for Proposal { type Proposal = Proposal; #[allow(clippy::unwrap_used)] diff --git a/grpc-testtool/src/service.rs b/grpc-testtool/src/service.rs index 1778b38fa..6ae7270e3 100644 --- a/grpc-testtool/src/service.rs +++ b/grpc-testtool/src/service.rs @@ -5,7 +5,9 @@ use firewood::db::{Db, DbConfig}; use firewood::storage::WalConfig; use firewood::v2::{api::Db as _, api::Error}; +use std::fmt::Debug; use std::path::Path; +use std::sync::atomic::AtomicU32; use std::{ collections::HashMap, ops::Deref, @@ -45,6 +47,35 @@ impl IntoStatusResultExt for Result { pub struct Database { db: Db, iterators: Arc>, + views: Arc>, +} +#[derive(Default, Debug)] +struct Views { + map: HashMap, + next_id: AtomicU32, +} + +impl Views { + fn insert(&mut self, view: View) -> u32 { + let next_id = self.next_id.fetch_add(1, Ordering::Relaxed); + self.map.insert(next_id, view); + next_id + } +} + +enum View { + Historical(Arc<::Historical>), + Proposal(Arc<::Proposal>), +} + +// TODO: We manually implement Debug since Proposal does not, but probably should +impl Debug for View { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Historical(arg0) => f.debug_tuple("Historical").field(arg0).finish(), + Self::Proposal(_arg0) => f.debug_tuple("Proposal").finish(), + } + } } impl Database { @@ -63,6 +94,7 @@ impl Database { Ok(Self { db, iterators: Default::default(), + views: Default::default(), }) } } diff --git a/grpc-testtool/src/service/merkle.rs b/grpc-testtool/src/service/merkle.rs index e91e43030..1f214b188 100644 --- a/grpc-testtool/src/service/merkle.rs +++ b/grpc-testtool/src/service/merkle.rs @@ -1,32 +1,105 @@ +use std::sync::Arc; + use crate::merkle::{ merkle_server::Merkle as MerkleServiceTrait, NewProposalRequest, NewProposalResponse, NewViewRequest, NewViewResponse, ProposalCommitRequest, ProposalCommitResponse, ViewGetRequest, ViewGetResponse, ViewHasRequest, ViewHasResponse, ViewNewIteratorWithStartAndPrefixRequest, ViewNewIteratorWithStartAndPrefixResponse, ViewReleaseRequest, }; +use firewood::{ + db::{BatchOp, Proposal}, + v2::api::{self, Db}, +}; use tonic::{async_trait, Request, Response, Status}; +use super::{IntoStatusResultExt, View}; + +//#[prost(uint32, optional, tag = "1")] +//pub parent_view_id: ::core::option::Option, +//#[prost(message, repeated, tag = "2")] +//pub puts: ::prost::alloc::vec::Vec, +//#[prost(bytes = "vec", repeated, tag = "3")] +//pub deletes: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + #[async_trait] -impl MerkleServiceTrait for super::Db { +impl MerkleServiceTrait for super::Database { async fn new_proposal( &self, - _req: Request, + req: Request, ) -> Result, Status> { - todo!() + let req = req.into_inner(); + + // convert the provided data into a set of BatchOp Put and Delete operations + let mut data: Vec<_> = req + .puts + .into_iter() + .map(|put| BatchOp::Put { + key: put.key, + value: put.value, + }) + .collect(); + data.extend( + req.deletes + .into_iter() + .map(|del| BatchOp::Delete { key: del }), + ); + + let mut views = self.views.lock().await; + + // the proposal depends on the parent_view_id. If it's None, then we propose on the db itself + // Otherwise, we're provided a base proposal to base it off of, so go fetch that + // proposal from the views + let proposal = match req.parent_view_id { + None => self.db.propose(data).await, + Some(parent_id) => { + let view = views.map.get(&parent_id); + match view { + None => return Err(Status::invalid_argument("invalid view id")), + Some(View::Proposal(parent)) => { + firewood::v2::api::Proposal::propose(parent.clone(), data).await + } + Some(_) => return Err(Status::invalid_argument("non-proposal id")), + } + } + } + .into_status_result()?; + + // compute the next view id + let view_id = views.insert(View::Proposal(Arc::new(proposal))); + + let resp = Response::new(NewProposalResponse { + proposal_id: view_id, + }); + Ok(resp) } async fn proposal_commit( &self, - _req: Request, + req: Request, ) -> Result, Status> { - todo!() + let mut views = self.views.lock().await; + + match views.map.remove(&req.into_inner().proposal_id) { + None => return Err(Status::invalid_argument("invalid view id")), + Some(View::Proposal(proposal)) => proposal.commit(), + Some(_) => return Err(Status::invalid_argument("non-proposal id")), + } + .await + .into_status_result()?; + Ok(Response::new(ProposalCommitResponse { err: 0 })) } async fn new_view( &self, - _req: Request, + req: Request, ) -> Result, Status> { - todo!() + let hash = std::convert::TryInto::<[u8; 32]>::try_into(req.into_inner().root_id) + .map_err(|_| api::Error::InvalidProposal) // TODO: better error here? + .into_status_result()?; + let mut views = self.views.lock().await; + let view = self.db.revision(hash).await.into_status_result()?; + let view_id = views.insert(View::Historical(view)); + Ok(Response::new(NewViewResponse { view_id })) } async fn view_has( From 1131c0741da32ccdadbb6032cb43ea753c527169 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Fri, 16 Feb 2024 22:33:14 +0000 Subject: [PATCH 07/12] Implement view_release --- grpc-testtool/src/service.rs | 4 ++++ grpc-testtool/src/service/merkle.rs | 10 +++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/grpc-testtool/src/service.rs b/grpc-testtool/src/service.rs index 6ae7270e3..b42f1b552 100644 --- a/grpc-testtool/src/service.rs +++ b/grpc-testtool/src/service.rs @@ -61,6 +61,10 @@ impl Views { self.map.insert(next_id, view); next_id } + + fn delete(&mut self, view_id: u32) -> Option { + self.map.remove(&view_id) + } } enum View { diff --git a/grpc-testtool/src/service/merkle.rs b/grpc-testtool/src/service/merkle.rs index 1f214b188..33b3c82b1 100644 --- a/grpc-testtool/src/service/merkle.rs +++ b/grpc-testtool/src/service/merkle.rs @@ -123,10 +123,10 @@ impl MerkleServiceTrait for super::Database { todo!() } - async fn view_release( - &self, - _req: Request, - ) -> Result, Status> { - todo!() + async fn view_release(&self, req: Request) -> Result, Status> { + let mut views = self.views.lock().await; + // we don't care if this works :/ + views.delete(req.into_inner().view_id); + Ok(Response::new(())) } } From 6399d5f84d3417551597cb818dc95e65d3c3bb8e Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Tue, 20 Feb 2024 15:47:03 +0000 Subject: [PATCH 08/12] Merged upstream changes --- grpc-testtool/proto/merkle/merkle.proto | 3 --- 1 file changed, 3 deletions(-) diff --git a/grpc-testtool/proto/merkle/merkle.proto b/grpc-testtool/proto/merkle/merkle.proto index a554e1161..7519288c7 100644 --- a/grpc-testtool/proto/merkle/merkle.proto +++ b/grpc-testtool/proto/merkle/merkle.proto @@ -21,10 +21,7 @@ message NewProposalRequest { // If not given, the parent view is the current database revision. optional uint32 parent_view_id = 1; repeated PutRequest puts = 2; -<<<<<<< HEAD -======= // The keys being deleted. ->>>>>>> main repeated bytes deletes = 3; } From 12d74b14ec1988b728fb69c14058efdc4b948217 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Tue, 20 Feb 2024 16:16:36 +0000 Subject: [PATCH 09/12] Use CommitError from the proto These have a built-in as_str_name method we can utilize. Unfortunately, the proto system's error wants an implementation for Into, so I implemented From for String. --- grpc-testtool/src/lib.rs | 6 ++++++ grpc-testtool/src/service/merkle.rs | 17 +++++------------ 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/grpc-testtool/src/lib.rs b/grpc-testtool/src/lib.rs index 953309573..0392491b2 100644 --- a/grpc-testtool/src/lib.rs +++ b/grpc-testtool/src/lib.rs @@ -19,6 +19,12 @@ pub mod process_server { pub mod merkle { #![allow(clippy::unwrap_used, clippy::missing_const_for_fn)] tonic::include_proto!("merkle"); + + impl From for String { + fn from(e: CommitError) -> String { + e.as_str_name().into() + } + } } pub mod service; diff --git a/grpc-testtool/src/service/merkle.rs b/grpc-testtool/src/service/merkle.rs index 33b3c82b1..d2051c7c0 100644 --- a/grpc-testtool/src/service/merkle.rs +++ b/grpc-testtool/src/service/merkle.rs @@ -4,7 +4,7 @@ use crate::merkle::{ merkle_server::Merkle as MerkleServiceTrait, NewProposalRequest, NewProposalResponse, NewViewRequest, NewViewResponse, ProposalCommitRequest, ProposalCommitResponse, ViewGetRequest, ViewGetResponse, ViewHasRequest, ViewHasResponse, ViewNewIteratorWithStartAndPrefixRequest, - ViewNewIteratorWithStartAndPrefixResponse, ViewReleaseRequest, + ViewNewIteratorWithStartAndPrefixResponse, ViewReleaseRequest, CommitError }; use firewood::{ db::{BatchOp, Proposal}, @@ -14,13 +14,6 @@ use tonic::{async_trait, Request, Response, Status}; use super::{IntoStatusResultExt, View}; -//#[prost(uint32, optional, tag = "1")] -//pub parent_view_id: ::core::option::Option, -//#[prost(message, repeated, tag = "2")] -//pub puts: ::prost::alloc::vec::Vec, -//#[prost(bytes = "vec", repeated, tag = "3")] -//pub deletes: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, - #[async_trait] impl MerkleServiceTrait for super::Database { async fn new_proposal( @@ -54,11 +47,11 @@ impl MerkleServiceTrait for super::Database { Some(parent_id) => { let view = views.map.get(&parent_id); match view { - None => return Err(Status::invalid_argument("invalid view id")), + None => return Err(Status::invalid_argument(CommitError::ErrorInvalid)), Some(View::Proposal(parent)) => { firewood::v2::api::Proposal::propose(parent.clone(), data).await } - Some(_) => return Err(Status::invalid_argument("non-proposal id")), + Some(_) => return Err(Status::invalid_argument(CommitError::ErrorNonProposalId)), } } } @@ -80,9 +73,9 @@ impl MerkleServiceTrait for super::Database { let mut views = self.views.lock().await; match views.map.remove(&req.into_inner().proposal_id) { - None => return Err(Status::invalid_argument("invalid view id")), + None => return Err(Status::invalid_argument(CommitError::ErrorClosedCommit)), Some(View::Proposal(proposal)) => proposal.commit(), - Some(_) => return Err(Status::invalid_argument("non-proposal id")), + Some(_) => return Err(Status::invalid_argument(CommitError::ErrorNonProposalId)), } .await .into_status_result()?; From 7d5ea3fb74724e13a8c91c3782684998c47075b1 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Fri, 23 Feb 2024 23:56:52 +0000 Subject: [PATCH 10/12] MerkleKeyValueStream should implement Debug --- firewood/src/merkle/stream.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/firewood/src/merkle/stream.rs b/firewood/src/merkle/stream.rs index 69a12230b..58b2bf410 100644 --- a/firewood/src/merkle/stream.rs +++ b/firewood/src/merkle/stream.rs @@ -29,6 +29,16 @@ enum IterationNode<'a> { }, } +impl<'a> std::fmt::Debug for IterationNode<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Unvisited { key, node } => f.debug_struct("Unvisited").field("key", key).field("node", node).finish(), + Self::Visited { key, children_iter : _} => f.debug_struct("Visited").field("key", key).finish(), + } + } +} + +#[derive(Debug)] enum NodeStreamState<'a> { /// The iterator state is lazily initialized when poll_next is called /// for the first time. The iteration start key is stored here. @@ -49,6 +59,7 @@ impl NodeStreamState<'_> { } } +#[derive(Debug)] pub struct MerkleNodeStream<'a, S, T> { state: NodeStreamState<'a>, merkle_root: DiskAddress, @@ -271,6 +282,7 @@ fn get_iterator_intial_state<'a, S: ShaleStore + Send + Sync, T>( } } +#[derive(Debug)] enum MerkleKeyValueStreamState<'a, S, T> { /// The iterator state is lazily initialized when poll_next is called /// for the first time. The iteration start key is stored here. @@ -295,6 +307,7 @@ impl<'a, S, T> MerkleKeyValueStreamState<'a, S, T> { } } +#[derive(Debug)] pub struct MerkleKeyValueStream<'a, S, T> { state: MerkleKeyValueStreamState<'a, S, T>, merkle_root: DiskAddress, From fe19314fb7cdbdb883fd7ec2e015741b2cfade8a Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Sat, 24 Feb 2024 00:09:31 +0000 Subject: [PATCH 11/12] WIP --- grpc-testtool/Cargo.toml | 1 + grpc-testtool/proto/merkle/merkle.proto | 18 +++-- grpc-testtool/src/lib.rs | 6 -- grpc-testtool/src/service.rs | 29 +++++-- grpc-testtool/src/service/database.rs | 15 +--- grpc-testtool/src/service/merkle.rs | 103 +++++++++++++++++++----- 6 files changed, 121 insertions(+), 51 deletions(-) diff --git a/grpc-testtool/Cargo.toml b/grpc-testtool/Cargo.toml index f7a22fe29..af9ba1e5d 100644 --- a/grpc-testtool/Cargo.toml +++ b/grpc-testtool/Cargo.toml @@ -29,6 +29,7 @@ env_logger = "0.11.2" chrono = "0.4.34" serde_json = "1.0.113" serde = { version = "1.0.196", features = ["derive"] } +futures = "*" [build-dependencies] tonic-build = "0.11.0" diff --git a/grpc-testtool/proto/merkle/merkle.proto b/grpc-testtool/proto/merkle/merkle.proto index 94261d3ea..a894d5376 100644 --- a/grpc-testtool/proto/merkle/merkle.proto +++ b/grpc-testtool/proto/merkle/merkle.proto @@ -7,16 +7,22 @@ import "google/protobuf/empty.proto"; // Methods on this service return status code NOT_FOUND if a requested // view, iterator or root hash is not found. service Merkle { + // --- Proposals --- rpc NewProposal(NewProposalRequest) returns (NewProposalResponse); rpc ProposalCommit(ProposalCommitRequest) returns (google.protobuf.Empty); + // --- Views --- rpc NewView(NewViewRequest) returns (NewViewResponse); + + // --- Reads --- // The methods below may be called with an ID that corresponds to either a (committable) proposal // or (non-committable) historical view. rpc ViewHas(ViewHasRequest) returns (ViewHasResponse); rpc ViewGet(ViewGetRequest) returns (ViewGetResponse); + + // --- Iterators --- rpc ViewNewIteratorWithStartAndPrefix(ViewNewIteratorWithStartAndPrefixRequest) returns (ViewNewIteratorWithStartAndPrefixResponse); - // Returns status code NOT_FOUND when the iterator is done. + // Returns status code OUT_OF_RANGE when the iterator is done rpc IteratorNext(IteratorNextRequest) returns (IteratorNextResponse); rpc IteratorError(IteratorErrorRequest) returns (google.protobuf.Empty); // Iterator can't be used (even to check error) after release. @@ -67,17 +73,17 @@ message ViewGetResponse { } message ViewNewIteratorWithStartAndPrefixRequest { - uint32 id = 1; + uint64 id = 1; bytes start = 2; bytes prefix = 3; } message ViewNewIteratorWithStartAndPrefixResponse { - uint32 id = 1; + uint64 id = 1; } message IteratorNextRequest { - uint32 id = 1; + uint64 id = 1; } message IteratorNextResponse { @@ -85,11 +91,11 @@ message IteratorNextResponse { } message IteratorErrorRequest { - uint32 id = 1; + uint64 id = 1; } message IteratorReleaseRequest { - uint32 id = 1; + uint64 id = 1; } message ViewReleaseRequest { diff --git a/grpc-testtool/src/lib.rs b/grpc-testtool/src/lib.rs index 0392491b2..953309573 100644 --- a/grpc-testtool/src/lib.rs +++ b/grpc-testtool/src/lib.rs @@ -19,12 +19,6 @@ pub mod process_server { pub mod merkle { #![allow(clippy::unwrap_used, clippy::missing_const_for_fn)] tonic::include_proto!("merkle"); - - impl From for String { - fn from(e: CommitError) -> String { - e.as_str_name().into() - } - } } pub mod service; diff --git a/grpc-testtool/src/service.rs b/grpc-testtool/src/service.rs index b42f1b552..6af26e4fc 100644 --- a/grpc-testtool/src/service.rs +++ b/grpc-testtool/src/service.rs @@ -2,11 +2,13 @@ // See the file LICENSE.md for licensing terms. use firewood::db::{Db, DbConfig}; +use firewood::merkle::MerkleKeyValueStream; use firewood::storage::WalConfig; use firewood::v2::{api::Db as _, api::Error}; use std::fmt::Debug; use std::path::Path; +use std::pin::Pin; use std::sync::atomic::AtomicU32; use std::{ collections::HashMap, @@ -65,8 +67,14 @@ impl Views { fn delete(&mut self, view_id: u32) -> Option { self.map.remove(&view_id) } + + fn get(&self, view_id: u32) -> Option<&View> { + self.map.get(&view_id) + } } +use futures::Stream; + enum View { Historical(Arc<::Historical>), Proposal(Arc<::Proposal>), @@ -118,28 +126,35 @@ impl Database { } } -// TODO: implement Iterator -#[derive(Debug)] -struct Iter; +trait DebugStream: Stream + Debug + Send {} + +impl, T: Send + Sync + Debug> DebugStream for MerkleKeyValueStream<'_, S, T> {} +type Iter = dyn DebugStream, Vec), firewood::v2::api::Error>>; + +type IteratorID = u64; #[derive(Default, Debug)] struct Iterators { - map: HashMap, + map: HashMap>>, next_id: AtomicU64, } impl Iterators { - fn insert(&mut self, iter: Iter) -> u64 { + fn insert(&mut self, iter: Pin>) -> IteratorID { let id = self.next_id.fetch_add(1, Ordering::Relaxed); self.map.insert(id, iter); id } - fn _get(&self, id: u64) -> Option<&Iter> { + fn get(&self, id: IteratorID) -> Option<&Pin>> { self.map.get(&id) } - fn remove(&mut self, id: u64) { + fn get_mut(&mut self, id: IteratorID) -> Option<&mut Pin>> { + self.map.get_mut(&id) + } + + fn remove(&mut self, id: IteratorID) { self.map.remove(&id); } } diff --git a/grpc-testtool/src/service/database.rs b/grpc-testtool/src/service/database.rs index 7fc08ba05..857a6050a 100644 --- a/grpc-testtool/src/service/database.rs +++ b/grpc-testtool/src/service/database.rs @@ -1,7 +1,7 @@ // Copyright (C) 2023, Ava Labs, Inc. All rights reserved. // See the file LICENSE.md for licensing terms. -use super::{Database as DatabaseService, IntoStatusResultExt, Iter}; +use super::{Database as DatabaseService, IntoStatusResultExt}; use crate::rpcdb::{ database_server::Database, CloseRequest, CloseResponse, CompactRequest, CompactResponse, DeleteRequest, DeleteResponse, GetRequest, GetResponse, HasRequest, HasResponse, @@ -113,20 +113,13 @@ impl Database for DatabaseService { async fn new_iterator_with_start_and_prefix( &self, - request: Request, + req: Request, ) -> Result, Status> { let NewIteratorWithStartAndPrefixRequest { start: _, prefix: _, - } = request.into_inner(); - - // TODO: create the actual iterator - let id = { - let mut iters = self.iterators.lock().await; - iters.insert(Iter) - }; - - Ok(Response::new(NewIteratorWithStartAndPrefixResponse { id })) + } = req.into_inner(); + Err(Status::unimplemented("new_iterator_with_start_and_prefix not implemented")) } async fn iterator_next( diff --git a/grpc-testtool/src/service/merkle.rs b/grpc-testtool/src/service/merkle.rs index d2051c7c0..29ae80bc2 100644 --- a/grpc-testtool/src/service/merkle.rs +++ b/grpc-testtool/src/service/merkle.rs @@ -1,10 +1,11 @@ use std::sync::Arc; use crate::merkle::{ - merkle_server::Merkle as MerkleServiceTrait, NewProposalRequest, NewProposalResponse, - NewViewRequest, NewViewResponse, ProposalCommitRequest, ProposalCommitResponse, ViewGetRequest, + merkle_server::Merkle as MerkleServiceTrait, IteratorErrorRequest, IteratorNextRequest, + IteratorNextResponse, IteratorReleaseRequest, NewProposalRequest, NewProposalResponse, + NewViewRequest, NewViewResponse, ProposalCommitRequest, PutRequest, ViewGetRequest, ViewGetResponse, ViewHasRequest, ViewHasResponse, ViewNewIteratorWithStartAndPrefixRequest, - ViewNewIteratorWithStartAndPrefixResponse, ViewReleaseRequest, CommitError + ViewNewIteratorWithStartAndPrefixResponse, ViewReleaseRequest, }; use firewood::{ db::{BatchOp, Proposal}, @@ -14,6 +15,8 @@ use tonic::{async_trait, Request, Response, Status}; use super::{IntoStatusResultExt, View}; +use futures::StreamExt; + #[async_trait] impl MerkleServiceTrait for super::Database { async fn new_proposal( @@ -42,57 +45,64 @@ impl MerkleServiceTrait for super::Database { // the proposal depends on the parent_view_id. If it's None, then we propose on the db itself // Otherwise, we're provided a base proposal to base it off of, so go fetch that // proposal from the views - let proposal = match req.parent_view_id { + let proposal = match req.parent_id { None => self.db.propose(data).await, Some(parent_id) => { let view = views.map.get(&parent_id); match view { - None => return Err(Status::invalid_argument(CommitError::ErrorInvalid)), + None => return Err(Status::not_found(format!("ID {parent_id} not found"))), Some(View::Proposal(parent)) => { firewood::v2::api::Proposal::propose(parent.clone(), data).await } - Some(_) => return Err(Status::invalid_argument(CommitError::ErrorNonProposalId)), + Some(_) => { + return Err(Status::invalid_argument(format!( + "ID {parent_id} is not a commitable proposal" + ))) + } } } } .into_status_result()?; // compute the next view id - let view_id = views.insert(View::Proposal(Arc::new(proposal))); + let id = views.insert(View::Proposal(Arc::new(proposal))); - let resp = Response::new(NewProposalResponse { - proposal_id: view_id, - }); + let resp = Response::new(NewProposalResponse { id }); Ok(resp) } async fn proposal_commit( &self, req: Request, - ) -> Result, Status> { + ) -> Result, Status> { let mut views = self.views.lock().await; + let id = req.into_inner().id; - match views.map.remove(&req.into_inner().proposal_id) { - None => return Err(Status::invalid_argument(CommitError::ErrorClosedCommit)), + match views.map.remove(&id) { + None => return Err(Status::not_found(format!("id {id} not found"))), Some(View::Proposal(proposal)) => proposal.commit(), - Some(_) => return Err(Status::invalid_argument(CommitError::ErrorNonProposalId)), + Some(_) => { + return Err(Status::invalid_argument(format!( + "id {id} is not a commitable proposal" + ))) + } } .await .into_status_result()?; - Ok(Response::new(ProposalCommitResponse { err: 0 })) + Ok(Response::new(())) } async fn new_view( &self, req: Request, ) -> Result, Status> { - let hash = std::convert::TryInto::<[u8; 32]>::try_into(req.into_inner().root_id) + let hash = std::convert::TryInto::<[u8; 32]>::try_into(req.into_inner().root_hash) .map_err(|_| api::Error::InvalidProposal) // TODO: better error here? .into_status_result()?; let mut views = self.views.lock().await; let view = self.db.revision(hash).await.into_status_result()?; - let view_id = views.insert(View::Historical(view)); - Ok(Response::new(NewViewResponse { view_id })) + let id = views.insert(View::Historical(view)); + Ok(Response::new(NewViewResponse { id })) } async fn view_has( @@ -111,15 +121,66 @@ impl MerkleServiceTrait for super::Database { async fn view_new_iterator_with_start_and_prefix( &self, - _req: Request, + req: Request, ) -> Result, Status> { - todo!() + let req = req.into_inner(); + let id = req.id; + let key = req.start; + let views = self + .views + .lock() + .await; + let iter = views + .get(id as u32) + .ok_or_else(|| Status::not_found("id {id} not found")); + let iter = iter + .map(|view| match view { + View::Historical(historical) => historical.stream_from(key.into_boxed_slice()), + View::Proposal(_proposal) => todo!(), // proposal.stream_from(key), + })?; + let mut iterators = self.iterators.lock().await; + let id = iterators.insert(Box::pin(iter)); + Ok(Response::new(ViewNewIteratorWithStartAndPrefixResponse { id })) } async fn view_release(&self, req: Request) -> Result, Status> { let mut views = self.views.lock().await; // we don't care if this works :/ - views.delete(req.into_inner().view_id); + views.delete(req.into_inner().id); Ok(Response::new(())) } + + async fn iterator_next( + &self, + req: Request, + ) -> Result, Status> { + let id = req.into_inner().id; + let mut iterators = self.iterators.lock().await; + let view = iterators + .get_mut(id) + .ok_or_else(|| Status::not_found(format!("iterator {id} not found")))?; + + let (key, value) = view + .next() + .await + .ok_or_else(|| Status::out_of_range(format!("iterator {id} at end")))? + .into_status_result()?; + Ok(Response::new(IteratorNextResponse { + data: Some(PutRequest { key: key.to_vec(), value }), + })) + } + + async fn iterator_release( + &self, + _req: Request, + ) -> Result, Status> { + todo!() + } + + async fn iterator_error( + &self, + _req: Request, + ) -> Result, Status> { + todo!() + } } From 4255e5b36d9876b5a6ac8ad16614a59ba48e655c Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Sat, 24 Feb 2024 00:09:44 +0000 Subject: [PATCH 12/12] Format --- firewood/src/merkle/stream.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/firewood/src/merkle/stream.rs b/firewood/src/merkle/stream.rs index 58b2bf410..0bbb9795b 100644 --- a/firewood/src/merkle/stream.rs +++ b/firewood/src/merkle/stream.rs @@ -32,8 +32,15 @@ enum IterationNode<'a> { impl<'a> std::fmt::Debug for IterationNode<'a> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Unvisited { key, node } => f.debug_struct("Unvisited").field("key", key).field("node", node).finish(), - Self::Visited { key, children_iter : _} => f.debug_struct("Visited").field("key", key).finish(), + Self::Unvisited { key, node } => f + .debug_struct("Unvisited") + .field("key", key) + .field("node", node) + .finish(), + Self::Visited { + key, + children_iter: _, + } => f.debug_struct("Visited").field("key", key).finish(), } } }