Skip to content

Commit

Permalink
move RTCHandler to shared
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Jun 29, 2024
1 parent 1a9bfce commit 18cb778
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 66 deletions.
50 changes: 50 additions & 0 deletions rtc-shared/src/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use crate::error::Result;
use retty::transport::Transmit;
use std::time::Instant;

pub trait RTCHandler {
/// Associated event input message type
type Ein: 'static;
/// Associated event output message type
type Eout: 'static;
/// Associated read input message type
type Rin: 'static;
/// Associated read output message type
type Rout: 'static;
/// Associated write input message type
type Win: 'static;
/// Associated write output message type for
type Wout: 'static;

/// Handles Rin and returns Rout for next inbound handler handling
fn handle_read(&mut self, msg: Transmit<Self::Rin>) -> Result<()>;

/// Polls Rout from internal queue for next inbound handler handling
fn poll_read(&mut self) -> Option<Transmit<Self::Rout>>;

/// Handles Win and returns Wout for next outbound handler handling
fn handle_write(&mut self, msg: Transmit<Self::Win>) -> Result<()>;

/// Polls Wout from internal queue for next outbound handler handling
fn poll_write(&mut self) -> Option<Transmit<Self::Wout>>;

/// Handles event
fn handle_event(&mut self, _evt: Self::Ein) -> Result<()> {
Ok(())
}

/// Polls event
fn poll_event(&mut self) -> Option<Self::Eout> {
None
}

/// Handles timeout
fn handle_timeout(&mut self, _now: Instant) -> Result<()> {
Ok(())
}

/// Polls timeout
fn poll_timeout(&mut self) -> Option<Instant> {
None
}
}
1 change: 1 addition & 0 deletions rtc-shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod marshal;
pub mod replay_detector;

pub mod error;
pub mod handler;
pub mod util;

