Skip to content

Commit

Permalink
WIP: Two stack queue
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Jan 5, 2024
1 parent ad3f582 commit d809286
Show file tree
Hide file tree
Showing 13 changed files with 377 additions and 23 deletions.
112 changes: 112 additions & 0 deletions bench/bench_two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
open Kcas_data
open Bench

let run_single ~budgetf ?(n_msgs = 100 * Util.iter_factor) () =
let t = Two_stack_queue.create () in

let init _ = () in
let work _ () =
for i = 1 to n_msgs do
Two_stack_queue.push t i;
Two_stack_queue.pop t |> ignore
done
in

let times = Times.record ~n_domains:1 ~budgetf ~init ~work () in

let name metric = Printf.sprintf "%s/single-domain" metric in

List.concat
[
Stats.of_times times
|> Stats.scale (1_000_000_000.0 /. Float.of_int n_msgs)
|> Stats.to_json ~name:(name "time per message")
~description:
"Time to transmit one message from one domain to another"
~units:"ns";
Times.invert times |> Stats.of_times
|> Stats.scale (Float.of_int n_msgs /. 1_000_000.0)
|> Stats.to_json
~name:(name "messages over time")
~description:
"Number of messages transmitted over time using all domains"
~units:"M/s";
]

let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2)
?(n_msgs = 100 * Util.iter_factor) () =
let n_domains = n_adders + n_takers in

let t = Two_stack_queue.create () in

let n_msgs_to_take = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in
let n_msgs_to_add = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in

let init _ = () in
let work i () =
if i < n_adders then
let rec work () =
let n = Util.alloc n_msgs_to_add in
if 0 < n then begin
for i = 1 to n do
Two_stack_queue.push t i
done;
work ()
end
in
work ()
else
let rec work () =
let n = Util.alloc n_msgs_to_take in
if n <> 0 then begin
for _ = 1 to n do
while Option.is_none (Two_stack_queue.pop_opt t) do
Domain.cpu_relax ()
done
done;
work ()
end
in
work ()
in
let after () =
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
in

let times = Times.record ~n_domains ~budgetf ~init ~work ~after () in

let name metric =
let format role blocking n =
Printf.sprintf "%d %s%s%s" n
(if blocking then "" else "nb ")
role
(if n = 1 then "" else "s")
in
Printf.sprintf "%s/%s, %s" metric
(format "adder" false n_adders)
(format "taker" false n_takers)
in

List.concat
[
Stats.of_times times
|> Stats.scale (1_000_000_000.0 /. Float.of_int n_msgs)
|> Stats.to_json ~name:(name "time per message")
~description:
"Time to transmit one message from one domain to another"
~units:"ns";
Times.invert times |> Stats.of_times
|> Stats.scale (Float.of_int (n_msgs * n_domains) /. 1_000_000.0)
|> Stats.to_json
~name:(name "messages over time")
~description:
"Number of messages transmitted over time using all domains"
~units:"M/s";
]

