Skip to content
This repository has been archived by the owner on May 10, 2021. It is now read-only.

Implement routing table computation & sketch routing #2

Open
wants to merge 13 commits into
base: handshake
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,9 @@ go 1.15
require (
github.com/stretchr/testify v1.6.1
go.dedis.ch/dela v0.0.0-20201014124135-54b9c0717601
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
golang.org/x/tools v0.0.0-20200904185747-39188db58858
)

// required until https://github.com/dedis/dela/issues/170 is fixed
replace go.dedis.ch/dela => /home/cache-nez/epfl/dedis-semester-project/dela
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.19.0 h1:hYz4ZVdUgjXTBUmrkrw55j1nHx68LfOKIQk5IYtyScg=
github.com/rs/zerolog v1.19.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo=
Expand Down
5 changes: 5 additions & 0 deletions id/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ type NodeID interface {
CommonPrefixAndFirstDifferentDigit(other NodeID) (Prefix, error)
}

// TODO: calculate these parameters from the number of players
func BaseAndLenFromPlayers(numPlayers int) (byte, int) {
return 16, 5
}

type ArrayNodeID struct {
id []byte
base byte
Expand Down
157 changes: 157 additions & 0 deletions routing/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package routing

import (
"context"
"fmt"
"time"

"go.dedis.ch/dela/mino"
"go.dedis.ch/dela/mino/minogrpc"
"go.dedis.ch/dela/serde"
)

func StreamN(numFollowers int, basePort uint16) {
orchestrator, err := minogrpc.NewMinogrpc(minogrpc.ParseAddress(
"127.0.0.1", basePort), NewRouter(minogrpc.NewAddressFactory()))
if err != nil {
panic("orchestrator overlay failed: " + err.Error())
}
orchestratorRPC := mino.MustCreateRPC(orchestrator, "test",
exampleHandler{}, exampleFactory{})


players := make([]*minogrpc.Minogrpc, numFollowers + 1)
players[0] = orchestrator
addresses := make([]mino.Address, numFollowers + 1)
addresses[0] = orchestrator.GetAddress()
addrToIdx := make(map[string]int)
for i := 1; i <= numFollowers; i++ {
player, err := minogrpc.NewMinogrpc(minogrpc.ParseAddress(
"127.0.0.1", basePort + uint16(i)),
NewRouter(minogrpc.NewAddressFactory()))
if err != nil {
panic("overlay " + string(rune(i)) + " failed: " + err.Error())
}
players[i] = player
addresses[i] = player.GetAddress()
addrToIdx[player.GetAddress().String()] = i
mino.MustCreateRPC(player, "test", exampleHandler{}, exampleFactory{})
}

// set up certificates
for i, firstPlayer := range players {
for j, secondPlayer := range players {
if i != j {
firstPlayer.GetCertificateStore().Store(
secondPlayer.GetAddress(), secondPlayer.GetCertificate())
}
}
}

addrs := mino.NewAddresses(addresses...)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sender, recv, err := orchestratorRPC.Stream(ctx, addrs)
if err != nil {
panic("stream failed: " + err.Error())
}

msgFormatString := "Hello %d!"
for i := 1; i <= numFollowers; i++ {
msg := fmt.Sprintf(msgFormatString, i)
err = <-sender.Send(exampleMessage{value: msg}, addresses[i])
if err != nil {
errorStr := fmt.Sprintf("failed to send to %d (%s): %s",
i, addresses[i].String(), err.Error())
panic(errorStr)
}
}

for i := 0; i < numFollowers; i++ {
from, msg, err := recv.Recv(ctx)
if err != nil {
panic("failed to receive: " + err.Error())
}
idx, ok := addrToIdx[from.String()]
if !ok {
panic("received a message from unexpected address: " + from.String())
}
expectedMsg := fmt.Sprintf(msgFormatString, idx)
receivedMsg := msg.(exampleMessage).value
if receivedMsg != expectedMsg {
panic("expected " + expectedMsg + ", received " + receivedMsg)
}
}
fmt.Println("Success")
}

func Example_stream_one() {
StreamN(1, 2000)
// Output: Success
}

func Example_stream_several() {
StreamN(5, 3000)
// Output: Success
}

func Example_stream_many() {
StreamN(50, 4000)
// Output: Success
}

// exampleHandler is an RPC handler example.
//
// - implements mino.Handler
type exampleHandler struct {
mino.UnsupportedHandler
}

// Process implements mino.Handler. It returns the message received.
func (exampleHandler) Process(req mino.Request) (serde.Message, error) {
return req.Message, nil
}

// Stream implements mino.Handler. It returns the message to the sender.
func (exampleHandler) Stream(sender mino.Sender, recv mino.Receiver) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

from, msg, err := recv.Recv(ctx)
if err != nil {
return err
}

err = <-sender.Send(msg, from)
if err != nil {
return err
}

return nil
}

