Skip to content

marcus-sa/deepkit-restate

Repository files navigation

Deepkit Restate

Deepkit Restate is a seamless Restate integration for Deepkit. It enables effortless communication between distributed services using durable invocations, service interfaces, and event-driven architecture.

This documentation assumes familiarity with Deepkit and Restate's concepts and lifecycle.


Installation

npm add deepkit-restate

Module Setup

To use Deepkit Restate, import the RestateModule and provide configuration for the components you need:

import { FrameworkModule } from '@deepkit/framework';
import { RestateModule } from 'deepkit-restate';
import { App } from '@deepkit/app';

const app = new App({
  imports: [
    new FrameworkModule(),
    new RestateModule({
      server: {
        host: 'http://localhost',
        port: 9080,
        propagateIncomingHeaders: true, // Forward all incoming headers to service calls
      },
      ingress: {
        url: 'http://localhost:8080',
      },
      pubsub: {
        cluster: 'default',
        defaultStream: 'all',
        sse: {
          url: 'http://localhost:3000',
        },
      },
      admin: {
        url: 'http://0.0.0.0:9070',
        deployOnStartup: true,
      },
    }),
  ],
});

You can configure any combination of the following:

  • server: Starts a Restate server
  • ingress: Enables outbound service calls
  • pubsub: Enables pub/sub event system
  • admin: Registers deployments with the admin interface

If a section is not configured, that functionality will not be available.

Server Configuration

The server configuration section supports the following options:

Option Type Default Description
host string - The host address for the Restate server
port number 9080 The port number for the Restate server
propagateIncomingHeaders true | string[] undefined Controls header propagation to downstream service calls

Header Propagation

The propagateIncomingHeaders option controls whether incoming request headers are forwarded when making service-to-service calls:

// Forward all incoming headers
server: {
  propagateIncomingHeaders: true
}

// Forward only specific headers
server: {
  propagateIncomingHeaders: ['authorization', 'x-correlation-id', 'x-tenant-id']
}

// No header propagation (default)
server: {
  // propagateIncomingHeaders not specified
}

This is particularly useful for:

  • Authentication: Forwarding authorization tokens through the service call chain
  • Tracing: Propagating correlation IDs for distributed tracing
  • Multi-tenancy: Passing tenant identifiers to downstream services
  • Custom context: Forwarding application-specific headers

Note: When propagateIncomingHeaders is enabled, the incoming headers are merged with any explicitly provided headers in the service call options. Explicitly provided headers take precedence over incoming headers.


Serialization (Serde) and Error Handling

All serialization and deserialization in Deepkit Restate is handled via BSON by default.

This means you can return and accept any types in your service handlers or saga steps, including:

  • Primitives (string, number, boolean, etc.)
  • Plain objects ({ name: string; age: number })
  • Class instances (with properties and methods)
  • Complex nested types and arrays
  • Custom types supported by BSON serialization

The serialization system preserves type fidelity and structure when encoding and decoding data across the network.

Automatic Error Forwarding and Serialization

  • If an error is thrown inside a handler or saga step, it is automatically serialized and forwarded to the caller.
  • This allows errors to be caught remotely, preserving the error information.
  • Custom errors with type information are supported and will not be retried automatically by the system, enabling precise control over error handling and retries.

We are actively working on an adapter to support JSON serialization as an alternative to BSON.


Calling Services

RestateClient

The RestateClient handles communication between services and objects. It behaves differently depending on whether it is used within or outside an invocation context.

You can create an ingress client manually:

import { RestateIngressClient } from 'deepkit-restate';

const client = new RestateIngressClient({ url: 'http://localhost:9080' });

Or retrieve the configured instance via DI:

const client = app.get<RestateClient>();

Using the Client

To create a proxy to a service:

const user = client.service<UserServiceApi>();

To create a proxy to an object:

const user = client.object<UserObjectApi>();

Invoking Methods

Durable request (waits for a result):

await client.call(user.create());

Fire-and-forget (does not wait for result):

await client.send(user.create());

You can configure delivery options:

await client.send(user.create(), { delay: '10s' });

For object calls, specify the key:

await client.call('user-key', user.create());
await client.send('user-key', user.create());

Defining Services and Objects

Services

interface UserServiceHandlers {
  create(username: string): Promise<User>;
}

