Skip to content

Commit

Permalink
ADLR/megatron-lm!2348 - ci: Re-enable llava tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ko3n1g committed Nov 17, 2024
1 parent ce507ee commit 9e9d4f5
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 65 deletions.
19 changes: 14 additions & 5 deletions tests/functional_tests/jet_recipes/multimodal-llava.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ type: basic
format_version: 1
maintainers: [mcore]
loggers: [stdout]
launchers:
type:slurm:
ntasks_per_node: '{gpus}'
spec:
name: "{test_case}"
name: '{test_case}'
model: multimodal-llava
build: mcore-pyt-{environment}
nodes: 1
Expand Down Expand Up @@ -33,8 +36,14 @@ products:
- environment: [lts, dev]
scope: [mr]
n_repeat: [5]
gpus: [8]
test_case:
- multimodal_llava_mr_mcore_te_tp1_pp1_dgx_a100_1N8G
- multimodal_llava_mr_mcore_te_tp2_pp3_dgx_a100_1N8G
# - multimodal_llava_mr_mcore_te_tp4_pp1_etp3_dgx_a100_1N7G
# - multimodal_llava_mr_mcore_te_tp4_pp1_resume_torch_etp3_dgx_a100_1N7G
- multimodal_llava_mr_mcore_te_tp1_pp1_dgx_a100_1N8G
- multimodal_llava_mr_mcore_te_tp2_pp3_dgx_a100_1N8G
- environment: [lts, dev]
scope: [mr]
n_repeat: [5]
gpus: [7]
test_case:
- multimodal_llava_mr_mcore_te_tp4_pp1_etp3_dgx_a100_1N7G
- multimodal_llava_mr_mcore_te_tp4_pp1_resume_torch_etp3_dgx_a100_1N7G
36 changes: 26 additions & 10 deletions tests/functional_tests/python_test_utils/jet/launch_jet_workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
import sys
import tempfile
import time
from typing import List, Optional, Tuple
from typing import List, Optional

import click
import jetclient
import requests
import yaml
from jetclient.facades.objects import log as jet_log
from jetclient.services.dtos.pipeline import PipelineStatus