// exampleMessage is an example of a message.
//
// - implements serde.Message
type exampleMessage struct {
value string
}

// Serialize implements serde.Message. It returns the value contained in the
// message.
func (m exampleMessage) Serialize(serde.Context) ([]byte, error) {
return []byte(m.value), nil
}

// exampleFactory is an example of a factory.
//
// - implements serde.Factory
type exampleFactory struct{}

// Deserialize implements serde.Factory. It returns the message using data as
// the inner value.
func (exampleFactory) Deserialize(ctx serde.Context, data []byte) (serde.Message, error) {
return exampleMessage{value: string(data)}, nil
}
207 changes: 207 additions & 0 deletions routing/table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package routing

import (
"fmt"
"github.com/dedis/student20_rabyt/id"
"github.com/dedis/student20_rabyt/routing/handshake"
"go.dedis.ch/dela/mino"
"go.dedis.ch/dela/mino/router"
"go.dedis.ch/dela/mino/router/tree/types"
"math/rand"
"time"
)

// RoutingTable is a prefix-based routing implementation of router.RoutingTable.
// Each entry in the NextHop maps a prefix of this node's id (thisNode) plus
// a differing next digit to the address, where a packet whose destination has
// this prefix should be routed.
// Players contains the addresses of other nodes that this node is aware of and
// is used to select an alternative next hop when the connection to the one
// specified in NextHop fails.
type RoutingTable struct {
thisNode id.NodeID
thisAddress mino.Address
NextHop map[id.Prefix]mino.Address
Players []mino.Address
}

// Router implements router.Router
type Router struct {
packetFac router.PacketFactory
hsFac router.HandshakeFactory
routingTable router.RoutingTable
}

// NewRouter returns a new router.
func NewRouter(f mino.AddressFactory) *Router {
fac := types.NewPacketFactory(f)
hsFac := handshake.NewHandshakeFactory(f)

r := Router{
packetFac: fac,
hsFac: hsFac,
routingTable: nil,
}

return &r
}

// GetPacketFactory implements router.Router. It returns the packet factory.
func (r *Router) GetPacketFactory() router.PacketFactory {
return r.packetFac
}

// GetHandshakeFactory implements router.Router. It returns the handshake
// factory.
func (r *Router) GetHandshakeFactory() router.HandshakeFactory {
return r.hsFac
}

// New implements router.Router. It determines the base and length of node
// ids based on the number of players and creates the routing table for the node
// that is booting the protocol.
func (r *Router) New(players mino.Players, thisAddress mino.Address) (
router.RoutingTable, error) {
addrs := make([]mino.Address, 0, players.Len())
iter := players.AddressIterator()
includedThis := false
for iter.HasNext() {
currentAddr := iter.GetNext()
if currentAddr.Equal(thisAddress) {
includedThis = true
}
addrs = append(addrs, currentAddr)
}
if !includedThis {
addrs = append(addrs, thisAddress)
}

base, length := id.BaseAndLenFromPlayers(len(addrs))
table, err := NewTable(addrs, id.NewArrayNodeID(thisAddress, base,
length), thisAddress)
if err != nil {
return nil, err
}
r.routingTable = table
return r.routingTable, nil
}

// GenerateTableFrom implements router.Router. It selects entries for the
// routing table from the addresses, received in the handshake.
func (r *Router) GenerateTableFrom(h router.Handshake) (router.RoutingTable,
error) {
hs := h.(handshake.Handshake)
if r.routingTable == nil {
thisId := id.NewArrayNodeID(hs.ThisAddress, hs.IdBase, hs.IdLength)
table, err := NewTable(hs.Addresses, thisId, hs.ThisAddress)
if err != nil {
return nil, err
}
r.routingTable = table
}
return r.routingTable, nil
}

