Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send x-restate-id header back on each request #1542

Merged
merged 2 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/ingress-dispatcher/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ impl MessageHandler for IngressDispatcher {
// TODO we need to add back the expiration time for idempotent results
idempotency_expiry_time: None,
result: invocation_response.response.clone(),
invocation_id: invocation_response.correlation_ids.invocation_id,
};
if let Err(response) = sender.send(dispatcher_response) {
debug!(
Expand Down
1 change: 1 addition & 0 deletions crates/ingress-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ pub struct IngressDispatcherRequest {
pub struct IngressInvocationResponse {
pub idempotency_expiry_time: Option<String>,
pub result: IngressResponseResult,
pub invocation_id: Option<InvocationId>,
}

pub type IngressDeduplicationId = (String, MessageIndex);
Expand Down
22 changes: 14 additions & 8 deletions crates/ingress-http/src/handler/invocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ where

Self::reply_with_invocation_response(
response.result,
Some(invocation_id),
response.idempotency_expiry_time.as_deref(),
move |invocation_target| {
self.schemas
Expand Down Expand Up @@ -141,13 +142,18 @@ where
}
};

Self::reply_with_invocation_response(response.response, None, move |invocation_target| {
self.schemas
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
)
.ok_or(HandlerError::NotFound)
})
Self::reply_with_invocation_response(
response.response,
Some(invocation_id),
None,
move |invocation_target| {
self.schemas
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
)
.ok_or(HandlerError::NotFound)
},
)
}
}
11 changes: 10 additions & 1 deletion crates/ingress-http/src/handler/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ use bytes::Bytes;
use http::{header, HeaderName, Response};
use http_body_util::Full;
use restate_schema_api::invocation_target::InvocationTargetMetadata;
use restate_types::identifiers::InvocationId;
use restate_types::ingress::IngressResponseResult;
use restate_types::invocation::InvocationTarget;
use tracing::{info, trace};

const IDEMPOTENCY_EXPIRES: HeaderName = HeaderName::from_static("idempotency-expires");
pub(crate) const IDEMPOTENCY_EXPIRES: HeaderName = HeaderName::from_static("idempotency-expires");
/// Contains the string representation of the invocation id
pub(crate) const X_RESTATE_ID: HeaderName = HeaderName::from_static("x-restate-id");
slinkydeveloper marked this conversation as resolved.
Show resolved Hide resolved

