Skip to content

Commit

Permalink
handle core errors properly
Browse files Browse the repository at this point in the history
  • Loading branch information
Khoyo committed Jul 11, 2024
1 parent 1aacf1b commit 1a6f5ac
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package fr.sncf.osrd.reporting.exceptions;

import com.squareup.moshi.Json;

public enum ErrorCause {
@Json(name = "Internal")
INTERNAL,
@Json(name = "User")
USER
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final class OSRDError extends RuntimeException {
public Map<String, Object> context = new HashMap<>();

public final transient ErrorType osrdErrorType;
public final transient ErrorCause cause;
public final ErrorCause cause;

/**
* Constructs a new OSRDError with the specified error type.
Expand Down
16 changes: 11 additions & 5 deletions core/src/main/java/fr/sncf/osrd/cli/WorkerCommand.kt
Original file line number Diff line number Diff line change
Expand Up @@ -169,24 +169,30 @@ class WorkerCommand : CliCommand {
)
val span = tracer.spanBuilder(path).setParent(context).startSpan()

val payload = try {
var payload: ByteArray
var status: ByteArray
try {
span.makeCurrent().use { scope ->
val response = endpoint.act(MQRequest(path, body))
response
payload = response
.body()
.readAllBytes() // TODO: check the response code too to catch error
.readAllBytes() // TODO: check the response code too to catch
val httpHeader = response.head().first()
val statusCode = httpHeader.split(" ")[1]
status = (if (statusCode[0] == '2') "ok" else "core_error").encodeToByteArray()
}
} catch (t: Throwable) {
span.recordException(t)
"ERROR, exception received".toByteArray() // TODO: have a valid payload for uncaught exceptions
payload = "ERROR, exception received".toByteArray() // TODO: have a valid payload for uncaught exceptions
status = "core_error".encodeToByteArray()
} finally {
span.end()
}

if (replyTo != null) {
val properties = AMQP.BasicProperties().builder()
.correlationId(correlationId)
.headers(mapOf("x-status" to "ok"))
.headers(mapOf("x-status" to status))
.build()
channel.basicPublish("", replyTo, properties, payload)
}
Expand Down
37 changes: 25 additions & 12 deletions editoast/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,16 @@ impl CoreClient {
Ok(Self::MessageQueue(client))
}

fn handle_error(
&self,
bytes: &[u8],
status: reqwest::StatusCode,
url: String,
) -> InternalError {
fn handle_error(&self, bytes: &[u8], url: String) -> InternalError {
// We try to deserialize the response as an StandardCoreError in order to retain the context of the core error
if let Ok(mut core_error) = <Json<StandardCoreError>>::from_bytes(bytes) {
let status: u16 = match core_error.cause {
CoreErrorCause::Internal => 500,
CoreErrorCause::User => 400,
};
core_error.context.insert("url".to_owned(), url.into());
let mut internal_error: InternalError = core_error.into();
internal_error.set_status(StatusCode::from_u16(status.as_u16()).unwrap());
internal_error.set_status(StatusCode::from_u16(status).unwrap());
return internal_error;
}

Expand Down Expand Up @@ -166,7 +165,7 @@ impl CoreClient {
}

error!(target: "editoast::coreclient", "{method_s} {path} {status}", status = status.to_string().bold().red());
Err(self.handle_error(bytes.as_ref(), status, url))
Err(self.handle_error(bytes.as_ref(), url))
}
CoreClient::MessageQueue(client) => {
// TODO: maybe implement retry?
Expand All @@ -175,19 +174,26 @@ impl CoreClient {
// TODO: tracing: use correlation id

let response = client
.call_with_response::<_, R>(infra_id.to_string(), path, &body, true, None, None)
.call_with_response(infra_id.to_string(), path, &body, true, None, None)
.await?;

Ok(response)
if response.status == b"ok" {
return Ok(R::from_bytes(&response.payload)?);
}

if response.status == b"core_error" {
return Err(self.handle_error(&response.payload, path.to_string()));
}

todo!("TODO: handle protocol errors")
}
#[cfg(test)]
CoreClient::Mocked(client) => {
match client.fetch_mocked::<_, B, R>(method, path, body) {
Ok(Some(response)) => Ok(response),
Ok(None) => Err(CoreError::NoResponseContent.into()),
Err(MockingError { bytes, status, url }) => Err(self.handle_error(
&bytes,
reqwest::StatusCode::from_u16(status.as_u16()).unwrap(),
&bytes, //reqwest::StatusCode::from_u16(status.as_u16()).unwrap(),
url,
)),
}
Expand Down Expand Up @@ -372,6 +378,13 @@ pub struct StandardCoreError {
error_type: String,
context: HashMap<String, Value>,
message: String,
cause: CoreErrorCause,
}

#[derive(Debug, Deserialize)]
pub enum CoreErrorCause {
Internal,
User,
}

impl crate::error::EditoastError for StandardCoreError {
Expand Down
34 changes: 27 additions & 7 deletions editoast/src/core/mq_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use serde_json::to_vec;
use std::{fmt::Debug, sync::Arc};
use thiserror::Error;
use tokio::time::{timeout, Duration};
use utoipa::openapi::response;

#[derive(Debug, Clone)]
pub struct RabbitMQClient {
Expand Down Expand Up @@ -44,11 +45,19 @@ pub enum Error {
#[error("Cannot deserialize response: {0}")]
#[editoast_error(status = "500")]
DeserialisationError(InternalError),
#[error("Cannot parse response status")]
#[editoast_error(status = "500")]
StatusParsingError,
#[error("Response timeout")]
#[editoast_error(status = "500")]
ResponseTimeout,
}

pub struct MQResponse {
pub payload: Vec<u8>,
pub status: Vec<u8>,
}

impl RabbitMQClient {
pub async fn new(options: Options) -> Result<Self, Error> {
let connection = Connection::connect(&options.uri, ConnectionProperties::default())
Expand Down Expand Up @@ -120,18 +129,17 @@ impl RabbitMQClient {
Ok(())
}

pub async fn call_with_response<T, TR>(
pub async fn call_with_response<T>(
&self,
routing_key: String,
path: &str,
published_payload: &Option<T>,
mandatory: bool,
correlation_id: Option<String>,
override_timeout: Option<u64>,
) -> Result<TR::Response, Error>
) -> Result<MQResponse, Error>
where
T: Serialize,
TR: CoreResponse,
{
// Create a channel
let channel = self
Expand Down Expand Up @@ -204,10 +212,22 @@ impl RabbitMQClient {
.map_err(Error::Lapin)?;

// Deserialize the response
let response =
TR::from_bytes(&delivery.data).map_err(|e| Error::DeserialisationError(e))?;

Ok(response)
// let payload =
// TR::from_bytes(&delivery.data).map_err(|e| Error::DeserialisationError(e))?;

let status = delivery
.properties
.headers()
.as_ref()
.and_then(|f| f.inner().get("x-status"))
.and_then(|s| s.as_byte_array())
.map(|s| Ok(s.as_slice().to_owned()))
.unwrap_or(Err(Error::StatusParsingError))?;

Ok(MQResponse {
payload: delivery.data,
status,
})
} else {
Err(Error::ResponseTimeout)
}
Expand Down

0 comments on commit 1a6f5ac

Please sign in to comment.