From 77454f5fd6f48cf50871c6e7adef16338f0b1ff1 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 16 May 2022 14:49:03 +0100 Subject: [PATCH] Remove transition-counter-max from config --- distributed/distributed-schema.yaml | 7 ------- distributed/distributed.yaml | 4 ---- distributed/scheduler.py | 7 ++++--- distributed/tests/test_scheduler.py | 3 ++- distributed/tests/test_stress.py | 6 ++++-- distributed/utils_test.py | 15 ++++++++++++--- distributed/worker.py | 5 ++--- 7 files changed, 24 insertions(+), 23 deletions(-) diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 86e9acf6b6b..942eab8ff9a 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -1027,13 +1027,6 @@ properties: type: boolean description: Enter Python Debugger on scheduling error - transition-counter-max: - oneOf: - - enum: [false] - - type: integer - description: Cause the scheduler or workers to break if they reach this - number of transitions - system-monitor: type: object description: | diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 649ddfe2b33..74d59addb35 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -277,10 +277,6 @@ distributed: log-length: 10000 # default length of logs to keep in memory log-format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s' pdb-on-err: False # enter debug mode on scheduling error - # Cause scheduler and workers to break if they reach this many transitions. - # Used to debug infinite transition loops. - # Note: setting this will cause healthy long-running services to eventually break. - transition-counter-max: False system-monitor: interval: 500ms event-loop: tornado diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 110c50bbe47..c9ddc2ec67c 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1283,6 +1283,7 @@ def __init__( unrunnable: set, validate: bool, plugins: Iterable[SchedulerPlugin] = (), + transition_counter_max: int | Literal[False] = False, **kwargs, # Passed verbatim to Server.__init__() ): logger.info("State start") @@ -1338,9 +1339,7 @@ def __init__( / 2.0 ) self.transition_counter = 0 - self.transition_counter_max = dask.config.get( - "distributed.admin.transition-counter-max" - ) + self.transition_counter_max = transition_counter_max @property def memory(self) -> MemoryState: @@ -2878,6 +2877,7 @@ def __init__( preload_argv=(), plugins=(), contact_address=None, + transition_counter_max=False, **kwargs, ): self._setup_logging(logger) @@ -3109,6 +3109,7 @@ def __init__( unrunnable=unrunnable, validate=validate, plugins=plugins, + transition_counter_max=transition_counter_max, ) ServerNode.__init__( self, diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index dbbf5c55ded..5c4056cfcd7 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -3264,7 +3264,8 @@ async def test_transition_counter_max_worker(c, s, a): @gen_cluster( client=True, nthreads=[("", 1)], - config={"distributed.admin.transition-counter-max": False}, + scheduler_kwargs={"transition_counter_max": False}, + worker_kwargs={"transition_counter_max": False}, ) async def test_disable_transition_counter_max(c, s, a, b): """Test that the cluster can run indefinitely if transition_counter_max is disabled. diff --git a/distributed/tests/test_stress.py b/distributed/tests/test_stress.py index 0444c536ec6..001d20c6688 100644 --- a/distributed/tests/test_stress.py +++ b/distributed/tests/test_stress.py @@ -219,7 +219,8 @@ async def test_stress_steal(c, s, *workers): nthreads=[("", 1)] * 10, client=True, timeout=180, - config={"distributed.admin.transition-counter-max": 500_000}, + scheduler_kwargs={"transition_counter_max": 500_000}, + worker_kwargs={"transition_counter_max": 500_000}, ) async def test_close_connections(c, s, *workers): da = pytest.importorskip("dask.array") @@ -287,7 +288,8 @@ async def test_no_delay_during_large_transfer(c, s, w): client=True, Worker=Nanny, nthreads=[("", 2)] * 6, - config={"distributed.admin.transition-counter-max": 500_000}, + scheduler_kwargs={"transition_counter_max": 500_000}, + worker_kwargs={"transition_counter_max": 500_000}, ) async def test_chaos_rechunk(c, s, *workers): s.allowed_failures = 10000 diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 3b46a9b6a2a..fc75ea18177 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -989,12 +989,21 @@ async def test_foo(scheduler, worker1, worker2, pytest_fixture_a, pytest_fixture timeout = 3600 scheduler_kwargs = merge( - {"dashboard": False, "dashboard_address": ":0"}, scheduler_kwargs + dict( + dashboard=False, + dashboard_address=":0", + transition_counter_max=50_000, + ), + scheduler_kwargs, ) worker_kwargs = merge( - {"memory_limit": system.MEMORY_LIMIT, "death_timeout": 15}, worker_kwargs + dict( + memory_limit=system.MEMORY_LIMIT, + death_timeout=15, + transition_counter_max=50_000, + ), + worker_kwargs, ) - config = merge({"distributed.admin.transition-counter-max": 50_000}, config) def _(func): if not iscoroutinefunction(func): diff --git a/distributed/worker.py b/distributed/worker.py index f45edb041e2..57a872455fd 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -548,6 +548,7 @@ def __init__( lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, + transition_counter_max: int | Literal[False] = False, ################################### # Parameters to WorkerMemoryManager memory_limit: str | float = "auto", @@ -610,9 +611,7 @@ def __init__( self.validate = validate self.transition_counter = 0 - self.transition_counter_max = dask.config.get( - "distributed.admin.transition-counter-max" - ) + self.transition_counter_max = transition_counter_max self.incoming_transfer_log = deque(maxlen=100000) self.incoming_count = 0 self.outgoing_transfer_log = deque(maxlen=100000)