Skip to content

Commit

Permalink
refactor track
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Apr 20, 2024
1 parent f683c51 commit 468acd9
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 73 deletions.
5 changes: 2 additions & 3 deletions rtc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ pub mod rtp_transceiver;
pub mod stats;
pub mod transport;

/*pub mod track;
pub mod track;

// re-export sub-crates
pub use {data, dtls, ice, interceptor, mdns, media, rtcp, rtp, sctp, sdp, srtp, stun, turn, util};
*/
//pub use {data, dtls, ice, interceptor, mdns, media, rtcp, rtp, sctp, sdp, srtp, stun, turn, util};
35 changes: 16 additions & 19 deletions rtc/src/peer_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,15 @@ use crate::peer_connection::peer_connection_state::{
};
use crate::peer_connection::policy::ice_transport_policy::RTCIceTransportPolicy;
//use crate::peer_connection::sdp::sdp_type::RTCSdpType;
//use crate::peer_connection::sdp::{get_mid_value, get_peer_direction};
//use crate::peer_connection::sdp::sdp_type::RTCSdpType;
use crate::peer_connection::sdp::session_description::RTCSessionDescription;
//use crate::peer_connection::sdp::*;
use crate::peer_connection::signaling_state::{
/*check_next_signaling_state,*/ RTCSignalingState, //StateChangeOp,
};
//use crate::rtp_transceiver::rtp_codec::RTPCodecType;
//use crate::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection;
use crate::transport::dtls_transport::RTCDtlsTransport;
use crate::transport::ice_transport::{
ice_gatherer::{RTCIceGatherOptions, RTCIceGatherer},
Expand Down Expand Up @@ -1366,30 +1370,25 @@ impl RTCPeerConnection {
}
false
}
*/

/*
/// set_remote_description sets the SessionDescription of the remote peer
pub async fn set_remote_description(&self, mut desc: RTCSessionDescription) -> Result<()> {
if self.internal.is_closed.load(Ordering::SeqCst) {
pub fn set_remote_description(&mut self, mut desc: RTCSessionDescription) -> Result<()> {
if self.is_closed {
return Err(Error::ErrConnectionClosed);
}
let is_renegotiation = {
let current_remote_description = self.internal.current_remote_description.lock().await;
current_remote_description.is_some()
};
let is_renegotiation = self.current_remote_description.is_some();
desc.parsed = Some(desc.unmarshal()?);
self.set_description(&desc, StateChangeOp::SetRemote)
.await?;
self.set_description(&desc, StateChangeOp::SetRemote)?;
if let Some(parsed) = &desc.parsed {
self.internal
.media_engine
.update_from_remote_description(parsed)
.await?;
self.media_engine.update_from_remote_description(parsed)?;
let mut local_transceivers = self.get_transceivers().await;
let remote_description = self.remote_description().await;
let mut local_transceivers = self.get_transceivers();
let remote_description = self.remote_description();
let we_offer = desc.sdp_type == RTCSdpType::Answer;
if !we_offer {
Expand Down Expand Up @@ -1418,13 +1417,10 @@ impl RTCPeerConnection {
continue;
}
let t = if let Some(t) =
find_by_mid(mid_value, &mut local_transceivers).await
{
let t = if let Some(t) = find_by_mid(mid_value, &mut local_transceivers) {
Some(t)
} else {
satisfy_type_and_direction(kind, direction, &mut local_transceivers)
.await
};
if let Some(t) = t {
Expand Down Expand Up @@ -1644,8 +1640,9 @@ impl RTCPeerConnection {
}
Ok(())
}
}*/

/*
/// start_rtp_senders starts all outbound RTP streams
pub(crate) async fn start_rtp_senders(&self) -> Result<()> {
let current_transceivers = self.internal.rtp_transceivers.lock().await;
Expand Down
34 changes: 14 additions & 20 deletions rtc/src/rtp_transceiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,12 @@
mod rtp_transceiver_test;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::Arc;
use interceptor::stream_info::{RTPHeaderExtension, StreamInfo};
use interceptor::Attributes;
use log::trace;*/
use serde::{Deserialize, Serialize};
/*
use smol_str::SmolStr;
use tokio::sync::{Mutex, OnceCell};
use util::Unmarshal;
use crate::api::media_engine::MediaEngine;*/
use crate::rtp_transceiver::rtp_codec::*;
/*
Expand All @@ -27,9 +19,9 @@ use crate::track::track_local::TrackLocal;
*/
pub(crate) mod fmtp;
pub mod rtp_codec;
/*
pub mod rtp_receiver;
pub mod rtp_sender;*/

