Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/get_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ def get_code(dag_id: str) -> str:
try:
return DagCode.get_code_by_fileloc(dag.fileloc)
except (OSError, DagCodeNotFound) as exception:
error_message = f"Error {str(exception)} while reading Dag id {dag_id} Code"
error_message = f"Error {exception} while reading Dag id {dag_id} Code"
raise AirflowException(error_message, exception)
36 changes: 18 additions & 18 deletions airflow/auth/managers/fab/security_manager/modules/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def create_db(self):
if self.count_users() == 0 and self.auth_role_public != self.auth_role_admin:
log.warning(const.LOGMSG_WAR_SEC_NO_USER)
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_CREATE_DB.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_CREATE_DB.format(e))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Orthogontal, but I want to investigate fixing these eager formatting at some point.

exit(1)

"""
Expand All @@ -106,7 +106,7 @@ def update_role(self, role_id, name: str) -> Role | None:
self.get_session.commit()
log.info(const.LOGMSG_INF_SEC_UPD_ROLE.format(role))
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_UPD_ROLE.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_UPD_ROLE.format(e))
self.get_session.rollback()
return None
return role
Expand All @@ -123,7 +123,7 @@ def add_role(self, name: str) -> Role:
log.info(const.LOGMSG_INF_SEC_ADD_ROLE.format(name))
return role
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_ROLE.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_ADD_ROLE.format(e))
self.get_session.rollback()
return role

Expand Down Expand Up @@ -190,7 +190,7 @@ def add_user(
log.info(const.LOGMSG_INF_SEC_ADD_USER.format(username))
return user
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_USER.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_ADD_USER.format(e))
self.get_session.rollback()
return False

Expand Down Expand Up @@ -226,7 +226,7 @@ def add_register_user(self, username, first_name, last_name, email, password="",
self.get_session.commit()
return register_user
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_REGISTER_USER.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_ADD_REGISTER_USER.format(e))
self.get_session.rollback()
return None

Expand Down Expand Up @@ -269,7 +269,7 @@ def update_user(self, user):
self.get_session.commit()
log.info(const.LOGMSG_INF_SEC_UPD_USER.format(user))
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_UPD_USER.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_UPD_USER.format(e))
self.get_session.rollback()
return False

Expand All @@ -284,7 +284,7 @@ def del_register_user(self, register_user):
self.get_session.commit()
return True
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_DEL_REGISTER_USER.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_DEL_REGISTER_USER.format(e))
self.get_session.rollback()
return False

Expand Down Expand Up @@ -322,7 +322,7 @@ def create_action(self, name):
self.get_session.commit()
return action
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_PERMISSION.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_ADD_PERMISSION.format(e))
self.get_session.rollback()
return action

Expand All @@ -349,7 +349,7 @@ def delete_action(self, name: str) -> bool:
self.get_session.commit()
return True
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_DEL_PERMISSION.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_DEL_PERMISSION.format(e))
self.get_session.rollback()
return False

Expand Down Expand Up @@ -383,7 +383,7 @@ def create_resource(self, name) -> Resource:
self.get_session.commit()
return resource
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_VIEWMENU.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_ADD_VIEWMENU.format(e))
self.get_session.rollback()
return resource

Expand Down Expand Up @@ -419,7 +419,7 @@ def delete_resource(self, name: str) -> bool:
self.get_session.commit()
return True
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_DEL_PERMISSION.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_DEL_PERMISSION.format(e))
self.get_session.rollback()
return False

Expand Down Expand Up @@ -481,10 +481,10 @@ def create_permission(self, action_name, resource_name) -> Permission | None:
try:
self.get_session.add(perm)
self.get_session.commit()
log.info(const.LOGMSG_INF_SEC_ADD_PERMVIEW.format(str(perm)))
log.info(const.LOGMSG_INF_SEC_ADD_PERMVIEW.format(perm))
return perm
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_PERMVIEW.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_ADD_PERMVIEW.format(e))
self.get_session.rollback()
return None

Expand Down Expand Up @@ -518,7 +518,7 @@ def delete_permission(self, action_name: str, resource_name: str) -> None:
self.delete_action(perm.action.name)
log.info(const.LOGMSG_INF_SEC_DEL_PERMVIEW.format(action_name, resource_name))
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_DEL_PERMVIEW.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_DEL_PERMVIEW.format(e))
self.get_session.rollback()

def add_permission_to_role(self, role: Role, permission: Permission | None) -> None:
Expand All @@ -534,9 +534,9 @@ def add_permission_to_role(self, role: Role, permission: Permission | None) -> N
role.permissions.append(permission)
self.get_session.merge(role)
self.get_session.commit()
log.info(const.LOGMSG_INF_SEC_ADD_PERMROLE.format(str(permission), role.name))
log.info(const.LOGMSG_INF_SEC_ADD_PERMROLE.format(permission, role.name))
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_PERMROLE.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_ADD_PERMROLE.format(e))
self.get_session.rollback()

def remove_permission_from_role(self, role: Role, permission: Permission) -> None:
Expand All @@ -551,7 +551,7 @@ def remove_permission_from_role(self, role: Role, permission: Permission) -> Non
role.permissions.remove(permission)
self.get_session.merge(role)
self.get_session.commit()
log.info(const.LOGMSG_INF_SEC_DEL_PERMROLE.format(str(permission), role.name))
log.info(const.LOGMSG_INF_SEC_DEL_PERMROLE.format(permission, role.name))
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_DEL_PERMROLE.format(str(e)))
log.error(const.LOGMSG_ERR_SEC_DEL_PERMROLE.format(e))
self.get_session.rollback()
2 changes: 1 addition & 1 deletion airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ def task_states_for_dag_run(args, session: Session = NEW_SESSION) -> None:
select(DagRun).where(DagRun.execution_date == execution_date, DagRun.dag_id == args.dag_id)
)
except (ParserError, TypeError) as err:
raise AirflowException(f"Error parsing the supplied execution_date. Error: {str(err)}")
raise AirflowException(f"Error parsing the supplied execution_date. Error: {err}")

if dag_run is None:
raise DagRunNotFound(
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_bash_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@

for i in range(3):
task = BashOperator(
task_id="runme_" + str(i),
task_id=f"runme_{i}",
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
)
task >> run_this
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_short_circuit_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ def example_short_circuit_decorator():
def check_condition(condition):
return condition

ds_true = [EmptyOperator(task_id="true_" + str(i)) for i in [1, 2]]
ds_false = [EmptyOperator(task_id="false_" + str(i)) for i in [1, 2]]
ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

condition_is_true = check_condition.override(task_id="condition_is_true")(condition=True)
condition_is_false = check_condition.override(task_id="condition_is_false")(condition=False)
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_short_circuit_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
python_callable=lambda: False,
)

ds_true = [EmptyOperator(task_id="true_" + str(i)) for i in [1, 2]]
ds_false = [EmptyOperator(task_id="false_" + str(i)) for i in [1, 2]]
ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

chain(cond_true, *ds_true)
chain(cond_false, *ds_false)
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ def change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None
try:
self.running.remove(key)
except KeyError:
self.log.debug("Could not find key: %s", str(key))
self.log.debug("Could not find key: %s", key)
self.event_buffer[key] = state, info

def fail(self, key: TaskInstanceKey, info=None) -> None:
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/debug_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def _run_task(self, ti: TaskInstance) -> bool:
except Exception as e:
ti.set_state(TaskInstanceState.FAILED)
self.change_state(key, TaskInstanceState.FAILED)
self.log.exception("Failed to execute task: %s.", str(e))
self.log.exception("Failed to execute task: %s.", e)
return False

def queue_task_instance(
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def _execute_work_in_subprocess(self, command: CommandType) -> TaskInstanceState
subprocess.check_call(command, close_fds=True)
return TaskInstanceState.SUCCESS
except subprocess.CalledProcessError as e:
self.log.error("Failed to execute task %s.", str(e))
self.log.error("Failed to execute task %s.", e)
return TaskInstanceState.FAILED

def _execute_work_in_fork(self, command: CommandType) -> TaskInstanceState:
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/sequential_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def sync(self) -> None:
self.change_state(key, TaskInstanceState.SUCCESS)
except subprocess.CalledProcessError as e:
self.change_state(key, TaskInstanceState.FAILED)
self.log.error("Failed to execute task %s.", str(e))
self.log.error("Failed to execute task %s.", e)

self.commands_to_run = []

Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def kill(self, session: Session = NEW_SESSION) -> NoReturn:
try:
self.on_kill()
except Exception as e:
self.log.error("on_kill() method failed: %s", str(e))
self.log.error("on_kill() method failed: %s", e)
session.merge(job)
session.commit()
raise AirflowException("Job shut down externally.")
Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/local_task_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
else:
dagrun_timeout = None
if dagrun_timeout and execution_time > dagrun_timeout:
self.log.warning("DagRun timed out after %s.", str(execution_time))
self.log.warning("DagRun timed out after %s.", execution_time)

# potential race condition, the _run_raw_task commits `success` or other state
# but task_runner does not exit right away due to slow process shutdown or any other reasons
Expand Down
2 changes: 1 addition & 1 deletion airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
if callback_to_run:
self._send_dag_callbacks_to_processor(dag, callback_to_run)
except Exception as e: # should not fail the scheduler
self.log.exception("Failed to update dag run state for paused dags due to %s", str(e))
self.log.exception("Failed to update dag run state for paused dags due to %s", e)

def _run_scheduler_loop(self) -> None:
"""
Expand Down
2 changes: 1 addition & 1 deletion airflow/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def _make_shim_fn(name, desired_sig, target):
#
codestr = textwrap.dedent(
f"""
def {name}_name_mismatch_shim{str(desired_sig)}:
def {name}_name_mismatch_shim{desired_sig}:
return __target({' ,'.join(desired_sig.parameters)})
"""
)
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/hooks/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,4 @@ def write_batch_data(self, items: Iterable) -> bool:
batch.put_item(Item=item)
return True
except Exception as general_error:
raise AirflowException(f"Failed to insert items in dynamodb, error: {str(general_error)}")
raise AirflowException(f"Failed to insert items in dynamodb, error: {general_error}")
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def _read(self, task_instance, try_number, metadata=None):
except Exception as e:
log = (
f"*** Unable to read remote logs from Cloudwatch (log_group: {self.log_group}, log_stream: "
f"{stream_name})\n*** {str(e)}\n\n"
f"{stream_name})\n*** {e}\n\n"
)
self.log.error(log)
local_log, metadata = super()._read(task_instance, try_number, metadata)
Expand Down
4 changes: 1 addition & 3 deletions airflow/providers/amazon/aws/sensors/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,7 @@ def poke(self, context: Context):
response = self.hook.conn.delete_message_batch(QueueUrl=self.sqs_queue, Entries=entries)

