2020import typing
2121from enum import Enum
2222
23- from opentelemetry .context import attach , detach , get_current , set_value
24- from opentelemetry .trace import DefaultSpan
23+ from opentelemetry .context import attach , detach , set_value
2524from opentelemetry .util import time_ns
2625
2726from .. import Span , SpanProcessor
@@ -91,15 +90,23 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
9190 return True
9291
9392
93+ class _FlushRequest :
94+ """Represents a request for the BatchExportSpanProcessor to flush spans."""
95+
96+ __slots__ = ["event" , "num_spans" ]
97+
98+ def __init__ (self ):
99+ self .event = threading .Event ()
100+ self .num_spans = 0
101+
102+
94103class BatchExportSpanProcessor (SpanProcessor ):
95104 """Batch span processor implementation.
96105
97106 BatchExportSpanProcessor is an implementation of `SpanProcessor` that
98107 batches ended spans and pushes them to the configured `SpanExporter`.
99108 """
100109
101- _FLUSH_TOKEN_SPAN = DefaultSpan (context = None )
102-
103110 def __init__ (
104111 self ,
105112 span_exporter : SpanExporter ,
@@ -129,9 +136,7 @@ def __init__(
129136 ) # type: typing.Deque[Span]
130137 self .worker_thread = threading .Thread (target = self .worker , daemon = True )
131138 self .condition = threading .Condition (threading .Lock ())
132- self .flush_condition = threading .Condition (threading .Lock ())
133- # flag to indicate that there is a flush operation on progress
134- self ._flushing = False
139+ self ._flush_request = None # type: typing.Optional[_FlushRequest]
135140 self .schedule_delay_millis = schedule_delay_millis
136141 self .max_export_batch_size = max_export_batch_size
137142 self .max_queue_size = max_queue_size
@@ -164,60 +169,128 @@ def on_end(self, span: Span) -> None:
164169
    def worker(self):
        """Worker-thread loop: waits for spans or flush requests and exports.

        Runs until ``self.done`` becomes true; on exit it drains the queue
        and wakes any flush initiators that are still waiting.
        """
        timeout = self.schedule_delay_millis / 1e3
        flush_request = None  # type: typing.Optional[_FlushRequest]
        while not self.done:
            with self.condition:
                if self.done:
                    # done flag may have changed, avoid waiting
                    break
                flush_request = self._get_and_unset_flush_request()
                if (
                    len(self.queue) < self.max_export_batch_size
                    and flush_request is None
                ):

                    self.condition.wait(timeout)
                    # a flush may have been requested while we were waiting
                    flush_request = self._get_and_unset_flush_request()
                    if not self.queue:
                        # spurious notification, let's wait again
                        self._notify_flush_request_finished(flush_request)
                        flush_request = None
                        continue
                    if self.done:
                        # missing spans will be sent when calling flush
                        break

            # subtract the duration of this export call to the next timeout
            start = time_ns()
            self._export(flush_request)
            end = time_ns()
            duration = (end - start) / 1e9
            timeout = self.schedule_delay_millis / 1e3 - duration

            self._notify_flush_request_finished(flush_request)
            flush_request = None

        # there might have been a new flush request while export was running
        # and before the done flag switched to true
        with self.condition:
            shutdown_flush_request = self._get_and_unset_flush_request()

        # be sure that all spans are sent
        self._drain_queue()
        self._notify_flush_request_finished(flush_request)
        self._notify_flush_request_finished(shutdown_flush_request)
214+
215+ def _get_and_unset_flush_request (self ,) -> typing .Optional [_FlushRequest ]:
216+ """Returns the current flush request and makes it invisible to the
217+ worker thread for subsequent calls.
218+ """
219+ flush_request = self ._flush_request
220+ self ._flush_request = None
221+ if flush_request is not None :
222+ flush_request .num_spans = len (self .queue )
223+ return flush_request
224+
225+ @staticmethod
226+ def _notify_flush_request_finished (
227+ flush_request : typing .Optional [_FlushRequest ],
228+ ):
229+ """Notifies the flush initiator(s) waiting on the given request/event
230+ that the flush operation was finished.
231+ """
232+ if flush_request is not None :
233+ flush_request .event .set ()
234+
235+ def _get_or_create_flush_request (self ) -> _FlushRequest :
236+ """Either returns the current active flush event or creates a new one.
190237
191- def export (self ) -> None :
192- """Exports at most max_export_batch_size spans."""
238+ The flush event will be visible and read by the worker thread before an
239+ export operation starts. Callers of a flush operation may wait on the
240+ returned event to be notified when the flush/export operation was
241+ finished.
242+
243+ This method is not thread-safe, i.e. callers need to take care about
244+ synchronization/locking.
245+ """
246+ if self ._flush_request is None :
247+ self ._flush_request = _FlushRequest ()
248+ return self ._flush_request
249+
250+ def _export (self , flush_request : typing .Optional [_FlushRequest ]):
251+ """Exports spans considering the given flush_request.
252+
253+ In case of a given flush_requests spans are exported in batches until
254+ the number of exported spans reached or exceeded the number of spans in
255+ the flush request.
256+ In no flush_request was given at most max_export_batch_size spans are
257+ exported.
258+ """
259+ if not flush_request :
260+ self ._export_batch ()
261+ return
262+
263+ num_spans = flush_request .num_spans
264+ while self .queue :
265+ num_exported = self ._export_batch ()
266+ num_spans -= num_exported
267+
268+ if num_spans <= 0 :
269+ break
270+
271+ def _export_batch (self ) -> int :
272+ """Exports at most max_export_batch_size spans and returns the number of
273+ exported spans.
274+ """
193275 idx = 0
194- notify_flush = False
195276 # currently only a single thread acts as consumer, so queue.pop() will
196277 # not raise an exception
197278 while idx < self .max_export_batch_size and self .queue :
198- span = self .queue .pop ()
199- if span is self ._FLUSH_TOKEN_SPAN :
200- notify_flush = True
201- else :
202- self .spans_list [idx ] = span
203- idx += 1
279+ self .spans_list [idx ] = self .queue .pop ()
280+ idx += 1
204281 token = attach (set_value ("suppress_instrumentation" , True ))
205282 try :
206283 # Ignore type b/c the Optional[None]+slicing is too "clever"
207284 # for mypy
208285 self .span_exporter .export (self .spans_list [:idx ]) # type: ignore
209- # pylint: disable=broad-except
210- except Exception :
286+ except Exception : # pylint: disable=broad-except
211287 logger .exception ("Exception while exporting Span batch." )
212288 detach (token )
213289
214- if notify_flush :
215- with self .flush_condition :
216- self .flush_condition .notify ()
217-
218290 # clean up list
219291 for index in range (idx ):
220292 self .spans_list [index ] = None
293+ return idx
221294
    def _drain_queue(self):
        """Export all elements until queue is empty.

        Can only be called from the worker thread context because it invokes
        `_export_batch` which is not thread safe.
        """
        while self.queue:
            self._export_batch()
230303
231304 def force_flush (self , timeout_millis : int = 30000 ) -> bool :
232305 if self .done :
233306 logger .warning ("Already shutdown, ignoring call to force_flush()." )
234307 return True
235308
236- self ._flushing = True
237- self .queue .appendleft (self ._FLUSH_TOKEN_SPAN )
238-
239- # wake up worker thread
240309 with self .condition :
310+ flush_request = self ._get_or_create_flush_request ()
311+ # signal the worker thread to flush and wait for it to finish
241312 self .condition .notify_all ()
242313
243314 # wait for token to be processed
244- with self .flush_condition :
245- ret = self .flush_condition .wait (timeout_millis / 1e3 )
246-
247- self ._flushing = False
248-
315+ ret = flush_request .event .wait (timeout_millis / 1e3 )
249316 if not ret :
250317 logger .warning ("Timeout was exceeded in force_flush()." )
251318 return ret
0 commit comments