Skip to content

Commit

Permalink
Add Rwlock
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Jan 26, 2025
1 parent 1da7d08 commit ad7855f
Show file tree
Hide file tree
Showing 9 changed files with 684 additions and 8 deletions.
47 changes: 45 additions & 2 deletions bench/bench_hashtbl.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 100 * Util.iter_factor)

let t = Hashtbl.create 1000 in
let lock = Lock.create ~padded:true () in
let rwlock = Rwlock.create ~padded:true () in
let sem = Sem.create ~padded:true 1 in

let n_ops = (100 + percent_mem) * n_ops / 100 in
Expand All @@ -41,6 +42,16 @@ let run_one ~budgetf ~n_domains ?(n_ops = 100 * Util.iter_factor)
Hashtbl.replace t key value
done
end
| `Rwlock ->
Rwlock.holding rwlock @@ fun () ->
Hashtbl.clear t;
if prepopulate then begin
for _ = 1 to n_keys do
let value = Random.bits () in
let key = value mod n_keys in
Hashtbl.replace t key value
done
end
| `Sem ->
Sem.acquire sem;
Hashtbl.clear t;
Expand Down Expand Up @@ -87,6 +98,34 @@ let run_one ~budgetf ~n_domains ?(n_ops = 100 * Util.iter_factor)
end
in
work ()
| `Rwlock ->
let rec work () =
let n = Countdown.alloc n_ops_todo ~domain_index ~batch:1000 in
if n <> 0 then begin
for _ = 1 to n do
let value = Random.State.bits state in
let op = (value asr 20) mod 100 in
let key = value mod n_keys in
if op < percent_mem then begin
Rwlock.acquire_shared rwlock;
Hashtbl.find_opt t key |> ignore;
Rwlock.release_shared rwlock
end
else if op < limit_add then begin
Rwlock.acquire rwlock;
Hashtbl.replace t key value;
Rwlock.release rwlock
end
else begin
Rwlock.acquire rwlock;
Hashtbl.remove t key;
Rwlock.release rwlock
end
done;
work ()
end
in
work ()
| `Sem ->
let rec work () =
let n = Countdown.alloc n_ops_todo ~domain_index ~batch:1000 in
Expand Down Expand Up @@ -121,14 +160,18 @@ let run_one ~budgetf ~n_domains ?(n_ops = 100 * Util.iter_factor)
Printf.sprintf "%d worker%s, %d%% reads with %s" n_domains
(if n_domains = 1 then "" else "s")
percent_mem
(match lock_type with `Lock -> "Lock" | `Sem -> "Sem")
(match lock_type with
| `Lock -> "Lock"
| `Rwlock -> "Rwlock"
| `Sem -> "Sem")
in
Times.record ~budgetf ~n_domains ~n_warmups:1 ~n_runs_min:1 ~before ~init
~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_ops ~singular:"operation" ~config

let run_suite ~budgetf =
Util.cross [ 1; 2; 4; 8 ] (Util.cross [ `Lock; `Sem ] [ 10; 50; 90; 95; 100 ])
Util.cross [ 1; 2; 4; 8 ]
(Util.cross [ `Lock; `Rwlock; `Sem ] [ 10; 50; 90; 95; 100 ])
|> List.concat_map @@ fun (n_domains, (lock_type, percent_mem)) ->
if Picos_domain.recommended_domain_count () < n_domains then []
else run_one ~budgetf ~n_domains ~percent_mem ~lock_type ()
20 changes: 19 additions & 1 deletion bench/bench_lock_yield.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ let run_one ~budgetf ~n_fibers ~use_domains ~lock_type () =
let n_ops_todo = Countdown.create ~n_domains () in

let lock = Lock.create ~padded:true () in
let rwlock = Rwlock.create ~padded:true () in
let sem =
Sem.create ~padded:true (match lock_type with `Sem_n n -> n | _ -> 1)
in
Expand Down Expand Up @@ -53,6 +54,22 @@ let run_one ~budgetf ~n_fibers ~use_domains ~lock_type () =
else work ()
in
loop n
| `Rwlock ->
if n <> 0 then
let rec loop n =
if 0 < n then begin
Rwlock.acquire rwlock;
let x = !v in
v := x + 1;
Control.yield ();
assert (!v = x + 1);
v := x;
Rwlock.release rwlock;
loop (n - 1)
end
else work ()
in
loop n
| `Sem ->
if n <> 0 then
let rec loop n =
Expand Down Expand Up @@ -102,6 +119,7 @@ let run_one ~budgetf ~n_fibers ~use_domains ~lock_type () =
(if n_fibers = 1 then "" else "s")
(match lock_type with
| `Lock -> "Lock"
| `Rwlock -> "Rwlock"
| `Sem -> "Sem"
| `Sem_n n -> Printf.sprintf "Sem %d" n)
in
Expand All @@ -112,7 +130,7 @@ let run_one ~budgetf ~n_fibers ~use_domains ~lock_type () =
let run_suite ~budgetf =
Util.cross [ false; true ]
(Util.cross
[ `Lock; `Sem; `Sem_n 2; `Sem_n 3; `Sem_n 4 ]
[ `Lock; `Rwlock; `Sem; `Sem_n 2; `Sem_n 3; `Sem_n 4 ]
[ 1; 2; 3; 4; 8; 256; 512; 1024 ])
|> List.concat_map @@ fun (use_domains, (lock_type, n_fibers)) ->
if
Expand Down
34 changes: 32 additions & 2 deletions bench/bench_ref.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ let run_one ~budgetf ?(n_iter = 250 * Util.iter_factor) ~lock_type
(Op (name, value, op1, op2, op_kind)) =
let lock = Lock.create () in
let sem = Sem.create 1 in
let rwlock = Rwlock.create () in

