Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Two stack queue #175

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions bench/bench_two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
open Multicore_bench
module Queue = Kcas_data.Two_stack_queue

let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Queue.create () in

let op push = if push then Queue.push t 101 else Queue.pop_opt t |> ignore in

let init _ =
assert (Queue.pop_opt t == None);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2)
?(n_msgs = 100 * Util.iter_factor) () =
let n_domains = n_adders + n_takers in

let t = Queue.create () in

let n_msgs_to_take = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in
let n_msgs_to_add = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in

let init _ = () in
let work i () =
if i < n_adders then
let rec work () =
let n = Util.alloc n_msgs_to_add in
if 0 < n then begin
for i = 1 to n do
Queue.push t i
done;
work ()
end
in
work ()
else
let rec work () =
let n = Util.alloc n_msgs_to_take in
if n <> 0 then begin
for _ = 1 to n do
while Option.is_none (Queue.pop_opt t) do
Domain.cpu_relax ()
done
done;
work ()
end
in
work ()
in
let after () =
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
in

let config =
let format role blocking n =
Printf.sprintf "%d %s%s%s" n
(if blocking then "" else "nb ")
role
(if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s"
(format "adder" false n_adders)
(format "taker" false n_takers)
in
Times.record ~budgetf ~n_domains ~init ~work ~after ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2 ] [ 1; 2 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
run_one ~budgetf ~n_adders ~n_takers ())
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ let benchmarks =
("Kcas_data Dllist", Bench_dllist.run_suite);
("Kcas_data Hashtbl", Bench_hashtbl.run_suite);
("Kcas_data Mvar", Bench_mvar.run_suite);
("Kcas_data Two_stack_queue", Bench_two_stack_queue.run_suite);
("Kcas_data Queue", Bench_queue.run_suite);
("Kcas_data Stack", Bench_stack.run_suite);
]
Expand Down
4 changes: 1 addition & 3 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@
(multicore-magic
(>= 2.1.0))
(backoff
(and
(>= 0.1.0)
:with-test))
(>= 0.1.0))
(domain-local-await
(and
(>= 1.0.1)
Expand Down
2 changes: 1 addition & 1 deletion kcas_data.opam
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ depends: [
"dune" {>= "3.14"}
"kcas" {= version}
"multicore-magic" {>= "2.1.0"}
"backoff" {>= "0.1.0" & with-test}
"backoff" {>= "0.1.0"}
"domain-local-await" {>= "1.0.1" & with-test}
"domain_shims" {>= "0.1.0" & with-test}
"multicore-bench" {>= "0.1.2" & with-test}
Expand Down
37 changes: 19 additions & 18 deletions src/kcas/kcas.ml
Original file line number Diff line number Diff line change
Expand Up @@ -539,22 +539,15 @@ let rec exchange_no_alloc backoff loc state =
end
else exchange_no_alloc (Backoff.once backoff) loc state

let[@inline] rec cas_with_state backoff loc before state state_old =
let[@inline] cas_with_state loc before state state_old =
before == eval state_old
&& (before == state.after
||
if Atomic.compare_and_set (as_atomic loc) state_old state then begin
resume_awaiters state_old.awaiters;
true
end
else
(* We must retry, because compare is by value rather than by state. In
other words, we should not fail spuriously due to some other thread
having installed or removed a waiter.

Fenceless is safe as there was a fence before. *)
cas_with_state (Backoff.once backoff) loc before state
(fenceless_get (as_atomic loc)))
|| atomic_get (as_atomic loc) == state_old
&& Atomic.compare_and_set (as_atomic loc) state_old state
&& begin
resume_awaiters state_old.awaiters;
true
end)

let inc x = x + 1
let dec x = x - 1
Expand Down Expand Up @@ -607,10 +600,18 @@ module Loc = struct
let[@inline] get_mode loc =
if (to_loc loc).id < 0 then `Lock_free else `Obstruction_free

let compare_and_set ?(backoff = Backoff.default) loc before after =
let state = new_state after in
let compare_and_set loc before after =
let state_old = atomic_get (as_atomic (to_loc loc)) in
cas_with_state backoff (to_loc loc) before state state_old
before == eval state_old
&& (before == after
|| atomic_get (as_atomic (to_loc loc)) == state_old
&& Atomic.compare_and_set
(as_atomic (to_loc loc))
state_old (new_state after)
&& begin
resume_awaiters state_old.awaiters;
true
end)

let fenceless_update ?timeoutf ?(backoff = Backoff.default) loc f =
let timeout = Timeout.alloc_opt timeoutf in
Expand Down Expand Up @@ -947,7 +948,7 @@ module Xt = struct
(* Fenceless is safe inside transactions as each log update has a
fence. *)
let state_old = fenceless_get (as_atomic loc) in
if cas_with_state Backoff.default loc before state state_old then
if cas_with_state loc before state state_old then
success xt result
else commit_once_reuse backoff xt tx
end
Expand Down
2 changes: 1 addition & 1 deletion src/kcas/kcas.mli
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ module Loc : sig
conditional load. It is also safe for the given function [f] to raise any
other exception to abort the conditional load. *)

val compare_and_set : ?backoff:Backoff.t -> 'a t -> 'a -> 'a -> bool
val compare_and_set : 'a t -> 'a -> 'a -> bool
(** [compare_and_set r before after] atomically updates the shared memory
location [r] to the [after] value if the current value of [r] is the
[before] value. *)
Expand Down
1 change: 1 addition & 0 deletions src/kcas_data/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(public_name kcas_data)
(libraries
(re_export kcas)
backoff
multicore-magic))

