Skip to content

Commit

Permalink
Allow for using and working with offset locks
Browse files Browse the repository at this point in the history
Offset locks allow for lock inside a single file
using byte offsets, this avoids having to create hundreds
of lock files for each use case, and instead allows for
an application to be smarter, and shard locks from a single
file for its use-cases (assuming the file can pick a large
enough file with enough offsets to satisfy its needs).

This makes it much easier to cleanup lock files, track them
and know when to delete them (because deleting per-lock lock
files is hard to do when an application is always online, because
knowing when to delete a lock file is a non-trivial problem when
an application has no interface to tell which locks are alive or
dead and which are safe to delete).
  • Loading branch information
Joshua Harlow authored and Joshua Harlow committed Aug 16, 2018
1 parent 2f78159 commit 5cfeee4
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 17 deletions.
67 changes: 50 additions & 17 deletions fasteners/process_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
# License for the specific language governing permissions and limitations
# under the License.

import abc
import errno
import logging
import os
import threading
import time

import six
from six.moves import range as compat_range # noqa

from fasteners import _utils

Expand Down Expand Up @@ -49,6 +51,7 @@ def _ensure_tree(path):
return True


@six.add_metaclass(abc.ABCMeta)
class _InterProcessLock(object):
"""An interprocess locking implementation.
Expand Down Expand Up @@ -87,13 +90,35 @@ class _InterProcessLock(object):
acquire the lock (and repeat).
"""

def __init__(self, path, sleep_func=time.sleep, logger=None):
SUPPORTS_OFFSETS = False
"""
Whether or not the lock class implements support for offset based locks.
"""

def __init__(self, path, sleep_func=time.sleep, logger=None, offset=None):
if offset is not None and offset < 0:
raise ValueError("Offset must be greater than or equal to zero")
if offset is not None and not self.SUPPORTS_OFFSETS:
raise ValueError("Offsets can not be provided (not supported)")
self.lockfile = None
self.path = _utils.canonicalize_path(path)
self.acquired = False
self.offset = offset
self.sleep_func = sleep_func
self.logger = _utils.pick_first_not_none(logger, LOG)

@classmethod
def make_offset_locks(cls, path, amount,
sleep_func=time.sleep, logger=None):
"""Create many locks that use the same path (and offsets in it)."""
if amount <= 0:
raise ValueError("At least one lock must be created")
locks = []
for i in compat_range(0, amount):
locks.append(cls(path, sleep_func=sleep_func,
offset=i, logger=logger))
return locks

def _try_acquire(self, blocking, watch):
try:
self.trylock()
Expand Down Expand Up @@ -126,6 +151,8 @@ def _do_open(self):
# creating a symlink to an important file in our lock path.
if self.lockfile is None or self.lockfile.closed:
self.lockfile = open(self.path, 'a')
if self.offset is not None and self.offset >= 0:
self.lockfile.seek(self.offset)

def acquire(self, blocking=True,
delay=DELAY_INCREMENT, max_delay=MAX_DELAY,
Expand Down Expand Up @@ -219,39 +246,45 @@ def trylock(self):
def unlock(self):
self._unlock(self.lockfile)

@staticmethod
def _trylock():
raise NotImplementedError()
@abc.abstractmethod
def _trylock(self, lockfile):
pass

@staticmethod
def _unlock():
raise NotImplementedError()
@abc.abstractmethod
def _unlock(self, lockfile):
pass


class _WindowsLock(_InterProcessLock):
"""Interprocess lock implementation that works on windows systems."""

@staticmethod
def _trylock(lockfile):
def _trylock(self, lockfile):
fileno = lockfile.fileno()
msvcrt.locking(fileno, msvcrt.LK_NBLCK, 1)

@staticmethod
def _unlock(lockfile):
def _unlock(self, lockfile):
fileno = lockfile.fileno()
msvcrt.locking(fileno, msvcrt.LK_UNLCK, 1)


class _FcntlLock(_InterProcessLock):
"""Interprocess lock implementation that works on posix systems."""

@staticmethod
def _trylock(lockfile):
fcntl.lockf(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
SUPPORTS_OFFSETS = True

@staticmethod
def _unlock(lockfile):
fcntl.lockf(lockfile, fcntl.LOCK_UN)
def _trylock(self, lockfile):
if self.offset is not None:
fcntl.lockf(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB, 1,
lockfile.tell(), os.SEEK_CUR)
else:
fcntl.lockf(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)

def _unlock(self, lockfile):
if self.offset is not None:
fcntl.lockf(lockfile, fcntl.LOCK_UN, 1,
lockfile.tell(), os.SEEK_CUR)
else:
fcntl.lockf(lockfile, fcntl.LOCK_UN)


if os.name == 'nt':
Expand Down
38 changes: 38 additions & 0 deletions fasteners/tests/test_process_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
WIN32 = os.name == 'nt'


def spin_lock(lock_file, passed, death, offset=None):
with pl.InterProcessLock(lock_file, offset=offset):
passed.set()
death.wait()


class BrokenLock(pl.InterProcessLock):
def __init__(self, name, errno_code):
super(BrokenLock, self).__init__(name)
Expand Down Expand Up @@ -205,6 +211,38 @@ def foo():

foo()

def test_offset_lock_acquire(self):

def try_unlink(lock_file):
try:
os.unlink(lock_file)
except OSError as e:
if e.errno != errno.ENOENT:
raise

lock_file = os.path.join(self.lock_dir, 'lock')
self.addCleanup(try_unlink, lock_file)

children = []
for i in range(0, 25):
death = multiprocessing.Event()
passed = multiprocessing.Event()
child = multiprocessing.Process(target=spin_lock,
args=[lock_file,
passed, death],
kwargs={'offset': i})
child.start()
children.append((child, passed, death))

while children:
child, passed, death = children.pop()
try:
passed.wait()
death.set()
child.join()
finally:
del child, passed, death

def test_bad_acquire(self):
lock_file = os.path.join(self.lock_dir, 'lock')
lock = BrokenLock(lock_file, errno.EBUSY)
Expand Down

0 comments on commit 5cfeee4

Please sign in to comment.