Open
Description
# coding=utf-8
import time
import asyncio
from threading import Thread
from filelock import FileLock
class TestObject(object):
    """Hold the ``/tmp/test_file.lock`` file lock from construction until ``run``.

    The lock is acquired in whichever thread constructs the instance.
    ``run`` is meant to be executed afterwards (this file uses it as a
    ``Thread`` target), so acquire and release can happen on different
    threads.
    """

    def __init__(self):
        # filelock keeps its lock state per thread by default
        # (thread_local=True). With that default, a release() issued from
        # another thread finds no held lock and does nothing, so the file
        # stays locked and the next acquire never returns. Sharing the
        # state across threads lets ``run`` release what was acquired here.
        # NOTE(review): the ``thread_local`` flag needs a filelock release
        # that provides it (3.12+ as far as I know) -- confirm the pinned
        # version.
        self.file_lock = FileLock('/tmp/test_file.lock', thread_local=False)
        print("file_lock acquire")
        # No timeout is passed, so this waits until the lock is obtained.
        self.file_lock.acquire()
        print("file_lock acquired")

    def run(self):
        """Sleep five seconds, then release the lock taken in ``__init__``."""
        time.sleep(5)
        print("file_lock release")
        self.file_lock.release()
        print("file_lock released")
async def test1():
    """Build a ``TestObject`` and start a thread that executes its ``run``.

    The constructor call is synchronous and finishes before the thread is
    created; the coroutine returns without joining the thread.
    """
    holder = TestObject()
    releaser = Thread(target=holder.run)
    releaser.start()
async def main():
    """Run ``test1`` twice, pausing one second between the two runs."""
    await test1()
    # Pause through the event loop instead of ``time.sleep``: a blocking
    # sleep inside a coroutine stalls the whole loop for its duration.
    await asyncio.sleep(1)
    await test1()
if __name__ == "__main__":
    # Run both lock attempts on a fresh event loop until main() returns.
    asyncio.run(main())
    # Stay alive for another ten seconds after the loop has finished
    # (presumably to leave time for the release threads' output -- confirm).
    time.sleep(10)