From 036412a0665acb60446ae302e32ecb34cd8f9992 Mon Sep 17 00:00:00 2001 From: mineshp Date: Wed, 24 Apr 2024 16:59:16 +0530 Subject: [PATCH 1/2] chore: headers enabled for handshake channel --- commons/nats-client/src/lib.rs | 16 +++- grpc-server/src/services/messaging.rs | 1 + messaging/src/handler.rs | 7 +- messaging/src/service.rs | 12 ++- networking/src/errors.rs | 4 + networking/src/handler.rs | 21 ++++- networking/src/handshake_handler.rs | 117 ++++++++++++++++++++------ networking/src/service.rs | 47 +++-------- settings/src/service.rs | 1 + status/src/service.rs | 2 + telemetry/src/service.rs | 2 + 11 files changed, 161 insertions(+), 69 deletions(-) diff --git a/commons/nats-client/src/lib.rs b/commons/nats-client/src/lib.rs index a68f24ae..f78f05e6 100644 --- a/commons/nats-client/src/lib.rs +++ b/commons/nats-client/src/lib.rs @@ -4,6 +4,7 @@ pub use async_nats::Subscriber; pub use bytes::Bytes; use events::Event; use nkeys::KeyPair; +use std::collections::HashMap; use std::{str::FromStr, sync::Arc}; use tokio::sync::broadcast::Sender; use tracing::{debug, error, info, trace}; @@ -147,7 +148,12 @@ impl NatsClient { Ok(client) } - pub async fn publish(&self, subject: &str, data: Bytes) -> Result { + pub async fn publish( + &self, + subject: &str, + req_headers: Option>, + data: Bytes, + ) -> Result { trace!( func = "publish", package = PACKAGE_NAME, @@ -174,6 +180,14 @@ impl NatsClient { "X-Agent", async_nats::HeaderValue::from_str(version_detail.as_str()).unwrap(), ); + if req_headers.is_some() { + for (k, v) in req_headers.unwrap() { + headers.insert( + k.as_str(), + async_nats::HeaderValue::from_str(v.as_str()).unwrap(), + ); + } + } debug!( func = "publish", package = PACKAGE_NAME, diff --git a/grpc-server/src/services/messaging.rs b/grpc-server/src/services/messaging.rs index 86c732f1..4f5417b3 100644 --- a/grpc-server/src/services/messaging.rs +++ b/grpc-server/src/services/messaging.rs @@ -35,6 +35,7 @@ impl MessagingService for MessagingServiceHandler { reply_to: tx, message: message_request.message, subject: message_request.subject, + headers: None, }) .await; diff --git a/messaging/src/handler.rs b/messaging/src/handler.rs index 6880ade1..f8a7e6c2 100644 --- a/messaging/src/handler.rs +++ b/messaging/src/handler.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use crate::errors::{MessagingError, MessagingErrorCodes}; use crate::service::{get_machine_id, Messaging}; use anyhow::{bail, Result}; @@ -32,6 +34,7 @@ pub enum MessagingMessage { reply_to: oneshot::Sender>, message: String, subject: String, + headers: Option>, }, Request { reply_to: oneshot::Sender>, @@ -78,8 +81,8 @@ impl MessagingHandler { continue; } match msg.unwrap() { - MessagingMessage::Send{reply_to, message, subject} => { - let res = self.messaging_client.publish(&subject.as_str(), Bytes::from(message)).await; + MessagingMessage::Send{reply_to, message, subject, headers} => { + let res = self.messaging_client.publish(&subject.as_str(), headers, Bytes::from(message)).await; let _ = reply_to.send(res); } MessagingMessage::Request{reply_to, message, subject} => { diff --git a/messaging/src/service.rs b/messaging/src/service.rs index 94ef9b9a..1bdb659d 100644 --- a/messaging/src/service.rs +++ b/messaging/src/service.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use agent_settings::read_settings_yml; use agent_settings::{messaging::MessagingSettings, AgentSettings}; use anyhow::{bail, Result}; @@ -197,7 +199,12 @@ impl Messaging { ); Ok(true) } - pub async fn publish(&self, subject: &str, data: Bytes) -> Result { + pub async fn publish( + &self, + subject: &str, + headers: Option>, + data: Bytes, + ) -> Result { let fn_name = "publish"; debug!( func = fn_name, @@ -219,7 +226,7 @@ impl Messaging { } let nats_client = self.nats_client.as_ref().unwrap(); - let is_published = match nats_client.publish(subject, data).await { + let is_published = match nats_client.publish(subject, headers, data).await { Ok(s) => s, Err(e) => { error!( @@ -408,6 +415,7 @@ pub async fn authenticate( package = PACKAGE_NAME, "authentication token obtained!" ); + println!("Auth token: -{:?}", token); Ok(token) } diff --git a/networking/src/errors.rs b/networking/src/errors.rs index 3f215554..ce253920 100644 --- a/networking/src/errors.rs +++ b/networking/src/errors.rs @@ -17,6 +17,7 @@ pub enum NetworkingErrorCodes { MessageAcknowledgeError, NetworkingInitError, NetworkingDiscoSocketBindError, + ExtractMessageHeadersError, } impl fmt::Display for NetworkingErrorCodes { @@ -58,6 +59,9 @@ impl fmt::Display for NetworkingErrorCodes { NetworkingErrorCodes::NetworkingDiscoSocketBindError => { write!(f, "NetworkingErrorCodes: NetworkingDiscoSocketBindError") } + NetworkingErrorCodes::ExtractMessageHeadersError => { + write!(f, "NetworkingErrorCodes: ExtractMessageHeadersError") + } } } } diff --git a/networking/src/handler.rs b/networking/src/handler.rs index bc123c79..3f0c1a92 100644 --- a/networking/src/handler.rs +++ b/networking/src/handler.rs @@ -14,10 +14,10 @@ use wireguard::Wireguard; use crate::errors::{NetworkingError, NetworkingErrorCodes}; use crate::handshake_handler::{ - await_networking_handshake_request, HandshakeChannelHandler, HandshakeMessage, Manifest, + await_networking_handshake_message, HandshakeChannelHandler, HandshakeMessage, Manifest, }; use crate::service::{ - await_consumer_message, configure_wireguard, create_channel_sync_consumer, + await_consumer_message, configure_wireguard, create_channel_sync_consumer, get_machine_id, get_networking_subscriber, publish_networking_channel, reconnect_messaging_service, }; @@ -92,7 +92,7 @@ impl NetworkingHandler { } }; let mut futures = JoinSet::new(); - futures.spawn(await_networking_handshake_request( + futures.spawn(await_networking_handshake_message( subscribers.handshake_request.unwrap(), handshake_handler.handshake_tx.clone(), )); @@ -127,6 +127,18 @@ impl NetworkingHandler { if exist_consumer_token.is_some() { let _ = exist_consumer_token.as_ref().unwrap().cancel(); } + let machine_id = match get_machine_id(self.identity_tx.clone()).await { + Ok(id) => id, + Err(e) => { + error!( + func = fn_name, + package = PACKAGE_NAME, + error = e.to_string().as_str(), + "Error getting machine id" + ); + bail!(e) + } + }; //TODO: handle this error unwrap let handshake_handler = self.handshake_handler.as_ref().unwrap(); // create a new token @@ -161,6 +173,7 @@ impl NetworkingHandler { messaging_tx.clone(), self.settings_tx.clone(), handshake_handler.channel_id.clone(), + machine_id, )); // create spawn for timer let _: JoinHandle> = tokio::task::spawn(async move { @@ -324,9 +337,9 @@ impl NetworkingHandler { } } //TODO: error handling ( do not exit the service if error occurs) + let _ = self.networking_consumer().await; let _ = publish_networking_channel(handshake_channel_id.clone(), self.messaging_tx.clone(), self.identity_tx.clone(), self.settings_tx.clone()).await; let _ = self.subscribe_to_nats().await; - let _ = self.networking_consumer().await; }, "false" => { let _ = reconnect_messaging_service(self.messaging_tx.clone(),v.to_string(), existing_settings).await; diff --git a/networking/src/handshake_handler.rs b/networking/src/handshake_handler.rs index ab2e542f..703b91ba 100644 --- a/networking/src/handshake_handler.rs +++ b/networking/src/handshake_handler.rs @@ -4,10 +4,11 @@ use std::str::FromStr; use agent_settings::{read_settings_yml, AgentSettings}; use anyhow::{bail, Result}; +use chrono::format; use crypto::random::generate_random_alphanumeric; use futures::StreamExt; use local_ip_address::list_afinet_netifas; -use messaging::async_nats::Message; +use messaging::async_nats::{HeaderMap, Message}; use messaging::handler::MessagingMessage; use messaging::Subscriber as NatsSubscriber; use serde::{Deserialize, Serialize}; @@ -196,6 +197,7 @@ impl HandshakeChannelHandler { subject: reply_subject, message: json!(manifest).to_string(), reply_to: tx, + headers: None, }) .await; Ok(true) @@ -241,7 +243,7 @@ pub fn discover_endpoints() -> Result> { Ok(ipv4_addr) } -pub async fn await_networking_handshake_request( +pub async fn await_networking_handshake_message( mut subscriber: NatsSubscriber, handshake_tx: mpsc::Sender, ) -> Result<()> { @@ -281,37 +283,100 @@ async fn process_handshake_request( )) } }; - let request_payload: ChannelDetails = match serde_json::from_str(&payload_str) { - Ok(s) => s, - Err(e) => bail!(NetworkingError::new( - NetworkingErrorCodes::PayloadDeserializationError, - format!("error while deserializing message payload {}", e), - true - )), + let message_type = + match get_header_by_key(message.headers.clone(), String::from("Message-Type")) { + Ok(s) => s, + Err(e) => { + error! { + func = fn_name, + package = PACKAGE_NAME, + "error getting message type from headers - {}", + e + }; + bail!(e) + } + }; + match message_type.as_str() { + "REQUEST" => { + let request_payload: ChannelDetails = match serde_json::from_str(&payload_str) { + Ok(s) => s, + Err(e) => bail!(NetworkingError::new( + NetworkingErrorCodes::PayloadDeserializationError, + format!("error while deserializing message payload {}", e), + true + )), + }; + info!( + func = fn_name, + package = PACKAGE_NAME, + "received handshake request: {:?}", + request_payload + ); + let reply_subject = format!( + "network.{}.node.handshake.{}", + sha256::digest(request_payload.network_id.clone()), + request_payload.channel.clone() + ); + let _ = handshake_tx + .send(HandshakeMessage::Request { + machine_id: request_payload.machine_id.clone(), + reply_subject: reply_subject, + }) + .await; + } + "REPLY" => { + let reply_payload: Manifest = match serde_json::from_str(&payload_str) { + Ok(s) => s, + Err(e) => bail!(NetworkingError::new( + NetworkingErrorCodes::PayloadDeserializationError, + format!("error while deserializing message payload {}", e), + true + )), + }; + println!("manifest received: {:?}", reply_payload); + } + _ => { + warn!( + func = fn_name, + package = PACKAGE_NAME, + "Unknown message type: {}", + message_type + ); + } + } + Ok(true) +} + +fn get_header_by_key(headers: Option, header_key: String) -> Result { + let fn_name = "get_header_by_key"; + let message_headers = match headers { + Some(h) => h, + None => { + warn!( + func = fn_name, + package = PACKAGE_NAME, + "No headers found in message", + ); + bail!(NetworkingError::new( + NetworkingErrorCodes::ExtractMessageHeadersError, + String::from("no headers found in message"), + false + )) + } }; - info!( - func = fn_name, - package = PACKAGE_NAME, - "received handshake request: {:?}", - request_payload - ); - let reply_to_subject = match message.reply { - Some(subject) => subject.to_string(), + let message_type = match message_headers.get("Message-Type") { + Some(v) => v.to_string(), None => { warn!( + func = fn_name, package = PACKAGE_NAME, - "No reply subject found in message: {:?}", message + "No message type found in message headers: {:?}", + message_headers ); - String::from("") //TODO: need to discuss + String::from("") } }; - let _ = handshake_tx - .send(HandshakeMessage::Request { - machine_id: request_payload.machine_id.clone(), - reply_subject: reply_to_subject, - }) - .await; - Ok(true) + Ok(message_type) } pub async fn create_disco_socket(addr: String) -> Result { info!(func = "create_disco_socket", package = PACKAGE_NAME, "init"); diff --git a/networking/src/service.rs b/networking/src/service.rs index d8197791..dd804a30 100644 --- a/networking/src/service.rs +++ b/networking/src/service.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::net::SocketAddr; -use agent_settings::{networking::NetworkingSettings, read_settings_yml, AgentSettings}; +use agent_settings::{read_settings_yml, AgentSettings}; use anyhow::{bail, Result}; use channel::{recv_with_custom_timeout, recv_with_timeout}; use crypto::random::generate_random_alphanumeric; @@ -268,6 +268,7 @@ pub async fn publish_networking_channel( reply_to: tx, message: json!(payload).to_string(), subject: subject, + headers: None, }) .await { @@ -296,6 +297,7 @@ pub async fn await_consumer_message( messaging_tx: Sender, settings_tx: Sender, channel_id: String, + machine_id: String, ) -> Result { println!("awaiting consumer message"); let fn_name = "await_consumer_message"; @@ -331,10 +333,11 @@ pub async fn await_consumer_message( }; while let Some(Ok(message)) = messages.next().await { println!("message in consumer stream {:?}", message.payload); - match process_message( + match process_consumer_message( message.clone(), network_id.clone(), channel_id.clone(), + machine_id.clone(), messaging_tx.clone(), ) .await @@ -371,10 +374,11 @@ pub async fn await_consumer_message( Ok(true) } -async fn process_message( +async fn process_consumer_message( message: Message, network_id: String, channel_id: String, + machine_id: String, messaging_tx: Sender, ) -> Result { let fn_name = "process_message"; @@ -442,16 +446,19 @@ async fn process_message( subject_to_publish_channel_info ); let channel_details_payload = ChannelDetails { - machine_id: request_payload.machine_id.clone(), + machine_id: machine_id.clone(), network_id: network_id.clone(), channel: channel_id.clone(), }; + let mut headers = HashMap::new(); + headers.insert(String::from("Message-Type"), String::from("REQUEST")); let (tx, rx) = oneshot::channel(); match messaging_tx - .send(MessagingMessage::Request { + .send(MessagingMessage::Send { reply_to: tx, message: json!(channel_details_payload).to_string(), subject: subject_to_publish_channel_info, + headers: Some(headers), }) .await { @@ -470,34 +477,6 @@ async fn process_message( )); } } - //wait for a replay - let result = match recv_with_custom_timeout(10000, rx).await { - Ok(res) => res, - Err(e) => { - error!( - func = fn_name, - package = PACKAGE_NAME, - "error receiving get que subscriber for issue token- {}", - e - ); - bail!(NetworkingError::new( - NetworkingErrorCodes::ChannelReceiveMessageError, - format!("error receiving subscriber message - {}", e), - true - )); - } - }; - let payload_str = match std::str::from_utf8(&result) { - Ok(s) => s, - Err(e) => { - bail!(NetworkingError::new( - NetworkingErrorCodes::ExtractMessagePayloadError, - format!("error converting payload to string - {}", e), - true - )) - } - }; - println!("payload_str {:?}", payload_str); match message.ack().await { Ok(_) => { println!("networking node message acknowledged") @@ -654,7 +633,7 @@ pub async fn create_channel_sync_consumer( ); Ok(consumer) } -async fn get_machine_id(identity_tx: mpsc::Sender) -> Result { +pub async fn get_machine_id(identity_tx: mpsc::Sender) -> Result { let (tx, rx) = oneshot::channel(); match identity_tx .clone() diff --git a/settings/src/service.rs b/settings/src/service.rs index 296e3e26..2260853d 100644 --- a/settings/src/service.rs +++ b/settings/src/service.rs @@ -490,6 +490,7 @@ async fn process_message( reply_to: tx, message: json!(ack_payload).to_string(), subject: header_value.to_string(), + headers: None, }) .await { diff --git a/status/src/service.rs b/status/src/service.rs index e329da40..4ce81be8 100644 --- a/status/src/service.rs +++ b/status/src/service.rs @@ -150,6 +150,7 @@ pub async fn send_status(status_options: SendStatusOptions) -> Result { reply_to: publish_result_tx, message: json!(publish_payload).to_string(), subject: format!("machine.{}.status.heartbeat", digest(machine_id.clone())), + headers: None, }) .await; match send_output { @@ -282,6 +283,7 @@ pub async fn machine_platform_info( reply_to: tx, message: json!(platform_info).to_string(), subject: format!("machine.{}.status.info", digest(machine_id.clone())), + headers: None, }) .await { diff --git a/telemetry/src/service.rs b/telemetry/src/service.rs index cf154f01..629f42a0 100644 --- a/telemetry/src/service.rs +++ b/telemetry/src/service.rs @@ -133,6 +133,7 @@ pub async fn process_metrics( reply_to: tx, message: payload.into(), subject: subject.clone(), + headers: None, }) .await { @@ -266,6 +267,7 @@ pub async fn process_logs( reply_to: tx, message: payload.into(), subject: subject.clone(), + headers: None, }) .await { From d766068d233e06887ed29746648b4541f5771a65 Mon Sep 17 00:00:00 2001 From: mineshp Date: Thu, 25 Apr 2024 10:34:22 +0530 Subject: [PATCH 2/2] chore: message ack removed if channel is same --- networking/src/handler.rs | 2 +- networking/src/handshake_handler.rs | 6 +++-- networking/src/service.rs | 37 +---------------------------- 3 files changed, 6 insertions(+), 39 deletions(-) diff --git a/networking/src/handler.rs b/networking/src/handler.rs index 3f0c1a92..48435f3e 100644 --- a/networking/src/handler.rs +++ b/networking/src/handler.rs @@ -337,9 +337,9 @@ impl NetworkingHandler { } } //TODO: error handling ( do not exit the service if error occurs) - let _ = self.networking_consumer().await; let _ = publish_networking_channel(handshake_channel_id.clone(), self.messaging_tx.clone(), self.identity_tx.clone(), self.settings_tx.clone()).await; let _ = self.subscribe_to_nats().await; + let _ = self.networking_consumer().await; }, "false" => { let _ = reconnect_messaging_service(self.messaging_tx.clone(),v.to_string(), existing_settings).await; diff --git a/networking/src/handshake_handler.rs b/networking/src/handshake_handler.rs index 703b91ba..d222f988 100644 --- a/networking/src/handshake_handler.rs +++ b/networking/src/handshake_handler.rs @@ -188,6 +188,8 @@ impl HandshakeChannelHandler { txn_id: txn_id, candidates: Some(candidates), }; + let mut header_map: HashMap = HashMap::new(); + header_map.insert(String::from("Message-Type"), String::from("REPLY")); println!("manifest: {:?}", manifest); // send reply to NATS let (tx, _rx) = oneshot::channel(); @@ -197,7 +199,7 @@ impl HandshakeChannelHandler { subject: reply_subject, message: json!(manifest).to_string(), reply_to: tx, - headers: None, + headers: Some(header_map), }) .await; Ok(true) @@ -364,7 +366,7 @@ fn get_header_by_key(headers: Option, header_key: String) -> Result v.to_string(), None => { warn!( diff --git a/networking/src/service.rs b/networking/src/service.rs index dd804a30..5f2c1fe5 100644 --- a/networking/src/service.rs +++ b/networking/src/service.rs @@ -352,19 +352,6 @@ pub async fn await_consumer_message( ); } } - - // Acknowledges a message delivery - match message.ack().await { - Ok(res) => println!("message Acknowledged {:?}", res), - Err(err) => { - error!( - func = fn_name, - package = PACKAGE_NAME, - "message acknowledge failed {}", - err - ); - } - }; } info!( func = fn_name, @@ -416,24 +403,6 @@ async fn process_consumer_message( package = PACKAGE_NAME, "message from same node, ignoring" ); - match message.ack().await { - Ok(_) => { - println!("networking node message acknowledged") - } - Err(e) => { - error!( - func = fn_name, - package = PACKAGE_NAME, - "error acknowledging message - {:?}", - e - ); - bail!(NetworkingError::new( - NetworkingErrorCodes::MessageAcknowledgeError, - format!("error acknowledging message - {:?}", e), - true - )) - } - }; return Ok(true); } let subject_to_publish_channel_info = format!( @@ -604,11 +573,7 @@ pub async fn create_channel_sync_consumer( } }; - let filter_subject = format!( - "networking.networks.{}.channels.{}", - digest(network_id), - digest(machine_id) - ); + let filter_subject = format!("networking.networks.{}.channels.*", digest(network_id)); let consumer = match jet_stream_client .create_consumer(stream, filter_subject, consumer_name.clone())