-
Notifications
You must be signed in to change notification settings - Fork 1
/
master.go
72 lines (64 loc) · 1.57 KB
/
master.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package Onyx
import (
pb "github.com/go-gems/Onyx/stream"
"github.com/google/uuid"
"google.golang.org/grpc"
"io"
"math/rand"
"net"
"time"
)
type master struct {
*actionCollection
*connectionCollection
Id string
Address string
Protocol string
}
func (masterNode *master) Connect(server pb.StreamService_ConnectServer) error {
debugPrintf("connection established : %v",masterNode.Id)
server.Send(&pb.Instruction{
From: masterNode.Id,
Action: "_",
})
for {
instruction, err := server.Recv()
if err == io.EOF || instruction == nil {
debugPrintln("empty instruction")
return nil
}
debugPrintf("instruction received : %v - %v", instruction.From, instruction.Action)
slave := masterNode.fetchOrCreate(instruction.From, masterNode.Id, server, nil)
if err != nil {
masterNode.remove(instruction.From)
debugPrintln(err)
return nil
}
if err = masterNode.do(slave, instruction); err != nil {
return err
}
}
}
func NewMaster(address string) *master {
rand.Seed(time.Now().UnixNano())
return &master{
Id: uuid.New().String(),
Address: address,
Protocol: "tcp",
connectionCollection: newConnectionCollection(),
actionCollection: newActionCollection(),
}
}
func NewDefaultMaster() *master {
return NewMaster(":50000")
}
func (masterNode *master) Serve() error {
lis, err := net.Listen(masterNode.Protocol, masterNode.Address)
if err != nil {
return err
}
s := grpc.NewServer()
pb.RegisterStreamServiceServer(s, masterNode)
debugPrintln("starting server")
return s.Serve(lis)
}