Skip to content

Commit

Permalink
WIP: Two stack queue
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Jul 17, 2024
1 parent b234a02 commit e4fe282
Show file tree
Hide file tree
Showing 13 changed files with 331 additions and 23 deletions.
77 changes: 77 additions & 0 deletions bench/bench_two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
open Multicore_bench
module Queue = Kcas_data.Two_stack_queue

let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Queue.create () in

let op push = if push then Queue.push t 101 else Queue.pop_opt t |> ignore in

let init _ =
assert (Queue.pop_opt t == None);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2)
?(n_msgs = 100 * Util.iter_factor) () =
let n_domains = n_adders + n_takers in

let t = Queue.create () in

let n_msgs_to_take = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in
let n_msgs_to_add = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in

let init _ = () in
let work i () =
if i < n_adders then
let rec work () =
let n = Util.alloc n_msgs_to_add in
if 0 < n then begin
for i = 1 to n do
Queue.push t i
done;
work ()
end
in
work ()
else
let rec work () =
let n = Util.alloc n_msgs_to_take in
if n <> 0 then begin
for _ = 1 to n do
while Option.is_none (Queue.pop_opt t) do
Domain.cpu_relax ()
done
done;
work ()
end
in
work ()
in
let after () =
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
in

let config =
let format role blocking n =
Printf.sprintf "%d %s%s%s" n
(if blocking then "" else "nb ")
role
(if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s"
(format "adder" false n_adders)
(format "taker" false n_takers)
in
Times.record ~budgetf ~n_domains ~init ~work ~after ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2 ] [ 1; 2 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
run_one ~budgetf ~n_adders ~n_takers ())
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ let benchmarks =
("Kcas_data Dllist", Bench_dllist.run_suite);
("Kcas_data Hashtbl", Bench_hashtbl.run_suite);
("Kcas_data Mvar", Bench_mvar.run_suite);
("Kcas_data Two_stack_queue", Bench_two_stack_queue.run_suite);
("Kcas_data Queue", Bench_queue.run_suite);
("Kcas_data Stack", Bench_stack.run_suite);
]
Expand Down
4 changes: 1 addition & 3 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@
(multicore-magic
(>= 2.1.0))
(backoff
(and
(>= 0.1.0)
:with-test))
(>= 0.1.0))
(domain-local-await
(and
(>= 1.0.1)
Expand Down
2 changes: 1 addition & 1 deletion kcas_data.opam
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ depends: [
"dune" {>= "3.14"}
"kcas" {= version}
"multicore-magic" {>= "2.1.0"}
"backoff" {>= "0.1.0" & with-test}
"backoff" {>= "0.1.0"}
"domain-local-await" {>= "1.0.1" & with-test}
"domain_shims" {>= "0.1.0" & with-test}
"multicore-bench" {>= "0.1.2" & with-test}
Expand Down
37 changes: 19 additions & 18 deletions src/kcas/kcas.ml
Original file line number Diff line number Diff line change
Expand Up @@ -539,22 +539,15 @@ let rec exchange_no_alloc backoff loc state =
end
else exchange_no_alloc (Backoff.once backoff) loc state

let[@inline] rec cas_with_state backoff loc before state state_old =
let[@inline] cas_with_state loc before state state_old =
before == eval state_old
&& (before == state.after
||
if Atomic.compare_and_set (as_atomic loc) state_old state then begin
resume_awaiters state_old.awaiters;
true
end
else
(* We must retry, because compare is by value rather than by state. In
other words, we should not fail spuriously due to some other thread
having installed or removed a waiter.
Fenceless is safe as there was a fence before. *)
cas_with_state (Backoff.once backoff) loc before state
(fenceless_get (as_atomic loc)))
|| atomic_get (as_atomic loc) == state_old
&& Atomic.compare_and_set (as_atomic loc) state_old state
&& begin
resume_awaiters state_old.awaiters;
true
end)

let inc x = x + 1
let dec x = x - 1
Expand Down Expand Up @@ -607,10 +600,18 @@ module Loc = struct
let[@inline] get_mode loc =
if (to_loc loc).id < 0 then `Lock_free else `Obstruction_free

let compare_and_set ?(backoff = Backoff.default) loc before after =
let state = new_state after in
let compare_and_set loc before after =
let state_old = atomic_get (as_atomic (to_loc loc)) in
cas_with_state backoff (to_loc loc) before state state_old
before == eval state_old
&& (before == after
|| atomic_get (as_atomic (to_loc loc)) == state_old
&& Atomic.compare_and_set
(as_atomic (to_loc loc))
state_old (new_state after)
&& begin
resume_awaiters state_old.awaiters;
true
end)

let fenceless_update ?timeoutf ?(backoff = Backoff.default) loc f =
let timeout = Timeout.alloc_opt timeoutf in
Expand Down Expand Up @@ -947,7 +948,7 @@ module Xt = struct
(* Fenceless is safe inside transactions as each log update has a
fence. *)
let state_old = fenceless_get (as_atomic loc) in
if cas_with_state Backoff.default loc before state state_old then
if cas_with_state loc before state state_old then
success xt result
else commit_once_reuse backoff xt tx
end
Expand Down
2 changes: 1 addition & 1 deletion src/kcas/kcas.mli
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ module Loc : sig
conditional load. It is also safe for the given function [f] to raise any
other exception to abort the conditional load. *)

val compare_and_set : ?backoff:Backoff.t -> 'a t -> 'a -> 'a -> bool
val compare_and_set : 'a t -> 'a -> 'a -> bool
(** [compare_and_set r before after] atomically updates the shared memory
location [r] to the [after] value if the current value of [r] is the
[before] value. *)
Expand Down
1 change: 1 addition & 0 deletions src/kcas_data/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(public_name kcas_data)
(libraries
(re_export kcas)
backoff
multicore-magic))

