Skip to content
Merged
24 changes: 19 additions & 5 deletions distributed/deploy/adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,20 +135,34 @@ def needs_cpu(self):
Notes
-----
Returns ``True`` if the occupancy per core is some factor larger
than ``startup_cost``.
than ``startup_cost`` and the number of tasks exceeds the number of
cores
"""
total_occupancy = self.scheduler.total_occupancy
total_cores = sum([ws.ncores for ws in self.scheduler.workers.values()])
total_cores = self.scheduler.total_ncores

if total_occupancy / (total_cores + 1e-9) > self.startup_cost * 2:
logger.info(
"CPU limit exceeded [%d occupancy / %d cores]",
total_occupancy,
total_cores,
)
return True
else:
return False

tasks_processing = 0

for w in self.scheduler.workers.values():
tasks_processing += len(w.processing)

if tasks_processing > total_cores:
logger.info(
"pending tasks exceed number of cores " "[%d tasks / %d cores]",
tasks_processing,
total_cores,
)

return True

return False

def needs_memory(self):
"""
Expand Down
2 changes: 0 additions & 2 deletions distributed/deploy/tests/test_adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from time import sleep

import pytest
from toolz import frequencies, pluck
from tornado import gen
from tornado.ioloop import IOLoop
Expand Down Expand Up @@ -331,7 +330,6 @@ def test_adapt_down():
yield cluster.close()


@pytest.mark.xfail(reason="we currently only judge occupancy, not ntasks")
@gen_test(timeout=30)
def test_no_more_workers_than_tasks():
loop = IOLoop.current()
Expand Down