let run_suite ~budgetf =
run_single ~budgetf ()
@ (Util.cross [ 1; 2 ] [ 1; 2 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
run_one ~budgetf ~n_adders ~n_takers ())
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ let benchmarks =
("Kcas_data Dllist", Bench_dllist.run_suite);
("Kcas_data Hashtbl", Bench_hashtbl.run_suite);
("Kcas_data Mvar", Bench_mvar.run_suite);
("Kcas_data Two_stack_queue", Bench_two_stack_queue.run_suite);
("Kcas_data Queue", Bench_queue.run_suite);
("Kcas_data Stack", Bench_stack.run_suite);
]
Expand Down
4 changes: 1 addition & 3 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@
(multicore-magic
(>= 2.1.0))
(backoff
(and
(>= 0.1.0)
:with-test))
(>= 0.1.0))
(domain-local-await
(and
(>= 1.0.0)
Expand Down
2 changes: 1 addition & 1 deletion kcas_data.opam
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ depends: [
"dune" {>= "3.8"}
"kcas" {= version}
"multicore-magic" {>= "2.1.0"}
"backoff" {>= "0.1.0" & with-test}
"backoff" {>= "0.1.0"}
"domain-local-await" {>= "1.0.0" & with-test}
"domain_shims" {>= "0.1.0" & with-test}
"mtime" {>= "2.0.0" & with-test}
Expand Down
35 changes: 17 additions & 18 deletions src/kcas/kcas.ml
Original file line number Diff line number Diff line change
Expand Up @@ -539,22 +539,14 @@ let rec exchange_no_alloc backoff loc state =
end
else exchange_no_alloc (Backoff.once backoff) loc state

let[@inline] rec cas_with_state backoff loc before state state_old =
let[@inline] cas_with_state loc before state state_old =
before == eval state_old
&& (before == state.after
||
if Atomic.compare_and_set (as_atomic loc) state_old state then begin
resume_awaiters state_old.awaiters;
true
end
else
(* We must retry, because compare is by value rather than by state. In
other words, we should not fail spuriously due to some other thread
having installed or removed a waiter.
Fenceless is safe as there was a fence before. *)
cas_with_state (Backoff.once backoff) loc before state
(fenceless_get (as_atomic loc)))
|| Atomic.compare_and_set (as_atomic loc) state_old state
&& begin
resume_awaiters state_old.awaiters;
true
end)

let inc x = x + 1
let dec x = x - 1
Expand Down Expand Up @@ -607,10 +599,17 @@ module Loc = struct
let[@inline] get_mode loc =
if (to_loc loc).id < 0 then `Lock_free else `Obstruction_free

let compare_and_set ?(backoff = Backoff.default) loc before after =
let state = new_state after in
let compare_and_set loc before after =
let state_old = atomic_get (as_atomic (to_loc loc)) in
cas_with_state backoff (to_loc loc) before state state_old
before == eval state_old
&& (before == after
|| Atomic.compare_and_set
(as_atomic (to_loc loc))
state_old (new_state after)
&& begin
resume_awaiters state_old.awaiters;
true
end)

let fenceless_update ?timeoutf ?(backoff = Backoff.default) loc f =
let timeout = Timeout.alloc_opt timeoutf in
Expand Down Expand Up @@ -910,7 +909,7 @@ module Xt = struct
(* Fenceless is safe inside transactions as each log update has a
fence. *)
let state_old = fenceless_get (as_atomic loc) in
if cas_with_state Backoff.default loc before state state_old then
if cas_with_state loc before state state_old then
success xt result
else commit_once_reuse backoff xt tx
end
Expand Down
2 changes: 1 addition & 1 deletion src/kcas/kcas.mli
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ module Loc : sig
conditional load. It is also safe for the given function [f] to raise any
other exception to abort the conditional load. *)

val compare_and_set : ?backoff:Backoff.t -> 'a t -> 'a -> 'a -> bool
val compare_and_set : 'a t -> 'a -> 'a -> bool
(** [compare_and_set r before after] atomically updates the shared memory
location [r] to the [after] value if the current value of [r] is the
[before] value. *)
Expand Down
1 change: 1 addition & 0 deletions src/kcas_data/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(public_name kcas_data)
(libraries
(re_export kcas)
backoff
multicore-magic))

