Skip to content

Commit

Permalink
feat: socket/channel closed detection (#337)
Browse files Browse the repository at this point in the history
determine if a socket or channel is closed for better error handling

---------

Co-authored-by: Johan Klokkhammer Helsing <[email protected]>
  • Loading branch information
simbleau and johanhelsing authored Jan 6, 2024
1 parent b5013e1 commit 3e36ac5
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 11 deletions.
2 changes: 0 additions & 2 deletions matchbox_signaling/src/signaling_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ pub(crate) mod error;
pub(crate) mod handlers;
pub(crate) mod server;

pub use server::SignalingServer;

/// State managed by the signaling server
pub trait SignalingState: Clone + Send + Sync + 'static {}

Expand Down
112 changes: 103 additions & 9 deletions matchbox_socket/src/webrtc_socket/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl WebRtcSocketBuilder {
/// Sets the number of attempts to make at reconnecting to the signaling server,
/// if `None` the socket will attempt to connect indefinitely.
///
/// The default is 2 reconnection attempts.
/// The default is 3 reconnection attempts.
pub fn reconnect_attempts(mut self, attempts: Option<u16>) -> Self {
self.config.attempts = attempts;
self
Expand Down Expand Up @@ -349,6 +349,20 @@ pub struct WebRtcChannel {
}

impl WebRtcChannel {
/// Returns whether it's still possible to send messages.
pub fn is_closed(&self) -> bool {
self.tx.is_closed()
}

/// Close this channel.
///
/// This prevents sending and receiving any messages in the future, but does not drain messages
/// that are buffered.
pub fn close(&mut self) {
self.tx.close_channel();
self.rx.close();
}

/// Call this where you want to handle new received messages. Returns immediately.
///
/// Messages are removed from the socket when called.
Expand Down Expand Up @@ -430,6 +444,17 @@ impl WebRtcSocket {
}

impl<C: ChannelPlurality> WebRtcSocket<C> {
// Todo: Disconnect from the peer, severing all communication channels.
// pub fn disconnect(&mut self, peer: PeerId) {}

/// Close this socket, disconnecting all channels.
pub fn close(&mut self) {
self.channels
.iter_mut()
.filter_map(Option::as_mut)
.for_each(|c| c.close());
}

/// Handle peers connecting or disconnecting
///
/// Constructed using [`WebRtcSocketBuilder`].
Expand Down Expand Up @@ -515,7 +540,7 @@ impl<C: ChannelPlurality> WebRtcSocket<C> {
}
}

/// Gets a mutable reference to the [`WebRtcChannel`] of a given id.
/// Gets an immutable reference to the [`WebRtcChannel`] of a given id.
///
/// ```
/// use matchbox_socket::*;
Expand All @@ -524,20 +549,43 @@ impl<C: ChannelPlurality> WebRtcSocket<C> {
/// .add_channel(ChannelConfig::reliable())
/// .add_channel(ChannelConfig::unreliable())
/// .build();
/// let reliable_channel_messages = socket.channel(0).receive();
/// let is_closed = socket.channel(0).is_closed();
/// ```
///
/// See also: [`WebRtcSocket::get_channel`], [`WebRtcSocket::take_channel`]
/// See also: [`WebRtcSocket::channel_mut`], [`WebRtcSocket::get_channel`],
/// [`WebRtcSocket::take_channel`]
///
/// # Panics
///
/// will panic if the channel cannot be found.
pub fn channel(&mut self, channel: usize) -> &mut WebRtcChannel {
pub fn channel(&self, channel: usize) -> &WebRtcChannel {
self.get_channel(channel).unwrap()
}

/// Gets a mutable reference to the [`WebRtcChannel`] of a given id.
///
/// ```
/// use matchbox_socket::*;
///
/// let (mut socket, message_loop) = WebRtcSocketBuilder::new("wss://example.invalid/")
/// .add_channel(ChannelConfig::reliable())
/// .add_channel(ChannelConfig::unreliable())
/// .build();
/// let reliable_channel_messages = socket.channel_mut(0).receive();
/// ```
///
/// See also: [`WebRtcSocket::channel`], [`WebRtcSocket::get_channel_mut`],
/// [`WebRtcSocket::take_channel`]
///
/// # Panics
///
/// will panic if the channel cannot be found.
pub fn channel_mut(&mut self, channel: usize) -> &mut WebRtcChannel {
self.get_channel_mut(channel).unwrap()
}

/// Gets an immutable reference to the [`WebRtcChannel`] of a given id.
///
/// Returns an error if the channel was not found.
///
/// ```
Expand All @@ -547,11 +595,34 @@ impl<C: ChannelPlurality> WebRtcSocket<C> {
/// .add_channel(ChannelConfig::reliable())
/// .add_channel(ChannelConfig::unreliable())
/// .build();
/// let reliable_channel_messages = socket.get_channel(0).unwrap().receive();
/// let is_closed = socket.get_channel(0).unwrap().is_closed();
/// ```
///
/// See also: [`WebRtcSocket::get_channel_mut`], [`WebRtcSocket::take_channel`]
pub fn get_channel(&self, channel: usize) -> Result<&WebRtcChannel, ChannelError> {
self.channels
.get(channel)
.ok_or(ChannelError::NotFound)?
.as_ref()
.ok_or(ChannelError::Taken)
}

/// Gets a mutable reference to the [`WebRtcChannel`] of a given id.
///
/// Returns an error if the channel was not found.
///
/// ```
/// use matchbox_socket::*;
///
/// let (mut socket, message_loop) = WebRtcSocketBuilder::new("wss://example.invalid/")
/// .add_channel(ChannelConfig::reliable())
/// .add_channel(ChannelConfig::unreliable())
/// .build();
/// let reliable_channel_messages = socket.get_channel_mut(0).unwrap().receive();
/// ```
///
/// See also: [`WebRtcSocket::channel`], [`WebRtcSocket::take_channel`]
pub fn get_channel(&mut self, channel: usize) -> Result<&mut WebRtcChannel, ChannelError> {
pub fn get_channel_mut(&mut self, channel: usize) -> Result<&mut WebRtcChannel, ChannelError> {
self.channels
.get_mut(channel)
.ok_or(ChannelError::NotFound)?
Expand Down Expand Up @@ -587,13 +658,13 @@ impl WebRtcSocket<SingleChannel> {
///
/// Messages are removed from the socket when called.
pub fn receive(&mut self) -> Vec<(PeerId, Packet)> {
self.channel(0).receive()
self.channel_mut(0).receive()
}

/// Try to send a packet to the given peer. An error is propagated if the socket future
/// is dropped. `Ok` is not a guarantee of delivery.
pub fn try_send(&mut self, packet: Packet, peer: PeerId) -> Result<(), SendError> {
self.channel(0).try_send(packet, peer)
self.channel_mut(0).try_send(packet, peer)
}

/// Send a packet to the given peer. There is no guarantee of delivery.
Expand All @@ -603,6 +674,29 @@ impl WebRtcSocket<SingleChannel> {
pub fn send(&mut self, packet: Packet, peer: PeerId) {
self.try_send(packet, peer).expect("Send failed");
}

/// Returns whether the socket channel is closed
pub fn is_closed(&self) -> bool {
self.channel(0).is_closed()
}
}

impl WebRtcSocket<MultipleChannels> {
/// Returns whether any socket channel is closed
pub fn any_closed(&self) -> bool {
self.channels
.iter()
.filter_map(Option::as_ref)
.any(|c| c.is_closed())
}

/// Returns whether all socket channels are closed
pub fn all_closed(&self) -> bool {
self.channels
.iter()
.filter_map(Option::as_ref)
.all(|c| c.is_closed())
}
}

pub(crate) fn new_senders_and_receivers<T>(
Expand Down

0 comments on commit 3e36ac5

Please sign in to comment.