Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bwe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package bwe implements data structures that are common to all bandwidth
// estimators.
package bwe
25 changes: 25 additions & 0 deletions ecn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package bwe

// ECN represents the ECN bits of an IP packet header.
type ECN uint8

const (
// ECNNonECT signals Non ECN-Capable Transport, Non-ECT.
// nolint:misspell
ECNNonECT ECN = iota // 00

// ECNECT1 signals ECN Capable Transport, ECT(0).
// nolint:misspell
ECNECT1 // 01

// ECNECT0 signals ECN Capable Transport, ECT(1).
// nolint:misspell
ECNECT0 // 10

// ECNCE signals ECN Congestion Encountered, CE.
// nolint:misspell
ECNCE // 11
)
57 changes: 57 additions & 0 deletions gcc/arrival_group_accumulator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package gcc

import (
"time"

"github.com/pion/bwe"
)

type arrivalGroup []bwe.Packet

type arrivalGroupAccumulator struct {
next arrivalGroup
burstInterval time.Duration
maxBurstDuration time.Duration
}

func newArrivalGroupAccumulator() *arrivalGroupAccumulator {
return &arrivalGroupAccumulator{
next: make([]bwe.Packet, 0),
burstInterval: 5 * time.Millisecond,
maxBurstDuration: 100 * time.Millisecond,
}
}

func (a *arrivalGroupAccumulator) onPacketAcked(ack bwe.Packet) arrivalGroup {
if len(a.next) == 0 {
a.next = append(a.next, ack)

return nil
}

sendTimeDelta := ack.Departure.Sub(a.next[0].Departure)
if sendTimeDelta < a.burstInterval {
a.next = append(a.next, ack)

return nil
}

arrivalTimeDeltaLast := ack.Arrival.Sub(a.next[len(a.next)-1].Arrival)
arrivalTimeDeltaFirst := ack.Arrival.Sub(a.next[0].Arrival)
propagationDelta := arrivalTimeDeltaFirst - sendTimeDelta

if propagationDelta < 0 && arrivalTimeDeltaLast <= a.burstInterval && arrivalTimeDeltaFirst < a.maxBurstDuration {
a.next = append(a.next, ack)

return nil
}

group := make(arrivalGroup, len(a.next))
copy(group, a.next)
a.next = arrivalGroup{ack}

return group
}
210 changes: 210 additions & 0 deletions gcc/arrival_group_accumulator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package gcc

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestArrivalGroupAccumulator(t *testing.T) {
triggerNewGroupElement := Acknowledgment{

Check failure on line 14 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.25) / Go 1.25

undefined: Acknowledgment

Check failure on line 14 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

undefined: Acknowledgment

Check failure on line 14 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

undefined: Acknowledgment
Departure: time.Time{}.Add(time.Second),
Arrival: time.Time{}.Add(time.Second),
}
cases := []struct {
name string
log []Acknowledgment

Check failure on line 20 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.25) / Go 1.25

undefined: Acknowledgment

Check failure on line 20 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

undefined: Acknowledgment

Check failure on line 20 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

