diff --git a/rtc-shared/src/handler.rs b/rtc-shared/src/handler.rs new file mode 100644 index 0000000..0531a7b --- /dev/null +++ b/rtc-shared/src/handler.rs @@ -0,0 +1,50 @@ +use crate::error::Result; +use retty::transport::Transmit; +use std::time::Instant; + +pub trait RTCHandler { + /// Associated event input message type + type Ein: 'static; + /// Associated event output message type + type Eout: 'static; + /// Associated read input message type + type Rin: 'static; + /// Associated read output message type + type Rout: 'static; + /// Associated write input message type + type Win: 'static; + /// Associated write output message type for + type Wout: 'static; + + /// Handles Rin and returns Rout for next inbound handler handling + fn handle_read(&mut self, msg: Transmit) -> Result<()>; + + /// Polls Rout from internal queue for next inbound handler handling + fn poll_read(&mut self) -> Option>; + + /// Handles Win and returns Wout for next outbound handler handling + fn handle_write(&mut self, msg: Transmit) -> Result<()>; + + /// Polls Wout from internal queue for next outbound handler handling + fn poll_write(&mut self) -> Option>; + + /// Handles event + fn handle_event(&mut self, _evt: Self::Ein) -> Result<()> { + Ok(()) + } + + /// Polls event + fn poll_event(&mut self) -> Option { + None + } + + /// Handles timeout + fn handle_timeout(&mut self, _now: Instant) -> Result<()> { + Ok(()) + } + + /// Polls timeout + fn poll_timeout(&mut self) -> Option { + None + } +} diff --git a/rtc-shared/src/lib.rs b/rtc-shared/src/lib.rs index b236ab3..22d8310 100644 --- a/rtc-shared/src/lib.rs +++ b/rtc-shared/src/lib.rs @@ -11,6 +11,7 @@ pub mod marshal; pub mod replay_detector; pub mod error; +pub mod handler; pub mod util; pub use retty::transport::{ diff --git a/rtc/src/handler/demuxer.rs b/rtc/src/handler/demuxer.rs index 4f60982..2b5a229 100644 --- a/rtc/src/handler/demuxer.rs +++ b/rtc/src/handler/demuxer.rs @@ -1,7 +1,9 @@ -use crate::handler::RTCHandler; use crate::messages::{DTLSMessage, RTCMessage, RTPMessage, STUNMessage}; use log::{debug, error}; +use shared::error::Result; +use shared::handler::RTCHandler; use shared::Transmit; +use std::collections::VecDeque; /// match_range is a MatchFunc that accepts packets with the first byte in [lower..upper] fn match_range(lower: u8, upper: u8, buf: &[u8]) -> bool { @@ -39,62 +41,80 @@ fn match_srtp(b: &[u8]) -> bool { /// DemuxerHandler implements demuxing of STUN/DTLS/RTP/RTCP Protocol packets #[derive(Default)] -pub struct Demuxer; +pub struct Demuxer { + routs: VecDeque>, + wouts: VecDeque>, +} impl Demuxer { pub fn new() -> Self { - Demuxer + Self { + routs: VecDeque::new(), + wouts: VecDeque::new(), + } } } impl RTCHandler for Demuxer { - fn handle_transmit(&mut self, msg: Transmit) -> Vec> { + type Ein = (); + type Eout = (); + type Rin = RTCMessage; + type Rout = RTCMessage; + type Win = RTCMessage; + type Wout = RTCMessage; + + fn handle_read(&mut self, msg: Transmit) -> Result<()> { if let RTCMessage::Raw(message) = msg.message { if message.is_empty() { error!("drop invalid packet due to zero length"); - vec![] } else if match_dtls(&message) { - vec![Transmit { + self.routs.push_back(Transmit { now: msg.now, transport: msg.transport, message: RTCMessage::Dtls(DTLSMessage::Raw(message)), - }] + }); } else if match_srtp(&message) { - vec![Transmit { + self.routs.push_back(Transmit { now: msg.now, transport: msg.transport, message: RTCMessage::Rtp(RTPMessage::Raw(message)), - }] + }); } else { - vec![Transmit { + self.routs.push_back(Transmit { now: msg.now, transport: msg.transport, message: RTCMessage::Stun(STUNMessage::Raw(message)), - }] + }); } } else { debug!("drop non-RAW packet {:?}", msg.message); - vec![] } + + Ok(()) } - fn poll_transmit(&mut self, msg: Option>) -> Option> { - if let Some(msg) = msg { - match msg.message { - RTCMessage::Stun(STUNMessage::Raw(message)) - | RTCMessage::Dtls(DTLSMessage::Raw(message)) - | RTCMessage::Rtp(RTPMessage::Raw(message)) => Some(Transmit { - now: msg.now, - transport: msg.transport, - message: RTCMessage::Raw(message), - }), - _ => { - debug!("drop non-RAW packet {:?}", msg.message); - None - } + fn poll_read(&mut self) -> Option> { + self.routs.pop_front() + } + + fn handle_write(&mut self, msg: Transmit) -> Result<()> { + match msg.message { + RTCMessage::Stun(STUNMessage::Raw(message)) + | RTCMessage::Dtls(DTLSMessage::Raw(message)) + | RTCMessage::Rtp(RTPMessage::Raw(message)) => self.wouts.push_back(Transmit { + now: msg.now, + transport: msg.transport, + message: RTCMessage::Raw(message), + }), + _ => { + debug!("drop non-RAW packet {:?}", msg.message); } - } else { - None } + + Ok(()) + } + + fn poll_write(&mut self) -> Option> { + self.wouts.pop_front() } } diff --git a/rtc/src/handler/dtls.rs b/rtc/src/handler/dtls.rs index f4315bf..208a498 100644 --- a/rtc/src/handler/dtls.rs +++ b/rtc/src/handler/dtls.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use std::time::Instant; use crate::api::setting_engine::SettingEngine; -use crate::handler::RTCHandler; use crate::messages::{DTLSMessage, RTCEvent, RTCMessage}; use crate::transport::dtls_transport::RTCDtlsTransport; use dtls::endpoint::EndpointEvent; @@ -12,6 +11,7 @@ use dtls::extension::extension_use_srtp::SrtpProtectionProfile; use dtls::state::State; use log::{debug, error, warn}; use shared::error::{Error, Result}; +use shared::handler::RTCHandler; use shared::Transmit; use srtp::option::{srtcp_replay_protection, srtp_no_replay_protection, srtp_replay_protection}; use srtp::protection_profile::ProtectionProfile; diff --git a/rtc/src/handler/ice.rs b/rtc/src/handler/ice.rs index e228bba..6f6cd96 100644 --- a/rtc/src/handler/ice.rs +++ b/rtc/src/handler/ice.rs @@ -1,4 +1,3 @@ -use crate::handler::RTCHandler; use crate::messages::{RTCEvent, RTCMessage, STUNMessage}; use crate::transport::ice_transport::ice_candidate_pair::RTCIceCandidatePair; use crate::transport::ice_transport::{IceTransportEvent, RTCIceTransport}; @@ -6,6 +5,7 @@ use bytes::BytesMut; use ice::Event; use log::{debug, error, warn}; use shared::error::Result; +use shared::handler::RTCHandler; use shared::Transmit; use std::time::Instant; diff --git a/rtc/src/handler/mod.rs b/rtc/src/handler/mod.rs index 7fc8696..abd107d 100644 --- a/rtc/src/handler/mod.rs +++ b/rtc/src/handler/mod.rs @@ -1,37 +1,5 @@ -use crate::messages::{RTCEvent, RTCMessage}; -use shared::error::Error; -use shared::Transmit; -use std::time::Instant; - pub mod demuxer; -pub mod dtls; +/*TODO:pub mod dtls; pub mod ice; -mod sctp; - -pub trait RTCHandler { - /// Handles input message - fn handle_transmit(&mut self, msg: Transmit) -> Vec> { - vec![msg] - } - - /// Polls output message from internal transmit queue - fn poll_transmit(&mut self, msg: Option>) -> Option> { - msg - } - - fn poll_event(&mut self) -> Option { - None - } - - /// Handles a timeout event - fn handle_timeout(&mut self, _now: Instant) {} - - /// Polls a timeout event - fn poll_timeout(&mut self, _eto: &mut Instant) {} - - /// Handle an error event - fn handle_error(&mut self, _err: Error) {} - - /// Handle a close event - fn handle_close(&mut self) {} -} +pub mod sctp; +*/ diff --git a/rtc/src/handler/sctp.rs b/rtc/src/handler/sctp.rs index 63c7bd0..21df28b 100644 --- a/rtc/src/handler/sctp.rs +++ b/rtc/src/handler/sctp.rs @@ -1,4 +1,3 @@ -use crate::handler::RTCHandler; use crate::messages::{ DTLSMessage, DataChannelMessage, DataChannelMessageParams, DataChannelMessageType, RTCEvent, RTCMessage, @@ -11,6 +10,7 @@ use sctp::{ PayloadProtocolIdentifier, StreamEvent, }; use shared::error::{Error, Result}; +use shared::handler::RTCHandler; use shared::Transmit; use std::collections::{HashMap, VecDeque}; use std::time::Instant;