Skip to content

Commit 2f577d8

Browse files
committed
Add connection statistic upgrade
Interval based statistics are now available. You have to provide a Statistics object to the connection. This is a breaking change. The data will be used to calculate the interval based statistics. This allows you to apply your own interval. Also the statistics object has now 3 sub-object for the accumulated, interval, and instantaneous values.
1 parent 655cdc1 commit 2f577d8

File tree

8 files changed

+350
-194
lines changed

8 files changed

+350
-194
lines changed

connection.go

Lines changed: 111 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type Conn interface {
5353
StreamId() string
5454

5555
// Stats returns accumulated and instantaneous statistics of the connection.
56-
Stats() Statistics
56+
Stats(s *Statistics)
5757
}
5858

5959
type connStats struct {
@@ -250,6 +250,7 @@ func newSRTConn(config srtConnConfig) *srtConn {
250250

251251
c.snd = congestion.NewLiveSend(congestion.SendConfig{
252252
InitialSequenceNumber: c.initialPacketSequenceNumber,
253+
DropThreshold: c.dropThreshold,
253254
MaxBW: c.config.MaxBW,
254255
InputBW: c.config.InputBW,
255256
MinInputBW: c.config.MinInputBW,
@@ -321,7 +322,7 @@ func (c *srtConn) ticker(ctx context.Context) {
321322
tickTime := uint64(t.Sub(c.start).Microseconds())
322323

323324
c.recv.Tick(c.tsbpdTimeBase + tickTime)
324-
c.snd.Tick(tickTime, c.dropThreshold)
325+
c.snd.Tick(tickTime)
325326
}
326327
}
327328
}
@@ -915,10 +916,10 @@ func (c *srtConn) sendACK(seq circular.Number, lite bool) {
915916

916917
cif.RTT = uint32(c.rtt)
917918
cif.RTTVar = uint32(c.rttVar)
918-
cif.AvailableBufferSize = c.config.FC // TODO: available buffer size (packets)
919-
cif.PacketsReceivingRate = pps // packets receiving rate (packets/s)
920-
cif.EstimatedLinkCapacity = 0 // estimated link capacity (packets/s), not relevant for live mode
921-
cif.ReceivingRate = 0 // receiving rate (bytes/s), not relevant for live mode
919+
cif.AvailableBufferSize = c.config.FC // TODO: available buffer size (packets)
920+
cif.PacketsReceivingRate = uint32(pps) // packets receiving rate (packets/s)
921+
cif.EstimatedLinkCapacity = 0 // estimated link capacity (packets/s), not relevant for live mode
922+
cif.ReceivingRate = 0 // receiving rate (bytes/s), not relevant for live mode
922923

923924
p.Header().TypeSpecific = c.nextACKNumber.Val()
924925

@@ -1059,65 +1060,113 @@ func (c *srtConn) SetDeadline(t time.Time) error { return nil }
10591060
func (c *srtConn) SetReadDeadline(t time.Time) error { return nil }
10601061
func (c *srtConn) SetWriteDeadline(t time.Time) error { return nil }
10611062

1062-
func (c *srtConn) Stats() Statistics {
1063+
func (c *srtConn) Stats(s *Statistics) {
1064+
now := uint64(time.Since(c.start).Milliseconds())
1065+
10631066
send := c.snd.Stats()
10641067
recv := c.recv.Stats()
10651068

1066-
s := Statistics{
1067-
MsTimeStamp: uint64(time.Since(c.start).Milliseconds()),
1068-
1069-
// Accumulated
1070-
PktSent: send.Pkt,
1071-
PktRecv: recv.Pkt,
1072-
PktSentUnique: send.PktUnique,
1073-
PktRecvUnique: recv.PktUnique,
1074-
PktSndLoss: send.PktLoss,
1075-
PktRcvLoss: recv.PktLoss,
1076-
PktRetrans: send.PktRetrans,
1077-
PktRcvRetrans: recv.PktRetrans,
1078-
PktSentACK: c.statistics.pktSentACK,
1079-
PktRecvACK: c.statistics.pktRecvACK,
1080-
PktSentNAK: c.statistics.pktSentNAK,
1081-
PktRecvNAK: c.statistics.pktRecvNAK,
1082-
PktSentKM: c.statistics.pktSentKM,
1083-
PktRecvKM: c.statistics.pktRecvKM,
1084-
UsSndDuration: send.UsSndDuration,
1085-
PktSndDrop: send.PktDrop,
1086-
PktRcvDrop: recv.PktDrop,
1087-
PktRcvUndecrypt: c.statistics.pktRecvUndecrypt,
1088-
ByteSent: send.Byte + (send.Pkt * c.statistics.headerSize),
1089-
ByteRecv: recv.Byte + (recv.Pkt * c.statistics.headerSize),
1090-
ByteSentUnique: send.ByteUnique + (send.PktUnique * c.statistics.headerSize),
1091-
ByteRecvUnique: recv.ByteUnique + (recv.PktUnique * c.statistics.headerSize),
1092-
ByteRcvLoss: recv.ByteLoss + (recv.PktLoss * c.statistics.headerSize),
1093-
ByteRetrans: send.ByteRetrans + (send.PktRetrans * c.statistics.headerSize),
1094-
ByteRcvRetrans: recv.ByteRetrans + (recv.PktRetrans * c.statistics.headerSize),
1095-
ByteSndDrop: send.ByteDrop + (send.PktDrop * c.statistics.headerSize),
1096-
ByteRcvDrop: recv.ByteDrop + (recv.PktDrop * c.statistics.headerSize),
1097-
ByteRcvUndecrypt: c.statistics.byteRecvUndecrypt + (c.statistics.pktRecvUndecrypt * c.statistics.headerSize),
1098-
1099-
// Instantaneous
1100-
UsPktSndPeriod: send.UsPktSndPeriod,
1101-
PktFlowWindow: uint64(c.config.FC),
1102-
PktFlightSize: send.PktFlightSize,
1103-
MsRTT: c.rtt / 1_000,
1104-
MbpsBandwidth: 0,
1105-
MbpsLinkCapacity: 0,
1106-
ByteAvailSndBuf: 0,
1107-
ByteAvailRcvBuf: 0,
1108-
MbpsMaxBW: float64(c.config.MaxBW / 1024 / 1024),
1109-
ByteMSS: uint64(c.config.MSS),
1110-
PktSndBuf: send.PktBuf,
1111-
ByteSndBuf: send.ByteBuf,
1112-
MsSndBuf: send.MsBuf,
1113-
MsSndTsbPdDelay: c.peerTsbpdDelay / 1000,
1114-
PktRcvBuf: recv.PktBuf,
1115-
ByteRcvBuf: recv.ByteBuf,
1116-
MsRcvBuf: recv.MsBuf,
1117-
MsRcvTsbPdDelay: c.tsbpdDelay / 1000,
1118-
PktReorderTolerance: 0,
1119-
PktRcvAvgBelatedTime: 0,
1069+
previous := s.Accumulated
1070+
interval := now - s.MsTimeStamp
1071+
1072+
// Accumulated
1073+
s.Accumulated = StatisticsAccumulated{
1074+
PktSent: send.Pkt,
1075+
PktRecv: recv.Pkt,
1076+
PktSentUnique: send.PktUnique,
1077+
PktRecvUnique: recv.PktUnique,
1078+
PktSendLoss: send.PktLoss,
1079+
PktRecvLoss: recv.PktLoss,
1080+
PktRetrans: send.PktRetrans,
1081+
PktRecvRetrans: recv.PktRetrans,
1082+
PktSentACK: c.statistics.pktSentACK,
1083+
PktRecvACK: c.statistics.pktRecvACK,
1084+
PktSentNAK: c.statistics.pktSentNAK,
1085+
PktRecvNAK: c.statistics.pktRecvNAK,
1086+
PktSentKM: c.statistics.pktSentKM,
1087+
PktRecvKM: c.statistics.pktRecvKM,
1088+
UsSndDuration: send.UsSndDuration,
1089+
PktSendDrop: send.PktDrop,
1090+
PktRecvDrop: recv.PktDrop,
1091+
PktRecvUndecrypt: c.statistics.pktRecvUndecrypt,
1092+
ByteSent: send.Byte + (send.Pkt * c.statistics.headerSize),
1093+
ByteRecv: recv.Byte + (recv.Pkt * c.statistics.headerSize),
1094+
ByteSentUnique: send.ByteUnique + (send.PktUnique * c.statistics.headerSize),
1095+
ByteRecvUnique: recv.ByteUnique + (recv.PktUnique * c.statistics.headerSize),
1096+
ByteRecvLoss: recv.ByteLoss + (recv.PktLoss * c.statistics.headerSize),
1097+
ByteRetrans: send.ByteRetrans + (send.PktRetrans * c.statistics.headerSize),
1098+
ByteRecvRetrans: recv.ByteRetrans + (recv.PktRetrans * c.statistics.headerSize),
1099+
ByteSendDrop: send.ByteDrop + (send.PktDrop * c.statistics.headerSize),
1100+
ByteRecvDrop: recv.ByteDrop + (recv.PktDrop * c.statistics.headerSize),
1101+
ByteRecvUndecrypt: c.statistics.byteRecvUndecrypt + (c.statistics.pktRecvUndecrypt * c.statistics.headerSize),
1102+
}
1103+
1104+
// Interval
1105+
s.Interval = StatisticsInterval{
1106+
MsInterval: interval,
1107+
PktSent: s.Accumulated.PktSent - previous.PktSent,
1108+
PktRecv: s.Accumulated.PktRecv - previous.PktRecv,
1109+
PktSentUnique: s.Accumulated.PktSentUnique - previous.PktSentUnique,
1110+
PktRecvUnique: s.Accumulated.PktRecvUnique - previous.PktRecvUnique,
1111+
PktSendLoss: s.Accumulated.PktSendLoss - previous.PktSendLoss,
1112+
PktRecvLoss: s.Accumulated.PktRecvLoss - previous.PktRecvLoss,
1113+
PktRetrans: s.Accumulated.PktRetrans - previous.PktRetrans,
1114+
PktRecvRetrans: s.Accumulated.PktRecvRetrans - previous.PktRecvRetrans,
1115+
PktSentACK: s.Accumulated.PktSentACK - previous.PktSentACK,
1116+
PktRecvACK: s.Accumulated.PktRecvACK - previous.PktRecvACK,
1117+
PktSentNAK: s.Accumulated.PktSentNAK - previous.PktSentNAK,
1118+
PktRecvNAK: s.Accumulated.PktRecvNAK - previous.PktRecvNAK,
1119+
MbpsSendRate: float64(s.Accumulated.ByteSent-previous.ByteSent) * 8 / 1024 / 1024 / (float64(interval) / 1000),
1120+
MbpsRecvRate: float64(s.Accumulated.ByteRecv-previous.ByteRecv) * 8 / 1024 / 1024 / (float64(interval) / 1000),
1121+
UsSndDuration: s.Accumulated.UsSndDuration - previous.UsSndDuration,
1122+
PktReorderDistance: 0,
1123+
PktRecvBelated: s.Accumulated.PktRecvBelated - previous.PktRecvBelated,
1124+
PktSndDrop: s.Accumulated.PktSendDrop - previous.PktSendDrop,
1125+
PktRecvDrop: s.Accumulated.PktRecvDrop - previous.PktRecvDrop,
1126+
PktRecvUndecrypt: s.Accumulated.PktRecvUndecrypt - previous.PktRecvUndecrypt,
1127+
ByteSent: s.Accumulated.ByteSent - previous.ByteSent,
1128+
ByteRecv: s.Accumulated.ByteRecv - previous.ByteRecv,
1129+
ByteSentUnique: s.Accumulated.ByteSentUnique - previous.ByteSentUnique,
1130+
ByteRecvUnique: s.Accumulated.ByteRecvUnique - previous.ByteRecvUnique,
1131+
ByteRecvLoss: s.Accumulated.ByteRecvLoss - previous.ByteRecvLoss,
1132+
ByteRetrans: s.Accumulated.ByteRetrans - previous.ByteRetrans,
1133+
ByteRecvRetrans: s.Accumulated.ByteRecvRetrans - previous.ByteRecvRetrans,
1134+
ByteRecvBelated: s.Accumulated.ByteRecvBelated - previous.ByteRecvBelated,
1135+
ByteSendDrop: s.Accumulated.ByteSendDrop - previous.ByteSendDrop,
1136+
ByteRecvDrop: s.Accumulated.ByteRecvDrop - previous.ByteRecvDrop,
1137+
ByteRecvUndecrypt: s.Accumulated.ByteRecvUndecrypt - previous.ByteRecvUndecrypt,
1138+
}
1139+
1140+
// Instantaneous
1141+
s.Instantaneous = StatisticsInstantaneous{
1142+
UsPktSendPeriod: send.UsPktSndPeriod,
1143+
PktFlowWindow: uint64(c.config.FC),
1144+
PktFlightSize: send.PktFlightSize,
1145+
MsRTT: c.rtt / 1000,
1146+
MbpsSentRate: send.MbpsEstimatedSentBandwidth,
1147+
MbpsRecvRate: recv.MbpsEstimatedRecvBandwidth,
1148+
MbpsLinkCapacity: 0,
1149+
ByteAvailSendBuf: 0, // unlimited
1150+
ByteAvailRecvBuf: 0, // unlimited
1151+
MbpsMaxBW: float64(c.config.MaxBW) / 1024 / 1024,
1152+
ByteMSS: uint64(c.config.MSS),
1153+
PktSendBuf: send.PktBuf,
1154+
ByteSendBuf: send.ByteBuf,
1155+
MsSendBuf: send.MsBuf,
1156+
MsSendTsbPdDelay: c.peerTsbpdDelay / 1000,
1157+
PktRecvBuf: recv.PktBuf,
1158+
ByteRecvBuf: recv.ByteBuf,
1159+
MsRecvBuf: recv.MsBuf,
1160+
MsRecvTsbPdDelay: c.tsbpdDelay / 1000,
1161+
PktReorderTolerance: uint64(c.config.LossMaxTTL),
1162+
PktRecvAvgBelatedTime: 0,
1163+
PktSendLossRate: send.PktLossRate,
1164+
PktRecvLossRate: recv.PktLossRate,
1165+
}
1166+
1167+
if c.config.MaxBW < 0 {
1168+
s.Instantaneous.MbpsMaxBW = -1
11201169
}
11211170

1122-
return s
1171+
s.MsTimeStamp = now
11231172
}

contrib/client/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ func main() {
132132
r.Close()
133133

134134
if srtconn, ok := r.(srt.Conn); ok {
135-
stats := srtconn.Stats()
135+
stats := &srt.Statistics{}
136+
srtconn.Stats(stats)
136137

137138
fmt.Fprintf(os.Stderr, "%+v\n", stats)
138139
}

contrib/server/main.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,10 @@ func (s *server) handlePublish(conn srt.Conn) {
236236

237237
s.log("PUBLISH", "STOP", u.Path, "", client)
238238

239-
fmt.Fprintf(os.Stderr, "%+v\n", conn.Stats())
239+
stats := &srt.Statistics{}
240+
conn.Stats(stats)
241+
242+
fmt.Fprintf(os.Stderr, "%+v\n", stats)
240243

241244
conn.Close()
242245
}
@@ -264,7 +267,10 @@ func (s *server) handleSubscribe(conn srt.Conn) {
264267

265268
s.log("SUBSCRIBE", "STOP", u.Path, "", client)
266269

267-
fmt.Fprintf(os.Stderr, "%+v\n", conn.Stats())
270+
stats := &srt.Statistics{}
271+
conn.Stats(stats)
272+
273+
fmt.Fprintf(os.Stderr, "%+v\n", stats)
268274

269275
conn.Close()
270276
}

dial.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,7 @@ func (dl *dialer) writePacket(p packet.Packet) error {
702702
func (dl *dialer) SetDeadline(t time.Time) error { return dl.conn.SetDeadline(t) }
703703
func (dl *dialer) SetReadDeadline(t time.Time) error { return dl.conn.SetReadDeadline(t) }
704704
func (dl *dialer) SetWriteDeadline(t time.Time) error { return dl.conn.SetWriteDeadline(t) }
705-
func (dl *dialer) Stats() Statistics { return dl.conn.Stats() }
705+
func (dl *dialer) Stats(s *Statistics) { dl.conn.Stats(s) }
706706

707707
func (dl *dialer) log(topic string, message func() string) {
708708
dl.config.Logger.Print(topic, dl.socketId, 2, message)

internal/congestion/congestion.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
// SendConfig is the configuration for the liveSend congestion control
1010
type SendConfig struct {
1111
InitialSequenceNumber circular.Number
12+
DropThreshold uint64
1213
MaxBW int64
1314
InputBW int64
1415
MinInputBW int64
@@ -21,9 +22,10 @@ type Sender interface {
2122
Stats() SendStats
2223
Flush()
2324
Push(p packet.Packet)
24-
Tick(now, dropThreshold uint64)
25+
Tick(now uint64)
2526
ACK(sequenceNumber circular.Number)
2627
NAK(sequenceNumbers []circular.Number)
28+
SetDropThreshold(threshold uint64)
2729
}
2830

2931
// ReceiveConfig is the configuration for the liveResv congestion control
@@ -39,7 +41,7 @@ type ReceiveConfig struct {
3941
// Receiver is the receiving part of the congestion control
4042
type Receiver interface {
4143
Stats() ReceiveStats
42-
PacketRate() (pps, bps uint32)
44+
PacketRate() (pps, bps float64)
4345
Flush()
4446
Push(pkt packet.Packet)
4547
Tick(now uint64)
@@ -74,6 +76,11 @@ type SendStats struct {
7476

7577
UsPktSndPeriod float64 // microseconds
7678
BytePayload uint64
79+
80+
MbpsEstimatedInputBandwidth float64
81+
MbpsEstimatedSentBandwidth float64
82+
83+
PktLossRate float64
7784
}
7885

7986
// ReceiveStats are collected statistics from liveRecv
@@ -102,4 +109,8 @@ type ReceiveStats struct {
102109
MsBuf uint64
103110

104111
BytePayload uint64
112+
113+
MbpsEstimatedRecvBandwidth float64
114+
115+
PktLossRate float64
105116
}

0 commit comments

Comments
 (0)