diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index f0f74f42c4217..a331987b57ff0 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -791,7 +791,8 @@ def write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime ) for raw_line in logs: line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n") - self.log.info("Container logs: %s", line) + if line: + self.log.info("Container logs: %s", line) except HTTPError as e: self.log.warning( "Reading of logs interrupted with error %r; will retry. " diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 787757828de37..66bbd5d05a5a5 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -464,7 +464,8 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None self._callbacks.progress_callback( line=line, client=self._client, mode=ExecutionMode.SYNC ) - self.log.info("[%s] %s", container_name, message_to_log) + if message_to_log is not None: + self.log.info("[%s] %s", container_name, message_to_log) last_captured_timestamp = message_timestamp message_to_log = message message_timestamp = line_timestamp @@ -481,7 +482,8 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None self._callbacks.progress_callback( line=line, client=self._client, mode=ExecutionMode.SYNC ) - self.log.info("[%s] %s", container_name, message_to_log) + if message_to_log is not None: + self.log.info("[%s] %s", container_name, message_to_log) last_captured_timestamp = message_timestamp except TimeoutError as e: # in case of timeout, increment return time by 2 seconds to avoid diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index fb04e27212849..5ea3a0cd9feea 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -93,6 +93,21 @@ def test_read_pod_logs_retries_successfully(self): ] ) + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") + def test_fetch_container_logs_do_not_log_none(self, mock_container_is_running, caplog): + MockWrapper.reset() + caplog.set_level(logging.INFO) + + def consumer_iter(): + """This will simulate a container that hasn't produced any logs in the last read_timeout window""" + yield from () + + with mock.patch.object(PodLogsConsumer, "__iter__") as mock_consumer_iter: + mock_consumer_iter.side_effect = consumer_iter + mock_container_is_running.side_effect = [True, True, False] + self.pod_manager.fetch_container_logs(mock.MagicMock(), "container-name", follow=True) + assert "[container-name] None" not in (record.message for record in caplog.records) + def test_read_pod_logs_retries_fails(self): mock.sentinel.metadata = mock.MagicMock() self.mock_kube_client.read_namespaced_pod_log.side_effect = [