diff --git a/appveyor.yml b/appveyor.yml index b132f88e793..8342dbf6cb3 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -49,7 +49,7 @@ environment: ARROW_BUILD_GANDIVA: "OFF" ARROW_LLVM_VERSION: "7.0.*" ARROW_S3: "OFF" - PYTHON: "3.7" + PYTHON: "3.8" ARCH: "64" matrix: diff --git a/python/pyarrow/error.pxi b/python/pyarrow/error.pxi index 233b4fb1678..a1120e5b1c8 100644 --- a/python/pyarrow/error.pxi +++ b/python/pyarrow/error.pxi @@ -213,6 +213,12 @@ cdef class SignalStopHandler: def __exit__(self, exc_type, exc_value, exc_tb): if self._enabled: UnregisterCancellingSignalHandler() + if exc_value is None: + # Make sure we didn't lose a signal + try: + check_status(self._stop_token.stop_token.Poll()) + except ArrowCancelled as e: + exc_value = e if isinstance(exc_value, ArrowCancelled): if exc_value.signum: # Re-emit the exact same signal. We restored the Python signal @@ -220,7 +226,8 @@ cdef class SignalStopHandler: if os.name == 'nt': SendSignal(exc_value.signum) else: - SendSignalToThread(exc_value.signum, threading.get_ident()) + SendSignalToThread(exc_value.signum, + threading.main_thread().ident) else: # Simulate Python receiving a SIGINT # (see https://bugs.python.org/issue43356 for why we can't diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index d70f252345b..95dc3761e06 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -25,6 +25,7 @@ import itertools import os import pickle +import select import shutil import signal import string @@ -1373,43 +1374,68 @@ def test_cancellation(self): pytest.skip("test only works from main Python thread") # Skips test if not available raise_signal = util.get_raise_signal() + signum = signal.SIGINT - # Make the interruptible workload large enough to not finish - # before the interrupt comes, even in release mode on fast machines. 
+ def signal_from_thread(): + # Give our workload a chance to start up + time.sleep(0.2) + raise_signal(signum) + + # We start with a small CSV reading workload and increase its size + # until it's large enough to get an interruption during it, even in + # release mode on fast machines. last_duration = 0.0 workload_size = 100_000 + attempts = 0 - while last_duration < 1.0: + while last_duration < 5.0 and attempts < 10: print("workload size:", workload_size) large_csv = b"a,b,c\n" + b"1,2,3\n" * workload_size - t1 = time.time() - self.read_bytes(large_csv) - last_duration = time.time() - t1 + exc_info = None + + try: + # We use a signal fd to reliably ensure that the signal + # has been delivered to Python, regardless of how exactly + # it was caught. + with util.signal_wakeup_fd() as sigfd: + try: + t = threading.Thread(target=signal_from_thread) + t.start() + t1 = time.time() + try: + self.read_bytes(large_csv) + except KeyboardInterrupt as e: + exc_info = e + last_duration = time.time() - t1 + finally: + # Wait for signal to arrive if it didn't already, + # to avoid getting a KeyboardInterrupt after the + # `except` block below. + select.select([sigfd], [], [sigfd], 10.0) + + except KeyboardInterrupt: + # KeyboardInterrupt didn't interrupt `read_bytes` above. + pass + + if exc_info is not None: + # We managed to get `self.read_bytes` interrupted, see if it + # was actually interrupted inside Arrow C++ or in the Python + # scaffolding. 
+ if exc_info.__context__ is not None: + # Interrupted inside Arrow C++, we're satisfied now + break + + # Increase workload size to get a better chance workload_size = workload_size * 3 - def signal_from_thread(): - time.sleep(0.2) - raise_signal(signal.SIGINT) + if exc_info is None: + pytest.fail("Failed to get an interruption during CSV reading") - t1 = time.time() - try: - try: - t = threading.Thread(target=signal_from_thread) - with pytest.raises(KeyboardInterrupt) as exc_info: - t.start() - self.read_bytes(large_csv) - finally: - t.join() - except KeyboardInterrupt: - # In case KeyboardInterrupt didn't interrupt `self.read_bytes` - # above, at least prevent it from stopping the test suite - pytest.fail("KeyboardInterrupt didn't interrupt CSV reading") - dt = time.time() - t1 # Interruption should have arrived timely - assert dt <= 1.0 - e = exc_info.value.__context__ + assert last_duration <= 1.0 + e = exc_info.__context__ assert isinstance(e, pa.ArrowCancelled) - assert e.signum == signal.SIGINT + assert e.signum == signum def test_cancellation_disabled(self): # ARROW-12622: reader would segfault when the cancelling signal diff --git a/python/pyarrow/tests/util.py b/python/pyarrow/tests/util.py index 281de69e3e6..bd88ae4013f 100644 --- a/python/pyarrow/tests/util.py +++ b/python/pyarrow/tests/util.py @@ -26,6 +26,7 @@ import os import random import signal +import socket import string import subprocess import sys @@ -329,3 +330,22 @@ def get_raise_signal(): def raise_signal(signum): os.kill(os.getpid(), signum) return raise_signal + + +@contextlib.contextmanager +def signal_wakeup_fd(*, warn_on_full_buffer=False): + # Use a socket pair, rather than a self-pipe, so that select() can be used + # on Windows. 
+ r, w = socket.socketpair() + old_fd = None + try: + r.setblocking(False) + w.setblocking(False) + old_fd = signal.set_wakeup_fd( + w.fileno(), warn_on_full_buffer=warn_on_full_buffer) + yield r + finally: + if old_fd is not None: + signal.set_wakeup_fd(old_fd) + r.close() + w.close()