diff --git a/crates/ingress-dispatcher/src/dispatcher.rs b/crates/ingress-dispatcher/src/dispatcher.rs index 273149ede..783da93ca 100644 --- a/crates/ingress-dispatcher/src/dispatcher.rs +++ b/crates/ingress-dispatcher/src/dispatcher.rs @@ -234,6 +234,7 @@ impl MessageHandler for IngressDispatcher { // TODO we need to add back the expiration time for idempotent results idempotency_expiry_time: None, result: invocation_response.response.clone(), + invocation_id: invocation_response.correlation_ids.invocation_id, }; if let Err(response) = sender.send(dispatcher_response) { debug!( diff --git a/crates/ingress-dispatcher/src/lib.rs b/crates/ingress-dispatcher/src/lib.rs index 5446a32fb..23fcbeee9 100644 --- a/crates/ingress-dispatcher/src/lib.rs +++ b/crates/ingress-dispatcher/src/lib.rs @@ -116,6 +116,7 @@ pub struct IngressDispatcherRequest { pub struct IngressInvocationResponse { pub idempotency_expiry_time: Option<String>, pub result: IngressResponseResult, + pub invocation_id: Option<InvocationId>, } pub type IngressDeduplicationId = (String, MessageIndex); diff --git a/crates/ingress-http/src/handler/invocation.rs b/crates/ingress-http/src/handler/invocation.rs index 355212480..c4a95d3b8 100644 --- a/crates/ingress-http/src/handler/invocation.rs +++ b/crates/ingress-http/src/handler/invocation.rs @@ -98,6 +98,7 @@ where Self::reply_with_invocation_response( response.result, + Some(invocation_id), response.idempotency_expiry_time.as_deref(), move |invocation_target| { self.schemas @@ -141,13 +142,18 @@ where } }; - Self::reply_with_invocation_response(response.response, None, move |invocation_target| { - self.schemas - .resolve_latest_invocation_target( - invocation_target.service_name(), - invocation_target.handler_name(), - ) - .ok_or(HandlerError::NotFound) - }) + Self::reply_with_invocation_response( + response.response, + Some(invocation_id), + None, + move |invocation_target| { + self.schemas + .resolve_latest_invocation_target( + invocation_target.service_name(), + invocation_target.handler_name(), + ) + .ok_or(HandlerError::NotFound) + }, + ) } } diff --git a/crates/ingress-http/src/handler/responses.rs b/crates/ingress-http/src/handler/responses.rs index 38f563cab..74d4097ce 100644 --- a/crates/ingress-http/src/handler/responses.rs +++ b/crates/ingress-http/src/handler/responses.rs @@ -14,15 +14,19 @@ use bytes::Bytes; use http::{header, HeaderName, Response}; use http_body_util::Full; use restate_schema_api::invocation_target::InvocationTargetMetadata; +use restate_types::identifiers::InvocationId; use restate_types::ingress::IngressResponseResult; use restate_types::invocation::InvocationTarget; use tracing::{info, trace}; -const IDEMPOTENCY_EXPIRES: HeaderName = HeaderName::from_static("idempotency-expires"); +pub(crate) const IDEMPOTENCY_EXPIRES: HeaderName = HeaderName::from_static("idempotency-expires"); +/// Contains the string representation of the invocation id +pub(crate) const X_RESTATE_ID: HeaderName = HeaderName::from_static("x-restate-id"); impl<Schemas, Dispatcher, StorageReader> Handler<Schemas, Dispatcher, StorageReader> { pub(crate) fn reply_with_invocation_response( response: IngressResponseResult, + invocation_id: Option<InvocationId>, idempotency_expiry_time: Option<&str>, invocation_target_metadata_retriever: impl FnOnce( &InvocationTarget, @@ -34,6 +38,11 @@ impl<Schemas, Dispatcher, StorageReader> Handler<Schemas, Dispatcher, StorageRea // Prepare response metadata let mut response_builder = hyper::Response::builder(); + // Add invocation id if any + if let Some(id) = invocation_id { + response_builder = response_builder.header(X_RESTATE_ID, id.to_string()); + } + // Add idempotency expiry time if available if let Some(expiry_time) = idempotency_expiry_time { response_builder = response_builder.header(IDEMPOTENCY_EXPIRES, expiry_time); diff --git a/crates/ingress-http/src/handler/service_handler.rs b/crates/ingress-http/src/handler/service_handler.rs index 7e8d3bf26..96f1e3b4a 100644 --- a/crates/ingress-http/src/handler/service_handler.rs +++ b/crates/ingress-http/src/handler/service_handler.rs @@ -13,6 +13,7 @@ use super::tracing::prepare_tracing_span; use super::HandlerError; use super::{Handler, APPLICATION_JSON}; +use crate::handler::responses::{IDEMPOTENCY_EXPIRES, X_RESTATE_ID}; use crate::metric_definitions::{INGRESS_REQUESTS, INGRESS_REQUEST_DURATION, REQUEST_COMPLETED}; use bytes::Bytes; use bytestring::ByteString; @@ -31,7 +32,6 @@ use std::time::{Duration, Instant, SystemTime}; use tracing::{info, trace, warn, Instrument}; pub(crate) const IDEMPOTENCY_KEY: HeaderName = HeaderName::from_static("idempotency-key"); -const IDEMPOTENCY_EXPIRES: HeaderName = HeaderName::from_static("idempotency-expires"); const DELAY_QUERY_PARAM: &str = "delay"; const DELAYSEC_QUERY_PARAM: &str = "delaysec"; @@ -245,6 +245,7 @@ where Self::reply_with_invocation_response( response.result, + Some(invocation_id), response.idempotency_expiry_time.as_deref(), move |_| Ok(invocation_target_metadata), ) @@ -283,6 +284,7 @@ where Ok(Response::builder() .status(StatusCode::ACCEPTED) .header(header::CONTENT_TYPE, APPLICATION_JSON) + .header(X_RESTATE_ID, submit_notification.invocation_id.to_string()) .body(Full::new( serde_json::to_vec(&SendResponse { invocation_id: submit_notification.invocation_id, diff --git a/crates/ingress-http/src/handler/tests.rs b/crates/ingress-http/src/handler/tests.rs index b28afe356..23685ddc4 100644 --- a/crates/ingress-http/src/handler/tests.rs +++ b/crates/ingress-http/src/handler/tests.rs @@ -16,6 +16,7 @@ use super::Handler; use restate_ingress_dispatcher::{IngressInvocationResponse, SubmittedInvocationNotification}; use std::collections::HashMap; +use crate::handler::responses::X_RESTATE_ID; use bytes::Bytes; use bytestring::ByteString; use googletest::prelude::*; @@ -83,6 +84,7 @@ async fn call_service() { response_tx .send(IngressInvocationResponse { idempotency_expiry_time: None, + invocation_id: Some(InvocationId::mock_random()), result: IngressResponseResult::Success( service_invocation.invocation_target, serde_json::to_vec(&GreetingResponse { @@ -97,7 +99,8 @@ async fn call_service() { .await; assert_eq!(response.status(), StatusCode::OK); - let (_, response_body) = response.into_parts(); + let (parts, response_body) = response.into_parts(); + assert!(parts.headers.contains_key(X_RESTATE_ID)); let response_bytes = response_body.collect().await.unwrap().to_bytes(); let response_value: GreetingResponse = serde_json::from_slice(&response_bytes).unwrap(); assert_eq!(response_value.greeting, "Igal"); @@ -138,6 +141,7 @@ async fn call_service_with_get() { response_tx .send(IngressInvocationResponse { idempotency_expiry_time: None, + invocation_id: Some(InvocationId::mock_random()), result: IngressResponseResult::Success( service_invocation.invocation_target, serde_json::to_vec(&GreetingResponse { @@ -195,6 +199,7 @@ async fn call_virtual_object() { response_tx .send(IngressInvocationResponse { idempotency_expiry_time: None, + invocation_id: Some(InvocationId::mock_random()), result: IngressResponseResult::Success( service_invocation.invocation_target, serde_json::to_vec(&GreetingResponse { @@ -382,6 +387,7 @@ async fn idempotency_key_parsing() { response_tx .send(IngressInvocationResponse { idempotency_expiry_time: None, + invocation_id: Some(InvocationId::mock_random()), result: IngressResponseResult::Success( service_invocation.invocation_target, serde_json::to_vec(&GreetingResponse { @@ -493,6 +499,7 @@ async fn attach() { response_tx .send(IngressInvocationResponse { idempotency_expiry_time: None, + invocation_id: Some(invocation_id), result: IngressResponseResult::Success( InvocationTarget::service("greeter.Greeter", "greet"), serde_json::to_vec(&GreetingResponse { @@ -888,6 +895,7 @@ fn expect_invocation_and_reply_with_empty(req: IngressDispatcherRequest) { response_tx .send(IngressInvocationResponse { idempotency_expiry_time: None, + invocation_id: Some(InvocationId::mock_random()), result: IngressResponseResult::Success( service_invocation.invocation_target, Bytes::new(), @@ -901,6 +909,7 @@ fn expect_invocation_and_reply_with_non_empty(req: IngressDispatcherRequest) { response_tx .send(IngressInvocationResponse { idempotency_expiry_time: None, + invocation_id: Some(InvocationId::mock_random()), result: IngressResponseResult::Success( service_invocation.invocation_target, Bytes::from_static(b"123"), diff --git a/crates/ingress-http/src/handler/workflow.rs b/crates/ingress-http/src/handler/workflow.rs index 649b13a43..527580d4d 100644 --- a/crates/ingress-http/src/handler/workflow.rs +++ b/crates/ingress-http/src/handler/workflow.rs @@ -94,6 +94,7 @@ where Self::reply_with_invocation_response( response.result, + response.invocation_id, response.idempotency_expiry_time.as_deref(), move |invocation_target| { self.schemas @@ -137,13 +138,18 @@ where } }; - Self::reply_with_invocation_response(response.response, None, move |invocation_target| { - self.schemas - .resolve_latest_invocation_target( - invocation_target.service_name(), - invocation_target.handler_name(), - ) - .ok_or(HandlerError::NotFound) - }) + Self::reply_with_invocation_response( + response.response, + response.correlation_ids.invocation_id, + None, + move |invocation_target| { + self.schemas + .resolve_latest_invocation_target( + invocation_target.service_name(), + invocation_target.handler_name(), + ) + .ok_or(HandlerError::NotFound) + }, + ) } } diff --git a/crates/ingress-http/src/server.rs b/crates/ingress-http/src/server.rs index 76e9b3156..dc15362d2 100644 --- a/crates/ingress-http/src/server.rs +++ b/crates/ingress-http/src/server.rs @@ -249,6 +249,7 @@ mod tests { use restate_ingress_dispatcher::mocks::MockDispatcher; use restate_ingress_dispatcher::{IngressDispatcherRequest, IngressInvocationResponse}; use restate_test_util::assert_eq; + use restate_types::identifiers::InvocationId; use restate_types::ingress::IngressResponseResult; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; @@ -287,6 +288,7 @@ mod tests { response_tx .send(IngressInvocationResponse { idempotency_expiry_time: None, + invocation_id: Some(InvocationId::mock_random()), result: IngressResponseResult::Success( service_invocation.invocation_target, serde_json::to_vec(&crate::mocks::GreetingResponse {