
Commit

Merge branch '275-watcherwatch-not-handling-state-file-correctly' into 256-configure-and-deploy-berkley-rc1
mbthornton-lbl committed Oct 28, 2024
2 parents 41ca2e7 + 11269f4 commit 797242b
Showing 3 changed files with 53 additions and 30 deletions.
4 changes: 4 additions & 0 deletions nmdc_automation/api/nmdcapi.py
@@ -298,6 +298,10 @@ def get_op(self, opid):

@refresh_token
def update_op(self, opid, done=None, results=None, meta=None):
"""
Update an operation with the given ID with the specified parameters.
Returns the updated operation.
"""
url = "%soperations/%s" % (self._base_url, opid)
d = dict()
if done is not None:
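For reference, a minimal usage sketch of the documented update_op signature (opid, done, results, meta). The client construction, config path, operation id, and metadata payload below are illustrative assumptions, not taken from the commit:

# Hypothetical usage of NmdcRuntimeApi.update_op; ids and config path are made up.
from nmdc_automation.api import NmdcRuntimeApi
from nmdc_automation.config import SiteConfig

config = SiteConfig("site_configuration.toml")  # assumed config file location
api = NmdcRuntimeApi(config)

# done, results, and meta are all optional; only the fields supplied are sent.
updated = api.update_op(
    "nmdc:sys0abc123",                  # hypothetical operation id
    done=True,
    meta={"note": "illustrative metadata"},
)
print(updated)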
2 changes: 1 addition & 1 deletion nmdc_automation/run_process/run_workflows.py
@@ -103,7 +103,7 @@ def daemon(ctx):
@click.argument("opid")
def reset(ctx, opid):
watcher = ctx.obj
print(watcher.nmdc.update_op(opid, done=False))
print(watcher.nmdc.update_operation(opid, done=False))


if __name__ == "__main__":
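The renamed call means the watcher's runtime client must expose an update_operation method. A stripped-down, hypothetical sketch of the same click command pattern, with a stub client standing in for the real watcher:

# Hypothetical, self-contained sketch of the reset command pattern shown above.
import click

class StubRuntime:
    """Stand-in for the runtime client; the real code talks to the NMDC runtime API."""
    def update_operation(self, opid, done=None, results=None, meta=None):
        return {"id": opid, "done": done}

@click.group()
@click.pass_context
def cli(ctx):
    ctx.obj = StubRuntime()

@cli.command()
@click.pass_context
@click.argument("opid")
def reset(ctx, opid):
    # Re-open the operation so the watcher can pick it up again.
    print(ctx.obj.update_operation(opid, done=False))

if __name__ == "__main__":
    cli()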
77 changes: 48 additions & 29 deletions nmdc_automation/workflow_automation/watch_nmdc.py
@@ -1,10 +1,8 @@
#!/usr/bin/env python

from time import sleep
import os
import json
import logging
import shutil
from json import loads
from pathlib import Path
from typing import List, Dict, Any, Optional, Union, Tuple
@@ -13,7 +11,6 @@
from nmdc_automation.api import NmdcRuntimeApi
from nmdc_automation.config import SiteConfig
from .wfutils import WorkflowJob
from .wfutils import _md5


DEFAULT_STATE_DIR = Path(__file__).parent / "_state"
@@ -30,8 +27,10 @@ def __init__(self, config: SiteConfig, state_file: Union[str, Path] = None):
self._state_file = None
# set state file
if state_file:
logger.info(f"Using state file: {state_file}")
self._state_file = Path(state_file)
elif self.config.agent_state:
logger.info(f"Using state file from config: {self.config.agent_state}")
self._state_file = Path(self.config.agent_state)
else:
# no state file provided or set in config set up a default
@@ -42,6 +41,7 @@ def __init__(self, config: SiteConfig, state_file: Union[str, Path] = None):
if DEFAULT_STATE_FILE.stat().st_size == 0:
with open(DEFAULT_STATE_FILE, "w") as f:
json.dump(INITIAL_STATE, f, indent=2)
logger.info(f"Using default state file: {DEFAULT_STATE_FILE}")
self._state_file = DEFAULT_STATE_FILE
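The default-state-file branch above reduces to "create the file if missing and seed it when empty". A standalone sketch of that pattern; the file name and INITIAL_STATE contents here are assumptions, since they are defined outside this hunk:

# Sketch of the default state-file bootstrap; file name and INITIAL_STATE are assumed.
import json
from pathlib import Path

DEFAULT_STATE_DIR = Path(__file__).parent / "_state"
DEFAULT_STATE_FILE = DEFAULT_STATE_DIR / "agent.state"   # assumed file name
INITIAL_STATE = {"jobs": []}                             # assumed initial contents

DEFAULT_STATE_DIR.mkdir(parents=True, exist_ok=True)
DEFAULT_STATE_FILE.touch(exist_ok=True)
# Seed an empty file so later reads always find valid JSON.
if DEFAULT_STATE_FILE.stat().st_size == 0:
    with open(DEFAULT_STATE_FILE, "w") as f:
        json.dump(INITIAL_STATE, f, indent=2)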

@property
@@ -54,7 +54,7 @@ def state_file(self, value) -> None:
""" Set the state file path """
self._state_file = value

def read_state(self)-> Optional[Dict[str, Any]]:
def read_state(self) -> Optional[Dict[str, Any]]:
""" Read the state file and return the data """
logging.info(f"Reading state from {self.state_file}")
with open(self.state_file, "r") as f:
@@ -64,9 +64,13 @@ def read_state(self)-> Optional[Dict[str, Any]]:
def write_state(self, data) -> None:
""" Write data to the state file """
# normalize "id" used in database job records to "nmdc_jobid"
job_count = 0
for job in data["jobs"]:
job_count += 1
if "id" in job:
job["nmdc_jobid"] = job.pop("id")

logger.debug(f"Writing state to {self.state_file} - updating {job_count} jobs")
with open(self.state_file, "w") as f:
json.dump(data, f, indent=2)
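The id-to-nmdc_jobid normalization in write_state can be seen in isolation with a couple of made-up job records:

# Illustrative only: database-style records get "id" renamed to "nmdc_jobid".
jobs = [
    {"id": "nmdc:wfj-001", "opid": "nmdc:op-001"},          # database-style record
    {"nmdc_jobid": "nmdc:wfj-002", "opid": "nmdc:op-002"},  # already normalized
]
for job in jobs:
    if "id" in job:
        job["nmdc_jobid"] = job.pop("id")
print(jobs)
# [{'opid': 'nmdc:op-001', 'nmdc_jobid': 'nmdc:wfj-001'},
#  {'nmdc_jobid': 'nmdc:wfj-002', 'opid': 'nmdc:op-002'}]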

@@ -82,8 +86,10 @@ def write_metadata_if_not_exists(self, job: WorkflowJob)->Path:
# make sure the parent directories exist
metadata_filepath.parent.mkdir(parents=True, exist_ok=True)
if not metadata_filepath.exists():
logger.debug(f"Writing metadata to {metadata_filepath}")
with open(metadata_filepath, "w") as f:
json.dump(job.job.metadata, f)

return metadata_filepath


@@ -98,7 +104,6 @@ def __init__(self, config: SiteConfig, file_handler: FileHandler, init_cache: bo
if init_cache:
self.restore_from_state()


@property
def job_cache(self)-> List[WorkflowJob]:
""" Get the job cache """
@@ -120,53 +125,57 @@ def save_checkpoint(self) -> None:
data = self.job_checkpoint()
self.file_handler.write_state(data)

def restore_from_state(self)-> None:
def restore_from_state(self) -> None:
""" Restore jobs from state data """
logging.info("Restoring job cache from state")
logging.info(f"Job cache length: {len(self.job_cache)}")
self.job_cache = self.get_workflow_jobs_from_state()
new_jobs = self.get_new_workflow_jobs_from_state()
if new_jobs:
logger.info(f"Restoring {len(new_jobs)} jobs from state.")
self.job_cache.extend(new_jobs)

def get_workflow_jobs_from_state(self)-> List[WorkflowJob]:
""" Find jobs from state data """
def get_new_workflow_jobs_from_state(self) -> List[WorkflowJob]:
""" Find new jobs from state data that are not already in the job cache """
wf_job_list = []
job_cache_ids = [job.opid for job in self.job_cache]
state = self.file_handler.read_state()
jobs = state.get("jobs", [])
for job in jobs:
if job.get("opid") in job_cache_ids:

for job in state["jobs"]:
if job.get("opid") and job.get("opid") in job_cache_ids:
# already in cache
continue
wf_job = WorkflowJob(self.config, workflow_state=job)
logger.debug(f"New workflow job: {wf_job.opid} from state.")
job_cache_ids.append(wf_job.opid)
wf_job_list.append(wf_job)
logging.info(f"Restored {len(wf_job_list)} jobs from state")
return wf_job_list


def find_job_by_opid(self, opid) -> Optional[WorkflowJob]:
""" Find a job by operation id """
return next((job for job in self.job_cache if job.opid == opid), None)


def prepare_and_cache_new_job(self, new_job: WorkflowJob, opid: str, force=False)-> Optional[WorkflowJob]:
""" Prepare and cache a new job """
"""
Prepare and cache a new job, if it doesn't already exist by opid.
The job can be forced to replace an existing job.
"""
if "object_id_latest" in new_job.workflow.config:
logger.warning("Old record. Skipping.")
return
existing_job = self.find_job_by_opid(opid)
if not existing_job:
logger.info(f"Prepare and cache new job: {opid}")
new_job.set_opid(opid, force=force)
new_job.done = False
self.job_cache.append(new_job)
return new_job
elif force:
logger.info(f"Replacing existing job: {existing_job.opid} with new job: {opid}")
self.job_cache.remove(existing_job)
new_job.set_opid(opid, force=force)
new_job.done = False
self.job_cache.append(new_job)
return new_job
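The caching rule in prepare_and_cache_new_job (ignore a known opid unless force is set, in which case the old entry is replaced) boils down to this small pattern; the job objects below are simplified stand-ins, not the real WorkflowJob:

# Simplified stand-in for the cache-or-replace logic shown above.
class StubJob:
    def __init__(self):
        self.opid = None
        self.done = None
    def set_opid(self, opid, force=False):
        self.opid = opid

cache = []

def prepare_and_cache(new_job, opid, force=False):
    existing = next((j for j in cache if j.opid == opid), None)
    if existing and not force:
        return None                  # already cached; nothing to do
    if existing:
        cache.remove(existing)       # force=True: drop the stale entry first
    new_job.set_opid(opid, force=force)
    new_job.done = False
    cache.append(new_job)
    return new_job

prepare_and_cache(StubJob(), "nmdc:op-1")               # cached
prepare_and_cache(StubJob(), "nmdc:op-1")               # duplicate, returns None
prepare_and_cache(StubJob(), "nmdc:op-1", force=True)   # replaces the cached job
print(len(cache))                                       # 1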



def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]:
""" Get finished jobs """
successful_jobs = []
@@ -178,12 +187,15 @@ def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]:
successful_jobs.append(job)
elif status == "Failed" and job.opid:
failed_jobs.append(job)
if successful_jobs:
logger.info(f"Found {len(successful_jobs)} successful jobs.")
if failed_jobs:
logger.info(f"Found {len(failed_jobs)} failed jobs.")
return (successful_jobs, failed_jobs)


def process_successful_job(self, job: WorkflowJob) -> Database:
""" Process a successful job """
logger.info(f"Running post for op {job.opid}")
""" Process a successful job and return a Database object """
logger.info(f"Process successful job: {job.opid}")

output_path = self.file_handler.get_output_path(job)
if not output_path.exists():
@@ -199,7 +211,6 @@ def process_successful_job(self, job: WorkflowJob) -> Database:
self.file_handler.write_metadata_if_not_exists(job)
return database


def process_failed_job(self, job: WorkflowJob) -> None:
""" Process a failed job """
if job.workflow.state.get("failed_count", 0) >= self._MAX_FAILS:
@@ -213,14 +224,17 @@ def process_failed_job(self, job: WorkflowJob) -> None:


class RuntimeApiHandler:
""" RuntimeApiHandler class for managing API calls to the runtime """
def __init__(self, config):
self.runtime_api = NmdcRuntimeApi(config)
self.config = config

def claim_job(self, job_id):
""" Claim a job by its ID """
return self.runtime_api.claim_job(job_id)

def get_unclaimed_jobs(self, allowed_workflows)-> List[WorkflowJob]:
def get_unclaimed_jobs(self, allowed_workflows) -> List[WorkflowJob]:
""" Get unclaimed jobs from the runtime """
jobs = []
filt = {
"workflow.id": {"$in": allowed_workflows},
@@ -234,13 +248,16 @@ def get_unclaimed_jobs(self, allowed_workflows)-> List[WorkflowJob]:
return jobs

def post_objects(self, database_obj):
""" Post a Database with workflow executions and their data objects to the workflow_executions endpoint """
return self.runtime_api.post_objects(database_obj)

def update_op(self, opid, done, meta):
def update_operation(self, opid, done, meta):
""" Update the state of an operation with new metadata, results, and done status """
return self.runtime_api.update_op(opid, done=done, meta=meta)


class Watcher:
""" Watcher class for monitoring and managing jobs """
def __init__(self, site_configuration_file: Union[str, Path], state_file: Union[str, Path] = None):
self._POLL = 20
self._MAX_FAILS = 2
@@ -260,6 +277,7 @@ def restore_from_checkpoint(self, state_data: Dict[str, Any] = None)-> None:


def cycle(self):
""" Perform a cycle of watching for unclaimed jobs, claiming jobs, and processing finished jobs """
self.restore_from_checkpoint()
if not self.should_skip_claim:
unclaimed_jobs = self.runtime_api_handler.get_unclaimed_jobs(self.config.allowed_workflows)
@@ -277,7 +295,7 @@ def cycle(self):
continue
job.done = True
# update the operation record
resp = self.runtime_api_handler.update_op(
resp = self.runtime_api_handler.update_operation(
job.opid, done=True, meta=job.job.metadata
)
if not resp.ok:
@@ -288,6 +306,7 @@ def watch(self):
self.job_manager.process_failed_job(job)

def watch(self):
""" Maintain a polling loop to 'cycle' through job claims and processing """
logger.info("Entering polling loop")
while True:
try:
@@ -296,10 +315,10 @@ def claim_jobs(self, unclaimed_jobs: List[WorkflowJob] = None):
logger.exception(f"Error occurred during cycle: {e}", exc_info=True)
sleep(self._POLL)


def claim_jobs(self, unclaimed_jobs: List[WorkflowJob] = None):
# unclaimed_jobs = self.runtime_api_handler.get_unclaimed_jobs(self.config.allowed_workflows)
def claim_jobs(self, unclaimed_jobs: List[WorkflowJob] = None) -> None:
""" Claim unclaimed jobs, prepare them, and submit them. Write a checkpoint after claiming jobs. """
for job in unclaimed_jobs:
logger.info(f"Claiming job {job.workflow.nmdc_jobid}")
claim = self.runtime_api_handler.claim_job(job.workflow.nmdc_jobid)
opid = claim["detail"]["id"]
new_job = self.job_manager.prepare_and_cache_new_job(job, opid)
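Taken together, watch() and cycle() implement a poll-claim-process loop. The schematic below restates that control flow using only the calls visible in this diff; it is an outline, not the actual implementation:

# Schematic of the watcher polling loop; runtime, job_manager, and config are
# assumed to behave like RuntimeApiHandler, JobManager, and SiteConfig above.
import logging
from time import sleep

POLL_SECONDS = 20  # mirrors the _POLL default shown above

def cycle(runtime, job_manager, config):
    # 1. Claim any unclaimed jobs advertised for the allowed workflows.
    for job in runtime.get_unclaimed_jobs(config.allowed_workflows):
        claim = runtime.claim_job(job.workflow.nmdc_jobid)
        job_manager.prepare_and_cache_new_job(job, claim["detail"]["id"])
    # 2. Post results for finished jobs and mark their operations done.
    successful, failed = job_manager.get_finished_jobs()
    for job in successful:
        database = job_manager.process_successful_job(job)
        runtime.post_objects(database)
        runtime.update_operation(job.opid, done=True, meta=job.job.metadata)
    for job in failed:
        job_manager.process_failed_job(job)

def watch(runtime, job_manager, config):
    while True:
        try:
            cycle(runtime, job_manager, config)
        except Exception as e:
            logging.exception(f"Error occurred during cycle: {e}")
        sleep(POLL_SECONDS)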
