Skip to content

Commit

Permalink
chore: message ack removed if channel is same
Browse files Browse the repository at this point in the history
  • Loading branch information
mineshp-mecha committed Apr 25, 2024
1 parent 036412a commit d766068
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 39 deletions.
2 changes: 1 addition & 1 deletion networking/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,9 @@ impl NetworkingHandler {
}
}
//TODO: error handling ( do not exit the service if error occurs)
let _ = self.networking_consumer().await;
let _ = publish_networking_channel(handshake_channel_id.clone(), self.messaging_tx.clone(), self.identity_tx.clone(), self.settings_tx.clone()).await;
let _ = self.subscribe_to_nats().await;
let _ = self.networking_consumer().await;
},
"false" => {
let _ = reconnect_messaging_service(self.messaging_tx.clone(),v.to_string(), existing_settings).await;
Expand Down
6 changes: 4 additions & 2 deletions networking/src/handshake_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ impl HandshakeChannelHandler {
txn_id: txn_id,
candidates: Some(candidates),
};
let mut header_map: HashMap<String, String> = HashMap::new();
header_map.insert(String::from("Message-Type"), String::from("REPLY"));
println!("manifest: {:?}", manifest);
// send reply to NATS
let (tx, _rx) = oneshot::channel();
Expand All @@ -197,7 +199,7 @@ impl HandshakeChannelHandler {
subject: reply_subject,
message: json!(manifest).to_string(),
reply_to: tx,
headers: None,
headers: Some(header_map),
})
.await;
Ok(true)
Expand Down Expand Up @@ -364,7 +366,7 @@ fn get_header_by_key(headers: Option<HeaderMap>, header_key: String) -> Result<S
))
}
};
let message_type = match message_headers.get("Message-Type") {
let message_type = match message_headers.get(header_key.as_str()) {
Some(v) => v.to_string(),
None => {
warn!(
Expand Down
37 changes: 1 addition & 36 deletions networking/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,19 +352,6 @@ pub async fn await_consumer_message(
);
}
}

// Acknowledges a message delivery
match message.ack().await {
Ok(res) => println!("message Acknowledged {:?}", res),
Err(err) => {
error!(
func = fn_name,
package = PACKAGE_NAME,
"message acknowledge failed {}",
err
);
}
};
}
info!(
func = fn_name,
Expand Down Expand Up @@ -416,24 +403,6 @@ async fn process_consumer_message(
package = PACKAGE_NAME,
"message from same node, ignoring"
);
match message.ack().await {
Ok(_) => {
println!("networking node message acknowledged")
}
Err(e) => {
error!(
func = fn_name,
package = PACKAGE_NAME,
"error acknowledging message - {:?}",
e
);
bail!(NetworkingError::new(
NetworkingErrorCodes::MessageAcknowledgeError,
format!("error acknowledging message - {:?}", e),
true
))
}
};
return Ok(true);
}
let subject_to_publish_channel_info = format!(
Expand Down Expand Up @@ -604,11 +573,7 @@ pub async fn create_channel_sync_consumer(
}
};

let filter_subject = format!(
"networking.networks.{}.channels.{}",
digest(network_id),
digest(machine_id)
);
let filter_subject = format!("networking.networks.{}.channels.*", digest(network_id));

let consumer = match jet_stream_client
.create_consumer(stream, filter_subject, consumer_name.clone())
Expand Down

0 comments on commit d766068

Please sign in to comment.