diff --git a/Cargo.lock b/Cargo.lock index 95a2dea10..aeb11130a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2544,6 +2544,16 @@ dependencies = [ "dirs-sys", ] +[[package]] +name = "dirs-next" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" +dependencies = [ + "cfg-if", + "dirs-sys-next", +] + [[package]] name = "dirs-sys" version = "0.4.1" @@ -2556,6 +2566,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "dirs-sys-next" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -3001,6 +3022,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -3024,6 +3054,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "getset" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f636605b743120a8d32ed92fc27b6cde1a769f8f936c065151eb66f88ded513c" +dependencies = [ + "proc-macro-error2", + "proc-macro2", + "quote", + "syn 2.0.85", +] + [[package]] name = "gimli" version = "0.28.1" @@ -5437,6 +5479,36 @@ dependencies = [ "prost", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +dependencies = [ + "bytes", +] + +[[package]] +name = "protobuf-build" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2df9942df2981178a930a72d442de47e2f0df18ad68e50a30f816f1848215ad0" +dependencies = [ + "bitflags 1.3.2", + "protobuf", + "protobuf-codegen", + "regex", +] + +[[package]] +name = "protobuf-codegen" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "033460afb75cf755fcfc16dfaed20b86468082a2ea24e05ac35ab4a099a017d6" +dependencies = [ + "protobuf", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -5554,6 +5626,36 @@ dependencies = [ "nibble_vec", ] +[[package]] +name = "raft" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f12688b23a649902762d4c11d854d73c49c9b93138f2de16403ef9f571ad5bae" +dependencies = [ + "bytes", + "fxhash", + "getset", + "protobuf", + "raft-proto", + "rand", + "slog", + "slog-envlogger", + "slog-stdlog", + "slog-term", + "thiserror", +] + +[[package]] +name = "raft-proto" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb6884896294f553e8d5cfbdb55080b9f5f2f43394afff59c9f077e0f4b46d6b" +dependencies = [ + "bytes", + "protobuf", + "protobuf-build", +] + [[package]] name = "rand" version = "0.8.5" @@ -6398,6 +6500,7 @@ dependencies = [ "bytestring", "codederror", "derive_builder", + "derive_more", "flexbuffers", "futures", "googletest", @@ -6407,12 +6510,15 @@ dependencies = [ "hyper-util", "prost", "prost-types", + "protobuf", + "raft", "restate-core", "restate-rocksdb", "restate-types", "rust-rocksdb", "schemars", "serde", + "slog", "static_assertions", "tempfile", "test-log", @@ -6427,7 +6533,9 @@ dependencies = [ "tower", "tower-http 0.5.2", "tracing", + "tracing-slog", "tracing-subscriber", + "ulid", ] [[package]] @@ -7755,6 +7863,74 @@ dependencies = [ "autocfg", ] +[[package]] +name = "slog" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8347046d4ebd943127157b94d63abb990fcf729dc4e9978927fdf4ac3c998d06" + +[[package]] +name = "slog-async" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72c8038f898a2c79507940990f05386455b3a317d8f18d4caea7cbc3d5096b84" +dependencies = [ + "crossbeam-channel", + "slog", + "take_mut", + "thread_local", +] + +[[package]] +name = "slog-envlogger" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "906a1a0bc43fed692df4b82a5e2fbfc3733db8dad8bb514ab27a4f23ad04f5c0" +dependencies = [ + "log", + "regex", + "slog", + "slog-async", + "slog-scope", + "slog-stdlog", + "slog-term", +] + +[[package]] +name = "slog-scope" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f95a4b4c3274cd2869549da82b57ccc930859bdbf5bcea0424bc5f140b3c786" +dependencies = [ + "arc-swap", + "lazy_static", + "slog", +] + +[[package]] +name = "slog-stdlog" +version = "4.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6706b2ace5bbae7291d3f8d2473e2bfab073ccd7d03670946197aec98471fa3e" +dependencies = [ + "log", + "slog", + "slog-scope", +] + +[[package]] +name = "slog-term" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6e022d0b998abfe5c3782c1f03551a596269450ccd677ea51c56f8b214610e8" +dependencies = [ + "is-terminal", + "slog", + "term", + "thread_local", + "time", +] + [[package]] name = "smallvec" version = "1.13.2" @@ -8051,6 +8227,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" +[[package]] +name = "take_mut" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" + [[package]] name = "tap" version = "1.0.1" @@ -8069,6 +8251,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "term" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f" +dependencies = [ + "dirs-next", + "rustversion", + "winapi", +] + [[package]] name = "termcolor" version = "1.4.1" @@ -8636,6 +8829,17 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-slog" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9306d2ca06aa9dfc8aa729ff884e9dca181f588a298cc5c59d4fdd91372570bf" +dependencies = [ + "once_cell", + "slog", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" @@ -8782,6 +8986,7 @@ checksum = "34778c17965aa2a08913b57e1f34db9b4a63f5de31768b55bf20d2795f921259" dependencies = [ "getrandom", "rand", + "serde", "web-time", ] diff --git a/crates/metadata-store/Cargo.toml b/crates/metadata-store/Cargo.toml index b04d107c8..40ea78a0e 100644 --- a/crates/metadata-store/Cargo.toml +++ b/crates/metadata-store/Cargo.toml @@ -18,10 +18,12 @@ restate-rocksdb = { workspace = true } restate-types = { workspace = true } anyhow = { workspace = true } +assert2 = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } derive_builder = { workspace = true } +derive_more = { workspace = true } futures = { workspace = true } http = { workspace = true } humantime = { workspace = true } @@ -29,9 +31,12 @@ hyper = { workspace = true } hyper-util = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } +protobuf = "2.28.0" +raft = { version = "0.7.0" } rocksdb = { workspace = true } schemars = { workspace = true, optional = true } serde = { workspace = true } +slog = { version = "2.7.0" } static_assertions = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } @@ -43,6 +48,8 @@ tonic-health = { workspace = true } tower = { workspace = true } tower-http = { workspace = true, features = ["trace"] } tracing = { workspace = true } +tracing-slog = { version = "0.3.0" } +ulid = { workspace = true, features = ["serde"] } [dev-dependencies] restate-core = { workspace = true, features = ["test-util"] } diff --git a/crates/metadata-store/build.rs b/crates/metadata-store/build.rs index 6c2386c90..2e16327e7 100644 --- a/crates/metadata-store/build.rs +++ b/crates/metadata-store/build.rs @@ -21,5 +21,11 @@ fn main() -> Result<(), Box> { .protoc_arg("--experimental_allow_proto3_optional") .compile_protos(&["./proto/metadata_store_svc.proto"], &["proto"])?; + tonic_build::configure() + .bytes(["."]) + .file_descriptor_set_path(out_dir.join("metadata_store_network_svc.bin")) + .protoc_arg("--experimental_allow_proto3_optional") + .compile_protos(&["./proto/metadata_store_network_svc.proto"], &["proto"])?; + Ok(()) } diff --git a/crates/metadata-store/proto/metadata_store_network_svc.proto b/crates/metadata-store/proto/metadata_store_network_svc.proto new file mode 100644 index 000000000..ea6c1aa3e --- /dev/null +++ b/crates/metadata-store/proto/metadata_store_network_svc.proto @@ -0,0 +1,24 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate service protocol, which is +// released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/proto/blob/main/LICENSE + +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +package dev.restate.metadata_store_network_svc; + +// Grpc service definition for the metadata store network implementation. +service MetadataStoreNetworkSvc { + rpc ConnectTo(stream NetworkMessage) returns (stream NetworkMessage); +} + +message NetworkMessage { + bytes payload = 1; +} + diff --git a/crates/metadata-store/src/local/grpc/client.rs b/crates/metadata-store/src/grpc/client.rs similarity index 93% rename from crates/metadata-store/src/local/grpc/client.rs rename to crates/metadata-store/src/grpc/client.rs index 4d5362281..1ff30a203 100644 --- a/crates/metadata-store/src/local/grpc/client.rs +++ b/crates/metadata-store/src/grpc/client.rs @@ -21,17 +21,17 @@ use restate_core::network::net_util::CommonClientConnectionOptions; use restate_types::net::AdvertisedAddress; use restate_types::Version; +use crate::grpc::pb_conversions::ConversionError; use crate::grpc_svc::metadata_store_svc_client::MetadataStoreSvcClient; use crate::grpc_svc::{DeleteRequest, GetRequest, PutRequest}; -use crate::local::grpc::pb_conversions::ConversionError; -/// Client end to interact with the [`LocalMetadataStore`]. +/// Client end to interact with the metadata store. #[derive(Debug, Clone)] -pub struct LocalMetadataStoreClient { +pub struct GrpcMetadataStoreClient { svc_client: MetadataStoreSvcClient, } -impl LocalMetadataStoreClient { +impl GrpcMetadataStoreClient { pub fn new( metadata_store_address: AdvertisedAddress, options: &T, @@ -45,7 +45,7 @@ impl LocalMetadataStoreClient { } #[async_trait] -impl MetadataStore for LocalMetadataStoreClient { +impl MetadataStore for GrpcMetadataStoreClient { async fn get(&self, key: ByteString) -> Result, ReadError> { let response = self .svc_client diff --git a/crates/metadata-store/src/local/grpc/handler.rs b/crates/metadata-store/src/grpc/handler.rs similarity index 89% rename from crates/metadata-store/src/local/grpc/handler.rs rename to crates/metadata-store/src/grpc/handler.rs index 9a72aa6fa..d048129d4 100644 --- a/crates/metadata-store/src/local/grpc/handler.rs +++ b/crates/metadata-store/src/grpc/handler.rs @@ -8,28 +8,28 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::grpc::pb_conversions::ConversionError; use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvc; use crate::grpc_svc::{DeleteRequest, GetRequest, GetResponse, GetVersionResponse, PutRequest}; -use crate::local::grpc::pb_conversions::ConversionError; -use crate::local::store::{Error, MetadataStoreRequest, RequestSender}; +use crate::{MetadataStoreRequest, RequestError, RequestSender}; use async_trait::async_trait; use tokio::sync::oneshot; use tonic::{Request, Response, Status}; -/// Grpc svc handler for the [`LocalMetadataStore`]. +/// Grpc svc handler for the metadata store. #[derive(Debug)] -pub struct LocalMetadataStoreHandler { +pub struct MetadataStoreHandler { request_tx: RequestSender, } -impl LocalMetadataStoreHandler { +impl MetadataStoreHandler { pub fn new(request_tx: RequestSender) -> Self { Self { request_tx } } } #[async_trait] -impl MetadataStoreSvc for LocalMetadataStoreHandler { +impl MetadataStoreSvc for MetadataStoreHandler { async fn get(&self, request: Request) -> Result, Status> { let (result_tx, result_rx) = oneshot::channel(); @@ -129,10 +129,11 @@ impl MetadataStoreSvc for LocalMetadataStoreHandler { } } -impl From for Status { - fn from(err: Error) -> Self { +impl From for Status { + fn from(err: RequestError) -> Self { match err { - Error::FailedPrecondition(msg) => Status::failed_precondition(msg), + RequestError::FailedPrecondition(err) => Status::failed_precondition(err.to_string()), + RequestError::Unavailable(err) => Status::unavailable(err.to_string()), err => Status::internal(err.to_string()), } } diff --git a/crates/metadata-store/src/local/grpc/mod.rs b/crates/metadata-store/src/grpc/mod.rs similarity index 99% rename from crates/metadata-store/src/local/grpc/mod.rs rename to crates/metadata-store/src/grpc/mod.rs index 0efb8ab78..8499e7eeb 100644 --- a/crates/metadata-store/src/local/grpc/mod.rs +++ b/crates/metadata-store/src/grpc/mod.rs @@ -10,6 +10,8 @@ pub mod client; pub mod handler; +pub mod server; +pub mod service_builder; pub mod pb_conversions { use crate::grpc_svc; diff --git a/crates/metadata-store/src/grpc/server.rs b/crates/metadata-store/src/grpc/server.rs new file mode 100644 index 000000000..f5d0b9a32 --- /dev/null +++ b/crates/metadata-store/src/grpc/server.rs @@ -0,0 +1,76 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use http::Request; +use hyper::body::Incoming; +use hyper_util::service::TowerToHyperService; +use restate_core::network::net_util; +use restate_core::ShutdownError; +use restate_types::health::HealthStatus; +use restate_types::net::BindAddress; +use restate_types::protobuf::common::MetadataServerStatus; +use tonic::body::boxed; +use tonic::service::Routes; +use tower::ServiceExt; +use tower_http::classify::{GrpcCode, GrpcErrorsAsFailures, SharedClassifier}; + +pub struct GrpcServer { + bind_address: BindAddress, + routes: Routes, +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed running grpc server: {0}")] + GrpcServer(#[from] net_util::Error), + #[error("system is shutting down")] + Shutdown(#[from] ShutdownError), +} + +impl GrpcServer { + pub fn new(bind_address: BindAddress, routes: Routes) -> Self { + Self { + bind_address, + routes, + } + } + + pub async fn run(self, health_status: HealthStatus) -> Result<(), Error> { + let span_factory = tower_http::trace::DefaultMakeSpan::new() + .include_headers(true) + .level(tracing::Level::ERROR); + + let trace_layer = tower_http::trace::TraceLayer::new(SharedClassifier::new( + GrpcErrorsAsFailures::new().with_success(GrpcCode::FailedPrecondition), + )) + .make_span_with(span_factory); + + let server_builder = tonic::transport::Server::builder() + .layer(trace_layer) + .add_routes(self.routes); + + let service = TowerToHyperService::new( + server_builder + .into_service() + .map_request(|req: Request| req.map(boxed)), + ); + + net_util::run_hyper_server( + &self.bind_address, + service, + "metadata-store-grpc", + || health_status.update(MetadataServerStatus::Ready), + || health_status.update(MetadataServerStatus::Unknown), + ) + .await?; + + Ok(()) + } +} diff --git a/crates/metadata-store/src/grpc/service_builder.rs b/crates/metadata-store/src/grpc/service_builder.rs new file mode 100644 index 000000000..6f5540fcf --- /dev/null +++ b/crates/metadata-store/src/grpc/service_builder.rs @@ -0,0 +1,81 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use http::{Request, Response}; +use std::convert::Infallible; +use tonic::body::BoxBody; +use tonic::server::NamedService; +use tonic::service::{Routes, RoutesBuilder}; +use tonic_health::ServingStatus; +use tower::Service; + +#[derive(Debug)] +pub struct GrpcServiceBuilder<'a> { + reflection_service_builder: Option>, + routes_builder: RoutesBuilder, + svc_names: Vec<&'static str>, +} + +impl<'a> Default for GrpcServiceBuilder<'a> { + fn default() -> Self { + let routes_builder = RoutesBuilder::default(); + + Self { + reflection_service_builder: Some(tonic_reflection::server::Builder::configure()), + routes_builder, + svc_names: Vec::default(), + } + } +} + +impl<'a> GrpcServiceBuilder<'a> { + pub fn add_service(&mut self, svc: S) + where + S: Service, Response = Response, Error = Infallible> + + NamedService + + Clone + + Send + + 'static, + S::Future: Send + 'static, + { + self.svc_names.push(S::NAME); + self.routes_builder.add_service(svc); + } + + pub fn register_file_descriptor_set_for_reflection<'b: 'a>( + &mut self, + encoded_file_descriptor_set: &'b [u8], + ) { + self.reflection_service_builder = Some( + self.reflection_service_builder + .take() + .expect("be present") + .register_encoded_file_descriptor_set(encoded_file_descriptor_set), + ); + } + + pub async fn build(mut self) -> Result { + let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); + + for svc_name in self.svc_names { + health_reporter + .set_service_status(svc_name, ServingStatus::Serving) + .await; + } + + self.routes_builder.add_service(health_service); + self.routes_builder.add_service( + self.reflection_service_builder + .expect("be present") + .build_v1()?, + ); + Ok(self.routes_builder.routes()) + } +} diff --git a/crates/metadata-store/src/lib.rs b/crates/metadata-store/src/lib.rs index 5704fbf5e..fc2eaefe8 100644 --- a/crates/metadata-store/src/lib.rs +++ b/crates/metadata-store/src/lib.rs @@ -8,9 +8,132 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +mod grpc; mod grpc_svc; pub mod local; +mod network; +pub mod raft; +mod util; +use bytestring::ByteString; +use restate_core::metadata_store::VersionedValue; pub use restate_core::metadata_store::{ MetadataStoreClient, Precondition, ReadError, ReadModifyWriteError, WriteError, }; +use restate_core::ShutdownError; +use restate_types::errors::GenericError; +use restate_types::storage::{StorageDecodeError, StorageEncodeError}; +use restate_types::Version; +use tokio::sync::{mpsc, oneshot}; + +pub type BoxedMetadataStoreService = Box; + +pub type RequestSender = mpsc::Sender; +pub type RequestReceiver = mpsc::Receiver; + +#[derive(Debug, thiserror::Error)] +pub enum RequestError { + #[error("internal error: {0}")] + Internal(GenericError), + #[error("service currently unavailable: {0}")] + Unavailable(GenericError), + #[error("failed precondition: {0}")] + FailedPrecondition(#[from] PreconditionViolation), + #[error("invalid argument: {0}")] + InvalidArgument(String), + #[error("encode error: {0}")] + Encode(#[from] StorageEncodeError), + #[error("decode error: {0}")] + Decode(#[from] StorageDecodeError), +} + +#[derive(Debug, thiserror::Error)] +pub enum PreconditionViolation { + #[error("key-value pair already exists")] + Exists, + #[error("expected version '{expected}' but found version '{actual:?}'")] + VersionMismatch { + expected: Version, + actual: Option, + }, +} + +impl PreconditionViolation { + fn kv_pair_exists() -> Self { + PreconditionViolation::Exists + } + + fn version_mismatch(expected: Version, actual: Option) -> Self { + PreconditionViolation::VersionMismatch { expected, actual } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("error while running server grpc reflection service: {0}")] + GrpcReflection(#[from] tonic_reflection::server::Error), + #[error("system is shutting down")] + Shutdown(#[from] ShutdownError), + #[error(transparent)] + Generic(#[from] GenericError), +} + +impl Error { + pub fn generic(err: impl Into) -> Error { + Error::Generic(err.into()) + } +} + +#[async_trait::async_trait] +pub trait MetadataStoreServiceBoxed { + async fn run_boxed(self: Box) -> Result<(), Error>; +} + +#[async_trait::async_trait] +impl MetadataStoreServiceBoxed for T { + async fn run_boxed(self: Box) -> Result<(), Error> { + (*self).run().await + } +} + +#[async_trait::async_trait] +pub trait MetadataStoreService: MetadataStoreServiceBoxed + Send { + async fn run(self) -> Result<(), Error>; + + fn boxed(self) -> BoxedMetadataStoreService + where + Self: Sized + 'static, + { + Box::new(self) + } +} + +#[async_trait::async_trait] +impl MetadataStoreService for Box { + async fn run(self) -> Result<(), Error> { + self.run_boxed().await + } +} + +#[derive(Debug)] +pub enum MetadataStoreRequest { + Get { + key: ByteString, + result_tx: oneshot::Sender, RequestError>>, + }, + GetVersion { + key: ByteString, + result_tx: oneshot::Sender, RequestError>>, + }, + Put { + key: ByteString, + value: VersionedValue, + precondition: Precondition, + result_tx: oneshot::Sender>, + }, + Delete { + key: ByteString, + precondition: Precondition, + result_tx: oneshot::Sender>, + }, +} diff --git a/crates/metadata-store/src/local/mod.rs b/crates/metadata-store/src/local/mod.rs index 8bf6d8988..57b8afe0a 100644 --- a/crates/metadata-store/src/local/mod.rs +++ b/crates/metadata-store/src/local/mod.rs @@ -8,7 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -mod grpc; mod store; mod service; @@ -19,10 +18,11 @@ use restate_types::{ errors::GenericError, }; -use crate::local::grpc::client::LocalMetadataStoreClient; -pub use service::{Error, LocalMetadataStoreService}; +use crate::grpc::client::GrpcMetadataStoreClient; -/// Creates a [`MetadataStoreClient`]. +pub use service::LocalMetadataStoreService; + +/// Creates a [`MetadataStoreClient`] for the [`GrpcMetadataStoreClient`]. pub async fn create_client( metadata_store_client_options: MetadataStoreClientOptions, ) -> Result { @@ -34,8 +34,9 @@ pub async fn create_client( let client = match metadata_store_client_options.metadata_store_client.clone() { MetadataStoreClientConfig::Embedded { address } => { - let store = LocalMetadataStoreClient::new(address, &metadata_store_client_options); - MetadataStoreClient::new(store, backoff_policy) + let inner_client = + GrpcMetadataStoreClient::new(address, &metadata_store_client_options); + MetadataStoreClient::new(inner_client, backoff_policy) } MetadataStoreClientConfig::Etcd { addresses } => { let store = EtcdMetadataStore::new(addresses, &metadata_store_client_options).await?; diff --git a/crates/metadata-store/src/local/service.rs b/crates/metadata-store/src/local/service.rs index 17cf85d4a..0db76664e 100644 --- a/crates/metadata-store/src/local/service.rs +++ b/crates/metadata-store/src/local/service.rs @@ -8,45 +8,25 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use http::Request; -use hyper::body::Incoming; -use hyper_util::service::TowerToHyperService; -use restate_types::health::HealthStatus; -use tonic::body::boxed; -use tonic::server::NamedService; -use tower::ServiceExt; -use tower_http::classify::{GrpcCode, GrpcErrorsAsFailures, SharedClassifier}; - -use restate_core::network::net_util; -use restate_core::{ShutdownError, TaskCenter, TaskKind}; -use restate_rocksdb::RocksError; +use crate::grpc::handler::MetadataStoreHandler; +use crate::grpc::server::GrpcServer; +use crate::grpc::service_builder::GrpcServiceBuilder; +use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; +use crate::local::store::LocalMetadataStore; +use crate::{grpc_svc, Error, MetadataStoreService}; +use futures::TryFutureExt; +use restate_core::{TaskCenter, TaskKind}; use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; +use restate_types::health::HealthStatus; use restate_types::live::BoxedLiveLoad; use restate_types::protobuf::common::MetadataServerStatus; -use crate::grpc_svc; -use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; -use crate::local::grpc::handler::LocalMetadataStoreHandler; -use crate::local::store::LocalMetadataStore; - pub struct LocalMetadataStoreService { health_status: HealthStatus, opts: BoxedLiveLoad, rocksdb_options: BoxedLiveLoad, } -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("failed running grpc server: {0}")] - GrpcServer(#[from] net_util::Error), - #[error("error while running server server grpc reflection service: {0}")] - GrpcReflection(#[from] tonic_reflection::server::Error), - #[error("system is shutting down")] - Shutdown(#[from] ShutdownError), - #[error("rocksdb error: {0}")] - RocksDB(#[from] RocksError), -} - impl LocalMetadataStoreService { pub fn from_options( health_status: HealthStatus, @@ -60,12 +40,11 @@ impl LocalMetadataStoreService { rocksdb_options, } } +} - pub fn grpc_service_name(&self) -> &str { - MetadataStoreSvcServer::::NAME - } - - pub async fn run(self) -> Result<(), Error> { +#[async_trait::async_trait] +impl MetadataStoreService for LocalMetadataStoreService { + async fn run(self) -> Result<(), Error> { let LocalMetadataStoreService { health_status, mut opts, @@ -73,50 +52,22 @@ impl LocalMetadataStoreService { } = self; let options = opts.live_load(); let bind_address = options.bind_address.clone(); - let store = LocalMetadataStore::create(options, rocksdb_options).await?; - - let trace_layer = tower_http::trace::TraceLayer::new(SharedClassifier::new( - GrpcErrorsAsFailures::new().with_success(GrpcCode::FailedPrecondition), - )) - .make_span_with( - tower_http::trace::DefaultMakeSpan::new() - .include_headers(true) - .level(tracing::Level::ERROR), - ); - - let reflection_service_builder = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set(grpc_svc::FILE_DESCRIPTOR_SET); - - let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); - health_reporter - .set_serving::>() - .await; - - let server_builder = tonic::transport::Server::builder() - .layer(trace_layer) - .add_service(health_service) - .add_service(MetadataStoreSvcServer::new(LocalMetadataStoreHandler::new( - store.request_sender(), - ))) - .add_service(reflection_service_builder.build_v1()?); - - let service = TowerToHyperService::new( - server_builder - .into_service() - .map_request(|req: Request| req.map(boxed)), - ); - - TaskCenter::spawn_child(TaskKind::RpcServer, "metadata-store-grpc", async move { - net_util::run_hyper_server( - &bind_address, - service, - "metadata-store-grpc", - || health_status.update(MetadataServerStatus::Ready), - || health_status.update(MetadataServerStatus::Unknown), - ) - .await?; - Ok(()) - })?; + let store = LocalMetadataStore::create(options, rocksdb_options) + .await + .map_err(|err| Error::Generic(err.into()))?; + let mut builder = GrpcServiceBuilder::default(); + + builder.register_file_descriptor_set_for_reflection(grpc_svc::FILE_DESCRIPTOR_SET); + builder.add_service(MetadataStoreSvcServer::new(MetadataStoreHandler::new( + store.request_sender(), + ))); + let grpc_server = GrpcServer::new(bind_address, builder.build().await?); + + TaskCenter::spawn_child( + TaskKind::RpcServer, + "metadata-store-grpc", + grpc_server.run(health_status).map_err(Into::into), + )?; store.run().await; diff --git a/crates/metadata-store/src/local/store.rs b/crates/metadata-store/src/local/store.rs index 50fa165a0..c94913ad8 100644 --- a/crates/metadata-store/src/local/store.rs +++ b/crates/metadata-store/src/local/store.rs @@ -8,6 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::{ + util, MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender, +}; use bytes::BytesMut; use bytestring::ByteString; use restate_core::cancellation_watcher; @@ -18,75 +21,16 @@ use restate_rocksdb::{ }; use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; use restate_types::live::BoxedLiveLoad; -use restate_types::storage::{ - StorageCodec, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError, -}; +use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode}; use restate_types::Version; -use rocksdb::{BoundColumnFamily, DBCompressionType, WriteBatch, WriteOptions, DB}; +use rocksdb::{BoundColumnFamily, WriteBatch, WriteOptions, DB}; use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use tracing::{debug, trace}; -pub type RequestSender = mpsc::Sender; -pub type RequestReceiver = mpsc::Receiver; - -type Result = std::result::Result; - const DB_NAME: &str = "local-metadata-store"; const KV_PAIRS: &str = "kv_pairs"; -#[derive(Debug)] -pub enum MetadataStoreRequest { - Get { - key: ByteString, - result_tx: oneshot::Sender>>, - }, - GetVersion { - key: ByteString, - result_tx: oneshot::Sender>>, - }, - Put { - key: ByteString, - value: VersionedValue, - precondition: Precondition, - result_tx: oneshot::Sender>, - }, - Delete { - key: ByteString, - precondition: Precondition, - result_tx: oneshot::Sender>, - }, -} - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("storage error: {0}")] - Storage(#[from] rocksdb::Error), - #[error("rocksdb error: {0}")] - RocksDb(#[from] RocksError), - #[error("failed precondition: {0}")] - FailedPrecondition(String), - #[error("invalid argument: {0}")] - InvalidArgument(String), - #[error("encode error: {0}")] - Encode(#[from] StorageEncodeError), - #[error("decode error: {0}")] - Decode(#[from] StorageDecodeError), -} - -impl Error { - fn kv_pair_exists() -> Self { - Error::FailedPrecondition("key-value pair already exists".to_owned()) - } - - fn version_mismatch(expected: Version, actual: Option) -> Self { - Error::FailedPrecondition(format!( - "Expected version '{}' but found version '{:?}'", - expected, actual - )) - } -} - /// Single node metadata store which stores the key value pairs in RocksDB. /// /// In order to avoid issues arising from concurrency, we run the metadata @@ -112,14 +56,18 @@ impl LocalMetadataStore { let db_name = DbName::new(DB_NAME); let db_manager = RocksDbManager::get(); let cfs = vec![CfName::new(KV_PAIRS)]; - let db_spec = DbSpecBuilder::new(db_name.clone(), options.data_dir(), db_options(options)) - .add_cf_pattern( - CfPrefixPattern::ANY, - cf_options(options.rocksdb_memory_budget()), - ) - .ensure_column_families(cfs) - .build() - .expect("valid spec"); + let db_spec = DbSpecBuilder::new( + db_name.clone(), + options.data_dir(), + util::db_options(options), + ) + .add_cf_pattern( + CfPrefixPattern::ANY, + util::cf_options(options.rocksdb_memory_budget()), + ) + .ensure_column_families(cfs) + .build() + .expect("valid spec"); let db = db_manager .open_db(updateable_rocksdb_options.clone(), db_spec) @@ -216,9 +164,12 @@ impl LocalMetadataStore { }; } - fn get(&self, key: &ByteString) -> Result> { + fn get(&self, key: &ByteString) -> Result, RequestError> { let cf_handle = self.kv_cf_handle(); - let slice = self.db.get_pinned_cf(&cf_handle, key)?; + let slice = self + .db + .get_pinned_cf(&cf_handle, key) + .map_err(|err| RequestError::Internal(err.into()))?; if let Some(bytes) = slice { Ok(Some(Self::decode(bytes)?)) @@ -227,9 +178,12 @@ impl LocalMetadataStore { } } - fn get_version(&self, key: &ByteString) -> Result> { + fn get_version(&self, key: &ByteString) -> Result, RequestError> { let cf_handle = self.kv_cf_handle(); - let slice = self.db.get_pinned_cf(&cf_handle, key)?; + let slice = self + .db + .get_pinned_cf(&cf_handle, key) + .map_err(|err| RequestError::Internal(err.into()))?; if let Some(bytes) = slice { // todo only deserialize the version part @@ -245,7 +199,7 @@ impl LocalMetadataStore { key: &ByteString, value: &VersionedValue, precondition: Precondition, - ) -> Result<()> { + ) -> Result<(), RequestError> { match precondition { Precondition::None => Ok(self.write_versioned_kv_pair(key, value).await?), Precondition::DoesNotExist => { @@ -253,7 +207,7 @@ impl LocalMetadataStore { if current_version.is_none() { Ok(self.write_versioned_kv_pair(key, value).await?) } else { - Err(Error::kv_pair_exists()) + Err(PreconditionViolation::kv_pair_exists())? } } Precondition::MatchesVersion(version) => { @@ -261,7 +215,10 @@ impl LocalMetadataStore { if current_version == Some(version) { Ok(self.write_versioned_kv_pair(key, value).await?) } else { - Err(Error::version_mismatch(version, current_version)) + Err(PreconditionViolation::version_mismatch( + version, + current_version, + ))? } } } @@ -271,7 +228,7 @@ impl LocalMetadataStore { &mut self, key: &ByteString, value: &VersionedValue, - ) -> Result<()> { + ) -> Result<(), RequestError> { self.buffer.clear(); Self::encode(value, &mut self.buffer)?; @@ -279,8 +236,7 @@ impl LocalMetadataStore { let cf_handle = self.kv_cf_handle(); let mut wb = WriteBatch::default(); wb.put_cf(&cf_handle, key, self.buffer.as_ref()); - Ok(self - .rocksdb + self.rocksdb .write_batch( "local-metadata-write-batch", Priority::High, @@ -288,10 +244,11 @@ impl LocalMetadataStore { write_options, wb, ) - .await?) + .await + .map_err(|err| RequestError::Internal(err.into())) } - fn delete(&mut self, key: &ByteString, precondition: Precondition) -> Result<()> { + fn delete(&mut self, key: &ByteString, precondition: Precondition) -> Result<(), RequestError> { match precondition { Precondition::None => self.delete_kv_pair(key), // this condition does not really make sense for the delete operation @@ -302,7 +259,7 @@ impl LocalMetadataStore { // nothing to do Ok(()) } else { - Err(Error::kv_pair_exists()) + Err(PreconditionViolation::kv_pair_exists())? } } Precondition::MatchesVersion(version) => { @@ -311,72 +268,35 @@ impl LocalMetadataStore { if current_version == Some(version) { self.delete_kv_pair(key) } else { - Err(Error::version_mismatch(version, current_version)) + Err(PreconditionViolation::version_mismatch( + version, + current_version, + ))? } } } } - fn delete_kv_pair(&mut self, key: &ByteString) -> Result<()> { + fn delete_kv_pair(&mut self, key: &ByteString) -> Result<(), RequestError> { let write_options = self.write_options(); self.db .delete_cf_opt(&self.kv_cf_handle(), key, &write_options) - .map_err(Into::into) + .map_err(|err| RequestError::Internal(err.into())) } - fn encode(value: &T, buf: &mut BytesMut) -> Result<()> { + fn encode(value: &T, buf: &mut BytesMut) -> Result<(), RequestError> { StorageCodec::encode(value, buf)?; Ok(()) } - fn decode(buf: impl AsRef<[u8]>) -> Result { + fn decode(buf: impl AsRef<[u8]>) -> Result { let value = StorageCodec::decode(&mut buf.as_ref())?; Ok(value) } - fn log_error(result: &Result, request: &str) { + fn log_error(result: &Result, request: &str) { if let Err(err) = &result { debug!("failed to process request '{}': '{}'", request, err) } } } - -fn db_options(_options: &MetadataStoreOptions) -> rocksdb::Options { - rocksdb::Options::default() -} - -fn cf_options( - memory_budget: usize, -) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static { - move |mut opts| { - set_memory_related_opts(&mut opts, memory_budget); - opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); - opts.set_num_levels(3); - - opts.set_compression_per_level(&[ - DBCompressionType::None, - DBCompressionType::None, - DBCompressionType::Zstd, - ]); - - // - opts - } -} - -fn set_memory_related_opts(opts: &mut rocksdb::Options, memtables_budget: usize) { - // We set the budget to allow 1 mutable + 3 immutable. - opts.set_write_buffer_size(memtables_budget / 4); - - // merge 2 memtables when flushing to L0 - opts.set_min_write_buffer_number_to_merge(2); - opts.set_max_write_buffer_number(4); - // start flushing L0->L1 as soon as possible. each file on level0 is - // (memtable_memory_budget / 2). This will flush level 0 when it's bigger than - // memtable_memory_budget. - opts.set_level_zero_file_num_compaction_trigger(2); - // doesn't really matter much, but we don't want to create too many files - opts.set_target_file_size_base(memtables_budget as u64 / 8); - // make Level1 size equal to Level0 size, so that L0->L1 compactions are fast - opts.set_max_bytes_for_level_base(memtables_budget as u64); -} diff --git a/crates/metadata-store/src/local/tests.rs b/crates/metadata-store/src/local/tests.rs index 96fb9e82d..250f75572 100644 --- a/crates/metadata-store/src/local/tests.rs +++ b/crates/metadata-store/src/local/tests.rs @@ -27,9 +27,9 @@ use restate_types::net::{AdvertisedAddress, BindAddress}; use restate_types::protobuf::common::MetadataServerStatus; use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned}; -use crate::local::grpc::client::LocalMetadataStoreClient; +use crate::grpc::client::GrpcMetadataStoreClient; use crate::local::service::LocalMetadataStoreService; -use crate::{MetadataStoreClient, Precondition, WriteError}; +use crate::{MetadataStoreClient, MetadataStoreService, Precondition, WriteError}; #[derive(Debug, Clone, PartialOrd, PartialEq, Serialize, Deserialize)] struct Value { @@ -276,7 +276,7 @@ async fn durable_storage() -> anyhow::Result<()> { Ok(()) } -/// Creates a test environment with the [`RocksDBMetadataStore`] and a [`MetadataStoreClient`] +/// Creates a test environment with the [`RocksDBMetadataStore`] and a [`GrpcMetadataStoreClient`] /// connected to it. async fn create_test_environment( opts: &MetadataStoreOptions, @@ -341,9 +341,9 @@ async fn start_metadata_store( .wait_for_value(MetadataServerStatus::Ready) .await; - let rocksdb_client = LocalMetadataStoreClient::new(address, &metadata_store_client_options); + let grpc_client = GrpcMetadataStoreClient::new(address, &metadata_store_client_options); let client = MetadataStoreClient::new( - rocksdb_client, + grpc_client, Some(metadata_store_client_options.metadata_store_client_backoff_policy), ); diff --git a/crates/metadata-store/src/network/connection_manager.rs b/crates/metadata-store/src/network/connection_manager.rs new file mode 100644 index 000000000..1ace09587 --- /dev/null +++ b/crates/metadata-store/src/network/connection_manager.rs @@ -0,0 +1,193 @@ +// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::network::Message; +use crate::network::NetworkMessage; +use futures::StreamExt; +use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; +use tokio_stream::wrappers::ReceiverStream; +use tonic::codegen::BoxStream; +use tracing::{debug, instrument}; + +#[derive(Debug, thiserror::Error)] +pub enum ConnectionError { + #[error("internal error: {0}")] + Internal(String), + #[error(transparent)] + Shutdown(#[from] ShutdownError), +} + +#[derive(Clone, derive_more::Debug)] +pub struct ConnectionManager { + inner: Arc>, +} + +impl ConnectionManager +where + M: Message + Send + 'static, +{ + pub fn new(identity: u64, router: mpsc::Sender) -> Self { + ConnectionManager { + inner: Arc::new(ConnectionManagerInner::new(identity, router)), + } + } + + pub fn identity(&self) -> u64 { + self.inner.identity + } + + pub fn accept_connection( + &self, + raft_peer: u64, + incoming_rx: tonic::Streaming, + ) -> Result, ConnectionError> { + let (outgoing_tx, outgoing_rx) = mpsc::channel(128); + self.run_connection(raft_peer, outgoing_tx, incoming_rx)?; + + let outgoing_stream = ReceiverStream::new(outgoing_rx) + .map(Result::<_, tonic::Status>::Ok) + .boxed(); + Ok(outgoing_stream) + } + + pub fn run_connection( + &self, + remote_peer: u64, + outgoing_tx: mpsc::Sender, + incoming_rx: tonic::Streaming, + ) -> Result<(), ConnectionError> { + let mut guard = self.inner.connections.lock().unwrap(); + + if guard.contains_key(&remote_peer) { + // we already have a connection established to remote peer + return Ok(()); + } + + let connection = Connection::new(outgoing_tx); + guard.insert(remote_peer, connection); + + let reactor = ConnectionReactor { + remote_peer, + connection_manager: Arc::clone(&self.inner), + }; + + let _task_id = TaskCenter::spawn_child( + TaskKind::ConnectionReactor, + "raft-connection-reactor", + reactor.run(incoming_rx), + )?; + + Ok(()) + } + + pub fn get_connection(&self, target: u64) -> Option { + self.inner.connections.lock().unwrap().get(&target).cloned() + } +} + +struct ConnectionReactor { + remote_peer: u64, + connection_manager: Arc>, +} + +impl ConnectionReactor +where + M: Message, +{ + #[instrument(level = "debug", skip_all, fields(remote_peer = %self.remote_peer))] + async fn run(self, mut incoming_rx: tonic::Streaming) -> anyhow::Result<()> { + let mut shutdown = std::pin::pin!(cancellation_watcher()); + debug!("Run connection reactor"); + + loop { + tokio::select! { + _ = &mut shutdown => { + break; + }, + message = incoming_rx.next() => { + match message { + Some(message) => { + match message { + Ok(message) => { + let message = M::deserialize(&message.payload)?; + + assert_eq!(message.to(), self.connection_manager.identity, "Expect to only receive messages for peer '{}'", self.connection_manager.identity); + + if self.connection_manager.router.send(message).await.is_err() { + // system is shutting down + debug!("System is shutting down; closing connection"); + break; + } + } + Err(err) => { + debug!("Closing connection because received error: {err}"); + break; + } + } + } + None => { + debug!("Remote peer closed connection"); + break + }, + } + } + } + } + + Ok(()) + } +} + +impl Drop for ConnectionReactor { + fn drop(&mut self) { + debug!(remote_peer = %self.remote_peer, "Close connection"); + self.connection_manager + .connections + .lock() + .expect("shouldn't be poisoned") + .remove(&self.remote_peer); + } +} + +#[derive(Debug)] +struct ConnectionManagerInner { + identity: u64, + connections: Mutex>, + router: mpsc::Sender, +} + +impl ConnectionManagerInner { + pub fn new(identity: u64, router: mpsc::Sender) -> Self { + ConnectionManagerInner { + identity, + router, + connections: Mutex::default(), + } + } +} + +#[derive(Debug, Clone)] +pub struct Connection { + tx: mpsc::Sender, +} + +impl Connection { + pub fn new(tx: mpsc::Sender) -> Self { + Connection { tx } + } + + pub fn try_send(&self, message: NetworkMessage) -> Result<(), TrySendError> { + self.tx.try_send(message) + } +} diff --git a/crates/metadata-store/src/network/grpc_svc.rs b/crates/metadata-store/src/network/grpc_svc.rs new file mode 100644 index 000000000..20fc64f48 --- /dev/null +++ b/crates/metadata-store/src/network/grpc_svc.rs @@ -0,0 +1,14 @@ +// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +tonic::include_proto!("dev.restate.metadata_store_network_svc"); + +pub const FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("metadata_store_network_svc"); diff --git a/crates/metadata-store/src/network/handler.rs b/crates/metadata-store/src/network/handler.rs new file mode 100644 index 000000000..811cfad7d --- /dev/null +++ b/crates/metadata-store/src/network/handler.rs @@ -0,0 +1,71 @@ +// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::network::connection_manager::ConnectionError; +use crate::network::grpc_svc::metadata_store_network_svc_server::MetadataStoreNetworkSvc; +use crate::network::grpc_svc::NetworkMessage; +use crate::network::{ConnectionManager, Message}; +use std::str::FromStr; +use tonic::codegen::BoxStream; +use tonic::{Request, Response, Status, Streaming}; + +pub const PEER_METADATA_KEY: &str = "x-restate-metadata-store-peer"; + +#[derive(Debug)] +pub struct MetadataStoreNetworkHandler { + connection_manager: ConnectionManager, +} + +impl MetadataStoreNetworkHandler { + pub fn new(connection_manager: ConnectionManager) -> Self { + Self { connection_manager } + } +} + +#[async_trait::async_trait] +impl MetadataStoreNetworkSvc for MetadataStoreNetworkHandler +where + M: Message + Send + 'static, +{ + type ConnectToStream = BoxStream; + + async fn connect_to( + &self, + request: Request>, + ) -> Result, Status> { + let peer_metadata = + request + .metadata() + .get(PEER_METADATA_KEY) + .ok_or(Status::invalid_argument(format!( + "'{}' is missing", + PEER_METADATA_KEY + )))?; + let peer = u64::from_str( + peer_metadata + .to_str() + .map_err(|err| Status::invalid_argument(err.to_string()))?, + ) + .map_err(|err| Status::invalid_argument(err.to_string()))?; + let outgoing_rx = self + .connection_manager + .accept_connection(peer, request.into_inner())?; + Ok(Response::new(outgoing_rx)) + } +} + +impl From for Status { + fn from(value: ConnectionError) -> Self { + match value { + ConnectionError::Internal(err) => Status::internal(err), + ConnectionError::Shutdown(err) => Status::aborted(err.to_string()), + } + } +} diff --git a/crates/metadata-store/src/network/mod.rs b/crates/metadata-store/src/network/mod.rs new file mode 100644 index 000000000..565bf5330 --- /dev/null +++ b/crates/metadata-store/src/network/mod.rs @@ -0,0 +1,20 @@ +// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +mod connection_manager; +mod grpc_svc; +mod handler; +mod networking; + +pub use connection_manager::ConnectionManager; +pub use grpc_svc::metadata_store_network_svc_server::MetadataStoreNetworkSvcServer; +pub use grpc_svc::{NetworkMessage, FILE_DESCRIPTOR_SET}; +pub use handler::MetadataStoreNetworkHandler; +pub use networking::{Message, Networking}; diff --git a/crates/metadata-store/src/network/networking.rs b/crates/metadata-store/src/network/networking.rs new file mode 100644 index 000000000..fb668fec7 --- /dev/null +++ b/crates/metadata-store/src/network/networking.rs @@ -0,0 +1,168 @@ +// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::network::connection_manager::ConnectionManager; +use crate::network::handler::PEER_METADATA_KEY; +use crate::network::NetworkMessage; +use bytes::{BufMut, Bytes, BytesMut}; +use futures::FutureExt; +use restate_core::network::net_util; +use restate_core::{ShutdownError, TaskCenter, TaskHandle, TaskKind}; +use restate_types::config::{Configuration, NetworkingOptions}; +use restate_types::net::AdvertisedAddress; +use std::collections::HashMap; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::metadata::MetadataValue; +use tonic::IntoStreamingRequest; +use tracing::{debug, trace}; + +#[derive(Debug, thiserror::Error)] +pub enum TrySendError { + #[error("failed sending message")] + Send(T), + #[error("connecting to peer")] + Connecting(T), + #[error("unknown peer: {0}")] + UnknownPeer(u64), + #[error(transparent)] + Shutdown(#[from] ShutdownError), +} + +#[derive(derive_more::Debug)] +pub struct Networking { + connection_manager: ConnectionManager, + addresses: HashMap, + #[debug(skip)] + connection_attempts: HashMap>>, + serde_buffer: BytesMut, +} + +impl Networking +where + M: Message + Clone + Send + 'static, +{ + pub fn new(connection_manager: ConnectionManager) -> Self { + Networking { + connection_manager, + addresses: HashMap::default(), + connection_attempts: HashMap::default(), + serde_buffer: BytesMut::with_capacity(1024), + } + } + + pub fn register_address(&mut self, peer: u64, address: AdvertisedAddress) { + self.addresses.insert(peer, address); + } + + pub fn try_send(&mut self, target: u64, message: M) -> Result<(), TrySendError> { + if let Some(connection) = self.connection_manager.get_connection(target) { + message.serialize(&mut self.serde_buffer); + + // todo: Maybe send message directly w/o indirection through NetworkMessage + let network_message = NetworkMessage { + payload: self.serde_buffer.split().freeze(), + }; + + connection + .try_send(network_message) + .map_err(|_err| TrySendError::Send(message))?; + } else if let Some(address) = self.addresses.get(&target) { + if let Some(task_handle) = self.connection_attempts.remove(&target) { + if !task_handle.is_finished() { + return Err(TrySendError::Connecting(message)); + } else { + match task_handle.now_or_never().expect("should be finished") { + Ok(result) => { + match result { + Ok(_) => trace!("Previous connection attempt to '{target}' succeeded but connection was closed in meantime."), + Err(err) => trace!("Previous connection attempt to '{target}' failed: {}", err) + } + + } + Err(err) => { + trace!("Previous connection attempt to '{target}' panicked: {}", err) + } + } + } + } + + self.connection_attempts.insert( + target, + Self::try_connecting_to( + self.connection_manager.clone(), + target, + address.clone(), + &Configuration::pinned().networking, + )?, + ); + return Err(TrySendError::Connecting(message)); + } else { + return Err(TrySendError::UnknownPeer(target)); + } + + Ok(()) + } + + fn try_connecting_to( + connection_manager: ConnectionManager, + target: u64, + address: AdvertisedAddress, + networking_options: &NetworkingOptions, + ) -> Result>, ShutdownError> { + TaskCenter::spawn_unmanaged( + TaskKind::RpcConnection, + "metadata-store-network-connection-attempt", + { + trace!(%target, "Try connecting to metadata store peer"); + let channel = net_util::create_tonic_channel_from_advertised_address( + address.clone(), + networking_options, + ); + + async move { + let mut network_client = crate::network::grpc_svc::metadata_store_network_svc_client::MetadataStoreNetworkSvcClient::new(channel); + let (outgoing_tx, outgoing_rx) = mpsc::channel(128); + + let mut request = ReceiverStream::new(outgoing_rx).into_streaming_request(); + // send our identity alongside with the request to the target + request.metadata_mut().insert( + PEER_METADATA_KEY, + MetadataValue::try_from(connection_manager.identity().to_string())?, + ); + + let incoming_rx = network_client.connect_to(request).await?; + + connection_manager.run_connection( + target, + outgoing_tx, + incoming_rx.into_inner(), + )?; + + Ok(()) + } + }, + ) + } +} + +/// A message that can be sent over the network +pub trait Message { + /// The target of the message + fn to(&self) -> u64; + + /// Serialize the message into the buffer + fn serialize(&self, buffer: impl BufMut); + + /// Deserialize the message from the bytes + fn deserialize(bytes: &Bytes) -> anyhow::Result + where + Self: Sized; +} diff --git a/crates/metadata-store/src/raft/mod.rs b/crates/metadata-store/src/raft/mod.rs new file mode 100644 index 000000000..7c14a5b27 --- /dev/null +++ b/crates/metadata-store/src/raft/mod.rs @@ -0,0 +1,13 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub mod service; +mod storage; +mod store; diff --git a/crates/metadata-store/src/raft/service.rs b/crates/metadata-store/src/raft/service.rs new file mode 100644 index 000000000..192037bf1 --- /dev/null +++ b/crates/metadata-store/src/raft/service.rs @@ -0,0 +1,111 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::grpc::handler::MetadataStoreHandler; +use crate::grpc::server::GrpcServer; +use crate::grpc::service_builder::GrpcServiceBuilder; +use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; +use crate::network::{ + ConnectionManager, Message, MetadataStoreNetworkHandler, MetadataStoreNetworkSvcServer, + Networking, +}; +use crate::raft::store::RaftMetadataStore; +use crate::{grpc_svc, network, Error, MetadataStoreService}; +use anyhow::Context; +use assert2::let_assert; +use bytes::{BufMut, Bytes}; +use futures::TryFutureExt; +use protobuf::Message as ProtobufMessage; +use restate_core::{TaskCenter, TaskKind}; +use restate_types::config::{Kind, MetadataStoreOptions, RocksDbOptions}; +use restate_types::health::HealthStatus; +use restate_types::live::BoxedLiveLoad; +use restate_types::protobuf::common::MetadataServerStatus; +use tokio::sync::mpsc; + +pub struct RaftMetadataStoreService { + health_status: HealthStatus, + options: BoxedLiveLoad, + rocksdb_options: BoxedLiveLoad, +} + +impl RaftMetadataStoreService { + pub fn new( + health_status: HealthStatus, + options: BoxedLiveLoad, + rocksdb_options: BoxedLiveLoad, + ) -> Self { + Self { + options, + rocksdb_options, + health_status, + } + } +} + +#[async_trait::async_trait] +impl MetadataStoreService for RaftMetadataStoreService { + async fn run(mut self) -> Result<(), Error> { + let store_options = self.options.live_load(); + let_assert!(Kind::Raft(raft_options) = &store_options.kind); + + let (router_tx, router_rx) = mpsc::channel(128); + let connection_manager = ConnectionManager::new(raft_options.id, router_tx); + let store = RaftMetadataStore::create( + raft_options, + self.rocksdb_options, + Networking::new(connection_manager.clone()), + router_rx, + ) + .await + .map_err(Error::generic)?; + + let mut builder = GrpcServiceBuilder::default(); + + builder.register_file_descriptor_set_for_reflection(grpc_svc::FILE_DESCRIPTOR_SET); + builder.add_service(MetadataStoreSvcServer::new(MetadataStoreHandler::new( + store.request_sender(), + ))); + + builder.register_file_descriptor_set_for_reflection(network::FILE_DESCRIPTOR_SET); + builder.add_service(MetadataStoreNetworkSvcServer::new( + MetadataStoreNetworkHandler::new(connection_manager), + )); + + let grpc_server = + GrpcServer::new(store_options.bind_address.clone(), builder.build().await?); + + TaskCenter::spawn_child( + TaskKind::RpcServer, + "metadata-store-grpc", + grpc_server.run(self.health_status).map_err(Into::into), + )?; + + store.run().await.map_err(Error::generic)?; + + Ok(()) + } +} + +impl Message for raft::prelude::Message { + fn to(&self) -> u64 { + self.to + } + + fn serialize(&self, buffer: impl BufMut) { + let mut writer = buffer.writer(); + self.write_to_writer(&mut writer) + .expect("should be able to write message"); + } + + fn deserialize(bytes: &Bytes) -> anyhow::Result { + ProtobufMessage::parse_from_carllerche_bytes(bytes).context("failed deserializing message") + } +} diff --git a/crates/metadata-store/src/raft/storage.rs b/crates/metadata-store/src/raft/storage.rs new file mode 100644 index 000000000..b016888bb --- /dev/null +++ b/crates/metadata-store/src/raft/storage.rs @@ -0,0 +1,419 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::util; +use protobuf::{Message, ProtobufError}; +use raft::eraftpb::{ConfState, Entry, Snapshot}; +use raft::prelude::HardState; +use raft::{GetEntriesContext, RaftState, Storage, StorageError}; +use restate_rocksdb::{ + CfName, CfPrefixPattern, DbName, DbSpecBuilder, IoMode, Priority, RocksDb, RocksDbManager, + RocksError, +}; +use restate_types::config::{MetadataStoreOptions, RocksDbOptions}; +use restate_types::live::BoxedLiveLoad; +use rocksdb::{BoundColumnFamily, ReadOptions, WriteBatch, WriteOptions, DB}; +use std::mem::size_of; +use std::sync::Arc; +use std::{error, mem}; + +const DB_NAME: &str = "raft-metadata-store"; +const RAFT_CF: &str = "raft"; + +const FIRST_RAFT_INDEX: u64 = 1; + +const RAFT_ENTRY_DISCRIMINATOR: u8 = 0x01; +const HARD_STATE_DISCRIMINATOR: u8 = 0x02; +const CONF_STATE_DISCRIMINATOR: u8 = 0x03; + +const RAFT_ENTRY_KEY_LENGTH: usize = 9; + +#[derive(Debug, thiserror::Error)] +pub enum BuildError { + #[error("failed creating RocksDb: {0}")] + RocksDb(#[from] RocksError), +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed reading/writing from/to RocksDb: {0}")] + RocksDb(#[from] RocksError), + #[error("failed reading/writing from/to raw RocksDb: {0}")] + RocksDbRaw(#[from] rocksdb::Error), + #[error("failed encoding value: {0}")] + Encode(#[from] ProtobufError), + #[error("index '{index}' is out of bounds; last index is '{last_index}'")] + IndexOutOfBounds { index: u64, last_index: u64 }, + #[error("raft log has been compacted; first index is {0}")] + Compacted(u64), +} + +/// Map our internal error type to [`raft::Error`] to fit the [`Storage`] trait definition. +impl From for raft::Error { + fn from(value: Error) -> Self { + match value { + err @ Error::RocksDb(_) + | err @ Error::RocksDbRaw(_) + | err @ Error::IndexOutOfBounds { .. } => storage_error(err), + Error::Encode(err) => raft::Error::CodecError(err), + Error::Compacted(_) => raft::Error::Store(StorageError::Compacted), + } + } +} + +pub struct RocksDbStorage { + db: Arc, + rocksdb: Arc, + + last_index: u64, + buffer: Vec, +} + +impl RocksDbStorage { + pub async fn create( + options: &MetadataStoreOptions, + rocksdb_options: BoxedLiveLoad, + ) -> Result { + let db_name = DbName::new(DB_NAME); + let db_manager = RocksDbManager::get(); + let cfs = vec![CfName::new(RAFT_CF)]; + let db_spec = DbSpecBuilder::new( + db_name.clone(), + options.data_dir(), + util::db_options(options), + ) + .add_cf_pattern( + CfPrefixPattern::ANY, + util::cf_options(options.rocksdb_memory_budget()), + ) + .ensure_column_families(cfs) + .build() + .expect("valid spec"); + + let db = db_manager.open_db(rocksdb_options, db_spec).await?; + let rocksdb = db_manager + .get_db(db_name) + .expect("raft metadata store db is open"); + + let last_index = Self::find_last_index(&db); + + Ok(Self { + db, + rocksdb, + last_index, + buffer: Vec::with_capacity(1024), + }) + } +} + +impl RocksDbStorage { + fn write_options(&self) -> WriteOptions { + let mut write_opts = WriteOptions::default(); + write_opts.disable_wal(false); + // always sync to not lose data + write_opts.set_sync(true); + write_opts + } + + fn find_last_index(db: &DB) -> u64 { + let cf = db.cf_handle(RAFT_CF).expect("RAFT_CF exists"); + let start = Self::raft_entry_key(0); + let end = Self::raft_entry_key(u64::MAX); + + let mut options = ReadOptions::default(); + options.set_async_io(true); + options.set_iterate_range(start..end); + let mut iterator = db.raw_iterator_cf_opt(&cf, options); + + iterator.seek_to_last(); + + if iterator.valid() { + let key_bytes = iterator.key().expect("key should be present"); + assert_eq!( + key_bytes.len(), + RAFT_ENTRY_KEY_LENGTH, + "raft entry keys must consist of '{}' bytes", + RAFT_ENTRY_KEY_LENGTH + ); + u64::from_be_bytes( + key_bytes[1..(1 + size_of::())] + .try_into() + .expect("buffer should be long enough"), + ) + } else { + // the first valid raft index starts at 1, so 0 means there are no replicated raft entries + 0 + } + } + + pub fn get_hard_state(&self) -> Result { + let key = Self::hard_state_key(); + self.get_value(key) + .map(|hard_state| hard_state.unwrap_or_default()) + } + + pub async fn store_hard_state(&mut self, hard_state: HardState) -> Result<(), Error> { + let key = Self::hard_state_key(); + self.put_value(key, hard_state).await + } + + pub fn get_conf_state(&self) -> Result { + let key = Self::conf_state_key(); + self.get_value(key) + .map(|hard_state| hard_state.unwrap_or_default()) + } + + pub async fn store_conf_state(&mut self, conf_state: ConfState) -> Result<(), Error> { + let key = Self::conf_state_key(); + self.put_value(key, conf_state).await + } + + pub fn get_entry(&self, idx: u64) -> Result, Error> { + let key = Self::raft_entry_key(idx); + self.get_value(key) + } + + fn get_value(&self, key: impl AsRef<[u8]>) -> Result, Error> { + let cf = self.get_cf_handle(); + let bytes = self.db.get_pinned_cf(&cf, key)?; + + if let Some(bytes) = bytes { + let mut value = T::default(); + value.merge_from_bytes(bytes.as_ref())?; + Ok(Some(value)) + } else { + Ok(None) + } + } + + async fn put_value( + &mut self, + key: impl AsRef<[u8]>, + value: T, + ) -> Result<(), Error> { + self.buffer.clear(); + value.write_to_vec(&mut self.buffer)?; + + let cf = self.get_cf_handle(); + let mut write_batch = WriteBatch::default(); + write_batch.put_cf(&cf, key.as_ref(), &self.buffer); + self.rocksdb + .write_batch( + "put_value", + Priority::High, + IoMode::Default, + self.write_options(), + write_batch, + ) + .await + .map_err(Into::into) + } + + pub async fn append(&mut self, entries: &Vec) -> Result<(), Error> { + let mut write_batch = WriteBatch::default(); + let mut buffer = mem::take(&mut self.buffer); + let mut last_index = self.last_index; + + { + let cf = self.get_cf_handle(); + + for entry in entries { + assert_eq!(last_index + 1, entry.index, "Expect raft log w/o holes"); + let key = Self::raft_entry_key(entry.index); + + buffer.clear(); + entry.write_to_vec(&mut buffer)?; + + write_batch.put_cf(&cf, key, &buffer); + last_index = entry.index; + } + } + + let result = self + .rocksdb + .write_batch( + "append", + Priority::High, + IoMode::Default, + self.write_options(), + write_batch, + ) + .await + .map_err(Into::into); + + self.buffer = buffer; + self.last_index = last_index; + + result + } + + fn get_cf_handle(&self) -> Arc { + self.db.cf_handle(RAFT_CF).expect("RAFT_CF exists") + } + + fn raft_entry_key(idx: u64) -> [u8; RAFT_ENTRY_KEY_LENGTH] { + let mut key = [0; RAFT_ENTRY_KEY_LENGTH]; + key[0] = RAFT_ENTRY_DISCRIMINATOR; + key[1..9].copy_from_slice(&idx.to_be_bytes()); + key + } + + fn hard_state_key() -> [u8; 1] { + [HARD_STATE_DISCRIMINATOR] + } + + fn conf_state_key() -> [u8; 1] { + [CONF_STATE_DISCRIMINATOR] + } + + fn check_index(&self, idx: u64) -> Result<(), Error> { + if idx < self.first_index() { + return Err(Error::Compacted(self.first_index())); + } else if idx > self.last_index() { + return Err(Error::IndexOutOfBounds { + index: idx, + last_index: self.last_index(), + }); + } + + Ok(()) + } + + fn check_range(&self, low: u64, high: u64) -> Result<(), Error> { + assert!(low < high, "Low '{low}' must be smaller than high '{high}'"); + + if low < self.first_index() { + return Err(Error::Compacted(self.first_index())); + } + + if high > self.last_index() + 1 { + return Err(Error::IndexOutOfBounds { + index: high, + last_index: self.last_index(), + }); + } + + Ok(()) + } + + fn last_index(&self) -> u64 { + self.last_index + } + + fn first_index(&self) -> u64 { + FIRST_RAFT_INDEX + } + + pub fn apply_snapshot(&mut self, _snapshot: Snapshot) -> Result<(), Error> { + unimplemented!("snapshots are currently not supported"); + } +} + +impl Storage for RocksDbStorage { + fn initial_state(&self) -> raft::Result { + let hard_state = self.get_hard_state()?; + Ok(RaftState::new(hard_state, self.get_conf_state()?)) + } + + fn entries( + &self, + low: u64, + high: u64, + max_size: impl Into>, + _context: GetEntriesContext, + ) -> raft::Result> { + self.check_range(low, high)?; + let start_key = Self::raft_entry_key(low); + let end_key = Self::raft_entry_key(high); + + let cf = self.get_cf_handle(); + let mut opts = ReadOptions::default(); + opts.set_iterate_range(start_key..end_key); + opts.set_async_io(true); + + let mut iterator = self.db.raw_iterator_cf_opt(&cf, opts); + iterator.seek(start_key); + + let mut result = + Vec::with_capacity(usize::try_from(high - low).expect("u64 fits into usize")); + + let max_size = + usize::try_from(max_size.into().unwrap_or(u64::MAX)).expect("u64 fits into usize"); + let mut size = 0; + let mut expected_idx = low; + + while iterator.valid() { + if size > 0 && size >= max_size { + break; + } + + if let Some(value) = iterator.value() { + let mut entry = Entry::default(); + entry.merge_from_bytes(value)?; + + if expected_idx != entry.index { + if expected_idx == low { + Err(StorageError::Compacted)?; + } else { + // missing raft entries :-( + Err(StorageError::Unavailable)?; + } + } + + result.push(entry); + expected_idx += 1; + size += value.len(); + } + + iterator.next(); + } + + // check for an occurred error + iterator + .status() + .map_err(|err| StorageError::Other(err.into()))?; + + Ok(result) + } + + fn term(&self, idx: u64) -> raft::Result { + // todo handle first_index - 1 once truncation is supported + if idx == 0 { + return Ok(0); + } + + self.check_index(idx)?; + self.get_entry(idx) + .map(|entry| entry.expect("should exist").term) + .map_err(Into::into) + } + + fn first_index(&self) -> raft::Result { + Ok(self.first_index()) + } + + fn last_index(&self) -> raft::Result { + Ok(self.last_index()) + } + + fn snapshot(&self, _request_index: u64, _to: u64) -> raft::Result { + // time is relative as some clever people figured out + Err(raft::Error::Store( + StorageError::SnapshotTemporarilyUnavailable, + )) + } +} + +pub fn storage_error(error: E) -> raft::Error +where + E: Into>, +{ + raft::Error::Store(StorageError::Other(error.into())) +} diff --git a/crates/metadata-store/src/raft/store.rs b/crates/metadata-store/src/raft/store.rs new file mode 100644 index 000000000..9c0bf32c2 --- /dev/null +++ b/crates/metadata-store/src/raft/store.rs @@ -0,0 +1,588 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::network::Networking; +use crate::raft::storage; +use crate::raft::storage::RocksDbStorage; +use crate::{ + MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender, +}; +use assert2::let_assert; +use bytes::{Bytes, BytesMut}; +use bytestring::ByteString; +use protobuf::{Message as ProtobufMessage, ProtobufError}; +use raft::prelude::{ConfChange, ConfChangeV2, ConfState, Entry, EntryType, Message}; +use raft::{Config, RawNode}; +use restate_core::cancellation_watcher; +use restate_core::metadata_store::{Precondition, VersionedValue}; +use restate_types::config::{Configuration, RaftOptions, RocksDbOptions}; +use restate_types::live::BoxedLiveLoad; +use restate_types::storage::{StorageCodec, StorageDecodeError, StorageEncodeError}; +use restate_types::{flexbuffers_storage_encode_decode, Version}; +use slog::o; +use std::collections::HashMap; +use std::time::Duration; +use tokio::sync::{mpsc, oneshot}; +use tokio::time; +use tokio::time::MissedTickBehavior; +use tracing::{debug, info, warn}; +use tracing_slog::TracingSlogDrain; +use ulid::Ulid; + +#[derive(Debug, thiserror::Error)] +pub enum BuildError { + #[error("failed creating raft node: {0}")] + Raft(#[from] raft::Error), + #[error("failed creating raft storage: {0}")] + Storage(#[from] storage::BuildError), + #[error("failed bootstrapping conf state: {0}")] + BootstrapConfState(#[from] storage::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed appending entries: {0}")] + Append(#[from] raft::Error), + #[error("failed deserializing raft serialized requests: {0}")] + DecodeRequest(StorageDecodeError), + #[error("failed deserializing conf change: {0}")] + DecodeConf(ProtobufError), + #[error("failed applying conf change: {0}")] + ApplyConfChange(raft::Error), + #[error("failed reading/writing from/to storage: {0}")] + Storage(#[from] storage::Error), +} + +pub struct RaftMetadataStore { + _logger: slog::Logger, + raw_node: RawNode, + networking: Networking, + raft_rx: mpsc::Receiver, + tick_interval: time::Interval, + + callbacks: HashMap, + kv_entries: HashMap, + + request_tx: RequestSender, + request_rx: RequestReceiver, +} + +impl RaftMetadataStore { + pub async fn create( + raft_options: &RaftOptions, + rocksdb_options: BoxedLiveLoad, + mut networking: Networking, + raft_rx: mpsc::Receiver, + ) -> Result { + let (request_tx, request_rx) = mpsc::channel(2); + + let config = Config { + id: raft_options.id, + ..Default::default() + }; + + let mut metadata_store_options = + Configuration::updateable().map(|configuration| &configuration.metadata_store); + let mut storage = + RocksDbStorage::create(metadata_store_options.live_load(), rocksdb_options).await?; + + // todo: Only write configuration on initialization + let voters: Vec<_> = raft_options.peers.keys().cloned().collect(); + let conf_state = ConfState::from((voters, vec![])); + storage.store_conf_state(conf_state).await?; + + // todo: Persist address information with configuration + for (peer, address) in &raft_options.peers { + networking.register_address(*peer, address.clone()); + } + + let drain = TracingSlogDrain; + let logger = slog::Logger::root(drain, o!()); + + let raw_node = RawNode::new(&config, storage, &logger)?; + + let mut tick_interval = time::interval(Duration::from_millis(100)); + tick_interval.set_missed_tick_behavior(MissedTickBehavior::Burst); + + Ok(Self { + // we only need to keep it alive + _logger: logger, + raw_node, + raft_rx, + networking, + tick_interval, + callbacks: HashMap::default(), + kv_entries: HashMap::default(), + request_rx, + request_tx, + }) + } + + pub fn request_sender(&self) -> RequestSender { + self.request_tx.clone() + } + + pub async fn run(mut self) -> Result<(), Error> { + let mut cancellation = std::pin::pin!(cancellation_watcher()); + + loop { + tokio::select! { + _ = &mut cancellation => { + break; + }, + raft = self.raft_rx.recv() => { + if let Some(raft) = raft { + self.raw_node.step(raft)?; + } else { + break; + } + } + Some(request) = self.request_rx.recv() => { + // todo: Unclear whether every replica should be allowed to propose. Maybe + // only the leader should propose and respond to clients. + let (callback, request) = Self::split_request(request); + + if let Err(err) = request + .encode_to_vec() + .map_err(Into::into) + .and_then(|request| self.raw_node + .propose(vec![], request) + .map_err(RequestError::from)) { + info!("Failed processing request: {err}"); + callback.fail(err); + continue; + } + + self.register_callback(callback); + } + _ = self.tick_interval.tick() => { + self.raw_node.tick(); + } + } + + self.on_ready().await?; + } + + debug!("Stop running RaftMetadataStore."); + + Ok(()) + } + + async fn on_ready(&mut self) -> Result<(), Error> { + if !self.raw_node.has_ready() { + return Ok(()); + } + + let mut ready = self.raw_node.ready(); + + // first need to send outgoing messages + if !ready.messages().is_empty() { + self.send_messages(ready.take_messages()); + } + + // apply snapshot if one was sent + if !ready.snapshot().is_empty() { + if let Err(err) = self + .raw_node + .mut_store() + .apply_snapshot(ready.snapshot().clone()) + { + warn!("failed applying snapshot: {err}"); + } + } + + // then handle committed entries + self.handle_committed_entries(ready.take_committed_entries()) + .await?; + + // append new Raft entries to storage + self.raw_node.mut_store().append(ready.entries()).await?; + + // update the hard state if an update was produced (e.g. vote has happened) + if let Some(hs) = ready.hs() { + self.raw_node + .mut_store() + .store_hard_state(hs.clone()) + .await?; + } + + // send persisted messages (after entries were appended and hard state was updated) + if !ready.persisted_messages().is_empty() { + self.send_messages(ready.take_persisted_messages()); + } + + // advance the raft node + let mut light_ready = self.raw_node.advance(ready); + + // update the commit index if it changed + if let Some(_commit) = light_ready.commit_index() { + // update commit index in cached hard_state; no need to persist it though + } + + // send outgoing messages + if !light_ready.messages().is_empty() { + self.send_messages(light_ready.take_messages()); + } + + // handle committed entries + if !light_ready.committed_entries().is_empty() { + self.handle_committed_entries(light_ready.take_committed_entries()) + .await?; + } + + self.raw_node.advance_apply(); + + Ok(()) + } + + fn register_callback(&mut self, callback: Callback) { + self.callbacks.insert(callback.request_id, callback); + } + + fn send_messages(&mut self, messages: Vec) { + for message in messages { + if let Err(err) = self.networking.try_send(message.to, message) { + debug!("failed sending message: {err}"); + } + } + } + + async fn handle_committed_entries( + &mut self, + committed_entries: Vec, + ) -> Result<(), Error> { + for entry in committed_entries { + if entry.data.is_empty() { + // new leader was elected + continue; + } + + match entry.get_entry_type() { + EntryType::EntryNormal => self.handle_normal_entry(entry)?, + EntryType::EntryConfChange => self.handle_conf_change(entry).await?, + EntryType::EntryConfChangeV2 => self.handle_conf_change_v2(entry).await?, + } + } + + Ok(()) + } + + fn handle_normal_entry(&mut self, entry: Entry) -> Result<(), Error> { + let request = Request::decode_from_bytes(entry.data).map_err(Error::DecodeRequest)?; + self.handle_request(request); + + Ok(()) + } + + fn handle_request(&mut self, request: Request) { + match request.kind { + RequestKind::Get { key } => { + let result = self.get(key); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + callback.complete_get(result); + } + } + RequestKind::GetVersion { key } => { + let result = self.get_version(key); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + callback.complete_get_version(result); + } + } + RequestKind::Put { + key, + value, + precondition, + } => { + let result = self.put(key, value, precondition); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + callback.complete_put(result.map_err(Into::into)); + } + } + RequestKind::Delete { key, precondition } => { + let result = self.delete(key, precondition); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + callback.complete_delete(result.map_err(Into::into)); + } + } + } + } + + fn get(&self, key: ByteString) -> Option { + self.kv_entries.get(&key).cloned() + } + + fn get_version(&self, key: ByteString) -> Option { + self.kv_entries.get(&key).map(|entry| entry.version) + } + + fn put( + &mut self, + key: ByteString, + value: VersionedValue, + precondition: Precondition, + ) -> Result<(), PreconditionViolation> { + match precondition { + Precondition::None => { + self.kv_entries.insert(key, value); + } + Precondition::DoesNotExist => { + if self.kv_entries.contains_key(&key) { + return Err(PreconditionViolation::kv_pair_exists()); + } + + self.kv_entries.insert(key, value); + } + Precondition::MatchesVersion(expected_version) => { + let actual_version = self.kv_entries.get(&key).map(|entry| entry.version); + + if actual_version == Some(expected_version) { + self.kv_entries.insert(key, value); + } else { + return Err(PreconditionViolation::version_mismatch( + expected_version, + actual_version, + )); + } + } + } + + Ok(()) + } + + fn delete( + &mut self, + key: ByteString, + precondition: Precondition, + ) -> Result<(), PreconditionViolation> { + match precondition { + Precondition::None => { + self.kv_entries.remove(&key); + } + Precondition::DoesNotExist => { + if self.kv_entries.contains_key(&key) { + return Err(PreconditionViolation::kv_pair_exists()); + } + } + Precondition::MatchesVersion(expected_version) => { + let actual_version = self.kv_entries.get(&key).map(|entry| entry.version); + + if actual_version == Some(expected_version) { + self.kv_entries.remove(&key); + } else { + return Err(PreconditionViolation::version_mismatch( + expected_version, + actual_version, + )); + } + } + } + + Ok(()) + } + + async fn handle_conf_change(&mut self, entry: Entry) -> Result<(), Error> { + let mut cc = ConfChange::default(); + cc.merge_from_bytes(&entry.data) + .map_err(Error::DecodeConf)?; + let cs = self + .raw_node + .apply_conf_change(&cc) + .map_err(Error::ApplyConfChange)?; + self.raw_node.mut_store().store_conf_state(cs).await?; + Ok(()) + } + + async fn handle_conf_change_v2(&mut self, entry: Entry) -> Result<(), Error> { + let mut cc = ConfChangeV2::default(); + cc.merge_from_bytes(&entry.data) + .map_err(Error::DecodeConf)?; + let cs = self + .raw_node + .apply_conf_change(&cc) + .map_err(Error::ApplyConfChange)?; + self.raw_node.mut_store().store_conf_state(cs).await?; + Ok(()) + } + + fn split_request(request: MetadataStoreRequest) -> (Callback, Request) { + let (request_kind, callback_kind) = match request { + MetadataStoreRequest::Get { key, result_tx } => { + (RequestKind::Get { key }, CallbackKind::Get { result_tx }) + } + MetadataStoreRequest::GetVersion { key, result_tx } => ( + RequestKind::GetVersion { key }, + CallbackKind::GetVersion { result_tx }, + ), + MetadataStoreRequest::Put { + key, + value, + precondition, + result_tx, + } => ( + RequestKind::Put { + key, + value, + precondition, + }, + CallbackKind::Put { result_tx }, + ), + MetadataStoreRequest::Delete { + key, + precondition, + result_tx, + } => ( + RequestKind::Delete { key, precondition }, + CallbackKind::Delete { result_tx }, + ), + }; + + let request_id = Ulid::new(); + + let callback = Callback { + request_id, + kind: callback_kind, + }; + + let request = Request { + request_id, + kind: request_kind, + }; + + (callback, request) + } +} + +struct Callback { + request_id: Ulid, + kind: CallbackKind, +} + +impl Callback { + fn fail(self, err: impl Into) { + match self.kind { + CallbackKind::Get { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + CallbackKind::GetVersion { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + CallbackKind::Put { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + CallbackKind::Delete { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + }; + } + + fn complete_get(self, result: Option) { + let_assert!( + CallbackKind::Get { result_tx } = self.kind, + "expected 'Get' callback" + ); + // err if caller has gone + let _ = result_tx.send(Ok(result)); + } + + fn complete_get_version(self, result: Option) { + let_assert!( + CallbackKind::GetVersion { result_tx } = self.kind, + "expected 'GetVersion' callback" + ); + // err if caller has gone + let _ = result_tx.send(Ok(result)); + } + + fn complete_put(self, result: Result<(), RequestError>) { + let_assert!( + CallbackKind::Put { result_tx } = self.kind, + "expected 'Put' callback" + ); + // err if caller has gone + let _ = result_tx.send(result); + } + + fn complete_delete(self, result: Result<(), RequestError>) { + let_assert!( + CallbackKind::Delete { result_tx } = self.kind, + "expected 'Delete' callback" + ); + // err if caller has gone + let _ = result_tx.send(result); + } +} + +enum CallbackKind { + Get { + result_tx: oneshot::Sender, RequestError>>, + }, + GetVersion { + result_tx: oneshot::Sender, RequestError>>, + }, + Put { + result_tx: oneshot::Sender>, + }, + Delete { + result_tx: oneshot::Sender>, + }, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct Request { + request_id: Ulid, + kind: RequestKind, +} + +flexbuffers_storage_encode_decode!(Request); + +impl Request { + fn encode_to_vec(&self) -> Result, StorageEncodeError> { + let mut buffer = BytesMut::new(); + // todo: Removing support for BufMut requires an extra copy from BytesMut to Vec :-( + StorageCodec::encode(self, &mut buffer)?; + Ok(buffer.to_vec()) + } + + fn decode_from_bytes(mut bytes: Bytes) -> Result { + StorageCodec::decode::(&mut bytes) + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +enum RequestKind { + Get { + key: ByteString, + }, + GetVersion { + key: ByteString, + }, + Put { + key: ByteString, + value: VersionedValue, + precondition: Precondition, + }, + Delete { + key: ByteString, + precondition: Precondition, + }, +} + +impl From for RequestError { + fn from(value: raft::Error) -> Self { + match value { + err @ raft::Error::ProposalDropped => RequestError::Unavailable(err.into()), + err => RequestError::Internal(err.into()), + } + } +} diff --git a/crates/metadata-store/src/util.rs b/crates/metadata-store/src/util.rs new file mode 100644 index 000000000..41b971da4 --- /dev/null +++ b/crates/metadata-store/src/util.rs @@ -0,0 +1,52 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_types::config::MetadataStoreOptions; +use rocksdb::DBCompressionType; + +pub fn db_options(_options: &MetadataStoreOptions) -> rocksdb::Options { + rocksdb::Options::default() +} + +pub fn cf_options( + memory_budget: usize, +) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static { + move |mut opts| { + set_memory_related_opts(&mut opts, memory_budget); + opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); + opts.set_num_levels(3); + + opts.set_compression_per_level(&[ + DBCompressionType::None, + DBCompressionType::None, + DBCompressionType::Zstd, + ]); + + // + opts + } +} + +pub fn set_memory_related_opts(opts: &mut rocksdb::Options, memtables_budget: usize) { + // We set the budget to allow 1 mutable + 3 immutable. + opts.set_write_buffer_size(memtables_budget / 4); + + // merge 2 memtables when flushing to L0 + opts.set_min_write_buffer_number_to_merge(2); + opts.set_max_write_buffer_number(4); + // start flushing L0->L1 as soon as possible. each file on level0 is + // (memtable_memory_budget / 2). This will flush level 0 when it's bigger than + // memtable_memory_budget. + opts.set_level_zero_file_num_compaction_trigger(2); + // doesn't really matter much, but we don't want to create too many files + opts.set_target_file_size_base(memtables_budget as u64 / 8); + // make Level1 size equal to Level0 size, so that L0->L1 compactions are fast + opts.set_max_bytes_for_level_base(memtables_budget as u64); +} diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 4ecc8caed..9a8430312 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -31,10 +31,13 @@ use restate_core::{ #[cfg(feature = "replicated-loglet")] use restate_log_server::LogServerService; use restate_metadata_store::local::LocalMetadataStoreService; -use restate_metadata_store::MetadataStoreClient; -use restate_types::config::{CommonOptions, Configuration}; +use restate_metadata_store::raft::service::RaftMetadataStoreService; +use restate_metadata_store::{ + BoxedMetadataStoreService, MetadataStoreClient, MetadataStoreService, +}; +use restate_types::config::{CommonOptions, Configuration, Kind}; use restate_types::errors::GenericError; -use restate_types::health::Health; +use restate_types::health::{Health, HealthStatus}; use restate_types::live::Live; #[cfg(feature = "replicated-loglet")] use restate_types::logs::RecordCache; @@ -110,7 +113,7 @@ pub struct Node { partition_routing_refresher: PartitionRoutingRefresher, metadata_store_client: MetadataStoreClient, bifrost: BifrostService, - metadata_store_role: Option, + metadata_store_role: Option, base_role: BaseRole, admin_role: Option>, worker_role: Option, @@ -131,13 +134,9 @@ impl Node { // todo(asoli) move local metadata store to use NetworkServer let metadata_store_role = if config.has_role(Role::MetadataStore) { - Some(LocalMetadataStoreService::from_options( + Some(Self::create_metadata_store( + &updateable_config, health.metadata_server_status(), - updateable_config.clone().map(|c| &c.metadata_store).boxed(), - updateable_config - .clone() - .map(|config| &config.metadata_store.rocksdb) - .boxed(), )) } else { None @@ -306,6 +305,32 @@ impl Node { }) } + fn create_metadata_store( + updateable_config: &Live, + health_status: HealthStatus, + ) -> BoxedMetadataStoreService { + match updateable_config.pinned().metadata_store.kind { + Kind::Local => LocalMetadataStoreService::from_options( + health_status, + updateable_config.clone().map(|c| &c.metadata_store).boxed(), + updateable_config + .clone() + .map(|config| &config.metadata_store.rocksdb) + .boxed(), + ) + .boxed(), + Kind::Raft(_) => RaftMetadataStoreService::new( + health_status, + updateable_config.clone().map(|c| &c.metadata_store).boxed(), + updateable_config + .clone() + .map(|config| &config.metadata_store.rocksdb) + .boxed(), + ) + .boxed(), + } + } + pub async fn start(mut self) -> Result<(), anyhow::Error> { let config = self.updateable_config.pinned(); diff --git a/crates/types/src/config/metadata_store.rs b/crates/types/src/config/metadata_store.rs index c24411c89..512d1e89d 100644 --- a/crates/types/src/config/metadata_store.rs +++ b/crates/types/src/config/metadata_store.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::HashMap; use std::num::NonZeroUsize; use std::path::PathBuf; @@ -18,7 +19,7 @@ use restate_serde_util::NonZeroByteCount; use tracing::warn; use super::{data_dir, CommonOptions, RocksDbOptions, RocksDbOptionsBuilder}; -use crate::net::BindAddress; +use crate::net::{AdvertisedAddress, BindAddress}; /// # Metadata store options #[serde_as] @@ -60,6 +61,20 @@ pub struct MetadataStoreOptions { /// /// The RocksDB options which will be used to configure the metadata store's RocksDB instance. pub rocksdb: RocksDbOptions, + + /// # Type of metadata store to start + /// + /// The type of metadata store to start when running the metadata store role. + pub kind: Kind, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[serde(rename_all = "kebab-case")] +pub enum Kind { + #[default] + Local, + Raft(RaftOptions), } impl MetadataStoreOptions { @@ -112,6 +127,18 @@ impl Default for MetadataStoreOptions { rocksdb_memory_budget: None, rocksdb_memory_ratio: 0.01, rocksdb, + kind: Kind::default(), } } } + +#[serde_as] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[serde(rename_all = "kebab-case")] +pub struct RaftOptions { + pub id: u64, + #[cfg_attr(feature = "schemars", schemars(with = "Vec<(u64, String)>"))] + #[serde_as(as = "serde_with::Seq<(_, _)>")] + pub peers: HashMap, +} diff --git a/tools/restatectl/src/environment/metadata_store.rs b/tools/restatectl/src/environment/metadata_store.rs index 0986bee99..33b3b37cb 100644 --- a/tools/restatectl/src/environment/metadata_store.rs +++ b/tools/restatectl/src/environment/metadata_store.rs @@ -13,6 +13,7 @@ use tracing::info; use restate_core::metadata_store::MetadataStoreClient; use restate_core::{TaskCenter, TaskKind}; use restate_metadata_store::local::LocalMetadataStoreService; +use restate_metadata_store::MetadataStoreService; use restate_types::config::{MetadataStoreClientOptions, MetadataStoreOptions, RocksDbOptions}; use restate_types::health::HealthStatus; use restate_types::live::BoxedLiveLoad;