diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 60116ec7313..214766253e7 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -8,6 +8,8 @@ ([#1034](https://github.com/open-telemetry/opentelemetry-python/pull/1034)) - Remove lazy Event and Link API from Span interface ([#1045](https://github.com/open-telemetry/opentelemetry-python/pull/1045)) +- Improve BatchExportSpanProcessor + ([#1062](https://github.com/open-telemetry/opentelemetry-python/pull/1062)) - Populate resource attributes as per semantic conventions ([#1053](https://github.com/open-telemetry/opentelemetry-python/pull/1053)) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 9fe55ed7fd4..7c1e51f3ec5 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -20,8 +20,7 @@ import typing from enum import Enum -from opentelemetry.context import attach, detach, get_current, set_value -from opentelemetry.trace import DefaultSpan +from opentelemetry.context import attach, detach, set_value from opentelemetry.util import time_ns from .. import Span, SpanProcessor @@ -91,6 +90,16 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: return True +class _FlushRequest: + """Represents a request for the BatchExportSpanProcessor to flush spans.""" + + __slots__ = ["event", "num_spans"] + + def __init__(self): + self.event = threading.Event() + self.num_spans = 0 + + class BatchExportSpanProcessor(SpanProcessor): """Batch span processor implementation. @@ -98,8 +107,6 @@ class BatchExportSpanProcessor(SpanProcessor): batches ended spans and pushes them to the configured `SpanExporter`. 
""" - _FLUSH_TOKEN_SPAN = DefaultSpan(context=None) - def __init__( self, span_exporter: SpanExporter, @@ -129,9 +136,7 @@ def __init__( ) # type: typing.Deque[Span] self.worker_thread = threading.Thread(target=self.worker, daemon=True) self.condition = threading.Condition(threading.Lock()) - self.flush_condition = threading.Condition(threading.Lock()) - # flag to indicate that there is a flush operation on progress - self._flushing = False + self._flush_request = None # type: typing.Optional[_FlushRequest] self.schedule_delay_millis = schedule_delay_millis self.max_export_batch_size = max_export_batch_size self.max_queue_size = max_queue_size @@ -164,60 +169,128 @@ def on_end(self, span: Span) -> None: def worker(self): timeout = self.schedule_delay_millis / 1e3 + flush_request = None # type: typing.Optional[_FlushRequest] while not self.done: - if ( - len(self.queue) < self.max_export_batch_size - and not self._flushing - ): - with self.condition: + with self.condition: + if self.done: + # done flag may have changed, avoid waiting + break + flush_request = self._get_and_unset_flush_request() + if ( + len(self.queue) < self.max_export_batch_size + and flush_request is None + ): + self.condition.wait(timeout) + flush_request = self._get_and_unset_flush_request() if not self.queue: # spurious notification, let's wait again + self._notify_flush_request_finished(flush_request) + flush_request = None continue if self.done: # missing spans will be sent when calling flush break - # substract the duration of this export call to the next timeout + # subtract the duration of this export call to the next timeout start = time_ns() - self.export() + self._export(flush_request) end = time_ns() duration = (end - start) / 1e9 timeout = self.schedule_delay_millis / 1e3 - duration + self._notify_flush_request_finished(flush_request) + flush_request = None + + # there might have been a new flush request while export was running + # and before the done flag switched to true + with 
self.condition: + shutdown_flush_request = self._get_and_unset_flush_request() + # be sure that all spans are sent self._drain_queue() + self._notify_flush_request_finished(flush_request) + self._notify_flush_request_finished(shutdown_flush_request) + + def _get_and_unset_flush_request(self,) -> typing.Optional[_FlushRequest]: + """Returns the current flush request and makes it invisible to the + worker thread for subsequent calls. + """ + flush_request = self._flush_request + self._flush_request = None + if flush_request is not None: + flush_request.num_spans = len(self.queue) + return flush_request + + @staticmethod + def _notify_flush_request_finished( + flush_request: typing.Optional[_FlushRequest], + ): + """Notifies the flush initiator(s) waiting on the given request/event + that the flush operation was finished. + """ + if flush_request is not None: + flush_request.event.set() + + def _get_or_create_flush_request(self) -> _FlushRequest: + """Either returns the current active flush event or creates a new one. - def export(self) -> None: - """Exports at most max_export_batch_size spans.""" + The flush event will be visible and read by the worker thread before an + export operation starts. Callers of a flush operation may wait on the + returned event to be notified when the flush/export operation was + finished. + + This method is not thread-safe, i.e. callers need to take care about + synchronization/locking. + """ + if self._flush_request is None: + self._flush_request = _FlushRequest() + return self._flush_request + + def _export(self, flush_request: typing.Optional[_FlushRequest]): + """Exports spans considering the given flush_request. + + In case of a given flush_request, spans are exported in batches until + the number of exported spans reached or exceeded the number of spans in + the flush request. + If no flush_request was given, at most max_export_batch_size spans are + exported. 
+ """ + if not flush_request: + self._export_batch() + return + + num_spans = flush_request.num_spans + while self.queue: + num_exported = self._export_batch() + num_spans -= num_exported + + if num_spans <= 0: + break + + def _export_batch(self) -> int: + """Exports at most max_export_batch_size spans and returns the number of + exported spans. + """ idx = 0 - notify_flush = False # currently only a single thread acts as consumer, so queue.pop() will # not raise an exception while idx < self.max_export_batch_size and self.queue: - span = self.queue.pop() - if span is self._FLUSH_TOKEN_SPAN: - notify_flush = True - else: - self.spans_list[idx] = span - idx += 1 + self.spans_list[idx] = self.queue.pop() + idx += 1 token = attach(set_value("suppress_instrumentation", True)) try: # Ignore type b/c the Optional[None]+slicing is too "clever" # for mypy self.span_exporter.export(self.spans_list[:idx]) # type: ignore - # pylint: disable=broad-except - except Exception: + except Exception: # pylint: disable=broad-except logger.exception("Exception while exporting Span batch.") detach(token) - if notify_flush: - with self.flush_condition: - self.flush_condition.notify() - # clean up list for index in range(idx): self.spans_list[index] = None + return idx def _drain_queue(self): """"Export all elements until queue is empty. @@ -226,26 +299,20 @@ def _drain_queue(self): `export` that is not thread safe. 
""" while self.queue: - self.export() + self._export_batch() def force_flush(self, timeout_millis: int = 30000) -> bool: if self.done: logger.warning("Already shutdown, ignoring call to force_flush().") return True - self._flushing = True - self.queue.appendleft(self._FLUSH_TOKEN_SPAN) - - # wake up worker thread with self.condition: + flush_request = self._get_or_create_flush_request() + # signal the worker thread to flush and wait for it to finish self.condition.notify_all() # wait for token to be processed - with self.flush_condition: - ret = self.flush_condition.wait(timeout_millis / 1e3) - - self._flushing = False - + ret = flush_request.event.wait(timeout_millis / 1e3) if not ret: logger.warning("Timeout was exceeded in force_flush().") return ret diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 43b7893951f..34e1d14d23c 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -13,8 +13,10 @@ # limitations under the License. 
import os +import threading import time import unittest +from concurrent.futures import ThreadPoolExecutor from logging import WARNING from unittest import mock @@ -31,11 +33,13 @@ def __init__( destination, max_export_batch_size=None, export_timeout_millis=0.0, + export_event: threading.Event = None, ): self.destination = destination self.max_export_batch_size = max_export_batch_size self.is_shutdown = False self.export_timeout = export_timeout_millis / 1e3 + self.export_event = export_event def export(self, spans: trace.Span) -> export.SpanExportResult: if ( @@ -45,6 +49,8 @@ def export(self, spans: trace.Span) -> export.SpanExportResult: raise ValueError("Batch is too big") time.sleep(self.export_timeout) self.destination.extend(span.name for span in spans) + if self.export_event: + self.export_event.set() return export.SpanExportResult.SUCCESS def shutdown(self): @@ -148,6 +154,42 @@ def test_flush(self): span_processor.shutdown() + def test_flush_empty(self): + spans_names_list = [] + + my_exporter = MySpanExporter(destination=spans_names_list) + span_processor = export.BatchExportSpanProcessor(my_exporter) + + self.assertTrue(span_processor.force_flush()) + + def test_flush_from_multiple_threads(self): + num_threads = 50 + num_spans = 10 + + span_list = [] + + my_exporter = MySpanExporter(destination=span_list) + span_processor = export.BatchExportSpanProcessor( + my_exporter, max_queue_size=512, max_export_batch_size=128 + ) + + def create_spans_and_flush(tno: int): + for span_idx in range(num_spans): + _create_start_and_end_span( + "Span {}-{}".format(tno, span_idx), span_processor + ) + self.assertTrue(span_processor.force_flush()) + + with ThreadPoolExecutor(max_workers=num_threads) as executor: + future_list = [] + for thread_no in range(num_threads): + future = executor.submit(create_spans_and_flush, thread_no) + future_list.append(future) + + executor.shutdown() + + self.assertEqual(num_threads * num_spans, len(span_list)) + def 
test_flush_timeout(self): spans_names_list = [] @@ -209,17 +251,22 @@ def test_batch_span_processor_scheduled_delay(self): """Test that spans are exported each schedule_delay_millis""" spans_names_list = [] - my_exporter = MySpanExporter(destination=spans_names_list) + export_event = threading.Event() + my_exporter = MySpanExporter( + destination=spans_names_list, export_event=export_event + ) span_processor = export.BatchExportSpanProcessor( - my_exporter, schedule_delay_millis=50 + my_exporter, schedule_delay_millis=50, ) # create single span + start_time = time.time() _create_start_and_end_span("foo", span_processor) - time.sleep(0.05 + 0.02) - # span should be already exported + self.assertTrue(export_event.wait(2)) + export_time = time.time() self.assertEqual(len(spans_names_list), 1) + self.assertGreaterEqual((export_time - start_time) * 1e3, 50) span_processor.shutdown()