//pub mod rtp_receiver;
//pub mod rtp_sender;
pub mod rtp_transceiver_direction;
/*pub(crate) mod srtp_writer_future;
*/
Expand Down Expand Up @@ -477,13 +469,14 @@ impl fmt::Debug for RTCRtpTransceiver {
.finish()
}
}
pub(crate) async fn find_by_mid(
*/
/*
pub(crate) fn find_by_mid(
mid: &str,
local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>,
) -> Option<Arc<RTCRtpTransceiver>> {
local_transceivers: &mut Vec<RTCRtpTransceiver>,
) -> Option<RTCRtpTransceiver> {
for (i, t) in local_transceivers.iter().enumerate() {
if t.mid() == Some(SmolStr::from(mid)) {
if t.mid() == Some(mid) {
return Some(local_transceivers.remove(i));
}
}
Expand All @@ -493,11 +486,11 @@ pub(crate) async fn find_by_mid(
/// Given a direction+type pluck a transceiver from the passed list
/// if no entry satisfies the requested type+direction return a inactive Transceiver
pub(crate) async fn satisfy_type_and_direction(
pub(crate) fn satisfy_type_and_direction(
remote_kind: RTPCodecType,
remote_direction: RTCRtpTransceiverDirection,
local_transceivers: &mut Vec<Arc<RTCRtpTransceiver>>,
) -> Option<Arc<RTCRtpTransceiver>> {
local_transceivers: &mut Vec<RTCRtpTransceiver>,
) -> Option<RTCRtpTransceiver> {
// Get direction order from most preferred to least
let get_preferred_directions = || -> Vec<RTCRtpTransceiverDirection> {
match remote_direction {
Expand Down Expand Up @@ -561,4 +554,5 @@ pub(crate) fn handle_unknown_rtp_packet(
};
Ok((mid, rid, srid, payload_type))
}*/
}
*/
2 changes: 1 addition & 1 deletion rtc/src/rtp_transceiver/rtp_receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use smol_str::SmolStr;
use tokio::sync::{watch, Mutex, RwLock};

