diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 87ae7e0b..26b46113 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -26,7 +26,12 @@ from apify.storage_clients._apify._models import ApifyRequestQueueMetadata # In shared mode, there is a propagation delay between operations so we use test helper -# `call_with_exp_backoff` for exponential backoff. See https://github.com/apify/apify-sdk-python/issues/808. +# `call_with_exp_backoff` for exponential backoff. The delay also means that the relative order of requests +# added or reclaimed close together is not guaranteed, so order-sensitive tests wait for propagation and +# relax exact-order assertions in shared mode. See https://github.com/apify/apify-sdk-python/issues/808. + +# How long to wait in shared mode for forefront operations to propagate to the queue head before fetching. +_SHARED_MODE_PROPAGATION_DELAY = 10 async def test_add_and_fetch_requests( @@ -162,6 +167,11 @@ async def test_forefront_requests_ordering( total_count = await rq.get_total_count() Actor.log.info(f'Added 2 forefront requests, total in queue: {total_count}') + if rq_access_mode == 'shared': + # Wait for the forefront requests to propagate to the queue head, so that no regular request is fetched + # before them (see the note on shared-mode propagation delay at the top of this module). + await asyncio.sleep(_SHARED_MODE_PROPAGATION_DELAY) + # Fetch requests and verify order. fetched_urls = [] while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): @@ -170,17 +180,30 @@ async def test_forefront_requests_ordering( await rq.mark_request_as_handled(next_request) # Forefront requests should come first (in reverse order of addition) - expected_order = [ - 'https://example.com/priority2', - 'https://example.com/priority1', - 'https://example.com/1', - 'https://example.com/2', - 'https://example.com/3', - ] - assert fetched_urls == expected_order, ( - f'fetched_urls={fetched_urls}', - f'expected_order={expected_order}', - ) + assert len(fetched_urls) == 5, f'fetched_urls={fetched_urls}' + if rq_access_mode == 'shared': + # In shared mode, the relative order of requests added close together is not guaranteed (see the note at + # the top of this module). Only verify that the forefront requests come before the regular ones. + assert set(fetched_urls[:2]) == {'https://example.com/priority1', 'https://example.com/priority2'}, ( + f'fetched_urls={fetched_urls}' + ) + assert set(fetched_urls[2:]) == { + 'https://example.com/1', + 'https://example.com/2', + 'https://example.com/3', + }, f'fetched_urls={fetched_urls}' + else: + expected_order = [ + 'https://example.com/priority2', + 'https://example.com/priority1', + 'https://example.com/1', + 'https://example.com/2', + 'https://example.com/3', + ] + assert fetched_urls == expected_order, ( + f'fetched_urls={fetched_urls}', + f'expected_order={expected_order}', + ) async def test_request_unique_key_behavior( @@ -301,6 +324,11 @@ async def test_request_reclaim_with_forefront( await rq.reclaim_request(first_request, forefront=True) Actor.log.info('Request reclaimed to forefront') + if rq_access_mode == 'shared': + # Wait for the reclaimed request to propagate to the queue head, so that no regular request is fetched + # before it (see the note on shared-mode propagation delay at the top of this module). + await asyncio.sleep(_SHARED_MODE_PROPAGATION_DELAY) + # The reclaimed request should be fetched first again. A reclaimed request may take a moment to reappear # at the queue head (eventually-consistent API state), even in single mode, so poll until it does. next_request = await poll_until_condition(rq.fetch_next_request, lambda result: result is not None) @@ -797,7 +825,13 @@ async def test_request_ordering_with_mixed_operations( # Fetch one and reclaim to forefront. request1 = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert request1 is not None, f'request1={request1}' - assert request1.url == 'https://example.com/1', f'request1.url={request1.url}' + if rq_access_mode == 'shared': + # In shared mode, the relative order of requests added close together is not guaranteed (see the note at + # the top of this module), so the first fetched request may be either of the two initial ones. + assert request1.url in {'https://example.com/1', 'https://example.com/2'}, f'request1.url={request1.url}' + else: + assert request1.url == 'https://example.com/1', f'request1.url={request1.url}' + remaining_url = ({'https://example.com/1', 'https://example.com/2'} - {request1.url}).pop() Actor.log.info(f'Fetched request: {request1.url}') await rq.reclaim_request(request1, forefront=True) @@ -807,6 +841,11 @@ async def test_request_ordering_with_mixed_operations( await rq.add_request('https://example.com/priority', forefront=True) Actor.log.info('Added new forefront request') + if rq_access_mode == 'shared': + # Wait for the forefront operations to propagate to the queue head, so that the regular request is not + # fetched before them (see the note on shared-mode propagation delay at the top of this module). + await asyncio.sleep(_SHARED_MODE_PROPAGATION_DELAY) + # Fetch all requests and verify forefront behavior. urls_ordered = list[str]() while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): @@ -818,12 +857,25 @@ async def test_request_ordering_with_mixed_operations( # Verify that we got all 3 requests assert len(urls_ordered) == 3, f'len(urls_ordered)={len(urls_ordered)}' - assert urls_ordered[0] == 'https://example.com/priority', f'urls_ordered[0]={urls_ordered[0]}' - assert urls_ordered[1] == request1.url, ( - f'urls_ordered[1]={urls_ordered[1]}', - f'request1.url={request1.url}', + if rq_access_mode == 'shared': + # In shared mode, the relative order of two forefront operations performed close together (the reclaim of + # request1 and the add of the priority request) is not guaranteed (see the note at the top of this + # module). Only verify that both forefront requests come before the regular one. + assert set(urls_ordered[:2]) == {'https://example.com/priority', request1.url}, ( + f'urls_ordered={urls_ordered}', + f'request1.url={request1.url}', + ) + else: + assert urls_ordered[0] == 'https://example.com/priority', f'urls_ordered[0]={urls_ordered[0]}' + assert urls_ordered[1] == request1.url, ( + f'urls_ordered[1]={urls_ordered[1]}', + f'request1.url={request1.url}', + ) + + assert urls_ordered[2] == remaining_url, ( + f'urls_ordered[2]={urls_ordered[2]}', + f'remaining_url={remaining_url}', ) - assert urls_ordered[2] == 'https://example.com/2', f'urls_ordered[2]={urls_ordered[2]}' Actor.log.info('Request ordering verified successfully')