diff --git a/go.mod b/go.mod index 8056653..86e365e 100644 --- a/go.mod +++ b/go.mod @@ -5,5 +5,9 @@ go 1.15 require ( github.com/stretchr/testify v1.6.1 go.dedis.ch/dela v0.0.0-20201014124135-54b9c0717601 + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 golang.org/x/tools v0.0.0-20200904185747-39188db58858 ) + +// required until https://github.com/dedis/dela/issues/170 is fixed +replace go.dedis.ch/dela => /home/cache-nez/epfl/dedis-semester-project/dela diff --git a/go.sum b/go.sum index d00288c..739fa6b 100644 --- a/go.sum +++ b/go.sum @@ -31,6 +31,7 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.19.0 h1:hYz4ZVdUgjXTBUmrkrw55j1nHx68LfOKIQk5IYtyScg= github.com/rs/zerolog v1.19.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo= diff --git a/id/mod.go b/id/mod.go index 913250d..bbb51d4 100644 --- a/id/mod.go +++ b/id/mod.go @@ -17,6 +17,11 @@ type NodeID interface { CommonPrefixAndFirstDifferentDigit(other NodeID) (Prefix, error) } +// TODO: calculate these parameters from the number of players +func BaseAndLenFromPlayers(numPlayers int) (byte, int) { + return 16, 5 +} + type ArrayNodeID struct { id []byte base byte diff --git a/routing/example_test.go b/routing/example_test.go new file mode 100644 index 0000000..a1c7867 --- /dev/null +++ b/routing/example_test.go @@ -0,0 +1,157 @@ +package routing + +import ( + "context" + "fmt" + "time" + + "go.dedis.ch/dela/mino" + "go.dedis.ch/dela/mino/minogrpc" + "go.dedis.ch/dela/serde" +) + +func StreamN(numFollowers int, basePort uint16) { + orchestrator, err := minogrpc.NewMinogrpc(minogrpc.ParseAddress( + "127.0.0.1", basePort), NewRouter(minogrpc.NewAddressFactory())) + if err != nil { + panic("orchestrator overlay failed: " + err.Error()) + } + orchestratorRPC := mino.MustCreateRPC(orchestrator, "test", + exampleHandler{}, exampleFactory{}) + + + players := make([]*minogrpc.Minogrpc, numFollowers + 1) + players[0] = orchestrator + addresses := make([]mino.Address, numFollowers + 1) + addresses[0] = orchestrator.GetAddress() + addrToIdx := make(map[string]int) + for i := 1; i <= numFollowers; i++ { + player, err := minogrpc.NewMinogrpc(minogrpc.ParseAddress( + "127.0.0.1", basePort + uint16(i)), + NewRouter(minogrpc.NewAddressFactory())) + if err != nil { + panic("overlay " + string(rune(i)) + " failed: " + err.Error()) + } + players[i] = player + addresses[i] = player.GetAddress() + addrToIdx[player.GetAddress().String()] = i + mino.MustCreateRPC(player, "test", exampleHandler{}, exampleFactory{}) + } + + // set up certificates + for i, firstPlayer := range players { + for j, secondPlayer := range players { + if i != j { + firstPlayer.GetCertificateStore().Store( + secondPlayer.GetAddress(), secondPlayer.GetCertificate()) + } + } + } + + addrs := mino.NewAddresses(addresses...) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sender, recv, err := orchestratorRPC.Stream(ctx, addrs) + if err != nil { + panic("stream failed: " + err.Error()) + } + + msgFormatString := "Hello %d!" + for i := 1; i <= numFollowers; i++ { + msg := fmt.Sprintf(msgFormatString, i) + err = <-sender.Send(exampleMessage{value: msg}, addresses[i]) + if err != nil { + errorStr := fmt.Sprintf("failed to send to %d (%s): %s", + i, addresses[i].String(), err.Error()) + panic(errorStr) + } + } + + for i := 0; i < numFollowers; i++ { + from, msg, err := recv.Recv(ctx) + if err != nil { + panic("failed to receive: " + err.Error()) + } + idx, ok := addrToIdx[from.String()] + if !ok { + panic("received a message from unexpected address: " + from.String()) + } + expectedMsg := fmt.Sprintf(msgFormatString, idx) + receivedMsg := msg.(exampleMessage).value + if receivedMsg != expectedMsg { + panic("expected " + expectedMsg + ", received " + receivedMsg) + } + } + fmt.Println("Success") +} + +func Example_stream_one() { + StreamN(1, 2000) + // Output: Success +} + +func Example_stream_several() { + StreamN(5, 3000) + // Output: Success +} + +func Example_stream_many() { + StreamN(50, 4000) + // Output: Success +} + +// exampleHandler is an RPC handler example. +// +// - implements mino.Handler +type exampleHandler struct { + mino.UnsupportedHandler +} + +// Process implements mino.Handler. It returns the message received. +func (exampleHandler) Process(req mino.Request) (serde.Message, error) { + return req.Message, nil +} + +// Stream implements mino.Handler. It returns the message to the sender. +func (exampleHandler) Stream(sender mino.Sender, recv mino.Receiver) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + from, msg, err := recv.Recv(ctx) + if err != nil { + return err + } + + err = <-sender.Send(msg, from) + if err != nil { + return err + } + + return nil +} + +// exampleMessage is an example of a message. +// +// - implements serde.Message +type exampleMessage struct { + value string +} + +// Serialize implements serde.Message. It returns the value contained in the +// message. +func (m exampleMessage) Serialize(serde.Context) ([]byte, error) { + return []byte(m.value), nil +} + +// exampleFactory is an example of a factory. +// +// - implements serde.Factory +type exampleFactory struct{} + +// Deserialize implements serde.Factory. It returns the message using data as +// the inner value. +func (exampleFactory) Deserialize(ctx serde.Context, data []byte) (serde.Message, error) { + return exampleMessage{value: string(data)}, nil +} diff --git a/routing/table.go b/routing/table.go new file mode 100644 index 0000000..bd3f3a6 --- /dev/null +++ b/routing/table.go @@ -0,0 +1,207 @@ +package routing + +import ( + "fmt" + "github.com/dedis/student20_rabyt/id" + "github.com/dedis/student20_rabyt/routing/handshake" + "go.dedis.ch/dela/mino" + "go.dedis.ch/dela/mino/router" + "go.dedis.ch/dela/mino/router/tree/types" + "math/rand" + "time" +) + +// RoutingTable is a prefix-based routing implementation of router.RoutingTable. +// Each entry in the NextHop maps a prefix of this node's id (thisNode) plus +// a differing next digit to the address, where a packet whose destination has +// this prefix should be routed. +// Players contains the addresses of other nodes that this node is aware of and +// is used to select an alternative next hop when the connection to the one +// specified in NextHop fails. +type RoutingTable struct { + thisNode id.NodeID + thisAddress mino.Address + NextHop map[id.Prefix]mino.Address + Players []mino.Address +} + +// Router implements router.Router +type Router struct { + packetFac router.PacketFactory + hsFac router.HandshakeFactory + routingTable router.RoutingTable +} + +// NewRouter returns a new router. +func NewRouter(f mino.AddressFactory) *Router { + fac := types.NewPacketFactory(f) + hsFac := handshake.NewHandshakeFactory(f) + + r := Router{ + packetFac: fac, + hsFac: hsFac, + routingTable: nil, + } + + return &r +} + +// GetPacketFactory implements router.Router. It returns the packet factory. +func (r *Router) GetPacketFactory() router.PacketFactory { + return r.packetFac +} + +// GetHandshakeFactory implements router.Router. It returns the handshake +// factory. +func (r *Router) GetHandshakeFactory() router.HandshakeFactory { + return r.hsFac +} + +// New implements router.Router. It determines the base and length of node +// ids based on the number of players and creates the routing table for the node +// that is booting the protocol. +func (r *Router) New(players mino.Players, thisAddress mino.Address) ( + router.RoutingTable, error) { + addrs := make([]mino.Address, 0, players.Len()) + iter := players.AddressIterator() + includedThis := false + for iter.HasNext() { + currentAddr := iter.GetNext() + if currentAddr.Equal(thisAddress) { + includedThis = true + } + addrs = append(addrs, currentAddr) + } + if !includedThis { + addrs = append(addrs, thisAddress) + } + + base, length := id.BaseAndLenFromPlayers(len(addrs)) + table, err := NewTable(addrs, id.NewArrayNodeID(thisAddress, base, + length), thisAddress) + if err != nil { + return nil, err + } + r.routingTable = table + return r.routingTable, nil +} + +// GenerateTableFrom implements router.Router. It selects entries for the +// routing table from the addresses, received in the handshake. +func (r *Router) GenerateTableFrom(h router.Handshake) (router.RoutingTable, + error) { + hs := h.(handshake.Handshake) + if r.routingTable == nil { + thisId := id.NewArrayNodeID(hs.ThisAddress, hs.IdBase, hs.IdLength) + table, err := NewTable(hs.Addresses, thisId, hs.ThisAddress) + if err != nil { + return nil, err + } + r.routingTable = table + } + return r.routingTable, nil +} + +// NewTable constructs a routing table from the addresses of participating nodes. +// It requires the id of the node, for which the routing table is constructed, +// to calculate the common prefix of this node's id and other nodes' ids. +func NewTable(addresses []mino.Address, thisId id.NodeID, + thisAddress mino.Address) (router.RoutingTable, error) { + // random shuffle ensures that different nodes have different entries for + // the same prefix + randomShuffle(addresses) + + hopMap := make(map[id.Prefix]mino.Address) + for _, address := range addresses { + if address.Equal(thisAddress) { + continue + } + otherId := id.NewArrayNodeID(address, thisId.Base(), thisId.Length()) + if otherId.Equals(thisId) { + return nil, fmt.Errorf("id collision: id %s for addresses %s" + + " and %s", thisId, thisAddress.String(), address.String()) + } + prefix, err := otherId.CommonPrefixAndFirstDifferentDigit(thisId) + if err != nil { + return nil, fmt.Errorf("error when calculating common prefix of" + + " ids: %s", err.Error()) + } + if _, contains := hopMap[prefix]; !contains { + hopMap[prefix] = address + } + } + + return RoutingTable{thisId, thisAddress, hopMap, addresses}, nil +} + +func randomShuffle(addresses []mino.Address) { + rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(addresses), func(i, j int) { + addresses[i], addresses[j] = addresses[j], addresses[i] + }) +} + +// Make implements router.RoutingTable. It creates a packet with the source +// address, the destination addresses and the payload. +func (t RoutingTable) Make(src mino.Address, to []mino.Address, + msg []byte) router.Packet { + return types.NewPacket(src, msg, to...) +} + +// PrepareHandshakeFor implements router.RoutingTable. It creates a handshake +// that should be sent to the distant peer when opening a relay to it. +// The peer will then generate its own routing table based on the handshake. +func (t RoutingTable) PrepareHandshakeFor(to mino.Address) router.Handshake { + base, length := id.BaseAndLenFromPlayers(len(t.Players)) + return handshake.Handshake{IdBase: base, IdLength: length, + ThisAddress: to, Addresses: t.Players} +} + +// Forward implements router.RoutingTable. It splits the packet into multiple, +// based on the calculated next hops. +func (t RoutingTable) Forward(packet router.Packet) (router.Routes, + router.Voids) { + routes := make(router.Routes) + voids := make(router.Voids) + + for _, dest := range packet.GetDestination() { + gateway := t.GetRoute(dest) + p, ok := routes[gateway] + // A packet for this next hop hasn't been created yet, + // create it and add to routes + if !ok { + p = types.NewPacket(packet.GetSource(), packet.GetMessage()) + routes[gateway] = p + } + + p.(*types.Packet).Add(dest) + } + + return routes, voids + +} + +// GetRoute implements router.RoutingTable. It calculates the next hop for a +// given destination. +func (t RoutingTable) GetRoute(to mino.Address) mino.Address { + toId := id.NewArrayNodeID(to, t.thisNode.Base(), t.thisNode.Length()) + // Since id collisions are not expected, the only way this can happen is + // if this node is orchestrator's server side and the message is routed to + // orchestrator's client side. The only way the message can reach it is if + // it is routed to nil. + if toId.Equals(t.thisNode) && !to.Equal(t.thisAddress) { + return nil + } + // Take the common prefix of this node and destination + first differing + // digit of the destination + routingPrefix, _ := toId.CommonPrefixAndFirstDifferentDigit(t.thisNode) + return t.NextHop[routingPrefix] +} + +// OnFailure implements router.RoutingTable. It changes the next hop for a +// given destination because the provided next hop is not available. +func (t RoutingTable) OnFailure(to mino.Address) error { + // TODO: keep redundancy in the routing table, use the alternative hop + // and mark this node as unreachable + return nil +}