-
Notifications
You must be signed in to change notification settings - Fork 0
/
rwlock.py
81 lines (66 loc) · 2.25 KB
/
rwlock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import asyncio
class RWLock:
    """Asynchronous readers-writer lock built on ``asyncio.Condition``.

    Either any number of readers or exactly one writer may hold the lock.
    Waiters are not queued by kind: a reader is admitted whenever no writer
    holds the lock, so a steady stream of readers can delay a waiting writer.
    """

    def __init__(self):
        # Guards the two state fields below and wakes waiters on release.
        self.cond = asyncio.Condition()
        # Number of readers currently holding the lock.
        self.readers = 0
        # True while a writer holds the lock.
        self.writer = False

    async def lock(self, write=False):
        """Acquire the lock, waiting until it is available.

        Args:
            write: Acquire exclusively (as a writer) when true; otherwise
                acquire as one of possibly many readers.
        """
        async with self.cond:
            # A writer needs the lock completely free; a reader only needs
            # there to be no active writer.
            while self.writer or (write and self.readers):
                await self.cond.wait()
            if write:
                self.writer = True
            else:
                self.readers += 1
        print("locked for write" if write else "locked for read")

    async def unlock(self):
        """Release one hold on the lock and wake every waiter.

        If a writer holds the lock, the writer is released; otherwise one
        reader is released.

        Raises:
            RuntimeError: If the lock is not currently held. Without this
                check the reader count would go negative, which is truthy
                and would block every future writer forever.
        """
        async with self.cond:
            if self.writer:
                self.writer = False
            elif self.readers > 0:
                self.readers -= 1
            else:
                raise RuntimeError("RWLock.unlock() called while the lock is not held")
            # Wake everyone: several readers may be able to proceed at once.
            self.cond.notify_all()
def test_rwlock():
    """Stress-test ``RWLock`` with randomly timed readers and writers.

    Runs two rounds of 100 tasks (about 10% writers) against one shared lock.
    Each task checks, before acquiring, while holding, and after releasing,
    that the current holders are either a single writer or only readers.
    Each round also checks that the lock is fully released at its start and
    end.

    Raises:
        AssertionError: If the exclusion invariant is ever violated.
    """
    import random

    # Tags ("r-<id>" / "w-<id>") of the tasks that currently hold the lock.
    holders = set()

    def invariant():
        if any(tag.startswith("w-") for tag in holders):
            # Either there's a single writer
            assert len(holders) == 1
            assert all(tag.startswith("w-") for tag in holders)
        else:
            # Or unlimited number of readers
            assert all(tag.startswith("r-") for tag in holders)

    async def participant(lock, write):
        """Acquire *lock* as a writer or reader, hold it briefly, release."""
        await asyncio.sleep(random.random() / 10)
        # Keep the token alive for the whole coroutine so its id() cannot be
        # reused by another task while this tag is in ``holders``.
        token = object()
        tag = f"{'w' if write else 'r'}-{id(token)}"
        invariant()
        await lock.lock(write=write)
        holders.add(tag)
        await asyncio.sleep(random.random() / 10)
        invariant()
        holders.remove(tag)
        await lock.unlock()
        invariant()

    async def test(lock):
        """Run one round of 100 staggered participants."""
        assert not lock.readers and not lock.writer
        tasks = set()
        for _ in range(100):
            await asyncio.sleep(random.random() / 30)
            is_writer = random.random() < 0.10
            tasks.add(asyncio.ensure_future(participant(lock, is_writer)))
        await asyncio.gather(*tasks)
        assert not lock.readers and not lock.writer

    async def main():
        # Create the lock inside the running loop so its Condition is bound
        # to the loop that actually drives the tasks on every Python version.
        lock = RWLock()
        for _ in range(2):
            print("----")
            await test(lock)

    # asyncio.run replaces asyncio.get_event_loop().run_until_complete(),
    # which is deprecated when no event loop is running.
    asyncio.run(main())
# Script entry point: run the stress test only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    test_rwlock()