From b0f56bee6752ed5566e3deab0698d528a913a5cb Mon Sep 17 00:00:00 2001 From: Lucas Date: Fri, 23 Dec 2022 20:50:47 +0100 Subject: [PATCH] fix: resubmit --- box/box.go | 47 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/box/box.go b/box/box.go index 83fa6d3..c8e9a17 100644 --- a/box/box.go +++ b/box/box.go @@ -35,8 +35,11 @@ var ( socketPath string socketPattern string targetMessage atomic.Int32 + sourceDevice string ) +var lastMessage = atomic.Pointer[time.Time]{} + func connectUDP(log *zap.SugaredLogger, addr string) *net.UDPConn { udpAddr, err := net.ResolveUDPAddr("udp4", addr) if err != nil { @@ -147,6 +150,10 @@ func waitAndRead(log *zap.SugaredLogger, pathfinder net.Conn, target *net.UDPCon if target == 0 { continue } + lastMessage.Store(func() *time.Time { + t := time.Now() + return &t + }()) targetMessage.Store(target) onChange(log, onChangeVal) @@ -176,12 +183,25 @@ func checkTrimmedData(trimmedData string) (target int32, onChange bool, err erro return 6, false, nil } +func resubPathfinder(log *zap.SugaredLogger, pathfinder net.Conn) (err error) { + writeTCP(log, pathfinder, fmt.Sprintf("UNSUB %s", sourceDevice)) + writeTCP(log, pathfinder, fmt.Sprintf("SUB %s", sourceDevice)) + // ensure we didn't miss a message while we were unsubbed by generating one + writeTCP(log, pathfinder, fmt.Sprintf("GET %s", sourceDevice)) + return nil +} + // Execute initializes virtual-sämbox and runs is business logic. func Execute(log *zap.SugaredLogger, sendUDP bool, targetAddr string, pathfinderAddr string, pathfinderAuth string, device string, socket bool, socketPathOpt string, socketPatternOpt string) { socketActive = socket socketPath = socketPathOpt socketPattern = socketPatternOpt + sourceDevice = device + lastMessage.Store(func() *time.Time { + t := time.Now() + return &t + }()) var target *net.UDPConn if sendUDP { @@ -196,8 +216,8 @@ func Execute(log *zap.SugaredLogger, sendUDP bool, targetAddr string, pathfinder go waitAndRead(log, pathfinder, target) writeTCP(log, pathfinder, fmt.Sprintf("LOGIN %s", pathfinderAuth)) - writeTCP(log, pathfinder, fmt.Sprintf("SUB %s", device)) - writeTCP(log, pathfinder, fmt.Sprintf("GET %s", device)) + writeTCP(log, pathfinder, fmt.Sprintf("SUB %s", sourceDevice)) + writeTCP(log, pathfinder, fmt.Sprintf("GET %s", sourceDevice)) for { if sendUDP { @@ -205,6 +225,29 @@ func Execute(log *zap.SugaredLogger, sendUDP bool, targetAddr string, pathfinder writeUDP(log, target, fmt.Sprintf("%d\r\n", targetMessage.Load())) } } + // Resubscribe every then and now just in case pathfinder rebooted on us. + // basically i'm giving up on trying to figure out how we can have stale + // connections to pathfinder when it reboots. This shows up as us not being + // subscribed to what we need anymore while the connection never gets + // terminated properly. + // I can't reprod the exact scenario because everything i try terminates the + // the connection properly (except actually rebooting the device that is). + // The code below works around all of this by resubscribing using UNSUB/SUB + // again after some time has passed. We might consider this as renewing the + // lease on the subscriptions. + // to make sure we don't spam klangbecken with "started" messages that lead + // to tracks being skipped, this logic is only triggered when we haven't + // heard from pathfinder for a bit over an hour and we don't think klangbecken + // is currently running. + // I'm hoping that this will make it so that this becomes self-healing and + // and does so as soon as we have a show that runs for more than + if time.Since(*lastMessage.Load()) > time.Minute*63 && targetMessage.Load() != 1 { + log.Info("Resubbing Pathfinder") + err := resubPathfinder(log, pathfinder) + if err != nil { + log.Fatal(err) + } + } time.Sleep(600 * time.Millisecond) } }