Skip to content

Commit

Permalink
[feature/PI-618-bulk_etl] bulk etl local
Browse files Browse the repository at this point in the history
  • Loading branch information
jaklinger committed Nov 25, 2024
1 parent e4f5e85 commit 241d3b0
Show file tree
Hide file tree
Showing 35 changed files with 1,937 additions and 577 deletions.
File renamed without changes.
79 changes: 79 additions & 0 deletions src/etl/sds/worker/bulk/extract_bulk/extract_bulk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import json
from collections import deque
from dataclasses import asdict
from io import BytesIO
from typing import TYPE_CHECKING

import boto3
from etl_utils.constants import WorkerKey
from etl_utils.io import EtlEncoder, pkl_dump_lz4
from etl_utils.ldif.ldif import filter_and_group_ldif_from_s3_by_property, parse_ldif
from etl_utils.worker.action import apply_action
from etl_utils.worker.model import WorkerActionResponse, WorkerEnvironment
from etl_utils.worker.worker_step_chain import execute_step_chain
from event.json import json_loads
from nhs_context_logging import log_action
from sds.domain.constants import FILTER_TERMS
from sds.domain.parse import parse_sds_record

_log_action_without_inputs = lambda function: log_action(log_args=[], log_result=False)(
function
)

if TYPE_CHECKING:
from mypy_boto3_s3 import S3Client


S3_CLIENT = boto3.client("s3")
ENVIRONMENT = WorkerEnvironment.build()


@_log_action_without_inputs
def _read(s3_client: "S3Client", s3_input_path: str) -> deque[tuple[dict]]:
filtered_ldif_by_group = filter_and_group_ldif_from_s3_by_property(
s3_path=s3_input_path,
filter_terms=FILTER_TERMS,
group_field="nhsMhsPartyKey",
s3_client=s3_client,
)
return deque(
tuple(parse_ldif(file_opener=BytesIO, path_or_data=filtered_ldif))
for filtered_ldif in filtered_ldif_by_group
)


def extract(
s3_client: "S3Client", s3_input_path: str, s3_output_path: str, max_records: int
) -> WorkerActionResponse:
unprocessed_records = _read(s3_client=s3_client, s3_input_path=s3_input_path)

processed_records = []

exception = apply_action(
unprocessed_records=unprocessed_records,
processed_records=processed_records,
action=lambda record: [[parse_sds_record(*r).dict() for r in record]],
record_serializer=lambda dns_and_records: json_loads(
json.dumps([r[1] for r in dns_and_records], cls=EtlEncoder)
),
)

return WorkerActionResponse(
unprocessed_records=unprocessed_records,
processed_records=processed_records,
exception=exception,
s3_input_path=s3_input_path,
s3_output_path=s3_output_path,
)


def handler(event, context):
response = execute_step_chain(
action=extract,
s3_client=S3_CLIENT,
s3_input_path=ENVIRONMENT.s3_path(WorkerKey.EXTRACT),
s3_output_path=ENVIRONMENT.s3_path(WorkerKey.TRANSFORM),
unprocessed_dumper=lambda **kwargs: None,
processed_dumper=pkl_dump_lz4,
)
return asdict(response)
File renamed without changes.
165 changes: 165 additions & 0 deletions src/etl/sds/worker/bulk/extract_bulk/tests/extract_bulk_input.ldif
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
dn: uniqueIdentifier=00000000000a,ou=Services,o=nhs
objectClass: nhsMhs
objectClass: top
nhsApproverURP: myApprover
nhsDateApproved: 20010101010101
nhsDateDNSApproved: 20010101010101
nhsDateRequested: 20010101010101
nhsDNSApprover: myApprover
nhsIDCode: AAA
nhsMHSAckRequested: never
nhsMhsCPAId: 00000000000a
nhsMHSDuplicateElimination: never
nhsMHSEndPoint: https://test.C3O9X.nhs.uk/
nhsMhsFQDN: test.C3O9X.nhs.uk
nhsMHsIN: READ_PRACTITIONER_ROLE_R4_V001
nhsMhsIPAddress: 0.0.0.0
nhsMhsManufacturerOrg: LSP04
nhsMHSPartyKey: AAA-111111
nhsMHsSN: urn:nhs:names:services:ers
nhsMhsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V001
nhsProductKey: 111
nhsProductName: My AAA Product
nhsProductVersion: 2005.02
nhsRequestorURP: myRequestor
uniqueIdentifier: 00000000000a
nhsContractPropertyTemplateKey: 14
nhsEPInteractionType: FHIR
nhsMHSIsAuthenticated: none

dn: uniqueIdentifier=000000000001,ou=Services,o=nhs
objectClass: nhsAS
objectClass: top
nhsApproverURP: myApprover
nhsAsClient: AAA
nhsAsSvcIA: urn:nhs:names:services:pds:QUPA_IN040000UK01
nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V001
nhsDateApproved: 20010101010101
nhsDateRequested: 20010101010101
nhsIDCode: AAA
nhsMhsManufacturerOrg: LSP04
nhsMHSPartyKey: AAA-111111
nhsProductKey: 111
nhsProductName: My AAA Product
nhsProductVersion: 2005.02
nhsRequestorURP: myRequestor
nhsTempUid: 111
uniqueIdentifier: 000000000001

