From da4cb68e4e73a61eec1fe292fcb5968031201bf0 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 26 Jul 2022 16:44:12 -0600 Subject: [PATCH 1/3] Improve output --- distributed/tests/test_utils_test.py | 43 ++++++++++++++++++++++++++++ distributed/utils_test.py | 14 +++++++-- 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 78f4db5e400..3a5d3edf113 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -16,6 +16,7 @@ import pytest import yaml +from _pytest.outcomes import Failed from tornado import gen import dask.config @@ -34,6 +35,7 @@ assert_story, captured_logger, check_process_leak, + check_thread_leak, cluster, dump_cluster_state, freeze_batched_send, @@ -755,6 +757,47 @@ def test_raises_with_cause(): raise RuntimeError("exception") from ValueError("cause") +@pytest.mark.slow +def test_check_thread_leak(): + event = threading.Event() + + t1 = threading.Thread(target=lambda: (event.wait(), "one")) + t1.start() + + t2 = t3 = None + try: + with pytest.raises(Failed, match=r"2 thread\(s\) were leaked") as exc: + with check_thread_leak(): + t2 = threading.Thread(target=lambda: (event.wait(), "two")) + t2.start() + t3 = threading.Thread(target=lambda: (event.wait(), "three")) + t3.start() + + msg = exc.value.msg + assert msg + print(msg) # For reference, if test fails + + # First, outer thread is ignored + assert msg.count("Call stack of leaked thread") == 2 + assert "one" not in msg + + # Make sure we can see the full traceback, not just the last line + assert msg.count(__file__) == 2 + assert 'target=lambda: (event.wait(), "two")' in msg + assert 'target=lambda: (event.wait(), "three")' in msg + + # Ensure there aren't too many or too few newlines + exc.match(r'event.wait\(\), "three"\)\)\n +File') + finally: + # Clean up + event.set() + t1.join(5) + if t2: + t2.join(5) + if t3: + t3.join(5) + + @pytest.mark.parametrize("sync", [True, False]) def test_fail_hard(sync): """@fail_hard is a last resort when error handling for everything that we foresaw diff --git a/distributed/utils_test.py b/distributed/utils_test.py index fadc6002376..6971165c4aa 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1750,9 +1750,17 @@ def check_thread_leak(): # Raise an error with information about leaked threads from distributed import profile - bad_thread = bad_threads[0] - call_stacks = profile.call_stack(sys._current_frames()[bad_thread.ident]) - assert False, (bad_thread, call_stacks) + lines: list[str] = [f"{len(bad_threads)} thread(s) were leaked from test\n"] + for i, thread in enumerate(bad_threads, 1): + lines.append( + f"------ Call stack of leaked thread {i}/{len(bad_threads)}: {thread} ------" + ) + lines.append( + "".join(profile.call_stack(sys._current_frames()[thread.ident])) + # NOTE: `call_stack` already adds newlines + ) + + pytest.fail("\n".join(lines), pytrace=False) def wait_active_children(timeout: float) -> list[multiprocessing.Process]: From 495ae88c12b889d17efd8215dce5a04a1c64b922 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 28 Jul 2022 16:47:48 -0600 Subject: [PATCH 2/3] `pytest.fail.Exception` --- distributed/tests/test_utils_test.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 3a5d3edf113..f9d98dd7882 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -16,7 +16,6 @@ import pytest import yaml -from _pytest.outcomes import Failed from tornado import gen import dask.config @@ -766,7 +765,9 @@ def test_check_thread_leak(): t2 = t3 = None try: - with pytest.raises(Failed, match=r"2 thread\(s\) were leaked") as exc: + with pytest.raises( + pytest.fail.Exception, match=r"2 thread\(s\) were leaked" + ) as exc: with check_thread_leak(): t2 = threading.Thread(target=lambda: (event.wait(), "two")) t2.start() From 95aabed10fbf61282520cd505fdcf931d6be37f7 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 28 Jul 2022 16:50:41 -0600 Subject: [PATCH 3/3] call `_current_frames` once --- distributed/utils_test.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 6971165c4aa..90b4266e3a1 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1750,15 +1750,21 @@ def check_thread_leak(): # Raise an error with information about leaked threads from distributed import profile - lines: list[str] = [f"{len(bad_threads)} thread(s) were leaked from test\n"] - for i, thread in enumerate(bad_threads, 1): - lines.append( - f"------ Call stack of leaked thread {i}/{len(bad_threads)}: {thread} ------" - ) - lines.append( - "".join(profile.call_stack(sys._current_frames()[thread.ident])) - # NOTE: `call_stack` already adds newlines - ) + frames = sys._current_frames() + try: + lines: list[str] = [ + f"{len(bad_threads)} thread(s) were leaked from test\n" + ] + for i, thread in enumerate(bad_threads, 1): + lines.append( + f"------ Call stack of leaked thread {i}/{len(bad_threads)}: {thread} ------" + ) + lines.append( + "".join(profile.call_stack(frames[thread.ident])) + # NOTE: `call_stack` already adds newlines + ) + finally: + del frames pytest.fail("\n".join(lines), pytrace=False)