Skip to content

Feature: Real-Time Event API L3 Construct (AppSync Events) #359

@hoegertn

Description

@hoegertn

Summary

Add an opinionated L3 construct for AWS AppSync Events API that follows the cdk-serverless code generation approach. Users define channel namespaces, payload schemas, and authorization rules in a definition file and get typed event handlers (onPublish, onSubscribe), typed server-side publishing utilities, and managed infrastructure — all generated automatically using AppSync's managed WebSocket infrastructure for connection lifecycle, fan-out, and scaling.

Problem

Real-time features (notifications, live updates, collaborative editing, dashboards) are increasingly expected in modern applications. AppSync Events API provides managed serverless WebSocket infrastructure with channel-based pub/sub, but wiring it up still requires:

  • Creating the AppSync Event API with proper authorization configuration
  • Defining channel namespaces and their auth rules
  • Writing onPublish and onSubscribe event handlers with untyped payloads
  • Building server-side publishing logic (HTTP POST to the AppSync Events endpoint)
  • Configuring multi-auth (e.g., Cognito for users, IAM for service-to-service)
  • Setting up custom domains
  • No type-safe channel payloads — everything is raw JSON

AppSync Events eliminates the need to manage connections, DynamoDB connection stores, or @connections POST-back logic that API Gateway WebSocket APIs require. The managed fan-out to subscribers, automatic connection lifecycle handling, and native channel semantics make it the right foundation for a cdk-serverless real-time construct.

Proposed Solution

Definition File

# realtime/live-updates.yaml
apiName: LiveUpdates
namespaces:
  orders:
    description: Real-time order status updates
    authorization:
      publish: iam       # only backend services can publish
      subscribe: cognito # authenticated users can subscribe
    channels:
      statusUpdate:
        publishSchema:
          type: object
          properties:
            orderId:
              type: string
            status:
              type: string
              enum: [pending, processing, shipped, delivered, cancelled]
            estimatedDelivery:
              type: string
              format: date-time
            carrier:
              type: string
          required: [orderId, status]
        subscribeSchema:
          # What the client receives (can differ from publish if onPublish transforms)
          type: object
          properties:
            orderId:
              type: string
            status:
              type: string
            estimatedDelivery:
              type: string
            carrier:
              type: string
            updatedAt:
              type: number
          required: [orderId, status, updatedAt]
        onPublish: true    # generate a handler to validate/transform
        onSubscribe: true  # generate a handler for authorization

  chat:
    description: Real-time chat messages
    authorization:
      publish: cognito
      subscribe: cognito
    channels:
      message:
        publishSchema:
          type: object
          properties:
            roomId:
              type: string
            text:
              type: string
            replyTo:
              type: string
          required: [roomId, text]
        subscribeSchema:
          type: object
          properties:
            roomId:
              type: string
            text:
              type: string
            replyTo:
              type: string
            userId:
              type: string
            displayName:
              type: string
            sentAt:
              type: number
          required: [roomId, text, userId, sentAt]
        onPublish: true
        onSubscribe: true

  system:
    description: System-wide broadcast notifications
    authorization:
      publish: iam
      subscribe: cognito
    channels:
      announcement:
        publishSchema:
          type: object
          properties:
            title:
              type: string
            message:
              type: string
            severity:
              type: string
              enum: [info, warning, critical]
          required: [title, message, severity]
        onPublish: false   # no transformation needed
        onSubscribe: false # all authenticated users can subscribe

Projen Integration

import { RealtimeApi } from 'cdk-serverless/projen';

new RealtimeApi(project, {
  apiName: 'LiveUpdates',
  definitionFile: 'realtime/live-updates.yaml',
});

Running projen generates:

  • Typed payload interfaces per channel (e.g. OrderStatusUpdatePublishPayload, ChatMessageSubscribePayload)
  • Typed onPublish handler signatures (e.g. OrderStatusUpdatePublishHandler)
  • Typed onSubscribe handler signatures (e.g. ChatMessageSubscribeHandler)
  • A typed EventPublisher class for server-side publishing with per-channel methods
  • The L3 CDK construct

CDK Construct Usage

import { LiveUpdatesRealtimeApi } from './generated/realtime.liveupdates.generated';

const realtimeApi = new LiveUpdatesRealtimeApi(this, 'RealtimeApi', {
  stageName: props.stageName,
  domainName: props.domainName,
  realtimeHostname: 'rt', // rt.example.com
  authentication, // Cognito user pool integration
  additionalEnv: {
    DOMAIN_NAME: props.domainName,
  },
});

// Access the API endpoint for wiring into other constructs
const httpEndpoint = realtimeApi.httpEndpoint;
const realtimeEndpoint = realtimeApi.realtimeEndpoint;

The construct automatically:

  • Creates the AppSync Event API with channel namespace configuration
  • Configures authorization modes (Cognito, IAM, API Key, OIDC — multi-auth per namespace)
  • Creates Lambda functions for onPublish and onSubscribe handlers where defined
  • Connects event handlers to the appropriate channel namespaces
  • Configures custom domain (CNAME + ACM certificate) consistent with RestApi/GraphQlApi pattern
  • Sets up CloudWatch logging with configurable log level
  • Grants appsync:EventPublish permissions to Lambdas using the generated publisher
  • Integrates with the existing monitoring infrastructure

