diff --git a/src/handlers/site_upload/powerset_merge.py b/src/handlers/site_upload/powerset_merge.py
index cc7f3b7..7ac8e97 100644
--- a/src/handlers/site_upload/powerset_merge.py
+++ b/src/handlers/site_upload/powerset_merge.py
@@ -2,6 +2,7 @@
 import csv
 import logging
 import os
+import traceback
 
 from datetime import datetime, timezone
 
@@ -25,6 +26,12 @@
 )
 
 
+class MergeError(ValueError):
+    def __init__(self, message, filename):
+        super().__init__(message)
+        self.filename = filename
+
+
 class S3Manager:
     """Convenience class for managing S3 Access"""
 
@@ -112,14 +119,14 @@ def write_local_metadata(self):
     def merge_error_handler(
         self,
         s3_path: str,
-        bucket_path: BucketPath,
         subbucket_path: str,
         error: Exception,
     ) -> None:
         """Helper for logging errors and moving files"""
         logging.error("File %s failed to aggregate: %s", s3_path, str(error))
+        logging.error(traceback.format_exc())
         self.move_file(
-            f"{bucket_path.value}/{subbucket_path}",
+            s3_path.replace(f"s3://{self.s3_bucket_name}/", ""),
             f"{BucketPath.ERROR.value}/{subbucket_path}",
         )
         self.update_local_metadata("last_error")
@@ -150,7 +157,7 @@ def expand_and_concat_sets(
     """
     site_df = awswrangler.s3.read_parquet(file_path)
     if site_df.empty:
-        raise ValueError("Uploaded data file is empty")
+        raise MergeError("Uploaded data file is empty", filename=file_path)
     df_copy = site_df.copy()
     site_df["site"] = get_static_string_series(None, site_df.index)
     df_copy["site"] = get_static_string_series(site_name, df_copy.index)
@@ -159,7 +166,10 @@
     # are generated from the same vintage. This naive approach will cause a decent
     # amount of data churn we'll have to manage in the interim.
     if df.empty is False and set(site_df.columns) != set(df.columns):
-        raise ValueError("Uploaded data has a different schema than last aggregate")
+        raise MergeError(
+            "Uploaded data has a different schema than last aggregate",
+            filename=file_path,
+        )
 
     # concating in this way adds a new column we want to explictly drop
     # from the final set
@@ -198,7 +208,6 @@ def merge_powersets(manager: S3Manager) -> None:
         site_specific_name = get_s3_site_filename_suffix(last_valid_path)
         subbucket_path = f"{manager.study}/{manager.data_package}/{site_specific_name}"
         last_valid_site = site_specific_name.split("/", maxsplit=1)[0]
-
         # If the latest uploads don't include this site, we'll use the last-valid
         # one instead
         try:
@@ -207,12 +216,11 @@
             manager.update_local_metadata(
                 "last_uploaded_date", site=last_valid_site
             )
-        except Exception as e:  # pylint: disable=broad-except
+        except MergeError as e:
            # This is expected to trigger if there's an issue in expand_and_concat_sets;
            # this usually means there's a data problem.
             manager.merge_error_handler(
-                last_valid_path,
-                BucketPath.LATEST,
+                e.filename,
                 subbucket_path,
                 e,
             )
@@ -246,7 +254,6 @@
         except Exception as e:  # pylint: disable=broad-except
             manager.merge_error_handler(
                 latest_path,
-                BucketPath.LATEST,
                 subbucket_path,
                 e,
             )
diff --git a/template.yaml b/template.yaml
index 730db4d..ddcdcf2 100644
--- a/template.yaml
+++ b/template.yaml
@@ -145,7 +145,7 @@ Resources:
       Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python39:1]
       Handler: src/handlers/site_upload/powerset_merge.powerset_merge_handler
       Runtime: python3.9
-      MemorySize: 4096
+      MemorySize: 8192
       Timeout: 800
       Description: Merges and aggregates powerset count data
       Environment:
diff --git a/tests/site_upload/test_powerset_merge.py b/tests/site_upload/test_powerset_merge.py
index 882697a..8905087 100644
--- a/tests/site_upload/test_powerset_merge.py
+++ b/tests/site_upload/test_powerset_merge.py
@@ -2,18 +2,23 @@
 import io
 import os
 
+from contextlib import nullcontext as does_not_raise
 from unittest import mock
 
 import awswrangler
 import pytest
 
 from datetime import datetime, timezone
-from pandas import DataFrame
+from pandas import DataFrame, read_parquet
 from freezegun import freeze_time
 
 from src.handlers.shared.enums import BucketPath
 from src.handlers.shared.functions import read_metadata, write_metadata
-from src.handlers.site_upload.powerset_merge import powerset_merge_handler
+from src.handlers.site_upload.powerset_merge import (
+    powerset_merge_handler,
+    expand_and_concat_sets,
+    MergeError,
+)
 
 from tests.utils import get_mock_metadata, TEST_BUCKET, ITEM_COUNT, MOCK_ENV
 
@@ -254,3 +259,34 @@ def test_powerset_merge_join_study_data(
     else:
         assert len(agg_df["site"].unique() == 3)
     assert errors == expected_errors
+
+
+# Explicitly testing that errors are raised during concat, since they are otherwise
+# handled by the generic error handler
+@pytest.mark.parametrize(
+    "upload_file,load_empty,raises",
+    [
+        ("./tests/test_data/count_synthea_patient.parquet", False, does_not_raise()),
+        (
+            "./tests/test_data/cube_simple_example.parquet",
+            False,
+            pytest.raises(MergeError),
+        ),
+        (
+            "./tests/test_data/count_synthea_empty.parquet",
+            True,
+            pytest.raises(MergeError),
+        ),
+    ],
+)
+def test_expand_and_concat(mock_bucket, upload_file, load_empty, raises):
+    with raises:
+        df = read_parquet("./tests/test_data/count_synthea_patient_agg.parquet")
+        s3_path = "/test/uploaded.parquet"
+        s3_client = boto3.client("s3", region_name="us-east-1")
+        s3_client.upload_file(
+            upload_file,
+            TEST_BUCKET,
+            s3_path,
+        )
+        expand_and_concat_sets(df, f"s3://{TEST_BUCKET}/{s3_path}", EXISTING_STUDY_NAME)
diff --git a/tests/test_data/count_synthea_empty.csv b/tests/test_data/count_synthea_empty.csv
new file mode 100644
index 0000000..6ad73de
--- /dev/null
+++ b/tests/test_data/count_synthea_empty.csv
@@ -0,0 +1 @@
+cnt,gender,age,race_display
\ No newline at end of file
diff --git a/tests/test_data/count_synthea_empty.parquet b/tests/test_data/count_synthea_empty.parquet
new file mode 100644
index 0000000..8e7c38e
Binary files /dev/null and b/tests/test_data/count_synthea_empty.parquet differ
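
Note on the test idiom introduced above: importing nullcontext as does_not_raise lets the same parametrized test express both "no exception expected" and "MergeError expected" cases through a single context manager argument. A minimal standalone sketch of the pattern (not part of this patch; the toy parse_positive function and all names below are illustrative stand-ins for expand_and_concat_sets):

from contextlib import nullcontext as does_not_raise

import pytest


def parse_positive(value: str) -> int:
    # Toy stand-in: raises on bad input, returns normally on good input.
    number = int(value)
    if number <= 0:
        raise ValueError("value must be positive")
    return number


@pytest.mark.parametrize(
    "value,expectation",
    [
        ("3", does_not_raise()),  # happy path: no-op context manager
        ("-1", pytest.raises(ValueError)),  # failure path: assert the exception
    ],
)
def test_parse_positive(value, expectation):
    with expectation:
        assert parse_positive(value) > 0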