Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support iterators and views in grpc in firewood #533

Draft
wants to merge 24 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
bee6c15
rename proto
Feb 15, 2024
a21c255
nit newline
Feb 15, 2024
e29e5fa
Merge branch 'main' into danlaine/view-server-proto
Feb 15, 2024
3ebbb71
fix NewViewResponse
Feb 16, 2024
8c93966
Merge branch 'main' into danlaine/view-server-proto
rkuris Feb 16, 2024
6d2e47c
Framework for MerkleService
rkuris Feb 16, 2024
7d3046b
Improve build.rs
rkuris Feb 16, 2024
1f40f0d
Merge branch 'rkuris/improve-build-rs' into rkuris/implement-merkle-grpc
rkuris Feb 16, 2024
fd86e5c
Merge remote-tracking branch 'origin/main' into rkuris/implement-merk…
rkuris Feb 16, 2024
f517585
Merge remote-tracking branch 'origin/main' into rkuris/implement-merk…
rkuris Feb 16, 2024
f6320c0
More implementation of interface
rkuris Feb 16, 2024
1131c07
Implement view_release
rkuris Feb 16, 2024
df9b19b
Merge branch 'main' into rkuris/implement-merkle-grpc
rkuris Feb 17, 2024
ddadd4d
Merge branch 'main' into rkuris/implement-merkle-grpc
rkuris Feb 19, 2024
6d44238
Merge branch 'main' into rkuris/implement-merkle-grpc
rkuris Feb 19, 2024
e779124
Merge remote-tracking branch 'origin/main' into rkuris/implement-merk…
rkuris Feb 20, 2024
6399d5f
Merged upstream changes
rkuris Feb 20, 2024
12d74b1
Use CommitError from the proto
rkuris Feb 20, 2024
6706e63
Merge branch 'main' into rkuris/implement-merkle-grpc
rkuris Feb 21, 2024
abd6bac
Merge branch 'main' into rkuris/implement-merkle-grpc
rkuris Feb 23, 2024
7d5ea3f
MerkleKeyValueStream should implement Debug
rkuris Feb 23, 2024
fe19314
WIP
rkuris Feb 24, 2024
4255e5b
Format
rkuris Feb 24, 2024
6236f2a
Merge branch 'rkuris/mkv-stream-debug-impl' into rkuris/implement-mer…
rkuris Feb 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion firewood/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use std::{
};
use tokio::task::block_in_place;

mod proposal;
pub mod proposal;

use self::proposal::ProposalBase;

Expand Down
2 changes: 1 addition & 1 deletion firewood/src/db/proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub enum ProposalBase {
}

#[async_trait]
impl crate::v2::api::Proposal for Proposal {
impl api::Proposal for Proposal {
type Proposal = Proposal;

#[allow(clippy::unwrap_used)]
Expand Down
20 changes: 20 additions & 0 deletions firewood/src/merkle/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,23 @@ enum IterationNode<'a> {
},
}

impl<'a> std::fmt::Debug for IterationNode<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Unvisited { key, node } => f
.debug_struct("Unvisited")
.field("key", key)
.field("node", node)
.finish(),
Self::Visited {
key,
children_iter: _,
} => f.debug_struct("Visited").field("key", key).finish(),
}
}
}

#[derive(Debug)]
enum NodeStreamState<'a> {
/// The iterator state is lazily initialized when poll_next is called
/// for the first time. The iteration start key is stored here.
Expand All @@ -49,6 +66,7 @@ impl NodeStreamState<'_> {
}
}

#[derive(Debug)]
pub struct MerkleNodeStream<'a, S, T> {
state: NodeStreamState<'a>,
merkle_root: DiskAddress,
Expand Down Expand Up @@ -271,6 +289,7 @@ fn get_iterator_intial_state<'a, S: ShaleStore<Node> + Send + Sync, T>(
}
}

#[derive(Debug)]
enum MerkleKeyValueStreamState<'a, S, T> {
/// The iterator state is lazily initialized when poll_next is called
/// for the first time. The iteration start key is stored here.
Expand All @@ -295,6 +314,7 @@ impl<'a, S, T> MerkleKeyValueStreamState<'a, S, T> {
}
}

