Since updating pytest-celery to version 1.0.0, I cannot get async Celery tasks to execute in tests.
...
@pytest.fixture(autouse=True)
def setUp(self, transactional_db):
    ...

@shared_task
def some_celery_task(a, b):
    return a + b

@pytest.mark.celery(task_always_eager=True)
def test_celery_task(
    self,
    celery_app,
    celery_worker,
):
    res = some_celery_task.apply_async((2, 3))
    res.collect()
When I checked which Celery apps are used in the test, I saw that the app created by the celery_app fixture, the app used by the celery_worker fixture, the app the task is bound to, and the app used by my project are all different apps.
(
<Celery ecomm at 0x109eb6f30>,
<Celery celery.tests at 0x113bdbc20>,
<Celery celery_test_app at 0x113c50c20>,
<Celery celery_test_app at 0x113bd86e0>
)
I do not have any custom Celery-related setup for pytest.
I accidentally figured out that the test works when written as below. As you can see, I had to explicitly set the celery_app created by the fixture as the current app and remove the celery_worker fixture. With those two changes the task was executed. (I changed the test a little bit, but those changes have no effect on the issue itself.)
@pytest.mark.celery(task_always_eager=True)
def test_celery_task(
    self,
    celery_app,
):
    celery_app.set_current()
    res = some_celery_task.apply_async((2, 3), queue="celery")
    a = res.get()
    assert a == 5
The celery_worker is not compatible with the v0.0.0 API.
Basically, the v0.0.0 testing infrastructure is in celery. The v1.0.0 is a new framework, to which celery_worker belongs, that is based on the source code of the pytest-celery repo itself.
So it appears you were trying to include a fixture from the new framework in a test that uses the v0.0.0 infra, which is why there were conflicts.
celery_app.set_current()
However, this shouldn’t be needed if you remove the celery_worker fixture.
P.S. I know it’s a bit confusing, but that’s how it works right now.
Closing this issue - let me know if it needs reopening.