Skip to content

Commit

Permalink
Useless Option<HeaderMap>
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed Oct 8, 2023
1 parent 4222b83 commit 9b5d25c
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 64 deletions.
55 changes: 19 additions & 36 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,8 @@ impl Client {
/// # }
/// ```
pub async fn publish(&self, subject: Subject, payload: Bytes) -> Result<(), PublishError> {
self.sender
.send(Command::Publish {
subject,
payload,
respond: None,
headers: None,
})
.await?;
Ok(())
self.publish_with_headers(subject, HeaderMap::new(), payload)
.await
}

/// Publish a [Message] with headers to a given subject.
Expand Down Expand Up @@ -193,7 +186,7 @@ impl Client {
subject,
payload,
respond: None,
headers: Some(headers),
headers,
})
.await?;
Ok(())
Expand Down Expand Up @@ -225,15 +218,8 @@ impl Client {
reply: Subject,
payload: Bytes,
) -> Result<(), PublishError> {
self.sender
.send(Command::Publish {
subject,
payload,
respond: Some(reply),
headers: None,
})
.await?;
Ok(())
self.publish_with_reply_and_headers(subject, reply, HeaderMap::new(), payload)
.await
}

/// Publish a [Message] to a given subject with headers and specified response subject
Expand Down Expand Up @@ -271,7 +257,7 @@ impl Client {
subject,
payload,
respond: Some(reply),
headers: Some(headers),
headers,
})
.await?;
Ok(())
Expand Down Expand Up @@ -344,17 +330,14 @@ impl Client {
if let Some(inbox) = request.inbox {
let timeout = request.timeout.unwrap_or(self.request_timeout);
let mut subscriber = self.subscribe(inbox.clone().into()).await?;
let payload: Bytes = request.payload.unwrap_or_default();
match request.headers {
Some(headers) => {
self.publish_with_reply_and_headers(subject, inbox.into(), headers, payload)
.await?
}
None => {
self.publish_with_reply(subject, inbox.into(), payload)
.await?
}
}
self.publish_with_reply_and_headers(
subject,
inbox.into(),
request.headers,
request.payload,
)
.await?;

let request = match timeout {
Some(timeout) => {
tokio::time::timeout(timeout, subscriber.next())
Expand All @@ -381,9 +364,9 @@ impl Client {
} else {
let (sender, receiver) = oneshot::channel();

let payload = request.payload.unwrap_or_default();
let respond = self.new_inbox().into();
let headers = request.headers;
let payload = request.payload;

self.sender
.send(Command::Request {
Expand Down Expand Up @@ -551,8 +534,8 @@ impl Client {
/// Used for building customized requests.
#[derive(Default)]
pub struct Request {
payload: Option<Bytes>,
headers: Option<HeaderMap>,
payload: Bytes,
headers: HeaderMap,
timeout: Option<Option<Duration>>,
inbox: Option<String>,
}
Expand All @@ -575,7 +558,7 @@ impl Request {
/// # }
/// ```
pub fn payload(mut self, payload: Bytes) -> Request {
self.payload = Some(payload);
self.payload = payload;
self
}

Expand All @@ -600,7 +583,7 @@ impl Request {
/// # }
/// ```
pub fn headers(mut self, headers: HeaderMap) -> Request {
self.headers = Some(headers);
self.headers = headers;
self
}

Expand Down
34 changes: 14 additions & 20 deletions async-nats/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,30 +431,24 @@ impl Connection {
respond,
headers,
} => {
let verb = match headers.as_ref() {
Some(headers) if !headers.is_empty() => "HPUB",
_ => "PUB",
};
let verb = if !headers.is_empty() { "HPUB" } else { "PUB" };

small_write!("{verb} {subject} ");

if let Some(respond) = respond {
small_write!("{respond} ");
}

match headers {
Some(headers) if !headers.is_empty() => {
let headers = headers.to_bytes();
if !headers.is_empty() {
let headers = headers.to_bytes();

let headers_len = headers.len();
let total_len = headers_len + payload.len();
small_write!("{headers_len} {total_len}\r\n");
self.write(headers);
}
_ => {
let payload_len = payload.len();
small_write!("{payload_len}\r\n");
}
let headers_len = headers.len();
let total_len = headers_len + payload.len();
small_write!("{headers_len} {total_len}\r\n");
self.write(headers);
} else {
let payload_len = payload.len();
small_write!("{payload_len}\r\n");
}

self.write(Bytes::clone(payload));
Expand Down Expand Up @@ -954,7 +948,7 @@ mod write_op {
subject: "FOO.BAR".into(),
payload: "Hello World".into(),
respond: None,
headers: None,
headers: HeaderMap::new(),
}]
.iter(),
)
Expand All @@ -973,7 +967,7 @@ mod write_op {
subject: "FOO.BAR".into(),
payload: "Hello World".into(),
respond: Some("INBOX.67".into()),
headers: None,
headers: HeaderMap::new(),
}]
.iter(),
)
Expand All @@ -991,10 +985,10 @@ mod write_op {
subject: "FOO.BAR".into(),
payload: "Hello World".into(),
respond: Some("INBOX.67".into()),
headers: Some(HeaderMap::from_iter([(
headers: HeaderMap::from_iter([(
"Header".parse().unwrap(),
"X".parse().unwrap(),
)])),
)]),
}]
.iter(),
)
Expand Down
8 changes: 3 additions & 5 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,7 @@ impl futures::Stream for Streams<'_> {
#[derive(Default, Clone, Debug)]
pub struct Publish {
payload: Bytes,
headers: Option<header::HeaderMap>,
headers: header::HeaderMap,
}
impl Publish {
/// Creates a new custom Publish struct to be used with.
Expand All @@ -1194,14 +1194,12 @@ impl Publish {
}
/// Adds headers to the message.
pub fn headers(mut self, headers: HeaderMap) -> Self {
self.headers = Some(headers);
self.headers = headers;
self
}
/// A shorthand to add a single header.
pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
self.headers
.get_or_insert(header::HeaderMap::new())
.insert(name, value);
self.headers.insert(name, value);
self
}
/// Sets the `Nats-Msg-Id` header, that is used by stream deduplicate window.
Expand Down
6 changes: 3 additions & 3 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,13 @@ pub(crate) enum Command {
subject: Subject,
payload: Bytes,
respond: Option<Subject>,
headers: Option<HeaderMap>,
headers: HeaderMap,
},
Request {
subject: Subject,
payload: Bytes,
respond: Subject,
headers: Option<HeaderMap>,
headers: HeaderMap,
sender: oneshot::Sender<Message>,
},
Subscribe {
Expand All @@ -301,7 +301,7 @@ pub(crate) enum ClientOp {
subject: Subject,
payload: Bytes,
respond: Option<Subject>,
headers: Option<HeaderMap>,
headers: HeaderMap,
},
Subscribe {
sid: u64,
Expand Down

0 comments on commit 9b5d25c

Please sign in to comment.