diff --git a/be/src/exec/scan/file_scanner.cpp b/be/src/exec/scan/file_scanner.cpp index c0a79f9f38d9ec..5a0955fb55ffe0 100644 --- a/be/src/exec/scan/file_scanner.cpp +++ b/be/src/exec/scan/file_scanner.cpp @@ -1197,9 +1197,9 @@ Status FileScanner::_get_next_reader() { case TFileFormatType::FORMAT_CSV_DEFLATE: case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: case TFileFormatType::FORMAT_PROTO: { - auto reader = - CsvReader::create_unique(_state, _profile, &_counter, *_params, range, - _file_slot_descs, _state->batch_size(), _io_ctx.get()); + auto reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range, + _file_slot_descs, _state->batch_size(), nullptr, + _io_ctx); CsvInitContext csv_ctx; _fill_base_init_context(&csv_ctx); csv_ctx.is_load = _is_load; @@ -1209,8 +1209,8 @@ Status FileScanner::_get_next_reader() { } case TFileFormatType::FORMAT_TEXT: { auto reader = TextReader::create_unique(_state, _profile, &_counter, *_params, range, - _file_slot_descs, _state->batch_size(), - _io_ctx.get()); + _file_slot_descs, _state->batch_size(), nullptr, + _io_ctx); CsvInitContext text_ctx; _fill_base_init_context(&text_ctx); text_ctx.is_load = _is_load; @@ -1221,7 +1221,7 @@ Status FileScanner::_get_next_reader() { case TFileFormatType::FORMAT_JSON: { _cur_reader = NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range, _file_slot_descs, &_scanner_eof, - _state->batch_size(), _io_ctx.get()); + _state->batch_size(), nullptr, _io_ctx); JsonInitContext json_ctx; _fill_base_init_context(&json_ctx); json_ctx.col_default_value_ctx = &_col_default_value_ctx; @@ -1239,8 +1239,7 @@ Status FileScanner::_get_next_reader() { break; } case TFileFormatType::FORMAT_NATIVE: { - auto reader = - NativeReader::create_unique(_profile, *_params, range, _io_ctx.get(), _state); + auto reader = NativeReader::create_unique(_profile, *_params, range, _io_ctx, _state); ReaderInitContext native_ctx; _fill_base_init_context(&native_ctx); init_status = static_cast(reader.get())->init_reader(&native_ctx); @@ -1382,7 +1381,7 @@ Status FileScanner::_init_parquet_reader(FileMetaCache* file_meta_cache_ptr, // IcebergParquetReader IS-A ParquetReader (CRTP mixin), no wrapping needed std::unique_ptr iceberg_reader = IcebergParquetReader::create_unique( _kv_cache, _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), - _io_ctx.get(), _state, file_meta_cache_ptr); + _io_ctx, _state, file_meta_cache_ptr); iceberg_reader->set_create_row_id_column_iterator_func( [this]() -> std::shared_ptr { return _create_row_id_column_iterator(); @@ -1394,21 +1393,21 @@ Status FileScanner::_init_parquet_reader(FileMetaCache* file_meta_cache_ptr, // PaimonParquetReader IS-A ParquetReader, no wrapping needed auto paimon_reader = PaimonParquetReader::create_unique( _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), _kv_cache, - _io_ctx.get(), _state, file_meta_cache_ptr); + _io_ctx, _state, file_meta_cache_ptr); init_status = static_cast(paimon_reader.get())->init_reader(&pctx); _cur_reader = std::move(paimon_reader); } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "hudi") { // HudiParquetReader IS-A ParquetReader, no wrapping needed auto hudi_reader = HudiParquetReader::create_unique( - _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), - _io_ctx.get(), _state, file_meta_cache_ptr); + _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), _io_ctx, + _state, file_meta_cache_ptr); init_status = static_cast(hudi_reader.get())->init_reader(&pctx); _cur_reader = std::move(hudi_reader); } else if (range.table_format_params.table_format_type == "hive") { auto hive_reader = HiveParquetReader::create_unique( - _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), - _io_ctx.get(), _state, &_is_file_slot, file_meta_cache_ptr, + _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), _io_ctx, + _state, &_is_file_slot, file_meta_cache_ptr, _state->query_options().enable_parquet_lazy_mat); hive_reader->set_create_row_id_column_iterator_func( [this]() -> std::shared_ptr { @@ -1420,7 +1419,7 @@ Status FileScanner::_init_parquet_reader(FileMetaCache* file_meta_cache_ptr, if (!parquet_reader) { parquet_reader = ParquetReader::create_unique( _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), - _io_ctx.get(), _state, file_meta_cache_ptr, + _io_ctx, _state, file_meta_cache_ptr, _state->query_options().enable_parquet_lazy_mat); } parquet_reader->set_create_row_id_column_iterator_func( @@ -1433,7 +1432,7 @@ Status FileScanner::_init_parquet_reader(FileMetaCache* file_meta_cache_ptr, if (!parquet_reader) { parquet_reader = ParquetReader::create_unique( _profile, *_params, range, _state->batch_size(), &_state->timezone_obj(), - _io_ctx.get(), _state, file_meta_cache_ptr, + _io_ctx, _state, file_meta_cache_ptr, _state->query_options().enable_parquet_lazy_mat); } init_status = static_cast(parquet_reader.get())->init_reader(&pctx); @@ -1460,7 +1459,7 @@ Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr, // TransactionalHiveReader IS-A OrcReader, no wrapping needed auto tran_orc_reader = TransactionalHiveReader::create_unique( _profile, _state, *_params, range, _state->batch_size(), _state->timezone(), - _io_ctx.get(), file_meta_cache_ptr); + _io_ctx, file_meta_cache_ptr); tran_orc_reader->set_create_row_id_column_iterator_func( [this]() -> std::shared_ptr { return _create_row_id_column_iterator(); @@ -1473,7 +1472,7 @@ Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr, // IcebergOrcReader IS-A OrcReader (CRTP mixin), no wrapping needed std::unique_ptr iceberg_reader = IcebergOrcReader::create_unique( _kv_cache, _profile, _state, *_params, range, _state->batch_size(), - _state->timezone(), _io_ctx.get(), file_meta_cache_ptr); + _state->timezone(), _io_ctx, file_meta_cache_ptr); iceberg_reader->set_create_row_id_column_iterator_func( [this]() -> std::shared_ptr { return _create_row_id_column_iterator(); @@ -1486,7 +1485,7 @@ Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr, // PaimonOrcReader IS-A OrcReader, no wrapping needed auto paimon_reader = PaimonOrcReader::create_unique( _profile, _state, *_params, range, _state->batch_size(), _state->timezone(), - _kv_cache, _io_ctx.get(), file_meta_cache_ptr); + _kv_cache, _io_ctx, file_meta_cache_ptr); init_status = static_cast(paimon_reader.get())->init_reader(&octx); _cur_reader = std::move(paimon_reader); @@ -1495,7 +1494,7 @@ Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr, // HudiOrcReader IS-A OrcReader, no wrapping needed auto hudi_reader = HudiOrcReader::create_unique(_profile, _state, *_params, range, _state->batch_size(), _state->timezone(), - _io_ctx.get(), file_meta_cache_ptr); + _io_ctx, file_meta_cache_ptr); init_status = static_cast(hudi_reader.get())->init_reader(&octx); _cur_reader = std::move(hudi_reader); @@ -1503,7 +1502,7 @@ Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr, range.table_format_params.table_format_type == "hive") { auto hive_reader = HiveOrcReader::create_unique( _profile, _state, *_params, range, _state->batch_size(), _state->timezone(), - _io_ctx.get(), &_is_file_slot, file_meta_cache_ptr, + _io_ctx, &_is_file_slot, file_meta_cache_ptr, _state->query_options().enable_orc_lazy_mat); hive_reader->set_create_row_id_column_iterator_func( [this]() -> std::shared_ptr { @@ -1515,10 +1514,9 @@ Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr, } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "tvf") { if (!orc_reader) { - orc_reader = OrcReader::create_unique(_profile, _state, *_params, range, - _state->batch_size(), _state->timezone(), - _io_ctx.get(), file_meta_cache_ptr, - _state->query_options().enable_orc_lazy_mat); + orc_reader = OrcReader::create_unique( + _profile, _state, *_params, range, _state->batch_size(), _state->timezone(), + _io_ctx, file_meta_cache_ptr, _state->query_options().enable_orc_lazy_mat); } orc_reader->set_create_row_id_column_iterator_func( [this]() -> std::shared_ptr { @@ -1528,10 +1526,9 @@ Status FileScanner::_init_orc_reader(FileMetaCache* file_meta_cache_ptr, _cur_reader = std::move(orc_reader); } else if (_is_load) { if (!orc_reader) { - orc_reader = OrcReader::create_unique(_profile, _state, *_params, range, - _state->batch_size(), _state->timezone(), - _io_ctx.get(), file_meta_cache_ptr, - _state->query_options().enable_orc_lazy_mat); + orc_reader = OrcReader::create_unique( + _profile, _state, *_params, range, _state->batch_size(), _state->timezone(), + _io_ctx, file_meta_cache_ptr, _state->query_options().enable_orc_lazy_mat); } init_status = static_cast(orc_reader.get())->init_reader(&octx); _cur_reader = std::move(orc_reader); @@ -1631,8 +1628,8 @@ Status FileScanner::read_lines_from_range(const TFileRangeDesc& range, switch (format_type) { case TFileFormatType::FORMAT_PARQUET: { std::unique_ptr parquet_reader = ParquetReader::create_unique( - _profile, *_params, range, 1, &_state->timezone_obj(), _io_ctx.get(), - _state, file_meta_cache_ptr, false); + _profile, *_params, range, 1, &_state->timezone_obj(), _io_ctx, _state, + file_meta_cache_ptr, false); RETURN_IF_ERROR( _init_parquet_reader(file_meta_cache_ptr, std::move(parquet_reader))); // _init_parquet_reader may create a new table-format specific reader @@ -1643,7 +1640,7 @@ Status FileScanner::read_lines_from_range(const TFileRangeDesc& range, } case TFileFormatType::FORMAT_ORC: { std::unique_ptr orc_reader = OrcReader::create_unique( - _profile, _state, *_params, range, 1, _state->timezone(), _io_ctx.get(), + _profile, _state, *_params, range, 1, _state->timezone(), _io_ctx, file_meta_cache_ptr, false); RETURN_IF_ERROR(_init_orc_reader(file_meta_cache_ptr, std::move(orc_reader))); // Same as above: re-apply read_by_rows to the actual _cur_reader. diff --git a/be/src/exec/scan/file_scanner.h b/be/src/exec/scan/file_scanner.h index cd4066ec987ad8..fb3c291e5ce3aa 100644 --- a/be/src/exec/scan/file_scanner.h +++ b/be/src/exec/scan/file_scanner.h @@ -190,7 +190,7 @@ class FileScanner : public Scanner { std::unique_ptr _file_cache_statistics; std::unique_ptr _file_reader_stats; - std::unique_ptr _io_ctx; + std::shared_ptr _io_ctx; // Whether to fill partition columns from path, default is true. bool _fill_partition_from_path = true; @@ -294,7 +294,7 @@ class FileScanner : public Scanner { }; Status _init_io_ctx() { - _io_ctx.reset(new io::IOContext()); + _io_ctx = std::make_shared(); _io_ctx->query_id = &_state->query_id(); return Status::OK(); }; diff --git a/be/src/format/arrow/arrow_stream_reader.cpp b/be/src/format/arrow/arrow_stream_reader.cpp index 7d496d803a6248..7000c55507b63c 100644 --- a/be/src/format/arrow/arrow_stream_reader.cpp +++ b/be/src/format/arrow/arrow_stream_reader.cpp @@ -57,7 +57,8 @@ ArrowStreamReader::~ArrowStreamReader() = default; Status ArrowStreamReader::init_reader() { io::FileReaderSPtr file_reader; RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &file_reader, _state, false)); - _file_reader = _io_ctx ? std::make_shared(std::move(file_reader), + _file_reader = _io_ctx && _io_ctx->file_reader_stats + ? std::make_shared(std::move(file_reader), _io_ctx->file_reader_stats) : file_reader; _pip_stream = ArrowPipInputStream::create_unique(_file_reader); diff --git a/be/src/format/csv/csv_reader.cpp b/be/src/format/csv/csv_reader.cpp index 266f569acbe9ae..731f9e61049713 100644 --- a/be/src/format/csv/csv_reader.cpp +++ b/be/src/format/csv/csv_reader.cpp @@ -666,7 +666,8 @@ Status CsvReader::_create_file_reader(bool need_schema) { io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx, io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size))); } - _file_reader = _io_ctx ? std::make_shared(std::move(file_reader), + _file_reader = _io_ctx && _io_ctx->file_reader_stats + ? std::make_shared(std::move(file_reader), _io_ctx->file_reader_stats) : file_reader; } diff --git a/be/src/format/json/new_json_reader.cpp b/be/src/format/json/new_json_reader.cpp index cc5208b7c30bf0..81ceab2f6b879a 100644 --- a/be/src/format/json/new_json_reader.cpp +++ b/be/src/format/json/new_json_reader.cpp @@ -513,7 +513,8 @@ Status NewJsonReader::_open_file_reader(bool need_schema) { io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx, io::PrefetchRange(_range.start_offset, _range.size))); } - _file_reader = _io_ctx ? std::make_shared(std::move(file_reader), + _file_reader = _io_ctx && _io_ctx->file_reader_stats + ? std::make_shared(std::move(file_reader), _io_ctx->file_reader_stats) : file_reader; } diff --git a/be/src/format/native/native_reader.cpp b/be/src/format/native/native_reader.cpp index cdf742c6925615..029d7ff2024f20 100644 --- a/be/src/format/native/native_reader.cpp +++ b/be/src/format/native/native_reader.cpp @@ -19,6 +19,8 @@ #include +#include + #include "core/block/block.h" #include "format/native/native_format.h" #include "io/file_factory.h" @@ -38,6 +40,16 @@ NativeReader::NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& _io_ctx(io_ctx), _state(state) {} +NativeReader::NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, + std::shared_ptr io_ctx_holder, RuntimeState* state) + : _profile(profile), + _scan_params(params), + _scan_range(range), + _io_ctx(io_ctx_holder ? io_ctx_holder.get() : nullptr), + _io_ctx_holder(std::move(io_ctx_holder)), + _state(state) {} + NativeReader::~NativeReader() { (void)close(); } @@ -127,15 +139,20 @@ Status NativeReader::init_reader() { io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state, file_description); - auto reader_res = io::DelegateReader::create_file_reader( - _profile, system_properties, file_description, reader_options, - io::DelegateReader::AccessMode::RANDOM, _io_ctx); + auto reader_res = + _io_ctx_holder ? io::DelegateReader::create_file_reader( + _profile, system_properties, file_description, reader_options, + io::DelegateReader::AccessMode::RANDOM, + std::static_pointer_cast(_io_ctx_holder)) + : io::DelegateReader::create_file_reader( + _profile, system_properties, file_description, reader_options, + io::DelegateReader::AccessMode::RANDOM, _io_ctx); if (!reader_res.has_value()) { return reader_res.error(); } _file_reader = reader_res.value(); - if (_io_ctx) { + if (_io_ctx && _io_ctx->file_reader_stats) { _file_reader = std::make_shared(_file_reader, _io_ctx->file_reader_stats); } diff --git a/be/src/format/native/native_reader.h b/be/src/format/native/native_reader.h index 796340c4d43e8d..83d15493310040 100644 --- a/be/src/format/native/native_reader.h +++ b/be/src/format/native/native_reader.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -51,6 +52,10 @@ class NativeReader : public TableFormatReader { NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params, const TFileRangeDesc& range, io::IOContext* io_ctx, RuntimeState* state); + NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, std::shared_ptr io_ctx_holder, + RuntimeState* state); + ~NativeReader() override; // Initialize underlying file reader and any format specific state. @@ -80,6 +85,7 @@ class NativeReader : public TableFormatReader { io::FileReaderSPtr _file_reader; io::IOContext* _io_ctx = nullptr; + std::shared_ptr _io_ctx_holder; RuntimeState* _state = nullptr; bool _eof = false; diff --git a/be/src/format/orc/vorc_reader.cpp b/be/src/format/orc/vorc_reader.cpp index f41b4d0a7e572e..2b79363e983588 100644 --- a/be/src/format/orc/vorc_reader.cpp +++ b/be/src/format/orc/vorc_reader.cpp @@ -2288,7 +2288,7 @@ Status OrcReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) _reader_metrics.SelectedRowGroupCount); COUNTER_UPDATE(_orc_profile.evaluated_row_group_count, _reader_metrics.EvaluatedRowGroupCount); - if (_io_ctx) { + if (_io_ctx && _io_ctx->file_reader_stats) { _io_ctx->file_reader_stats->read_rows += _reader_metrics.ReadRowCount; } } @@ -3442,7 +3442,7 @@ void ORCFileInputStream::_build_small_ranges_input_stripe_streams( std::make_shared(_profile, _file_reader, merged_range); std::shared_ptr tracing_file_reader; - if (_io_ctx) { + if (_io_ctx && _io_ctx->file_reader_stats) { tracing_file_reader = std::make_shared( std::move(merge_range_file_reader), _io_ctx->file_reader_stats); } else { @@ -3475,7 +3475,8 @@ void ORCFileInputStream::_build_large_ranges_input_stripe_streams( for (const auto& range : ranges) { auto stripe_stream_input_stream = std::make_shared( getName(), - _io_ctx ? std::make_shared(_file_reader, + _io_ctx && _io_ctx->file_reader_stats + ? std::make_shared(_file_reader, _io_ctx->file_reader_stats) : _file_reader, _io_ctx, _profile); diff --git a/be/src/format/orc/vorc_reader.h b/be/src/format/orc/vorc_reader.h index 6d9f74ae4a0ace..9f13726fd514e3 100644 --- a/be/src/format/orc/vorc_reader.h +++ b/be/src/format/orc/vorc_reader.h @@ -904,9 +904,10 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector { : _file_name(file_name), _inner_reader(inner_reader), _file_reader(inner_reader), - _tracing_file_reader(io_ctx ? std::make_shared( - _file_reader, io_ctx->file_reader_stats) - : _file_reader), + _tracing_file_reader(io_ctx && io_ctx->file_reader_stats + ? std::make_shared( + _file_reader, io_ctx->file_reader_stats) + : _file_reader), _orc_once_max_read_bytes(orc_once_max_read_bytes), _orc_max_merge_distance_bytes(orc_max_merge_distance_bytes), _io_ctx(io_ctx), diff --git a/be/src/format/parquet/vparquet_reader.cpp b/be/src/format/parquet/vparquet_reader.cpp index 8485d9e9d2a173..060e077cf5a579 100644 --- a/be/src/format/parquet/vparquet_reader.cpp +++ b/be/src/format/parquet/vparquet_reader.cpp @@ -315,10 +315,18 @@ Status ParquetReader::_open_file() { _scan_range.__isset.modification_time ? _scan_range.modification_time : 0; io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state, _file_description); - _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( - _profile, _system_properties, _file_description, reader_options, - io::DelegateReader::AccessMode::RANDOM, _io_ctx)); - _tracing_file_reader = _io_ctx ? std::make_shared( + if (_io_ctx_holder) { + _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( + _profile, _system_properties, _file_description, reader_options, + io::DelegateReader::AccessMode::RANDOM, + std::static_pointer_cast(_io_ctx_holder))); + } else { + _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( + _profile, _system_properties, _file_description, reader_options, + io::DelegateReader::AccessMode::RANDOM, _io_ctx)); + } + _tracing_file_reader = _io_ctx && _io_ctx->file_reader_stats + ? std::make_shared( _file_reader, _io_ctx->file_reader_stats) : _file_reader; } @@ -898,7 +906,7 @@ Status ParquetReader::_next_row_group_reader() { } _reader_statistics.read_rows += candidate_row_ranges.count(); - if (_io_ctx) { + if (_io_ctx && _io_ctx->file_reader_stats) { _io_ctx->file_reader_stats->read_rows += candidate_row_ranges.count(); } @@ -948,7 +956,8 @@ Status ParquetReader::_next_row_group_reader() { : _file_reader; } _current_group_reader.reset(new RowGroupReader( - _io_ctx ? std::make_shared(group_file_reader, + _io_ctx && _io_ctx->file_reader_stats + ? std::make_shared(group_file_reader, _io_ctx->file_reader_stats) : group_file_reader, _read_table_columns, _current_row_group_index.row_group_id, row_group, _ctz, _io_ctx, diff --git a/be/src/format/table/hive_reader.h b/be/src/format/table/hive_reader.h index 9bcaa0536e7374..31a577f2dd9faa 100644 --- a/be/src/format/table/hive_reader.h +++ b/be/src/format/table/hive_reader.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include "format/orc/vorc_reader.h" @@ -35,6 +36,15 @@ class HiveOrcReader final : public OrcReader, public TableSchemaChangeHelper { enable_lazy_mat), _is_file_slot(is_file_slot) {} + HiveOrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range, size_t batch_size, const std::string& ctz, + std::shared_ptr io_ctx_holder, + const std::set* is_file_slot, FileMetaCache* meta_cache = nullptr, + bool enable_lazy_mat = true) + : OrcReader(profile, state, params, range, batch_size, ctz, std::move(io_ctx_holder), + meta_cache, enable_lazy_mat), + _is_file_slot(is_file_slot) {} + ~HiveOrcReader() final = default; protected: @@ -62,6 +72,15 @@ class HiveParquetReader final : public ParquetReader, public TableSchemaChangeHe enable_lazy_mat), _is_file_slot(is_file_slot) {} + HiveParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, size_t batch_size, const cctz::time_zone* ctz, + std::shared_ptr io_ctx_holder, RuntimeState* state, + const std::set* is_file_slot, FileMetaCache* meta_cache = nullptr, + bool enable_lazy_mat = true) + : ParquetReader(profile, params, range, batch_size, ctz, std::move(io_ctx_holder), + state, meta_cache, enable_lazy_mat), + _is_file_slot(is_file_slot) {} + ~HiveParquetReader() final = default; protected: diff --git a/be/src/format/table/hudi_reader.h b/be/src/format/table/hudi_reader.h index 878f874b3edaea..4b032bc190a262 100644 --- a/be/src/format/table/hudi_reader.h +++ b/be/src/format/table/hudi_reader.h @@ -16,6 +16,7 @@ // under the License. #pragma once #include +#include #include #include "format/orc/vorc_reader.h" @@ -34,6 +35,12 @@ class HudiParquetReader final : public ParquetReader, public TableSchemaChangeHe FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat = true) : ParquetReader(profile, params, range, batch_size, ctz, io_ctx, state, meta_cache, enable_lazy_mat) {} + HudiParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, size_t batch_size, const cctz::time_zone* ctz, + std::shared_ptr io_ctx_holder, RuntimeState* state, + FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat = true) + : ParquetReader(profile, params, range, batch_size, ctz, std::move(io_ctx_holder), + state, meta_cache, enable_lazy_mat) {} ~HudiParquetReader() final = default; protected: @@ -50,6 +57,12 @@ class HudiOrcReader final : public OrcReader, public TableSchemaChangeHelper { bool enable_lazy_mat = true) : OrcReader(profile, state, params, range, batch_size, ctz, io_ctx, meta_cache, enable_lazy_mat) {} + HudiOrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range, size_t batch_size, const std::string& ctz, + std::shared_ptr io_ctx_holder, FileMetaCache* meta_cache = nullptr, + bool enable_lazy_mat = true) + : OrcReader(profile, state, params, range, batch_size, ctz, std::move(io_ctx_holder), + meta_cache, enable_lazy_mat) {} ~HudiOrcReader() final = default; protected: diff --git a/be/src/format/table/iceberg_reader.h b/be/src/format/table/iceberg_reader.h index ab28b57dfa5656..d4156964efe1a9 100644 --- a/be/src/format/table/iceberg_reader.h +++ b/be/src/format/table/iceberg_reader.h @@ -76,6 +76,14 @@ class IcebergParquetReader final : public IcebergReaderMixin { : IcebergReaderMixin(kv_cache, profile, params, range, batch_size, ctz, io_ctx, state, meta_cache) {} + IcebergParquetReader(ShardedKVCache* kv_cache, RuntimeProfile* profile, + const TFileScanRangeParams& params, const TFileRangeDesc& range, + size_t batch_size, const cctz::time_zone* ctz, + std::shared_ptr io_ctx_holder, RuntimeState* state, + FileMetaCache* meta_cache) + : IcebergReaderMixin(kv_cache, profile, params, range, batch_size, ctz, + std::move(io_ctx_holder), state, meta_cache) {} + void set_delete_rows() final { // Call ParquetReader's set_delete_rows(const vector*) ParquetReader::set_delete_rows(_iceberg_delete_rows); @@ -113,6 +121,13 @@ class IcebergOrcReader final : public IcebergReaderMixin { : IcebergReaderMixin(kv_cache, profile, state, params, range, batch_size, ctz, io_ctx, meta_cache) {} + IcebergOrcReader(ShardedKVCache* kv_cache, RuntimeProfile* profile, RuntimeState* state, + const TFileScanRangeParams& params, const TFileRangeDesc& range, + size_t batch_size, const std::string& ctz, + std::shared_ptr io_ctx_holder, FileMetaCache* meta_cache) + : IcebergReaderMixin(kv_cache, profile, state, params, range, batch_size, + ctz, std::move(io_ctx_holder), meta_cache) {} + void set_delete_rows() final { // Call OrcReader's set_position_delete_rowids this->set_position_delete_rowids(_iceberg_delete_rows); diff --git a/be/src/format/table/paimon_reader.h b/be/src/format/table/paimon_reader.h index 0a916dfc094e42..42a5d211a9bc04 100644 --- a/be/src/format/table/paimon_reader.h +++ b/be/src/format/table/paimon_reader.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include "format/orc/vorc_reader.h" @@ -42,6 +43,16 @@ class PaimonOrcReader final : public OrcReader, public TableSchemaChangeHelper { _kv_cache(kv_cache) { _init_paimon_profile(); } + PaimonOrcReader(RuntimeProfile* profile, RuntimeState* state, + const TFileScanRangeParams& params, const TFileRangeDesc& range, + size_t batch_size, const std::string& ctz, ShardedKVCache* kv_cache, + std::shared_ptr io_ctx_holder, + FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat = true) + : OrcReader(profile, state, params, range, batch_size, ctz, std::move(io_ctx_holder), + meta_cache, enable_lazy_mat), + _kv_cache(kv_cache) { + _init_paimon_profile(); + } ~PaimonOrcReader() final = default; protected: @@ -77,6 +88,16 @@ class PaimonParquetReader final : public ParquetReader, public TableSchemaChange _kv_cache(kv_cache) { _init_paimon_profile(); } + PaimonParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, size_t batch_size, const cctz::time_zone* ctz, + ShardedKVCache* kv_cache, std::shared_ptr io_ctx_holder, + RuntimeState* state, FileMetaCache* meta_cache = nullptr, + bool enable_lazy_mat = true) + : ParquetReader(profile, params, range, batch_size, ctz, std::move(io_ctx_holder), + state, meta_cache, enable_lazy_mat), + _kv_cache(kv_cache) { + _init_paimon_profile(); + } ~PaimonParquetReader() final = default; protected: diff --git a/be/src/format/table/transactional_hive_reader.cpp b/be/src/format/table/transactional_hive_reader.cpp index 2308d746b14b27..16b86bb6eca735 100644 --- a/be/src/format/table/transactional_hive_reader.cpp +++ b/be/src/format/table/transactional_hive_reader.cpp @@ -19,6 +19,8 @@ #include +#include + #include "core/data_type/data_type_factory.hpp" #include "format/orc/vorc_reader.h" #include "format/table/table_schema_change_helper.h" @@ -40,6 +42,21 @@ TransactionalHiveReader::TransactionalHiveReader(RuntimeProfile* profile, Runtim const std::string& ctz, io::IOContext* io_ctx, FileMetaCache* meta_cache) : OrcReader(profile, state, params, range, batch_size, ctz, io_ctx, meta_cache, false) { + _init_transactional_hive_profile(); +} + +TransactionalHiveReader::TransactionalHiveReader(RuntimeProfile* profile, RuntimeState* state, + const TFileScanRangeParams& params, + const TFileRangeDesc& range, size_t batch_size, + const std::string& ctz, + std::shared_ptr io_ctx_holder, + FileMetaCache* meta_cache) + : OrcReader(profile, state, params, range, batch_size, ctz, std::move(io_ctx_holder), + meta_cache, false) { + _init_transactional_hive_profile(); +} + +void TransactionalHiveReader::_init_transactional_hive_profile() { static const char* transactional_hive_profile = "TransactionalHiveProfile"; ADD_TIMER(get_profile(), transactional_hive_profile); _transactional_orc_profile.num_delete_files = ADD_CHILD_COUNTER( diff --git a/be/src/format/table/transactional_hive_reader.h b/be/src/format/table/transactional_hive_reader.h index 77adf88aad046f..1f8c26a7ff298f 100644 --- a/be/src/format/table/transactional_hive_reader.h +++ b/be/src/format/table/transactional_hive_reader.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -53,6 +54,11 @@ class TransactionalHiveReader final : public OrcReader, public TableSchemaChange const TFileScanRangeParams& params, const TFileRangeDesc& range, size_t batch_size, const std::string& ctz, io::IOContext* io_ctx, FileMetaCache* meta_cache = nullptr); + TransactionalHiveReader(RuntimeProfile* profile, RuntimeState* state, + const TFileScanRangeParams& params, const TFileRangeDesc& range, + size_t batch_size, const std::string& ctz, + std::shared_ptr io_ctx_holder, + FileMetaCache* meta_cache = nullptr); ~TransactionalHiveReader() final = default; protected: @@ -69,6 +75,8 @@ class TransactionalHiveReader final : public OrcReader, public TableSchemaChange Status on_after_read_block(Block* block, size_t* read_rows) override; private: + void _init_transactional_hive_profile(); + struct TransactionalHiveProfile { RuntimeProfile::Counter* num_delete_files = nullptr; RuntimeProfile::Counter* num_delete_rows = nullptr; diff --git a/be/src/format/text/text_reader.cpp b/be/src/format/text/text_reader.cpp index 2fd9b749fbc6d2..0e6a4f89d27656 100644 --- a/be/src/format/text/text_reader.cpp +++ b/be/src/format/text/text_reader.cpp @@ -22,6 +22,7 @@ #include #include +#include #include #include "common/compiler_util.h" // IWYU pragma: keep @@ -113,8 +114,9 @@ void HiveTextFieldSplitter::_split_field_multi_char(const Slice& line, TextReader::TextReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter, const TFileScanRangeParams& params, const TFileRangeDesc& range, const std::vector& file_slot_descs, size_t batch_size, - io::IOContext* io_ctx) - : CsvReader(state, profile, counter, params, range, file_slot_descs, batch_size, io_ctx) {} + io::IOContext* io_ctx, std::shared_ptr io_ctx_holder) + : CsvReader(state, profile, counter, params, range, file_slot_descs, batch_size, io_ctx, + std::move(io_ctx_holder)) {} Status TextReader::_init_options() { // get column_separator and line_delimiter diff --git a/be/src/format/text/text_reader.h b/be/src/format/text/text_reader.h index 677ede3f8bc6f3..c0cebf5da77ffd 100644 --- a/be/src/format/text/text_reader.h +++ b/be/src/format/text/text_reader.h @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -56,7 +57,7 @@ class TextReader : public CsvReader { TextReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter, const TFileScanRangeParams& params, const TFileRangeDesc& range, const std::vector& file_slot_descs, size_t batch_size, - io::IOContext* io_ctx); + io::IOContext* io_ctx, std::shared_ptr io_ctx_holder = nullptr); ~TextReader() override = default;