Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: resubscribe to pathfinder if we don't hear anything for a while #102

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions box/box.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ var (
socketPath string
socketPattern string
targetMessage atomic.Int32
sourceDevice string
)

var lastMessage = atomic.Pointer[time.Time]{}

func connectUDP(log *zap.SugaredLogger, addr string) *net.UDPConn {
udpAddr, err := net.ResolveUDPAddr("udp4", addr)
if err != nil {
Expand Down Expand Up @@ -147,6 +150,10 @@ func waitAndRead(log *zap.SugaredLogger, pathfinder net.Conn, target *net.UDPCon
if target == 0 {
continue
}
lastMessage.Store(func() *time.Time {
t := time.Now()
return &t
}())
targetMessage.Store(target)
onChange(log, onChangeVal)

Expand Down Expand Up @@ -176,12 +183,25 @@ func checkTrimmedData(trimmedData string) (target int32, onChange bool, err erro
return 6, false, nil
}

func resubPathfinder(log *zap.SugaredLogger, pathfinder net.Conn) (err error) {
writeTCP(log, pathfinder, fmt.Sprintf("UNSUB %s", sourceDevice))
writeTCP(log, pathfinder, fmt.Sprintf("SUB %s", sourceDevice))
// ensure we didn't miss a message while we were unsubbed by generating one
writeTCP(log, pathfinder, fmt.Sprintf("GET %s", sourceDevice))
return nil
}

// Execute initializes virtual-sämbox and runs is business logic.
func Execute(log *zap.SugaredLogger, sendUDP bool, targetAddr string, pathfinderAddr string, pathfinderAuth string, device string, socket bool, socketPathOpt string, socketPatternOpt string) {

socketActive = socket
socketPath = socketPathOpt
socketPattern = socketPatternOpt
sourceDevice = device
lastMessage.Store(func() *time.Time {
t := time.Now()
return &t
}())

var target *net.UDPConn
if sendUDP {
Expand All @@ -196,15 +216,38 @@ func Execute(log *zap.SugaredLogger, sendUDP bool, targetAddr string, pathfinder
go waitAndRead(log, pathfinder, target)

writeTCP(log, pathfinder, fmt.Sprintf("LOGIN %s", pathfinderAuth))
writeTCP(log, pathfinder, fmt.Sprintf("SUB %s", device))
writeTCP(log, pathfinder, fmt.Sprintf("GET %s", device))
writeTCP(log, pathfinder, fmt.Sprintf("SUB %s", sourceDevice))
writeTCP(log, pathfinder, fmt.Sprintf("GET %s", sourceDevice))

for {
if sendUDP {
if targetMessage.Load() != 0 {
writeUDP(log, target, fmt.Sprintf("%d\r\n", targetMessage.Load()))
}
}
// Resubscribe every then and now just in case pathfinder rebooted on us.
// basically i'm giving up on trying to figure out how we can have stale
// connections to pathfinder when it reboots. This shows up as us not being
// subscribed to what we need anymore while the connection never gets
// terminated properly.
// I can't reprod the exact scenario because everything i try terminates the
// the connection properly (except actually rebooting the device that is).
// The code below works around all of this by resubscribing using UNSUB/SUB
// again after some time has passed. We might consider this as renewing the
// lease on the subscriptions.
// to make sure we don't spam klangbecken with "started" messages that lead
// to tracks being skipped, this logic is only triggered when we haven't
// heard from pathfinder for a bit over an hour and we don't think klangbecken
// is currently running.
// I'm hoping that this will make it so that this becomes self-healing and
// and does so as soon as we have a show that runs for more than
if time.Since(*lastMessage.Load()) > time.Minute*63 && targetMessage.Load() != 1 {
log.Info("Resubbing Pathfinder")
err := resubPathfinder(log, pathfinder)
if err != nil {
log.Fatal(err)
}
}
time.Sleep(600 * time.Millisecond)
}
}