Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialise ICAv2 Data Copy Manager #788

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -33,6 +33,6 @@ repos:
rev: v2.7.1
hooks:
- id: prettier
entry: prettier --ignore-unknown
entry: yarn prettier --ignore-unknown
args: [ '--check' ]
exclude: ^(skel/|yarn.lock|.yarn/|.local/|docs/|openapi/)
583 changes: 292 additions & 291 deletions .yarn/releases/yarn-4.6.0.cjs → .yarn/releases/yarn-4.7.0.cjs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion .yarnrc.yml
Original file line number Diff line number Diff line change
@@ -9,4 +9,4 @@ plugins:
path: .yarn/plugins/@yarnpkg/plugin-outdated.cjs
spec: "https://go.mskelton.dev/yarn-outdated/v4"

yarnPath: .yarn/releases/yarn-4.6.0.cjs
yarnPath: .yarn/releases/yarn-4.7.0.cjs
99 changes: 99 additions & 0 deletions cdk.context.json
Original file line number Diff line number Diff line change
@@ -490,5 +490,104 @@
"security-group:account=472057503814:region=ap-southeast-2:securityGroupName=OrcaBusSharedComputeSecurityGroup:vpcId=vpc-0dc99f521ceaa3f2d": {
"securityGroupId": "sg-02e363a39220c955f",
"allowAllOutbound": false
},
"vpc-provider:account=843407916570:filter.isDefault=true:region=ap-southeast-2:returnAsymmetricSubnets=true": {
"vpcId": "vpc-6ceacc0b",
"vpcCidrBlock": "172.31.0.0/16",
"ownerAccountId": "843407916570",
"availabilityZones": [],
"subnetGroups": [
{
"name": "Public",
"type": "Public",
"subnets": [
{
"subnetId": "subnet-d93b35be",
"cidr": "172.31.0.0/20",
"availabilityZone": "ap-southeast-2a",
"routeTableId": "rtb-8a56feec"
},
{
"subnetId": "subnet-5549721c",
"cidr": "172.31.32.0/20",
"availabilityZone": "ap-southeast-2b",
"routeTableId": "rtb-8a56feec"
},
{
"subnetId": "subnet-d52e978d",
"cidr": "172.31.16.0/20",
"availabilityZone": "ap-southeast-2c",
"routeTableId": "rtb-8a56feec"
}
]
}
]
},
"acknowledged-issue-numbers": [
32775
],
"vpc-provider:account=455634345446:filter.isDefault=true:region=ap-southeast-2:returnAsymmetricSubnets=true": {
"vpcId": "vpc-0a4856f21c3f95e43",
"vpcCidrBlock": "172.31.0.0/16",
"ownerAccountId": "455634345446",
"availabilityZones": [],
"subnetGroups": [
{
"name": "Public",
"type": "Public",
"subnets": [
{
"subnetId": "subnet-066ef61d855e7a897",
"cidr": "172.31.32.0/20",
"availabilityZone": "ap-southeast-2a",
"routeTableId": "rtb-0fa8244edbc08e632"
},
{
"subnetId": "subnet-0390478d9900fcbba",
"cidr": "172.31.0.0/20",
"availabilityZone": "ap-southeast-2b",
"routeTableId": "rtb-0fa8244edbc08e632"
},
{
"subnetId": "subnet-01fcbe72e7b6399d2",
"cidr": "172.31.16.0/20",
"availabilityZone": "ap-southeast-2c",
"routeTableId": "rtb-0fa8244edbc08e632"
}
]
}
]
},
"vpc-provider:account=472057503814:filter.isDefault=true:region=ap-southeast-2:returnAsymmetricSubnets=true": {
"vpcId": "vpc-dd55f8ba",
"vpcCidrBlock": "172.31.0.0/16",
"ownerAccountId": "472057503814",
"availabilityZones": [],
"subnetGroups": [
{
"name": "Public",
"type": "Public",
"subnets": [
{
"subnetId": "subnet-d2ef749b",
"cidr": "172.31.32.0/20",
"availabilityZone": "ap-southeast-2a",
"routeTableId": "rtb-d1ba9cb6"
},
{
"subnetId": "subnet-13b8d674",
"cidr": "172.31.0.0/20",
"availabilityZone": "ap-southeast-2b",
"routeTableId": "rtb-d1ba9cb6"
},
{
"subnetId": "subnet-e28174ba",
"cidr": "172.31.16.0/20",
"availabilityZone": "ap-southeast-2c",
"routeTableId": "rtb-d1ba9cb6"
}
]
}
]
}
}
21 changes: 21 additions & 0 deletions config/config.ts
Original file line number Diff line number Diff line change
@@ -67,6 +67,19 @@ import { getDataMigrateStackProps } from './stacks/dataMigrate';
import { getHtsgetProps } from './stacks/htsget';
import { getSampleSheetCheckerProps } from './stacks/sampleSheetChecker';
import { getAccessKeySecretStackProps } from './stacks/accessKeySecret';
import { getFastqManagerStackProps, getFastqManagerTableStackProps } from './stacks/fastqManager';
import {
getFastqUnarchivingManagerStackProps,
getFastqUnarchivingManagerTableStackProps,
} from './stacks/fastqUnarchivingManager';
import {
getFastqSyncManagerStackProps,
getFastqSyncManagerTableStackProps,
} from './stacks/fastqSyncManager';
import {
getIcav2DataCopyManagerStackProps,
getIcav2DataCopyManagerTableStackProps,
} from './stacks/icav2DataCopyManager';

interface EnvironmentConfig {
name: string;
@@ -108,6 +121,10 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
oncoanalyserPipelineTableStackProps: getOncoanalyserPipelineTableStackProps(),
sashPipelineTableStackProps: getSashPipelineTableStackProps(),
accessKeySecretStackProps: getAccessKeySecretStackProps(stage),
fastqManagerTableStackProps: getFastqManagerTableStackProps(stage),
fastqUnarchivingManagerTableStackProps: getFastqUnarchivingManagerTableStackProps(),
fastqSyncManagerTableStackProps: getFastqSyncManagerTableStackProps(),
icav2DataCopyTableStackProps: getIcav2DataCopyManagerTableStackProps(),
},
statelessConfig: {
metadataManagerStackProps: getMetadataManagerStackProps(stage),
@@ -138,6 +155,10 @@ export const getEnvironmentConfig = (stage: AppStage): EnvironmentConfig | null
htsgetProps: getHtsgetProps(stage),
sampleSheetCheckerProps: getSampleSheetCheckerProps(stage),
pgDDProps: getPgDDProps(stage),
fastqManagerStackProps: getFastqManagerStackProps(stage),
fastqUnarchivingManagerStackProps: getFastqUnarchivingManagerStackProps(stage),
fastqSyncManagerStackProps: getFastqSyncManagerStackProps(),
icav2DataCopyManagerStackProps: getIcav2DataCopyManagerStackProps(stage),
},
};

