Trouble with mocking aiopg #146

Open
DrMartiner opened this issue Jan 5, 2020 · 0 comments

Hi everyone,

Could you help me mock aiopg's connection pool? My test fails with the error: object MagicMock can't be used in 'await' expression

import asynctest
import pytest
from asyncmock import AsyncMock

pytestmark = pytest.mark.asyncio


@pytest.fixture
def pg_pool():
    pg_pool_mock = asynctest.CoroutineMock()
    pg_pool_mock.acquire.__aenter__.cursor.__aenter__.begin.return_value = AsyncMock()

    return pg_pool_mock


async def test_do_something(pg_pool):
    await do_something(pg_pool)


# Code under test:
import asyncio

import aiopg


async def main():
    async with aiopg.create_pool("DSN string") as pg_pool:
        await asyncio.gather(*[do_something(pg_pool) for i in range(10)])


async def do_something(pg_pool):
    async with pg_pool.acquire() as connection:
        async with connection.cursor() as cursor:
            transaction = await cursor.begin()  # object MagicMock can't be used in 'await' expression
            try:
                await cursor.execute("...", [])
                await transaction.commit()
            except Exception as e:
                await transaction.rollback()

if __name__ == '__main__':
    asyncio.run(main())

Best regards,
Alex
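
One possible way past this error, sketched with the standard library's unittest.mock (Python 3.8+) rather than the asynctest and asyncmock packages used above. The chained attribute access in the fixture appears to configure attributes of the mock instead of the objects that the calls and `async with` blocks actually produce, so cursor.begin() ends up as a plain MagicMock. Each call needs `.return_value`, and each `async with` needs `.__aenter__.return_value`. The helper name make_pg_pool_mock is hypothetical, and do_something is copied from the snippet above so the example runs on its own.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


def make_pg_pool_mock():
    """Build a stand-in pool (hypothetical helper, not part of aiopg)."""
    # Methods that the code awaits must be AsyncMock instances.
    transaction = MagicMock()
    transaction.commit = AsyncMock()
    transaction.rollback = AsyncMock()

    cursor = MagicMock()
    cursor.begin = AsyncMock(return_value=transaction)
    cursor.execute = AsyncMock()

    # connection.cursor() is called and then entered with `async with`:
    # the call maps to return_value, entering maps to __aenter__.return_value.
    connection = MagicMock()
    connection.cursor.return_value.__aenter__.return_value = cursor

    # Same pattern for pg_pool.acquire().
    pool = MagicMock()
    pool.acquire.return_value.__aenter__.return_value = connection
    return pool, cursor, transaction


async def do_something(pg_pool):
    # Copied from the issue so that this sketch is self-contained.
    async with pg_pool.acquire() as connection:
        async with connection.cursor() as cursor:
            transaction = await cursor.begin()
            try:
                await cursor.execute("...", [])
                await transaction.commit()
            except Exception:
                await transaction.rollback()


def run_demo():
    pool, cursor, transaction = make_pg_pool_mock()
    asyncio.run(do_something(pool))
    # The happy path commits and never rolls back.
    cursor.execute.assert_awaited_once_with("...", [])
    transaction.commit.assert_awaited_once()
    transaction.rollback.assert_not_awaited()


if __name__ == "__main__":
    run_demo()
```

In a pytest suite the helper could be wrapped in a fixture that returns the pool. Setting `cursor.execute.side_effect` to an exception is one way to exercise the rollback branch.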
