From 93b3915ed3d39a76986e22bb580ab22ce57587f8 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Fri, 1 Sep 2023 01:15:18 +0200 Subject: [PATCH 1/3] Combine similar if logics in core --- airflow/models/baseoperator.py | 8 +++++--- airflow/serialization/serialized_objects.py | 8 +++++--- airflow/ti_deps/deps/trigger_rule_dep.py | 15 +-------------- 3 files changed, 11 insertions(+), 20 deletions(-) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index a7414397ea7b0..3569019cd0672 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -1107,9 +1107,11 @@ def dag(self, dag: DAG | None): if self.__from_mapped: pass # Don't add to DAG -- the mapped task takes the place. - elif self.task_id not in dag.task_dict: - dag.add_task(self) - elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] is not self: + elif ( + self.task_id not in dag.task_dict + or self.task_id in dag.task_dict + and dag.task_dict[self.task_id] is not self + ): dag.add_task(self) self._dag = dag diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 230f4aca18938..bee6192631003 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -997,9 +997,11 @@ def populate_operator(cls, op: Operator, encoded_op: dict[str, Any]) -> None: v = {arg: cls.deserialize(value) for arg, value in v.items()} elif k in {"expand_input", "op_kwargs_expand_input"}: v = _ExpandInputRef(v["type"], cls.deserialize(v["value"])) - elif k in cls._decorated_fields or k not in op.get_serialized_fields(): - v = cls.deserialize(v) - elif k in ("outlets", "inlets"): + elif ( + k in cls._decorated_fields + or k not in op.get_serialized_fields() + or k in ("outlets", "inlets") + ): v = cls.deserialize(v) elif k == "on_failure_fail_dagrun": k = "_on_failure_fail_dagrun" diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py index 9731d2a7c235e..f0e2425ea44bb 100644 --- a/airflow/ti_deps/deps/trigger_rule_dep.py +++ b/airflow/ti_deps/deps/trigger_rule_dep.py @@ -367,20 +367,7 @@ def _iter_upstream_conditions() -> Iterator[ColumnOperators]: f"upstream_task_ids={task.upstream_task_ids}" ) ) - elif trigger_rule == TR.NONE_FAILED: - num_failures = upstream - success - skipped - if ti.map_index > -1: - num_failures -= removed - if num_failures > 0: - yield self._failing_status( - reason=( - f"Task's trigger rule '{trigger_rule}' requires all upstream tasks to have " - f"succeeded or been skipped, but found {num_failures} non-success(es). " - f"upstream_states={upstream_states}, " - f"upstream_task_ids={task.upstream_task_ids}" - ) - ) - elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS: + elif trigger_rule in (TR.NONE_FAILED, TR.NONE_FAILED_MIN_ONE_SUCCESS): num_failures = upstream - success - skipped if ti.map_index > -1: num_failures -= removed From 68654b160640487ded699c0bc56d13ac62aac77b Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sat, 2 Sep 2023 00:26:15 +0200 Subject: [PATCH 2/3] Update airflow/models/baseoperator.py Co-authored-by: Tzu-ping Chung --- airflow/models/baseoperator.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 3569019cd0672..3aa67c8ecabcd 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -1107,11 +1107,7 @@ def dag(self, dag: DAG | None): if self.__from_mapped: pass # Don't add to DAG -- the mapped task takes the place. - elif ( - self.task_id not in dag.task_dict - or self.task_id in dag.task_dict - and dag.task_dict[self.task_id] is not self - ): + elif dag.task_dict.get(self.task_id) is not self: dag.add_task(self) self._dag = dag From 490774c57fde3a55e567bf61bdfb1de85088d26b Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sat, 2 Sep 2023 00:28:12 +0200 Subject: [PATCH 3/3] replace in tuple by multiple or equalities --- airflow/ti_deps/deps/trigger_rule_dep.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py index f0e2425ea44bb..9f2e00089b100 100644 --- a/airflow/ti_deps/deps/trigger_rule_dep.py +++ b/airflow/ti_deps/deps/trigger_rule_dep.py @@ -367,7 +367,7 @@ def _iter_upstream_conditions() -> Iterator[ColumnOperators]: f"upstream_task_ids={task.upstream_task_ids}" ) ) - elif trigger_rule in (TR.NONE_FAILED, TR.NONE_FAILED_MIN_ONE_SUCCESS): + elif trigger_rule == TR.NONE_FAILED or trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS: num_failures = upstream - success - skipped if ti.map_index > -1: num_failures -= removed