Skip to content

Feature: SQS Queue Consumer L3 Construct with Typed Message Handlers #356

@hoegertn

Description

@hoegertn

Summary

Add an opinionated L3 construct for SQS → Lambda processing that follows the cdk-serverless code generation approach. Users define message schemas in a definition file and get typed handlers, DLQ configuration, partial batch failure handling, and sensible defaults — all generated automatically.

Problem

SQS-triggered Lambda is one of the most common serverless patterns for decoupling and buffering workloads, but setting it up properly involves a surprising amount of detail:

  • Calculating visibility timeout relative to Lambda timeout (the AWS docs recommend 6x)
  • Configuring a dead-letter queue with appropriate maxReceiveCount
  • Enabling ReportBatchItemFailures for partial batch failure handling
  • Setting batch size and batching window for throughput tuning
  • FIFO queue configuration (deduplication, message group ID)
  • IAM permissions between SQS, Lambda, and DLQ
  • No typed message payloads — handlers work with raw SQSEvent records

cdk-serverless users shouldn't have to think about any of this for the common case.

Proposed Solution

Definition File

# queues/order-processing.yaml
processorName: OrderProcessing
queues:
  ProcessOrders:
    schema:
      type: object
      properties:
        orderId:
          type: string
        customerId:
          type: string
        items:
          type: array
          items:
            type: object
            properties:
              productId:
                type: string
              quantity:
                type: integer
      required: [orderId, customerId, items]
    config:
      batchSize: 10
      maxBatchingWindow: 30  # seconds
      maxConcurrency: 5      # reserved concurrent executions
      fifo: false

  SendNotifications:
    schema:
      type: object
      properties:
        userId:
          type: string
        channel:
          type: string
          enum: [email, sms, push]
        templateId:
          type: string
        data:
          type: object
      required: [userId, channel, templateId]
    config:
      batchSize: 1           # process one notification at a time
      fifo: true
      deduplication: contentBased

Projen Integration

import { QueueProcessor } from 'cdk-serverless/projen';

new QueueProcessor(project, {
  processorName: 'OrderProcessing',
  definitionFile: 'queues/order-processing.yaml',
});

Running projen generates:

  • Typed message interfaces (e.g. ProcessOrdersMessage, SendNotificationsMessage)
  • Typed handler signatures for each queue (batch and per-record)
  • A typed QueueSender class for enqueuing messages
  • The L3 CDK construct

CDK Construct Usage

import { OrderProcessingQueueProcessor } from './generated/queue.orderprocessing.generated';

const processor = new OrderProcessingQueueProcessor(this, 'Processor', {
  singleTableDatastore, // optional
  additionalEnv: {
    NOTIFICATION_SERVICE_URL: props.notificationUrl,
  },
});

The construct automatically:

  • Creates the SQS queue (standard or FIFO based on config)
  • Creates a DLQ with maxReceiveCount: 3 (configurable)
  • Sets visibility timeout to 6 × Lambda timeout
  • Creates the Lambda function with the event source mapping
  • Enables ReportBatchItemFailures on the event source mapping
  • Configures maxConcurrency on the event source mapping if specified
  • Sets up CloudWatch alarms on DLQ depth (non-empty) and age of oldest message
  • Integrates with the existing monitoring infrastructure

Handler DX — Per-Record Processing

// Process one message at a time — framework handles batch iteration and partial failures
export const handler: ProcessOrdersRecordHandler = async (message) => {
  const { orderId, customerId, items } = message; // typed from schema
  
  // Process order...
  // If this throws, only this record is reported as failed (partial batch failure)
};

Handler DX — Full Batch Processing

// For advanced use cases where you need control over the full batch
export const handler: ProcessOrdersBatchHandler = async (messages, ctx) => {
  const results = await Promise.allSettled(
    messages.map(msg => processOrder(msg))
  );
  
  // Return failed message IDs — framework builds the batchItemFailures response
  return ctx.reportFailures(results);
};

Sending Messages

import { OrderProcessingSender } from './generated/queue.orderprocessing-sender.generated';

const sender = new OrderProcessingSender();

// Type-safe — schema enforces payload, auto-completion works
await sender.send('ProcessOrders', {
  orderId: '123',
  customerId: 'cust-456',
  items: [{ productId: 'prod-789', quantity: 2 }],
});

// FIFO queue — messageGroupId required
await sender.send('SendNotifications', {
  userId: 'user-123',
  channel: 'email',
  templateId: 'order-confirmation',
  data: { orderNumber: '123' },
}, { messageGroupId: 'user-123' });

Opinionated Defaults

Setting Default Rationale
Visibility Timeout 6 × Lambda timeout AWS recommendation
DLQ maxReceiveCount 3 Retry 3 times before dead-lettering
Batch Size 10 Lambda SQS default
Max Batching Window 0 (disabled) Low-latency by default
ReportBatchItemFailures enabled Always use partial batch failures
Reserved Concurrency none Opt-in to avoid over-throttling
Encryption SQS-managed SSE Encryption at rest by default

All defaults are overridable via the definition file or construct props.

Integration Points

  • EventBus construct: EventBridge rule → SQS queue for buffered event processing
  • SingleTableDatastore: Handlers get a pre-configured datastore client via context
  • SNS construct (if implemented): SNS → SQS subscription for fan-out with buffering
  • S3EventProcessor (if implemented): Uses SQS buffer mode internally
  • RestApi / GraphQlApi: API handlers use the generated sender to enqueue work

Out of Scope

  • SQS as an event source for Step Functions → Step Functions already exists in the toolkit
  • SQS → Lambda → SQS chaining patterns → users compose individual constructs
  • Amazon MQ / MSK consumer patterns → different services, different scope
  • FIFO exactly-once processing guarantees beyond what SQS natively provides

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions