2121import contextlib
2222import dataclasses
2323import datetime
24+ import functools
2425import heapq
2526import logging
2627import platform
@@ -48,6 +49,12 @@ class ScheduledJob:
4849
4950
5051class SchedulerQueue :
52+ """
53+ A priority queue for scheduler jobs.
54+ Jobs are stored in ascending order by their scheduled execution time.
55+ This allows for efficient peek and pop operations of the next scheduled job.
56+ """
57+
5158 def __init__ (self ):
5259 self ._queue = []
5360
@@ -64,7 +71,7 @@ def peek_next_event_dt(self) -> Optional[datetime.datetime]:
6471 def peek_last_event_dt (self ) -> Optional [datetime .datetime ]:
6572 ret = None
6673 if self ._queue :
67- ret = self ._queue [ - 1 ].when
74+ ret = heapq . nlargest ( 1 , self ._queue )[ 0 ].when
6875 return ret
6976
7077 def pop (self ) -> Tuple [datetime .datetime , SchedulerJob ]:
@@ -74,6 +81,9 @@ def pop(self) -> Tuple[datetime.datetime, SchedulerJob]:
7481
7582
7683class EventMultiplexer :
84+ """
85+ A multiplexer that manages multiple event sources and provides methods to retrieve events in chronological order.
86+ """
7787 def __init__ (self ) -> None :
7888 self ._prefetched_events : Dict [event .EventSource , Optional [event .Event ]] = {}
7989
@@ -83,11 +93,10 @@ def add(self, source: event.EventSource):
8393 def peek_next_event_dt (self ) -> Optional [datetime .datetime ]:
8494 self ._prefetch ()
8595
86- next_dt = None
87- prefetched_events = [evnt for evnt in self ._prefetched_events .values () if evnt ]
88- if prefetched_events :
89- next_dt = min (map (lambda evnt : evnt .when , prefetched_events ))
90- return next_dt
96+ return min (
97+ [evnt .when for evnt in self ._prefetched_events .values () if evnt ],
98+ default = None
99+ )
91100
92101 def pop (self , max_dt : datetime .datetime ) -> Tuple [Optional [event .EventSource ], Optional [event .Event ]]:
93102 ret_source : Optional [event .EventSource ] = None
@@ -124,7 +133,8 @@ def _prefetch(self):
124133
125134
126135class EventDispatcher (metaclass = abc .ABCMeta ):
127- """Responsible for connecting event sources to event handlers and dispatching events in the right order.
136+ """
137+ Responsible for connecting event sources to event handlers and dispatching events in the right order.
128138
129139 :param max_concurrent: The maximum number of events to process concurrently.
130140
@@ -142,18 +152,19 @@ def __init__(self, max_concurrent: int):
142152 self ._sniffers_pre : List [EventHandler ] = []
143153 self ._sniffers_post : List [EventHandler ] = []
144154 self ._producers : Set [event .Producer ] = set ()
145- self ._active_tasks : Optional [helpers .TaskGroup ] = None
155+ # Task group for core tasks like producers and dispatch loop.
156+ self ._core_tasks : Optional [helpers .TaskGroup ] = None
146157 self ._running = False
147158 self ._stopped = False
148159 self ._scheduler_queue = SchedulerQueue ()
149160 self ._event_mux = EventMultiplexer ()
150- # Used to execute event and scheduler handlers.
151- self ._handlers_task_pool = helpers .TaskPool (max_concurrent )
161+ # Task group for event and scheduler handlers.
162+ self ._handler_tasks = helpers .TaskPool (max_concurrent , max_queue_size = max_concurrent * 10 )
152163 # Set to True for the dispatcher to stop if a handler raises an exception.
153164 self .stop_on_handler_exceptions = False
154165
155166 @property
156- def current_event_dt (self ) -> Optional [datetime .datetime ]: # pragma: no cover
167+ def current_event_dt (self ) -> Optional [datetime .datetime ]:
157168 helpers .deprecation_warning ("Use now() instead" )
158169 return self .now ()
159170
@@ -170,9 +181,9 @@ def stop(self):
170181 """Requests the event dispatcher to stop the event processing loop."""
171182 logger .debug ("Stop requested" )
172183 self ._stopped = True
173- if self ._active_tasks :
174- self ._active_tasks .cancel ()
175- self ._handlers_task_pool .cancel ()
184+ if self ._core_tasks :
185+ self ._core_tasks .cancel ()
186+ self ._handler_tasks .cancel ()
176187
177188 def subscribe (self , source : event .EventSource , event_handler : EventHandler ):
178189 """Registers an async callable that will be called when an event source has new events.
@@ -224,7 +235,7 @@ async def run(self, stop_signals: List[int] = [signal.SIGINT, signal.SIGTERM]):
224235 """
225236
226237 assert not self ._running , "Can't run twice."
227- assert self ._active_tasks is None
238+ assert self ._core_tasks is None
228239
229240 # This block has coverage on all platforms except on Windows.
230241 if platform .system () != "Windows" : # pragma: no cover
@@ -234,23 +245,23 @@ async def run(self, stop_signals: List[int] = [signal.SIGINT, signal.SIGTERM]):
234245 self ._running = True
235246 try :
236247 # Initialize producers.
237- async with self ._task_group () as tg :
248+ async with self ._core_task_group () as tg :
238249 for producer in self ._producers :
239250 tg .create_task (producer .initialize ())
240251 # Run producers and dispatch loop.
241- async with self ._task_group () as tg :
252+ async with self ._core_task_group () as tg :
242253 for producer in self ._producers :
243254 tg .create_task (producer .main ())
244255 tg .create_task (self ._dispatch_loop ())
245256 except asyncio .CancelledError :
246257 if not self .stopped :
247258 raise
248259 finally :
249- # Cancel any pending task in the event pool.
250- self ._handlers_task_pool .cancel ()
251- await self ._handlers_task_pool .wait ()
260+ # Cancel any pending task in the event handlers pool.
261+ self ._handler_tasks .cancel ()
262+ await self ._handler_tasks .wait ()
252263 # No more cancelation at this point.
253- self ._active_tasks = None
264+ self ._core_tasks = None
254265 # Finalize producers.
255266 await gather_no_raise (* [producer .finalize () for producer in self ._producers ])
256267
@@ -262,13 +273,13 @@ async def _dispatch_loop(self):
262273 raise NotImplementedError ()
263274
264275 @contextlib .asynccontextmanager
265- async def _task_group (self ):
276+ async def _core_task_group (self ):
266277 try :
267278 async with helpers .TaskGroup () as tg :
268- self ._active_tasks = tg # So it can be canceled.
279+ self ._core_tasks = tg # So it can be canceled.
269280 yield tg
270281 finally :
271- self ._active_tasks = None
282+ self ._core_tasks = None
272283
273284 async def _dispatch_event (self , event_dispatch : EventDispatch ):
274285 logger .debug (logs .StructuredMessage (
@@ -350,7 +361,7 @@ async def _dispatch_loop(self):
350361 await self ._dispatch_scheduled (next_dt )
351362 await self ._dispatch_events (next_dt )
352363 else :
353- # Dispatch all pending scheduled before stopping.
364+ # No more events. Dispatch all pending scheduled jobs before stopping.
354365 if last_scheduled_dt := self ._scheduler_queue .peek_last_event_dt ():
355366 await self ._dispatch_scheduled (last_scheduled_dt )
356367 self .stop ()
@@ -364,20 +375,22 @@ async def _dispatch_scheduled(self, dt: datetime.datetime):
364375 if self ._last_dt is None or next_scheduled_dt > self ._last_dt :
365376 self ._last_dt = next_scheduled_dt
366377
367- await self ._handlers_task_pool .push (self ._execute_scheduled ( next_scheduled_dt , job ))
378+ await self ._handler_tasks .push (functools . partial ( self ._execute_scheduled , next_scheduled_dt , job ))
368379 # Waiting here and not outside of the loop to prevent executing distant scheduled jobs at the same time.
369- await self ._handlers_task_pool .wait ()
380+ await self ._handler_tasks .wait ()
370381
371382 next_scheduled_dt = self ._scheduler_queue .peek_next_event_dt ()
372383
373384 async def _dispatch_events (self , dt : datetime .datetime ):
374385 # Pop events, push them into the task pool, and wait those to finish executing.
375386 self ._last_dt = dt
376387 for source , evnt in self ._event_mux .pop_while (dt ):
377- await self ._handlers_task_pool .push (
378- self ._dispatch_event (EventDispatch (event = evnt , handlers = self ._event_handlers [source ]))
388+ await self ._handler_tasks .push (
389+ functools .partial (
390+ self ._dispatch_event , EventDispatch (event = evnt , handlers = self ._event_handlers [source ])
391+ )
379392 )
380- await self ._handlers_task_pool .wait ()
393+ await self ._handler_tasks .wait ()
381394
382395
383396class RealtimeDispatcher (EventDispatcher ):
@@ -389,8 +402,8 @@ class RealtimeDispatcher(EventDispatcher):
389402 def __init__ (self , max_concurrent : int ):
390403 super ().__init__ (max_concurrent = max_concurrent )
391404 self ._prev_event_dt : Dict [event .EventSource , datetime .datetime ] = {}
392- self .idle_sleep = 0.01
393- self ._wait_all_timeout : Optional [ float ] = 0.01
405+ self .idle_sleep : float = 0.001
406+ self ._wait_all_timeout : float = 0 # TODO: Will be removed in a future version.
394407 self ._idle_handlers : List [IdleHandler ] = []
395408
396409 def now (self ) -> datetime .datetime :
@@ -415,25 +428,31 @@ async def _dispatch_loop(self):
415428 self ._push_scheduled (now ),
416429 self ._push_events (now ),
417430 )
418- # Give some time for tasks to execute, and keep on pushing tasks.
419- await self ._handlers_task_pool .wait (timeout = self ._wait_all_timeout )
420- if self ._handlers_task_pool .idle :
431+ # Optionally give some time for handlers to execute before pushing new ones.
432+ # This is disabled by default and it will be deprecated.
433+ if self ._wait_all_timeout : # pragma: no cover
434+ await self ._handler_tasks .wait (timeout = self ._wait_all_timeout )
435+
436+ if self ._handler_tasks .idle :
421437 await self ._on_idle ()
438+ else :
439+ # Yield to the event loop to allow other tasks to run.
440+ await asyncio .sleep (0 )
422441
423442 async def _on_idle (self ):
424443 if self ._idle_handlers :
425444 await gather_no_raise (* [
426- self ._handlers_task_pool .push (idle_handler () ) for idle_handler in self ._idle_handlers
445+ self ._handler_tasks .push (idle_handler ) for idle_handler in self ._idle_handlers
427446 ])
428- else :
429- # Otherwise we'll monopolize the event loop .
430- await asyncio .sleep (self .idle_sleep )
447+
448+ # Avoid trashing the CPU if there's nothing to do .
449+ await asyncio .sleep (self .idle_sleep )
431450
432451 async def _push_scheduled (self , dt : datetime .datetime ):
433452 while (next_scheduled_dt := self ._scheduler_queue .peek_next_event_dt ()) and next_scheduled_dt <= dt :
434453 next_scheduled_dt , job = self ._scheduler_queue .pop ()
435454 # Push scheduled job into the task pool for processing.
436- await self ._handlers_task_pool .push (self ._execute_scheduled ( next_scheduled_dt , job ))
455+ await self ._handler_tasks .push (functools . partial ( self ._execute_scheduled , next_scheduled_dt , job ))
437456
438457 async def _push_events (self , dt : datetime .datetime ):
439458 # Pop events and feed the pool.
@@ -449,8 +468,8 @@ async def _push_events(self, dt: datetime.datetime):
449468 self ._prev_event_dt [source ] = evnt .when
450469
451470 # Push event into the task pool for processing.
452- await self ._handlers_task_pool .push (
453- self ._dispatch_event ( EventDispatch (
471+ await self ._handler_tasks .push (
472+ functools . partial ( self ._dispatch_event , EventDispatch (
454473 event = evnt ,
455474 handlers = self ._event_handlers [source ]
456475 ))
0 commit comments