From c5031d15feeff3254d0351a6b90b38acd38da72c Mon Sep 17 00:00:00 2001 From: Adylzhan Khashtamov Date: Wed, 11 Jun 2025 23:36:47 +0500 Subject: [PATCH 1/8] Explicitly close log file descriptor --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 3fb5a9877a9a1..7ddae3ce5666a 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1565,6 +1565,7 @@ def supervise( # TODO: Use logging providers to handle the chunked upload for us etc. logger: FilteringBoundLogger | None = None + log_file_descriptor = None if log_path: # If we are told to write logs to a file, redirect the task logger to it. Make sure we append to the # file though, otherwise when we resume we would lose the logs from the start->deferral segment if it @@ -1575,8 +1576,10 @@ def supervise( pretty_logs = False if pretty_logs: - underlying_logger: WrappedLogger = structlog.WriteLogger(log_file.open("a", buffering=1)) + log_file_descriptor = log_file.open("a", buffering=1) + underlying_logger: WrappedLogger = structlog.WriteLogger(log_file_descriptor) else: + log_file_descriptor = log_file.open("ab") underlying_logger = structlog.BytesLogger(log_file.open("ab")) processors = logging_processors(enable_pretty_log=pretty_logs)[0] logger = structlog.wrap_logger(underlying_logger, processors=processors, logger_name="task").bind() @@ -1602,4 +1605,7 @@ def supervise( exit_code = process.wait() end = time.monotonic() log.info("Task finished", exit_code=exit_code, duration=end - start, final_state=process.final_state) + if log_path and log_file_descriptor: + log_file_descriptor.close() + log.info("Log file closed successfully", log_path=log_path) return exit_code From 750bd868c7c506fc797d3485d210a6ea79794c46 Mon Sep 17 00:00:00 2001 From: Adylzhan Khashtamov Date: Wed, 11 Jun 2025 23:59:31 +0500 Subject: [PATCH 2/8] Update supervisor.py --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 7ddae3ce5666a..28f006ed9509b 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1580,7 +1580,7 @@ def supervise( underlying_logger: WrappedLogger = structlog.WriteLogger(log_file_descriptor) else: log_file_descriptor = log_file.open("ab") - underlying_logger = structlog.BytesLogger(log_file.open("ab")) + underlying_logger = structlog.BytesLogger(log_file_descriptor) processors = logging_processors(enable_pretty_log=pretty_logs)[0] logger = structlog.wrap_logger(underlying_logger, processors=processors, logger_name="task").bind() From 1b186ab741d0775e252cc0f562f020dfc29dfa15 Mon Sep 17 00:00:00 2001 From: Adylzhan Khashtamov Date: Thu, 12 Jun 2025 00:08:47 +0500 Subject: [PATCH 3/8] fixing mypy checks --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 28f006ed9509b..c9244616a2821 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -32,6 +32,7 @@ from contextlib import contextmanager, suppress from datetime import datetime, timezone from http import HTTPStatus +from io import BufferedWriter from socket import SO_SNDBUF, SOL_SOCKET, SocketIO, socket, socketpair from typing import ( TYPE_CHECKING, @@ -1565,7 +1566,7 @@ def supervise( # TODO: Use logging providers to handle the chunked upload for us etc. logger: FilteringBoundLogger | None = None - log_file_descriptor = None + log_file_descriptor: BufferedWriter | None = None if log_path: # If we are told to write logs to a file, redirect the task logger to it. Make sure we append to the # file though, otherwise when we resume we would lose the logs from the start->deferral segment if it From 01b513d441632d9cc6a43e4f63e29d9e771c24a1 Mon Sep 17 00:00:00 2001 From: Adylzhan Khashtamov Date: Thu, 12 Jun 2025 13:38:21 +0500 Subject: [PATCH 4/8] removed noisy log message Co-authored-by: Ash Berlin-Taylor --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index c9244616a2821..ed18bb2b966df 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1608,5 +1608,4 @@ def supervise( log.info("Task finished", exit_code=exit_code, duration=end - start, final_state=process.final_state) if log_path and log_file_descriptor: log_file_descriptor.close() - log.info("Log file closed successfully", log_path=log_path) return exit_code From 09b882fb726b5f5be1721cf01f57ca8cd24cbe14 Mon Sep 17 00:00:00 2001 From: Adylzhan Khashtamov Date: Thu, 12 Jun 2025 13:44:11 +0500 Subject: [PATCH 5/8] mypy typings --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index ed18bb2b966df..fa461efefa7d9 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -32,10 +32,10 @@ from contextlib import contextmanager, suppress from datetime import datetime, timezone from http import HTTPStatus -from io import BufferedWriter from socket import SO_SNDBUF, SOL_SOCKET, SocketIO, socket, socketpair from typing import ( TYPE_CHECKING, + BinaryIO, Callable, ClassVar, NoReturn, @@ -1566,7 +1566,7 @@ def supervise( # TODO: Use logging providers to handle the chunked upload for us etc. logger: FilteringBoundLogger | None = None - log_file_descriptor: BufferedWriter | None = None + log_file_descriptor: BinaryIO | TextIO | None = None if log_path: # If we are told to write logs to a file, redirect the task logger to it. Make sure we append to the # file though, otherwise when we resume we would lose the logs from the start->deferral segment if it @@ -1577,10 +1577,10 @@ def supervise( pretty_logs = False if pretty_logs: - log_file_descriptor = log_file.open("a", buffering=1) + log_file_descriptor: TextIO = log_file.open("a", buffering=1) underlying_logger: WrappedLogger = structlog.WriteLogger(log_file_descriptor) else: - log_file_descriptor = log_file.open("ab") + log_file_descriptor: BinaryIO = log_file.open("ab") underlying_logger = structlog.BytesLogger(log_file_descriptor) processors = logging_processors(enable_pretty_log=pretty_logs)[0] logger = structlog.wrap_logger(underlying_logger, processors=processors, logger_name="task").bind() From b466ca9aa06637b22307b6e2cb60f6291bff5165 Mon Sep 17 00:00:00 2001 From: Adylzhan Khashtamov Date: Thu, 12 Jun 2025 14:10:42 +0500 Subject: [PATCH 6/8] explicit type casting --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index fa461efefa7d9..bc3bb267466f1 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1577,11 +1577,11 @@ def supervise( pretty_logs = False if pretty_logs: - log_file_descriptor: TextIO = log_file.open("a", buffering=1) - underlying_logger: WrappedLogger = structlog.WriteLogger(log_file_descriptor) + log_file_descriptor = log_file.open("a", buffering=1) + underlying_logger: WrappedLogger = structlog.WriteLogger(cast(TextIO, log_file_descriptor)) else: - log_file_descriptor: BinaryIO = log_file.open("ab") - underlying_logger = structlog.BytesLogger(log_file_descriptor) + log_file_descriptor = log_file.open("ab") + underlying_logger = structlog.BytesLogger(cast(BinaryIO, log_file_descriptor)) processors = logging_processors(enable_pretty_log=pretty_logs)[0] logger = structlog.wrap_logger(underlying_logger, processors=processors, logger_name="task").bind() From f71925f7e4011742281e433120f41e96cf79c79a Mon Sep 17 00:00:00 2001 From: Adylzhan Khashtamov Date: Thu, 12 Jun 2025 14:37:07 +0500 Subject: [PATCH 7/8] patch due to ruff linter --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index bc3bb267466f1..ada2632819ecb 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -35,7 +35,6 @@ from socket import SO_SNDBUF, SOL_SOCKET, SocketIO, socket, socketpair from typing import ( TYPE_CHECKING, - BinaryIO, Callable, ClassVar, NoReturn, @@ -1578,10 +1577,10 @@ def supervise( pretty_logs = False if pretty_logs: log_file_descriptor = log_file.open("a", buffering=1) - underlying_logger: WrappedLogger = structlog.WriteLogger(cast(TextIO, log_file_descriptor)) + underlying_logger: WrappedLogger = structlog.WriteLogger(cast("TextIO", log_file_descriptor)) else: log_file_descriptor = log_file.open("ab") - underlying_logger = structlog.BytesLogger(cast(BinaryIO, log_file_descriptor)) + underlying_logger = structlog.BytesLogger(cast("BinaryIO", log_file_descriptor)) processors = logging_processors(enable_pretty_log=pretty_logs)[0] logger = structlog.wrap_logger(underlying_logger, processors=processors, logger_name="task").bind() From a90c9161b1d11ad1d4df15e26399200dea552763 Mon Sep 17 00:00:00 2001 From: Adylzhan Khashtamov Date: Thu, 12 Jun 2025 14:53:01 +0500 Subject: [PATCH 8/8] fixed import typo --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index ada2632819ecb..dc63daeef7bb7 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -35,6 +35,7 @@ from socket import SO_SNDBUF, SOL_SOCKET, SocketIO, socket, socketpair from typing import ( TYPE_CHECKING, + BinaryIO, Callable, ClassVar, NoReturn,