Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions airflow-core/docs/administration-and-deployment/dag-bundles.rst
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,124 @@ are configured so that impersonated users can access bundle files created by the
the need for shared group permissions.


Configuring Default Bundle Version Behavior
Comment thread
nathadfield marked this conversation as resolved.
--------------------------------------------

When a user clears a DAG run or task instance, the UI shows a checkbox asking whether to rerun
with the latest bundle version or with the version the original run used. The
``run_on_latest_version`` setting controls the default state of that checkbox, so teams don't
have to make that decision manually every time.

.. note::

This only applies to versioned bundle types (like ``GitDagBundle``). Local bundles
(``LocalDagBundle``) do not support versioning and will always use the latest code.

How It Works
~~~~~~~~~~~~

Each DAG has a **parsed version** (``DagModel.bundle_version``), updated every time the scheduler

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Each DAG has a **parsed version** (``DagModel.bundle_version``), updated every time the scheduler
Each DAG has a **parsed version** (``DagModel.bundle_version``), updated every time the dag processor

re-parses the DAG file. Separately, each bundle tracks a **latest version**
(``DagBundleModel.version``), updated when the bundle detects a new version (e.g. a new Git commit).

When ``run_on_latest_version`` is **False** (the default), reruns use the same bundle version as the
original run and new scheduled runs use the parsed version. This provides reproducibility when
debugging failures. When **True**, runs use the latest bundle version from the bundle, ensuring the
most recent code is always used.

The setting is resolved using the following precedence (highest to lowest):

1. **DAG-level**: The DAG's ``run_on_latest_version`` parameter (if ``True`` or ``False``)
2. **Global config**: The ``[core] run_on_latest_version`` option (if set)
3. **Default**: ``False``

Global Configuration
~~~~~~~~~~~~~~~~~~~~

Set organization-wide defaults using the ``[core] run_on_latest_version`` option. This applies to

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should call this rerun_with_latest_version if adding this as a config so that it doesn't confuse as starting a new run on an old version.

all DAGs that don't explicitly override it at the DAG level.

.. code-block:: ini

[core]
run_on_latest_version = False # Default - rerun with the original bundle version
# run_on_latest_version = True # Rerun with the latest bundle version

DAG-Level Configuration
~~~~~~~~~~~~~~~~~~~~~~~

Override the global default for specific DAGs:

.. code-block:: python

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# Always rerun with the latest version
with DAG(
dag_id="always_latest_dag",
run_on_latest_version=True,
start_date=datetime(2024, 1, 1),
) as dag1:
EmptyOperator(task_id="task")

# Always rerun with the same version as the original run
with DAG(
dag_id="pinned_version_dag",
run_on_latest_version=False,
start_date=datetime(2024, 1, 1),
) as dag2:
EmptyOperator(task_id="task")

# Inherit from global configuration (default if omitted)
with DAG(
dag_id="default_behavior_dag",
start_date=datetime(2024, 1, 1),
) as dag3:
EmptyOperator(task_id="task")

Use Cases
~~~~~~~~~

**Debugging failed runs**:
With ``False`` (the default), clearing a failed run reruns it with the same code, making it
easier to reproduce and isolate issues.

**Always run latest code**:
Set ``[core] run_on_latest_version = True`` if your team prefers reruns to always pick up the
latest code, for example when bug fixes have been deployed since the original run.

**Mixed policy**:
Set the global default to ``True`` but override specific critical DAGs with
``run_on_latest_version=False`` for version stability where it matters most.

Relationship with ``disable_bundle_versioning``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Airflow provides two separate settings that affect bundle versioning behavior.
They serve different purposes:

``disable_bundle_versioning``
Turns off version tracking entirely. When set to ``True``, no ``bundle_version`` is
recorded on DAG runs. Use this when versioning adds no value, for example local
development with ``LocalDagBundle`` or pipelines where reproducibility is not a concern.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LocalDagBundle doesn't use disable_bundle_versioning. This is used by versioned bundles like GitDagBundle

Available as a DAG parameter and as a global config option
(``[dag_processor] disable_bundle_versioning``).

``run_on_latest_version``
Controls the default *rerun behavior* while keeping version tracking active. When a user
clears or reruns a task, this determines whether the new run uses the latest bundle
version or the original version. Versioning remains enabled so the version history is
still recorded. This only changes the default choice presented to users.

