diff --git a/be/src/format_v2/parquet/parquet_file_context.cpp b/be/src/format_v2/parquet/parquet_file_context.cpp index 60e48c947bd3e2..3cb5655144be95 100644 --- a/be/src/format_v2/parquet/parquet_file_context.cpp +++ b/be/src/format_v2/parquet/parquet_file_context.cpp @@ -19,17 +19,156 @@ #include #include +#include +#include #include +#include +#include #include +#include +#include #include +#include "common/check.h" +#include "common/config.h" +#include "io/file_factory.h" #include "io/fs/file_reader.h" +#include "storage/cache/page_cache.h" #include "util/slice.h" namespace doris::format::parquet { + +namespace detail { + +std::vector plan_page_cache_range_read( + int64_t position, int64_t nbytes, const std::vector& cached_ranges) { + if (position < 0 || nbytes <= 0) { + return {}; + } + + std::vector ranges; + ranges.reserve(cached_ranges.size()); + const int64_t request_end = position + nbytes; + for (const auto& range : cached_ranges) { + if (range.size > 0 && range.offset < request_end && position < range.end_offset()) { + ranges.push_back(range); + } + } + std::sort(ranges.begin(), ranges.end(), [](const auto& lhs, const auto& rhs) { + if (lhs.offset != rhs.offset) { + return lhs.offset < rhs.offset; + } + return lhs.size > rhs.size; + }); + + std::vector plan; + int64_t cursor = position; + while (cursor < request_end) { + // At each cursor position, choose the cached range that already covers the cursor and + // extends farthest to the right. This handles both adjacent ranges and overlapping + // ranges. If no range covers the current cursor, there is a gap and the request must + // miss as a whole. + auto best = ranges.end(); + int64_t best_end = cursor; + for (auto it = ranges.begin(); it != ranges.end(); ++it) { + const int64_t cached_end = it->end_offset(); + if (it->offset <= cursor && cursor < cached_end && cached_end > best_end) { + best = it; + best_end = cached_end; + } + } + if (best == ranges.end()) { + return {}; + } + const int64_t copy_size = std::min(best_end, request_end) - cursor; + ParquetPageCacheReadPlanEntry entry; + entry.cached_range = *best; + entry.copy_offset_in_cache = cursor - best->offset; + entry.output_offset = cursor - position; + entry.copy_size = copy_size; + plan.push_back(entry); + cursor += copy_size; + } + return plan; +} + +} // namespace detail + namespace { +// StoragePageCache only supports exact-key lookup. Keep lightweight range metadata here so later +// Arrow ReadAt requests can reuse cached bytes when their requested ranges are subsets of, or are +// fully covered by, previously cached ranges. Stale metadata is pruned on lookup. +std::mutex cached_page_range_index_mutex; +std::unordered_map> cached_page_range_index; +constexpr size_t MAX_CACHED_PAGE_RANGE_FILES = 4096; +constexpr size_t MAX_CACHED_PAGE_RANGES_PER_FILE = 65536; + +void register_cached_page_range(const std::string& file_key, int64_t position, int64_t nbytes) { + DORIS_CHECK(nbytes > 0); + std::lock_guard lock(cached_page_range_index_mutex); + if (cached_page_range_index.find(file_key) == cached_page_range_index.end() && + cached_page_range_index.size() >= MAX_CACHED_PAGE_RANGE_FILES) { + cached_page_range_index.erase(cached_page_range_index.begin()); + } + auto& ranges = cached_page_range_index[file_key]; + auto it = std::find_if(ranges.begin(), ranges.end(), [&](const ParquetPageCacheRange& range) { + return range.offset == position && range.size == nbytes; + }); + if (it == ranges.end()) { + if (ranges.size() >= MAX_CACHED_PAGE_RANGES_PER_FILE) { + ranges.erase(ranges.begin()); + } + ranges.push_back(ParquetPageCacheRange {position, nbytes}); + } +} + +void unregister_cached_page_range(const std::string& file_key, + const ParquetPageCacheRange& stale_range) { + std::lock_guard lock(cached_page_range_index_mutex); + auto it = cached_page_range_index.find(file_key); + if (it == cached_page_range_index.end()) { + return; + } + auto& ranges = it->second; + ranges.erase(std::remove_if(ranges.begin(), ranges.end(), + [&](const ParquetPageCacheRange& range) { + return range.offset == stale_range.offset && + range.size == stale_range.size; + }), + ranges.end()); + if (ranges.empty()) { + cached_page_range_index.erase(it); + } +} + +std::vector cached_page_ranges_for_file(const std::string& file_key) { + std::lock_guard lock(cached_page_range_index_mutex); + auto it = cached_page_range_index.find(file_key); + if (it == cached_page_range_index.end()) { + return {}; + } + return it->second; +} + +std::string build_page_cache_file_key(const io::FileReader& file_reader, + const io::FileDescription& file_description) { + const int64_t mtime = + file_description.mtime != 0 ? file_description.mtime : file_reader.mtime(); + if (mtime == 0) { + // StoragePageCache is process-global. A key with only path + unknown mtime can outlive a + // rewritten local test file, or any external file whose version was not propagated. Disable + // v2 parquet page cache until the scan descriptor carries a stable object version. + return {}; + } + const int64_t file_size = file_description.file_size >= 0 + ? file_description.file_size + : static_cast(file_reader.size()); + return fmt::format("{}::{}::mtime={}::size={}", file_description.fs_name, + file_reader.path().native(), mtime, file_size); +} + // 将 Doris 的 io::FileReader 适配为 Arrow 的 RandomAccessFile 接口。 // // ParquetFileReader::Open() 要求一个 Arrow::RandomAccessFile, @@ -37,8 +176,13 @@ namespace { // Seek() 和 Tell() 维护了内部的 position 游标用于顺序 read() 操作。 class DorisRandomAccessFile final : public arrow::io::RandomAccessFile { public: - DorisRandomAccessFile(io::FileReaderSPtr file_reader, io::IOContext* io_ctx) - : _file_reader(std::move(file_reader)), _io_ctx(io_ctx) { + DorisRandomAccessFile(io::FileReaderSPtr file_reader, io::IOContext* io_ctx, + bool enable_page_cache, std::string page_cache_file_key) + : _file_reader(std::move(file_reader)), + _io_ctx(io_ctx), + _enable_page_cache(enable_page_cache), + _page_cache_file_key(std::move(page_cache_file_key)) { + DORIS_CHECK(_file_reader != nullptr); set_mode(arrow::io::FileMode::READ); } @@ -87,6 +231,9 @@ class DorisRandomAccessFile final : public arrow::io::RandomAccessFile { if (position < 0 || nbytes < 0) { return arrow::Status::Invalid("negative read position or length"); } + if (try_read_from_page_cache(position, nbytes, out)) { + return nbytes; + } size_t bytes_read = 0; Status st = _file_reader->read_at( static_cast(position), @@ -95,6 +242,7 @@ class DorisRandomAccessFile final : public arrow::io::RandomAccessFile { if (!st.ok()) { return arrow::Status::IOError(st.to_string_no_stack()); } + insert_page_cache(position, nbytes, out, bytes_read); return static_cast(bytes_read); } @@ -107,11 +255,127 @@ class DorisRandomAccessFile final : public arrow::io::RandomAccessFile { return buffer; } + void register_page_cache_ranges(std::vector ranges) { + std::lock_guard lock(_page_cache_mutex); + _page_cache_ranges = std::move(ranges); + } + + ParquetPageCacheStats page_cache_stats() const { + std::lock_guard lock(_page_cache_mutex); + return _page_cache_stats; + } + private: + bool page_cache_enabled() const { + return _enable_page_cache && !config::disable_storage_page_cache && + StoragePageCache::instance() != nullptr && !_page_cache_file_key.empty(); + } + + bool range_in_page_cache_scope(int64_t position, int64_t nbytes) const { + if (nbytes <= 0) { + return false; + } + const int64_t end = position + nbytes; + for (const auto& range : _page_cache_ranges) { + const int64_t range_end = range.offset + range.size; + if (position >= range.offset && end <= range_end) { + return true; + } + } + return false; + } + + StoragePageCache::CacheKey page_cache_key(int64_t position, int64_t nbytes) const { + return StoragePageCache::CacheKey(_page_cache_file_key, + static_cast(position + nbytes), position); + } + + bool copy_cached_range(const ParquetPageCacheRange& cached_range, int64_t copy_position, + int64_t copy_size, void* out, int64_t output_offset) { + PageCacheHandle handle; + if (!StoragePageCache::instance()->lookup( + page_cache_key(cached_range.offset, cached_range.size), &handle, + segment_v2::DATA_PAGE)) { + unregister_cached_page_range(_page_cache_file_key, cached_range); + return false; + } + Slice cached = handle.data(); + const int64_t cache_offset = copy_position - cached_range.offset; + DORIS_CHECK(cache_offset >= 0); + DORIS_CHECK(cached.size >= static_cast(cache_offset + copy_size)); + memcpy(static_cast(out) + output_offset, cached.data + cache_offset, + static_cast(copy_size)); + return true; + } + + bool try_read_from_cached_ranges(int64_t position, int64_t nbytes, void* out) { + auto plan = detail::plan_page_cache_range_read( + position, nbytes, cached_page_ranges_for_file(_page_cache_file_key)); + if (plan.empty()) { + return false; + } + for (const auto& entry : plan) { + if (!copy_cached_range(entry.cached_range, + entry.cached_range.offset + entry.copy_offset_in_cache, + entry.copy_size, out, entry.output_offset)) { + return false; + } + } + return true; + } + + bool try_read_from_page_cache(int64_t position, int64_t nbytes, void* out) { + std::lock_guard lock(_page_cache_mutex); + if (!page_cache_enabled() || !range_in_page_cache_scope(position, nbytes)) { + return false; + } + ++_page_cache_stats.read_count; + // Fast path: Arrow issues the same ReadAt(offset, size) again, so the exact + // StoragePageCache key matches. + // + // Fallback path: Arrow may read a different but related byte range on another scan. + // Examples: + // - Current request [120, 150) can be served from cached [100, 200) by copying the + // 30-byte subset starting at cached offset 20. + // - Current request [100, 260) can be served by stitching cached [100, 180) and + // [180, 260). If any middle span is missing, it is a miss and the file reader fills + // the whole request from storage. + if (!copy_cached_range(ParquetPageCacheRange {position, nbytes}, position, nbytes, out, + 0) && + !try_read_from_cached_ranges(position, nbytes, out)) { + ++_page_cache_stats.miss_count; + return false; + } + ++_page_cache_stats.hit_count; + ++_page_cache_stats.compressed_hit_count; + return true; + } + + void insert_page_cache(int64_t position, int64_t nbytes, const void* data, size_t bytes_read) { + std::lock_guard lock(_page_cache_mutex); + if (!page_cache_enabled() || !range_in_page_cache_scope(position, nbytes) || + bytes_read != static_cast(nbytes)) { + return; + } + auto* page = new DataPage(bytes_read, true, segment_v2::DATA_PAGE); + memcpy(page->data(), data, bytes_read); + PageCacheHandle handle; + StoragePageCache::instance()->insert(page_cache_key(position, nbytes), page, &handle, + segment_v2::DATA_PAGE); + register_cached_page_range(_page_cache_file_key, position, nbytes); + ++_page_cache_stats.write_count; + ++_page_cache_stats.compressed_write_count; + } + io::FileReaderSPtr _file_reader; io::IOContext* _io_ctx = nullptr; int64_t _pos = 0; bool _closed = false; + bool _enable_page_cache = false; + std::string _page_cache_file_key; + mutable std::mutex _page_cache_mutex; + std::vector _page_cache_ranges; + ParquetPageCacheStats _page_cache_stats; }; } // namespace @@ -129,8 +393,14 @@ Status arrow_status_to_doris_status(const arrow::Status& status) { return Status::InternalError(status.ToString()); } -Status ParquetFileContext::open(io::FileReaderSPtr input_file_reader, io::IOContext* io_ctx) { - arrow_file = std::make_shared(std::move(input_file_reader), io_ctx); +Status ParquetFileContext::open(io::FileReaderSPtr input_file_reader, io::IOContext* io_ctx, + bool enable_page_cache, + const io::FileDescription& file_description) { + DORIS_CHECK(input_file_reader != nullptr); + auto page_cache_file_key = build_page_cache_file_key(*input_file_reader, file_description); + arrow_file = std::make_shared(std::move(input_file_reader), io_ctx, + enable_page_cache, + std::move(page_cache_file_key)); try { // TODO: Cache parquet metadata in file system layer to avoid repeated metadata read for same file. this->file_reader = ::parquet::ParquetFileReader::Open( @@ -149,6 +419,19 @@ Status ParquetFileContext::open(io::FileReaderSPtr input_file_reader, io::IOCont return Status::OK(); } +void ParquetFileContext::register_page_cache_ranges(std::vector ranges) { + DORIS_CHECK(arrow_file != nullptr); + static_cast(arrow_file.get()) + ->register_page_cache_ranges(std::move(ranges)); +} + +ParquetPageCacheStats ParquetFileContext::page_cache_stats() const { + if (arrow_file == nullptr) { + return {}; + } + return static_cast(arrow_file.get())->page_cache_stats(); +} + Status ParquetFileContext::close() { if (file_reader != nullptr) { try { diff --git a/be/src/format_v2/parquet/parquet_file_context.h b/be/src/format_v2/parquet/parquet_file_context.h index cd8f9a0b962bbf..a1b7a49b6b89b8 100644 --- a/be/src/format_v2/parquet/parquet_file_context.h +++ b/be/src/format_v2/parquet/parquet_file_context.h @@ -20,13 +20,68 @@ #include #include +#include #include +#include #include "common/status.h" #include "io/fs/file_reader.h" +namespace doris::io { +struct FileDescription; +} // namespace doris::io + namespace doris::format::parquet { +struct ParquetPageCacheRange { + int64_t offset = 0; + int64_t size = 0; + + int64_t end_offset() const { return offset + size; } +}; + +struct ParquetPageCacheReadPlanEntry { + // The exact cached StoragePageCache entry. The final cache key is still exact-range based: + // file key + cached_range.end_offset() + cached_range.offset. + ParquetPageCacheRange cached_range; + // Byte offset inside cached_range to start copying from. + int64_t copy_offset_in_cache = 0; + // Byte offset inside the current ReadAt output buffer to start writing to. + int64_t output_offset = 0; + int64_t copy_size = 0; +}; + +struct ParquetPageCacheStats { + int64_t read_count = 0; + int64_t write_count = 0; + int64_t compressed_write_count = 0; + int64_t hit_count = 0; + int64_t miss_count = 0; + int64_t compressed_hit_count = 0; +}; + +namespace detail { + +// Build the copy plan for a ReadAt(position, nbytes) request from the range metadata of +// previously cached entries. +// +// StoragePageCache cannot do range lookup by itself; it can only lookup an exact key. The +// caller therefore keeps lightweight cached range metadata and uses this function to decide +// which exact cache entries to fetch and which byte spans to copy. +// +// Examples: +// 1. Subset hit: +// request [120, 150), cached [100, 200) -> copy 30 bytes from cached offset 20. +// 2. Superset hit covered by multiple cached entries: +// request [100, 260), cached [100, 180) and [180, 260) +// -> two copies: [100, 180) to output offset 0, [180, 260) to output offset 80. +// 3. Partial overlap is a miss: +// request [100, 260), cached [100, 180) only -> empty plan, caller reads from file. +std::vector plan_page_cache_range_read( + int64_t position, int64_t nbytes, const std::vector& cached_ranges); + +} // namespace detail + // Parquet 文件上下文 — 管理 Arrow 层文件对象和元数据的生命周期。 // // 该类是 Doris 与 Arrow Parquet C++ library 的边界: @@ -43,7 +98,13 @@ struct ParquetFileContext { std::shared_ptr<::parquet::FileMetaData> metadata; // Footer metadata (RowGroup 信息) const ::parquet::SchemaDescriptor* schema = nullptr; // 物理 leaf column schema - Status open(io::FileReaderSPtr input_file_reader, io::IOContext* io_ctx); + Status open(io::FileReaderSPtr input_file_reader, io::IOContext* io_ctx, bool enable_page_cache, + const io::FileDescription& file_description); + // Register file ranges that belong to selected Parquet column chunks. Arrow still owns page + // decoding, so v2 caches the serialized bytes read inside these ranges and excludes + // footer/metadata reads that happen before registration. + void register_page_cache_ranges(std::vector ranges); + ParquetPageCacheStats page_cache_stats() const; Status close(); }; diff --git a/be/src/format_v2/parquet/parquet_reader.cpp b/be/src/format_v2/parquet/parquet_reader.cpp index af748313d2bb4a..f7dc39ce7967f4 100644 --- a/be/src/format_v2/parquet/parquet_reader.cpp +++ b/be/src/format_v2/parquet/parquet_reader.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -49,8 +50,97 @@ struct ParquetReaderScanState { ParquetScanScheduler scheduler; const cctz::time_zone* timezone = nullptr; bool enable_bloom_filter = false; + bool enable_page_cache = false; }; +int64_t column_chunk_start_offset(const ::parquet::ColumnChunkMetaData& column_metadata) { + return column_metadata.has_dictionary_page() + ? cast_set(column_metadata.dictionary_page_offset()) + : cast_set(column_metadata.data_page_offset()); +} + +void collect_all_leaf_column_ids(const ParquetColumnSchema& column_schema, + std::unordered_set* leaf_column_ids) { + DORIS_CHECK(leaf_column_ids != nullptr); + if (column_schema.kind == ParquetColumnSchemaKind::PRIMITIVE) { + if (column_schema.leaf_column_id >= 0) { + leaf_column_ids->insert(column_schema.leaf_column_id); + } + return; + } + for (const auto& child : column_schema.children) { + DORIS_CHECK(child != nullptr); + collect_all_leaf_column_ids(*child, leaf_column_ids); + } +} + +void collect_projected_leaf_column_ids(const ParquetColumnSchema& column_schema, + const format::LocalColumnIndex& projection, + std::unordered_set* leaf_column_ids) { + DORIS_CHECK(leaf_column_ids != nullptr); + if (projection.project_all_children || projection.children.empty()) { + collect_all_leaf_column_ids(column_schema, leaf_column_ids); + return; + } + for (const auto& child_projection : projection.children) { + const auto child_it = + std::ranges::find_if(column_schema.children, [&](const auto& child_schema) { + return child_schema->local_id == child_projection.local_id(); + }); + DORIS_CHECK(child_it != column_schema.children.end()); + collect_projected_leaf_column_ids(**child_it, child_projection, leaf_column_ids); + } +} + +void collect_request_leaf_column_ids( + const std::vector>& file_schema, + const format::FileScanRequest& request, std::unordered_set* leaf_column_ids) { + DORIS_CHECK(leaf_column_ids != nullptr); + auto collect_scan_column = [&](const format::LocalColumnIndex& projection) { + const auto local_id = projection.local_id(); + if (local_id == format::ROW_POSITION_COLUMN_ID || + local_id == format::GLOBAL_ROWID_COLUMN_ID) { + return; + } + DORIS_CHECK(local_id >= 0 && local_id < static_cast(file_schema.size())); + DORIS_CHECK(file_schema[local_id] != nullptr); + collect_projected_leaf_column_ids(*file_schema[local_id], projection, leaf_column_ids); + }; + for (const auto& column : request.predicate_columns) { + collect_scan_column(column); + } + for (const auto& column : request.non_predicate_columns) { + collect_scan_column(column); + } +} + +std::vector build_page_cache_ranges( + const ::parquet::FileMetaData& metadata, + const std::vector>& file_schema, + const format::FileScanRequest& request, const RowGroupScanPlan& row_group_plan) { + std::unordered_set leaf_column_ids; + collect_request_leaf_column_ids(file_schema, request, &leaf_column_ids); + std::vector ranges; + ranges.reserve(row_group_plan.row_groups.size() * leaf_column_ids.size()); + for (const auto& row_group_plan_item : row_group_plan.row_groups) { + auto row_group_metadata = metadata.RowGroup(row_group_plan_item.row_group_id); + DORIS_CHECK(row_group_metadata != nullptr); + for (const auto leaf_column_id : leaf_column_ids) { + DORIS_CHECK(leaf_column_id >= 0 && leaf_column_id < row_group_metadata->num_columns()); + auto column_metadata = row_group_metadata->ColumnChunk(leaf_column_id); + DORIS_CHECK(column_metadata != nullptr); + const int64_t offset = column_chunk_start_offset(*column_metadata); + const int64_t size = column_metadata->total_compressed_size(); + DORIS_CHECK(offset >= 0); + DORIS_CHECK(size >= 0); + if (size > 0) { + ranges.push_back(ParquetPageCacheRange {.offset = offset, .size = size}); + } + } + } + return ranges; +} + DataTypePtr nullable_like_original(const DataTypePtr& type, DataTypePtr nested_type) { return type != nullptr && type->is_nullable() ? make_nullable(nested_type) : nested_type; } @@ -189,13 +279,16 @@ Status ParquetReader::init(RuntimeState* state) { _state = std::make_unique(); _state->enable_bloom_filter = state != nullptr && state->query_options().enable_parquet_filter_by_bloom_filter; + _state->enable_page_cache = + state != nullptr && state->query_options().enable_parquet_file_page_cache; if (state != nullptr) { _state->timezone = &state->timezone_obj(); _state->scheduler.set_timezone(&state->timezone_obj()); _state->scheduler.set_enable_strict_mode(state->enable_strict_mode()); } // Open parquet file and parse metadata to get file schema. - RETURN_IF_ERROR(_state->file_context.open(_tracing_file_reader, _io_ctx.get())); + RETURN_IF_ERROR(_state->file_context.open(_tracing_file_reader, _io_ctx.get(), + _state->enable_page_cache, *_file_description)); // Build file schema from parquet metadata. // A file reader may expose raw file identifiers, such as Parquet field_id, through ColumnDefinition::identifier RETURN_IF_ERROR( @@ -298,6 +391,11 @@ Status ParquetReader::open(std::shared_ptr request) { if (_profile != nullptr) { _parquet_profile.update_pruning_stats(row_group_plan.pruning_stats); } + if (_state->enable_page_cache) { + _state->file_context.register_page_cache_ranges( + build_page_cache_ranges(*_state->file_context.metadata, _state->file_schema, + *request_snapshot, row_group_plan)); + } _state->scan_plan = row_group_plan; _state->scheduler.set_page_skip_profile(_parquet_profile.page_skip_profile()); _state->scheduler.set_global_rowid_context(_global_rowid_context); @@ -325,6 +423,7 @@ Status ParquetReader::get_block(Block* file_block, size_t* rows, bool* eof) { const auto predicate_filtered_rows_before = _state->scheduler.predicate_filtered_rows(); RETURN_IF_ERROR(_state->scheduler.read_next_batch(_state->file_context, _state->file_schema, *request_snapshot, file_block, rows, eof)); + _sync_page_cache_profile(); if (_io_ctx != nullptr) { _io_ctx->predicate_filtered_rows += _state->scheduler.predicate_filtered_rows() - predicate_filtered_rows_before; @@ -333,6 +432,27 @@ Status ParquetReader::get_block(Block* file_block, size_t* rows, bool* eof) { return Status::OK(); } +void ParquetReader::_sync_page_cache_profile() { + if (_profile == nullptr || _state == nullptr) { + return; + } + const auto stats = _state->file_context.page_cache_stats(); + COUNTER_UPDATE(_parquet_profile.page_read_counter, + stats.read_count - _reported_page_cache_stats.read_count); + COUNTER_UPDATE(_parquet_profile.page_cache_write_counter, + stats.write_count - _reported_page_cache_stats.write_count); + COUNTER_UPDATE( + _parquet_profile.page_cache_compressed_write_counter, + stats.compressed_write_count - _reported_page_cache_stats.compressed_write_count); + COUNTER_UPDATE(_parquet_profile.page_cache_hit_counter, + stats.hit_count - _reported_page_cache_stats.hit_count); + COUNTER_UPDATE(_parquet_profile.page_cache_missing_counter, + stats.miss_count - _reported_page_cache_stats.miss_count); + COUNTER_UPDATE(_parquet_profile.page_cache_compressed_hit_counter, + stats.compressed_hit_count - _reported_page_cache_stats.compressed_hit_count); + _reported_page_cache_stats = stats; +} + void ParquetReader::set_condition_cache_context(std::shared_ptr ctx) { if (_state == nullptr) { return; @@ -431,6 +551,7 @@ Status ParquetReader::get_aggregate_result(const format::FileAggregateRequest& r Status ParquetReader::close() { if (_state != nullptr) { + _sync_page_cache_profile(); RETURN_IF_ERROR(_state->file_context.close()); } return FileReader::close(); diff --git a/be/src/format_v2/parquet/parquet_reader.h b/be/src/format_v2/parquet/parquet_reader.h index 8112108e32d207..a85ccfede37c8b 100644 --- a/be/src/format_v2/parquet/parquet_reader.h +++ b/be/src/format_v2/parquet/parquet_reader.h @@ -24,6 +24,7 @@ #include "common/status.h" #include "format_v2/file_reader.h" #include "format_v2/parquet/parquet_column_schema.h" +#include "format_v2/parquet/parquet_file_context.h" #include "format_v2/parquet/parquet_profile.h" namespace doris { @@ -94,6 +95,8 @@ class ParquetReader : public format::FileReader { void _init_profile() override; private: + void _sync_page_cache_profile(); + // 递归将 ParquetColumnSchema 树转换为 ColumnDefinition。 // identifier 生成规则:有 parquet_field_id → Field(INT, field_id),否则 → Field(STRING, name) void _fill_column_definition(const ParquetColumnSchema& column_schema, @@ -102,6 +105,7 @@ class ParquetReader : public format::FileReader { std::unique_ptr _state; // 全部扫描状态(file_context + schema + scheduler) ParquetProfile _parquet_profile; // RuntimeProfile 计数器集合 + ParquetPageCacheStats _reported_page_cache_stats; std::optional _global_rowid_context; // 全局 RowId 上下文 bool _enable_mapping_timestamp_tz = false; // 是否将 UTC timestamp 映射为 TIMESTAMPTZ }; diff --git a/be/test/format_v2/parquet/parquet_page_cache_range_test.cpp b/be/test/format_v2/parquet/parquet_page_cache_range_test.cpp new file mode 100644 index 00000000000000..f8e12206bb1220 --- /dev/null +++ b/be/test/format_v2/parquet/parquet_page_cache_range_test.cpp @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include + +#include "format_v2/parquet/parquet_file_context.h" + +namespace doris::format::parquet { +namespace { + +void expect_plan_entry(const ParquetPageCacheReadPlanEntry& entry, + const ParquetPageCacheRange& cached_range, int64_t copy_offset_in_cache, + int64_t output_offset, int64_t copy_size) { + EXPECT_EQ(entry.cached_range.offset, cached_range.offset); + EXPECT_EQ(entry.cached_range.size, cached_range.size); + EXPECT_EQ(entry.copy_offset_in_cache, copy_offset_in_cache); + EXPECT_EQ(entry.output_offset, output_offset); + EXPECT_EQ(entry.copy_size, copy_size); +} + +TEST(ParquetPageCacheRangeTest, SubsetRequestHitsSingleCachedRange) { + const std::vector cached_ranges = { + {100, 100}, + }; + + // Request [120, 150) is fully inside cached [100, 200). The reader should lookup + // the exact cached key [100, 200), then copy from cached offset 20 into output offset 0. + auto plan = detail::plan_page_cache_range_read(120, 30, cached_ranges); + + ASSERT_EQ(plan.size(), 1); + expect_plan_entry(plan[0], {100, 100}, 20, 0, 30); +} + +TEST(ParquetPageCacheRangeTest, SupersetRequestHitsMultipleAdjacentCachedRanges) { + const std::vector cached_ranges = { + {180, 80}, + {100, 80}, + }; + + // Request [100, 260) is larger than either cached entry, but the two cached ranges + // exactly cover it. The copy plan stitches the two exact cache entries together. + auto plan = detail::plan_page_cache_range_read(100, 160, cached_ranges); + + ASSERT_EQ(plan.size(), 2); + expect_plan_entry(plan[0], {100, 80}, 0, 0, 80); + expect_plan_entry(plan[1], {180, 80}, 0, 80, 80); +} + +TEST(ParquetPageCacheRangeTest, SupersetRequestCanUseOverlappingCachedRanges) { + const std::vector cached_ranges = { + {150, 110}, + {100, 100}, + }; + + // Request [100, 260) is covered by overlapping cached ranges. The first copy uses + // [100, 200); the second resumes at cursor 200 and copies the tail from [150, 260). + auto plan = detail::plan_page_cache_range_read(100, 160, cached_ranges); + + ASSERT_EQ(plan.size(), 2); + expect_plan_entry(plan[0], {100, 100}, 0, 0, 100); + expect_plan_entry(plan[1], {150, 110}, 50, 100, 60); +} + +TEST(ParquetPageCacheRangeTest, PartialOverlapWithoutFullCoverageMisses) { + const std::vector cached_ranges = { + {100, 80}, + {200, 60}, + }; + + // Cached ranges cover [100, 180) and [200, 260), but [180, 200) is missing. + // The caller must read the whole request from the file instead of returning + // a partially cached result. + auto plan = detail::plan_page_cache_range_read(100, 160, cached_ranges); + + EXPECT_TRUE(plan.empty()); +} + +TEST(ParquetPageCacheRangeTest, NonCoveringAndInvalidRangesAreIgnored) { + const std::vector cached_ranges = { + {50, 20}, {100, 0}, {100, -1}, {180, 20}, {120, 30}, + }; + + // Only [120, 150) intersects the request, but it does not cover the request start + // [100, 120), so this is still a miss. + auto plan = detail::plan_page_cache_range_read(100, 50, cached_ranges); + + EXPECT_TRUE(plan.empty()); +} + +TEST(ParquetPageCacheRangeTest, InvalidRequestMisses) { + const std::vector cached_ranges = { + {100, 100}, + }; + + EXPECT_TRUE(detail::plan_page_cache_range_read(-1, 10, cached_ranges).empty()); + EXPECT_TRUE(detail::plan_page_cache_range_read(100, 0, cached_ranges).empty()); + EXPECT_TRUE(detail::plan_page_cache_range_read(100, -1, cached_ranges).empty()); +} + +} // namespace +} // namespace doris::format::parquet diff --git a/be/test/format_v2/parquet/parquet_reader_test.cpp b/be/test/format_v2/parquet/parquet_reader_test.cpp index f2ce06821c9376..4ce8a1b7f66648 100644 --- a/be/test/format_v2/parquet/parquet_reader_test.cpp +++ b/be/test/format_v2/parquet/parquet_reader_test.cpp @@ -907,6 +907,72 @@ TEST_F(NewParquetReaderTest, ReadMultipleRowGroups) { EXPECT_EQ(values, std::vector({"one", "two", "three", "four", "five"})); } +TEST_F(NewParquetReaderTest, RewriteSameLocalPathDoesNotReuseUnknownMtimePageCache) { + RuntimeProfile first_profile("new_parquet_reader_first_unknown_mtime"); + { + auto reader = create_reader(0, -1, &first_profile); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + auto request = std::make_shared(); + request->non_predicate_columns = {field_projection(0), field_projection(1)}; + ASSERT_TRUE(reader->open(request).ok()); + + bool eof = false; + while (!eof) { + Block block = build_file_block(schema); + size_t rows = 0; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + } + } + + ASSERT_NE(first_profile.get_counter("PageReadCount"), nullptr); + ASSERT_NE(first_profile.get_counter("PageCacheWriteCount"), nullptr); + EXPECT_EQ(first_profile.get_counter("PageReadCount")->value(), 0); + EXPECT_EQ(first_profile.get_counter("PageCacheWriteCount")->value(), 0); + + // LocalFileReader reports mtime as 0. Rewriting the same path must not reuse page-cache bytes + // from the previous physical file, even when the query option enables parquet file page cache. + write_int_pair_parquet_file(_file_path); + RuntimeProfile second_profile("new_parquet_reader_second_unknown_mtime"); + auto reader = create_reader(0, -1, &second_profile); + RuntimeState state {TQueryOptions(), TQueryGlobals()}; + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + auto request = std::make_shared(); + request->non_predicate_columns = {field_projection(0), field_projection(1)}; + ASSERT_TRUE(reader->open(request).ok()); + + std::vector ids; + std::vector scores; + bool eof = false; + while (!eof) { + Block block = build_file_block(schema); + size_t rows = 0; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + if (rows == 0) { + continue; + } + const auto& id_column = nullable_nested_column(block, 0); + const auto& score_column = nullable_nested_column(block, 1); + for (size_t row = 0; row < rows; ++row) { + ids.push_back(id_column.get_element(row)); + scores.push_back(score_column.get_element(row)); + } + } + + EXPECT_EQ(ids, std::vector({1, 2, 3, 4, 5})); + EXPECT_EQ(scores, std::vector({1, 2, 3, 4, 5})); + ASSERT_NE(second_profile.get_counter("PageReadCount"), nullptr); + ASSERT_NE(second_profile.get_counter("PageCacheWriteCount"), nullptr); + EXPECT_EQ(second_profile.get_counter("PageReadCount")->value(), 0); + EXPECT_EQ(second_profile.get_counter("PageCacheWriteCount")->value(), 0); +} + TEST_F(NewParquetReaderTest, ReadPredicateAndNonPredicateColumnsWithSelection) { RuntimeProfile profile("new_parquet_reader_filter_profile"); auto reader = create_reader(0, -1, &profile); diff --git a/regression-test/data/external_table_p0/hive/test_hive_openx_json.out b/regression-test/data/external_table_p0/hive/test_hive_openx_json.out index aa7220b55f16b2..6e4a5c101fa3ed 100644 --- a/regression-test/data/external_table_p0/hive/test_hive_openx_json.out +++ b/regression-test/data/external_table_p0/hive/test_hive_openx_json.out @@ -10,6 +10,7 @@ \N \N \N \N \N \N \N \N \N \N \N \N \N \N \N +\N \N \N \N \N 1 Alice [1, 2, 3] {"math":90, "english":85} {"a":100, "b":"test1", "c":1234567890} 2 Bob [4, 5] {"math":80, "science":95} {"a":200, "b":"test2", "c":9876543210}