type UserServiceApi = RestateService<'user', UserServiceHandlers>;

@restate.service<UserServiceApi>()
class UserService implements UserServiceHandlers {
  constructor(private readonly ctx: RestateServiceContext) {}

  @restate.handler()
  async create(username: string): Promise<User> {
    return User.create(this.ctx, username);
  }
}
  • Use @restate.service() to define a service.
  • Use @restate.handler() define handlers.
  • The context (RestateServiceContext) provides durable execution helpers.

Objects

interface UserObjectHandlers {}

type UserObjectApi = RestateObject<'user', UserObjectHandlers>;

@restate.object<UserObjectApi>()
class UserObject implements UserObjectHandlers {}

Use @restate.object() to define virtual objects.

Shared handlers can be declared using @restate.shared().handler(). Note: Shared handlers use the object context, which is not type-safe. Avoid using ctx.set() at runtime in shared handlers.


Middleware

Middleware provides a way to execute code before handlers are invoked, enabling cross-cutting concerns like authentication, logging, validation, and request preprocessing.

Defining Middleware

Create a middleware class that implements the RestateMiddleware interface:

import {
  RestateMiddleware,
  RestateSharedContext,
  RestateClassMetadata,
  RestateHandlerMetadata
} from 'deepkit-restate';

class AuthenticationMiddleware implements RestateMiddleware {
  async execute(
    ctx: RestateSharedContext,
    classMetadata: RestateClassMetadata,
    handlerMetadata?: RestateHandlerMetadata,
  ): Promise<void> {
    // Access context properties like headers, request data, etc.
    const headers = ctx.request().headers;

    // Access metadata about the service/object and handler
    console.log(`Executing ${classMetadata.name}.${handlerMetadata?.name}`);
    console.log(`Service class: ${classMetadata.classType.name}`);

    // Perform authentication logic
    if (!headers?.authorization) {
      throw new Error('Authentication required');
    }

    // Middleware can modify context or perform side effects
    console.log('Request authenticated');
  }
}

Applying Middleware

Service-Level Middleware

Apply middleware to all handlers in a service:

@restate.service<UserServiceApi>().middleware(AuthenticationMiddleware)
class UserService implements UserServiceHandlers {
  @restate.handler()
  async create(username: string): Promise<User> {
    // AuthenticationMiddleware runs before this handler
    return new User(username);
  }
}

Handler-Level Middleware

Apply middleware to specific handlers:

@restate.service<UserServiceApi>()
class UserService implements UserServiceHandlers {
  @restate.handler().middleware(ValidationMiddleware)
  async create(username: string): Promise<User> {
    // ValidationMiddleware runs before this handler
    return new User(username);
  }
}

Object Middleware

Middleware works the same way for objects:

@restate.object<UserObjectApi>().middleware(LoggingMiddleware)
class UserObject implements UserObjectHandlers {
  @restate.handler()
  async update(data: UserData): Promise<void> {
    // LoggingMiddleware runs before this handler
  }
}

Global Middleware

Apply middleware to all services and objects:

new RestateModule({
  // ... other config
}).addGlobalMiddleware(LoggingMiddleware, MetricsMiddleware);

Middleware Execution Order

Middleware executes in the following order:

  1. Global middleware (in registration order)
  2. Service/Object-level middleware (in registration order)
  3. Handler-level middleware (in registration order)
  4. Handler execution

Middleware Context

Middleware receives three parameters providing comprehensive execution context:

1. RestateSharedContext

Provides access to:

  • Request information: Headers, method name, service name
  • Execution context: Invocation ID, retry information
  • Restate utilities: Random number generation, timing functions

2. RestateClassMetadata

Provides information about the service/object being executed:

  • Service/Object name: The registered name
  • Class type: The actual TypeScript class
  • Handlers: All handlers defined on the service/object
  • Applied middleware: Middleware configured at the class level

3. RestateHandlerMetadata (optional)

Provides information about the specific handler being executed:

  • Handler name: The method name being invoked
  • Return type: TypeScript type information for the return value
  • Arguments type: TypeScript type information for the parameters
  • Handler options: Configuration options for the handler
  • Applied middleware: Middleware configured at the handler level
