Deferred task behaviour is different from normal task #40329

Description

@purnachandergit

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

V2.7.2

What happened?

I have defined a custom operator that inherits from BaseSensorOperator.

In the execute method of the custom operator:
First, I increase retries by 1.
Second, I look up the task instance using the context and increase its max_tries by 1.
Third, if the operator is deferrable, I defer the task; otherwise I sleep for 60 seconds.

For the deferred task, max_tries is not increased, whereas for the non-deferred task max_tries is updated.

What you think should happen instead?

max_tries should be increased for the deferred task as well. The DB update is happening, but somewhere in the deferred flow the value is getting overridden.

How to reproduce

Test DAG:

from __future__ import annotations

import datetime
from datetime import timedelta
from time import sleep

import pendulum
from airflow import DAG
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
from typing import Any

args = {
    'owner': 'purna',
    'start_date': "2024-06-13",
    'depends_on_past': False
}

dag = DAG(
    dag_id="sample_defer_test-purna",
    schedule="0 0 * * *",
    catchup=False,
    default_args=args
)

class CustomUserSensor(BaseSensorOperator):
    def __init__(
        self,
        deferrable: bool = False,
        **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.deferrable = deferrable

    def execute(self, context: Context) -> None:
        from airflow.models import TaskInstance
        from airflow.utils.session import create_session
        task_instance = context["task_instance"]
        dag_id = task_instance.dag_id
        task_id = task_instance.task_id
        run_id = task_instance.run_id
        map_index = task_instance.map_index
        # Step 1: increase retries on the operator
        self.retries += 1
        with create_session() as session:
            # Step 2: increase max_tries on the task instance row in the DB
            db_task_instance = session.query(TaskInstance).filter(
                TaskInstance.dag_id == dag_id,
                TaskInstance.task_id == task_id,
                TaskInstance.run_id == run_id,
                TaskInstance.map_index == map_index,
            ).first()

            if db_task_instance:
                db_task_instance.max_tries += 1
                session.merge(db_task_instance)

            # Step 3: defer or sleep (still inside the session block)
            if self.deferrable:
                self.defer(trigger=TimeDeltaTrigger(timedelta(minutes=2)), method_name="execute_complete")
            else:
                sleep(60)

    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
        return

run_task2 = CustomUserSensor(task_id="tun_task2", poke_interval=20, dag=dag, deferrable=True)

run_task3 = CustomUserSensor(task_id="run_task3", poke_interval=20, dag=dag)
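
One possible explanation, not confirmed here: in the reproduction code as indented in this report, self.defer(...) is called inside the with create_session() block. defer() signals deferral by raising an exception, so if create_session() commits only when the block exits normally, the max_tries change may never be committed on the deferred path. The sketch below illustrates that pattern without Airflow. FakeSession, FakeDeferral, fake_create_session and simulate are hypothetical stand-ins, and the commit-on-clean-exit behaviour is an assumption about the session helper, not a verified trace of Airflow 2.7.2.

```python
from contextlib import contextmanager


class FakeDeferral(BaseException):
    """Hypothetical stand-in for the exception raised when a task defers."""


class FakeSession:
    """Hypothetical stand-in for a DB session with pending and committed state."""

    def __init__(self, store):
        self.store = store    # plays the role of the database row
        self.pending = {}     # uncommitted changes

    def set(self, key, value):
        self.pending[key] = value

    def commit(self):
        self.store.update(self.pending)
        self.pending.clear()

    def close(self):
        # Closing discards anything that was never committed.
        self.pending.clear()


@contextmanager
def fake_create_session(store):
    # Assumed semantics: commit only if the with-block exits without raising.
    session = FakeSession(store)
    try:
        yield session
        session.commit()
    finally:
        session.close()


def simulate(mode):
    """mode: 'sleep' (no deferral), 'defer_inside', or 'defer_after'."""
    store = {"max_tries": 0}
    try:
        with fake_create_session(store) as session:
            session.set("max_tries", store["max_tries"] + 1)
            if mode == "defer_inside":
                raise FakeDeferral()  # leaves the block before commit runs
        if mode == "defer_after":
            raise FakeDeferral()      # the block has already committed
    except FakeDeferral:
        pass
    return store["max_tries"]


for mode in ("sleep", "defer_inside", "defer_after"):
    print(mode, simulate(mode))
```

If this assumption holds for the real session helper, calling self.defer(...) after the with block has closed would let the update commit first. Whether the deferred flow later overwrites max_tries anyway would still need to be checked against the database.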

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
