From dfccccdd3c12ba6dd50a69f34d97cb20f1b8f200 Mon Sep 17 00:00:00 2001 From: sanjay pillai Date: Mon, 23 Jan 2023 13:28:35 +0530 Subject: [PATCH] Fix #28391 manual task trigger from UI fails for k8s executor Manual task trigger from UI fails for k8s executor. the executor.job_id is currently set to "manual". the task instance queued_by_job_id field is expected to be None|Integer. this causes the filter query in clear_not_launched_queued_tasks method in kubernetes_executor to fail with psycopg2.errors.InvalidTextRepresentation invalid input syntax for integer: "manual" error. setting the job_id to None fixes the issue. --- airflow/cli/commands/task_command.py | 2 +- airflow/executors/kubernetes_executor.py | 2 -- airflow/www/views.py | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py index 0f3f5a8bba6ca..9ab96e2faa139 100644 --- a/airflow/cli/commands/task_command.py +++ b/airflow/cli/commands/task_command.py @@ -221,7 +221,7 @@ def _run_task_by_executor(args, dag, ti): print(e) raise e executor = ExecutorLoader.get_default_executor() - executor.job_id = "manual" + executor.job_id = None executor.start() print("Sending to executor.") executor.queue_task_instance( diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 040ca21856fad..e1d7b06a98d07 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -569,8 +569,6 @@ def clear_not_launched_queued_tasks(self, session=None) -> None: def start(self) -> None: """Starts the executor.""" self.log.info("Start Kubernetes executor") - if not self.job_id: - raise AirflowException("Could not get scheduler_job_id") self.scheduler_job_id = str(self.job_id) self.log.debug("Start with scheduler_job_id: %s", self.scheduler_job_id) self.kube_client = get_kube_client() diff --git a/airflow/www/views.py b/airflow/www/views.py index 41f3a89b8a267..f74d09162cd9b 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -1882,7 +1882,7 @@ def run(self, session=None): msg = f"Could not queue task instance for execution, dependencies not met: {failed_deps_str}" return redirect_or_json(origin, msg, "error", 400) - executor.job_id = "manual" + executor.job_id = None executor.start() executor.queue_task_instance( ti,