diff --git a/tools/dataproc-job-metric-utility/README.md b/tools/dataproc-job-metric-utility/README.md
new file mode 100644
index 0000000000..67afa6713e
--- /dev/null
+++ b/tools/dataproc-job-metric-utility/README.md
@@ -0,0 +1,288 @@
+# dataproc-job-metric-utility
+
+----
+
+## Table Of Contents
+
+1. [Use Case](#use-case)
+2. [About](#about)
+3. [Guide](#guide)
+
+----
+
+## Use Case
+
+Collect various metrics for Dataproc jobs and store them in GCS and BigQuery.
+
+----
+
+## About
+
+This repository contains a Python script that collects Dataproc job metrics. These metrics provide deeper insight into the performance of Dataproc jobs: the user can compare job runs across different Dataproc job/cluster configurations and property settings. They are also helpful when comparing Dataproc jobs with on-premises Hadoop, Spark, and similar jobs.
+
+For continuous metric collection and historical analysis, the utility can be scheduled via Cloud Functions + Cloud Scheduler, Cloud Workflows, or an Airflow DAG on Cloud Composer (see the sample DAG below).
+
+----
+
+## Guide
+
+The script accepts the following arguments:
+
+**project_id**: (required) Google Cloud project ID.
+
+**region**: (required) Google Cloud region where the Dataproc jobs ran.
+
+**bq_dataset**: (required) BigQuery (BQ) dataset in which to store metrics.
+
+**bq_table**: (required) BQ table in which to store metrics. The table is created if it does not exist.
+
+**bucket_name**: (required) Google Cloud Storage (GCS) bucket used to stage the metrics data that is loaded into BQ.
+
+**hours**: (optional) Number of previous hours to collect job metrics from. Defaults to 24 (one day).
+
+**blob_name**: (optional) Name of the GCS blob/object. Defaults to `dataproc_metrics.json`. The name is prefixed with today's date, for example `01012024_dataproc_metrics.json`.
+
+### Sample Usage
+
+```sh
+python3 collect.py \
+  --project_id cy-artifacts \
+  --region us-central1 \
+  --bq_dataset sandbox \
+  --bq_table dp_metrics \
+  --bucket_name cy-sandbox \
+  --hours 480
+```
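+
+### Sample Airflow DAG (Cloud Composer)
+
+The DAG below is a minimal, illustrative sketch of how the script could be scheduled from Cloud Composer. The DAG id, schedule, project, bucket/dataset names, and the `/home/airflow/gcs/dags/...` path are placeholders, and it assumes `collect.py` has been uploaded alongside the DAG.
+
+```python
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.bash import BashOperator
+
+with DAG(
+    dag_id="dataproc_job_metric_collection",
+    schedule_interval="@daily",
+    start_date=datetime(2024, 1, 1),
+    catchup=False,
+) as dag:
+    # Run the collector once a day for the previous 24 hours of jobs.
+    collect_metrics = BashOperator(
+        task_id="collect_dataproc_job_metrics",
+        bash_command=(
+            "python3 /home/airflow/gcs/dags/dataproc-job-metric-utility/src/collect.py "
+            "--project_id my-project "
+            "--region us-central1 "
+            "--bq_dataset my_dataset "
+            "--bq_table dp_metrics "
+            "--bucket_name my-metrics-bucket "
+            "--hours 24"
+        ),
+    )
+```
+
+The environment running the DAG must have the script's dependencies installed (for example `google-cloud-dataproc`, `google-cloud-compute`, `google-cloud-bigquery`, `google-cloud-storage`, and `requests`).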
+
+### Sample BigQuery Table Schema
+
+The table schema is auto-detected by the BigQuery load job; a sample of the resulting schema is shown below.
+
+```sql
+CREATE TABLE
+  `<project_id>.<bq_dataset>.dataproc_job_metrics` ( yarnMetrics STRUCT>>,
+    logAggregationStatus STRING,
+    clusterUsagePercentage FLOAT64,
+    queueUsagePercentage FLOAT64,
+    numAMContainerPreempted INT64,
+    memorySeconds INT64,
+    preemptedResourceVCores INT64,
+    runningContainers INT64,
+    reservedVCores INT64,
+    reservedMB INT64,
+    allocatedVCores INT64,
+    amRPCAddress STRING,
+    preemptedResourceSecondsMap STRING,
+    applicationTags STRING,
+    elapsedTime INT64,
+    diagnostics STRING,
+    finishedTime INT64,
+    applicationType STRING,
+    startedTime INT64,
+    priority INT64,
+    launchTime INT64,
+    amHostHttpAddress STRING,
+    unmanagedApplication BOOL,
+    id STRING,
+    trackingUI STRING,
+    trackingUrl STRING,
+    masterNodeId STRING,
+    allocatedMB INT64,
+    progress FLOAT64,
+    name STRING,
+    numNonAMContainerPreempted INT64,
+    state STRING,
+    resourceSecondsMap STRUCT>,
+    vcoreSeconds INT64,
+    queue STRING,
+    amNodeLabelExpression STRING,
+    amContainerLogs STRING,
+    preemptedMemorySeconds INT64,
+    finalStatus STRING,
+    preemptedVcoreSeconds INT64,
+    user STRING,
+    preemptedResourceMB INT64,
+    clusterId INT64>,
+  secondaryMachineConfig STRING,
+  primaryMachineConfig STRUCT,
+  dataprocClusterConfig STRUCT,
+    hdfsMetrics STRUCT>,
+    labels STRUCT,
+    statusHistory ARRAY>,
+    config STRUCT>,
+    encryptionConfig STRUCT,
+    gceClusterConfig STRUCT,
+    shieldedInstanceConfig STRUCT,
+    serviceAccount STRING,
+    serviceAccountScopes ARRAY,
+    internalIpOnly BOOL,
+    zoneUri STRING>,
+    lifecycleConfig STRUCT,
+    workerConfig STRUCT,
+    machineTypeUri STRING,
+    imageUri STRING,
+    preemptibility STRING,
+    instanceNames ARRAY>,
+    metastoreConfig STRUCT,
+    masterConfig STRUCT,
+    machineTypeUri STRING,
+    imageUri STRING,
+    preemptibility STRING,
+    instanceNames ARRAY>,
+    tempBucket STRING,
+    softwareConfig STRUCT,
+    initializationActions ARRAY>,
+    configBucket STRING>,
+    projectId STRING,
+    clusterName STRING,
+    clusterUuid STRING,
+    status STRUCT>,
+  dataprocJobConfig STRUCT>,
+    pysparkJob STRUCT,
+    mainPythonFileUri STRING>,
+    hiveJob STRUCT,
+    queryFileUri STRING>,
+    driverControlFilesUri STRING,
+    statusHistory ARRAY>,
+    jobUuid STRING,
+    done BOOL,
+    status STRUCT,
+    driverOutputResourceUri STRING,
+    pigJob STRUCT,
+    properties STRING,
+    queryFileUri STRING>,
+    placement STRUCT,
+    reference STRUCT> )
+```
+
+### Sample BigQuery Table Query
+
+```sql
+SELECT *,
+  primaryWorkerCount * primaryWorkerVCores AS totalVCores,
+  primaryWorkerCount * primaryWorkerDiskSizeGb AS totalDiskSize,
+  primaryWorkerCount * primaryWorkerMemoryMb AS totalMemoryMb
+FROM
+(SELECT
+  dataprocJobConfig.reference.jobId,
+  dataprocJobConfig.reference.projectId,
+  dataprocJobConfig.placement.clusterName,
+  dataprocJobConfig.statusHistory[0].stateStartTime AS startTime,
+  dataprocClusterConfig.config.workerConfig.numInstances AS primaryWorkerCount,
+  dataprocClusterConfig.config.workerConfig.machineTypeUri AS primaryWorkerMachineType,
+  dataprocClusterConfig.config.workerConfig.diskConfig.bootDiskType AS primaryWorkerDiskType,
+  dataprocClusterConfig.config.workerConfig.diskConfig.bootDiskSizeGb AS primaryWorkerDiskSizeGb,
+  dataprocClusterConfig.config.masterConfig.numInstances AS masterNodeCount,
+  dataprocClusterConfig.config.masterConfig.diskConfig.bootDiskType AS masterNodeDiskType,
+  dataprocClusterConfig.config.masterConfig.diskConfig.bootDiskSizeGb AS masterNodeDiskSize,
+  dataprocClusterConfig.config.masterConfig.machineTypeUri AS masterNodeMachineType,
+  primaryMachineConfig.guestCpus AS primaryWorkerVCores,
+  primaryMachineConfig.imageSpaceGb AS primaryWorkerImageSpaceGb,
+  primaryMachineConfig.memoryMb AS primaryWorkerMemoryMb,
+  yarnMetrics.memorySeconds,
+  yarnMetrics.vcoreSeconds,
+  yarnMetrics.finishedTime,
+  yarnMetrics.elapsedTime,
+  yarnMetrics.startedTime,
+  yarnMetrics.applicationType,
+  yarnMetrics.clusterUsagePercentage
+FROM
+  `<project_id>.<bq_dataset>.dataproc_job_metrics`
+)
+ORDER BY
+  startTime DESC
+LIMIT
+  1000
+```
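+
+### Sample Programmatic Query
+
+The metrics table can also be read programmatically, for example to compare recent runs of the same application. The snippet below is a minimal sketch using the BigQuery Python client; the project, dataset/table, and application name are placeholders rather than values produced by this utility.
+
+```python
+from google.cloud import bigquery
+
+PROJECT_ID = "my-project"                   # placeholder project
+TABLE = f"{PROJECT_ID}.sandbox.dp_metrics"  # placeholder dataset/table
+
+client = bigquery.Client(project=PROJECT_ID)
+
+# Compare the YARN resource usage of the most recent runs of one application.
+query = f"""
+SELECT
+  dataprocJobConfig.reference.jobId,
+  dataprocJobConfig.placement.clusterName,
+  yarnMetrics.elapsedTime,
+  yarnMetrics.vcoreSeconds,
+  yarnMetrics.memorySeconds
+FROM `{TABLE}`
+WHERE yarnMetrics.name = @application_name
+ORDER BY yarnMetrics.startedTime DESC
+LIMIT 10
+"""
+
+job_config = bigquery.QueryJobConfig(query_parameters=[
+    bigquery.ScalarQueryParameter("application_name", "STRING", "my-spark-app"),
+])
+
+for row in client.query(query, job_config=job_config).result():
+    print(row.jobId, row.clusterName, row.elapsedTime, row.vcoreSeconds,
+          row.memorySeconds)
+```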
diff --git a/tools/dataproc-job-metric-utility/src/collect.py b/tools/dataproc-job-metric-utility/src/collect.py
new file mode 100644
index 0000000000..6825a5050f
--- /dev/null
+++ b/tools/dataproc-job-metric-utility/src/collect.py
@@ -0,0 +1,332 @@
+"""
+A utility to collect Dataproc job metrics over a given timeframe.
+"""
+
+from datetime import timedelta, datetime
+import json
+import argparse
+import logging as logger
+
+from google.cloud import dataproc_v1, compute_v1, bigquery, storage
+from google.api_core.exceptions import NotFound
+import requests
+import google.auth
+import google.auth.transport.requests
+from google.protobuf.json_format import MessageToDict
+
+
+def to_camel_case(key_str: str):
+    """Converts a snake_case string to camelCase."""
+    key_str = key_str.replace("-", "_").replace(" ", "_")  # Normalize delimiters
+    components = key_str.split("_")
+    return components[0] + "".join(x.title() for x in components[1:])
+
+
+def clean_up_keys(data):
+    """Creates a new dictionary with camelCase keys from a given dictionary."""
+    camel_case_dict = {}
+    for key, value in data.items():
+        if "HiveServer2" in key:
+            # Normalize any key containing "HiveServer2" to a single stable key.
+            key = "HiveServer2"
+        if isinstance(value, dict):
+            # Recursively handle nested dictionaries.
+            value = clean_up_keys(value)
+        camel_case_dict[to_camel_case(key)] = value
+
+    return camel_case_dict
+
+
+def clean_up_values(data: dict):
+    """Replaces empty dictionaries with None in a dictionary, including nested ones."""
+    for key, value in data.items():
+        if isinstance(value, dict):
+            clean_up_values(value)
+            if not value:
+                # BQ: unsupported empty struct type for field.
+                data[key] = None
+        if "properties" in key:
+            # Wrap free-form properties maps as strings for BQ loading.
+            data[key] = str(data[key])
+    return data
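+
+
+# Illustrative example of the two helpers above (hypothetical input):
+#   clean_up_keys({"yarn metrics": {"memory_seconds": 100}})
+#       -> {"yarnMetrics": {"memorySeconds": 100}}
+#   clean_up_values({"labels": {}, "properties": {"spark.executor.memory": "4g"}})
+#       -> {"labels": None, "properties": "{'spark.executor.memory': '4g'}"}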
+
+
+def upload_json_to_gcs(bucket_name: str, blob_name: str, data: list):
+    """Uploads metrics records to a GCS location as newline-delimited JSON."""
+    logger.info(f"Uploading results to gs://{bucket_name}/{blob_name}")
+
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(bucket_name)
+    blob = bucket.blob(blob_name)
+
+    ndjson_string = "\n".join(json.dumps(item) for item in data)
+
+    # Upload the string to GCS.
+    blob.upload_from_string(ndjson_string, content_type="application/json")
+
+
+def load_metrics_to_bigquery(
+    bq_dataset: str,
+    bq_table: str,
+    bucket_name: str,
+    blob_name: str,
+    project_id: str,
+    kms_key_name: str = None,
+):
+    """Loads a GCS object containing Dataproc metrics into a BQ table."""
+    logger.info(
+        f"Loading results to BigQuery: {project_id}.{bq_dataset}.{bq_table}")
+
+    bq_client = bigquery.Client(project=project_id)
+
+    dataset_ref = bq_client.dataset(bq_dataset)
+    table_ref = dataset_ref.table(bq_table)
+
+    if kms_key_name:
+        job_config = bigquery.LoadJobConfig(
+            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
+            autodetect=True,
+            create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
+            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
+            destination_encryption_configuration=bigquery.
+            EncryptionConfiguration(kms_key_name=kms_key_name),
+        )
+    else:
+        job_config = bigquery.LoadJobConfig(
+            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
+            autodetect=True,
+            create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
+            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
+        )
+
+    load_job = bq_client.load_table_from_uri(f"gs://{bucket_name}/{blob_name}",
+                                             table_ref,
+                                             job_config=job_config)
+
+    load_job.result()  # Waits for the load job to complete.
+
+
+def collect_dataproc_job_metrics(project_id, region, hours, bucket_name,
+                                 blob_name, bq_dataset, bq_table):
+    """
+    Uses the Dataproc Jobs, Dataproc Clusters, Compute Engine, and GCS APIs to
+    collect metrics for all Dataproc jobs that ran in the last `hours` hours,
+    then uploads the results to GCS and loads them into BigQuery.
+    """
+
+    # -------------------------------------------------
+    # Begin by getting all Dataproc jobs in the last `hours` hours
+    # -------------------------------------------------
+    logger.info(f"Retrieving all Dataproc jobs in the last {hours} hour(s).")
+
+    creds, _ = google.auth.default()
+    auth_req = google.auth.transport.requests.Request()
+    creds.refresh(auth_req)
+    cred_token = creds.token
+
+    dataproc_job_client = dataproc_v1.JobControllerClient(
+        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"})
+    dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
+        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"})
+    compute_client = compute_v1.MachineTypesClient()
+
+    # -------------------------------------------------
+    # Get jobs that have started recently
+    # -------------------------------------------------
+
+    dataproc_jobs = dataproc_job_client.list_jobs(request={
+        "project_id": project_id,
+        "region": region
+    })
+
+    now = datetime.now()
+    min_range = datetime.timestamp(now - timedelta(hours=hours))
+
+    timeframed_jobs = []
+    all_jobs = []
+    for dataproc_job in dataproc_jobs:
+        all_jobs.append(dataproc_job)
+        job_start = datetime.timestamp(
+            dataproc_job.status_history[0].state_start_time)
+        if job_start and job_start > min_range:
+            timeframed_jobs.append(dataproc_job)
+
+    logger.info(f"All jobs: {len(all_jobs)}")
+    logger.info(f"Jobs in the last {hours} hour(s): {len(timeframed_jobs)}")
+
+    all_metrics = []
+    count = 0
+    for dataproc_job in timeframed_jobs:
+        count += 1
+        logger.info(
+            f"Progress: {round(count / len(timeframed_jobs) * 100, 2)}%")
+
+        # Reset per-job results so a missing cluster does not reuse the
+        # previous job's configuration.
+        dataproc_cluster_config = {}
+        primary_machine_type_config = {}
+        secondary_machine_type_config = {}
+        yarn_metrics = {}
+
+        dataproc_job_config = MessageToDict(dataproc_job._pb)
+        dataproc_cluster_name = dataproc_job_config.get("placement").get(
+            "clusterName")
+
+        try:
+            dataproc_cluster = dataproc_cluster_client.get_cluster(
+                project_id=project_id,
+                region=region,
+                cluster_name=dataproc_cluster_name,
+            )
+            dataproc_cluster_config = MessageToDict(dataproc_cluster._pb)
+
+            # -------------------------------------------------
+            # Collect metrics for cluster machine types
+            # -------------------------------------------------
+
+            # The machine type URI ends in .../zones/<zone>/machineTypes/<type>,
+            # so derive both the zone and the machine type name from it.
+            primary_uri_parts = str(
+                dataproc_cluster.config.worker_config.machine_type_uri).split(
+                    "/")
+            primary_machine_type_config = MessageToDict(
+                compute_client.get(
+                    project=project_id,
+                    zone=primary_uri_parts[-3],
+                    machine_type=primary_uri_parts[-1],
+                )._pb)
+
+            secondary_worker_count = int(
+                dataproc_cluster.config.secondary_worker_config.num_instances)
+            if secondary_worker_count > 0:
+                secondary_uri_parts = str(
+                    dataproc_cluster.config.secondary_worker_config.
+                    machine_type_uri).split("/")
+                secondary_machine_type_config = MessageToDict(
+                    compute_client.get(
+                        project=project_id,
+                        zone=secondary_uri_parts[-3],
+                        machine_type=secondary_uri_parts[-1],
+                    )._pb)
+        except NotFound:
+            logger.warning(
+                f"Cluster {dataproc_cluster_name} not found for job "
+                f"{dataproc_job.reference.job_id}; skipping cluster metrics.")
+            dataproc_cluster_config = None
+
+        # -------------------------------------------------
+        # Collect YARN metrics for the job if the cluster exists
+        # -------------------------------------------------
+        if dataproc_cluster_config and dataproc_job.yarn_applications:
+            yarn_endpoint = dataproc_cluster.config.endpoint_config.http_ports.get(
+                "YARN ResourceManager")
+            application_id = dataproc_job.yarn_applications[0].tracking_url.split(
+                "/")[-2]
+
+            base_url = f"{yarn_endpoint}ws/v1/cluster/apps/{application_id}"
+            try:
+                headers = {"Proxy-Authorization": f"Bearer {cred_token}"}
+                response = requests.get(url=base_url, headers=headers, timeout=60)
+                response.raise_for_status()  # Raise an exception for HTTP errors.
+                yarn_metrics = response.json().get("app")
+            except requests.exceptions.RequestException as e:
+                logger.warning(str(e))
+                continue
+
+        job_metrics = {
+            "dataproc_job_config": dataproc_job_config,
+            "dataproc_cluster_config": dataproc_cluster_config,
+            "primary_machine_config": primary_machine_type_config,
+            "secondary_machine_config": secondary_machine_type_config,
+            "yarn_metrics": yarn_metrics,
+        }
+        logger.debug(job_metrics)
+        job_metrics = clean_up_keys(job_metrics)
+        job_metrics = clean_up_values(job_metrics)
+        all_metrics.append(job_metrics)
+
+    if all_metrics:
+
+        # -------------------------------------------------
+        # Upload results to GCS
+        # -------------------------------------------------
+        # Prefix the blob name with today's date, e.g. 01012024_dataproc_metrics.json.
+        blob_name = datetime.now().strftime("%m%d%Y") + "_" + blob_name
+        upload_json_to_gcs(bucket_name=bucket_name,
+                           blob_name=blob_name,
+                           data=all_metrics)
+
+        # -------------------------------------------------
+        # Load results into BigQuery
+        # -------------------------------------------------
+        load_metrics_to_bigquery(
+            project_id=project_id,
+            bq_dataset=bq_dataset,
+            bq_table=bq_table,
+            bucket_name=bucket_name,
+            blob_name=blob_name,
+        )
+
+        logger.info("Metric collection complete.")
+    else:
+        logger.error("No Dataproc jobs found in the specified timeframe.")
+
+
+if __name__ == "__main__":
+    logger.basicConfig(level=logger.INFO)
+
+    parser = argparse.ArgumentParser(
+        description="Collect Dataproc job metrics and store them in BigQuery.")
+
+    # Required arguments
+    parser.add_argument("--project_id",
+                        type=str,
+                        required=True,
+                        help="Google Cloud project ID")
+    parser.add_argument(
+        "--region",
+        type=str,
+        required=True,
+        help="Cloud region where the Dataproc jobs ran",
+    )
+    parser.add_argument(
+        "--bq_dataset",
+        type=str,
+        required=True,
+        help="BigQuery dataset to store metrics",
+    )
+    parser.add_argument(
+        "--bucket_name",
+        type=str,
+        required=True,
+        help="GCS bucket to store metrics data",
+    )
+    parser.add_argument("--bq_table",
+                        type=str,
+                        required=True,
+                        help="BigQuery table to store metrics")
+
+    # Optional arguments (with defaults)
+    parser.add_argument(
+        "--hours",
+        type=int,
+        default=24,
+        help="Number of hours to look back for job metrics (default: 24)",
+    )
+    parser.add_argument(
+        "--blob_name",
+        type=str,
+        default="dataproc_metrics.json",
+        help="Name of the GCS metrics blob",
+    )
+
+    args = parser.parse_args()
+
+    # Call the collection function with the parsed arguments.
+    collect_dataproc_job_metrics(
+        args.project_id,
+        args.region,
+        args.hours,
+        args.bucket_name,
+        args.blob_name,
+        args.bq_dataset,
+        args.bq_table,
+    )