@@ -6,6 +6,45 @@ module Selection = Worker.Selection
6
6
module Store = Git_unix. Store
7
7
module Worker_process = Internal_worker. Worker_process
8
8
9
+ module Metrics = struct
10
+ open Prometheus
11
+
12
+ let namespace = " ocluster"
13
+ let subsystem = " worker"
14
+
15
+ let in_use_solver_process =
16
+ let help = " Number of solver-process currently used for solving" in
17
+ Gauge. v ~help ~namespace ~subsystem " in_use_solver_process"
18
+
19
+ let wait_queue_solver_process_pool =
20
+ let help = " Number of request in the queue of the solver-process pool" in
21
+ Gauge. v ~help ~namespace ~subsystem " wait_queue_requests"
22
+
23
+ let started_requests =
24
+ let help = " Number of requests sent on solver-process pool" in
25
+ Counter. v ~help ~namespace ~subsystem " started_requests_total"
26
+
27
+ let success_requests =
28
+ let help = " Number of solver_process requests success" in
29
+ Counter. v ~help ~namespace ~subsystem " success_requests_total"
30
+
31
+ let failed_requests =
32
+ let help = " Number of solver_process requests failed" in
33
+ Counter. v ~help ~namespace ~subsystem " failed_requests_total"
34
+
35
+ let canceled_before_started =
36
+ let help = " Number of solver_process requests canceled before started" in
37
+ Counter. v ~help ~namespace ~subsystem " canceled_before_started_total"
38
+
39
+ let canceled_after_started =
40
+ let help = " Number of solver_process requests canceled after started" in
41
+ Counter. v ~help ~namespace ~subsystem " canceled_after_started_total"
42
+ end
43
+
44
+ let update_wait_queue t =
45
+ Prometheus.Gauge. set Metrics. wait_queue_solver_process_pool
46
+ (float_of_int (Lwt_pool. wait_queue_length t))
47
+
9
48
let oldest_commit = Lwt_pool. create 180 @@ fun _ -> Lwt. return_unit
10
49
(* we are using at most 360 pipes at the same time and that's enough to keep the current
11
50
* performance and prevent some jobs to fail because of file descriptors exceed the limit.*)
@@ -91,7 +130,10 @@ module Make (Opam_repo : Opam_repository_intf.S) = struct
91
130
92
131
(* Send [request] to [worker] and read the reply. *)
93
132
let process ~switch ~log ~id request worker =
94
- if not (Lwt_switch. is_on switch) then Lwt. fail Lwt. Canceled
133
+ Prometheus.Gauge. inc_one Metrics. in_use_solver_process;
134
+ if not (Lwt_switch. is_on switch) then (
135
+ Prometheus.Counter. inc_one Metrics. canceled_before_started;
136
+ Lwt. fail Lwt. Canceled )
95
137
else
96
138
let request_str =
97
139
Worker.Solve_request. to_yojson request |> Yojson.Safe. to_string
@@ -131,6 +173,7 @@ module Make (Opam_repo : Opam_repository_intf.S) = struct
131
173
* workers's pool choosing the worker for another processing.*)
132
174
if Lwt. state process = Lwt. Sleep then (
133
175
Worker_process. release worker;
176
+ Prometheus.Counter. inc_one Metrics. canceled_after_started;
134
177
Lwt. cancel process;
135
178
dispose worker)
136
179
else Lwt. return_unit )
@@ -193,9 +236,25 @@ module Make (Opam_repo : Opam_repository_intf.S) = struct
193
236
else compatible_root_pkgs
194
237
in
195
238
let slice = { request with platforms = [ p ]; root_pkgs } in
196
- Lwt_pool. use t (process ~switch ~log ~id slice) >> = function
197
- | Error _ as e -> Lwt. return (id, e)
239
+ Lwt. catch
240
+ (fun () ->
241
+ Prometheus.Counter. inc_one Metrics. started_requests;
242
+ update_wait_queue t;
243
+ Lwt_pool. use t (process ~switch ~log ~id slice))
244
+ (function
245
+ | Lwt. Canceled ->
246
+ Prometheus.Gauge. dec_one Metrics. in_use_solver_process;
247
+ update_wait_queue t;
248
+ Lwt. fail Lwt. Canceled
249
+ | ex -> raise ex)
250
+ >> = fun result ->
251
+ Prometheus.Gauge. dec_one Metrics. in_use_solver_process;
252
+ match result with
253
+ | Error _ as e ->
254
+ Prometheus.Counter. inc_one Metrics. failed_requests;
255
+ Lwt. return (id, e)
198
256
| Ok packages ->
257
+ let _ = Prometheus.Counter. inc_one Metrics. success_requests in
199
258
let repo_packages =
200
259
packages
201
260
|> List. filter_map (fun pkg ->
0 commit comments