Skip to content

Commit

Permalink
(#64) Support tasks retry & propagate raised exception
Browse files Browse the repository at this point in the history
For documentation, see:
1. Docstring of `task.task`
2. Tests in `tests.test_task` e.g. `test_retry_as_per_task_definition`
3. Sample usages in `tests.apps.simple_app` e.g. `append_to_file`

Changelist:

* Formalize serialization and deserialization
* Serialize & deserialize exceptions correctly
* Encapsulate retry & retry_on in a new dict 'options'
* Implement serde for AsyncResult
* Ensure generated file deleted after test
* Add jsonpickle to toml file
* Exclude `if TYPE_CHECKING:` from coverage
* Add test for singleton
* Add logging for worker
* Wrap all constants inside `Config` class

Signed-off-by: Imran Ariffin <[email protected]>
  • Loading branch information
imranariffin committed Dec 3, 2023
1 parent f02f6ce commit 4d4c0c0
Show file tree
Hide file tree
Showing 23 changed files with 901 additions and 134 deletions.
6 changes: 6 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ source = src/
parallel = True
concurrency = multiprocessing
sigterm = True

[report]
exclude_lines =
pragma: no cover
if TYPE_CHECKING:
if t.TYPE_CHECKING:
12 changes: 8 additions & 4 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -257,16 +257,16 @@ ignored-parents=
max-args=10

# Maximum number of attributes for a class (see R0902).
max-attributes=7
max-attributes=10

# Maximum number of boolean expressions in an if statement (see R0916).
max-bool-expr=5

# Maximum number of branch for function / method body.
max-branches=12
max-branches=15

# Maximum number of locals for function / method body.
max-locals=15
max-locals=20

# Maximum number of parents for a class (see R0901).
max-parents=7
Expand Down Expand Up @@ -458,7 +458,11 @@ good-names=i,
a,
b,
c,
n
n,
id,
e,
s,
on

# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
Expand Down
6 changes: 5 additions & 1 deletion .pylintrc.tests
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,12 @@ good-names=i,
a,
b,
c,
e,
n,
ls
t,
ls,
fo,
fi

# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
Expand Down
13 changes: 12 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Attach",
"type": "python",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5678
},
"justMyCode": false
},
{
"name": "Main",
"type": "python",
Expand Down Expand Up @@ -31,7 +41,8 @@
"-s",
],
"request": "launch",
"console": "integratedTerminal"
"console": "integratedTerminal",
"justMyCode": false
},
{
"name": "Sample Worker (Simple App)",
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import asyncio
import aiotaskq


@aiotaskq.task
@aiotaskq.task()
def some_task(b: int) -> int:
# Some task with high cpu usage
def _naive_fib(n: int) -> int:
Expand Down Expand Up @@ -132,22 +132,22 @@ import asyncio
from aiotaskq import task


@task
@task()
def task_1(*args, **kwargs):
pass


@task
@task()
def task_2(*args, **kwargs):
pass


@task
@task()
def task_3(*args, **kwargs):
pass


@task
@task()
def task_4(*args, **kwargs):
pass

Expand Down
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ build-backend = "setuptools.build_meta"
requires-python = ">=3.9"
dependencies = [
"aioredis >= 2.0.0, < 2.1.0",
"jsonpickle >= 3.0.0, < 3.1.0",
"tomlkit >= 0.11.0, < 0.12.0",
"typer >= 0.4.0, < 0.5.0",
]
name = "aiotaskq"
version = "0.0.12"
version = "0.0.13"
readme = "README.md"
description = "A simple asynchronous task queue"
authors = [
Expand All @@ -28,7 +29,7 @@ license = { file = "LICENSE" }

[project.optional-dependencies]
dev = [
"black >= 22.1.0, < 22.2.0",
"black >= 22.2.0, < 23.0.0",
"coverage >= 6.4.0, < 6.5.0",
"mypy >= 0.931, < 1.0",
"mypy-extensions >= 0.4.0, < 0.5.0",
Expand Down
2 changes: 1 addition & 1 deletion src/aiotaskq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import aiotaskq
@aiotaskq.task
@aiotaskq.task()
def some_task(b: int) -> int:
# Some task with high cpu usage
def _naive_fib(n: int) -> int:
Expand Down
5 changes: 4 additions & 1 deletion src/aiotaskq/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

#!/usr/bin/env python

import logging
import typing as t

import typer

from . import __version__
from .constants import Config
from .interfaces import ConcurrencyType
from .worker import Defaults, run_worker_forever
from . import __version__

cli = typer.Typer()
logging.basicConfig(level=Config.log_level())


def _version_callback(value: bool):
Expand Down
64 changes: 60 additions & 4 deletions src/aiotaskq/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,61 @@
"""Module to define and store all constants used across the library."""
"""
Module to define and store all constants used across the library.
REDIS_URL = "redis://127.0.0.1:6379"
TASKS_CHANNEL = "channel:tasks"
RESULTS_CHANNEL_TEMPLATE = "channel:results:{task_id}"
The public object from this module is `Config`. This object wraps
all the constants, which include:
- Variables
- Environment variables
- Static methods that return constant values
"""

import logging
from os import environ

from .interfaces import SerializationType

_REDIS_URL = "redis://127.0.0.1:6379"
_TASKS_CHANNEL = "channel:tasks"
_RESULTS_CHANNEL_TEMPLATE = "channel:results:{task_id}"


class Config:
"""
Provide configuration values.
These include:
- Variables
- Environment variables
- Static methods that return constant values
"""

@staticmethod
def serialization_type() -> SerializationType:
"""Return the serialization type as provided via env var AIOTASKQ_SERIALIZATION."""
s: str | None = environ.get("AIOTASKQ_SERIALIZATION", SerializationType.DEFAULT.value)
return SerializationType[s.upper()]

@staticmethod
def log_level() -> int:
"""Return the log level as provided via env var LOG_LEVEL."""
level: int = int(environ.get("AIOTASKQ_LOG_LEVEL", logging.DEBUG))
return level

@staticmethod
def broker_url() -> str:
"""
Return the broker url as provided via env var BROKER_URL.
Defaults to "redis://127.0.0.1:6379".
"""
broker_url: str = environ.get("BROKER_URL", _REDIS_URL)
return broker_url

@staticmethod
def tasks_channel() -> str:
"""Return the channel name used for transporting task requests on the broker."""
return _TASKS_CHANNEL

@staticmethod
def results_channel_template() -> str:
"""Return the template chnnale name used for transporting task results on the broker."""
return _RESULTS_CHANNEL_TEMPLATE
54 changes: 54 additions & 0 deletions src/aiotaskq/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,57 @@ class IWorkerManager(IWorker):
"""

concurrency_manager: IConcurrencyManager


class SerializationType(str, enum.Enum):
"""Specify the types of serialization supported."""

JSON = "json"
DEFAULT = JSON


T = t.TypeVar("T")


class ISerialization(t.Protocol, t.Generic[T]):
"""Define the interface required to serialize and deserialize a generic object."""

@classmethod
def serialize(cls, obj: T) -> bytes:
"""Serialize any object into bytes."""

@classmethod
def deserialize(cls, klass: type[T], s: bytes) -> T:
"""Deserialize bytes into any object."""


class RetryOptions(t.TypedDict):
"""
Specify the available retry options.
max_retries int | None: The number times to keep retrying the execution of the task
until the task executes successfully. Counting starts from
0 so if max_retries = 2 for example, then the task will execute
1 + 2 times (1 time for first execution, 2 times for re-try).
on tuple[type[Exception], ...]: The tuple of exception classes to retry on. The task will
will only be retried if that exception that is raised
during task execution is an instance of one of the listed
exception classes.
Examples:
If on=(Exception,) then any kind of exception will trigger
a retry.
If on=(ExceptionA, ExceptionB,) and during task
execution ExceptionC was raised, then retry is not triggered.
"""

max_retries: int | None
on: tuple[type[Exception], ...]


class TaskOptions(t.TypedDict):
"""Specify the options available for a task."""

retry: RetryOptions | None
Loading

0 comments on commit 4d4c0c0

Please sign in to comment.