Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/rtsp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ func NewClient(uri string) *Conn {
ID: core.NewID(),
FormatName: "rtsp",
},
uri: uri,
uri: uri,
lastTimestamp: make(map[byte]uint32), // channel -> last timestamp
timestampOffset: make(map[byte]uint32), // channel -> constant timestamp offset after reconnect
lastSeq: make(map[byte]uint16), // channel -> last sequence number
seqOffset: make(map[byte]uint16), // channel -> constant seq offset after reconnect
}
}

Expand Down
40 changes: 40 additions & 0 deletions pkg/rtsp/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ type Conn struct {

udpConn []*net.UDPConn
udpAddr []*net.UDPAddr

lastTimestamp map[byte]uint32
lastSeq map[byte]uint16
timestampOffset map[byte]uint32
seqOffset map[byte]uint16
trackMu sync.Mutex
}

const (
Expand Down Expand Up @@ -293,6 +299,40 @@ func (c *Conn) handleRawPacket(channel byte, buf []byte) error {
return err
}

// Detect and fix timestamp resets after reconnect
// Some cameras (e.g., Tapo) reset RTP timestamp to 0 after reconnect,
// which causes "non monotonically increasing dts" in clients
c.trackMu.Lock()

// Calculate offset on first packet with timestamp=0 after reconnect
if packet.Timestamp == 0 && c.timestampOffset[channel] == 0 {
if lastTS := c.lastTimestamp[channel]; lastTS > 0 {
var offsetTs uint32 = 0

for _, r := range c.Receivers {
if r.ID == channel {
offsetTs = r.Codec.ClockRate / 10 // 100ms
break
}
}

c.timestampOffset[channel] = lastTS + offsetTs
}
}
if packet.SequenceNumber == 0 && c.seqOffset[channel] == 0 {
if lastSeq := c.lastSeq[channel]; lastSeq > 0 {
c.seqOffset[channel] = lastSeq + 1
}
}

packet.Timestamp += c.timestampOffset[channel]
packet.SequenceNumber += c.seqOffset[channel]

c.lastTimestamp[channel] = packet.Timestamp
c.lastSeq[channel] = packet.SequenceNumber

c.trackMu.Unlock()

for _, receiver := range c.Receivers {
if receiver.ID == channel {
receiver.WriteRTP(packet)
Expand Down
8 changes: 6 additions & 2 deletions pkg/rtsp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ func NewServer(conn net.Conn) *Conn {
Protocol: "rtsp+tcp",
RemoteAddr: conn.RemoteAddr().String(),
},
conn: conn,
reader: bufio.NewReader(conn),
conn: conn,
reader: bufio.NewReader(conn),
lastTimestamp: make(map[byte]uint32), // channel -> last timestamp
timestampOffset: make(map[byte]uint32), // channel -> constant timestamp offset after reconnect
lastSeq: make(map[byte]uint16), // channel -> last sequence number
seqOffset: make(map[byte]uint16), // channel -> constant seq offset after reconnect
}
}

Expand Down