diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc index cdcbe240f858..40baae5ea160 100644 --- a/cpp/src/arrow/io/memory_test.cc +++ b/cpp/src/arrow/io/memory_test.cc @@ -36,6 +36,7 @@ #include "arrow/io/slow.h" #include "arrow/io/transform.h" #include "arrow/io/util_internal.h" +#include "arrow/memory_pool.h" #include "arrow/status.h" #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" @@ -768,6 +769,38 @@ TEST(RangeReadCache, Basics) { } } +TEST(RangeReadCache, ShouldRetainRam) { + std::string data = "abcdefghijklmnopqrstuvwxyz"; + + CacheOptions options = CacheOptions::Defaults(); + + for (auto lazy : std::vector{false, true}) { + SCOPED_TRACE(lazy); + options.lazy = lazy; + options.range_size_limit = 2; + options.hole_size_limit = 0; + std::shared_ptr data_buffer = std::make_shared(data); + auto file = std::make_shared(data_buffer); + ProxyMemoryPool pool(default_memory_pool()); + IOContext ctx(&pool); + internal::ReadRangeCache cache(file, ctx, options); + + // The test reference and file's reference + ASSERT_EQ(2, data_buffer.use_count()); + + ASSERT_OK(cache.Cache({{1, 2}, {3, 2}, {5, 2}})); + ASSERT_FINISHES_OK(cache.Wait()); + // Each cache entry should have a reference to the data buffer + ASSERT_EQ(5, data_buffer.use_count()); + + ASSERT_OK(cache.Read({1, 2})); + ASSERT_OK(cache.Read({3, 2})); + ASSERT_OK(cache.Read({5, 2})); + // Once the cache entries are read they should release their buffer reference + ASSERT_EQ(5, data_buffer.use_count()); + } +} + TEST(RangeReadCache, Concurrency) { std::string data = "abcdefghijklmnopqrstuvwxyz"; diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index d719f0e642e6..67e129519d95 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -2467,6 +2467,70 @@ TEST(TestArrowReadWrite, GetRecordBatchGenerator) { } } +TEST(TestArrowReadWrite, ReadShouldNotRetainRam) { + const int num_rows = 1024; + const int row_group_size = 512; + const int num_columns = 2; + const MemoryPool* memory_pool = default_memory_pool(); + + std::shared_ptr arrow_reader; + { + std::shared_ptr table; + ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); + std::shared_ptr buffer; + ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer( + std::move(table), row_group_size, default_arrow_writer_properties(), &buffer)); + auto parquet_reader = + ParquetFileReader::Open(std::make_shared(std::move(buffer))); + ArrowReaderProperties properties = default_arrow_reader_properties(); + properties.set_pre_buffer(true); + std::unique_ptr reader; + ASSERT_OK(FileReader::Make(default_memory_pool(), std::move(parquet_reader), + std::move(properties), &reader)); + arrow_reader = std::move(reader); + } + + const std::vector groups = {0, 1}; + const std::vector columns = {0, 1}; + const int64_t prev_mem = memory_pool->bytes_allocated(); + int64_t file_size = 0; + { + ASSERT_OK_AND_ASSIGN(auto batch_generator, arrow_reader->GetRecordBatchGenerator( + arrow_reader, groups, columns)); + auto future_batches = ::arrow::CollectAsyncGenerator(batch_generator).result(); + ASSERT_OK_AND_ASSIGN(auto batches, std::move(future_batches)); + file_size = memory_pool->bytes_allocated() - prev_mem; + } + + const int64_t prev_total_mem = memory_pool->bytes_allocated(); + int64_t total_mem = 0; + { + ASSERT_OK_AND_ASSIGN(auto batch_generator, arrow_reader->GetRecordBatchGenerator( + arrow_reader, groups, columns)); + + int64_t prev_mem = memory_pool->bytes_allocated(); + auto fut1 = batch_generator(); + const int64_t mem_part1 = memory_pool->bytes_allocated() - prev_mem; + // We will get the full content loaded because the use of concatenation + ASSERT_EQ(mem_part1, file_size); + + prev_mem = memory_pool->bytes_allocated(); + auto fut2 = batch_generator(); + const int64_t mem_part2 = memory_pool->bytes_allocated() - prev_mem; + // We don't need more RAM because we already have the full content loaded + ASSERT_EQ(mem_part2, 0); + + prev_mem = memory_pool->bytes_allocated(); + auto fut3 = batch_generator(); + ASSERT_EQ(memory_pool->bytes_allocated() - prev_mem, 0); + + total_mem = memory_pool->bytes_allocated() - prev_total_mem; + } + // We don't keep the RAM either after reading the entire file + ASSERT_EQ(prev_total_mem, memory_pool->bytes_allocated()); + ASSERT_EQ(total_mem, file_size); +} + TEST(TestArrowReadWrite, ScanContents) { const int num_columns = 20; const int num_rows = 1000; diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 31480133d7ac..9fecce3c05e5 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1,3 +1,4 @@ + // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -1059,7 +1060,8 @@ class RowGroupGenerator { explicit RowGroupGenerator(std::shared_ptr arrow_reader, ::arrow::internal::Executor* cpu_executor, std::vector row_groups, std::vector column_indices, - int64_t min_rows_in_flight) + int64_t min_rows_in_flight, + ArrowReaderProperties reader_properties) : arrow_reader_(std::move(arrow_reader)), cpu_executor_(cpu_executor), row_groups_(std::move(row_groups)), @@ -1067,7 +1069,8 @@ class RowGroupGenerator { min_rows_in_flight_(min_rows_in_flight), rows_in_flight_(0), index_(0), - readahead_index_(0) {} + readahead_index_(0), + reader_properties_(reader_properties) {} ::arrow::Future operator()() { if (index_ >= row_groups_.size()) { @@ -1107,11 +1110,23 @@ class RowGroupGenerator { if (!reader->properties().pre_buffer()) { row_group_read = SubmitRead(cpu_executor_, reader, row_group, column_indices); } else { - auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices); - if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready); + auto ready = ::arrow::Future<>::MakeFinished(); row_group_read = ready.Then([=]() -> ::arrow::Future { - return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices); + BEGIN_PARQUET_CATCH_EXCEPTIONS + reader->parquet_reader()->PreBuffer({row_group}, column_indices_, + reader_properties_.io_context(), + reader_properties_.cache_options()); + END_PARQUET_CATCH_EXCEPTIONS + auto wait_buffer = + reader->parquet_reader()->WhenBuffered({row_group}, column_indices); + wait_buffer.Wait(); + auto read = wait_buffer.Then([=]() { + return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices); + }); + return read; }); + if (cpu_executor_) + row_group_read = cpu_executor_->TransferAlways(std::move(row_group_read)); } in_flight_reads_.push({std::move(row_group_read), num_rows}); } @@ -1156,6 +1171,7 @@ class RowGroupGenerator { int64_t rows_in_flight_; size_t index_; size_t readahead_index_; + ArrowReaderProperties reader_properties_; }; ::arrow::Result<::arrow::AsyncGenerator>> @@ -1168,20 +1184,14 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr reader, if (rows_to_readahead < 0) { return Status::Invalid("rows_to_readahead must be > 0"); } - if (reader_properties_.pre_buffer()) { - BEGIN_PARQUET_CATCH_EXCEPTIONS - reader_->PreBuffer(row_group_indices, column_indices, reader_properties_.io_context(), - reader_properties_.cache_options()); - END_PARQUET_CATCH_EXCEPTIONS - } ::arrow::AsyncGenerator row_group_generator = RowGroupGenerator(::arrow::internal::checked_pointer_cast(reader), cpu_executor, row_group_indices, column_indices, - rows_to_readahead); - ::arrow::AsyncGenerator> concatenated = + rows_to_readahead, reader_properties_); + ::arrow::AsyncGenerator> async_gen = ::arrow::MakeConcatenatedGenerator(std::move(row_group_generator)); - WRAP_ASYNC_GENERATOR(std::move(concatenated)); - return concatenated; + WRAP_ASYNC_GENERATOR(std::move(async_gen)); + return async_gen; } Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,