Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/background/05_task_chains.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,10 @@ Tasks may be posted to a job queue (see `JobQueueTask`) and run by a job queue r
## Compound Task

A compound task is a `GraphAsTask` that contains a group of tasks (in DAG format) chained together to look like a single task.

## Streaming Between Tasks

- Tasks can stream partial results to dependants without waiting for `execute()` to finish by declaring stream-capable outputs via the static `streaming()` descriptor.
- Ports marked with readiness `first-chunk` allow downstream tasks to begin work as soon as the first chunk is emitted. Ports with readiness `final` defer dependants until streaming completes.
- `IExecuteContext` now exposes `pushChunk`, `closeStream`, and `attachStreamController` helpers so tasks can enqueue chunks directly or adapt custom `ReadableStream` producers.
- Dataflows track streaming state and expose async iterables so consumers can react to chunk updates while still receiving the final aggregated output when the stream ends.
55 changes: 55 additions & 0 deletions packages/task-graph/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,61 @@ try {

## Advanced Patterns

### Streaming Outputs

Tasks can stream partial results to downstream consumers. Declare stream-capable ports with `static streaming()` and use the execution context helpers to emit chunks.

```typescript
import {
Task,
type IExecuteContext,
type DataPortSchema,
type TaskStreamingDescriptor,
createStringAccumulator,
} from "@podley/task-graph";
import { Type } from "@sinclair/typebox";

/**
 * Example task that streams each input string as a chunk on its `output` port
 * and finally returns the concatenation of all chunks.
 */
class StreamingTextTask extends Task<{ chunks: string[] }, { output: string }> {
static readonly type = "StreamingTextTask";

/** Input: the list of strings to emit, one chunk each. */
static inputSchema() {
return Type.Object({
chunks: Type.Array(Type.String({ description: "Chunk to emit" })),
});
}

/** Output: a single string holding the concatenated chunks. */
static outputSchema() {
return Type.Object({
output: Type.String({ description: "Concatenated output" }),
});
}

/**
 * Declares `output` as a stream-capable port.
 * `readiness: "first-chunk"` lets dependent tasks begin as soon as the first chunk is available.
 */
static streaming(): TaskStreamingDescriptor {
return {
outputs: {
output: {
chunkSchema: Type.String({
description: "Live chunk emitted while processing",
}) as DataPortSchema,
readiness: "first-chunk",
accumulator: createStringAccumulator(),
},
},
};
}

/**
 * Pushes every input chunk to the `output` stream in order, closes the stream,
 * then returns the joined chunks as the final output.
 */
async execute(input: { chunks: string[] }, context: IExecuteContext) {
for (const chunk of input.chunks) {
await context.pushChunk("output", chunk);
}
await context.closeStream("output");
return { output: input.chunks.join("") };
}
}
```

`readiness: "first-chunk"` allows dependent tasks to begin as soon as the first chunk is available. Use `"final"` to defer dependants until the stream closes. The execution context also exposes `attachStreamController` so existing `ReadableStream` producers can be bridged into the task graph without rewriting streaming logic.

### Array Tasks (Parallel Processing)

```typescript
Expand Down
1 change: 1 addition & 0 deletions packages/task-graph/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export * from "./task/ITask";
export * from "./task/TaskEvents";
export * from "./task/TaskJSON";
export * from "./task/Task";
export * from "./task/TaskStream";
export * from "./task/GraphAsTask";
export * from "./task/GraphAsTaskRunner";
export * from "./task/TaskRegistry";
Expand Down
211 changes: 210 additions & 1 deletion packages/task-graph/src/task-graph/Dataflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ import { areSemanticallyCompatible, EventEmitter } from "@podley/util";
import { Type } from "@sinclair/typebox";
import { TaskError } from "../task/TaskError";
import { DataflowJson } from "../task/TaskJSON";
import { Provenance, TaskIdType, TaskOutput, TaskStatus } from "../task/TaskTypes";
import {
Provenance,
TaskIdType,
TaskOutput,
TaskStatus,
type TaskStreamPortDescriptor,
} from "../task/TaskTypes";
import {
DataflowEventListener,
DataflowEventListeners,
Expand All @@ -23,6 +29,26 @@ export type DataflowIdType = `${string}[${string}] ==> ${string}[${string}]`;
export const DATAFLOW_ALL_PORTS = "*";
export const DATAFLOW_ERROR_PORT = "[error]";

/** Settlement callbacks captured for one outstanding `next()` request on a stream iterator. */
interface PendingStreamRequest {
  resolve: (result: IteratorResult<unknown>) => void;
  reject: (error: unknown) => void;
}

/**
 * Book-keeping for a single consumer iterating a dataflow stream.
 */
interface StreamListener {
  /** Position in the stream history of the next chunk to hand to this consumer. */
  index: number;
  /** Callbacks of the `next()` call currently waiting for data, or `null` when none is waiting. */
  pending: PendingStreamRequest | null;
  /** True once this consumer has finished (stream ended, failed, or the iterator was closed). */
  closed: boolean;
}

/**
 * Per-dataflow streaming state, created by `beginStream` and discarded by `reset`.
 */
interface DataflowStreamState {
/** Port descriptor supplied to `beginStream`; its `readiness` decides when readiness is reached. */
descriptor: TaskStreamPortDescriptor<any, any>;
/** Every chunk pushed so far, in order; iterators read from it by index. */
history: unknown[];
/** Iterators currently attached to this stream. */
listeners: Set<StreamListener>;
/** Set to true by `endStream` and `failStream`. */
closed: boolean;
/** Error recorded by `failStream`, or null; waiting listeners are rejected with it once they have drained the history. */
error: TaskError | null;
/** True after the first chunk ("first-chunk" readiness) or after the stream ends ("final" readiness). */
readinessReached: boolean;
}

/**
* Represents a data flow between two tasks, indicating how one task's output is used as input for another task
*/
Expand Down Expand Up @@ -53,12 +79,14 @@ export class Dataflow {
public provenance: Provenance = {};
public status: TaskStatus = TaskStatus.PENDING;
public error: TaskError | undefined;
private streamState: DataflowStreamState | null = null;

/**
 * Returns the dataflow to its initial pending state.
 *
 * Clears status, error, value, provenance and any streaming state, then emits
 * `reset` followed by `status`.
 *
 * Stream iterators that are still attached are ended first: each is marked
 * closed and any `next()` call it has in flight resolves with `done: true`.
 * Without this, those promises could never settle, because once `streamState`
 * is null the flush logic has no state to work from and returns early.
 */
public reset() {
  const stream = this.streamState;
  if (stream) {
    stream.closed = true;
    // Iterate over a copy so clearing the set below cannot disturb the loop.
    for (const listener of Array.from(stream.listeners)) {
      // Mark closed so a stale iterator never reads from a later stream's history.
      listener.closed = true;
      if (listener.pending) {
        const { resolve } = listener.pending;
        listener.pending = null;
        resolve({ value: undefined, done: true });
      }
    }
    stream.listeners.clear();
  }
  this.status = TaskStatus.PENDING;
  this.error = undefined;
  this.value = undefined;
  this.provenance = {};
  this.streamState = null;
  this.emit("reset");
  this.emit("status", this.status);
}
Expand All @@ -70,6 +98,9 @@ export class Dataflow {
case TaskStatus.PROCESSING:
this.emit("start");
break;
case TaskStatus.STREAMING:
this.emit("stream_start");
break;
case TaskStatus.COMPLETED:
this.emit("complete");
break;
Expand Down Expand Up @@ -110,6 +141,184 @@ export class Dataflow {
}
}

/**
 * Puts the dataflow into streaming mode.
 *
 * Streaming state is created on the first call only; if state already exists it
 * is kept as-is and the `descriptor` argument is not applied.
 *
 * @param descriptor Stream port descriptor stored with newly created state.
 * @param provenance Optional provenance that replaces the current one when given.
 */
public beginStream(
  descriptor: TaskStreamPortDescriptor<any, any>,
  provenance?: Provenance
): void {
  this.streamState ??= {
    descriptor,
    history: [],
    listeners: new Set(),
    closed: false,
    error: null,
    readinessReached: false,
  };
  if (provenance) this.provenance = provenance;
  this.setStatus(TaskStatus.STREAMING);
}

/**
 * Records a new chunk, publishes it, and wakes waiting stream iterators.
 *
 * @param chunk The chunk to append to the stream history.
 * @param aggregate The accumulated value so far; becomes the dataflow's `value`.
 * @param provenance Optional provenance that replaces the current one when given.
 * @returns True only for the chunk that first satisfies `first-chunk` readiness.
 * @throws TaskError when streaming state has not been initialised.
 */
public pushStreamChunk(
  chunk: unknown,
  aggregate: unknown,
  provenance?: Provenance
): boolean {
  const stream = this.ensureActiveStream();
  stream.history.push(chunk);

  const becameReady =
    !stream.readinessReached && stream.descriptor.readiness === "first-chunk";
  if (becameReady) stream.readinessReached = true;

  if (provenance) this.provenance = provenance;
  this.value = aggregate;

  this.emit("stream_chunk", chunk, aggregate);
  this.flushStreamListeners();
  return becameReady;
}

/**
 * Closes the stream, stores the final value, and lets waiting iterators finish.
 *
 * @param finalValue The final value; becomes the dataflow's `value`.
 * @param provenance Optional provenance that replaces the current one when given.
 * @returns True when the port's readiness is `final`, i.e. readiness is reached by this call.
 * @throws TaskError when streaming state has not been initialised.
 */
public endStream(finalValue: unknown, provenance?: Provenance): boolean {
  const stream = this.ensureActiveStream();
  stream.closed = true;

  const readyOnFinal = stream.descriptor.readiness === "final";
  if (readyOnFinal) stream.readinessReached = true;

  if (provenance) this.provenance = provenance;
  this.value = finalValue;

  this.emit("stream_end", finalValue);
  this.flushStreamListeners();
  return readyOnFinal;
}

/**
 * Marks the stream as failed and closed.
 *
 * Waiting iterators are flushed before the `error` event is emitted; a waiting
 * iterator that has consumed the whole history is rejected with `error`.
 *
 * @param error The failure to record on the stream.
 * @throws TaskError when streaming state has not been initialised.
 */
public failStream(error: TaskError): void {
  const stream = this.ensureActiveStream();
  stream.closed = true;
  stream.error = error;
  this.flushStreamListeners();
  this.emit("error", error);
}

/**
 * Creates an async iterator over the stream's chunks.
 *
 * The iterator starts at the beginning of the recorded history, so chunks pushed
 * before it was created are replayed. It completes when the stream has closed
 * and the history is drained, and rejects if the stream failed.
 *
 * Calling `return()` or `throw()` detaches the iterator. A `next()` request still
 * in flight at that moment is settled with `done: true`; otherwise its promise
 * would be dropped and never resolve.
 *
 * @returns An async iterable iterator yielding each chunk in push order.
 * @throws TaskError when streaming state has not been initialised.
 */
public streamIterator(): AsyncIterableIterator<unknown> {
  const state = this.ensureActiveStream();
  const listener: StreamListener = {
    index: 0,
    pending: null,
    closed: false,
  };
  state.listeners.add(listener);
  const iterator: AsyncIterableIterator<unknown> = {
    next: () => this.resolveStreamListener(listener),
    return: () => {
      // settle = true: resolve any in-flight next() rather than abandoning it.
      this.cleanupStreamListener(listener, true);
      return Promise.resolve({ value: undefined, done: true });
    },
    throw: (error?: unknown) => {
      this.cleanupStreamListener(listener, true);
      return Promise.reject(error);
    },
    [Symbol.asyncIterator](): AsyncIterableIterator<unknown> {
      return this;
    },
  };
  this.flushStreamListener(listener);
  return iterator;
}

/**
 * Reports whether streaming state exists and has not been closed.
 */
public hasActiveStream(): boolean {
  const stream = this.streamState;
  if (stream === null) return false;
  return !stream.closed;
}

/**
 * Reports whether the stream has reached its readiness point.
 * Returns false when no streaming state exists.
 */
public streamReadinessReached(): boolean {
  const stream = this.streamState;
  if (!stream) return false;
  return stream.readinessReached ?? false;
}

/**
 * Returns the descriptor stored with the streaming state, or null when there is none.
 */
public getStreamDescriptor(): TaskStreamPortDescriptor<any, any> | null {
  const stream = this.streamState;
  if (!stream) return null;
  return stream.descriptor ?? null;
}

/**
 * Returns the current streaming state.
 *
 * Only checks that the state exists; a state that is already closed is still returned.
 *
 * @throws TaskError when streaming state has not been initialised.
 */
private ensureActiveStream(): DataflowStreamState {
  const stream = this.streamState;
  if (stream) return stream;
  throw new TaskError("Streaming state has not been initialised for this dataflow.");
}

/**
 * Registers a `next()` request for the listener and tries to satisfy it immediately.
 *
 * The request's callbacks are stored in `listener.pending`; a request made while
 * another is still pending replaces the stored callbacks.
 *
 * @param listener The consumer requesting its next chunk.
 * @returns A promise for the listener's next iterator result.
 */
private resolveStreamListener(
  listener: StreamListener
): Promise<IteratorResult<unknown>> {
  return new Promise<IteratorResult<unknown>>((fulfil, fail) => {
    listener.pending = { resolve: fulfil, reject: fail };
    this.flushStreamListener(listener);
  });
}

/**
 * Attempts to satisfy every attached listener. Does nothing without streaming state.
 */
private flushStreamListeners(): void {
  const stream = this.streamState;
  if (!stream) return;
  // Work on a snapshot: flushing a listener may remove it from the set.
  const snapshot = [...stream.listeners];
  snapshot.forEach((listener) => {
    this.flushStreamListener(listener);
  });
}

/**
 * Tries to settle the listener's pending `next()` request from the current stream state.
 *
 * Checks are made in this order:
 * 1. No streaming state (it was discarded): a pending request ends with `done: true`.
 * 2. Listener already closed: a pending request ends with `done: true` and the listener is detached.
 * 3. No pending request: nothing to do.
 * 4. Unread history remains: deliver the next chunk.
 * 5. Stream failed: reject with the recorded error and detach.
 * 6. Stream closed: end with `done: true` and detach.
 * Otherwise the request stays pending until more data arrives.
 *
 * @param listener The consumer to service.
 */
private flushStreamListener(listener: StreamListener): void {
  const state = this.streamState;
  if (!state) {
    // The stream state is gone, so nothing will ever flush this listener again.
    // End the request rather than leaving the caller awaiting forever.
    if (listener.pending) {
      const { resolve } = listener.pending;
      listener.pending = null;
      listener.closed = true;
      resolve({ value: undefined, done: true });
    }
    return;
  }
  if (listener.closed) {
    if (listener.pending) {
      listener.pending.resolve({ value: undefined, done: true });
      listener.pending = null;
    }
    state.listeners.delete(listener);
    return;
  }
  if (!listener.pending) {
    return;
  }
  // History is drained before error/close are reported, so no chunk is skipped.
  if (listener.index < state.history.length) {
    const value = state.history[listener.index++];
    const { resolve } = listener.pending;
    listener.pending = null;
    resolve({ value, done: false });
    return;
  }
  if (state.error) {
    const { reject } = listener.pending;
    listener.pending = null;
    listener.closed = true;
    reject(state.error);
    state.listeners.delete(listener);
    return;
  }
  if (state.closed) {
    const { resolve } = listener.pending;
    listener.pending = null;
    listener.closed = true;
    resolve({ value: undefined, done: true });
    state.listeners.delete(listener);
  }
}

/**
 * Detaches a listener from the stream and marks it closed.
 * Does nothing when there is no streaming state.
 *
 * @param listener The consumer to detach.
 * @param settle When true, a pending `next()` request is resolved with `done: true`;
 *   when false, the pending request is discarded without being settled.
 */
private cleanupStreamListener(listener: StreamListener, settle: boolean): void {
  const stream = this.streamState;
  if (!stream) return;

  const outstanding = listener.pending;
  if (outstanding) {
    if (settle) outstanding.resolve({ value: undefined, done: true });
    listener.pending = null;
  }

  listener.closed = true;
  stream.listeners.delete(listener);
}

toJSON(): DataflowJson {
return {
sourceTaskId: this.sourceTaskId,
Expand Down
9 changes: 9 additions & 0 deletions packages/task-graph/src/task-graph/DataflowEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ export type DataflowEventListeners = {
/** Fired when a source task completes successfully */
complete: () => void;

/** Fired when a dataflow begins streaming */
stream_start: () => void;

/** Fired when a streaming chunk traverses the dataflow */
stream_chunk: (chunk: unknown, aggregate: unknown) => void;

/** Fired when streaming ends */
stream_end: (aggregate: unknown) => void;

/** Fired when a source task is skipped */
skipped: () => void;

Expand Down
Loading
Loading