-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathnode.go
186 lines (157 loc) · 5.43 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
// Package node provides distributed worker pools with supervisors.
package node
import (
"encoding/gob"
"errors"
"fmt"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/rpc"
)
// init registers the *ARpc type with encoding/gob, so gob can identify the
// concrete type when an *ARpc travels inside an interface value.
func init() {
	gob.Register(new(ARpc))
}
// Names of the environment variables that toggle machine logging.
const (
// EnvAmNodeLogSupervisor is the name of the environment variable which
// enables machine logging for the node supervisor.
EnvAmNodeLogSupervisor = "AM_NODE_LOG_SUPERVISOR"
// EnvAmNodeLogClient is the name of the environment variable which enables
// machine logging for the node client.
EnvAmNodeLogClient = "AM_NODE_LOG_CLIENT"
)
// WorkerState is the string-typed name of a worker's state.
type WorkerState string
// Known WorkerState values.
const (
StateIniting WorkerState = "initing"
StateRpc WorkerState = "rpc"
StateIdle WorkerState = "idle"
StateBusy WorkerState = "busy"
StateReady WorkerState = "ready"
)
// ///// ///// /////
// ///// ERRORS
// ///// ///// /////
// sentinel errors
var (
ErrWorker = errors.New("worker error")
ErrWorkerMissing = errors.New("worker missing")
ErrWorkerHealth = errors.New("worker failed healthcheck")
ErrWorkerConn = errors.New("error starting connection")
ErrWorkerKill = errors.New("error killing worker")
ErrPool = errors.New("pool error")
ErrHeartbeat = errors.New("heartbeat failed")
ErrRpc = errors.New("rpc error")
)
// error mutations
// TODO add an event param to the remaining helpers (only AddErrWorker has one)
// AddErrWorker wraps err in the ErrWorker sentinel, adds the wrapped error to
// mach under the ssS.ErrWorker state (passing event and args along to
// EvAddErrState), and returns the wrapped error.
func AddErrWorker(
	event *am.Event, mach *am.Machine, err error, args am.A,
) error {
	wrapped := fmt.Errorf("%w: %w", ErrWorker, err)
	mach.EvAddErrState(event, ssS.ErrWorker, wrapped, args)

	return wrapped
}
// AddErrWorkerStr builds an error from msg wrapped in the ErrWorker sentinel,
// adds it to mach under the ssS.ErrWorker state, and returns it.
func AddErrWorkerStr(mach *am.Machine, msg string, args am.A) error {
	wrapped := fmt.Errorf("%w: %s", ErrWorker, msg)
	mach.AddErrState(ssS.ErrWorker, wrapped, args)

	return wrapped
}
// AddErrPool wraps err in the ErrPool sentinel, adds the wrapped error to mach
// under the ssS.ErrPool state, and returns the wrapped error.
func AddErrPool(mach *am.Machine, err error, args am.A) error {
	err = fmt.Errorf("%w: %w", ErrPool, err)
	mach.AddErrState(ssS.ErrPool, err, args)

	return err
}
// AddErrPoolStr builds an error from msg wrapped in the ErrPool sentinel, adds
// it to mach under the ssS.ErrPool state, and returns it.
func AddErrPoolStr(mach *am.Machine, msg string, args am.A) error {
	wrapped := fmt.Errorf("%w: %s", ErrPool, msg)
	mach.AddErrState(ssS.ErrPool, wrapped, args)

	return wrapped
}
// AddErrRpc wraps err in the ErrRpc sentinel, adds the wrapped error to mach,
// and returns it. Note that the error is added under the ssS.ErrNetwork state.
func AddErrRpc(mach *am.Machine, err error, args am.A) error {
	err = fmt.Errorf("%w: %w", ErrRpc, err)
	mach.AddErrState(ssS.ErrNetwork, err, args)

	return err
}
// ///// ///// /////
// ///// ARGS
// ///// ///// /////
// A is a struct for node arguments. It's a typesafe alternative to am.A.
//
// The `log` tags of the RPC-safe fields mirror those of ARpc, so every ID
// field is logged under its own key instead of all sharing "id".
type A struct {
	// Id is a machine ID.
	Id string `log:"id"`
	// PublicAddr is the public address of a Supervisor or WorkerRpc.
	PublicAddr string `log:"public_addr"`
	// LocalAddr is the local address of a Supervisor or WorkerRpc.
	LocalAddr string `log:"local_addr"`
	// BootAddr is the local address of the Bootstrap machine.
	BootAddr string `log:"boot_addr"`
	// NodesList is a list of available nodes (supervisors' public RPC
	// addresses).
	NodesList []string
	// WorkerRpcId is a machine ID of the worker RPC client.
	WorkerRpcId string `log:"worker_rpc_id"`
	// SuperRpcId is a machine ID of the super RPC client.
	SuperRpcId string `log:"super_rpc_id"`

	// non-rpc fields

	// WorkerRpc is the RPC client connected to a WorkerRpc.
	WorkerRpc *rpc.Client
	// Bootstrap is the RPC machine used to connect WorkerRpc to the Supervisor.
	Bootstrap *bootstrap
	// Dispose the worker.
	Dispose bool
	// WorkerAddr is an index for WorkerInfo.
	WorkerAddr string
	// WorkerInfo describes a worker.
	WorkerInfo *workerInfo
	// WorkersCh returns a list of workers. This channel has to be buffered.
	WorkersCh chan<- []*workerInfo
	// WorkerState is a requested state of workers, eg for listings.
	WorkerState WorkerState
}
// ARpc is a subset of A, that can be passed over RPC. It is registered with
// encoding/gob in init, produced from A by PassRpc, and converted back to A
// by ParseArgs.
type ARpc struct {
// Id is a machine ID.
Id string `log:"id"`
// PublicAddr is the public address of a Supervisor or Worker.
PublicAddr string `log:"public_addr"`
// LocalAddr is the local address of a Supervisor, Worker, or [bootstrap].
LocalAddr string `log:"local_addr"`
// BootAddr is the local address of the Bootstrap machine.
BootAddr string `log:"boot_addr"`
// NodesList is a list of available nodes (supervisors' public RPC addresses).
NodesList []string
// WorkerRpcId is a machine ID of the worker RPC client.
WorkerRpcId string `log:"worker_rpc_id"`
// SuperRpcId is a machine ID of the super RPC client.
SuperRpcId string `log:"super_rpc_id"`
}
// ParseArgs extracts A from [am.Event.Args]["am_node"].
//
// The value stored under "am_node" is handled by its dynamic type:
//   - *ARpc or ARpc: converted into a fresh A via amhelp.ArgsToArgs,
//   - non-nil *A: returned as-is,
//   - anything else (including a missing key): an empty A is returned.
func ParseArgs(args am.A) *A {
	switch v := args["am_node"].(type) {
	case *ARpc:
		return amhelp.ArgsToArgs(v, &A{})
	case ARpc:
		return amhelp.ArgsToArgs(&v, &A{})
	case *A:
		if v != nil {
			return v
		}
	}

	return &A{}
}
// Pass prepares [am.A] from A to pass to further mutations. The given pointer
// is stored as-is under the "am_node" key.
func Pass(args *A) am.A {
	out := am.A{}
	out["am_node"] = args

	return out
}
// PassRpc prepares [am.A] from A to pass over RPC. The args are first
// converted into an ARpc via amhelp.ArgsToArgs, and the result is stored under
// the "am_node" key.
func PassRpc(args *A) am.A {
	rpcArgs := amhelp.ArgsToArgs(args, &ARpc{})

	return am.A{"am_node": rpcArgs}
}
// LogArgs is an args logger for A and rpc.A. It parses args with both
// rpc.ParseArgs and ParseArgs, and merges the log maps of the two results.
// Returns nil when both parsers return nil.
func LogArgs(args am.A) map[string]string {
	rpcArgs, nodeArgs := rpc.ParseArgs(args), ParseArgs(args)
	if rpcArgs != nil || nodeArgs != nil {
		return am.AMerge(
			amhelp.ArgsToLogMap(rpcArgs), amhelp.ArgsToLogMap(nodeArgs),
		)
	}

	return nil
}