Skip to content

Commit 4184d06

Browse files
committed
[feature/PI-618-bulk_etl] bulk etl local
1 parent 9aba96f commit 4184d06

35 files changed

+1937
-577
lines changed
File renamed without changes.
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import json
2+
from collections import deque
3+
from dataclasses import asdict
4+
from io import BytesIO
5+
from typing import TYPE_CHECKING
6+
7+
import boto3
8+
from etl_utils.constants import WorkerKey
9+
from etl_utils.io import EtlEncoder, pkl_dump_lz4
10+
from etl_utils.ldif.ldif import filter_and_group_ldif_from_s3_by_property, parse_ldif
11+
from etl_utils.worker.action import apply_action
12+
from etl_utils.worker.model import WorkerActionResponse, WorkerEnvironment
13+
from etl_utils.worker.worker_step_chain import execute_step_chain
14+
from event.json import json_loads
15+
from nhs_context_logging import log_action
16+
from sds.domain.constants import FILTER_TERMS
17+
from sds.domain.parse import parse_sds_record
18+
19+
def _log_action_without_inputs(function):
    """Decorate ``function`` with ``log_action(log_args=[], log_result=False)``.

    The empty ``log_args`` list and ``log_result=False`` are presumably there
    to keep the (potentially large) inputs and return value of the wrapped
    function out of the logs -- confirm against ``nhs_context_logging``.

    Written as a ``def`` rather than a lambda bound to a name (PEP 8, E731)
    so the decorator carries a real name in tracebacks and can be documented.

    Args:
        function: The callable to wrap.

    Returns:
        Whatever ``log_action(...)`` returns when applied to ``function``.
    """
    return log_action(log_args=[], log_result=False)(function)
22+
23+
if TYPE_CHECKING:
24+
from mypy_boto3_s3 import S3Client
25+
26+
27+
# S3 client created once at import time; `handler` passes it to
# `execute_step_chain` as `s3_client`.
S3_CLIENT = boto3.client("s3")
28+
# Worker environment built once at import time; `handler` uses it to resolve
# the S3 paths for the EXTRACT (input) and TRANSFORM (output) worker keys.
ENVIRONMENT = WorkerEnvironment.build()
29+
30+
31+
@_log_action_without_inputs
def _read(s3_client: "S3Client", s3_input_path: str) -> deque[tuple[dict]]:
    """Load the LDIF at ``s3_input_path`` and parse it group by group.

    The LDIF is filtered with ``FILTER_TERMS`` and grouped on the
    ``nhsMhsPartyKey`` field by ``filter_and_group_ldif_from_s3_by_property``;
    each resulting group is then parsed with ``parse_ldif`` (opened through
    ``BytesIO``) and frozen into a tuple.

    NOTE(review): the annotation says ``tuple[dict]``, but ``extract`` unpacks
    every element of a group as a pair (``*r`` / ``r[1]``) -- the element type
    is presumably ``tuple[tuple[str, dict], ...]``; confirm and tighten.
    NOTE(review): the sample LDIF spells the attribute ``nhsMHSPartyKey``;
    confirm the grouping helper matches the field name case-insensitively.

    Args:
        s3_client: Client handed through to the S3 filter/group helper.
        s3_input_path: S3 path of the LDIF to read.

    Returns:
        A deque holding one tuple of parsed records per group, in the order
        the groups were produced.
    """
    grouped_ldif = filter_and_group_ldif_from_s3_by_property(
        s3_client=s3_client,
        s3_path=s3_input_path,
        group_field="nhsMhsPartyKey",
        filter_terms=FILTER_TERMS,
    )

    parsed_groups = deque()
    for ldif_group in grouped_ldif:
        parsed_group = tuple(parse_ldif(file_opener=BytesIO, path_or_data=ldif_group))
        parsed_groups.append(parsed_group)
    return parsed_groups
43+
44+
45+
def extract(
    s3_client: "S3Client", s3_input_path: str, s3_output_path: str, max_records: int
) -> WorkerActionResponse:
    """Read grouped LDIF records from S3 and parse them into SDS records.

    The groups returned by ``_read`` are handed to ``apply_action`` together
    with an initially empty ``processed_records`` list, an action that parses
    a group, and a serializer for a group. How ``apply_action`` consumes and
    fills these collections is defined in ``etl_utils.worker.action``.

    Args:
        s3_client: Client used to read the input LDIF.
        s3_input_path: S3 path of the LDIF to extract from.
        s3_output_path: S3 path echoed back in the response.
        max_records: Not used by this function's body; presumably required
            by the step-chain calling convention -- TODO confirm.

    Returns:
        A ``WorkerActionResponse`` carrying the unprocessed and processed
        record collections, whatever ``apply_action`` returned as
        ``exception``, and both S3 paths.
    """

    def _parse_group(record):
        # Each item of a group is unpacked positionally into parse_sds_record;
        # the whole parsed group is wrapped in an outer single-item list.
        parsed = [parse_sds_record(*dn_and_attrs).dict() for dn_and_attrs in record]
        return [parsed]

    def _serialise_group(dns_and_records):
        # Keep only the second element of each pair, then round-trip through
        # JSON using EtlEncoder to obtain plain JSON-compatible data.
        attrs_only = [pair[1] for pair in dns_and_records]
        return json_loads(json.dumps(attrs_only, cls=EtlEncoder))

    unprocessed_records = _read(s3_client=s3_client, s3_input_path=s3_input_path)
    processed_records = []

    exception = apply_action(
        unprocessed_records=unprocessed_records,
        processed_records=processed_records,
        action=_parse_group,
        record_serializer=_serialise_group,
    )

    return WorkerActionResponse(
        unprocessed_records=unprocessed_records,
        processed_records=processed_records,
        exception=exception,
        s3_input_path=s3_input_path,
        s3_output_path=s3_output_path,
    )
