Skip to content

Commit

Permalink
long job tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
cmr committed Oct 17, 2019
1 parent 8911a7a commit dca8c1b
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/job_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ let run_jobs (type a) t scheduler =
the job out of the queue and decrement [jobs_left_this_cycle]. [run_job] or
[run_external_jobs] may side effect [t], either by enqueueing jobs, or by
clearing [t]. *)
let start = Time_ns.now () in
run_job t scheduler execution_context f a;
let this_job_time = Time_ns.(diff (now ()) start) in
if Float.(Time_ns.Span.to_ms this_job_time >= 2000.) then scheduler.long_jobs_last_cycle <- (execution_context, this_job_time) :: scheduler.long_jobs_last_cycle;
(* [run_external_jobs] at each iteration of the [while] loop, for fairness. *)
run_external_jobs t scheduler
done;
Expand Down
6 changes: 6 additions & 0 deletions src/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ let long_cycles_with_context t ~at_least =
then Tail.extend tail (t.last_cycle_time,t.current_execution_context)))
;;

let long_jobs_with_context t =
Stream.create (fun tail ->
run_every_cycle_start t ~f:(fun () ->
List.iter t.long_jobs_last_cycle ~f:(fun job -> Tail.extend tail job)))
;;

let cycle_num_jobs t =
Stream.create (fun tail ->
run_every_cycle_start t ~f:(fun () -> Tail.extend tail t.last_cycle_num_jobs))
Expand Down
1 change: 1 addition & 0 deletions src/scheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ val set_record_backtraces : t -> bool -> unit
val run_every_cycle_start : t -> f:(unit -> unit) -> unit
val long_cycles : t -> at_least:Time_ns.Span.t -> Time_ns.Span.t Async_stream.t
val long_cycles_with_context : t -> at_least:Time_ns.Span.t -> (Time_ns.Span.t * Execution_context.t) Async_stream.t
val long_jobs_with_context : t -> (Execution_context.t * Time_ns.Span.t) Async_stream.t
val can_run_a_job : t -> bool
val create_alarm : t -> (unit -> unit) -> Gc.Expert.Alarm.t
val add_finalizer : t -> 'a Heap_block.t -> ('a Heap_block.t -> unit) -> unit
Expand Down
3 changes: 3 additions & 0 deletions src/scheduler1.ml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type t = Scheduler0.t =
; mutable record_backtraces : bool
; mutable on_start_of_cycle : unit -> unit
; mutable on_end_of_cycle : unit -> unit
; mutable long_jobs_last_cycle : (Execution_context.t * Time_ns.Span.t) list
; mutable cycle_started : bool
}
[@@deriving fields, sexp_of]
Expand Down Expand Up @@ -174,6 +175,7 @@ let invariant t : unit =
~max_num_jobs_per_priority_per_cycle:ignore
~record_backtraces:ignore
~cycle_started:ignore
~long_jobs_last_cycle:ignore
~on_start_of_cycle:ignore
~on_end_of_cycle:ignore
with
Expand Down Expand Up @@ -249,6 +251,7 @@ let create () =
; cycle_started = false
; on_start_of_cycle = Fn.id
; on_end_of_cycle = Fn.id
; long_jobs_last_cycle= []
}
and events =
Timing_wheel_ns.create ~config:Async_kernel_config.timing_wheel_config ~start:now
Expand Down
3 changes: 2 additions & 1 deletion src/types.ml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ and Scheduler : sig
; mutable record_backtraces : bool
; mutable on_start_of_cycle : unit -> unit
; mutable on_end_of_cycle : unit -> unit
; mutable long_jobs_last_cycle : (Execution_context.t * Time_ns.Span.t) list
; mutable cycle_started : bool
}
end =
Expand Down Expand Up @@ -248,6 +249,6 @@ end =
Very_low_priority_worker

and Tracing : sig
type tracing_fns =
type tracing_fns =
{ trace_thread_switch : Execution_context.t -> unit }
end = Tracing

0 comments on commit dca8c1b

Please sign in to comment.