diff --git a/docs/source/reference-lowlevel.rst b/docs/source/reference-lowlevel.rst index 51471045e0..36b37e3e37 100644 --- a/docs/source/reference-lowlevel.rst +++ b/docs/source/reference-lowlevel.rst @@ -471,7 +471,7 @@ this does serve to illustrate the basic structure of the Task API --------- +======== .. autofunction:: current_root_task() @@ -534,10 +534,410 @@ Task API :func:`wait_task_rescheduled` for details.) +.. _guest-mode: + +Using "guest mode" to run Trio on top of other event loops +========================================================== + +What is "guest mode"? +--------------------- + +An event loop acts as a central coordinator to manage all the IO +happening in your program. Normally, that means that your application +has to pick one event loop, and use it for everything. But what if you +like Trio, but also need to use a framework like `Qt +`__ or `PyGame +`__ that has its own event loop? Then you +need some way to run both event loops at once. + +It is possible to combine event loops, but the standard approaches all +have significant downsides: + +- **Polling:** this is where you use a `busy-loop + `__ to manually check + for IO on both event loops many times per second. This adds latency, + and wastes CPU time and electricity. + +- **Pluggable IO backends:** this is where you reimplement one of the + event loop APIs on top of the other, so you effectively end up with + just one event loop. This requires a significant amount of work for + each pair of event loops you want to integrate, and different + backends inevitably end up with inconsistent behavior, forcing users + to program against the least-common-denominator. And if the two + event loops expose different feature sets, it may not even be + possible to implement one in terms of the other. 
+ +- **Running the two event loops in separate threads:** This works, but + most event loop APIs aren't thread-safe, so in this approach you + need to keep careful track of which code runs on which event loop, + and remember to use explicit inter-thread messaging whenever you + interact with the other loop – or else risk obscure race conditions + and data corruption. + +That's why Trio offers a fourth option: **guest mode**. Guest mode +lets you execute `trio.run` on top of some other "host" event loop, +like Qt. Its advantages are: + +- Efficiency: guest mode is event-driven instead of using a busy-loop, + so it has low latency and doesn't waste electricity. + +- No need to think about threads: your Trio code runs in the same + thread as the host event loop, so you can freely call sync Trio APIs + from the host, and call sync host APIs from Trio. For example, if + you're making a GUI app with Qt as the host loop, then making a + `cancel button `__ and + connecting it to a `trio.CancelScope` is as easy as writing:: + + # Trio code can create Qt objects without any special ceremony... + my_cancel_button = QPushButton("Cancel") + # ...and Qt can call back to Trio just as easily + my_cancel_button.clicked.connect(my_cancel_scope.cancel) + + (For async APIs, it's not that simple, but you can use sync APIs to + build explicit bridges between the two worlds, e.g. by passing async + functions and their results back and forth through queues.) + +- Consistent behavior: guest mode uses the same code as regular Trio: + the same scheduler, same IO code, same everything. So you get the + full feature set and everything acts the way you expect. + +- Simple integration and broad compatibility: pretty much every event + loop offers some threadsafe "schedule a callback" operation, and + that's all you need to use it as a host loop. + + +Really? How is that possible? +----------------------------- + +.. note:: + + You can use guest mode without reading this section. 
It's included + for those who enjoy understanding how things work. + +All event loops have the same basic structure. They loop through two +operations, over and over: + +1. Wait for the operating system to notify them that something + interesting has happened, like data arriving on a socket or a + timeout passing. They do this by invoking a platform-specific + ``sleep_until_something_happens()`` system call – ``select``, + ``epoll``, ``kqueue``, ``GetQueuedCompletionStatusEx``, etc. + +2. Run all the user tasks that care about whatever happened, then go + back to step 1. + +The problem here is step 1. Two different event loops on the same +thread can take turns running user tasks in step 2, but when they're +idle and nothing is happening, they can't both invoke their own +``sleep_until_something_happens()`` function at the same time. + +The "polling" and "pluggable backend" strategies solve this by hacking +the loops so both step 1s can run at the same time in the same thread. +Keeping everything in one thread is great for step 2, but the step 1 +hacks create problems. + +The "separate threads" strategy solves this by moving both steps into +separate threads. This makes step 1 work, but the downside is that now +the user tasks in step 2 are running in separate threads as well, so +users are forced to deal with inter-thread coordination. + +The idea behind guest mode is to combine the best parts of each +approach: we move Trio's step 1 into a separate worker thread, while +keeping Trio's step 2 in the main host thread. This way, when the +application is idle, both event loops do their +``sleep_until_something_happens()`` at the same time in their own +threads. But when the app wakes up and your code is actually running, +it all happens in a single thread. The threading trickiness is all +handled transparently inside Trio.
+ +Concretely, we unroll Trio's internal event loop into a chain of +callbacks, and as each callback finishes, it schedules the next +callback onto the host loop or a worker thread as appropriate. So the +only thing the host loop has to provide is a way to schedule a +callback onto the main thread from a worker thread. + +Coordinating between Trio and the host loop does add some overhead. +The main cost is switching in and out of the background thread, since +this requires cross-thread messaging. This is cheap (on the order of a +few microseconds, assuming your host loop is implemented efficiently), +but it's not free. + +But, there's a nice optimization we can make: we only *need* the +thread when our ``sleep_until_something_happens()`` call actually +sleeps, that is, when the Trio part of your program is idle and has +nothing to do. So before we switch into the worker thread, we +double-check whether we're idle, and if not, then we skip the worker +thread and jump directly to step 2. This means that your app only pays +the extra thread-switching penalty at moments when it would otherwise +be sleeping, so it should have minimal effect on your app's overall +performance. + +The total overhead will depend on your host loop, your platform, your +application, etc. But we expect that in most cases, apps running in +guest mode should only be 5-10% slower than the same code using +`trio.run`. If you find that's not true for your app, then please let +us know and we'll see if we can fix it! + + +.. _guest-run-implementation: + +Implementing guest mode for your favorite event loop +---------------------------------------------------- + +Let's walk through what you need to do to integrate Trio's guest mode +with your favorite event loop. Treat this section like a checklist. + +**Getting started:** The first step is to get something basic working. 
+Here's a minimal example of running Trio on top of asyncio, that you +can use as a model:: + + import asyncio, trio + + # A tiny Trio program + async def trio_main(): + for i in range(5): + print("Hello from Trio!") + # This is inside Trio, so we have to use Trio APIs + await trio.sleep(1) + return "trio done!" + + # The code to run it as a guest inside asyncio + async def asyncio_main(): + asyncio_loop = asyncio.get_running_loop() + + def run_sync_soon_threadsafe(fn): + asyncio_loop.call_soon_threadsafe(fn) + + def done_callback(trio_main_outcome): + print(f"Trio program ended with: {trio_main_outcome}") + + # This is where the magic happens: + trio.lowlevel.start_guest_run( + trio_main, + run_sync_soon_threadsafe=run_sync_soon_threadsafe, + done_callback=done_callback, + ) + + # Let the host loop run for a while to give trio_main time to + # finish. (WARNING: This is a hack. See below for better + # approaches.) + # + # This function is in asyncio, so we have to use asyncio APIs. + await asyncio.sleep(10) + + asyncio.run(asyncio_main()) + +You can see we're using asyncio-specific APIs to start up a loop, and +then we call `trio.lowlevel.start_guest_run`. This function is very +similar to `trio.run`, and takes all the same arguments. But it has +two differences: + +First, instead of blocking until ``trio_main`` has finished, it +schedules ``trio_main`` to start running on top of the host loop, and +then returns immediately. So ``trio_main`` is running in the +background – that's why we have to sleep and give it time to finish. + +And second, it requires two extra keyword arguments: +``run_sync_soon_threadsafe``, and ``done_callback``. + +For ``run_sync_soon_threadsafe``, we need a function that takes a +synchronous callback, and schedules it to run on your host loop. And +this function needs to be "threadsafe" in the sense that you can +safely call it from any thread. So you need to figure out how to write +a function that does that using your host loop's API.
For asyncio, +this is easy because `~asyncio.loop.call_soon_threadsafe` does exactly +what we need; for your loop, it might be more or less complicated. + +For ``done_callback``, you pass in a function that Trio will +automatically invoke when the Trio run finishes, so you know it's done +and what happened. For this basic starting version, we just print the +result; in the next section we'll discuss better alternatives. + +At this stage you should be able to run a simple Trio program inside +your host loop. Now we'll turn that prototype into something solid. + + +**Loop lifetimes:** One of the trickiest things in most event loops is +shutting down correctly. And having two event loops makes this even +harder! + +If you can, we recommend following this pattern: + +- Start up your host loop +- Immediately call `start_guest_run` to start Trio +- When Trio finishes and your ``done_callback`` is invoked, shut down + the host loop +- Make sure that nothing else shuts down your host loop + +This way, your two event loops have the same lifetime, and your +program automatically exits when your Trio function finishes. + +Here's how we'd extend our asyncio example to implement this pattern: + +.. 
code-block:: python3 + :emphasize-lines: 8-11,19-22 + + # Improved version, that shuts down properly after Trio finishes + async def asyncio_main(): + asyncio_loop = asyncio.get_running_loop() + + def run_sync_soon_threadsafe(fn): + asyncio_loop.call_soon_threadsafe(fn) + + # Revised 'done' callback: set a Future + done_fut = asyncio.Future() + def done_callback(trio_main_outcome): + done_fut.set_result(trio_main_outcome) + + trio.lowlevel.start_guest_run( + trio_main, + run_sync_soon_threadsafe=run_sync_soon_threadsafe, + done_callback=done_callback, + ) + + # Wait for the guest run to finish + trio_main_outcome = await done_fut + # Pass through the return value or exception from the guest run + return trio_main_outcome.unwrap() + +And then you can encapsulate all this machinery in a utility function +that exposes a `trio.run`-like API, but runs both loops together:: + + def trio_run_with_asyncio(trio_main, *args, **trio_run_kwargs): + async def asyncio_main(): + # same as above + ... + + return asyncio.run(asyncio_main()) + +Technically, it is possible to use other patterns. But there are some +important limitations you have to respect: + +- **You must let the Trio program run to completion.** Many event + loops let you stop the event loop at any point, and any pending + callbacks/tasks/etc. just... don't run. Trio follows a more + structured system, where you can cancel things, but the code always + runs to completion, so ``finally`` blocks run, resources are cleaned + up, etc. If you stop your host loop early, before the + ``done_callback`` is invoked, then that cuts off the Trio run in the + middle without a chance to clean up. This can leave your code in an + inconsistent state, and will definitely leave Trio's internals in an + inconsistent state, which will cause errors if you try to use Trio + again in that thread. 
+ + Some programs need to be able to quit at any time, for example in + response to a GUI window being closed or a user selecting a "Quit" + from a menu. In these cases, we recommend wrapping your whole + program in a `trio.CancelScope`, and cancelling it when you want to + quit. + +- Each host loop can only have one `start_guest_run` at a time. If you + try to start a second one, you'll get an error. If you need to run + multiple Trio functions at the same time, then start up a single + Trio run, open a nursery, and then start your functions as child + tasks in that nursery. + +- Unless you or your host loop register a handler for `signal.SIGINT` + before starting Trio (this is not common), then Trio will take over + delivery of `KeyboardInterrupt`\s. And since Trio can't tell which + host code is safe to interrupt, it will only deliver + `KeyboardInterrupt` into the Trio part of your code. This is fine if + your program is set up to exit when the Trio part exits, because the + `KeyboardInterrupt` will propagate out of Trio and then trigger the + shutdown of your host loop, which is just what you want. + +Given these constraints, we think the simplest approach is to always +start and stop the two loops together. + +**Signal management:** `"Signals" +`__ are a low-level +inter-process communication primitive. When you hit control-C to kill +a program, that uses a signal. Signal handling in Python has `a lot of +moving parts +`__. +One of those parts is `signal.set_wakeup_fd`, which event loops use to +make sure that they wake up when a signal arrives so they can respond +to it. (If you've ever had an event loop ignore you when you hit +control-C, it was probably because they weren't using +`signal.set_wakeup_fd` correctly.) + +But, only one event loop can use `signal.set_wakeup_fd` at a time. And +in guest mode that can cause problems: Trio and the host loop might +start fighting over who's using `signal.set_wakeup_fd`. 
+ +Some event loops, like asyncio, won't work correctly unless they win +this fight. Fortunately, Trio is a little less picky: as long as +*someone* makes sure that the program wakes up when a signal arrives, +it should work correctly. So if your host loop wants +`signal.set_wakeup_fd`, then you should disable Trio's +`signal.set_wakeup_fd` support, and then both loops will work +correctly. + +On the other hand, if your host loop doesn't use +`signal.set_wakeup_fd`, then the only way to make everything work +correctly is to *enable* Trio's `signal.set_wakeup_fd` support. + +By default, Trio assumes that your host loop doesn't use +`signal.set_wakeup_fd`. It does try to detect when this creates a +conflict with the host loop, and print a warning – but unfortunately, +by the time it detects it, the damage has already been done. So if +you're getting this warning, then you should disable Trio's +`signal.set_wakeup_fd` support by passing +``host_uses_signal_set_wakeup_fd=True`` to `start_guest_run`. + +If you aren't seeing any warnings with your initial prototype, you're +*probably* fine. But the only way to be certain is to check your host +loop's source. For example, asyncio may or may not use +`signal.set_wakeup_fd` depending on the Python version and operating +system. + + +**A small optimization:** Finally, consider a small optimization. Some +event loops offer two versions of their "call this function soon" API: +one that can be used from any thread, and one that can only be used +from the event loop thread, with the latter being cheaper. For +example, asyncio has both `~asyncio.loop.call_soon_threadsafe` and +`~asyncio.loop.call_soon`. + +If you have a loop like this, then you can also pass a +``run_sync_soon_not_threadsafe=...`` kwarg to `start_guest_run`, and +Trio will automatically use it when appropriate. + +If your loop doesn't have a split like this, then don't worry about +it; ``run_sync_soon_not_threadsafe=`` is optional. 
(If it's not +passed, then Trio will just use your threadsafe version in all cases.) + +**That's it!** If you've followed all these steps, you should now have +a cleanly-integrated hybrid event loop. Go make some cool +GUIs/games/whatever! + + +Limitations +----------- + +In general, almost all Trio features should work in guest mode. The +exception is features which rely on Trio having a complete picture of +everything that your program is doing, since obviously, it can't +control the host loop or see what it's doing. + +Custom clocks can be used in guest mode, but they only affect Trio +timeouts, not host loop timeouts. And the :ref:`autojump clock +` and related `trio.testing.wait_all_tasks_blocked` can +technically be used in guest mode, but they'll only take Trio tasks +into account when deciding whether to jump the clock or whether all +tasks are blocked. + + +Reference +--------- + +.. autofunction:: start_guest_run + + .. _live-coroutine-handoff: Handing off live coroutine objects between coroutine runners ------------------------------------------------------------- +============================================================ Internally, Python's async/await syntax is built around the idea of "coroutine objects" and "coroutine runners". A coroutine object diff --git a/docs/source/reference-testing.rst b/docs/source/reference-testing.rst index 40a275bbeb..76ecd4a2d4 100644 --- a/docs/source/reference-testing.rst +++ b/docs/source/reference-testing.rst @@ -16,6 +16,8 @@ Test harness integration .. decorator:: trio_test +.. _testing-time: + Time and timeouts ----------------- diff --git a/newsfragments/399.feature.rst b/newsfragments/399.feature.rst new file mode 100644 index 0000000000..4de52bfb30 --- /dev/null +++ b/newsfragments/399.feature.rst @@ -0,0 +1,3 @@ +If you want to use Trio, but are stuck with some other event loop like +Qt or PyGame, then good news: now you can have both. For details, see: +:ref:`guest-mode`.
diff --git a/notes-to-self/aio-guest-test.py b/notes-to-self/aio-guest-test.py new file mode 100644 index 0000000000..5e9b398132 --- /dev/null +++ b/notes-to-self/aio-guest-test.py @@ -0,0 +1,48 @@ +import asyncio +import trio + +async def aio_main(): + loop = asyncio.get_running_loop() + + trio_done_fut = asyncio.Future() + def trio_done_callback(main_outcome): + print(f"trio_main finished: {main_outcome!r}") + trio_done_fut.set_result(main_outcome) + + trio.lowlevel.start_guest_run( + trio_main, + run_sync_soon_threadsafe=loop.call_soon_threadsafe, + done_callback=trio_done_callback, + ) + + (await trio_done_fut).unwrap() + + +async def trio_main(): + print("trio_main!") + + to_trio, from_aio = trio.open_memory_channel(float("inf")) + from_trio = asyncio.Queue() + + asyncio.create_task(aio_pingpong(from_trio, to_trio)) + + from_trio.put_nowait(0) + + async for n in from_aio: + print(f"trio got: {n}") + await trio.sleep(1) + from_trio.put_nowait(n + 1) + if n >= 10: + return + +async def aio_pingpong(from_trio, to_trio): + print("aio_pingpong!") + + while True: + n = await from_trio.get() + print(f"aio got: {n}") + await asyncio.sleep(1) + to_trio.send_nowait(n + 1) + + +asyncio.run(aio_main()) diff --git a/trio/_core/__init__.py b/trio/_core/__init__.py index 136bfe6b98..e22485149d 100644 --- a/trio/_core/__init__.py +++ b/trio/_core/__init__.py @@ -48,6 +48,7 @@ wait_writable, notify_closing, Nursery, + start_guest_run, ) # Has to come after _run to resolve a circular import diff --git a/trio/_core/_io_epoll.py b/trio/_core/_io_epoll.py index 71f46c40a7..665bbe66f7 100644 --- a/trio/_core/_io_epoll.py +++ b/trio/_core/_io_epoll.py @@ -5,6 +5,7 @@ from .. 
import _core from ._run import _public from ._io_common import wake_all +from ._wakeup_socketpair import WakeupSocketpair @attr.s(slots=True, eq=False, frozen=True) @@ -184,6 +185,12 @@ class EpollIOManager: _epoll = attr.ib(factory=select.epoll) # {fd: EpollWaiters} _registered = attr.ib(factory=lambda: defaultdict(EpollWaiters)) + _force_wakeup = attr.ib(factory=WakeupSocketpair) + _force_wakeup_fd = attr.ib(default=None) + + def __attrs_post_init__(self): + self._epoll.register(self._force_wakeup.wakeup_sock, select.EPOLLIN) + self._force_wakeup_fd = self._force_wakeup.wakeup_sock.fileno() def statistics(self): tasks_waiting_read = 0 @@ -200,13 +207,26 @@ def statistics(self): def close(self): self._epoll.close() + self._force_wakeup.close() + + def force_wakeup(self): + self._force_wakeup.wakeup_thread_and_signal_safe() - # Called internally by the task runner: - def handle_io(self, timeout): + # Return value must be False-y IFF the timeout expired, NOT if any I/O + # happened or force_wakeup was called. Otherwise it can be anything; gets + # passed straight through to process_events. + def get_events(self, timeout): # max_events must be > 0 or epoll gets cranky + # accessing self._registered from a thread looks dangerous, but it's + # OK because it doesn't matter if our value is a little bit off. max_events = max(1, len(self._registered)) - events = self._epoll.poll(timeout, max_events) + return self._epoll.poll(timeout, max_events) + + def process_events(self, events): for fd, flags in events: + if fd == self._force_wakeup_fd: + self._force_wakeup.drain() + continue waiters = self._registered[fd] # EPOLLONESHOT always clears the flags when an event is delivered waiters.current_flags = 0 diff --git a/trio/_core/_io_kqueue.py b/trio/_core/_io_kqueue.py index b194e85f53..d2d80a7341 100644 --- a/trio/_core/_io_kqueue.py +++ b/trio/_core/_io_kqueue.py @@ -7,6 +7,7 @@ from .. 
import _core from ._run import _public +from ._wakeup_socketpair import WakeupSocketpair @attr.s(slots=True, eq=False, frozen=True) @@ -21,6 +22,15 @@ class KqueueIOManager: _kqueue = attr.ib(factory=select.kqueue) # {(ident, filter): Task or UnboundedQueue} _registered = attr.ib(factory=dict) + _force_wakeup = attr.ib(factory=WakeupSocketpair) + _force_wakeup_fd = attr.ib(default=None) + + def __attrs_post_init__(self): + force_wakeup_event = select.kevent( + self._force_wakeup.wakeup_sock, select.KQ_FILTER_READ, select.KQ_EV_ADD + ) + self._kqueue.control([force_wakeup_event], 0) + self._force_wakeup_fd = self._force_wakeup.wakeup_sock.fileno() def statistics(self): tasks_waiting = 0 @@ -35,7 +45,10 @@ def statistics(self): def close(self): self._kqueue.close() - def handle_io(self, timeout): + def force_wakeup(self): + self._force_wakeup.wakeup_thread_and_signal_safe() + + def get_events(self, timeout): # max_events must be > 0 or kqueue gets cranky # and we generally want this to be strictly larger than the actual # number of events we get, so that we can tell that we've gotten @@ -50,8 +63,14 @@ def handle_io(self, timeout): else: timeout = 0 # and loop back to the start + return events + + def process_events(self, events): for event in events: key = (event.ident, event.filter) + if event.ident == self._force_wakeup_fd: + self._force_wakeup.drain() + continue receiver = self._registered[key] if event.flags & select.KQ_EV_ONESHOT: del self._registered[key] diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index 57056e465e..35fc15e02e 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -171,7 +171,8 @@ class CKeys(enum.IntEnum): AFD_POLL = 0 WAIT_OVERLAPPED = 1 LATE_CANCEL = 2 - USER_DEFINED = 3 # and above + FORCE_WAKEUP = 3 + USER_DEFINED = 4 # and above def _check(success): @@ -388,7 +389,14 @@ def statistics(self): completion_key_monitors=len(self._completion_key_queues), ) - def handle_io(self, timeout): + def 
force_wakeup(self): + _check( + kernel32.PostQueuedCompletionStatus( + self._iocp, 0, CKeys.FORCE_WAKEUP, ffi.NULL + ) + ) + + def get_events(self, timeout): received = ffi.new("PULONG") milliseconds = round(1000 * timeout) if timeout > 0 and milliseconds == 0: @@ -402,8 +410,11 @@ def handle_io(self, timeout): except OSError as exc: if exc.winerror != ErrorCodes.WAIT_TIMEOUT: # pragma: no cover raise - return - for i in range(received[0]): + return 0 + return received[0] + + def process_events(self, received): + for i in range(received): entry = self._events[i] if entry.lpCompletionKey == CKeys.AFD_POLL: lpo = entry.lpOverlapped @@ -465,6 +476,8 @@ def handle_io(self, timeout): # try changing this line to # _core.reschedule(waiter, outcome.Error(exc)) raise exc + elif entry.lpCompletionKey == CKeys.FORCE_WAKEUP: + pass else: # dispatch on lpCompletionKey queue = self._completion_key_queues[entry.lpCompletionKey] diff --git a/trio/_core/_ki.py b/trio/_core/_ki.py index ec22ff5fb6..01ce03ce59 100644 --- a/trio/_core/_ki.py +++ b/trio/_core/_ki.py @@ -3,6 +3,7 @@ import sys from contextlib import contextmanager from functools import wraps +import attr import async_generator @@ -170,26 +171,31 @@ def wrapper(*args, **kwargs): disable_ki_protection.__name__ = "disable_ki_protection" -@contextmanager -def ki_manager(deliver_cb, restrict_keyboard_interrupt_to_checkpoints): - if ( - not is_main_thread() - or signal.getsignal(signal.SIGINT) != signal.default_int_handler - ): - yield - return - - def handler(signum, frame): - assert signum == signal.SIGINT - protection_enabled = ki_protection_enabled(frame) - if protection_enabled or restrict_keyboard_interrupt_to_checkpoints: - deliver_cb() - else: - raise KeyboardInterrupt - - signal.signal(signal.SIGINT, handler) - try: - yield - finally: - if signal.getsignal(signal.SIGINT) is handler: - signal.signal(signal.SIGINT, signal.default_int_handler) +@attr.s +class KIManager: + handler = attr.ib(default=None) + + def 
install(self, deliver_cb, restrict_keyboard_interrupt_to_checkpoints): + assert self.handler is None + if ( + not is_main_thread() + or signal.getsignal(signal.SIGINT) != signal.default_int_handler + ): + return + + def handler(signum, frame): + assert signum == signal.SIGINT + protection_enabled = ki_protection_enabled(frame) + if protection_enabled or restrict_keyboard_interrupt_to_checkpoints: + deliver_cb() + else: + raise KeyboardInterrupt + + self.handler = handler + signal.signal(signal.SIGINT, handler) + + def close(self): + if self.handler is not None: + if signal.getsignal(signal.SIGINT) is self.handler: + signal.signal(signal.SIGINT, signal.default_int_handler) + self.handler = None diff --git a/trio/_core/_run.py b/trio/_core/_run.py index b19e576201..a10d6cb57f 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -11,6 +11,7 @@ from collections import deque import collections.abc from contextlib import contextmanager, closing +import warnings from contextvars import copy_context from math import inf @@ -26,7 +27,7 @@ from ._exceptions import TrioInternalError, RunFinishedError, Cancelled from ._ki import ( LOCALS_KEY_KI_PROTECTION_ENABLED, - ki_manager, + KIManager, enable_ki_protection, ) from ._multierror import MultiError @@ -37,6 +38,7 @@ PermanentlyDetachCoroutineObject, WaitTaskRescheduled, ) +from ._thread_cache import start_thread_soon from .. 
import _core from .._deprecate import deprecated from .._util import Final, NoPublicConstructor, coroutine_or_error @@ -509,10 +511,16 @@ def _might_change_registered_deadline(self): if old != new: self._registered_deadline = new runner = GLOBAL_RUN_CONTEXT.runner + if runner.is_guest: + old_next_deadline = runner.next_deadline() if old != inf: del runner.deadlines[old, id(self)] if new != inf: runner.deadlines[new, id(self)] = self + if runner.is_guest: + new_next_deadline = runner.next_deadline() + if old_next_deadline != new_next_deadline: + runner.force_guest_tick_asap() @property def deadline(self): @@ -816,7 +824,7 @@ def _check_nursery_closed(self): def _child_finished(self, task, outcome): self._children.remove(task) - if type(outcome) is Error: + if isinstance(outcome, Error): self._add_exc(outcome.error) self._check_nursery_closed() @@ -1111,11 +1119,75 @@ class _RunStatistics: run_sync_soon_queue_size = attr.ib() -@attr.s(eq=False, hash=False) +# This holds all the state that gets trampolined back and forth between +# callbacks when we're running in guest mode. +# +# It has to be a separate object from Runner, and Runner *cannot* hold +# references to it (directly or indirectly)! +# +# The idea is that we want a chance to detect if our host loop quits and stops +# driving us forward. We detect that by unrolled_run_gen being garbage +# collected, and hitting its 'except GeneratorExit:' block. So this only +# happens if unrolled_run_gen is GCed. +# +# The Runner state is referenced from the global GLOBAL_RUN_CONTEXT. The only +# way it gets *un*referenced is by unrolled_run_gen completing, e.g. by being +# GCed. But if Runner has a direct or indirect reference to it, and the host +# loop has abandoned it, then this will never happen! +# +# So this object can reference Runner, but Runner can't reference it. The only +# references to it are the "in flight" callback chain on the host loop / +# worker thread. 
+@attr.s(eq=False, hash=False, slots=True) +class GuestState: + runner = attr.ib() + run_sync_soon_threadsafe = attr.ib() + run_sync_soon_not_threadsafe = attr.ib() + done_callback = attr.ib() + unrolled_run_gen = attr.ib() + unrolled_run_next_send = attr.ib(factory=lambda: Value(None)) + + def guest_tick(self): + try: + timeout = self.unrolled_run_next_send.send(self.unrolled_run_gen) + except StopIteration: + self.done_callback(self.runner.main_task_outcome) + return + except TrioInternalError as exc: + self.done_callback(Error(exc)) + return + + # Optimization: try to skip going into the thread if we can avoid it + events_outcome = capture(self.runner.io_manager.get_events, 0) + if timeout <= 0 or isinstance(events_outcome, Error) or events_outcome.value: + # No need to go into the thread + self.unrolled_run_next_send = events_outcome + self.runner.guest_tick_scheduled = True + self.run_sync_soon_not_threadsafe(self.guest_tick) + else: + # Need to go into the thread and call get_events() there + self.runner.guest_tick_scheduled = False + + def get_events(): + return self.runner.io_manager.get_events(timeout) + + def deliver(events_outcome): + def in_main_thread(): + self.unrolled_run_next_send = events_outcome + self.runner.guest_tick_scheduled = True + self.guest_tick() + + self.run_sync_soon_threadsafe(in_main_thread) + + start_thread_soon(get_events, deliver) + + +@attr.s(eq=False, hash=False, slots=True) class Runner: clock = attr.ib() instruments = attr.ib() io_manager = attr.ib() + ki_manager = attr.ib() # Run-local values, see _local.py _locals = attr.ib(factory=dict) @@ -1137,11 +1209,31 @@ class Runner: entry_queue = attr.ib(factory=EntryQueue) trio_token = attr.ib(default=None) + # Guest mode stuff + is_guest = attr.ib(default=False) + guest_tick_scheduled = attr.ib(default=False) + + def force_guest_tick_asap(self): + if self.guest_tick_scheduled: + return + self.guest_tick_scheduled = True + self.io_manager.force_wakeup() + def close(self): 
self.io_manager.close() self.entry_queue.close() if self.instruments: self.instrument("after_run") + # This is where KI protection gets disabled, so we do it last + self.ki_manager.close() + + def next_deadline(self): + try: + (next_deadline, _), _ = self.deadlines.peekitem(0) + except IndexError: + return inf + else: + return next_deadline @_public def current_statistics(self): @@ -1167,11 +1259,7 @@ def current_statistics(self): other attributes vary between backends. """ - if self.deadlines: - next_deadline, _ = self.deadlines.keys()[0] - seconds_to_next_deadline = next_deadline - self.current_time() - else: - seconds_to_next_deadline = float("inf") + seconds_to_next_deadline = self.next_deadline() - self.current_time() return _RunStatistics( tasks_living=len(self.tasks), tasks_runnable=len(self.runq), @@ -1241,6 +1329,8 @@ def reschedule(self, task, next_send=_NO_SEND): task._next_send = next_send task._abort_func = None task.custom_sleep_data = None + if not self.runq and self.is_guest: + self.force_guest_tick_asap() self.runq.append(task) if self.instruments: self.instrument("task_scheduled", task) @@ -1596,6 +1686,100 @@ def remove_instrument(self, instrument): ################################################################ # run ################################################################ +# +# Trio's core task scheduler and coroutine runner is in 'unrolled_run'. It's +# called that because it has an unusual feature: it's actually a generator. +# Whenever it needs to fetch IO events from the OS, it yields, and waits for +# its caller to send the IO events back in. So the loop is "unrolled" into a +# sequence of generator send() calls. +# +# The reason for this unusual design is to support two different modes of +# operation, where the IO is handled differently. 
+# +# In normal mode using trio.run, the scheduler and IO run in the same thread: +# +# Main thread: +# +# +---------------------------+ +# | Run tasks | +# | (unrolled_run) | +# +---------------------------+ +# | Block waiting for I/O | +# | (io_manager.get_events) | +# +---------------------------+ +# | Run tasks | +# | (unrolled_run) | +# +---------------------------+ +# | Block waiting for I/O | +# | (io_manager.get_events) | +# +---------------------------+ +# : +# +# +# In guest mode using trio.lowlevel.start_guest_run, the scheduler runs on the +# main thread as a host loop callback, but blocking for IO gets pushed into a +# worker thread: +# +# Main thread executing host loop: Trio I/O thread: +# +# +---------------------------+ +# | Run Trio tasks | +# | (unrolled_run) | +# +---------------------------+ --------------+ +# v +# +---------------------------+ +----------------------------+ +# | Host loop does whatever | | Block waiting for Trio I/O | +# | it wants | | (io_manager.get_events) | +# +---------------------------+ +----------------------------+ +# | +# +---------------------------+ <-------------+ +# | Run Trio tasks | +# | (unrolled_run) | +# +---------------------------+ --------------+ +# v +# +---------------------------+ +----------------------------+ +# | Host loop does whatever | | Block waiting for Trio I/O | +# | it wants | | (io_manager.get_events) | +# +---------------------------+ +----------------------------+ +# : : +# +# Most of Trio's internals don't need to care about this difference. The main +# complication it creates is that in guest mode, we might need to wake up not +# just due to OS-reported IO events, but also because of code running on the +# host loop calling reschedule() or changing task deadlines. Search for +# 'is_guest' to see the special cases we need to handle this. 
+ + +def setup_runner(clock, instruments, restrict_keyboard_interrupt_to_checkpoints): + """Create a Runner object and install it as the GLOBAL_RUN_CONTEXT.""" + # It wouldn't be *hard* to support nested calls to run(), but I can't + # think of a single good reason for it, so let's be conservative for + # now: + if hasattr(GLOBAL_RUN_CONTEXT, "runner"): + raise RuntimeError("Attempted to call run() from inside a run()") + + if clock is None: + clock = SystemClock() + instruments = list(instruments) + io_manager = TheIOManager() + system_context = copy_context() + system_context.run(current_async_library_cvar.set, "trio") + ki_manager = KIManager() + + runner = Runner( + clock=clock, + instruments=instruments, + io_manager=io_manager, + system_context=system_context, + ki_manager=ki_manager, + ) + + # This is where KI protection gets enabled, so we want to do it early - in + # particular before we start modifying global state like GLOBAL_RUN_CONTEXT + ki_manager.install(runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints) + + GLOBAL_RUN_CONTEXT.runner = runner + return runner def run( @@ -1675,219 +1859,326 @@ def run( __tracebackhide__ = True - # Do error-checking up front, before we enter the TrioInternalError - # try/catch - # - # It wouldn't be *hard* to support nested calls to run(), but I can't - # think of a single good reason for it, so let's be conservative for - # now: - if hasattr(GLOBAL_RUN_CONTEXT, "runner"): - raise RuntimeError("Attempted to call run() from inside a run()") - - if clock is None: - clock = SystemClock() - instruments = list(instruments) - io_manager = TheIOManager() - system_context = copy_context() - system_context.run(current_async_library_cvar.set, "trio") - runner = Runner( - clock=clock, - instruments=instruments, - io_manager=io_manager, - system_context=system_context, + runner = setup_runner( + clock, instruments, restrict_keyboard_interrupt_to_checkpoints ) - GLOBAL_RUN_CONTEXT.runner = runner - # KI handling goes 
outside the core try/except/finally to avoid a window - # where KeyboardInterrupt would be allowed and converted into an - # TrioInternalError: - try: - with ki_manager(runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints): - try: - with closing(runner): - with runner.entry_queue.wakeup.wakeup_on_signals(): - # The main reason this is split off into its own - # function is just to get rid of this extra - # indentation. - run_impl(runner, async_fn, args) - except TrioInternalError: - raise - except BaseException as exc: - raise TrioInternalError( - "internal error in Trio - please file a bug!" - ) from exc - finally: - GLOBAL_RUN_CONTEXT.__dict__.clear() - # Inlined copy of runner.main_task_outcome.unwrap() to avoid - # cluttering every single Trio traceback with an extra frame. - if type(runner.main_task_outcome) is Value: - return runner.main_task_outcome.value - else: - raise runner.main_task_outcome.error - finally: - # To guarantee that we never swallow a KeyboardInterrupt, we have to - # check for pending ones once more after leaving the context manager: - if runner.ki_pending: - # Implicitly chains with any exception from outcome.unwrap(): - raise KeyboardInterrupt + gen = unrolled_run(runner, async_fn, args) + next_send = None + while True: + try: + timeout = gen.send(next_send) + except StopIteration: + break + next_send = runner.io_manager.get_events(timeout) + # Inlined copy of runner.main_task_outcome.unwrap() to avoid + # cluttering every single Trio traceback with an extra frame. + if isinstance(runner.main_task_outcome, Value): + return runner.main_task_outcome.value + else: + raise runner.main_task_outcome.error + + +def start_guest_run( + async_fn, + *args, + run_sync_soon_threadsafe, + done_callback, + run_sync_soon_not_threadsafe=None, + host_uses_signal_set_wakeup_fd=False, + clock=None, + instruments=(), + restrict_keyboard_interrupt_to_checkpoints=False, +): + """Start a "guest" run of Trio on top of some other "host" event loop. 
+ Each host loop can only have one guest run at a time. -# 24 hours is arbitrary, but it avoids issues like people setting timeouts of -# 10**20 and then getting integer overflows in the underlying system calls. -_MAX_TIMEOUT = 24 * 60 * 60 + You should always let the Trio run finish before stopping the host loop; + if not, it may leave Trio's internal data structures in an inconsistent + state. You might be able to get away with it if you immediately exit the + program, but it's safest not to go there in the first place. + Generally, the best way to do this is wrap this in a function that starts + the host loop and then immediately starts the guest run, and then shuts + down the host when the guest run completes. -def run_impl(runner, async_fn, args): - __tracebackhide__ = True + Args: + + run_sync_soon_threadsafe: An arbitrary callable, which will be passed a + function as its sole argument:: + + def my_run_sync_soon_threadsafe(fn): + ... + + This callable should schedule ``fn()`` to be run by the host on its + next pass through its loop. **Must support being called from + arbitrary threads.** + + done_callback: An arbitrary callable:: - if runner.instruments: - runner.instrument("before_run") - runner.clock.start_clock() - runner.init_task = runner.spawn_impl( - runner.init, (async_fn, args), None, "", system_task=True, + def my_done_callback(run_outcome): + ... + + When the Trio run has finished, Trio will invoke this callback to let + you know. The argument is an `outcome.Outcome`, reporting what would + have been returned or raised by `trio.run`. This function can do + anything you want, but commonly you'll want it to shut down the + host loop, unwrap the outcome, etc. + + run_sync_soon_not_threadsafe: Like ``run_sync_soon_threadsafe``, but + will only be called from inside the host loop's main thread. 
+ Optional, but if your host loop allows you to implement this more + efficiently than ``run_sync_soon_threadsafe`` then passing it will + make things a bit faster. + + host_uses_signal_set_wakeup_fd (bool): Pass `True` if your host loop + uses `signal.set_wakeup_fd`, and `False` otherwise. For more details, + see :ref:`guest-run-implementation`. + + For the meaning of other arguments, see `trio.run`. + + """ + runner = setup_runner( + clock, instruments, restrict_keyboard_interrupt_to_checkpoints ) + runner.is_guest = True + runner.guest_tick_scheduled = True + + if run_sync_soon_not_threadsafe is None: + run_sync_soon_not_threadsafe = run_sync_soon_threadsafe + + guest_state = GuestState( + runner=runner, + run_sync_soon_threadsafe=run_sync_soon_threadsafe, + run_sync_soon_not_threadsafe=run_sync_soon_not_threadsafe, + done_callback=done_callback, + unrolled_run_gen=unrolled_run( + runner, + async_fn, + args, + host_uses_signal_set_wakeup_fd=host_uses_signal_set_wakeup_fd, + ), + ) + run_sync_soon_not_threadsafe(guest_state.guest_tick) - # You know how people talk about "event loops"? This 'while' loop right - # here is our event loop: - while runner.tasks: - if runner.runq: - timeout = 0 - elif runner.deadlines: - deadline, _ = runner.deadlines.keys()[0] - timeout = runner.clock.deadline_to_sleep_time(deadline) - else: - timeout = _MAX_TIMEOUT - timeout = min(max(0, timeout), _MAX_TIMEOUT) - idle_primed = False - if runner.waiting_for_idle: - cushion, tiebreaker, _ = runner.waiting_for_idle.keys()[0] - if cushion < timeout: - timeout = cushion - idle_primed = True +# 24 hours is arbitrary, but it avoids issues like people setting timeouts of +# 10**20 and then getting integer overflows in the underlying system calls. 
+_MAX_TIMEOUT = 24 * 60 * 60 - if runner.instruments: - runner.instrument("before_io_wait", timeout) - runner.io_manager.handle_io(timeout) +# Weird quirk: this is written as a generator in order to support "guest +# mode", where our core event loop gets unrolled into a series of callbacks on +# the host loop. If you're doing a regular trio.run then this gets run +# straight through. +def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False): + locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True + __tracebackhide__ = True + + try: + if not host_uses_signal_set_wakeup_fd: + runner.entry_queue.wakeup.wakeup_on_signals() if runner.instruments: - runner.instrument("after_io_wait", timeout) - - # Process cancellations due to deadline expiry - now = runner.clock.current_time() - while runner.deadlines: - (deadline, _), cancel_scope = runner.deadlines.peekitem(0) - if deadline <= now: - # This removes the given scope from runner.deadlines: - cancel_scope.cancel() - idle_primed = False + runner.instrument("before_run") + runner.clock.start_clock() + runner.init_task = runner.spawn_impl( + runner.init, (async_fn, args), None, "", system_task=True, + ) + + # You know how people talk about "event loops"? This 'while' loop right + # here is our event loop: + while runner.tasks: + if runner.runq: + timeout = 0 + elif runner.deadlines: + deadline, _ = runner.deadlines.keys()[0] + timeout = runner.clock.deadline_to_sleep_time(deadline) else: - break - - if not runner.runq and idle_primed: - while runner.waiting_for_idle: - key, task = runner.waiting_for_idle.peekitem(0) - if key[:2] == (cushion, tiebreaker): - del runner.waiting_for_idle[key] - runner.reschedule(task) - else: - break + timeout = _MAX_TIMEOUT + timeout = min(max(0, timeout), _MAX_TIMEOUT) - # Process all runnable tasks, but only the ones that are already - # runnable now. Anything that becomes runnable during this cycle needs - # to wait until the next pass. 
This avoids various starvation issues - # by ensuring that there's never an unbounded delay between successive - # checks for I/O. - # - # Also, we randomize the order of each batch to avoid assumptions - # about scheduling order sneaking in. In the long run, I suspect we'll - # either (a) use strict FIFO ordering and document that for - # predictability/determinism, or (b) implement a more sophisticated - # scheduler (e.g. some variant of fair queueing), for better behavior - # under load. For now, this is the worst of both worlds - but it keeps - # our options open. (If we do decide to go all in on deterministic - # scheduling, then there are other things that will probably need to - # change too, like the deadlines tie-breaker and the non-deterministic - # ordering of task._notify_queues.) - batch = list(runner.runq) - if _ALLOW_DETERMINISTIC_SCHEDULING: - # We're running under Hypothesis, and pytest-trio has patched this - # in to make the scheduler deterministic and avoid flaky tests. - # It's not worth the (small) performance cost in normal operation, - # since we'll shuffle the list and _r is only seeded for tests. 
- batch.sort(key=lambda t: t._counter) - runner.runq.clear() - _r.shuffle(batch) - while batch: - task = batch.pop() - GLOBAL_RUN_CONTEXT.task = task + idle_primed = False + if runner.waiting_for_idle: + cushion, tiebreaker, _ = runner.waiting_for_idle.keys()[0] + if cushion < timeout: + timeout = cushion + idle_primed = True if runner.instruments: - runner.instrument("before_task_step", task) + runner.instrument("before_io_wait", timeout) - next_send_fn = task._next_send_fn - next_send = task._next_send - task._next_send_fn = task._next_send = None - final_outcome = None - try: - # We used to unwrap the Outcome object here and send/throw its - # contents in directly, but it turns out that .throw() is - # buggy, at least on CPython 3.6: - # https://bugs.python.org/issue29587 - # https://bugs.python.org/issue29590 - # So now we send in the Outcome object and unwrap it on the - # other side. - msg = task.context.run(next_send_fn, next_send) - except StopIteration as stop_iteration: - final_outcome = Value(stop_iteration.value) - except BaseException as task_exc: - # Store for later, removing uninteresting top frames: 1 frame - # we always remove, because it's this function catching it, - # and then in addition we remove however many more Context.run - # adds. - tb = task_exc.__traceback__.tb_next - for _ in range(CONTEXT_RUN_TB_FRAMES): - tb = tb.tb_next - final_outcome = Error(task_exc.with_traceback(tb)) - - if final_outcome is not None: - # We can't call this directly inside the except: blocks above, - # because then the exceptions end up attaching themselves to - # other exceptions as __context__ in unwanted ways. 
- runner.task_exited(task, final_outcome) - else: - task._schedule_points += 1 - if msg is CancelShieldedCheckpoint: - runner.reschedule(task) - elif type(msg) is WaitTaskRescheduled: - task._cancel_points += 1 - task._abort_func = msg.abort_func - # KI is "outside" all cancel scopes, so check for it - # before checking for regular cancellation: - if runner.ki_pending and task is runner.main_task: - task._attempt_delivery_of_pending_ki() - task._attempt_delivery_of_any_pending_cancel() - elif type(msg) is PermanentlyDetachCoroutineObject: - # Pretend the task just exited with the given outcome - runner.task_exited(task, msg.final_outcome) - else: - exc = TypeError( - "trio.run received unrecognized yield message {!r}. " - "Are you trying to use a library written for some " - "other framework like asyncio? That won't work " - "without some kind of compatibility shim.".format(msg) - ) - # The foreign library probably doesn't adhere to our - # protocol of unwrapping whatever outcome gets sent in. - # Instead, we'll arrange to throw `exc` in directly, - # which works for at least asyncio and curio. - runner.reschedule(task, exc) - task._next_send_fn = task.coro.throw + # Driver will call io_manager.get_events(timeout) and pass it back + # in through the yield + events = yield timeout + runner.io_manager.process_events(events) if runner.instruments: - runner.instrument("after_task_step", task) - del GLOBAL_RUN_CONTEXT.task + runner.instrument("after_io_wait", timeout) + + # Process cancellations due to deadline expiry + now = runner.clock.current_time() + while runner.deadlines: + (deadline, _), cancel_scope = runner.deadlines.peekitem(0) + if deadline <= now: + # This removes the given scope from runner.deadlines: + cancel_scope.cancel() + idle_primed = False + else: + break + + # idle_primed=True means: if the IO wait hit the timeout, and still + # nothing is happening, then we should start waking up + # wait_all_tasks_blocked tasks. 
But there are some subtleties in + # defining "nothing is happening". + # + # 'not runner.runq' means that no tasks are currently runnable. + # 'not events' means that the last IO wait call hit its full + # timeout. These are very similar, and if idle_primed=True and + # we're running in regular mode then they always go together. But, + # in *guest* mode, they can happen independently, even when + # idle_primed=True: + # + # - runner.runq=empty and events=True: the host loop adjusted a + # deadline and that forced an IO wakeup before the timeout expired, + # even though no actual tasks were scheduled. + # + # - runner.runq=nonempty and events=False: the IO wait hit its + # timeout, but then some code in the host thread rescheduled a task + # before we got here. + # + # So we need to check both. + if idle_primed and not runner.runq and not events: + while runner.waiting_for_idle: + key, task = runner.waiting_for_idle.peekitem(0) + if key[:2] == (cushion, tiebreaker): + del runner.waiting_for_idle[key] + runner.reschedule(task) + else: + break + + # Process all runnable tasks, but only the ones that are already + # runnable now. Anything that becomes runnable during this cycle + # needs to wait until the next pass. This avoids various + # starvation issues by ensuring that there's never an unbounded + # delay between successive checks for I/O. + # + # Also, we randomize the order of each batch to avoid assumptions + # about scheduling order sneaking in. In the long run, I suspect + # we'll either (a) use strict FIFO ordering and document that for + # predictability/determinism, or (b) implement a more + # sophisticated scheduler (e.g. some variant of fair queueing), + # for better behavior under load. For now, this is the worst of + # both worlds - but it keeps our options open. 
(If we do decide to + # go all in on deterministic scheduling, then there are other + # things that will probably need to change too, like the deadlines + # tie-breaker and the non-deterministic ordering of + # task._notify_queues.) + batch = list(runner.runq) + if _ALLOW_DETERMINISTIC_SCHEDULING: + # We're running under Hypothesis, and pytest-trio has patched + # this in to make the scheduler deterministic and avoid flaky + # tests. It's not worth the (small) performance cost in normal + # operation, since we'll shuffle the list and _r is only + # seeded for tests. + batch.sort(key=lambda t: t._counter) + runner.runq.clear() + _r.shuffle(batch) + while batch: + task = batch.pop() + GLOBAL_RUN_CONTEXT.task = task + + if runner.instruments: + runner.instrument("before_task_step", task) + + next_send_fn = task._next_send_fn + next_send = task._next_send + task._next_send_fn = task._next_send = None + final_outcome = None + try: + # We used to unwrap the Outcome object here and send/throw + # its contents in directly, but it turns out that .throw() + # is buggy, at least on CPython 3.6: + # https://bugs.python.org/issue29587 + # https://bugs.python.org/issue29590 + # So now we send in the Outcome object and unwrap it on the + # other side. + msg = task.context.run(next_send_fn, next_send) + except StopIteration as stop_iteration: + final_outcome = Value(stop_iteration.value) + except BaseException as task_exc: + # Store for later, removing uninteresting top frames: 1 + # frame we always remove, because it's this function + # catching it, and then in addition we remove however many + # more Context.run adds. 
+ tb = task_exc.__traceback__.tb_next + for _ in range(CONTEXT_RUN_TB_FRAMES): + tb = tb.tb_next + final_outcome = Error(task_exc.with_traceback(tb)) + + if final_outcome is not None: + # We can't call this directly inside the except: blocks + # above, because then the exceptions end up attaching + # themselves to other exceptions as __context__ in + # unwanted ways. + runner.task_exited(task, final_outcome) + else: + task._schedule_points += 1 + if msg is CancelShieldedCheckpoint: + runner.reschedule(task) + elif type(msg) is WaitTaskRescheduled: + task._cancel_points += 1 + task._abort_func = msg.abort_func + # KI is "outside" all cancel scopes, so check for it + # before checking for regular cancellation: + if runner.ki_pending and task is runner.main_task: + task._attempt_delivery_of_pending_ki() + task._attempt_delivery_of_any_pending_cancel() + elif type(msg) is PermanentlyDetachCoroutineObject: + # Pretend the task just exited with the given outcome + runner.task_exited(task, msg.final_outcome) + else: + exc = TypeError( + "trio.run received unrecognized yield message {!r}. " + "Are you trying to use a library written for some " + "other framework like asyncio? That won't work " + "without some kind of compatibility shim.".format(msg) + ) + # The foreign library probably doesn't adhere to our + # protocol of unwrapping whatever outcome gets sent in. + # Instead, we'll arrange to throw `exc` in directly, + # which works for at least asyncio and curio. + runner.reschedule(task, exc) + task._next_send_fn = task.coro.throw + + if runner.instruments: + runner.instrument("after_task_step", task) + del GLOBAL_RUN_CONTEXT.task + + except GeneratorExit: + # The run-loop generator has been garbage collected without finishing + warnings.warn( + RuntimeWarning( + "Trio guest run got abandoned without properly finishing... 
" + "weird stuff might happen" + ) + ) + except TrioInternalError: + raise + except BaseException as exc: + raise TrioInternalError("internal error in Trio - please file a bug!") from exc + finally: + GLOBAL_RUN_CONTEXT.__dict__.clear() + runner.close() + # Have to do this after runner.close() has disabled KI protection, + # because otherwise there's a race where ki_pending could get set + # after we check it. + if runner.ki_pending: + ki = KeyboardInterrupt() + if isinstance(runner.main_task_outcome, Error): + ki.__context__ = runner.main_task_outcome.error + runner.main_task_outcome = Error(ki) ################################################################ diff --git a/trio/_core/_wakeup_socketpair.py b/trio/_core/_wakeup_socketpair.py index 3513cc1ab3..80d3090ee9 100644 --- a/trio/_core/_wakeup_socketpair.py +++ b/trio/_core/_wakeup_socketpair.py @@ -2,6 +2,7 @@ import sys from contextlib import contextmanager import signal +import warnings from .. import _core from .._util import is_main_thread @@ -38,6 +39,7 @@ def __init__(self): self.write_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) except OSError: pass + self.old_wakeup_fd = None def wakeup_thread_and_signal_safe(self): try: @@ -56,21 +58,29 @@ def drain(self): except BlockingIOError: pass - @contextmanager def wakeup_on_signals(self): + assert self.old_wakeup_fd is None if not is_main_thread(): - yield return fd = self.write_sock.fileno() if HAVE_WARN_ON_FULL_BUFFER: - old_wakeup_fd = signal.set_wakeup_fd(fd, warn_on_full_buffer=False) + self.old_wakeup_fd = signal.set_wakeup_fd(fd, warn_on_full_buffer=False) else: - old_wakeup_fd = signal.set_wakeup_fd(fd) - try: - yield - finally: - signal.set_wakeup_fd(old_wakeup_fd) + self.old_wakeup_fd = signal.set_wakeup_fd(fd) + if self.old_wakeup_fd != -1: + warnings.warn( + RuntimeWarning( + "It looks like Trio's signal handling code might have " + "collided with another library you're using. 
If you're " + "running Trio in guest mode, then this might mean you " + "should set host_uses_signal_set_wakeup_fd=True. " + "Otherwise, file a bug on Trio and we'll help you figure " + "out what's going on." + ) + ) def close(self): self.wakeup_sock.close() self.write_sock.close() + if self.old_wakeup_fd is not None: + signal.set_wakeup_fd(self.old_wakeup_fd) diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py new file mode 100644 index 0000000000..46e741e392 --- /dev/null +++ b/trio/_core/tests/test_guest_mode.py @@ -0,0 +1,475 @@ +import pytest +import asyncio +import traceback +import queue +from functools import partial +from math import inf +import signal +import socket +import threading + +import trio +import trio.testing +from .tutil import gc_collect_harder +from ..._util import signal_raise + +# The simplest possible "host" loop. +# Nice features: +# - we can run code "outside" of trio using the schedule function passed to +# our main +# - final result is returned +# - any unhandled exceptions cause an immediate crash +def trivial_guest_run(trio_fn, **start_guest_run_kwargs): + todo = queue.Queue() + + host_thread = threading.current_thread() + + def run_sync_soon_threadsafe(fn): + if host_thread is threading.current_thread(): # pragma: no cover + crash = partial( + pytest.fail, "run_sync_soon_threadsafe called from host thread" + ) + todo.put(("run", crash)) + todo.put(("run", fn)) + + def run_sync_soon_not_threadsafe(fn): + if host_thread is not threading.current_thread(): # pragma: no cover + crash = partial( + pytest.fail, "run_sync_soon_not_threadsafe called from worker thread" + ) + todo.put(("run", crash)) + todo.put(("run", fn)) + + def done_callback(outcome): + todo.put(("unwrap", outcome)) + + trio.lowlevel.start_guest_run( + trio_fn, + run_sync_soon_not_threadsafe, + run_sync_soon_threadsafe=run_sync_soon_threadsafe, + run_sync_soon_not_threadsafe=run_sync_soon_not_threadsafe, + done_callback=done_callback, + 
**start_guest_run_kwargs, + ) + + try: + while True: + op, obj = todo.get() + if op == "run": + obj() + elif op == "unwrap": + return obj.unwrap() + else: # pragma: no cover + assert False + finally: + # Make sure that exceptions raised here don't capture these, so that + # if an exception does cause us to abandon a run then the Trio state + # has a chance to be GC'ed and warn about it. + del todo, run_sync_soon_threadsafe, done_callback + + +def test_guest_trivial(): + async def trio_return(in_host): + await trio.sleep(0) + return "ok" + + assert trivial_guest_run(trio_return) == "ok" + + async def trio_fail(in_host): + raise KeyError("whoopsiedaisy") + + with pytest.raises(KeyError, match="whoopsiedaisy"): + trivial_guest_run(trio_fail) + + +def test_guest_can_do_io(): + async def trio_main(in_host): + record = [] + a, b = trio.socket.socketpair() + with a, b: + async with trio.open_nursery() as nursery: + + async def do_receive(): + record.append(await a.recv(1)) + + nursery.start_soon(do_receive) + await trio.testing.wait_all_tasks_blocked() + + await b.send(b"x") + + assert record == [b"x"] + + trivial_guest_run(trio_main) + + +def test_host_can_directly_wake_trio_task(): + async def trio_main(in_host): + ev = trio.Event() + in_host(ev.set) + await ev.wait() + return "ok" + + assert trivial_guest_run(trio_main) == "ok" + + +def test_host_altering_deadlines_wakes_trio_up(): + def set_deadline(cscope, new_deadline): + cscope.deadline = new_deadline + + async def trio_main(in_host): + with trio.CancelScope() as cscope: + in_host(lambda: set_deadline(cscope, -inf)) + await trio.sleep_forever() + assert cscope.cancelled_caught + + with trio.CancelScope() as cscope: + # also do a change that doesn't affect the next deadline, just to + # exercise that path + in_host(lambda: set_deadline(cscope, 1e6)) + in_host(lambda: set_deadline(cscope, -inf)) + await trio.sleep(999) + assert cscope.cancelled_caught + + return "ok" + + assert trivial_guest_run(trio_main) == "ok" + 
+ +def test_warn_set_wakeup_fd_overwrite(): + assert signal.set_wakeup_fd(-1) == -1 + + async def trio_main(in_host): + return "ok" + + a, b = socket.socketpair() + with a, b: + a.setblocking(False) + + # Warn if there's already a wakeup fd + signal.set_wakeup_fd(a.fileno()) + try: + with pytest.warns(RuntimeWarning, match="signal handling code.*collided"): + assert trivial_guest_run(trio_main) == "ok" + finally: + assert signal.set_wakeup_fd(-1) == a.fileno() + + signal.set_wakeup_fd(a.fileno()) + try: + with pytest.warns(RuntimeWarning, match="signal handling code.*collided"): + assert ( + trivial_guest_run(trio_main, host_uses_signal_set_wakeup_fd=False) + == "ok" + ) + finally: + assert signal.set_wakeup_fd(-1) == a.fileno() + + # Don't warn if there isn't already a wakeup fd + with pytest.warns(None) as record: + assert trivial_guest_run(trio_main) == "ok" + # Apparently this is how you assert 'there were no RuntimeWarnings' + with pytest.raises(AssertionError): + record.pop(RuntimeWarning) + + with pytest.warns(None) as record: + assert ( + trivial_guest_run(trio_main, host_uses_signal_set_wakeup_fd=True) + == "ok" + ) + with pytest.raises(AssertionError): + record.pop(RuntimeWarning) + + # If there's already a wakeup fd, but we've been told to trust it, + # then it's left alone and there's no warning + signal.set_wakeup_fd(a.fileno()) + try: + + async def trio_check_wakeup_fd_unaltered(in_host): + fd = signal.set_wakeup_fd(-1) + assert fd == a.fileno() + signal.set_wakeup_fd(fd) + return "ok" + + with pytest.warns(None) as record: + assert ( + trivial_guest_run( + trio_check_wakeup_fd_unaltered, + host_uses_signal_set_wakeup_fd=True, + ) + == "ok" + ) + with pytest.raises(AssertionError): + record.pop(RuntimeWarning) + finally: + assert signal.set_wakeup_fd(-1) == a.fileno() + + +def test_host_wakeup_doesnt_trigger_wait_all_tasks_blocked(): + # This is designed to hit the branch in unrolled_run where: + # idle_primed=True + # runner.runq is empty + # events 
is Truth-y + # ...and confirm that in this case, wait_all_tasks_blocked does not get + # triggered. + def set_deadline(cscope, new_deadline): + print(f"setting deadline {new_deadline}") + cscope.deadline = new_deadline + + async def trio_main(in_host): + async def sit_in_wait_all_tasks_blocked(watb_cscope): + with watb_cscope: + # Overall point of this test is that this + # wait_all_tasks_blocked should *not* return normally, but + # only by cancellation. + await trio.testing.wait_all_tasks_blocked(cushion=9999) + assert False # pragma: no cover + assert watb_cscope.cancelled_caught + + async def get_woken_by_host_deadline(watb_cscope): + with trio.CancelScope() as cscope: + print("scheduling stuff to happen") + # Altering the deadline from the host, to something in the + # future, will cause the run loop to wake up, but then + # discover that there is nothing to do and go back to sleep. + # This should *not* trigger wait_all_tasks_blocked. + # + # So the 'before_io_wait' here will wait until we're blocking + # with the wait_all_tasks_blocked primed, and then schedule a + # deadline change. The critical test is that this should *not* + # wake up 'sit_in_wait_all_tasks_blocked'. + # + # Then after we've had a chance to wake up + # 'sit_in_wait_all_tasks_blocked', we want the test to + # actually end. So in after_io_wait we schedule a second host + # call to tear things down. 
+ class InstrumentHelper: + def __init__(self): + self.primed = False + + def before_io_wait(self, timeout): + print(f"before_io_wait({timeout})") + if timeout == 9999: # pragma: no branch + assert not self.primed + in_host(lambda: set_deadline(cscope, 1e9)) + self.primed = True + + def after_io_wait(self, timeout): + if self.primed: # pragma: no branch + print("instrument triggered") + in_host(lambda: cscope.cancel()) + trio.lowlevel.remove_instrument(self) + + trio.lowlevel.add_instrument(InstrumentHelper()) + await trio.sleep_forever() + assert cscope.cancelled_caught + watb_cscope.cancel() + + async with trio.open_nursery() as nursery: + watb_cscope = trio.CancelScope() + nursery.start_soon(sit_in_wait_all_tasks_blocked, watb_cscope) + await trio.testing.wait_all_tasks_blocked() + nursery.start_soon(get_woken_by_host_deadline, watb_cscope) + + return "ok" + + assert trivial_guest_run(trio_main) == "ok" + + +def test_guest_warns_if_abandoned(): + # This warning is emitted from the garbage collector. So we have to make + # sure that our abandoned run is garbage. The easiest way to do this is to + # put it into a function, so that we're sure all the local state, + # traceback frames, etc. are garbage once it returns. + def do_abandoned_guest_run(): + async def abandoned_main(in_host): + in_host(lambda: 1 / 0) + while True: + await trio.sleep(0) + + with pytest.raises(ZeroDivisionError): + trivial_guest_run(abandoned_main) + + with pytest.warns(RuntimeWarning, match="Trio guest run got abandoned"): + do_abandoned_guest_run() + gc_collect_harder() + + # If you have problems some day figuring out what's holding onto a + # reference to the unrolled_run generator and making this test fail, + # then this might be useful to help track it down. (It assumes you + # also hack start_guest_run so that it does 'global W; W = + # weakref(unrolled_run_gen)'.) 
+ # + # import gc + # print(trio._core._run.W) + # targets = [trio._core._run.W()] + # for i in range(15): + # new_targets = [] + # for target in targets: + # new_targets += gc.get_referrers(target) + # new_targets.remove(targets) + # print("#####################") + # print(f"depth {i}: {len(new_targets)}") + # print(new_targets) + # targets = new_targets + + with pytest.raises(RuntimeError): + trio.current_time() + + +def aiotrio_run(trio_fn, *, pass_not_threadsafe=True, **start_guest_run_kwargs): + loop = asyncio.new_event_loop() + + async def aio_main(): + trio_done_fut = asyncio.Future() + + def trio_done_callback(main_outcome): + print(f"trio_fn finished: {main_outcome!r}") + trio_done_fut.set_result(main_outcome) + + if pass_not_threadsafe: + start_guest_run_kwargs["run_sync_soon_not_threadsafe"] = loop.call_soon + + trio.lowlevel.start_guest_run( + trio_fn, + run_sync_soon_threadsafe=loop.call_soon_threadsafe, + done_callback=trio_done_callback, + **start_guest_run_kwargs, + ) + + return (await trio_done_fut).unwrap() + + try: + return loop.run_until_complete(aio_main()) + finally: + loop.close() + + +def test_guest_mode_on_asyncio(): + async def trio_main(): + print("trio_main!") + + to_trio, from_aio = trio.open_memory_channel(float("inf")) + from_trio = asyncio.Queue() + + aio_task = asyncio.ensure_future(aio_pingpong(from_trio, to_trio)) + + # Make sure we have at least one tick where we don't need to go into + # the thread + await trio.sleep(0) + + from_trio.put_nowait(0) + + async for n in from_aio: + print(f"trio got: {n}") + from_trio.put_nowait(n + 1) + if n >= 10: + aio_task.cancel() + return "trio-main-done" + + async def aio_pingpong(from_trio, to_trio): + print("aio_pingpong!") + + try: + while True: + n = await from_trio.get() + print(f"aio got: {n}") + to_trio.send_nowait(n + 1) + except asyncio.CancelledError: + raise + except: # pragma: no cover + traceback.print_exc() + raise + + assert ( + aiotrio_run( + trio_main, + # Not all versions of 
asyncio we test on can actually be trusted, + # but this test doesn't care about signal handling, and it's + # easier to just avoid the warnings. + host_uses_signal_set_wakeup_fd=True, + ) + == "trio-main-done" + ) + + assert ( + aiotrio_run( + trio_main, + # Also check that passing only call_soon_threadsafe works, via the + # fallback path where we use it for everything. + pass_not_threadsafe=False, + host_uses_signal_set_wakeup_fd=True, + ) + == "trio-main-done" + ) + + +def test_guest_mode_internal_errors(monkeypatch, recwarn): + with monkeypatch.context() as m: + + async def crash_in_run_loop(in_host): + m.setattr("trio._core._run.GLOBAL_RUN_CONTEXT.runner.runq", "HI") + await trio.sleep(1) + + with pytest.raises(trio.TrioInternalError): + trivial_guest_run(crash_in_run_loop) + + with monkeypatch.context() as m: + + async def crash_in_io(in_host): + m.setattr("trio._core._run.TheIOManager.get_events", None) + await trio.sleep(0) + + with pytest.raises(trio.TrioInternalError): + trivial_guest_run(crash_in_io) + + with monkeypatch.context() as m: + + async def crash_in_worker_thread_io(in_host): + t = threading.current_thread() + old_get_events = trio._core._run.TheIOManager.get_events + + def bad_get_events(*args): + if threading.current_thread() is not t: + raise ValueError("oh no!") + else: + return old_get_events(*args) + + m.setattr("trio._core._run.TheIOManager.get_events", bad_get_events) + + await trio.sleep(1) + + with pytest.raises(trio.TrioInternalError): + trivial_guest_run(crash_in_worker_thread_io) + + gc_collect_harder() + + +def test_guest_mode_ki(): + assert signal.getsignal(signal.SIGINT) is signal.default_int_handler + + # Check SIGINT in Trio func and in host func + async def trio_main(in_host): + with pytest.raises(KeyboardInterrupt): + signal_raise(signal.SIGINT) + + # Host SIGINT should get injected into Trio + in_host(partial(signal_raise, signal.SIGINT)) + await trio.sleep(10) + + with pytest.raises(KeyboardInterrupt) as excinfo: + 
trivial_guest_run(trio_main) + assert excinfo.value.__context__ is None + # Signal handler should be restored properly on exit + assert signal.getsignal(signal.SIGINT) is signal.default_int_handler + + # Also check chaining in the case where KI is injected after main exits + final_exc = KeyError("whoa") + + async def trio_main_raising(in_host): + in_host(partial(signal_raise, signal.SIGINT)) + raise final_exc + + with pytest.raises(KeyboardInterrupt) as excinfo: + trivial_guest_run(trio_main_raising) + assert excinfo.value.__context__ is final_exc + + assert signal.getsignal(signal.SIGINT) is signal.default_int_handler diff --git a/trio/lowlevel.py b/trio/lowlevel.py index 3ce3e741ba..b54b3dba52 100644 --- a/trio/lowlevel.py +++ b/trio/lowlevel.py @@ -42,6 +42,7 @@ wait_writable, notify_closing, start_thread_soon, + start_guest_run, ) # Unix-specific symbols