68+
69+
70+
def handler(event, context):
    """Run the ``extract`` action through ``execute_step_chain``.

    Input is taken from the S3 path for ``WorkerKey.EXTRACT`` and output is
    directed at the S3 path for ``WorkerKey.TRANSFORM``. Processed records are
    dumped with ``pkl_dump_lz4``; the unprocessed dumper is a no-op.

    Args:
        event: Not used by this function (presumably the AWS Lambda event).
        context: Not used by this function (presumably the Lambda context).

    Returns:
        The step-chain response converted to a plain dict via ``asdict``.
    """

    def _skip_unprocessed_dump(**kwargs):
        # Accept any keyword arguments and do nothing.
        return None

    input_path = ENVIRONMENT.s3_path(WorkerKey.EXTRACT)
    output_path = ENVIRONMENT.s3_path(WorkerKey.TRANSFORM)

    return asdict(
        execute_step_chain(
            action=extract,
            s3_client=S3_CLIENT,
            s3_input_path=input_path,
            s3_output_path=output_path,
            unprocessed_dumper=_skip_unprocessed_dump,
            processed_dumper=pkl_dump_lz4,
        )
    )
File renamed without changes.
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
dn: uniqueIdentifier=00000000000a,ou=Services,o=nhs
2+
objectClass: nhsMhs
3+
objectClass: top
4+
nhsApproverURP: myApprover
5+
nhsDateApproved: 20010101010101
6+
nhsDateDNSApproved: 20010101010101
7+
nhsDateRequested: 20010101010101
8+
nhsDNSApprover: myApprover
9+
nhsIDCode: AAA
10+
nhsMHSAckRequested: never
11+
nhsMhsCPAId: 00000000000a
12+
nhsMHSDuplicateElimination: never
13+
nhsMHSEndPoint: https://test.C3O9X.nhs.uk/
14+
nhsMhsFQDN: test.C3O9X.nhs.uk
15+
nhsMHsIN: READ_PRACTITIONER_ROLE_R4_V001
16+
nhsMhsIPAddress: 0.0.0.0
17+
nhsMhsManufacturerOrg: LSP04
18+
nhsMHSPartyKey: AAA-111111
19+
nhsMHsSN: urn:nhs:names:services:ers
20+
nhsMhsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V001
21+
nhsProductKey: 111
22+
nhsProductName: My AAA Product
23+
nhsProductVersion: 2005.02
24+
nhsRequestorURP: myRequestor
25+
uniqueIdentifier: 00000000000a
26+
nhsContractPropertyTemplateKey: 14
27+
nhsEPInteractionType: FHIR
28+
nhsMHSIsAuthenticated: none
29+
30+
dn: uniqueIdentifier=000000000001,ou=Services,o=nhs
31+
objectClass: nhsAS
32+
objectClass: top
33+
nhsApproverURP: myApprover
34+
nhsAsClient: AAA
35+
nhsAsSvcIA: urn:nhs:names:services:pds:QUPA_IN040000UK01
36+
nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V001
37+
nhsDateApproved: 20010101010101
38+
nhsDateRequested: 20010101010101
39+
nhsIDCode: AAA
40+
nhsMhsManufacturerOrg: LSP04
41+
nhsMHSPartyKey: AAA-111111
42+
nhsProductKey: 111
43+
nhsProductName: My AAA Product
44+
nhsProductVersion: 2005.02
45+
nhsRequestorURP: myRequestor
46+
nhsTempUid: 111
47+
uniqueIdentifier: 000000000001
48+
49+
dn: uniqueIdentifier=000000000002,ou=Services,o=nhs
50+
objectClass: nhsAS
51+
objectClass: top
52+
nhsApproverURP: myApprover
53+
nhsAsClient: BBB
54+
nhsAsSvcIA: urn:nhs:names:services:pds:QUPA_IN040000UK02
55+
nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V002
56+
nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V003
57+
nhsDateApproved: 20020202020202
58+
nhsDateRequested: 20020202020202
59+
nhsIDCode: BBB
60+
nhsMhsManufacturerOrg: LSP04
61+
nhsMHSPartyKey: BBB-111111
62+
nhsProductKey: 222
63+
nhsProductName: My BBB Product
64+
nhsProductVersion: 2005.02
65+
nhsRequestorURP: myRequestor
66+
nhsTempUid: 222
67+
uniqueIdentifier: 000000000002
68+
69+
dn: uniqueIdentifier=00000000000b,ou=Services,o=nhs
70+
objectClass: nhsMhs
71+
objectClass: top
72+
nhsApproverURP: myApprover
73+
nhsDateApproved: 20020202020202
74+
nhsDateDNSApproved: 20020202020202
75+
nhsDateRequested: 20020202020202
76+
nhsDNSApprover: myApprover
77+
nhsIDCode: BBB
78+
nhsMHSAckRequested: never
79+
nhsMhsCPAId: 00000000000b
80+
nhsMHSDuplicateElimination: never
81+
nhsMHSEndPoint: https://test.C3O9X.nhs.uk/
82+
nhsMhsFQDN: test.C3O9X.nhs.uk
83+
nhsMHsIN: READ_PRACTITIONER_ROLE_R4_V002
84+
nhsMhsIPAddress: 0.0.0.0
85+
nhsMhsManufacturerOrg: LSP04
86+
nhsMHSPartyKey: BBB-111111
87+
nhsMHsSN: urn:nhs:names:services:ers
88+
nhsMhsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V002
89+
nhsProductKey: 111
90+
nhsProductName: My BBB Product
91+
nhsProductVersion: 2005.02
92+
nhsRequestorURP: myRequestor
93+
uniqueIdentifier: 00000000000b
94+
nhsContractPropertyTemplateKey: 14
95+
nhsEPInteractionType: FHIR
96+
nhsMHSIsAuthenticated: none
97+
98+
dn: uniqueIdentifier=000000000003,ou=Services,o=nhs
99+
objectClass: nhsAS
100+
objectClass: top
101+
nhsApproverURP: myApprover
102+
nhsAsClient: BBB
103+
nhsAsSvcIA: urn:nhs:names:services:pds:QUPA_IN040000UK03
104+
nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V002
105+
nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V003
106+
nhsDateApproved: 20030303030303
107+
nhsDateRequested: 20030303030303
108+
nhsIDCode: BBB
109+
nhsMhsManufacturerOrg: LSP04
110+
nhsMHSPartyKey: BBB-111111
111+
nhsProductKey: 333
112+
nhsProductName: My BBB Product
113+
nhsProductVersion: 2005.02
114+
nhsRequestorURP: myRequestor
115+
nhsTempUid: 333
116+
uniqueIdentifier: 000000000003
117+
118+
dn: uniqueIdentifier=000000000004,ou=Services,o=nhs
119+
objectClass: nhsAS
120+
objectClass: top
121+
nhsApproverURP: myApprover
122+
nhsAsClient: AAA
123+
nhsAsSvcIA: urn:nhs:names:services:pds:QUPA_IN040000UK04
124+
nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V001
125+
nhsDateApproved: 20040404040404
126+
nhsDateRequested: 20040404040404
127+
nhsIDCode: AAA
128+
nhsMhsManufacturerOrg: LSP04
129+
nhsMHSPartyKey: AAA-111111
130+
nhsProductKey: 444
131+
nhsProductName: My AAA Product
132+
nhsProductVersion: 2005.02
133+
nhsRequestorURP: myRequestor
134+
nhsTempUid: 444
135+
uniqueIdentifier: 000000000004
136+
137+
138+
dn: uniqueIdentifier=00000000000c,ou=Services,o=nhs
139+
objectClass: nhsMhs
140+
objectClass: top
141+
nhsApproverURP: myApprover
142+
nhsDateApproved: 20020202020202
143+
nhsDateDNSApproved: 20020202020202
144+
nhsDateRequested: 20020202020202
145+
nhsDNSApprover: myApprover
146+
nhsIDCode: BBB
147+
nhsMHSAckRequested: never
148+
nhsMhsCPAId: 00000000000c
149+
nhsMHSDuplicateElimination: never
150+
nhsMHSEndPoint: https://test.C3O9X.nhs.uk/
151+
nhsMhsFQDN: test.C3O9X.nhs.uk
152+
nhsMHsIN: READ_PRACTITIONER_ROLE_R4_V002
153+
nhsMhsIPAddress: 0.0.0.0
154+
nhsMhsManufacturerOrg: LSP04
155+
nhsMHSPartyKey: BBB-111111
156+
nhsMHsSN: urn:nhs:names:services:ers
157+
nhsMhsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V003
158+
nhsProductKey: 111
159+
nhsProductName: My BBB Product
160+
nhsProductVersion: 2005.02
161+
nhsRequestorURP: myRequestor
162+
uniqueIdentifier: 00000000000c
163+
nhsContractPropertyTemplateKey: 14
164+
nhsEPInteractionType: FHIR
165+
nhsMHSIsAuthenticated: none

0 commit comments

Comments
 (0)