diff --git a/koku/subs/subs_data_messenger.py b/koku/subs/subs_data_messenger.py index 511eb1b88b..e1b61ce1d7 100644 --- a/koku/subs/subs_data_messenger.py +++ b/koku/subs/subs_data_messenger.py @@ -57,7 +57,7 @@ def __init__(self, context, schema_name, tracing_id): self.org_id = subs_cust.org_id self.download_path = mkdtemp(prefix="subs") self.instance_map = {} - self.date_map = defaultdict(list) + self.date_map = defaultdict(dict) def determine_azure_instance_and_tenant_id(self, row): """For Azure we have to query the instance id if its not provided by a tag and the tenant_id.""" @@ -222,18 +222,23 @@ def process_azure_row(self, row): """Process an Azure row into subs kafka messages.""" msg_count = 0 # Azure can unexplicably generate strange records with a second entry per day - # so we track the resource ids we've seen for a specific day so we don't send a record twice - if self.date_map.get(row["subs_start_time"]) and row["subs_resource_id"] in self.date_map.get( - row["subs_start_time"] - ): - return msg_count - self.date_map[row["subs_start_time"]].append(row["subs_resource_id"]) + # these two values should sum to the total usage so we need to track what was already + # sent for a specific instance so we get the full usage amount + range_start = 0 + resource_id = row["subs_resource_id"] + start_time = row["subs_start_time"] + usage = int(row["subs_usage_quantity"]) + if self.date_map.get(start_time) and resource_id in self.date_map.get(start_time): + range_start = self.date_map.get(start_time).get(resource_id) + self.date_map[start_time] = {resource_id: usage + range_start} instance_id, tenant_id = self.determine_azure_instance_and_tenant_id(row) if not instance_id: return msg_count # Azure is daily records but subs need hourly records - start = parser.parse(row["subs_start_time"]) - for i in range(int(row["subs_usage_quantity"])): + start = parser.parse(start_time) + # if data for the day was previously sent, start at hour following previous events + start = start + timedelta(hours=range_start) + for i in range(range_start, range_start + usage): end = start + timedelta(hours=1) subs_dict = self.build_azure_subs_dict( instance_id, diff --git a/koku/subs/test/test_subs_data_messenger.py b/koku/subs/test/test_subs_data_messenger.py index 12bfa7eaba..daae1b1875 100644 --- a/koku/subs/test/test_subs_data_messenger.py +++ b/koku/subs/test/test_subs_data_messenger.py @@ -536,25 +536,41 @@ def test_process_and_send_subs_message_azure_with_id(self, mock_reader, mock_pro def test_process_and_send_subs_message_azure_time_already_processed( self, mock_msg_builder, mock_reader, mock_producer, mock_remove, mock_azure_id ): - """Tests that the functions are not called for a provider that has already processed.""" + """Tests that the start for the range is updated.""" + mock_azure_id.return_value = ("expected", "expected") + mock_msg_builder.return_value = {"fake": "msg"} upload_keys = ["fake_key"] - self.azure_messenger.date_map["2023-07-01T01:00:00Z"] = "i-55555556" + self.azure_messenger.date_map = {"2024-07-01T00:00:00Z": {"i-55555556": 12}} + instance = "expected" + account = "9999999999999" + vcpu = "2" + rhel_version = "7" + sla = "Premium" + usage = "Production" + role = "Red Hat Enterprise Linux Server" + conversion = "true" + addon_id = "ELS" + tenant_id = "expected" + expected_start = "2024-07-01T12:00:00+00:00" + expected_end = "2024-07-01T13:00:00+00:00" mock_reader.return_value = [ { "resourceid": "i-55555556", - "subs_start_time": "2023-07-01T01:00:00Z", - "subs_end_time": "2023-07-01T02:00:00Z", + "subs_start_time": "2024-07-01T00:00:00Z", + "subs_end_time": "2024-07-02T00:00:00Z", "subs_resource_id": "i-55555556", - "subs_account": "9999999999999", + "subs_account": account, "physical_cores": "1", "subs_vcpu": "2", "variant": "Server", - "subs_usage": "Production", - "subs_sla": "Premium", - "subs_role": "Red Hat Enterprise Linux Server", - "subs_product_ids": "479-70", - "subs_addon": "false", - "subs_instance": "", + "subs_usage": usage, + "subs_usage_quantity": "1", + "subs_sla": sla, + "subs_role": role, + "subs_rhel_version": rhel_version, + "subs_addon_id": addon_id, + "subs_instance": instance, + "subs_conversion": conversion, "source": self.azure_provider.uuid, "resourcegroup": "my-fake-rg", } @@ -562,9 +578,22 @@ def test_process_and_send_subs_message_azure_time_already_processed( mock_op = mock_open(read_data="x,y,z") with patch("builtins.open", mock_op): self.azure_messenger.process_and_send_subs_message(upload_keys) - mock_azure_id.assert_not_called() - mock_msg_builder.assert_not_called() - mock_producer.assert_not_called() + mock_azure_id.assert_called_once() + mock_msg_builder.assert_called_with( + instance, + account, + expected_start, + expected_end, + vcpu, + rhel_version, + sla, + usage, + role, + conversion, + addon_id, + tenant_id, + ) + mock_producer.assert_called_once() def test_determine_product_ids(self): """Test that different combinations of inputs result in expected product IDs"""