Skip to content

Commit

Permalink
[hailctl] batch submit fixes
Browse files Browse the repository at this point in the history
CHANGELOG: Fix many issues with `hailctl batch submit` introduced in 0.2.127, including hail#14274.
Fixes hail-is#14274; replaces hail-is#14351 (authored by @jigold)
  • Loading branch information
ehigham committed Jan 30, 2025
1 parent 5d9c642 commit a2a376e
Show file tree
Hide file tree
Showing 5 changed files with 357 additions and 95 deletions.
20 changes: 7 additions & 13 deletions build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3060,9 +3060,7 @@ steps:
BATCH_ID=$(hailctl batch submit simple_hail.py --name=test-hailctl-batch-submit --files=foo -o json | jq '.id')
STATUS=$(hailctl batch wait -o json $BATCH_ID)
STATE=$(echo $STATUS | jq -jr '.state')
if [ "$STATE" == "success" ]; then
exit 0;
else
if [ "$STATE" != "success" ]; then
echo $STATUS;
exit 1;
fi
Expand All @@ -3081,21 +3079,19 @@ steps:
BATCH_ID=$(hailctl batch submit --name=test-hailctl-batch-submit --files=foo -o json hail_with_args.py 100 | jq '.id')
STATUS=$(hailctl batch wait -o json $BATCH_ID)
STATE=$(echo $STATUS | jq -jr '.state')
if [ "$STATE" == "success" ]; then
exit 0;
else
if [ "$STATE" != "success" ]; then
echo $STATUS;
exit 1;
fi
cat >file.sh <<EOF
cat > file.sh << 'EOF'
set -ex
cat foo
cat foo/baz.txt
echo "Hello World!"
EOF
BATCH_ID=$(hailctl batch submit --name=test-hailctl-batch-submit --files=foo -o json file.sh | jq '.id')
BATCH_ID=$(hailctl batch submit --name=test-hailctl-batch-submit --files=foo -o json --image-name ubuntu:22.04 file.sh | jq '.id')
STATUS=$(hailctl batch wait -o json $BATCH_ID)
STATE=$(echo $STATUS | jq -jr '.state')
if [ "$STATE" == "success" ]; then
Expand All @@ -3114,12 +3110,10 @@ steps:
echo "Hello World! $1 $2"
EOF
BATCH_ID=$(hailctl batch submit --name=test-hailctl-batch-submit --files=foo -o json file-with-args.sh abc 123 | jq '.id')
BATCH_ID=$(hailctl batch submit --name=test-hailctl-batch-submit --files=foo -o json --image-name ubuntu:22.04 file-with-args.sh abc 123 | jq '.id')
STATUS=$(hailctl batch wait -o json $BATCH_ID)
STATE=$(echo $STATUS | jq -jr '.state')
if [ "$STATE" == "success" ]; then
exit 0;
else
if [ "$STATE" != "success" ]; then
echo $STATUS;
exit 1;
fi
Expand Down
33 changes: 28 additions & 5 deletions hail/python/hailtop/hailctl/batch/cli.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import asyncio
import json
from enum import Enum
from typing import Annotated as Ann
from typing import Any, Dict, List, Optional, cast

import orjson
import typer
from typer import Argument as Arg
from typer import Option as Opt

