From 941be828037689292aeec6bcefaec2c211696563 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 2 Jun 2026 22:37:51 +0800 Subject: [PATCH] [fix](be) Use shared IOContext in file scanner readers ### What problem does this PR solve? Issue Number: close #25905 Related PR: None Problem Summary: FileScanner kept passing raw IOContext pointers to several file readers, so DelegateReader could still create a shallow-copied IOContext on the hot scan path. That left different IOContext instances inside the same reader stack and could also dereference missing child stats pointers when an IOContext existed without file reader stats. This change keeps FileScanner's IOContext in a shared holder, passes it through CSV, text, JSON, native, Parquet, ORC, and table-format reader variants, and makes Native/Parquet/ORC use the shared DelegateReader API when a holder is available. Tracing/stat updates now check the nested stats pointer before use. ### Release note None ### Check List (For Author) - Test: Manual test - Ran build-support/clang-format.sh - Ran git diff --cached --check - Tried ./build.sh --be, but local JAVA_HOME points to JDK 11 and the build script requires JDK 17 - Behavior changed: No - Does this need documentation: No --- be/src/exec/scan/file_scanner.cpp | 61 +++++++++---------- be/src/exec/scan/file_scanner.h | 4 +- be/src/format/arrow/arrow_stream_reader.cpp | 3 +- be/src/format/csv/csv_reader.cpp | 3 +- be/src/format/json/new_json_reader.cpp | 3 +- be/src/format/native/native_reader.cpp | 25 ++++++-- be/src/format/native/native_reader.h | 6 ++ be/src/format/orc/vorc_reader.cpp | 7 ++- be/src/format/orc/vorc_reader.h | 7 ++- be/src/format/parquet/vparquet_reader.cpp | 21 +++++-- be/src/format/table/hive_reader.h | 19 ++++++ be/src/format/table/hudi_reader.h | 13 ++++ be/src/format/table/iceberg_reader.h | 15 +++++ be/src/format/table/paimon_reader.h | 21 +++++++ .../table/transactional_hive_reader.cpp | 17 ++++++ .../format/table/transactional_hive_reader.h | 8 +++ be/src/format/text/text_reader.cpp | 6 +- be/src/format/text/text_reader.h | 3 +- 18 files changed, 186 insertions(+), 56 deletions(-) diff --git a/be/src/exec/scan/file_scanner.cpp b/be/src/exec/scan/file_scanner.cpp index c0a79f9f38d9ec..5a0955fb55ffe0 100644 --- a/be/src/exec/scan/file_scanner.cpp +++ b/be/src/exec/scan/file_scanner.cpp @@ -1197,9 +1197,9 @@ Status FileScanner::_get_next_reader() { case TFileFormatType::FORMAT_CSV_DEFLATE: case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: case TFileFormatType::FORMAT_PROTO: { - auto reader = - CsvReader::create_unique(_state, _profile, &_counter, *_params, range, - _file_slot_descs, _state->batch_size(), _io_ctx.get()); + auto reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range, + _file_slot_descs, _state->batch_size(), nullptr, + _io_ctx); CsvInitContext csv_ctx; _fill_base_init_context(&csv_ctx); csv_ctx.is_load = _is_load; @@ -1209,8 +1209,8 @@ Status FileScanner::_get_next_reader() { } case TFileFormatType::FORMAT_TEXT: { auto reader = TextReader::create_unique(_state, _profile, &_counter, *_params, range, - _file_slot_descs, _state->batch_size(), - _io_ctx.get()); + _file_slot_descs, _state->batch_size(), nullptr, + _io_ctx); CsvInitContext text_ctx; _fill_base_init_context(&text_ctx); text_ctx.is_load = _is_load; @@ -1221,7 +1221,7 @@ Status FileScanner::_get_next_reader() { case TFileFormatType::FORMAT_JSON: { _cur_reader = NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range, _file_slot_descs, &_scanner_eof, - _state->batch_size(), _io_ctx.get()); + _state->batch_size(), nullptr, _io_ctx); JsonInitContext json_ctx; _fill_base_init_context(&json_ctx); json_ctx.col_default_value_ctx = &_col_default_value_ctx; @@ -1239,8 +1239,7 @@ Status FileScanner::_get_next_reader() { break; } case TFileFormatType::FORMAT_NATIVE: { - auto reader = - NativeReader::create_unique(_profile, *_params, range, _io_ctx.get(), _state); + auto reader = NativeReader::create_unique(_profile, *_params, range, _io_ctx, _state); ReaderInitContext native_ctx; _fill_base_init_context(&native_ctx); init_status = static_cast(reader.get())->init_reader(&native_ctx); @@ -1382,7 +1381,7 @@ Status FileScanner::_init_parquet_reader(FileMetaCache* file_meta_cache_ptr, // IcebergParquetReader IS-A ParquetReader (CRTP mixin), no wrapping needed std::unique_ptr iceberg_reader = IcebergParquetReader::create_unique( _kv_cache, _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), - _io_ctx.get(), _state, file_meta_cache_ptr); + _io_ctx, _state, file_meta_cache_ptr); iceberg_reader->set_create_row_id_column_iterator_func( [this]() -> std::shared_ptr { return _create_row_id_column_iterator(); @@ -1394,21 +1393,21 @@ Status FileScanner::_init_parquet_reader(FileMetaCache* file_meta_cache_ptr, // PaimonParquetReader IS-A ParquetReader, no wrapping needed auto paimon_reader = PaimonParquetReader::create_unique( _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), _kv_cache, - _io_ctx.get(), _state, file_meta_cache_ptr); + _io_ctx, _state, file_meta_cache_ptr); init_status = static_cast(paimon_reader.get())->init_reader(&pctx); _cur_reader = std::move(paimon_reader); } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "hudi") { // HudiParquetReader IS-A ParquetReader, no wrapping needed auto hudi_reader = HudiParquetReader::create_unique( - _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), - _io_ctx.get(), _state, file_meta_cache_ptr); + _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), _io_ctx, + _state, file_meta_cache_ptr); init_status = static_cast(hudi_reader.get())->init_reader(&pctx); _cur_reader = std::move(hudi_reader); } else if (range.table_format_params.table_format_type == "hive") { auto hive_reader = HiveParquetReader::create_unique( - _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), - _io_ctx.get(), _state, &_is_file_slot, file_meta_cache_ptr, + _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), _io_ctx, + _state, &_is_file_slot, file_meta_cache_ptr, _state->query_options().enable_parquet_lazy_mat); hive_reader->set_create_row_id_column_iterator_func( [this]() -> std::shared_ptr { @@ -1420,7 +1419,7 @@ Status FileScanner::_init_parquet_reader(FileMetaCache* file_meta_cache_ptr, if (!parquet_reader) { parquet_reader = ParquetReader::create_unique( _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), - _io_ctx.get(), _state, file_meta_cache_ptr, + _io_ctx, _state, file_meta_cache_ptr, _state->query_options().enable_parquet_lazy_mat); } parquet_reader->set_create_row_id_column_iterator_func( @@ -1433,7 +1432,7 @@ Status FileScanner::_init_parquet_reader(FileMetaCache* file_meta_cache_ptr, if (!parquet_reader) { parquet_reader = ParquetReader::create_unique( _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), - _io_ctx.get(), _state, file_meta_cache_ptr, + _io_ctx, _state, file_meta_cache_ptr, _state->query_options().enable_parquet_lazy_mat); } init_status = static_cast(parquet_reader.get())->init_reader(&pctx); @@ -1460,7 +1459,7 @@ Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr, // TransactionalHiveReader IS-A OrcReader, no wrapping needed auto tran_orc_reader = TransactionalHiveReader::create_unique( _profile, _state, *_params, range, _state->batch_size(), _state->timezone(), - _io_ctx.get(), file_meta_cache_ptr); + _io_ctx, file_meta_cache_ptr); tran_orc_reader->set_create_row_id_column_iterator_func( [this]() -> std::shared_ptr { return _create_row_id_column_iterator(); @@ -1473,7 +1472,7 @@ Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr, // IcebergOrcReader IS-A OrcReader (CRTP mixin), no wrapping needed std::unique_ptr iceberg_reader = IcebergOrcReader::create_unique( _kv_cache, _profile, _state, *_params, range, _state->batch_size(), - _state->timezone(), _io_ctx.get(), file_meta_cache_ptr); + _state->timezone(), _io_ctx, file_meta_cache_ptr); iceberg_reader->set_create_row_id_column_iterator_func( [this]() -> std::shared_ptr { return _create_row_id_column_iterator(); @@ -1486,7 +1485,7 @@ Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr, // PaimonOrcReader IS-A OrcReader, no wrapping needed auto paimon_reader = PaimonOrcReader::create_unique( _profile, _state, *_params, range, _state->batch_size(), _state->timezone(), - _kv_cache, _io_ctx.get(), file_meta_cache_ptr); + _kv_cache, _io_ctx, file_meta_cache_ptr); init_status = static_cast(paimon_reader.get())->init_reader(&octx); _cur_reader = std::move(paimon_reader); @@ -1495,7 +1494,7 @@ Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr, // HudiOrcReader IS-A OrcReader, no wrapping needed auto hudi_reader = HudiOrcReader::create_unique(_profile, _state, *_params, range, _state->batch_size(), _state->timezone(), - _io_ctx.get(), file_meta_cache_ptr); + _io_ctx, file_meta_cache_ptr); init_status = static_cast(hudi_reader.get())->init_reader(&octx); _cur_reader = std::move(hudi_reader); @@ -1503,7 +1502,7 @@ Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr, range.table_format_params.table_format_type == "hive") { auto hive_reader = HiveOrcReader::create_unique( _profile, _state, *_params, range, _state->batch_size(), _state->timezone(), - _io_ctx.get(), &_is_file_slot, file_meta_cache_ptr, + _io_ctx, &_is_file_slot, file_meta_cache_ptr, _state->query_options().enable_orc_lazy_mat); hive_reader->set_create_row_id_column_iterator_func( [this]() -> std::shared_ptr { @@ -1515,10 +1514,9 @@ Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr, } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "tvf") { if (!orc_reader) { - orc_reader = OrcReader::create_unique(_profile, _state, *_params, range, - _state->batch_size(), _state->timezone(), - _io_ctx.get(), file_meta_cache_ptr, - _state->query_options().enable_orc_lazy_mat); + orc_reader = OrcReader::create_unique( + _profile, _state, *_params, range, _state->batch_size(), _state->timezone(), + _io_ctx, file_meta_cache_ptr, _state->query_options().enable_orc_lazy_mat); } orc_reader->set_create_row_id_column_iterator_func( [this]() -> std::shared_ptr { @@ -1528,10 +1526,9 @@ Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr, _cur_reader = std::move(orc_reader); } else if (_is_load) { if (!orc_reader) { - orc_reader = OrcReader::create_unique(_profile, _state, *_params, range, - _state->batch_size(), _state->timezone(), - _io_ctx.get(), file_meta_cache_ptr, - _state->query_options().enable_orc_lazy_mat); + orc_reader = OrcReader::create_unique( + _profile, _state, *_params, range, _state->batch_size(), _state->timezone(), + _io_ctx, file_meta_cache_ptr, _state->query_options().enable_orc_lazy_mat); } init_status = static_cast(orc_reader.get())->init_reader(&octx); _cur_reader = std::move(orc_reader); @@ -1631,8 +1628,8 @@ Status FileScanner::read_lines_from_range(const TFileRangeDesc& range, switch (format_type) { case TFileFormatType::FORMAT_PARQUET: { std::unique_ptr parquet_reader = ParquetReader::create_unique( - _profile, *_params, range, 1, &_state->timezone_obj(), _io_ctx.get(), - _state, file_meta_cache_ptr, false); + _profile, *_params, range, 1, &_state->timezone_obj(), _io_ctx, _state, + file_meta_cache_ptr, false); RETURN_IF_ERROR( _init_parquet_reader(file_meta_cache_ptr, std::move(parquet_reader))); // _init_parquet_reader may create a new table-format specific reader @@ -1643,7 +1640,7 @@ Status FileScanner::read_lines_from_range(const TFileRangeDesc& range, } case TFileFormatType::FORMAT_ORC: { std::unique_ptr orc_reader = OrcReader::create_unique( - _profile, _state, *_params, range, 1, _state->timezone(), _io_ctx.get(), + _profile, _state, *_params, range, 1, _state->timezone(), _io_ctx, file_meta_cache_ptr, false); RETURN_IF_ERROR(_init_orc_reader(file_meta_cache_ptr, std::move(orc_reader))); // Same as above: re-apply read_by_rows to the actual _cur_reader. diff --git a/be/src/exec/scan/file_scanner.h b/be/src/exec/scan/file_scanner.h index cd4066ec987ad8..fb3c291e5ce3aa 100644 --- a/be/src/exec/scan/file_scanner.h +++ b/be/src/exec/scan/file_scanner.h @@ -190,7 +190,7 @@ class FileScanner : public Scanner { std::unique_ptr _file_cache_statistics; std::unique_ptr _file_reader_stats; - std::unique_ptr _io_ctx; + std::shared_ptr _io_ctx; // Whether to fill partition columns from path, default is true. bool _fill_partition_from_path = true; @@ -294,7 +294,7 @@ class FileScanner : public Scanner { }; Status _init_io_ctx() { - _io_ctx.reset(new io::IOContext()); + _io_ctx = std::make_shared(); _io_ctx->query_id = &_state->query_id(); return Status::OK(); }; diff --git a/be/src/format/arrow/arrow_stream_reader.cpp b/be/src/format/arrow/arrow_stream_reader.cpp index 7d496d803a6248..7000c55507b63c 100644 --- a/be/src/format/arrow/arrow_stream_reader.cpp +++ b/be/src/format/arrow/arrow_stream_reader.cpp @@ -57,7 +57,8 @@ ArrowStreamReader::~ArrowStreamReader() = default; Status ArrowStreamReader::init_reader() { io::FileReaderSPtr file_reader; RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &file_reader, _state, false)); - _file_reader = _io_ctx ? std::make_shared(std::move(file_reader), + _file_reader = _io_ctx && _io_ctx->file_reader_stats + ? std::make_shared(std::move(file_reader), _io_ctx->file_reader_stats) : file_reader; _pip_stream = ArrowPipInputStream::create_unique(_file_reader); diff --git a/be/src/format/csv/csv_reader.cpp b/be/src/format/csv/csv_reader.cpp index 266f569acbe9ae..731f9e61049713 100644 --- a/be/src/format/csv/csv_reader.cpp +++ b/be/src/format/csv/csv_reader.cpp @@ -666,7 +666,8 @@ Status CsvReader::_create_file_reader(bool need_schema) { io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx, io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size))); } - _file_reader = _io_ctx ? std::make_shared(std::move(file_reader), + _file_reader = _io_ctx && _io_ctx->file_reader_stats + ? std::make_shared(std::move(file_reader), _io_ctx->file_reader_stats) : file_reader; } diff --git a/be/src/format/json/new_json_reader.cpp b/be/src/format/json/new_json_reader.cpp index cc5208b7c30bf0..81ceab2f6b879a 100644 --- a/be/src/format/json/new_json_reader.cpp +++ b/be/src/format/json/new_json_reader.cpp @@ -513,7 +513,8 @@ Status NewJsonReader::_open_file_reader(bool need_schema) { io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx, io::PrefetchRange(_range.start_offset, _range.size))); } - _file_reader = _io_ctx ? std::make_shared(std::move(file_reader), + _file_reader = _io_ctx && _io_ctx->file_reader_stats + ? std::make_shared(std::move(file_reader), _io_ctx->file_reader_stats) : file_reader; } diff --git a/be/src/format/native/native_reader.cpp b/be/src/format/native/native_reader.cpp index cdf742c6925615..029d7ff2024f20 100644 --- a/be/src/format/native/native_reader.cpp +++ b/be/src/format/native/native_reader.cpp @@ -19,6 +19,8 @@ #include +#include + #include "core/block/block.h" #include "format/native/native_format.h" #include "io/file_factory.h" @@ -38,6 +40,16 @@ NativeReader::NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& _io_ctx(io_ctx), _state(state) {} +NativeReader::NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, + std::shared_ptr io_ctx_holder, RuntimeState* state) + : _profile(profile), + _scan_params(params), + _scan_range(range), + _io_ctx(io_ctx_holder ? io_ctx_holder.get() : nullptr), + _io_ctx_holder(std::move(io_ctx_holder)), + _state(state) {} + NativeReader::~NativeReader() { (void)close(); } @@ -127,15 +139,20 @@ Status NativeReader::init_reader() { io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state, file_description); - auto reader_res = io::DelegateReader::create_file_reader( - _profile, system_properties, file_description, reader_options, - io::DelegateReader::AccessMode::RANDOM, _io_ctx); + auto reader_res = + _io_ctx_holder ? io::DelegateReader::create_file_reader( + _profile, system_properties, file_description, reader_options, + io::DelegateReader::AccessMode::RANDOM, + std::static_pointer_cast(_io_ctx_holder)) + : io::DelegateReader::create_file_reader( + _profile, system_properties, file_description, reader_options, + io::DelegateReader::AccessMode::RANDOM, _io_ctx); if (!reader_res.has_value()) { return reader_res.error(); } _file_reader = reader_res.value(); - if (_io_ctx) { + if (_io_ctx && _io_ctx->file_reader_stats) { _file_reader = std::make_shared(_file_reader, _io_ctx->file_reader_stats); } diff --git a/be/src/format/native/native_reader.h b/be/src/format/native/native_reader.h index 796340c4d43e8d..83d15493310040 100644 --- a/be/src/format/native/native_reader.h +++ b/be/src/format/native/native_reader.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -51,6 +52,10 @@ class NativeReader : public TableFormatReader { NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params, const TFileRangeDesc& range, io::IOContext* io_ctx, RuntimeState* state); + NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, std::shared_ptr io_ctx_holder, + RuntimeState* state); + ~NativeReader() override; // Initialize underlying file reader and any format specific state. @@ -80,6 +85,7 @@ class NativeReader : public TableFormatReader { io::FileReaderSPtr _file_reader; io::IOContext* _io_ctx = nullptr; + std::shared_ptr _io_ctx_holder; RuntimeState* _state = nullptr; bool _eof = false; diff --git a/be/src/format/orc/vorc_reader.cpp b/be/src/format/orc/vorc_reader.cpp index f41b4d0a7e572e..2b79363e983588 100644 --- a/be/src/format/orc/vorc_reader.cpp +++ b/be/src/format/orc/vorc_reader.cpp @@ -2288,7 +2288,7 @@ Status OrcReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) _reader_metrics.SelectedRowGroupCount); COUNTER_UPDATE(_orc_profile.evaluated_row_group_count, _reader_metrics.EvaluatedRowGroupCount); - if (_io_ctx) { + if (_io_ctx && _io_ctx->file_reader_stats) { _io_ctx->file_reader_stats->read_rows += _reader_metrics.ReadRowCount; } } @@ -3442,7 +3442,7 @@ void ORCFileInputStream::_build_small_ranges_input_stripe_streams( std::make_shared(_profile, _file_reader, merged_range); std::shared_ptr tracing_file_reader; - if (_io_ctx) { + if (_io_ctx && _io_ctx->file_reader_stats) { tracing_file_reader = std::make_shared( std::move(merge_range_file_reader), _io_ctx->file_reader_stats); } else { @@ -3475,7 +3475,8 @@ void ORCFileInputStream::_build_large_ranges_input_stripe_streams( for (const auto& range : ranges) { auto stripe_stream_input_stream = std::make_shared( getName(), - _io_ctx ? std::make_shared(_file_reader, + _io_ctx && _io_ctx->file_reader_stats + ? std::make_shared(_file_reader, _io_ctx->file_reader_stats) : _file_reader, _io_ctx, _profile); diff --git a/be/src/format/orc/vorc_reader.h b/be/src/format/orc/vorc_reader.h index 6d9f74ae4a0ace..9f13726fd514e3 100644 --- a/be/src/format/orc/vorc_reader.h +++ b/be/src/format/orc/vorc_reader.h @@ -904,9 +904,10 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector { : _file_name(file_name), _inner_reader(inner_reader), _file_reader(inner_reader), - _tracing_file_reader(io_ctx ? std::make_shared( - _file_reader, io_ctx->file_reader_stats) - : _file_reader), + _tracing_file_reader(io_ctx && io_ctx->file_reader_stats + ? std::make_shared( + _file_reader, io_ctx->file_reader_stats) + : _file_reader), _orc_once_max_read_bytes(orc_once_max_read_bytes), _orc_max_merge_distance_bytes(orc_max_merge_distance_bytes), _io_ctx(io_ctx), diff --git a/be/src/format/parquet/vparquet_reader.cpp b/be/src/format/parquet/vparquet_reader.cpp index 8485d9e9d2a173..060e077cf5a579 100644 --- a/be/src/format/parquet/vparquet_reader.cpp +++ b/be/src/format/parquet/vparquet_reader.cpp @@ -315,10 +315,18 @@ Status ParquetReader::_open_file() { _scan_range.__isset.modification_time ? _scan_range.modification_time : 0; io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state, _file_description); - _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( - _profile, _system_properties, _file_description, reader_options, - io::DelegateReader::AccessMode::RANDOM, _io_ctx)); - _tracing_file_reader = _io_ctx ? std::make_shared( + if (_io_ctx_holder) { + _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( + _profile, _system_properties, _file_description, reader_options, + io::DelegateReader::AccessMode::RANDOM, + std::static_pointer_cast(_io_ctx_holder))); + } else { + _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( + _profile, _system_properties, _file_description, reader_options, + io::DelegateReader::AccessMode::RANDOM, _io_ctx)); + } + _tracing_file_reader = _io_ctx && _io_ctx->file_reader_stats + ? std::make_shared( _file_reader, _io_ctx->file_reader_stats) : _file_reader; } @@ -898,7 +906,7 @@ Status ParquetReader::_next_row_group_reader() { } _reader_statistics.read_rows += candidate_row_ranges.count(); - if (_io_ctx) { + if (_io_ctx && _io_ctx->file_reader_stats) { _io_ctx->file_reader_stats->read_rows += candidate_row_ranges.count(); } @@ -948,7 +956,8 @@ Status ParquetReader::_next_row_group_reader() { : _file_reader; } _current_group_reader.reset(new RowGroupReader( - _io_ctx ? std::make_shared(group_file_reader, + _io_ctx && _io_ctx->file_reader_stats + ? std::make_shared(group_file_reader, _io_ctx->file_reader_stats) : group_file_reader, _read_table_columns, _current_row_group_index.row_group_id, row_group, _ctz, _io_ctx, diff --git a/be/src/format/table/hive_reader.h b/be/src/format/table/hive_reader.h index 9bcaa0536e7374..31a577f2dd9faa 100644 --- a/be/src/format/table/hive_reader.h +++ b/be/src/format/table/hive_reader.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include "format/orc/vorc_reader.h" @@ -35,6 +36,15 @@ class HiveOrcReader final : public OrcReader, public TableSchemaChangeHelper { enable_lazy_mat), _is_file_slot(is_file_slot) {} + HiveOrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range, size_t batch_size, const std::string& ctz, + std::shared_ptr io_ctx_holder, + const std::set* is_file_slot, FileMetaCache* meta_cache = nullptr, + bool enable_lazy_mat = true) + : OrcReader(profile, state, params, range, batch_size, ctz, std::move(io_ctx_holder), + meta_cache, enable_lazy_mat), + _is_file_slot(is_file_slot) {} + ~HiveOrcReader() final = default; protected: @@ -62,6 +72,15 @@ class HiveParquetReader final : public ParquetReader, public TableSchemaChangeHe enable_lazy_mat), _is_file_slot(is_file_slot) {} + HiveParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, size_t batch_size, const cctz::time_zone* ctz, + std::shared_ptr io_ctx_holder, RuntimeState* state, + const std::set* is_file_slot, FileMetaCache* meta_cache = nullptr, + bool enable_lazy_mat = true) + : ParquetReader(profile, params, range, batch_size, ctz, std::move(io_ctx_holder), + state, meta_cache, enable_lazy_mat), + _is_file_slot(is_file_slot) {} + ~HiveParquetReader() final = default; protected: diff --git a/be/src/format/table/hudi_reader.h b/be/src/format/table/hudi_reader.h index 878f874b3edaea..4b032bc190a262 100644 --- a/be/src/format/table/hudi_reader.h +++ b/be/src/format/table/hudi_reader.h @@ -16,6 +16,7 @@ // under the License. #pragma once #include +#include #include #include "format/orc/vorc_reader.h" @@ -34,6 +35,12 @@ class HudiParquetReader final : public ParquetReader, public TableSchemaChangeHe FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat = true) : ParquetReader(profile, params, range, batch_size, ctz, io_ctx, state, meta_cache, enable_lazy_mat) {} + HudiParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, size_t batch_size, const cctz::time_zone* ctz, + std::shared_ptr io_ctx_holder, RuntimeState* state, + FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat = true) + : ParquetReader(profile, params, range, batch_size, ctz, std::move(io_ctx_holder), + state, meta_cache, enable_lazy_mat) {} ~HudiParquetReader() final = default; protected: @@ -50,6 +57,12 @@ class HudiOrcReader final : public OrcReader, public TableSchemaChangeHelper { bool enable_lazy_mat = true) : OrcReader(profile, state, params, range, batch_size, ctz, io_ctx, meta_cache, enable_lazy_mat) {} + HudiOrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range, size_t batch_size, const std::string& ctz, + std::shared_ptr io_ctx_holder, FileMetaCache* meta_cache = nullptr, + bool enable_lazy_mat = true) + : OrcReader(profile, state, params, range, batch_size, ctz, std::move(io_ctx_holder), + meta_cache, enable_lazy_mat) {} ~HudiOrcReader() final = default; protected: diff --git a/be/src/format/table/iceberg_reader.h b/be/src/format/table/iceberg_reader.h index ab28b57dfa5656..d4156964efe1a9 100644 --- a/be/src/format/table/iceberg_reader.h +++ b/be/src/format/table/iceberg_reader.h @@ -76,6 +76,14 @@ class IcebergParquetReader final : public IcebergReaderMixin { : IcebergReaderMixin(kv_cache, profile, params, range, batch_size, ctz, io_ctx, state, meta_cache) {} + IcebergParquetReader(ShardedKVCache* kv_cache, RuntimeProfile* profile, + const TFileScanRangeParams& params, const TFileRangeDesc& range, + size_t batch_size, const cctz::time_zone* ctz, + std::shared_ptr io_ctx_holder, RuntimeState* state, + FileMetaCache* meta_cache) + : IcebergReaderMixin(kv_cache, profile, params, range, batch_size, ctz, + std::move(io_ctx_holder), state, meta_cache) {} + void set_delete_rows() final { // Call ParquetReader's set_delete_rows(const vector*) ParquetReader::set_delete_rows(_iceberg_delete_rows); @@ -113,6 +121,13 @@ class IcebergOrcReader final : public IcebergReaderMixin { : IcebergReaderMixin(kv_cache, profile, state, params, range, batch_size, ctz, io_ctx, meta_cache) {} + IcebergOrcReader(ShardedKVCache* kv_cache, RuntimeProfile* profile, RuntimeState* state, + const TFileScanRangeParams& params, const TFileRangeDesc& range, + size_t batch_size, const std::string& ctz, + std::shared_ptr io_ctx_holder, FileMetaCache* meta_cache) + : IcebergReaderMixin(kv_cache, profile, state, params, range, batch_size, + ctz, std::move(io_ctx_holder), meta_cache) {} + void set_delete_rows() final { // Call OrcReader's set_position_delete_rowids this->set_position_delete_rowids(_iceberg_delete_rows); diff --git a/be/src/format/table/paimon_reader.h b/be/src/format/table/paimon_reader.h index 0a916dfc094e42..42a5d211a9bc04 100644 --- a/be/src/format/table/paimon_reader.h +++ b/be/src/format/table/paimon_reader.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include "format/orc/vorc_reader.h" @@ -42,6 +43,16 @@ class PaimonOrcReader final : public OrcReader, public TableSchemaChangeHelper { _kv_cache(kv_cache) { _init_paimon_profile(); } + PaimonOrcReader(RuntimeProfile* profile, RuntimeState* state, + const TFileScanRangeParams& params, const TFileRangeDesc& range, + size_t batch_size, const std::string& ctz, ShardedKVCache* kv_cache, + std::shared_ptr io_ctx_holder, + FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat = true) + : OrcReader(profile, state, params, range, batch_size, ctz, std::move(io_ctx_holder), + meta_cache, enable_lazy_mat), + _kv_cache(kv_cache) { + _init_paimon_profile(); + } ~PaimonOrcReader() final = default; protected: @@ -77,6 +88,16 @@ class PaimonParquetReader final : public ParquetReader, public TableSchemaChange _kv_cache(kv_cache) { _init_paimon_profile(); } + PaimonParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, size_t batch_size, const cctz::time_zone* ctz, + ShardedKVCache* kv_cache, std::shared_ptr io_ctx_holder, + RuntimeState* state, FileMetaCache* meta_cache = nullptr, + bool enable_lazy_mat = true) + : ParquetReader(profile, params, range, batch_size, ctz, std::move(io_ctx_holder), + state, meta_cache, enable_lazy_mat), + _kv_cache(kv_cache) { + _init_paimon_profile(); + } ~PaimonParquetReader() final = default; protected: diff --git a/be/src/format/table/transactional_hive_reader.cpp b/be/src/format/table/transactional_hive_reader.cpp index 2308d746b14b27..16b86bb6eca735 100644 --- a/be/src/format/table/transactional_hive_reader.cpp +++ b/be/src/format/table/transactional_hive_reader.cpp @@ -19,6 +19,8 @@ #include +#include + #include "core/data_type/data_type_factory.hpp" #include "format/orc/vorc_reader.h" #include "format/table/table_schema_change_helper.h" @@ -40,6 +42,21 @@ TransactionalHiveReader::TransactionalHiveReader(RuntimeProfile* profile, Runtim const std::string& ctz, io::IOContext* io_ctx, FileMetaCache* meta_cache) : OrcReader(profile, state, params, range, batch_size, ctz, io_ctx, meta_cache, false) { + _init_transactional_hive_profile(); +} + +TransactionalHiveReader::TransactionalHiveReader(RuntimeProfile* profile, RuntimeState* state, + const TFileScanRangeParams& params, + const TFileRangeDesc& range, size_t batch_size, + const std::string& ctz, + std::shared_ptr io_ctx_holder, + FileMetaCache* meta_cache) + : OrcReader(profile, state, params, range, batch_size, ctz, std::move(io_ctx_holder), + meta_cache, false) { + _init_transactional_hive_profile(); +} + +void TransactionalHiveReader::_init_transactional_hive_profile() { static const char* transactional_hive_profile = "TransactionalHiveProfile"; ADD_TIMER(get_profile(), transactional_hive_profile); _transactional_orc_profile.num_delete_files = ADD_CHILD_COUNTER( diff --git a/be/src/format/table/transactional_hive_reader.h b/be/src/format/table/transactional_hive_reader.h index 77adf88aad046f..1f8c26a7ff298f 100644 --- a/be/src/format/table/transactional_hive_reader.h +++ b/be/src/format/table/transactional_hive_reader.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -53,6 +54,11 @@ class TransactionalHiveReader final : public OrcReader, public TableSchemaChange const TFileScanRangeParams& params, const TFileRangeDesc& range, size_t batch_size, const std::string& ctz, io::IOContext* io_ctx, FileMetaCache* meta_cache = nullptr); + TransactionalHiveReader(RuntimeProfile* profile, RuntimeState* state, + const TFileScanRangeParams& params, const TFileRangeDesc& range, + size_t batch_size, const std::string& ctz, + std::shared_ptr io_ctx_holder, + FileMetaCache* meta_cache = nullptr); ~TransactionalHiveReader() final = default; protected: @@ -69,6 +75,8 @@ class TransactionalHiveReader final : public OrcReader, public TableSchemaChange Status on_after_read_block(Block* block, size_t* read_rows) override; private: + void _init_transactional_hive_profile(); + struct TransactionalHiveProfile { RuntimeProfile::Counter* num_delete_files = nullptr; RuntimeProfile::Counter* num_delete_rows = nullptr; diff --git a/be/src/format/text/text_reader.cpp b/be/src/format/text/text_reader.cpp index 2fd9b749fbc6d2..0e6a4f89d27656 100644 --- a/be/src/format/text/text_reader.cpp +++ b/be/src/format/text/text_reader.cpp @@ -22,6 +22,7 @@ #include #include +#include #include #include "common/compiler_util.h" // IWYU pragma: keep @@ -113,8 +114,9 @@ void HiveTextFieldSplitter::_split_field_multi_char(const Slice& line, TextReader::TextReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter, const TFileScanRangeParams& params, const TFileRangeDesc& range, const std::vector& file_slot_descs, size_t batch_size, - io::IOContext* io_ctx) - : CsvReader(state, profile, counter, params, range, file_slot_descs, batch_size, io_ctx) {} + io::IOContext* io_ctx, std::shared_ptr io_ctx_holder) + : CsvReader(state, profile, counter, params, range, file_slot_descs, batch_size, io_ctx, + std::move(io_ctx_holder)) {} Status TextReader::_init_options() { // get column_separator and line_delimiter diff --git a/be/src/format/text/text_reader.h b/be/src/format/text/text_reader.h index 677ede3f8bc6f3..c0cebf5da77ffd 100644 --- a/be/src/format/text/text_reader.h +++ b/be/src/format/text/text_reader.h @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -56,7 +57,7 @@ class TextReader : public CsvReader { TextReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter, const TFileScanRangeParams& params, const TFileRangeDesc& range, const std::vector& file_slot_descs, size_t batch_size, - io::IOContext* io_ctx); + io::IOContext* io_ctx, std::shared_ptr io_ctx_holder = nullptr); ~TextReader() override = default;