90 changes: 90 additions & 0 deletions config/constants.ts
Original file line number Diff line number Diff line change
@@ -42,6 +42,9 @@ export const vpcProps: VpcLookupOptions = {
export const authStackHttpLambdaAuthorizerParameterName =
'/orcabus/authorization-stack/http-lambda-authorization-arn';

// The hosted zone name ssm parameter path
export const hostedZoneNameParameterPath = '/hosted_zone/umccr/name';

// upstream infra: cognito
export const cognitoPortalAppClientIdParameterName =
'/data_portal/client/data2/cog_app_client_id_stage';
@@ -86,6 +89,12 @@ export const icav2PipelineCacheBucket: Record<AppStage, string> = {
[AppStage.PROD]: 'pipeline-prod-cache-503977275616-ap-southeast-2',
};

export const icav2PipelineCachePrefix: Record<AppStage, string> = {
[AppStage.BETA]: 'byob-icav2/development/',
[AppStage.GAMMA]: 'byob-icav2/staging/',
[AppStage.PROD]: 'byob-icav2/production/',
};

// The test inventory bucket for dev.
export const fileManagerInventoryBucket: Record<AppStage.BETA, string> = {
[AppStage.BETA]: 'filemanager-inventory-test',
@@ -889,3 +898,84 @@ export const oraDecompressionIcav2ReadyEventSource = 'orcabus.workflowmanager';
export const oraDecompressionIcav2EventSource = 'orcabus.oradecompression';
export const oraDecompressionIcav2EventDetailType = 'FastqListRowDecompressed';
export const oraDecompressionStateMachinePrefix = 'oraDecompressionSfn';

/*
Fastq Manager
*/

// Tables
export const fastqListRowTableName = 'fastqManagerDynamoDBTable';
export const fastqSetTableName = 'fastqSetDynamoDBTable';
export const fastqJobTableName = 'fastqJobDynamoDBTable';

// Table indexes
export const fastqListRowManagerIndexes = [
'rgid_ext',
'instrument_run_id',
'library_orcabus_id',
'fastq_set_id',
];
export const fastqSetManagerIndexes = ['rgid_ext', 'instrument_run_id', 'library_orcabus_id'];
export const fastqJobManagerIndexes = ['fastq_id', 'job_type', 'status'];

// S3 Buckets
export const fastqManagerCacheBucket: Record<AppStage, string> = {
[AppStage.BETA]: `fastq-manager-cache-${accountIdAlias.beta}-ap-southeast-2`,
[AppStage.GAMMA]: `fastq-manager-cache-${accountIdAlias.gamma}-ap-southeast-2`,
[AppStage.PROD]: `fastq-manager-cache-${accountIdAlias.prod}-ap-southeast-2`,
};

export const ntsmBucket: Record<AppStage, string> = {
[AppStage.BETA]: `ntsm-fingerprints-${accountIdAlias.beta}-ap-southeast-2`,
[AppStage.GAMMA]: `ntsm-fingerprints-${accountIdAlias.gamma}-ap-southeast-2`,
[AppStage.PROD]: `ntsm-fingerprints-${accountIdAlias.prod}-ap-southeast-2`,
};

// Events
export const fastqManagerEventSource = 'orcabus.fastqmanager';
export const fastqManagerEventDetails = {
createFastqListRow: 'FastqListRowCreated',
updateFastqListRow: 'FastqListRowUpdated',
deleteFastqListRow: 'FastqListRowDeleted',
createFastqSet: 'FastqListSetCreated',
updateFastqSet: 'FastqListSetUpdated',
mergeFastqSet: 'FastqListSetMerged',
deleteFastqSet: 'FastqListSetDeleted',
};

/*
S3 Copy Steps Function ARNs by account id
*/
export const s3CopyStepsFunctionArn: Record<AppStage, string> = {
[AppStage.BETA]: `arn:aws:states:${region}:${accountIdAlias.beta}:stateMachine:StepsS3CopyStateMachine157A1409-jx4WNxpdckgQ`, // pragma: allowlist secret
[AppStage.GAMMA]: `arn:aws:states:${region}:${accountIdAlias.gamma}:stateMachine:StepsS3CopyStateMachine157A1409-ikBos7HzwDtL`, // pragma: allowlist secret
[AppStage.PROD]: `arn:aws:states:${region}:${accountIdAlias.prod}:stateMachine:StepsS3CopyStateMachine157A1409-YbCgUX7dCZRm`, // pragma: allowlist secret
};
export const s3CopyStepsBucket: Record<AppStage, string> = {
[AppStage.BETA]: 'stepss3copy-working66f7dd3f-x4jwbnt6qvxc', // pragma: allowlist secret
[AppStage.GAMMA]: 'stg-stepss3copystack-stepss3copyworking01b34927-szqxpff5lsbx', // pragma: allowlist secret
[AppStage.PROD]: 'prod-stepss3copystack-stepss3copyworking01b34927-mp9y88d9e1py', // pragma: allowlist secret
};

/*
Fastq Unarchiving service
*/
export const fastqUnarchivingJobTableName = 'fastqUnarchivingDynamoDBTable';
export const fastqUnarchivingJobTableIndexes = ['status', 'job_type'];
export const fastqUnarchivingEventDetailType = {
createJob: 'FastqUnarchivingJobCreated',
updateJob: 'FastqUnarchivingJobUpdated',
};
export const fastqUnarchivingManagerEventSource = 'orcabus.fastqunarchivingmanager';

/*
Fastq sync service
*/
export const fastqSyncEventDetailType = 'fastqSync';

/*
ICAv2 ProjectData Copy Manager Stack
*/
export const icav2DataCopyManagerDynamodbTableName = 'icav2DataCopyManagerDynamoDBTable';
export const icav2DataCopyEventSource = 'orcabus.icav2datacopymanager';
export const icav2DataCopySyncDetailType = 'ICAv2DataCopySync';
84 changes: 84 additions & 0 deletions config/stacks/fastqManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import {
AppStage,
fastqListRowTableName,
cognitoApiGatewayConfig,
corsAllowOrigins,
logsApiGatewayConfig,
jwtSecretName,
hostedZoneNameParameterPath,
fastqListRowManagerIndexes,
fastqSetTableName,
fastqSetManagerIndexes,
fastqJobTableName,
fastqJobManagerIndexes,
fastqManagerCacheBucket,
ntsmBucket,
fastqManagerEventSource,
fastqManagerEventDetails,
icav2PipelineCacheBucket,
icav2PipelineCachePrefix,
eventBusName,
} from '../constants';
import { FastqManagerTableConfig } from '../../lib/workload/stateful/stacks/fastq-manager-db/deploy/stack';
import { FastqManagerStackConfig } from '../../lib/workload/stateless/stacks/fastq-manager/deploy/interfaces';

// Stateful
/**
 * Build the props for the Fastq Manager stateful (table) stack.
 *
 * Table names are constant across stages; the cache and ntsm bucket names
 * are looked up per deployment stage.
 */
export const getFastqManagerTableStackProps = (stage: AppStage): FastqManagerTableConfig => {
  const tableProps: FastqManagerTableConfig = {
    /* DynamoDB tables backing the fastq manager */
    fastqListRowDynamodbTableName: fastqListRowTableName,
    fastqSetDynamodbTableName: fastqSetTableName,
    fastqJobDynamodbTableName: fastqJobTableName,
    /* Stage-specific buckets */
    fastqManagerCacheBucketName: fastqManagerCacheBucket[stage],
    ntsmBucketName: ntsmBucket[stage],
  };
  return tableProps;
};

// Stateless
/**
 * Build the props for the Fastq Manager stateless stack: API gateway,
 * token/zone lookups, data tables (plus queryable indexes), buckets and
 * event-bus wiring.
 */
export const getFastqManagerStackProps = (stage: AppStage): FastqManagerStackConfig => {
  // API gateway, served under the 'fastq' custom domain prefix
  const apiGatewayCognitoProps = {
    ...cognitoApiGatewayConfig,
    corsAllowOrigins: corsAllowOrigins[stage],
    apiGwLogsConfig: logsApiGatewayConfig[stage],
    apiName: 'FastqManager',
    customDomainNamePrefix: 'fastq',
  };

  return {
    apiGatewayCognitoProps,

    /* Orcabus token and hosted zone for external lambda functions */
    orcabusTokenSecretsManagerPath: jwtSecretName,
    hostedZoneNameSsmParameterPath: hostedZoneNameParameterPath,

    /* Data tables and the indexes the stack needs query permissions on */
    fastqListRowDynamodbTableName: fastqListRowTableName,
    fastqSetDynamodbTableName: fastqSetTableName,
    fastqJobsDynamodbTableName: fastqJobTableName,
    fastqListRowDynamodbIndexes: fastqListRowManagerIndexes,
    fastqSetDynamodbIndexes: fastqSetManagerIndexes,
    fastqJobsDynamodbIndexes: fastqJobManagerIndexes,

    /* Buckets */
    pipelineCacheBucketName: icav2PipelineCacheBucket[stage],
    pipelineCachePrefix: icav2PipelineCachePrefix[stage],
    fastqManagerCacheBucketName: fastqManagerCacheBucket[stage],
    ntsmBucketName: ntsmBucket[stage],

    /* Event bus */
    eventBusName: eventBusName,
    eventSource: fastqManagerEventSource,
    eventDetailType: fastqManagerEventDetails,
  };
};
82 changes: 82 additions & 0 deletions config/stacks/fastqSyncManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import {
/* Events */
eventBusName,
fastqManagerEventDetails,
fastqManagerEventSource,
fastqSyncEventDetailType,
fastqUnarchivingEventDetailType,
fastqUnarchivingManagerEventSource,
hostedZoneNameParameterPath,
jwtSecretName,
} from '../constants';

import { FastqSyncManagerStackConfig } from '../../lib/workload/stateless/stacks/fastq-sync/deploy/interfaces';
import { FastqSyncTableConfig } from '../../lib/workload/stateful/stacks/fastq-sync-dynamodb/deploy/stack';

/*
export interface FastqSyncManagerStackConfig {
/*
Orcabus token and zone name for external lambda functions
*/
//orcabusTokenSecretsManagerPath: string;
//hostedZoneNameSsmParameterPath: string;

/*
Data tables
*/
//fastqSyncDynamodbTableName: string;
/*
Event bus stuff
*/
//eventBusName: string;
// eventTriggers: FastqSyncEventTriggers
//}
//*/

const fastqSyncDynamodbTableName = 'fastqSyncTokenTable';

// Stateful
/**
 * Build the props for the fastq sync stateful (table) stack.
 */
export const getFastqSyncManagerTableStackProps = (): FastqSyncTableConfig => ({
  /* DynamoDB table holding fastq sync tokens */
  dynamodbTableName: fastqSyncDynamodbTableName,
});

// Stateless
/**
 * Build the props for the fastq sync stateless stack.
 *
 * Sync tokens are re-evaluated whenever fastq sets / list rows are updated,
 * an unarchiving job is updated, or an explicit fastqSync event arrives.
 */
export const getFastqSyncManagerStackProps = (): FastqSyncManagerStackConfig => {
  // Upstream events that drive token resolution
  const eventTriggers = {
    fastqSetUpdated: {
      eventSource: fastqManagerEventSource,
      eventDetailType: fastqManagerEventDetails.updateFastqSet,
    },
    fastqListRowUpdated: {
      eventSource: fastqManagerEventSource,
      eventDetailType: fastqManagerEventDetails.updateFastqListRow,
    },
    fastqUnarchiving: {
      eventSource: fastqUnarchivingManagerEventSource,
      eventDetailType: fastqUnarchivingEventDetailType.updateJob,
    },
    fastqSync: {
      eventDetailType: fastqSyncEventDetailType,
    },
  };

  return {
    /* Token table */
    fastqSyncDynamodbTableName: fastqSyncDynamodbTableName,

    /* Events */
    eventBusName: eventBusName,
    eventTriggers: eventTriggers,

    /* Orcabus token and hosted zone for external lambda functions */
    orcabusTokenSecretsManagerPath: jwtSecretName,
    hostedZoneNameSsmParameterPath: hostedZoneNameParameterPath,
  };
};
85 changes: 85 additions & 0 deletions config/stacks/fastqUnarchivingManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import {
AppStage,
/* API Gateway */
cognitoApiGatewayConfig,
corsAllowOrigins,
logsApiGatewayConfig,
/* Secrets and ssms */
jwtSecretName,
hostedZoneNameParameterPath,
/* DynamoDB */
fastqUnarchivingJobTableName,
fastqUnarchivingJobTableIndexes,
/* S3 */
icav2PipelineCacheBucket,
icav2PipelineCachePrefix,
s3CopyStepsBucket,
s3CopyStepsFunctionArn,
/* Events */
eventBusName,
fastqUnarchivingEventDetailType,
fastqUnarchivingManagerEventSource,
} from '../constants';

import { FastqUnarchivingManagerTableConfig } from '../../lib/workload/stateful/stacks/fastq-unarchiving-dynamodb/deploy';

import { FastqUnarchivingManagerStackConfig } from '../../lib/workload/stateless/stacks/fastq-unarchiving/deploy/interfaces';

// Stateful
/**
 * Build the props for the fastq unarchiving stateful (table) stack.
 */
export const getFastqUnarchivingManagerTableStackProps = (): FastqUnarchivingManagerTableConfig => ({
  /* DynamoDB table tracking unarchiving jobs */
  fastqUnarchivingJobDynamodbTableName: fastqUnarchivingJobTableName,
});

// Stateless
/**
 * Build the props for the fastq unarchiving stateless stack: API gateway,
 * event wiring, the jobs table (and indexes), and the buckets used to restore
 * archived fastqs back into the BYOB cache via the steps-s3-copy machine.
 */
export const getFastqUnarchivingManagerStackProps = (
  stage: AppStage
): FastqUnarchivingManagerStackConfig => {
  return {
    /* API gateway, served under the 'fastq-unarchiving' custom domain prefix */
    apiGatewayCognitoProps: {
      ...cognitoApiGatewayConfig,
      corsAllowOrigins: corsAllowOrigins[stage],
      apiGwLogsConfig: logsApiGatewayConfig[stage],
      apiName: 'FastqUnarchivingManager',
      customDomainNamePrefix: 'fastq-unarchiving',
    },

    /* Events */
    eventBusName: eventBusName,
    eventDetailType: fastqUnarchivingEventDetailType,
    eventSource: fastqUnarchivingManagerEventSource,

    /* Orcabus token and hosted zone for external lambda functions */
    orcabusTokenSecretsManagerPath: jwtSecretName,
    hostedZoneNameSsmParameterPath: hostedZoneNameParameterPath,

    /* Jobs table and the indexes the stack needs query permissions on */
    fastqUnarchivingJobsDynamodbTableName: fastqUnarchivingJobTableName,
    fastqUnarchivingJobsDynamodbIndexes: fastqUnarchivingJobTableIndexes,

    /* Buckets: restored data lands under '<cache prefix>restored/' */
    s3Byob: {
      bucketName: icav2PipelineCacheBucket[stage],
      prefix: `${icav2PipelineCachePrefix[stage]}restored/`,
    },
    s3StepsCopy: {
      s3StepsCopyBucketName: s3CopyStepsBucket[stage],
      s3StepsCopyPrefix: 'FASTQ_UNARCHIVING/',
      s3StepsFunctionArn: s3CopyStepsFunctionArn[stage],
    },
  };
};
7 changes: 6 additions & 1 deletion config/stacks/fileManager.ts
Original file line number Diff line number Diff line change
@@ -18,10 +18,15 @@ import {
oncoanalyserBucket,
region,
vpcProps,
ntsmBucket,
} from '../constants';

export const fileManagerBuckets = (stage: AppStage): string[] => {
const eventSourceBuckets = [oncoanalyserBucket[stage], icav2PipelineCacheBucket[stage]];
const eventSourceBuckets = [
oncoanalyserBucket[stage],
icav2PipelineCacheBucket[stage],
ntsmBucket[stage],
];
// Note, that we only archive production data, so we only need access to the archive buckets in prod.
if (stage == AppStage.PROD) {
eventSourceBuckets.push(icav2ArchiveAnalysisBucket[stage]);
47 changes: 47 additions & 0 deletions config/stacks/icav2DataCopyManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import {
AppStage,
eventBusName,
icaEventPipeStackName,
icav2AccessTokenSecretName,
icav2DataCopyEventSource,
icav2DataCopyManagerDynamodbTableName,
icav2DataCopySyncDetailType,
} from '../constants';
import { Icav2DataCopyManagerTableConfig } from '../../lib/workload/stateful/stacks/icav2-data-copy-manager-dynamo-db/deploy';
import { Icav2DataCopyManagerConfig } from '../../lib/workload/stateless/stacks/icav2-data-copy-manager/deploy/interfaces';

// Stateful
/**
 * Build the props for the ICAv2 data copy stateful (table) stack.
 */
export const getIcav2DataCopyManagerTableStackProps = (): Icav2DataCopyManagerTableConfig => ({
  dynamodbTableName: icav2DataCopyManagerDynamodbTableName,
});

// Stateless
/**
 * Build the props for the ICAv2 data copy manager stateless stack.
 */
export const getIcav2DataCopyManagerStackProps = (stage: AppStage): Icav2DataCopyManagerConfig => {
  // Shared prefix for the state machine and event rule names
  const namePrefix = 'icav2-data-copy';

  return {
    /* Table */
    dynamodbTableName: icav2DataCopyManagerDynamodbTableName,

    /* Event handling */
    eventBusName: eventBusName,
    icaEventPipeName: icaEventPipeStackName,
    eventSource: icav2DataCopyEventSource,
    eventDetailType: icav2DataCopySyncDetailType,

    /* Resource naming */
    stateMachinePrefix: namePrefix,
    ruleNamePrefix: namePrefix,

    /* Secrets, e.g. "/icav2/umccr-prod/service-production-jwt-token-secret-arn" */
    icav2AccessTokenSecretId: icav2AccessTokenSecretName[stage],
  };
};
5 changes: 5 additions & 0 deletions config/stacks/shared.ts
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ import {
icav2ArchiveAnalysisBucket,
icav2ArchiveFastqBucket,
icav2PipelineCacheBucket,
ntsmBucket,
oncoanalyserBucket,
rdsMasterSecretName,
vpcProps,
@@ -114,6 +115,10 @@ const getEventSourceConstructProps = (stage: AppStage): EventSourceProps => {
eventTypes,
key: [{ 'anything-but': { wildcard: 'byob-icav2/*/cache/*' } }],
},
{
bucket: ntsmBucket[stage],
eventTypes,
},
],
};

4 changes: 3 additions & 1 deletion lib/workload/components/dynamodb-partitioned-table/index.ts
Original file line number Diff line number Diff line change
@@ -27,7 +27,9 @@ export class DynamodbPartitionedPipelineConstruct extends Construct {
},
tableName: props.tableName,
removalPolicy: props.removalPolicy || RemovalPolicy.RETAIN_ON_UPDATE_OR_DELETE,
pointInTimeRecovery: true,
pointInTimeRecoverySpecification: {
pointInTimeRecoveryEnabled: true,
},
});

// Set outputs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "fastq_tools"
version = "0.0.1"
description = "Fastq Tools Lambda Layers"
license = "GPL-3.0-or-later"
authors = [
"Alexis Lucattini"
]
homepage = "https://github.com/umccr/orcabus"
repository = "https://github.com/umccr/orcabus"

[tool.poetry.dependencies]
python = "^3.12, <3.13"
requests = "^2.32.3"

[tool.poetry.group.dev]
optional = true

[tool.poetry.group.dev.dependencies]
pytest = "^7.0.0" # For testing only
# For typehinting only, not required at runtime
mypy-boto3-ssm = "^1.34"
mypy-boto3-secretsmanager = "^1.34"
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#!/usr/bin/env python3

"""
Fastq tools to be used by various lambdas as needed
"""
from .utils.models import(
FastqListRow,
FastqStorageObject,
FileStorageObject,
FastqSet,
Job,
JobStatus,
JobType
)

from .utils.query_helpers import (
get_fastq,
get_fastqs,
get_fastqs_in_instrument_run_id,
get_fastqs_in_library,
get_fastqs_in_library_list,
get_fastqs_in_libraries_and_instrument_run_id,
get_fastqs_in_sample,
get_fastqs_in_subject,
get_fastqs_in_individual,
get_fastqs_in_project,
get_fastq_set,
get_fastq_jobs
)


from .utils.update_helpers import (
add_qc_stats,
add_read_count,
add_file_compression_information,
add_ntsm_storage_object,
add_read_set,
detach_read_set,
validate_fastq,
invalidate_fastq
)

from .utils.workflow_helpers import (
to_cwl
)

from .utils.job_helpers import (
run_qc_stats,
run_ntsm,
run_file_compression_stats
)


# Public API of the fastq_tools layer; keep in sync with the imports above.
__all__ = [
    # Models
    "FastqListRow",
    "FastqStorageObject",
    "FileStorageObject",
    "FastqSet",
    "Job",
    "JobStatus",
    "JobType",

    # Query helpers
    "get_fastq",
    "get_fastqs",
    "get_fastqs_in_instrument_run_id",
    "get_fastqs_in_library",
    "get_fastqs_in_library_list",
    "get_fastqs_in_libraries_and_instrument_run_id",
    "get_fastqs_in_sample",
    "get_fastqs_in_subject",
    "get_fastqs_in_individual",
    "get_fastqs_in_project",

    # Fastq Set Query helpers
    "get_fastq_set",

    # Job query helpers
    "get_fastq_jobs",

    # Update helpers
    "add_qc_stats",
    "add_read_count",
    "add_file_compression_information",
    "add_ntsm_storage_object",
    "add_read_set",
    "detach_read_set",
    "validate_fastq",
    "invalidate_fastq",

    # Workflow helpers
    "to_cwl",

    # Job launch helpers
    "run_qc_stats",
    "run_ntsm",
    "run_file_compression_stats"
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/usr/bin/env python3

# Standard imports
import typing
import boto3
import json
from os import environ


# Type hinting
if typing.TYPE_CHECKING:
from mypy_boto3_secretsmanager import SecretsManagerClient
from mypy_boto3_ssm import SSMClient


def get_secretsmanager_client() -> 'SecretsManagerClient':
    """Return a boto3 Secrets Manager client (region/credentials from the environment)."""
    return boto3.client('secretsmanager')


def get_ssm_client() -> 'SSMClient':
    """Return a boto3 SSM client (region/credentials from the environment)."""
    return boto3.client('ssm')


def get_secret_value(secret_id) -> str:
    """
    Fetch a secret's string payload from AWS Secrets Manager.
    :param secret_id: secret name or ARN
    :return: the raw SecretString value
    """
    client = get_secretsmanager_client()
    response = client.get_secret_value(SecretId=secret_id)
    return response['SecretString']


def get_ssm_value(parameter_name) -> str:
    """
    Fetch a parameter value from AWS SSM Parameter Store.
    :param parameter_name: full parameter path
    :return: the parameter's string value
    """
    client = get_ssm_client()
    return client.get_parameter(Name=parameter_name)['Parameter']['Value']


def get_orcabus_token() -> str:
    """
    From AWS Secrets Manager, retrieve the OrcaBus token.

    The secret id is taken from the ORCABUS_TOKEN_SECRET_ID environment
    variable; the secret payload is JSON containing an 'id_token' field.
    :return: the id_token string
    :raises KeyError: if ORCABUS_TOKEN_SECRET_ID is not set — fail fast with a
        clear error rather than passing None to boto3 (which raises an opaque
        parameter-validation error).
    """
    secret_id = environ["ORCABUS_TOKEN_SECRET_ID"]
    return json.loads(get_secret_value(secret_id))['id_token']


def get_hostname() -> str:
    """
    Resolve the hosted zone name from SSM.

    The parameter path is taken from the HOSTNAME_SSM_PARAMETER environment
    variable.
    :return: the hostname string
    :raises KeyError: if HOSTNAME_SSM_PARAMETER is not set — fail fast with a
        clear error rather than passing None to boto3.
    """
    return get_ssm_value(environ["HOSTNAME_SSM_PARAMETER"])
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env python3

import re

# AWS PARAMETERS
# Subdomain prepended to the hosted zone name to form the fastq manager API host.
FASTQ_SUBDOMAIN_NAME = "fastq"

# API ENDPOINTS
FASTQ_LIST_ROW_ENDPOINT = "api/v1/fastq"
FASTQ_SET_ENDPOINT = "api/v1/fastqSet"

# REGEX
# OrcaBus ids look like '<3-char lowercase prefix>.<26-char ULID>',
# e.g. 'fqr.01JJY7P1AVFGHGVMEDE8T4VWJG'.
ORCABUS_ULID_REGEX_MATCH = re.compile(r'^[a-z0-9]{3}\.[A-Z0-9]{26}$')
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env python3

"""
Update helpers for the update script.
- run_qc_stats
- run_ntsm
- run_file_compression_information
"""

# Standard imports

# Local imports
from .globals import FASTQ_LIST_ROW_ENDPOINT
from .request_helpers import patch_request
from .models import Job


def run_qc_stats(fastq_id) -> Job:
    """
    Launch a QC-stats job for a fastq.
    :param fastq_id: fastq identifier string
    :return: the created Job
    """
    endpoint = f"{FASTQ_LIST_ROW_ENDPOINT}/{fastq_id}:runQcStats"
    return patch_request(endpoint)


def run_ntsm(fastq_id) -> Job:
    """
    Launch an ntsm fingerprint job for a fastq.
    :param fastq_id: fastq identifier string
    :return: the created Job
    """
    endpoint = f"{FASTQ_LIST_ROW_ENDPOINT}/{fastq_id}:runNtsm"
    return patch_request(endpoint)


def run_file_compression_stats(fastq_id) -> Job:
    """
    Launch a file-compression-information job for a fastq.
    :param fastq_id: fastq identifier string
    :return: the created Job
    """
    endpoint = f"{FASTQ_LIST_ROW_ENDPOINT}/{fastq_id}:runFileCompressionInformation"
    return patch_request(endpoint)

Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#!/usr/bin/env python3

"""
{
"id": "fqr.01JJY7P1AVFGHGVMEDE8T4VWJG",
"rgid": "ATCCACTG+ACGCACCT.2",
"index": "ATCCACTG",
"index2": "ACGCACCT",
"lane": 2,
"instrumentRunId": "230223_A00130_0244_AHN3W3DSX5",
"library": {
"orcabusId": "lib.01JBMVFP45C2EZRVK67P8JY1D2",
"libraryId": "L2300223"
},
"readSet": {
"r1": {
"s3IngestId": "019387bd-2494-7c00-9e41-03e8b6a73306",
"gzipCompressionSizeInBytes": 49532847794,
"rawMd5sum": "19e339fdb3c42f0133f5f3b1f9d188e0", // pragma: allowlist secret
"s3Uri": "s3://archive-prod-fastq-503977275616-ap-southeast-2/v1/year=2023/month=02/230223_A00130_0244_AHN3W3DSX5/202411226f4f7af0/WGS_TsqNano/MDX230039_L2300223_S7_L002_R1_001.fastq.ora"
},
"r2": {
"s3IngestId": "019387bd-9177-79c1-a489-d940ecc11b11",
"gzipCompressionSizeInBytes": 53189277581,
"rawMd5sum": "e857de35a8ca008589d24b2e0f647cc7", // pragma: allowlist secret
"s3Uri": "s3://archive-prod-fastq-503977275616-ap-southeast-2/v1/year=2023/month=02/230223_A00130_0244_AHN3W3DSX5/202411226f4f7af0/WGS_TsqNano/MDX230039_L2300223_S7_L002_R2_001.fastq.ora"
},
"compressionFormat": "ORA"
},
"qc": null,
"readCount": null,
"baseCountEst": null,
"isValid": true,
"ntsm": null
}
"""

from enum import Enum
from typing import (
TypedDict,
Optional,
Dict,
List
)
from datetime import datetime


class JobType(Enum):
    """Kinds of asynchronous jobs the fastq manager runs for a fastq."""
    QC = 'QC'
    FILE_COMPRESSION = 'FILE_COMPRESSION'
    NTSM = 'NTSM'


class JobStatus(Enum):
    """Lifecycle states of a fastq manager job."""
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    FAILED = "FAILED"
    SUCCEEDED = "SUCCEEDED"


class FileStorageObject(TypedDict):
    """A single file in S3 (see the example payload in the module docstring)."""
    s3IngestId: str
    s3Uri: str
    storageClass: str
    sha256: str


class FastqStorageObject(FileStorageObject):
    """A fastq file: adds gzip size and raw md5 to the base storage object."""
    gzipCompressionSizeInBytes: int
    # Hex md5 digest string of the uncompressed fastq stream
    rawMd5sum: str


class ReadSet(TypedDict):
    """Paired fastq files plus their compression format (e.g. 'ORA')."""
    r1: FastqStorageObject
    r2: FastqStorageObject
    compressionFormat: str


class Library(TypedDict):
    """Minimal library reference."""
    orcabusId: str
    libraryId: str


class FastqListRow(TypedDict):
    """One fastq list row, as returned by the fastq manager API."""
    id: str
    fastqSetId: str
    index: str
    lane: int
    instrumentRunId: str
    library: Library
    platform: Optional[str]
    center: Optional[str]
    date: Optional[datetime]
    readSet: Optional[ReadSet]
    qc: Optional[Dict]
    readCount: Optional[int]
    baseCountEst: Optional[int]
    isValid: Optional[bool]
    ntsm: Optional[FileStorageObject]


class FastqSet(TypedDict):
    """A set of fastq list rows belonging to one library."""
    id: str
    library: Library
    fastqSet: List[FastqListRow]
    allowAdditionalFastq: bool
    isCurrentFastqSet: bool


class QcStats(TypedDict):
    """Summary QC metrics for a fastq pair."""
    insertSizeEstimate: int
    rawWgsCoverageEstimate: int
    r1Q20Fraction: float
    r2Q20Fraction: float
    r1GcFraction: float
    r2GcFraction: float


class ReadCount(TypedDict):
    readCount: int
    baseCountEst: int


class FileCompressionInformation(TypedDict):
    """Compression details for a fastq pair."""
    compressionFormat: str
    r1GzipCompressionSizeInBytes: Optional[int]
    r2GzipCompressionSizeInBytes: Optional[int]
    # md5sums are hex digest strings (cf. FastqStorageObject.rawMd5sum and the
    # example payload above); previously mis-annotated as Optional[int].
    r1RawMd5sum: Optional[str]
    r2RawMd5sum: Optional[str]


# 'class' is a reserved word, so CWLFile must use the functional TypedDict form
CWLFile = TypedDict('CWLFile', {
    'class': str,
    'location': str
})


class CWLDict(TypedDict):
    """A fastq list row shaped as CWL workflow input."""
    rgid: str
    index: str
    index2: Optional[str]
    lane: int
    read_1: CWLFile
    read_2: CWLFile


class Job(TypedDict):
    """A fastq manager job record."""
    id: str
    fastqId: str
    jobType: JobType
    stepsExecutionArn: str
    status: JobStatus
    startTime: datetime
    endTime: datetime
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#!/usr/bin/env python3

"""
Query helpers -
get_fastqs_in_instrument_run_id
get_fastqs_in_library
get_fastqs_in_sample
get_fastqs_in_subject
get_fastqs_in_individual
get_fastqs_in_project
get_fastq_by_rgid_and_instrument_run_id
"""
from typing import List

from .request_helpers import (
get_request_response,
get_request_response_results,
)

from .globals import FASTQ_LIST_ROW_ENDPOINT, FASTQ_SET_ENDPOINT
from .models import FastqListRow, FastqSet, Job


def get_fastq(fastq_id: str, **kwargs) -> FastqListRow:
    """
    Fetch a single fastq list row by id; extra kwargs become query parameters.
    """
    endpoint = f"{FASTQ_LIST_ROW_ENDPOINT}/{fastq_id}"
    return get_request_response(endpoint, params=kwargs)


def get_fastq_set(fastq_set_id: str, **kwargs) -> FastqSet:
    """
    Fetch a fastq set by id; extra kwargs become query parameters.
    :param fastq_set_id: fastq set identifier
    :return: the fastq set
    """
    endpoint = f"{FASTQ_SET_ENDPOINT}/{fastq_set_id}"
    return get_request_response(endpoint, params=kwargs)


def get_fastqs(*args, **kwargs) -> List[FastqListRow]:
    """
    List fastqs, filtered by any query parameters given as kwargs.
    Positional args are accepted (and ignored) for backwards compatibility.
    """
    return get_request_response_results(FASTQ_LIST_ROW_ENDPOINT, params=kwargs)


def get_fastqs_in_instrument_run_id(instrument_run_id: str):
    """Get all fastqs for an instrument run."""
    return get_fastqs(instrumentRunId=instrument_run_id)


def get_fastqs_in_library(library_id: str):
    """Get all fastqs for a library."""
    return get_fastqs(library=library_id)


def get_fastqs_in_library_list(library_id_list: List[str]):
    """Get all fastqs across a list of libraries."""
    # 'library[]' is not a valid identifier, so it must be passed via a dict
    return get_fastqs(**{"library[]": library_id_list})


def get_fastqs_in_libraries_and_instrument_run_id(library_id_list, instrument_run_id):
    """
    Get all fastqs for the given libraries, restricted to one instrument run.
    :param library_id_list: list of library ids
    :param instrument_run_id: instrument run id
    """
    return get_fastqs(**{
        "library[]": library_id_list,
        "instrumentRunId": instrument_run_id,
    })


def get_fastqs_in_sample(sample_id):
    """Get all fastqs for a sample."""
    return get_fastqs(sample=sample_id)


def get_fastqs_in_subject(subject_id):
    """Get all fastqs for a subject."""
    return get_fastqs(subject=subject_id)


def get_fastqs_in_individual(individual_id):
    """Get all fastqs for an individual."""
    return get_fastqs(individual=individual_id)


def get_fastqs_in_project(project_id):
    """Get all fastqs for a project."""
    return get_fastqs(project=project_id)


def get_fastq_list_rows_in_fastq_set(fastq_set_id):
    """Get all fastq list rows belonging to a fastq set."""
    return get_fastqs(fastqSet=fastq_set_id)


def get_fastq_jobs(fastq_id: str) -> List[Job]:
    """List all jobs recorded against a fastq."""
    return get_request_response_results(f"{FASTQ_LIST_ROW_ENDPOINT}/{fastq_id}/jobs")
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#!/usr/bin/env python3
from typing import Dict, Optional, List, Union
from urllib.parse import urlunparse, urlparse

# Standard imports
import requests
import logging
from copy import deepcopy

# Locals
from .globals import (
FASTQ_SUBDOMAIN_NAME,
)

from .aws_helpers import (
get_orcabus_token, get_hostname
)

# Set default request params
DEFAULT_REQUEST_PARAMS = {}

# Set logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def get_url(endpoint: str) -> str:
    """
    Build the absolute https URL for the fastq service.
    :param endpoint: path component of the request
    :return: full URL on the fastq subdomain of the configured hostname
    """
    return urlunparse((
        "https",
        f"{FASTQ_SUBDOMAIN_NAME}.{get_hostname()}",
        endpoint,
        None, None, None
    ))


def get_request_response(endpoint: str, params: Optional[Dict] = None) -> Dict:
    """
    GET against the fastq endpoint and return the decoded JSON body.
    :param endpoint: path, or an absolute URL which is used as-is
    :param params: optional query parameters merged over the module defaults
    :return: decoded JSON response
    """
    # Absolute URLs pass through untouched; bare paths resolve against the host
    target_url = endpoint if urlparse(endpoint).scheme else get_url(endpoint)

    response = requests.get(
        target_url,
        headers={"Authorization": f"Bearer {get_orcabus_token()}"},
        params={**DEFAULT_REQUEST_PARAMS, **(params or {})},
    )
    response.raise_for_status()

    return response.json()


def get_request_response_results(endpoint: str, params: Optional[Dict] = None) -> Union[List[Dict], Dict]:
    """
    GET against the fastq endpoint and return only the 'results' payload.
    :param endpoint: path or absolute URL
    :param params: optional query parameters
    :return: the 'results' member of the JSON response
    """
    return get_request_response(endpoint, params)['results']


def patch_request(endpoint: str, params: Optional[Dict] = None) -> Dict:
    """
    PATCH against the fastq endpoint, sending params as the JSON body.
    :param endpoint: path or absolute URL
    :param params: optional body values merged over the module defaults
    :return: decoded JSON response
    """
    target_url = endpoint if urlparse(endpoint).scheme else get_url(endpoint)

    response = requests.patch(
        target_url,
        headers={"Authorization": f"Bearer {get_orcabus_token()}"},
        json={**DEFAULT_REQUEST_PARAMS, **(params or {})},
    )
    response.raise_for_status()

    return response.json()
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#!/usr/bin/env python3

"""
Update helpers for the update script.
- add_qc_stats
- add_read_count
- add_ntsm_storage_object / add_ntsm
- add_fastq_pair_storage_object / add_read_set
- detach_fastq_pair_storage_object / detach_read_set
- validate
- invalidate
"""

# Standard imports

# Local imports
from .globals import FASTQ_LIST_ROW_ENDPOINT
from .request_helpers import patch_request
from .models import QcStats, FastqListRow, ReadCount, FileCompressionInformation, FileStorageObject, ReadSet


def add_qc_stats(fastq_id: str, qc_stats: QcStats) -> FastqListRow:
    """
    Attach QC statistics to a fastq list row.
    :param fastq_id: fastq list row id
    :param qc_stats: QC statistics payload
    :return: the updated fastq list row
    """
    return patch_request(f"{FASTQ_LIST_ROW_ENDPOINT}/{fastq_id}/addQcStats", params=qc_stats)


def add_read_count(fastq_id: str, read_count: ReadCount) -> FastqListRow:
    """
    Attach a read count to a fastq list row.
    :param fastq_id: fastq list row id
    :param read_count: read count payload
    :return: the updated fastq list row
    """
    return patch_request(f"{FASTQ_LIST_ROW_ENDPOINT}/{fastq_id}/addReadCount", params=read_count)


def add_file_compression_information(fastq_id: str, file_compression_information: FileCompressionInformation) -> FastqListRow:
    """
    Attach file compression information to a fastq list row.
    :param fastq_id: fastq list row id
    :param file_compression_information: compression info payload
    :return: the updated fastq list row
    """
    return patch_request(
        f"{FASTQ_LIST_ROW_ENDPOINT}/{fastq_id}/addFileCompressionInformation",
        params=file_compression_information
    )


def add_ntsm_storage_object(fastq_id: str, ntsmFastqStorageObject: FileStorageObject) -> FastqListRow:
    """
    Attach an ntsm fingerprint storage object to a fastq list row.
    :param fastq_id: fastq list row id
    :param ntsmFastqStorageObject: storage object pointing at the ntsm file
    :return: the updated fastq list row
    """
    return patch_request(
        f"{FASTQ_LIST_ROW_ENDPOINT}/{fastq_id}/addNtsmStorageObject",
        params=ntsmFastqStorageObject
    )


def add_read_set(fastq_id: str, read_set: ReadSet) -> FastqListRow:
    """
    Attach a read set (fastq pair storage object) to a fastq list row.
    :param fastq_id: fastq list row id
    :param read_set: the read set to attach
    :return: the updated fastq list row
    """
    return patch_request(
        f"{FASTQ_LIST_ROW_ENDPOINT}/{fastq_id}/addFastqPairStorageObject",
        params=read_set
    )


def detach_read_set(fastq_id: str, read_set: ReadSet) -> FastqListRow:
    """
    Detach a read set (fastq pair storage object) from a fastq list row.
    :param fastq_id: fastq list row id
    :param read_set: the read set to detach
    :return: the updated fastq list row
    """
    return patch_request(
        f"{FASTQ_LIST_ROW_ENDPOINT}/{fastq_id}/detachFastqPairStorageObject",
        params=read_set
    )


def validate_fastq(fastq_id: str) -> FastqListRow:
    """Mark a fastq list row as valid."""
    return patch_request(f"{FASTQ_LIST_ROW_ENDPOINT}/{fastq_id}/validate")


def invalidate_fastq(fastq_id: str) -> FastqListRow:
    """Mark a fastq list row as invalid."""
    return patch_request(f"{FASTQ_LIST_ROW_ENDPOINT}/{fastq_id}/invalidate")
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env python3

"""
Workflow helpers - a collection of helper functions for the workflow
- to_cwl: Given a fastq id, convert to a cwl file
"""


# Local imports
from .models import CWLDict
from .globals import FASTQ_LIST_ROW_ENDPOINT
from .request_helpers import get_request_response_results


def to_cwl(fastq_id: str) -> CWLDict:
    """
    Convert a fastq list row to its CWL representation.
    :param fastq_id: the fastq list row id
    :return: CWL-formatted dict from the toCwl endpoint
    """
    return get_request_response_results(
        f"{FASTQ_LIST_ROW_ENDPOINT}/{fastq_id}/toCwl"
    )
26 changes: 26 additions & 0 deletions lib/workload/components/python-fastq-tools-layer/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// NOTE(review): removed stray '#!/usr/bin/env python3' shebang — this is a TypeScript CDK module, not a Python script.

import { Construct } from 'constructs';
import { PythonLayerVersion } from '@aws-cdk/aws-lambda-python-alpha';
import path from 'path';
import { PythonLambdaLayerConstruct } from '../python-lambda-layer';

/** Props for {@link FastqToolsPythonLambdaLayer}. */
export interface PythonFastqLambdaLayerConstructProps {
  /** Prefix applied to the generated layer name. */
  layerPrefix: string;
}

/**
 * CDK construct that builds a Python Lambda layer bundling the
 * `fastq_tools_layer` directory, so lambdas can call the fastq API via Python.
 */
export class FastqToolsPythonLambdaLayer extends Construct {
  public readonly lambdaLayerVersionObj: PythonLayerVersion;

  constructor(scope: Construct, id: string, props: PythonFastqLambdaLayerConstructProps) {
    super(scope, id);

    // Generate lambda Fastq python layer
    // Get lambda layer object
    this.lambdaLayerVersionObj = new PythonLambdaLayerConstruct(this, 'lambda_layer', {
      layerName: `${props.layerPrefix}-Fastq-py-layer`,
      layerDescription: 'Lambda Layer for handling the Fastq api via Python',
      layerDirectory: path.join(__dirname, 'fastq_tools_layer'),
    }).lambdaLayerVersionObj;
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "fastq_unarchiving_tools"
version = "0.0.1"
description = "Fastq Unarchiving Lambda Layers"
license = "GPL-3.0-or-later"
authors = [
"Alexis Lucattini"
]
homepage = "https://github.com/umccr/orcabus"
repository = "https://github.com/umccr/orcabus"

[tool.poetry.dependencies]
python = "^3.12, <3.13"
requests = "^2.32.3"

[tool.poetry.group.dev]
optional = true

[tool.poetry.group.dev.dependencies]
pytest = "^7.0.0" # For testing only
# For typehinting only, not required at runtime
mypy-boto3-ssm = "^1.34"
mypy-boto3-secretsmanager = "^1.34"
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from .utils.create_helpers import create_job
from .utils.query_helpers import (
get_unarchiving_job_list, get_job_from_job_id, get_job_list_for_fastq
)
from .utils.update_helpers import update_status
from .utils.models import Job, JobType, JobStatus

__all__ = [
# Create
'create_job',

# Query
'get_unarchiving_job_list',
'get_job_from_job_id',
'get_job_list_for_fastq',

# Updating
'update_status',

# Models
'Job',
'JobType',
'JobStatus'
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/usr/bin/env python3

# Standard imports
import typing
import boto3
import json
from os import environ


# Type hinting
if typing.TYPE_CHECKING:
from mypy_boto3_secretsmanager import SecretsManagerClient
from mypy_boto3_ssm import SSMClient


def get_secretsmanager_client() -> 'SecretsManagerClient':
    # Fresh client per call; credentials come from the ambient AWS environment
    return boto3.client('secretsmanager')


def get_ssm_client() -> 'SSMClient':
    # Fresh client per call
    return boto3.client('ssm')


def get_secret_value(secret_id) -> str:
    """
    Collect the secret value
    :param secret_id: secrets manager secret id or ARN
    :return: the raw SecretString
    """
    # Get the boto3 response
    get_secret_value_response = get_secretsmanager_client().get_secret_value(SecretId=secret_id)

    return get_secret_value_response['SecretString']


def get_ssm_value(parameter_name) -> str:
    """Return the plain value of an SSM parameter."""
    # Get the boto3 response
    get_ssm_parameter_response = get_ssm_client().get_parameter(Name=parameter_name)

    return get_ssm_parameter_response['Parameter']['Value']


def get_orcabus_token() -> str:
    """
    From the AWS Secrets Manager, retrieve the OrcaBus token.
    Reads the secret named by the ORCABUS_TOKEN_SECRET_ID environment variable
    and returns its 'id_token' field.
    NOTE(review): fetched on every call (no caching, unlike the filemanager
    layer); assumes the env var is set — a missing var yields SecretId=None.
    :return: the OrcaBus id token
    """
    return json.loads(get_secret_value(environ.get("ORCABUS_TOKEN_SECRET_ID")))['id_token']


def get_hostname() -> str:
    # Resolve the API hostname from the SSM parameter named by HOSTNAME_SSM_PARAMETER
    return get_ssm_value(environ.get("HOSTNAME_SSM_PARAMETER"))
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/usr/bin/env python3

"""
Create the job
"""

from typing import List, Optional

from .globals import FASTQ_UNARCHIVING_SUBDOMAIN_NAME, JOB_ENDPOINT
from .models import Job, JobType
from .request_helpers import post_request


def create_job(fastq_ids: List[str], job_type: Optional[JobType] = None) -> Job:
    """
    Create an unarchiving job for a list of fastq ids.
    :param fastq_ids: fastq ids to unarchive
    :param job_type: defaults to JobType.S3_UNARCHIVING when not supplied
    :return: the created Job as returned by the jobs endpoint
    """
    if job_type is None:
        job_type = JobType.S3_UNARCHIVING

    # BUG FIX: post to the jobs API path (JOB_ENDPOINT = "api/v1/jobs"); the
    # previous code posted to FASTQ_UNARCHIVING_SUBDOMAIN_NAME, which is the
    # service's hostname label, not a valid API endpoint path.
    return post_request(
        JOB_ENDPOINT,
        params={
            "fastqIds": fastq_ids,
            "jobType": job_type.value
        }
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env python3

import re
from enum import Enum

# AWS PARAMETERS
FASTQ_UNARCHIVING_SUBDOMAIN_NAME = "fastq-unarchiving"

# API ENDPOINTS
JOB_ENDPOINT = "api/v1/jobs"

# REGEX
ORCABUS_ULID_REGEX_MATCH = re.compile(r'^[a-z0-9]{3}\.[A-Z0-9]{26}$')


class JobStatus(Enum):
    # Lifecycle states for an unarchiving job.
    # NOTE(review): duplicated in .models.JobStatus — keep the two definitions
    # in sync (or consolidate to a single one).
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    FAILED = "FAILED"
    ABORTED = "ABORTED"
    SUCCEEDED = "SUCCEEDED"
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/usr/bin/env python3

"""
{
"id": "ufr.01JJY7P1AVFGHGVMEDE8T4VWJG",
    "jobType": "S3_UNARCHIVING",
    "stepsExecutionArn": "arn:aws:states:us-east-1:123456789012:execution:myStateMachine:myExecution",
"status": "SUCCEEDED",
"startTime": "2021-07-01T00:00:00Z",
"endTime": "2021-07-01T00:00:00Z",
}
"""

from typing import (
TypedDict,
)

from enum import Enum


class JobType(Enum):
    # The only unarchiving mechanism currently supported by the service.
    S3_UNARCHIVING = "S3_UNARCHIVING"


class JobStatus(Enum):
    # Job lifecycle states.
    # NOTE(review): duplicated in .globals.JobStatus — keep the two in sync.
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    FAILED = "FAILED"
    ABORTED = "ABORTED"
    SUCCEEDED = "SUCCEEDED"


class Job(TypedDict):
    # Shape of a job object returned by the fastq-unarchiving API (see the
    # example payload in the module docstring above).
    # NOTE(review): the API serves plain JSON, so jobType/status presumably
    # arrive as strings rather than enum members — confirm before relying on
    # these annotations at runtime.
    id: str
    jobType: JobType
    stepsExecutionArn: str
    status: JobStatus
    startTime: str
    endTime: str
    errorMessages: str
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env python3

"""
Query helpers -
get_job_from_job_id
get_unarchiving_job_list
"""
from typing import List

from .models import Job, JobStatus
from .request_helpers import (
get_request_response_results,
)

from .globals import JOB_ENDPOINT


def get_job_from_job_id(job_id: str, **kwargs) -> Job:
    """
    Fetch a single job by id; extra kwargs are forwarded as query parameters.
    """
    return get_request_response_results(f"{JOB_ENDPOINT}/{job_id}", params=kwargs)


def get_unarchiving_job_list(*args, **kwargs) -> List[Job]:
    """
    List unarchiving jobs; kwargs are forwarded as query parameters.
    NOTE(review): positional *args are accepted but silently ignored.
    """
    return get_request_response_results(JOB_ENDPOINT, params=kwargs)


def get_job_list_for_fastq(fastq_id: str, job_status: JobStatus) -> List[Job]:
    """
    List jobs that reference the given fastq id and are in the given status.
    NOTE(review): the filter key 'fastq_id' is snake_case while other query
    params in these layers are camelCase — confirm the jobs API accepts it.
    :return: list of matching jobs
    """
    return get_unarchiving_job_list(fastq_id=fastq_id, status=job_status.value)
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#!/usr/bin/env python3
from typing import Dict, Optional, List, Union
from urllib.parse import urlunparse, urlparse

# Standard imports
import requests
import logging
from copy import deepcopy

# Locals
from .globals import (
FASTQ_UNARCHIVING_SUBDOMAIN_NAME,
)

from .aws_helpers import (
get_orcabus_token, get_hostname
)

# Set default request params
DEFAULT_REQUEST_PARAMS = {}

# Set logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def get_url(endpoint: str) -> str:
    """
    Build the absolute https URL for the fastq unarchiving service.
    :param endpoint: path component of the request
    :return: full URL on the fastq-unarchiving subdomain
    """
    return urlunparse((
        "https",
        f"{FASTQ_UNARCHIVING_SUBDOMAIN_NAME}.{get_hostname()}",
        endpoint,
        None, None, None
    ))


def get_request_response_results(endpoint: str, params: Optional[Dict] = None) -> Union[List[Dict], Dict]:
    """
    GET against the fastq unarchiving endpoint.
    Returns the decoded JSON body as-is (no 'results' unwrapping happens here).
    :param endpoint: path, or an absolute URL used as-is
    :param params: optional query params merged over the module defaults
    :return: decoded JSON response
    """
    target_url = endpoint if urlparse(endpoint).scheme else get_url(endpoint)

    response = requests.get(
        target_url,
        headers={"Authorization": f"Bearer {get_orcabus_token()}"},
        params={**DEFAULT_REQUEST_PARAMS, **(params or {})},
    )
    response.raise_for_status()

    return response.json()


def patch_request(endpoint: str, params: Optional[Dict] = None) -> Dict:
    """
    PATCH against the fastq unarchiving endpoint, sending params as JSON body.
    :param endpoint: path or absolute URL
    :param params: optional body values merged over the module defaults
    :return: decoded JSON response
    """
    target_url = endpoint if urlparse(endpoint).scheme else get_url(endpoint)

    response = requests.patch(
        target_url,
        headers={"Authorization": f"Bearer {get_orcabus_token()}"},
        json={**DEFAULT_REQUEST_PARAMS, **(params or {})},
    )
    response.raise_for_status()

    return response.json()


def post_request(endpoint: str, params: Optional[Dict] = None) -> Dict:
    """
    POST against the fastq unarchiving endpoint, sending params as JSON body.
    :param endpoint: path or absolute URL
    :param params: optional body values merged over the module defaults
    :return: decoded JSON response
    """
    target_url = endpoint if urlparse(endpoint).scheme else get_url(endpoint)

    response = requests.post(
        target_url,
        headers={"Authorization": f"Bearer {get_orcabus_token()}"},
        json={**DEFAULT_REQUEST_PARAMS, **(params or {})},
    )
    response.raise_for_status()

    return response.json()
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env python3

"""
Update helpers for the update script.
- add_qc_stats
- add_read_count
- add_ntsm_storage_object / add_ntsm
- add_fastq_pair_storage_object / add_read_set
- detach_fastq_pair_storage_object / detach_read_set
- validate
- invalidate
"""

# Standard imports
from typing import Optional

# Local imports
from .globals import JobStatus, JOB_ENDPOINT
from .request_helpers import patch_request
from .models import Job


def update_status(job_id: str, job_status: JobStatus, error_message: Optional[str] = None) -> Job:
    """
    Patch the status (and optionally an error message) of an unarchiving job.
    :param job_id: id of the job to update
    :param job_status: new JobStatus for the job
    :param error_message: optional error detail; only sent when not None
    :return: the updated Job
    """
    # BUG FIX: send the enum's string value — a raw Enum member is not
    # JSON-serializable, so requests would raise a TypeError when encoding
    # the PATCH body.
    patch_body = {
        "status": job_status.value,
        # NOTE(review): key is snake_case while the Job model exposes
        # 'errorMessages' — confirm against the jobs API schema.
        "error_message": error_message,
    }
    return patch_request(
        f"{JOB_ENDPOINT}/{job_id}",
        # Drop unset optional fields from the body
        params={key: value for key, value in patch_body.items() if value is not None}
    )


Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// NOTE(review): removed stray '#!/usr/bin/env python3' shebang — this is a TypeScript CDK module, not a Python script.

import { Construct } from 'constructs';
import { PythonLayerVersion } from '@aws-cdk/aws-lambda-python-alpha';
import path from 'path';
import { PythonLambdaLayerConstruct } from '../python-lambda-layer';

/** Props for {@link FastqUnarchivingToolsPythonLambdaLayer}. */
export interface PythonFastqUnarchivingLambdaLayerConstructProps {
  /** Prefix applied to the generated layer name. */
  layerPrefix: string;
}

/**
 * CDK construct that builds a Python Lambda layer bundling the
 * `fastq_unarchiving_tools_layer` directory, so lambdas can call the
 * fastq-unarchiving API via Python.
 */
export class FastqUnarchivingToolsPythonLambdaLayer extends Construct {
  public readonly lambdaLayerVersionObj: PythonLayerVersion;

  constructor(
    scope: Construct,
    id: string,
    props: PythonFastqUnarchivingLambdaLayerConstructProps
  ) {
    super(scope, id);

    // Generate lambda FastqUnarchiving python layer
    // Get lambda layer object
    this.lambdaLayerVersionObj = new PythonLambdaLayerConstruct(this, 'lambda_layer', {
      layerName: `${props.layerPrefix}-FastqUnarchiving-py-layer`,
      layerDescription: 'Lambda Layer for handling the fastq unarchiving api via Python',
      layerDirectory: path.join(__dirname, 'fastq_unarchiving_tools_layer'),
    }).lambdaLayerVersionObj;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "filemanager_tools"
version = "0.0.1"
description = "File Manager Lambda Layers"
license = "GPL-3.0-or-later"
authors = [
"Alexis Lucattini"
]
homepage = "https://github.com/umccr/orcabus"
repository = "https://github.com/umccr/orcabus"

[tool.poetry.dependencies]
python = "^3.12, <3.13"
requests = "^2.31.0"

[tool.poetry.group.dev]
optional = true

[tool.poetry.group.dev.dependencies]
pytest = "^7.0.0" # For testing only
# For typehinting only, not required at runtime
mypy-boto3-ssm = "^1.34"
mypy-boto3-secretsmanager = "^1.34"
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from .utils.file_helpers import (
FileObject,
STORAGE_ENUM,
update_ingest_id,
get_file_object_from_s3_uri,
get_file_object_from_id,
get_file_object_from_ingest_id,
list_files_from_portal_run_id,
get_presigned_url,
get_s3_object_id_from_s3_uri,
get_s3_uri_from_s3_object_id,
get_s3_uri_from_ingest_id,
get_ingest_id_from_s3_uri,
get_presigned_url_from_ingest_id,
get_presigned_url_expiry,
get_s3_objs_from_ingest_ids_map,
file_search,
list_files_recursively,
get_cache_bucket_from_account_id,
get_restore_prefix_from_account_id,
get_archive_fastq_bucket_from_account_id
)

__all__ = [
"FileObject",
"STORAGE_ENUM",
"update_ingest_id",
"get_file_object_from_s3_uri",
"get_file_object_from_id",
"get_file_object_from_ingest_id",
"list_files_from_portal_run_id",
"get_presigned_url",
"get_s3_object_id_from_s3_uri",
"get_s3_uri_from_s3_object_id",
"get_s3_uri_from_ingest_id",
"get_ingest_id_from_s3_uri",
"get_presigned_url_from_ingest_id",
"get_presigned_url_expiry",
"get_s3_objs_from_ingest_ids_map",
"file_search",
"list_files_recursively",
"get_cache_bucket_from_account_id",
"get_restore_prefix_from_account_id",
"get_archive_fastq_bucket_from_account_id"
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env python3


Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/usr/bin/env python3

# Standard imports
import typing
from typing import Optional
import boto3
import json
from os import environ
from urllib.parse import urlparse


# Type hinting
if typing.TYPE_CHECKING:
from mypy_boto3_secretsmanager import SecretsManagerClient
from mypy_boto3_ssm import SSMClient

ORCABUS_TOKEN_STR: Optional[str] = None
HOSTNAME_STR: Optional[str] = None


def get_secretsmanager_client() -> 'SecretsManagerClient':
    # Fresh client per call; credentials come from the ambient AWS environment
    return boto3.client('secretsmanager')


def get_ssm_client() -> 'SSMClient':
    # Fresh client per call
    return boto3.client('ssm')


def get_secret_value(secret_id) -> str:
    """
    Collect the secret value
    :param secret_id: secrets manager secret id or ARN
    :return: the raw SecretString
    """
    # Get the boto3 response
    get_secret_value_response = get_secretsmanager_client().get_secret_value(SecretId=secret_id)

    return get_secret_value_response['SecretString']


def get_ssm_value(parameter_name) -> str:
    """Return the plain value of an SSM parameter."""
    # Get the boto3 response
    get_ssm_parameter_response = get_ssm_client().get_parameter(Name=parameter_name)

    return get_ssm_parameter_response['Parameter']['Value']


def set_orcabus_token():
    # Populate the module-level cache from Secrets Manager: the secret named
    # by the ORCABUS_TOKEN_SECRET_ID env var, keeping its 'id_token' field.
    global ORCABUS_TOKEN_STR

    ORCABUS_TOKEN_STR = (
        json.loads(
            get_secret_value(environ.get("ORCABUS_TOKEN_SECRET_ID"))
        )['id_token']
    )


def get_orcabus_token() -> str:
    """
    From the AWS Secrets Manager, retrieve the OrcaBus token.
    Cached in a module-level global after the first call.
    :return: the OrcaBus id token
    """
    if ORCABUS_TOKEN_STR is None:
        set_orcabus_token()
    return ORCABUS_TOKEN_STR


def set_hostname():
    # Populate the module-level hostname cache from SSM
    global HOSTNAME_STR

    HOSTNAME_STR = get_ssm_value(environ.get("HOSTNAME_SSM_PARAMETER"))

def get_hostname() -> str:
    # Cached after the first lookup
    if HOSTNAME_STR is None:
        set_hostname()
    return HOSTNAME_STR


def get_bucket_key_pair_from_uri(s3_uri: str) -> tuple[str, str]:
    """
    Split an s3 uri into its bucket and key components.
    :param s3_uri: uri of the form s3://bucket/key
    :return: (bucket, key) pair
    :raises ValueError: if the uri has no bucket or no key component
    """
    url_obj = urlparse(s3_uri)

    s3_bucket = url_obj.netloc
    s3_key = url_obj.path.lstrip('/')

    # BUG FIX: urlparse returns empty strings (never None) for missing
    # components, so the previous 'is None' checks could never fire and
    # malformed uris slipped through. Check for emptiness instead.
    if not s3_bucket or not s3_key:
        raise ValueError(f"Invalid S3 URI: {s3_uri}")

    return s3_bucket, s3_key
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Optional


class S3FileNotFoundError(Exception):
    """Raised when the filemanager cannot locate a file by any identifier."""

    def __init__(
            self,
            s3_object_id: Optional[str] = None,
            s3_uri: Optional[str] = None,
            ingest_id: Optional[str] = None
    ):
        self.s3_object_id = s3_object_id
        self.s3_uri = s3_uri
        self.ingest_id = ingest_id

        # Build the message from the most specific identifier supplied
        if s3_object_id is not None:
            message = f"Could not find file with object ID '{s3_object_id}'"
        elif s3_uri is not None:
            message = f"Could not find the file at S3 URI '{s3_uri}'"
        elif ingest_id is not None:
            message = f"Could not find file with ingest ID '{ingest_id}'"
        else:
            message = "Could not find file"

        self.message = message
        super().__init__(message)


class S3DuplicateFileCopyError(Exception):
    """Raised when more than one filemanager record matches a single identifier."""

    def __init__(
        self,
        s3_object_id: Optional[str] = None,
        s3_uri: Optional[str] = None,
        ingest_id: Optional[str] = None
    ):
        self.s3_object_id = s3_object_id
        self.s3_uri = s3_uri
        self.ingest_id = ingest_id
        if s3_object_id is not None:
            self.message = f"Found multiple files with the object ID '{s3_object_id}'"
        elif s3_uri is not None:
            # BUG FIX: this branch previously assigned the message text to
            # self.s3_uri (clobbering the attribute) and left self.message
            # unset, so super().__init__(self.message) raised AttributeError.
            # Also restored the missing space after 'uri'.
            self.message = f"Found multiple files with the uri '{s3_uri}'"
        elif ingest_id is not None:
            self.message = f"Found multiple files with the ingest id '{ingest_id}'"
        else:
            self.message = "Found multiple files"
        super().__init__(self.message)
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
#!/usr/bin/env python3
from functools import reduce
from operator import concat
from typing import List, Dict, Union
import typing

import boto3

from .errors import S3FileNotFoundError, S3DuplicateFileCopyError
from .models import FileObject
from .aws_helpers import get_bucket_key_pair_from_uri
from .request_helpers import get_request_response_results, get_response, patch_response
from .globals import S3_LIST_ENDPOINT, S3_BUCKETS_BY_ACCOUNT_ID, S3_PREFIXES_BY_ACCOUNT_ID, STORAGE_ENUM, STORAGE_PRIORITY
from datetime import datetime, timedelta, timezone
from urllib.parse import urlparse
from itertools import batched

if typing.TYPE_CHECKING:
from mypy_boto3_sts import STSClient


def get_file_object_from_s3_uri(s3_uri: str) -> FileObject:
    """
    Look up the filemanager record for an s3 uri.
    Tries current-state records first, then falls back to historical records.
    :param s3_uri: uri of the form s3://bucket/key
    :return: the matching FileObject
    :raises S3FileNotFoundError: when no record matches at all
    :raises S3DuplicateFileCopyError: when the ingestId-bearing matches are not exactly one
    """
    s3_bucket, s3_key = get_bucket_key_pair_from_uri(s3_uri)

    response = get_request_response_results(S3_LIST_ENDPOINT, {
        "bucket": s3_bucket,
        "key": s3_key,
        "currentState": 'true'
    })

    if len(response) == 0:
        # Try again with current_state=False
        response = get_request_response_results(S3_LIST_ENDPOINT, {
            "bucket": s3_bucket,
            "key": s3_key,
            "currentState": 'false'
        })

    if len(response) == 0:
        raise S3FileNotFoundError(s3_uri=s3_uri)

    # Filter responses with no "s3IngestId" field
    response = list(filter(
        lambda result_iter_: result_iter_.get("ingestId", None) is not None,
        response
    ))

    # NOTE(review): if every match lacks an ingestId this raises the
    # 'duplicate' error even though zero usable records remain — confirm
    # that conflation is intended.
    if not len(response) == 1:
        raise S3DuplicateFileCopyError(s3_uri=s3_uri)

    # Return as a FileObject model
    return FileObject(**response[0])


def get_file_object_from_id(s3_object_id: str) -> FileObject:
    """
    Get file object from the id
    :param s3_object_id: filemanager s3 object id
    :return: the matching FileObject
    :raises S3FileNotFoundError: when nothing matches
    :raises S3DuplicateFileCopyError: when more than one record matches
    """
    response = get_request_response_results(f"{S3_LIST_ENDPOINT}/{s3_object_id}")

    if len(response) == 0:
        raise S3FileNotFoundError(s3_object_id=s3_object_id)
    elif not len(response) == 1:
        raise S3DuplicateFileCopyError(s3_object_id=s3_object_id)

    # Return as a FileObject model
    return FileObject(**response[0])


def get_file_object_from_ingest_id(ingest_id: str) -> FileObject:
    """
    Fetch the file object for an ingest id.
    When several records share the ingest id (e.g. copies in different storage
    tiers), return the copy in the most accessible storage class.
    :param ingest_id: filemanager ingest id
    :return: FileObject for the preferred copy
    :raises S3FileNotFoundError: when nothing matches
    """
    response = get_request_response_results(S3_LIST_ENDPOINT, {
        "ingestId": ingest_id
    })

    if len(response) == 0:
        raise S3FileNotFoundError(ingest_id=ingest_id)
    elif len(response) == 1:
        return FileObject(**response[0])

    file_objects_list = list(map(
        lambda file_obj_iter_: FileObject(**file_obj_iter_),
        response
    ))

    # BUG FIX: 'storageClass' holds the enum *value* (e.g. "Standard"), so it
    # must be resolved with STORAGE_ENUM(...) — the previous STORAGE_ENUM[...]
    # name lookup raised KeyError. STORAGE_PRIORITY members share names with
    # STORAGE_ENUM, so index by name and sort on the integer priority value
    # (Enum members themselves are not orderable).
    file_objects_list.sort(
        key=lambda file_obj_iter_: STORAGE_PRIORITY[
            STORAGE_ENUM(file_obj_iter_['storageClass']).name
        ].value
    )

    # Most accessible copy first
    return file_objects_list[0]


def list_files_from_portal_run_id(portal_run_id: str) -> List[FileObject]:
    # List every filemanager record tagged with the given portal run id
    response = get_request_response_results(S3_LIST_ENDPOINT, {
        "portalRunId": portal_run_id
    })

    # Return as a list of FileObject models
    return [FileObject(**file) for file in response]


def get_presigned_url(s3_object_id: str) -> str:
    """
    Get presigned url
    :param s3_object_id: filemanager s3 object id
    :return: a presigned url for the object
    """
    # NOTE(review): assumes the presign endpoint returns the bare url as its
    # JSON body (get_response returns response.json()) — confirm.
    response = get_response(f"{S3_LIST_ENDPOINT}/presign/{s3_object_id}")

    return str(response)


def get_s3_object_id_from_s3_uri(s3_uri: str) -> str:
    # Resolve an s3 uri to its filemanager object id
    return get_file_object_from_s3_uri(s3_uri)['s3ObjectId']


def get_s3_uri_from_s3_object_id(s3_object_id: str) -> str:
    # Resolve a filemanager object id back to its s3 uri
    file_object: FileObject = get_file_object_from_id(s3_object_id)
    return f"s3://{file_object['bucket']}/{file_object['key']}"


def get_s3_uri_from_ingest_id(ingest_id: str) -> str:
    # Resolve an ingest id to the s3 uri of its (preferred) copy
    file_object: FileObject = get_file_object_from_ingest_id(ingest_id)
    return f"s3://{file_object['bucket']}/{file_object['key']}"


def get_ingest_id_from_s3_uri(s3_uri: str) -> str:
    # Resolve an s3 uri to its filemanager ingest id
    return get_file_object_from_s3_uri(s3_uri)['ingestId']


def get_presigned_url_from_ingest_id(ingest_id: str) -> str:
    """
    Get presigned url from ingest id
    :param ingest_id: filemanager ingest id
    :return: presigned url for the preferred copy of the file
    """
    return get_presigned_url(get_file_object_from_ingest_id(ingest_id)['s3ObjectId'])


def get_presigned_url_expiry(s3_presigned_url: str) -> datetime:
    """
    Given a presigned url, return its expiry as an aware UTC datetime.
    Expiry = X-Amz-Date (signing time, UTC) + X-Amz-Expires (seconds).
    :param s3_presigned_url: an AWS SigV4 presigned url
    :return: timezone-aware expiry time in UTC
    """
    # Parse the query string into a dict (values may contain '=', hence maxsplit)
    query_dict = dict(map(
        lambda params_iter_: params_iter_.split("=", 1),
        urlparse(s3_presigned_url).query.split("&"))
    )

    # X-Amz-Date is UTC ('Z' suffix, e.g. 20250121T013812Z).
    # BUG FIX: the naive strptime result was previously passed through
    # astimezone(), which interprets naive datetimes as *local* time; tag it
    # as UTC explicitly instead. Also dropped an unused urlparse() result.
    creation_time = datetime.strptime(
        query_dict['X-Amz-Date'], "%Y%m%dT%H%M%SZ"
    ).replace(tzinfo=timezone.utc)
    expiry_ext = timedelta(seconds=int(query_dict['X-Amz-Expires']))

    return creation_time + expiry_ext


def get_s3_objs_from_ingest_ids_map(ingest_ids: List[str]) -> List[Dict[str, Union[FileObject, str]]]:
    """
    Map each ingest id to its file object, querying the API in batches of 100.
    :param ingest_ids: ingest ids to resolve
    :return: list of {"ingestId": ..., "fileObject": ...} dicts
    """
    # Split by groups of 100 to keep each request's query string bounded
    ingest_id_batches = batched(ingest_ids, 100)

    # IMPROVEMENT: seed reduce with [] so an empty input yields [] directly,
    # instead of catching the TypeError from an empty reduce — that broad
    # catch could also mask unrelated TypeErrors raised by the request calls.
    all_results = reduce(
        concat,
        map(
            lambda ingest_id_batch_: get_request_response_results(
                S3_LIST_ENDPOINT,
                {"ingestId[]": list(ingest_id_batch_)}
            ),
            ingest_id_batches
        ),
        []
    )

    # Pair each record with its ingest id
    return list(map(
        lambda s3_obj_iter: {
            "ingestId": s3_obj_iter['ingestId'],
            "fileObject": s3_obj_iter
        },
        all_results
    ))


def file_search(bucket: str, key: str) -> List[FileObject]:
    """
    Search for file records by bucket and key, skipping filters that are None.
    :param bucket: bucket name (None to skip the filter)
    :param key: object key (None to skip the filter)
    :return: list of matching file objects
    """
    # BUG FIX: filtering a dict iterates its *keys*, so the previous code
    # indexed into the key strings (always truthy) and dict() then failed on
    # a sequence of bare strings. Filter over .items() to get (key, value)
    # pairs as intended.
    filtered_params = dict(
        filter(
            lambda param_iter_: param_iter_[1] is not None,
            {
                "bucket": bucket,
                "key": key
            }.items()
        )
    )
    response = get_request_response_results(
        S3_LIST_ENDPOINT,
        params=filtered_params
    )

    # Return as a list of FileObject models
    return response


def list_files_recursively(bucket: str, key: str) -> List[FileObject]:
    # List all records under a key prefix; the appended '*' makes the
    # filemanager key filter act as a wildcard match.
    response = get_request_response_results(
        S3_LIST_ENDPOINT,
        {
            "bucket": bucket,
            "key": f"{key}*",  # Append wildcard to key
        }
    )

    # Return as a list of FileObject models
    return response


def get_sts_client() -> 'STSClient':
    # Used to resolve the current AWS account id
    return boto3.client('sts')


def get_cache_bucket_from_account_id() -> str:
    # Resolve the pipeline cache bucket for the current AWS account
    return S3_BUCKETS_BY_ACCOUNT_ID["cache"][get_sts_client().get_caller_identity()['Account']]

def get_archive_fastq_bucket_from_account_id():
    # Resolve the fastq archive bucket for the current AWS account
    return S3_BUCKETS_BY_ACCOUNT_ID["archive_fastq"][get_sts_client().get_caller_identity()['Account']]

def get_restore_prefix_from_account_id():
    # Resolve the restore key prefix for the current AWS account
    return S3_PREFIXES_BY_ACCOUNT_ID["restore"][get_sts_client().get_caller_identity()['Account']]


def update_ingest_id(s3_object_id: str, new_ingest_id: str) -> Dict:
    """
    Attach a new ingest id to a file record using a JSON-patch style body.
    :param s3_object_id: record to update
    :param new_ingest_id: ingest id to add
    :return: decoded JSON response from the PATCH
    """
    json_data = {
        'ingestId': [
            {
                'op': 'add',
                'path': '/',
                'value': new_ingest_id,
            },
        ],
    }
    # NOTE(review): patch_response is called without 'params' — its signature
    # must default that argument for this call to succeed; verify.
    return patch_response(
        endpoint=f"{S3_LIST_ENDPOINT}/{s3_object_id}",
        json_data=json_data
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/usr/bin/env python
from enum import Enum

# AWS PARAMETERS
FILE_SUBDOMAIN_NAME = "file"

S3_LIST_ENDPOINT = "api/v1/s3"

S3_ATTRIBUTES_LIST_ENDPOINT = "api/v1/s3/attributes"

S3_BUCKETS_BY_ACCOUNT_ID = {
"cache": {
"843407916570": "pipeline-dev-cache-503977275616-ap-southeast-2",
"455634345446": "pipeline-stg-cache-503977275616-ap-southeast-2",
"472057503814": "pipeline-prod-cache-503977275616-ap-southeast-2",
},
"archive_fastq": {
"843407916570": "archive-dev-fastq-503977275616-ap-southeast-2",
"455634345446": "archive-stg-fastq-503977275616-ap-southeast-2",
"472057503814": "archive-prod-fastq-503977275616-ap-southeast-2",
}
}

S3_PREFIXES_BY_ACCOUNT_ID = {
"restore": {
"843407916570": "byob-icav2/development/restore",
"455634345446": "byob-icav2/staging/restore",
"472057503814": "byob-icav2/production/restore",
},
}

# FROM FileManager Schema
# "DeepArchive"
# "Glacier"
# "GlacierIr"
# "IntelligentTiering"
# "OnezoneIa"
# "Outposts"
# "ReducedRedundancy"
# "Snow"
# "Standard"
# "StandardIa"

class STORAGE_ENUM(Enum):
    # S3 storage classes, spelled exactly as the filemanager schema returns
    # them in the 'storageClass' field (see the list above).
    STANDARD = "Standard"
    STANDARD_IA = "StandardIa"
    INTELLIGENT_TIERING = "IntelligentTiering"
    GLACIER_INSTANT_RETRIEVAL = "GlacierIr"
    GLACIER = "Glacier"
    DEEP_ARCHIVE = "DeepArchive"


class STORAGE_PRIORITY(Enum):
    # Accessibility ranking: lower value = faster/cheaper to retrieve.
    # NOTE: member names must mirror STORAGE_ENUM so name-based cross-lookups
    # between the two enums keep working.
    STANDARD = 1
    STANDARD_IA = 2
    INTELLIGENT_TIERING = 3
    GLACIER_INSTANT_RETRIEVAL = 4
    GLACIER = 5
    DEEP_ARCHIVE = 6
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from typing import Optional, TypedDict, Dict

"""
Example File Object response
{
"attributes": null,
"bucket": "string",
"deletedDate": "2025-01-19T23:32:42.747Z",
"deletedSequencer": "string",
"eTag": "string",
"eventTime": "2025-01-19T23:32:42.747Z",
"eventType": "Created",
"ingestId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
"isCurrentState": true,
"isDeleteMarker": true,
"key": "string",
"lastModifiedDate": "2025-01-19T23:32:42.747Z",
"numberDuplicateEvents": 9007199254740991,
"numberReordered": 9007199254740991,
"s3ObjectId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
"sequencer": "string",
"sha256": "string",
"size": 9007199254740991,
"storageClass": null,
"versionId": "string"
}
"""


class FileObject(TypedDict):
    # A single s3 object record as returned by the filemanager API
    # (see the example payload in the module docstring above).

    # Identifier
    s3ObjectId: str

    # Path attributes
    bucket: str
    key: str

    # File attributes
    eTag: str
    eventTime: str
    eventType: str
    ingestId: str
    isCurrentState: bool
    isDeleteMarker: bool
    lastModifiedDate: str
    numberDuplicateEvents: int
    numberReordered: int
    sequencer: str
    size: int
    storageClass: str

    # Attribute attributes
    attributes: Optional[Dict]

    # Optional attributes
    deletedDate: Optional[str]
    deletedSequencer: Optional[str]
    versionId: Optional[str]
    sha256: Optional[str]
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#!/usr/bin/env python3
from typing import Dict, Optional, List
from urllib.parse import urlunparse, urlparse

# Standard imports
import requests
import logging
from copy import deepcopy

# Locals
from .globals import (
FILE_SUBDOMAIN_NAME,
)

from .aws_helpers import (
get_orcabus_token, get_hostname
)

# Globals
DEFAULT_REQUEST_PARAMS = {
"rowsPerPage": 1000
}

# Set logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def get_url(endpoint: str) -> str:
    """
    Build the absolute https URL for the filemanager service.
    :param endpoint: path component of the request
    :return: full URL on the file subdomain of the configured hostname
    """
    return urlunparse((
        "https",
        f"{FILE_SUBDOMAIN_NAME}.{get_hostname()}",
        endpoint,
        None, None, None
    ))


def get_response(endpoint: str, params: Optional[Dict] = None) -> Dict:
    """
    Run get response against the filemanager endpoint
    Returns the decoded JSON body directly, with no pagination handling.
    :param endpoint: path, or an absolute URL which is used as-is
    :param params: optional query params, merged over the default rowsPerPage
    :return: decoded JSON response
    """
    # Get authorization header
    headers = {
        "Authorization": f"Bearer {get_orcabus_token()}"
    }

    req_params = deepcopy(DEFAULT_REQUEST_PARAMS)

    req_params.update(
        params if params is not None else {}
    )

    # Make the request
    response = requests.get(
        get_url(endpoint) if not urlparse(endpoint).scheme else endpoint,
        headers=headers,
        params=req_params
    )

    response.raise_for_status()

    response_json = response.json()

    return response_json



def get_request_response_results(endpoint: str, params: Optional[Dict] = None) -> List[Dict]:
    """
    Run get response against the filemanager endpoint
    Paginated variant: follows links.next recursively and concatenates the
    'results' lists. A response without a 'links' key (single-object
    endpoints) is wrapped in a one-element list.
    :param endpoint: path, or an absolute URL which is used as-is
    :param params: optional query params, merged over the default rowsPerPage
    :return: list of result dicts across all pages
    """
    # Get authorization header
    headers = {
        "Authorization": f"Bearer {get_orcabus_token()}"
    }

    req_params = deepcopy(DEFAULT_REQUEST_PARAMS)

    req_params.update(
        params if params is not None else {}
    )


    # Make the request
    response = requests.get(
        get_url(endpoint) if not urlparse(endpoint).scheme else endpoint,
        headers=headers,
        params=req_params
    )

    response.raise_for_status()

    response_json = response.json()

    if 'links' not in response_json.keys():
        return [response_json]

    # NOTE(review): the recursive call passes no params — assumes the 'next'
    # link is an absolute URL that already embeds the original query string.
    if 'next' in response_json['links'].keys() and response_json['links']['next'] is not None:
        return response_json['results'] + get_request_response_results(response_json['links']['next'])
    return response_json['results']


def patch_response(endpoint: str, params: Optional[Dict], json_data: Optional[Dict]) -> Dict:
    """
    Perform an authenticated PATCH against the filemanager endpoint and
    return the decoded JSON body.

    :param endpoint: Endpoint path, or a fully qualified URL.
    :param params: Query parameters, passed through unchanged (may be None).
    :param json_data: JSON payload for the PATCH body.
    :return: The response body parsed as JSON.
    """
    # Resolve relative endpoints against the filemanager host
    url = endpoint if urlparse(endpoint).scheme else get_url(endpoint)

    # Perform the authenticated request
    patch_response_obj = requests.patch(
        url,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {get_orcabus_token()}"
        },
        params=params,
        json=json_data
    )

    # Raise on any non-2xx status
    patch_response_obj.raise_for_status()

    return patch_response_obj.json()
26 changes: 26 additions & 0 deletions lib/workload/components/python-filemanager-tools-layer/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Construct } from 'constructs';
import type { PythonLayerVersion } from '@aws-cdk/aws-lambda-python-alpha';
import path from 'path';
import { PythonLambdaLayerConstruct } from '../python-lambda-layer';

export interface PythonFilemanagerLambdaLayerConstructProps {
  /** Prefix applied to the generated layer name. */
  layerPrefix: string;
}

/**
 * Construct that builds the filemanager-tools Python Lambda layer.
 *
 * Wraps PythonLambdaLayerConstruct, pointing it at the bundled
 * 'filemanager_tools_layer' directory, and exposes the resulting
 * PythonLayerVersion so functions can attach it.
 */
export class FilemanagerToolsPythonLambdaLayer extends Construct {
  public readonly lambdaLayerVersionObj: PythonLayerVersion;

  constructor(scope: Construct, id: string, props: PythonFilemanagerLambdaLayerConstructProps) {
    super(scope, id);

    // Generate the lambda filemanager python layer and keep a handle on the
    // resulting layer version object for consumers of this construct
    this.lambdaLayerVersionObj = new PythonLambdaLayerConstruct(this, 'lambda_layer', {
      layerName: `${props.layerPrefix}-filemanager-py-layer`,
      layerDescription: 'Lambda Layer for handling the filemanager api via Python',
      layerDirectory: path.join(__dirname, 'filemanager_tools_layer'),
    }).lambdaLayerVersionObj;
  }
}
5 changes: 4 additions & 1 deletion lib/workload/components/python-lambda-layer/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Construct } from 'constructs';
import { PythonLayerVersion } from '@aws-cdk/aws-lambda-python-alpha';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { getPythonUvDockerImage } from '../uv-python-lambda-image-builder';

export interface PythonLambdaLayerConstructProps {
layerName: string;
@@ -15,6 +16,7 @@ export class PythonLambdaLayerConstruct extends Construct {
constructor(scope: Construct, id: string, props: PythonLambdaLayerConstructProps) {
super(scope, id);

// Generate the docker image
this.lambdaLayerVersionObj = new PythonLayerVersion(this, 'python_lambda_layer', {
layerVersionName: props.layerName,
entry: props.layerDirectory,
@@ -23,14 +25,15 @@ export class PythonLambdaLayerConstruct extends Construct {
license: 'GPL3',
description: props.layerDescription,
bundling: {
image: getPythonUvDockerImage(),
commandHooks: {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
beforeBundling(inputDir: string, outputDir: string): string[] {
return [];
},
afterBundling(inputDir: string, outputDir: string): string[] {
return [
`python -m pip install ${inputDir} -t ${outputDir}`,
`pip install ${inputDir} --target ${outputDir}`,
`find ${outputDir} -name 'pandas' -exec rm -rf {}/tests/ \\;`,
];
},
59 changes: 59 additions & 0 deletions lib/workload/components/python-lambda-layer/uv_docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# NOTES -
# THIS IS FROM https://github.com/aws/aws-cdk/blob/main/packages/%40aws-cdk/aws-lambda-python-alpha/lib/Dockerfile
# We have made the following changes:
# 1. Added the installation of 'uv'
# 2. Added in the UV_CACHE_DIR environment variable
# 3. Replaced 'pip' with 'uv pip' in the main code chunk
# 4. Replaced /usr/app/venv/bin/pip with a 'uv pip' shell script wrapper
# The correct AWS SAM build image based on the runtime of the function will be
# passed as build arg. The default allows to do `docker build .` when testing.
ARG IMAGE=public.ecr.aws/sam/build-python3.12:latest
FROM $IMAGE

ARG PIP_INDEX_URL
ARG PIP_EXTRA_INDEX_URL
ARG HTTPS_PROXY
ARG POETRY_VERSION=1.5.1

# ADDITION: Install uv
RUN curl -LsSf https://astral.sh/uv/install.sh | XDG_CONFIG_HOME=/tmp UV_INSTALL_DIR=/usr/bin sh

# Add virtualenv path
ENV PATH="/usr/app/venv/bin:$PATH"

# set the pip cache location
ENV PIP_CACHE_DIR=/tmp/pip-cache

# set the poetry cache
ENV POETRY_CACHE_DIR=/tmp/poetry-cache

# ADDITION: set the uv cache
ENV UV_CACHE_DIR=/tmp/uv-cache

# ADDITION: Replace 'pip' with uv pip in the following chunk
RUN \
    # create a new virtualenv for python to use
    # so that it isn't using root
    python -m venv /usr/app/venv && \
    # Create a new location for the pip cache
    mkdir /tmp/pip-cache && \
    # Ensure all users can write to pip cache
    chmod -R 777 /tmp/pip-cache && \
    # Upgrade pip (required by cryptography v3.4 and above, which is a dependency of poetry)
    uv pip install --upgrade pip && \
    # Create a new location for the poetry cache
    mkdir /tmp/poetry-cache && \
    # Ensure all users can write to poetry cache
    chmod -R 777 /tmp/poetry-cache && \
    # pipenv 2022.4.8 is the last version with Python 3.6 support
    uv pip install pipenv==2022.4.8 poetry==$POETRY_VERSION && \
    # Ensure no temporary files remain in the caches
    rm -rf /tmp/pip-cache/* /tmp/poetry-cache/*

# ADDITION: Replace /usr/app/venv/bin/pip with a 'uv pip' wrapper.
# Quote "$@" so arguments containing spaces are forwarded intact,
# 'exec' so the wrapper does not leave a lingering bash process, and
# chmod +x in case the overwritten venv scripts lose their execute bit.
RUN printf '#!/bin/bash\nexec uv pip "$@"\n' > /usr/app/venv/bin/pip && \
    printf '#!/bin/bash\nexec uv pip "$@"\n' > /usr/app/venv/bin/pip3 && \
    chmod +x /usr/app/venv/bin/pip /usr/app/venv/bin/pip3

RUN rm -rf /tmp/uv-cache

CMD [ "python" ]
Original file line number Diff line number Diff line change
@@ -3,10 +3,47 @@
# Utils
from .utils.aws_helpers import get_orcabus_token

# Errors
from .utils.errors import (
SampleNotFoundError,
SubjectNotFoundError,
ProjectNotFoundError,
IndividualNotFoundError,
LibraryNotFoundError,
ContactNotFoundError,
)

# Models
from .utils.models import (
MetadataBase,
LibraryBase,
SampleBase,
SubjectBase,
IndividualBase,
ProjectBase,
ContactBase,
LibraryDetail,
SampleDetail,
SubjectDetail,
IndividualDetail,
ProjectDetail,
ContactDetail,
Library,
Sample,
Subject,
Individual,
Project,
Contact,
LimsRow,
)

# Library Helpers
from .utils.library_helpers import (
get_library_from_library_id,
get_library_id_from_library_orcabus_id,
get_library_orcabus_id_from_library_id,
get_library_from_library_orcabus_id,
coerce_library_id_or_orcabus_id_to_library_orcabus_id,
get_subject_from_library_id,
get_library_type,
get_library_assay_type,
@@ -18,15 +55,19 @@
# Sample Helpers
from .utils.sample_helpers import (
get_sample_from_sample_id,
get_sample_orcabus_id_from_sample_id,
get_sample_from_sample_orcabus_id,
list_libraries_in_sample,
coerce_sample_id_or_orcabus_id_to_sample_orcabus_id,
get_all_samples
)

# Subject Helpers
from .utils.subject_helpers import (
get_subject_from_subject_id,
get_subject_orcabus_id_from_subject_id,
get_subject_from_subject_orcabus_id,
coerce_subject_id_or_orcabus_id_to_subject_orcabus_id,
list_samples_in_subject,
list_libraries_in_subject,
get_all_subjects
@@ -35,58 +76,116 @@
# Project Helpers
from .utils.project_helpers import (
get_all_projects,
get_project_orcabus_id_from_project_id,
get_project_from_project_id,
get_project_from_project_orcabus_id,
coerce_project_id_or_orcabus_id_to_project_orcabus_id,
list_libraries_in_project
)

# Individual Helpers
from .utils.individual_helpers import (
get_individual_from_individual_id,
get_individual_orcabus_id_from_individual_id,
get_individual_from_individual_orcabus_id,
get_all_individuals
coerce_individual_id_or_orcabus_id_to_individual_orcabus_id,
get_all_individuals,
list_libraries_in_individual
)

# Contact helpers
from .utils.contact_helpers import (
get_contact_from_contact_id,
get_contact_orcabus_id_from_contact_id,
get_contact_from_contact_orcabus_id,
coerce_contact_id_or_orcabus_id_to_contact_orcabus_id,
get_all_contacts
)

# Miscellaneous helpers
from .utils.lims_helpers import (
generate_lims_row,
)

# Set __all__
__all__ = [
# Errors
'SampleNotFoundError',
'SubjectNotFoundError',
'ProjectNotFoundError',
'IndividualNotFoundError',
'LibraryNotFoundError',
'ContactNotFoundError',
# Models
'MetadataBase',
'LibraryBase',
'SampleBase',
'SubjectBase',
'IndividualBase',
'ProjectBase',
'ContactBase',
'LibraryDetail',
'SampleDetail',
'SubjectDetail',
'IndividualDetail',
'ProjectDetail',
'ContactDetail',
'Library',
'Sample',
'Subject',
'Individual',
'Project',
'Contact',
'LimsRow',
# Utils
'get_orcabus_token',
# Library Funcs
'get_library_from_library_id',
'get_library_orcabus_id_from_library_id',
'get_library_id_from_library_orcabus_id',
'get_library_from_library_orcabus_id',
'get_subject_from_library_id',
'coerce_library_id_or_orcabus_id_to_library_orcabus_id',
'get_library_type',
'get_library_assay_type',
'get_library_phenotype',
'get_library_workflow',
'get_all_libraries',
# Sample Funcs
'get_sample_from_sample_id',
'get_sample_orcabus_id_from_sample_id',
'get_sample_from_sample_orcabus_id',
'coerce_sample_id_or_orcabus_id_to_sample_orcabus_id',
'list_libraries_in_sample',
'list_samples_in_subject',
'get_all_samples',
# Subject Funcs
'get_subject_from_subject_id',
'get_subject_orcabus_id_from_subject_id',
'get_subject_from_subject_orcabus_id',
'coerce_subject_id_or_orcabus_id_to_subject_orcabus_id',
'list_libraries_in_subject',
'get_all_subjects',
# Project Funcs
'get_all_projects',
'get_project_from_project_id',
'get_project_orcabus_id_from_project_id',
'get_project_from_project_orcabus_id',
'coerce_project_id_or_orcabus_id_to_project_orcabus_id',
'list_libraries_in_project',
# Individual Funcs
'get_individual_from_individual_id',
'get_individual_orcabus_id_from_individual_id',
'get_individual_from_individual_orcabus_id',
'coerce_individual_id_or_orcabus_id_to_individual_orcabus_id',
'get_all_individuals',
'list_libraries_in_individual',
# Contact Funcs
'get_contact_from_contact_id',
'get_contact_orcabus_id_from_contact_id',
'get_contact_from_contact_orcabus_id',
'coerce_contact_id_or_orcabus_id_to_contact_orcabus_id',
'get_all_contacts',
    # Miscellaneous
'generate_lims_row',
]
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@

# Standard imports
import typing
from typing import Optional
import boto3
import json
from os import environ
@@ -11,6 +12,10 @@
from mypy_boto3_secretsmanager import SecretsManagerClient
from mypy_boto3_ssm import SSMClient

# Set globals
ORCABUS_TOKEN_STR: Optional[str] = None
HOSTNAME_STR: Optional[str] = None


def get_secretsmanager_client() -> 'SecretsManagerClient':
    """
    Create a boto3 Secrets Manager client using the ambient AWS
    credentials / region from the Lambda environment.
    :return: A SecretsManagerClient instance.
    """
    return boto3.client('secretsmanager')
@@ -39,13 +44,33 @@ def get_ssm_value(parameter_name) -> str:
return get_ssm_parameter_response['Parameter']['Value']


def set_orcabus_token():
    """
    Fetch the OrcaBus token from Secrets Manager and cache it in the
    module-level ORCABUS_TOKEN_STR global.

    The secret id is read from the ORCABUS_TOKEN_SECRET_ID environment
    variable; the secret value is a JSON document whose 'id_token' key
    holds the token.
    """
    global ORCABUS_TOKEN_STR

    ORCABUS_TOKEN_STR = (
        json.loads(
            get_secret_value(environ.get("ORCABUS_TOKEN_SECRET_ID"))
        )['id_token']
    )


def get_orcabus_token() -> str:
"""
From the AWS Secrets Manager, retrieve the OrcaBus token.
:return:
"""
return json.loads(get_secret_value(environ.get("ORCABUS_TOKEN_SECRET_ID")))['id_token']
if ORCABUS_TOKEN_STR is None:
set_orcabus_token()
return ORCABUS_TOKEN_STR


def set_hostname():
    """
    Resolve the hostname from SSM and cache it in the module-level
    HOSTNAME_STR global. The SSM parameter name is read from the
    HOSTNAME_SSM_PARAMETER environment variable.
    """
    global HOSTNAME_STR

    HOSTNAME_STR = get_ssm_value(environ.get("HOSTNAME_SSM_PARAMETER"))


def get_hostname() -> str:
return get_ssm_value(environ.get("HOSTNAME_SSM_PARAMETER"))
if HOSTNAME_STR is None:
set_hostname()
return HOSTNAME_STR
Original file line number Diff line number Diff line change
@@ -6,15 +6,20 @@

# Standard imports
from typing import List, Dict
from .globals import CONTACT_ENDPOINT

from requests import HTTPError

from .globals import CONTACT_ENDPOINT, ORCABUS_ULID_REGEX_MATCH
from .models import Contact

# Local imports
from .requests_helpers import get_request_response_results
from .. import ContactNotFoundError


def get_contact_from_contact_id(contact_id: str) -> Dict:
def get_contact_from_contact_id(contact_id: str) -> Contact:
"""
Get subject from the subject id
Get contact from the contact id
:param contact_id:
:return:
"""
@@ -23,8 +28,19 @@ def get_contact_from_contact_id(contact_id: str) -> Dict:
"contact_id": contact_id
}

# Get subject
return get_request_response_results(CONTACT_ENDPOINT, params)[0]
# Get contact
try:
query_list = get_request_response_results(CONTACT_ENDPOINT, params)
assert len(query_list) == 1
return query_list[0]
except (HTTPError, AssertionError):
raise ContactNotFoundError(
contact_id=contact_id,
)


def get_contact_orcabus_id_from_contact_id(contact_id: str) -> str:
    """
    Map a human-readable contact id to its OrcaBus ID.
    :param contact_id:
    :return: The 'orcabusId' field of the matching contact record.
    """
    return get_contact_from_contact_id(contact_id)['orcabusId']


def get_contact_from_contact_orcabus_id(contact_orcabus_id: str) -> Dict:
@@ -38,13 +54,26 @@ def get_contact_from_contact_orcabus_id(contact_orcabus_id: str) -> Dict:
}

# Get contact
return get_request_response_results(CONTACT_ENDPOINT, params)[0]
try:
query_result = get_request_response_results(CONTACT_ENDPOINT, params)
assert len(query_result) == 1
return query_result[0]
except (HTTPError, AssertionError):
raise ContactNotFoundError(
contact_orcabus_id=contact_orcabus_id,
)


def coerce_contact_id_or_orcabus_id_to_contact_orcabus_id(id_: str) -> str:
    """
    Accept either a human-readable contact id or an OrcaBus ID and
    return the OrcaBus ID.
    :param id_: Contact id or OrcaBus ID.
    :return: The contact's OrcaBus ID.
    """
    # Already an OrcaBus ULID - pass straight through
    if ORCABUS_ULID_REGEX_MATCH.match(id_) is not None:
        return id_
    # Otherwise resolve the human-readable id via the API
    return get_contact_orcabus_id_from_contact_id(id_)

def get_all_contacts() -> List[Dict]:

def get_all_contacts() -> List[Contact]:
    """
    Get all contacts registered in the metadata manager.
    :return: List of contact records.
    """

    return get_request_response_results(CONTACT_ENDPOINT)
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Metadata "not found" errors.
from typing import Optional


def _not_found_message(entity: str, entity_id: Optional[str], entity_orcabus_id: Optional[str]) -> str:
    """
    Build the standard 'not found' message for a metadata entity.

    Prefers the human-readable id, then the OrcaBus ID, then a generic message.
    :param entity: Entity name, e.g. 'sample'.
    :param entity_id: Human-readable entity id, if known.
    :param entity_orcabus_id: OrcaBus ID of the entity, if known.
    :return: The formatted error message.
    """
    if entity_id is not None:
        return f"Could not find {entity} with id '{entity_id}'"
    if entity_orcabus_id is not None:
        return f"Could not find {entity} with OrcaBus ID '{entity_orcabus_id}'"
    return f"Could not find {entity}"


class SampleNotFoundError(Exception):
    """Raised when a sample cannot be resolved by id or OrcaBus ID."""

    def __init__(
            self,
            sample_id: Optional[str] = None,
            sample_orcabus_id: Optional[str] = None
    ):
        self.sample_id = sample_id
        self.sample_orcabus_id = sample_orcabus_id
        self.message = _not_found_message("sample", sample_id, sample_orcabus_id)
        super().__init__(self.message)


class SubjectNotFoundError(Exception):
    """Raised when a subject cannot be resolved by id or OrcaBus ID."""

    def __init__(
            self,
            subject_id: Optional[str] = None,
            subject_orcabus_id: Optional[str] = None
    ):
        self.subject_id = subject_id
        self.subject_orcabus_id = subject_orcabus_id
        self.message = _not_found_message("subject", subject_id, subject_orcabus_id)
        super().__init__(self.message)


class ProjectNotFoundError(Exception):
    """Raised when a project cannot be resolved by id or OrcaBus ID."""

    def __init__(
            self,
            project_id: Optional[str] = None,
            project_orcabus_id: Optional[str] = None
    ):
        self.project_id = project_id
        self.project_orcabus_id = project_orcabus_id
        self.message = _not_found_message("project", project_id, project_orcabus_id)
        super().__init__(self.message)


class IndividualNotFoundError(Exception):
    """Raised when an individual cannot be resolved by id or OrcaBus ID."""

    def __init__(
            self,
            individual_id: Optional[str] = None,
            individual_orcabus_id: Optional[str] = None
    ):
        self.individual_id = individual_id
        self.individual_orcabus_id = individual_orcabus_id
        self.message = _not_found_message("individual", individual_id, individual_orcabus_id)
        super().__init__(self.message)


class LibraryNotFoundError(Exception):
    """Raised when a library cannot be resolved by id or OrcaBus ID."""

    def __init__(
            self,
            library_id: Optional[str] = None,
            library_orcabus_id: Optional[str] = None
    ):
        self.library_id = library_id
        self.library_orcabus_id = library_orcabus_id
        self.message = _not_found_message("library", library_id, library_orcabus_id)
        super().__init__(self.message)


class ContactNotFoundError(Exception):
    """Raised when a contact cannot be resolved by id or OrcaBus ID."""

    def __init__(
            self,
            contact_id: Optional[str] = None,
            contact_orcabus_id: Optional[str] = None
    ):
        self.contact_id = contact_id
        self.contact_orcabus_id = contact_orcabus_id
        self.message = _not_found_message("contact", contact_id, contact_orcabus_id)
        super().__init__(self.message)
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

from enum import Enum
import re

# AWS PARAMETERS
METADATA_SUBDOMAIN_NAME = "metadata"
@@ -11,4 +11,6 @@
SUBJECT_ENDPOINT = "api/v1/subject"
PROJECT_ENDPOINT = "api/v1/project"
INDIVIDUAL_ENDPOINT = "api/v1/individual"
CONTACT_ENDPOINT = "api/v1/contact"
CONTACT_ENDPOINT = "api/v1/contact"

ORCABUS_ULID_REGEX_MATCH = re.compile(r'^(?:[a-z0-9]{3}\.)?[A-Z0-9]{26}$')
Original file line number Diff line number Diff line change
@@ -4,25 +4,20 @@
This module contains helper functions for the individual class.
"""

#!/usr/bin/env python3


# !/usr/bin/env python3


"""
Helper functions for a subject
"""

# Standard imports
from typing import Dict
from typing import List
from requests import HTTPError
from functools import reduce
from operator import concat

# Local imports
from .globals import INDIVIDUAL_ENDPOINT
from .models import Individual
from .. import list_libraries_in_subject, IndividualNotFoundError
from .globals import INDIVIDUAL_ENDPOINT, ORCABUS_ULID_REGEX_MATCH
from .requests_helpers import get_request_response_results


def get_individual_from_individual_id(individual_id: str) -> Dict:
def get_individual_from_individual_id(individual_id: str) -> Individual:
"""
Get individual from the individual id
:param individual_id:
@@ -34,10 +29,21 @@ def get_individual_from_individual_id(individual_id: str) -> Dict:
}

# Get individual
return get_request_response_results(INDIVIDUAL_ENDPOINT, params)[0]
try:
query_results = get_request_response_results(INDIVIDUAL_ENDPOINT, params)
assert len(query_results) == 1
return query_results[0]
except (HTTPError, AssertionError):
raise IndividualNotFoundError(
individual_id=individual_id
)


def get_individual_orcabus_id_from_individual_id(individual_id: str) -> str:
    """
    Map a human-readable individual id to its OrcaBus ID.
    :param individual_id:
    :return: The 'orcabusId' field of the matching individual record.
    """
    return get_individual_from_individual_id(individual_id)["orcabusId"]


def get_individual_from_individual_orcabus_id(individual_orcabus_id: str) -> Dict:
def get_individual_from_individual_orcabus_id(individual_orcabus_id: str) -> Individual:
"""
Get individual from the individual id
:param individual_orcabus_id:
@@ -49,7 +55,21 @@ def get_individual_from_individual_orcabus_id(individual_orcabus_id: str) -> Dic
}

# Get individual
return get_request_response_results(INDIVIDUAL_ENDPOINT, params)[0]
try:
query_results = get_request_response_results(INDIVIDUAL_ENDPOINT, params)
assert len(query_results) == 1
return query_results[0]
except (HTTPError, AssertionError):
raise IndividualNotFoundError(
individual_orcabus_id=individual_orcabus_id
)


def coerce_individual_id_or_orcabus_id_to_individual_orcabus_id(id_: str) -> str:
    """
    Accept either a human-readable individual id or an OrcaBus ID and
    return the OrcaBus ID.
    :param id_: Individual id or OrcaBus ID.
    :return: The individual's OrcaBus ID.
    """
    # Already an OrcaBus ULID - pass straight through
    if ORCABUS_ULID_REGEX_MATCH.match(id_) is not None:
        return id_
    # Otherwise resolve the human-readable id via the API
    return get_individual_orcabus_id_from_individual_id(id_)


def get_all_individuals():
@@ -58,3 +78,23 @@ def get_all_individuals():
:return:
"""
return get_request_response_results(INDIVIDUAL_ENDPOINT)


def list_libraries_in_individual(individual_orcabus_id: str) -> List[Individual]:
    """
    Given an individual orcabus id, return all the libraries associated with
    the individual.

    First collects all subjects associated with the individual, then collects
    all libraries associated with each of those subjects and flattens the
    result into a single list.

    NOTE(review): the return annotation says List[Individual] but the elements
    come from list_libraries_in_subject, so they look like library records -
    confirm and retype with the library model.

    :param individual_orcabus_id:
    :return: Flattened list of library records across all the individual's subjects.
    """
    # Supply an initial value to reduce() so an individual with an empty
    # subjectSet yields [] instead of raising
    # "TypeError: reduce() of empty iterable with no initial value".
    return reduce(
        concat,
        [
            # For each subject, get the libraries in that subject
            list_libraries_in_subject(subject_iter_['orcabusId'])
            for subject_iter_ in get_individual_from_individual_orcabus_id(individual_orcabus_id)["subjectSet"]
        ],
        []
    )