Feature: S3 Event Processing L3 Construct with Typed Handlers #358

@hoegertn

Summary

Add an opinionated L3 construct for S3 event-triggered Lambda processing that follows the cdk-serverless code generation approach. Users define bucket event configurations and processing schemas in a definition file and get typed handlers with pre-configured S3 clients, configurable processing modes (direct, buffered, EventBridge), DLQ handling, and concurrency control — all generated automatically.

Problem

S3 → Lambda is one of the most common serverless patterns (file uploads, ETL pipelines, media processing, document ingestion), but setting it up properly requires dealing with:

  • S3 event notification configuration (event types, prefix/suffix filters)
  • Choosing between S3 native notifications, SQS buffering, or EventBridge for S3
  • IAM permissions for Lambda to read objects, manage DLQ, etc.
  • Concurrency management to avoid throttling downstream services
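  • DLQ/retry for failed processing (direct S3 → Lambda invocations are asynchronous, so without a DLQ or on-failure destination an event is lost once Lambda's default two retries are exhausted)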
  • The raw S3Event type is cumbersome — extracting bucket, key, size requires navigating nested records
  • No pre-configured S3 client for reading the triggering object

Each of these decisions is well-understood and has a clear best practice, but developers still make mistakes (e.g., forgetting to URL-decode the object key, not buffering through SQS for high-throughput buckets, missing DLQ on the event source).
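
As an illustration of the key-decoding pitfall mentioned above, here is a minimal sketch of the decoding step. S3 event notifications deliver the object key URL-encoded, with spaces represented as `+`. The helper name `decodeS3Key` is hypothetical and not part of any existing API.

```typescript
// Hypothetical helper: turns the URL-encoded key from an S3 event
// notification back into the real object key.
function decodeS3Key(rawKey: string): string {
  // Spaces arrive as '+'; a literal plus sign arrives as '%2B',
  // so replacing '+' before decoding is safe.
  return decodeURIComponent(rawKey.replace(/\+/g, ' '));
}

// Example: an object uploaded as "uploads/annual report (final).pdf"
const decoded = decodeS3Key('uploads/annual+report+%28final%29.pdf');
console.log(decoded);
```

Passing the raw key straight to `GetObject` would fail with a missing-key error for any object whose name contains spaces or special characters, which is why the proposal decodes it once in the typed record.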

Proposed Solution

Definition File

# s3-processors/documents.yaml
processorName: DocumentIngestion
processors:
  IngestUpload:
    events: [s3:ObjectCreated:Put, s3:ObjectCreated:CompleteMultipartUpload]
    prefix: uploads/
    suffix: .pdf
    mode: buffered  # S3 → SQS → Lambda (recommended for production)
    config:
      batchSize: 5
      maxConcurrency: 10
      lambdaTimeout: 300  # seconds — PDF processing can be slow

  ProcessThumbnail:
    events: [s3:ObjectCreated:Put]
    prefix: images/
    suffix: .jpg
    mode: direct  # S3 → Lambda (fine for low-throughput)
    config:
      lambdaTimeout: 30
      memorySize: 1024  # image processing needs more memory

  AuditDeletion:
    events: [s3:ObjectRemoved:Delete]
    prefix: ""  # all objects
    mode: evented  # S3 → EventBridge → Rule → Lambda
    config:
      lambdaTimeout: 10
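
To make the shape of the definition file concrete, the following sketch models it as TypeScript types with a small validation pass. The field names mirror the YAML above. The type names, the `validateDefinition` function, and the specific checks are assumptions for illustration, not the generator's actual schema.

```typescript
type ProcessingMode = 'direct' | 'buffered' | 'evented';

interface ProcessorConfig {
  batchSize?: number;      // only meaningful in buffered mode
  maxConcurrency?: number;
  lambdaTimeout?: number;  // seconds
  memorySize?: number;     // MB
}

interface ProcessorDefinition {
  events: string[];        // e.g. 's3:ObjectCreated:Put'
  prefix?: string;
  suffix?: string;
  mode?: ProcessingMode;   // the proposal defaults this to 'buffered'
  config?: ProcessorConfig;
}

interface ProcessorFile {
  processorName: string;
  processors: Record<string, ProcessorDefinition>;
}

// Hypothetical validation: returns human-readable problems, empty if none.
function validateDefinition(file: ProcessorFile): string[] {
  const errors: string[] = [];
  for (const [name, p] of Object.entries(file.processors)) {
    if (p.events.length === 0) {
      errors.push(`${name}: at least one event type is required`);
    }
    for (const e of p.events) {
      if (!e.startsWith('s3:')) {
        errors.push(`${name}: "${e}" is not an S3 event type`);
      }
    }
    const mode = p.mode ?? 'buffered';
    if (p.config?.batchSize !== undefined && mode !== 'buffered') {
      errors.push(`${name}: batchSize only applies to buffered mode`);
    }
  }
  return errors;
}
```

Running such a check during projen synthesis would surface mistakes in the definition file before any code is generated.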

Processing Modes

| Mode | Flow | Best For |
| --- | --- | --- |
| direct | S3 → Lambda | Low-throughput, simple processing, dev/staging |
| buffered | S3 → SQS → Lambda | Production workloads, high throughput, needs DLQ and retry |
| evented | S3 → EventBridge → Lambda | When events need to be routed to multiple targets or integrated with an existing EventBridge bus |

Projen Integration

import { S3EventProcessor } from 'cdk-serverless/projen';

new S3EventProcessor(project, {
  processorName: 'DocumentIngestion',
  definitionFile: 's3-processors/documents.yaml',
});

Running projen generates:

  • Typed handler signatures per processor (e.g. IngestUploadHandler, ProcessThumbnailHandler)
  • A typed S3Record interface with convenient accessors (decoded key, content type, size, etag)
  • A pre-configured handler context with S3 client helpers
  • The L3 CDK construct
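
The generated types could look roughly like the sketch below, which maps one raw S3 notification record onto a flat `S3Record`. All names here (`RawS3EventRecord`, `S3Record`, `toS3Record`, `RecordHandler`) are assumptions for illustration. The raw notification carries key, size, and ETag but no content type, so this sketch leaves `contentType` out; populating it would presumably need a `HeadObject` call.

```typescript
// Subset of the raw S3 notification record that the mapping needs.
interface RawS3EventRecord {
  eventName: string; // e.g. 'ObjectCreated:Put'
  s3: {
    bucket: { name: string };
    object: { key: string; size?: number; eTag?: string };
  };
}

// Flat, typed view handed to user code (hypothetical shape).
interface S3Record {
  eventName: string;
  bucket: string;
  key: string;   // already URL-decoded
  size?: number; // absent on ObjectRemoved events
  eTag?: string;
}

function toS3Record(raw: RawS3EventRecord): S3Record {
  return {
    eventName: raw.eventName,
    bucket: raw.s3.bucket.name,
    // Keys arrive URL-encoded with '+' standing in for spaces.
    key: decodeURIComponent(raw.s3.object.key.replace(/\+/g, ' ')),
    size: raw.s3.object.size,
    eTag: raw.s3.object.eTag,
  };
}

// Per-processor handler types could then be aliases of a generic signature.
type RecordHandler<Ctx> = (record: S3Record, ctx: Ctx) => Promise<void>;
```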

CDK Construct Usage

import { Duration } from 'aws-cdk-lib';
import { DocumentIngestionS3Processor } from './generated/s3.documentingestion.generated';

const processor = new DocumentIngestionS3Processor(this, 'Processor', {
  // Create a new bucket or reference an existing one
  bucketName: `customer-documents-${props.stageName}`,
  // Or: existingBucket: myBucket,
  
  singleTableDatastore, // optional: write processing metadata
  eventBus: orderEventsBus, // optional: emit events after processing
  
  // Bucket-level settings
  versioning: true,
  lifecycleRules: [
    { prefix: 'uploads/', expiration: Duration.days(90) },
  ],
});

// Access the bucket for granting permissions to other constructs
const bucket = processor.bucket;

The construct automatically:

  • Creates or references the S3 bucket with encryption (SSE-S3 or KMS)
  • Enables EventBridge notifications on the bucket (if any processor uses evented mode)
  • Creates Lambda functions for each processor with appropriate timeout and memory
  • direct mode: Configures S3 event notifications → Lambda with proper permissions
  • buffered mode: Creates SQS queue (with DLQ), configures S3 → SQS notification, sets up Lambda event source mapping with ReportBatchItemFailures, calculates visibility timeout from Lambda timeout
  • evented mode: Creates EventBridge rule matching the S3 event pattern (bucket, prefix, suffix, event type) → Lambda target with DLQ
  • Grants s3:GetObject (which also authorizes HeadObject requests; there is no separate s3:HeadObject IAM action) on the relevant prefix to each handler's role
  • Sets up CloudWatch alarms: processing errors, DLQ depth (buffered mode), duration P99 approaching timeout
  • Integrates with the existing monitoring infrastructure
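
For the evented mode in the list above, one way the construct might translate a processor definition into an EventBridge event pattern is sketched here. EventBridge delivers S3 events with detail types such as "Object Created" and "Object Deleted" rather than the `s3:ObjectCreated:*` notification names, so a mapping step is needed. The function name `buildEventPattern` and the choice of a wildcard filter to combine prefix and suffix are assumptions, not the proposal's confirmed design.

```typescript
type KeyFilter = { prefix: string } | { suffix: string } | { wildcard: string };

interface EventPattern {
  source: string[];
  'detail-type': string[];
  detail: {
    bucket: { name: string[] };
    object?: { key: KeyFilter[] };
  };
}

// Several filters in one array are OR-ed by EventBridge, so prefix AND suffix
// on the same field are expressed as a single wildcard filter here.
// Assumes prefix and suffix contain no literal '*' characters.
function keyFilter(prefix: string, suffix: string): KeyFilter | undefined {
  if (prefix && suffix) return { wildcard: `${prefix}*${suffix}` };
  if (prefix) return { prefix };
  if (suffix) return { suffix };
  return undefined; // no filter: match every key in the bucket
}

function buildEventPattern(
  bucketName: string,
  events: string[],
  prefix = '',
  suffix = '',
): EventPattern {
  const detailTypes = new Set<string>();
  for (const e of events) {
    if (e.startsWith('s3:ObjectCreated:')) detailTypes.add('Object Created');
    else if (e.startsWith('s3:ObjectRemoved:')) detailTypes.add('Object Deleted');
    else throw new Error(`Unsupported event type for evented mode: ${e}`);
  }
  const detail: EventPattern['detail'] = { bucket: { name: [bucketName] } };
  const filter = keyFilter(prefix, suffix);
  if (filter) detail.object = { key: [filter] };
  return { source: ['aws.s3'], 'detail-type': Array.from(detailTypes), detail };
}
```

This sketch collapses `Put` and `CompleteMultipartUpload` into the single "Object Created" detail type. Telling them apart would require an additional match on the event's `detail.reason` field.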

Handler DX

// Typed handler — per-record processing with convenient context
export const handler: IngestUploadHandler = async (record, ctx) => {
  // record is typed with convenient accessors
  const { bucket, key, size, eTag, contentType } = record;
  // key is already URL-decoded — no need to handle %20 etc.
  
  // Pre-configured S3 client — reads from the triggering bucket
  const body = await ctx.s3.getObjectBody(bucket, key);
  const metadata = await ctx.s3.headObject(bucket, key);
  
  // Process the document...
  const extractedText = await processPdf(body);
  
  // Write metadata to SingleTableDatastore (if configured)
  await ctx.datastore.put({
    PK: `DOC#${key}`,
    SK: 'METADATA',
    status: 'processed',
    size,
    extractedTextLength: extractedText.length,
    processedAt: new Date().toISOString(),
  });
  
  // Emit event via EventBus (if configured)
  await ctx.events.emit('DocumentProcessed', {
    key,
    size,
    contentType,
    extractedTextLength: extractedText.length,
  });
};

Handler DX — Batch Processing (buffered mode)

// For buffered mode — process a batch of S3 records
export const handler: IngestUploadBatchHandler = async (records, ctx) => {
  const results = await Promise.allSettled(
    records.map(async (record) => {
      const body = await ctx.s3.getObjectBody(record.bucket, record.key);
      return processPdf(body);
    })
  );
  
  // Framework builds SQS batchItemFailures response
  return ctx.reportFailures(results);
};
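
A possible shape for the failure-reporting helper is sketched below. With ReportBatchItemFailures enabled, Lambda expects a response listing the SQS message IDs that should be retried. The standalone signature (taking `messageIds` explicitly) is an assumption: the proposed `ctx.reportFailures(results)` would presumably track the record-to-message mapping internally.

```typescript
// Minimal structural type so the sketch does not depend on a specific lib target.
interface Settled {
  status: 'fulfilled' | 'rejected';
}

interface SqsBatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// messageIds[i] is the SQS message that carried the record behind results[i].
function reportFailures(messageIds: string[], results: Settled[]): SqsBatchResponse {
  const failed = new Set<string>();
  results.forEach((result, i) => {
    const id = messageIds[i];
    if (result.status === 'rejected' && id !== undefined) {
      failed.add(id); // de-duplicates when one message carried several records
    }
  });
  return {
    batchItemFailures: Array.from(failed).map((id) => ({ itemIdentifier: id })),
  };
}
```

Only the listed messages become visible again on the queue; the rest of the batch is deleted, so one bad file does not force reprocessing of its neighbours.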

Opinionated Defaults

| Setting | Default | Rationale |
| --- | --- | --- |
| Processing Mode | buffered | Production-safe default with DLQ and retry |
| SQS Visibility Timeout | 6 × Lambda timeout | AWS recommendation |
| DLQ maxReceiveCount | 3 | Up to 3 delivery attempts before dead-lettering |
| Batch Size (buffered) | 5 | Balance throughput and per-record processing time |
| Lambda Timeout | 60s | Generous default for file processing |
| Lambda Memory | 256 MB | Sufficient for most text processing; override for images/video |
| Bucket Encryption | SSE-S3 | Encryption at rest, zero cost |
| Object Key Decoding | automatic | URL-decode keys in the typed record |

All defaults are overridable via the definition file or construct props.
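
The way overrides might be merged with these defaults, including the derived visibility timeout, can be sketched as follows. The names `ResolvedConfig`, `DEFAULTS`, and `resolveConfig` are hypothetical. The numbers come from the defaults listed above.

```typescript
interface ResolvedConfig {
  mode: 'direct' | 'buffered' | 'evented';
  batchSize: number;
  lambdaTimeout: number;     // seconds
  memorySize: number;        // MB
  maxReceiveCount: number;
  visibilityTimeout: number; // seconds, derived rather than configured
}

type Overrides = Partial<Omit<ResolvedConfig, 'visibilityTimeout'>>;

const DEFAULTS: Omit<ResolvedConfig, 'visibilityTimeout'> = {
  mode: 'buffered',
  batchSize: 5,
  lambdaTimeout: 60,
  memorySize: 256,
  maxReceiveCount: 3,
};

function resolveConfig(overrides: Overrides = {}): ResolvedConfig {
  const merged = { ...DEFAULTS, ...overrides };
  return {
    ...merged,
    // Six times the function timeout, so a message does not reappear on the
    // queue while a slow invocation is still working on it.
    visibilityTimeout: 6 * merged.lambdaTimeout,
  };
}

console.log(resolveConfig().visibilityTimeout);                       // 360
console.log(resolveConfig({ lambdaTimeout: 300 }).visibilityTimeout); // 1800
```

For the IngestUpload processor in the definition file (`lambdaTimeout: 300`), this yields a 30-minute visibility timeout. If a batching window were configured on the event source, AWS guidance is to add it on top of this value.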

Integration Points

  • EventBus construct: Emit events after processing (e.g. DocumentProcessed, ThumbnailGenerated)
  • QueueProcessor construct: The buffered mode internally uses the same SQS patterns, consistent DX
  • SingleTableDatastore: Write processing metadata, track file status
  • Authentication: Scope bucket access per Cognito identity (e.g. uploads/{identity_id}/)
  • RestApi: Pre-signed URL generation for uploads, processing status queries

Out of Scope

  • S3 static website hosting / CloudFront distribution → different pattern entirely
  • S3 Batch Operations → different service, job-based processing
  • S3 Object Lambda → specialized access point use case
  • Multi-bucket processing pipelines → users compose individual processor constructs
  • S3 Replication → infrastructure concern, not application logic
