From 3a9a064e3263d9dc25a86928ba04c78c0c7b807f Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 26 Jun 2026 11:41:01 +0800 Subject: [PATCH 1/2] [improvement](be) Add scanner v2 parquet page cache ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: File scanner v2 reads Parquet through Arrow, so the old vparquet page cache path is not used. Repeated scans still go through the Doris file reader for serialized Parquet column chunk data even when the Parquet page cache option is enabled. This change registers the selected Parquet column chunk byte ranges after row-group planning and lets the Arrow RandomAccessFile adapter reuse StoragePageCache for reads inside those ranges. Footer and metadata reads happen before range registration and are intentionally excluded. ### Release note None ### Check List (For Author) - Test: Manual test - Ran git diff --check. - Ran build-support/run_clang_format.py with clang-format 16 on modified BE files. - Could not compile with existing be/cmake-build-debug-dev-perf because CMakeCache.txt was generated for /mnt/disk3/gabriel/Workspace/dev1/doris and the configured ninja path is not available in this worktree. - Behavior changed: No - Does this need documentation: No --- .../parquet/parquet_file_context.cpp | 291 +++++++++++++++++- .../format_v2/parquet/parquet_file_context.h | 63 +++- be/src/format_v2/parquet/parquet_reader.cpp | 123 +++++++- be/src/format_v2/parquet/parquet_reader.h | 4 + .../parquet/parquet_page_cache_range_test.cpp | 117 +++++++ .../format_v2/parquet/parquet_reader_test.cpp | 66 ++++ 6 files changed, 658 insertions(+), 6 deletions(-) create mode 100644 be/test/format_v2/parquet/parquet_page_cache_range_test.cpp diff --git a/be/src/format_v2/parquet/parquet_file_context.cpp b/be/src/format_v2/parquet/parquet_file_context.cpp index 60e48c947bd3e2..3cb5655144be95 100644 --- a/be/src/format_v2/parquet/parquet_file_context.cpp +++ b/be/src/format_v2/parquet/parquet_file_context.cpp @@ -19,17 +19,156 @@ #include #include +#include +#include #include +#include +#include #include +#include +#include #include +#include "common/check.h" +#include "common/config.h" +#include "io/file_factory.h" #include "io/fs/file_reader.h" +#include "storage/cache/page_cache.h" #include "util/slice.h" namespace doris::format::parquet { + +namespace detail { + +std::vector plan_page_cache_range_read( + int64_t position, int64_t nbytes, const std::vector& cached_ranges) { + if (position < 0 || nbytes <= 0) { + return {}; + } + + std::vector ranges; + ranges.reserve(cached_ranges.size()); + const int64_t request_end = position + nbytes; + for (const auto& range : cached_ranges) { + if (range.size > 0 && range.offset < request_end && position < range.end_offset()) { + ranges.push_back(range); + } + } + std::sort(ranges.begin(), ranges.end(), [](const auto& lhs, const auto& rhs) { + if (lhs.offset != rhs.offset) { + return lhs.offset < rhs.offset; + } + return lhs.size > rhs.size; + }); + + std::vector plan; + int64_t cursor = position; + while (cursor < request_end) { + // At each cursor position, choose the cached range that already covers the cursor and + // extends farthest to the right. This handles both adjacent ranges and overlapping + // ranges. If no range covers the current cursor, there is a gap and the request must + // miss as a whole. + auto best = ranges.end(); + int64_t best_end = cursor; + for (auto it = ranges.begin(); it != ranges.end(); ++it) { + const int64_t cached_end = it->end_offset(); + if (it->offset <= cursor && cursor < cached_end && cached_end > best_end) { + best = it; + best_end = cached_end; + } + } + if (best == ranges.end()) { + return {}; + } + const int64_t copy_size = std::min(best_end, request_end) - cursor; + ParquetPageCacheReadPlanEntry entry; + entry.cached_range = *best; + entry.copy_offset_in_cache = cursor - best->offset; + entry.output_offset = cursor - position; + entry.copy_size = copy_size; + plan.push_back(entry); + cursor += copy_size; + } + return plan; +} + +} // namespace detail + namespace { +// StoragePageCache only supports exact-key lookup. Keep lightweight range metadata here so later +// Arrow ReadAt requests can reuse cached bytes when their requested ranges are subsets of, or are +// fully covered by, previously cached ranges. Stale metadata is pruned on lookup. +std::mutex cached_page_range_index_mutex; +std::unordered_map> cached_page_range_index; +constexpr size_t MAX_CACHED_PAGE_RANGE_FILES = 4096; +constexpr size_t MAX_CACHED_PAGE_RANGES_PER_FILE = 65536; + +void register_cached_page_range(const std::string& file_key, int64_t position, int64_t nbytes) { + DORIS_CHECK(nbytes > 0); + std::lock_guard lock(cached_page_range_index_mutex); + if (cached_page_range_index.find(file_key) == cached_page_range_index.end() && + cached_page_range_index.size() >= MAX_CACHED_PAGE_RANGE_FILES) { + cached_page_range_index.erase(cached_page_range_index.begin()); + } + auto& ranges = cached_page_range_index[file_key]; + auto it = std::find_if(ranges.begin(), ranges.end(), [&](const ParquetPageCacheRange& range) { + return range.offset == position && range.size == nbytes; + }); + if (it == ranges.end()) { + if (ranges.size() >= MAX_CACHED_PAGE_RANGES_PER_FILE) { + ranges.erase(ranges.begin()); + } + ranges.push_back(ParquetPageCacheRange {position, nbytes}); + } +} + +void unregister_cached_page_range(const std::string& file_key, + const ParquetPageCacheRange& stale_range) { + std::lock_guard lock(cached_page_range_index_mutex); + auto it = cached_page_range_index.find(file_key); + if (it == cached_page_range_index.end()) { + return; + } + auto& ranges = it->second; + ranges.erase(std::remove_if(ranges.begin(), ranges.end(), + [&](const ParquetPageCacheRange& range) { + return range.offset == stale_range.offset && + range.size == stale_range.size; + }), + ranges.end()); + if (ranges.empty()) { + cached_page_range_index.erase(it); + } +} + +std::vector cached_page_ranges_for_file(const std::string& file_key) { + std::lock_guard lock(cached_page_range_index_mutex); + auto it = cached_page_range_index.find(file_key); + if (it == cached_page_range_index.end()) { + return {}; + } + return it->second; +} + +std::string build_page_cache_file_key(const io::FileReader& file_reader, + const io::FileDescription& file_description) { + const int64_t mtime = + file_description.mtime != 0 ? file_description.mtime : file_reader.mtime(); + if (mtime == 0) { + // StoragePageCache is process-global. A key with only path + unknown mtime can outlive a + // rewritten local test file, or any external file whose version was not propagated. Disable + // v2 parquet page cache until the scan descriptor carries a stable object version. + return {}; + } + const int64_t file_size = file_description.file_size >= 0 + ? file_description.file_size + : static_cast(file_reader.size()); + return fmt::format("{}::{}::mtime={}::size={}", file_description.fs_name, + file_reader.path().native(), mtime, file_size); +} + // 将 Doris 的 io::FileReader 适配为 Arrow 的 RandomAccessFile 接口。 // // ParquetFileReader::Open() 要求一个 Arrow::RandomAccessFile, @@ -37,8 +176,13 @@ namespace { // Seek() 和 Tell() 维护了内部的 position 游标用于顺序 read() 操作。 class DorisRandomAccessFile final : public arrow::io::RandomAccessFile { public: - DorisRandomAccessFile(io::FileReaderSPtr file_reader, io::IOContext* io_ctx) - : _file_reader(std::move(file_reader)), _io_ctx(io_ctx) { + DorisRandomAccessFile(io::FileReaderSPtr file_reader, io::IOContext* io_ctx, + bool enable_page_cache, std::string page_cache_file_key) + : _file_reader(std::move(file_reader)), + _io_ctx(io_ctx), + _enable_page_cache(enable_page_cache), + _page_cache_file_key(std::move(page_cache_file_key)) { + DORIS_CHECK(_file_reader != nullptr); set_mode(arrow::io::FileMode::READ); } @@ -87,6 +231,9 @@ class DorisRandomAccessFile final : public arrow::io::RandomAccessFile { if (position < 0 || nbytes < 0) { return arrow::Status::Invalid("negative read position or length"); } + if (try_read_from_page_cache(position, nbytes, out)) { + return nbytes; + } size_t bytes_read = 0; Status st = _file_reader->read_at( static_cast(position), @@ -95,6 +242,7 @@ class DorisRandomAccessFile final : public arrow::io::RandomAccessFile { if (!st.ok()) { return arrow::Status::IOError(st.to_string_no_stack()); } + insert_page_cache(position, nbytes, out, bytes_read); return static_cast(bytes_read); } @@ -107,11 +255,127 @@ class DorisRandomAccessFile final : public arrow::io::RandomAccessFile { return buffer; } + void register_page_cache_ranges(std::vector ranges) { + std::lock_guard lock(_page_cache_mutex); + _page_cache_ranges = std::move(ranges); + } + + ParquetPageCacheStats page_cache_stats() const { + std::lock_guard lock(_page_cache_mutex); + return _page_cache_stats; + } + private: + bool page_cache_enabled() const { + return _enable_page_cache && !config::disable_storage_page_cache && + StoragePageCache::instance() != nullptr && !_page_cache_file_key.empty(); + } + + bool range_in_page_cache_scope(int64_t position, int64_t nbytes) const { + if (nbytes <= 0) { + return false; + } + const int64_t end = position + nbytes; + for (const auto& range : _page_cache_ranges) { + const int64_t range_end = range.offset + range.size; + if (position >= range.offset && end <= range_end) { + return true; + } + } + return false; + } + + StoragePageCache::CacheKey page_cache_key(int64_t position, int64_t nbytes) const { + return StoragePageCache::CacheKey(_page_cache_file_key, + static_cast(position + nbytes), position); + } + + bool copy_cached_range(const ParquetPageCacheRange& cached_range, int64_t copy_position, + int64_t copy_size, void* out, int64_t output_offset) { + PageCacheHandle handle; + if (!StoragePageCache::instance()->lookup( + page_cache_key(cached_range.offset, cached_range.size), &handle, + segment_v2::DATA_PAGE)) { + unregister_cached_page_range(_page_cache_file_key, cached_range); + return false; + } + Slice cached = handle.data(); + const int64_t cache_offset = copy_position - cached_range.offset; + DORIS_CHECK(cache_offset >= 0); + DORIS_CHECK(cached.size >= static_cast(cache_offset + copy_size)); + memcpy(static_cast(out) + output_offset, cached.data + cache_offset, + static_cast(copy_size)); + return true; + } + + bool try_read_from_cached_ranges(int64_t position, int64_t nbytes, void* out) { + auto plan = detail::plan_page_cache_range_read( + position, nbytes, cached_page_ranges_for_file(_page_cache_file_key)); + if (plan.empty()) { + return false; + } + for (const auto& entry : plan) { + if (!copy_cached_range(entry.cached_range, + entry.cached_range.offset + entry.copy_offset_in_cache, + entry.copy_size, out, entry.output_offset)) { + return false; + } + } + return true; + } + + bool try_read_from_page_cache(int64_t position, int64_t nbytes, void* out) { + std::lock_guard lock(_page_cache_mutex); + if (!page_cache_enabled() || !range_in_page_cache_scope(position, nbytes)) { + return false; + } + ++_page_cache_stats.read_count; + // Fast path: Arrow issues the same ReadAt(offset, size) again, so the exact + // StoragePageCache key matches. + // + // Fallback path: Arrow may read a different but related byte range on another scan. + // Examples: + // - Current request [120, 150) can be served from cached [100, 200) by copying the + // 30-byte subset starting at cached offset 20. + // - Current request [100, 260) can be served by stitching cached [100, 180) and + // [180, 260). If any middle span is missing, it is a miss and the file reader fills + // the whole request from storage. + if (!copy_cached_range(ParquetPageCacheRange {position, nbytes}, position, nbytes, out, + 0) && + !try_read_from_cached_ranges(position, nbytes, out)) { + ++_page_cache_stats.miss_count; + return false; + } + ++_page_cache_stats.hit_count; + ++_page_cache_stats.compressed_hit_count; + return true; + } + + void insert_page_cache(int64_t position, int64_t nbytes, const void* data, size_t bytes_read) { + std::lock_guard lock(_page_cache_mutex); + if (!page_cache_enabled() || !range_in_page_cache_scope(position, nbytes) || + bytes_read != static_cast(nbytes)) { + return; + } + auto* page = new DataPage(bytes_read, true, segment_v2::DATA_PAGE); + memcpy(page->data(), data, bytes_read); + PageCacheHandle handle; + StoragePageCache::instance()->insert(page_cache_key(position, nbytes), page, &handle, + segment_v2::DATA_PAGE); + register_cached_page_range(_page_cache_file_key, position, nbytes); + ++_page_cache_stats.write_count; + ++_page_cache_stats.compressed_write_count; + } + io::FileReaderSPtr _file_reader; io::IOContext* _io_ctx = nullptr; int64_t _pos = 0; bool _closed = false; + bool _enable_page_cache = false; + std::string _page_cache_file_key; + mutable std::mutex _page_cache_mutex; + std::vector _page_cache_ranges; + ParquetPageCacheStats _page_cache_stats; }; } // namespace @@ -129,8 +393,14 @@ Status arrow_status_to_doris_status(const arrow::Status& status) { return Status::InternalError(status.ToString()); } -Status ParquetFileContext::open(io::FileReaderSPtr input_file_reader, io::IOContext* io_ctx) { - arrow_file = std::make_shared(std::move(input_file_reader), io_ctx); +Status ParquetFileContext::open(io::FileReaderSPtr input_file_reader, io::IOContext* io_ctx, + bool enable_page_cache, + const io::FileDescription& file_description) { + DORIS_CHECK(input_file_reader != nullptr); + auto page_cache_file_key = build_page_cache_file_key(*input_file_reader, file_description); + arrow_file = std::make_shared(std::move(input_file_reader), io_ctx, + enable_page_cache, + std::move(page_cache_file_key)); try { // TODO: Cache parquet metadata in file system layer to avoid repeated metadata read for same file. this->file_reader = ::parquet::ParquetFileReader::Open( @@ -149,6 +419,19 @@ Status ParquetFileContext::open(io::FileReaderSPtr input_file_reader, io::IOCont return Status::OK(); } +void ParquetFileContext::register_page_cache_ranges(std::vector ranges) { + DORIS_CHECK(arrow_file != nullptr); + static_cast(arrow_file.get()) + ->register_page_cache_ranges(std::move(ranges)); +} + +ParquetPageCacheStats ParquetFileContext::page_cache_stats() const { + if (arrow_file == nullptr) { + return {}; + } + return static_cast(arrow_file.get())->page_cache_stats(); +} + Status ParquetFileContext::close() { if (file_reader != nullptr) { try { diff --git a/be/src/format_v2/parquet/parquet_file_context.h b/be/src/format_v2/parquet/parquet_file_context.h index cd8f9a0b962bbf..a1b7a49b6b89b8 100644 --- a/be/src/format_v2/parquet/parquet_file_context.h +++ b/be/src/format_v2/parquet/parquet_file_context.h @@ -20,13 +20,68 @@ #include #include +#include #include +#include #include "common/status.h" #include "io/fs/file_reader.h" +namespace doris::io { +struct FileDescription; +} // namespace doris::io + namespace doris::format::parquet { +struct ParquetPageCacheRange { + int64_t offset = 0; + int64_t size = 0; + + int64_t end_offset() const { return offset + size; } +}; + +struct ParquetPageCacheReadPlanEntry { + // The exact cached StoragePageCache entry. The final cache key is still exact-range based: + // file key + cached_range.end_offset() + cached_range.offset. + ParquetPageCacheRange cached_range; + // Byte offset inside cached_range to start copying from. + int64_t copy_offset_in_cache = 0; + // Byte offset inside the current ReadAt output buffer to start writing to. + int64_t output_offset = 0; + int64_t copy_size = 0; +}; + +struct ParquetPageCacheStats { + int64_t read_count = 0; + int64_t write_count = 0; + int64_t compressed_write_count = 0; + int64_t hit_count = 0; + int64_t miss_count = 0; + int64_t compressed_hit_count = 0; +}; + +namespace detail { + +// Build the copy plan for a ReadAt(position, nbytes) request from the range metadata of +// previously cached entries. +// +// StoragePageCache cannot do range lookup by itself; it can only lookup an exact key. The +// caller therefore keeps lightweight cached range metadata and uses this function to decide +// which exact cache entries to fetch and which byte spans to copy. +// +// Examples: +// 1. Subset hit: +// request [120, 150), cached [100, 200) -> copy 30 bytes from cached offset 20. +// 2. Superset hit covered by multiple cached entries: +// request [100, 260), cached [100, 180) and [180, 260) +// -> two copies: [100, 180) to output offset 0, [180, 260) to output offset 80. +// 3. Partial overlap is a miss: +// request [100, 260), cached [100, 180) only -> empty plan, caller reads from file. +std::vector plan_page_cache_range_read( + int64_t position, int64_t nbytes, const std::vector& cached_ranges); + +} // namespace detail + // Parquet 文件上下文 — 管理 Arrow 层文件对象和元数据的生命周期。 // // 该类是 Doris 与 Arrow Parquet C++ library 的边界: @@ -43,7 +98,13 @@ struct ParquetFileContext { std::shared_ptr<::parquet::FileMetaData> metadata; // Footer metadata (RowGroup 信息) const ::parquet::SchemaDescriptor* schema = nullptr; // 物理 leaf column schema - Status open(io::FileReaderSPtr input_file_reader, io::IOContext* io_ctx); + Status open(io::FileReaderSPtr input_file_reader, io::IOContext* io_ctx, bool enable_page_cache, + const io::FileDescription& file_description); + // Register file ranges that belong to selected Parquet column chunks. Arrow still owns page + // decoding, so v2 caches the serialized bytes read inside these ranges and excludes + // footer/metadata reads that happen before registration. + void register_page_cache_ranges(std::vector ranges); + ParquetPageCacheStats page_cache_stats() const; Status close(); }; diff --git a/be/src/format_v2/parquet/parquet_reader.cpp b/be/src/format_v2/parquet/parquet_reader.cpp index af748313d2bb4a..f7dc39ce7967f4 100644 --- a/be/src/format_v2/parquet/parquet_reader.cpp +++ b/be/src/format_v2/parquet/parquet_reader.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -49,8 +50,97 @@ struct ParquetReaderScanState { ParquetScanScheduler scheduler; const cctz::time_zone* timezone = nullptr; bool enable_bloom_filter = false; + bool enable_page_cache = false; }; +int64_t column_chunk_start_offset(const ::parquet::ColumnChunkMetaData& column_metadata) { + return column_metadata.has_dictionary_page() + ? cast_set(column_metadata.dictionary_page_offset()) + : cast_set(column_metadata.data_page_offset()); +} + +void collect_all_leaf_column_ids(const ParquetColumnSchema& column_schema, + std::unordered_set* leaf_column_ids) { + DORIS_CHECK(leaf_column_ids != nullptr); + if (column_schema.kind == ParquetColumnSchemaKind::PRIMITIVE) { + if (column_schema.leaf_column_id >= 0) { + leaf_column_ids->insert(column_schema.leaf_column_id); + } + return; + } + for (const auto& child : column_schema.children) { + DORIS_CHECK(child != nullptr); + collect_all_leaf_column_ids(*child, leaf_column_ids); + } +} + +void collect_projected_leaf_column_ids(const ParquetColumnSchema& column_schema, + const format::LocalColumnIndex& projection, + std::unordered_set* leaf_column_ids) { + DORIS_CHECK(leaf_column_ids != nullptr); + if (projection.project_all_children || projection.children.empty()) { + collect_all_leaf_column_ids(column_schema, leaf_column_ids); + return; + } + for (const auto& child_projection : projection.children) { + const auto child_it = + std::ranges::find_if(column_schema.children, [&](const auto& child_schema) { + return child_schema->local_id == child_projection.local_id(); + }); + DORIS_CHECK(child_it != column_schema.children.end()); + collect_projected_leaf_column_ids(**child_it, child_projection, leaf_column_ids); + } +} + +void collect_request_leaf_column_ids( + const std::vector>& file_schema, + const format::FileScanRequest& request, std::unordered_set* leaf_column_ids) { + DORIS_CHECK(leaf_column_ids != nullptr); + auto collect_scan_column = [&](const format::LocalColumnIndex& projection) { + const auto local_id = projection.local_id(); + if (local_id == format::ROW_POSITION_COLUMN_ID || + local_id == format::GLOBAL_ROWID_COLUMN_ID) { + return; + } + DORIS_CHECK(local_id >= 0 && local_id < static_cast(file_schema.size())); + DORIS_CHECK(file_schema[local_id] != nullptr); + collect_projected_leaf_column_ids(*file_schema[local_id], projection, leaf_column_ids); + }; + for (const auto& column : request.predicate_columns) { + collect_scan_column(column); + } + for (const auto& column : request.non_predicate_columns) { + collect_scan_column(column); + } +} + +std::vector build_page_cache_ranges( + const ::parquet::FileMetaData& metadata, + const std::vector>& file_schema, + const format::FileScanRequest& request, const RowGroupScanPlan& row_group_plan) { + std::unordered_set leaf_column_ids; + collect_request_leaf_column_ids(file_schema, request, &leaf_column_ids); + std::vector ranges; + ranges.reserve(row_group_plan.row_groups.size() * leaf_column_ids.size()); + for (const auto& row_group_plan_item : row_group_plan.row_groups) { + auto row_group_metadata = metadata.RowGroup(row_group_plan_item.row_group_id); + DORIS_CHECK(row_group_metadata != nullptr); + for (const auto leaf_column_id : leaf_column_ids) { + DORIS_CHECK(leaf_column_id >= 0 && leaf_column_id < row_group_metadata->num_columns()); + auto column_metadata = row_group_metadata->ColumnChunk(leaf_column_id); + DORIS_CHECK(column_metadata != nullptr); + const int64_t offset = column_chunk_start_offset(*column_metadata); + const int64_t size = column_metadata->total_compressed_size(); + DORIS_CHECK(offset >= 0); + DORIS_CHECK(size >= 0); + if (size > 0) { + ranges.push_back(ParquetPageCacheRange {.offset = offset, .size = size}); + } + } + } + return ranges; +} + DataTypePtr nullable_like_original(const DataTypePtr& type, DataTypePtr nested_type) { return type != nullptr && type->is_nullable() ? make_nullable(nested_type) : nested_type; } @@ -189,13 +279,16 @@ Status ParquetReader::init(RuntimeState* state) { _state = std::make_unique(); _state->enable_bloom_filter = state != nullptr && state->query_options().enable_parquet_filter_by_bloom_filter; + _state->enable_page_cache = + state != nullptr && state->query_options().enable_parquet_file_page_cache; if (state != nullptr) { _state->timezone = &state->timezone_obj(); _state->scheduler.set_timezone(&state->timezone_obj()); _state->scheduler.set_enable_strict_mode(state->enable_strict_mode()); } // Open parquet file and parse metadata to get file schema. - RETURN_IF_ERROR(_state->file_context.open(_tracing_file_reader, _io_ctx.get())); + RETURN_IF_ERROR(_state->file_context.open(_tracing_file_reader, _io_ctx.get(), + _state->enable_page_cache, *_file_description)); // Build file schema from parquet metadata. // A file reader may expose raw file identifiers, such as Parquet field_id, through ColumnDefinition::identifier RETURN_IF_ERROR( @@ -298,6 +391,11 @@ Status ParquetReader::open(std::shared_ptr request) { if (_profile != nullptr) { _parquet_profile.update_pruning_stats(row_group_plan.pruning_stats); } + if (_state->enable_page_cache) { + _state->file_context.register_page_cache_ranges( + build_page_cache_ranges(*_state->file_context.metadata, _state->file_schema, + *request_snapshot, row_group_plan)); + } _state->scan_plan = row_group_plan; _state->scheduler.set_page_skip_profile(_parquet_profile.page_skip_profile()); _state->scheduler.set_global_rowid_context(_global_rowid_context); @@ -325,6 +423,7 @@ Status ParquetReader::get_block(Block* file_block, size_t* rows, bool* eof) { const auto predicate_filtered_rows_before = _state->scheduler.predicate_filtered_rows(); RETURN_IF_ERROR(_state->scheduler.read_next_batch(_state->file_context, _state->file_schema, *request_snapshot, file_block, rows, eof)); + _sync_page_cache_profile(); if (_io_ctx != nullptr) { _io_ctx->predicate_filtered_rows += _state->scheduler.predicate_filtered_rows() - predicate_filtered_rows_before; @@ -333,6 +432,27 @@ Status ParquetReader::get_block(Block* file_block, size_t* rows, bool* eof) { return Status::OK(); } +void ParquetReader::_sync_page_cache_profile() { + if (_profile == nullptr || _state == nullptr) { + return; + } + const auto stats = _state->file_context.page_cache_stats(); + COUNTER_UPDATE(_parquet_profile.page_read_counter, + stats.read_count - _reported_page_cache_stats.read_count); + COUNTER_UPDATE(_parquet_profile.page_cache_write_counter, + stats.write_count - _reported_page_cache_stats.write_count); + COUNTER_UPDATE( + _parquet_profile.page_cache_compressed_write_counter, + stats.compressed_write_count - _reported_page_cache_stats.compressed_write_count); + COUNTER_UPDATE(_parquet_profile.page_cache_hit_counter, + stats.hit_count - _reported_page_cache_stats.hit_count); + COUNTER_UPDATE(_parquet_profile.page_cache_missing_counter, + stats.miss_count - _reported_page_cache_stats.miss_count); + COUNTER_UPDATE(_parquet_profile.page_cache_compressed_hit_counter, + stats.compressed_hit_count - _reported_page_cache_stats.compressed_hit_count); + _reported_page_cache_stats = stats; +} + void ParquetReader::set_condition_cache_context(std::shared_ptr ctx) { if (_state == nullptr) { return; @@ -431,6 +551,7 @@ Status ParquetReader::get_aggregate_result(const format::FileAggregateRequest& r Status ParquetReader::close() { if (_state != nullptr) { + _sync_page_cache_profile(); RETURN_IF_ERROR(_state->file_context.close()); } return FileReader::close(); diff --git a/be/src/format_v2/parquet/parquet_reader.h b/be/src/format_v2/parquet/parquet_reader.h index 8112108e32d207..a85ccfede37c8b 100644 --- a/be/src/format_v2/parquet/parquet_reader.h +++ b/be/src/format_v2/parquet/parquet_reader.h @@ -24,6 +24,7 @@ #include "common/status.h" #include "format_v2/file_reader.h" #include "format_v2/parquet/parquet_column_schema.h" +#include "format_v2/parquet/parquet_file_context.h" #include "format_v2/parquet/parquet_profile.h" namespace doris { @@ -94,6 +95,8 @@ class ParquetReader : public format::FileReader { void _init_profile() override; private: + void _sync_page_cache_profile(); + // 递归将 ParquetColumnSchema 树转换为 ColumnDefinition。 // identifier 生成规则:有 parquet_field_id → Field(INT, field_id),否则 → Field(STRING, name) void _fill_column_definition(const ParquetColumnSchema& column_schema, @@ -102,6 +105,7 @@ class ParquetReader : public format::FileReader { std::unique_ptr _state; // 全部扫描状态(file_context + schema + scheduler) ParquetProfile _parquet_profile; // RuntimeProfile 计数器集合 + ParquetPageCacheStats _reported_page_cache_stats; std::optional _global_rowid_context; // 全局 RowId 上下文 bool _enable_mapping_timestamp_tz = false; // 是否将 UTC timestamp 映射为 TIMESTAMPTZ }; diff --git a/be/test/format_v2/parquet/parquet_page_cache_range_test.cpp b/be/test/format_v2/parquet/parquet_page_cache_range_test.cpp new file mode 100644 index 00000000000000..f8e12206bb1220 --- /dev/null +++ b/be/test/format_v2/parquet/parquet_page_cache_range_test.cpp @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include + +#include "format_v2/parquet/parquet_file_context.h" + +namespace doris::format::parquet { +namespace { + +void expect_plan_entry(const ParquetPageCacheReadPlanEntry& entry, + const ParquetPageCacheRange& cached_range, int64_t copy_offset_in_cache, + int64_t output_offset, int64_t copy_size) { + EXPECT_EQ(entry.cached_range.offset, cached_range.offset); + EXPECT_EQ(entry.cached_range.size, cached_range.size); + EXPECT_EQ(entry.copy_offset_in_cache, copy_offset_in_cache); + EXPECT_EQ(entry.output_offset, output_offset); + EXPECT_EQ(entry.copy_size, copy_size); +} + +TEST(ParquetPageCacheRangeTest, SubsetRequestHitsSingleCachedRange) { + const std::vector cached_ranges = { + {100, 100}, + }; + + // Request [120, 150) is fully inside cached [100, 200). The reader should lookup + // the exact cached key [100, 200), then copy from cached offset 20 into output offset 0. + auto plan = detail::plan_page_cache_range_read(120, 30, cached_ranges); + + ASSERT_EQ(plan.size(), 1); + expect_plan_entry(plan[0], {100, 100}, 20, 0, 30); +} + +TEST(ParquetPageCacheRangeTest, SupersetRequestHitsMultipleAdjacentCachedRanges) { + const std::vector cached_ranges = { + {180, 80}, + {100, 80}, + }; + + // Request [100, 260) is larger than either cached entry, but the two cached ranges + // exactly cover it. The copy plan stitches the two exact cache entries together. + auto plan = detail::plan_page_cache_range_read(100, 160, cached_ranges); + + ASSERT_EQ(plan.size(), 2); + expect_plan_entry(plan[0], {100, 80}, 0, 0, 80); + expect_plan_entry(plan[1], {180, 80}, 0, 80, 80); +} + +TEST(ParquetPageCacheRangeTest, SupersetRequestCanUseOverlappingCachedRanges) { + const std::vector cached_ranges = { + {150, 110}, + {100, 100}, + }; + + // Request [100, 260) is covered by overlapping cached ranges. The first copy uses + // [100, 200); the second resumes at cursor 200 and copies the tail from [150, 260). + auto plan = detail::plan_page_cache_range_read(100, 160, cached_ranges); + + ASSERT_EQ(plan.size(), 2); + expect_plan_entry(plan[0], {100, 100}, 0, 0, 100); + expect_plan_entry(plan[1], {150, 110}, 50, 100, 60); +} + +TEST(ParquetPageCacheRangeTest, PartialOverlapWithoutFullCoverageMisses) { + const std::vector cached_ranges = { + {100, 80}, + {200, 60}, + }; + + // Cached ranges cover [100, 180) and [200, 260), but [180, 200) is missing. + // The caller must read the whole request from the file instead of returning + // a partially cached result. + auto plan = detail::plan_page_cache_range_read(100, 160, cached_ranges); + + EXPECT_TRUE(plan.empty()); +} + +TEST(ParquetPageCacheRangeTest, NonCoveringAndInvalidRangesAreIgnored) { + const std::vector cached_ranges = { + {50, 20}, {100, 0}, {100, -1}, {180, 20}, {120, 30}, + }; + + // Only [120, 150) intersects the request, but it does not cover the request start + // [100, 120), so this is still a miss. + auto plan = detail::plan_page_cache_range_read(100, 50, cached_ranges); + + EXPECT_TRUE(plan.empty()); +} + +TEST(ParquetPageCacheRangeTest, InvalidRequestMisses) { + const std::vector cached_ranges = { + {100, 100}, + }; + + EXPECT_TRUE(detail::plan_page_cache_range_read(-1, 10, cached_ranges).empty()); + EXPECT_TRUE(detail::plan_page_cache_range_read(100, 0, cached_ranges).empty()); + EXPECT_TRUE(detail::plan_page_cache_range_read(100, -1, cached_ranges).empty()); +} + +} // namespace +} // namespace doris::format::parquet diff --git a/be/test/format_v2/parquet/parquet_reader_test.cpp b/be/test/format_v2/parquet/parquet_reader_test.cpp index f2ce06821c9376..4ce8a1b7f66648 100644 --- a/be/test/format_v2/parquet/parquet_reader_test.cpp +++ b/be/test/format_v2/parquet/parquet_reader_test.cpp @@ -907,6 +907,72 @@ TEST_F(NewParquetReaderTest, ReadMultipleRowGroups) { EXPECT_EQ(values, std::vector({"one", "two", "three", "four", "five"})); } +TEST_F(NewParquetReaderTest, RewriteSameLocalPathDoesNotReuseUnknownMtimePageCache) { + RuntimeProfile first_profile("new_parquet_reader_first_unknown_mtime"); + { + auto reader = create_reader(0, -1, &first_profile); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + auto request = std::make_shared(); + request->non_predicate_columns = {field_projection(0), field_projection(1)}; + ASSERT_TRUE(reader->open(request).ok()); + + bool eof = false; + while (!eof) { + Block block = build_file_block(schema); + size_t rows = 0; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + } + } + + ASSERT_NE(first_profile.get_counter("PageReadCount"), nullptr); + ASSERT_NE(first_profile.get_counter("PageCacheWriteCount"), nullptr); + EXPECT_EQ(first_profile.get_counter("PageReadCount")->value(), 0); + EXPECT_EQ(first_profile.get_counter("PageCacheWriteCount")->value(), 0); + + // LocalFileReader reports mtime as 0. Rewriting the same path must not reuse page-cache bytes + // from the previous physical file, even when the query option enables parquet file page cache. + write_int_pair_parquet_file(_file_path); + RuntimeProfile second_profile("new_parquet_reader_second_unknown_mtime"); + auto reader = create_reader(0, -1, &second_profile); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + auto request = std::make_shared(); + request->non_predicate_columns = {field_projection(0), field_projection(1)}; + ASSERT_TRUE(reader->open(request).ok()); + + std::vector ids; + std::vector scores; + bool eof = false; + while (!eof) { + Block block = build_file_block(schema); + size_t rows = 0; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + if (rows == 0) { + continue; + } + const auto& id_column = nullable_nested_column(block, 0); + const auto& score_column = nullable_nested_column(block, 1); + for (size_t row = 0; row < rows; ++row) { + ids.push_back(id_column.get_element(row)); + scores.push_back(score_column.get_element(row)); + } + } + + EXPECT_EQ(ids, std::vector({1, 2, 3, 4, 5})); + EXPECT_EQ(scores, std::vector({1, 2, 3, 4, 5})); + ASSERT_NE(second_profile.get_counter("PageReadCount"), nullptr); + ASSERT_NE(second_profile.get_counter("PageCacheWriteCount"), nullptr); + EXPECT_EQ(second_profile.get_counter("PageReadCount")->value(), 0); + EXPECT_EQ(second_profile.get_counter("PageCacheWriteCount")->value(), 0); +} + TEST_F(NewParquetReaderTest, ReadPredicateAndNonPredicateColumnsWithSelection) { RuntimeProfile profile("new_parquet_reader_filter_profile"); auto reader = create_reader(0, -1, &profile); From 11d7989e5c0d186d2fbd8e5ad594ab72f9d42575 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 26 Jun 2026 15:32:30 +0800 Subject: [PATCH 2/2] [test](regression) Update openx JSON malformed expected output ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: The Hive OpenX JSON regression expected ten null rows for the ignore.malformed.json table, while the malformed input file contains eleven physical malformed JSON lines. The JSON v2 reader returns one null row for each malformed line when ignore.malformed.json is enabled, so the regression expectation needs to include the extra null row. ### Release note None ### Check List (For Author) - Test: Manual test - Verified the expected q1 output now contains eleven null rows and ran git diff --check. - Behavior changed: No - Does this need documentation: No --- .../data/external_table_p0/hive/test_hive_openx_json.out | 1 + 1 file changed, 1 insertion(+) diff --git a/regression-test/data/external_table_p0/hive/test_hive_openx_json.out b/regression-test/data/external_table_p0/hive/test_hive_openx_json.out index aa7220b55f16b2..6e4a5c101fa3ed 100644 --- a/regression-test/data/external_table_p0/hive/test_hive_openx_json.out +++ b/regression-test/data/external_table_p0/hive/test_hive_openx_json.out @@ -10,6 +10,7 @@ \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N +\N \N \N \N \N 1 Alice [1, 2, 3] {"math":90, "english":85} {"a":100, "b":"test1", "c":1234567890} 2 Bob [4, 5] {"math":80, "science":95} {"a":200, "b":"test2", "c":9876543210}