Skip to content

Commit

Permalink
Send x-restate-id header back on each request (#1542)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored May 22, 2024
1 parent 2b05f4e commit db544d3
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 19 deletions.
1 change: 1 addition & 0 deletions crates/ingress-dispatcher/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ impl MessageHandler for IngressDispatcher {
// TODO we need to add back the expiration time for idempotent results
idempotency_expiry_time: None,
result: invocation_response.response.clone(),
invocation_id: invocation_response.correlation_ids.invocation_id,
};
if let Err(response) = sender.send(dispatcher_response) {
debug!(
Expand Down
1 change: 1 addition & 0 deletions crates/ingress-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ pub struct IngressDispatcherRequest {
pub struct IngressInvocationResponse {
pub idempotency_expiry_time: Option<String>,
pub result: IngressResponseResult,
pub invocation_id: Option<InvocationId>,
}

pub type IngressDeduplicationId = (String, MessageIndex);
Expand Down
22 changes: 14 additions & 8 deletions crates/ingress-http/src/handler/invocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ where

Self::reply_with_invocation_response(
response.result,
Some(invocation_id),
response.idempotency_expiry_time.as_deref(),
move |invocation_target| {
self.schemas
Expand Down Expand Up @@ -141,13 +142,18 @@ where
}
};

Self::reply_with_invocation_response(response.response, None, move |invocation_target| {
self.schemas
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
)
.ok_or(HandlerError::NotFound)
})
Self::reply_with_invocation_response(
response.response,
Some(invocation_id),
None,
move |invocation_target| {
self.schemas
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
)
.ok_or(HandlerError::NotFound)
},
)
}
}
11 changes: 10 additions & 1 deletion crates/ingress-http/src/handler/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ use bytes::Bytes;
use http::{header, HeaderName, Response};
use http_body_util::Full;
use restate_schema_api::invocation_target::InvocationTargetMetadata;
use restate_types::identifiers::InvocationId;
use restate_types::ingress::IngressResponseResult;
use restate_types::invocation::InvocationTarget;
use tracing::{info, trace};

const IDEMPOTENCY_EXPIRES: HeaderName = HeaderName::from_static("idempotency-expires");
pub(crate) const IDEMPOTENCY_EXPIRES: HeaderName = HeaderName::from_static("idempotency-expires");
/// Contains the string representation of the invocation id
pub(crate) const X_RESTATE_ID: HeaderName = HeaderName::from_static("x-restate-id");

