Skip to content

Commit 8059ef9

Browse files
committed
[feature/PI-605-bulk_etl_transform] bulk etl transform
1 parent e9b570b commit 8059ef9

File tree

19 files changed

+722
-378
lines changed

19 files changed

+722
-378
lines changed

src/layers/domain/repository/cpm_product_repository/v1.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@ def read(self, product_team_id: str, id: str):
2424
return super()._read(parent_ids=(product_team_id,), id=id)
2525

2626
def search(self, product_team_id: str):
27-
return super()._query(parent_ids=(product_team_id,))
27+
return super()._search(parent_ids=(product_team_id,))
2828

2929
def handle_CpmProductCreatedEvent(self, event: CpmProductCreatedEvent):
3030
return self.create_index(

src/layers/domain/repository/device_reference_data_repository/v1.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,7 @@ def read(self, product_team_id: str, product_id: str, id: str):
3030
return super()._read(parent_ids=(product_team_id, product_id), id=id)
3131

3232
def search(self, product_team_id: str, product_id: str):
33-
return super()._query(parent_ids=(product_team_id, product_id))
33+
return super()._search(parent_ids=(product_team_id, product_id))
3434

3535
def handle_DeviceReferenceDataCreatedEvent(
3636
self, event: DeviceReferenceDataCreatedEvent

src/layers/domain/repository/device_repository/v1.py

Lines changed: 14 additions & 60 deletions
Original file line number | Diff line number | Diff line change
@@ -94,27 +94,6 @@ def create_tag_index(
9494
)
9595

9696

97-
def create_tag_index_batch(device_id: str, tag_value: str, data: dict):
98-
"""
99-
Difference between `create_tag_index` and `create_tag_index_batch`:
100-
101-
`create_index` is intended for the event-based
102-
`handle_TagAddedEvent` which is called by the base
103-
`write` method, which expects `TransactItem`s for use with `client.transact_write_items`
104-
105-
`create_tag_index_batch` is intended for the entity-based handler
106-
`handle_bulk` which is called by the base method `write_bulk`, which expects
107-
`BatchWriteItem`s which we render as a `dict` for use with `client.batch_write_items`
108-
"""
109-
pk = TableKey.DEVICE_TAG.key(tag_value)
110-
sk = TableKey.DEVICE.key(device_id)
111-
return {
112-
"PutRequest": {
113-
"Item": marshall(pk=pk, sk=sk, pk_read=pk, sk_read=sk, root=False, **data)
114-
}
115-
}
116-
117-
11897
def delete_tag_index(table_name: str, device_id: str, tag_value: str) -> TransactItem:
11998
pk = TableKey.DEVICE_TAG.key(tag_value)
12099
sk = TableKey.DEVICE.key(device_id)
@@ -158,11 +137,16 @@ def __init__(self, table_name, dynamodb_client):
158137
table_key=TableKey.DEVICE,
159138
)
160139

140+
def _query(self, parent_ids: tuple[str], id: str = None):
141+
return map(
142+
decompress_device_fields, super()._query(parent_ids=parent_ids, id=id)
143+
)
144+
161145
def read(self, product_team_id: str, product_id: str, id: str):
162146
return super()._read(parent_ids=(product_team_id, product_id), id=id)
163147

164148
def search(self, product_team_id: str, product_id: str):
165-
return super()._query(parent_ids=(product_team_id, product_id))
149+
return super()._search(parent_ids=(product_team_id, product_id))
166150

167151
def handle_DeviceCreatedEvent(self, event: DeviceCreatedEvent) -> TransactItem:
168152
return self.create_index(
@@ -310,7 +294,9 @@ def handle_DeviceKeyDeletedEvent(
310294
def handle_DeviceTagAddedEvent(
311295
self, event: DeviceTagAddedEvent
312296
) -> list[TransactItem]:
313-
data = {"tags": event.tags, "updated_on": event.updated_on}
297+
data = compress_device_fields(
298+
{"tags": event.tags, "updated_on": event.updated_on}
299+
)
314300

315301
# Create a copy of the Device indexed against the new tag
316302
create_tag_transaction = create_tag_index(
@@ -343,7 +329,9 @@ def handle_DeviceTagAddedEvent(
343329
)
344330

345331
def handle_DeviceTagsAddedEvent(self, event: DeviceTagsAddedEvent):
346-
data = {"tags": event.tags, "updated_on": event.updated_on}
332+
data = compress_device_fields(
333+
{"tags": event.tags, "updated_on": event.updated_on}
334+
)
347335
_data = compress_device_fields(
348336
event, fields_to_compress=NON_ROOT_FIELDS_TO_COMPRESS
349337
)
@@ -390,9 +378,8 @@ def handle_DeviceTagsClearedEvent(self, event: DeviceTagsClearedEvent):
390378
]
391379

392380
keys = {DeviceKey(**key).key_value for key in event.keys}
393-
update_transactions = self.update_indexes(
394-
id=event.id, keys=keys, data={"tags": []}
395-
)
381+
data = compress_device_fields({"tags": []})
382+
update_transactions = self.update_indexes(id=event.id, keys=keys, data=data)
396383
return delete_tags_transactions + update_transactions
397384

398385
def handle_DeviceReferenceDataIdAddedEvent(
@@ -410,39 +397,6 @@ def handle_QuestionnaireResponseUpdatedEvent(
410397
) -> TransactItem:
411398
return self.handle_DeviceUpdatedEvent(event=event)
412399

413-
def handle_bulk(self, item: dict) -> list[dict]:
414-
parent_key = (item["product_team_id"], item["product_id"])
415-
416-
root_data = compress_device_fields(item)
417-
create_device_transaction = self.create_index_batch(
418-
id=item["id"], parent_key_parts=parent_key, data=root_data, root=True
419-
)
420-
421-
non_root_data = compress_device_fields(
422-
item, fields_to_compress=NON_ROOT_FIELDS_TO_COMPRESS
423-
)
424-
create_keys_transactions = [
425-
self.create_index_batch(
426-
id=key["key_value"],
427-
parent_key_parts=parent_key,
428-
data=non_root_data,
429-
root=False,
430-
)
431-
for key in item["keys"]
432-
]
433-
434-
create_tags_transactions = [
435-
create_tag_index_batch(
436-
device_id=item["id"], tag_value=tag, data=non_root_data
437-
)
438-
for tag in item["tags"]
439-
]
440-
return (
441-
[create_device_transaction]
442-
+ create_keys_transactions
443-
+ create_tags_transactions
444-
)
445-
446400
def query_by_tag(
447401
self,
448402
fields_to_drop: list[str] | set[str] = None,

src/layers/domain/repository/product_team_repository/v1.py

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -16,20 +16,23 @@ def __init__(self, table_name: str, dynamodb_client):
1616
)
1717

1818
def read(self, id: str) -> ProductTeam:
19-
return super()._read(parent_ids=(), id=id)
19+
return super()._read(parent_ids=("",), id=id)
20+
21+
def search(self) -> list[ProductTeam]:
22+
return super()._search(parent_ids=("",))
2023

2124
def handle_ProductTeamCreatedEvent(self, event: ProductTeamCreatedEvent):
2225
create_root_transaction = self.create_index(
23-
id=event.id, parent_key_parts=(event.id,), data=asdict(event), root=True
26+
id=event.id, parent_key_parts=("",), data=asdict(event), root=True
2427
)
2528

2629
keys = {ProductTeamKey(**key) for key in event.keys}
2730
create_key_transactions = [
2831
self.create_index(
2932
id=key.key_value,
30-
parent_key_parts=(key.key_value,),
33+
parent_key_parts=("",),
3134
data=asdict(event),
32-
root=True,
35+
root=False,
3336
)
3437
for key in keys
3538
]
Lines changed: 1 addition & 144 deletions
Original file line number | Diff line number | Diff line change
@@ -1,18 +1,7 @@
11
import pytest
22
from domain.repository.errors import AlreadyExistsError, ItemNotFound
3-
from domain.repository.repository import (
4-
exponential_backoff_with_jitter,
5-
retry_with_jitter,
6-
)
73

8-
from .model_v1 import (
9-
MyEventAdd,
10-
MyEventDelete,
11-
MyModel,
12-
MyOtherEventAdd,
13-
MyRepository,
14-
MyTableKey,
15-
)
4+
from .model_v1 import MyEventAdd, MyEventDelete, MyModel, MyOtherEventAdd, MyRepository
165

176

187
@pytest.fixture
@@ -125,135 +114,3 @@ def test_repository_add_and_delete_separate_transactions(repository: MyRepositor
125114

126115
with pytest.raises(ItemNotFound):
127116
repository.read(id=value)
128-
129-
130-
@pytest.mark.integration
131-
def test_repository_write_bulk(repository: MyRepository):
132-
responses = repository.write_bulk(
133-
[
134-
{
135-
"pk": str(i),
136-
"sk": str(i),
137-
"pk_read": MyTableKey.FOO.key(str(i)),
138-
"sk_read": MyTableKey.FOO.key(str(i)),
139-
"field": f"boo-{i}",
140-
}
141-
for i in range(51)
142-
],
143-
batch_size=25,
144-
)
145-
assert len(responses) >= 3 # 51/25
146-
147-
for i in range(51):
148-
assert repository.read(id=str(i)).field == f"boo-{i}"
149-
150-
151-
def test_exponential_backoff_with_jitter():
152-
base_delay = 0.1
153-
max_delay = 5
154-
min_delay = 0.05
155-
n_samples = 1000
156-
157-
delays = []
158-
for retry in range(n_samples):
159-
delay = exponential_backoff_with_jitter(
160-
n_retries=retry,
161-
base_delay=base_delay,
162-
min_delay=min_delay,
163-
max_delay=max_delay,
164-
)
165-
assert max_delay >= delay >= min_delay
166-
delays.append(delay)
167-
assert len(set(delays)) == n_samples # all delays should be unique
168-
assert sum(delays[n_samples:]) < sum(
169-
delays[:n_samples]
170-
) # final delays should be larger than first delays
171-
172-
173-
@pytest.mark.parametrize(
174-
"error_code",
175-
[
176-
"ProvisionedThroughputExceededException",
177-
"ThrottlingException",
178-
"InternalServerError",
179-
],
180-
)
181-
def test_retry_with_jitter_all_fail(error_code: str):
182-
class MockException(Exception):
183-
def __init__(self, error_code):
184-
self.response = {"Error": {"Code": error_code}}
185-
186-
max_retries = 3
187-
188-
@retry_with_jitter(max_retries=max_retries, error=MockException)
189-
def throw(error_code):
190-
raise MockException(error_code=error_code)
191-
192-
with pytest.raises(ExceptionGroup) as exception_info:
193-
throw(error_code=error_code)
194-
195-
assert (
196-
exception_info.value.message
197-
== f"Failed to put item after {max_retries} retries"
198-
)
199-
assert len(exception_info.value.exceptions) == max_retries
200-
assert all(
201-
isinstance(exc, MockException) for exc in exception_info.value.exceptions
202-
)
203-
204-
205-
@pytest.mark.parametrize(
206-
"error_code",
207-
[
208-
"ProvisionedThroughputExceededException",
209-
"ThrottlingException",
210-
"InternalServerError",
211-
],
212-
)
213-
def test_retry_with_jitter_third_passes(error_code: str):
214-
class MockException(Exception):
215-
retries = 0
216-
217-
def __init__(self, error_code):
218-
self.response = {"Error": {"Code": error_code}}
219-
220-
max_retries = 3
221-
222-
@retry_with_jitter(max_retries=max_retries, error=MockException)
223-
def throw(error_code):
224-
if MockException.retries == max_retries - 1:
225-
return "foo"
226-
MockException.retries += 1
227-
raise MockException(error_code=error_code)
228-
229-
assert throw(error_code=error_code) == "foo"
230-
231-
232-
@pytest.mark.parametrize(
233-
"error_code",
234-
[
235-
"SomeOtherError",
236-
],
237-
)
238-
def test_retry_with_jitter_other_code(error_code: str):
239-
class MockException(Exception):
240-
def __init__(self, error_code):
241-
self.response = {"Error": {"Code": error_code}}
242-
243-
@retry_with_jitter(max_retries=3, error=MockException)
244-
def throw(error_code):
245-
raise MockException(error_code=error_code)
246-
247-
with pytest.raises(MockException) as exception_info:
248-
throw(error_code=error_code)
249-
250-
assert exception_info.value.response == {"Error": {"Code": error_code}}
251-
252-
253-
def test_retry_with_jitter_other_exception():
254-
@retry_with_jitter(max_retries=3, error=ValueError)
255-
def throw():
256-
raise TypeError()
257-
258-
with pytest.raises(TypeError):
259-
throw()

0 commit comments

Comments
 (0)