This repository has been archived by the owner on May 10, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Implement routing table computation & sketch routing #2
Open
cache-nez
wants to merge
13
commits into
handshake
Choose a base branch
from
naive-routing
base: handshake
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
2dd1cda
Implement router.Handshake and provide serialization/deserialization …
cache-nez 1ad2725
A sketch of routing implementation for Dela.
cache-nez d8963e3
Add dela's minogrpc's example_test as a simple test of this prefix-ba…
cache-nez b95ecba
Fix formatting, add comments to all public methods
cache-nez 144ed3e
Generalize the routing test to stream to several players
cache-nez 3c216f7
Compute the routing table only once.
cache-nez 564ee62
Include the current node in the set of players when constructing a ro…
cache-nez 2246ad0
Store this node's address in the routing table
cache-nez 4358ea7
Route the message to nil if the destination id is this node's id but
cache-nez bf6b97e
Improve error handling in routing: descriptive messages
cache-nez 2f3aada
Add clarifying comments to routing data structures and go.mod's replace
cache-nez ebced97
Add a test with a larger number of participants. Increase default id …
cache-nez e0fc7e3
Merge branch 'handshake' into naive-routing
cache-nez File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
package routing | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"go.dedis.ch/dela/mino" | ||
"go.dedis.ch/dela/mino/minogrpc" | ||
"go.dedis.ch/dela/serde" | ||
) | ||
|
||
func StreamN(numFollowers int, basePort uint16) { | ||
orchestrator, err := minogrpc.NewMinogrpc(minogrpc.ParseAddress( | ||
"127.0.0.1", basePort), NewRouter(minogrpc.NewAddressFactory())) | ||
if err != nil { | ||
panic("orchestrator overlay failed: " + err.Error()) | ||
} | ||
orchestratorRPC := mino.MustCreateRPC(orchestrator, "test", | ||
exampleHandler{}, exampleFactory{}) | ||
|
||
|
||
players := make([]*minogrpc.Minogrpc, numFollowers + 1) | ||
players[0] = orchestrator | ||
addresses := make([]mino.Address, numFollowers + 1) | ||
addresses[0] = orchestrator.GetAddress() | ||
addrToIdx := make(map[string]int) | ||
for i := 1; i <= numFollowers; i++ { | ||
player, err := minogrpc.NewMinogrpc(minogrpc.ParseAddress( | ||
"127.0.0.1", basePort + uint16(i)), | ||
NewRouter(minogrpc.NewAddressFactory())) | ||
if err != nil { | ||
panic("overlay " + string(rune(i)) + " failed: " + err.Error()) | ||
} | ||
players[i] = player | ||
addresses[i] = player.GetAddress() | ||
addrToIdx[player.GetAddress().String()] = i | ||
mino.MustCreateRPC(player, "test", exampleHandler{}, exampleFactory{}) | ||
} | ||
|
||
// set up certificates | ||
for i, firstPlayer := range players { | ||
for j, secondPlayer := range players { | ||
if i != j { | ||
firstPlayer.GetCertificateStore().Store( | ||
secondPlayer.GetAddress(), secondPlayer.GetCertificate()) | ||
} | ||
} | ||
} | ||
|
||
addrs := mino.NewAddresses(addresses...) | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
sender, recv, err := orchestratorRPC.Stream(ctx, addrs) | ||
if err != nil { | ||
panic("stream failed: " + err.Error()) | ||
} | ||
|
||
msgFormatString := "Hello %d!" | ||
for i := 1; i <= numFollowers; i++ { | ||
msg := fmt.Sprintf(msgFormatString, i) | ||
err = <-sender.Send(exampleMessage{value: msg}, addresses[i]) | ||
if err != nil { | ||
errorStr := fmt.Sprintf("failed to send to %d (%s): %s", | ||
i, addresses[i].String(), err.Error()) | ||
panic(errorStr) | ||
} | ||
} | ||
|
||
for i := 0; i < numFollowers; i++ { | ||
from, msg, err := recv.Recv(ctx) | ||
if err != nil { | ||
panic("failed to receive: " + err.Error()) | ||
} | ||
idx, ok := addrToIdx[from.String()] | ||
if !ok { | ||
panic("received a message from unexpected address: " + from.String()) | ||
} | ||
expectedMsg := fmt.Sprintf(msgFormatString, idx) | ||
receivedMsg := msg.(exampleMessage).value | ||
if receivedMsg != expectedMsg { | ||
panic("expected " + expectedMsg + ", received " + receivedMsg) | ||
} | ||
} | ||
fmt.Println("Success") | ||
} | ||
|
||
func Example_stream_one() { | ||
StreamN(1, 2000) | ||
// Output: Success | ||
} | ||
|
||
func Example_stream_several() { | ||
StreamN(5, 3000) | ||
// Output: Success | ||
} | ||
|
||
func Example_stream_many() { | ||
StreamN(50, 4000) | ||
// Output: Success | ||
} | ||
|
||
// exampleHandler is an RPC handler example. | ||
// | ||
// - implements mino.Handler | ||
type exampleHandler struct { | ||
mino.UnsupportedHandler | ||
} | ||
|
||
// Process implements mino.Handler. It returns the message received. | ||
func (exampleHandler) Process(req mino.Request) (serde.Message, error) { | ||
return req.Message, nil | ||
} | ||
|
||
// Stream implements mino.Handler. It returns the message to the sender. | ||
func (exampleHandler) Stream(sender mino.Sender, recv mino.Receiver) error { | ||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | ||
defer cancel() | ||
|
||
from, msg, err := recv.Recv(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = <-sender.Send(msg, from) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// exampleMessage is an example of a message. | ||
// | ||
// - implements serde.Message | ||
type exampleMessage struct { | ||
value string | ||
} | ||
|
||
// Serialize implements serde.Message. It returns the value contained in the | ||
// message. | ||
func (m exampleMessage) Serialize(serde.Context) ([]byte, error) { | ||
return []byte(m.value), nil | ||
} | ||
|
||
// exampleFactory is an example of a factory. | ||
// | ||
// - implements serde.Factory | ||
type exampleFactory struct{} | ||
|
||
// Deserialize implements serde.Factory. It returns the message using data as | ||
// the inner value. | ||
func (exampleFactory) Deserialize(ctx serde.Context, data []byte) (serde.Message, error) { | ||
return exampleMessage{value: string(data)}, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,207 @@ | ||||||
package routing | ||||||
|
||||||
import ( | ||||||
"fmt" | ||||||
"github.com/dedis/student20_rabyt/id" | ||||||
"github.com/dedis/student20_rabyt/routing/handshake" | ||||||
"go.dedis.ch/dela/mino" | ||||||
"go.dedis.ch/dela/mino/router" | ||||||
"go.dedis.ch/dela/mino/router/tree/types" | ||||||
"math/rand" | ||||||
"time" | ||||||
) | ||||||
|
||||||
// RoutingTable is a prefix-based routing implementation of router.RoutingTable. | ||||||
// Each entry in the NextHop maps a prefix of this node's id (thisNode) plus | ||||||
// a differing next digit to the address, where a packet whose destination has | ||||||
// this prefix should be routed. | ||||||
// Players contains the addresses of other nodes that this node is aware of and | ||||||
// is used to select an alternative next hop when the connection to the one | ||||||
// specified in NextHop fails. | ||||||
type RoutingTable struct { | ||||||
thisNode id.NodeID | ||||||
thisAddress mino.Address | ||||||
NextHop map[id.Prefix]mino.Address | ||||||
Players []mino.Address | ||||||
} | ||||||
|
||||||
// Router implements router.Router | ||||||
type Router struct { | ||||||
packetFac router.PacketFactory | ||||||
hsFac router.HandshakeFactory | ||||||
routingTable router.RoutingTable | ||||||
} | ||||||
|
||||||
// NewRouter returns a new router. | ||||||
func NewRouter(f mino.AddressFactory) *Router { | ||||||
fac := types.NewPacketFactory(f) | ||||||
hsFac := handshake.NewHandshakeFactory(f) | ||||||
|
||||||
r := Router{ | ||||||
packetFac: fac, | ||||||
hsFac: hsFac, | ||||||
routingTable: nil, | ||||||
} | ||||||
|
||||||
return &r | ||||||
} | ||||||
|
||||||
// GetPacketFactory implements router.Router. It returns the packet factory. | ||||||
func (r *Router) GetPacketFactory() router.PacketFactory { | ||||||
return r.packetFac | ||||||
} | ||||||
|
||||||
// GetHandshakeFactory implements router.Router. It returns the handshake | ||||||
// factory. | ||||||
func (r *Router) GetHandshakeFactory() router.HandshakeFactory { | ||||||
return r.hsFac | ||||||
} | ||||||
|
||||||
// New implements router.Router. It determines the base and length of node | ||||||
// ids based on the number of players and creates the routing table for the node | ||||||
// that is booting the protocol. | ||||||
func (r *Router) New(players mino.Players, thisAddress mino.Address) ( | ||||||
router.RoutingTable, error) { | ||||||
addrs := make([]mino.Address, 0, players.Len()) | ||||||
iter := players.AddressIterator() | ||||||
includedThis := false | ||||||
for iter.HasNext() { | ||||||
currentAddr := iter.GetNext() | ||||||
if currentAddr.Equal(thisAddress) { | ||||||
includedThis = true | ||||||
} | ||||||
addrs = append(addrs, currentAddr) | ||||||
} | ||||||
if !includedThis { | ||||||
addrs = append(addrs, thisAddress) | ||||||
} | ||||||
|
||||||
base, length := id.BaseAndLenFromPlayers(len(addrs)) | ||||||
table, err := NewTable(addrs, id.NewArrayNodeID(thisAddress, base, | ||||||
length), thisAddress) | ||||||
if err != nil { | ||||||
return nil, err | ||||||
} | ||||||
r.routingTable = table | ||||||
return r.routingTable, nil | ||||||
} | ||||||
|
||||||
// GenerateTableFrom implements router.Router. It selects entries for the | ||||||
// routing table from the addresses, received in the handshake. | ||||||
func (r *Router) GenerateTableFrom(h router.Handshake) (router.RoutingTable, | ||||||
error) { | ||||||
hs := h.(handshake.Handshake) | ||||||
if r.routingTable == nil { | ||||||
thisId := id.NewArrayNodeID(hs.ThisAddress, hs.IdBase, hs.IdLength) | ||||||
table, err := NewTable(hs.Addresses, thisId, hs.ThisAddress) | ||||||
if err != nil { | ||||||
return nil, err | ||||||
} | ||||||
r.routingTable = table | ||||||
} | ||||||
return r.routingTable, nil | ||||||
} | ||||||
|
||||||
// NewTable constructs a routing table from the addresses of participating nodes. | ||||||
// It requires the id of the node, for which the routing table is constructed, | ||||||
// to calculate the common prefix of this node's id and other nodes' ids. | ||||||
func NewTable(addresses []mino.Address, thisId id.NodeID, | ||||||
thisAddress mino.Address) (router.RoutingTable, error) { | ||||||
// random shuffle ensures that different nodes have different entries for | ||||||
// the same prefix | ||||||
randomShuffle(addresses) | ||||||
|
||||||
hopMap := make(map[id.Prefix]mino.Address) | ||||||
for _, address := range addresses { | ||||||
if address.Equal(thisAddress) { | ||||||
continue | ||||||
} | ||||||
otherId := id.NewArrayNodeID(address, thisId.Base(), thisId.Length()) | ||||||
if otherId.Equals(thisId) { | ||||||
return nil, fmt.Errorf("id collision: id %s for addresses %s" + | ||||||
" and %s", thisId, thisAddress.String(), address.String()) | ||||||
} | ||||||
prefix, err := otherId.CommonPrefixAndFirstDifferentDigit(thisId) | ||||||
if err != nil { | ||||||
return nil, fmt.Errorf("error when calculating common prefix of" + | ||||||
" ids: %s", err.Error()) | ||||||
} | ||||||
if _, contains := hopMap[prefix]; !contains { | ||||||
hopMap[prefix] = address | ||||||
} | ||||||
} | ||||||
|
||||||
return RoutingTable{thisId, thisAddress, hopMap, addresses}, nil | ||||||
} | ||||||
|
||||||
func randomShuffle(addresses []mino.Address) { | ||||||
rand.Seed(time.Now().UnixNano()) | ||||||
rand.Shuffle(len(addresses), func(i, j int) { | ||||||
addresses[i], addresses[j] = addresses[j], addresses[i] | ||||||
}) | ||||||
} | ||||||
|
||||||
// Make implements router.RoutingTable. It creates a packet with the source | ||||||
// address, the destination addresses and the payload. | ||||||
func (t RoutingTable) Make(src mino.Address, to []mino.Address, | ||||||
msg []byte) router.Packet { | ||||||
return types.NewPacket(src, msg, to...) | ||||||
} | ||||||
|
||||||
// PrepareHandshakeFor implements router.RoutingTable. It creates a handshake | ||||||
// that should be sent to the distant peer when opening a relay to it. | ||||||
// The peer will then generate its own routing table based on the handshake. | ||||||
func (t RoutingTable) PrepareHandshakeFor(to mino.Address) router.Handshake { | ||||||
base, length := id.BaseAndLenFromPlayers(len(t.Players)) | ||||||
return handshake.Handshake{IdBase: base, IdLength: length, | ||||||
ThisAddress: to, Addresses: t.Players} | ||||||
} | ||||||
|
||||||
// Forward implements router.RoutingTable. It splits the packet into multiple, | ||||||
// based on the calculated next hops. | ||||||
func (t RoutingTable) Forward(packet router.Packet) (router.Routes, | ||||||
router.Voids) { | ||||||
routes := make(router.Routes) | ||||||
voids := make(router.Voids) | ||||||
|
||||||
for _, dest := range packet.GetDestination() { | ||||||
gateway := t.GetRoute(dest) | ||||||
p, ok := routes[gateway] | ||||||
// A packet for this next hop hasn't been created yet, | ||||||
// create it and add to routes | ||||||
if !ok { | ||||||
p = types.NewPacket(packet.GetSource(), packet.GetMessage()) | ||||||
routes[gateway] = p | ||||||
} | ||||||
|
||||||
p.(*types.Packet).Add(dest) | ||||||
} | ||||||
|
||||||
return routes, voids | ||||||
|
||||||
} | ||||||
|
||||||
// GetRoute implements router.RoutingTable. It calculates the next hop for a | ||||||
// given destination. | ||||||
func (t RoutingTable) GetRoute(to mino.Address) mino.Address { | ||||||
toId := id.NewArrayNodeID(to, t.thisNode.Base(), t.thisNode.Length()) | ||||||
// Since id collisions are not expected, the only way this can happen is | ||||||
// if this node is orchestrator's server side and the message is routed to | ||||||
// orchestrator's client side. The only way the message can reach it is if | ||||||
// it is routed to nil. | ||||||
if toId.Equals(t.thisNode) && !to.Equal(t.thisAddress) { | ||||||
return nil | ||||||
} | ||||||
// Take the common prefix of this node and destination + first differing | ||||||
// digit of the destination | ||||||
routingPrefix, _ := toId.CommonPrefixAndFirstDifferentDigit(t.thisNode) | ||||||
return t.NextHop[routingPrefix] | ||||||
} | ||||||
|
||||||
// OnFailure implements router.RoutingTable. It changes the next hop for a | ||||||
// given destination because the provided next hop is not available. | ||||||
func (t RoutingTable) OnFailure(to mino.Address) error { | ||||||
// TODO: keep redundancy in the routing table, use the alternative hop | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be implemented in a separate PR. |
||||||
// and mark this node as unreachable | ||||||
return nil | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps this can be moved to
init()
so that the Seed is initialised only once