diff --git a/distributed/scheduler.py b/distributed/scheduler.py index ad37548766b..cc00d815ce3 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -6697,7 +6697,7 @@ async def get_story(self, keys=()): transition_story = story - def reschedule(self, key=None, worker=None): + def reschedule(self, key: str, worker=None, *, stimulus_id: str): """Reschedule a task Things may have shifted and this task may now be better suited to run @@ -6715,7 +6715,7 @@ def reschedule(self, key=None, worker=None): return if worker and ts.processing_on.address != worker: return - self.transitions({key: "released"}, f"reschedule-{time()}") + self.transitions({key: "released"}, stimulus_id) ##################### # Utility functions # diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 784448c588c..e709e624e05 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1481,7 +1481,7 @@ async def test_reschedule(c, s, a, b): await asyncio.sleep(0.001) for future in x: - s.reschedule(key=future.key) + s.reschedule(future.key, stimulus_id="test") # Worker b gets more of the original tasks await wait(x) @@ -1492,7 +1492,7 @@ async def test_reschedule(c, s, a, b): @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2) async def test_reschedule_warns(c, s, a, b): with captured_logger(logging.getLogger("distributed.scheduler")) as sched: - s.reschedule(key="__this-key-does-not-exist__") + s.reschedule("__this-key-does-not-exist__", stimulus_id="test") assert "not found on the scheduler" in sched.getvalue() assert "Aborting reschedule" in sched.getvalue() @@ -3301,7 +3301,7 @@ async def test_set_restrictions(c, s, a, b): await f s.set_restrictions(worker={f.key: a.address}) assert s.tasks[f.key].worker_restrictions == {a.address} - s.reschedule(f) + s.reschedule(f, stimulus_id="test") await f diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 9fc2420d6ba..fa61b0403d0 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1109,7 +1109,7 @@ async def test_steal_reschedule_reset_in_flight_occupancy(c, s, *workers): steal.move_task_request(victim_ts, wsA, wsB) - s.reschedule(victim_key) + s.reschedule(victim_key, stimulus_id="test") await c.gather(futs1) del futs1 @@ -1175,7 +1175,7 @@ async def test_reschedule_concurrent_requests_deadlock(c, s, *workers): steal.move_task_request(victim_ts, wsA, wsB) s.set_restrictions(worker={victim_key: [wsB.address]}) - s.reschedule(victim_key) + s.reschedule(victim_key, stimulus_id="test") assert wsB == victim_ts.processing_on # move_task_request is not responsible for respecting worker restrictions steal.move_task_request(victim_ts, wsB, wsC)