From eaf122c90b7a8736a10d8549f7f05367d515e7e0 Mon Sep 17 00:00:00 2001 From: matt garber Date: Mon, 31 Jul 2023 14:50:50 -0400 Subject: [PATCH] Decouple powerset logic from read/write (#97) * Decouple powerset logic from read/write * docstring cleanup * namespace cutover * lint pass * docstring tweak --- src/handlers/site_upload/powerset_merge.py | 256 +++++++++++---------- tests/site_upload/test_powerset_merge.py | 5 - 2 files changed, 132 insertions(+), 129 deletions(-) diff --git a/src/handlers/site_upload/powerset_merge.py b/src/handlers/site_upload/powerset_merge.py index 93edce0..cc7f3b7 100644 --- a/src/handlers/site_upload/powerset_merge.py +++ b/src/handlers/site_upload/powerset_merge.py @@ -4,7 +4,6 @@ import os from datetime import datetime, timezone -from typing import Dict import awswrangler import boto3 @@ -26,34 +25,111 @@ ) +class S3Manager: + """Convenience class for managing S3 Access""" + + def __init__(self, event): + self.s3_bucket_name = os.environ.get("BUCKET_NAME") + self.s3_client = boto3.client("s3") + self.sns_client = boto3.client( + "sns", region_name=self.s3_client.meta.region_name + ) + + s3_key = event["Records"][0]["Sns"]["Message"] + s3_key_array = s3_key.split("/") + self.site = s3_key_array[3] + self.study = s3_key_array[1] + self.data_package = s3_key_array[2] + + self.metadata = read_metadata(self.s3_client, self.s3_bucket_name) + + # S3 Filesystem operations + def get_data_package_list(self, path) -> list: + """convenience wrapper for get_s3_data_package_list""" + return get_s3_data_package_list( + path, self.s3_bucket_name, self.study, self.data_package + ) + + def move_file(self, from_path: str, to_path: str) -> None: + """convenience wrapper for move_s3_file""" + move_s3_file(self.s3_client, self.s3_bucket_name, from_path, to_path) + + def copy_file(self, from_path: str, to_path: str) -> None: + """convenience wrapper for copy_s3_file""" + source = { + "Bucket": self.s3_bucket_name, + "Key": from_path, + } + self.s3_client.copy_object( + CopySource=source, + Bucket=self.s3_bucket_name, + Key=to_path, + ) + + # parquet/csv output creation + def write_parquet(self, df: pandas.DataFrame, is_new_data_package: bool) -> None: + """writes dataframe as parquet to s3 and sends an SNS notification if new""" + parquet_aggregate_path = ( + f"s3://{self.s3_bucket_name}/{BucketPath.AGGREGATE.value}/" + f"{self.study}/{self.study}__{self.data_package}/" + f"{self.study}__{self.data_package}__aggregate.parquet" + ) + awswrangler.s3.to_parquet(df, parquet_aggregate_path, index=False) + if is_new_data_package: + topic_sns_arn = os.environ.get("TOPIC_CACHE_API_ARN") + self.sns_client.publish( + TopicArn=topic_sns_arn, Message="data_packages", Subject="data_packages" + ) + + def write_csv(self, df: pandas.DataFrame) -> None: + """writes dataframe as csv to s3""" + csv_aggregate_path = ( + f"s3://{self.s3_bucket_name}/{BucketPath.CSVAGGREGATE.value}/" + f"{self.study}/{self.study}__{self.data_package}/" + f"{self.study}__{self.data_package}__aggregate.csv" + ) + df = df.apply(lambda x: x.strip() if isinstance(x, str) else x).replace( + '""', nan + ) + df = df.replace(to_replace=r",", value="", regex=True) + awswrangler.s3.to_csv( + df, csv_aggregate_path, index=False, quoting=csv.QUOTE_NONE + ) + + # metadata + def update_local_metadata(self, key, site=None): + """convenience wrapper for update_metadata""" + if site is None: + site = self.site + self.metadata = update_metadata( + self.metadata, site, self.study, self.data_package, key + ) + + def 
write_local_metadata(self): + """convenience wrapper for write_metadata""" + write_metadata(self.s3_client, self.s3_bucket_name, self.metadata) + + def merge_error_handler( + self, + s3_path: str, + bucket_path: BucketPath, + subbucket_path: str, + error: Exception, + ) -> None: + """Helper for logging errors and moving files""" + logging.error("File %s failed to aggregate: %s", s3_path, str(error)) + self.move_file( + f"{bucket_path.value}/{subbucket_path}", + f"{BucketPath.ERROR.value}/{subbucket_path}", + ) + self.update_local_metadata("last_error") + + def get_static_string_series(static_str: str, index: RangeIndex) -> pandas.Series: """Helper for the verbose way of defining a pandas string series""" return pandas.Series([static_str] * len(index)).astype("string") -def merge_error_handler( - s3_client, - s3_path: str, - s3_bucket_name: str, - bucket_path: BucketPath, - subbucket_path: str, - metadata: Dict, - site: str, - study: str, - data_package: str, - error: Exception, -) -> None: - """Helper for logging errors and moving files""" - logging.error("File %s failed to aggregate: %s", s3_path, str(error)) - move_s3_file( - s3_client, - s3_bucket_name, - f"{bucket_path.value}/{subbucket_path}", - f"{BucketPath.ERROR.value}/{subbucket_path}", - ) - metadata = update_metadata(metadata, site, study, data_package, "last_error") - - def expand_and_concat_sets( df: pandas.DataFrame, file_path: str, site_name: str ) -> pandas.DataFrame: @@ -108,118 +184,70 @@ def expand_and_concat_sets( return agg_df -def write_parquet( - df: pandas.DataFrame, path: str, sns_client, is_new_data_package: bool -) -> None: - awswrangler.s3.to_parquet(df, path, index=False) - if is_new_data_package: - topic_sns_arn = os.environ.get("TOPIC_CACHE_API_ARN") - sns_client.publish( - TopicArn=topic_sns_arn, Message="data_packages", Subject="data_packages" - ) - - -def write_csv(df: pandas.DataFrame, path) -> None: - df = df.apply(lambda x: x.strip() if isinstance(x, str) else x).replace('""', nan) - df = df.replace(to_replace=r",", value="", regex=True) - awswrangler.s3.to_csv(df, path, index=False, quoting=csv.QUOTE_NONE) - - -def merge_powersets( - s3_client, sns_client, s3_bucket_name: str, site: str, study: str, data_package: str -) -> None: +def merge_powersets(manager: S3Manager) -> None: """Creates an aggregate powerset from all files with a given s3 prefix""" # TODO: this should be memory profiled for large datasets. We can use # chunking to lower memory usage during merges. 
# initializing this early in case an empty file causes us to never set it is_new_data_package = False - - metadata = read_metadata(s3_client, s3_bucket_name) df = pandas.DataFrame() - latest_file_list = get_s3_data_package_list( - BucketPath.LATEST.value, s3_bucket_name, study, data_package - ) - last_valid_file_list = get_s3_data_package_list( - BucketPath.LAST_VALID.value, s3_bucket_name, study, data_package - ) - for s3_path in last_valid_file_list: - site_specific_name = get_s3_site_filename_suffix(s3_path) - subbucket_path = f"{study}/{data_package}/{site_specific_name}" + latest_file_list = manager.get_data_package_list(BucketPath.LATEST.value) + last_valid_file_list = manager.get_data_package_list(BucketPath.LAST_VALID.value) + for last_valid_path in last_valid_file_list: + site_specific_name = get_s3_site_filename_suffix(last_valid_path) + subbucket_path = f"{manager.study}/{manager.data_package}/{site_specific_name}" last_valid_site = site_specific_name.split("/", maxsplit=1)[0] # If the latest uploads don't include this site, we'll use the last-valid # one instead try: if not any(x.endswith(site_specific_name) for x in latest_file_list): - df = expand_and_concat_sets(df, s3_path, last_valid_site) - metadata = update_metadata( - metadata, last_valid_site, study, data_package, "last_uploaded_date" + df = expand_and_concat_sets(df, last_valid_path, last_valid_site) + manager.update_local_metadata( + "last_uploaded_date", site=last_valid_site ) except Exception as e: # pylint: disable=broad-except # This is expected to trigger if there's an issue in expand_and_concat_sets; # this usually means there's a data problem. - merge_error_handler( - s3_client, - s3_path, - s3_bucket_name, + manager.merge_error_handler( + last_valid_path, BucketPath.LATEST, subbucket_path, - metadata, - site, - study, - data_package, e, ) - for s3_path in latest_file_list: - site_specific_name = get_s3_site_filename_suffix(s3_path) - subbucket_path = f"{study}/{data_package}/{site_specific_name}" + for latest_path in latest_file_list: + site_specific_name = get_s3_site_filename_suffix(latest_path) + subbucket_path = f"{manager.study}/{manager.data_package}/{site_specific_name}" date_str = datetime.now(timezone.utc).isoformat() timestamped_name = f".{date_str}.".join(site_specific_name.split(".")) - timestamped_path = f"{study}/{data_package}/{timestamped_name}" + timestamped_path = f"{manager.study}/{manager.data_package}/{timestamped_name}" try: is_new_data_package = False # if we're going to replace a file in last_valid, archive the old data if any(x.endswith(site_specific_name) for x in last_valid_file_list): - source = { - "Bucket": s3_bucket_name, - "Key": f"{BucketPath.LAST_VALID.value}/{subbucket_path}", - } - s3_client.copy_object( - CopySource=source, - Bucket=s3_bucket_name, - Key=f"{BucketPath.ARCHIVE.value}/{timestamped_path}", + manager.copy_file( + f"{BucketPath.LAST_VALID.value}/{subbucket_path}", + f"{BucketPath.ARCHIVE.value}/{timestamped_path}", ) # otherwise, this is the first instance - after it's in the database, # we'll generate a new list of valid tables for the dashboard else: is_new_data_package = True - df = expand_and_concat_sets(df, s3_path, site) - move_s3_file( - s3_client, - s3_bucket_name, + df = expand_and_concat_sets(df, latest_path, manager.site) + manager.move_file( f"{BucketPath.LATEST.value}/{subbucket_path}", f"{BucketPath.LAST_VALID.value}/{subbucket_path}", ) latest_site = site_specific_name.split("/", maxsplit=1)[0] - metadata = update_metadata( - metadata, 
latest_site, study, data_package, "last_data_update" - ) - metadata = update_metadata( - metadata, latest_site, study, data_package, "last_aggregation" - ) + manager.update_local_metadata("last_data_update", site=latest_site) + manager.update_local_metadata("last_aggregation", site=latest_site) except Exception as e: # pylint: disable=broad-except - merge_error_handler( - s3_client, - s3_path, - s3_bucket_name, + manager.merge_error_handler( + latest_path, BucketPath.LATEST, subbucket_path, - metadata, - site, - study, - data_package, e, ) # if a new file fails, we want to replace it with the last valid @@ -227,47 +255,27 @@ def merge_powersets( if any(x.endswith(site_specific_name) for x in last_valid_file_list): df = expand_and_concat_sets( df, - f"s3://{s3_bucket_name}/{BucketPath.LAST_VALID.value}" + f"s3://{manager.s3_bucket_name}/{BucketPath.LAST_VALID.value}" f"/{subbucket_path}", - site, + manager.site, ) - metadata = update_metadata( - metadata, site, study, data_package, "last_aggregation" - ) - write_metadata(s3_client, s3_bucket_name, metadata) + manager.update_local_metadata("last_aggregation") + manager.write_local_metadata() # In this section, we are trying to accomplish two things: # - Prepare a csv that can be loaded manually into the dashboard (requiring no # quotes, which means removing commas from strings) # - Make a parquet file from the dataframe, which may mutate the dataframe # So we're making a deep copy to isolate these two mutation paths from each other. - - parquet_aggregate_path = ( - f"s3://{s3_bucket_name}/{BucketPath.AGGREGATE.value}/" - f"{study}/{study}__{data_package}/{study}__{data_package}__aggregate.parquet" - ) - write_parquet( - df.copy(deep=True), parquet_aggregate_path, sns_client, is_new_data_package - ) - csv_aggregate_path = ( - f"s3://{s3_bucket_name}/{BucketPath.CSVAGGREGATE.value}/" - f"{study}/{study}__{data_package}/{study}__{data_package}__aggregate.csv" - ) - write_csv(df, csv_aggregate_path) + manager.write_parquet(df.copy(deep=True), is_new_data_package) + manager.write_csv(df) @generic_error_handler(msg="Error merging powersets") def powerset_merge_handler(event, context): """manages event from SNS, triggers file processing and merge""" del context - s3_bucket = os.environ.get("BUCKET_NAME") - s3_client = boto3.client("s3") - s3_key = event["Records"][0]["Sns"]["Message"] - s3_key_array = s3_key.split("/") - site = s3_key_array[3] - study = s3_key_array[1] - data_package = s3_key_array[2] - sns_client = boto3.client("sns", region_name=s3_client.meta.region_name) - merge_powersets(s3_client, sns_client, s3_bucket, site, study, data_package) + manager = S3Manager(event) + merge_powersets(manager) res = http_response(200, "Merge successful") return res diff --git a/tests/site_upload/test_powerset_merge.py b/tests/site_upload/test_powerset_merge.py index a16d28f..882697a 100644 --- a/tests/site_upload/test_powerset_merge.py +++ b/tests/site_upload/test_powerset_merge.py @@ -24,9 +24,6 @@ EXISTING_STUDY_NAME = "study" DATA_P_NAME = "encounter" -""" -""" - @freeze_time("2020-01-01") @pytest.mark.parametrize( @@ -124,7 +121,6 @@ def test_powerset_merge_single_upload( event = { "Records": [ { - "awsRegion": "us-east-1", "Sns": {"Message": f"{BucketPath.LATEST.value}{event_key}"}, } ] @@ -235,7 +231,6 @@ def test_powerset_merge_join_study_data( event = { "Records": [ { - "awsRegion": "us-east-1", "Sns": { "Message": f"{BucketPath.LATEST.value}/{EXISTING_STUDY_NAME}" f"/{DATA_P_NAME}/{NEW_SITE_NAME}/encounter.parquet"
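
Note (illustrative, not part of the patch): the new S3Manager derives study, data package, and site purely from the SNS message key, in that order, mirroring the event fixtures in test_powerset_merge.py. A minimal sketch of that parsing follows; the "latest" prefix stands in for BucketPath.LATEST.value, the site name is a hypothetical placeholder, and the study/data-package names are taken from the test constants above.

    # Sketch only -- event shape and key layout inferred from the tests in this patch.
    # "latest" approximates BucketPath.LATEST.value; "general_hospital" is a
    # hypothetical site name; "study" and "encounter" come from the test fixtures.
    sample_event = {
        "Records": [
            {
                "Sns": {
                    "Message": "latest/study/encounter/general_hospital/encounter.parquet"
                }
            }
        ]
    }

    # Same decomposition S3Manager.__init__ performs on the key:
    s3_key = sample_event["Records"][0]["Sns"]["Message"]
    parts = s3_key.split("/")
    study, data_package, site = parts[1], parts[2], parts[3]
    print(study, data_package, site)  # -> study encounter general_hospital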