Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: sqs #834

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 49 additions & 5 deletions docs/core_concepts/43_preprocessors/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Here are examples of a preprocessor function in [TypeScript](../../getting_start
export async function preprocessor(
/* args from the request body (e.g. webhook/http body args, msg for ws/kafka/nats, raw_email and parsed_email for email) */
wm_trigger: {
kind: 'http' | 'email' | 'webhook' | 'websocket' | 'postgres' | 'kafka' | 'nats',
kind: 'http' | 'email' | 'webhook' | 'websocket' | 'postgres' | 'kafka' | 'nats', 'sqs',
http?: {
route: string // The route path, e.g. "/users/:id"
path: string // The actual path called, e.g. "/users/123"
Expand All @@ -47,6 +47,16 @@ export async function preprocessor(
status?: number
description?: string
length: number
},
sqs?: {
queue_url: string,
message_id?: string,
receipt_handle?: string,
attributes: Record<string, string>,
message_attributes?: Record<string, {
string_value?: string,
data_type: string
}>
}
}
) {
Expand Down Expand Up @@ -89,12 +99,24 @@ class Nats(TypedDict):
description: str | None
length: int

class MessageAttribute(TypedDict):
string_value: str | None
data_type: str

class Sqs(TypeDict):
queue_url: str
message_id: str | None
receipt_handle: str | None
attributes: dict[str, str]
message_attributes: dict[str, MessageAttribute] | None

class WmTrigger(TypedDict):
kind: Literal["http", "email", "webhook", "websocket", "postgres", "kafka", "nats"]
kind: Literal["http", "email", "webhook", "websocket", "postgres", "kafka", "nats", "sqs"]
http: Http | None
websocket: Websocket | None
kakfa: Kafka | None
nats: Nats | None
sqs: Sqs | None

def preprocessor(
# args from the request body (e.g. webhook/http body args, msg for ws/kafka/nats, raw_email and parsed_email for email)
Expand Down Expand Up @@ -127,9 +149,9 @@ The flow preprocessor takes the same arguments as the script preprocessor and sh
<TabItem value="TypeScript">
```TypeScript
export async function preprocessor(
/* args from the request body (e.g. webhook/http body args, msg for ws/kafka/nats, raw_email and parsed_email for email) */
/* args from the request body (e.g. webhook/http body args, msg for ws/kafka/nats/sqs, raw_email and parsed_email for email) */
wm_trigger: {
kind: 'http' | 'email' | 'webhook' | 'websocket' | 'postgres' | 'kafka' | 'nats',
kind: 'http' | 'email' | 'webhook' | 'websocket' | 'postgres' | 'kafka' | 'nats', 'sqs',
http?: {
route: string // The route path, e.g. "/users/:id"
path: string // The actual path called, e.g. "/users/123"
Expand All @@ -149,6 +171,16 @@ export async function preprocessor(
nats?: {
servers: string[]
subject: string
},
sqs?: {
queue_url: string,
message_id?: string,
receipt_handle?: string,
attributes: Record<string, string>,
message_attributes?: Record<string, {
string_value?: string,
data_type: string
}>
}
}
) {
Expand Down Expand Up @@ -187,12 +219,24 @@ class Nats(TypedDict):
description: str | None
length: int

class MessageAttribute(TypedDict):
string_value: str | None
data_type: str

class Sqs(TypeDict):
queue_url: str
message_id: str | None
receipt_handle: str | None
attributes: dict[str, str]
message_attributes: dict[str, MessageAttribute] | None

class WmTrigger(TypedDict):
kind: Literal["http", "email", "webhook", "websocket", "postgres", "kafka", "nats"]
kind: Literal["http", "email", "webhook", "websocket", "postgres", "kafka", "nats", "sqs"]
http: Http | None
websocket: Websocket | None
kafka: Kafka | None
nats: Nats | None
sqs: Sqs | None

def preprocessor(
# args from the request body (e.g. webhook/http body args, msg for ws/kafka/nats, raw_email and parsed_email for email)
Expand Down
98 changes: 98 additions & 0 deletions docs/core_concepts/48_sqs_triggers/index.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# SQS Triggers

Windmill can connect to an [SQS](https://aws.amazon.com/sqs/) queue and trigger runnables (scripts, flows) in response to messages received.
SQS triggers is a self-hosted Enterprise feature.

For more details on SQS, see the [AWS SQS Documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html).

![SQS triggers](./sqs_triggers.png)

---

## How to Use

- **Pick an AWS Resource**
- Select an existing AWS resource or create a new one.
- The AWS resource must have permissions to interact with SQS.

- **Select the Runnable to Execute**
- Choose the runnable (script or flow) that should be executed when a message arrives in the queue.
- The message will be passed as a JSON object to the runnable.

- **Provide an SQS Queue URL**
- Enter the **Queue URL** of the SQS queue that should trigger the runnable.
- You can find the Queue URL in the AWS Management Console under SQS.
- For more details, see the [SQS Queue URL Documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html#sqs-queue-url).

- **Choose (Optional) Message Attributes**
- Specify which message attributes should be included in the triggered event.
- These attributes can carry metadata, such as sender information or priority levels.
- For more details, see the [SQS Message Attributes Documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes).


## Example

Below are code examples demonstrating how to handle SQS messages in your Windmill scripts. You can either process messages directly in a basic script or use a preprocessor for more advanced message handling and transformation before execution.

### Basic Script Example

```TypeScript
export async function main(msg: string) {
// do something with the message
}
```

### Using a Preprocessor

If you use a [preprocessor](../43_preprocessors/index.mdx), the preprocessor function receives an SQS message with the following fields:

#### Field Descriptions

- **`queue_url`**: The URL of the SQS queue that received the message.
- **`message_id`**: A unique identifier assigned to each message by SQS.
- [More details](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html)
- **`receipt_handle`**: A token used to delete the message after processing.
- [More details](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html)
- **`attributes`**: Metadata attributes set by SQS, such as `SentTimestamp`.
- [Full list of system attributes](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-system-attributes)
- **`message_attributes`**: User-defined attributes that can be attached to the message.
- `string_value`: The string representation of the attribute value.
- `data_type`: The data type of the attribute (e.g., `"String"`, `"Number"`, `"Binary"`).
- [More details on message attributes](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes)


```TypeScript
export async function preprocessor(
msg: string,
wm_trigger: {
kind: "sqs",
sqs: {
queue_url: string,
message_id?: string,
receipt_handle?: string,
attributes: Record<string, string>,
message_attributes?: Record<string, {
string_value?: string,
data_type: string
}>
}
},
) {
// assuming the message is a JSON object
const data = JSON.parse(msg);

return {
content: data.content,
metadata: {
sentAt: wm_trigger.sqs.attributes.SentTimestamp,
messageId: wm_trigger.sqs.message_id
}
};
}

export async function main(content: string, metadata: { sentAt: string, messageId: string }) {
// Process transformed message data
console.log(`Processing message ${metadata.messageId} sent at ${metadata.sentAt}`);
console.log("Content:", content);
}
```
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 5 additions & 0 deletions docs/core_concepts/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ On top of its editors to build endpoints, flows and apps, Windmill comes with a
description="Trigger scripts and flows from NATS subjects."
href="/docs/core_concepts/nats_triggers"
/>
<DocCard
title="SQS"
description="Trigger scripts and flows from Amazon SQS."
href="/docs/core_concepts/sqs_triggers"
/>
</div>

## Windmill features
Expand Down
13 changes: 13 additions & 0 deletions docs/getting_started/8_triggers/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Triggers from external events:
- [Postgres triggers](#postgres-triggers)
- [Kafka triggers](#kafka-triggers)
- [NATS triggers](#nats-triggers)
- [SQS triggers](#sqs-triggers)
- [Scheduled polls](#scheduled-polls-scheduling--trigger-scripts)

:::info Scripts and Flows in Windmill
Expand Down Expand Up @@ -305,6 +306,18 @@ Windmill can connect to NATS servers and trigger scripts or flows when messages
/>
</div>

### SQS triggers

Windmill can connect to an Amazon SQS queues and trigger scripts or flows when messages are received. This enables event-driven processing from your AWS ecosystem. Preprocessors can transform the SQS message data before it reaches your script or flow.

<div className="grid grid-cols-2 gap-6 mb-4">
<DocCard
title="SQS triggers"
description="Trigger scripts and flows from Amazon SQS messages."
href="/docs/core_concepts/sqs_triggers"
/>
</div>

### Scheduled polls (Scheduling + Trigger scripts)

A particular use case for schedules are [Trigger scripts](../../flows/10_flow_trigger.mdx).
Expand Down
12 changes: 12 additions & 0 deletions docs/script_editor/settings.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,18 @@ Windmill can connect to NATS brokers and trigger scripts or flows when messages
/>
</div>

### SQS triggers

Windmill can connect to Amazon SQS queues and trigger scripts or flows when messages are received. This enables event-driven processing from your AWS ecosystem. Preprocessors can transform the SQS message data before it reaches your script or flow.

<div className="grid grid-cols-2 gap-6 mb-4">
<DocCard
title="SQS triggers"
description="Trigger scripts and flows from Amazon SQS messages."
href="/docs/core_concepts/sqs_triggers"
/>
</div>

### Email

Scripts and flows can be triggered by email messages sent to a specific email address, leveraging SMTP.
Expand Down
5 changes: 5 additions & 0 deletions sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ const sidebars = {
type: 'doc',
id: 'core_concepts/nats_triggers/index',
label: 'NATS'
},
{
type: 'doc',
id: 'core_concepts/sqs_triggers/index',
label: 'SQS'
}
]
},
Expand Down
12 changes: 12 additions & 0 deletions src/components/Pricing.js
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,18 @@ const sections = [
},
link: '/docs/core_concepts/nats_triggers',
},
{
name: 'SQS triggers',
tiers: {
'tier-free-selfhost': false,
'tier-enterprise-selfhost': true,
'tier-enterprise-cloud': false,
'tier-free': false,
'tier-team': false
},
link: '/docs/core_concepts/sqs_triggers',
tooltip: 'Self-hosted only'
},
{
name: 'Private Hub',
tiers: {
Expand Down