From 103e1485c07f5a54b2959e59b4e5d2bdb55ce967 Mon Sep 17 00:00:00 2001
From: matt garber
Date: Thu, 3 Aug 2023 10:38:54 -0400
Subject: [PATCH] Updates for merge bug resolution (#98)

* Updates for merge bug resolution

* light cleanup
---
 src/handlers/site_upload/powerset_merge.py  | 25 +++++++-----
 template.yaml                               |  2 +-
 tests/site_upload/test_powerset_merge.py    | 40 +++++++++++++++++++-
 tests/test_data/count_synthea_empty.csv     |  1 +
 tests/test_data/count_synthea_empty.parquet | Bin 0 -> 3193 bytes
 5 files changed, 56 insertions(+), 12 deletions(-)
 create mode 100644 tests/test_data/count_synthea_empty.csv
 create mode 100644 tests/test_data/count_synthea_empty.parquet

diff --git a/src/handlers/site_upload/powerset_merge.py b/src/handlers/site_upload/powerset_merge.py
index cc7f3b7..7ac8e97 100644
--- a/src/handlers/site_upload/powerset_merge.py
+++ b/src/handlers/site_upload/powerset_merge.py
@@ -2,6 +2,7 @@
 import csv
 import logging
 import os
+import traceback
 
 from datetime import datetime, timezone
 
@@ -25,6 +26,12 @@
 )
 
 
+class MergeError(ValueError):
+    def __init__(self, message, filename):
+        super().__init__(message)
+        self.filename = filename
+
+
 class S3Manager:
     """Convenience class for managing S3 Access"""
 
@@ -112,14 +119,14 @@ def write_local_metadata(self):
     def merge_error_handler(
         self,
         s3_path: str,
-        bucket_path: BucketPath,
         subbucket_path: str,
         error: Exception,
     ) -> None:
         """Helper for logging errors and moving files"""
         logging.error("File %s failed to aggregate: %s", s3_path, str(error))
+        logging.error(traceback.format_exc())
         self.move_file(
-            f"{bucket_path.value}/{subbucket_path}",
+            s3_path.replace(f"s3://{self.s3_bucket_name}/", ""),
             f"{BucketPath.ERROR.value}/{subbucket_path}",
         )
         self.update_local_metadata("last_error")
@@ -150,7 +157,7 @@ def expand_and_concat_sets(
     """
     site_df = awswrangler.s3.read_parquet(file_path)
     if site_df.empty:
-        raise ValueError("Uploaded data file is empty")
+        raise MergeError("Uploaded data file is empty", filename=file_path)
     df_copy = site_df.copy()
     site_df["site"] = get_static_string_series(None, site_df.index)
     df_copy["site"] = get_static_string_series(site_name, df_copy.index)
@@ -159,7 +166,10 @@
     # are generated from the same vintage. This naive approach will cause a decent
     # amount of data churn we'll have to manage in the interim.
     if df.empty is False and set(site_df.columns) != set(df.columns):
-        raise ValueError("Uploaded data has a different schema than last aggregate")
+        raise MergeError(
+            "Uploaded data has a different schema than last aggregate",
+            filename=file_path,
+        )
 
     # concatenating in this way adds a new column we want to explicitly drop
     # from the final set
@@ -198,7 +208,6 @@ def merge_powersets(manager: S3Manager) -> None:
         site_specific_name = get_s3_site_filename_suffix(last_valid_path)
         subbucket_path = f"{manager.study}/{manager.data_package}/{site_specific_name}"
         last_valid_site = site_specific_name.split("/", maxsplit=1)[0]
-
         # If the latest uploads don't include this site, we'll use the last-valid
         # one instead
         try:
@@ -207,12 +216,11 @@ def merge_powersets(manager: S3Manager) -> None:
                 manager.update_local_metadata(
                     "last_uploaded_date", site=last_valid_site
                 )
-        except Exception as e:  # pylint: disable=broad-except
+        except MergeError as e:
             # This is expected to trigger if there's an issue in expand_and_concat_sets;
             # this usually means there's a data problem.
             manager.merge_error_handler(
-                last_valid_path,
-                BucketPath.LATEST,
+                e.filename,
                 subbucket_path,
                 e,
             )
@@ -246,7 +254,6 @@ def merge_powersets(manager: S3Manager) -> None:
         except Exception as e:  # pylint: disable=broad-except
             manager.merge_error_handler(
                 latest_path,
-                BucketPath.LATEST,
                 subbucket_path,
                 e,
             )
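Note: `merge_error_handler` now takes the full `s3://` path of the failing file and derives the bucket-relative key itself, rather than rebuilding it from a `BucketPath`. A minimal sketch of that derivation, with made-up bucket and object names:

```python
# Sketch of the key derivation used in merge_error_handler above.
# The bucket and object names here are illustrative only.
s3_bucket_name = "example-aggregator-bucket"
s3_path = f"s3://{s3_bucket_name}/latest/study/data_package/site/data.parquet"

key = s3_path.replace(f"s3://{s3_bucket_name}/", "")
assert key == "latest/study/data_package/site/data.parquet"
```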
diff --git a/template.yaml b/template.yaml
index 730db4d..ddcdcf2 100644
--- a/template.yaml
+++ b/template.yaml
@@ -145,7 +145,7 @@ Resources:
       Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python39:1]
       Handler: src/handlers/site_upload/powerset_merge.powerset_merge_handler
      Runtime: python3.9
-      MemorySize: 4096
+      MemorySize: 8192
       Timeout: 800
       Description: Merges and aggregates powerset count data
       Environment:
diff --git a/tests/site_upload/test_powerset_merge.py b/tests/site_upload/test_powerset_merge.py
index 882697a..8905087 100644
--- a/tests/site_upload/test_powerset_merge.py
+++ b/tests/site_upload/test_powerset_merge.py
@@ -2,18 +2,23 @@
 import io
 import os
+from contextlib import nullcontext as does_not_raise
 from unittest import mock
 
 import awswrangler
 import pytest
 
 from datetime import datetime, timezone
-from pandas import DataFrame
+from pandas import DataFrame, read_parquet
 from freezegun import freeze_time
 
 from src.handlers.shared.enums import BucketPath
 from src.handlers.shared.functions import read_metadata, write_metadata
-from src.handlers.site_upload.powerset_merge import powerset_merge_handler
+from src.handlers.site_upload.powerset_merge import (
+    powerset_merge_handler,
+    expand_and_concat_sets,
+    MergeError,
+)
 from tests.utils import get_mock_metadata, TEST_BUCKET, ITEM_COUNT, MOCK_ENV
 
 
@@ -254,3 +259,34 @@ def test_powerset_merge_join_study_data(
     else:
         assert len(agg_df["site"].unique()) == 3
     assert errors == expected_errors
+
+
+# Explicitly test that errors raised during concat propagate, since they are
+# handled by the generic error handler rather than inside expand_and_concat_sets
+@pytest.mark.parametrize(
+    "upload_file,load_empty,raises",
+    [
+        ("./tests/test_data/count_synthea_patient.parquet", False, does_not_raise()),
+        (
+            "./tests/test_data/cube_simple_example.parquet",
+            False,
+            pytest.raises(MergeError),
+        ),
+        (
+            "./tests/test_data/count_synthea_empty.parquet",
+            True,
+            pytest.raises(MergeError),
+        ),
+    ],
+)
+def test_expand_and_concat(mock_bucket, upload_file, load_empty, raises):
+    with raises:
+        df = read_parquet("./tests/test_data/count_synthea_patient_agg.parquet")
+        s3_path = "/test/uploaded.parquet"
+        s3_client = boto3.client("s3", region_name="us-east-1")
+        s3_client.upload_file(
+            upload_file,
+            TEST_BUCKET,
+            s3_path,
+        )
+        expand_and_concat_sets(df, f"s3://{TEST_BUCKET}/{s3_path}", EXISTING_STUDY_NAME)
diff --git a/tests/test_data/count_synthea_empty.csv b/tests/test_data/count_synthea_empty.csv
new file mode 100644
index 0000000..6ad73de
--- /dev/null
+++ b/tests/test_data/count_synthea_empty.csv
@@ -0,0 +1 @@
+cnt,gender,age,race_display
\ No newline at end of file
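Note: the new parquet fixture mirrors the header-only CSV above. A hedged sketch of how such a zero-row file could be generated with pandas (assuming pyarrow is available; the committed fixture may have been produced differently):

```python
# Build a zero-row parquet file with the same columns as
# count_synthea_empty.csv. Illustrative sketch; requires pyarrow.
import pandas

empty = pandas.DataFrame(columns=["cnt", "gender", "age", "race_display"])
empty.to_parquet("tests/test_data/count_synthea_empty.parquet")
```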
diff --git a/tests/test_data/count_synthea_empty.parquet b/tests/test_data/count_synthea_empty.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..8e7c38eedb625960798a960610d3bc8ba597791e
GIT binary patch
literal 3193
[3193 bytes of base85-encoded binary parquet data omitted]
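Note: the `filename` attribute on `MergeError` is what lets `merge_powersets` pass `e.filename` straight to the error handler. A standalone sketch of that contract (the path below is illustrative):

```python
# Standalone sketch of the MergeError contract introduced in this patch:
# the failing file's path travels with the exception.
class MergeError(ValueError):
    def __init__(self, message, filename):
        super().__init__(message)
        self.filename = filename


try:
    raise MergeError(
        "Uploaded data file is empty",
        filename="s3://example-bucket/latest/study/data_package/site/data.parquet",
    )
except MergeError as e:
    # A handler can log the message and move exactly the file that failed.
    print(f"{e} ({e.filename})")
```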