From f07581b1cf81663ac97742dda8b07b56c572ee5f Mon Sep 17 00:00:00 2001 From: Oliver Koenig Date: Tue, 1 Oct 2024 14:52:43 -0700 Subject: [PATCH] ADLR/megatron-lm!2164 - ci: Download artifacts --- .../jet/generate_jet_trigger_job.py | 1 + .../jet/launch_jet_workload.py | 28 ++++++++++++++++--- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/tests/functional_tests/python_test_utils/jet/generate_jet_trigger_job.py b/tests/functional_tests/python_test_utils/jet/generate_jet_trigger_job.py index b67d856464..30d13c3730 100644 --- a/tests/functional_tests/python_test_utils/jet/generate_jet_trigger_job.py +++ b/tests/functional_tests/python_test_utils/jet/generate_jet_trigger_job.py @@ -97,6 +97,7 @@ def main( "timeout": "7 days", "needs": [{"pipeline": '$PARENT_PIPELINE_ID', "job": "jet-generate"}], "script": [" ".join(script)], + "artifacts": {"paths": ["results/"]}, } with open(output_path, 'w') as outfile: diff --git a/tests/functional_tests/python_test_utils/jet/launch_jet_workload.py b/tests/functional_tests/python_test_utils/jet/launch_jet_workload.py index 123c322677..3e243c542a 100644 --- a/tests/functional_tests/python_test_utils/jet/launch_jet_workload.py +++ b/tests/functional_tests/python_test_utils/jet/launch_jet_workload.py @@ -86,6 +86,22 @@ def launch_and_wait_for_completion( return pipeline +def download_job_assets(job: jetclient.JETJob, iteration: int = 0) -> List[str]: + logs = job.get_logs() + if not logs: + return [""] + + assets_base_path = BASE_PATH / ".." / ".." / ".." / ".." / "results" / f"iteration={iteration}" + + for restart_idx, log in enumerate(logs): + assets = log.get_assets() + assets_path = assets_base_path / f"restart={restart_idx}" + assets_path.mkdir(parents=True, exist_ok=True) + for log_filename in assets.keys(): + with open(assets_path / log_filename, "w") as fh: + assets[log_filename].download(pathlib.Path(fh.name)) + + def download_job_logs(job: jetclient.JETJob) -> List[str]: logs = job.get_logs() if not logs: @@ -157,6 +173,7 @@ def main( sys.exit(1) n_attempts = 0 + n_iteration = 0 while True and n_attempts < 3: pipeline = launch_and_wait_for_completion( test_case=test_case, @@ -168,12 +185,14 @@ def main( wandb_experiment=wandb_experiment, ) - logs = download_job_logs( - job=[job for job in pipeline.get_jobs() if job.name.startswith("basic")][0] - ) + main_job = [job for job in pipeline.get_jobs() if job.name.startswith("basic")][0] + + logs = download_job_logs(job=main_job) concat_logs = "\n".join(logs) print(f"Logs:\n{concat_logs}") + download_job_assets(job=main_job, iteration=n_iteration) + if test_type != "release": success = pipeline.get_status() == PipelineStatus.SUCCESS sys.exit(int(not success)) # invert for exit 0 @@ -186,9 +205,10 @@ def main( current_iteration, total_iterations = parsed_result if current_iteration == total_iterations: + success = pipeline.get_status() == PipelineStatus.SUCCESS sys.exit(int(not success)) # invert for exit 0 - + n_iteration += 1 sys.exit(1)