Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -440,10 +440,10 @@
OPENLINEAGE_PARENT_JOB_EXAMPLE_SPARK_PROPERTIES = {
"spark.openlineage.parentJobName": "dag_id.task_id",
"spark.openlineage.parentJobNamespace": "default",
"spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
"spark.openlineage.parentRunId": "11111111-1111-1111-1111-111111111111",
"spark.openlineage.rootParentJobName": "dag_id",
"spark.openlineage.rootParentJobNamespace": "default",
"spark.openlineage.rootParentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
"spark.openlineage.rootParentRunId": "22222222-2222-2222-2222-222222222222",
}


Expand Down Expand Up @@ -1430,13 +1430,15 @@ def test_dataproc_operator_execute_async_done_before_defer(self, mock_submit_job
op.execute(context=self.mock_context)
assert not mock_defer.called

@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_openlineage_parent_job_info_injection(
self, mock_hook, mock_ol_accessible, mock_static_uuid
self, mock_hook, mock_ol_accessible, task_ol_run_id, dag_ol_run_id
):
mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
job_config = {
"placement": {"cluster_name": CLUSTER_NAME},
"pyspark_job": {
Expand All @@ -1456,10 +1458,10 @@ def test_execute_openlineage_parent_job_info_injection(
"spark.openlineage.transport.type": "console",
"spark.openlineage.parentJobName": "dag_id.task_id",
"spark.openlineage.parentJobNamespace": "default",
"spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
"spark.openlineage.parentRunId": "11111111-1111-1111-1111-111111111111",
"spark.openlineage.rootParentJobName": "dag_id",
"spark.openlineage.rootParentJobNamespace": "default",
"spark.openlineage.rootParentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
"spark.openlineage.rootParentRunId": "22222222-2222-2222-2222-222222222222",
},
},
}
Expand Down Expand Up @@ -1499,15 +1501,17 @@ def test_execute_openlineage_parent_job_info_injection(
metadata=METADATA,
)

@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
@mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_openlineage_http_transport_info_injection(
self, mock_hook, mock_ol_accessible, mock_ol_listener, mock_static_uuid
self, mock_hook, mock_ol_accessible, mock_ol_listener, task_ol_run_id, dag_ol_run_id
):
mock_ol_accessible.return_value = True
mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
fake_listener = mock.MagicMock()
mock_ol_listener.return_value = fake_listener
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
Expand Down Expand Up @@ -1556,15 +1560,17 @@ def test_execute_openlineage_http_transport_info_injection(
metadata=METADATA,
)

@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
@mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_openlineage_all_info_injection(
self, mock_hook, mock_ol_accessible, mock_ol_listener, mock_static_uuid
self, mock_hook, mock_ol_accessible, mock_ol_listener, task_ol_run_id, dag_ol_run_id
):
mock_ol_accessible.return_value = True
mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
fake_listener = mock.MagicMock()
mock_ol_listener.return_value = fake_listener
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
Expand Down Expand Up @@ -2591,14 +2597,16 @@ def test_wait_for_operation_on_execute(self, mock_hook):
)
mock_op.return_value.result.assert_not_called()

@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_openlineage_parent_job_info_injection(
self, mock_hook, mock_ol_accessible, mock_static_uuid
self, mock_hook, mock_ol_accessible, task_ol_run_id, dag_ol_run_id
):
mock_ol_accessible.return_value = True
mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
template = {
**WORKFLOW_TEMPLATE,
"jobs": [
Expand Down Expand Up @@ -2643,10 +2651,10 @@ def test_execute_openlineage_parent_job_info_injection(
"spark.sql.shuffle.partitions": "1",
"spark.openlineage.parentJobName": "dag_id.task_id",
"spark.openlineage.parentJobNamespace": "default",
"spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
"spark.openlineage.parentRunId": "11111111-1111-1111-1111-111111111111",
"spark.openlineage.rootParentJobName": "dag_id",
"spark.openlineage.rootParentJobNamespace": "default",
"spark.openlineage.rootParentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
"spark.openlineage.rootParentRunId": "22222222-2222-2222-2222-222222222222",
},
},
},
Expand Down Expand Up @@ -2784,15 +2792,17 @@ def test_execute_openlineage_parent_job_info_injection_skipped_when_ol_not_acces
metadata=METADATA,
)

@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
@mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_openlineage_transport_info_injection(
self, mock_hook, mock_ol_accessible, mock_ol_listener, mock_static_uuid
self, mock_hook, mock_ol_accessible, mock_ol_listener, task_ol_run_id, dag_ol_run_id
):
mock_ol_accessible.return_value = True
mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
fake_listener = mock.MagicMock()
mock_ol_listener.return_value = fake_listener
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
Expand Down Expand Up @@ -2892,15 +2902,17 @@ def test_execute_openlineage_transport_info_injection(
metadata=METADATA,
)

@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
@mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_openlineage_all_info_injection(
self, mock_hook, mock_ol_accessible, mock_ol_listener, mock_static_uuid
self, mock_hook, mock_ol_accessible, mock_ol_listener, task_ol_run_id, dag_ol_run_id
):
mock_ol_accessible.return_value = True
mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
fake_listener = mock.MagicMock()
mock_ol_listener.return_value = fake_listener
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
Expand Down Expand Up @@ -3419,7 +3431,8 @@ def test_execute_batch_already_exists_cancelled(self, mock_hook, mock_log):
mock_log.info.assert_any_call("Batch with given id already exists.")

