Skip to content

Commit

Permalink
Add optional capacity argument to Queue.create and Stack.create
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Dec 31, 2023
1 parent f2aa501 commit bd9ae40
Show file tree
Hide file tree
Showing 12 changed files with 361 additions and 87 deletions.
10 changes: 10 additions & 0 deletions src/kcas_data/elems.ml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,13 @@ let rev_prepend_to_seq t tl =
| Right t' -> t'
in
prepend_to_seq t tl ()

let rec of_list_rev tl length = function
| [] -> tl
| x :: xs ->
let length = length + 1 in
of_list_rev { value = x; tl; length } length xs

let[@inline] of_list_rev = function
| [] -> empty
| x :: xs -> of_list_rev (singleton x) 1 xs
1 change: 1 addition & 0 deletions src/kcas_data/elems.mli
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ val prepend_to_seq : 'a t -> 'a Seq.t -> 'a Seq.t
val to_seq : 'a t -> 'a Seq.t
val of_seq_rev : 'a Seq.t -> 'a t
val rev_prepend_to_seq : 'a t -> 'a Seq.t -> 'a Seq.t
val of_list_rev : 'a list -> 'a t
114 changes: 114 additions & 0 deletions src/kcas_data/list_with_capacity.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
open Kcas

type 'a t = { capacity : int; length : int; list : 'a list; limit : int }

let empty_unlimited =
{ capacity = Int.max_int; length = 0; list = []; limit = Int.max_int }

let[@inline] make_empty ~capacity =
if capacity = Int.max_int then empty_unlimited
else { capacity; length = 0; list = []; limit = capacity }

let[@inline] make ~capacity ~length ~list ~limit =
{ capacity; length; list; limit }

let[@inline] to_rev_elems t = Elems.of_list_rev t.list
let[@inline] is_empty t = t.length = 0
let[@inline] length t = t.length
let[@inline] capacity t = t.capacity
let[@inline] limit t = t.limit
let[@inline] list t = t.list

let[@inline] tl_safe = function
| { list = []; _ } as t -> t
| { capacity; length; list = _ :: list; _ } as t ->
let limit = if capacity = Int.max_int then capacity else t.limit in
{ capacity; length = length - 1; list; limit }

let[@inline] tl_or_retry = function
| { list = []; _ } -> Retry.later ()
| { capacity; length; list = _ :: list; _ } as t ->
let limit = if capacity = Int.max_int then capacity else t.limit in
{ capacity; length = length - 1; list; limit }

let[@inline] hd_opt t = match t.list with [] -> None | x :: _ -> Some x

let[@inline] hd_or_retry t =
match t.list with [] -> Retry.later () | x :: _ -> x

let[@inline] hd_unsafe t = List.hd t.list

let[@inline] cons_safe x ({ capacity; _ } as t) =
if capacity = Int.max_int then
let { length; list; _ } = t in
{ capacity; length = length + 1; list = x :: list; limit = capacity }
else
let { length; limit; _ } = t in
if length < limit then
let { list; _ } = t in
{ capacity; length = length + 1; list = x :: list; limit }
else t

let[@inline] cons_or_retry x ({ capacity; _ } as t) =
if capacity = Int.max_int then
let { length; list; _ } = t in
{ capacity; length = length + 1; list = x :: list; limit = capacity }
else
let { length; limit; _ } = t in
if length < limit then
let { list; _ } = t in
{ capacity; length = length + 1; list = x :: list; limit }
else Retry.later ()

let[@inline] move ({ capacity; _ } as t) =
if capacity = Int.max_int then empty_unlimited
else
let { length; _ } = t in
if length = 0 then t
else
let { limit; _ } = t in
{ capacity; length = 0; list = []; limit = limit - length }

let move_last ({ capacity; _ } as t) =
if capacity = Int.max_int then empty_unlimited
else
let { length; _ } = t in
let limit = capacity - length in
if length = 0 && t.limit = limit then t
else { capacity; length = 0; list = []; limit }

