Skip to content

Commit

Permalink
Merge branch 'main' into COST-5141-vaccum-expired-partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
myersCody authored Jun 18, 2024
2 parents f220e74 + 4513035 commit 6369174
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 65 deletions.
14 changes: 10 additions & 4 deletions dev/scripts/nise_ymls/ocp_on_aws/aws_static_data.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ generators:
resourceTags/user:environment: dev
resourceTags/user:version: beta
resourceTags/user:dashed-key-on-aws: dashed-value
resourceTags/user:com_REDHAT_rhel: RHEL 8 ELS
resourceTags/user:com_REDHAT_rhel: 8
resourceTags/user:com_REDHAT_rhel_addon: ELS
resourceTags/user:com_redhat_rhel_sla: self-support
resourceTags/user:CoM_RedHat_Rhel_varianT: Workstation
resourceTags/user:com_redhat_rhel_usage: disaster Recovery
resourceTags/user:com_redhat_rhel_conversion: True
resourceTags/user:Mapping: b1
resourceTags/user:Map: b2
resourceTags/user:Name: instance-name-2
Expand All @@ -64,7 +66,9 @@ generators:
resourceTags/user:environment: dev
resourceTags/user:version: gamma
resourceTags/user:dashed-key-on-aws: dashed-value
resourceTags/user:com_REDHAT_rhel: RHEL 7 ELS
resourceTags/user:com_REDHAT_rhel: 7
resourceTags/user:com_REDHAT_addon: ELS
resourceTags/user:com_REDHAT_conversion: True
resourceTags/user:com_redhat_rhel_sla: Standard
resourceTags/user:CoM_RedHat_Rhel_varianT: Workstation
resourceTags/user:Mapping: c1
Expand Down Expand Up @@ -93,7 +97,9 @@ generators:
resourceTags/user:environment: dev
resourceTags/user:version: beta
resourceTags/user:dashed-key-on-aws: dashed-value
resourceTags/user:com_REDHAT_rhel: RHEL 8 ELS
resourceTags/user:com_REDHAT_rhel: 8
resourceTags/user:com_REDHAT_rhel_addon: ELS
resourceTags/user:com_REDHAT_rhel_conversion: True
resourceTags/user:com_redhat_rhel_sla: self-support
resourceTags/user:CoM_RedHat_Rhel_varianT: hPC
resourceTags/user:com_redhat_rhel_usage: disaster Recovery
Expand All @@ -120,7 +126,7 @@ generators:
resourceTags/user:environment: dev
resourceTags/user:version: beta
resourceTags/user:dashed-key-on-aws: dashed-value
resourceTags/user:com_REDHAT_rhel: RHEL 7 els
resourceTags/user:com_REDHAT_rhel: 7
resourceTags/user:com_redhat_rhel_sla: self-support
resourceTags/user:CoM_RedHat_Rhel_varianT: hPC
resourceTags/user:com_redhat_rhel_usage: disaster Recovery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ CREATE TABLE IF NOT EXISTS {{schema | sqlsafe}}.gcp_openshift_daily_resource_mat
instance_type varchar,
service_id varchar,
service_alias varchar,
data_transfer_direction varchar,
sku_id varchar,
sku_alias varchar,
region varchar,
Expand Down Expand Up @@ -134,6 +135,7 @@ CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.reporting_ocpgcpcostlineite
instance_type varchar,
service_id varchar,
service_alias varchar,
data_transfer_direction varchar,
sku_id varchar,
sku_alias varchar,
region varchar,
Expand Down
4 changes: 2 additions & 2 deletions koku/masu/management/commands/migrate_trino_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,11 @@ def drop_expired_partitions(tables, schema):
source_column_param = manage_table_mapping[table]
if not check_table_exists(schema, table):
LOG.info(f"{table} does not exist for {schema}")
return
continue
expired_partitions = find_expired_partitions(schema, months, table, source_column_param)
if not expired_partitions:
LOG.info(f"No expired partitions found for {table} {schema}")
return
continue
LOG.info(f"Found {len(expired_partitions)}")
for partition in expired_partitions:
year, month, source = partition
Expand Down
86 changes: 77 additions & 9 deletions koku/subs/subs_data_messenger.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@

LOG = logging.getLogger(__name__)

HPC_ROLE = "Red Hat Enterprise Linux Compute Node"
SAP_ROLE = "SAP"
RHEL_7 = "69"
RHEL_8 = "479"
ELS = "204"
RHEL_7_HPC_ID = "76"
RHEL_7_SAP_ID = "146"
RHEL_8_HPC_ID = "479"
RHEL_8_SAP_ID = "241"


