From 46e36ce135392ab26d3f966c637b32b2b1f6b9a0 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Sat, 18 May 2019 00:56:00 -0700 Subject: [PATCH 1/6] Stop requiring an async operations that raises an exception to perform a checkpoint --- docs/source/design.rst | 7 + docs/source/reference-core.rst | 42 +++-- docs/source/reference-hazmat.rst | 8 +- newsfragments/474.misc.rst | 5 + trio/_core/_io_epoll.py | 1 - trio/_core/_io_kqueue.py | 1 - trio/_core/_io_windows.py | 1 - trio/_core/_run.py | 2 +- trio/_core/_unbounded_queue.py | 6 +- trio/_core/tests/test_io.py | 10 +- trio/_highlevel_socket.py | 12 +- trio/_socket.py | 3 - trio/_ssl.py | 295 +++++++++++++++---------------- trio/_sync.py | 3 - trio/_timeouts.py | 3 +- trio/_unix_pipes.py | 2 +- trio/_windows_pipes.py | 2 +- trio/testing/_checkpoints.py | 11 +- trio/tests/test_socket.py | 35 ++-- trio/tests/test_ssl.py | 20 +-- trio/tests/test_sync.py | 5 +- 21 files changed, 231 insertions(+), 243 deletions(-) create mode 100644 newsfragments/474.misc.rst diff --git a/docs/source/design.rst b/docs/source/design.rst index 4f6d78c3b7..eb41d55c45 100644 --- a/docs/source/design.rst +++ b/docs/source/design.rst @@ -221,6 +221,13 @@ In fact, to make this even simpler, we make it so you don't even have to look at the function arguments: each *function* is either a cancel+schedule point on *every* call or on *no* calls. +(Pragmatic exception: a Trio primitive is not required to act as a +cancel+schedule point when it raises an exception, even if it would +act as one in the case of a successful return. See `issue 474 +`__ for more details; +basically, requiring checkpoints on all exception paths added a lot of +implementation complexity with negligible user-facing benefit.) + Observation: since blocking is always a cancel+schedule point, rule 2 implies that any function that *sometimes* blocks is *always* a cancel+schedule point. diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index 4fdfcbd0ce..3ff91c0930 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -74,8 +74,7 @@ not others, depending on the arguments passed / network speed / phase of the moon? How do we figure out where the checkpoints are when we're stressed and sleep deprived but still want to get this code review right, and would prefer to reserve our mental energy for -thinking about the actual logic instead of worrying about check -points? +thinking about the actual logic instead of worrying about checkpoints? .. _checkpoint-rule: @@ -85,19 +84,28 @@ them. Here are the rules: * Regular (synchronous) functions never contain any checkpoints. -* Every async function provided by Trio *always* acts as a check - point; if you see ``await ``, or ``async for - ... in ``, or ``async with ``, then - that's *definitely* a checkpoint. - - (Partial exception: for async context managers, it might be only the - entry or only the exit that acts as a checkpoint; this is - documented on a case-by-case basis.) - -* Third-party async functions can act as checkpoints; if you see - ``await `` or one of its friends, then that *might* be a - checkpoint. So to be safe, you should prepare for scheduling or - cancellation happening there. +* If you call an async function provided by Trio (``await + ``), and it doesn't raise an exception, + then it *always* acts as a checkpoint. (If it does raise an + exception, it might act as a checkpoint or might not.) + + * This includes async iterators: If you write ``async for ... in ``, then there will be at least one checkpoint before + each iteration of the loop and one checkpoint after the last + iteration. + + * Partial exception for async context managers: + Both the entry and exit of an ``async with`` block are + defined as async functions; but for a + particular type of async context manager, it's often the + case that only one of them is able to block, which means + only that one will act as a checkpoint. This is documented + on a case-by-case basis. + +* Third-party async functions / iterators / context managers can act + as checkpoints; if you see ``await `` or one of its + friends, then that *might* be a checkpoint. So to be safe, you + should prepare for scheduling or cancellation happening there. The reason we distinguish between Trio functions and other functions is that we can't make any guarantees about third party @@ -138,8 +146,8 @@ A slightly trickier case is a function like:: Here the function acts as a checkpoint if you call it with ``should_sleep`` set to a true value, but not otherwise. This is why -we emphasize that Trio's own async functions are *unconditional* check -points: they *always* check for cancellation and check for scheduling, +we emphasize that Trio's own async functions are *unconditional* checkpoints: +they *always* check for cancellation and check for scheduling, regardless of what arguments they're passed. If you find an async function in Trio that doesn't follow this rule, then it's a bug and you should `let us know diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index 09c08296a9..b14c4cfdb6 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -412,10 +412,6 @@ checkpoint semantics. Example:: except BlockingIOError: # need to block and then retry, which we do below pass - except: - # some other error, finish the checkpoint then let it propagate - await cancel_shielded_checkpoint() - raise else: # operation succeeded, finish the checkpoint then return await cancel_shielded_checkpoint() @@ -429,13 +425,13 @@ checkpoint semantics. Example:: This logic is a bit convoluted, but accomplishes all of the following: -* Every execution path passes through a checkpoint (assuming that +* Every successful execution path passes through a checkpoint (assuming that ``wait_for_operation_to_be_ready`` is an unconditional checkpoint) * Our :ref:`cancellation semantics ` say that :exc:`~trio.Cancelled` should only be raised if the operation didn't happen. Using :func:`cancel_shielded_checkpoint` on the early-exit - branches accomplishes this. + branch accomplishes this. * On the path where we do end up blocking, we don't pass through any schedule points before that, which avoids some unnecessary work. diff --git a/newsfragments/474.misc.rst b/newsfragments/474.misc.rst new file mode 100644 index 0000000000..b9de15c005 --- /dev/null +++ b/newsfragments/474.misc.rst @@ -0,0 +1,5 @@ +We've slightly relaxed our definition of which Trio operations act as +:ref:`checkpoints `. A Trio async function that exits by +throwing an exception is no longer guaranteed to execute a checkpoint; +it might or might not. The rules are unchanged for async functions that +don't exit with an exception, async iterators, and async context managers. diff --git a/trio/_core/_io_epoll.py b/trio/_core/_io_epoll.py index 45d8b8c5c5..adae275747 100644 --- a/trio/_core/_io_epoll.py +++ b/trio/_core/_io_epoll.py @@ -103,7 +103,6 @@ async def _epoll_wait(self, fd, attr_name): self._registered[fd] = EpollWaiters() waiters = self._registered[fd] if getattr(waiters, attr_name) is not None: - await _core.checkpoint() raise _core.BusyResourceError( "another task is already reading / writing this fd" ) diff --git a/trio/_core/_io_kqueue.py b/trio/_core/_io_kqueue.py index 4c4ac69879..00998af872 100644 --- a/trio/_core/_io_kqueue.py +++ b/trio/_core/_io_kqueue.py @@ -97,7 +97,6 @@ def monitor_kevent(self, ident, filter): async def wait_kevent(self, ident, filter, abort_func): key = (ident, filter) if key in self._registered: - await _core.checkpoint() raise _core.BusyResourceError( "attempt to register multiple listeners for same " "ident/filter pair" diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index 429ba9d9f7..05c62d372b 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -433,7 +433,6 @@ async def _wait_socket(self, which, sock): if not isinstance(sock, int): sock = sock.fileno() if sock in self._socket_waiters[which]: - await _core.checkpoint() raise _core.BusyResourceError( "another task is already waiting to {} this socket" .format(which) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 5b5e2921d2..3042ed2c5f 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -781,7 +781,7 @@ class Task: _child_nurseries = attr.ib(default=attr.Factory(list)) # these are counts of how many cancel/schedule points this task has - # executed, for assert{_no,}_yields + # executed, for assert{_no,}_checkpoints # XX maybe these should be exposed as part of a statistics() method? _cancel_points = attr.ib(default=0) _schedule_points = attr.ib(default=0) diff --git a/trio/_core/_unbounded_queue.py b/trio/_core/_unbounded_queue.py index 13a75fd029..53265b7ead 100644 --- a/trio/_core/_unbounded_queue.py +++ b/trio/_core/_unbounded_queue.py @@ -128,10 +128,8 @@ async def get_batch(self): await self._lot.park() return self._get_batch_protected() else: - try: - return self._get_batch_protected() - finally: - await _core.cancel_shielded_checkpoint() + await _core.cancel_shielded_checkpoint() + return self._get_batch_protected() def statistics(self): """Return an object containing debugging information. diff --git a/trio/_core/tests/test_io.py b/trio/_core/tests/test_io.py index 797e3500a9..7f5a43c79c 100644 --- a/trio/_core/tests/test_io.py +++ b/trio/_core/tests/test_io.py @@ -156,9 +156,8 @@ async def test_double_read(socketpair, wait_readable): async with _core.open_nursery() as nursery: nursery.start_soon(wait_readable, a) await wait_all_tasks_blocked() - with assert_checkpoints(): - with pytest.raises(_core.BusyResourceError): - await wait_readable(a) + with pytest.raises(_core.BusyResourceError): + await wait_readable(a) nursery.cancel_scope.cancel() @@ -171,9 +170,8 @@ async def test_double_write(socketpair, wait_writable): async with _core.open_nursery() as nursery: nursery.start_soon(wait_writable, a) await wait_all_tasks_blocked() - with assert_checkpoints(): - with pytest.raises(_core.BusyResourceError): - await wait_writable(a) + with pytest.raises(_core.BusyResourceError): + await wait_writable(a) nursery.cancel_scope.cancel() diff --git a/trio/_highlevel_socket.py b/trio/_highlevel_socket.py index 8611e1fdf9..0eba659128 100644 --- a/trio/_highlevel_socket.py +++ b/trio/_highlevel_socket.py @@ -95,17 +95,16 @@ def __init__(self, socket): async def send_all(self, data): if self.socket.did_shutdown_SHUT_WR: - await trio.hazmat.checkpoint() raise trio.ClosedResourceError("can't send data after sending EOF") with self._send_conflict_detector.sync: with _translate_socket_errors_to_stream_errors(): with memoryview(data) as data: if not data: - await trio.hazmat.checkpoint() if self.socket.fileno() == -1: raise trio.ClosedResourceError( "socket was already closed" ) + await trio.hazmat.checkpoint() return total_sent = 0 while total_sent < len(data): @@ -114,7 +113,7 @@ async def send_all(self, data): total_sent += sent async def wait_send_all_might_not_block(self): - async with self._send_conflict_detector: + with self._send_conflict_detector.sync: if self.socket.fileno() == -1: raise trio.ClosedResourceError with _translate_socket_errors_to_stream_errors(): @@ -131,7 +130,6 @@ async def send_eof(self): async def receive_some(self, max_bytes): if max_bytes < 1: - await trio.hazmat.checkpoint() raise ValueError("max_bytes must be >= 1") with _translate_socket_errors_to_stream_errors(): return await self.socket.recv(max_bytes) @@ -383,7 +381,5 @@ async def aclose(self): """Close this listener and its underlying socket. """ - try: - self.socket.close() - finally: - await trio.hazmat.checkpoint() + self.socket.close() + await trio.hazmat.checkpoint() diff --git a/trio/_socket.py b/trio/_socket.py index 4db376d1e4..1b563f3d43 100644 --- a/trio/_socket.py +++ b/trio/_socket.py @@ -456,7 +456,6 @@ def close(self): self._sock.close() async def bind(self, address): - await trio.hazmat.checkpoint() address = await self._resolve_local_address(address) if ( hasattr(_stdlib_socket, "AF_UNIX") @@ -493,11 +492,9 @@ async def _resolve_address(self, address, flags): # Do some pre-checking (or exit early for non-IP sockets) if self._sock.family == _stdlib_socket.AF_INET: if not isinstance(address, tuple) or not len(address) == 2: - await trio.hazmat.checkpoint() raise ValueError("address should be a (host, port) tuple") elif self._sock.family == _stdlib_socket.AF_INET6: if not isinstance(address, tuple) or not 2 <= len(address) <= 4: - await trio.hazmat.checkpoint() raise ValueError( "address should be a (host, port, [flowinfo, [scopeid]]) " "tuple" diff --git a/trio/_ssl.py b/trio/_ssl.py index f2d639dedc..8b3511fe67 100644 --- a/trio/_ssl.py +++ b/trio/_ssl.py @@ -411,146 +411,144 @@ def _check_status(self): async def _retry(self, fn, *args, ignore_want_read=False): await trio.hazmat.checkpoint_if_cancelled() yielded = False - try: - finished = False - while not finished: - # WARNING: this code needs to be very careful with when it - # calls 'await'! There might be multiple tasks calling this - # function at the same time trying to do different operations, - # so we need to be careful to: - # - # 1) interact with the SSLObject, then - # 2) await on exactly one thing that lets us make forward - # progress, then - # 3) loop or exit - # - # In particular we don't want to yield while interacting with - # the SSLObject (because it's shared state, so someone else - # might come in and mess with it while we're suspended), and - # we don't want to yield *before* starting the operation that - # will help us make progress, because then someone else might - # come in and leapfrog us. - - # Call the SSLObject method, and get its result. - # - # NB: despite what the docs say, SSLWantWriteError can't - # happen – "Writes to memory BIOs will always succeed if - # memory is available: that is their size can grow - # indefinitely." - # https://wiki.openssl.org/index.php/Manual:BIO_s_mem(3) + finished = False + while not finished: + # WARNING: this code needs to be very careful with when it + # calls 'await'! There might be multiple tasks calling this + # function at the same time trying to do different operations, + # so we need to be careful to: + # + # 1) interact with the SSLObject, then + # 2) await on exactly one thing that lets us make forward + # progress, then + # 3) loop or exit + # + # In particular we don't want to yield while interacting with + # the SSLObject (because it's shared state, so someone else + # might come in and mess with it while we're suspended), and + # we don't want to yield *before* starting the operation that + # will help us make progress, because then someone else might + # come in and leapfrog us. + + # Call the SSLObject method, and get its result. + # + # NB: despite what the docs say, SSLWantWriteError can't + # happen – "Writes to memory BIOs will always succeed if + # memory is available: that is their size can grow + # indefinitely." + # https://wiki.openssl.org/index.php/Manual:BIO_s_mem(3) + want_read = False + ret = None + try: + ret = fn(*args) + except _stdlib_ssl.SSLWantReadError: + want_read = True + except ( + _stdlib_ssl.SSLError, _stdlib_ssl.CertificateError + ) as exc: + self._state = _State.BROKEN + raise trio.BrokenResourceError from exc + else: + finished = True + if ignore_want_read: want_read = False - ret = None - try: - ret = fn(*args) - except _stdlib_ssl.SSLWantReadError: - want_read = True - except ( - _stdlib_ssl.SSLError, _stdlib_ssl.CertificateError - ) as exc: - self._state = _State.BROKEN - raise trio.BrokenResourceError from exc - else: - finished = True - if ignore_want_read: - want_read = False - finished = True - to_send = self._outgoing.read() + finished = True + to_send = self._outgoing.read() - # Outputs from the above code block are: - # - # - to_send: bytestring; if non-empty then we need to send - # this data to make forward progress - # - # - want_read: True if we need to receive_some some data to make - # forward progress - # - # - finished: False means that we need to retry the call to - # fn(*args) again, after having pushed things forward. True - # means we still need to do whatever was said (in particular - # send any data in to_send), but once we do then we're - # done. - # - # - ret: the operation's return value. (Meaningless unless - # finished is True.) - # - # Invariant: want_read and finished can't both be True at the - # same time. - # - # Now we need to move things forward. There are two things we - # might have to do, and any given operation might require - # either, both, or neither to proceed: - # - # - send the data in to_send - # - # - receive_some some data and put it into the incoming BIO - # - # Our strategy is: if there's data to send, send it; - # *otherwise* if there's data to receive_some, receive_some it. - # - # If both need to happen, then we only send. Why? Well, we - # know that *right now* we have to both send and receive_some - # before the operation can complete. But as soon as we yield, - # that information becomes potentially stale – e.g. while - # we're sending, some other task might go and receive_some the - # data we need and put it into the incoming BIO. And if it - # does, then we *definitely don't* want to do a receive_some – - # there might not be any more data coming, and we'd deadlock! - # We could do something tricky to keep track of whether a - # receive_some happens while we're sending, but the case where - # we have to do both is very unusual (only during a - # renegotation), so it's better to keep things simple. So we - # do just one potentially-blocking operation, then check again - # for fresh information. - # - # And we prioritize sending over receiving because, if there - # are multiple tasks that want to receive_some, then it - # doesn't matter what order they go in. But if there are - # multiple tasks that want to send, then they each have - # different data, and the data needs to get put onto the wire - # in the same order that it was retrieved from the outgoing - # BIO. So if we have data to send, that *needs* to be the - # *very* *next* *thing* we do, to make sure no-one else sneaks - # in before us. Or if we can't send immediately because - # someone else is, then we at least need to get in line - # immediately. - if to_send: - # NOTE: This relies on the lock being strict FIFO fair! - async with self._inner_send_lock: - yielded = True - try: - await self.transport_stream.send_all(to_send) - except: - # Some unknown amount of our data got sent, and we - # don't know how much. This stream is doomed. - self._state = _State.BROKEN - raise - elif want_read: - # It's possible that someone else is already blocked in - # transport_stream.receive_some. If so then we want to - # wait for them to finish, but we don't want to call - # transport_stream.receive_some again ourselves; we just - # want to loop around and check if their contribution - # helped anything. So we make a note of how many times - # some task has been through here before taking the lock, - # and if it's changed by the time we get the lock, then we - # skip calling transport_stream.receive_some and loop - # around immediately. - recv_count = self._inner_recv_count - async with self._inner_recv_lock: - yielded = True - if recv_count == self._inner_recv_count: - data = await self.transport_stream.receive_some( - self._max_refill_bytes - ) - if not data: - self._incoming.write_eof() - else: - self._incoming.write(data) - self._inner_recv_count += 1 - return ret - finally: - if not yielded: - await trio.hazmat.cancel_shielded_checkpoint() + # Outputs from the above code block are: + # + # - to_send: bytestring; if non-empty then we need to send + # this data to make forward progress + # + # - want_read: True if we need to receive_some some data to make + # forward progress + # + # - finished: False means that we need to retry the call to + # fn(*args) again, after having pushed things forward. True + # means we still need to do whatever was said (in particular + # send any data in to_send), but once we do then we're + # done. + # + # - ret: the operation's return value. (Meaningless unless + # finished is True.) + # + # Invariant: want_read and finished can't both be True at the + # same time. + # + # Now we need to move things forward. There are two things we + # might have to do, and any given operation might require + # either, both, or neither to proceed: + # + # - send the data in to_send + # + # - receive_some some data and put it into the incoming BIO + # + # Our strategy is: if there's data to send, send it; + # *otherwise* if there's data to receive_some, receive_some it. + # + # If both need to happen, then we only send. Why? Well, we + # know that *right now* we have to both send and receive_some + # before the operation can complete. But as soon as we yield, + # that information becomes potentially stale – e.g. while + # we're sending, some other task might go and receive_some the + # data we need and put it into the incoming BIO. And if it + # does, then we *definitely don't* want to do a receive_some – + # there might not be any more data coming, and we'd deadlock! + # We could do something tricky to keep track of whether a + # receive_some happens while we're sending, but the case where + # we have to do both is very unusual (only during a + # renegotation), so it's better to keep things simple. So we + # do just one potentially-blocking operation, then check again + # for fresh information. + # + # And we prioritize sending over receiving because, if there + # are multiple tasks that want to receive_some, then it + # doesn't matter what order they go in. But if there are + # multiple tasks that want to send, then they each have + # different data, and the data needs to get put onto the wire + # in the same order that it was retrieved from the outgoing + # BIO. So if we have data to send, that *needs* to be the + # *very* *next* *thing* we do, to make sure no-one else sneaks + # in before us. Or if we can't send immediately because + # someone else is, then we at least need to get in line + # immediately. + if to_send: + # NOTE: This relies on the lock being strict FIFO fair! + async with self._inner_send_lock: + yielded = True + try: + await self.transport_stream.send_all(to_send) + except: + # Some unknown amount of our data got sent, and we + # don't know how much. This stream is doomed. + self._state = _State.BROKEN + raise + elif want_read: + # It's possible that someone else is already blocked in + # transport_stream.receive_some. If so then we want to + # wait for them to finish, but we don't want to call + # transport_stream.receive_some again ourselves; we just + # want to loop around and check if their contribution + # helped anything. So we make a note of how many times + # some task has been through here before taking the lock, + # and if it's changed by the time we get the lock, then we + # skip calling transport_stream.receive_some and loop + # around immediately. + recv_count = self._inner_recv_count + async with self._inner_recv_lock: + yielded = True + if recv_count == self._inner_recv_count: + data = await self.transport_stream.receive_some( + self._max_refill_bytes + ) + if not data: + self._incoming.write_eof() + else: + self._incoming.write(data) + self._inner_recv_count += 1 + if not yielded: + await trio.hazmat.cancel_shielded_checkpoint() + return ret async def _do_handshake(self): try: @@ -583,11 +581,7 @@ async def do_handshake(self): :exc:`trio.BrokenResourceError`. """ - try: - self._check_status() - except: - await trio.hazmat.checkpoint() - raise + self._check_status() await self._handshook.ensure(checkpoint=True) # Most things work if we don't explicitly force do_handshake to be called @@ -611,7 +605,7 @@ async def receive_some(self, max_bytes): :exc:`trio.BrokenResourceError`. """ - async with self._outer_recv_conflict_detector: + with self._outer_recv_conflict_detector.sync: self._check_status() try: await self._handshook.ensure(checkpoint=False) @@ -625,6 +619,7 @@ async def receive_some(self, max_bytes): (_stdlib_ssl.SSLEOFError, _stdlib_ssl.SSLSyscallError) ) ): + await trio.hazmat.checkpoint() return b"" else: raise @@ -643,6 +638,7 @@ async def receive_some(self, max_bytes): self._https_compatible and isinstance(exc.__cause__, _stdlib_ssl.SSLEOFError) ): + await trio.hazmat.checkpoint() return b"" else: raise @@ -658,7 +654,7 @@ async def send_all(self, data): :exc:`trio.BrokenResourceError`. """ - async with self._outer_send_conflict_detector: + with self._outer_send_conflict_detector.sync: self._check_status() await self._handshook.ensure(checkpoint=False) # SSLObject interprets write(b"") as an EOF for some reason, which @@ -685,8 +681,8 @@ async def unwrap(self): ``transport_stream.receive_some(...)``. """ - async with self._outer_recv_conflict_detector, \ - self._outer_send_conflict_detector: + with self._outer_recv_conflict_detector.sync, \ + self._outer_send_conflict_detector.sync: self._check_status() await self._handshook.ensure(checkpoint=False) await self._retry(self._ssl_object.unwrap) @@ -792,9 +788,8 @@ async def wait_send_all_might_not_block(self): # # First, we take the outer send lock, because of Trio's standard # semantics that wait_send_all_might_not_block and send_all - # conflict. This also takes care of providing correct checkpoint - # semantics before we potentially error out from _check_status(). - async with self._outer_send_conflict_detector: + # conflict. + with self._outer_send_conflict_detector.sync: self._check_status() # Then we take the inner send lock. We know that no other tasks # are calling self.send_all or self.wait_send_all_might_not_block, diff --git a/trio/_sync.py b/trio/_sync.py index ce4baefe47..086253e14b 100644 --- a/trio/_sync.py +++ b/trio/_sync.py @@ -306,9 +306,6 @@ async def acquire_on_behalf_of(self, borrower): except trio.Cancelled: self._pending_borrowers.pop(task) raise - except: - await trio.hazmat.cancel_shielded_checkpoint() - raise else: await trio.hazmat.cancel_shielded_checkpoint() diff --git a/trio/_timeouts.py b/trio/_timeouts.py index 67f6cd8d13..fc1fa2d76e 100644 --- a/trio/_timeouts.py +++ b/trio/_timeouts.py @@ -61,7 +61,8 @@ async def sleep_until(deadline): Args: deadline (float): The time at which we should wake up again. May be in - the past, in which case this function yields but does not block. + the past, in which case this function executes a checkpoint but + does not block. """ with move_on_at(deadline): diff --git a/trio/_unix_pipes.py b/trio/_unix_pipes.py index e70ed58da4..b16f562563 100644 --- a/trio/_unix_pipes.py +++ b/trio/_unix_pipes.py @@ -100,7 +100,7 @@ async def send_all(self, data: bytes): raise trio.BrokenResourceError from e async def wait_send_all_might_not_block(self) -> None: - async with self._conflict_detector: + with self._conflict_detector.sync: if self._fd_holder.closed: raise trio.ClosedResourceError("this pipe was already closed") try: diff --git a/trio/_windows_pipes.py b/trio/_windows_pipes.py index 7f6333f514..afd0eef62e 100644 --- a/trio/_windows_pipes.py +++ b/trio/_windows_pipes.py @@ -89,7 +89,7 @@ def __init__(self, handle: int) -> None: ) async def receive_some(self, max_bytes: int) -> bytes: - async with self._conflict_detector: + with self._conflict_detector.sync: if self._handle_holder.closed: raise _core.ClosedResourceError("this pipe is already closed") diff --git a/trio/testing/_checkpoints.py b/trio/testing/_checkpoints.py index f97c91a30b..716260893b 100644 --- a/trio/testing/_checkpoints.py +++ b/trio/testing/_checkpoints.py @@ -13,7 +13,6 @@ def _assert_yields_or_not(expected): orig_schedule = task._schedule_points try: yield - finally: if ( expected and ( task._cancel_points == orig_cancel @@ -21,18 +20,20 @@ def _assert_yields_or_not(expected): ) ): raise AssertionError("assert_checkpoints block did not yield!") - elif ( + finally: + if ( not expected and ( task._cancel_points != orig_cancel or task._schedule_points != orig_schedule ) ): - raise AssertionError("assert_no_yields block yielded!") + raise AssertionError("assert_no_checkpoints block yielded!") def assert_checkpoints(): """Use as a context manager to check that the code inside the ``with`` - block executes at least one :ref:`checkpoint `. + block either exits with an exception or executes at least one + :ref:`checkpoint `. Raises: AssertionError: if no checkpoint was executed. @@ -51,7 +52,7 @@ def assert_checkpoints(): def assert_no_checkpoints(): """Use as a context manager to check that the code inside the ``with`` - block does not execute any :ref:`check points `. + block does not execute any :ref:`checkpoints `. Raises: AssertionError: if a checkpoint was executed. diff --git a/trio/tests/test_socket.py b/trio/tests/test_socket.py index 43a39cb16b..beab12a999 100644 --- a/trio/tests/test_socket.py +++ b/trio/tests/test_socket.py @@ -620,18 +620,16 @@ async def t2(): # This tests the complicated paths through connect async def test_SocketType_connect_paths(): with tsocket.socket() as sock: - with assert_checkpoints(): - with pytest.raises(ValueError): - # Should be a tuple - await sock.connect("localhost") + with pytest.raises(ValueError): + # Should be a tuple + await sock.connect("localhost") # cancelled before we start with tsocket.socket() as sock: - with assert_checkpoints(): - with _core.CancelScope() as cancel_scope: - cancel_scope.cancel() - with pytest.raises(_core.Cancelled): - await sock.connect(("127.0.0.1", 80)) + with _core.CancelScope() as cancel_scope: + cancel_scope.cancel() + with pytest.raises(_core.Cancelled): + await sock.connect(("127.0.0.1", 80)) # Cancelled in between the connect() call and the connect completing with _core.CancelScope() as cancel_scope: @@ -662,16 +660,15 @@ def connect(self, *args, **kwargs): # Failed connect (hopefully after raising BlockingIOError) with tsocket.socket() as sock: - with assert_checkpoints(): - with pytest.raises(OSError): - # TCP port 2 is not assigned. Pretty sure nothing will be - # listening there. (We used to bind a port and then *not* call - # listen() to ensure nothing was listening there, but it turns - # out on macOS if you do this it takes 30 seconds for the - # connect to fail. Really. Also if you use a non-routable - # address. This way fails instantly though. As long as nothing - # is listening on port 2.) - await sock.connect(("127.0.0.1", 2)) + with pytest.raises(OSError): + # TCP port 2 is not assigned. Pretty sure nothing will be + # listening there. (We used to bind a port and then *not* call + # listen() to ensure nothing was listening there, but it turns + # out on macOS if you do this it takes 30 seconds for the + # connect to fail. Really. Also if you use a non-routable + # address. This way fails instantly though. As long as nothing + # is listening on port 2.) + await sock.connect(("127.0.0.1", 2)) async def test_resolve_remote_address_exception_closes_socket(): diff --git a/trio/tests/test_ssl.py b/trio/tests/test_ssl.py index 1ddbfc2dcf..b97956cb2a 100644 --- a/trio/tests/test_ssl.py +++ b/trio/tests/test_ssl.py @@ -858,23 +858,19 @@ async def server_closer(): await client_ssl.aclose() # Trying to send more data does not work - with assert_checkpoints(): - with pytest.raises(ClosedResourceError): - await server_ssl.send_all(b"123") + with pytest.raises(ClosedResourceError): + await server_ssl.send_all(b"123") # And once the connection is has been closed *locally*, then instead of # getting empty bytestrings we get a proper error - with assert_checkpoints(): - with pytest.raises(ClosedResourceError): - await client_ssl.receive_some(10) == b"" + with pytest.raises(ClosedResourceError): + await client_ssl.receive_some(10) == b"" - with assert_checkpoints(): - with pytest.raises(ClosedResourceError): - await client_ssl.unwrap() + with pytest.raises(ClosedResourceError): + await client_ssl.unwrap() - with assert_checkpoints(): - with pytest.raises(ClosedResourceError): - await client_ssl.do_handshake() + with pytest.raises(ClosedResourceError): + await client_ssl.do_handshake() # Check that a graceful close *before* handshaking gives a clean EOF on # the other side diff --git a/trio/tests/test_sync.py b/trio/tests/test_sync.py index 3b0f5562c4..c29d0f71c4 100644 --- a/trio/tests/test_sync.py +++ b/trio/tests/test_sync.py @@ -68,9 +68,8 @@ async def test_CapacityLimiter(): with pytest.raises(RuntimeError): c.acquire_nowait() assert c.borrowed_tokens == 1 - with assert_checkpoints(): - with pytest.raises(RuntimeError): - await c.acquire() + with pytest.raises(RuntimeError): + await c.acquire() assert c.borrowed_tokens == 1 # We can acquire on behalf of someone else though From 0878c7df53add064b3e2bf267345adea1f95e164 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Sat, 18 May 2019 13:49:37 -0700 Subject: [PATCH 2/6] yapf --- trio/_ssl.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/trio/_ssl.py b/trio/_ssl.py index 8b3511fe67..07f8e388be 100644 --- a/trio/_ssl.py +++ b/trio/_ssl.py @@ -443,9 +443,7 @@ async def _retry(self, fn, *args, ignore_want_read=False): ret = fn(*args) except _stdlib_ssl.SSLWantReadError: want_read = True - except ( - _stdlib_ssl.SSLError, _stdlib_ssl.CertificateError - ) as exc: + except (_stdlib_ssl.SSLError, _stdlib_ssl.CertificateError) as exc: self._state = _State.BROKEN raise trio.BrokenResourceError from exc else: From 8e9cc7ed91665a7d572be513b1c4a809c61a0ba4 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Mon, 3 Jun 2019 22:16:49 -0700 Subject: [PATCH 3/6] Fix UnboundedQueue, remove async ConflictDetector, clarify newsfragment --- newsfragments/474.misc.rst | 3 +++ trio/_core/_unbounded_queue.py | 6 ++++-- trio/_highlevel_socket.py | 7 +++--- trio/_signals.py | 2 +- trio/_ssl.py | 10 ++++----- trio/_unix_pipes.py | 9 ++++---- trio/_util.py | 37 +++++++++----------------------- trio/_windows_pipes.py | 8 ++++--- trio/testing/_memory_streams.py | 38 ++++++++++++++++++--------------- trio/tests/test_ssl.py | 9 +++++--- trio/tests/test_util.py | 24 ++++++--------------- 11 files changed, 71 insertions(+), 82 deletions(-) diff --git a/newsfragments/474.misc.rst b/newsfragments/474.misc.rst index b9de15c005..29c08c668a 100644 --- a/newsfragments/474.misc.rst +++ b/newsfragments/474.misc.rst @@ -3,3 +3,6 @@ We've slightly relaxed our definition of which Trio operations act as throwing an exception is no longer guaranteed to execute a checkpoint; it might or might not. The rules are unchanged for async functions that don't exit with an exception, async iterators, and async context managers. +:func:`trio.testing.assert_checkpoints` has been updated to reflect the +new behavior: if its ``with`` block exits with an exception, no assertion +is made. diff --git a/trio/_core/_unbounded_queue.py b/trio/_core/_unbounded_queue.py index 7b62240a83..e6d3dc84fa 100644 --- a/trio/_core/_unbounded_queue.py +++ b/trio/_core/_unbounded_queue.py @@ -127,8 +127,10 @@ async def get_batch(self): await self._lot.park() return self._get_batch_protected() else: - await _core.cancel_shielded_checkpoint() - return self._get_batch_protected() + try: + return self._get_batch_protected() + finally: + await _core.cancel_shielded_checkpoint() def statistics(self): """Return an object containing debugging information. diff --git a/trio/_highlevel_socket.py b/trio/_highlevel_socket.py index 0eba659128..e25f498b9a 100644 --- a/trio/_highlevel_socket.py +++ b/trio/_highlevel_socket.py @@ -96,7 +96,7 @@ def __init__(self, socket): async def send_all(self, data): if self.socket.did_shutdown_SHUT_WR: raise trio.ClosedResourceError("can't send data after sending EOF") - with self._send_conflict_detector.sync: + with self._send_conflict_detector: with _translate_socket_errors_to_stream_errors(): with memoryview(data) as data: if not data: @@ -113,14 +113,15 @@ async def send_all(self, data): total_sent += sent async def wait_send_all_might_not_block(self): - with self._send_conflict_detector.sync: + with self._send_conflict_detector: if self.socket.fileno() == -1: raise trio.ClosedResourceError with _translate_socket_errors_to_stream_errors(): await self.socket.wait_writable() async def send_eof(self): - async with self._send_conflict_detector: + with self._send_conflict_detector: + await trio.hazmat.checkpoint() # On macOS, calling shutdown a second time raises ENOTCONN, but # send_eof needs to be idempotent. if self.socket.did_shutdown_SHUT_WR: diff --git a/trio/_signals.py b/trio/_signals.py index 15eabd22ea..09dbedecc4 100644 --- a/trio/_signals.py +++ b/trio/_signals.py @@ -107,7 +107,7 @@ async def __anext__(self): # In principle it would be possible to support multiple concurrent # calls to __anext__, but doing it without race conditions is quite # tricky, and there doesn't seem to be any point in trying. - with self._conflict_detector.sync: + with self._conflict_detector: await self._have_pending.wait() signum, _ = self._pending.popitem(last=False) if not self._pending: diff --git a/trio/_ssl.py b/trio/_ssl.py index 07f8e388be..8f64161df2 100644 --- a/trio/_ssl.py +++ b/trio/_ssl.py @@ -603,7 +603,7 @@ async def receive_some(self, max_bytes): :exc:`trio.BrokenResourceError`. """ - with self._outer_recv_conflict_detector.sync: + with self._outer_recv_conflict_detector: self._check_status() try: await self._handshook.ensure(checkpoint=False) @@ -652,7 +652,7 @@ async def send_all(self, data): :exc:`trio.BrokenResourceError`. """ - with self._outer_send_conflict_detector.sync: + with self._outer_send_conflict_detector: self._check_status() await self._handshook.ensure(checkpoint=False) # SSLObject interprets write(b"") as an EOF for some reason, which @@ -679,8 +679,8 @@ async def unwrap(self): ``transport_stream.receive_some(...)``. """ - with self._outer_recv_conflict_detector.sync, \ - self._outer_send_conflict_detector.sync: + with self._outer_recv_conflict_detector, \ + self._outer_send_conflict_detector: self._check_status() await self._handshook.ensure(checkpoint=False) await self._retry(self._ssl_object.unwrap) @@ -787,7 +787,7 @@ async def wait_send_all_might_not_block(self): # First, we take the outer send lock, because of Trio's standard # semantics that wait_send_all_might_not_block and send_all # conflict. - with self._outer_send_conflict_detector.sync: + with self._outer_send_conflict_detector: self._check_status() # Then we take the inner send lock. We know that no other tasks # are calling self.send_all or self.wait_send_all_might_not_block, diff --git a/trio/_unix_pipes.py b/trio/_unix_pipes.py index b16f562563..58c203f3ef 100644 --- a/trio/_unix_pipes.py +++ b/trio/_unix_pipes.py @@ -75,12 +75,12 @@ def __init__(self, fd: int): ) async def send_all(self, data: bytes): - async with self._conflict_detector: + with self._conflict_detector: # have to check up front, because send_all(b"") on a closed pipe # should raise if self._fd_holder.closed: raise trio.ClosedResourceError("this pipe was already closed") - + await trio.hazmat.checkpoint() length = len(data) # adapted from the SocketStream code with memoryview(data) as view: @@ -100,7 +100,7 @@ async def send_all(self, data: bytes): raise trio.BrokenResourceError from e async def wait_send_all_might_not_block(self) -> None: - with self._conflict_detector.sync: + with self._conflict_detector: if self._fd_holder.closed: raise trio.ClosedResourceError("this pipe was already closed") try: @@ -127,13 +127,14 @@ def __init__(self, fd: int): ) async def receive_some(self, max_bytes: int) -> bytes: - async with self._conflict_detector: + with self._conflict_detector: if not isinstance(max_bytes, int): raise TypeError("max_bytes must be integer >= 1") if max_bytes < 1: raise ValueError("max_bytes must be integer >= 1") + await trio.hazmat.checkpoint() while True: try: data = os.read(self._fd_holder.fd, max_bytes) diff --git a/trio/_util.py b/trio/_util.py index 2460dde7ec..41f3f681d5 100644 --- a/trio/_util.py +++ b/trio/_util.py @@ -107,26 +107,11 @@ def is_main_thread(): return False -class _ConflictDetectorSync: - def __init__(self, msg): - self._msg = msg - self._held = False - - def __enter__(self): - if self._held: - raise trio.BusyResourceError(self._msg) - else: - self._held = True - - def __exit__(self, *args): - self._held = False - - class ConflictDetector: """Detect when two tasks are about to perform operations that would conflict. - Use as an async context manager; if two tasks enter it at the same + Use as a synchronous context manager; if two tasks enter it at the same time then the second one raises an error. You can use it when there are two pieces of code that *would* collide and need a lock if they ever were called at the same time, but that should never happen. @@ -134,22 +119,20 @@ class ConflictDetector: We use this in particular for things like, making sure that two different tasks don't call sendall simultaneously on the same stream. - This executes a checkpoint on entry. That's the only reason it's async. - - To use from sync code, do ``with cd.sync``; this is just like ``async with - cd`` except that it doesn't execute a checkpoint. - """ def __init__(self, msg): - self.sync = _ConflictDetectorSync(msg) + self._msg = msg + self._held = False - async def __aenter__(self): - await trio.hazmat.checkpoint() - return self.sync.__enter__() + def __enter__(self): + if self._held: + raise trio.BusyResourceError(self._msg) + else: + self._held = True - async def __aexit__(self, *args): - return self.sync.__exit__() + def __exit__(self, *args): + self._held = False def async_wraps(cls, wrapped_cls, attr_name): diff --git a/trio/_windows_pipes.py b/trio/_windows_pipes.py index a219a8d59f..6d2615ac7d 100644 --- a/trio/_windows_pipes.py +++ b/trio/_windows_pipes.py @@ -46,11 +46,12 @@ def __init__(self, handle: int) -> None: ) async def send_all(self, data: bytes): - async with self._conflict_detector: + with self._conflict_detector: if self._handle_holder.closed: raise _core.ClosedResourceError("this pipe is already closed") if not data: + await trio.hazmat.checkpoint() return try: @@ -65,11 +66,12 @@ async def send_all(self, data: bytes): assert written == len(data) async def wait_send_all_might_not_block(self) -> None: - async with self._conflict_detector: + with self._conflict_detector: if self._handle_holder.closed: raise _core.ClosedResourceError("This pipe is already closed") # not implemented yet, and probably not needed + await trio.hazmat.checkpoint() async def aclose(self): await self._handle_holder.aclose() @@ -85,7 +87,7 @@ def __init__(self, handle: int) -> None: ) async def receive_some(self, max_bytes: int) -> bytes: - with self._conflict_detector.sync: + with self._conflict_detector: if self._handle_holder.closed: raise _core.ClosedResourceError("this pipe is already closed") diff --git a/trio/testing/_memory_streams.py b/trio/testing/_memory_streams.py index 5d42f34d3a..8457c82f7a 100644 --- a/trio/testing/_memory_streams.py +++ b/trio/testing/_memory_streams.py @@ -67,17 +67,19 @@ def _get_impl(self, max_bytes): return bytearray() def get_nowait(self, max_bytes=None): - with self._fetch_lock.sync: + with self._fetch_lock: self._check_max_bytes(max_bytes) if not self._closed and not self._data: raise _core.WouldBlock return self._get_impl(max_bytes) async def get(self, max_bytes=None): - async with self._fetch_lock: + with self._fetch_lock: self._check_max_bytes(max_bytes) if not self._closed and not self._data: await self._lot.park() + else: + await _core.checkpoint() return self._get_impl(max_bytes) @@ -121,10 +123,10 @@ async def send_all(self, data): calls the :attr:`send_all_hook` (if any). """ - # The lock itself is a checkpoint, but then we also yield inside the - # lock to give ourselves a chance to detect buggy user code that calls - # this twice at the same time. - async with self._conflict_detector: + # Execute two checkpoints so we have more of a chance to detect + # buggy user code that calls this twice at the same time. + with self._conflict_detector: + await _core.checkpoint() await _core.checkpoint() self._outgoing.put(data) if self.send_all_hook is not None: @@ -135,10 +137,10 @@ async def wait_send_all_might_not_block(self): then returns immediately. """ - # The lock itself is a checkpoint, but then we also yield inside the - # lock to give ourselves a chance to detect buggy user code that calls - # this twice at the same time. - async with self._conflict_detector: + # Execute two checkpoints so we have more of a chance to detect + # buggy user code that calls this twice at the same time. + with self._conflict_detector: + await _core.checkpoint() await _core.checkpoint() # check for being closed: self._outgoing.put(b"") @@ -228,10 +230,10 @@ async def receive_some(self, max_bytes): data from the internal buffer, blocking if necessary. """ - # The lock itself is a checkpoint, but then we also yield inside the - # lock to give ourselves a chance to detect buggy user code that calls - # this twice at the same time. - async with self._conflict_detector: + # Execute two checkpoints so we have more of a chance to detect + # buggy user code that calls this twice at the same time. + with self._conflict_detector: + await _core.checkpoint() await _core.checkpoint() if max_bytes is None: raise TypeError("max_bytes must not be None") @@ -470,6 +472,7 @@ async def _wait_for(self, fn): if self._sender_closed or self._receiver_closed: break await self._waiters.park() + await _core.checkpoint() def close_sender(self): self._sender_closed = True @@ -480,7 +483,7 @@ def close_receiver(self): self._something_happened() async def send_all(self, data): - async with self._send_conflict_detector: + with self._send_conflict_detector: if self._sender_closed: raise _core.ClosedResourceError if self._receiver_closed: @@ -495,17 +498,18 @@ async def send_all(self, data): raise _core.BrokenResourceError async def wait_send_all_might_not_block(self): - async with self._send_conflict_detector: + with self._send_conflict_detector: if self._sender_closed: raise _core.ClosedResourceError if self._receiver_closed: + await _core.checkpoint() return await self._wait_for(lambda: self._receiver_waiting) if self._sender_closed: raise _core.ClosedResourceError async def receive_some(self, max_bytes): - async with self._receive_conflict_detector: + with self._receive_conflict_detector: # Argument validation max_bytes = operator.index(max_bytes) if max_bytes < 1: diff --git a/trio/tests/test_ssl.py b/trio/tests/test_ssl.py index b97956cb2a..c004f5a38e 100644 --- a/trio/tests/test_ssl.py +++ b/trio/tests/test_ssl.py @@ -194,13 +194,15 @@ def renegotiate(self): assert self._conn.renegotiate() async def wait_send_all_might_not_block(self): - async with self._send_all_conflict_detector: + with self._send_all_conflict_detector: + await _core.checkpoint() await _core.checkpoint() await self.sleeper("wait_send_all_might_not_block") async def send_all(self, data): print(" --> transport_stream.send_all") - async with self._send_all_conflict_detector: + with self._send_all_conflict_detector: + await _core.checkpoint() await _core.checkpoint() await self.sleeper("send_all") self._conn.bio_write(data) @@ -222,8 +224,9 @@ async def send_all(self, data): async def receive_some(self, nbytes): print(" --> transport_stream.receive_some") - async with self._receive_some_conflict_detector: + with self._receive_some_conflict_detector: try: + await _core.checkpoint() await _core.checkpoint() while True: await self.sleeper("receive_some") diff --git a/trio/tests/test_util.py b/trio/tests/test_util.py index cad278b285..77fdcd0835 100644 --- a/trio/tests/test_util.py +++ b/trio/tests/test_util.py @@ -38,20 +38,18 @@ async def test_ConflictDetector(): ul1 = ConflictDetector("ul1") ul2 = ConflictDetector("ul2") - async with ul1: - with assert_checkpoints(): - async with ul2: - print("ok") + with ul1: + with ul2: + print("ok") with pytest.raises(_core.BusyResourceError) as excinfo: - async with ul1: - with assert_checkpoints(): - async with ul1: - pass # pragma: no cover + with ul1: + with ul1: + pass # pragma: no cover assert "ul1" in str(excinfo.value) async def wait_with_ul1(): - async with ul1: + with ul1: await wait_all_tasks_blocked() with pytest.raises(_core.BusyResourceError) as excinfo: @@ -60,14 +58,6 @@ async def wait_with_ul1(): nursery.start_soon(wait_with_ul1) assert "ul1" in str(excinfo.value) - # mixing sync and async entry - with pytest.raises(_core.BusyResourceError) as excinfo: - with ul1.sync: - with assert_checkpoints(): - async with ul1: - pass # pragma: no cover - assert "ul1" in str(excinfo.value) - def test_module_metadata_is_fixed_up(): import trio From 726fb51dddbf50d0a7323267fec03d9dbc016942 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Mon, 3 Jun 2019 22:22:49 -0700 Subject: [PATCH 4/6] Fix _windows_pipes --- trio/_windows_pipes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trio/_windows_pipes.py b/trio/_windows_pipes.py index 6d2615ac7d..82919b836b 100644 --- a/trio/_windows_pipes.py +++ b/trio/_windows_pipes.py @@ -51,7 +51,7 @@ async def send_all(self, data: bytes): raise _core.ClosedResourceError("this pipe is already closed") if not data: - await trio.hazmat.checkpoint() + await _core.checkpoint() return try: @@ -71,7 +71,7 @@ async def wait_send_all_might_not_block(self) -> None: raise _core.ClosedResourceError("This pipe is already closed") # not implemented yet, and probably not needed - await trio.hazmat.checkpoint() + await _core.checkpoint() async def aclose(self): await self._handle_holder.aclose() From f708f1dc4961e254cd7908fbc67291744a286649 Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Tue, 4 Jun 2019 21:56:21 -0700 Subject: [PATCH 5/6] Make UnboundedByteQueue.get() more obviously correct --- trio/testing/_memory_streams.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/trio/testing/_memory_streams.py b/trio/testing/_memory_streams.py index 8457c82f7a..4855261369 100644 --- a/trio/testing/_memory_streams.py +++ b/trio/testing/_memory_streams.py @@ -78,9 +78,12 @@ async def get(self, max_bytes=None): self._check_max_bytes(max_bytes) if not self._closed and not self._data: await self._lot.park() - else: - await _core.checkpoint() - return self._get_impl(max_bytes) + return self._get_impl(max_bytes) + await _core.checkpoint_if_cancelled() + try: + return self._get_impl(max_bytes) + finally: + await _core.cancel_shielded_checkpoint() class MemorySendStream(SendStream): From 6a2053abd266d6c77e369b615c902a207505d04b Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Tue, 4 Jun 2019 22:35:24 -0700 Subject: [PATCH 6/6] Revert the change I suggested https://github.com/python-trio/trio/pull/1058#discussion_r290164238 --- trio/testing/_memory_streams.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/trio/testing/_memory_streams.py b/trio/testing/_memory_streams.py index 4855261369..8457c82f7a 100644 --- a/trio/testing/_memory_streams.py +++ b/trio/testing/_memory_streams.py @@ -78,12 +78,9 @@ async def get(self, max_bytes=None): self._check_max_bytes(max_bytes) if not self._closed and not self._data: await self._lot.park() - return self._get_impl(max_bytes) - await _core.checkpoint_if_cancelled() - try: - return self._get_impl(max_bytes) - finally: - await _core.cancel_shielded_checkpoint() + else: + await _core.checkpoint() + return self._get_impl(max_bytes) class MemorySendStream(SendStream):