onPublish Handler DX

// Validate and enrich events before they reach subscribers
export const handler: OrderStatusUpdatePublishHandler = async (event, ctx) => {
  const { orderId, status, estimatedDelivery, carrier } = event.payload;
  
  // Validate: does the order exist?
  const order = await ctx.datastore.get({ PK: `ORDER#${orderId}` });
  if (!order) {
    return { error: 'Order not found' };
  }
  
  // Enrich: add timestamp, then AppSync handles fan-out
  return {
    payload: {
      orderId,
      status,
      estimatedDelivery,
      carrier,
      updatedAt: Date.now(),
    },
  };
};

onSubscribe Handler DX

// Control who can subscribe to which channels
export const handler: ChatMessageSubscribeHandler = async (event, ctx) => {
  const { roomId } = event.channelPath; // parsed from /chat/message/{roomId}
  const userId = ctx.identity.sub;
  
  // Check room membership
  const membership = await ctx.datastore.get({
    PK: `ROOM#${roomId}`,
    SK: `MEMBER#${userId}`,
  });
  
  if (!membership) {
    return { reject: true };
  }
  
  return { allow: true };
};

Server-Side Publishing (from other Lambdas)

import { LiveUpdatesPublisher } from './generated/realtime.liveupdates-publisher.generated';

// In an EventBridge handler, REST API handler, or any Lambda
const publisher = new LiveUpdatesPublisher();

// Type-safe — schema enforces payload shape, auto-completion works
await publisher.publish('orders', 'statusUpdate', {
  orderId: '123',
  status: 'shipped',
  estimatedDelivery: '2026-03-25T14:00:00Z',
  carrier: 'DHL',
  updatedAt: Date.now(),
});

// Broadcast system announcement — no channel path needed
await publisher.publish('system', 'announcement', {
  title: 'Scheduled Maintenance',
  message: 'System will be unavailable from 02:00-04:00 CET',
  severity: 'warning',
});

Connecting EDA → Real-Time Push

// In your stack — wire EventBridge events to real-time push
// EventBridge rule → Lambda → AppSync Events publish
const bus = new OrderEventsEventBus(this, 'Bus', { /* ... */ });

// The generated handler for OrderStatusChanged can publish to the real-time API
// by importing the LiveUpdatesPublisher in the handler code.
// Optionally, a convenience wiring method:
realtimeApi.connectEventBus(bus, {
  event: 'OrderStatusChanged',
  namespace: 'orders',
  channel: 'statusUpdate',
  // Transform EventBridge detail to AppSync Events payload
  transform: (detail) => ({
    orderId: detail.orderId,
    status: detail.newStatus,
    updatedAt: Date.now(),
  }),
});

Integration Points

  • EventBus construct: EventBridge rule → Lambda → AppSync Events publish, enabling backend event → real-time client push pipeline
  • Authentication: Cognito user pools wire directly into AppSync Events authorization modes
  • RestApi / GraphQlApi: Mutations or API calls trigger real-time pushes through the shared publisher utility
  • SingleTableDatastore: DynamoDB Streams → Lambda → publish to channel for real-time data sync; onPublish/onSubscribe handlers get a pre-configured datastore client
  • TopicPublisher construct: SNS subscriber Lambda publishes to AppSync Events for client push
  • S3EventProcessor construct: File processing completion → real-time progress update to client

Client Integration Notes

While client-side code generation is out of scope for this issue, the generated types and endpoint configuration should make client integration straightforward. The construct should output:

  • HTTP endpoint URL (for server-side publishing)
  • Real-time (WebSocket) endpoint URL (for client subscriptions)
  • API key (if API Key auth is used)
  • Channel namespace paths

These outputs enable frontend frameworks (Amplify, custom WebSocket clients) to connect with minimal configuration.

Test Utility Extension

Extend the existing IntegTestUtil with real-time testing capabilities:

const test = new IntegTestUtil({ /* existing config + */ realtimeOptions: {
  httpEndpoint: 'https://rt.example.com',
  realtimeEndpoint: 'wss://rt.example.com/event/realtime',
}});

// Subscribe and assert
const subscription = await test.subscribe('orders/statusUpdate', authenticatedUser);
await test.publish('orders', 'statusUpdate', { orderId: '123', status: 'shipped', updatedAt: Date.now() });
const received = await subscription.waitForMessage(5000);
expect(received.orderId).toBe('123');
await subscription.close();

// Test subscription rejection
await expect(
  test.subscribe('chat/message/room-999', unauthorizedUser)
).rejects.toThrow();

Out of Scope

  • Client-side code generation (React hooks, Amplify integration) → separate concern
  • AppSync GraphQL subscriptions → the existing GraphQlApi construct covers this
  • Custom WebSocket protocol handling → AppSync Events manages the protocol
  • Presence / typing indicators → application-level logic built on top of channels
  • Message history / persistence → use SingleTableDatastore separately; AppSync Events is fire-and-forget

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions