From 94db78d25010eb90da3846618a4d387795f3c1af Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 26 Jun 2026 17:18:06 +0800 Subject: [PATCH] [feature](be) Add FileScannerV2 native reader ### What problem does this PR solve? Issue Number: None Related PR: None Problem Summary: FileScannerV2 did not support Doris Native files. This change adds a native v2 FileReader implementation instead of wrapping the legacy NativeReader. The reader validates the Native header, reads serialized PBlock payloads, caches and replays the first block for schema probing, exposes nullable file-local schema, projects requested columns, and applies file-local filters. Shared materialized-column filtering is also used by JSON and delimited text readers so predicate accounting stays consistent. WAL is intentionally not implemented on the v2 path because current group commit WAL scans are load scans and FileScanOperator only selects FileScannerV2 when src_tuple_id does not resolve to an input tuple. ### Release note None ### Check List (For Author) - Test: - Style check: build-support/check-format.sh - Unit Test: not run locally because sandbox execution cannot write .git/modules for submodule setup and cannot download datasketches-cpp; the attempted run-be-ut command failed before compiling tests. 
- Behavior changed: No - Does this need documentation: No --- be/src/exec/scan/file_scanner_v2.cpp | 9 +- .../delimited_text/delimited_text_reader.cpp | 40 +- be/src/format_v2/file_reader.h | 1 + .../format_v2/jni/max_compute_jni_reader.cpp | 3 +- .../jni/trino_connector_jni_reader.cpp | 18 +- be/src/format_v2/json/json_reader.cpp | 26 +- be/src/format_v2/materialized_reader_util.cpp | 89 ++++ be/src/format_v2/materialized_reader_util.h | 63 +++ be/src/format_v2/native/native_reader.cpp | 311 +++++++++++++ be/src/format_v2/native/native_reader.h | 70 +++ be/src/format_v2/table_reader.cpp | 8 + be/src/format_v2/table_reader.h | 5 +- be/test/exec/scan/file_scanner_v2_test.cpp | 3 + be/test/format_v2/json/json_reader_test.cpp | 100 +++++ .../format_v2/native/native_reader_test.cpp | 419 ++++++++++++++++++ 15 files changed, 1093 insertions(+), 72 deletions(-) create mode 100644 be/src/format_v2/materialized_reader_util.cpp create mode 100644 be/src/format_v2/materialized_reader_util.h create mode 100644 be/src/format_v2/native/native_reader.cpp create mode 100644 be/src/format_v2/native/native_reader.h create mode 100644 be/test/format_v2/native/native_reader_test.cpp diff --git a/be/src/exec/scan/file_scanner_v2.cpp b/be/src/exec/scan/file_scanner_v2.cpp index 92398fe0bf5d88..1e3c3479ca77be 100644 --- a/be/src/exec/scan/file_scanner_v2.cpp +++ b/be/src/exec/scan/file_scanner_v2.cpp @@ -127,6 +127,10 @@ bool is_json_format(TFileFormatType::type format_type) { return format_type == TFileFormatType::FORMAT_JSON; } +bool is_native_format(TFileFormatType::type format_type) { + return format_type == TFileFormatType::FORMAT_NATIVE; +} + bool is_partition_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) { if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) || column_name == BeConsts::ICEBERG_ROWID_COL) { @@ -221,7 +225,7 @@ bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFile } else if (format_type == 
TFileFormatType::FORMAT_JNI) { return is_supported_jni_table_format(range); } else if (is_csv_format(format_type) || is_text_format(format_type) || - is_json_format(format_type)) { + is_json_format(format_type) || is_native_format(format_type)) { return is_supported_table_format(range); } else { LOG(WARNING) << "Unsupported file format type " << format_type << " for file scanner v2"; @@ -615,6 +619,9 @@ Status FileScannerV2::_to_file_format(TFileFormatType::type format_type, case TFileFormatType::FORMAT_JSON: *file_format = format::FileFormat::JSON; return Status::OK(); + case TFileFormatType::FORMAT_NATIVE: + *file_format = format::FileFormat::NATIVE; + return Status::OK(); default: return Status::NotSupported("FileScannerV2 does not support file format {}", to_string(format_type)); diff --git a/be/src/format_v2/delimited_text/delimited_text_reader.cpp b/be/src/format_v2/delimited_text/delimited_text_reader.cpp index ba4986ee531740..f6e84b4aa7750e 100644 --- a/be/src/format_v2/delimited_text/delimited_text_reader.cpp +++ b/be/src/format_v2/delimited_text/delimited_text_reader.cpp @@ -32,9 +32,9 @@ #include "core/data_type/data_type_nullable.h" #include "core/data_type/data_type_string.h" #include "core/data_type/data_type_struct.h" -#include "exprs/vexpr_context.h" #include "format/line_reader.h" #include "format_v2/column_mapper.h" +#include "format_v2/materialized_reader_util.h" #include "io/file_factory.h" #include "io/fs/tracing_file_reader.h" #include "runtime/descriptors.h" @@ -341,36 +341,14 @@ Status DelimitedTextReader::get_block(Block* file_block, size_t* rows, bool* eof const size_t rows_before_filter = *rows; update_counter(_text_profile.rows_read_before_filter, rows_before_filter); - size_t rows_after_delete_filter = rows_before_filter; - if (_request != nullptr && rows_before_filter > 0 && !_request->delete_conjuncts.empty()) { - { - SCOPED_TIMER(_text_profile.delete_conjunct_filter_time); - 
RETURN_IF_ERROR(VExprContext::filter_block(_request->delete_conjuncts, file_block, - file_block->columns())); - } - rows_after_delete_filter = - file_block->columns() == 0 ? rows_before_filter : file_block->rows(); - update_counter(_text_profile.rows_filtered_by_delete_conjunct, - rows_before_filter - rows_after_delete_filter); - } - - size_t rows_after_filter = rows_after_delete_filter; - if (_request != nullptr && rows_after_delete_filter > 0 && !_request->conjuncts.empty()) { - { - SCOPED_TIMER(_text_profile.conjunct_filter_time); - RETURN_IF_ERROR(VExprContext::filter_block(_request->conjuncts, file_block, - file_block->columns())); - } - rows_after_filter = - file_block->columns() == 0 ? rows_after_delete_filter : file_block->rows(); - const auto rows_filtered_by_conjunct = rows_after_delete_filter - rows_after_filter; - update_counter(_text_profile.rows_filtered_by_conjunct, rows_filtered_by_conjunct); - if (_io_ctx != nullptr) { - _io_ctx->predicate_filtered_rows += rows_filtered_by_conjunct; - } - } - - *rows = rows_after_filter; + MaterializedReaderFilterProfile filter_profile; + filter_profile.delete_conjunct_filter_time = _text_profile.delete_conjunct_filter_time; + filter_profile.conjunct_filter_time = _text_profile.conjunct_filter_time; + filter_profile.rows_filtered_by_delete_conjunct = + _text_profile.rows_filtered_by_delete_conjunct; + filter_profile.rows_filtered_by_conjunct = _text_profile.rows_filtered_by_conjunct; + RETURN_IF_ERROR(apply_materialized_reader_filters(_request.get(), _io_ctx.get(), file_block, + rows, &filter_profile)); update_counter(_text_profile.rows_returned, *rows); _reader_statistics.read_rows += *rows; *eof = _line_reader_eof && *rows == 0; diff --git a/be/src/format_v2/file_reader.h b/be/src/format_v2/file_reader.h index 3f192ae093a47b..0cdd68c03fb3ef 100644 --- a/be/src/format_v2/file_reader.h +++ b/be/src/format_v2/file_reader.h @@ -117,6 +117,7 @@ enum class FileFormat { JSON, TEXT, JNI, + NATIVE, }; // 通用文件层 scan 请求。 
diff --git a/be/src/format_v2/jni/max_compute_jni_reader.cpp b/be/src/format_v2/jni/max_compute_jni_reader.cpp index b86a3a1325a35d..a26e9e229b5d82 100644 --- a/be/src/format_v2/jni/max_compute_jni_reader.cpp +++ b/be/src/format_v2/jni/max_compute_jni_reader.cpp @@ -66,8 +66,7 @@ std::string MaxComputeJniReader::connector_class() const { return "org/apache/doris/maxcompute/MaxComputeJniScanner"; } -Status MaxComputeJniReader::build_scanner_params( - std::map* params) const { +Status MaxComputeJniReader::build_scanner_params(std::map* params) const { DORIS_CHECK(params != nullptr); DORIS_CHECK(_table_desc != nullptr); params->clear(); diff --git a/be/src/format_v2/jni/trino_connector_jni_reader.cpp b/be/src/format_v2/jni/trino_connector_jni_reader.cpp index 3ea50f6df52939..11c9945c5dea16 100644 --- a/be/src/format_v2/jni/trino_connector_jni_reader.cpp +++ b/be/src/format_v2/jni/trino_connector_jni_reader.cpp @@ -32,8 +32,7 @@ constexpr std::string_view TRINO_CONNECTOR_NAME = "connector.name"; Status TrinoConnectorJniReader::validate_scan_range(const TFileRangeDesc& range) const { if (!range.__isset.table_format_params) { - return Status::InternalError( - "missing table_format_params for trino connector jni reader"); + return Status::InternalError("missing table_format_params for trino connector jni reader"); } if (!range.table_format_params.__isset.trino_connector_params) { return Status::InternalError( @@ -52,8 +51,7 @@ Status TrinoConnectorJniReader::validate_scan_range(const TFileRangeDesc& range) "missing trino connector.name option for trino connector jni reader, possibly " "caused by FE/BE protocol mismatch"); } - if (!trino_params.__isset.trino_connector_split || - trino_params.trino_connector_split.empty()) { + if (!trino_params.__isset.trino_connector_split || trino_params.trino_connector_split.empty()) { return Status::InternalError( "missing trino_connector_split for trino connector jni reader, possibly caused " "by FE/BE protocol mismatch"); @@ -109,8 
+107,7 @@ Status TrinoConnectorJniReader::build_scanner_params( (*params)["trino_connector_column_handles"] = trino_params.trino_connector_column_handles; (*params)["trino_connector_column_metadata"] = trino_params.trino_connector_column_metadata; (*params)["trino_connector_predicate"] = trino_params.trino_connector_predicate; - (*params)["trino_connector_trascation_handle"] = - trino_params.trino_connector_trascation_handle; + (*params)["trino_connector_trascation_handle"] = trino_params.trino_connector_trascation_handle; for (const auto& kv : trino_params.trino_connector_options) { (*params)[std::string(TRINO_CONNECTOR_OPTION_PREFIX) + kv.first] = kv.second; @@ -125,13 +122,12 @@ Status TrinoConnectorJniReader::_set_spi_plugins_dir() const { Jni::LocalClass plugin_loader_cls; const std::string plugin_loader_class = "org/apache/doris/trinoconnector/TrinoConnectorPluginLoader"; - RETURN_IF_ERROR(Jni::Util::get_jni_scanner_class(env, plugin_loader_class.c_str(), - &plugin_loader_cls)); + RETURN_IF_ERROR( + Jni::Util::get_jni_scanner_class(env, plugin_loader_class.c_str(), &plugin_loader_cls)); Jni::MethodId set_plugins_dir_method; - RETURN_IF_ERROR(plugin_loader_cls.get_static_method(env, "setPluginsDir", - "(Ljava/lang/String;)V", - &set_plugins_dir_method)); + RETURN_IF_ERROR(plugin_loader_cls.get_static_method( + env, "setPluginsDir", "(Ljava/lang/String;)V", &set_plugins_dir_method)); Jni::LocalString trino_connector_plugin_path; RETURN_IF_ERROR(Jni::LocalString::new_string( diff --git a/be/src/format_v2/json/json_reader.cpp b/be/src/format_v2/json/json_reader.cpp index 04f74f52aefa9a..f0219bb7d85345 100644 --- a/be/src/format_v2/json/json_reader.cpp +++ b/be/src/format_v2/json/json_reader.cpp @@ -39,9 +39,9 @@ #include "core/data_type/data_type_nullable.h" #include "core/data_type/data_type_string.h" #include "core/data_type/data_type_struct.h" -#include "exprs/vexpr_context.h" #include "format/file_reader/new_plain_text_line_reader.h" #include 
"format_v2/column_mapper.h" +#include "format_v2/materialized_reader_util.h" #include "io/file_factory.h" #include "io/fs/file_reader.h" #include "io/fs/stream_load_pipe.h" @@ -1063,29 +1063,7 @@ Status JsonReader::_handle_json_error(const Status& status, Block* block, size_t } Status JsonReader::_apply_filters(Block* file_block, size_t* rows) { - DORIS_CHECK(file_block != nullptr); - DORIS_CHECK(rows != nullptr); - const size_t rows_before_filter = *rows; - size_t rows_after_delete_filter = rows_before_filter; - if (_request != nullptr && rows_before_filter > 0 && !_request->delete_conjuncts.empty()) { - RETURN_IF_ERROR(VExprContext::filter_block(_request->delete_conjuncts, file_block, - file_block->columns())); - rows_after_delete_filter = - file_block->columns() == 0 ? rows_before_filter : file_block->rows(); - } - - size_t rows_after_filter = rows_after_delete_filter; - if (_request != nullptr && rows_after_delete_filter > 0 && !_request->conjuncts.empty()) { - RETURN_IF_ERROR( - VExprContext::filter_block(_request->conjuncts, file_block, file_block->columns())); - rows_after_filter = - file_block->columns() == 0 ? rows_after_delete_filter : file_block->rows(); - if (_io_ctx != nullptr) { - _io_ctx->predicate_filtered_rows += rows_after_delete_filter - rows_after_filter; - } - } - *rows = rows_after_filter; - return Status::OK(); + return apply_materialized_reader_filters(_request.get(), _io_ctx.get(), file_block, rows); } void JsonReader::_truncate_block_to_rows(Block* block, size_t num_rows) { diff --git a/be/src/format_v2/materialized_reader_util.cpp b/be/src/format_v2/materialized_reader_util.cpp new file mode 100644 index 00000000000000..a7e533633510c4 --- /dev/null +++ b/be/src/format_v2/materialized_reader_util.cpp @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format_v2/materialized_reader_util.h" + +#include + +#include "core/block/block.h" +#include "core/data_type/data_type_nullable.h" +#include "exprs/vexpr_context.h" +#include "format_v2/file_reader.h" +#include "io/io_common.h" + +namespace doris::format { +namespace { + +void update_counter(RuntimeProfile::Counter* counter, int64_t value) { + if (counter != nullptr) { + COUNTER_UPDATE(counter, value); + } +} + +} // namespace + +ColumnPtr make_column_nullable_if_needed(ColumnPtr column, const DataTypePtr& target_type) { + if (target_type != nullptr && target_type->is_nullable() && column.get() != nullptr && + !column->is_nullable()) { + return make_nullable(std::move(column)); + } + return column; +} + +Status apply_materialized_reader_filters(const FileScanRequest* request, io::IOContext* io_ctx, + Block* file_block, size_t* rows, + const MaterializedReaderFilterProfile* profile) { + DORIS_CHECK(file_block != nullptr); + DORIS_CHECK(rows != nullptr); + const size_t rows_before_filter = *rows; + size_t rows_after_delete_filter = rows_before_filter; + if (request != nullptr && rows_before_filter > 0 && !request->delete_conjuncts.empty()) { + { + SCOPED_TIMER(profile == nullptr ? 
nullptr : profile->delete_conjunct_filter_time); + RETURN_IF_ERROR(VExprContext::filter_block(request->delete_conjuncts, file_block, + file_block->columns())); + } + rows_after_delete_filter = + file_block->columns() == 0 ? rows_before_filter : file_block->rows(); + if (profile != nullptr) { + update_counter(profile->rows_filtered_by_delete_conjunct, + rows_before_filter - rows_after_delete_filter); + } + } + + size_t rows_after_filter = rows_after_delete_filter; + if (request != nullptr && rows_after_delete_filter > 0 && !request->conjuncts.empty()) { + { + SCOPED_TIMER(profile == nullptr ? nullptr : profile->conjunct_filter_time); + RETURN_IF_ERROR(VExprContext::filter_block(request->conjuncts, file_block, + file_block->columns())); + } + rows_after_filter = + file_block->columns() == 0 ? rows_after_delete_filter : file_block->rows(); + const auto rows_filtered_by_conjunct = rows_after_delete_filter - rows_after_filter; + if (profile != nullptr) { + update_counter(profile->rows_filtered_by_conjunct, rows_filtered_by_conjunct); + } + if (io_ctx != nullptr) { + io_ctx->predicate_filtered_rows += rows_filtered_by_conjunct; + } + } + *rows = rows_after_filter; + return Status::OK(); +} + +} // namespace doris::format diff --git a/be/src/format_v2/materialized_reader_util.h b/be/src/format_v2/materialized_reader_util.h new file mode 100644 index 00000000000000..2fb1383dfb9569 --- /dev/null +++ b/be/src/format_v2/materialized_reader_util.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" +#include "core/column/column.h" +#include "core/data_type/data_type.h" +#include "runtime/runtime_profile.h" + +namespace doris { +class Block; + +namespace io { +struct IOContext; +} // namespace io + +namespace format { +struct FileScanRequest; + +// Shared helpers for FileReader implementations that deserialize or build already materialized +// Doris columns and then hand those columns to TableReader for final mapping. +ColumnPtr make_column_nullable_if_needed(ColumnPtr column, const DataTypePtr& target_type); + +// Optional profile counters for text-like readers. Native/JSON do not expose per-reader filter +// counters today, so they call apply_materialized_reader_filters() without this struct. +struct MaterializedReaderFilterProfile { + RuntimeProfile::Counter* delete_conjunct_filter_time = nullptr; + RuntimeProfile::Counter* conjunct_filter_time = nullptr; + RuntimeProfile::Counter* rows_filtered_by_delete_conjunct = nullptr; + RuntimeProfile::Counter* rows_filtered_by_conjunct = nullptr; +}; + +// Applies file-local filters in the same order used by FileScannerV2 readers: +// 1. delete_conjuncts remove rows that should not be visible to the scan output; +// 2. conjuncts apply ordinary file-local predicates. +// +// Only ordinary conjunct filtering contributes to IOContext::predicate_filtered_rows. This matches +// the previous JSON/Text/CSV behavior and keeps scanner accounting separate from delete filtering. 
+// When `profile` is provided, the helper also updates text-reader timer and row counters so CSV +// and Hive text keep their existing observability after sharing this implementation. +Status apply_materialized_reader_filters(const FileScanRequest* request, io::IOContext* io_ctx, + Block* file_block, size_t* rows, + const MaterializedReaderFilterProfile* profile = nullptr); + +} // namespace format +} // namespace doris diff --git a/be/src/format_v2/native/native_reader.cpp b/be/src/format_v2/native/native_reader.cpp new file mode 100644 index 00000000000000..2a0a89f80adc8d --- /dev/null +++ b/be/src/format_v2/native/native_reader.cpp @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "format_v2/native/native_reader.h" + +#include +#include + +#include "common/cast_set.h" +#include "core/block/block.h" +#include "core/data_type/data_type_factory.hpp" +#include "core/data_type/data_type_nullable.h" +#include "format/native/native_format.h" +#include "format_v2/column_mapper.h" +#include "format_v2/materialized_reader_util.h" +#include "io/file_factory.h" +#include "io/fs/tracing_file_reader.h" +#include "runtime/runtime_state.h" +#include "util/slice.h" + +namespace doris::format::native { +namespace { + +Status parse_native_pblock(const std::string& buffer, const std::string& path, PBlock* pblock) { + DORIS_CHECK(pblock != nullptr); + if (!pblock->ParseFromArray(buffer.data(), cast_set(buffer.size()))) { + return Status::InternalError("Failed to parse native PBlock from file {}", path); + } + return Status::OK(); +} + +} // namespace + +NativeReader::NativeReader(std::shared_ptr& system_properties, + std::unique_ptr& file_description, + std::shared_ptr io_ctx, RuntimeProfile* profile) + : FileReader(system_properties, file_description, std::move(io_ctx), profile) {} + +NativeReader::~NativeReader() { + static_cast(close()); +} + +Status NativeReader::init(RuntimeState* state) { + _runtime_state = state; + if (_file_description == nullptr) { + return Status::InvalidArgument("Native v2 reader requires file description"); + } + RETURN_IF_ERROR(FileReader::init(state)); + RETURN_IF_ERROR(_validate_and_consume_header()); + return Status::OK(); +} + +Status NativeReader::get_schema(std::vector* file_schema) const { + if (file_schema == nullptr) { + return Status::InvalidArgument("Native v2 file_schema is null"); + } + RETURN_IF_ERROR(_ensure_schema_loaded()); + *file_schema = _file_schema; + return Status::OK(); +} + +std::unique_ptr NativeReader::create_column_mapper( + TableColumnMapperOptions options) const { + return std::make_unique(std::move(options)); +} + +Status NativeReader::open(std::shared_ptr request) { + 
RETURN_IF_ERROR(FileReader::open(std::move(request))); + DORIS_CHECK(_request != nullptr); + _first_block_consumed = false; + _reader_eof = false; + _eof = false; + return Status::OK(); +} + +Status NativeReader::get_block(Block* file_block, size_t* rows, bool* eof) { + DORIS_CHECK(file_block != nullptr); + DORIS_CHECK(rows != nullptr); + DORIS_CHECK(eof != nullptr); + if (_request == nullptr) { + return Status::InternalError("Native v2 reader is not open"); + } + + *rows = 0; + *eof = false; + if (_reader_eof) { + *eof = true; + _eof = true; + return Status::OK(); + } + + std::string buffer; + bool local_eof = false; + if (_first_block_loaded && !_first_block_consumed) { + buffer = _first_block_buffer; + } else { + RETURN_IF_ERROR(_read_next_pblock(&buffer, &local_eof)); + } + + if (local_eof && buffer.empty()) { + _reader_eof = true; + *eof = true; + _eof = true; + return Status::OK(); + } + if (buffer.empty()) { + return Status::InternalError("read empty native block from file {}", + _file_description->path); + } + + PBlock pblock; + RETURN_IF_ERROR(parse_native_pblock(buffer, _file_description->path, &pblock)); + if (!_schema_inited) { + RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); + } + + Block source_block; + size_t uncompressed_bytes = 0; + int64_t decompress_time = 0; + RETURN_IF_ERROR(source_block.deserialize(pblock, &uncompressed_bytes, &decompress_time)); + RETURN_IF_ERROR(_materialize_requested_columns(source_block, file_block)); + *rows = file_block->rows(); + RETURN_IF_ERROR(_apply_filters(file_block, rows)); + _reader_statistics.read_rows += *rows; + + if (_first_block_loaded && !_first_block_consumed) { + _first_block_consumed = true; + } + if (_current_offset >= _file_size) { + _reader_eof = true; + } + *eof = _reader_eof && *rows == 0; + _eof = *eof; + return Status::OK(); +} + +Status NativeReader::close() { + _file_reader.reset(); + _tracing_file_reader.reset(); + _request.reset(); + _reader_eof = true; + _eof = true; + return 
Status::OK(); +} + +Status NativeReader::_validate_and_consume_header() { + DORIS_CHECK(_tracing_file_reader != nullptr); + _file_size = _tracing_file_reader->size(); + _current_offset = 0; + _reader_eof = (_file_size == 0); + + static constexpr size_t HEADER_SIZE = sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t); + if (_reader_eof || _file_size < cast_set(HEADER_SIZE)) { + return Status::InternalError( + "invalid Doris Native file {}, file size {} is smaller than header size {}", + _file_description->path, _file_size, HEADER_SIZE); + } + + char header[HEADER_SIZE]; + Slice header_slice(header, sizeof(header)); + size_t bytes_read = 0; + RETURN_IF_ERROR(_tracing_file_reader->read_at(0, header_slice, &bytes_read, _io_ctx.get())); + if (bytes_read != sizeof(header)) { + return Status::InternalError( + "failed to read Doris Native header from file {}, expect {} bytes, got {} bytes", + _file_description->path, sizeof(header), bytes_read); + } + if (std::memcmp(header, DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)) != 0) { + return Status::InternalError("invalid Doris Native magic header in file {}", + _file_description->path); + } + + uint32_t version = 0; + std::memcpy(&version, header + sizeof(DORIS_NATIVE_MAGIC), sizeof(uint32_t)); + if (version != DORIS_NATIVE_FORMAT_VERSION) { + return Status::InternalError( + "unsupported Doris Native format version {} in file {}, expect {}", version, + _file_description->path, DORIS_NATIVE_FORMAT_VERSION); + } + + _current_offset = sizeof(header); + _reader_eof = (_file_size == _current_offset); + return Status::OK(); +} + +Status NativeReader::_ensure_schema_loaded() const { + if (_schema_inited) { + return Status::OK(); + } + if (!_first_block_loaded) { + bool local_eof = false; + RETURN_IF_ERROR(_read_next_pblock(&_first_block_buffer, &local_eof)); + if (local_eof && _first_block_buffer.empty()) { + return Status::EndOfFile("empty native file {}", _file_description->path); + } + if (_first_block_buffer.empty()) { + return 
Status::InternalError("first native block is empty {}", _file_description->path); + } + _first_block_loaded = true; + } + + PBlock pblock; + RETURN_IF_ERROR(parse_native_pblock(_first_block_buffer, _file_description->path, &pblock)); + RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); + return Status::OK(); +} + +Status NativeReader::_read_next_pblock(std::string* buffer, bool* eof) const { + DORIS_CHECK(buffer != nullptr); + DORIS_CHECK(eof != nullptr); + DORIS_CHECK(_tracing_file_reader != nullptr); + buffer->clear(); + *eof = false; + + if (_current_offset >= _file_size) { + *eof = true; + return Status::OK(); + } + + uint64_t block_len = 0; + Slice len_slice(reinterpret_cast(&block_len), sizeof(block_len)); + size_t bytes_read = 0; + RETURN_IF_ERROR( + _tracing_file_reader->read_at(_current_offset, len_slice, &bytes_read, _io_ctx.get())); + if (bytes_read == 0) { + *eof = true; + return Status::OK(); + } + if (bytes_read != sizeof(block_len)) { + return Status::InternalError( + "Failed to read native block length from file {}, expect {}, actual {}", + _file_description->path, sizeof(block_len), bytes_read); + } + _current_offset += sizeof(block_len); + if (block_len == 0) { + *eof = (_current_offset >= _file_size); + return Status::OK(); + } + + buffer->assign(block_len, '\0'); + Slice data_slice(buffer->data(), block_len); + bytes_read = 0; + RETURN_IF_ERROR( + _tracing_file_reader->read_at(_current_offset, data_slice, &bytes_read, _io_ctx.get())); + if (bytes_read != block_len) { + return Status::InternalError( + "Failed to read native block body from file {}, expect {}, actual {}", + _file_description->path, block_len, bytes_read); + } + _current_offset += block_len; + *eof = (_current_offset >= _file_size); + return Status::OK(); +} + +Status NativeReader::_init_schema_from_pblock(const PBlock& pblock) const { + _file_schema.clear(); + _file_schema.reserve(pblock.column_metas_size()); + for (int idx = 0; idx < pblock.column_metas_size(); ++idx) { + const 
auto& meta = pblock.column_metas(idx); + ColumnDefinition field; + field.identifier = Field::create_field(meta.name()); + field.local_id = idx; + field.name = meta.name(); + field.type = make_nullable(DataTypeFactory::instance().create_data_type(meta)); + _file_schema.push_back(std::move(field)); + } + _schema_inited = true; + return Status::OK(); +} + +Status NativeReader::_materialize_requested_columns(const Block& source_block, + Block* file_block) const { + DORIS_CHECK(file_block != nullptr); + DORIS_CHECK(_request != nullptr); + for (const auto& [file_column_id, block_position] : _request->local_positions) { + const auto source_idx = file_column_id.value(); + if (source_idx < 0 || cast_set(source_idx) >= source_block.columns()) { + return Status::InternalError("native file {} does not contain local column id {}", + _file_description->path, source_idx); + } + if (block_position.value() >= file_block->columns()) { + return Status::InternalError("native v2 request has invalid block position {}", + block_position.value()); + } + const auto& target = file_block->get_by_position(block_position.value()); + auto column = source_block.get_by_position(source_idx).column; + column = make_column_nullable_if_needed(std::move(column), target.type); + file_block->replace_by_position(block_position.value(), IColumn::mutate(std::move(column))); + } + return Status::OK(); +} + +Status NativeReader::_apply_filters(Block* file_block, size_t* rows) const { + return apply_materialized_reader_filters(_request.get(), _io_ctx.get(), file_block, rows); +} + +} // namespace doris::format::native diff --git a/be/src/format_v2/native/native_reader.h b/be/src/format_v2/native/native_reader.h new file mode 100644 index 00000000000000..3719a6afd6c4f5 --- /dev/null +++ b/be/src/format_v2/native/native_reader.h @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include +#include +#include +#include + +#include "format_v2/file_reader.h" + +namespace doris::format::native { + +// FileScannerV2 reader for Doris Native files. +// +// Native files are self-describing only through the first serialized PBlock. TableReader asks for +// schema before open(), so this reader may read and cache that first PBlock during get_schema() and +// then replay it as the first data batch after open(). 
+class NativeReader final : public FileReader { +public: + NativeReader(std::shared_ptr& system_properties, + std::unique_ptr& file_description, + std::shared_ptr io_ctx, RuntimeProfile* profile); + ~NativeReader() override; + + Status init(RuntimeState* state) override; + Status get_schema(std::vector* file_schema) const override; + std::unique_ptr create_column_mapper( + TableColumnMapperOptions options) const override; + Status open(std::shared_ptr request) override; + Status get_block(Block* file_block, size_t* rows, bool* eof) override; + Status close() override; + +private: + Status _validate_and_consume_header(); + Status _ensure_schema_loaded() const; + Status _read_next_pblock(std::string* buffer, bool* eof) const; + Status _init_schema_from_pblock(const PBlock& pblock) const; + Status _materialize_requested_columns(const Block& source_block, Block* file_block) const; + Status _apply_filters(Block* file_block, size_t* rows) const; + + RuntimeState* _runtime_state = nullptr; + mutable int64_t _current_offset = 0; + mutable int64_t _file_size = 0; + mutable bool _reader_eof = true; + mutable bool _schema_inited = false; + mutable std::vector _file_schema; + mutable std::string _first_block_buffer; + mutable bool _first_block_loaded = false; + mutable bool _first_block_consumed = false; +}; + +} // namespace doris::format::native diff --git a/be/src/format_v2/table_reader.cpp b/be/src/format_v2/table_reader.cpp index d90d4f6ea337d1..5343a5a56323b6 100644 --- a/be/src/format_v2/table_reader.cpp +++ b/be/src/format_v2/table_reader.cpp @@ -44,6 +44,7 @@ #include "format_v2/delimited_text/csv_reader.h" #include "format_v2/delimited_text/text_reader.h" #include "format_v2/json/json_reader.h" +#include "format_v2/native/native_reader.h" #include "format_v2/parquet/parquet_reader.h" #include "roaring/roaring64map.hh" #include "storage/segment/condition_cache.h" @@ -80,6 +81,8 @@ std::string file_format_to_string(FileFormat format) { return "TEXT"; case 
FileFormat::JNI: return "JNI"; + case FileFormat::NATIVE: + return "NATIVE"; } return "UNKNOWN"; } @@ -738,6 +741,11 @@ Status TableReader::create_file_reader(std::unique_ptr* reader) { _current_range_compress_type, _current_range_load_id); return Status::OK(); } + if (_format == FileFormat::NATIVE) { + *reader = std::make_unique( + _system_properties, _current_task->data_file, _io_ctx, _scanner_profile); + return Status::OK(); + } return Status::NotSupported("TableReader does not support file format {}", file_format_to_string(_format)); } diff --git a/be/src/format_v2/table_reader.h b/be/src/format_v2/table_reader.h index a94cae621c9546..f139301c1a12d4 100644 --- a/be/src/format_v2/table_reader.h +++ b/be/src/format_v2/table_reader.h @@ -124,9 +124,8 @@ struct TableReadOptions { std::shared_ptr io_ctx; RuntimeState* runtime_state; RuntimeProfile* scanner_profile; - // File formats without self-describing metadata, such as CSV, need the original FE slot - // descriptors to build their file-local schema and deserialize values. Self-describing formats - // ignore this field and use metadata parsed from the file footer/header. + // File formats without complete self-describing metadata, such as CSV, Text, and JSON, need + // the FE-planned physical file slots to build their file-local schema and deserialize values. const std::vector* file_slot_descs = nullptr; // Push-down aggregate type. 
const TPushAggOp::type push_down_agg_type = TPushAggOp::type::NONE; diff --git a/be/test/exec/scan/file_scanner_v2_test.cpp b/be/test/exec/scan/file_scanner_v2_test.cpp index d3f0507aca1122..43240cd7d17e49 100644 --- a/be/test/exec/scan/file_scanner_v2_test.cpp +++ b/be/test/exec/scan/file_scanner_v2_test.cpp @@ -107,6 +107,8 @@ TEST(FileScannerV2Test, SupportedFormatMatrix) { {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_CSV_PLAIN, true}, {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_TEXT, true}, {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_JSON, true}, + {"tvf", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_NATIVE, true}, + {"", TFileFormatType::FORMAT_WAL, std::nullopt, false}, }; for (const auto& test_case : cases) { @@ -180,6 +182,7 @@ TEST(FileScannerV2Test, FileFormatConversionMatrix) { {TFileFormatType::FORMAT_PROTO, format::FileFormat::CSV}, {TFileFormatType::FORMAT_TEXT, format::FileFormat::TEXT}, {TFileFormatType::FORMAT_JSON, format::FileFormat::JSON}, + {TFileFormatType::FORMAT_NATIVE, format::FileFormat::NATIVE}, {TFileFormatType::FORMAT_ORC, std::nullopt}, }; diff --git a/be/test/format_v2/json/json_reader_test.cpp b/be/test/format_v2/json/json_reader_test.cpp index 994d71c0e6aefc..d968a9d5bee571 100644 --- a/be/test/format_v2/json/json_reader_test.cpp +++ b/be/test/format_v2/json/json_reader_test.cpp @@ -35,7 +35,10 @@ #include "core/data_type/data_type_nullable.h" #include "core/data_type/data_type_number.h" #include "core/data_type/data_type_string.h" +#include "exprs/vexpr.h" +#include "exprs/vexpr_context.h" #include "format_v2/column_data.h" +#include "io/io_common.h" #include "runtime/descriptors.h" #include "runtime/runtime_profile.h" #include "testutil/mock/mock_runtime_state.h" @@ -216,6 +219,57 @@ bool nullable_is_null_at(const IColumn& column, size_t row) { return nullable.is_null_at(row); } +class NullableIntGreaterThanExpr final : public VExpr { +public: + 
NullableIntGreaterThanExpr(size_t block_position, int32_t value) + : VExpr(std::make_shared(), false), + _block_position(block_position), + _value(value) {} + + const std::string& expr_name() const override { return _name; } + + bool is_constant() const override { return false; } + + Status execute_column_impl(VExprContext*, const Block* block, const Selector* selector, + size_t count, ColumnPtr& result_column) const override { + DORIS_CHECK(block != nullptr); + const auto& nullable = + assert_cast(*block->get_by_position(_block_position).column); + const auto& data = assert_cast(nullable.get_nested_column()); + + auto result = ColumnUInt8::create(); + auto& result_data = result->get_data(); + result_data.resize(count); + for (size_t row = 0; row < count; ++row) { + const auto source_row = selector == nullptr ? row : (*selector)[row]; + result_data[row] = + !nullable.is_null_at(source_row) && data.get_element(source_row) > _value; + } + result_column = std::move(result); + return Status::OK(); + } + + Status clone_node(VExprSPtr* cloned_expr) const override { + DORIS_CHECK(cloned_expr != nullptr); + *cloned_expr = std::make_shared(_block_position, _value); + return Status::OK(); + } + +private: + size_t _block_position; + int32_t _value; + const std::string _name = "NullableIntGreaterThanExpr"; +}; + +VExprContextSPtr prepared_conjunct(RuntimeState* state, const VExprSPtr& expr) { + auto context = VExprContext::create_shared(expr); + auto status = context->prepare(state, RowDescriptor()); + EXPECT_TRUE(status.ok()) << status; + status = context->open(state); + EXPECT_TRUE(status.ok()) << status; + return context; +} + } // namespace TEST(JsonReaderTest, ReadsRequestedColumnsInFileScanRequestOrder) { @@ -399,4 +453,50 @@ TEST(JsonReaderTest, SkipsEmptyJsonLine) { EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 0), "nancy"); } +// Scenario: JSON, Native, CSV, and Hive text all share the same file-local filter order: +// delete conjuncts run 
first, ordinary conjuncts run second, and only ordinary conjuncts contribute +// to IOContext::predicate_filtered_rows. This guards the JSON caller of the shared helper because +// CSV/Text already assert the optional profile-counter path. +TEST(JsonReaderTest, AppliesDeleteAndNormalConjunctsWithPredicateFilterAccounting) { + ObjectPool pool; + auto slots = build_slots(&pool); + const auto file_path = write_json_file("filters.jsonl", R"({"id":1,"name":"alice"})" + "\n" + R"({"id":2,"name":"bob"})" + "\n" + R"({"id":3,"name":"carol"})" + "\n"); + auto params = json_scan_params(); + auto range = file_range(file_path); + auto system_properties = std::make_shared(); + system_properties->system_type = TFileType::FILE_LOCAL; + auto desc = file_description(file_path.string()); + RuntimeProfile profile("json_v2_reader_filter_test"); + MockRuntimeState state; + auto io_ctx = std::make_shared(); + JsonReader reader(system_properties, desc, io_ctx, &profile, &params, range, slots); + + ASSERT_TRUE(reader.init(&state).ok()); + std::vector schema; + ASSERT_TRUE(reader.get_schema(&schema).ok()); + + auto request = std::make_shared(); + request->local_positions.emplace(LocalColumnId(0), LocalIndex(0)); + request->local_positions.emplace(LocalColumnId(1), LocalIndex(1)); + request->delete_conjuncts = { + prepared_conjunct(&state, std::make_shared(0, 1))}; + request->conjuncts = { + prepared_conjunct(&state, std::make_shared(0, 2))}; + ASSERT_TRUE(reader.open(request).ok()); + + auto block = make_block(schema, {0, 1}); + size_t rows = 0; + bool eof = false; + ASSERT_TRUE(reader.get_block(&block, &rows, &eof).ok()); + ASSERT_EQ(rows, 1); + EXPECT_EQ(nullable_int_at(*block.get_by_position(0).column, 0), 3); + EXPECT_EQ(nullable_string_at(*block.get_by_position(1).column, 0), "carol"); + EXPECT_EQ(io_ctx->predicate_filtered_rows, 1); +} + } // namespace doris::format::json diff --git a/be/test/format_v2/native/native_reader_test.cpp b/be/test/format_v2/native/native_reader_test.cpp new 
file mode 100644 index 00000000000000..aaa7aa90e0681e --- /dev/null +++ b/be/test/format_v2/native/native_reader_test.cpp @@ -0,0 +1,419 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format_v2/native/native_reader.h" + +#include + +#include +#include +#include +#include +#include + +#include "agent/be_exec_version_manager.h" +#include "core/assert_cast.h" +#include "core/block/block.h" +#include "core/column/column_nullable.h" +#include "core/column/column_string.h" +#include "core/column/column_vector.h" +#include "core/data_type/data_type_nullable.h" +#include "core/data_type/data_type_number.h" +#include "core/data_type/data_type_string.h" +#include "exprs/vexpr.h" +#include "exprs/vexpr_context.h" +#include "format/native/native_format.h" +#include "format_v2/column_mapper.h" +#include "io/fs/local_file_system.h" +#include "io/io_common.h" +#include "runtime/descriptors.h" +#include "runtime/runtime_profile.h" +#include "runtime/runtime_state.h" +#include "util/coding.h" +#include "util/uid_util.h" + +namespace doris::format::native { +namespace { + +std::unique_ptr file_description(const std::string& path) { + auto desc = std::make_unique(); + desc->path = path; + desc->file_size = 
static_cast(std::filesystem::file_size(path)); + desc->range_start_offset = 0; + desc->range_size = desc->file_size; + return desc; +} + +Status write_file(const std::string& path, std::string_view content) { + io::FileWriterPtr writer; + RETURN_IF_ERROR(io::global_local_filesystem()->create_file(path, &writer)); + if (!content.empty()) { + RETURN_IF_ERROR(writer->append({content.data(), content.size()})); + } + return writer->close(); +} + +std::unique_ptr create_reader(const std::string& path, RuntimeState* state, + RuntimeProfile* profile, + std::shared_ptr io_ctx = nullptr) { + auto system_properties = std::make_shared(); + system_properties->system_type = TFileType::FILE_LOCAL; + auto desc = file_description(path); + return std::make_unique(system_properties, desc, std::move(io_ctx), profile); +} + +Block make_source_block() { + auto id_column = ColumnInt32::create(); + id_column->insert_value(10); + id_column->insert_value(20); + + auto name_column = ColumnString::create(); + name_column->insert_data("alice", 5); + name_column->insert_data("bob", 3); + + Block block; + block.insert({id_column->get_ptr(), std::make_shared(), "id"}); + block.insert({name_column->get_ptr(), std::make_shared(), "name"}); + return block; +} + +Status write_native_file(const std::string& path, const Block& block) { + io::FileWriterPtr writer; + RETURN_IF_ERROR(io::global_local_filesystem()->create_file(path, &writer)); + RETURN_IF_ERROR(writer->append({DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)})); + + uint8_t version_buffer[sizeof(uint32_t)]; + encode_fixed32_le(version_buffer, DORIS_NATIVE_FORMAT_VERSION); + RETURN_IF_ERROR(writer->append({version_buffer, sizeof(version_buffer)})); + + PBlock pblock; + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + int64_t compressed_time = 0; + RETURN_IF_ERROR(block.serialize(BeExecVersionManager::get_newest_version(), &pblock, + &uncompressed_bytes, &compressed_bytes, &compressed_time, + 
segment_v2::CompressionTypePB::SNAPPY)); + + const std::string payload = pblock.SerializeAsString(); + uint8_t len_buffer[sizeof(uint64_t)]; + encode_fixed64_le(len_buffer, payload.size()); + RETURN_IF_ERROR(writer->append({len_buffer, sizeof(len_buffer)})); + RETURN_IF_ERROR(writer->append(payload)); + return writer->close(); +} + +Block make_request_block(const std::vector& schema, + const std::vector& local_ids) { + Block block; + for (const auto local_id : local_ids) { + const auto it = std::find_if(schema.begin(), schema.end(), [&](const auto& column) { + return column.local_id == local_id; + }); + DORIS_CHECK(it != schema.end()); + block.insert({it->type->create_column(), it->type, it->name}); + } + return block; +} + +int32_t nullable_int_at(const IColumn& column, size_t row) { + const auto& nullable = assert_cast(column); + const auto& nested = assert_cast(nullable.get_nested_column()); + return nested.get_data()[row]; +} + +std::string nullable_string_at(const IColumn& column, size_t row) { + const auto& nullable = assert_cast(column); + const auto& nested = assert_cast(nullable.get_nested_column()); + return nested.get_data_at(row).to_string(); +} + +class NullableIntGreaterThanExpr final : public VExpr { +public: + NullableIntGreaterThanExpr(size_t block_position, int32_t value) + : VExpr(std::make_shared(), false), + _block_position(block_position), + _value(value) {} + + const std::string& expr_name() const override { return _name; } + + bool is_constant() const override { return false; } + + Status execute_column_impl(VExprContext*, const Block* block, const Selector* selector, + size_t count, ColumnPtr& result_column) const override { + DORIS_CHECK(block != nullptr); + const auto& nullable = + assert_cast(*block->get_by_position(_block_position).column); + const auto& data = assert_cast(nullable.get_nested_column()); + + auto result = ColumnUInt8::create(); + auto& result_data = result->get_data(); + result_data.resize(count); + for (size_t row = 0; 
row < count; ++row) { + const auto source_row = selector == nullptr ? row : (*selector)[row]; + result_data[row] = + !nullable.is_null_at(source_row) && data.get_element(source_row) > _value; + } + result_column = std::move(result); + return Status::OK(); + } + + Status clone_node(VExprSPtr* cloned_expr) const override { + DORIS_CHECK(cloned_expr != nullptr); + *cloned_expr = std::make_shared(_block_position, _value); + return Status::OK(); + } + +private: + size_t _block_position; + int32_t _value; + const std::string _name = "NullableIntGreaterThanExpr"; +}; + +VExprContextSPtr prepared_conjunct(RuntimeState* state, const VExprSPtr& expr) { + auto context = VExprContext::create_shared(expr); + auto status = context->prepare(state, RowDescriptor()); + EXPECT_TRUE(status.ok()) << status; + status = context->open(state); + EXPECT_TRUE(status.ok()) << status; + return context; +} + +} // namespace + +TEST(NativeV2ReaderTest, SchemaProbeReplaysFirstBlockAndProjectsColumns) { + const auto path = "./log/native_v2_reader_" + UniqueId::gen_uid().to_string() + ".native"; + std::filesystem::create_directories("./log"); + ASSERT_TRUE(write_native_file(path, make_source_block()).ok()); + + RuntimeState state; + RuntimeProfile profile("native_v2_reader_test"); + auto reader = create_reader(path, &state, &profile); + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + ASSERT_EQ(schema.size(), 2); + EXPECT_EQ(schema[0].name, "id"); + EXPECT_EQ(schema[0].local_id, 0); + EXPECT_EQ(schema[1].name, "name"); + EXPECT_EQ(schema[1].local_id, 1); + EXPECT_TRUE(schema[0].type->is_nullable()); + EXPECT_TRUE(schema[1].type->is_nullable()); + + auto request = std::make_shared(); + FileScanRequestBuilder builder(request.get()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(1)).ok()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(0)).ok()); + ASSERT_TRUE(reader->open(request).ok()); + + auto block = 
make_request_block(schema, {1, 0}); + size_t rows = 0; + bool eof = false; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + ASSERT_EQ(rows, 2); + EXPECT_FALSE(eof); + EXPECT_EQ(nullable_string_at(*block.get_by_position(0).column, 0), "alice"); + EXPECT_EQ(nullable_string_at(*block.get_by_position(0).column, 1), "bob"); + EXPECT_EQ(nullable_int_at(*block.get_by_position(1).column, 0), 10); + EXPECT_EQ(nullable_int_at(*block.get_by_position(1).column, 1), 20); + + block.clear_column_data(2); + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + EXPECT_EQ(rows, 0); + EXPECT_TRUE(eof); + ASSERT_TRUE(reader->close().ok()); + static_cast(io::global_local_filesystem()->delete_file(path)); +} + +TEST(NativeV2ReaderTest, AppliesConjunctsAndTracksPredicateFilteredRows) { + const auto path = + "./log/native_v2_reader_filter_" + UniqueId::gen_uid().to_string() + ".native"; + std::filesystem::create_directories("./log"); + ASSERT_TRUE(write_native_file(path, make_source_block()).ok()); + + RuntimeState state; + RuntimeProfile profile("native_v2_reader_filter_test"); + auto io_ctx = std::make_shared(); + auto reader = create_reader(path, &state, &profile, io_ctx); + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + + auto request = std::make_shared(); + FileScanRequestBuilder builder(request.get()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(0)).ok()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(1)).ok()); + request->conjuncts = { + prepared_conjunct(&state, std::make_shared(0, 10))}; + ASSERT_TRUE(reader->open(request).ok()); + + auto block = make_request_block(schema, {0, 1}); + size_t rows = 0; + bool eof = false; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + ASSERT_EQ(rows, 1); + EXPECT_EQ(nullable_int_at(*block.get_by_position(0).column, 0), 20); + EXPECT_EQ(nullable_string_at(*block.get_by_position(1).column, 0), "bob"); + 
EXPECT_EQ(io_ctx->predicate_filtered_rows, 1); + ASSERT_TRUE(reader->close().ok()); + static_cast(io::global_local_filesystem()->delete_file(path)); +} + +TEST(NativeV2ReaderTest, RejectsInvalidHeaderAndEmptyFile) { + std::filesystem::create_directories("./log"); + RuntimeState state; + RuntimeProfile profile("native_v2_reader_bad_header_test"); + + const auto bad_magic_path = + "./log/native_v2_bad_magic_" + UniqueId::gen_uid().to_string() + ".native"; + std::string bad_magic(sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t), '\0'); + bad_magic.replace(0, 4, "BAD!"); + ASSERT_TRUE(write_file(bad_magic_path, bad_magic).ok()); + auto bad_magic_reader = create_reader(bad_magic_path, &state, &profile); + EXPECT_FALSE(bad_magic_reader->init(&state).ok()); + static_cast(io::global_local_filesystem()->delete_file(bad_magic_path)); + + const auto empty_path = "./log/native_v2_empty_" + UniqueId::gen_uid().to_string() + ".native"; + ASSERT_TRUE(write_file(empty_path, "").ok()); + auto empty_reader = create_reader(empty_path, &state, &profile); + EXPECT_FALSE(empty_reader->init(&state).ok()); + static_cast(io::global_local_filesystem()->delete_file(empty_path)); +} + +TEST(NativeV2ReaderTest, RejectsUnsupportedVersionAndHeaderOnlyFile) { + std::filesystem::create_directories("./log"); + RuntimeState state; + RuntimeProfile profile("native_v2_reader_header_boundary_test"); + + const auto bad_version_path = + "./log/native_v2_bad_version_" + UniqueId::gen_uid().to_string() + ".native"; + std::string bad_version; + bad_version.append(DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)); + uint8_t version_buffer[sizeof(uint32_t)]; + encode_fixed32_le(version_buffer, DORIS_NATIVE_FORMAT_VERSION + 1); + bad_version.append(reinterpret_cast(version_buffer), sizeof(version_buffer)); + ASSERT_TRUE(write_file(bad_version_path, bad_version).ok()); + auto bad_version_reader = create_reader(bad_version_path, &state, &profile); + EXPECT_FALSE(bad_version_reader->init(&state).ok()); + 
static_cast(io::global_local_filesystem()->delete_file(bad_version_path)); + + const auto header_only_path = + "./log/native_v2_header_only_" + UniqueId::gen_uid().to_string() + ".native"; + std::string header_only; + header_only.append(DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)); + encode_fixed32_le(version_buffer, DORIS_NATIVE_FORMAT_VERSION); + header_only.append(reinterpret_cast(version_buffer), sizeof(version_buffer)); + ASSERT_TRUE(write_file(header_only_path, header_only).ok()); + auto header_only_reader = create_reader(header_only_path, &state, &profile); + ASSERT_TRUE(header_only_reader->init(&state).ok()); + std::vector schema; + EXPECT_FALSE(header_only_reader->get_schema(&schema).ok()); + static_cast(io::global_local_filesystem()->delete_file(header_only_path)); +} + +TEST(NativeV2ReaderTest, RejectsTruncatedBlockDuringSchemaProbe) { + const auto path = "./log/native_v2_truncated_" + UniqueId::gen_uid().to_string() + ".native"; + std::filesystem::create_directories("./log"); + + std::string content; + content.append(DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)); + uint8_t version_buffer[sizeof(uint32_t)]; + encode_fixed32_le(version_buffer, DORIS_NATIVE_FORMAT_VERSION); + content.append(reinterpret_cast(version_buffer), sizeof(version_buffer)); + uint8_t len_buffer[sizeof(uint64_t)]; + encode_fixed64_le(len_buffer, 8); + content.append(reinterpret_cast(len_buffer), sizeof(len_buffer)); + content.append("x"); + ASSERT_TRUE(write_file(path, content).ok()); + + RuntimeState state; + RuntimeProfile profile("native_v2_reader_truncated_test"); + auto reader = create_reader(path, &state, &profile); + ASSERT_TRUE(reader->init(&state).ok()); + std::vector schema; + EXPECT_FALSE(reader->get_schema(&schema).ok()); + static_cast(io::global_local_filesystem()->delete_file(path)); +} + +TEST(NativeV2ReaderTest, RejectsZeroLengthBlockAndInvalidPBlock) { + std::filesystem::create_directories("./log"); + RuntimeState state; + RuntimeProfile 
profile("native_v2_reader_bad_block_test"); + + auto build_header = [] { + std::string content; + content.append(DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)); + uint8_t version_buffer[sizeof(uint32_t)]; + encode_fixed32_le(version_buffer, DORIS_NATIVE_FORMAT_VERSION); + content.append(reinterpret_cast(version_buffer), sizeof(version_buffer)); + return content; + }; + + const auto zero_len_path = + "./log/native_v2_zero_len_" + UniqueId::gen_uid().to_string() + ".native"; + auto zero_len_content = build_header(); + uint8_t len_buffer[sizeof(uint64_t)]; + encode_fixed64_le(len_buffer, 0); + zero_len_content.append(reinterpret_cast(len_buffer), sizeof(len_buffer)); + ASSERT_TRUE(write_file(zero_len_path, zero_len_content).ok()); + auto zero_len_reader = create_reader(zero_len_path, &state, &profile); + ASSERT_TRUE(zero_len_reader->init(&state).ok()); + std::vector schema; + EXPECT_FALSE(zero_len_reader->get_schema(&schema).ok()); + static_cast(io::global_local_filesystem()->delete_file(zero_len_path)); + + const auto invalid_pblock_path = + "./log/native_v2_invalid_pblock_" + UniqueId::gen_uid().to_string() + ".native"; + auto invalid_pblock_content = build_header(); + encode_fixed64_le(len_buffer, 1); + invalid_pblock_content.append(reinterpret_cast(len_buffer), sizeof(len_buffer)); + invalid_pblock_content.append("x"); + ASSERT_TRUE(write_file(invalid_pblock_path, invalid_pblock_content).ok()); + auto invalid_pblock_reader = create_reader(invalid_pblock_path, &state, &profile); + ASSERT_TRUE(invalid_pblock_reader->init(&state).ok()); + schema.clear(); + EXPECT_FALSE(invalid_pblock_reader->get_schema(&schema).ok()); + static_cast(io::global_local_filesystem()->delete_file(invalid_pblock_path)); +} + +TEST(NativeV2ReaderTest, RejectsUnknownRequestedLocalColumn) { + const auto path = + "./log/native_v2_unknown_column_" + UniqueId::gen_uid().to_string() + ".native"; + std::filesystem::create_directories("./log"); + ASSERT_TRUE(write_native_file(path, 
make_source_block()).ok()); + + RuntimeState state; + RuntimeProfile profile("native_v2_reader_unknown_column_test"); + auto reader = create_reader(path, &state, &profile); + ASSERT_TRUE(reader->init(&state).ok()); + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + + auto request = std::make_shared(); + FileScanRequestBuilder builder(request.get()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(42)).ok()); + ASSERT_TRUE(reader->open(request).ok()); + Block block; + block.insert({schema[0].type->create_column(), schema[0].type, schema[0].name}); + size_t rows = 0; + bool eof = false; + EXPECT_FALSE(reader->get_block(&block, &rows, &eof).ok()); + static_cast(io::global_local_filesystem()->delete_file(path)); +} + +} // namespace doris::format::native