From 34a86d266106b820e9efb557d50c0978cc45da85 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Sun, 22 Jun 2025 19:05:53 +0530 Subject: [PATCH] Replace usage of os.environ with conf_vars in kafka IT --- .../apache/kafka/operators/test_consume.py | 43 ++++++++----------- .../apache/kafka/operators/test_produce.py | 38 ++++++---------- 2 files changed, 32 insertions(+), 49 deletions(-) diff --git a/providers/apache/kafka/tests/integration/apache/kafka/operators/test_consume.py b/providers/apache/kafka/tests/integration/apache/kafka/operators/test_consume.py index 772c9317fafab..ddca5fc502e17 100644 --- a/providers/apache/kafka/tests/integration/apache/kafka/operators/test_consume.py +++ b/providers/apache/kafka/tests/integration/apache/kafka/operators/test_consume.py @@ -18,19 +18,17 @@ from __future__ import annotations -import json import logging -import os from typing import Any import pytest from confluent_kafka import Producer -from airflow.models import Connection - # Import Operator from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator +from tests_common.test_utils.config import conf_vars + log = logging.getLogger(__name__) @@ -52,32 +50,27 @@ def _basic_message_tester(message, test=None) -> Any: @pytest.mark.integration("kafka") +@conf_vars( + { + ( + "connections", + "operator.consumer.test.integration.test_1", + ): "kafka://broker:29092?socket.timeout.ms=10&bootstrap.servers=broker:29092&group.id=operator.consumer.test.integration.test_1&enable.auto.commit=False&auto.offset.reset=beginning", + ( + "connections", + "operator.consumer.test.integration.test_2", + ): "kafka://broker:29092?socket.timeout.ms=10&bootstrap.servers=broker:29092&group.id=operator.consumer.test.integration.test_2&enable.auto.commit=False&auto.offset.reset=beginning", + ( + "connections", + "operator.consumer.test.integration.test_3", + ): "kafka://broker:29092?socket.timeout.ms=10&bootstrap.servers=broker:29092&group.id=operator.consumer.test.integration.test_3&enable.auto.commit=False&auto.offset.reset=beginning", + } +) class TestConsumeFromTopic: """ test ConsumeFromTopicOperator """ - def setup_method(self): - """Set up connections for each test method.""" - # Create separate connections for each test - for num in (1, 2, 3): - conn = Connection( - conn_id=f"operator.consumer.test.integration.test_{num}", - conn_type="kafka", - extra=json.dumps( - { - "socket.timeout.ms": 10, - "bootstrap.servers": "broker:29092", - "group.id": f"operator.consumer.test.integration.test_{num}", - "enable.auto.commit": False, - "auto.offset.reset": "beginning", - } - ), - ) - - env_var_name = f"AIRFLOW_CONN_{conn.conn_id.upper()}" - os.environ[env_var_name] = conn.get_uri() - def test_consumer_operator_test_1(self): """test consumer works with string import""" diff --git a/providers/apache/kafka/tests/integration/apache/kafka/operators/test_produce.py b/providers/apache/kafka/tests/integration/apache/kafka/operators/test_produce.py index ee73c133b4195..ada93900ce67d 100644 --- a/providers/apache/kafka/tests/integration/apache/kafka/operators/test_produce.py +++ b/providers/apache/kafka/tests/integration/apache/kafka/operators/test_produce.py @@ -18,14 +18,14 @@ import json import logging -import os import pytest from confluent_kafka import Consumer -from airflow.models import Connection from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator +from tests_common.test_utils.config import conf_vars + log = logging.getLogger(__name__) @@ -35,33 +35,23 @@ def _producer_function(): @pytest.mark.integration("kafka") +@conf_vars( + { + ( + "connections", + "kafka_default_test_1", + ): "kafka://broker:29092?socket.timeout.ms=10&message.timeout.ms=10&group.id=operator.producer.test.integration.test_1", + ( + "connections", + "kafka_default_test_2", + ): "kafka://broker:29092?socket.timeout.ms=10&message.timeout.ms=10&group.id=operator.producer.test.integration.test_2", + } +) class TestProduceToTopic: """ test ProduceToTopicOperator """ - def setup_method(self): - """Set up connections for each test method.""" - # Create separate connections for each test - for num in (1, 2): - group = f"operator.producer.test.integration.test_{num}" - conn = Connection( - conn_id=f"kafka_default_test_{num}", - conn_type="kafka", - extra=json.dumps( - { - "socket.timeout.ms": 10, - "message.timeout.ms": 10, - "bootstrap.servers": "broker:29092", - "group.id": group, - } - ), - ) - - # Set environment variable directly (like create_connection_without_db does) - env_var_name = f"AIRFLOW_CONN_{conn.conn_id.upper()}" - os.environ[env_var_name] = conn.get_uri() - def test_producer_operator_test_1(self): GROUP = "operator.producer.test.integration.test_1" TOPIC = "operator.producer.test.integration.test_1"