Question/Answer: How to "multi-thread safe" an InterProcessLock? #91

Open
yarikoptic opened this issue Feb 25, 2022 · 1 comment

We use InterProcessLock in https://github.com/datalad/datalad to prevent multiple parallel processes (e.g. spawned by git-annex) from querying/modifying credentials at the same time. Recently we realized that it doesn't play nicely with the multi-threaded parallelization we have implemented within the same process (datalad/datalad#6483). I RTFM and found the following at
https://fasteners.readthedocs.io/en/latest/examples.html#interprocess-locks

Warning

There are no guarantees regarding usage by multiple threads in a single process with these locks (you will have to ensure single process safety yourself using traditional thread based locks). In other words this lock works only between processes.

so the question became -- how to make it possible?

I made a basic attempt to also wrap a threading.Lock around fasteners.try_lock in this "minimal" script (it still uses our DataLad ProducerConsumer threading parallelization):
#!/usr/bin/env python3

from datalad.support.parallel import ProducerConsumer

import fasteners
print(f"fasteners: {fasteners.__version__}")

import threading
import contextlib
from time import time

_lock = fasteners.InterProcessLock('/tmp/downloader-auth.lck')


@contextlib.contextmanager
def our_try_lock(lock):
    with threading.Lock() as lck:
        assert lck
        with fasteners.try_lock(lock) as fl:
            assert fl
            yield fl


def consumer(args):
    rec = {"args": args, "status": "ok"}
    print(f"In for {rec}")
    with our_try_lock(_lock) as gotten:
        yield dict(rec, time=time(), state=gotten)
    yield dict(rec, time=time(), state="out")


producer_consumer = ProducerConsumer(
    range(30),
    consumer,
    jobs=6,
)

# to test if we are receiving them all in order
prev_t = None
for l in producer_consumer:
    print(l, end='')
    if prev_t:
        if l['time'] < prev_t:
            print(" out of order", end='')
    prev_t = l['time']
    print()

Unfortunately it doesn't work: we quickly get "Could not unlock the acquired lock", caused first by "TypeError: argument must be an int, or have a fileno() method" and later by "ValueError: I/O operation on closed file".
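
One detail of the script above may be worth checking independently of fasteners: our_try_lock calls threading.Lock() on every invocation, so each caller gets a brand-new lock that no other thread ever contends for. The following standard-library-only sketch (the function names are made up for illustration) contrasts a fresh lock per call with one shared lock:

```python
import threading


def fresh_lock_is_free_while_another_is_held():
    """A lock created on the spot is unrelated to any lock already held."""
    held = threading.Lock()
    with held:
        # A brand-new Lock object starts out unlocked, so this succeeds
        # even though `held` is currently locked.
        return threading.Lock().acquire(blocking=False)


# One lock object that every caller shares (e.g. created at module level).
shared = threading.Lock()


def shared_lock_is_free_while_held():
    """Trying to take the same (non-reentrant) lock a second time fails."""
    with shared:
        return shared.acquire(blocking=False)


print(fresh_lock_is_free_while_another_is_held())  # True: no exclusion
print(shared_lock_is_free_while_held())            # False: exclusion works
```

So for a threading.Lock to serialize threads at all, it most likely has to be created once and reused by every thread, rather than created inside the context manager.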

full log of the run
$> ../datalad-master/tools/ts-parallel-download.py               
fasteners: 0.17.3
In for {'args': 0, 'status': 'ok'}
In for {'args': 1, 'status': 'ok'}
In for {'args': 2, 'status': 'ok'}
In for {'args': 3, 'status': 'ok'}
In for {'args': 4, 'status': 'ok'}
In for {'args': 6, 'status': 'ok'}
In for {'args': 5, 'status': 'ok'}
In for {'args': 7, 'status': 'ok'}
In for {'args': 9, 'status': 'ok'}
In for {'args': 10, 'status': 'ok'}
In for {'args': 8, 'status': 'ok'}
('Could not unlock the acquired lock opened on `%s`', b'/tmp/downloader-auth.lck')
Traceback (most recent call last):
  File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 179, in release
    self.unlock()
  File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 206, in unlock
    _interprocess_mechanism.unlock(self.lockfile)
  File "/home/yoh/proj/misc/fasteners/fasteners/process_mechanism.py", line 65, in unlock
    fcntl.lockf(lockfile, fcntl.LOCK_UN)
TypeError: argument must be an int, or have a fileno() method.
In for {'args': 12, 'status': 'ok'}
{'args': 1, 'status': 'ok', 'time': 1645800421.3134058, 'state': True}
In for {'args': 13, 'status': 'ok'}
In for {'args': 14, 'status': 'ok'}
In for {'args': 11, 'status': 'ok'}
In for {'args': 15, 'status': 'ok'}
In for {'args': 16, 'status': 'ok'}
In for {'args': 17, 'status': 'ok'}
In for {'args': 18, 'status': 'ok'}
In for {'args': 19, 'status': 'ok'}
('Could not unlock the acquired lock opened on `%s`', b'/tmp/downloader-auth.lck')
Traceback (most recent call last):
  File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 179, in release
    self.unlock()
  File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 206, in unlock
    _interprocess_mechanism.unlock(self.lockfile)
  File "/home/yoh/proj/misc/fasteners/fasteners/process_mechanism.py", line 65, in unlock
    fcntl.lockf(lockfile, fcntl.LOCK_UN)
ValueError: I/O operation on closed file
In for {'args': 21, 'status': 'ok'}
In for {'args': 20, 'status': 'ok'}
In for {'args': 23, 'status': 'ok'}
In for {'args': 24, 'status': 'ok'}
In for {'args': 25, 'status': 'ok'}
In for {'args': 22, 'status': 'ok'}
[WARNING] Received an exception RuntimeError(('Could not unlock the acquired lock opened on `%s`', b'/tmp/downloader-auth.lck')). Canceling not-yet running jobs and waiting for completion of running. You can force earlier forceful exit by Ctrl-C. 
In for {'args': 26, 'status': 'ok'}
[INFO   ] Canceled 1 out of 26 jobs. 6 left running. 
In for {'args': 28, 'status': 'ok'}
('Could not unlock the acquired lock opened on `%s`', b'/tmp/downloader-auth.lck')
Traceback (most recent call last):
  File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 179, in release
    self.unlock()
  File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 206, in unlock
    _interprocess_mechanism.unlock(self.lockfile)
  File "/home/yoh/proj/misc/fasteners/fasteners/process_mechanism.py", line 65, in unlock
    fcntl.lockf(lockfile, fcntl.LOCK_UN)
ValueError: I/O operation on closed file
In for {'args': 27, 'status': 'ok'}
{'args': 1, 'status': 'ok', 'time': 1645800421.3135803, 'state': 'out'}
[WARNING] One more exception was received while trying to finish gracefully: RuntimeError(('Could not unlock the acquired lock opened on `%s`', b'/tmp/downloader-auth.lck')) 
{'args': 2, 'status': 'ok', 'time': 1645800421.313764, 'state': True}
[WARNING] One more exception was received while trying to finish gracefully: RuntimeError(('Could not unlock the acquired lock opened on `%s`', b'/tmp/downloader-auth.lck')) 
{'args': 2, 'status': 'ok', 'time': 1645800421.313784, 'state': 'out'}
{'args': 0, 'status': 'ok', 'time': 1645800421.3139951, 'state': True}
{'args': 0, 'status': 'ok', 'time': 1645800421.3142169, 'state': 'out'}
{'args': 4, 'status': 'ok', 'time': 1645800421.31457, 'state': True}
{'args': 4, 'status': 'ok', 'time': 1645800421.3146691, 'state': 'out'}
{'args': 5, 'status': 'ok', 'time': 1645800421.3154967, 'state': True}
{'args': 5, 'status': 'ok', 'time': 1645800421.3155172, 'state': 'out'}
{'args': 6, 'status': 'ok', 'time': 1645800421.3156846, 'state': True}
{'args': 6, 'status': 'ok', 'time': 1645800421.3157363, 'state': 'out'}
{'args': 3, 'status': 'ok', 'time': 1645800421.3157907, 'state': True}
{'args': 7, 'status': 'ok', 'time': 1645800421.3162234, 'state': True}
{'args': 7, 'status': 'ok', 'time': 1645800421.316261, 'state': 'out'}
{'args': 9, 'status': 'ok', 'time': 1645800421.3167636, 'state': True}
{'args': 10, 'status': 'ok', 'time': 1645800421.3169804, 'state': True}
{'args': 10, 'status': 'ok', 'time': 1645800421.3170028, 'state': 'out'}
{'args': 9, 'status': 'ok', 'time': 1645800421.3171237, 'state': 'out'}
{'args': 8, 'status': 'ok', 'time': 1645800421.3171704, 'state': True}
{'args': 8, 'status': 'ok', 'time': 1645800421.317269, 'state': 'out'}
{'args': 15, 'status': 'ok', 'time': 1645800421.317556, 'state': True}
{'args': 14, 'status': 'ok', 'time': 1645800421.317613, 'state': True}
{'args': 12, 'status': 'ok', 'time': 1645800421.3176675, 'state': True}
{'args': 15, 'status': 'ok', 'time': 1645800421.3177376, 'state': 'out'}
{'args': 14, 'status': 'ok', 'time': 1645800421.317787, 'state': 'out'}
{'args': 11, 'status': 'ok', 'time': 1645800421.3179567, 'state': True}
{'args': 11, 'status': 'ok', 'time': 1645800421.3180025, 'state': 'out'}
{'args': 17, 'status': 'ok', 'time': 1645800421.3181202, 'state': True}
{'args': 16, 'status': 'ok', 'time': 1645800421.3181636, 'state': True}
{'args': 17, 'status': 'ok', 'time': 1645800421.3182127, 'state': 'out'}
{'args': 16, 'status': 'ok', 'time': 1645800421.3182464, 'state': 'out'}
{'args': 13, 'status': 'ok', 'time': 1645800421.3184254, 'state': True}
{'args': 13, 'status': 'ok', 'time': 1645800421.318478, 'state': 'out'}
{'args': 18, 'status': 'ok', 'time': 1645800421.3186393, 'state': True}
{'args': 18, 'status': 'ok', 'time': 1645800421.3186636, 'state': 'out'}
{'args': 21, 'status': 'ok', 'time': 1645800421.3188107, 'state': True}
{'args': 21, 'status': 'ok', 'time': 1645800421.318859, 'state': 'out'}
{'args': 25, 'status': 'ok', 'time': 1645800421.3190067, 'state': True}
{'args': 25, 'status': 'ok', 'time': 1645800421.3190224, 'state': 'out'}
{'args': 20, 'status': 'ok', 'time': 1645800421.31907, 'state': True}
{'args': 19, 'status': 'ok', 'time': 1645800421.3190956, 'state': True}
{'args': 23, 'status': 'ok', 'time': 1645800421.3191924, 'state': True}
{'args': 20, 'status': 'ok', 'time': 1645800421.319232, 'state': 'out'}
{'args': 23, 'status': 'ok', 'time': 1645800421.3195095, 'state': 'out'}
{'args': 24, 'status': 'ok', 'time': 1645800421.3197582, 'state': True}
{'args': 24, 'status': 'ok', 'time': 1645800421.3197832, 'state': 'out'}
{'args': 26, 'status': 'ok', 'time': 1645800421.3198633, 'state': True}
{'args': 26, 'status': 'ok', 'time': 1645800421.3199008, 'state': 'out'}
{'args': 22, 'status': 'ok', 'time': 1645800421.3199584, 'state': True}
{'args': 28, 'status': 'ok', 'time': 1645800421.3201094, 'state': True}
{'args': 27, 'status': 'ok', 'time': 1645800421.3201497, 'state': True}
{'args': 28, 'status': 'ok', 'time': 1645800421.3201678, 'state': 'out'}
{'args': 27, 'status': 'ok', 'time': 1645800421.3202121, 'state': 'out'}
{'args': 22, 'status': 'ok', 'time': 1645800421.320232, 'state': 'out'}
Traceback (most recent call last):
  File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 179, in release
    self.unlock()
  File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 206, in unlock
    _interprocess_mechanism.unlock(self.lockfile)
  File "/home/yoh/proj/misc/fasteners/fasteners/process_mechanism.py", line 65, in unlock
    fcntl.lockf(lockfile, fcntl.LOCK_UN)
TypeError: argument must be an int, or have a fileno() method.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/yoh/proj/datalad/datalad-master/../datalad-master/tools/ts-parallel-download.py", line 40, in <module>
    for l in producer_consumer:
  File "/home/yoh/proj/datalad/datalad-master/datalad/support/parallel.py", line 265, in __iter__
    yield from self._iter_threads(self._jobs)
  File "/home/yoh/proj/datalad/datalad-master/datalad/support/parallel.py", line 417, in _iter_threads
    self.shutdown(force=True, exception=self._producer_exception or interrupted_by_exception)
  File "/home/yoh/proj/datalad/datalad-master/datalad/support/parallel.py", line 233, in shutdown
    raise exception
  File "/home/yoh/proj/datalad/datalad-master/datalad/support/parallel.py", line 401, in _iter_threads
    done_useful |= self._pop_done_futures(lgr)
  File "/home/yoh/proj/datalad/datalad-master/datalad/support/parallel.py", line 463, in _pop_done_futures
    raise exception
  File "/usr/lib/python3.9/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/yoh/proj/datalad/datalad-master/datalad/support/parallel.py", line 329, in consumer_worker
    for r in res:
  File "/home/yoh/proj/datalad/datalad-master/../datalad-master/tools/ts-parallel-download.py", line 28, in consumer
    yield dict(rec, time=time(), state=gotten)
  File "/usr/lib/python3.9/contextlib.py", line 126, in __exit__
    next(self.gen)
  File "/home/yoh/proj/datalad/datalad-master/../datalad-master/tools/ts-parallel-download.py", line 21, in our_try_lock
    yield fl
  File "/usr/lib/python3.9/contextlib.py", line 126, in __exit__
    next(self.gen)
  File "/home/yoh/proj/misc/fasteners/fasteners/lock.py", line 344, in try_lock
    lock.release()
  File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 183, in release
    raise threading.ThreadError(msg) from e
RuntimeError: ('Could not unlock the acquired lock opened on `%s`', b'/tmp/downloader-auth.lck')

Any advice on how to use both types of locks (interprocess + thread locking) together would be very welcome!

I was about to hit "submit", but then decided to try instantiating the lock right where it is used, instead of reusing the same InterProcessLock instance (and thus sharing it across threads).

Here is the lobotomized version of the above script showing it:
#!/usr/bin/env python3

from datalad.support.parallel import ProducerConsumer

import fasteners
print(f"fasteners: {fasteners.__version__}")

import threading
import contextlib
from time import time



@contextlib.contextmanager
def our_try_lock(lock):
    # threading.Lock wrapper from the first script, disabled here:
    # with threading.Lock() as lck:
    #     assert lck
    with fasteners.try_lock(lock) as fl:
        assert fl
        yield fl


def consumer(args):
    rec = {"args": args, "status": "ok"}
    print(f"In for {rec}")

    # a fresh InterProcessLock per call, so no instance is shared across threads
    _lock = fasteners.InterProcessLock('/tmp/downloader-auth.lck')
    with our_try_lock(_lock) as gotten:
        yield dict(rec, time=time(), state=gotten)
    yield dict(rec, time=time(), state="out")


producer_consumer = ProducerConsumer(
    range(30),
    consumer,
    jobs=6,
)

# to test if we are receiving them all in order
prev_t = None
for l in producer_consumer:
    print(l, end='')
    if prev_t:
        if l['time'] < prev_t:
            print(" out of order", end='')
    prev_t = l['time']
    print()


And it worked! I decided to file this "issue" anyway, since it may be of help to others: do not reuse the same InterProcessLock instance across threads! Will close it right upon opening ;)
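
A caveat that may be worth verifying before relying on this for thread safety: the tracebacks above show fasteners calling fcntl.lockf on this platform, and POSIX record locks taken that way belong to the process, not to a file handle or a thread. Under that assumption, separate InterProcessLock instances within one process would not block each other; they would merely stop tripping over a shared file handle. The sketch below uses only the standard library (POSIX only; the helper name is made up for illustration) to check whether two handles in one process can both take an exclusive lock on the same file:

```python
import fcntl
import os
import tempfile


def both_handles_lock(path):
    """Return True if two handles in this process can both lock `path`."""
    with open(path, "a+b") as first, open(path, "a+b") as second:
        fcntl.lockf(first, fcntl.LOCK_EX | fcntl.LOCK_NB)
        try:
            # Non-blocking: raises OSError if the lock is considered taken.
            fcntl.lockf(second, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            return False
        return True


# Use a throwaway lock file so the demo does not touch real lock files.
fd, demo_path = tempfile.mkstemp(suffix=".lck")
os.close(fd)
same_process_result = both_handles_lock(demo_path)
os.remove(demo_path)
print(same_process_result)
```

If this prints True, as POSIX semantics suggest it should, then mutual exclusion between threads of the same process probably still needs a shared threading.Lock around the critical section. Behavior on Windows may differ.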
yarikoptic added a commit to yarikoptic/datalad that referenced this issue Feb 25, 2022
apparently that does not play nice with multi-threading parallelization,
where (I guess) we instantiate and reuse the same Downloader across multiple
threads, and thus reusing the lock.

By instantiating the lock right before using it, albeit possibly adding even more runtime
overhead, we avoid the problems with InterProcessLock.

Closes datalad#6483 and also see harlowja/fasteners#91 for more
information
Collaborator

psarka commented Feb 25, 2022

Thanks! This is very interesting. I'm updating the documentation at the moment, so I will reopen this issue with the following two tasks for myself:

  • Add a test confirming @yarikoptic's interprocess-interthread approach across different OSes
  • Add it to the new documentation

psarka reopened this Feb 25, 2022