class SUBSDataMessenger:
def __init__(self, context, schema_name, tracing_id):
Expand Down Expand Up @@ -106,10 +116,12 @@ def process_and_send_subs_message(self, upload_keys):
row["subs_start_time"],
row["subs_end_time"],
row["subs_vcpu"],
row["subs_rhel_version"],
row["subs_sla"],
row["subs_usage"],
row["subs_role"],
row["subs_product_ids"].split("-"),
row["subs_conversion"],
row["subs_addon_id"],
)
msg = bytes(json.dumps(subs_dict), "utf-8")
self.send_kafka_message(msg)
Expand All @@ -130,8 +142,11 @@ def send_kafka_message(self, msg):
producer.produce(SUBS_TOPIC, key=self.org_id, value=msg, callback=delivery_callback)
producer.poll(0)

def build_base_subs_dict(self, instance_id, tstamp, expiration, cpu_count, sla, usage, role, product_ids):
def build_base_subs_dict(
self, instance_id, tstamp, expiration, cpu_count, version, sla, usage, role, conversion, addon
):
"""Gathers the relevant information for the kafka message and returns a filled dictionary of information."""
product_ids = self.determine_product_ids(version, addon, role)
subs_dict = {
"event_id": str(uuid.uuid4()),
"event_source": "cost-management",
Expand All @@ -149,29 +164,55 @@ def build_base_subs_dict(self, instance_id, tstamp, expiration, cpu_count, sla,
"sla": sla,
"usage": usage,
"billing_provider": self.provider_type.lower(),
"conversion": True,
}
if conversion == "true":
subs_dict["conversion"] = True
else:
subs_dict["conversion"] = False
# SAP is identified only through product ids and does not have an associated Role
if role != "SAP":
if role != SAP_ROLE:
subs_dict["role"] = role
return subs_dict

def build_aws_subs_dict(
self, instance_id, billing_account_id, tstamp, expiration, cpu_count, sla, usage, role, product_ids
self,
instance_id,
billing_account_id,
tstamp,
expiration,
cpu_count,
version,
sla,
usage,
role,
conversion,
addon,
):
"""Adds AWS specific fields to the base subs dict."""
subs_dict = self.build_base_subs_dict(
instance_id, tstamp, expiration, cpu_count, sla, usage, role, product_ids
instance_id, tstamp, expiration, cpu_count, version, sla, usage, role, conversion, addon
)
subs_dict["billing_account_id"] = billing_account_id
return subs_dict

def build_azure_subs_dict(
self, instance_id, billing_account_id, tstamp, expiration, cpu_count, sla, usage, role, product_ids, tenant_id
self,
instance_id,
billing_account_id,
tstamp,
expiration,
cpu_count,
version,
sla,
usage,
role,
conversion,
addon,
tenant_id,
):
"""Adds Azure specific fields to the base subs dict."""
subs_dict = self.build_base_subs_dict(
instance_id, tstamp, expiration, cpu_count, sla, usage, role, product_ids
instance_id, tstamp, expiration, cpu_count, version, sla, usage, role, conversion, addon
)
subs_dict["azure_subscription_id"] = billing_account_id
subs_dict["azure_tenant_id"] = tenant_id
Expand Down Expand Up @@ -200,10 +241,12 @@ def process_azure_row(self, row):
start.isoformat(),
end.isoformat(),
row["subs_vcpu"],
row["subs_rhel_version"],
row["subs_sla"],
row["subs_usage"],
row["subs_role"],
row["subs_product_ids"].split("-"),
row["subs_conversion"],
row["subs_addon_id"],
tenant_id,
)
msg = bytes(json.dumps(subs_dict), "utf-8")
Expand All @@ -212,3 +255,28 @@ def process_azure_row(self, row):
self.send_kafka_message(msg)
msg_count += 1
return msg_count

def determine_product_ids(self, rhel_version, addon, role):
"""Determine the appropriate product id's based on the RHEL version, addon and role.
HPC variants overwrite the product id's and are handled via ROLE.
SAP variants are additional product id's.
"""
product_ids = []
if rhel_version == RHEL_7:
if role == HPC_ROLE:
return [RHEL_7_HPC_ID]
elif role == SAP_ROLE:
product_ids.append(RHEL_7_SAP_ID)
product_ids.append(RHEL_7)
elif rhel_version == RHEL_8:
if role == HPC_ROLE:
return [RHEL_8_HPC_ID]
elif role == SAP_ROLE:
product_ids.append(RHEL_8_SAP_ID)
product_ids.append(RHEL_8)

if addon == ELS:
product_ids.append(ELS)

return product_ids
Loading

0 comments on commit 6369174

Please sign in to comment.