-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(#64) Support tasks retry & propagate raised exception #65
base: main
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #65 +/- ##
==========================================
+ Coverage 99.21% 99.59% +0.38%
==========================================
Files 18 21 +3
Lines 635 988 +353
==========================================
+ Hits 630 984 +354
+ Misses 5 4 -1 ☔ View full report in Codecov by Sentry. |
aaeb820
to
bff64dc
Compare
d09a7eb
to
8a2e01a
Compare
For documentation, see: 1. Docstring of `task.task` 2. Tests in `tests.test_task` e.g. `test_retry_as_per_task_definition` 3. Sample usages in `tests.apps.simple_app` e.g. `append_to_file` Changelist: * Formalize serialization and deserialization * Serialize & deserialize exceptions correctly * Encapsulate retry & retry_on in a new dict 'options' * Implement serde for AsyncResult * Ensure generated file deleted after test * Add jsonpickle to toml file * Exclude `if TYPE_CHECKING:` from coverage * Add test for singleton * Add logging for worker * Wrap all constants inside `Config` class Signed-off-by: Imran Ariffin <[email protected]>
8a2e01a
to
4d4c0c0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should probably rename this to config.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed
return _TASKS_CHANNEL | ||
|
||
@staticmethod | ||
def results_channel_template() -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe better to split this into two files, one for config & one for constants
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm maybe that's a good idea
src/aiotaskq/task.py
Outdated
self.ready = ready | ||
self.result = result | ||
self.error = error | ||
self.pubsub = PubSub.get(url=Config.broker_url(), poll_interval_s=0.01) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove this since I think it's not used here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, got lost during refactoring
self.pubsub = PubSub.get(url=Config.broker_url(), poll_interval_s=0.01) | ||
|
||
@classmethod | ||
async def from_publisher(cls, task_id: str) -> "AsyncResult": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like we should move this polling logic elsewhere & keep AsyncResult
as a simple result definition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking the same but I couldn't find a way to do it without breaking this kind of interface:
@task()
async def some_task() -> int:
return 42
some_awaitable: Awaitable[int] = some_task.apply_async()
ret: int = await some_awaitable
assert ret == 42
# Or
# ret: int = await some_task.apply_async()
# assert ret == 42
Let me think about this more
await pubsub.publish(channel=result_channel, message=task_result) | ||
retry = False | ||
error = None | ||
retries: int = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why can't we just initiate this as 0
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking of differentiating between "there will be a first retry (0)" vs "there will be no retry at all (None)".
src/aiotaskq/worker.py
Outdated
# Retry if still within retry limit | ||
if retry: | ||
async with redis.from_url(url="redis://localhost:6379") as redis_client: | ||
retries = int(await redis_client.get(f"retry:{task.id}") or 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If redis_client.get
keeps returning None
or something, it might mean we will never stop retrying
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch
(#64) Support tasks retry & propagate raised exception
For documentation, see:
task.task
(which will point tointerfaces.TaskOptions
and in turninterfaces.RetryOptions
)tests.test_task
e.g.test_retry_as_per_task_definition
tests.apps.simple_app
e.g.append_to_file
Changelist:
if TYPE_CHECKING:
from coverageConfig
class