impl<Schemas, Dispatcher, StorageReader> Handler<Schemas, Dispatcher, StorageReader> {
pub(crate) fn reply_with_invocation_response(
response: IngressResponseResult,
invocation_id: Option<InvocationId>,
idempotency_expiry_time: Option<&str>,
invocation_target_metadata_retriever: impl FnOnce(
&InvocationTarget,
Expand All @@ -34,6 +38,11 @@ impl<Schemas, Dispatcher, StorageReader> Handler<Schemas, Dispatcher, StorageRea
// Prepare response metadata
let mut response_builder = hyper::Response::builder();

// Add invocation id if any
if let Some(id) = invocation_id {
response_builder = response_builder.header(X_RESTATE_ID, id.to_string());
}

// Add idempotency expiry time if available
if let Some(expiry_time) = idempotency_expiry_time {
response_builder = response_builder.header(IDEMPOTENCY_EXPIRES, expiry_time);
Expand Down
4 changes: 3 additions & 1 deletion crates/ingress-http/src/handler/service_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use super::tracing::prepare_tracing_span;
use super::HandlerError;
use super::{Handler, APPLICATION_JSON};

use crate::handler::responses::{IDEMPOTENCY_EXPIRES, X_RESTATE_ID};
use crate::metric_definitions::{INGRESS_REQUESTS, INGRESS_REQUEST_DURATION, REQUEST_COMPLETED};
use bytes::Bytes;
use bytestring::ByteString;
Expand All @@ -31,7 +32,6 @@ use std::time::{Duration, Instant, SystemTime};
use tracing::{info, trace, warn, Instrument};

pub(crate) const IDEMPOTENCY_KEY: HeaderName = HeaderName::from_static("idempotency-key");
const IDEMPOTENCY_EXPIRES: HeaderName = HeaderName::from_static("idempotency-expires");
const DELAY_QUERY_PARAM: &str = "delay";
const DELAYSEC_QUERY_PARAM: &str = "delaysec";

Expand Down Expand Up @@ -245,6 +245,7 @@ where

Self::reply_with_invocation_response(
response.result,
Some(invocation_id),
response.idempotency_expiry_time.as_deref(),
move |_| Ok(invocation_target_metadata),
)
Expand Down Expand Up @@ -283,6 +284,7 @@ where
Ok(Response::builder()
.status(StatusCode::ACCEPTED)
.header(header::CONTENT_TYPE, APPLICATION_JSON)
.header(X_RESTATE_ID, submit_notification.invocation_id.to_string())
.body(Full::new(
serde_json::to_vec(&SendResponse {
invocation_id: submit_notification.invocation_id,
Expand Down
11 changes: 10 additions & 1 deletion crates/ingress-http/src/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use super::Handler;
use restate_ingress_dispatcher::{IngressInvocationResponse, SubmittedInvocationNotification};
use std::collections::HashMap;

use crate::handler::responses::X_RESTATE_ID;
use bytes::Bytes;
use bytestring::ByteString;
use googletest::prelude::*;
Expand Down Expand Up @@ -83,6 +84,7 @@ async fn call_service() {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(InvocationId::mock_random()),
result: IngressResponseResult::Success(
service_invocation.invocation_target,
serde_json::to_vec(&GreetingResponse {
Expand All @@ -97,7 +99,8 @@ async fn call_service() {
.await;

assert_eq!(response.status(), StatusCode::OK);
let (_, response_body) = response.into_parts();
let (parts, response_body) = response.into_parts();
assert!(parts.headers.contains_key(X_RESTATE_ID));
let response_bytes = response_body.collect().await.unwrap().to_bytes();
let response_value: GreetingResponse = serde_json::from_slice(&response_bytes).unwrap();
assert_eq!(response_value.greeting, "Igal");
Expand Down Expand Up @@ -138,6 +141,7 @@ async fn call_service_with_get() {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(InvocationId::mock_random()),
result: IngressResponseResult::Success(
service_invocation.invocation_target,
serde_json::to_vec(&GreetingResponse {
Expand Down Expand Up @@ -195,6 +199,7 @@ async fn call_virtual_object() {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(InvocationId::mock_random()),
result: IngressResponseResult::Success(
service_invocation.invocation_target,
serde_json::to_vec(&GreetingResponse {
Expand Down Expand Up @@ -382,6 +387,7 @@ async fn idempotency_key_parsing() {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(InvocationId::mock_random()),
result: IngressResponseResult::Success(
service_invocation.invocation_target,
serde_json::to_vec(&GreetingResponse {
Expand Down Expand Up @@ -493,6 +499,7 @@ async fn attach() {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(invocation_id),
result: IngressResponseResult::Success(
InvocationTarget::service("greeter.Greeter", "greet"),
serde_json::to_vec(&GreetingResponse {
Expand Down Expand Up @@ -888,6 +895,7 @@ fn expect_invocation_and_reply_with_empty(req: IngressDispatcherRequest) {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(InvocationId::mock_random()),
result: IngressResponseResult::Success(
service_invocation.invocation_target,
Bytes::new(),
Expand All @@ -901,6 +909,7 @@ fn expect_invocation_and_reply_with_non_empty(req: IngressDispatcherRequest) {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(InvocationId::mock_random()),
result: IngressResponseResult::Success(
service_invocation.invocation_target,
Bytes::from_static(b"123"),
Expand Down
22 changes: 14 additions & 8 deletions crates/ingress-http/src/handler/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ where

Self::reply_with_invocation_response(
response.result,
response.invocation_id,
response.idempotency_expiry_time.as_deref(),
move |invocation_target| {
self.schemas
Expand Down Expand Up @@ -137,13 +138,18 @@ where
}
};

Self::reply_with_invocation_response(response.response, None, move |invocation_target| {
self.schemas
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
)
.ok_or(HandlerError::NotFound)
})
Self::reply_with_invocation_response(
response.response,
response.correlation_ids.invocation_id,
None,
move |invocation_target| {
self.schemas
.resolve_latest_invocation_target(
invocation_target.service_name(),
invocation_target.handler_name(),
)
.ok_or(HandlerError::NotFound)
},
)
}
}
2 changes: 2 additions & 0 deletions crates/ingress-http/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ mod tests {
use restate_ingress_dispatcher::mocks::MockDispatcher;
use restate_ingress_dispatcher::{IngressDispatcherRequest, IngressInvocationResponse};
use restate_test_util::assert_eq;
use restate_types::identifiers::InvocationId;
use restate_types::ingress::IngressResponseResult;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
Expand Down Expand Up @@ -287,6 +288,7 @@ mod tests {
response_tx
.send(IngressInvocationResponse {
idempotency_expiry_time: None,
invocation_id: Some(InvocationId::mock_random()),
result: IngressResponseResult::Success(
service_invocation.invocation_target,
serde_json::to_vec(&crate::mocks::GreetingResponse {
Expand Down
Loading