Skip to content

Commit

Permalink
Useless Option<HeaderMap>
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed Jul 22, 2023
1 parent 0534719 commit 8809b13
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 61 deletions.
26 changes: 6 additions & 20 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,8 @@ impl Client {
/// # }
/// ```
pub async fn publish(&self, subject: String, payload: Bytes) -> Result<(), PublishError> {
self.sender
.send(Command::Publish {
subject,
payload,
respond: None,
headers: None,
})
.await?;
Ok(())
self.publish_with_headers(subject, HeaderMap::new(), payload)
.await
}

/// Publish a [Message] with headers to a given subject.
Expand Down Expand Up @@ -190,7 +183,7 @@ impl Client {
subject,
payload,
respond: None,
headers: Some(headers),
headers,
})
.await?;
Ok(())
Expand Down Expand Up @@ -222,15 +215,8 @@ impl Client {
reply: String,
payload: Bytes,
) -> Result<(), PublishError> {
self.sender
.send(Command::Publish {
subject,
payload,
respond: Some(reply),
headers: None,
})
.await?;
Ok(())
self.publish_with_reply_and_headers(subject, reply, HeaderMap::new(), payload)
.await
}

/// Publish a [Message] to a given subject with headers and specified response subject
Expand Down Expand Up @@ -268,7 +254,7 @@ impl Client {
subject,
payload,
respond: Some(reply),
headers: Some(headers),
headers,
})
.await?;
Ok(())
Expand Down
69 changes: 30 additions & 39 deletions async-nats/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,13 +359,10 @@ impl Connection {
respond,
headers,
} => {
match headers.as_ref() {
Some(headers) if !headers.is_empty() => {
self.stream.write_all(b"HPUB ").await?;
}
_ => {
self.stream.write_all(b"PUB ").await?;
}
if !headers.is_empty() {
self.stream.write_all(b"HPUB ").await?;
} else {
self.stream.write_all(b"PUB ").await?;
}

self.stream.write_all(subject.as_bytes()).await?;
Expand All @@ -376,36 +373,33 @@ impl Connection {
self.stream.write_all(b" ").await?;
}

match headers {
Some(headers) if !headers.is_empty() => {
let headers = headers.to_bytes();
if !headers.is_empty() {
let headers = headers.to_bytes();

let mut header_len_buf = itoa::Buffer::new();
self.stream
.write_all(header_len_buf.format(headers.len()).as_bytes())
.await?;
let mut header_len_buf = itoa::Buffer::new();
self.stream
.write_all(header_len_buf.format(headers.len()).as_bytes())
.await?;

self.stream.write_all(b" ").await?;
self.stream.write_all(b" ").await?;

let mut total_len_buf = itoa::Buffer::new();
self.stream
.write_all(
total_len_buf
.format(headers.len() + payload.len())
.as_bytes(),
)
.await?;
let mut total_len_buf = itoa::Buffer::new();
self.stream
.write_all(
total_len_buf
.format(headers.len() + payload.len())
.as_bytes(),
)
.await?;

self.stream.write_all(b"\r\n").await?;
self.stream.write_all(&headers).await?;
}
_ => {
let mut len_buf = itoa::Buffer::new();
self.stream
.write_all(len_buf.format(payload.len()).as_bytes())
.await?;
self.stream.write_all(b"\r\n").await?;
}
self.stream.write_all(b"\r\n").await?;
self.stream.write_all(&headers).await?;
} else {
let mut len_buf = itoa::Buffer::new();
self.stream
.write_all(len_buf.format(payload.len()).as_bytes())
.await?;
self.stream.write_all(b"\r\n").await?;
}

self.stream.write_all(payload).await?;
Expand Down Expand Up @@ -780,7 +774,7 @@ mod write_op {
subject: "FOO.BAR".into(),
payload: "Hello World".into(),
respond: None,
headers: None,
headers: HeaderMap::new(),
})
.await
.unwrap();
Expand All @@ -797,7 +791,7 @@ mod write_op {
subject: "FOO.BAR".into(),
payload: "Hello World".into(),
respond: Some("INBOX.67".into()),
headers: None,
headers: HeaderMap::new(),
})
.await
.unwrap();
Expand All @@ -813,10 +807,7 @@ mod write_op {
subject: "FOO.BAR".into(),
payload: "Hello World".into(),
respond: Some("INBOX.67".into()),
headers: Some(HeaderMap::from_iter([(
"Header".parse().unwrap(),
"X".parse().unwrap(),
)])),
headers: HeaderMap::from_iter([("Header".parse().unwrap(), "X".parse().unwrap())]),
})
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ pub(crate) enum Command {
subject: String,
payload: Bytes,
respond: Option<String>,
headers: Option<HeaderMap>,
headers: HeaderMap,
},
Subscribe {
sid: u64,
Expand All @@ -289,7 +289,7 @@ pub(crate) enum ClientOp {
subject: String,
payload: Bytes,
respond: Option<String>,
headers: Option<HeaderMap>,
headers: HeaderMap,
},
Subscribe {
sid: u64,
Expand Down

0 comments on commit 8809b13

Please sign in to comment.