From d4846f783d0acac6e7d71c22bce0eff17096978b Mon Sep 17 00:00:00 2001 From: cordepe Date: Tue, 1 Apr 2025 11:34:31 +0200 Subject: [PATCH 1/2] Fixed dependency issue taking the object itself instead of task_id --- .../providers/databricks/operators/databricks.py | 10 +++++----- .../databricks/operators/databricks_workflow.py | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py index 2de7de67b3e94..f16d830305c25 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py @@ -29,7 +29,7 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException -from airflow.models import BaseOperator +from airflow.models import BaseOperator, BaseOperatorLink, XCom from airflow.providers.databricks.hooks.databricks import DatabricksHook, RunLifeCycleState, RunState from airflow.providers.databricks.operators.databricks_workflow import ( DatabricksWorkflowTaskGroup, @@ -1154,9 +1154,9 @@ def _convert_to_databricks_workflow_task( result = { "task_key": self.databricks_task_key, "depends_on": [ - {"task_key": self._generate_databricks_task_key(task_id)} - for task_id in self.upstream_task_ids - if task_id in relevant_upstreams + {"task_key": upstream_task.databricks_task_key} + for upstream_task in relevant_upstreams + if isinstance(upstream_task, DatabricksTaskBaseOperator) and upstream_task.task_id in self.upstream_task_ids ], **base_task_json, } @@ -1449,4 +1449,4 @@ def __init__( def _get_task_base_json(self) -> dict[str, Any]: """Get task base json to be used for task submissions.""" - return self.task_config + return self.task_config \ No newline at end of file diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py index f9f91adcb56bb..a49b7552420d7 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py @@ -114,8 +114,8 @@ def __init__( self.job_clusters = job_clusters or [] self.max_concurrent_runs = max_concurrent_runs self.notebook_params = notebook_params or {} - self.tasks_to_convert = tasks_to_convert or [] - self.relevant_upstreams = [task_id] + self.tasks_to_convert = tasks_to_convert or [] + self.relevant_upstreams: list[BaseOperator] = [self] self.workflow_run_metadata: WorkflowRunMetadata | None = None super().__init__(task_id=task_id, **kwargs) @@ -333,10 +333,10 @@ def __exit__( ) task.workflow_run_metadata = create_databricks_workflow_task.output - create_databricks_workflow_task.relevant_upstreams.append(task.task_id) + create_databricks_workflow_task.relevant_upstreams.append(task) create_databricks_workflow_task.add_task(task) for root_task in roots: root_task.set_upstream(create_databricks_workflow_task) - super().__exit__(_type, _value, _tb) + super().__exit__(_type, _value, _tb) \ No newline at end of file From fc5fc937fa74b56b0b9d0e08275d04849c06272e Mon Sep 17 00:00:00 2001 From: cordepe Date: Tue, 1 Apr 2025 11:58:31 +0200 Subject: [PATCH 2/2] Added empty line --- .../src/airflow/providers/databricks/operators/databricks.py | 2 +- .../providers/databricks/operators/databricks_workflow.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py index f16d830305c25..804a84e2093ba 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py @@ -1449,4 +1449,4 @@ def __init__( def _get_task_base_json(self) -> dict[str, Any]: """Get task base json to be used for task submissions.""" - return self.task_config \ No newline at end of file + return self.task_config diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py index a49b7552420d7..3c0f5d48ee768 100644 --- a/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py +++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py @@ -339,4 +339,4 @@ def __exit__( for root_task in roots: root_task.set_upstream(create_databricks_workflow_task) - super().__exit__(_type, _value, _tb) \ No newline at end of file + super().__exit__(_type, _value, _tb)