(rule
Expand Down
1 change: 1 addition & 0 deletions src/kcas_data/kcas_data.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module Hashtbl = Hashtbl
module Queue = Queue
module Two_stack_queue = Two_stack_queue
module Stack = Stack
module Mvar = Mvar
module Promise = Promise
Expand Down
1 change: 1 addition & 0 deletions src/kcas_data/kcas_data.mli
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@

module Hashtbl = Hashtbl
module Queue = Queue
module Two_stack_queue = Two_stack_queue
module Stack = Stack

(** {1 Communication and synchronization primitives} *)
Expand Down
156 changes: 156 additions & 0 deletions src/kcas_data/two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
open Kcas

type 'a t = { head : 'a head_pack Loc.t; tail : 'a tail_pack Loc.t }

and ('a, _) head =
| Cons : {
counter : int;
value : 'a;
suffix : 'a head_pack;
}
-> ('a, [> `Cons ]) head
| Head : { counter : int } -> ('a, [> `Head ]) head

and 'a head_pack = H : ('a, [< `Cons | `Head ]) head -> 'a head_pack
[@@unboxed]

and ('a, _) tail =
| Snoc : {
counter : int;
prefix : 'a tail_pack;
value : 'a;
}
-> ('a, [> `Snoc ]) tail
| Tail : {
counter : int;
mutable move : ('a, [ `Snoc ]) tail;
}
-> ('a, [> `Tail ]) tail

and 'a tail_pack = T : ('a, [< `Snoc | `Tail ]) tail -> 'a tail_pack
[@@unboxed]

let create () =
let head = Loc.make ~padded:true (H (Head { counter = 1 })) in
let tail =
Loc.make ~padded:true (T (Tail { counter = 0; move = Obj.magic () }))
in
{ head; tail } |> Multicore_magic.copy_as_padded

let rec rev (suffix : (_, [< `Cons ]) head) = function
| T (Snoc { counter; prefix; value }) ->
rev (Cons { counter; value; suffix = H suffix }) prefix
| T (Tail _) -> suffix

let[@inline] rev = function
| (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tail) ->
rev
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
prefix

let[@inline] counter_of_head = function
| (Head r : (_, [< `Head ]) head) -> r.counter

let[@inline] counter_of_snoc = function
| (Snoc r : (_, [< `Snoc ]) tail) -> r.counter

let[@inline] counter_of_tail = function
| (Tail r : (_, [< `Tail ]) tail) -> r.counter

let clear_move = function
| (Tail tail_r : (_, [< `Tail ]) tail) -> tail_r.move <- Obj.magic ()

let is_tail = function T (Tail _) -> true | T (Snoc _) -> false

let rec push backoff t value =
match Loc.fenceless_get t.tail with
| T (Snoc snoc_r) as prefix ->
let after = T (Snoc { counter = snoc_r.counter + 1; prefix; value }) in
if not (Loc.compare_and_set t.tail prefix after) then
push (Backoff.once backoff) t value
| T (Tail tail_r as tail) ->
let move = tail_r.move in
if move != Obj.magic () then
match Loc.fenceless_get t.head with
| H (Head _ as head) when counter_of_head head < counter_of_snoc move ->
let after = rev move in
if Loc.compare_and_set t.head (H head) (H after) then
clear_move tail;
push backoff t value
| _ -> push_with backoff t (counter_of_tail tail) (T tail) value
else push_with backoff t (counter_of_tail tail) (T tail) value

and push_with backoff t counter prefix value =
let after = Snoc { counter = counter + 1; prefix; value } in
if not (Loc.compare_and_set t.tail prefix (T after)) then
push (Backoff.once backoff) t value

let[@inline] push t value = push Backoff.default t value

exception Empty

let rec pop backoff t =
match Loc.get t.head with
| H (Cons cons_r) as before ->
let after = cons_r.suffix in
if Loc.compare_and_set t.head before after then cons_r.value
else pop (Backoff.once backoff) t
| H (Head _ as head) -> begin
match Loc.fenceless_get t.tail with
| T (Snoc snoc_r as move) ->
if is_tail snoc_r.prefix then begin
let tail =
Tail { counter = snoc_r.counter - 1; move = Obj.magic () }
in
if
Loc.fenceless_get t.head == H head
&& Loc.compare_and_set t.tail (T move) (T tail)
then snoc_r.value
else pop backoff t
end
else
let tail = Tail { counter = snoc_r.counter; move } in
if
Loc.fenceless_get t.head == H head
&& Loc.compare_and_set t.tail (T move) (T tail)
then pop_moving backoff t head move tail
else pop backoff t
| T (Tail tail_r as tail) ->
let move = tail_r.move in
if move == Obj.magic () then pop_emptyish backoff t head
else pop_moving backoff t head move tail
end

and pop_moving backoff t head move tail =
if counter_of_head head < counter_of_snoc move then
match rev move with
| Cons cons_r ->
if Loc.compare_and_set t.head (H head) cons_r.suffix then begin
clear_move tail;
cons_r.value
end
else pop backoff t
else pop_emptyish backoff t head

and pop_emptyish backoff t head =
if Loc.get t.head == H head then raise_notrace Empty else pop backoff t

let[@inline] pop_opt t =
match pop Backoff.default t with
| value -> Some value
| exception Empty -> None

let[@inline] pop t = pop Backoff.default t

let rec length t =
let head = Loc.get t.head in
let tail = Loc.get t.tail in
if head != Loc.get t.head then length t
else
let head_at =
match head with H (Cons r) -> r.counter | H (Head r) -> r.counter
in
let tail_at =
match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
in
tail_at - head_at + 1
Loading

0 comments on commit d809286

Please sign in to comment.