pub use retty::transport::{
Expand Down
76 changes: 48 additions & 28 deletions rtc/src/handler/demuxer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::handler::RTCHandler;
use crate::messages::{DTLSMessage, RTCMessage, RTPMessage, STUNMessage};
use log::{debug, error};
use shared::error::Result;
use shared::handler::RTCHandler;
use shared::Transmit;
use std::collections::VecDeque;

/// match_range is a MatchFunc that accepts packets with the first byte in [lower..upper]
fn match_range(lower: u8, upper: u8, buf: &[u8]) -> bool {
Expand Down Expand Up @@ -39,62 +41,80 @@ fn match_srtp(b: &[u8]) -> bool {

/// DemuxerHandler implements demuxing of STUN/DTLS/RTP/RTCP Protocol packets
#[derive(Default)]
pub struct Demuxer;
pub struct Demuxer {
routs: VecDeque<Transmit<RTCMessage>>,
wouts: VecDeque<Transmit<RTCMessage>>,
}

impl Demuxer {
pub fn new() -> Self {
Demuxer
Self {
routs: VecDeque::new(),
wouts: VecDeque::new(),
}
}
}

impl RTCHandler for Demuxer {
fn handle_transmit(&mut self, msg: Transmit<RTCMessage>) -> Vec<Transmit<RTCMessage>> {
type Ein = ();
type Eout = ();
type Rin = RTCMessage;
type Rout = RTCMessage;
type Win = RTCMessage;
type Wout = RTCMessage;

fn handle_read(&mut self, msg: Transmit<Self::Rin>) -> Result<()> {
if let RTCMessage::Raw(message) = msg.message {
if message.is_empty() {
error!("drop invalid packet due to zero length");
vec![]
} else if match_dtls(&message) {
vec![Transmit {
self.routs.push_back(Transmit {
now: msg.now,
transport: msg.transport,
message: RTCMessage::Dtls(DTLSMessage::Raw(message)),
}]
});
} else if match_srtp(&message) {
vec![Transmit {
self.routs.push_back(Transmit {
now: msg.now,
transport: msg.transport,
message: RTCMessage::Rtp(RTPMessage::Raw(message)),
}]
});
} else {
vec![Transmit {
self.routs.push_back(Transmit {
now: msg.now,
transport: msg.transport,
message: RTCMessage::Stun(STUNMessage::Raw(message)),
}]
});
}
} else {
debug!("drop non-RAW packet {:?}", msg.message);
vec![]
}

Ok(())
}

fn poll_transmit(&mut self, msg: Option<Transmit<RTCMessage>>) -> Option<Transmit<RTCMessage>> {
if let Some(msg) = msg {
match msg.message {
RTCMessage::Stun(STUNMessage::Raw(message))
| RTCMessage::Dtls(DTLSMessage::Raw(message))
| RTCMessage::Rtp(RTPMessage::Raw(message)) => Some(Transmit {
now: msg.now,
transport: msg.transport,
message: RTCMessage::Raw(message),
}),
_ => {
debug!("drop non-RAW packet {:?}", msg.message);
None
}
fn poll_read(&mut self) -> Option<Transmit<Self::Rout>> {
self.routs.pop_front()
}

fn handle_write(&mut self, msg: Transmit<Self::Win>) -> Result<()> {
match msg.message {
RTCMessage::Stun(STUNMessage::Raw(message))
| RTCMessage::Dtls(DTLSMessage::Raw(message))
| RTCMessage::Rtp(RTPMessage::Raw(message)) => self.wouts.push_back(Transmit {
now: msg.now,
transport: msg.transport,
message: RTCMessage::Raw(message),
}),
_ => {
debug!("drop non-RAW packet {:?}", msg.message);
}
} else {
None
}

Ok(())
}

fn poll_write(&mut self) -> Option<Transmit<RTCMessage>> {
self.wouts.pop_front()
}
}
2 changes: 1 addition & 1 deletion rtc/src/handler/dtls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use std::sync::Arc;
use std::time::Instant;

use crate::api::setting_engine::SettingEngine;
use crate::handler::RTCHandler;
use crate::messages::{DTLSMessage, RTCEvent, RTCMessage};
use crate::transport::dtls_transport::RTCDtlsTransport;
use dtls::endpoint::EndpointEvent;
use dtls::extension::extension_use_srtp::SrtpProtectionProfile;
use dtls::state::State;
use log::{debug, error, warn};
use shared::error::{Error, Result};
use shared::handler::RTCHandler;
use shared::Transmit;
use srtp::option::{srtcp_replay_protection, srtp_no_replay_protection, srtp_replay_protection};
use srtp::protection_profile::ProtectionProfile;
Expand Down
2 changes: 1 addition & 1 deletion rtc/src/handler/ice.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::handler::RTCHandler;
use crate::messages::{RTCEvent, RTCMessage, STUNMessage};
use crate::transport::ice_transport::ice_candidate_pair::RTCIceCandidatePair;
use crate::transport::ice_transport::{IceTransportEvent, RTCIceTransport};
use bytes::BytesMut;
use ice::Event;
use log::{debug, error, warn};
use shared::error::Result;
use shared::handler::RTCHandler;
use shared::Transmit;
use std::time::Instant;

Expand Down
38 changes: 3 additions & 35 deletions rtc/src/handler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,5 @@
use crate::messages::{RTCEvent, RTCMessage};
use shared::error::Error;
use shared::Transmit;
use std::time::Instant;

pub mod demuxer;
pub mod dtls;
/*TODO:pub mod dtls;
pub mod ice;
mod sctp;

pub trait RTCHandler {
/// Handles input message
fn handle_transmit(&mut self, msg: Transmit<RTCMessage>) -> Vec<Transmit<RTCMessage>> {
vec![msg]
}

/// Polls output message from internal transmit queue
fn poll_transmit(&mut self, msg: Option<Transmit<RTCMessage>>) -> Option<Transmit<RTCMessage>> {
msg
}

fn poll_event(&mut self) -> Option<RTCEvent> {
None
}

/// Handles a timeout event
fn handle_timeout(&mut self, _now: Instant) {}

/// Polls a timeout event
fn poll_timeout(&mut self, _eto: &mut Instant) {}

/// Handle an error event
fn handle_error(&mut self, _err: Error) {}

/// Handle a close event
fn handle_close(&mut self) {}
}
pub mod sctp;
*/
2 changes: 1 addition & 1 deletion rtc/src/handler/sctp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::handler::RTCHandler;
use crate::messages::{
DTLSMessage, DataChannelMessage, DataChannelMessageParams, DataChannelMessageType, RTCEvent,
RTCMessage,
Expand All @@ -11,6 +10,7 @@ use sctp::{
PayloadProtocolIdentifier, StreamEvent,
};
use shared::error::{Error, Result};
use shared::handler::RTCHandler;
use shared::Transmit;
use std::collections::{HashMap, VecDeque};
use std::time::Instant;
Expand Down

0 comments on commit 18cb778

Please sign in to comment.