diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c9c67be4b..51d4389fa4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4339](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4339)) - `opentelemetry-instrumentation-confluent-kafka`: Skip `recv` span creation when `poll()` returns no message or `consume()` returns an empty list, avoiding empty spans on idle polls ([#4349](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4349)) +- `opentelemetry-instrumentation-confluent-kafka`: Fix `ProxiedProducer` and `ProxiedConsumer` not delegating methods to the underlying producer/consumer instances + ([#4278](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4278)) - Fix intermittent `Core Contrib Test` CI failures caused by GitHub git CDN SHA propagation lag by installing core packages from the already-checked-out local copy instead of a second git clone ([#4305](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4305)) - Don't import module in unwrap if not already imported diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index ed390d7006..7625b32b88 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -151,7 +151,7 @@ def close(self): # pylint: disable=useless-super-delegation return super().close() -class ProxiedProducer(Producer): +class ProxiedProducer: def __init__(self, producer: Producer, tracer: Tracer): self._producer = producer self._tracer = tracer @@ -177,8 +177,11 @@ def produce(self, topic, value=None, *args, **kwargs): # pylint: disable=keywor def original_producer(self): return self._producer + def __getattr__(self, name): + return getattr(self._producer, name) -class ProxiedConsumer(Consumer): + +class ProxiedConsumer: def __init__(self, consumer: Consumer, tracer: Tracer): self._consumer = consumer self._tracer = tracer @@ -224,6 +227,9 @@ def subscribe(self, topics, on_assign=lambda *args: None, *args, **kwargs): # p def original_consumer(self): return self._consumer + def __getattr__(self, name): + return getattr(self._consumer, name) + class ConfluentKafkaInstrumentor(BaseInstrumentor): """An instrumentor for confluent kafka module diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 772ecd09ee..2edb0b5a33 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -447,3 +447,43 @@ def test_producer_flush(self) -> None: span_list = self.memory_exporter.get_finished_spans() self._assert_span_count(span_list, 1) self._assert_topic(span_list[0], "topic-1") + + def test_proxied_producer_delegates_undefined_methods( + self, + ) -> None: + """Regression test for #4278: methods not defined on + ProxiedProducer must delegate to the underlying + producer instead of hitting an uninitialized handle.""" + instrumentation = ConfluentKafkaInstrumentor() + message_queue = [] + + producer = MockedProducer( + message_queue, + { + "bootstrap.servers": "localhost:29092", + }, + ) + + proxied = instrumentation.instrument_producer(producer) + self.assertEqual(proxied.list_topics(), "producer_topics") + + def test_proxied_consumer_delegates_undefined_methods( + self, + ) -> None: + """Regression test for #4278: methods not defined on + ProxiedConsumer must delegate to the underlying + consumer instead of hitting an uninitialized handle.""" + instrumentation = ConfluentKafkaInstrumentor() + + consumer = MockConsumer( + [], + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + }, + ) + + proxied = instrumentation.instrument_consumer(consumer) + self.assertEqual(proxied.list_topics(), "consumer_topics") + self.assertEqual(proxied.assignment(), []) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py index f87dbd6576..72765fe4c3 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py @@ -18,6 +18,12 @@ def poll(self, timeout=None): return self._queue.pop(0) return None + def list_topics(self, topic=None, timeout=-1): # pylint: disable=no-self-use + return "consumer_topics" + + def assignment(self): # pylint: disable=no-self-use + return [] + class MockedMessage: def __init__( @@ -77,3 +83,6 @@ def poll(self, *args, **kwargs): def flush(self, *args, **kwargs): return len(self._queue) + + def list_topics(self, topic=None, timeout=-1): # pylint: disable=no-self-use + return "producer_topics"