Skip to content

Conversation

@sethyx
Copy link
Contributor

@sethyx sethyx commented Nov 17, 2025

Summary

This PR fixes critical concurrency issues, memory leaks, and goroutine lifecycle problems in core packet forwarding and Tapo client implementation.

Problems Fixed

1. Data Races in Packet Processing

  • Symptom: Deadlocks under high load with multiple consumers
  • Cause: Receiver.Input iterated over children without lock while other threads modified the slice
  • Fix: Lock-and-copy pattern for safe iteration

2. Ghost Children Accumulation

  • Symptom: Producers not stopping when consumers disconnect, memory leaks
  • Cause: Closed child nodes remained in parent's children list
  • Fix: Active ghost cleanup with cleanGhostChildren(), closed state tracking

3. Orphaned Senders

  • Symptom: Tapo streams couldn't be reused after disconnect
  • Cause: Backchannel sender kept alive with no parent reference
  • Fix: Clear sender reference on Stop()

4. Uncancellable Goroutines

  • Symptom: Tapo Handle() and producer workers stuck running after Stop()
  • Cause: No cancellation mechanism, blocking I/O couldn't be interrupted
  • Fix: Context cancellation, stop channels, monitor goroutines

See screenshot below for stuck Tapo streams that kept sending data without any consumers:

Screenshot 2025-11-17 at 20 32 08

Changes

Commit 1: Core Concurrency Fixes

  • pkg/core/track.go: Lock-and-copy in Receiver.Input, thread-safe HasSenders()
  • pkg/core/node.go: Protected parent/child access with locks
  • internal/streams/stream.go: Use new thread-safe HasSenders()

Commit 2: Ghost Children Prevention

  • pkg/core/node.go: Add closed flag, cleanGhostChildren(), prevent append to closed parent
  • pkg/core/track.go: Call ghost cleanup in HasSenders()
  • pkg/tapo/producer.go: Clear sender reference

Commit 3: Goroutine Lifecycle Management

  • pkg/tapo/client.go: Add context cancellation, monitor goroutine, safe Close()
  • pkg/tapo/backchannel.go: Context checks before operations
  • internal/streams/producer.go: Stop channel for worker termination

- Add lock-and-copy pattern in Receiver.Input to prevent data races
- Protect all Node parent/child operations with proper mutex locking
- Add thread-safe HasSenders() method

Eliminates data races.
- Add 'closed' flag and cleanGhostChildren()
- Prevent adding children to closed parents
- Clear Tapo sender reference on Stop()

Fixes memory leaks and stuck producers.
- Add context.Context to Tapo client for graceful cancellation
- Make Handle() loop cancellable with context checks
- Implement stop channel in Producer worker

Fixes stuck Tapo connections and goroutine leaks.
@sethyx
Copy link
Contributor Author

sethyx commented Nov 19, 2025

Just to add a bit more context, I've been working on this in the past two weeks and have been running a build with the final commits for several days with no issues. These bugs were not easy to troubleshoot (my test builds had tons of extra trace logs), but I could pretty easily create stuck children by rapidly enabling/disabling two-way audio in Frigate for my Tapo cameras, eg. switching back-and forth between the tapo protocol and rtsp.

This PR would likely fix strange issues like #1461, #1933, #1611.

Data races can also be checked with go run -race. There are quite a few, but I've been focusing on the core and streams packages, as well as Tapo (since I have Tapo stuff). Cherry picking a few examples:

WARNING: DATA RACE
Read at 0x00c0001f5c20 by goroutine 15:
  runtime.mapaccess2_faststr()
      /usr/local/go/src/internal/runtime/maps/runtime_faststr_swiss.go:162 +0x0
  github.com/AlexxIT/go2rtc/internal/streams.GetProducer()
      /root/go2rtc/internal/streams/handlers.go:64 +0x114
  github.com/AlexxIT/go2rtc/internal/streams.(*Producer).Dial()
      /root/go2rtc/internal/streams/producer.go:62 +0xee
  github.com/AlexxIT/go2rtc/internal/streams.(*Stream).AddConsumer()
      /root/go2rtc/internal/streams/add_consumer.go:36 +0x7f1
  github.com/AlexxIT/go2rtc/internal/rtsp.tcpHandler.func1()
      /root/go2rtc/internal/rtsp/rtsp.go:219 +0x158e
  github.com/AlexxIT/go2rtc/pkg/core.(*Listener).Fire()
      /root/go2rtc/pkg/core/listener.go:16 +0x1dab
  github.com/AlexxIT/go2rtc/pkg/rtsp.(*Conn).Accept()
      /root/go2rtc/pkg/rtsp/server.go:110 +0xb0b
  github.com/AlexxIT/go2rtc/internal/rtsp.tcpHandler()
      /root/go2rtc/internal/rtsp/rtsp.go:256 +0x3ca
  github.com/AlexxIT/go2rtc/internal/rtsp.Init.func1.gowrap1()
      /root/go2rtc/internal/rtsp/rtsp.go:76 +0x33

