Ray Research

  • A task can call another task - potentially a nicer paradigm than explicitly creating a DAG? Harder for us to implement for labtech, as it requires running a task and then "shelving" it while its dependencies run (see the sketch after these bullets).
  • Use objects to represent context and results
  • "Remote objects are cached in Ray’s distributed shared-memory object store, and there is one object store per node in the cluster. In the cluster setting, a remote object can live on one or many nodes, independent of who holds the object ref(s)."
  • "If the object is a numpy array or a collection of numpy arrays, the get call is zero-copy and returns arrays backed by shared object store memory. Otherwise, we deserialize the object data into a Python object."
  • "If the current node’s object store does not contain the object, the object is downloaded."
  • "Objects are tracked via distributed reference counting, and their data is automatically freed once all references to the object are deleted."
  • "You can also pass objects to tasks via closure-capture."
  • "For production usage or non-changing environments, we recommend installing your dependencies into a container image and specifying the image using the Cluster Launcher. For dynamic environments (e.g. for development and experimentation), we recommend using runtime environments."
  • "You can build all your files and dependencies into a container image and specify this in your your Cluster YAML Configuration." ... "You can push local files to the cluster using ray rsync_up"
  • Runtime Environments (see the sketch after this list):
    • Can be different for each task, or one for the entire "job" (app) - probably just use the latter for simplicity?
    • ray.init(runtime_env={...})
    • Specifying a working_dir will ensure that local directory is pushed to the cluster by ray.init().
    • You can specify pip or conda deps, but probably best to leave these to the cluster server setup - otherwise you'll be re-downloading every time.
    • You can specify Python modules that your tasks depend on with py_modules.
      • This can be a directory of python files. "if the local directory contains a .gitignore file, the files and paths specified there are not uploaded to the cluster."
      • "Note: This feature is currently limited to modules that are packages with a single directory containing an init.py file. For single-file modules, you may use working_dir."
      • Also has an excludes option.
    • "If runtime_env cannot be set up (e.g., network issues, download failures, etc.), Ray will fail to schedule tasks/actors that require the runtime_env. If you call ray.get, it will raise RuntimeEnvSetupError with the error message in detail."
  • You can choose a different scheduling strategy.
  • The default strategy is locality-aware - Ray will automatically try to schedule tasks on nodes where their input objects are already present.
  • Ray can handle GPU allocation via per-task resource requests (e.g. num_gpus)
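Resource requests and scheduling strategies are both set per remote function; a rough sketch:

```python
import ray

@ray.remote(num_gpus=1)
def train_on_gpu(batch):
    # Ray reserves one GPU and sets CUDA_VISIBLE_DEVICES for this task.
    return len(batch)

@ray.remote(scheduling_strategy="SPREAD")
def io_heavy(path):
    # "SPREAD" spreads these tasks across nodes instead of using the
    # locality-aware default.
    return path
```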
  • Ray's memory monitor automatically kills tasks to prevent nodes from running out of memory
  • Tasks killed by the memory monitor will be retried infinitely with exponential backoff up to 60 seconds.
  • Generally, don't call ray.put() inside a task - return values directly instead.
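A rough sketch of the preferred shape, i.e. returning the value and letting Ray store it rather than returning a ref created with ray.put() yourself:

```python
import ray

@ray.remote
def load_chunk(i):
    data = list(range(i * 1000, (i + 1) * 1000))
    # Avoid: return ray.put(data)
    # Returning the value directly lets Ray manage object storage and keeps
    # the task eligible for reconstruction on failure.
    return data
```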
  • "When a worker is executing a task, if the worker dies unexpectedly, either because the process crashed or because the machine failed, Ray will rerun the task until either the task succeeds or the maximum number of retries is exceeded. The default number of retries is 3 and can be overridden by specifying max_retries in the @ray.remote decorator. Specifying -1 allows infinite retries, and 0 disables retries."
  • "By default, Ray will not retry tasks upon exceptions thrown by application code. However, you may control whether application-level errors are retried, and even which application-level errors are retried, via the retry_exceptions argument. This is False by default. To enable retries upon application-level errors, set retry_exceptions=True to retry upon any exception, or pass a list of retryable exceptions."
  • "If a task is hanging, you may want to cancel the task to continue to make progress. You can do this by calling ray.cancel on an ObjectRef returned by the task. By default, this will send a KeyboardInterrupt to the task’s worker if it is mid-execution. Passing force=True to ray.cancel will force-exit the worker. See the API reference for ray.cancel for more details."
  • "Sometimes, application-level code may cause memory leaks on a worker after repeated task executions, e.g., due to bugs in third-party libraries. To make progress in these cases, you can set the max_calls option in a task’s @ray.remote decorator. Once a worker has executed this many invocations of the given remote function, it will automatically exit. By default, max_calls is set to infinity."
    • Sounds equivalent to max_tasks_per_child in concurrent.futures.
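e.g. a sketch capping a worker at 100 executions:

```python
import ray

# The worker process exits after 100 executions of this function, bounding any
# memory leaked by third-party code (akin to max_tasks_per_child).
@ray.remote(max_calls=100)
def leaky(x):
    return x
```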
  • Ray Dashboard can be run to view running tasks
  • It is automatically run on the head node on a given port.
    • We should make sure tasks are meaningfully named to help our users view this.
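If I'm reading the options API right, task names can be set per call, e.g.:

```python
import ray

@ray.remote
def run_experiment(cfg):
    return cfg

# The name shows up in the dashboard and in task-state CLI/API output.
ref = run_experiment.options(name="experiment:baseline-seed0").remote({"seed": 0})
```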
  • There is also a CLI to get task status
  • Log files go into a tmp directory by default
  • "By default, Worker stdout and stderr for Tasks and Actors stream to the Ray Driver (the entrypoint script that calls ray.init). It helps users aggregate the logs for the distributed Ray application in a single place."
  • There is an API to build a DAG that can be executed, but probably not helpful for us.
  • Tips for running Ray clusters with 1k+ nodes!
  • Notes for avoiding zombies when your task contains subprocesses and is killed.
  • "RAY_kill_child_processes_on_worker_exit (default true): Only works on Linux. If true, the worker kills all direct child processes on exit. This won’t work if the worker crashed. This is NOT recursive, in that grandchild processes are not killed by this mechanism."
  • "RAY_kill_child_processes_on_worker_exit_with_raylet_subreaper (default false): Only works on Linux greater than or equal to 3.4. If true, Raylet recursively kills any child processes and grandchild processes that were spawned by the worker after the worker exits. This works even if the worker crashed. The killing happens within 10 seconds after the worker death."
  • "On non-Linux platforms, user-spawned process is not controlled by Ray. The user is responsible for managing the lifetime of the child processes. If the parent Ray worker process dies, the child processes will continue to run."
  • Using ray.wait to limit the number of pending tasks
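The usual backpressure pattern (the in-flight limit is illustrative):

```python
import ray

@ray.remote
def work(i):
    return i * i

MAX_IN_FLIGHT = 8
pending, results = [], []
for i in range(100):
    if len(pending) >= MAX_IN_FLIGHT:
        # Block until at least one task finishes before submitting more.
        done, pending = ray.wait(pending, num_returns=1)
        results.extend(ray.get(done))
    pending.append(work.remote(i))
results.extend(ray.get(pending))
```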
  • Anti-pattern: Redefining the same remote function or class harms performance
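i.e. define the remote function once and reuse it, rather than re-decorating it on every iteration (each redefinition gets re-exported to the cluster); a sketch of the preferred shape:

```python
import ray

@ray.remote
def square(x):
    return x * x

# Reuse the same remote function object across all submissions.
refs = [square.remote(i) for i in range(100)]
print(sum(ray.get(refs)))
```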