Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .cursor/rules/testing.mdc
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ alwaysApply: true

Use bun, not jest, not npm, not node

To run tests: `bun test`. To run specific ones: `bun run <testfilename>`.
To run tests: `bun test`. To run specific ones: `bun test <testfilename>`.
8 changes: 4 additions & 4 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docs/background/06_run_graph_orchestration.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ The editor DAG is defined by the end user and saved in the database (tasks and d

## Graph

The graph is a DAG. It is a list of tasks and a list of dataflows. The tasks are the nodes and the dataflows are the connections between task outputs and inputs, plus status and provenance.
The graph is a DAG. It is a list of tasks and a list of dataflows. The tasks are the nodes and the dataflows are the connections between task outputs and inputs, plus status.

We expose events for graphs, tasks, and dataflows. A suspend/resume could be added for bulk creation. This helps keep a UI in sync as the graph runs.

Expand Down
2 changes: 1 addition & 1 deletion docs/background/07_intrumentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ Instrumentation is the process of adding code to a program to collect data about
Some of these tools cost money, so we need to track and estimate costs.

- Tasks emit status/progress events (`TaskStatus`, progress percent)
- Dataflows emit start/complete/error events and carry provenance
- Dataflows emit start/complete/error events
- Task graphs emit start/progress/complete/error events
4 changes: 1 addition & 3 deletions docs/developers/02_architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,10 @@ classDiagram

class TaskGraphRunner{
Map<number, Task[]> layers
Map<unknown, TaskInput> provenanceInput
TaskGraph dag
TaskOutputRepository repository
assignLayers(Task[] sortedNodes)
runGraph(TaskInput parentProvenance) TaskOutput
runGraph(TaskInput input) TaskOutput
runGraphReactive() TaskOutput
}

Expand All @@ -255,7 +254,6 @@ classDiagram
The TaskGraphRunner is responsible for executing tasks in a task graph. Key features include:

- **Layer-based Execution**: Tasks are organized into layers based on dependencies, allowing parallel execution of independent tasks
- **Provenance Tracking**: Tracks the lineage and input data that led to each task's output
- **Caching Support**: Can use a TaskOutputRepository to cache task outputs and avoid re-running tasks
- **Reactive Mode**: Supports reactive execution where tasks can respond to input changes without full re-execution
- **Smart Task Scheduling**: Automatically determines task execution order based on dependencies
Expand Down
241 changes: 240 additions & 1 deletion docs/developers/03_extending.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ This document covers how to write your own tasks. For a more practical guide to
- [Tasks must have a `run()` method](#tasks-must-have-a-run-method)
- [Define Inputs and Outputs](#define-inputs-and-outputs)
- [Register the Task](#register-the-task)
- [Schema Format Annotations](#schema-format-annotations)
- [Job Queues and LLM tasks](#job-queues-and-llm-tasks)
- [Write a new Compound Task](#write-a-new-compound-task)
- [Reactive Task UIs](#reactive-task-uis)
Expand Down Expand Up @@ -117,7 +118,7 @@ To use the Task in Workflow, there are a few steps:

```ts
export const simpleDebug = (input: DebugLogTaskInput) => {
return new SimpleDebugTask(input).run();
return new SimpleDebugTask({} as DebugLogTaskInput, {}).run(input);
};

declare module "@workglow/task-graph" {
Expand All @@ -129,6 +130,103 @@ declare module "@workglow/task-graph" {
Workflow.prototype.simpleDebug = CreateWorkflow(SimpleDebugTask);
```

## Schema Format Annotations

When defining task input schemas, you can use `format` annotations to enable automatic resolution of string identifiers to object instances. The TaskRunner inspects input schemas and resolves annotated string values before task execution.

### Built-in Format Annotations

The system supports several format annotations out of the box:

| Format | Description | Helper Function |
| --------------------- | ----------------------------------- | -------------------------- |
| `model` | Any AI model configuration | — |
| `model:TaskName` | Model compatible with specific task | — |
| `repository:tabular` | Tabular data repository | `TypeTabularRepository()` |
| `repository:vector` | Vector storage repository | `TypeVectorRepository()` |
| `repository:document` | Document repository | `TypeDocumentRepository()` |

### Example: Using Format Annotations

```typescript
import { Task, type DataPortSchema } from "@workglow/task-graph";
import { TypeTabularRepository } from "@workglow/storage";
import { FromSchema } from "@workglow/util";

const MyTaskInputSchema = {
type: "object",
properties: {
// Model input - accepts string ID or ModelConfig object
model: {
title: "AI Model",
description: "Model for text generation",
format: "model:TextGenerationTask",
oneOf: [
{ type: "string", title: "Model ID" },
{ type: "object", title: "Model Config" },
],
},
// Repository input - uses helper function
dataSource: TypeTabularRepository({
title: "Data Source",
description: "Repository containing source data",
}),
// Regular string input (no resolution)
prompt: { type: "string", title: "Prompt" },
},
required: ["model", "dataSource", "prompt"],
} as const satisfies DataPortSchema;

type MyTaskInput = FromSchema<typeof MyTaskInputSchema>;

export class MyTask extends Task<MyTaskInput> {
static readonly type = "MyTask";
static inputSchema = () => MyTaskInputSchema;

async executeReactive(input: MyTaskInput) {
// By the time execute runs, model is a ModelConfig object
// and dataSource is an ITabularRepository instance
const { model, dataSource, prompt } = input;
// ...
}
}
```

### Creating Custom Format Resolvers

You can extend the resolution system by registering custom resolvers:

```typescript
import { registerInputResolver } from "@workglow/util";

// Register a resolver for "template:*" formats
registerInputResolver("template", async (id, format, registry) => {
const templateRepo = registry.get(TEMPLATE_REPOSITORY);
const template = await templateRepo.findById(id);
if (!template) {
throw new Error(`Template "${id}" not found`);
}
return template;
});
```

Then use it in your schemas:

```typescript
const inputSchema = {
type: "object",
properties: {
emailTemplate: {
type: "string",
format: "template:email",
title: "Email Template",
},
},
};
```

When a task runs with `{ emailTemplate: "welcome-email" }`, the resolver automatically converts it to the template object before execution.

## Job Queues and LLM tasks

We separate any long running tasks as Jobs. Jobs could potentially be run anywhere, either locally in the same thread, in separate threads, or on a remote server. A job queue will manage these for a single provider (like OpenAI, or a local Transformers.js ONNX runtime), and handle backoff, retries, etc.
Expand All @@ -148,3 +246,144 @@ Compound Tasks are not cached (though any or all of their children may be).
## Reactive Task UIs

Tasks can be reactive at a certain level. This means that they can be triggered by changes in the data they depend on, without "running" the expensive job based task runs. This is useful for a UI node editor. For example, you change a color in one task and it is propagated downstream without incurring costs for re-running the entire graph. It is like a spreadsheet where changing a cell can trigger a recalculation of other cells. This is implemented via a `runReactive()` method that is called when the data changes. Typically, the `run()` will call `runReactive()` on itself at the end of the method.

## AI and RAG Tasks

The `@workglow/ai` package provides a comprehensive set of tasks for building RAG (Retrieval-Augmented Generation) pipelines. These tasks are designed to chain together in workflows without requiring external loops.

### Document Processing Tasks

| Task | Description |
| ------------------------- | ----------------------------------------------------- |
| `StructuralParserTask` | Parses markdown/text into hierarchical document trees |
| `TextChunkerTask` | Splits text into chunks with configurable strategies |
| `HierarchicalChunkerTask` | Token-aware chunking that respects document structure |
| `TopicSegmenterTask` | Segments text by topic using heuristics or embeddings |
| `DocumentEnricherTask` | Adds summaries and entities to document nodes |

### Vector and Embedding Tasks

| Task | Description |
| ----------------------- | ---------------------------------------------- |
| `TextEmbeddingTask` | Generates embeddings using configurable models |
| `ChunkToVectorTask` | Transforms chunks to vector store format |
| `VectorStoreUpsertTask` | Stores vectors in a repository |
| `VectorStoreSearchTask` | Searches vectors by similarity |
| `VectorQuantizeTask` | Quantizes vectors for storage efficiency |

### Retrieval and Generation Tasks

| Task | Description |
| ------------------------ | --------------------------------------------- |
| `QueryExpanderTask` | Expands queries for better retrieval coverage |
| `HybridSearchTask` | Combines vector and full-text search |
| `RerankerTask` | Reranks search results for relevance |
| `HierarchyJoinTask` | Enriches results with parent context |
| `ContextBuilderTask` | Builds context for LLM prompts |
| `RetrievalTask` | Orchestrates end-to-end retrieval |
| `TextQuestionAnswerTask` | Generates answers from context |
| `TextGenerationTask` | General text generation |

### Chainable RAG Pipeline Example

Tasks chain together through compatible input/output schemas:

```typescript
import { Workflow } from "@workglow/task-graph";
import { InMemoryVectorRepository } from "@workglow/storage";

const vectorRepo = new InMemoryVectorRepository();
await vectorRepo.setupDatabase();

// Document ingestion pipeline
await new Workflow()
.structuralParser({
text: markdownContent,
title: "My Document",
format: "markdown",
})
.documentEnricher({
generateSummaries: true,
extractEntities: true,
})
.hierarchicalChunker({
maxTokens: 512,
overlap: 50,
strategy: "hierarchical",
})
.textEmbedding({
model: "Xenova/all-MiniLM-L6-v2",
})
.chunkToVector()
.vectorStoreUpsert({
repository: vectorRepo,
})
.run();
```

### Retrieval Pipeline Example

```typescript
const answer = await new Workflow()
.textEmbedding({
text: query,
model: "Xenova/all-MiniLM-L6-v2",
})
.vectorStoreSearch({
repository: vectorRepo,
topK: 10,
})
.reranker({
query,
topK: 5,
})
.contextBuilder({
format: "markdown",
maxLength: 2000,
})
.textQuestionAnswer({
question: query,
model: "Xenova/LaMini-Flan-T5-783M",
})
.run();
```

### Hierarchical Document Structure

Documents are represented as trees with typed nodes:

```typescript
type DocumentNode =
| DocumentRootNode // Root of document
| SectionNode // Headers, structural sections
| ParagraphNode // Text blocks
| SentenceNode // Fine-grained (optional)
| TopicNode; // Detected topic segments

// Each node contains:
interface BaseNode {
nodeId: string; // Deterministic content-based ID
range: { start: number; end: number };
text: string;
enrichment?: {
summary?: string;
entities?: Entity[];
keywords?: string[];
};
}
```

### Task Data Flow

Each task passes through what the next task needs:

| Task | Passes Through | Adds |
| --------------------- | ----------------------- | ------------------------------------ |
| `structuralParser` | - | `docId`, `documentTree`, `nodeCount` |
| `documentEnricher` | `docId`, `documentTree` | `summaryCount`, `entityCount` |
| `hierarchicalChunker` | `docId` | `chunks`, `text[]`, `count` |
| `textEmbedding` | (implicit) | `vector[]` |
| `chunkToVector` | - | `ids[]`, `vectors[]`, `metadata[]` |
| `vectorStoreUpsert` | - | `count`, `ids` |

This design eliminates the need for external loops - the entire pipeline chains together naturally.
6 changes: 3 additions & 3 deletions examples/cli/src/TaskCLI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export function AddBaseCommands(program: Command) {
? (await getGlobalModelRepository().findByName(options.model))?.model_id
: (await getGlobalModelRepository().findModelsByTask("TextEmbeddingTask"))?.map(
(m) => m.model_id
);
)?.[0];

if (!model) {
program.error(`Unknown model ${options.model}`);
Expand All @@ -67,7 +67,7 @@ export function AddBaseCommands(program: Command) {
? (await getGlobalModelRepository().findByName(options.model))?.model_id
: (await getGlobalModelRepository().findModelsByTask("TextSummaryTask"))?.map(
(m) => m.model_id
);
)?.[0];
if (!model) {
program.error(`Unknown model ${options.model}`);
} else {
Expand All @@ -92,7 +92,7 @@ export function AddBaseCommands(program: Command) {
? (await getGlobalModelRepository().findByName(options.model))?.model_id
: (await getGlobalModelRepository().findModelsByTask("TextRewriterTask"))?.map(
(m) => m.model_id
);
)?.[0];
if (!model) {
program.error(`Unknown model ${options.model}`);
} else {
Expand Down
2 changes: 1 addition & 1 deletion packages/ai-provider/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ const task = new TextEmbeddingTask({
});

const result = await task.execute();
// result.vector: TypedArray - Vector embedding
// result.vector: Vector - Vector embedding
```

**Text Translation:**
Expand Down
Loading