[COST-5213] - fix S3 prepare (#5194)
* Switch the default of the parquet-cleared flag so workers stop iterating over all files when there is nothing to delete
lcouzens committed Jun 28, 2024
1 parent 4eddf8e commit a42cf32
Showing 8 changed files with 39 additions and 35 deletions.
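
In outline, the fix inverts the lifecycle of the s3_parquet_cleared flag: instead of defaulting to True and being flipped back to False whenever a full month of files must be reprocessed, it now defaults to False and is only set once the parquet files for a manifest have actually been cleared. A minimal sketch of the resulting flow (accessor and helper names are taken from this diff; the worker wiring and the stale_parquet_keys variable are assumptions):

    accessor = ReportManifestDBAccessor()
    manifest = accessor.get_manifest_by_id(manifest_id)
    if not accessor.get_s3_parquet_cleared(manifest):
        # The first worker to get here deletes stale parquet files once.
        delete_s3_objects(request_id, stale_parquet_keys, context)  # stale_parquet_keys is hypothetical
        accessor.mark_s3_parquet_cleared(manifest)
    # Every later worker sees the flag already set and skips listing S3 entirely.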
8 changes: 0 additions & 8 deletions koku/masu/database/report_manifest_db_accessor.py
@@ -245,14 +245,6 @@ def mark_s3_parquet_cleared(self, manifest: CostUsageReportManifest, report_key:
         update_fields = ["s3_parquet_cleared"]
         manifest.save(update_fields=update_fields)
 
-    def mark_s3_parquet_to_be_cleared(self, manifest_id):
-        """Mark manifest to clear parquet files."""
-        manifest = self.get_manifest_by_id(manifest_id)
-        if manifest:
-            # Set this to false to reprocesses a full month of files for AWS/Azure
-            manifest.s3_parquet_cleared = False
-            manifest.save(update_fields=["s3_parquet_cleared"])
-
     def set_manifest_daily_start_date(self, manifest_id, date):
         """
         Mark manifest processing daily archive start date.
1 change: 0 additions & 1 deletion koku/masu/external/downloader/aws/aws_report_downloader.py
@@ -82,7 +82,6 @@ def get_processing_date(
         if (
             data_frame[invoice_bill].any() and start_date.month != DateHelper().now_utc.month or ingress_reports
         ) or not check_provider_setup_complete(provider_uuid):
-            ReportManifestDBAccessor().mark_s3_parquet_to_be_cleared(manifest_id)
             process_date = ReportManifestDBAccessor().set_manifest_daily_start_date(manifest_id, start_date)
         else:
            process_date = utils.get_or_clear_daily_s3_by_date(
@@ -68,7 +68,6 @@ def get_processing_date(
             or ingress_reports
         ):
             process_date = start_date
-            ReportManifestDBAccessor().mark_s3_parquet_to_be_cleared(manifest_id)
             process_date = ReportManifestDBAccessor().set_manifest_daily_start_date(manifest_id, process_date)
         else:
             process_date = get_or_clear_daily_s3_by_date(
26 changes: 7 additions & 19 deletions koku/masu/test/database/test_report_manifest_db_accessor.py
@@ -182,33 +182,21 @@ def test_get_s3_parquet_cleared_no_manifest(self):
         status = self.manifest_accessor.get_s3_parquet_cleared(None)
         self.assertFalse(status)
 
-    def test_get_s3_parquet_cleared_non_ocp(self):
-        """Test that s3 CSV clear status is reported."""
-        status = self.manifest_accessor.get_s3_parquet_cleared(self.manifest)
-        self.assertTrue(status)
-
-        self.manifest_accessor.mark_s3_parquet_to_be_cleared(self.manifest.id)
-        fetch_manifest = self.manifest_accessor.get_manifest_by_id(self.manifest.id)
-
-        status = self.manifest_accessor.get_s3_parquet_cleared(fetch_manifest)
-        self.assertFalse(status)
-
     def test_get_s3_parquet_cleared_ocp_no_key(self):
         """Test that s3 CSV clear status is reported."""
         self.manifest_dict["cluster_id"] = "cluster_id"
         self.manifest_dict["assembly_id"] = uuid.uuid4()
         manifest = self.baker.make("CostUsageReportManifest", **self.manifest_dict)
         status = self.manifest_accessor.get_s3_parquet_cleared(manifest)
-        self.assertTrue(status)
+        self.assertFalse(status)
 
-        self.manifest_accessor.mark_s3_parquet_to_be_cleared(manifest.id)
-        fetch_manifest = self.manifest_accessor.get_manifest_by_id(manifest.id)
+        self.manifest_accessor.mark_s3_parquet_cleared(manifest)
 
-        self.assertDictEqual(fetch_manifest.s3_parquet_cleared_tracker, {})
-        self.assertFalse(fetch_manifest.s3_parquet_cleared)
+        self.assertDictEqual(manifest.s3_parquet_cleared_tracker, {})
+        self.assertTrue(manifest.s3_parquet_cleared)
 
-        status = self.manifest_accessor.get_s3_parquet_cleared(fetch_manifest)
-        self.assertFalse(status)
+        status = self.manifest_accessor.get_s3_parquet_cleared(manifest)
+        self.assertTrue(status)
 
     def test_get_s3_parquet_cleared_ocp_with_key(self):
         """Test that s3 CSV clear status is reported."""
@@ -222,7 +210,7 @@ def test_get_s3_parquet_cleared_ocp_with_key(self):
         self.manifest_accessor.mark_s3_parquet_cleared(manifest, key)
 
         self.assertDictEqual(manifest.s3_parquet_cleared_tracker, {key: True})
-        self.assertTrue(manifest.s3_parquet_cleared)
+        self.assertFalse(manifest.s3_parquet_cleared)
 
         status = self.manifest_accessor.get_s3_parquet_cleared(manifest, key)
         self.assertTrue(status)
@@ -275,8 +275,11 @@ def test_convert_to_parquet(self, mock_remove, mock_exists):
         with patch(
             "masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.report_type", return_value=None
         ):
-            with self.assertRaises(ParquetReportProcessorError):
-                self.report_processor_ocp.convert_to_parquet()
+            with patch(
+                "masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.prepare_parquet_s3"
+            ):
+                with self.assertRaises(ParquetReportProcessorError):
+                    self.report_processor_ocp.convert_to_parquet()
 
         expected = "no split files to convert to parquet"
         with patch("masu.processor.parquet.parquet_report_processor.get_path_prefix", return_value=""), patch.object(
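
The extra patch is needed, presumably, because manifests now start with s3_parquet_cleared=False, so convert_to_parquet's S3 preparation step would otherwise attempt a real clear inside a unit test. The nesting can also be flattened into a single with statement (a sketch, same patch targets as above):

    with patch(
        "masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.report_type", return_value=None
    ), patch("masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.prepare_parquet_s3"):
        with self.assertRaises(ParquetReportProcessorError):
            self.report_processor_ocp.convert_to_parquet()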
11 changes: 8 additions & 3 deletions koku/masu/util/aws/common.py
@@ -637,7 +637,6 @@ def get_or_clear_daily_s3_by_date(csv_s3_path, provider_uuid, start_date, end_date
         delete_s3_objects(request_id, to_delete, context)
         manifest = ReportManifestDBAccessor().get_manifest_by_id(manifest_id)
         ReportManifestDBAccessor().mark_s3_csv_cleared(manifest)
-        ReportManifestDBAccessor().mark_s3_parquet_to_be_cleared(manifest_id)
         LOG.info(
             log_json(msg="removed csv files, marked manifest csv cleared and parquet not cleared", context=context)
         )
@@ -823,7 +822,7 @@ def delete_s3_objects(request_id, keys_to_delete, context) -> list[str]:
 
 
 def clear_s3_files(
-    csv_s3_path, provider_uuid, start_date, metadata_key, metadata_value_check, context, request_id, invoice_month=None
+    csv_s3_path, provider_uuid, start_date, metadata_key, manifest_id, context, request_id, invoice_month=None
 ):
     """Clear s3 files for daily archive processing"""
     account = context.get("account")
@@ -858,7 +857,7 @@
         try:
             existing_object = obj_summary.Object()
             metadata_value = existing_object.metadata.get(metadata_key)
-            if str(metadata_value) != str(metadata_value_check):
+            if str(metadata_value) != str(manifest_id):
                 to_delete.append(existing_object.key)
         except (ClientError) as err:
             LOG.warning(
@@ -871,6 +870,12 @@
                 exc_info=err,
             )
     delete_s3_objects(request_id, to_delete, context)
+    manifest_accessor = ReportManifestDBAccessor()
+    manifest = manifest_accessor.get_manifest_by_id(manifest_id)
+    # Note: Marking the parquet files cleared here prevents all
+    # the parquet files for the manifest from being deleted
+    # later on in report_parquet_processor
+    manifest_accessor.mark_s3_parquet_cleared(manifest)
 
 
 def remove_files_not_in_set_from_s3_bucket(request_id, s3_path, manifest_id, context=None):
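
For reference, the metadata guard in clear_s3_files keeps any object whose stored metadata value matches the current manifest id and queues everything else for deletion. A minimal standalone sketch with boto3, assuming csv_s3_path, metadata_key, and manifest_id are bound as in the function above (the bucket name here is hypothetical):

    import boto3

    s3_resource = boto3.resource("s3")
    bucket = s3_resource.Bucket("koku-report-bucket")  # hypothetical bucket name
    to_delete = []
    for obj_summary in bucket.objects.filter(Prefix=csv_s3_path):
        existing_object = obj_summary.Object()  # HeadObject call; exposes .metadata
        if str(existing_object.metadata.get(metadata_key)) != str(manifest_id):
            to_delete.append(existing_object.key)  # left over from an older manifest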
@@ -0,0 +1,18 @@
+# Generated by Django 4.2.11 on 2024-06-28 12:00
+from django.db import migrations
+from django.db import models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("reporting_common", "0041_diskcapacity"),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name="costusagereportmanifest",
+            name="s3_parquet_cleared",
+            field=models.BooleanField(default=False, null=True),
+        ),
+    ]
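
One caveat: a Django field default is applied in Python when an instance is constructed, not enforced by the database, so this migration does not rewrite existing rows; only manifests created after it lands start out not-cleared. A quick check (sketch, run from a Django shell):

    from reporting_common.models import CostUsageReportManifest

    manifest = CostUsageReportManifest()
    assert manifest.s3_parquet_cleared is False  # the new default from this commit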
2 changes: 1 addition & 1 deletion koku/reporting_common/models.py
@@ -43,7 +43,7 @@ class Meta:
     # s3_csv_cleared used in AWS/Azure to indicate csv's have been cleared for daily archive processing
     s3_csv_cleared = models.BooleanField(default=False, null=True)
     # s3_parquet_cleared used to indicate parquet files have been cleared prior to csv to parquet conversion
-    s3_parquet_cleared = models.BooleanField(default=True, null=True)
+    s3_parquet_cleared = models.BooleanField(default=False, null=True)
     # Indicates what initial date to start at for daily processing
     daily_archive_start_date = models.DateTimeField(null=True)
     operator_version = models.TextField(null=True)
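
As the updated tests above show, the boolean and the per-file tracker serve different provider types: calling mark_s3_parquet_cleared without a report key flips the boolean (the AWS/Azure path), while calling it with a key records only that key in s3_parquet_cleared_tracker and leaves the boolean False (the OCP path). A sketch of the distinction (names from this diff):

    accessor = ReportManifestDBAccessor()
    accessor.mark_s3_parquet_cleared(manifest)       # no key: sets s3_parquet_cleared = True
    accessor.mark_s3_parquet_cleared(manifest, key)  # with key: tracker[key] = True, boolean stays False
    accessor.get_s3_parquet_cleared(manifest, key)   # consults the tracker entry for that key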