if "Successful" not in response:
raise AirflowException(
"Delete SQS Messages failed " + str(response) + " for messages " + str(messages)
)
raise AirflowException(f"Delete SQS Messages failed {response} for messages {messages}")
if not len(message_batch):
return False

Expand Down
4 changes: 1 addition & 3 deletions airflow/providers/amazon/aws/triggers/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ async def poke(self, client: Any):
response = await client.delete_message_batch(QueueUrl=self.sqs_queue, Entries=entries)

if "Successful" not in response:
raise AirflowException(
f"Delete SQS Messages failed {str(response)} for messages {str(messages)}"
)
raise AirflowException(f"Delete SQS Messages failed {response} for messages {messages}")

return message_batch

Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/apache/livy/triggers/livy.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
{
"status": "error",
"batch_id": self._batch_id,
"response": f"Batch {self._batch_id} did not succeed with {str(exc)}",
"response": f"Batch {self._batch_id} did not succeed with {exc}",
"log_lines": None,
}
)
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/arangodb/hooks/arangodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def query(self, query, **kwargs) -> Cursor:
f"Failed to execute AQLQuery, error connecting to database: {self.database}"
)
except AQLQueryExecuteError as error:
raise AirflowException(f"Failed to execute AQLQuery, error: {str(error)}")
raise AirflowException(f"Failed to execute AQLQuery, error: {error}")

