From 340fd81778ce2e2d71c6707b733b76e5e412828b Mon Sep 17 00:00:00 2001 From: Alex X Date: Sat, 9 Nov 2024 18:17:41 +0300 Subject: [PATCH] Fix loop request, ex. `camera1: ffmpeg:camera1` --- internal/ffmpeg/ffmpeg.go | 2 ++ internal/rtsp/rtsp.go | 3 +++ internal/streams/add_consumer.go | 6 ++++++ pkg/core/connection.go | 5 +++++ 4 files changed, 16 insertions(+) diff --git a/internal/ffmpeg/ffmpeg.go b/internal/ffmpeg/ffmpeg.go index 12a9be83..b934be53 100644 --- a/internal/ffmpeg/ffmpeg.go +++ b/internal/ffmpeg/ffmpeg.go @@ -179,6 +179,7 @@ func parseArgs(s string) *ffmpeg.Args { Version: verAV, } + var source = s var query url.Values if i := strings.IndexByte(s, '#'); i >= 0 { query = streams.ParseQuery(s[i+1:]) @@ -221,6 +222,7 @@ func parseArgs(s string) *ffmpeg.Args { default: s += "?video&audio" } + s += "&source=ffmpeg:" + url.QueryEscape(source) args.Input = inputTemplate("rtsp", s, query) } else if i = strings.Index(s, "?"); i > 0 { switch s[:i] { diff --git a/internal/rtsp/rtsp.go b/internal/rtsp/rtsp.go index 230bdece..a4075f6c 100644 --- a/internal/rtsp/rtsp.go +++ b/internal/rtsp/rtsp.go @@ -188,6 +188,9 @@ func tcpHandler(conn *rtsp.Conn) { conn.PacketSize = uint16(core.Atoi(s)) } + // will help to protect looping requests to same source + conn.Connection.Source = query.Get("source") + if err := stream.AddConsumer(conn); err != nil { log.Warn().Err(err).Str("stream", name).Msg("[rtsp]") return diff --git a/internal/streams/add_consumer.go b/internal/streams/add_consumer.go index eb767691..d72e17ee 100644 --- a/internal/streams/add_consumer.go +++ b/internal/streams/add_consumer.go @@ -22,6 +22,12 @@ func (s *Stream) AddConsumer(cons core.Consumer) (err error) { producers: for prodN, prod := range s.producers { + // check for loop request, ex. `camera1: ffmpeg:camera1` + if info, ok := cons.(core.Info); ok && prod.url == info.GetSource() { + log.Trace().Msgf("[streams] skip cons=%d prod=%d", consN, prodN) + continue + } + if prodErrors[prodN] != nil { log.Trace().Msgf("[streams] skip cons=%d prod=%d", consN, prodN) continue diff --git a/pkg/core/connection.go b/pkg/core/connection.go index 2c3f2196..cc0f43e4 100644 --- a/pkg/core/connection.go +++ b/pkg/core/connection.go @@ -25,6 +25,7 @@ type Info interface { SetSource(string) SetURL(string) WithRequest(*http.Request) + GetSource() string } // Connection just like webrtc.PeerConnection @@ -123,6 +124,10 @@ func (c *Connection) WithRequest(r *http.Request) { c.UserAgent = r.UserAgent() } +func (c *Connection) GetSource() string { + return c.Source +} + // Create like os.Create, init Consumer with existing Transport func Create(w io.Writer) (*Connection, error) { return &Connection{Transport: w}, nil