diff --git a/crates/rsjudge-rabbitmq/src/config.rs b/crates/rsjudge-rabbitmq/src/config.rs index 413bd17..39600c3 100644 --- a/crates/rsjudge-rabbitmq/src/config.rs +++ b/crates/rsjudge-rabbitmq/src/config.rs @@ -7,6 +7,7 @@ use rsjudge_traits::Config; #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct RabbitMqConfig { pub uri: String, + pub queue_name: String, } #[cfg(feature = "serde")] diff --git a/crates/rsjudge-rabbitmq/src/lib.rs b/crates/rsjudge-rabbitmq/src/lib.rs index 8693f47..ee85034 100644 --- a/crates/rsjudge-rabbitmq/src/lib.rs +++ b/crates/rsjudge-rabbitmq/src/lib.rs @@ -1,8 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 use amqprs::{ - callbacks::{DefaultChannelCallback, DefaultConnectionCallback}, - connection::{Connection, OpenConnectionArguments}, + callbacks::{ DefaultChannelCallback, DefaultConnectionCallback}, channel::{BasicAckArguments, BasicConsumeArguments, BasicPublishArguments, BasicQosArguments, Channel, QueueDeclareArguments}, connection::{Connection, OpenConnectionArguments}, BasicProperties, Deliver }; use crate::config::RabbitMqConfig; @@ -13,17 +12,63 @@ mod error; pub async fn register(config: RabbitMqConfig) -> Result<()> { // Build arguments for new connection. - let args = OpenConnectionArguments::try_from(&*config.uri)?; - let connection = Connection::open(&args).await?; + let conn_args = OpenConnectionArguments::try_from(&*config.uri)?; + let connection = Connection::open(&conn_args).await?; connection .register_callback(DefaultConnectionCallback) .await?; + let channel = connection.open_channel(None).await?; channel.register_callback(DefaultChannelCallback).await?; channel.flow(true).await?; + channel.basic_qos(BasicQosArguments::default().prefetch_count(1).finish()).await?; + + let queue_name = config.queue_name; + let queue_args = QueueDeclareArguments::new(&queue_name).durable(true).finish(); + channel.queue_declare(queue_args).await?; + + let consumer_args = BasicConsumeArguments::new("rpc_queue", ""); + let (_, mut rx) = channel.basic_consume_rx(consumer_args).await?; + + while let Some(message) = rx.recv().await { + if let Some(payload) = message.content { + on_request( + &channel, + message.deliver.unwrap(), + message.basic_properties.unwrap(), + payload, + ) + .await?; + } + } // Gracefully shutdown. channel.close().await?; connection.close().await?; Ok(()) } + +async fn on_request(channel: &Channel, method: Deliver, props: BasicProperties, payload: Vec) -> Result<()>{ + let content = std::str::from_utf8(&payload).unwrap_or("").to_string(); + let response = process(content).await.into_bytes(); + + let publish_args = BasicPublishArguments::new("", props.reply_to().unwrap_or(&"".to_string())); + + let properties = BasicProperties::default() + .with_correlation_id(props.correlation_id().unwrap_or(&"".to_string())) + .finish(); + + channel + .basic_publish(properties, response, publish_args) + .await?; + + channel + .basic_ack(BasicAckArguments::new(method.delivery_tag(), false)) + .await?; + + Ok(()) +} + +async fn process(s: String) -> String { + s +} diff --git a/src/config.rs b/src/config.rs index 04921bc..d30a054 100644 --- a/src/config.rs +++ b/src/config.rs @@ -66,7 +66,8 @@ mod tests { }, #[cfg(feature = "rabbitmq")] rabbitmq: RabbitMqConfig { - uri: "amqp://user:bitnami@localhost".to_owned() + uri: "amqp://user:bitnami@localhost".to_owned(), + queue_name: "rpc_queue".to_owned(), }, })? );