Library and Samplesheet Linking for Existing Sequence Runs #889

Merged · 7 commits · Mar 19, 2025
@@ -1,6 +1,6 @@
import path from 'path';
import * as cdk from 'aws-cdk-lib';
import { aws_lambda, aws_secretsmanager, Duration, Stack } from 'aws-cdk-lib';
import { aws_lambda, aws_secretsmanager, Duration, Stack, RemovalPolicy } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { ISecurityGroup, IVpc, SecurityGroup, Vpc, VpcLookupOptions } from 'aws-cdk-lib/aws-ec2';
import { EventBus, EventField, IEventBus, Rule, RuleTargetInput } from 'aws-cdk-lib/aws-events';
@@ -24,6 +24,7 @@ import {
import { ApiGatewayConstruct, ApiGatewayConstructProps } from '../../../../components/api-gateway';
import { Architecture, IFunction } from 'aws-cdk-lib/aws-lambda';
import { PostgresManagerStack } from '../../../../stateful/stacks/postgres-manager/deploy/stack';
import { Bucket } from 'aws-cdk-lib/aws-s3';

export interface SequenceRunManagerStackProps {
lambdaSecurityGroupName: string;
@@ -105,6 +106,7 @@ export class SequenceRunManagerStack extends Stack {
this.createApiHandlerAndIntegration(props);
this.createProcSqsHandler();
this.createSlackNotificationHandler(props.slackTopicName, props.orcabusUIBaseUrl);
this.createLibraryLinkingHandler();
}

private createPythonFunction(name: string, props: object): PythonFunction {
@@ -316,4 +318,51 @@ export class SequenceRunManagerStack extends Stack {
})
);
}

private createLibraryLinkingHandler() {
// create an S3 bucket to store the temporary library linking data
const srmTempLinkingDataBucket = new Bucket(this, 'SrmTempLinkingDataBucket', {
bucketName: 'orcabus-temp-srm-linking-data-' + this.account + '-ap-southeast-2',
removalPolicy: RemovalPolicy.DESTROY,
enforceSSL: true,
});

// lambda function to check and create the library linking
const libraryLinkingFn = this.createPythonFunction('LibraryLinking', {
index: 'sequence_run_manager_proc/lambdas/check_and_create_library_linking.py',
handler: 'handler',
timeout: Duration.minutes(15),
environment: {
LINKING_DATA_BUCKET_NAME: srmTempLinkingDataBucket.bucketName,
...this.lambdaEnv,
},
});

// lambda function to check and create the samplesheet
const samplesheetFn = this.createPythonFunction('Samplesheet', {
index: 'sequence_run_manager_proc/lambdas/check_and_create_samplesheet.py',
handler: 'handler',
timeout: Duration.minutes(15),
environment: {
LINKING_DATA_BUCKET_NAME: srmTempLinkingDataBucket.bucketName,
...this.lambdaEnv,
},
});

// grant both lambda functions read/write access to the library linking bucket
srmTempLinkingDataBucket.grantReadWrite(libraryLinkingFn);
srmTempLinkingDataBucket.grantReadWrite(samplesheetFn);
libraryLinkingFn.addToRolePolicy(
new PolicyStatement({
actions: ['s3:GetObject', 's3:PutObject', 's3:DeleteObject'],
resources: [srmTempLinkingDataBucket.arnForObjects('*')],
})
);
samplesheetFn.addToRolePolicy(
new PolicyStatement({
actions: ['s3:GetObject', 's3:PutObject', 's3:DeleteObject'],
resources: [srmTempLinkingDataBucket.arnForObjects('*')],
})
);
}
}
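For reference, a minimal sketch of how these pieces might be exercised once deployed: upload a linking JSON file to the temporary bucket, then invoke the library-linking Lambda with the object key, matching the `{"key": ...}` event shape documented in the handler below. The account id and `FunctionName` are placeholders; the physical function name assigned by `createPythonFunction` is not shown in this diff.

```python
import json
import boto3

ACCOUNT_ID = "123456789012"  # placeholder AWS account id
BUCKET = f"orcabus-temp-srm-linking-data-{ACCOUNT_ID}-ap-southeast-2"
KEY = "instrument_library_linkings_1.json"

s3 = boto3.client("s3")
lambda_client = boto3.client("lambda")

# Upload the linking data file to the temporary bucket created by this stack
s3.upload_file("instrument_library_linkings_1.json", BUCKET, KEY)

# Invoke the library-linking handler with the event shape it expects: {"key": "<object key>"}
response = lambda_client.invoke(
    FunctionName="srm-library-linking",  # placeholder; actual name depends on createPythonFunction
    Payload=json.dumps({"key": KEY}).encode("utf-8"),
)
print(json.loads(response["Payload"].read()))
```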
@@ -16,9 +16,9 @@

router = OptionalSlashDefaultRouter()
router.register(r"sequence", SequenceViewSet, basename="sequence")
router.register(r"sequence", SampleSheetViewSet, basename="sequence-sample-sheet")
router.register("sequence/(?P<orcabus_id>[^/]+)/comment", CommentViewSet, basename="sequence-comment")
router.register("sequence/(?P<orcabus_id>[^/]+)/state", StateViewSet, basename="sequence-states")
router.register("sequence/(?P<orcabus_id>[^/]+)/sample_sheet", SampleSheetViewSet, basename="sequence-sample-sheet")
router.register(r"sequence/stats", SequenceStatsViewSet, basename="sequence-stats")


@@ -1,20 +1,41 @@
from sequence_run_manager.models import SampleSheet
from sequence_run_manager.models import SampleSheet, Sequence
from sequence_run_manager.serializers.sample_sheet import SampleSheetSerializer
from rest_framework.viewsets import GenericViewSet
from rest_framework.viewsets import ViewSet
from rest_framework import mixins
from django.shortcuts import get_object_or_404
from rest_framework.response import Response
from rest_framework import status
from rest_framework.decorators import action
from drf_spectacular.utils import extend_schema, OpenApiResponse

import logging
logger = logging.getLogger(__name__)

class SampleSheetViewSet(mixins.ListModelMixin, GenericViewSet):
class SampleSheetViewSet(ViewSet):
"""
ViewSet for sample sheet
"""
serializer_class = SampleSheetSerializer
search_fields = SampleSheet.get_base_fields()
queryset = Sequence.objects.all()
pagination_class = None
lookup_value_regex = "[^/]+" # to allow id prefix

def get_queryset(self):
return SampleSheet.objects.filter(sequence_id=self.kwargs["orcabus_id"], association_status='active')
@extend_schema(
responses={
200: SampleSheetSerializer,
404: OpenApiResponse(description="No active sample sheet found for this sequence.")
},
operation_id="get_sequence_sample_sheet"
)
@action(detail=True, methods=["get"], url_name="sample_sheet", url_path="sample_sheet")
def sample_sheet(self, request, *args, **kwargs):
"""
Returns a queryset containing a single SampleSheet record or an empty queryset.
"""

sample_sheet = SampleSheet.objects.filter(sequence_id = kwargs.get('pk'), association_status='active').first()
if sample_sheet:
return Response(SampleSheetSerializer(sample_sheet).data, status=status.HTTP_200_OK)
else:
return Response(status=status.HTTP_404_NOT_FOUND)


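For reference, a minimal sketch of calling the new endpoint, assuming the registered prefix `sequence/(?P<orcabus_id>[^/]+)/sample_sheet` resolves to a route of the form `/sequence/{orcabus_id}/sample_sheet` and that the API uses bearer-token auth; the base URL, token, and id below are placeholders.

```python
import requests

BASE_URL = "https://sequence.example.org"  # placeholder base URL
TOKEN = "replace-me"  # placeholder bearer token
orcabus_id = "seq.01ABCDEFGHIJKLMNOPQRSTUVWX"  # placeholder sequence orcabus id

resp = requests.get(
    f"{BASE_URL}/sequence/{orcabus_id}/sample_sheet",
    headers={"Authorization": f"Bearer {TOKEN}"},
)

if resp.status_code == 200:
    sample_sheet = resp.json()  # payload shaped by SampleSheetSerializer
elif resp.status_code == 404:
    sample_sheet = None  # no active sample sheet for this sequence
```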
@@ -9,6 +9,7 @@
class SequenceViewSet(BaseViewSet):
serializer_class = SequenceSerializer
search_fields = Sequence.get_base_fields()
queryset = Sequence.objects.all()

def get_queryset(self):
"""Pick up the start_time and end_time from the query params and exclude them from the rest of the query params"""
sequence_run_manager_proc/lambdas/check_and_create_library_linking.py (new file)
@@ -0,0 +1,201 @@
import os
import logging

import django

django.setup()

from typing import Dict, List
import time
import json
import boto3
from django.utils import timezone
from django.db import transaction
from sequence_run_manager.models.sequence import Sequence, LibraryAssociation

# Configure logging for AWS Lambda
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Constants - Optimized for files with <200 records
BATCH_SIZE = 50 # Keep bulk create batch size for database operations
ASSOCIATION_STATUS = "ACTIVE"

class LibraryLinkingProcessor:
def __init__(self):
self.s3_client = boto3.client('s3')
self.start_time = time.time()
self.stats = {
"processed_count": 0,
"error_count": 0,
"success_count": 0,
"skipped_count": 0,
"total_associations_created": 0
}
self.sequence_cache = {}

def prefetch_sequences(self, instrument_ids: List[str]):
"""Prefetch sequences for all instrument IDs"""
sequences = Sequence.objects.filter(instrument_run_id__in=instrument_ids)
self.sequence_cache = {seq.instrument_run_id: seq for seq in sequences}
logger.info(f"Prefetched {len(self.sequence_cache)} sequences")

def check_existing_associations(self, sequence_ids: List[str]) -> set:
"""Get sequences that already have associations"""
existing = LibraryAssociation.objects.filter(
sequence_id__in=sequence_ids
).values_list('sequence_id', flat=True)
return set(existing)

def process_batch(self, records: List[Dict], batch_index: int) -> List[Dict]:
"""Process a batch of records efficiently"""
results = []

# Prefetch sequences for all records at once
instrument_ids = [record['instrument_id'] for record in records]
self.prefetch_sequences(instrument_ids)

# Check existing associations in one query
sequence_ids = [seq.orcabus_id for seq in self.sequence_cache.values()]
existing_associations = self.check_existing_associations(sequence_ids)

# Process records
associations_to_create = []

for record in records:
instrument_id = record['instrument_id']
result = {
"instrument_id": instrument_id,
"batch_index": batch_index
}

try:
sequence = self.sequence_cache.get(instrument_id)
if not sequence:
result.update({
"status": "error",
"error": "Sequence not found"
})
self.stats["error_count"] += 1
results.append(result)
continue

if sequence.orcabus_id in existing_associations:
result.update({
"status": "skipped",
"reason": "existing_associations"
})
self.stats["skipped_count"] += 1
results.append(result)
continue

# Collect all associations for bulk create
associations_to_create.extend([
LibraryAssociation(
library_id=library_id,
sequence=sequence,
association_date=timezone.now(),
status=ASSOCIATION_STATUS
) for library_id in record['library_ids']
])

result.update({
"status": "pending",
"libraries_count": len(record['library_ids'])
})
results.append(result)

except Exception as e:
logger.error(f"Error processing {instrument_id}: {str(e)}")
result.update({
"status": "error",
"error": str(e)
})
self.stats["error_count"] += 1
results.append(result)

# Bulk create all associations in one transaction
try:
with transaction.atomic():
created = LibraryAssociation.objects.bulk_create(
associations_to_create,
batch_size=BATCH_SIZE # Keep batch size for database efficiency
)

self.stats["total_associations_created"] += len(created)
self.stats["success_count"] += sum(
1 for r in results if r["status"] == "pending"
)

# Update results status
for result in results:
if result["status"] == "pending":
result["status"] = "success"

except Exception as e:
logger.error(f"Bulk creation error in batch {batch_index}: {str(e)}")
for result in results:
if result["status"] == "pending":
result["status"] = "error"
result["error"] = "Bulk creation failed"
self.stats["error_count"] += 1

self.stats["processed_count"] += len(records)
return results

def process_file(self, data: List[Dict]) -> List[Dict]:
"""Process all records in the file"""
logger.info(f"Starting to process {len(data)} records")

# Process all records in one batch since file is small
return self.process_batch(data, 0)

def handler(event, context):
"""
Lambda handler for processing library linking file

Expected event structure:
{
"key": "instrument_library_linkings_1.json"
}
"""
start_time = time.time()
processor = LibraryLinkingProcessor()

assert os.environ.get('LINKING_DATA_BUCKET_NAME'), "LINKING_DATA_BUCKET_NAME is not set"  # .get() avoids a KeyError masking the assert message

try:
# Read data from S3
response = processor.s3_client.get_object(
Bucket=os.environ['LINKING_DATA_BUCKET_NAME'],
Key=event['key']
)
data = json.loads(response['Body'].read().decode('utf-8'))

# Process the file
results = processor.process_file(data)

# Prepare summary
execution_time = time.time() - start_time
summary = {
"file_processed": event['key'],
"total_records": len(data),
**processor.stats,
"total_associations": processor.stats["total_associations_created"],
"execution_time": execution_time,
"completed": processor.stats["processed_count"] == len(data),
"results": results
}

logger.info(f"Processing summary: {summary}")

return {
"statusCode": 200,
"body": json.dumps(summary)
}

except Exception as e:
logger.error(f"Fatal error processing file {event['key']}: {str(e)}")
raise
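For reference, a minimal sketch of the linking file this handler expects to find in S3, inferred from `process_batch`, which reads `instrument_id` and `library_ids` from every record; all identifiers below are placeholders.

```python
import json

# Example contents of instrument_library_linkings_1.json: a list of records,
# each mapping one instrument run to the library ids sequenced on it.
records = [
    {
        "instrument_id": "240101_A01052_0001_ABCDEFGHIJ",  # matched against Sequence.instrument_run_id
        "library_ids": ["L2400001", "L2400002"],           # one LibraryAssociation created per library id
    },
]

print(json.dumps(records, indent=2))
```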