Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1027,13 +1027,6 @@ properties:
type: boolean
description: Enter Python Debugger on scheduling error

- transition-counter-max:
- oneOf:
- - enum: [false]
- - type: integer
- description: Cause the scheduler or workers to break if they reach this
- number of transitions
-
system-monitor:
type: object
description: |
Expand Down
4 changes: 0 additions & 4 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,6 @@ distributed:
log-length: 10000 # default length of logs to keep in memory
log-format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
pdb-on-err: False # enter debug mode on scheduling error
- # Cause scheduler and workers to break if they reach this many transitions.
- # Used to debug infinite transition loops.
- # Note: setting this will cause healthy long-running services to eventually break.
- transition-counter-max: False
system-monitor:
interval: 500ms
event-loop: tornado
Expand Down
7 changes: 4 additions & 3 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,7 @@ def __init__(
unrunnable: set,
validate: bool,
plugins: Iterable[SchedulerPlugin] = (),
+ transition_counter_max: int | Literal[False] = False,
**kwargs, # Passed verbatim to Server.__init__()
):
logger.info("State start")
Expand Down Expand Up @@ -1338,9 +1339,7 @@ def __init__(
/ 2.0
)
self.transition_counter = 0
- self.transition_counter_max = dask.config.get(
- "distributed.admin.transition-counter-max"
- )
+ self.transition_counter_max = transition_counter_max

@property
def memory(self) -> MemoryState:
Expand Down Expand Up @@ -2878,6 +2877,7 @@ def __init__(
preload_argv=(),
plugins=(),
contact_address=None,
+ transition_counter_max=False,
**kwargs,
):
self._setup_logging(logger)
Expand Down Expand Up @@ -3109,6 +3109,7 @@ def __init__(
unrunnable=unrunnable,
validate=validate,
plugins=plugins,
+ transition_counter_max=transition_counter_max,
)
ServerNode.__init__(
self,
Expand Down
3 changes: 2 additions & 1 deletion distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3264,7 +3264,8 @@ async def test_transition_counter_max_worker(c, s, a):
@gen_cluster(
client=True,
nthreads=[("", 1)],
- config={"distributed.admin.transition-counter-max": False},
+ scheduler_kwargs={"transition_counter_max": False},
+ worker_kwargs={"transition_counter_max": False},
)
async def test_disable_transition_counter_max(c, s, a, b):
"""Test that the cluster can run indefinitely if transition_counter_max is disabled.
Expand Down
6 changes: 4 additions & 2 deletions distributed/tests/test_stress.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ async def test_stress_steal(c, s, *workers):
nthreads=[("", 1)] * 10,
client=True,
timeout=180,
- config={"distributed.admin.transition-counter-max": 500_000},
+ scheduler_kwargs={"transition_counter_max": 500_000},
+ worker_kwargs={"transition_counter_max": 500_000},
)
async def test_close_connections(c, s, *workers):
da = pytest.importorskip("dask.array")
Expand Down Expand Up @@ -287,7 +288,8 @@ async def test_no_delay_during_large_transfer(c, s, w):
client=True,
Worker=Nanny,
nthreads=[("", 2)] * 6,
- config={"distributed.admin.transition-counter-max": 500_000},
+ scheduler_kwargs={"transition_counter_max": 500_000},
+ worker_kwargs={"transition_counter_max": 500_000},
)
async def test_chaos_rechunk(c, s, *workers):
s.allowed_failures = 10000
Expand Down
15 changes: 12 additions & 3 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -989,12 +989,21 @@ async def test_foo(scheduler, worker1, worker2, pytest_fixture_a, pytest_fixture
timeout = 3600

scheduler_kwargs = merge(
- {"dashboard": False, "dashboard_address": ":0"}, scheduler_kwargs
+ dict(
+ dashboard=False,
+ dashboard_address=":0",
+ transition_counter_max=50_000,
+ ),
+ scheduler_kwargs,
)
worker_kwargs = merge(
- {"memory_limit": system.MEMORY_LIMIT, "death_timeout": 15}, worker_kwargs
+ dict(
+ memory_limit=system.MEMORY_LIMIT,
+ death_timeout=15,
+ transition_counter_max=50_000,
+ ),
+ worker_kwargs,
)
- config = merge({"distributed.admin.transition-counter-max": 50_000}, config)

def _(func):
if not iscoroutinefunction(func):
Expand Down
5 changes: 2 additions & 3 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ def __init__(
lifetime: Any | None = None,
lifetime_stagger: Any | None = None,
lifetime_restart: bool | None = None,
+ transition_counter_max: int | Literal[False] = False,
###################################
# Parameters to WorkerMemoryManager
memory_limit: str | float = "auto",
Expand Down Expand Up @@ -610,9 +611,7 @@ def __init__(
self.validate = validate

self.transition_counter = 0
- self.transition_counter_max = dask.config.get(
- "distributed.admin.transition-counter-max"
- )
+ self.transition_counter_max = transition_counter_max
self.incoming_transfer_log = deque(maxlen=100000)
self.incoming_count = 0
self.outgoing_transfer_log = deque(maxlen=100000)
Expand Down