impl<Schemas, Dispatcher, StorageReader> Handler<Schemas, Dispatcher, StorageReader> {
pub(crate) fn reply_with_invocation_response(
response: IngressResponseResult,
invocation_id: Option<InvocationId>,
idempotency_expiry_time: Option<&str>,
invocation_target_metadata_retriever: impl FnOnce(
&InvocationTarget,
Expand All @@ -34,6 +38,11 @@ impl<Schemas, Dispatcher, StorageReader> Handler<Schemas, Dispatcher, StorageRea
// Prepare response metadata
let mut response_builder = hyper::Response::builder();

// Add invocation id if any
if let Some(id) = invocation_id {
response_builder = response_builder.header(X_RESTATE_ID, id.to_string());
}

// Add idempotency expiry time if available
if let Some(expiry_time) = idempotency_expiry_time {
response_builder = response_builder.header(IDEMPOTENCY_EXPIRES, expiry_time);
Expand Down
4 changes: 3 additions & 1 deletion crates/ingress-http/src/handler/service_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use super::tracing::prepare_tracing_span;
use super::HandlerError;
use super::{Handler, APPLICATION_JSON};

use crate::handler::responses::{IDEMPOTENCY_EXPIRES, X_RESTATE_ID};
use crate::metric_definitions::{INGRESS_REQUESTS, INGRESS_REQUEST_DURATION, REQUEST_COMPLETED};
use bytes::Bytes;
use bytestring::ByteString;
Expand All @@ -31,7 +32,6 @@ use std::time::{Duration, Instant, SystemTime};
use tracing::{info, trace, warn, Instrument};

pub(crate) const IDEMPOTENCY_KEY: HeaderName = HeaderName::from_static("idempotency-key");
const IDEMPOTENCY_EXPIRES: HeaderName = HeaderName::from_static("idempotency-expires");
const DELAY_QUERY_PARAM: &str = "delay";
const DELAYSEC_QUERY_PARAM: &str = "delaysec";

Expand Down Expand Up @@ -245,6 +245,7 @@ where

Self::reply_with_invocation_response(
response.result,
Some(invocation_id),
response.idempotency_expiry_time.as_deref(),
move |_| Ok(invocation_target_metadata),
)
Expand Down Expand Up @@ -283,6 +284,7 @@ where
Ok(Response::builder()
.status(StatusCode::ACCEPTED)
.header(header::CONTENT_TYPE, APPLICATION_JSON)
.header(X_RESTATE_ID, submit_notification.invocation_id.to_string())
.body(Full::new(
serde_json::to_vec(&SendResponse {
invocation_id: submit_notification.invocation_id,
Expand Down
11 changes: 10 additions & 1 deletion crates/ingress-http/src/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use super::Handler;
use restate_ingress_dispatcher::{IngressInvocationResponse, SubmittedInvocationNotification};
use std::collections::HashMap;

use crate::handler::responses::X_RESTATE_ID;
use bytes::Bytes;
use bytestring::ByteString;
use googletest::prelude::*;
Expand Down Expand Up @@ -83,6 +84,7 @@ async fn call_service() {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(InvocationId::mock_random()),
result: IngressResponseResult::Success(
service_invocation.invocation_target,
serde_json::to_vec(&GreetingResponse {
Expand All @@ -97,7 +99,8 @@ async fn call_service() {
.await;

assert_eq!(response.status(), StatusCode::OK);
let (_, response_body) = response.into_parts();
let (parts, response_body) = response.into_parts();
assert!(parts.headers.contains_key(X_RESTATE_ID));
let response_bytes = response_body.collect().await.unwrap().to_bytes();
let response_value: GreetingResponse = serde_json::from_slice(&response_bytes).unwrap();
assert_eq!(response_value.greeting, "Igal");
Expand Down Expand Up @@ -138,6 +141,7 @@ async fn call_service_with_get() {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(InvocationId::mock_random()),
result: IngressResponseResult::Success(
service_invocation.invocation_target,
serde_json::to_vec(&GreetingResponse {
Expand Down Expand Up @@ -195,6 +199,7 @@ async fn call_virtual_object() {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(InvocationId::mock_random()),
result: IngressResponseResult::Success(
service_invocation.invocation_target,
serde_json::to_vec(&GreetingResponse {
Expand Down Expand Up @@ -382,6 +387,7 @@ async fn idempotency_key_parsing() {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(InvocationId::mock_random()),
result: IngressResponseResult::Success(
service_invocation.invocation_target,
serde_json::to_vec(&GreetingResponse {
Expand Down Expand Up @@ -493,6 +499,7 @@ async fn attach() {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(invocation_id),
result: IngressResponseResult::Success(
InvocationTarget::service("greeter.Greeter", "greet"),
serde_json::to_vec(&GreetingResponse {
Expand Down Expand Up @@ -888,6 +895,7 @@ fn expect_invocation_and_reply_with_empty(req: IngressDispatcherRequest) {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(InvocationId::mock_random()),
result: IngressResponseResult::Success(
service_invocation.invocation_target,
Bytes::new(),
Expand All @@ -901,6 +909,7 @@ fn expect_invocation_and_reply_with_non_empty(req: IngressDispatcherRequest) {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(InvocationId::mock_random()),
result: IngressResponseResult::Success(
service_invocation.invocation_target,
Bytes::from_static(b"123"),
Expand Down
22 changes: 14 additions & 8 deletions crates/ingress-http/src/handler/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ where

Self::reply_with_invocation_response(
response.result,
response.invocation_id,
response.idempotency_expiry_time.as_deref(),
move |invocation_target| {
self.schemas
Expand Down Expand Up @@ -137,13 +138,18 @@ where
}
};

Self::reply_with_invocation_response(response.response, None, move |invocation_target| {
self.schemas
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
)
.ok_or(HandlerError::NotFound)
})
Self::reply_with_invocation_response(
response.response,
response.correlation_ids.invocation_id,
None,
move |invocation_target| {
self.schemas
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
)
.ok_or(HandlerError::NotFound)
},
)
}
}
2 changes: 2 additions & 0 deletions crates/ingress-http/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ mod tests {
use restate_ingress_dispatcher::mocks::MockDispatcher;
use restate_ingress_dispatcher::{IngressDispatcherRequest, IngressInvocationResponse};
use restate_test_util::assert_eq;
use restate_types::identifiers::InvocationId;
use restate_types::ingress::IngressResponseResult;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
Expand Down Expand Up @@ -287,6 +288,7 @@ mod tests {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(InvocationId::mock_random()),
result: IngressResponseResult::Success(
service_invocation.invocation_target,
serde_json::to_vec(&crate::mocks::GreetingResponse {
Expand Down

0 comments on commit db544d3

Please sign in to comment.