diff --git a/cmd/pineconesim/ui/modules/graph.js b/cmd/pineconesim/ui/modules/graph.js index 12699289..ebe49a32 100644 --- a/cmd/pineconesim/ui/modules/graph.js +++ b/cmd/pineconesim/ui/modules/graph.js @@ -946,7 +946,7 @@ function handleNodePanelUpdate() { "DestPeer" + snekTable + "" + - "

Broadcasts Received

" + + "

Broadcasts Received (" + broadcasts.size + ")

" + "" + "" + bcastTable + diff --git a/router/consts.go b/router/consts.go index 7fddcd53..5ab5bcb1 100644 --- a/router/consts.go +++ b/router/consts.go @@ -65,6 +65,9 @@ const coordsCacheMaintainInterval = time.Minute // to send broadcast messages into the network. const wakeupBroadcastInterval = time.Minute +// wakeupBroadcastType is the default flooding method. +const wakeupBroadcastType = FloodDefault + // broadcastExpiryPeriod is how long we'll wait to // expire a seen broadcast. const broadcastExpiryPeriod = wakeupBroadcastInterval * 3 diff --git a/router/state_broadcast.go b/router/state_broadcast.go index 3ad9f6de..12fe9ca6 100644 --- a/router/state_broadcast.go +++ b/router/state_broadcast.go @@ -84,6 +84,7 @@ func (s *state) _createBroadcastFrame() (*types.Frame, error) { // Construct the frame. send := getFrame() send.Type = types.TypeWakeupBroadcast + send.Source = s._coords() send.SourceKey = s.r.public send.HopLimit = types.NetworkHorizonDistance send.Payload = append(send.Payload[:0], b[:n]...) @@ -96,8 +97,7 @@ func (s *state) _sendWakeupBroadcasts() { if err != nil { s.r.log.Println("Failed creating broadcast frame:", err) } - - s._flood(s.r.local, broadcast, ClassicFlood) + s._flood(s.r.local, broadcast, wakeupBroadcastType) } func (s *state) _handleBroadcast(p *peer, f *types.Frame) error { @@ -157,13 +157,8 @@ func (s *state) _handleBroadcast(p *peer, f *types.Frame) error { return nil } - // Forward the broadcast to all our peers except for the peer we - // received it from. - if f.HopLimit >= types.NetworkHorizonDistance-1 { - s._flood(p, f, ClassicFlood) - } else { - s._flood(p, f, TreeFlood) - } + // Forward the broadcast onwards. + s._flood(p, f, wakeupBroadcastType) return nil } diff --git a/router/state_forward.go b/router/state_forward.go index 4dbc9c26..42ab7e42 100644 --- a/router/state_forward.go +++ b/router/state_forward.go @@ -25,8 +25,13 @@ import ( type FloodType int const ( - ClassicFlood FloodType = iota - TreeFlood + // Send to all neighbours with a further tree dist than the source. Potential + // for duplicate deliveries but much higher % chance of successful delivery. + FloodDefault FloodType = iota + + // Send only to on-tree neighbours with a further tree dist. Technically this + // eliminates duplicates but it has much weaker delivery guarantees. + FloodTreeOnly ) // _nextHopsFor returns the next-hop for the given frame. It will examine the packet @@ -166,6 +171,8 @@ func (s *state) _forward(p *peer, f *types.Frame) error { // Tree flooding works by only sending frames to peers on the same branch. func (s *state) _flood(from *peer, f *types.Frame, floodType FloodType) { floodCandidates := make(map[types.PublicKey]*peer) + ourCoords := s._coords() + ourDist := ourCoords.DistanceTo(f.Source) for _, newCandidate := range s._peers { if newCandidate == nil || newCandidate.proto == nil || !newCandidate.started.Load() { continue @@ -180,15 +187,26 @@ func (s *state) _flood(from *peer, f *types.Frame, floodType FloodType) { continue } - if floodType == TreeFlood { - if coords, err := newCandidate._coords(); err == nil { - if coords.DistanceTo(s._coords()) != 1 { - // This peer is not directly on the same branch. + if coords, err := newCandidate._coords(); err == nil { + if coords.DistanceTo(f.Source) <= ourDist { + // Don't forward the packet to any peers who aren't + // strictly further away from the source coordinates + // than we are. + continue + } + if floodType == FloodTreeOnly { + // Only flood via on-tree paths, i.e. direct parents or + // direct children. In this mode we will pretty much get + // at-most-once delivery, but we won't flood to any peer + // which is a shortcut to another branch. Without this, + // we'll send messages via shortcuts but that may result + // in duplicates. + if !ourCoords.IsChildOf(coords) && !coords.IsChildOf(ourCoords) { continue } - } else { - continue } + } else { + continue } if existingCandidate, ok := floodCandidates[newCandidate.public]; ok { diff --git a/router/version.go b/router/version.go index f93ae905..75739597 100644 --- a/router/version.go +++ b/router/version.go @@ -21,7 +21,13 @@ const ( capabilityDedupedCoordinateInfo capabilitySoftState capabilityHybridRouting + capabilityImprovedTreeBroadcast ) const ourVersion uint8 = 1 -const ourCapabilities uint32 = capabilityLengthenedRootInterval | capabilityCryptographicSetups | capabilityDedupedCoordinateInfo | capabilitySoftState | capabilityHybridRouting +const ourCapabilities uint32 = capabilityLengthenedRootInterval | + capabilityCryptographicSetups | + capabilityDedupedCoordinateInfo | + capabilitySoftState | + capabilityHybridRouting | + capabilityImprovedTreeBroadcast diff --git a/types/coordinates.go b/types/coordinates.go index 11eddb81..30fbbf07 100644 --- a/types/coordinates.go +++ b/types/coordinates.go @@ -117,6 +117,13 @@ func (a Coordinates) DistanceTo(b Coordinates) int { return len(a) + len(b) - 2*ancestor } +func (a Coordinates) IsChildOf(b Coordinates) bool { + if len(a) == 0 || len(a)-1 != len(b) { + return false + } + return a[:len(a)-1].EqualTo(b) +} + func getCommonPrefix(a, b Coordinates) int { c := 0 l := len(a) diff --git a/types/frame.go b/types/frame.go index 33f92a8a..689de3ee 100644 --- a/types/frame.go +++ b/types/frame.go @@ -89,6 +89,8 @@ func (f *Frame) CopyInto(t *Frame) { t.Type = f.Type t.Extra = f.Extra t.HopLimit = f.HopLimit + t.Source = append(t.Source[:0], f.Source...) + t.Destination = append(t.Source[:0], f.Destination...) t.DestinationKey = f.DestinationKey t.SourceKey = f.SourceKey t.Watermark = f.Watermark @@ -134,6 +136,11 @@ func (f *Frame) MarshalBinary(buffer []byte) (int, error) { payloadLen := len(f.Payload) binary.BigEndian.PutUint16(buffer[offset+0:offset+2], uint16(payloadLen)) offset += 2 + sn, err := f.Source.MarshalBinary(buffer[offset:]) + if err != nil { + return 0, fmt.Errorf("f.Source.MarshalBinary: %w", err) + } + offset += sn offset += copy(buffer[offset:], f.SourceKey[:ed25519.PublicKeySize]) if f.Payload != nil { f.Payload = f.Payload[:payloadLen] @@ -231,6 +238,11 @@ func (f *Frame) UnmarshalBinary(data []byte) (int, error) { return 0, fmt.Errorf("payload length exceeds frame capacity") } offset += 2 + srcLen, srcErr := f.Source.UnmarshalBinary(data[offset:]) + if srcErr != nil { + return 0, fmt.Errorf("f.Source.UnmarshalBinary: %w", srcErr) + } + offset += srcLen offset += copy(f.SourceKey[:], data[offset:]) f.Payload = f.Payload[:payloadLen] offset += copy(f.Payload[:payloadLen], data[offset:])
NameTime