(rule
Expand Down
1 change: 1 addition & 0 deletions src/kcas_data/kcas_data.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module Hashtbl = Hashtbl
module Queue = Queue
module Two_stack_queue = Two_stack_queue
module Stack = Stack
module Mvar = Mvar
module Promise = Promise
Expand Down
1 change: 1 addition & 0 deletions src/kcas_data/kcas_data.mli
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@

module Hashtbl = Hashtbl
module Queue = Queue
module Two_stack_queue = Two_stack_queue
module Stack = Stack

(** {1 Communication and synchronization primitives} *)
Expand Down
143 changes: 143 additions & 0 deletions src/kcas_data/two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
open Kcas

type 'a t = { head : 'a head Loc.t; tail : 'a tail Loc.t }

and ('a, _) tdt =
| Cons : {
counter : int;
value : 'a;
suffix : 'a head;
}
-> ('a, [> `Cons ]) tdt
| Head : { counter : int } -> ('a, [> `Head ]) tdt
| Snoc : {
counter : int;
prefix : 'a tail;
value : 'a;
}
-> ('a, [> `Snoc ]) tdt
| Tail : {
counter : int;
mutable move : ('a, [ `Snoc | `Used ]) tdt;
}
-> ('a, [> `Tail ]) tdt
| Used : ('a, [> `Used ]) tdt

and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed]
and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed]

(* *)

let create () =
let head = Loc.make ~padded:true (H (Head { counter = 1 })) in
let tail = Loc.make ~padded:true (T (Tail { counter = 0; move = Used })) in
{ head; tail } |> Multicore_magic.copy_as_padded

(* *)

let rec rev (suffix : (_, [< `Cons ]) tdt) = function
| T (Snoc { counter; prefix; value }) ->
rev (Cons { counter; value; suffix = H suffix }) prefix
| T (Tail _) -> suffix

let[@inline] rev = function
| (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tdt) ->
rev
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
prefix

(* *)

let rec push backoff t value =
match Loc.fenceless_get t.tail with
| T (Snoc snoc_r) as prefix -> push_with backoff t snoc_r.counter prefix value
| T (Tail tail_r as tail) ->
begin
match tail_r.move with
| Used -> ()
| Snoc move_r as move -> begin
match Loc.fenceless_get t.head with
| H (Head head_r as head) when head_r.counter < move_r.counter ->
let after = rev move in
if Loc.compare_and_set t.head (H head) (H after) then
tail_r.move <- Used
| _ -> ()
end
end;
push_with backoff t tail_r.counter (T tail) value

and push_with backoff t counter prefix value =
let after = Snoc { counter = counter + 1; prefix; value } in
if not (Loc.compare_and_set t.tail prefix (T after)) then
push (Backoff.once backoff) t value

let[@inline] push t value = push Backoff.default t value

(* *)

exception Empty

let rec pop backoff t =
match Loc.get t.head with
| H (Cons cons_r) as before ->
let after = cons_r.suffix in
if Loc.compare_and_set t.head before after then cons_r.value
else pop (Backoff.once backoff) t
| H (Head head_r as head) -> begin
match Loc.fenceless_get t.tail with
| T (Snoc snoc_r as move) ->
if head_r.counter = snoc_r.counter then
if Loc.compare_and_set t.tail (T move) snoc_r.prefix then
snoc_r.value
else pop backoff t
else
let tail = Tail { counter = snoc_r.counter; move } in
if
Loc.fenceless_get t.head == H head
&& Loc.compare_and_set t.tail (T move) (T tail)
then pop_moving backoff t head move tail
else pop backoff t
| T (Tail tail_r as tail) -> begin
match tail_r.move with
| Used -> pop_emptyish backoff t head
| Snoc _ as move -> pop_moving backoff t head move tail
end
end

and pop_moving backoff t (Head head_r as head : (_, [< `Head ]) tdt)
(Snoc move_r as move : (_, [< `Snoc ]) tdt)
(Tail tail_r : (_, [< `Tail ]) tdt) =
if head_r.counter < move_r.counter then
match rev move with
| Cons cons_r ->
if Loc.compare_and_set t.head (H head) cons_r.suffix then begin
tail_r.move <- Used;
cons_r.value
end
else pop (Backoff.once backoff) t
else pop_emptyish backoff t head

and pop_emptyish backoff t head =
if Loc.get t.head == H head then raise_notrace Empty else pop backoff t

let[@inline] pop_opt t =
match pop Backoff.default t with
| value -> Some value
| exception Empty -> None

let[@inline] pop t = pop Backoff.default t

(* *)

let rec length t =
let head = Loc.get t.head in
let tail = Loc.fenceless_get t.tail in
if head != Loc.get t.head then length t
else
let head_at =
match head with H (Cons r) -> r.counter | H (Head r) -> r.counter
in
let tail_at =
match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
in
tail_at - head_at + 1
20 changes: 20 additions & 0 deletions src/kcas_data/two_stack_queue.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
type !'a t
(** *)

val create : unit -> 'a t
(** *)

val push : 'a t -> 'a -> unit
(** *)

exception Empty
(** *)

val pop : 'a t -> 'a
(** *)

val pop_opt : 'a t -> 'a option
(** *)

val length : 'a t -> int
(** *)
1 change: 1 addition & 0 deletions test/kcas_data/dune
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
queue_test_stm
stack_test
stack_test_stm
two_stack_queue_test_stm
xt_test)
(libraries
alcotest
Expand Down
Loading
Loading