Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add aquire_read_lock et. al. methods to ReaderWriterLock #108

Merged
merged 1 commit into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

## [Unreleased]
- Add `.acquire_read_lock`, `.release_read_lock`, `.acquire_write_lock`, and
`.release_write_lock` methods to the inter thread `ReaderWriterLock` as was
promised in the README.

## [0.18]
- Reshuffle the process lock code and properly document it.
Expand Down
141 changes: 105 additions & 36 deletions fasteners/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(self,
threads are not properly identified by threading.current_thread
"""
self._writer = None
self._writer_entries = 0
self._pending_writers = collections.deque()
self._readers = {}
self._cond = condition_cls()
Expand Down Expand Up @@ -99,16 +100,27 @@ def owner(self) -> Optional[str]:
return self.READER
return None

@contextlib.contextmanager
def read_lock(self):
"""Context manager that grants a read lock.
def acquire_read_lock(self):
"""Acquire a read lock.

Will wait until no active or pending writers.

Raises:
RuntimeError: if a pending writer tries to acquire a read lock.
"""
me = self._current_thread()
self._acquire_read_lock(me)

def release_read_lock(self):
"""Release a read lock.

Raises:
RuntimeError: if the current thread does not own a read lock.
"""
me = self._current_thread()
self._release_read_lock(me)

def _acquire_read_lock(self, me):
if me in self._pending_writers:
raise RuntimeError("Writer %s can not acquire a read lock"
" while waiting for the write lock"
Expand All @@ -128,23 +140,91 @@ def read_lock(self):
break
# An active or pending writer; guess we have to wait.
self._cond.wait()

def _release_read_lock(self, me, raise_on_not_owned=True):
# I am no longer a reader, remove *one* occurrence of myself.
# If the current thread acquired two read locks, then it will
# still have to remove that other read lock; this allows for
# basic reentrancy to be possible.
with self._cond:
try:
me_instances = self._readers[me]
if me_instances > 1:
self._readers[me] = me_instances - 1
else:
self._readers.pop(me)
except KeyError:
if raise_on_not_owned:
raise RuntimeError(f"Thread {me} does not own a read lock")
self._cond.notify_all()

@contextlib.contextmanager
def read_lock(self):
"""Context manager that grants a read lock.

Will wait until no active or pending writers.

Raises:
RuntimeError: if a pending writer tries to acquire a read lock.
"""
me = self._current_thread()
self._acquire_read_lock(me)
try:
yield self
finally:
# I am no longer a reader, remove *one* occurrence of myself.
# If the current thread acquired two read locks, then it will
# still have to remove that other read lock; this allows for
# basic reentrancy to be possible.
with self._cond:
try:
me_instances = self._readers[me]
if me_instances > 1:
self._readers[me] = me_instances - 1
else:
self._readers.pop(me)
except KeyError:
pass
self._cond.notify_all()
self._release_read_lock(me, raise_on_not_owned=False)

def _acquire_write_lock(self, me):
if self.is_reader():
raise RuntimeError("Reader %s to writer privilege"
" escalation not allowed" % me)

with self._cond:
self._pending_writers.append(me)
while True:
# No readers, and no active writer, am I next??
if len(self._readers) == 0 and self._writer is None:
if self._pending_writers[0] == me:
self._writer = self._pending_writers.popleft()
self._writer_entries = 1
break
self._cond.wait()

def _release_write_lock(self, me, raise_on_not_owned=True):
with self._cond:
self._writer = None
self._writer_entries = 0
self._cond.notify_all()

def acquire_write_lock(self):
"""Acquire a write lock.

Will wait until no active readers. Blocks readers after acquiring.

Guaranteed for locks to be processed in fair order (FIFO).

Raises:
RuntimeError: if an active reader attempts to acquire a lock.
"""
me = self._current_thread()
if self._writer == me:
self._writer_entries += 1
else:
self._acquire_write_lock(me)

def release_write_lock(self):
"""Release a write lock.

Raises:
RuntimeError: if the current thread does not own a write lock.
"""
me = self._current_thread()
if self._writer == me:
self._writer_entries -= 1
if self._writer_entries == 0:
self._release_write_lock(me)
else:
raise RuntimeError(f"Thread {me} does not own a write lock")

@contextlib.contextmanager
def write_lock(self):
Expand All @@ -158,29 +238,18 @@ def write_lock(self):
RuntimeError: if an active reader attempts to acquire a lock.
"""
me = self._current_thread()
i_am_writer = self.is_writer(check_pending=False)
if self.is_reader() and not i_am_writer:
raise RuntimeError("Reader %s to writer privilege"
" escalation not allowed" % me)
if i_am_writer:
# Already the writer; this allows for basic reentrancy.
yield self
if self.is_writer(check_pending=False):
self._writer_entries += 1
try:
yield self
finally:
self._writer_entries -= 1
else:
with self._cond:
self._pending_writers.append(me)
while True:
# No readers, and no active writer, am I next??
if len(self._readers) == 0 and self._writer is None:
if self._pending_writers[0] == me:
self._writer = self._pending_writers.popleft()
break
self._cond.wait()
self._acquire_write_lock(me)
try:
yield self
finally:
with self._cond:
self._writer = None
self._cond.notify_all()
self._release_write_lock(me)


def locked(*args, **kwargs):
Expand Down
Loading
Loading