Skip to content

Commit

Permalink
Add load-shedding to the GRPC V2 service.
Browse files Browse the repository at this point in the history
  • Loading branch information
abizjak committed Oct 12, 2023
1 parent 91a7f8a commit 4243e74
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 20 deletions.
2 changes: 1 addition & 1 deletion concordium-node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions concordium-node/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "concordium_node"
version = "6.1.6" # must be kept in sync with 'is_compatible_version' in 'src/configuration.rs'
version = "6.1.7" # must be kept in sync with 'is_compatible_version' in 'src/configuration.rs'
description = "Concordium Node"
authors = ["Concordium <[email protected]>"]
exclude = [".gitignore", ".gitlab-ci.yml", "test/**/*","**/**/.gitignore","**/**/.gitlab-ci.yml"]
Expand Down Expand Up @@ -78,7 +78,7 @@ tempfile = { version = "3.1" }
tonic = { version = "0.9", features = ["tls"] }
tonic-reflection = "0.9"
tower-http = { version = "0.4", features = ["trace", "metrics"] }
tower = "0.4"
tower = {version = "0.4", features = ["load-shed"]}
tonic-web = "0.9"
prost = "0.11"
tokio = { version = "1.20", features = ["macros", "rt-multi-thread", "signal", "io-util", "time"] }
Expand Down
56 changes: 39 additions & 17 deletions concordium-node/src/grpc2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1064,12 +1064,18 @@ pub mod server {
.http2_keepalive_timeout(Some(std::time::Duration::from_secs(
config.keepalive_timeout,
)))
.layer(log_layer)
.layer(tower::limit::ConcurrencyLimitLayer::new(config.max_concurrent_requests))
// Note: the in-flight request layer applies after the limit layer. This is what we want so that the
// metric reflects the actual number of in-flight requests.
// Note: the in-flight request layer applies first here. Since we are using a load-shed
// layer just below this corresponds very directly to the number of requests being actually handled.
// The technical reason for this is that we cannot really stack the in flight requests layer
// below the stats layer since we want to transform some `Err` responses in the stats layer
// to Ok responses with a meaningful gRPC status code,
// but since the in flight request layer adds a guard to count in-flight requests this would
// mean we'd have to construct such a guard in the response, which is not possible.
.layer(in_flight_request_layer)
.layer(stats_layer);
.layer(stats_layer)
.layer(tower::load_shed::LoadShedLayer::new())
.layer(tower::limit::ConcurrencyLimitLayer::new(config.max_concurrent_requests))
.layer(log_layer);
if let Some(identity) = identity {
builder = builder
.tls_config(ServerTlsConfig::new().identity(identity))
Expand Down Expand Up @@ -2560,19 +2566,21 @@ fn get_grpc_code_label(code: tonic::Code) -> &'static str {
/// Actual middleware implementation updating the stats.
/// The middleware is called once for each gRPC request, even for the streaming
/// gRPC methods.
impl<S> tower::Service<hyper::Request<hyper::Body>> for StatsMiddleware<S>
impl<S, Body: hyper::body::HttpBody + Unpin + Send + Default>
tower::Service<hyper::Request<hyper::Body>> for StatsMiddleware<S>
where
S: tower::Service<
hyper::Request<hyper::Body>,
Response = hyper::Response<tonic::body::BoxBody>,
Response = hyper::Response<Body>,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
S::Future: Send + 'static,
{
type Error = S::Error;
type Error = tower::BoxError;
type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
type Response = S::Response;
type Response = hyper::Response<Body>;

fn poll_ready(
&mut self,
Expand All @@ -2599,15 +2607,29 @@ where
// Time taken for the inner service to send back a response, meaning for
// streaming gRPC methods this is the duration for it to first return a stream.
let duration = request_received.elapsed().as_secs_f64();
if result.is_err() {
grpc_request_duration
.with_label_values(&[
endpoint_name.as_str(),
get_grpc_code_label(tonic::Code::Internal),
])
.observe(duration);
match result {
Err(e) => {
// If the load shed service terminated the request this will be signalled as
// an Err(Overloaded). So record resource exhaustion
// in the metrics.
let (code, response) = if e.is::<tower::load_shed::error::Overloaded>() {
let new_response =
tonic::Status::resource_exhausted("Too many concurrent requests.")
.to_http();
(
tonic::Code::ResourceExhausted,
Ok(new_response.map(|_| Default::default())),
)
} else {
(tonic::Code::Internal, Err(e))
};
grpc_request_duration
.with_label_values(&[endpoint_name.as_str(), get_grpc_code_label(code)])
.observe(duration);
return response;
}
Ok(result) => (result, duration),
}
(result?, duration)
};

// Check if the gRPC status header is part of the HTTP headers, if not check for
Expand Down

0 comments on commit 4243e74

Please sign in to comment.