#[derive(Debug)]
pub struct MerkleKeyValueStream<'a, S, T> {
state: MerkleKeyValueStreamState<'a, S, T>,
merkle_root: DiskAddress,
Expand Down
1 change: 1 addition & 0 deletions grpc-testtool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ env_logger = "0.11.2"
chrono = "0.4.34"
serde_json = "1.0.113"
serde = { version = "1.0.196", features = ["derive"] }
futures = "*"

[build-dependencies]
tonic-build = "0.11.0"
Expand Down
2 changes: 1 addition & 1 deletion grpc-testtool/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::path::PathBuf;

fn main() -> Result<(), Box<dyn std::error::Error>> {
// we want to import these proto files
let import_protos = ["sync", "rpcdb", "process-server"];
let import_protos = ["sync", "rpcdb", "process-server", "merkle"];

let protos: Box<[PathBuf]> = import_protos
.into_iter()
Expand Down
18 changes: 12 additions & 6 deletions grpc-testtool/proto/merkle/merkle.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,22 @@ import "google/protobuf/empty.proto";
// Methods on this service return status code NOT_FOUND if a requested
// view, iterator or root hash is not found.
service Merkle {
// --- Proposals ---
rpc NewProposal(NewProposalRequest) returns (NewProposalResponse);
rpc ProposalCommit(ProposalCommitRequest) returns (google.protobuf.Empty);

// --- Views ---
rpc NewView(NewViewRequest) returns (NewViewResponse);

// --- Reads ---
// The methods below may be called with an ID that corresponds to either a (committable) proposal
// or (non-committable) historical view.
rpc ViewHas(ViewHasRequest) returns (ViewHasResponse);
rpc ViewGet(ViewGetRequest) returns (ViewGetResponse);

// --- Iterators ---
rpc ViewNewIteratorWithStartAndPrefix(ViewNewIteratorWithStartAndPrefixRequest) returns (ViewNewIteratorWithStartAndPrefixResponse);
// Returns status code NOT_FOUND when the iterator is done.
// Returns status code OUT_OF_RANGE when the iterator is done
rpc IteratorNext(IteratorNextRequest) returns (IteratorNextResponse);
rpc IteratorError(IteratorErrorRequest) returns (google.protobuf.Empty);
// Iterator can't be used (even to check error) after release.
Expand Down Expand Up @@ -67,29 +73,29 @@ message ViewGetResponse {
}

message ViewNewIteratorWithStartAndPrefixRequest {
rkuris marked this conversation as resolved.
Show resolved Hide resolved
uint32 id = 1;
uint64 id = 1;
bytes start = 2;
bytes prefix = 3;
}

message ViewNewIteratorWithStartAndPrefixResponse {
uint32 id = 1;
uint64 id = 1;
}

message IteratorNextRequest {
uint32 id = 1;
uint64 id = 1;
}

message IteratorNextResponse {
PutRequest data = 1;
}

message IteratorErrorRequest {
uint32 id = 1;
uint64 id = 1;
}

message IteratorReleaseRequest {
uint32 id = 1;
uint64 id = 1;
}

message ViewReleaseRequest {
Expand Down
5 changes: 5 additions & 0 deletions grpc-testtool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ pub mod process_server {
tonic::include_proto!("process");
}

pub mod merkle {
#![allow(clippy::unwrap_used, clippy::missing_const_for_fn)]
tonic::include_proto!("merkle");
}

pub mod service;

pub use service::Database as DatabaseService;
66 changes: 59 additions & 7 deletions grpc-testtool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
// See the file LICENSE.md for licensing terms.

use firewood::db::{Db, DbConfig};
use firewood::merkle::MerkleKeyValueStream;
use firewood::storage::WalConfig;
use firewood::v2::{api::Db as _, api::Error};

use std::fmt::Debug;
use std::path::Path;
use std::pin::Pin;
use std::sync::atomic::AtomicU32;
use std::{
collections::HashMap,
ops::Deref,
Expand All @@ -19,6 +23,7 @@ use tonic::Status;

pub mod database;
pub mod db;
pub mod merkle;
pub mod process;

trait IntoStatusResultExt<T> {
Expand All @@ -44,6 +49,45 @@ impl<T> IntoStatusResultExt<T> for Result<T, Error> {
pub struct Database {
db: Db,
iterators: Arc<Mutex<Iterators>>,
views: Arc<Mutex<Views>>,
}
#[derive(Default, Debug)]
struct Views {
map: HashMap<u32, View>,
next_id: AtomicU32,
}

impl Views {
fn insert(&mut self, view: View) -> u32 {
let next_id = self.next_id.fetch_add(1, Ordering::Relaxed);
self.map.insert(next_id, view);
next_id
}

fn delete(&mut self, view_id: u32) -> Option<View> {
self.map.remove(&view_id)
}

fn get(&self, view_id: u32) -> Option<&View> {
self.map.get(&view_id)
}
}

use futures::Stream;

enum View {
Historical(Arc<<firewood::db::Db as firewood::v2::api::Db>::Historical>),
Proposal(Arc<<firewood::db::Db as firewood::v2::api::Db>::Proposal>),
}

// TODO: We manually implement Debug since Proposal does not, but probably should
impl Debug for View {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Historical(arg0) => f.debug_tuple("Historical").field(arg0).finish(),
Self::Proposal(_arg0) => f.debug_tuple("Proposal").finish(),
}
}
}

impl Database {
Expand All @@ -62,6 +106,7 @@ impl Database {
Ok(Self {
db,
iterators: Default::default(),
views: Default::default(),
})
}
}
Expand All @@ -81,28 +126,35 @@ impl Database {
}
}

// TODO: implement Iterator
#[derive(Debug)]
struct Iter;
trait DebugStream: Stream + Debug + Send {}

impl<S: Send + Sync + Debug + firewood::shale::ShaleStore<firewood::merkle::Node>, T: Send + Sync + Debug> DebugStream for MerkleKeyValueStream<'_, S, T> {}
type Iter = dyn DebugStream<Item = Result<(Box<[u8]>, Vec<u8>), firewood::v2::api::Error>>;

type IteratorID = u64;

#[derive(Default, Debug)]
struct Iterators {
map: HashMap<u64, Iter>,
map: HashMap<IteratorID, Pin<Box<Iter>>>,
next_id: AtomicU64,
}

impl Iterators {
fn insert(&mut self, iter: Iter) -> u64 {
fn insert(&mut self, iter: Pin<Box<Iter>>) -> IteratorID {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
self.map.insert(id, iter);
id
}

fn _get(&self, id: u64) -> Option<&Iter> {
fn get(&self, id: IteratorID) -> Option<&Pin<Box<Iter>>> {
self.map.get(&id)
}

fn remove(&mut self, id: u64) {
fn get_mut(&mut self, id: IteratorID) -> Option<&mut Pin<Box<Iter>>> {
self.map.get_mut(&id)
}

fn remove(&mut self, id: IteratorID) {
self.map.remove(&id);
}
}
Expand Down
15 changes: 4 additions & 11 deletions grpc-testtool/src/service/database.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (C) 2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE.md for licensing terms.

use super::{Database as DatabaseService, IntoStatusResultExt, Iter};
use super::{Database as DatabaseService, IntoStatusResultExt};
use crate::rpcdb::{
database_server::Database, CloseRequest, CloseResponse, CompactRequest, CompactResponse,
DeleteRequest, DeleteResponse, GetRequest, GetResponse, HasRequest, HasResponse,
Expand Down Expand Up @@ -113,20 +113,13 @@ impl Database for DatabaseService {

async fn new_iterator_with_start_and_prefix(
&self,
request: Request<NewIteratorWithStartAndPrefixRequest>,
req: Request<NewIteratorWithStartAndPrefixRequest>,
) -> Result<Response<NewIteratorWithStartAndPrefixResponse>, Status> {
let NewIteratorWithStartAndPrefixRequest {
start: _,
prefix: _,
} = request.into_inner();

// TODO: create the actual iterator
let id = {
let mut iters = self.iterators.lock().await;
iters.insert(Iter)
};

Ok(Response::new(NewIteratorWithStartAndPrefixResponse { id }))
} = req.into_inner();
Err(Status::unimplemented("new_iterator_with_start_and_prefix not implemented"))
}

async fn iterator_next(
Expand Down
Loading
Loading