diff --git a/Cargo.toml b/Cargo.toml index 01cbe5a9..7dd41ac9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,11 +45,13 @@ members = [ "cloudevents-sdk-actix-web", "cloudevents-sdk-reqwest", "cloudevents-sdk-rdkafka", - "cloudevents-sdk-warp" + "cloudevents-sdk-warp", + "cloudevents-sdk-mqtt" ] exclude = [ "example-projects/actix-web-example", "example-projects/reqwest-wasm-example", "example-projects/rdkafka-example", "example-projects/warp-example", + "example-projects/mqtt-example" ] \ No newline at end of file diff --git a/cloudevents-sdk-mqtt/Cargo.toml b/cloudevents-sdk-mqtt/Cargo.toml new file mode 100644 index 00000000..f01e197d --- /dev/null +++ b/cloudevents-sdk-mqtt/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "cloudevents-sdk-mqtt" +version = "0.2.0" +authors = ["Francesco Guardiani "] +license-file = "../LICENSE" +edition = "2018" +description = "CloudEvents official Rust SDK - Mqtt integration" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +cloudevents-sdk = { version = "0.2.0", path = ".." } +lazy_static = "1.4.0" +paho-mqtt = { path = "../../paho.mqtt.rust" } +chrono = { version = "^0.4", features = ["serde"] } + +[dev-dependencies] +serde_json = "^1.0" \ No newline at end of file diff --git a/cloudevents-sdk-mqtt/src/headers.rs b/cloudevents-sdk-mqtt/src/headers.rs new file mode 100644 index 00000000..f9f57a6c --- /dev/null +++ b/cloudevents-sdk-mqtt/src/headers.rs @@ -0,0 +1,35 @@ +use cloudevents::event::SpecVersion; +use lazy_static::lazy_static; +use std::collections::HashMap; + +macro_rules! attribute_name_to_header { + ($attribute:expr) => { + format!("ce_{}", $attribute) + }; +} + +fn attributes_to_headers(it: impl Iterator) -> HashMap<&'static str, String> { + it.map(|s| { + if s == "datacontenttype" { + (s, String::from("content-type")) + } else { + (s, attribute_name_to_header!(s)) + } + }) + .collect() +} + +lazy_static! { + pub(crate) static ref ATTRIBUTES_TO_MQTT_HEADERS: HashMap<&'static str, String> = + attributes_to_headers(SpecVersion::all_attribute_names()); +} + +pub(crate) static SPEC_VERSION_HEADER: &'static str = "ce_specversion"; +pub(crate) static CLOUDEVENTS_JSON_HEADER: &'static str = "application/cloudevents+json"; +pub(crate) static CONTENT_TYPE: &'static str = "content-type"; + +pub enum MqttVersion { + V3_1, + V3_1_1, + V5, +} \ No newline at end of file diff --git a/cloudevents-sdk-mqtt/src/lib.rs b/cloudevents-sdk-mqtt/src/lib.rs new file mode 100644 index 00000000..2499b500 --- /dev/null +++ b/cloudevents-sdk-mqtt/src/lib.rs @@ -0,0 +1,14 @@ +//! This library provides Mqtt protocol bindings for CloudEvents +//! using the [paho.mqtt.rust](https://github.com/eclipse/paho.mqtt.rust) library.\\ +#[macro_use] +mod headers; +mod mqtt_producer_record; +mod mqtt_consumer_record; + +pub use mqtt_consumer_record::record_to_event; +pub use mqtt_consumer_record::ConsumerMessageDeserializer; +pub use mqtt_consumer_record::MessageExt; + +pub use mqtt_producer_record::MessageBuilderExt; +pub use mqtt_producer_record::MessageRecord; +pub use headers::MqttVersion; \ No newline at end of file diff --git a/cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs b/cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs new file mode 100644 index 00000000..2ad83b3e --- /dev/null +++ b/cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs @@ -0,0 +1,225 @@ +use crate::headers; +use cloudevents::event::SpecVersion; +use cloudevents::message::{Result, BinarySerializer, BinaryDeserializer, MessageAttributeValue, + MessageDeserializer, Encoding, StructuredSerializer, StructuredDeserializer}; +use cloudevents::{message, Event}; +use paho_mqtt::{Message, PropertyCode}; +use std::collections::HashMap; +use std::convert::TryFrom; +use std::str; + +pub struct ConsumerMessageDeserializer { + pub(crate) headers: HashMap>, + pub(crate) payload: Option>, +} + +impl ConsumerMessageDeserializer { + fn get_mqtt_headers(message: &Message) -> Result>> { + let mut hm = HashMap::new(); + let prop_iterator = message.properties().iter(PropertyCode::UserProperty); + + for property in prop_iterator { + let header = property.get_string_pair().unwrap(); + hm.insert(header.0.to_string(), Vec::from(header.1)); + } + + Ok(hm) + } + + pub fn new(message: &Message) -> Result { + Ok(ConsumerMessageDeserializer { + headers: Self::get_mqtt_headers(message)?, + payload: Some(message.payload()).map(|s| Vec::from(s)), + }) + } +} + +impl BinaryDeserializer for ConsumerMessageDeserializer { + fn deserialize_binary>(mut self, mut visitor: V) -> Result { + if self.encoding() != Encoding::BINARY { + return Err(message::Error::WrongEncoding {}) + } + + let spec_version = SpecVersion::try_from( + str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..]) + .map_err(|e| cloudevents::message::Error::Other { + source: Box::new(e), + })?, + )?; + + visitor = visitor.set_spec_version(spec_version.clone())?; + + let attributes = spec_version.attribute_names(); + + if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) { + visitor = visitor.set_attribute( + "datacontenttype", + MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { + cloudevents::message::Error::Other { + source: Box::new(e), + } + })?), + )? + } + + for (hn, hv) in self + .headers + .into_iter() + .filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_")) + { + let name = &hn["ce_".len()..]; + + if attributes.contains(&name) { + visitor = visitor.set_attribute( + name, + MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { + cloudevents::message::Error::Other { + source: Box::new(e), + } + })?), + )? + } else { + visitor = visitor.set_extension( + name, + MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { + cloudevents::message::Error::Other { + source: Box::new(e), + } + })?), + )? + } + } + + if self.payload != None { + visitor.end_with_data(self.payload.unwrap()) + } else { + visitor.end() + } + } +} + +impl StructuredDeserializer for ConsumerMessageDeserializer { + fn deserialize_structured>(self, visitor: V) -> Result { + visitor.set_structured_event(self.payload.unwrap()) + } +} + +impl MessageDeserializer for ConsumerMessageDeserializer { + fn encoding(&self) -> Encoding { + match ( + self.headers + .get("content-type") + .map(|s| String::from_utf8(s.to_vec()).ok()) + .flatten() + .map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER)) + .unwrap_or(false), + self.headers.get(headers::SPEC_VERSION_HEADER), + ) { + (true, _) => Encoding::STRUCTURED, + (_, Some(_)) => Encoding::BINARY, + _ => Encoding::UNKNOWN, + } + } +} + +pub fn record_to_event(msg: &Message, version: headers::MqttVersion) -> Result { + match version { + headers::MqttVersion::V5 => BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?), + headers::MqttVersion::V3_1 => StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?), + headers::MqttVersion::V3_1_1 => StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?), + } +} + +pub trait MessageExt { + fn to_event(&self, version: headers::MqttVersion) -> Result; +} + +impl MessageExt for Message { + fn to_event(&self, version: headers::MqttVersion) -> Result { + record_to_event(self, version) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::mqtt_producer_record::MessageRecord; + + use chrono::Utc; + use cloudevents::{EventBuilder, EventBuilderV10}; + use crate::MessageBuilderExt; + use serde_json::json; + use cloudevents::event::Data; + + #[test] + fn test_binary_record() { + let time = Utc::now(); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .time(time) + .source("http://localhost") + .data("application/json", + Data::Binary(String::from("{\"hello\":\"world\"}").into_bytes())) + .extension("someint", "10") + .build() + .unwrap(); + + let message_record = MessageRecord::from_event( + EventBuilderV10::new() + .id("0001") + .ty("example.test") + .time(time) + .source("http://localhost") + .extension("someint", "10") + .data("application/json", json!({"hello": "world"})) + .build() + .unwrap(), + headers::MqttVersion::V5, + ) + .unwrap(); + + let msg = MessageBuilder::new() + .topic("test") + .message_record(&message_record) + .qos(1) + .finalize(); + + assert_eq!(msg.to_event(headers::MqttVersion::V5).unwrap(), expected) + } + + #[test] + fn test_structured_record() { + let j = json!({"hello": "world"}); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .data("application/cloudevents+json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .data("application/cloudevents+json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let serialized_event = + StructuredDeserializer::deserialize_structured(input, MessageRecord::new()).unwrap(); + + let msg = MessageBuilder::new() + .topic("test") + .message_record(&serialized_event) + .qos(1) + .finalize(); + + assert_eq!(msg.to_event(headers::MqttVersion::V3_1_1).unwrap(), expected) + } +} \ No newline at end of file diff --git a/cloudevents-sdk-mqtt/src/mqtt_producer_record.rs b/cloudevents-sdk-mqtt/src/mqtt_producer_record.rs new file mode 100644 index 00000000..1b894dc0 --- /dev/null +++ b/cloudevents-sdk-mqtt/src/mqtt_producer_record.rs @@ -0,0 +1,137 @@ +use super::headers; +use paho_mqtt::{Properties, Property, PropertyCode, MessageBuilder}; +use cloudevents::message::{BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, + StructuredDeserializer, StructuredSerializer, Error}; +use cloudevents::Event; +use cloudevents::event::SpecVersion; +use std::option::Option::Some; + +pub struct MessageRecord { + pub(crate) headers: Properties, + pub(crate) payload: Option>, +} + +impl MessageRecord { + /// Create a new empty [`MessageRecord`] + pub fn new() -> Self { + MessageRecord { + headers: Properties::new(), + payload: None, + } + } + + pub fn from_event(event: Event, version: headers::MqttVersion) -> Result { + match version { + headers::MqttVersion::V5 => BinaryDeserializer::deserialize_binary(event, MessageRecord::new()), + headers::MqttVersion::V3_1 => StructuredDeserializer::deserialize_structured(event, MessageRecord::new()), + headers::MqttVersion::V3_1_1 => StructuredDeserializer::deserialize_structured(event, MessageRecord::new()), + } + } +} + +impl BinarySerializer for MessageRecord { + fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { + match Property::new_string_pair(PropertyCode::UserProperty, headers::SPEC_VERSION_HEADER, + spec_version.as_str()) { + Ok(property) => { + match self.headers.push(property) { + Err(e) => Err(Error::Other { + source: Box::new(e) + }), + _ => Ok(self) + } + }, + _ => Err(Error::UnrecognizedAttributeName { + name: headers::SPEC_VERSION_HEADER.to_string() + }) + } + } + + fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { + match Property::new_string_pair(PropertyCode::UserProperty, &headers::ATTRIBUTES_TO_MQTT_HEADERS + .get(name) + .ok_or(cloudevents::message::Error::UnrecognizedAttributeName { + name: String::from(name), + })? + .clone()[..], + &value.to_string()[..]) { + Ok(property) => { + match self.headers.push(property) { + Err(e) => Err(Error::Other { + source: Box::new(e) + }), + _ => Ok(self) + } + }, + _ => Err(Error::UnrecognizedAttributeName { + name: headers::SPEC_VERSION_HEADER.to_string() + }) + } + } + + fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { + match Property::new_string_pair(PropertyCode::UserProperty, + &attribute_name_to_header!(name)[..], + &value.to_string()[..]) { + Ok(property) => { + match self.headers.push(property) { + Err(e) => Err(Error::Other { + source: Box::new(e) + }), + _ => Ok(self) + } + }, + _ => Err(Error::UnrecognizedAttributeName { + name: headers::SPEC_VERSION_HEADER.to_string() + }) + } + } + + fn end_with_data(mut self, bytes: Vec) -> Result { + self.payload = Some(bytes); + + Ok(self) + } + + fn end(self) -> Result { + Ok(self) + } +} + +impl StructuredSerializer for MessageRecord { + fn set_structured_event(mut self, bytes: Vec) -> Result { + match Property::new_string_pair(PropertyCode::UserProperty, + headers::CONTENT_TYPE, headers::CLOUDEVENTS_JSON_HEADER) { + Ok(property) => { + match self.headers.push(property) { + _ => () + } + }, + _ => () + } + self.payload = Some(bytes); + + Ok(self) + } +} + +pub trait MessageBuilderExt { + fn message_record( + self, + message_record: & MessageRecord, + ) -> MessageBuilder; +} + +impl MessageBuilderExt for MessageBuilder { + fn message_record(mut self, + message_record: & MessageRecord + ) -> MessageBuilder { + self = self.properties(message_record.headers.clone()); + + if let Some(s) = message_record.payload.as_ref() { + self = self.payload(s.to_vec()); + } + + self + } +} \ No newline at end of file diff --git a/example-projects/mqtt-example/Cargo.toml b/example-projects/mqtt-example/Cargo.toml new file mode 100644 index 00000000..dce76b2e --- /dev/null +++ b/example-projects/mqtt-example/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "mqtt-example" +version = "0.2.0" +authors = ["Subhobrata Dey "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "^0.1.33" +cloudevents-sdk = { path = "../sdk-rust" } +cloudevents-sdk-mqtt = { path = "../sdk-rust/cloudevents-sdk-mqtt"} +env_logger = "0.7.1" +paho-mqtt = { path = "../paho.mqtt.rust" } +serde_json = "^1.0" +futures = "^0.3" +tokio = { version = "^0.2", features = ["full"] } +clap = "2.33.1" \ No newline at end of file diff --git a/example-projects/mqtt-example/src/main.rs b/example-projects/mqtt-example/src/main.rs new file mode 100644 index 00000000..42456f2f --- /dev/null +++ b/example-projects/mqtt-example/src/main.rs @@ -0,0 +1,264 @@ +use clap::{App, Arg}; +use std::process; +use futures::executor::block_on; +use paho_mqtt as mqtt; +use tokio::time::Duration; +use serde_json::json; +use std::option::Option::Some; +use tokio::stream::StreamExt; +use cloudevents::{EventBuilderV10, EventBuilder}; +use cloudevents_sdk_mqtt::{MessageRecord, MessageBuilderExt, MqttVersion, MessageExt}; + +fn consume_v3(broker: &str, topic_name: &str) { + + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(broker) + .client_id("rust_async_consumer") + .finalize(); + + let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| { + println!("Error creating the client: {:?}", e); + process::exit(1); + }); + + if let Err(err) = block_on(async { + let mut strm = cli.get_stream(25); + + let conn_opts = mqtt::ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(20)) + .clean_session(false) + .finalize(); + + println!("Connecting to the MQTT server..."); + cli.connect(conn_opts).await?; + + println!("Subscribing to topics: {:?}", topic_name); + cli.subscribe(topic_name, 1).await?; + + println!("Waiting for messages..."); + + while let Some(msg_opt) = strm.next().await { + if let Some(msg) = msg_opt { + let event = msg.to_event(MqttVersion::V3_1_1).unwrap(); + println!("Received Event: {:#?}", event); + } + else { + // A "None" means we were disconnected. Try to reconnect... + println!("Lost connection. Attempting reconnect."); + while let Err(_err) = cli.reconnect().await { + // For tokio use: tokio::time::delay_for() + tokio::time::delay_for(Duration::from_millis(1000)).await; + } + } + } + + Ok::<(), mqtt::Error>(()) + }) { + eprintln!("{}", err); + } +} + +fn consume_v5(broker: &str, topic_name: &str) { + + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(broker) + .client_id("rust_async_consumer") + .mqtt_version(5) + .finalize(); + + let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| { + println!("Error creating the client: {:?}", e); + process::exit(1); + }); + + if let Err(err) = block_on(async { + let mut strm = cli.get_stream(25); + + let conn_opts = mqtt::ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(20)) + .clean_session(false) + .mqtt_version(5) + .finalize(); + + println!("Connecting to the MQTT server..."); + cli.connect(conn_opts).await?; + + println!("Subscribing to topics: {:?}", topic_name); + cli.subscribe(topic_name, 1).await?; + + println!("Waiting for messages..."); + + while let Some(msg_opt) = strm.next().await { + if let Some(msg) = msg_opt { + let event = msg.to_event(MqttVersion::V5).unwrap(); + println!("Received Event: {:#?}", event); + } + else { + // A "None" means we were disconnected. Try to reconnect... + println!("Lost connection. Attempting reconnect."); + while let Err(_err) = cli.reconnect().await { + // For tokio use: tokio::time::delay_for() + tokio::time::delay_for(Duration::from_millis(1000)).await; + } + } + } + + Ok::<(), mqtt::Error>(()) + }) { + eprintln!("{}", err); + } +} + +fn produce_v3(broker: &str, topic_name: &str) { + env_logger::init(); + + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(broker) + .finalize(); + + let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|_err| { + process::exit(1); + }); + + if let Err(err) = block_on(async { + let conn_opts = mqtt::ConnectOptions::new(); + + cli.connect(conn_opts).await?; + + println!("Publishing a message on the topic"); + + let event = EventBuilderV10::new() + .id("1".to_string()) + .ty("example.test") + .source("http://localhost/") + .data("application/json", json!({"hello": "world"})) + .build() + .unwrap(); + + let message_record = + MessageRecord::from_event(event, MqttVersion::V3_1_1).expect("error while serializing the event"); + + // Create a message and publish it + let msg = mqtt::MessageBuilder::new() + .topic(topic_name) + .message_record(&message_record) + .qos(1) + .finalize(); + + cli.publish(msg).await?; + + cli.disconnect(None).await?; + + Ok::<(), mqtt::Error>(()) + }) { + eprintln!("{}", err); + } +} + +fn produce_v5(broker: &str, topic_name: &str) { + env_logger::init(); + + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(broker) + .mqtt_version(5) + .finalize(); + + let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|_err| { + process::exit(1); + }); + + if let Err(err) = block_on(async { + let conn_opts = mqtt::ConnectOptionsBuilder::new() + .mqtt_version(5) + .finalize(); + + cli.connect(conn_opts).await?; + + println!("Publishing a message on the topic"); + + let event = EventBuilderV10::new() + .id("1".to_string()) + .ty("example.test") + .source("http://localhost/") + .data("application/json", json!({"hello": "world"})) + .build() + .unwrap(); + + let message_record = + MessageRecord::from_event(event, MqttVersion::V5).expect("error while serializing the event"); + + // Create a message and publish it + let msg = mqtt::MessageBuilder::new() + .topic(topic_name) + .message_record(&message_record) + .qos(1) + .finalize(); + + cli.publish(msg).await?; + + cli.disconnect(None).await?; + + Ok::<(), mqtt::Error>(()) + }) { + eprintln!("{}", err); + } +} + +fn main() { + let selector = App::new("CloudEvents Mqtt Example") + .version(option_env!("CARGO_PKG_VERSION").unwrap_or("")) + .about("select consumer or producer") + .arg( + Arg::with_name("mode") + .long("mode") + .help("enter \"consmer\" or \"producer\"") + .takes_value(true) + .possible_values(&["consumerV3", "producerV3", "consumerV5", "producerV5"]) + .required(true), + ) + .arg( + Arg::with_name("topic") + .long("topic") + .help("Mqtt topic") + .takes_value(true) + .required(true), + ) + .arg( + Arg::with_name("broker") + .short("b") + .long("broker") + .help("Broker list in mqtt format") + .takes_value(true) + .default_value("tcp://localhost:1883"), + ) + .get_matches(); + + + match selector.value_of("mode").unwrap() { + "producerV3" => { + produce_v3( + selector.value_of("broker").unwrap(), + selector.value_of("topic").unwrap(), + ) + } + "consumerV3" => { + consume_v3( + selector.value_of("broker").unwrap(), + selector.value_of("topic").unwrap(), + ) + } + "producerV5" => { + produce_v5( + selector.value_of("broker").unwrap(), + selector.value_of("topic").unwrap(), + ) + } + "consumerV5" => { + consume_v5( + selector.value_of("broker").unwrap(), + selector.value_of("topic").unwrap(), + ) + } + _ => (), + } +} \ No newline at end of file