undefined: Acknowledgment
exp []arrivalGroup
}{
{
name: "emptyCreatesNoGroups",
log: []Acknowledgment{},

Check failure on line 25 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.25) / Go 1.25

undefined: Acknowledgment

Check failure on line 25 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

undefined: Acknowledgment

Check failure on line 25 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

undefined: Acknowledgment
exp: []arrivalGroup{},
},
{
name: "createsSingleElementGroup",
log: []Acknowledgment{

Check failure on line 30 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.25) / Go 1.25

undefined: Acknowledgment

Check failure on line 30 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

undefined: Acknowledgment

Check failure on line 30 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

undefined: Acknowledgment
{
Departure: time.Time{},
Arrival: time.Time{}.Add(time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{
{
{
Departure: time.Time{},
Arrival: time.Time{}.Add(time.Millisecond),
},
},
},
},
{
name: "createsTwoElementGroup",
log: []Acknowledgment{

Check failure on line 48 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.25) / Go 1.25

undefined: Acknowledgment

Check failure on line 48 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

undefined: Acknowledgment

Check failure on line 48 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

undefined: Acknowledgment
{
Departure: time.Time{},
Arrival: time.Time{}.Add(15 * time.Millisecond),
},
{
Departure: time.Time{}.Add(3 * time.Millisecond),
Arrival: time.Time{}.Add(20 * time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{{
{
Departure: time.Time{},
Arrival: time.Time{}.Add(15 * time.Millisecond),
},
{
Departure: time.Time{}.Add(3 * time.Millisecond),
Arrival: time.Time{}.Add(20 * time.Millisecond),
},
}},
},
{
name: "createsTwoArrivalGroups1",
log: []Acknowledgment{

Check failure on line 72 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.25) / Go 1.25

undefined: Acknowledgment

Check failure on line 72 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

undefined: Acknowledgment

Check failure on line 72 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

undefined: Acknowledgment
{
Departure: time.Time{},
Arrival: time.Time{}.Add(15 * time.Millisecond),
},
{
Departure: time.Time{}.Add(3 * time.Millisecond),
Arrival: time.Time{}.Add(20 * time.Millisecond),
},
{
Departure: time.Time{}.Add(9 * time.Millisecond),
Arrival: time.Time{}.Add(24 * time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{
{
{
Departure: time.Time{},
Arrival: time.Time{}.Add(15 * time.Millisecond),
},
{
Departure: time.Time{}.Add(3 * time.Millisecond),
Arrival: time.Time{}.Add(20 * time.Millisecond),
},
},
{
{
Departure: time.Time{}.Add(9 * time.Millisecond),
Arrival: time.Time{}.Add(24 * time.Millisecond),
},
},
},
},
{
name: "ignoresOutOfOrderPackets",
log: []Acknowledgment{

Check failure on line 108 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.25) / Go 1.25

undefined: Acknowledgment

Check failure on line 108 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

undefined: Acknowledgment

Check failure on line 108 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

undefined: Acknowledgment
{
Departure: time.Time{},
Arrival: time.Time{}.Add(15 * time.Millisecond),
},
{
Departure: time.Time{}.Add(6 * time.Millisecond),
Arrival: time.Time{}.Add(34 * time.Millisecond),
},
{
Departure: time.Time{}.Add(8 * time.Millisecond),
Arrival: time.Time{}.Add(30 * time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{
{
{
Departure: time.Time{},
Arrival: time.Time{}.Add(15 * time.Millisecond),
},
},
{
{
Departure: time.Time{}.Add(6 * time.Millisecond),
Arrival: time.Time{}.Add(34 * time.Millisecond),
},
{
Departure: time.Time{}.Add(8 * time.Millisecond),
Arrival: time.Time{}.Add(30 * time.Millisecond),
},
},
},
},
{
name: "newGroupBecauseOfInterDepartureTime",
log: []Acknowledgment{

Check failure on line 144 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.25) / Go 1.25

undefined: Acknowledgment

Check failure on line 144 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

undefined: Acknowledgment

Check failure on line 144 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

undefined: Acknowledgment
{
SeqNr: 0,
Departure: time.Time{},
Arrival: time.Time{}.Add(4 * time.Millisecond),
},
{
SeqNr: 1,
Departure: time.Time{}.Add(3 * time.Millisecond),
Arrival: time.Time{}.Add(4 * time.Millisecond),
},
{
SeqNr: 2,
Departure: time.Time{}.Add(6 * time.Millisecond),
Arrival: time.Time{}.Add(10 * time.Millisecond),
},
{
SeqNr: 3,
Departure: time.Time{}.Add(9 * time.Millisecond),
Arrival: time.Time{}.Add(10 * time.Millisecond),
},
triggerNewGroupElement,
},
exp: []arrivalGroup{
{
{
SeqNr: 0,

Check failure on line 170 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.25) / Go 1.25

unknown field SeqNr in struct literal of type bwe.Packet

Check failure on line 170 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

unknown field SeqNr in struct literal of type bwe.Packet

Check failure on line 170 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

unknown field SeqNr in struct literal of type bwe.Packet
Departure: time.Time{},
Arrival: time.Time{}.Add(4 * time.Millisecond),
},
{
SeqNr: 1,

Check failure on line 175 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.25) / Go 1.25

unknown field SeqNr in struct literal of type bwe.Packet

Check failure on line 175 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

unknown field SeqNr in struct literal of type bwe.Packet

Check failure on line 175 in gcc/arrival_group_accumulator_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

unknown field SeqNr in struct literal of type bwe.Packet
Departure: time.Time{}.Add(3 * time.Millisecond),
Arrival: time.Time{}.Add(4 * time.Millisecond),
},
},
{
{
SeqNr: 2,
Departure: time.Time{}.Add(6 * time.Millisecond),
Arrival: time.Time{}.Add(10 * time.Millisecond),
},
{
SeqNr: 3,
Departure: time.Time{}.Add(9 * time.Millisecond),
Arrival: time.Time{}.Add(10 * time.Millisecond),
},
},
},
},
}

for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
aga := newArrivalGroupAccumulator()
received := []arrivalGroup{}
for _, ack := range tc.log {
next := aga.onPacketAcked(ack)
if next != nil {
received = append(received, next)
}
}
assert.Equal(t, tc.exp, received)
})
}
}
90 changes: 90 additions & 0 deletions gcc/delay_rate_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package gcc

