From 971e4d8d39d53764eb920b22bf1b63793491b977 Mon Sep 17 00:00:00 2001 From: pmoriello Date: Thu, 23 May 2024 17:50:53 +0200 Subject: [PATCH 1/3] Refresh KubernetesPodOperator properties when credentials expire also when logs are enabled --- .../cncf/kubernetes/operators/pod.py | 24 +++++++------- .../cncf/kubernetes/operators/test_pod.py | 31 ++++++++++++------- 2 files changed, 32 insertions(+), 23 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index ef5366d6c4030..d6b0c06882dca 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -612,16 +612,7 @@ def execute_sync(self, context: Context): mode=ExecutionMode.SYNC, ) - if self.get_logs: - self.pod_manager.fetch_requested_container_logs( - pod=self.pod, - containers=self.container_logs, - follow_logs=True, - ) - if not self.get_logs or ( - self.container_logs is not True and self.base_container_name not in self.container_logs - ): - self.await_container_completion(pod=self.pod, container_name=self.base_container_name) + self.await_container_completion(pod=self.pod) if self.callbacks: self.callbacks.on_pod_completion( pod=self.find_pod(self.pod.metadata.namespace, context=context), @@ -654,9 +645,18 @@ def execute_sync(self, context: Context): retry=tenacity.retry_if_exception(lambda exc: check_exception_is_kubernetes_api_unauthorized(exc)), reraise=True, ) - def await_container_completion(self, pod: k8s.V1Pod, container_name: str): + def await_container_completion(self, pod: k8s.V1Pod): try: - self.pod_manager.await_container_completion(pod=pod, container_name=container_name) + if self.get_logs: + self.pod_manager.fetch_requested_container_logs( + pod=pod, + containers=self.container_logs, + follow_logs=True, + ) + if not self.get_logs or ( + self.container_logs is not True and self.base_container_name not in self.container_logs + ): + self.pod_manager.await_container_completion(pod=pod, container_name=self.base_container_name) except kubernetes.client.exceptions.ApiException as exc: if exc.status and str(exc.status) == "401": self.log.warning( diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index f7a8e24d6973d..7ba0dd2c33409 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -1621,29 +1621,38 @@ def test_execute_async_callbacks(self): "pod": remote_pod_mock, } + @pytest.mark.parametrize("get_logs", [True, False]) + @patch(f"{POD_MANAGER_CLASS}.fetch_requested_container_logs") @patch(f"{POD_MANAGER_CLASS}.await_container_completion") def test_await_container_completion_refreshes_properties_on_exception( - self, mock_await_container_completion + self, mock_await_container_completion, fetch_requested_container_logs, get_logs ): - container_name = "base" k = KubernetesPodOperator( task_id="task", + get_logs=get_logs ) pod = self.run_pod(k) client, hook, pod_manager = k.client, k.hook, k.pod_manager # no exception doesn't update properties - k.await_container_completion(pod, container_name=container_name) + k.await_container_completion(pod) assert client == k.client assert hook == k.hook assert pod_manager == k.pod_manager # exception refreshes properties mock_await_container_completion.side_effect = [ApiException(status=401), mock.DEFAULT] - k.await_container_completion(pod, container_name=container_name) - mock_await_container_completion.assert_has_calls( - [mock.call(pod=pod, container_name=container_name)] * 3 - ) + fetch_requested_container_logs.side_effect = [ApiException(status=401), mock.DEFAULT] + k.await_container_completion(pod) + + if get_logs: + fetch_requested_container_logs.assert_has_calls( + [mock.call(pod=pod, containers=k.container_logs, follow_logs=True)] * 3 + ) + else: + mock_await_container_completion.assert_has_calls( + [mock.call(pod=pod, container_name=k.base_container_name)] * 3 + ) assert client != k.client assert hook != k.hook assert pod_manager != k.pod_manager @@ -1662,20 +1671,20 @@ def test_await_container_completion_refreshes_properties_on_exception( def test_await_container_completion_retries_on_specific_exception( self, mock_await_container_completion, side_effect, exception_type, expect_exc ): - container_name = "base" k = KubernetesPodOperator( task_id="task", + get_logs=False, ) pod = self.run_pod(k) mock_await_container_completion.side_effect = side_effect if expect_exc: - k.await_container_completion(pod, container_name=container_name) + k.await_container_completion(pod) else: with pytest.raises(exception_type): - k.await_container_completion(pod, container_name=container_name) + k.await_container_completion(pod) expected_call_count = min(len(side_effect), 3) # retry max 3 times mock_await_container_completion.assert_has_calls( - [mock.call(pod=pod, container_name=container_name)] * expected_call_count + [mock.call(pod=pod, container_name=k.base_container_name)] * expected_call_count ) From eb21149df6ea5b90ca8cf6c1aeadee2d16059882 Mon Sep 17 00:00:00 2001 From: pmoriello Date: Thu, 23 May 2024 19:25:00 +0200 Subject: [PATCH 2/3] Linting --- tests/providers/cncf/kubernetes/operators/test_pod.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index 7ba0dd2c33409..cd3b2b9399a5a 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -1627,10 +1627,7 @@ def test_execute_async_callbacks(self): def test_await_container_completion_refreshes_properties_on_exception( self, mock_await_container_completion, fetch_requested_container_logs, get_logs ): - k = KubernetesPodOperator( - task_id="task", - get_logs=get_logs - ) + k = KubernetesPodOperator(task_id="task", get_logs=get_logs) pod = self.run_pod(k) client, hook, pod_manager = k.client, k.hook, k.pod_manager From a8255401ccf42116ba784b54d8e63f6790ffa81d Mon Sep 17 00:00:00 2001 From: pmoriello Date: Fri, 24 May 2024 08:18:12 +0200 Subject: [PATCH 3/3] Rename function --- airflow/providers/cncf/kubernetes/operators/pod.py | 4 ++-- tests/providers/cncf/kubernetes/operators/test_pod.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index d6b0c06882dca..67f63f66786ae 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -612,7 +612,7 @@ def execute_sync(self, context: Context): mode=ExecutionMode.SYNC, ) - self.await_container_completion(pod=self.pod) + self.await_pod_completion(pod=self.pod) if self.callbacks: self.callbacks.on_pod_completion( pod=self.find_pod(self.pod.metadata.namespace, context=context), @@ -645,7 +645,7 @@ def execute_sync(self, context: Context): retry=tenacity.retry_if_exception(lambda exc: check_exception_is_kubernetes_api_unauthorized(exc)), reraise=True, ) - def await_container_completion(self, pod: k8s.V1Pod): + def await_pod_completion(self, pod: k8s.V1Pod): try: if self.get_logs: self.pod_manager.fetch_requested_container_logs( diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index cd3b2b9399a5a..c1e0c29ff433f 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -1632,7 +1632,7 @@ def test_await_container_completion_refreshes_properties_on_exception( client, hook, pod_manager = k.client, k.hook, k.pod_manager # no exception doesn't update properties - k.await_container_completion(pod) + k.await_pod_completion(pod) assert client == k.client assert hook == k.hook assert pod_manager == k.pod_manager @@ -1640,7 +1640,7 @@ def test_await_container_completion_refreshes_properties_on_exception( # exception refreshes properties mock_await_container_completion.side_effect = [ApiException(status=401), mock.DEFAULT] fetch_requested_container_logs.side_effect = [ApiException(status=401), mock.DEFAULT] - k.await_container_completion(pod) + k.await_pod_completion(pod) if get_logs: fetch_requested_container_logs.assert_has_calls( @@ -1675,10 +1675,10 @@ def test_await_container_completion_retries_on_specific_exception( pod = self.run_pod(k) mock_await_container_completion.side_effect = side_effect if expect_exc: - k.await_container_completion(pod) + k.await_pod_completion(pod) else: with pytest.raises(exception_type): - k.await_container_completion(pod) + k.await_pod_completion(pod) expected_call_count = min(len(side_effect), 3) # retry max 3 times mock_await_container_completion.assert_has_calls( [mock.call(pod=pod, container_name=k.base_container_name)] * expected_call_count