Skip to content

Commit

Permalink
clean code
Browse files Browse the repository at this point in the history
  • Loading branch information
notedit committed May 8, 2020
1 parent cb31193 commit 1b12c93
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 72 deletions.
51 changes: 23 additions & 28 deletions incomingstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,13 @@ import (
type IncomingStream struct {
id string
info *sdp.StreamInfo
transport transportWrapper
transport native.DTLSICETransport
receiver native.RTPReceiverFacade
tracks map[string]*IncomingStreamTrack
onStopListeners []func()
onStreamAddIncomingTrackListeners []func(*IncomingStreamTrack)
sync.Mutex
l sync.Mutex
}

// internal use
type transportWrapper interface {
AddIncomingSourceGroup(group native.RTPIncomingSourceGroup) bool
RemoveIncomingSourceGroup(group native.RTPIncomingSourceGroup) bool
GetTimeService() native.TimeService
}

// NewIncomingStream Create new incoming stream
// TODO: make this public
Expand All @@ -39,7 +32,6 @@ func newIncomingStream(transport native.DTLSICETransport, receiver native.RTPRec
stream.receiver = receiver
stream.tracks = make(map[string]*IncomingStreamTrack)

stream.onStopListeners = make([]func(), 0)
stream.onStreamAddIncomingTrackListeners = make([]func(*IncomingStreamTrack), 0)

for _, track := range info.GetTracks() {
Expand Down Expand Up @@ -79,13 +71,15 @@ func (i *IncomingStream) GetStats() map[string]map[string]*IncomingAllStats {

// GetTrack Get track by id
func (i *IncomingStream) GetTrack(trackID string) *IncomingStreamTrack {
i.Lock()
defer i.Unlock()
i.l.Lock()
defer i.l.Unlock()
return i.tracks[trackID]
}

// GetTracks Get all tracks in this stream
func (i *IncomingStream) GetTracks() []*IncomingStreamTrack {
i.l.Lock()
defer i.l.Unlock()
tracks := []*IncomingStreamTrack{}
for _, track := range i.tracks {
tracks = append(tracks, track)
Expand All @@ -95,6 +89,8 @@ func (i *IncomingStream) GetTracks() []*IncomingStreamTrack {

// GetAudioTracks get all audio tracks
func (i *IncomingStream) GetAudioTracks() []*IncomingStreamTrack {
i.l.Lock()
defer i.l.Unlock()
audioTracks := []*IncomingStreamTrack{}
for _, track := range i.tracks {
if strings.ToLower(track.GetMedia()) == "audio" {
Expand All @@ -106,6 +102,8 @@ func (i *IncomingStream) GetAudioTracks() []*IncomingStreamTrack {

// GetVideoTracks get all video tracks
func (i *IncomingStream) GetVideoTracks() []*IncomingStreamTrack {
i.l.Lock()
defer i.l.Unlock()
videoTracks := []*IncomingStreamTrack{}
for _, track := range i.tracks {
if strings.ToLower(track.GetMedia()) == "video" {
Expand All @@ -118,8 +116,8 @@ func (i *IncomingStream) GetVideoTracks() []*IncomingStreamTrack {
// AddTrack Adds an incoming stream track created using the Transpocnder.CreateIncomingStreamTrack to this stream
func (i *IncomingStream) AddTrack(track *IncomingStreamTrack) error {

i.Lock()
defer i.Unlock()
i.l.Lock()
defer i.l.Unlock()
if _, ok := i.tracks[track.GetID()]; ok {
return errors.New("Track id already present in stream")
}
Expand All @@ -130,7 +128,10 @@ func (i *IncomingStream) AddTrack(track *IncomingStreamTrack) error {

func (i *IncomingStream) RemoveTrack(track *IncomingStreamTrack) error {

// TODO
i.l.Lock()
defer i.l.Unlock()

delete(i.tracks,track.GetID())
return nil
}

Expand Down Expand Up @@ -268,36 +269,30 @@ func (i *IncomingStream) CreateTrack(track *sdp.TrackInfo) *IncomingStreamTrack

incomingTrack := NewIncomingStreamTrack(track.GetMedia(), track.GetID(), i.receiver, sources)

i.Lock()
i.l.Lock()
i.tracks[track.GetID()] = incomingTrack
i.Unlock()

for _, ontrack := range i.onStreamAddIncomingTrackListeners {
ontrack(incomingTrack)
}
i.l.Unlock()

return incomingTrack
}

// OnTrack register addtrack listener
func (i *IncomingStream) OnTrack(ontrack func(*IncomingStreamTrack)) {
i.onStreamAddIncomingTrackListeners = append(i.onStreamAddIncomingTrackListeners, ontrack)
}

// Stop Removes the media strem from the transport and also detaches from any attached incoming stream
func (i *IncomingStream) Stop() {

if i.transport == nil {
return
}

i.l.Lock()
defer i.l.Unlock()

for k, track := range i.tracks {
track.Stop()
i.Lock()
delete(i.tracks, k)
i.Unlock()
}


native.DeleteRTPReceiverFacade(i.receiver) // other module maybe need delete
i.receiver = nil
i.transport = nil
}
2 changes: 0 additions & 2 deletions incomingstreamtrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,7 @@ func getStatsFromIncomingSource(source native.RTPIncomingSource) *IncomingStats
}

for _, layer2 := range individual {

if layer2.SpatialLayerId <= aggregated.SpatialLayerId && layer2.TemporalLayerId <= aggregated.TemporalLayerId {

aggregated.TotalBytes += layer2.TotalBytes
aggregated.NumPackets += layer2.NumPackets
aggregated.Bitrate += layer2.Bitrate
Expand Down
43 changes: 20 additions & 23 deletions outgoingstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ type OutgoingStream struct {
muted bool
tracks map[string]*OutgoingStreamTrack
onStopListeners []func()
onMuteListeners []func(bool)
onAddTrackListeners []func(*OutgoingStreamTrack)
sync.Mutex
l sync.Mutex
}

// NewOutgoingStream create outgoing stream
Expand All @@ -35,7 +34,6 @@ func NewOutgoingStream(transport native.DTLSICETransport, info *sdp.StreamInfo)
}

stream.onStopListeners = make([]func(), 0)
stream.onMuteListeners = make([]func(bool), 0)
stream.onAddTrackListeners = make([]func(*OutgoingStreamTrack), 0)

return stream
Expand Down Expand Up @@ -70,9 +68,6 @@ func (o *OutgoingStream) Mute(muting bool) {

if o.muted != muting {
o.muted = muting
for _, muteFunc := range o.onMuteListeners {
muteFunc(muting)
}
}
}

Expand Down Expand Up @@ -116,26 +111,29 @@ func (o *OutgoingStream) AttachTo(incomingStream *IncomingStream) []*Transponder
// Detach Stop listening for media
func (o *OutgoingStream) Detach() {

o.l.Lock()
defer o.l.Unlock()
for _, track := range o.tracks {
track.Detach()
}
}

// GetStreamInfo get the stream info
func (o *OutgoingStream) GetStreamInfo() *sdp.StreamInfo {

return o.info
}

// GetTrack get one track
func (o *OutgoingStream) GetTrack(trackID string) *OutgoingStreamTrack {
o.Lock()
defer o.Unlock()
o.l.Lock()
defer o.l.Unlock()
return o.tracks[trackID]
}

// GetTracks get all the tracks
func (o *OutgoingStream) GetTracks() []*OutgoingStreamTrack {
o.l.Lock()
defer o.l.Unlock()
tracks := []*OutgoingStreamTrack{}
for _, track := range o.tracks {
tracks = append(tracks, track)
Expand All @@ -145,6 +143,8 @@ func (o *OutgoingStream) GetTracks() []*OutgoingStreamTrack {

// GetAudioTracks Get an array of the media stream audio tracks
func (o *OutgoingStream) GetAudioTracks() []*OutgoingStreamTrack {
o.l.Lock()
defer o.l.Unlock()
audioTracks := []*OutgoingStreamTrack{}
for _, track := range o.tracks {
if strings.ToLower(track.GetMedia()) == "audio" {
Expand All @@ -156,6 +156,8 @@ func (o *OutgoingStream) GetAudioTracks() []*OutgoingStreamTrack {

// GetVideoTracks Get an array of the media stream video tracks
func (o *OutgoingStream) GetVideoTracks() []*OutgoingStreamTrack {
o.l.Lock()
defer o.l.Unlock()
videoTracks := []*OutgoingStreamTrack{}
for _, track := range o.tracks {
if strings.ToLower(track.GetMedia()) == "video" {
Expand All @@ -167,9 +169,8 @@ func (o *OutgoingStream) GetVideoTracks() []*OutgoingStreamTrack {

// AddTrack add one outgoing track
func (o *OutgoingStream) AddTrack(track *OutgoingStreamTrack) {

o.Lock()
defer o.Unlock()
o.l.Lock()
defer o.l.Unlock()

if _, ok := o.tracks[track.GetID()]; ok {
return
Expand All @@ -178,7 +179,10 @@ func (o *OutgoingStream) AddTrack(track *OutgoingStreamTrack) {
}

func (o *OutgoingStream) RemoveTrack(track *OutgoingStreamTrack) {
// TODO
o.l.Lock()
defer o.l.Unlock()

delete(o.tracks, track.GetID())
}

// CreateTrack Create new track from a TrackInfo object and add it to this stream
Expand Down Expand Up @@ -216,13 +220,14 @@ func (o *OutgoingStream) CreateTrack(track *sdp.TrackInfo) *OutgoingStreamTrack

outgoingTrack := newOutgoingStreamTrack(track.GetMedia(), track.GetID(), native.TransportToSender(o.transport), source)

// TODO
// runtime.SetFinalizer(source, func(source native.RTPOutgoingSourceGroup) {
// o.transport.RemoveOutgoingSourceGroup(source)
// })

o.Lock()
o.l.Lock()
o.tracks[outgoingTrack.GetID()] = outgoingTrack
o.Unlock()
o.l.Unlock()

for _, addTrackFunc := range o.onAddTrackListeners {
addTrackFunc(outgoingTrack)
Expand All @@ -236,10 +241,6 @@ func (o *OutgoingStream) OnTrack(listener func(*OutgoingStreamTrack)) {
o.onAddTrackListeners = append(o.onAddTrackListeners, listener)
}

// OnMute register onmute listener
func (o *OutgoingStream) OnMute(listener func(bool)) {
o.onMuteListeners = append(o.onMuteListeners, listener)
}

// Stop stop the remote stream
func (o *OutgoingStream) Stop() {
Expand All @@ -252,10 +253,6 @@ func (o *OutgoingStream) Stop() {
track.Stop()
}

for _, stopFunc := range o.onStopListeners {
stopFunc()
}

o.tracks = make(map[string]*OutgoingStreamTrack, 0)

o.transport = nil
Expand Down
10 changes: 0 additions & 10 deletions transponder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ type Transponder struct {
temporalLayerId int
maxSpatialLayerId int
maxTemporalLayerId int
onMuteListeners []func(bool)
onStopListeners []func()
}

Expand All @@ -42,7 +41,6 @@ func NewTransponder(transponderFacade native.RTPStreamTransponderFacade) *Transp
transponder.maxSpatialLayerId = MaxLayerId
transponder.maxTemporalLayerId = MaxLayerId

transponder.onMuteListeners = make([]func(bool), 0)
transponder.onStopListeners = make([]func(), 0)

return transponder
Expand Down Expand Up @@ -109,10 +107,6 @@ func (t *Transponder) Mute(muting bool) {
if t.transponder != nil {
t.transponder.Mute(muting)
}

for _, mutefunc := range t.onMuteListeners {
mutefunc(muting)
}
}
}

Expand Down Expand Up @@ -302,10 +296,6 @@ func (t *Transponder) SetMaximumLayers(maxSpatialLayerId, maxTemporalLayerId int
t.maxTemporalLayerId = maxTemporalLayerId
}

// OnMute register mute listener
func (t *Transponder) OnMute(listener func(bool)) {
t.onMuteListeners = append(t.onMuteListeners, listener)
}

// Stop stop this transponder
func (t *Transponder) Stop() {
Expand Down
17 changes: 8 additions & 9 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type Transport struct {
incomingStreamTracks map[string]*IncomingStreamTrack
outgoingStreamTracks map[string]*OutgoingStreamTrack

iceStats ICEStats
iceStats *ICEStats

senderSideListener senderSideEstimatorListener
dtlsICEListener dtlsICETransportListener
Expand Down Expand Up @@ -133,7 +133,7 @@ func NewTransport(bundle native.RTPBundleTransport, remoteIce *sdp.ICEInfo, remo
transport.connection = bundle.AddICETransport(transport.username, properties)
transport.transport = transport.connection.GetTransport()

transport.iceStats = ICEStats{}
transport.iceStats = &ICEStats{}

native.DeletePropertiesFacade(properties)

Expand Down Expand Up @@ -206,15 +206,14 @@ func (t *Transport) GetDTLSState() string {
}

// GetICEStats get ice stats
func (t *Transport) GetICEStats() ICEStats {
func (t *Transport) GetICEStats() *ICEStats {

iceStats := ICEStats{}
iceStats.RequestsSent = t.connection.GetIceRequestsSent()
iceStats.RequestsReceived = t.connection.GetIceRequestsReceived()
iceStats.ResponsesSent = t.connection.GetIceResponsesSent()
iceStats.ResponsesReceived = t.connection.GetIceResponsesReceived()
t.iceStats.RequestsSent = t.connection.GetIceRequestsSent()
t.iceStats.RequestsReceived = t.connection.GetIceRequestsReceived()
t.iceStats.ResponsesSent = t.connection.GetIceResponsesSent()
t.iceStats.ResponsesReceived = t.connection.GetIceResponsesReceived()

return iceStats
return t.iceStats
}

// SetRemoteProperties Set remote RTP properties
Expand Down
1 change: 1 addition & 0 deletions wrapper/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package native


/*
#cgo CFLAGS: -Wno-deprecated
#cgo CXXFLAGS: -std=c++1z
#cgo CPPFLAGS: -I${SRCDIR}/../thirdparty/openssl/build/include/
#cgo CPPFLAGS: -I${SRCDIR}/../thirdparty/libsrtp/build/include/
Expand Down

0 comments on commit 1b12c93

Please sign in to comment.