Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions cpp/src/arrow/io/memory_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "arrow/io/slow.h"
#include "arrow/io/transform.h"
#include "arrow/io/util_internal.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
Expand Down Expand Up @@ -768,6 +769,38 @@ TEST(RangeReadCache, Basics) {
}
}

TEST(RangeReadCache, ShouldRetainRam) {
std::string data = "abcdefghijklmnopqrstuvwxyz";

CacheOptions options = CacheOptions::Defaults();

for (auto lazy : std::vector<bool>{false, true}) {
SCOPED_TRACE(lazy);
options.lazy = lazy;
options.range_size_limit = 2;
options.hole_size_limit = 0;
std::shared_ptr<Buffer> data_buffer = std::make_shared<Buffer>(data);
auto file = std::make_shared<BufferReader>(data_buffer);
ProxyMemoryPool pool(default_memory_pool());
IOContext ctx(&pool);
internal::ReadRangeCache cache(file, ctx, options);

// The test reference and file's reference
ASSERT_EQ(2, data_buffer.use_count());

ASSERT_OK(cache.Cache({{1, 2}, {3, 2}, {5, 2}}));
ASSERT_FINISHES_OK(cache.Wait());
// Each cache entry should have a reference to the data buffer
ASSERT_EQ(5, data_buffer.use_count());

ASSERT_OK(cache.Read({1, 2}));
ASSERT_OK(cache.Read({3, 2}));
ASSERT_OK(cache.Read({5, 2}));
// Once the cache entries are read they should release their buffer reference
ASSERT_EQ(5, data_buffer.use_count());
}
}

TEST(RangeReadCache, Concurrency) {
std::string data = "abcdefghijklmnopqrstuvwxyz";

Expand Down
64 changes: 64 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2467,6 +2467,70 @@ TEST(TestArrowReadWrite, GetRecordBatchGenerator) {
}
}

TEST(TestArrowReadWrite, ReadShouldNotRetainRam) {
const int num_rows = 1024;
const int row_group_size = 512;
const int num_columns = 2;
const MemoryPool* memory_pool = default_memory_pool();

std::shared_ptr<FileReader> arrow_reader;
{
std::shared_ptr<Table> table;
ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
std::shared_ptr<Buffer> buffer;
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(
std::move(table), row_group_size, default_arrow_writer_properties(), &buffer));
auto parquet_reader =
ParquetFileReader::Open(std::make_shared<BufferReader>(std::move(buffer)));
ArrowReaderProperties properties = default_arrow_reader_properties();
properties.set_pre_buffer(true);
std::unique_ptr<FileReader> reader;
ASSERT_OK(FileReader::Make(default_memory_pool(), std::move(parquet_reader),
std::move(properties), &reader));
arrow_reader = std::move(reader);
}

const std::vector<int> groups = {0, 1};
const std::vector<int> columns = {0, 1};
const int64_t prev_mem = memory_pool->bytes_allocated();
int64_t file_size = 0;
{
ASSERT_OK_AND_ASSIGN(auto batch_generator, arrow_reader->GetRecordBatchGenerator(
arrow_reader, groups, columns));
auto future_batches = ::arrow::CollectAsyncGenerator(batch_generator).result();
ASSERT_OK_AND_ASSIGN(auto batches, std::move(future_batches));
file_size = memory_pool->bytes_allocated() - prev_mem;
}

const int64_t prev_total_mem = memory_pool->bytes_allocated();
int64_t total_mem = 0;
{
ASSERT_OK_AND_ASSIGN(auto batch_generator, arrow_reader->GetRecordBatchGenerator(
arrow_reader, groups, columns));

int64_t prev_mem = memory_pool->bytes_allocated();
auto fut1 = batch_generator();
const int64_t mem_part1 = memory_pool->bytes_allocated() - prev_mem;
// We will get the full content loaded because the use of concatenation
ASSERT_EQ(mem_part1, file_size);

prev_mem = memory_pool->bytes_allocated();
auto fut2 = batch_generator();
const int64_t mem_part2 = memory_pool->bytes_allocated() - prev_mem;
// We don't need more RAM because we already have the full content loaded
ASSERT_EQ(mem_part2, 0);

prev_mem = memory_pool->bytes_allocated();
auto fut3 = batch_generator();
ASSERT_EQ(memory_pool->bytes_allocated() - prev_mem, 0);

total_mem = memory_pool->bytes_allocated() - prev_total_mem;
}
// We don't keep the RAM either after reading the entire file
ASSERT_EQ(prev_total_mem, memory_pool->bytes_allocated());
ASSERT_EQ(total_mem, file_size);
}

