forked from barko/dawg
-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.ml
523 lines (443 loc) · 15.4 KB
/
worker.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
(* server which computes best splits on behalf of clients *)
let port = 60_287
(* if we can't bind to this port, we concluded that another instance
of this server is running on the same host; only one such instance
may run on each host, so we refuse to start another on another
port *)
let sp = Printf.sprintf
let create () =
LP_tcp.Server.create port
(* serialize and send outgoing message *)
let send t peer m =
let s = Proto_b.string_of_from_worker m in
LP_tcp.Server.send t peer (Some s)
(* deserialize (parse) incoming message *)
let recv srv =
lwt peer, event = LP_tcp.Server.recv srv in
let event =
match event with
| `Connect -> `Connect
| `Disconnect -> `Disconnect
| `Message s -> `Message (Proto_b.to_worker_of_string s)
in
Lwt.return (peer, event)
let is_sleeping thr =
match Lwt.state thr with
| Lwt.Sleep
| Lwt.Return _ -> false
| Lwt.Fail _ -> assert false
let nchoose_fold f threads x0 =
lwt results = Lwt.nchoose threads in
let sleeping_threads = List.filter is_sleeping threads in
let rec loop x threads_accu = function
| [] -> Lwt.return (x, List.flatten threads_accu)
| h :: t ->
lwt x, threads = f x h in
loop x (threads :: threads_accu) t
in
loop x0 [sleeping_threads] results
type ba = bool array
type subset_list = [
| `S of (ba * subset_list)
| `LR of (ba * ba * subset_list)
| `N
]
type eval = (Dog_t.feature_id -> Feat.afeature) ->
Model_t.l_tree -> float array
module Configured = struct
type t = {
task_id : Proto_t.task_id;
y_feature_id : Proto_t.feature_id;
fold_feature_id_opt : Proto_t.feature_id option;
splitter : Loss.splitter;
feature_map : D_feat_map.t;
sampler : Sampler.t;
fold : int array;
eval : eval;
num_observations : int;
}
end
module Learning = struct
type t = {
task_id : Proto_t.task_id;
y_feature_id : Proto_t.feature_id;
fold_feature_id_opt : Proto_t.feature_id option;
splitter : Loss.splitter;
feature_map : D_feat_map.t;
sampler : Sampler.t;
fold : int array;
eval : eval;
num_observations : int;
fold_set : bool array;
subsets : subset_list;
}
end
type state = [
| `Available
(* worker is free to do work for any master that cares for its services *)
| `Acquired of Proto_t.task_id
(* configured is setting up the task *)
| `Configured of Configured.t
(* worker has successfully setup the task; that means
it has at least the target (y) feature, and the fold
feature (if one is required) *)
| `Learning of Learning.t
]
type t = {
srv : LP_tcp.Server.t;
worker_id : string;
(* path of directory that we use to read and write files in the
process of working on tasks *)
dot_dawg : string;
user : string;
state : state;
}
let rec service t threads =
lwt t, threads = nchoose_fold react threads t in
service t threads
and react t = function
| peer, `Connect -> Lwt.return (t, [recv t.srv])
| peer, `Disconnect -> Lwt.return (t, [recv t.srv])
| peer, `Message msg -> react_msg t peer msg
and react_msg t peer = function
| `Id ->
let ack_id = `AckId { Proto_b.worker_id = t.worker_id; user = t.user } in
lwt () = send t.srv peer ack_id in
Lwt.return (t, [recv t.srv])
| `InformPeerHosts _ -> assert false
| `Acquire task_id -> (
match t.state with
| `Available ->
let t, response = react_available t task_id in
lwt () = send t.srv peer response in
Lwt.return (t, [recv t.srv])
| `Acquired _
| `Configured _
| `Learning _ ->
let response = `AckAcquire false in (* not available *)
lwt () = send t.srv peer response in
Lwt.return (t, [recv t.srv])
)
| `Acquired (s_task_id, acquired_msg) -> (
match t.state with
| `Acquired task_id ->
if s_task_id = task_id then
let t, response = react_acquired t task_id acquired_msg in
lwt () = send t.srv peer response in
Lwt.return (t, [recv t.srv])
else
let response = `Error "busy with another task" in
lwt () = send t.srv peer response in
Lwt.return (t, [recv t.srv])
| _ ->
let response = `Error "not acquired" in
lwt () = send t.srv peer response in
Lwt.return (t, [recv t.srv])
)
| `Configured (s_task_id, configured_msg) -> (
lwt t, response =
match t.state with
| `Configured conf ->
let open Configured in
if s_task_id = conf.task_id then
Lwt.return (react_configured t conf configured_msg)
else
Lwt.return (t, `Error "busy with another task")
| _ ->
Lwt.return (t, `Error "not configured")
in
lwt () = send t.srv peer response in
Lwt.return (t, [recv t.srv])
)
| `Learning (task_id, learning_msg) -> (
match t.state with
| `Learning learning -> (
let open Learning in
if task_id = learning.task_id then
lwt t, result = react_learning_msg t learning learning_msg in
lwt () = send t.srv peer result in
Lwt.return (t, [recv t.srv])
else
let result = `Error "busy on another task" in
lwt () = send t.srv peer result in
Lwt.return (t, [recv t.srv])
)
| _ ->
let response = `Error "not learning" in
lwt () = send t.srv peer response in
Lwt.return (t, [recv t.srv])
)
and react_available t task_id =
let t = { t with state = `Acquired task_id } in
t, `AckAcquire true
and react_acquired t task_id = function
| `Configure conf ->
let open Proto_t in
(* create a directory for the task; it will contain a number of
memory-mapped files. *)
let task_home = Filename.concat t.dot_dawg task_id in
Utils.mkdir_else_exit task_home;
let dog_ra =
let dog_file = Filename.concat task_home "dog" in
Dog_io.RW.create dog_file (Some (conf.dog_file_size, conf.dog_t))
in
let feature_map = D_feat_map.create dog_ra in
(* add the target feature *)
let y_feature_id, y_feature_vector = conf.y_feature in
let feature_map = D_feat_map.add feature_map y_feature_id
y_feature_vector `Inactive in
(* now extract it *)
let y_feature =
try
D_feat_map.a_find_by_id feature_map y_feature_id
with D_feat_map.FeatureIdNotFound _ ->
assert false
in
let num_observations = Dog_io.RW.num_observations dog_ra in
try
let splitter =
match conf.loss_type with
| `Logistic -> new Logistic.splitter y_feature num_observations
| `Square -> new Square.splitter y_feature num_observations
in
let sampler = Sampler.create num_observations in
let eval = Tree.mk_eval num_observations in
let random_state = Random.State.make conf.random_seed in
Sampler.shuffle sampler random_state;
match conf.fold_feature_opt with
| None ->
(* we don't have a fold feature; randomly assign folds *)
let fold = Sampler.array (
fun ~index ~value ->
value mod conf.num_folds
) sampler in
let configured = Configured.({
task_id;
y_feature_id;
fold_feature_id_opt = None;
feature_map;
sampler;
splitter;
fold;
eval;
num_observations;
}) in
{ t with state = `Configured configured }, `AckConfigure
| Some (fold_feature_id, fold_feature_vector) ->
let feature_map = D_feat_map.add feature_map fold_feature_id
fold_feature_vector `Inactive in
try
let fold_feature = D_feat_map.a_find_by_id feature_map
fold_feature_id in
match Feat_utils.folds_of_feature ~n:num_observations
~num_folds:conf.num_folds fold_feature with
| `Folds fold ->
let configured =
let open Configured in
{ task_id;
y_feature_id;
fold_feature_id_opt = Some fold_feature_id;
feature_map;
sampler;
splitter;
fold;
eval;
num_observations;
}
in
{ t with state = `Configured configured }, `AckConfigure
| `TooManyOrdinalFolds cardinality ->
let err = sp "the cardinality of ordinal fold feature (%d) is \
too large relative to the number of folds (%d)"
cardinality conf.num_folds in
t, `Error err
| `CategoricalCardinalityMismatch cardinality ->
let err = sp "the cardinality of the categorical fold feature (%d) \
must equal the number of folds (%d)"
cardinality conf.num_folds in
t, `Error err
with Dog_io.RW.FeatureIdNotFound _ ->
t, `Error (sp "fold feature %d not found" fold_feature_id)
with
| Loss.WrongTargetType -> t, `Error "wrong target type"
| Loss.BadTargetDistribution -> t, `Error "bad target distribution"
and react_learning_msg t learning = function
| `BestSplit ->
let result = best_split learning in
Lwt.return (t, result)
| `Sample -> Lwt.return (sample t learning)
| `Ascend -> Lwt.return (ascend t learning)
| `Push p -> Lwt.return (push t learning p)
| `Descend d -> Lwt.return (descend t learning d)
| `CopyFeatures cf ->
Lwt.return (copy_features t learning cf)
and best_split learning =
let open Learning in
match learning.subsets with
| `LR _ | `N -> `Error "best_split: not in S state"
| `S (subset, _) ->
let result =
D_feat_map.best_split_of_features learning.feature_map
learning.splitter
in
let loss_split_opt =
match result with
| Some (_, loss, split) -> Some (loss, split)
| None -> None
in
`AckBestSplit loss_split_opt
and sample t learning =
let open Learning in
match learning.subsets with
| `N ->
let subset = Sampler.array (
fun ~index ~value ->
(* sample half the data that is also in the current fold *)
learning.fold_set.(index) && value mod 2 = 0
) learning.sampler in
learning.splitter#update_with_subset subset;
let learning = { learning with subsets = `S ( subset, `N ) } in
let t = { t with state = `Learning learning } in
t, `AckSample
| `LR _ | `S _ ->
(* TODO: perhaps relax this, so we can enter at any sample
state? this would make it possible to start over if any one
of the workers died. or else, we should create a message to
clear the subsets to [`N]. *)
t, `Error "sample: not in N state"
and ascend t learning =
let open Learning in
match learning.subsets with
| `LR (_, _, subsets ) -> (
let learning = { learning with subsets } in
let t = { t with state = `Learning learning } in
t, `AckAscend
)
| `S _ | `N ->
t, `Error "ascend: not in LR state"
and push t learning {Proto_t.split; feature_id} =
let open Learning in
match learning.subsets with
| `S (subset, _) -> (
try
let splitting_feature = D_feat_map.a_find_by_id learning.feature_map
feature_id in
let left, right =
Tree.partition_observations subset splitting_feature split in
let subsets = `LR (left, right, learning.subsets) in
let learning = { learning with subsets } in
let t = { t with state = `Learning learning } in
t, `AckPush
with D_feat_map.FeatureIdNotFound _ ->
t, `Error (sp "push: feature %d not found" feature_id)
)
| `LR _ | `N ->
t, `Error "push: not in S state"
and descend t learning direction =
let open Learning in
match learning.subsets with
| `S _ | `N ->
t, `Error "descend: not in LR state"
| `LR ( left, right, _) ->
let subsets =
let subset =
match direction with
| `Left -> left
| `Right -> right
in
`S (subset , learning.subsets)
in
let learning = { learning with subsets } in
let t = { t with state = `Learning learning } in
t, `AckDescend
and copy_features t learning list =
let open Learning in
let feature_map = List.fold_left (
fun t (feature_id, vector) ->
D_feat_map.add learning.feature_map feature_id vector `Active
) learning.feature_map list in
let learning = { learning with feature_map } in
let t = { t with state = `Learning learning } in
t, `AckCopyFeatures
and react_configured t configured = function
| `Learn {Proto_t.fold; learning_rate} ->
assert( fold >= 0 );
assert (learning_rate > 0.0);
let open Configured in
let fold_set = Array.init configured.num_observations
(fun i -> configured.fold.(i) <> fold) in
let gamma =
let leaf0 = configured.splitter#first_tree fold_set in
let first_tree = Tree.shrink learning_rate leaf0 in
configured.eval
(D_feat_map.a_find_by_id configured.feature_map) first_tree in
configured.splitter#clear;
(match configured.splitter#boost gamma with
| `NaN -> assert false
| `Ok -> ()
);
let learning = {
Learning.task_id = configured.task_id;
y_feature_id = configured.y_feature_id;
fold_feature_id_opt = configured.fold_feature_id_opt;
feature_map = configured.feature_map;
sampler = configured.sampler;
splitter = configured.splitter;
fold = configured.fold;
eval = configured.eval;
num_observations = configured.num_observations;
fold_set;
subsets = `N;
} in
let state = `Learning learning in
let t = { t with state } in
t, `AckLearn
let worker detach : unit =
(* igore SIGPIPE's *)
Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
(* create the working directory [$HOME/.dawg] if one does not
already exist *)
let home = Unix.getenv "HOME" in
let dot_dawg = Filename.concat home ".dawg" in
Utils.mkdir_else_exit dot_dawg;
(* read the existing worker id (stored in [$HOME/.dawg/worker-id])
or create a random worker id, and write it to that path *)
let worker_id =
let worker_id_path = Filename.concat dot_dawg "worker-id" in
if Sys.file_exists worker_id_path then
Utils.bi_read_from_file Proto_b.read_worker_id worker_id_path
else
(* create the worker id *)
let worker_id = "asdfasdfasdf" in
Utils.bi_write_to_file Proto_b.write_worker_id worker_id_path worker_id;
worker_id
in
let srv =
try
create ()
with Unix.Unix_error( _, "bind", _) ->
(* TODO: connect to the process, to get its id and user *)
Printf.printf "another process already has port %d bound\n%!" port;
exit 1
in
let threads = [recv srv] in
let t = {
srv;
worker_id;
user = Unix.getlogin ();
dot_dawg;
state = `Available
} in
Lwt_main.run (service t threads)
open Cmdliner
let commands =
let worker_cmd =
let doc = "start the EigenDog worker server" in
let detach =
let doc = "detach from the terminal" in
Arg.(value & opt (some bool) (Some true) &
info ["d";"detach"] ~docv:"BOOL" ~doc)
in
Term.( pure worker $ detach ), Term.info "worker" ~doc
in
[worker_cmd]