Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_gpu_transition_time
nartal1 committed Oct 14, 2023
2 parents 4a8ab84 + 5fd7299 commit b17df32
Showing 21 changed files with 740 additions and 107 deletions.
4 changes: 4 additions & 0 deletions user_tools/docs/index.md
@@ -95,6 +95,9 @@ The following table summarizes the commands supported for each cloud platform:
|                  | diagnostic    | spark_rapids_user_tools \               | 23.06+   |
|                  |               | dataproc diagnostic [ARGS]              |          |
+------------------+---------------+-----------------------------------------+----------+
| Dataproc_GKE     | qualification | spark_rapids_user_tools \               | 23.08.2+ |
|                  |               | dataproc-gke qualification [ARGS]       |          |
+------------------+---------------+-----------------------------------------+----------+
| Databricks_AWS   | qualification | spark_rapids_user_tools \               | 23.04+   |
|                  |               | databricks-aws qualification [ARGS]     |          |
|                  +---------------+-----------------------------------------+----------+
@@ -131,6 +134,7 @@ platform:

- [AWS EMR](user-tools-aws-emr.md)
- [Google Cloud Dataproc](user-tools-dataproc.md)
- [Google Cloud Dataproc GKE](user-tools-dataproc-gke.md)
- [Databricks_AWS](user-tools-databricks-aws.md)
- [Databricks_Azure](user-tools-databricks-azure.md)
- [OnPrem](user-tools-onprem.md)
69 changes: 34 additions & 35 deletions user_tools/docs/user-tools-databricks-aws.md

Large diffs are not rendered by default.

63 changes: 31 additions & 32 deletions user_tools/docs/user-tools-databricks-azure.md

Large diffs are not rendered by default.

179 changes: 179 additions & 0 deletions user_tools/docs/user-tools-dataproc-gke.md

Large diffs are not rendered by default.

24 changes: 16 additions & 8 deletions user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py
@@ -115,16 +115,24 @@ def _build_platform_list_cluster(self, cluster, query_args: dict = None) -> list
     def pull_cluster_props_by_args(self, args: dict) -> str:
         get_cluster_cmd = ['databricks', 'clusters', 'get']
         if 'Id' in args:
-            get_cluster_cmd.extend(['--cluster-id', args.get('Id')])
+            get_cluster_cmd.extend([args.get('Id')])
         elif 'cluster' in args:
-            get_cluster_cmd.extend(['--cluster-name', args.get('cluster')])
+            # TODO: currently, arguments '--cpu_cluster' or '--gpu_cluster' are processed and stored as
+            #  'cluster' (as cluster names), while they are actually cluster ids for databricks platforms
+            get_cluster_cmd.extend([args.get('cluster')])
         else:
-            self.logger.error('Invalid arguments to pull the cluster properties')
-        cluster_described = self.run_sys_cmd(get_cluster_cmd)
-        if cluster_described is not None:
-            raw_prop_container = JSONPropertiesContainer(prop_arg=cluster_described, file_load=False)
-            return json.dumps(raw_prop_container.props)
-        return cluster_described
+            self.logger.error('Unable to pull cluster id or cluster name information')
+
+        try:
+            cluster_described = self.run_sys_cmd(get_cluster_cmd)
+            if cluster_described is not None:
+                raw_prop_container = JSONPropertiesContainer(prop_arg=cluster_described, file_load=False)
+                return json.dumps(raw_prop_container.props)
+        except Exception as ex:
+            self.logger.error('Invalid arguments to pull the cluster properties: %s', ex)
+            raise ex
+
+        return None

     def _build_cmd_ssh_prefix_for_node(self, node: ClusterNode) -> str:
         port = self.env_vars.get('sshPort')
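
The updated method drops the --cluster-id / --cluster-name flags, passes the value positionally, and normalizes the CLI output into a JSON string. A minimal standalone sketch of that flow (describe_databricks_cluster is a hypothetical helper, not the project's API; it assumes the databricks CLI is installed and authenticated):

    import json
    import subprocess

    def describe_databricks_cluster(cluster_id: str) -> str:
        # The cluster id is passed positionally, mirroring the updated command construction above.
        cmd = ['databricks', 'clusters', 'get', cluster_id]
        out = subprocess.run(cmd, capture_output=True, text=True, check=True).stdout
        # Parse and re-serialize so callers always receive a normalized JSON string.
        return json.dumps(json.loads(out))

Wrapping the call in try/except, as the diff does, lets the driver log the failure before re-raising it instead of surfacing a bare subprocess error.
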
20 changes: 16 additions & 4 deletions user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py
@@ -122,12 +122,24 @@ def _build_platform_list_cluster(self, cluster, query_args: dict = None) -> list
     def pull_cluster_props_by_args(self, args: dict) -> str:
         get_cluster_cmd = ['databricks', 'clusters', 'get']
         if 'Id' in args:
-            get_cluster_cmd.extend(['--cluster-id', args.get('Id')])
+            get_cluster_cmd.extend([args.get('Id')])
         elif 'cluster' in args:
-            get_cluster_cmd.extend(['--cluster-name', args.get('cluster')])
+            # TODO: currently, arguments '--cpu_cluster' or '--gpu_cluster' are processed and stored as
+            #  'cluster' (as cluster names), while they are actually cluster ids for databricks platforms
+            get_cluster_cmd.extend([args.get('cluster')])
         else:
-            self.logger.error('Invalid arguments to pull the cluster properties')
-        return self.run_sys_cmd(get_cluster_cmd)
+            self.logger.error('Unable to pull cluster id or cluster name information')
+
+        try:
+            cluster_described = self.run_sys_cmd(get_cluster_cmd)
+            if cluster_described is not None:
+                raw_prop_container = JSONPropertiesContainer(prop_arg=cluster_described, file_load=False)
+                return json.dumps(raw_prop_container.props)
+        except Exception as ex:
+            self.logger.error('Invalid arguments to pull the cluster properties: %s', ex)
+            raise ex
+
+        return None

     def _build_cmd_ssh_prefix_for_node(self, node: ClusterNode) -> str:
         port = self.env_vars.get('sshPort')
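
The Azure driver previously returned the raw CLI output; it now parses and re-serializes it, matching the AWS driver. A tiny illustration of that normalization (the sample output below is made up):

    import json

    raw_cli_output = '{\n  "cluster_id": "1234-567890-abcde123",\n  "state": "RUNNING"\n}\n'
    normalized = json.dumps(json.loads(raw_cli_output))
    print(normalized)  # {"cluster_id": "1234-567890-abcde123", "state": "RUNNING"}
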
14 changes: 7 additions & 7 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
@@ -155,7 +155,7 @@ def calc_num_gpus(gpus_criteria_conf: List[dict], num_cores: int) -> int:


 @dataclass
-class DataprocCMDDriver(CMDDriverBase):
+class DataprocCMDDriver(CMDDriverBase):  # pylint: disable=abstract-method
     """Represents the command interface that will be used by Dataproc"""

     def _list_inconsistent_configurations(self) -> list:
@@ -326,8 +326,8 @@ def parse_accelerator_description(raw_description: str) -> dict:
         for defined_acc in accelerator_arr:
             # TODO: if the accelerator_arr has other non-gpu ones, then we need to loop until we
             #  find the gpu accelerators
-            gpu_configs = {'num_gpus': defined_acc.get('acceleratorCount')}
-            accelerator_type = defined_acc.get('acceleratorTypeUri')
+            gpu_configs = {'num_gpus': int(defined_acc.get('acceleratorCount'))}
+            accelerator_type = defined_acc.get('acceleratorTypeUri') or defined_acc.get('acceleratorType')
             gpu_device_type = self.__extract_info_from_value(accelerator_type)
             gpu_description = cli.exec_platform_describe_accelerator(accelerator_type=gpu_device_type,
                                                                      cmd_args=None)
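
The two changed lines harden the accelerator parsing: the count is coerced to int, and the type lookup falls back to acceleratorType when acceleratorTypeUri is absent, which is presumably the shape reported for GKE node pools. A small sketch with a made-up accelerator entry:

    defined_acc = {'acceleratorCount': '2', 'acceleratorType': 'nvidia-tesla-t4'}  # hypothetical entry

    num_gpus = int(defined_acc.get('acceleratorCount'))         # '2' -> 2 instead of a raw string
    acc_type = (defined_acc.get('acceleratorTypeUri')
                or defined_acc.get('acceleratorType'))          # falls back to 'nvidia-tesla-t4'
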
@@ -350,7 +350,7 @@ def _set_fields_from_props(self):
         # set the machine type
         if not self.props:
             return
-        mc_type_uri = self.props.get_value('machineTypeUri')
+        mc_type_uri = self.props.get_value_silent('machineTypeUri')
         if mc_type_uri:
             self.instance_type = self.__extract_info_from_value(mc_type_uri)
         else:
@@ -514,7 +514,7 @@ class DataprocSavingsEstimator(SavingsEstimator):
"""
A class that calculates the savings based on Dataproc price provider
"""
def __calculate_group_cost(self, cluster_inst: ClusterGetAccessor, node_type: SparkNodeType):
def _calculate_group_cost(self, cluster_inst: ClusterGetAccessor, node_type: SparkNodeType):
nodes_cnt = cluster_inst.get_nodes_cnt(node_type)
cores_count = cluster_inst.get_node_core_count(node_type)
mem_mb = cluster_inst.get_node_mem_mb(node_type)
@@ -532,7 +532,7 @@ def __calculate_group_cost(self, cluster_inst: ClusterGetAccessor, node_type: SparkNodeType):
         return nodes_cnt * (cores_cost + memory_cost + gpu_cost)

     def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
-        master_cost = self.__calculate_group_cost(cluster, SparkNodeType.MASTER)
-        workers_cost = self.__calculate_group_cost(cluster, SparkNodeType.WORKER)
+        master_cost = self._calculate_group_cost(cluster, SparkNodeType.MASTER)
+        workers_cost = self._calculate_group_cost(cluster, SparkNodeType.WORKER)
         dataproc_cost = self.price_provider.get_container_cost()
         return master_cost + workers_cost + dataproc_cost
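
Renaming __calculate_group_cost to _calculate_group_cost is what allows the new DataprocGkeSavingsEstimator (added below) to reuse the helper: double leading underscores trigger per-class name mangling, as this minimal sketch shows:

    class Base:
        def __cost(self):           # stored as _Base__cost
            return 1

    class Child(Base):
        def total(self):
            return self.__cost()    # looks up _Child__cost

    Child().total()                 # raises AttributeError; a single underscore (_cost) resolves normally
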
200 changes: 200 additions & 0 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py
@@ -0,0 +1,200 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Implementation specific to Dataproc"""

from dataclasses import dataclass, field
from typing import Any

from spark_rapids_pytools.cloud_api.dataproc import DataprocCluster, DataprocCMDDriver, DataprocNode, \
    DataprocPlatform, DataprocSavingsEstimator
from spark_rapids_pytools.cloud_api.dataproc_gke_job import DataprocGkeLocalRapidsJob
from spark_rapids_pytools.cloud_api.sp_types import CMDDriverBase, \
    SparkNodeType, ClusterGetAccessor
from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.pricing.dataproc_gke_pricing import DataprocGkePriceProvider
from spark_rapids_tools import CspEnv


@dataclass
class DataprocGkePlatform(DataprocPlatform):
    """
    Represents the interface and utilities required by DataprocGke.
    Prerequisites:
    - install gcloud command lines (gcloud, gsutil)
    - configure the gcloud CLI.
    - dataproc_gke has staging temporary storage. we can retrieve that from the cluster properties.
    """

    def __post_init__(self):
        super().__post_init__()
        self.type_id = CspEnv.DATAPROC_GKE

    @classmethod
    def get_spark_node_type_fromstring(cls, value: str):
        node_type_mapping = {
            'SPARK_EXECUTOR': SparkNodeType.WORKER,
            'SPARK_DRIVER': SparkNodeType.MASTER,
        }
        return node_type_mapping.get(value.upper())

    def _construct_cli_object(self) -> CMDDriverBase:
        return DataprocGkeCMDDriver(timeout=0, cloud_ctxt=self.ctxt)

    def _construct_cluster_from_props(self, cluster: str, props: str = None):
        return DataprocGkeCluster(self).set_connection(cluster_id=cluster, props=props)

    def migrate_cluster_to_gpu(self, orig_cluster):
        """
        given a cluster, convert it to run NVIDIA Gpu based on mapping instance types
        :param orig_cluster: the original cluster to migrate from
        :return: a new object cluster that supports GPU.
        """
        gpu_cluster_ob = DataprocGkeCluster(self)
        gpu_cluster_ob.migrate_from_cluster(orig_cluster)
        return gpu_cluster_ob

    def create_saving_estimator(self,
                                source_cluster: ClusterGetAccessor,
                                reshaped_cluster: ClusterGetAccessor,
                                target_cost: float = None,
                                source_cost: float = None):
        raw_pricing_config = self.configs.get_value_silent('pricing')
        if raw_pricing_config:
            pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config,
                                                     file_load=False)
        else:
            pricing_config: JSONPropertiesContainer = None
        pricing_provider = DataprocGkePriceProvider(region=self.cli.get_region(),
                                                    pricing_configs={'gcloud': pricing_config})
        saving_estimator = DataprocGkeSavingsEstimator(price_provider=pricing_provider,
                                                       reshaped_cluster=reshaped_cluster,
                                                       source_cluster=source_cluster,
                                                       target_cost=target_cost,
                                                       source_cost=source_cost)
        return saving_estimator

    def create_local_submission_job(self, job_prop, ctxt) -> Any:
        return DataprocGkeLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)


@dataclass
class DataprocGkeCMDDriver(DataprocCMDDriver):
    """Represents the command interface that will be used by DataprocGke"""

    def pull_node_pool_props_by_args(self, args: dict) -> str:
        node_pool_name = args.get('node_pool_name')
        gke_cluster_name = args.get('gke_cluster_name')
        if 'region' in args:
            region_name = args.get('region')
        else:
            region_name = self.get_region()
        describe_cluster_cmd = ['gcloud',
                                'container',
                                'node-pools',
                                'describe',
                                node_pool_name,
                                '--cluster',
                                gke_cluster_name,
                                '--region',
                                region_name]
        return self.run_sys_cmd(describe_cluster_cmd)


@dataclass
class GkeNodePool:
    """
    Holds information about node pools
    """
    name: str
    roles: list
    gke_cluster_name: str
    spark_node_type: SparkNodeType = field(default=None, init=False)  # map the role to Spark type.

    def __post_init__(self):
        # only the first role is considered
        if len(self.roles) > 0:
            self.spark_node_type = DataprocGkePlatform.get_spark_node_type_fromstring(self.roles[0])


@dataclass
class DataprocGkeCluster(DataprocCluster):
    """
    Represents an instance of running cluster on DataprocGke.
    """
    node_pools: list = field(default=None, init=False)

    @staticmethod
    def __extract_info_from_value(conf_val: str):
        if '/' in conf_val:
            # this is a valid url-path
            return FSUtil.get_resource_name(conf_val)
        # this is a value
        return conf_val

    def _init_node_pools(self):
        gke_cluster_config = self.props.get_value('virtualClusterConfig', 'kubernetesClusterConfig', 'gkeClusterConfig')
        gke_cluster_name = self.__extract_info_from_value(gke_cluster_config['gkeClusterTarget'])
        raw_node_pools = gke_cluster_config['nodePoolTarget']

        def create_node_pool_elem(node_pool: dict) -> GkeNodePool:
            pool_name = self.__extract_info_from_value(node_pool['nodePool'])
            return GkeNodePool(name=pool_name, roles=node_pool['roles'], gke_cluster_name=gke_cluster_name)

        self.node_pools = [create_node_pool_elem(node_pool) for node_pool in raw_node_pools]

    def _init_nodes(self):
        self._init_node_pools()

        def create_cluster_node(node_pool):
            if node_pool.spark_node_type is None:
                return None
            args = {'node_pool_name': node_pool.name, 'gke_cluster_name': node_pool.gke_cluster_name}
            raw_node_props = self.cli.pull_node_pool_props_by_args(args)
            node_props = JSONPropertiesContainer(prop_arg=raw_node_props, file_load=False)
            node = DataprocNode.create_node(node_pool.spark_node_type).set_fields_from_dict({
                'name': node_props.get_value('name'),
                'props': JSONPropertiesContainer(prop_arg=node_props.get_value('config'), file_load=False),
                'zone': self.zone
            })
            node.fetch_and_set_hw_info(self.cli)
            return node

        executor_nodes = []
        driver_nodes = []
        for gke_node_pool in self.node_pools:
            c_node = create_cluster_node(gke_node_pool)
            if gke_node_pool.spark_node_type == SparkNodeType.WORKER:
                executor_nodes.append(c_node)
            elif gke_node_pool.spark_node_type == SparkNodeType.MASTER:
                driver_nodes.append(c_node)
        self.nodes = {
            SparkNodeType.WORKER: executor_nodes,
            SparkNodeType.MASTER: driver_nodes[0]
        }


@dataclass
class DataprocGkeSavingsEstimator(DataprocSavingsEstimator):
    """
    A class that calculates the savings based on DataprocGke price provider
    """

    def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
        master_cost = self._calculate_group_cost(cluster, SparkNodeType.MASTER)
        workers_cost = self._calculate_group_cost(cluster, SparkNodeType.WORKER)
        dataproc_gke_cost = self.price_provider.get_container_cost()
        return master_cost + workers_cost + dataproc_gke_cost
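
For orientation, _init_node_pools() above walks cluster properties shaped roughly as follows; the project, cluster, and pool names here are invented for illustration:

    # Hypothetical virtualClusterConfig.kubernetesClusterConfig.gkeClusterConfig payload
    gke_cluster_config = {
        'gkeClusterTarget': 'projects/my-project/locations/us-central1/clusters/my-gke-cluster',
        'nodePoolTarget': [
            {'nodePool': 'projects/my-project/locations/us-central1/clusters/my-gke-cluster/nodePools/driver-pool',
             'roles': ['SPARK_DRIVER']},    # maps to SparkNodeType.MASTER
            {'nodePool': 'projects/my-project/locations/us-central1/clusters/my-gke-cluster/nodePools/exec-pool',
             'roles': ['SPARK_EXECUTOR']},  # maps to SparkNodeType.WORKER
        ],
    }
    # __extract_info_from_value() keeps only the trailing resource name ('driver-pool' / 'exec-pool'),
    # which is then passed to pull_node_pool_props_by_args() as the gcloud node pool to describe.
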
27 changes: 27 additions & 0 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke_job.py
@@ -0,0 +1,27 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implementation of Job submissions on GCloud Dataproc GKE"""

from dataclasses import dataclass

from spark_rapids_pytools.rapids.rapids_job import RapidsLocalJob


@dataclass
class DataprocGkeLocalRapidsJob(RapidsLocalJob):
    """
    Implementation of a RAPIDS job that runs on a local machine.
    """
    job_label = 'dataprocLocal'
2 changes: 1 addition & 1 deletion user_tools/src/spark_rapids_pytools/cloud_api/emr.py
@@ -141,7 +141,7 @@ def _list_inconsistent_configurations(self) -> list:
     def pull_cluster_props_by_args(self, args: dict) -> str:
         aws_cluster_id = args.get('Id')
         cluster_name = args.get('cluster')
-        if args.get('Id') is None:
+        if aws_cluster_id is None:
             # use cluster name to get the cluster values
             # we need to get the cluster_id from the list command first.
             list_cmd_res = self.exec_platform_list_cluster_by_name(cluster_name)
4 changes: 4 additions & 0 deletions user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
@@ -514,6 +514,9 @@ def _build_platform_list_cluster(self,
                                      query_args: dict = None) -> list:
         raise NotImplementedError

+    def pull_node_pool_props_by_args(self, args: dict) -> str:
+        raise NotImplementedError
+
     def exec_platform_list_cluster_instances(self,
                                              cluster,
                                              query_args: dict = None) -> str:
@@ -1178,6 +1181,7 @@ def get_platform(platform_id: Enum) -> Type[PlatformBase]:
         CspEnv.DATABRICKS_AWS: ('spark_rapids_pytools.cloud_api.databricks_aws', 'DBAWSPlatform'),
         CspEnv.DATABRICKS_AZURE: ('spark_rapids_pytools.cloud_api.databricks_azure', 'DBAzurePlatform'),
         CspEnv.DATAPROC: ('spark_rapids_pytools.cloud_api.dataproc', 'DataprocPlatform'),
+        CspEnv.DATAPROC_GKE: ('spark_rapids_pytools.cloud_api.dataproc_gke', 'DataprocGkePlatform'),
         CspEnv.EMR: ('spark_rapids_pytools.cloud_api.emr', 'EMRPlatform'),
         CspEnv.ONPREM: ('spark_rapids_pytools.cloud_api.onprem', 'OnPremPlatform'),
     }
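
The new mapping entry is what resolves the dataproc-gke CLI choice to DataprocGkePlatform. A sketch of the lazy-import lookup such a (module, class) registry implies (load_platform is a hypothetical stand-in; the real get_platform may differ in details):

    import importlib

    def load_platform(platform_map: dict, platform_id):
        module_name, class_name = platform_map[platform_id]
        module = importlib.import_module(module_name)    # e.g. spark_rapids_pytools.cloud_api.dataproc_gke
        return getattr(module, class_name)               # e.g. the DataprocGkePlatform class
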