from . import billing, list_batches
from . import submit as _submit
from .batch_cli_utils import (
ExtendedOutputFormat,
ExtendedOutputFormatOption,
Expand Down Expand Up @@ -131,7 +130,7 @@ def wait(
quiet = quiet or output != StructuredFormatPlusText.TEXT
out = batch.wait(disable_progress_bar=quiet)
if output == StructuredFormatPlusText.JSON:
print(json.dumps(out))
print(orjson.dumps(out).decode('utf-8'))
else:
print(out)

Expand Down Expand Up @@ -168,16 +167,40 @@ def submit(
name: Ann[str, Opt(help='The name of the batch.')] = '',
image_name: Ann[Optional[str], Opt(help='Name of Docker image for the job (default: hailgenetics/hail)')] = None,
output: StructuredFormatPlusTextOption = StructuredFormatPlusText.TEXT,
wait: Ann[bool, Opt(help='Wait for the batch to complete.')] = False,
):
"""Submit a batch with a single job that runs SCRIPT with the arguments ARGUMENTS.
If you wish to pass option-like arguments you should use "--". For example:
$ hailctl batch submit --image-name docker.io/image my_script.py -- some-argument --animal dog
Copy a local file into the working directory of the job:
$ hailctl batch submit --image-name docker.io/image my_script.py --files a-file -- some-argument --animal dog
Copy a local file into a particular directory in the job:
$ hailctl batch submit --image-name docker.io/image my_script.py --files a-file:/foo/bar/ -- some-argument --animal dog
$ hailctl batch submit --image-name docker.io/image my_script.py -- some-argument --animal dog
Copy a local directory to the directory /foo/bar/a-directory in the job:
$ hailctl batch submit --image-name docker.io/image my_script.py --files a-directory:/foo/bar/ -- some-argument --animal dog
Copy a local file or a directory to a specific location in the job:
$ hailctl batch submit --image-name docker.io/image my_script.py --files a/local/path:/foo/bar -- some-argument --animal dog
Copy a local directory to a specific location in the job:
$ hailctl batch submit --image-name docker.io/image my_script.py --files a-file:/foo/bar -- some-argument --animal dog
Notes
-----
Copying a local directory to the root directory in the job is not supported (example: ``--files my-local-dir/:/``).
"""
asyncio.run(_submit.submit(name, image_name, files or [], output, script, [*(arguments or []), *ctx.args]))
from .submit import HailctlBatchSubmitError # pylint: disable=import-outside-toplevel
from .submit import submit as _submit # pylint: disable=import-outside-toplevel

try:
asyncio.run(_submit(name, image_name, files or [], output, script, [*(arguments or []), *ctx.args], wait))
except HailctlBatchSubmitError as err:
print(err.message)
raise typer.Exit(err.exit_code)


@app.command('init', help='Initialize a Hail Batch environment.')
Expand Down
245 changes: 168 additions & 77 deletions hail/python/hailtop/hailctl/batch/submit.py
Original file line number Diff line number Diff line change
@@ -1,108 +1,199 @@
import os
import re
from contextlib import AsyncExitStack
from shlex import quote as shq
from typing import Tuple
from typing import List, Optional, Tuple

import orjson
import typer

import hailtop.batch as hb
from hailtop import yamlx
from hailtop.aiotools.copy import copy_from_dict
from hailtop.aiotools.fs import AsyncFSURL
from hailtop.aiotools.router_fs import RouterAsyncFS
from hailtop.batch.job import BashJob
from hailtop.config import (
get_deploy_config,
get_remote_tmpdir,
get_user_config_path,
)
from hailtop.utils import (
secret_alnum_string,
unpack_comma_delimited_inputs,
)
from hailtop.version import __pip_version__

from .batch_cli_utils import StructuredFormatPlusTextOption


def real_absolute_expanded_path(path: str) -> Tuple[str, bool]:
    """Resolve *path* to a real absolute path, expanding ``~``.

    Returns the resolved path together with whether the input had a trailing
    slash. Callers use the flag to decide if the path "looks like" a
    directory, because ``realpath``/``abspath`` strip trailing slashes.
    """
    # str.endswith is safe on the empty string, unlike path[-1] which raises IndexError.
    had_trailing_slash = path.endswith('/')
    return os.path.realpath(os.path.abspath(os.path.expanduser(path))), had_trailing_slash


def real_absolute_cwd() -> str:
    """Return the current working directory as a real, absolute path."""
    resolved, _had_trailing_slash = real_absolute_expanded_path(os.getcwd())
    return resolved


class HailctlBatchSubmitError(Exception):
    """User-facing failure of `hailctl batch submit`.

    Attributes:
        message: human-readable description to print to the user.
        exit_code: process exit code the CLI should terminate with.
    """

    def __init__(self, message: str, exit_code: int):
        # Forward the message to Exception so str(err) and tracebacks are
        # informative instead of empty.
        super().__init__(message)
        self.message = message
        self.exit_code = exit_code


async def submit(
name: str,
image_name: Optional[str],
files_options: List[str],
output: StructuredFormatPlusTextOption,
script: str,
arguments: List[str],
wait: bool,
):
files_options = unpack_comma_delimited_inputs(files_options)

from hailtop import __pip_version__
quiet = output != 'text'

FILE_REGEX = re.compile(r'(?P<src>[^:]+)(:(?P<dest>.+))?')
async with AsyncExitStack() as exitstack:
fs = RouterAsyncFS()
exitstack.push_async_callback(fs.close)

remote_tmpdir = fs.parse_url(get_remote_tmpdir('hailctl batch submit')).with_new_path_component(
secret_alnum_string()
)

async def submit(name, image_name, files, output, script, arguments):
import hailtop.batch as hb # pylint: disable=import-outside-toplevel
from hailtop.aiotools.copy import copy_from_dict # pylint: disable=import-outside-toplevel
from hailtop.config import ( # pylint: disable=import-outside-toplevel
get_deploy_config,
get_remote_tmpdir,
get_user_config_path,
)
from hailtop.utils import ( # pylint: disable=import-outside-toplevel
secret_alnum_string,
unpack_comma_delimited_inputs,
)
backend = hb.ServiceBackend()
exitstack.push_async_callback(backend._async_close)

files = unpack_comma_delimited_inputs(files)
user_config = str(get_user_config_path())
b = hb.Batch(name=name, backend=backend)
j = b.new_bash_job()
j.image(image_name or os.environ.get('HAIL_GENETICS_HAIL_IMAGE', f'hailgenetics/hail:{__pip_version__}'))
j.env('HAIL_QUERY_BACKEND', 'batch')

quiet = output != 'text'
await transfer_files_options_files_into_job(remote_tmpdir, files_options, j, b)

remote_tmpdir = get_remote_tmpdir('hailctl batch submit')
script_cloud_file, user_config_cloud_file = await upload_script_and_user_config(remote_tmpdir, script)
if user_config_cloud_file is not None:
config_file = b.read_input(user_config_cloud_file)
j.command('mkdir -p $HOME/.config/hail')
j.command(f'ln -s {shq(config_file)} $HOME/.config/hail/config.ini')

tmpdir_path_prefix = secret_alnum_string()
real_cwd = real_absolute_cwd()
j.command(f'mkdir -p {shq(real_cwd)}')
j.command(f'cd {shq(real_cwd)}')

def cloud_prefix(path):
path = path.lstrip('/')
return f'{remote_tmpdir}/{tmpdir_path_prefix}/{path}'
command = 'python3' if script.endswith('.py') else 'bash'
script_file = b.read_input(script_cloud_file)
script_arguments = " ".join(shq(x) for x in arguments)
j.command(f'{command} {script_file} {script_arguments}')

def file_input_to_src_dest(file: str) -> Tuple[str, str, str]:
match = FILE_REGEX.match(file)
if match is None:
raise ValueError(f'invalid file specification {file}. Must have the form "src" or "src:dest"')
batch_handle = await b._async_run(wait=False, disable_progress_bar=quiet)
assert batch_handle

result = match.groupdict()
if output == 'text':
deploy_config = get_deploy_config()
url = deploy_config.external_url('batch', f'/batches/{batch_handle.id}/jobs/1')
print(f'Submitted batch {batch_handle.id}, see {url}')
else:
assert output == 'json'
print(orjson.dumps({'id': batch_handle.id}).decode('utf-8'))

src = result.get('src')
if src is None:
raise ValueError(f'invalid file specification {file}. Must have a "src" defined.')
src = os.path.abspath(os.path.expanduser(src))
src = src.rstrip('/')
if wait:
out = batch_handle.wait(disable_progress_bar=quiet)
try:
out['log'] = batch_handle.get_job_log(1)['main']
except:
out['log'] = 'Could not retrieve job log.'
if output == 'text':
print(yamlx.dump(out))
else:
print(orjson.dumps(out))
if out['state'] != 'success':
raise typer.Exit(1)

dest = result.get('dest')
if dest is not None:
dest = os.path.abspath(os.path.expanduser(dest))
else:
dest = os.getcwd()

cloud_file = cloud_prefix(src)
def cloud_prefix(remote_tmpdir: AsyncFSURL, path: str) -> str:
    """Map a local absolute *path* to its staging URL under *remote_tmpdir*.

    Leading slashes are dropped so the path becomes a relative component of
    the remote temporary directory URL.
    """
    relative_path = path.lstrip('/')
    staged_url = remote_tmpdir.with_new_path_component(relative_path)
    return str(staged_url)

return (src, dest, cloud_file)

backend = hb.ServiceBackend()
b = hb.Batch(name=name, backend=backend)
j = b.new_bash_job()
j.image(image_name or os.environ.get('HAIL_GENETICS_HAIL_IMAGE', f'hailgenetics/hail:{__pip_version__}'))
FILE_REGEX = re.compile(r'([^:]+)(?::(.+))?')

local_files_to_cloud_files = []

for file in files:
src, dest, cloud_file = file_input_to_src_dest(file)
local_files_to_cloud_files.append({'from': src, 'to': cloud_file})
in_file = b.read_input(cloud_file)
j.command(f'mkdir -p {os.path.dirname(dest)}; ln -s {in_file} {dest}')
def parse_files_option_to_src_dest_and_cloud_intermediate(remote_tmpdir: AsyncFSURL, file: str) -> Tuple[str, str, str]:
match = FILE_REGEX.match(file)
if match is None:
raise ValueError(f'invalid file specification {file}. Must have the form "src" or "src:dest"')

script_src, _, script_cloud_file = file_input_to_src_dest(script)
user_config_src, _, user_config_cloud_file = file_input_to_src_dest(user_config)
src, dest = match.groups()

await copy_from_dict(files=local_files_to_cloud_files)
await copy_from_dict(
files=[
{'from': script_src, 'to': script_cloud_file},
{'from': user_config_src, 'to': user_config_cloud_file},
]
)
if src is None:
raise ValueError(f'invalid file specification {file}. Must have a "src" defined.')

script_file = b.read_input(script_cloud_file)
config_file = b.read_input(user_config_cloud_file)
src, src_looks_like_directory = real_absolute_expanded_path(src)

j.env('HAIL_QUERY_BACKEND', 'batch')
if dest is None:
dest = os.path.join(real_absolute_cwd(), os.path.basename(src))
else:
dest, dest_looks_like_directory = real_absolute_expanded_path(dest)

command = 'python3' if script.endswith('.py') else 'bash'
script_arguments = " ".join(shq(x) for x in arguments)
if src_looks_like_directory and dest == '/':
raise ValueError('cannot mount a directory to "/"')

j.command(f'mkdir -p $HOME/.config/hail && ln -s {config_file} $HOME/.config/hail/config.ini')
j.command(f'cd {os.getcwd()}')
j.command(f'{command} {script_file} {script_arguments}')
batch_handle = await b._async_run(wait=False, disable_progress_bar=quiet)
assert batch_handle
if not src_looks_like_directory and dest_looks_like_directory:
dest = os.path.join(dest, os.path.basename(src))

if output == 'text':
deploy_config = get_deploy_config()
url = deploy_config.external_url('batch', f'/batches/{batch_handle.id}/jobs/1')
print(f'Submitted batch {batch_handle.id}, see {url}')
else:
assert output == 'json'
print(orjson.dumps({'id': batch_handle.id}).decode('utf-8'))
return (src, dest, cloud_prefix(remote_tmpdir, src))


async def transfer_files_options_files_into_job(
remote_tmpdir: AsyncFSURL, files_options: List[str], j: BashJob, b: hb.Batch
):
src_dst_cloud_intermediate_triplets = [
parse_files_option_to_src_dest_and_cloud_intermediate(remote_tmpdir, files_option)
for files_option in files_options
]

if non_existing_files := [src for src, _, _ in src_dst_cloud_intermediate_triplets if not os.path.exists(src)]:
non_existing_files_str = '- ' + '\n- '.join(non_existing_files)
raise HailctlBatchSubmitError(f'Some --files did not exist:\n{non_existing_files_str}', 1)

await copy_from_dict(
files=[
{'from': src, 'to': cloud_intermediate}
for src, _, cloud_intermediate in src_dst_cloud_intermediate_triplets
]
)

await backend.async_close()
for _, dest, cloud_intermediate in src_dst_cloud_intermediate_triplets:
in_file = b.read_input(cloud_intermediate)
j.command(f'mkdir -p {shq(os.path.dirname(dest))}; ln -s {shq(in_file)} {shq(dest)}')


async def upload_script_and_user_config(remote_tmpdir: AsyncFSURL, script: str) -> Tuple[str, Optional[str]]:
    """Upload the job script and, if present, the user's hail config to remote storage.

    Returns:
        The cloud paths of the uploaded script and user config file
        (``None`` for the config when no local user config exists).

    Raises:
        HailctlBatchSubmitError: if ``script`` does not exist locally.
    """
    if not os.path.exists(script):
        raise HailctlBatchSubmitError(f'Script file does not exist: {script}', 1)
    # Reuse the --files parser to map the local script path to its staging
    # location under remote_tmpdir; the dest component is not needed here.
    script_src, _, script_cloud_file = parse_files_option_to_src_dest_and_cloud_intermediate(remote_tmpdir, script)

    extra_files_to_copy = [
        # In Azure, two concurrent uploads to the same blob path race until one succeeds. The
        # other fails with an error that is indistinguishable from a client-side logic
        # error. We could treat "Invalid Block List" as a limited retry error, but, currently,
        # multi-part-create does not retry when an error occurs in the `__aexit__` which is
        # when we would experience the error for multi-part-creates.
        # https://azure.github.io/Storage/docs/application-and-user-data/code-samples/concurrent-uploads-with-versioning/
        # https://github.com/hail-is/hail/pull/13812#issuecomment-1882088862
        {'from': script_src, 'to': script_cloud_file},
    ]

    user_config_path = str(get_user_config_path())
    user_config_cloud_file = None  # stays None when the user has no local config file
    if os.path.exists(user_config_path):
        user_config_src, _, user_config_cloud_file = parse_files_option_to_src_dest_and_cloud_intermediate(
            remote_tmpdir, user_config_path
        )
        extra_files_to_copy.append({'from': user_config_src, 'to': user_config_cloud_file})

    await copy_from_dict(files=extra_files_to_copy)
    return script_cloud_file, user_config_cloud_file
Empty file.
Loading

0 comments on commit a2a376e

Please sign in to comment.