let loc = Ref.make value in

Expand All @@ -46,6 +47,32 @@ let run_one ~budgetf ?(n_iter = 250 * Util.iter_factor) ~lock_type
end
in
loop n_iter
| `Rwlock, `RW ->
let rec loop i =
if i > 0 then begin
Rwlock.acquire rwlock;
op1 loc |> ignore;
Rwlock.release rwlock;
Rwlock.acquire rwlock;
op2 loc |> ignore;
Rwlock.release rwlock;
loop (i - 2)
end
in
loop n_iter
| `Rwlock, `RO ->
let rec loop i =
if i > 0 then begin
Rwlock.acquire_shared rwlock;
op1 loc |> ignore;
Rwlock.release_shared rwlock;
Rwlock.acquire_shared rwlock;
op2 loc |> ignore;
Rwlock.release_shared rwlock;
loop (i - 2)
end
in
loop n_iter
| `Sem, _ ->
let rec loop i =
if i > 0 then begin
Expand All @@ -63,13 +90,16 @@ let run_one ~budgetf ?(n_iter = 250 * Util.iter_factor) ~lock_type

let config =
Printf.sprintf "%s with %s" name
(match lock_type with `Lock -> "Lock" | `Sem -> "Sem")
(match lock_type with
| `Lock -> "Lock"
| `Rwlock -> "Rwlock"
| `Sem -> "Sem")
in
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_iter ~singular:"op" ~config

let run_suite ~budgetf =
Util.cross [ `Lock; `Sem ]
Util.cross [ `Lock; `Rwlock; `Sem ]
[
(let get x = !x in
Op ("get", 42, get, get, `RO));
Expand Down
1 change: 1 addition & 0 deletions lib/picos_std.sync/picos_std_sync.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module Mutex = Mutex
module Condition = Condition
module Semaphore = Semaphore
module Lock = Lock
module Rwlock = Rwlock
module Sem = Sem
module Lazy = Lazy
module Latch = Latch
Expand Down
174 changes: 171 additions & 3 deletions lib/picos_std.sync/picos_std_sync.mli
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module Mutex : sig
[~checked:false] on an operation may prevent error checking also on a
subsequent operation.
See also {!Lock}. *)
See also {!Lock}, and {!Rwlock}. *)

type t
(** Represents a mutual-exclusion lock or mutex. *)
Expand Down Expand Up @@ -162,9 +162,10 @@ module Lock : sig
🏎️ This uses a low overhead, optimistic, and unfair implementation that
also does not perform runtime ownership error checking. In most cases this
should be the mutual exclusion lock you will want to use.
should be the mutual exclusion lock you will want to use. Consider using
{!Rwlock} in case most operations are reads.
See also {!Mutex}. *)
See also {!Mutex}, and {!Rwlock}. *)

type t
(** Represents a mutual exclusion lock. *)
Expand Down Expand Up @@ -247,6 +248,173 @@ module Lock : sig
in case the [lock] is not currently held exclusively. *)
end

module Rwlock : sig
(** A read-write lock.
🏎️ This uses a low overhead, optimistic, and unfair implementation that
also does not perform runtime ownership error checking. In most cases this
should be the read-write lock you will want to use and should give roughly
equal or better performance than {!Lock} in cases where the majority of
operations are reads.
🐌 This is a "slim" lock. Acquiring the lock in read mode has low overhead,
but limited scalability. For highly parallel use cases you will either
want to use sharding or a "fat" scalable read-write lock.
⚠️ The current implementation allows readers to bypass the queue and does
not prevent writers from starvation. For example, a pair of readers
running concurrently, acquiring and releasing the lock such that there is
never a point where the lock is fully released, prevents writers from
acquiring the lock. This might be changed in the future such that neither
readers nor writers should starve assuming no single party holds the lock
indefinitely.
See also {!Lock}, and {!Mutex}. *)

type t
(** Represents a read-write lock. *)

(** {1 Basic API} *)

val create : ?padded:bool -> unit -> t
(** [create ()] returns a new read-write lock that is initially unlocked. *)

exception Poisoned
(** Exception raised in case the read-write lock has been
{{!poison} poisoned}. *)

