Skip to content
5 changes: 1 addition & 4 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,7 @@ def _check_closed(self):
def _asyncgen_finalizer_hook(self, agen):
self._asyncgens.discard(agen)
if not self.is_closed():
self.create_task(agen.aclose())
# Wake up the loop if the finalizer was called from
# a different thread.
self._write_to_self()
self.call_soon_threadsafe(self.create_task, agen.aclose())

def _asyncgen_firstiter_hook(self, agen):
if self._asyncgens_shutdown_called:
Expand Down
68 changes: 68 additions & 0 deletions Lib/test/test_asyncio/test_base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,74 @@ def test_run_forever_pre_stopped(self):
self.loop.run_forever()
self.loop._selector.select.assert_called_once_with(0)

async def leave_unfinalized_asyncgen(self):
# Create an async generator, iterate it partially, and leave it
# to be garbage collected.
# Used in async generator finalization tests.
# Depends on implementation details of garbage collector. Changes
# in gc may break this function.
status = {'started': False,
'stopped': False,
'finalized': False}

async def agen():
status['started'] = True
try:
for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']:
yield item
finally:
status['finalized'] = True

ag = agen()
ai = ag.__aiter__()

async def iter_one():
try:
item = await ai.__anext__()
except StopAsyncIteration:
return
if item == 'THREE':
status['stopped'] = True
return
self.loop.create_task(iter_one())

self.loop.create_task(iter_one())
return status

def test_asyncgen_finalization_by_gc(self):
# Async generators should be finalized when garbage collected.
self.loop._process_events = mock.Mock()
self.loop._write_to_self = mock.Mock()
with support.disable_gc():
status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
while not status['stopped']:
test_utils.run_briefly(self.loop)
self.assertTrue(status['started'])
self.assertTrue(status['stopped'])
self.assertFalse(status['finalized'])
support.gc_collect()
test_utils.run_briefly(self.loop)
self.assertTrue(status['finalized'])

def test_asyncgen_finalization_by_gc_in_other_thread(self):
# Python issue 34769: If garbage collector runs in another
# thread, async generators will not finalize in debug
# mode.
self.loop._process_events = mock.Mock()
self.loop._write_to_self = mock.Mock()
self.loop.set_debug(True)
with support.disable_gc():
status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
while not status['stopped']:
test_utils.run_briefly(self.loop)
self.assertTrue(status['started'])
self.assertTrue(status['stopped'])
self.assertFalse(status['finalized'])
self.loop.run_until_complete(
self.loop.run_in_executor(None, support.gc_collect))
test_utils.run_briefly(self.loop)
self.assertTrue(status['finalized'])


class MyProto(asyncio.Protocol):
done = None
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix for async generators not finalizing when event loop is in debug mode and
garbage collector runs in another thread.