Commit 107c127: diagram of ingest workflow

Bento007 committed Jan 31, 2025
1 parent d1af140 commit 107c127
Showing 12 changed files with 519 additions and 136 deletions.
45 changes: 44 additions & 1 deletion .happy/terraform/modules/sfn/main.tf
@@ -34,7 +34,7 @@ resource "aws_sfn_state_machine" "state_machine" {
"Validate": {
"Type": "Task",
"Resource": "arn:aws:states:::batch:submitJob.sync",
"Next": "Cxg",
"Next": "AddLabels",
"Parameters": {
"JobDefinition":"${var.job_definition_arn}",
"JobName": "validate",
@@ -45,6 +45,49 @@ resource "aws_sfn_state_machine" "state_machine" {
"Name": "DROPBOX_URL",
"Value.$": "$.url"
},
{
"Name": "DATASET_VERSION_ID",
"Value.$": "$.dataset_version_id"
},
{
"Name": "STEP_NAME",
"Value": "download"
},
{
"Name": "TASK_TOKEN",
"Value.$": "$$.Task.Token"
}
]
}
},
"TimeoutSeconds": ${local.timeout},
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "HandleErrors",
"ResultPath": "$.error"
}
],
"Retry": [ {
"ErrorEquals": ["AWS.Batch.TooManyRequestsException", "Batch.BatchException", "Batch.AWSBatchException"],
"IntervalSeconds": 2,
"MaxAttempts": 7,
"BackoffRate": 5
} ],
"ResultPath": "$.batch"
},
"AddLabels": {
"Type": "Task",
"Resource": "arn:aws:states:::batch:submitJob.sync",
"Next": "Cxg",
"Parameters": {
"JobDefinition.$": "$.batch.JobDefinitionName",
"JobName": "add_labels",
"JobQueue.$": "$.job_queue",
"ContainerOverrides": {
"Environment": [
{
"Name": "DATASET_VERSION_ID",
"Value.$": "$.dataset_version_id"
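The Validate state above now hands the Batch container a `STEP_NAME` and a `TASK_TOKEN` (populated from `$$.Task.Token`). The commit only shows the token being injected; a minimal sketch of the consumer side, assuming the container reports back with the standard Step Functions callback API (`run_step` is a placeholder, not code from this commit):

```python
import json
import os

import boto3

# Hypothetical consumer of the TASK_TOKEN injected above; the reporting
# side is an assumption sketched with the Step Functions callback API.
sfn = boto3.client("stepfunctions")


def run_step(step_name: str) -> None:
    """Placeholder for the step's real work ("download" in this state)."""
    raise NotImplementedError


def report_result() -> None:
    token = os.environ["TASK_TOKEN"]  # populated from "$$.Task.Token"
    try:
        run_step(os.environ["STEP_NAME"])
        sfn.send_task_success(
            taskToken=token,
            output=json.dumps({"dataset_version_id": os.environ["DATASET_VERSION_ID"]}),
        )
    except Exception as exc:
        sfn.send_task_failure(taskToken=token, error="ValidationFailed", cause=str(exc))
```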
91 changes: 91 additions & 0 deletions .happy/terraform/modules/sfn/readme.md
@@ -0,0 +1,91 @@
NEW

```mermaid
---
title: Cellxgene Ingestion State Machine
---
stateDiagram-v2
Validate: Validate
ValidateAnndata: ValidateAnndata
note left of ValidateAnndata
DatasetStatusKey.VALIDATION, DatasetValidationStatus.VALIDATING
1 download anndata file
DatasetStatusKey.H5AD, DatasetValidationStatus.VALIDATING
2 validate anndata file
DatasetStatusKey.H5AD, DatasetValidationStatus.VALID
3 upload anndata file
Fail: DatasetStatusKey.H5AD, DatasetValidationStatus.INVALID
DatasetStatusKey.VALIDATION, DatasetProcessingStatus.FAILURE
end note
ValidateFragment: ValidateFragment
note left of ValidateFragment
1 download fragment file and anndata file
DatasetStatusKey.ATAC_SEQ_FRAGMENT, DatasetValidationStatus.VALIDATING
2 validate fragment file
DatasetStatusKey.ATAC_SEQ_FRAGMENT, DatasetValidationStatus.VALID
DatasetStatusKey.ATAC_SEQ_FRAGMENT, DatasetConversionStatus.CONVERTING
3 generate index
DatasetStatusKey.ATAC_SEQ_FRAGMENT, DatasetConversionStatus.CONVERTED
DatasetStatusKey.ATAC_SEQ_FRAGMENT, DatasetConversionStatus.UPLOADING
4 upload fragment file and index
DatasetStatusKey.ATAC_SEQ_FRAGMENT, DatasetUploadStatus.UPLOADED
Fail: DatasetStatusKey.ATAC_SEQ_FRAGMENT, DatasetValidationStatus.INVALID
end note
AddLabels: AddLabels
note left of AddLabels
1 download original h5ad
DatasetStatusKey.H5AD, DatasetConversionStatus.CONVERTING
2 add labels to anndata file
DatasetStatusKey.H5AD, DatasetConversionStatus.CONVERTED
3 extract metadata
DatasetStatusKey.H5AD, DatasetConversionStatus.UPLOADING
4 upload anndata file
DatasetStatusKey.H5AD, DatasetConversionStatus.UPLOADED
DatasetStatusKey.VALIDATION, DatasetProcessingStatus.SUCCESS
Fail: DatasetStatusKey.H5AD, DatasetConversionStatus.FAILED
DatasetStatusKey.PROCESSING, DatasetProcessingStatus.FAILURE
end note
CxgSeuratParallel: CxgSeuratParallel
HandleSuccess: HandleSuccess
HandleErrors: HandleErrors
CheckForErrors: CheckForErrors
ConversionError: ConversionError
DownloadValidateError: DownloadValidateError
EndPass: EndPass
Cxg: Cxg
Seurat: Seurat
CatchCxgFailure: CatchCxgFailure
CatchSeuratFailure: CatchSeuratFailure
[*] --> Validate
state Validate {
[*] --> ValidateAnndata: has anndata
ValidateAnndata --> [*]
state hasFragment <<choice>>
[*] --> hasFragment: has fragment
hasFragment --> ValidateFragment: yes
hasFragment --> [*]: no
ValidateFragment --> [*]
}
Validate --> AddLabels
AddLabels --> CxgSeuratParallel
state CxgSeuratParallel {
[*] --> Cxg
Cxg --> CatchCxgFailure
Cxg --> [*]
[*] --> Seurat
Seurat --> CatchSeuratFailure
Seurat --> [*]
}
CxgSeuratParallel --> HandleSuccess
HandleSuccess --> CheckForErrors
CheckForErrors --> DownloadValidateError: $.error
CheckForErrors --> ConversionError: $[0].error or $[1].error
CheckForErrors --> EndPass: Default
Validate --> HandleErrors
AddLabels --> HandleErrors
HandleErrors --> CheckForErrors
ConversionError --> [*]
DownloadValidateError --> [*]
EndPass --> [*]
```
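For orientation, the `$[0].error`/`$[1].error` labels on CheckForErrors reflect that the parallel Cxg/Seurat branch returns an array of results, while `$.error` comes from the earlier single-task states. An illustrative way to start an execution with the input fields the states reference (`$.url`, `$.dataset_version_id`, `$.job_queue`); the ARN and input values below are placeholders, not part of this commit:

```python
import json

import boto3

# Illustrative only: an execution input carrying the fields the states
# above read. The state machine ARN and all values are placeholders.
sfn = boto3.client("stepfunctions")
sfn.start_execution(
    stateMachineArn="arn:aws:states:us-west-2:123456789012:stateMachine:ingest",
    input=json.dumps(
        {
            "url": "https://www.dropbox.com/s/example/dataset.h5ad?dl=0",  # DROPBOX_URL source
            "dataset_version_id": "example-dataset-version-id",
            "job_queue": "arn:aws:batch:us-west-2:123456789012:job-queue/ingest",
        }
    ),
)
```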
3 changes: 3 additions & 0 deletions backend/layers/common/entities.py
@@ -16,6 +16,7 @@ class DatasetStatusKey(str, Enum):
CXG = "cxg"
RDS = "rds"
H5AD = "h5ad"
ATAC_SEQ_FRAGMENT = "atac_seq_fragment"
PROCESSING = "processing"


@@ -80,6 +81,8 @@ class DatasetArtifactType(str, Enum):
H5AD = "h5ad"
RDS = "rds"
CXG = "cxg"
ATAC_SEQ_FRAGMENT = "atac_seq_fragment"
ATAC_SEQ_FRAGMENT_INDEX = "atac_seq_fragment_index"


class Visibility(Enum):
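Since both enums subclass `str`, the new ATAC members compare and serialize as plain strings wherever statuses and artifact types are persisted. A quick illustration (not from the commit):

```python
from backend.layers.common.entities import DatasetArtifactType, DatasetStatusKey

# str-backed enums: members compare equal to their string values.
assert DatasetStatusKey.ATAC_SEQ_FRAGMENT == "atac_seq_fragment"
assert DatasetArtifactType.ATAC_SEQ_FRAGMENT.value == "atac_seq_fragment"
assert DatasetArtifactType.ATAC_SEQ_FRAGMENT_INDEX.value == "atac_seq_fragment_index"
```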
2 changes: 1 addition & 1 deletion backend/layers/persistence/persistence.py
@@ -934,7 +934,7 @@ def update_dataset_validation_message(self, version_id: DatasetVersionId, valida
with self._get_serializable_session() as session:
dataset_version = session.query(DatasetVersionTable).filter_by(id=version_id.id).one()
dataset_version_status = deepcopy(dataset_version.status)
- dataset_version_status["validation_message"] = validation_message
+ dataset_version_status["validation_message"] = validation_message  # TODO append message.
dataset_version.status = dataset_version_status

def get_dataset_version_status(self, version_id: DatasetVersionId) -> DatasetStatus:
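The new TODO flags that a later validation failure currently overwrites the earlier message. One possible shape of the append it asks for, sketched hypothetically (naming and placement are assumptions, not part of this commit):

```python
def append_validation_message(status: dict, validation_message: str) -> None:
    # Hypothetical helper: accumulate messages instead of overwriting,
    # as the TODO above suggests.
    existing = status.get("validation_message")
    status["validation_message"] = (
        f"{existing}\n{validation_message}" if existing else validation_message
    )
```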
4 changes: 4 additions & 0 deletions backend/layers/processing/exceptions.py
@@ -17,6 +17,10 @@ class ValidationFailed(ProcessingException):
errors: List[str]


class AddLabelsFailed(ProcessingException):
errors: List[str]


class ProcessingFailed(ProcessingException):
pass

12 changes: 9 additions & 3 deletions backend/layers/processing/process.py
@@ -22,9 +22,10 @@
ValidationFailed,
)
from backend.layers.processing.logger import configure_logging
from backend.layers.processing.process_add_labels import ProcessAddLabels
from backend.layers.processing.process_cxg import ProcessCxg
from backend.layers.processing.process_logic import ProcessingLogic
- from backend.layers.processing.process_validate import ProcessValidate
+ from backend.layers.processing.process_validate import ProcessValidateH5AD
from backend.layers.processing.schema_migration import SchemaMigrate
from backend.layers.thirdparty.s3_provider import S3Provider, S3ProviderInterface
from backend.layers.thirdparty.schema_validator_provider import (
@@ -41,7 +42,7 @@ class ProcessMain(ProcessingLogic):
Main class for the dataset pipeline processing
"""

- process_validate: ProcessValidate
+ process_validate: ProcessValidateH5AD
process_cxg: ProcessCxg

def __init__(
@@ -56,9 +57,10 @@ def __init__(
self.uri_provider = uri_provider
self.s3_provider = s3_provider
self.schema_validator = schema_validator
- self.process_validate = ProcessValidate(
+ self.process_validate = ProcessValidateH5AD(
self.business_logic, self.uri_provider, self.s3_provider, self.schema_validator
)
self.process_add_labels = ProcessAddLabels(self.business_logic, self.uri_provider, self.s3_provider)
self.process_cxg = ProcessCxg(self.business_logic, self.uri_provider, self.s3_provider)
self.schema_migrate = SchemaMigrate(self.business_logic, self.schema_validator)

@@ -101,6 +103,10 @@ def process(
self.process_validate.process(
collection_version_id, dataset_version_id, dropbox_uri, artifact_bucket, datasets_bucket
)
elif step_name == "add_labels":
self.process_add_labels.process(
collection_version_id, dataset_version_id, artifact_bucket, datasets_bucket
)
elif step_name == "cxg":
self.process_cxg.process(dataset_version_id, artifact_bucket, cxg_bucket)
elif step_name == "cxg_remaster":
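`ProcessMain.process` now routes on `step_name`, which the state machine supplies through the `STEP_NAME` environment variable in the Batch container overrides. A sketch of how that wiring could look at the entry point; the exact argument list and the other environment variable names are assumptions:

```python
import os

from backend.layers.processing.process import ProcessMain

# Illustrative wiring only: the state machine injects STEP_NAME (plus the
# dataset/collection IDs) as container environment variables, and an entry
# point along these lines would route them into ProcessMain.process.
def main(process_main: ProcessMain) -> None:
    process_main.process(
        collection_version_id=os.environ.get("COLLECTION_VERSION_ID"),
        dataset_version_id=os.environ["DATASET_VERSION_ID"],
        step_name=os.environ["STEP_NAME"],  # "validate", "add_labels", "cxg", ...
        dropbox_uri=os.environ.get("DROPBOX_URL"),
        artifact_bucket=os.environ.get("ARTIFACT_BUCKET"),
        datasets_bucket=os.environ.get("DATASETS_BUCKET"),
        cxg_bucket=os.environ.get("CXG_BUCKET"),
    )
```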