diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index 890e30c027f..8c260609638 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -135,10 +135,11 @@ def needs_cpu(self): Notes ----- Returns ``True`` if the occupancy per core is some factor larger - than ``startup_cost``. + than ``startup_cost`` and the number of tasks exceeds the number of + cores """ total_occupancy = self.scheduler.total_occupancy - total_cores = sum([ws.ncores for ws in self.scheduler.workers.values()]) + total_cores = self.scheduler.total_ncores if total_occupancy / (total_cores + 1e-9) > self.startup_cost * 2: logger.info( @@ -146,9 +147,22 @@ def needs_cpu(self): total_occupancy, total_cores, ) - return True - else: - return False + + tasks_processing = 0 + + for w in self.scheduler.workers.values(): + tasks_processing += len(w.processing) + + if tasks_processing > total_cores: + logger.info( + "pending tasks exceed number of cores " "[%d tasks / %d cores]", + tasks_processing, + total_cores, + ) + + return True + + return False def needs_memory(self): """ diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index 1d8a48bf7fc..50c4f0a45a3 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -2,7 +2,6 @@ from time import sleep -import pytest from toolz import frequencies, pluck from tornado import gen from tornado.ioloop import IOLoop @@ -331,7 +330,6 @@ def test_adapt_down(): yield cluster.close() -@pytest.mark.xfail(reason="we currently only judge occupancy, not ntasks") @gen_test(timeout=30) def test_no_more_workers_than_tasks(): loop = IOLoop.current()