diff --git a/airflow/api/common/experimental/get_code.py b/airflow/api/common/experimental/get_code.py index 9e2e8c08a4b56..b74ed036ccadc 100644 --- a/airflow/api/common/experimental/get_code.py +++ b/airflow/api/common/experimental/get_code.py @@ -37,5 +37,5 @@ def get_code(dag_id: str) -> str: try: return DagCode.get_code_by_fileloc(dag.fileloc) except (OSError, DagCodeNotFound) as exception: - error_message = f"Error {str(exception)} while reading Dag id {dag_id} Code" + error_message = f"Error {exception} while reading Dag id {dag_id} Code" raise AirflowException(error_message, exception) diff --git a/airflow/auth/managers/fab/security_manager/modules/db.py b/airflow/auth/managers/fab/security_manager/modules/db.py index c7a1556acd3ed..c9b7b34d5e1b4 100644 --- a/airflow/auth/managers/fab/security_manager/modules/db.py +++ b/airflow/auth/managers/fab/security_manager/modules/db.py @@ -86,7 +86,7 @@ def create_db(self): if self.count_users() == 0 and self.auth_role_public != self.auth_role_admin: log.warning(const.LOGMSG_WAR_SEC_NO_USER) except Exception as e: - log.error(const.LOGMSG_ERR_SEC_CREATE_DB.format(str(e))) + log.error(const.LOGMSG_ERR_SEC_CREATE_DB.format(e)) exit(1) """ @@ -106,7 +106,7 @@ def update_role(self, role_id, name: str) -> Role | None: self.get_session.commit() log.info(const.LOGMSG_INF_SEC_UPD_ROLE.format(role)) except Exception as e: - log.error(const.LOGMSG_ERR_SEC_UPD_ROLE.format(str(e))) + log.error(const.LOGMSG_ERR_SEC_UPD_ROLE.format(e)) self.get_session.rollback() return None return role @@ -123,7 +123,7 @@ def add_role(self, name: str) -> Role: log.info(const.LOGMSG_INF_SEC_ADD_ROLE.format(name)) return role except Exception as e: - log.error(const.LOGMSG_ERR_SEC_ADD_ROLE.format(str(e))) + log.error(const.LOGMSG_ERR_SEC_ADD_ROLE.format(e)) self.get_session.rollback() return role @@ -190,7 +190,7 @@ def add_user( log.info(const.LOGMSG_INF_SEC_ADD_USER.format(username)) return user except Exception as e: - log.error(const.LOGMSG_ERR_SEC_ADD_USER.format(str(e))) + log.error(const.LOGMSG_ERR_SEC_ADD_USER.format(e)) self.get_session.rollback() return False @@ -226,7 +226,7 @@ def add_register_user(self, username, first_name, last_name, email, password="", self.get_session.commit() return register_user except Exception as e: - log.error(const.LOGMSG_ERR_SEC_ADD_REGISTER_USER.format(str(e))) + log.error(const.LOGMSG_ERR_SEC_ADD_REGISTER_USER.format(e)) self.get_session.rollback() return None @@ -269,7 +269,7 @@ def update_user(self, user): self.get_session.commit() log.info(const.LOGMSG_INF_SEC_UPD_USER.format(user)) except Exception as e: - log.error(const.LOGMSG_ERR_SEC_UPD_USER.format(str(e))) + log.error(const.LOGMSG_ERR_SEC_UPD_USER.format(e)) self.get_session.rollback() return False @@ -284,7 +284,7 @@ def del_register_user(self, register_user): self.get_session.commit() return True except Exception as e: - log.error(const.LOGMSG_ERR_SEC_DEL_REGISTER_USER.format(str(e))) + log.error(const.LOGMSG_ERR_SEC_DEL_REGISTER_USER.format(e)) self.get_session.rollback() return False @@ -322,7 +322,7 @@ def create_action(self, name): self.get_session.commit() return action except Exception as e: - log.error(const.LOGMSG_ERR_SEC_ADD_PERMISSION.format(str(e))) + log.error(const.LOGMSG_ERR_SEC_ADD_PERMISSION.format(e)) self.get_session.rollback() return action @@ -349,7 +349,7 @@ def delete_action(self, name: str) -> bool: self.get_session.commit() return True except Exception as e: - log.error(const.LOGMSG_ERR_SEC_DEL_PERMISSION.format(str(e))) + log.error(const.LOGMSG_ERR_SEC_DEL_PERMISSION.format(e)) self.get_session.rollback() return False @@ -383,7 +383,7 @@ def create_resource(self, name) -> Resource: self.get_session.commit() return resource except Exception as e: - log.error(const.LOGMSG_ERR_SEC_ADD_VIEWMENU.format(str(e))) + log.error(const.LOGMSG_ERR_SEC_ADD_VIEWMENU.format(e)) self.get_session.rollback() return resource @@ -419,7 +419,7 @@ def delete_resource(self, name: str) -> bool: self.get_session.commit() return True except Exception as e: - log.error(const.LOGMSG_ERR_SEC_DEL_PERMISSION.format(str(e))) + log.error(const.LOGMSG_ERR_SEC_DEL_PERMISSION.format(e)) self.get_session.rollback() return False @@ -481,10 +481,10 @@ def create_permission(self, action_name, resource_name) -> Permission | None: try: self.get_session.add(perm) self.get_session.commit() - log.info(const.LOGMSG_INF_SEC_ADD_PERMVIEW.format(str(perm))) + log.info(const.LOGMSG_INF_SEC_ADD_PERMVIEW.format(perm)) return perm except Exception as e: - log.error(const.LOGMSG_ERR_SEC_ADD_PERMVIEW.format(str(e))) + log.error(const.LOGMSG_ERR_SEC_ADD_PERMVIEW.format(e)) self.get_session.rollback() return None @@ -518,7 +518,7 @@ def delete_permission(self, action_name: str, resource_name: str) -> None: self.delete_action(perm.action.name) log.info(const.LOGMSG_INF_SEC_DEL_PERMVIEW.format(action_name, resource_name)) except Exception as e: - log.error(const.LOGMSG_ERR_SEC_DEL_PERMVIEW.format(str(e))) + log.error(const.LOGMSG_ERR_SEC_DEL_PERMVIEW.format(e)) self.get_session.rollback() def add_permission_to_role(self, role: Role, permission: Permission | None) -> None: @@ -534,9 +534,9 @@ def add_permission_to_role(self, role: Role, permission: Permission | None) -> N role.permissions.append(permission) self.get_session.merge(role) self.get_session.commit() - log.info(const.LOGMSG_INF_SEC_ADD_PERMROLE.format(str(permission), role.name)) + log.info(const.LOGMSG_INF_SEC_ADD_PERMROLE.format(permission, role.name)) except Exception as e: - log.error(const.LOGMSG_ERR_SEC_ADD_PERMROLE.format(str(e))) + log.error(const.LOGMSG_ERR_SEC_ADD_PERMROLE.format(e)) self.get_session.rollback() def remove_permission_from_role(self, role: Role, permission: Permission) -> None: @@ -551,7 +551,7 @@ def remove_permission_from_role(self, role: Role, permission: Permission) -> Non role.permissions.remove(permission) self.get_session.merge(role) self.get_session.commit() - log.info(const.LOGMSG_INF_SEC_DEL_PERMROLE.format(str(permission), role.name)) + log.info(const.LOGMSG_INF_SEC_DEL_PERMROLE.format(permission, role.name)) except Exception as e: - log.error(const.LOGMSG_ERR_SEC_DEL_PERMROLE.format(str(e))) + log.error(const.LOGMSG_ERR_SEC_DEL_PERMROLE.format(e)) self.get_session.rollback() diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py index 205d87b2f0345..97b290f0780fb 100644 --- a/airflow/cli/commands/task_command.py +++ b/airflow/cli/commands/task_command.py @@ -548,7 +548,7 @@ def task_states_for_dag_run(args, session: Session = NEW_SESSION) -> None: select(DagRun).where(DagRun.execution_date == execution_date, DagRun.dag_id == args.dag_id) ) except (ParserError, TypeError) as err: - raise AirflowException(f"Error parsing the supplied execution_date. Error: {str(err)}") + raise AirflowException(f"Error parsing the supplied execution_date. Error: {err}") if dag_run is None: raise DagRunNotFound( diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py index 2f0343e1584d4..a855ce9859ee4 100644 --- a/airflow/example_dags/example_bash_operator.py +++ b/airflow/example_dags/example_bash_operator.py @@ -50,7 +50,7 @@ for i in range(3): task = BashOperator( - task_id="runme_" + str(i), + task_id=f"runme_{i}", bash_command='echo "{{ task_instance_key_str }}" && sleep 1', ) task >> run_this diff --git a/airflow/example_dags/example_short_circuit_decorator.py b/airflow/example_dags/example_short_circuit_decorator.py index 30f6cd0e012bb..79a8c6904b17e 100644 --- a/airflow/example_dags/example_short_circuit_decorator.py +++ b/airflow/example_dags/example_short_circuit_decorator.py @@ -32,8 +32,8 @@ def example_short_circuit_decorator(): def check_condition(condition): return condition - ds_true = [EmptyOperator(task_id="true_" + str(i)) for i in [1, 2]] - ds_false = [EmptyOperator(task_id="false_" + str(i)) for i in [1, 2]] + ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]] + ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]] condition_is_true = check_condition.override(task_id="condition_is_true")(condition=True) condition_is_false = check_condition.override(task_id="condition_is_false")(condition=False) diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py index 77f976c502a16..5a35f0b49b83a 100644 --- a/airflow/example_dags/example_short_circuit_operator.py +++ b/airflow/example_dags/example_short_circuit_operator.py @@ -42,8 +42,8 @@ python_callable=lambda: False, ) - ds_true = [EmptyOperator(task_id="true_" + str(i)) for i in [1, 2]] - ds_false = [EmptyOperator(task_id="false_" + str(i)) for i in [1, 2]] + ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]] + ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]] chain(cond_true, *ds_true) chain(cond_false, *ds_false) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 7ae175540f082..909a955cbf536 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -308,7 +308,7 @@ def change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None try: self.running.remove(key) except KeyError: - self.log.debug("Could not find key: %s", str(key)) + self.log.debug("Could not find key: %s", key) self.event_buffer[key] = state, info def fail(self, key: TaskInstanceKey, info=None) -> None: diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py index 8a46d6cda0dd1..4ecebdff8b4f8 100644 --- a/airflow/executors/debug_executor.py +++ b/airflow/executors/debug_executor.py @@ -92,7 +92,7 @@ def _run_task(self, ti: TaskInstance) -> bool: except Exception as e: ti.set_state(TaskInstanceState.FAILED) self.change_state(key, TaskInstanceState.FAILED) - self.log.exception("Failed to execute task: %s.", str(e)) + self.log.exception("Failed to execute task: %s.", e) return False def queue_task_instance( diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index cf88ca13b2f5e..9dcfdb629faed 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -99,7 +99,7 @@ def _execute_work_in_subprocess(self, command: CommandType) -> TaskInstanceState subprocess.check_call(command, close_fds=True) return TaskInstanceState.SUCCESS except subprocess.CalledProcessError as e: - self.log.error("Failed to execute task %s.", str(e)) + self.log.error("Failed to execute task %s.", e) return TaskInstanceState.FAILED def _execute_work_in_fork(self, command: CommandType) -> TaskInstanceState: diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index 2715edad6ea21..8ea3e42dc5b68 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -78,7 +78,7 @@ def sync(self) -> None: self.change_state(key, TaskInstanceState.SUCCESS) except subprocess.CalledProcessError as e: self.change_state(key, TaskInstanceState.FAILED) - self.log.error("Failed to execute task %s.", str(e)) + self.log.error("Failed to execute task %s.", e) self.commands_to_run = [] diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py index f20808868db1d..159b8f934a1b7 100644 --- a/airflow/jobs/job.py +++ b/airflow/jobs/job.py @@ -153,7 +153,7 @@ def kill(self, session: Session = NEW_SESSION) -> NoReturn: try: self.on_kill() except Exception as e: - self.log.error("on_kill() method failed: %s", str(e)) + self.log.error("on_kill() method failed: %s", e) session.merge(job) session.commit() raise AirflowException("Job shut down externally.") diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py index d04af55dcedc3..f20a8943614f9 100644 --- a/airflow/jobs/local_task_job_runner.py +++ b/airflow/jobs/local_task_job_runner.py @@ -285,7 +285,7 @@ def heartbeat_callback(self, session: Session = NEW_SESSION) -> None: else: dagrun_timeout = None if dagrun_timeout and execution_time > dagrun_timeout: - self.log.warning("DagRun timed out after %s.", str(execution_time)) + self.log.warning("DagRun timed out after %s.", execution_time) # potential race condition, the _run_raw_task commits `success` or other state # but task_runner does not exit right away due to slow process shutdown or any other reasons diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 50f78a48a9680..5bd9f816a33f3 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -899,7 +899,7 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION) if callback_to_run: self._send_dag_callbacks_to_processor(dag, callback_to_run) except Exception as e: # should not fail the scheduler - self.log.exception("Failed to update dag run state for paused dags due to %s", str(e)) + self.log.exception("Failed to update dag run state for paused dags due to %s", e) def _run_scheduler_loop(self) -> None: """ diff --git a/airflow/policies.py b/airflow/policies.py index f37703fe2a250..6b40b5ac5f3a3 100644 --- a/airflow/policies.py +++ b/airflow/policies.py @@ -171,7 +171,7 @@ def _make_shim_fn(name, desired_sig, target): # codestr = textwrap.dedent( f""" - def {name}_name_mismatch_shim{str(desired_sig)}: + def {name}_name_mismatch_shim{desired_sig}: return __target({' ,'.join(desired_sig.parameters)}) """ ) diff --git a/airflow/providers/amazon/aws/hooks/dynamodb.py b/airflow/providers/amazon/aws/hooks/dynamodb.py index db38b401af7f3..957730a754c06 100644 --- a/airflow/providers/amazon/aws/hooks/dynamodb.py +++ b/airflow/providers/amazon/aws/hooks/dynamodb.py @@ -68,4 +68,4 @@ def write_batch_data(self, items: Iterable) -> bool: batch.put_item(Item=item) return True except Exception as general_error: - raise AirflowException(f"Failed to insert items in dynamodb, error: {str(general_error)}") + raise AirflowException(f"Failed to insert items in dynamodb, error: {general_error}") diff --git a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py index 5f74468d04ce4..bb2a6a58599d9 100644 --- a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py +++ b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py @@ -99,7 +99,7 @@ def _read(self, task_instance, try_number, metadata=None): except Exception as e: log = ( f"*** Unable to read remote logs from Cloudwatch (log_group: {self.log_group}, log_stream: " - f"{stream_name})\n*** {str(e)}\n\n" + f"{stream_name})\n*** {e}\n\n" ) self.log.error(log) local_log, metadata = super()._read(task_instance, try_number, metadata) diff --git a/airflow/providers/amazon/aws/sensors/sqs.py b/airflow/providers/amazon/aws/sensors/sqs.py index a61652acb21fa..705f100fb1476 100644 --- a/airflow/providers/amazon/aws/sensors/sqs.py +++ b/airflow/providers/amazon/aws/sensors/sqs.py @@ -202,9 +202,7 @@ def poke(self, context: Context): response = self.hook.conn.delete_message_batch(QueueUrl=self.sqs_queue, Entries=entries) if "Successful" not in response: - raise AirflowException( - "Delete SQS Messages failed " + str(response) + " for messages " + str(messages) - ) + raise AirflowException(f"Delete SQS Messages failed {response} for messages {messages}") if not len(message_batch): return False diff --git a/airflow/providers/amazon/aws/triggers/sqs.py b/airflow/providers/amazon/aws/triggers/sqs.py index 7e26b9c28f3fa..68f85c5f53019 100644 --- a/airflow/providers/amazon/aws/triggers/sqs.py +++ b/airflow/providers/amazon/aws/triggers/sqs.py @@ -149,9 +149,7 @@ async def poke(self, client: Any): response = await client.delete_message_batch(QueueUrl=self.sqs_queue, Entries=entries) if "Successful" not in response: - raise AirflowException( - f"Delete SQS Messages failed {str(response)} for messages {str(messages)}" - ) + raise AirflowException(f"Delete SQS Messages failed {response} for messages {messages}") return message_batch diff --git a/airflow/providers/apache/livy/triggers/livy.py b/airflow/providers/apache/livy/triggers/livy.py index 95ccc8577b54b..30d6e393f1d3d 100644 --- a/airflow/providers/apache/livy/triggers/livy.py +++ b/airflow/providers/apache/livy/triggers/livy.py @@ -101,7 +101,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: { "status": "error", "batch_id": self._batch_id, - "response": f"Batch {self._batch_id} did not succeed with {str(exc)}", + "response": f"Batch {self._batch_id} did not succeed with {exc}", "log_lines": None, } ) diff --git a/airflow/providers/arangodb/hooks/arangodb.py b/airflow/providers/arangodb/hooks/arangodb.py index 07363e00f5979..f8f4d90072f7a 100644 --- a/airflow/providers/arangodb/hooks/arangodb.py +++ b/airflow/providers/arangodb/hooks/arangodb.py @@ -108,7 +108,7 @@ def query(self, query, **kwargs) -> Cursor: f"Failed to execute AQLQuery, error connecting to database: {self.database}" ) except AQLQueryExecuteError as error: - raise AirflowException(f"Failed to execute AQLQuery, error: {str(error)}") + raise AirflowException(f"Failed to execute AQLQuery, error: {error}") def create_collection(self, name): if not self.db_conn.has_collection(name): diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 8e95f7a6c9387..3dd237f91c654 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -516,7 +516,7 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li if log: messages.append("Found logs through kube API") except Exception as e: - messages.append(f"Reading from k8s pod logs failed: {str(e)}") + messages.append(f"Reading from k8s pod logs failed: {e}") return messages, ["\n".join(log)] def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]: diff --git a/airflow/providers/dbt/cloud/hooks/dbt.py b/airflow/providers/dbt/cloud/hooks/dbt.py index 8c589970ab620..4a9785da3eff8 100644 --- a/airflow/providers/dbt/cloud/hooks/dbt.py +++ b/airflow/providers/dbt/cloud/hooks/dbt.py @@ -254,7 +254,7 @@ async def get_job_status( Valid values are "trigger", "job", "repository", and "environment". """ try: - self.log.info("Getting the status of job run %s.", str(run_id)) + self.log.info("Getting the status of job run %s.", run_id) response = await self.get_job_details( run_id, account_id=account_id, include_related=include_related ) @@ -490,14 +490,12 @@ def get_job_run_status(self, run_id: int, account_id: int | None = None) -> int: :param account_id: Optional. The ID of a dbt Cloud account. :return: The status of a dbt Cloud job run. """ - self.log.info("Getting the status of job run %s.", str(run_id)) + self.log.info("Getting the status of job run %s.", run_id) job_run = self.get_job_run(account_id=account_id, run_id=run_id) job_run_status = job_run.json()["data"]["status"] - self.log.info( - "Current status of job run %s: %s", str(run_id), DbtCloudJobRunStatus(job_run_status).name - ) + self.log.info("Current status of job run %s: %s", run_id, DbtCloudJobRunStatus(job_run_status).name) return job_run_status diff --git a/airflow/providers/ftp/sensors/ftp.py b/airflow/providers/ftp/sensors/ftp.py index 92710aeb21b05..4ec7cf75169a9 100644 --- a/airflow/providers/ftp/sensors/ftp.py +++ b/airflow/providers/ftp/sensors/ftp.py @@ -73,10 +73,10 @@ def poke(self, context: Context) -> bool: self.log.info("Poking for %s", self.path) try: mod_time = hook.get_mod_time(self.path) - self.log.info("Found File %s last modified: %s", str(self.path), str(mod_time)) + self.log.info("Found File %s last modified: %s", self.path, mod_time) except ftplib.error_perm as e: - self.log.error("Ftp error encountered: %s", str(e)) + self.log.error("Ftp error encountered: %s", e) error_code = self._get_error_code(e) if (error_code != 550) and ( self.fail_on_transient_errors or (error_code not in self.transient_errors) diff --git a/airflow/providers/github/operators/github.py b/airflow/providers/github/operators/github.py index 82c9ab3b77abf..e7e305808f85f 100644 --- a/airflow/providers/github/operators/github.py +++ b/airflow/providers/github/operators/github.py @@ -74,6 +74,6 @@ def execute(self, context: Context) -> Any: return github_result except GithubException as github_error: - raise AirflowException(f"Failed to execute GithubOperator, error: {str(github_error)}") + raise AirflowException(f"Failed to execute GithubOperator, error: {github_error}") except Exception as e: - raise AirflowException(f"GitHub operator error: {str(e)}") + raise AirflowException(f"GitHub operator error: {e}") diff --git a/airflow/providers/github/sensors/github.py b/airflow/providers/github/sensors/github.py index eb9caad5aeed4..db943cd0ea0f3 100644 --- a/airflow/providers/github/sensors/github.py +++ b/airflow/providers/github/sensors/github.py @@ -136,9 +136,9 @@ def tag_checker(self, repo: Any) -> bool | None: result = self.tag_name in all_tags except GithubException as github_error: # type: ignore[misc] - raise AirflowException(f"Failed to execute GithubSensor, error: {str(github_error)}") + raise AirflowException(f"Failed to execute GithubSensor, error: {github_error}") except Exception as e: - raise AirflowException(f"GitHub operator error: {str(e)}") + raise AirflowException(f"GitHub operator error: {e}") if result is True: self.log.info("Tag %s exists in %s repository, Success.", self.tag_name, self.repository_name) diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py index 121a668857b29..79055f0847f78 100644 --- a/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -247,7 +247,7 @@ def gcs_write(self, log, remote_log_location) -> bool: pass else: log += self._add_message( - f"Error checking for previous log; if exists, may be overwritten: {str(e)}" + f"Error checking for previous log; if exists, may be overwritten: {e}" ) self.log.warning("Error checking for previous log: %s", e) try: diff --git a/airflow/providers/google/cloud/operators/mlengine.py b/airflow/providers/google/cloud/operators/mlengine.py index b391041372273..30d1225c0561e 100644 --- a/airflow/providers/google/cloud/operators/mlengine.py +++ b/airflow/providers/google/cloud/operators/mlengine.py @@ -278,7 +278,7 @@ def check_existing_job(existing_job): ) if finished_prediction_job["state"] != "SUCCEEDED": - self.log.error("MLEngine batch prediction job failed: %s", str(finished_prediction_job)) + self.log.error("MLEngine batch prediction job failed: %s", finished_prediction_job) raise RuntimeError(finished_prediction_job["errorMessage"]) return finished_prediction_job["predictionOutput"] @@ -1153,7 +1153,7 @@ def __init__( def _handle_job_error(self, finished_training_job) -> None: if finished_training_job["state"] != "SUCCEEDED": - self.log.error("MLEngine training job failed: %s", str(finished_training_job)) + self.log.error("MLEngine training job failed: %s", finished_training_job) raise RuntimeError(finished_training_job["errorMessage"]) def execute(self, context: Context): diff --git a/airflow/providers/google/cloud/transfers/cassandra_to_gcs.py b/airflow/providers/google/cloud/transfers/cassandra_to_gcs.py index 408977d7ccfa4..620edfbe1558f 100644 --- a/airflow/providers/google/cloud/transfers/cassandra_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/cassandra_to_gcs.py @@ -284,7 +284,7 @@ def convert_value(self, value: Any | None) -> Any | None: elif isinstance(value, OrderedMapSerializedKey): return self.convert_map_type(value) else: - raise AirflowException("Unexpected value: " + str(value)) + raise AirflowException(f"Unexpected value: {value}") def convert_array_types(self, value: list[Any] | SortedSet) -> list[Any]: """Maps convert_value over array.""" @@ -308,7 +308,7 @@ def convert_tuple_type(self, values: tuple[Any]) -> dict[str, Any]: will be named 'field_', where index is determined by the order of the tuple elements defined in cassandra. """ - names = ["field_" + str(i) for i in range(len(values))] + names = [f"field_{i}" for i in range(len(values))] return self.generate_data_dict(names, values) def convert_map_type(self, value: OrderedMapSerializedKey) -> list[dict[str, Any]]: @@ -351,7 +351,7 @@ def get_bq_fields(cls, type_: Any) -> list[dict[str, Any]]: types = type_.subtypes if types and not names and type_.cassname == "TupleType": - names = ["field_" + str(i) for i in range(len(types))] + names = [f"field_{i}" for i in range(len(types))] elif types and not names and type_.cassname == "MapType": names = ["key", "value"] diff --git a/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py b/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py index 30e7f61860b4e..9273553e1bc17 100644 --- a/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py @@ -141,11 +141,11 @@ def execute(self, context: Context): account_id=account_id, ) else: - self.log.warning("account_id: %s returned empty report", str(account_id)) + self.log.warning("account_id: %s returned empty report", account_id) else: message = ( "Facebook Ads Hook returned different type than expected. Expected return types should be " - "List or Dict. Actual return type of the Hook: " + str(type(bulk_report)) + f"List or Dict. Actual return type of the Hook: {type(bulk_report)}" ) raise AirflowException(message) total_row_count = self._decide_and_flush(converted_rows_with_action=converted_rows_with_action) diff --git a/airflow/providers/google/cloud/triggers/bigquery_dts.py b/airflow/providers/google/cloud/triggers/bigquery_dts.py index 3a5ab2267f97a..d5a920a762a2c 100644 --- a/airflow/providers/google/cloud/triggers/bigquery_dts.py +++ b/airflow/providers/google/cloud/triggers/bigquery_dts.py @@ -133,7 +133,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: yield TriggerEvent( { "status": "failed", - "message": f"Trigger failed with exception: {str(e)}", + "message": f"Trigger failed with exception: {e}", } ) return diff --git a/airflow/providers/grpc/hooks/grpc.py b/airflow/providers/grpc/hooks/grpc.py index c42262f914e12..b7b0344a4fc9e 100644 --- a/airflow/providers/grpc/hooks/grpc.py +++ b/airflow/providers/grpc/hooks/grpc.py @@ -112,7 +112,7 @@ def get_conn(self) -> grpc.Channel: else: raise AirflowConfigException( "auth_type not supported or not provided, channel cannot be established, " - f"given value: {str(auth_type)}" + f"given value: {auth_type}" ) if self.interceptors: diff --git a/airflow/providers/microsoft/azure/hooks/synapse.py b/airflow/providers/microsoft/azure/hooks/synapse.py index 881f918b60d76..84475a2f38e3d 100644 --- a/airflow/providers/microsoft/azure/hooks/synapse.py +++ b/airflow/providers/microsoft/azure/hooks/synapse.py @@ -183,7 +183,7 @@ def wait_for_job_run_status( ) # Wait to check the status of the job run based on the ``check_interval`` configured. - self.log.info("Sleeping for %s seconds", str(check_interval)) + self.log.info("Sleeping for %s seconds", check_interval) time.sleep(check_interval) job_run_status = self.get_job_run_status() diff --git a/airflow/providers/microsoft/azure/triggers/data_factory.py b/airflow/providers/microsoft/azure/triggers/data_factory.py index e3dd38ad6646a..1ce5484008f9c 100644 --- a/airflow/providers/microsoft/azure/triggers/data_factory.py +++ b/airflow/providers/microsoft/azure/triggers/data_factory.py @@ -233,7 +233,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: resource_group_name=self.resource_group_name, factory_name=self.factory_name, ) - self.log.info("Unexpected error %s caught. Cancel pipeline run %s", str(e), self.run_id) + self.log.info("Unexpected error %s caught. Cancel pipeline run %s", e, self.run_id) except Exception as err: yield TriggerEvent({"status": "error", "message": str(err), "run_id": self.run_id}) yield TriggerEvent({"status": "error", "message": str(e), "run_id": self.run_id}) diff --git a/airflow/providers/microsoft/winrm/operators/winrm.py b/airflow/providers/microsoft/winrm/operators/winrm.py index 7aef926173c08..b8562fd58807d 100644 --- a/airflow/providers/microsoft/winrm/operators/winrm.py +++ b/airflow/providers/microsoft/winrm/operators/winrm.py @@ -141,7 +141,7 @@ def execute(self, context: Context) -> list | str: self.winrm_hook.winrm_protocol.close_shell(winrm_client) # type: ignore[attr-defined] except Exception as e: - raise AirflowException(f"WinRM operator error: {str(e)}") + raise AirflowException(f"WinRM operator error: {e}") if return_code == 0: # returning output if do_xcom_push is set diff --git a/airflow/providers/sftp/operators/sftp.py b/airflow/providers/sftp/operators/sftp.py index 95b3a8eeb9d99..949654cc46538 100644 --- a/airflow/providers/sftp/operators/sftp.py +++ b/airflow/providers/sftp/operators/sftp.py @@ -188,7 +188,7 @@ def execute(self, context: Any) -> str | list[str] | None: self.sftp_hook.store_file(_remote_filepath, _local_filepath, confirm=self.confirm) except Exception as e: - raise AirflowException(f"Error while transferring {file_msg}, error: {str(e)}") + raise AirflowException(f"Error while transferring {file_msg}, error: {e}") return self.local_filepath diff --git a/airflow/providers/sftp/sensors/sftp.py b/airflow/providers/sftp/sensors/sftp.py index 82baab2bce5ad..f6a1278c18a11 100644 --- a/airflow/providers/sftp/sensors/sftp.py +++ b/airflow/providers/sftp/sensors/sftp.py @@ -87,7 +87,7 @@ def poke(self, context: Context) -> PokeReturnValue | bool: for actual_file_to_check in actual_files_to_check: try: mod_time = self.hook.get_mod_time(actual_file_to_check) - self.log.info("Found File %s last modified: %s", str(actual_file_to_check), str(mod_time)) + self.log.info("Found File %s last modified: %s", actual_file_to_check, mod_time) except OSError as e: if e.errno != SFTP_NO_SUCH_FILE: raise e diff --git a/airflow/sensors/filesystem.py b/airflow/sensors/filesystem.py index 3eba10380f740..d2d006818f93f 100644 --- a/airflow/sensors/filesystem.py +++ b/airflow/sensors/filesystem.py @@ -66,7 +66,7 @@ def poke(self, context: Context): for path in glob(full_path, recursive=self.recursive): if os.path.isfile(path): mod_time = datetime.datetime.fromtimestamp(os.path.getmtime(path)).strftime("%Y%m%d%H%M%S") - self.log.info("Found File %s last modified: %s", str(path), mod_time) + self.log.info("Found File %s last modified: %s", path, mod_time) return True for _, _, files in os.walk(path): diff --git a/airflow/triggers/file.py b/airflow/triggers/file.py index 85a5a373baecf..93880407e5360 100644 --- a/airflow/triggers/file.py +++ b/airflow/triggers/file.py @@ -65,7 +65,7 @@ async def run(self) -> typing.AsyncIterator[TriggerEvent]: if os.path.isfile(path): mod_time_f = os.path.getmtime(path) mod_time = datetime.datetime.fromtimestamp(mod_time_f).strftime("%Y%m%d%H%M%S") - self.log.info("Found File %s last modified: %s", str(path), str(mod_time)) + self.log.info("Found File %s last modified: %s", path, mod_time) yield TriggerEvent(True) for _, _, files in os.walk(self.filepath): if files: diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 1314cd7ff9b85..2530b8fdb5313 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -520,7 +520,7 @@ def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], li messages.append(f"Found logs served from host {url}") logs.append(response.text) except Exception as e: - messages.append(f"Could not read served logs: {str(e)}") + messages.append(f"Could not read served logs: {e}") logger.exception("Could not read served logs") return messages, logs diff --git a/airflow/www/views.py b/airflow/www/views.py index 018b720b4f287..32f8006fcda2e 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -1694,7 +1694,7 @@ def get_logs_with_metadata(self, session: Session = NEW_SESSION): headers={"Content-Disposition": f"attachment; filename={attachment_filename}"}, ) except AttributeError as e: - error_messages = [f"Task log handler does not support read logs.\n{str(e)}\n"] + error_messages = [f"Task log handler does not support read logs.\n{e}\n"] metadata["end_of_log"] = True return {"message": error_messages, "error": True, "metadata": metadata} diff --git a/tests/api_connexion/endpoints/test_connection_endpoint.py b/tests/api_connexion/endpoints/test_connection_endpoint.py index de0b7aad973fd..24180b5052bc7 100644 --- a/tests/api_connexion/endpoints/test_connection_endpoint.py +++ b/tests/api_connexion/endpoints/test_connection_endpoint.py @@ -351,7 +351,7 @@ def test_should_return_conf_max_if_req_max_above_conf(self, session): def _create_connections(self, count): return [ - Connection(conn_id="TEST_CONN_ID" + str(i), conn_type="TEST_CONN_TYPE" + str(i)) + Connection(conn_id=f"TEST_CONN_ID{i}", conn_type=f"TEST_CONN_TYPE{i}") for i in range(1, count + 1) ] diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 434511b9e9560..e66139a289fe9 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -137,7 +137,7 @@ def _create_test_dag_run(self, state="running", extra_dag=False, commit=True, id dags.append(DagModel(dag_id="TEST_DAG_ID", is_active=True)) dagrun_model = DagRun( dag_id="TEST_DAG_ID", - run_id="TEST_DAG_RUN_ID_" + str(i), + run_id=f"TEST_DAG_RUN_ID_{i}", run_type=DagRunType.MANUAL, execution_date=timezone.parse(self.default_time) + timedelta(days=i - 1), start_date=timezone.parse(self.default_time), @@ -148,11 +148,11 @@ def _create_test_dag_run(self, state="running", extra_dag=False, commit=True, id if extra_dag: for i in range(idx_start + 2, idx_start + 4): - dags.append(DagModel(dag_id="TEST_DAG_ID_" + str(i))) + dags.append(DagModel(dag_id=f"TEST_DAG_ID_{i}")) dag_runs.append( DagRun( - dag_id="TEST_DAG_ID_" + str(i), - run_id="TEST_DAG_RUN_ID_" + str(i), + dag_id=f"TEST_DAG_ID_{i}", + run_id=f"TEST_DAG_RUN_ID_{i}", run_type=DagRunType.MANUAL, execution_date=timezone.parse(self.default_time_2), start_date=timezone.parse(self.default_time), @@ -496,7 +496,7 @@ def _create_dag_runs(self, count): dag_runs = [ DagRun( dag_id="TEST_DAG_ID", - run_id="TEST_DAG_RUN_ID" + str(i), + run_id=f"TEST_DAG_RUN_ID{i}", run_type=DagRunType.MANUAL, execution_date=timezone.parse(self.default_time) + timedelta(minutes=i), start_date=timezone.parse(self.default_time), @@ -581,7 +581,7 @@ def _create_dag_runs(self): return [ DagRun( dag_id="TEST_DAG_ID", - run_id="TEST_START_EXEC_DAY_1" + str(i), + run_id=f"TEST_START_EXEC_DAY_1{i}", run_type=DagRunType.MANUAL, execution_date=timezone.parse(dates[i]), start_date=timezone.parse(dates[i]), @@ -872,7 +872,7 @@ def _create_dag_runs(self, count): dag_runs = [ DagRun( dag_id="TEST_DAG_ID", - run_id="TEST_DAG_RUN_ID" + str(i), + run_id=f"TEST_DAG_RUN_ID{i}", state="running", run_type=DagRunType.MANUAL, execution_date=timezone.parse(self.default_time) + timedelta(minutes=i), @@ -956,7 +956,7 @@ def _create_dag_runs(self): dag_runs = [ DagRun( dag_id="TEST_DAG_ID", - run_id="TEST_START_EXEC_DAY_1" + str(i), + run_id=f"TEST_START_EXEC_DAY_1{i}", run_type=DagRunType.MANUAL, execution_date=timezone.parse(dates[i]), start_date=timezone.parse(dates[i]), diff --git a/tests/api_connexion/endpoints/test_event_log_endpoint.py b/tests/api_connexion/endpoints/test_event_log_endpoint.py index 18439f8a92128..ee1efba55bd62 100644 --- a/tests/api_connexion/endpoints/test_event_log_endpoint.py +++ b/tests/api_connexion/endpoints/test_event_log_endpoint.py @@ -319,4 +319,4 @@ def test_should_return_conf_max_if_req_max_above_conf(self, task_instance, sessi assert len(response.json["event_logs"]) == 150 def _create_event_logs(self, task_instance, count): - return [Log(event="TEST_EVENT_" + str(i), task_instance=task_instance) for i in range(1, count + 1)] + return [Log(event=f"TEST_EVENT_{i}", task_instance=task_instance) for i in range(1, count + 1)] diff --git a/tests/dags/test_example_bash_operator.py b/tests/dags/test_example_bash_operator.py index 3036487b66490..1678799f4b196 100644 --- a/tests/dags/test_example_bash_operator.py +++ b/tests/dags/test_example_bash_operator.py @@ -38,7 +38,7 @@ for i in range(3): task = BashOperator( - task_id="runme_" + str(i), bash_command='echo "{{ task_instance_key_str }}" && sleep 1', dag=dag + task_id=f"runme_{i}", bash_command='echo "{{ task_instance_key_str }}" && sleep 1', dag=dag ) task.set_downstream(run_this) diff --git a/tests/dags/test_miscellaneous.py b/tests/dags/test_miscellaneous.py index 587d33e69281c..cad59dda1e349 100644 --- a/tests/dags/test_miscellaneous.py +++ b/tests/dags/test_miscellaneous.py @@ -55,7 +55,7 @@ for i in range(3): task = BashOperator( - task_id="runme_" + str(i), + task_id=f"runme_{i}", bash_command='echo "{{ task_instance_key_str }}" && sleep 1', dag=dag, ) diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py index ec504ba186db8..a832d3ab854cf 100644 --- a/tests/models/test_cleartasks.py +++ b/tests/models/test_cleartasks.py @@ -532,11 +532,11 @@ def test_dags_clear(self): num_of_dags = 5 for i in range(num_of_dags): dag = DAG( - "test_dag_clear_" + str(i), + f"test_dag_clear_{i}", start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + datetime.timedelta(days=10), ) - task = EmptyOperator(task_id="test_task_clear_" + str(i), owner="test", dag=dag) + task = EmptyOperator(task_id=f"test_task_clear_{i}", owner="test", dag=dag) dr = dag.create_dagrun( execution_date=DEFAULT_DATE, diff --git a/tests/providers/amazon/aws/utils/eks_test_utils.py b/tests/providers/amazon/aws/utils/eks_test_utils.py index ffab5d7fbfc38..0da93e25452de 100644 --- a/tests/providers/amazon/aws/utils/eks_test_utils.py +++ b/tests/providers/amazon/aws/utils/eks_test_utils.py @@ -92,7 +92,7 @@ def generate_clusters(eks_hook: EksHook, num_clusters: int, minimal: bool) -> li """ # Generates N clusters named cluster0, cluster1, .., clusterN return [ - eks_hook.create_cluster(name=f"cluster{str(count)}", **_input_builder(ClusterInputs, minimal))[ + eks_hook.create_cluster(name=f"cluster{count}", **_input_builder(ClusterInputs, minimal))[ ResponseAttributes.CLUSTER ][ClusterAttributes.NAME] for count in range(num_clusters) @@ -114,7 +114,7 @@ def generate_fargate_profiles( # Generates N Fargate profiles named profile0, profile1, .., profileN return [ eks_hook.create_fargate_profile( - fargateProfileName=f"profile{str(count)}", + fargateProfileName=f"profile{count}", clusterName=cluster_name, **_input_builder(FargateProfileInputs, minimal), )[ResponseAttributes.FARGATE_PROFILE][FargateProfileAttributes.FARGATE_PROFILE_NAME] @@ -137,7 +137,7 @@ def generate_nodegroups( # Generates N nodegroups named nodegroup0, nodegroup1, .., nodegroupN return [ eks_hook.create_nodegroup( - nodegroupName=f"nodegroup{str(count)}", + nodegroupName=f"nodegroup{count}", clusterName=cluster_name, **_input_builder(NodegroupInputs, minimal), )[ResponseAttributes.NODEGROUP][NodegroupAttributes.NODEGROUP_NAME] diff --git a/tests/providers/cncf/kubernetes/models/test_secret.py b/tests/providers/cncf/kubernetes/models/test_secret.py index d6b8a38c2d16e..556a5cc30a7f2 100644 --- a/tests/providers/cncf/kubernetes/models/test_secret.py +++ b/tests/providers/cncf/kubernetes/models/test_secret.py @@ -113,7 +113,7 @@ def test_attach_to_pod(self, mock_rand_str, mock_uuid): "volumeMounts": [ { "mountPath": "/etc/foo", - "name": "secretvol" + str(static_uuid), + "name": f"secretvol{static_uuid}", "readOnly": True, }, ], @@ -123,7 +123,7 @@ def test_attach_to_pod(self, mock_rand_str, mock_uuid): "imagePullSecrets": [{"name": "pull_secret_a"}, {"name": "pull_secret_b"}], "securityContext": {"fsGroup": 2000, "runAsUser": 1000}, "volumes": [ - {"name": "secretvol" + str(static_uuid), "secret": {"secretName": "secret_b"}}, + {"name": f"secretvol{static_uuid}", "secret": {"secretName": "secret_b"}}, ], }, } diff --git a/tests/providers/microsoft/azure/hooks/test_asb.py b/tests/providers/microsoft/azure/hooks/test_asb.py index 61ce05a0b90b2..a9a3851561623 100644 --- a/tests/providers/microsoft/azure/hooks/test_asb.py +++ b/tests/providers/microsoft/azure/hooks/test_asb.py @@ -30,7 +30,7 @@ from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook MESSAGE = "Test Message" -MESSAGE_LIST = [MESSAGE + " " + str(n) for n in range(0, 10)] +MESSAGE_LIST = [f"{MESSAGE} {n}" for n in range(0, 10)] class TestAdminClientHook: diff --git a/tests/system/providers/github/example_github.py b/tests/system/providers/github/example_github.py index a791a5bd71010..658fcc470f0a6 100644 --- a/tests/system/providers/github/example_github.py +++ b/tests/system/providers/github/example_github.py @@ -61,9 +61,9 @@ def tag_checker(repo: Any, tag_name: str) -> bool | None: result = tag_name in all_tags except GithubException as github_error: # type: ignore[misc] - raise AirflowException(f"Failed to execute GithubSensor, error: {str(github_error)}") + raise AirflowException(f"Failed to execute GithubSensor, error: {github_error}") except Exception as e: - raise AirflowException(f"GitHub operator error: {str(e)}") + raise AirflowException(f"GitHub operator error: {e}") return result github_sensor = GithubSensor( diff --git a/tests/system/providers/microsoft/azure/example_azure_service_bus.py b/tests/system/providers/microsoft/azure/example_azure_service_bus.py index 7c4a99786d820..1507e6281e2de 100644 --- a/tests/system/providers/microsoft/azure/example_azure_service_bus.py +++ b/tests/system/providers/microsoft/azure/example_azure_service_bus.py @@ -45,7 +45,7 @@ CLIENT_ID = os.getenv("CLIENT_ID", "") QUEUE_NAME = "sb_mgmt_queue_test" MESSAGE = "Test Message" -MESSAGE_LIST = [MESSAGE + " " + str(n) for n in range(0, 10)] +MESSAGE_LIST = [f"{MESSAGE} {n}" for n in range(0, 10)] TOPIC_NAME = "sb_mgmt_topic_test" SUBSCRIPTION_NAME = "sb_mgmt_subscription"