Skip to content

Commit

Permalink
[release/2024-11-19-a] Merge branch feature/PI-581-create_mhs_device_…
Browse files Browse the repository at this point in the history
…with_drd into release/2024-11-19-a
  • Loading branch information
jaklinger committed Nov 19, 2024
1 parent fadf0d2 commit 6633a17
Show file tree
Hide file tree
Showing 17 changed files with 452 additions and 122 deletions.
2 changes: 1 addition & 1 deletion src/api/createDevice/src/v1/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


@mark_validation_errors_as_inbound
def parse_device_payload(data, cache) -> Device:
def parse_device_payload(data, cache) -> CreateDeviceIncomingParams:
payload: dict = data[parse_event_body]
return CreateDeviceIncomingParams(**payload)

Expand Down
134 changes: 105 additions & 29 deletions src/api/createDeviceMessageHandlingSystem/src/v1/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,50 +8,91 @@
read_product_team,
)
from domain.core.cpm_product import CpmProduct
from domain.core.device import Device
from domain.core.error import InvalidSpineMhsResponse
from domain.core.device import (
MHS_DEVICE_NAME,
Device,
DeviceKeyAddedEvent,
DeviceReferenceDataIdAddedEvent,
DeviceTagAddedEvent,
QuestionnaireResponseUpdatedEvent,
)
from domain.core.device_key import DeviceKeyType
from domain.core.device_reference_data import DeviceReferenceData
from domain.core.error import ConfigurationError
from domain.core.product_team import ProductTeam
from domain.core.questionnaire import Questionnaire, QuestionnaireResponse
from domain.repository.device_reference_data_repository import (
DeviceReferenceDataRepository,
)
from domain.repository.device_repository import DeviceRepository
from domain.repository.questionnaire_repository import (
QuestionnaireInstance,
QuestionnaireRepository,
)
from domain.request_models import CreateMhsDeviceIncomingParams
from domain.request_models import CpmProductPathParams, CreateMhsDeviceIncomingParams
from domain.response.validation_errors import mark_validation_errors_as_inbound


@mark_validation_errors_as_inbound
def parse_mhs_device_payload(data, cache) -> Device:
def parse_mhs_device_payload(data, cache) -> CreateMhsDeviceIncomingParams:
payload: dict = data[parse_event_body]
return CreateMhsDeviceIncomingParams(**payload)


def check_for_existing_mhs(data, cache):
product_team: ProductTeam = data[read_product_team]
product: CpmProduct = data[read_product]

device_repo = DeviceRepository(
table_name=cache["DYNAMODB_TABLE"], dynamodb_client=cache["DYNAMODB_CLIENT"]
)

devices = device_repo.search(product_team_id=product_team.id, product_id=product.id)
if any(device.name == MHS_DEVICE_NAME for device in devices):
raise ConfigurationError(
"There is already an existing MHS Device for this Product"
)


def read_device_reference_data(data, cache) -> DeviceReferenceData:
path_params: CpmProductPathParams = data[parse_path_params]
drd_repo = DeviceReferenceDataRepository(
table_name=cache["DYNAMODB_TABLE"], dynamodb_client=cache["DYNAMODB_CLIENT"]
)
device_reference_datas = drd_repo.search(
product_id=path_params.product_id,
product_team_id=path_params.product_team_id,
)

party_key: str = data[get_party_key]
# use {QuestionnaireInstance.SPINE_MHS_MESSAGE_SETS}
mhs_message_set_drd_name = f"{party_key} - MHS Message Set"

try:
(device_reference_data,) = filter(
lambda drd: drd.name == mhs_message_set_drd_name, device_reference_datas
)
except ValueError:
raise ConfigurationError(
"You must configure exactly one MessageSet Device Reference Data before creating an MHS Device"
)
return device_reference_data


def read_spine_mhs_questionnaire(data, cache) -> Questionnaire:
return QuestionnaireRepository().read(QuestionnaireInstance.SPINE_MHS)


