Skip to content

Commit

Permalink
lust and gs tablet fix release 0215
Browse files Browse the repository at this point in the history
  • Loading branch information
miamia0 committed Apr 3, 2024
1 parent 7839ca4 commit 446ce2a
Show file tree
Hide file tree
Showing 13 changed files with 337 additions and 104 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions volo-thrift/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ tokio = { workspace = true, features = [
"parking_lot",
] }
tracing.workspace = true
tokio-condvar = "0.1.0"

[features]
default = []
Expand Down
15 changes: 8 additions & 7 deletions volo-thrift/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,10 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
}

#[derive(Clone)]
pub struct MessageService<Resp, MkT, MkC>
pub struct MessageService<Req, Resp, MkT, MkC>
where
Resp: EntryMessage + Send + 'static,
Req: EntryMessage + Send + 'static + Sync,
Resp: EntryMessage + Send + 'static + Sync,
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
{
Expand All @@ -474,14 +475,14 @@ where
#[cfg(feature = "multiplex")]
inner: motore::utils::Either<
pingpong::Client<Resp, MkT, MkC>,
crate::transport::multiplex::Client<Resp, MkT, MkC>,
crate::transport::multiplex::Client<Req, Resp, MkT, MkC>,
>,
read_biz_error: bool,
}

impl<Req, Resp, MkT, MkC> Service<ClientContext, Req> for MessageService<Resp, MkT, MkC>
impl<Req, Resp, MkT, MkC> Service<ClientContext, Req> for MessageService<Req, Resp, MkT, MkC>
where
Req: EntryMessage + 'static + Send,
Req: Send + 'static + EntryMessage + Sync,
Resp: Send + 'static + EntryMessage + Sync,
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
Expand Down Expand Up @@ -531,8 +532,8 @@ where
+ Clone
+ Sync,
Req: EntryMessage + Send + 'static + Sync + Clone,
Resp: EntryMessage + Send + 'static,
IL: Layer<MessageService<Resp, MkT, MkC>>,
Resp: EntryMessage + Send + 'static + Sync,
IL: Layer<MessageService<Req, Resp, MkT, MkC>>,
IL::Service:
Service<ClientContext, Req, Response = Option<Resp>> + Sync + Clone + Send + 'static,
<IL::Service as Service<ClientContext, Req>>::Error: Send + Into<ClientError>,
Expand Down
49 changes: 47 additions & 2 deletions volo-thrift/src/codec/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ pub struct DefaultEncoder<E, W> {
impl<E: ZeroCopyEncoder, W: AsyncWrite + Unpin + Send + Sync + 'static> Encoder
for DefaultEncoder<E, W>
{
#[inline]
async fn encode<Req: Send + EntryMessage, Cx: ThriftContext>(
async fn send<Req: Send + EntryMessage, Cx: ThriftContext>(
&mut self,
cx: &mut Cx,
msg: ThriftMessage<Req>,
Expand Down Expand Up @@ -179,6 +178,52 @@ impl<E: ZeroCopyEncoder, W: AsyncWrite + Unpin + Send + Sync + 'static> Encoder
}
// write_result
}

#[inline]
async fn encode<Req: Send + EntryMessage, Cx: ThriftContext>(
&mut self,
cx: &mut Cx,
msg: ThriftMessage<Req>,
) -> Result<(), ThriftException> {
cx.stats_mut().record_encode_start_at();

// first, we need to get the size of the message
let (real_size, malloc_size) = self.encoder.size(cx, &msg)?;
trace!(
"[VOLO] codec encode message real size: {}, malloc size: {}",
real_size,
malloc_size
);
cx.stats_mut().set_write_size(real_size);

// then we reserve the size of the message in the linked bytes
self.linked_bytes.reserve(malloc_size);
// after that, we encode the message into the linked bytes
self.encoder
.encode(cx, &mut self.linked_bytes, msg)
.map_err(|e| {
// record the error time
cx.stats_mut().record_encode_end_at();
e
})?;

cx.stats_mut().record_encode_end_at();
Ok(())
}

async fn flush(&mut self) -> Result<(), ThriftException> {
let write_result: Result<(), ThriftException> = self
.linked_bytes
.write_all_vectored(&mut self.writer)
.await
.map_err(|e| e.into());
write_result?;
self.writer.flush().await.map_err(Into::into)
}

async fn reset(&mut self) {
self.linked_bytes.reset();
}
}

pub struct DefaultDecoder<D, R> {
Expand Down
8 changes: 8 additions & 0 deletions volo-thrift/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,19 @@ pub trait Decoder: Send + 'static {
///
/// Note: [`Encoder`] should be designed to be ready for reuse.
pub trait Encoder: Send + 'static {
fn reset(&mut self) -> impl Future<Output = ()> + Send;
fn send<Req: Send + EntryMessage, Cx: ThriftContext>(
&mut self,
cx: &mut Cx,
msg: ThriftMessage<Req>,
) -> impl Future<Output = Result<(), ThriftException>> + Send;
fn encode<Req: Send + EntryMessage, Cx: ThriftContext>(
&mut self,
cx: &mut Cx,
msg: ThriftMessage<Req>,
) -> impl Future<Output = Result<(), ThriftException>> + Send;

fn flush(&mut self) -> impl Future<Output = Result<(), ThriftException>> + Send;
}

/// [`MakeCodec`] receives an [`AsyncRead`] and an [`AsyncWrite`] and returns a
Expand Down
1 change: 0 additions & 1 deletion volo-thrift/src/transport/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub(crate) mod incoming;
#[cfg(feature = "multiplex")]
pub mod multiplex;
pub mod pingpong;
pub mod pool;
Expand Down
38 changes: 21 additions & 17 deletions volo-thrift/src/transport/multiplex/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ use crate::{
ClientError, EntryMessage, ThriftMessage,
};

pub struct MakeClientTransport<MkT, MkC, Resp>
pub struct MakeClientTransport<MkT, MkC, Req, Resp>
where
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf>,
{
make_transport: MkT,
make_codec: MkC,
_phantom: PhantomData<fn() -> Resp>,
_phantom: PhantomData<(fn() -> Resp, fn() -> Req)>,
}

impl<MkT: MakeTransport, MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf>, Resp> Clone
for MakeClientTransport<MkT, MkC, Resp>
impl<MkT: MakeTransport, MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf>, Req, Resp> Clone
for MakeClientTransport<MkT, MkC, Req, Resp>
{
fn clone(&self) -> Self {
Self {
Expand All @@ -36,7 +36,7 @@ impl<MkT: MakeTransport, MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf>, Resp> Cl
}
}

impl<MkT, MkC, Resp> MakeClientTransport<MkT, MkC, Resp>
impl<MkT, MkC, Req, Resp> MakeClientTransport<MkT, MkC, Req, Resp>
where
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf>,
Expand All @@ -51,13 +51,14 @@ where
}
}

