Add support in user tools for running qualification on Dataproc GKE #612

Merged: 7 commits, merged Oct 13, 2023
4 changes: 4 additions & 0 deletions user_tools/docs/index.md
@@ -95,6 +95,9 @@ The following table summarizes the commands supported for each cloud platform:
|                  | diagnostic    | spark_rapids_user_tools \               | 23.06+   |
|                  |               | dataproc diagnostic [ARGS]              |          |
+------------------+---------------+-----------------------------------------+----------+
| Dataproc_GKE     | qualification | spark_rapids_user_tools \               | 23.08.2+ |
|                  |               | dataproc-gke qualification [ARGS]       |          |
+------------------+---------------+-----------------------------------------+----------+
| Databricks_AWS   | qualification | spark_rapids_user_tools \               | 23.04+   |
|                  |               | databricks-aws qualification [ARGS]     |          |
|                  +---------------+-----------------------------------------+----------+
@@ -131,6 +134,7 @@ platform:

- [AWS EMR](user-tools-aws-emr.md)
- [Google Cloud Dataproc](user-tools-dataproc.md)
- [Google Cloud Dataproc GKE](user-tools-dataproc-gke.md)
- [Databricks_AWS](user-tools-databricks-aws.md)
- [Databricks_Azure](user-tools-databricks-azure.md)
- [OnPrem](user-tools-onprem.md)
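
As the table above shows, the new platform is reached through the `dataproc-gke` command group. A hedged sketch of invoking it programmatically, assuming the `spark_rapids_user_tools` console script is installed; the qualification flags (`[ARGS]`) are documented in the new user-tools-dataproc-gke.md page and omitted here:

```python
import subprocess

# Run the documented command form; append qualification [ARGS] as needed.
subprocess.run(['spark_rapids_user_tools', 'dataproc-gke', 'qualification'],
               check=True)
```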
179 changes: 179 additions & 0 deletions user_tools/docs/user-tools-dataproc-gke.md

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
@@ -155,7 +155,7 @@ def calc_num_gpus(gpus_criteria_conf: List[dict], num_cores: int) -> int:


@dataclass
-class DataprocCMDDriver(CMDDriverBase):
+class DataprocCMDDriver(CMDDriverBase):  # pylint: disable=abstract-method
"""Represents the command interface that will be used by Dataproc"""

def _list_inconsistent_configurations(self) -> list:
@@ -326,8 +326,8 @@ def parse_accelerator_description(raw_description: str) -> dict:
for defined_acc in accelerator_arr:
# TODO: if the accelerator_arr has other non-gpu ones, then we need to loop until we
# find the gpu accelerators
-gpu_configs = {'num_gpus': defined_acc.get('acceleratorCount')}
-accelerator_type = defined_acc.get('acceleratorTypeUri')
+gpu_configs = {'num_gpus': int(defined_acc.get('acceleratorCount'))}
+accelerator_type = defined_acc.get('acceleratorTypeUri') or defined_acc.get('acceleratorType')
gpu_device_type = self.__extract_info_from_value(accelerator_type)
gpu_description = cli.exec_platform_describe_accelerator(accelerator_type=gpu_device_type,
cmd_args=None)
@@ -350,7 +350,7 @@ def _set_fields_from_props(self):
# set the machine type
if not self.props:
return
-mc_type_uri = self.props.get_value('machineTypeUri')
+mc_type_uri = self.props.get_value_silent('machineTypeUri')
if mc_type_uri:
self.instance_type = self.__extract_info_from_value(mc_type_uri)
else:
@@ -514,7 +514,7 @@ class DataprocSavingsEstimator(SavingsEstimator):
"""
A class that calculates the savings based on Dataproc price provider
"""
-def __calculate_group_cost(self, cluster_inst: ClusterGetAccessor, node_type: SparkNodeType):
+def _calculate_group_cost(self, cluster_inst: ClusterGetAccessor, node_type: SparkNodeType):
nodes_cnt = cluster_inst.get_nodes_cnt(node_type)
cores_count = cluster_inst.get_node_core_count(node_type)
mem_mb = cluster_inst.get_node_mem_mb(node_type)
@@ -532,7 +532,7 @@ def __calculate_group_cost(self, cluster_inst: ClusterGetAccessor, node_type: Sp
return nodes_cnt * (cores_cost + memory_cost + gpu_cost)

def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
-master_cost = self.__calculate_group_cost(cluster, SparkNodeType.MASTER)
-workers_cost = self.__calculate_group_cost(cluster, SparkNodeType.WORKER)
+master_cost = self._calculate_group_cost(cluster, SparkNodeType.MASTER)
+workers_cost = self._calculate_group_cost(cluster, SparkNodeType.WORKER)
dataproc_cost = self.price_provider.get_container_cost()
return master_cost + workers_cost + dataproc_cost
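
The `__calculate_group_cost` → `_calculate_group_cost` rename above is what lets the `DataprocGkeSavingsEstimator` subclass added later in this diff reuse the helper: Python name-mangles double-underscore methods per defining class, so a subclass referencing the same name looks up a different attribute. A minimal sketch of the distinction, with hypothetical names:

```python
class Base:
    def __cost(self):
        # compiled as `_Base__cost`; invisible under this name to subclasses
        return 1.0

    def _cost(self):
        # single underscore: a normal attribute, resolved by regular lookup
        return 1.0


class Child(Base):
    def total(self):
        # `self.__cost()` here would be rewritten to `self._Child__cost()`
        # and raise AttributeError; `self._cost()` resolves to Base._cost.
        return self._cost()


print(Child().total())  # 1.0
```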
200 changes: 200 additions & 0 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke.py
@@ -0,0 +1,200 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Implementation specific to Dataproc"""

from dataclasses import dataclass, field
from typing import Any

from spark_rapids_pytools.cloud_api.dataproc import DataprocCluster, DataprocCMDDriver, DataprocNode, \
DataprocPlatform, DataprocSavingsEstimator
from spark_rapids_pytools.cloud_api.dataproc_gke_job import DataprocGkeLocalRapidsJob
from spark_rapids_pytools.cloud_api.sp_types import CMDDriverBase, \
SparkNodeType, ClusterGetAccessor
from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.pricing.dataproc_gke_pricing import DataprocGkePriceProvider
from spark_rapids_tools import CspEnv


@dataclass
class DataprocGkePlatform(DataprocPlatform):
"""
Represents the interface and utilities required by DataprocGke.
Prerequisites:
- install the gcloud command-line tools (gcloud, gsutil)
- configure the gcloud CLI
- dataproc_gke has staging temporary storage; we can retrieve that from the cluster properties
"""

def __post_init__(self):
super().__post_init__()
self.type_id = CspEnv.DATAPROC_GKE

@classmethod
def get_spark_node_type_fromstring(cls, value: str):
node_type_mapping = {
'SPARK_EXECUTOR': SparkNodeType.WORKER,
'SPARK_DRIVER': SparkNodeType.MASTER,
}
return node_type_mapping.get(value.upper())

def _construct_cli_object(self) -> CMDDriverBase:
return DataprocGkeCMDDriver(timeout=0, cloud_ctxt=self.ctxt)

def _construct_cluster_from_props(self, cluster: str, props: str = None):
return DataprocGkeCluster(self).set_connection(cluster_id=cluster, props=props)

def migrate_cluster_to_gpu(self, orig_cluster):
"""
Given a cluster, convert it to run NVIDIA GPUs based on mapped instance types.
:param orig_cluster: the original cluster to migrate from
:return: a new cluster object that supports GPUs.
"""
gpu_cluster_ob = DataprocGkeCluster(self)
gpu_cluster_ob.migrate_from_cluster(orig_cluster)
return gpu_cluster_ob

def create_saving_estimator(self,
source_cluster: ClusterGetAccessor,
reshaped_cluster: ClusterGetAccessor,
target_cost: float = None,
source_cost: float = None):
raw_pricing_config = self.configs.get_value_silent('pricing')
if raw_pricing_config:
pricing_config = JSONPropertiesContainer(prop_arg=raw_pricing_config,
file_load=False)
else:
pricing_config: JSONPropertiesContainer = None
pricing_provider = DataprocGkePriceProvider(region=self.cli.get_region(),
pricing_configs={'gcloud': pricing_config})
saving_estimator = DataprocGkeSavingsEstimator(price_provider=pricing_provider,
reshaped_cluster=reshaped_cluster,
source_cluster=source_cluster,
target_cost=target_cost,
source_cost=source_cost)
return saving_estimator

def create_local_submission_job(self, job_prop, ctxt) -> Any:
return DataprocGkeLocalRapidsJob(prop_container=job_prop, exec_ctxt=ctxt)


@dataclass
class DataprocGkeCMDDriver(DataprocCMDDriver):
"""Represents the command interface that will be used by DataprocGke"""

def pull_node_pool_props_by_args(self, args: dict) -> str:
node_pool_name = args.get('node_pool_name')
gke_cluster_name = args.get('gke_cluster_name')
if 'region' in args:
region_name = args.get('region')
else:
region_name = self.get_region()
describe_cluster_cmd = ['gcloud',
'container',
'node-pools',
'describe',
node_pool_name,
'--cluster',
gke_cluster_name,
'--region',
region_name]
return self.run_sys_cmd(describe_cluster_cmd)


@dataclass
class GkeNodePool:
"""
Holds information about node pools
"""
name: str
roles: list
gke_cluster_name: str
spark_node_type: SparkNodeType = field(default=None, init=False) # map the role to Spark type.

def __post_init__(self):
# only the first role is considered
if len(self.roles) > 0:
self.spark_node_type = DataprocGkePlatform.get_spark_node_type_fromstring(self.roles[0])


@dataclass
class DataprocGkeCluster(DataprocCluster):
"""
Represents an instance of running cluster on DataprocGke.
"""
node_pools: list = field(default=None, init=False)

@staticmethod
def __extract_info_from_value(conf_val: str):
if '/' in conf_val:
# this is a valid url-path
return FSUtil.get_resource_name(conf_val)
# this is a value
return conf_val

def _init_node_pools(self):
gke_cluster_config = self.props.get_value('virtualClusterConfig', 'kubernetesClusterConfig', 'gkeClusterConfig')
gke_cluster_name = self.__extract_info_from_value(gke_cluster_config['gkeClusterTarget'])
raw_node_pools = gke_cluster_config['nodePoolTarget']

def create_node_pool_elem(node_pool: dict) -> GkeNodePool:
pool_name = self.__extract_info_from_value(node_pool['nodePool'])
return GkeNodePool(name=pool_name, roles=node_pool['roles'], gke_cluster_name=gke_cluster_name)

self.node_pools = [create_node_pool_elem(node_pool) for node_pool in raw_node_pools]

def _init_nodes(self):
self._init_node_pools()

def create_cluster_node(node_pool):
if node_pool.spark_node_type is None:
return None
args = {'node_pool_name': node_pool.name, 'gke_cluster_name': node_pool.gke_cluster_name}
raw_node_props = self.cli.pull_node_pool_props_by_args(args)
node_props = JSONPropertiesContainer(prop_arg=raw_node_props, file_load=False)
node = DataprocNode.create_node(node_pool.spark_node_type).set_fields_from_dict({
'name': node_props.get_value('name'),
'props': JSONPropertiesContainer(prop_arg=node_props.get_value('config'), file_load=False),
'zone': self.zone
})
node.fetch_and_set_hw_info(self.cli)
return node

executor_nodes = []
driver_nodes = []
for gke_node_pool in self.node_pools:
c_node = create_cluster_node(gke_node_pool)
if gke_node_pool.spark_node_type == SparkNodeType.WORKER:
executor_nodes.append(c_node)
elif gke_node_pool.spark_node_type == SparkNodeType.MASTER:
driver_nodes.append(c_node)
self.nodes = {
SparkNodeType.WORKER: executor_nodes,
SparkNodeType.MASTER: driver_nodes[0]
}


@dataclass
class DataprocGkeSavingsEstimator(DataprocSavingsEstimator):
"""
A class that calculates the savings based on DataprocGke price provider
"""

def _get_cost_per_cluster(self, cluster: ClusterGetAccessor):
master_cost = self._calculate_group_cost(cluster, SparkNodeType.MASTER)
workers_cost = self._calculate_group_cost(cluster, SparkNodeType.WORKER)
dataproc_gke_cost = self.price_provider.get_container_cost()
return master_cost + workers_cost + dataproc_gke_cost
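
`__extract_info_from_value` above reduces full GCP resource paths to their trailing resource name before they are fed back into `gcloud` commands. A hedged illustration with made-up values, assuming `FSUtil.get_resource_name` returns the last path component:

```python
# Hypothetical GKE cluster target as it appears in virtualClusterConfig.
gke_cluster_target = 'projects/my-project/locations/us-central1/clusters/my-gke-cluster'

# For a URL-like path, only the resource name survives:
assert gke_cluster_target.split('/')[-1] == 'my-gke-cluster'

# A plain value (no '/') is returned unchanged:
assert 'my-gke-cluster'.split('/')[-1] == 'my-gke-cluster'
```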
27 changes: 27 additions & 0 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc_gke_job.py
@@ -0,0 +1,27 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implementation of Job submissions on GCloud Dataproc GKE"""

from dataclasses import dataclass

from spark_rapids_pytools.rapids.rapids_job import RapidsLocalJob


@dataclass
class DataprocGkeLocalRapidsJob(RapidsLocalJob):
"""
Implementation of a RAPIDS job that runs on a local machine.
"""
job_label = 'dataprocLocal'
4 changes: 4 additions & 0 deletions user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
@@ -514,6 +514,9 @@ def _build_platform_list_cluster(self,
query_args: dict = None) -> list:
raise NotImplementedError

def pull_node_pool_props_by_args(self, args: dict) -> str:
raise NotImplementedError

def exec_platform_list_cluster_instances(self,
cluster,
query_args: dict = None) -> str:
@@ -1178,6 +1181,7 @@ def get_platform(platform_id: Enum) -> Type[PlatformBase]:
CspEnv.DATABRICKS_AWS: ('spark_rapids_pytools.cloud_api.databricks_aws', 'DBAWSPlatform'),
CspEnv.DATABRICKS_AZURE: ('spark_rapids_pytools.cloud_api.databricks_azure', 'DBAzurePlatform'),
CspEnv.DATAPROC: ('spark_rapids_pytools.cloud_api.dataproc', 'DataprocPlatform'),
CspEnv.DATAPROC_GKE: ('spark_rapids_pytools.cloud_api.dataproc_gke', 'DataprocGkePlatform'),
CspEnv.EMR: ('spark_rapids_pytools.cloud_api.emr', 'EMRPlatform'),
CspEnv.ONPREM: ('spark_rapids_pytools.cloud_api.onprem', 'OnPremPlatform'),
}
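
The registry maps each `CspEnv` value to a `(module, class)` pair instead of a class object, so a platform module is only imported when it is actually requested. A minimal sketch of that lazy-lookup pattern (not necessarily the exact body of `get_platform`):

```python
import importlib


def resolve_platform(entry):
    """Resolve a ('module.path', 'ClassName') registry entry to a class."""
    module_path, class_name = entry
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


# e.g. resolve_platform(('spark_rapids_pytools.cloud_api.dataproc_gke',
#                        'DataprocGkePlatform')) returns DataprocGkePlatform
```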
36 changes: 36 additions & 0 deletions user_tools/src/spark_rapids_pytools/pricing/dataproc_gke_pricing.py
@@ -0,0 +1,36 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""providing absolute costs of resources in GCloud DataprocGke"""

from dataclasses import dataclass

from spark_rapids_pytools.pricing.dataproc_pricing import DataprocPriceProvider


@dataclass
class DataprocGkePriceProvider(DataprocPriceProvider):
"""
Provide costs of DataprocGke instances
"""
name = 'DataprocGke'

def get_container_cost(self) -> float:
dataproc_cost = super().get_container_cost()
gke_container_cost = self.__get_gke_container_cost()
return dataproc_cost + gke_container_cost

def __get_gke_container_cost(self) -> float:
lookup_key = 'CP-GKE-CONTAINER-MANAGMENT-COST'
return self.catalogs['gcloud'].get_value(lookup_key, 'us')
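
So the container layer of a Dataproc GKE cluster is priced as the Dataproc premium plus a flat GKE cluster-management fee read from the pricing catalog (the lookup key's spelling must match the catalog entry verbatim). A worked sketch with hypothetical hourly rates:

```python
# Hypothetical rates purely for illustration; real values come from the
# gcloud pricing catalog at runtime.
dataproc_premium = 0.010     # what super().get_container_cost() might return
gke_management_fee = 0.100   # catalog value for 'CP-GKE-CONTAINER-MANAGMENT-COST' / 'us'

container_cost = dataproc_premium + gke_management_fee
print(f'${container_cost:.3f}/hour')  # $0.110/hour
```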
2 changes: 2 additions & 0 deletions user_tools/src/spark_rapids_pytools/wrapper.py
@@ -19,6 +19,7 @@
from spark_rapids_pytools.wrappers.databricks_aws_wrapper import DBAWSWrapper
from spark_rapids_pytools.wrappers.databricks_azure_wrapper import DBAzureWrapper
from spark_rapids_pytools.wrappers.dataproc_wrapper import DataprocWrapper
from spark_rapids_pytools.wrappers.dataproc_gke_wrapper import DataprocGKEWrapper
from spark_rapids_pytools.wrappers.emr_wrapper import EMRWrapper
from spark_rapids_pytools.wrappers.onprem_wrapper import OnPremWrapper

@@ -27,6 +28,7 @@ def main():
fire.Fire({
'emr': EMRWrapper,
'dataproc': DataprocWrapper,
'dataproc-gke': DataprocGKEWrapper,
'databricks-aws': DBAWSWrapper,
'databricks-azure': DBAzureWrapper,
'onprem': OnPremWrapper
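
Because the wrapper classes are handed to `fire.Fire` as a dict, each key becomes a top-level CLI command group and the class's public methods become its subcommands. A minimal self-contained sketch of that dispatch pattern (the names below are hypothetical; the real flags live in `DataprocGKEWrapper`, which is not shown in this diff):

```python
import fire


class DemoWrapper:
    """Stand-in for a platform wrapper such as DataprocGKEWrapper."""

    def qualification(self, cluster: str = 'my-cluster'):
        # invoked as: <prog> demo qualification --cluster=NAME
        return f'running qualification against {cluster}'


if __name__ == '__main__':
    fire.Fire({'demo': DemoWrapper})
```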