let[@inline] clear ({ capacity; _ } as t) =
if capacity = Int.max_int then empty_unlimited
else if t.length = 0 && t.limit = capacity then t
else make_empty ~capacity

let rec prepend_to_seq xs tl =
match xs with
| [] -> tl
| x :: xs -> fun () -> Seq.Cons (x, prepend_to_seq xs tl)

let to_seq { list; _ } = prepend_to_seq list Seq.empty

let rev_prepend_to_seq { length; list; _ } tl =
if length <= 1 then prepend_to_seq list tl
else
let t = ref (`Original list) in
fun () ->
let t =
match !t with
| `Original t' ->
(* This is domain safe as the result is always equivalent. *)
let t' = List.rev t' in
t := `Reversed t';
t'
| `Reversed t' -> t'
in
prepend_to_seq t tl ()

let of_list ?(capacity = Int.max_int) list =
let length = List.length list in
let limit = Int.min 0 (capacity - length) in
{ capacity; length; list; limit }

let of_seq_rev ?capacity xs =
of_list ?capacity (Seq.fold_left (fun xs x -> x :: xs) [] xs)
24 changes: 24 additions & 0 deletions src/kcas_data/list_with_capacity.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
type !'a t

val empty_unlimited : 'a t
val make_empty : capacity:int -> 'a t
val make : capacity:int -> length:int -> list:'a list -> limit:int -> 'a t
val is_empty : 'a t -> bool
val length : 'a t -> int
val capacity : 'a t -> int
val limit : 'a t -> int
val list : 'a t -> 'a list
val cons_safe : 'a -> 'a t -> 'a t
val cons_or_retry : 'a -> 'a t -> 'a t
val move : 'a t -> 'a t
val move_last : 'a t -> 'a t
val clear : 'a t -> 'a t
val to_rev_elems : 'a t -> 'a Elems.t
val to_seq : 'a t -> 'a Seq.t
val rev_prepend_to_seq : 'a t -> 'a Seq.t -> 'a Seq.t
val of_seq_rev : ?capacity:int -> 'a Seq.t -> 'a t
val tl_safe : 'a t -> 'a t
val tl_or_retry : 'a t -> 'a t
val hd_opt : 'a t -> 'a option
val hd_or_retry : 'a t -> 'a
val hd_unsafe : 'a t -> 'a
147 changes: 95 additions & 52 deletions src/kcas_data/queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,78 @@ open Kcas

type 'a t = {
front : 'a Elems.t Loc.t;
middle : 'a Elems.t Loc.t;
back : 'a Elems.t Loc.t;
back : 'a List_with_capacity.t Loc.t;
middle : 'a List_with_capacity.t Loc.t;
}

let alloc ~front ~middle ~back =
let alloc ~front ~back ~middle =
(* We allocate locations in specific order to make most efficient use of the
splay-tree based transaction log. *)
let front = Loc.make ~padded:true front
and middle = Loc.make ~padded:true middle
and back = Loc.make ~padded:true back in
and back = Loc.make ~padded:true back
and middle = Loc.make ~padded:true middle in
Multicore_magic.copy_as_padded { back; middle; front }

let create () = alloc ~front:Elems.empty ~middle:Elems.empty ~back:Elems.empty
let create ?(capacity = Int.max_int) () =
if capacity < 0 then invalid_arg "Queue.create: capacity must be non-negative";
let back = List_with_capacity.make_empty ~capacity in
alloc ~front:Elems.empty ~back ~middle:List_with_capacity.empty_unlimited

let copy q =
let tx ~xt = (Xt.get ~xt q.front, Xt.get ~xt q.middle, Xt.get ~xt q.back) in
let front, middle, back = Xt.commit { tx } in
alloc ~front ~middle ~back
let tx ~xt = (Xt.get ~xt q.front, Xt.get ~xt q.back, Xt.get ~xt q.middle) in
let front, back, middle = Xt.commit { tx } in
alloc ~front ~back ~middle

