From b902ad494671b5435cf8578d8cabd66d6ba7818d Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 9 Jun 2026 22:03:41 +0800 Subject: [PATCH] Fix remote log provider type mismatch for callback-subprocess uploads have no task instance) but only updated Elasticsearch and Opensearch, leaving S3, CloudWatch, GCS, Wasb, Stackdriver, OSS and HDFS with a required ti so they no longer satisfied the RemoteLogIO protocol. Widen their upload signature to accept an optional ti; the bodies don't use ti, so there is no behavior change. --- .../src/airflow/providers/alibaba/cloud/log/oss_task_handler.py | 2 +- .../airflow/providers/amazon/aws/log/cloudwatch_task_handler.py | 2 +- .../src/airflow/providers/amazon/aws/log/s3_task_handler.py | 2 +- .../src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py | 2 +- .../src/airflow/providers/google/cloud/log/gcs_task_handler.py | 2 +- .../providers/google/cloud/log/stackdriver_task_handler.py | 2 +- .../airflow/providers/microsoft/azure/log/wasb_task_handler.py | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py index d09261b3df93e..a000c6ff01f36 100644 --- a/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py +++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py @@ -44,7 +44,7 @@ class OSSRemoteLogIO(LoggingMixin): # noqa: D101 processors = () - def upload(self, path: os.PathLike | str, ti: RuntimeTI): + def upload(self, path: os.PathLike | str, ti: RuntimeTI | None = None) -> None: """Upload the given log path to the remote storage.""" path = Path(path) if path.is_absolute(): diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py index 055cfd4493306..5bc286cb9631d 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py @@ -166,7 +166,7 @@ def close(self): self.handler.flush() - def upload(self, path: os.PathLike | str, ti: RuntimeTI): + def upload(self, path: os.PathLike | str, ti: RuntimeTI | None = None) -> None: """Upload the given log path to the remote storage.""" # No batch upload — logs stream in real-time. Flush pending events and clean up. self.close() diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py index e9cc98ac632f7..d27530f7c871d 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -46,7 +46,7 @@ class S3RemoteLogIO(LoggingMixin): # noqa: D101 processors = () - def upload(self, path: os.PathLike | str, ti: RuntimeTI): + def upload(self, path: os.PathLike | str, ti: RuntimeTI | None = None) -> None: """Upload the given log path to the remote storage.""" path = pathlib.Path(path) if path.is_absolute(): diff --git a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py index 970420a1e1bfa..22304637db237 100644 --- a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py +++ b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py @@ -46,7 +46,7 @@ class HdfsRemoteLogIO(LoggingMixin): # noqa: D101 processors = () - def upload(self, path: os.PathLike | str, ti: RuntimeTI): + def upload(self, path: os.PathLike | str, ti: RuntimeTI | None = None) -> None: """Upload the given log path to the remote storage.""" path = Path(path) if path.is_absolute(): diff --git a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py index f9a64b4929eda..42c4ba9a61b41 100644 --- a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -70,7 +70,7 @@ class GCSRemoteLogIO(LoggingMixin): # noqa: D101 processors = () - def upload(self, path: os.PathLike | str, ti: RuntimeTI): + def upload(self, path: os.PathLike | str, ti: RuntimeTI | None = None) -> None: """Upload the given log path to the remote storage.""" path = Path(path) if path.is_absolute(): diff --git a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py index 262f141fabb70..41992d98ff24a 100644 --- a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py +++ b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py @@ -176,7 +176,7 @@ def proc( return (proc,) - def upload(self, path: os.PathLike | str, ti: RuntimeTI) -> None: + def upload(self, path: os.PathLike | str, ti: RuntimeTI | None = None) -> None: """Flush the transport and optionally delete local log files.""" self.transport.flush() if self.delete_local_copy: diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py index c32d5410b371d..71704d82588c5 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py @@ -49,7 +49,7 @@ class WasbRemoteLogIO(LoggingMixin): # noqa: D101 processors = () - def upload(self, path: str | os.PathLike, ti: RuntimeTI): + def upload(self, path: str | os.PathLike, ti: RuntimeTI | None = None) -> None: """Upload the given log path to the remote storage.""" path = Path(path) if path.is_absolute():