Skip to content

Commit

Permalink
New MPMC queue using cooperative pointer reversal
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Feb 8, 2025
1 parent 1666b8c commit 8e3e724
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 196 deletions.
288 changes: 100 additions & 188 deletions lib/picos_aux.mpmcq/picos_aux_mpmcq.ml
Original file line number Diff line number Diff line change
@@ -1,208 +1,120 @@
module Atomic = Multicore_magic.Transparent_atomic

type 'a t = { head : 'a head Atomic.t; tail : 'a tail Atomic.t }
type 'a node = { mutable next : 'a node; index : int; mutable value : 'a }
type 'a t = { head : 'a node Atomic.t; tail : 'a node Atomic.t }

and ('a, _) tdt =
| Cons : {
counter : int;
value : 'a;
suffix : 'a head;
}
-> ('a, [> `Cons ]) tdt
| Head : { counter : int } -> ('a, [> `Head ]) tdt
| Snoc : {
counter : int;
prefix : 'a tail;
value : 'a;
}
-> ('a, [> `Snoc ]) tdt
| Tail : {
counter : int;
mutable move : ('a, [ `Snoc | `Used ]) tdt;
}
-> ('a, [> `Tail ]) tdt
| Used : ('a, [> `Used ]) tdt
let[@inline] create_sentinel index =
let sentinel = { next = Obj.magic index; index; value = Obj.magic index } in
sentinel.next <- sentinel;
sentinel

and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed]
and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed]
let[@inline] maybe_fix tail =
let mystery = tail.next in
if mystery.index = tail.index - 1 then
let prev = mystery in
if prev.next != tail then prev.next <- tail

let create ?padded () =
let head =
Atomic.make (H (Head { counter = 1 })) |> Multicore_magic.copy_as ?padded
in
let tail =
Atomic.make (T (Tail { counter = 0; move = Used }))
|> Multicore_magic.copy_as ?padded
in
Multicore_magic.copy_as ?padded { head; tail }
let sentinel = create_sentinel 0 in
let head = Atomic.make sentinel |> Multicore_magic.copy_as ?padded in
let tail = Atomic.make sentinel |> Multicore_magic.copy_as ?padded in
{ head; tail } |> Multicore_magic.copy_as ?padded

let rec rev (suffix : (_, [< `Cons ]) tdt) = function
| T (Snoc { counter; prefix; value }) ->
rev (Cons { counter; value; suffix = H suffix }) prefix
| T (Tail _) -> suffix

let rev = function
| (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tdt) ->
rev
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
prefix

let rec push t value backoff = function
| T (Snoc snoc_r) as prefix ->
let after = Snoc { counter = snoc_r.counter + 1; prefix; value } in
if not (Atomic.compare_and_set t.tail prefix (T after)) then
let backoff = Backoff.once backoff in
push t value backoff (Atomic.fenceless_get t.tail)
| T (Tail tail_r) as prefix -> begin
match tail_r.move with
| Used ->
let after = Snoc { counter = tail_r.counter + 1; prefix; value } in
if not (Atomic.compare_and_set t.tail prefix (T after)) then
let backoff = Backoff.once backoff in
push t value backoff (Atomic.fenceless_get t.tail)
| Snoc move_r as move ->
begin
match Atomic.get t.head with
| H (Head head_r as head) when head_r.counter < move_r.counter ->
let after = rev move in
if
Atomic.fenceless_get t.head == H head
&& Atomic.compare_and_set t.head (H head) (H after)
then tail_r.move <- Used
| _ -> tail_r.move <- Used
end;
push t value backoff (Atomic.get t.tail)
end
let rec push t value backoff =
let tail = Atomic.get t.tail in
let new_tail = { next = tail; value; index = tail.index + 1 } in
maybe_fix tail;
if Atomic.compare_and_set t.tail tail new_tail then tail.next <- new_tail
else push t value (Backoff.once backoff)

exception Empty

let rec pop t backoff = function
| H (Cons cons_r as cons) ->
if Atomic.compare_and_set t.head (H cons) cons_r.suffix then cons_r.value
else
let backoff = Backoff.once backoff in
pop t backoff (Atomic.fenceless_get t.head)
| H (Head head_r as head) -> begin
match Atomic.get t.tail with
| T (Snoc snoc_r as move) ->
if head_r.counter = snoc_r.counter then
if Atomic.compare_and_set t.tail (T move) snoc_r.prefix then
snoc_r.value
else pop t backoff (Atomic.fenceless_get t.head)
else
let (Tail tail_r as tail : (_, [ `Tail ]) tdt) =
Tail { counter = snoc_r.counter; move }
in
let new_head = Atomic.get t.head in
if new_head != H head then pop t backoff new_head
else if Atomic.compare_and_set t.tail (T move) (T tail) then
let (Cons cons_r) = rev move in
let after = cons_r.suffix in
let new_head = Atomic.get t.head in
if new_head != H head then pop t backoff new_head
else if Atomic.compare_and_set t.head (H head) after then begin
tail_r.move <- Used;
cons_r.value
end
else
let backoff = Backoff.once backoff in
pop t backoff (Atomic.fenceless_get t.head)
else pop t backoff (Atomic.fenceless_get t.head)
| T (Tail tail_r) -> begin
match tail_r.move with
| Used ->
let new_head = Atomic.get t.head in
if new_head != H head then pop t backoff new_head
else raise_notrace Empty
| Snoc move_r as move ->
if head_r.counter < move_r.counter then
let (Cons cons_r) = rev move in
let after = cons_r.suffix in
let new_head = Atomic.get t.head in
if new_head != H head then pop t backoff new_head
else if Atomic.compare_and_set t.head (H head) after then begin
tail_r.move <- Used;
cons_r.value
end
else
let backoff = Backoff.once backoff in
pop t backoff (Atomic.fenceless_get t.head)
else
let new_head = Atomic.get t.head in
if new_head != H head then pop t backoff new_head
else raise_notrace Empty
end
let rec pop_exn t backoff =
let head = Atomic.get t.head in
let next = head.next in
if head.index + 1 = next.index then
let value = next.value in
if Atomic.compare_and_set t.head head next then begin
next.value <- Obj.magic ();
value
end
else pop_exn t (Backoff.once backoff)
else
let tail = Atomic.get t.tail in
if tail == head then raise_notrace Empty
else begin
maybe_fix tail;
pop_exn t Backoff.default
end

let rec push_head t value backoff =
match Atomic.get t.head with
| H (Cons cons_r) as suffix ->
let after = Cons { counter = cons_r.counter - 1; value; suffix } in
if not (Atomic.compare_and_set t.head suffix (H after)) then
push_head t value (Backoff.once backoff)
| H (Head head_r) as head -> begin
match Atomic.get t.tail with
| T (Snoc snoc_r as move) ->
if Atomic.get t.head != head then push_head t value backoff
else if head_r.counter = snoc_r.counter then begin
let prefix = T (Snoc { snoc_r with value }) in
let after =
Snoc { snoc_r with counter = snoc_r.counter + 1; prefix }
in
if not (Atomic.compare_and_set t.tail (T move) (T after)) then
push_head t value (Backoff.once backoff)
end
else
let tail = Tail { counter = snoc_r.counter; move } in
let backoff =
if Atomic.compare_and_set t.tail (T move) (T tail) then backoff
else Backoff.once backoff
in
push_head t value backoff
| T (Tail tail_r) as prefix -> begin
match tail_r.move with
| Used ->
if Atomic.get t.head == head then begin
let tail =
Snoc { counter = tail_r.counter + 1; value; prefix }
in
if not (Atomic.compare_and_set t.tail prefix (T tail)) then
push_head t value (Backoff.once backoff)
end
else push_head t value backoff
| Snoc move_r as move ->
begin
match Atomic.get t.head with
| H (Head head_r as head) when head_r.counter < move_r.counter
->
let after = rev move in
if
Atomic.fenceless_get t.head == H head
&& Atomic.compare_and_set t.head (H head) (H after)
then tail_r.move <- Used
| _ -> tail_r.move <- Used
end;
push_head t value backoff
end
let head = Atomic.get t.head in
let next = head.next in
let index = head.index in
if index + 1 = next.index then begin
let new_next = { value; next; index } in
let index = index - 1 in
let new_head = { value = Obj.magic index; next = new_next; index } in
if not (Atomic.compare_and_set t.head head new_head) then
push_head t value (Backoff.once backoff)
end
else
let tail = Atomic.get t.tail in
if tail == head then
let new_tail = { value; next = tail; index = tail.index + 1 } in
if Atomic.compare_and_set t.tail tail new_tail then tail.next <- new_tail
else push_head t value (Backoff.once backoff)
else begin
maybe_fix tail;
push_head t value Backoff.default
end

let rec length t =
let head = Atomic.get t.head in
let tail = Atomic.fenceless_get t.tail in
if head != Atomic.get t.head then length t
else
let head_at =
match head with H (Cons r) -> r.counter | H (Head r) -> r.counter
in
let tail_at =
match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
in
tail_at - head_at + 1
let tail = Atomic.get t.tail in
let head = Atomic.fenceless_get t.head in
if tail == Atomic.get t.tail then tail.index - head.index else length t

let[@inline] is_empty t = length t == 0
let[@inline] pop_exn t = pop t Backoff.default (Atomic.fenceless_get t.head)
type ('a, _) res = Seq : ('a, 'a Seq.t) res | Array : ('a, 'a array) res

let[@inline] push t value =
push t value Backoff.default (Atomic.fenceless_get t.tail)
let rec pop_all_as : type a r. a t -> (a, r) res -> _ -> r =
fun t result backoff ->
let head = Atomic.get t.head in
let next = head.next in
if head.index + 1 = next.index then begin
let new_sentinel = create_sentinel head.index in
if Atomic.compare_and_set t.head head new_sentinel then begin
(* TODO: not lock-free. *)
let tail = Atomic.exchange t.tail new_sentinel in
maybe_fix tail;
match result with
| Seq ->
let rec to_seq work tail () =
Seq.Cons
( work.value,
if work == tail then Seq.empty else to_seq work.next tail )
in
to_seq head.next tail
| Array ->
let n = tail.index - head.index in
let work = ref head.next in
Array.init n @@ fun _ ->
let node = !work in
work := node.next;
node.value
end
else pop_all_as t result (Backoff.once backoff)
end
else
let tail = Atomic.get t.tail in
if tail == head then match result with Seq -> Seq.empty | Array -> [||]
else begin
maybe_fix tail;
pop_all_as t result Backoff.default
end

let[@inline] push t value = push t value Backoff.default
let[@inline] pop_exn t = pop_exn t Backoff.default
let[@inline] pop_all t = pop_all_as t Seq Backoff.default
let[@inline] pop_all_as_array t = pop_all_as t Array Backoff.default
let[@inline] push_head t value = push_head t value Backoff.default
let[@inline] is_empty t = length t == 0
23 changes: 17 additions & 6 deletions lib/picos_aux.mpmcq/picos_aux_mpmcq.mli
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
threads attempt to pop fibers from the queues of other threads when their
local queues are empty. It is also possible to use only a single shared
queue, but that will result in very high contention as this queue is not
relaxed. *)
relaxed.
⚠️ The {!pop_all} and {!pop_all_as_array} operation are not lock-free and
prevent concurrent {!pop_exn}, {!pop_all}, {!pop_all_as_array}, and
{!push_head} operations from making progress until the operation has reached
its linearization point. Other concurrent operations are not prevented from
making progress. *)

(** {1 API} *)

Expand All @@ -31,6 +37,14 @@ val pop_exn : 'a t -> 'a
@raise Empty in case the queue was empty. *)

val pop_all : 'a t -> 'a Seq.t
(** [pop_all queue] removes all values from the [queue] and returns them as a
sequence. *)

val pop_all_as_array : 'a t -> 'a array
(** [pop_all_as_array queue] removes all values from the [queue] and returns
them as an array. *)

val length : 'a t -> int
(** [length queue] returns the length or the number of values in the [queue]. *)

Expand Down Expand Up @@ -60,11 +74,8 @@ val is_empty : 'a t -> bool
# Picos_aux_mpmcq.pop_exn q
- : int = 76
# Picos_aux_mpmcq.pop_exn q
- : int = 42
# Picos_aux_mpmcq.pop_exn q
- : int = 101
# Picos_aux_mpmcq.pop_all_as_array q
- : int array = [|42; 101|]
# Picos_aux_mpmcq.pop_exn q
Exception: Picos_aux_mpmcq.Empty.
Expand Down
Loading

0 comments on commit 8e3e724

Please sign in to comment.