From 14c51fb5da6a539c6e8de86eb121ed3eafd0f99b Mon Sep 17 00:00:00 2001 From: myersCody Date: Tue, 18 Jun 2024 15:16:09 -0400 Subject: [PATCH] Additional improvements. --- koku/masu/database/ocp_report_db_accessor.py | 1 + koku/masu/processor/orchestrator.py | 33 ++++++++++++-------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/koku/masu/database/ocp_report_db_accessor.py b/koku/masu/database/ocp_report_db_accessor.py index c1812bf344..1030d0e66e 100644 --- a/koku/masu/database/ocp_report_db_accessor.py +++ b/koku/masu/database/ocp_report_db_accessor.py @@ -299,6 +299,7 @@ def find_expired_trino_partitions(self, table, source_column, date_str): FROM "{table}$partitions" ) as partitions WHERE partitions.partition_date < DATE '{date_str}' +GROUP BY partitions.year, partitions.month, partitions.source """ return self._execute_trino_raw_sql_query(sql, log_ref="finding expired partitions") diff --git a/koku/masu/processor/orchestrator.py b/koku/masu/processor/orchestrator.py index 845e693fd2..1add41f402 100644 --- a/koku/masu/processor/orchestrator.py +++ b/koku/masu/processor/orchestrator.py @@ -508,18 +508,25 @@ def remove_expired_trino_partitions(self, simulate=False): Removes expired trino partitions for each account. """ async_results = [] - for account in Provider.objects.get_accounts(): - if account.get("provider_type") == Provider.PROVIDER_OCP: - LOG.info("Calling remove_expired_trino_partitions with account: %s", account) - async_result = remove_expired_trino_partitions.delay( - schema_name=account.get("schema_name"), - provider_type=account.get("provider_type"), - simulate=simulate, - ) + schemas = set( + Provider.objects.filter(type=Provider.PROVIDER_OCP) + .values_list("customer__schema_name", flat=True) + .distinct() + ) + # + # distinct is not removing duplicates from this list, so using a set to reduce instead + + for schema in schemas: + LOG.info("Calling remove_expired_trino_partitions with account: %s", schema) + async_result = remove_expired_trino_partitions.delay( + schema_name=schema, + provider_type=Provider.PROVIDER_OCP, + simulate=simulate, + ) - LOG.info( - "Expired partition removal queued - schema_name: %s, Task ID: %s", - account.get("schema_name"), - str(async_result), - ) + LOG.info( + "Expired partition removal queued - schema_name: %s, Task ID: %s", + schema, + str(async_result), + ) return async_results