From 8d5854de785390c34e0dfb18cb87ef981ccb9e39 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Thu, 31 Aug 2023 00:50:48 +0200 Subject: [PATCH 1/2] Replace sequence concatination by unpacking in Airflow core --- airflow/api/common/trigger_dag.py | 2 +- airflow/api_connexion/schemas/common_schema.py | 4 ++-- airflow/api_connexion/schemas/enum_schemas.py | 4 ++-- airflow/cli/cli_config.py | 2 +- airflow/cli/commands/standalone_command.py | 2 +- airflow/configuration.py | 2 +- airflow/jobs/backfill_job_runner.py | 2 +- .../kubernetes/pre_7_4_0_compatibility/pod_generator.py | 7 ++++--- airflow/models/dag.py | 2 +- airflow/utils/python_virtualenv.py | 6 +++--- airflow/utils/state.py | 2 +- airflow/www/utils.py | 6 ++++-- 12 files changed, 22 insertions(+), 19 deletions(-) diff --git a/airflow/api/common/trigger_dag.py b/airflow/api/common/trigger_dag.py index a522b938df449..a7b39cb5a630f 100644 --- a/airflow/api/common/trigger_dag.py +++ b/airflow/api/common/trigger_dag.py @@ -85,7 +85,7 @@ def _trigger_dag( run_conf = conf if isinstance(conf, dict) else json.loads(conf) dag_runs = [] - dags_to_run = [dag] + dag.subdags + dags_to_run = [dag, *dag.subdags] for _dag in dags_to_run: dag_run = _dag.create_dagrun( run_id=run_id, diff --git a/airflow/api_connexion/schemas/common_schema.py b/airflow/api_connexion/schemas/common_schema.py index cf510137621b6..a470e6b1c04c8 100644 --- a/airflow/api_connexion/schemas/common_schema.py +++ b/airflow/api_connexion/schemas/common_schema.py @@ -135,7 +135,7 @@ class ColorField(fields.String): def __init__(self, **metadata): super().__init__(**metadata) - self.validators = [validate.Regexp("^#[a-fA-F0-9]{3,6}$")] + list(self.validators) + self.validators = [validate.Regexp("^#[a-fA-F0-9]{3,6}$"), *self.validators] class WeightRuleField(fields.String): @@ -143,7 +143,7 @@ class WeightRuleField(fields.String): def __init__(self, **metadata): super().__init__(**metadata) - self.validators = [validate.OneOf(WeightRule.all_weight_rules())] + list(self.validators) + self.validators = [validate.OneOf(WeightRule.all_weight_rules()), *self.validators] class TimezoneField(fields.String): diff --git a/airflow/api_connexion/schemas/enum_schemas.py b/airflow/api_connexion/schemas/enum_schemas.py index 981a3669b1b58..ba82010783213 100644 --- a/airflow/api_connexion/schemas/enum_schemas.py +++ b/airflow/api_connexion/schemas/enum_schemas.py @@ -26,7 +26,7 @@ class DagStateField(fields.String): def __init__(self, **metadata): super().__init__(**metadata) - self.validators = [validate.OneOf(State.dag_states)] + list(self.validators) + self.validators = [validate.OneOf(State.dag_states), *self.validators] class TaskInstanceStateField(fields.String): @@ -34,4 +34,4 @@ class TaskInstanceStateField(fields.String): def __init__(self, **metadata): super().__init__(**metadata) - self.validators = [validate.OneOf(State.task_states)] + list(self.validators) + self.validators = [validate.OneOf(State.task_states), *self.validators] diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index fadf988a4ac09..d4e971d3dbd0a 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -1619,7 +1619,7 @@ class GroupCommand(NamedTuple): name="add", help="Add a connection", func=lazy_load_command("airflow.cli.commands.connection_command.connections_add"), - args=(ARG_CONN_ID, ARG_CONN_URI, ARG_CONN_JSON, ARG_CONN_EXTRA) + tuple(ALTERNATIVE_CONN_SPECS_ARGS), + args=(ARG_CONN_ID, ARG_CONN_URI, ARG_CONN_JSON, ARG_CONN_EXTRA, *ALTERNATIVE_CONN_SPECS_ARGS), ), ActionCommand( name="delete", diff --git a/airflow/cli/commands/standalone_command.py b/airflow/cli/commands/standalone_command.py index 3265b6b747ff1..2c5fe7ab4882f 100644 --- a/airflow/cli/commands/standalone_command.py +++ b/airflow/cli/commands/standalone_command.py @@ -291,7 +291,7 @@ def __init__(self, parent, name: str, command: list[str], env: dict[str, str]): def run(self): """Run the actual process and captures it output to a queue.""" self.process = subprocess.Popen( - ["airflow"] + self.command, + ["airflow", *self.command], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=self.env, diff --git a/airflow/configuration.py b/airflow/configuration.py index 7f04c6926b24c..842a6b5b0ea7d 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -463,7 +463,7 @@ def inversed_deprecated_sections(self): ("logging", "logging_level"): _available_logging_levels, ("logging", "fab_logging_level"): _available_logging_levels, # celery_logging_level can be empty, which uses logging_level as fallback - ("logging", "celery_logging_level"): _available_logging_levels + [""], + ("logging", "celery_logging_level"): [*_available_logging_levels, ""], ("webserver", "analytical_tool"): ["google_analytics", "metarouter", "segment", ""], } diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index d93d3594cae29..6f1fbb0878a7d 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -788,7 +788,7 @@ def tabulate_ti_keys_set(ti_keys: Iterable[TaskInstanceKey]) -> str: yield tabulate_ti_keys_set([ti.key for ti in ti_status.deadlocked]) def _get_dag_with_subdags(self) -> list[DAG]: - return [self.dag] + self.dag.subdags + return [self.dag, *self.dag.subdags] @provide_session def _execute_dagruns( diff --git a/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py b/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py index 45761978638c8..d61a3bc6ce9a3 100644 --- a/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py +++ b/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py @@ -366,9 +366,10 @@ def reconcile_containers( client_container = extend_object_field(base_container, client_container, "volume_devices") client_container = merge_objects(base_container, client_container) - return [client_container] + PodGenerator.reconcile_containers( - base_containers[1:], client_containers[1:] - ) + return [ + client_container, + *PodGenerator.reconcile_containers(base_containers[1:], client_containers[1:]), + ] @classmethod def construct_pod( diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 5979c51c518fa..5e1426713b04d 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1699,7 +1699,7 @@ def _get_task_instances( if include_subdags: # Crafting the right filter for dag_id and task_ids combo conditions = [] - for dag in self.subdags + [self]: + for dag in [*self.subdags, self]: conditions.append( (TaskInstance.dag_id == dag.dag_id) & TaskInstance.task_id.in_(dag.task_ids) ) diff --git a/airflow/utils/python_virtualenv.py b/airflow/utils/python_virtualenv.py index e957cce837fdc..4498cddbcedd6 100644 --- a/airflow/utils/python_virtualenv.py +++ b/airflow/utils/python_virtualenv.py @@ -41,14 +41,14 @@ def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages def _generate_pip_install_cmd_from_file( tmp_dir: str, requirements_file_path: str, pip_install_options: list[str] ) -> list[str]: - cmd = [f"{tmp_dir}/bin/pip", "install"] + pip_install_options + ["-r"] - return cmd + [requirements_file_path] + cmd = [f"{tmp_dir}/bin/pip", "install", *pip_install_options, "-r"] + return [*cmd, requirements_file_path] def _generate_pip_install_cmd_from_list( tmp_dir: str, requirements: list[str], pip_install_options: list[str] ) -> list[str]: - cmd = [f"{tmp_dir}/bin/pip", "install"] + pip_install_options + cmd = [f"{tmp_dir}/bin/pip", "install", *pip_install_options] return cmd + requirements diff --git a/airflow/utils/state.py b/airflow/utils/state.py index da5381215df05..ff1435f1fbb84 100644 --- a/airflow/utils/state.py +++ b/airflow/utils/state.py @@ -103,7 +103,7 @@ class State: finished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.SUCCESS, DagRunState.FAILED]) unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.QUEUED, DagRunState.RUNNING]) - task_states: tuple[TaskInstanceState | None, ...] = (None,) + tuple(TaskInstanceState) + task_states: tuple[TaskInstanceState | None, ...] = (None, *TaskInstanceState) dag_states: tuple[DagRunState, ...] = ( DagRunState.QUEUED, diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 5f90b309694b8..d3cf5373bb65a 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -775,7 +775,8 @@ class AirflowFilterConverter(fab_sqlafilters.SQLAFilterConverter): "is_extendedjson", [], ), - ) + fab_sqlafilters.SQLAFilterConverter.conversion_table + *fab_sqlafilters.SQLAFilterConverter.conversion_table, + ) def __init__(self, datamodel): super().__init__(datamodel) @@ -877,7 +878,8 @@ def delete_all(self, items: list[Model]) -> bool: # place FieldConverter.conversion_table = ( ("is_utcdatetime", DateTimeWithTimezoneField, AirflowDateTimePickerWidget), -) + FieldConverter.conversion_table + *FieldConverter.conversion_table, +) class UIAlert: From f968c0e0481e0dfa9f3708b13f7ffc34834dce9c Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Thu, 31 Aug 2023 20:24:59 +0200 Subject: [PATCH 2/2] comments from code review --- airflow/utils/python_virtualenv.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/airflow/utils/python_virtualenv.py b/airflow/utils/python_virtualenv.py index 4498cddbcedd6..d613782f32994 100644 --- a/airflow/utils/python_virtualenv.py +++ b/airflow/utils/python_virtualenv.py @@ -41,15 +41,13 @@ def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages def _generate_pip_install_cmd_from_file( tmp_dir: str, requirements_file_path: str, pip_install_options: list[str] ) -> list[str]: - cmd = [f"{tmp_dir}/bin/pip", "install", *pip_install_options, "-r"] - return [*cmd, requirements_file_path] + return [f"{tmp_dir}/bin/pip", "install", *pip_install_options, "-r", requirements_file_path] def _generate_pip_install_cmd_from_list( tmp_dir: str, requirements: list[str], pip_install_options: list[str] ) -> list[str]: - cmd = [f"{tmp_dir}/bin/pip", "install", *pip_install_options] - return cmd + requirements + return [f"{tmp_dir}/bin/pip", "install", *pip_install_options, *requirements] def _generate_pip_conf(conf_file: Path, index_urls: list[str]) -> None: