From 0709c7afa18661a63c8d254403a70ab3a0036b92 Mon Sep 17 00:00:00 2001 From: zachliu Date: Fri, 4 Mar 2022 14:15:31 -0500 Subject: [PATCH 1/3] retry on very specific eni provision failures --- airflow/providers/amazon/aws/exceptions.py | 11 +++++ airflow/providers/amazon/aws/operators/ecs.py | 43 +++++++++++++------ .../amazon/aws/operators/test_ecs.py | 14 ++++-- 3 files changed, 51 insertions(+), 17 deletions(-) diff --git a/airflow/providers/amazon/aws/exceptions.py b/airflow/providers/amazon/aws/exceptions.py index 6ae5dab5e88e6..27f46dfae7563 100644 --- a/airflow/providers/amazon/aws/exceptions.py +++ b/airflow/providers/amazon/aws/exceptions.py @@ -21,6 +21,17 @@ import warnings +class EcsTaskFailToStart(Exception): + """Raise when ECS tasks fail to start AFTER processing the request.""" + + def __init__(self, message: str): + self.message = message + super().__init__(message) + + def __reduce__(self): + return EcsTaskFailToStart, (self.message) + + class EcsOperatorError(Exception): """Raise when ECS cannot handle the request.""" diff --git a/airflow/providers/amazon/aws/operators/ecs.py b/airflow/providers/amazon/aws/operators/ecs.py index f982084b64b39..6e6e3ca67ad4a 100644 --- a/airflow/providers/amazon/aws/operators/ecs.py +++ b/airflow/providers/amazon/aws/operators/ecs.py @@ -30,7 +30,7 @@ from airflow.exceptions import AirflowException from airflow.models import BaseOperator, XCom -from airflow.providers.amazon.aws.exceptions import EcsOperatorError +from airflow.providers.amazon.aws.exceptions import EcsOperatorError, EcsTaskFailToStart from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook from airflow.typing_compat import Protocol, runtime_checkable @@ -48,6 +48,16 @@ def should_retry(exception: Exception): return False +def should_retry_eni(exception: Exception): + """Check if exception is related to ENI (Elastic Network Interfaces).""" + if isinstance(exception, EcsTaskFailToStart): + return any( + eni_reason in exception.message + for eni_reason in ['network interface provisioning'] + ) + return False + + @runtime_checkable class EcsProtocol(Protocol): """ @@ -287,6 +297,23 @@ def execute(self, context, session=None): if self.reattach: self._try_reattach_task(context) + self._start_wait_check_task(context) + + self.log.info('ECS Task has been successfully executed') + + if self.reattach: + # Clear the XCom value storing the ECS task ARN if the task has completed + # as we can't reattach it anymore + self._xcom_del(session, self.REATTACH_XCOM_TASK_ID_TEMPLATE.format(task_id=self.task_id)) + + if self.do_xcom_push and self.task_log_fetcher: + return self.task_log_fetcher.get_last_log_message() + + return None + + @AwsBaseHook.retry(should_retry_eni) + def _start_wait_check_task(self, context): + if not self.arn: self._start_task(context) @@ -306,18 +333,6 @@ def execute(self, context, session=None): self._check_success_task() - self.log.info('ECS Task has been successfully executed') - - if self.reattach: - # Clear the XCom value storing the ECS task ARN if the task has completed - # as we can't reattach it anymore - self._xcom_del(session, self.REATTACH_XCOM_TASK_ID_TEMPLATE.format(task_id=self.task_id)) - - if self.do_xcom_push and self.task_log_fetcher: - return self.task_log_fetcher.get_last_log_message() - - return None - def _xcom_del(self, session, task_id): session.query(XCom).filter(XCom.dag_id == self.dag_id, XCom.task_id == task_id).delete() @@ -438,7 +453,7 @@ def _check_success_task(self) -> None: for task in response['tasks']: if task.get('stopCode', '') == 'TaskFailedToStart': - raise AirflowException(f"The task failed to start due to: {task.get('stoppedReason', '')}") + raise EcsTaskFailToStart(f"The task failed to start due to: {task.get('stoppedReason', '')}") # This is a `stoppedReason` that indicates a task has not # successfully finished, but there is no other indication of failure diff --git a/tests/providers/amazon/aws/operators/test_ecs.py b/tests/providers/amazon/aws/operators/test_ecs.py index a29f2a8df5b89..b12db98e64e65 100644 --- a/tests/providers/amazon/aws/operators/test_ecs.py +++ b/tests/providers/amazon/aws/operators/test_ecs.py @@ -28,8 +28,8 @@ from parameterized import parameterized from airflow.exceptions import AirflowException -from airflow.providers.amazon.aws.exceptions import EcsOperatorError -from airflow.providers.amazon.aws.operators.ecs import EcsOperator, EcsTaskLogFetcher, should_retry +from airflow.providers.amazon.aws.exceptions import EcsOperatorError, EcsTaskFailToStart +from airflow.providers.amazon.aws.operators.ecs import EcsOperator, EcsTaskLogFetcher, should_retry, should_retry_eni # fmt: off RESPONSE_WITHOUT_FAILURES = { @@ -261,7 +261,7 @@ def test_check_success_tasks_raises_failed_to_start(self): ] } - with pytest.raises(Exception) as ctx: + with pytest.raises(EcsTaskFailToStart) as ctx: self.ecs._check_success_task() assert str(ctx.value) == "The task failed to start due to: Task failed to start" @@ -558,6 +558,14 @@ def test_return_false_on_invalid_reason(self): self.assertFalse(should_retry(EcsOperatorError([{'reason': 'CLUSTER_NOT_FOUND'}], 'Foo'))) +class TestShouldRetryEni(unittest.TestCase): + def test_return_true_on_valid_reason(self): + self.assertTrue(should_retry_eni(EcsTaskFailToStart("The task failed to start due to: Timeout waiting for network interface provisioning to complete."))) + + def test_return_false_on_invalid_reason(self): + self.assertFalse(should_retry_eni(EcsTaskFailToStart("The task failed to start due to: CannotPullContainerError: ref pull has been retried 5 time(s): failed to resolve reference"))) + + class TestEcsTaskLogFetcher(unittest.TestCase): @mock.patch('logging.Logger') def set_up_log_fetcher(self, logger_mock): From 30615272757bbd3d134a7dd30a6bb6e32d6eec19 Mon Sep 17 00:00:00 2001 From: zachliu Date: Fri, 4 Mar 2022 15:19:30 -0500 Subject: [PATCH 2/3] formats --- airflow/providers/amazon/aws/operators/ecs.py | 5 +--- .../amazon/aws/operators/test_ecs.py | 23 ++++++++++++++++--- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/ecs.py b/airflow/providers/amazon/aws/operators/ecs.py index 6e6e3ca67ad4a..8a4a039bc2b9b 100644 --- a/airflow/providers/amazon/aws/operators/ecs.py +++ b/airflow/providers/amazon/aws/operators/ecs.py @@ -51,10 +51,7 @@ def should_retry(exception: Exception): def should_retry_eni(exception: Exception): """Check if exception is related to ENI (Elastic Network Interfaces).""" if isinstance(exception, EcsTaskFailToStart): - return any( - eni_reason in exception.message - for eni_reason in ['network interface provisioning'] - ) + return any(eni_reason in exception.message for eni_reason in ['network interface provisioning']) return False diff --git a/tests/providers/amazon/aws/operators/test_ecs.py b/tests/providers/amazon/aws/operators/test_ecs.py index b12db98e64e65..5d0ea0929cdee 100644 --- a/tests/providers/amazon/aws/operators/test_ecs.py +++ b/tests/providers/amazon/aws/operators/test_ecs.py @@ -29,7 +29,12 @@ from airflow.exceptions import AirflowException from airflow.providers.amazon.aws.exceptions import EcsOperatorError, EcsTaskFailToStart -from airflow.providers.amazon.aws.operators.ecs import EcsOperator, EcsTaskLogFetcher, should_retry, should_retry_eni +from airflow.providers.amazon.aws.operators.ecs import ( + EcsOperator, + EcsTaskLogFetcher, + should_retry, + should_retry_eni, +) # fmt: off RESPONSE_WITHOUT_FAILURES = { @@ -560,10 +565,22 @@ def test_return_false_on_invalid_reason(self): class TestShouldRetryEni(unittest.TestCase): def test_return_true_on_valid_reason(self): - self.assertTrue(should_retry_eni(EcsTaskFailToStart("The task failed to start due to: Timeout waiting for network interface provisioning to complete."))) + self.assertTrue( + should_retry_eni( + EcsTaskFailToStart( + "The task failed to start due to: Timeout waiting for network interface provisioning to complete." + ) + ) + ) def test_return_false_on_invalid_reason(self): - self.assertFalse(should_retry_eni(EcsTaskFailToStart("The task failed to start due to: CannotPullContainerError: ref pull has been retried 5 time(s): failed to resolve reference"))) + self.assertFalse( + should_retry_eni( + EcsTaskFailToStart( + "The task failed to start due to: CannotPullContainerError: ref pull has been retried 5 time(s): failed to resolve reference" + ) + ) + ) class TestEcsTaskLogFetcher(unittest.TestCase): From a6b8be3ec29e488a3c0ce62661521769d83030c6 Mon Sep 17 00:00:00 2001 From: zachliu Date: Fri, 4 Mar 2022 15:47:51 -0500 Subject: [PATCH 3/3] line too long --- tests/providers/amazon/aws/operators/test_ecs.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_ecs.py b/tests/providers/amazon/aws/operators/test_ecs.py index 5d0ea0929cdee..3a2bcce4ead10 100644 --- a/tests/providers/amazon/aws/operators/test_ecs.py +++ b/tests/providers/amazon/aws/operators/test_ecs.py @@ -568,7 +568,8 @@ def test_return_true_on_valid_reason(self): self.assertTrue( should_retry_eni( EcsTaskFailToStart( - "The task failed to start due to: Timeout waiting for network interface provisioning to complete." + "The task failed to start due to: " + "Timeout waiting for network interface provisioning to complete." ) ) ) @@ -577,7 +578,9 @@ def test_return_false_on_invalid_reason(self): self.assertFalse( should_retry_eni( EcsTaskFailToStart( - "The task failed to start due to: CannotPullContainerError: ref pull has been retried 5 time(s): failed to resolve reference" + "The task failed to start due to: " + "CannotPullContainerError: " + "ref pull has been retried 5 time(s): failed to resolve reference" ) ) )