From 3d991f04a86a645f9e4a68b42e1cfbcca81bf4e7 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Wed, 27 Aug 2025 14:06:42 -0700 Subject: [PATCH 1/6] Use atomic writes in LocalStore Fixes #3411 I use the standard strategy of writing to a temporary file in the same directory, and then renaming it to the desired name. This ensure that Zarr writes are either complete or not written at all. --- src/zarr/storage/_local.py | 52 +++++++++++++++++++++++++++------- tests/test_store/test_local.py | 44 ++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 10 deletions(-) diff --git a/src/zarr/storage/_local.py b/src/zarr/storage/_local.py index 1229ec316a..d7d7b75b2b 100644 --- a/src/zarr/storage/_local.py +++ b/src/zarr/storage/_local.py @@ -1,11 +1,15 @@ from __future__ import annotations import asyncio +from collections.abc import Iterator +import contextlib import io import os import shutil +import sys from pathlib import Path -from typing import TYPE_CHECKING, Self +from typing import TYPE_CHECKING, Literal, BinaryIO, Self +import uuid from zarr.abc.store import ( ByteRequest, @@ -41,6 +45,39 @@ def _get(path: Path, prototype: BufferPrototype, byte_range: ByteRequest | None) return prototype.buffer.from_bytes(f.read()) +if sys.platform == 'win32': + # Per the os.rename docs: + # On Windows, if dst exists a FileExistsError is always raised. + _safe_move = os.rename +else: + # On Unix, os.rename silently replace files, so instead we use os.link like + # atomicwrites: + # https://github.com/untitaker/python-atomicwrites/blob/1.4.1/atomicwrites/__init__.py#L59-L60 + # This also raises FileExistsError if dst exists. + def _safe_move(src: Path, dst: Path) -> None: + os.link(src, dst) + os.unlink(src) + + +@contextlib.contextmanager +def _atomic_write( + path: Path, + mode: Literal["r+b", "wb"], + exclusive: bool = False, +) -> Iterator[BinaryIO]: + tmp_path = path.with_suffix(f'.{uuid.uuid4().hex}.partial') + try: + with tmp_path.open(mode) as f: + yield f + if exclusive: + _safe_move(tmp_path, path) + else: + tmp_path.replace(path) + except Exception: + tmp_path.unlink(missing_ok=True) + raise + + def _put( path: Path, value: Buffer, @@ -48,20 +85,15 @@ def _put( exclusive: bool = False, ) -> int | None: path.parent.mkdir(parents=True, exist_ok=True) + # write takes any object supporting the buffer protocol + view = value.as_buffer_like() if start is not None: with path.open("r+b") as f: f.seek(start) - # write takes any object supporting the buffer protocol - f.write(value.as_buffer_like()) + f.write(view) return None else: - view = value.as_buffer_like() - if exclusive: - mode = "xb" - else: - mode = "wb" - with path.open(mode=mode) as f: - # write takes any object supporting the buffer protocol + with _atomic_write(path, "wb", exclusive=exclusive) as f: return f.write(view) diff --git a/tests/test_store/test_local.py b/tests/test_store/test_local.py index 7974d0d633..ea1713c441 100644 --- a/tests/test_store/test_local.py +++ b/tests/test_store/test_local.py @@ -10,6 +10,7 @@ from zarr import create_array from zarr.core.buffer import Buffer, cpu from zarr.storage import LocalStore +from zarr.storage._local import _atomic_write from zarr.testing.store import StoreTests from zarr.testing.utils import assert_bytes_equal @@ -109,3 +110,46 @@ async def test_move( FileExistsError, match=re.escape(f"Destination root {destination} already exists") ): await store2.move(destination) + + +@pytest.mark.parametrize("exclusive", [True, False]) +def test_atomic_write_successful(tmp_path: pathlib.Path, exclusive: bool) -> None: + path = pathlib.Path(tmp_path) / 'data' + with _atomic_write(path, 'wb', exclusive=exclusive) as f: + f.write(b'abc') + assert path.read_bytes() == b'abc' + assert list(path.parent.iterdir()) == [path] # no temp files + + +@pytest.mark.parametrize("exclusive", [True, False]) +def test_atomic_write_incomplete(tmp_path: pathlib.Path, exclusive: bool) -> None: + path = pathlib.Path(tmp_path) / 'data' + with pytest.raises(RuntimeError): + with _atomic_write(path, 'wb', exclusive=exclusive) as f: + f.write(b'a') + raise RuntimeError + assert not path.exists() + assert list(path.parent.iterdir()) == [] # no temp files + + +def test_atomic_write_non_exclusive_preexisting(tmp_path: pathlib.Path) -> None: + path = pathlib.Path(tmp_path) / 'data' + with path.open('wb') as f: + f.write(b'xyz') + assert path.read_bytes() == b'xyz' + with _atomic_write(path, 'wb', exclusive=False) as f: + f.write(b'abc') + assert path.read_bytes() == b'abc' + assert list(path.parent.iterdir()) == [path] # no temp files + + +def test_atomic_write_exclusive_preexisting(tmp_path: pathlib.Path) -> None: + path = pathlib.Path(tmp_path) / 'data' + with path.open('wb') as f: + f.write(b'xyz') + assert path.read_bytes() == b'xyz' + with pytest.raises(FileExistsError): + with _atomic_write(path, 'wb', exclusive=True) as f: + f.write(b'abc') + assert path.read_bytes() == b'xyz' + assert list(path.parent.iterdir()) == [path] # no temp files From fb1f32dcc21965d73be82171dcc77ea9aae62b23 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Wed, 27 Aug 2025 14:21:09 -0700 Subject: [PATCH 2/6] Lint fixes --- src/zarr/storage/_local.py | 3 +-- tests/test_store/test_local.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/zarr/storage/_local.py b/src/zarr/storage/_local.py index d7d7b75b2b..f52197329b 100644 --- a/src/zarr/storage/_local.py +++ b/src/zarr/storage/_local.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -from collections.abc import Iterator import contextlib import io import os @@ -23,7 +22,7 @@ from zarr.core.common import AccessModeLiteral, concurrent_map if TYPE_CHECKING: - from collections.abc import AsyncIterator, Iterable + from collections.abc import AsyncIterator, Iterable, Iterator from zarr.core.buffer import BufferPrototype diff --git a/tests/test_store/test_local.py b/tests/test_store/test_local.py index ea1713c441..0e35e80e81 100644 --- a/tests/test_store/test_local.py +++ b/tests/test_store/test_local.py @@ -124,7 +124,7 @@ def test_atomic_write_successful(tmp_path: pathlib.Path, exclusive: bool) -> Non @pytest.mark.parametrize("exclusive", [True, False]) def test_atomic_write_incomplete(tmp_path: pathlib.Path, exclusive: bool) -> None: path = pathlib.Path(tmp_path) / 'data' - with pytest.raises(RuntimeError): + with pytest.raises(RuntimeError): # noqa: PT012 with _atomic_write(path, 'wb', exclusive=exclusive) as f: f.write(b'a') raise RuntimeError From 6cab98654c2f09c77f2047fe610c4f119625779a Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Wed, 27 Aug 2025 14:32:10 -0700 Subject: [PATCH 3/6] import sort --- src/zarr/storage/_local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zarr/storage/_local.py b/src/zarr/storage/_local.py index f52197329b..1719fea1e7 100644 --- a/src/zarr/storage/_local.py +++ b/src/zarr/storage/_local.py @@ -7,7 +7,7 @@ import shutil import sys from pathlib import Path -from typing import TYPE_CHECKING, Literal, BinaryIO, Self +from typing import TYPE_CHECKING, BinaryIO, Literal, Self import uuid from zarr.abc.store import ( From cfc01e131b7b8f9bc69d8673515302e7c292c6a6 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Wed, 27 Aug 2025 15:35:22 -0700 Subject: [PATCH 4/6] actually fix import order --- src/zarr/storage/_local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zarr/storage/_local.py b/src/zarr/storage/_local.py index 1719fea1e7..88f1b42394 100644 --- a/src/zarr/storage/_local.py +++ b/src/zarr/storage/_local.py @@ -6,9 +6,9 @@ import os import shutil import sys +import uuid from pathlib import Path from typing import TYPE_CHECKING, BinaryIO, Literal, Self -import uuid from zarr.abc.store import ( ByteRequest, From 03dc6f4c1dc691cb21a9b3d97700f09b0c0ee622 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Wed, 27 Aug 2025 15:38:25 -0700 Subject: [PATCH 5/6] ruff format --- src/zarr/storage/_local.py | 4 ++-- tests/test_store/test_local.py | 42 +++++++++++++++++----------------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/zarr/storage/_local.py b/src/zarr/storage/_local.py index 88f1b42394..717ed04144 100644 --- a/src/zarr/storage/_local.py +++ b/src/zarr/storage/_local.py @@ -44,7 +44,7 @@ def _get(path: Path, prototype: BufferPrototype, byte_range: ByteRequest | None) return prototype.buffer.from_bytes(f.read()) -if sys.platform == 'win32': +if sys.platform == "win32": # Per the os.rename docs: # On Windows, if dst exists a FileExistsError is always raised. _safe_move = os.rename @@ -64,7 +64,7 @@ def _atomic_write( mode: Literal["r+b", "wb"], exclusive: bool = False, ) -> Iterator[BinaryIO]: - tmp_path = path.with_suffix(f'.{uuid.uuid4().hex}.partial') + tmp_path = path.with_suffix(f".{uuid.uuid4().hex}.partial") try: with tmp_path.open(mode) as f: yield f diff --git a/tests/test_store/test_local.py b/tests/test_store/test_local.py index 0e35e80e81..970bb7d374 100644 --- a/tests/test_store/test_local.py +++ b/tests/test_store/test_local.py @@ -114,42 +114,42 @@ async def test_move( @pytest.mark.parametrize("exclusive", [True, False]) def test_atomic_write_successful(tmp_path: pathlib.Path, exclusive: bool) -> None: - path = pathlib.Path(tmp_path) / 'data' - with _atomic_write(path, 'wb', exclusive=exclusive) as f: - f.write(b'abc') - assert path.read_bytes() == b'abc' + path = pathlib.Path(tmp_path) / "data" + with _atomic_write(path, "wb", exclusive=exclusive) as f: + f.write(b"abc") + assert path.read_bytes() == b"abc" assert list(path.parent.iterdir()) == [path] # no temp files @pytest.mark.parametrize("exclusive", [True, False]) def test_atomic_write_incomplete(tmp_path: pathlib.Path, exclusive: bool) -> None: - path = pathlib.Path(tmp_path) / 'data' + path = pathlib.Path(tmp_path) / "data" with pytest.raises(RuntimeError): # noqa: PT012 - with _atomic_write(path, 'wb', exclusive=exclusive) as f: - f.write(b'a') + with _atomic_write(path, "wb", exclusive=exclusive) as f: + f.write(b"a") raise RuntimeError assert not path.exists() assert list(path.parent.iterdir()) == [] # no temp files def test_atomic_write_non_exclusive_preexisting(tmp_path: pathlib.Path) -> None: - path = pathlib.Path(tmp_path) / 'data' - with path.open('wb') as f: - f.write(b'xyz') - assert path.read_bytes() == b'xyz' - with _atomic_write(path, 'wb', exclusive=False) as f: - f.write(b'abc') - assert path.read_bytes() == b'abc' + path = pathlib.Path(tmp_path) / "data" + with path.open("wb") as f: + f.write(b"xyz") + assert path.read_bytes() == b"xyz" + with _atomic_write(path, "wb", exclusive=False) as f: + f.write(b"abc") + assert path.read_bytes() == b"abc" assert list(path.parent.iterdir()) == [path] # no temp files def test_atomic_write_exclusive_preexisting(tmp_path: pathlib.Path) -> None: - path = pathlib.Path(tmp_path) / 'data' - with path.open('wb') as f: - f.write(b'xyz') - assert path.read_bytes() == b'xyz' + path = pathlib.Path(tmp_path) / "data" + with path.open("wb") as f: + f.write(b"xyz") + assert path.read_bytes() == b"xyz" with pytest.raises(FileExistsError): - with _atomic_write(path, 'wb', exclusive=True) as f: - f.write(b'abc') - assert path.read_bytes() == b'xyz' + with _atomic_write(path, "wb", exclusive=True) as f: + f.write(b"abc") + assert path.read_bytes() == b"xyz" assert list(path.parent.iterdir()) == [path] # no temp files From f2c1fcd45562327fa970541cb0c846f83c481406 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Thu, 28 Aug 2025 10:25:53 -0700 Subject: [PATCH 6/6] Add release note --- changes/3411.bugfix.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/3411.bugfix.rst diff --git a/changes/3411.bugfix.rst b/changes/3411.bugfix.rst new file mode 100644 index 0000000000..b9303b9666 --- /dev/null +++ b/changes/3411.bugfix.rst @@ -0,0 +1 @@ +LocalStore now uses atomic writes, which should prevent some cases of corrupted data.