From 014b9653a6c853a4e83162840aa792137b8fd579 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Mon, 13 Apr 2026 10:01:29 +0900 Subject: [PATCH 1/6] fix memory leak in worker caused by log_file_descriptor --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 757a73b7e2edc..0f4aeb15612a9 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -47,6 +47,7 @@ import psutil import structlog from pydantic import BaseModel, TypeAdapter +from structlog._output import WRITE_LOCKS from airflow.sdk._shared.logging.structlog import reconfigure_logger from airflow.sdk.api.client import Client, ServerResponseError @@ -2328,6 +2329,7 @@ def supervise_task( finally: if log_path and log_file_descriptor: log_file_descriptor.close() + WRITE_LOCKS.pop(log_file_descriptor) provider = trace.get_tracer_provider() if hasattr(provider, "force_flush"): provider.force_flush(timeout_millis=5000) # upper bound, not a fixed wait From e3d9fd162b9bd1f0e19b7f53ca2d8911cf617e54 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Thu, 23 Apr 2026 13:18:37 +0900 Subject: [PATCH 2/6] fix logic --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 0f4aeb15612a9..cb52ac0280cb7 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -47,7 +47,6 @@ import psutil import structlog from pydantic import BaseModel, TypeAdapter -from structlog._output import WRITE_LOCKS from airflow.sdk._shared.logging.structlog import reconfigure_logger from airflow.sdk.api.client import Client, ServerResponseError @@ -149,10 +148,17 @@ except ImportError: send_fds = None # type: ignore[assignment] +<<<<<<< HEAD from opentelemetry import context as otel_context, trace from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator _trace_propagator = TraceContextTextMapPropagator() +======= +try: + from structlog._output import WRITE_LOCKS +except ImportError: + WRITE_LOCKS = None # type: ignore[assignment] +>>>>>>> d26179d9e0 (fix logic) if TYPE_CHECKING: from structlog.typing import FilteringBoundLogger, WrappedLogger @@ -2329,7 +2335,8 @@ def supervise_task( finally: if log_path and log_file_descriptor: log_file_descriptor.close() - WRITE_LOCKS.pop(log_file_descriptor) + if WRITE_LOCKS is not None: + WRITE_LOCKS.pop(log_file_descriptor, None) provider = trace.get_tracer_provider() if hasattr(provider, "force_flush"): provider.force_flush(timeout_millis=5000) # upper bound, not a fixed wait From 90c56a5ec7f867e3dca42db2bd4eab9cfccdf790 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Thu, 30 Apr 2026 15:23:22 +0900 Subject: [PATCH 3/6] fix logic --- .../logging/src/airflow_shared/logging/structlog.py | 12 +++++++++++- .../sdk/execution_time/callback_supervisor.py | 2 ++ .../src/airflow/sdk/execution_time/supervisor.py | 13 +++---------- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/shared/logging/src/airflow_shared/logging/structlog.py b/shared/logging/src/airflow_shared/logging/structlog.py index 7dddafe3b0a25..865c4f4219697 100644 --- a/shared/logging/src/airflow_shared/logging/structlog.py +++ b/shared/logging/src/airflow_shared/logging/structlog.py @@ -28,7 +28,7 @@ from functools import cache, cached_property, partial from pathlib import Path from types import ModuleType -from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast +from typing import IO, TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast import pygtrie import structlog @@ -809,6 +809,16 @@ def reconfigure_logger( ) +def clear_structlog_shared_lock(log_file_descriptor: IO[Any]): + try: + from structlog._output import WRITE_LOCKS + except ImportError: + WRITE_LOCKS = None # type: ignore[assignment] + + if WRITE_LOCKS is not None: + WRITE_LOCKS.pop(log_file_descriptor, None) + + if __name__ == "__main__": configure_logging( # json_output=True, diff --git a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py index 94d84193192db..d698ddfbef317 100644 --- a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py @@ -28,6 +28,7 @@ import structlog from pydantic import Field, TypeAdapter +from airflow.sdk._shared.logging.structlog import clear_structlog_shared_lock from airflow.sdk._shared.module_loading import accepts_context, accepts_keyword_args from airflow.sdk.exceptions import ErrorType from airflow.sdk.execution_time.comms import ( @@ -378,3 +379,4 @@ def supervise_callback( finally: if log_path and log_file_descriptor: log_file_descriptor.close() + clear_structlog_shared_lock(log_file_descriptor) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index cb52ac0280cb7..98d24187ff479 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -48,7 +48,7 @@ import structlog from pydantic import BaseModel, TypeAdapter -from airflow.sdk._shared.logging.structlog import reconfigure_logger +from airflow.sdk._shared.logging.structlog import clear_structlog_shared_lock, reconfigure_logger from airflow.sdk.api.client import Client, ServerResponseError from airflow.sdk.api.datamodels._generated import ( AssetResponse, @@ -148,17 +148,10 @@ except ImportError: send_fds = None # type: ignore[assignment] -<<<<<<< HEAD from opentelemetry import context as otel_context, trace from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator _trace_propagator = TraceContextTextMapPropagator() -======= -try: - from structlog._output import WRITE_LOCKS -except ImportError: - WRITE_LOCKS = None # type: ignore[assignment] ->>>>>>> d26179d9e0 (fix logic) if TYPE_CHECKING: from structlog.typing import FilteringBoundLogger, WrappedLogger @@ -2335,8 +2328,8 @@ def supervise_task( finally: if log_path and log_file_descriptor: log_file_descriptor.close() - if WRITE_LOCKS is not None: - WRITE_LOCKS.pop(log_file_descriptor, None) + clear_structlog_shared_lock(log_file_descriptor) + provider = trace.get_tracer_provider() if hasattr(provider, "force_flush"): provider.force_flush(timeout_millis=5000) # upper bound, not a fixed wait From a16f701bbb476fe5b90e679636cf8e7d8e021705 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Tue, 12 May 2026 10:08:22 +0900 Subject: [PATCH 4/6] fix logic --- shared/logging/src/airflow_shared/logging/structlog.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/shared/logging/src/airflow_shared/logging/structlog.py b/shared/logging/src/airflow_shared/logging/structlog.py index 865c4f4219697..60ad3ca17eae1 100644 --- a/shared/logging/src/airflow_shared/logging/structlog.py +++ b/shared/logging/src/airflow_shared/logging/structlog.py @@ -810,12 +810,14 @@ def reconfigure_logger( def clear_structlog_shared_lock(log_file_descriptor: IO[Any]): + """Temporary workaround that prevents a memory leak in the supervisor by removing the FD reference from WRITE_LOCKS.""" + # TODO: remove this logic and bump the structlog version when the next release (possibly 26.1.0) is out try: from structlog._output import WRITE_LOCKS except ImportError: WRITE_LOCKS = None # type: ignore[assignment] - if WRITE_LOCKS is not None: + if WRITE_LOCKS is not None and isinstance(WRITE_LOCKS, dict): WRITE_LOCKS.pop(log_file_descriptor, None) From 46572d0e17602d305cfad2cf2c60482279ba3f53 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Wed, 13 May 2026 18:20:08 +0900 Subject: [PATCH 5/6] fix logic --- .../src/airflow_shared/logging/structlog.py | 26 +++++++++---------- .../sdk/execution_time/callback_supervisor.py | 2 -- .../airflow/sdk/execution_time/supervisor.py | 3 +-- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/shared/logging/src/airflow_shared/logging/structlog.py b/shared/logging/src/airflow_shared/logging/structlog.py index 60ad3ca17eae1..5441913ec198d 100644 --- a/shared/logging/src/airflow_shared/logging/structlog.py +++ b/shared/logging/src/airflow_shared/logging/structlog.py @@ -24,11 +24,12 @@ import os import re import sys +import weakref from collections.abc import Callable, Iterable, Mapping, Sequence from functools import cache, cached_property, partial from pathlib import Path from types import ModuleType -from typing import IO, TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast +from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast import pygtrie import structlog @@ -593,6 +594,17 @@ def is_atty(): text_output = cast("TextIO", output) logger_factory = LoggerFactory(NamedWriteLogger, io=text_output) + # Replace structlog's WRITE_LOCKS dict with a WeakKeyDictionary so entries + # for closed file descriptors are garbage-collected instead of leaking. + # TODO: drop once structlog ships the upstream fix (tracked for 26.1.0). + try: + from structlog import _output as _structlog_output + + if isinstance(_structlog_output.WRITE_LOCKS, dict): + _structlog_output.WRITE_LOCKS = weakref.WeakKeyDictionary() # type: ignore[assignment] + except Exception: + pass + structlog.configure( processors=shared_pre_chain + [for_structlog], cache_logger_on_first_use=cache_logger_on_first_use, @@ -809,18 +821,6 @@ def reconfigure_logger( ) -def clear_structlog_shared_lock(log_file_descriptor: IO[Any]): - """Temporary workaround that prevents a memory leak in the supervisor by removing the FD reference from WRITE_LOCKS.""" - # TODO: remove this logic and bump the structlog version when the next release (possibly 26.1.0) is out - try: - from structlog._output import WRITE_LOCKS - except ImportError: - WRITE_LOCKS = None # type: ignore[assignment] - - if WRITE_LOCKS is not None and isinstance(WRITE_LOCKS, dict): - WRITE_LOCKS.pop(log_file_descriptor, None) - - if __name__ == "__main__": configure_logging( # json_output=True, diff --git a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py index d698ddfbef317..94d84193192db 100644 --- a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py @@ -28,7 +28,6 @@ import structlog from pydantic import Field, TypeAdapter -from airflow.sdk._shared.logging.structlog import clear_structlog_shared_lock from airflow.sdk._shared.module_loading import accepts_context, accepts_keyword_args from airflow.sdk.exceptions import ErrorType from airflow.sdk.execution_time.comms import ( @@ -379,4 +378,3 @@ def supervise_callback( finally: if log_path and log_file_descriptor: log_file_descriptor.close() - clear_structlog_shared_lock(log_file_descriptor) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 98d24187ff479..aabcbdd64ce6f 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -48,7 +48,7 @@ import structlog from pydantic import BaseModel, TypeAdapter -from airflow.sdk._shared.logging.structlog import clear_structlog_shared_lock, reconfigure_logger +from airflow.sdk._shared.logging.structlog import reconfigure_logger from airflow.sdk.api.client import Client, ServerResponseError from airflow.sdk.api.datamodels._generated import ( AssetResponse, @@ -2328,7 +2328,6 @@ def supervise_task( finally: if log_path and log_file_descriptor: log_file_descriptor.close() - clear_structlog_shared_lock(log_file_descriptor) provider = trace.get_tracer_provider() if hasattr(provider, "force_flush"): From b5af13523cb4889c90ebddae28dad289a6981401 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Wed, 13 May 2026 18:21:30 +0900 Subject: [PATCH 6/6] fix logic --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index aabcbdd64ce6f..757a73b7e2edc 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -2328,7 +2328,6 @@ def supervise_task( finally: if log_path and log_file_descriptor: log_file_descriptor.close() - provider = trace.get_tracer_provider() if hasattr(provider, "force_flush"): provider.force_flush(timeout_millis=5000) # upper bound, not a fixed wait