From c68e3cafe4e385270080c1b0ede0132bdd79657a Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Thu, 3 Jul 2025 23:35:58 +0200 Subject: [PATCH 01/10] fixes doorbird backchannel audio: - proper session handling - honor http status codes - prevent device from being flooded by limiting concurrent audio channels --- pkg/doorbird/backchannel.go | 64 +++++++++++++++++++++++++++----- pkg/doorbird/backchannel_lock.go | 5 +++ 2 files changed, 60 insertions(+), 9 deletions(-) create mode 100644 pkg/doorbird/backchannel_lock.go diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 82379383d..82ea31b4b 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -1,21 +1,32 @@ package doorbird import ( + "bufio" "fmt" "net" "net/url" + "strconv" + "strings" "time" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/pion/rtp" ) +var ( + clt Client +) + type Client struct { core.Connection conn net.Conn } func Dial(rawURL string) (*Client, error) { + if clt.conn != nil { + return &clt, nil + } + u, err := url.Parse(rawURL) if err != nil { return nil, err @@ -45,6 +56,23 @@ func Dial(rawURL string) (*Client, error) { return nil, err } + reader := bufio.NewReader(conn) + statusLine, _ := reader.ReadString('\n') + parts := strings.SplitN(statusLine, " ", 3) + if len(parts) >= 2 { + statusCode, err := strconv.Atoi(parts[1]) + if err == nil { + if statusCode == 204 { + conn.Close() + return nil, fmt.Errorf("DoorBird user has no api permission: %d", statusCode) + } + if statusCode == 503 { + conn.Close() + return nil, fmt.Errorf("DoorBird device is busy: %d", statusCode) + } + } + } + medias := []*core.Media{ { Kind: core.KindAudio, @@ -55,17 +83,19 @@ func Dial(rawURL string) (*Client, error) { }, } - return &Client{ + clt = Client{ core.Connection{ ID: core.NewID(), FormatName: "doorbird", Protocol: "http", URL: rawURL, Medias: medias, - Transport: conn, + // Transport: conn, }, conn, - }, nil + } + + return &clt, nil } func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { @@ -73,12 +103,18 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, } func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + if len(c.Senders) > 0 { + return fmt.Errorf("DoorBird backchannel already in use") + } + sender := core.NewSender(media, track.Codec) sender.Handler = func(pkt *rtp.Packet) { - _ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) - if n, err := c.conn.Write(pkt.Payload); err == nil { - c.Send += n + if c.conn != nil { + _ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) + if n, err := c.conn.Write(pkt.Payload); err == nil { + c.Send += n + } } } @@ -87,7 +123,17 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece return nil } -func (c *Client) Start() (err error) { - _, err = c.conn.Read(nil) - return +func (c *Client) Start() error { + if c.conn == nil { + return nil + } + buf := make([]byte, 1) + for { + _, err := c.conn.Read(buf) + if err != nil { + c.conn.Close() + c.conn = nil + return err + } + } } diff --git a/pkg/doorbird/backchannel_lock.go b/pkg/doorbird/backchannel_lock.go new file mode 100644 index 000000000..758320dc0 --- /dev/null +++ b/pkg/doorbird/backchannel_lock.go @@ -0,0 +1,5 @@ +package doorbird + +import "sync" + +var backchannelMu sync.Mutex From e00d211619c437d1cbe920d7f08cbc558ef71c56 Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Sun, 6 Jul 2025 22:33:25 +0200 Subject: [PATCH 02/10] ensure that doorbird errors where shown in logs --- pkg/doorbird/backchannel.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 82ea31b4b..d338a4450 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -2,6 +2,7 @@ package doorbird import ( "bufio" + "errors" "fmt" "net" "net/url" @@ -64,11 +65,11 @@ func Dial(rawURL string) (*Client, error) { if err == nil { if statusCode == 204 { conn.Close() - return nil, fmt.Errorf("DoorBird user has no api permission: %d", statusCode) + return nil, errors.New("DoorBird user has no api permission") } if statusCode == 503 { conn.Close() - return nil, fmt.Errorf("DoorBird device is busy: %d", statusCode) + return nil, errors.New("DoorBird device is busy") } } } @@ -104,7 +105,7 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { if len(c.Senders) > 0 { - return fmt.Errorf("DoorBird backchannel already in use") + return errors.New("DoorBird backchannel already in use") } sender := core.NewSender(media, track.Codec) From 56e61a85ee7be5c87e0313ec8e0dafc29a7c5d96 Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Wed, 16 Jul 2025 21:07:34 +0200 Subject: [PATCH 03/10] proper error handling cleanup files --- pkg/doorbird/backchannel.go | 26 ++++++++++---------------- pkg/doorbird/backchannel_lock.go | 5 ----- 2 files changed, 10 insertions(+), 21 deletions(-) delete mode 100644 pkg/doorbird/backchannel_lock.go diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index d338a4450..8a9a25d97 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -5,9 +5,8 @@ import ( "errors" "fmt" "net" + "net/http" "net/url" - "strconv" - "strings" "time" "github.com/AlexxIT/go2rtc/pkg/core" @@ -57,20 +56,15 @@ func Dial(rawURL string) (*Client, error) { return nil, err } - reader := bufio.NewReader(conn) - statusLine, _ := reader.ReadString('\n') - parts := strings.SplitN(statusLine, " ", 3) - if len(parts) >= 2 { - statusCode, err := strconv.Atoi(parts[1]) - if err == nil { - if statusCode == 204 { - conn.Close() - return nil, errors.New("DoorBird user has no api permission") - } - if statusCode == 503 { - conn.Close() - return nil, errors.New("DoorBird device is busy") - } + resp, _ := http.ReadResponse(bufio.NewReader(conn), nil) + if resp != nil { + switch resp.StatusCode { + case 204: + conn.Close() + return nil, errors.New("DoorBird user has no api permission") + case 503: + conn.Close() + return nil, errors.New("DoorBird device is busy") } } diff --git a/pkg/doorbird/backchannel_lock.go b/pkg/doorbird/backchannel_lock.go deleted file mode 100644 index 758320dc0..000000000 --- a/pkg/doorbird/backchannel_lock.go +++ /dev/null @@ -1,5 +0,0 @@ -package doorbird - -import "sync" - -var backchannelMu sync.Mutex From a92e04b6e0885bec723b222f5d16d1ffa35a22a1 Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Tue, 22 Jul 2025 20:54:24 +0200 Subject: [PATCH 04/10] added audio mixing capability to avoid device overload when multiple backchannel audio streams are connected --- pkg/doorbird/backchannel.go | 195 +++++++++++++++++++++++++++++++++--- 1 file changed, 179 insertions(+), 16 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 8a9a25d97..51b4c1943 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -7,19 +7,140 @@ import ( "net" "net/http" "net/url" + "sync" "time" "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/pcm" "github.com/pion/rtp" ) -var ( - clt Client -) +var clt Client + +type AudioMixer struct { + mu sync.Mutex + streams map[string]chan []byte + output chan []byte + running bool +} + +func NewAudioMixer() *AudioMixer { + return &AudioMixer{ + streams: make(map[string]chan []byte), + output: make(chan []byte, 100), + } +} + +func (m *AudioMixer) AddStream(id string) chan []byte { + m.mu.Lock() + defer m.mu.Unlock() + + if !m.running { + m.running = true + go m.mixLoop() + } + + stream := make(chan []byte, 100) + m.streams[id] = stream + return stream +} + +func (m *AudioMixer) RemoveStream(id string) { + m.mu.Lock() + defer m.mu.Unlock() + + if stream, exists := m.streams[id]; exists { + close(stream) + delete(m.streams, id) + } +} + +func (m *AudioMixer) mixLoop() { + ticker := time.NewTicker(20 * time.Millisecond) + defer ticker.Stop() + + for range ticker.C { + m.mu.Lock() + if len(m.streams) == 0 { + m.mu.Unlock() + continue + } + + var pcmSamples [][]int16 + activeStreams := 0 + + for _, stream := range m.streams { + select { + case data := <-stream: + if len(data) > 0 { + samples := make([]int16, len(data)) + for i, sample := range data { + samples[i] = pcm.PCMUtoPCM(sample) + } + pcmSamples = append(pcmSamples, samples) + activeStreams++ + } + default: + } + } + m.mu.Unlock() + + if activeStreams == 0 { + continue + } + + var mixedLength int + for _, samples := range pcmSamples { + if len(samples) > mixedLength { + mixedLength = len(samples) + } + } + + if mixedLength == 0 { + continue + } + + mixed := make([]int16, mixedLength) + for i := 0; i < mixedLength; i++ { + var sum int32 + var count int32 + + for _, samples := range pcmSamples { + if i < len(samples) { + sum += int32(samples[i]) + count++ + } + } + + if count > 0 { + averaged := sum / count + if averaged > 32767 { + mixed[i] = 32767 + } else if averaged < -32768 { + mixed[i] = -32768 + } else { + mixed[i] = int16(averaged) + } + } + } + + output := make([]byte, len(mixed)) + for i, sample := range mixed { + output[i] = pcm.PCMtoPCMU(sample) + } + + select { + case m.output <- output: + default: + } + } +} type Client struct { core.Connection - conn net.Conn + conn net.Conn + mixer *AudioMixer + trackMap map[*core.Sender]string } func Dial(rawURL string) (*Client, error) { @@ -85,9 +206,10 @@ func Dial(rawURL string) (*Client, error) { Protocol: "http", URL: rawURL, Medias: medias, - // Transport: conn, }, conn, + NewAudioMixer(), + make(map[*core.Sender]string), } return &clt, nil @@ -98,22 +220,35 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, } func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { - if len(c.Senders) > 0 { - return errors.New("DoorBird backchannel already in use") - } - sender := core.NewSender(media, track.Codec) + trackID := fmt.Sprintf("%d", core.NewID()) + streamChan := c.mixer.AddStream(trackID) sender.Handler = func(pkt *rtp.Packet) { - if c.conn != nil { - _ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) - if n, err := c.conn.Write(pkt.Payload); err == nil { - c.Send += n + if c.conn != nil && len(pkt.Payload) > 0 { + select { + case streamChan <- pkt.Payload: + default: } } } - sender.HandleRTP(track) + c.trackMap[sender] = trackID + + if len(c.Senders) == 0 { + go func() { + for mixedData := range c.mixer.output { + if c.conn != nil { + _ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) + if n, err := c.conn.Write(mixedData); err == nil { + c.Send += n + } + } + } + }() + } + + sender.WithParent(track).Start() c.Senders = append(c.Senders, sender) return nil } @@ -126,9 +261,37 @@ func (c *Client) Start() error { for { _, err := c.conn.Read(buf) if err != nil { - c.conn.Close() - c.conn = nil + c.cleanup() return err } } } + +func (c *Client) cleanup() { + if c.conn != nil { + c.conn.Close() + c.conn = nil + } + + if c.mixer != nil { + c.mixer.mu.Lock() + for id := range c.mixer.streams { + if stream, exists := c.mixer.streams[id]; exists { + close(stream) + } + } + c.mixer.streams = make(map[string]chan []byte) + close(c.mixer.output) + c.mixer.running = false + c.mixer.mu.Unlock() + } + + c.trackMap = make(map[*core.Sender]string) +} + +func (c *Client) RemoveTrack(sender *core.Sender) { + if trackID, exists := c.trackMap[sender]; exists { + c.mixer.RemoveStream(trackID) + delete(c.trackMap, sender) + } +} From 7d2ad92c4b4c426062cb48f22c38110a6cc4ce30 Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Mon, 28 Jul 2025 22:27:38 +0200 Subject: [PATCH 05/10] fix app crashes remove orphaned streams --- pkg/doorbird/backchannel.go | 107 +++++++++++++++++++++++++++++++----- 1 file changed, 92 insertions(+), 15 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 51b4c1943..8cdd0136a 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -22,6 +22,7 @@ type AudioMixer struct { streams map[string]chan []byte output chan []byte running bool + closed bool } func NewAudioMixer() *AudioMixer { @@ -35,6 +36,12 @@ func (m *AudioMixer) AddStream(id string) chan []byte { m.mu.Lock() defer m.mu.Unlock() + if m.closed { + ch := make(chan []byte) + close(ch) + return ch + } + if !m.running { m.running = true go m.mixLoop() @@ -138,9 +145,11 @@ func (m *AudioMixer) mixLoop() { type Client struct { core.Connection - conn net.Conn - mixer *AudioMixer - trackMap map[*core.Sender]string + conn net.Conn + mixer *AudioMixer + trackMap map[*core.Sender]string + senderStats map[*core.Sender]time.Time + mu sync.RWMutex } func Dial(rawURL string) (*Client, error) { @@ -200,16 +209,17 @@ func Dial(rawURL string) (*Client, error) { } clt = Client{ - core.Connection{ + Connection: core.Connection{ ID: core.NewID(), FormatName: "doorbird", Protocol: "http", URL: rawURL, Medias: medias, }, - conn, - NewAudioMixer(), - make(map[*core.Sender]string), + conn: conn, + mixer: NewAudioMixer(), + trackMap: make(map[*core.Sender]string), + senderStats: make(map[*core.Sender]time.Time), } return &clt, nil @@ -220,20 +230,31 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, } func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + c.mu.Lock() + defer c.mu.Unlock() + sender := core.NewSender(media, track.Codec) trackID := fmt.Sprintf("%d", core.NewID()) streamChan := c.mixer.AddStream(trackID) sender.Handler = func(pkt *rtp.Packet) { - if c.conn != nil && len(pkt.Payload) > 0 { + c.mu.RLock() + conn := c.conn + c.mu.RUnlock() + + if conn != nil && len(pkt.Payload) > 0 { select { case streamChan <- pkt.Payload: + c.mu.Lock() + c.senderStats[sender] = time.Now() + c.mu.Unlock() default: } } } c.trackMap[sender] = trackID + c.senderStats[sender] = time.Now() if len(c.Senders) == 0 { go func() { @@ -257,6 +278,15 @@ func (c *Client) Start() error { if c.conn == nil { return nil } + + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for range ticker.C { + c.cleanupOrphanedSenders() + } + }() + buf := make([]byte, 1) for { _, err := c.conn.Read(buf) @@ -268,6 +298,9 @@ func (c *Client) Start() error { } func (c *Client) cleanup() { + c.mu.Lock() + defer c.mu.Unlock() + if c.conn != nil { c.conn.Close() c.conn = nil @@ -275,23 +308,67 @@ func (c *Client) cleanup() { if c.mixer != nil { c.mixer.mu.Lock() - for id := range c.mixer.streams { - if stream, exists := c.mixer.streams[id]; exists { - close(stream) - } + c.mixer.closed = true + for id, stream := range c.mixer.streams { + close(stream) + delete(c.mixer.streams, id) + } + if c.mixer.running { + close(c.mixer.output) + c.mixer.running = false } - c.mixer.streams = make(map[string]chan []byte) - close(c.mixer.output) - c.mixer.running = false c.mixer.mu.Unlock() } c.trackMap = make(map[*core.Sender]string) + c.senderStats = make(map[*core.Sender]time.Time) +} + +func (c *Client) cleanupOrphanedSenders() { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + removedCount := 0 + validIndex := 0 + + for i, sender := range c.Senders { + lastActivity, exists := c.senderStats[sender] + if sender.State() == "closed" || !exists || now.Sub(lastActivity) >= 5*time.Second { + if trackID, exists := c.trackMap[sender]; exists { + c.mixer.RemoveStream(trackID) + delete(c.trackMap, sender) + } + delete(c.senderStats, sender) + sender.Close() + removedCount++ + } else { + c.Senders[validIndex] = c.Senders[i] + validIndex++ + } + } + + c.Senders = c.Senders[:validIndex] + + if removedCount > 0 { + fmt.Printf("DoorBird: Cleaned up %d orphaned senders, %d remain active\n", removedCount, validIndex) + } } func (c *Client) RemoveTrack(sender *core.Sender) { + c.mu.Lock() + defer c.mu.Unlock() + if trackID, exists := c.trackMap[sender]; exists { c.mixer.RemoveStream(trackID) delete(c.trackMap, sender) } + delete(c.senderStats, sender) + + for i, s := range c.Senders { + if s == sender { + c.Senders = append(c.Senders[:i], c.Senders[i+1:]...) + break + } + } } From 3d38e5e567329d24d5ad87baf0729d841eaf3ad5 Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Wed, 30 Jul 2025 23:37:06 +0200 Subject: [PATCH 06/10] fix unexpected close of backchannel streams --- pkg/doorbird/backchannel.go | 69 ++++++++++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 9 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 8cdd0136a..5e5e88348 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -15,7 +15,10 @@ import ( "github.com/pion/rtp" ) -var clt Client +var ( + cltMu sync.Mutex + cltMap = make(map[string]*Client) +) type AudioMixer struct { mu sync.Mutex @@ -68,6 +71,11 @@ func (m *AudioMixer) mixLoop() { for range ticker.C { m.mu.Lock() + if m.closed { + m.mu.Unlock() + return + } + if len(m.streams) == 0 { m.mu.Unlock() continue @@ -153,8 +161,12 @@ type Client struct { } func Dial(rawURL string) (*Client, error) { - if clt.conn != nil { - return &clt, nil + cltMu.Lock() + defer cltMu.Unlock() + + // Check if we already have a client for this URL + if existingClient, exists := cltMap[rawURL]; exists && existingClient.conn != nil { + return existingClient, nil } u, err := url.Parse(rawURL) @@ -183,6 +195,7 @@ func Dial(rawURL string) (*Client, error) { _ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) if _, err = conn.Write([]byte(s)); err != nil { + conn.Close() return nil, err } @@ -208,7 +221,7 @@ func Dial(rawURL string) (*Client, error) { }, } - clt = Client{ + client := &Client{ Connection: core.Connection{ ID: core.NewID(), FormatName: "doorbird", @@ -222,7 +235,10 @@ func Dial(rawURL string) (*Client, error) { senderStats: make(map[*core.Sender]time.Time), } - return &clt, nil + // Store the client in the map + cltMap[rawURL] = client + + return client, nil } func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { @@ -238,17 +254,22 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece streamChan := c.mixer.AddStream(trackID) sender.Handler = func(pkt *rtp.Packet) { + if len(pkt.Payload) == 0 { + return + } + c.mu.RLock() conn := c.conn c.mu.RUnlock() - if conn != nil && len(pkt.Payload) > 0 { + if conn != nil { select { case streamChan <- pkt.Payload: c.mu.Lock() c.senderStats[sender] = time.Now() c.mu.Unlock() default: + // Channel is full, skip this packet } } } @@ -258,11 +279,24 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece if len(c.Senders) == 0 { go func() { + defer func() { + if r := recover(); r != nil { + // Recover from any panics when mixer is closed + } + }() + for mixedData := range c.mixer.output { - if c.conn != nil { - _ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) - if n, err := c.conn.Write(mixedData); err == nil { + c.mu.RLock() + conn := c.conn + c.mu.RUnlock() + + if conn != nil && len(mixedData) > 0 { + _ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) + if n, err := conn.Write(mixedData); err == nil { c.Send += n + } else { + // Connection failed, break out of loop + break } } } @@ -289,9 +323,15 @@ func (c *Client) Start() error { buf := make([]byte, 1) for { + // Set read deadline to detect connection issues + _ = c.conn.SetReadDeadline(time.Now().Add(30 * time.Second)) _, err := c.conn.Read(buf) if err != nil { c.cleanup() + // Remove this client from the global map + cltMu.Lock() + delete(cltMap, c.URL) + cltMu.Unlock() return err } } @@ -320,8 +360,19 @@ func (c *Client) cleanup() { c.mixer.mu.Unlock() } + // Close all senders + for _, sender := range c.Senders { + sender.Close() + } + c.Senders = nil + c.trackMap = make(map[*core.Sender]string) c.senderStats = make(map[*core.Sender]time.Time) + + // Remove from global map + cltMu.Lock() + delete(cltMap, c.URL) + cltMu.Unlock() } func (c *Client) cleanupOrphanedSenders() { From 975a43d39276bd61ef3ed0f9c2eb1adb66b5e60d Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Thu, 31 Jul 2025 21:07:45 +0200 Subject: [PATCH 07/10] reduce audio delay by lowering buffer size --- pkg/doorbird/backchannel.go | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 5e5e88348..dc66ee0ef 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -15,6 +15,14 @@ import ( "github.com/pion/rtp" ) +const ( + AudioMixerInterval = 10 * time.Millisecond + AudioChannelBuffer = 10 + OutputChannelBuffer = 10 + SenderCleanupInterval = 5 * time.Second + SenderTimeoutDuration = 5 * time.Second +) + var ( cltMu sync.Mutex cltMap = make(map[string]*Client) @@ -31,7 +39,7 @@ type AudioMixer struct { func NewAudioMixer() *AudioMixer { return &AudioMixer{ streams: make(map[string]chan []byte), - output: make(chan []byte, 100), + output: make(chan []byte, OutputChannelBuffer), } } @@ -50,7 +58,7 @@ func (m *AudioMixer) AddStream(id string) chan []byte { go m.mixLoop() } - stream := make(chan []byte, 100) + stream := make(chan []byte, AudioChannelBuffer) m.streams[id] = stream return stream } @@ -66,7 +74,7 @@ func (m *AudioMixer) RemoveStream(id string) { } func (m *AudioMixer) mixLoop() { - ticker := time.NewTicker(20 * time.Millisecond) + ticker := time.NewTicker(AudioMixerInterval) defer ticker.Stop() for range ticker.C { @@ -164,7 +172,6 @@ func Dial(rawURL string) (*Client, error) { cltMu.Lock() defer cltMu.Unlock() - // Check if we already have a client for this URL if existingClient, exists := cltMap[rawURL]; exists && existingClient.conn != nil { return existingClient, nil } @@ -235,7 +242,6 @@ func Dial(rawURL string) (*Client, error) { senderStats: make(map[*core.Sender]time.Time), } - // Store the client in the map cltMap[rawURL] = client return client, nil @@ -269,7 +275,6 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece c.senderStats[sender] = time.Now() c.mu.Unlock() default: - // Channel is full, skip this packet } } } @@ -281,7 +286,6 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece go func() { defer func() { if r := recover(); r != nil { - // Recover from any panics when mixer is closed } }() @@ -295,7 +299,6 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece if n, err := conn.Write(mixedData); err == nil { c.Send += n } else { - // Connection failed, break out of loop break } } @@ -314,7 +317,7 @@ func (c *Client) Start() error { } go func() { - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(SenderCleanupInterval) defer ticker.Stop() for range ticker.C { c.cleanupOrphanedSenders() @@ -323,12 +326,10 @@ func (c *Client) Start() error { buf := make([]byte, 1) for { - // Set read deadline to detect connection issues _ = c.conn.SetReadDeadline(time.Now().Add(30 * time.Second)) _, err := c.conn.Read(buf) if err != nil { c.cleanup() - // Remove this client from the global map cltMu.Lock() delete(cltMap, c.URL) cltMu.Unlock() @@ -360,7 +361,6 @@ func (c *Client) cleanup() { c.mixer.mu.Unlock() } - // Close all senders for _, sender := range c.Senders { sender.Close() } @@ -369,7 +369,6 @@ func (c *Client) cleanup() { c.trackMap = make(map[*core.Sender]string) c.senderStats = make(map[*core.Sender]time.Time) - // Remove from global map cltMu.Lock() delete(cltMap, c.URL) cltMu.Unlock() @@ -385,7 +384,7 @@ func (c *Client) cleanupOrphanedSenders() { for i, sender := range c.Senders { lastActivity, exists := c.senderStats[sender] - if sender.State() == "closed" || !exists || now.Sub(lastActivity) >= 5*time.Second { + if sender.State() == "closed" || !exists || now.Sub(lastActivity) >= SenderTimeoutDuration { if trackID, exists := c.trackMap[sender]; exists { c.mixer.RemoveStream(trackID) delete(c.trackMap, sender) From f2242e31c8d3757e589b8a7dff0e4b1ae8ab66fe Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Tue, 19 Aug 2025 07:53:10 +0200 Subject: [PATCH 08/10] impove connection timeout to prevent reconnections after 30 seconds --- pkg/doorbird/backchannel.go | 41 +++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index dc66ee0ef..a49130e5b 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -21,6 +21,8 @@ const ( OutputChannelBuffer = 10 SenderCleanupInterval = 5 * time.Second SenderTimeoutDuration = 5 * time.Second + ConnectionReadTimeout = 5 * time.Minute + HeartbeatInterval = 30 * time.Second ) var ( @@ -244,6 +246,8 @@ func Dial(rawURL string) (*Client, error) { cltMap[rawURL] = client + fmt.Printf("DoorBird: New connection established to %s\n", rawURL) + return client, nil } @@ -299,6 +303,7 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece if n, err := conn.Write(mixedData); err == nil { c.Send += n } else { + fmt.Printf("DoorBird: Write error, breaking audio loop: %v\n", err) break } } @@ -324,17 +329,47 @@ func (c *Client) Start() error { } }() + // Start a heartbeat goroutine to periodically check connection health + go func() { + heartbeat := time.NewTicker(HeartbeatInterval) + defer heartbeat.Stop() + + for range heartbeat.C { + c.mu.RLock() + conn := c.conn + c.mu.RUnlock() + + if conn != nil { + // Try to write a small amount of silence to keep connection alive + silence := make([]byte, 160) // 20ms of silence at 8kHz + _ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) + if _, err := conn.Write(silence); err != nil { + fmt.Printf("DoorBird: Heartbeat write failed: %v\n", err) + // Don't break here, let the main read loop handle it + } + } + } + }() + + // The main loop now just monitors for any unexpected data or connection errors + // DoorBird typically doesn't send data back, so we use a very long timeout buf := make([]byte, 1) + connectionStart := time.Now() for { - _ = c.conn.SetReadDeadline(time.Now().Add(30 * time.Second)) - _, err := c.conn.Read(buf) + _ = c.conn.SetReadDeadline(time.Now().Add(ConnectionReadTimeout)) + n, err := c.conn.Read(buf) if err != nil { + elapsed := time.Since(connectionStart) + fmt.Printf("DoorBird: Connection failed after %v, error: %v\n", elapsed, err) c.cleanup() cltMu.Lock() delete(cltMap, c.URL) cltMu.Unlock() return err } + if n > 0 { + fmt.Printf("DoorBird: Unexpected data received: %v\n", buf[:n]) + } } } @@ -342,6 +377,8 @@ func (c *Client) cleanup() { c.mu.Lock() defer c.mu.Unlock() + fmt.Printf("DoorBird: Starting cleanup for connection %s\n", c.URL) + if c.conn != nil { c.conn.Close() c.conn = nil From 887f0f48905459d1ad6f2e94bbe33b2450980a8d Mon Sep 17 00:00:00 2001 From: Oliver Eiber Date: Sat, 4 Oct 2025 21:37:19 +0200 Subject: [PATCH 09/10] fix connection handling in conjunction with doorbird backchannel --- pkg/doorbird/backchannel.go | 397 ++---------------------------------- 1 file changed, 16 insertions(+), 381 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index a49130e5b..28eb5b693 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -1,183 +1,21 @@ package doorbird import ( - "bufio" - "errors" "fmt" "net" - "net/http" "net/url" - "sync" "time" "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/pcm" "github.com/pion/rtp" ) -const ( - AudioMixerInterval = 10 * time.Millisecond - AudioChannelBuffer = 10 - OutputChannelBuffer = 10 - SenderCleanupInterval = 5 * time.Second - SenderTimeoutDuration = 5 * time.Second - ConnectionReadTimeout = 5 * time.Minute - HeartbeatInterval = 30 * time.Second -) - -var ( - cltMu sync.Mutex - cltMap = make(map[string]*Client) -) - -type AudioMixer struct { - mu sync.Mutex - streams map[string]chan []byte - output chan []byte - running bool - closed bool -} - -func NewAudioMixer() *AudioMixer { - return &AudioMixer{ - streams: make(map[string]chan []byte), - output: make(chan []byte, OutputChannelBuffer), - } -} - -func (m *AudioMixer) AddStream(id string) chan []byte { - m.mu.Lock() - defer m.mu.Unlock() - - if m.closed { - ch := make(chan []byte) - close(ch) - return ch - } - - if !m.running { - m.running = true - go m.mixLoop() - } - - stream := make(chan []byte, AudioChannelBuffer) - m.streams[id] = stream - return stream -} - -func (m *AudioMixer) RemoveStream(id string) { - m.mu.Lock() - defer m.mu.Unlock() - - if stream, exists := m.streams[id]; exists { - close(stream) - delete(m.streams, id) - } -} - -func (m *AudioMixer) mixLoop() { - ticker := time.NewTicker(AudioMixerInterval) - defer ticker.Stop() - - for range ticker.C { - m.mu.Lock() - if m.closed { - m.mu.Unlock() - return - } - - if len(m.streams) == 0 { - m.mu.Unlock() - continue - } - - var pcmSamples [][]int16 - activeStreams := 0 - - for _, stream := range m.streams { - select { - case data := <-stream: - if len(data) > 0 { - samples := make([]int16, len(data)) - for i, sample := range data { - samples[i] = pcm.PCMUtoPCM(sample) - } - pcmSamples = append(pcmSamples, samples) - activeStreams++ - } - default: - } - } - m.mu.Unlock() - - if activeStreams == 0 { - continue - } - - var mixedLength int - for _, samples := range pcmSamples { - if len(samples) > mixedLength { - mixedLength = len(samples) - } - } - - if mixedLength == 0 { - continue - } - - mixed := make([]int16, mixedLength) - for i := 0; i < mixedLength; i++ { - var sum int32 - var count int32 - - for _, samples := range pcmSamples { - if i < len(samples) { - sum += int32(samples[i]) - count++ - } - } - - if count > 0 { - averaged := sum / count - if averaged > 32767 { - mixed[i] = 32767 - } else if averaged < -32768 { - mixed[i] = -32768 - } else { - mixed[i] = int16(averaged) - } - } - } - - output := make([]byte, len(mixed)) - for i, sample := range mixed { - output[i] = pcm.PCMtoPCMU(sample) - } - - select { - case m.output <- output: - default: - } - } -} - type Client struct { core.Connection - conn net.Conn - mixer *AudioMixer - trackMap map[*core.Sender]string - senderStats map[*core.Sender]time.Time - mu sync.RWMutex + conn net.Conn } func Dial(rawURL string) (*Client, error) { - cltMu.Lock() - defer cltMu.Unlock() - - if existingClient, exists := cltMap[rawURL]; exists && existingClient.conn != nil { - return existingClient, nil - } - u, err := url.Parse(rawURL) if err != nil { return nil, err @@ -204,22 +42,9 @@ func Dial(rawURL string) (*Client, error) { _ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) if _, err = conn.Write([]byte(s)); err != nil { - conn.Close() return nil, err } - resp, _ := http.ReadResponse(bufio.NewReader(conn), nil) - if resp != nil { - switch resp.StatusCode { - case 204: - conn.Close() - return nil, errors.New("DoorBird user has no api permission") - case 503: - conn.Close() - return nil, errors.New("DoorBird device is busy") - } - } - medias := []*core.Media{ { Kind: core.KindAudio, @@ -230,25 +55,17 @@ func Dial(rawURL string) (*Client, error) { }, } - client := &Client{ - Connection: core.Connection{ + return &Client{ + core.Connection{ ID: core.NewID(), FormatName: "doorbird", Protocol: "http", URL: rawURL, Medias: medias, + Transport: conn, }, - conn: conn, - mixer: NewAudioMixer(), - trackMap: make(map[*core.Sender]string), - senderStats: make(map[*core.Sender]time.Time), - } - - cltMap[rawURL] = client - - fmt.Printf("DoorBird: New connection established to %s\n", rawURL) - - return client, nil + conn, + }, nil } func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { @@ -256,206 +73,24 @@ func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, } func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { - c.mu.Lock() - defer c.mu.Unlock() - sender := core.NewSender(media, track.Codec) - trackID := fmt.Sprintf("%d", core.NewID()) - streamChan := c.mixer.AddStream(trackID) sender.Handler = func(pkt *rtp.Packet) { - if len(pkt.Payload) == 0 { - return - } - - c.mu.RLock() - conn := c.conn - c.mu.RUnlock() - - if conn != nil { - select { - case streamChan <- pkt.Payload: - c.mu.Lock() - c.senderStats[sender] = time.Now() - c.mu.Unlock() - default: - } + _ = c.conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) + if n, err := c.conn.Write(pkt.Payload); err == nil { + c.Send += n } } - c.trackMap[sender] = trackID - c.senderStats[sender] = time.Now() - - if len(c.Senders) == 0 { - go func() { - defer func() { - if r := recover(); r != nil { - } - }() - - for mixedData := range c.mixer.output { - c.mu.RLock() - conn := c.conn - c.mu.RUnlock() - - if conn != nil && len(mixedData) > 0 { - _ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) - if n, err := conn.Write(mixedData); err == nil { - c.Send += n - } else { - fmt.Printf("DoorBird: Write error, breaking audio loop: %v\n", err) - break - } - } - } - }() - } - - sender.WithParent(track).Start() + sender.HandleRTP(track) c.Senders = append(c.Senders, sender) return nil } -func (c *Client) Start() error { - if c.conn == nil { - return nil - } - - go func() { - ticker := time.NewTicker(SenderCleanupInterval) - defer ticker.Stop() - for range ticker.C { - c.cleanupOrphanedSenders() - } - }() - - // Start a heartbeat goroutine to periodically check connection health - go func() { - heartbeat := time.NewTicker(HeartbeatInterval) - defer heartbeat.Stop() - - for range heartbeat.C { - c.mu.RLock() - conn := c.conn - c.mu.RUnlock() - - if conn != nil { - // Try to write a small amount of silence to keep connection alive - silence := make([]byte, 160) // 20ms of silence at 8kHz - _ = conn.SetWriteDeadline(time.Now().Add(core.ConnDeadline)) - if _, err := conn.Write(silence); err != nil { - fmt.Printf("DoorBird: Heartbeat write failed: %v\n", err) - // Don't break here, let the main read loop handle it - } - } - } - }() - - // The main loop now just monitors for any unexpected data or connection errors - // DoorBird typically doesn't send data back, so we use a very long timeout - buf := make([]byte, 1) - connectionStart := time.Now() - for { - _ = c.conn.SetReadDeadline(time.Now().Add(ConnectionReadTimeout)) - n, err := c.conn.Read(buf) - if err != nil { - elapsed := time.Since(connectionStart) - fmt.Printf("DoorBird: Connection failed after %v, error: %v\n", elapsed, err) - c.cleanup() - cltMu.Lock() - delete(cltMap, c.URL) - cltMu.Unlock() - return err - } - if n > 0 { - fmt.Printf("DoorBird: Unexpected data received: %v\n", buf[:n]) - } - } -} - -func (c *Client) cleanup() { - c.mu.Lock() - defer c.mu.Unlock() - - fmt.Printf("DoorBird: Starting cleanup for connection %s\n", c.URL) - - if c.conn != nil { - c.conn.Close() - c.conn = nil - } - - if c.mixer != nil { - c.mixer.mu.Lock() - c.mixer.closed = true - for id, stream := range c.mixer.streams { - close(stream) - delete(c.mixer.streams, id) - } - if c.mixer.running { - close(c.mixer.output) - c.mixer.running = false - } - c.mixer.mu.Unlock() - } - - for _, sender := range c.Senders { - sender.Close() - } - c.Senders = nil - - c.trackMap = make(map[*core.Sender]string) - c.senderStats = make(map[*core.Sender]time.Time) - - cltMu.Lock() - delete(cltMap, c.URL) - cltMu.Unlock() -} - -func (c *Client) cleanupOrphanedSenders() { - c.mu.Lock() - defer c.mu.Unlock() - - now := time.Now() - removedCount := 0 - validIndex := 0 - - for i, sender := range c.Senders { - lastActivity, exists := c.senderStats[sender] - if sender.State() == "closed" || !exists || now.Sub(lastActivity) >= SenderTimeoutDuration { - if trackID, exists := c.trackMap[sender]; exists { - c.mixer.RemoveStream(trackID) - delete(c.trackMap, sender) - } - delete(c.senderStats, sender) - sender.Close() - removedCount++ - } else { - c.Senders[validIndex] = c.Senders[i] - validIndex++ - } - } - - c.Senders = c.Senders[:validIndex] - - if removedCount > 0 { - fmt.Printf("DoorBird: Cleaned up %d orphaned senders, %d remain active\n", removedCount, validIndex) - } -} - -func (c *Client) RemoveTrack(sender *core.Sender) { - c.mu.Lock() - defer c.mu.Unlock() - - if trackID, exists := c.trackMap[sender]; exists { - c.mixer.RemoveStream(trackID) - delete(c.trackMap, sender) - } - delete(c.senderStats, sender) - - for i, s := range c.Senders { - if s == sender { - c.Senders = append(c.Senders[:i], c.Senders[i+1:]...) - break - } - } +func (c *Client) Start() (err error) { + _, err = c.conn.Read(nil) + // just block until c.conn closed + b := make([]byte, 1) + _, _ = c.conn.Read(b) + return } From 94b7c33485ec29052c6521e4646de9ea6162a438 Mon Sep 17 00:00:00 2001 From: Alex X Date: Sun, 5 Oct 2025 16:00:58 +0300 Subject: [PATCH 10/10] Update backchannel.go Code refactoring for #1895 --- pkg/doorbird/backchannel.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/doorbird/backchannel.go b/pkg/doorbird/backchannel.go index 28eb5b693..4d2522283 100644 --- a/pkg/doorbird/backchannel.go +++ b/pkg/doorbird/backchannel.go @@ -88,9 +88,8 @@ func (c *Client) AddTrack(media *core.Media, codec *core.Codec, track *core.Rece } func (c *Client) Start() (err error) { - _, err = c.conn.Read(nil) // just block until c.conn closed b := make([]byte, 1) - _, _ = c.conn.Read(b) + _, err = c.conn.Read(b) return }