Skip to content

Commit

Permalink
[COST-5226] - Skip S3 delete (daily flow) if we have marked deletion …
Browse files Browse the repository at this point in the history
…complete. (#5198)

* Don't attempt more S3 deletes if we have marked deletion complete
  • Loading branch information
lcouzens committed Jul 2, 2024
1 parent 066faf6 commit 0507abb
Showing 1 changed file with 14 additions and 23 deletions.
37 changes: 14 additions & 23 deletions koku/masu/util/aws/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,10 +587,13 @@ def get_or_clear_daily_s3_by_date(csv_s3_path, provider_uuid, start_date, end_da
Fetches latest processed date based on daily csv files and clears relevant s3 files
"""
# We do this if we have multiple workers running different files for a single manifest.
processing_date = ReportManifestDBAccessor().get_manifest_daily_start_date(manifest_id)
manifest_accessor = ReportManifestDBAccessor()
manifest = manifest_accessor.get_manifest_by_id(manifest_id)
processing_date = manifest_accessor.get_manifest_daily_start_date(manifest_id)
if processing_date:
# Prevent other workers from running Trino queries until all files are removed.
clear_s3_files(csv_s3_path, provider_uuid, processing_date, "manifestid", manifest_id, context, request_id)
if not manifest_accessor.get_s3_parquet_cleared(manifest):
# Prevent other workers from running Trino queries until all files are removed.
clear_s3_files(csv_s3_path, provider_uuid, processing_date, "manifestid", manifest_id, context, request_id)
return processing_date
processing_date = start_date
try:
Expand All @@ -610,7 +613,7 @@ def get_or_clear_daily_s3_by_date(csv_s3_path, provider_uuid, start_date, end_da
process_date - datetime.timedelta(days=3) if process_date.day > 3 else process_date.replace(day=1)
)
# Set processing date for all workers
processing_date = ReportManifestDBAccessor().set_manifest_daily_start_date(manifest_id, processing_date)
processing_date = manifest_accessor.set_manifest_daily_start_date(manifest_id, processing_date)
# Try to clear s3 files for dates. Small edge case, we may have parquet files even without csvs
clear_s3_files(csv_s3_path, provider_uuid, processing_date, "manifestid", manifest_id, context, request_id)
except (EndpointConnectionError, ClientError, AttributeError, ValueError):
Expand All @@ -626,7 +629,7 @@ def get_or_clear_daily_s3_by_date(csv_s3_path, provider_uuid, start_date, end_da
bucket=settings.S3_BUCKET_NAME,
),
)
processing_date = ReportManifestDBAccessor().set_manifest_daily_start_date(manifest_id, processing_date)
processing_date = manifest_accessor.set_manifest_daily_start_date(manifest_id, processing_date)
to_delete = get_s3_objects_not_matching_metadata(
request_id,
csv_s3_path,
Expand All @@ -635,8 +638,7 @@ def get_or_clear_daily_s3_by_date(csv_s3_path, provider_uuid, start_date, end_da
context=context,
)
delete_s3_objects(request_id, to_delete, context)
manifest = ReportManifestDBAccessor().get_manifest_by_id(manifest_id)
ReportManifestDBAccessor().mark_s3_csv_cleared(manifest)
manifest_accessor.mark_s3_csv_cleared(manifest)
LOG.info(
log_json(msg="removed csv files, marked manifest csv cleared and parquet not cleared", context=context)
)
Expand Down Expand Up @@ -853,22 +855,11 @@ def clear_s3_files(
s3_prefixes.append(parquet_ocp_on_cloud_path_s3 + path)
to_delete = []
for prefix in s3_prefixes:
for obj_summary in _get_s3_objects(prefix):
try:
existing_object = obj_summary.Object()
metadata_value = existing_object.metadata.get(metadata_key)
if str(metadata_value) != str(manifest_id):
to_delete.append(existing_object.key)
except (ClientError) as err:
LOG.warning(
log_json(
request_id,
msg="unable to get matching object, likely deleted by another worker",
context=context,
bucket=settings.S3_BUCKET_NAME,
),
exc_info=err,
)
to_delete.extend(
get_s3_objects_not_matching_metadata(
request_id, prefix, metadata_key=metadata_key, metadata_value_check=str(manifest_id), context=context
)
)
delete_s3_objects(request_id, to_delete, context)
manifest_accessor = ReportManifestDBAccessor()
manifest = manifest_accessor.get_manifest_by_id(manifest_id)
Expand Down

0 comments on commit 0507abb

Please sign in to comment.