Skip to content

Commit

Permalink
[OPTIMIZATION] least connection
Browse files Browse the repository at this point in the history
  • Loading branch information
ismoilovdevml committed Oct 15, 2023
1 parent 5f4d663 commit 38c7900
Showing 1 changed file with 19 additions and 37 deletions.
56 changes: 19 additions & 37 deletions src/algorithms/least_connection.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::{collections::HashMap, sync::RwLock};

use hyper::{Body, Request, Uri};
use rand::{thread_rng, Rng};

use super::{Context, LoadBalancingStrategy, RequestForwarder};

#[derive(Debug)]
Expand All @@ -29,49 +27,33 @@ impl LoadBalancingStrategy for LeastConnection {
fn on_tcp_close(&self, remote: &Uri) {
if let Some(authority) = remote.authority() {
let mut connections = self.connections.write().unwrap();
*connections.entry(authority.to_string()).or_insert(1) -= 1;
if let Some(count) = connections.get_mut(&authority.to_string()) {
*count -= 1;
}
}
}

fn select_backend<'l>(&'l self, _request: &Request<Body>, context: &'l Context) -> RequestForwarder {
// ok to unwrap - only panics when we panic somewhere else :)
let connections = self.connections.read().unwrap();

let address_indices: Vec<usize> = if connections.len() == 0 || context.backend_addresses.len() > connections.len() {
// if no TCP connections have been opened yet, or some backend servers are not used yet, we'll use them for the next request
context
.backend_addresses
.iter()
.enumerate()
.filter(|(_, address)| !connections.contains_key(**address))
.map(|(index, _)| index)
.collect()
} else {
let backend_address_map = context
.backend_addresses
.iter()
.enumerate()
.map(|(index, address)| (*address, index))
.collect::<HashMap<_, _>>();
let mut least_connections = connections.iter().collect::<Vec<_>>();

least_connections.sort_by(|a, b| a.1.cmp(b.1));

let min_connection_count = least_connections[0].1;
least_connections
.iter()
.take_while(|(_, connection_count)| *connection_count == min_connection_count)
.map(|tuple| tuple.0)
.map(|address| *backend_address_map.get(address.as_str()).unwrap())
.collect()
};
// Find the address with the least number of connections.
let mut least_connection_address = None;
let mut least_connection_count = usize::MAX;
for address in context.backend_addresses.iter() {
let connection_count = *connections.get(*address).unwrap_or(&0);
if connection_count < least_connection_count {
least_connection_address = Some(address);
least_connection_count = connection_count;
}
}

if address_indices.len() == 1 {
RequestForwarder::new(&context.backend_addresses[address_indices[0]])
if let Some(address) = least_connection_address {
RequestForwarder::new(address)
} else {
// Fall back to random if there's a tie.
let mut rng = thread_rng();
let index = rng.gen_range(0..address_indices.len());
RequestForwarder::new(&context.backend_addresses[address_indices[index]])
let index = rng.gen_range(0..context.backend_addresses.len());
RequestForwarder::new(&context.backend_addresses[index])
}
}
}
Expand Down Expand Up @@ -124,4 +106,4 @@ mod tests {
context.backend_addresses[0]
);
}
}
}

0 comments on commit 38c7900

Please sign in to comment.