Skip to content

Commit

Permalink
impl RTCHandler for RTCIceTransport
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Mar 24, 2024
1 parent e38ff45 commit bbb4887
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 82 deletions.
127 changes: 127 additions & 0 deletions rtc/src/handlers/ice.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use crate::handlers::RTCHandler;
use crate::messages::{RTCEvent, RTCMessage, STUNMessage};
use crate::transport::ice_transport::ice_candidate_pair::RTCIceCandidatePair;
use crate::transport::ice_transport::{IceTransportEvent, RTCIceTransport};
use bytes::BytesMut;
use ice::Event;
use log::{debug, error, warn};
use shared::error::{Error, Result};
use shared::Transmit;
use std::time::Instant;

impl RTCHandler for RTCIceTransport {
fn handle_transmit(&mut self, msg: Transmit<RTCMessage>) -> Vec<Transmit<RTCMessage>> {
if let RTCMessage::Stun(STUNMessage::Raw(message)) = msg.message {
let stun_transmit = Transmit {
now: msg.now,
transport: msg.transport,
message,
};

let try_read = || -> Result<()> {
let ice_agent = self
.gatherer
.agent
.as_mut()
.ok_or(Error::ErrICEAgentNotExist)?;

ice_agent.handle_read(stun_transmit)
};

if let Err(err) = try_read() {
warn!("try_read got error {}", err);
self.handle_error(err);
}
vec![]
} else {
debug!("bypass StunHandler read for {}", msg.transport.peer_addr);
vec![msg]
}
}

fn poll_transmit(&mut self, msg: Option<Transmit<RTCMessage>>) -> Option<Transmit<RTCMessage>> {
if let Some(msg) = msg {
if let RTCMessage::Stun(STUNMessage::Stun(mut stun_message)) = msg.message {
debug!(
"StunMessage type {} sent to {}",
stun_message.typ, msg.transport.peer_addr
);
stun_message.encode();
let message = BytesMut::from(&stun_message.raw[..]);
self.transmits.push_back(Transmit {
now: msg.now,
transport: msg.transport,
message: RTCMessage::Stun(STUNMessage::Raw(message)),
});
} else {
debug!("bypass StunHandler write for {}", msg.transport.peer_addr);
self.transmits.push_back(msg);
}
}

self.transmits.pop_front()
}

fn poll_event(&mut self) -> Option<RTCEvent> {
if let Some(ice_agent) = self.gatherer.agent.as_mut() {
if let Some(event) = ice_agent.poll_event() {
match event {
Event::ConnectionStateChange(state) => Some(RTCEvent::IceTransportEvent(
IceTransportEvent::OnConnectionStateChange(state.into()),
)),
Event::SelectedCandidatePairChange(local, remote) => {
Some(RTCEvent::IceTransportEvent(
IceTransportEvent::OnSelectedCandidatePairChange(Box::new(
RTCIceCandidatePair::new((&*local).into(), (&*remote).into()),
)),
))
}
}
} else {
None
}
} else {
None
}
}

/// Handles a timeout event
fn handle_timeout(&mut self, now: Instant) {
let mut try_timeout = || -> Result<()> {
let ice_agent = self
.gatherer
.agent
.as_mut()
.ok_or(Error::ErrICEAgentNotExist)?;

ice_agent.handle_timeout(now);
while let Some(transmit) = ice_agent.poll_transmit() {
self.transmits.push_back(Transmit {
now: transmit.now,
transport: transmit.transport,
message: RTCMessage::Stun(STUNMessage::Raw(transmit.message)),
});
}

Ok(())
};
match try_timeout() {
Ok(_) => {}
Err(err) => {
error!("try_timeout with error {}", err);
self.handle_error(err);
}
}
}

/// Polls a timeout event
fn poll_timeout(&mut self, eto: &mut Instant) {
if let Some(ice_agent) = self.gatherer.agent.as_mut() {
if let Some(timeout) = ice_agent.poll_timeout() {
if timeout < *eto {
*eto = timeout;
}
}
}
}
}
2 changes: 1 addition & 1 deletion rtc/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::time::Instant;

pub mod demuxer;
pub mod dtls;
pub mod ice;
mod sctp;
pub mod stun;

pub trait RTCHandler {
/// Handles input message
Expand Down
75 changes: 0 additions & 75 deletions rtc/src/handlers/stun.rs

This file was deleted.

11 changes: 5 additions & 6 deletions rtc/src/transport/ice_transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ use ice_role::RTCIceRole;
use std::collections::VecDeque;

//use crate::transports::ice_transport::ice_parameters::RTCIceParameters;
use crate::messages::RTCMessage;
use crate::stats::stats_collector::StatsCollector;
use crate::stats::ICETransportStats;
use crate::stats::StatsReportType::Transport;
use crate::transport::ice_transport::ice_transport_state::RTCIceTransportState;
use shared::error::{Error, Result};
use shared::Transmit;

/*TODO:#[cfg(test)]
mod ice_transport_test;
Expand All @@ -37,26 +39,23 @@ pub enum IceTransportEvent {
OnSelectedCandidatePairChange(Box<RTCIceCandidatePair>),
}

#[derive(Default)]
struct ICETransportInternal {}

/// ICETransport allows an application access to information about the ICE
/// transport over which packets are sent and received.
#[derive(Default)]
pub struct RTCIceTransport {
pub(crate) gatherer: RTCIceGatherer,
state: RTCIceTransportState,
events: VecDeque<IceTransportEvent>,
role: RTCIceRole,

pub(crate) transmits: VecDeque<Transmit<RTCMessage>>,
}

impl RTCIceTransport {
/// creates a new new_icetransport.
/// creates a new new_ice_transport.
pub(crate) fn new(gatherer: RTCIceGatherer) -> Self {
RTCIceTransport {
gatherer,
state: RTCIceTransportState::New,
events: VecDeque::new(),
..Default::default()
}
}
Expand Down

0 comments on commit bbb4887

Please sign in to comment.