Skip to content

Commit

Permalink
conference: add call_id handling
Browse files Browse the repository at this point in the history
The `call_id` does seem to be after all not the same as `conf_id`!
  • Loading branch information
daniel-abramov committed Nov 28, 2022
1 parent 0569f85 commit d845e7b
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 30 deletions.
7 changes: 4 additions & 3 deletions pkg/conference/matrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,22 @@ import (
"github.com/pion/webrtc/v3"
"github.com/sirupsen/logrus"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
)

type MessageContent interface{}

type MatrixMessage struct {
UserID id.UserID
Content MessageContent
Sender ParticipantID
Content MessageContent
RawEvent *event.Event
}

// New participant tries to join the conference.
func (c *Conference) onNewParticipant(participantID ParticipantID, inviteEvent *event.CallInviteEventContent) error {
logger := c.logger.WithFields(logrus.Fields{
"user_id": participantID.UserID,
"device_id": participantID.DeviceID,
"call_id": participantID.CallID,
})

logger.Info("Incoming call invite")
Expand Down
2 changes: 2 additions & 0 deletions pkg/conference/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
type ParticipantID struct {
UserID id.UserID
DeviceID id.DeviceID
CallID string
}

// Participant represents a participant in the conference.
Expand All @@ -32,6 +33,7 @@ func (p *Participant) asMatrixRecipient() signaling.MatrixRecipient {
return signaling.MatrixRecipient{
UserID: p.id.UserID,
DeviceID: p.id.DeviceID,
CallID: p.id.CallID,
RemoteSessionID: p.remoteSessionID,
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/conference/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,13 @@ func (c *Conference) handleDataChannelMessage(participant *Participant, sfuMessa
func (c *Conference) processMatrixMessage(msg MatrixMessage) {
switch ev := msg.Content.(type) {
case *event.CallInviteEventContent:
c.onNewParticipant(ParticipantID{UserID: msg.UserID, DeviceID: ev.DeviceID}, ev)
c.onNewParticipant(msg.Sender, ev)
case *event.CallCandidatesEventContent:
c.onCandidates(ParticipantID{UserID: msg.UserID, DeviceID: ev.DeviceID}, ev)
c.onCandidates(msg.Sender, ev)
case *event.CallSelectAnswerEventContent:
c.onSelectAnswer(ParticipantID{UserID: msg.UserID, DeviceID: ev.DeviceID}, ev)
c.onSelectAnswer(msg.Sender, ev)
case *event.CallHangupEventContent:
c.onHangup(ParticipantID{UserID: msg.UserID, DeviceID: ev.DeviceID}, ev)
c.onHangup(msg.Sender, ev)
default:
c.logger.Errorf("Unexpected event type: %T", ev)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/conference/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func StartConference(
logger: logrus.WithFields(logrus.Fields{"conf_id": confID}),
}

participantID := ParticipantID{UserID: UserID, DeviceID: inviteEvent.DeviceID}
participantID := ParticipantID{UserID: UserID, DeviceID: inviteEvent.DeviceID, CallID: inviteEvent.CallID}
if err := conference.onNewParticipant(participantID, inviteEvent); err != nil {
return nil, err
}
Expand Down
51 changes: 34 additions & 17 deletions pkg/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/matrix-org/waterfall/pkg/signaling"
"github.com/sirupsen/logrus"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
)

type Conference = common.Sender[conf.MatrixMessage]
Expand Down Expand Up @@ -58,10 +59,11 @@ func newRouter(matrix *signaling.MatrixClient, config conf.Config) chan<- Router
case ConferenceEndedMessage:
// Remove the conference that ended from the list.
delete(router.conferenceSinks, msg.conferenceID)

// Process the message that was not read by the conference.
if len(msg.unread) > 0 {
// FIXME: We must handle these messages!
logrus.Warnf("Unread messages: %v", len(msg.unread))
for _, msg := range msg.unread {
// TODO: We actually already know the type, so we can do this better.
router.handleMatrixEvent(msg.RawEvent)
}
}
}
Expand All @@ -72,22 +74,36 @@ func newRouter(matrix *signaling.MatrixClient, config conf.Config) chan<- Router

// Handles incoming To-Device events that the SFU receives from clients.
func (r *Router) handleMatrixEvent(evt *event.Event) {
// Check if `conf_id` is present in the message (right messages do have it).
rawConferenceID, ok := evt.Content.Raw["conf_id"]
if !ok {
return
}
var (
conferenceID string
callID string
deviceID string
userID = evt.Sender
)

// Try to parse the conference ID without parsing the whole event.
conferenceID, ok := rawConferenceID.(string)
if !ok {
return
// Check if `conf_id` is present in the message (right messages do have it).
rawConferenceID, okConferenceId := evt.Content.Raw["conf_id"]
rawCallID, okCallId := evt.Content.Raw["call_id"]
rawDeviceID, okDeviceID := evt.Content.Raw["device_id"]

if okConferenceId && okCallId && okDeviceID {
// Extract the conference ID from the message.
conferenceID, okConferenceId = rawConferenceID.(string)
callID, okCallId = rawCallID.(string)
deviceID, okDeviceID = rawDeviceID.(string)

if !okConferenceId || !okCallId || !okDeviceID {
logrus.Warn("Ignoring invalid message without IDs")
return
}
}

logger := logrus.WithFields(logrus.Fields{
"type": evt.Type.Type,
"user_id": evt.Sender.String(),
"conf_id": conferenceID,
"type": evt.Type.Type,
"user_id": userID,
"conf_id": conferenceID,
"call_id": callID,
"device_id": deviceID,
})

conference := r.conferenceSinks[conferenceID]
Expand All @@ -101,7 +117,7 @@ func (r *Router) handleMatrixEvent(evt *event.Event) {
r.config,
r.matrix.CreateForConference(conferenceID),
createConferenceEndNotifier(conferenceID, r.channel),
evt.Sender,
userID,
evt.Content.AsCallInvite(),
)
if err != nil {
Expand All @@ -122,9 +138,10 @@ func (r *Router) handleMatrixEvent(evt *event.Event) {
// A helper function to deal with messages that can't be sent due to the conference closed.
// Not a function due to the need to capture local environment.
sendToConference := func(eventContent conf.MessageContent) {
sender := conf.ParticipantID{userID, id.DeviceID(deviceID), callID}
// At this point the conference is not nil.
// Let's check if the channel is still available.
if conference.Send(conf.MatrixMessage{UserID: evt.Sender, Content: eventContent}) != nil {
if conference.Send(conf.MatrixMessage{Content: eventContent, RawEvent: evt, Sender: sender}) != nil {
// If sending failed, then the conference is over.
delete(r.conferenceSinks, conferenceID)
// Since we were not able to send the message, let's re-process it now.
Expand Down
14 changes: 9 additions & 5 deletions pkg/signaling/matrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type MatrixRecipient struct {
UserID id.UserID
DeviceID id.DeviceID
RemoteSessionID id.SessionID
CallID string
}

// Interface that abstracts sending Send-to-device messages for the conference.
Expand All @@ -63,7 +64,7 @@ func (m *MatrixForConference) SendSDPAnswer(
) {
eventContent := &event.Content{
Parsed: event.CallAnswerEventContent{
BaseCallEventContent: m.createBaseEventContent(recipient.RemoteSessionID),
BaseCallEventContent: m.createBaseEventContent(recipient.CallID, recipient.RemoteSessionID),
Answer: event.CallData{
Type: "answer",
SDP: sdp,
Expand All @@ -78,7 +79,7 @@ func (m *MatrixForConference) SendSDPAnswer(
func (m *MatrixForConference) SendICECandidates(recipient MatrixRecipient, candidates []event.CallCandidate) {
eventContent := &event.Content{
Parsed: event.CallCandidatesEventContent{
BaseCallEventContent: m.createBaseEventContent(recipient.RemoteSessionID),
BaseCallEventContent: m.createBaseEventContent(recipient.CallID, recipient.RemoteSessionID),
Candidates: candidates,
},
}
Expand All @@ -89,7 +90,7 @@ func (m *MatrixForConference) SendICECandidates(recipient MatrixRecipient, candi
func (m *MatrixForConference) SendCandidatesGatheringFinished(recipient MatrixRecipient) {
eventContent := &event.Content{
Parsed: event.CallCandidatesEventContent{
BaseCallEventContent: m.createBaseEventContent(recipient.RemoteSessionID),
BaseCallEventContent: m.createBaseEventContent(recipient.CallID, recipient.RemoteSessionID),
Candidates: []event.CallCandidate{{Candidate: ""}},
},
}
Expand All @@ -100,15 +101,18 @@ func (m *MatrixForConference) SendCandidatesGatheringFinished(recipient MatrixRe
func (m *MatrixForConference) SendHangup(recipient MatrixRecipient, reason event.CallHangupReason) {
eventContent := &event.Content{
Parsed: event.CallHangupEventContent{
BaseCallEventContent: m.createBaseEventContent(recipient.RemoteSessionID),
BaseCallEventContent: m.createBaseEventContent(recipient.CallID, recipient.RemoteSessionID),
Reason: reason,
},
}

m.sendToDevice(recipient, event.CallHangup, eventContent)
}

func (m *MatrixForConference) createBaseEventContent(destSessionID id.SessionID) event.BaseCallEventContent {
func (m *MatrixForConference) createBaseEventContent(
callID string,
destSessionID id.SessionID,
) event.BaseCallEventContent {
return event.BaseCallEventContent{
CallID: m.conferenceID,
ConfID: m.conferenceID,
Expand Down

0 comments on commit d845e7b

Please sign in to comment.