In short: ``disable_bundle_versioning`` answers "should we track versions at all?", while
``run_on_latest_version`` answers "when rerunning, which version should be the default?".
The two settings are independent. ``run_on_latest_version`` has no effect when versioning
is disabled.


Writing custom Dag bundles
--------------------------

Expand Down
30 changes: 30 additions & 0 deletions airflow-core/newsfragments/61448.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
Add ``run_on_latest_version`` configuration for DAG bundle versioning

When clearing or rerunning tasks, this setting controls whether the new DAG run
uses the latest bundle version or the original version from the initial run.
The setting follows a three-level precedence:

1. **DAG-level**: ``run_on_latest_version`` parameter on the DAG definition.
2. **Global config**: ``[dag_bundles] run_on_latest_version`` in ``airflow.cfg``.
3. **Default**: ``False`` (use the original bundle version).

In Airflow 2.x, reruns always used the latest code. Airflow 3.x introduced bundle
versioning, defaulting to the original version. This setting gives users control
over which behaviour is the default.

See :doc:`/administration-and-deployment/dag-bundles` for full details.

* Types of change

* [ ] Dag changes
* [x] Config changes
* [x] API changes
* [ ] CLI changes
* [x] Behaviour changes
* [ ] Plugin changes
* [ ] Dependency changes
* [ ] Code interface changes

* Migration rules needed

* None - this is a new optional feature with backwards-compatible defaults
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ class DAGDetailsResponse(DAGResponse):
owner_links: dict[str, str] | None = None
is_favorite: bool = False
active_runs_count: int = 0
run_on_latest_version: bool | None = None

@field_validator("timezone", mode="before")
@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class TaskInstanceHistoryResponse(BaseModel):
max_tries: int
task_display_name: str
dag_display_name: str = Field(validation_alias=AliasPath("dag_run", "dag_model", "dag_display_name"))
dag_run_bundle_version: str | None = Field(validation_alias=AliasPath("dag_run", "bundle_version"))
hostname: str | None
unixname: str | None
pool: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class TaskInstanceResponse(BaseModel):
max_tries: int
task_display_name: str
dag_display_name: str = Field(validation_alias=AliasPath("dag_run", "dag_model", "dag_display_name"))
dag_run_bundle_version: str | None = Field(validation_alias=AliasPath("dag_run", "bundle_version"))
hostname: str | None
unixname: str | None
pool: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ class ConfigResponse(BaseModel):
external_log_name: str | None = None
theme: Theme | None
multi_team: bool
run_on_latest_version: bool
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,9 @@ components:
multi_team:
type: boolean
title: Multi Team
run_on_latest_version:
type: boolean
title: Run On Latest Version
type: object
required:
- fallback_page_limit
Expand All @@ -1629,6 +1632,7 @@ components:
- show_external_log_redirect
- theme
- multi_team
- run_on_latest_version
title: ConfigResponse
description: configuration serializer.
ConnectionHookFieldBehavior:
Expand Down Expand Up @@ -3009,6 +3013,11 @@ components:
dag_display_name:
type: string
title: Dag Display Name
dag_run_bundle_version:
anyOf:
- type: string
- type: 'null'
title: Dag Run Bundle Version
hostname:
anyOf:
- type: string
Expand Down Expand Up @@ -3113,6 +3122,7 @@ components:
- max_tries
- task_display_name
- dag_display_name
- dag_run_bundle_version
- hostname
- unixname
- pool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10356,6 +10356,11 @@ components:
type: integer
title: Active Runs Count
default: 0
run_on_latest_version:
anyOf:
- type: boolean
- type: 'null'
title: Run On Latest Version
file_token:
type: string
title: File Token
Expand Down Expand Up @@ -12564,6 +12569,11 @@ components:
dag_display_name:
type: string
title: Dag Display Name
dag_run_bundle_version:
anyOf:
- type: string
- type: 'null'
title: Dag Run Bundle Version
hostname:
anyOf:
- type: string
Expand Down Expand Up @@ -12643,6 +12653,7 @@ components:
- max_tries
- task_display_name
- dag_display_name
- dag_run_bundle_version
- hostname
- unixname
- pool
Expand Down Expand Up @@ -12720,6 +12731,11 @@ components:
dag_display_name:
type: string
title: Dag Display Name
dag_run_bundle_version:
anyOf:
- type: string
- type: 'null'
title: Dag Run Bundle Version
hostname:
anyOf:
- type: string
Expand Down Expand Up @@ -12824,6 +12840,7 @@ components:
- max_tries
- task_display_name
- dag_display_name
- dag_run_bundle_version
- hostname
- unixname
- pool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
)
from airflow.api_fastapi.core_api.services.public.dag_run import DagRunWaiter
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import BundleVersionUnavailable
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagModel, DagRun
from airflow.models.asset import AssetEvent
Expand Down Expand Up @@ -498,6 +499,11 @@ def trigger_dag_run(
current_user_id = user.get_id()
dag_run.note = (dag_run_note, current_user_id)
return dag_run
except BundleVersionUnavailable as e:
raise HTTPException(
status.HTTP_503_SERVICE_UNAVAILABLE,
f"Bundle version not yet available. Please retry: {e}",
Comment thread
nathadfield marked this conversation as resolved.
)
except ValueError as e:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def get_configs() -> ConfigResponse:
"external_log_name": getattr(task_log_reader.log_handler, "log_name", None),
"theme": loads(conf.get("api", "theme", fallback="{}")) or None,
"multi_team": conf.getboolean("core", "multi_team"),
"run_on_latest_version": conf.getboolean("core", "run_on_latest_version", fallback=False),
}

