Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions tests/pipeline/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from __future__ import annotations

import operator
from functools import reduce
from itertools import product
from types import EllipsisType
from typing import TYPE_CHECKING, Literal, TypeAlias

import numpy as np
import pytest
import zarr
import zarr.codecs
from zarr.storage import LocalStore

if TYPE_CHECKING:
from collections.abc import Callable, Generator
from pathlib import Path

from _pytest.mark.structures import ParameterSet

Index: TypeAlias = tuple[int | slice | np.ndarray | EllipsisType, ...]


axis_size_ = 10
chunk_size_ = axis_size_ // 2
fill_value_ = 32767
dimensionalities_ = list(range(1, 5))


@pytest.fixture
def axis_size() -> int:
return axis_size_


@pytest.fixture
def fill_value() -> int:
return fill_value_


non_numpy_indices = [
pytest.param(slice(1, 3), id="slice_in_chunk"),
pytest.param(slice(1, 7), id="slice_across_chunks"),
pytest.param(2, id="int"),
pytest.param(slice(None), id="full_slice"),
pytest.param(Ellipsis, id="ellipsis"),
]

numpy_indices = [
pytest.param(np.array([1, 2]), id="contiguous_in_chunk_array"),
pytest.param(np.array([0, 3]), id="discontinuous_in_chunk_array"),
pytest.param(np.array([0, 6]), id="across_chunks_indices_array"),
]

all_indices = numpy_indices + non_numpy_indices

indexing_method_params = [
pytest.param(lambda x: getattr(x, "oindex"), id="oindex"),
pytest.param(lambda x: x, id="vindex"),
]

zarr_formats = [2, 3]


def _full_array(shape: tuple[int, ...]) -> np.ndarray:
return np.arange(reduce(operator.mul, shape, 1)).reshape(shape)


@pytest.fixture
def full_array() -> Callable[[tuple[int, ...]], np.ndarray]:
return _full_array


def gen_arr(
tmp_path: Path, fill_value: int, dimensionality: int, format: Literal[2, 3]
) -> zarr.Array:
return zarr.create(
(axis_size_,) * dimensionality,
store=LocalStore(root=tmp_path / ".zarr"),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would in-memory storage also work here?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think so since this package doesn't support that kind of store crossing into rust

chunks=(chunk_size_,) * dimensionality,
dtype=np.int16,
fill_value=fill_value,
codecs=[zarr.codecs.BytesCodec(), zarr.codecs.BloscCodec()]
if format == 3
else None,
zarr_format=format,
)


@pytest.fixture(params=dimensionalities_)
def dimensionality(request: pytest.FixtureRequest) -> int:
return request.param


@pytest.fixture(params=zarr_formats)
def format(request: pytest.FixtureRequest) -> Literal[2, 3]:
return request.param


@pytest.fixture
def arr(tmp_path: Path, dimensionality: int, format: Literal[2, 3]) -> zarr.Array:
return gen_arr(tmp_path, fill_value_, dimensionality, format)


# this parameter set is only used for test_roundtrip, but it’s easier to define here


def roundtrip_params() -> Generator[ParameterSet]:
for format, dimensionality in product(zarr_formats, dimensionalities_):
indexers = non_numpy_indices if dimensionality > 2 else all_indices
for index_param_prod in product(indexers, repeat=dimensionality):
index = tuple(index_param.values[0] for index_param in index_param_prod)
# multi-ellipsis indexing is not supported
if sum(isinstance(i, EllipsisType) for i in index) > 1:
continue
for indexing_method_param in indexing_method_params:
id = "-".join(
[
str(indexing_method_param.id),
f"{dimensionality}d",
*(str(index_param.id) for index_param in index_param_prod),
f"v{format}",
]
)
indexing_method = indexing_method_param.values[0]
yield pytest.param(
(format, dimensionality, index, indexing_method), id=id
)