(rule
Expand Down
1 change: 1 addition & 0 deletions src/kcas_data/kcas_data.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module Hashtbl = Hashtbl
module Queue = Queue
module Two_stack_queue = Two_stack_queue
module Stack = Stack
module Mvar = Mvar
module Promise = Promise
Expand Down
1 change: 1 addition & 0 deletions src/kcas_data/kcas_data.mli
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@

module Hashtbl = Hashtbl
module Queue = Queue
module Two_stack_queue = Two_stack_queue
module Stack = Stack

(** {1 Communication and synchronization primitives} *)
Expand Down
143 changes: 143 additions & 0 deletions src/kcas_data/two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
open Kcas

type 'a t = { head : 'a head Loc.t; tail : 'a tail Loc.t }

and ('a, _) tdt =
| Cons : {
counter : int;
value : 'a;
suffix : 'a head;
}
-> ('a, [> `Cons ]) tdt
| Head : { counter : int } -> ('a, [> `Head ]) tdt
| Snoc : {
counter : int;
prefix : 'a tail;
value : 'a;
}
-> ('a, [> `Snoc ]) tdt
| Tail : {
counter : int;
mutable move : ('a, [ `Snoc | `Used ]) tdt;
}
-> ('a, [> `Tail ]) tdt
| Used : ('a, [> `Used ]) tdt

and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed]
and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed]

(* *)

let create () =
let head = Loc.make ~padded:true (H (Head { counter = 1 })) in
let tail = Loc.make ~padded:true (T (Tail { counter = 0; move = Used })) in
{ head; tail } |> Multicore_magic.copy_as_padded

(* *)

let rec rev (suffix : (_, [< `Cons ]) tdt) = function
| T (Snoc { counter; prefix; value }) ->
rev (Cons { counter; value; suffix = H suffix }) prefix
| T (Tail _) -> suffix

let[@inline] rev = function
| (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tdt) ->
rev
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
prefix

(* *)

let rec push backoff t value =
match Loc.fenceless_get t.tail with
| T (Snoc snoc_r) as prefix -> push_with backoff t snoc_r.counter prefix value
| T (Tail tail_r as tail) ->
begin
match tail_r.move with
| Used -> ()
| Snoc move_r as move -> begin
match Loc.fenceless_get t.head with
| H (Head head_r as head) when head_r.counter < move_r.counter ->
let after = rev move in
if Loc.compare_and_set t.head (H head) (H after) then
tail_r.move <- Used
| _ -> ()
end
end;
push_with backoff t tail_r.counter (T tail) value

and push_with backoff t counter prefix value =
let after = Snoc { counter = counter + 1; prefix; value } in
if not (Loc.compare_and_set t.tail prefix (T after)) then
push (Backoff.once backoff) t value

let[@inline] push t value = push Backoff.default t value

(* *)

exception Empty

let rec pop backoff t =
match Loc.get t.head with
| H (Cons cons_r) as before ->
let after = cons_r.suffix in
if Loc.compare_and_set t.head before after then cons_r.value
else pop (Backoff.once backoff) t
| H (Head head_r as head) -> begin
match Loc.fenceless_get t.tail with
| T (Snoc snoc_r as move) ->
if head_r.counter = snoc_r.counter then
if Loc.compare_and_set t.tail (T move) snoc_r.prefix then
snoc_r.value
else pop backoff t
else
let tail = Tail { counter = snoc_r.counter; move } in
if
Loc.fenceless_get t.head == H head
&& Loc.compare_and_set t.tail (T move) (T tail)
then pop_moving backoff t head move tail
else pop backoff t
| T (Tail tail_r as tail) -> begin
match tail_r.move with
| Used -> pop_emptyish backoff t head
| Snoc _ as move -> pop_moving backoff t head move tail
end
end

and pop_moving backoff t (Head head_r as head : (_, [< `Head ]) tdt)
(Snoc move_r as move : (_, [< `Snoc ]) tdt)
(Tail tail_r : (_, [< `Tail ]) tdt) =
if head_r.counter < move_r.counter then
match rev move with
| Cons cons_r ->
if Loc.compare_and_set t.head (H head) cons_r.suffix then begin
tail_r.move <- Used;
cons_r.value
end
else pop (Backoff.once backoff) t
else pop_emptyish backoff t head

and pop_emptyish backoff t head =
if Loc.get t.head == H head then raise_notrace Empty else pop backoff t

let[@inline] pop_opt t =
match pop Backoff.default t with
| value -> Some value
| exception Empty -> None

let[@inline] pop t = pop Backoff.default t

(* *)

let rec length t =
let head = Loc.get t.head in
let tail = Loc.fenceless_get t.tail in
if head != Loc.get t.head then length t
else
let head_at =
match head with H (Cons r) -> r.counter | H (Head r) -> r.counter
in
let tail_at =
match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
in
tail_at - head_at + 1
20 changes: 20 additions & 0 deletions src/kcas_data/two_stack_queue.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
type !'a t
(** *)

val create : unit -> 'a t
(** *)

val push : 'a t -> 'a -> unit
(** *)

exception Empty
(** *)

val pop : 'a t -> 'a
(** *)

val pop_opt : 'a t -> 'a option
(** *)

val length : 'a t -> int
(** *)
1 change: 1 addition & 0 deletions test/kcas_data/dune
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
queue_test_stm
stack_test
stack_test_stm
two_stack_queue_test_stm
xt_test)
(libraries
alcotest
Expand Down
Loading

0 comments on commit e4fe282

Please sign in to comment.