Deepkit Restate is a seamless Restate integration for Deepkit. It enables effortless communication between distributed services using durable invocations, service interfaces, and event-driven architecture.
This documentation assumes familiarity with Deepkit and Restate's concepts and lifecycle.
npm add deepkit-restate
To use Deepkit Restate, import the `RestateModule` and provide configuration for the components you need:
import { FrameworkModule } from '@deepkit/framework';
import { RestateModule } from 'deepkit-restate';
import { App } from '@deepkit/app';
const app = new App({
  imports: [
    new FrameworkModule(),
    new RestateModule({
      server: {
        host: 'http://localhost',
        port: 9080,
        propagateIncomingHeaders: true, // Forward all incoming headers to service calls
      },
      ingress: {
        url: 'http://localhost:8080',
      },
      pubsub: {
        cluster: 'default',
        defaultStream: 'all',
        sse: {
          url: 'http://localhost:3000',
        },
      },
      admin: {
        url: 'http://0.0.0.0:9070',
        deployOnStartup: true,
      },
    }),
  ],
});
You can configure any combination of the following:
- server: Starts a Restate server
- ingress: Enables outbound service calls
- pubsub: Enables pub/sub event system
- admin: Registers deployments with the admin interface
If a section is not configured, that functionality will not be available.
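As a sketch of a partial configuration, an application that only calls other services and hosts no handlers of its own might configure just the `ingress` section. The URL is the same placeholder used in the full example; nothing else here goes beyond the options already shown.

```typescript
import { App } from '@deepkit/app';
import { FrameworkModule } from '@deepkit/framework';
import { RestateModule } from 'deepkit-restate';

// Client-only application: no `server`, `pubsub`, or `admin` section,
// so only outbound service calls are available.
const app = new App({
  imports: [
    new FrameworkModule(),
    new RestateModule({
      ingress: {
        url: 'http://localhost:8080',
      },
    }),
  ],
});
```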
The `server` configuration section supports the following options:
| Option | Type | Default | Description |
|---|---|---|---|
| `host` | `string` | - | The host address for the Restate server |
| `port` | `number` | `9080` | The port number for the Restate server |
| `propagateIncomingHeaders` | `true \| string[]` | `undefined` | Controls header propagation to downstream service calls |
The `propagateIncomingHeaders` option controls whether incoming request headers are forwarded when making service-to-service calls:
// Forward all incoming headers
server: {
  propagateIncomingHeaders: true
}
// Forward only specific headers
server: {
  propagateIncomingHeaders: ['authorization', 'x-correlation-id', 'x-tenant-id']
}
// No header propagation (default)
server: {
  // propagateIncomingHeaders not specified
}
This is particularly useful for:
- Authentication: Forwarding authorization tokens through the service call chain
- Tracing: Propagating correlation IDs for distributed tracing
- Multi-tenancy: Passing tenant identifiers to downstream services
- Custom context: Forwarding application-specific headers
Note: When `propagateIncomingHeaders` is enabled, the incoming headers are merged with any explicitly provided headers in the service call options. Explicitly provided headers take precedence over incoming headers.
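The merge rule can be illustrated with a small standalone model. This is only a sketch of the documented behavior, not the library's code: `resolveOutgoingHeaders` and `HeaderMap` are hypothetical names, and header names are assumed to be already lower-cased.

```typescript
type HeaderMap = Record<string, string>;

// Mirrors the documented option type: `true` forwards everything,
// a string array forwards only the listed headers, `undefined` forwards nothing.
type PropagateOption = true | string[] | undefined;

// Hypothetical helper, not part of deepkit-restate: computes the headers an
// outgoing service call would carry under the rules described above.
function resolveOutgoingHeaders(
  incoming: HeaderMap,
  explicit: HeaderMap,
  propagate: PropagateOption,
): HeaderMap {
  const forwarded: HeaderMap = {};
  if (propagate === true) {
    Object.assign(forwarded, incoming);
  } else if (Array.isArray(propagate)) {
    for (const headerName of propagate) {
      const value = incoming[headerName];
      if (value !== undefined) forwarded[headerName] = value;
    }
  }
  // Explicit headers are applied last, so they win on conflicts.
  return { ...forwarded, ...explicit };
}

const incomingHeaders: HeaderMap = {
  authorization: 'Bearer abc',
  'x-tenant-id': 'acme',
  'x-debug': '1',
};

// Forward everything, but override the tenant explicitly.
const forwardAll = resolveOutgoingHeaders(incomingHeaders, { 'x-tenant-id': 'override' }, true);
// Forward only the authorization header.
const forwardSome = resolveOutgoingHeaders(incomingHeaders, {}, ['authorization']);
// No propagation: only explicit headers are sent.
const forwardNone = resolveOutgoingHeaders(incomingHeaders, { 'x-extra': 'yes' }, undefined);
```

In this model `forwardAll` keeps `authorization` and `x-debug` from the incoming request while `x-tenant-id` takes the explicit value.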
All serialization and deserialization in Deepkit Restate is handled via BSON by default.
This means you can return and accept any types in your service handlers or saga steps, including:
- Primitives (`string`, `number`, `boolean`, etc.)
- Plain objects (`{ name: string; age: number }`)
- Class instances (with properties and methods)
- Complex nested types and arrays
- Custom types supported by BSON serialization
The serialization system preserves type fidelity and structure when encoding and decoding data across the network.
- If an error is thrown inside a handler or saga step, it is automatically serialized and forwarded to the caller.
- This allows errors to be caught remotely, preserving the error information.
- Custom errors with type information are supported and will not be retried automatically by the system, enabling precise control over error handling and retries.
We are actively working on an adapter to support JSON serialization as an alternative to BSON.
The `RestateClient` handles communication between services and objects. It behaves differently depending on whether it is used within or outside an invocation context.
You can create an ingress client manually:
import { RestateIngressClient } from 'deepkit-restate';
const client = new RestateIngressClient({ url: 'http://localhost:8080' });
Or retrieve the configured instance via DI:
const client = app.get<RestateClient>();
To create a proxy to a service:
const user = client.service<UserServiceApi>();
To create a proxy to an object:
const user = client.object<UserObjectApi>();
Durable request (waits for a result):
await client.call(user.create());
Fire-and-forget (does not wait for result):
await client.send(user.create());
You can configure delivery options:
await client.send(user.create(), { delay: '10s' });
For object calls, specify the key:
await client.call('user-key', user.create());
await client.send('user-key', user.create());
interface UserServiceHandlers {
create(username: string): Promise<User>;
}
type UserServiceApi = RestateService<'user', UserServiceHandlers>;
@restate.service<UserServiceApi>()
class UserService implements UserServiceHandlers {
constructor(private readonly ctx: RestateServiceContext) {}
@restate.handler()
async create(username: string): Promise<User> {
return User.create(this.ctx, username);
}
}
- Use `@restate.service()` to define a service.
- Use `@restate.handler()` to define handlers.
- The context (`RestateServiceContext`) provides durable execution helpers.
interface UserObjectHandlers {}
type UserObjectApi = RestateObject<'user', UserObjectHandlers>;
@restate.object<UserObjectApi>()
class UserObject implements UserObjectHandlers {}
Use `@restate.object()` to define virtual objects.
Shared handlers can be declared using `@restate.shared().handler()`.
Note: Shared handlers use the object context, which is not type-safe. Avoid using `ctx.set()` at runtime in shared handlers.
Middleware provides a way to execute code before handlers are invoked, enabling cross-cutting concerns like authentication, logging, validation, and request preprocessing.
Create a middleware class that implements the `RestateMiddleware` interface:
import {
RestateMiddleware,
RestateSharedContext,
RestateClassMetadata,
RestateHandlerMetadata
} from 'deepkit-restate';
class AuthenticationMiddleware implements RestateMiddleware {
async execute(
ctx: RestateSharedContext,
classMetadata: RestateClassMetadata,
handlerMetadata?: RestateHandlerMetadata,
): Promise<void> {
// Access context properties like headers, request data, etc.
const headers = ctx.request().headers;
// Access metadata about the service/object and handler
console.log(`Executing ${classMetadata.name}.${handlerMetadata?.name}`);
console.log(`Service class: ${classMetadata.classType.name}`);
// Perform authentication logic
if (!headers?.authorization) {
throw new Error('Authentication required');
}
// Middleware can modify context or perform side effects
console.log('Request authenticated');
}
}
Apply middleware to all handlers in a service:
@restate.service<UserServiceApi>().middleware(AuthenticationMiddleware)
class UserService implements UserServiceHandlers {
@restate.handler()
async create(username: string): Promise<User> {
// AuthenticationMiddleware runs before this handler
return new User(username);
}
}
Apply middleware to specific handlers:
@restate.service<UserServiceApi>()
class UserService implements UserServiceHandlers {
@restate.handler().middleware(ValidationMiddleware)
async create(username: string): Promise<User> {
// ValidationMiddleware runs before this handler
return new User(username);
}
}
Middleware works the same way for objects:
@restate.object<UserObjectApi>().middleware(LoggingMiddleware)
class UserObject implements UserObjectHandlers {
@restate.handler()
async update(data: UserData): Promise<void> {
// LoggingMiddleware runs before this handler
}
}
Apply middleware to all services and objects:
new RestateModule({
// ... other config
}).addGlobalMiddleware(LoggingMiddleware, MetricsMiddleware);
Middleware executes in the following order:
- Global middleware (in registration order)
- Service/Object-level middleware (in registration order)
- Handler-level middleware (in registration order)
- Handler execution
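The ordering above can be modeled in a few lines. This is a simplified sketch rather than the library's implementation: `SketchMiddleware` and `runPipeline` are hypothetical names, and the hooks are synchronous here even though real middleware implements an async `execute()`.

```typescript
// Simplified stand-in for a middleware: a name plus a synchronous hook.
interface SketchMiddleware {
  name: string;
  run: () => void;
}

// Hypothetical helper: runs the three middleware levels in the documented
// order, then the handler, and returns a trace of what executed.
function runPipeline(
  globalLevel: SketchMiddleware[],
  classLevel: SketchMiddleware[],
  handlerLevel: SketchMiddleware[],
  handler: () => void,
): string[] {
  const trace: string[] = [];
  for (const middleware of [...globalLevel, ...classLevel, ...handlerLevel]) {
    middleware.run(); // a throw here aborts the chain before the handler runs
    trace.push(middleware.name);
  }
  handler();
  trace.push('handler');
  return trace;
}

// Builds a middleware that does nothing except appear in the trace.
const noop = (name: string): SketchMiddleware => ({ name, run: () => {} });

const pipelineTrace = runPipeline(
  [noop('logging'), noop('metrics')], // global, in registration order
  [noop('authentication')],           // service/object level
  [noop('validation')],               // handler level
  () => {},
);
// pipelineTrace is ['logging', 'metrics', 'authentication', 'validation', 'handler']
```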
Middleware receives three parameters providing comprehensive execution context:
`ctx` (`RestateSharedContext`) provides access to:
- Request information: Headers, method name, service name
- Execution context: Invocation ID, retry information
- Restate utilities: Random number generation, timing functions
`classMetadata` (`RestateClassMetadata`) provides information about the service/object being executed:
- Service/Object name: The registered name
- Class type: The actual TypeScript class
- Handlers: All handlers defined on the service/object
- Applied middleware: Middleware configured at the class level
`handlerMetadata` (`RestateHandlerMetadata`, optional) provides information about the specific handler being executed:
- Handler name: The method name being invoked
- Return type: TypeScript type information for the return value
- Arguments type: TypeScript type information for the parameters
- Handler options: Configuration options for the handler
- Applied middleware: Middleware configured at the handler level
class RequestLoggingMiddleware implements RestateMiddleware {
async execute(
ctx: RestateSharedContext,
classMetadata: RestateClassMetadata,
handlerMetadata?: RestateHandlerMetadata,
): Promise<void> {
console.log(`Executing ${classMetadata.name}.${handlerMetadata?.name || 'unknown'}`);
console.log(`Service class: ${classMetadata.classType.name}`);
console.log(`Invocation ID: ${ctx.invocationId}`);
console.log(`Headers:`, ctx.request().headers);
// Access handler-specific information
if (handlerMetadata) {
console.log(`Handler return type: ${handlerMetadata.returnType.kind}`);
console.log(`Handler middleware count: ${handlerMetadata.middlewares.length}`);
}
// Access class-level information
console.log(`Service middleware count: ${classMetadata.middlewares.length}`);
console.log(`Total handlers: ${classMetadata.handlers.size}`);
}
}
If middleware throws an error, the handler will not execute and the error will be propagated to the caller:
class ValidationMiddleware implements RestateMiddleware {
async execute(
ctx: RestateSharedContext,
classMetadata: RestateClassMetadata,
handlerMetadata?: RestateHandlerMetadata,
): Promise<void> {
// This error will prevent handler execution
if (!this.isValidRequest(ctx, handlerMetadata)) {
throw new Error(`Invalid request format for ${classMetadata.name}.${handlerMetadata?.name}`);
}
}
private isValidRequest(ctx: RestateSharedContext, handlerMetadata?: RestateHandlerMetadata): boolean {
// Validation logic can use both context and metadata
return true; // Simplified example
}
}
Middleware classes support dependency injection like any other service:
class DatabaseMiddleware implements RestateMiddleware {
constructor(private readonly database: Database) {}
async execute(
ctx: RestateSharedContext,
classMetadata: RestateClassMetadata,
handlerMetadata?: RestateHandlerMetadata,
): Promise<void> {
// Use injected dependencies and metadata
await this.database.logRequest({
invocationId: ctx.invocationId,
serviceName: classMetadata.name,
handlerName: handlerMetadata?.name,
serviceClass: classMetadata.classType.name,
});
}
}
Middleware classes are automatically resolved by the dependency injection system when applied to services, objects, or handlers. No manual registration in the providers array is required.
The metadata parameters enable powerful middleware capabilities:
class ServiceSpecificMiddleware implements RestateMiddleware {
async execute(
ctx: RestateSharedContext,
classMetadata: RestateClassMetadata,
handlerMetadata?: RestateHandlerMetadata,
): Promise<void> {
// Apply different logic based on service name
if (classMetadata.name === 'payment') {
await this.validatePaymentSecurity(ctx);
} else if (classMetadata.name === 'user') {
await this.validateUserPermissions(ctx);
}
}
}
class HandlerSpecificMiddleware implements RestateMiddleware {
async execute(
ctx: RestateSharedContext,
classMetadata: RestateClassMetadata,
handlerMetadata?: RestateHandlerMetadata,
): Promise<void> {
// Skip validation for read-only operations
if (handlerMetadata?.name?.startsWith('get') || handlerMetadata?.name?.startsWith('list')) {
return; // Skip middleware for read operations
}
// Apply strict validation for write operations
await this.validateWritePermissions(ctx, classMetadata.name);
}
}
class ConfigurableMiddleware implements RestateMiddleware {
async execute(
ctx: RestateSharedContext,
classMetadata: RestateClassMetadata,
handlerMetadata?: RestateHandlerMetadata,
): Promise<void> {
// Use handler options for configuration
const timeout = handlerMetadata?.options?.timeout || 30000;
const retries = handlerMetadata?.options?.retries || 3;
// Apply configuration-based logic
await this.setupTimeoutAndRetries(ctx, timeout, retries);
}
}
You can inject the client and proxy APIs into a service:
@restate.service<UserServiceApi>()
class UserService {
constructor(
private readonly client: RestateClient,
private readonly payment: PaymentServiceApi,
) {}
@restate.handler()
async create(user: User): Promise<void> {
await this.client.call(this.payment.create('Test', user));
}
}
For objects, remember to provide a key:
await this.client.call('payment-id', this.payment.create('Test'));
The `ctx.run()` helper ensures a block is executed durably:
const user = await this.ctx.run<User>('create user', () => new User(username));
Without a type argument, the return value is ignored:
const none = await this.ctx.run('create user', () => new User(username));
Awakeables are used to pause and resume execution:
const awakeable = this.ctx.awakeable<User>();
To resume:
this.ctx.resolveAwakeable<User>();
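To make the pause/resume idea concrete, here is an in-memory, non-durable model of an awakeable: an id handed to another party plus a promise that stays pending until that party resolves it. `AwakeableRegistry` and `SketchAwakeable` are hypothetical names for illustration and say nothing about the real method signatures.

```typescript
// An id that can be passed around, plus the promise the waiter awaits.
interface SketchAwakeable<T> {
  id: string;
  promise: Promise<T>;
}

class AwakeableRegistry {
  private nextId = 0;
  private readonly pending = new Map<string, (value: unknown) => void>();

  create<T>(): SketchAwakeable<T> {
    const id = `awakeable-${this.nextId++}`;
    const promise = new Promise<T>(resolve => {
      // The executor runs synchronously, so the resolver is registered
      // before create() returns.
      this.pending.set(id, value => resolve(value as T));
    });
    return { id, promise };
  }

  // Returns false when the id is unknown or was already resolved.
  resolve<T>(id: string, value: T): boolean {
    const resolver = this.pending.get(id);
    if (!resolver) return false;
    this.pending.delete(id);
    resolver(value);
    return true;
  }

  pendingCount(): number {
    return this.pending.size;
  }
}

const registry = new AwakeableRegistry();
const userReady = registry.create<{ username: string }>();

// The waiting side would `await userReady.promise`; here it only logs.
userReady.promise.then(user => console.log('resumed with', user.username));

// Some other party, holding only the id, resumes the waiter.
const firstResolve = registry.resolve(userReady.id, { username: 'alice' });
const secondResolve = registry.resolve(userReady.id, { username: 'bob' });
```

In this model a second resolve for the same id is rejected; whether the real runtime behaves the same way is not stated here.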
Store and retrieve durable state using the context:
await this.ctx.set<User>('user', user);
const user = await this.ctx.get<User>('user');
Set up a dedicated application for handling events.
import { App } from '@deepkit/app';
import { FrameworkModule } from '@deepkit/framework';
import { RestateModule } from 'deepkit-restate';
import { RestatePubSubServerModule } from 'deepkit-restate/pubsub-server';
await new App({
  imports: [
    new FrameworkModule({ port: 9090 }),
    new RestateModule({ server: { port: 9080 } }),
    new RestatePubSubServerModule({
      sse: {
        all: true,
        autoDiscover: true,
        nodes: ['localhost:9090'],
      },
    }),
  ],
}).run();
Inside a service handler (durable):
constructor(private readonly publisher: RestateEventPublisher) {}
await this.publisher.publish([new UserCreatedEvent(user)]);
Outside of invocation (non-durable):
const publisher = app.get<RestateEventPublisher>();
await publisher.publish([new UserCreatedEvent(user)]);
Only classes are supported as events.
Events are versioned by hashing their structure.
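How the hash is computed is not described here, so the following is only a sketch of the general idea of structure-based versioning. It assumes an event's shape can be described as property names mapped to type names, and it uses 32-bit FNV-1a purely as a stand-in hash; `EventShape`, `structureSignature`, and `eventVersion` are hypothetical names.

```typescript
// Hypothetical description of an event's shape: property name -> type name.
type EventShape = Record<string, string>;

// Canonical text form of a shape; sorting makes it independent of property order.
function structureSignature(eventName: string, shape: EventShape): string {
  const fields = Object.keys(shape)
    .sort()
    .map(key => `${key}:${shape[key]}`);
  return `${eventName}{${fields.join(',')}}`;
}

// 32-bit FNV-1a, used here only as a small stand-in hash function.
function fnv1a(input: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return (hash >>> 0).toString(16);
}

function eventVersion(eventName: string, shape: EventShape): string {
  return fnv1a(structureSignature(eventName, shape));
}

const baseShape: EventShape = { id: 'string', username: 'string' };
const reorderedShape: EventShape = { username: 'string', id: 'string' };
const extendedShape: EventShape = { id: 'string', username: 'string', email: 'string' };

const baseSignature = structureSignature('UserCreatedEvent', baseShape);
const extendedSignature = structureSignature('UserCreatedEvent', extendedShape);
const baseVersion = eventVersion('UserCreatedEvent', baseShape);
const reorderedVersion = eventVersion('UserCreatedEvent', reorderedShape);
```

Under this scheme, reordering properties leaves the version unchanged, while adding a property changes the signature that feeds the hash, which is the practical consequence to keep in mind when evolving event classes.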
Only services can define event handlers:
@restate.service<UserServiceApi>()
class UserService {
@(restate.event<UserCreatedEvent>().handler())
async onUserCreated(event: UserCreatedEvent): Promise<void> {
// handle event
}
}
Server-Sent Events (SSE) allow real-time delivery of events to connected subscribers.
Subscribe to events from contexts like HTTP or RPC controllers:
const subscriber = app.get<RestateEventSubscriber>();
const unsubscribe = await subscriber.subscribe<UserCreatedEvent>(event => {
// handle event
});
await unsubscribe();
You can also use union types to subscribe to multiple events.
You can configure global SSE delivery behavior in `RestatePubSubServerModule`:
new RestatePubSubServerModule({
  sse: {
    all: true,
    autoDiscover: true,
    nodes: ['events-1.internal:9090', 'events-2.internal:9090'],
  },
});
| Option | Type | Description |
|---|---|---|
| `sse.all` | `boolean` | If `true`, all published events will be delivered via SSE by default. |
| `sse.autoDiscover` | `boolean` | When enabled, resolves peer IPs via DNS to fan out SSE events to other nodes. |
| `sse.nodes` | `string[]` | List of peer server URLs for fan-out. |
SSE fan-out is stateless and opportunistic. Each node will attempt to push matching events to other known nodes.
You can override the global SSE setting by passing `{ sse: true }` in the publish options:
await publisher.publish([new UserCreatedEvent(user)], {
sse: true,
});
Behavior summary:
- If `sse.all` is true, SSE is used by default unless explicitly disabled.
- If `sse.all` is false, SSE is off by default, but you can still enable it by passing `sse: true`.
Only events published with SSE enabled will be streamed to subscribers.
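The behavior summary reduces to a simple precedence rule, sketched below. `usesSse` is a hypothetical helper, not a library function: `globalAll` stands for the module-level `sse.all` setting and `publishSse` for the optional per-publish `sse` option.

```typescript
// Decides whether a single publish call is delivered via SSE.
function usesSse(globalAll: boolean, publishSse?: boolean): boolean {
  // A per-publish value, when given, overrides the global default.
  return publishSse !== undefined ? publishSse : globalAll;
}

const defaultOn = usesSse(true);          // global default applies
const disabledOnce = usesSse(true, false); // explicitly disabled for one publish
const defaultOff = usesSse(false);        // global default applies
const enabledOnce = usesSse(false, true);  // explicitly enabled for one publish
```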
Sagas provide a powerful way to orchestrate complex, long-running workflows that involve multiple services. They support stepwise execution, compensation (rollback), reply handling, and waiting for external events (via awakeables).
A Saga is a workflow pattern that manages distributed transactions and side effects in a coordinated way, including compensations for failures. In Deepkit Restate, you define sagas by extending the `Saga<T>` class and using the `@restate.saga<Api>()` decorator.
Sagas are defined using a fluent builder pattern in the `definition` property:
- `step()`: Defines a new step in the saga.
- `invoke(handler)`: Calls a method in your saga class to perform an action or service call.
- `compensate(handler)`: Defines a rollback method if the step fails or the saga is aborted.
- `onReply<EventType>(handler)`: Registers an event handler for replies to invoked actions.
- `build()`: Finalizes the saga definition.
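The step and compensation idea can be shown with a minimal in-memory model. It is synchronous, not durable, and unrelated to the library's classes: `SketchStep` and `runSteps` are hypothetical names, and running compensations in reverse order of completion is an assumption taken from the general saga pattern rather than something stated here.

```typescript
interface SketchStep<Data> {
  name: string;
  invoke: (data: Data) => void;
  compensate?: (data: Data) => void;
}

interface SketchResult {
  completed: string[];
  compensated: string[];
  failedAt?: string;
}

// Runs steps in order; on the first failure, compensates the steps that
// already completed and stops.
function runSteps<Data>(steps: SketchStep<Data>[], data: Data): SketchResult {
  const result: SketchResult = { completed: [], compensated: [] };
  const done: SketchStep<Data>[] = [];
  for (const step of steps) {
    try {
      step.invoke(data);
      done.push(step);
      result.completed.push(step.name);
    } catch (error) {
      result.failedAt = step.name;
      // Assumption: roll back completed steps in reverse order.
      for (const previous of done.reverse()) {
        if (previous.compensate) {
          previous.compensate(data);
          result.compensated.push(previous.name);
        }
      }
      break;
    }
  }
  return result;
}

interface OrderData {
  orderId: string;
  status: string;
}

const orderData: OrderData = { orderId: 'o-1', status: 'new' };
const outcome = runSteps<OrderData>(
  [
    {
      name: 'create',
      invoke: data => { data.status = 'created'; },
      compensate: data => { data.status = 'rejected'; },
    },
    {
      name: 'createTicket',
      invoke: () => { throw new Error('kitchen unavailable'); },
    },
  ],
  orderData,
);
```

Here the second step fails, so the first step's compensation runs and `orderData.status` ends up as `'rejected'`.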
Awakeables are special constructs to wait for asynchronous external events. They provide a promise you can `await` to pause saga execution until an event occurs.
Create awakeables with the saga context inside your saga methods:
this.confirmTicketAwakeable = this.ctx.awakeable<TicketConfirmed>();
The `RestateSagaContext` (`this.ctx`) provides utilities like:
- `awakeable<T>()`: Creates an awakeable to wait for events.
- `set<T>(key, value)`: Persist state data during saga execution.
- `get<T>(key)`: Retrieve persisted state.
All service calls inside invocation handlers automatically use the underlying `client.call`. This means:
- You do not need to manually call `client.call` within your saga handlers.
- Only service calls are supported currently (no direct calls to objects).
- The framework handles communication and reply handling.
import {
restate,
Saga,
RestateSagaContext,
RestateAwakeable,
} from 'deepkit-restate';
@restate.saga<CreateOrderSagaApi>()
export class CreateOrderSaga extends Saga<CreateOrderSagaData> {
confirmTicketAwakeable?: RestateAwakeable<TicketConfirmed>;
readonly definition = this.step()
.invoke(this.create)
.compensate(this.reject)
.step()
.invoke(this.createTicket)
.onReply<TicketCreated>(this.handleTicketCreated)
.step()
.invoke(this.waitForTicketConfirmation)
.build();
constructor(
private readonly order: OrderServiceApi,
private readonly kitchen: KitchenServiceApi,
private readonly ctx: RestateSagaContext,
) {
super();
}
create(data: CreateOrderSagaData) {
return this.order.create(data.orderId, data.orderDetails);
}
reject(data: CreateOrderSagaData) {
return this.order.reject(data.orderId);
}
createTicket(data: CreateOrderSagaData) {
this.confirmTicketAwakeable = this.ctx.awakeable<TicketConfirmed>();
return this.kitchen.createTicket(
data.orderDetails.restaurantId,
data.orderId,
data.orderDetails.lineItems,
this.confirmTicketAwakeable.id,
);
}
handleTicketCreated(data: CreateOrderSagaData, event: TicketCreated) {
data.ticketId = event.ticketId;
}
async waitForTicketConfirmation(data: CreateOrderSagaData) {
await this.confirmTicketAwakeable!.promise;
}
}
After defining your saga, you typically want to start an instance of it and later query its state to track progress or outcome.
Use the client to create a saga proxy:
const createOrderSaga = client.saga<CreateOrderSagaApi>();
This creates a handle to interact with the saga.
To start a saga, call `start` with the saga’s unique ID and initial input data:
const startStatus = await createOrderSaga.start(orderId, {
id: orderId,
orderTotal: 10.5,
customerId,
});
- `orderId` uniquely identifies the saga instance.
- The second argument is the initial data payload to pass to the saga.
- `start` returns the initial status of saga execution.
At any time, you can query the current state of the saga instance by its ID using `state`:
const state = await createOrderSaga.state(orderId);
This returns the persisted saga data reflecting its current progress, e.g., which step it is on, and any state variables updated along the way.
- The saga `start` call triggers the first step of your saga workflow.
- The saga state reflects all persisted data and progress, useful for monitoring or troubleshooting.
- You can invoke `start` only once per unique saga instance ID.
- Subsequent state changes happen asynchronously as the saga progresses.
- Sagas manage multi-step distributed workflows with clear compensation.
- Steps can invoke service calls, wait for replies, or wait for external events.
- Awakeables let you asynchronously wait inside sagas for external confirmations.
- Saga state can be persisted and retrieved with the saga context.
- Invocation handlers automatically handle calling services; no manual client calls needed.
- Currently, only service calls are supported, no direct object calls with keys.
- Compensation methods help rollback on failure or abort scenarios.