From 55eb66b33d9a545b6c48d3f9b51623e09bb39b28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Sat, 7 Feb 2026 16:22:23 -0300 Subject: [PATCH 1/8] Fix ASYNC240: replace blocking filesystem calls in async functions with anyio Replace blocking os.path and pathlib.Path calls inside async functions with async-safe anyio.Path equivalents to prevent blocking the event loop during filesystem I/O. Re-enable the ASYNC240 ruff rule. closes #61418 --- airflow-core/newsfragments/61418.significant.rst | 5 +++++ .../tests/unit/api_fastapi/auth/test_tokens.py | 5 +++-- .../src/sphinx_exts/pagefind_search/builder.py | 7 ++++--- .../src/airflow/providers/edge3/cli/worker.py | 4 +++- .../edge3/tests/unit/edge3/cli/test_worker.py | 9 +++++---- .../airflow/providers/standard/triggers/file.py | 15 +++++++++------ .../tests/unit/standard/triggers/test_file.py | 7 ++++--- pyproject.toml | 1 - 8 files changed, 33 insertions(+), 20 deletions(-) create mode 100644 airflow-core/newsfragments/61418.significant.rst diff --git a/airflow-core/newsfragments/61418.significant.rst b/airflow-core/newsfragments/61418.significant.rst new file mode 100644 index 0000000000000..92cffbd290454 --- /dev/null +++ b/airflow-core/newsfragments/61418.significant.rst @@ -0,0 +1,5 @@ +Replace blocking filesystem calls in async functions with ``anyio`` equivalents + +Blocking ``os.path`` and ``pathlib.Path`` calls inside async functions have been replaced +with their async-safe ``anyio.Path`` counterparts, fixing all ASYNC240 lint violations. +This prevents blocking the event loop during filesystem I/O in triggers and other async code paths. diff --git a/airflow-core/tests/unit/api_fastapi/auth/test_tokens.py b/airflow-core/tests/unit/api_fastapi/auth/test_tokens.py index d0fe41e14500a..b4caee15d3f83 100644 --- a/airflow-core/tests/unit/api_fastapi/auth/test_tokens.py +++ b/airflow-core/tests/unit/api_fastapi/auth/test_tokens.py @@ -23,6 +23,7 @@ from typing import TYPE_CHECKING from unittest.mock import patch +import anyio import httpx import jwt import pytest @@ -213,10 +214,10 @@ async def test_jwt_generate_validate_roundtrip_with_jwks(private_key, algorithm, jwk_content = json.dumps({"keys": [key_to_jwk_dict(private_key, "custom-kid")]}) jwks = tmp_path.joinpath("jwks.json") - jwks.write_text(jwk_content) + await anyio.Path(jwks).write_text(jwk_content) priv_key = tmp_path.joinpath("key.pem") - priv_key.write_bytes(key_to_pem(private_key)) + await anyio.Path(priv_key).write_bytes(key_to_pem(private_key)) with conf_vars( { diff --git a/devel-common/src/sphinx_exts/pagefind_search/builder.py b/devel-common/src/sphinx_exts/pagefind_search/builder.py index 066123bd98020..43651a991d3d4 100644 --- a/devel-common/src/sphinx_exts/pagefind_search/builder.py +++ b/devel-common/src/sphinx_exts/pagefind_search/builder.py @@ -25,6 +25,7 @@ from pathlib import Path from typing import TYPE_CHECKING +import anyio from pagefind.index import IndexConfig, PagefindIndex from sphinx.util.fileutil import copy_asset @@ -118,8 +119,8 @@ async def build_pagefind_index(app: Sphinx) -> dict[str, int]: skipped = 0 async with PagefindIndex(config=config) as index: - for html_file in output_dir.glob(app.config.pagefind_glob): - if not html_file.is_file(): + for html_file in await anyio.Path(output_dir).glob(app.config.pagefind_glob): + if not await anyio.Path(html_file).is_file(): continue relative_path = html_file.relative_to(output_dir) @@ -131,7 +132,7 @@ async def build_pagefind_index(app: Sphinx) -> dict[str, int]: continue try: - content = html_file.read_text(encoding="utf-8") + content = await anyio.Path(html_file).read_text(encoding="utf-8") await index.add_html_file( content=content, source_path=str(html_file), diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py index 7ffd2b5aaffe6..c4aa1d735c509 100644 --- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py +++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py @@ -29,6 +29,7 @@ from pathlib import Path from typing import TYPE_CHECKING +import anyio from aiofiles import open as aio_open from aiohttp import ClientResponseError from lockfile.pidlockfile import remove_existing_pidfile @@ -238,7 +239,8 @@ def _launch_job(self, workload: ExecuteTask) -> tuple[Process, Queue[Exception]] return process, results_queue async def _push_logs_in_chunks(self, job: Job): - if push_logs and job.logfile.exists() and job.logfile.stat().st_size > job.logsize: + aio_logfile = anyio.Path(job.logfile) + if push_logs and await aio_logfile.exists() and (await aio_logfile.stat()).st_size > job.logsize: async with aio_open(job.logfile, mode="rb") as logf: await logf.seek(job.logsize, os.SEEK_SET) read_data = await logf.read() diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py b/providers/edge3/tests/unit/edge3/cli/test_worker.py index 5a504b0c33681..8a0357207d8dd 100644 --- a/providers/edge3/tests/unit/edge3/cli/test_worker.py +++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py @@ -26,6 +26,7 @@ from unittest import mock from unittest.mock import call, patch +import anyio import pytest import time_machine from aiohttp import ClientResponseError, RequestInfo @@ -307,7 +308,7 @@ async def test_fetch_and_run_job_one_job_fail( @pytest.mark.asyncio async def test_push_logs_in_chunks(self, mock_logs_push, worker_with_job: EdgeWorker): job = EdgeWorker.jobs[0] - job.logfile.write_text("some log content") + await anyio.Path(job.logfile).write_text("some log content") with conf_vars({("edge", "api_url"): "https://invalid-api-test-endpoint"}): await worker_with_job._push_logs_in_chunks(job) @@ -321,9 +322,9 @@ async def test_push_logs_in_chunks(self, mock_logs_push, worker_with_job: EdgeWo @pytest.mark.asyncio async def test_check_running_jobs_log_push_increment(self, mock_logs_push, worker_with_job: EdgeWorker): job = EdgeWorker.jobs[0] - job.logfile.write_text("hello ") - job.logsize = job.logfile.stat().st_size - job.logfile.write_text("hello world") + await anyio.Path(job.logfile).write_text("hello ") + job.logsize = (await anyio.Path(job.logfile).stat()).st_size + await anyio.Path(job.logfile).write_text("hello world") with conf_vars({("edge", "api_url"): "https://invalid-api-test-endpoint"}): await worker_with_job._push_logs_in_chunks(job) assert len(EdgeWorker.jobs) == 1 diff --git a/providers/standard/src/airflow/providers/standard/triggers/file.py b/providers/standard/src/airflow/providers/standard/triggers/file.py index 38f4a5ea178d6..699be775ffa8b 100644 --- a/providers/standard/src/airflow/providers/standard/triggers/file.py +++ b/providers/standard/src/airflow/providers/standard/triggers/file.py @@ -23,6 +23,8 @@ from glob import glob from typing import Any +import anyio + from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: @@ -73,13 +75,13 @@ async def run(self) -> AsyncIterator[TriggerEvent]: """Loop until the relevant files are found.""" while True: for path in glob(self.filepath, recursive=self.recursive): - if os.path.isfile(path): - mod_time_f = os.path.getmtime(path) + if await anyio.Path(path).is_file(): + mod_time_f = (await anyio.Path(path).stat()).st_mtime mod_time = datetime.datetime.fromtimestamp(mod_time_f).strftime("%Y%m%d%H%M%S") self.log.info("Found File %s last modified: %s", path, mod_time) yield TriggerEvent(True) return - for _, _, files in os.walk(path): + for _, _, files in await anyio.to_thread.run_sync(lambda: list(os.walk(path))): if files: yield TriggerEvent(True) return @@ -120,11 +122,12 @@ def serialize(self) -> tuple[str, dict[str, Any]]: async def run(self) -> AsyncIterator[TriggerEvent]: """Loop until the relevant file is found.""" while True: - if os.path.isfile(self.filepath): - mod_time_f = os.path.getmtime(self.filepath) + filepath = anyio.Path(self.filepath) + if await filepath.is_file(): + mod_time_f = (await filepath.stat()).st_mtime mod_time = datetime.datetime.fromtimestamp(mod_time_f).strftime("%Y%m%d%H%M%S") self.log.info("Found file %s last modified: %s", self.filepath, mod_time) - os.remove(self.filepath) + await filepath.unlink() self.log.info("File %s has been deleted", self.filepath) yield TriggerEvent(True) return diff --git a/providers/standard/tests/unit/standard/triggers/test_file.py b/providers/standard/tests/unit/standard/triggers/test_file.py index 2ab66fa360d2f..2d5a52eaf151f 100644 --- a/providers/standard/tests/unit/standard/triggers/test_file.py +++ b/providers/standard/tests/unit/standard/triggers/test_file.py @@ -18,6 +18,7 @@ import asyncio +import anyio import pytest from airflow.providers.standard.triggers.file import FileDeleteTrigger, FileTrigger @@ -43,7 +44,7 @@ def test_serialization(self): async def test_task_file_trigger(self, tmp_path): """Asserts that the trigger only goes off on or after file is found""" tmp_dir = tmp_path / "test_dir" - tmp_dir.mkdir() + await anyio.Path(tmp_dir).mkdir() p = tmp_dir / "hello.txt" trigger = FileTrigger( @@ -84,7 +85,7 @@ def test_serialization(self): async def test_file_delete_trigger(self, tmp_path): """Asserts that the trigger goes off on or after file is found and that the files gets deleted.""" tmp_dir = tmp_path / "test_dir" - tmp_dir.mkdir() + await anyio.Path(tmp_dir).mkdir() p = tmp_dir / "hello.txt" trigger = FileDeleteTrigger( @@ -101,7 +102,7 @@ async def test_file_delete_trigger(self, tmp_path): p.touch() await asyncio.sleep(0.5) - assert p.exists() is False + assert await anyio.Path(p).exists() is False # Prevents error when task is destroyed while in "pending" state asyncio.get_event_loop().stop() diff --git a/pyproject.toml b/pyproject.toml index 1295d6a216166..540c8400354a3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -679,7 +679,6 @@ ignore = [ "COM819", "E501", # Formatted code may exceed the line length, leading to line-too-long (E501) errors. "ASYNC110", # TODO: Use `anyio.Event` instead of awaiting `anyio.sleep` in a `while` loop - "ASYNC240", # TODO: Async functions should not use os.path methods, use trio.Path or anyio.path "SIM105", # Use contextlib.suppress({exception}) instead of try-except-pass ] unfixable = [ From 8d2999f9c65b51f5b21c095012801817d7dee459 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Sat, 7 Feb 2026 16:39:24 -0300 Subject: [PATCH 2/8] Fix mypy error: use sync helper for pathlib.glob in async context anyio.Path.glob() returns AsyncIterator, not Awaitable. Use a sync helper function with anyio.to_thread.run_sync instead. --- devel-common/src/sphinx_exts/pagefind_search/builder.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/devel-common/src/sphinx_exts/pagefind_search/builder.py b/devel-common/src/sphinx_exts/pagefind_search/builder.py index 43651a991d3d4..7341b107faf79 100644 --- a/devel-common/src/sphinx_exts/pagefind_search/builder.py +++ b/devel-common/src/sphinx_exts/pagefind_search/builder.py @@ -119,7 +119,11 @@ async def build_pagefind_index(app: Sphinx) -> dict[str, int]: skipped = 0 async with PagefindIndex(config=config) as index: - for html_file in await anyio.Path(output_dir).glob(app.config.pagefind_glob): + def _glob_html_files(): + return list(output_dir.glob(app.config.pagefind_glob)) + + html_files = await anyio.to_thread.run_sync(_glob_html_files) + for html_file in html_files: if not await anyio.Path(html_file).is_file(): continue From 164d9cdbf0e5c6133463cb4988e289533e2db5ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Sat, 7 Feb 2026 17:40:00 -0300 Subject: [PATCH 3/8] Retrigger CI From a6b0efe56f745e9b6c5afd4dec06ef99d6aa9b0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Sat, 7 Feb 2026 18:11:37 -0300 Subject: [PATCH 4/8] Fix ruff format: add blank line before nested function definition --- devel-common/src/sphinx_exts/pagefind_search/builder.py | 1 + 1 file changed, 1 insertion(+) diff --git a/devel-common/src/sphinx_exts/pagefind_search/builder.py b/devel-common/src/sphinx_exts/pagefind_search/builder.py index 7341b107faf79..b9bf10753ef95 100644 --- a/devel-common/src/sphinx_exts/pagefind_search/builder.py +++ b/devel-common/src/sphinx_exts/pagefind_search/builder.py @@ -119,6 +119,7 @@ async def build_pagefind_index(app: Sphinx) -> dict[str, int]: skipped = 0 async with PagefindIndex(config=config) as index: + def _glob_html_files(): return list(output_dir.glob(app.config.pagefind_glob)) From 464c0ff34ff2ca250437c5db71d498619c5b5349 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Sat, 7 Feb 2026 19:06:32 -0300 Subject: [PATCH 5/8] Remove newsfragment: internal change, not user-facing --- airflow-core/newsfragments/61418.significant.rst | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 airflow-core/newsfragments/61418.significant.rst diff --git a/airflow-core/newsfragments/61418.significant.rst b/airflow-core/newsfragments/61418.significant.rst deleted file mode 100644 index 92cffbd290454..0000000000000 --- a/airflow-core/newsfragments/61418.significant.rst +++ /dev/null @@ -1,5 +0,0 @@ -Replace blocking filesystem calls in async functions with ``anyio`` equivalents - -Blocking ``os.path`` and ``pathlib.Path`` calls inside async functions have been replaced -with their async-safe ``anyio.Path`` counterparts, fixing all ASYNC240 lint violations. -This prevents blocking the event loop during filesystem I/O in triggers and other async code paths. From 28b8857814f1b1504686af4b2db9ca5c3b4fbc52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Sun, 8 Feb 2026 01:09:17 -0300 Subject: [PATCH 6/8] Fix CLI test hang: update mocks to use anyio.Path instead of os.path The test_cli_test_with_deferrable_operator test was mocking os.path.isfile and os.path.getmtime, but these calls were replaced with anyio.Path in the FileTrigger. The stale mocks caused the trigger's while-True loop to never find the file, resulting in an infinite hang that timed out CI. --- .../unit/cli/commands/test_task_command.py | 37 +++++++++++-------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/airflow-core/tests/unit/cli/commands/test_task_command.py b/airflow-core/tests/unit/cli/commands/test_task_command.py index f26b21ee476b6..7ba2f3cc4bf09 100644 --- a/airflow-core/tests/unit/cli/commands/test_task_command.py +++ b/airflow-core/tests/unit/cli/commands/test_task_command.py @@ -241,27 +241,32 @@ def test_cli_test_with_env_vars(self): assert "foo=bar" in output assert "AIRFLOW_TEST_MODE=True" in output - @mock.patch("airflow.providers.standard.triggers.file.os.path.getmtime", return_value=0) @mock.patch( "airflow.providers.standard.triggers.file.glob", return_value=["/tmp/temporary_file_for_testing"] ) - @mock.patch("airflow.providers.standard.triggers.file.os") @mock.patch("airflow.providers.standard.sensors.filesystem.FileSensor.poke", return_value=False) - def test_cli_test_with_deferrable_operator(self, mock_pock, mock_os, mock_glob, mock_getmtime, caplog): - mock_os.path.isfile.return_value = True - with caplog.at_level(level=logging.INFO): - task_command.task_test( - self.parser.parse_args( - [ - "tasks", - "test", - "example_sensors", - "wait_for_file_async", - DEFAULT_DATE.isoformat(), - ] + def test_cli_test_with_deferrable_operator(self, mock_poke, mock_glob, caplog): + mock_stat = mock.MagicMock() + mock_stat.st_mtime = 0 + mock_path_instance = mock.MagicMock() + mock_path_instance.is_file = mock.AsyncMock(return_value=True) + mock_path_instance.stat = mock.AsyncMock(return_value=mock_stat) + mock_anyio_path = mock.MagicMock(return_value=mock_path_instance) + + with mock.patch("airflow.providers.standard.triggers.file.anyio.Path", mock_anyio_path): + with caplog.at_level(level=logging.INFO): + task_command.task_test( + self.parser.parse_args( + [ + "tasks", + "test", + "example_sensors", + "wait_for_file_async", + DEFAULT_DATE.isoformat(), + ] + ) ) - ) - output = caplog.text + output = caplog.text assert "Found File /tmp/temporary_file_for_testing" in output def test_task_render(self): From 9f97c9c390e163c0070ccc75031c8c86b198bfb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Sun, 8 Feb 2026 01:17:54 -0300 Subject: [PATCH 7/8] Cache anyio.Path instance in test to match production code style --- providers/edge3/tests/unit/edge3/cli/test_worker.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py b/providers/edge3/tests/unit/edge3/cli/test_worker.py index 8a0357207d8dd..2d6d4542a8d99 100644 --- a/providers/edge3/tests/unit/edge3/cli/test_worker.py +++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py @@ -322,9 +322,10 @@ async def test_push_logs_in_chunks(self, mock_logs_push, worker_with_job: EdgeWo @pytest.mark.asyncio async def test_check_running_jobs_log_push_increment(self, mock_logs_push, worker_with_job: EdgeWorker): job = EdgeWorker.jobs[0] - await anyio.Path(job.logfile).write_text("hello ") - job.logsize = (await anyio.Path(job.logfile).stat()).st_size - await anyio.Path(job.logfile).write_text("hello world") + aio_logfile = anyio.Path(job.logfile) + await aio_logfile.write_text("hello ") + job.logsize = (await aio_logfile.stat()).st_size + await aio_logfile.write_text("hello world") with conf_vars({("edge", "api_url"): "https://invalid-api-test-endpoint"}): await worker_with_job._push_logs_in_chunks(job) assert len(EdgeWorker.jobs) == 1 From 05219c39b83d83f59d7b902b57e906c1eb8c0b4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Ahlert?= Date: Sun, 8 Feb 2026 02:37:25 -0300 Subject: [PATCH 8/8] Retrigger CI