@pytest.fixture(params=list(roundtrip_params()))
def roundtrip(
request: pytest.FixtureRequest,
) -> tuple[Literal[2, 3], int, Index, Callable]:
return request.param
130 changes: 130 additions & 0 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
from __future__ import annotations

import operator
import pickle
import platform
from functools import reduce
from pathlib import Path
from typing import TYPE_CHECKING, TypeAlias

import numpy as np
import pytest
import zarr
import zarr.abc.codec
import zarr.codecs

import zarrs

if TYPE_CHECKING:
from collections.abc import Callable, Generator
from types import EllipsisType

Index: TypeAlias = tuple[int | slice | np.ndarray | EllipsisType, ...]


def test_fill_value(arr: zarr.Array, fill_value: int) -> None:
assert np.all(arr[:] == fill_value)


def test_constant(arr: zarr.Array):
arr[:] = 42
assert np.all(arr[:] == 42)


def test_singleton(arr: zarr.Array):
singleton_index = (1,) * len(arr.shape)
non_singleton_index = (0,) * len(arr.shape)
arr[singleton_index] = 42
assert arr[singleton_index] == 42
assert arr[non_singleton_index] != 42


def test_full_array(
arr: zarr.Array, full_array: Callable[[tuple[int, ...]], np.ndarray]
) -> None:
stored_values = full_array(arr.shape)
arr[:] = stored_values
assert np.all(arr[:] == stored_values)


def test_ellipsis_indexing_invalid(arr: zarr.Array):
if len(arr.shape) <= 2:
pytest.skip(
"Ellipsis indexing works for 1D and 2D arrays in zarr-python despite a shape mismatch"
)
stored_value = np.array([1, 2, 3])
expected_errors = (
"references array indices.*out-of-bounds of array shape",
"the size of the chunk subset.*and input/output subset.* are incompatible",
)
with pytest.raises(IndexError, match="|".join(expected_errors)):
arr[2, ...] = stored_value


def test_pickle(arr: zarr.Array, tmp_path: Path):
arr[:] = np.arange(reduce(operator.mul, arr.shape, 1)).reshape(arr.shape)
expected = arr[:]
with Path.open(tmp_path / "arr.pickle", "wb") as f:
pickle.dump(arr._async_array.codec_pipeline, f)
with Path.open(tmp_path / "arr.pickle", "rb") as f:
object.__setattr__(arr._async_array, "codec_pipeline", pickle.load(f))
assert (arr[:] == expected).all()


@pytest.mark.parametrize(
"codec",
[zarr.codecs.BloscCodec(), zarr.codecs.GzipCodec(), zarr.codecs.ZstdCodec()],
)
@pytest.mark.parametrize("should_shard", [True, False])
def test_pipeline_used(
mocker, codec: zarr.abc.codec.BaseCodec, tmp_path: Path, *, should_shard: bool
):
z = zarr.create_array(
tmp_path / "foo.zarr",
dtype=np.float64,
shape=(80, 100),
chunks=(10, 10),
shards=(20, 20) if should_shard else None,
compressors=[codec],
)
spy_read = mocker.spy(z._async_array.codec_pipeline, "read")
spy_write = mocker.spy(z._async_array.codec_pipeline, "write")
assert isinstance(z._async_array.codec_pipeline, zarrs.ZarrsCodecPipeline)
z[...] = np.random.random(z.shape)
z[...]
assert spy_read.call_count == 1
assert spy_write.call_count == 1


@pytest.fixture
def use_zarrs_direct_io() -> Generator[None]:
zarr.config.set(
{
"codec_pipeline.path": "zarrs.ZarrsCodecPipeline",
"codec_pipeline.direct_io": True,
}
)
yield
zarr.config.set(
{
"codec_pipeline.path": "zarrs.ZarrsCodecPipeline",
"codec_pipeline.direct_io": False,
}
)


