nemo_run/run/ray/templates/ray.sub.j2 (99 changes: 88 additions, 11 deletions)
@@ -28,17 +28,36 @@ DASHBOARD_AGENT_GRPC_PORT=${DASHBOARD_AGENT_GRPC_PORT:-53007}
METRICS_EXPORT_PORT=${METRICS_EXPORT_PORT:-53009}

# Ports for the head node
PORT=${PORT:-6379}
PORT=${PORT:-54514}
RAY_CLIENT_SERVER_PORT=${RAY_CLIENT_SERVER_PORT:-10001}
#REDIT_SHARD_PORTS=${REDIT_SHARD_PORTS:-"random"} ??
DASHBOARD_PORT=${DASHBOARD_PORT:-8265} # Also used by debugger
DASHBOARD_AGENT_LISTEN_PORT=${DASHBOARD_AGENT_LISTEN_PORT:-52365}
RAY_DEBUGGER_ARGS=
if [ "${RAY_DEBUG:-}" = "legacy" ]; then
RAY_DEBUGGER_ARGS="--ray-debugger-external"
fi

# After ray>=2.47, this feature is enabled by default; it creates uv venvs for any py_executable starting with `uv run`.
# There are severe contention and performance issues with this enabled, considering our dependencies are so large and occasionally
# need to be compiled, so NeMo RL has an implementation in nemo_rl/utils/venv.py that does it once per node as opposed to once per task.
export RAY_ENABLE_UV_RUN_RUNTIME_ENV=0
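The once-per-node venv setup referenced above lives in nemo_rl/utils/venv.py and is not part of this diff. A minimal bash sketch of that kind of pattern, where the node-local venv path, lock file, and requirements file are all assumptions:

# Sketch only: build the venv once per node behind a lock; later tasks on the same node reuse it.
VENV_DIR="/tmp/nemo_rl_venv"                 # hypothetical node-local path
LOCK_FILE="${VENV_DIR}.lock"
(
    flock -x 200                             # first task on the node takes the lock; the rest wait here
    if [ ! -d "$VENV_DIR" ]; then
        uv venv "$VENV_DIR"
        uv pip install --python "$VENV_DIR/bin/python" -r requirements.txt   # hypothetical requirements file
    fi
) 200>"$LOCK_FILE"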

# Setting ulimit is recommended by the Ray best-practices page
# @ https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html
# It's session based and won't affect the system outside the script
# Ensure that the soft limit isn't above the hard limit
if [[ $(ulimit -Hn) == "unlimited" ]] || [[ 65535 -lt $(ulimit -Hn) ]]; then
ulimit -Sn 65535
elif [[ $(ulimit -Hn) != "unlimited" ]] && [[ $(ulimit -Hn) -lt 65535 ]]; then
echo "[WARNING]: Cannot increase ulimit on file descriptors to 65535 according ray recommendation: https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html. Speak to cluster admins to increase, otherwise ray may crash unexpectedly."
fi

# On our clusters, the largest available port range on an idle worker appeared to be 52369-64607
# (not including the other ports set by this script), so this range is chosen to sit
# roughly in the middle.
MIN_WORKER_PORT=${MIN_WORKER_PORT:-54001}
MAX_WORKER_PORT=${MAX_WORKER_PORT:-54257}
MAX_WORKER_PORT=${MAX_WORKER_PORT:-54513}
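# Note: with these defaults the worker port range spans 54513 - 54001 + 1 = 513 ports per node (up from 257), and the head PORT default (54514) sits just above it.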

# Ray temp directory (inside container). Used by --temp-dir and log sync sidecar
RAY_TEMP_DIR=${RAY_TEMP_DIR:-/ray-cluster}
@@ -82,13 +101,66 @@ gpus_per_node=8

num_retries={{ num_retries }}

# Track backgrounded srun client PIDs for head and workers
declare -A SRUN_PIDS

# Verify all backgrounded srun client processes are still alive; exit fast if any died
check_srun_processes() {
for name in "${!SRUN_PIDS[@]}"; do
pid="${SRUN_PIDS[$name]}"
# Check if the process is still running
if ! kill -0 "$pid" 2>/dev/null; then
echo "[ERROR] Background srun '$name' died (pid=$pid). Could be a failure in startup or an issue with the node preventing the srun to start. Attempting to exit." >&2
# Signal sidecars inside containers to terminate ASAP
touch "$LOG_DIR/ENDED"
exit 1
fi
done
}
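# check_srun_processes is polled from the head/worker wait loops below so a dead background srun fails the job quickly.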

# Getting the node names and IP addresses in the SLURM allocation
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)
ip_addresses_array=()

for node in $nodes; do
ip_address=$(getent hosts "$node" | awk '{print $1}' | head -n1)
# Try multiple methods to resolve the node's IP address
echo "[DEBUG] Resolving hostname: $node"
ip_address=""

# Method 1: Try host command
echo "[DEBUG] Method 1: host command"
ip_address=$(host $node 2>/dev/null | awk '/has address/ { print $4 }' | head -1 || true)
echo "[DEBUG] host result: '$ip_address'"

# Method 2: If host fails, try getent
if [[ -z "$ip_address" ]]; then
echo "[DEBUG] Method 2: getent hosts"
ip_address=$(getent hosts $node 2>/dev/null | awk '{ print $1 }' | head -1 || true)
echo "[DEBUG] getent result: '$ip_address'"
fi

# Method 3: If getent fails, try nslookup
if [[ -z "$ip_address" ]]; then
echo "[DEBUG] Method 3: nslookup"
ip_address=$(nslookup $node 2>/dev/null | awk '/^Address: / { print $2 }' | head -1 || true)
echo "[DEBUG] nslookup result: '$ip_address'"
fi

# Method 4: If all DNS methods fail, try ping to extract IP
if [[ -z "$ip_address" ]]; then
echo "[DEBUG] Method 4: ping"
ip_address=$(ping -c 1 $node 2>/dev/null | grep "PING" | sed 's/.*(\([^)]*\)).*/\1/' || true)
echo "[DEBUG] ping result: '$ip_address'"
fi

# If still no IP, use the hostname itself (might work if it's already an IP or resolvable)
if [[ -z "$ip_address" ]]; then
echo "[WARNING] Could not resolve IP for $node, using hostname as fallback"
ip_address=$node
fi

echo "[INFO] Node: $node -> IP: $ip_address"
# Add the IP address to the array
ip_addresses_array+=("$ip_address")
done
@@ -184,12 +256,13 @@ ray start --head \
--ray-client-server-port=${RAY_CLIENT_SERVER_PORT} \
--dashboard-port=${DASHBOARD_PORT} \
\
--node-manager-port=${NODE_MANAGER_PORT} \
--object-manager-port=${OBJECT_MANAGER_PORT} \
--runtime-env-agent-port=${RUNTIME_ENV_AGENT_PORT} \
--dashboard-agent-grpc-port=${DASHBOARD_AGENT_GRPC_PORT} \
--dashboard-agent-listen-port=${DASHBOARD_AGENT_LISTEN_PORT} \
--metrics-export-port=${METRICS_EXPORT_PORT} \
--node-manager-port=$((${NODE_MANAGER_PORT} + 1)) \
--object-manager-port=$((${OBJECT_MANAGER_PORT} + 1)) \
--runtime-env-agent-port=$((${RUNTIME_ENV_AGENT_PORT} + 1)) \
--dashboard-agent-grpc-port=$((${DASHBOARD_AGENT_GRPC_PORT} + 1)) \
--dashboard-agent-listen-port=$((${DASHBOARD_AGENT_LISTEN_PORT} + 1)) \
--metrics-export-port=$((${METRICS_EXPORT_PORT} + 1)) \
$RAY_DEBUGGER_ARGS \
\
--block
EOFINNER
@@ -207,6 +280,7 @@ exit 1
EOF
)
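Note: the head's node-manager, object-manager, runtime-env-agent, dashboard-agent, and metrics ports are offset by +1 from the shared defaults, presumably so they cannot collide with a Ray worker container colocated on the head node via --overlap, since the workers below use the base port values.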
srun {{ common_srun_args }} --container-name=ray-head --nodes=1 --ntasks=1 -w "$head_node" -o $LOG_DIR/{{ ray_log_prefix }}head.log bash -x -c "$head_cmd" &
SRUN_PIDS["ray-head"]=$!

# Wait for the head node container to start and for Ray to be ready
elapsed_time=0
@@ -217,6 +291,7 @@ while ! (srun --overlap --nodes=1 --ntasks=1 -w $head_node test -f $LOG_DIR/STAR
exit 1
fi
echo "[INFO][$(date)] Waiting for Ray head node container to start and be ready... ($elapsed_time/$RAY_HEAD_START_TIMEOUT seconds)"
check_srun_processes
sleep 2
elapsed_time=$((elapsed_time + 2))
done
@@ -261,7 +336,6 @@ monitor-sidecar &
sed -i 's/context\.py_executable = " "\.join(self\.nsight_cmd) + " python"/context.py_executable = " ".join(self.nsight_cmd) + f" {context.py_executable}"/g' /opt/nemo_rl_venv/lib64/python*/site-packages/ray/_private/runtime_env/nsight.py

cat <<EOFINNER | tee /launch-worker.sh
sleep 5
ray start --address "$ip_head" \
--disable-usage-stats \
--resources="{\"worker_units\": $gpus_per_node, \"slurm_managed_ray_cluster\": 1}" \
@@ -274,6 +348,7 @@ ray start --address "$ip_head" \
--dashboard-agent-grpc-port=${DASHBOARD_AGENT_GRPC_PORT} \
--dashboard-agent-listen-port=${DASHBOARD_AGENT_LISTEN_PORT} \
--metrics-export-port=${METRICS_EXPORT_PORT} \
$RAY_DEBUGGER_ARGS \
\
--block
EOFINNER
@@ -293,6 +368,7 @@ EOF
OVERLAP_HEAD_AND_WORKER_ARG="--overlap"
fi
srun {{ common_srun_args }} ${OVERLAP_HEAD_AND_WORKER_ARG:-} --container-name=ray-worker-$i --exact --nodes=1 --ntasks=1 --cpus-per-task=$((16 * gpus_per_node)) -w "$node_i" -o $LOG_DIR/{{ ray_log_prefix }}worker-$i.log bash -x -c "$worker_cmd" &
SRUN_PIDS["ray-worker-$i"]=$!
sleep 3
done

@@ -316,9 +392,10 @@ extract_worker_units() {
while true; do
worker_units=$(extract_worker_units)
echo "[INFO] Number of actors online: $worker_units/$NUM_ACTORS"
if [ "$worker_units" -eq "$NUM_ACTORS" ]; then
if [[ "$worker_units" -eq "$NUM_ACTORS" ]]; then
break
fi
check_srun_processes
sleep 2
done
