From d3782be1bb8ebabe282c9c1368f18107f61208ff Mon Sep 17 00:00:00 2001 From: Nathan Hadfield Date: Wed, 4 Feb 2026 13:17:57 +0000 Subject: [PATCH] Add three-level run_on_latest_version configuration hierarchy Implement global and DAG-level configuration for run_on_latest_version to control bundle version selection when creating DAG runs. This provides flexibility to configure the behavior at multiple levels with a clear precedence hierarchy: DAG-level > Global config > System default (False). Configuration levels: - Global: [core] run_on_latest_version config option - DAG-level: @dag(run_on_latest_version=True/False) parameter - UI: Clear/Rerun dialogs with checkbox when bundle versions differ Implementation: - Add run_on_latest_version parameter to DAG definition and serialization - Add global config option in config.yml with proper defaults - Implement three-level resolution logic in SerializedDAG._resolve_bundle_version() - Add BundleVersionUnavailable exception handling in scheduler for race conditions - Create custom React hook (useRunOnLatestVersion) for UI integration - Update Clear/Rerun dialogs to show version selection when applicable Exception handling: - Scheduler handles BundleVersionUnavailable gracefully for scheduled runs - Scheduler handles BundleVersionUnavailable gracefully for asset-triggered runs - Execution API returns 503 with retry guidance during bundle refresh windows - Runs are created on next scheduling cycle once bundle is parsed Testing: - Add comprehensive tests for all configuration levels and precedence - Add scheduler tests for exception handling during bundle refresh - Fix test isolation issues and database constraint violations - 648 tests passed across scheduler, DAG model, execution API, and serialization Fixes #60887 Co-Authored-By: Claude Sonnet 4.5 --- .../dag-bundles.rst | 118 ++++++++++ .../newsfragments/61448.significant.rst | 30 +++ .../api_fastapi/core_api/datamodels/dags.py | 1 + .../datamodels/task_instance_history.py | 1 + .../core_api/datamodels/task_instances.py | 1 + .../core_api/datamodels/ui/config.py | 1 + .../core_api/openapi/_private_ui.yaml | 10 + .../openapi/v2-rest-api-generated.yaml | 17 ++ .../core_api/routes/public/dag_run.py | 6 + .../api_fastapi/core_api/routes/ui/config.py | 1 + .../execution_api/routes/dag_runs.py | 15 +- .../src/airflow/cli/commands/dag_command.py | 10 +- .../src/airflow/config_templates/config.yml | 14 ++ .../airflow/config_templates/unit_tests.cfg | 2 + airflow-core/src/airflow/exceptions.py | 11 + .../src/airflow/jobs/scheduler_job_runner.py | 139 +++++++---- airflow-core/src/airflow/models/backfill.py | 89 ++++--- .../src/airflow/models/taskinstance.py | 2 +- .../airflow/serialization/definitions/dag.py | 139 ++++++++++- .../src/airflow/serialization/schema.json | 3 +- .../serialization/serialized_objects.py | 1 + .../ui/openapi-gen/requests/schemas.gen.ts | 43 +++- .../ui/openapi-gen/requests/types.gen.ts | 4 + .../components/Clear/Run/ClearRunDialog.tsx | 21 +- .../ClearGroupTaskInstanceDialog.tsx | 29 ++- .../TaskInstance/ClearTaskInstanceDialog.tsx | 27 ++- .../components/Clear/useRunOnLatestVersion.ts | 94 ++++++++ .../core_api/routes/public/test_dag_run.py | 11 + .../core_api/routes/public/test_dags.py | 54 +++++ .../core_api/routes/public/test_hitl.py | 1 + .../routes/public/test_task_instances.py | 29 +++ .../core_api/routes/ui/test_config.py | 42 ++++ .../versions/head/test_dag_runs.py | 45 ++++ .../tests/unit/jobs/test_scheduler_job.py | 136 ++++++++++- .../tests/unit/models/test_backfill.py | 27 +++ airflow-core/tests/unit/models/test_dag.py | 219 +++++++++++++++++- .../serialization/test_dag_serialization.py | 17 ++ .../airflowctl/api/datamodels/generated.py | 3 + task-sdk/src/airflow/sdk/definitions/dag.py | 6 + 39 files changed, 1304 insertions(+), 115 deletions(-) create mode 100644 airflow-core/newsfragments/61448.significant.rst create mode 100644 airflow-core/src/airflow/ui/src/components/Clear/useRunOnLatestVersion.ts diff --git a/airflow-core/docs/administration-and-deployment/dag-bundles.rst b/airflow-core/docs/administration-and-deployment/dag-bundles.rst index 9dd9b9681206f..4fa257560e0bf 100644 --- a/airflow-core/docs/administration-and-deployment/dag-bundles.rst +++ b/airflow-core/docs/administration-and-deployment/dag-bundles.rst @@ -139,6 +139,124 @@ are configured so that impersonated users can access bundle files created by the the need for shared group permissions. +Configuring Default Bundle Version Behavior +-------------------------------------------- + +When a user clears a DAG run or task instance, the UI shows a checkbox asking whether to rerun +with the latest bundle version or with the version the original run used. The +``run_on_latest_version`` setting controls the default state of that checkbox, so teams don't +have to make that decision manually every time. + +.. note:: + + This only applies to versioned bundle types (like ``GitDagBundle``). Local bundles + (``LocalDagBundle``) do not support versioning and will always use the latest code. + +How It Works +~~~~~~~~~~~~ + +Each DAG has a **parsed version** (``DagModel.bundle_version``), updated every time the scheduler +re-parses the DAG file. Separately, each bundle tracks a **latest version** +(``DagBundleModel.version``), updated when the bundle detects a new version (e.g. a new Git commit). + +When ``run_on_latest_version`` is **False** (the default), reruns use the same bundle version as the +original run and new scheduled runs use the parsed version. This provides reproducibility when +debugging failures. When **True**, runs use the latest bundle version from the bundle, ensuring the +most recent code is always used. + +The setting is resolved using the following precedence (highest to lowest): + +1. **DAG-level**: The DAG's ``run_on_latest_version`` parameter (if ``True`` or ``False``) +2. **Global config**: The ``[core] run_on_latest_version`` option (if set) +3. **Default**: ``False`` + +Global Configuration +~~~~~~~~~~~~~~~~~~~~ + +Set organization-wide defaults using the ``[core] run_on_latest_version`` option. This applies to +all DAGs that don't explicitly override it at the DAG level. + +.. code-block:: ini + + [core] + run_on_latest_version = False # Default - rerun with the original bundle version + # run_on_latest_version = True # Rerun with the latest bundle version + +DAG-Level Configuration +~~~~~~~~~~~~~~~~~~~~~~~ + +Override the global default for specific DAGs: + +.. code-block:: python + + from datetime import datetime + + from airflow import DAG + from airflow.operators.empty import EmptyOperator + + # Always rerun with the latest version + with DAG( + dag_id="always_latest_dag", + run_on_latest_version=True, + start_date=datetime(2024, 1, 1), + ) as dag1: + EmptyOperator(task_id="task") + + # Always rerun with the same version as the original run + with DAG( + dag_id="pinned_version_dag", + run_on_latest_version=False, + start_date=datetime(2024, 1, 1), + ) as dag2: + EmptyOperator(task_id="task") + + # Inherit from global configuration (default if omitted) + with DAG( + dag_id="default_behavior_dag", + start_date=datetime(2024, 1, 1), + ) as dag3: + EmptyOperator(task_id="task") + +Use Cases +~~~~~~~~~ + +**Debugging failed runs**: + With ``False`` (the default), clearing a failed run reruns it with the same code, making it + easier to reproduce and isolate issues. + +**Always run latest code**: + Set ``[core] run_on_latest_version = True`` if your team prefers reruns to always pick up the + latest code, for example when bug fixes have been deployed since the original run. + +**Mixed policy**: + Set the global default to ``True`` but override specific critical DAGs with + ``run_on_latest_version=False`` for version stability where it matters most. + +Relationship with ``disable_bundle_versioning`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Airflow provides two separate settings that affect bundle versioning behavior. +They serve different purposes: + +``disable_bundle_versioning`` + Turns off version tracking entirely. When set to ``True``, no ``bundle_version`` is + recorded on DAG runs. Use this when versioning adds no value, for example local + development with ``LocalDagBundle`` or pipelines where reproducibility is not a concern. + Available as a DAG parameter and as a global config option + (``[dag_processor] disable_bundle_versioning``). + +``run_on_latest_version`` + Controls the default *rerun behavior* while keeping version tracking active. When a user + clears or reruns a task, this determines whether the new run uses the latest bundle + version or the original version. Versioning remains enabled so the version history is + still recorded. This only changes the default choice presented to users. + +In short: ``disable_bundle_versioning`` answers "should we track versions at all?", while +``run_on_latest_version`` answers "when rerunning, which version should be the default?". +The two settings are independent. ``run_on_latest_version`` has no effect when versioning +is disabled. + + Writing custom Dag bundles -------------------------- diff --git a/airflow-core/newsfragments/61448.significant.rst b/airflow-core/newsfragments/61448.significant.rst new file mode 100644 index 0000000000000..f1bedd72bddff --- /dev/null +++ b/airflow-core/newsfragments/61448.significant.rst @@ -0,0 +1,30 @@ +Add ``run_on_latest_version`` configuration for DAG bundle versioning + +When clearing or rerunning tasks, this setting controls whether the new DAG run +uses the latest bundle version or the original version from the initial run. +The setting follows a three-level precedence: + +1. **DAG-level**: ``run_on_latest_version`` parameter on the DAG definition. +2. **Global config**: ``[dag_bundles] run_on_latest_version`` in ``airflow.cfg``. +3. **Default**: ``False`` (use the original bundle version). + +In Airflow 2.x, reruns always used the latest code. Airflow 3.x introduced bundle +versioning, defaulting to the original version. This setting gives users control +over which behaviour is the default. + +See :doc:`/administration-and-deployment/dag-bundles` for full details. + +* Types of change + + * [ ] Dag changes + * [x] Config changes + * [x] API changes + * [ ] CLI changes + * [x] Behaviour changes + * [ ] Plugin changes + * [ ] Dependency changes + * [ ] Code interface changes + +* Migration rules needed + + * None - this is a new optional feature with backwards-compatible defaults diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py index 333349ad5e1a1..8c14e5ed53015 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py @@ -187,6 +187,7 @@ class DAGDetailsResponse(DAGResponse): owner_links: dict[str, str] | None = None is_favorite: bool = False active_runs_count: int = 0 + run_on_latest_version: bool | None = None @field_validator("timezone", mode="before") @classmethod diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instance_history.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instance_history.py index c913d91c1e607..e78ca70753436 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instance_history.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instance_history.py @@ -48,6 +48,7 @@ class TaskInstanceHistoryResponse(BaseModel): max_tries: int task_display_name: str dag_display_name: str = Field(validation_alias=AliasPath("dag_run", "dag_model", "dag_display_name")) + dag_run_bundle_version: str | None = Field(validation_alias=AliasPath("dag_run", "bundle_version")) hostname: str | None unixname: str | None pool: str diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py index 397389994a09f..45ef4ff47bb26 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py @@ -58,6 +58,7 @@ class TaskInstanceResponse(BaseModel): max_tries: int task_display_name: str dag_display_name: str = Field(validation_alias=AliasPath("dag_run", "dag_model", "dag_display_name")) + dag_run_bundle_version: str | None = Field(validation_alias=AliasPath("dag_run", "bundle_version")) hostname: str | None unixname: str | None pool: str diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/config.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/config.py index 96cd4aaad266a..bff326d6128da 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/config.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/config.py @@ -36,3 +36,4 @@ class ConfigResponse(BaseModel): external_log_name: str | None = None theme: Theme | None multi_team: bool + run_on_latest_version: bool diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index 50e3d75bbdd2e..8a85f24c85655 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -1615,6 +1615,9 @@ components: multi_team: type: boolean title: Multi Team + run_on_latest_version: + type: boolean + title: Run On Latest Version type: object required: - fallback_page_limit @@ -1629,6 +1632,7 @@ components: - show_external_log_redirect - theme - multi_team + - run_on_latest_version title: ConfigResponse description: configuration serializer. ConnectionHookFieldBehavior: @@ -3009,6 +3013,11 @@ components: dag_display_name: type: string title: Dag Display Name + dag_run_bundle_version: + anyOf: + - type: string + - type: 'null' + title: Dag Run Bundle Version hostname: anyOf: - type: string @@ -3113,6 +3122,7 @@ components: - max_tries - task_display_name - dag_display_name + - dag_run_bundle_version - hostname - unixname - pool diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 17c4f87a6e44f..346fa064170b5 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -10356,6 +10356,11 @@ components: type: integer title: Active Runs Count default: 0 + run_on_latest_version: + anyOf: + - type: boolean + - type: 'null' + title: Run On Latest Version file_token: type: string title: File Token @@ -12564,6 +12569,11 @@ components: dag_display_name: type: string title: Dag Display Name + dag_run_bundle_version: + anyOf: + - type: string + - type: 'null' + title: Dag Run Bundle Version hostname: anyOf: - type: string @@ -12643,6 +12653,7 @@ components: - max_tries - task_display_name - dag_display_name + - dag_run_bundle_version - hostname - unixname - pool @@ -12720,6 +12731,11 @@ components: dag_display_name: type: string title: Dag Display Name + dag_run_bundle_version: + anyOf: + - type: string + - type: 'null' + title: Dag Run Bundle Version hostname: anyOf: - type: string @@ -12824,6 +12840,7 @@ components: - max_tries - task_display_name - dag_display_name + - dag_run_bundle_version - hostname - unixname - pool diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 359809458cf29..11af39cbcc400 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -82,6 +82,7 @@ ) from airflow.api_fastapi.core_api.services.public.dag_run import DagRunWaiter from airflow.api_fastapi.logging.decorators import action_logging +from airflow.exceptions import BundleVersionUnavailable from airflow.listeners.listener import get_listener_manager from airflow.models import DagModel, DagRun from airflow.models.asset import AssetEvent @@ -498,6 +499,11 @@ def trigger_dag_run( current_user_id = user.get_id() dag_run.note = (dag_run_note, current_user_id) return dag_run + except BundleVersionUnavailable as e: + raise HTTPException( + status.HTTP_503_SERVICE_UNAVAILABLE, + f"Bundle version not yet available. Please retry: {e}", + ) except ValueError as e: raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/config.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/config.py index 9510938232015..f51aae329c658 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/config.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/config.py @@ -62,6 +62,7 @@ def get_configs() -> ConfigResponse: "external_log_name": getattr(task_log_reader.log_handler, "log_name", None), "theme": loads(conf.get("api", "theme", fallback="{}")) or None, "multi_team": conf.getboolean("core", "multi_team"), + "run_on_latest_version": conf.getboolean("core", "run_on_latest_version", fallback=False), } config.update({key: value for key, value in additional_config.items()}) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py index 69a06eca9b5e1..da475e21e3af8 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py @@ -32,7 +32,7 @@ from airflow.api_fastapi.compat import HTTP_422_UNPROCESSABLE_CONTENT from airflow.api_fastapi.execution_api.datamodels.dagrun import DagRunStateResponse, TriggerDAGRunPayload from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun -from airflow.exceptions import DagRunAlreadyExists +from airflow.exceptions import BundleVersionUnavailable, DagRunAlreadyExists from airflow.models.dag import DagModel from airflow.models.dagrun import DagRun as DagRunModel from airflow.utils.state import DagRunState @@ -143,6 +143,19 @@ def trigger_dag_run( "message": f"A run already exists for Dag '{dag_id}' with run_id '{run_id}'", }, ) + except BundleVersionUnavailable as e: + log.warning( + "Bundle version unavailable when triggering DAG run: %s", + e, + extra={"dag_id": dag_id, "run_id": run_id}, + ) + raise HTTPException( + status.HTTP_503_SERVICE_UNAVAILABLE, + detail={ + "reason": "bundle_version_unavailable", + "message": str(e), + }, + ) @router.post( diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py b/airflow-core/src/airflow/cli/commands/dag_command.py index 6572f467ff34c..e090a3212e492 100644 --- a/airflow-core/src/airflow/cli/commands/dag_command.py +++ b/airflow-core/src/airflow/cli/commands/dag_command.py @@ -39,7 +39,7 @@ from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string from airflow.dag_processing.bundles.manager import DagBundlesManager from airflow.dag_processing.dagbag import BundleDagBag, DagBag, sync_bag_to_db -from airflow.exceptions import AirflowConfigException, AirflowException +from airflow.exceptions import AirflowConfigException, AirflowException, BundleVersionUnavailable from airflow.jobs.job import Job from airflow.models import DagModel, DagRun, TaskInstance from airflow.models.errors import ParseImportError @@ -92,6 +92,14 @@ def dag_trigger(args) -> None: data=[message] if message is not None else [], output=args.output, ) + except BundleVersionUnavailable as err: + log.error( + "Bundle version not yet available: %s. " + "The bundle has been refreshed but DAGs have not been parsed yet. " + "Please retry in a few moments.", + err, + ) + sys.exit(1) except OSError as err: raise AirflowException(err) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 0c002f5276cfe..e6894dcb74a28 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -507,6 +507,20 @@ core: type: boolean example: ~ default: "False" + run_on_latest_version: + description: | + When True, DAG runs will use the latest available bundle version by default + when triggered, rerun, or cleared. This can be overridden at the DAG level + (via the DAG's ``run_on_latest_version`` parameter). + + .. note:: + + This only applies to bundles that support versioning (e.g., GitDagBundle). + LocalDagBundle and other non-versioned bundles are unaffected. + version_added: 3.2.0 + type: boolean + example: ~ + default: "False" database: description: ~ options: diff --git a/airflow-core/src/airflow/config_templates/unit_tests.cfg b/airflow-core/src/airflow/config_templates/unit_tests.cfg index c4493bc828b90..fde67fd8ace4e 100644 --- a/airflow-core/src/airflow/config_templates/unit_tests.cfg +++ b/airflow-core/src/airflow/config_templates/unit_tests.cfg @@ -57,6 +57,8 @@ unit_test_mode = True killed_task_cleanup_time = 5 # We only allow our own classes to be deserialized in tests allowed_deserialization_classes = airflow.* tests.* +# Default behavior for bundle versioning +run_on_latest_version = False [database] diff --git a/airflow-core/src/airflow/exceptions.py b/airflow-core/src/airflow/exceptions.py index ebcdb47b3a21f..c432eecfe1b4a 100644 --- a/airflow-core/src/airflow/exceptions.py +++ b/airflow-core/src/airflow/exceptions.py @@ -156,6 +156,17 @@ def serialize(self): ) +class BundleVersionUnavailable(AirflowException): + """ + Raise when a requested bundle version exists but has not been parsed/serialized yet. + + This is a temporary condition that occurs during the window between bundle refresh + completion and DAG parsing/serialization. Callers should retry the operation. + """ + + status_code = HTTPStatus.SERVICE_UNAVAILABLE + + class SerializationError(AirflowException): """A problem occurred when trying to serialize something.""" diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index f8c043f0c9f50..6772d62e633c5 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -34,7 +34,7 @@ from typing import TYPE_CHECKING, Any from sqlalchemy import CTE, and_, delete, exists, func, inspect, or_, select, text, tuple_, update -from sqlalchemy.exc import OperationalError +from sqlalchemy.exc import DBAPIError, OperationalError from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient, selectinload from sqlalchemy.sql import expression @@ -51,7 +51,7 @@ ) from airflow.configuration import conf from airflow.dag_processing.bundles.base import BundleUsageTrackingManager -from airflow.exceptions import DagNotFound +from airflow.exceptions import BundleVersionUnavailable, DagNotFound from airflow.executors import workloads from airflow.executors.executor_loader import ExecutorLoader from airflow.jobs.base_job_runner import BaseJobRunner @@ -1792,30 +1792,44 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st partition_dag_ids.add(apdr.target_dag_id) run_after = timezone.utcnow() - dag_run = dag.create_dagrun( - run_id=DagRun.generate_run_id( - run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=run_after - ), - logical_date=None, - data_interval=None, - partition_key=apdr.partition_key, - run_after=run_after, - run_type=DagRunType.ASSET_TRIGGERED, - triggered_by=DagRunTriggeredByType.ASSET, - state=DagRunState.QUEUED, - creating_job_id=self.job.id, - session=session, - ) - asset_events = session.scalars( - select(AssetEvent).where( - PartitionedAssetKeyLog.asset_partition_dag_run_id == apdr.id, - PartitionedAssetKeyLog.asset_event_id == AssetEvent.id, + try: + dag_run = dag.create_dagrun( + run_id=DagRun.generate_run_id( + run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=run_after + ), + logical_date=None, + data_interval=None, + partition_key=apdr.partition_key, + run_after=run_after, + run_type=DagRunType.ASSET_TRIGGERED, + triggered_by=DagRunTriggeredByType.ASSET, + state=DagRunState.QUEUED, + creating_job_id=self.job.id, + session=session, + ) + asset_events = session.scalars( + select(AssetEvent).where( + PartitionedAssetKeyLog.asset_partition_dag_run_id == apdr.id, + PartitionedAssetKeyLog.asset_event_id == AssetEvent.id, + ) + ) + dag_run.consumed_asset_events.extend(asset_events) + session.flush() + apdr.created_dag_run_id = dag_run.id + session.flush() + except BundleVersionUnavailable: + self.log.warning( + "Bundle version not yet available for partitioned asset-triggered DAG %s. " + "Run will be created on next evaluation cycle.", + apdr.target_dag_id, + ) + except DBAPIError: + raise + except Exception: + self.log.exception( + "Failed creating DagRun for partitioned asset-triggered DAG %s", + apdr.target_dag_id, ) - ) - dag_run.consumed_asset_events.extend(asset_events) - session.flush() - apdr.created_dag_run_id = dag_run.id - session.flush() return partition_dag_ids @@ -1988,6 +2002,17 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) - active_non_backfill_runs=active_runs_of_dags[dag_model.dag_id], ) + # Handle bundle version race condition: bundle refreshed but not yet parsed/serialized + except BundleVersionUnavailable: + self.log.warning( + "Bundle version not yet available for DAG %s. " + "Bundle refresh completed but DAGs not yet parsed. " + "Run will be created on next scheduling cycle.", + dag_model.dag_id, + ) + # Continue to next DAG - this run will be retried automatically + # on the next scheduling cycle when parsing completes + # Exceptions like ValueError, ParamValidationError, etc. are raised by # DagModel.create_dagrun() when dag is misconfigured. The scheduler should not # crash due to misconfigured dags. We should log any exception encountered @@ -2075,30 +2100,48 @@ def _create_dag_runs_asset_triggered( ) ) - dag_run = dag.create_dagrun( - run_id=DagRun.generate_run_id( - run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=triggered_date - ), - logical_date=None, - data_interval=None, - run_after=triggered_date, - run_type=DagRunType.ASSET_TRIGGERED, - triggered_by=DagRunTriggeredByType.ASSET, - state=DagRunState.QUEUED, - creating_job_id=self.job.id, - session=session, - ) - Stats.incr("asset.triggered_dagruns") - dag_run.consumed_asset_events.extend(asset_events) - - # Delete only consumed ADRQ rows to avoid dropping newly queued events - # (e.g. DagRun triggered by asset A while a new event for asset B arrives). - adrq_pks = [(record.asset_id, record.target_dag_id) for record in queued_adrqs] - session.execute( - delete(AssetDagRunQueue).where( - tuple_(AssetDagRunQueue.asset_id, AssetDagRunQueue.target_dag_id).in_(adrq_pks) + try: + dag_run = dag.create_dagrun( + run_id=DagRun.generate_run_id( + run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=triggered_date + ), + logical_date=None, + data_interval=None, + run_after=triggered_date, + run_type=DagRunType.ASSET_TRIGGERED, + triggered_by=DagRunTriggeredByType.ASSET, + state=DagRunState.QUEUED, + creating_job_id=self.job.id, + session=session, ) - ) + Stats.incr("asset.triggered_dagruns") + dag_run.consumed_asset_events.extend(asset_events) + + # Delete only consumed ADRQ rows to avoid dropping newly queued events + # (e.g. DagRun triggered by asset A while a new event for asset B arrives). + adrq_pks = [(record.asset_id, record.target_dag_id) for record in queued_adrqs] + session.execute( + delete(AssetDagRunQueue).where( + tuple_(AssetDagRunQueue.asset_id, AssetDagRunQueue.target_dag_id).in_(adrq_pks) + ) + ) + except BundleVersionUnavailable: + self.log.warning( + "Bundle version not yet available for asset-triggered DAG %s. " + "Bundle refresh completed but DAGs not yet parsed. " + "Run will be created on next asset trigger evaluation.", + dag.dag_id, + ) + # Asset trigger remains in AssetDagRunQueue and will be retried + # on next evaluation cycle when parsing completes + except DBAPIError: + raise + except Exception: + self.log.exception( + "Failed creating asset-triggered DagRun for DAG %s", + dag.dag_id, + ) + # Continue to next DAG to avoid cascading failures def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) -> dict[int, Backfill]: """ diff --git a/airflow-core/src/airflow/models/backfill.py b/airflow-core/src/airflow/models/backfill.py index bb54faec615fe..a998d46d65bad 100644 --- a/airflow-core/src/airflow/models/backfill.py +++ b/airflow-core/src/airflow/models/backfill.py @@ -43,7 +43,7 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship, validates from airflow._shared.timezones import timezone -from airflow.exceptions import AirflowException, DagNotFound, DagRunTypeNotAllowed +from airflow.exceptions import AirflowException, BundleVersionUnavailable, DagNotFound, DagRunTypeNotAllowed from airflow.models.base import Base, StringID from airflow.utils.session import create_session from airflow.utils.sqlalchemy import UtcDateTime, with_row_locks @@ -419,6 +419,25 @@ def _create_backfill_dag_run_non_partitioned( sort_ordinal=backfill_sort_ordinal, ) ) + except BundleVersionUnavailable: + log.warning( + "Bundle version not yet available for dag_id=%s backfill_id=%s, logical_date=%s. " + "Bundle refresh completed but DAGs not yet parsed. Will retry on next backfill iteration.", + dag.dag_id, + backfill_id, + info.logical_date, + ) + nested.rollback() + + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=None, + logical_date=info.logical_date, + exception_reason=BackfillDagRunExceptionReason.IN_FLIGHT, + sort_ordinal=backfill_sort_ordinal, + ) + ) def _create_backfill_dag_run_partitioned( @@ -451,35 +470,53 @@ def _create_backfill_dag_run_partitioned( "Skipping dag run creation.", non_create_reason=non_create_reason, backfill_id=backfill_id ) return - dr = dag.create_dagrun( - run_id=dag.timetable.generate_run_id( - run_type=DagRunType.BACKFILL_JOB, - data_interval=info.data_interval, + try: + dr = dag.create_dagrun( + run_id=dag.timetable.generate_run_id( + run_type=DagRunType.BACKFILL_JOB, + data_interval=info.data_interval, + partition_key=info.partition_key, + run_after=info.run_after, + ), + logical_date=info.logical_date, partition_key=info.partition_key, + data_interval=info.data_interval if info.logical_date else None, run_after=info.run_after, - ), - logical_date=info.logical_date, - partition_key=info.partition_key, - data_interval=info.data_interval if info.logical_date else None, - run_after=info.run_after, - conf=dag_run_conf, - run_type=DagRunType.BACKFILL_JOB, - triggered_by=DagRunTriggeredByType.BACKFILL, - triggering_user_name=triggering_user_name, - state=DagRunState.QUEUED, - start_date=timezone.utcnow(), - backfill_id=backfill_id, - session=session, - ) - session.add( - BackfillDagRun( + conf=dag_run_conf, + run_type=DagRunType.BACKFILL_JOB, + triggered_by=DagRunTriggeredByType.BACKFILL, + triggering_user_name=triggering_user_name, + state=DagRunState.QUEUED, + start_date=timezone.utcnow(), backfill_id=backfill_id, - dag_run_id=dr.id, - sort_ordinal=backfill_sort_ordinal, - logical_date=info.logical_date, - partition_key=info.partition_key, + session=session, + ) + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=dr.id, + sort_ordinal=backfill_sort_ordinal, + logical_date=info.logical_date, + partition_key=info.partition_key, + ) + ) + except BundleVersionUnavailable: + log.warning( + "Bundle version not yet available for partitioned backfill dag_id=%s backfill_id=%s. " + "Will retry on next backfill iteration.", + dag.dag_id, + backfill_id, + ) + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=None, + logical_date=info.logical_date, + partition_key=info.partition_key, + exception_reason=BackfillDagRunExceptionReason.IN_FLIGHT, + sort_ordinal=backfill_sort_ordinal, + ) ) - ) def _get_info_list( diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index c8a9c05bd9fd3..d6dc73e39c29c 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -420,7 +420,7 @@ def clear_task_instances( log.warning("No serialized dag found for dag '%s'", dr.dag_id) if dr_dag and not dr_dag.disable_bundle_versioning and run_on_latest_version: bundle_version = dr.dag_model.bundle_version - if bundle_version is not None and run_on_latest_version: + if bundle_version is not None: dr.bundle_version = bundle_version if dag_run_state == DagRunState.QUEUED: dr.last_scheduling_decision = None diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py index a8fa92d12ce9a..865f7d4976d6b 100644 --- a/airflow-core/src/airflow/serialization/definitions/dag.py +++ b/airflow-core/src/airflow/serialization/definitions/dag.py @@ -32,9 +32,10 @@ from airflow._shared.timezones.timezone import coerce_datetime from airflow.configuration import conf as airflow_conf -from airflow.exceptions import AirflowException, TaskNotFound +from airflow.exceptions import AirflowException, BundleVersionUnavailable, TaskNotFound from airflow.models.dag import DagModel from airflow.models.dag_version import DagVersion +from airflow.models.dagbundle import DagBundleModel from airflow.models.dagrun import DagRun from airflow.models.deadline import Deadline from airflow.models.deadline_alert import DeadlineAlert as DeadlineAlertModel @@ -103,6 +104,7 @@ class SerializedDAG: allowed_run_types: list[str] | None = None description: str | None = None disable_bundle_versioning: bool = False + run_on_latest_version: bool | None = None doc_md: str | None = None edge_info: dict[str, dict[str, EdgeInfoType]] = attrs.field(factory=dict) end_date: datetime.datetime | None = None @@ -164,6 +166,7 @@ def get_serialized_fields(cls) -> frozenset[str]: "owner_links", "relative_fileloc", "render_template_as_native_obj", + "run_on_latest_version", "start_date", "tags", "task_group", @@ -1138,6 +1141,114 @@ def get_edge_info(self, upstream_task_id: str, downstream_task_id: str) -> EdgeI return empty +def _resolve_bundle_version( + dag: SerializedDAG, + session: Session, +) -> str | None: + """ + Resolve the bundle version for a DAG run based on the precedence hierarchy. + + Precedence (highest to lowest): + 1. DAG-level configuration (dag.run_on_latest_version) + 2. Global configuration (core.run_on_latest_version) + 3. System default (use original version from DagModel) + + :param dag: The serialized DAG + :param session: Database session + :return: The resolved bundle version, or None if versioning is disabled + """ + if dag.disable_bundle_versioning: + return None + + # Determine whether to use latest version based on precedence hierarchy + use_latest = _should_use_latest_version(dag) + source = _get_config_source(dag) + + if use_latest: + log.debug("Using latest bundle version", dag_id=dag.dag_id, source=source) + return _get_latest_bundle_version(dag.dag_id, session) + + log.debug("Using original bundle version", dag_id=dag.dag_id, source=source) + return _get_original_bundle_version(dag.dag_id, session) + + +def _should_use_latest_version( + dag: SerializedDAG, +) -> bool: + """ + Determine whether to use latest bundle version based on precedence hierarchy. + + Returns True if latest version should be used, False otherwise. + """ + # Level 1: DAG-level configuration (explicit True or False) + if dag.run_on_latest_version is not None: + return dag.run_on_latest_version + + # Level 2: Global configuration (fallback to False) + return airflow_conf.getboolean("core", "run_on_latest_version", fallback=False) + + +def _get_config_source( + dag: SerializedDAG, +) -> str: + """Return descriptive source of the bundle version configuration for logging.""" + if dag.run_on_latest_version is not None: + return "DAG configuration" + if airflow_conf.has_option("core", "run_on_latest_version"): + return "global configuration" + return "system default" + + +def _get_latest_bundle_version(dag_id: str, session: Session) -> str | None: + """ + Get the latest bundle version from DagBundleModel, falling back to DagModel. + + :param dag_id: The DAG ID + :param session: Database session + """ + dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id)) + if not dag_model: + log.warning( + "Cannot resolve latest bundle version: DagModel not found", + dag_id=dag_id, + ) + return None + + dag_bundle = session.scalar(select(DagBundleModel).where(DagBundleModel.name == dag_model.bundle_name)) + if not dag_bundle: + log.warning( + "DagBundleModel not found, falling back to original version", + dag_id=dag_id, + bundle_name=dag_model.bundle_name, + fallback_version=dag_model.bundle_version, + ) + return dag_model.bundle_version + + # Non-versioned bundle (e.g., LocalDagBundle) - use original version + if dag_bundle.version is None: + log.debug( + "Bundle does not support versioning, using original version", + dag_id=dag_id, + bundle_name=dag_model.bundle_name, + ) + return dag_model.bundle_version + + log.debug( + "Resolved latest bundle version", + dag_id=dag_id, + bundle_name=dag_model.bundle_name, + bundle_version=dag_bundle.version, + ) + return dag_bundle.version + + +def _get_original_bundle_version(dag_id: str, session: Session) -> str | None: + """Get the original bundle version from DagModel.""" + version = session.scalar(select(DagModel.bundle_version).where(DagModel.dag_id == dag_id)) + log.debug("Using original bundle version", dag_id=dag_id, bundle_version=version) + return version + + @provide_session def _create_orm_dagrun( *, @@ -1159,13 +1270,27 @@ def _create_orm_dagrun( note: str | None = None, session: Session = NEW_SESSION, ) -> DagRun: - bundle_version = None - if not dag.disable_bundle_versioning: - bundle_version = session.scalar( - select(DagModel.bundle_version).where(DagModel.dag_id == dag.dag_id), - ) - dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) + bundle_version = _resolve_bundle_version( + dag=dag, + session=session, + ) + dag_version = DagVersion.get_latest_version(dag.dag_id, bundle_version=bundle_version, session=session) + if not dag_version: + if bundle_version: + # Bundle version exists but not yet serialized - this is a temporary race condition + log.warning( + "Bundle version %s for DAG %s is not yet available. " + "The bundle has been refreshed but DAGs have not been parsed yet. " + "Please retry in a few moments.", + bundle_version, + dag.dag_id, + ) + raise BundleVersionUnavailable( + f"Cannot create DagRun for DAG {dag.dag_id} with bundle version {bundle_version}. " + f"The requested bundle version has not been parsed yet. " + f"This is a temporary condition during bundle refresh. Please retry." + ) raise AirflowException(f"Cannot create DagRun for DAG {dag.dag_id} because the dag is not serialized") run = DagRun( diff --git a/airflow-core/src/airflow/serialization/schema.json b/airflow-core/src/airflow/serialization/schema.json index bed7b7e9132ee..7f176e92b74c7 100644 --- a/airflow-core/src/airflow/serialization/schema.json +++ b/airflow-core/src/airflow/serialization/schema.json @@ -224,7 +224,8 @@ ]}, "edge_info": { "$ref": "#/definitions/edge_info" }, "dag_dependencies": { "$ref": "#/definitions/dag_dependencies" }, - "disable_bundle_versioning": {"type": "boolean", "default": false } + "disable_bundle_versioning": {"type": "boolean", "default": false }, + "run_on_latest_version": {"type": ["boolean", "null"]} }, "required": [ "dag_id", diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index 9a0c7ab11d7e1..3bf3eb2cfbb97 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -2232,6 +2232,7 @@ class LazyDeserializedDAG(pydantic.BaseModel): "jinja_environment_kwargs", "relative_fileloc", "disable_bundle_versioning", + "run_on_latest_version", "fail_fast", "last_loaded", } diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index f47aa107aea24..958e978ab2801 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2158,6 +2158,17 @@ export const $DAGDetailsResponse = { title: 'Active Runs Count', default: 0 }, + run_on_latest_version: { + anyOf: [ + { + type: 'boolean' + }, + { + type: 'null' + } + ], + title: 'Run On Latest Version' + }, file_token: { type: 'string', title: 'File Token', @@ -5302,6 +5313,17 @@ export const $TaskInstanceHistoryResponse = { type: 'string', title: 'Dag Display Name' }, + dag_run_bundle_version: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Dag Run Bundle Version' + }, hostname: { anyOf: [ { @@ -5438,7 +5460,7 @@ export const $TaskInstanceHistoryResponse = { } }, type: 'object', - required: ['task_id', 'dag_id', 'dag_run_id', 'map_index', 'start_date', 'end_date', 'duration', 'state', 'try_number', 'max_tries', 'task_display_name', 'dag_display_name', 'hostname', 'unixname', 'pool', 'pool_slots', 'queue', 'priority_weight', 'operator', 'operator_name', 'queued_when', 'scheduled_when', 'pid', 'executor', 'executor_config', 'dag_version'], + required: ['task_id', 'dag_id', 'dag_run_id', 'map_index', 'start_date', 'end_date', 'duration', 'state', 'try_number', 'max_tries', 'task_display_name', 'dag_display_name', 'dag_run_bundle_version', 'hostname', 'unixname', 'pool', 'pool_slots', 'queue', 'priority_weight', 'operator', 'operator_name', 'queued_when', 'scheduled_when', 'pid', 'executor', 'executor_config', 'dag_version'], title: 'TaskInstanceHistoryResponse', description: 'TaskInstanceHistory serializer for responses.' } as const; @@ -5544,6 +5566,17 @@ export const $TaskInstanceResponse = { type: 'string', title: 'Dag Display Name' }, + dag_run_bundle_version: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Dag Run Bundle Version' + }, hostname: { anyOf: [ { @@ -5727,7 +5760,7 @@ export const $TaskInstanceResponse = { } }, type: 'object', - required: ['id', 'task_id', 'dag_id', 'dag_run_id', 'map_index', 'logical_date', 'run_after', 'start_date', 'end_date', 'duration', 'state', 'try_number', 'max_tries', 'task_display_name', 'dag_display_name', 'hostname', 'unixname', 'pool', 'pool_slots', 'queue', 'priority_weight', 'operator', 'operator_name', 'queued_when', 'scheduled_when', 'pid', 'executor', 'executor_config', 'note', 'rendered_map_index', 'trigger', 'triggerer_job', 'dag_version'], + required: ['id', 'task_id', 'dag_id', 'dag_run_id', 'map_index', 'logical_date', 'run_after', 'start_date', 'end_date', 'duration', 'state', 'try_number', 'max_tries', 'task_display_name', 'dag_display_name', 'dag_run_bundle_version', 'hostname', 'unixname', 'pool', 'pool_slots', 'queue', 'priority_weight', 'operator', 'operator_name', 'queued_when', 'scheduled_when', 'pid', 'executor', 'executor_config', 'note', 'rendered_map_index', 'trigger', 'triggerer_job', 'dag_version'], title: 'TaskInstanceResponse', description: 'TaskInstance serializer for responses.' } as const; @@ -7307,10 +7340,14 @@ export const $ConfigResponse = { multi_team: { type: 'boolean', title: 'Multi Team' + }, + run_on_latest_version: { + type: 'boolean', + title: 'Run On Latest Version' } }, type: 'object', - required: ['fallback_page_limit', 'auto_refresh_interval', 'hide_paused_dags_by_default', 'instance_name', 'enable_swagger_ui', 'require_confirmation_dag_change', 'default_wrap', 'test_connection', 'dashboard_alert', 'show_external_log_redirect', 'theme', 'multi_team'], + required: ['fallback_page_limit', 'auto_refresh_interval', 'hide_paused_dags_by_default', 'instance_name', 'enable_swagger_ui', 'require_confirmation_dag_change', 'default_wrap', 'test_connection', 'dashboard_alert', 'show_external_log_redirect', 'theme', 'multi_team', 'run_on_latest_version'], title: 'ConfigResponse', description: 'configuration serializer.' } as const; diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 9ebcb1278c987..3d1e06c03b5a5 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -577,6 +577,7 @@ export type DAGDetailsResponse = { } | null; is_favorite?: boolean; active_runs_count?: number; + run_on_latest_version?: boolean | null; /** * Return file token. */ @@ -1389,6 +1390,7 @@ export type TaskInstanceHistoryResponse = { max_tries: number; task_display_name: string; dag_display_name: string; + dag_run_bundle_version: string | null; hostname: string | null; unixname: string | null; pool: string; @@ -1424,6 +1426,7 @@ export type TaskInstanceResponse = { max_tries: number; task_display_name: string; dag_display_name: string; + dag_run_bundle_version: string | null; hostname: string | null; unixname: string | null; pool: string; @@ -1797,6 +1800,7 @@ export type ConfigResponse = { external_log_name?: string | null; theme: Theme | null; multi_team: boolean; + run_on_latest_version: boolean; }; /** diff --git a/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx b/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx index a4b53c104280a..164e37f3594ba 100644 --- a/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx +++ b/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx @@ -24,6 +24,7 @@ import { CgRedo } from "react-icons/cg"; import { useDagServiceGetDagDetails } from "openapi/queries"; import type { DAGRunResponse, TaskInstanceResponse } from "openapi/requests/types.gen"; import { ActionAccordion } from "src/components/ActionAccordion"; +import { useRunOnLatestVersion } from "src/components/Clear/useRunOnLatestVersion"; import { Checkbox, Dialog } from "src/components/ui"; import SegmentedControl from "src/components/ui/SegmentedControl"; import { useClearDagRunDryRun } from "src/queries/useClearDagRunDryRun"; @@ -45,13 +46,23 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) => { const [note, setNote] = useState(dagRun.note); const [selectedOptions, setSelectedOptions] = useState>(["existingTasks"]); const onlyFailed = selectedOptions.includes("onlyFailed"); - const [runOnLatestVersion, setRunOnLatestVersion] = useState(false); // Get current DAG's bundle version to compare with DAG run's bundle version const { data: dagDetails } = useDagServiceGetDagDetails({ dagId, }); + // Use custom hook for run_on_latest_version checkbox state and visibility + const { + setValue: setRunOnLatestVersion, + shouldShowCheckbox: shouldShowBundleVersionOption, + value: runOnLatestVersion, + } = useRunOnLatestVersion({ + currentBundleVersion: dagDetails?.bundle_version, + dagLevelConfig: dagDetails?.run_on_latest_version, + runBundleVersion: dagRun.bundle_version, + }); + const refetchInterval = useAutoRefresh({ dagId }); const { data: affectedTasks = { task_instances: [], total_entries: 0 } } = useClearDagRunDryRun({ @@ -78,13 +89,6 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) => { onSuccess: onClose, }); - // Check if bundle versions are different - const currentDagBundleVersion = dagDetails?.bundle_version; - const dagRunBundleVersion = dagRun.bundle_version; - const bundleVersionsDiffer = currentDagBundleVersion !== dagRunBundleVersion; - const shouldShowBundleVersionOption = - bundleVersionsDiffer && dagRunBundleVersion !== null && dagRunBundleVersion !== ""; - return ( @@ -126,6 +130,7 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) => { diff --git a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx index 63e1df78526c4..91dfcc9b56383 100644 --- a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx +++ b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx @@ -22,9 +22,14 @@ import { useTranslation } from "react-i18next"; import { CgRedo } from "react-icons/cg"; import { useParams } from "react-router-dom"; -import { useDagServiceGetDagDetails, useTaskInstanceServiceGetTaskInstances } from "openapi/queries"; +import { + useDagRunServiceGetDagRun, + useDagServiceGetDagDetails, + useTaskInstanceServiceGetTaskInstances, +} from "openapi/queries"; import type { LightGridTaskInstanceSummary, TaskInstanceResponse } from "openapi/requests/types.gen"; import { ActionAccordion } from "src/components/ActionAccordion"; +import { useRunOnLatestVersion } from "src/components/Clear/useRunOnLatestVersion"; import { Checkbox, Dialog } from "src/components/ui"; import SegmentedControl from "src/components/ui/SegmentedControl"; import { useClearTaskInstances } from "src/queries/useClearTaskInstances"; @@ -55,7 +60,6 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr const future = selectedOptions.includes("future"); const upstream = selectedOptions.includes("upstream"); const downstream = selectedOptions.includes("downstream"); - const [runOnLatestVersion, setRunOnLatestVersion] = useState(false); const [note, setNote] = useState(""); @@ -63,6 +67,23 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr dagId, }); + // Get dag run to compare bundle versions + const { data: dagRun } = useDagRunServiceGetDagRun({ + dagId, + dagRunId: runId, + }); + + // Use custom hook for run_on_latest_version checkbox state and visibility + const { + setValue: setRunOnLatestVersion, + shouldShowCheckbox: shouldShowBundleVersionOption, + value: runOnLatestVersion, + } = useRunOnLatestVersion({ + currentBundleVersion: dagDetails?.bundle_version, + dagLevelConfig: dagDetails?.run_on_latest_version, + runBundleVersion: dagRun?.bundle_version, + }); + const { data: groupTaskInstances } = useTaskInstanceServiceGetTaskInstances( { dagId, @@ -106,9 +127,6 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr total_entries: 0, }; - const shouldShowBundleVersionOption = - dagDetails?.bundle_version !== null && dagDetails?.bundle_version !== ""; - return ( @@ -161,6 +179,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, taskInstance }: Pr diff --git a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx index c7562083c05f3..0f1d784916cbb 100644 --- a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx +++ b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx @@ -24,6 +24,7 @@ import { CgRedo } from "react-icons/cg"; import { useDagServiceGetDagDetails } from "openapi/queries"; import type { TaskInstanceResponse } from "openapi/requests/types.gen"; import { ActionAccordion } from "src/components/ActionAccordion"; +import { useRunOnLatestVersion } from "src/components/Clear/useRunOnLatestVersion"; import Time from "src/components/Time"; import { Checkbox, Dialog } from "src/components/ui"; import SegmentedControl from "src/components/ui/SegmentedControl"; @@ -62,7 +63,6 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog, tas const future = selectedOptions.includes("future"); const upstream = selectedOptions.includes("upstream"); const downstream = selectedOptions.includes("downstream"); - const [runOnLatestVersion, setRunOnLatestVersion] = useState(false); const [preventRunningTask, setPreventRunningTask] = useState(true); const [note, setNote] = useState(taskInstance.note); @@ -73,11 +73,22 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog, tas taskId, }); - // Get current DAG's bundle version to compare with task instance's DAG version bundle version + // Get current DAG's bundle version to compare with task instance's DAG run bundle version const { data: dagDetails } = useDagServiceGetDagDetails({ dagId, }); + // Use custom hook for run_on_latest_version checkbox state and visibility + const { + setValue: setRunOnLatestVersion, + shouldShowCheckbox: shouldShowBundleVersionOption, + value: runOnLatestVersion, + } = useRunOnLatestVersion({ + currentBundleVersion: dagDetails?.bundle_version, + dagLevelConfig: dagDetails?.run_on_latest_version, + runBundleVersion: taskInstance.dag_run_bundle_version, + }); + const refetchInterval = useAutoRefresh({ dagId }); const { data } = useClearTaskInstancesDryRun({ @@ -107,15 +118,6 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog, tas total_entries: 0, }; - // Check if bundle versions are different - const currentDagBundleVersion = dagDetails?.bundle_version; - const taskInstanceDagVersionBundleVersion = taskInstance.dag_version?.bundle_version; - const bundleVersionsDiffer = currentDagBundleVersion !== taskInstanceDagVersionBundleVersion; - const shouldShowBundleVersionOption = - bundleVersionsDiffer && - taskInstanceDagVersionBundleVersion !== null && - taskInstanceDagVersionBundleVersion !== ""; - return ( <> @@ -171,6 +173,7 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog, tas @@ -245,7 +248,7 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog, tas open={open} preventRunningTask={preventRunningTask} /> - ) : null} + ) : undefined} ); }; diff --git a/airflow-core/src/airflow/ui/src/components/Clear/useRunOnLatestVersion.ts b/airflow-core/src/airflow/ui/src/components/Clear/useRunOnLatestVersion.ts new file mode 100644 index 0000000000000..c5354a39a7c23 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/components/Clear/useRunOnLatestVersion.ts @@ -0,0 +1,94 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useState } from "react"; + +import { useConfig } from "src/queries/useConfig"; + +type UseRunOnLatestVersionProps = { + /** + * Current DAG bundle version + */ + currentBundleVersion?: string | null; + + /** + * DAG-level configuration from the DAG details. + * If defined (true or false), takes precedence over global config. + */ + dagLevelConfig?: boolean | null; + + /** + * DAG run bundle version + */ + runBundleVersion?: string | null; +}; + +type UseRunOnLatestVersionResult = { + /** + * Setter for the checkbox value + */ + setValue: (value: boolean) => void; + + /** + * Whether the checkbox should be shown + */ + shouldShowCheckbox: boolean; + + /** + * Current value of the checkbox (controlled) + */ + value: boolean; +}; + +/** + * Custom hook for managing "Run on Latest Version" checkbox state. + * + * Implements the three-level precedence hierarchy: + * 1. DAG-level configuration (highest priority) + * 2. Global configuration + * 3. System default (false) + * + * Uses nullable override pattern: state is null until user interacts, + * avoiding the useState + useEffect synchronization anti-pattern. + */ +export const useRunOnLatestVersion = ({ + currentBundleVersion, + dagLevelConfig, + runBundleVersion, +}: UseRunOnLatestVersionProps): UseRunOnLatestVersionResult => { + const globalConfigValue = useConfig("run_on_latest_version"); + const globalDefault = Boolean(globalConfigValue); + + // Precedence: DAG-level > Global > System default (false) + const defaultValue = dagLevelConfig ?? globalDefault; + + // Nullable override: null until user interacts, then stores their choice + const [userOverride, setUserOverride] = useState(null); + const value = userOverride ?? defaultValue; + + // Show checkbox only when versions differ and run has a valid bundle version + const hasValidRunVersion = + runBundleVersion !== undefined && runBundleVersion !== null && runBundleVersion !== ""; + const shouldShowCheckbox = hasValidRunVersion && currentBundleVersion !== runBundleVersion; + + return { + setValue: setUserOverride, + shouldShowCheckbox, + value, + }; +}; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 3a55532e3d73b..be6afb866c5d8 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -27,6 +27,7 @@ from airflow._shared.timezones import timezone from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse +from airflow.exceptions import BundleVersionUnavailable from airflow.models import DagModel, DagRun, Log from airflow.models.asset import AssetEvent, AssetModel from airflow.providers.standard.operators.empty import EmptyOperator @@ -1799,6 +1800,16 @@ def test_dagrun_creation_exception_is_handled(self, mock_create_dagrun, test_cli assert response.status_code == 400 assert response.json() == {"detail": error_message} + @mock.patch("airflow.serialization.definitions.dag.SerializedDAG.create_dagrun") + def test_trigger_dag_run_returns_503_on_bundle_version_unavailable(self, mock_create_dagrun, test_client): + now = timezone.utcnow().isoformat() + + mock_create_dagrun.side_effect = BundleVersionUnavailable("Bundle version v2.0.0 not yet parsed") + + response = test_client.post(f"/dags/{DAG1_ID}/dagRuns", json={"logical_date": now}) + assert response.status_code == 503 + assert "Bundle version not yet available" in response.json()["detail"] + def test_should_respond_404_if_a_dag_is_inactive(self, test_client, session, testing_dag_bundle): now = timezone.utcnow().isoformat() self._dags_for_trigger_tests(session) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py index 55515b76931eb..7e3d60c95fac5 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py @@ -1000,6 +1000,7 @@ def test_dag_details( "timetable_partitioned": False, "timetable_summary": None, "timezone": UTC_JSON_REPR, + "run_on_latest_version": None, } assert res_json == expected @@ -1099,6 +1100,7 @@ def test_dag_details_with_view_url_template( "timetable_description": "Never, external triggers only", "timetable_partitioned": False, "timezone": UTC_JSON_REPR, + "run_on_latest_version": None, } assert res_json == expected @@ -1191,6 +1193,58 @@ def test_dag_details_includes_active_runs_count(self, session, test_client): assert isinstance(body["active_runs_count"], int) assert body["active_runs_count"] == 0 + def test_dag_details_includes_run_on_latest_version(self, session, test_client, dag_maker): + """Test that DAG details include the run_on_latest_version field.""" + from airflow.providers.standard.operators.empty import EmptyOperator + + # Test with a DAG that has run_on_latest_version=True + with dag_maker( + dag_id="test_dag_with_latest_version", + start_date=datetime(2021, 6, 15, tzinfo=timezone.utc), + run_on_latest_version=True, + session=session, + serialized=True, + ): + EmptyOperator(task_id="test_task") + + session.commit() + + response = test_client.get("/dags/test_dag_with_latest_version/details") + assert response.status_code == 200 + body = response.json() + + # Verify run_on_latest_version field is present and True + assert "run_on_latest_version" in body + assert body["run_on_latest_version"] is True + + # Test with a DAG that has run_on_latest_version=False + with dag_maker( + dag_id="test_dag_without_latest_version", + start_date=datetime(2021, 6, 15, tzinfo=timezone.utc), + run_on_latest_version=False, + session=session, + serialized=True, + ): + EmptyOperator(task_id="test_task") + + session.commit() + + response = test_client.get("/dags/test_dag_without_latest_version/details") + assert response.status_code == 200 + body = response.json() + + assert "run_on_latest_version" in body + assert body["run_on_latest_version"] is False + + # Test with a DAG that has run_on_latest_version=None (default/inherit) + # DAG2_ID from the fixtures doesn't have this parameter set + response = test_client.get(f"/dags/{DAG2_ID}/details") + assert response.status_code == 200 + body = response.json() + + assert "run_on_latest_version" in body + assert body["run_on_latest_version"] is None + class TestGetDag(TestDagEndpoint): """Unit tests for Get DAG.""" diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py index 65b156fd71a39..e63caebb17e83 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py @@ -228,6 +228,7 @@ def expected_sample_hitl_detail_dict(sample_ti: TaskInstance) -> dict[str, Any]: "task_instance": { "dag_display_name": DAG_ID, "dag_id": DAG_ID, + "dag_run_bundle_version": None, "dag_run_id": "test", "dag_version": { "bundle_name": "dag_maker", diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 6bf7213c9c8bb..ba3cf0dada959 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -201,6 +201,7 @@ def test_should_respond_200(self, test_client, session): "version_number": 1, }, "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "duration": 10000.0, "end_date": "2020-01-03T00:00:00Z", "logical_date": "2020-01-01T00:00:00Z", @@ -280,6 +281,7 @@ def test_should_respond_200_with_versions( "dag_id": "dag_with_multiple_versions", "dag_run_id": run_id, "dag_display_name": "dag_with_multiple_versions", + "dag_run_bundle_version": f"some_commit_hash{expected_version_number}", "map_index": -1, "logical_date": mock.ANY, "start_date": None, @@ -359,6 +361,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, test_client, sessi "version_number": 1, }, "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "duration": 10000.0, "end_date": "2020-01-03T00:00:00Z", "logical_date": "2020-01-01T00:00:00Z", @@ -423,6 +426,7 @@ def test_should_respond_200_with_task_state_in_removed(self, test_client, sessio "version_number": 1, }, "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "duration": 10000.0, "end_date": "2020-01-03T00:00:00Z", "logical_date": "2020-01-01T00:00:00Z", @@ -479,6 +483,7 @@ def test_should_respond_200_task_instance_with_rendered(self, test_client, sessi "version_number": 1, }, "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "duration": 10000.0, "end_date": "2020-01-03T00:00:00Z", "logical_date": "2020-01-01T00:00:00Z", @@ -599,6 +604,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, test_client, se "version_number": 1, }, "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "duration": 10000.0, "end_date": "2020-01-03T00:00:00Z", "logical_date": "2020-01-01T00:00:00Z", @@ -2112,6 +2118,7 @@ def test_should_respond_200(self, test_client, session): assert response_data == { "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "duration": 10000.0, "end_date": "2020-01-03T00:00:00Z", "executor": None, @@ -2158,6 +2165,7 @@ def test_should_respond_200_with_different_try_numbers(self, test_client, try_nu assert response_data == { "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "duration": 10000.0, "end_date": "2020-01-03T00:00:00Z", "executor": None, @@ -2235,6 +2243,7 @@ def test_should_respond_200_with_mapped_task_at_different_try_numbers( assert response_data == { "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "duration": 10000.0, "end_date": "2020-01-03T00:00:00Z", "executor": None, @@ -2307,6 +2316,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, test_client, sessi assert response_data == { "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "duration": 10000.0, "end_date": "2020-01-03T00:00:00Z", "executor": None, @@ -2354,6 +2364,7 @@ def test_should_respond_200_with_task_state_in_removed(self, test_client, sessio assert response_data == { "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "duration": 10000.0, "end_date": "2020-01-03T00:00:00Z", "executor": None, @@ -2434,6 +2445,7 @@ def test_should_respond_200_with_versions( "task_id": "task1", "dag_id": "dag_with_multiple_versions", "dag_display_name": "dag_with_multiple_versions", + "dag_run_bundle_version": f"some_commit_hash{expected_version_number}", "dag_run_id": run_id, "map_index": -1, "start_date": None, @@ -2488,6 +2500,7 @@ def test_should_respond_200_with_versions_using_url_template( "task_id": "task1", "dag_id": "dag_with_multiple_versions", "dag_display_name": "dag_with_multiple_versions", + "dag_run_bundle_version": f"some_commit_hash{expected_version_number}", "dag_run_id": run_id, "map_index": -1, "start_date": None, @@ -3183,6 +3196,7 @@ def test_should_respond_200_with_dag_run_id( { "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "dag_version": { "bundle_name": "dags-folder", "bundle_url": None, @@ -3570,6 +3584,7 @@ def test_should_respond_200(self, test_client, session): { "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "duration": 10000.0, "end_date": "2020-01-03T00:00:00Z", "executor": None, @@ -3607,6 +3622,7 @@ def test_should_respond_200(self, test_client, session): { "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "duration": 10000.0, "end_date": "2020-01-03T00:00:00Z", "executor": None, @@ -3678,6 +3694,7 @@ def test_ti_in_retry_state_not_returned(self, test_client, session): { "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "duration": 10000.0, "end_date": "2020-01-03T00:00:00Z", "executor": None, @@ -3761,6 +3778,7 @@ def test_mapped_task_should_respond_200(self, test_client, session): { "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "duration": 10000.0, "end_date": "2020-01-03T00:00:00Z", "executor": None, @@ -3798,6 +3816,7 @@ def test_mapped_task_should_respond_200(self, test_client, session): { "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "duration": 10000.0, "end_date": "2020-01-03T00:00:00Z", "executor": None, @@ -3870,6 +3889,7 @@ def test_should_respond_200_with_versions( "task_id": "task1", "dag_id": "dag_with_multiple_versions", "dag_display_name": "dag_with_multiple_versions", + "dag_run_bundle_version": f"some_commit_hash{expected_version_number}", "dag_run_id": run_id, "map_index": -1, "start_date": None, @@ -3924,6 +3944,7 @@ def test_should_respond_200_with_versions_using_url_template( "task_id": "task1", "dag_id": "dag_with_multiple_versions", "dag_display_name": "dag_with_multiple_versions", + "dag_run_bundle_version": f"some_commit_hash{expected_version_number}", "dag_run_id": run_id, "map_index": -1, "start_date": None, @@ -4023,6 +4044,7 @@ def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session): { "dag_id": self.DAG_ID, "dag_display_name": self.DAG_DISPLAY_NAME, + "dag_run_bundle_version": None, "dag_version": { "bundle_name": "dags-folder", "bundle_url": None, @@ -4297,6 +4319,7 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte { "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "dag_version": { "bundle_name": "dags-folder", "bundle_url": None, @@ -4433,6 +4456,7 @@ def test_update_mask_set_note_should_respond_200( { "dag_id": self.DAG_ID, "dag_display_name": self.DAG_DISPLAY_NAME, + "dag_run_bundle_version": None, "dag_version": { "bundle_name": "dags-folder", "bundle_url": None, @@ -4494,6 +4518,7 @@ def test_set_note_should_respond_200(self, test_client, session): { "dag_id": self.DAG_ID, "dag_display_name": self.DAG_DISPLAY_NAME, + "dag_run_bundle_version": None, "dag_version": { "bundle_name": "dags-folder", "bundle_url": None, @@ -4573,6 +4598,7 @@ def test_set_note_should_respond_200_mapped_task_with_rtif(self, test_client, se { "dag_id": self.DAG_ID, "dag_display_name": self.DAG_DISPLAY_NAME, + "dag_run_bundle_version": None, "dag_version": { "bundle_name": "dags-folder", "bundle_url": None, @@ -4654,6 +4680,7 @@ def test_set_note_should_respond_200_mapped_task_summary_with_rtif(self, test_cl assert response_ti == { "dag_id": self.DAG_ID, "dag_display_name": self.DAG_DISPLAY_NAME, + "dag_run_bundle_version": None, "dag_version": { "bundle_name": "dags-folder", "bundle_url": None, @@ -4770,6 +4797,7 @@ def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session): { "dag_id": self.DAG_ID, "dag_display_name": self.DAG_DISPLAY_NAME, + "dag_run_bundle_version": None, "dag_version": { "bundle_name": "dags-folder", "bundle_url": None, @@ -5056,6 +5084,7 @@ def test_should_raise_422_for_invalid_task_instance_state(self, payload, expecte { "dag_id": "example_python_operator", "dag_display_name": "example_python_operator", + "dag_run_bundle_version": None, "dag_version": { "bundle_name": "dags-folder", "bundle_url": None, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_config.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_config.py index ca5f44b1fc5b6..3b460fc9a2f4c 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_config.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_config.py @@ -69,6 +69,7 @@ "external_log_name": None, "theme": THEME, "multi_team": False, + "run_on_latest_version": False, } @@ -112,3 +113,44 @@ def test_get_config_just_authenticated(self, mock_config_data, unauthorized_test response = unauthorized_test_client.get("/config") assert response.status_code == 200 assert response.json() == expected_config_response + + def test_get_config_with_run_on_latest_version_true(self, test_client): + """Test that run_on_latest_version config is properly exposed when True.""" + with conf_vars( + { + ("core", "run_on_latest_version"): "true", + ("api", "instance_name"): "Airflow", + ("api", "enable_swagger_ui"): "true", + ("api", "hide_paused_dags_by_default"): "true", + ("api", "page_size"): "100", + ("api", "default_wrap"): "false", + ("api", "auto_refresh_interval"): "3", + ("api", "require_confirmation_dag_change"): "false", + ("api", "theme"): json.dumps(THEME), + } + ): + response = test_client.get("/config") + + assert response.status_code == 200 + response_data = response.json() + assert response_data["run_on_latest_version"] is True + + def test_get_config_with_run_on_latest_version_false(self, test_client): + """Test that run_on_latest_version config defaults to False when not set.""" + with conf_vars( + { + ("api", "instance_name"): "Airflow", + ("api", "enable_swagger_ui"): "true", + ("api", "hide_paused_dags_by_default"): "true", + ("api", "page_size"): "100", + ("api", "default_wrap"): "false", + ("api", "auto_refresh_interval"): "3", + ("api", "require_confirmation_dag_change"): "false", + ("api", "theme"): json.dumps(THEME), + } + ): + response = test_client.get("/config") + + assert response.status_code == 200 + response_data = response.json() + assert response_data["run_on_latest_version"] is False diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py index 2aa3be51220c4..3b7b47e551fd6 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py @@ -23,6 +23,7 @@ from airflow._shared.timezones import timezone from airflow.models import DagModel +from airflow.models.dagbundle import DagBundleModel from airflow.models.dagrun import DagRun from airflow.providers.standard.operators.empty import EmptyOperator from airflow.utils.state import DagRunState, State @@ -191,6 +192,50 @@ def test_trigger_dag_run_already_exists(self, client, session, dag_maker): } } + def test_trigger_dag_run_bundle_version_not_yet_parsed(self, client, session, dag_maker): + """Test 503 when requested bundle version hasn't been serialized yet (race condition).""" + + dag_id = "test_bundle_race" + run_id = "test_run_id" + logical_date = timezone.datetime(2025, 2, 20) + + # Create DAG with run_on_latest_version=True to trigger the race condition + with dag_maker(dag_id=dag_id, session=session, serialized=True, run_on_latest_version=True): + EmptyOperator(task_id="test_task") + + # Create DagBundleModel with v2.0.0 (simulating bundle refresh completed) + dag_bundle = DagBundleModel(name="test_bundle", version="v2.0.0") + session.add(dag_bundle) + session.flush() + + # Update DagModel to point to this bundle + dag_model = session.scalars(select(DagModel).where(DagModel.dag_id == dag_id)).one() + dag_model.bundle_name = "test_bundle" + dag_model.bundle_version = "v1.0.0" # Original version + session.flush() + + # DagVersion for v1.0.0 exists (from dag_maker), but v2.0.0 doesn't exist yet + # This simulates the race condition where bundle refresh updated DagBundleModel + # but parsing hasn't completed yet. The DAG-level run_on_latest_version=True + # will cause the backend to request v2.0.0, which hasn't been serialized yet. + + session.commit() + + # Request trigger (no bundle_version in payload - using DAG-level config) + response = client.post( + f"/execution/dag-runs/{dag_id}/{run_id}", + json={ + "logical_date": logical_date.isoformat(), + }, + ) + + # Should return 503 Service Unavailable with clear message + assert response.status_code == 503 + response_json = response.json() + assert response_json["detail"]["reason"] == "bundle_version_unavailable" + assert "not been parsed yet" in response_json["detail"]["message"] + assert "retry" in response_json["detail"]["message"].lower() + class TestDagRunClear: def setup_method(self): diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 4511c33368933..d89cc124730e1 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -48,7 +48,7 @@ from airflow.callbacks.database_callback_sink import DatabaseCallbackSink from airflow.dag_processing.collection import AssetModelOperation, DagModelOperation from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, BundleVersionUnavailable from airflow.executors.base_executor import BaseExecutor from airflow.executors.executor_constants import MOCK_EXECUTOR from airflow.executors.executor_loader import ExecutorLoader @@ -5215,6 +5215,140 @@ def test_scheduler_create_dag_runs_does_not_crash_on_deserialization_error(self, f"Expected deserialization error log, got: {scheduler_messages}" ) + def test_scheduler_create_dag_runs_handles_bundle_version_unavailable(self, caplog, dag_maker): + """ + Test that scheduler._create_dag_runs handles BundleVersionUnavailable gracefully. + + During bundle refresh, there's a window where DagBundleModel.version is updated + but DAGs haven't been parsed/serialized yet. The scheduler should log a warning + and continue processing other DAGs. + """ + with dag_maker(dag_id="test_bundle_unavailable", run_on_latest_version=True): + EmptyOperator(task_id="dummy") + + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + caplog.set_level("WARNING") + caplog.clear() + with ( + create_session() as session, + caplog.at_level("WARNING", logger="airflow.jobs.scheduler_job_runner"), + patch( + "airflow.serialization.definitions.dag.SerializedDAG.create_dagrun", + side_effect=BundleVersionUnavailable("Bundle version not yet parsed"), + ), + ): + # Should not raise - scheduler continues processing + self.job_runner._create_dag_runs([dag_maker.dag_model], session) + + scheduler_messages = [ + record.message for record in caplog.records if record.levelno >= logging.WARNING + ] + assert any("Bundle version not yet available" in msg for msg in scheduler_messages) + + def test_scheduler_create_dag_runs_asset_triggered_handles_bundle_version_unavailable( + self, caplog, dag_maker, session + ): + """ + Test that asset-triggered DAG run creation handles BundleVersionUnavailable gracefully. + + When an asset trigger fires during a bundle refresh window, the scheduler should + log a warning and continue processing. + """ + asset = Asset(uri="test://asset_1", name="test_asset_1", group="test_group") + with dag_maker(dag_id="test_asset_bundle_unavailable", schedule=[asset], run_on_latest_version=True): + EmptyOperator(task_id="dummy") + + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + # Create asset event to trigger the DAG + asset_manager = AssetManager() + asset_manager.register_asset_change( + asset=asset, + session=session, + ) + + caplog.set_level("WARNING") + caplog.clear() + with ( + caplog.at_level("WARNING", logger="airflow.jobs.scheduler_job_runner"), + patch( + "airflow.serialization.definitions.dag.SerializedDAG.create_dagrun", + side_effect=BundleVersionUnavailable("Bundle version not yet parsed"), + ), + ): + triggered_date_by_dag = {dag_maker.dag.dag_id: timezone.utcnow()} + + # Should not raise - scheduler handles exception gracefully + self.job_runner._create_dag_runs_asset_triggered( + [dag_maker.dag_model], triggered_date_by_dag, session + ) + + scheduler_messages = [ + record.message for record in caplog.records if record.levelno >= logging.WARNING + ] + assert any("Bundle version not yet available" in msg for msg in scheduler_messages) + + @pytest.mark.need_serialized_dag + @pytest.mark.usefixtures("clear_asset_partition_rows") + def test_create_dagruns_for_partitioned_asset_dags_handles_bundle_version_unavailable( + self, caplog, dag_maker, session + ): + """ + Test that partitioned asset-triggered DAG run creation handles BundleVersionUnavailable. + + When a partitioned asset trigger fires during a bundle refresh window, the scheduler + should log a warning, leave apdr.created_dag_run_id as None for retry, and continue. + """ + asset_1 = Asset(name="asset-partition-test") + + # Consumer DAG with partitioned asset timetable + with dag_maker( + dag_id="partitioned-consumer", + schedule=PartitionedAssetTimetable( + assets=asset_1, + default_partition_mapper=IdentityMapper(), + ), + session=session, + ): + EmptyOperator(task_id="hi") + session.commit() + + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + # Produce an asset event to create the AssetPartitionDagRun record + apdr = _produce_and_register_asset_event( + dag_id="partitioned-producer", + asset=asset_1, + partition_key="key-1", + session=session, + dag_maker=dag_maker, + ) + + caplog.set_level("WARNING") + caplog.clear() + with ( + caplog.at_level("WARNING", logger="airflow.jobs.scheduler_job_runner"), + patch( + "airflow.serialization.definitions.dag.SerializedDAG.create_dagrun", + side_effect=BundleVersionUnavailable("Bundle version not yet parsed"), + ), + ): + partition_dags = self.job_runner._create_dagruns_for_partitioned_asset_dags(session=session) + + # apdr should remain unlinked so it's retried on the next cycle + session.refresh(apdr) + assert apdr.created_dag_run_id is None + assert partition_dags == {"partitioned-consumer"} + + scheduler_messages = [ + record.message for record in caplog.records if record.levelno >= logging.WARNING + ] + assert any("Bundle version not yet available" in msg for msg in scheduler_messages) + def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self, dag_maker, testing_dag_bundle): """ Test that externally triggered Dag Runs should not affect (by skipping) next diff --git a/airflow-core/tests/unit/models/test_backfill.py b/airflow-core/tests/unit/models/test_backfill.py index ad309e5038359..d0fa86331c2b3 100644 --- a/airflow-core/tests/unit/models/test_backfill.py +++ b/airflow-core/tests/unit/models/test_backfill.py @@ -38,6 +38,7 @@ _create_backfill, _get_latest_dag_run_row_query, ) +from airflow.models.dag_version import DagVersion from airflow.providers.standard.operators.python import PythonOperator from airflow.ti_deps.dep_context import DepContext from airflow.timetables.base import DagRunInfo @@ -171,6 +172,19 @@ def test_create_backfill_clear_existing_bundle_version(dag_maker, session, run_o dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag.dag_id)) first_bundle_version = "bundle_VclmpcTdXv" dag_model.bundle_version = first_bundle_version + + # Get existing DagVersion (created by dag_maker) and create new one with bundle version + existing_version = session.scalar( + select(DagVersion).where(DagVersion.dag_id == dag.dag_id).order_by(DagVersion.version_number.desc()) + ) + session.add( + DagVersion( + dag_id=dag.dag_id, + bundle_name="test_bundle", + bundle_version=first_bundle_version, + version_number=existing_version.version_number + 1 if existing_version else 1, + ) + ) session.commit() for date in existing: dag_maker.create_dagrun( @@ -181,6 +195,19 @@ def test_create_backfill_clear_existing_bundle_version(dag_maker, session, run_o # update bundle version new_bundle_version = "bundle_VclmpcTdXv-2" dag_model.bundle_version = new_bundle_version + + # Create DagVersion for new bundle version + existing_version = session.scalar( + select(DagVersion).where(DagVersion.dag_id == dag.dag_id).order_by(DagVersion.version_number.desc()) + ) + session.add( + DagVersion( + dag_id=dag.dag_id, + bundle_name="test_bundle", + bundle_version=new_bundle_version, + version_number=existing_version.version_number + 1, + ) + ) session.commit() # verify that existing dag runs still have the first bundle version diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py index 046c85ea79901..ef2d561d8262e 100644 --- a/airflow-core/tests/unit/models/test_dag.py +++ b/airflow-core/tests/unit/models/test_dag.py @@ -41,7 +41,7 @@ from airflow._shared.timezones.timezone import datetime as datetime_tz from airflow.configuration import conf from airflow.dag_processing.dagbag import BundleDagBag, DagBag -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, BundleVersionUnavailable from airflow.models.asset import ( AssetAliasModel, AssetDagRunQueue, @@ -57,6 +57,7 @@ get_next_data_interval, get_run_data_interval, ) +from airflow.models.dag_version import DagVersion from airflow.models.dagbag import DBDagBag from airflow.models.dagbundle import DagBundleModel from airflow.models.dagrun import DagRun @@ -1232,6 +1233,102 @@ def test_create_dagrun_partition_key(self, partition_key, dag_maker): ) assert dr.partition_key == partition_key + def _setup_bundle(self, session, dag_id, bundle_name, original_version="v1.0.0", latest_version="v2.0.0"): + """ + Set up bundle configuration for testing bundle version resolution. + + Creates a DagBundleModel with the latest version, updates the DagModel with bundle info, + and creates DagVersion entries for both original and latest versions to simulate + what happens when different bundle versions are parsed. + """ + + # Create bundle with latest version + session.add(DagBundleModel(name=bundle_name, version=latest_version)) + session.flush() + + dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id)) + if not dag_model: + return + + # Configure DagModel with bundle info + dag_model.bundle_name = bundle_name + dag_model.bundle_version = original_version + session.flush() + + # Update existing DagVersion to represent original version + existing_dag_version = session.scalar(select(DagVersion).where(DagVersion.dag_id == dag_id)) + if not existing_dag_version: + return + + existing_dag_version.bundle_name = bundle_name + existing_dag_version.bundle_version = original_version + + # Create DagVersion for latest bundle version + session.add( + DagVersion( + dag_id=dag_id, + bundle_name=bundle_name, + bundle_version=latest_version, + version_number=existing_dag_version.version_number + 1, + ) + ) + session.flush() + + def _create_test_dagrun(self, dag_maker, run_id="test_run", days_offset=0, **kwargs): + """Helper to create a dagrun with common defaults.""" + logical_date = DEFAULT_DATE + timedelta(days=days_offset) + return dag_maker.create_dagrun( + run_id=run_id, + logical_date=logical_date, + run_after=logical_date, + run_type=DagRunType.MANUAL, + state=State.NONE, + triggered_by=DagRunTriggeredByType.TEST, + **kwargs, + ) + + @mock.patch("airflow.configuration.conf.getboolean") + def test_create_dagrun_with_global_run_on_latest_version(self, mock_getboolean, dag_maker, session): + """Test that global config run_on_latest_version uses latest bundle version.""" + mock_getboolean.side_effect = lambda section, key, fallback=None: ( + True if section == "core" and key == "run_on_latest_version" else fallback + ) + + with dag_maker("test_global_default_run_on_latest"): + ... + + self._setup_bundle(session, "test_global_default_run_on_latest", "test_bundle_global") + + dr = self._create_test_dagrun(dag_maker) + assert dr.bundle_version == "v2.0.0" + + def test_create_dagrun_with_dag_level_run_on_latest_version(self, dag_maker, session): + """Test that DAG-level run_on_latest_version uses latest bundle version.""" + with dag_maker("test_dag_default_run_on_latest", run_on_latest_version=True): + ... + + self._setup_bundle(session, "test_dag_default_run_on_latest", "test_bundle_dag_level") + + dr = self._create_test_dagrun(dag_maker) + assert dr.bundle_version == "v2.0.0" + + @mock.patch("airflow.configuration.conf.getboolean") + def test_create_dagrun_precedence_hierarchy(self, mock_getboolean, dag_maker, session): + """Test that precedence hierarchy works: DAG > global > system default.""" + mock_getboolean.side_effect = lambda section, key, fallback=None: ( + True if section == "core" and key == "run_on_latest_version" else fallback + ) + + # DAG level explicitly says False (should override global True) + with dag_maker("test_precedence", run_on_latest_version=False): + ... + + self._setup_bundle(session, "test_precedence", "test_bundle_precedence") + + # DAG level False should override global True - uses original version + dr1 = self._create_test_dagrun(dag_maker, run_id="test_dag_level_overrides_global") + assert dr1.bundle_version == "v1.0.0" + def test_dag_add_task_sets_default_task_group(self): dag = DAG(dag_id="test_dag_add_task_sets_default_task_group", schedule=None, start_date=DEFAULT_DATE) task_without_task_group = EmptyOperator(task_id="task_without_group_id") @@ -1245,6 +1342,108 @@ def test_dag_add_task_sets_default_task_group(self): assert task_group.get_child_by_label("task_with_task_group") == task_with_task_group assert dag.get_task("task_group.task_with_task_group") == task_with_task_group + def test_create_dagrun_system_default_uses_original_version(self, dag_maker, session): + """Test system default: when DAG=None and global=False, uses original bundle version.""" + with conf_vars({("core", "run_on_latest_version"): "False"}): + with dag_maker("test_system_default", run_on_latest_version=None): + ... + + self._setup_bundle(session, "test_system_default", "test_bundle_system_default") + + dr = self._create_test_dagrun(dag_maker, run_id="test_system_default") + assert dr.bundle_version == "v1.0.0" + + def test_create_dagrun_disable_bundle_versioning_bypasses_logic(self, dag_maker, session): + """Test that disable_bundle_versioning=True bypasses all bundle version logic.""" + with conf_vars({("core", "run_on_latest_version"): "True"}): + with dag_maker("test_bypass", disable_bundle_versioning=True): + ... + + self._setup_bundle(session, "test_bypass", "test_bundle_bypass") + + dr = self._create_test_dagrun(dag_maker, run_id="test_bypass") + assert dr.bundle_version is None + + def test_create_dagrun_race_condition_fails_fast(self, dag_maker, session): + """Test that race condition (bundle updated but not serialized) raises clear error.""" + + with dag_maker("test_race_condition", run_on_latest_version=True): + ... + + # Create DagBundleModel with v2.0.0 (simulating bundle refresh) + dag_bundle = DagBundleModel(name="test_bundle_race", version="v2.0.0") + session.add(dag_bundle) + session.flush() + + # Update DagModel to point to this bundle + dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == "test_race_condition")) + dag_model.bundle_name = "test_bundle_race" + dag_model.bundle_version = "v1.0.0" # Original version + session.flush() + + # Create only ONE DagVersion for v1.0.0 (v2.0.0 not serialized yet - simulating race) + existing_dag_version = session.scalar( + select(DagVersion).where(DagVersion.dag_id == "test_race_condition").limit(1) + ) + if existing_dag_version: + existing_dag_version.bundle_name = "test_bundle_race" + existing_dag_version.bundle_version = "v1.0.0" + session.flush() + + # Request "run on latest" - should get v2.0.0 from DagBundleModel + # But DagVersion for v2.0.0 doesn't exist yet (race condition) + # Should raise exception with clear message about temporary condition + with pytest.raises( + BundleVersionUnavailable, + match="Cannot create DagRun.*bundle version v2.0.0.*not been parsed yet.*retry", + ): + self._create_test_dagrun(dag_maker, run_id="test_race_fail") + + @mock.patch("airflow.configuration.conf.getboolean") + def test_create_dagrun_dag_none_uses_global_true(self, mock_getboolean, dag_maker, session): + """Test DAG=None with Global=True uses latest version (not original).""" + mock_getboolean.side_effect = lambda section, key, fallback=None: ( + True if section == "core" and key == "run_on_latest_version" else fallback + ) + + # DAG level is None (not explicitly set) - should use global True + with dag_maker("test_dag_none_global_true", run_on_latest_version=None): + ... + + self._setup_bundle(session, "test_dag_none_global_true", "test_bundle_none_global") + + dr = self._create_test_dagrun(dag_maker) + # Should use v2.0.0 because global=True and DAG=None falls through + assert dr.bundle_version == "v2.0.0" + + def test_create_dagrun_non_versioned_bundle_uses_original_version(self, dag_maker, session): + """Test that non-versioned bundles (e.g. LocalDagBundle) fall back to original version.""" + + with dag_maker("test_non_versioned_bundle", run_on_latest_version=True): + ... + + # Create a non-versioned bundle (version=None, like LocalDagBundle) + session.add(DagBundleModel(name="local_bundle", version=None)) + session.flush() + + dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == "test_non_versioned_bundle")) + dag_model.bundle_name = "local_bundle" + dag_model.bundle_version = "v1.0.0" + session.flush() + + # Update existing DagVersion to match + existing_dag_version = session.scalar( + select(DagVersion).where(DagVersion.dag_id == "test_non_versioned_bundle") + ) + existing_dag_version.bundle_name = "local_bundle" + existing_dag_version.bundle_version = "v1.0.0" + session.flush() + + dr = self._create_test_dagrun(dag_maker) + # Even though run_on_latest_version=True, bundle doesn't support versioning, + # so it should fall back to original version + assert dr.bundle_version == "v1.0.0" + @pytest.mark.parametrize("dag_run_state", [DagRunState.QUEUED, DagRunState.RUNNING]) @pytest.mark.need_serialized_dag def test_clear_set_dagrun_state(self, dag_run_state, dag_maker, session): @@ -3556,6 +3755,24 @@ def hello(): dag_model.bundle_version = bundle_version session.commit() + # Create DagVersion for the bundle_version to avoid BundleVersionUnavailable + if not disable and bundle_version: + existing_version = session.scalar( + select(DagVersion) + .where(DagVersion.dag_id == dag.dag_id) + .order_by(DagVersion.version_number.desc()) + ) + if existing_version: + session.add( + DagVersion( + dag_id=dag.dag_id, + bundle_name=existing_version.bundle_name, + bundle_version=bundle_version, + version_number=existing_version.version_number + 1, + ) + ) + session.commit() + dr = dag.create_dagrun( run_id="abcoercuhcrh", run_after=pendulum.now(), diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index 375b13dea3561..7c1599add73d2 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -4635,3 +4635,20 @@ def get_weight(self, ti): op = BaseOperator(task_id="empty_task", weight_rule=NotRegisteredPriorityWeightStrategy()) with pytest.raises(ValueError, match="Unknown priority strategy"): OperatorSerialization.serialize(op) + + +@pytest.mark.parametrize("run_on_latest_version", [None, True, False]) +def test_dag_run_on_latest_version_serialization(run_on_latest_version): + """Test that run_on_latest_version is serialized and deserialized correctly.""" + dag = DAG( + dag_id=f"test_dag_{run_on_latest_version}", + start_date=datetime(2023, 1, 1), + schedule=None, + run_on_latest_version=run_on_latest_version, + ) + BaseOperator(task_id="task", dag=dag) + + assert dag.run_on_latest_version is run_on_latest_version + serialized = DagSerialization.to_dict(dag) + deserialized = DagSerialization.from_dict(serialized) + assert deserialized.run_on_latest_version is run_on_latest_version diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index bb375f888349c..4f7e1adb4feb0 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -1390,6 +1390,7 @@ class DAGDetailsResponse(BaseModel): owner_links: Annotated[dict[str, str] | None, Field(title="Owner Links")] = None is_favorite: Annotated[bool | None, Field(title="Is Favorite")] = False active_runs_count: Annotated[int | None, Field(title="Active Runs Count")] = 0 + run_on_latest_version: Annotated[bool | None, Field(title="Run On Latest Version")] = None file_token: Annotated[str, Field(description="Return file token.", title="File Token")] concurrency: Annotated[ int, @@ -1717,6 +1718,7 @@ class TaskInstanceHistoryResponse(BaseModel): max_tries: Annotated[int, Field(title="Max Tries")] task_display_name: Annotated[str, Field(title="Task Display Name")] dag_display_name: Annotated[str, Field(title="Dag Display Name")] + dag_run_bundle_version: Annotated[str | None, Field(title="Dag Run Bundle Version")] = None hostname: Annotated[str | None, Field(title="Hostname")] = None unixname: Annotated[str | None, Field(title="Unixname")] = None pool: Annotated[str, Field(title="Pool")] @@ -1753,6 +1755,7 @@ class TaskInstanceResponse(BaseModel): max_tries: Annotated[int, Field(title="Max Tries")] task_display_name: Annotated[str, Field(title="Task Display Name")] dag_display_name: Annotated[str, Field(title="Dag Display Name")] + dag_run_bundle_version: Annotated[str | None, Field(title="Dag Run Bundle Version")] = None hostname: Annotated[str | None, Field(title="Hostname")] = None unixname: Annotated[str | None, Field(title="Unixname")] = None pool: Annotated[str, Field(title="Pool")] diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index c3f786584746b..f3454eacf6397 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -414,6 +414,10 @@ class DAG: :param allowed_run_types: An optional list or single DagRunType specifying which run types are permitted for this dag. When set, the scheduler and API will only allow runs of the specified types. :param dag_display_name: The display name of the Dag which appears on the UI. + :param run_on_latest_version: If True, runs of this DAG will use the latest + available bundle version when triggered, rerun, or cleared. If False, runs will + use the original bundle version. If None (default), inherits from the global config + ``[core] run_on_latest_version``. """ __serialized_fields: ClassVar[frozenset[str]] @@ -545,6 +549,7 @@ def __rich_repr__(self): disable_bundle_versioning: bool = attrs.field( factory=_config_bool_factory("dag_processor", "disable_bundle_versioning") ) + run_on_latest_version: bool | None = attrs.field(default=None, converter=attrs.converters.optional(bool)) # TODO (GH-52141): This is never used in the sdk dag (it only makes sense # after this goes through the dag processor), but various parts of the code @@ -1568,6 +1573,7 @@ def dag( allowed_run_types: DagRunType | Collection[DagRunType] | None = None, dag_display_name: str | None = None, disable_bundle_versioning: bool = False, + run_on_latest_version: bool | None = None, ) -> Callable[[Callable], Callable[..., DAG]]: """ Python dag decorator which wraps a function into an Airflow Dag.