Skip to content

KeyError in Worker.handle_compute_task (causes deadlock) #5482

Description

@gjoseph92
Traceback (most recent call last):
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 1237, in handle_scheduler
    await self.handle_stream(
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/core.py", line 564, in handle_stream
    handler(**merge(extra, msg))
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 1937, in handle_compute_task
    self.tasks[key].nbytes = value
KeyError: "('slices-40d6794b777d639e9440f8f518224cfd', 2, 1)"
distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...

I may be able to reproduce this if necessary. I was running a stackstac example notebook on binder against a Coiled cluster over wss, where the particular versions of things were causing a lot of errors (unrelated to dask). So I was frequently rerunning the same tasks, cancelling them, restarting the client, rerunning, etc. Perhaps this cancelling, restarting, rerunning is related?

@fjetter says

The only reason this keyerror can appear is if the compute instruction transitions its own dependency into a forgotten state which is very, very wrong.

Relevant code, ending at the line where the error occurs:

def handle_compute_task(
self,
*,
key,
function=None,
args=None,
kwargs=None,
task=no_value,
who_has=None,
nbytes=None,
priority=None,
duration=None,
resource_restrictions=None,
actor=False,
annotations=None,
stimulus_id=None,
):
self.log.append((key, "compute-task", stimulus_id, time()))
try:
ts = self.tasks[key]
logger.debug(
"Asked to compute an already known task %s",
{"task": ts, "stimulus_id": stimulus_id},
)
except KeyError:
self.tasks[key] = ts = TaskState(key)
ts.runspec = SerializedTask(function, args, kwargs, task)
if priority is not None:
priority = tuple(priority) + (self.generation,)
self.generation -= 1
if actor:
self.actors[ts.key] = None
ts.exception = None
ts.traceback = None
ts.exception_text = ""
ts.traceback_text = ""
ts.priority = priority
ts.duration = duration
if resource_restrictions:
ts.resource_restrictions = resource_restrictions
ts.annotations = annotations
recommendations = {}
scheduler_msgs = []
for dependency in who_has:
dep_ts = self.ensure_task_exists(
key=dependency,
stimulus_id=stimulus_id,
priority=priority,
)
# link up to child / parents
ts.dependencies.add(dep_ts)
dep_ts.dependents.add(ts)
if ts.state in READY | {"executing", "waiting", "resumed"}:
pass
elif ts.state == "memory":
recommendations[ts] = "memory"
scheduler_msgs.append(self.get_task_state_for_scheduler(ts))
elif ts.state in {
"released",
"fetch",
"flight",
"missing",
"cancelled",
"error",
}:
recommendations[ts] = "waiting"
else:
raise RuntimeError(f"Unexpected task state encountered {ts} {stimulus_id}")
for msg in scheduler_msgs:
self.batched_stream.send(msg)
self.transitions(recommendations, stimulus_id=stimulus_id)
# We received new info, that's great but not related to the compute-task
# instruction
self.update_who_has(who_has, stimulus_id=stimulus_id)
if nbytes is not None:
for key, value in nbytes.items():
self.tasks[key].nbytes = value

Scheduler code producing the message which causes this error:

def _task_to_msg(state: SchedulerState, ts: TaskState, duration: double = -1) -> dict:
"""Convert a single computational task to a message"""
ws: WorkerState
dts: TaskState
# FIXME: The duration attribute is not used on worker. We could safe ourselves the time to compute and submit this
if duration < 0:
duration = state.get_task_duration(ts)
msg: dict = {
"op": "compute-task",
"key": ts._key,
"priority": ts._priority,
"duration": duration,
"stimulus_id": f"compute-task-{time()}",
"who_has": {},
}
if ts._resource_restrictions:
msg["resource_restrictions"] = ts._resource_restrictions
if ts._actor:
msg["actor"] = True
deps: set = ts._dependencies
if deps:
msg["who_has"] = {
dts._key: [ws._address for ws in dts._who_has] for dts in deps
}
msg["nbytes"] = {dts._key: dts._nbytes for dts in deps}
if state._validate:
assert all(msg["who_has"].values())
task = ts._run_spec
if type(task) is dict:
msg.update(task)
else:
msg["task"] = task
if ts._annotations:
msg["annotations"] = ts._annotations
return msg

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions