diff --git a/koku/masu/util/aws/common.py b/koku/masu/util/aws/common.py
index 1f4df87815..9d1abd0121 100644
--- a/koku/masu/util/aws/common.py
+++ b/koku/masu/util/aws/common.py
@@ -587,10 +587,13 @@ def get_or_clear_daily_s3_by_date(csv_s3_path, provider_uuid, start_date, end_da
     Fetches latest processed date based on daily csv files and clears relevant s3 files
     """
     # We do this if we have multiple workers running different files for a single manifest.
-    processing_date = ReportManifestDBAccessor().get_manifest_daily_start_date(manifest_id)
+    manifest_accessor = ReportManifestDBAccessor()
+    manifest = manifest_accessor.get_manifest_by_id(manifest_id)
+    processing_date = manifest_accessor.get_manifest_daily_start_date(manifest_id)
     if processing_date:
-        # Prevent other works running trino queries until all files are removed.
-        clear_s3_files(csv_s3_path, provider_uuid, processing_date, "manifestid", manifest_id, context, request_id)
+        if not manifest_accessor.get_s3_parquet_cleared(manifest):
+            # Prevent other workers running trino queries until all files are removed.
+            clear_s3_files(csv_s3_path, provider_uuid, processing_date, "manifestid", manifest_id, context, request_id)
         return processing_date
     processing_date = start_date
     try:
@@ -610,7 +613,7 @@ def get_or_clear_daily_s3_by_date(csv_s3_path, provider_uuid, start_date, end_da
                 process_date - datetime.timedelta(days=3) if process_date.day > 3 else process_date.replace(day=1)
             )
             # Set processing date for all workers
-            processing_date = ReportManifestDBAccessor().set_manifest_daily_start_date(manifest_id, processing_date)
+            processing_date = manifest_accessor.set_manifest_daily_start_date(manifest_id, processing_date)
             # Try to clear s3 files for dates. Small edge case, we may have parquet files even without csvs
             clear_s3_files(csv_s3_path, provider_uuid, processing_date, "manifestid", manifest_id, context, request_id)
     except (EndpointConnectionError, ClientError, AttributeError, ValueError):
@@ -626,7 +629,7 @@ def get_or_clear_daily_s3_by_date(csv_s3_path, provider_uuid, start_date, end_da
                 bucket=settings.S3_BUCKET_NAME,
             ),
         )
-        processing_date = ReportManifestDBAccessor().set_manifest_daily_start_date(manifest_id, processing_date)
+        processing_date = manifest_accessor.set_manifest_daily_start_date(manifest_id, processing_date)
     to_delete = get_s3_objects_not_matching_metadata(
         request_id,
         csv_s3_path,
@@ -635,8 +638,7 @@ def get_or_clear_daily_s3_by_date(csv_s3_path, provider_uuid, start_date, end_da
         context=context,
     )
     delete_s3_objects(request_id, to_delete, context)
-    manifest = ReportManifestDBAccessor().get_manifest_by_id(manifest_id)
-    ReportManifestDBAccessor().mark_s3_csv_cleared(manifest)
+    manifest_accessor.mark_s3_csv_cleared(manifest)
     LOG.info(
         log_json(msg="removed csv files, marked manifest csv cleared and parquet not cleared", context=context)
     )
@@ -853,22 +855,11 @@ def clear_s3_files(
             s3_prefixes.append(parquet_ocp_on_cloud_path_s3 + path)
     to_delete = []
     for prefix in s3_prefixes:
-        for obj_summary in _get_s3_objects(prefix):
-            try:
-                existing_object = obj_summary.Object()
-                metadata_value = existing_object.metadata.get(metadata_key)
-                if str(metadata_value) != str(manifest_id):
-                    to_delete.append(existing_object.key)
-            except (ClientError) as err:
-                LOG.warning(
-                    log_json(
-                        request_id,
-                        msg="unable to get matching object, likely deleted by another worker",
-                        context=context,
-                        bucket=settings.S3_BUCKET_NAME,
-                    ),
-                    exc_info=err,
-                )
+        to_delete.extend(
+            get_s3_objects_not_matching_metadata(
+                request_id, prefix, metadata_key=metadata_key, metadata_value_check=str(manifest_id), context=context
+            )
+        )
     delete_s3_objects(request_id, to_delete, context)
     manifest_accessor = ReportManifestDBAccessor()
     manifest = manifest_accessor.get_manifest_by_id(manifest_id)
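
The clear_s3_files hunk replaces a hand-rolled listing-and-metadata loop with the same get_s3_objects_not_matching_metadata helper that get_or_clear_daily_s3_by_date already calls, so both call sites filter S3 objects by manifest metadata in one place. Below is a minimal sketch of what such a helper could look like, assuming boto3 and a hypothetical bucket name; the function name and keyword arguments follow the diff, but the body, the keyword-only signature, and the bucket constant are illustrative assumptions, not koku's actual implementation.

# Illustrative sketch only: mirrors the call shape used in the diff, not koku's real helper.
import logging

import boto3
from botocore.exceptions import ClientError

LOG = logging.getLogger(__name__)
S3_BUCKET_NAME = "hypothetical-cost-usage-bucket"  # assumed stand-in for settings.S3_BUCKET_NAME


def get_s3_objects_not_matching_metadata(request_id, prefix, *, metadata_key, metadata_value_check, context=None):
    """Return keys under ``prefix`` whose ``metadata_key`` does not equal ``metadata_value_check``."""
    s3_resource = boto3.resource("s3")
    keys = []
    for obj_summary in s3_resource.Bucket(S3_BUCKET_NAME).objects.filter(Prefix=prefix):
        try:
            # User metadata is only available on the full Object, not the ObjectSummary.
            if str(obj_summary.Object().metadata.get(metadata_key)) != str(metadata_value_check):
                keys.append(obj_summary.key)
        except ClientError:
            # Another worker may have deleted the object between listing it and fetching it.
            LOG.warning("unable to get matching object, likely deleted by another worker", exc_info=True)
    return keys

With a helper of this shape, clear_s3_files only has to extend to_delete with the returned keys for each prefix and pass the result to delete_s3_objects, which is exactly what the added lines in the last hunk do.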