class RequestLoggingMiddleware implements RestateMiddleware {
  async execute(
    ctx: RestateSharedContext,
    classMetadata: RestateClassMetadata,
    handlerMetadata?: RestateHandlerMetadata,
  ): Promise<void> {
    console.log(`Executing ${classMetadata.name}.${handlerMetadata?.name || 'unknown'}`);
    console.log(`Service class: ${classMetadata.classType.name}`);
    console.log(`Invocation ID: ${ctx.invocationId}`);
    console.log(`Headers:`, ctx.request?.headers);

    // Access handler-specific information
    if (handlerMetadata) {
      console.log(`Handler return type: ${handlerMetadata.returnType.kind}`);
      console.log(`Handler middleware count: ${handlerMetadata.middlewares.length}`);
    }

    // Access class-level information
    console.log(`Service middleware count: ${classMetadata.middlewares.length}`);
    console.log(`Total handlers: ${classMetadata.handlers.size}`);
  }
}

Error Handling in Middleware

If middleware throws an error, the handler will not execute and the error will be propagated to the caller:

class ValidationMiddleware implements RestateMiddleware {
  async execute(
    ctx: RestateSharedContext,
    classMetadata: RestateClassMetadata,
    handlerMetadata?: RestateHandlerMetadata,
  ): Promise<void> {
    // This error will prevent handler execution
    if (!this.isValidRequest(ctx, handlerMetadata)) {
      throw new Error(`Invalid request format for ${classMetadata.name}.${handlerMetadata?.name}`);
    }
  }

  private isValidRequest(ctx: RestateSharedContext, handlerMetadata?: RestateHandlerMetadata): boolean {
    // Validation logic can use both context and metadata
    return true; // Simplified example
  }
}

Dependency Injection

Middleware classes support dependency injection like any other service:

class DatabaseMiddleware implements RestateMiddleware {
  constructor(private readonly database: Database) {}

  async execute(
    ctx: RestateSharedContext,
    classMetadata: RestateClassMetadata,
    handlerMetadata?: RestateHandlerMetadata,
  ): Promise<void> {
    // Use injected dependencies and metadata
    await this.database.logRequest({
      invocationId: ctx.invocationId,
      serviceName: classMetadata.name,
      handlerName: handlerMetadata?.name,
      serviceClass: classMetadata.classType.name,
    });
  }
}

Middleware classes are automatically resolved by the dependency injection system when applied to services, objects, or handlers. No manual registration in the providers array is required.

Using Metadata in Middleware

The metadata parameters enable powerful middleware capabilities:

Service-Specific Logic

class ServiceSpecificMiddleware implements RestateMiddleware {
  async execute(
    ctx: RestateSharedContext,
    classMetadata: RestateClassMetadata,
    handlerMetadata?: RestateHandlerMetadata,
  ): Promise<void> {
    // Apply different logic based on service name
    if (classMetadata.name === 'payment') {
      await this.validatePaymentSecurity(ctx);
    } else if (classMetadata.name === 'user') {
      await this.validateUserPermissions(ctx);
    }
  }
}

Handler-Specific Behavior

class HandlerSpecificMiddleware implements RestateMiddleware {
  async execute(
    ctx: RestateSharedContext,
    classMetadata: RestateClassMetadata,
    handlerMetadata?: RestateHandlerMetadata,
  ): Promise<void> {
    // Skip validation for read-only operations
    if (handlerMetadata?.name?.startsWith('get') || handlerMetadata?.name?.startsWith('list')) {
      return; // Skip middleware for read operations
    }

    // Apply strict validation for write operations
    await this.validateWritePermissions(ctx, classMetadata.name);
  }
}

Dynamic Configuration

class ConfigurableMiddleware implements RestateMiddleware {
  async execute(
    ctx: RestateSharedContext,
    classMetadata: RestateClassMetadata,
    handlerMetadata?: RestateHandlerMetadata,
  ): Promise<void> {
    // Use handler options for configuration
    const timeout = handlerMetadata?.options?.timeout || 30000;
    const retries = handlerMetadata?.options?.retries || 3;

    // Apply configuration-based logic
    await this.setupTimeoutAndRetries(ctx, timeout, retries);
  }
}

Dependency Injection: Calling Other Services

You can inject the client and proxy APIs into a service:

