Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
025bf9b
Add batch to VirtualChunked ReadParameters
sjperkins Jan 22, 2026
52c0f62
Change VirtualChunked::ReadParameters::batch to std::optional<Batch> …
sjperkins Jan 22, 2026
aecca8e
Add missing reference to batch() return type
sjperkins Jan 22, 2026
62f96b9
Store a Batch in VirtualChunked::ReadParameters::batch_ instead of st…
sjperkins Jan 23, 2026
5ebc7cc
Temporarily change linux arm64 runner to ubuntu-22.04-arm
sjperkins Jan 23, 2026
57bc55d
Rework ReadParameters::batch_ member initialization
sjperkins Jan 31, 2026
7abf919
Change ReadParameters::batch() to Batch::View
sjperkins Jan 31, 2026
2f424c5
Remove whitespace
sjperkins Jan 31, 2026
1c93dcd
Prefer list initialisation for ReadParameters::batch_
sjperkins Jan 31, 2026
0fd1130
Merge branch 'master' into add-batch-to-virtual-chunked-ReadParameters
sjperkins Jan 31, 2026
9878d7f
Update ReadParameters::batch() comment
sjperkins Jan 31, 2026
8a6d183
Add C++ test cases for VirtualChunked <-> Batch interaction
sjperkins Feb 1, 2026
0935684
clang-format
sjperkins Feb 1, 2026
88aec5d
Improve python ReadParameters.batch member documentation
sjperkins Feb 1, 2026
88dbf1e
#include "tensorstore/batch.h" in virtual_chunked.cc
sjperkins Feb 2, 2026
b219505
Rework batch tests to pass std::optional<Batch::View>& output_batch
sjperkins Feb 2, 2026
1509c66
Update #includes in tensorstore/virtual_chunked.h
sjperkins Feb 2, 2026
73c2b61
Add missing Schema include in virtual_chunked.cc
sjperkins Feb 3, 2026
c39c090
#include <utility> for std::move in virtual_chunked.h
sjperkins Feb 3, 2026
88f8a1a
Remove testing of Batch implementation in virtual_chunked_test.cc
sjperkins Feb 3, 2026
e539fa9
Remove conditional setting of output_batch in BatchSettingView
sjperkins Feb 3, 2026
6a6a6d9
clang-format virtual_chunked.h
sjperkins Feb 3, 2026
2bf5f48
Improve test case symmetry
sjperkins Feb 3, 2026
98119c4
Add //tensorstore:schema to tensorstore_cc_library(virtual_chunked)
sjperkins Feb 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ jobs:
- os: "ubuntu-latest"
cibw_build: "*"
wheel_identifier: linux_x86_64
- os: "arm-ubuntu-arm-22.04-8core"
- os: "ubuntu-22.04-arm"
# TODO: Revert back to the following before merge
# - os: "arm-ubuntu-arm-22.04-8core"
cibw_build: "*"
wheel_identifier: linux_arm64
- os: "windows-2022"
Expand Down
6 changes: 6 additions & 0 deletions python/tensorstore/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -12185,6 +12185,12 @@ class VirtualChunkedReadParameters:
Virtual views
"""

@property
def batch(self) -> Batch | None:
"""
Batch associated with the read request.
"""

@property
def if_not_equal(self) -> bytes:
"""
Expand Down
48 changes: 48 additions & 0 deletions python/tensorstore/tests/virtual_chunked_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,3 +309,51 @@ def do_read(
do_read.future = future # type: ignore

gc_tester(t)


def test_read_batch() -> None:
"""Tests reading both with and without a batch."""
array = ts.array(np.arange(np.prod(shape := (4, 5))).reshape(shape))
# Avoid sharing the common threadpool with array
context = ts.Context({"data_copy_concurrency": {"limit": 1}})

def do_batched_read(
domain: ts.IndexDomain,
chunk: np.ndarray,
read_params: ts.VirtualChunkedReadParameters,
) -> None:
assert isinstance(read_params.batch, ts.Batch)
chunk[...] = array[domain].read(batch=read_params.batch).result()

t = ts.virtual_chunked(
do_batched_read,
None,
dtype=array.dtype,
shape=array.shape,
chunk_layout=ts.ChunkLayout(read_chunk_shape=(2, 3)),
context=context,
)

with ts.Batch() as b:
f = t[1:3, 1:3].read(batch=b)

np.testing.assert_array_equal(f.result(), array[1:3, 1:3])

def do_unbatched_read(
domain: ts.IndexDomain,
chunk: np.ndarray,
read_params: ts.VirtualChunkedReadParameters,
) -> None:
assert read_params.batch is None
chunk[...] = array[domain].read(batch=read_params.batch).result()

t2 = ts.virtual_chunked(
do_unbatched_read,
None,
dtype=array.dtype,
shape=array.shape,
chunk_layout=ts.ChunkLayout(read_chunk_shape=(2, 3)),
context=context
)

np.testing.assert_array_equal(t2[1:3, 1:3].read().result(), array[1:3, 1:3])
11 changes: 11 additions & 0 deletions python/tensorstore/virtual_chunked.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "python/tensorstore/time.h"
#include "python/tensorstore/type_name_override.h"
#include "tensorstore/array.h"
#include "tensorstore/batch.h"
#include "tensorstore/container_kind.h"
#include "tensorstore/context.h"
#include "tensorstore/index_space/index_domain.h"
Expand Down Expand Up @@ -96,6 +97,16 @@ Cached generation, read request can be skipped if no newer data is available.
},
R"(
Read may be fulfilled with cached data no older than the specified bound.
)");

