Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions airflow-core/docs/administration-and-deployment/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ This section contains information about deploying Dags into production and the a
priority-weight
web-stack
plugins
task-and-asset-store
task-and-asset-store-cleanup
task-and-asset-state-store
task-and-asset-state-store-cleanup
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@
specific language governing permissions and limitations
under the License.

.. _task-and-asset-store-cleanup:
.. _task-and-asset-state-store-cleanup:

Task and Asset Store Cleanup
=============================
Task and Asset State Store Cleanup
==================================

.. versionadded:: 3.3

Airflow does not automatically purge task store rows on a schedule. Cleanup (also known as "garbage collection") is the responsibility of the user (you) and must be triggered explicitly via the CLI. This page explains what gets cleaned up, how to run it, and how to integrate it into a recurring maintenance workflow.
Airflow does not automatically purge task state store rows on a schedule. Cleanup (also known as "garbage collection") is the responsibility of the user (you) and must be triggered explicitly via the CLI. This page explains what gets cleaned up, how to run it, and how to integrate it into a recurring maintenance workflow.


What gets cleaned up
--------------------

The cleanup command operates only on **task store** rows in the ``MetastoreStoreBackend``. Asset store rows are never touched by this command. Asset store rows are removed only by the orphan sweep when an asset is deactivated (see :ref:`task-and-asset-store`).
The cleanup command operates only on **task state store** rows in the ``MetastoreBackend``. Asset store rows are never touched by this command. Asset store rows are removed only by the orphan sweep when an asset is deactivated (see :ref:`task-and-asset-state-store`).

A task store row is eligible for deletion when its ``expires_at`` timestamp is in the past. ``expires_at`` is computed on the worker at write time:
A task state store row is eligible for deletion when its ``expires_at`` timestamp is in the past. ``expires_at`` is computed on the worker at write time:

* Keys written with an explicit ``retention=timedelta(...)`` expire after that duration from the time of the write.
* Keys written with ``retention=None`` (the default) pick up an expiry based on ``[state_store] default_retention_days``. If that value is ``> 0``, the key expires that many days after the write.
Expand All @@ -48,26 +48,26 @@ Running cleanup

The command is::

airflow state-store cleanup-task-store
airflow state-store cleanup-task-state-store

It reads ``[state_store] default_retention_days`` and ``[state_store] state_cleanup_batch_size`` from the ``airflow.cfg`` file, then deletes all eligible rows.

**Dry run**

Use ``--dry-run`` to preview what would be deleted without removing anything::

airflow state-store cleanup-task-store --dry-run
airflow state-store cleanup-task-state-store --dry-run

The output lists every row that would be deleted, grouped by dag, run, task, map index, and key.

**Batching**

By default (``state_cleanup_batch_size = 0``) all eligible rows are deleted in a single statement. On deployments with large ``task_store`` tables, set a batch size to reduce lock duration per transaction::
By default (``state_cleanup_batch_size = 0``) all eligible rows are deleted in a single statement. On deployments with large ``task_state_store`` tables, set a batch size to reduce lock duration per transaction::

# airflow.cfg
[state_store]
state_cleanup_batch_size = 10000

The command then deletes rows in batches of 10,000, committing after each batch, until no eligible rows remain.

How often to run cleanup depends on your write volume and the value of ``default_retention_days``. A weekly cleanup may be sufficient for most environments. For high-throughput pipelines that write task store entries on every task execution, consider running cleanup more frequently to keep the ``task_store`` table small.
How often to run cleanup depends on your write volume and the value of ``default_retention_days``. A weekly cleanup may be sufficient for most environments. For high-throughput pipelines that write task state store entries on every task execution, consider running cleanup more frequently to keep the ``task_state_store`` table small.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
specific language governing permissions and limitations
under the License.

.. _task-and-asset-store:
.. _task-and-asset-state-store:

Task and Asset Store Configuration
====================================
Task and Asset State Store Configuration
========================================

.. versionadded:: 3.3

The task and asset store is the persistence layer for :doc:`task store </core-concepts/task-store>` and :doc:`asset store </core-concepts/asset-store>`. By default, both are stored in the Airflow metadata database. This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. Configuration (``[state_store]``), the CLI (``airflow state-store``), and the backend base class (:class:`~airflow.sdk.state.BaseStoreBackend`) all use the ``state_store`` name for this feature.
The task and asset state store is the persistence layer for :doc:`task state store </core-concepts/task-state-store>` and :doc:`asset state store </core-concepts/asset-state-store>`. By default, both are stored in the Airflow metadata database. This page describes the available configuration options, garbage-collection semantics, and how to provide a custom backend. Configuration (``[state_store]``), the CLI (``airflow state-store``), and the backend base class (:class:`~airflow.sdk.state.BaseStoreBackend`) all use the ``state_store`` name for this feature.

Configuration reference
-----------------------
Expand All @@ -31,7 +31,7 @@ All options live under the ``[state_store]`` section of ``airflow.cfg``.

.. note::

The config section is ``[state_store]``, **not** ``[task_store]``.
The config section is ``[state_store]``, **not** ``[task_state_store]``.

