From 98c7731d0e2fe60806fd909b84100c1f1da6a474 Mon Sep 17 00:00:00 2001 From: Xch1 Date: Mon, 18 May 2026 13:41:41 +0800 Subject: [PATCH] refactor: GetXCom check to handler Signed-off-by: Xch1 --- .../src/airflow/dag_processing/processor.py | 9 ++------- .../src/airflow/jobs/triggerer_job_runner.py | 10 ++-------- .../airflow/sdk/execution_time/request_handlers.py | 13 +++++++++++++ .../src/airflow/sdk/execution_time/supervisor.py | 8 ++------ 4 files changed, 19 insertions(+), 21 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py index d7c0a9d2b59fc..9c375c3c1909d 100644 --- a/airflow-core/src/airflow/dag_processing/processor.py +++ b/airflow-core/src/airflow/dag_processing/processor.py @@ -69,6 +69,7 @@ XComSequenceIndexResult, XComSequenceSliceResult, ) +from airflow.sdk.execution_time.request_handlers import handle_get_variable_keys, handle_get_xcom from airflow.sdk.execution_time.supervisor import WatchedSubprocess from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, _send_error_email_notification from airflow.sdk.log import mask_secret @@ -633,8 +634,6 @@ def _handle_request(self, msg: ToManager, log: FilteringBoundLogger, req_id: int else: resp = var elif isinstance(msg, GetVariableKeys): - from airflow.sdk.execution_time.request_handlers import handle_get_variable_keys - resp, dump_opts = handle_get_variable_keys(self.client, msg) elif isinstance(msg, PutVariable): self.client.variables.set(msg.key, msg.value, msg.description) @@ -652,11 +651,7 @@ def _handle_request(self, msg: ToManager, log: FilteringBoundLogger, req_id: int resp = dagrun_result dump_opts = {"exclude_unset": True} elif isinstance(msg, GetXCom): - xcom = self.client.xcoms.get( - msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index, msg.include_prior_dates - ) - xcom_result = XComResult.from_xcom_response(xcom) - resp = xcom_result + resp, dump_opts = handle_get_xcom(self.client, msg) elif isinstance(msg, GetXComCount): resp = self.client.xcoms.head(msg.dag_id, msg.run_id, msg.task_id, msg.key) elif isinstance(msg, GetXComSequenceItem): diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 2b4db481c266e..2da4add832930 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -93,6 +93,7 @@ handle_get_connection, handle_get_variable, handle_get_variable_keys, + handle_get_xcom, handle_mask_secret, ) from airflow.sdk.execution_time.supervisor import WatchedSubprocess, make_buffered_socket_reader @@ -496,7 +497,6 @@ def make_client(self) -> Client: def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, req_id: int) -> None: from airflow.sdk.api.datamodels._generated import ( TaskStatesResponse, - XComResponse, ) resp: BaseModel | None = None @@ -546,13 +546,7 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r elif isinstance(msg, DeleteXCom): self.client.xcoms.delete(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index) elif isinstance(msg, GetXCom): - xcom = self.client.xcoms.get(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index) - if isinstance(xcom, XComResponse): - xcom_result = XComResult.from_xcom_response(xcom) - resp = xcom_result - dump_opts = {"exclude_unset": True} - else: - resp = xcom + resp, dump_opts = handle_get_xcom(self.client, msg) elif isinstance(msg, SetXCom): self.client.xcoms.set( msg.dag_id, diff --git a/task-sdk/src/airflow/sdk/execution_time/request_handlers.py b/task-sdk/src/airflow/sdk/execution_time/request_handlers.py index fbd0e1cee5893..07ff1927c6a3a 100644 --- a/task-sdk/src/airflow/sdk/execution_time/request_handlers.py +++ b/task-sdk/src/airflow/sdk/execution_time/request_handlers.py @@ -32,15 +32,18 @@ from airflow.sdk.api.datamodels._generated import ( ConnectionResponse, VariableResponse, + XComResponse, ) from airflow.sdk.execution_time.comms import ( ConnectionResult, GetConnection, GetVariable, GetVariableKeys, + GetXCom, MaskSecret, VariableKeysResult, VariableResult, + XComResult, ) from airflow.sdk.log import mask_secret @@ -83,6 +86,16 @@ def handle_get_variable_keys( ) +def handle_get_xcom(client: Client, msg: GetXCom) -> tuple[BaseModel | None, dict[str, bool]]: + """Fetch an XCom value.""" + xcom = client.xcoms.get( + msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index, msg.include_prior_dates + ) + if isinstance(xcom, XComResponse): + return XComResult.from_xcom_response(xcom), {"exclude_unset": True} + return xcom, {} + + def handle_mask_secret(msg: MaskSecret) -> None: """Register a value with the secrets masker.""" mask_secret(msg.value, msg.name) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 2269f22420f55..0ffe22e1641ae 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -132,7 +132,6 @@ ToSupervisor, TriggerDagRun, ValidateInletsAndOutlets, - XComResult, XComSequenceIndexResult, XComSequenceSliceResult, _RequestFrame, @@ -142,6 +141,7 @@ handle_get_connection, handle_get_variable, handle_get_variable_keys, + handle_get_xcom, handle_mask_secret, ) @@ -1458,11 +1458,7 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: elif isinstance(msg, GetVariableKeys): resp, dump_opts = handle_get_variable_keys(self.client, msg) elif isinstance(msg, GetXCom): - xcom = self.client.xcoms.get( - msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index, msg.include_prior_dates - ) - xcom_result = XComResult.from_xcom_response(xcom) - resp = xcom_result + resp, dump_opts = handle_get_xcom(self.client, msg) elif isinstance(msg, GetXComCount): resp = self.client.xcoms.head(msg.dag_id, msg.run_id, msg.task_id, msg.key) elif isinstance(msg, GetXComSequenceItem):