
(#64) Support tasks retry & propagate raised exception #65

Open · wants to merge 6 commits into main from 64/support-retry-on-tasks
Conversation

imranariffin (Owner) commented Nov 27, 2023

(#64) Support tasks retry & propagate raised exception

For documentation, see:

  1. Docstring of `task.task` (which will point to `interfaces.TaskOptions` and in turn `interfaces.RetryOptions`)
  2. Tests in `tests.test_task` e.g. `test_retry_as_per_task_definition`
  3. Sample usages in `tests.apps.simple_app` e.g. `append_to_file`

Changelist:

  • Formalize serialization and deserialization
  • Serialize & deserialize exceptions correctly
  • Encapsulate `retry` & `retry_on` in a new dict `options`
  • Implement serde for `AsyncResult`
  • Ensure the generated file is deleted after the test
  • Add `jsonpickle` to the toml file
  • Exclude `if TYPE_CHECKING:` from coverage
  • Add a test for the singleton
  • Add logging for the worker
  • Wrap all constants inside the `Config` class

@codecov-commenter commented Nov 28, 2023

Codecov Report

Attention: Patch coverage is 99.76359%, with 1 line in your changes missing coverage. Please review.

Project coverage is 99.59%. Comparing base (f02f6ce) to head (5bd0afa).

Files                  Patch %   Lines
src/aiotaskq/task.py   98.66%    1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #65      +/-   ##
==========================================
+ Coverage   99.21%   99.59%   +0.38%     
==========================================
  Files          18       21       +3     
  Lines         635      988     +353     
==========================================
+ Hits          630      984     +354     
+ Misses          5        4       -1     

☔ View full report in Codecov by Sentry.

imranariffin changed the title from "64/support retry on tasks" to "(#64) Support tasks retry & propagate raised exception" on Dec 3, 2023
imranariffin marked this pull request as ready for review on December 3, 2023 04:37
imranariffin force-pushed the 64/support-retry-on-tasks branch 2 times, most recently from d09a7eb to 8a2e01a on December 3, 2023 05:32
Collaborator:

Should probably rename this to config.py

Owner Author:

Agreed

```python
return _TASKS_CHANNEL

@staticmethod
def results_channel_template() -> str:
```
Collaborator:

Maybe better to split this into two files, one for config & one for constants

Owner Author:

Hmm, maybe that's a good idea

```python
self.ready = ready
self.result = result
self.error = error
self.pubsub = PubSub.get(url=Config.broker_url(), poll_interval_s=0.01)
```
Collaborator:

Can we remove this since I think it's not used here?

Owner Author:

Makes sense, it got lost during refactoring

```python
self.pubsub = PubSub.get(url=Config.broker_url(), poll_interval_s=0.01)

@classmethod
async def from_publisher(cls, task_id: str) -> "AsyncResult":
```
Collaborator:

I feel like we should move this polling logic elsewhere & keep `AsyncResult` as a simple result definition

Owner Author (Dec 4, 2023):

I was thinking the same, but I couldn't find a way to do it without breaking this kind of interface:

```python
@task()
async def some_task() -> int:
    return 42


some_awaitable: Awaitable[int] = some_task.apply_async()
ret: int = await some_awaitable
assert ret == 42
# Or:
# ret: int = await some_task.apply_async()
# assert ret == 42
```

Let me think about this more
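
For what it's worth, one way to keep that interface while pulling the polling out of `AsyncResult` is to delegate through `__await__` to a separate poller object. A hypothetical sketch (none of these names are from the PR):

```python
import asyncio
from typing import Any, Generator

class ResultPoller:
    """Hypothetical: owns the pubsub/polling logic."""
    def __init__(self, value: Any) -> None:
        self._value = value

    async def wait(self) -> Any:
        await asyncio.sleep(0)  # stand-in for polling the result channel
        return self._value

class AsyncResult:
    """Stays a thin result definition, but remains awaitable by delegating."""
    def __init__(self, poller: ResultPoller) -> None:
        self._poller = poller

    def __await__(self) -> Generator[Any, None, Any]:
        return self._poller.wait().__await__()

async def main() -> int:
    # The `ret = await some_task.apply_async()` shape still works,
    # as long as apply_async returns an AsyncResult.
    return await AsyncResult(ResultPoller(42))

assert asyncio.run(main()) == 42
```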

```python
await pubsub.publish(channel=result_channel, message=task_result)
retry = False
error = None
retries: int = None
```
Collaborator:

Why can't we just initialize this as 0?

Owner Author (Dec 3, 2023):

I was thinking of differentiating between "there will be a first retry (0)" vs "there will be no retry at all (None)".

```python
# Retry if still within retry limit
if retry:
    async with redis.from_url(url="redis://localhost:6379") as redis_client:
        retries = int(await redis_client.get(f"retry:{task.id}") or 0)
```
Collaborator:

If `redis_client.get` keeps returning None or something, it might mean we will never stop retrying

Owner Author:

Good catch
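
One possible mitigation, sketched here with an in-memory stand-in for Redis: bump the counter with an atomic increment (Redis `INCR` returns the post-increment value) instead of reading with a 0 default, so the count always advances and the retry loop is guaranteed to terminate.

```python
store: dict[str, int] = {}  # in-memory stand-in for the Redis counter

def incr(key: str) -> int:
    """Mirrors Redis INCR: increment and return the new value, starting from 0."""
    store[key] = store.get(key, 0) + 1
    return store[key]

def should_retry(task_id: str, limit: int) -> bool:
    # The counter is bumped on every check, so unlike GET-with-a-0-default
    # it cannot stay below the limit forever.
    return incr(f"retry:{task_id}") <= limit

attempts = 0
while should_retry("abc", limit=3):
    attempts += 1
assert attempts == 3  # retries are bounded even if a prior GET misbehaved
```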
