Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cromwell #188

Merged
merged 17 commits into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ Dockerfile
.git
LICENSE
CHRIS_REMOTE_FS
venv/

4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ jobs:
- name: provision local services
run: |
docker swarm init --advertise-addr 127.0.0.1
docker build --build-arg ENVIRONMENT=local -t fnndsc/pman:dev .
./make.sh -s -U -i
docker build --build-arg ENVIRONMENT=local -t local/pman:dev .
./make.sh -s -U -i local:dev
- name: nosetests
run: docker exec $(docker ps -f label=org.chrisproject.role=pman -q | head -n 1) nosetests --exe tests
- name: teardown
Expand Down
3 changes: 3 additions & 0 deletions pman/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def __init__(self):
if self.CONTAINER_ENV == 'kubernetes':
self.JOB_NAMESPACE = env('JOB_NAMESPACE', 'default')

if self.CONTAINER_ENV == 'cromwell':
self.CROMWELL_URL = env('CROMWELL_URL')

self.env = env


Expand Down
Empty file added pman/cromwell/__init__.py
Empty file.
94 changes: 94 additions & 0 deletions pman/cromwell/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import io
import json
from dataclasses import dataclass
from typing import Optional, Dict
from .models import (
WorkflowId, StrWdl,
WorkflowIdAndStatus, WorkflowQueryResponse, WorkflowMetadataResponse
)
from cromwell_tools.cromwell_api import CromwellAPI, CromwellAuth
from serde.json import from_json
from os import path
import requests


@dataclass
class CromwellClient:
"""
A wrapper around :mod:`cromwell_tools.cromwell_api` providing a similar
interface but with typed parameters and returns.
"""
auth: CromwellAuth

def submit(self, wdl: StrWdl, label: Dict[str, str]) -> WorkflowIdAndStatus:
"""
Schedule a WDL file to be executed.

:param wdl: WDL
:param label: labels to apply to this workflow
:return: response from Cromwell
"""
res = CromwellAPI.submit(
auth=self.auth,
wdl_file=self.__str2bytesio(wdl),
label_file=self.__create_label(label),
raise_for_status=True
)
return from_json(WorkflowIdAndStatus, res.text)

def status(self, uuid: WorkflowId) -> Optional[WorkflowIdAndStatus]:
"""
https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#retrieves-the-current-state-for-a-workflow

:return: workflow ID and status, or None if workflow not found
"""
res = CromwellAPI.status(uuid=uuid, auth=self.auth, raise_for_status=False)
if res.status_code == 404:
return None
res.raise_for_status()
return from_json(WorkflowIdAndStatus, res.text)

def query(self, label: Optional[Dict[str, str]] = None) -> WorkflowQueryResponse:
query_dict = {}
if label:
query_dict['label'] = label
res = CromwellAPI.query(query_dict=query_dict,
auth=self.auth, raise_for_status=True)
return from_json(WorkflowQueryResponse, res.text)

def metadata(self, uuid: WorkflowId) -> Optional[WorkflowMetadataResponse]:
res = CromwellAPI.metadata(uuid=uuid,
auth=self.auth, raise_for_status=False)
if res.status_code == 404:
return None
res.raise_for_status()
return from_json(WorkflowMetadataResponse, res.text)

def abort(self, uuid: WorkflowId) -> WorkflowIdAndStatus:
"""
https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#abort-a-running-workflow
"""
res = CromwellAPI.abort(uuid=uuid, auth=self.auth, raise_for_status=True)
return from_json(WorkflowIdAndStatus, res.text)

def logs_idc(self, uuid: WorkflowId) -> dict:
"""
This method is not available in upstream cromwell-tools.
"""
uri = path.join(self.auth.url, 'api/workflows/v1', uuid, 'logs')
res = requests.get(uri)
res.raise_for_status()
return res.json()

@classmethod
def __create_label(cls, d: Dict[str, str]) -> io.BytesIO:
"""
Create Cromwell labels from a dictionary of key-value pairs.

https://cromwell.readthedocs.io/en/stable/cromwell_features/Labels/
"""
return cls.__str2bytesio(json.dumps(d))