@mock.patch.object(DataprocCreateBatchOperator, "log", new_callable=mock.MagicMock)
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
Expand All @@ -3428,11 +3441,13 @@ def test_execute_openlineage_parent_job_info_injection(
mock_hook,
to_dict_mock,
mock_ol_accessible,
mock_static_uuid,
task_ol_run_id,
dag_ol_run_id,
mock_log,
):
mock_ol_accessible.return_value = True
mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
expected_batch = {
**BATCH,
"labels": EXPECTED_LABELS,
Expand Down Expand Up @@ -3474,16 +3489,25 @@ def test_execute_openlineage_parent_job_info_injection(
mock_log.info.assert_any_call("Batch job %s completed.\nDriver logs: %s", BATCH_ID, logs_link)

@mock.patch.object(DataprocCreateBatchOperator, "log", new_callable=mock.MagicMock)
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
@mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_openlineage_transport_info_injection(
self, mock_hook, to_dict_mock, mock_ol_accessible, mock_ol_listener, mock_static_uuid, mock_log
self,
mock_hook,
to_dict_mock,
mock_ol_accessible,
mock_ol_listener,
task_ol_run_id,
dag_ol_run_id,
mock_log,
):
mock_ol_accessible.return_value = True
mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
fake_listener = mock.MagicMock()
mock_ol_listener.return_value = fake_listener
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
Expand Down Expand Up @@ -3533,16 +3557,18 @@ def test_execute_openlineage_transport_info_injection(
logs_link,
)

@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
@mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
@mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
@mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
@mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_openlineage_all_info_injection(
self, mock_hook, to_dict_mock, mock_ol_accessible, mock_ol_listener, mock_static_uuid
self, mock_hook, to_dict_mock, mock_ol_accessible, mock_ol_listener, task_ol_run_id, dag_ol_run_id
):
mock_ol_accessible.return_value = True
mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
fake_listener = mock.MagicMock()
mock_ol_listener.return_value = fake_listener
fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@
ownership_job,
tags_job,
)
from openlineage.client.uuid import generate_static_uuid

from airflow.providers.common.compat.sdk import Stats, conf as airflow_conf
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.utils.utils import (
OpenLineageRedactor,
build_dag_run_ol_run_id,
build_task_instance_ol_run_id,
get_airflow_debug_facet,
get_airflow_state_run_facet,
get_dag_job_dependency_facet,
get_processing_engine_facet,
)
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down Expand Up @@ -123,12 +125,7 @@ def _read_yaml_config(path: str) -> dict | None:

@staticmethod
def build_dag_run_id(dag_id: str, logical_date: datetime, clear_number: int) -> str:
return str(
generate_static_uuid(
instant=logical_date,
data=f"{conf.namespace()}.{dag_id}.{clear_number}".encode(),
)
)
return build_dag_run_ol_run_id(dag_id=dag_id, logical_date=logical_date, clear_number=clear_number)

@staticmethod
def build_task_instance_run_id(
Expand All @@ -138,11 +135,12 @@ def build_task_instance_run_id(
logical_date: datetime,
map_index: int,
):
return str(
generate_static_uuid(
instant=logical_date,
data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}.{map_index}".encode(),
)
return build_task_instance_ol_run_id(
dag_id=dag_id,
task_id=task_id,
try_number=try_number,
logical_date=logical_date,
map_index=map_index,
)

def emit(self, event: RunEvent):
Expand Down Expand Up @@ -365,6 +363,7 @@ def fail_task(
def dag_started(
self,
dag_id: str,
run_id: str,
logical_date: datetime,
start_date: datetime,
nominal_start_time: str | None,
Expand All @@ -374,10 +373,14 @@ def dag_started(
run_facets: dict[str, RunFacet],
clear_number: int,
job_description: str | None,
is_asset_triggered: bool,
job_description_type: str | None = None,
job_facets: dict[str, JobFacet] | None = None, # Custom job facets
):
try:
job_dependency_facet = {}
if is_asset_triggered:
job_dependency_facet = get_dag_job_dependency_facet(dag_id=dag_id, dag_run_id=run_id)
event = RunEvent(
eventType=RunState.START,
eventTime=start_date.isoformat(),
Expand All @@ -396,7 +399,7 @@ def dag_started(
),
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
run_facets={**run_facets, **get_airflow_debug_facet()},
run_facets={**run_facets, **get_airflow_debug_facet(), **job_dependency_facet},
),
inputs=[],
outputs=[],
Expand Down Expand Up @@ -424,9 +427,13 @@ def dag_success(
owners: list[str] | None,
run_facets: dict[str, RunFacet],
job_description: str | None,
is_asset_triggered: bool,
job_description_type: str | None = None,
):
try:
job_dependency_facet = {}
if is_asset_triggered:
job_dependency_facet = get_dag_job_dependency_facet(dag_id=dag_id, dag_run_id=run_id)
event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=end_date.isoformat(),
Expand All @@ -446,6 +453,7 @@ def dag_success(
nominal_end_time=nominal_end_time,
run_facets={
**get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
**job_dependency_facet,
**get_airflow_debug_facet(),
**run_facets,
},
Expand Down Expand Up @@ -477,9 +485,13 @@ def dag_failed(
msg: str,
run_facets: dict[str, RunFacet],
job_description: str | None,
is_asset_triggered: bool,
job_description_type: str | None = None,
):
try:
job_dependency_facet = {}
if is_asset_triggered:
job_dependency_facet = get_dag_job_dependency_facet(dag_id=dag_id, dag_run_id=run_id)
event = RunEvent(
eventType=RunState.FAIL,
eventTime=end_date.isoformat(),
Expand All @@ -502,6 +514,7 @@ def dag_failed(
message=msg, programmingLanguage="python"
),
**get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
**job_dependency_facet,
**get_airflow_debug_facet(),
**run_facets,
},
Expand Down
Loading
Loading