Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unwraps in MessageLoop #283

Merged
merged 66 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from 54 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
fcb0cd0
cargo clippy fixes
simbleau Jun 17, 2023
849b31a
Merge branch 'main' of github.com:vectorgameexperts/matchbox
simbleau Jun 18, 2023
2f99f7d
Merge branch 'main' of github.com:johanhelsing/matchbox
simbleau Jul 22, 2023
cbf74ee
Merge branch 'main' of github.com:vectorgameexperts/matchbox into unw…
simbleau Jul 29, 2023
4a498dd
code cleanup
simbleau Jul 29, 2023
a2c5580
removed unwraps
simbleau Jul 29, 2023
9c5a7b6
renamed MessageLoopSendError to MessageSendError
simbleau Jul 29, 2023
8589cbe
cargo fmt
simbleau Jul 29, 2023
7baafe8
Update matchbox_socket/src/webrtc_socket/native.rs
simbleau Aug 9, 2023
e59e5df
consolidated errors
simbleau Sep 12, 2023
bb0b8e9
removed unused deps
simbleau Sep 12, 2023
df2f425
fix wasm errors
simbleau Sep 12, 2023
7e71875
Merge branch 'main' into unwrap-healing
simbleau Sep 12, 2023
c234135
added tracing subscriber
simbleau Sep 12, 2023
557c09b
treat KeepAlive failure as fatal
simbleau Sep 12, 2023
ec6d833
end connection on send failure
simbleau Sep 12, 2023
7e97c3c
now treating bad send to peer as a warning
simbleau Sep 12, 2023
498025b
updated docs
simbleau Sep 12, 2023
c77c57e
log message loop errors
simbleau Sep 12, 2023
a7eaa33
exit clean on peer id send failure
simbleau Sep 14, 2023
d65b7e5
removed unused error
simbleau Sep 14, 2023
b8f80f7
adjusted fmt of messaging error
simbleau Sep 14, 2023
b55ed21
code cleanup
simbleau Jul 29, 2023
5537a3c
removed unwraps
simbleau Jul 29, 2023
e01ccca
renamed MessageLoopSendError to MessageSendError
simbleau Jul 29, 2023
c584be6
cargo fmt
simbleau Jul 29, 2023
5c740c9
Update matchbox_socket/src/webrtc_socket/native.rs
simbleau Aug 9, 2023
27d62b5
consolidated errors
simbleau Sep 12, 2023
0424307
removed unused deps
simbleau Sep 12, 2023
379fcc3
fix wasm errors
simbleau Sep 12, 2023
3d44437
added tracing subscriber
simbleau Sep 12, 2023
5caa6dc
treat KeepAlive failure as fatal
simbleau Sep 12, 2023
547b5e4
end connection on send failure
simbleau Sep 12, 2023
4122a98
now treating bad send to peer as a warning
simbleau Sep 12, 2023
b97692d
updated docs
simbleau Sep 12, 2023
b85b28a
log message loop errors
simbleau Sep 12, 2023
97b6ca7
exit clean on peer id send failure
simbleau Sep 14, 2023
fbd0da3
removed unused error
simbleau Sep 14, 2023
5a0b3e2
adjusted fmt of messaging error
simbleau Sep 14, 2023
ce74f7f
Update matchbox_socket/src/webrtc_socket/mod.rs
simbleau Sep 15, 2023
b1683bc
Merge branch 'main' into unwrap-healing
simbleau Sep 15, 2023
e2dc9d5
build: additional gitignore files
simbleau Sep 16, 2023
3275e7c
build: flatten error struct
simbleau Sep 16, 2023
7f54be4
Merge branch 'unwrap-healing' of github.com:vectorgameexperts/matchbo…
simbleau Sep 16, 2023
cbe1f12
fix: remap JsPacket to Disconnected
simbleau Sep 16, 2023
6dfc190
fix: wasm compat
simbleau Sep 16, 2023
6fb0a89
feat: add `WebRtcChannel::try_send`
simbleau Sep 16, 2023
cce25e2
build: attach source to error
simbleau Sep 18, 2023
08ef38a
fix: break with Err on foreign channel close
simbleau Sep 18, 2023
cb90b1f
feat: added `is_closed` to channel/socket for help checking health
simbleau Sep 19, 2023
2f37301
Merge branch 'main' of github.com:johanhelsing/matchbox into unwrap-h…
simbleau Sep 19, 2023
294c909
fix: address comment
simbleau Sep 21, 2023
6f415b7
docs: fmt
simbleau Sep 21, 2023
7043e17
docs: fix comment
simbleau Sep 21, 2023
b0d9872
fix: remove runtime error
simbleau Sep 21, 2023
c0bf5fe
feat: add `try_send` to `SingleChannel` socket
simbleau Sep 21, 2023
a720093
fix: remove is_closed
simbleau Sep 21, 2023
c836688
docs: change `receiver` to `socket future`
simbleau Sep 21, 2023
49b8003
fix: renamed `JsPacket` and `Packet`
simbleau Sep 21, 2023
9a413fd
Add error_handling_example
johanhelsing Sep 22, 2023
da1bcc0
fixup: Don't error on peer disconnect after dropping socket
johanhelsing Sep 22, 2023
5ea1bb1
Remove non-errors
johanhelsing Sep 22, 2023
559c922
Remove conversion footgun
johanhelsing Sep 22, 2023
e94eb08
Rework error API
johanhelsing Sep 22, 2023
1fef9fe
docs: Update readme for error handling example
johanhelsing Sep 22, 2023
c7d977e
Merge pull request #3 from johanhelsing/error-handling-api
simbleau Sep 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ crates/*/target
/.vscode
/benches/target
*.code-workspace
.DS_Store
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions matchbox_signaling/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,16 @@ matchbox_protocol = { version = "0.7", path = "../matchbox_protocol", features =
axum = { version = "0.6", features = ["ws"] }
hyper = { version = "0.14", features = ["server"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tower-http = { version = "0.4", features = ["cors", "trace"] }
tokio = { version = "1.32", features = ["macros", "rt-multi-thread"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
uuid = { version = "1.4", features = ["serde", "v4"] }
clap = { version = "4.3", features = ["derive", "env"] }
thiserror = "1.0"
tokio-stream = "0.1"
async-trait = { version = "0.1" }

[dev-dependencies]
tokio-tungstenite = "0.20.0"
tracing-subscriber = "0.3"
9 changes: 6 additions & 3 deletions matchbox_signaling/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ pub mod topologies;

pub use error::Error;
pub use signaling_server::{
builder::SignalingServerBuilder, callbacks::Callback, error::ClientRequestError,
error::SignalingError, handlers::WsStateMeta, server::SignalingServer, NoCallbacks, NoState,
SignalingCallbacks, SignalingState,
builder::SignalingServerBuilder,
callbacks::Callback,
error::{ClientRequestError, SignalingError},
handlers::WsStateMeta,
server::SignalingServer,
NoCallbacks, NoState, SignalingCallbacks, SignalingState,
};
pub use topologies::{common_logic, SignalingTopology};
3 changes: 2 additions & 1 deletion matchbox_signaling/src/topologies/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ where
async fn state_machine(upgrade: WsStateMeta<Cb, S>);
}

/// Common, re-usable logic and types shared between topologies and which may be useful if building your own topology.
/// Common, re-usable logic and types shared between topologies and which may be useful if building
/// your own topology.
simbleau marked this conversation as resolved.
Show resolved Hide resolved
pub mod common_logic {
use crate::signaling_server::error::{ClientRequestError, SignalingError};
use axum::extract::ws::{Message, WebSocket};
Expand Down
1 change: 0 additions & 1 deletion matchbox_socket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ once_cell = { version = "1.17", default-features = false, features = [
"race",
"alloc",
] }
crossbeam-channel = "0.5"
derive_more = "0.99"

ggrs = { version = "0.9", default-features = false, optional = true }
Expand Down
24 changes: 20 additions & 4 deletions matchbox_socket/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
use crate::webrtc_socket::error::SignalingError;

/// Errors that can happen when using Matchbox.
/// Errors that can happen when using Matchbox sockets.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// An error occurring during the signaling loop.
#[error("An error in the signaling loop: {0}")]
Signaling(#[from] SignalingError),
/// An error occurring if the connection fails to establish. Perhaps check your connection or
/// try again.
#[error("The connection failed to establish. Check your connection and try again.")]
ConnectionFailed {
/// The source of the connection failure
source: SignalingError,
},
/// Unexpected fatal error ocurred with messaging. Please file an issue or triage.
#[error("An unexpected error ocurred at runtime with messaging: {source}")]
Runtime {
/// The source of the connection failure
source: SignalingError,
},
johanhelsing marked this conversation as resolved.
Show resolved Hide resolved
/// Kicked by the server or disconnected
#[error("The signaling server connection was severed.")]
Disconnected {
/// The source of the connection failure
source: SignalingError,
},
}
2 changes: 1 addition & 1 deletion matchbox_socket/src/ggrs_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl WebRtcSocket {

impl WebRtcSocket {
/// Returns a Vec of connected peers as [`ggrs::PlayerType`]
pub fn players(&self) -> Vec<PlayerType<PeerId>> {
pub fn players(&mut self) -> Vec<PlayerType<PeerId>> {
let Some(our_id) = self.id() else {
// we're still waiting for the server to initialize our id
// no peers should be added at this point anyway
Expand Down
27 changes: 13 additions & 14 deletions matchbox_socket/src/webrtc_socket/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::webrtc_socket::messages::PeerEvent;
use cfg_if::cfg_if;
use futures_channel::mpsc::TrySendError;
use futures_channel::mpsc::{SendError, TrySendError};

/// An error that can occur when getting a socket's channel through
/// `get_channel`, `take_channel` or `try_update_peers`.
Expand All @@ -20,18 +20,21 @@ pub enum ChannelError {
Closed,
}

/// An error that can occur with WebRTC signaling.
/// An error that can occur with WebRTC messaging.
#[derive(Debug, thiserror::Error)]
pub enum SignalingError {
// Common
#[error("failed to send event to signaling server: {0}")]
#[error("failed to send to signaling server: {0}")]
Undeliverable(#[from] TrySendError<PeerEvent>),

#[error("The stream is exhausted")]
StreamExhausted,

#[error("Message received in unknown format")]
UnknownFormat,

#[error("failed to establish initial connection: {0}")]
ConnectionFailed(#[from] Box<SignalingError>),
NegotiationFailed(#[from] Box<SignalingError>),

// Native
#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -42,18 +45,14 @@ pub enum SignalingError {
#[cfg(target_arch = "wasm32")]
#[error("socket failure communicating with signaling server: {0}")]
Socket(#[from] ws_stream_wasm::WsErr),
}

/// An error that can occur with WebRTC messaging.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, thiserror::Error)]
#[error("failed to send message to peer: {0}")]
pub(crate) struct MessagingError(#[from] futures_channel::mpsc::TrySendError<crate::Packet>);
#[error("failed to send message to peer over javascript: {0}")]
#[cfg(target_arch = "wasm32")]
JsPacket(#[from] JsError),

#[cfg(target_arch = "wasm32")]
#[derive(Debug, thiserror::Error)]
#[error("failed to send message to peer: {0}")]
pub(crate) struct MessagingError(#[from] JsError);
#[error("failed to send message to peer: {0}")]
Packet(#[from] SendError),
}

cfg_if! {
if #[cfg(target_arch = "wasm32")] {
Expand Down
53 changes: 34 additions & 19 deletions matchbox_socket/src/webrtc_socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ mod messages;
mod signal_peer;
mod socket;

use self::error::{MessagingError, SignalingError};
use self::error::SignalingError;
use crate::{webrtc_socket::signal_peer::SignalPeer, Error};
use async_trait::async_trait;
use cfg_if::cfg_if;
use futures::{future::Either, stream::FuturesUnordered, Future, FutureExt, StreamExt};
use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures_channel::mpsc::{TrySendError, UnboundedReceiver, UnboundedSender};
use futures_timer::Delay;
use futures_util::select;
use log::{debug, warn};
use log::{debug, error, warn};
use matchbox_protocol::PeerId;
use messages::*;
pub(crate) use socket::MessageLoopChannels;
Expand Down Expand Up @@ -88,7 +88,7 @@ async fn signaling_loop<S: Signaller>(
pub type Packet = Box<[u8]>;

trait PeerDataSender {
fn send(&mut self, packet: Packet) -> Result<(), MessagingError>;
fn send(&mut self, packet: Packet) -> Result<(), SignalingError>;
}

struct HandshakeResult<D: PeerDataSender, M> {
Expand Down Expand Up @@ -123,12 +123,12 @@ trait Messenger {
}

async fn message_loop<M: Messenger>(
id_tx: crossbeam_channel::Sender<PeerId>,
id_tx: futures_channel::oneshot::Sender<PeerId>,
ice_server_config: &RtcIceServerConfig,
channel_configs: &[ChannelConfig],
channels: MessageLoopChannels,
keep_alive_interval: Option<Duration>,
) {
) -> Result<(), SignalingError> {
let MessageLoopChannels {
requests_sender,
mut events_receiver,
Expand All @@ -141,6 +141,7 @@ async fn message_loop<M: Messenger>(
let mut peer_loops = FuturesUnordered::new();
let mut handshake_signals = HashMap::new();
let mut data_channels = HashMap::new();
let mut id_tx = Option::Some(id_tx);

let mut timeout = if let Some(interval) = keep_alive_interval {
Either::Left(Delay::new(interval))
Expand All @@ -160,26 +161,34 @@ async fn message_loop<M: Messenger>(

select! {
_ = &mut timeout => {
requests_sender.unbounded_send(PeerRequest::KeepAlive).expect("send failed");
// UNWRAP: we will only ever get here if there already was a timeout
let interval = keep_alive_interval.unwrap();
timeout = Either::Left(Delay::new(interval)).fuse();
requests_sender.unbounded_send(PeerRequest::KeepAlive).map_err(TrySendError::into_send_error)?;
if let Some(interval) = keep_alive_interval {
timeout = Either::Left(Delay::new(interval)).fuse();
} else {
error!("no keep alive timeout, please file a bug");
}
simbleau marked this conversation as resolved.
Show resolved Hide resolved
}

message = events_receiver.next().fuse() => {
if let Some(event) = message {
debug!("{event:?}");
match event {
PeerEvent::IdAssigned(peer_uuid) => {
id_tx.try_send(peer_uuid.to_owned()).unwrap();
if id_tx.take().expect("already sent peer id").send(peer_uuid.to_owned()).is_err() {
// Socket receiver was dropped, exit cleanly.
break Ok(());
};
},
PeerEvent::NewPeer(peer_uuid) => {
let (signal_tx, signal_rx) = futures_channel::mpsc::unbounded();
handshake_signals.insert(peer_uuid, signal_tx);
let signal_peer = SignalPeer::new(peer_uuid, requests_sender.clone());
handshakes.push(M::offer_handshake(signal_peer, signal_rx, messages_from_peers_tx.clone(), ice_server_config, channel_configs))
},
PeerEvent::PeerLeft(peer_uuid) => {peer_state_tx.unbounded_send((peer_uuid, PeerState::Disconnected)).expect("fail to report peer as disconnected");},
PeerEvent::PeerLeft(peer_uuid) => {
peer_state_tx.unbounded_send((peer_uuid, PeerState::Disconnected))
.map_err(TrySendError::into_send_error)?;
simbleau marked this conversation as resolved.
Show resolved Hide resolved
},
PeerEvent::Signal { sender, data } => {
let signal_tx = handshake_signals.entry(sender).or_insert_with(|| {
let (from_peer_tx, peer_signal_rx) = futures_channel::mpsc::unbounded();
Expand All @@ -198,13 +207,15 @@ async fn message_loop<M: Messenger>(

handshake_result = handshakes.select_next_some() => {
data_channels.insert(handshake_result.peer_id, handshake_result.data_channels);
peer_state_tx.unbounded_send((handshake_result.peer_id, PeerState::Connected)).expect("failed to report peer as connected");
peer_state_tx.unbounded_send((handshake_result.peer_id, PeerState::Connected))
.map_err(TrySendError::into_send_error)?;
simbleau marked this conversation as resolved.
Show resolved Hide resolved
peer_loops.push(M::peer_loop(handshake_result.peer_id, handshake_result.metadata));
}

peer_uuid = peer_loops.select_next_some() => {
debug!("peer {peer_uuid} finished");
peer_state_tx.unbounded_send((peer_uuid, PeerState::Disconnected)).expect("failed to report peer as disconnected");
peer_state_tx.unbounded_send((peer_uuid, PeerState::Disconnected))
.map_err(TrySendError::into_send_error)?;
}

message = next_peer_message_out => {
Expand All @@ -214,21 +225,25 @@ async fn message_loop<M: Messenger>(
.get_mut(&peer)
.expect("couldn't find data channel for peer")
.get_mut(channel_index).unwrap_or_else(|| panic!("couldn't find data channel with index {channel_index}"));
data_channel.send(packet).unwrap();

if let Err(e) = data_channel.send(packet) {
// Peer we're sending to closed their end of the connection.
// We anticipate the PeerLeft event soon, but we sent a message before it came.
// Do nothing. Only log it.
warn!("failed to send to peer {peer} (socket closed): {e:?}");
};
simbleau marked this conversation as resolved.
Show resolved Hide resolved
}
Some((_, None)) | None => {
// Receiver end of outgoing message channel closed,
// which most likely means the socket was dropped.
// There could probably be cleaner ways to handle this,
// but for now, just exit cleanly.
debug!("Outgoing message queue closed");
break;
warn!("Outgoing message queue closed, message not sent");
break Err(SignalingError::StreamExhausted);
}
}
}

complete => break
complete => break Ok(())
}
}
}
12 changes: 7 additions & 5 deletions matchbox_socket/src/webrtc_socket/native.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{error::MessagingError, HandshakeResult, PeerDataSender};
use super::{HandshakeResult, PeerDataSender};
use crate::{
webrtc_socket::{
error::SignalingError,
Expand All @@ -22,7 +22,7 @@ use futures::{
stream::FuturesUnordered,
Future, FutureExt, SinkExt, StreamExt,
};
use futures_channel::mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender};
use futures_channel::mpsc::{Receiver, Sender, TrySendError, UnboundedReceiver, UnboundedSender};
use futures_timer::Delay;
use futures_util::{lock::Mutex, select};
use log::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -54,7 +54,7 @@ impl Signaller for NativeSignaller {
Err(e) => {
if let Some(attempts) = attempts.as_mut() {
if *attempts <= 1 {
return Err(SignalingError::ConnectionFailed(Box::new(e)));
return Err(SignalingError::NegotiationFailed(Box::new(e)));
} else {
*attempts -= 1;
warn!("connection to signaling server failed, {attempts} attempt(s) remain");
Expand Down Expand Up @@ -92,8 +92,10 @@ impl Signaller for NativeSignaller {
pub(crate) struct NativeMessenger;

impl PeerDataSender for UnboundedSender<Packet> {
fn send(&mut self, packet: Packet) -> Result<(), super::error::MessagingError> {
self.unbounded_send(packet).map_err(MessagingError::from)
fn send(&mut self, packet: Packet) -> Result<(), SignalingError> {
Ok(self
.unbounded_send(packet)
.map_err(TrySendError::into_send_error)?)
}
}

Expand Down
Loading
Loading