diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py
index b2e96a2e..db5a2be3 100644
--- a/oslo_concurrency/lockutils.py
+++ b/oslo_concurrency/lockutils.py
@@ -13,9 +13,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import collections
 import contextlib
-import errno
 import functools
 import logging
 import os
@@ -27,12 +25,11 @@
 import time
 import weakref
 
+import fasteners
 from oslo_config import cfg
-import retrying
 import six
 
-from oslo_concurrency._i18n import _, _LE, _LI
-from oslo_concurrency.openstack.common import fileutils
+from oslo_concurrency._i18n import _, _LI
 
 LOG = logging.getLogger(__name__)
 
@@ -79,199 +76,8 @@ def get_lock_path(conf):
     return conf.oslo_concurrency.lock_path
 
 
-class _Hourglass(object):
-    """An hourglass-like periodic timer."""
-
-    def __init__(self, period):
-        self._period = period
-        self._last_flipped = None
-
-    def flip(self):
-        """Flips the hourglass.
-
-        The drain() method will not return True again until the period
-        has elapsed.
-        """
-        self._last_flipped = time.time()
-
-    def drain(self):
-        """Drains the hourglass, returns True if period reached."""
-        if self._last_flipped is None:
-            return True
-        else:
-            elapsed = max(0, time.time() - self._last_flipped)
-            return elapsed >= self._period
-
-
-def _lock_retry(delay, filename,
-                # These parameters trigger logging to begin after a certain
-                # amount of time has elapsed where the lock couldn't be
-                # acquired (log statements will be emitted after that duration
-                # at the provided periodicity).
-                log_begins_after=1.0, log_periodicity=0.5):
-    """Retry logic that acquiring a lock will go through."""
-
-    # If this returns True, a retry attempt will occur (using the defined
-    # retry policy we have requested the retrying library to apply), if it
-    # returns False then the original exception will be re-raised (if it
-    # raises a new or different exception the original exception will be
-    # replaced with that one and raised).
-    def retry_on_exception(e):
-        # TODO(harlowja): once/if https://github.com/rholder/retrying/pull/20
-        # gets merged we should just switch to using that to avoid having to
-        # catch and inspect all exceptions (and their types...)
-        if isinstance(e, IOError) and e.errno in (errno.EACCES, errno.EAGAIN):
-            return True
-        raise threading.ThreadError(_("Unable to acquire lock on"
-                                      " `%(filename)s` due to"
-                                      " %(exception)s") %
-                                    {
-                                        'filename': filename,
-                                        'exception': e,
-                                    })
-
-    # Logs all attempts (with information about how long we have been trying
-    # to acquire the underlying lock...); after a threshold has been passed,
-    # and only at a fixed rate...
-    def never_stop(hg, attempt_number, delay_since_first_attempt_ms):
-        delay_since_first_attempt = delay_since_first_attempt_ms / 1000.0
-        if delay_since_first_attempt >= log_begins_after:
-            if hg.drain():
-                LOG.debug("Attempting to acquire %s (delayed %0.2f seconds)",
-                          filename, delay_since_first_attempt)
-                hg.flip()
-        return False
-
-    # The retrying library seems to prefer milliseconds for some reason; this
-    # might be changed someday in the future (see:
-    # https://github.com/rholder/retrying/issues/6)...
-    delay_ms = delay * 1000.0
-
-    def decorator(func):
-
-        @six.wraps(func)
-        def wrapper(*args, **kwargs):
-            hg = _Hourglass(log_periodicity)
-            r = retrying.Retrying(wait_fixed=delay_ms,
-                                  retry_on_exception=retry_on_exception,
-                                  stop_func=functools.partial(never_stop, hg))
-            return r.call(func, *args, **kwargs)
-
-        return wrapper
-
-    return decorator
-
-
-class _FileLock(object):
-    """Lock implementation which allows multiple locks, working around
-    issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
-    not require any cleanup. Since the lock is always held on a file
-    descriptor rather than outside of the process, the lock gets dropped
-    automatically if the process crashes, even if __exit__ is not executed.
-
-    There are no guarantees regarding usage by multiple green threads in a
-    single process here. This lock works only between processes. Exclusive
-    access between local threads should be achieved using the semaphores
-    in the @synchronized decorator.
-
-    Note these locks are released when the descriptor is closed, so it's not
-    safe to close the file descriptor while another green thread holds the
-    lock. Just opening and closing the lock file can break synchronisation,
-    so lock files must be accessed only using this abstraction.
-    """
-
-    def __init__(self, name):
-        self.lockfile = None
-        self.fname = name
-        self.acquire_time = None
-
-    def acquire(self, delay=0.01):
-        if delay < 0:
-            raise ValueError("Delay must be greater than or equal to zero")
-
-        basedir = os.path.dirname(self.fname)
-        if not os.path.exists(basedir):
-            fileutils.ensure_tree(basedir)
-            LOG.info(_LI('Created lock path: %s'), basedir)
-
-        # Open in append mode so we don't overwrite any potential contents of
-        # the target file. This eliminates the possibility of an attacker
-        # creating a symlink to an important file in our lock_path.
-        self.lockfile = open(self.fname, 'a')
-        start_time = time.time()
-
-        # Using non-blocking locks (with retries) since green threads are not
-        # patched to deal with blocking locking calls. Also upon reading the
-        # MSDN docs for locking(), it seems to have a 'laughable' 10
-        # attempts "blocking" mechanism.
-        do_acquire = _lock_retry(delay=delay,
-                                 filename=self.fname)(self.trylock)
-        do_acquire()
-        self.acquire_time = time.time()
-        LOG.debug('Acquired file lock "%s" after waiting %0.3fs',
-                  self.fname, (self.acquire_time - start_time))
-
-        return True
-
-    def __enter__(self):
-        self.acquire()
-        return self
-
-    def release(self):
-        if self.acquire_time is None:
-            raise threading.ThreadError(_("Unable to release an unacquired"
-                                          " lock"))
-        try:
-            release_time = time.time()
-            LOG.debug('Releasing file lock "%s" after holding it for %0.3fs',
-                      self.fname, (release_time - self.acquire_time))
-            self.unlock()
-            self.acquire_time = None
-        except IOError:
-            LOG.exception(_LE("Could not unlock the acquired lock `%s`"),
-                          self.fname)
-        else:
-            try:
-                self.lockfile.close()
-            except IOError:
-                LOG.exception(_LE("Could not close the acquired file handle"
-                                  " `%s`"), self.fname)
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.release()
-
-    def exists(self):
-        return os.path.exists(self.fname)
-
-    def trylock(self):
-        raise NotImplementedError()
-
-    def unlock(self):
-        raise NotImplementedError()
-
-
-class _WindowsLock(_FileLock):
-    def trylock(self):
-        msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
-
-    def unlock(self):
-        msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
-
-
-class _FcntlLock(_FileLock):
-    def trylock(self):
-        fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
-
-    def unlock(self):
-        fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
-
-
-if os.name == 'nt':
-    import msvcrt
-    InterProcessLock = _WindowsLock
-else:
-    import fcntl
-    InterProcessLock = _FcntlLock
+InterProcessLock = fasteners.InterProcessLock
+ReaderWriterLock = fasteners.ReaderWriterLock
 
 
 class Semaphores(object):
@@ -506,177 +312,6 @@ def _lock_wrapper(argv):
     return ret_val
 
 
-class ReaderWriterLock(object):
-    """A reader/writer lock.
-
-    This lock allows for simultaneous readers to exist but only one writer
-    to exist for use-cases where it is useful to have such types of locks.
-
-    Currently a reader can not escalate its read lock to a write lock and
-    a writer can not acquire a read lock while it owns or is waiting on
-    the write lock.
-
-    In the future these restrictions may be relaxed.
-
-    This can be eventually removed if http://bugs.python.org/issue8800 ever
-    gets accepted into the python standard threading library...
-    """
-    WRITER = b'w'
-    READER = b'r'
-
-    @staticmethod
-    def _fetch_current_thread_functor():
-        # Until https://github.com/eventlet/eventlet/issues/172 is resolved
-        # or addressed we have to use a complicated workaround to get an
-        # object that will not be recycled; the usage of
-        # threading.current_thread() doesn't appear to currently be monkey
-        # patched and therefore isn't reliable to use (and breaks badly when
-        # used as all threads share the same current_thread() object)...
-        try:
-            import eventlet
-            from eventlet import patcher
-            green_threaded = patcher.is_monkey_patched('thread')
-        except ImportError:
-            green_threaded = False
-        if green_threaded:
-            return lambda: eventlet.getcurrent()
-        else:
-            return lambda: threading.current_thread()
-
-    def __init__(self):
-        self._writer = None
-        self._pending_writers = collections.deque()
-        self._readers = collections.defaultdict(int)
-        self._cond = threading.Condition()
-        self._current_thread = self._fetch_current_thread_functor()
-
-    def _has_pending_writers(self):
-        """Returns if there are writers waiting to become the *one* writer.
-
-        Internal usage only.
-
-        :return: whether there are any pending writers
-        :rtype: boolean
-        """
-        return bool(self._pending_writers)
-
-    def _is_writer(self, check_pending=True):
-        """Returns if the caller is the active writer or a pending writer.
-
-        Internal usage only.
-
-        :param check_pending: checks the pending writers as well, if false
-                              then only the current writer is checked (and
-                              not those writers that may be in line).
-
-        :return: whether the current thread is an active/pending writer
-        :rtype: boolean
-        """
-        me = self._current_thread()
-        with self._cond:
-            if self._writer is not None and self._writer == me:
-                return True
-            if check_pending:
-                return me in self._pending_writers
-            else:
-                return False
-
-    @property
-    def owner_type(self):
-        """Returns whether the lock is locked by a writer/reader/nobody.
-
-        :return: constant defining what the active owner's type is
-        :rtype: WRITER/READER/None
-        """
-        with self._cond:
-            if self._writer is not None:
-                return self.WRITER
-            if self._readers:
-                return self.READER
-            return None
-
-    def _is_reader(self):
-        """Returns if the caller is one of the readers.
-
-        Internal usage only.
-
-        :return: whether the current thread is an active/pending reader
-        :rtype: boolean
-        """
-        me = self._current_thread()
-        with self._cond:
-            return me in self._readers
-
-    @contextlib.contextmanager
-    def read_lock(self):
-        """Context manager that grants a read lock.
-
-        Will wait until no active or pending writers.
-
-        Raises a ``RuntimeError`` if an active or pending writer tries to
-        acquire a read lock as this is disallowed.
-        """
-        me = self._current_thread()
-        if self._is_writer():
-            raise RuntimeError("Writer %s can not acquire a read lock"
-                               " while holding/waiting for the write lock"
-                               % me)
-        with self._cond:
-            while self._writer is not None:
-                # An active writer; guess we have to wait.
-                self._cond.wait()
-            # No active writer; we are good to become a reader.
-            self._readers[me] += 1
-        try:
-            yield self
-        finally:
-            # I am no longer a reader, remove *one* occurrence of myself.
-            # If the current thread acquired two read locks, then it will
-            # still have to remove that other read lock; this allows for
-            # basic reentrancy to be possible.
-            with self._cond:
-                claims = self._readers[me]
-                if claims == 1:
-                    self._readers.pop(me)
-                else:
-                    self._readers[me] = claims - 1
-                if not self._readers:
-                    self._cond.notify_all()
-
-    @contextlib.contextmanager
-    def write_lock(self):
-        """Context manager that grants a write lock.
-
-        Will wait until no active readers. Blocks readers after acquiring.
-
-        Raises a ``RuntimeError`` if an active reader attempts to acquire a
-        writer lock as this is disallowed.
-        """
-        me = self._current_thread()
-        if self._is_reader():
-            raise RuntimeError("Reader %s to writer privilege"
-                               " escalation not allowed" % me)
-        if self._is_writer(check_pending=False):
-            # Already the writer; this allows for basic reentrancy.
-            yield self
-        else:
-            with self._cond:
-                # Add ourselves to the pending writers and wait until we are
-                # the one writer that can run (aka, when we are the first
-                # element in the pending writers).
-                self._pending_writers.append(me)
-                while (self._readers or self._writer is not None
-                       or self._pending_writers[0] != me):
-                    self._cond.wait()
-                self._writer = self._pending_writers.popleft()
-            try:
-                yield self
-            finally:
-                with self._cond:
-                    self._writer = None
-                    self._cond.notify_all()
-
-
 def main():
     sys.exit(_lock_wrapper(sys.argv))
diff --git a/oslo_concurrency/tests/unit/test_lockutils.py b/oslo_concurrency/tests/unit/test_lockutils.py
index e1630783..b5a1a3c8 100644
--- a/oslo_concurrency/tests/unit/test_lockutils.py
+++ b/oslo_concurrency/tests/unit/test_lockutils.py
@@ -13,9 +13,7 @@
 # under the License.
 
 import collections
-import errno
 import fcntl
-import multiprocessing
 import os
 import shutil
 import signal
@@ -29,8 +27,6 @@
 from oslotest import base as test_base
 import six
 
-from concurrent import futures
-
 from oslo_concurrency.fixture import lockutils as fixtures
 from oslo_concurrency import lockutils
 from oslo_config import fixture as config
@@ -53,49 +49,6 @@ def foo():
         self.assertEqual(foo.__name__, 'foo', "Wrapped function's name "
                                               "got mangled")
 
-    def test_lock_acquire_release_file_lock(self):
-        lock_dir = tempfile.mkdtemp()
-        lock_file = os.path.join(lock_dir, 'lock')
-        lock = lockutils._FcntlLock(lock_file)
-
-        def try_lock():
-            try:
-                my_lock = lockutils._FcntlLock(lock_file)
-                my_lock.lockfile = open(lock_file, 'w')
-                my_lock.trylock()
-                my_lock.unlock()
-                os._exit(1)
-            except IOError:
-                os._exit(0)
-
-        def attempt_acquire(count):
-            children = []
-            for i in range(count):
-                child = multiprocessing.Process(target=try_lock)
-                child.start()
-                children.append(child)
-            exit_codes = []
-            for child in children:
-                child.join()
-                exit_codes.append(child.exitcode)
-            return sum(exit_codes)
-
-        self.assertTrue(lock.acquire())
-        try:
-            acquired_children = attempt_acquire(10)
-            self.assertEqual(0, acquired_children)
-        finally:
-            lock.release()
-
-        try:
-            acquired_children = attempt_acquire(5)
-            self.assertNotEqual(0, acquired_children)
-        finally:
-            try:
-                shutil.rmtree(lock_dir)
-            except IOError:
-                pass
-
     def test_lock_internally_different_collections(self):
         s1 = lockutils.Semaphores()
         s2 = lockutils.Semaphores()
@@ -387,20 +340,6 @@ def test_deprecated_names(self):
         self.assertTrue(conf.oslo_concurrency.disable_process_locking)
 
 
-class BrokenLock(lockutils._FileLock):
-    def __init__(self, name, errno_code):
-        super(BrokenLock, self).__init__(name)
-        self.errno_code = errno_code
-
-    def unlock(self):
-        pass
-
-    def trylock(self):
-        err = IOError()
-        err.errno = self.errno_code
-        raise err
-
-
 class FileBasedLockingTestCase(test_base.BaseTestCase):
     def setUp(self):
         super(FileBasedLockingTestCase, self).setUp()
@@ -416,12 +355,6 @@ def foo():
 
         foo()
 
-    def test_bad_acquire(self):
-        lock_file = os.path.join(self.lock_dir, 'lock')
-        lock = BrokenLock(lock_file, errno.EBUSY)
-
-        self.assertRaises(threading.ThreadError, lock.acquire)
-
     def test_interprocess_lock(self):
         lock_file = os.path.join(self.lock_dir, 'processlock')
 
@@ -515,328 +448,6 @@ def test_non_destructive(self):
             self.assertEqual(f.read(), 'test')
 
 
-class ReadWriteLockTest(test_base.BaseTestCase):
-    # This test works by spinning up a bunch of threads and then running
-    # them all at once and having different threads take either a read
-    # lock or a write lock; and sleeping for a period of time while
-    # using it.
-    #
-    # After the tests have completed the timings of each thread are checked
-    # to ensure that there are no *invalid* overlaps (a writer should never
-    # overlap with any readers, for example).
-
-    # We will spend this amount of time doing some "fake" work.
-    WORK_TIMES = [(0.01 + x / 100.0) for x in range(0, 5)]
-
-    # NOTE(harlowja): Sleep a little so time.time() can not be the same (which
-    # will cause false positives when our overlap detection code runs). If
-    # there are real overlaps then they will still exist.
-    NAPPY_TIME = 0.05
-
-    @staticmethod
-    def _find_overlaps(times, start, end):
-        """Counts num of overlaps between start and end in the given times."""
-        overlaps = 0
-        for (s, e) in times:
-            if s >= start and e <= end:
-                overlaps += 1
-        return overlaps
-
-    @classmethod
-    def _spawn_variation(cls, readers, writers, max_workers=None):
-        """Spawns the given number of readers and writers."""
-
-        start_stops = collections.deque()
-        lock = lockutils.ReaderWriterLock()
-
-        def read_func(ident):
-            with lock.read_lock():
-                # TODO(harlowja): sometime in the future use a monotonic clock
-                # here to avoid problems that can be caused by ntpd resyncing
-                # the clock while we are actively running.
-                enter_time = time.time()
-                time.sleep(cls.WORK_TIMES[ident % len(cls.WORK_TIMES)])
-                exit_time = time.time()
-                start_stops.append((lock.READER, enter_time, exit_time))
-                time.sleep(cls.NAPPY_TIME)
-
-        def write_func(ident):
-            with lock.write_lock():
-                enter_time = time.time()
-                time.sleep(cls.WORK_TIMES[ident % len(cls.WORK_TIMES)])
-                exit_time = time.time()
-                start_stops.append((lock.WRITER, enter_time, exit_time))
-                time.sleep(cls.NAPPY_TIME)
-
-        if max_workers is None:
-            max_workers = max(0, readers) + max(0, writers)
-        if max_workers > 0:
-            with futures.ThreadPoolExecutor(max_workers=max_workers) as e:
-                count = 0
-                for _i in range(0, readers):
-                    e.submit(read_func, count)
-                    count += 1
-                for _i in range(0, writers):
-                    e.submit(write_func, count)
-                    count += 1
-
-        writer_times = []
-        reader_times = []
-        for (lock_type, start, stop) in list(start_stops):
-            if lock_type == lock.WRITER:
-                writer_times.append((start, stop))
-            else:
-                reader_times.append((start, stop))
-        return (writer_times, reader_times)
-
-    def test_writer_abort(self):
-        # Ensures that the lock is released when the writer has an
-        # exception...
-        lock = lockutils.ReaderWriterLock()
-        self.assertFalse(lock.owner_type)
-
-        def blow_up():
-            with lock.write_lock():
-                self.assertEqual(lock.WRITER, lock.owner_type)
-                raise RuntimeError("Broken")
-
-        self.assertRaises(RuntimeError, blow_up)
-        self.assertFalse(lock.owner_type)
-
-    def test_reader_abort(self):
-        lock = lockutils.ReaderWriterLock()
-        self.assertFalse(lock.owner_type)
-
-        def blow_up():
-            with lock.read_lock():
-                self.assertEqual(lock.READER, lock.owner_type)
-                raise RuntimeError("Broken")
-
-        self.assertRaises(RuntimeError, blow_up)
-        self.assertFalse(lock.owner_type)
-
-    def test_double_reader_abort(self):
-        lock = lockutils.ReaderWriterLock()
-        activated = collections.deque()
-
-        def double_bad_reader():
-            with lock.read_lock():
-                with lock.read_lock():
-                    raise RuntimeError("Broken")
-
-        def happy_writer():
-            with lock.write_lock():
-                activated.append(lock.owner_type)
-
-        # Submit a bunch of work to a pool, and then ensure that the correct
-        # number of writers eventually executed (every other thread will
-        # be a reader thread that will fail)...
-        max_workers = 8
-        with futures.ThreadPoolExecutor(max_workers=max_workers) as e:
-            for i in range(0, max_workers):
-                if i % 2 == 0:
-                    e.submit(double_bad_reader)
-                else:
-                    e.submit(happy_writer)
-
-        self.assertEqual(max_workers / 2,
-                         len([a for a in activated
-                              if a == lockutils.ReaderWriterLock.WRITER]))
-
-    def test_double_reader_writer(self):
-        lock = lockutils.ReaderWriterLock()
-        activated = collections.deque()
-        active = threading.Event()
-
-        def double_reader():
-            with lock.read_lock():
-                active.set()
-                # Wait for the writer thread to get into pending mode using a
-                # simple spin-loop...
-                while not lock._has_pending_writers():
-                    time.sleep(0.001)
-                with lock.read_lock():
-                    activated.append(lock.owner_type)
-
-        def happy_writer():
-            with lock.write_lock():
-                activated.append(lock.owner_type)
-
-        reader = threading.Thread(target=double_reader)
-        reader.daemon = True
-        reader.start()
-
-        # Wait for the reader to become the active reader.
-        active.wait()
-        self.assertTrue(active.is_set())
-
-        # Start up the writer (the reader will wait until it's going).
-        writer = threading.Thread(target=happy_writer)
-        writer.daemon = True
-        writer.start()
-
-        # Ensure it went in the order we expected.
-        reader.join()
-        writer.join()
-        self.assertEqual(2, len(activated))
-        self.assertEqual([lockutils.ReaderWriterLock.READER,
-                          lockutils.ReaderWriterLock.WRITER], list(activated))
-
-    def test_reader_chaotic(self):
-        lock = lockutils.ReaderWriterLock()
-        activated = collections.deque()
-
-        def chaotic_reader(blow_up):
-            with lock.read_lock():
-                if blow_up:
-                    raise RuntimeError("Broken")
-                else:
-                    activated.append(lock.owner_type)
-
-        def happy_writer():
-            with lock.write_lock():
-                activated.append(lock.owner_type)
-
-        # Test that every 4th reader blows up and that we get the expected
-        # number of owners with this occurring.
-        max_workers = 8
-        with futures.ThreadPoolExecutor(max_workers=max_workers) as e:
-            for i in range(0, max_workers):
-                if i % 2 == 0:
-                    e.submit(chaotic_reader, blow_up=bool(i % 4 == 0))
-                else:
-                    e.submit(happy_writer)
-
-        writers = [a for a in activated
-                   if a == lockutils.ReaderWriterLock.WRITER]
-        readers = [a for a in activated
-                   if a == lockutils.ReaderWriterLock.READER]
-        self.assertEqual(4, len(writers))
-        self.assertEqual(2, len(readers))
-
-    def test_writer_chaotic(self):
-        lock = lockutils.ReaderWriterLock()
-        activated = collections.deque()
-
-        def chaotic_writer(blow_up):
-            with lock.write_lock():
-                if blow_up:
-                    raise RuntimeError("Broken")
-                else:
-                    activated.append(lock.owner_type)
-
-        def happy_reader():
-            with lock.read_lock():
-                activated.append(lock.owner_type)
-
-        # Test that every 4th writer blows up and that we get the expected
-        # number of owners with this occurring.
-        max_workers = 8
-        with futures.ThreadPoolExecutor(max_workers=max_workers) as e:
-            for i in range(0, max_workers):
-                if i % 2 == 0:
-                    e.submit(chaotic_writer, blow_up=bool(i % 4 == 0))
-                else:
-                    e.submit(happy_reader)
-
-        writers = [a for a in activated
-                   if a == lockutils.ReaderWriterLock.WRITER]
-        readers = [a for a in activated
-                   if a == lockutils.ReaderWriterLock.READER]
-        self.assertEqual(2, len(writers))
-        self.assertEqual(4, len(readers))
-
-    def test_single_reader_writer(self):
-        results = []
-        lock = lockutils.ReaderWriterLock()
-        with lock.read_lock():
-            self.assertTrue(lock._is_reader())
-            self.assertEqual(0, len(results))
-        with lock.write_lock():
-            results.append(1)
-            self.assertTrue(lock._is_writer())
-        with lock.read_lock():
-            self.assertTrue(lock._is_reader())
-            self.assertEqual(1, len(results))
-        self.assertFalse(lock._is_reader())
-        self.assertFalse(lock._is_writer())
-
-    def test_reader_to_writer(self):
-        lock = lockutils.ReaderWriterLock()
-
-        def writer_func():
-            with lock.write_lock():
-                pass
-
-        with lock.read_lock():
-            self.assertRaises(RuntimeError, writer_func)
-            self.assertFalse(lock._is_writer())
-
-        self.assertFalse(lock._is_reader())
-        self.assertFalse(lock._is_writer())
-
-    def test_writer_to_reader(self):
-        lock = lockutils.ReaderWriterLock()
-
-        def reader_func():
-            with lock.read_lock():
-                pass
-
-        with lock.write_lock():
-            self.assertRaises(RuntimeError, reader_func)
-            self.assertFalse(lock._is_reader())
-
-        self.assertFalse(lock._is_reader())
-        self.assertFalse(lock._is_writer())
-
-    def test_double_writer(self):
-        lock = lockutils.ReaderWriterLock()
-        with lock.write_lock():
-            self.assertFalse(lock._is_reader())
-            self.assertTrue(lock._is_writer())
-            with lock.write_lock():
-                self.assertTrue(lock._is_writer())
-            self.assertTrue(lock._is_writer())
-
-        self.assertFalse(lock._is_reader())
-        self.assertFalse(lock._is_writer())
-
-    def test_double_reader(self):
-        lock = lockutils.ReaderWriterLock()
-        with lock.read_lock():
-            self.assertTrue(lock._is_reader())
-            self.assertFalse(lock._is_writer())
-            with lock.read_lock():
-                self.assertTrue(lock._is_reader())
-            self.assertTrue(lock._is_reader())
-
-        self.assertFalse(lock._is_reader())
-        self.assertFalse(lock._is_writer())
-
-    def test_multi_reader_multi_writer(self):
-        writer_times, reader_times = self._spawn_variation(10, 10)
-        self.assertEqual(10, len(writer_times))
-        self.assertEqual(10, len(reader_times))
-        for (start, stop) in writer_times:
-            self.assertEqual(0, self._find_overlaps(reader_times, start, stop))
-            self.assertEqual(1, self._find_overlaps(writer_times, start, stop))
-        for (start, stop) in reader_times:
-            self.assertEqual(0, self._find_overlaps(writer_times, start, stop))
-
-    def test_multi_reader_single_writer(self):
-        writer_times, reader_times = self._spawn_variation(9, 1)
-        self.assertEqual(1, len(writer_times))
-        self.assertEqual(9, len(reader_times))
-        start, stop = writer_times[0]
-        self.assertEqual(0, self._find_overlaps(reader_times, start, stop))
-
-    def test_multi_writer(self):
-        writer_times, reader_times = self._spawn_variation(0, 10)
-        self.assertEqual(10, len(writer_times))
-        self.assertEqual(0, len(reader_times))
-        for (start, stop) in writer_times:
-            self.assertEqual(1, self._find_overlaps(writer_times, start, stop))
-
-
 class LockutilsModuleTestCase(test_base.BaseTestCase):
 
     def setUp(self):
diff --git a/requirements.txt b/requirements.txt
index df20d909..aa67baf6 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,4 +10,5 @@ oslo.i18n>=1.5.0 # Apache-2.0
 oslo.utils>=1.4.0 # Apache-2.0
 posix_ipc
 six>=1.9.0
+fasteners>=0.5 # Apache-2.0
 retrying>=1.2.3,!=1.3.0 # Apache-2.0
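
Usage note: after this change `lockutils.InterProcessLock` is simply an alias
for `fasteners.InterProcessLock`, which keeps the properties the removed
`_FileLock` provided (lock held on a file descriptor, dropped automatically if
the process dies, usable as a context manager). A minimal caller-side sketch,
assuming fasteners' documented `acquire(blocking=..., delay=...)` keywords;
the lock path and the critical-section body are placeholders:

    import fasteners

    lock = fasteners.InterProcessLock('/tmp/example.lock')  # placeholder path

    # Context-manager form, same shape as the removed _FileLock:
    with lock:
        pass  # critical section

    # Explicit form; fasteners retries a non-blocking OS-level lock
    # internally, much as the removed _lock_retry()/trylock() pair did.
    if lock.acquire(blocking=True, delay=0.01):
        try:
            pass  # critical section
        finally:
            lock.release()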
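For reference, the deleted `_Hourglass`/`_lock_retry` pair implemented "spin
on a non-blocking trylock with a fixed delay, and only log progress after a
threshold, then at a fixed period"; that responsibility now lives inside
fasteners. A standalone sketch of the same idea (the helper name and its
parameters are hypothetical, not part of either library):

    import errno
    import time


    def acquire_with_retry(trylock, delay=0.01, log_begins_after=1.0,
                           log_periodicity=0.5):
        # Spin on a non-blocking trylock(), sleeping `delay` between
        # attempts; once `log_begins_after` seconds have passed, emit
        # progress at most every `log_periodicity` seconds.
        start = time.time()
        last_logged = None
        while True:
            try:
                trylock()
                return time.time() - start
            except IOError as e:
                # Only "lock is held" errors are retryable, matching the
                # removed retry_on_exception() predicate.
                if e.errno not in (errno.EACCES, errno.EAGAIN):
                    raise
            waited = time.time() - start
            if (waited >= log_begins_after and
                    (last_logged is None or
                     time.time() - last_logged >= log_periodicity)):
                print('still trying after %0.2fs' % waited)
                last_logged = time.time()
            time.sleep(delay)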
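`ReaderWriterLock` is likewise re-exported from fasteners, which grew out of
this very implementation, so the semantics the removed class and tests relied
on (reentrant read and write locks, no reader-to-writer escalation, pending
writers blocking new work) should carry over; verifying them against the
pinned fasteners version is still worthwhile. Caller usage is unchanged:

    import fasteners

    rw = fasteners.ReaderWriterLock()

    with rw.read_lock():
        # Many threads may read concurrently; a nested read_lock() from
        # the same thread is allowed (basic reentrancy).
        with rw.read_lock():
            pass

    with rw.write_lock():
        # Exclusive: no readers and no other writers while held.
        pass

    # Escalation remains disallowed: acquiring write_lock() while holding
    # read_lock() raises RuntimeError, as the removed tests asserted for
    # the old class.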
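The deleted `test_lock_acquire_release_file_lock` checked mutual exclusion
across processes against `_FcntlLock`. An equivalent hand-run check against
the fasteners alias might look like the sketch below, assuming
`acquire(blocking=False)` returns a boolean as fasteners documents; it is an
illustration, not a replacement for the (intentionally dropped) unit test:

    import multiprocessing
    import os
    import tempfile

    import fasteners


    def try_lock(path):
        # Exit code 1 means the child unexpectedly got the lock.
        child_lock = fasteners.InterProcessLock(path)
        if child_lock.acquire(blocking=False):
            child_lock.release()
            os._exit(1)
        os._exit(0)


    if __name__ == '__main__':
        path = os.path.join(tempfile.mkdtemp(), 'lock')
        lock = fasteners.InterProcessLock(path)
        assert lock.acquire()
        try:
            children = [multiprocessing.Process(target=try_lock,
                                                args=(path,))
                        for _ in range(10)]
            for child in children:
                child.start()
            for child in children:
                child.join()
            # While the parent holds the lock, no child should acquire it.
            assert sum(child.exitcode for child in children) == 0
        finally:
            lock.release()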