-
Notifications
You must be signed in to change notification settings - Fork 38
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement simple networking layer for RaftMetadataStore
This fixes #1803.
- Loading branch information
1 parent
a6d99f7
commit 3b113e3
Showing
15 changed files
with
562 additions
and
26 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH | ||
// | ||
// This file is part of the Restate service protocol, which is | ||
// released under the MIT license. | ||
// | ||
// You can find a copy of the license in file LICENSE in the root | ||
// directory of this repository or package, or at | ||
// https://github.com/restatedev/proto/blob/main/LICENSE | ||
|
||
syntax = "proto3"; | ||
|
||
import "google/protobuf/empty.proto"; | ||
|
||
package dev.restate.raft_metadata_store_svc; | ||
|
||
// Grpc service definition for the RaftMetadataStore implementation. | ||
service RaftMetadataStoreSvc { | ||
rpc Raft(stream RaftMessage) returns (stream RaftMessage); | ||
} | ||
|
||
message RaftMessage { | ||
bytes message = 1; | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,192 @@ | ||
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. | ||
// All rights reserved. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the LICENSE file. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0. | ||
|
||
use crate::raft::grpc_svc::RaftMessage; | ||
use futures::StreamExt; | ||
use protobuf::Message as ProtobufMessage; | ||
use raft::prelude::Message; | ||
use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind}; | ||
use std::collections::HashMap; | ||
use std::sync::{Arc, Mutex}; | ||
use tokio::sync::mpsc; | ||
use tokio::sync::mpsc::error::TrySendError; | ||
use tokio_stream::wrappers::ReceiverStream; | ||
use tonic::codegen::BoxStream; | ||
use tracing::{debug, instrument}; | ||
|
||
#[derive(Debug, thiserror::Error)] | ||
pub enum ConnectionError { | ||
#[error("internal error: {0}")] | ||
Internal(String), | ||
#[error(transparent)] | ||
Shutdown(#[from] ShutdownError), | ||
} | ||
|
||
#[derive(Clone, derive_more::Debug)] | ||
pub struct ConnectionManager { | ||
inner: Arc<ConnectionManagerInner>, | ||
#[debug(skip)] | ||
task_center: TaskCenter, | ||
} | ||
|
||
impl ConnectionManager { | ||
pub fn new(task_center: TaskCenter, identity: u64, router: mpsc::Sender<Message>) -> Self { | ||
ConnectionManager { | ||
inner: Arc::new(ConnectionManagerInner::new(identity, router)), | ||
task_center, | ||
} | ||
} | ||
|
||
pub fn identity(&self) -> u64 { | ||
self.inner.identity | ||
} | ||
|
||
pub fn accept_connection( | ||
&self, | ||
raft_peer: u64, | ||
incoming_rx: tonic::Streaming<RaftMessage>, | ||
) -> Result<BoxStream<RaftMessage>, ConnectionError> { | ||
let (outgoing_tx, outgoing_rx) = mpsc::channel(128); | ||
self.run_connection(raft_peer, outgoing_tx, incoming_rx)?; | ||
|
||
let outgoing_stream = ReceiverStream::new(outgoing_rx) | ||
.map(Result::<_, tonic::Status>::Ok) | ||
.boxed(); | ||
Ok(outgoing_stream) | ||
} | ||
|
||
pub fn run_connection( | ||
&self, | ||
remote_peer: u64, | ||
outgoing_tx: mpsc::Sender<RaftMessage>, | ||
incoming_rx: tonic::Streaming<RaftMessage>, | ||
) -> Result<(), ConnectionError> { | ||
let mut guard = self.inner.connections.lock().unwrap(); | ||
|
||
if guard.contains_key(&remote_peer) { | ||
// we already have a connection established to remote peer | ||
return Ok(()); | ||
} | ||
|
||
let connection = Connection::new(outgoing_tx); | ||
guard.insert(remote_peer, connection); | ||
|
||
let reactor = ConnectionReactor { | ||
remote_peer, | ||
connection_manager: Arc::clone(&self.inner), | ||
}; | ||
|
||
let _task_id = self.task_center.spawn_child( | ||
TaskKind::ConnectionReactor, | ||
"raft-connection-reactor", | ||
None, | ||
reactor.run(incoming_rx), | ||
)?; | ||
|
||
Ok(()) | ||
} | ||
|
||
pub fn get_connection(&self, target: u64) -> Option<Connection> { | ||
self.inner.connections.lock().unwrap().get(&target).cloned() | ||
} | ||
} | ||
|
||
struct ConnectionReactor { | ||
remote_peer: u64, | ||
connection_manager: Arc<ConnectionManagerInner>, | ||
} | ||
|
||
impl ConnectionReactor { | ||
#[instrument(level = "debug", skip_all, fields(remote_peer = %self.remote_peer))] | ||
async fn run(self, mut incoming_rx: tonic::Streaming<RaftMessage>) -> anyhow::Result<()> { | ||
let mut shutdown = std::pin::pin!(cancellation_watcher()); | ||
debug!("Run connection reactor"); | ||
|
||
loop { | ||
tokio::select! { | ||
_ = &mut shutdown => { | ||
break; | ||
}, | ||
message = incoming_rx.next() => { | ||
match message { | ||
Some(message) => { | ||
match message { | ||
Ok(message) => { | ||
let message = Message::parse_from_carllerche_bytes(&message.message)?; | ||
|
||
assert_eq!(message.to, self.connection_manager.identity, "Expect to only receive messages for peer '{}'", self.connection_manager.identity); | ||
|
||
if self.connection_manager.router.send(message).await.is_err() { | ||
// system is shutting down | ||
debug!("System is shutting down; closing connection"); | ||
break; | ||
} | ||
} | ||
Err(err) => { | ||
debug!("Closing connection because received error: {err}"); | ||
break; | ||
} | ||
} | ||
} | ||
None => { | ||
debug!("Remote peer closed connection"); | ||
break | ||
}, | ||
} | ||
} | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
impl Drop for ConnectionReactor { | ||
fn drop(&mut self) { | ||
debug!(remote_peer = %self.remote_peer, "Close connection"); | ||
self.connection_manager | ||
.connections | ||
.lock() | ||
.expect("shouldn't be poisoned") | ||
.remove(&self.remote_peer); | ||
} | ||
} | ||
|
||
#[derive(Debug)] | ||
struct ConnectionManagerInner { | ||
identity: u64, | ||
connections: Mutex<HashMap<u64, Connection>>, | ||
router: mpsc::Sender<Message>, | ||
} | ||
|
||
impl ConnectionManagerInner { | ||
pub fn new(identity: u64, router: mpsc::Sender<Message>) -> Self { | ||
ConnectionManagerInner { | ||
identity, | ||
router, | ||
connections: Mutex::default(), | ||
} | ||
} | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub struct Connection { | ||
tx: mpsc::Sender<RaftMessage>, | ||
} | ||
|
||
impl Connection { | ||
pub fn new(tx: mpsc::Sender<RaftMessage>) -> Self { | ||
Connection { tx } | ||
} | ||
|
||
pub fn try_send(&self, message: RaftMessage) -> Result<(), TrySendError<RaftMessage>> { | ||
self.tx.try_send(message) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. | ||
// All rights reserved. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the LICENSE file. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0. | ||
|
||
tonic::include_proto!("dev.restate.raft_metadata_store_svc"); | ||
|
||
pub const FILE_DESCRIPTOR_SET: &[u8] = | ||
tonic::include_file_descriptor_set!("raft_metadata_store_svc"); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. | ||
// All rights reserved. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the LICENSE file. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0. | ||
|
||
use crate::raft::connection_manager::{ConnectionError, ConnectionManager}; | ||
use crate::raft::grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvc; | ||
use crate::raft::grpc_svc::RaftMessage; | ||
use std::str::FromStr; | ||
use tonic::codegen::BoxStream; | ||
use tonic::{Request, Response, Status, Streaming}; | ||
|
||
pub const RAFT_PEER_METADATA_KEY: &str = "x-restate-raft-peer"; | ||
|
||
#[derive(Debug)] | ||
pub struct RaftMetadataStoreHandler { | ||
connection_manager: ConnectionManager, | ||
} | ||
|
||
impl RaftMetadataStoreHandler { | ||
pub fn new(connection_manager: ConnectionManager) -> Self { | ||
Self { connection_manager } | ||
} | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl RaftMetadataStoreSvc for RaftMetadataStoreHandler { | ||
type RaftStream = BoxStream<RaftMessage>; | ||
|
||
async fn raft( | ||
&self, | ||
request: Request<Streaming<RaftMessage>>, | ||
) -> Result<Response<Self::RaftStream>, Status> { | ||
let raft_peer_metadata = | ||
request | ||
.metadata() | ||
.get(RAFT_PEER_METADATA_KEY) | ||
.ok_or(Status::invalid_argument(format!( | ||
"'{}' is missing", | ||
RAFT_PEER_METADATA_KEY | ||
)))?; | ||
let raft_peer = u64::from_str( | ||
raft_peer_metadata | ||
.to_str() | ||
.map_err(|err| Status::invalid_argument(err.to_string()))?, | ||
) | ||
.map_err(|err| Status::invalid_argument(err.to_string()))?; | ||
let outgoing_rx = self | ||
.connection_manager | ||
.accept_connection(raft_peer, request.into_inner())?; | ||
Ok(Response::new(outgoing_rx)) | ||
} | ||
} | ||
|
||
impl From<ConnectionError> for Status { | ||
fn from(value: ConnectionError) -> Self { | ||
match value { | ||
ConnectionError::Internal(err) => Status::internal(err), | ||
ConnectionError::Shutdown(err) => Status::aborted(err.to_string()), | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.