Previous write at 0x00c0001f5c20 by main goroutine:
  runtime.mapassign_faststr()
      /usr/local/go/src/internal/runtime/maps/runtime_faststr_swiss.go:263 +0x0
  github.com/AlexxIT/go2rtc/internal/streams.HandleFunc()
      /root/go2rtc/internal/streams/handlers.go:16 +0x47
  github.com/AlexxIT/go2rtc/internal/tapo.Init()
      /root/go2rtc/internal/tapo/tapo.go:11 +0x1c
  main.main()
      /root/go2rtc/main.go:107 +0x5e2
WARNING: DATA RACE
Read at 0x00c0000e0771 by goroutine 24:
  github.com/AlexxIT/go2rtc/pkg/rtsp.(*Conn).packetWriter.func2()
      /root/go2rtc/pkg/rtsp/consumer.go:141 +0x595
  github.com/AlexxIT/go2rtc/pkg/core.NewSender.func2()
      /root/go2rtc/pkg/core/track.go:112 +0x48
  github.com/AlexxIT/go2rtc/pkg/core.(*Sender).Start.func1()
      /root/go2rtc/pkg/core/track.go:145 +0x6a
  github.com/AlexxIT/go2rtc/pkg/core.(*Sender).Start.gowrap2()
      /root/go2rtc/pkg/core/track.go:148 +0x41

Previous write at 0x00c0000e0771 by goroutine 23:
  github.com/AlexxIT/go2rtc/pkg/rtsp.(*Conn).Accept()
      /root/go2rtc/pkg/rtsp/server.go:207 +0x16d5
  github.com/AlexxIT/go2rtc/internal/rtsp.tcpHandler()
      /root/go2rtc/internal/rtsp/rtsp.go:256 +0x3ca
  github.com/AlexxIT/go2rtc/internal/rtsp.Init.func1.gowrap1()
      /root/go2rtc/internal/rtsp/rtsp.go:76 +0x33
WARNING: DATA RACE
Write at 0x00c00021c1b8 by goroutine 15:
  github.com/AlexxIT/go2rtc/pkg/rtsp.(*Conn).Accept()
      /root/go2rtc/pkg/rtsp/server.go:170 +0x45e
  github.com/AlexxIT/go2rtc/internal/rtsp.tcpHandler()
      /root/go2rtc/internal/rtsp/rtsp.go:256 +0x3ca
  github.com/AlexxIT/go2rtc/internal/rtsp.Init.func1.gowrap1()
      /root/go2rtc/internal/rtsp/rtsp.go:76 +0x33

Previous read at 0x00c00021c1b8 by goroutine 17:
  github.com/AlexxIT/go2rtc/pkg/rtsp.(*Conn).packetWriter.func2()
      /root/go2rtc/pkg/rtsp/consumer.go:96 +0x8b
  github.com/AlexxIT/go2rtc/pkg/core.NewSender.func2()
      /root/go2rtc/pkg/core/track.go:112 +0x48
  github.com/AlexxIT/go2rtc/pkg/core.(*Sender).Start.func1()
      /root/go2rtc/pkg/core/track.go:145 +0x6a
  github.com/AlexxIT/go2rtc/pkg/core.(*Sender).Start.gowrap2()
      /root/go2rtc/pkg/core/track.go:148 +0x41

@sethyx sethyx marked this pull request as draft November 21, 2025 10:16
@AlexxIT
Copy link
Owner

AlexxIT commented Nov 23, 2025

This is a very complex and confusing part of go2rtc. I myself am confused about it and plan to rewrite it in the future.

@lcwsoft
Copy link

lcwsoft commented Nov 27, 2025

From an outsider perspective, these look to be fairly safe changes to merge and could potentially solve some odd corner case issues (like the ones mentioned by the author). I have a mix of RTSP cameras that exhibit weird issues with two way audio sometimes killing streams, and I suspect that they may fix that.

Speaking from my own dev experience, a rewrite of a portion of code that has issues is always a good thing, but its also good to address issues with the existing code that impacts users in the meantime. At the very least, it could reduce the immediate need for the improvements resulting from a rewrite.

@sethyx sethyx marked this pull request as ready for review December 3, 2025 16:31
@sethyx
Copy link
Contributor Author

sethyx commented Dec 3, 2025

Thanks for the feedback. I've updated the PR to resolve conflicts.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants