Skip to content

AIP-103: Allow for get/set/delete/clear to run for AssetStateStoreAccessor on Triggerer#68900

Merged
amoghrajesh merged 8 commits into
apache:mainfrom
jroachgolf84:feature/trigger-allow-accessor-get-set
Jun 25, 2026
Merged

AIP-103: Allow for get/set/delete/clear to run for AssetStateStoreAccessor on Triggerer#68900
amoghrajesh merged 8 commits into
apache:mainfrom
jroachgolf84:feature/trigger-allow-accessor-get-set

Conversation

@jroachgolf84

@jroachgolf84 jroachgolf84 commented Jun 23, 2026

Copy link
Copy Markdown
Collaborator

Description

When attempting to use the Asset State Store from within a BaseEventTrigger, the following exception was thrown:

Traceback (most recent call last):
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 890, in handle_requests
    msg = self.decoder.validate_python(self._deserialize_request(request.body))
  File "/usr/python/lib/python3.10/site-packages/pydantic/type_adapter.py", line 441, in validate_python
    return self.validator.validate_python(

pydantic_core._pydantic_core.ValidationError: 1 validation error for tagged-union[
  TriggerStateChanges, GetConnection, DeleteVariable, GetVariable, GetVariableKeys,
  PutVariable, DeleteXCom, GetXCom, SetXCom, GetTICount, GetTaskStates, GetDagRunState,
  GetDRCount, GetPreviousTI, GetHITLDetailResponse, UpdateHITLDetail, MaskSecret
]

  Input tag 'GetAssetStateStoreByName' found using 'type' does not match any of the expected tags:
  'TriggerStateChanges', 'GetConnection', 'DeleteVariable', 'GetVariable', 'GetVariableKeys',
  'PutVariable', 'DeleteXCom', 'GetXCom', 'SetXCom', 'GetTICount', 'GetTaskStates',
  'GetDagRunState', 'GetDRCount', 'GetPreviousTI', 'GetHITLDetailResponse', 'UpdateHITLDetail',
  'MaskSecret'

  [type=union_tag_invalid, input_value={'name': 'generic_asset', ... 'type': 'GetAssetStateStoreByName'}, input_type=dict]

This was called out as something that needed to be addressed by @vikramkoka in #67839. This PR addresses this error by adding a pathway for each of the 8 operations that the Asset State Store can perform.

related: #67839

Testing

Changes were tested using both unit-tests, as well as E2E. Unit-tests can be run using the below commands:

breeze testing core-tests airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_asset_state_store.py

breeze testing core-tests airflow-core/tests/unit/jobs/test_triggerer_job.py

E2E Testing Example

To test this E2E, a custom BaseEventTrigger was written to test get, set, delete, and clear operations. The DAG that uses this Trigger executes successfully E2E, and the Triggered contains all of the expected logging statements.

from airflow.triggers.base import BaseEventTrigger, TriggerEvent
from collections.abc import AsyncIterator
from typing import Any

import asyncio
import logging
import random


class GenericEventTrigger(BaseEventTrigger):
    def __init__(
        self,
        random_number,
        waiter_delay,
        asset_name,
        **kwargs
    ):
        super().__init__(**kwargs)

        self.random_number = random_number
        self.waiter_delay = waiter_delay
        self.asset_name = asset_name

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serialize the Trigger, including the func, params, and waiter_delay."""
        return (
            self.__class__.__module__ + "." + self.__class__.__qualname__,
            {
                "random_number": self.random_number,
                "waiter_delay": self.waiter_delay,
                "asset_name": self.asset_name,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Logic that fires a TriggerEvent."""
        asset_state_store = self.asset_state_store
        logging.info(f"asset_state_store: {asset_state_store}")

        # Clearing state store before starting (in case there is anything there)
        asset_state_store.clear()
        logging.info(f"cleared asset_state_store")

        while True:
            # Get the previous number
            last_result = asset_state_store.get("result", -1)
            logging.info(f"last_result: {last_result}")

            # Create a new number
            result = random.randint(0, 5)
            asset_state_store.set("result", result)
            logging.info(f"result: {result}")

            if result == self.random_number:
                logging.info("yield'ing TriggerEvent")
                yield TriggerEvent({"status": "success", "result": result})

                # Clear the state_store
                asset_state_store.delete(key="result")  # Should this work?
                logging.info(f"deleted 'result' key for asset_state_store")
                break

            logging.info(f"Sleeping for {self.waiter_delay} seconds")
            await asyncio.sleep(self.waiter_delay)

…r to run for AssetStateStoreAccessor on Triggerer
@jroachgolf84 jroachgolf84 changed the title Allow for get/set/delete/clear to run for AssetStateStoreAccessor on Triggerer AIP-103: Allow for get/set/delete/clear to run for AssetStateStoreAccessor on Triggerer Jun 23, 2026
@vatsrahul1001 vatsrahul1001 added this to the Airflow 3.3.0 milestone Jun 23, 2026

@amoghrajesh amoghrajesh left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some more work is needed.

Comment thread airflow-core/src/airflow/jobs/triggerer_job_runner.py Outdated
Comment thread airflow-core/src/airflow/jobs/triggerer_job_runner.py Outdated
Comment thread airflow-core/src/airflow/jobs/triggerer_job_runner.py Outdated
Comment thread airflow-core/tests/unit/jobs/test_triggerer_job.py Outdated
Comment thread airflow-core/src/airflow/jobs/triggerer_job_runner.py
@jroachgolf84 jroachgolf84 requested a review from amoghrajesh June 23, 2026 13:56
@jroachgolf84

Copy link
Copy Markdown
Collaborator Author

@amoghrajesh - changes have been implemented.

Comment thread task-sdk/src/airflow/sdk/execution_time/request_handlers.py

@amoghrajesh amoghrajesh left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR makes self.asset_state_store actually functional inside BaseEventTrigger.run(), so it is worth documenting on the asset-state-store.rst page.

Right now theres nothing explaining how to use self.asset_state_store from a trigger, that the triggerer injects it before run() is called. The example in the PR description is a good starting point.

Comment thread airflow-core/tests/unit/jobs/test_triggerer_job.py
Comment thread task-sdk/tests/task_sdk/execution_time/test_request_handlers.py
@jroachgolf84

Copy link
Copy Markdown
Collaborator Author

@amoghrajesh - all feedback has been implemented!

@amoghrajesh amoghrajesh left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this @jroachgolf84! Looks good.

Comment thread task-sdk/tests/task_sdk/execution_time/test_request_handlers.py Outdated
Comment thread airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state_store.py Outdated
Comment thread airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state_store.py Outdated
Comment thread airflow-core/docs/core-concepts/asset-state-store.rst Outdated
Comment thread airflow-core/docs/core-concepts/asset-state-store.rst Outdated
@amoghrajesh

Copy link
Copy Markdown
Contributor

Actually let me just apply the suggestions.

Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com>
@amoghrajesh amoghrajesh added the backport-to-v3-3-test Backport to v3-3-test label Jun 25, 2026
@amoghrajesh amoghrajesh merged commit feea09d into apache:main Jun 25, 2026
102 checks passed
@github-actions

Copy link
Copy Markdown
Contributor

Backport successfully created: v3-3-test

Note: As of Merging PRs targeted for Airflow 3.X
the committer who merges the PR is responsible for backporting the PRs that are bug fixes (generally speaking) to the maintenance branches.

In matter of doubt please ask in #release-management Slack channel.

Status Branch Result
v3-3-test PR Link

github-actions Bot pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Jun 25, 2026
…r `AssetStateStoreAccessor` on Triggerer (apache#68900)

(cherry picked from commit feea09d)

Co-authored-by: Jake McGrath <116606359+jroachgolf84@users.noreply.github.com>
Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com>
aws-airflow-bot pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Jun 25, 2026
…r `AssetStateStoreAccessor` on Triggerer (apache#68900)

(cherry picked from commit feea09d)

Co-authored-by: Jake McGrath <116606359+jroachgolf84@users.noreply.github.com>
Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com>
potiuk pushed a commit that referenced this pull request Jun 26, 2026
…r `AssetStateStoreAccessor` on Triggerer (#68900) (#68966)

(cherry picked from commit feea09d)

Co-authored-by: Jake McGrath <116606359+jroachgolf84@users.noreply.github.com>
Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com>
karenbraganz pushed a commit to karenbraganz/airflow that referenced this pull request Jun 30, 2026
…eStoreAccessor` on Triggerer (apache#68900)

Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:API Airflow's REST/HTTP API area:task-sdk area:Triggerer backport-to-v3-3-test Backport to v3-3-test

Projects

Development

Successfully merging this pull request may close these issues.

4 participants