|
1 | 1 | import sys |
2 | 2 | import asyncio |
3 | 3 | import asyncio.events as events |
4 | | -import heapq |
5 | 4 | import threading |
6 | 5 |
|
7 | 6 |
|
@@ -48,117 +47,48 @@ def _patch_loop(loop): |
48 | 47 | """ |
49 | 48 | Patch loop to make it reentrent. |
50 | 49 | """ |
51 | | - def run_forever_35_36(self): |
52 | | - # from Python 3.5/3.6 asyncio.base_events |
53 | | - self._check_closed() |
54 | | - old_thread_id = self._thread_id |
55 | | - old_running_loop = events._get_running_loop() |
56 | | - self._set_coroutine_wrapper(self._debug) |
57 | | - self._thread_id = threading.get_ident() |
58 | | - if self._asyncgens is not None: |
59 | | - old_agen_hooks = sys.get_asyncgen_hooks() |
60 | | - sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, |
61 | | - finalizer=self._asyncgen_finalizer_hook) |
62 | | - try: |
63 | | - events._set_running_loop(self) |
64 | | - while True: |
65 | | - self._run_once() |
66 | | - if self._stopping: |
67 | | - break |
68 | | - finally: |
69 | | - self._stopping = False |
70 | | - self._thread_id = old_thread_id |
71 | | - events._set_running_loop(old_running_loop) |
72 | | - self._set_coroutine_wrapper(False) |
73 | | - if self._asyncgens is not None: |
74 | | - sys.set_asyncgen_hooks(*old_agen_hooks) |
| 50 | + if sys.version_info >= (3, 7, 0): |
| 51 | + set_coro_tracking = loop._set_coroutine_origin_tracking |
| 52 | + else: |
| 53 | + set_coro_tracking = loop._set_coroutine_wrapper |
75 | 54 |
|
76 | | - def run_forever_37(self): |
77 | | - # from Python 3.7 asyncio.base_events |
| 55 | + def run_forever(self): |
78 | 56 | self._check_closed() |
79 | 57 | old_thread_id = self._thread_id |
80 | 58 | old_running_loop = events._get_running_loop() |
81 | | - self._set_coroutine_origin_tracking(self._debug) |
| 59 | + set_coro_tracking(self._debug) |
82 | 60 | self._thread_id = threading.get_ident() |
83 | 61 |
|
84 | 62 | old_agen_hooks = sys.get_asyncgen_hooks() |
85 | | - sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, |
86 | | - finalizer=self._asyncgen_finalizer_hook) |
| 63 | + if self._asyncgens is not None: |
| 64 | + sys.set_asyncgen_hooks( |
| 65 | + firstiter=self._asyncgen_firstiter_hook, |
| 66 | + finalizer=self._asyncgen_finalizer_hook) |
87 | 67 | try: |
88 | 68 | events._set_running_loop(self) |
89 | 69 | while True: |
90 | | - self._run_once() |
| 70 | + try: |
| 71 | + self._run_once() |
| 72 | + except IndexError: |
| 73 | + # Ignore 'pop from an empty deque' errors. |
| 74 | + # This happens when all ready handles have already been |
| 75 | + # processed but _run_once expects there to be more. |
| 76 | + # Since the handles have been processed anyway it is |
| 77 | + # safe to ignore this. |
| 78 | + pass |
91 | 79 | if self._stopping: |
92 | 80 | break |
93 | 81 | finally: |
94 | 82 | self._stopping = False |
95 | 83 | self._thread_id = old_thread_id |
96 | 84 | events._set_running_loop(old_running_loop) |
97 | | - self._set_coroutine_origin_tracking(False) |
98 | | - sys.set_asyncgen_hooks(*old_agen_hooks) |
99 | | - |
100 | | - bogus_handle = asyncio.Handle(None, None, loop) |
101 | | - bogus_handle.cancel() |
102 | | - |
103 | | - def run_once(self): |
104 | | - ready = self._ready |
105 | | - scheduled = self._scheduled |
106 | | - |
107 | | - # remove bogus handles to get more efficient timeout |
108 | | - while ready and ready[0] is bogus_handle: |
109 | | - ready.popleft() |
110 | | - nready = len(ready) |
111 | | - |
112 | | - while scheduled and scheduled[0]._cancelled: |
113 | | - self._timer_cancelled_count -= 1 |
114 | | - handle = heapq.heappop(scheduled) |
115 | | - handle._scheduled = False |
116 | | - |
117 | | - timeout = None |
118 | | - if ready or self._stopping: |
119 | | - timeout = 0 |
120 | | - elif scheduled: |
121 | | - when = scheduled[0]._when |
122 | | - timeout = max(0, when - self.time()) |
123 | | - |
124 | | - event_list = self._selector.select(timeout) |
125 | | - self._process_events(event_list) |
126 | | - |
127 | | - end_time = self.time() + self._clock_resolution |
128 | | - while scheduled: |
129 | | - handle = scheduled[0] |
130 | | - if handle._when >= end_time: |
131 | | - break |
132 | | - handle = heapq.heappop(scheduled) |
133 | | - handle._scheduled = False |
134 | | - ready.append(handle) |
135 | | - |
136 | | - self._nesting_level += 1 |
137 | | - ntodo = len(ready) |
138 | | - for _ in range(ntodo): |
139 | | - if not ready: |
140 | | - break |
141 | | - handle = ready.popleft() |
142 | | - if handle._cancelled: |
143 | | - continue |
144 | | - handle._run() |
145 | | - handle = None |
146 | | - self._nesting_level -= 1 |
147 | | - |
148 | | - if nready and self._nesting_level == 0: |
149 | | - # When the loop was patched while it was already running, |
150 | | - # there is an unpatched loop._run_once enclosing us. |
151 | | - # It expects to process 'nready' handles and will crash |
152 | | - # if there are less. # So here we feed it 'nready' bogus handles. |
153 | | - ready.extendleft([bogus_handle] * nready) |
| 85 | + set_coro_tracking(False) |
| 86 | + if self._asyncgens is not None: |
| 87 | + sys.set_asyncgen_hooks(*old_agen_hooks) |
154 | 88 |
|
155 | 89 | cls = loop.__class__ |
156 | 90 | cls._run_forever_orig = cls.run_forever |
157 | | - if sys.version_info >= (3, 7, 0): |
158 | | - cls.run_forever = run_forever_37 |
159 | | - else: |
160 | | - cls.run_forever = run_forever_35_36 |
161 | | - cls._nesting_level = 0 |
| 91 | + cls.run_forever = run_forever |
162 | 92 |
|
163 | 93 |
|
164 | 94 | def _patch_task(): |
|
0 commit comments