diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 6faeced12f..6098533dcd 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1,3 +1,4 @@ +import abc import functools import itertools import logging @@ -9,7 +10,9 @@ from collections import deque import collections.abc from contextlib import contextmanager, closing +from functools import partial +import immutables from contextvars import copy_context from math import inf from time import perf_counter @@ -124,8 +127,222 @@ def deadline_to_sleep_time(self, deadline): ################################################################ +class CancelLogic(metaclass=abc.ABCMeta): + @abc.abstractmethod + def enter_binding(self, binding): + raise NotImplementedError + + @abc.abstractmethod + def leave_binding(self, binding): + raise NotImplementedError + + @abc.abstractmethod + def compute_effective_deadline( + self, parent_effective_deadline, parent_extra_info, task + ): + raise NotImplementedError + + +class PropagateOnly(CancelLogic): + def enter_binding(self, binding): + pass + + def leave_binding(self, binding): + pass + + def compute_effective_deadline( + self, parent_effective_deadline, parent_extra_info, task + ): + return parent_effective_deadline, parent_extra_info + + def __repr__(self): + return "PropagateOnly" + + +@attr.s(cmp=False, repr=False, slots=True) +class CancelBinding: + _logic = attr.ib() + _task = attr.ib(default=None, init=False) + _effective_deadline = attr.ib(default=inf, init=False) + _catch_cancel = attr.ib(default=False, init=False) + _extra_info = attr.ib(default=immutables.Map(), init=False) + _parent = attr.ib(default=None, init=False) + _children = attr.ib(factory=set, init=False) + cancelled_caught = attr.ib(default=False, init=False) + + def __getattr__(self, name): + if not name.startswith("_"): + return getattr(self._logic, name) + raise AttributeError(name) + + def __setattr__(self, name, value): + if not name.startswith("_"): + try: + setattr(self._logic, name, value) + return + except AttributeError: + pass + object.__setattr__(self, name, value) + + def __repr__(self): + bits = ["cancel binding {:#x} for {!r}".format(id(self), self._logic)] + if self._task is None: + if self.cancelled_caught: + bits.append("cancelled") + else: + bits.append("unbound") + else: + if self._task._active_cancel_binding is not self: + bits.append("shadowed") + else: + bits.append("active") + try: + now = current_time() + except RuntimeError: # must be called from async context + bits.append("deadline unknown outside trio thread") + else: + stamp = self._effective_deadline + if stamp == inf: + bits.append("no deadline") + elif stamp <= now: + bits.append( + "cancel requested {:.2f}sec ago".format(now - stamp) + ) + else: + bits.append("cancel in {:.2f}sec".format(stamp - now)) + return "<{}>".format(", ".join(bits)) + + def _open(self, task): + # This is __enter__, but with the task specifiable, so that + # Runner.spawn_impl() can use it to create the root binding + # of each task as a child of its nursery's binding. + self._task = task + self._catch_cancel = False + self.cancelled_caught = False + self._parent = task._active_cancel_binding + self._parent._children.add(self) + self._task._active_cancel_binding = self + self._logic.enter_binding(self) + self.update_effective_deadline() + + def _close(self, exc): + # This is __exit__, except it returns the exception instead of + # raising it, which helps eliminate traceback frames in + # each nursery (NurseryManager calls _close() directly). + if exc is not None: + exc = MultiError.filter(self._exc_filter, exc) + assert self._parent is not None and not self._children + self._logic.leave_binding(self) + self._task._active_cancel_binding = self._parent + self._task._check_effective_deadline() + self._parent._children.remove(self) + self._parent = None + self._task = None + return exc + + @enable_ki_protection + def __enter__(self): + task = _core.current_task() + if self._task is not None: + raise RuntimeError( + "cancel binding may not be entered while it is already " + "active{}".format( + "" if self._task is task else + " in another task ({!r})".format(self._task.name) + ) + ) + self._open(task) + return self + + @enable_ki_protection + def __exit__(self, etype, exc, tb): + # Actual __exit__ logic is in _close(); this just adds the + # boilerplate that adapts the result to the context manager protocol. + + # Tracebacks show the 'raise' line below out of context, so let's give + # this variable a name that makes sense out of context. + remaining_error_after_cancel_scope = self._close(exc) + if remaining_error_after_cancel_scope is None: + return True + elif remaining_error_after_cancel_scope is exc: + return False + else: + # Copied verbatim from MultiErrorCatcher. Python doesn't + # allow us to encapsulate this __context__ fixup. + old_context = remaining_error_after_cancel_scope.__context__ + try: + raise remaining_error_after_cancel_scope + finally: + _, value, _ = sys.exc_info() + assert value is remaining_error_after_cancel_scope + value.__context__ = old_context + + def _reparent(self, new_parent): + # Used by nursery.start() + assert self._parent is not None + self._parent._children.remove(self) + self._parent = new_parent + self._parent._children.add(self) + + def _should_raise_cancel(self): + now = current_time() + if now < self._effective_deadline: + return False + current = self + while ( + current._parent is not None + and now >= current._parent._effective_deadline + ): + current = current._parent + current._catch_cancel = True + return True + + def _exc_filter(self, exc): + if not isinstance(exc, Cancelled) or not self._catch_cancel: + return exc + if self._parent is not None and self._parent._should_raise_cancel(): + return exc + self.cancelled_caught = True + return None + + @property + def effective_deadline(self): + return self._effective_deadline + + @property + def cancel_requested(self): + return self._effective_deadline <= current_time() + + @property + def parent(self): + return self._parent + + @property + def task(self): + return self._task + + def update_effective_deadline(self): + assert self._parent is not None + deadline, extra = self._logic.compute_effective_deadline( + self._parent._effective_deadline, + self._parent._extra_info, + self._task + ) + changed = ( + deadline != self._effective_deadline + or extra != self._extra_info + ) + self._effective_deadline = deadline + self._extra_info = extra + if changed: + if self._task._active_cancel_binding is self: + self._task._check_effective_deadline() + for child in self._children: + child.update_effective_deadline() + + @attr.s(cmp=False, repr=False, slots=True) -class CancelScope: +class CancelScope(CancelLogic): """A *cancellation scope*: the link between a unit of cancellable work and Trio's cancellation system. @@ -178,86 +395,59 @@ class CancelScope: has been entered yet, and changes take immediate effect. """ - _tasks = attr.ib(factory=set, init=False) - _scope_task = attr.ib(default=None, init=False) - _effective_deadline = attr.ib(default=inf, init=False) - cancel_called = attr.ib(default=False, init=False) - cancelled_caught = attr.ib(default=False, init=False) - - # Constructor arguments: + _cancel_called = attr.ib(default=False, init=False) + _bindings = attr.ib(factory=set, init=False) + _ever_had_multiple_bindings = attr.ib(default=False, init=False) + _last_cancelled_caught = attr.ib(default=False, init=False) _deadline = attr.ib(default=inf, kw_only=True) _shield = attr.ib(default=False, kw_only=True) @enable_ki_protection def __enter__(self): - task = _core.current_task() - if self._scope_task is not None: - raise RuntimeError( - "cancel scope may not be entered while it is already " - "active{}".format( - "" if self._scope_task is task else - " in another task ({!r})".format(self._scope_task.name) - ) - ) - self._scope_task = task - self.cancelled_caught = False - if current_time() >= self._deadline: - self.cancel_called = True - with self._might_change_effective_deadline(): - self._add_task(task) - return self + binding = self.bind() + binding.__enter__() + return binding @enable_ki_protection def __exit__(self, etype, exc, tb): - # NB: NurseryManager calls _close() directly rather than __exit__(), - # so __exit__() must be just _close() plus this logic for adapting - # the exception-filtering result to the context manager API. + binding = _core.current_task()._active_cancel_binding + assert binding in self._bindings + return binding.__exit__(etype, exc, tb) - # Tracebacks show the 'raise' line below out of context, so let's give - # this variable a name that makes sense out of context. - remaining_error_after_cancel_scope = self._close(exc) - if remaining_error_after_cancel_scope is None: - return True - elif remaining_error_after_cancel_scope is exc: - return False - else: - # Copied verbatim from MultiErrorCatcher. Python doesn't - # allow us to encapsulate this __context__ fixup. - old_context = remaining_error_after_cancel_scope.__context__ - try: - raise remaining_error_after_cancel_scope - finally: - _, value, _ = sys.exc_info() - assert value is remaining_error_after_cancel_scope - value.__context__ = old_context + def bind(self): + return CancelBinding(self) - def __repr__(self): - if self._scope_task is None: - binding = "unbound" - else: - binding = "bound to {!r}".format(self._scope_task.name) - if len(self._tasks) > 1: - binding += " and its {} descendant{}".format( - len(self._tasks) - 1, "s" if len(self._tasks) > 2 else "" - ) + def enter_binding(self, binding): + self._bindings.add(binding) + self._last_cancelled_caught = False + if len(self._bindings) > 1: + self._ever_had_multiple_bindings = True - if self.cancel_called: - state = ", cancelled" - elif self.deadline == inf: - state = "" - else: - try: - now = current_time() - except RuntimeError: # must be called from async context - state = "" - else: - state = ", deadline is {:.2f} seconds {}".format( - abs(self.deadline - now), - "from now" if self.deadline >= now else "ago" - ) + def leave_binding(self, binding): + self._last_cancelled_caught = binding.cancelled_caught + self._bindings.remove(binding) + + def compute_effective_deadline( + self, parent_effective_deadline, parent_extra_info, task + ): + incoming_deadline = inf if self._shield else parent_effective_deadline + my_deadline = -inf if self._cancel_called else self._deadline + return min(incoming_deadline, my_deadline), parent_extra_info - return "".format( - id(self), binding, state + @property + def cancelled_caught(self): + if self._ever_had_multiple_bindings: + raise RuntimeError( + "CancelScope.cancelled_caught is ambiguous for cancel " + "scopes that are entered in multiple tasks " + "simultaneously; use 'with cancel_scope as cancel_binding:' " + "and inspect cancel_binding.cancelled_caught instead" + ) + return self._last_cancelled_caught + + def __repr__(self): + return "CancelScope(deadline={:.2f}, shield={!r})".format( + self._deadline, self._shield ) @contextmanager @@ -266,18 +456,8 @@ def _might_change_effective_deadline(self): try: yield finally: - old = self._effective_deadline - if self.cancel_called or not self._tasks: - new = inf - else: - new = self._deadline - if old != new: - self._effective_deadline = new - runner = GLOBAL_RUN_CONTEXT.runner - if old != inf: - del runner.deadlines[old, id(self)] - if new != inf: - runner.deadlines[new, id(self)] = self + for binding in self._bindings: + binding.update_effective_deadline() @property def deadline(self): @@ -339,10 +519,8 @@ def shield(self): def shield(self, new_value): if not isinstance(new_value, bool): raise TypeError("shield must be a bool") - self._shield = new_value - if not self._shield: - for task in self._tasks: - task._attempt_delivery_of_any_pending_cancel() + with self._might_change_effective_deadline(): + self._shield = new_value @enable_ki_protection def cancel(self): @@ -351,47 +529,12 @@ def cancel(self): This method is idempotent, i.e., if the scope was already cancelled then this method silently does nothing. """ - if self.cancel_called: - return with self._might_change_effective_deadline(): - self.cancel_called = True - for task in self._tasks: - task._attempt_delivery_of_any_pending_cancel() - - def _add_task(self, task): - self._tasks.add(task) - task._cancel_stack.append(self) - - def _remove_task(self, task): - self._tasks.remove(task) - assert task._cancel_stack[-1] is self - task._cancel_stack.pop() + self._cancel_called = True - # Used by the nursery.start trickiness - def _tasks_removed_by_adoption(self, tasks): - with self._might_change_effective_deadline(): - self._tasks.difference_update(tasks) - - # Used by the nursery.start trickiness - def _tasks_added_by_adoption(self, tasks): - self._tasks.update(tasks) - - def _exc_filter(self, exc): - if ( - isinstance(exc, Cancelled) and self.cancel_called - and self._scope_task._pending_cancel_scope() is self - ): - self.cancelled_caught = True - return None - return exc - - def _close(self, exc): - if exc is not None: - exc = MultiError.filter(self._exc_filter, exc) - with self._might_change_effective_deadline(): - self._remove_task(self._scope_task) - self._scope_task = None - return exc + @property + def cancel_called(self): + return self._cancel_called or self._deadline <= current_time() @deprecated("0.10.0", issue=607, instead="trio.CancelScope") @@ -429,61 +572,39 @@ def started(self, value=None): # will eventually exit on its own, and we don't want to risk moving # children that might have propagating Cancelled exceptions into # a place with no cancelled cancel scopes to catch them. - if _pending_cancel_scope(self._old_nursery._cancel_stack) is not None: + if self._old_nursery.cancel_binding.cancel_requested: return # Can't be closed, b/c we checked in start() and then _pending_starts # should keep it open. assert not self._new_nursery._closed - # otherwise, find all the tasks under the old nursery, and move them - # under the new nursery instead. This means: - # - changing parents of direct children - # - changing cancel stack of all direct+indirect children - # - changing cancel stack of all direct+indirect children's nurseries - # - checking for cancellation in all changed cancel stacks - old_stack = self._old_nursery._cancel_stack - new_stack = self._new_nursery._cancel_stack - # LIFO todo stack for depth-first traversal - todo = list(self._old_nursery._children) - munged_tasks = [] - while todo: - task = todo.pop() - # Direct children need to be reparented - if task._parent_nursery is self._old_nursery: - self._old_nursery._children.remove(task) - task._parent_nursery = self._new_nursery - self._new_nursery._children.add(task) - # Everyone needs their cancel scopes fixed up... - assert task._cancel_stack[:len(old_stack)] == old_stack - task._cancel_stack[:len(old_stack)] = new_stack - # ...and their nurseries' cancel scopes fixed up. - for nursery in task._child_nurseries: - assert nursery._cancel_stack[:len(old_stack)] == old_stack - nursery._cancel_stack[:len(old_stack)] = new_stack - # And then add all the nursery's children to our todo list - todo.extend(nursery._children) - # And make a note to check for cancellation later - munged_tasks.append(task) - - # Tell all the cancel scopes about the change. (There are probably - # some scopes in common between the two stacks, so some scopes will - # get the same tasks removed and then immediately re-added. This is - # fine though.) - for cancel_scope in old_stack: - cancel_scope._tasks_removed_by_adoption(munged_tasks) - for cancel_scope in new_stack: - cancel_scope._tasks_added_by_adoption(munged_tasks) + # Graft cancel bindings underneath the old nursery onto the + # new nursery instead + moved_bindings = [] + for cancel_binding in list(self._old_nursery.cancel_binding._children): + if cancel_binding.task in self._old_nursery._children: + cancel_binding._reparent(self._new_nursery.cancel_binding) + moved_bindings.append(cancel_binding) + else: + assert cancel_binding.task is self._old_nursery.parent_task + + # Move tasks from the old to the new nursery + for task in list(self._old_nursery._children): + self._old_nursery._children.remove(task) + task._parent_nursery = self._new_nursery + self._new_nursery._children.add(task) # That should have removed all the children from the old nursery assert not self._old_nursery._children - # After all the delicate surgery is done, check for cancellation in - # all the tasks that had their cancel scopes munged. This can trigger - # arbitrary abort() callbacks, so we put it off until our internal - # data structures are all self-consistent again. - for task in munged_tasks: - task._attempt_delivery_of_any_pending_cancel() + # After all the delicate surgery is done, check for + # cancellation in all the tasks that had their cancellation + # context changed. This can trigger arbitrary abort() + # callbacks, so we put it off until our internal data + # structures are all self-consistent again. + for cancel_binding in moved_bindings: + cancel_binding.update_effective_deadline() # And finally, poke the old nursery so it notices that all its # children have disappeared and can exit. @@ -502,9 +623,9 @@ class NurseryManager: @enable_ki_protection async def __aenter__(self): - self._scope = CancelScope() - self._scope.__enter__() - self._nursery = Nursery(current_task(), self._scope) + scope = CancelScope() + self._binding = scope.__enter__() + self._nursery = Nursery(current_task(), scope, self._binding) return self._nursery @enable_ki_protection @@ -512,7 +633,7 @@ async def __aexit__(self, etype, exc, tb): new_exc = await self._nursery._nested_child_finished(exc) # Tracebacks show the 'raise' line below out of context, so let's give # this variable a name that makes sense out of context. - combined_error_from_nursery = self._scope._close(new_exc) + combined_error_from_nursery = self._binding._close(new_exc) if combined_error_from_nursery is None: return True elif combined_error_from_nursery is exc: @@ -549,16 +670,14 @@ def open_nursery(): class Nursery: - def __init__(self, parent_task, cancel_scope): + def __init__(self, parent_task, cancel_scope, cancel_binding): self._parent_task = parent_task parent_task._child_nurseries.append(self) - # the cancel stack that children inherit - we take a snapshot, so it - # won't be affected by any changes in the parent. - self._cancel_stack = list(parent_task._cancel_stack) # the cancel scope that directly surrounds us; used for cancelling all # children. self.cancel_scope = cancel_scope - assert self.cancel_scope is self._cancel_stack[-1] + self.cancel_binding = cancel_binding + assert self.cancel_binding is self._parent_task._active_cancel_binding self._children = set() self._pending_excs = [] # The "nested child" is how this code refers to the contents of the @@ -662,19 +781,6 @@ def __del__(self): ################################################################ -def _pending_cancel_scope(cancel_stack): - # Return the outermost exception that is is not outside a shield. - pending_scope = None - for scope in cancel_stack: - # Check shield before _exc, because shield should not block - # processing of *this* scope's exception - if scope.shield: - pending_scope = None - if pending_scope is None and scope.cancel_called: - pending_scope = scope - return pending_scope - - @attr.s(cmp=False, hash=False, repr=False) class Task: _parent_nursery = attr.ib() @@ -730,10 +836,25 @@ def child_nurseries(self): # Cancellation ################ - _cancel_stack = attr.ib(default=attr.Factory(list), repr=False) - - def _pending_cancel_scope(self): - return _pending_cancel_scope(self._cancel_stack) + _effective_deadline = attr.ib(default=inf, init=False) + _deadline_is_registered = attr.ib(default=False, init=False) + _active_cancel_binding = attr.ib(kw_only=True) + + def _check_effective_deadline(self): + old = self._effective_deadline + new = self._active_cancel_binding._effective_deadline + if old != new: + old_registered = self._deadline_is_registered + new_registered = (new > current_time() and new != inf) + if old_registered: + del self._runner.deadlines[old, id(self)] + if new_registered: + self._runner.deadlines[new, id(self)] = self + self._effective_deadline = new + self._deadline_is_registered = new_registered + if not new_registered and new != inf: + # new deadline has already passed -- cancel now + self._attempt_delivery_of_any_pending_cancel() def _attempt_abort(self, raise_cancel): # Either the abort succeeds, in which case we will reschedule the @@ -752,13 +873,10 @@ def _attempt_abort(self, raise_cancel): def _attempt_delivery_of_any_pending_cancel(self): if self._abort_func is None: return - if self._pending_cancel_scope() is None: - return - - def raise_cancel(): - raise Cancelled._init() - - self._attempt_abort(raise_cancel) + if self._active_cancel_binding._should_raise_cancel(): + def raise_cancel(): + raise Cancelled._init() + self._attempt_abort(raise_cancel) def _attempt_delivery_of_pending_ki(self): assert self._runner.ki_pending @@ -798,9 +916,9 @@ class Runner: runq = attr.ib(default=attr.Factory(deque)) tasks = attr.ib(default=attr.Factory(set)) - # {(deadline, id(CancelScope)): CancelScope} - # only contains scopes with non-infinite deadlines that are currently - # attached to at least one task + # {(deadline, id(Task)): Task} + # only contains tasks whose active cancel bindings have non-infinite + # deadlines that are in the future deadlines = attr.ib(default=attr.Factory(SortedDict)) init_task = attr.ib(default=None) @@ -812,6 +930,8 @@ class Runner: entry_queue = attr.ib(default=attr.Factory(EntryQueue)) trio_token = attr.ib(default=None) + root_cancel_binding = attr.ib(factory=partial(CancelBinding, None)) + _NO_SEND = object() def close(self): @@ -1040,13 +1160,22 @@ def _return_value_looks_like_wrong_library(value): else: context = copy_context() + if nursery is not None: + parent_binding = nursery.cancel_binding + else: + parent_binding = self.root_cancel_binding + task = Task( coro=coro, parent_nursery=nursery, runner=self, name=name, context=context, + active_cancel_binding=parent_binding, ) + child_root_binding = CancelBinding(PropagateOnly()) + child_root_binding._open(task) + self.tasks.add(task) coro.cr_frame.f_locals.setdefault( LOCALS_KEY_KI_PROTECTION_ENABLED, system_task @@ -1054,8 +1183,6 @@ def _return_value_looks_like_wrong_library(value): if nursery is not None: nursery._children.add(task) - for scope in nursery._cancel_stack: - scope._add_task(task) if self.instruments: self.instrument("task_spawned", task) @@ -1065,8 +1192,8 @@ def _return_value_looks_like_wrong_library(value): return task def task_exited(self, task, outcome): - while task._cancel_stack: - task._cancel_stack[-1]._remove_task(task) + task._active_cancel_binding._close(None) + assert task._active_cancel_binding._task is not task self.tasks.remove(task) if task is self.main_task: self.main_task_outcome = outcome @@ -1518,10 +1645,11 @@ def run_impl(runner, async_fn, args): # Process cancellations due to deadline expiry now = runner.clock.current_time() while runner.deadlines: - (deadline, _), cancel_scope = runner.deadlines.peekitem(0) + (deadline, ident), task = runner.deadlines.peekitem(0) if deadline <= now: - # This removes the given scope from runner.deadlines: - cancel_scope.cancel() + del runner.deadlines[deadline, ident] + task._deadline_is_registered = False + task._attempt_delivery_of_any_pending_cancel() idle_primed = False else: break @@ -1689,14 +1817,8 @@ def current_effective_deadline(): """ task = current_task() - deadline = inf - for scope in task._cancel_stack: - if scope._shield: - deadline = inf - if scope.cancel_called: - deadline = -inf - deadline = min(deadline, scope._deadline) - return deadline + deadline = task._active_cancel_binding.effective_deadline + return -inf if deadline <= current_time() else deadline async def checkpoint(): @@ -1734,7 +1856,7 @@ async def checkpoint_if_cancelled(): """ task = current_task() if ( - task._pending_cancel_scope() is not None or + task._active_cancel_binding.effective_deadline <= current_time() or (task is task._runner.main_task and task._runner.ki_pending) ): await _core.checkpoint() diff --git a/trio/_core/tests/test_run.py b/trio/_core/tests/test_run.py index c6d3298bcd..cd7070ea09 100644 --- a/trio/_core/tests/test_run.py +++ b/trio/_core/tests/test_run.py @@ -549,21 +549,27 @@ async def main(): async def test_cancel_scope_repr(mock_clock): scope = _core.CancelScope() - assert "unbound" in repr(scope) - with scope: - assert "bound to {!r}".format(_core.current_task().name) in repr(scope) - async with _core.open_nursery() as nursery: - nursery.start_soon(sleep, 10) - assert "and its 1 descendant" in repr(scope) - nursery.cancel_scope.cancel() + assert repr(scope) == "CancelScope(deadline=inf, shield=False)" + binding = scope.bind() + assert "unbound" in repr(binding) + with binding: + assert "active" in repr(binding) + with _core.CancelScope() as inner: + assert "shadowed" in repr(binding) and "active" in repr(inner) + assert "active" in repr(binding) and "unbound" in repr(inner) scope.deadline = _core.current_time() - 1 - assert "deadline is 1.00 seconds ago" in repr(scope) + assert "cancel requested 1.00sec ago" in repr(binding) scope.deadline = _core.current_time() + 10 - assert "deadline is 10.00 seconds from now" in repr(scope) + assert "cancel in 10.00sec" in repr(binding) # when not in async context, can't get the current time - assert "deadline" not in await run_sync_in_worker_thread(repr, scope) + assert "deadline unknown outside trio thread" in ( + await run_sync_in_worker_thread(repr, binding) + ) scope.cancel() - assert "cancelled" in repr(scope) + assert "cancel requested infsec ago" in repr(binding) + await _core.checkpoint() + assert binding.cancelled_caught + assert "cancelled" in repr(binding) def test_cancel_points():