dramatiq-workflow

dramatiq-workflow allows running workflows (chains and groups of tasks) using the Python background task processing library dramatiq.

Sponsors

Outset

Motivation

While dramatiq allows running tasks in parallel via groups, and in sequence via pipelines, it does not provide a way to combine these two concepts. dramatiq-workflow aims to fill this gap and allows creating complex workflows, similar to the canvas feature in Celery.

Features

  • Define workflows with tasks running in parallel and in sequence using chains and groups.
  • Nest chains and groups of tasks to create complex workflows.
  • Schedule workflows to run in the background using dramatiq.

Note: dramatiq-workflow does not support passing the results from one task to the next one in a chain. We recommend using a database to store intermediate results if needed.
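Since results are not forwarded along a chain, one way to hand data from one task to the next is to have each task read and write rows keyed by a shared run identifier. The sketch below is only an illustration: it uses an in-memory SQLite database and plain functions standing in for dramatiq actors. The names `run_id`, `save_result`, `load_result`, and the table layout are hypothetical and not part of dramatiq-workflow.

```python
import json
import sqlite3

# Stand-in for a shared database; real workers would need a store that all
# processes can reach (an in-memory SQLite database is local to one process).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE results ("
    "run_id TEXT, step TEXT, payload TEXT, PRIMARY KEY (run_id, step))"
)


def save_result(run_id, step, value):
    """Persist the JSON-encoded result of one step of one workflow run."""
    conn.execute(
        "INSERT OR REPLACE INTO results VALUES (?, ?, ?)",
        (run_id, step, json.dumps(value)),
    )
    conn.commit()


def load_result(run_id, step):
    """Load the result a previous step stored for this workflow run."""
    row = conn.execute(
        "SELECT payload FROM results WHERE run_id = ? AND step = ?",
        (run_id, step),
    ).fetchone()
    if row is None:
        raise KeyError(f"no result for step {step!r} in run {run_id!r}")
    return json.loads(row[0])


# Plain functions standing in for two actors that run one after the other.
def fetch_numbers(run_id):
    save_result(run_id, "fetch_numbers", [1, 2, 3])


def sum_numbers(run_id):
    numbers = load_result(run_id, "fetch_numbers")
    save_result(run_id, "sum_numbers", sum(numbers))


fetch_numbers("run-42")
sum_numbers("run-42")
```

In a real workflow, both functions would be dramatiq actors, and the same run identifier would be passed to each message in the chain.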

Installation

You can install dramatiq-workflow from PyPI:

pip install dramatiq-workflow

Then, add the dramatiq-workflow middleware to your dramatiq broker:

import dramatiq
from dramatiq.rate_limits.backends import RedisBackend  # or MemcachedBackend
from dramatiq_workflow import WorkflowMiddleware

broker = dramatiq.get_broker()  # or the broker instance you configured yourself
backend = RedisBackend()  # or MemcachedBackend()
broker.add_middleware(WorkflowMiddleware(backend))

Please refer to the dramatiq documentation for details on how to set up a broker.

Example

Let's assume we want a workflow that looks like this:

             ╭────────╮  ╭────────╮
             │ Task 2 │  │ Task 5 │
          ╭──┼●      ●┼──┼●      ●┼╮
╭────────╮│  ╰────────╯  ╰────────╯│  ╭────────╮
│ Task 1 ││  ╭────────╮            │  │ Task 8 │
│       ●┼╯  │ Task 3 │            ╰──┼●       │
│       ●┼───┼●      ●┼───────────────┼●       │
│       ●┼╮  ╰────────╯             ╭─┼●       │
╰────────╯│  ╭────────╮   ╭────────╮│╭┼●       │
          │  │ Task 4 │   │ Task 6 │││╰────────╯
          ╰──┼●      ●┼───┼●      ●┼╯│
             │       ●┼╮  ╰────────╯ │
             ╰────────╯│             │
                       │  ╭────────╮ │
                       │  │ Task 7 │ │
                       ╰──┼●      ●┼─╯
                          ╰────────╯

We can define this workflow as follows:

import dramatiq
from dramatiq_workflow import Workflow, Chain, Group

@dramatiq.actor
def task1(arg1, arg2, arg3):
    print("Task 1")

@dramatiq.actor
def task2():
    print("Task 2")

# ...

workflow = Workflow(
    Chain(
        task1.message("arguments", "go", "here"),
        Group(
            Chain(
                task2.message(),
                task5.message(),
            ),
            task3.message(),
            Chain(
                task4.message(),
                Group(
                    task6.message(),
                    task7.message(),
                ),
            ),
        ),
        task8.message(),
    ),
)
workflow.run()  # Schedules the workflow to run in the background

Execution Order

In this example, the execution would look like this:

  1. Task 1 runs (with arguments "arguments", "go", and "here")
  2. Tasks 2, 3, and 4 run in parallel once Task 1 finishes
  3. Task 5 runs once Task 2 finishes
  4. Tasks 6 and 7 run in parallel once Task 4 finishes
  5. Task 8 runs once the whole group has finished, i.e. once Tasks 3, 5, 6, and 7 finish

This is a simplified example. The actual execution order may vary because tasks that can run in parallel (i.e. in a Group) are not guaranteed to run in the order they are defined in the workflow.
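To make the ordering rules concrete, the following sketch derives which tasks each task waits on from a nested chain/group structure. It is a toy model and not how dramatiq-workflow is implemented: workflows are represented as plain tuples (`("chain", ...)` and `("group", ...)`) and tasks as integers, and the `plan` helper is a hypothetical name introduced for this illustration.

```python
def plan(node, waits_on, deps):
    """Record in `deps` what each task waits on.

    Returns the set of tasks that must finish before whatever follows
    `node` is allowed to start.
    """
    if not isinstance(node, tuple):
        # A single task waits on everything that precedes it.
        deps[node] = sorted(waits_on)
        return {node}

    kind, children = node[0], node[1:]
    if kind == "chain":
        # Each child waits on the completion of the previous child.
        for child in children:
            waits_on = plan(child, waits_on, deps)
        return waits_on
    if kind == "group":
        # All children start together; the group is done when all are done.
        finished = set()
        for child in children:
            finished |= plan(child, waits_on, deps)
        return finished
    raise ValueError(f"unknown node kind: {kind!r}")


# The example workflow from above, with tasks numbered 1 to 8.
workflow = (
    "chain",
    1,
    ("group", ("chain", 2, 5), 3, ("chain", 4, ("group", 6, 7))),
    8,
)

deps = {}
plan(workflow, set(), deps)
for task in sorted(deps):
    print(f"Task {task} waits on {deps[task]}")
```

Note that Task 8 waits on every branch of the group, including the single Task 3, while the order among tasks that share the same dependencies is left open.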

Advanced Usage

WithDelay

The WithDelay class allows delaying the execution of a task or a group of tasks:

from dramatiq_workflow import Chain, Group, WithDelay, Workflow

workflow = Workflow(
    Chain(
        task1.message("arguments", "go", "here"),
        WithDelay(task2.message(), delay=1_000),
        WithDelay(
            Group(
                task3.message(),
                task4.message(),
            ),
            delay=2_000,
        ),
    )
)

In this example, Task 2 will run roughly 1 second after Task 1 finishes, and Tasks 3 and 4 will run roughly 2 seconds after Task 2 finishes. The delay is given in milliseconds.

Large Workflows

Because of how dramatiq-workflow is implemented, each task in a workflow has to know about the remaining tasks in the workflow that could potentially run after it. By default, this information is stored alongside your messages in the message queue. When a workflow has a large number of tasks, this can lead to increased memory usage in the broker and increased network traffic between the broker and the workers, especially when using Group tasks: each task in a Group can potentially be the last one to finish, so each task has to retain a copy of the remaining tasks that run after the Group.

There are a few things you can do to alleviate this issue:

  • Minimize the usage of parameters in the message method. Instead, consider using a database to store data that is required by your tasks.
  • Limit the size of groups to a reasonable number of tasks. Instead of scheduling one group with 1000 tasks, consider scheduling 10 groups with 100 tasks each and chaining them together.
  • Consider breaking down large workflows into smaller partial workflows that then schedule a subsequent workflow at the very end of the outermost Chain.
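As a rough sketch of the second suggestion, a small batching helper can split a long list of work items into fixed-size batches, each of which would become one Group. The `chunked` helper and the `my_task` actor mentioned in the comment are hypothetical names used only for illustration.

```python
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items from `items`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


item_ids = list(range(1000))
batches = list(chunked(item_ids, 100))

# With dramatiq-workflow, the batches could then be combined along these lines,
# where `my_task` is a hypothetical actor:
#   Chain(*(Group(*(my_task.message(i) for i in batch)) for batch in batches))
```

Keep in mind that chained groups run one after another, so this trades some parallelism for smaller groups.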

Compression

You can use compression to reduce the size of the messages in your queue. While dramatiq does not provide a compression implementation by default, one can be added with just a few lines of code. For example:

import dramatiq
from dramatiq.encoder import JSONEncoder, MessageData
import lz4.frame

class DramatiqLz4JSONEncoder(JSONEncoder):
    def encode(self, data: MessageData) -> bytes:
        return lz4.frame.compress(super().encode(data))

    def decode(self, data: bytes) -> MessageData:
        try:
            decompressed = lz4.frame.decompress(data)
        except RuntimeError:
            # Uncompressed data from before the switch to lz4
            decompressed = data
        return super().decode(decompressed)

dramatiq.set_encoder(DramatiqLz4JSONEncoder())
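The fallback branch matters during a rollout, when the queue may still contain messages that were enqueued before compression was enabled. The following standalone sketch shows the same pattern using only the standard library. It swaps in `zlib` for lz4 and plain `json` for dramatiq's encoder, so it illustrates the idea rather than serving as a drop-in replacement; the `encode` and `decode` function names and the sample message are made up for this example.

```python
import json
import zlib


def encode(data: dict) -> bytes:
    """Serialize to JSON and compress the result."""
    return zlib.compress(json.dumps(data).encode("utf-8"))


def decode(raw: bytes) -> dict:
    """Decompress if possible, otherwise treat the input as plain JSON."""
    try:
        payload = zlib.decompress(raw)
    except zlib.error:
        # Message was enqueued before compression was switched on.
        payload = raw
    return json.loads(payload.decode("utf-8"))


message = {"actor_name": "task1", "args": ["x"] * 500}
legacy = json.dumps(message).encode("utf-8")  # uncompressed, pre-rollout form
compressed = encode(message)

print(len(legacy), "bytes uncompressed,", len(compressed), "bytes compressed")
```

Both forms decode to the same message, so old and new messages can coexist in the queue while workers are being upgraded.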

Callback Storage

To completely eliminate the issue of large workflows being stored in your message queue, you can provide a custom callback storage backend to the WorkflowMiddleware. A callback storage backend is responsible for storing and retrieving the list of callbacks. For example, you could implement a storage backend that stores the callbacks in S3 and only stores a reference to the S3 object in the message options.

A storage backend must implement the CallbackStorage interface:

from typing import Any
from dramatiq_workflow import CallbackStorage, SerializedCompletionCallbacks

class MyS3Storage(CallbackStorage):
    def store(self, callbacks: SerializedCompletionCallbacks) -> Any:
        # ... store in S3 and return a key
        pass

    def retrieve(self, ref: Any) -> SerializedCompletionCallbacks:
        # ... retrieve from S3 using the key
        pass
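For local experimentation, the same two methods can be filled in with a file-based store. The sketch below is deliberately standalone: it does not import dramatiq-workflow, so the class only mirrors the `store`/`retrieve` shape and would need to subclass CallbackStorage in real use. It also assumes that the callbacks are JSON-serializable, and the sample `callbacks` value is a made-up placeholder rather than the library's actual data.

```python
import json
import tempfile
import uuid
from pathlib import Path


class LocalFileStorage:
    """Mirrors the store/retrieve shape; subclass CallbackStorage in real use."""

    def __init__(self, directory):
        self.directory = Path(directory)
        self.directory.mkdir(parents=True, exist_ok=True)

    def store(self, callbacks):
        # Write the callbacks to a new file and return its key as the reference.
        key = uuid.uuid4().hex
        (self.directory / f"{key}.json").write_text(json.dumps(callbacks))
        return key

    def retrieve(self, ref):
        # `ref` is whatever `store` returned earlier.
        return json.loads((self.directory / f"{ref}.json").read_text())


file_storage = LocalFileStorage(tempfile.mkdtemp())
callbacks = [["completion-id", {"placeholder": "workflow"}, True]]
ref = file_storage.store(callbacks)
```

Only the short reference ends up in the message options, while the full payload lives in the external store. A local directory is only reachable from one machine, which is why a shared store such as S3 is the more realistic choice.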

Then, you can pass an instance of your custom storage backend to the WorkflowMiddleware:

from dramatiq.rate_limits.backends import RedisBackend
from dramatiq_workflow import WorkflowMiddleware

backend = RedisBackend()
storage = MyS3Storage()  # Your custom storage backend
broker.add_middleware(WorkflowMiddleware(backend, callback_storage=storage))

Deduplicating Workflows

For convenience, dramatiq-workflow provides an abstract DedupWorkflowCallbackStorage class that you can use to separate the storage of workflows from the storage of callbacks. This is useful for deduplicating large workflow definitions that may be part of multiple callbacks, especially when chaining large groups of tasks.

To use it, you need to subclass DedupWorkflowCallbackStorage and implement the _store_workflow and _load_workflow methods:

from typing import Any
from dramatiq_workflow import DedupWorkflowCallbackStorage

class MyDedupStorage(DedupWorkflowCallbackStorage):
    def __init__(self):
        # In a real application, this would be a persistent storage like a
        # database or a distributed cache so that workers and producers can
        # both access it.
        self.__workflow_storage = {}

    def _store_workflow(self, id: str, workflow: dict) -> Any:
        # Using the `id` (which is the completion ID) to deduplicate.
        workflow_key = id
        if workflow_key not in self.__workflow_storage:
            self.__workflow_storage[workflow_key] = workflow
        return workflow_key  # Return a reference to the workflow.

    def _load_workflow(self, id: str, ref: Any) -> dict:
        # `ref` is what `_store_workflow` returned.
        return self.__workflow_storage[ref]

Barrier

dramatiq-workflow uses a barrier mechanism to keep track of the current state of a workflow. For example, every time a task in a Group is completed, the barrier is decreased by one. When the barrier reaches zero, the next task in the outer Chain is scheduled to run.

By default, dramatiq-workflow uses a custom AtMostOnceBarrier that ensures the barrier is never released more than once. When the barrier reaches zero, an additional key is set in the backend to prevent releasing the barrier again. In almost all cases, this is the desired behavior, since releasing a barrier more than once could lead to duplicate tasks being scheduled, which would have severe compounding effects in a workflow with many Group tasks.

However, there is a small chance that the barrier is never released. This can happen when the configured rate_limiter_backend loses its state or when the worker unexpectedly crashes before scheduling the next task in the workflow.
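The at-most-once idea can be sketched with a counter plus a "released" marker. This is a conceptual toy, not the library's AtMostOnceBarrier: it keeps state in a plain dict, whereas a real backend has to perform the decrement and the set-if-absent step atomically. The class name and the `create`/`arrive` methods are hypothetical.

```python
class ToyAtMostOnceBarrier:
    """Counts down arrivals and releases at most once."""

    def __init__(self, store, key):
        self.store = store
        self.key = key
        self.released_key = f"{key}:released"

    def create(self, parties):
        # One party per task that has to finish before the barrier opens.
        self.store[self.key] = parties

    def arrive(self):
        """Return True only for the single arrival that releases the barrier."""
        self.store[self.key] -= 1
        if self.store[self.key] > 0:
            return False
        if self.released_key in self.store:
            # Already released once, e.g. a task was retried and arrived again.
            return False
        self.store[self.released_key] = True
        return True


store = {}
barrier = ToyAtMostOnceBarrier(store, "group-1")
barrier.create(3)

# Three tasks finish, then one of them is delivered a second time.
outcomes = [barrier.arrive() for _ in range(4)]
print(outcomes)
```

Only the third arrival releases the barrier, and the duplicate fourth arrival is ignored. If the dict were wiped between arrivals, no arrival would ever release it, which corresponds to the lost-state failure mode described above.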

To configure a different barrier implementation such as dramatiq's default Barrier, you can pass it to the WorkflowMiddleware:

from dramatiq.rate_limits import Barrier
from dramatiq.rate_limits.backends import RedisBackend
from dramatiq_workflow import WorkflowMiddleware

backend = RedisBackend()
broker.add_middleware(WorkflowMiddleware(backend, barrier_type=Barrier))

License

This project is licensed under the MIT License. See the LICENSE file for details.