from tests.functional_tests.python_test_utils.jet import common
Expand Down Expand Up @@ -97,8 +98,7 @@ def launch_and_wait_for_completion(
return pipeline


def download_job_assets(job: jetclient.JETJob, iteration: int = 0) -> List[str]:
logs = job.get_logs()
def download_job_assets(logs: List[jet_log.JETLog], iteration: int = 0) -> List[str]:
if not logs:
return [""]

Expand All @@ -113,8 +113,7 @@ def download_job_assets(job: jetclient.JETJob, iteration: int = 0) -> List[str]:
assets[log_filename].download(pathlib.Path(fh.name))


def download_job_logs(job: jetclient.JETJob) -> List[str]:
logs = job.get_logs()
def extract_logs_to_string(logs: List[jet_log.JETLog]) -> List[str]:
if not logs:
return [""]

Expand Down Expand Up @@ -201,8 +200,9 @@ def main(
sys.exit(1)

n_attempts = 0
n_nondeterminism_attemps = 0
n_iteration = 0
while True and n_attempts < 3:
while True and n_attempts < 3 and n_nondeterminism_attemps < 2:
pipeline = launch_and_wait_for_completion(
test_case=test_case,
environment=environment,
Expand All @@ -218,15 +218,29 @@ def main(

main_job = [job for job in pipeline.get_jobs() if job.name.startswith("basic")][0]

logs = download_job_logs(job=main_job)
n_download_attempt = 0
while n_download_attempt < 3:
try:
jet_log = main_job.get_logs()
break
except requests.exceptions.ConnectionError as e:
print(e)
time.sleep((3**n_download_attempt) * 60)
n_download_attempt += 1

logs = extract_logs_to_string(logs=jet_log)

concat_logs = "\n".join(logs)
print(f"Logs:\n{concat_logs}")

download_job_assets(job=main_job, iteration=n_iteration)
download_job_assets(logs=jet_log, iteration=n_iteration)

if test_type != "release":
success = pipeline.get_status() == PipelineStatus.SUCCESS

if success:
sys.exit(int(not success)) # invert for exit 0

if (
"Some NCCL operations have failed or timed out." in concat_logs
or "uncorrectable ECC error encountered" in concat_logs
Expand All @@ -236,8 +250,10 @@ def main(
print("Detected NCCL failure, attempt restart.")
n_attempts += 1
continue

sys.exit(int(not success)) # invert for exit 0
else:
print("Non-determinism, let's try another node.")
n_nondeterminism_attemps += 1
continue

if parse_failed_job(logs=logs):
n_attempts += 1
Expand Down
104 changes: 54 additions & 50 deletions tests/functional_tests/shell_test_utils/notify.sh
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
set -euxo pipefail

collect_jobs () {
PAGE=1
PER_PAGE=100
RESULTS="[]"

while true; do
# Fetch the paginated results
RESPONSE=$(curl \
-s \
--globoff \
--header "PRIVATE-TOKEN: $RO_API_TOKEN" \
"https://${GITLAB_ENDPOINT}/api/v4/projects/${CI_PROJECT_ID}/pipelines/${DOWNSTREAM_PIPELINE_ID}/jobs?page=$PAGE&per_page=$PER_PAGE"
)
# Combine the results
RESULTS=$(jq -s '.[0] + .[1]' <<< "$RESULTS $RESPONSE")

# Check if there are more pages
if [[ $(jq 'length' <<< "$RESPONSE") -lt $PER_PAGE ]]; then
break
fi
# Emit (on stdout) a single JSON array containing every job of the
# downstream pipeline, walking the GitLab jobs API page by page.
# Reads from the environment: RO_API_TOKEN, GITLAB_ENDPOINT,
# CI_PROJECT_ID, DOWNSTREAM_PIPELINE_ID.
collect_jobs() {
    local page=1
    local per_page=100
    local results="[]"
    local response

    while :; do
        # Fetch one page of jobs from the GitLab REST API.
        response=$(
            curl \
                -s \
                --globoff \
                --header "PRIVATE-TOKEN: $RO_API_TOKEN" \
                "https://${GITLAB_ENDPOINT}/api/v4/projects/${CI_PROJECT_ID}/pipelines/${DOWNSTREAM_PIPELINE_ID}/jobs?page=$page&per_page=$per_page"
        )

        # Append this page onto the accumulated array (slurp both
        # arrays and concatenate them).
        results=$(jq -s 'add' <<<"$results $response")

        # A page shorter than per_page means we've reached the end.
        if [[ $(jq 'length' <<<"$response") -lt $per_page ]]; then
            break
        fi

        page=$((page + 1))
    done

    echo "$results"
}

CI_PIPELINE_ID=${1:-16595865}
Expand All @@ -35,31 +36,32 @@ CI_PROJECT_ID=${CI_PROJECT_ID:-19378}

# Fetch Elastic logs
set +x
PIPELINE_JSON=$(curl \
--fail \
--silent \
--header "PRIVATE-TOKEN: ${RO_API_TOKEN}" \
"https://${GITLAB_ENDPOINT}/api/v4/projects/${CI_PROJECT_ID}/pipelines/${CI_PIPELINE_ID}/bridges?per_page=100"
) || ret_code=$?
PIPELINE_JSON=$(
curl \
--fail \
--silent \
--header "PRIVATE-TOKEN: ${RO_API_TOKEN}" \
"https://${GITLAB_ENDPOINT}/api/v4/projects/${CI_PROJECT_ID}/pipelines/${CI_PIPELINE_ID}/bridges?per_page=100"
) || ret_code=$?
set -x
if [[ ${ret_code:-0} -ne 0 ]]; then
echo CI_PIPELINE_ID=$CI_PIPELINE_ID does not exist
exit 1
fi

# Fetch GitLab logs of JET downstream pipeline
DOWNSTREAM_PIPELINE_ID=$(jq --arg environment "$ENVIRONMENT" '.[] |select(.name == "functional:run_" + $environment) | .downstream_pipeline.id' <<< "$PIPELINE_JSON")
DOWNSTREAM_PIPELINE_ID=$(jq --arg environment "$ENVIRONMENT" '.[] |select(.name == "functional:run_" + $environment) | .downstream_pipeline.id' <<<"$PIPELINE_JSON")

PIPELINE_URL=https://${GITLAB_ENDPOINT}/ADLR/megatron-lm/-/pipelines/$CI_PIPELINE_ID
JOB_URL=https://${GITLAB_ENDPOINT}/ADLR/megatron-lm/-/jobs/

if [[ $DOWNSTREAM_PIPELINE_ID == null ]]; then
FAILED_JOBS=$(curl \
--fail \
--silent \
--header "PRIVATE-TOKEN: ${RO_API_TOKEN}" \
"https://${GITLAB_ENDPOINT}/api/v4/projects/${CI_PROJECT_ID}/pipelines/${CI_PIPELINE_ID}/jobs?per_page=100" \
| jq --arg JOB_URL "$JOB_URL" '[.[] | select(.status == "failed") | ("<" + $JOB_URL + (.id | tostring) + "|" + .name + ">")] | join("\n• Job: ")' | tr -d '"')
--fail \
--silent \
--header "PRIVATE-TOKEN: ${RO_API_TOKEN}" \
"https://${GITLAB_ENDPOINT}/api/v4/projects/${CI_PROJECT_ID}/pipelines/${CI_PIPELINE_ID}/jobs?per_page=100" |
jq --arg JOB_URL "$JOB_URL" '[.[] | select(.status == "failed") | ("<" + $JOB_URL + (.id | tostring) + "|" + .name + ">")] | join("\n• Job: ")' | tr -d '"')
curl \
-X POST \
-H "Content-type: application/json" \
Expand Down Expand Up @@ -91,40 +93,41 @@ else
echo $JOBS
set -x

FAILED_JOBS=$(echo "$JOBS" \
| jq --arg GITLAB_ENDPOINT "$GITLAB_ENDPOINT" '[
FAILED_JOBS=$(
echo "$JOBS" |
jq --arg GITLAB_ENDPOINT "$GITLAB_ENDPOINT" '[
.[]
| select(.status != "success")
| {
name,
id,
"url": ("https://" + $GITLAB_ENDPOINT + "/dl/jet/ci/-/jobs/" + (.id | tostring)),
"url": ("https://" + $GITLAB_ENDPOINT + "/adlr/megatron-lm/-/jobs/" + (.id | tostring)),
}
]'
)
)
set -x

for row in $(echo "${FAILED_JOBS}" | jq -r '.[] | @base64'); do
_jq() {
echo ${row} | base64 --decode | jq -r ${1}
echo ${row} | base64 --decode | jq -r ${1}
}
JOB_ID=$(_jq '.id')
FULL_LOG=$(curl \
--location \
--header "PRIVATE-TOKEN: ${RO_API_TOKEN}" \
"https://${GITLAB_ENDPOINT}/api/v4/projects/${CI_PROJECT_ID}/jobs/${JOB_ID}/trace")
if [[ "$FULL_LOG" == *exception* ]]; then

if [[ "$FULL_LOG" == *exception* ]]; then
LAST_EXCEPTION_POS=$(echo "$FULL_LOG" | grep -o -b 'exception' | tail -1 | cut -d: -f1)
SHORT_LOG=${FULL_LOG:$LAST_EXCEPTION_POS-500:499}
else
SHORT_LOG=${FULL_LOG: -1000}
fi

FAILED_JOBS=$(echo "$FAILED_JOBS" \
| jq \
--argjson JOB_ID "$JOB_ID" \
--arg SLURM_FAILURE "$SHORT_LOG" '
FAILED_JOBS=$(echo "$FAILED_JOBS" |
jq \
--argjson JOB_ID "$JOB_ID" \
--arg SLURM_FAILURE "$SHORT_LOG" '
.[] |= ((select(.id==$JOB_ID) += {
"slurm_failure_reason": $SLURM_FAILURE}))
')
Expand All @@ -144,8 +147,9 @@ else
}
]'
else
BLOCKS=$(echo "$FAILED_JOBS" \
| jq --arg DATE "$DATE" --arg CONTEXT "$CONTEXT" --arg URL "$PIPELINE_URL" --arg NUM_FAILED "$NUM_FAILED" --arg NUM_TOTAL "$NUM_TOTAL" '
BLOCKS=$(
echo "$FAILED_JOBS" |
jq --arg DATE "$DATE" --arg CONTEXT "$CONTEXT" --arg URL "$PIPELINE_URL" --arg NUM_FAILED "$NUM_FAILED" --arg NUM_TOTAL "$NUM_TOTAL" '
[
{
"type": "section",
Expand Down Expand Up @@ -191,4 +195,4 @@ else
$WEBHOOK_URL
done

fi
fi

0 comments on commit 9e9d4f5

Please sign in to comment.