diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index a827a8a01e053..9ac07b372b66a 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -249,6 +249,8 @@ def on_task_instance_success( self.log.debug("OpenLineage listener got notification about task instance success") if isinstance(task_instance, TaskInstance): + # On AF3 we still get DB TaskInstance model when task instance state is changed manually + # (via UI or API). The listener is called on API server so we do not have task and dag models. self._on_task_instance_manual_state_change( ti=task_instance, dagrun=task_instance.dag_run, @@ -382,6 +384,12 @@ def on_task_instance_failed( self.log.debug("OpenLineage listener got notification about task instance failure") if isinstance(task_instance, TaskInstance): + # There are two cases where on AF3 we still get DB TaskInstance model: + # 1. when task instance state is changed manually (via UI or API, models.patch_task_instance + # endpoint). The listener is called on API server so we do not have task and dag models. + # 2. `process_executor_events` method on scheduler, where the external state change is handled + # https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally + # In second case, we still should not run user code, but at least we have access to operator self._on_task_instance_manual_state_change( ti=task_instance, dagrun=task_instance.dag_run, @@ -645,6 +653,24 @@ def _on_task_instance_manual_state_change( self.log.debug("`_on_task_instance_manual_state_change` was called with state: `%s`.", ti_state) end_date = timezone.utcnow() + task = getattr(ti, "task") # on scheduler, we should have access to task + if task and is_operator_disabled(task): + self.log.debug( + "Skipping OpenLineage event emission for operator `%s` " + "due to its presence in [openlineage] disabled_for_operators.", + task.task_type, + ) + return + + if task and not is_selective_lineage_enabled(task): + self.log.debug( + "Skipping OpenLineage event emission for task `%s` " + "due to lack of explicit lineage enablement for task or DAG while " + "[openlineage] selective_enable is on.", + ti.task_id, + ) + return + @print_warning(self.log) def on_state_change(): date = dagrun.logical_date or dagrun.run_after @@ -662,23 +688,45 @@ def on_state_change(): map_index=ti.map_index, ) + data_interval_start = dagrun.data_interval_start + if isinstance(data_interval_start, datetime): + data_interval_start = data_interval_start.isoformat() + data_interval_end = dagrun.data_interval_end + if isinstance(data_interval_end, datetime): + data_interval_end = data_interval_end.isoformat() + + dag_tags, owners, doc, doc_type = None, None, None, None + airflow_run_facet = {} + if task: # on scheduler, we should have access to task + doc, doc_type = get_task_documentation(task) + dag = getattr(task, "dag") + if dag: + if not doc: + doc, doc_type = get_dag_documentation(dag) + + dag_tags = dag.tags + owners = [x.strip() for x in (task if task.owner != "airflow" else dag).owner.split(",")] + + airflow_run_facet = get_airflow_run_facet(dagrun, dag, ti, task, task_uuid) + adapter_kwargs = { "run_id": task_uuid, "job_name": get_job_name(ti), "end_time": end_date.isoformat(), "task": OperatorLineage(), - "nominal_start_time": None, - "nominal_end_time": None, - "tags": None, - "owners": None, - "job_description": None, - "job_description_type": None, + "nominal_start_time": data_interval_start, + "nominal_end_time": data_interval_end, + "tags": dag_tags, + "owners": owners, + "job_description": doc, + "job_description_type": doc_type, "run_facets": { **get_task_parent_run_facet( parent_run_id=parent_run_id, parent_job_name=ti.dag_id, dr_conf=getattr(dagrun, "conf", {}), ), + **airflow_run_facet, **get_airflow_debug_facet(), }, } diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py index 705775f258f8e..8c44a920ebcf7 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py @@ -1085,6 +1085,7 @@ def _create_listener_and_task_instance( listener, task_instance = _create_listener_and_task_instance() # Now you can use listener and task_instance in your tests to simulate their interaction. """ + from airflow.sdk.definitions.dag import DAG if not runtime_ti: # TaskInstance is used when on API server (when listener gets called about manual state change) @@ -1097,15 +1098,27 @@ def _create_listener_and_task_instance( ) else: task_instance = TaskInstance(task=MagicMock(), dag_version_id=uuid7()) + + dag = DAG( + dag_id="dag_id_from_dag_not_ti", + description="Test DAG Description", + tags=["tag1", "tag2"], + ) + task = EmptyOperator( + task_id="task_id_from_task_not_ti", dag=dag, owner="task_owner", doc_md="TASK Description" + ) + task_instance.dag_run = DagRun() task_instance.dag_run.dag_id = "dag_id_from_dagrun_and_not_ti" task_instance.dag_run.run_id = "dag_run_run_id" task_instance.dag_run.clear_number = 0 task_instance.dag_run.logical_date = timezone.datetime(2020, 1, 1, 1, 1, 1) task_instance.dag_run.run_after = timezone.datetime(2020, 1, 1, 1, 1, 1) + task_instance.dag_run.data_interval_start = timezone.datetime(2020, 1, 1, 1, 1, 1) + task_instance.dag_run.data_interval_end = timezone.datetime(2020, 1, 1, 1, 1, 1) task_instance.dag_run.state = DagRunState.RUNNING - task_instance.task = None - task_instance.dag = None + task_instance.task = task # type: ignore[assignment] # For testing we'll avoid serialization + task_instance.dag = dag task_instance.task_id = "task_id" task_instance.dag_id = "dag_id" task_instance.try_number = 1 @@ -1119,7 +1132,6 @@ def _create_listener_and_task_instance( TaskInstance as SdkTaskInstance, TIRunContext, ) - from airflow.sdk.definitions.dag import DAG from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance dag = DAG( @@ -1447,6 +1459,7 @@ def test_adapter_fail_task_is_called_with_dag_description_when_task_doc_is_empty @mock.patch("airflow.providers.openlineage.plugins.adapter.OpenLineageAdapter.emit") @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_debug_facet") + @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call @@ -1454,6 +1467,7 @@ def test_adapter_fail_task_is_called_with_dag_description_when_task_doc_is_empty def test_adapter_fail_task_is_called_with_proper_arguments_for_db_task_instance_model( self, mock_get_task_parent_run_facet, + mock_get_airflow_run_facet, mock_debug_facet, mock_debug_mode, mock_emit, @@ -1467,6 +1481,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments_for_db_task_instance_ time_machine.move_to(timezone.datetime(2023, 1, 3, 13, 1, 1), tick=False) listener, task_instance = self._create_listener_and_task_instance(runtime_ti=False) + mock_get_airflow_run_facet.return_value = {"airflow": 3} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -1483,14 +1498,15 @@ def test_adapter_fail_task_is_called_with_proper_arguments_for_db_task_instance_ job_name="dag_id.task_id", run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", task=OperatorLineage(), - nominal_start_time=None, - nominal_end_time=None, - tags=None, - owners=None, - job_description=None, - job_description_type=None, + nominal_start_time="2020-01-01T01:01:01+00:00", + nominal_end_time="2020-01-01T01:01:01+00:00", + tags={"tag1", "tag2"}, + owners=["task_owner"], + job_description="TASK Description", + job_description_type="text/markdown", run_facets={ "parent": 4, + "airflow": 3, "debug": "packages", }, error=err, @@ -1645,6 +1661,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments_for_db_task_insta time_machine.move_to(timezone.datetime(2023, 1, 3, 13, 1, 1), tick=False) listener, task_instance = self._create_listener_and_task_instance(runtime_ti=False) + delattr(task_instance, "task") # Test api server path, where task is not available mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -1661,8 +1678,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments_for_db_task_insta job_name="dag_id.task_id", run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", task=OperatorLineage(), - nominal_start_time=None, - nominal_end_time=None, + nominal_start_time="2020-01-01T01:01:01+00:00", + nominal_end_time="2020-01-01T01:01:01+00:00", tags=None, owners=None, job_description=None, @@ -1851,6 +1868,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments_for_db_task_insta time_machine.move_to(timezone.datetime(2023, 1, 3, 13, 1, 1), tick=False) listener, task_instance = self._create_listener_and_task_instance(runtime_ti=False) + delattr(task_instance, "task") # Test api server path, where task is not available mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -1867,8 +1885,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments_for_db_task_insta job_name="dag_id.task_id", run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", task=OperatorLineage(), - nominal_start_time=None, - nominal_end_time=None, + nominal_start_time="2020-01-01T01:01:01+00:00", + nominal_end_time="2020-01-01T01:01:01+00:00", tags=None, owners=None, job_description=None,