From 6f47f36e160a24f3fd3567a3d914e609b4baf038 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 4 Feb 2026 14:46:50 +0800 Subject: [PATCH 1/8] test: simplify test_partitioned_dag_run_with_customized_mapper --- .../tests/unit/jobs/test_scheduler_job.py | 43 +++---------------- 1 file changed, 6 insertions(+), 37 deletions(-) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 8f8a686ddf5ed..5e31becd95f81 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -8786,46 +8786,15 @@ def test_partitioned_dag_run_with_customized_mapper( runner = SchedulerJobRunner( job=Job(job_type=SchedulerJobRunner.job_type, executor=MockExecutor(do_update=False)) ) - - with dag_maker(dag_id="asset-event-producer", schedule=None, session=session) as dag: - EmptyOperator(task_id="hi", outlets=[asset_1]) - - dr = dag_maker.create_dagrun(partition_key="this-is-not-key-1-before-mapped", session=session) - [ti] = dr.get_task_instances(session=session) - session.commit() - - serialized_outlets = dag.get_task("hi").outlets with custom_partition_mapper_patch(): - TaskInstance.register_asset_changes_in_db( - ti=ti, - task_outlets=[o.asprofile() for o in serialized_outlets], - outlet_events=[], + apdr = _produce_and_register_asset_event( + dag_id="asset-event-producer", + asset=asset_1, + partition_key="this-is-not-key-1-before-mapped", session=session, + dag_maker=dag_maker, + expected_partition_key="key-1" ) - session.commit() - - event = session.scalar( - select(AssetEvent).where( - AssetEvent.source_dag_id == dag.dag_id, - AssetEvent.source_run_id == dr.run_id, - ) - ) - assert event is not None - assert event.partition_key == "this-is-not-key-1-before-mapped" - - apdr = session.scalar( - select(AssetPartitionDagRun) - .join( - PartitionedAssetKeyLog, - PartitionedAssetKeyLog.asset_partition_dag_run_id == AssetPartitionDagRun.id, - ) - .where(PartitionedAssetKeyLog.asset_event_id == event.id) - ) - assert apdr is not None - assert apdr.created_dag_run_id is None - assert apdr.partition_key == "key-1" - - with custom_partition_mapper_patch(): partition_dags = runner._create_dagruns_for_partitioned_asset_dags(session=session) session.refresh(apdr) # Since asset event for Asset(name="asset-2") with key "key-1" has not yet been created, From 7621bd373a100232601cecc14dd5909c77c81e64 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 4 Feb 2026 14:57:27 +0800 Subject: [PATCH 2/8] test(scheduler_job): improve APDR test cases to include consumed_asset_event validation --- .../tests/unit/jobs/test_scheduler_job.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 5e31becd95f81..1bf4c518929ff 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -8803,6 +8803,14 @@ def test_partitioned_dag_run_with_customized_mapper( assert len(partition_dags) == 1 assert partition_dags == {"asset-event-consumer"} + asset_event = session.scalar( + select(DagRun).where(DagRun.id == apdr.created_dag_run_id) + ).consumed_asset_events[0] + + assert asset_event.source_task_id == "hi" + assert asset_event.source_dag_id == "asset-event-producer" + assert asset_event.source_run_id == "test" + @pytest.mark.need_serialized_dag @pytest.mark.usefixtures("clear_asset_partition_rows") @@ -8877,6 +8885,15 @@ def test_consumer_dag_listen_to_two_partitioned_asset( assert partition_dags == {"asset-event-consumer"} + + for asset_event in session.scalar( + select(DagRun).where(DagRun.id == apdr.created_dag_run_id) + ).consumed_asset_events: + assert asset_event.source_task_id == "hi" + assert "asset-event-producer-" in asset_event.source_dag_id + assert asset_event.source_run_id == "test" + + @pytest.mark.need_serialized_dag @pytest.mark.usefixtures("clear_asset_partition_rows") def test_consumer_dag_listen_to_two_partitioned_asset_with_key_1_mapper( @@ -8940,3 +8957,10 @@ def test_consumer_dag_listen_to_two_partitioned_asset_with_key_1_mapper( assert apdr.created_dag_run_id is not None assert len(partition_dags) == 1 assert partition_dags == {"asset-event-consumer"} + + asset_event = session.scalar( + select(DagRun).where(DagRun.id == apdr.created_dag_run_id) + ).consumed_asset_events[0] + assert asset_event.source_task_id == "hi" + assert asset_event.source_dag_id == "asset-event-producer-2" + assert asset_event.source_run_id == "test" From e490e4d5e8af7d662daf14fe7aa890ba6e8f7a8e Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 4 Feb 2026 09:35:39 +0800 Subject: [PATCH 3/8] fix(APDR): add asset events to partitioned DagRun --- airflow-core/src/airflow/jobs/scheduler_job_runner.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index ee59d3f5156a3..74f6fdcb039de 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1843,6 +1843,13 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st creating_job_id=self.job.id, session=session, ) + asset_events = session.scalars( + select(AssetEvent).where( + PartitionedAssetKeyLog.asset_partition_dag_run_id == apdr.id, + PartitionedAssetKeyLog.asset_event_id == AssetEvent.id + ) + ) + dag_run.consumed_asset_events.extend(asset_events) session.flush() apdr.created_dag_run_id = dag_run.id session.flush() From 289e382519ca0d1d0d9608acea788d7ba0a0e83c Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 4 Feb 2026 15:57:44 +0800 Subject: [PATCH 4/8] fixup! fix(APDR): add asset events to partitioned DagRun --- airflow-core/src/airflow/jobs/scheduler_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 74f6fdcb039de..8c31d68efe40f 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1846,7 +1846,7 @@ def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> set[st asset_events = session.scalars( select(AssetEvent).where( PartitionedAssetKeyLog.asset_partition_dag_run_id == apdr.id, - PartitionedAssetKeyLog.asset_event_id == AssetEvent.id + PartitionedAssetKeyLog.asset_event_id == AssetEvent.id, ) ) dag_run.consumed_asset_events.extend(asset_events) From f93e6fe79d9bd9d59f97e0275c238739d029943d Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 4 Feb 2026 15:57:44 +0800 Subject: [PATCH 5/8] fixup! test: simplify test_partitioned_dag_run_with_customized_mapper --- airflow-core/tests/unit/jobs/test_scheduler_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 1bf4c518929ff..ac15b3eb6301e 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -8793,7 +8793,7 @@ def test_partitioned_dag_run_with_customized_mapper( partition_key="this-is-not-key-1-before-mapped", session=session, dag_maker=dag_maker, - expected_partition_key="key-1" + expected_partition_key="key-1", ) partition_dags = runner._create_dagruns_for_partitioned_asset_dags(session=session) session.refresh(apdr) From dfbdb42f06d96a42596198a255d0437fc16a794f Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 4 Feb 2026 15:57:44 +0800 Subject: [PATCH 6/8] fixup! test(scheduler_job): improve APDR test cases to include consumed_asset_event validation --- airflow-core/tests/unit/jobs/test_scheduler_job.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index ac15b3eb6301e..0f45016d870ec 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -8803,10 +8803,9 @@ def test_partitioned_dag_run_with_customized_mapper( assert len(partition_dags) == 1 assert partition_dags == {"asset-event-consumer"} - asset_event = session.scalar( - select(DagRun).where(DagRun.id == apdr.created_dag_run_id) - ).consumed_asset_events[0] - + dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id)) + assert dag_run is not None + asset_event = dag_run.consumed_asset_events[0] assert asset_event.source_task_id == "hi" assert asset_event.source_dag_id == "asset-event-producer" assert asset_event.source_run_id == "test" From 1548a813df821eaa34bcea41ea3bbed23b1b7eb6 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 4 Feb 2026 15:57:44 +0800 Subject: [PATCH 7/8] fixup! test(scheduler_job): improve APDR test cases to include consumed_asset_event validation --- airflow-core/tests/unit/jobs/test_scheduler_job.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 0f45016d870ec..926b141e63c71 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -8883,11 +8883,9 @@ def test_consumer_dag_listen_to_two_partitioned_asset( assert len(partition_dags) == 1 assert partition_dags == {"asset-event-consumer"} - - - for asset_event in session.scalar( - select(DagRun).where(DagRun.id == apdr.created_dag_run_id) - ).consumed_asset_events: + dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id)) + assert dag_run is not None + for asset_event in dag_run.consumed_asset_events: assert asset_event.source_task_id == "hi" assert "asset-event-producer-" in asset_event.source_dag_id assert asset_event.source_run_id == "test" From ceba945841e7981c7cd6447494ab09b98dc07dff Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 4 Feb 2026 15:57:44 +0800 Subject: [PATCH 8/8] fixup! test(scheduler_job): improve APDR test cases to include consumed_asset_event validation --- airflow-core/tests/unit/jobs/test_scheduler_job.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 926b141e63c71..f59768cb4afcd 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -8955,9 +8955,9 @@ def test_consumer_dag_listen_to_two_partitioned_asset_with_key_1_mapper( assert len(partition_dags) == 1 assert partition_dags == {"asset-event-consumer"} - asset_event = session.scalar( - select(DagRun).where(DagRun.id == apdr.created_dag_run_id) - ).consumed_asset_events[0] - assert asset_event.source_task_id == "hi" - assert asset_event.source_dag_id == "asset-event-producer-2" - assert asset_event.source_run_id == "test" + dag_run = session.scalar(select(DagRun).where(DagRun.id == apdr.created_dag_run_id)) + assert dag_run is not None + for asset_event in dag_run.consumed_asset_events: + assert asset_event.source_task_id == "hi" + assert "asset-event-producer-" in asset_event.source_dag_id + assert asset_event.source_run_id == "test"