def create_collection(self, name):
if not self.db_conn.has_collection(name):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li
if log:
messages.append("Found logs through kube API")
except Exception as e:
messages.append(f"Reading from k8s pod logs failed: {str(e)}")
messages.append(f"Reading from k8s pod logs failed: {e}")
return messages, ["\n".join(log)]

def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
Expand Down
8 changes: 3 additions & 5 deletions airflow/providers/dbt/cloud/hooks/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ async def get_job_status(
Valid values are "trigger", "job", "repository", and "environment".
"""
try:
self.log.info("Getting the status of job run %s.", str(run_id))
self.log.info("Getting the status of job run %s.", run_id)
response = await self.get_job_details(
run_id, account_id=account_id, include_related=include_related
)
Expand Down Expand Up @@ -490,14 +490,12 @@ def get_job_run_status(self, run_id: int, account_id: int | None = None) -> int:
:param account_id: Optional. The ID of a dbt Cloud account.
:return: The status of a dbt Cloud job run.
"""
self.log.info("Getting the status of job run %s.", str(run_id))
self.log.info("Getting the status of job run %s.", run_id)

job_run = self.get_job_run(account_id=account_id, run_id=run_id)
job_run_status = job_run.json()["data"]["status"]

self.log.info(
"Current status of job run %s: %s", str(run_id), DbtCloudJobRunStatus(job_run_status).name
)
self.log.info("Current status of job run %s: %s", run_id, DbtCloudJobRunStatus(job_run_status).name)

return job_run_status

Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/ftp/sensors/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ def poke(self, context: Context) -> bool:
self.log.info("Poking for %s", self.path)
try:
mod_time = hook.get_mod_time(self.path)
self.log.info("Found File %s last modified: %s", str(self.path), str(mod_time))
self.log.info("Found File %s last modified: %s", self.path, mod_time)

except ftplib.error_perm as e:
self.log.error("Ftp error encountered: %s", str(e))
self.log.error("Ftp error encountered: %s", e)
error_code = self._get_error_code(e)
if (error_code != 550) and (
self.fail_on_transient_errors or (error_code not in self.transient_errors)
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/github/operators/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ def execute(self, context: Context) -> Any:
return github_result

except GithubException as github_error:
raise AirflowException(f"Failed to execute GithubOperator, error: {str(github_error)}")
raise AirflowException(f"Failed to execute GithubOperator, error: {github_error}")
except Exception as e:
raise AirflowException(f"GitHub operator error: {str(e)}")
raise AirflowException(f"GitHub operator error: {e}")
4 changes: 2 additions & 2 deletions airflow/providers/github/sensors/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ def tag_checker(self, repo: Any) -> bool | None:
result = self.tag_name in all_tags

except GithubException as github_error: # type: ignore[misc]
raise AirflowException(f"Failed to execute GithubSensor, error: {str(github_error)}")
raise AirflowException(f"Failed to execute GithubSensor, error: {github_error}")
except Exception as e:
raise AirflowException(f"GitHub operator error: {str(e)}")
raise AirflowException(f"GitHub operator error: {e}")

if result is True:
self.log.info("Tag %s exists in %s repository, Success.", self.tag_name, self.repository_name)
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/log/gcs_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def gcs_write(self, log, remote_log_location) -> bool:
pass
else:
log += self._add_message(
f"Error checking for previous log; if exists, may be overwritten: {str(e)}"
f"Error checking for previous log; if exists, may be overwritten: {e}"
)
self.log.warning("Error checking for previous log: %s", e)
try:
Expand Down
Loading