@restate.service<UserServiceApi>()
class UserService {
  constructor(
    private readonly client: RestateClient,
    private readonly payment: PaymentServiceApi,
  ) {}

  @restate.handler()
  async create(user: User): Promise<void> {
    await this.client.call(this.payment.create('Test', user));
  }
}

For objects, remember to provide a key:

await this.client.call('payment-id', this.payment.create('Test'));

Durable Helpers

run blocks

The ctx.run() helper ensures a block is executed durably:

const user = await this.ctx.run<User>('create user', () => new User(username));

Without a type argument, the return value is ignored:

const none = await this.ctx.run('create user', () => new User(username));

Awakeables

Used to pause and resume execution:

const awakeable = this.ctx.awakeable<User>();

To resume:

this.ctx.resolveAwakeable<User>();

Durable State

Store and retrieve durable state using the context:

await this.ctx.set<User>('user', user);
const user = await this.ctx.get<User>('user');

Pub/Sub

Server Setup

Set up a dedicated application for handling events.

import { App } from '@deepkit/app';
import { FrameworkModule } from '@deepkit/framework';
import { RestateModule } from 'deepkit-restate';
import { RestatePubsubServerModule } from 'deepkit-restate/pubsub-server';

await new App({
  imports: [
    new FrameworkModule({ port: 9090 }),
    new RestateModule({ server: { port: 9080 } }),
    new RestatePubSubServerModule({
      sse: {
        all: true,
        autoDiscover: true,
        nodes: ['localhost:9090'],
      },
    }),
  ],
}).run();

Publishing Events

Inside a service handler (durable):

constructor(private readonly publisher: RestateEventPublisher) {}

await this.publisher.publish([new UserCreatedEvent(user)]);

Outside of invocation (non-durable):

const publisher = app.get<RestateEventPublisher>();
await publisher.publish([new UserCreatedEvent(user)]);

Only classes are supported as events.

Events are versioned by hashing their structure.

Handling Events

Only services can define event handlers:

@restate.service<UserServiceApi>()
class UserService {
  @(restate.event<UserCreatedEvent>().handler())
  async onUserCreated(event: UserCreatedEvent): Promise<void> {
    // handle event
  }
}

SSE Delivery

Server-Sent Events (SSE) allow real-time delivery of events to connected subscribers.

Subscribing to Events Outside of Services

Subscribe to events from contexts like HTTP or RPC controllers:

const subscriber = app.get<RestateEventSubscriber>();

const unsubscribe = await subscriber.subscribe<UserCreatedEvent>(event => {
  // handle event
});

await unsubscribe();

You can also use union types to subscribe to multiple events.

Configuration (Global)

You can configure global SSE delivery behavior in RestatePubSubServerModule:

new RestatePubSubServerModule({
  sse: {
    all: true,
    autoDiscover: true,
    nodes: ['events-1.internal:9090', 'events-2.internal:9090'],
  },
});
Option Type Description
sse.all boolean If true, all published events will be delivered via SSE by default.
sse.autoDiscover boolean When enabled, resolves peer IPs via DNS to fan out SSE events to other nodes.
sse.nodes string[] List of peer server URLs for fan-out.

SSE fan-out is stateless and opportunistic. Each node will attempt to push matching events to other known nodes.

Overriding per Publish

You can override the global SSE setting by passing { sse: true } in the publish options:

await publisher.publish([new UserCreatedEvent(user)], {
  sse: true,
});

Behavior summary:

  • If sse.all is true, SSE is used by default unless explicitly disabled.
  • If sse.all is false, SSE is off by default — but you can still enable it by passing sse: true.

Only events published with SSE enabled will be streamed to subscribers.

Sagas

Sagas provide a powerful way to orchestrate complex, long-running workflows that involve multiple services. They support stepwise execution, compensation (rollback), reply handling, and waiting for external events (via awakeables).


What is a Saga?

A Saga is a workflow pattern that manages distributed transactions and side effects in a coordinated way, including compensations for failures. In Deepkit Restate, you define sagas by extending the Saga<T> class and using the @restate.saga<Api>() decorator.


Defining a Saga Workflow

Sagas are defined using a fluent builder pattern in the definition property:

  • step(): Defines a new step in the saga.
  • invoke(handler): Calls a method in your saga class to perform an action or service call.
  • compensate(handler): Defines a rollback method if the step fails or the saga is aborted.
  • onReply<EventType>(handler): Registers an event handler for replies to invoked actions.
  • build(): Finalizes the saga definition.

