From 582f1b392ae6d31fdac45212cfe9d6769a58432c Mon Sep 17 00:00:00 2001 From: Lachlan Deakin Date: Mon, 2 Feb 2026 15:33:34 +1100 Subject: [PATCH 1/3] fix: cleanup temporary files `tempfile.mktemp()` does not clean up after itself --- tests/test_pipeline.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 3dd34df0..c9fbda37 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -18,6 +18,11 @@ import zarrs +# Create a single temp directory for all test arrays +_test_tmpdir = tempfile.TemporaryDirectory(prefix="zarrs_test_") +_test_arrays_dir = Path(_test_tmpdir.name) +_test_array_counter = 0 + axis_size_ = 10 chunk_size_ = axis_size_ // 2 fill_value_ = 32767 @@ -74,9 +79,14 @@ def pytest_generate_tests(metafunc): if sum(isinstance(i, EllipsisType) for i in index) > 1: continue for indexing_method_param in indexing_method_params: + global _test_array_counter arr = gen_arr( - fill_value_, Path(tempfile.mktemp()), dimensionality, format + fill_value_, + _test_arrays_dir / str(_test_array_counter), + dimensionality, + format, ) + _test_array_counter += 1 indexing_method = indexing_method_param.values[0] dimensionality_id = f"{dimensionality}d" id = "-".join( From 90abe75833c64d0a4e45b40e2479b344c9e5025c Mon Sep 17 00:00:00 2001 From: "Philipp A." Date: Mon, 2 Feb 2026 09:55:22 +0100 Subject: [PATCH 2/3] defer array creation in pipeline tests --- tests/pipeline/conftest.py | 134 +++++++++++++ tests/pipeline/test_pipeline.py | 131 ++++++++++++ tests/pipeline/test_roundtrip.py | 117 +++++++++++ tests/test_pipeline.py | 328 ------------------------------- 4 files changed, 382 insertions(+), 328 deletions(-) create mode 100644 tests/pipeline/conftest.py create mode 100644 tests/pipeline/test_pipeline.py create mode 100644 tests/pipeline/test_roundtrip.py delete mode 100644 tests/test_pipeline.py diff --git a/tests/pipeline/conftest.py b/tests/pipeline/conftest.py new file mode 100644 index 00000000..8afbe77f --- /dev/null +++ b/tests/pipeline/conftest.py @@ -0,0 +1,134 @@ +from __future__ import annotations + +import operator +from functools import reduce +from itertools import product +from types import EllipsisType +from typing import TYPE_CHECKING, Literal, TypeAlias + +import numpy as np +import pytest +import zarr +import zarr.codecs +from zarr.storage import LocalStore + +if TYPE_CHECKING: + from collections.abc import Callable, Generator + from pathlib import Path + + from _pytest.mark.structures import ParameterSet + + Index: TypeAlias = tuple[int | slice | np.ndarray | EllipsisType, ...] + + +axis_size_ = 10 +chunk_size_ = axis_size_ // 2 +fill_value_ = 32767 +dimensionalities_ = list(range(1, 5)) + + +@pytest.fixture +def axis_size() -> int: + return axis_size_ + + +@pytest.fixture +def fill_value() -> int: + return fill_value_ + + +non_numpy_indices = [ + pytest.param(slice(1, 3), id="slice_in_chunk"), + pytest.param(slice(1, 7), id="slice_across_chunks"), + pytest.param(2, id="int"), + pytest.param(slice(None), id="full_slice"), + pytest.param(Ellipsis, id="ellipsis"), +] + +numpy_indices = [ + pytest.param(np.array([1, 2]), id="contiguous_in_chunk_array"), + pytest.param(np.array([0, 3]), id="discontinuous_in_chunk_array"), + pytest.param(np.array([0, 6]), id="across_chunks_indices_array"), +] + +all_indices = numpy_indices + non_numpy_indices + +indexing_method_params = [ + pytest.param(lambda x: getattr(x, "oindex"), id="oindex"), + pytest.param(lambda x: x, id="vindex"), +] + +zarr_formats = [2, 3] + + +def _full_array(shape: tuple[int, ...]) -> np.ndarray: + return np.arange(reduce(operator.mul, shape, 1)).reshape(shape) + + +@pytest.fixture +def full_array() -> Callable[[tuple[int, ...]], np.ndarray]: + return _full_array + + +def gen_arr( + tmp_path: Path, fill_value: int, dimensionality: int, format: Literal[2, 3] +) -> zarr.Array: + return zarr.create( + (axis_size_,) * dimensionality, + store=LocalStore(root=tmp_path / ".zarr"), + chunks=(chunk_size_,) * dimensionality, + dtype=np.int16, + fill_value=fill_value, + codecs=[zarr.codecs.BytesCodec(), zarr.codecs.BloscCodec()] + if format == 3 + else None, + zarr_format=format, + ) + + +@pytest.fixture(params=dimensionalities_) +def dimensionality(request: pytest.FixtureRequest) -> int: + return request.param + + +@pytest.fixture(params=zarr_formats) +def format(request: pytest.FixtureRequest) -> Literal[2, 3]: + return request.param + + +@pytest.fixture +def arr(tmp_path: Path, dimensionality: int, format: Literal[2, 3]) -> zarr.Array: + return gen_arr(tmp_path, fill_value_, dimensionality, format) + + +# this parameter set is only used for test_roundtrip, but it’s easier to define here + + +def roundtrip_params() -> Generator[ParameterSet]: + for format, dimensionality in product(zarr_formats, dimensionalities_): + indexers = non_numpy_indices if dimensionality > 2 else all_indices + for index_param_prod in product(indexers, repeat=dimensionality): + index = tuple(index_param.values[0] for index_param in index_param_prod) + # multi-ellipsis indexing is not supported + if sum(isinstance(i, EllipsisType) for i in index) > 1: + continue + for indexing_method_param in indexing_method_params: + id = "-".join( + [ + str(indexing_method_param.id), + f"{dimensionality}d", + *(str(index_param.id) for index_param in index_param_prod), + f"v{format}", + ] + ) + indexing_method = indexing_method_param.values[0] + yield pytest.param( + (format, dimensionality, index, indexing_method), id=id + ) + + +@pytest.fixture(params=list(roundtrip_params())) +def roundtrip( + request: pytest.FixtureRequest, +) -> tuple[Literal[2, 3], int, Index, Callable]: + return request.param diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py new file mode 100644 index 00000000..8c747d3a --- /dev/null +++ b/tests/pipeline/test_pipeline.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +import operator +import pickle +import platform +from contextlib import contextmanager +from functools import reduce +from pathlib import Path +from typing import TYPE_CHECKING, TypeAlias + +import numpy as np +import pytest +import zarr +import zarr.abc.codec +import zarr.codecs + +import zarrs + +if TYPE_CHECKING: + from collections.abc import Callable + from types import EllipsisType + + Index: TypeAlias = tuple[int | slice | np.ndarray | EllipsisType, ...] + + +def test_fill_value(arr: zarr.Array, fill_value: int) -> None: + assert np.all(arr[:] == fill_value) + + +def test_constant(arr: zarr.Array): + arr[:] = 42 + assert np.all(arr[:] == 42) + + +def test_singleton(arr: zarr.Array): + singleton_index = (1,) * len(arr.shape) + non_singleton_index = (0,) * len(arr.shape) + arr[singleton_index] = 42 + assert arr[singleton_index] == 42 + assert arr[non_singleton_index] != 42 + + +def test_full_array( + arr: zarr.Array, full_array: Callable[[tuple[int, ...]], np.ndarray] +) -> None: + stored_values = full_array(arr.shape) + arr[:] = stored_values + assert np.all(arr[:] == stored_values) + + +def test_ellipsis_indexing_invalid(arr: zarr.Array): + if len(arr.shape) <= 2: + pytest.skip( + "Ellipsis indexing works for 1D and 2D arrays in zarr-python despite a shape mismatch" + ) + stored_value = np.array([1, 2, 3]) + expected_errors = ( + "references array indices.*out-of-bounds of array shape", + "the size of the chunk subset.*and input/output subset.* are incompatible", + ) + with pytest.raises(IndexError, match="|".join(expected_errors)): + arr[2, ...] = stored_value + + +def test_pickle(arr: zarr.Array, tmp_path: Path): + arr[:] = np.arange(reduce(operator.mul, arr.shape, 1)).reshape(arr.shape) + expected = arr[:] + with Path.open(tmp_path / "arr.pickle", "wb") as f: + pickle.dump(arr._async_array.codec_pipeline, f) + with Path.open(tmp_path / "arr.pickle", "rb") as f: + object.__setattr__(arr._async_array, "codec_pipeline", pickle.load(f)) + assert (arr[:] == expected).all() + + +@pytest.mark.parametrize( + "codec", + [zarr.codecs.BloscCodec(), zarr.codecs.GzipCodec(), zarr.codecs.ZstdCodec()], +) +@pytest.mark.parametrize("should_shard", [True, False]) +def test_pipeline_used( + mocker, codec: zarr.abc.codec.BaseCodec, tmp_path: Path, *, should_shard: bool +): + z = zarr.create_array( + tmp_path / "foo.zarr", + dtype=np.float64, + shape=(80, 100), + chunks=(10, 10), + shards=(20, 20) if should_shard else None, + compressors=[codec], + ) + spy_read = mocker.spy(z._async_array.codec_pipeline, "read") + spy_write = mocker.spy(z._async_array.codec_pipeline, "write") + assert isinstance(z._async_array.codec_pipeline, zarrs.ZarrsCodecPipeline) + z[...] = np.random.random(z.shape) + z[...] + assert spy_read.call_count == 1 + assert spy_write.call_count == 1 + + +@contextmanager +def use_zarrs_direct_io(): + zarr.config.set( + { + "codec_pipeline.path": "zarrs.ZarrsCodecPipeline", + "codec_pipeline.direct_io": True, + } + ) + yield + zarr.config.set( + { + "codec_pipeline.path": "zarrs.ZarrsCodecPipeline", + "codec_pipeline.direct_io": False, + } + ) + + +@pytest.mark.skipif( + platform.system() != "Linux", reason="Can only run O_DIRECT on linux" +) +def test_direct_io(tmp_path: Path): + with use_zarrs_direct_io(): + z = zarr.create_array( + tmp_path / "foo.zarr", + dtype=np.float64, + shape=(80, 100), + chunks=(10, 10), + shards=(20, 20), + ) + ground_truth_arr = np.random.random(z.shape) + z[...] = ground_truth_arr + np.testing.assert_array_equal(z[...], ground_truth_arr) diff --git a/tests/pipeline/test_roundtrip.py b/tests/pipeline/test_roundtrip.py new file mode 100644 index 00000000..db4cb4e6 --- /dev/null +++ b/tests/pipeline/test_roundtrip.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +from collections.abc import Generator +from contextlib import contextmanager +from types import EllipsisType +from typing import TYPE_CHECKING, Literal, TypeAlias + +import numpy as np +import pytest +import zarr + +if TYPE_CHECKING: + from collections.abc import Callable, Generator + + Index: TypeAlias = tuple[int | slice | np.ndarray | EllipsisType, ...] + + +@pytest.fixture +def store_values( + roundtrip: tuple[Literal[2, 3], int, Index, Callable], + axis_size: int, + full_array: Callable[[tuple[int, ...]], np.ndarray], +) -> np.ndarray: + _, dimensionality, index, indexing_method = roundtrip + return gen_store_values( + indexing_method, + index, + full_array((axis_size,) * dimensionality), + ) + + +def gen_store_values( + indexing_method: Callable, index: Index, full_array: np.ndarray +) -> np.ndarray: + class smoke: + oindex = "oindex" + + def maybe_convert( + i: int | np.ndarray | slice | EllipsisType, axis: int + ) -> np.ndarray: + if isinstance(i, np.ndarray): + return i + if isinstance(i, slice): + return np.arange( + i.start if i.start is not None else 0, + i.stop if i.stop is not None else full_array.shape[axis], + ) + if isinstance(i, int): + return np.array([i]) + if isinstance(i, EllipsisType): + return np.arange(full_array.shape[axis]) + raise ValueError(f"Invalid index {i}") + + if not isinstance(index, EllipsisType) and indexing_method(smoke()) == "oindex": + index = tuple(maybe_convert(i, axis) for axis, i in enumerate(index)) + res = full_array[np.ix_(*index)] + # squeeze out extra dims from integer indexers + if all(i.shape == (1,) for i in index): + res = res.squeeze() + return res + res = res.squeeze( + axis=tuple(axis for axis, i in enumerate(index) if i.shape == (1,)) + ) + return res + return full_array[index] + + +# overwrite format and dimensionality from conftest + + +@pytest.fixture +def format(roundtrip: tuple[Literal[2, 3], int, Index, Callable]) -> Literal[2, 3]: + return roundtrip[0] + + +@pytest.fixture +def dimensionality(roundtrip: tuple[Literal[2, 3], int, Index, Callable]) -> int: + return roundtrip[1] + + +@pytest.fixture +def index(roundtrip: tuple[Literal[2, 3], int, Index, Callable]) -> Index: + return roundtrip[2] + + +@pytest.fixture +def indexing_method(roundtrip: tuple[Literal[2, 3], int, Index, Callable]) -> Callable: + return roundtrip[3] + + +@contextmanager +def use_zarr_default_codec_reader() -> Generator[None]: + zarr.config.set( + {"codec_pipeline.path": "zarr.core.codec_pipeline.BatchedCodecPipeline"} + ) + yield + zarr.config.set({"codec_pipeline.path": "zarrs.ZarrsCodecPipeline"}) + + +def test_roundtrip( + arr: zarr.Array, store_values: np.ndarray, index: Index, indexing_method: Callable +) -> None: + indexing_method(arr)[index] = store_values + res = indexing_method(arr)[index] + assert np.all(res == store_values), res + + +def test_roundtrip_read_only_zarrs( + arr: zarr.Array, store_values: np.ndarray, index: Index, indexing_method: Callable +) -> None: + with use_zarr_default_codec_reader(): + arr_default = zarr.open(arr.store, read_only=True) + indexing_method(arr_default)[index] = store_values + res = indexing_method(zarr.open(arr.store))[index] + assert np.all( + res == store_values, + ), res diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py deleted file mode 100644 index c9fbda37..00000000 --- a/tests/test_pipeline.py +++ /dev/null @@ -1,328 +0,0 @@ -#!/usr/bin/env python3 - -import operator -import pickle -import platform -import tempfile -from collections.abc import Callable -from contextlib import contextmanager -from functools import reduce -from itertools import product -from pathlib import Path -from types import EllipsisType - -import numpy as np -import pytest -import zarr -from zarr.storage import LocalStore - -import zarrs - -# Create a single temp directory for all test arrays -_test_tmpdir = tempfile.TemporaryDirectory(prefix="zarrs_test_") -_test_arrays_dir = Path(_test_tmpdir.name) -_test_array_counter = 0 - -axis_size_ = 10 -chunk_size_ = axis_size_ // 2 -fill_value_ = 32767 -dimensionalities_ = list(range(1, 5)) - - -@pytest.fixture -def fill_value() -> int: - return fill_value_ - - -non_numpy_indices = [ - pytest.param(slice(1, 3), id="slice_in_chunk"), - pytest.param(slice(1, 7), id="slice_across_chunks"), - pytest.param(2, id="int"), - pytest.param(slice(None), id="full_slice"), - pytest.param(Ellipsis, id="ellipsis"), -] - -numpy_indices = [ - pytest.param(np.array([1, 2]), id="contiguous_in_chunk_array"), - pytest.param(np.array([0, 3]), id="discontinuous_in_chunk_array"), - pytest.param(np.array([0, 6]), id="across_chunks_indices_array"), -] - -all_indices = numpy_indices + non_numpy_indices - -indexing_method_params = [ - pytest.param(lambda x: getattr(x, "oindex"), id="oindex"), - pytest.param(lambda x: x, id="vindex"), -] - -zarr_formats = [2, 3] - - -def pytest_generate_tests(metafunc): - old_pipeline_path = zarr.config.get("codec_pipeline.path") - # need to set the codec pipeline to the zarrs pipeline because the autouse fixture doesn't apply here - zarr.config.set({"codec_pipeline.path": "zarrs.ZarrsCodecPipeline"}) - if "test_roundtrip" in metafunc.function.__name__: - arrs = [] - indices = [] - store_values = [] - indexing_methods = [] - ids = [] - for format in zarr_formats: - for dimensionality in dimensionalities_: - indexers = non_numpy_indices if dimensionality > 2 else all_indices - for index_param_prod in product(indexers, repeat=dimensionality): - index = tuple( - index_param.values[0] for index_param in index_param_prod - ) - # multi-ellipsis indexing is not supported - if sum(isinstance(i, EllipsisType) for i in index) > 1: - continue - for indexing_method_param in indexing_method_params: - global _test_array_counter - arr = gen_arr( - fill_value_, - _test_arrays_dir / str(_test_array_counter), - dimensionality, - format, - ) - _test_array_counter += 1 - indexing_method = indexing_method_param.values[0] - dimensionality_id = f"{dimensionality}d" - id = "-".join( - [indexing_method_param.id, dimensionality_id] - + [index_param.id for index_param in index_param_prod] - + [f"v{format}"] - ) - ids.append(id) - store_values.append( - gen_store_values( - indexing_method, - index, - full_array((axis_size_,) * dimensionality), - ) - ) - indexing_methods.append(indexing_method) - indices.append(index) - arrs.append(arr) - # array is used as param name to prevent collision with arr fixture - metafunc.parametrize( - ["array", "index", "store_values", "indexing_method"], - zip(arrs, indices, store_values, indexing_methods), - ids=ids, - ) - zarr.config.set({"codec_pipeline.path": old_pipeline_path}) - - -def full_array(shape) -> np.ndarray: - return np.arange(reduce(operator.mul, shape, 1)).reshape(shape) - - -def gen_store_values( - indexing_method: Callable, - index: tuple[int | slice | np.ndarray | EllipsisType, ...], - full_array: np.ndarray, -) -> np.ndarray: - class smoke: - oindex = "oindex" - - def maybe_convert( - i: int | np.ndarray | slice | EllipsisType, axis: int - ) -> np.ndarray: - if isinstance(i, np.ndarray): - return i - if isinstance(i, slice): - return np.arange( - i.start if i.start is not None else 0, - i.stop if i.stop is not None else full_array.shape[axis], - ) - if isinstance(i, int): - return np.array([i]) - if isinstance(i, EllipsisType): - return np.arange(full_array.shape[axis]) - raise ValueError(f"Invalid index {i}") - - if not isinstance(index, EllipsisType) and indexing_method(smoke()) == "oindex": - index: tuple[np.ndarray, ...] = tuple( - maybe_convert(i, axis) for axis, i in enumerate(index) - ) - res = full_array[np.ix_(*index)] - # squeeze out extra dims from integer indexers - if all(i.shape == (1,) for i in index): - res = res.squeeze() - return res - res = res.squeeze( - axis=tuple(axis for axis, i in enumerate(index) if i.shape == (1,)) - ) - return res - return full_array[index] - - -def gen_arr(fill_value, tmp_path, dimensionality, format) -> zarr.Array: - return zarr.create( - (axis_size_,) * dimensionality, - store=LocalStore(root=tmp_path / ".zarr"), - chunks=(chunk_size_,) * dimensionality, - dtype=np.int16, - fill_value=fill_value, - codecs=[zarr.codecs.BytesCodec(), zarr.codecs.BloscCodec()] - if format == 3 - else None, - zarr_format=format, - ) - - -@pytest.fixture(params=dimensionalities_) -def dimensionality(request): - return request.param - - -@pytest.fixture(params=zarr_formats) -def format(request): - return request.param - - -@pytest.fixture -def arr(dimensionality, tmp_path, format) -> zarr.Array: - return gen_arr(fill_value_, tmp_path, dimensionality, format) - - -def test_fill_value(arr: zarr.Array): - assert np.all(arr[:] == fill_value_) - - -def test_constant(arr: zarr.Array): - arr[:] = 42 - assert np.all(arr[:] == 42) - - -def test_singleton(arr: zarr.Array): - singleton_index = (1,) * len(arr.shape) - non_singleton_index = (0,) * len(arr.shape) - arr[singleton_index] = 42 - assert arr[singleton_index] == 42 - assert arr[non_singleton_index] != 42 - - -def test_full_array(arr: zarr.Array): - stored_values = full_array(arr.shape) - arr[:] = stored_values - assert np.all(arr[:] == stored_values) - - -def test_roundtrip( - array: zarr.Array, - store_values: np.ndarray, - index: tuple[int | slice | np.ndarray | EllipsisType, ...], - indexing_method: Callable, -): - indexing_method(array)[index] = store_values - res = indexing_method(array)[index] - assert np.all(res == store_values), res - - -def test_ellipsis_indexing_invalid(arr: zarr.Array): - if len(arr.shape) <= 2: - pytest.skip( - "Ellipsis indexing works for 1D and 2D arrays in zarr-python despite a shape mismatch" - ) - stored_value = np.array([1, 2, 3]) - expected_errors = ( - "references array indices.*out-of-bounds of array shape", - "the size of the chunk subset.*and input/output subset.* are incompatible", - ) - with pytest.raises(IndexError, match="|".join(expected_errors)): - arr[2, ...] = stored_value - - -def test_pickle(arr: zarr.Array, tmp_path: Path): - arr[:] = np.arange(reduce(operator.mul, arr.shape, 1)).reshape(arr.shape) - expected = arr[:] - with Path.open(tmp_path / "arr.pickle", "wb") as f: - pickle.dump(arr._async_array.codec_pipeline, f) - with Path.open(tmp_path / "arr.pickle", "rb") as f: - object.__setattr__(arr._async_array, "codec_pipeline", pickle.load(f)) - assert (arr[:] == expected).all() - - -@contextmanager -def use_zarr_default_codec_reader(): - zarr.config.set( - {"codec_pipeline.path": "zarr.core.codec_pipeline.BatchedCodecPipeline"} - ) - yield - zarr.config.set({"codec_pipeline.path": "zarrs.ZarrsCodecPipeline"}) - - -def test_roundtrip_read_only_zarrs( - array: zarr.Array, - store_values: np.ndarray, - index: tuple[int | slice | np.ndarray | EllipsisType, ...], - indexing_method: Callable, -): - with use_zarr_default_codec_reader(): - arr_default = zarr.open(array.store, read_only=True) - indexing_method(arr_default)[index] = store_values - res = indexing_method(zarr.open(array.store))[index] - assert np.all( - res == store_values, - ), res - - -@pytest.mark.parametrize( - "codec", - [zarr.codecs.BloscCodec(), zarr.codecs.GzipCodec(), zarr.codecs.ZstdCodec()], -) -@pytest.mark.parametrize("should_shard", [True, False]) -def test_pipeline_used( - mocker, codec: zarr.abc.codec.BaseCodec, tmp_path: Path, *, should_shard: bool -): - z = zarr.create_array( - tmp_path / "foo.zarr", - dtype=np.float64, - shape=(80, 100), - chunks=(10, 10), - shards=(20, 20) if should_shard else None, - compressors=[codec], - ) - spy_read = mocker.spy(z._async_array.codec_pipeline, "read") - spy_write = mocker.spy(z._async_array.codec_pipeline, "write") - assert isinstance(z._async_array.codec_pipeline, zarrs.ZarrsCodecPipeline) - z[...] = np.random.random(z.shape) - z[...] - assert spy_read.call_count == 1 - assert spy_write.call_count == 1 - - -@contextmanager -def use_zarrs_direct_io(): - zarr.config.set( - { - "codec_pipeline.path": "zarrs.ZarrsCodecPipeline", - "codec_pipeline.direct_io": True, - } - ) - yield - zarr.config.set( - { - "codec_pipeline.path": "zarrs.ZarrsCodecPipeline", - "codec_pipeline.direct_io": False, - } - ) - - -@pytest.mark.skipif( - platform.system() != "Linux", reason="Can only run O_DIRECT on linux" -) -def test_direct_io(tmp_path: Path): - with use_zarrs_direct_io(): - z = zarr.create_array( - tmp_path / "foo.zarr", - dtype=np.float64, - shape=(80, 100), - chunks=(10, 10), - shards=(20, 20), - ) - ground_truth_arr = np.random.random(z.shape) - z[...] = ground_truth_arr - np.testing.assert_array_equal(z[...], ground_truth_arr) From 81416ce391d176b1327e9f03f9a653a50464ee85 Mon Sep 17 00:00:00 2001 From: "Philipp A." Date: Mon, 2 Feb 2026 10:17:26 +0100 Subject: [PATCH 3/3] make use_zarrs_direct_io a fixture --- tests/pipeline/test_pipeline.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 8c747d3a..70c7a1dd 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -3,7 +3,6 @@ import operator import pickle import platform -from contextlib import contextmanager from functools import reduce from pathlib import Path from typing import TYPE_CHECKING, TypeAlias @@ -17,7 +16,7 @@ import zarrs if TYPE_CHECKING: - from collections.abc import Callable + from collections.abc import Callable, Generator from types import EllipsisType Index: TypeAlias = tuple[int | slice | np.ndarray | EllipsisType, ...] @@ -97,8 +96,8 @@ def test_pipeline_used( assert spy_write.call_count == 1 -@contextmanager -def use_zarrs_direct_io(): +@pytest.fixture +def use_zarrs_direct_io() -> Generator[None]: zarr.config.set( { "codec_pipeline.path": "zarrs.ZarrsCodecPipeline", @@ -117,15 +116,15 @@ def use_zarrs_direct_io(): @pytest.mark.skipif( platform.system() != "Linux", reason="Can only run O_DIRECT on linux" ) -def test_direct_io(tmp_path: Path): - with use_zarrs_direct_io(): - z = zarr.create_array( - tmp_path / "foo.zarr", - dtype=np.float64, - shape=(80, 100), - chunks=(10, 10), - shards=(20, 20), - ) - ground_truth_arr = np.random.random(z.shape) - z[...] = ground_truth_arr - np.testing.assert_array_equal(z[...], ground_truth_arr) +@pytest.mark.usefixtures("use_zarrs_direct_io") +def test_direct_io(tmp_path: Path) -> None: + z = zarr.create_array( + tmp_path / "foo.zarr", + dtype=np.float64, + shape=(80, 100), + chunks=(10, 10), + shards=(20, 20), + ) + ground_truth_arr = np.random.random(z.shape) + z[...] = ground_truth_arr + np.testing.assert_array_equal(z[...], ground_truth_arr)