Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove glommio runtime #9

Merged
merged 3 commits into from
Nov 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@

Akasa 是一个 Rust 写的高性能,低延迟,高度可扩展的 MQTT 服务器。

Akasa 用 [glommio][glommio] 来实现高性能低延迟的网络 IO. 它底层的 MQTT 协议消息编解码器 ([mqtt-proto][mqtt-proto]) 是为了高性能和 async 环境而精心设计实现的。
它底层的 MQTT 协议消息编解码器 ([mqtt-proto][mqtt-proto]) 是为了高性能和 async 环境而精心设计实现的。

## 特性
- [x] 完全支持 MQTT v3.1/v3.1.1/v5.0
- [x] 支持 TLS (包括双向认证)
- [x] 支持 WebSocket (包括 TLS 支持)
- [x] 支持 [Proxy Protocol V2][proxy-protocol]
- [x] 使用 `io_uring` ([glommio][glommio]) 来实现高性能低延迟 IO (非 Linux 环境可以用 tokio)
- [x] 使用 [Hook trait][hook-trait] 来扩展服务器
- [x] 用一个密码文件来支持简单的认证
- [ ] 基于 Raft 的服务器集群 (*敬请期待*)
Expand Down Expand Up @@ -95,7 +94,6 @@ Akasa 会有一个企业版本,企业版中的额外功能包括:
[mqtt-proto]: https://github.com/akasamq/mqtt-proto
[mqtt-proto-fuzz]: https://github.com/akasamq/mqtt-proto/tree/master/fuzz
[proxy-protocol]: https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
[glommio]: https://github.com/DataDog/glommio
[bsl]: https://mariadb.com/bsl-faq-mariadb/
[hook-trait]: https://github.com/akasamq/akasa/blob/5ade2d788d9a919671f81b01d720155caf8e4e2d/akasa-core/src/hook.rs#L43
[tensorflow]: https://blog.tensorflow.org/2020/09/supercharging-tensorflowjs-webassembly.html
Expand Down
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ English | [简体中文](README-CN.md)

Akasa is a high performance, low latency and high extendable MQTT server in Rust.

It uses [glommio][glommio] for high performance and low latency network IO. The underlying MQTT protocol message codec ([mqtt-proto][mqtt-proto]) is carefully crafted for high performance and async environment.
The underlying MQTT protocol message codec ([mqtt-proto][mqtt-proto]) is carefully crafted for high performance and async environment.

## Features
- [x] Full support MQTT v3.1/v3.1.1/v5.0
- [x] Support TLS (include two-way authentication)
- [x] Support WebSocket (include TLS support)
- [x] Support [Proxy Protocol V2][proxy-protocol]
- [x] Use `io_uring` ([glommio][glommio]) for high performance low latency IO (can use tokio on non-Linux OS)
- [x] Use a [Hook trait][hook-trait] to extend the server
- [x] Simple password file based authentication
- [ ] Raft based cluster (*coming soon*)
Expand Down Expand Up @@ -100,7 +99,6 @@ Akasa will have an enterprise edition. In this edition, it provides:
[mqtt-proto]: https://github.com/akasamq/mqtt-proto
[mqtt-proto-fuzz]: https://github.com/akasamq/mqtt-proto/tree/master/fuzz
[proxy-protocol]: https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
[glommio]: https://github.com/DataDog/glommio
[bsl]: https://mariadb.com/bsl-faq-mariadb/
[hook-trait]: https://github.com/akasamq/akasa/blob/5ade2d788d9a919671f81b01d720155caf8e4e2d/akasa-core/src/hook.rs#L43
[tensorflow]: https://blog.tensorflow.org/2020/09/supercharging-tensorflowjs-webassembly.html
Expand Down
6 changes: 2 additions & 4 deletions akasa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ parking_lot = "0.12.1"
serde = { version = "1.0.147", features = ["derive"] }
thiserror = "1.0.38"
tokio = { version = "1.23.0", features = ["full"] }
tokio-tungstenite = "0.20.1"
tokio-openssl = "0.6.3"
uuid = { version = "1.2.2", features = ["v4"] }
rand = { version = "0.8.5", features = ["getrandom"] }
ahash = "0.8.3"
Expand All @@ -35,10 +37,6 @@ base64 = "0.21.0"
ring = "0.16"
crc32c = "0.6.3"
openssl = "0.10.51"
async-tungstenite = "0.21.0"