// NewTable constructs a routing table from the addresses of participating nodes.
// It requires the id of the node, for which the routing table is constructed,
// to calculate the common prefix of this node's id and other nodes' ids.
func NewTable(addresses []mino.Address, thisId id.NodeID,
thisAddress mino.Address) (router.RoutingTable, error) {
// random shuffle ensures that different nodes have different entries for
// the same prefix
randomShuffle(addresses)

hopMap := make(map[id.Prefix]mino.Address)
for _, address := range addresses {
if address.Equal(thisAddress) {
continue
}
otherId := id.NewArrayNodeID(address, thisId.Base(), thisId.Length())
if otherId.Equals(thisId) {
return nil, fmt.Errorf("id collision: id %s for addresses %s" +
" and %s", thisId, thisAddress.String(), address.String())
}
prefix, err := otherId.CommonPrefixAndFirstDifferentDigit(thisId)
if err != nil {
return nil, fmt.Errorf("error when calculating common prefix of" +
" ids: %s", err.Error())
}
if _, contains := hopMap[prefix]; !contains {
hopMap[prefix] = address
}
}

return RoutingTable{thisId, thisAddress, hopMap, addresses}, nil
}

func randomShuffle(addresses []mino.Address) {
rand.Seed(time.Now().UnixNano())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this can be moved to init() so that the Seed is initialised only once

rand.Shuffle(len(addresses), func(i, j int) {
addresses[i], addresses[j] = addresses[j], addresses[i]
})
}

// Make implements router.RoutingTable. It creates a packet with the source
// address, the destination addresses and the payload.
func (t RoutingTable) Make(src mino.Address, to []mino.Address,
msg []byte) router.Packet {
return types.NewPacket(src, msg, to...)
}

// PrepareHandshakeFor implements router.RoutingTable. It creates a handshake
// that should be sent to the distant peer when opening a relay to it.
// The peer will then generate its own routing table based on the handshake.
func (t RoutingTable) PrepareHandshakeFor(to mino.Address) router.Handshake {
base, length := id.BaseAndLenFromPlayers(len(t.Players))
return handshake.Handshake{IdBase: base, IdLength: length,
ThisAddress: to, Addresses: t.Players}
}

// Forward implements router.RoutingTable. It splits the packet into multiple,
// based on the calculated next hops.
func (t RoutingTable) Forward(packet router.Packet) (router.Routes,
router.Voids) {
routes := make(router.Routes)
voids := make(router.Voids)

for _, dest := range packet.GetDestination() {
gateway := t.GetRoute(dest)
p, ok := routes[gateway]
// A packet for this next hop hasn't been created yet,
// create it and add to routes
if !ok {
p = types.NewPacket(packet.GetSource(), packet.GetMessage())
routes[gateway] = p
}

p.(*types.Packet).Add(dest)
}

return routes, voids

}

// GetRoute implements router.RoutingTable. It calculates the next hop for a
// given destination.
func (t RoutingTable) GetRoute(to mino.Address) mino.Address {
toId := id.NewArrayNodeID(to, t.thisNode.Base(), t.thisNode.Length())
// Since id collisions are not expected, the only way this can happen is
// if this node is orchestrator's server side and the message is routed to
// orchestrator's client side. The only way the message can reach it is if
// it is routed to nil.
if toId.Equals(t.thisNode) && !to.Equal(t.thisAddress) {
return nil
}
// Take the common prefix of this node and destination + first differing
// digit of the destination
routingPrefix, _ := toId.CommonPrefixAndFirstDifferentDigit(t.thisNode)
return t.NextHop[routingPrefix]
}

// OnFailure implements router.RoutingTable. It changes the next hop for a
// given destination because the provided next hop is not available.
func (t RoutingTable) OnFailure(to mino.Address) error {
// TODO: keep redundancy in the routing table, use the alternative hop
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be implemented in a separate PR.

// and mark this node as unreachable
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return nil
return xerrors.Errorf("TODO")

}