From d1c6edbb3fbf8ccb9b665ffec3f5b6185676def0 Mon Sep 17 00:00:00 2001
From: Edmund Higham
Date: Thu, 30 Jan 2025 13:28:51 -0500
Subject: [PATCH] [hailctl] batch submit fixes

CHANGELOG: Fix many issues, including (hail#14274), with hailctl batch
submit introduced in 0.2.127.

Fixes #14274, Replaces #14351 (authored by @jigold)
---
 build.yaml                                     |  20 +-
 hail/python/hailtop/hailctl/batch/cli.py       |  33 ++-
 hail/python/hailtop/hailctl/batch/submit.py    | 245 ++++++++++++------
 .../test/hailtop/hailctl/batch/__init__.py     |   0
 .../test/hailtop/hailctl/batch/test_submit.py  | 154 +++++++++++
 5 files changed, 357 insertions(+), 95 deletions(-)
 create mode 100644 hail/python/test/hailtop/hailctl/batch/__init__.py
 create mode 100644 hail/python/test/hailtop/hailctl/batch/test_submit.py
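
Illustration (not part of the diff): a minimal, standard-library sketch of the
`--files SRC[:DEST]` resolution rules that the new
parse_files_option_to_src_dest_and_cloud_intermediate in submit.py implements.
The helper name resolve_files_option and the example working directory /work
are hypothetical; the real code additionally expands `~`, uploads SRC to a
per-submission cloud intermediate, and symlinks it to DEST inside the job.

    # Illustrative sketch only; not shipped by this patch.
    import os

    def resolve_files_option(spec: str, cwd: str):
        src, _, dest = spec.partition(':')
        src_is_dir = src.endswith('/')  # trailing slash marks a directory, as in real_absolute_expanded_path
        src = os.path.realpath(os.path.join(cwd, src))
        if not dest:
            # no DEST given: the input lands in the job's working directory
            return src, os.path.join(cwd, os.path.basename(src))
        dest_is_dir = dest.endswith('/')
        dest = os.path.realpath(os.path.join(cwd, dest))
        if src_is_dir and dest == '/':
            raise ValueError('cannot mount a directory to "/"')
        if not src_is_dir and dest_is_dir:
            dest = os.path.join(dest, os.path.basename(src))
        return src, dest

    # With cwd='/work':
    #   resolve_files_option('hello.txt', '/work')          -> ('/work/hello.txt', '/work/hello.txt')
    #   resolve_files_option('hello.txt:/child/', '/work')  -> ('/work/hello.txt', '/child/hello.txt')
    #   resolve_files_option('child/:/', '/work')           -> ValueError
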
diff --git a/build.yaml b/build.yaml
index 327582ddff9..6852b96499e 100644
--- a/build.yaml
+++ b/build.yaml
@@ -3060,9 +3060,7 @@ steps:
           BATCH_ID=$(hailctl batch submit simple_hail.py --name=test-hailctl-batch-submit --files=foo -o json | jq '.id')
           STATUS=$(hailctl batch wait -o json $BATCH_ID)
           STATE=$(echo $STATUS | jq -jr '.state')
-          if [ "$STATE" == "success" ]; then
-            exit 0;
-          else
+          if [ "$STATE" != "success" ]; then
             echo $STATUS;
             exit 1;
           fi
@@ -3081,21 +3079,19 @@ steps:
           BATCH_ID=$(hailctl batch submit --name=test-hailctl-batch-submit --files=foo -o json hail_with_args.py 100 | jq '.id')
           STATUS=$(hailctl batch wait -o json $BATCH_ID)
           STATE=$(echo $STATUS | jq -jr '.state')
-          if [ "$STATE" == "success" ]; then
-            exit 0;
-          else
+          if [ "$STATE" != "success" ]; then
             echo $STATUS;
             exit 1;
           fi
 
-          cat >file.sh <<EOF
+          cat > file.sh << 'EOF'
           set -ex
-          cat foo
+          cat foo/baz.txt
           echo "Hello World!"
           EOF
 
-          BATCH_ID=$(hailctl batch submit --name=test-hailctl-batch-submit --files=foo -o json file.sh | jq '.id')
+          BATCH_ID=$(hailctl batch submit --name=test-hailctl-batch-submit --files=foo -o json --image-name ubuntu:22.04 file.sh | jq '.id')
           STATUS=$(hailctl batch wait -o json $BATCH_ID)
           STATE=$(echo $STATUS | jq -jr '.state')
           if [ "$STATE" == "success" ]; then
@@ -3114,12 +3110,10 @@ steps:
           echo "Hello World! $1 $2"
           EOF
 
-          BATCH_ID=$(hailctl batch submit --name=test-hailctl-batch-submit --files=foo -o json file-with-args.sh abc 123 | jq '.id')
+          BATCH_ID=$(hailctl batch submit --name=test-hailctl-batch-submit --files=foo -o json --image-name ubuntu:22.04 file-with-args.sh abc 123 | jq '.id')
           STATUS=$(hailctl batch wait -o json $BATCH_ID)
           STATE=$(echo $STATUS | jq -jr '.state')
-          if [ "$STATE" == "success" ]; then
-            exit 0;
-          else
+          if [ "$STATE" != "success" ]; then
             echo $STATUS;
             exit 1;
           fi
diff --git a/hail/python/hailtop/hailctl/batch/cli.py b/hail/python/hailtop/hailctl/batch/cli.py
index c10650f02f1..55a0188b504 100644
--- a/hail/python/hailtop/hailctl/batch/cli.py
+++ b/hail/python/hailtop/hailctl/batch/cli.py
@@ -1,15 +1,14 @@
 import asyncio
-import json
 from enum import Enum
 from typing import Annotated as Ann
 from typing import Any, Dict, List, Optional, cast
 
+import orjson
 import typer
 from typer import Argument as Arg
 from typer import Option as Opt
 
 from . import billing, list_batches
-from . import submit as _submit
 from .batch_cli_utils import (
     ExtendedOutputFormat,
     ExtendedOutputFormatOption,
@@ -131,7 +130,7 @@ def wait(
     quiet = quiet or output != StructuredFormatPlusText.TEXT
     out = batch.wait(disable_progress_bar=quiet)
     if output == StructuredFormatPlusText.JSON:
-        print(json.dumps(out))
+        print(orjson.dumps(out).decode('utf-8'))
     else:
         print(out)
 
@@ -168,16 +167,40 @@ def submit(
     name: Ann[str, Opt(help='The name of the batch.')] = '',
     image_name: Ann[Optional[str], Opt(help='Name of Docker image for the job (default: hailgenetics/hail)')] = None,
     output: StructuredFormatPlusTextOption = StructuredFormatPlusText.TEXT,
+    wait: Ann[bool, Opt(help='Wait for the batch to complete.')] = False,
 ):
     """Submit a batch with a single job that runs SCRIPT with the arguments ARGUMENTS.
 
     If you wish to pass option-like arguments you should use "--". For example:
 
+    $ hailctl batch submit --image-name docker.io/image my_script.py -- some-argument --animal dog
+
+    Copy a local file into the working directory of the job:
+
+    $ hailctl batch submit --image-name docker.io/image my_script.py --files a-file -- some-argument --animal dog
+
+    Copy a local file into a particular directory in the job:
+
+    $ hailctl batch submit --image-name docker.io/image my_script.py --files a-file:/foo/bar/ -- some-argument --animal dog
 
-    $ hailctl batch submit --image-name docker.io/image my_script.py -- some-argument --animal dog
+    Copy a local directory to the directory /foo/bar/a-directory in the job:
+
+    $ hailctl batch submit --image-name docker.io/image my_script.py --files a-directory:/foo/bar/ -- some-argument --animal dog
+
+    Copy a local file or a directory to a specific location in the job:
+
+    $ hailctl batch submit --image-name docker.io/image my_script.py --files a/local/path:/foo/bar -- some-argument --animal dog
+
+    Copy a local directory to a specific location in the job:
+
+    $ hailctl batch submit --image-name docker.io/image my_script.py --files a-directory:/foo/bar -- some-argument --animal dog
+
+    Notes
+    -----
+    Copying a local directory to the root directory in the job is not supported (example: ``--files my-local-dir/:/``).
""" - asyncio.run(_submit.submit(name, image_name, files or [], output, script, [*(arguments or []), *ctx.args])) + from .submit import HailctlBatchSubmitError # pylint: disable=import-outside-toplevel + from .submit import submit as _submit # pylint: disable=import-outside-toplevel + + try: + asyncio.run(_submit(name, image_name, files or [], output, script, [*(arguments or []), *ctx.args], wait)) + except HailctlBatchSubmitError as err: + print(err.message) + raise typer.Exit(err.exit_code) @app.command('init', help='Initialize a Hail Batch environment.') diff --git a/hail/python/hailtop/hailctl/batch/submit.py b/hail/python/hailtop/hailctl/batch/submit.py index 234ec62ac52..fba990a2d76 100644 --- a/hail/python/hailtop/hailctl/batch/submit.py +++ b/hail/python/hailtop/hailctl/batch/submit.py @@ -1,108 +1,199 @@ import os import re +from contextlib import AsyncExitStack from shlex import quote as shq -from typing import Tuple +from typing import List, Optional, Tuple import orjson +import typer + +import hailtop.batch as hb +from hailtop import yamlx +from hailtop.aiotools.copy import copy_from_dict +from hailtop.aiotools.fs import AsyncFSURL +from hailtop.aiotools.router_fs import RouterAsyncFS +from hailtop.batch.job import BashJob +from hailtop.config import ( + get_deploy_config, + get_remote_tmpdir, + get_user_config_path, +) +from hailtop.utils import ( + secret_alnum_string, + unpack_comma_delimited_inputs, +) +from hailtop.version import __pip_version__ + +from .batch_cli_utils import StructuredFormatPlusTextOption + + +def real_absolute_expanded_path(path: str) -> Tuple[str, bool]: + had_trailing_slash = path[-1] == '/' # NB: realpath removes trailing slash + return os.path.realpath(os.path.abspath(os.path.expanduser(path))), had_trailing_slash + + +def real_absolute_cwd() -> str: + return real_absolute_expanded_path(os.getcwd())[0] + + +class HailctlBatchSubmitError(Exception): + def __init__(self, message: str, exit_code: int): + self.message = message + self.exit_code = exit_code + + +async def submit( + name: str, + image_name: Optional[str], + files_options: List[str], + output: StructuredFormatPlusTextOption, + script: str, + arguments: List[str], + wait: bool, +): + files_options = unpack_comma_delimited_inputs(files_options) -from hailtop import __pip_version__ + quiet = output != 'text' -FILE_REGEX = re.compile(r'(?P[^:]+)(:(?P.+))?') + async with AsyncExitStack() as exitstack: + fs = RouterAsyncFS() + exitstack.push_async_callback(fs.close) + remote_tmpdir = fs.parse_url(get_remote_tmpdir('hailctl batch submit')).with_new_path_component( + secret_alnum_string() + ) -async def submit(name, image_name, files, output, script, arguments): - import hailtop.batch as hb # pylint: disable=import-outside-toplevel - from hailtop.aiotools.copy import copy_from_dict # pylint: disable=import-outside-toplevel - from hailtop.config import ( # pylint: disable=import-outside-toplevel - get_deploy_config, - get_remote_tmpdir, - get_user_config_path, - ) - from hailtop.utils import ( # pylint: disable=import-outside-toplevel - secret_alnum_string, - unpack_comma_delimited_inputs, - ) + backend = hb.ServiceBackend() + exitstack.push_async_callback(backend._async_close) - files = unpack_comma_delimited_inputs(files) - user_config = str(get_user_config_path()) + b = hb.Batch(name=name, backend=backend) + j = b.new_bash_job() + j.image(image_name or os.environ.get('HAIL_GENETICS_HAIL_IMAGE', f'hailgenetics/hail:{__pip_version__}')) + j.env('HAIL_QUERY_BACKEND', 'batch') - quiet = output != 
+        await transfer_files_options_files_into_job(remote_tmpdir, files_options, j, b)
 
-    remote_tmpdir = get_remote_tmpdir('hailctl batch submit')
+        script_cloud_file, user_config_cloud_file = await upload_script_and_user_config(remote_tmpdir, script)
+        if user_config_cloud_file is not None:
+            config_file = b.read_input(user_config_cloud_file)
+            j.command('mkdir -p $HOME/.config/hail')
+            j.command(f'ln -s {shq(config_file)} $HOME/.config/hail/config.ini')
 
-    tmpdir_path_prefix = secret_alnum_string()
+        real_cwd = real_absolute_cwd()
+        j.command(f'mkdir -p {shq(real_cwd)}')
+        j.command(f'cd {shq(real_cwd)}')
 
-    def cloud_prefix(path):
-        path = path.lstrip('/')
-        return f'{remote_tmpdir}/{tmpdir_path_prefix}/{path}'
+        command = 'python3' if script.endswith('.py') else 'bash'
+        script_file = b.read_input(script_cloud_file)
+        script_arguments = " ".join(shq(x) for x in arguments)
+        j.command(f'{command} {script_file} {script_arguments}')
 
-    def file_input_to_src_dest(file: str) -> Tuple[str, str, str]:
-        match = FILE_REGEX.match(file)
-        if match is None:
-            raise ValueError(f'invalid file specification {file}. Must have the form "src" or "src:dest"')
+        batch_handle = await b._async_run(wait=False, disable_progress_bar=quiet)
+        assert batch_handle
 
-        result = match.groupdict()
+        if output == 'text':
+            deploy_config = get_deploy_config()
+            url = deploy_config.external_url('batch', f'/batches/{batch_handle.id}/jobs/1')
+            print(f'Submitted batch {batch_handle.id}, see {url}')
+        else:
+            assert output == 'json'
+            print(orjson.dumps({'id': batch_handle.id}).decode('utf-8'))
 
-        src = result.get('src')
-        if src is None:
-            raise ValueError(f'invalid file specification {file}. Must have a "src" defined.')
-        src = os.path.abspath(os.path.expanduser(src))
-        src = src.rstrip('/')
+        if wait:
+            out = batch_handle.wait(disable_progress_bar=quiet)
+            try:
+                out['log'] = batch_handle.get_job_log(1)['main']
+            except:
+                out['log'] = 'Could not retrieve job log.'
+            if output == 'text':
+                print(yamlx.dump(out))
+            else:
+                print(orjson.dumps(out))
+            if out['state'] != 'success':
+                raise typer.Exit(1)
 
-        dest = result.get('dest')
-        if dest is not None:
-            dest = os.path.abspath(os.path.expanduser(dest))
-        else:
-            dest = os.getcwd()
-        cloud_file = cloud_prefix(src)
+def cloud_prefix(remote_tmpdir: AsyncFSURL, path: str) -> str:
+    path = path.lstrip('/')
+    return str(remote_tmpdir.with_new_path_component(path))
 
-        return (src, dest, cloud_file)
 
-    backend = hb.ServiceBackend()
-    b = hb.Batch(name=name, backend=backend)
-    j = b.new_bash_job()
-    j.image(image_name or os.environ.get('HAIL_GENETICS_HAIL_IMAGE', f'hailgenetics/hail:{__pip_version__}'))
+FILE_REGEX = re.compile(r'([^:]+)(?::(.+))?')
 
-    local_files_to_cloud_files = []
-    for file in files:
-        src, dest, cloud_file = file_input_to_src_dest(file)
-        local_files_to_cloud_files.append({'from': src, 'to': cloud_file})
-        in_file = b.read_input(cloud_file)
-        j.command(f'mkdir -p {os.path.dirname(dest)}; ln -s {in_file} {dest}')
 
+def parse_files_option_to_src_dest_and_cloud_intermediate(remote_tmpdir: AsyncFSURL, file: str) -> Tuple[str, str, str]:
+    match = FILE_REGEX.match(file)
+    if match is None:
+        raise ValueError(f'invalid file specification {file}. Must have the form "src" or "src:dest"')
Must have the form "src" or "src:dest"') - script_src, _, script_cloud_file = file_input_to_src_dest(script) - user_config_src, _, user_config_cloud_file = file_input_to_src_dest(user_config) + src, dest = match.groups() - await copy_from_dict(files=local_files_to_cloud_files) - await copy_from_dict( - files=[ - {'from': script_src, 'to': script_cloud_file}, - {'from': user_config_src, 'to': user_config_cloud_file}, - ] - ) + if src is None: + raise ValueError(f'invalid file specification {file}. Must have a "src" defined.') - script_file = b.read_input(script_cloud_file) - config_file = b.read_input(user_config_cloud_file) + src, src_looks_like_directory = real_absolute_expanded_path(src) - j.env('HAIL_QUERY_BACKEND', 'batch') + if dest is None: + dest = os.path.join(real_absolute_cwd(), os.path.basename(src)) + else: + dest, dest_looks_like_directory = real_absolute_expanded_path(dest) - command = 'python3' if script.endswith('.py') else 'bash' - script_arguments = " ".join(shq(x) for x in arguments) + if src_looks_like_directory and dest == '/': + raise ValueError('cannot mount a directory to "/"') - j.command(f'mkdir -p $HOME/.config/hail && ln -s {config_file} $HOME/.config/hail/config.ini') - j.command(f'cd {os.getcwd()}') - j.command(f'{command} {script_file} {script_arguments}') - batch_handle = await b._async_run(wait=False, disable_progress_bar=quiet) - assert batch_handle + if not src_looks_like_directory and dest_looks_like_directory: + dest = os.path.join(dest, os.path.basename(src)) - if output == 'text': - deploy_config = get_deploy_config() - url = deploy_config.external_url('batch', f'/batches/{batch_handle.id}/jobs/1') - print(f'Submitted batch {batch_handle.id}, see {url}') - else: - assert output == 'json' - print(orjson.dumps({'id': batch_handle.id}).decode('utf-8')) + return (src, dest, cloud_prefix(remote_tmpdir, src)) + + +async def transfer_files_options_files_into_job( + remote_tmpdir: AsyncFSURL, files_options: List[str], j: BashJob, b: hb.Batch +): + src_dst_cloud_intermediate_triplets = [ + parse_files_option_to_src_dest_and_cloud_intermediate(remote_tmpdir, files_option) + for files_option in files_options + ] + + if non_existing_files := [src for src, _, _ in src_dst_cloud_intermediate_triplets if not os.path.exists(src)]: + non_existing_files_str = '- ' + '\n- '.join(non_existing_files) + raise HailctlBatchSubmitError(f'Some --files did not exist:\n{non_existing_files_str}', 1) + + await copy_from_dict( + files=[ + {'from': src, 'to': cloud_intermediate} + for src, _, cloud_intermediate in src_dst_cloud_intermediate_triplets + ] + ) - await backend.async_close() + for _, dest, cloud_intermediate in src_dst_cloud_intermediate_triplets: + in_file = b.read_input(cloud_intermediate) + j.command(f'mkdir -p {shq(os.path.dirname(dest))}; ln -s {shq(in_file)} {shq(dest)}') + + +async def upload_script_and_user_config(remote_tmpdir: AsyncFSURL, script: str): + if not os.path.exists(script): + raise HailctlBatchSubmitError(f'Script file does not exist: {script}', 1) + script_src, _, script_cloud_file = parse_files_option_to_src_dest_and_cloud_intermediate(remote_tmpdir, script) + + extra_files_to_copy = [ + # In Azure, two concurrent uploads to the same blob path race until one succeeds. The + # other fails with an error that is indistinguishable from a client-side logic + # error. 
We could treat "Invalid Block List" as a limited retry error, but, currently, + # multi-part-create does not retry when an error occurs in the `__aexit__` which is + # when we would experience the error for multi-part-creates. + # https://azure.github.io/Storage/docs/application-and-user-data/code-samples/concurrent-uploads-with-versioning/ + # https://github.com/hail-is/hail/pull/13812#issuecomment-1882088862 + {'from': script_src, 'to': script_cloud_file}, + ] + + user_config_path = str(get_user_config_path()) + user_config_cloud_file = None + if os.path.exists(user_config_path): + user_config_src, _, user_config_cloud_file = parse_files_option_to_src_dest_and_cloud_intermediate( + remote_tmpdir, user_config_path + ) + extra_files_to_copy.append({'from': user_config_src, 'to': user_config_cloud_file}) + + await copy_from_dict(files=extra_files_to_copy) + return script_cloud_file, user_config_cloud_file diff --git a/hail/python/test/hailtop/hailctl/batch/__init__.py b/hail/python/test/hailtop/hailctl/batch/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/hail/python/test/hailtop/hailctl/batch/test_submit.py b/hail/python/test/hailtop/hailctl/batch/test_submit.py new file mode 100644 index 00000000000..e5a148f7b41 --- /dev/null +++ b/hail/python/test/hailtop/hailctl/batch/test_submit.py @@ -0,0 +1,154 @@ +import os +import tempfile + +import pytest +from typer.testing import CliRunner + +from hailtop.hailctl.batch import cli +from hailtop.hailctl.batch.submit import real_absolute_expanded_path + + +@pytest.fixture +def runner(): + yield CliRunner(mix_stderr=False) + + +def write_script(dir: str, filename: str): + with open(f'{dir}/test_job.py', 'w') as f: + file, _ = real_absolute_expanded_path(filename) + f.write(f""" +print(open("{file}").read()) +""") + + +def write_hello(filename: str): + os.makedirs(os.path.dirname(filename), exist_ok=True) + with open(filename, 'w') as f: + f.write('hello\n') + + +@pytest.mark.timeout(5 * 60) # image pulling is very slow +def test_file_with_no_dest(runner: CliRunner): + with tempfile.TemporaryDirectory() as dir: + os.chdir(dir) + write_hello(f'{dir}/hello.txt') + write_script(dir, f'{dir}/hello.txt') + res = runner.invoke( + cli.app, ['submit', '--wait', '--files', 'hello.txt', 'test_job.py'], catch_exceptions=False + ) + assert res.exit_code == 0, repr((res.output, res.stdout, res.stderr, res.exception)) + + +@pytest.mark.timeout(5 * 60) # image pulling is very slow +def test_file_in_current_dir(runner: CliRunner): + with tempfile.TemporaryDirectory() as dir: + os.chdir(dir) + write_hello(f'{dir}/hello.txt') + write_script(dir, f'/hello.txt') + res = runner.invoke( + cli.app, ['submit', '--wait', '--files', 'hello.txt:/', 'test_job.py'], catch_exceptions=False + ) + assert res.exit_code == 0, repr((res.output, res.stdout, res.stderr, res.exception)) + + +@pytest.mark.timeout(5 * 60) # image pulling is very slow +def test_file_mount_in_child_dir(runner: CliRunner): + with tempfile.TemporaryDirectory() as dir: + os.chdir(dir) + write_hello(f'{dir}/hello.txt') + write_script(dir, '/child/hello.txt') + res = runner.invoke( + cli.app, ['submit', '--wait', '--files', 'hello.txt:/child/', 'test_job.py'], catch_exceptions=False + ) + assert res.exit_code == 0, repr((res.output, res.stdout, res.stderr, res.exception)) + + +@pytest.mark.timeout(5 * 60) # image pulling is very slow +def test_file_mount_in_child_dir_to_root_dir(runner: CliRunner): + with tempfile.TemporaryDirectory() as dir: + os.chdir(dir) + 
+        write_hello(f'{dir}/child/hello.txt')
+        write_script(dir, '/hello.txt')
+        res = runner.invoke(
+            cli.app, ['submit', '--wait', '--files', 'child/hello.txt:/', 'test_job.py'], catch_exceptions=False
+        )
+        assert res.exit_code == 0, repr((res.output, res.stdout, res.stderr, res.exception))
+
+
+@pytest.mark.timeout(5 * 60)  # image pulling is very slow
+def test_mount_multiple_files(runner: CliRunner):
+    with tempfile.TemporaryDirectory() as dir:
+        os.chdir(dir)
+        write_hello(f'{dir}/child/hello1.txt')
+        write_hello(f'{dir}/child/hello2.txt')
+        write_script(dir, '/hello1.txt')
+        res = runner.invoke(
+            cli.app,
+            [
+                'submit',
+                '--wait',
+                '--files',
+                'child/hello1.txt:/',
+                '--files',
+                'child/hello2.txt:/',
+                'test_job.py',
+            ],
+            catch_exceptions=False,
+        )
+        assert res.exit_code == 0, repr((res.output, res.stdout, res.stderr, res.exception))
+
+
+@pytest.mark.timeout(5 * 60)  # image pulling is very slow
+def test_dir_mount_in_child_dir_to_child_dir(runner: CliRunner):
+    with tempfile.TemporaryDirectory() as dir:
+        os.chdir(dir)
+        write_hello(f'{dir}/child/hello1.txt')
+        write_hello(f'{dir}/child/hello2.txt')
+        write_script(dir, '/child/hello1.txt')
+        res = runner.invoke(
+            cli.app, ['submit', '--wait', '--files', 'child/:/child/', 'test_job.py'], catch_exceptions=False
+        )
+        assert res.exit_code == 0, repr((res.output, res.stdout, res.stderr, res.exception))
+
+
+@pytest.mark.timeout(5 * 60)  # image pulling is very slow
+def test_file_outside_curdir(runner: CliRunner):
+    with tempfile.TemporaryDirectory() as dir:
+        os.mkdir(f'{dir}/working_dir')
+        os.chdir(f'{dir}/working_dir')
+        write_hello(f'{dir}/hello.txt')
+        write_script(dir, '/hello.txt')
+        res = runner.invoke(
+            cli.app, ['submit', '--wait', '--files', f'{dir}/hello.txt:/', '../test_job.py'], catch_exceptions=False
+        )
+        assert res.exit_code == 0, repr((res.output, res.stdout, res.stderr, res.exception))
+
+
+@pytest.mark.timeout(5 * 60)  # image pulling is very slow
+def test_dir_outside_curdir(runner: CliRunner):
+    with tempfile.TemporaryDirectory() as dir:
+        os.mkdir(f'{dir}/working_dir')
+        os.chdir(f'{dir}/working_dir')
+        write_hello(f'{dir}/hello1.txt')
+        write_hello(f'{dir}/hello2.txt')
+
+        write_script(dir, f'/foo/hello1.txt')
+
+        res = runner.invoke(
+            cli.app, ['submit', '--wait', '--files', f'{dir}/:/foo/', '../test_job.py'], catch_exceptions=False
+        )
+        assert res.exit_code == 0, repr((res.output, res.stdout, res.stderr, res.exception))
+
+
+@pytest.mark.timeout(5 * 60)  # image pulling is very slow
+def test_mounting_dir_to_root_dir_fails(runner: CliRunner):
+    with tempfile.TemporaryDirectory() as dir:
+        write_hello(f'{dir}/hello1.txt')
+        write_hello(f'{dir}/hello2.txt')
+
+        write_script(dir, '/hello1.txt')
+
+        with pytest.raises(ValueError, match='cannot mount a directory to "/"'):
+            runner.invoke(
+                cli.app, ['submit', '--wait', '--files', f'{dir}/:/', '../test_job.py'], catch_exceptions=False
+            )