dn: uniqueIdentifier=000000000002,ou=Services,o=nhs
objectClass: nhsAS
objectClass: top
nhsApproverURP: myApprover
nhsAsClient: BBB
nhsAsSvcIA: urn:nhs:names:services:pds:QUPA_IN040000UK02
nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V002
nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V003
nhsDateApproved: 20020202020202
nhsDateRequested: 20020202020202
nhsIDCode: BBB
nhsMhsManufacturerOrg: LSP04
nhsMHSPartyKey: BBB-111111
nhsProductKey: 222
nhsProductName: My BBB Product
nhsProductVersion: 2005.02
nhsRequestorURP: myRequestor
nhsTempUid: 222
uniqueIdentifier: 000000000002

dn: uniqueIdentifier=00000000000b,ou=Services,o=nhs
objectClass: nhsMhs
objectClass: top
nhsApproverURP: myApprover
nhsDateApproved: 20020202020202
nhsDateDNSApproved: 20020202020202
nhsDateRequested: 20020202020202
nhsDNSApprover: myApprover
nhsIDCode: BBB
nhsMHSAckRequested: never
nhsMhsCPAId: 00000000000b
nhsMHSDuplicateElimination: never
nhsMHSEndPoint: https://test.C3O9X.nhs.uk/
nhsMhsFQDN: test.C3O9X.nhs.uk
nhsMHsIN: READ_PRACTITIONER_ROLE_R4_V002
nhsMhsIPAddress: 0.0.0.0
nhsMhsManufacturerOrg: LSP04
nhsMHSPartyKey: BBB-111111
nhsMHsSN: urn:nhs:names:services:ers
nhsMhsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V002
nhsProductKey: 111
nhsProductName: My BBB Product
nhsProductVersion: 2005.02
nhsRequestorURP: myRequestor
uniqueIdentifier: 00000000000b
nhsContractPropertyTemplateKey: 14
nhsEPInteractionType: FHIR
nhsMHSIsAuthenticated: none

dn: uniqueIdentifier=000000000003,ou=Services,o=nhs
objectClass: nhsAS
objectClass: top
nhsApproverURP: myApprover
nhsAsClient: BBB
nhsAsSvcIA: urn:nhs:names:services:pds:QUPA_IN040000UK03
nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V002
nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V003
nhsDateApproved: 20030303030303
nhsDateRequested: 20030303030303
nhsIDCode: BBB
nhsMhsManufacturerOrg: LSP04
nhsMHSPartyKey: BBB-111111
nhsProductKey: 333
nhsProductName: My BBB Product
nhsProductVersion: 2005.02
nhsRequestorURP: myRequestor
nhsTempUid: 333
uniqueIdentifier: 000000000003

dn: uniqueIdentifier=000000000004,ou=Services,o=nhs
objectClass: nhsAS
objectClass: top
nhsApproverURP: myApprover
nhsAsClient: AAA
nhsAsSvcIA: urn:nhs:names:services:pds:QUPA_IN040000UK04
nhsAsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V001
nhsDateApproved: 20040404040404
nhsDateRequested: 20040404040404
nhsIDCode: AAA
nhsMhsManufacturerOrg: LSP04
nhsMHSPartyKey: AAA-111111
nhsProductKey: 444
nhsProductName: My AAA Product
nhsProductVersion: 2005.02
nhsRequestorURP: myRequestor
nhsTempUid: 444
uniqueIdentifier: 000000000004


dn: uniqueIdentifier=00000000000c,ou=Services,o=nhs
objectClass: nhsMhs
objectClass: top
nhsApproverURP: myApprover
nhsDateApproved: 20020202020202
nhsDateDNSApproved: 20020202020202
nhsDateRequested: 20020202020202
nhsDNSApprover: myApprover
nhsIDCode: BBB
nhsMHSAckRequested: never
nhsMhsCPAId: 00000000000c
nhsMHSDuplicateElimination: never
nhsMHSEndPoint: https://test.C3O9X.nhs.uk/
nhsMhsFQDN: test.C3O9X.nhs.uk
nhsMHsIN: READ_PRACTITIONER_ROLE_R4_V002
nhsMhsIPAddress: 0.0.0.0
nhsMhsManufacturerOrg: LSP04
nhsMHSPartyKey: BBB-111111
nhsMHsSN: urn:nhs:names:services:ers
nhsMhsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V003
nhsProductKey: 111
nhsProductName: My BBB Product
nhsProductVersion: 2005.02
nhsRequestorURP: myRequestor
uniqueIdentifier: 00000000000c
nhsContractPropertyTemplateKey: 14
nhsEPInteractionType: FHIR
nhsMHSIsAuthenticated: none
Loading

0 comments on commit 241d3b0

Please sign in to comment.