Skip to content

Commit

Permalink
[FEA] Allow users to specify custom Dependency jars (#1395)
Browse files Browse the repository at this point in the history
* Allow users to specify custom Dependency jars

Signed-off-by: Ahmed Hussein <[email protected]>

Fixes #1359

Add a new input argument that takes a path to a yaml file `--tools_config_file`
The config file allows the users to define their own binaries that need
to be added to the classpath of the tools jar cmd.
This change is important because users can use the user-tools wrapper
with their custom spark.

* fix typo in aws configs

Signed-off-by: Ahmed Hussein <[email protected]>

* fix default dependencyType

Signed-off-by: Ahmed Hussein <[email protected]>

* add unit tests for toolsconfig

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>

* improve field descriptions

Signed-off-by: Ahmed Hussein <[email protected]>

* address review comments

Signed-off-by: Ahmed Hussein <[email protected]>

---------

Signed-off-by: Ahmed Hussein <[email protected]>
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
  • Loading branch information
amahussein authored Nov 4, 2024
1 parent e1c4742 commit ca6cac6
Show file tree
Hide file tree
Showing 23 changed files with 716 additions and 166 deletions.
74 changes: 48 additions & 26 deletions user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@
from spark_rapids_pytools.rapids.rapids_job import RapidsJobPropContainer
from spark_rapids_pytools.rapids.tool_ctxt import ToolContext
from spark_rapids_tools import CspEnv
from spark_rapids_tools.enums import HashAlgorithm
from spark_rapids_tools.configuration.common import RuntimeDependency
from spark_rapids_tools.configuration.tools_config import ToolsConfig
from spark_rapids_tools.enums import DependencyType
from spark_rapids_tools.storagelib import LocalPath, CspFs
from spark_rapids_tools.storagelib.tools.fs_utils import untar_file, FileHashAlgorithm
from spark_rapids_tools.storagelib.tools.fs_utils import untar_file
from spark_rapids_tools.utils import Utilities
from spark_rapids_tools.utils.net_utils import DownloadTask

Expand Down Expand Up @@ -70,6 +72,13 @@ class RapidsTool(object):
logger: Logger = field(default=None, init=False)
spinner: ToolsSpinner = field(default=None, init=False)

def get_tools_config_obj(self) -> Optional['ToolsConfig']:
"""
Get the tools configuration object if provided in the CLI arguments.
:return: An object containing all the tools configuration or None if not provided.
"""
return self.wrapper_options.get('toolsConfig')

def pretty_name(self):
return self.name.capitalize()

Expand Down Expand Up @@ -136,7 +145,7 @@ def _check_environment(self) -> None:

def _process_output_args(self):
self.logger.debug('Processing Output Arguments')
# make sure that output_folder is being absolute
# make sure output_folder is absolute
if self.output_folder is None:
self.output_folder = Utils.get_rapids_tools_env('OUTPUT_DIRECTORY', os.getcwd())
try:
Expand Down Expand Up @@ -393,7 +402,8 @@ def _calculate_spark_settings(self, worker_info: NodeHWInfo) -> dict:
return res

@classmethod
def get_rapids_tools_dependencies(cls, deploy_mode: str, json_props: AbstractPropertiesContainer) -> Optional[list]:
def get_rapids_tools_dependencies(cls, deploy_mode: str,
json_props: AbstractPropertiesContainer) -> Optional[list]:
"""
Get the tools dependencies from the platform configuration.
"""
Expand All @@ -403,7 +413,9 @@ def get_rapids_tools_dependencies(cls, deploy_mode: str, json_props: AbstractPro
depend_arr = json_props.get_value_silent('dependencies', 'deployMode', deploy_mode, active_buildver)
if depend_arr is None:
raise ValueError(f'Invalid SPARK dependency version [{active_buildver}]')
return depend_arr
# convert the json array to a list of RuntimeDependency objects
runtime_dep_arr = [RuntimeDependency(**dep) for dep in depend_arr]
return runtime_dep_arr


@dataclass
Expand Down Expand Up @@ -532,47 +544,46 @@ def exception_handler(future):
if exception:
self.logger.error('Error while downloading dependency: %s', exception)

def cache_single_dependency(dep: dict) -> str:
def cache_single_dependency(dep: RuntimeDependency) -> str:
"""
Downloads the specified URL and saves it to disk
"""
self.logger.info('Checking dependency %s', dep['name'])
self.logger.info('Checking dependency %s', dep.name)
dest_folder = self.ctxt.get_cache_folder()
verify_opts = {}
dep_verification = dep.get('verification')
if dep_verification is not None:
if 'size' in dep_verification:
verify_opts['size'] = dep_verification['size']
hash_lib_alg = dep_verification.get('hashLib')
if hash_lib_alg:
verify_opts['file_hash'] = FileHashAlgorithm(HashAlgorithm(hash_lib_alg['type']),
hash_lib_alg['value'])
download_task = DownloadTask(src_url=dep['uri'], # pylint: disable=no-value-for-parameter)
if dep.verification is not None:
verify_opts = dict(dep.verification)
download_task = DownloadTask(src_url=dep.uri, # pylint: disable=no-value-for-parameter)
dest_folder=dest_folder,
verification=verify_opts)
download_result = download_task.run_task()
self.logger.info('Completed downloading of dependency [%s] => %s',
dep['name'],
dep.name,
f'{download_result.pretty_print()}')
if not download_result.success:
msg = f'Failed to download dependency {dep["name"]}, reason: {download_result.download_error}'
msg = f'Failed to download dependency {dep.name}, reason: {download_result.download_error}'
raise RuntimeError(f'Could not download all dependencies. Aborting Executions.\n\t{msg}')
destination_path = self.ctxt.get_local_work_dir()
destination_cspath = LocalPath(destination_path)
if dep['type'] == 'archive':
# set the default dependency type to jar
defined_dep_type = DependencyType.get_default()
if dep.dependency_type:
defined_dep_type = dep.dependency_type.dep_type
if defined_dep_type == DependencyType.ARCHIVE:
uncompressed_cspath = untar_file(download_result.resource, destination_cspath)
dep_item = uncompressed_cspath.no_scheme
relative_path = dep.get('relativePath')
if relative_path is not None:
dep_item = f'{dep_item}/{relative_path}'
else:
if dep.dependency_type.relative_path is not None:
dep_item = f'{dep_item}/{dep.dependency_type.relative_path}'
elif defined_dep_type == DependencyType.JAR:
# copy the jar into dependency folder
CspFs.copy_resources(download_result.resource, destination_cspath)
final_dep_csp = destination_cspath.create_sub_path(download_result.resource.base_name())
dep_item = final_dep_csp.no_scheme
else:
raise ValueError(f'Invalid dependency type [{defined_dep_type}]')
return dep_item

def cache_all_dependencies(dep_arr: List[dict]):
def cache_all_dependencies(dep_arr: List[RuntimeDependency]) -> List[str]:
"""
Create a thread pool and download specified urls
"""
Expand All @@ -593,8 +604,19 @@ def cache_all_dependencies(dep_arr: List[dict]):
raise ex
return results

deploy_mode = DeployMode.tostring(self.ctxt.get_deploy_mode())
depend_arr = self.get_rapids_tools_dependencies(deploy_mode, self.ctxt.platform.configs)
def populate_dependency_list() -> List[RuntimeDependency]:
# check if the dependencies is defined in a config file
config_obj = self.get_tools_config_obj()
if config_obj is not None:
if config_obj.runtime.dependencies:
return config_obj.runtime.dependencies
self.logger.info('The ToolsConfig did not specify the dependencies. '
'Falling back to the default dependencies.')
# load dependency list from the platform configuration
deploy_mode = DeployMode.tostring(self.ctxt.get_deploy_mode())
return self.get_rapids_tools_dependencies(deploy_mode, self.ctxt.platform.configs)

depend_arr = populate_dependency_list()
if depend_arr:
dep_list = cache_all_dependencies(depend_arr)
if any(dep_item is None for dep_item in dep_list):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,23 @@
"name": "Apache Spark",
"uri": "https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz",
"verification": {
"hashLib": {
"type": "sha512",
"fileHash": {
"algorithm": "sha512",
"value": "8883c67e0a138069e597f3e7d4edbbd5c3a565d50b28644aad02856a1ec1da7cb92b8f80454ca427118f69459ea326eaa073cf7b1a860c3b796f4b07c2101319"
},
"size": 400395283
},
"type": "archive",
"relativePath": "jars/*"
"dependencyType": {
"depType": "archive",
"relativePath": "jars/*"
}
},
{
"name": "Hadoop AWS",
"uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "a65839fbf1869f81a1632e09f415e586922e4f80"
},
"size": 962685
Expand All @@ -33,8 +35,8 @@
"name": "AWS Java SDK Bundled",
"uri": "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "02deec3a0ad83d13d032b1812421b23d7a961eea"
},
"size": 280645251
Expand All @@ -47,38 +49,38 @@
"name": "Apache Spark",
"uri": "https://archive.apache.org/dist/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz",
"verification": {
"hashLib": {
"type": "sha512",
"fileHash": {
"algorithm": "sha512",
"value": "ebf79c7861f3120d5ed9465fdd8d5302a734ff30713a0454b714bbded7ab9f218b3108dc46a5de4cc2102c86e7be53908f84d2c7a19e59bc75880766eeefeef9"
},
"size": 299426263
},
"type": "archive",
"relativePath": "jars/*"
"dependencyType": {
"depType": "archive",
"relativePath": "jars/*"
}
},
{
"name": "Hadoop AWS",
"uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "a65839fbf1869f81a1632e09f415e586922e4f80"
},
"size": 962685
},
"type": "jar"
}
},
{
"name": "AWS Java SDK Bundled",
"uri": "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "02deec3a0ad83d13d032b1812421b23d7a961eea"
},
"size": 280645251
},
"type": "jar"
}
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,53 +8,55 @@
"name": "Apache Spark",
"uri": "https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz",
"verification": {
"hashLib": {
"type": "sha512",
"fileHash": {
"algorithm": "sha512",
"value": "8883c67e0a138069e597f3e7d4edbbd5c3a565d50b28644aad02856a1ec1da7cb92b8f80454ca427118f69459ea326eaa073cf7b1a860c3b796f4b07c2101319"
},
"size": 400395283
},
"type": "archive",
"relativePath": "jars/*"
"dependencyType": {
"depType": "archive",
"relativePath": "jars/*"
}
},
{
"name": "Hadoop Azure",
"uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.4/hadoop-azure-3.3.4.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "a23f621bca9b2100554150f6b0b521f94b8b419e"
},
"size": 574116
},
"type": "jar"
}
}
],
"333": [
{
"name": "Apache Spark",
"uri": "https://archive.apache.org/dist/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz",
"verification": {
"hashLib": {
"type": "sha512",
"fileHash": {
"algorithm": "sha512",
"value": "ebf79c7861f3120d5ed9465fdd8d5302a734ff30713a0454b714bbded7ab9f218b3108dc46a5de4cc2102c86e7be53908f84d2c7a19e59bc75880766eeefeef9"
},
"size": 299426263
},
"type": "archive",
"relativePath": "jars/*"
"dependencyType": {
"depType": "archive",
"relativePath": "jars/*"
}
},
{
"name": "Hadoop Azure",
"uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.4/hadoop-azure-3.3.4.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "a23f621bca9b2100554150f6b0b521f94b8b419e"
},
"size": 574116
},
"type": "jar"
}
}
]
}
Expand Down
34 changes: 18 additions & 16 deletions user_tools/src/spark_rapids_pytools/resources/dataproc-configs.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,53 +8,55 @@
"name": "Apache Spark",
"uri": "https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz",
"verification": {
"hashLib": {
"type": "sha512",
"fileHash": {
"algorithm": "sha512",
"value": "8883c67e0a138069e597f3e7d4edbbd5c3a565d50b28644aad02856a1ec1da7cb92b8f80454ca427118f69459ea326eaa073cf7b1a860c3b796f4b07c2101319"
},
"size": 400395283
},
"type": "archive",
"relativePath": "jars/*"
"dependencyType": {
"depType": "archive",
"relativePath": "jars/*"
}
},
{
"name": "GCS Connector Hadoop3",
"uri": "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.19/gcs-connector-hadoop3-2.2.19-shaded.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "3bea6d5e62663a2a5c03d8ca44dff4921aeb3170"
},
"size": 39359477
},
"type": "jar"
}
}
],
"333": [
{
"name": "Apache Spark",
"uri": "https://archive.apache.org/dist/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz",
"verification": {
"hashLib": {
"type": "sha512",
"fileHash": {
"algorithm": "sha512",
"value": "ebf79c7861f3120d5ed9465fdd8d5302a734ff30713a0454b714bbded7ab9f218b3108dc46a5de4cc2102c86e7be53908f84d2c7a19e59bc75880766eeefeef9"
},
"size": 299426263
},
"type": "archive",
"relativePath": "jars/*"
"dependencyType": {
"depType": "archive",
"relativePath": "jars/*"
}
},
{
"name": "GCS Connector Hadoop3",
"uri": "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.17/gcs-connector-hadoop3-2.2.17-shaded.jar",
"verification": {
"hashLib": {
"type": "sha1",
"fileHash": {
"algorithm": "sha1",
"value": "06438f562692ff8fae5e8555eba2b9f95cb74f66"
},
"size": 38413466
},
"type": "jar"
}
}
]
}
Expand Down
Loading

0 comments on commit ca6cac6

Please sign in to comment.