diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py index 306781daba210..230dc55d18d84 100644 --- a/airflow/datasets/manager.py +++ b/airflow/datasets/manager.py @@ -114,7 +114,7 @@ def register_dataset_change( session.add(dataset_event) dags_to_queue_from_dataset = { - ref.dag for ref in dataset_model.consuming_dags if ref.dag.is_active and not ref.dag.is_paused + ref.dag for ref in dataset_model.consuming_dags if not ref.dag.is_paused } dags_to_queue_from_dataset_alias = set() if source_alias_names: @@ -133,9 +133,7 @@ def register_dataset_change( session.add(dsa) dags_to_queue_from_dataset_alias |= { - alias_ref.dag - for alias_ref in dsa.consuming_dags - if alias_ref.dag.is_active and not alias_ref.dag.is_paused + alias_ref.dag for alias_ref in dsa.consuming_dags if not alias_ref.dag.is_paused } dags_to_reparse = dags_to_queue_from_dataset_alias - dags_to_queue_from_dataset diff --git a/tests/datasets/test_manager.py b/tests/datasets/test_manager.py index cf36c8a8e3a9d..9eb98734faf9d 100644 --- a/tests/datasets/test_manager.py +++ b/tests/datasets/test_manager.py @@ -22,7 +22,7 @@ from unittest import mock import pytest -from sqlalchemy import delete +from sqlalchemy import delete, func, select from airflow.datasets import Dataset, DatasetAlias from airflow.datasets.manager import DatasetManager @@ -46,7 +46,7 @@ pytest.importorskip("pydantic", minversion="2.0.0") -@pytest.fixture +@pytest.fixture(autouse=True) def clear_datasets(): from tests.test_utils.db import clear_db_datasets @@ -55,6 +55,13 @@ def clear_datasets(): clear_db_datasets() +@pytest.fixture(autouse=True) +def clean_listener_manager(): + dataset_listener.clear() + yield + dataset_listener.clear() + + @pytest.fixture def mock_task_instance(): return TaskInstancePydantic( @@ -145,7 +152,6 @@ def test_register_dataset_change(self, session, dag_maker, mock_task_instance): assert session.query(DatasetDagRunQueue).count() == 2 @pytest.mark.skip_if_database_isolation_mode - @pytest.mark.usefixtures("clear_datasets") def test_register_dataset_change_with_alias(self, session, dag_maker, mock_task_instance): consumer_dag_1 = DagModel(dag_id="conumser_1", is_active=True, fileloc="dag1.py") consumer_dag_2 = DagModel(dag_id="conumser_2", is_active=True, fileloc="dag2.py") @@ -197,7 +203,6 @@ def test_register_dataset_change_no_downstreams(self, session, mock_task_instanc @pytest.mark.skip_if_database_isolation_mode def test_register_dataset_change_notifies_dataset_listener(self, session, mock_task_instance): dsem = DatasetManager() - dataset_listener.clear() get_listener_manager().add_listener(dataset_listener) ds = Dataset(uri="test_dataset_uri_2") @@ -218,7 +223,6 @@ def test_register_dataset_change_notifies_dataset_listener(self, session, mock_t @pytest.mark.skip_if_database_isolation_mode def test_create_datasets_notifies_dataset_listener(self, session): dsem = DatasetManager() - dataset_listener.clear() get_listener_manager().add_listener(dataset_listener) dsm = DatasetModel(uri="test_dataset_uri_3") @@ -228,3 +232,32 @@ def test_create_datasets_notifies_dataset_listener(self, session): # Ensure the listener was notified assert len(dataset_listener.created) == 1 assert dataset_listener.created[0].uri == dsm.uri + + @pytest.mark.skip_if_database_isolation_mode + def test_register_dataset_change_queues_stale_dag(self, session, mock_task_instance): + dsem = DatasetManager() + + dsm = DatasetModel(uri="test_dataset_uri_3") + session.add(dsm) + + # Setup a dag that is STALE but NOT PAUSED + # We want stale dags to still receive updates + stale_dag = DagModel(dag_id="stale_dag", is_active=False, is_paused=False) + session.add(stale_dag) + + # Link stale dags to the dataset + dsm.consuming_dags = [DagScheduleDatasetReference(dag_id=stale_dag.dag_id)] + + session.execute(delete(DatasetDagRunQueue)) + session.flush() + + dsem.register_dataset_change( + task_instance=mock_task_instance, + dataset=Dataset(dsm.uri), + session=session, + ) + session.flush() + + # Verify the stale Dag was NOT ignored + assert session.scalar(select(func.count()).select_from(DatasetDagRunQueue)) == 1 + assert session.scalar(select(DatasetDagRunQueue.target_dag_id)) == "stale_dag" diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index abf394743ee86..e034e4d401b2d 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -4479,7 +4479,10 @@ def test_no_create_dag_runs_when_dag_disabled(self, session, dag_maker, disable, session=session, ) assert session.scalars(dse_q).one().source_run_id == dr1.run_id - assert session.scalars(ddrq_q).one_or_none() is None + if "is_active" in disable: + assert session.scalars(ddrq_q).one_or_none() is not None + else: + assert session.scalars(ddrq_q).one_or_none() is None # Simulate the consumer DAG being enabled. session.execute(update(DagModel).where(DagModel.dag_id == "consumer").values(**enable)) diff --git a/tests/listeners/test_dataset_listener.py b/tests/listeners/test_dataset_listener.py index b0ac6223e79ea..2e9669d836d6e 100644 --- a/tests/listeners/test_dataset_listener.py +++ b/tests/listeners/test_dataset_listener.py @@ -24,6 +24,7 @@ from airflow.operators.empty import EmptyOperator from airflow.utils.session import provide_session from tests.listeners import dataset_listener +from tests.test_utils.db import clear_db_datasets @pytest.fixture(autouse=True) @@ -37,6 +38,13 @@ def clean_listener_manager(): dataset_listener.clear() +@pytest.fixture(autouse=True) +def clear_datasets(): + clear_db_datasets() + yield + clear_db_datasets() + + @pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode @pytest.mark.db_test @provide_session