From c36ddf0b2d2553877c12ee10437b24f7ee9fb332 Mon Sep 17 00:00:00 2001
From: Matt Garber
Date: Wed, 12 Apr 2023 13:26:47 -0400
Subject: [PATCH 1/6] Study period detection

---
 pyproject.toml | 1 -
 src/handlers/shared/awswrangler_functions.py | 31 +++++++
 src/handlers/shared/enums.py | 5 +-
 src/handlers/shared/functions.py | 62 ++++++++++----
 src/handlers/site_upload/powerset_merge.py | 17 +---
 src/handlers/site_upload/process_upload.py | 27 ++++---
 src/handlers/site_upload/study_period.py | 74 +++++++++++++++++
 template.yaml | 47 +++++++++--
 tests/conftest.py | 7 +-
 tests/site_upload/test_powerset_merge.py | 1 +
 tests/site_upload/test_process_upload.py | 13 ++-
 tests/site_upload/test_study_period.py | 81 +++++++++++++++++++
 tests/test_data/meta_date.parquet | Bin 0 -> 1984 bytes
 tests/utils.py | 41 +++++++---
 14 files changed, 343 insertions(+), 64 deletions(-)
 create mode 100644 src/handlers/shared/awswrangler_functions.py
 create mode 100644 src/handlers/site_upload/study_period.py
 create mode 100644 tests/site_upload/test_study_period.py
 create mode 100644 tests/test_data/meta_date.parquet

diff --git a/pyproject.toml b/pyproject.toml
index ad4ee08..4d8a11f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -47,5 +47,4 @@ dev = [
     "pre-commit",
     "pylint",
     "pycodestyle"
-
 ]
diff --git a/src/handlers/shared/awswrangler_functions.py b/src/handlers/shared/awswrangler_functions.py
new file mode 100644
index 0000000..c3cb894
--- /dev/null
+++ b/src/handlers/shared/awswrangler_functions.py
@@ -0,0 +1,31 @@
+""" functions specifically requiring AWSWrangler, which requires a lambda layer"""
+import awswrangler
+
+
+def get_s3_data_package_list(
+    bucket_root: str,
+    s3_bucket_name: str,
+    study: str,
+    data_package: str,
+    extension: str = "parquet",
+):
+    """Retrieves a list of data packages for a given S3 path post-upload processing"""
+    return awswrangler.s3.list_objects(
+        path=f"s3://{s3_bucket_name}/{bucket_root}/{study}/{data_package}",
+        suffix=extension,
+    )
+
+
+def get_s3_study_meta_list(
+    bucket_root: str,
+    s3_bucket_name: str,
+    study: str,
+    data_package: str,
+    site: str,
+    extension: str = "parquet",
+):
+    """Retrieves a list of study metadata files for a given S3 path post-upload processing"""
+    return awswrangler.s3.list_objects(
+        path=f"s3://{s3_bucket_name}/{bucket_root}/{study}/{data_package}/{site}",
+        suffix=extension,
+    )
diff --git a/src/handlers/shared/enums.py b/src/handlers/shared/enums.py
index 42c008c..b514d55 100644
--- a/src/handlers/shared/enums.py
+++ b/src/handlers/shared/enums.py
@@ -3,6 +3,8 @@
 class BucketPath(Enum):
+    """stores root level buckets for managing data processing state"""
+
     ADMIN = "admin"
     AGGREGATE = "aggregates"
     ARCHIVE = "archive"
@@ -10,5 +12,6 @@ class BucketPath(Enum):
     ERROR = "error"
     LAST_VALID = "last_valid"
     LATEST = "latest"
-    META = "site_metadata"
+    META = "metadata"
+    STUDY_META = "study_metadata"
     UPLOAD = "site_upload"
diff --git a/src/handlers/shared/functions.py b/src/handlers/shared/functions.py
index 3d23626..59af3f6 100644
--- a/src/handlers/shared/functions.py
+++ b/src/handlers/shared/functions.py
@@ -10,17 +10,20 @@
 from src.handlers.shared.enums import BucketPath
 