``backend``
~~~~~~~~~~~
Expand All @@ -46,11 +46,11 @@ Full dotted path to a class that implements :class:`~airflow.sdk.state.BaseStore
``default_retention_days``
~~~~~~~~~~~~~~~~~~~~~~~~~~

Number of days after which task store rows expire. When a key is written with no explicit retention, expires_at is computed on the worker as now + default_retention_days. Changing this setting does not affect already-written rows.
Number of days after which task state store rows expire. When a key is written with no explicit retention, expires_at is computed on the worker as now + default_retention_days. Changing this setting does not affect already-written rows.

* Set to ``0`` to disable time-based cleanup entirely.
* Default: ``30``.
* This setting does **not** apply to asset store rows.
* This setting does **not** apply to asset state store rows.

.. code-block:: ini

Expand All @@ -60,11 +60,11 @@ Number of days after which task store rows expire. When a key is written with no
``clear_on_success``
~~~~~~~~~~~~~~~~~~~~

When ``True``, all task store keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. Defaults to ``False``, which preserves task store entries after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes).
When ``True``, all task state store keys for a task instance are automatically deleted when that task instance moves to the ``success`` state. Defaults to ``False``, which preserves task state store entries after success for observability (e.g. the submitted job ID or the last row count is still readable from the UI or REST API after the run completes).

.. important::

``clear_on_success`` clears **task store only**. It has no effect on asset store. Asset store is scoped to the asset rather than the task instance and must be cleared explicitly.
``clear_on_success`` clears **task state store only**. It has no effect on asset state store. Asset store is scoped to the asset rather than the task instance and must be cleared explicitly.

.. code-block:: ini

Expand All @@ -74,19 +74,19 @@ When ``True``, all task store keys for a task instance are automatically deleted
``state_cleanup_batch_size``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Number of rows deleted per batch during garbage collection cleanup. Set to ``0`` (default) to delete all matching rows in a single statement. Tune this on deployments with large ``task_store`` tables to reduce lock contention.
Number of rows deleted per batch during garbage collection cleanup. Set to ``0`` (default) to delete all matching rows in a single statement. Tune this on deployments with large ``task_state_store`` tables to reduce lock contention.

.. code-block:: ini

[state_store]
state_cleanup_batch_size = 10000

.. _task-and-asset-store:worker-backends:
.. _task-and-asset-state-store:worker-backends:

Worker-side backend (``[workers] state_backend``)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

A separate, optional config key under ``[workers]`` lets you route task store and asset store values through a worker-side backend before they reach the API server.
A separate, optional config key under ``[workers]`` lets you route task state store and asset state store values through a worker-side backend before they reach the API server.

.. code-block:: ini

Expand All @@ -99,19 +99,19 @@ When this is set, ``TaskStoreAccessor.set()`` calls ``serialize_task_store_to_re
Garbage collection semantics
-----------------------------

The cleanup task, also known as "garbage collection" is triggered using the Airflow CLI. The command to trigger the cleanup task is ``airflow state-store cleanup-task-store``. This process removes store rows according to the following rules:
The cleanup task, also known as "garbage collection" is triggered using the Airflow CLI. The command to trigger the cleanup task is ``airflow state-store cleanup-task-state-store``. This process removes store rows according to the following rules:

**Time-based expiry (task store only)**
**Time-based expiry (task state store only)**
Rows whose ``expires_at < now()`` are deleted. ``expires_at`` is computed on the *worker* at write time, not by the server.

**``default_retention_days`` fallback (task store only)**
**``default_retention_days`` fallback (task state store only)**
Keys written with no explicit retention get an ``expires_at`` of now + default_retention_days computed at write time. Garbage collection deletes rows where ``expires_at < now()``."

**``NEVER_EXPIRE`` keys**
Keys set with ``retention=NEVER_EXPIRE`` are stored with ``expires_at = NULL`` and a flag that tells the garbage collection to skip them unconditionally. They are never deleted by time-based cleanup, regardless of ``default_retention_days``.

**``on_delete=CASCADE`` (asset store)**
When an asset is deleted, all corresponding asset store rows for that asset are deleted.
**``on_delete=CASCADE`` (asset state store)**
When an asset is deleted, all corresponding asset state store rows for that asset are deleted.

.. important::

Expand Down Expand Up @@ -157,7 +157,7 @@ Override four serialization hooks from :class:`~airflow.sdk.state.BaseStoreBacke

* ``serialize_task_store_to_ref``: called by ``TaskStoreAccessor.set()`` before the value is sent to the Execution API; return a compact reference string (e.g. an S3 key) to be stored in the database instead of the raw value.
* ``deserialize_task_store_from_ref``: called by ``TaskStoreAccessor.get()`` after retrieving the reference from the backend; return the actual value.
* ``serialize_asset_store_to_ref``: same as the task variant but for asset store; receives the asset scope as ``scope`` (an :class:`~airflow.sdk.state.AssetScope` with ``name`` and/or ``uri``).
* ``serialize_asset_store_to_ref``: same as the task variant but for asset state store; receives the asset scope as ``scope`` (an :class:`~airflow.sdk.state.AssetScope` with ``name`` and/or ``uri``).
* ``deserialize_asset_store_from_ref``: called by ``AssetStoreAccessor.get()`` to resolve the stored reference back to the actual value.

.. important::
Expand Down
Loading
Loading