Skip to content

Commit

Permalink
clean up imports and add docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
mbthornton-lbl committed Oct 28, 2024
1 parent 1cbe324 commit eaac2a5
Showing 1 changed file with 3 additions and 11 deletions.
14 changes: 3 additions & 11 deletions nmdc_automation/workflow_automation/watch_nmdc.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#!/usr/bin/env python

from time import sleep
import os
import json
import logging
import shutil
from json import loads
from pathlib import Path
from typing import List, Dict, Any, Optional, Union, Tuple
Expand All @@ -13,7 +11,6 @@
from nmdc_automation.api import NmdcRuntimeApi
from nmdc_automation.config import SiteConfig
from .wfutils import WorkflowJob
from .wfutils import _md5


DEFAULT_STATE_DIR = Path(__file__).parent / "_state"
Expand Down Expand Up @@ -97,7 +94,6 @@ def __init__(self, config: SiteConfig, file_handler: FileHandler, init_cache: bo
if init_cache:
self.restore_from_state()


@property
def job_cache(self)-> List[WorkflowJob]:
""" Get the job cache """
Expand Down Expand Up @@ -137,12 +133,10 @@ def get_new_workflow_jobs_from_state(self) -> List[WorkflowJob]:
wf_job_list.append(wf_job)
return wf_job_list


def find_job_by_opid(self, opid) -> Optional[WorkflowJob]:
""" Find a job by operation id """
return next((job for job in self.job_cache if job.opid == opid), None)


def prepare_and_cache_new_job(self, new_job: WorkflowJob, opid: str, force=False)-> Optional[WorkflowJob]:
""" Prepare and cache a new job """
if "object_id_latest" in new_job.workflow.config:
Expand All @@ -161,8 +155,6 @@ def prepare_and_cache_new_job(self, new_job: WorkflowJob, opid: str, force=False
self.job_cache.append(new_job)
return new_job



def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]:
""" Get finished jobs """
successful_jobs = []
Expand All @@ -176,10 +168,9 @@ def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]:
failed_jobs.append(job)
return (successful_jobs, failed_jobs)


def process_successful_job(self, job: WorkflowJob) -> Database:
""" Process a successful job """
logger.info(f"Running post for op {job.opid}")
logger.info(f"Process successful job: {job.opid}")

output_path = self.file_handler.get_output_path(job)
if not output_path.exists():
Expand All @@ -195,7 +186,6 @@ def process_successful_job(self, job: WorkflowJob) -> Database:
self.file_handler.write_metadata_if_not_exists(job)
return database


def process_failed_job(self, job: WorkflowJob) -> None:
""" Process a failed job """
if job.workflow.state.get("failed_count", 0) >= self._MAX_FAILS:
Expand All @@ -209,11 +199,13 @@ def process_failed_job(self, job: WorkflowJob) -> None:


class RuntimeApiHandler:
""" RuntimeApiHandler class for managing API calls to the runtime """
def __init__(self, config):
self.runtime_api = NmdcRuntimeApi(config)
self.config = config

def claim_job(self, job_id):
""" Claim a job by its ID """
return self.runtime_api.claim_job(job_id)

def get_unclaimed_jobs(self, allowed_workflows)-> List[WorkflowJob]:
Expand Down

0 comments on commit eaac2a5

Please sign in to comment.