config.update({key: value for key, value in additional_config.items()})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from airflow.api_fastapi.compat import HTTP_422_UNPROCESSABLE_CONTENT
from airflow.api_fastapi.execution_api.datamodels.dagrun import DagRunStateResponse, TriggerDAGRunPayload
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun
from airflow.exceptions import DagRunAlreadyExists
from airflow.exceptions import BundleVersionUnavailable, DagRunAlreadyExists
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun as DagRunModel
from airflow.utils.state import DagRunState
Expand Down Expand Up @@ -143,6 +143,19 @@ def trigger_dag_run(
"message": f"A run already exists for Dag '{dag_id}' with run_id '{run_id}'",
},
)
except BundleVersionUnavailable as e:
Comment thread
nathadfield marked this conversation as resolved.
log.warning(
"Bundle version unavailable when triggering DAG run: %s",
e,
extra={"dag_id": dag_id, "run_id": run_id},
)
raise HTTPException(
status.HTTP_503_SERVICE_UNAVAILABLE,
detail={
"reason": "bundle_version_unavailable",
"message": str(e),
},
)


@router.post(
Expand Down
10 changes: 9 additions & 1 deletion airflow-core/src/airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
from airflow.dag_processing.bundles.manager import DagBundlesManager
from airflow.dag_processing.dagbag import BundleDagBag, DagBag, sync_bag_to_db
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.exceptions import AirflowConfigException, AirflowException, BundleVersionUnavailable
from airflow.jobs.job import Job
from airflow.models import DagModel, DagRun, TaskInstance
from airflow.models.errors import ParseImportError
Expand Down Expand Up @@ -92,6 +92,14 @@ def dag_trigger(args) -> None:
data=[message] if message is not None else [],
output=args.output,
)
except BundleVersionUnavailable as err:
log.error(
"Bundle version not yet available: %s. "
"The bundle has been refreshed but DAGs have not been parsed yet. "
"Please retry in a few moments.",
err,
)
sys.exit(1)
except OSError as err:
raise AirflowException(err)

Expand Down
14 changes: 14 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,20 @@ core:
type: boolean
example: ~
default: "False"
run_on_latest_version:
description: |
When True, DAG runs will use the latest available bundle version by default
when triggered, rerun, or cleared. This can be overridden at the DAG level
(via the DAG's ``run_on_latest_version`` parameter).

.. note::

This only applies to bundles that support versioning (e.g., GitDagBundle).
LocalDagBundle and other non-versioned bundles are unaffected.
version_added: 3.2.0
type: boolean
example: ~
default: "False"
database:
description: ~
options:
Expand Down
2 changes: 2 additions & 0 deletions airflow-core/src/airflow/config_templates/unit_tests.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ unit_test_mode = True
killed_task_cleanup_time = 5
# We only allow our own classes to be deserialized in tests
allowed_deserialization_classes = airflow.* tests.*
# Default behavior for bundle versioning
run_on_latest_version = False

[database]

Expand Down
Loading
Loading