From fa0c04d7248f5d506b0ace7f14de0cd617e7e8c4 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Mon, 18 Mar 2024 14:16:18 +0530 Subject: [PATCH 1/5] Avoid logging empty lines in KPO --- .../providers/cncf/kubernetes/operators/pod.py | 3 ++- .../cncf/kubernetes/utils/pod_manager.py | 3 ++- .../cncf/kubernetes/utils/test_pod_manager.py | 15 +++++++++++++++ 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index f0f74f42c4217..6b0eb16c27429 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -791,7 +791,8 @@ def write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime ) for raw_line in logs: line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n") - self.log.info("Container logs: %s", line) + if line.strip(): + self.log.info("Container logs: %s", line) except HTTPError as e: self.log.warning( "Reading of logs interrupted with error %r; will retry. 
" diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 787757828de37..69da412597124 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -481,7 +481,8 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None self._callbacks.progress_callback( line=line, client=self._client, mode=ExecutionMode.SYNC ) - self.log.info("[%s] %s", container_name, message_to_log) + if message_to_log and message_to_log.strip(): + self.log.info("[%s] %s", container_name, message_to_log) last_captured_timestamp = message_timestamp except TimeoutError as e: # in case of timeout, increment return time by 2 seconds to avoid diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index fb04e27212849..5ea3a0cd9feea 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -93,6 +93,21 @@ def test_read_pod_logs_retries_successfully(self): ] ) + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") + def test_fetch_container_logs_do_not_log_none(self, mock_container_is_running, caplog): + MockWrapper.reset() + caplog.set_level(logging.INFO) + + def consumer_iter(): + """This will simulate a container that hasn't produced any logs in the last read_timeout window""" + yield from () + + with mock.patch.object(PodLogsConsumer, "__iter__") as mock_consumer_iter: + mock_consumer_iter.side_effect = consumer_iter + mock_container_is_running.side_effect = [True, True, False] + self.pod_manager.fetch_container_logs(mock.MagicMock(), "container-name", follow=True) + assert "[container-name] None" not in (record.message for record in caplog.records) + def test_read_pod_logs_retries_fails(self): mock.sentinel.metadata = 
mock.MagicMock() self.mock_kube_client.read_namespaced_pod_log.side_effect = [ From d427dd8a992bd1ec70bd3ea6d3ef94a9f32d52ff Mon Sep 17 00:00:00 2001 From: Pankaj Date: Mon, 18 Mar 2024 14:35:23 +0530 Subject: [PATCH 2/5] cleanup --- airflow/providers/cncf/kubernetes/operators/pod.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 6b0eb16c27429..74d8347ac65d9 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -790,8 +790,8 @@ def write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime since_seconds=since_seconds, ) for raw_line in logs: - line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n") - if line.strip(): + line = raw_line.decode("utf-8", errors="backslashreplace").strip() + if line: self.log.info("Container logs: %s", line) except HTTPError as e: self.log.warning( From 8de8e957c983ef07a194e4c0223f6c54fe752032 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 19 Mar 2024 23:35:22 +0530 Subject: [PATCH 3/5] Apply review suggestions --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 69da412597124..cd1a9b3b946a2 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -464,7 +464,8 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None self._callbacks.progress_callback( line=line, client=self._client, mode=ExecutionMode.SYNC ) - self.log.info("[%s] %s", container_name, message_to_log) + if message_to_log: + self.log.info("[%s] %s", container_name, message_to_log) last_captured_timestamp = message_timestamp message_to_log = message 
message_timestamp = line_timestamp @@ -481,7 +482,7 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None self._callbacks.progress_callback( line=line, client=self._client, mode=ExecutionMode.SYNC ) - if message_to_log and message_to_log.strip(): + if message_to_log: self.log.info("[%s] %s", container_name, message_to_log) last_captured_timestamp = message_timestamp except TimeoutError as e: From df8b23ab8eb544afde6a228ee604a62b4aa00732 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Wed, 20 Mar 2024 23:43:37 +0530 Subject: [PATCH 4/5] Apply review feedback --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index cd1a9b3b946a2..66bbd5d05a5a5 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -464,7 +464,7 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None self._callbacks.progress_callback( line=line, client=self._client, mode=ExecutionMode.SYNC ) - if message_to_log: + if message_to_log is not None: self.log.info("[%s] %s", container_name, message_to_log) last_captured_timestamp = message_timestamp message_to_log = message @@ -482,7 +482,7 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None self._callbacks.progress_callback( line=line, client=self._client, mode=ExecutionMode.SYNC ) - if message_to_log: + if message_to_log is not None: self.log.info("[%s] %s", container_name, message_to_log) last_captured_timestamp = message_timestamp except TimeoutError as e: From edc62a7eaf3621e7f2aaf777bb431a7adc860846 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Wed, 27 Mar 2024 17:09:01 +0530 Subject: [PATCH 5/5] Update airflow/providers/cncf/kubernetes/operators/pod.py 
--- airflow/providers/cncf/kubernetes/operators/pod.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 74d8347ac65d9..a331987b57ff0 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -790,7 +790,7 @@ def write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime since_seconds=since_seconds, ) for raw_line in logs: - line = raw_line.decode("utf-8", errors="backslashreplace").strip() + line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n") if line: self.log.info("Container logs: %s", line) except HTTPError as e: