Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
([#1034](https://github.com/open-telemetry/opentelemetry-python/pull/1034))
- Remove lazy Event and Link API from Span interface
([#1045](https://github.com/open-telemetry/opentelemetry-python/pull/1045))
- Improve BatchExportSpanProcessor
([#1062](https://github.com/open-telemetry/opentelemetry-python/pull/1062))

## Version 0.12b0

Expand Down
145 changes: 106 additions & 39 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import typing
from enum import Enum

from opentelemetry.context import attach, detach, get_current, set_value
from opentelemetry.trace import DefaultSpan
from opentelemetry.context import attach, detach, set_value
from opentelemetry.util import time_ns

from .. import Span, SpanProcessor
Expand Down Expand Up @@ -91,15 +90,23 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
return True


class _FlushRequest:
    """Represents a request for the BatchExportSpanProcessor to flush spans."""

    __slots__ = ["event", "num_spans"]

    def __init__(self):
        # Event that flush initiator(s) wait on; the worker thread sets it
        # once the flush/export operation for this request has finished.
        self.event = threading.Event()
        # Number of spans in the queue at the time the worker thread picked
        # this request up; zero until the worker snapshots the queue length.
        self.num_spans = 0


class BatchExportSpanProcessor(SpanProcessor):
"""Batch span processor implementation.

BatchExportSpanProcessor is an implementation of `SpanProcessor` that
batches ended spans and pushes them to the configured `SpanExporter`.
"""

_FLUSH_TOKEN_SPAN = DefaultSpan(context=None)

def __init__(
self,
span_exporter: SpanExporter,
Expand Down Expand Up @@ -129,9 +136,7 @@ def __init__(
) # type: typing.Deque[Span]
self.worker_thread = threading.Thread(target=self.worker, daemon=True)
self.condition = threading.Condition(threading.Lock())
self.flush_condition = threading.Condition(threading.Lock())
# flag to indicate that there is a flush operation on progress
self._flushing = False
self._flush_request = None # type: typing.Optional[_FlushRequest]
self.schedule_delay_millis = schedule_delay_millis
self.max_export_batch_size = max_export_batch_size
self.max_queue_size = max_queue_size
Expand Down Expand Up @@ -164,60 +169,128 @@ def on_end(self, span: Span) -> None:

def worker(self):
timeout = self.schedule_delay_millis / 1e3
flush_request = None # type: typing.Optional[_FlushRequest]
while not self.done:
if (
len(self.queue) < self.max_export_batch_size
and not self._flushing
):
with self.condition:
with self.condition:
if self.done:
# done flag may have changed, avoid waiting
break
flush_request = self._get_and_unset_flush_request()
if (
len(self.queue) < self.max_export_batch_size
and flush_request is None
):

self.condition.wait(timeout)
flush_request = self._get_and_unset_flush_request()
if not self.queue:
# spurious notification, let's wait again
self._notify_flush_request_finished(flush_request)
flush_request = None
continue
if self.done:
# missing spans will be sent when calling flush
break

# substract the duration of this export call to the next timeout
# subtract the duration of this export call to the next timeout
start = time_ns()
self.export()
self.export(flush_request)
end = time_ns()
duration = (end - start) / 1e9
timeout = self.schedule_delay_millis / 1e3 - duration

self._notify_flush_request_finished(flush_request)
flush_request = None

# there might have been a new flush request while export was running
# and before the done flag switched to true
with self.condition:
shutdown_flush_request = self._get_and_unset_flush_request()

# be sure that all spans are sent
self._drain_queue()
self._notify_flush_request_finished(flush_request)
self._notify_flush_request_finished(shutdown_flush_request)

def _get_and_unset_flush_request(self,) -> typing.Optional[_FlushRequest]:
"""Returns the current flush request and makes it invisible to the
worker thread for subsequent calls.
"""
flush_request = self._flush_request
self._flush_request = None
if flush_request is not None:
flush_request.num_spans = len(self.queue)
return flush_request

@staticmethod
def _notify_flush_request_finished(
flush_request: typing.Optional[_FlushRequest],
):
"""Notifies the flush initiator(s) waiting on the given request/event
that the flush operation was finished.
"""
if flush_request is not None:
flush_request.event.set()

def _get_or_create_flush_request(self) -> _FlushRequest:
"""Either returns the current active flush event or creates a new one.

def export(self) -> None:
"""Exports at most max_export_batch_size spans."""
The flush event will be visible and read by the worker thread before an
export operation starts. Callers of a flush operation may wait on the
returned event to be notified when the flush/export operation was
finished.

This method is not thread-safe, i.e. callers need to take care about
synchronization/locking.
"""
if self._flush_request is None:
self._flush_request = _FlushRequest()
return self._flush_request

def export(self, flush_request: typing.Optional[_FlushRequest]):
"""Exports spans considering the given flush_request.

In case of a given flush_requests spans are exported in batches until
the number of exported spans reached or exceeded the number of spans in
the flush request.
In no flush_request was given at most max_export_batch_size spans are
exported.
"""
if not flush_request:
self.export_batch()
return

num_spans = flush_request.num_spans
while self.queue:
num_exported = self.export_batch()
num_spans -= num_exported

if num_spans <= 0:
break

def export_batch(self) -> int:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to make this method internal to the processor?

Suggested change
def export_batch(self) -> int:
def _export_batch(self) -> int:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, since it should only be called by the worker thread; the same applies to the export method.

"""Exports at most max_export_batch_size spans and returns the number of
exported spans.
"""
idx = 0
notify_flush = False
# currently only a single thread acts as consumer, so queue.pop() will
# not raise an exception
while idx < self.max_export_batch_size and self.queue:
span = self.queue.pop()
if span is self._FLUSH_TOKEN_SPAN:
notify_flush = True
else:
self.spans_list[idx] = span
idx += 1
self.spans_list[idx] = self.queue.pop()
idx += 1
token = attach(set_value("suppress_instrumentation", True))
try:
# Ignore type b/c the Optional[None]+slicing is too "clever"
# for mypy
self.span_exporter.export(self.spans_list[:idx]) # type: ignore
# pylint: disable=broad-except
except Exception:
except Exception: # pylint: disable=broad-except
logger.exception("Exception while exporting Span batch.")
detach(token)

if notify_flush:
with self.flush_condition:
self.flush_condition.notify()

# clean up list
for index in range(idx):
self.spans_list[index] = None
return idx

def _drain_queue(self):
""""Export all elements until queue is empty.
Expand All @@ -226,26 +299,20 @@ def _drain_queue(self):
`export` that is not thread safe.
"""
while self.queue:
self.export()
self.export_batch()

def force_flush(self, timeout_millis: int = 30000) -> bool:
if self.done:
logger.warning("Already shutdown, ignoring call to force_flush().")
return True

self._flushing = True
self.queue.appendleft(self._FLUSH_TOKEN_SPAN)

# wake up worker thread
with self.condition:
flush_request = self._get_or_create_flush_request()
# signal the worker thread to flush and wait for it to finish
self.condition.notify_all()

# wait for token to be processed
with self.flush_condition:
ret = self.flush_condition.wait(timeout_millis / 1e3)

self._flushing = False

ret = flush_request.event.wait(timeout_millis / 1e3)
if not ret:
logger.warning("Timeout was exceeded in force_flush().")
return ret
Expand Down
55 changes: 51 additions & 4 deletions opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
# limitations under the License.

import os
import threading
import time
import unittest
from concurrent.futures import ThreadPoolExecutor
from logging import WARNING
from unittest import mock

Expand All @@ -31,11 +33,13 @@ def __init__(
destination,
max_export_batch_size=None,
export_timeout_millis=0.0,
export_event: threading.Event = None,
):
self.destination = destination
self.max_export_batch_size = max_export_batch_size
self.is_shutdown = False
self.export_timeout = export_timeout_millis / 1e3
self.export_event = export_event

def export(self, spans: trace.Span) -> export.SpanExportResult:
if (
Expand All @@ -45,6 +49,8 @@ def export(self, spans: trace.Span) -> export.SpanExportResult:
raise ValueError("Batch is too big")
time.sleep(self.export_timeout)
self.destination.extend(span.name for span in spans)
if self.export_event:
self.export_event.set()
return export.SpanExportResult.SUCCESS

def shutdown(self):
Expand Down Expand Up @@ -148,6 +154,42 @@ def test_flush(self):

span_processor.shutdown()

def test_flush_empty(self):
spans_names_list = []

my_exporter = MySpanExporter(destination=spans_names_list)
span_processor = export.BatchExportSpanProcessor(my_exporter)

self.assertTrue(span_processor.force_flush())

def test_flush_from_multiple_threads(self):
num_threads = 50
num_spans = 10

span_list = []

my_exporter = MySpanExporter(destination=span_list)
span_processor = export.BatchExportSpanProcessor(
my_exporter, max_queue_size=512, max_export_batch_size=128
)

def create_spans_and_flush(tno: int):
for span_idx in range(num_spans):
_create_start_and_end_span(
"Span {}-{}".format(tno, span_idx), span_processor
)
self.assertTrue(span_processor.force_flush())

with ThreadPoolExecutor(max_workers=num_threads) as executor:
future_list = []
for thread_no in range(num_threads):
future = executor.submit(create_spans_and_flush, thread_no)
future_list.append(future)

executor.shutdown()

self.assertEqual(num_threads * num_spans, len(span_list))

def test_flush_timeout(self):
spans_names_list = []

Expand Down Expand Up @@ -209,17 +251,22 @@ def test_batch_span_processor_scheduled_delay(self):
"""Test that spans are exported each schedule_delay_millis"""
spans_names_list = []

my_exporter = MySpanExporter(destination=spans_names_list)
export_event = threading.Event()
my_exporter = MySpanExporter(
destination=spans_names_list, export_event=export_event
)
span_processor = export.BatchExportSpanProcessor(
my_exporter, schedule_delay_millis=50
my_exporter, schedule_delay_millis=50,
)

# create single span
start_time = time.time()
_create_start_and_end_span("foo", span_processor)

time.sleep(0.05 + 0.02)
# span should be already exported
self.assertTrue(export_event.wait(2))
export_time = time.time()
self.assertEqual(len(spans_names_list), 1)
self.assertGreaterEqual((export_time - start_time) * 1e3, 50)

span_processor.shutdown()

Expand Down