From 4867c13c1b360af39198f27737e6f4547ff07bcb Mon Sep 17 00:00:00 2001 From: Timothy Perry Date: Sun, 30 Mar 2025 15:10:12 -0700 Subject: [PATCH 1/4] Create test verifying issue 47971 exists on main --- .../tests/unit/models/test_taskinstance.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 2a9e86771c2f7..cef85ac26c617 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -627,6 +627,27 @@ def test_next_retry_datetime(self, dag_maker): date = ti.next_retry_datetime() assert date == ti.end_date + max_delay + def test_next_retry_datetime_returns_max_for_overflow(self, dag_maker): + delay = datetime.timedelta(seconds=30) + max_delay = datetime.timedelta(minutes=60) + + with dag_maker(dag_id="fail_dag"): + task = BashOperator( + task_id="task_with_exp_backoff_and_max_delay", + bash_command="exit 1", + retries=3, + retry_delay=delay, + retry_exponential_backoff=True, + max_retry_delay=max_delay, + ) + ti = dag_maker.create_dagrun().task_instances[0] + ti.task = task + ti.end_date = pendulum.instance(timezone.utcnow()) + + ti.try_number = 5000 + date = ti.next_retry_datetime() + assert date == ti.end_date + max_delay + @pytest.mark.parametrize("seconds", [0, 0.5, 1]) def test_next_retry_datetime_short_or_zero_intervals(self, dag_maker, seconds): delay = datetime.timedelta(seconds=seconds) From 56adf241476c9bdfa6173ce8b3f877dbc6a7fffe Mon Sep 17 00:00:00 2001 From: Timothy Perry Date: Sun, 30 Mar 2025 15:13:23 -0700 Subject: [PATCH 2/4] Use max delay when overflow occurs to fix 47971 --- .../src/airflow/models/taskinstance.py | 72 ++++++++++--------- 1 file changed, 39 insertions(+), 33 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 53ca7c6589e16..d300e51efef37 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1070,39 +1070,45 @@ def next_retry_datetime(self): delay = self.task.retry_delay if self.task.retry_exponential_backoff: - # If the min_backoff calculation is below 1, it will be converted to 0 via int. Thus, - # we must round up prior to converting to an int, otherwise a divide by zero error - # will occur in the modded_hash calculation. - # this probably gives unexpected results if a task instance has previously been cleared, - # because try_number can increase without bound - min_backoff = math.ceil(delay.total_seconds() * (2 ** (self.try_number - 1))) - - # In the case when delay.total_seconds() is 0, min_backoff will not be rounded up to 1. - # To address this, we impose a lower bound of 1 on min_backoff. This effectively makes - # the ceiling function unnecessary, but the ceiling function was retained to avoid - # introducing a breaking change. - if min_backoff < 1: - min_backoff = 1 - - # deterministic per task instance - ti_hash = int( - hashlib.sha1( - f"{self.dag_id}#{self.task_id}#{self.logical_date}#{self.try_number}".encode(), - usedforsecurity=False, - ).hexdigest(), - 16, - ) - # between 1 and 1.0 * delay * (2^retry_number) - modded_hash = min_backoff + ti_hash % min_backoff - # timedelta has a maximum representable value. The exponentiation - # here means this value can be exceeded after a certain number - # of tries (around 50 if the initial delay is 1s, even fewer if - # the delay is larger). Cap the value here before creating a - # timedelta object so the operation doesn't fail with "OverflowError". - delay_backoff_in_seconds = min(modded_hash, MAX_RETRY_DELAY) - delay = timedelta(seconds=delay_backoff_in_seconds) - if self.task.max_retry_delay: - delay = min(self.task.max_retry_delay, delay) + try: + # If the min_backoff calculation is below 1, it will be converted to 0 via int. Thus, + # we must round up prior to converting to an int, otherwise a divide by zero error + # will occur in the modded_hash calculation. + # this probably gives unexpected results if a task instance has previously been cleared, + # because try_number can increase without bound + min_backoff = math.ceil(delay.total_seconds() * (2 ** (self.try_number - 1))) + + # In the case when delay.total_seconds() is 0, min_backoff will not be rounded up to 1. + # To address this, we impose a lower bound of 1 on min_backoff. This effectively makes + # the ceiling function unnecessary, but the ceiling function was retained to avoid + # introducing a breaking change. + if min_backoff < 1: + min_backoff = 1 + + # deterministic per task instance + ti_hash = int( + hashlib.sha1( + f"{self.dag_id}#{self.task_id}#{self.logical_date}#{self.try_number}".encode(), + usedforsecurity=False, + ).hexdigest(), + 16, + ) + # between 1 and 1.0 * delay * (2^retry_number) + modded_hash = min_backoff + ti_hash % min_backoff + # timedelta has a maximum representable value. The exponentiation + # here means this value can be exceeded after a certain number + # of tries (around 50 if the initial delay is 1s, even fewer if + # the delay is larger). Cap the value here before creating a + # timedelta object so the operation doesn't fail with "OverflowError". + delay_backoff_in_seconds = min(modded_hash, MAX_RETRY_DELAY) + delay = timedelta(seconds=delay_backoff_in_seconds) + if self.task.max_retry_delay: + delay = min(self.task.max_retry_delay, delay) + except OverflowError: + if self.task.max_retry_delay: + delay = self.task.max_retry_delay + else: + delay = MAX_RETRY_DELAY return self.end_date + delay def ready_for_retry(self) -> bool: From 8912ae20fcaca686c74a56ebb3302e6c5593cbce Mon Sep 17 00:00:00 2001 From: Timothy Perry Date: Sun, 1 Jun 2025 10:50:01 -0700 Subject: [PATCH 3/4] Reduce scope of try block --- .../src/airflow/models/taskinstance.py | 59 +++++++++---------- .../tests/unit/models/test_taskinstance.py | 4 ++ 2 files changed, 32 insertions(+), 31 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index d300e51efef37..a83c844223412 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1077,38 +1077,35 @@ def next_retry_datetime(self): # this probably gives unexpected results if a task instance has previously been cleared, # because try_number can increase without bound min_backoff = math.ceil(delay.total_seconds() * (2 ** (self.try_number - 1))) - - # In the case when delay.total_seconds() is 0, min_backoff will not be rounded up to 1. - # To address this, we impose a lower bound of 1 on min_backoff. This effectively makes - # the ceiling function unnecessary, but the ceiling function was retained to avoid - # introducing a breaking change. - if min_backoff < 1: - min_backoff = 1 - - # deterministic per task instance - ti_hash = int( - hashlib.sha1( - f"{self.dag_id}#{self.task_id}#{self.logical_date}#{self.try_number}".encode(), - usedforsecurity=False, - ).hexdigest(), - 16, - ) - # between 1 and 1.0 * delay * (2^retry_number) - modded_hash = min_backoff + ti_hash % min_backoff - # timedelta has a maximum representable value. The exponentiation - # here means this value can be exceeded after a certain number - # of tries (around 50 if the initial delay is 1s, even fewer if - # the delay is larger). Cap the value here before creating a - # timedelta object so the operation doesn't fail with "OverflowError". - delay_backoff_in_seconds = min(modded_hash, MAX_RETRY_DELAY) - delay = timedelta(seconds=delay_backoff_in_seconds) - if self.task.max_retry_delay: - delay = min(self.task.max_retry_delay, delay) except OverflowError: - if self.task.max_retry_delay: - delay = self.task.max_retry_delay - else: - delay = MAX_RETRY_DELAY + min_backoff = MAX_RETRY_DELAY + + # In the case when delay.total_seconds() is 0, min_backoff will not be rounded up to 1. + # To address this, we impose a lower bound of 1 on min_backoff. This effectively makes + # the ceiling function unnecessary, but the ceiling function was retained to avoid + # introducing a breaking change. + if min_backoff < 1: + min_backoff = 1 + + # deterministic per task instance + ti_hash = int( + hashlib.sha1( + f"{self.dag_id}#{self.task_id}#{self.logical_date}#{self.try_number}".encode(), + usedforsecurity=False, + ).hexdigest(), + 16, + ) + # between 1 and 1.0 * delay * (2^retry_number) + modded_hash = min_backoff + ti_hash % min_backoff + # timedelta has a maximum representable value. The exponentiation + # here means this value can be exceeded after a certain number + # of tries (around 50 if the initial delay is 1s, even fewer if + # the delay is larger). Cap the value here before creating a + # timedelta object so the operation doesn't fail with "OverflowError". + delay_backoff_in_seconds = min(modded_hash, MAX_RETRY_DELAY) + delay = timedelta(seconds=delay_backoff_in_seconds) + if self.task.max_retry_delay: + delay = min(self.task.max_retry_delay, delay) return self.end_date + delay def ready_for_retry(self) -> bool: diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index cef85ac26c617..c4ab350c07ef3 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -648,6 +648,10 @@ def test_next_retry_datetime_returns_max_for_overflow(self, dag_maker): date = ti.next_retry_datetime() assert date == ti.end_date + max_delay + ti.try_number = 50000 + date = ti.next_retry_datetime() + assert date == ti.end_date + max_delay + @pytest.mark.parametrize("seconds", [0, 0.5, 1]) def test_next_retry_datetime_short_or_zero_intervals(self, dag_maker, seconds): delay = datetime.timedelta(seconds=seconds) From 57e611ecb7aa6c3f44ef77a8a8c69e63dfa87e83 Mon Sep 17 00:00:00 2001 From: Timothy Perry Date: Tue, 1 Jul 2025 15:14:17 -0700 Subject: [PATCH 4/4] log when backoff calculation overflows --- airflow-core/src/airflow/models/taskinstance.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index a83c844223412..6bdeba70f25f0 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1079,6 +1079,9 @@ def next_retry_datetime(self): min_backoff = math.ceil(delay.total_seconds() * (2 ** (self.try_number - 1))) except OverflowError: min_backoff = MAX_RETRY_DELAY + self.log.warning( + "OverflowError occurred while calculating min_backoff, using MAX_RETRY_DELAY for min_backoff." + ) # In the case when delay.total_seconds() is 0, min_backoff will not be rounded up to 1. # To address this, we impose a lower bound of 1 on min_backoff. This effectively makes