Restoring include_downstream_dags logic to clear downstream Dags #65314

Open
jroachgolf84 wants to merge 14 commits into apache:main from jroachgolf84:issue-61451
@jroachgolf84 jroachgolf84 commented Apr 15, 2026

Collaborator

Description

Restores the behavior of clearing downstream DAGs when a Task's state is cleared (if applicable).

closes: #61451

Testing

Unit-tests were added/updated to test these changes. The changes were also tested E2E.

Unit Tests

The unit-tests added/updated as part of this effort can be found in these files:

  • airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
  • airflow-core/tests/unit/serialization/definitions/test_dag.py

These tests can be executed using the commands below:

breeze testing core-tests airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py

breeze testing core-tests airflow-core/tests/unit/serialization/definitions/test_dag.py

E2E Testing

The DAG file below was used to test the changes in this PR. It defines a parent DAG and a child DAG. When the ExternalTaskMarker task is cleared in the parent DAG, the downstream Tasks in the child DAG are also cleared. This is shown in the video below.

from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
from airflow.sdk import DAG

from datetime import datetime


with DAG(
    dag_id="parent_dag_61451",
    start_date=datetime(2026, 1, 18),
    schedule="@daily",
    catchup=False,
    tags=["issue-61451"],
) as parent_dag:
    parent_echo = BashOperator(
        task_id="parent_echo",
        bash_command="echo parent_dag_61451",
    )

    trigger_child = ExternalTaskMarker(
        task_id="trigger_child",
        external_dag_id="child_dag_61451",
        external_task_id="receive_call_from_parent",
    )

    parent_echo >> trigger_child

with DAG(
    dag_id="child_dag_61451",
    start_date=datetime(2026, 1, 18),
    schedule="@daily",
    catchup=False,
    tags=["issue-61451"],
) as child_dag:
    receive_call_from_parent = ExternalTaskSensor(
        task_id="receive_call_from_parent",
        external_dag_id="parent_dag_61451",
        external_task_id="trigger_child",
        poke_interval=5,
    )

    child_echo = BashOperator(
        task_id="child_echo",
        bash_command="echo DONE",
    )

    receive_call_from_parent >> child_echo

Video

issue-61451-include-downstream-tasks.mov

@LinasData

Hey, any updates on this?

jroachgolf84 added the "ready for maintainer review" label Jun 13, 2026
@jroachgolf84

Collaborator Author

@jason810496 - can you take a look at this one for me?

@jroachgolf84

Collaborator Author

@bugraoz93 - do you mind taking a look at this one?

Comment thread airflow-core/src/airflow/serialization/definitions/dag.py
if task.logical_date != default_template:
    from airflow.models.renderedtifields import RenderedTaskInstanceFields

    rendered = RenderedTaskInstanceFields.get_templated_fields(ti, session=session)

Member

The custom-template path has a case that isn't handled: if the marker uses a non-default logical_date template and the RTIF row has already been pruned (num_dag_runs_to_retain_rendered_fields), rendered comes back empty and logical_date_str quietly stays as the marker run's own date from line 1214 -- so the child lookup targets the wrong DagRun. The test seeds the RTIF row, so this case isn't exercised. Either fail loudly or add a test with the row missing.
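
To make the "fail loudly" option concrete, here is a minimal sketch of the guard, written as a standalone function rather than against the actual code in dag.py. Everything in it is an assumption for illustration: the helper name resolve_logical_date_str, the MissingRenderedFieldsError class, the DEFAULT_TEMPLATE value, and the idea that the rendered fields arrive as a dict (or None) keyed by field name. It is not the PR's implementation.

```python
from __future__ import annotations

# Assumption: stands in for the marker's default logical_date template.
DEFAULT_TEMPLATE = "{{ logical_date.isoformat() }}"


class MissingRenderedFieldsError(RuntimeError):
    """Hypothetical error: a custom template was used but its rendered value is gone."""


def resolve_logical_date_str(
    template: str,
    rendered: dict[str, str] | None,
    marker_run_date_str: str,
) -> str:
    """Return the logical date string to use for the child DagRun lookup."""
    # Default template: the marker run's own date is the correct target.
    if template == DEFAULT_TEMPLATE:
        return marker_run_date_str

    # Custom template: only the rendered value is trustworthy. If the row was
    # pruned (rendered is None or lacks the field), refuse to fall back.
    value = (rendered or {}).get("logical_date")
    if not value:
        raise MissingRenderedFieldsError(
            "Rendered logical_date is unavailable for a marker with a custom "
            "template; cannot determine which child DagRun to clear."
        )
    return value
```

A test with the row missing would then assert that the error is raised (passing None for rendered) instead of asserting on which child run gets cleared.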

Comment thread airflow-core/src/airflow/serialization/definitions/dag.py
kaxil changed the title from "issue-61451: Restoring include_downstream_dags logic" to "Restoring include_downstream_dags logic to clear downstream Dags" Jun 26, 2026
Labels

area:API, area:DAG-processing, ready for maintainer review

Development

Successfully merging this pull request may close these issues.

I am upgrading from Airflow 2.11.0 to Airflow 3.1.6 and noticed a change in behavior regarding the ExternalTaskMarker

3 participants