import (
"time"

"github.com/pion/bwe"
"github.com/pion/logging"
)

type delayRateController struct {
log logging.LeveledLogger
aga *arrivalGroupAccumulator
last arrivalGroup
kf *kalmanFilter
od *overuseDetector
rc *rateController
latestUsage usage
samples int
}

func newDelayRateController(initialRate int, logger logging.LeveledLogger) *delayRateController {
return &delayRateController{
log: logger,
aga: newArrivalGroupAccumulator(),
last: []bwe.Packet{},
kf: newKalmanFilter(),
od: newOveruseDetector(true),
rc: newRateController(initialRate),
latestUsage: 0,
samples: 0,
}
}

func (c *delayRateController) onPacketAcked(ack bwe.Packet) {
next := c.aga.onPacketAcked(ack)
if next == nil {
return
}
if len(next) == 0 {
// ignore empty groups, should never occur
return
}
if len(c.last) == 0 {
c.last = next

return
}

prevSize := groupSize(c.last)
nextSize := groupSize(next)
sizeDelta := nextSize - prevSize

interArrivalTime := next[len(next)-1].Arrival.Sub(c.last[len(c.last)-1].Arrival)
interDepartureTime := next[len(next)-1].Departure.Sub(c.last[len(c.last)-1].Departure)
interGroupDelay := interArrivalTime - interDepartureTime
estimate := c.kf.update(float64(interGroupDelay.Milliseconds()), float64(sizeDelta))
c.samples++
c.latestUsage = c.od.update(ack.Arrival, estimate, c.samples)
c.last = next
c.log.Tracef(
"ts=%v.%06d, seq=%v, size=%v, interArrivalTime=%v, interDepartureTime=%v, interGroupDelay=%v, estimate=%v, threshold=%v, usage=%v, state=%v", // nolint
c.last[0].Departure.UTC().Format("2006/01/02 15:04:05"),
c.last[0].Departure.UTC().Nanosecond()/1e3,
next[0].SequenceNumber,
nextSize,
interArrivalTime.Microseconds(),
interDepartureTime.Microseconds(),
interGroupDelay.Microseconds(),
estimate,
c.od.delayThreshold,
int(c.latestUsage),
int(c.rc.s),
)
}

func (c *delayRateController) update(ts time.Time, lastDeliveryRate int, rtt time.Duration) int {
return c.rc.update(ts, c.latestUsage, lastDeliveryRate, rtt)
}

func groupSize(group arrivalGroup) int {
sum := 0
for _, ack := range group {
sum += int(ack.Size)
}

return sum
}
Loading
Loading