Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ environment:
ARROW_BUILD_GANDIVA: "OFF"
ARROW_LLVM_VERSION: "7.0.*"
ARROW_S3: "OFF"
PYTHON: "3.7"
PYTHON: "3.8"

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allows to run the cancellation tests on Windows.

ARCH: "64"

matrix:
Expand Down
9 changes: 8 additions & 1 deletion python/pyarrow/error.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,21 @@ cdef class SignalStopHandler:
def __exit__(self, exc_type, exc_value, exc_tb):
if self._enabled:
UnregisterCancellingSignalHandler()
if exc_value is None:
# Make sure we didn't lose a signal
try:
check_status(self._stop_token.stop_token.Poll())
except ArrowCancelled as e:
exc_value = e
if isinstance(exc_value, ArrowCancelled):
if exc_value.signum:
# Re-emit the exact same signal. We restored the Python signal
# handler above, so it should receive it.
if os.name == 'nt':
SendSignal(exc_value.signum)
else:
SendSignalToThread(exc_value.signum, threading.get_ident())
SendSignalToThread(exc_value.signum,
threading.main_thread().ident)
else:
# Simulate Python receiving a SIGINT
# (see https://bugs.python.org/issue43356 for why we can't
Expand Down
78 changes: 52 additions & 26 deletions python/pyarrow/tests/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import itertools
import os
import pickle
import select
import shutil
import signal
import string
Expand Down Expand Up @@ -1373,43 +1374,68 @@ def test_cancellation(self):
pytest.skip("test only works from main Python thread")
# Skips test if not available
raise_signal = util.get_raise_signal()
signum = signal.SIGINT

# Make the interruptible workload large enough to not finish
# before the interrupt comes, even in release mode on fast machines.
def signal_from_thread():
# Give our workload a chance to start up
time.sleep(0.2)
raise_signal(signum)

# We start with a small CSV reading workload and increase its size
# until it's large enough to get an interruption during it, even in
# release mode on fast machines.
last_duration = 0.0
workload_size = 100_000
attempts = 0

while last_duration < 1.0:
while last_duration < 5.0 and attempts < 10:
print("workload size:", workload_size)
large_csv = b"a,b,c\n" + b"1,2,3\n" * workload_size
t1 = time.time()
self.read_bytes(large_csv)
last_duration = time.time() - t1
exc_info = None

try:
# We use a signal fd to reliably ensure that the signal
# has been delivered to Python, regardless of how exactly
# it was caught.
with util.signal_wakeup_fd() as sigfd:
try:
t = threading.Thread(target=signal_from_thread)
t.start()
t1 = time.time()
try:
self.read_bytes(large_csv)
except KeyboardInterrupt as e:
exc_info = e
last_duration = time.time() - t1
finally:
# Wait for signal to arrive if it didn't already,
# to avoid getting a KeyboardInterrupt after the
# `except` block below.
select.select([sigfd], [], [sigfd], 10.0)

except KeyboardInterrupt:
# KeyboardInterrupt didn't interrupt `read_bytes` above.
pass

if exc_info is not None:
# We managed to get `self.read_bytes` interrupted, see if it
# was actually interrupted inside Arrow C++ or in the Python
# scaffolding.
if exc_info.__context__ is not None:
# Interrupted inside Arrow C++, we're satisfied now
break

# Increase workload size to get a better chance
workload_size = workload_size * 3

def signal_from_thread():
time.sleep(0.2)
raise_signal(signal.SIGINT)
if exc_info is None:
pytest.fail("Failed to get an interruption during CSV reading")

t1 = time.time()
try:
try:
t = threading.Thread(target=signal_from_thread)
with pytest.raises(KeyboardInterrupt) as exc_info:
t.start()
self.read_bytes(large_csv)
finally:
t.join()
except KeyboardInterrupt:
# In case KeyboardInterrupt didn't interrupt `self.read_bytes`
# above, at least prevent it from stopping the test suite
pytest.fail("KeyboardInterrupt didn't interrupt CSV reading")
dt = time.time() - t1
# Interruption should have arrived timely
assert dt <= 1.0
e = exc_info.value.__context__
assert last_duration <= 1.0
e = exc_info.__context__
assert isinstance(e, pa.ArrowCancelled)
assert e.signum == signal.SIGINT
assert e.signum == signum

def test_cancellation_disabled(self):
# ARROW-12622: reader would segfault when the cancelling signal
Expand Down
20 changes: 20 additions & 0 deletions python/pyarrow/tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import os
import random
import signal
import socket
import string
import subprocess
import sys
Expand Down Expand Up @@ -329,3 +330,22 @@ def get_raise_signal():
def raise_signal(signum):
os.kill(os.getpid(), signum)
return raise_signal


@contextlib.contextmanager
def signal_wakeup_fd(*, warn_on_full_buffer=False):
# Use a socket pair, rather a self-pipe, so that select() can be used
# on Windows.
r, w = socket.socketpair()
old_fd = None
try:
r.setblocking(False)
w.setblocking(False)
old_fd = signal.set_wakeup_fd(
w.fileno(), warn_on_full_buffer=warn_on_full_buffer)
yield r
finally:
if old_fd is not None:
Comment thread
lidavidm marked this conversation as resolved.
Outdated
signal.set_wakeup_fd(old_fd)
r.close()
w.close()