You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
There are no guarantees regarding usage by multiple threads in a single process with these locks (you will have to ensure single process safety yourself using traditional thread based locks). In other words this lock works only between processes.
so the question became -- how to make it possible?
I have made a basic attempt to just also use threading.Lock around the fasteners.try_lock in this "minimal" (still uses our DataLad ProducerConsumer threading parallelization)
#!/usr/bin/env python3fromdatalad.support.parallelimportProducerConsumerimportfastenersprint(f"fasteners: {fasteners.__version__}")
importthreadingimportcontextlibfromtimeimporttime_lock=fasteners.InterProcessLock('/tmp/downloader-auth.lck')
@contextlib.contextmanagerdefour_try_lock(lock):
withthreading.Lock() aslck:
assertlckwithfasteners.try_lock(lock) asfl:
assertflyieldfldefconsumer(args):
rec= {"args": args, "status": "ok"}
print(f"In for {rec}")
withour_try_lock(_lock) asgotten:
yielddict(rec, time=time(), state=gotten)
yielddict(rec, time=time(), state="out")
producer_consumer=ProducerConsumer(
range(30),
consumer,
jobs=6,
)
# to test if we are receiving them all in orderprev_t=Noneforlinproducer_consumer:
print(l, end='')
ifprev_t:
ifl['time'] <prev_t:
print(" out of order", end='')
prev_t=l['time']
print()
but unfortunately it doesn't work: quickly we get Could not unlock the acquired lock and then TypeError: argument must be an int, or have a fileno() method and so on
full log of the run
$> ../datalad-master/tools/ts-parallel-download.py
fasteners: 0.17.3
In for {'args': 0, 'status': 'ok'}
In for {'args': 1, 'status': 'ok'}
In for {'args': 2, 'status': 'ok'}
In for {'args': 3, 'status': 'ok'}
In for {'args': 4, 'status': 'ok'}
In for {'args': 6, 'status': 'ok'}
In for {'args': 5, 'status': 'ok'}
In for {'args': 7, 'status': 'ok'}
In for {'args': 9, 'status': 'ok'}
In for {'args': 10, 'status': 'ok'}
In for {'args': 8, 'status': 'ok'}
('Could not unlock the acquired lock opened on `%s`', b'/tmp/downloader-auth.lck')
Traceback (most recent call last):
File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 179, in release
self.unlock()
File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 206, in unlock
_interprocess_mechanism.unlock(self.lockfile)
File "/home/yoh/proj/misc/fasteners/fasteners/process_mechanism.py", line 65, in unlock
fcntl.lockf(lockfile, fcntl.LOCK_UN)
TypeError: argument must be an int, or have a fileno() method.
In for {'args': 12, 'status': 'ok'}
{'args': 1, 'status': 'ok', 'time': 1645800421.3134058, 'state': True}
In for {'args': 13, 'status': 'ok'}
In for {'args': 14, 'status': 'ok'}
In for {'args': 11, 'status': 'ok'}
In for {'args': 15, 'status': 'ok'}
In for {'args': 16, 'status': 'ok'}
In for {'args': 17, 'status': 'ok'}
In for {'args': 18, 'status': 'ok'}
In for {'args': 19, 'status': 'ok'}
('Could not unlock the acquired lock opened on `%s`', b'/tmp/downloader-auth.lck')
Traceback (most recent call last):
File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 179, in release
self.unlock()
File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 206, in unlock
_interprocess_mechanism.unlock(self.lockfile)
File "/home/yoh/proj/misc/fasteners/fasteners/process_mechanism.py", line 65, in unlock
fcntl.lockf(lockfile, fcntl.LOCK_UN)
ValueError: I/O operation on closed file
In for {'args': 21, 'status': 'ok'}
In for {'args': 20, 'status': 'ok'}
In for {'args': 23, 'status': 'ok'}
In for {'args': 24, 'status': 'ok'}
In for {'args': 25, 'status': 'ok'}
In for {'args': 22, 'status': 'ok'}
[WARNING] Received an exception RuntimeError(('Could not unlock the acquired lock opened on `%s`', b'/tmp/downloader-auth.lck')). Canceling not-yet running jobs and waiting for completion of running. You can force earlier forceful exit by Ctrl-C.
In for {'args': 26, 'status': 'ok'}
[INFO ] Canceled 1 out of 26 jobs. 6 left running.
In for {'args': 28, 'status': 'ok'}
('Could not unlock the acquired lock opened on `%s`', b'/tmp/downloader-auth.lck')
Traceback (most recent call last):
File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 179, in release
self.unlock()
File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 206, in unlock
_interprocess_mechanism.unlock(self.lockfile)
File "/home/yoh/proj/misc/fasteners/fasteners/process_mechanism.py", line 65, in unlock
fcntl.lockf(lockfile, fcntl.LOCK_UN)
ValueError: I/O operation on closed file
In for {'args': 27, 'status': 'ok'}
{'args': 1, 'status': 'ok', 'time': 1645800421.3135803, 'state': 'out'}
[WARNING] One more exception was received while trying to finish gracefully: RuntimeError(('Could not unlock the acquired lock opened on `%s`', b'/tmp/downloader-auth.lck'))
{'args': 2, 'status': 'ok', 'time': 1645800421.313764, 'state': True}
[WARNING] One more exception was received while trying to finish gracefully: RuntimeError(('Could not unlock the acquired lock opened on `%s`', b'/tmp/downloader-auth.lck'))
{'args': 2, 'status': 'ok', 'time': 1645800421.313784, 'state': 'out'}
{'args': 0, 'status': 'ok', 'time': 1645800421.3139951, 'state': True}
{'args': 0, 'status': 'ok', 'time': 1645800421.3142169, 'state': 'out'}
{'args': 4, 'status': 'ok', 'time': 1645800421.31457, 'state': True}
{'args': 4, 'status': 'ok', 'time': 1645800421.3146691, 'state': 'out'}
{'args': 5, 'status': 'ok', 'time': 1645800421.3154967, 'state': True}
{'args': 5, 'status': 'ok', 'time': 1645800421.3155172, 'state': 'out'}
{'args': 6, 'status': 'ok', 'time': 1645800421.3156846, 'state': True}
{'args': 6, 'status': 'ok', 'time': 1645800421.3157363, 'state': 'out'}
{'args': 3, 'status': 'ok', 'time': 1645800421.3157907, 'state': True}
{'args': 7, 'status': 'ok', 'time': 1645800421.3162234, 'state': True}
{'args': 7, 'status': 'ok', 'time': 1645800421.316261, 'state': 'out'}
{'args': 9, 'status': 'ok', 'time': 1645800421.3167636, 'state': True}
{'args': 10, 'status': 'ok', 'time': 1645800421.3169804, 'state': True}
{'args': 10, 'status': 'ok', 'time': 1645800421.3170028, 'state': 'out'}
{'args': 9, 'status': 'ok', 'time': 1645800421.3171237, 'state': 'out'}
{'args': 8, 'status': 'ok', 'time': 1645800421.3171704, 'state': True}
{'args': 8, 'status': 'ok', 'time': 1645800421.317269, 'state': 'out'}
{'args': 15, 'status': 'ok', 'time': 1645800421.317556, 'state': True}
{'args': 14, 'status': 'ok', 'time': 1645800421.317613, 'state': True}
{'args': 12, 'status': 'ok', 'time': 1645800421.3176675, 'state': True}
{'args': 15, 'status': 'ok', 'time': 1645800421.3177376, 'state': 'out'}
{'args': 14, 'status': 'ok', 'time': 1645800421.317787, 'state': 'out'}
{'args': 11, 'status': 'ok', 'time': 1645800421.3179567, 'state': True}
{'args': 11, 'status': 'ok', 'time': 1645800421.3180025, 'state': 'out'}
{'args': 17, 'status': 'ok', 'time': 1645800421.3181202, 'state': True}
{'args': 16, 'status': 'ok', 'time': 1645800421.3181636, 'state': True}
{'args': 17, 'status': 'ok', 'time': 1645800421.3182127, 'state': 'out'}
{'args': 16, 'status': 'ok', 'time': 1645800421.3182464, 'state': 'out'}
{'args': 13, 'status': 'ok', 'time': 1645800421.3184254, 'state': True}
{'args': 13, 'status': 'ok', 'time': 1645800421.318478, 'state': 'out'}
{'args': 18, 'status': 'ok', 'time': 1645800421.3186393, 'state': True}
{'args': 18, 'status': 'ok', 'time': 1645800421.3186636, 'state': 'out'}
{'args': 21, 'status': 'ok', 'time': 1645800421.3188107, 'state': True}
{'args': 21, 'status': 'ok', 'time': 1645800421.318859, 'state': 'out'}
{'args': 25, 'status': 'ok', 'time': 1645800421.3190067, 'state': True}
{'args': 25, 'status': 'ok', 'time': 1645800421.3190224, 'state': 'out'}
{'args': 20, 'status': 'ok', 'time': 1645800421.31907, 'state': True}
{'args': 19, 'status': 'ok', 'time': 1645800421.3190956, 'state': True}
{'args': 23, 'status': 'ok', 'time': 1645800421.3191924, 'state': True}
{'args': 20, 'status': 'ok', 'time': 1645800421.319232, 'state': 'out'}
{'args': 23, 'status': 'ok', 'time': 1645800421.3195095, 'state': 'out'}
{'args': 24, 'status': 'ok', 'time': 1645800421.3197582, 'state': True}
{'args': 24, 'status': 'ok', 'time': 1645800421.3197832, 'state': 'out'}
{'args': 26, 'status': 'ok', 'time': 1645800421.3198633, 'state': True}
{'args': 26, 'status': 'ok', 'time': 1645800421.3199008, 'state': 'out'}
{'args': 22, 'status': 'ok', 'time': 1645800421.3199584, 'state': True}
{'args': 28, 'status': 'ok', 'time': 1645800421.3201094, 'state': True}
{'args': 27, 'status': 'ok', 'time': 1645800421.3201497, 'state': True}
{'args': 28, 'status': 'ok', 'time': 1645800421.3201678, 'state': 'out'}
{'args': 27, 'status': 'ok', 'time': 1645800421.3202121, 'state': 'out'}
{'args': 22, 'status': 'ok', 'time': 1645800421.320232, 'state': 'out'}
Traceback (most recent call last):
File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 179, in release
self.unlock()
File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 206, in unlock
_interprocess_mechanism.unlock(self.lockfile)
File "/home/yoh/proj/misc/fasteners/fasteners/process_mechanism.py", line 65, in unlock
fcntl.lockf(lockfile, fcntl.LOCK_UN)
TypeError: argument must be an int, or have a fileno() method.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/yoh/proj/datalad/datalad-master/../datalad-master/tools/ts-parallel-download.py", line 40, in<module>forlin producer_consumer:
File "/home/yoh/proj/datalad/datalad-master/datalad/support/parallel.py", line 265, in __iter__
yield from self._iter_threads(self._jobs)
File "/home/yoh/proj/datalad/datalad-master/datalad/support/parallel.py", line 417, in _iter_threads
self.shutdown(force=True, exception=self._producer_exception or interrupted_by_exception)
File "/home/yoh/proj/datalad/datalad-master/datalad/support/parallel.py", line 233, in shutdown
raise exception
File "/home/yoh/proj/datalad/datalad-master/datalad/support/parallel.py", line 401, in _iter_threads
done_useful |= self._pop_done_futures(lgr)
File "/home/yoh/proj/datalad/datalad-master/datalad/support/parallel.py", line 463, in _pop_done_futures
raise exception
File "/usr/lib/python3.9/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
File "/home/yoh/proj/datalad/datalad-master/datalad/support/parallel.py", line 329, in consumer_worker
forrin res:
File "/home/yoh/proj/datalad/datalad-master/../datalad-master/tools/ts-parallel-download.py", line 28, in consumer
yield dict(rec, time=time(), state=gotten)
File "/usr/lib/python3.9/contextlib.py", line 126, in __exit__
next(self.gen)
File "/home/yoh/proj/datalad/datalad-master/../datalad-master/tools/ts-parallel-download.py", line 21, in our_try_lock
yield fl
File "/usr/lib/python3.9/contextlib.py", line 126, in __exit__
next(self.gen)
File "/home/yoh/proj/misc/fasteners/fasteners/lock.py", line 344, in try_lock
lock.release()
File "/home/yoh/proj/misc/fasteners/fasteners/process_lock.py", line 183, in release
raise threading.ThreadError(msg) from e
RuntimeError: ('Could not unlock the acquired lock opened on `%s`', b'/tmp/downloader-auth.lck')
any advice on how to use both types of locks (interprocess + thread locking) would be very welcome!
I was about to hit "submit" but then decided to try to instantiate lock right when used instead of reusing the same instance of the InterProcessLock (thus across threads)
here is the lobotomized version of the above script showing it
#!/usr/bin/env python3
from datalad.support.parallel import ProducerConsumer
import fasteners
print(f"fasteners: {fasteners.__version__}")
import threading
import contextlib
from time import time
@contextlib.contextmanager
def our_try_lock(lock):
#with threading.Lock() as lck:# assert lck
with fasteners.try_lock(lock) as fl:
assert fl
yield fl
def consumer(args):
rec = {"args": args, "status": "ok"}
print(f"In for {rec}")
_lock = fasteners.InterProcessLock('/tmp/downloader-auth.lck')
with our_try_lock(_lock) as gotten:
yield dict(rec, time=time(), state=gotten)
yield dict(rec, time=time(), state="out")
producer_consumer = ProducerConsumer(
range(30),
consumer,
jobs=6,
)
# to test if we are receiving them all in order
prev_t = None
forlin producer_consumer:
print(l, end='')
if prev_t:
if l['time'] < prev_t:
print(" out of order", end='')
prev_t = l['time']
print()
and it worked! Decided to file an "issue" so may be it could be of help to others: do not reuse the same InterProcessLock across threads! Will close it right upon opening ;)
The text was updated successfully, but these errors were encountered:
apparently that does not play nice with multi-threading parallelization,
where (I guess) we instantiate and reuse the same Downloader across multiple
threads, and thus reusing the lock.
Bu instantiating the lock right before using it, albeit may be adding even more runtime
overhead, we are avoiding the problems with InterProcessLock.
Closesdatalad#6483 and also see harlowja/fasteners#91 for more
information
We use InterProcessLock in https://github.com/datalad/datalad to avoid possible multiple parallel processes (e.g. spawn by git-annex) to query/modify credentials at the same time. Recently we realized that it doesn't play nice with multi-threaded parallelization we have implemented in the same process (datalad/datalad#6483). I RTFM to find
https://fasteners.readthedocs.io/en/latest/examples.html#interprocess-locks
so the question became -- how to make it possible?
I have made a basic attempt to just also use threading.Lock around the fasteners.try_lock in this "minimal" (still uses our DataLad ProducerConsumer threading parallelization)
but unfortunately it doesn't work: quickly we get
Could not unlock the acquired lock
and thenTypeError: argument must be an int, or have a fileno() method
and so onfull log of the run
any advice on how to use both types of locks (interprocess + thread locking) would be very welcome!
I was about to hit "submit" but then decided to try to instantiate lock right when used instead of reusing the same instance of the InterProcessLock (thus across threads)
here is the lobotomized version of the above script showing it
The text was updated successfully, but these errors were encountered: