Decouple powerset logic from read/write (#97)
* Decouple powerset logic from read/write

* docstring cleanup

* namespace cutover

* lint pass

* docstring tweak
dogversioning authored Jul 31, 2023
1 parent ddb504a commit eaf122c
Showing 2 changed files with 132 additions and 129 deletions.
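
The change in a nutshell: S3/SNS access, key parsing, and metadata bookkeeping move behind a new S3Manager class, so merge_powersets no longer needs raw boto3 clients or individual path fragments. A minimal sketch of the call-site change, using only names that appear in the diff below (the event is the SNS payload the Lambda handler already receives):

# Old handler: parse the S3 key and thread clients/paths through every helper.
#   merge_powersets(s3_client, sns_client, s3_bucket, site, study, data_package)

# New handler: all read/write state lives on one object.
manager = S3Manager(event)
merge_powersets(manager)
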
256 changes: 132 additions & 124 deletions src/handlers/site_upload/powerset_merge.py
@@ -4,7 +4,6 @@
import os

from datetime import datetime, timezone
from typing import Dict

import awswrangler
import boto3
@@ -26,34 +25,111 @@
)


class S3Manager:
    """Convenience class for managing S3 Access"""

    def __init__(self, event):
        self.s3_bucket_name = os.environ.get("BUCKET_NAME")
        self.s3_client = boto3.client("s3")
        self.sns_client = boto3.client(
            "sns", region_name=self.s3_client.meta.region_name
        )

        s3_key = event["Records"][0]["Sns"]["Message"]
        s3_key_array = s3_key.split("/")
        self.site = s3_key_array[3]
        self.study = s3_key_array[1]
        self.data_package = s3_key_array[2]

        self.metadata = read_metadata(self.s3_client, self.s3_bucket_name)

    # S3 Filesystem operations
    def get_data_package_list(self, path) -> list:
        """convenience wrapper for get_s3_data_package_list"""
        return get_s3_data_package_list(
            path, self.s3_bucket_name, self.study, self.data_package
        )

    def move_file(self, from_path: str, to_path: str) -> None:
        """convenience wrapper for move_s3_file"""
        move_s3_file(self.s3_client, self.s3_bucket_name, from_path, to_path)

    def copy_file(self, from_path: str, to_path: str) -> None:
        """convenience wrapper for copy_s3_file"""
        source = {
            "Bucket": self.s3_bucket_name,
            "Key": from_path,
        }
        self.s3_client.copy_object(
            CopySource=source,
            Bucket=self.s3_bucket_name,
            Key=to_path,
        )

    # parquet/csv output creation
    def write_parquet(self, df: pandas.DataFrame, is_new_data_package: bool) -> None:
        """writes dataframe as parquet to s3 and sends an SNS notification if new"""
        parquet_aggregate_path = (
            f"s3://{self.s3_bucket_name}/{BucketPath.AGGREGATE.value}/"
            f"{self.study}/{self.study}__{self.data_package}/"
            f"{self.study}__{self.data_package}__aggregate.parquet"
        )
        awswrangler.s3.to_parquet(df, parquet_aggregate_path, index=False)
        if is_new_data_package:
            topic_sns_arn = os.environ.get("TOPIC_CACHE_API_ARN")
            self.sns_client.publish(
                TopicArn=topic_sns_arn, Message="data_packages", Subject="data_packages"
            )

    def write_csv(self, df: pandas.DataFrame) -> None:
        """writes dataframe as csv to s3"""
        csv_aggregate_path = (
            f"s3://{self.s3_bucket_name}/{BucketPath.CSVAGGREGATE.value}/"
            f"{self.study}/{self.study}__{self.data_package}/"
            f"{self.study}__{self.data_package}__aggregate.csv"
        )
        df = df.apply(lambda x: x.strip() if isinstance(x, str) else x).replace(
            '""', nan
        )
        df = df.replace(to_replace=r",", value="", regex=True)
        awswrangler.s3.to_csv(
            df, csv_aggregate_path, index=False, quoting=csv.QUOTE_NONE
        )

    # metadata
    def update_local_metadata(self, key, site=None):
        """convenience wrapper for update_metadata"""
        if site is None:
            site = self.site
        self.metadata = update_metadata(
            self.metadata, site, self.study, self.data_package, key
        )

    def write_local_metadata(self):
        """convenience wrapper for write_metadata"""
        write_metadata(self.s3_client, self.s3_bucket_name, self.metadata)

    def merge_error_handler(
        self,
        s3_path: str,
        bucket_path: BucketPath,
        subbucket_path: str,
        error: Exception,
    ) -> None:
        """Helper for logging errors and moving files"""
        logging.error("File %s failed to aggregate: %s", s3_path, str(error))
        self.move_file(
            f"{bucket_path.value}/{subbucket_path}",
            f"{BucketPath.ERROR.value}/{subbucket_path}",
        )
        self.update_local_metadata("last_error")


def get_static_string_series(static_str: str, index: RangeIndex) -> pandas.Series:
    """Helper for the verbose way of defining a pandas string series"""
    return pandas.Series([static_str] * len(index)).astype("string")


def merge_error_handler(
    s3_client,
    s3_path: str,
    s3_bucket_name: str,
    bucket_path: BucketPath,
    subbucket_path: str,
    metadata: Dict,
    site: str,
    study: str,
    data_package: str,
    error: Exception,
) -> None:
    """Helper for logging errors and moving files"""
    logging.error("File %s failed to aggregate: %s", s3_path, str(error))
    move_s3_file(
        s3_client,
        s3_bucket_name,
        f"{bucket_path.value}/{subbucket_path}",
        f"{BucketPath.ERROR.value}/{subbucket_path}",
    )
    metadata = update_metadata(metadata, site, study, data_package, "last_error")


def expand_and_concat_sets(
    df: pandas.DataFrame, file_path: str, site_name: str
) -> pandas.DataFrame:
@@ -108,166 +184,98 @@ def expand_and_concat_sets(
    return agg_df


def write_parquet(
    df: pandas.DataFrame, path: str, sns_client, is_new_data_package: bool
) -> None:
    awswrangler.s3.to_parquet(df, path, index=False)
    if is_new_data_package:
        topic_sns_arn = os.environ.get("TOPIC_CACHE_API_ARN")
        sns_client.publish(
            TopicArn=topic_sns_arn, Message="data_packages", Subject="data_packages"
        )


def write_csv(df: pandas.DataFrame, path) -> None:
    df = df.apply(lambda x: x.strip() if isinstance(x, str) else x).replace('""', nan)
    df = df.replace(to_replace=r",", value="", regex=True)
    awswrangler.s3.to_csv(df, path, index=False, quoting=csv.QUOTE_NONE)


def merge_powersets(
    s3_client, sns_client, s3_bucket_name: str, site: str, study: str, data_package: str
) -> None:
def merge_powersets(manager: S3Manager) -> None:
"""Creates an aggregate powerset from all files with a given s3 prefix"""
# TODO: this should be memory profiled for large datasets. We can use
# chunking to lower memory usage during merges.

# initializing this early in case an empty file causes us to never set it
is_new_data_package = False

metadata = read_metadata(s3_client, s3_bucket_name)
df = pandas.DataFrame()
latest_file_list = get_s3_data_package_list(
BucketPath.LATEST.value, s3_bucket_name, study, data_package
)
last_valid_file_list = get_s3_data_package_list(
BucketPath.LAST_VALID.value, s3_bucket_name, study, data_package
)
for s3_path in last_valid_file_list:
site_specific_name = get_s3_site_filename_suffix(s3_path)
subbucket_path = f"{study}/{data_package}/{site_specific_name}"
latest_file_list = manager.get_data_package_list(BucketPath.LATEST.value)
last_valid_file_list = manager.get_data_package_list(BucketPath.LAST_VALID.value)
for last_valid_path in last_valid_file_list:
site_specific_name = get_s3_site_filename_suffix(last_valid_path)
subbucket_path = f"{manager.study}/{manager.data_package}/{site_specific_name}"
last_valid_site = site_specific_name.split("/", maxsplit=1)[0]

# If the latest uploads don't include this site, we'll use the last-valid
# one instead
try:
if not any(x.endswith(site_specific_name) for x in latest_file_list):
df = expand_and_concat_sets(df, s3_path, last_valid_site)
metadata = update_metadata(
metadata, last_valid_site, study, data_package, "last_uploaded_date"
df = expand_and_concat_sets(df, last_valid_path, last_valid_site)
manager.update_local_metadata(
"last_uploaded_date", site=last_valid_site
)
except Exception as e: # pylint: disable=broad-except
# This is expected to trigger if there's an issue in expand_and_concat_sets;
# this usually means there's a data problem.
merge_error_handler(
s3_client,
s3_path,
s3_bucket_name,
manager.merge_error_handler(
last_valid_path,
BucketPath.LATEST,
subbucket_path,
metadata,
site,
study,
data_package,
e,
)
for s3_path in latest_file_list:
site_specific_name = get_s3_site_filename_suffix(s3_path)
subbucket_path = f"{study}/{data_package}/{site_specific_name}"
for latest_path in latest_file_list:
site_specific_name = get_s3_site_filename_suffix(latest_path)
subbucket_path = f"{manager.study}/{manager.data_package}/{site_specific_name}"
date_str = datetime.now(timezone.utc).isoformat()
timestamped_name = f".{date_str}.".join(site_specific_name.split("."))
timestamped_path = f"{study}/{data_package}/{timestamped_name}"
timestamped_path = f"{manager.study}/{manager.data_package}/{timestamped_name}"
try:
is_new_data_package = False
# if we're going to replace a file in last_valid, archive the old data
if any(x.endswith(site_specific_name) for x in last_valid_file_list):
source = {
"Bucket": s3_bucket_name,
"Key": f"{BucketPath.LAST_VALID.value}/{subbucket_path}",
}
s3_client.copy_object(
CopySource=source,
Bucket=s3_bucket_name,
Key=f"{BucketPath.ARCHIVE.value}/{timestamped_path}",
manager.copy_file(
f"{BucketPath.LAST_VALID.value}/{subbucket_path}",
f"{BucketPath.ARCHIVE.value}/{timestamped_path}",
)

# otherwise, this is the first instance - after it's in the database,
# we'll generate a new list of valid tables for the dashboard
else:
is_new_data_package = True
df = expand_and_concat_sets(df, s3_path, site)
move_s3_file(
s3_client,
s3_bucket_name,
df = expand_and_concat_sets(df, latest_path, manager.site)
manager.move_file(
f"{BucketPath.LATEST.value}/{subbucket_path}",
f"{BucketPath.LAST_VALID.value}/{subbucket_path}",
)
latest_site = site_specific_name.split("/", maxsplit=1)[0]
metadata = update_metadata(
metadata, latest_site, study, data_package, "last_data_update"
)
metadata = update_metadata(
metadata, latest_site, study, data_package, "last_aggregation"
)
manager.update_local_metadata("last_data_update", site=latest_site)
manager.update_local_metadata("last_aggregation", site=latest_site)
except Exception as e: # pylint: disable=broad-except
merge_error_handler(
s3_client,
s3_path,
s3_bucket_name,
manager.merge_error_handler(
latest_path,
BucketPath.LATEST,
subbucket_path,
metadata,
site,
study,
data_package,
e,
)
# if a new file fails, we want to replace it with the last valid
# for purposes of aggregation
if any(x.endswith(site_specific_name) for x in last_valid_file_list):
df = expand_and_concat_sets(
df,
f"s3://{s3_bucket_name}/{BucketPath.LAST_VALID.value}"
f"s3://{manager.s3_bucket_name}/{BucketPath.LAST_VALID.value}"
f"/{subbucket_path}",
site,
manager.site,
)
metadata = update_metadata(
metadata, site, study, data_package, "last_aggregation"
)
write_metadata(s3_client, s3_bucket_name, metadata)
manager.update_local_metadata("last_aggregation")
manager.write_local_metadata()

# In this section, we are trying to accomplish two things:
# - Prepare a csv that can be loaded manually into the dashboard (requiring no
# quotes, which means removing commas from strings)
# - Make a parquet file from the dataframe, which may mutate the dataframe
# So we're making a deep copy to isolate these two mutation paths from each other.

parquet_aggregate_path = (
f"s3://{s3_bucket_name}/{BucketPath.AGGREGATE.value}/"
f"{study}/{study}__{data_package}/{study}__{data_package}__aggregate.parquet"
)
write_parquet(
df.copy(deep=True), parquet_aggregate_path, sns_client, is_new_data_package
)
csv_aggregate_path = (
f"s3://{s3_bucket_name}/{BucketPath.CSVAGGREGATE.value}/"
f"{study}/{study}__{data_package}/{study}__{data_package}__aggregate.csv"
)
write_csv(df, csv_aggregate_path)
manager.write_parquet(df.copy(deep=True), is_new_data_package)
manager.write_csv(df)


@generic_error_handler(msg="Error merging powersets")
def powerset_merge_handler(event, context):
"""manages event from SNS, triggers file processing and merge"""
del context
s3_bucket = os.environ.get("BUCKET_NAME")
s3_client = boto3.client("s3")
s3_key = event["Records"][0]["Sns"]["Message"]
s3_key_array = s3_key.split("/")
site = s3_key_array[3]
study = s3_key_array[1]
data_package = s3_key_array[2]
sns_client = boto3.client("sns", region_name=s3_client.meta.region_name)
merge_powersets(s3_client, sns_client, s3_bucket, site, study, data_package)
manager = S3Manager(event)
merge_powersets(manager)
res = http_response(200, "Merge successful")
return res
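
A side benefit of the decoupling, shown here only as a hedged sketch and not part of this commit: because merge_powersets now talks exclusively to the manager object, the powerset logic can be exercised without real S3/SNS clients. The FakeManager below is hypothetical; its attribute and method names simply mirror the S3Manager interface added above.

class FakeManager:
    """Hypothetical stand-in for S3Manager, for illustration only."""

    def __init__(self):
        self.s3_bucket_name = "test-bucket"
        self.site, self.study, self.data_package = "site_a", "study", "encounter"
        self.moved, self.metadata_updates = [], []

    def get_data_package_list(self, path):
        return []  # pretend no uploads exist for this prefix

    def move_file(self, from_path, to_path):
        self.moved.append((from_path, to_path))

    def copy_file(self, from_path, to_path):
        pass

    def update_local_metadata(self, key, site=None):
        self.metadata_updates.append((key, site or self.site))

    def write_local_metadata(self):
        pass

    def write_parquet(self, df, is_new_data_package):
        pass

    def write_csv(self, df):
        pass

# merge_powersets(FakeManager()) then runs without touching AWS, and the lists
# on the fake record which files would have been moved or logged.
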
5 changes: 0 additions & 5 deletions tests/site_upload/test_powerset_merge.py
@@ -24,9 +24,6 @@
EXISTING_STUDY_NAME = "study"
DATA_P_NAME = "encounter"

"""
"""


@freeze_time("2020-01-01")
@pytest.mark.parametrize(
@@ -124,7 +121,6 @@ def test_powerset_merge_single_upload(
    event = {
        "Records": [
            {
                "awsRegion": "us-east-1",
                "Sns": {"Message": f"{BucketPath.LATEST.value}{event_key}"},
            }
        ]
@@ -235,7 +231,6 @@ def test_powerset_merge_join_study_data(
    event = {
        "Records": [
            {
                "awsRegion": "us-east-1",
                "Sns": {
                    "Message": f"{BucketPath.LATEST.value}/{EXISTING_STUDY_NAME}"
                    f"/{DATA_P_NAME}/{NEW_SITE_NAME}/encounter.parquet"
