Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1783,7 +1783,7 @@ def __init__(self, scheduler, **kwargs):
self.last = 0
self.source = ColumnDataSource(
{
"time": [time() - 20, time()],
"time": [time() - 60, time()],

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is obviously unrelated. this slows down the stealing dashboard a bit. 20s rolling window is very fast

"level": [0, 15],
"color": ["white", "white"],
"duration": [0, 0],
Expand Down Expand Up @@ -1828,7 +1828,7 @@ def convert(self, msgs):
"""Convert a log message to a glyph"""
total_duration = 0
for msg in msgs:
time, level, key, duration, sat, occ_sat, idl, occ_idl = msg
time, level, key, duration, sat, occ_sat, idl, occ_idl = msg[:8]
total_duration += duration

try:
Expand Down
10 changes: 7 additions & 3 deletions distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,18 +237,22 @@ def steal_time_ratio(self, ts: TaskState) -> tuple[float, int] | tuple[None, Non
assert ts.processing_on
ws = ts.processing_on
compute_time = ws.processing[ts]
if compute_time < 0.005: # 5ms, just give up

if not compute_time:
# occupancy/ws.proccessing[ts] is only allowed to be zero for
# long running tasks which cannot be stolen
assert ts in ws.long_running
return None, None

nbytes = ts.get_nbytes_deps()
transfer_time = nbytes / self.scheduler.bandwidth + LATENCY
cost_multiplier = transfer_time / compute_time
if cost_multiplier > 100:
return None, None

level = int(round(log2(cost_multiplier) + 6))
if level < 1:
level = 1
elif level >= len(self.cost_multipliers):
return None, None

return cost_multiplier, level

Expand Down
26 changes: 26 additions & 0 deletions distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from operator import mul
from time import sleep

import numpy as np
import pytest
from tlz import sliding_window

Expand Down Expand Up @@ -1350,3 +1351,28 @@ def test_steal_worker_state(ws_with_running_task):
assert "x" not in ws.tasks
assert "x" not in ws.data
assert ws.available_resources == {"R": 1}


@pytest.mark.slow()
@gen_cluster(nthreads=[("", 1)] * 4, client=True)
async def test_steal_very_fast_tasks(c, s, *workers):
# Ensure that very fast tasks are allowed to be stolen
root = dask.delayed(lambda n: "x" * n)(
dask.utils.parse_bytes("1MiB"), dask_key_name="root"
)

@dask.delayed
def func(*args):
import time

time.sleep(0.002)

ntasks = 1000
results = [func(root, i) for i in range(ntasks)]
futs = c.compute(results)
await c.gather(futs)

ntasks_per_worker = np.array([len(w.data) for w in workers])
ideal = ntasks / len(workers)
assert (ntasks_per_worker > ideal * 0.5).all(), (ideal, ntasks_per_worker)
assert (ntasks_per_worker < ideal * 1.5).all(), (ideal, ntasks_per_worker)