From 978e2cd38e70102814d9a5b7f931327d165f6c16 Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Fri, 28 Nov 2025 14:24:24 +0100 Subject: [PATCH 1/5] Add documentation (cherry picked from commit 96644d0850ea164967ac49b425c0c22b1fb5cc54) --- airflow-core/docs/howto/index.rst | 1 + airflow-core/docs/howto/performance.rst | 48 +++++++++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 airflow-core/docs/howto/performance.rst diff --git a/airflow-core/docs/howto/index.rst b/airflow-core/docs/howto/index.rst index d2bd18573d9d2..17d56694a8784 100644 --- a/airflow-core/docs/howto/index.rst +++ b/airflow-core/docs/howto/index.rst @@ -56,4 +56,5 @@ configuring an Airflow environment. dynamic-dag-generation docker-compose/index run-with-self-signed-certificate + performance memory-profiling diff --git a/airflow-core/docs/howto/performance.rst b/airflow-core/docs/howto/performance.rst new file mode 100644 index 0000000000000..c092c5c8a44fd --- /dev/null +++ b/airflow-core/docs/howto/performance.rst @@ -0,0 +1,48 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Performance tuning (API and UI) +=============================== + +This guide collects pragmatic tips that improve Airflow performance for API and UI workloads. + +Configurable metadata indexes +----------------------------- + +Airflow can create additional database indexes on startup to accelerate common queries used by the API and UI. +This is helpful in larger deployments where filtering and lookups on high‑cardinality columns become hot paths. + +Use :ref:`config:database__metadata_indexes` to declare a list of index specifications. +Each entry must follow the ``table(column1, column2, ...)`` syntax (no schema qualification). + +- Indexes are created at API server startup. Existing indexes are detected and skipped. +- On PostgreSQL, indexes are created ``CONCURRENTLY`` to avoid blocking writes during creation. +- On other databases (e.g. MySQL, SQLite), a best‑effort non‑blocking creation is attempted; if not supported, + a standard index creation is used. +- Only Airflow metadata tables should be targeted. Do not include schema qualifiers. + +When to use: + +- Slow API list/detail endpoints caused by frequent scans or lookups on columns like ``dag_id``, ``task_id``, + ``run_id``, timestamps (e.g. ``dttm``), or status fields. +- UI pages that load large lists or perform heavy filtering on metadata tables. + +Additional Notes: + +- Review query plans (e.g. via ``EXPLAIN``) to choose effective column sets and ordering for your workload. +- Composite indexes should list columns in selectivity order appropriate to your most common predicates. +- Indexes incur write overhead; add only those that materially improve your read paths. From 0a91b54ea9d6c0af43fa573c686ed0a3875733d6 Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Wed, 3 Dec 2025 12:09:36 +0100 Subject: [PATCH 2/5] Address code review --- airflow-core/docs/howto/performance.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/docs/howto/performance.rst b/airflow-core/docs/howto/performance.rst index c092c5c8a44fd..012b0154f8e8f 100644 --- a/airflow-core/docs/howto/performance.rst +++ b/airflow-core/docs/howto/performance.rst @@ -23,7 +23,7 @@ This guide collects pragmatic tips that improve Airflow performance for API and Configurable metadata indexes ----------------------------- -Airflow can create additional database indexes on startup to accelerate common queries used by the API and UI. +Airflow can create additional database indexes on API startup to accelerate common queries used by the API and UI. This is helpful in larger deployments where filtering and lookups on high‑cardinality columns become hot paths. Use :ref:`config:database__metadata_indexes` to declare a list of index specifications. From 9dba43db839c87657fe908406b95256af3155908 Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Wed, 3 Dec 2025 15:42:57 +0100 Subject: [PATCH 3/5] Update documentation --- airflow-core/docs/howto/performance.rst | 39 +++++++++++-------- .../airflow/api_fastapi/common/parameters.py | 7 +--- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/airflow-core/docs/howto/performance.rst b/airflow-core/docs/howto/performance.rst index 012b0154f8e8f..1f7d77d85dc58 100644 --- a/airflow-core/docs/howto/performance.rst +++ b/airflow-core/docs/howto/performance.rst @@ -20,28 +20,35 @@ Performance tuning (API and UI) This guide collects pragmatic tips that improve Airflow performance for API and UI workloads. -Configurable metadata indexes ------------------------------ +Custom metadata indexes +----------------------- -Airflow can create additional database indexes on API startup to accelerate common queries used by the API and UI. -This is helpful in larger deployments where filtering and lookups on high‑cardinality columns become hot paths. +If you observe slowness in some API calls or specific UI views, you should inspect query plans and add indexes yourself +that match your workload. Listing endpoints and UI table views with specific ordering criteria are likely +to benefit from additional indexes if you have a large volume of metadata. -Use :ref:`config:database__metadata_indexes` to declare a list of index specifications. -Each entry must follow the ``table(column1, column2, ...)`` syntax (no schema qualification). +When to use +^^^^^^^^^^^ -- Indexes are created at API server startup. Existing indexes are detected and skipped. -- On PostgreSQL, indexes are created ``CONCURRENTLY`` to avoid blocking writes during creation. -- On other databases (e.g. MySQL, SQLite), a best‑effort non‑blocking creation is attempted; if not supported, - a standard index creation is used. -- Only Airflow metadata tables should be targeted. Do not include schema qualifiers. +- Slow API list/detail endpoints caused by frequent scans or lookups on columns like ``start_date``, timestamps (e.g. ``dttm``), or status fields. +- UI pages that load large lists or perform heavy filtering on metadata tables. -When to use: +Guidance +^^^^^^^^ -- Slow API list/detail endpoints caused by frequent scans or lookups on columns like ``dag_id``, ``task_id``, - ``run_id``, timestamps (e.g. ``dttm``), or status fields. -- UI pages that load large lists or perform heavy filtering on metadata tables. +- Inspect the query planner (e.g., ``EXPLAIN``/``EXPLAIN ANALYZE``) for slow endpoints and identify missing indexes. +- Prefer single or composite indexes that match your most common ordering logic, typically the ``order_by`` + query parameter used in API calls. Composite indexes can cover multi criteria ordering. +- Your optimal indexes depend on how you use the API and UI; there is no one-size-fits-all set we can ship by default. + +Upgrade considerations +^^^^^^^^^^^^^^^^^^^^^^ + +To avoid conflicts with Airflow database upgrades, delete your custom indexes before running an Airflow DB upgrade +and re-apply them after the upgrade succeeds. -Additional Notes: +Notes +^^^^^ - Review query plans (e.g. via ``EXPLAIN``) to choose effective column sets and ordering for your workload. - Composite indexes should list columns in selectivity order appropriate to your most common predicates. diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index 914d6284fc5b8..dd80027cf0c59 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -35,7 +35,7 @@ from fastapi import Depends, HTTPException, Query from pendulum.parsing.exceptions import ParserError from pydantic import AfterValidator, BaseModel, NonNegativeInt -from sqlalchemy import Column, and_, case, func, not_, or_, select as sql_select +from sqlalchemy import Column, and_, func, not_, or_, select as sql_select from sqlalchemy.inspection import inspect from airflow._shared.timezones import timezone @@ -252,11 +252,6 @@ def to_orm(self, select: Select) -> Select: if column is None: column = getattr(self.model, lstriped_orderby) - # MySQL does not support `nullslast`, and True/False ordering depends on the - # database implementation. - nullscheck = case((column.isnot(None), 0), else_=1) - - columns.append(nullscheck) if order_by_value.startswith("-"): columns.append(column.desc()) else: From df255d1ebfc52486e39e0cbbbaba3c79939e0d08 Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Wed, 3 Dec 2025 19:22:09 +0100 Subject: [PATCH 4/5] Fix CI --- .../core_api/routes/public/test_dags.py | 80 ++++++++++++------- .../core_api/routes/public/test_hitl.py | 4 + .../routes/public/test_task_instances.py | 3 +- 3 files changed, 56 insertions(+), 31 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py index b0dc432ceedc1..763f25b0e2ee6 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py @@ -383,36 +383,6 @@ class TestGetDags(TestDagEndpoint): 2, [DAG1_ID, DAG2_ID], ), - ( - {"order_by": "next_dagrun", "exclude_stale": False}, - 3, - [DAG3_ID, DAG1_ID, DAG2_ID], - ), - ( - {"order_by": "last_run_state", "exclude_stale": False}, - 3, - [DAG1_ID, DAG3_ID, DAG2_ID], - ), - ( - {"order_by": "-last_run_state", "exclude_stale": False}, - 3, - [DAG3_ID, DAG1_ID, DAG2_ID], - ), - ( - {"order_by": "last_run_start_date", "exclude_stale": False}, - 3, - [DAG1_ID, DAG3_ID, DAG2_ID], - ), - ( - {"order_by": "-last_run_start_date", "exclude_stale": False}, - 3, - [DAG3_ID, DAG1_ID, DAG2_ID], - ), - ( - {"order_by": ["next_dagrun", "-dag_display_name"], "exclude_stale": False}, - 3, - [DAG3_ID, DAG2_ID, DAG1_ID], - ), # Search ({"dag_id_pattern": "1"}, 1, [DAG1_ID]), ({"dag_display_name_pattern": "test_dag2"}, 1, [DAG2_ID]), @@ -451,6 +421,56 @@ def test_get_dags(self, test_client, query_params, expected_total_entries, expec assert actual_ids == expected_ids + # Ordering of nulls values is DB specific. + @pytest.mark.backend("sqlite") + @pytest.mark.parametrize( + ("query_params", "expected_total_entries", "expected_ids"), + [ + ( + {"order_by": "next_dagrun", "exclude_stale": False}, + 3, + [DAG1_ID, DAG2_ID, DAG3_ID], + ), + ( + {"order_by": "last_run_state", "exclude_stale": False}, + 3, + [DAG2_ID, DAG1_ID, DAG3_ID], + ), + ( + {"order_by": "-last_run_state", "exclude_stale": False}, + 3, + [DAG3_ID, DAG1_ID, DAG2_ID], + ), + ( + {"order_by": "last_run_start_date", "exclude_stale": False}, + 3, + [DAG2_ID, DAG1_ID, DAG3_ID], + ), + ( + {"order_by": "-last_run_start_date", "exclude_stale": False}, + 3, + [DAG3_ID, DAG1_ID, DAG2_ID], + ), + ( + {"order_by": ["next_dagrun", "-dag_display_name"], "exclude_stale": False}, + 3, + [DAG2_ID, DAG1_ID, DAG3_ID], + ), + ], + ) + def test_get_dags_with_nullable_fields( + self, test_client, query_params, expected_total_entries, expected_ids, session + ): + with assert_queries_count(4): + response = test_client.get("/dags", params=query_params) + assert response.status_code == 200 + body = response.json() + + assert body["total_entries"] == expected_total_entries + actual_ids = [dag["dag_id"] for dag in body["dags"]] + + assert actual_ids == expected_ids + @mock.patch("airflow.api_fastapi.auth.managers.base_auth_manager.BaseAuthManager.get_authorized_dag_ids") def test_get_dags_should_call_get_authorized_dag_ids(self, mock_get_authorized_dag_ids, test_client): mock_get_authorized_dag_ids.return_value = {DAG1_ID, DAG2_ID} diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py index b2c05e982b8b9..d9b5a0287bb4b 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py @@ -697,6 +697,10 @@ def test_should_respond_200_with_existing_response_and_order_by( reverse=reverse, ) + # Remove entries with None, because None orders depends on the DB implementation + hitl_details = [d for d in hitl_details if get_key_lambda(d) is not None] + sorted_hitl_details = [d for d in sorted_hitl_details if get_key_lambda(d) is not None] + assert hitl_details == sorted_hitl_details def test_should_respond_200_without_response(self, test_client: TestClient) -> None: diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 1c53c2ec0610d..66b506e4d929c 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -817,10 +817,11 @@ def test_mapped_instances_order( assert len(body["task_instances"]) == params["limit"] assert expected_map_indexes == [ti["map_index"] for ti in body["task_instances"]] + @pytest.mark.backend("sqlite") @pytest.mark.parametrize( ("params", "expected_map_indexes"), [ - ({"order_by": "rendered_map_index", "limit": 108}, [0] + list(range(1, 108))), # Asc + ({"order_by": "rendered_map_index", "limit": 108}, list(range(1, 109))), # Asc ({"order_by": "-rendered_map_index", "limit": 100}, [0] + list(range(11, 110)[::-1])), # Desc ], ) From d25ffd44815820c13c1aea463f1fff8619e37d2e Mon Sep 17 00:00:00 2001 From: pierrejeambrun Date: Thu, 4 Dec 2025 14:29:19 +0100 Subject: [PATCH 5/5] Fix CI --- .../core_api/routes/public/test_event_logs.py | 53 ++++++++++++++----- .../routes/public/test_task_instances.py | 1 + 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_event_logs.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_event_logs.py index 3a7fac8618916..c71dc587a151f 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_event_logs.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_event_logs.py @@ -279,20 +279,6 @@ class TestGetEventLogs(TestEventLogsEndpoint): 4, [EVENT_NORMAL, EVENT_WITH_OWNER, TASK_INSTANCE_EVENT, EVENT_WITH_OWNER_AND_TASK_INSTANCE], ), - # order_by - ( - {"order_by": "-id"}, - 200, - 4, - [EVENT_WITH_OWNER_AND_TASK_INSTANCE, TASK_INSTANCE_EVENT, EVENT_WITH_OWNER, EVENT_NORMAL], - ), - ( - {"order_by": "logical_date"}, - 200, - 4, - [TASK_INSTANCE_EVENT, EVENT_WITH_OWNER_AND_TASK_INSTANCE, EVENT_NORMAL, EVENT_WITH_OWNER], - ), - # combination of query parameters ( {"offset": 1, "excluded_events": ["non_existed_event"], "order_by": "event"}, 200, @@ -327,6 +313,45 @@ def test_get_event_logs( for event_log, expected_event in zip(resp_json["event_logs"], expected_events): assert event_log["event"] == expected_event + # Ordering of nulls values is DB specific. + @pytest.mark.backend("sqlite") + @pytest.mark.parametrize( + ("query_params", "expected_status_code", "expected_total_entries", "expected_events"), + [ + ( + {"order_by": "-id"}, + 200, + 4, + [EVENT_WITH_OWNER_AND_TASK_INSTANCE, TASK_INSTANCE_EVENT, EVENT_WITH_OWNER, EVENT_NORMAL], + ), + ( + {"order_by": "logical_date"}, + 200, + 4, + [EVENT_NORMAL, EVENT_WITH_OWNER, TASK_INSTANCE_EVENT, EVENT_WITH_OWNER_AND_TASK_INSTANCE], + ), + ( + {"order_by": "-logical_date"}, + 200, + 4, + [EVENT_WITH_OWNER_AND_TASK_INSTANCE, TASK_INSTANCE_EVENT, EVENT_WITH_OWNER, EVENT_NORMAL], + ), + ], + ) + def test_get_event_logs_order_by( + self, test_client, query_params, expected_status_code, expected_total_entries, expected_events + ): + with assert_queries_count(2): + response = test_client.get("/eventLogs", params=query_params) + assert response.status_code == expected_status_code + if expected_status_code != 200: + return + + resp_json = response.json() + assert resp_json["total_entries"] == expected_total_entries + for event_log, expected_event in zip(resp_json["event_logs"], expected_events): + assert event_log["event"] == expected_event + def test_should_raises_401_unauthenticated(self, unauthenticated_test_client): response = unauthenticated_test_client.get("/eventLogs") assert response.status_code == 401 diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 66b506e4d929c..66a100c6c6bdb 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -817,6 +817,7 @@ def test_mapped_instances_order( assert len(body["task_instances"]) == params["limit"] assert expected_map_indexes == [ti["map_index"] for ti in body["task_instances"]] + # Ordering of nulls values is DB specific. @pytest.mark.backend("sqlite") @pytest.mark.parametrize( ("params", "expected_map_indexes"),