Skip to content

Commit

Permalink
Merge pull request #165 from 56quarters/client-refactor
Browse files Browse the repository at this point in the history
Refactor DNS and Memcached clients to allow easier testing
  • Loading branch information
56quarters authored Jul 24, 2024
2 parents 88f063a + b62c1c8 commit ddb4bf8
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 197 deletions.
73 changes: 26 additions & 47 deletions mtop-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,62 +13,30 @@ use tokio_rustls::rustls::ClientConfig;
use tracing::instrument::WithSubscriber;

#[derive(Debug, Clone)]
pub struct MemcachedPoolConfig {
pub tls: TlsConfig,
pub struct MemcachedClientConfig {
pub pool_max_idle: u64,
}

impl Default for MemcachedPoolConfig {
impl Default for MemcachedClientConfig {
fn default() -> Self {
Self {
tls: TlsConfig::default(),
pool_max_idle: 4,
}
Self { pool_max_idle: 4 }
}
}

/// Implementation of a `ClientFactory` that creates new Memcached clients that
/// use plaintext or TLS TCP connections.
#[derive(Debug)]
pub struct MemcachedPool {
inner: ClientPool<Server, Memcached, MemcachedFactory>,
}

impl MemcachedPool {
pub async fn new(handle: Handle, config: MemcachedPoolConfig) -> Result<Self, MtopError> {
let pool_config = ClientPoolConfig {
name: "memcached-tcp".to_owned(),
max_idle: config.pool_max_idle,
};

let factory = MemcachedFactory::new(handle, config).await?;
let inner = ClientPool::new(pool_config, factory);
Ok(Self { inner })
}

pub async fn get(&self, server: &Server) -> Result<PooledClient<Server, Memcached>, MtopError> {
self.inner.get(server).await
}

pub async fn put(&self, client: PooledClient<Server, Memcached>) {
self.inner.put(client).await
}
}

#[derive(Debug)]
struct MemcachedFactory {
pub struct MemcachedFactory {
client_config: Option<Arc<ClientConfig>>,
server_name: Option<ServerName<'static>>,
}

impl MemcachedFactory {
async fn new(handle: Handle, config: MemcachedPoolConfig) -> Result<Self, MtopError> {
let server_name = if config.tls.enabled {
config.tls.server_name.clone()
} else {
None
};
pub async fn new(handle: Handle, tls: TlsConfig) -> Result<Self, MtopError> {
let server_name = if tls.enabled { tls.server_name.clone() } else { None };

let client_config = if config.tls.enabled {
Some(Arc::new(tls_client_config(handle, config.tls).await?))
let client_config = if tls.enabled {
Some(Arc::new(tls_client_config(handle, tls).await?))
} else {
None
};
Expand Down Expand Up @@ -182,10 +150,13 @@ impl ValuesResponse {
}

#[derive(Debug)]
pub struct MemcachedClient {
pub struct MemcachedClient<F>
where
F: ClientFactory<Server, Memcached> + Send + Sync + 'static,
{
handle: Handle,
selector: SelectorRendezvous,
pool: Arc<MemcachedPool>,
pool: Arc<ClientPool<Server, Memcached, F>>,
}

/// Run a method for a particular server in a spawned future.
Expand Down Expand Up @@ -272,18 +243,26 @@ macro_rules! operation_for_all {
}};
}

impl MemcachedClient {
impl<F> MemcachedClient<F>
where
F: ClientFactory<Server, Memcached> + Send + Sync + 'static,
{
/// Create a new `MemcachedClient` instance.
///
/// `handle` is used to spawn multiple async tasks to fetch data from servers in
/// parallel. `selector` is used to determine which server "owns" a particular key.
/// `pool` is used for pooling or establishing new connections to each server as
/// needed.
pub fn new(handle: Handle, selector: SelectorRendezvous, pool: MemcachedPool) -> Self {
pub fn new(cfg: MemcachedClientConfig, handle: Handle, selector: SelectorRendezvous, factory: F) -> Self {
let pool_config = ClientPoolConfig {
name: "memcached-tcp".to_owned(),
max_idle: cfg.pool_max_idle,
};

Self {
handle,
selector,
pool: Arc::new(pool),
pool: Arc::new(ClientPool::new(pool_config, factory)),
}
}

Expand Down
2 changes: 1 addition & 1 deletion mtop-client/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1305,7 +1305,7 @@ mod test {
#[tokio::test]
async fn test_memcached_ping_success() {
let (mut rx, mut client) = client!("VERSION 1.6.22\r\n");
let _res = client.ping().await.unwrap();
client.ping().await.unwrap();

let bytes = rx.recv().await.unwrap();
let command = String::from_utf8(bytes).unwrap();
Expand Down
Loading

0 comments on commit ddb4bf8

Please sign in to comment.