Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(#64) Support tasks retry & propagate raised exception #65

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ source = src/
parallel = True
concurrency = multiprocessing
sigterm = True

[report]
exclude_lines =
pragma: no cover
if TYPE_CHECKING:
if t.TYPE_CHECKING:
12 changes: 8 additions & 4 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -257,16 +257,16 @@ ignored-parents=
max-args=10

# Maximum number of attributes for a class (see R0902).
max-attributes=7
max-attributes=10

# Maximum number of boolean expressions in an if statement (see R0916).
max-bool-expr=5

# Maximum number of branch for function / method body.
max-branches=12
max-branches=15

# Maximum number of locals for function / method body.
max-locals=15
max-locals=20

# Maximum number of parents for a class (see R0901).
max-parents=7
Expand Down Expand Up @@ -458,7 +458,11 @@ good-names=i,
a,
b,
c,
n
n,
id,
e,
s,
on

# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
Expand Down
6 changes: 5 additions & 1 deletion .pylintrc.tests
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,12 @@ good-names=i,
a,
b,
c,
e,
n,
ls
t,
ls,
fo,
fi

# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
Expand Down
13 changes: 12 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Attach",
"type": "python",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5678
},
"justMyCode": false
},
{
"name": "Main",
"type": "python",
Expand Down Expand Up @@ -31,7 +41,8 @@
"-s",
],
"request": "launch",
"console": "integratedTerminal"
"console": "integratedTerminal",
"justMyCode": false
},
{
"name": "Sample Worker (Simple App)",
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import asyncio
import aiotaskq


@aiotaskq.task
@aiotaskq.task()
def some_task(b: int) -> int:
# Some task with high cpu usage
def _naive_fib(n: int) -> int:
Expand Down Expand Up @@ -132,22 +132,22 @@ import asyncio
from aiotaskq import task


@task
@task()
def task_1(*args, **kwargs):
pass


@task
@task()
def task_2(*args, **kwargs):
pass


@task
@task()
def task_3(*args, **kwargs):
pass


@task
@task()
def task_4(*args, **kwargs):
pass

Expand Down
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ build-backend = "setuptools.build_meta"
requires-python = ">=3.9"
dependencies = [
"aioredis >= 2.0.0, < 2.1.0",
"jsonpickle >= 3.0.0, < 3.1.0",
"tomlkit >= 0.11.0, < 0.12.0",
"typer >= 0.4.0, < 0.5.0",
]
name = "aiotaskq"
version = "0.0.12"
version = "0.0.13"
readme = "README.md"
description = "A simple asynchronous task queue"
authors = [
Expand All @@ -28,7 +29,7 @@ license = { file = "LICENSE" }

[project.optional-dependencies]
dev = [
"black >= 22.1.0, < 22.2.0",
"black >= 22.2.0, < 23.0.0",
"coverage >= 6.4.0, < 6.5.0",
"mypy >= 0.931, < 1.0",
"mypy-extensions >= 0.4.0, < 0.5.0",
Expand Down
2 changes: 1 addition & 1 deletion src/aiotaskq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import aiotaskq
@aiotaskq.task
@aiotaskq.task()
def some_task(b: int) -> int:
# Some task with high cpu usage
def _naive_fib(n: int) -> int:
Expand Down
5 changes: 4 additions & 1 deletion src/aiotaskq/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

#!/usr/bin/env python

import logging
import typing as t

import typer

from . import __version__
from .constants import Config
from .interfaces import ConcurrencyType
from .worker import Defaults, run_worker_forever
from . import __version__

cli = typer.Typer()
logging.basicConfig(level=Config.log_level())


def _version_callback(value: bool):
Expand Down
64 changes: 60 additions & 4 deletions src/aiotaskq/constants.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably rename this to config.py

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed

Original file line number Diff line number Diff line change
@@ -1,5 +1,61 @@
"""Module to define and store all constants used across the library."""
"""
Module to define and store all constants used across the library.
REDIS_URL = "redis://127.0.0.1:6379"
TASKS_CHANNEL = "channel:tasks"
RESULTS_CHANNEL_TEMPLATE = "channel:results:{task_id}"
The public object from this module is `Config`. This object wraps
all the constants, which include:
- Variables
- Environment variables
- Static methods that return constant values
"""

import logging
from os import environ

from .interfaces import SerializationType

_REDIS_URL = "redis://127.0.0.1:6379"
_TASKS_CHANNEL = "channel:tasks"
_RESULTS_CHANNEL_TEMPLATE = "channel:results:{task_id}"


class Config:
"""
Provide configuration values.
These include:
- Variables
- Environment variables
- Static methods that return constant values
"""

@staticmethod
def serialization_type() -> SerializationType:
"""Return the serialization type as provided via env var AIOTASKQ_SERIALIZATION."""
s: str | None = environ.get("AIOTASKQ_SERIALIZATION", SerializationType.DEFAULT.value)
return SerializationType[s.upper()]

@staticmethod
def log_level() -> int:
"""Return the log level as provided via env var LOG_LEVEL."""
level: int = int(environ.get("AIOTASKQ_LOG_LEVEL", logging.DEBUG))
return level

@staticmethod
def broker_url() -> str:
"""
Return the broker url as provided via env var BROKER_URL.
Defaults to "redis://127.0.0.1:6379".
"""
broker_url: str = environ.get("BROKER_URL", _REDIS_URL)
return broker_url

@staticmethod
def tasks_channel() -> str:
"""Return the channel name used for transporting task requests on the broker."""
return _TASKS_CHANNEL

@staticmethod
def results_channel_template() -> str:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better to split this into two files, one for config & one for constants

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm maybe that's a good idea

"""Return the template chnnale name used for transporting task results on the broker."""
return _RESULTS_CHANNEL_TEMPLATE
54 changes: 54 additions & 0 deletions src/aiotaskq/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,57 @@ class IWorkerManager(IWorker):
"""

concurrency_manager: IConcurrencyManager


class SerializationType(str, enum.Enum):
"""Specify the types of serialization supported."""

JSON = "json"
DEFAULT = JSON


T = t.TypeVar("T")


class ISerialization(t.Protocol, t.Generic[T]):
"""Define the interface required to serialize and deserialize a generic object."""

@classmethod
def serialize(cls, obj: T) -> bytes:
"""Serialize any object into bytes."""

@classmethod
def deserialize(cls, klass: type[T], s: bytes) -> T:
"""Deserialize bytes into any object."""


class RetryOptions(t.TypedDict):
"""
Specify the available retry options.
max_retries int | None: The number times to keep retrying the execution of the task
until the task executes successfully. Counting starts from
0 so if max_retries = 2 for example, then the task will execute
1 + 2 times (1 time for first execution, 2 times for re-try).
on tuple[type[Exception], ...]: The tuple of exception classes to retry on. The task will
will only be retried if that exception that is raised
during task execution is an instance of one of the listed
exception classes.
Examples:
If on=(Exception,) then any kind of exception will trigger
a retry.
If on=(ExceptionA, ExceptionB,) and during task
execution ExceptionC was raised, then retry is not triggered.
"""

max_retries: int | None
on: tuple[type[Exception], ...]


class TaskOptions(t.TypedDict):
"""Specify the options available for a task."""

retry: RetryOptions | None
Loading