[COST-3814] Add Azure source processing to subs flow (#4831)
* Add Azure processing to subs flow

* handle enabled subs provider types via environment variable

* break Azure records out into a kafka message per hour
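The per-hour fan-out from the last bullet is not visible in the hunks below (it lands in a file whose diff is collapsed). As a rough sketch only, with hypothetical field names and no claim to match the committed code, a daily Azure usage record could be expanded into hourly records before the Kafka messages are produced:

from datetime import datetime, timedelta

def split_azure_record_into_hours(record: dict) -> list[dict]:
    """Sketch: copy a daily Azure usage record once per hour of its usage day."""
    day_start = datetime.fromisoformat(record["usage_date"])  # hypothetical field name
    hourly_records = []
    for hour in range(24):
        hourly = dict(record)
        hourly["start"] = (day_start + timedelta(hours=hour)).isoformat()
        hourly["end"] = (day_start + timedelta(hours=hour + 1)).isoformat()
        hourly_records.append(hourly)
    return hourly_records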
cgoodfred committed Dec 13, 2023
1 parent 5fdf225 commit 6b31bb1
Showing 15 changed files with 456 additions and 59 deletions.
1 change: 1 addition & 0 deletions Pipfile
@@ -9,6 +9,7 @@ adal = "*"
app-common-python = ">=0.2.3"
azure-identity = "*"
azure-mgmt-costmanagement = ">=3.0.0"
azure-mgmt-compute = "*"
azure-mgmt-resource = ">=8.0"
azure-mgmt-storage = ">=20.1.0"
azure-storage-blob = ">=12.1"
10 changes: 9 additions & 1 deletion Pipfile.lock

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions deploy/clowdapp.yaml
@@ -3960,6 +3960,8 @@ objects:
value: ${ENHANCED_ORG_ADMIN}
- name: ENABLE_SUBS_DEBUG
value: ${ENABLE_SUBS_DEBUG}
- name: ENABLE_SUBS_PROVIDER_TYPES
value: ${ENABLE_SUBS_PROVIDER_TYPES}
image: ${IMAGE}:${IMAGE_TAG}
livenessProbe:
failureThreshold: 5
@@ -4642,6 +4644,10 @@ parameters:
displayName: Enable ROS Debug
name: ENABLE_ROS_DEBUG
value: "False"
- description: Enable SUBS Provider Types for Processing
displayName: Enable SUBS Provider Types
name: ENABLE_SUBS_PROVIDER_TYPES
value: AWS
- displayName: Minimum replicas
name: KOKU_REPLICAS
required: true
4 changes: 4 additions & 0 deletions deploy/kustomize/base/base.yaml
@@ -541,3 +541,7 @@ parameters:
displayName: Enable ROS Debug
name: ENABLE_ROS_DEBUG
value: "False"
- description: Enable SUBS Provider Types for Processing
displayName: Enable SUBS Provider Types
name: ENABLE_SUBS_PROVIDER_TYPES
value: "AWS"
2 changes: 2 additions & 0 deletions deploy/kustomize/patches/worker-subs-extraction.yaml
@@ -111,6 +111,8 @@
value: ${ENHANCED_ORG_ADMIN}
- name: ENABLE_SUBS_DEBUG
value: ${ENABLE_SUBS_DEBUG}
- name: ENABLE_SUBS_PROVIDER_TYPES
value: ${ENABLE_SUBS_PROVIDER_TYPES}
livenessProbe:
httpGet:
path: /livez
2 changes: 2 additions & 0 deletions docker-compose.yml
@@ -183,6 +183,7 @@ services:
- ENABLE_S3_ARCHIVING=${ENABLE_S3_ARCHIVING-False}
- ENABLE_HCS_DEBUG=${ENABLE_HCS_DEBUG-False}
- ENABLE_SUBS_DEBUG=${ENABLE_SUBS_DEBUG-False}
- ENABLE_SUBS_PROVIDER_TYPES=${ENABLE_SUBS_PROVIDER_TYPES}
- ENABLE_ROS_DEBUG=${ENABLE_ROS_DEBUG-False}
- S3_BUCKET_NAME=${S3_BUCKET_NAME-koku-bucket}
- S3_BUCKET_PATH=${S3_BUCKET_PATH-data_archive}
@@ -318,6 +319,7 @@ services:
- KOKU_API_PORT=${KOKU_API_PORT-8000}
- KOKU_API_PATH_PREFIX=${KOKU_API_PATH_PREFIX-/api/cost-management/v1}
- ENABLE_SUBS_DEBUG=${ENABLE_SUBS_DEBUG-False}
- ENABLE_SUBS_PROVIDER_TYPES=${ENABLE_SUBS_PROVIDER_TYPES}
- S3_BUCKET_NAME=${S3_BUCKET_NAME-koku-bucket}
- S3_BUCKET_PATH=${S3_BUCKET_PATH-data_archive}
- S3_ENDPOINT
1 change: 1 addition & 0 deletions koku/koku/settings.py
@@ -566,6 +566,7 @@

# SUBS Data Extraction debugging
ENABLE_SUBS_DEBUG = ENVIRONMENT.bool("ENABLE_SUBS_DEBUG", default=False)
ENABLE_SUBS_PROVIDER_TYPES = ENVIRONMENT.list("ENABLE_SUBS_PROVIDER_TYPES", default=["AWS"])

# ROS debugging
ENABLE_ROS_DEBUG = ENVIRONMENT.bool("ENABLE_ROS_DEBUG", default=False)
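Since the new setting is read with ENVIRONMENT.list, a comma-separated value such as AWS,Azure should enable both providers. A minimal sketch, assuming a helper of our own naming rather than the project's actual gate, of how a worker could consult it before extracting SUBS data:

from django.conf import settings

def subs_processing_enabled(provider_type: str) -> bool:
    """Sketch: return True when the provider type is enabled for SUBS processing."""
    enabled_types = {ptype.strip().lower() for ptype in settings.ENABLE_SUBS_PROVIDER_TYPES}
    return provider_type.strip().lower() in enabled_types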
6 changes: 6 additions & 0 deletions koku/providers/azure/client.py
@@ -4,6 +4,7 @@
#
"""Azure Client Configuration."""
from azure.identity import ClientSecretCredential
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.costmanagement import CostManagementClient
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.storage import StorageManagementClient
@@ -70,6 +71,11 @@ def storage_client(self):
"""Get storage client with subscription and credentials."""
return StorageManagementClient(self.credentials, self.subscription_id)

@property
def compute_client(self):
"""Get compute client with subscription and credentials."""
return ComputeManagementClient(self.credentials, self.subscription_id)

@property
def subscription_id(self):
"""Subscription ID property."""
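The new compute_client property exposes azure-mgmt-compute's ComputeManagementClient alongside the existing cost, resource, and storage clients. A usage sketch, constructed directly rather than through the factory so it stays self-contained, with placeholder credentials:

from azure.identity import ClientSecretCredential
from azure.mgmt.compute import ComputeManagementClient

credential = ClientSecretCredential(
    tenant_id="<tenant-id>", client_id="<client-id>", client_secret="<client-secret>"
)
compute_client = ComputeManagementClient(credential, "<subscription-id>")

# List the VMs in the subscription and print the size of each one;
# the size name is the sort of metadata a vCPU lookup could key on.
for vm in compute_client.virtual_machines.list_all():
    print(vm.name, vm.hardware_profile.vm_size)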
138 changes: 108 additions & 30 deletions koku/subs/subs_data_extractor.py
@@ -22,11 +22,89 @@
from reporting.models import SubsIDMap
from reporting.models import SubsLastProcessed
from reporting.provider.aws.models import TRINO_LINE_ITEM_TABLE as AWS_TABLE
from reporting.provider.azure.models import TRINO_LINE_ITEM_TABLE as AZURE_TABLE


LOG = logging.getLogger(__name__)

TABLE_MAP = {
Provider.PROVIDER_AWS: AWS_TABLE,
Provider.PROVIDER_AZURE: AZURE_TABLE,
}

ID_COLUMN_MAP = {
Provider.PROVIDER_AWS: "lineitem_usageaccountid",
Provider.PROVIDER_AZURE: "COALESCE(NULLIF(subscriptionid, ''), subscriptionguid)",
}

RECORD_FILTER_MAP = {
Provider.PROVIDER_AWS: (
" lineitem_productcode = 'AmazonEC2' AND lineitem_lineitemtype IN ('Usage', 'SavingsPlanCoveredUsage') "
"AND product_vcpu != '' AND strpos(lower(resourcetags), 'com_redhat_rhel') > 0"
),
Provider.PROVIDER_AZURE: (
" metercategory = 'Virtual Machines' AND chargetype = 'Usage' "
"AND json_extract_scalar(lower(additionalinfo), '$.vcpus') IS NOT NULL "
"AND json_extract_scalar(lower(tags), '$.com_redhat_rhel') IS NOT NULL"
),
}

RESOURCE_ID_FILTER_MAP = {
Provider.PROVIDER_AWS: (
" AND lineitem_productcode = 'AmazonEC2' "
"AND strpos(lower(resourcetags), 'com_redhat_rhel') > 0 AND lineitem_usageaccountid = {{usage_account}}"
),
Provider.PROVIDER_AZURE: (
" AND metercategory = 'Virtual Machines' "
"AND json_extract_scalar(lower(additionalinfo), '$.vcpus') IS NOT NULL "
"AND json_extract_scalar(lower(tags), '$.com_redhat_rhel') IS NOT NULL "
"AND (subscriptionid = {{usage_account}} or subscriptionguid = {{usage_account}}) "
),
}

RESOURCE_SELECT_MAP = {
Provider.PROVIDER_AWS: " SELECT lineitem_resourceid, max(lineitem_usagestartdate) ",
Provider.PROVIDER_AZURE: " SELECT coalesce(NULLIF(resourceid, ''), instancename), date_add('day', -1, max(coalesce(date, usagedatetime))) ", # noqa E501
}

RESOURCE_ID_GROUP_BY_MAP = {
Provider.PROVIDER_AWS: " GROUP BY lineitem_resourceid",
Provider.PROVIDER_AZURE: " GROUP BY resourceid, instancename",
}

RESOURCE_ID_EXCLUSION_CLAUSE_MAP = {
Provider.PROVIDER_AWS: " AND lineitem_resourceid NOT IN {{excluded_ids | inclause}} ",
Provider.PROVIDER_AZURE: " and coalesce(NULLIF(resourceid, ''), instancename) NOT IN {{excluded_ids | inclause}} ",
}

RESOURCE_ID_SQL_CLAUSE_MAP = {
Provider.PROVIDER_AWS: (
" ( lineitem_resourceid = {{{{ rid_{0} }}}} "
" AND lineitem_usagestartdate >= {{{{ start_date_{0} }}}} "
" AND lineitem_usagestartdate <= {{{{ end_date_{0} }}}}) "
),
Provider.PROVIDER_AZURE: (
" ( coalesce(NULLIF(resourceid, ''), instancename) = {{{{ rid_{0} }}}} "
"AND coalesce(date, usagedatetime) >= {{{{ start_date_{0} }}}} "
"AND coalesce(date, usagedatetime) <= {{{{ end_date_{0} }}}}) "
),
}

POST_OR_CLAUSE_SQL_MAP = {
Provider.PROVIDER_AWS: """
OFFSET
{{ offset }}
LIMIT
{{ limit }}
)
WHERE json_extract_scalar(tags, '$.com_redhat_rhel') IS NOT NULL
""",
Provider.PROVIDER_AZURE: """
OFFSET
{{ offset }}
LIMIT
{{ limit }}
""",
}

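These maps are what keep the extractor provider-agnostic: every SQL fragment that was previously AWS-specific is now looked up by provider type. As an illustration only (the real queries are rendered through the jinjasql-style templates below, not an f-string), inside this module the Azure entries compose into a distinct-ID query roughly like this; the COALESCE in ID_COLUMN_MAP appears to cover Azure cost exports that populate subscriptionguid rather than subscriptionid:

# Illustration only; Provider and the maps are module-level names defined above.
provider_type = Provider.PROVIDER_AZURE
table = TABLE_MAP[provider_type]          # the Azure Trino line item table
id_column = ID_COLUMN_MAP[provider_type]  # COALESCE(NULLIF(subscriptionid, ''), subscriptionguid)

sql = (
    f"SELECT DISTINCT {id_column} FROM hive.<schema>.{table} "
    "WHERE source=<source_uuid> AND year=<year> AND month=<month> AND"
    + RECORD_FILTER_MAP[provider_type]
)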

@@ -40,11 +118,20 @@ def __init__(self, tracing_id, context):
microsecond=0, second=0, minute=0, hour=0
) - timedelta(days=1)
self.tracing_id = tracing_id
self.table = TABLE_MAP.get(self.provider_type)
self.s3_resource = get_s3_resource(
settings.S3_SUBS_ACCESS_KEY, settings.S3_SUBS_SECRET, settings.S3_SUBS_REGION
)
self.context = context
# The following variables all change depending on the provider type to run the correct SQL
self.table = TABLE_MAP.get(self.provider_type)
self.id_column = ID_COLUMN_MAP.get(self.provider_type)
self.provider_where_clause = RECORD_FILTER_MAP.get(self.provider_type)
self.resource_select_sql = RESOURCE_SELECT_MAP.get(self.provider_type)
self.resource_id_where_clause = RESOURCE_ID_FILTER_MAP.get(self.provider_type)
self.resource_id_group_by = RESOURCE_ID_GROUP_BY_MAP.get(self.provider_type)
self.resource_id_sql_clause = RESOURCE_ID_SQL_CLAUSE_MAP.get(self.provider_type)
self.resource_id_exclusion_clause = RESOURCE_ID_EXCLUSION_CLAUSE_MAP.get(self.provider_type)
self.post_or_clause_sql = POST_OR_CLAUSE_SQL_MAP.get(self.provider_type)

@cached_property
def subs_s3_path(self):
@@ -62,7 +149,11 @@ def get_latest_processed_dict_for_provider(self, year, month):
# and we want to gather new data we have not processed yet
# so we add one second to the last timestamp to ensure the time range processed
# is all new data
lpt_dict[rid] = latest_time + timedelta(seconds=1)
if self.provider_type != Provider.PROVIDER_AZURE:
lpt_dict[rid] = latest_time + timedelta(seconds=1)
# Azure timestamps are daily, so we do not need to increase this by 1 since this was the previous end
else:
lpt_dict[rid] = latest_time
return lpt_dict

def determine_latest_processed_time_for_provider(self, rid, year, month):
@@ -87,17 +178,19 @@ def determine_ids_for_provider(self, year, month):
SubsIDMap.objects.exclude(source_uuid=self.provider_uuid).values_list("usage_id", flat=True)
)
sql = (
"SELECT DISTINCT lineitem_usageaccountid FROM hive.{{schema | sqlsafe}}.aws_line_items WHERE"
"SELECT DISTINCT {{id_column | sqlsafe}} FROM hive.{{schema | sqlsafe}}.{{table | sqlsafe}} WHERE"
" source={{source_uuid}} AND year={{year}} AND month={{month}}"
)
if excluded_ids:
sql += "AND lineitem_usageaccountid NOT IN {{excluded_ids | inclause}}"
sql += " AND {{id_column | sqlsafe}} NOT IN {{excluded_ids | inclause}}"
sql_params = {
"schema": self.schema,
"source_uuid": self.provider_uuid,
"year": year,
"month": month,
"excluded_ids": excluded_ids,
"id_column": self.id_column,
"table": self.table,
}
ids = self._execute_trino_raw_sql_query(
sql, sql_params=sql_params, context=self.context, log_ref="subs_determine_ids_for_provider"
Expand All @@ -120,11 +213,9 @@ def determine_line_item_count(self, where_clause, sql_params):

def determine_where_clause_and_params(self, year, month):
"""Determine the where clause to use when processing subs data"""
where_clause = (
"WHERE source={{source_uuid}} AND year={{year}} AND month={{month}} AND"
" lineitem_productcode = 'AmazonEC2' AND lineitem_lineitemtype IN ('Usage', 'SavingsPlanCoveredUsage') AND"
" product_vcpu != '' AND strpos(lower(resourcetags), 'com_redhat_rhel') > 0"
)
where_clause = "WHERE source={{source_uuid}} AND year={{year}} AND month={{month}} AND"
# different provider types have different required filters here
where_clause += self.provider_where_clause
sql_params = {
"source_uuid": self.provider_uuid,
"year": year,
@@ -139,23 +230,22 @@ def get_resource_ids_for_usage_account(self, usage_account, year, month):
excluded_ids = list(
SubsLastProcessed.objects.exclude(source_uuid=self.provider_uuid).values_list("resource_id", flat=True)
)
sql = (
"SELECT lineitem_resourceid, max(lineitem_usagestartdate)"
" FROM hive.{{schema | sqlsafe}}.aws_line_items WHERE"
sql = self.resource_select_sql + (
" FROM hive.{{schema | sqlsafe}}.{{table | sqlsafe}} WHERE"
" source={{source_uuid}} AND year={{year}} AND month={{month}}"
" AND lineitem_productcode = 'AmazonEC2' AND strpos(lower(resourcetags), 'com_redhat_rhel') > 0"
" AND lineitem_usageaccountid = {{usage_account}}"
)
sql += self.resource_id_where_clause
if excluded_ids:
sql += "AND lineitem_resourceid NOT IN {{excluded_ids | inclause}}"
sql += " GROUP BY lineitem_resourceid"
sql += self.resource_id_exclusion_clause
sql += self.resource_id_group_by
sql_params = {
"schema": self.schema,
"source_uuid": self.provider_uuid,
"year": year,
"month": month,
"excluded_ids": excluded_ids,
"usage_account": usage_account,
"table": self.table,
}
ids = self._execute_trino_raw_sql_query(
sql, sql_params=sql_params, context=self.context, log_ref="subs_determine_rids_for_provider"
@@ -174,25 +264,13 @@ def gather_and_upload_for_resource_batch(self, year, month, batch, base_filename
sql_params[f"rid_{i}"] = rid
sql_params[f"start_date_{i}"] = start_time
sql_params[f"end_date_{i}"] = end_time
rid_sql_clause += (
"( lineitem_resourceid = {{{{ rid_{0} }}}}"
" AND lineitem_usagestartdate >= {{{{ start_date_{0} }}}}"
" AND lineitem_usagestartdate <= {{{{ end_date_{0} }}}})"
).format(i)
rid_sql_clause += self.resource_id_sql_clause.format(i)
if i < len(batch) - 1:
rid_sql_clause += " OR "
rid_sql_clause += " )"
where_clause += rid_sql_clause
summary_sql += rid_sql_clause
summary_sql += """
OFFSET
{{ offset }}
LIMIT
{{ limit }}
)
-- this ensures the required `com_redhat_rhel` tag exists in the set of tags since the above match is not exact
WHERE json_extract_scalar(tags, '$.com_redhat_rhel') IS NOT NULL
"""
summary_sql += self.post_or_clause_sql
total_count = self.determine_line_item_count(where_clause, sql_params)
LOG.debug(
log_json(
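In gather_and_upload_for_resource_batch the per-resource time-window clause now also comes from the map. For the first resource of an Azure batch (i == 0), RESOURCE_ID_SQL_CLAUSE_MAP renders via .format(0) to the fragment below; fragments for later resources are joined with " OR " and the group is closed with " )". For Azure, POST_OR_CLAUSE_SQL_MAP then appends only the OFFSET/LIMIT paging, since the RHEL tag check already happens exactly in the Azure record filter rather than in the trailing WHERE that AWS needed.

# Illustration only: rendered jinjasql fragment for the first Azure resource in a batch.
azure_fragment = RESOURCE_ID_SQL_CLAUSE_MAP[Provider.PROVIDER_AZURE].format(0)
# azure_fragment ==
#  " ( coalesce(NULLIF(resourceid, ''), instancename) = {{ rid_0 }} "
#  "AND coalesce(date, usagedatetime) >= {{ start_date_0 }} "
#  "AND coalesce(date, usagedatetime) <= {{ end_date_0 }}) "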