Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4339](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4339))
- `opentelemetry-instrumentation-confluent-kafka`: Skip `recv` span creation when `poll()` returns no message or `consume()` returns an empty list, avoiding empty spans on idle polls
([#4349](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4349))
- `opentelemetry-instrumentation-confluent-kafka`: Fix `ProxiedProducer` and `ProxiedConsumer` not delegating methods to the underlying producer/consumer instances
([#4278](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4278))
- Fix intermittent `Core Contrib Test` CI failures caused by GitHub git CDN SHA propagation lag by installing core packages from the already-checked-out local copy instead of a second git clone
([#4305](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4305))
- Don't import module in unwrap if not already imported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def close(self): # pylint: disable=useless-super-delegation
return super().close()


class ProxiedProducer(Producer):
class ProxiedProducer:
def __init__(self, producer: Producer, tracer: Tracer):
self._producer = producer
self._tracer = tracer
Expand All @@ -177,8 +177,11 @@ def produce(self, topic, value=None, *args, **kwargs): # pylint: disable=keywor
def original_producer(self):
return self._producer

def __getattr__(self, name):
return getattr(self._producer, name)

class ProxiedConsumer(Consumer):

class ProxiedConsumer:
def __init__(self, consumer: Consumer, tracer: Tracer):
self._consumer = consumer
self._tracer = tracer
Expand Down Expand Up @@ -224,6 +227,9 @@ def subscribe(self, topics, on_assign=lambda *args: None, *args, **kwargs): # p
def original_consumer(self):
return self._consumer

def __getattr__(self, name):
return getattr(self._consumer, name)


class ConfluentKafkaInstrumentor(BaseInstrumentor):
"""An instrumentor for confluent kafka module
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,3 +447,43 @@ def test_producer_flush(self) -> None:
span_list = self.memory_exporter.get_finished_spans()
self._assert_span_count(span_list, 1)
self._assert_topic(span_list[0], "topic-1")

def test_proxied_producer_delegates_undefined_methods(
self,
) -> None:
"""Regression test for #4278: methods not defined on
ProxiedProducer must delegate to the underlying
producer instead of hitting an uninitialized handle."""
instrumentation = ConfluentKafkaInstrumentor()
message_queue = []

producer = MockedProducer(
message_queue,
{
"bootstrap.servers": "localhost:29092",
},
)

proxied = instrumentation.instrument_producer(producer)
self.assertEqual(proxied.list_topics(), "producer_topics")

def test_proxied_consumer_delegates_undefined_methods(
self,
) -> None:
"""Regression test for #4278: methods not defined on
ProxiedConsumer must delegate to the underlying
consumer instead of hitting an uninitialized handle."""
instrumentation = ConfluentKafkaInstrumentor()

consumer = MockConsumer(
[],
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
},
)

proxied = instrumentation.instrument_consumer(consumer)
self.assertEqual(proxied.list_topics(), "consumer_topics")
self.assertEqual(proxied.assignment(), [])
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ def poll(self, timeout=None):
return self._queue.pop(0)
return None

def list_topics(self, topic=None, timeout=-1): # pylint: disable=no-self-use
return "consumer_topics"

def assignment(self): # pylint: disable=no-self-use
return []


class MockedMessage:
def __init__(
Expand Down Expand Up @@ -77,3 +83,6 @@ def poll(self, *args, **kwargs):

def flush(self, *args, **kwargs):
return len(self._queue)

def list_topics(self, topic=None, timeout=-1): # pylint: disable=no-self-use
return "producer_topics"
Loading