val sharing : t -> (unit -> 'a) -> 'a
(** [sharing rwlock thunk] acquires a read lock on the [rwlock] and calls
[thunk ()]. Whether [thunk ()] returns a value or raises an exception, the
[rwlock] will be unlocked.
A single fiber may acquire a read lock on a specific [rwlock] multiple
times and other fibers may concurrently acquire read locks on the [rwlock]
as well.
@raise Poisoned in case the [rwlock] has been {{!poison} poisoned}. *)

exception Frozen
(** Exception raised in case the read-write lock has been {{!freeze} frozen}.
*)

val holding : t -> (unit -> 'a) -> 'a
(** [holding rwlock thunk] acquires a write lock on the [rwlock] and calls
[thunk ()]. In case [thunk ()] returns a value, the read-write lock is
release and the value is returned. Otherwise the read-write lock is
poisoned and the exception is reraised.
@raise Poisoned in case the [rwlock] has been {{!poison} poisoned}.
@raise Frozen in case the [rwlock] has been {{!freeze} frozen}. *)

val freeze : t -> unit
(** [freeze rwlock] marks a [rwlock] as frozen, which means that one can no
longer acquire a write lock on the [rwlock].
ℹ️ Freezing a [rwlock] does not improve the scalability of acquiring read
locks on the [rwlock].
@raise Poisoned in case the [rwlock] has been {{!poison} poisoned}. *)

val protect : t -> (unit -> 'a) -> 'a
(** [protect rwlock thunk] acquires a write lock the [rwlock], runs
[thunk ()], and releases the [rwlock] after [thunk ()] returns or raises.
@raise Poisoned in case the lock has been {{!poison} poisoned}.
@raise Frozen in case the [rwlock] has been {{!freeze} frozen}. *)

module Condition : sig
(** A condition variable.
⚠️ The associated read-write lock must be held exclusively via {!acquire}
when used with a condition variable. *)

include Intf.Condition with type lock = t

val wait_shared : t -> lock -> unit
(** [wait_shared condition lock] releases the shared [lock], waits for the
[condition], and acquires the shared [lock] before returning or raising
due to the operation being canceled.
ℹ️ If the lock is {{!poison} poisoned} during the {!wait_shared}, then
the {!Poisoned} exception will be raised. *)
end

(** {1 State query API} *)

val is_locked_shared : t -> bool
(** [is_locked_shared rwlock] determines whether the [rwlock] is currently
read locked or not.
⚠️ [is_locked_shared rwlock] will return [false] in case the [rwlock] is
write locked. *)

val is_frozen : t -> bool
(** [is_frozen rwlock] determines whether the [rwlock] has been
{{!freeze} frozen}. *)

val is_locked : t -> bool
(** [is_locked rwlock] determines whether the [rwlock] is currently write
locked or not.
⚠️ [is_locked rwlock] will return [false] in case the [rwlock] is read
locked. *)

val is_poisoned : t -> bool
(** [is_poisoned rwlock] determines whether the [rwlock] has been
{{!poison} poisoned}. *)

(** {1 Expert API}
⚠️ The calls in this section must be matched correctly or the state of the
read-write lock may become corrupted. *)

val acquire_shared : t -> unit
(** [acquire_shared rwlock] acquires a read lock on the [rwlock].
A single fiber may acquire a read lock on a specific [rwlock] multiple
times and other fibers may concurrently acquire read locks on the [rwlock]
as well.
@raise Poisoned in case the [rwlock] has been {{!poison} poisoned}. *)

val try_acquire_shared : t -> bool
(** [try_acquire_shared rwlock] attempts to acquire a read lock on the
[rwlock]. Returns [true] in case of success and [false] in case of
failure.
@raise Poisoned in case the [rwlock] has been {{!poison} poisoned}. *)

val release_shared : t -> unit
(** [release_shared rwlock] release one read lock on the read-write lock or
does nothing in case the lock has been {{!poison} poisoned}. *)

val acquire : t -> unit
(** [acquire rwlock] acquires a write lock on the [rwlock].
A fiber may acquire a write lock on a specific [rwlock] once at a time.
@raise Poisoned in case the [rwlock] has been {{!poison} poisoned}.
@raise Frozen in case the [rwlock] has been {{!freeze} frozen}. *)

val try_acquire : t -> bool
(** [try_acquire rwlock] attempts to acquire a write lock on the [rwlock].
Returns [true] in case of success and [false] in case of failure.
@raise Poisoned in case the [rwlock] has been {{!poison} poisoned}.
@raise Frozen in case the [rwlock] has been {{!freeze} frozen}. *)

val release : t -> unit
(** [release rwlock] releases the write lock on the read-write lock or does
nothing in case the read-write lock has been {{!poison} poisoned}. *)

val poison : t -> unit
(** [poison rwlock] marks a write locked [rwlock] as poisoned.
@raise Invalid_argument
in case the [rwlock] is not currently write locked. *)
end

module Sem : sig
(** A counting semaphore.
Expand Down
Loading

0 comments on commit ad7855f

Please sign in to comment.