module Xt = struct
let is_empty ~xt t =
(* We access locations in order of allocation to make most efficient use of
the splay-tree based transaction log. *)
Xt.get ~xt t.front == Elems.empty
&& Xt.get ~xt t.middle == Elems.empty
&& Xt.get ~xt t.back == Elems.empty

let length ~xt { back; middle; front } =
Elems.length (Xt.get ~xt front)
+ Elems.length (Xt.get ~xt middle)
+ Elems.length (Xt.get ~xt back)

let add ~xt x q = Xt.unsafe_modify ~xt q.back @@ Elems.cons x
&& List_with_capacity.is_empty (Xt.get ~xt t.back)
&& Xt.get ~xt t.middle == List_with_capacity.empty_unlimited

let length ~xt q =
Elems.length (Xt.get ~xt q.front)
+ List_with_capacity.length (Xt.get ~xt q.back)
+ List_with_capacity.length (Xt.get ~xt q.middle)

let try_add ~xt x q =
let lwc = Xt.unsafe_update ~xt q.back (List_with_capacity.cons_safe x) in
let capacity = List_with_capacity.capacity lwc in
capacity = Int.max_int
||
let back_length = List_with_capacity.length lwc in
back_length < List_with_capacity.limit lwc
||
let other_length =
List_with_capacity.length (Xt.get ~xt q.middle)
+ Elems.length (Xt.get ~xt q.front)
in
let limit = capacity - other_length in
back_length < limit
&&
(Xt.set ~xt q.back
(List_with_capacity.make ~capacity ~length:(back_length + 1)
~list:(x :: List_with_capacity.list lwc)
~limit);
true)

let add ~xt x q = Retry.unless (try_add ~xt x q)
let push = add

(** Cooperative helper to move elems from back to middle. *)
let back_to_middle ~middle ~back =
let back_to_middle ~back ~middle =
let tx ~xt =
let xs = Xt.exchange ~xt back Elems.empty in
if xs == Elems.empty || Xt.exchange ~xt middle xs != Elems.empty then
raise_notrace Exit
let xs = Xt.unsafe_update ~xt back List_with_capacity.move in
if
List_with_capacity.length xs = 0
|| Xt.exchange ~xt middle xs != List_with_capacity.empty_unlimited
then raise_notrace Exit
in
try Xt.commit { tx } with Exit -> ()

let take_opt_finish ~xt front elems =
let elems = Elems.rev elems in
let take_opt_finish ~xt front lwc =
let elems = List_with_capacity.to_rev_elems lwc in
Xt.set ~xt front (Elems.tl_safe elems);
Elems.hd_opt elems

Expand All @@ -58,17 +84,19 @@ module Xt = struct
else
let middle = t.middle and back = t.back in
if not (Xt.is_in_log ~xt middle || Xt.is_in_log ~xt back) then
back_to_middle ~middle ~back;
let elems = Xt.exchange ~xt middle Elems.empty in
if elems != Elems.empty then take_opt_finish ~xt front elems
back_to_middle ~back ~middle;
let lwc = Xt.exchange ~xt middle List_with_capacity.empty_unlimited in
if lwc != List_with_capacity.empty_unlimited then
take_opt_finish ~xt front lwc
else
let elems = Xt.exchange ~xt back Elems.empty in
if elems != Elems.empty then take_opt_finish ~xt front elems else None
let lwc = Xt.unsafe_update ~xt back List_with_capacity.move_last in
if List_with_capacity.length lwc <> 0 then take_opt_finish ~xt front lwc
else None

let take_blocking ~xt q = Xt.to_blocking ~xt (take_opt q)

let peek_opt_finish ~xt front elems =
let elems = Elems.rev elems in
let peek_opt_finish ~xt front lwc =
let elems = List_with_capacity.to_rev_elems lwc in
Xt.set ~xt front elems;
Elems.hd_opt elems

