diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3648e5175..0b7e6dc86 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -4,7 +4,7 @@ repos: rev: v1.4.0 hooks: - id: detect-secrets - exclude: ".pre-commit-config.yaml|infrastructure/localstack/provider.tf|src/api/searchDevice/src/data/response.py" + exclude: ".pre-commit-config.yaml|infrastructure/localstack/provider.tf|src/api/searchDevice/src/data/response.py|src/etl/sds/tests/changelog" - repo: https://github.com/prettier/pre-commit rev: 57f39166b5a5a504d6808b87ab98d41ebf095b46 diff --git a/CHANGELOG.md b/CHANGELOG.md index 13cce9854..54c7841f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 2024-06-04 +- [PI-322] State machine lambda error messages truncated +- [PI-346] Update now includes modify changes, plus testing suite +- [PI-347] Updates implemented sequentially +- [PI-358] Disable SDS ETL update timer on persistent environments + ## 2024-05-31 - [PI-342] Add redacted-fields to context loggers - [PI-376] Check releases fully rebased on main diff --git a/VERSION b/VERSION index 5b76f3d67..a9bd38319 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2024.05.31 +2024.06.04 diff --git a/changelog/2024-06-04.md b/changelog/2024-06-04.md new file mode 100644 index 000000000..82091d1a5 --- /dev/null +++ b/changelog/2024-06-04.md @@ -0,0 +1,4 @@ +- [PI-322] State machine lambda error messages truncated +- [PI-346] Update now includes modify changes, plus testing suite +- [PI-347] Updates implemented sequentially +- [PI-358] Disable SDS ETL update timer on persistent environments diff --git a/infrastructure/terraform/per_workspace/modules/etl/sds/etl-diagram--update-transform-and-load.asl.json b/infrastructure/terraform/per_workspace/modules/etl/sds/etl-diagram--update-transform-and-load.asl.json new file mode 100644 index 000000000..8e4c19297 --- /dev/null +++ b/infrastructure/terraform/per_workspace/modules/etl/sds/etl-diagram--update-transform-and-load.asl.json @@ -0,0 +1,91 @@ +{ + "Comment": "SDS ETL - Sequential Transform and Load for updates", + "TimeoutSeconds": 60, + "StartAt": "load", + "States": { + "load": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "FunctionName": "${load_worker_arn}:$LATEST", + "Payload": { + "max_records": "1" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2 + } + ], + "Next": "load-result" + }, + "load-result": { + "Type": "Choice", + "Choices": [ + { + "Not": { + "Variable": "$.error_message", + "IsNull": true + }, + "Next": "etl-stopped" + }, + { + "And": [ + { + "Variable": "$.error_message", + "IsNull": true + }, + { + "Not": { + "Variable": "$.unprocessed_records", + "NumericEquals": 0 + } + } + ], + "Next": "load" + } + ], + "Default": "transform" + }, + "transform": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "FunctionName": "${transform_worker_arn}:$LATEST", + "Payload": { + "max_records": "1", + "trust": false + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2 + } + ], + "Next": "etl-stopped" + }, + + "etl-stopped": { + "Type": "Pass", + "End": true + } + } +} diff --git a/infrastructure/terraform/per_workspace/modules/etl/sds/step-function.asl.json b/infrastructure/terraform/per_workspace/modules/etl/sds/etl-diagram.asl.json similarity index 58% rename from infrastructure/terraform/per_workspace/modules/etl/sds/step-function.asl.json rename to infrastructure/terraform/per_workspace/modules/etl/sds/etl-diagram.asl.json index e4c22267e..884ca4332 100644 --- a/infrastructure/terraform/per_workspace/modules/etl/sds/step-function.asl.json +++ b/infrastructure/terraform/per_workspace/modules/etl/sds/etl-diagram.asl.json @@ -13,7 +13,7 @@ "Type": "Choice", "Choices": [ { - "Variable": "$.changelog-number", + "Variable": "$.changelog_number_end", "IsPresent": true, "Next": "write-changelog-number" } @@ -24,7 +24,7 @@ "Type": "Task", "End": true, "Parameters": { - "Body.$": "$.changelog-number", + "Body.$": "$.changelog_number_end", "Bucket": "${etl_bucket}", "Key": "${changelog_key}" }, @@ -37,25 +37,29 @@ } }, { - "StartAt": "parse-init", + "StartAt": "parse-etl-type", "States": { - "parse-init": { + "parse-etl-type": { "Type": "Choice", "Choices": [ { - "Variable": "$.init", - "StringEquals": "extract", - "Next": "extract" + "Variable": "$.etl_type", + "StringEquals": "bulk", + "Next": "extract-bulk" + }, + { + "Variable": "$.etl_type", + "StringEquals": "update", + "Next": "extract-update" } - ], - "Default": "invalid-init-value" + ] }, - "extract": { + "extract-bulk": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { - "Payload.$": "$", + "Payload": {}, "FunctionName": "${extract_worker_arn}:$LATEST" }, "Retry": [ @@ -71,25 +75,27 @@ "BackoffRate": 2 } ], - "Next": "extract-result" + "Next": "extract-bulk-result" }, - "extract-result": { + "extract-bulk-result": { "Type": "Choice", "Choices": [ { "Variable": "$.error_message", "IsNull": true, - "Next": "transform" + "Next": "transform-bulk" } ], - "Default": "worker-failed" + "Default": "etl-stopped" }, - "transform": { + "transform-bulk": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { - "Payload.$": "$", + "Payload": { + "trust": true + }, "FunctionName": "${transform_worker_arn}:$LATEST" }, "Retry": [ @@ -105,25 +111,27 @@ "BackoffRate": 2 } ], - "Next": "transform-result" + "Next": "transform-bulk-result" }, - "transform-result": { + "transform-bulk-result": { "Type": "Choice", "Choices": [ { "Variable": "$.error_message", "IsNull": true, - "Next": "load" + "Next": "load-bulk" } ], - "Default": "worker-failed" + "Default": "etl-stopped" }, - "load": { + "load-bulk": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { - "Payload.$": "$", + "Payload": { + "max_records": "${bulk_load_chunksize}" + }, "FunctionName": "${load_worker_arn}:$LATEST" }, "Retry": [ @@ -139,18 +147,11 @@ "BackoffRate": 2 } ], - "Next": "load-result" + "Next": "load-bulk-result" }, - "load-result": { + "load-bulk-result": { "Type": "Choice", "Choices": [ - { - "Not": { - "Variable": "$.error_message", - "IsNull": true - }, - "Next": "worker-failed" - }, { "And": [ { @@ -164,23 +165,90 @@ } } ], - "Next": "load" + "Next": "load-bulk" } ], - "Default": "load-successful" + "Default": "etl-stopped" }, - "load-successful": { - "Type": "Pass", - "End": true + "extract-update": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "Payload": {}, + "FunctionName": "${extract_worker_arn}:$LATEST" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2 + } + ], + "Next": "extract-update-result" }, - "worker-failed": { + "extract-update-result": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.error_message", + "IsNull": true, + "Next": "etl-update" + } + ], + "Default": "etl-stopped" + }, + "etl-update": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "${etl_update_state_machine_arn}", + "Input": { + "StatePayload": {}, + "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id" + } + }, + "OutputPath": "$.Output", + "Next": "etl-update-result" + }, + "etl-update-result": { + "Type": "Choice", + "Choices": [ + { + "Or": [ + { + "Not": { + "Variable": "$.error_message", + "IsNull": true + } + }, + { + "And": [ + { + "Variable": "$.processed_records", + "NumericEquals": 0 + }, + { + "Variable": "$.unprocessed_records", + "NumericEquals": 0 + } + ] + } + ], + "Next": "etl-stopped" + } + ], + "Default": "etl-update" + }, + "etl-stopped": { "Type": "Pass", "End": true - }, - "invalid-init-value": { - "Type": "Fail", - "Error": "InvalidInitValue", - "Cause": "Invalid Value of Init" } } } diff --git a/infrastructure/terraform/per_workspace/modules/etl/sds/main.tf b/infrastructure/terraform/per_workspace/modules/etl/sds/main.tf index 29473e73e..59fdc15ee 100644 --- a/infrastructure/terraform/per_workspace/modules/etl/sds/main.tf +++ b/infrastructure/terraform/per_workspace/modules/etl/sds/main.tf @@ -195,53 +195,42 @@ resource "aws_cloudwatch_log_group" "step_function" { name = "/aws/vendedlogs/states/${var.workspace_prefix}--${local.etl_name}" } -module "step_function" { +module "update_transform_and_load_step_function" { source = "terraform-aws-modules/step-functions/aws" - version = "4.1.0" + version = "4.2.0" type = "STANDARD" - name = "${var.workspace_prefix}--${local.etl_name}" + name = "${var.workspace_prefix}--${local.etl_name}--update-transform-and-load" use_existing_cloudwatch_log_group = true cloudwatch_log_group_name = aws_cloudwatch_log_group.step_function.name definition = templatefile( - "${path.module}/step-function.asl.json", + "${path.module}/etl-diagram--update-transform-and-load.asl.json", { - extract_worker_arn = module.worker_extract.arn transform_worker_arn = module.worker_transform.arn load_worker_arn = module.worker_load.arn - notify_arn = module.notify.arn etl_bucket = module.bucket.s3_bucket_id - changelog_key = var.changelog_key } ) service_integrations = { lambda = { lambda = [ - module.worker_extract.arn, module.worker_transform.arn, module.worker_load.arn, - module.notify.arn + "${module.worker_transform.arn}:*", + "${module.worker_load.arn}:*" ] } } + + attach_policy_json = true policy_json = <<-EOT { "Version": "2012-10-17", "Statement": [ - { - "Action": "lambda:InvokeFunction", - "Effect": "Allow", - "Resource": [ - "${module.worker_extract.arn}:*", - "${module.worker_transform.arn}:*", - "${module.worker_load.arn}:*", - "${module.notify.arn}:*" - ] - }, { "Action": [ "s3:PutObject", @@ -262,6 +251,8 @@ module "step_function" { } EOT + + logging_configuration = { log_destination = "${aws_cloudwatch_log_group.step_function.arn}:*" include_execution_data = true @@ -269,12 +260,42 @@ module "step_function" { } tags = { - Name = "${var.workspace_prefix}--${local.etl_name}" + Name = "${var.workspace_prefix}--${local.etl_name}--update-transform-and-load" } depends_on = [aws_cloudwatch_log_group.step_function] } +resource "aws_sfn_state_machine" "state_machine" { + name = "${var.workspace_prefix}--${local.etl_name}" + type = "STANDARD" + role_arn = aws_iam_role.step_function.arn + definition = templatefile( + "${path.module}/etl-diagram.asl.json", + { + extract_worker_arn = module.worker_extract.arn + transform_worker_arn = module.worker_transform.arn + load_worker_arn = module.worker_load.arn + notify_arn = module.notify.arn + etl_bucket = module.bucket.s3_bucket_id + changelog_key = var.changelog_key + bulk_load_chunksize = var.bulk_load_chunksize + etl_update_state_machine_arn = module.update_transform_and_load_step_function.state_machine_arn + } + ) + logging_configuration { + log_destination = "${aws_cloudwatch_log_group.step_function.arn}:*" + include_execution_data = true + level = "ALL" + } + + + tags = { + Name = "${var.workspace_prefix}--${local.etl_name}" + } + depends_on = [aws_cloudwatch_log_group.step_function, module.update_transform_and_load_step_function, aws_iam_role.step_function] +} + module "trigger_bulk" { source = "./trigger/" @@ -289,7 +310,7 @@ module "trigger_bulk" { etl_bucket_arn = module.bucket.s3_bucket_arn etl_layer_arn = module.etl_layer.lambda_layer_arn notify_lambda_arn = module.notify.arn - state_machine_arn = module.step_function.state_machine_arn + state_machine_arn = aws_sfn_state_machine.state_machine.arn table_arn = var.table_arn environment_variables = { TABLE_NAME = var.table_name @@ -339,7 +360,7 @@ module "trigger_update" { etl_bucket_arn = module.bucket.s3_bucket_arn etl_layer_arn = module.etl_layer.lambda_layer_arn notify_lambda_arn = module.notify.arn - state_machine_arn = module.step_function.state_machine_arn + state_machine_arn = aws_sfn_state_machine.state_machine.arn table_arn = var.table_arn allowed_triggers = {} environment_variables = { @@ -397,10 +418,11 @@ module "trigger_update" { } module "schedule_trigger_update" { - source = "./schedule/" - lambda_arn = module.trigger_update.lambda_function.lambda_function_arn - lambda_name = module.trigger_update.lambda_function.lambda_function_name - schedule_expression = var.is_persistent ? "rate(15 minutes)" : "rate(1 day)" + source = "./schedule/" + lambda_arn = module.trigger_update.lambda_function.lambda_function_arn + lambda_name = module.trigger_update.lambda_function.lambda_function_name + # schedule_expression = var.is_persistent ? "rate(15 minutes)" : "rate(1 day)" + schedule_expression = var.is_persistent ? "cron(0 0 1 1 ? 2000)" : "rate(1 day)" # cron(0 0 1 1 ? 2000) means "never" } module "bulk_trigger_notification" { diff --git a/infrastructure/terraform/per_workspace/modules/etl/sds/output.tf b/infrastructure/terraform/per_workspace/modules/etl/sds/output.tf index 5a8e10dc6..43b18854c 100644 --- a/infrastructure/terraform/per_workspace/modules/etl/sds/output.tf +++ b/infrastructure/terraform/per_workspace/modules/etl/sds/output.tf @@ -1,5 +1,5 @@ output "state_machine_arn" { - value = module.step_function.state_machine_arn + value = aws_sfn_state_machine.state_machine.arn } output "bucket" { diff --git a/infrastructure/terraform/per_workspace/modules/etl/sds/step_function_role.tf b/infrastructure/terraform/per_workspace/modules/etl/sds/step_function_role.tf new file mode 100644 index 000000000..6300c360f --- /dev/null +++ b/infrastructure/terraform/per_workspace/modules/etl/sds/step_function_role.tf @@ -0,0 +1,117 @@ +locals { + name = "${var.workspace_prefix}--${local.etl_name}" + tags = { + Name = "${var.workspace_prefix}--${local.etl_name}" + } +} + +locals { + service_integrations = { + lambda = { + actions = ["lambda:InvokeFunction"] + resources = [ + module.worker_extract.arn, + module.worker_transform.arn, + module.worker_load.arn, + module.notify.arn, + "${module.worker_extract.arn}:*", + "${module.worker_transform.arn}:*", + "${module.worker_load.arn}:*", + "${module.notify.arn}:*" + ] + } + + step_function_start = { + actions = ["states:StartExecution", "states:StartSyncExecution"] + resources = [module.update_transform_and_load_step_function.state_machine_arn] + } + + step_function_stop = { + actions = ["states:DescribeExecution", "states:StopExecution"] + resources = ["${replace(module.update_transform_and_load_step_function.state_machine_arn, "stateMachine", "execution")}:*"] + } + + step_function_event_polling = { + actions = ["events:PutTargets", "events:PutRule", "events:DescribeRule"] + resources = ["arn:aws:events:eu-west-2:${var.assume_account}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule"] + } + + s3 = { + actions = [ + "s3:PutObject", + "s3:AbortMultipartUpload", + "s3:GetBucketLocation", + "s3:GetObject", + "s3:ListBucket", + "s3:ListBucketMultipartUploads", + "s3:PutObjectVersionTagging" + ] + resources = [ + "${module.bucket.s3_bucket_arn}", + "${module.bucket.s3_bucket_arn}/*" + ] + } + + logging = { + actions = [ + "logs:CreateLogDelivery", + "logs:GetLogDelivery", + "logs:UpdateLogDelivery", + "logs:DeleteLogDelivery", + "logs:ListLogDeliveries", + "logs:PutResourcePolicy", + "logs:DescribeResourcePolicies", + "logs:DescribeLogGroups", + ] + resources = ["*"] + } + } + + depends_on = [module.bucket, module.update_transform_and_load_step_function] +} + +data "aws_iam_policy_document" "assume_role" { + statement { + effect = "Allow" + actions = ["sts:AssumeRole"] + + principals { + type = "Service" + identifiers = ["states.eu-west-2.amazonaws.com"] + } + } +} + +resource "aws_iam_role" "step_function" { + name = "${local.name}--role" + assume_role_policy = data.aws_iam_policy_document.assume_role.json + tags = local.tags +} + +data "aws_iam_policy_document" "service" { + for_each = { for k, v in local.service_integrations : k => v } + + dynamic "statement" { + for_each = [each.value] + content { + effect = "Allow" + sid = replace("${local.name}${each.key}", "/[^0-9A-Za-z]*/", "") + actions = each.value.actions + resources = each.value.resources + } + } +} + +resource "aws_iam_policy" "service" { + for_each = { for k, v in local.service_integrations : k => v } + name = "${local.name}--${each.key}" + policy = data.aws_iam_policy_document.service[each.key].json + tags = local.tags +} + +resource "aws_iam_policy_attachment" "service" { + for_each = { for k, v in local.service_integrations : k => v } + name = "${local.name}--${each.key}" + roles = [aws_iam_role.step_function.name] + policy_arn = aws_iam_policy.service[each.key].arn +} diff --git a/infrastructure/terraform/per_workspace/modules/etl/sds/vars.tf b/infrastructure/terraform/per_workspace/modules/etl/sds/vars.tf index e736d2b43..ee486f7bd 100644 --- a/infrastructure/terraform/per_workspace/modules/etl/sds/vars.tf +++ b/infrastructure/terraform/per_workspace/modules/etl/sds/vars.tf @@ -9,6 +9,9 @@ variable "domain_layer" {} variable "changelog_key" { default = "changelog-number" } +variable "bulk_load_chunksize" { + default = 10000 +} variable "table_name" {} variable "table_arn" {} variable "is_persistent" {} diff --git a/pyproject.toml b/pyproject.toml index 4a4326a94..81677fd4e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "connecting-party-manager" -version = "2024.05.31" +version = "2024.06.04" description = "Repository for the Connecting Party Manager API and related services" authors = ["NHS England"] license = "LICENSE.md" diff --git a/scripts/etl/etl.mk b/scripts/etl/etl.mk index 6636c7941..921ffea90 100644 --- a/scripts/etl/etl.mk +++ b/scripts/etl/etl.mk @@ -3,3 +3,7 @@ WORKSPACE := etl--clear-state: aws--login ## Clear the ETL state AWS_DEFAULT_REGION=$(AWS_DEFAULT_REGION) AWS_ACCESS_KEY_ID=$(AWS_ACCESS_KEY_ID) AWS_SECRET_ACCESS_KEY=$(AWS_SECRET_ACCESS_KEY) AWS_SESSION_TOKEN=$(AWS_SESSION_TOKEN) poetry run python scripts/etl/clear_state_inputs.py "$(SET_CHANGELOG_NUMBER)" "$(WORKSPACE)" + + +etl--head-state: aws--login ## Download the head of the ETL state + AWS_DEFAULT_REGION=$(AWS_DEFAULT_REGION) AWS_ACCESS_KEY_ID=$(AWS_ACCESS_KEY_ID) AWS_SECRET_ACCESS_KEY=$(AWS_SECRET_ACCESS_KEY) AWS_SESSION_TOKEN=$(AWS_SESSION_TOKEN) poetry run python scripts/etl/head_etl.py "$(WORKSPACE)" diff --git a/scripts/etl/head_etl.py b/scripts/etl/head_etl.py new file mode 100644 index 000000000..bfba17e5c --- /dev/null +++ b/scripts/etl/head_etl.py @@ -0,0 +1,85 @@ +""" +Run with + + poetry run python scripts/etl/head_etl.py +""" + +import json +import sys +from functools import partial + +import boto3 +from changelog.changelog_precommit import PATH_TO_ROOT +from etl_utils.constants import CHANGELOG_NUMBER, WorkerKey +from etl_utils.io import EtlEncoder, pkl_load_lz4 +from mypy_boto3_s3 import S3Client + +from test_helpers.aws_session import aws_session +from test_helpers.terraform import read_terraform_output + + +def _get_object(s3_client: S3Client, bucket, key): + try: + response = s3_client.get_object(Bucket=bucket, Key=key) + return response["Body"] + except: + return None + + +def _ldif_head(s3_client: S3Client, bucket, key): + head = None + body = _get_object(s3_client=s3_client, bucket=bucket, key=key) + if body: + head, *_ = body.read().split(b"\n\n") + return head + + +def _pkl_loads_head(s3_client: S3Client, bucket, key): + head = None + body = _get_object(s3_client=s3_client, bucket=bucket, key=key) + if body: + items = pkl_load_lz4(body) + if items: + head = items[0].decode() + return head + + +def main(workspace): + etl_bucket = ( + f"nhse-cpm--{workspace}--sds--etl" + if workspace + else read_terraform_output("sds_etl.value.bucket") + ) + + with aws_session(): + s3_client = boto3.client("s3") + get_object = partial(_get_object, s3_client=s3_client) + ldif_head = partial(_ldif_head, s3_client=s3_client) + pkl_loads_head = partial(_pkl_loads_head, s3_client=s3_client) + + body = get_object(bucket=etl_bucket, key=CHANGELOG_NUMBER) + changelog_number = body.read().decode() if body else body + + extract_head = ldif_head(bucket=etl_bucket, key=WorkerKey.EXTRACT) + transform_head = pkl_loads_head(bucket=etl_bucket, key=WorkerKey.TRANSFORM) + load_head = pkl_loads_head(bucket=etl_bucket, key=WorkerKey.LOAD) + + path = PATH_TO_ROOT / ".downloads" / "etl-head.json" + with open(path, "w") as f: + json.dump( + fp=f, + obj={ + "changelog_number": changelog_number, + "extract": extract_head.decode(), + "transform": transform_head, + "load": load_head, + }, + cls=EtlEncoder, + indent=2, + ) + print("Written to", path) # noqa + + +if __name__ == "__main__": + _, workspace = sys.argv + main(workspace) diff --git a/scripts/etl/ldif_cleanup.py b/scripts/etl/ldif_cleanup.py new file mode 100644 index 000000000..51386568d --- /dev/null +++ b/scripts/etl/ldif_cleanup.py @@ -0,0 +1,110 @@ +""" +A script that filters and removes objects from an LDIF that won't pass the ETL. +Useful for cleaning up the INT bulk data. + +This script isn't particularly intended to have a long life. General instructions are to either do: + + ldif_cleanup("local/path/to/file.ldif") + +or (slower) + + ldif_cleanup("s3://path/to/file.ldif", boto3.client("s3")) + +and it will save a file in your root dir called 'good.ldif' that has only good +nhsMhs and nhsAccreditedSystem objects in them, i.e. those which are guaranteed +to be successfully processed by the ETL. + +It's not particularly optimised, either timewise or memorywise, so: +* you better have some ok memory on your laptop, +* it'll take a few minutes + +Oh, and you'll need to install 'tqdm', thanks! +""" + +import json +from collections import deque +from io import BytesIO +from pathlib import Path + +from domain.core.load_questionnaire import render_questionnaire +from domain.core.questionnaires import QuestionnaireInstance +from etl_utils.io import EtlEncoder +from etl_utils.ldif.ldif import filter_ldif_from_s3_by_property, parse_ldif +from etl_utils.worker.action import apply_action +from event.json import json_loads +from mypy_boto3_s3 import S3Client +from sds.cpm_translation import translate +from sds.domain.constants import FILTER_TERMS +from sds.domain.parse import parse_sds_record +from tqdm import tqdm + + +def ldif_cleanup(s3_input_path: str, s3_client: S3Client) -> Path: + filtered_ldif = filter_ldif_from_s3_by_property( + s3_path=s3_input_path, filter_terms=FILTER_TERMS, s3_client=s3_client + ) + + spine_device_questionnaire = render_questionnaire( + questionnaire_name=QuestionnaireInstance.SPINE_DEVICE, questionnaire_version=1 + ) + spine_endpoint_questionnaire = render_questionnaire( + questionnaire_name=QuestionnaireInstance.SPINE_ENDPOINT, questionnaire_version=1 + ) + + _spine_device_questionnaire = spine_device_questionnaire.dict() + _spine_endpoint_questionnaire = spine_endpoint_questionnaire.dict() + + all_keys = set() + + good_rows = [] + for row in tqdm(filtered_ldif.tobytes().split(b"\n\n")): + unprocessed_records = deque(parse_ldif(file_opener=BytesIO, path_or_data=row)) + processed_records = deque() + + exception = apply_action( + unprocessed_records=unprocessed_records, + processed_records=processed_records, + action=lambda record: parse_sds_record(*record).dict(), + record_serializer=lambda dn_and_record: json_loads( + json.dumps(dn_and_record[1], cls=EtlEncoder) + ), + ) + if exception: + raise exception + + processed_transform_records = deque() + exception = apply_action( + unprocessed_records=processed_records, + processed_records=processed_transform_records, + action=lambda record: translate( + obj=record, + spine_device_questionnaire=spine_device_questionnaire, + _spine_device_questionnaire=_spine_device_questionnaire, + spine_endpoint_questionnaire=spine_endpoint_questionnaire, + _spine_endpoint_questionnaire=_spine_endpoint_questionnaire, + _trust=True, + repository=None, + ), + ) + if exception: + continue + + is_duplicate = False + for transform_events in processed_transform_records: + ((_, event),) = transform_events.items() + device_key = event.get("key") + if device_key: + is_duplicate = device_key in all_keys + all_keys.add(device_key) + if is_duplicate: + break + + if not is_duplicate: + good_rows.append(row) + + with open("good.ldif", "w") as f: + f.write(b"\n\n".join(good_rows).decode()) + + +if __name__ == "__main__": + ldif_cleanup(s3_input_path="~/Downloads/538684.ldif", s3_client=None) diff --git a/src/conftest.py b/src/conftest.py index bf6271167..59da9ffb3 100644 --- a/src/conftest.py +++ b/src/conftest.py @@ -11,7 +11,7 @@ log_capture_global_fixture as log_capture_global, ) from nhs_context_logging.formatters import json_serializer -from pytest import Config, FixtureRequest, Item, fixture +from pytest import Config, FixtureRequest, Item, Parser, fixture from test_helpers.aws_session import aws_session from test_helpers.constants import PROJECT_ROOT @@ -19,6 +19,10 @@ from test_helpers.terraform import read_terraform_output +def pytest_addoption(parser: Parser): + parser.addoption("--suppress-logs", action="store", default=False) + + def is_integration(request: FixtureRequest) -> bool: return request.node.get_closest_marker("integration") is not None @@ -65,9 +69,14 @@ def pytest_collection_modifyitems(items: list[Item], config: Config): @fixture(autouse=True) -def log_on_failure(request: FixtureRequest, log_capture): +def log_on_failure(pytestconfig: Config, request: FixtureRequest, log_capture): setup_logger(request.node.name) + if pytestconfig.getoption("suppress_logs") is not False: + from nhs_context_logging import app_logger + + app_logger.log = lambda *args, **kwargs: None + exception = None try: yield @@ -76,7 +85,8 @@ def log_on_failure(request: FixtureRequest, log_capture): std_out, std_err = log_capture for log in (*std_out, *std_err): - print(json.dumps(log, indent=2, default=json_serializer)) # noqa: T201 + if pytestconfig.getoption("suppress_logs") is False: + print(json.dumps(log, indent=2, default=json_serializer)) # noqa: T201 if isinstance(exception, Exception): raise exception diff --git a/src/etl/sds/tests/changelog/changelog_components/add/accredited_system.ldif b/src/etl/sds/tests/changelog/changelog_components/add/accredited_system.ldif new file mode 100644 index 000000000..138b3f622 --- /dev/null +++ b/src/etl/sds/tests/changelog/changelog_components/add/accredited_system.ldif @@ -0,0 +1,16 @@ +dn: uniqueIdentifier=000173655515,ou=Services,o=nhs +changeType: add +objectClass: nhsAS +objectClass: top +nhsApproverURP: uniqueIdentifier=102583034545,uniqueIdentifier=352307522545,uid=432776896545,ou=People,o=nhs +nhsAsClient: K83015 +nhsAsSvcIA: urn:nhs:names:services:pdsquery:QUQI_IN010000UK14 +nhsDateApproved: 20091016133823 +nhsDateRequested: 20091016133757 +nhsIDCode: K83015 +nhsMHSPartyKey: K83015-806782 +nhsProductKey: 6216 +nhsProductName: TPP SystmOne +nhsRequestorURP: uniqueIdentifier=203171972540,uniqueIdentifier=352307522545,uid=432776896545,ou=People,o=nhs +nhsTempUid: 10312 +uniqueIdentifier: 000173655515 diff --git a/src/etl/sds/tests/changelog/changelog_components/add/message_handling_system.AnotherWithSameUniqueIdentifier.ldif b/src/etl/sds/tests/changelog/changelog_components/add/message_handling_system.AnotherWithSameUniqueIdentifier.ldif new file mode 100644 index 000000000..d5d403ee9 --- /dev/null +++ b/src/etl/sds/tests/changelog/changelog_components/add/message_handling_system.AnotherWithSameUniqueIdentifier.ldif @@ -0,0 +1,24 @@ +dn: uniqueIdentifier=00000a84594b2ef34279,ou=Services,o=nhs +changeType: add +objectClass: nhsMhs +objectClass: top +nhsApproverURP: uniqueidentifier=555050304105,uniqueidentifier=555008548101,uid=555008545108,ou=people, o=nhs +nhsContractPropertyTemplateKey: 14 +nhsDateApproved: 20231030092939 +nhsDateDNSApproved: 20231030092939 +nhsDateRequested: 20231030092906 +nhsDNSApprover: uniqueidentifier=555050304105,uniqueidentifier=555008548101,uid=555008545108,ou=people, o=nhs +nhsEPInteractionType: FHIR +nhsIDCode: C3O9X +nhsMhsCPAId: 00000a84594b2ef34279 +nhsMHSEndPoint: https://test.C3O9X.nhs.uk/ +nhsMhsFQDN: test.C3O9X.nhs.uk +nhsMHsIN: READ_PRACTITIONER_ROLE_R4_V001 +nhsMHSIsAuthenticated: none +nhsMHSPartyKey: C3O9X-123456 +nhsMHsSN: urn:nhs:names:services:ers +nhsMhsSvcIA: urn:nhs:names:services:ers:ABCDEFG +nhsProductKey: 11929 +nhsProductVersion: Mar2023 +nhsRequestorURP: uniqueidentifier=555050304105,uniqueidentifier=555008548101,uid=555008545108,ou=people, o=nhs +uniqueIdentifier: 00000a84594b2ef34279 diff --git a/src/etl/sds/tests/changelog/changelog_components/add/message_handling_system.ldif b/src/etl/sds/tests/changelog/changelog_components/add/message_handling_system.ldif new file mode 100644 index 000000000..694db4354 --- /dev/null +++ b/src/etl/sds/tests/changelog/changelog_components/add/message_handling_system.ldif @@ -0,0 +1,24 @@ +dn: uniqueIdentifier=00000a84594b2ef34279,ou=Services,o=nhs +changeType: add +objectClass: nhsMhs +objectClass: top +nhsApproverURP: uniqueidentifier=555050304105,uniqueidentifier=555008548101,uid=555008545108,ou=people, o=nhs +nhsContractPropertyTemplateKey: 14 +nhsDateApproved: 20231030092939 +nhsDateDNSApproved: 20231030092939 +nhsDateRequested: 20231030092906 +nhsDNSApprover: uniqueidentifier=555050304105,uniqueidentifier=555008548101,uid=555008545108,ou=people, o=nhs +nhsEPInteractionType: FHIR +nhsIDCode: C3O9X +nhsMhsCPAId: 00000a84594b2ef34279 +nhsMHSEndPoint: https://test.C3O9X.nhs.uk/ +nhsMhsFQDN: test.C3O9X.nhs.uk +nhsMHsIN: READ_PRACTITIONER_ROLE_R4_V001 +nhsMHSIsAuthenticated: none +nhsMHSPartyKey: C3O9X-823610 +nhsMHsSN: urn:nhs:names:services:ers +nhsMhsSvcIA: urn:nhs:names:services:ers:READ_PRACTITIONER_ROLE_R4_V001 +nhsProductKey: 11929 +nhsProductVersion: Mar2023 +nhsRequestorURP: uniqueidentifier=555050304105,uniqueidentifier=555008548101,uid=555008545108,ou=people, o=nhs +uniqueIdentifier: 00000a84594b2ef34279 diff --git a/src/etl/sds/tests/changelog/changelog_components/delete/accredited_system.ldif b/src/etl/sds/tests/changelog/changelog_components/delete/accredited_system.ldif new file mode 100644 index 000000000..fd9ac7175 --- /dev/null +++ b/src/etl/sds/tests/changelog/changelog_components/delete/accredited_system.ldif @@ -0,0 +1,5 @@ +dn: uniqueIdentifier=000173655515,ou=Services,o=nhs +objectClass: delete +objectClass: top +changeType: delete +uniqueIdentifier: 000173655515 diff --git a/src/etl/sds/tests/changelog/changelog_components/delete/message_handling_system.ldif b/src/etl/sds/tests/changelog/changelog_components/delete/message_handling_system.ldif new file mode 100644 index 000000000..dcae648e2 --- /dev/null +++ b/src/etl/sds/tests/changelog/changelog_components/delete/message_handling_system.ldif @@ -0,0 +1,5 @@ +dn: uniqueIdentifier=00000a84594b2ef34279,ou=Services,o=nhs +objectClass: delete +objectClass: top +changeType: delete +uniqueIdentifier: 00000a84594b2ef34279 diff --git a/src/etl/sds/tests/changelog/changelog_components/delete/unknown_entity.ldif b/src/etl/sds/tests/changelog/changelog_components/delete/unknown_entity.ldif new file mode 100644 index 000000000..aa14cffbe --- /dev/null +++ b/src/etl/sds/tests/changelog/changelog_components/delete/unknown_entity.ldif @@ -0,0 +1,5 @@ +dn: uniqueIdentifier=1234,ou=Services,o=nhs +objectClass: delete +objectClass: top +changeType: delete +uniqueIdentifier: 1234 diff --git a/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_approver_urp.ldif b/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_approver_urp.ldif new file mode 100644 index 000000000..466b457f0 --- /dev/null +++ b/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_approver_urp.ldif @@ -0,0 +1,2 @@ +add: nhsApproverURP +nhsApproverURP: fooApprover diff --git a/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_as_client.AlreadyExists.ldif b/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_as_client.AlreadyExists.ldif new file mode 100644 index 000000000..995f19887 --- /dev/null +++ b/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_as_client.AlreadyExists.ldif @@ -0,0 +1,2 @@ +add: nhsAsClient +nhsAsClient: K83015 diff --git a/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_as_client.Another.ldif b/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_as_client.Another.ldif new file mode 100644 index 000000000..250c1536f --- /dev/null +++ b/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_as_client.Another.ldif @@ -0,0 +1,2 @@ +add: nhsAsClient +nhsAsClient: CCC diff --git a/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_as_client.ldif b/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_as_client.ldif new file mode 100644 index 000000000..c35b014d1 --- /dev/null +++ b/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_as_client.ldif @@ -0,0 +1,2 @@ +add: nhsAsClient +nhsAsClient: AAA diff --git a/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_as_svc_ia.ldif b/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_as_svc_ia.ldif new file mode 100644 index 000000000..a492cb974 --- /dev/null +++ b/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_as_svc_ia.ldif @@ -0,0 +1,2 @@ +add: nhsAsSvcIA +nhsAsSvcIA: urn:nhs:names:services:pdsquery:123456 diff --git a/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_product_name.ldif b/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_product_name.ldif new file mode 100644 index 000000000..e2b07c195 --- /dev/null +++ b/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/add/nhs_product_name.ldif @@ -0,0 +1,2 @@ +add: nhsProductName +nhsProductName: fooProduct diff --git a/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/base.ldif b/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/base.ldif new file mode 100644 index 000000000..0472699e1 --- /dev/null +++ b/src/etl/sds/tests/changelog/changelog_components/modify/accredited_system/base.ldif @@ -0,0 +1,6 @@ +dn: uniqueIdentifier=000173655515,ou=Services,o=nhs +changeType: modify +<