[target.'cfg(target_os = "linux")'.dependencies]
glommio = { version = "0.8.0" }

[dev-dependencies]
futures-sink = "0.3.26"
Expand Down
17 changes: 9 additions & 8 deletions akasa-core/src/protocols/mqtt/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ use std::time::{Duration, Instant};

use parking_lot::RwLock;

use crate::state::{ClientId, ControlMessage, Executor, GlobalState};
use crate::state::{ClientId, ControlMessage, GlobalState};

pub(crate) fn start_keep_alive_timer<E: Executor>(
pub(crate) fn start_keep_alive_timer(
keep_alive: u16,
client_id: ClientId,
last_packet_time: &Arc<RwLock<Instant>>,
executor: &E,
global: &Arc<GlobalState>,
) -> io::Result<()> {
// FIXME: if kee_alive is zero, set a default keep_alive value from config
Expand All @@ -19,7 +18,7 @@ pub(crate) fn start_keep_alive_timer<E: Executor>(
log::debug!("{} keep alive: {:?}", client_id, half_interval * 2);
let last_packet_time = Arc::clone(last_packet_time);
let global = Arc::clone(global);
if let Err(err) = executor.spawn_interval(move || {
let action_gen = move || {
// Need clone twice: https://stackoverflow.com/a/68462908/1274372
let last_packet_time = Arc::clone(&last_packet_time);
let global = Arc::clone(&global);
Expand All @@ -45,10 +44,12 @@ pub(crate) fn start_keep_alive_timer<E: Executor>(
}
None
}
}) {
log::error!("spawn executor keep alive timer failed: {:?}", err);
return Err(err);
}
};
tokio::spawn(async move {
while let Some(duration) = action_gen().await {
tokio::time::sleep(duration).await;
}
});
}
Ok(())
}
6 changes: 2 additions & 4 deletions akasa-core/src/protocols/mqtt/online_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ use flume::{
r#async::{RecvStream, SendSink},
Sender,
};
use futures_lite::{
io::{AsyncRead, AsyncWrite},
Stream,
};
use futures_lite::Stream;
use futures_sink::Sink;
use hashbrown::HashMap;
use mqtt_proto::{v3, v5, GenericPollPacket, GenericPollPacketState, PollHeader, QoS, VarBytes};
use tokio::io::{AsyncRead, AsyncWrite};

use crate::hook::{handle_request, Hook, HookAction, HookRequest, HookResponse};
use crate::state::{ClientId, ClientReceiver, ControlMessage, GlobalState, NormalMessage};
Expand Down
40 changes: 10 additions & 30 deletions akasa-core/src/protocols/mqtt/v3/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use std::net::SocketAddr;
use std::sync::Arc;