Expand All @@ -79,57 +107,72 @@ module Xt = struct
else
let middle = t.middle and back = t.back in
if not (Xt.is_in_log ~xt middle || Xt.is_in_log ~xt back) then
back_to_middle ~middle ~back;
let elems = Xt.exchange ~xt middle Elems.empty in
if elems != Elems.empty then peek_opt_finish ~xt front elems
back_to_middle ~back ~middle;
let lwc = Xt.exchange ~xt middle List_with_capacity.empty_unlimited in
if lwc != List_with_capacity.empty_unlimited then
peek_opt_finish ~xt front lwc
else
let elems = Xt.exchange ~xt back Elems.empty in
if elems != Elems.empty then peek_opt_finish ~xt front elems else None
let lwc = Xt.unsafe_update ~xt back List_with_capacity.move_last in
if List_with_capacity.length lwc <> 0 then peek_opt_finish ~xt front lwc
else None

let peek_blocking ~xt q = Xt.to_blocking ~xt (peek_opt q)

let clear ~xt t =
Xt.set ~xt t.front Elems.empty;
Xt.set ~xt t.middle Elems.empty;
Xt.set ~xt t.back Elems.empty
Xt.unsafe_modify ~xt t.back List_with_capacity.clear;
Xt.set ~xt t.middle List_with_capacity.empty_unlimited

let swap ~xt q1 q2 =
let front = Xt.get ~xt q1.front
and middle = Xt.get ~xt q1.middle
and back = Xt.get ~xt q1.back in
and back = Xt.get ~xt q1.back
and middle = Xt.get ~xt q1.middle in
let front = Xt.exchange ~xt q2.front front
and middle = Xt.exchange ~xt q2.middle middle
and back = Xt.exchange ~xt q2.back back in
and back = Xt.exchange ~xt q2.back back
and middle = Xt.exchange ~xt q2.middle middle in
Xt.set ~xt q1.front front;
Xt.set ~xt q1.middle middle;
Xt.set ~xt q1.back back
Xt.set ~xt q1.back back;
Xt.set ~xt q1.middle middle

let seq_of ~front ~middle ~back =
(* Sequence construction is lazy, so this function is O(1). *)
Seq.empty
|> Elems.rev_prepend_to_seq back
|> Elems.rev_prepend_to_seq middle
|> List_with_capacity.rev_prepend_to_seq back
|> List_with_capacity.rev_prepend_to_seq middle
|> Elems.prepend_to_seq front

let to_seq ~xt t =
let front = Xt.get ~xt t.front
and middle = Xt.get ~xt t.middle
and back = Xt.get ~xt t.back in
and back = Xt.get ~xt t.back
and middle = Xt.get ~xt t.middle in
seq_of ~front ~middle ~back

let take_all ~xt t =
let front = Xt.exchange ~xt t.front Elems.empty
and middle = Xt.exchange ~xt t.middle Elems.empty
and back = Xt.exchange ~xt t.back Elems.empty in
and back = Xt.unsafe_update ~xt t.back List_with_capacity.clear
and middle = Xt.exchange ~xt t.middle List_with_capacity.empty_unlimited in
seq_of ~front ~middle ~back
end

let is_empty q = Kcas.Xt.commit { tx = Xt.is_empty q }
let length q = Kcas.Xt.commit { tx = Xt.length q }

let try_add x q =
(* Fenceless is safe as we revert to a transaction in case we didn't update. *)
let lwc = Loc.fenceless_update q.back (List_with_capacity.cons_safe x) in
let capacity = List_with_capacity.capacity lwc in
capacity = Int.max_int
||
let back_length = List_with_capacity.length lwc in
back_length < List_with_capacity.limit lwc
|| Kcas.Xt.commit { tx = Xt.try_add x q }

let add x q =
(* Fenceless is safe as we always update. *)
Loc.fenceless_modify q.back @@ Elems.cons x
(* Fenceless is safe as we revert to a transaction in case we didn't update. *)
let lwc = Loc.fenceless_update q.back (List_with_capacity.cons_safe x) in
if List_with_capacity.capacity lwc <> Int.max_int then
if List_with_capacity.length lwc = List_with_capacity.limit lwc then
Kcas.Xt.commit { tx = Xt.add x q }

let push = add

Expand Down
Loading

0 comments on commit bd9ae40

Please sign in to comment.