From 77cbed62824854a54226c19e6e2dac111fb839c7 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sat, 26 Aug 2023 04:33:31 +0200 Subject: [PATCH] Use a single statement with multiple contexts instead of nested statements in core --- airflow/sensors/bash.py | 75 +++++++++++++++++++++-------------------- airflow/utils/db.py | 7 ++-- 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/airflow/sensors/bash.py b/airflow/sensors/bash.py index c99421c982527..25c4a547b809a 100644 --- a/airflow/sensors/bash.py +++ b/airflow/sensors/bash.py @@ -68,44 +68,45 @@ def poke(self, context: Context): """Execute the bash command in a temporary directory.""" bash_command = self.bash_command self.log.info("Tmp dir root location: %s", gettempdir()) - with TemporaryDirectory(prefix="airflowtmp") as tmp_dir: - with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f: - f.write(bytes(bash_command, "utf_8")) - f.flush() - fname = f.name - script_location = tmp_dir + "/" + fname - self.log.info("Temporary script location: %s", script_location) - self.log.info("Running command: %s", bash_command) + with TemporaryDirectory(prefix="airflowtmp") as tmp_dir, NamedTemporaryFile( + dir=tmp_dir, prefix=self.task_id + ) as f: + f.write(bytes(bash_command, "utf_8")) + f.flush() + fname = f.name + script_location = tmp_dir + "/" + fname + self.log.info("Temporary script location: %s", script_location) + self.log.info("Running command: %s", bash_command) - with Popen( - ["bash", fname], - stdout=PIPE, - stderr=STDOUT, - close_fds=True, - cwd=tmp_dir, - env=self.env, - preexec_fn=os.setsid, - ) as resp: - if resp.stdout: - self.log.info("Output:") - for line in iter(resp.stdout.readline, b""): - self.log.info(line.decode(self.output_encoding).strip()) - resp.wait() - self.log.info("Command exited with return code %s", resp.returncode) + with Popen( + ["bash", fname], + stdout=PIPE, + stderr=STDOUT, + close_fds=True, + cwd=tmp_dir, + env=self.env, + preexec_fn=os.setsid, + ) as resp: + if resp.stdout: + self.log.info("Output:") + for line in iter(resp.stdout.readline, b""): + self.log.info(line.decode(self.output_encoding).strip()) + resp.wait() + self.log.info("Command exited with return code %s", resp.returncode) - # zero code means success, the sensor can go green - if resp.returncode == 0: - return True + # zero code means success, the sensor can go green + if resp.returncode == 0: + return True - # we have a retry exit code, sensor retries if return code matches, otherwise error - elif self.retry_exit_code is not None: - if resp.returncode == self.retry_exit_code: - self.log.info("Return code matches retry code, will retry later") - return False - else: - raise AirflowFailException(f"Command exited with return code {resp.returncode}") - - # backwards compatibility: sensor retries no matter the error code - else: - self.log.info("Non-zero return code and no retry code set, will retry later") + # we have a retry exit code, sensor retries if return code matches, otherwise error + elif self.retry_exit_code is not None: + if resp.returncode == self.retry_exit_code: + self.log.info("Return code matches retry code, will retry later") return False + else: + raise AirflowFailException(f"Command exited with return code {resp.returncode}") + + # backwards compatibility: sensor retries no matter the error code + else: + self.log.info("Non-zero return code and no retry code set, will retry later") + return False diff --git a/airflow/utils/db.py b/airflow/utils/db.py index a5252f4e705cc..db4631f14867a 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -1650,10 +1650,9 @@ def resetdb(session: Session = NEW_SESSION, skip_init: bool = False): connection = settings.engine.connect() - with create_global_lock(session=session, lock=DBLocks.MIGRATIONS): - with connection.begin(): - drop_airflow_models(connection) - drop_airflow_moved_tables(connection) + with create_global_lock(session=session, lock=DBLocks.MIGRATIONS), connection.begin(): + drop_airflow_models(connection) + drop_airflow_moved_tables(connection) if not skip_init: initdb(session=session)