def validate_spine_mhs_questionnaire_response(data, cache) -> QuestionnaireResponse:
spine_mhs_questionnaire: Questionnaire = data[read_spine_mhs_questionnaire]
payload: CreateMhsDeviceIncomingParams = data[parse_mhs_device_payload]
questionnaire_responses = payload.questionnaire_responses

# Ensure there's a questionnaire named 'spine_mhs' in the responses
if QuestionnaireInstance.SPINE_MHS not in questionnaire_responses:
raise InvalidSpineMhsResponse(
"Require a 'spine_mhs' questionnaire response to create a MHS Device"
)

raw_spine_mhs_questionnaire_response = payload.questionnaire_responses[
spine_mhs_questionnaire_response = payload.questionnaire_responses[
QuestionnaireInstance.SPINE_MHS
]
# Ensure there's only one response to 'spine_mhs'
if len(raw_spine_mhs_questionnaire_response) != 1:
raise InvalidSpineMhsResponse(
"Expected only one response for the 'spine_mhs' questionnaire"
)

return spine_mhs_questionnaire.validate(
data=raw_spine_mhs_questionnaire_response[0]
data=spine_mhs_questionnaire_response.__root__[0]
)


Expand All @@ -64,32 +105,63 @@ def create_mhs_device(data, cache) -> Device:
return product.create_device(**device_payload)


def create_party_key_tag(data, cache):
def create_party_key_tag(data, cache) -> DeviceTagAddedEvent:
mhs_device: Device = data[create_mhs_device]
return mhs_device.add_tag(party_key=data[get_party_key])


def create_cpa_id_keys(data, cache) -> DeviceKeyAddedEvent:
mhs_device: Device = data[create_mhs_device]
mhs_device.add_tag(party_key=data[get_party_key])
party_key = data[get_party_key]
drd: DeviceReferenceData = data[read_device_reference_data]
interaction_ids = []

# Extract Interaction IDs from questionnaire responses
questionnaire_responses = drd.questionnaire_responses.get(
f"{QuestionnaireInstance.SPINE_MHS_MESSAGE_SETS}/1", []
)
for response in questionnaire_responses:
interaction_ids.append(response.data.get("Interaction ID"))

# Use cpa_id in furture
for id in interaction_ids:
mhs_device.add_key(
key_type=DeviceKeyType.INTERACTION_ID, key_value=f"{party_key}:{id}"
)

return mhs_device


def add_spine_mhs_questionnaire_response(data, cache) -> list[QuestionnaireResponse]:
def add_device_reference_data_id(data, cache) -> DeviceReferenceDataIdAddedEvent:
mhs_device: Device = data[create_mhs_device]
drd: DeviceReferenceData = data[read_device_reference_data]
return mhs_device.add_device_reference_data_id(
device_reference_data_id=str(drd.id), path_to_data=["*"]
)


def add_spine_mhs_questionnaire_response(
data, cache
) -> QuestionnaireResponseUpdatedEvent:
spine_mhs_questionnaire_response: QuestionnaireResponse = data[
validate_spine_mhs_questionnaire_response
]
mhs_device: Device = data[create_party_key_tag]
mhs_device.add_questionnaire_response(spine_mhs_questionnaire_response)
return mhs_device
mhs_device: Device = data[create_mhs_device]

return mhs_device.add_questionnaire_response(spine_mhs_questionnaire_response)


def write_device(data: dict[str, CpmProduct], cache) -> CpmProduct:
mhs_device: Device = data[add_spine_mhs_questionnaire_response]
def write_device(data: dict[str, Device], cache) -> Device:
mhs_device: Device = data[create_mhs_device]
repo = DeviceRepository(
table_name=cache["DYNAMODB_TABLE"], dynamodb_client=cache["DYNAMODB_CLIENT"]
)
return repo.write(mhs_device)


def set_http_status(data, cache) -> tuple[HTTPStatus, str]:
device: Device = data[create_mhs_device]
return HTTPStatus.CREATED, device.state_exclude_tags()
def set_http_status(data, cache) -> tuple[HTTPStatus, dict]:
mhs_device: Device = data[create_mhs_device]
return HTTPStatus.CREATED, mhs_device.state_exclude_tags()


steps = [
Expand All @@ -99,10 +171,14 @@ def set_http_status(data, cache) -> tuple[HTTPStatus, str]:
read_product_team,
read_product,
get_party_key,
check_for_existing_mhs,
read_device_reference_data,
read_spine_mhs_questionnaire,
validate_spine_mhs_questionnaire_response,
create_mhs_device,
create_party_key_tag,
create_cpa_id_keys,
add_device_reference_data_id,
add_spine_mhs_questionnaire_response,
write_device,
set_http_status,
Expand Down
111 changes: 105 additions & 6 deletions src/api/createDeviceMessageHandlingSystem/tests/test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@
from domain.core.product_key import ProductKeyType
from domain.core.root import Root
from domain.repository.cpm_product_repository import CpmProductRepository
from domain.repository.device_reference_data_repository import (
DeviceReferenceDataRepository,
)
from domain.repository.device_repository import DeviceRepository
from domain.repository.product_team_repository import ProductTeamRepository
from domain.repository.questionnaire_repository import (
QuestionnaireInstance,
QuestionnaireRepository,
)
from event.json import json_loads

from test_helpers.dynamodb import mock_table
Expand Down Expand Up @@ -49,7 +56,9 @@


@contextmanager
def mock_epr_product() -> Generator[tuple[ModuleType, CpmProduct], Any, None]:
def mock_epr_product_with_message_set_drd() -> (
Generator[tuple[ModuleType, CpmProduct], Any, None]
):
org = Root.create_ods_organisation(ods_code=ODS_CODE)
product_team = org.create_product_team(name=PRODUCT_TEAM_NAME)

Expand All @@ -72,6 +81,37 @@ def mock_epr_product() -> Generator[tuple[ModuleType, CpmProduct], Any, None]:
)
product_repo.write(entity=product)

# set up questionnaire response
mhs_message_set_questionnaire = QuestionnaireRepository().read(
QuestionnaireInstance.SPINE_MHS_MESSAGE_SETS
)
questionnaire_response = mhs_message_set_questionnaire.validate(
data={
"Interaction ID": "urn:foo",
"MHS SN": "bar",
"MHS IN": "baz",
}
)

questionnaire_response_2 = mhs_message_set_questionnaire.validate(
data={
"Interaction ID": "urn:foo2",
"MHS SN": "bar2",
"MHS IN": "baz2",
},
)

# Set up DeviceReferenceData in DB
device_reference_data = product.create_device_reference_data(
name="ABC1234-987654 - MHS Message Set"
)
device_reference_data.add_questionnaire_response(questionnaire_response)
device_reference_data.add_questionnaire_response(questionnaire_response_2)
device_reference_data_repo = DeviceReferenceDataRepository(
table_name=TABLE_NAME, dynamodb_client=client
)
device_reference_data_repo.write(device_reference_data)

import api.createDeviceMessageHandlingSystem.index as index

index.cache["DYNAMODB_CLIENT"] = client
Expand Down Expand Up @@ -109,8 +149,41 @@ def mock_not_epr_product() -> Generator[tuple[ModuleType, CpmProduct], Any, None
yield index, product


@contextmanager
def mock_epr_product_without_message_set_drd() -> (
Generator[tuple[ModuleType, CpmProduct], Any, None]
):
org = Root.create_ods_organisation(ods_code=ODS_CODE)
product_team = org.create_product_team(name=PRODUCT_TEAM_NAME)

with mock_table(table_name=TABLE_NAME) as client, mock.patch.dict(
os.environ,
{"DYNAMODB_TABLE": TABLE_NAME, "AWS_DEFAULT_REGION": "eu-west-2"},
clear=True,
):
product_team_repo = ProductTeamRepository(
table_name=TABLE_NAME, dynamodb_client=client
)
product_team_repo.write(entity=product_team)

product = product_team.create_cpm_product(
name=PRODUCT_NAME, product_id=PRODUCT_ID
)
product.add_key(key_type=ProductKeyType.PARTY_KEY, key_value="ABC1234-987654")
product_repo = CpmProductRepository(
table_name=TABLE_NAME, dynamodb_client=client
)
product_repo.write(entity=product)

import api.createDeviceMessageHandlingSystem.index as index

index.cache["DYNAMODB_CLIENT"] = client

yield index, product


def test_index() -> None:
with mock_epr_product() as (index, product):
with mock_epr_product_with_message_set_drd() as (index, product):
# Execute the lambda
response = index.handler(
event={
Expand Down Expand Up @@ -194,7 +267,7 @@ def test_index() -> None:
],
)
def test_incoming_errors(body, path_parameters, error_code, status_code):
with mock_epr_product() as (index, _):
with mock_epr_product_with_message_set_drd() as (index, _):
# Execute the lambda
response = index.handler(
event={
Expand All @@ -219,7 +292,7 @@ def test_incoming_errors(body, path_parameters, error_code, status_code):
},
},
"VALIDATION_ERROR",
"Expected only one response for the 'spine_mhs' questionnaire",
"CreateMhsDeviceIncomingParams.questionnaire_responses.spine_mhs.__root__: ensure this value has at most 1 items",
400,
),
(
Expand All @@ -237,7 +310,7 @@ def test_incoming_errors(body, path_parameters, error_code, status_code):
def test_questionnaire_response_validation_errors(
body, error_code, error_message, status_code
):
with mock_epr_product() as (index, product):
with mock_epr_product_with_message_set_drd() as (index, product):
# Execute the lambda
response = index.handler(
event={
Expand Down Expand Up @@ -274,6 +347,32 @@ def test_not_epr_product():

assert response["statusCode"] == 400
expected_error_code = "VALIDATION_ERROR"
expected_message_code = "Not an EPR Product: Cannot create MHS device for product without exactly one Party Key"
expected_message_code = "Not an EPR Product: Cannot create MHS Device for product without exactly one Party Key"
assert expected_error_code in response["body"]
assert expected_message_code in response["body"]


def test_no_existing_message_set_drd():
with mock_epr_product_without_message_set_drd() as (index, product):
# Execute the lambda
response = index.handler(
event={
"headers": {"version": VERSION},
"body": json.dumps(
{"questionnaire_responses": {"spine_mhs": [QUESTIONNAIRE_DATA]}}
),
"pathParameters": {
"product_team_id": str(product.product_team_id),
"product_id": str(product.id),
},
}
)

assert response["statusCode"] == 400
expected_error_code = "VALIDATION_ERROR"
expected_message_code = "You must configure exactly one MessageSet Device Reference Data before creating an MHS Device"
assert expected_error_code in response["body"]
assert expected_message_code in response["body"]


# add test for already existing mhs device?
4 changes: 3 additions & 1 deletion src/api/createDeviceReferenceData/src/v1/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@


@mark_validation_errors_as_inbound
def parse_device_reference_data_payload(data, cache) -> DeviceReferenceData:
def parse_device_reference_data_payload(
data, cache
) -> CreateDeviceReferenceDataIncomingParams:
payload: dict = data[parse_event_body]
return CreateDeviceReferenceDataIncomingParams(**payload)

Expand Down
2 changes: 1 addition & 1 deletion src/api/readDevice/src/v1/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def read_product(data, cache) -> CpmProduct:
table_name=cache["DYNAMODB_TABLE"], dynamodb_client=cache["DYNAMODB_CLIENT"]
)
cpm_product = product_repo.read(
product_team_id=product_team.id, id=path_params.product_id
id=path_params.product_id, product_team_id=product_team.id
)
return cpm_product

Expand Down
Loading

0 comments on commit 6633a17

Please sign in to comment.