From db544d3096c0e54b1703b6e98ea53924c2a76480 Mon Sep 17 00:00:00 2001 From: Francesco Guardiani Date: Wed, 22 May 2024 17:56:24 +0200 Subject: [PATCH] Send x-restate-id header back on each request (#1542) --- crates/ingress-dispatcher/src/dispatcher.rs | 1 + crates/ingress-dispatcher/src/lib.rs | 1 + crates/ingress-http/src/handler/invocation.rs | 22 ++++++++++++------- crates/ingress-http/src/handler/responses.rs | 11 +++++++++- .../src/handler/service_handler.rs | 4 +++- crates/ingress-http/src/handler/tests.rs | 11 +++++++++- crates/ingress-http/src/handler/workflow.rs | 22 ++++++++++++------- crates/ingress-http/src/server.rs | 2 ++ 8 files changed, 55 insertions(+), 19 deletions(-) diff --git a/crates/ingress-dispatcher/src/dispatcher.rs b/crates/ingress-dispatcher/src/dispatcher.rs index 273149ede..783da93ca 100644 --- a/crates/ingress-dispatcher/src/dispatcher.rs +++ b/crates/ingress-dispatcher/src/dispatcher.rs @@ -234,6 +234,7 @@ impl MessageHandler for IngressDispatcher { // TODO we need to add back the expiration time for idempotent results idempotency_expiry_time: None, result: invocation_response.response.clone(), + invocation_id: invocation_response.correlation_ids.invocation_id, }; if let Err(response) = sender.send(dispatcher_response) { debug!( diff --git a/crates/ingress-dispatcher/src/lib.rs b/crates/ingress-dispatcher/src/lib.rs index 5446a32fb..23fcbeee9 100644 --- a/crates/ingress-dispatcher/src/lib.rs +++ b/crates/ingress-dispatcher/src/lib.rs @@ -116,6 +116,7 @@ pub struct IngressDispatcherRequest { pub struct IngressInvocationResponse { pub idempotency_expiry_time: Option, pub result: IngressResponseResult, + pub invocation_id: Option, } pub type IngressDeduplicationId = (String, MessageIndex); diff --git a/crates/ingress-http/src/handler/invocation.rs b/crates/ingress-http/src/handler/invocation.rs index 355212480..c4a95d3b8 100644 --- a/crates/ingress-http/src/handler/invocation.rs +++ b/crates/ingress-http/src/handler/invocation.rs @@ -98,6 +98,7 @@ where Self::reply_with_invocation_response( response.result, + Some(invocation_id), response.idempotency_expiry_time.as_deref(), move |invocation_target| { self.schemas @@ -141,13 +142,18 @@ where } }; - Self::reply_with_invocation_response(response.response, None, move |invocation_target| { - self.schemas - .resolve_latest_invocation_target( - invocation_target.service_name(), - invocation_target.handler_name(), - ) - .ok_or(HandlerError::NotFound) - }) + Self::reply_with_invocation_response( + response.response, + Some(invocation_id), + None, + move |invocation_target| { + self.schemas + .resolve_latest_invocation_target( + invocation_target.service_name(), + invocation_target.handler_name(), + ) + .ok_or(HandlerError::NotFound) + }, + ) } } diff --git a/crates/ingress-http/src/handler/responses.rs b/crates/ingress-http/src/handler/responses.rs index 38f563cab..74d4097ce 100644 --- a/crates/ingress-http/src/handler/responses.rs +++ b/crates/ingress-http/src/handler/responses.rs @@ -14,15 +14,19 @@ use bytes::Bytes; use http::{header, HeaderName, Response}; use http_body_util::Full; use restate_schema_api::invocation_target::InvocationTargetMetadata; +use restate_types::identifiers::InvocationId; use restate_types::ingress::IngressResponseResult; use restate_types::invocation::InvocationTarget; use tracing::{info, trace}; -const IDEMPOTENCY_EXPIRES: HeaderName = HeaderName::from_static("idempotency-expires"); +pub(crate) const IDEMPOTENCY_EXPIRES: HeaderName = HeaderName::from_static("idempotency-expires"); +/// Contains the string representation of the invocation id +pub(crate) const X_RESTATE_ID: HeaderName = HeaderName::from_static("x-restate-id"); impl Handler { pub(crate) fn reply_with_invocation_response( response: IngressResponseResult, + invocation_id: Option, idempotency_expiry_time: Option<&str>, invocation_target_metadata_retriever: impl FnOnce( &InvocationTarget, @@ -34,6 +38,11 @@ impl Handler