diff --git a/cmd/pineconesim/ui/modules/graph.js b/cmd/pineconesim/ui/modules/graph.js
index 12699289..ebe49a32 100644
--- a/cmd/pineconesim/ui/modules/graph.js
+++ b/cmd/pineconesim/ui/modules/graph.js
@@ -946,7 +946,7 @@ function handleNodePanelUpdate() {
"
| Dest | Peer |
" +
snekTable +
"" +
- "
Broadcasts Received
" +
+ "
Broadcasts Received (" + broadcasts.size + ")
" +
"" +
"| Name | Time |
" +
bcastTable +
diff --git a/router/consts.go b/router/consts.go
index 7fddcd53..5ab5bcb1 100644
--- a/router/consts.go
+++ b/router/consts.go
@@ -65,6 +65,9 @@ const coordsCacheMaintainInterval = time.Minute
// to send broadcast messages into the network.
const wakeupBroadcastInterval = time.Minute
+// wakeupBroadcastType is the default flooding method.
+const wakeupBroadcastType = FloodDefault
+
// broadcastExpiryPeriod is how long we'll wait to
// expire a seen broadcast.
const broadcastExpiryPeriod = wakeupBroadcastInterval * 3
diff --git a/router/state_broadcast.go b/router/state_broadcast.go
index 3ad9f6de..12fe9ca6 100644
--- a/router/state_broadcast.go
+++ b/router/state_broadcast.go
@@ -84,6 +84,7 @@ func (s *state) _createBroadcastFrame() (*types.Frame, error) {
// Construct the frame.
send := getFrame()
send.Type = types.TypeWakeupBroadcast
+ send.Source = s._coords()
send.SourceKey = s.r.public
send.HopLimit = types.NetworkHorizonDistance
send.Payload = append(send.Payload[:0], b[:n]...)
@@ -96,8 +97,7 @@ func (s *state) _sendWakeupBroadcasts() {
if err != nil {
s.r.log.Println("Failed creating broadcast frame:", err)
}
-
- s._flood(s.r.local, broadcast, ClassicFlood)
+ s._flood(s.r.local, broadcast, wakeupBroadcastType)
}
func (s *state) _handleBroadcast(p *peer, f *types.Frame) error {
@@ -157,13 +157,8 @@ func (s *state) _handleBroadcast(p *peer, f *types.Frame) error {
return nil
}
- // Forward the broadcast to all our peers except for the peer we
- // received it from.
- if f.HopLimit >= types.NetworkHorizonDistance-1 {
- s._flood(p, f, ClassicFlood)
- } else {
- s._flood(p, f, TreeFlood)
- }
+ // Forward the broadcast onwards.
+ s._flood(p, f, wakeupBroadcastType)
return nil
}
diff --git a/router/state_forward.go b/router/state_forward.go
index 4dbc9c26..42ab7e42 100644
--- a/router/state_forward.go
+++ b/router/state_forward.go
@@ -25,8 +25,13 @@ import (
type FloodType int
const (
- ClassicFlood FloodType = iota
- TreeFlood
+ // Send to all neighbours with a further tree dist than the source. Potential
+ // for duplicate deliveries but much higher % chance of successful delivery.
+ FloodDefault FloodType = iota
+
+ // Send only to on-tree neighbours with a further tree dist. Technically this
+ // eliminates duplicates but it has much weaker delivery guarantees.
+ FloodTreeOnly
)
// _nextHopsFor returns the next-hop for the given frame. It will examine the packet
@@ -166,6 +171,8 @@ func (s *state) _forward(p *peer, f *types.Frame) error {
// Tree flooding works by only sending frames to peers on the same branch.
func (s *state) _flood(from *peer, f *types.Frame, floodType FloodType) {
floodCandidates := make(map[types.PublicKey]*peer)
+ ourCoords := s._coords()
+ ourDist := ourCoords.DistanceTo(f.Source)
for _, newCandidate := range s._peers {
if newCandidate == nil || newCandidate.proto == nil || !newCandidate.started.Load() {
continue
@@ -180,15 +187,26 @@ func (s *state) _flood(from *peer, f *types.Frame, floodType FloodType) {
continue
}
- if floodType == TreeFlood {
- if coords, err := newCandidate._coords(); err == nil {
- if coords.DistanceTo(s._coords()) != 1 {
- // This peer is not directly on the same branch.
+ if coords, err := newCandidate._coords(); err == nil {
+ if coords.DistanceTo(f.Source) <= ourDist {
+ // Don't forward the packet to any peers who aren't
+ // strictly further away from the source coordinates
+ // than we are.
+ continue
+ }
+ if floodType == FloodTreeOnly {
+ // Only flood via on-tree paths, i.e. direct parents or
+ // direct children. In this mode we will pretty much get
+ // at-most-once delivery, but we won't flood to any peer
+ // which is a shortcut to another branch. Without this,
+ // we'll send messages via shortcuts but that may result
+ // in duplicates.
+ if !ourCoords.IsChildOf(coords) && !coords.IsChildOf(ourCoords) {
continue
}
- } else {
- continue
}
+ } else {
+ continue
}
if existingCandidate, ok := floodCandidates[newCandidate.public]; ok {
diff --git a/router/version.go b/router/version.go
index f93ae905..75739597 100644
--- a/router/version.go
+++ b/router/version.go
@@ -21,7 +21,13 @@ const (
capabilityDedupedCoordinateInfo
capabilitySoftState
capabilityHybridRouting
+ capabilityImprovedTreeBroadcast
)
const ourVersion uint8 = 1
-const ourCapabilities uint32 = capabilityLengthenedRootInterval | capabilityCryptographicSetups | capabilityDedupedCoordinateInfo | capabilitySoftState | capabilityHybridRouting
+const ourCapabilities uint32 = capabilityLengthenedRootInterval |
+ capabilityCryptographicSetups |
+ capabilityDedupedCoordinateInfo |
+ capabilitySoftState |
+ capabilityHybridRouting |
+ capabilityImprovedTreeBroadcast
diff --git a/types/coordinates.go b/types/coordinates.go
index 11eddb81..30fbbf07 100644
--- a/types/coordinates.go
+++ b/types/coordinates.go
@@ -117,6 +117,13 @@ func (a Coordinates) DistanceTo(b Coordinates) int {
return len(a) + len(b) - 2*ancestor
}
+func (a Coordinates) IsChildOf(b Coordinates) bool {
+ if len(a) == 0 || len(a)-1 != len(b) {
+ return false
+ }
+ return a[:len(a)-1].EqualTo(b)
+}
+
func getCommonPrefix(a, b Coordinates) int {
c := 0
l := len(a)
diff --git a/types/frame.go b/types/frame.go
index 33f92a8a..689de3ee 100644
--- a/types/frame.go
+++ b/types/frame.go
@@ -89,6 +89,8 @@ func (f *Frame) CopyInto(t *Frame) {
t.Type = f.Type
t.Extra = f.Extra
t.HopLimit = f.HopLimit
+ t.Source = append(t.Source[:0], f.Source...)
+ t.Destination = append(t.Source[:0], f.Destination...)
t.DestinationKey = f.DestinationKey
t.SourceKey = f.SourceKey
t.Watermark = f.Watermark
@@ -134,6 +136,11 @@ func (f *Frame) MarshalBinary(buffer []byte) (int, error) {
payloadLen := len(f.Payload)
binary.BigEndian.PutUint16(buffer[offset+0:offset+2], uint16(payloadLen))
offset += 2
+ sn, err := f.Source.MarshalBinary(buffer[offset:])
+ if err != nil {
+ return 0, fmt.Errorf("f.Source.MarshalBinary: %w", err)
+ }
+ offset += sn
offset += copy(buffer[offset:], f.SourceKey[:ed25519.PublicKeySize])
if f.Payload != nil {
f.Payload = f.Payload[:payloadLen]
@@ -231,6 +238,11 @@ func (f *Frame) UnmarshalBinary(data []byte) (int, error) {
return 0, fmt.Errorf("payload length exceeds frame capacity")
}
offset += 2
+ srcLen, srcErr := f.Source.UnmarshalBinary(data[offset:])
+ if srcErr != nil {
+ return 0, fmt.Errorf("f.Source.UnmarshalBinary: %w", srcErr)
+ }
+ offset += srcLen
offset += copy(f.SourceKey[:], data[offset:])
f.Payload = f.Payload[:payloadLen]
offset += copy(f.Payload[:payloadLen], data[offset:])