cls.def_property_readonly(
"batch",
[](const Self& self) {
if (auto b = self.batch(); b) return std::optional<Batch>{b};
return std::optional<Batch>{std::nullopt};
},
R"(
Batch associated with the read request.
)");
}

Expand Down
1 change: 1 addition & 0 deletions tensorstore/driver/virtual_chunked/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ tensorstore_cc_library(
"//tensorstore:open_mode",
"//tensorstore:open_options",
"//tensorstore:rank",
"//tensorstore:schema",
"//tensorstore:staleness_bound",
"//tensorstore:strided_layout",
"//tensorstore:transaction",
Expand Down
6 changes: 5 additions & 1 deletion tensorstore/driver/virtual_chunked/virtual_chunked.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "tensorstore/array.h"
#include "tensorstore/batch.h"
#include "tensorstore/box.h"
#include "tensorstore/chunk_layout.h"
#include "tensorstore/codec_spec.h"
Expand Down Expand Up @@ -61,6 +62,7 @@
#include "tensorstore/open_mode.h"
#include "tensorstore/open_options.h"
#include "tensorstore/rank.h"
#include "tensorstore/schema.h"
#include "tensorstore/serialization/absl_time.h" // IWYU pragma: keep
#include "tensorstore/serialization/std_optional.h" // IWYU pragma: keep
#include "tensorstore/staleness_bound.h"
Expand Down Expand Up @@ -237,7 +239,8 @@ void VirtualChunkedCache::DoRead(EntryOrNode& node,
auto& executor = cache.executor();
// `node` is guaranteed to remain valid until `ReadSuccess` or `ReadError`
// is called. Therefore we don't need to separately hold a reference.
executor([&node, staleness_bound = request.staleness_bound] {
executor([&node, staleness_bound = request.staleness_bound,
batch = Batch(request.batch)] {
Comment thread
sjperkins marked this conversation as resolved.
auto& entry = GetOwningEntry(node);
auto& cache = GetOwningCache(entry);
const auto& component_spec = cache.grid().components.front();
Expand Down Expand Up @@ -267,6 +270,7 @@ void VirtualChunkedCache::DoRead(EntryOrNode& node,
read_params.if_not_equal_ = lock.stamp().generation;
}
read_params.staleness_bound_ = staleness_bound;
read_params.batch_ = std::move(batch);
auto read_future =
cache.read_function_(ConstDataTypeCast<void>(std::move(partial_array)),
std::move(read_params));
Expand Down
75 changes: 75 additions & 0 deletions tensorstore/driver/virtual_chunked/virtual_chunked_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "tensorstore/virtual_chunked.h"

#include <memory>
#include <optional>
#include <utility>
#include <vector>

Expand All @@ -25,6 +26,7 @@
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "tensorstore/array.h"
#include "tensorstore/batch.h"
#include "tensorstore/chunk_layout.h"
#include "tensorstore/context.h"
#include "tensorstore/data_type.h"
Expand All @@ -51,6 +53,7 @@

namespace {

using ::tensorstore::Batch;
using ::tensorstore::DimensionIndex;
using ::tensorstore::dynamic_rank;
using ::tensorstore::Future;
Expand Down Expand Up @@ -587,4 +590,76 @@ TEST(VirtualChunkedTest, NonAtomicSingleChunk) {
TENSORSTORE_ASSERT_OK(future);
}

template <typename... Option>
Result<tensorstore::TensorStore<void, dynamic_rank,
tensorstore::ReadWriteMode::read>>
BatchSettingView(std::optional<Batch::View>& output_batch, Option&&... option) {
auto mutex = std::make_shared<absl::Mutex>();
return tensorstore::VirtualChunked(
tensorstore::NonSerializable{
[mutex, &output_batch](auto output, auto read_params)
-> Future<TimestampedStorageGeneration> {
tensorstore::InitializeArray(output);
absl::MutexLock lock(*mutex.get());
output_batch = read_params.batch();
return TimestampedStorageGeneration{
StorageGeneration::FromString("abc"), absl::Now()};
}},
std::forward<Option>(option)...);
}

// Test passing of Read batch into VirtualChunked ReadParameters
// No Batch argument used in tensorstore::Read
// results in output_batch not set
TEST(VirtualChunkedTest, ReadNoBatch) {
std::optional<Batch::View> output_batch{std::nullopt};
auto virtual_chunked =
BatchSettingView(output_batch, tensorstore::dtype_v<int>,
tensorstore::Schema::Shape({2, 3}),
tensorstore::ChunkLayout::ReadChunkShape({2, 1}));

EXPECT_FALSE(output_batch.has_value());
auto data = tensorstore::Read(virtual_chunked).result();
EXPECT_TRUE(output_batch.has_value());
EXPECT_FALSE(*output_batch);
}

// No Batch argument passed to tensorstore::Read
// results in output_batch not set
TEST(VirtualChunkedTest, ReadNoBatchArgument) {
std::optional<Batch::View> output_batch{std::nullopt};
auto batch = Batch{tensorstore::no_batch};
auto virtual_chunked =
BatchSettingView(output_batch, tensorstore::dtype_v<int>,
tensorstore::Schema::Shape({2, 3}),
tensorstore::ChunkLayout::ReadChunkShape({2, 1}));

EXPECT_FALSE(output_batch.has_value());
auto read_future = tensorstore::Read(virtual_chunked, batch);
batch.Release();
auto data = read_future.result();

EXPECT_TRUE(output_batch.has_value());
EXPECT_FALSE(*output_batch);
}

// Batch argument passed to tensorstore::Read
// is reflected in output_batch
TEST(VirtualChunkedTest, ReadBatchArgument) {
std::optional<Batch::View> output_batch{std::nullopt};
auto virtual_chunked =
BatchSettingView(output_batch, tensorstore::dtype_v<int>,
tensorstore::Schema::Shape({2, 3}),
tensorstore::ChunkLayout::ReadChunkShape({2, 1}));
auto batch = Batch::New();
Batch::View batch_view{batch};
EXPECT_FALSE(output_batch.has_value());
auto read_future = tensorstore::Read(virtual_chunked, batch);
batch.Release();
auto data = read_future.result();

EXPECT_TRUE(output_batch.has_value());
EXPECT_TRUE(*output_batch);
}

} // namespace
21 changes: 17 additions & 4 deletions tensorstore/virtual_chunked.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,22 +243,32 @@
/// no different than binding the transaction to an existing virtual chunked
/// view.

#include <functional>
#include <type_traits>
#include <utility>

#include "absl/base/attributes.h"
#include "absl/meta/type_traits.h"
#include "absl/status/status.h"
#include "absl/time/time.h"
#include "tensorstore/array.h"
#include "tensorstore/box.h"
#include "tensorstore/batch.h"
#include "tensorstore/context.h"
#include "tensorstore/driver/driver.h"
#include "tensorstore/index.h"
#include "tensorstore/kvstore/generation.h"
#include "tensorstore/open_mode.h"
#include "tensorstore/rank.h"
#include "tensorstore/schema.h"
#include "tensorstore/serialization/function.h"
#include "tensorstore/staleness_bound.h"
#include "tensorstore/static_cast.h"
#include "tensorstore/strided_layout.h"
#include "tensorstore/tensorstore.h"
#include "tensorstore/transaction.h"
#include "tensorstore/util/executor.h"
#include "tensorstore/util/option.h"
#include "tensorstore/util/future.h"
#include "tensorstore/util/result.h"
#include "tensorstore/util/status.h"

namespace tensorstore {
namespace virtual_chunked {
Expand All @@ -278,11 +288,14 @@ class ReadParameters {
/// Read may be fulfilled with cached data no older than the specified bound.
absl::Time staleness_bound() const { return staleness_bound_; }

// Treat as private:
/// Batch associated with the read request.
Batch::View batch() const { return batch_; }

// Treat as private:
Executor executor_;
StorageGeneration if_not_equal_;
absl::Time staleness_bound_;
Batch batch_{no_batch};
};

/// Type-erased function called to read a single chunk.
Expand Down