Skip to content

Flaky test_close_gracefully #4201

Description

@jrbourbeau

test_close_gracefully failed on a CI build over in #4192, which contained seemingly unrelated changes.

Traceback:
2020-10-29T04:49:56.4843472Z ____________________________ test_close_gracefully ____________________________
2020-10-29T04:49:56.4843705Z 
2020-10-29T04:49:56.4844237Z     def test_func():
2020-10-29T04:49:56.4844564Z         result = None
2020-10-29T04:49:56.4844855Z         workers = []
2020-10-29T04:49:56.4845243Z         with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
2020-10-29T04:49:56.4845816Z     
2020-10-29T04:49:56.4846289Z             async def coro():
2020-10-29T04:49:56.4847778Z                 with dask.config.set(config):
2020-10-29T04:49:56.4848155Z                     s = False
2020-10-29T04:49:56.4848442Z                     for i in range(5):
2020-10-29T04:49:56.4848724Z                         try:
2020-10-29T04:49:56.4849002Z                             s, ws = await start_cluster(
2020-10-29T04:49:56.4849356Z                                 nthreads,
2020-10-29T04:49:56.4849673Z                                 scheduler,
2020-10-29T04:49:56.4849969Z                                 loop,
2020-10-29T04:49:56.4850291Z                                 security=security,
2020-10-29T04:49:56.4850651Z                                 Worker=Worker,
2020-10-29T04:49:56.4851060Z                                 scheduler_kwargs=scheduler_kwargs,
2020-10-29T04:49:56.4851514Z                                 worker_kwargs=worker_kwargs,
2020-10-29T04:49:56.4851979Z                             )
2020-10-29T04:49:56.4852296Z                         except Exception as e:
2020-10-29T04:49:56.4852657Z                             logger.error(
2020-10-29T04:49:56.4853081Z                                 "Failed to start gen_cluster, retrying",
2020-10-29T04:49:56.4853480Z                                 exc_info=True,
2020-10-29T04:49:56.4853748Z                             )
2020-10-29T04:49:56.4854070Z                             await asyncio.sleep(1)
2020-10-29T04:49:56.4854418Z                         else:
2020-10-29T04:49:56.4854660Z                             workers[:] = ws
2020-10-29T04:49:56.4854972Z                             args = [s] + workers
2020-10-29T04:49:56.4855260Z                             break
2020-10-29T04:49:56.4855549Z                     if s is False:
2020-10-29T04:49:56.4856004Z                         raise Exception("Could not start cluster")
2020-10-29T04:49:56.4856561Z                     if client:
2020-10-29T04:49:56.4857046Z                         c = await Client(
2020-10-29T04:49:56.4857364Z                             s.address,
2020-10-29T04:49:56.4857618Z                             loop=loop,
2020-10-29T04:49:56.4857951Z                             security=security,
2020-10-29T04:49:56.4858330Z                             asynchronous=True,
2020-10-29T04:49:56.4858685Z                             **client_kwargs,
2020-10-29T04:49:56.4858962Z                         )
2020-10-29T04:49:56.4859226Z                         args = [c] + args
2020-10-29T04:49:56.4859494Z                     try:
2020-10-29T04:49:56.4859788Z                         future = func(*args)
2020-10-29T04:49:56.4860060Z                         if timeout:
2020-10-29T04:49:56.4860962Z                             future = asyncio.wait_for(future, timeout)
2020-10-29T04:49:56.4861619Z                         result = await future
2020-10-29T04:49:56.4861962Z                         if s.validate:
2020-10-29T04:49:56.4862307Z                             s.validate_state()
2020-10-29T04:49:56.4862624Z                     finally:
2020-10-29T04:49:56.4863017Z                         if client and c.status not in ("closing", "closed"):
2020-10-29T04:49:56.4863530Z                             await c._close(fast=s.status == Status.closed)
2020-10-29T04:49:56.4863951Z                         await end_cluster(s, workers)
2020-10-29T04:49:56.4864428Z                         await asyncio.wait_for(cleanup_global_workers(), 1)
2020-10-29T04:49:56.4864809Z     
2020-10-29T04:49:56.4865045Z                     try:
2020-10-29T04:49:56.4865358Z                         c = await default_client()
2020-10-29T04:49:56.4865728Z                     except ValueError:
2020-10-29T04:49:56.4866108Z                         pass
2020-10-29T04:49:56.4866410Z                     else:
2020-10-29T04:49:56.4866738Z                         await c._close(fast=True)
2020-10-29T04:49:56.4867035Z     
2020-10-29T04:49:56.4867315Z                     def get_unclosed():
2020-10-29T04:49:56.4867744Z                         return [c for c in Comm._instances if not c.closed()] + [
2020-10-29T04:49:56.4868130Z                             c
2020-10-29T04:49:56.4868495Z                             for c in _global_clients.values()
2020-10-29T04:49:56.4868899Z                             if c.status != "closed"
2020-10-29T04:49:56.4869190Z                         ]
2020-10-29T04:49:56.4869372Z     
2020-10-29T04:49:56.4869608Z                     try:
2020-10-29T04:49:56.4869889Z                         start = time()
2020-10-29T04:49:56.4870205Z                         while time() < start + 5:
2020-10-29T04:49:56.4870901Z                             gc.collect()
2020-10-29T04:49:56.4871385Z                             if not get_unclosed():
2020-10-29T04:49:56.4871668Z                                 break
2020-10-29T04:49:56.4871973Z                             await asyncio.sleep(0.05)
2020-10-29T04:49:56.4872313Z                         else:
2020-10-29T04:49:56.4872780Z                             if allow_unclosed:
2020-10-29T04:49:56.4873165Z                                 print(f"Unclosed Comms: {get_unclosed()}")
2020-10-29T04:49:56.4873509Z                             else:
2020-10-29T04:49:56.4873910Z                                 raise RuntimeError("Unclosed Comms", get_unclosed())
2020-10-29T04:49:56.4874317Z                     finally:
2020-10-29T04:49:56.4874658Z                         Comm._instances.clear()
2020-10-29T04:49:56.4875046Z                         _global_clients.clear()
2020-10-29T04:49:56.4875662Z     
2020-10-29T04:49:56.4875972Z                     return result
2020-10-29T04:49:56.4876244Z     
2020-10-29T04:49:56.4876538Z >           result = loop.run_sync(
2020-10-29T04:49:56.4876976Z                 coro, timeout=timeout * 2 if timeout else timeout
2020-10-29T04:49:56.4877340Z             )
2020-10-29T04:49:56.4877483Z 
2020-10-29T04:49:56.4877807Z distributed\utils_test.py:953: 
2020-10-29T04:49:56.4878148Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2020-10-29T04:49:56.4879367Z C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py:532: in run_sync
2020-10-29T04:49:56.4880323Z     return future_cell[0].result()
2020-10-29T04:49:56.4880862Z distributed\utils_test.py:912: in coro
2020-10-29T04:49:56.4881322Z     result = await future
2020-10-29T04:49:56.4881967Z C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py:491: in wait_for
2020-10-29T04:49:56.4882612Z     return fut.result()
2020-10-29T04:49:56.4883043Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2020-10-29T04:49:56.4883363Z 
2020-10-29T04:49:56.4883772Z c = <Client: not connected>
2020-10-29T04:49:56.4884424Z s = <Scheduler: "tcp://127.0.0.1:56676" processes: 0 cores: 0>
2020-10-29T04:49:56.4885172Z a = <Worker: 'tcp://127.0.0.1:56677', 0, Status.closed, stored: 1, running: -1/1, ready: 0, comm: 0, waiting: 0>
2020-10-29T04:49:56.4886279Z b = <Worker: 'tcp://127.0.0.1:56679', 1, Status.closed, stored: 2, running: 2/2, ready: 96, comm: 0, waiting: 0>
2020-10-29T04:49:56.4886942Z 
2020-10-29T04:49:56.4887385Z     @gen_cluster(client=True)
2020-10-29T04:49:56.4887907Z     async def test_close_gracefully(c, s, a, b):
2020-10-29T04:49:56.4888634Z         futures = c.map(slowinc, range(200), delay=0.1)
2020-10-29T04:49:56.4889306Z         while not b.data:
2020-10-29T04:49:56.4889773Z             await asyncio.sleep(0.1)
2020-10-29T04:49:56.4890187Z     
2020-10-29T04:49:56.4890509Z         mem = set(b.data)
2020-10-29T04:49:56.4891067Z         proc = [ts for ts in b.tasks.values() if ts.state == "executing"]
2020-10-29T04:49:56.4891584Z     
2020-10-29T04:49:56.4892279Z         await b.close_gracefully()
2020-10-29T04:49:56.4893608Z     
2020-10-29T04:49:56.4894190Z         assert b.status == Status.closed
2020-10-29T04:49:56.4894753Z         assert b.address not in s.workers
2020-10-29T04:49:56.4895312Z         assert mem.issubset(set(a.data))
2020-10-29T04:49:56.4895749Z         for ts in proc:
2020-10-29T04:49:56.4896333Z >           assert ts.state in ("processing", "memory")
2020-10-29T04:49:56.4897037Z E           AssertionError: assert 'executing' in ('processing', 'memory')
2020-10-29T04:49:56.4898093Z E            +  where 'executing' = <Task 'slowinc-a373197c122b45e71acceb95cd7cb992' executing>.state
2020-10-29T04:49:56.4899687Z 
2020-10-29T04:49:56.4900293Z distributed\tests\test_worker.py:1574: AssertionError
2020-10-29T04:49:56.4900923Z ---------------------------- Captured stderr call -----------------------------
2020-10-29T04:49:56.4901550Z distributed.scheduler - INFO - Clear task state
2020-10-29T04:49:56.4902359Z distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:56676
2020-10-29T04:49:56.4903196Z distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8787
2020-10-29T04:49:56.4904078Z distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:56677
2020-10-29T04:49:56.4905095Z distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:56677
2020-10-29T04:49:56.4905807Z distributed.worker - INFO -          dashboard at:            127.0.0.1:56678
2020-10-29T04:49:56.4907957Z distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:56676
2020-10-29T04:49:56.4908728Z distributed.worker - INFO - -------------------------------------------------
2020-10-29T04:49:56.4909379Z distributed.worker - INFO -               Threads:                          1
2020-10-29T04:49:56.4910397Z distributed.worker - INFO -                Memory:                    7.52 GB
2020-10-29T04:49:56.4911480Z distributed.worker - INFO -       Local Directory: D:\a\distributed\distributed\dask-worker-space\worker-b2i2kj7f
2020-10-29T04:49:56.4912410Z distributed.worker - INFO - -------------------------------------------------
2020-10-29T04:49:56.4913416Z distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:56679
2020-10-29T04:49:56.4914250Z distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:56679
2020-10-29T04:49:56.4915004Z distributed.worker - INFO -          dashboard at:            127.0.0.1:56680
2020-10-29T04:49:56.4915768Z distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:56676
2020-10-29T04:49:56.4916927Z distributed.worker - INFO - -------------------------------------------------
2020-10-29T04:49:56.4917587Z distributed.worker - INFO -               Threads:                          2
2020-10-29T04:49:56.4918290Z distributed.worker - INFO -                Memory:                    7.52 GB
2020-10-29T04:49:56.4919199Z distributed.worker - INFO -       Local Directory: D:\a\distributed\distributed\dask-worker-space\worker-xvpum_ry
2020-10-29T04:49:56.4920137Z distributed.worker - INFO - -------------------------------------------------
2020-10-29T04:49:56.4921130Z distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:56677', name: 0, memory: 0, processing: 0>
2020-10-29T04:49:56.4922485Z distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:56677
2020-10-29T04:49:56.4923428Z distributed.core - INFO - Starting established connection
2020-10-29T04:49:56.4924229Z distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:56676
2020-10-29T04:49:56.4925860Z distributed.worker - INFO - -------------------------------------------------
2020-10-29T04:49:56.4927836Z distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:56679', name: 1, memory: 0, processing: 0>
2020-10-29T04:49:56.4928907Z distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:56679
2020-10-29T04:49:56.4929793Z distributed.core - INFO - Starting established connection
2020-10-29T04:49:56.4930737Z distributed.core - INFO - Starting established connection
2020-10-29T04:49:56.4931601Z distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:56676
2020-10-29T04:49:56.4932466Z distributed.worker - INFO - -------------------------------------------------
2020-10-29T04:49:56.4933182Z distributed.core - INFO - Starting established connection
2020-10-29T04:49:56.4934569Z distributed.scheduler - INFO - Receive client connection: Client-1c9286f8-19a2-11eb-9780-000d3ae52c36
2020-10-29T04:49:56.4936055Z distributed.core - INFO - Starting established connection
2020-10-29T04:49:56.4936921Z distributed.worker - INFO - Closing worker gracefully: tcp://127.0.0.1:56679
2020-10-29T04:49:56.4937692Z distributed.worker - INFO - Comm closed
2020-10-29T04:49:56.4938568Z distributed.scheduler - INFO - Retire workers {<Worker 'tcp://127.0.0.1:56679', name: 1, memory: 1, processing: 99>}
2020-10-29T04:49:56.4939523Z distributed.scheduler - INFO - Moving 1 keys to other workers
2020-10-29T04:49:56.4940339Z distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:56679
2020-10-29T04:49:56.4941303Z distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:56679', name: 1, memory: 1, processing: 99>
2020-10-29T04:49:56.4942379Z distributed.core - INFO - Removing comms to tcp://127.0.0.1:56679
2020-10-29T04:49:56.4943384Z distributed.scheduler - INFO - Remove client Client-1c9286f8-19a2-11eb-9780-000d3ae52c36
2020-10-29T04:49:56.4944621Z distributed.scheduler - INFO - Remove client Client-1c9286f8-19a2-11eb-9780-000d3ae52c36
2020-10-29T04:49:56.4945929Z distributed.scheduler - INFO - Close client connection: Client-1c9286f8-19a2-11eb-9780-000d3ae52c36
2020-10-29T04:49:56.4947067Z distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:56677
2020-10-29T04:49:56.4948022Z distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:56677', name: 0, memory: 0, processing: 0>
2020-10-29T04:49:56.4948947Z distributed.core - INFO - Removing comms to tcp://127.0.0.1:56677
2020-10-29T04:49:56.4949701Z distributed.scheduler - INFO - Lost all workers
2020-10-29T04:49:56.4950440Z distributed.worker - INFO - Comm closed
2020-10-29T04:49:56.4951171Z distributed.scheduler - INFO - Scheduler closing...
2020-10-29T04:49:56.5213428Z distributed.scheduler - INFO - Scheduler closing all comms

Metadata

Metadata

Assignees

No one assigned

    Labels

    flaky test: Intermittent failures on CI.

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions