Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 33 additions & 20 deletions tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,34 @@

if TYPE_CHECKING:
from apify_client import ApifyClient, ApifyClientAsync
from apify_client._resource_clients.request_queue import RequestQueueClient, RequestQueueClientAsync
from apify_client._typeddicts import RequestDict, RequestDraftDeleteDict, RequestDraftDict


async def ensure_queue_is_populated(
rq_client: RequestQueueClient | RequestQueueClientAsync,
*,
expected_count: int,
is_async: bool,
) -> None:
"""Poll the queue until `expected_count` requests are visible.

Uses `list_head` (without side effects) so polling does not lock items, which would otherwise
lead to an ambiguous count of actually-locked requests in tests that exercise locking.
"""
head_response: RequestQueueHead | None = None
for _ in range(5):
await maybe_sleep(1, is_async=is_async)
result = await maybe_await(rq_client.list_head(limit=expected_count))
assert isinstance(result, RequestQueueHead)
head_response = result
if len(head_response.items) == expected_count:
break

assert head_response is not None
assert len(head_response.items) == expected_count


async def test_request_queue_collection_list(client: ApifyClient | ApifyClientAsync) -> None:
"""Test listing request queues."""
rq_page = await maybe_await(client.request_queues().list(limit=10))
Expand Down Expand Up @@ -411,17 +436,11 @@ async def test_request_queue_list_and_lock_head(client: ApifyClient | ApifyClien
rq_client.add_request({'url': f'https://example.com/lock-{i}', 'unique_key': f'lock-{i}'})
)

# Poll until requests are available for locking (eventual consistency)
lock_response: LockedRequestQueueHead | None = None
for _ in range(5):
await maybe_sleep(1, is_async=is_async)
result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60)))
assert isinstance(result, LockedRequestQueueHead)
lock_response = result
if len(lock_response.items) == 3:
break
await ensure_queue_is_populated(rq_client, expected_count=5, is_async=is_async)

assert lock_response is not None
result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60)))
assert isinstance(result, LockedRequestQueueHead)
lock_response = result
assert len(lock_response.items) == 3

# Verify requests are locked
Expand Down Expand Up @@ -522,17 +541,11 @@ async def test_request_queue_unlock_requests(client: ApifyClient | ApifyClientAs
rq_client.add_request({'url': f'https://example.com/unlock-{i}', 'unique_key': f'unlock-{i}'})
)

# Poll until requests are available for locking (eventual consistency)
lock_response: LockedRequestQueueHead | None = None
for _ in range(5):
await maybe_sleep(1, is_async=is_async)
result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60)))
assert isinstance(result, LockedRequestQueueHead)
lock_response = result
if len(lock_response.items) == 3:
break
await ensure_queue_is_populated(rq_client, expected_count=5, is_async=is_async)

assert lock_response is not None
result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60)))
assert isinstance(result, LockedRequestQueueHead)
lock_response = result
assert len(lock_response.items) == 3

# Unlock all requests
Expand Down
Loading