use crate::api::media_engine::MediaEngine;
use crate::error::{flatten_errs, Error, Result};
use crate::peer_connection::sdp::TrackDetails;
use crate::rtp_transceiver::rtp_codec::{
codec_parameters_fuzzy_search, CodecMatch, RTCRtpCodecCapability, RTCRtpCodecParameters,
Expand All @@ -25,6 +24,7 @@ use crate::rtp_transceiver::{
use crate::track::track_remote::TrackRemote;
use crate::track::{TrackStream, TrackStreams};
use crate::transports::dtls_transport::RTCDtlsTransport;
use shared::error::{flatten_errs, Error, Result};

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(u8)]
Expand Down
7 changes: 4 additions & 3 deletions rtc/src/track/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
pub mod track_local;
pub mod track_remote;
/*pub mod track_remote;
use std::sync::Arc;
use interceptor::stream_info::StreamInfo;
use interceptor::{RTCPReader, RTPReader};
use track_remote::*;

*/
pub(crate) const RTP_OUTBOUND_MTU: usize = 1200;
pub(crate) const RTP_PAYLOAD_TYPE_BITMASK: u8 = 0x7F;

/*
#[derive(Clone)]
pub(crate) struct TrackStream {
pub(crate) stream_info: Option<StreamInfo>,
Expand All @@ -27,3 +27,4 @@ pub(crate) struct TrackStreams {
pub(crate) stream: TrackStream,
pub(crate) repair_stream: TrackStream,
}
*/
35 changes: 16 additions & 19 deletions rtc/src/track/track_local/mod.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,28 @@
#[cfg(test)]
mod track_local_static_test;
//TODO:#[cfg(test)]
//mod track_local_static_test;

pub mod track_local_static_rtp;
pub mod track_local_static_sample;
//TODO:pub mod track_local_static_rtp;
//TOOD:pub mod track_local_static_sample;

use std::any::Any;
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use async_trait::async_trait;
use interceptor::{Attributes, RTPWriter};
use tokio::sync::Mutex;
use util::Unmarshal;
//use interceptor::{Attributes, RTPWriter};
//use shared::marshal::Unmarshal;

use crate::error::{Error, Result};
use crate::rtp_transceiver::rtp_codec::*;
use crate::rtp_transceiver::*;
use shared::error::Result;

/// TrackLocalWriter is the Writer for outbound RTP Packets
#[async_trait]
pub trait TrackLocalWriter: fmt::Debug {
/// write_rtp encrypts a RTP packet and writes to the connection
async fn write_rtp(&self, p: &rtp::packet::Packet) -> Result<usize>;
fn write_rtp(&self, p: &rtp::packet::Packet) -> Result<usize>;

/// write encrypts and writes a full RTP packet
async fn write(&self, b: &[u8]) -> Result<usize>;
fn write(&self, b: &[u8]) -> Result<usize>;
}

/// TrackLocalContext is the Context passed when a TrackLocal has been Binded/Unbinded from a PeerConnection, and used
Expand Down Expand Up @@ -72,16 +69,15 @@ impl TrackLocalContext {
/// TrackLocal is an interface that controls how the user can send media
/// The user can provide their own TrackLocal implementations, or use
/// the implementations in pkg/media
#[async_trait]
pub trait TrackLocal {
/// bind should implement the way how the media data flows from the Track to the PeerConnection
/// This will be called internally after signaling is complete and the list of available
/// codecs has been determined
async fn bind(&self, t: &TrackLocalContext) -> Result<RTCRtpCodecParameters>;
fn bind(&self, t: &TrackLocalContext) -> Result<RTCRtpCodecParameters>;

/// unbind should implement the teardown logic when the track is no longer needed. This happens
/// because a track has been stopped.
async fn unbind(&self, t: &TrackLocalContext) -> Result<()>;
fn unbind(&self, t: &TrackLocalContext) -> Result<()>;

/// id is the unique identifier for this Track. This should be unique for the
/// stream, but doesn't have to globally unique. A common example would be 'audio' or 'video'
Expand Down Expand Up @@ -116,15 +112,16 @@ impl TrackBinding {
}
}

/*TODO:
pub(crate) struct InterceptorToTrackLocalWriter {
pub(crate) interceptor_rtp_writer: Mutex<Option<Arc<dyn RTPWriter + Send + Sync>>>,
pub(crate) interceptor_rtp_writer: Option<Arc<dyn RTPWriter + Send + Sync>>,
sender_paused: Arc<AtomicBool>,
}
impl InterceptorToTrackLocalWriter {
pub(crate) fn new(paused: Arc<AtomicBool>) -> Self {
InterceptorToTrackLocalWriter {
interceptor_rtp_writer: Mutex::new(None),
interceptor_rtp_writer: None,
sender_paused: paused,
}
}
Expand All @@ -140,9 +137,8 @@ impl std::fmt::Debug for InterceptorToTrackLocalWriter {
}
}
#[async_trait]
impl TrackLocalWriter for InterceptorToTrackLocalWriter {
async fn write_rtp(&self, pkt: &rtp::packet::Packet) -> Result<usize> {
fn write_rtp(&self, pkt: &rtp::packet::Packet) -> Result<usize> {
if self.is_sender_paused() {
return Ok(0);
}
Expand All @@ -161,3 +157,4 @@ impl TrackLocalWriter for InterceptorToTrackLocalWriter {
self.write_rtp(&pkt).await
}
}
*/
8 changes: 3 additions & 5 deletions rtc/src/track/track_local/track_local_static_rtp.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::collections::HashMap;

use bytes::BytesMut;
use tokio::sync::Mutex;
use util::{Marshal, MarshalSize};
use shared::marshal::{Marshal, MarshalSize};

use super::*;
use crate::error::flatten_errs;
use shared::error::flatten_errs;

/// TrackLocalStaticRTP is a TrackLocal that has a pre-set codec and accepts RTP Packets.
/// If you wish to send a media.Sample use TrackLocalStaticSample
Expand Down Expand Up @@ -133,12 +132,11 @@ impl TrackLocalStaticRTP {
}
}

#[async_trait]
impl TrackLocal for TrackLocalStaticRTP {
/// bind is called by the PeerConnection after negotiation is complete
/// This asserts that the code requested is supported by the remote peer.
/// If so it setups all the state (SSRC and PayloadType) to have a call
async fn bind(&self, t: &TrackLocalContext) -> Result<RTCRtpCodecParameters> {
fn bind(&self, t: &TrackLocalContext) -> Result<RTCRtpCodecParameters> {
let parameters = RTCRtpCodecParameters {
capability: self.codec.clone(),
..Default::default()
Expand Down
5 changes: 2 additions & 3 deletions rtc/src/track/track_local/track_local_static_sample.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use log::warn;
use media::Sample;
use tokio::sync::Mutex;

use super::track_local_static_rtp::TrackLocalStaticRTP;
use super::*;
use crate::error::flatten_errs;
use crate::track::RTP_OUTBOUND_MTU;
use shared::error::flatten_errs;

#[derive(Debug, Clone)]
struct TrackLocalStaticSampleInternal {
Expand Down Expand Up @@ -243,7 +242,7 @@ mod sample_writer {
use rtp::extension::HeaderExtension;

use super::TrackLocalStaticSample;
use crate::error::Result;
use shared::error::Result;

/// Helper for writing Samples via [`TrackLocalStaticSample`] that carry extra RTP data.
///
Expand Down

0 comments on commit 468acd9

Please sign in to comment.