From f98651072ffea305d836cd5eca35f3c250a28cde Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Mon, 26 Jan 2026 11:22:49 +0800 Subject: [PATCH] [feature](vparquet-reader) Implements parquet file page cache. (#59307) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Problem Summary: [Feature] Implementation of Parquet File Page Cache and Integration with Unified Page Cache Framework This PR implements a page-level caching mechanism for Parquet files and integrates it with Apache Doris's existing unified page cache framework, significantly improving query performance by caching decompressed (or compressed) data pages in memory. Key Features 1. Unified Page Cache Integration • Leverages Existing Framework: Directly integrates with Doris's StoragePageCache infrastructure used for internal tables • Shared Resource Management: Parquet cache shares memory pool and eviction policies with internal table caches • Consistent Monitoring: Reuses existing cache statistics and RuntimeProfile for unified performance monitoring • Cache Type Identification: Uses segment_v2::DATA_PAGE as cache page type, consistent with internal table data page caching 2. Smart Caching Strategy • Compression Ratio Awareness: Automatically chooses between caching compressed or decompressed data based on parquet_page_cache_decompress_threshold (default: 1.5) • Flexible Storage: Caches decompressed data when uncompressed_size/compressed_size ≤ threshold, otherwise caches compressed data if enable_parquet_cache_compressed_pages=true • Cache Key Design: Uses file_path::mtime::offset as key to ensure cache consistency across file modifications --- be/src/common/config.cpp | 6 + be/src/common/config.h | 6 + be/src/io/cache/cached_remote_file_reader.h | 2 + be/src/io/file_factory.cpp | 12 + be/src/io/file_factory.h | 6 + be/src/io/fs/broker_file_reader.cpp | 6 +- be/src/io/fs/broker_file_reader.h | 5 +- be/src/io/fs/broker_file_system.cpp | 2 +- be/src/io/fs/buffered_reader.h | 12 + be/src/io/fs/file_reader.h | 3 + be/src/io/fs/hdfs_file_reader.cpp | 7 +- be/src/io/fs/hdfs_file_reader.h | 5 +- be/src/io/fs/http_file_reader.cpp | 7 +- be/src/io/fs/http_file_reader.h | 5 +- be/src/io/fs/http_file_system.cpp | 2 +- be/src/io/fs/local_file_reader.h | 2 + be/src/io/fs/packed_file_reader.h | 2 + be/src/io/fs/s3_file_reader.h | 2 + be/src/io/fs/stream_load_pipe.h | 2 + be/src/io/fs/tracing_file_reader.h | 2 + be/src/vec/exec/format/orc/orc_file_reader.h | 2 + .../parquet/vparquet_column_chunk_reader.cpp | 294 ++++++- .../parquet/vparquet_column_chunk_reader.h | 45 +- .../format/parquet/vparquet_column_reader.cpp | 31 +- .../format/parquet/vparquet_column_reader.h | 45 +- .../format/parquet/vparquet_group_reader.cpp | 2 +- .../format/parquet/vparquet_page_reader.cpp | 76 ++ .../format/parquet/vparquet_page_reader.h | 96 ++- .../exec/format/parquet/vparquet_reader.cpp | 31 + .../vec/exec/format/parquet/vparquet_reader.h | 8 + be/test/io/fs/buffered_reader_test.cpp | 6 + .../io/fs/packed_file_concurrency_test.cpp | 2 + be/test/io/fs/packed_file_reader_test.cpp | 2 + be/test/io/fs/packed_file_system_test.cpp | 2 + .../file_reader/file_meta_cache_test.cpp | 2 + .../parquet/parquet_page_cache_test.cpp | 804 ++++++++++++++++++ .../format/parquet/parquet_thrift_test.cpp | 3 +- be/test/vec/exec/orc/orc_file_reader_test.cpp | 2 + .../org/apache/doris/qe/SessionVariable.java | 12 + gensrc/thrift/PaloInternalService.thrift | 2 + 40 files changed, 1469 insertions(+), 94 deletions(-) create mode 100644 be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index fc299779c8dc7b..f316f3bf32e2bd 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -404,6 +404,12 @@ DEFINE_Int32(index_page_cache_percentage, "10"); DEFINE_mBool(disable_storage_page_cache, "false"); // whether to disable row cache feature in storage DEFINE_mBool(disable_storage_row_cache, "true"); +// Parquet page cache: threshold ratio for caching decompressed vs compressed pages +// If uncompressed_size / compressed_size <= threshold, cache decompressed; +// otherwise cache compressed if enable_parquet_cache_compressed_pages = true +DEFINE_Double(parquet_page_cache_decompress_threshold, "1.5"); +// Parquet page cache: whether to enable caching compressed pages (when ratio exceeds threshold) +DEFINE_Bool(enable_parquet_cache_compressed_pages, "false"); // whether to disable pk page cache feature in storage DEFINE_Bool(disable_pk_storage_page_cache, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 73922709283bc2..e679a6f0e39809 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -472,6 +472,12 @@ DECLARE_Int32(index_page_cache_percentage); DECLARE_Bool(disable_storage_page_cache); // whether to disable row cache feature in storage DECLARE_mBool(disable_storage_row_cache); +// Parquet page cache: threshold ratio for caching decompressed vs compressed pages +// If uncompressed_size / compressed_size <= threshold, cache decompressed; +// otherwise cache compressed if enable_parquet_cache_compressed_pages = true +DECLARE_Double(parquet_page_cache_decompress_threshold); +// Parquet page cache: whether to enable caching compressed pages (when ratio exceeds threshold) +DECLARE_Bool(enable_parquet_cache_compressed_pages); // whether to disable pk page cache feature in storage DECLARE_Bool(disable_pk_storage_page_cache); diff --git a/be/src/io/cache/cached_remote_file_reader.h b/be/src/io/cache/cached_remote_file_reader.h index 939471b62ea41d..20c1a47ce881c3 100644 --- a/be/src/io/cache/cached_remote_file_reader.h +++ b/be/src/io/cache/cached_remote_file_reader.h @@ -55,6 +55,8 @@ class CachedRemoteFileReader final : public FileReader { static std::pair s_align_size(size_t offset, size_t size, size_t length); + int64_t mtime() const override { return _remote_file_reader->mtime(); } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index bd08bc20461016..7b1821c0fa80f5 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -203,6 +203,18 @@ Result FileFactory::create_file_reader( const io::FileSystemProperties& system_properties, const io::FileDescription& file_description, const io::FileReaderOptions& reader_options, RuntimeProfile* profile) { + auto reader_res = _create_file_reader_internal(system_properties, file_description, + reader_options, profile); + if (!reader_res.has_value()) { + return unexpected(std::move(reader_res).error()); + } + return std::move(reader_res).value(); +} + +Result FileFactory::_create_file_reader_internal( + const io::FileSystemProperties& system_properties, + const io::FileDescription& file_description, const io::FileReaderOptions& reader_options, + RuntimeProfile* profile) { TFileType::type type = system_properties.system_type; switch (type) { case TFileType::FILE_LOCAL: { diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index 0ba791bd0a3dc9..61e322ca0af02c 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -126,6 +126,12 @@ class FileFactory { private: static std::string _get_fs_name(const io::FileDescription& file_description); + + /// Create FileReader without FS + static Result _create_file_reader_internal( + const io::FileSystemProperties& system_properties, + const io::FileDescription& file_description, + const io::FileReaderOptions& reader_options, RuntimeProfile* profile = nullptr); }; } // namespace doris diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp index 102ea3e247778a..41b2992f70008a 100644 --- a/be/src/io/fs/broker_file_reader.cpp +++ b/be/src/io/fs/broker_file_reader.cpp @@ -39,12 +39,14 @@ struct IOContext; BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, TBrokerFD fd, - std::shared_ptr connection) + std::shared_ptr connection, + int64_t mtime) : _path(std::move(path)), _file_size(file_size), _broker_addr(broker_addr), _fd(fd), - _connection(std::move(connection)) { + _connection(std::move(connection)), + _mtime(mtime) { DorisMetrics::instance()->broker_file_open_reading->increment(1); DorisMetrics::instance()->broker_file_reader_total->increment(1); } diff --git a/be/src/io/fs/broker_file_reader.h b/be/src/io/fs/broker_file_reader.h index 7d19edb32c0dea..2f6bd94b652bcb 100644 --- a/be/src/io/fs/broker_file_reader.h +++ b/be/src/io/fs/broker_file_reader.h @@ -38,7 +38,7 @@ struct IOContext; class BrokerFileReader final : public FileReader { public: BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, TBrokerFD fd, - std::shared_ptr connection); + std::shared_ptr connection, int64_t mtime = 0); ~BrokerFileReader() override; @@ -50,6 +50,8 @@ class BrokerFileReader final : public FileReader { bool closed() const override { return _closed.load(std::memory_order_acquire); } + int64_t mtime() const override { return _mtime; } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; @@ -62,6 +64,7 @@ class BrokerFileReader final : public FileReader { TBrokerFD _fd; std::shared_ptr _connection; + int64_t _mtime; std::atomic _closed = false; }; } // namespace doris::io diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp index 8b0d5db23e2116..b0dc89dc277ad1 100644 --- a/be/src/io/fs/broker_file_system.cpp +++ b/be/src/io/fs/broker_file_system.cpp @@ -139,7 +139,7 @@ Status BrokerFileSystem::open_file_internal(const Path& file, FileReaderSPtr* re error_msg(response->opStatus.message)); } *reader = std::make_shared(_broker_addr, file, fsize, response->fd, - _connection); + _connection, opts.mtime); return Status::OK(); } diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 5fe071762351d8..6ddcca02067ddb 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -160,6 +160,8 @@ class RangeCacheFileReader : public io::FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return _inner_reader->mtime(); } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; @@ -329,6 +331,8 @@ class MergeRangeFileReader : public io::FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return _reader->mtime(); } + // for test only size_t buffer_remaining() const { return _remaining; } @@ -532,6 +536,8 @@ class PrefetchBufferedReader final : public io::FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return _reader->mtime(); } + void set_random_access_ranges(const std::vector* random_access_ranges) { _random_access_ranges = random_access_ranges; for (auto& _pre_buffer : _pre_buffers) { @@ -592,6 +598,8 @@ class InMemoryFileReader final : public io::FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return _reader->mtime(); } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; @@ -626,6 +634,8 @@ class BufferedStreamReader { virtual ~BufferedStreamReader() = default; // return the file path virtual std::string path() = 0; + + virtual int64_t mtime() const = 0; }; class BufferedFileStreamReader : public BufferedStreamReader, public ProfileCollector { @@ -639,6 +649,8 @@ class BufferedFileStreamReader : public BufferedStreamReader, public ProfileColl Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) override; std::string path() override { return _file->path(); } + int64_t mtime() const override { return _file->mtime(); } + protected: void _collect_profile_before_close() override { if (_file != nullptr) { diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h index e6d8527e831906..3df912cbad4af9 100644 --- a/be/src/io/fs/file_reader.h +++ b/be/src/io/fs/file_reader.h @@ -90,6 +90,9 @@ class FileReader : public doris::ProfileCollector { virtual const std::string& get_data_dir_path() { return VIRTUAL_REMOTE_DATA_DIR; } + // File modification time (seconds since epoch). Default to 0 meaning unknown. + virtual int64_t mtime() const = 0; + protected: virtual Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) = 0; diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index 0e278dff0c8847..b1d65a63ba0529 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -66,16 +66,17 @@ Result HdfsFileReader::create(Path full_path, const hdfsFS& fs, auto path = convert_path(full_path, fs_name); return get_file(fs, path, opts.mtime, opts.file_size).transform([&](auto&& accessor) { return std::make_shared(std::move(path), std::move(fs_name), - std::move(accessor), profile); + std::move(accessor), profile, opts.mtime); }); } HdfsFileReader::HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor, - RuntimeProfile* profile) + RuntimeProfile* profile, int64_t mtime) : _path(std::move(path)), _fs_name(std::move(fs_name)), _accessor(std::move(accessor)), - _profile(profile) { + _profile(profile), + _mtime(mtime) { _handle = _accessor.get(); DorisMetrics::instance()->hdfs_file_open_reading->increment(1); diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h index 8556eea0de6ac5..08f98bca29af0c 100644 --- a/be/src/io/fs/hdfs_file_reader.h +++ b/be/src/io/fs/hdfs_file_reader.h @@ -45,7 +45,7 @@ class HdfsFileReader final : public FileReader { const FileReaderOptions& opts, RuntimeProfile* profile); HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor, - RuntimeProfile* profile); + RuntimeProfile* profile, int64_t mtime = 0); ~HdfsFileReader() override; @@ -57,6 +57,8 @@ class HdfsFileReader final : public FileReader { bool closed() const override { return _closed.load(std::memory_order_acquire); } + int64_t mtime() const override { return _mtime; } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; @@ -86,6 +88,7 @@ class HdfsFileReader final : public FileReader { CachedHdfsFileHandle* _handle = nullptr; // owned by _cached_file_handle std::atomic _closed = false; RuntimeProfile* _profile = nullptr; + int64_t _mtime; #ifdef USE_HADOOP_HDFS HDFSProfile _hdfs_profile; #endif diff --git a/be/src/io/fs/http_file_reader.cpp b/be/src/io/fs/http_file_reader.cpp index fb243179baf557..5ad984039fc475 100644 --- a/be/src/io/fs/http_file_reader.cpp +++ b/be/src/io/fs/http_file_reader.cpp @@ -34,7 +34,7 @@ Result HttpFileReader::create(const std::string& url, ofi.path = Path(url); ofi.extend_info = props; - auto reader = std::make_shared(ofi, url); + auto reader = std::make_shared(ofi, url, opts.mtime); // Open the file to detect Range support and validate configuration RETURN_IF_ERROR_RESULT(reader->open(opts)); @@ -42,11 +42,12 @@ Result HttpFileReader::create(const std::string& url, return reader; } -HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url) +HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url, int64_t mtime) : _extend_kv(fileInfo.extend_info), _path(fileInfo.path), _url(std::move(url)), - _client(std::make_unique()) { + _client(std::make_unique()), + _mtime(mtime) { auto etag_iter = _extend_kv.find("etag"); if (etag_iter != _extend_kv.end()) { _etag = etag_iter->second; diff --git a/be/src/io/fs/http_file_reader.h b/be/src/io/fs/http_file_reader.h index 607eedf3d1a50b..982e65905aa691 100644 --- a/be/src/io/fs/http_file_reader.h +++ b/be/src/io/fs/http_file_reader.h @@ -41,7 +41,7 @@ class HttpFileReader final : public FileReader { const std::map& props, const FileReaderOptions& opts, RuntimeProfile* profile); - explicit HttpFileReader(const OpenFileInfo& fileInfo, std::string url); + explicit HttpFileReader(const OpenFileInfo& fileInfo, std::string url, int64_t mtime); ~HttpFileReader() override; Status open(const FileReaderOptions& opts); @@ -52,6 +52,8 @@ class HttpFileReader final : public FileReader { bool closed() const override { return _closed.load(std::memory_order_acquire); } size_t size() const override { return _file_size; } + int64_t mtime() const override { return _mtime; } + private: // Prepare and initialize the HTTP client for a new request Status prepare_client(bool set_fail_on_error = true); @@ -78,6 +80,7 @@ class HttpFileReader final : public FileReader { int64_t _last_modified = 0; std::atomic _closed = false; std::unique_ptr _client; + int64_t _mtime; // Configuration for non-Range request handling bool _enable_range_request = true; // Whether Range request is required diff --git a/be/src/io/fs/http_file_system.cpp b/be/src/io/fs/http_file_system.cpp index 92e175ca774041..b1e8de354ad9b3 100644 --- a/be/src/io/fs/http_file_system.cpp +++ b/be/src/io/fs/http_file_system.cpp @@ -56,7 +56,7 @@ Status HttpFileSystem::open_file_internal(const Path& path, FileReaderSPtr* read // Pass properties (including HTTP headers) to the file reader file_info.extend_info = _properties; - auto http_reader = std::make_shared(file_info, path.native()); + auto http_reader = std::make_shared(file_info, path.native(), opts.mtime); RETURN_IF_ERROR(http_reader->open(opts)); *reader = http_reader; return Status::OK(); diff --git a/be/src/io/fs/local_file_reader.h b/be/src/io/fs/local_file_reader.h index 38b4cfa55af3bf..e11bb152b672d8 100644 --- a/be/src/io/fs/local_file_reader.h +++ b/be/src/io/fs/local_file_reader.h @@ -63,6 +63,8 @@ class LocalFileReader final : public FileReader { const std::string& get_data_dir_path() override { return _data_dir_path; } + int64_t mtime() const override { return 0; } + private: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; diff --git a/be/src/io/fs/packed_file_reader.h b/be/src/io/fs/packed_file_reader.h index 79cd23c8cd71a8..b6b423fbbfefdb 100644 --- a/be/src/io/fs/packed_file_reader.h +++ b/be/src/io/fs/packed_file_reader.h @@ -48,6 +48,8 @@ class PackedFileReader final : public FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return _inner_reader->mtime(); } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h index 58294ec1891cb8..40e3ac61d3aca6 100644 --- a/be/src/io/fs/s3_file_reader.h +++ b/be/src/io/fs/s3_file_reader.h @@ -53,6 +53,8 @@ class S3FileReader final : public FileReader { bool closed() const override { return _closed.load(std::memory_order_acquire); } + int64_t mtime() const override { return 0; } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h index cedab0b6c17a7b..df137a9267cb29 100644 --- a/be/src/io/fs/stream_load_pipe.h +++ b/be/src/io/fs/stream_load_pipe.h @@ -57,6 +57,8 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { size_t size() const override { return 0; } + int64_t mtime() const override { return 0; } + // called when consumer finished Status close() override { if (!(_finished || _cancelled)) { diff --git a/be/src/io/fs/tracing_file_reader.h b/be/src/io/fs/tracing_file_reader.h index 39b70dfbb63bef..7a6651afd21a2a 100644 --- a/be/src/io/fs/tracing_file_reader.h +++ b/be/src/io/fs/tracing_file_reader.h @@ -47,6 +47,8 @@ class TracingFileReader : public FileReader { void _collect_profile_at_runtime() override { return _inner->collect_profile_at_runtime(); } void _collect_profile_before_close() override { return _inner->collect_profile_before_close(); } + int64_t mtime() const override { return _inner->mtime(); } + FileReaderStats* stats() const { return _stats; } doris::io::FileReaderSPtr inner_reader() { return _inner; } diff --git a/be/src/vec/exec/format/orc/orc_file_reader.h b/be/src/vec/exec/format/orc/orc_file_reader.h index 503777e67c2946..15aeed332428b7 100644 --- a/be/src/vec/exec/format/orc/orc_file_reader.h +++ b/be/src/vec/exec/format/orc/orc_file_reader.h @@ -54,6 +54,8 @@ class OrcMergeRangeFileReader : public io::FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return _inner_reader->mtime(); } + // for test only const Statistics& statistics() const { return _statistics; } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index e7f5f94d32ffae..7e228e58dffb7e 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -22,9 +22,12 @@ #include #include +#include #include #include "common/compiler_util.h" // IWYU pragma: keep +#include "io/fs/buffered_reader.h" +#include "olap/page_cache.h" #include "util/bit_util.h" #include "util/block_compression.h" #include "util/runtime_profile.h" @@ -52,7 +55,7 @@ template ColumnChunkReader::ColumnChunkReader( io::BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk, FieldSchema* field_schema, const tparquet::OffsetIndex* offset_index, size_t total_rows, - io::IOContext* io_ctx) + io::IOContext* io_ctx, const ParquetPageReadContext& page_read_ctx) : _field_schema(field_schema), _max_rep_level(field_schema->repetition_level), _max_def_level(field_schema->definition_level), @@ -60,7 +63,8 @@ ColumnChunkReader::ColumnChunkReader( _metadata(column_chunk->meta_data), _offset_index(offset_index), _total_rows(total_rows), - _io_ctx(io_ctx) {} + _io_ctx(io_ctx), + _page_read_ctx(page_read_ctx) {} template Status ColumnChunkReader::init() { @@ -69,7 +73,8 @@ Status ColumnChunkReader::init() { size_t chunk_size = _metadata.total_compressed_size; // create page reader _page_reader = create_page_reader( - _stream_reader, _io_ctx, start_offset, chunk_size, _total_rows, _offset_index); + _stream_reader, _io_ctx, start_offset, chunk_size, _total_rows, _metadata, + _page_read_ctx, _offset_index); // get the block compression codec RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, &_block_compress_codec)); _state = INITIALIZED; @@ -104,7 +109,7 @@ Status ColumnChunkReader::_parse_first_page_header( RETURN_IF_ERROR(parse_page_header()); const tparquet::PageHeader* header = nullptr; - RETURN_IF_ERROR(_page_reader->get_page_header(header)); + RETURN_IF_ERROR(_page_reader->get_page_header(&header)); if (header->type == tparquet::PageType::DICTIONARY_PAGE) { // the first page maybe directory page even if _metadata.__isset.dictionary_page_offset == false, // so we should parse the directory page in next_page() @@ -125,8 +130,7 @@ Status ColumnChunkReader::parse_page_header() { RETURN_IF_ERROR(_page_reader->parse_page_header()); const tparquet::PageHeader* header = nullptr; - ; - RETURN_IF_ERROR(_page_reader->get_page_header(header)); + RETURN_IF_ERROR(_page_reader->get_page_header(&header)); int32_t page_num_values = _page_reader->is_header_v2() ? header->data_page_header_v2.num_values : header->data_page_header.num_values; _remaining_rep_nums = page_num_values; @@ -169,37 +173,144 @@ Status ColumnChunkReader::load_page_data() { } const tparquet::PageHeader* header = nullptr; - RETURN_IF_ERROR(_page_reader->get_page_header(header)); + RETURN_IF_ERROR(_page_reader->get_page_header(&header)); int32_t uncompressed_size = header->uncompressed_page_size; + bool page_loaded = false; + + // First, try to reuse a cache handle previously discovered by PageReader + // (header-only lookup) to avoid a second lookup here. + if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && + StoragePageCache::instance() != nullptr) { + if (_page_reader->has_page_cache_handle()) { + const PageCacheHandle& handle = _page_reader->page_cache_handle(); + Slice cached = handle.data(); + size_t header_size = _page_reader->header_bytes().size(); + size_t levels_size = 0; + if (header->__isset.data_page_header_v2) { + const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2; + size_t rl = header_v2.repetition_levels_byte_length; + size_t dl = header_v2.definition_levels_byte_length; + levels_size = rl + dl; + _v2_rep_levels = + Slice(reinterpret_cast(cached.data) + header_size, rl); + _v2_def_levels = + Slice(reinterpret_cast(cached.data) + header_size + rl, dl); + } + // payload_slice points to the bytes after header and levels + Slice payload_slice(cached.data + header_size + levels_size, + cached.size - header_size - levels_size); - if (_block_compress_codec != nullptr) { - Slice compressed_data; - RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data)); - if (header->__isset.data_page_header_v2) { - const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2; - // uncompressed_size = rl + dl + uncompressed_data_size - // compressed_size = rl + dl + compressed_data_size - uncompressed_size -= header_v2.repetition_levels_byte_length + - header_v2.definition_levels_byte_length; - _get_uncompressed_levels(header_v2, compressed_data); + bool cache_payload_is_decompressed = _page_reader->is_cache_payload_decompressed(); + + if (cache_payload_is_decompressed) { + // Cached payload is already uncompressed + _page_data = payload_slice; + } else { + CHECK(_block_compress_codec); + // Decompress cached payload into _decompress_buf for decoding + size_t uncompressed_payload_size = + header->__isset.data_page_header_v2 + ? static_cast(header->uncompressed_page_size) - levels_size + : static_cast(header->uncompressed_page_size); + _reserve_decompress_buf(uncompressed_payload_size); + _page_data = Slice(_decompress_buf.get(), uncompressed_payload_size); + SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time); + _chunk_statistics.decompress_cnt++; + RETURN_IF_ERROR(_block_compress_codec->decompress(payload_slice, &_page_data)); + } + // page cache counters were incremented when PageReader did the header-only + // cache lookup. Do not increment again to avoid double-counting. + page_loaded = true; } - bool is_v2_compressed = - header->__isset.data_page_header_v2 && header->data_page_header_v2.is_compressed; - if (header->__isset.data_page_header || is_v2_compressed) { - // check decompressed buffer size - _reserve_decompress_buf(uncompressed_size); - _page_data = Slice(_decompress_buf.get(), uncompressed_size); - SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time); - _chunk_statistics.decompress_cnt++; - RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data)); + } + + if (!page_loaded) { + if (_block_compress_codec != nullptr) { + Slice compressed_data; + RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data)); + std::vector level_bytes; + if (header->__isset.data_page_header_v2) { + const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2; + // uncompressed_size = rl + dl + uncompressed_data_size + // compressed_size = rl + dl + compressed_data_size + uncompressed_size -= header_v2.repetition_levels_byte_length + + header_v2.definition_levels_byte_length; + // copy level bytes (rl + dl) so that we can cache header + levels + uncompressed payload + size_t rl = header_v2.repetition_levels_byte_length; + size_t dl = header_v2.definition_levels_byte_length; + size_t level_sz = rl + dl; + if (level_sz > 0) { + level_bytes.resize(level_sz); + memcpy(level_bytes.data(), compressed_data.data, level_sz); + } + // now remove levels from compressed_data for decompression + _get_uncompressed_levels(header_v2, compressed_data); + } + bool is_v2_compressed = header->__isset.data_page_header_v2 && + header->data_page_header_v2.is_compressed; + bool page_has_compression = header->__isset.data_page_header || is_v2_compressed; + + if (page_has_compression) { + // Decompress payload for immediate decoding + _reserve_decompress_buf(uncompressed_size); + _page_data = Slice(_decompress_buf.get(), uncompressed_size); + SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time); + _chunk_statistics.decompress_cnt++; + RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data)); + + // Decide whether to cache decompressed payload or compressed payload based on threshold + bool cache_payload_decompressed = should_cache_decompressed(header, _metadata); + + if (_page_read_ctx.enable_parquet_file_page_cache && + !config::disable_storage_page_cache && + StoragePageCache::instance() != nullptr && + !_page_reader->header_bytes().empty()) { + if (cache_payload_decompressed) { + _insert_page_into_cache(level_bytes, _page_data); + _chunk_statistics.page_cache_decompressed_write_counter += 1; + } else { + if (config::enable_parquet_cache_compressed_pages) { + // cache the compressed payload as-is (header | levels | compressed_payload) + _insert_page_into_cache( + level_bytes, Slice(compressed_data.data, compressed_data.size)); + _chunk_statistics.page_cache_compressed_write_counter += 1; + } + } + } + } else { + // no compression on this page, use the data directly + _page_data = Slice(compressed_data.data, compressed_data.size); + if (_page_read_ctx.enable_parquet_file_page_cache && + !config::disable_storage_page_cache && + StoragePageCache::instance() != nullptr) { + _insert_page_into_cache(level_bytes, _page_data); + _chunk_statistics.page_cache_decompressed_write_counter += 1; + } + } } else { - // Don't need decompress - _page_data = Slice(compressed_data.data, compressed_data.size); - } - } else { - RETURN_IF_ERROR(_page_reader->get_page_data(_page_data)); - if (header->__isset.data_page_header_v2) { - _get_uncompressed_levels(header->data_page_header_v2, _page_data); + // For uncompressed page, we may still need to extract v2 levels + std::vector level_bytes; + Slice uncompressed_data; + RETURN_IF_ERROR(_page_reader->get_page_data(uncompressed_data)); + if (header->__isset.data_page_header_v2) { + const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2; + size_t rl = header_v2.repetition_levels_byte_length; + size_t dl = header_v2.definition_levels_byte_length; + size_t level_sz = rl + dl; + if (level_sz > 0) { + level_bytes.resize(level_sz); + memcpy(level_bytes.data(), uncompressed_data.data, level_sz); + } + _get_uncompressed_levels(header_v2, uncompressed_data); + } + // copy page data out + _page_data = Slice(uncompressed_data.data, uncompressed_data.size); + // Optionally cache uncompressed data for uncompressed pages + if (_page_read_ctx.enable_parquet_file_page_cache && + !config::disable_storage_page_cache && StoragePageCache::instance() != nullptr) { + _insert_page_into_cache(level_bytes, _page_data); + _chunk_statistics.page_cache_decompressed_write_counter += 1; + } } } @@ -244,7 +355,6 @@ Status ColumnChunkReader::load_page_data() { _decoders[static_cast(encoding)] = std::move(page_decoder); _page_decoder = _decoders[static_cast(encoding)].get(); } - // Reset page data for each page RETURN_IF_ERROR(_page_decoder->set_data(&_page_data)); _state = DATA_LOADED; @@ -254,7 +364,7 @@ Status ColumnChunkReader::load_page_data() { template Status ColumnChunkReader::_decode_dict_page() { const tparquet::PageHeader* header = nullptr; - RETURN_IF_ERROR(_page_reader->get_page_header(header)); + RETURN_IF_ERROR(_page_reader->get_page_header(&header)); DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header->type); SCOPED_RAW_TIMER(&_chunk_statistics.decode_dict_time); @@ -271,16 +381,84 @@ Status ColumnChunkReader::_decode_dict_page() { // Prepare dictionary data int32_t uncompressed_size = header->uncompressed_page_size; auto dict_data = make_unique_buffer(uncompressed_size); - if (_block_compress_codec != nullptr) { - Slice compressed_data; - RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data)); - Slice dict_slice(dict_data.get(), uncompressed_size); - RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &dict_slice)); - } else { - Slice dict_slice; - RETURN_IF_ERROR(_page_reader->get_page_data(dict_slice)); - // The data is stored by BufferedStreamReader, we should copy it out - memcpy(dict_data.get(), dict_slice.data, dict_slice.size); + bool dict_loaded = false; + + // Try to load dictionary page from cache + if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && + StoragePageCache::instance() != nullptr) { + if (_page_reader->has_page_cache_handle()) { + const PageCacheHandle& handle = _page_reader->page_cache_handle(); + Slice cached = handle.data(); + size_t header_size = _page_reader->header_bytes().size(); + // Dictionary page layout in cache: header | payload (compressed or uncompressed) + Slice payload_slice(cached.data + header_size, cached.size - header_size); + + bool cache_payload_is_decompressed = _page_reader->is_cache_payload_decompressed(); + + if (cache_payload_is_decompressed) { + // Use cached decompressed dictionary data + memcpy(dict_data.get(), payload_slice.data, payload_slice.size); + dict_loaded = true; + } else { + CHECK(_block_compress_codec); + // Decompress cached compressed dictionary data + Slice dict_slice(dict_data.get(), uncompressed_size); + RETURN_IF_ERROR(_block_compress_codec->decompress(payload_slice, &dict_slice)); + dict_loaded = true; + } + + // When dictionary page is loaded from cache, we need to skip the page data + // to update the offset correctly (similar to calling get_page_data()) + if (dict_loaded) { + _page_reader->skip_page_data(); + } + } + } + + if (!dict_loaded) { + // Load and decompress dictionary page from file + if (_block_compress_codec != nullptr) { + Slice compressed_data; + RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data)); + Slice dict_slice(dict_data.get(), uncompressed_size); + RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &dict_slice)); + + // Decide whether to cache decompressed or compressed dictionary based on threshold + bool cache_payload_decompressed = should_cache_decompressed(header, _metadata); + + if (_page_read_ctx.enable_parquet_file_page_cache && + !config::disable_storage_page_cache && StoragePageCache::instance() != nullptr && + !_page_reader->header_bytes().empty()) { + std::vector empty_levels; // Dictionary pages don't have levels + if (cache_payload_decompressed) { + // Cache the decompressed dictionary page + _insert_page_into_cache(empty_levels, dict_slice); + _chunk_statistics.page_cache_decompressed_write_counter += 1; + } else { + if (config::enable_parquet_cache_compressed_pages) { + // Cache the compressed dictionary page + _insert_page_into_cache(empty_levels, + Slice(compressed_data.data, compressed_data.size)); + _chunk_statistics.page_cache_compressed_write_counter += 1; + } + } + } + } else { + Slice dict_slice; + RETURN_IF_ERROR(_page_reader->get_page_data(dict_slice)); + // The data is stored by BufferedStreamReader, we should copy it out + memcpy(dict_data.get(), dict_slice.data, dict_slice.size); + + // Cache the uncompressed dictionary page + if (_page_read_ctx.enable_parquet_file_page_cache && + !config::disable_storage_page_cache && StoragePageCache::instance() != nullptr && + !_page_reader->header_bytes().empty()) { + std::vector empty_levels; + Slice payload(dict_data.get(), uncompressed_size); + _insert_page_into_cache(empty_levels, payload); + _chunk_statistics.page_cache_decompressed_write_counter += 1; + } + } } // Cache page decoder @@ -306,6 +484,32 @@ void ColumnChunkReader::_reserve_decompress_buf(siz } } +template +void ColumnChunkReader::_insert_page_into_cache( + const std::vector& level_bytes, const Slice& payload) { + StoragePageCache::CacheKey key = + _page_reader->make_page_cache_key(_page_reader->header_start_offset()); + const std::vector& header_bytes = _page_reader->header_bytes(); + size_t total = header_bytes.size() + level_bytes.size() + payload.size; + auto page = std::make_unique(total, true, segment_v2::DATA_PAGE); + size_t pos = 0; + memcpy(page->data() + pos, header_bytes.data(), header_bytes.size()); + pos += header_bytes.size(); + if (!level_bytes.empty()) { + memcpy(page->data() + pos, level_bytes.data(), level_bytes.size()); + pos += level_bytes.size(); + } + if (payload.size > 0) { + memcpy(page->data() + pos, payload.data, payload.size); + pos += payload.size; + } + page->reset_size(total); + PageCacheHandle handle; + StoragePageCache::instance()->insert(key, page.get(), &handle, segment_v2::DATA_PAGE); + page.release(); + _chunk_statistics.page_cache_write_counter += 1; +} + template Status ColumnChunkReader::skip_values(size_t num_values, bool skip_data) { diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index 9e77a3139f60fb..d0bf7ab2d81085 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -61,20 +61,19 @@ struct ColumnChunkReaderStatistics { int64_t skip_page_header_num = 0; int64_t parse_page_header_num = 0; int64_t read_page_header_time = 0; + int64_t page_read_counter = 0; + int64_t page_cache_write_counter = 0; + int64_t page_cache_compressed_write_counter = 0; + int64_t page_cache_decompressed_write_counter = 0; + int64_t page_cache_hit_counter = 0; + int64_t page_cache_missing_counter = 0; + int64_t page_cache_compressed_hit_counter = 0; + int64_t page_cache_decompressed_hit_counter = 0; }; /** * Read and decode parquet column data into doris block column. - *

Usage:

struct ColumnChunkReaderStatistics { - int64_t decompress_time = 0; - int64_t decompress_cnt = 0; - int64_t decode_header_time = 0; - int64_t decode_value_time = 0; - int64_t decode_dict_time = 0; - int64_t decode_level_time = 0; - int64_t skip_page_header_num = 0; - int64_t parse_page_header_num = 0; - }; + *

Usage:

* // Create chunk reader * ColumnChunkReader chunk_reader(BufferedStreamReader* reader, * tparquet::ColumnChunk* column_chunk, @@ -97,7 +96,8 @@ class ColumnChunkReader { public: ColumnChunkReader(io::BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk, FieldSchema* field_schema, const tparquet::OffsetIndex* offset_index, - size_t total_row, io::IOContext* io_ctx); + size_t total_row, io::IOContext* io_ctx, + const ParquetPageReadContext& page_read_ctx); ~ColumnChunkReader() = default; // Initialize chunk reader, will generate the decoder and codec. @@ -155,6 +155,21 @@ class ColumnChunkReader { _page_reader->page_statistics().parse_page_header_num; _chunk_statistics.read_page_header_time = _page_reader->page_statistics().read_page_header_time; + _chunk_statistics.page_read_counter += _page_reader->page_statistics().page_read_counter; + _chunk_statistics.page_cache_write_counter += + _page_reader->page_statistics().page_cache_write_counter; + _chunk_statistics.page_cache_compressed_write_counter += + _page_reader->page_statistics().page_cache_compressed_write_counter; + _chunk_statistics.page_cache_decompressed_write_counter += + _page_reader->page_statistics().page_cache_decompressed_write_counter; + _chunk_statistics.page_cache_hit_counter += + _page_reader->page_statistics().page_cache_hit_counter; + _chunk_statistics.page_cache_missing_counter += + _page_reader->page_statistics().page_cache_missing_counter; + _chunk_statistics.page_cache_compressed_hit_counter += + _page_reader->page_statistics().page_cache_compressed_hit_counter; + _chunk_statistics.page_cache_decompressed_hit_counter += + _page_reader->page_statistics().page_cache_decompressed_hit_counter; return _chunk_statistics; } @@ -193,6 +208,11 @@ class ColumnChunkReader { size_t* result_rows, bool* cross_page); Status load_cross_page_nested_row(std::vector& rep_levels, bool* cross_page); + Slice get_page_data() const { return _page_data; } + const Slice& v2_rep_levels() const { return _v2_rep_levels; } + const Slice& v2_def_levels() const { return _v2_def_levels; } + ColumnChunkReaderStatistics& statistics() { return chunk_statistics(); } + private: enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED, PAGE_SKIPPED }; @@ -202,6 +222,7 @@ class ColumnChunkReader { void _reserve_decompress_buf(size_t size); int32_t _get_type_length(); + void _insert_page_into_cache(const std::vector& level_bytes, const Slice& payload); void _get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2, Slice& page_data); Status _skip_nested_rows_in_page(size_t num_rows); @@ -234,6 +255,8 @@ class ColumnChunkReader { std::unique_ptr> _page_reader; BlockCompressionCodec* _block_compress_codec = nullptr; + ParquetPageReadContext _page_read_ctx; + LevelDecoder _rep_level_decoder; LevelDecoder _def_level_decoder; size_t _chunk_parsed_values = 0; diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 30962632662df4..656e599a962641 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -110,13 +110,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, std::unique_ptr& reader, size_t max_buf_size, std::unordered_map& col_offsets, - bool in_collection, const std::set& column_ids, + RuntimeState* state, bool in_collection, + const std::set& column_ids, const std::set& filter_column_ids) { size_t total_rows = row_group.num_rows; if (field->data_type->get_primitive_type() == TYPE_ARRAY) { std::unique_ptr element_reader; RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx, - element_reader, max_buf_size, col_offsets, true, column_ids, + element_reader, max_buf_size, col_offsets, state, true, column_ids, filter_column_ids)); auto array_reader = ArrayColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx); element_reader->set_column_in_nested(); @@ -130,7 +131,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, column_ids.find(field->children[0].get_column_id()) != column_ids.end()) { // Create key reader RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx, - key_reader, max_buf_size, col_offsets, true, column_ids, + key_reader, max_buf_size, col_offsets, state, true, column_ids, filter_column_ids)); } else { auto skip_reader = std::make_unique(row_ranges, total_rows, ctz, @@ -142,7 +143,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, column_ids.find(field->children[1].get_column_id()) != column_ids.end()) { // Create value reader RETURN_IF_ERROR(create(file, &field->children[1], row_group, row_ranges, ctz, io_ctx, - value_reader, max_buf_size, col_offsets, true, column_ids, + value_reader, max_buf_size, col_offsets, state, true, column_ids, filter_column_ids)); } else { auto skip_reader = std::make_unique(row_ranges, total_rows, ctz, @@ -165,8 +166,8 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, std::unique_ptr child_reader; if (column_ids.empty() || column_ids.find(child.get_column_id()) != column_ids.end()) { RETURN_IF_ERROR(create(file, &child, row_group, row_ranges, ctz, io_ctx, - child_reader, max_buf_size, col_offsets, in_collection, - column_ids, filter_column_ids)); + child_reader, max_buf_size, col_offsets, state, + in_collection, column_ids, filter_column_ids)); child_readers[child.name] = std::move(child_reader); // Record the first non-SkippingReader if (non_skip_reader_idx == -1) { @@ -184,7 +185,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, if (non_skip_reader_idx == -1) { std::unique_ptr child_reader; RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx, - child_reader, max_buf_size, col_offsets, in_collection, + child_reader, max_buf_size, col_offsets, state, in_collection, column_ids, filter_column_ids)); child_reader->set_column_in_nested(); child_readers[field->children[0].name] = std::move(child_reader); @@ -205,14 +206,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, auto scalar_reader = ScalarColumnReader::create_unique( row_ranges, total_rows, chunk, offset_index, ctz, io_ctx); - RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size)); + RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state)); scalar_reader->_filter_column_ids = filter_column_ids; reader.reset(scalar_reader.release()); } else { auto scalar_reader = ScalarColumnReader::create_unique( row_ranges, total_rows, chunk, offset_index, ctz, io_ctx); - RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size)); + RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state)); scalar_reader->_filter_column_ids = filter_column_ids; reader.reset(scalar_reader.release()); } @@ -221,14 +222,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field, auto scalar_reader = ScalarColumnReader::create_unique( row_ranges, total_rows, chunk, offset_index, ctz, io_ctx); - RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size)); + RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state)); scalar_reader->_filter_column_ids = filter_column_ids; reader.reset(scalar_reader.release()); } else { auto scalar_reader = ScalarColumnReader::create_unique( row_ranges, total_rows, chunk, offset_index, ctz, io_ctx); - RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size)); + RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state)); scalar_reader->_filter_column_ids = filter_column_ids; reader.reset(scalar_reader.release()); } @@ -246,7 +247,8 @@ void ParquetColumnReader::_generate_read_ranges(RowRange page_row_range, template Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field, - size_t max_buf_size) { + size_t max_buf_size, + RuntimeState* state) { _field_schema = field; auto& chunk_meta = _chunk_meta.meta_data; int64_t chunk_start = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset @@ -262,8 +264,11 @@ Status ScalarColumnReader::init(io::FileReaderSPtr } _stream_reader = std::make_unique(file, chunk_start, chunk_len, prefetch_buffer_size); + ParquetPageReadContext ctx( + (state == nullptr) ? true : state->query_options().enable_parquet_file_page_cache); + _chunk_reader = std::make_unique>( - _stream_reader.get(), &_chunk_meta, field, _offset_index, _total_rows, _io_ctx); + _stream_reader.get(), &_chunk_meta, field, _offset_index, _total_rows, _io_ctx, ctx); RETURN_IF_ERROR(_chunk_reader->init()); return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index 2a21ddd84cb0b5..97ad9d1fd3ac39 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -65,7 +65,15 @@ class ParquetColumnReader { decode_null_map_time(0), skip_page_header_num(0), parse_page_header_num(0), - read_page_header_time(0) {} + read_page_header_time(0), + page_read_counter(0), + page_cache_write_counter(0), + page_cache_compressed_write_counter(0), + page_cache_decompressed_write_counter(0), + page_cache_hit_counter(0), + page_cache_missing_counter(0), + page_cache_compressed_hit_counter(0), + page_cache_decompressed_hit_counter(0) {} ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t null_map_time) : page_index_read_calls(0), @@ -78,7 +86,15 @@ class ParquetColumnReader { decode_null_map_time(null_map_time), skip_page_header_num(cs.skip_page_header_num), parse_page_header_num(cs.parse_page_header_num), - read_page_header_time(cs.read_page_header_time) {} + read_page_header_time(cs.read_page_header_time), + page_read_counter(cs.page_read_counter), + page_cache_write_counter(cs.page_cache_write_counter), + page_cache_compressed_write_counter(cs.page_cache_compressed_write_counter), + page_cache_decompressed_write_counter(cs.page_cache_decompressed_write_counter), + page_cache_hit_counter(cs.page_cache_hit_counter), + page_cache_missing_counter(cs.page_cache_missing_counter), + page_cache_compressed_hit_counter(cs.page_cache_compressed_hit_counter), + page_cache_decompressed_hit_counter(cs.page_cache_decompressed_hit_counter) {} int64_t page_index_read_calls; int64_t decompress_time; @@ -91,6 +107,14 @@ class ParquetColumnReader { int64_t skip_page_header_num; int64_t parse_page_header_num; int64_t read_page_header_time; + int64_t page_read_counter; + int64_t page_cache_write_counter; + int64_t page_cache_compressed_write_counter; + int64_t page_cache_decompressed_write_counter; + int64_t page_cache_hit_counter; + int64_t page_cache_missing_counter; + int64_t page_cache_compressed_hit_counter; + int64_t page_cache_decompressed_hit_counter; void merge(ColumnStatistics& col_statistics) { page_index_read_calls += col_statistics.page_index_read_calls; @@ -104,6 +128,17 @@ class ParquetColumnReader { skip_page_header_num += col_statistics.skip_page_header_num; parse_page_header_num += col_statistics.parse_page_header_num; read_page_header_time += col_statistics.read_page_header_time; + page_read_counter += col_statistics.page_read_counter; + page_cache_write_counter += col_statistics.page_cache_write_counter; + page_cache_compressed_write_counter += + col_statistics.page_cache_compressed_write_counter; + page_cache_decompressed_write_counter += + col_statistics.page_cache_decompressed_write_counter; + page_cache_hit_counter += col_statistics.page_cache_hit_counter; + page_cache_missing_counter += col_statistics.page_cache_missing_counter; + page_cache_compressed_hit_counter += col_statistics.page_cache_compressed_hit_counter; + page_cache_decompressed_hit_counter += + col_statistics.page_cache_decompressed_hit_counter; } }; @@ -132,7 +167,8 @@ class ParquetColumnReader { cctz::time_zone* ctz, io::IOContext* io_ctx, std::unique_ptr& reader, size_t max_buf_size, std::unordered_map& col_offsets, - bool in_collection = false, const std::set& column_ids = {}, + RuntimeState* state, bool in_collection = false, + const std::set& column_ids = {}, const std::set& filter_column_ids = {}); virtual const std::vector& get_rep_level() const = 0; virtual const std::vector& get_def_level() const = 0; @@ -175,7 +211,8 @@ class ScalarColumnReader : public ParquetColumnReader { _chunk_meta(chunk_meta), _offset_index(offset_index) {} ~ScalarColumnReader() override { close(); } - Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size); + Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size, + RuntimeState* state); Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, const std::shared_ptr& root_node, FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 543c8c44bd4809..12ac55016cacc6 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -133,7 +133,7 @@ Status RowGroupReader::init( std::unique_ptr reader; RETURN_IF_ERROR(ParquetColumnReader::create( _file_reader, field, _row_group_meta, _read_ranges, _ctz, _io_ctx, reader, - max_buf_size, col_offsets, false, _column_ids, _filter_column_ids)); + max_buf_size, col_offsets, _state, false, _column_ids, _filter_column_ids)); if (reader == nullptr) { VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed"; return Status::Corruption("Init row group reader failed"); diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp index 3b6d7fdcb9bbb2..70fdccd04a6fe9 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp @@ -17,6 +17,7 @@ #include "vparquet_page_reader.h" +#include #include #include #include @@ -26,6 +27,8 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "io/fs/buffered_reader.h" +#include "olap/page_cache.h" +#include "parquet_common.h" #include "util/runtime_profile.h" #include "util/slice.h" #include "util/thrift_util.h" @@ -40,10 +43,16 @@ namespace doris::vectorized { #include "common/compile_check_begin.h" static constexpr size_t INIT_PAGE_HEADER_SIZE = 128; +void ParquetPageCacheKeyBuilder::init(const std::string& path, int64_t mtime) { + _file_key_prefix = fmt::format("{}::{}", path, mtime); +} + template PageReader::PageReader(io::BufferedStreamReader* reader, io::IOContext* io_ctx, uint64_t offset, uint64_t length, size_t total_rows, + const tparquet::ColumnMetaData& metadata, + const ParquetPageReadContext& page_read_ctx, const tparquet::OffsetIndex* offset_index) : _reader(reader), _io_ctx(io_ctx), @@ -51,9 +60,12 @@ PageReader::PageReader(io::BufferedStreamReader* re _start_offset(offset), _end_offset(offset + length), _total_rows(total_rows), + _metadata(metadata), + _page_read_ctx(page_read_ctx), _offset_index(offset_index) { _next_header_offset = _offset; _state = INITIALIZED; + _page_cache_key_builder.init(_reader->path(), _reader->mtime()); if constexpr (OFFSET_INDEX) { _end_row = _offset_index->page_locations.size() >= 2 @@ -77,11 +89,72 @@ Status PageReader::parse_page_header() { return Status::IOError("Should skip or load current page to get next page"); } + _page_statistics.page_read_counter += 1; + + // Parse page header from file; header bytes are saved for possible cache insertion const uint8_t* page_header_buf = nullptr; size_t max_size = _end_offset - _offset; size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size); const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20; uint32_t real_header_size = 0; + + // Try a header-only lookup in the page cache. Cached pages store + // header + optional v2 levels + uncompressed payload, so we can + // parse the page header directly from the cached bytes and avoid + // a file read for the header. + if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && + StoragePageCache::instance() != nullptr) { + PageCacheHandle handle; + StoragePageCache::CacheKey key = make_page_cache_key(static_cast(_offset)); + if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) { + // Parse header directly from cached data + _page_cache_handle = std::move(handle); + Slice s = _page_cache_handle.data(); + real_header_size = cast_set(s.size); + SCOPED_RAW_TIMER(&_page_statistics.decode_header_time); + auto st = deserialize_thrift_msg(reinterpret_cast(s.data), + &real_header_size, true, &_cur_page_header); + if (!st.ok()) return st; + // Increment page cache counters for a true cache hit on header+payload + _page_statistics.page_cache_hit_counter += 1; + // Detect whether the cached payload is compressed or decompressed and record + bool is_cache_payload_decompressed = + should_cache_decompressed(&_cur_page_header, _metadata); + + if (is_cache_payload_decompressed) { + _page_statistics.page_cache_decompressed_hit_counter += 1; + } else { + _page_statistics.page_cache_compressed_hit_counter += 1; + } + + _is_cache_payload_decompressed = is_cache_payload_decompressed; + + if constexpr (OFFSET_INDEX == false) { + if (is_header_v2()) { + _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows; + } else if constexpr (!IN_COLLECTION) { + _end_row = _start_row + _cur_page_header.data_page_header.num_values; + } + } + + // Save header bytes for later use (e.g., to insert updated cache entries) + _header_buf.assign(s.data, s.data + real_header_size); + _last_header_size = real_header_size; + _page_statistics.parse_page_header_num++; + _offset += real_header_size; + _next_header_offset = _offset + _cur_page_header.compressed_page_size; + _state = HEADER_PARSED; + return Status::OK(); + } else { + _page_statistics.page_cache_missing_counter += 1; + // Clear any existing cache handle on miss to avoid holding stale handle + _page_cache_handle = PageCacheHandle(); + } + } + // NOTE: page cache lookup for *decompressed* page data is handled in + // ColumnChunkReader::load_page_data(). PageReader should only be + // responsible for parsing the header bytes from the file and saving + // them in `_header_buf` for possible later insertion into the cache. while (true) { if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { return Status::EndOfFile("stop"); @@ -115,6 +188,9 @@ Status PageReader::parse_page_header() { } } + // Save header bytes for possible cache insertion later + _header_buf.assign(page_header_buf, page_header_buf + real_header_size); + _last_header_size = real_header_size; _page_statistics.parse_page_header_num++; _offset += real_header_size; _next_header_offset = _offset + _cur_page_header.compressed_page_size; diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h index 9246819d59c399..9aa5ba3171c851 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h @@ -21,9 +21,12 @@ #include #include +#include #include "common/cast_set.h" +#include "common/config.h" #include "common/status.h" +#include "olap/page_cache.h" #include "util/block_compression.h" #include "vec/exec/format/parquet/parquet_common.h" namespace doris { @@ -50,6 +53,35 @@ namespace doris::vectorized { * Use to deserialize parquet page header, and get the page data in iterator interface. */ +// Session-level options for parquet page reading/caching. +struct ParquetPageReadContext { + bool enable_parquet_file_page_cache = true; + ParquetPageReadContext() = default; + ParquetPageReadContext(bool enable_parquet_file_page_cache) + : enable_parquet_file_page_cache(enable_parquet_file_page_cache) {} +}; + +inline bool should_cache_decompressed(const tparquet::PageHeader* header, + const tparquet::ColumnMetaData& metadata) { + if (header->compressed_page_size <= 0) return true; + if (metadata.codec == tparquet::CompressionCodec::UNCOMPRESSED) return true; + + double ratio = static_cast(header->uncompressed_page_size) / + static_cast(header->compressed_page_size); + return ratio <= config::parquet_page_cache_decompress_threshold; +} + +class ParquetPageCacheKeyBuilder { +public: + void init(const std::string& path, int64_t mtime); + StoragePageCache::CacheKey make_key(uint64_t end_offset, int64_t offset) const { + return StoragePageCache::CacheKey(_file_key_prefix, end_offset, offset); + } + +private: + std::string _file_key_prefix; +}; + template class PageReader { public: @@ -58,10 +90,19 @@ class PageReader { int64_t skip_page_header_num = 0; int64_t parse_page_header_num = 0; int64_t read_page_header_time = 0; + int64_t page_cache_hit_counter = 0; + int64_t page_cache_missing_counter = 0; + int64_t page_cache_compressed_hit_counter = 0; + int64_t page_cache_decompressed_hit_counter = 0; + int64_t page_cache_write_counter = 0; + int64_t page_cache_compressed_write_counter = 0; + int64_t page_cache_decompressed_write_counter = 0; + int64_t page_read_counter = 0; }; PageReader(io::BufferedStreamReader* reader, io::IOContext* io_ctx, uint64_t offset, - uint64_t length, size_t total_rows, + uint64_t length, size_t total_rows, const tparquet::ColumnMetaData& metadata, + const ParquetPageReadContext& page_read_ctx, const tparquet::OffsetIndex* offset_index = nullptr); ~PageReader() = default; @@ -123,24 +164,53 @@ class PageReader { } } - Status get_page_header(const tparquet::PageHeader*& page_header) { + Status get_page_header(const tparquet::PageHeader** page_header) { if (UNLIKELY(_state != HEADER_PARSED)) { return Status::InternalError("Page header not parsed"); } - page_header = &_cur_page_header; + *page_header = &_cur_page_header; return Status::OK(); } Status get_page_data(Slice& slice); + // Skip page data and update offset (used when data is loaded from cache) + void skip_page_data() { + if (_state == HEADER_PARSED) { + _offset += _cur_page_header.compressed_page_size; + _state = DATA_LOADED; + } + } + + const std::vector& header_bytes() const { return _header_buf; } + // header start offset for current page + int64_t header_start_offset() const { + return static_cast(_next_header_offset) - static_cast(_last_header_size) - + static_cast(_cur_page_header.compressed_page_size); + } + uint64_t file_end_offset() const { return _end_offset; } + bool cached_decompressed() const { + return should_cache_decompressed(&_cur_page_header, _metadata); + } + PageStatistics& page_statistics() { return _page_statistics; } bool is_header_v2() { return _cur_page_header.__isset.data_page_header_v2; } + // Returns whether the current page's cache payload is decompressed + bool is_cache_payload_decompressed() const { return _is_cache_payload_decompressed; } + size_t start_row() const { return _start_row; } size_t end_row() const { return _end_row; } + // Accessors for cache handle + bool has_page_cache_handle() const { return _page_cache_handle.cache() != nullptr; } + const doris::PageCacheHandle& page_cache_handle() const { return _page_cache_handle; } + StoragePageCache::CacheKey make_page_cache_key(int64_t offset) const { + return _page_cache_key_builder.make_key(_end_offset, offset); + } + private: enum PageReaderState { INITIALIZED, HEADER_PARSED, DATA_LOADED }; PageReaderState _state = INITIALIZED; @@ -159,19 +229,33 @@ class PageReader { size_t _end_row = 0; // total rows in this column chunk size_t _total_rows = 0; + // Column metadata for this column chunk + const tparquet::ColumnMetaData& _metadata; + // Session-level parquet page cache options + ParquetPageReadContext _page_read_ctx; // for page index size_t _page_index = 0; const tparquet::OffsetIndex* _offset_index; tparquet::PageHeader _cur_page_header; + bool _is_cache_payload_decompressed = true; + + // Page cache members + ParquetPageCacheKeyBuilder _page_cache_key_builder; + doris::PageCacheHandle _page_cache_handle; + // stored header bytes when cache miss so we can insert header+payload into cache + std::vector _header_buf; + // last parsed header size in bytes + uint32_t _last_header_size = 0; }; template std::unique_ptr> create_page_reader( io::BufferedStreamReader* reader, io::IOContext* io_ctx, uint64_t offset, uint64_t length, - size_t total_rows, const tparquet::OffsetIndex* offset_index = nullptr) { - return std::make_unique>(reader, io_ctx, offset, length, - total_rows, offset_index); + size_t total_rows, const tparquet::ColumnMetaData& metadata, + const ParquetPageReadContext& ctx, const tparquet::OffsetIndex* offset_index = nullptr) { + return std::make_unique>( + reader, io_ctx, offset, length, total_rows, metadata, ctx, offset_index); } #include "common/compile_check_end.h" diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index a84ee2b9d2ceda..de99ad2170c555 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -194,6 +194,22 @@ void ParquetReader::_init_profile() { ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecompressTime", parquet_profile, 1); _parquet_profile.decompress_cnt = ADD_CHILD_COUNTER_WITH_LEVEL( _profile, "DecompressCount", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.page_read_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageReadCount", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.page_cache_write_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageCacheWriteCount", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.page_cache_compressed_write_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageCacheCompressedWriteCount", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.page_cache_decompressed_write_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageCacheDecompressedWriteCount", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.page_cache_hit_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageCacheHitCount", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.page_cache_missing_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageCacheMissingCount", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.page_cache_compressed_hit_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageCacheCompressedHitCount", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.page_cache_decompressed_hit_counter = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "PageCacheDecompressedHitCount", TUnit::UNIT, parquet_profile, 1); _parquet_profile.decode_header_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageHeaderDecodeTime", parquet_profile, 1); _parquet_profile.read_page_header_time = @@ -1271,6 +1287,21 @@ void ParquetReader::_collect_profile() { _column_statistics.page_index_read_calls); COUNTER_UPDATE(_parquet_profile.decompress_time, _column_statistics.decompress_time); COUNTER_UPDATE(_parquet_profile.decompress_cnt, _column_statistics.decompress_cnt); + COUNTER_UPDATE(_parquet_profile.page_read_counter, _column_statistics.page_read_counter); + COUNTER_UPDATE(_parquet_profile.page_cache_write_counter, + _column_statistics.page_cache_write_counter); + COUNTER_UPDATE(_parquet_profile.page_cache_compressed_write_counter, + _column_statistics.page_cache_compressed_write_counter); + COUNTER_UPDATE(_parquet_profile.page_cache_decompressed_write_counter, + _column_statistics.page_cache_decompressed_write_counter); + COUNTER_UPDATE(_parquet_profile.page_cache_hit_counter, + _column_statistics.page_cache_hit_counter); + COUNTER_UPDATE(_parquet_profile.page_cache_missing_counter, + _column_statistics.page_cache_missing_counter); + COUNTER_UPDATE(_parquet_profile.page_cache_compressed_hit_counter, + _column_statistics.page_cache_compressed_hit_counter); + COUNTER_UPDATE(_parquet_profile.page_cache_decompressed_hit_counter, + _column_statistics.page_cache_decompressed_hit_counter); COUNTER_UPDATE(_parquet_profile.decode_header_time, _column_statistics.decode_header_time); COUNTER_UPDATE(_parquet_profile.read_page_header_time, _column_statistics.read_page_header_time); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index b6451ad15d41d2..dcd9e99f393e41 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -195,6 +195,14 @@ class ParquetReader : public GenericReader { RuntimeProfile::Counter* file_footer_hit_cache = nullptr; RuntimeProfile::Counter* decompress_time = nullptr; RuntimeProfile::Counter* decompress_cnt = nullptr; + RuntimeProfile::Counter* page_read_counter = nullptr; + RuntimeProfile::Counter* page_cache_write_counter = nullptr; + RuntimeProfile::Counter* page_cache_compressed_write_counter = nullptr; + RuntimeProfile::Counter* page_cache_decompressed_write_counter = nullptr; + RuntimeProfile::Counter* page_cache_hit_counter = nullptr; + RuntimeProfile::Counter* page_cache_missing_counter = nullptr; + RuntimeProfile::Counter* page_cache_compressed_hit_counter = nullptr; + RuntimeProfile::Counter* page_cache_decompressed_hit_counter = nullptr; RuntimeProfile::Counter* decode_header_time = nullptr; RuntimeProfile::Counter* read_page_header_time = nullptr; RuntimeProfile::Counter* decode_value_time = nullptr; diff --git a/be/test/io/fs/buffered_reader_test.cpp b/be/test/io/fs/buffered_reader_test.cpp index 3874b06c68c592..bc92d22b178d05 100644 --- a/be/test/io/fs/buffered_reader_test.cpp +++ b/be/test/io/fs/buffered_reader_test.cpp @@ -68,6 +68,8 @@ class SyncLocalFileReader : public io::FileReader { bool closed() const override { return _reader->closed(); } + int64_t mtime() const override { return _reader->mtime(); } + private: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const io::IOContext* io_ctx) override { @@ -96,6 +98,8 @@ class MockOffsetFileReader : public io::FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return 0; } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const io::IOContext* io_ctx) override { @@ -130,6 +134,8 @@ class TestingRangeCacheFileReader : public io::FileReader { bool closed() const override { return _delegate->closed(); } + int64_t mtime() const override { return _delegate->mtime(); } + const io::PrefetchRange& last_read_range() const { return *_last_read_range; } protected: diff --git a/be/test/io/fs/packed_file_concurrency_test.cpp b/be/test/io/fs/packed_file_concurrency_test.cpp index 31c2db19fb8675..1e581ff59b8259 100644 --- a/be/test/io/fs/packed_file_concurrency_test.cpp +++ b/be/test/io/fs/packed_file_concurrency_test.cpp @@ -411,6 +411,8 @@ class MockRemoteReader : public FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return 0; } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* /*io_ctx*/) override { diff --git a/be/test/io/fs/packed_file_reader_test.cpp b/be/test/io/fs/packed_file_reader_test.cpp index feaf5c37f3d104..2b0472c38a5795 100644 --- a/be/test/io/fs/packed_file_reader_test.cpp +++ b/be/test/io/fs/packed_file_reader_test.cpp @@ -69,6 +69,8 @@ class MockFileReader : public FileReader { return Status::OK(); } + int64_t mtime() const override { return 0; } + private: Path _path = Path("mock_file"); std::string _content; diff --git a/be/test/io/fs/packed_file_system_test.cpp b/be/test/io/fs/packed_file_system_test.cpp index 7f8a4af63df0b7..096bb8c4a93516 100644 --- a/be/test/io/fs/packed_file_system_test.cpp +++ b/be/test/io/fs/packed_file_system_test.cpp @@ -53,6 +53,8 @@ class MockFileReader : public FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return 0; } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* /*io_ctx*/) override { diff --git a/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp b/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp index 3aef8db8459f39..5ea6335d2b9d18 100644 --- a/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp +++ b/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp @@ -38,6 +38,8 @@ class MockFileReader : public io::FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return 0; } + Status close() override { _closed = true; return Status::OK(); diff --git a/be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp b/be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp new file mode 100644 index 00000000000000..c415d264e8e915 --- /dev/null +++ b/be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp @@ -0,0 +1,804 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "common/config.h" +#include "io/fs/buffered_reader.h" +#include "olap/page_cache.h" +#include "runtime/exec_env.h" +#include "runtime/memory/cache_manager.h" +#include "util/block_compression.h" +#include "util/faststring.h" +#include "util/thrift_util.h" +#include "vec/exec/format/parquet/schema_desc.h" +#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h" +#include "vec/exec/format/parquet/vparquet_page_reader.h" + +using namespace doris; +using namespace doris::vectorized; + +class FakeBufferedReader : public io::BufferedStreamReader { +public: + FakeBufferedReader(std::string path, std::vector data) + : _path(std::move(path)), _data(std::move(data)) {} + Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read, + const doris::io::IOContext* io_ctx) override { + if (offset + bytes_to_read > _data.size()) return Status::IOError("Out of bounds"); + *buf = _data.data() + offset; + return Status::OK(); + } + Status read_bytes(Slice& slice, uint64_t offset, const doris::io::IOContext* io_ctx) override { + if (offset + slice.size > _data.size()) return Status::IOError("Out of bounds"); + slice.data = reinterpret_cast(_data.data() + offset); + return Status::OK(); + } + std::string path() override { return _path; } + + int64_t mtime() const override { return 0; } + +private: + std::string _path; + std::vector _data; +}; + +TEST(ParquetPageCacheTest, CacheHitReturnsDecompressedPayload) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + + // construct thrift PageHeader (uncompressed payload) and payload + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE; + header.__set_compressed_page_size(4); + header.__set_uncompressed_page_size(4); + header.__isset.data_page_header = true; + header.data_page_header.__set_num_values(1); + + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + + std::vector payload = {0x11, 0x22, 0x33, 0x44}; + std::vector cached_data; + cached_data.insert(cached_data.end(), header_bytes.begin(), header_bytes.end()); + cached_data.insert(cached_data.end(), payload.begin(), payload.end()); + + std::string path = "test_parquet_cache_file"; + int64_t header_offset = 128; + // make file_end_offset consistent with reader/page reader end offset used in test + int64_t file_end_offset = header_offset + static_cast(cached_data.size()); + + // insert into cache + int64_t mtime = 0; + StoragePageCache::CacheKey key(fmt::format("{}::{}", path, mtime), + static_cast(file_end_offset), header_offset); + size_t total = cached_data.size(); + auto* page = new DataPage(total, true, segment_v2::DATA_PAGE); + memcpy(page->data(), cached_data.data(), total); + page->reset_size(total); + PageCacheHandle handle; + StoragePageCache::instance()->insert(key, page, &handle, segment_v2::DATA_PAGE); + + // create fake reader and a ColumnChunkReader to verify cache hit + // ensure the reader contains the same header+payload at the header offset so header parsing succeeds + std::vector backing(256, 0); + memcpy(backing.data() + header_offset, cached_data.data(), total); + FakeBufferedReader reader(path, backing); + // prepare column chunk metadata so ColumnChunkReader uses same offsets + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(header_offset); + cc.meta_data.__set_total_compressed_size(total); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED); + + FieldSchema field_schema; + field_schema.repetition_level = 0; + field_schema.definition_level = 0; + + ColumnChunkReader ccr(&reader, &cc, &field_schema, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr.init().ok()); + // load_page_data should hit the cache and return decompressed payload + ASSERT_TRUE(ccr.load_page_data().ok()); + Slice s = ccr.get_page_data(); + ASSERT_EQ(s.size, payload.size()); + ASSERT_EQ(0, memcmp(s.data, payload.data(), payload.size())); + // stats: ensure there was a page read and at least one hit recorded + auto& statistics = ccr.statistics(); + EXPECT_EQ(statistics.page_read_counter, 1); + EXPECT_EQ(statistics.page_cache_hit_counter, 1); + EXPECT_EQ(statistics.page_cache_decompressed_hit_counter, 1); +} + +TEST(ParquetPageCacheTest, DecompressedPageInsertedByColumnChunkReader) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + // ensure decompressed pages are cached via BE config + double old_thresh = config::parquet_page_cache_decompress_threshold; + bool old_enable_compressed = config::enable_parquet_cache_compressed_pages; + config::parquet_page_cache_decompress_threshold = 100.0; + config::enable_parquet_cache_compressed_pages = false; + + // construct uncompressed header + payload in file buffer + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE; + header.__set_compressed_page_size(4); + header.__set_uncompressed_page_size(4); + header.__isset.data_page_header = true; + header.data_page_header.__set_num_values(1); + + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + + std::vector payload = {0x55, 0x66, 0x77, 0x88}; + std::vector file_data; + file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end()); + file_data.insert(file_data.end(), payload.begin(), payload.end()); + + std::string path = "test_parquet_insert_file"; + int64_t header_offset = 0; + + FakeBufferedReader reader(path, file_data); + + // prepare column chunk metadata + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(header_offset); + cc.meta_data.__set_total_compressed_size(file_data.size()); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED); + + { + FieldSchema field_schema; + field_schema.repetition_level = 0; + field_schema.definition_level = 0; + ColumnChunkReader ccr(&reader, &cc, &field_schema, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr.init().ok()); + ASSERT_TRUE(ccr.load_page_data().ok()); + + // Now cache should have an entry; verify by creating a fresh ColumnChunkReader and hitting cache + ColumnChunkReader ccr_check(&reader, &cc, &field_schema, nullptr, 0, nullptr, + ctx); + ASSERT_TRUE(ccr_check.init().ok()); + // ASSERT_TRUE(ccr_check.next_page().ok()); + ASSERT_TRUE(ccr_check.load_page_data().ok()); + Slice s = ccr_check.get_page_data(); + ASSERT_EQ(s.size, payload.size()); + EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size())); + EXPECT_EQ(ccr_check.statistics().page_cache_hit_counter, 1); + } + // restore config + config::parquet_page_cache_decompress_threshold = old_thresh; + config::enable_parquet_cache_compressed_pages = old_enable_compressed; +} + +TEST(ParquetPageCacheTest, V2LevelsPreservedInCache) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + // ensure decompressed pages are cached via BE config + double old_thresh = config::parquet_page_cache_decompress_threshold; + bool old_enable_compressed = config::enable_parquet_cache_compressed_pages; + config::parquet_page_cache_decompress_threshold = 100.0; + config::enable_parquet_cache_compressed_pages = false; + + // construct v2 header + levels + payload in file buffer (uncompressed) + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE_V2; + int rl = 2; + int dl = 1; + int payload_sz = 2; + header.__set_compressed_page_size(rl + dl + payload_sz); + header.__set_uncompressed_page_size(rl + dl + payload_sz); + header.__isset.data_page_header_v2 = true; + header.data_page_header_v2.__set_repetition_levels_byte_length(rl); + header.data_page_header_v2.__set_definition_levels_byte_length(dl); + header.data_page_header_v2.__set_is_compressed(false); + header.data_page_header_v2.__set_num_values(1); + + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + + std::vector level_bytes = {0x11, 0x22, 0x33}; + std::vector payload = {0xAA, 0xBB}; + std::vector file_data; + file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end()); + file_data.insert(file_data.end(), level_bytes.begin(), level_bytes.end()); + file_data.insert(file_data.end(), payload.begin(), payload.end()); + + std::string path = "test_v2_levels_file"; + FakeBufferedReader reader(path, file_data); + + // prepare column chunk metadata + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(0); + cc.meta_data.__set_total_compressed_size(file_data.size()); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED); + + FieldSchema field_schema; + field_schema.repetition_level = 0; + field_schema.definition_level = 0; + { + ColumnChunkReader ccr(&reader, &cc, &field_schema, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr.init().ok()); + ASSERT_TRUE(ccr.load_page_data().ok()); + + // Now cache should have entry; verify by creating a ColumnChunkReader and hitting cache + ColumnChunkReader ccr_check(&reader, &cc, &field_schema, nullptr, 0, nullptr, + ctx); + ASSERT_TRUE(ccr_check.init().ok()); + ASSERT_TRUE(ccr_check.load_page_data().ok()); + Slice s = ccr_check.get_page_data(); + ASSERT_EQ(s.size, payload.size()); + EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size())); + } + + // Verify that a fresh ColumnChunkReader reusing cache gets level bytes preserved + FieldSchema field_schema2; + field_schema2.repetition_level = 2; // v2 levels present + field_schema2.definition_level = 1; + ColumnChunkReader ccr2(&reader, &cc, &field_schema2, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr2.init().ok()); + ASSERT_TRUE(ccr2.load_page_data().ok()); + // Level slices should equal the original level bytes + const Slice& rep = ccr2.v2_rep_levels(); + const Slice& def = ccr2.v2_def_levels(); + auto& statistics = ccr2.statistics(); + EXPECT_GT(statistics.page_cache_hit_counter, 0); + // because threshold is set to cache decompressed, we should see decompressed hits + EXPECT_GT(statistics.page_cache_decompressed_hit_counter, 0); + ASSERT_EQ(def.size, dl); + EXPECT_EQ(0, memcmp(rep.data, level_bytes.data(), rl)); + EXPECT_EQ(0, memcmp(def.data, level_bytes.data() + rl, dl)); + // restore config + config::parquet_page_cache_decompress_threshold = old_thresh; + config::enable_parquet_cache_compressed_pages = old_enable_compressed; +} + +TEST(ParquetPageCacheTest, CompressedV1PageCachedAndHit) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + + // construct compressed v1 header + compressed payload in file buffer + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE; + header.__isset.data_page_header = true; + header.data_page_header.__set_num_values(1); + + std::vector payload = {0x01, 0x02, 0x03, 0x04}; + + // compress payload using a block codec + BlockCompressionCodec* codec = nullptr; + ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, &codec).ok()); + faststring compressed_fast; + std::vector inputs; + inputs.emplace_back(payload.data(), payload.size()); + ASSERT_TRUE(codec->compress(inputs, payload.size(), &compressed_fast).ok()); + + header.__set_compressed_page_size(static_cast(compressed_fast.size())); + header.__set_uncompressed_page_size(static_cast(payload.size())); + + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + + std::vector file_data; + file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end()); + file_data.insert(file_data.end(), compressed_fast.data(), + compressed_fast.data() + compressed_fast.size()); + + std::string path = "test_compressed_v1_file"; + FakeBufferedReader reader(path, file_data); + + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(0); + cc.meta_data.__set_total_compressed_size(file_data.size()); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY); + + FieldSchema field_schema; + field_schema.repetition_level = 0; + field_schema.definition_level = 0; + + // Load page to trigger decompression + cache insert + ColumnChunkReader ccr(&reader, &cc, &field_schema, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr.init().ok()); + ASSERT_TRUE(ccr.load_page_data().ok()); + EXPECT_EQ(ccr.statistics().page_cache_write_counter, 1); + + // Now verify a fresh reader hits the cache and returns payload + ColumnChunkReader ccr_check(&reader, &cc, &field_schema, nullptr, 0, nullptr, + ctx); + ASSERT_TRUE(ccr_check.init().ok()); + // ASSERT_TRUE(ccr_check.next_page().ok()); + ASSERT_TRUE(ccr_check.load_page_data().ok()); + Slice s = ccr_check.get_page_data(); + ASSERT_EQ(s.size, payload.size()); + EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size())); + EXPECT_EQ(ccr_check.statistics().page_cache_hit_counter, 1); +} + +TEST(ParquetPageCacheTest, CompressedV2LevelsPreservedInCache) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + + // construct v2 header + levels + compressed payload in file buffer + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE_V2; + int rl = 2; + int dl = 1; + //int payload_sz = 2; + header.__isset.data_page_header_v2 = true; + header.data_page_header_v2.__set_repetition_levels_byte_length(rl); + header.data_page_header_v2.__set_definition_levels_byte_length(dl); + header.data_page_header_v2.__set_is_compressed(true); + header.data_page_header_v2.__set_num_values(1); + + std::vector level_bytes = {0x11, 0x22, 0x33}; + std::vector payload = {0xAA, 0xBB}; + + // compress payload + BlockCompressionCodec* codec = nullptr; + ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, &codec).ok()); + faststring compressed_fast; + std::vector inputs; + inputs.emplace_back(payload.data(), payload.size()); + ASSERT_TRUE(codec->compress(inputs, payload.size(), &compressed_fast).ok()); + + // compressed page: levels (uncompressed) followed by compressed payload + std::vector compressed_page; + compressed_page.insert(compressed_page.end(), level_bytes.begin(), level_bytes.end()); + compressed_page.insert(compressed_page.end(), compressed_fast.data(), + compressed_fast.data() + compressed_fast.size()); + + header.__set_compressed_page_size(static_cast(compressed_page.size())); + header.__set_uncompressed_page_size(static_cast(level_bytes.size() + payload.size())); + + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + + std::vector file_data; + file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end()); + file_data.insert(file_data.end(), compressed_page.begin(), compressed_page.end()); + + std::string path = "test_compressed_v2_file"; + FakeBufferedReader reader(path, file_data); + + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(0); + cc.meta_data.__set_total_compressed_size(file_data.size()); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY); + + FieldSchema field_schema; + field_schema.repetition_level = 0; + field_schema.definition_level = 0; + + // Load page to trigger decompression + cache insert + ColumnChunkReader ccr(&reader, &cc, &field_schema, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr.init().ok()); + ASSERT_TRUE(ccr.load_page_data().ok()); + EXPECT_EQ(ccr.statistics().page_cache_write_counter, 1); + + // Now verify a fresh reader hits the cache and v2 levels are preserved + FieldSchema field_schema2; + field_schema2.repetition_level = rl; + field_schema2.definition_level = dl; + ColumnChunkReader ccr_check(&reader, &cc, &field_schema2, nullptr, 0, nullptr, + ctx); + ASSERT_TRUE(ccr_check.init().ok()); + ASSERT_TRUE(ccr_check.load_page_data().ok()); + Slice s = ccr_check.get_page_data(); + ASSERT_EQ(s.size, payload.size()); + EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size())); + const Slice& rep = ccr_check.v2_rep_levels(); + const Slice& def = ccr_check.v2_def_levels(); + ASSERT_EQ(rep.size, rl); + ASSERT_EQ(def.size, dl); + // cached v2 page is stored decompressed (threshold=100), make sure counter reflects it + EXPECT_GT(ccr_check.statistics().page_cache_decompressed_hit_counter, 0); + EXPECT_EQ(0, memcmp(rep.data, level_bytes.data(), rl)); + EXPECT_EQ(0, memcmp(def.data, level_bytes.data() + rl, dl)); +} + +TEST(ParquetPageCacheTest, MultiPagesMixedV1V2CacheHit) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + + // Prepare a v1 uncompressed page and a v2 uncompressed page and insert both into cache + std::string path = "test_multi_pages_file"; + + // v1 page + tparquet::PageHeader hdr1; + hdr1.type = tparquet::PageType::DATA_PAGE; + hdr1.__set_compressed_page_size(4); + hdr1.__set_uncompressed_page_size(4); + hdr1.__isset.data_page_header = true; + hdr1.data_page_header.__set_num_values(1); + std::vector header1_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&hdr1, &header1_bytes).ok()); + std::vector payload1 = {0x10, 0x20, 0x30, 0x40}; + std::vector cached1; + cached1.insert(cached1.end(), header1_bytes.begin(), header1_bytes.end()); + cached1.insert(cached1.end(), payload1.begin(), payload1.end()); + + // v2 page + tparquet::PageHeader hdr2; + hdr2.type = tparquet::PageType::DATA_PAGE_V2; + int rl = 2; + int dl = 1; + int payload2_sz = 2; + hdr2.__set_compressed_page_size(rl + dl + payload2_sz); + hdr2.__set_uncompressed_page_size(rl + dl + payload2_sz); + hdr2.__isset.data_page_header_v2 = true; + hdr2.data_page_header_v2.__set_repetition_levels_byte_length(rl); + hdr2.data_page_header_v2.__set_definition_levels_byte_length(dl); + hdr2.data_page_header_v2.__set_is_compressed(false); + hdr2.data_page_header_v2.__set_num_values(1); + std::vector header2_bytes; + ASSERT_TRUE(ts.serialize(&hdr2, &header2_bytes).ok()); + std::vector level_bytes = {0x11, 0x22, 0x33}; + std::vector payload2 = {0xAA, 0xBB}; + std::vector cached2; + cached2.insert(cached2.end(), header2_bytes.begin(), header2_bytes.end()); + cached2.insert(cached2.end(), level_bytes.begin(), level_bytes.end()); + cached2.insert(cached2.end(), payload2.begin(), payload2.end()); + + // Insert both pages into cache under different header offsets + size_t total1 = cached1.size(); + auto* page1 = new DataPage(total1, true, segment_v2::DATA_PAGE); + memcpy(page1->data(), cached1.data(), total1); + page1->reset_size(total1); + PageCacheHandle h1; + size_t header1_start = 128; + int64_t mtime = 0; + StoragePageCache::CacheKey key1(fmt::format("{}::{}", path, mtime), + static_cast(header1_start + total1), header1_start); + StoragePageCache::instance()->insert(key1, page1, &h1, segment_v2::DATA_PAGE); + + size_t total2 = cached2.size(); + auto* page2 = new DataPage(total2, true, segment_v2::DATA_PAGE); + memcpy(page2->data(), cached2.data(), total2); + page2->reset_size(total2); + PageCacheHandle h2; + size_t header2_start = 256; + StoragePageCache::CacheKey key2(fmt::format("{}::{}", path, mtime), + static_cast(header2_start + total2), header2_start); + StoragePageCache::instance()->insert(key2, page2, &h2, segment_v2::DATA_PAGE); + + // Now create readers that would lookup those cache keys + // Reader1 must expose header+page bytes at offset header1_start + std::vector reader_backing1(3000, 0); + memcpy(reader_backing1.data() + header1_start, cached1.data(), total1); + FakeBufferedReader reader1(path, reader_backing1); + tparquet::ColumnChunk cc1; + cc1.meta_data.__set_data_page_offset(128); + cc1.meta_data.__set_total_compressed_size(total1); + cc1.meta_data.__set_num_values(1); + cc1.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED); + FieldSchema field_schema1; + field_schema1.repetition_level = 0; + field_schema1.definition_level = 0; + ColumnChunkReader ccr1(&reader1, &cc1, &field_schema1, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr1.init().ok()); + ASSERT_TRUE(ccr1.load_page_data().ok()); + Slice s1 = ccr1.get_page_data(); + ASSERT_EQ(s1.size, payload1.size()); + EXPECT_EQ(0, memcmp(s1.data, payload1.data(), payload1.size())); + + std::vector reader_backing2(3000, 0); + memcpy(reader_backing2.data() + header2_start, cached2.data(), total2); + FakeBufferedReader reader2(path, reader_backing2); + tparquet::ColumnChunk cc2; + cc2.meta_data.__set_data_page_offset(256); + cc2.meta_data.__set_total_compressed_size(total2); + cc2.meta_data.__set_num_values(1); + cc2.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED); + FieldSchema field_schema2; + field_schema2.repetition_level = rl; + field_schema2.definition_level = dl; + ColumnChunkReader ccr2(&reader2, &cc2, &field_schema2, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr2.init().ok()); + ASSERT_TRUE(ccr2.load_page_data().ok()); + Slice s2 = ccr2.get_page_data(); + ASSERT_EQ(s2.size, payload2.size()); + EXPECT_EQ(0, memcmp(s2.data, payload2.data(), payload2.size())); + const Slice& rep = ccr2.v2_rep_levels(); + const Slice& def = ccr2.v2_def_levels(); + ASSERT_EQ(rep.size, rl); + ASSERT_EQ(def.size, dl); + EXPECT_EQ(0, memcmp(rep.data, level_bytes.data(), rl)); + EXPECT_EQ(0, memcmp(def.data, level_bytes.data() + rl, dl)); +} + +TEST(ParquetPageCacheTest, CacheMissThenHit) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + + // uncompressed v1 page + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE; + header.__set_compressed_page_size(4); + header.__set_uncompressed_page_size(4); + header.__isset.data_page_header = true; + header.data_page_header.__set_num_values(1); + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + std::vector payload = {0xDE, 0xAD, 0xBE, 0xEF}; + std::vector backing(256, 0); + std::vector cached; + cached.insert(cached.end(), header_bytes.begin(), header_bytes.end()); + cached.insert(cached.end(), payload.begin(), payload.end()); + int64_t header_offset = 64; + memcpy(backing.data() + header_offset, cached.data(), cached.size()); + + std::string path = "test_miss_then_hit"; + FakeBufferedReader reader(path, backing); + + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(header_offset); + cc.meta_data.__set_total_compressed_size(cached.size()); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED); + + FieldSchema fs; + fs.repetition_level = 0; + fs.definition_level = 0; + + // First reader: should not hit cache, but should write cache + ColumnChunkReader ccr(&reader, &cc, &fs, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr.init().ok()); + ASSERT_TRUE(ccr.load_page_data().ok()); + auto& statistics = ccr.statistics(); + EXPECT_EQ(statistics.page_cache_hit_counter, 0); + EXPECT_EQ(statistics.page_cache_write_counter, 1); + + // Second reader: should hit cache + ColumnChunkReader ccr2(&reader, &cc, &fs, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr2.init().ok()); + ASSERT_TRUE(ccr2.load_page_data().ok()); + auto& statistics2 = ccr2.statistics(); + EXPECT_EQ(statistics2.page_cache_hit_counter, 1); + EXPECT_EQ(statistics2.page_cache_decompressed_hit_counter, 1); +} + +TEST(ParquetPageCacheTest, DecompressThresholdCachesCompressed) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + + // prepare a compressible payload (lots of zeros) + std::vector payload(1024, 0); + + // compress payload using snappy + BlockCompressionCodec* codec = nullptr; + ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, &codec).ok()); + faststring compressed_fast; + std::vector inputs; + inputs.emplace_back(payload.data(), payload.size()); + ASSERT_TRUE(codec->compress(inputs, payload.size(), &compressed_fast).ok()); + + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE; + header.__set_compressed_page_size(static_cast(compressed_fast.size())); + header.__set_uncompressed_page_size(static_cast(payload.size())); + header.__isset.data_page_header = true; + header.data_page_header.__set_num_values(1); + + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + + std::vector file_data; + file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end()); + file_data.insert(file_data.end(), compressed_fast.data(), + compressed_fast.data() + compressed_fast.size()); + + std::string path = "test_threshold_file_compressed"; + FakeBufferedReader reader(path, file_data); + + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(0); + cc.meta_data.__set_total_compressed_size(file_data.size()); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY); + + FieldSchema fs; + fs.repetition_level = 0; + fs.definition_level = 0; + + // Case: very small threshold -> cache the compressed payload (smaller footprint) + double old_thresh = config::parquet_page_cache_decompress_threshold; + bool old_enable_compressed = config::enable_parquet_cache_compressed_pages; + config::parquet_page_cache_decompress_threshold = 0.1; + config::enable_parquet_cache_compressed_pages = true; + ColumnChunkReader ccr_small_thresh(&reader, &cc, &fs, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr_small_thresh.init().ok()); + // ASSERT_TRUE(ccr_small_thresh.next_page().ok()); + ASSERT_TRUE(ccr_small_thresh.load_page_data().ok()); + EXPECT_EQ(ccr_small_thresh.statistics().page_cache_write_counter, 1); + + // Inspect cache entry: payload stored should be compressed size + PageCacheHandle handle_small; + size_t file_end = header_bytes.size() + compressed_fast.size(); + int64_t mtime = 0; + StoragePageCache::CacheKey key_small(fmt::format("{}::{}", path, mtime), + /*file_end_offset*/ file_end, /*header_start*/ 0); + bool found_small = + StoragePageCache::instance()->lookup(key_small, &handle_small, segment_v2::DATA_PAGE); + ASSERT_TRUE(found_small); + Slice cached_small = handle_small.data(); + size_t header_size = header_bytes.size(); + size_t payload_in_cache_size = cached_small.size - header_size; // no levels here + ASSERT_EQ(payload_in_cache_size, compressed_fast.size()); + + // restore config + config::parquet_page_cache_decompress_threshold = old_thresh; + config::enable_parquet_cache_compressed_pages = old_enable_compressed; +} + +TEST(ParquetPageCacheTest, DecompressThresholdCachesDecompressed) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + + // prepare a compressible payload (lots of zeros) + std::vector payload(1024, 0); + + // compress payload using snappy + BlockCompressionCodec* codec = nullptr; + ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, &codec).ok()); + faststring compressed_fast; + std::vector inputs; + inputs.emplace_back(payload.data(), payload.size()); + ASSERT_TRUE(codec->compress(inputs, payload.size(), &compressed_fast).ok()); + + tparquet::PageHeader header; + header.type = tparquet::PageType::DATA_PAGE; + header.__set_compressed_page_size(static_cast(compressed_fast.size())); + header.__set_uncompressed_page_size(static_cast(payload.size())); + header.__isset.data_page_header = true; + header.data_page_header.__set_num_values(1); + + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok()); + + std::vector file_data; + file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end()); + file_data.insert(file_data.end(), compressed_fast.data(), + compressed_fast.data() + compressed_fast.size()); + + std::string path = "test_threshold_file_decompressed"; + FakeBufferedReader reader(path, file_data); + + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(0); + cc.meta_data.__set_total_compressed_size(file_data.size()); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY); + + FieldSchema fs; + fs.repetition_level = 0; + fs.definition_level = 0; + + // Case: very large threshold -> cache decompressed payload + double old_thresh = config::parquet_page_cache_decompress_threshold; + bool old_enable_compressed = config::enable_parquet_cache_compressed_pages; + config::parquet_page_cache_decompress_threshold = 100.0; + config::enable_parquet_cache_compressed_pages = false; + ColumnChunkReader ccr_large_thresh(&reader, &cc, &fs, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr_large_thresh.init().ok()); + // ASSERT_TRUE(ccr_large_thresh.next_page().ok()); + ASSERT_TRUE(ccr_large_thresh.load_page_data().ok()); + EXPECT_EQ(ccr_large_thresh.statistics().page_cache_write_counter, 1); + + // Inspect cache entry for large threshold: payload stored should be uncompressed size + PageCacheHandle handle_large; + size_t file_end = header_bytes.size() + compressed_fast.size(); + int64_t mtime = 0; + StoragePageCache::CacheKey key_large(fmt::format("{}::{}", path, mtime), + /*file_end_offset*/ file_end, /*header_start*/ 0); + bool found_large = + StoragePageCache::instance()->lookup(key_large, &handle_large, segment_v2::DATA_PAGE); + ASSERT_TRUE(found_large); + Slice cached_large = handle_large.data(); + size_t payload_in_cache_size_large = cached_large.size - header_bytes.size(); + ASSERT_EQ(payload_in_cache_size_large, payload.size()); + + // Verify cache hit for a new reader (should hit the decompressed entry we just created) + ColumnChunkReader ccr_check(&reader, &cc, &fs, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr_check.init().ok()); + // ASSERT_TRUE(ccr_check.next_page().ok()); + ASSERT_TRUE(ccr_check.load_page_data().ok()); + EXPECT_EQ(ccr_check.statistics().page_cache_hit_counter, 1); + // restore config + config::parquet_page_cache_decompress_threshold = old_thresh; + config::enable_parquet_cache_compressed_pages = old_enable_compressed; +} + +TEST(ParquetPageCacheTest, MultipleReadersShareCachedEntry) { + ParquetPageReadContext ctx; + ctx.enable_parquet_file_page_cache = true; + double old_thresh = config::parquet_page_cache_decompress_threshold; + bool old_enable_compressed = config::enable_parquet_cache_compressed_pages; + config::parquet_page_cache_decompress_threshold = 100.0; + config::enable_parquet_cache_compressed_pages = false; + + // Create a v2 cached page and then instantiate multiple readers that hit the cache + std::string path = "test_shared_handles"; + tparquet::PageHeader hdr; + hdr.type = tparquet::PageType::DATA_PAGE_V2; + int rl = 2; + int dl = 1; + hdr.__isset.data_page_header_v2 = true; + hdr.data_page_header_v2.__set_repetition_levels_byte_length(rl); + hdr.data_page_header_v2.__set_definition_levels_byte_length(dl); + hdr.data_page_header_v2.__set_is_compressed(false); + hdr.data_page_header_v2.__set_num_values(1); + std::vector header_bytes; + ThriftSerializer ts(/*compact*/ true, /*initial*/ 256); + ASSERT_TRUE(ts.serialize(&hdr, &header_bytes).ok()); + std::vector level_bytes = {0x11, 0x22, 0x33}; + std::vector payload = {0x0A, 0x0B}; + std::vector cached; + cached.insert(cached.end(), header_bytes.begin(), header_bytes.end()); + cached.insert(cached.end(), level_bytes.begin(), level_bytes.end()); + cached.insert(cached.end(), payload.begin(), payload.end()); + + size_t total = cached.size(); + auto* page = new DataPage(total, true, segment_v2::DATA_PAGE); + memcpy(page->data(), cached.data(), total); + page->reset_size(total); + PageCacheHandle handle; + size_t header_start = 512; + int64_t mtime = 0; + StoragePageCache::CacheKey key(fmt::format("{}::{}", path, mtime), + static_cast(header_start + total), header_start); + StoragePageCache::instance()->insert(key, page, &handle, segment_v2::DATA_PAGE); + + // Create multiple readers that will hit cache + const int N = 4; + for (int i = 0; i < N; ++i) { + std::vector reader_backing(5000, 0); + memcpy(reader_backing.data() + header_start, cached.data(), total); + FakeBufferedReader reader(path, reader_backing); + tparquet::ColumnChunk cc; + cc.meta_data.__set_data_page_offset(512); + cc.meta_data.__set_total_compressed_size(total); + cc.meta_data.__set_num_values(1); + cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED); + FieldSchema fs; + fs.repetition_level = rl; + fs.definition_level = dl; + ColumnChunkReader ccr(&reader, &cc, &fs, nullptr, 0, nullptr, ctx); + ASSERT_TRUE(ccr.init().ok()); + ASSERT_TRUE(ccr.load_page_data().ok()); + Slice s = ccr.get_page_data(); + ASSERT_EQ(s.size, payload.size()); + EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size())); + const Slice& rep = ccr.v2_rep_levels(); + const Slice& def = ccr.v2_def_levels(); + ASSERT_EQ(rep.size, rl); + ASSERT_EQ(def.size, dl); + } + // restore config + config::parquet_page_cache_decompress_threshold = old_thresh; + config::enable_parquet_cache_compressed_pages = old_enable_compressed; +} diff --git a/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp index 1d9c62ebfad851..e3695b2b54544e 100644 --- a/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp @@ -197,8 +197,9 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column io::BufferedFileStreamReader stream_reader(file_reader, start_offset, chunk_size, 1024); + ParquetPageReadContext page_read_ctx; ColumnChunkReader chunk_reader(&stream_reader, column_chunk, field_schema, - nullptr, total_rows, nullptr); + nullptr, total_rows, nullptr, page_read_ctx); // initialize chunk reader static_cast(chunk_reader.init()); // seek to next page header diff --git a/be/test/vec/exec/orc/orc_file_reader_test.cpp b/be/test/vec/exec/orc/orc_file_reader_test.cpp index 9e1003c397f07f..4c71129cdbbc3d 100644 --- a/be/test/vec/exec/orc/orc_file_reader_test.cpp +++ b/be/test/vec/exec/orc/orc_file_reader_test.cpp @@ -41,6 +41,8 @@ class MockFileReader : public io::FileReader { bool closed() const override { return _closed; } + int64_t mtime() const override { return 0; } + void set_data(const std::string& data) { _data = data; } protected: diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 9defb0cbf08584..526014569fe56a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -505,6 +505,7 @@ public class SessionVariable implements Serializable, Writable { public static final String SHOW_USER_DEFAULT_ROLE = "show_user_default_role"; public static final String ENABLE_PAGE_CACHE = "enable_page_cache"; + public static final String ENABLE_PARQUET_FILE_PAGE_CACHE = "enable_parquet_file_page_cache"; public static final String MINIDUMP_PATH = "minidump_path"; @@ -2228,6 +2229,13 @@ public boolean isEnableHboNonStrictMatchingMode() { needForward = true) public boolean enablePageCache = true; + @VariableMgr.VarAttr( + name = ENABLE_PARQUET_FILE_PAGE_CACHE, + description = {"控制是否启用 Parquet file page cache。默认为 true。", + "Controls whether to use Parquet file page cache. The default is true."}, + needForward = true) + public boolean enableParquetFilePageCache = true; + @VariableMgr.VarAttr(name = ENABLE_FOLD_NONDETERMINISTIC_FN) public boolean enableFoldNondeterministicFn = false; @@ -5107,6 +5115,8 @@ public TQueryOptions toThrift() { tResult.setEnablePageCache(enablePageCache); + tResult.setEnableParquetFilePageCache(enableParquetFilePageCache); + tResult.setFileCacheBasePath(fileCacheBasePath); tResult.setEnableInvertedIndexQuery(enableInvertedIndexQuery); @@ -5121,6 +5131,8 @@ public TQueryOptions toThrift() { tResult.setEnableOrcLazyMat(enableOrcLazyMat); tResult.setEnableParquetFilterByMinMax(enableParquetFilterByMinMax); tResult.setEnableParquetFilterByBloomFilter(enableParquetFilterByBloomFilter); + + tResult.setEnableParquetFilePageCache(enableParquetFilePageCache); tResult.setEnableOrcFilterByMinMax(enableOrcFilterByMinMax); tResult.setCheckOrcInitSargsSuccess(checkOrcInitSargsSuccess); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 3217eeacb06874..678e1a067e261e 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -436,6 +436,8 @@ struct TQueryOptions { // hash table expansion thresholds since all data is local. 202: optional bool single_backend_query = false; + 185: optional bool enable_parquet_file_page_cache = true; + // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query.