Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/pineconesim/ui/modules/graph.js
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ function handleNodePanelUpdate() {
"<tr><th>Dest</th><th>Peer</th></tr>" +
snekTable +
"</table>" +
"<hr><h4><u>Broadcasts Received</u></h4>" +
"<hr><h4><u>Broadcasts Received (" + broadcasts.size + ")</u></h4>" +
"<table>" +
"<tr><th>Name</th><th>Time</th></tr>" +
bcastTable +
Expand Down
3 changes: 3 additions & 0 deletions router/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ const coordsCacheMaintainInterval = time.Minute
// to send broadcast messages into the network.
const wakeupBroadcastInterval = time.Minute

// wakeupBroadcastType is the default flooding method.
const wakeupBroadcastType = FloodDefault

// broadcastExpiryPeriod is how long we'll wait to
// expire a seen broadcast.
const broadcastExpiryPeriod = wakeupBroadcastInterval * 3
Expand Down
13 changes: 4 additions & 9 deletions router/state_broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (s *state) _createBroadcastFrame() (*types.Frame, error) {
// Construct the frame.
send := getFrame()
send.Type = types.TypeWakeupBroadcast
send.Source = s._coords()
send.SourceKey = s.r.public
send.HopLimit = types.NetworkHorizonDistance
send.Payload = append(send.Payload[:0], b[:n]...)
Expand All @@ -96,8 +97,7 @@ func (s *state) _sendWakeupBroadcasts() {
if err != nil {
s.r.log.Println("Failed creating broadcast frame:", err)
}

s._flood(s.r.local, broadcast, ClassicFlood)
s._flood(s.r.local, broadcast, wakeupBroadcastType)
}

func (s *state) _handleBroadcast(p *peer, f *types.Frame) error {
Expand Down Expand Up @@ -157,13 +157,8 @@ func (s *state) _handleBroadcast(p *peer, f *types.Frame) error {
return nil
}

// Forward the broadcast to all our peers except for the peer we
// received it from.
if f.HopLimit >= types.NetworkHorizonDistance-1 {
s._flood(p, f, ClassicFlood)
} else {
s._flood(p, f, TreeFlood)
}
// Forward the broadcast onwards.
s._flood(p, f, wakeupBroadcastType)

return nil
}
34 changes: 26 additions & 8 deletions router/state_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ import (
type FloodType int

const (
ClassicFlood FloodType = iota
TreeFlood
// Send to all neighbours with a further tree dist than the source. Potential
// for duplicate deliveries but much higher % chance of successful delivery.
FloodDefault FloodType = iota

// Send only to on-tree neighbours with a further tree dist. Technically this
// eliminates duplicates but it has much weaker delivery guarantees.
FloodTreeOnly
)

// _nextHopsFor returns the next-hop for the given frame. It will examine the packet
Expand Down Expand Up @@ -166,6 +171,8 @@ func (s *state) _forward(p *peer, f *types.Frame) error {
// Tree flooding works by only sending frames to peers on the same branch.
func (s *state) _flood(from *peer, f *types.Frame, floodType FloodType) {
floodCandidates := make(map[types.PublicKey]*peer)
ourCoords := s._coords()
ourDist := ourCoords.DistanceTo(f.Source)
for _, newCandidate := range s._peers {
if newCandidate == nil || newCandidate.proto == nil || !newCandidate.started.Load() {
continue
Expand All @@ -180,15 +187,26 @@ func (s *state) _flood(from *peer, f *types.Frame, floodType FloodType) {
continue
}

if floodType == TreeFlood {
if coords, err := newCandidate._coords(); err == nil {
if coords.DistanceTo(s._coords()) != 1 {
// This peer is not directly on the same branch.
if coords, err := newCandidate._coords(); err == nil {
if coords.DistanceTo(f.Source) <= ourDist {
// Don't forward the packet to any peers who aren't
// strictly further away from the source coordinates
// than we are.
continue
}
if floodType == FloodTreeOnly {
// Only flood via on-tree paths, i.e. direct parents or
// direct children. In this mode we will pretty much get
// at-most-once delivery, but we won't flood to any peer
// which is a shortcut to another branch. Without this,
// we'll send messages via shortcuts but that may result
// in duplicates.
if !ourCoords.IsChildOf(coords) && !coords.IsChildOf(ourCoords) {
continue
}
} else {
continue
}
} else {
continue
}

if existingCandidate, ok := floodCandidates[newCandidate.public]; ok {
Expand Down
8 changes: 7 additions & 1 deletion router/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ const (
capabilityDedupedCoordinateInfo
capabilitySoftState
capabilityHybridRouting
capabilityImprovedTreeBroadcast
)

const ourVersion uint8 = 1
const ourCapabilities uint32 = capabilityLengthenedRootInterval | capabilityCryptographicSetups | capabilityDedupedCoordinateInfo | capabilitySoftState | capabilityHybridRouting
const ourCapabilities uint32 = capabilityLengthenedRootInterval |
capabilityCryptographicSetups |
capabilityDedupedCoordinateInfo |
capabilitySoftState |
capabilityHybridRouting |
capabilityImprovedTreeBroadcast
7 changes: 7 additions & 0 deletions types/coordinates.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ func (a Coordinates) DistanceTo(b Coordinates) int {
return len(a) + len(b) - 2*ancestor
}

func (a Coordinates) IsChildOf(b Coordinates) bool {
if len(a) == 0 || len(a)-1 != len(b) {
return false
}
return a[:len(a)-1].EqualTo(b)
}

func getCommonPrefix(a, b Coordinates) int {
c := 0
l := len(a)
Expand Down
12 changes: 12 additions & 0 deletions types/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func (f *Frame) CopyInto(t *Frame) {
t.Type = f.Type
t.Extra = f.Extra
t.HopLimit = f.HopLimit
t.Source = append(t.Source[:0], f.Source...)
t.Destination = append(t.Source[:0], f.Destination...)
t.DestinationKey = f.DestinationKey
t.SourceKey = f.SourceKey
t.Watermark = f.Watermark
Expand Down Expand Up @@ -134,6 +136,11 @@ func (f *Frame) MarshalBinary(buffer []byte) (int, error) {
payloadLen := len(f.Payload)
binary.BigEndian.PutUint16(buffer[offset+0:offset+2], uint16(payloadLen))
offset += 2
sn, err := f.Source.MarshalBinary(buffer[offset:])
if err != nil {
return 0, fmt.Errorf("f.Source.MarshalBinary: %w", err)
}
offset += sn
offset += copy(buffer[offset:], f.SourceKey[:ed25519.PublicKeySize])
if f.Payload != nil {
f.Payload = f.Payload[:payloadLen]
Expand Down Expand Up @@ -231,6 +238,11 @@ func (f *Frame) UnmarshalBinary(data []byte) (int, error) {
return 0, fmt.Errorf("payload length exceeds frame capacity")
}
offset += 2
srcLen, srcErr := f.Source.UnmarshalBinary(data[offset:])
if srcErr != nil {
return 0, fmt.Errorf("f.Source.UnmarshalBinary: %w", srcErr)
}
offset += srcLen
offset += copy(f.SourceKey[:], data[offset:])
f.Payload = f.Payload[:payloadLen]
offset += copy(f.Payload[:payloadLen], data[offset:])
Expand Down
Loading