Skip to content

Update outdated work stealing docs regarding worker restrictions#9214

Merged
crusaderky merged 1 commit into
dask:mainfrom
kevinziroldi:fix-work-stealing-docs
May 4, 2026
Merged

Update outdated work stealing docs regarding worker restrictions#9214
crusaderky merged 1 commit into
dask:mainfrom
kevinziroldi:fix-work-stealing-docs

Conversation

@kevinziroldi

Copy link
Copy Markdown
Contributor

Closes #9129

The documentation in work-stealing.rst stated that work stealing is not enabled for tasks that have been specifically restricted to run on particular workers.
After the closure of #3069 (but also #1389 and #2740), work stealing has been enabled in presence of restrictions, provided the thief meets them.
I updated the text to accurately reflect the modern scheduler behavior.

Since the person who opened #9129 was still not sure about this behavior, I verified it on the current version of Dask. Here are the local test scripts I used to prove that stealing correctly occurs among a restricted subset of workers and in presence of resource restrictions:

import time
from dask.distributed import Client, LocalCluster
import logging

def variable_task(x):
    # Tasks 0-24 are very slow, Tasks 25-49 are very fast
    if x < 25:
        time.sleep(1.0)
    else:
        time.sleep(0.01)
    return x

if __name__ == "__main__":
    # Create cluster and client
    cluster = LocalCluster(n_workers=4, threads_per_worker=1)
    client = Client(cluster)
    
    # Get all workers and pick a strict subset of two
    workers = list(client.scheduler_info()['workers'].keys())
    subset = workers[:2]
    
    print(f"Restricting tasks strictly to: {subset}\n")

    # Map 50 imbalanced tasks
    futures = client.map(
        variable_task, 
        range(50), 
        workers=subset, 
        allow_other_workers=False
    )
    
    # Wait for everything to finish
    client.gather(futures)
    
    # Suppress the Dask "Removing worker caused cluster to lose tasks" warnings
    logging.getLogger("distributed.scheduler").setLevel(logging.ERROR)
    
    # Check the scheduler's internal event log for stealing
    stealing_events = client.get_events('stealing')
    
    if stealing_events:
        print(f"\nRecorded {len(stealing_events)} work stealing events.")
        
        # Extract individual steal tuples
        all_steals = []
        for event in stealing_events:
            event_data = event[-1]
            if isinstance(event_data, tuple) and event_data[0] == 'request':
                all_steals.extend(event_data[1])

        for steal in all_steals:
            print(f"Stolen Task: {steal[2]}")
            print(f"Victim:      {steal[4]}")
            print(f"Thief:       {steal[6]}")
        
        if all_steals:
            # Verify no thief was outside our subset
            thieves = set(s[6] for s in all_steals)
            outside_thieves = thieves - set(subset)
            
            print(f"\nSubset allowed: {subset}")
            print(f"Unique Thieves: {list(thieves)}")
            
            if not outside_thieves:
                print("All thieves were strictly within the allowed subset!")
            else:
                print("Error: A worker outside the subset stole a task!")
    else:
        print("No stealing occurred.")
import time
import logging
from dask.distributed import Client

def variable_task(x):
    if x < 25:
        time.sleep(1.0)
    else:
        time.sleep(0.01)
    return x

if __name__ == "__main__":
    logging.getLogger("distributed.scheduler").setLevel(logging.ERROR)

    client = Client("tcp://127.0.0.1:8786")

    info = client.scheduler_info()["workers"]
    name_to_addr = {meta["name"]: addr for addr, meta in info.items()}

    w1 = name_to_addr["w1"]
    w2 = name_to_addr["w2"]
    w3 = name_to_addr["w3"]

    print("Workers:")
    print("w1 =", w1)
    print("w2 =", w2)
    print("w3 =", w3, "(ENERGY=0, should never steal)")

    futures = client.map(
        variable_task,
        range(50),
        resources={"ENERGY": 10},
    )
    client.gather(futures)

    stealing_events = client.get_events("stealing")

    all_steals = []
    for event in stealing_events:
        payload = event[-1]
        if isinstance(payload, tuple) and payload[0] == "request":
            all_steals.extend(payload[1])

    print(f"\nTotal steals found: {len(all_steals)}")

    thieves = {s[6] for s in all_steals}
    print("Unique thief addresses:", thieves)

    if w3 in thieves:
        print("Error: w3 stole work despite ENERGY=0")
    else:
        print("Success: w3 never stole work")

    client.close()

@kevinziroldi kevinziroldi requested a review from fjetter as a code owner March 23, 2026 10:01
@github-actions

Copy link
Copy Markdown
Contributor

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    31 files  ±0      31 suites  ±0   11h 25m 36s ⏱️ + 8m 11s
 4 113 tests ±0   4 000 ✅  - 3    104 💤 ±0  9 ❌ +4 
59 636 runs  ±0  57 154 ✅  - 3  2 473 💤 ±0  9 ❌ +4 

For more details on these failures, see this check.

Results for commit 6338599. ± Comparison against base commit 595d591.

@crusaderky crusaderky left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

@crusaderky crusaderky merged commit 21bbe27 into dask:main May 4, 2026
28 of 38 checks passed
@crusaderky crusaderky added the documentation Improve or add to documentation label May 4, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

documentation Improve or add to documentation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

documentation not updated following #3029

2 participants