From 1a97a2fd8b176d858cfef2fc96c36f43afce6cc4 Mon Sep 17 00:00:00 2001 From: Michael Delgado Date: Wed, 31 Oct 2018 19:32:06 -0700 Subject: [PATCH 1/9] adaptive: recommend close workers if idle --- distributed/deploy/adaptive.py | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index 62d308c6e22..35f325a2b1c 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -278,36 +278,31 @@ def get_scale_up_kwargs(self): return {'n': instances} def recommendations(self, comm=None): - should_scale_up = self.should_scale_up() - workers = set(self.workers_to_close(key=self.worker_key, - minimum=self.minimum)) - if should_scale_up and workers: - logger.info("Attempting to scale up and scale down simultaneously.") - self.close_counts.clear() - return {'status': 'error', - 'msg': 'Trying to scale up and down simultaneously'} - - elif should_scale_up: - self.close_counts.clear() - return toolz.merge({'status': 'up'}, self.get_scale_up_kwargs()) - elif workers: + workers_to_close = set(self.workers_to_close(key=self.worker_key, + minimum=self.minimum)) + if workers_to_close: d = {} to_close = [] for w, c in self.close_counts.items(): - if w in workers: + if w in workers_to_close: if c >= self.wait_count: to_close.append(w) else: d[w] = c - for w in workers: + for w in workers_to_close: d[w] = d.get(w, 0) + 1 self.close_counts = d if to_close: return {'status': 'down', 'workers': to_close} + + elif self.should_scale_up(): + self.close_counts.clear() + return toolz.merge({'status': 'up'}, self.get_scale_up_kwargs()) + else: self.close_counts.clear() return None From 3ea0d5c880e8ea2136df5b57ba05a773cfc22eaa Mon Sep 17 00:00:00 2001 From: Michael Delgado Date: Wed, 31 Oct 2018 19:36:43 -0700 Subject: [PATCH 2/9] adaptive: check for idle workers before recommending scale up --- distributed/deploy/adaptive.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index 35f325a2b1c..970a8d435af 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -168,10 +168,11 @@ def should_scale_up(self): ---- Additional workers are added whenever - 1. There are unrunnable tasks and no workers - 2. The cluster is CPU constrained - 3. The cluster is RAM constrained - 4. There are fewer workers than our minimum + 1. There are fewer workers than our minimum + 2. There are unrunnable tasks and no workers + 3. There are no idle tasks, and + a. The cluster is CPU constrained, or + b. The cluster is RAM constrained See Also -------- @@ -188,6 +189,9 @@ def should_scale_up(self): if self.scheduler.unrunnable and not self.scheduler.workers: return True + if not all(ws.processing for ws in self.scheduler.workers.values()): + return False + needs_cpu = self.needs_cpu() needs_memory = self.needs_memory() From 2487e2e3074543bacfe3d461f29e9a2a4446c96b Mon Sep 17 00:00:00 2001 From: Michael Delgado Date: Wed, 31 Oct 2018 20:28:47 -0700 Subject: [PATCH 3/9] adaptive: check for waiting tasks in should_scale_up --- distributed/deploy/adaptive.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index 970a8d435af..3e602219196 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -170,7 +170,8 @@ def should_scale_up(self): 1. There are fewer workers than our minimum 2. There are unrunnable tasks and no workers - 3. There are no idle tasks, and + 3. There are no idle workers and the number of pending tasks exceeds + the number of workers, and a. The cluster is CPU constrained, or b. The cluster is RAM constrained @@ -192,6 +193,14 @@ def should_scale_up(self): if not all(ws.processing for ws in self.scheduler.workers.values()): return False + tasks_processing = sum( + (len(w.processing) for w in self.scheduler.workers.values())) + + num_workers = len(self.scheduler.workers) + + if tasks_processing <= num_workers: + return False + needs_cpu = self.needs_cpu() needs_memory = self.needs_memory() From ef70b0e62a1fe54729e3ef32a588c95b94b1baed Mon Sep 17 00:00:00 2001 From: Michael Delgado Date: Sat, 3 Nov 2018 01:57:29 -0700 Subject: [PATCH 4/9] revert to changing needs_cpu only --- distributed/deploy/adaptive.py | 67 ++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index 3e602219196..f588ad36a0a 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -123,7 +123,8 @@ def needs_cpu(self): Notes ----- Returns ``True`` if the occupancy per core is some factor larger - than ``startup_cost``. + than ``startup_cost`` and the number of tasks exceeds the number of + workers """ total_occupancy = self.scheduler.total_occupancy total_cores = sum([ws.ncores for ws in self.scheduler.workers.values()]) @@ -131,9 +132,21 @@ def needs_cpu(self): if total_occupancy / (total_cores + 1e-9) > self.startup_cost * 2: logger.info("CPU limit exceeded [%d occupancy / %d cores]", total_occupancy, total_cores) - return True - else: - return False + + tasks_processing = sum( + (len(w.processing) for w in self.scheduler.workers.values())) + + num_workers = len(self.scheduler.workers) + + if tasks_processing > num_workers: + logger.info( + "pending tasks exceed number of workers " + "[%d tasks / %d workers]", + tasks_processing, num_workers) + + return True + + return False def needs_memory(self): """ @@ -168,12 +181,10 @@ def should_scale_up(self): ---- Additional workers are added whenever - 1. There are fewer workers than our minimum - 2. There are unrunnable tasks and no workers - 3. There are no idle workers and the number of pending tasks exceeds - the number of workers, and - a. The cluster is CPU constrained, or - b. The cluster is RAM constrained + 1. There are unrunnable tasks and no workers + 2. The cluster is CPU constrained + 3. The cluster is RAM constrained + 4. There are fewer workers than our minimum See Also -------- @@ -190,17 +201,6 @@ def should_scale_up(self): if self.scheduler.unrunnable and not self.scheduler.workers: return True - if not all(ws.processing for ws in self.scheduler.workers.values()): - return False - - tasks_processing = sum( - (len(w.processing) for w in self.scheduler.workers.values())) - - num_workers = len(self.scheduler.workers) - - if tasks_processing <= num_workers: - return False - needs_cpu = self.needs_cpu() needs_memory = self.needs_memory() @@ -291,31 +291,36 @@ def get_scale_up_kwargs(self): return {'n': instances} def recommendations(self, comm=None): - - workers_to_close = set(self.workers_to_close(key=self.worker_key, + should_scale_up = self.should_scale_up() + workers = set(self.workers_to_close(key=self.worker_key, minimum=self.minimum)) - if workers_to_close: + if should_scale_up and workers: + logger.info("Attempting to scale up and scale down simultaneously.") + self.close_counts.clear() + return {'status': 'error', + 'msg': 'Trying to scale up and down simultaneously'} + + elif should_scale_up: + self.close_counts.clear() + return toolz.merge({'status': 'up'}, self.get_scale_up_kwargs()) + + elif workers: d = {} to_close = [] for w, c in self.close_counts.items(): - if w in workers_to_close: + if w in workers: if c >= self.wait_count: to_close.append(w) else: d[w] = c - for w in workers_to_close: + for w in workers: d[w] = d.get(w, 0) + 1 self.close_counts = d if to_close: return {'status': 'down', 'workers': to_close} - - elif self.should_scale_up(): - self.close_counts.clear() - return toolz.merge({'status': 'up'}, self.get_scale_up_kwargs()) - else: self.close_counts.clear() return None From 519785c3046b5ef9499bf99dc4b7a90140212f03 Mon Sep 17 00:00:00 2001 From: Michael Delgado Date: Sat, 3 Nov 2018 19:18:53 -0700 Subject: [PATCH 5/9] performance bump in adaptive.needs_cpu by looping through workers --- distributed/deploy/adaptive.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index f588ad36a0a..918d0c57f82 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -133,18 +133,20 @@ def needs_cpu(self): logger.info("CPU limit exceeded [%d occupancy / %d cores]", total_occupancy, total_cores) - tasks_processing = sum( - (len(w.processing) for w in self.scheduler.workers.values())) - num_workers = len(self.scheduler.workers) - if tasks_processing > num_workers: - logger.info( - "pending tasks exceed number of workers " - "[%d tasks / %d workers]", - tasks_processing, num_workers) + tasks_processing = 0 - return True + for w in self.scheduler.workers.values(): + tasks_processing += len(w.processing) + + if tasks_processing > num_workers: + logger.info( + "pending tasks exceed number of workers " + "[%d tasks / %d workers]", + tasks_processing, num_workers) + + return True return False From 62b94f0d7b112be2cd889e9f1fa0952e881d0e77 Mon Sep 17 00:00:00 2001 From: Michael Delgado Date: Fri, 7 Dec 2018 11:06:31 -0800 Subject: [PATCH 6/9] switch to checking number of cores, not workers --- distributed/deploy/adaptive.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index 918d0c57f82..9a3d72c1c2d 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -124,27 +124,25 @@ def needs_cpu(self): ----- Returns ``True`` if the occupancy per core is some factor larger than ``startup_cost`` and the number of tasks exceeds the number of - workers + cores """ total_occupancy = self.scheduler.total_occupancy - total_cores = sum([ws.ncores for ws in self.scheduler.workers.values()]) + total_cores = self.scheduler.total_ncores if total_occupancy / (total_cores + 1e-9) > self.startup_cost * 2: logger.info("CPU limit exceeded [%d occupancy / %d cores]", total_occupancy, total_cores) - num_workers = len(self.scheduler.workers) - tasks_processing = 0 for w in self.scheduler.workers.values(): tasks_processing += len(w.processing) - if tasks_processing > num_workers: + if tasks_processing > total_cores: logger.info( - "pending tasks exceed number of workers " - "[%d tasks / %d workers]", - tasks_processing, num_workers) + "pending tasks exceed number of cores " + "[%d tasks / %d cores]", + tasks_processing, total_cores) return True From 9c164055da321c4352f371f52d552968bd8698fe Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 24 Apr 2019 11:27:46 -0400 Subject: [PATCH 7/9] apply black --- distributed/deploy/adaptive.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index 5df01ad6688..8c260609638 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -155,9 +155,10 @@ def needs_cpu(self): if tasks_processing > total_cores: logger.info( - "pending tasks exceed number of cores " - "[%d tasks / %d cores]", - tasks_processing, total_cores) + "pending tasks exceed number of cores " "[%d tasks / %d cores]", + tasks_processing, + total_cores, + ) return True From a085e8ee5ecbb1d92d78a2d850c1047e312d5c62 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 24 Apr 2019 12:03:36 -0400 Subject: [PATCH 8/9] remove xfail --- distributed/deploy/tests/test_adaptive.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index 1d8a48bf7fc..44818d4c67a 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -331,7 +331,6 @@ def test_adapt_down(): yield cluster.close() -@pytest.mark.xfail(reason="we currently only judge occupancy, not ntasks") @gen_test(timeout=30) def test_no_more_workers_than_tasks(): loop = IOLoop.current() From 3bb8e25adcf2c7060a72542f54fa226338d0f336 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 24 Apr 2019 12:49:13 -0400 Subject: [PATCH 9/9] flake --- distributed/deploy/tests/test_adaptive.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index 44818d4c67a..50c4f0a45a3 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -2,7 +2,6 @@ from time import sleep -import pytest from toolz import frequencies, pluck from tornado import gen from tornado.ioloop import IOLoop