From b6e4127b2d3c83e42fbf6dc675f8c0f1aa999da7 Mon Sep 17 00:00:00 2001 From: myersCody Date: Mon, 17 Jun 2024 09:25:51 -0400 Subject: [PATCH 1/8] [COST-5141] Automation for vaccuming trino migrations --- koku/masu/api/expired_data.py | 29 +++++++++++++++ koku/masu/api/urls.py | 2 ++ koku/masu/api/views.py | 1 + koku/masu/database/aws_report_db_accessor.py | 8 ++--- .../masu/database/azure_report_db_accessor.py | 8 ++--- koku/masu/database/gcp_report_db_accessor.py | 8 ++--- koku/masu/database/ocp_report_db_accessor.py | 18 ++++++++++ koku/masu/database/report_db_accessor_base.py | 4 +-- koku/masu/processor/_tasks/remove_expired.py | 35 +++++++++++++++++++ koku/masu/processor/expired_data_remover.py | 15 +++++++- .../processor/ocp/ocp_report_db_cleaner.py | 24 +++++++++++++ koku/masu/processor/tasks.py | 25 +++++++++++++ koku/reporting/models.py | 28 +++++++++++++++ 13 files changed, 184 insertions(+), 21 deletions(-) diff --git a/koku/masu/api/expired_data.py b/koku/masu/api/expired_data.py index 49b29c4fe6..c89f1520cb 100644 --- a/koku/masu/api/expired_data.py +++ b/koku/masu/api/expired_data.py @@ -6,6 +6,7 @@ import logging from django.views.decorators.cache import never_cache +from rest_framework import status from rest_framework.decorators import api_view from rest_framework.decorators import permission_classes from rest_framework.decorators import renderer_classes @@ -15,6 +16,7 @@ from masu.config import Config from masu.processor.orchestrator import Orchestrator +from masu.processor.tasks import remove_expired_trino_migrations LOG = logging.getLogger(__name__) @@ -36,3 +38,30 @@ def expired_data(request): if simulate: response_key = response_key + " (simulated)" return Response({response_key: str(async_delete_results)}) + + +@never_cache +@api_view(http_method_names=["GET", "DELETE"]) +@permission_classes((AllowAny,)) +@renderer_classes(tuple(api_settings.DEFAULT_RENDERER_CLASSES)) +def expired_trino_migrations(request): + """Return expired data.""" + params = request.query_params + schema_name = params.get("schema") + provider_type = params.get("provider_type") + provider_uuid = params.get("provider_uuid") + simulate = params.get("simulate") + simulate = True + + if request.method == "DELETE" and Config.DEBUG: + simulate = False + LOG.info("Simulate Flag: %s", simulate) + if schema_name is None: + errmsg = "schema is a required parameter." + return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST) + + async_delete_results = remove_expired_trino_migrations.delay(schema_name, provider_type, simulate, provider_uuid) + response_key = "Async jobs for expired data removal" + if simulate: + response_key = response_key + " (simulated)" + return Response({response_key: str(async_delete_results)}) diff --git a/koku/masu/api/urls.py b/koku/masu/api/urls.py index 85cf5154b1..7735736a1f 100644 --- a/koku/masu/api/urls.py +++ b/koku/masu/api/urls.py @@ -21,6 +21,7 @@ from masu.api.views import download_report from masu.api.views import EnabledTagView from masu.api.views import expired_data +from masu.api.views import expired_trino_migrations from masu.api.views import explain_query from masu.api.views import fix_parquet from masu.api.views import get_status @@ -58,6 +59,7 @@ path("update_exchange_rates/", update_exchange_rates, name="update_exchange_rates"), path("update_azure_storage_capacity/", update_azure_storage_capacity, name="update_azure_storage_capacity"), path("enabled_tags/", EnabledTagView.as_view(), name="enabled_tags"), + path("expired_trino_migrations", expired_trino_migrations, name="expired_trino_migrations"), path("expired_data/", expired_data, name="expired_data"), path("hcs_report_data/", hcs_report_data, name="hcs_report_data"), path("hcs_report_finalization/", hcs_report_finalization, name="hcs_report_finalization"), diff --git a/koku/masu/api/views.py b/koku/masu/api/views.py index f56ada5a87..9572bb9733 100644 --- a/koku/masu/api/views.py +++ b/koku/masu/api/views.py @@ -18,6 +18,7 @@ from masu.api.download import download_report from masu.api.enabled_tags import EnabledTagView from masu.api.expired_data import expired_data +from masu.api.expired_data import expired_trino_migrations from masu.api.hcs_report_data import hcs_report_data from masu.api.hcs_report_finalization import hcs_report_finalization from masu.api.ingest_ocp_payload import ingest_ocp_payload diff --git a/koku/masu/database/aws_report_db_accessor.py b/koku/masu/database/aws_report_db_accessor.py index b106375b1c..9bc5f95901 100644 --- a/koku/masu/database/aws_report_db_accessor.py +++ b/koku/masu/database/aws_report_db_accessor.py @@ -26,6 +26,7 @@ from masu.processor import is_feature_cost_3592_tag_mapping_enabled from reporting.models import OCP_ON_ALL_PERSPECTIVES from reporting.models import OCP_ON_AWS_PERSPECTIVES +from reporting.models import OCP_ON_AWS_TEMP_MANAGED_TABLES from reporting.models import OCPAllCostLineItemDailySummaryP from reporting.models import OCPAllCostLineItemProjectDailySummaryP from reporting.models import OCPAWSCostLineItemProjectDailySummaryP @@ -243,12 +244,7 @@ def populate_ocp_on_aws_cost_daily_summary_trino( days = self.date_helper.list_days(start_date, end_date) days_tup = tuple(str(day.day) for day in days) self.delete_ocp_on_aws_hive_partition_by_day(days_tup, aws_provider_uuid, openshift_provider_uuid, year, month) - tables = [ - "reporting_ocpawscostlineitem_project_daily_summary_temp", - "aws_openshift_daily_resource_matched_temp", - "aws_openshift_daily_tag_matched_temp", - ] - for table in tables: + for table in OCP_ON_AWS_TEMP_MANAGED_TABLES: self.delete_hive_partition_by_month(table, openshift_provider_uuid, year, month) pod_column = "pod_effective_usage_cpu_core_hours" diff --git a/koku/masu/database/azure_report_db_accessor.py b/koku/masu/database/azure_report_db_accessor.py index e2deda1e68..a396d3980d 100644 --- a/koku/masu/database/azure_report_db_accessor.py +++ b/koku/masu/database/azure_report_db_accessor.py @@ -26,6 +26,7 @@ from masu.processor import is_feature_cost_3592_tag_mapping_enabled from reporting.models import OCP_ON_ALL_PERSPECTIVES from reporting.models import OCP_ON_AZURE_PERSPECTIVES +from reporting.models import OCP_ON_AZURE_TEMP_MANAGED_TABLES from reporting.models import OCPAllCostLineItemDailySummaryP from reporting.models import OCPAllCostLineItemProjectDailySummaryP from reporting.models import OCPAzureCostLineItemProjectDailySummaryP @@ -296,12 +297,7 @@ def populate_ocp_on_azure_cost_daily_summary_trino( """Populate the daily cost aggregated summary for OCP on Azure.""" year = start_date.strftime("%Y") month = start_date.strftime("%m") - tables = [ - "reporting_ocpazurecostlineitem_project_daily_summary_temp", - "azure_openshift_daily_resource_matched_temp", - "azure_openshift_daily_tag_matched_temp", - ] - for table in tables: + for table in OCP_ON_AZURE_TEMP_MANAGED_TABLES: self.delete_hive_partition_by_month(table, openshift_provider_uuid, year, month) days = self.date_helper.list_days(start_date, end_date) days_tup = tuple(str(day.day) for day in days) diff --git a/koku/masu/database/gcp_report_db_accessor.py b/koku/masu/database/gcp_report_db_accessor.py index 3497deece5..3fa3a95bdf 100644 --- a/koku/masu/database/gcp_report_db_accessor.py +++ b/koku/masu/database/gcp_report_db_accessor.py @@ -29,6 +29,7 @@ from masu.processor import is_feature_cost_3592_tag_mapping_enabled from masu.util.gcp.common import check_resource_level from masu.util.ocp.common import get_cluster_alias_from_cluster_id +from reporting.models import OCP_ON_GCP_TEMP_MANAGED_TABLES from reporting.provider.all.models import TagMapping from reporting.provider.gcp.models import GCPCostEntryBill from reporting.provider.gcp.models import GCPCostEntryLineItemDailySummary @@ -322,12 +323,7 @@ def populate_ocp_on_gcp_cost_daily_summary_trino( year = start_date.strftime("%Y") month = start_date.strftime("%m") - tables = [ - "reporting_ocpgcpcostlineitem_project_daily_summary_temp", - "gcp_openshift_daily_resource_matched_temp", - "gcp_openshift_daily_tag_matched_temp", - ] - for table in tables: + for table in OCP_ON_GCP_TEMP_MANAGED_TABLES: self.delete_hive_partition_by_month(table, openshift_provider_uuid, year, month) days = self.date_helper.list_days(start_date, end_date) diff --git a/koku/masu/database/ocp_report_db_accessor.py b/koku/masu/database/ocp_report_db_accessor.py index 0d0a79b4fd..0eb56e0555 100644 --- a/koku/masu/database/ocp_report_db_accessor.py +++ b/koku/masu/database/ocp_report_db_accessor.py @@ -283,6 +283,24 @@ def delete_hive_partitions_by_source(self, table, partition_column, provider_uui LOG.info(log_json(msg="successfully deleted Hive partitions", context=ctx)) return True + def find_expired_trino_partitions(self, table, source_column, date_str): + """Queries Trino for partitions less than the parition date.""" + if not self.schema_exists_trino() or not self.table_exists_trino(table): + return False + sql = f""" +SELECT partitions.year, partitions.month, partitions.source +FROM ( + SELECT year as year, + month as month, + day as day, + cast(date_parse(concat(year, '-', month, '-', day), '%Y-%m-%d') as date) as partition_date, + {source_column} as source + FROM "{table}$partitions" +) as partitions +WHERE partitions.partition_date < DATE '{date_str}' +""" + return self._execute_trino_raw_sql_query(sql, log_ref="finding expired partitions") + def populate_line_item_daily_summary_table_trino( self, start_date, end_date, report_period_id, cluster_id, cluster_alias, source ): diff --git a/koku/masu/database/report_db_accessor_base.py b/koku/masu/database/report_db_accessor_base.py index f5a91b0a84..88463f2009 100644 --- a/koku/masu/database/report_db_accessor_base.py +++ b/koku/masu/database/report_db_accessor_base.py @@ -221,7 +221,7 @@ def schema_exists_trino(self): check_sql = f"SHOW SCHEMAS LIKE '{self.schema}'" return bool(self._execute_trino_raw_sql_query(check_sql, log_ref="schema_exists_trino")) - def delete_hive_partition_by_month(self, table, source, year, month): + def delete_hive_partition_by_month(self, table, source, year, month, source_column="ocp_source"): """Deletes partitions individually by month.""" retries = settings.HIVE_PARTITION_DELETE_RETRIES if self.schema_exists_trino() and self.table_exists_trino(table): @@ -238,7 +238,7 @@ def delete_hive_partition_by_month(self, table, source, year, month): try: sql = f""" DELETE FROM hive.{self.schema}.{table} - WHERE ocp_source = '{source}' + WHERE {source_column} = '{source}' AND year = '{year}' AND (month = replace(ltrim(replace('{month}', '0', ' ')),' ', '0') OR month = '{month}') """ diff --git a/koku/masu/processor/_tasks/remove_expired.py b/koku/masu/processor/_tasks/remove_expired.py index b1160ae34e..bdd52c18ac 100644 --- a/koku/masu/processor/_tasks/remove_expired.py +++ b/koku/masu/processor/_tasks/remove_expired.py @@ -6,6 +6,7 @@ import logging from masu.processor.expired_data_remover import ExpiredDataRemover +from masu.processor.expired_data_remover import ExpiredDataRemoverError LOG = logging.getLogger(__name__) @@ -37,3 +38,37 @@ def _remove_expired_data(schema_name, provider, simulate, provider_uuid=None): status_msg = "Expired Data" if simulate else "Removed Data" result_msg = f"{status_msg}:\n {str(removed_data)}" LOG.info(result_msg) + # We could extend the logic below here, or keep it as a separate celery task + + +def _remove_expired_trino_migrations(schema_name, provider_type, simulate, provider_uuid=None): + """ + Task to remove expired data. + + Args: + schema_name (String) db schema name + provider (String) provider type + simulate (Boolean) Simulate report data removal + + Returns: + None + + """ + log_statement = ( + f"Remove expired data:\n" + f" schema_name: {schema_name}\n" + f" provider_type: {provider_type}\n" + f" simulate: {simulate}\n" + ) + LOG.info(log_statement) + + try: + remover = ExpiredDataRemover(schema_name, provider_type) + except ExpiredDataRemoverError: + return + + removed_trino_migrations = remover.remove_expired_trino_partitions(simulate=simulate, provider_uuid=provider_uuid) + if removed_trino_migrations: + status_msg = "Expired Partitions" if simulate else "Removed Partitions" + result_msg = f"{status_msg}:\n {str(removed_trino_migrations)}" + LOG.info(result_msg) diff --git a/koku/masu/processor/expired_data_remover.py b/koku/masu/processor/expired_data_remover.py index b96d9e66b4..beefa027a0 100644 --- a/koku/masu/processor/expired_data_remover.py +++ b/koku/masu/processor/expired_data_remover.py @@ -156,5 +156,18 @@ def remove(self, simulate=False, provider_uuid=None): self._provider, expiration_date, ) - return removed_data + + def remove_expired_trino_partitions(self, simulate=False, provider_uuid=None): + """ + Removes expired trino partitions based on the retention policy. + """ + if self._provider != Provider.PROVIDER_OCP: + LOG.info(f"{Provider.PROVIDER_OCP} is the only supported type for removing trino partitions.") + return + removed_partitions = [] + expiration_date = self._calculate_expiration_date() + removed_partitions = self._cleaner.purge_expired_trino_partitions( + expired_date=expiration_date, simulate=simulate, provider_uuid=provider_uuid + ) + return removed_partitions diff --git a/koku/masu/processor/ocp/ocp_report_db_cleaner.py b/koku/masu/processor/ocp/ocp_report_db_cleaner.py index 61d1a3f204..996cf0b7cf 100644 --- a/koku/masu/processor/ocp/ocp_report_db_cleaner.py +++ b/koku/masu/processor/ocp/ocp_report_db_cleaner.py @@ -12,6 +12,7 @@ from koku.database import cascade_delete from koku.database import execute_delete_sql from masu.database.ocp_report_db_accessor import OCPReportDBAccessor +from reporting.models import EXPIRE_MANAGED_TABLES from reporting.models import PartitionedTable from reporting.provider.ocp.models import UI_SUMMARY_TABLES @@ -144,3 +145,26 @@ def purge_expired_report_data_by_date(self, expired_date, simulate=False): LOG.info(log_json(msg="deleted table partitions", count=del_count, schema=self._schema)) return removed_items + + def purge_expired_trino_partitions(self, expired_date, provider_uuid=None, simulate=False): + """Removes expired trino partitions.""" + if (expired_date is not None and provider_uuid is not None) or ( # noqa: W504 + expired_date is None and provider_uuid is None + ): + err = "This method must be called with expired_date or provider_uuid" + raise OCPReportDBCleanerError(err) + + with OCPReportDBAccessor(self._schema) as accessor: + for table, source_column in EXPIRE_MANAGED_TABLES.items(): + results = accessor.find_expired_trino_partitions(table, source_column, str(expired_date.date())) + if results: + LOG.info(f"Discovered {len(results)} expired partitions") + else: + return + if simulate: + for partition in results: + LOG.info(f"partition_info: {partition}") + if not simulate: + for result in results: + year, month, source_value = result + accessor.delete_hive_partition_by_month(table, source_value, year, month, source_column) diff --git a/koku/masu/processor/tasks.py b/koku/masu/processor/tasks.py index b918af63a6..0d9dfbf6ca 100644 --- a/koku/masu/processor/tasks.py +++ b/koku/masu/processor/tasks.py @@ -43,6 +43,7 @@ from masu.processor._tasks.download import _get_report_files from masu.processor._tasks.process import _process_report_file from masu.processor._tasks.remove_expired import _remove_expired_data +from masu.processor._tasks.remove_expired import _remove_expired_trino_migrations from masu.processor.cost_model_cost_updater import CostModelCostUpdater from masu.processor.ocp.ocp_cloud_parquet_summary_updater import DELETE_TABLE from masu.processor.ocp.ocp_cloud_parquet_summary_updater import TRUNCATE_TABLE @@ -346,6 +347,30 @@ def remove_expired_data(schema_name, provider, simulate, provider_uuid=None, que _remove_expired_data(schema_name, provider, simulate, provider_uuid) +@celery_app.task(name="masu.processor.tasks.remove_expired_data", queue=DEFAULT) +def remove_expired_trino_migrations(schema_name, provider_type, simulate, provider_uuid=None, queue_name=None): + """ + Remove expired report data. + + Args: + schema_name (String) db schema name + provider (String) provider type + simulate (Boolean) Simulate report data removal + + Returns: + None + + """ + + context = { + "schema": schema_name, + "provider_type": provider_type, + "provider_uuid": provider_uuid, + } + LOG.info(log_json("remove_expired_data", msg="removing expired data", context=context)) + _remove_expired_trino_migrations(schema_name, provider_type, simulate, provider_uuid) + + @celery_app.task(name="masu.processor.tasks.summarize_reports", queue=SUMMARIZE_REPORTS_QUEUE) # noqa: C901 def summarize_reports( # noqa: C901 reports_to_summarize, queue_name=None, manifest_list=None, ingress_report_uuid=None diff --git a/koku/reporting/models.py b/koku/reporting/models.py index bd873b18db..27ea4cc104 100644 --- a/koku/reporting/models.py +++ b/koku/reporting/models.py @@ -161,6 +161,7 @@ OCPAzureDatabaseSummaryP, ) +# These are cleaned during source delete TRINO_MANAGED_TABLES = { "reporting_ocpusagelineitem_daily_summary": "source", "reporting_ocpawscostlineitem_project_daily_summary": "ocp_source", @@ -176,3 +177,30 @@ "gcp_openshift_daily_resource_matched_temp": "ocp_source", "gcp_openshift_daily_tag_matched_temp": "ocp_source", } + +# These are cleaned during expired_data flow +EXPIRE_MANAGED_TABLES = { + "reporting_ocpusagelineitem_daily_summary": "source", + "reporting_ocpawscostlineitem_project_daily_summary": "ocp_source", + "reporting_ocpgcpcostlineitem_project_daily_summary": "ocp_source", + "reporting_ocpazurecostlineitem_project_daily_summary": "ocp_source", +} + +# TEMP tables are cleaned during day to day processing +OCP_ON_AWS_TEMP_MANAGED_TABLES = { + "reporting_ocpawscostlineitem_project_daily_summary_temp", + "aws_openshift_daily_resource_matched_temp", + "aws_openshift_daily_tag_matched_temp", +} + +OCP_ON_AZURE_TEMP_MANAGED_TABLES = { + "reporting_ocpazurecostlineitem_project_daily_summary_temp", + "azure_openshift_daily_resource_matched_temp", + "azure_openshift_daily_tag_matched_temp", +} + +OCP_ON_GCP_TEMP_MANAGED_TABLES = { + "reporting_ocpgcpcostlineitem_project_daily_summary_temp", + "gcp_openshift_daily_resource_matched_temp", + "gcp_openshift_daily_tag_matched_temp", +} From b3658a9ee17b00305f2a51393c9544944108d616 Mon Sep 17 00:00:00 2001 From: myersCody Date: Mon, 17 Jun 2024 16:16:48 -0400 Subject: [PATCH 2/8] Update expired logic. --- koku/masu/api/expired_data.py | 17 ++++---------- koku/masu/api/urls.py | 4 ++-- koku/masu/api/views.py | 2 +- koku/masu/database/ocp_report_db_accessor.py | 3 ++- koku/masu/processor/_tasks/remove_expired.py | 8 +++---- .../processor/ocp/ocp_report_db_cleaner.py | 10 +++++---- koku/masu/processor/orchestrator.py | 22 +++++++++++++++++++ koku/masu/processor/tasks.py | 10 ++++----- 8 files changed, 46 insertions(+), 30 deletions(-) diff --git a/koku/masu/api/expired_data.py b/koku/masu/api/expired_data.py index c89f1520cb..10248be0d1 100644 --- a/koku/masu/api/expired_data.py +++ b/koku/masu/api/expired_data.py @@ -6,7 +6,6 @@ import logging from django.views.decorators.cache import never_cache -from rest_framework import status from rest_framework.decorators import api_view from rest_framework.decorators import permission_classes from rest_framework.decorators import renderer_classes @@ -16,7 +15,6 @@ from masu.config import Config from masu.processor.orchestrator import Orchestrator -from masu.processor.tasks import remove_expired_trino_migrations LOG = logging.getLogger(__name__) @@ -44,23 +42,16 @@ def expired_data(request): @api_view(http_method_names=["GET", "DELETE"]) @permission_classes((AllowAny,)) @renderer_classes(tuple(api_settings.DEFAULT_RENDERER_CLASSES)) -def expired_trino_migrations(request): +def expired_trino_partitions(request): """Return expired data.""" - params = request.query_params - schema_name = params.get("schema") - provider_type = params.get("provider_type") - provider_uuid = params.get("provider_uuid") - simulate = params.get("simulate") simulate = True - if request.method == "DELETE" and Config.DEBUG: simulate = False LOG.info("Simulate Flag: %s", simulate) - if schema_name is None: - errmsg = "schema is a required parameter." - return Response({"Error": errmsg}, status=status.HTTP_400_BAD_REQUEST) - async_delete_results = remove_expired_trino_migrations.delay(schema_name, provider_type, simulate, provider_uuid) + orchestrator = Orchestrator() + async_delete_results = orchestrator.remove_expired_trino_partitions(simulate=simulate) + response_key = "Async jobs for expired data removal" if simulate: response_key = response_key + " (simulated)" diff --git a/koku/masu/api/urls.py b/koku/masu/api/urls.py index 7735736a1f..89eb878b61 100644 --- a/koku/masu/api/urls.py +++ b/koku/masu/api/urls.py @@ -21,7 +21,7 @@ from masu.api.views import download_report from masu.api.views import EnabledTagView from masu.api.views import expired_data -from masu.api.views import expired_trino_migrations +from masu.api.views import expired_trino_partitions from masu.api.views import explain_query from masu.api.views import fix_parquet from masu.api.views import get_status @@ -59,7 +59,7 @@ path("update_exchange_rates/", update_exchange_rates, name="update_exchange_rates"), path("update_azure_storage_capacity/", update_azure_storage_capacity, name="update_azure_storage_capacity"), path("enabled_tags/", EnabledTagView.as_view(), name="enabled_tags"), - path("expired_trino_migrations", expired_trino_migrations, name="expired_trino_migrations"), + path("expired_trino_partitions/", expired_trino_partitions, name="expired_trino_partitions"), path("expired_data/", expired_data, name="expired_data"), path("hcs_report_data/", hcs_report_data, name="hcs_report_data"), path("hcs_report_finalization/", hcs_report_finalization, name="hcs_report_finalization"), diff --git a/koku/masu/api/views.py b/koku/masu/api/views.py index 9572bb9733..e192ec9f8a 100644 --- a/koku/masu/api/views.py +++ b/koku/masu/api/views.py @@ -18,7 +18,7 @@ from masu.api.download import download_report from masu.api.enabled_tags import EnabledTagView from masu.api.expired_data import expired_data -from masu.api.expired_data import expired_trino_migrations +from masu.api.expired_data import expired_trino_partitions from masu.api.hcs_report_data import hcs_report_data from masu.api.hcs_report_finalization import hcs_report_finalization from masu.api.ingest_ocp_payload import ingest_ocp_payload diff --git a/koku/masu/database/ocp_report_db_accessor.py b/koku/masu/database/ocp_report_db_accessor.py index 0eb56e0555..c1812bf344 100644 --- a/koku/masu/database/ocp_report_db_accessor.py +++ b/koku/masu/database/ocp_report_db_accessor.py @@ -285,7 +285,8 @@ def delete_hive_partitions_by_source(self, table, partition_column, provider_uui def find_expired_trino_partitions(self, table, source_column, date_str): """Queries Trino for partitions less than the parition date.""" - if not self.schema_exists_trino() or not self.table_exists_trino(table): + if not self.table_exists_trino(table): + LOG.info("Could not find table.") return False sql = f""" SELECT partitions.year, partitions.month, partitions.source diff --git a/koku/masu/processor/_tasks/remove_expired.py b/koku/masu/processor/_tasks/remove_expired.py index bdd52c18ac..252392f02b 100644 --- a/koku/masu/processor/_tasks/remove_expired.py +++ b/koku/masu/processor/_tasks/remove_expired.py @@ -41,7 +41,7 @@ def _remove_expired_data(schema_name, provider, simulate, provider_uuid=None): # We could extend the logic below here, or keep it as a separate celery task -def _remove_expired_trino_migrations(schema_name, provider_type, simulate, provider_uuid=None): +def _remove_expired_trino_partitions(schema_name, provider_type, simulate, provider_uuid=None): """ Task to remove expired data. @@ -67,8 +67,8 @@ def _remove_expired_trino_migrations(schema_name, provider_type, simulate, provi except ExpiredDataRemoverError: return - removed_trino_migrations = remover.remove_expired_trino_partitions(simulate=simulate, provider_uuid=provider_uuid) - if removed_trino_migrations: + removed_trino_partitions = remover.remove_expired_trino_partitions(simulate=simulate, provider_uuid=provider_uuid) + if removed_trino_partitions: status_msg = "Expired Partitions" if simulate else "Removed Partitions" - result_msg = f"{status_msg}:\n {str(removed_trino_migrations)}" + result_msg = f"{status_msg}:\n {str(removed_trino_partitions)}" LOG.info(result_msg) diff --git a/koku/masu/processor/ocp/ocp_report_db_cleaner.py b/koku/masu/processor/ocp/ocp_report_db_cleaner.py index 996cf0b7cf..cb81ef63b6 100644 --- a/koku/masu/processor/ocp/ocp_report_db_cleaner.py +++ b/koku/masu/processor/ocp/ocp_report_db_cleaner.py @@ -148,23 +148,25 @@ def purge_expired_report_data_by_date(self, expired_date, simulate=False): def purge_expired_trino_partitions(self, expired_date, provider_uuid=None, simulate=False): """Removes expired trino partitions.""" - if (expired_date is not None and provider_uuid is not None) or ( # noqa: W504 - expired_date is None and provider_uuid is None - ): + LOG.debug(f"purge_expired_trino_partitions: {expired_date}, {provider_uuid}, {simulate}") + if expired_date is None and provider_uuid is None: err = "This method must be called with expired_date or provider_uuid" raise OCPReportDBCleanerError(err) with OCPReportDBAccessor(self._schema) as accessor: + LOG.info(EXPIRE_MANAGED_TABLES.items()) for table, source_column in EXPIRE_MANAGED_TABLES.items(): + LOG.debug(f"{table}, {source_column}") results = accessor.find_expired_trino_partitions(table, source_column, str(expired_date.date())) if results: LOG.info(f"Discovered {len(results)} expired partitions") else: + LOG.info("No expired partitions") return if simulate: for partition in results: LOG.info(f"partition_info: {partition}") - if not simulate: + else: for result in results: year, month, source_value = result accessor.delete_hive_partition_by_month(table, source_value, year, month, source_column) diff --git a/koku/masu/processor/orchestrator.py b/koku/masu/processor/orchestrator.py index 9790c988e5..845e693fd2 100644 --- a/koku/masu/processor/orchestrator.py +++ b/koku/masu/processor/orchestrator.py @@ -31,6 +31,7 @@ from masu.processor.tasks import record_all_manifest_files from masu.processor.tasks import record_report_status from masu.processor.tasks import remove_expired_data +from masu.processor.tasks import remove_expired_trino_partitions from masu.processor.tasks import summarize_reports from masu.processor.tasks import SUMMARIZE_REPORTS_QUEUE from masu.processor.tasks import SUMMARIZE_REPORTS_QUEUE_XL @@ -501,3 +502,24 @@ def remove_expired_report_data(self, simulate=False): ) async_results.append({"customer": account.get("customer_name"), "async_id": str(async_result)}) return async_results + + def remove_expired_trino_partitions(self, simulate=False): + """ + Removes expired trino partitions for each account. + """ + async_results = [] + for account in Provider.objects.get_accounts(): + if account.get("provider_type") == Provider.PROVIDER_OCP: + LOG.info("Calling remove_expired_trino_partitions with account: %s", account) + async_result = remove_expired_trino_partitions.delay( + schema_name=account.get("schema_name"), + provider_type=account.get("provider_type"), + simulate=simulate, + ) + + LOG.info( + "Expired partition removal queued - schema_name: %s, Task ID: %s", + account.get("schema_name"), + str(async_result), + ) + return async_results diff --git a/koku/masu/processor/tasks.py b/koku/masu/processor/tasks.py index 0d9dfbf6ca..e9b485c0bb 100644 --- a/koku/masu/processor/tasks.py +++ b/koku/masu/processor/tasks.py @@ -43,7 +43,7 @@ from masu.processor._tasks.download import _get_report_files from masu.processor._tasks.process import _process_report_file from masu.processor._tasks.remove_expired import _remove_expired_data -from masu.processor._tasks.remove_expired import _remove_expired_trino_migrations +from masu.processor._tasks.remove_expired import _remove_expired_trino_partitions from masu.processor.cost_model_cost_updater import CostModelCostUpdater from masu.processor.ocp.ocp_cloud_parquet_summary_updater import DELETE_TABLE from masu.processor.ocp.ocp_cloud_parquet_summary_updater import TRUNCATE_TABLE @@ -347,8 +347,8 @@ def remove_expired_data(schema_name, provider, simulate, provider_uuid=None, que _remove_expired_data(schema_name, provider, simulate, provider_uuid) -@celery_app.task(name="masu.processor.tasks.remove_expired_data", queue=DEFAULT) -def remove_expired_trino_migrations(schema_name, provider_type, simulate, provider_uuid=None, queue_name=None): +@celery_app.task(name="masu.processor.tasks.remove_expired_trino_partitions", queue=DEFAULT) +def remove_expired_trino_partitions(schema_name, provider_type, simulate, provider_uuid=None, queue_name=None): """ Remove expired report data. @@ -367,8 +367,8 @@ def remove_expired_trino_migrations(schema_name, provider_type, simulate, provid "provider_type": provider_type, "provider_uuid": provider_uuid, } - LOG.info(log_json("remove_expired_data", msg="removing expired data", context=context)) - _remove_expired_trino_migrations(schema_name, provider_type, simulate, provider_uuid) + LOG.info(log_json("remove_expired_data", msg="removing expired partitions", context=context)) + _remove_expired_trino_partitions(schema_name, provider_type, simulate, provider_uuid) @celery_app.task(name="masu.processor.tasks.summarize_reports", queue=SUMMARIZE_REPORTS_QUEUE) # noqa: C901 From 14c51fb5da6a539c6e8de86eb121ed3eafd0f99b Mon Sep 17 00:00:00 2001 From: myersCody Date: Tue, 18 Jun 2024 15:16:09 -0400 Subject: [PATCH 3/8] Additional improvements. --- koku/masu/database/ocp_report_db_accessor.py | 1 + koku/masu/processor/orchestrator.py | 33 ++++++++++++-------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/koku/masu/database/ocp_report_db_accessor.py b/koku/masu/database/ocp_report_db_accessor.py index c1812bf344..1030d0e66e 100644 --- a/koku/masu/database/ocp_report_db_accessor.py +++ b/koku/masu/database/ocp_report_db_accessor.py @@ -299,6 +299,7 @@ def find_expired_trino_partitions(self, table, source_column, date_str): FROM "{table}$partitions" ) as partitions WHERE partitions.partition_date < DATE '{date_str}' +GROUP BY partitions.year, partitions.month, partitions.source """ return self._execute_trino_raw_sql_query(sql, log_ref="finding expired partitions") diff --git a/koku/masu/processor/orchestrator.py b/koku/masu/processor/orchestrator.py index 845e693fd2..1add41f402 100644 --- a/koku/masu/processor/orchestrator.py +++ b/koku/masu/processor/orchestrator.py @@ -508,18 +508,25 @@ def remove_expired_trino_partitions(self, simulate=False): Removes expired trino partitions for each account. """ async_results = [] - for account in Provider.objects.get_accounts(): - if account.get("provider_type") == Provider.PROVIDER_OCP: - LOG.info("Calling remove_expired_trino_partitions with account: %s", account) - async_result = remove_expired_trino_partitions.delay( - schema_name=account.get("schema_name"), - provider_type=account.get("provider_type"), - simulate=simulate, - ) + schemas = set( + Provider.objects.filter(type=Provider.PROVIDER_OCP) + .values_list("customer__schema_name", flat=True) + .distinct() + ) + # + # distinct is not removing duplicates from this list, so using a set to reduce instead + + for schema in schemas: + LOG.info("Calling remove_expired_trino_partitions with account: %s", schema) + async_result = remove_expired_trino_partitions.delay( + schema_name=schema, + provider_type=Provider.PROVIDER_OCP, + simulate=simulate, + ) - LOG.info( - "Expired partition removal queued - schema_name: %s, Task ID: %s", - account.get("schema_name"), - str(async_result), - ) + LOG.info( + "Expired partition removal queued - schema_name: %s, Task ID: %s", + schema, + str(async_result), + ) return async_results From 4847e7c62e1bde74aa090e8915a6631bdd9d7652 Mon Sep 17 00:00:00 2001 From: myersCody Date: Tue, 9 Jul 2024 14:53:59 -0400 Subject: [PATCH 4/8] Improve test coverage --- koku/masu/api/expired_data.py | 2 +- koku/masu/celery/tasks.py | 1 + koku/masu/processor/_tasks/remove_expired.py | 18 ++++---- .../processor/ocp/ocp_report_db_cleaner.py | 1 - koku/masu/processor/tasks.py | 1 - koku/masu/test/api/test_expired_data.py | 34 +++++++++++++++ .../database/test_ocp_report_db_accessor.py | 17 ++++++++ .../ocp/test_ocp_report_db_cleaner.py | 24 +++++++++++ koku/masu/test/processor/test_tasks.py | 43 +++++++++++++++++++ 9 files changed, 128 insertions(+), 13 deletions(-) diff --git a/koku/masu/api/expired_data.py b/koku/masu/api/expired_data.py index 10248be0d1..529f694b3f 100644 --- a/koku/masu/api/expired_data.py +++ b/koku/masu/api/expired_data.py @@ -52,7 +52,7 @@ def expired_trino_partitions(request): orchestrator = Orchestrator() async_delete_results = orchestrator.remove_expired_trino_partitions(simulate=simulate) - response_key = "Async jobs for expired data removal" + response_key = "Async jobs for expired paritions removal" if simulate: response_key = response_key + " (simulated)" return Response({response_key: str(async_delete_results)}) diff --git a/koku/masu/celery/tasks.py b/koku/masu/celery/tasks.py index c23f19c524..3e7d9997d2 100644 --- a/koku/masu/celery/tasks.py +++ b/koku/masu/celery/tasks.py @@ -82,6 +82,7 @@ def remove_expired_data(simulate=False): LOG.info("removing expired data") orchestrator = Orchestrator() orchestrator.remove_expired_report_data(simulate) + orchestrator.remove_expired_trino_partitions(simulate) @celery_app.task(name="masu.celery.tasks.purge_trino_files", queue=DEFAULT) diff --git a/koku/masu/processor/_tasks/remove_expired.py b/koku/masu/processor/_tasks/remove_expired.py index 447b9ec6dd..1bcb386a7c 100644 --- a/koku/masu/processor/_tasks/remove_expired.py +++ b/koku/masu/processor/_tasks/remove_expired.py @@ -51,13 +51,13 @@ def _remove_expired_trino_partitions(schema_name, provider_type, simulate, provi None """ - log_statement = ( - f"Remove expired data:\n" - f" schema_name: {schema_name}\n" - f" provider_type: {provider_type}\n" - f" simulate: {simulate}\n" - ) - LOG.info(log_statement) + context = { + "schema": schema_name, + "provider_type": provider_type, + "provider_uuid": provider_uuid, + "simulate": simulate, + } + LOG.info(log_json(msg="Remove expired partitions", context=context)) try: remover = ExpiredDataRemover(schema_name, provider_type) @@ -66,7 +66,5 @@ def _remove_expired_trino_partitions(schema_name, provider_type, simulate, provi removed_trino_partitions = remover.remove_expired_trino_partitions(simulate=simulate, provider_uuid=provider_uuid) if removed_trino_partitions: - # TODO: Fix this log message status_msg = "Expired Partitions" if simulate else "Removed Partitions" - result_msg = f"{status_msg}:\n {str(removed_trino_partitions)}" - LOG.info(result_msg) + LOG.info(log_json(msg=status_msg, removed_data=removed_trino_partitions, context=context)) diff --git a/koku/masu/processor/ocp/ocp_report_db_cleaner.py b/koku/masu/processor/ocp/ocp_report_db_cleaner.py index 1d31e3a03c..7e395f6988 100644 --- a/koku/masu/processor/ocp/ocp_report_db_cleaner.py +++ b/koku/masu/processor/ocp/ocp_report_db_cleaner.py @@ -157,7 +157,6 @@ def purge_expired_trino_partitions(self, expired_date, provider_uuid=None, simul raise OCPReportDBCleanerError(err) with OCPReportDBAccessor(self._schema) as accessor: - LOG.info(EXPIRE_MANAGED_TABLES.items()) for table, source_column in EXPIRE_MANAGED_TABLES.items(): LOG.debug(f"{table}, {source_column}") results = accessor.find_expired_trino_partitions(table, source_column, str(expired_date.date())) diff --git a/koku/masu/processor/tasks.py b/koku/masu/processor/tasks.py index 67c3632d87..4e66d75308 100644 --- a/koku/masu/processor/tasks.py +++ b/koku/masu/processor/tasks.py @@ -335,7 +335,6 @@ def remove_expired_trino_partitions(schema_name, provider_type, simulate, provid _remove_expired_trino_partitions(schema_name, provider_type, simulate, provider_uuid) - @celery_app.task(name="masu.processor.tasks.summarize_reports", queue=SummaryQueue.DEFAULT) # noqa: C901 def summarize_reports( # noqa: C901 reports_to_summarize, queue_name=None, manifest_list=None, ingress_report_uuid=None diff --git a/koku/masu/test/api/test_expired_data.py b/koku/masu/test/api/test_expired_data.py index d69523c4e0..3b6f566b1e 100644 --- a/koku/masu/test/api/test_expired_data.py +++ b/koku/masu/test/api/test_expired_data.py @@ -48,3 +48,37 @@ def test_del_expired_data(self, mock_orchestrator, mock_debug, _, mock_service): self.assertEqual(response.status_code, 200) self.assertIn(expected_key, body) self.assertIn(str(mock_response), body.get(expected_key)) + + @patch("masu.processor.worker_cache.CELERY_INSPECT") + @patch("koku.middleware.MASU", return_value=True) + @patch.object(Orchestrator, "remove_expired_trino_partitions") + def test_get_expired_partitions(self, mock_orchestrator, _, mock_service): + """Test the GET expired_trino_paritions endpoint.""" + mock_response = [{"customer": "org1234567", "async_id": "f9eb2ce7-4564-4509-aecc-1200958c07cf"}] + expected_key = "Async jobs for expired paritions removal (simulated)" + mock_orchestrator.return_value = mock_response + response = self.client.get(reverse("expired_trino_partitions")) + body = response.json() + + self.assertEqual(response.status_code, 200) + self.assertIn(expected_key, body) + self.assertIn(str(mock_response), body.get(expected_key)) + mock_orchestrator.assert_called() + + @patch("masu.processor.worker_cache.CELERY_INSPECT") + @patch("koku.middleware.MASU", return_value=True) + @patch.object(Config, "DEBUG", return_value=False) + @patch.object(Orchestrator, "remove_expired_trino_partitions") + def test_del_expired_partitions(self, mock_orchestrator, mock_debug, _, mock_service): + """Test the DELETE expired_trino_partitions endpoint.""" + mock_response = [{"customer": "org1234567", "async_id": "f9eb2ce7-4564-4509-aecc-1200958c07cf"}] + expected_key = "Async jobs for expired paritions removal" + mock_orchestrator.return_value = mock_response + + response = self.client.delete(reverse("expired_trino_partitions")) + body = response.json() + + self.assertEqual(response.status_code, 200) + self.assertIn(expected_key, body) + self.assertIn(str(mock_response), body.get(expected_key)) + mock_orchestrator.assert_called() diff --git a/koku/masu/test/database/test_ocp_report_db_accessor.py b/koku/masu/test/database/test_ocp_report_db_accessor.py index f921f3095c..0538af0295 100644 --- a/koku/masu/test/database/test_ocp_report_db_accessor.py +++ b/koku/masu/test/database/test_ocp_report_db_accessor.py @@ -789,6 +789,23 @@ def test_delete_hive_partitions_by_source_success(self, mock_trino, mock_table_e mock_trino.assert_called() self.assertTrue(result) + @patch("masu.database.ocp_report_db_accessor.OCPReportDBAccessor.table_exists_trino") + @patch("masu.database.ocp_report_db_accessor.OCPReportDBAccessor._execute_trino_raw_sql_query") + def test_find_expired_trino_partitions_success(self, mock_trino, mock_table_exist): + """Test that deletions work with retries.""" + result = self.accessor.find_expired_trino_partitions("table", "source_column", "2024-06-01") + mock_trino.assert_called() + self.assertTrue(result) + + @patch("masu.database.ocp_report_db_accessor.OCPReportDBAccessor.table_exists_trino") + @patch("masu.database.ocp_report_db_accessor.OCPReportDBAccessor._execute_trino_raw_sql_query") + def test_find_expired_trino_partitions_no_table(self, mock_trino, mock_table_exist): + """Test that deletions work with retries.""" + mock_table_exist.return_value = False + result = self.accessor.find_expired_trino_partitions("table", "source_column", "2024-06-01") + mock_trino.assert_not_called() + self.assertFalse(result) + @patch("masu.database.ocp_report_db_accessor.OCPReportDBAccessor.schema_exists_trino") @patch("masu.database.ocp_report_db_accessor.OCPReportDBAccessor.table_exists_trino") @patch("masu.database.ocp_report_db_accessor.OCPReportDBAccessor._execute_trino_raw_sql_query") diff --git a/koku/masu/test/processor/ocp/test_ocp_report_db_cleaner.py b/koku/masu/test/processor/ocp/test_ocp_report_db_cleaner.py index 1d02203f72..2549c30060 100644 --- a/koku/masu/test/processor/ocp/test_ocp_report_db_cleaner.py +++ b/koku/masu/test/processor/ocp/test_ocp_report_db_cleaner.py @@ -5,6 +5,7 @@ """Test the OCPReportDBCleaner utility object.""" import datetime import uuid +from unittest.mock import patch from dateutil import relativedelta from django.conf import settings @@ -213,3 +214,26 @@ def test_drop_report_partitions(self): self.assertEqual(len(removed_data), 1) with schema_context(self.schema): self.assertFalse(table_exists(self.schema, test_part.table_name)) + + def test_purge_expired_trino_partitions_short_circuit(self): + cleaner = OCPReportDBCleaner(self.schema) + with self.assertRaises(OCPReportDBCleanerError): + cleaner.purge_expired_trino_partitions(None, None, True) + + @patch("masu.database.ocp_report_db_accessor.OCPReportDBAccessor.delete_hive_partition_by_month") + @patch("masu.database.ocp_report_db_accessor.OCPReportDBAccessor.find_expired_trino_partitions") + def test_purge_expired_trino_partitions_no_partitions(self, mock_find_partitions, mock_delete): + mock_find_partitions.return_value = [] + cleaner = OCPReportDBCleaner(self.schema) + cutoff_date = datetime.datetime(2018, 12, 31, tzinfo=settings.UTC) + cleaner.purge_expired_trino_partitions(cutoff_date) + mock_delete.assert_not_called() + + @patch("masu.database.ocp_report_db_accessor.OCPReportDBAccessor.delete_hive_partition_by_month") + @patch("masu.database.ocp_report_db_accessor.OCPReportDBAccessor.find_expired_trino_partitions") + def test_purge_expired_trino_partitions(self, mock_find_partitions, mock_delete): + mock_find_partitions.return_value = [("year", "month", "A")] + cleaner = OCPReportDBCleaner(self.schema) + cutoff_date = datetime.datetime(2018, 12, 31, tzinfo=settings.UTC) + cleaner.purge_expired_trino_partitions(cutoff_date) + mock_delete.assert_called() diff --git a/koku/masu/test/processor/test_tasks.py b/koku/masu/test/processor/test_tasks.py index e54d273140..7a77df81b3 100644 --- a/koku/masu/test/processor/test_tasks.py +++ b/koku/masu/test/processor/test_tasks.py @@ -59,6 +59,7 @@ from masu.processor.tasks import record_all_manifest_files from masu.processor.tasks import record_report_status from masu.processor.tasks import remove_expired_data +from masu.processor.tasks import remove_expired_trino_partitions from masu.processor.tasks import remove_stale_tenants from masu.processor.tasks import summarize_reports from masu.processor.tasks import update_all_summary_tables @@ -718,6 +719,48 @@ def test_remove_expired_data_no_simulate(self, fake_remover): self.assertIn(expected_initial_remove_log, logger.output) self.assertNotIn(expected_expired_data_log, logger.output) + @patch.object(ExpiredDataRemover, "remove_expired_trino_partitions") + def test_remove_expired_trino_partitions(self, fake_remover): + """Test task.""" + expected_results = ["A"] + fake_remover.return_value = expected_results + + # disable logging override set in masu/__init__.py + logging.disable(logging.NOTSET) + for simulate in [True, False]: + with self.subTest(simulate=simulate): + with self.assertLogs("masu.processor._tasks.remove_expired") as logger: + expected_initial_remove_log = ( + "INFO:masu.processor._tasks.remove_expired:" + "{'message': 'Remove expired partitions', 'tracing_id': '', " + "'schema': '" + self.schema + "', " + "'provider_type': '" + Provider.PROVIDER_OCP + "', " + "'provider_uuid': " + str(None) + ", " + "'simulate': " + str(simulate) + "}" + ) + + expected_expired_data_log = ( + "INFO:masu.processor._tasks.remove_expired:" + "{'message': 'Removed Partitions', 'tracing_id': '', " + "'schema': '" + self.schema + "', " + "'provider_type': '" + Provider.PROVIDER_OCP + "', " + "'provider_uuid': " + str(None) + ", " + "'simulate': " + str(simulate) + ", " + "'removed_data': " + str(expected_results) + "}" + ) + remove_expired_trino_partitions( + schema_name=self.schema, provider_type=Provider.PROVIDER_OCP, simulate=simulate + ) + + # Debugging output + print("Logger output:", logger.output) + + self.assertIn(expected_initial_remove_log, logger.output) + if simulate: + self.assertNotIn(expected_expired_data_log, logger.output) + else: + self.assertIn(expected_expired_data_log, logger.output) + class TestUpdateSummaryTablesTask(MasuTestCase): """Test cases for Processor summary table Celery tasks.""" From 4cf7780d8d58d51c3cdaf2d462d8f10aed50735b Mon Sep 17 00:00:00 2001 From: myersCody Date: Tue, 9 Jul 2024 16:07:15 -0400 Subject: [PATCH 5/8] Improve test coverage. --- koku/masu/processor/orchestrator.py | 3 ++- .../ocp/test_ocp_report_db_cleaner.py | 9 ++++++++ koku/masu/test/processor/test_orchestrator.py | 21 +++++++++++++++++++ koku/masu/test/processor/test_tasks.py | 9 +++++--- 4 files changed, 38 insertions(+), 4 deletions(-) diff --git a/koku/masu/processor/orchestrator.py b/koku/masu/processor/orchestrator.py index b8f0db880c..14b801785f 100644 --- a/koku/masu/processor/orchestrator.py +++ b/koku/masu/processor/orchestrator.py @@ -556,7 +556,6 @@ def remove_expired_trino_partitions(self, simulate=False): ) # # distinct is not removing duplicates from this list, so using a set to reduce instead - for schema in schemas: LOG.info("Calling remove_expired_trino_partitions with account: %s", schema) async_result = remove_expired_trino_partitions.delay( @@ -570,4 +569,6 @@ def remove_expired_trino_partitions(self, simulate=False): schema, str(async_result), ) + async_results.append(async_result) + return async_results diff --git a/koku/masu/test/processor/ocp/test_ocp_report_db_cleaner.py b/koku/masu/test/processor/ocp/test_ocp_report_db_cleaner.py index 2549c30060..8b5e67291e 100644 --- a/koku/masu/test/processor/ocp/test_ocp_report_db_cleaner.py +++ b/koku/masu/test/processor/ocp/test_ocp_report_db_cleaner.py @@ -237,3 +237,12 @@ def test_purge_expired_trino_partitions(self, mock_find_partitions, mock_delete) cutoff_date = datetime.datetime(2018, 12, 31, tzinfo=settings.UTC) cleaner.purge_expired_trino_partitions(cutoff_date) mock_delete.assert_called() + + @patch("masu.database.ocp_report_db_accessor.OCPReportDBAccessor.delete_hive_partition_by_month") + @patch("masu.database.ocp_report_db_accessor.OCPReportDBAccessor.find_expired_trino_partitions") + def test_purge_expired_trino_partitions_simulate(self, mock_find_partitions, mock_delete): + mock_find_partitions.return_value = [("year", "month", "A")] + cleaner = OCPReportDBCleaner(self.schema) + cutoff_date = datetime.datetime(2018, 12, 31, tzinfo=settings.UTC) + cleaner.purge_expired_trino_partitions(cutoff_date, simulate=True) + mock_delete.assert_not_called() diff --git a/koku/masu/test/processor/test_orchestrator.py b/koku/masu/test/processor/test_orchestrator.py index f2ddeb1fea..118c3150d2 100644 --- a/koku/masu/test/processor/test_orchestrator.py +++ b/koku/masu/test/processor/test_orchestrator.py @@ -597,3 +597,24 @@ def test_check_currently_processing(self): initial_prov_failed.data_updated_timestamp = None result = check_currently_processing(self.schema_name, initial_prov_failed) self.assertEqual(result, False) + + @patch("masu.processor.worker_cache.CELERY_INSPECT") + @patch.object(ExpiredDataRemover, "remove_expired_trino_partitions") + @patch("masu.processor.orchestrator.remove_expired_trino_partitions.delay") + def test_remove_expired_trino_partitions(self, mock_task, mock_remover, mock_inspect): + """Test removing expired trino partitions.""" + task_id = "123" + expected_results = [{"account_payer_id": "999999999", "billing_period_start": "2018-06-24 15:47:33.052509"}] + mock_remover.return_value = expected_results + mock_task.return_value = task_id + expected = ( + "INFO:masu.processor.orchestrator:Expired partition removal queued - schema_name: org1234567, Task ID: {}" + ) + # unset disabling all logging below CRITICAL from masu/__init__.py + logging.disable(logging.NOTSET) + with self.assertLogs("masu.processor.orchestrator", level="INFO") as logger: + orchestrator = Orchestrator() + results = orchestrator.remove_expired_trino_partitions() + self.assertTrue(results) + self.assertEqual(len(results), 1) + self.assertIn(expected.format(task_id), logger.output) diff --git a/koku/masu/test/processor/test_tasks.py b/koku/masu/test/processor/test_tasks.py index 7a77df81b3..3faa55ef28 100644 --- a/koku/masu/test/processor/test_tasks.py +++ b/koku/masu/test/processor/test_tasks.py @@ -46,6 +46,7 @@ from masu.processor._tasks.download import _get_report_files from masu.processor._tasks.process import _process_report_file from masu.processor.expired_data_remover import ExpiredDataRemover +from masu.processor.expired_data_remover import ExpiredDataRemoverError from masu.processor.report_processor import ReportProcessorError from masu.processor.report_summary_updater import ReportSummaryUpdaterCloudError from masu.processor.report_summary_updater import ReportSummaryUpdaterError @@ -752,15 +753,17 @@ def test_remove_expired_trino_partitions(self, fake_remover): schema_name=self.schema, provider_type=Provider.PROVIDER_OCP, simulate=simulate ) - # Debugging output - print("Logger output:", logger.output) - self.assertIn(expected_initial_remove_log, logger.output) if simulate: self.assertNotIn(expected_expired_data_log, logger.output) else: self.assertIn(expected_expired_data_log, logger.output) + def test_failed_remover_error(self): + """Test that a fake provider will result in an error.""" + with self.assertRaises(ExpiredDataRemoverError): + remove_expired_trino_partitions(self.schema, "FAKE_PROVIDER", False) + class TestUpdateSummaryTablesTask(MasuTestCase): """Test cases for Processor summary table Celery tasks.""" From 533a11a8a349a4fecd61709b9adfd267050aa2bb Mon Sep 17 00:00:00 2001 From: myersCody Date: Tue, 9 Jul 2024 16:33:19 -0400 Subject: [PATCH 6/8] Remove bad test. --- koku/masu/test/processor/test_tasks.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/koku/masu/test/processor/test_tasks.py b/koku/masu/test/processor/test_tasks.py index 3faa55ef28..7640923217 100644 --- a/koku/masu/test/processor/test_tasks.py +++ b/koku/masu/test/processor/test_tasks.py @@ -46,7 +46,6 @@ from masu.processor._tasks.download import _get_report_files from masu.processor._tasks.process import _process_report_file from masu.processor.expired_data_remover import ExpiredDataRemover -from masu.processor.expired_data_remover import ExpiredDataRemoverError from masu.processor.report_processor import ReportProcessorError from masu.processor.report_summary_updater import ReportSummaryUpdaterCloudError from masu.processor.report_summary_updater import ReportSummaryUpdaterError @@ -759,11 +758,6 @@ def test_remove_expired_trino_partitions(self, fake_remover): else: self.assertIn(expected_expired_data_log, logger.output) - def test_failed_remover_error(self): - """Test that a fake provider will result in an error.""" - with self.assertRaises(ExpiredDataRemoverError): - remove_expired_trino_partitions(self.schema, "FAKE_PROVIDER", False) - class TestUpdateSummaryTablesTask(MasuTestCase): """Test cases for Processor summary table Celery tasks.""" From 6f971499cdf35b2c79123fe4d36d3e719b220965 Mon Sep 17 00:00:00 2001 From: Cody Myers Date: Thu, 11 Jul 2024 13:51:24 -0400 Subject: [PATCH 7/8] Apply suggestions from code review Co-authored-by: Michael Skarbek --- koku/masu/processor/ocp/ocp_report_db_cleaner.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/koku/masu/processor/ocp/ocp_report_db_cleaner.py b/koku/masu/processor/ocp/ocp_report_db_cleaner.py index 7e395f6988..2b1b1ea0c8 100644 --- a/koku/masu/processor/ocp/ocp_report_db_cleaner.py +++ b/koku/masu/processor/ocp/ocp_report_db_cleaner.py @@ -165,10 +165,8 @@ def purge_expired_trino_partitions(self, expired_date, provider_uuid=None, simul else: LOG.info("No expired partitions") return - if simulate: - for partition in results: - LOG.info(f"partition_info: {partition}") - else: - for result in results: - year, month, source_value = result + for partition in results: + LOG.info(f"partition_info: {partition}") + if not simulate: + year, month, source_value = partition accessor.delete_hive_partition_by_month(table, source_value, year, month, source_column) From 5767434b23fdd3cef9369a5f318a7396e54a4648 Mon Sep 17 00:00:00 2001 From: myersCody Date: Thu, 11 Jul 2024 14:05:58 -0400 Subject: [PATCH 8/8] Address comments --- koku/masu/processor/_tasks/remove_expired.py | 1 - koku/masu/processor/orchestrator.py | 7 +++---- koku/masu/processor/tasks.py | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/koku/masu/processor/_tasks/remove_expired.py b/koku/masu/processor/_tasks/remove_expired.py index 1bcb386a7c..abba683d58 100644 --- a/koku/masu/processor/_tasks/remove_expired.py +++ b/koku/masu/processor/_tasks/remove_expired.py @@ -35,7 +35,6 @@ def _remove_expired_data(schema_name, provider, simulate, provider_uuid=None): if removed_data: status_msg = "Expired Data" if simulate else "Removed Data" LOG.info(log_json(msg=status_msg, removed_data=removed_data, context=context)) - # We could extend the logic below here, or keep it as a separate celery task def _remove_expired_trino_partitions(schema_name, provider_type, simulate, provider_uuid=None): diff --git a/koku/masu/processor/orchestrator.py b/koku/masu/processor/orchestrator.py index 14b801785f..c8a7966be0 100644 --- a/koku/masu/processor/orchestrator.py +++ b/koku/masu/processor/orchestrator.py @@ -549,13 +549,12 @@ def remove_expired_trino_partitions(self, simulate=False): Removes expired trino partitions for each account. """ async_results = [] - schemas = set( - Provider.objects.filter(type=Provider.PROVIDER_OCP) + schemas = ( + Provider.objects.order_by() + .filter(type=Provider.PROVIDER_OCP) .values_list("customer__schema_name", flat=True) .distinct() ) - # - # distinct is not removing duplicates from this list, so using a set to reduce instead for schema in schemas: LOG.info("Calling remove_expired_trino_partitions with account: %s", schema) async_result = remove_expired_trino_partitions.delay( diff --git a/koku/masu/processor/tasks.py b/koku/masu/processor/tasks.py index 4e66d75308..924ab0f37a 100644 --- a/koku/masu/processor/tasks.py +++ b/koku/masu/processor/tasks.py @@ -312,7 +312,7 @@ def remove_expired_data(schema_name, provider, simulate, provider_uuid=None, que @celery_app.task(name="masu.processor.tasks.remove_expired_trino_partitions", queue=DEFAULT) -def remove_expired_trino_partitions(schema_name, provider_type, simulate, provider_uuid=None, queue_name=None): +def remove_expired_trino_partitions(schema_name, provider_type, simulate, provider_uuid=None): """ Remove expired report data.