Study period detection #68

Merged
merged 6 commits on Apr 13, 2023
1 change: 0 additions & 1 deletion pyproject.toml
@@ -47,5 +47,4 @@ dev = [
"pre-commit",
"pylint",
"pycodestyle"

]
31 changes: 31 additions & 0 deletions src/handlers/shared/awswrangler_functions.py
@@ -0,0 +1,31 @@
""" functions specifically requiring AWSWranger, which requires a lambda layer"""
import awswrangler


def get_s3_data_package_list(
bucket_root: str,
s3_bucket_name: str,
study: str,
data_package: str,
extension: str = "parquet",
):
"""Retrieves a list of data packages for a given S3 path post-upload proceesing"""
return awswrangler.s3.list_objects(
path=f"s3://{s3_bucket_name}/{bucket_root}/{study}/{data_package}",
suffix=extension,
)


def get_s3_study_meta_list(
bucket_root: str,
s3_bucket_name: str,
study: str,
data_package: str,
site: str,
extension: str = "parquet",
):
"""Retrieves a list of data packages for a given S3 path post-upload proceesing"""
return awswrangler.s3.list_objects(
path=f"s3://{s3_bucket_name}/{bucket_root}/{study}/{data_package}/{site}",
suffix=extension,
)
12 changes: 11 additions & 1 deletion src/handlers/shared/enums.py
@@ -3,12 +3,22 @@


class BucketPath(Enum):
"""stores root level buckets for managing data processing state"""

ADMIN = "admin"
AGGREGATE = "aggregates"
ARCHIVE = "archive"
CSVAGGREGATE = "csv_aggregates"
ERROR = "error"
LAST_VALID = "last_valid"
LATEST = "latest"
META = "site_metadata"
META = "metadata"
STUDY_META = "study_metadata"
UPLOAD = "site_upload"


class JsonDict(Enum):
"""stores names of expected kinds of S3 json dictionaries"""

TRANSACTIONS = "transactions"
STUDY_PERIODS = "study_periods"
69 changes: 52 additions & 17 deletions src/handlers/shared/functions.py
@@ -8,19 +8,22 @@

import boto3

from src.handlers.shared.enums import BucketPath
from src.handlers.shared.enums import BucketPath, JsonDict

META_PATH = f"{BucketPath.META.value}/transactions.json"
METADATA_TEMPLATE = {
TRANSACTION_METADATA_TEMPLATE = {
"version": "1.0",
"last_upload": None,
"last_data_update": None,
"last_aggregation": None,
"last_error": None,
"earliest_data": None,
"latest_data": None,
"deleted": None,
}
STUDY_PERIOD_METADATA_TEMPLATE = {
"version": "1.0",
"earliest_date": None,
"latest_date": None,
"last_data_update": None,
}
Comment on lines +21 to +26 (Contributor Author):
explaining the business semantics here: a study period encapsulates all exports associated with a study, so there's only one date range element per study topic. This gets us around 'not every count table will have a date column in it'.
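For illustration, the study_periods.json dictionary built from this template would be shaped roughly like the sketch below; the site name, study name, and dates are hypothetical placeholders, not values from this PR:

# hypothetical example, not data from this PR
{
    "site_a": {
        "study_1": {
            "version": "1.0",
            "earliest_date": "2020-01-01T00:00:00",
            "latest_date": "2023-03-31T00:00:00",
            "last_data_update": "2023-04-13T12:00:00+00:00",
        }
    }
}

Note the single date-range entry per site/study pair, rather than one entry per data package as in the transactions dictionary.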



def http_response(status: int, body: str) -> Dict:
@@ -33,15 +36,27 @@ def http_response(status: int, body: str) -> Dict:
}


# metadata processing
# S3 json processing


def read_metadata(s3_client, s3_bucket_name: str) -> Dict:
def check_meta_type(meta_type: str) -> None:
"""helper for ensuring specified metadata types"""
types = [item.value for item in JsonDict]
if meta_type not in types:
raise ValueError("invalid metadata type specified")


def read_metadata(
s3_client, s3_bucket_name: str, meta_type: str = JsonDict.TRANSACTIONS.value
) -> Dict:
"""Reads transaction information from an s3 bucket as a dictionary"""
res = s3_client.list_objects_v2(Bucket=s3_bucket_name, Prefix=META_PATH)
check_meta_type(meta_type)
s3_path = f"{BucketPath.META.value}/{meta_type}.json"
res = s3_client.list_objects_v2(Bucket=s3_bucket_name, Prefix=s3_path)
if "Contents" in res:
res = s3_client.get_object(Bucket=s3_bucket_name, Key=META_PATH)
return json.loads(res["Body"].read())
res = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_path)
doc = res["Body"].read()
return json.loads(doc)
else:
return {}

@@ -53,20 +68,40 @@ def update_metadata(
data_package: str,
target: str,
dt: Optional[datetime] = None,
meta_type: str = JsonDict.TRANSACTIONS.value,
):
"""Safely updates items in metadata dictionary"""
site_metadata = metadata.setdefault(site, {})
study_metadata = site_metadata.setdefault(study, {})
data_package_metadata = study_metadata.setdefault(data_package, METADATA_TEMPLATE)
dt = dt or datetime.now(timezone.utc)
data_package_metadata[target] = dt.isoformat()
check_meta_type(meta_type)
if meta_type == JsonDict.TRANSACTIONS.value:
site_metadata = metadata.setdefault(site, {})
study_metadata = site_metadata.setdefault(study, {})
data_package_metadata = study_metadata.setdefault(
data_package, TRANSACTION_METADATA_TEMPLATE
)
dt = dt or datetime.now(timezone.utc)
data_package_metadata[target] = dt.isoformat()
elif meta_type == JsonDict.STUDY_PERIODS.value:
site_metadata = metadata.setdefault(site, {})
study_period_metadata = site_metadata.setdefault(
study, STUDY_PERIOD_METADATA_TEMPLATE
)
dt = dt or datetime.now(timezone.utc)
study_period_metadata[target] = dt.isoformat()
return metadata


def write_metadata(s3_client, s3_bucket_name: str, metadata: Dict) -> None:
def write_metadata(
s3_client,
s3_bucket_name: str,
metadata: Dict,
meta_type: str = JsonDict.TRANSACTIONS.value,
) -> None:
"""Writes transaction info from ∏a dictionary to an s3 bucket metadata location"""
check_meta_type(meta_type)
s3_client.put_object(
Bucket=s3_bucket_name, Key=META_PATH, Body=json.dumps(metadata)
Bucket=s3_bucket_name,
Key=f"{BucketPath.META.value}/{meta_type}.json",
Body=json.dumps(metadata),
)


17 changes: 1 addition & 16 deletions src/handlers/site_upload/powerset_merge.py
@@ -13,6 +13,7 @@

from src.handlers.shared.decorators import generic_error_handler
from src.handlers.shared.enums import BucketPath
from src.handlers.shared.awswrangler_functions import get_s3_data_package_list
from src.handlers.shared.functions import (
get_s3_site_filename_suffix,
http_response,
@@ -23,22 +24,6 @@
)


def get_s3_data_package_list(
bucket_root: str,
s3_bucket_name: str,
study: str,
data_package: str,
extension: str = "parquet",
):
"""Retrieves a list of all data packages for a given S3 path"""
# TODO: this may need to be moved to a shared location at some point - it would
# need to be a new one for just AWSWrangler-enabled lambdas.
return awswrangler.s3.list_objects(
path=f"s3://{s3_bucket_name}/{bucket_root}/{study}/{data_package}",
suffix=extension,
)


def concat_sets(df: pandas.DataFrame, file_path: str) -> pandas.DataFrame:
"""concats a count dataset in a specified S3 location with in memory dataframe"""
site_df = awswrangler.s3.read_parquet(file_path)
26 changes: 14 additions & 12 deletions src/handlers/site_upload/process_upload.py
@@ -18,10 +18,8 @@ class UnexpectedFileTypeError(Exception):
pass


def process_upload(
s3_client, sns_client, s3_bucket_name: str, s3_key: str, topic_sns_arn: str
) -> None:
"""Moves file from upload path to powerset generation path"""
def process_upload(s3_client, sns_client, s3_bucket_name: str, s3_key: str) -> None:
"""Moves file from upload path to appropriate subfolder and emits SNS event"""
last_uploaded_date = s3_client.head_object(Bucket=s3_bucket_name, Key=s3_key)[
"LastModified"
]
@@ -31,19 +29,24 @@
data_package = path_params[2]
site = path_params[3]
if s3_key.endswith(".parquet"):
new_key = f"{BucketPath.LATEST.value}/{s3_key.split('/', 1)[-1]}"
if "_meta_" in s3_key:
new_key = f"{BucketPath.STUDY_META.value}/{s3_key.split('/', 1)[-1]}"
topic_sns_arn = os.environ.get("TOPIC_PROCESS_STUDY_META_ARN")
sns_subject = "Process study medata upload event"
else:
new_key = f"{BucketPath.LATEST.value}/{s3_key.split('/', 1)[-1]}"
topic_sns_arn = os.environ.get("TOPIC_PROCESS_COUNTS_ARN")
sns_subject = "Process counts upload event"
move_s3_file(s3_client, s3_bucket_name, s3_key, new_key)
metadata = update_metadata(
metadata,
site,
study,
data_package,
"last_uploaded_date",
"last_upload",
last_uploaded_date,
)
sns_client.publish(
TopicArn=topic_sns_arn, Message=new_key, Subject="Process Upload Event"
)
sns_client.publish(TopicArn=topic_sns_arn, Message=new_key, Subject=sns_subject)
write_metadata(s3_client, s3_bucket_name, metadata)
else:
new_key = f"{BucketPath.ERROR.value}/{s3_key.split('/', 1)[-1]}"
@@ -53,7 +56,7 @@
site,
study,
data_package,
"last_uploaded_date",
"last_upload",
last_uploaded_date,
)
metadata = update_metadata(
Expand All @@ -68,10 +71,9 @@ def process_upload_handler(event, context):
"""manages event from S3, triggers file processing and merge"""
del context
s3_bucket = os.environ.get("BUCKET_NAME")
topic_sns_arn = os.environ.get("TOPIC_PROCESS_UPLOAD_ARN")
s3_client = boto3.client("s3")
sns_client = boto3.client("sns")
s3_key = event["Records"][0]["s3"]["object"]["key"]
process_upload(s3_client, sns_client, s3_bucket, s3_key, topic_sns_arn)
process_upload(s3_client, sns_client, s3_bucket, s3_key)
res = http_response(200, "Upload processing successful")
return res
78 changes: 78 additions & 0 deletions src/handlers/site_upload/study_period.py
@@ -0,0 +1,78 @@
""" Lambda for updating date ranges associated with studies """

import os

from datetime import datetime, timezone

import awswrangler
import boto3

from src.handlers.shared.decorators import generic_error_handler
from src.handlers.shared.enums import BucketPath, JsonDict
from src.handlers.shared.awswrangler_functions import get_s3_study_meta_list
from src.handlers.shared.functions import (
http_response,
read_metadata,
update_metadata,
write_metadata,
)


def update_study_period(s3_client, s3_bucket, site, study, data_package):
"""gets earliest/latest date from study metadata files"""
path = get_s3_study_meta_list(
BucketPath.STUDY_META.value, s3_bucket, study, data_package, site
)
if len(path) != 1:
raise KeyError("Unique date path not found")
df = awswrangler.s3.read_parquet(path[0])
study_meta = read_metadata(
s3_client, s3_bucket, meta_type=JsonDict.STUDY_PERIODS.value
)
study_meta = update_metadata(
study_meta,
site,
study,
data_package,
"earliest_date",
df["min_date"][0],
meta_type=JsonDict.STUDY_PERIODS.value,
)
study_meta = update_metadata(
study_meta,
site,
study,
data_package,
"latest_date",
df["max_date"][0],
meta_type=JsonDict.STUDY_PERIODS.value,
)
study_meta = update_metadata(
study_meta,
site,
study,
data_package,
"last_data_update",
datetime.now(timezone.utc),
meta_type=JsonDict.STUDY_PERIODS.value,
)
write_metadata(
s3_client, s3_bucket, study_meta, meta_type=JsonDict.STUDY_PERIODS.value
)


@generic_error_handler(msg="Error updating study period")
def study_period_handler(event, context):
"""manages event from SNS, triggers file processing and merge"""
del context
s3_bucket = os.environ.get("BUCKET_NAME")
s3_client = boto3.client("s3")
s3_key = event["Records"][0]["Sns"]["Message"]
s3_key_array = s3_key.split("/")
site = s3_key_array[3]
study = s3_key_array[1]
data_package = s3_key_array[2]

update_study_period(s3_client, s3_bucket, site, study, data_package)
res = http_response(200, "Study period update successful")
return res