From 2548cabd0fb18153e36354ea4d87b7ce1299644c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Percy=20Camilo=20Trive=C3=B1o=20Aucahuasi?= Date: Fri, 23 Sep 2022 21:33:28 -0500 Subject: [PATCH 1/2] draft: change the way how we read buffered files move PreBuffer down into the RowGroupGenerator and draft the new test Don't concatenate RecordBatchGenerator and add more ideas for the test Avoid keeping the RAM when using the RecordBatchGenerator: Use MakeMappedGenerator instead of MakeConcatenatedGenerator to create the asyncgenerator. cleaning ... format concatenate the async generator when we are using prebuffering format (I forgot to update archery linters to clang-tools 14) prefer concatenation instead to not break parquet pytest, fix some issues using arrow::Future force wait the buffering finish before read (all of this is async) --- cpp/src/arrow/io/memory_test.cc | 33 ++++++++++ .../parquet/arrow/arrow_reader_writer_test.cc | 64 +++++++++++++++++++ cpp/src/parquet/arrow/reader.cc | 40 +++++++----- 3 files changed, 122 insertions(+), 15 deletions(-) diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc index cdcbe240f858..40baae5ea160 100644 --- a/cpp/src/arrow/io/memory_test.cc +++ b/cpp/src/arrow/io/memory_test.cc @@ -36,6 +36,7 @@ #include "arrow/io/slow.h" #include "arrow/io/transform.h" #include "arrow/io/util_internal.h" +#include "arrow/memory_pool.h" #include "arrow/status.h" #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" @@ -768,6 +769,38 @@ TEST(RangeReadCache, Basics) { } } +TEST(RangeReadCache, ShouldRetainRam) { + std::string data = "abcdefghijklmnopqrstuvwxyz"; + + CacheOptions options = CacheOptions::Defaults(); + + for (auto lazy : std::vector{false, true}) { + SCOPED_TRACE(lazy); + options.lazy = lazy; + options.range_size_limit = 2; + options.hole_size_limit = 0; + std::shared_ptr data_buffer = std::make_shared(data); + auto file = std::make_shared(data_buffer); + ProxyMemoryPool pool(default_memory_pool()); + IOContext ctx(&pool); + internal::ReadRangeCache cache(file, ctx, options); + + // The test reference and file's reference + ASSERT_EQ(2, data_buffer.use_count()); + + ASSERT_OK(cache.Cache({{1, 2}, {3, 2}, {5, 2}})); + ASSERT_FINISHES_OK(cache.Wait()); + // Each cache entry should have a reference to the data buffer + ASSERT_EQ(5, data_buffer.use_count()); + + ASSERT_OK(cache.Read({1, 2})); + ASSERT_OK(cache.Read({3, 2})); + ASSERT_OK(cache.Read({5, 2})); + // Once the cache entries are read they should release their buffer reference + ASSERT_EQ(5, data_buffer.use_count()); + } +} + TEST(RangeReadCache, Concurrency) { std::string data = "abcdefghijklmnopqrstuvwxyz"; diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index d719f0e642e6..67e129519d95 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -2467,6 +2467,70 @@ TEST(TestArrowReadWrite, GetRecordBatchGenerator) { } } +TEST(TestArrowReadWrite, ReadShouldNotRetainRam) { + const int num_rows = 1024; + const int row_group_size = 512; + const int num_columns = 2; + const MemoryPool* memory_pool = default_memory_pool(); + + std::shared_ptr arrow_reader; + { + std::shared_ptr table; + ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); + std::shared_ptr buffer; + ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer( + std::move(table), row_group_size, default_arrow_writer_properties(), &buffer)); + auto parquet_reader = + ParquetFileReader::Open(std::make_shared(std::move(buffer))); + ArrowReaderProperties properties = default_arrow_reader_properties(); + properties.set_pre_buffer(true); + std::unique_ptr reader; + ASSERT_OK(FileReader::Make(default_memory_pool(), std::move(parquet_reader), + std::move(properties), &reader)); + arrow_reader = std::move(reader); + } + + const std::vector groups = {0, 1}; + const std::vector columns = {0, 1}; + const int64_t prev_mem = memory_pool->bytes_allocated(); + int64_t file_size = 0; + { + ASSERT_OK_AND_ASSIGN(auto batch_generator, arrow_reader->GetRecordBatchGenerator( + arrow_reader, groups, columns)); + auto future_batches = ::arrow::CollectAsyncGenerator(batch_generator).result(); + ASSERT_OK_AND_ASSIGN(auto batches, std::move(future_batches)); + file_size = memory_pool->bytes_allocated() - prev_mem; + } + + const int64_t prev_total_mem = memory_pool->bytes_allocated(); + int64_t total_mem = 0; + { + ASSERT_OK_AND_ASSIGN(auto batch_generator, arrow_reader->GetRecordBatchGenerator( + arrow_reader, groups, columns)); + + int64_t prev_mem = memory_pool->bytes_allocated(); + auto fut1 = batch_generator(); + const int64_t mem_part1 = memory_pool->bytes_allocated() - prev_mem; + // We will get the full content loaded because the use of concatenation + ASSERT_EQ(mem_part1, file_size); + + prev_mem = memory_pool->bytes_allocated(); + auto fut2 = batch_generator(); + const int64_t mem_part2 = memory_pool->bytes_allocated() - prev_mem; + // We don't need more RAM because we already have the full content loaded + ASSERT_EQ(mem_part2, 0); + + prev_mem = memory_pool->bytes_allocated(); + auto fut3 = batch_generator(); + ASSERT_EQ(memory_pool->bytes_allocated() - prev_mem, 0); + + total_mem = memory_pool->bytes_allocated() - prev_total_mem; + } + // We don't keep the RAM either after reading the entire file + ASSERT_EQ(prev_total_mem, memory_pool->bytes_allocated()); + ASSERT_EQ(total_mem, file_size); +} + TEST(TestArrowReadWrite, ScanContents) { const int num_columns = 20; const int num_rows = 1000; diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 31480133d7ac..60de51deebc6 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1,3 +1,4 @@ + // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -1059,7 +1060,8 @@ class RowGroupGenerator { explicit RowGroupGenerator(std::shared_ptr arrow_reader, ::arrow::internal::Executor* cpu_executor, std::vector row_groups, std::vector column_indices, - int64_t min_rows_in_flight) + int64_t min_rows_in_flight, + ArrowReaderProperties reader_properties) : arrow_reader_(std::move(arrow_reader)), cpu_executor_(cpu_executor), row_groups_(std::move(row_groups)), @@ -1067,14 +1069,17 @@ class RowGroupGenerator { min_rows_in_flight_(min_rows_in_flight), rows_in_flight_(0), index_(0), - readahead_index_(0) {} + readahead_index_(0), + reader_properties_(reader_properties) {} ::arrow::Future operator()() { if (index_ >= row_groups_.size()) { return ::arrow::AsyncGeneratorEnd(); } index_++; + BEGIN_PARQUET_CATCH_EXCEPTIONS FillReadahead(); + END_PARQUET_CATCH_EXCEPTIONS ReadRequest next = std::move(in_flight_reads_.front()); DCHECK(!in_flight_reads_.empty()); in_flight_reads_.pop(); @@ -1107,11 +1112,21 @@ class RowGroupGenerator { if (!reader->properties().pre_buffer()) { row_group_read = SubmitRead(cpu_executor_, reader, row_group, column_indices); } else { - auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices); - if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready); + auto ready = ::arrow::Future<>::MakeFinished(); row_group_read = ready.Then([=]() -> ::arrow::Future { - return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices); + reader->parquet_reader()->PreBuffer({row_group}, column_indices_, + reader_properties_.io_context(), + reader_properties_.cache_options()); + auto wait_buffer = + reader->parquet_reader()->WhenBuffered({row_group}, column_indices); + wait_buffer.Wait(); + auto read = wait_buffer.Then([=]() { + return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices); + }); + return read; }); + if (cpu_executor_) + row_group_read = cpu_executor_->TransferAlways(std::move(row_group_read)); } in_flight_reads_.push({std::move(row_group_read), num_rows}); } @@ -1156,6 +1171,7 @@ class RowGroupGenerator { int64_t rows_in_flight_; size_t index_; size_t readahead_index_; + ArrowReaderProperties reader_properties_; }; ::arrow::Result<::arrow::AsyncGenerator>> @@ -1168,20 +1184,14 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr reader, if (rows_to_readahead < 0) { return Status::Invalid("rows_to_readahead must be > 0"); } - if (reader_properties_.pre_buffer()) { - BEGIN_PARQUET_CATCH_EXCEPTIONS - reader_->PreBuffer(row_group_indices, column_indices, reader_properties_.io_context(), - reader_properties_.cache_options()); - END_PARQUET_CATCH_EXCEPTIONS - } ::arrow::AsyncGenerator row_group_generator = RowGroupGenerator(::arrow::internal::checked_pointer_cast(reader), cpu_executor, row_group_indices, column_indices, - rows_to_readahead); - ::arrow::AsyncGenerator> concatenated = + rows_to_readahead, reader_properties_); + ::arrow::AsyncGenerator> async_gen = ::arrow::MakeConcatenatedGenerator(std::move(row_group_generator)); - WRAP_ASYNC_GENERATOR(std::move(concatenated)); - return concatenated; + WRAP_ASYNC_GENERATOR(std::move(async_gen)); + return async_gen; } Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory, From 1077dd58f6fae504eefd9969adae16dd8be00512 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Percy=20Camilo=20Trive=C3=B1o=20Aucahuasi?= Date: Mon, 17 Oct 2022 16:29:33 -0500 Subject: [PATCH 2/2] move the error catch macros inside the lambda --- cpp/src/parquet/arrow/reader.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 60de51deebc6..9fecce3c05e5 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1077,9 +1077,7 @@ class RowGroupGenerator { return ::arrow::AsyncGeneratorEnd(); } index_++; - BEGIN_PARQUET_CATCH_EXCEPTIONS FillReadahead(); - END_PARQUET_CATCH_EXCEPTIONS ReadRequest next = std::move(in_flight_reads_.front()); DCHECK(!in_flight_reads_.empty()); in_flight_reads_.pop(); @@ -1114,9 +1112,11 @@ class RowGroupGenerator { } else { auto ready = ::arrow::Future<>::MakeFinished(); row_group_read = ready.Then([=]() -> ::arrow::Future { + BEGIN_PARQUET_CATCH_EXCEPTIONS reader->parquet_reader()->PreBuffer({row_group}, column_indices_, reader_properties_.io_context(), reader_properties_.cache_options()); + END_PARQUET_CATCH_EXCEPTIONS auto wait_buffer = reader->parquet_reader()->WhenBuffered({row_group}, column_indices); wait_buffer.Wait();