-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpessimistic_lock.py
83 lines (69 loc) · 3.11 KB
/
pessimistic_lock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import datetime
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from types_aiobotocore_dynamodb import DynamoDBClient
from types_aiobotocore_dynamodb.type_defs import UniversalAttributeValueTypeDef
from .time import now
class PessimisticLockAcquisitionError(Exception):
pass
class PessimisticLockItemNotFoundError(Exception):
pass
class DynamoDBPessimisticLock:
def __init__(
self,
client: DynamoDBClient,
table_name: str,
*,
lock_timeout: datetime.timedelta | None = None,
lock_attribute: str = "__LockedAt",
) -> None:
self._client = client
self._table_name = table_name
self._lock_timeout = lock_timeout
self._lock_attribute = lock_attribute
@asynccontextmanager
async def __call__(self, key: dict[str, UniversalAttributeValueTypeDef]) -> AsyncGenerator[None, None]:
lock_acquired = False
try:
await self._acquire_lock(key)
lock_acquired = True
yield
finally:
if lock_acquired:
await self._release_lock(key)
async def _acquire_lock(self, key: dict[str, UniversalAttributeValueTypeDef]) -> None:
try:
await self._client.update_item(
TableName=self._table_name,
Key=key,
UpdateExpression="SET #LockAttribute = :LockAttribute",
ExpressionAttributeNames={"#LockAttribute": self._lock_attribute},
ExpressionAttributeValues={
":LockAttribute": {"S": now().isoformat()},
**self._lock_expires_at_attribute_value(),
},
ConditionExpression=f"{self._item_exists_expression(key)} AND {self._lock_not_acquired_expression()}",
)
except self._client.exceptions.ConditionalCheckFailedException as e:
raise PessimisticLockAcquisitionError(key) from e
async def _release_lock(self, key: dict[str, UniversalAttributeValueTypeDef]) -> None:
try:
await self._client.update_item(
TableName=self._table_name,
Key=key,
UpdateExpression="REMOVE #LockAttribute",
ExpressionAttributeNames={"#LockAttribute": self._lock_attribute},
ConditionExpression=self._item_exists_expression(key),
)
except self._client.exceptions.ConditionalCheckFailedException as e:
raise PessimisticLockItemNotFoundError(key) from e
def _item_exists_expression(self, key: dict[str, UniversalAttributeValueTypeDef]) -> str:
return " AND ".join(f"attribute_exists({v})" for v in key.keys()).removesuffix(" AND ")
def _lock_expires_at_attribute_value(self) -> dict:
if not self._lock_timeout:
return {}
return {":LockExpiresAt": {"S": (now() - self._lock_timeout).isoformat()}}
def _lock_not_acquired_expression(self) -> str:
if not self._lock_timeout:
return "attribute_not_exists(#LockAttribute)"
return "(attribute_not_exists(#LockAttribute) OR :LockExpiresAt > #LockAttribute)"