From e11365bfaa6f10f88ef478a9980215f517cccb11 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Mon, 9 Oct 2023 15:19:54 -0700 Subject: [PATCH 1/6] Add support for Dataproc GKE in user tools Signed-off-by: Partho Sarthi --- .../cloud_api/dataproc.py | 12 +- .../cloud_api/dataproc_gke.py | 226 ++++++++++++++++++ .../cloud_api/dataproc_gke_job.py | 27 +++ .../cloud_api/sp_types.py | 5 +- .../pricing/dataproc_gke_pricing.py | 103 ++++++++ .../src/spark_rapids_pytools/wrapper.py | 2 + .../wrappers/dataproc_gke_wrapper.py | 141 +++++++++++ .../src/spark_rapids_tools/cloud/__init__.py | 3 +- .../cloud/dataproc/dataproccluster.py | 18 ++ user_tools/src/spark_rapids_tools/enums.py | 1 + 10 files changed, 530 insertions(+), 8 deletions(-) create mode 100644 user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py create mode 100644 user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke_job.py create mode 100644 user_tools/src/spark_rapids_pytools/pricing/dataproc_gke_pricing.py create mode 100644 user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py index 78cfa49be..a68a060c9 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py @@ -326,8 +326,8 @@ def parse_accelerator_description(raw_description: str) -> dict: for defined_acc in accelerator_arr: # TODO: if the accelerator_arr has other non-gpu ones, then we need to loop until we # find the gpu accelerators - gpu_configs = {'num_gpus': defined_acc.get('acceleratorCount')} - accelerator_type = defined_acc.get('acceleratorTypeUri') + gpu_configs = {'num_gpus': int(defined_acc.get('acceleratorCount'))} + accelerator_type = defined_acc.get('acceleratorTypeUri') or defined_acc.get('acceleratorType') gpu_device_type = self.__extract_info_from_value(accelerator_type) gpu_description = cli.exec_platform_describe_accelerator(accelerator_type=gpu_device_type, cmd_args=None) @@ -350,7 +350,7 @@ def _set_fields_from_props(self): # set the machine type if not self.props: return - mc_type_uri = self.props.get_value('machineTypeUri') + mc_type_uri = self.props.get_value_silent('machineTypeUri') if mc_type_uri: self.instance_type = self.__extract_info_from_value(mc_type_uri) else: @@ -514,7 +514,7 @@ class DataprocSavingsEstimator(SavingsEstimator): """ A class that calculates the savings based on Dataproc price provider """ - def __calculate_group_cost(self, cluster_inst: ClusterGetAccessor, node_type: SparkNodeType): + def _calculate_group_cost(self, cluster_inst: ClusterGetAccessor, node_type: SparkNodeType): nodes_cnt = cluster_inst.get_nodes_cnt(node_type) cores_count = cluster_inst.get_node_core_count(node_type) mem_mb = cluster_inst.get_node_mem_mb(node_type) @@ -532,7 +532,7 @@ def __calculate_group_cost(self, cluster_inst: ClusterGetAccessor, node_type: Sp return nodes_cnt * (cores_cost + memory_cost + gpu_cost) def _get_cost_per_cluster(self, cluster: ClusterGetAccessor): - master_cost = self.__calculate_group_cost(cluster, SparkNodeType.MASTER) - workers_cost = self.__calculate_group_cost(cluster, SparkNodeType.WORKER) + master_cost = self._calculate_group_cost(cluster, SparkNodeType.MASTER) + workers_cost = self._calculate_group_cost(cluster, SparkNodeType.WORKER) dataproc_cost = self.price_provider.get_container_cost() return master_cost + workers_cost + dataproc_cost diff --git 
a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py new file mode 100644 index 000000000..82ae4085f --- /dev/null +++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py @@ -0,0 +1,226 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Implementation specific to Dataproc""" + +from dataclasses import dataclass, field +from typing import Any + +from spark_rapids_pytools.cloud_api.dataproc import DataprocCluster, DataprocCMDDriver, DataprocNode, \ + DataprocPlatform, DataprocSavingsEstimator +from spark_rapids_pytools.cloud_api.dataproc_gke_job import DataprocGkeLocalRapidsJob +from spark_rapids_pytools.cloud_api.sp_types import CMDDriverBase, \ + ClusterNode, SparkNodeType, ClusterState, ClusterGetAccessor +from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer +from spark_rapids_pytools.common.sys_storage import FSUtil +from spark_rapids_pytools.pricing.dataproc_gke_pricing import DataprocGkePriceProvider +from spark_rapids_tools import CspEnv + + +@dataclass +class GkeNodePool: + role: str + pool_name: str + + @staticmethod + def __extract_info_from_value(conf_val: str): + if '/' in conf_val: + # This is a valid url-path + return FSUtil.get_resource_name(conf_val) + # This is a value + return conf_val + + @classmethod + def from_dict(cls, node_pool_info: dict): + pool_name = cls.__extract_info_from_value(node_pool_info['nodePool']) + return cls(role=node_pool_info['roles'], pool_name=pool_name) + + +@dataclass +class GkeCluster: + cluster_name: str + node_pools: list[GkeNodePool] + + +@dataclass +class DataprocGkePlatform(DataprocPlatform): + """ + Represents the interface and utilities required by DataprocGke. + Prerequisites: + - install gcloud command lines (gcloud, gsutil) + - configure the gcloud CLI. + - dataproc_gke has staging temporary storage. we can retrieve that from the cluster properties. + """ + + def __post_init__(self): + super().__post_init__() + self.type_id = CspEnv.DATAPROC_GKE + + @classmethod + def get_spark_node_type_fromstring(cls, value: str): + node_type_mapping = { + 'SPARK_EXECUTOR': SparkNodeType.WORKER, + 'SPARK_DRIVER': SparkNodeType.MASTER, + } + return node_type_mapping.get(value.upper()) + + def _construct_cli_object(self) -> CMDDriverBase: + return DataprocGkeCMDDriver(timeout=0, cloud_ctxt=self.ctxt) + + def _construct_cluster_from_props(self, cluster: str, props: str = None): + return DataprocGkeCluster(self).set_connection(cluster_id=cluster, props=props) + + def migrate_cluster_to_gpu(self, orig_cluster): + """ + given a cluster, convert it to run NVIDIA Gpu based on mapping instance types + :param orig_cluster: the original cluster to migrate from + :return: a new object cluster that supports GPU. 
+ """ + gpu_cluster_ob = DataprocGkeCluster(self) + gpu_cluster_ob.migrate_from_cluster(orig_cluster) + return gpu_cluster_ob + + def create_saving_estimator(self, + source_cluster: ClusterGetAccessor, + reshaped_cluster: ClusterGetAccessor, + target_cost: float = None, + source_cost: float = None): + raw_pricing_config = self.configs.get_value_silent('pricing') + if raw_pricing_config: + pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config, + file_load=False) + else: + pricing_config: JSONPropertiesContainer = None + pricing_provider = DataprocGkePriceProvider(region=self.cli.get_region(), + pricing_configs={'gcloud': pricing_config}) + saving_estimator = DataprocGkeSavingsEstimator(price_provider=pricing_provider, + reshaped_cluster=reshaped_cluster, + source_cluster=source_cluster, + target_cost=target_cost, + source_cost=source_cost) + return saving_estimator + + def create_local_submission_job(self, job_prop, ctxt) -> Any: + return DataprocGkeLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt) + + +@dataclass +class DataprocGkeCMDDriver(DataprocCMDDriver): + """Represents the command interface that will be used by DataprocGke""" + + def pull_node_pool_props_by_args(self, args: dict) -> str: + node_pool_name = args.get('node_pool_name') + gke_cluster_name = args.get('gke_cluster_name') + if 'region' in args: + region_name = args.get('region') + else: + region_name = self.get_region() + describe_cluster_cmd = ['gcloud', + 'container', + 'node-pools', + 'describe', + node_pool_name, + '--cluster', + gke_cluster_name, + '--region', + region_name] + return self.run_sys_cmd(describe_cluster_cmd) + + +@dataclass +class GkeNodePool: + """ + Holds information about node pools + """ + name: str + roles: list + gke_cluster_name: str + spark_node_type: SparkNodeType = field(default=None, init=False) # map the role to Spark type. + + def __post_init__(self): + # only the first role is considered + if len(self.roles) > 0: + self.spark_node_type = DataprocGkePlatform.get_spark_node_type_fromstring(self.roles[0]) + + +@dataclass +class DataprocGkeCluster(DataprocCluster): + """ + Represents an instance of running cluster on DataprocGke. 
+ """ + node_pools: list[GkeNodePool] = field(default=None, init=False) + + @staticmethod + def __extract_info_from_value(conf_val: str): + if '/' in conf_val: + # this is a valid url-path + return FSUtil.get_resource_name(conf_val) + # this is a value + return conf_val + + def _init_node_pools(self): + gke_cluster_config = self.props.get_value('virtualClusterConfig', 'kubernetesClusterConfig', 'gkeClusterConfig') + gke_cluster_name = self.__extract_info_from_value(gke_cluster_config['gkeClusterTarget']) + raw_node_pools = gke_cluster_config['nodePoolTarget'] + + def create_node_pool_elem(node_pool: dict) -> GkeNodePool: + pool_name = self.__extract_info_from_value(node_pool['nodePool']) + return GkeNodePool(name=pool_name, roles=node_pool['roles'], gke_cluster_name=gke_cluster_name) + + self.node_pools = [create_node_pool_elem(node_pool) for node_pool in raw_node_pools] + + def _init_nodes(self): + self._init_node_pools() + + def create_cluster_node(node_pool): + if node_pool.spark_node_type is None: + return None + else: + args = {'node_pool_name': node_pool.name, 'gke_cluster_name': node_pool.gke_cluster_name} + raw_node_props = self.cli.pull_node_pool_props_by_args(args) + node_props = JSONPropertiesContainer(prop_arg=raw_node_props, file_load=False) + node = DataprocNode.create_node(node_pool.spark_node_type).set_fields_from_dict({ + 'name': node_props.get_value('name'), + 'props': JSONPropertiesContainer(prop_arg=node_props.get_value('config'), file_load=False), + 'zone': self.zone + }) + node.fetch_and_set_hw_info(self.cli) + return node + + executor_nodes = [] + driver_nodes = [] + for gke_node_pool in self.node_pools: + c_node = create_cluster_node(gke_node_pool) + if gke_node_pool.spark_node_type == SparkNodeType.WORKER: + executor_nodes.append(c_node) + elif gke_node_pool.spark_node_type == SparkNodeType.MASTER: + driver_nodes.append(c_node) + self.nodes = { + SparkNodeType.WORKER: executor_nodes, + SparkNodeType.MASTER: driver_nodes[0] + } + + +@dataclass +class DataprocGkeSavingsEstimator(DataprocSavingsEstimator): + """ + A class that calculates the savings based on DataprocGke price provider + """ + + def _get_cost_per_cluster(self, cluster: ClusterGetAccessor): + master_cost = self._calculate_group_cost(cluster, SparkNodeType.MASTER) + workers_cost = self._calculate_group_cost(cluster, SparkNodeType.WORKER) + dataproc_gke_cost = self.price_provider.get_container_cost() + return master_cost + workers_cost + dataproc_gke_cost diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke_job.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke_job.py new file mode 100644 index 000000000..01fd3d729 --- /dev/null +++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke_job.py @@ -0,0 +1,27 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Implementation of Job submissions on GCloud Dataproc GKE""" + +from dataclasses import dataclass + +from spark_rapids_pytools.rapids.rapids_job import RapidsLocalJob + + +@dataclass +class DataprocGkeLocalRapidsJob(RapidsLocalJob): + """ + Implementation of a RAPIDS job that runs on a local machine. + """ + job_label = 'dataprocLocal' diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py index 57979e2d5..52522ce3f 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py @@ -28,7 +28,6 @@ from spark_rapids_pytools.common.sys_storage import StorageDriver, FSUtil from spark_rapids_pytools.common.utilities import ToolLogging, SysCmd, Utils, TemplateGenerator - class DeployMode(EnumeratedType): """List of tools deployment methods""" # The rapids job is running on local node @@ -514,6 +513,9 @@ def _build_platform_list_cluster(self, query_args: dict = None) -> list: raise NotImplementedError + def pull_node_pool_props_by_args(self, args: dict) -> str: + raise NotImplementedError + def exec_platform_list_cluster_instances(self, cluster, query_args: dict = None) -> str: @@ -1178,6 +1180,7 @@ def get_platform(platform_id: Enum) -> Type[PlatformBase]: CspEnv.DATABRICKS_AWS: ('spark_rapids_pytools.cloud_api.databricks_aws', 'DBAWSPlatform'), CspEnv.DATABRICKS_AZURE: ('spark_rapids_pytools.cloud_api.databricks_azure', 'DBAzurePlatform'), CspEnv.DATAPROC: ('spark_rapids_pytools.cloud_api.dataproc', 'DataprocPlatform'), + CspEnv.DATAPROC_GKE: ('spark_rapids_pytools.cloud_api.dataproc_gke', 'DataprocGkePlatform'), CspEnv.EMR: ('spark_rapids_pytools.cloud_api.emr', 'EMRPlatform'), CspEnv.ONPREM: ('spark_rapids_pytools.cloud_api.onprem', 'OnPremPlatform'), } diff --git a/user_tools/src/spark_rapids_pytools/pricing/dataproc_gke_pricing.py b/user_tools/src/spark_rapids_pytools/pricing/dataproc_gke_pricing.py new file mode 100644 index 000000000..d12c2d1ec --- /dev/null +++ b/user_tools/src/spark_rapids_pytools/pricing/dataproc_gke_pricing.py @@ -0,0 +1,103 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""providing absolute costs of resources in GCloud DataprocGke""" + +from dataclasses import dataclass + + +from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer +from spark_rapids_pytools.common.sys_storage import FSUtil +from spark_rapids_pytools.pricing.price_provider import PriceProvider + + +@dataclass +class DataprocGkeCatalogContainer(JSONPropertiesContainer): + def _init_fields(self) -> None: + # the prices of the products are defined under 'gcp_price_list' + self.props = self.props['gcp_price_list'] + + +@dataclass +class DataprocGkePriceProvider(PriceProvider): + """ + Provide costs of DataprocGke instances + """ + name = 'DataprocGke' + + def _process_resource_configs(self): + online_entries = self.pricing_configs['gcloud'].get_value('catalog', 'onlineResources') + for online_entry in online_entries: + if online_entry.get('resourceKey') == 'gcloud-catalog': + file_name = online_entry.get('localFile') + self.cache_files = {'gcloud': FSUtil.build_path(self.cache_directory, file_name)} + self.resource_urls = {'gcloud': online_entry.get('onlineURL')} + break + + def _create_catalogs(self): + self.catalogs = {'gcloud': DataprocGkeCatalogContainer(prop_arg=self.cache_files['gcloud'])} + + def get_ssd_price(self, machine_type: str) -> float: + lookup_key = 'CP-COMPUTEENGINE-LOCAL-SSD' + ssd_unit_size_factor = float(self.pricing_configs['gcloud'].get_value('catalog', 'ssd', 'unitSizeFactor')) + return self.catalogs['gcloud'].get_value(lookup_key, self.region) * ssd_unit_size_factor + + def get_ram_price(self, machine_type: str) -> float: + lookup_key = self._key_for_cpe_machine_ram(machine_type) + return self.catalogs['gcloud'].get_value(lookup_key, self.region) + + def get_gpu_price(self, gpu_device: str) -> float: + lookup_key = self._key_for_gpu_device(gpu_device) + return self.catalogs['gcloud'].get_value(lookup_key, self.region) + + def get_cpu_price(self, machine_type: str) -> float: + lookup_key = self._key_for_cpe_machine_cores(machine_type) + return self.catalogs['gcloud'].get_value(lookup_key, self.region) + + def get_container_cost(self) -> float: + return self.__get_dataproc_cluster_price() + + def __get_dataproc_cluster_price(self) -> float: + lookup_key = 'CP-DATAPROC' + return self.catalogs['gcloud'].get_value(lookup_key, 'us') + + def get_cores_count_for_vm(self, machine_type: str) -> str: + lookup_key = self._key_for_cpe_vm(machine_type) + cores = self.catalogs['gcloud'].get_value_silent(lookup_key, 'cores') + return cores + + def get_ram_size_for_vm(self, machine_type: str) -> str: + lookup_key = self._key_for_cpe_vm(machine_type) + memory = self.catalogs['gcloud'].get_value_silent(lookup_key, 'memory') + return memory + + @classmethod + def _key_for_cpe_machine_cores(cls, machine_type: str) -> str: + return f'CP-COMPUTEENGINE-{cls._get_machine_prefix(machine_type).upper()}-PREDEFINED-VM-CORE' + + @classmethod + def _key_for_cpe_machine_ram(cls, machine_type: str) -> str: + return f'CP-COMPUTEENGINE-{cls._get_machine_prefix(machine_type).upper()}-PREDEFINED-VM-RAM' + + @classmethod + def _key_for_gpu_device(cls, gpu_device: str) -> str: + return f'GPU_NVIDIA_TESLA_{gpu_device.upper()}' + + @classmethod + def _get_machine_prefix(cls, machine_type: str) -> str: + return machine_type.split('-')[0] + + @classmethod + def _key_for_cpe_vm(cls, machine_type: str): + return f'CP-COMPUTEENGINE-VMIMAGE-{machine_type.upper()}' diff --git a/user_tools/src/spark_rapids_pytools/wrapper.py b/user_tools/src/spark_rapids_pytools/wrapper.py index 
ae13119ce..1d6b1c879 100644 --- a/user_tools/src/spark_rapids_pytools/wrapper.py +++ b/user_tools/src/spark_rapids_pytools/wrapper.py @@ -19,6 +19,7 @@ from spark_rapids_pytools.wrappers.databricks_aws_wrapper import DBAWSWrapper from spark_rapids_pytools.wrappers.databricks_azure_wrapper import DBAzureWrapper from spark_rapids_pytools.wrappers.dataproc_wrapper import DataprocWrapper +from spark_rapids_pytools.wrappers.dataproc_gke_wrapper import DataprocGKEWrapper from spark_rapids_pytools.wrappers.emr_wrapper import EMRWrapper from spark_rapids_pytools.wrappers.onprem_wrapper import OnPremWrapper @@ -27,6 +28,7 @@ def main(): fire.Fire({ 'emr': EMRWrapper, 'dataproc': DataprocWrapper, + 'dataproc-gke': DataprocGKEWrapper, 'databricks-aws': DBAWSWrapper, 'databricks-azure': DBAzureWrapper, 'onprem': OnPremWrapper diff --git a/user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py b/user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py new file mode 100644 index 000000000..48b954abe --- /dev/null +++ b/user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py @@ -0,0 +1,141 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Wrapper class to run tools associated with RAPIDS Accelerator for Apache Spark plugin on Dataproc.""" + +from spark_rapids_tools import CspEnv +from spark_rapids_pytools.cloud_api.sp_types import DeployMode +from spark_rapids_pytools.common.utilities import ToolLogging +from spark_rapids_pytools.rapids.qualification import QualFilterApp, QualificationAsLocal, QualGpuClusterReshapeType + + +class CliDataprocGKELocalMode: # pylint: disable=too-few-public-methods + """ + A wrapper that runs the RAPIDS Accelerator tools locally on the dev machine for Dataproc GKE. + """ + + @staticmethod + def qualification(cpu_cluster: str = None, + eventlogs: str = None, + local_folder: str = None, + remote_folder: str = None, + gpu_cluster: str = None, + tools_jar: str = None, + credentials_file: str = None, + filter_apps: str = QualFilterApp.tostring(QualFilterApp.SAVINGS), + gpu_cluster_recommendation: str = QualGpuClusterReshapeType.tostring( + QualGpuClusterReshapeType.get_default()), + jvm_heap_size: int = 24, + verbose: bool = False, + cpu_discount: int = None, + gpu_discount: int = None, + global_discount: int = None, + **rapids_options) -> None: + """ + The Qualification tool analyzes Spark events generated from CPU based Spark applications to + help quantify the expected acceleration and costs savings of migrating a Spark application + or query to GPU. The wrapper downloads dependencies and executes the analysis on the local + dev machine + :param cpu_cluster: The Dataproc-cluster on which the Spark applications were executed. The argument + can be a Dataproc-cluster or a valid path to the cluster's properties file (json format) + generated by the gcloud-CLI. + :param eventlogs: Event log filenames or gcs storage directories + containing event logs (comma separated). 
If missing, the wrapper Reads the Spark's + property `spark.eventLog.dir` defined in `cpu_cluster`. This property should be included + in the output of `gcloud dataproc clusters describe` + Note that the wrapper will raise an exception if the property is not set + :param local_folder: Local work-directory path to store the output and to be used as root + directory for temporary folders/files. The final output will go into a subdirectory called + ${local_folder}/qual-${EXEC_ID} where exec_id is an auto-generated unique identifier of the + execution. If the argument is NONE, the default value is the env variable + RAPIDS_USER_TOOLS_OUTPUT_DIRECTORY if any; or the current working directory + :param remote_folder: A gcs folder where the output is uploaded at the end of execution. + If no value is provided, the output will be only available on local disk + :param gpu_cluster: The Dataproc-cluster on which the Spark applications is planned to be migrated. + The argument can be a Dataproc-cluster or a valid path to the cluster's properties file + (json format) generated by the gcloud-CLI. If missing, the wrapper maps the dataproc machine + instances of the original cluster into dataproc instances that support GPU acceleration + :param tools_jar: Path to a bundled jar including Rapids tool. The path is a local filesystem, + or remote gcs url. If missing, the wrapper downloads the latest rapids-4-spark-tools_*.jar + from maven repo + :param credentials_file: The local path of JSON file that contains the application credentials. + If missing, the wrapper looks for "GOOGLE_APPLICATION_CREDENTIALS" environment variable + to provide the location of a credential JSON file. The default credentials file exists as + "$HOME/.config/gcloud/application_default_credentials.json" + :param filter_apps: filtering criteria of the applications listed in the final STDOUT table + is one of the following (ALL, SPEEDUPS, savings). + Note that this filter does not affect the CSV report. + "ALL" means no filter applied. "SPEEDUPS" lists all the apps that are either + 'Recommended', or 'Strongly Recommended' based on speedups. "SAVINGS" + lists all the apps that have positive estimated GPU savings except for the apps that + are "Not Applicable" + :param gpu_cluster_recommendation: The type of GPU cluster recommendation to generate. + It accepts one of the following ("CLUSTER", "JOB" and the default value "MATCH"). + "MATCH": keep GPU cluster same number of nodes as CPU cluster; + "CLUSTER": recommend optimal GPU cluster by cost for entire cluster; + "JOB": recommend optimal GPU cluster by cost per job + :param jvm_heap_size: The maximum heap size of the JVM in gigabytes + :param verbose: True or False to enable verbosity to the wrapper script + :param cpu_discount: A percent discount for the cpu cluster cost in the form of an integer value + (e.g. 30 for 30% discount). + :param gpu_discount: A percent discount for the gpu cluster cost in the form of an integer value + (e.g. 30 for 30% discount). + :param global_discount: A percent discount for both the cpu and gpu cluster costs in the form of an + integer value (e.g. 30 for 30% discount). + :param rapids_options: A list of valid Qualification tool options. + Note that the wrapper ignores ["output-directory", "platform"] flags, and it does not support + multiple "spark-property" arguments. 
+ For more details on Qualification tool options, please visit + https://nvidia.github.io/spark-rapids/docs/spark-qualification-tool.html#qualification-tool-options + """ + if verbose: + # when debug is set to true set it in the environment. + ToolLogging.enable_debug_mode() + wrapper_qual_options = { + 'platformOpts': { + 'credentialFile': credentials_file, + 'deployMode': DeployMode.LOCAL, + }, + 'migrationClustersProps': { + 'cpuCluster': cpu_cluster, + 'gpuCluster': gpu_cluster + }, + 'jobSubmissionProps': { + 'remoteFolder': remote_folder, + 'platformArgs': { + 'jvmMaxHeapSize': jvm_heap_size + } + }, + 'eventlogs': eventlogs, + 'filterApps': filter_apps, + 'toolsJar': tools_jar, + 'gpuClusterRecommendation': gpu_cluster_recommendation, + 'cpuDiscount': cpu_discount, + 'gpuDiscount': gpu_discount, + 'globalDiscount': global_discount + } + + tool_obj = QualificationAsLocal(platform_type=CspEnv.DATAPROC_GKE, + output_folder=local_folder, + wrapper_options=wrapper_qual_options, + rapids_options=rapids_options) + tool_obj.launch() + + +class DataprocGKEWrapper: # pylint: disable=too-few-public-methods + """ + A wrapper script to run RAPIDS Accelerator tools (Qualification, Profiling, and Bootstrap) on Gcloud Dataproc GKE. + """ + def __init__(self): + self.qualification = CliDataprocGKELocalMode.qualification \ No newline at end of file diff --git a/user_tools/src/spark_rapids_tools/cloud/__init__.py b/user_tools/src/spark_rapids_tools/cloud/__init__.py index cd87d6950..7bdc56fec 100644 --- a/user_tools/src/spark_rapids_tools/cloud/__init__.py +++ b/user_tools/src/spark_rapids_tools/cloud/__init__.py @@ -17,7 +17,7 @@ from .cluster import ClientCluster from .onprem.onpremcluster import OnPremClientCluster from .emr.emrcluster import EmrClientCluster -from .dataproc.dataproccluster import DataprocClientCluster +from .dataproc.dataproccluster import DataprocClientCluster, DataprocGkeClientCluster from .databricks.dbcluster import DBAwsClientCluster, DBAzureClientCluster __all__ = [ @@ -25,6 +25,7 @@ 'DBAwsClientCluster', 'DBAzureClientCluster', 'DataprocClientCluster', + 'DataprocGkeClientCluster', 'EmrClientCluster', 'OnPremClientCluster' ] diff --git a/user_tools/src/spark_rapids_tools/cloud/dataproc/dataproccluster.py b/user_tools/src/spark_rapids_tools/cloud/dataproc/dataproccluster.py index 20ebf526a..8e9c3858f 100644 --- a/user_tools/src/spark_rapids_tools/cloud/dataproc/dataproccluster.py +++ b/user_tools/src/spark_rapids_tools/cloud/dataproc/dataproccluster.py @@ -29,6 +29,13 @@ class DataprocClusterSchema(PropValidatorSchemaCamel): config: dict +class DataprocGkeClusterSchema(PropValidatorSchemaCamel): + cluster_name: str + cluster_uuid: str + project_id: str + config: dict + + @register_cluster_prop_mgr('dataproc') class DataprocClusterPropMgr(ClusterPropMgr): schema_clzz: ClassVar[Type[PropValidatorSchema]] = DataprocClusterSchema @@ -37,3 +44,14 @@ class DataprocClusterPropMgr(ClusterPropMgr): @register_client_cluster('dataproc') class DataprocClientCluster(ClientCluster): # pylint: disable=too-few-public-methods pass + + +@register_cluster_prop_mgr('dataproc_gke') +class DataprocGkeClusterPropMgr(ClusterPropMgr): + schema_clzz: ClassVar[Type[PropValidatorSchema]] = DataprocGkeClusterSchema + + +@register_client_cluster('dataproc_gke') +class DataprocGkeClientCluster(ClientCluster): # pylint: disable=too-few-public-methods + pass + diff --git a/user_tools/src/spark_rapids_tools/enums.py b/user_tools/src/spark_rapids_tools/enums.py index 944ceef91..58fa8cd0b 100644 --- 
a/user_tools/src/spark_rapids_tools/enums.py +++ b/user_tools/src/spark_rapids_tools/enums.py @@ -70,6 +70,7 @@ class CspEnv(EnumeratedType): DATABRICKS_AWS = 'databricks_aws' DATABRICKS_AZURE = 'databricks_azure' DATAPROC = 'dataproc' + DATAPROC_GKE = 'dataproc_gke' EMR = 'emr' ONPREM = 'onprem' NONE = 'NONE' From 557f2c792b49b16cf17a4069d5a26652dc78a1dc Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Mon, 9 Oct 2023 16:45:26 -0700 Subject: [PATCH 2/6] Fix pylint Signed-off-by: Partho Sarthi --- .../cloud_api/dataproc.py | 2 +- .../cloud_api/dataproc_gke.py | 48 +++++-------------- .../wrappers/dataproc_gke_wrapper.py | 2 +- .../cloud/dataproc/dataproccluster.py | 1 - 4 files changed, 13 insertions(+), 40 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py index a68a060c9..bd17016bd 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py @@ -155,7 +155,7 @@ def calc_num_gpus(gpus_criteria_conf: List[dict], num_cores: int) -> int: @dataclass -class DataprocCMDDriver(CMDDriverBase): +class DataprocCMDDriver(CMDDriverBase): # pylint: disable=abstract-method """Represents the command interface that will be used by Dataproc""" def _list_inconsistent_configurations(self) -> list: diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py index 82ae4085f..a84ec7d29 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py @@ -22,38 +22,13 @@ DataprocPlatform, DataprocSavingsEstimator from spark_rapids_pytools.cloud_api.dataproc_gke_job import DataprocGkeLocalRapidsJob from spark_rapids_pytools.cloud_api.sp_types import CMDDriverBase, \ - ClusterNode, SparkNodeType, ClusterState, ClusterGetAccessor + SparkNodeType, ClusterGetAccessor from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer from spark_rapids_pytools.common.sys_storage import FSUtil from spark_rapids_pytools.pricing.dataproc_gke_pricing import DataprocGkePriceProvider from spark_rapids_tools import CspEnv -@dataclass -class GkeNodePool: - role: str - pool_name: str - - @staticmethod - def __extract_info_from_value(conf_val: str): - if '/' in conf_val: - # This is a valid url-path - return FSUtil.get_resource_name(conf_val) - # This is a value - return conf_val - - @classmethod - def from_dict(cls, node_pool_info: dict): - pool_name = cls.__extract_info_from_value(node_pool_info['nodePool']) - return cls(role=node_pool_info['roles'], pool_name=pool_name) - - -@dataclass -class GkeCluster: - cluster_name: str - node_pools: list[GkeNodePool] - - @dataclass class DataprocGkePlatform(DataprocPlatform): """ @@ -187,17 +162,16 @@ def _init_nodes(self): def create_cluster_node(node_pool): if node_pool.spark_node_type is None: return None - else: - args = {'node_pool_name': node_pool.name, 'gke_cluster_name': node_pool.gke_cluster_name} - raw_node_props = self.cli.pull_node_pool_props_by_args(args) - node_props = JSONPropertiesContainer(prop_arg=raw_node_props, file_load=False) - node = DataprocNode.create_node(node_pool.spark_node_type).set_fields_from_dict({ - 'name': node_props.get_value('name'), - 'props': JSONPropertiesContainer(prop_arg=node_props.get_value('config'), file_load=False), - 'zone': self.zone - }) - node.fetch_and_set_hw_info(self.cli) - return node + args = 
{'node_pool_name': node_pool.name, 'gke_cluster_name': node_pool.gke_cluster_name} + raw_node_props = self.cli.pull_node_pool_props_by_args(args) + node_props = JSONPropertiesContainer(prop_arg=raw_node_props, file_load=False) + node = DataprocNode.create_node(node_pool.spark_node_type).set_fields_from_dict({ + 'name': node_props.get_value('name'), + 'props': JSONPropertiesContainer(prop_arg=node_props.get_value('config'), file_load=False), + 'zone': self.zone + }) + node.fetch_and_set_hw_info(self.cli) + return node executor_nodes = [] driver_nodes = [] diff --git a/user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py b/user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py index 48b954abe..29a4ff1b2 100644 --- a/user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py +++ b/user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py @@ -138,4 +138,4 @@ class DataprocGKEWrapper: # pylint: disable=too-few-public-methods A wrapper script to run RAPIDS Accelerator tools (Qualification, Profiling, and Bootstrap) on Gcloud Dataproc GKE. """ def __init__(self): - self.qualification = CliDataprocGKELocalMode.qualification \ No newline at end of file + self.qualification = CliDataprocGKELocalMode.qualification diff --git a/user_tools/src/spark_rapids_tools/cloud/dataproc/dataproccluster.py b/user_tools/src/spark_rapids_tools/cloud/dataproc/dataproccluster.py index 8e9c3858f..6018a1e79 100644 --- a/user_tools/src/spark_rapids_tools/cloud/dataproc/dataproccluster.py +++ b/user_tools/src/spark_rapids_tools/cloud/dataproc/dataproccluster.py @@ -54,4 +54,3 @@ class DataprocGkeClusterPropMgr(ClusterPropMgr): @register_client_cluster('dataproc_gke') class DataprocGkeClientCluster(ClientCluster): # pylint: disable=too-few-public-methods pass - From 3d1dd549321e178d2bb3752fb2c0e7ca5ce80d2f Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Tue, 10 Oct 2023 09:32:39 -0700 Subject: [PATCH 3/6] Update Dataproc GKE price provider Signed-off-by: Partho Sarthi --- .../pricing/dataproc_gke_pricing.py | 81 ++----------------- 1 file changed, 7 insertions(+), 74 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/pricing/dataproc_gke_pricing.py b/user_tools/src/spark_rapids_pytools/pricing/dataproc_gke_pricing.py index d12c2d1ec..5c01d3296 100644 --- a/user_tools/src/spark_rapids_pytools/pricing/dataproc_gke_pricing.py +++ b/user_tools/src/spark_rapids_pytools/pricing/dataproc_gke_pricing.py @@ -16,88 +16,21 @@ from dataclasses import dataclass - -from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer -from spark_rapids_pytools.common.sys_storage import FSUtil -from spark_rapids_pytools.pricing.price_provider import PriceProvider +from spark_rapids_pytools.pricing.dataproc_pricing import DataprocPriceProvider @dataclass -class DataprocGkeCatalogContainer(JSONPropertiesContainer): - def _init_fields(self) -> None: - # the prices of the products are defined under 'gcp_price_list' - self.props = self.props['gcp_price_list'] - - -@dataclass -class DataprocGkePriceProvider(PriceProvider): +class DataprocGkePriceProvider(DataprocPriceProvider): """ Provide costs of DataprocGke instances """ name = 'DataprocGke' - def _process_resource_configs(self): - online_entries = self.pricing_configs['gcloud'].get_value('catalog', 'onlineResources') - for online_entry in online_entries: - if online_entry.get('resourceKey') == 'gcloud-catalog': - file_name = online_entry.get('localFile') - self.cache_files = {'gcloud': 
FSUtil.build_path(self.cache_directory, file_name)} - self.resource_urls = {'gcloud': online_entry.get('onlineURL')} - break - - def _create_catalogs(self): - self.catalogs = {'gcloud': DataprocGkeCatalogContainer(prop_arg=self.cache_files['gcloud'])} - - def get_ssd_price(self, machine_type: str) -> float: - lookup_key = 'CP-COMPUTEENGINE-LOCAL-SSD' - ssd_unit_size_factor = float(self.pricing_configs['gcloud'].get_value('catalog', 'ssd', 'unitSizeFactor')) - return self.catalogs['gcloud'].get_value(lookup_key, self.region) * ssd_unit_size_factor - - def get_ram_price(self, machine_type: str) -> float: - lookup_key = self._key_for_cpe_machine_ram(machine_type) - return self.catalogs['gcloud'].get_value(lookup_key, self.region) - - def get_gpu_price(self, gpu_device: str) -> float: - lookup_key = self._key_for_gpu_device(gpu_device) - return self.catalogs['gcloud'].get_value(lookup_key, self.region) - - def get_cpu_price(self, machine_type: str) -> float: - lookup_key = self._key_for_cpe_machine_cores(machine_type) - return self.catalogs['gcloud'].get_value(lookup_key, self.region) - def get_container_cost(self) -> float: - return self.__get_dataproc_cluster_price() + dataproc_cost = super().get_container_cost() + gke_container_cost = self.__get_gke_container_cost() + return dataproc_cost + gke_container_cost - def __get_dataproc_cluster_price(self) -> float: - lookup_key = 'CP-DATAPROC' + def __get_gke_container_cost(self) -> float: + lookup_key = 'CP-GKE-CONTAINER-MANAGMENT-COST' return self.catalogs['gcloud'].get_value(lookup_key, 'us') - - def get_cores_count_for_vm(self, machine_type: str) -> str: - lookup_key = self._key_for_cpe_vm(machine_type) - cores = self.catalogs['gcloud'].get_value_silent(lookup_key, 'cores') - return cores - - def get_ram_size_for_vm(self, machine_type: str) -> str: - lookup_key = self._key_for_cpe_vm(machine_type) - memory = self.catalogs['gcloud'].get_value_silent(lookup_key, 'memory') - return memory - - @classmethod - def _key_for_cpe_machine_cores(cls, machine_type: str) -> str: - return f'CP-COMPUTEENGINE-{cls._get_machine_prefix(machine_type).upper()}-PREDEFINED-VM-CORE' - - @classmethod - def _key_for_cpe_machine_ram(cls, machine_type: str) -> str: - return f'CP-COMPUTEENGINE-{cls._get_machine_prefix(machine_type).upper()}-PREDEFINED-VM-RAM' - - @classmethod - def _key_for_gpu_device(cls, gpu_device: str) -> str: - return f'GPU_NVIDIA_TESLA_{gpu_device.upper()}' - - @classmethod - def _get_machine_prefix(cls, machine_type: str) -> str: - return machine_type.split('-')[0] - - @classmethod - def _key_for_cpe_vm(cls, machine_type: str): - return f'CP-COMPUTEENGINE-VMIMAGE-{machine_type.upper()}' From 565572b61c8c05538c237f7b59268a40e01939b1 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Tue, 10 Oct 2023 19:07:34 -0700 Subject: [PATCH 4/6] Update docs and pylints Signed-off-by: Partho Sarthi --- user_tools/docs/index.md | 4 + user_tools/docs/user-tools-dataproc-gke.md | 180 ++++++++++++++++++ .../cloud_api/dataproc_gke.py | 2 +- .../cloud_api/sp_types.py | 1 + .../src/spark_rapids_tools/cmdli/tools_cli.py | 4 +- user_tools/tests/mock_cluster.py | 3 +- 6 files changed, 190 insertions(+), 4 deletions(-) create mode 100644 user_tools/docs/user-tools-dataproc-gke.md diff --git a/user_tools/docs/index.md b/user_tools/docs/index.md index 48481b004..e47e83078 100644 --- a/user_tools/docs/index.md +++ b/user_tools/docs/index.md @@ -95,6 +95,9 @@ The following table summarizes the commands supported for each cloud platform: | | diagnostic | 
spark_rapids_user_tools \ | 23.06+ | | | | dataproc diagnostic [ARGS] | | +------------------+---------------+-----------------------------------------+----------+ +| Dataproc_GKE | qualification | spark_rapids_user_tools \ | 23.08.2+ | +| | | dataproc-gke qualification [ARGS] | | ++------------------+---------------+-----------------------------------------+----------+ | Databricks_AWS | qualification | spark_rapids_user_tools \ | 23.04+ | | | | databricks-aws qualification [ARGS] | | | +---------------+-----------------------------------------+----------+ @@ -131,6 +134,7 @@ platform: - [AWS EMR](user-tools-aws-emr.md) - [Google Cloud Dataproc](user-tools-dataproc.md) +- [Google Cloud Dataproc GKE](user-tools-dataproc-gke.md) - [Databricks_AWS](user-tools-databricks-aws.md) - [Databricks_Azure](user-tools-databricks-azure.md) - [OnPrem](user-tools-onprem.md) diff --git a/user_tools/docs/user-tools-dataproc-gke.md b/user_tools/docs/user-tools-dataproc-gke.md new file mode 100644 index 000000000..6fe2c5955 --- /dev/null +++ b/user_tools/docs/user-tools-dataproc-gke.md @@ -0,0 +1,180 @@ +# RAPIDS User Tools on Dataproc GKE + +This is a guide for the RAPIDS tools for Apache Spark on [Google Cloud Dataproc GKE](https://cloud.google.com/dataproc/docs/guides/dpgke/dataproc-gke-overview). +At the end of this guide, the user will be able to run the RAPIDS tools to analyze the clusters and +the applications running on _Google Cloud Dataproc GKE_. + + +## Prerequisites + +### 1.gcloud CLI + +- Install the gcloud CLI. Follow the instructions on [gcloud-sdk-install](https://cloud.google.com/sdk/docs/install) +- Set the configuration settings and credentials of the gcloud CLI: + - Initialize the gcloud CLI by following [these instructions](https://cloud.google.com/sdk/docs/initializing#initialize_the) + - Grant authorization to the gcloud CLI [with a user account](https://cloud.google.com/sdk/docs/authorizing#authorize_with_a_user_account) + - Set up application default credentials to the gcloud CLI [by logging in](https://cloud.google.com/sdk/docs/authorizing#set_up_application_default_credentials) + - Manage gcloud CLI configurations. For more details, visit [gcloud-sdk-configurations](https://cloud.google.com/sdk/docs/configurations) + - Verify that the following [gcloud CLI properties](https://cloud.google.com/sdk/docs/properties) are properly defined: + - `dataproc/region`, + - `compute/zone`, + - `compute/region` + - `core/project` + +### 2.RAPIDS tools + +- Spark event logs: + - The RAPIDS tools can process Apache Spark CPU event logs from Spark 2.0 or higher (raw, .lz4, .lzf, .snappy, .zstd) + - For `qualification` commands, the event logs need to be archived to an accessible gs folder. + +### 3.Install the package + +- Install `spark-rapids-user-tools` with python [3.8, 3.10] using: + - pip: `pip install spark-rapids-user-tools` + - wheel-file: `pip install ` + - from source: `pip install -e .` +- verify the command is installed correctly by running + ```bash + spark_rapids_user_tools dataproc-gke -- --help + ``` + +### 4.Environment variables + +Before running any command, you can set environment variables to specify configurations. +RAPIDS variables have a naming pattern `RAPIDS_USER_TOOLS_*`: + - `RAPIDS_USER_TOOLS_CACHE_FOLDER`: specifies the location of a local directory that the RAPIDS-cli uses to + store and cache the downloaded resources. The default is `/var/tmp/spark_rapids_user_tools_cache`. 
+ Note that caching the resources locally has an impact on the total execution time of the command. + - `RAPIDS_USER_TOOLS_OUTPUT_DIRECTORY`: specifies the location of a local directory that the RAPIDS-cli uses to + generate the output. The wrapper CLI arguments override that environment variable + (`--output_folder` and `local_folder` for Bootstrap and Qualification respectively). + +## Qualification command + +### Local deployment + +``` +spark_rapids_user_tools dataproc-gke qualification [options] +spark_rapids_user_tools dataproc-gke qualification -- --help +``` + +The local deployment runs on the local development machine. It requires: +1. Installing and configuring the gcloud CLI (`gsutil` and `gcloud` commands) +2. Java 1.8+ development environment +3. Internet access to download JAR dependencies from mvn: `spark-*.jar`, and `gcs-connector-hadoop-*.jar` +4. Dependencies are cached on the local disk to reduce the overhead of the download. + + +#### Command options + +| Option | Description | Default | Required | +|--------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:--------:| +| **cpu_cluster** | The virtual Dataproc-cluster on which the Apache Spark applications were executed. Accepted values are an virtual Dataproc-cluster name, or a valid path to the cluster properties file (json format) generated by gcloud CLI command `gcloud dataproc clusters describe`. This should not be confused with the GKE cluster name. | N/A | N | +| **eventlogs** | A comma seperated list of gs urls pointing to event logs or gs directory | Reads the Spark's property `spark.eventLog.dir` defined in `cpu_cluster`. This property should be included in the output of `dataproc clusters describe`. Note that the wrapper will raise an exception if the property is not set. | N | +| **remote_folder** | The gs folder where the output of the wrapper's output is copied. If missing, the output will be available only on local disk | N/A | N | +| **gpu_cluster** | The virtual Dataproc-cluster on which the Spark applications is planned to be migrated. The argument can be an virtual Dataproc-cluster or a valid path to the cluster's properties file (json format) generated by the gcloud CLI command `gcloud dataproc clusters describe` | The wrapper maps the machine instances of the original cluster into GPU supported instances | N | +| **local_folder** | Local work-directory path to store the output and to be used as root directory for temporary folders/files. The final output will go into a subdirectory named `qual-${EXEC_ID}` where `exec_id` is an auto-generated unique identifier of the execution. | If the argument is NONE, the default value is the env variable `RAPIDS_USER_TOOLS_OUTPUT_DIRECTORY` if any; or the current working directory. | N | +| **jvm_heap_size** | The maximum heap size of the JVM in gigabytes | 24 | N | +| **tools_jar** | Path to a bundled jar including RAPIDS tool. 
The path is a local filesystem, or remote gs url | Downloads the latest `rapids-4-spark-tools_*.jar` from mvn repo | N | +| **credentials_file** | The local path of JSON file that contains the application credentials | If missing, loads the env variable `GOOGLE_APPLICATION_CREDENTIALS` if any. Otherwise, it uses the default path "$HOME/.config/gcloud/application_default_credentials.json" | N | +| **filter_apps** | Filtering criteria of the applications listed in the final STDOUT table is one of the following (`ALL`, `SPEEDUPS`, `SAVINGS`). "`ALL`" means no filter applied. "`SPEEDUPS`" lists all the apps that are either '_Recommended_', or '_Strongly Recommended_' based on speedups. "`SAVINGS`" lists all the apps that have positive estimated GPU savings except for the apps that are '_Not Applicable_'. | `SAVINGS` | N | +| **gpu_cluster_recommendation** | The type of GPU cluster recommendation to generate. It accepts one of the following (`CLUSTER`, `JOB`, `MATCH`). `MATCH`: keep GPU cluster same number of nodes as CPU cluster; `CLUSTER`: recommend optimal GPU cluster by cost for entire cluster. `JOB`: recommend optimal GPU cluster by cost per job | `MATCH` | N | +| **cpu_discount** | A percent discount for the cpu cluster cost in the form of an integer value (e.g. 30 for 30% discount) | N/A | N | +| **gpu_discount** | A percent discount for the gpu cluster cost in the form of an integer value (e.g. 30 for 30% discount) | N/A | N | +| **global_discount** | A percent discount for both the cpu and gpu cluster costs in the form of an integer value (e.g. 30 for 30% discount) | N/A | N | +| **verbose** | True or False to enable verbosity to the wrapper script | False if `RAPIDS_USER_TOOLS_LOG_DEBUG` is not set | N | +| **rapids_options**** | A list of valid [Qualification tool options](../../core/docs/spark-qualification-tool.md#qualification-tool-options). Note that (`output-directory`, `platform`) flags are ignored, and that multiple "spark-property" is not supported. | N/A | N | + +#### Use case scenario + +A typical workflow to successfully run the `qualification` command in local mode is described as follows: + +1. Store the Apache Spark event logs in gs folder. +2. A user sets up his development machine: + 1. configures Java + 2. installs gcloud CLI and configures the profile and the credentials to make sure the gcloud CLI + commands can access the gs resources `LOGS_BUCKET`. + 3. installs `spark_rapids_user_tools` +3. If the results of the wrapper need to be stored on gs, then another gs uri is required `REMOTE_FOLDER=gs://OUT_BUCKET/` +4. User defines the virtual Dataproc-cluster on which the Spark application were running. Note that the cluster needs to be + active; Dataproc on GKE cannot be present in `STOPPED` state. It has to be visible by the gcloud CLI (i.e., can run `gcloud dataproc clusters describe + cluster_name`). +5. 
The following script runs qualification by passing a gs remote directory to store the output: + + ``` + # define the wrapper cache directory if necessary + export RAPIDS_USER_TOOLS_CACHE_FOLDER=my_cache_folder + export EVENTLOGS=gs://LOGS_BUCKET/eventlogs/ + export CLUSTER_NAME=my-virtual-dataproc-cpu-cluster + export REMOTE_FOLDER=gs://OUT_BUCKET/wrapper_output + + spark_rapids_user_tools dataproc-gke qualification \ + --eventlogs $EVENTLOGS \ + --cpu_cluster $CLUSTER_NAME \ + --remote_folder $REMOTE_FOLDER + ``` + The wrapper generates a unique ID for each execution in the format of `qual__<0x%08X>`. + The above command will generate a directory containing `qualification_summary.csv` in addition to + the actual folder of the RAPIDS Qualification tool. The directory will be mirrored to the gs path (`REMOTE_FOLDER`). + + ``` + ./qual__<0x%08X>/qualification_summary.csv + ./qual__<0x%08X>/rapids_4_spark_qualification_output/ + ``` + +### Qualification output + +For each app, the command output lists the following fields: + +- `App ID`: An application is referenced by its application ID, '_app-id_'. When running on YARN, + each application may have multiple attempts, but there are attempt IDs only for applications + in cluster mode, not applications in client mode. Applications in YARN cluster mode can be + identified by their attempt-id. +- `App Name`: Name of the application +- `Speedup Based Recommendation`: Recommendation based on '_Estimated Speed-up Factor_'. Note that an + application that has job or stage failures will be labeled '_Not Applicable_' +- `Savings Based Recommendation`: Recommendation based on '_Estimated GPU Savings_'. + - '_Strongly Recommended_': An app with savings GEQ 40% + - '_Recommended_': An app with savings between (1, 40) % + - '_Not Recommended_': An app with no savings + - '_Not Applicable_': An app that has job or stage failures. +- `Estimated GPU Speedup`: Speed-up factor estimated for the app. Calculated as the ratio + between '_App Duration_' and '_Estimated GPU Duration_'. +- `Estimated GPU Duration`: Predicted runtime of the app if it was run on GPU +- `App Duration`: Wall-Clock time measured since the application starts till it is completed. + If an app is not completed, an estimated completion time is computed. +- `Estimated GPU Savings(%)`: Percentage of cost savings of the app if it migrates to an + accelerated cluster. It is calculated as: + ``` + estimated_saving = 100 - ((100 * gpu_cost) / cpu_cost) + ``` + +The command creates a directory with a UUID that contains the following: +- Directory generated by the RAPIDS qualification tool `rapids_4_spark_qualification_output`; +- A CSV file that contains the summary of all the applications along with estimated absolute costs +- Sample directory structure: + ``` + qual_20230314145334_d2CaFA34 + ├── qualification_summary.csv + └── rapids_4_spark_qualification_output + ├── ui + │ └── html + │ ├── sql-recommendation.html + │ ├── index.html + │ ├── application.html + │ └── raw.html + ├── rapids_4_spark_qualification_output_stages.csv + ├── rapids_4_spark_qualification_output.csv + ├── rapids_4_spark_qualification_output_execs.csv + └── rapids_4_spark_qualification_output.log + 3 directories, 9 files + + ``` + +#### TCO calculator + +In the `qualification_summary.csv` output file, you will see two additional columns appended: +`Estimated Job Frequency (monthly)` and `Annual Cost Savings`. 
+These new columns are to be used as part of a TCO calculator to see the long-term benefit of using +Spark RAPIDS with your applications. +A GSheet template with instructions can be found at here: [link](https://docs.google.com/spreadsheets/d/1CslQHTwxHEDTlAP4lcrOzbSrmucvn8z4iFlJo6EAhxs/edit#gid=1607726286). +Make a copy of the GSheet template and then follow the instructions listed in the `Instructions` tab. diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py index a84ec7d29..06a71d342 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py @@ -135,7 +135,7 @@ class DataprocGkeCluster(DataprocCluster): """ Represents an instance of running cluster on DataprocGke. """ - node_pools: list[GkeNodePool] = field(default=None, init=False) + node_pools: list = field(default=None, init=False) @staticmethod def __extract_info_from_value(conf_val: str): diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py index 52522ce3f..b8f4f6a29 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py @@ -28,6 +28,7 @@ from spark_rapids_pytools.common.sys_storage import StorageDriver, FSUtil from spark_rapids_pytools.common.utilities import ToolLogging, SysCmd, Utils, TemplateGenerator + class DeployMode(EnumeratedType): """List of tools deployment methods""" # The rapids job is running on local node diff --git a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py index 2f971dcbe..50cd28d01 100644 --- a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py +++ b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py @@ -64,8 +64,8 @@ def qualification(self, Skipping this argument requires that the cluster argument points to a valid cluster name on the CSP. :param cluster: Name of cluster or path to cluster-properties. - :param platform: defines one of the following "onprem", "emr", "dataproc", "databricks-aws", - and "databricks-azure". + :param platform: defines one of the following "onprem", "emr", "dataproc", "dataproc-gke", + "databricks-aws", and "databricks-azure". :param target_platform: Cost savings and speedup recommendation for comparable cluster in target_platform based on on-premises cluster configuration. 
diff --git a/user_tools/tests/mock_cluster.py b/user_tools/tests/mock_cluster.py
index 547de368b..65b74155f 100644
--- a/user_tools/tests/mock_cluster.py
+++ b/user_tools/tests/mock_cluster.py
@@ -34,7 +34,8 @@
     "workerConfig": {
         "accelerators": [{
             "acceleratorTypeUri": "https://www.googleapis.com/compute/beta/projects/project-id/zones/"\
-                "us-central1-a/acceleratorTypes/nvidia-tesla-t4"
+                "us-central1-a/acceleratorTypes/nvidia-tesla-t4",
+            "acceleratorCount": 1,
         }],
         "instanceNames": [
             "test-worker-0",

From 30abc6e48b86f504594fe16d22cae251c202de1c Mon Sep 17 00:00:00 2001
From: Partho Sarthi
Date: Tue, 10 Oct 2023 19:42:45 -0700
Subject: [PATCH 5/6] Update docs

Signed-off-by: Partho Sarthi
---
 user_tools/docs/user-tools-dataproc-gke.md                    |  4 ++--
 .../wrappers/dataproc_gke_wrapper.py                          | 15 ++++++++-------
 2 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/user_tools/docs/user-tools-dataproc-gke.md b/user_tools/docs/user-tools-dataproc-gke.md
index 6fe2c5955..836eb9f4f 100644
--- a/user_tools/docs/user-tools-dataproc-gke.md
+++ b/user_tools/docs/user-tools-dataproc-gke.md
@@ -69,10 +69,10 @@ The local deployment runs on the local development machine. It requires:
 | Option | Description | Default | Required |
 |--------------------------------|--------------------------------------------------------------------------------|------------------------------------------------|:--------:|
-| **cpu_cluster** | The virtual Dataproc-cluster on which the Apache Spark applications were executed. Accepted values are an virtual Dataproc-cluster name, or a valid path to the cluster properties file (json format) generated by gcloud CLI command `gcloud dataproc clusters describe`. This should not be confused with the GKE cluster name. | N/A | N |
+| **cpu_cluster** | The virtual Dataproc-cluster on which the Apache Spark applications were executed. Accepted values are a virtual Dataproc-cluster name, or a valid path to the cluster properties file (json format) generated by gcloud CLI command `gcloud dataproc clusters describe`. This should not be confused with the GKE cluster name. | N/A | N |
 | **eventlogs** | A comma seperated list of gs urls pointing to event logs or gs directory | Reads the Spark's property `spark.eventLog.dir` defined in `cpu_cluster`. This property should be included in the output of `dataproc clusters describe`. Note that the wrapper will raise an exception if the property is not set. | N |
 | **remote_folder** | The gs folder where the output of the wrapper's output is copied. If missing, the output will be available only on local disk | N/A | N |
-| **gpu_cluster** | The virtual Dataproc-cluster on which the Spark applications is planned to be migrated. The argument can be an virtual Dataproc-cluster or a valid path to the cluster's properties file (json format) generated by the gcloud CLI command `gcloud dataproc clusters describe` | The wrapper maps the machine instances of the original cluster into GPU supported instances | N |
+| **gpu_cluster** | The virtual Dataproc-cluster on which the Spark applications are planned to be migrated. The argument can be a virtual Dataproc-cluster or a valid path to the cluster's properties file (json format) generated by the gcloud CLI command `gcloud dataproc clusters describe`. This should not be confused with the GKE cluster name. | The wrapper maps the machine instances of the original cluster into GPU supported instances | N |
 | **local_folder** | Local work-directory path to store the output and to be used as root directory for temporary folders/files. The final output will go into a subdirectory named `qual-${EXEC_ID}` where `exec_id` is an auto-generated unique identifier of the execution. | If the argument is NONE, the default value is the env variable `RAPIDS_USER_TOOLS_OUTPUT_DIRECTORY` if any; or the current working directory. | N |
 | **jvm_heap_size** | The maximum heap size of the JVM in gigabytes | 24 | N |
 | **tools_jar** | Path to a bundled jar including RAPIDS tool. The path is a local filesystem, or remote gs url | Downloads the latest `rapids-4-spark-tools_*.jar` from mvn repo | N |
diff --git a/user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py b/user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py
index 29a4ff1b2..bce199887 100644
--- a/user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py
+++ b/user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py
@@ -47,9 +47,10 @@ def qualification(cpu_cluster: str = None,
                 help quantify the expected acceleration and costs savings of migrating a Spark application
                 or query to GPU. The wrapper downloads dependencies and executes the analysis on the local dev machine
-        :param cpu_cluster: The Dataproc-cluster on which the Spark applications were executed. The argument
-            can be a Dataproc-cluster or a valid path to the cluster's properties file (json format)
-            generated by the gcloud-CLI.
+        :param cpu_cluster: The virtual Dataproc-cluster on which the Apache Spark applications were executed.
+            Accepted values are a virtual Dataproc-cluster name, or a valid path to the cluster properties
+            file (json format) generated by gcloud CLI command `gcloud dataproc clusters describe`. This
+            should not be confused with the GKE cluster name.
         :param eventlogs: Event log filenames or gcs storage directories
             containing event logs (comma separated). If missing, the wrapper Reads the Spark's property
             `spark.eventLog.dir` defined in `cpu_cluster`. This property should be included
@@ -62,10 +63,10 @@ def qualification(cpu_cluster: str = None,
            RAPIDS_USER_TOOLS_OUTPUT_DIRECTORY if any; or the current working directory
         :param remote_folder: A gcs folder where the output is uploaded at the end of execution. If no value
            is provided, the output will be only available on local disk
-        :param gpu_cluster: The Dataproc-cluster on which the Spark applications is planned to be migrated.
-            The argument can be a Dataproc-cluster or a valid path to the cluster's properties file
-            (json format) generated by the gcloud-CLI. If missing, the wrapper maps the dataproc machine
-            instances of the original cluster into dataproc instances that support GPU acceleration
+        :param gpu_cluster: The virtual Dataproc-cluster on which the Spark applications are planned to be
+            migrated. The argument can be a virtual Dataproc-cluster or a valid path to the cluster's
+            properties file (json format) generated by the gcloud CLI command `gcloud dataproc clusters
+            describe`. This should not be confused with the GKE cluster name.
         :param tools_jar: Path to a bundled jar including Rapids tool. The path is a local filesystem,
            or remote gcs url. If missing, the wrapper downloads the latest rapids-4-spark-tools_*.jar
            from maven repo

From e5d16713562a92f862bc259ae82cbbd7fcaceb8d Mon Sep 17 00:00:00 2001
From: Partho Sarthi
Date: Thu, 12 Oct 2023 16:16:52 -0700
Subject: [PATCH 6/6] Remove bootstrap and profiling mentions

Signed-off-by: Partho Sarthi
---
 user_tools/docs/user-tools-dataproc-gke.md                     | 3 +--
 .../src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py  | 2 +-
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/user_tools/docs/user-tools-dataproc-gke.md b/user_tools/docs/user-tools-dataproc-gke.md
index 836eb9f4f..58d5105d3 100644
--- a/user_tools/docs/user-tools-dataproc-gke.md
+++ b/user_tools/docs/user-tools-dataproc-gke.md
@@ -46,8 +46,7 @@ RAPIDS variables have a naming pattern `RAPIDS_USER_TOOLS_*`:
   store and cache the downloaded resources. The default is `/var/tmp/spark_rapids_user_tools_cache`.
   Note that caching the resources locally has an impact on the total execution time of the command.
 - `RAPIDS_USER_TOOLS_OUTPUT_DIRECTORY`: specifies the location of a local directory that the RAPIDS-cli uses to
-  generate the output. The wrapper CLI arguments override that environment variable
-  (`--output_folder` and `local_folder` for Bootstrap and Qualification respectively).
+  generate the output. The wrapper CLI arguments override that environment variable (`--local_folder` for Qualification).
 
 ## Qualification command
 
diff --git a/user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py b/user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py
index bce199887..87b9e4bfe 100644
--- a/user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py
+++ b/user_tools/src/spark_rapids_pytools/wrappers/dataproc_gke_wrapper.py
@@ -136,7 +136,7 @@ def qualification(cpu_cluster: str = None,
 
 class DataprocGKEWrapper:  # pylint: disable=too-few-public-methods
     """
-    A wrapper script to run RAPIDS Accelerator tools (Qualification, Profiling, and Bootstrap) on Gcloud Dataproc GKE.
+    A wrapper script to run the RAPIDS Accelerator tool (Qualification) on Gcloud Dataproc GKE.
     """
     def __init__(self):
         self.qualification = CliDataprocGKELocalMode.qualification
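With the wrapper in place, the Qualification tool can be reached through the `CliDataprocGKELocalMode.qualification` entry point that `DataprocGKEWrapper` binds above. A minimal sketch of a programmatic call, assuming the user-tools package is installed and the gcloud CLI is authenticated; the cluster name and `gs://` path below are placeholders, not values taken from the patch:

```python
# Hedged sketch (not part of the patch): invoking the Dataproc GKE qualification
# entry point directly from Python. Running it downloads dependencies and
# executes the analysis on the local machine, so it needs network access and a
# configured gcloud CLI.
from spark_rapids_pytools.wrappers.dataproc_gke_wrapper import CliDataprocGKELocalMode

CliDataprocGKELocalMode.qualification(
    cpu_cluster='my-virtual-dataproc-cluster',  # virtual Dataproc cluster name, not the GKE cluster name
    eventlogs='gs://my-bucket/eventlogs/',      # optional if spark.eventLog.dir is set on the cluster
)
```

The keyword arguments mirror the options documented in the qualification table above (`cpu_cluster`, `eventlogs`, `remote_folder`, `gpu_cluster`, and so on), so the same values used on the command line apply here unchanged.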