Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 70 additions & 18 deletions tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
from apify.storage_clients._apify._models import ApifyRequestQueueMetadata

# In shared mode, there is a propagation delay between operations so we use test helper
# `call_with_exp_backoff` for exponential backoff. See https://github.com/apify/apify-sdk-python/issues/808.
# `call_with_exp_backoff` for exponential backoff. The delay also means that the relative order of requests
# added or reclaimed close together is not guaranteed, so order-sensitive tests wait for propagation and
# relax exact-order assertions in shared mode. See https://github.com/apify/apify-sdk-python/issues/808.

# How long to wait in shared mode for forefront operations to propagate to the queue head before fetching.
_SHARED_MODE_PROPAGATION_DELAY = 10


async def test_add_and_fetch_requests(
Expand Down Expand Up @@ -162,6 +167,11 @@ async def test_forefront_requests_ordering(
total_count = await rq.get_total_count()
Actor.log.info(f'Added 2 forefront requests, total in queue: {total_count}')

if rq_access_mode == 'shared':
# Wait for the forefront requests to propagate to the queue head, so that no regular request is fetched
# before them (see the note on shared-mode propagation delay at the top of this module).
await asyncio.sleep(_SHARED_MODE_PROPAGATION_DELAY)

# Fetch requests and verify order.
fetched_urls = []
while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode):
Expand All @@ -170,17 +180,30 @@ async def test_forefront_requests_ordering(
await rq.mark_request_as_handled(next_request)

# Forefront requests should come first (in reverse order of addition)
expected_order = [
'https://example.com/priority2',
'https://example.com/priority1',
'https://example.com/1',
'https://example.com/2',
'https://example.com/3',
]
assert fetched_urls == expected_order, (
f'fetched_urls={fetched_urls}',
f'expected_order={expected_order}',
)
assert len(fetched_urls) == 5, f'fetched_urls={fetched_urls}'
if rq_access_mode == 'shared':
# In shared mode, the relative order of requests added close together is not guaranteed (see the note at
# the top of this module). Only verify that the forefront requests come before the regular ones.
assert set(fetched_urls[:2]) == {'https://example.com/priority1', 'https://example.com/priority2'}, (
f'fetched_urls={fetched_urls}'
)
assert set(fetched_urls[2:]) == {
'https://example.com/1',
'https://example.com/2',
'https://example.com/3',
}, f'fetched_urls={fetched_urls}'
else:
expected_order = [
'https://example.com/priority2',
'https://example.com/priority1',
'https://example.com/1',
'https://example.com/2',
'https://example.com/3',
]
assert fetched_urls == expected_order, (
f'fetched_urls={fetched_urls}',
f'expected_order={expected_order}',
)


async def test_request_unique_key_behavior(
Expand Down Expand Up @@ -301,6 +324,11 @@ async def test_request_reclaim_with_forefront(
await rq.reclaim_request(first_request, forefront=True)
Actor.log.info('Request reclaimed to forefront')

if rq_access_mode == 'shared':
# Wait for the reclaimed request to propagate to the queue head, so that no regular request is fetched
# before it (see the note on shared-mode propagation delay at the top of this module).
await asyncio.sleep(_SHARED_MODE_PROPAGATION_DELAY)

# The reclaimed request should be fetched first again. A reclaimed request may take a moment to reappear
# at the queue head (eventually-consistent API state), even in single mode, so poll until it does.
next_request = await poll_until_condition(rq.fetch_next_request, lambda result: result is not None)
Expand Down Expand Up @@ -797,7 +825,13 @@ async def test_request_ordering_with_mixed_operations(
# Fetch one and reclaim to forefront.
request1 = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode)
assert request1 is not None, f'request1={request1}'
assert request1.url == 'https://example.com/1', f'request1.url={request1.url}'
if rq_access_mode == 'shared':
# In shared mode, the relative order of requests added close together is not guaranteed (see the note at
# the top of this module), so the first fetched request may be either of the two initial ones.
assert request1.url in {'https://example.com/1', 'https://example.com/2'}, f'request1.url={request1.url}'
else:
assert request1.url == 'https://example.com/1', f'request1.url={request1.url}'
remaining_url = ({'https://example.com/1', 'https://example.com/2'} - {request1.url}).pop()
Actor.log.info(f'Fetched request: {request1.url}')

await rq.reclaim_request(request1, forefront=True)
Expand All @@ -807,6 +841,11 @@ async def test_request_ordering_with_mixed_operations(
await rq.add_request('https://example.com/priority', forefront=True)
Actor.log.info('Added new forefront request')

if rq_access_mode == 'shared':
# Wait for the forefront operations to propagate to the queue head, so that the regular request is not
# fetched before them (see the note on shared-mode propagation delay at the top of this module).
await asyncio.sleep(_SHARED_MODE_PROPAGATION_DELAY)

# Fetch all requests and verify forefront behavior.
urls_ordered = list[str]()
while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode):
Expand All @@ -818,12 +857,25 @@ async def test_request_ordering_with_mixed_operations(
# Verify that we got all 3 requests
assert len(urls_ordered) == 3, f'len(urls_ordered)={len(urls_ordered)}'

assert urls_ordered[0] == 'https://example.com/priority', f'urls_ordered[0]={urls_ordered[0]}'
assert urls_ordered[1] == request1.url, (
f'urls_ordered[1]={urls_ordered[1]}',
f'request1.url={request1.url}',
if rq_access_mode == 'shared':
# In shared mode, the relative order of two forefront operations performed close together (the reclaim of
# request1 and the add of the priority request) is not guaranteed (see the note at the top of this
# module). Only verify that both forefront requests come before the regular one.
assert set(urls_ordered[:2]) == {'https://example.com/priority', request1.url}, (
f'urls_ordered={urls_ordered}',
f'request1.url={request1.url}',
)
else:
assert urls_ordered[0] == 'https://example.com/priority', f'urls_ordered[0]={urls_ordered[0]}'
assert urls_ordered[1] == request1.url, (
f'urls_ordered[1]={urls_ordered[1]}',
f'request1.url={request1.url}',
)

assert urls_ordered[2] == remaining_url, (
f'urls_ordered[2]={urls_ordered[2]}',
f'remaining_url={remaining_url}',
)
assert urls_ordered[2] == 'https://example.com/2', f'urls_ordered[2]={urls_ordered[2]}'
Actor.log.info('Request ordering verified successfully')


Expand Down
Loading