An automation framework for running sequential metagenome analysis jobs and making the outputs available as metadata in the NMDC database and as data objects on the NMDC data portal.
- nmdc_automation
- mongodb-community needs to be installed and running on the local machine
- Python 3.11 or later
- Poetry
Poetry installation instructions can be found here.
Install MongoDB using Homebrew on macOS:
```
brew tap mongodb/brew
brew install mongodb-community
brew services start mongodb-community
```
Full MongoDB installation instructions for macOS can be found here.

Ensure that the MongoDB service is running:
```
brew services start mongodb-community
```
- Clone the repository
  ```
  git clone https://github.com/microbiomedata/nmdc_automation.git
  ```
- Install the required packages
  ```
  cd nmdc_automation
  poetry install
  ```
- Activate the Poetry environment
  ```
  poetry env activate
  ```
  OR
  ```
  poetry shell
  ```
  depending on the Poetry version
- Run the tests
  ```
  make test
  ```
The Scheduler polls the NMDC database based on an Allowlist of DataGeneration IDs. For each allowed DataGeneration ID, the Scheduler examines the WorkflowExecutions and DataObjects that `was_informed_by` the data generation, and builds a graph of Workflow Process Nodes.

A Workflow Process Node is a representation of:
- `workflow` - the workflow configuration, from workflows.yaml; the "recipe" for the given type of analysis
- `workflow.children` - the child workflow recipes that can be run after this workflow
- `process` - the planned process, from the NMDC database; the "instance" of a workflow execution or data generation from the NMDC database
- `parent` - the parent workflow process node, if any
- `children` - the child workflow process nodes, if any
Workflow Process Node Mermaid Diagram:
```mermaid
erDiagram
    WorkflowProcessNode ||--|| PlannedProcess: "process"
    PlannedProcess ||--|{ DataObject: "has_input / has_output"
    WorkflowProcessNode }|--|| WorkflowConfig: "workflow"
    WorkflowConfig ||--o{ WorkflowConfig: "children"
    WorkflowProcessNode |o--o| WorkflowProcessNode: "parent"
    WorkflowProcessNode |o--o{ WorkflowProcessNode: "children"
```
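As a rough illustration of this structure, here is a minimal Python sketch of a workflow process node. The class and attribute names are simplified stand-ins for illustration, not the actual classes in nmdc_automation.

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class WorkflowConfig:
    """A workflow "recipe" loaded from workflows.yaml (simplified stand-in)."""
    name: str
    inputs: list[str] = field(default_factory=list)        # required input data object types
    children: list["WorkflowConfig"] = field(default_factory=list)

@dataclass
class WorkflowProcessNode:
    """Ties a planned process from the NMDC database to its workflow recipe."""
    workflow: WorkflowConfig                                 # the recipe for this type of analysis
    process: dict                                            # the planned process document from the database
    parent: Optional["WorkflowProcessNode"] = None
    children: list["WorkflowProcessNode"] = field(default_factory=list)
    data_objects: list[dict] = field(default_factory=list)  # DataObject records output by the process
```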
When the Scheduler finds a node where:
- the node has a workflow configuration in `node.workflow.children`
- the node DOES NOT have a corresponding child node in `node.children`
- the required inputs for the child workflow are available in the node's process outputs
Scheduler Process Mermaid Diagram:
```mermaid
erDiagram
    WPNode_Sequencing ||--|| WPNode_ReadsQC: "children nodes"
    WPNode_Sequencing ||--|| WConfig_Sequencing: "workflow"
    WConfig_Sequencing ||--o{ WConfig_ReadsQC: "children workflows"
    WPNode_Sequencing ||--|| Process_Sequencing: "process"
    Process_Sequencing ||--|{ SequencingData: "has_output"
    WPNode_ReadsQC ||--|| Process_ReadsQC: "process"
    Process_ReadsQC ||--|{ SequencingData: "has_input"
    Process_ReadsQC ||--|{ ReadsQCData: "has_output"
    WPNode_ReadsQC ||--|| WConfig_ReadsQC: "workflow"
    WConfig_ReadsQC ||--o{ WConfig_Assembly: "children workflows"
```
In this case the Scheduler will "schedule" a new job by creating a job configuration from:
- the workflow configuration from `node.workflow.children`
- input data from `node.data_objects`

and writing this to the `jobs` collection in the NMDC database.
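A minimal sketch of that decision logic, reusing the simplified classes from the sketch above; the real Scheduler's method names and job-document layout differ in detail.

```python
def find_schedulable_jobs(node: WorkflowProcessNode) -> list[dict]:
    """Return job configurations for child workflows that are ready to run (illustrative only)."""
    jobs = []
    already_scheduled = {child.workflow.name for child in node.children}
    available_outputs = {do.get("data_object_type") for do in node.data_objects}

    for child_wf in node.workflow.children:
        # Skip if a child node for this workflow already exists.
        if child_wf.name in already_scheduled:
            continue
        # Skip if the child's required inputs are not all present in the node's outputs.
        if not set(child_wf.inputs).issubset(available_outputs):
            continue
        # Build a job configuration to write to the `jobs` collection.
        jobs.append({
            "workflow": {"id": child_wf.name},
            "config": {"inputs": [do["id"] for do in node.data_objects]},
            "claims": [],
        })
    return jobs
```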
The Watcher "watches" the jobs
table in the NMDC database looking for unclaimed jobs. If found, the Watcher will create a WorkflowJob
to manage the analysis job. The watcher will then periodically poll each workflow job for its status and process successful or failed jobs when they are complete
A WorkflowJob consists of a WorkflowStateManager and a JobRunner, and is responsible for preparing the required inputs for an analysis job and submitting it to the job-running service. The default job-running service is JAWS. The legacy job-running service is a self-managed SLURM/Condor/Cromwell stack running on Perlmutter; details can be found in README_Slurm.md. The JobRunner is also responsible for processing the resulting data and metadata when the job completes.
The Watcher maintains a record of its current activity in a State File.
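The overall control flow looks roughly like the sketch below. The callables and method names (`find_unclaimed_jobs`, `claim_job`, `make_workflow_job`, `get_status`, `process_results`) are placeholders standing in for the real nmdc_automation interfaces, which differ in detail.

```python
import time

def watcher_loop(find_unclaimed_jobs, claim_job, make_workflow_job, poll_interval=60):
    """Illustrative outline of the Watcher's claim/poll/process cycle."""
    workflow_jobs = []
    while True:
        # Claim any unclaimed jobs and wrap each one in a WorkflowJob.
        for job_record in find_unclaimed_jobs():
            claim_job(job_record)
            workflow_jobs.append(make_workflow_job(job_record))

        # Poll each active job; process the results when it succeeds or fails.
        for wf_job in list(workflow_jobs):
            status = wf_job.get_status()
            if status in ("Succeeded", "Failed"):
                wf_job.process_results(status)
                workflow_jobs.remove(wf_job)

        time.sleep(poll_interval)
```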
Site-specific configuration is provided by a `.toml` file and defines parameters that are used across the workflow process, including the following (see the loading sketch after this list):
- URL and credentials for NMDC API
- Staging and Data filesystem locations for the site
- Job Runner service URLs
- Path to the state file
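For illustration only, a snippet that loads such a file with Python's built-in `tomllib`; the section and key names used here are placeholders, not the actual schema of the site configuration files.

```python
import tomllib

with open("site_configuration.toml", "rb") as f:
    site_conf = tomllib.load(f)

# Placeholder keys -- consult a real site configuration file for the actual names.
api_url     = site_conf["nmdc"]["api_url"]
staging_dir = site_conf["directories"]["staging"]
data_dir    = site_conf["directories"]["data"]
state_file  = site_conf["state"]["agent_state"]
```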
Workflow Definitions: workflow definitions are provided in a `.yaml` file describing each analysis step, specifying the following (see the reading sketch after this list):
- Name, type, version, WDL and git repository for each workflow
- Inputs, Outputs and Workflow Execution steps
- Data Object Types, description and name templates for processing workflow output data
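A sketch of how such a definition might be read (using PyYAML); the top-level key and field names shown are illustrative, not the exact keys used in workflows.yaml.

```python
import yaml  # PyYAML

with open("workflows.yaml") as f:
    workflow_defs = yaml.safe_load(f)

# Illustrative field names -- the real file may use different keys.
for wf in workflow_defs.get("workflows", []):
    print(wf.get("name"), wf.get("version"), wf.get("wdl"), wf.get("git_repo"))
```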
The Scheduler is a Dockerized application running on Rancher. To initialize the Scheduler for new DataGeneration IDs, follow these steps:
- On Rancher, go to `Deployments`, select `Production` from the clusters list, and find the Scheduler in either `nmdc` or `nmdc-dev`
- Click on the Scheduler and select `run shell`
- In the shell, `cd /conf`
- Update the file `allow.lst` with the DataGeneration IDs that you want to schedule:
    - Copy the list of DataGeneration IDs to your clipboard
    - In the shell, delete the existing allow list: `rm allow.lst`
    - Replace the file with your copied list: `cat > allow.lst`
    - Paste your IDs (`command-v`)
    - Ensure there is a blank line at the end with a `return`
    - Terminate the `cat` command using `control-d`
- The default log level is `INFO`; if you want to change it to `DEBUG` for more verbose logging, run: `export NMDC_LOG_LEVEL=DEBUG`
- Restart the Scheduler. In the shell, in `/conf`, run `./run_scheduler.sh`
    - If running tests on `dev`, make sure to check `./run_scheduler.sh -h` for options
- Ensure the Scheduler is running by checking `sched.log`
The Watcher is a Python application which runs on a login node on Perlmutter.
The following instructions all assume the user is logged in as the `nmdcda` user.
- Get an ssh key - in your home directory:
  ```
  ./sshproxy.sh -u <your_nersc_username> -c nmdcda
  ```
- Log in using the key:
  ```
  ssh -i .ssh/nmdcda nmdcda@perlmutter.nersc.gov
  ```
Watcher code and config files can be found in:
- `/global/homes/n/nmdcda/nmdc_automation/prod`
- `/global/homes/n/nmdcda/nmdc_automation/dev`
- Check the last node the Watcher was running on
  ```
  (base) nmdcda@perlmutter:login07:~> cd nmdc_automation/dev
  (base) nmdcda@perlmutter:login07:~/nmdc_automation/dev> cat host-dev.last
  login24
  ```
- ssh to that node
  ```
  (base) nmdcda@perlmutter:login07:~/nmdc_automation/dev> ssh login24
  ```
- Check for the watcher process
  ```
  (base) nmdcda@perlmutter:login24:~> ps aux | grep watcher
  nmdcda 115825 0.0 0.0 8236 848 pts/94 S+ 09:33 0:00 grep watcher
  nmdcda 2044781 0.4 0.0 146420 113668 ? S Mar06 5:42 python -m nmdc_automation.run_process.run_workflows watcher --config /global/homes/n/nmdcda/nmdc_automation/prod/site_configuration_nersc_prod.toml --jaws daemon
  nmdcda 2044782 0.0 0.0 5504 744 ? S Mar06 0:00 tee -a watcher-prod.log
  ```
- If we are going to shut down the Watcher (without restarting), we need to kill the existing process
  ```
  (base) nmdcda@perlmutter:login24:~> kill -9 2044781
  ```
  Note: this will also terminate the `tee` process that is writing to the log file.
With older versions of the `./run.sh` script, restarting the Watcher required manually terminating the existing process with `kill -9 2044781`. The new `run_watcher.sh` script now handles killing and restarting the Watcher.
- Ensure you have the latest `nmdc_automation` code
  ```
  cd nmdc_automation
  git status
  git switch main   # if not on the main branch
  git fetch origin
  git pull
  ```
- Set up the NMDC automation environment with `conda` and `poetry`
    - Load conda: `eval "$__conda_setup"`
    - In the `nmdc_automation` directory, install the nmdc_automation project with `poetry install`
    - Run `poetry shell` to use the environment
Example Setup:
```
(nersc-python) nmdcda@perlmutter:login38:~> pwd
/global/homes/n/nmdcda
(nersc-python) nmdcda@perlmutter:login38:~> cd nmdc_automation/dev/
(nersc-python) nmdcda@perlmutter:login38:~/nmdc_automation/dev> eval "$__conda_setup"
(base) nmdcda@perlmutter:login38:~/nmdc_automation/dev> cd nmdc_automation/
(base) nmdcda@perlmutter:login38:~/nmdc_automation/dev/nmdc_automation> poetry install
Installing dependencies from lock file
No dependencies to install or update
Installing the current project: nmdc-automation (0.1.0)
(base) nmdcda@perlmutter:login38:~/nmdc_automation/dev/nmdc_automation> poetry shell
Spawning shell within /global/cfs/cdirs/m3408/nmdc_automation/dev/nmdc_automation/.venv
. /global/cfs/cdirs/m3408/nmdc_automation/dev/nmdc_automation/.venv/bin/activate
(base) nmdcda@perlmutter:login38:~/nmdc_automation/dev/nmdc_automation> . /global/cfs/cdirs/m3408/nmdc_automation/dev/nmdc_automation/.venv/bin/activate
(nmdc-automation-py3.11) (base) nmdcda@perlmutter:login38:~/nmdc_automation/dev/nmdc_automation>
```
The `poetry shell` command will activate the environment for the current shell session. The environment name `(nmdc-automation-py3.11)` will be displayed in the prompt.
We run the Watcher using `nohup` (No Hangup) - this prevents the Watcher process from being terminated when the user's terminal session ends. This will cause stdout and stderr to be written to a file named `nohup.out`, in addition to being written to the `watcher-[dev/prod].log` file.
- Change to the working `prod` or `dev` directory
    - `/global/homes/n/nmdcda/nmdc_automation/prod`
    - `/global/homes/n/nmdcda/nmdc_automation/dev`
- Remove the old output file: `rm nohup.out` (long-term logging is captured in the `watcher-[dev/prod].log` file, which is retained)
- Start the Watcher: `nohup ./run_watcher_dev.sh &` (for dev) OR `nohup ./run_watcher_prod.sh &` (for prod)
Note: these scripts use the JAWS service to run jobs. If you want to use SLURM/Condor, use `run_dev_slurm.sh` or `run_prod_slurm.sh`.
Same process as Checking the Watcher Status.
JAWS is the default job running service. It is a Cromwell-based service that runs jobs on NERSC and other compute resources. Documentation can be found here.
With the `jaws_jobid` from the `agent.state` file, you can check the status of the job in the JAWS service.
JAWS Status call:
```
> jaws status 109288
{
"compute_site_id": "nmdc",
"cpu_hours": null,
"cromwell_run_id": "0fddc559-833e-4e14-9fa5-1e3d485b232d",
"id": 109288,
"input_site_id": "nmdc",
"json_file": "/tmp/tmpeoq9a5p_.json",
"output_dir": null,
"result": null,
"status": "running",
"status_detail": "The run is being executed; you can check `tasks` for more detail",
"submitted": "2025-05-01 11:22:45",
"tag": "nmdc:dgns-11-sm8wyy89/nmdc:wfrqc-11-7fgdsy18.1",
"team_id": "nmdc",
"updated": "2025-05-01 11:40:44",
"user_id": "nmdcda",
"wdl_file": "/tmp/tmpq0l3fk0n.wdl",
"workflow_name": "nmdc_rqcfilter",
"workflow_root": "/pscratch/sd/n/nmjaws/nmdc-prod/cromwell-executions/nmdc_rqcfilter/0fddc559-833e-4e14-9fa5-1e3d485b232d"
}
```
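If you need to check this programmatically, a minimal sketch is shown below; it assumes `jaws status <id>` prints only the JSON document shown above.

```python
import json
import subprocess

def jaws_status(job_id: int) -> dict:
    """Run `jaws status` and parse its JSON output (assumes JSON-only output)."""
    result = subprocess.run(
        ["jaws", "status", str(job_id)],
        capture_output=True, text=True, check=True,
    )
    return json.loads(result.stdout)

status = jaws_status(109288)
print(status["status"], status.get("result"))
```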
- Query the `jobs` collection in the NMDC database based on `was_informed_by` a specific DataGeneration ID:
  ```
  db.getCollection("jobs").find({ "config.was_informed_by": "nmdc:omprc-11-sdyccb57" })
  ```
  Similarly, you can query the `workflow_execution_set` collection to find results based on `was_informed_by` a specific DataGeneration ID:
  ```
  db.getCollection("workflow_execution_set").find({ "was_informed_by": "nmdc:omprc-11-sdyccb57" })
  ```
  (A pymongo version of these queries is sketched below, after the job document notes.)
- Job document example
  ```json
  {
    "workflow": { "id": "Metagenome Assembly: v1.0.9" },
    "id": "nmdc:9380c834-fab7-11ef-b4bd-0a13321f5970",
    "created_at": "2025-03-06T18:19:43.000+0000",
    "config": {
      "git_repo": "https://github.com/microbiomedata/metaAssembly",
      "release": "v1.0.9",
      "wdl": "jgi_assembly.wdl",
      "activity_id": "nmdc:wfmgas-12-k8dxr170.1",
      "activity_set": "workflow_execution_set",
      "was_informed_by": "nmdc:omprc-11-sdyccb57",
      "trigger_activity": "nmdc:wfrqc-12-dvn15085.1",
      "iteration": 1,
      "input_prefix": "jgi_metaAssembly",
      "inputs": {
        "input_files": "https://data.microbiomedata.org/data/nmdc:omprc-11-sdyccb57/nmdc:wfrqc-12-dvn15085.1/nmdc_wfrqc-12-dvn15085.1_filtered.fastq.gz",
        "proj": "nmdc:wfmgas-12-k8dxr170.1",
        "shortRead": false
      },
      "input_data_objects": [],
      "activity": {},
      "outputs": []
    },
    "claims": []
  }
  ```
Things to note:
- `config.was_informed_by` is the DataGeneration ID that is the root of this job
- `config.trigger_activity` is the WorkflowExecution ID that triggered this job
- `config.inputs` are the inputs to the job
- `claims` is a list of workers that have claimed the job. If this list is empty, the job is available to be claimed. If the list is not empty, the job is being processed by a worker - example: `{ "op_id" : "nmdc:sys0z232qf64", "site_id" : "NERSC" }`. This refers to the operation and site that are processing the job.
The Watcher maintains a state file with job configuration, metadata and status information. The location of the
state file is defined in the site configuration file. For dev this location is:
/global/cfs/cdirs/m3408/var/dev/agent.state
Example State File Entry
```json
{
"workflow": {
"id": "Metagenome Assembly: v1.0.9"
},
"created_at": "2025-03-06T18:19:43",
"config": {
"git_repo": "https://github.com/microbiomedata/metaAssembly",
"release": "v1.0.9",
"wdl": "jgi_assembly.wdl",
"activity_id": "nmdc:wfmgas-12-k8dxr170.1",
"activity_set": "workflow_execution_set",
"was_informed_by": "nmdc:omprc-11-sdyccb57",
"trigger_activity": "nmdc:wfrqc-12-dvn15085.1",
"iteration": 1,
"input_prefix": "jgi_metaAssembly",
"inputs": {
"input_files": "https://data.microbiomedata.org/data/nmdc:omprc-11-sdyccb57/nmdc:wfrqc-12-dvn15085.1/nmdc_wfrqc-12-dvn15085.1_filtered.fastq.gz",
"proj": "nmdc:wfmgas-12-k8dxr170.1",
"shortRead": false
},
"input_data_objects": [],
"activity": {},
"outputs": []
},
"claims": [],
"opid": "nmdc:sys0z232qf64",
"done": true,
"start": "2025-03-06T19:24:52.176365+00:00",
"cromwell_jobid": "0b138671-824d-496a-b681-24fb6cb207b3",
"last_status": "Failed",
"nmdc_jobid": "nmdc:9380c834-fab7-11ef-b4bd-0a13321f5970",
"failed_count": 3
}
```
Similar to a `jobs` record, with these additional things to note:
- `done` is a boolean indicating if the job is complete
- `cromwell_jobid` is the job ID from the Cromwell service
- `last_status` is the last known status of the job - this is updated by the Watcher
- `failed_count` is the number of times the job has failed
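For quick inspection of the state file, a small sketch is shown below; it assumes the file holds a JSON list of entries like the example above - verify the actual layout of `agent.state` before relying on this.

```python
import json

STATE_FILE = "/global/cfs/cdirs/m3408/var/dev/agent.state"

# Assumption: the state file contains a JSON array of job-state entries.
with open(STATE_FILE) as f:
    entries = json.load(f)

for entry in entries:
    if entry.get("last_status") == "Failed":
        print(entry.get("nmdc_jobid"), entry.get("failed_count"),
              entry.get("config", {}).get("was_informed_by"))
```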
By default, the Watcher will retry a failed job 1 additional time via `jaws resubmit`. If the job fails again, the Watcher will mark the job as `done` and update the status to `Failed`.
Some things to note:
- For jobs that have failed with a transient incomplete data download, the failure may be resolved by invoking the `jaws download $jaws_jobid` command.
- For jobs that have failed due to Cromwell or other system errors and need to be resubmitted, use the API release endpoint to mark a claimed job as failed and have JAWS resubmit the job, if the JAWS job itself cannot be resubmitted. This will increase the `claims` array in the `jobs` record by 1.