@pytest.mark.skipif(
platform.system() != "Linux", reason="Can only run O_DIRECT on linux"
)
@pytest.mark.usefixtures("use_zarrs_direct_io")
def test_direct_io(tmp_path: Path) -> None:
z = zarr.create_array(
tmp_path / "foo.zarr",
dtype=np.float64,
shape=(80, 100),
chunks=(10, 10),
shards=(20, 20),
)
ground_truth_arr = np.random.random(z.shape)
z[...] = ground_truth_arr
np.testing.assert_array_equal(z[...], ground_truth_arr)
117 changes: 117 additions & 0 deletions tests/pipeline/test_roundtrip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from __future__ import annotations

from collections.abc import Generator
from contextlib import contextmanager
from types import EllipsisType
from typing import TYPE_CHECKING, Literal, TypeAlias

import numpy as np
import pytest
import zarr

if TYPE_CHECKING:
from collections.abc import Callable, Generator

Index: TypeAlias = tuple[int | slice | np.ndarray | EllipsisType, ...]


@pytest.fixture
def store_values(
roundtrip: tuple[Literal[2, 3], int, Index, Callable],
axis_size: int,
full_array: Callable[[tuple[int, ...]], np.ndarray],
) -> np.ndarray:
_, dimensionality, index, indexing_method = roundtrip
return gen_store_values(
indexing_method,
index,
full_array((axis_size,) * dimensionality),
)


def gen_store_values(
indexing_method: Callable, index: Index, full_array: np.ndarray
) -> np.ndarray:
class smoke:
oindex = "oindex"

def maybe_convert(
i: int | np.ndarray | slice | EllipsisType, axis: int
) -> np.ndarray:
if isinstance(i, np.ndarray):
return i
if isinstance(i, slice):
return np.arange(
i.start if i.start is not None else 0,
i.stop if i.stop is not None else full_array.shape[axis],
)
if isinstance(i, int):
return np.array([i])
if isinstance(i, EllipsisType):
return np.arange(full_array.shape[axis])
raise ValueError(f"Invalid index {i}")

if not isinstance(index, EllipsisType) and indexing_method(smoke()) == "oindex":
index = tuple(maybe_convert(i, axis) for axis, i in enumerate(index))
res = full_array[np.ix_(*index)]
# squeeze out extra dims from integer indexers
if all(i.shape == (1,) for i in index):
res = res.squeeze()
return res
res = res.squeeze(
axis=tuple(axis for axis, i in enumerate(index) if i.shape == (1,))
)
return res
return full_array[index]


# overwrite format and dimensionality from conftest


@pytest.fixture
def format(roundtrip: tuple[Literal[2, 3], int, Index, Callable]) -> Literal[2, 3]:
return roundtrip[0]


@pytest.fixture
def dimensionality(roundtrip: tuple[Literal[2, 3], int, Index, Callable]) -> int:
return roundtrip[1]


@pytest.fixture
def index(roundtrip: tuple[Literal[2, 3], int, Index, Callable]) -> Index:
return roundtrip[2]


@pytest.fixture
def indexing_method(roundtrip: tuple[Literal[2, 3], int, Index, Callable]) -> Callable:
return roundtrip[3]


@contextmanager
def use_zarr_default_codec_reader() -> Generator[None]:
zarr.config.set(
{"codec_pipeline.path": "zarr.core.codec_pipeline.BatchedCodecPipeline"}
)
yield
zarr.config.set({"codec_pipeline.path": "zarrs.ZarrsCodecPipeline"})


def test_roundtrip(
arr: zarr.Array, store_values: np.ndarray, index: Index, indexing_method: Callable
) -> None:
indexing_method(arr)[index] = store_values
res = indexing_method(arr)[index]
assert np.all(res == store_values), res


def test_roundtrip_read_only_zarrs(
arr: zarr.Array, store_values: np.ndarray, index: Index, indexing_method: Callable
) -> None:
with use_zarr_default_codec_reader():
arr_default = zarr.open(arr.store, read_only=True)
indexing_method(arr_default)[index] = store_values
res = indexing_method(zarr.open(arr.store))[index]
assert np.all(
res == store_values,
), res
Loading