forked from happyer/distributed-computing
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmaster_rpc.go
66 lines (61 loc) · 1.54 KB
/
master_rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package mapreduce
import (
"fmt"
"log"
"net"
"net/rpc"
"os"
)
// Shutdown is an RPC method that shuts down the Master's RPC server.
func (mr *Master) Shutdown(_, _ *struct{}) error {
debug("Shutdown: registration server\n")
close(mr.shutdown)
mr.l.Close() // causes the Accept to fail
return nil
}
// startRPCServer staarts the Master's RPC server. It continues accepting RPC
// calls (Register in particular) for as long as the worker is alive.
func (mr *Master) startRPCServer() {
rpcs := rpc.NewServer()
rpcs.Register(mr)
os.Remove(mr.address) // only needed for "unix"
l, e := net.Listen("unix", mr.address)
if e != nil {
log.Fatal("RegstrationServer", mr.address, " error: ", e)
}
mr.l = l
// now that we are listening on the master address, can fork off
// accepting connections to another thread.
go func() {
loop:
for {
select {
case <-mr.shutdown:
break loop
default:
}
conn, err := mr.l.Accept()
if err == nil {
go func() {
rpcs.ServeConn(conn)
conn.Close()
}()
} else {
debug("RegistrationServer: accept error", err)
break
}
}
debug("RegistrationServer: done\n")
}()
}
// stopRPCServer stops the master RPC server.
// This must be done through an RPC to avoid race conditions between the RPC
// server thread and the current thread.
func (mr *Master) stopRPCServer() {
var reply ShutdownReply
ok := call(mr.address, "Master.Shutdown", new(struct{}), &reply)
if ok == false {
fmt.Printf("Cleanup: RPC %s error\n", mr.address)
}
debug("cleanupRegistration: done\n")
}