Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 58 additions & 5 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,9 @@ def __init__(
("waiting", "flight"): self.transition_waiting_flight,
("ready", "executing"): self.transition_ready_executing,
("ready", "memory"): self.transition_ready_memory,
("ready", "error"): self.transition_ready_error,
("ready", "waiting"): self.transition_ready_waiting,
("constrained", "waiting"): self.transition_ready_waiting,
("constrained", "executing"): self.transition_constrained_executing,
("executing", "memory"): self.transition_executing_done,
("executing", "error"): self.transition_executing_done,
Expand Down Expand Up @@ -1445,8 +1448,10 @@ def add_task(
**kwargs2,
):
try:
runspec = SerializedTask(function, args, kwargs, task)
if key in self.tasks:
ts = self.tasks[key]
ts.runspec = runspec
if ts.state == "memory":
assert key in self.data or key in self.actors
logger.debug(
Expand Down Expand Up @@ -1475,6 +1480,7 @@ def add_task(
if actor:
self.actors[ts.key] = None

ts.runspec = runspec
ts.priority = priority
ts.duration = duration
if resource_restrictions:
Expand Down Expand Up @@ -1560,13 +1566,14 @@ def transition_waiting_flight(self, ts, worker=None):
pdb.set_trace()
raise

def transition_flight_waiting(self, ts, worker=None, remove=True):
def transition_flight_waiting(self, ts, worker=None, remove=True, runspec=None):
try:
if self.validate:
assert ts.state == "flight"

self.in_flight_tasks -= 1
ts.coming_from = None
ts.runspec = runspec or ts.runspec
if remove:
try:
ts.who_has.remove(worker)
Expand Down Expand Up @@ -1691,9 +1698,33 @@ def transition_ready_executing(self, ts):
pdb.set_trace()
raise

def transition_ready_error(self, ts):
    """Handle a task moving from ``ready`` to ``error``.

    When ``self.validate`` is set, first check that ``ts`` carries both an
    exception and a traceback. The task state is then sent to the
    scheduler via ``send_task_state_to_scheduler``.
    """
    if self.validate:
        # Same order as before: exception first, then traceback.
        for field in ("exception", "traceback"):
            assert getattr(ts, field) is not None
    self.send_task_state_to_scheduler(ts)

def transition_ready_memory(self, ts, value=None):
    """Handle a task moving from ``ready`` directly to ``memory``.

    Parameters
    ----------
    ts
        The task state being transitioned.
    value
        The task's result. ``None`` (the default) means "no value
        supplied", in which case nothing is stored.

    If a value is supplied it is stored via ``put_key_in_memory``; the
    task state is then sent to the scheduler in either case.
    """
    # Compare against None explicitly: a plain truthiness test would
    # silently drop legitimate falsy results such as 0, "", [] or False,
    # and raises for array-like values whose truth value is ambiguous.
    # NOTE(review): a result that is genuinely None still cannot be told
    # apart from "no value supplied" with this signature.
    if value is not None:
        self.put_key_in_memory(ts, value=value)
    self.send_task_state_to_scheduler(ts)

def transition_ready_waiting(self, ts):
    """
    Move a task from ready back to waiting and drop its runspec.

    This transition is common for work stealing.

    Returns the new state (``"waiting"``) when the task still has
    dependents. Otherwise the key is released and ``None`` is returned.
    """
    ts.state = "waiting"
    ts.runspec = None

    if ts.dependents:
        return ts.state

    # FIXME GH4413 Replace with recommended transition to forgotten
    # With no dependents left, this task was only ever a dependency and was
    # never meant to run on this worker. The scheduler will no longer
    # propagate the deletion to this worker, so the key has to be released
    # here.
    self.release_key(ts.key)
    return None

def transition_constrained_executing(self, ts):
self.transition_ready_executing(ts)
for resource, quantity in ts.resource_restrictions.items():
Expand Down Expand Up @@ -2241,20 +2272,24 @@ def steal_request(self, key):
# There may be a race condition between stealing and releasing a task.
# In this case the self.tasks is already cleared. The `None` will be
# registered as `already-computing` on the other end
ts = self.tasks.get(key)
if key in self.tasks:
state = self.tasks[key].state
state = ts.state
else:
state = None

response = {"op": "steal-response", "key": key, "state": state}
self.batched_stream.send(response)

if state in ("ready", "waiting", "constrained"):
ts.runspec = None
self.release_key(key)

def release_key(self, key, cause=None, reason=None, report=True):
try:
ts = self.tasks.get(key, TaskState(key=key))
if self.validate:
assert not ts.dependents
if cause:
self.log.append((key, "release-key", {"cause": cause}))
else:
Expand Down Expand Up @@ -2530,8 +2565,15 @@ async def execute(self, key, report=False):
if key not in self.tasks:
return
ts = self.tasks[key]
if ts.state != "executing" or ts.runspec is None:
if ts.state != "executing":
# This might happen if keys are canceled
logger.debug(
"Trying to execute a task %s which is not in executing state anymore"
% ts
)
return
if ts.runspec is None:
logger.critical("No runspec available for task %s." % ts)
if self.validate:
assert not ts.waiting_for_data
assert ts.state == "executing"
Expand Down Expand Up @@ -2581,7 +2623,17 @@ async def execute(self, key, report=False):
executor_error = e
raise

if ts.state not in ("executing", "long-running"):
# We'll need to check again for the task state since it may have
# changed since the execution was kicked off. In particular, it may
# have been canceled and released already in which case we'll have
# to drop the result immediately
key = ts.key
ts = self.tasks.get(key)

if ts is None:
logger.debug(
"Dropping result for %s since task has already been released." % key
)
return

result["key"] = ts.key
Expand Down Expand Up @@ -2874,13 +2926,14 @@ def _notify_plugins(self, method_name, *args, **kwargs):

def validate_task_memory(self, ts):
    """Assert the invariants of a task whose state is ``memory``.

    Checks that the key is held in ``self.data`` or ``self.actors``, that
    ``nbytes`` is set and is an ``int``, that the task is not waiting for
    data, that its key is not in ``self.ready``, and that its state is
    ``"memory"``. Raises ``AssertionError`` on the first failed check.
    """
    key = ts.key
    assert key in self.data or key in self.actors

    size = ts.nbytes
    assert size is not None
    assert isinstance(size, int)

    assert not ts.waiting_for_data
    assert key not in self.ready
    assert ts.state == "memory"

def validate_task_executing(self, ts):
assert ts.state == "executing"
assert ts.runspec is not None
assert ts.key not in self.data
assert not ts.waiting_for_data
assert all(
Expand Down