Updates for merge bug resolution (#98)
* Updates for merge bug resolution

* light cleanup
dogversioning authored Aug 3, 2023
1 parent eaf122c commit 103e148
Showing 5 changed files with 56 additions and 12 deletions.
25 changes: 16 additions & 9 deletions src/handlers/site_upload/powerset_merge.py
@@ -2,6 +2,7 @@
import csv
import logging
import os
import traceback

from datetime import datetime, timezone

@@ -25,6 +26,12 @@
)


class MergeError(ValueError):
def __init__(self, message, filename):
super().__init__(message)
self.filename = filename


class S3Manager:
"""Convenience class for managing S3 Access"""

@@ -112,14 +119,14 @@ def write_local_metadata(self):
def merge_error_handler(
self,
s3_path: str,
bucket_path: BucketPath,
subbucket_path: str,
error: Exception,
) -> None:
"""Helper for logging errors and moving files"""
logging.error("File %s failed to aggregate: %s", s3_path, str(error))
        logging.error(traceback.format_exc())
self.move_file(
f"{bucket_path.value}/{subbucket_path}",
s3_path.replace(f"s3://{self.s3_bucket_name}/", ""),
f"{BucketPath.ERROR.value}/{subbucket_path}",
)
self.update_local_metadata("last_error")
@@ -150,7 +157,7 @@ def expand_and_concat_sets(
"""
site_df = awswrangler.s3.read_parquet(file_path)
if site_df.empty:
raise ValueError("Uploaded data file is empty")
raise MergeError("Uploaded data file is empty", filename=file_path)
df_copy = site_df.copy()
site_df["site"] = get_static_string_series(None, site_df.index)
df_copy["site"] = get_static_string_series(site_name, df_copy.index)
@@ -159,7 +166,10 @@
# are generated from the same vintage. This naive approach will cause a decent
# amount of data churn we'll have to manage in the interim.
if df.empty is False and set(site_df.columns) != set(df.columns):
raise ValueError("Uploaded data has a different schema than last aggregate")
raise MergeError(
"Uploaded data has a different schema than last aggregate",
filename=file_path,
)

    # concatenating in this way adds a new column we want to explicitly drop
    # from the final set
@@ -198,7 +208,6 @@ def merge_powersets(manager: S3Manager) -> None:
site_specific_name = get_s3_site_filename_suffix(last_valid_path)
subbucket_path = f"{manager.study}/{manager.data_package}/{site_specific_name}"
last_valid_site = site_specific_name.split("/", maxsplit=1)[0]

# If the latest uploads don't include this site, we'll use the last-valid
# one instead
try:
@@ -207,12 +216,11 @@ def merge_powersets(manager: S3Manager) -> None:
manager.update_local_metadata(
"last_uploaded_date", site=last_valid_site
)
except Exception as e: # pylint: disable=broad-except
except MergeError as e:
# This is expected to trigger if there's an issue in expand_and_concat_sets;
# this usually means there's a data problem.
manager.merge_error_handler(
last_valid_path,
BucketPath.LATEST,
e.filename,
subbucket_path,
e,
)
@@ -246,7 +254,6 @@ def merge_powersets(manager: S3Manager) -> None:
except Exception as e: # pylint: disable=broad-except
manager.merge_error_handler(
latest_path,
BucketPath.LATEST,
subbucket_path,
e,
)
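
The change above swaps the broad ValueError for a dedicated MergeError that carries the offending file's S3 path, so merge_error_handler no longer needs a separate bucket_path argument to relocate the file. Below is a minimal sketch of that pattern, not the repository's code: merge_one_file and move_to_error_bucket are hypothetical stand-ins for the handler's per-file merge step and its S3 move helper.

import logging
import traceback


class MergeError(ValueError):
    """ValueError that remembers which uploaded file triggered the failure."""

    def __init__(self, message, filename):
        super().__init__(message)
        self.filename = filename


def merge_one_file(file_path, move_to_error_bucket):
    # merge_one_file and move_to_error_bucket are hypothetical stand-ins.
    try:
        # The real handler reads and concatenates the parquet file here; this
        # sketch only simulates the empty-file failure mode.
        raise MergeError("Uploaded data file is empty", filename=file_path)
    except MergeError as exc:
        # The exception carries the failing path, so the caller does not have to
        # reconstruct it from a bucket path and subbucket path pair.
        logging.error("File %s failed to aggregate: %s", exc.filename, str(exc))
        logging.error(traceback.format_exc())
        move_to_error_bucket(exc.filename)
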
2 changes: 1 addition & 1 deletion template.yaml
@@ -145,7 +145,7 @@ Resources:
Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python39:1]
Handler: src/handlers/site_upload/powerset_merge.powerset_merge_handler
Runtime: python3.9
MemorySize: 4096
MemorySize: 8192
Timeout: 800
Description: Merges and aggregates powerset count data
Environment:
40 changes: 38 additions & 2 deletions tests/site_upload/test_powerset_merge.py
@@ -2,18 +2,23 @@
import io
import os

from contextlib import nullcontext as does_not_raise
from unittest import mock

import awswrangler
import pytest

from datetime import datetime, timezone
from pandas import DataFrame
from pandas import DataFrame, read_parquet
from freezegun import freeze_time

from src.handlers.shared.enums import BucketPath
from src.handlers.shared.functions import read_metadata, write_metadata
from src.handlers.site_upload.powerset_merge import powerset_merge_handler
from src.handlers.site_upload.powerset_merge import (
powerset_merge_handler,
expand_and_concat_sets,
MergeError,
)

from tests.utils import get_mock_metadata, TEST_BUCKET, ITEM_COUNT, MOCK_ENV

@@ -254,3 +259,34 @@ def test_powerset_merge_join_study_data(
else:
        assert len(agg_df["site"].unique()) == 3
assert errors == expected_errors


# Explicitly testing for errors raised during concat, since elsewhere they are
# appropriately handled by the generic error handler
@pytest.mark.parametrize(
"upload_file,load_empty,raises",
[
("./tests/test_data/count_synthea_patient.parquet", False, does_not_raise()),
(
"./tests/test_data/cube_simple_example.parquet",
False,
pytest.raises(MergeError),
),
(
"./tests/test_data/count_synthea_empty.parquet",
True,
pytest.raises(MergeError),
),
],
)
def test_expand_and_concat(mock_bucket, upload_file, load_empty, raises):
with raises:
df = read_parquet("./tests/test_data/count_synthea_patient_agg.parquet")
s3_path = f"/test/uploaded.parquet"
s3_client = boto3.client("s3", region_name="us-east-1")
s3_client.upload_file(
upload_file,
TEST_BUCKET,
s3_path,
)
expand_and_concat_sets(df, f"s3://{TEST_BUCKET}/{s3_path}", EXISTING_STUDY_NAME)
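
The new test relies on contextlib.nullcontext (imported as does_not_raise) so that the success case and the two MergeError cases can share a single parametrized body. Here is a small, self-contained sketch of that pytest pattern, using a hypothetical might_fail function rather than the repository's merge code.

from contextlib import nullcontext as does_not_raise

import pytest


def might_fail(value):
    # Hypothetical function used only to illustrate the parametrized pattern.
    if value < 0:
        raise ValueError("negative input")
    return value


@pytest.mark.parametrize(
    "value,raises",
    [
        (1, does_not_raise()),            # happy path: the context manager is a no-op
        (-1, pytest.raises(ValueError)),  # failure path: assert the exception type
    ],
)
def test_might_fail(value, raises):
    with raises:
        assert might_fail(value) == value
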
1 change: 1 addition & 0 deletions tests/test_data/count_synthea_empty.csv
@@ -0,0 +1 @@
cnt,gender,age,race_display
Binary file added tests/test_data/count_synthea_empty.parquet
