diff --git a/koku/masu/database/report_manifest_db_accessor.py b/koku/masu/database/report_manifest_db_accessor.py
index b4c89261d1..d17c323d4d 100644
--- a/koku/masu/database/report_manifest_db_accessor.py
+++ b/koku/masu/database/report_manifest_db_accessor.py
@@ -245,14 +245,6 @@ def mark_s3_parquet_cleared(self, manifest: CostUsageReportManifest, report_key:
         update_fields = ["s3_parquet_cleared"]
         manifest.save(update_fields=update_fields)
 
-    def mark_s3_parquet_to_be_cleared(self, manifest_id):
-        """Mark manifest to clear parquet files."""
-        manifest = self.get_manifest_by_id(manifest_id)
-        if manifest:
-            # Set this to false to reprocesses a full month of files for AWS/Azure
-            manifest.s3_parquet_cleared = False
-            manifest.save(update_fields=["s3_parquet_cleared"])
-
     def set_manifest_daily_start_date(self, manifest_id, date):
         """
         Mark manifest processing daily archive start date.
diff --git a/koku/masu/external/downloader/aws/aws_report_downloader.py b/koku/masu/external/downloader/aws/aws_report_downloader.py
index 7fd5ff2682..84933f5e47 100644
--- a/koku/masu/external/downloader/aws/aws_report_downloader.py
+++ b/koku/masu/external/downloader/aws/aws_report_downloader.py
@@ -82,7 +82,6 @@ def get_processing_date(
     if (
         data_frame[invoice_bill].any() and start_date.month != DateHelper().now_utc.month or ingress_reports
     ) or not check_provider_setup_complete(provider_uuid):
-        ReportManifestDBAccessor().mark_s3_parquet_to_be_cleared(manifest_id)
         process_date = ReportManifestDBAccessor().set_manifest_daily_start_date(manifest_id, start_date)
     else:
         process_date = utils.get_or_clear_daily_s3_by_date(
diff --git a/koku/masu/external/downloader/azure/azure_report_downloader.py b/koku/masu/external/downloader/azure/azure_report_downloader.py
index 900060c153..aa5796979d 100644
--- a/koku/masu/external/downloader/azure/azure_report_downloader.py
+++ b/koku/masu/external/downloader/azure/azure_report_downloader.py
@@ -68,7 +68,6 @@ def get_processing_date(
         or ingress_reports
     ):
         process_date = start_date
-        ReportManifestDBAccessor().mark_s3_parquet_to_be_cleared(manifest_id)
         process_date = ReportManifestDBAccessor().set_manifest_daily_start_date(manifest_id, process_date)
     else:
         process_date = get_or_clear_daily_s3_by_date(
diff --git a/koku/masu/test/database/test_report_manifest_db_accessor.py b/koku/masu/test/database/test_report_manifest_db_accessor.py
index 0a777630df..87d0f12e87 100644
--- a/koku/masu/test/database/test_report_manifest_db_accessor.py
+++ b/koku/masu/test/database/test_report_manifest_db_accessor.py
@@ -182,33 +182,21 @@ def test_get_s3_parquet_cleared_no_manifest(self):
         status = self.manifest_accessor.get_s3_parquet_cleared(None)
         self.assertFalse(status)
 
-    def test_get_s3_parquet_cleared_non_ocp(self):
-        """Test that s3 CSV clear status is reported."""
-        status = self.manifest_accessor.get_s3_parquet_cleared(self.manifest)
-        self.assertTrue(status)
-
-        self.manifest_accessor.mark_s3_parquet_to_be_cleared(self.manifest.id)
-        fetch_manifest = self.manifest_accessor.get_manifest_by_id(self.manifest.id)
-
-        status = self.manifest_accessor.get_s3_parquet_cleared(fetch_manifest)
-        self.assertFalse(status)
-
     def test_get_s3_parquet_cleared_ocp_no_key(self):
         """Test that s3 CSV clear status is reported."""
         self.manifest_dict["cluster_id"] = "cluster_id"
         self.manifest_dict["assembly_id"] = uuid.uuid4()
         manifest = self.baker.make("CostUsageReportManifest", **self.manifest_dict)
         status = self.manifest_accessor.get_s3_parquet_cleared(manifest)
-        self.assertTrue(status)
+        self.assertFalse(status)
 
-        self.manifest_accessor.mark_s3_parquet_to_be_cleared(manifest.id)
-        fetch_manifest = self.manifest_accessor.get_manifest_by_id(manifest.id)
+        self.manifest_accessor.mark_s3_parquet_cleared(manifest)
 
-        self.assertDictEqual(fetch_manifest.s3_parquet_cleared_tracker, {})
-        self.assertFalse(fetch_manifest.s3_parquet_cleared)
+        self.assertDictEqual(manifest.s3_parquet_cleared_tracker, {})
+        self.assertTrue(manifest.s3_parquet_cleared)
 
-        status = self.manifest_accessor.get_s3_parquet_cleared(fetch_manifest)
-        self.assertFalse(status)
+        status = self.manifest_accessor.get_s3_parquet_cleared(manifest)
+        self.assertTrue(status)
 
     def test_get_s3_parquet_cleared_ocp_with_key(self):
         """Test that s3 CSV clear status is reported."""
@@ -222,7 +210,7 @@ def test_get_s3_parquet_cleared_ocp_with_key(self):
         self.manifest_accessor.mark_s3_parquet_cleared(manifest, key)
 
         self.assertDictEqual(manifest.s3_parquet_cleared_tracker, {key: True})
-        self.assertTrue(manifest.s3_parquet_cleared)
+        self.assertFalse(manifest.s3_parquet_cleared)
 
         status = self.manifest_accessor.get_s3_parquet_cleared(manifest, key)
         self.assertTrue(status)
diff --git a/koku/masu/test/processor/parquet/test_parquet_report_processor.py b/koku/masu/test/processor/parquet/test_parquet_report_processor.py
index 6c4bdb546e..5371bb1d7a 100644
--- a/koku/masu/test/processor/parquet/test_parquet_report_processor.py
+++ b/koku/masu/test/processor/parquet/test_parquet_report_processor.py
@@ -275,8 +275,11 @@ def test_convert_to_parquet(self, mock_remove, mock_exists):
         with patch(
             "masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.report_type", return_value=None
         ):
-            with self.assertRaises(ParquetReportProcessorError):
-                self.report_processor_ocp.convert_to_parquet()
+            with patch(
+                "masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.prepare_parquet_s3"
+            ):
+                with self.assertRaises(ParquetReportProcessorError):
+                    self.report_processor_ocp.convert_to_parquet()
 
         expected = "no split files to convert to parquet"
         with patch("masu.processor.parquet.parquet_report_processor.get_path_prefix", return_value=""), patch.object(
diff --git a/koku/masu/util/aws/common.py b/koku/masu/util/aws/common.py
index e562db13bf..1f4df87815 100644
--- a/koku/masu/util/aws/common.py
+++ b/koku/masu/util/aws/common.py
@@ -637,7 +637,6 @@ def get_or_clear_daily_s3_by_date(csv_s3_path, provider_uuid, start_date, end_da
             delete_s3_objects(request_id, to_delete, context)
             manifest = ReportManifestDBAccessor().get_manifest_by_id(manifest_id)
             ReportManifestDBAccessor().mark_s3_csv_cleared(manifest)
-            ReportManifestDBAccessor().mark_s3_parquet_to_be_cleared(manifest_id)
             LOG.info(
                 log_json(msg="removed csv files, marked manifest csv cleared and parquet not cleared", context=context)
             )
@@ -823,7 +822,7 @@ def delete_s3_objects(request_id, keys_to_delete, context) -> list[str]:
 
 
 def clear_s3_files(
-    csv_s3_path, provider_uuid, start_date, metadata_key, metadata_value_check, context, request_id, invoice_month=None
+    csv_s3_path, provider_uuid, start_date, metadata_key, manifest_id, context, request_id, invoice_month=None
 ):
     """Clear s3 files for daily archive processing"""
     account = context.get("account")
@@ -858,7 +857,7 @@ def clear_s3_files(
             try:
                 existing_object = obj_summary.Object()
                 metadata_value = existing_object.metadata.get(metadata_key)
-                if str(metadata_value) != str(metadata_value_check):
+                if str(metadata_value) != str(manifest_id):
                     to_delete.append(existing_object.key)
             except (ClientError) as err:
                 LOG.warning(
@@ -871,6 +870,12 @@ def clear_s3_files(
                     exc_info=err,
                 )
     delete_s3_objects(request_id, to_delete, context)
+    manifest_accessor = ReportManifestDBAccessor()
+    manifest = manifest_accessor.get_manifest_by_id(manifest_id)
+    # Note: Marking the parquet files cleared here prevents all
+    # the parquet files for the manifest from being deleted
+    # later on in report_parquet_processor
+    manifest_accessor.mark_s3_parquet_cleared(manifest)
 
 
 def remove_files_not_in_set_from_s3_bucket(request_id, s3_path, manifest_id, context=None):
diff --git a/koku/reporting_common/migrations/0042_alter_costusagereportmanifest_s3_parquet_cleared.py b/koku/reporting_common/migrations/0042_alter_costusagereportmanifest_s3_parquet_cleared.py
new file mode 100644
index 0000000000..e37cd61017
--- /dev/null
+++ b/koku/reporting_common/migrations/0042_alter_costusagereportmanifest_s3_parquet_cleared.py
@@ -0,0 +1,18 @@
+# Generated by Django 4.2.11 on 2024-06-28 12:00
+from django.db import migrations
+from django.db import models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("reporting_common", "0041_diskcapacity"),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name="costusagereportmanifest",
+            name="s3_parquet_cleared",
+            field=models.BooleanField(default=False, null=True),
+        ),
+    ]
diff --git a/koku/reporting_common/models.py b/koku/reporting_common/models.py
index cc89cc757e..4d37c1c9d6 100644
--- a/koku/reporting_common/models.py
+++ b/koku/reporting_common/models.py
@@ -43,7 +43,7 @@ class Meta:
     # s3_csv_cleared used in AWS/Azure to indicate csv's have been cleared for daily archive processing
     s3_csv_cleared = models.BooleanField(default=False, null=True)
     # s3_parquet_cleared used to indicate parquet files have been cleared prior to csv to parquet conversion
-    s3_parquet_cleared = models.BooleanField(default=True, null=True)
+    s3_parquet_cleared = models.BooleanField(default=False, null=True)
     # Indicates what initial date to start at for daily processing
     daily_archive_start_date = models.DateTimeField(null=True)
     operator_version = models.TextField(null=True)
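Note on the new flag lifecycle, as a minimal sketch assuming the accessor behavior shown in the diff above. The helper reprocess_parquet_once and its delete_stale_parquet callback are hypothetical illustrations, not koku code; only ReportManifestDBAccessor, get_manifest_by_id, get_s3_parquet_cleared, and mark_s3_parquet_cleared come from the change itself:

    from masu.database.report_manifest_db_accessor import ReportManifestDBAccessor


    def reprocess_parquet_once(manifest_id, delete_stale_parquet):
        """Hypothetical sketch: clear a manifest's parquet files exactly once."""
        accessor = ReportManifestDBAccessor()
        manifest = accessor.get_manifest_by_id(manifest_id)
        # With the model default flipped to False (see the migration above),
        # a fresh manifest always reads "not cleared" on its first pass.
        if not accessor.get_s3_parquet_cleared(manifest):
            delete_stale_parquet(manifest_id)  # caller-supplied S3 cleanup
            # Flipping the flag only after deletion replaces the removed
            # mark_s3_parquet_to_be_cleared() re-arm step and keeps
            # report_parquet_processor from clearing these files again.
            accessor.mark_s3_parquet_cleared(manifest)

In other words, instead of defaulting the flag to True and resetting it to False to force a full-month reprocess, the manifest now starts "not cleared" and is marked cleared exactly once, at the point the stale files are actually deleted.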