Awakeables

Awakeables are special constructs to wait for asynchronous external events. They provide a promise you can await to pause saga execution until an event occurs.

Create awakeables with the saga context inside your saga methods:

this.confirmTicketAwakeable = this.ctx.awakeable<TicketConfirmed>();

Using the Saga Context

The RestateSagaContext (this.ctx) provides utilities like:

  • awakeable<T>(): Creates an awakeable to wait for events.
  • set<T>(key, value): Persist state data during saga execution.
  • get<T>(key): Retrieve persisted state.

Calling Other Services

All service calls inside invocation handlers automatically use the underlying client.call. This means:

  • You do not need to manually call client.call within your saga handlers.
  • Only service calls are supported currently (no direct calls to objects).
  • The framework handles communication and reply handling.

Example: Simplified CreateOrderSaga

import {
  restate,
  Saga,
  RestateSagaContext,
  RestateAwakeable,
} from 'deepkit-restate';

@restate.saga<CreateOrderSagaApi>()
export class CreateOrderSaga extends Saga<CreateOrderSagaData> {
  confirmTicketAwakeable?: RestateAwakeable<TicketConfirmed>;

  readonly definition = this.step()
    .invoke(this.create)
    .compensate(this.reject)
    .step()
    .invoke(this.createTicket)
    .onReply<TicketCreated>(this.handleTicketCreated)
    .step()
    .invoke(this.waitForTicketConfirmation)
    .build();

  constructor(
    private readonly order: OrderServiceApi,
    private readonly kitchen: KitchenServiceApi,
    private readonly ctx: RestateSagaContext,
  ) {
    super();
  }

  create(data: CreateOrderSagaData) {
    return this.order.create(data.orderId, data.orderDetails);
  }

  reject(data: CreateOrderSagaData) {
    return this.order.reject(data.orderId);
  }

  createTicket(data: CreateOrderSagaData) {
    this.confirmTicketAwakeable = this.ctx.awakeable<TicketConfirmed>();
    return this.kitchen.createTicket(
      data.orderDetails.restaurantId,
      data.orderId,
      data.orderDetails.lineItems,
      this.confirmTicketAwakeable.id,
    );
  }

  handleTicketCreated(data: CreateOrderSagaData, event: TicketCreated) {
    data.ticketId = event.ticketId;
  }

  async waitForTicketConfirmation(data: CreateOrderSagaData) {
    await this.confirmTicketAwakeable!.promise;
  }
}

Starting a Saga and Retrieving Its State

After defining your saga, you typically want to start an instance of it and later query its state to track progress or outcome.

Creating a Saga Client

Use the client to create a saga proxy:

const createOrderSaga = client.saga<CreateOrderSagaApi>();

This creates a handle to interact with the saga.


Starting a Saga Instance

To start a saga, call start with the saga’s unique ID and initial input data:

const startStatus = await createOrderSaga.start(orderId, {
  id: orderId,
  orderTotal: 10.5,
  customerId,
});
  • orderId uniquely identifies the saga instance.
  • The second argument is the initial data payload to pass to the saga.
  • start returns the initial status of saga execution.

Querying the Saga State

At any time, you can query the current state of the saga instance by its ID using state:

const state = await createOrderSaga.state(orderId);

This returns the persisted saga data reflecting its current progress, e.g., which step it is on, and any state variables updated along the way.


Notes

  • The saga start call triggers the first step of your saga workflow.
  • The saga state reflects all persisted data and progress, useful for monitoring or troubleshooting.
  • You can invoke start only once per unique saga instance ID.
  • Subsequent state changes happen asynchronously as the saga progresses.

Summary

  • Sagas manage multi-step distributed workflows with clear compensation.
  • Steps can invoke service calls, wait for replies, or wait for external events.
  • Awakeables let you asynchronously wait inside sagas for external confirmations.
  • Saga state can be persisted and retrieved with the saga context.
  • Invocation handlers automatically handle calling services; no manual client calls needed.
  • Currently, only service calls are supported, no direct object calls with keys.
  • Compensation methods help rollback on failure or abort scenarios.

Packages

No packages published