impl<MkT, MkC, Resp> UnaryService<Address> for MakeClientTransport<MkT, MkC, Resp>
impl<MkT, MkC, Req, Resp> UnaryService<Address> for MakeClientTransport<MkT, MkC, Req, Resp>
where
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
Resp: EntryMessage + Send + 'static,
Resp: EntryMessage + Send + 'static + Sync,
Req: EntryMessage + Send + 'static + Sync,
{
type Response = ThriftTransport<MkC::Encoder, Resp>;
type Response = ThriftTransport<MkC::Encoder, Req, Resp>;
type Error = io::Error;

async fn call(&self, target: Address) -> Result<Self::Response, Self::Error> {
Expand All @@ -72,22 +73,24 @@ where
}
}

pub struct Client<Resp, MkT, MkC>
pub struct Client<Req, Resp, MkT, MkC>
where
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
Resp: EntryMessage + Send + 'static,
Resp: EntryMessage + Send + 'static + Sync,
Req: EntryMessage + Send + 'static + Sync,
{
#[allow(clippy::type_complexity)]
make_transport: PooledMakeTransport<MakeClientTransport<MkT, MkC, Resp>, Address>,
make_transport: PooledMakeTransport<MakeClientTransport<MkT, MkC, Req, Resp>, Address>,
_marker: PhantomData<Resp>,
}

impl<Resp, MkT, MkC> Clone for Client<Resp, MkT, MkC>
impl<Req, Resp, MkT, MkC> Clone for Client<Req, Resp, MkT, MkC>
where
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
Resp: EntryMessage + Send + 'static,
Resp: EntryMessage + Send + 'static + Sync,
Req: EntryMessage + Send + 'static + Sync,
{
fn clone(&self) -> Self {
Self {
Expand All @@ -97,11 +100,12 @@ where
}
}

impl<Resp, MkT, MkC> Client<Resp, MkT, MkC>
impl<Req, Resp, MkT, MkC> Client<Req, Resp, MkT, MkC>
where
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
Resp: EntryMessage + Send + 'static,
Resp: EntryMessage + Send + 'static + Sync,
Req: EntryMessage + Send + 'static + Sync,
{
pub fn new(make_transport: MkT, pool_cfg: Option<Config>, make_codec: MkC) -> Self {
let make_transport = MakeClientTransport::new(make_transport, make_codec);
Expand All @@ -113,9 +117,9 @@ where
}
}

impl<Req, Resp, MkT, MkC> Service<ClientContext, ThriftMessage<Req>> for Client<Resp, MkT, MkC>
impl<Req, Resp, MkT, MkC> Service<ClientContext, ThriftMessage<Req>> for Client<Req, Resp, MkT, MkC>
where
Req: Send + 'static + EntryMessage,
Req: Send + 'static + EntryMessage + Sync,
Resp: EntryMessage + Send + 'static + Sync,
MkT: MakeTransport,
MkC: MakeCodec<MkT::ReadHalf, MkT::WriteHalf> + Sync,
Expand Down
1 change: 1 addition & 0 deletions volo-thrift/src/transport/multiplex/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod client;
mod server;
mod thrift_transport;
pub mod utils;

pub use client::Client;
pub use server::serve;
2 changes: 1 addition & 1 deletion volo-thrift/src/transport/multiplex/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub async fn serve<Svc, Req, Resp, E, D>(
if let Err(e) = metainfo::METAINFO
.scope(
RefCell::new(mi),
encoder.encode::<Resp, ServerContext>(&mut cx, msg),
encoder.send::<Resp, ServerContext>(&mut cx, msg),
)
.await
{
Expand Down
Loading

0 comments on commit 446ce2a

Please sign in to comment.