-META_PATH = f"{BucketPath.META.value}/transactions.json"
-METADATA_TEMPLATE = {
+TRANSACTION_METADATA_TEMPLATE = {
     "version": "1.0",
     "last_upload": None,
     "last_data_update": None,
     "last_aggregation": None,
     "last_error": None,
-    "earliest_data": None,
-    "latest_data": None,
     "deleted": None,
 }
+STUDY_PERIOD_METADATA_TEMPLATE = {
+    "version": "1.0",
+    "earliest_date": None,
+    "latest_date": None,
+    "last_data_update": None,
+}
 
 
 def http_response(status: int, body: str) -> Dict:
@@ -36,12 +39,24 @@ def http_response(status: int, body: str) -> Dict:
 
 
 # metadata processing
-def read_metadata(s3_client, s3_bucket_name: str) -> Dict:
+def check_meta_type(meta_type: str) -> None:
+    """helper for ensuring specified metadata types"""
+    types = ["transactions", "study_periods"]
+    if meta_type not in types:
+        raise ValueError("invalid metadata type specified")
+
+
+def read_metadata(
+    s3_client, s3_bucket_name: str, meta_type: str = "transactions"
+) -> Dict:
     """Reads transaction information from an s3 bucket as a dictionary"""
-    res = s3_client.list_objects_v2(Bucket=s3_bucket_name, Prefix=META_PATH)
+    check_meta_type(meta_type)
+    prefix = f"{BucketPath.META.value}/{meta_type}.json"
+    res = s3_client.list_objects_v2(Bucket=s3_bucket_name, Prefix=prefix)
     if "Contents" in res:
-        res = s3_client.get_object(Bucket=s3_bucket_name, Key=META_PATH)
-        return json.loads(res["Body"].read())
+        res = s3_client.get_object(Bucket=s3_bucket_name, Key=prefix)
+        doc = res["Body"].read()
+        return json.loads(doc)
     else:
         return {}
@@ -53,20 +68,37 @@ def update_metadata(
     data_package: str,
     target: str,
     dt: Optional[datetime] = None,
+    meta_type: str = "transactions",
 ):
     """Safely updates items in metadata dictionary"""
-    site_metadata = metadata.setdefault(site, {})
-    study_metadata = site_metadata.setdefault(study, {})
-    data_package_metadata = study_metadata.setdefault(data_package, METADATA_TEMPLATE)
-    dt = dt or datetime.now(timezone.utc)
-    data_package_metadata[target] = dt.isoformat()
+    check_meta_type(meta_type)
+    if meta_type == "transactions":
+        site_metadata = metadata.setdefault(site, {})
+        study_metadata = site_metadata.setdefault(study, {})
+        data_package_metadata = study_metadata.setdefault(
+            data_package, TRANSACTION_METADATA_TEMPLATE
+        )
+        dt = dt or datetime.now(timezone.utc)
+        data_package_metadata[target] = dt.isoformat()
+    elif meta_type == "study_periods":
+        site_metadata = site_metadata.setdefault(site, {})
+        study_period_metadata = site_metadata.setdefault(
+            study, STUDY_PERIOD_METADATA_TEMPLATE
+        )
+        dt = dt or datetime.now(timezone.utc)
+        study_period_metadata[target] = dt.isoformat()
     return metadata
 
 
-def write_metadata(s3_client, s3_bucket_name: str, metadata: Dict) -> None:
+def write_metadata(
+    s3_client, s3_bucket_name: str, metadata: Dict, meta_type: str = "transactions"
+) -> None:
     """Writes transaction info from a dictionary to an s3 bucket metadata location"""
+    check_meta_type(meta_type)
     s3_client.put_object(
-        Bucket=s3_bucket_name, Key=META_PATH, Body=json.dumps(metadata)
+        Bucket=s3_bucket_name,
+        Key=f"{BucketPath.META.value}/{meta_type}.json",
+        Body=json.dumps(metadata),
     )
diff --git a/src/handlers/site_upload/powerset_merge.py b/src/handlers/site_upload/powerset_merge.py
index ca59b1e..cc2713e 100644
--- a/src/handlers/site_upload/powerset_merge.py
+++ b/src/handlers/site_upload/powerset_merge.py
@@ -13,6 +13,7 @@
 from src.handlers.shared.decorators import generic_error_handler
 from src.handlers.shared.enums import BucketPath
+from src.handlers.shared.awswrangler_functions import get_s3_data_package_list
 from src.handlers.shared.functions import (
     get_s3_site_filename_suffix,
     http_response,
@@ -23,22 +24,6 @@
 )
 
 
-def get_s3_data_package_list(
-    bucket_root: str,
-    s3_bucket_name: str,
-    study: str,
-    data_package: str,
-    extension: str = "parquet",
-):
-    """Retrieves a list of all data packages for a given S3 path"""
-    # TODO: this may need to be moved to a shared location at some point - it would
-    # need to be a new one for just AWSWrangler-enabled lambdas.
-    return awswrangler.s3.list_objects(
-        path=f"s3://{s3_bucket_name}/{bucket_root}/{study}/{data_package}",
-        suffix=extension,
-    )
-
-
 def concat_sets(df: pandas.DataFrame, file_path: str) -> pandas.DataFrame:
     """concats a count dataset in a specified S3 location with in memory dataframe"""
     site_df = awswrangler.s3.read_parquet(file_path)
diff --git a/src/handlers/site_upload/process_upload.py b/src/handlers/site_upload/process_upload.py
index a03e245..36eb529 100644
--- a/src/handlers/site_upload/process_upload.py
+++ b/src/handlers/site_upload/process_upload.py
@@ -18,10 +18,8 @@ class UnexpectedFileTypeError(Exception):
     pass
 
 
-def process_upload(
-    s3_client, sns_client, s3_bucket_name: str, s3_key: str, topic_sns_arn: str
-) -> None:
-    """Moves file from upload path to powerset generation path"""
+def process_upload(s3_client, sns_client, s3_bucket_name: str, s3_key: str) -> None:
+    """Moves file from upload path to appropriate subfolder and emits SNS event"""
     last_uploaded_date = s3_client.head_object(Bucket=s3_bucket_name, Key=s3_key)[
         "LastModified"
     ]
@@ -31,19 +29,25 @@
     data_package = path_params[2]
     site = path_params[3]
     if s3_key.endswith(".parquet"):
-        new_key = f"{BucketPath.LATEST.value}/{s3_key.split('/', 1)[-1]}"
+        print(s3_key)
+        if "_meta_" in s3_key:
+            new_key = f"{BucketPath.STUDY_META.value}/{s3_key.split('/', 1)[-1]}"
+            topic_sns_arn = os.environ.get("TOPIC_PROCESS_STUDY_META_ARN")
+            sns_subject = "Process study metadata upload event"
+        else:
+            new_key = f"{BucketPath.LATEST.value}/{s3_key.split('/', 1)[-1]}"
+            topic_sns_arn = os.environ.get("TOPIC_PROCESS_COUNTS_ARN")
+            sns_subject = "Process counts upload event"
         move_s3_file(s3_client, s3_bucket_name, s3_key, new_key)
         metadata = update_metadata(
             metadata,
             site,
             study,
             data_package,
-            "last_uploaded_date",
+            "last_upload",
             last_uploaded_date,
         )
-        sns_client.publish(
-            TopicArn=topic_sns_arn, Message=new_key, Subject="Process Upload Event"
-        )
+        sns_client.publish(TopicArn=topic_sns_arn, Message=new_key, Subject=sns_subject)
         write_metadata(s3_client, s3_bucket_name, metadata)
     else:
         new_key = f"{BucketPath.ERROR.value}/{s3_key.split('/', 1)[-1]}"
@@ -53,7 +57,7 @@
             site,
             study,
             data_package,
-            "last_uploaded_date",
+            "last_upload",
             last_uploaded_date,
         )
         metadata = update_metadata(
@@ -68,10 +72,9 @@ def process_upload_handler(event, context):
     """manages event from S3, triggers file processing and merge"""
     del context
     s3_bucket = os.environ.get("BUCKET_NAME")
-    topic_sns_arn = os.environ.get("TOPIC_PROCESS_UPLOAD_ARN")
     s3_client = boto3.client("s3")
     sns_client = boto3.client("sns")
     s3_key = event["Records"][0]["s3"]["object"]["key"]
-    process_upload(s3_client, sns_client, s3_bucket, s3_key, topic_sns_arn)
+    process_upload(s3_client, sns_client, s3_bucket, s3_key)
     res = http_response(200, "Upload processing successful")
     return res
diff --git a/src/handlers/site_upload/study_period.py b/src/handlers/site_upload/study_period.py
new file mode 100644
index 0000000..2026d47
--- /dev/null
+++ b/src/handlers/site_upload/study_period.py
@@ -0,0 +1,74 @@
+""" Lambda for performing joins of site count data """
+
+import os
+
+from datetime import datetime, timezone
+
+import awswrangler
+import boto3
+
+from src.handlers.shared.decorators import generic_error_handler
+from src.handlers.shared.enums import BucketPath
+from src.handlers.shared.awswrangler_functions import 
get_s3_study_meta_list +from src.handlers.shared.functions import ( + http_response, + read_metadata, + update_metadata, + write_metadata, +) + + +def update_study_period(s3_client, s3_bucket, site, study, data_package): + """gets earliest/latest date from study metadata files""" + path = get_s3_study_meta_list( + BucketPath.STUDY_META.value, s3_bucket, study, data_package, site + ) + if len(path) != 1: + raise KeyError("Unique date path not found") + df = awswrangler.s3.read_parquet(path[0]) + study_meta = read_metadata(s3_client, s3_bucket, meta_type="study_periods") + study_meta = update_metadata( + study_meta, + site, + study, + data_package, + "earliest_date", + df["min_date"][0], + meta_type="study_periods", + ) + study_meta = update_metadata( + study_meta, + site, + study, + data_package, + "latest_date", + df["max_date"][0], + meta_type="study_periods", + ) + study_meta = update_metadata( + study_meta, + site, + study, + data_package, + "last_data_update", + datetime.now(timezone.utc), + meta_type="study_periods", + ) + write_metadata(s3_client, s3_bucket, study_meta, meta_type="study_periods") + + +@generic_error_handler(msg="Error updating study period") +def study_period_handler(event, context): + """manages event from SNS, triggers file processing and merge""" + del context + s3_bucket = os.environ.get("BUCKET_NAME") + s3_client = boto3.client("s3") + s3_key = event["Records"][0]["Sns"]["Message"] + s3_key_array = s3_key.split("/") + site = s3_key_array[3] + study = s3_key_array[1] + data_package = s3_key_array[2] + + update_study_period(s3_client, s3_bucket, site, study, data_package) + res = http_response(200, "Study period update successful") + return res diff --git a/template.yaml b/template.yaml index 9592ed0..f46938d 100644 --- a/template.yaml +++ b/template.yaml @@ -83,16 +83,19 @@ Resources: Runtime: python3.9 MemorySize: 128 Timeout: 800 - Description: Merges and aggregates powerset count data + Description: Handles initial relocation of upload data Environment: Variables: BUCKET_NAME: !Sub '${BucketNameParameter}-${DeployStage}' - TOPIC_PROCESS_UPLOAD_ARN: !Ref SNSTopicProcessUpload + TOPIC_PROCESS_COUNTS_ARN: !Ref SNSTopicProcessCounts + TOPIC_PROCESS_STUDY_META_ARN: !Ref SNSTopicProcessStudyMeta Policies: - S3CrudPolicy: BucketName: !Sub '${BucketNameParameter}-${DeployStage}' - SNSPublishMessagePolicy: - TopicName: !GetAtt SNSTopicProcessUpload.TopicName + TopicName: !GetAtt SNSTopicProcessCounts.TopicName + - SNSPublishMessagePolicy: + TopicName: !GetAtt SNSTopicProcessStudyMeta.TopicName PowersetMergeFunction: Type: AWS::Serverless::Function @@ -111,7 +114,29 @@ Resources: ProcessUploadSNSEvent: Type: SNS Properties: - Topic: !Ref SNSTopicProcessUpload + Topic: !Ref SNSTopicProcessCounts + Policies: + - S3CrudPolicy: + BucketName: !Sub '${BucketNameParameter}-${DeployStage}' + + StudyPeriodFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub 'CumulusAggStudyPeriod-${DeployStage}' + Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python39:1] + Handler: src/handlers/site_upload/study_period.study_period_handler + Runtime: python3.9 + MemorySize: 512 + Timeout: 800 + Description: Handles metadata outside of upload/processing for studies + Environment: + Variables: + BUCKET_NAME: !Sub '${BucketNameParameter}-${DeployStage}' + Events: + ProcessUploadSNSEvent: + Type: SNS + Properties: + Topic: !Ref SNSTopicProcessStudyMeta Policies: - S3CrudPolicy: BucketName: !Sub '${BucketNameParameter}-${DeployStage}' @@ -265,13 +290,21 
@@ Resources: SourceAccount: !Ref AWS::AccountId ### SNS topics - SNSTopicProcessUpload: + SNSTopicProcessCounts: + Type: AWS::SNS::Topic + Properties: + TopicName: !Sub 'CumulusProcessCounts-${DeployStage}' + Tags: + - Key: Name + Value: !Sub 'CumulusProcessCounts-${DeployStage}' + + SNSTopicProcessStudyMeta: Type: AWS::SNS::Topic Properties: - TopicName: !Sub 'CumulusProcessUpload-${DeployStage}' + TopicName: !Sub 'CumulusProcessStudyMeta-${DeployStage}' Tags: - Key: Name - Value: !Sub 'CumulusProcessUpload-${DeployStage}' + Value: !Sub 'CumulusProcessStudyMeta-${DeployStage}' ### S3 Buckets diff --git a/tests/conftest.py b/tests/conftest.py index db799bd..41c25dd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,7 +12,7 @@ from scripts.credential_management import create_auth, create_meta from src.handlers.shared.enums import BucketPath from src.handlers.shared.functions import write_metadata -from tests.utils import get_mock_metadata, ITEM_COUNT, MOCK_ENV +from tests.utils import get_mock_metadata, get_mock_study_metadata, ITEM_COUNT, MOCK_ENV def _init_mock_data(s3_client, bucket_name, site, study, data_package): @@ -60,6 +60,8 @@ def mock_bucket(): _init_mock_data(s3_client, bucket, *param_list) metadata = get_mock_metadata() write_metadata(s3_client, bucket, metadata) + study_metadata = get_mock_study_metadata() + write_metadata(s3_client, bucket, study_metadata, meta_type="study_periods") yield s3.stop() @@ -69,7 +71,8 @@ def mock_notification(): sns = mock_sns() sns.start() sns_client = boto3.client("sns", region_name="us-east-1") - sns_client.create_topic(Name="test-upload") + sns_client.create_topic(Name="test-counts") + sns_client.create_topic(Name="test-meta") yield sns.stop() diff --git a/tests/site_upload/test_powerset_merge.py b/tests/site_upload/test_powerset_merge.py index 88390c6..9a47160 100644 --- a/tests/site_upload/test_powerset_merge.py +++ b/tests/site_upload/test_powerset_merge.py @@ -113,6 +113,7 @@ def test_process_upload( or item["Key"].startswith(BucketPath.ARCHIVE.value) is True or item["Key"].startswith(BucketPath.ERROR.value) is True or item["Key"].startswith(BucketPath.ADMIN.value) is True + or item["Key"].endswith("study_periods.json") is True ) if archives: keys = [] diff --git a/tests/site_upload/test_process_upload.py b/tests/site_upload/test_process_upload.py index 79079d6..52fcb99 100644 --- a/tests/site_upload/test_process_upload.py +++ b/tests/site_upload/test_process_upload.py @@ -59,6 +59,14 @@ 500, ITEM_COUNT, ), + ( # Adding study metadata data package + "general_hospital", + "./tests/test_data/cube_simple_example.parquet", + "/covid/encounter/general_hospital/document_meta_date.parquet", + "/covid/encounter/general_hospital/document_meta_date.parquet", + 200, + ITEM_COUNT + 1, + ), ], ) def test_process_upload( @@ -100,13 +108,16 @@ def test_process_upload( metadata = read_metadata(s3_client, TEST_BUCKET) if upload_file is not None: assert ( - metadata[site]["covid"]["encounter"]["last_uploaded_date"] + metadata[site]["covid"]["encounter"]["last_upload"] == "2020-01-01T00:00:00+00:00" ) + elif item["Key"].startswith(BucketPath.STUDY_META.value): + assert "_meta_" in item["Key"] else: assert ( item["Key"].startswith(BucketPath.LATEST.value) is True or item["Key"].startswith(BucketPath.LAST_VALID.value) is True or item["Key"].startswith(BucketPath.ERROR.value) is True or item["Key"].startswith(BucketPath.ADMIN.value) is True + or item["Key"].endswith("study_periods.json") is True ) diff --git a/tests/site_upload/test_study_period.py 
b/tests/site_upload/test_study_period.py new file mode 100644 index 0000000..19a1b7a --- /dev/null +++ b/tests/site_upload/test_study_period.py @@ -0,0 +1,81 @@ +import boto3 +import os + +import pytest +from datetime import datetime +from freezegun import freeze_time +from unittest import mock + +from src.handlers.shared.enums import BucketPath +from src.handlers.shared.functions import read_metadata, write_metadata +from src.handlers.site_upload.study_period import study_period_handler + +from tests.utils import get_mock_study_metadata, TEST_BUCKET + + +@freeze_time("2020-01-01") +@mock.patch("src.handlers.site_upload.study_period.datetime") +@pytest.mark.parametrize( + "site,upload_file,upload_path,event_key,status,study_key", + [ + ( # Adding a new study to an existing site + "general_hospital", + "./tests/test_data/meta_date.parquet", + "/test/test_meta_date/general_hospital/test_meta_date.parquet", + "/test/test_meta_date/general_hospital/test_meta_date.parquet", + 200, + "test", + ), + ( # Adding a new study to a new site + "chicago_hope", + "./tests/test_data/meta_date.parquet", + "/test/test_meta_date/chicago_hope/test_meta_date.parquet", + "/test/test_meta_date/chicago_hope/test_meta_date.parquet", + 200, + "test", + ), + ( # updating an existing study + "general_hospital", + "./tests/test_data/meta_date.parquet", + "/covid/test_meta_date/general_hospital/test_meta_date.parquet", + "/covid/test_meta_date/general_hospital/test_meta_date.parquet", + 200, + "covid", + ), + ( # invalid file + "general_hospital", + "./tests/test_data/meta_date.parquet", + None, + "/covid/test_meta_date/general_hospital/wrong.parquet", + 500, + None, + ), + ], +) +def test_process_upload( + mock_dt, + site, + upload_file, + upload_path, + event_key, + status, + study_key, + mock_bucket, +): + mock_dt.now = mock.Mock(return_value=datetime(2020, 1, 1)) + s3_client = boto3.client("s3", region_name="us-east-1") + if upload_file is not None: + s3_client.upload_file( + upload_file, + TEST_BUCKET, + f"{BucketPath.STUDY_META.value}{upload_path}", + ) + event = { + "Records": [{"Sns": {"Message": f"{BucketPath.STUDY_META.value}{event_key}"}}] + } + res = study_period_handler(event, {}) + assert res["statusCode"] == status + metadata = read_metadata(s3_client, TEST_BUCKET, meta_type="study_periods") + if study_key is not None: + assert study_key in metadata[site] + assert metadata[site][study_key]["last_data_update"] == "2020-01-01T00:00:00" diff --git a/tests/test_data/meta_date.parquet b/tests/test_data/meta_date.parquet new file mode 100644 index 0000000000000000000000000000000000000000..f5eb5cc2e19c1645673f078b6c8fa74fcd7cafd8 GIT binary patch literal 1984 zcmcgt&u`*J6dp*5sMJFg#Rw!02x(EJveYJcyR3Q{lh`o}*)1vf$7)4wFxbR4F4zGQ zt+bck_Rr~|haUFWL;r*R6MO8jZ^lkT6ZTN4+Yw;T`)0oPeQ(}~m$=6<`^*7z)M3&L zIz%X2L_hreRkiUMVi0r4>t5yE9`W#i=p*#IJQTj=Q9iRA*_LN8`(Wx(`mS7$jymrS zi&fA?>{29=v5n-?g<_%l=?h-@9}1HD03A}v=HBidAyla5GlAicjb;9)y_>S*kFE7^ zG#xYv83Jc}DP|333I z8TDxkWBeU_$#{-&0}E3uVm!sIDUAujKkyX^p8kz@RsfSZ49Y@r8RHcO0G@=T)h8HP zh~+*!gl}ZejdW?yQ0wPc=Gh=L{jT%E5lr=BHI=<2<~9S<>$#RRxRz)0slu^Kxz(PT z^)+WId>lJoI;JOv`i1-iu+UKWi7tsO%*k5MPzFm~Y(|>Gxr9UZxWIIqmzvshWe0Nk zf(<$B5!YlbRY=zK?6EXp4YG%<@p#Tu1ztVw@gu1ZIw_MWdAAg6OEPiasr;LO?z8|ZHZeNJU!5r^`%6- zL!8?mgrhTeGCp%xNj<2~vE+vO_Zq9z`D-<hS#MB| z?nQ&^%(~8m_KOBhA*?io2d!PwAY@K{;(j?lfhK(lv(x9a$%NbfBP+-t{UwB&YRBhJ zIZs-owE;LGB+$&jr$H7>5|C!4tH{QdO(K*e&PS(I1U3~nO3nbH8itD^k!L)*80AZW jvG_5v!pEZ^cuX&5DY Date: Wed, 12 Apr 2023 13:46:20 -0400 Subject: [PATCH 2/6] overzealous naming cleanup --- 
src/handlers/shared/functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/shared/functions.py b/src/handlers/shared/functions.py index 59af3f6..947703c 100644 --- a/src/handlers/shared/functions.py +++ b/src/handlers/shared/functions.py @@ -81,7 +81,7 @@ def update_metadata( dt = dt or datetime.now(timezone.utc) data_package_metadata[target] = dt.isoformat() elif meta_type == "study_periods": - site_metadata = site_metadata.setdefault(site, {}) + site_metadata = metadata.setdefault(site, {}) study_period_metadata = site_metadata.setdefault( study, STUDY_PERIOD_METADATA_TEMPLATE ) From 6e460791ff8af4587c31bf2881ce765e5f3fc97c Mon Sep 17 00:00:00 2001 From: Matt Garber Date: Thu, 13 Apr 2023 10:49:03 -0400 Subject: [PATCH 3/6] Low hanging PR comment fruit --- src/handlers/shared/functions.py | 6 ++--- src/handlers/site_upload/process_upload.py | 1 - src/handlers/site_upload/study_period.py | 2 +- tests/site_upload/test_powerset_merge.py | 29 ++++++++++------------ tests/site_upload/test_process_upload.py | 24 ++++++++---------- 5 files changed, 27 insertions(+), 35 deletions(-) diff --git a/src/handlers/shared/functions.py b/src/handlers/shared/functions.py index 947703c..2e002ad 100644 --- a/src/handlers/shared/functions.py +++ b/src/handlers/shared/functions.py @@ -51,10 +51,10 @@ def read_metadata( ) -> Dict: """Reads transaction information from an s3 bucket as a dictionary""" check_meta_type(meta_type) - prefix = f"{BucketPath.META.value}/{meta_type}.json" - res = s3_client.list_objects_v2(Bucket=s3_bucket_name, Prefix=prefix) + s3_path = f"{BucketPath.META.value}/{meta_type}.json" + res = s3_client.list_objects_v2(Bucket=s3_bucket_name, Prefix=s3_path) if "Contents" in res: - res = s3_client.get_object(Bucket=s3_bucket_name, Key=prefix) + res = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_path) doc = res["Body"].read() return json.loads(doc) else: diff --git a/src/handlers/site_upload/process_upload.py b/src/handlers/site_upload/process_upload.py index 36eb529..0d3e824 100644 --- a/src/handlers/site_upload/process_upload.py +++ b/src/handlers/site_upload/process_upload.py @@ -29,7 +29,6 @@ def process_upload(s3_client, sns_client, s3_bucket_name: str, s3_key: str) -> N data_package = path_params[2] site = path_params[3] if s3_key.endswith(".parquet"): - print(s3_key) if "_meta_" in s3_key: new_key = f"{BucketPath.STUDY_META.value}/{s3_key.split('/', 1)[-1]}" topic_sns_arn = os.environ.get("TOPIC_PROCESS_STUDY_META_ARN") diff --git a/src/handlers/site_upload/study_period.py b/src/handlers/site_upload/study_period.py index 2026d47..faa7273 100644 --- a/src/handlers/site_upload/study_period.py +++ b/src/handlers/site_upload/study_period.py @@ -1,4 +1,4 @@ -""" Lambda for performing joins of site count data """ +""" Lambda for updating date ranges associated with studies """ import os diff --git a/tests/site_upload/test_powerset_merge.py b/tests/site_upload/test_powerset_merge.py index 9a47160..0c4e7e8 100644 --- a/tests/site_upload/test_powerset_merge.py +++ b/tests/site_upload/test_powerset_merge.py @@ -2,9 +2,8 @@ import os import pytest -from datetime import datetime +from datetime import datetime, timezone from freezegun import freeze_time -from unittest import mock from src.handlers.shared.enums import BucketPath from src.handlers.shared.functions import read_metadata, write_metadata @@ -14,7 +13,6 @@ @freeze_time("2020-01-01") -@mock.patch("src.handlers.site_upload.powerset_merge.datetime") @pytest.mark.parametrize( 
"site,upload_file,upload_path,event_key,archives,status,expected_contents", [ @@ -56,8 +54,7 @@ ), ], ) -def test_process_upload( - mock_dt, +def test_powerset_merge( site, upload_file, upload_path, @@ -67,7 +64,6 @@ def test_process_upload( expected_contents, mock_bucket, ): - mock_dt.now = mock.Mock(return_value=datetime(2020, 1, 1)) s3_client = boto3.client("s3", region_name="us-east-1") if upload_file is not None: s3_client.upload_file( @@ -89,16 +85,16 @@ def test_process_upload( assert len(s3_res["Contents"]) == expected_contents for item in s3_res["Contents"]: if item["Key"].endswith("aggregate.parquet"): - assert item["Key"].startswith(BucketPath.AGGREGATE.value) is True + assert item["Key"].startswith(BucketPath.AGGREGATE.value) elif item["Key"].endswith("aggregate.csv"): - assert item["Key"].startswith(BucketPath.CSVAGGREGATE.value) is True + assert item["Key"].startswith(BucketPath.CSVAGGREGATE.value) elif item["Key"].endswith("transactions.json"): - assert item["Key"].startswith(BucketPath.META.value) is True + assert item["Key"].startswith(BucketPath.META.value) metadata = read_metadata(s3_client, TEST_BUCKET) if res["statusCode"] == 200: assert ( metadata[site]["covid"]["encounter"]["last_aggregation"] - == "2020-01-01T00:00:00+00:00" + == datetime.now(timezone.utc).isoformat() ) else: assert ( @@ -109,15 +105,16 @@ def test_process_upload( ) else: assert ( - item["Key"].startswith(BucketPath.LAST_VALID.value) is True - or item["Key"].startswith(BucketPath.ARCHIVE.value) is True - or item["Key"].startswith(BucketPath.ERROR.value) is True - or item["Key"].startswith(BucketPath.ADMIN.value) is True - or item["Key"].endswith("study_periods.json") is True + item["Key"].startswith(BucketPath.LAST_VALID.value) + or item["Key"].startswith(BucketPath.ARCHIVE.value) + or item["Key"].startswith(BucketPath.ERROR.value) + or item["Key"].startswith(BucketPath.ADMIN.value) + or item["Key"].endswith("study_periods.json") ) if archives: keys = [] for resource in s3_res["Contents"]: keys.append(resource["Key"]) - archive_path = ".2020-01-01T00:00:00.".join(upload_path.split(".")) + date_str = datetime.now(timezone.utc).isoformat() + archive_path = f".{date_str}.".join(upload_path.split(".")) assert f"{BucketPath.ARCHIVE.value}{archive_path}" in keys diff --git a/tests/site_upload/test_process_upload.py b/tests/site_upload/test_process_upload.py index 52fcb99..c69488c 100644 --- a/tests/site_upload/test_process_upload.py +++ b/tests/site_upload/test_process_upload.py @@ -3,9 +3,8 @@ import pytest -from datetime import datetime +from datetime import datetime, timezone from freezegun import freeze_time -from unittest import mock from src.handlers.shared.enums import BucketPath from src.handlers.shared.functions import read_metadata, write_metadata @@ -15,7 +14,6 @@ @freeze_time("2020-01-01") -@mock.patch("src.handlers.site_upload.powerset_merge.datetime") @pytest.mark.parametrize( "site,upload_file,upload_path,event_key,status,expected_contents", [ @@ -70,7 +68,6 @@ ], ) def test_process_upload( - mock_dt, site, upload_file, upload_path, @@ -80,7 +77,6 @@ def test_process_upload( mock_bucket, mock_notification, ): - mock_dt.now = mock.Mock(return_value=datetime(2020, 1, 1)) s3_client = boto3.client("s3", region_name="us-east-1") if upload_file is not None: s3_client.upload_file( @@ -100,24 +96,24 @@ def test_process_upload( assert len(s3_res["Contents"]) == expected_contents for item in s3_res["Contents"]: if item["Key"].endswith("aggregate.parquet"): - assert 
item["Key"].startswith(BucketPath.AGGREGATE.value) is True + assert item["Key"].startswith(BucketPath.AGGREGATE.value) elif item["Key"].endswith("aggregate.csv"): - assert item["Key"].startswith(BucketPath.CSVAGGREGATE.value) is True + assert item["Key"].startswith(BucketPath.CSVAGGREGATE.value) elif item["Key"].endswith("transactions.json"): - assert item["Key"].startswith(BucketPath.META.value) is True + assert item["Key"].startswith(BucketPath.META.value) metadata = read_metadata(s3_client, TEST_BUCKET) if upload_file is not None: assert ( metadata[site]["covid"]["encounter"]["last_upload"] - == "2020-01-01T00:00:00+00:00" + == datetime.now(timezone.utc).isoformat() ) elif item["Key"].startswith(BucketPath.STUDY_META.value): assert "_meta_" in item["Key"] else: assert ( - item["Key"].startswith(BucketPath.LATEST.value) is True - or item["Key"].startswith(BucketPath.LAST_VALID.value) is True - or item["Key"].startswith(BucketPath.ERROR.value) is True - or item["Key"].startswith(BucketPath.ADMIN.value) is True - or item["Key"].endswith("study_periods.json") is True + item["Key"].startswith(BucketPath.LATEST.value) + or item["Key"].startswith(BucketPath.LAST_VALID.value) + or item["Key"].startswith(BucketPath.ERROR.value) + or item["Key"].startswith(BucketPath.ADMIN.value) + or item["Key"].endswith("study_periods.json") ) From 4d6315345b8f82f40adab0bf19e2d5e12cebf6d4 Mon Sep 17 00:00:00 2001 From: Matt Garber Date: Thu, 13 Apr 2023 10:50:50 -0400 Subject: [PATCH 4/6] missed one mock datetime --- tests/site_upload/test_study_period.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/site_upload/test_study_period.py b/tests/site_upload/test_study_period.py index 19a1b7a..e654b19 100644 --- a/tests/site_upload/test_study_period.py +++ b/tests/site_upload/test_study_period.py @@ -2,9 +2,8 @@ import os import pytest -from datetime import datetime +from datetime import datetime, timezone from freezegun import freeze_time -from unittest import mock from src.handlers.shared.enums import BucketPath from src.handlers.shared.functions import read_metadata, write_metadata @@ -14,7 +13,6 @@ @freeze_time("2020-01-01") -@mock.patch("src.handlers.site_upload.study_period.datetime") @pytest.mark.parametrize( "site,upload_file,upload_path,event_key,status,study_key", [ @@ -53,7 +51,6 @@ ], ) def test_process_upload( - mock_dt, site, upload_file, upload_path, @@ -62,7 +59,6 @@ def test_process_upload( study_key, mock_bucket, ): - mock_dt.now = mock.Mock(return_value=datetime(2020, 1, 1)) s3_client = boto3.client("s3", region_name="us-east-1") if upload_file is not None: s3_client.upload_file( @@ -78,4 +74,7 @@ def test_process_upload( metadata = read_metadata(s3_client, TEST_BUCKET, meta_type="study_periods") if study_key is not None: assert study_key in metadata[site] - assert metadata[site][study_key]["last_data_update"] == "2020-01-01T00:00:00" + assert ( + metadata[site][study_key]["last_data_update"] + == datetime.now(timezone.utc).isoformat() + ) From ffe468ccde3758ef4da39ee7782c73d0c8c8b3ec Mon Sep 17 00:00:00 2001 From: Matt Garber Date: Thu, 13 Apr 2023 11:02:44 -0400 Subject: [PATCH 5/6] JsonDict enum --- src/handlers/shared/enums.py | 7 +++++++ src/handlers/shared/functions.py | 19 +++++++++++-------- src/handlers/site_upload/study_period.py | 16 ++++++++++------ 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/src/handlers/shared/enums.py b/src/handlers/shared/enums.py index b514d55..3985515 100644 --- a/src/handlers/shared/enums.py +++ 
b/src/handlers/shared/enums.py
@@ -15,3 +15,10 @@ class BucketPath(Enum):
     META = "metadata"
     STUDY_META = "study_metadata"
     UPLOAD = "site_upload"
+
+
+class JsonDict(Enum):
+    """stores names of expected kinds of S3 json dictionaries"""
+
+    TRANSACTIONS = "transactions"
+    STUDY_PERIODS = "study_periods"
diff --git a/src/handlers/shared/functions.py b/src/handlers/shared/functions.py
index 2e002ad..a16bf45 100644
--- a/src/handlers/shared/functions.py
+++ b/src/handlers/shared/functions.py
@@ -8,7 +8,7 @@
 import boto3
 
-from src.handlers.shared.enums import BucketPath
+from src.handlers.shared.enums import BucketPath, JsonDict
 
 TRANSACTION_METADATA_TEMPLATE = {
     "version": "1.0",
@@ -36,18 +36,18 @@ def http_response(status: int, body: str) -> Dict:
     }
 
 
-# metadata processing
+# S3 json processing
 
 
 def check_meta_type(meta_type: str) -> None:
     """helper for ensuring specified metadata types"""
-    types = ["transactions", "study_periods"]
+    types = [item.value for item in JsonDict]
     if meta_type not in types:
         raise ValueError("invalid metadata type specified")
 
 
 def read_metadata(
-    s3_client, s3_bucket_name: str, meta_type: str = "transactions"
+    s3_client, s3_bucket_name: str, meta_type: str = JsonDict.TRANSACTIONS.value
 ) -> Dict:
     """Reads transaction information from an s3 bucket as a dictionary"""
     check_meta_type(meta_type)
@@ -68,11 +68,11 @@ def update_metadata(
     data_package: str,
     target: str,
     dt: Optional[datetime] = None,
-    meta_type: str = "transactions",
+    meta_type: str = JsonDict.TRANSACTIONS.value,
 ):
     """Safely updates items in metadata dictionary"""
     check_meta_type(meta_type)
-    if meta_type == "transactions":
+    if meta_type == JsonDict.TRANSACTIONS.value:
         site_metadata = metadata.setdefault(site, {})
         study_metadata = site_metadata.setdefault(study, {})
         data_package_metadata = study_metadata.setdefault(
@@ -80,7 +80,7 @@ def update_metadata(
         )
         dt = dt or datetime.now(timezone.utc)
         data_package_metadata[target] = dt.isoformat()
-    elif meta_type == "study_periods":
+    elif meta_type == JsonDict.STUDY_PERIODS.value:
         site_metadata = metadata.setdefault(site, {})
         study_period_metadata = site_metadata.setdefault(
             study, STUDY_PERIOD_METADATA_TEMPLATE
@@ -91,7 +91,10 @@ def update_metadata(
         )
         dt = dt or datetime.now(timezone.utc)
         study_period_metadata[target] = dt.isoformat()
     return metadata
 
 
 def write_metadata(
-    s3_client, s3_bucket_name: str, metadata: Dict, meta_type: str = "transactions"
+    s3_client,
+    s3_bucket_name: str,
+    metadata: Dict,
+    meta_type: str = JsonDict.TRANSACTIONS.value,
 ) -> None:
     """Writes transaction info from a dictionary to an s3 bucket metadata location"""
     check_meta_type(meta_type)
     s3_client.put_object(
diff --git a/src/handlers/site_upload/study_period.py b/src/handlers/site_upload/study_period.py
index faa7273..6bfc1e4 100644
--- a/src/handlers/site_upload/study_period.py
+++ b/src/handlers/site_upload/study_period.py
@@ -8,7 +8,7 @@
 import boto3
 
 from src.handlers.shared.decorators import generic_error_handler
-from src.handlers.shared.enums import BucketPath
+from src.handlers.shared.enums import BucketPath, JsonDict
 from src.handlers.shared.awswrangler_functions import get_s3_study_meta_list
 from src.handlers.shared.functions import (
     http_response,
     read_metadata,
     update_metadata,
     write_metadata,
 )
@@ -26,7 +26,9 @@ def update_study_period(s3_client, s3_bucket, site, study, data_package):
     if len(path) != 1:
         raise KeyError("Unique date path not found")
     df = awswrangler.s3.read_parquet(path[0])
-    study_meta = read_metadata(s3_client, s3_bucket, meta_type="study_periods")
+    study_meta = read_metadata(
+        s3_client, s3_bucket, meta_type=JsonDict.STUDY_PERIODS.value
+    )
     study_meta = update_metadata(
         study_meta,
         site,
@@ -34,7 +36,7 @@ def 
update_study_period(s3_client, s3_bucket, site, study, data_package): data_package, "earliest_date", df["min_date"][0], - meta_type="study_periods", + meta_type=JsonDict.STUDY_PERIODS.value, ) study_meta = update_metadata( study_meta, @@ -43,7 +45,7 @@ def update_study_period(s3_client, s3_bucket, site, study, data_package): data_package, "latest_date", df["max_date"][0], - meta_type="study_periods", + meta_type=JsonDict.STUDY_PERIODS.value, ) study_meta = update_metadata( study_meta, @@ -52,9 +54,11 @@ def update_study_period(s3_client, s3_bucket, site, study, data_package): data_package, "last_data_update", datetime.now(timezone.utc), - meta_type="study_periods", + meta_type=JsonDict.STUDY_PERIODS.value, + ) + write_metadata( + s3_client, s3_bucket, study_meta, meta_type=JsonDict.STUDY_PERIODS.value ) - write_metadata(s3_client, s3_bucket, study_meta, meta_type="study_periods") @generic_error_handler(msg="Error updating study period") From 3d896af599d06d0a1c42484579561e61dc515338 Mon Sep 17 00:00:00 2001 From: Matt Garber Date: Thu, 13 Apr 2023 14:18:07 -0400 Subject: [PATCH 6/6] added csv of date meta export --- tests/site_upload/test_study_period.py | 8 ++++++++ tests/test_data/meta_date.csv | 2 ++ 2 files changed, 10 insertions(+) create mode 100644 tests/test_data/meta_date.csv diff --git a/tests/site_upload/test_study_period.py b/tests/site_upload/test_study_period.py index e654b19..96901dc 100644 --- a/tests/site_upload/test_study_period.py +++ b/tests/site_upload/test_study_period.py @@ -1,4 +1,5 @@ import boto3 +import csv import os import pytest @@ -78,3 +79,10 @@ def test_process_upload( metadata[site][study_key]["last_data_update"] == datetime.now(timezone.utc).isoformat() ) + with open("./tests/test_data/meta_date.csv", "r") as file: + reader = csv.reader(file) + # discarding CSV header row + next(reader) + row = next(reader) + assert metadata[site][study_key]["earliest_date"] == f"{row[0]}T00:00:00" + assert metadata[site][study_key]["latest_date"] == f"{row[1]}T00:00:00" diff --git a/tests/test_data/meta_date.csv b/tests/test_data/meta_date.csv new file mode 100644 index 0000000..1300fd3 --- /dev/null +++ b/tests/test_data/meta_date.csv @@ -0,0 +1,2 @@ +min_date,max_date +2016-06-01,2023-04-09