SFU Refactoring #52
Conversation
This is feeling like a nice refactor so far - using channels & message passing in favour of locks definitely feels like a step in the right direction, and it's good to get this done while the project is still a fairly manageable size.
This is largely looking good to me, though I sometimes struggle a bit not to get lost... Perhaps better file naming would help a bit; some of the file names aren't very descriptive from my POV.
I am also a little concerned about completely removing the subscriber/publisher model, which is very useful for simulcast and possibly other things; it's also very similar to what ion-sfu and livekit do, iirc
I would also like my PR to get merged before this and for us to somehow integrate it into this one
All of the TODOs are also a little concerning
Is there something that could be improved? I want to understand what makes you feel lost. Or is it just because the PR is too large? I'm definitely in favor of smaller PRs and I'm sorry that this PR is now that big: it's as big as it currently is since it's simply a complete rewrite of the previous implementation (I barely touched …).

Perhaps it would be easier if I published it as a new repository? (since 90% of things changed anyway) Then it would be easier to review it in an editor (or an IDE) as a new SFU, rather than trying to comprehend how the new changes relate to the previous implementation (which I agree must be confusing).

What I normally do for large PRs is split them into smaller "atomic" commits, where each commit changes a single thing and explains why it was changed. That's the approach I'm in favor of and that I try to use in all projects (I've noticed that in Matrix and Element it's not always followed: many commits just alter the state of a previous one or combine unrelated things in one commit, something that I want to change!). If I could present this not as one big change, but as a series of well-separated commits, it would be much easier to understand. (I normally rebase commits before a push and make sure they are clear, but unfortunately this time I published it earlier than I planned so that Dave could take a look at it earlier, as we spoke about.)
Good point, I expected you to ask about it :) So, the subscriber and publisher model does exist in the current implementation, but it does not have a separate module or file, since for our current implementation that does not seem to be useful. I.e. I can create a function in a … module.

We may indeed need it in the future (e.g. when implementing simulcast)! And once we need it, we'll definitely give it a good thought and introduce it! But adding these entities right now seemed like a "premature optimization" to me (since it would pose the questions listed above) that would complicate our current (pretty trivial and simple) use case.

As for the other SFUs, I'm only familiar with some internals of Janus and LiveKit. You mentioned LiveKit: yes, they have a concept of a subscription and of publishing a track (a concept we also have in our implementation 🙂), but they, like us, don't have special files or packages for ….

So I would not say that we don't have a publisher-subscriber model. In fact, we do have it between peers and a conference (peers publish the messages to which the conference is subscribed). And we also know which tracks everyone is subscribed to and which tracks are being published by each participant (see …). I'm pretty sure that with time we'll also have more elaborate logic for this, with more structures that have publisher- or subscriber-specific logic inside.
Would you like me to integrate the logic from the RTCP PR into this PR before merging it into …? I can do it, but I was wondering if you would like to do it yourself, to get a feel for what the rewritten version looks like and how such things are implemented within the new architecture.
Yep, but there are also TODOs and FIXMEs in …
The PR being large is definitely a part of it but I feel like the names don't tell me much in some cases...
No worries about the PR being large; no reason to create a separate repo. I realize it's a large refactor, so there aren't many alternatives
Alright, let's leave it out for now and re-add it with simulcast
Yeah, I actually wanted to have a go at adding it to this PR exactly for that reason :D Might take a bit of time, but I would like to have a look tomorrow
Sure, can we just check we're not removing any functionality with this refactor (e.g. keep-alive)?
pkg/conference/participant.go (Outdated)

```go
peer            *peer.Peer[ParticipantID]
remoteSessionID id.SessionID
streamMetadata  event.CallSDPStreamMetadata
publishedTracks map[event.SFUTrackDescription]*webrtc.TrackLocalStaticRTP
```
Could we not use `event.SFUTrackDescription` here, please? It doesn't feel good that the Matrix event types and internal types are coupled if they aren't exactly the same thing. Also, `event.SFUTrackDescription` optionally includes a resolution, so it just feels wrong to use it as a key here, since we'll never want to use the resolution in the key.
In general, I totally agree with it. I think the only reason I used it like this is that it was not clear what exactly we were going to use as a track identifier, but I agree that having a separate identifier for tracks makes more sense (it would also simplify the adjacent functions that compare things).

Would you prefer me to change it now or after merging the PR? I was in favor of merging this ASAP, as it would allow us to create PRs against `main`. Right now we're creating PRs against this branch, and branches on top of other branches :) I.e. me changing this thing now would conflict with the changes you made in the RTCP PR. But if you want, we can merge your PR and this PR today, and then I'll make a quick change on top of `main`.
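For illustration, a minimal sketch of what such a decoupled key might look like (the `TrackID` type, its package layout, and the struct shape are assumptions for this sketch, not the actual implementation):

```go
package conference

import "github.com/pion/webrtc/v3"

// TrackID is a dedicated internal identifier for tracks. Unlike
// event.SFUTrackDescription, it carries no optional fields (such as the
// resolution), so equality is well-defined and it is safe to use as a map key.
type TrackID string

type Participant struct {
	// Keyed by the internal identifier instead of the Matrix event type,
	// keeping the wire format and the internal state decoupled.
	publishedTracks map[TrackID]*webrtc.TrackLocalStaticRTP
}
```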
Yep, let's merge this ASAP - have made #56. I might actually have a look into that myself if I get a chance before you.
Pretty generic work-in-progress state.
This is a WIP state that does not work as expected!
This allows us to generalize the message sink and get rid of a lot of copy-paste in the handling functions. Also, this moves types to the right modules, so that `peer` is now a completely matrix-unaware module that contains only plain WebRTC logic.
This seems to be more idiomatic in Go projects.
New participants are now properly handled depending on whether or not they were in the session. Also, the conference and peer get documented.
Once tracks become obsolete, we unsubscribe from them.
This is a temporary solution.
Instead of calling `LocalDescription()` to get it again. It does not seem to make any difference in our particular case?
Don't include tracks in the metadata that are not yet published (for which we don't have any remote streams available). Also, inform about metadata changes once tracks get published and unpublished.
This is a temporary measure to fix the screen sharing.
Co-authored-by: Šimon Brandner <[email protected]>
Note that we start the keepalive routine once the peer is created. We do it like this since the keepalive deadline is actually quite high, and I would say that if no heartbeat messages were sent within that deadline, then we can consider the connection stalled. I.e. starting the keepalive timer only once the peer is connected would merely spare the second or so that a peer normally needs to establish a connection.
Signed-off-by: Šimon Brandner <[email protected]>
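To make the timing concrete, here is a minimal sketch of such a keepalive watchdog, started at peer creation (field and function names are assumptions, not the actual code):

```go
package peer

import "time"

// Peer holds a channel that is signalled on every incoming heartbeat message.
// (Hypothetical shape for this sketch.)
type Peer struct {
	heartbeat chan struct{}
}

// startKeepAlive runs as soon as the peer is created: if no heartbeat
// arrives within the deadline, the connection is considered stalled.
func (p *Peer) startKeepAlive(deadline time.Duration, onStalled func()) {
	go func() {
		timer := time.NewTimer(deadline)
		defer timer.Stop()
		for {
			select {
			case <-p.heartbeat:
				// A heartbeat arrived in time: push the deadline forward.
				if !timer.Stop() {
					<-timer.C
				}
				timer.Reset(deadline)
			case <-timer.C:
				// No heartbeat within the deadline: treat as stalled.
				onStalled()
				return
			}
		}
	}()
}
```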
```go
// The channel itself.
channel chan<- M
// Atomic variable that indicates whether the channel is closed.
receiverClosed *atomic.Bool
```
My understanding is that this is considered a bit of a code smell in Go, since a sender should never be trying to write to a closed channel. I guess this means we have something where we can't know this and need to check each time?
Good that you've noticed it: tbh I did not like writing this particular structure, as it felt not very elegant, but unfortunately I did not find a better way to write it.

The problem that I'm trying to solve is indeed to not write to a closed channel. The catch is that, from Go's standpoint, the channel is not closed: in Go, a channel can only be closed from the sender's side, not from the receiver's side.
A practical example of where it happens in our code:

- There is a router in the code: a structure that simply has a map of all conferences running on the SFU. The router receives signaling messages (To-Device messages), checks their `conf_id`, and simply sends each message to the conference that is responsible for it. So there is a channel between the router and a particular conference.
- The conference listens to the channel and reacts to the messages. But at some point, all participants leave the conference, which means that the conference must be ended. So the conference goroutine and the main loop of the conference end, sending a message to the router telling it "Ok, I'm done, you can remove me".

The problem occurs because, at the moment the conference is considered ended, the channel remains open (the conference can't close the channel because it only holds the receiver part of the channel). And in Go there is no way to check if the channel is alive or if there is someone listening on it. This means that the router might have sent certain messages to the conference expecting the conference to read them, whereas in reality the conference has stopped listening on the channel by the time the router learns that the conference is dead.
To sum up:
- There is a way in Go to check if the channel is closed by the sender on the receiver's side.
- There is no way in Go to check if there is any listener on the sender's side.
- We need a way to know (on the sender side) if someone is listening to the channel (in order to know if it makes sense to post a message).
That's pretty much the problem that I tried to solve with this logic, i.e. create a wrapper around the channel through which the receiver can inform the sender that it is no longer listening, so that if the sender tries to send a message to such a channel, it gets the message back and knows that there are no receivers anymore.
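A minimal sketch of that wrapper (the names `MessageSink`, `Send`, and `Seal` are assumptions for illustration; the actual code may differ):

```go
package channel

import (
	"errors"
	"sync/atomic"
)

var ErrSealed = errors.New("the receiver is no longer listening")

// MessageSink wraps the send side of a channel together with an atomic
// flag that the receiver sets once it stops reading.
type MessageSink[M any] struct {
	channel        chan<- M
	receiverClosed *atomic.Bool
}

// Send delivers the message, or fails fast if the receiver is gone.
// Note that the check is advisory: the channel should be buffered (or the
// send wrapped in a select) so that a racing Seal cannot block the sender.
func (s *MessageSink[M]) Send(message M) error {
	if s.receiverClosed.Load() {
		return ErrSealed
	}
	s.channel <- message
	return nil
}

// Seal is called by the receiver to mark the sink as dead. The channel
// itself stays open, since only the sender side may close a Go channel.
func (s *MessageSink[M]) Seal() {
	s.receiverClosed.Store(true)
}
```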
Thanks for the detailed explanation - I've also found Go is a bit light on actually explaining how you would write correct & non-racy code sticking to the given paradigms.
Right! I've also spent some time trying to understand how to do things safely. In Rust, for instance, we always know if there is a sender and if there is a receiver. In Go, an elegant implementation of this does not work because of garbage collection: receivers are not deleted immediately after leaving the scope of a function, but only get removed when the GC kicks in, which means that the lifetime of the objects is not strictly defined; hence we cannot know when the other counterpart of a channel is dead.
```go
messageSink chan<- Message[SenderType, MessageType]
// Atomic variable that indicates whether the message sink is sealed.
// This is used to prevent sending messages to a sealed message sink.
// The variable is atomic because it may be accessed from multiple goroutines.
```
For clarity's sake, could we define what 'sealed' means here?
Good point, will add a comment about that! Perhaps I need to rewrite the comment for this variable altogether.
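For example, the rewritten comment could spell the term out along these lines (a suggestion only, not the author's actual wording):

```go
// messageSink is the sending half of the channel that delivers messages to
// the conference. "Sealed" means the receiver has permanently stopped
// reading: once the sink is sealed, sends fail fast instead of writing to a
// channel that nobody drains anymore.
messageSink chan<- Message[SenderType, MessageType]
```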
```go
func (p *Participant) sendDataChannelMessage(toSend event.SFUMessage) {
	jsonToSend, err := json.Marshal(toSend)
	if err != nil {
		p.logger.Error("Failed to marshal data channel message")
```
Shouldn't this be returning here? Otherwise we'll send blank messages.
Good catch, thanks! Will be fixed right away.
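The fix would presumably just add an early return. The rest of the function is not shown in the diff, so the send call below (and the `dataChannel` field) is an assumption for this sketch:

```go
func (p *Participant) sendDataChannelMessage(toSend event.SFUMessage) {
	jsonToSend, err := json.Marshal(toSend)
	if err != nil {
		p.logger.Error("Failed to marshal data channel message")
		return // bail out instead of sending a blank message
	}

	// Hypothetical: forward the serialized message over the data channel.
	if err := p.dataChannel.SendText(string(jsonToSend)); err != nil {
		p.logger.Error("Failed to send data channel message")
	}
}
```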
This closes #39 and does add structured logging, but not the framework for #50.
The main idea of this refactoring was to provide a somewhat stable foundation on top of which we can build new features, with as much compile-time safety as possible and with some guarantees that reduce the risk of race conditions.
See #39 for a description of things that were improved and fixed.
Summary: [TODO, write some better explanation and highlights of this refactoring].
A rough diagram of how components are conceptually connected with each other:
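(Sketched from the discussion in this thread; the exact topology is an assumption.)

```
Matrix (To-Device messages)
          │
          ▼
        Router ── one channel per conference ──▶ Conference
                                                    ▲
                              published messages    │
                       Participant (Peer, WebRTC) ──┘
```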
NB: This is still work-in-progress and must be tested before merging.