@staticmethod
def __str2bytesio(s: str) -> io.BytesIO:
return io.BytesIO(bytes(s, 'utf-8'))
143 changes: 143 additions & 0 deletions pman/cromwell/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
"""
Data definitions for Cromwell API responses.
"""
from enum import Enum
from serde import deserialize
from typing import NewType, List, Dict, Optional
from pman.abstractmgr import TimeStamp
from pathlib import Path


StrWdl = NewType('StrWdl', str)
"""WDL as a :type:`str`."""
WorkflowName = NewType('WorkflowName', str)
WorkflowId = NewType('WorkflowId', str)
RuntimeAttributes = Dict[str, str]
"""
Custom information about a task call from Cromwell workflow metadata,
defined by how Cromwell's backend is configured.

p.s. a type alias bc https://github.com/yukinarit/pyserde/issues/192
"""


class WorkflowStatus(Enum):
"""
https://github.com/broadinstitute/cromwell/blob/32d5d0cbf07e46f56d3d070f457eaff0138478d5/wes2cromwell/src/main/scala/wes2cromwell/WesState.scala#L19-L28
"""

# btw, the Cromwell documentation is not accurate. It's missing the "On Hold" status.
# https://broadworkbench.atlassian.net/browse/CROM-6869

OnHold = 'On Hold'
Submitted = 'Submitted'
Running = 'Running'
Aborting = 'Aborting'
Aborted = 'Aborted'
Succeeded = 'Succeeded'
Failed = 'Failed'


@deserialize
class WorkflowIdAndStatus:
"""
https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#workflowidandstatus
"""
id: WorkflowId
status: WorkflowStatus


@deserialize
class WorkflowQueryResult:
"""
https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#workflowqueryresult
"""
end: Optional[TimeStamp]
id: WorkflowId
# name will be undefined for the first few seconds after submission
name: Optional[WorkflowName]
start: Optional[TimeStamp]
status: WorkflowStatus
submission: Optional[TimeStamp]


@deserialize
class WorkflowQueryResponse:
"""
https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#workflowqueryresponse
"""
results: List[WorkflowQueryResult]
totalResultsCount: int


# doesn't seem correct
# @deserialize
# class FailureMessage:
# """
# https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#failuremessage
# """
# failure: str
# timestamp: TimeStamp

@deserialize
class CausedFailure:
message: str
causedBy: List # is actually a List['CausedFailure'],
# but pyserde does not support circular data definition


@deserialize
class CallMetadata:
"""
https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#callmetadata
"""
# these are the conventional fields
backend: Optional[str]
backendLogs: Optional[dict]
backendStatus: Optional[str]
end: Optional[TimeStamp]
executionStatus: str
failures: Optional[List[CausedFailure]]
inputs: Optional[dict]
jobId: Optional[str]
returnCode: Optional[int]
start: Optional[TimeStamp]
stderr: Optional[Path]
stdout: Optional[Path]
# these fields are not documented, yet they are very important
commandLine: Optional[str]
runtimeAttributes: Optional[RuntimeAttributes]
attempt: Optional[int]
# and these, we don't care about
# compressedDockerSize: int
# callCaching: CallCaching
# shardIndex: int


@deserialize
class SubmittedFiles:
workflow: StrWdl
root: str
options: str
inputs: str
workflowUrl: str
labels: str


@deserialize
class WorkflowMetadataResponse:
"""
https://cromwell.readthedocs.io/en/stable/api/RESTAPI/#workflowmetadataresponse
"""
calls: Optional[Dict[str, List[CallMetadata]]]
end: Optional[TimeStamp]
failures: Optional[List[CausedFailure]]
id: WorkflowId
inputs: Optional[dict]
outputs: Optional[dict]
start: Optional[TimeStamp]
status: WorkflowStatus
submission: TimeStamp
# these fields are undocumented
labels: Dict[str, str]
submittedFiles: Optional[SubmittedFiles]
Loading