Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
barafael authored Nov 18, 2024
2 parents 603a762 + b5d0bdf commit eb56658
Show file tree
Hide file tree
Showing 20 changed files with 1,244 additions and 136 deletions.
4 changes: 4 additions & 0 deletions .config/nats.dic
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,7 @@ create_consumer_strict_on_stream
leafnodes
get_stream
get_stream_no_info
lifecycle
AtomicU64
with_deleted
StreamInfoBuilder
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ jobs:

- name: Install msrv Rust on ubuntu-latest
id: install-rust
uses: dtolnay/rust-toolchain@1.70.0
uses: dtolnay/rust-toolchain@1.79.0
- name: Cache the build artifacts
uses: Swatinem/rust-cache@v2
with:
Expand Down
12 changes: 11 additions & 1 deletion async-nats/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# v0.36.0
# v0.37.0
## Overview

A smaller release containing stats and Watcher improvements.

## What's Changed
* Add Client stats by @Jarema in https://github.com/nats-io/nats.rs/pull/1314
* Improve kv::Watcher without messages by @Jarema in https://github.com/nats-io/nats.rs/pull/1321

**Full Changelog**: https://github.com/nats-io/nats.rs/compare/async-nats/v0.36.0...async-nats/v0.37.0

# v0.36.0
## Overview
This release adds a useful `futures::Sink<PublishMessage>`, and ability to get `Stream` handle without IO call,
among other changes.
Expand Down
12 changes: 7 additions & 5 deletions async-nats/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[package]
name = "async-nats"
authors = ["Tomasz Pietrek <[email protected]>", "Casper Beyer <[email protected]>"]
version = "0.36.0"
version = "0.37.0"
edition = "2021"
rust = "1.74.0"
rust = "1.79.0"
description = "A async Rust NATS client"
license = "Apache-2.0"
documentation = "https://docs.rs/async-nats"
Expand Down Expand Up @@ -41,6 +41,8 @@ ring = { version = "0.17", optional = true }
rand = "0.8"
webpki = { package = "rustls-webpki", version = "0.102" }
portable-atomic = "1"
tokio-websockets = { version = "0.10", features = ["client", "rand", "rustls-native-roots"], optional = true }
pin-project = "1.0"

[dev-dependencies]
ring = "0.17"
Expand All @@ -57,13 +59,13 @@ jsonschema = "0.17.1"
# for -Z minimal-versions
num = "0.4.1"


[features]
default = ["server_2_10", "ring"]
# Enables Service API for the client.
service = []
aws-lc-rs = ["dep:aws-lc-rs", "tokio-rustls/aws-lc-rs"]
ring = ["dep:ring", "tokio-rustls/ring"]
websockets = ["dep:tokio-websockets"]
aws-lc-rs = ["dep:aws-lc-rs", "tokio-rustls/aws-lc-rs", "tokio-websockets/aws-lc-rs"]
ring = ["dep:ring", "tokio-rustls/ring", "tokio-websockets/ring"]
fips = ["aws-lc-rs", "tokio-rustls/fips"]
# All experimental features are part of this feature flag.
experimental = ["service"]
Expand Down
2 changes: 1 addition & 1 deletion async-nats/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub struct Auth {
pub jwt: Option<String>,
pub nkey: Option<String>,
pub(crate) signature_callback: Option<CallbackArg1<String, Result<String, AuthError>>>,
pub signature: Option<String>,
pub signature: Option<Vec<u8>>,
pub username: Option<String>,
pub password: Option<String>,
pub token: Option<String>,
Expand Down
98 changes: 98 additions & 0 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub struct Client {
inbox_prefix: Arc<str>,
request_timeout: Option<Duration>,
max_payload: Arc<AtomicUsize>,
connection_stats: Arc<Statistics>,
}

impl Sink<PublishMessage> for Client {
Expand All @@ -108,6 +109,7 @@ impl Sink<PublishMessage> for Client {
}

impl Client {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
info: tokio::sync::watch::Receiver<ServerInfo>,
state: tokio::sync::watch::Receiver<State>,
Expand All @@ -116,6 +118,7 @@ impl Client {
inbox_prefix: String,
request_timeout: Option<Duration>,
max_payload: Arc<AtomicUsize>,
statistics: Arc<Statistics>,
) -> Client {
let poll_sender = PollSender::new(sender.clone());
Client {
Expand All @@ -128,9 +131,25 @@ impl Client {
inbox_prefix: inbox_prefix.into(),
request_timeout,
max_payload,
connection_stats: statistics,
}
}

/// Returns the default timeout for requests set when creating the client.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// println!("default request timeout: {:?}", client.timeout());
/// # Ok(())
/// # }
/// ```
pub fn timeout(&self) -> Option<Duration> {
self.request_timeout
}

/// Returns last received info from the server.
///
/// # Examples
Expand Down Expand Up @@ -612,6 +631,39 @@ impl Client {
Ok(())
}

/// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
/// messages, then closes the connection. Once completed, any associated streams associated with the
/// client will be closed, and further client commands will fail
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::StreamExt;
/// let client = async_nats::connect("demo.nats.io").await?;
/// let mut subscription = client.subscribe("events.>").await?;
///
/// client.drain().await?;
///
/// # // existing subscriptions are closed and further commands will fail
/// assert!(subscription.next().await.is_none());
/// client
/// .subscribe("events.>")
/// .await
/// .expect_err("Expected further commands to fail");
///
/// # Ok(())
/// # }
/// ```
pub async fn drain(&self) -> Result<(), DrainError> {
// Drain all subscriptions
self.sender.send(Command::Drain { sid: None }).await?;

// Remaining process is handled on the handler-side
Ok(())
}

/// Returns the current state of the connection.
///
/// # Examples
Expand Down Expand Up @@ -649,6 +701,26 @@ impl Client {
.await
.map_err(Into::into)
}

/// Returns struct representing statistics of the whole lifecycle of the client.
/// This includes number of bytes sent/received, number of messages sent/received,
/// and number of times the connection was established.
/// As this returns [Arc] with [AtomicU64] fields, it can be safely reused and shared
/// across threads.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let statistics = client.statistics();
/// println!("client statistics: {:#?}", statistics);
/// # Ok(())
/// # }
/// ```
pub fn statistics(&self) -> Arc<Statistics> {
self.connection_stats.clone()
}
}

/// Used for building customized requests.
Expand Down Expand Up @@ -769,6 +841,16 @@ impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
}
}

#[derive(Error, Debug)]
#[error("failed to send drain: {0}")]
pub struct DrainError(#[source] crate::Error);

impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
DrainError(Box::new(err))
}
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
/// There are services listening on requested subject, but they didn't respond
Expand Down Expand Up @@ -826,3 +908,19 @@ impl Display for FlushErrorKind {
}

pub type FlushError = Error<FlushErrorKind>;

/// Represents statistics for the instance of the client throughout its lifecycle.
#[derive(Default, Debug)]
pub struct Statistics {
/// Number of bytes received. This does not include the protocol overhead.
pub in_bytes: AtomicU64,
/// Number of bytes sent. This doe not include the protocol overhead.
pub out_bytes: AtomicU64,
/// Number of messages received.
pub in_messages: AtomicU64,
/// Number of messages sent.
pub out_messages: AtomicU64,
/// Number of times connection was established.
/// Initial connect will be counted as well, then all successful reconnects.
pub connects: AtomicU64,
}
Loading

0 comments on commit eb56658

Please sign in to comment.