use flume::{Receiver, Sender};
use futures_lite::{
io::{AsyncRead, AsyncWrite},
FutureExt,
};
use futures_lite::FutureExt;
use hashbrown::HashMap;
use mqtt_proto::{
v3::{
Expand All @@ -17,6 +14,7 @@ use mqtt_proto::{
},
Error, Pid, Protocol, QoS, QosPid,
};
use tokio::io::{AsyncRead, AsyncWrite};

use crate::hook::{
handle_request, Hook, HookAction, HookRequest, HookResponse, LockedHookContext, PublishAction,
Expand All @@ -25,9 +23,7 @@ use crate::hook::{
use crate::protocols::mqtt::{
BroadcastPackets, OnlineLoop, OnlineSession, PendingPackets, WritePacket,
};
use crate::state::{
ClientId, ClientReceiver, ControlMessage, Executor, GlobalState, NormalMessage,
};
use crate::state::{ClientId, ClientReceiver, ControlMessage, GlobalState, NormalMessage};

use super::{
packet::{
Expand All @@ -45,7 +41,6 @@ use super::{
#[allow(clippy::too_many_arguments)]
pub async fn handle_connection<
T: AsyncRead + AsyncWrite + Unpin,
E: Executor,
H: Hook + Clone + Send + Sync + 'static,
>(
conn: T,
Expand All @@ -54,7 +49,6 @@ pub async fn handle_connection<
protocol: Protocol,
timeout_receiver: Receiver<()>,
hook_handler: H,
executor: E,
global: Arc<GlobalState>,
) -> io::Result<()> {
match handle_online(
Expand All @@ -64,35 +58,31 @@ pub async fn handle_connection<
protocol,
timeout_receiver,
&hook_handler,
&executor,
&global,
)
.await
{
Ok(Some((session, receiver))) => {
log::info!(
"executor {:03}, {}({}) go to offline, total {} clients ({} online)",
executor.id(),
"{}({}) go to offline, total {} clients ({} online)",
session.client_id,
peer,
global.clients_count(),
global.online_clients_count(),
);
executor.spawn_local(handle_offline(session, receiver, global));
tokio::spawn(handle_offline(session, receiver, global));
}
Ok(None) => {
log::info!(
"executor {:03}, {} finished, total {} clients ({} online)",
executor.id(),
"{} finished, total {} clients ({} online)",
peer,
global.clients_count(),
global.online_clients_count(),
);
}
Err(err) => {
log::info!(
"executor {:03}, {} error: {}, total {} clients ({} online)",
executor.id(),
"{} error: {}, total {} clients ({} online)",
peer,
err,
global.clients_count(),
Expand All @@ -107,7 +97,6 @@ pub async fn handle_connection<
#[allow(clippy::too_many_arguments)]
async fn handle_online<
T: AsyncRead + AsyncWrite + Unpin,
E: Executor,
H: Hook + Clone + Send + Sync + 'static,
>(
mut conn: T,
Expand All @@ -116,7 +105,6 @@ async fn handle_online<
protocol: Protocol,
timeout_receiver: Receiver<()>,
hook_handler: &H,
executor: &E,
global: &Arc<GlobalState>,
) -> io::Result<Option<(Session, ClientReceiver)>> {
let mut session = Session::new(&global.config, peer);
Expand All @@ -143,15 +131,8 @@ async fn handle_online<
before_connect_hook(peer, &packet, hook_handler, global).await?;
}

let session_present = handle_connect(
&mut session,
&mut receiver,
packet,
&mut conn,
executor,
global,
)
.await?;
let session_present =
handle_connect(&mut session, &mut receiver, packet, &mut conn, global).await?;

if !session.connected {
log::info!("{} not connected", session.peer);
Expand All @@ -169,8 +150,7 @@ async fn handle_online<

let receiver = receiver.expect("receiver");
log::info!(
"executor {:03}, {} connected, total {} clients ({} online) ",
executor.id(),
"{} connected, total {} clients ({} online) ",
session.peer,
global.clients_count(),
global.online_clients_count(),
Expand Down
2 changes: 1 addition & 1 deletion akasa-core/src/protocols/mqtt/v3/packet/common.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::io;
use std::time::Instant;

use futures_lite::io::AsyncWrite;
use mqtt_proto::{
v3::{Packet, Publish},
QoS, QosPid,
};
use tokio::io::AsyncWrite;

use crate::protocols::mqtt::{get_unix_ts, PendingPacketStatus};
use crate::state::ClientId;
Expand Down
8 changes: 3 additions & 5 deletions akasa-core/src/protocols/mqtt/v3/packet/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,23 @@ use std::io;
use std::sync::Arc;
use std::time::Instant;

use futures_lite::io::AsyncWrite;
use mqtt_proto::{
v3::{Connack, Connect, ConnectReturnCode},
Protocol,
};
use tokio::io::AsyncWrite;

use crate::protocols::mqtt::{check_password, start_keep_alive_timer};
use crate::state::{AddClientReceipt, ClientReceiver, Executor, GlobalState};
use crate::state::{AddClientReceipt, ClientReceiver, GlobalState};

use super::super::Session;
use super::common::write_packet;

pub(crate) async fn handle_connect<T: AsyncWrite + Unpin, E: Executor>(
pub(crate) async fn handle_connect<T: AsyncWrite + Unpin>(
session: &mut Session,
receiver: &mut Option<ClientReceiver>,
packet: Connect,
conn: &mut T,
executor: &E,
global: &Arc<GlobalState>,
) -> io::Result<bool> {
log::debug!(
Expand Down Expand Up @@ -152,7 +151,6 @@ clean session : {}
session.keep_alive,
session.client_id,
&session.last_packet_time,
executor,
global,
)?;

Expand Down
Loading
Loading