Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unwraps in MessageLoop #283

Merged
merged 66 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
fcb0cd0
cargo clippy fixes
simbleau Jun 17, 2023
849b31a
Merge branch 'main' of github.com:vectorgameexperts/matchbox
simbleau Jun 18, 2023
2f99f7d
Merge branch 'main' of github.com:johanhelsing/matchbox
simbleau Jul 22, 2023
cbf74ee
Merge branch 'main' of github.com:vectorgameexperts/matchbox into unw…
simbleau Jul 29, 2023
4a498dd
code cleanup
simbleau Jul 29, 2023
a2c5580
removed unwraps
simbleau Jul 29, 2023
9c5a7b6
renamed MessageLoopSendError to MessageSendError
simbleau Jul 29, 2023
8589cbe
cargo fmt
simbleau Jul 29, 2023
7baafe8
Update matchbox_socket/src/webrtc_socket/native.rs
simbleau Aug 9, 2023
e59e5df
consolidated errors
simbleau Sep 12, 2023
bb0b8e9
removed unused deps
simbleau Sep 12, 2023
df2f425
fix wasm errors
simbleau Sep 12, 2023
7e71875
Merge branch 'main' into unwrap-healing
simbleau Sep 12, 2023
c234135
added tracing subscriber
simbleau Sep 12, 2023
557c09b
treat KeepAlive failure as fatal
simbleau Sep 12, 2023
ec6d833
end connection on send failure
simbleau Sep 12, 2023
7e97c3c
now treating bad send to peer as a warning
simbleau Sep 12, 2023
498025b
updated docs
simbleau Sep 12, 2023
c77c57e
log message loop errors
simbleau Sep 12, 2023
a7eaa33
exit clean on peer id send failure
simbleau Sep 14, 2023
d65b7e5
removed unused error
simbleau Sep 14, 2023
b8f80f7
adjusted fmt of messaging error
simbleau Sep 14, 2023
b55ed21
code cleanup
simbleau Jul 29, 2023
5537a3c
removed unwraps
simbleau Jul 29, 2023
e01ccca
renamed MessageLoopSendError to MessageSendError
simbleau Jul 29, 2023
c584be6
cargo fmt
simbleau Jul 29, 2023
5c740c9
Update matchbox_socket/src/webrtc_socket/native.rs
simbleau Aug 9, 2023
27d62b5
consolidated errors
simbleau Sep 12, 2023
0424307
removed unused deps
simbleau Sep 12, 2023
379fcc3
fix wasm errors
simbleau Sep 12, 2023
3d44437
added tracing subscriber
simbleau Sep 12, 2023
5caa6dc
treat KeepAlive failure as fatal
simbleau Sep 12, 2023
547b5e4
end connection on send failure
simbleau Sep 12, 2023
4122a98
now treating bad send to peer as a warning
simbleau Sep 12, 2023
b97692d
updated docs
simbleau Sep 12, 2023
b85b28a
log message loop errors
simbleau Sep 12, 2023
97b6ca7
exit clean on peer id send failure
simbleau Sep 14, 2023
fbd0da3
removed unused error
simbleau Sep 14, 2023
5a0b3e2
adjusted fmt of messaging error
simbleau Sep 14, 2023
ce74f7f
Update matchbox_socket/src/webrtc_socket/mod.rs
simbleau Sep 15, 2023
b1683bc
Merge branch 'main' into unwrap-healing
simbleau Sep 15, 2023
e2dc9d5
build: additional gitignore files
simbleau Sep 16, 2023
3275e7c
build: flatten error struct
simbleau Sep 16, 2023
7f54be4
Merge branch 'unwrap-healing' of github.com:vectorgameexperts/matchbo…
simbleau Sep 16, 2023
cbe1f12
fix: remap JsPacket to Disconnected
simbleau Sep 16, 2023
6dfc190
fix: wasm compat
simbleau Sep 16, 2023
6fb0a89
feat: add `WebRtcChannel::try_send`
simbleau Sep 16, 2023
cce25e2
build: attach source to error
simbleau Sep 18, 2023
08ef38a
fix: break with Err on foreign channel close
simbleau Sep 18, 2023
cb90b1f
feat: added `is_closed` to channel/socket for help checking health
simbleau Sep 19, 2023
2f37301
Merge branch 'main' of github.com:johanhelsing/matchbox into unwrap-h…
simbleau Sep 19, 2023
294c909
fix: address comment
simbleau Sep 21, 2023
6f415b7
docs: fmt
simbleau Sep 21, 2023
7043e17
docs: fix comment
simbleau Sep 21, 2023
b0d9872
fix: remove runtime error
simbleau Sep 21, 2023
c0bf5fe
feat: add `try_send` to `SingleChannel` socket
simbleau Sep 21, 2023
a720093
fix: remove is_closed
simbleau Sep 21, 2023
c836688
docs: change `receiver` to `socket future`
simbleau Sep 21, 2023
49b8003
fix: renamed `JsPacket` and `Packet`
simbleau Sep 21, 2023
9a413fd
Add error_handling_example
johanhelsing Sep 22, 2023
da1bcc0
fixup: Don't error on peer disconnect after dropping socket
johanhelsing Sep 22, 2023
5ea1bb1
Remove non-errors
johanhelsing Sep 22, 2023
559c922
Remove conversion footgun
johanhelsing Sep 22, 2023
e94eb08
Rework error API
johanhelsing Sep 22, 2023
1fef9fe
docs: Update readme for error handling example
johanhelsing Sep 22, 2023
c7d977e
Merge pull request #3 from johanhelsing/error-handling-api
simbleau Sep 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ crates/*/target
/.vscode
/benches/target
*.code-workspace
.DS_Store
17 changes: 15 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions examples/error_handling/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[target.wasm32-unknown-unknown]
runner = "wasm-server-runner"
20 changes: 20 additions & 0 deletions examples/error_handling/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "error_handling_example"
version = "0.1.0"
edition = "2021"

[dependencies]
matchbox_socket = { path = "../../matchbox_socket" }
futures-timer = { version = "3", features = ["wasm-bindgen"] }
log = { version = "0.4", default-features = false }

[target.'cfg(target_arch = "wasm32")'.dependencies]
console_error_panic_hook = "0.1.7"
console_log = "1.0"
futures = { version = "0.3", default-features = false }
wasm-bindgen-futures = "0.4.29"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
futures = "0.3"
tokio = "1.32"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
45 changes: 45 additions & 0 deletions examples/error_handling/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Error handling example

This example shows one way failures can be handled, logging at the appropriate points.

The example tries to connect to a room, then sends messages to all peers as quickly as possible, logging any messages received, then disconnects after a timeout.

## Instructions

- Run the matchbox-provided [`matchbox_server`](../../matchbox_server/) ([help](../../matchbox_server/README.md)), or run your own on `ws://localhost:3536/`.
- Run the demo
- [on Native](#run-on-native)
- [on WASM](#run-on-wasm)

## Run on Native

```sh
cargo run
```

## Run on WASM

### Prerequisites

Install the `wasm32-unknown-unknown` target

```sh
rustup target install wasm32-unknown-unknown
```

Install a lightweight web server

```sh
cargo install wasm-server-runner
```

### Serve

```sh
cargo run --target wasm32-unknown-unknown
```

### Run

- Use a web browser and navigate to <http://127.0.0.1:1334>
- Open the console to see execution logs
114 changes: 114 additions & 0 deletions examples/error_handling/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use futures::{select, FutureExt};
use futures_timer::Delay;
use log::{info, warn};
use matchbox_socket::{Error as SocketError, PeerId, PeerState, WebRtcSocket};
use std::time::Duration;

#[cfg(target_arch = "wasm32")]
fn main() {
// Setup logging
console_error_panic_hook::set_once();
console_log::init_with_level(log::Level::Debug).unwrap();

wasm_bindgen_futures::spawn_local(async_main());
}

#[cfg(not(target_arch = "wasm32"))]
#[tokio::main]
async fn main() {
// Setup logging
use tracing_subscriber::prelude::*;
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "error_handling_example=info,matchbox_socket=info".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();

async_main().await
}

async fn async_main() {
info!("Connecting to matchbox");
let (mut socket, loop_fut) =
WebRtcSocket::new_reliable("ws://localhost:3536/error_handling_example");

let loop_fut = async {
match loop_fut.await {
Ok(()) => info!("Exited cleanly :)"),
Err(e) => match e {
SocketError::ConnectionFailed(e) => {
warn!("couldn't connect to signaling server, please check your connection: {e}");
// todo: show prompt and reconnect?
}
SocketError::Disconnected(e) => {
warn!("you were kicked, or your connection went down, or the signaling server stopped: {e}");
}
},
}
}
.fuse();

futures::pin_mut!(loop_fut);

let timeout = Delay::new(Duration::from_millis(100));
futures::pin_mut!(timeout);

let fake_user_quit = Delay::new(Duration::from_millis(20050)); // longer than reconnection timeouts
futures::pin_mut!(fake_user_quit);

loop {
// Handle any new peers
for (peer, state) in socket.update_peers() {
match state {
PeerState::Connected => {
info!("Peer joined: {peer}");
let packet = "hello friend!".as_bytes().to_vec().into_boxed_slice();
socket.send(packet, peer);
}
PeerState::Disconnected => {
info!("Peer left: {peer}");
}
}
}

// Accept any messages incoming
for (peer, packet) in socket.receive() {
let message = String::from_utf8_lossy(&packet);
info!("Message from {peer}: {message:?}");
}

select! {
// Restart this loop every 100ms
_ = (&mut timeout).fuse() => {
let peers: Vec<PeerId> = socket.connected_peers().collect();
for peer in peers {
let packet = "ping!".as_bytes().to_vec().into_boxed_slice();
socket.send(packet, peer);
}
timeout.reset(Duration::from_millis(10));
}

_ = (&mut fake_user_quit).fuse() => {
info!("timeout, stopping sending/receiving");
break;
}

// Or break if the message loop ends (disconnected, closed, etc.)
_ = &mut loop_fut => {
break;
}
}
}

info!("dropping socket (intentionally disconnecting if connected)");
drop(socket);

// join!(Delay::new(Duration::from_millis(2000)), loop_fut);

Delay::new(Duration::from_millis(2000)).await;
loop_fut.await;

info!("Finished");
}
3 changes: 1 addition & 2 deletions matchbox_signaling/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,16 @@ matchbox_protocol = { version = "0.7", path = "../matchbox_protocol", features =
axum = { version = "0.6", features = ["ws"] }
hyper = { version = "0.14", features = ["server"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tower-http = { version = "0.4", features = ["cors", "trace"] }
tokio = { version = "1.32", features = ["macros", "rt-multi-thread"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
uuid = { version = "1.4", features = ["serde", "v4"] }
clap = { version = "4.3", features = ["derive", "env"] }
thiserror = "1.0"
tokio-stream = "0.1"
async-trait = { version = "0.1" }

[dev-dependencies]
tokio-tungstenite = "0.20.0"
tracing-subscriber = "0.3"
9 changes: 6 additions & 3 deletions matchbox_signaling/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ pub mod topologies;

pub use error::Error;
pub use signaling_server::{
builder::SignalingServerBuilder, callbacks::Callback, error::ClientRequestError,
error::SignalingError, handlers::WsStateMeta, server::SignalingServer, NoCallbacks, NoState,
SignalingCallbacks, SignalingState,
builder::SignalingServerBuilder,
callbacks::Callback,
error::{ClientRequestError, SignalingError},
handlers::WsStateMeta,
server::SignalingServer,
NoCallbacks, NoState, SignalingCallbacks, SignalingState,
};
pub use topologies::{common_logic, SignalingTopology};
3 changes: 2 additions & 1 deletion matchbox_signaling/src/topologies/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ where
async fn state_machine(upgrade: WsStateMeta<Cb, S>);
}

/// Common, re-usable logic and types shared between topologies and which may be useful if building your own topology.
/// Common, re-usable logic and types shared between topologies and which may be useful if building
/// your own topology.
simbleau marked this conversation as resolved.
Show resolved Hide resolved
pub mod common_logic {
use crate::signaling_server::error::{ClientRequestError, SignalingError};
use axum::extract::ws::{Message, WebSocket};
Expand Down
1 change: 0 additions & 1 deletion matchbox_socket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ once_cell = { version = "1.17", default-features = false, features = [
"race",
"alloc",
] }
crossbeam-channel = "0.5"
derive_more = "0.99"

ggrs = { version = "0.9", default-features = false, optional = true }
Expand Down
12 changes: 8 additions & 4 deletions matchbox_socket/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use crate::webrtc_socket::error::SignalingError;

/// Errors that can happen when using Matchbox.
/// Errors that can happen when using Matchbox sockets.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// An error occurring during the signaling loop.
#[error("An error in the signaling loop: {0}")]
Signaling(#[from] SignalingError),
/// An error occurring if the connection fails to establish. Perhaps check your connection or
/// try again.
#[error("The connection failed to establish. Check your connection and try again.")]
ConnectionFailed(SignalingError),
/// Disconnected from the signaling server
#[error("The signaling server connection was severed.")]
Disconnected(SignalingError),
}
2 changes: 1 addition & 1 deletion matchbox_socket/src/ggrs_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl WebRtcSocket {

impl WebRtcSocket {
/// Returns a Vec of connected peers as [`ggrs::PlayerType`]
pub fn players(&self) -> Vec<PlayerType<PeerId>> {
pub fn players(&mut self) -> Vec<PlayerType<PeerId>> {
let Some(our_id) = self.id() else {
// we're still waiting for the server to initialize our id
// no peers should be added at this point anyway
Expand Down
27 changes: 9 additions & 18 deletions matchbox_socket/src/webrtc_socket/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::webrtc_socket::messages::PeerEvent;
use cfg_if::cfg_if;
use futures_channel::mpsc::TrySendError;

/// An error that can occur when getting a socket's channel through
/// `get_channel`, `take_channel` or `try_update_peers`.
Expand All @@ -20,41 +19,33 @@ pub enum ChannelError {
Closed,
}

/// An error that can occur with WebRTC signaling.
/// An error that can occur with WebRTC messaging.
#[derive(Debug, thiserror::Error)]
pub enum SignalingError {
// Common
#[error("failed to send event to signaling server: {0}")]
Undeliverable(#[from] TrySendError<PeerEvent>),
#[error("failed to send to signaling server: {0}")]
UndeliverableSignal(#[from] futures_channel::mpsc::TrySendError<PeerEvent>),

#[error("The stream is exhausted")]
StreamExhausted,

#[error("Message received in unknown format")]
UnknownFormat,

#[error("failed to establish initial connection: {0}")]
ConnectionFailed(#[from] Box<SignalingError>),
NegotiationFailed(#[from] Box<SignalingError>),

// Native
#[cfg(not(target_arch = "wasm32"))]
#[error("socket failure communicating with signaling server: {0}")]
Socket(#[from] async_tungstenite::tungstenite::Error),
WebSocket(#[from] async_tungstenite::tungstenite::Error),

// WASM
#[cfg(target_arch = "wasm32")]
#[error("socket failure communicating with signaling server: {0}")]
Socket(#[from] ws_stream_wasm::WsErr),
WebSocket(#[from] ws_stream_wasm::WsErr),
}

/// An error that can occur with WebRTC messaging.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, thiserror::Error)]
#[error("failed to send message to peer: {0}")]
pub(crate) struct MessagingError(#[from] futures_channel::mpsc::TrySendError<crate::Packet>);

#[cfg(target_arch = "wasm32")]
#[derive(Debug, thiserror::Error)]
#[error("failed to send message to peer: {0}")]
pub(crate) struct MessagingError(#[from] JsError);

cfg_if! {
if #[cfg(target_arch = "wasm32")] {
use wasm_bindgen::{JsValue};
Expand Down
Loading
Loading