TEST(TestArrowReadWrite, ScanContents) {
const int num_columns = 20;
const int num_rows = 1000;
Expand Down
40 changes: 25 additions & 15 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
Expand Down Expand Up @@ -1059,15 +1060,17 @@ class RowGroupGenerator {
explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> arrow_reader,
::arrow::internal::Executor* cpu_executor,
std::vector<int> row_groups, std::vector<int> column_indices,
int64_t min_rows_in_flight)
int64_t min_rows_in_flight,
ArrowReaderProperties reader_properties)
: arrow_reader_(std::move(arrow_reader)),
cpu_executor_(cpu_executor),
row_groups_(std::move(row_groups)),
column_indices_(std::move(column_indices)),
min_rows_in_flight_(min_rows_in_flight),
rows_in_flight_(0),
index_(0),
readahead_index_(0) {}
readahead_index_(0),
reader_properties_(reader_properties) {}

::arrow::Future<RecordBatchGenerator> operator()() {
if (index_ >= row_groups_.size()) {
Expand Down Expand Up @@ -1107,11 +1110,23 @@ class RowGroupGenerator {
if (!reader->properties().pre_buffer()) {
row_group_read = SubmitRead(cpu_executor_, reader, row_group, column_indices);
} else {
auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
auto ready = ::arrow::Future<>::MakeFinished();
row_group_read = ready.Then([=]() -> ::arrow::Future<RecordBatchGenerator> {
return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices);
BEGIN_PARQUET_CATCH_EXCEPTIONS
reader->parquet_reader()->PreBuffer({row_group}, column_indices_,
reader_properties_.io_context(),
reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
auto wait_buffer =
reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
wait_buffer.Wait();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm…I think anything calling Wait() in a callback/async context is not going to be right.

I think the issue is that the pre-buffer code doesn't handle concurrent use. The Wait() is effectively just working around that by blocking the thread so that there's no sharing. However, if you attach a reentrant readahead generator to it, I'd guess it'd still fail. So I think either the internals should be refactored so that it does handle concurrent use, or we should just create a separate ReadRangeCache per row group. (The advantage of that is that you'd have a harder bound on memory usage.)

However either way this loses 'nice' properties of the original, buffer-entire-file approach (e.g. small row groups can get combined together for I/O). IMO, the longer term solution would be to disentangle the 'cache' and 'coalesce' behaviors (and possibly even remove the 'cache' behavior, which may make more sense as a wrapper over RandomAccessFile?) and try the approach proposed in the original JIRA, which would be to coalesce ranges, then track when ranges are actually read and remove the buffer from the coalescer once all ranges mapping to a given buffer are read. (The buffer may be kept alive downstream due to shared usage, though.) Or maybe that's still overly fancy.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks David, I'll close this PR in favor of https://issues.apache.org/jira/browse/ARROW-18113

auto read = wait_buffer.Then([=]() {
return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices);
});
return read;
});
if (cpu_executor_)
row_group_read = cpu_executor_->TransferAlways(std::move(row_group_read));
}
in_flight_reads_.push({std::move(row_group_read), num_rows});
}
Expand Down Expand Up @@ -1156,6 +1171,7 @@ class RowGroupGenerator {
int64_t rows_in_flight_;
size_t index_;
size_t readahead_index_;
ArrowReaderProperties reader_properties_;
};

::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
Expand All @@ -1168,20 +1184,14 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
if (rows_to_readahead < 0) {
return Status::Invalid("rows_to_readahead must be > 0");
}
if (reader_properties_.pre_buffer()) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
reader_->PreBuffer(row_group_indices, column_indices, reader_properties_.io_context(),
reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
}
::arrow::AsyncGenerator<RowGroupGenerator::RecordBatchGenerator> row_group_generator =
RowGroupGenerator(::arrow::internal::checked_pointer_cast<FileReaderImpl>(reader),
cpu_executor, row_group_indices, column_indices,
rows_to_readahead);
::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>> concatenated =
rows_to_readahead, reader_properties_);
::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>> async_gen =
::arrow::MakeConcatenatedGenerator(std::move(row_group_generator));
WRAP_ASYNC_GENERATOR(std::move(concatenated));
return concatenated;
WRAP_ASYNC_GENERATOR(std::move(async_gen));
return async_gen;
}

Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
Expand Down