diff --git a/Pipfile b/Pipfile
index c016e0c6a7..8ee3956484 100644
--- a/Pipfile
+++ b/Pipfile
@@ -9,6 +9,7 @@ adal = "*"
 app-common-python = ">=0.2.3"
 azure-identity = "*"
 azure-mgmt-costmanagement = ">=3.0.0"
+azure-mgmt-compute = "*"
 azure-mgmt-resource = ">=8.0"
 azure-mgmt-storage = ">=20.1.0"
 azure-storage-blob = ">=12.1"
diff --git a/Pipfile.lock b/Pipfile.lock
index 0110ffccce..59562553b9 100644
--- a/Pipfile.lock
+++ b/Pipfile.lock
@@ -1,7 +1,7 @@
 {
     "_meta": {
         "hash": {
-            "sha256": "12b9a65de36f21d1117234fbeab350c4dfeab1a49ebc679208b0bbae8bd5cd5a"
+            "sha256": "3635af99c48af9de4668d97061063a8fa9eb64d2ec6f2dfc305dd488e2d9f3c9"
         },
         "pipfile-spec": 6,
         "requires": {
@@ -87,6 +87,14 @@
             "index": "pypi",
             "version": "==1.15.0"
         },
+        "azure-mgmt-compute": {
+            "hashes": [
+                "sha256:73845529ebee3ba1e9f5d3963f5b1f9108c950948faa8ab2206b1d62eb7f036c",
+                "sha256:e529786340f862aa6a218a7ad6a5beb6b9fb55833c5caa0851cc3314d0493b63"
+            ],
+            "index": "pypi",
+            "version": "==30.3.0"
+        },
         "azure-mgmt-core": {
             "hashes": [
                 "sha256:81071675f186a585555ef01816f2774d49c1c9024cb76e5720c3c0f6b337bb7d",
diff --git a/deploy/clowdapp.yaml b/deploy/clowdapp.yaml
index 70866ad442..efb412c3ad 100644
--- a/deploy/clowdapp.yaml
+++ b/deploy/clowdapp.yaml
@@ -3960,6 +3960,8 @@ objects:
             value: ${ENHANCED_ORG_ADMIN}
           - name: ENABLE_SUBS_DEBUG
             value: ${ENABLE_SUBS_DEBUG}
+          - name: ENABLE_SUBS_PROVIDER_TYPES
+            value: ${ENABLE_SUBS_PROVIDER_TYPES}
           image: ${IMAGE}:${IMAGE_TAG}
           livenessProbe:
             failureThreshold: 5
@@ -4642,6 +4644,10 @@ parameters:
   displayName: Enable ROS Debug
   name: ENABLE_ROS_DEBUG
   value: "False"
+- description: Enable SUBS Provider Types for Processing
+  displayName: Enable SUBS Provider Types
+  name: ENABLE_SUBS_PROVIDER_TYPES
+  value: AWS
 - displayName: Minimum replicas
   name: KOKU_REPLICAS
   required: true
diff --git a/deploy/kustomize/base/base.yaml b/deploy/kustomize/base/base.yaml
index a580f4a4eb..aa9cf1d1f3 100644
--- a/deploy/kustomize/base/base.yaml
+++ b/deploy/kustomize/base/base.yaml
@@ -541,3 +541,7 @@ parameters:
   displayName: Enable ROS Debug
   name: ENABLE_ROS_DEBUG
   value: "False"
+- description: Enable SUBS Provider Types for Processing
+  displayName: Enable SUBS Provider Types
+  name: ENABLE_SUBS_PROVIDER_TYPES
+  value: "AWS"
diff --git a/deploy/kustomize/patches/worker-subs-extraction.yaml b/deploy/kustomize/patches/worker-subs-extraction.yaml
index baad7ddf19..08c9b4ff52 100644
--- a/deploy/kustomize/patches/worker-subs-extraction.yaml
+++ b/deploy/kustomize/patches/worker-subs-extraction.yaml
@@ -111,6 +111,8 @@
             value: ${ENHANCED_ORG_ADMIN}
           - name: ENABLE_SUBS_DEBUG
             value: ${ENABLE_SUBS_DEBUG}
+          - name: ENABLE_SUBS_PROVIDER_TYPES
+            value: ${ENABLE_SUBS_PROVIDER_TYPES}
           livenessProbe:
             httpGet:
               path: /livez
diff --git a/docker-compose.yml b/docker-compose.yml
index 596e955dc3..26b98f8668 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -183,6 +183,7 @@ services:
       - ENABLE_S3_ARCHIVING=${ENABLE_S3_ARCHIVING-False}
       - ENABLE_HCS_DEBUG=${ENABLE_HCS_DEBUG-False}
      - ENABLE_SUBS_DEBUG=${ENABLE_SUBS_DEBUG-False}
+      - ENABLE_SUBS_PROVIDER_TYPES=${ENABLE_SUBS_PROVIDER_TYPES}
       - ENABLE_ROS_DEBUG=${ENABLE_ROS_DEBUG-False}
       - S3_BUCKET_NAME=${S3_BUCKET_NAME-koku-bucket}
       - S3_BUCKET_PATH=${S3_BUCKET_PATH-data_archive}
@@ -318,6 +319,7 @@ services:
       - KOKU_API_PORT=${KOKU_API_PORT-8000}
       - KOKU_API_PATH_PREFIX=${KOKU_API_PATH_PREFIX-/api/cost-management/v1}
       - ENABLE_SUBS_DEBUG=${ENABLE_SUBS_DEBUG-False}
+      - ENABLE_SUBS_PROVIDER_TYPES=${ENABLE_SUBS_PROVIDER_TYPES}
       - S3_BUCKET_NAME=${S3_BUCKET_NAME-koku-bucket}
       - S3_BUCKET_PATH=${S3_BUCKET_PATH-data_archive}
       - S3_ENDPOINT
diff --git a/koku/koku/settings.py b/koku/koku/settings.py
index 6c8d8e2b60..e9c4a3b09e 100644
--- a/koku/koku/settings.py
+++ b/koku/koku/settings.py
@@ -566,6 +566,7 @@
 
 # SUBS Data Extraction debugging
 ENABLE_SUBS_DEBUG = ENVIRONMENT.bool("ENABLE_SUBS_DEBUG", default=False)
+ENABLE_SUBS_PROVIDER_TYPES = ENVIRONMENT.list("ENABLE_SUBS_PROVIDER_TYPES", default=["AWS"])
 
 # ROS debugging
 ENABLE_ROS_DEBUG = ENVIRONMENT.bool("ENABLE_ROS_DEBUG", default=False)
diff --git a/koku/providers/azure/client.py b/koku/providers/azure/client.py
index 877edf23a3..8679337ea3 100644
--- a/koku/providers/azure/client.py
+++ b/koku/providers/azure/client.py
@@ -4,6 +4,7 @@
 #
 """Azure Client Configuration."""
 from azure.identity import ClientSecretCredential
+from azure.mgmt.compute import ComputeManagementClient
 from azure.mgmt.costmanagement import CostManagementClient
 from azure.mgmt.resource import ResourceManagementClient
 from azure.mgmt.storage import StorageManagementClient
@@ -70,6 +71,11 @@ def storage_client(self):
         """Get storage client with subscription and credentials."""
         return StorageManagementClient(self.credentials, self.subscription_id)
 
+    @property
+    def compute_client(self):
+        """Get compute client with subscription and credentials."""
+        return ComputeManagementClient(self.credentials, self.subscription_id)
+
     @property
     def subscription_id(self):
         """Subscription ID property."""
diff --git a/koku/subs/subs_data_extractor.py b/koku/subs/subs_data_extractor.py
index ad58c663d7..2cb399d822 100644
--- a/koku/subs/subs_data_extractor.py
+++ b/koku/subs/subs_data_extractor.py
@@ -22,11 +22,89 @@
 from reporting.models import SubsIDMap
 from reporting.models import SubsLastProcessed
 from reporting.provider.aws.models import TRINO_LINE_ITEM_TABLE as AWS_TABLE
+from reporting.provider.azure.models import TRINO_LINE_ITEM_TABLE as AZURE_TABLE
+
 
 LOG = logging.getLogger(__name__)
 
 TABLE_MAP = {
     Provider.PROVIDER_AWS: AWS_TABLE,
+    Provider.PROVIDER_AZURE: AZURE_TABLE,
+}
+
+ID_COLUMN_MAP = {
+    Provider.PROVIDER_AWS: "lineitem_usageaccountid",
+    Provider.PROVIDER_AZURE: "COALESCE(NULLIF(subscriptionid, ''), subscriptionguid)",
+}
+
+RECORD_FILTER_MAP = {
+    Provider.PROVIDER_AWS: (
+        " lineitem_productcode = 'AmazonEC2' AND lineitem_lineitemtype IN ('Usage', 'SavingsPlanCoveredUsage') "
+        "AND product_vcpu != '' AND strpos(lower(resourcetags), 'com_redhat_rhel') > 0"
+    ),
+    Provider.PROVIDER_AZURE: (
+        " metercategory = 'Virtual Machines' AND chargetype = 'Usage' "
+        "AND json_extract_scalar(lower(additionalinfo), '$.vcpus') IS NOT NULL "
+        "AND json_extract_scalar(lower(tags), '$.com_redhat_rhel') IS NOT NULL"
+    ),
+}
+
+RESOURCE_ID_FILTER_MAP = {
+    Provider.PROVIDER_AWS: (
+        " AND lineitem_productcode = 'AmazonEC2' "
+        "AND strpos(lower(resourcetags), 'com_redhat_rhel') > 0 AND lineitem_usageaccountid = {{usage_account}}"
+    ),
+    Provider.PROVIDER_AZURE: (
+        " AND metercategory = 'Virtual Machines' "
+        "AND json_extract_scalar(lower(additionalinfo), '$.vcpus') IS NOT NULL "
+        "AND json_extract_scalar(lower(tags), '$.com_redhat_rhel') IS NOT NULL "
+        "AND (subscriptionid = {{usage_account}} or subscriptionguid = {{usage_account}}) "
+    ),
+}
+
+RESOURCE_SELECT_MAP = {
+    Provider.PROVIDER_AWS: " SELECT lineitem_resourceid, max(lineitem_usagestartdate) ",
+    Provider.PROVIDER_AZURE: " SELECT coalesce(NULLIF(resourceid, ''), instancename), date_add('day', -1, max(coalesce(date, usagedatetime))) ",  # noqa E501
+}
+
+RESOURCE_ID_GROUP_BY_MAP = {
Provider.PROVIDER_AWS: " GROUP BY lineitem_resourceid", + Provider.PROVIDER_AZURE: " GROUP BY resourceid, instancename", +} + +RESOURCE_ID_EXCLUSION_CLAUSE_MAP = { + Provider.PROVIDER_AWS: " AND lineitem_resourceid NOT IN {{excluded_ids | inclause}} ", + Provider.PROVIDER_AZURE: " and coalesce(NULLIF(resourceid, ''), instancename) NOT IN {{excluded_ids | inclause}} ", +} + +RESOURCE_ID_SQL_CLAUSE_MAP = { + Provider.PROVIDER_AWS: ( + " ( lineitem_resourceid = {{{{ rid_{0} }}}} " + " AND lineitem_usagestartdate >= {{{{ start_date_{0} }}}} " + " AND lineitem_usagestartdate <= {{{{ end_date_{0} }}}}) " + ), + Provider.PROVIDER_AZURE: ( + " ( coalesce(NULLIF(resourceid, ''), instancename) = {{{{ rid_{0} }}}} " + "AND coalesce(date, usagedatetime) >= {{{{ start_date_{0} }}}} " + "AND coalesce(date, usagedatetime) <= {{{{ end_date_{0} }}}}) " + ), +} + +POST_OR_CLAUSE_SQL_MAP = { + Provider.PROVIDER_AWS: """ +OFFSET + {{ offset }} + LIMIT + {{ limit }} + ) +WHERE json_extract_scalar(tags, '$.com_redhat_rhel') IS NOT NULL +""", + Provider.PROVIDER_AZURE: """ +OFFSET + {{ offset }} + LIMIT + {{ limit }} +""", } @@ -40,11 +118,20 @@ def __init__(self, tracing_id, context): microsecond=0, second=0, minute=0, hour=0 ) - timedelta(days=1) self.tracing_id = tracing_id - self.table = TABLE_MAP.get(self.provider_type) self.s3_resource = get_s3_resource( settings.S3_SUBS_ACCESS_KEY, settings.S3_SUBS_SECRET, settings.S3_SUBS_REGION ) self.context = context + # The following variables all change depending on the provider type to run the correct SQL + self.table = TABLE_MAP.get(self.provider_type) + self.id_column = ID_COLUMN_MAP.get(self.provider_type) + self.provider_where_clause = RECORD_FILTER_MAP.get(self.provider_type) + self.resource_select_sql = RESOURCE_SELECT_MAP.get(self.provider_type) + self.resource_id_where_clause = RESOURCE_ID_FILTER_MAP.get(self.provider_type) + self.resource_id_group_by = RESOURCE_ID_GROUP_BY_MAP.get(self.provider_type) + self.resource_id_sql_clause = RESOURCE_ID_SQL_CLAUSE_MAP.get(self.provider_type) + self.resource_id_exclusion_clause = RESOURCE_ID_EXCLUSION_CLAUSE_MAP.get(self.provider_type) + self.post_or_clause_sql = POST_OR_CLAUSE_SQL_MAP.get(self.provider_type) @cached_property def subs_s3_path(self): @@ -62,7 +149,11 @@ def get_latest_processed_dict_for_provider(self, year, month): # and we want to gather new data we have not processed yet # so we add one second to the last timestamp to ensure the time range processed # is all new data - lpt_dict[rid] = latest_time + timedelta(seconds=1) + if self.provider_type != Provider.PROVIDER_AZURE: + lpt_dict[rid] = latest_time + timedelta(seconds=1) + # Azure is daily timestamps so we do not need to increase this by 1 since this was the previous end + else: + lpt_dict[rid] = latest_time return lpt_dict def determine_latest_processed_time_for_provider(self, rid, year, month): @@ -87,17 +178,19 @@ def determine_ids_for_provider(self, year, month): SubsIDMap.objects.exclude(source_uuid=self.provider_uuid).values_list("usage_id", flat=True) ) sql = ( - "SELECT DISTINCT lineitem_usageaccountid FROM hive.{{schema | sqlsafe}}.aws_line_items WHERE" + "SELECT DISTINCT {{id_column | sqlsafe}} FROM hive.{{schema | sqlsafe}}.{{table | sqlsafe}} WHERE" " source={{source_uuid}} AND year={{year}} AND month={{month}}" ) if excluded_ids: - sql += "AND lineitem_usageaccountid NOT IN {{excluded_ids | inclause}}" + sql += " AND {{id_column | sqlsafe}} NOT IN {{excluded_ids | inclause}}" sql_params = { "schema": self.schema, "source_uuid": 
             "source_uuid": self.provider_uuid,
             "year": year,
             "month": month,
             "excluded_ids": excluded_ids,
+            "id_column": self.id_column,
+            "table": self.table,
         }
         ids = self._execute_trino_raw_sql_query(
             sql, sql_params=sql_params, context=self.context, log_ref="subs_determine_ids_for_provider"
@@ -120,11 +213,9 @@ def determine_line_item_count(self, where_clause, sql_params):
 
     def determine_where_clause_and_params(self, year, month):
         """Determine the where clause to use when processing subs data"""
-        where_clause = (
-            "WHERE source={{source_uuid}} AND year={{year}} AND month={{month}} AND"
-            " lineitem_productcode = 'AmazonEC2' AND lineitem_lineitemtype IN ('Usage', 'SavingsPlanCoveredUsage') AND"
-            " product_vcpu != '' AND strpos(lower(resourcetags), 'com_redhat_rhel') > 0"
-        )
+        where_clause = "WHERE source={{source_uuid}} AND year={{year}} AND month={{month}} AND"
+        # different provider types have different required filters here
+        where_clause += self.provider_where_clause
         sql_params = {
             "source_uuid": self.provider_uuid,
             "year": year,
@@ -139,16 +230,14 @@ def get_resource_ids_for_usage_account(self, usage_account, year, month):
         excluded_ids = list(
             SubsLastProcessed.objects.exclude(source_uuid=self.provider_uuid).values_list("resource_id", flat=True)
         )
-        sql = (
-            "SELECT lineitem_resourceid, max(lineitem_usagestartdate)"
-            " FROM hive.{{schema | sqlsafe}}.aws_line_items WHERE"
+        sql = self.resource_select_sql + (
+            " FROM hive.{{schema | sqlsafe}}.{{table | sqlsafe}} WHERE"
             " source={{source_uuid}} AND year={{year}} AND month={{month}}"
-            " AND lineitem_productcode = 'AmazonEC2' AND strpos(lower(resourcetags), 'com_redhat_rhel') > 0"
-            " AND lineitem_usageaccountid = {{usage_account}}"
         )
+        sql += self.resource_id_where_clause
         if excluded_ids:
-            sql += "AND lineitem_resourceid NOT IN {{excluded_ids | inclause}}"
-            sql += " GROUP BY lineitem_resourceid"
+            sql += self.resource_id_exclusion_clause
+        sql += self.resource_id_group_by
         sql_params = {
             "schema": self.schema,
             "source_uuid": self.provider_uuid,
@@ -156,6 +245,7 @@ def get_resource_ids_for_usage_account(self, usage_account, year, month):
             "month": month,
             "excluded_ids": excluded_ids,
             "usage_account": usage_account,
+            "table": self.table,
         }
         ids = self._execute_trino_raw_sql_query(
             sql, sql_params=sql_params, context=self.context, log_ref="subs_determine_rids_for_provider"
@@ -174,25 +264,13 @@ def gather_and_upload_for_resource_batch(self, year, month, batch, base_filename
             sql_params[f"rid_{i}"] = rid
             sql_params[f"start_date_{i}"] = start_time
             sql_params[f"end_date_{i}"] = end_time
-            rid_sql_clause += (
-                "( lineitem_resourceid = {{{{ rid_{0} }}}}"
-                " AND lineitem_usagestartdate >= {{{{ start_date_{0} }}}}"
-                " AND lineitem_usagestartdate <= {{{{ end_date_{0} }}}})"
-            ).format(i)
+            rid_sql_clause += self.resource_id_sql_clause.format(i)
             if i < len(batch) - 1:
                 rid_sql_clause += " OR "
         rid_sql_clause += " )"
         where_clause += rid_sql_clause
         summary_sql += rid_sql_clause
-        summary_sql += """
-OFFSET
-    {{ offset }}
-LIMIT
-    {{ limit }}
-)
--- this ensures the required `com_redhat_rhel` tag exists in the set of tags since the above match is not exact
-WHERE json_extract_scalar(tags, '$.com_redhat_rhel') IS NOT NULL
-"""
+        summary_sql += self.post_or_clause_sql
         total_count = self.determine_line_item_count(where_clause, sql_params)
         LOG.debug(
             log_json(
diff --git a/koku/subs/subs_data_messenger.py b/koku/subs/subs_data_messenger.py
index d7213d78f0..51f5f8cdc0 100644
--- a/koku/subs/subs_data_messenger.py
+++ b/koku/subs/subs_data_messenger.py
@@ -7,23 +7,35 @@
 import logging
 import os
 import uuid
+from collections import defaultdict
+from datetime import timedelta
 from tempfile import mkdtemp
 
+from dateutil import parser
 from django.conf import settings
 
 from api.common import log_json
 from api.iam.models import Customer
+from api.provider.models import Provider
 from kafka_utils.utils import delivery_callback
 from kafka_utils.utils import get_producer
 from kafka_utils.utils import SUBS_TOPIC
 from masu.prometheus_stats import KAFKA_CONNECTION_ERRORS_COUNTER
 from masu.util.aws.common import get_s3_resource
+from providers.azure.client import AzureClientFactory
+
 
 LOG = logging.getLogger(__name__)
 
 
 class SUBSDataMessenger:
     def __init__(self, context, schema_name, tracing_id):
+        self.provider_type = context["provider_type"].removesuffix("-local")
+        # Local Azure providers shouldn't attempt to query Azure
+        if context["provider_type"] == Provider.PROVIDER_AZURE_LOCAL:
+            self.local_prov = True
+        else:
+            self.local_prov = False
         self.context = context
         self.tracing_id = tracing_id
         self.schema_name = schema_name
@@ -34,6 +46,38 @@ def __init__(self, context, schema_name, tracing_id):
         self.account_id = subs_cust.account_id
         self.org_id = subs_cust.org_id
         self.download_path = mkdtemp(prefix="subs")
+        self.instance_map = {}
+        self.date_map = defaultdict(list)
+
+    def determine_azure_instance_id(self, row):
+        """For Azure we have to query the instance id if it's not provided by a tag."""
+        if row["subs_resource_id"] in self.instance_map:
+            return self.instance_map.get(row["subs_resource_id"])
+        # this column comes from a user defined tag, allowing us to avoid querying Azure if it's present
+        if row["subs_instance"] != "":
+            instance_id = row["subs_instance"]
+        # attempt to query azure for instance id
+        else:
+            # if it's a local Azure provider, don't query Azure
+            if self.local_prov:
+                return ""
+            prov = Provider.objects.get(uuid=row["source"])
+            credentials = prov.account.get("credentials")
+            subscription_id = credentials.get("subscription_id")
+            tenant_id = credentials.get("tenant_id")
+            client_id = credentials.get("client_id")
+            client_secret = credentials.get("client_secret")
+            _factory = AzureClientFactory(subscription_id, tenant_id, client_id, client_secret)
+            compute_client = _factory.compute_client
+            resource_group = row["resourcegroup"] if row.get("resourcegroup") else row["resourcegroupname"]
+            response = compute_client.virtual_machines.get(
+                resource_group_name=resource_group,
+                vm_name=row["subs_resource_id"],
+            )
+            instance_id = response.vm_id
+
+        self.instance_map[row["subs_resource_id"]] = instance_id
+        return instance_id
 
     def process_and_send_subs_message(self, upload_keys):
         """
@@ -53,20 +97,23 @@
                 )
                 msg_count = 0
                 for row in reader:
-                    # row["subs_product_ids"] is a string of numbers separated by '-' to be sent as a list
-                    msg = self.build_subs_msg(
-                        row["lineitem_resourceid"],
-                        row["lineitem_usageaccountid"],
-                        row["subs_start_time"],
-                        row["subs_end_time"],
-                        row["product_vcpu"],
-                        row["subs_sla"],
-                        row["subs_usage"],
-                        row["subs_role"],
-                        row["subs_product_ids"].split("-"),
-                    )
-                    self.send_kafka_message(msg)
-                    msg_count += 1
+                    if self.provider_type == Provider.PROVIDER_AZURE:
+                        msg_count += self.process_azure_row(row)
+                    else:
+                        # row["subs_product_ids"] is a string of numbers separated by '-' to be sent as a list
+                        msg = self.build_subs_msg(
+                            row["subs_resource_id"],
+                            row["subs_account"],
+                            row["subs_start_time"],
+                            row["subs_end_time"],
+                            row["subs_vcpu"],
+                            row["subs_sla"],
row["subs_usage"], + row["subs_role"], + row["subs_product_ids"].split("-"), + ) + self.send_kafka_message(msg) + msg_count += 1 LOG.info( log_json( self.tracing_id, @@ -98,13 +145,47 @@ def build_subs_msg( "timestamp": tstamp, "expiration": expiration, "measurements": [{"value": cpu_count, "uom": "vCPUs"}], - "cloud_provider": "AWS", + "cloud_provider": self.provider_type, "hardware_type": "Cloud", "product_ids": product_ids, "role": role, "sla": sla, "usage": usage, - "billing_provider": "aws", + "billing_provider": self.provider_type.lower(), "billing_account_id": billing_account_id, } return bytes(json.dumps(subs_json), "utf-8") + + def process_azure_row(self, row): + """Process an Azure row into subs kafka messages.""" + msg_count = 0 + # Azure can unexplicably generate strange records with a second entry per day + # so we track the resource ids we've seen for a specific day so we don't send a record twice + if self.date_map.get(row["subs_start_time"]) and row["subs_resource_id"] in self.date_map.get( + row["subs_start_time"] + ): + return msg_count + self.date_map[row["subs_start_time"]].append(row["subs_resource_id"]) + instance_id = self.determine_azure_instance_id(row) + if not instance_id: + return msg_count + # Azure is daily records but subs need hourly records + start = parser.parse(row["subs_start_time"]) + for i in range(int(row["subs_usage_quantity"])): + end = start + timedelta(hours=1) + msg = self.build_subs_msg( + instance_id, + row["subs_account"], + str(start), + str(end), + row["subs_vcpu"], + row["subs_sla"], + row["subs_usage"], + row["subs_role"], + row["subs_product_ids"].split("-"), + ) + # move to the next hour in the range + start = end + self.send_kafka_message(msg) + msg_count += 1 + return msg_count diff --git a/koku/subs/tasks.py b/koku/subs/tasks.py index bfb53b1dc2..b2d453d733 100644 --- a/koku/subs/tasks.py +++ b/koku/subs/tasks.py @@ -9,7 +9,6 @@ from dateutil import parser from api.common import log_json -from api.provider.models import Provider from api.utils import DateHelper from koku import celery_app from koku import settings @@ -27,12 +26,6 @@ # any additional queues should be added to this list QUEUE_LIST = [SUBS_EXTRACTION_QUEUE, SUBS_TRANSMISSION_QUEUE] -SUBS_ACCEPTED_PROVIDERS = ( - Provider.PROVIDER_AWS, - Provider.PROVIDER_AWS_LOCAL, - # Add additional accepted providers here -) - def enable_subs_extraction(schema_name: str, metered: str) -> bool: # pragma: no cover """Helper to determine if source is enabled for SUBS extraction.""" @@ -79,7 +72,8 @@ def extract_subs_data_from_reports(reports_to_extract, metered): provider_uuid = report.get("provider_uuid") tracing_id = report.get("tracing_id", report.get("manifest_uuid", str(uuid.uuid4()))) context = {"schema": schema_name, "provider_type": provider_type, "provider_uuid": provider_uuid} - if provider_type not in SUBS_ACCEPTED_PROVIDERS: + # SUBS provider type enablement is handled through the ENABLE_SUBS_PROVIDER_TYPES environment variable + if provider_type.rstrip("-local") not in settings.ENABLE_SUBS_PROVIDER_TYPES: LOG.info(log_json(tracing_id, msg="provider type not valid for subs processing", context=context)) continue if not enable_subs_extraction(schema_name, metered): diff --git a/koku/subs/test/__init__.py b/koku/subs/test/__init__.py index ea4c2281ec..b3090710f6 100644 --- a/koku/subs/test/__init__.py +++ b/koku/subs/test/__init__.py @@ -17,3 +17,6 @@ def setUpClass(cls): cls.aws_provider = Provider.objects.filter(type=Provider.PROVIDER_AWS_LOCAL).first() cls.aws_provider_type = 
         cls.aws_provider_type = Provider.PROVIDER_AWS_LOCAL
+
+        cls.azure_provider = Provider.objects.filter(type=Provider.PROVIDER_AZURE_LOCAL).first()
+        cls.azure_provider_type = Provider.PROVIDER_AZURE_LOCAL
diff --git a/koku/subs/test/test_subs_data_messenger.py b/koku/subs/test/test_subs_data_messenger.py
index 934defd287..093d5b01ec 100644
--- a/koku/subs/test/test_subs_data_messenger.py
+++ b/koku/subs/test/test_subs_data_messenger.py
@@ -4,6 +4,7 @@
 #
 import json
 import uuid
+from collections import defaultdict
 from unittest.mock import mock_open
 from unittest.mock import patch
 
@@ -18,10 +19,12 @@ class TestSUBSDataMessenger(SUBSTestCase):
     def setUpClass(cls):
         """Set up the class."""
         super().setUpClass()
-        cls.context = {"some": "context"}
+        cls.context = {"some": "context", "provider_type": "AWS-local"}
+        cls.azure_context = {"some": "context", "provider_type": "Azure-local"}
         cls.tracing_id = "trace_me"
         with patch("subs.subs_data_messenger.get_s3_resource"):
             cls.messenger = SUBSDataMessenger(cls.context, cls.schema, cls.tracing_id)
+            cls.azure_messenger = SUBSDataMessenger(cls.azure_context, cls.schema, cls.tracing_id)
 
     @patch("subs.subs_data_messenger.os.remove")
     @patch("subs.subs_data_messenger.get_producer")
@@ -34,10 +37,10 @@ def test_process_and_send_subs_message(self, mock_msg_builder, mock_reader, mock
             {
                 "subs_start_time": "2023-07-01T01:00:00Z",
                 "subs_end_time": "2023-07-01T02:00:00Z",
-                "lineitem_resourceid": "i-55555556",
-                "lineitem_usageaccountid": "9999999999999",
+                "subs_resource_id": "i-55555556",
+                "subs_account": "9999999999999",
                 "physical_cores": "1",
-                "product_vcpu": "2",
+                "subs_vcpu": "2",
                 "variant": "Server",
                 "subs_usage": "Production",
                 "subs_sla": "Premium",
@@ -107,3 +110,169 @@
         kafka_msg = {"test"}
         self.messenger.send_kafka_message(kafka_msg)
         mock_producer.assert_called()
+
+    def test_determine_azure_instance_id_tag(self):
+        """Test getting the azure instance id from the row provided by a tag returns as expected."""
+        expected_instance = "waffle-house"
+        self.messenger.instance_map = {}
+        my_row = {
+            "resourceid": "i-55555556",
+            "subs_start_time": "2023-07-01T01:00:00Z",
+            "subs_end_time": "2023-07-01T02:00:00Z",
+            "subs_resource_id": "i-55555556",
+            "subs_account": "9999999999999",
+            "physical_cores": "1",
+            "subs_vcpu": "2",
+            "variant": "Server",
+            "subs_usage": "Production",
+            "subs_sla": "Premium",
+            "subs_role": "Red Hat Enterprise Linux Server",
+            "subs_product_ids": "479-70",
+            "subs_instance": expected_instance,
+        }
+        actual = self.messenger.determine_azure_instance_id(my_row)
+        self.assertEqual(expected_instance, actual)
+
+    def test_determine_azure_instance_id_local_prov(self):
+        """Test that a local provider does not reach out to Azure."""
+        self.messenger.instance_map = {}
+        my_row = {
+            "resourceid": "i-55555556",
+            "subs_start_time": "2023-07-01T01:00:00Z",
+            "subs_end_time": "2023-07-01T02:00:00Z",
+            "subs_resource_id": "i-55555556",
+            "subs_account": "9999999999999",
+            "physical_cores": "1",
+            "subs_vcpu": "2",
+            "variant": "Server",
+            "subs_usage": "Production",
+            "subs_sla": "Premium",
+            "subs_role": "Red Hat Enterprise Linux Server",
+            "subs_product_ids": "479-70",
+            "subs_instance": "",
+        }
+        actual = self.azure_messenger.determine_azure_instance_id(my_row)
+        self.assertEqual("", actual)
+
+    def test_determine_azure_instance_id_from_map(self):
+        """Test getting the azure instance id from the instance map returns as expected."""
+        expected = "oh-yeah"
+        self.messenger.instance_map["i-55555556"] = expected
+        my_row = {
{ + "resourceid": "i-55555556", + "subs_start_time": "2023-07-01T01:00:00Z", + "subs_end_time": "2023-07-01T02:00:00Z", + "subs_resource_id": "i-55555556", + "subs_account": "9999999999999", + "physical_cores": "1", + "subs_vcpu": "2", + "variant": "Server", + "subs_usage": "Production", + "subs_sla": "Premium", + "subs_role": "Red Hat Enterprise Linux Server", + "subs_product_ids": "479-70", + "subs_instance": "fake", + } + actual = self.messenger.determine_azure_instance_id(my_row) + self.assertEqual(expected, actual) + + def test_determine_azure_instance_id(self): + """Test getting the azure instance id from mock Azure Compute Client returns as expected.""" + expected = "my-fake-id" + self.messenger.instance_map = {} + my_row = { + "resourceid": "i-55555556", + "subs_start_time": "2023-07-01T01:00:00Z", + "subs_end_time": "2023-07-01T02:00:00Z", + "subs_resource_id": "i-55555556", + "subs_account": "9999999999999", + "physical_cores": "1", + "subs_vcpu": "2", + "variant": "Server", + "subs_usage": "Production", + "subs_sla": "Premium", + "subs_role": "Red Hat Enterprise Linux Server", + "subs_product_ids": "479-70", + "subs_instance": "", + "source": self.azure_provider.uuid, + "resourcegroup": "my-fake-rg", + } + with patch("subs.subs_data_messenger.AzureClientFactory") as mock_factory: + mock_factory.return_value.compute_client.virtual_machines.get.return_value.vm_id = expected + actual = self.messenger.determine_azure_instance_id(my_row) + self.assertEqual(expected, actual) + + @patch("subs.subs_data_messenger.SUBSDataMessenger.determine_azure_instance_id") + @patch("subs.subs_data_messenger.os.remove") + @patch("subs.subs_data_messenger.get_producer") + @patch("subs.subs_data_messenger.csv.DictReader") + @patch("subs.subs_data_messenger.SUBSDataMessenger.build_subs_msg") + def test_process_and_send_subs_message_azure_with_id( + self, mock_msg_builder, mock_reader, mock_producer, mock_remove, mock_azure_id + ): + """Tests that the proper functions are called when running process_and_send_subs_message with Azure provider.""" + upload_keys = ["fake_key"] + self.azure_messenger.date_map = defaultdict(list) + mock_reader.return_value = [ + { + "resourceid": "i-55555556", + "subs_start_time": "2023-07-01T01:00:00Z", + "subs_end_time": "2023-07-01T02:00:00Z", + "subs_resource_id": "i-55555556", + "subs_account": "9999999999999", + "physical_cores": "1", + "subs_vcpu": "2", + "variant": "Server", + "subs_usage": "Production", + "subs_sla": "Premium", + "subs_role": "Red Hat Enterprise Linux Server", + "subs_usage_quantity": "4", + "subs_product_ids": "479-70", + "subs_instance": "", + "source": self.azure_provider.uuid, + "resourcegroup": "my-fake-rg", + } + ] + mock_op = mock_open(read_data="x,y,z") + with patch("builtins.open", mock_op): + self.azure_messenger.process_and_send_subs_message(upload_keys) + mock_azure_id.assert_called_once() + self.assertEqual(mock_msg_builder.call_count, 4) + self.assertEqual(mock_producer.call_count, 4) + + @patch("subs.subs_data_messenger.SUBSDataMessenger.determine_azure_instance_id") + @patch("subs.subs_data_messenger.os.remove") + @patch("subs.subs_data_messenger.get_producer") + @patch("subs.subs_data_messenger.csv.DictReader") + @patch("subs.subs_data_messenger.SUBSDataMessenger.build_subs_msg") + def test_process_and_send_subs_message_azure_time_already_processed( + self, mock_msg_builder, mock_reader, mock_producer, mock_remove, mock_azure_id + ): + """Tests that the functions are not called for a provider that has already processed.""" + 
upload_keys = ["fake_key"] + self.azure_messenger.date_map["2023-07-01T01:00:00Z"] = "i-55555556" + mock_reader.return_value = [ + { + "resourceid": "i-55555556", + "subs_start_time": "2023-07-01T01:00:00Z", + "subs_end_time": "2023-07-01T02:00:00Z", + "subs_resource_id": "i-55555556", + "subs_account": "9999999999999", + "physical_cores": "1", + "subs_vcpu": "2", + "variant": "Server", + "subs_usage": "Production", + "subs_sla": "Premium", + "subs_role": "Red Hat Enterprise Linux Server", + "subs_product_ids": "479-70", + "subs_instance": "", + "source": self.azure_provider.uuid, + "resourcegroup": "my-fake-rg", + } + ] + mock_op = mock_open(read_data="x,y,z") + with patch("builtins.open", mock_op): + self.azure_messenger.process_and_send_subs_message(upload_keys) + mock_azure_id.assert_not_called() + mock_msg_builder.assert_not_called() + mock_producer.assert_not_called() diff --git a/koku/subs/trino_sql/aws_subs_pre_or_clause.sql b/koku/subs/trino_sql/aws_subs_pre_or_clause.sql index f0e7be173a..933d9fde0a 100644 --- a/koku/subs/trino_sql/aws_subs_pre_or_clause.sql +++ b/koku/subs/trino_sql/aws_subs_pre_or_clause.sql @@ -2,6 +2,9 @@ SELECT *, time_split[1] as subs_start_time, time_split[2] as subs_end_time, + product_vcpu as subs_vcpu, + lineitem_usageaccountid as subs_account, + lineitem_resourceid as subs_resource_id, CASE lower(json_extract_scalar(tags, '$.com_redhat_rhel_variant')) WHEN 'workstation' THEN 'Red Hat Enterprise Linux Workstation' ELSE 'Red Hat Enterprise Linux Server' diff --git a/koku/subs/trino_sql/azure_subs_pre_or_clause.sql b/koku/subs/trino_sql/azure_subs_pre_or_clause.sql new file mode 100644 index 0000000000..446ee6ce01 --- /dev/null +++ b/koku/subs/trino_sql/azure_subs_pre_or_clause.sql @@ -0,0 +1,39 @@ +SELECT + *, + with_timezone(COALESCE(date, usagedatetime), 'UTC') as subs_start_time, + with_timezone(date_add('day', 1, COALESCE(date, usagedatetime)), 'UTC') as subs_end_time, + json_extract_scalar(lower(additionalinfo), '$.vcpus') as subs_vcpu, + COALESCE(NULLIF(subscriptionid, ''), subscriptionguid) as subs_account, + regexp_extract(COALESCE(NULLIF(resourceid, ''), instancename), '([^/]+$)') as subs_resource_id, + CAST(ceil(coalesce(nullif(quantity, 0), usagequantity)) AS INTEGER) as subs_usage_quantity, + CASE lower(json_extract_scalar(lower(tags), '$.com_redhat_rhel_variant')) + WHEN 'workstation' THEN 'Red Hat Enterprise Linux Workstation' + ELSE 'Red Hat Enterprise Linux Server' + END as subs_role, + CASE lower(json_extract_scalar(lower(tags), '$.com_redhat_rhel_usage')) + WHEN 'development/test' THEN 'Development/Test' + WHEN 'disaster recovery' THEN 'Disaster Recovery' + ELSE 'Production' + END as subs_usage, + CASE lower(json_extract_scalar(lower(tags), '$.com_redhat_rhel_sla')) + WHEN 'standard' THEN 'Standard' + WHEN 'self-support' THEN 'Self-Support' + ELSE 'Premium' + END as subs_sla, + CASE lower(json_extract_scalar(lower(tags), '$.com_redhat_rhel')) + WHEN 'rhel 7 els' THEN '69-204' + WHEN 'rhel 8 els' THEN '479-204' + ELSE '479' + END as subs_product_ids, + COALESCE(lower(json_extract_scalar(lower(tags), '$.com_redhat_rhel_instance')), '') as subs_instance +FROM + hive.{{schema | sqlsafe}}.azure_line_items +WHERE + source = {{ source_uuid }} + AND year = {{ year }} + AND month = {{ month }} + AND metercategory = 'Virtual Machines' + AND json_extract_scalar(lower(additionalinfo), '$.vcpus') IS NOT NULL + AND json_extract_scalar(lower(lower(tags)), '$.com_redhat_rhel') IS NOT NULL + -- ensure there is usage + AND ceil(coalesce(nullif(quantity, 
+    AND ceil(coalesce(nullif(quantity, 0), usagequantity)) > 0