break Azure records out into a kafka message per hour
cgoodfred committed Dec 12, 2023
1 parent 930268e commit 3545f3d
Showing 3 changed files with 44 additions and 18 deletions.
54 changes: 38 additions & 16 deletions koku/subs/subs_data_messenger.py
@@ -8,8 +8,10 @@
import os
import uuid
from collections import defaultdict
+from datetime import timedelta
from tempfile import mkdtemp

+from dateutil import parser
from django.conf import settings

from api.common import log_json
@@ -106,21 +108,41 @@ def process_and_send_subs_message(self, upload_keys):
                    instance_id = self.determine_azure_instance_id(row)
                    if not instance_id:
                        continue
row["subs_resource_id"] = instance_id
# row["subs_product_ids"] is a string of numbers separated by '-' to be sent as a list
msg = self.build_subs_msg(
row["subs_resource_id"],
row["subs_account"],
row["subs_start_time"],
row["subs_end_time"],
row["subs_vcpu"],
row["subs_sla"],
row["subs_usage"],
row["subs_role"],
row["subs_product_ids"].split("-"),
)
self.send_kafka_message(msg)
msg_count += 1
# Azure is daily records but subs need hourly records
start = parser.parse(row["subs_start_time"])
LOG.info(f"start\n{start}\n{type(start)}")
for i in range(int(row["subs_usage_quantity"])):
end = start + timedelta(hours=1)
msg = self.build_subs_msg(
instance_id,
row["subs_account"],
str(start),
str(end),
row["subs_vcpu"],
row["subs_sla"],
row["subs_usage"],
row["subs_role"],
row["subs_product_ids"].split("-"),
)
# move to the next hour in the range
start = end
self.send_kafka_message(msg)
msg_count += 1
else:
# row["subs_product_ids"] is a string of numbers separated by '-' to be sent as a list
msg = self.build_subs_msg(
row["subs_resource_id"],
row["subs_account"],
row["subs_start_time"],
row["subs_end_time"],
row["subs_vcpu"],
row["subs_sla"],
row["subs_usage"],
row["subs_role"],
row["subs_product_ids"].split("-"),
)
self.send_kafka_message(msg)
msg_count += 1
LOG.info(
log_json(
self.tracing_id,
@@ -158,7 +180,7 @@ def build_subs_msg(
"role": role,
"sla": sla,
"usage": usage,
"billing_provider": "aws",
"billing_provider": self.provider_type.lower(),
"billing_account_id": billing_account_id,
}
return bytes(json.dumps(subs_json), "utf-8")
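
Taken on its own, the new Azure branch turns one daily record into one Kafka message per hour of reported usage. The sketch below is illustrative only, not the project's code; the start time and usage quantity are assumed example values, and only the time-window handling is shown:

# Minimal sketch of the hourly breakout added above; the row values are assumed
# example data, not fields taken from a real Azure export.
from datetime import timedelta

from dateutil import parser

row = {
    "subs_start_time": "2023-12-01T00:00:00+00:00",
    "subs_usage_quantity": "4",
}

start = parser.parse(row["subs_start_time"])
hourly_windows = []
for _ in range(int(row["subs_usage_quantity"])):
    end = start + timedelta(hours=1)
    hourly_windows.append((str(start), str(end)))
    # the end of this window becomes the start of the next one
    start = end

# hourly_windows now holds four contiguous one-hour ranges, one per message.
for window in hourly_windows:
    print(window)

Each window feeds one build_subs_msg call, so a single daily row with subs_usage_quantity of 4 produces four Kafka messages. The same file also stops hard-coding the billing provider: build_subs_msg now emits self.provider_type.lower(), so the billing_provider field reflects the source's provider type instead of always reading "aws".
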
5 changes: 3 additions & 2 deletions koku/subs/test/test_subs_data_messenger.py
@@ -226,6 +226,7 @@ def test_process_and_send_subs_message_azure_with_id(
"subs_usage": "Production",
"subs_sla": "Premium",
"subs_role": "Red Hat Enterprise Linux Server",
"subs_usage_quantity": "4",
"subs_product_ids": "479-70",
"subs_instance": "",
"source": self.azure_provider.uuid,
@@ -236,8 +237,8 @@
with patch("builtins.open", mock_op):
self.azure_messenger.process_and_send_subs_message(upload_keys)
mock_azure_id.assert_called_once()
-mock_msg_builder.assert_called_once()
-mock_producer.assert_called_once()
+self.assertEqual(mock_msg_builder.call_count, 4)
+self.assertEqual(mock_producer.call_count, 4)

@patch("subs.subs_data_messenger.SUBSDataMessenger.determine_azure_instance_id")
@patch("subs.subs_data_messenger.os.remove")
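
If one also wanted to verify the hourly windows rather than just the call counts, a mock's call_args_list could be inspected. The following standalone sketch is not part of this commit or its tests; the mock name, start time, and usage quantity are assumed, and the loop simply simulates the calls the messenger would make:

# Standalone illustration: simulate four hourly build_subs_msg calls and then
# check that the (start, end) arguments form contiguous one-hour windows.
from datetime import timedelta
from unittest.mock import MagicMock

from dateutil import parser

mock_msg_builder = MagicMock()

start = parser.parse("2023-12-01T00:00:00+00:00")
for _ in range(4):
    end = start + timedelta(hours=1)
    # positional order mirrors build_subs_msg: resource id, account, start, end, ...
    mock_msg_builder("i-12345", "1111-111", str(start), str(end))
    start = end

assert mock_msg_builder.call_count == 4
previous_end = None
for call in mock_msg_builder.call_args_list:
    window_start, window_end = call.args[2], call.args[3]
    if previous_end is not None:
        # each window starts exactly where the previous one ended
        assert window_start == previous_end
    previous_end = window_end
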
3 changes: 3 additions & 0 deletions koku/subs/trino_sql/azure_subs_pre_or_clause.sql
@@ -5,6 +5,7 @@ SELECT
json_extract_scalar(lower(additionalinfo), '$.vcpus') as subs_vcpu,
COALESCE(NULLIF(subscriptionid, ''), subscriptionguid) as subs_account,
regexp_extract(COALESCE(NULLIF(resourceid, ''), instancename), '([^/]+$)') as subs_resource_id,
+CAST(ceil(coalesce(nullif(quantity, 0), usagequantity)) AS INTEGER) as subs_usage_quantity,
CASE lower(json_extract_scalar(lower(tags), '$.com_redhat_rhel_variant'))
WHEN 'workstation' THEN 'Red Hat Enterprise Linux Workstation'
ELSE 'Red Hat Enterprise Linux Server'
@@ -34,3 +35,5 @@ WHERE
AND metercategory = 'Virtual Machines'
AND json_extract_scalar(lower(additionalinfo), '$.vcpus') IS NOT NULL
AND json_extract_scalar(lower(lower(tags)), '$.com_redhat_rhel') IS NOT NULL
+-- ensure there is usage
+AND ceil(coalesce(nullif(quantity, 0), usagequantity)) > 0
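
The new subs_usage_quantity column controls how many hourly messages each daily record produces: quantity is preferred when it is present and non-zero, otherwise usagequantity is used, and the result is rounded up to whole hours; the added WHERE condition then drops rows whose derived quantity is not positive. A rough Python equivalent of that fallback, using assumed example values (the real work happens in the Trino SQL above), looks like this:

# Rough Python analogue of
#   CAST(ceil(coalesce(nullif(quantity, 0), usagequantity)) AS INTEGER)
import math


def subs_usage_quantity(quantity, usagequantity):
    # nullif(quantity, 0): a zero (or missing) quantity counts as absent
    value = quantity if quantity not in (None, 0) else None
    # coalesce(..., usagequantity): fall back to the usagequantity column
    if value is None:
        value = usagequantity
    # ceil + CAST AS INTEGER: round partial hours up to a whole hour count
    return math.ceil(value)


print(subs_usage_quantity(0, 3.5))    # 4  -- quantity is zero, fall back to usagequantity
print(subs_usage_quantity(23.2, 0))   # 24 -- quantity wins when it is non-zero

Because the WHERE clause filters out zero quantities, every row that reaches the messenger yields at least one hourly message, and range(int(row["subs_usage_quantity"])) never iterates zero times.
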
