Skip to content

flaky test_drop_with_waiter #7092

Description

@quasiben

Seen in #7089

https://github.com/dask/distributed/actions/runs/3154045824/jobs/5132914151

____________________________ test_drop_with_waiter _____________________________

args = (), kwds = {}

    @wraps(func)
    def inner(*args, **kwds):
        with self._recreate_cm():
>           return func(*args, **kwds)

../../../miniconda3/envs/dask-distributed/lib/python3.10/contextlib.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.10/contextlib.py:79: in inner
    return func(*args, **kwds)
distributed/utils_test.py:1112: in test_func
    return _run_and_close_tornado(async_fn_outer)
distributed/utils_test.py:387: in _run_and_close_tornado
    return asyncio.run(inner_fn())
../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/runners.py:44: in run
    return loop.run_until_complete(main)
../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/base_events.py:646: in run_until_complete
    return future.result()
distributed/utils_test.py:384: in inner_fn
    return await async_fn(*args, **kwargs)
distributed/utils_test.py:1109: in async_fn_outer
    return await asyncio.wait_for(async_fn(), timeout=timeout * 2)
../../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/tasks.py:445: in wait_for
    return fut.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    async def async_fn():
        result = None
        with dask.config.set(config):
            async with _cluster_factory() as (s, workers), _client_factory(
                s
            ) as c:
                args = [s] + workers
                if c is not None:
                    args = [c] + args
                try:
                    coro = func(*args, *outer_args, **kwargs)
                    task = asyncio.create_task(coro)
                    coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
                    result = await coro2
                    validate_state(s, *workers)
    
                except asyncio.TimeoutError:
                    assert task
                    buffer = io.StringIO()
                    # This stack indicates where the coro/test is suspended
                    task.print_stack(file=buffer)
    
                    if cluster_dump_directory:
                        await dump_cluster_state(
                            s=s,
                            ws=workers,
                            output_dir=cluster_dump_directory,
                            func_name=func.__name__,
                        )
    
                    task.cancel()
                    while not task.cancelled():
                        await asyncio.sleep(0.01)
    
                    # Hopefully, the hang has been caused by inconsistent
                    # state, which should be much more meaningful than the
                    # timeout
                    validate_state(s, *workers)
    
                    # Remove as much of the traceback as possible; it's
                    # uninteresting boilerplate from utils_test and asyncio
                    # and not from the code being tested.
>                   raise asyncio.TimeoutError(
                        f"Test timeout after {timeout}s.\n"
                        "========== Test stack trace starts here ==========\n"
                        f"{buffer.getvalue()}"
                    ) from None
E                   asyncio.exceptions.TimeoutError: Test timeout after 30s.
E                   ========== Test stack trace starts here ==========
E                   Stack for <Task pending name='Task-67488' coro=<test_drop_with_waiter() running at /Users/runner/work/distributed/distributed/distributed/tests/test_active_memory_manager.py:317> wait_for=<Future pending cb=[Task.task_wakeup()]>> (most recent call last):
E                     File "/Users/runner/work/distributed/distributed/distributed/tests/test_active_memory_manager.py", line 317, in test_drop_with_waiter
E                       await asyncio.sleep(0.01)

distributed/utils_test.py:1046: TimeoutError

Metadata

Metadata

Assignees

No one assigned

    Labels

    flaky test: Intermittent failures on CI.

    Type

    No type

    Fields

    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions