diff --git a/be/src/exec/scan/file_scanner_v2.cpp b/be/src/exec/scan/file_scanner_v2.cpp index 92398fe0bf5d88..1e3c3479ca77be 100644 --- a/be/src/exec/scan/file_scanner_v2.cpp +++ b/be/src/exec/scan/file_scanner_v2.cpp @@ -127,6 +127,10 @@ bool is_json_format(TFileFormatType::type format_type) { return format_type == TFileFormatType::FORMAT_JSON; } +bool is_native_format(TFileFormatType::type format_type) { + return format_type == TFileFormatType::FORMAT_NATIVE; +} + bool is_partition_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) { if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) || column_name == BeConsts::ICEBERG_ROWID_COL) { @@ -221,7 +225,7 @@ bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFile } else if (format_type == TFileFormatType::FORMAT_JNI) { return is_supported_jni_table_format(range); } else if (is_csv_format(format_type) || is_text_format(format_type) || - is_json_format(format_type)) { + is_json_format(format_type) || is_native_format(format_type)) { return is_supported_table_format(range); } else { LOG(WARNING) << "Unsupported file format type " << format_type << " for file scanner v2"; @@ -615,6 +619,9 @@ Status FileScannerV2::_to_file_format(TFileFormatType::type format_type, case TFileFormatType::FORMAT_JSON: *file_format = format::FileFormat::JSON; return Status::OK(); + case TFileFormatType::FORMAT_NATIVE: + *file_format = format::FileFormat::NATIVE; + return Status::OK(); default: return Status::NotSupported("FileScannerV2 does not support file format {}", to_string(format_type)); diff --git a/be/src/format_v2/delimited_text/delimited_text_reader.cpp b/be/src/format_v2/delimited_text/delimited_text_reader.cpp index ba4986ee531740..f6e84b4aa7750e 100644 --- a/be/src/format_v2/delimited_text/delimited_text_reader.cpp +++ b/be/src/format_v2/delimited_text/delimited_text_reader.cpp @@ -32,9 +32,9 @@ #include "core/data_type/data_type_nullable.h" #include 
"core/data_type/data_type_string.h" #include "core/data_type/data_type_struct.h" -#include "exprs/vexpr_context.h" #include "format/line_reader.h" #include "format_v2/column_mapper.h" +#include "format_v2/materialized_reader_util.h" #include "io/file_factory.h" #include "io/fs/tracing_file_reader.h" #include "runtime/descriptors.h" @@ -341,36 +341,14 @@ Status DelimitedTextReader::get_block(Block* file_block, size_t* rows, bool* eof const size_t rows_before_filter = *rows; update_counter(_text_profile.rows_read_before_filter, rows_before_filter); - size_t rows_after_delete_filter = rows_before_filter; - if (_request != nullptr && rows_before_filter > 0 && !_request->delete_conjuncts.empty()) { - { - SCOPED_TIMER(_text_profile.delete_conjunct_filter_time); - RETURN_IF_ERROR(VExprContext::filter_block(_request->delete_conjuncts, file_block, - file_block->columns())); - } - rows_after_delete_filter = - file_block->columns() == 0 ? rows_before_filter : file_block->rows(); - update_counter(_text_profile.rows_filtered_by_delete_conjunct, - rows_before_filter - rows_after_delete_filter); - } - - size_t rows_after_filter = rows_after_delete_filter; - if (_request != nullptr && rows_after_delete_filter > 0 && !_request->conjuncts.empty()) { - { - SCOPED_TIMER(_text_profile.conjunct_filter_time); - RETURN_IF_ERROR(VExprContext::filter_block(_request->conjuncts, file_block, - file_block->columns())); - } - rows_after_filter = - file_block->columns() == 0 ? 
rows_after_delete_filter : file_block->rows(); - const auto rows_filtered_by_conjunct = rows_after_delete_filter - rows_after_filter; - update_counter(_text_profile.rows_filtered_by_conjunct, rows_filtered_by_conjunct); - if (_io_ctx != nullptr) { - _io_ctx->predicate_filtered_rows += rows_filtered_by_conjunct; - } - } - - *rows = rows_after_filter; + MaterializedReaderFilterProfile filter_profile; + filter_profile.delete_conjunct_filter_time = _text_profile.delete_conjunct_filter_time; + filter_profile.conjunct_filter_time = _text_profile.conjunct_filter_time; + filter_profile.rows_filtered_by_delete_conjunct = + _text_profile.rows_filtered_by_delete_conjunct; + filter_profile.rows_filtered_by_conjunct = _text_profile.rows_filtered_by_conjunct; + RETURN_IF_ERROR(apply_materialized_reader_filters(_request.get(), _io_ctx.get(), file_block, + rows, &filter_profile)); update_counter(_text_profile.rows_returned, *rows); _reader_statistics.read_rows += *rows; *eof = _line_reader_eof && *rows == 0; diff --git a/be/src/format_v2/file_reader.h b/be/src/format_v2/file_reader.h index 3f192ae093a47b..0cdd68c03fb3ef 100644 --- a/be/src/format_v2/file_reader.h +++ b/be/src/format_v2/file_reader.h @@ -117,6 +117,7 @@ enum class FileFormat { JSON, TEXT, JNI, + NATIVE, }; // 通用文件层 scan 请求。 diff --git a/be/src/format_v2/jni/max_compute_jni_reader.cpp b/be/src/format_v2/jni/max_compute_jni_reader.cpp index b86a3a1325a35d..a26e9e229b5d82 100644 --- a/be/src/format_v2/jni/max_compute_jni_reader.cpp +++ b/be/src/format_v2/jni/max_compute_jni_reader.cpp @@ -66,8 +66,7 @@ std::string MaxComputeJniReader::connector_class() const { return "org/apache/doris/maxcompute/MaxComputeJniScanner"; } -Status MaxComputeJniReader::build_scanner_params( - std::map* params) const { +Status MaxComputeJniReader::build_scanner_params(std::map* params) const { DORIS_CHECK(params != nullptr); DORIS_CHECK(_table_desc != nullptr); params->clear(); diff --git 
a/be/src/format_v2/jni/trino_connector_jni_reader.cpp b/be/src/format_v2/jni/trino_connector_jni_reader.cpp index 3ea50f6df52939..11c9945c5dea16 100644 --- a/be/src/format_v2/jni/trino_connector_jni_reader.cpp +++ b/be/src/format_v2/jni/trino_connector_jni_reader.cpp @@ -32,8 +32,7 @@ constexpr std::string_view TRINO_CONNECTOR_NAME = "connector.name"; Status TrinoConnectorJniReader::validate_scan_range(const TFileRangeDesc& range) const { if (!range.__isset.table_format_params) { - return Status::InternalError( - "missing table_format_params for trino connector jni reader"); + return Status::InternalError("missing table_format_params for trino connector jni reader"); } if (!range.table_format_params.__isset.trino_connector_params) { return Status::InternalError( @@ -52,8 +51,7 @@ Status TrinoConnectorJniReader::validate_scan_range(const TFileRangeDesc& range) "missing trino connector.name option for trino connector jni reader, possibly " "caused by FE/BE protocol mismatch"); } - if (!trino_params.__isset.trino_connector_split || - trino_params.trino_connector_split.empty()) { + if (!trino_params.__isset.trino_connector_split || trino_params.trino_connector_split.empty()) { return Status::InternalError( "missing trino_connector_split for trino connector jni reader, possibly caused " "by FE/BE protocol mismatch"); @@ -109,8 +107,7 @@ Status TrinoConnectorJniReader::build_scanner_params( (*params)["trino_connector_column_handles"] = trino_params.trino_connector_column_handles; (*params)["trino_connector_column_metadata"] = trino_params.trino_connector_column_metadata; (*params)["trino_connector_predicate"] = trino_params.trino_connector_predicate; - (*params)["trino_connector_trascation_handle"] = - trino_params.trino_connector_trascation_handle; + (*params)["trino_connector_trascation_handle"] = trino_params.trino_connector_trascation_handle; for (const auto& kv : trino_params.trino_connector_options) { (*params)[std::string(TRINO_CONNECTOR_OPTION_PREFIX) + kv.first] 
= kv.second; @@ -125,13 +122,12 @@ Status TrinoConnectorJniReader::_set_spi_plugins_dir() const { Jni::LocalClass plugin_loader_cls; const std::string plugin_loader_class = "org/apache/doris/trinoconnector/TrinoConnectorPluginLoader"; - RETURN_IF_ERROR(Jni::Util::get_jni_scanner_class(env, plugin_loader_class.c_str(), - &plugin_loader_cls)); + RETURN_IF_ERROR( + Jni::Util::get_jni_scanner_class(env, plugin_loader_class.c_str(), &plugin_loader_cls)); Jni::MethodId set_plugins_dir_method; - RETURN_IF_ERROR(plugin_loader_cls.get_static_method(env, "setPluginsDir", - "(Ljava/lang/String;)V", - &set_plugins_dir_method)); + RETURN_IF_ERROR(plugin_loader_cls.get_static_method( + env, "setPluginsDir", "(Ljava/lang/String;)V", &set_plugins_dir_method)); Jni::LocalString trino_connector_plugin_path; RETURN_IF_ERROR(Jni::LocalString::new_string( diff --git a/be/src/format_v2/json/json_reader.cpp b/be/src/format_v2/json/json_reader.cpp index 04f74f52aefa9a..f0219bb7d85345 100644 --- a/be/src/format_v2/json/json_reader.cpp +++ b/be/src/format_v2/json/json_reader.cpp @@ -39,9 +39,9 @@ #include "core/data_type/data_type_nullable.h" #include "core/data_type/data_type_string.h" #include "core/data_type/data_type_struct.h" -#include "exprs/vexpr_context.h" #include "format/file_reader/new_plain_text_line_reader.h" #include "format_v2/column_mapper.h" +#include "format_v2/materialized_reader_util.h" #include "io/file_factory.h" #include "io/fs/file_reader.h" #include "io/fs/stream_load_pipe.h" @@ -1063,29 +1063,7 @@ Status JsonReader::_handle_json_error(const Status& status, Block* block, size_t } Status JsonReader::_apply_filters(Block* file_block, size_t* rows) { - DORIS_CHECK(file_block != nullptr); - DORIS_CHECK(rows != nullptr); - const size_t rows_before_filter = *rows; - size_t rows_after_delete_filter = rows_before_filter; - if (_request != nullptr && rows_before_filter > 0 && !_request->delete_conjuncts.empty()) { - 
RETURN_IF_ERROR(VExprContext::filter_block(_request->delete_conjuncts, file_block, - file_block->columns())); - rows_after_delete_filter = - file_block->columns() == 0 ? rows_before_filter : file_block->rows(); - } - - size_t rows_after_filter = rows_after_delete_filter; - if (_request != nullptr && rows_after_delete_filter > 0 && !_request->conjuncts.empty()) { - RETURN_IF_ERROR( - VExprContext::filter_block(_request->conjuncts, file_block, file_block->columns())); - rows_after_filter = - file_block->columns() == 0 ? rows_after_delete_filter : file_block->rows(); - if (_io_ctx != nullptr) { - _io_ctx->predicate_filtered_rows += rows_after_delete_filter - rows_after_filter; - } - } - *rows = rows_after_filter; - return Status::OK(); + return apply_materialized_reader_filters(_request.get(), _io_ctx.get(), file_block, rows); } void JsonReader::_truncate_block_to_rows(Block* block, size_t num_rows) { diff --git a/be/src/format_v2/materialized_reader_util.cpp b/be/src/format_v2/materialized_reader_util.cpp new file mode 100644 index 00000000000000..a7e533633510c4 --- /dev/null +++ b/be/src/format_v2/materialized_reader_util.cpp @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "format_v2/materialized_reader_util.h" + +#include + +#include "core/block/block.h" +#include "core/data_type/data_type_nullable.h" +#include "exprs/vexpr_context.h" +#include "format_v2/file_reader.h" +#include "io/io_common.h" + +namespace doris::format { +namespace { + +void update_counter(RuntimeProfile::Counter* counter, int64_t value) { + if (counter != nullptr) { + COUNTER_UPDATE(counter, value); + } +} + +} // namespace + +ColumnPtr make_column_nullable_if_needed(ColumnPtr column, const DataTypePtr& target_type) { + if (target_type != nullptr && target_type->is_nullable() && column.get() != nullptr && + !column->is_nullable()) { + return make_nullable(std::move(column)); + } + return column; +} + +Status apply_materialized_reader_filters(const FileScanRequest* request, io::IOContext* io_ctx, + Block* file_block, size_t* rows, + const MaterializedReaderFilterProfile* profile) { + DORIS_CHECK(file_block != nullptr); + DORIS_CHECK(rows != nullptr); + const size_t rows_before_filter = *rows; + size_t rows_after_delete_filter = rows_before_filter; + if (request != nullptr && rows_before_filter > 0 && !request->delete_conjuncts.empty()) { + { + SCOPED_TIMER(profile == nullptr ? nullptr : profile->delete_conjunct_filter_time); + RETURN_IF_ERROR(VExprContext::filter_block(request->delete_conjuncts, file_block, + file_block->columns())); + } + rows_after_delete_filter = + file_block->columns() == 0 ? rows_before_filter : file_block->rows(); + if (profile != nullptr) { + update_counter(profile->rows_filtered_by_delete_conjunct, + rows_before_filter - rows_after_delete_filter); + } + } + + size_t rows_after_filter = rows_after_delete_filter; + if (request != nullptr && rows_after_delete_filter > 0 && !request->conjuncts.empty()) { + { + SCOPED_TIMER(profile == nullptr ? 
nullptr : profile->conjunct_filter_time); + RETURN_IF_ERROR(VExprContext::filter_block(request->conjuncts, file_block, + file_block->columns())); + } + rows_after_filter = + file_block->columns() == 0 ? rows_after_delete_filter : file_block->rows(); + const auto rows_filtered_by_conjunct = rows_after_delete_filter - rows_after_filter; + if (profile != nullptr) { + update_counter(profile->rows_filtered_by_conjunct, rows_filtered_by_conjunct); + } + if (io_ctx != nullptr) { + io_ctx->predicate_filtered_rows += rows_filtered_by_conjunct; + } + } + *rows = rows_after_filter; + return Status::OK(); +} + +} // namespace doris::format diff --git a/be/src/format_v2/materialized_reader_util.h b/be/src/format_v2/materialized_reader_util.h new file mode 100644 index 00000000000000..2fb1383dfb9569 --- /dev/null +++ b/be/src/format_v2/materialized_reader_util.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include + +#include "common/status.h" +#include "core/column/column.h" +#include "core/data_type/data_type.h" +#include "runtime/runtime_profile.h" + +namespace doris { +class Block; + +namespace io { +struct IOContext; +} // namespace io + +namespace format { +struct FileScanRequest; + +// Shared helpers for FileReader implementations that deserialize or build already materialized +// Doris columns and then hand those columns to TableReader for final mapping. +ColumnPtr make_column_nullable_if_needed(ColumnPtr column, const DataTypePtr& target_type); + +// Optional profile counters for text-like readers. Native/JSON do not expose per-reader filter +// counters today, so they call apply_materialized_reader_filters() without this struct. +struct MaterializedReaderFilterProfile { + RuntimeProfile::Counter* delete_conjunct_filter_time = nullptr; + RuntimeProfile::Counter* conjunct_filter_time = nullptr; + RuntimeProfile::Counter* rows_filtered_by_delete_conjunct = nullptr; + RuntimeProfile::Counter* rows_filtered_by_conjunct = nullptr; +}; + +// Applies file-local filters in the same order used by FileScannerV2 readers: +// 1. delete_conjuncts remove rows that should not be visible to the scan output; +// 2. conjuncts apply ordinary file-local predicates. +// +// Only ordinary conjunct filtering contributes to IOContext::predicate_filtered_rows. This matches +// the previous JSON/Text/CSV behavior and keeps scanner accounting separate from delete filtering. +// When `profile` is provided, the helper also updates text-reader timer and row counters so CSV +// and Hive text keep their existing observability after sharing this implementation. 
+Status apply_materialized_reader_filters(const FileScanRequest* request, io::IOContext* io_ctx, + Block* file_block, size_t* rows, + const MaterializedReaderFilterProfile* profile = nullptr); + +} // namespace format +} // namespace doris diff --git a/be/src/format_v2/native/native_reader.cpp b/be/src/format_v2/native/native_reader.cpp new file mode 100644 index 00000000000000..2a0a89f80adc8d --- /dev/null +++ b/be/src/format_v2/native/native_reader.cpp @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "format_v2/native/native_reader.h" + +#include +#include + +#include "common/cast_set.h" +#include "core/block/block.h" +#include "core/data_type/data_type_factory.hpp" +#include "core/data_type/data_type_nullable.h" +#include "format/native/native_format.h" +#include "format_v2/column_mapper.h" +#include "format_v2/materialized_reader_util.h" +#include "io/file_factory.h" +#include "io/fs/tracing_file_reader.h" +#include "runtime/runtime_state.h" +#include "util/slice.h" + +namespace doris::format::native { +namespace { + +Status parse_native_pblock(const std::string& buffer, const std::string& path, PBlock* pblock) { + DORIS_CHECK(pblock != nullptr); + if (!pblock->ParseFromArray(buffer.data(), cast_set(buffer.size()))) { + return Status::InternalError("Failed to parse native PBlock from file {}", path); + } + return Status::OK(); +} + +} // namespace + +NativeReader::NativeReader(std::shared_ptr& system_properties, + std::unique_ptr& file_description, + std::shared_ptr io_ctx, RuntimeProfile* profile) + : FileReader(system_properties, file_description, std::move(io_ctx), profile) {} + +NativeReader::~NativeReader() { + static_cast(close()); +} + +Status NativeReader::init(RuntimeState* state) { + _runtime_state = state; + if (_file_description == nullptr) { + return Status::InvalidArgument("Native v2 reader requires file description"); + } + RETURN_IF_ERROR(FileReader::init(state)); + RETURN_IF_ERROR(_validate_and_consume_header()); + return Status::OK(); +} + +Status NativeReader::get_schema(std::vector* file_schema) const { + if (file_schema == nullptr) { + return Status::InvalidArgument("Native v2 file_schema is null"); + } + RETURN_IF_ERROR(_ensure_schema_loaded()); + *file_schema = _file_schema; + return Status::OK(); +} + +std::unique_ptr NativeReader::create_column_mapper( + TableColumnMapperOptions options) const { + return std::make_unique(std::move(options)); +} + +Status NativeReader::open(std::shared_ptr request) { + 
RETURN_IF_ERROR(FileReader::open(std::move(request))); + DORIS_CHECK(_request != nullptr); + _first_block_consumed = false; + _reader_eof = false; + _eof = false; + return Status::OK(); +} + +Status NativeReader::get_block(Block* file_block, size_t* rows, bool* eof) { + DORIS_CHECK(file_block != nullptr); + DORIS_CHECK(rows != nullptr); + DORIS_CHECK(eof != nullptr); + if (_request == nullptr) { + return Status::InternalError("Native v2 reader is not open"); + } + + *rows = 0; + *eof = false; + if (_reader_eof) { + *eof = true; + _eof = true; + return Status::OK(); + } + + std::string buffer; + bool local_eof = false; + if (_first_block_loaded && !_first_block_consumed) { + buffer = _first_block_buffer; + } else { + RETURN_IF_ERROR(_read_next_pblock(&buffer, &local_eof)); + } + + if (local_eof && buffer.empty()) { + _reader_eof = true; + *eof = true; + _eof = true; + return Status::OK(); + } + if (buffer.empty()) { + return Status::InternalError("read empty native block from file {}", + _file_description->path); + } + + PBlock pblock; + RETURN_IF_ERROR(parse_native_pblock(buffer, _file_description->path, &pblock)); + if (!_schema_inited) { + RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); + } + + Block source_block; + size_t uncompressed_bytes = 0; + int64_t decompress_time = 0; + RETURN_IF_ERROR(source_block.deserialize(pblock, &uncompressed_bytes, &decompress_time)); + RETURN_IF_ERROR(_materialize_requested_columns(source_block, file_block)); + *rows = file_block->rows(); + RETURN_IF_ERROR(_apply_filters(file_block, rows)); + _reader_statistics.read_rows += *rows; + + if (_first_block_loaded && !_first_block_consumed) { + _first_block_consumed = true; + } + if (_current_offset >= _file_size) { + _reader_eof = true; + } + *eof = _reader_eof && *rows == 0; + _eof = *eof; + return Status::OK(); +} + +Status NativeReader::close() { + _file_reader.reset(); + _tracing_file_reader.reset(); + _request.reset(); + _reader_eof = true; + _eof = true; + return 
Status::OK(); +} + +Status NativeReader::_validate_and_consume_header() { + DORIS_CHECK(_tracing_file_reader != nullptr); + _file_size = _tracing_file_reader->size(); + _current_offset = 0; + _reader_eof = (_file_size == 0); + + static constexpr size_t HEADER_SIZE = sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t); + if (_reader_eof || _file_size < cast_set(HEADER_SIZE)) { + return Status::InternalError( + "invalid Doris Native file {}, file size {} is smaller than header size {}", + _file_description->path, _file_size, HEADER_SIZE); + } + + char header[HEADER_SIZE]; + Slice header_slice(header, sizeof(header)); + size_t bytes_read = 0; + RETURN_IF_ERROR(_tracing_file_reader->read_at(0, header_slice, &bytes_read, _io_ctx.get())); + if (bytes_read != sizeof(header)) { + return Status::InternalError( + "failed to read Doris Native header from file {}, expect {} bytes, got {} bytes", + _file_description->path, sizeof(header), bytes_read); + } + if (std::memcmp(header, DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)) != 0) { + return Status::InternalError("invalid Doris Native magic header in file {}", + _file_description->path); + } + + uint32_t version = 0; + std::memcpy(&version, header + sizeof(DORIS_NATIVE_MAGIC), sizeof(uint32_t)); + if (version != DORIS_NATIVE_FORMAT_VERSION) { + return Status::InternalError( + "unsupported Doris Native format version {} in file {}, expect {}", version, + _file_description->path, DORIS_NATIVE_FORMAT_VERSION); + } + + _current_offset = sizeof(header); + _reader_eof = (_file_size == _current_offset); + return Status::OK(); +} + +Status NativeReader::_ensure_schema_loaded() const { + if (_schema_inited) { + return Status::OK(); + } + if (!_first_block_loaded) { + bool local_eof = false; + RETURN_IF_ERROR(_read_next_pblock(&_first_block_buffer, &local_eof)); + if (local_eof && _first_block_buffer.empty()) { + return Status::EndOfFile("empty native file {}", _file_description->path); + } + if (_first_block_buffer.empty()) { + return 
Status::InternalError("first native block is empty {}", _file_description->path); + } + _first_block_loaded = true; + } + + PBlock pblock; + RETURN_IF_ERROR(parse_native_pblock(_first_block_buffer, _file_description->path, &pblock)); + RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); + return Status::OK(); +} + +Status NativeReader::_read_next_pblock(std::string* buffer, bool* eof) const { + DORIS_CHECK(buffer != nullptr); + DORIS_CHECK(eof != nullptr); + DORIS_CHECK(_tracing_file_reader != nullptr); + buffer->clear(); + *eof = false; + + if (_current_offset >= _file_size) { + *eof = true; + return Status::OK(); + } + + uint64_t block_len = 0; + Slice len_slice(reinterpret_cast(&block_len), sizeof(block_len)); + size_t bytes_read = 0; + RETURN_IF_ERROR( + _tracing_file_reader->read_at(_current_offset, len_slice, &bytes_read, _io_ctx.get())); + if (bytes_read == 0) { + *eof = true; + return Status::OK(); + } + if (bytes_read != sizeof(block_len)) { + return Status::InternalError( + "Failed to read native block length from file {}, expect {}, actual {}", + _file_description->path, sizeof(block_len), bytes_read); + } + _current_offset += sizeof(block_len); + if (block_len == 0) { + *eof = (_current_offset >= _file_size); + return Status::OK(); + } + + buffer->assign(block_len, '\0'); + Slice data_slice(buffer->data(), block_len); + bytes_read = 0; + RETURN_IF_ERROR( + _tracing_file_reader->read_at(_current_offset, data_slice, &bytes_read, _io_ctx.get())); + if (bytes_read != block_len) { + return Status::InternalError( + "Failed to read native block body from file {}, expect {}, actual {}", + _file_description->path, block_len, bytes_read); + } + _current_offset += block_len; + *eof = (_current_offset >= _file_size); + return Status::OK(); +} + +Status NativeReader::_init_schema_from_pblock(const PBlock& pblock) const { + _file_schema.clear(); + _file_schema.reserve(pblock.column_metas_size()); + for (int idx = 0; idx < pblock.column_metas_size(); ++idx) { + const 
auto& meta = pblock.column_metas(idx); + ColumnDefinition field; + field.identifier = Field::create_field(meta.name()); + field.local_id = idx; + field.name = meta.name(); + field.type = make_nullable(DataTypeFactory::instance().create_data_type(meta)); + _file_schema.push_back(std::move(field)); + } + _schema_inited = true; + return Status::OK(); +} + +Status NativeReader::_materialize_requested_columns(const Block& source_block, + Block* file_block) const { + DORIS_CHECK(file_block != nullptr); + DORIS_CHECK(_request != nullptr); + for (const auto& [file_column_id, block_position] : _request->local_positions) { + const auto source_idx = file_column_id.value(); + if (source_idx < 0 || cast_set(source_idx) >= source_block.columns()) { + return Status::InternalError("native file {} does not contain local column id {}", + _file_description->path, source_idx); + } + if (block_position.value() >= file_block->columns()) { + return Status::InternalError("native v2 request has invalid block position {}", + block_position.value()); + } + const auto& target = file_block->get_by_position(block_position.value()); + auto column = source_block.get_by_position(source_idx).column; + column = make_column_nullable_if_needed(std::move(column), target.type); + file_block->replace_by_position(block_position.value(), IColumn::mutate(std::move(column))); + } + return Status::OK(); +} + +Status NativeReader::_apply_filters(Block* file_block, size_t* rows) const { + return apply_materialized_reader_filters(_request.get(), _io_ctx.get(), file_block, rows); +} + +} // namespace doris::format::native diff --git a/be/src/format_v2/native/native_reader.h b/be/src/format_v2/native/native_reader.h new file mode 100644 index 00000000000000..3719a6afd6c4f5 --- /dev/null +++ b/be/src/format_v2/native/native_reader.h @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include +#include +#include +#include + +#include "format_v2/file_reader.h" + +namespace doris::format::native { + +// FileScannerV2 reader for Doris Native files. +// +// Native files are self-describing only through the first serialized PBlock. TableReader asks for +// schema before open(), so this reader may read and cache that first PBlock during get_schema() and +// then replay it as the first data batch after open(). 
+class NativeReader final : public FileReader { +public: + NativeReader(std::shared_ptr& system_properties, + std::unique_ptr& file_description, + std::shared_ptr io_ctx, RuntimeProfile* profile); + ~NativeReader() override; + + Status init(RuntimeState* state) override; + Status get_schema(std::vector* file_schema) const override; + std::unique_ptr create_column_mapper( + TableColumnMapperOptions options) const override; + Status open(std::shared_ptr request) override; + Status get_block(Block* file_block, size_t* rows, bool* eof) override; + Status close() override; + +private: + Status _validate_and_consume_header(); + Status _ensure_schema_loaded() const; + Status _read_next_pblock(std::string* buffer, bool* eof) const; + Status _init_schema_from_pblock(const PBlock& pblock) const; + Status _materialize_requested_columns(const Block& source_block, Block* file_block) const; + Status _apply_filters(Block* file_block, size_t* rows) const; + + RuntimeState* _runtime_state = nullptr; + mutable int64_t _current_offset = 0; + mutable int64_t _file_size = 0; + mutable bool _reader_eof = true; + mutable bool _schema_inited = false; + mutable std::vector _file_schema; + mutable std::string _first_block_buffer; + mutable bool _first_block_loaded = false; + mutable bool _first_block_consumed = false; +}; + +} // namespace doris::format::native diff --git a/be/src/format_v2/table_reader.cpp b/be/src/format_v2/table_reader.cpp index d90d4f6ea337d1..5343a5a56323b6 100644 --- a/be/src/format_v2/table_reader.cpp +++ b/be/src/format_v2/table_reader.cpp @@ -44,6 +44,7 @@ #include "format_v2/delimited_text/csv_reader.h" #include "format_v2/delimited_text/text_reader.h" #include "format_v2/json/json_reader.h" +#include "format_v2/native/native_reader.h" #include "format_v2/parquet/parquet_reader.h" #include "roaring/roaring64map.hh" #include "storage/segment/condition_cache.h" @@ -80,6 +81,8 @@ std::string file_format_to_string(FileFormat format) { return "TEXT"; case 
FileFormat::JNI: return "JNI"; + case FileFormat::NATIVE: + return "NATIVE"; } return "UNKNOWN"; } @@ -738,6 +741,11 @@ Status TableReader::create_file_reader(std::unique_ptr* reader) { _current_range_compress_type, _current_range_load_id); return Status::OK(); } + if (_format == FileFormat::NATIVE) { + *reader = std::make_unique( + _system_properties, _current_task->data_file, _io_ctx, _scanner_profile); + return Status::OK(); + } return Status::NotSupported("TableReader does not support file format {}", file_format_to_string(_format)); } diff --git a/be/src/format_v2/table_reader.h b/be/src/format_v2/table_reader.h index a94cae621c9546..f139301c1a12d4 100644 --- a/be/src/format_v2/table_reader.h +++ b/be/src/format_v2/table_reader.h @@ -124,9 +124,8 @@ struct TableReadOptions { std::shared_ptr io_ctx; RuntimeState* runtime_state; RuntimeProfile* scanner_profile; - // File formats without self-describing metadata, such as CSV, need the original FE slot - // descriptors to build their file-local schema and deserialize values. Self-describing formats - // ignore this field and use metadata parsed from the file footer/header. + // File formats without complete self-describing metadata, such as CSV, Text, and JSON, need + // the FE-planned physical file slots to build their file-local schema and deserialize values. const std::vector* file_slot_descs = nullptr; // Push-down aggregate type. 
const TPushAggOp::type push_down_agg_type = TPushAggOp::type::NONE; diff --git a/be/test/exec/scan/file_scanner_v2_test.cpp b/be/test/exec/scan/file_scanner_v2_test.cpp index d3f0507aca1122..43240cd7d17e49 100644 --- a/be/test/exec/scan/file_scanner_v2_test.cpp +++ b/be/test/exec/scan/file_scanner_v2_test.cpp @@ -107,6 +107,8 @@ TEST(FileScannerV2Test, SupportedFormatMatrix) { {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_CSV_PLAIN, true}, {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_TEXT, true}, {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_JSON, true}, + {"tvf", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_NATIVE, true}, + {"", TFileFormatType::FORMAT_WAL, std::nullopt, false}, }; for (const auto& test_case : cases) { @@ -180,6 +182,7 @@ TEST(FileScannerV2Test, FileFormatConversionMatrix) { {TFileFormatType::FORMAT_PROTO, format::FileFormat::CSV}, {TFileFormatType::FORMAT_TEXT, format::FileFormat::TEXT}, {TFileFormatType::FORMAT_JSON, format::FileFormat::JSON}, + {TFileFormatType::FORMAT_NATIVE, format::FileFormat::NATIVE}, {TFileFormatType::FORMAT_ORC, std::nullopt}, }; diff --git a/be/test/format_v2/json/json_reader_test.cpp b/be/test/format_v2/json/json_reader_test.cpp index 994d71c0e6aefc..d968a9d5bee571 100644 --- a/be/test/format_v2/json/json_reader_test.cpp +++ b/be/test/format_v2/json/json_reader_test.cpp @@ -35,7 +35,10 @@ #include "core/data_type/data_type_nullable.h" #include "core/data_type/data_type_number.h" #include "core/data_type/data_type_string.h" +#include "exprs/vexpr.h" +#include "exprs/vexpr_context.h" #include "format_v2/column_data.h" +#include "io/io_common.h" #include "runtime/descriptors.h" #include "runtime/runtime_profile.h" #include "testutil/mock/mock_runtime_state.h" @@ -216,6 +219,57 @@ bool nullable_is_null_at(const IColumn& column, size_t row) { return nullable.is_null_at(row); } +class NullableIntGreaterThanExpr final : public VExpr { /* Test-only predicate: a row passes when the nullable int column at _block_position is non-null and its value is greater than _value. NOTE(review): the template arguments of make_shared and assert_cast were stripped from this text by extraction; restore them from the original patch. */ +public: + 
NullableIntGreaterThanExpr(size_t block_position, int32_t value) + : VExpr(std::make_shared(), false), + _block_position(block_position), + _value(value) {} + + const std::string& expr_name() const override { return _name; } + + bool is_constant() const override { return false; } + /* Builds a ColumnUInt8 with `count` entries. Entry i looks at source row (*selector)[i] when a selector is given, otherwise row i, and is set to true only if that row is non-null and greater than _value. DORIS_CHECK-fails on a null block. */ + Status execute_column_impl(VExprContext*, const Block* block, const Selector* selector, + size_t count, ColumnPtr& result_column) const override { + DORIS_CHECK(block != nullptr); + const auto& nullable = + assert_cast(*block->get_by_position(_block_position).column); + const auto& data = assert_cast(nullable.get_nested_column()); + + auto result = ColumnUInt8::create(); + auto& result_data = result->get_data(); + result_data.resize(count); + for (size_t row = 0; row < count; ++row) { + const auto source_row = selector == nullptr ? row : (*selector)[row]; + result_data[row] = + !nullable.is_null_at(source_row) && data.get_element(source_row) > _value; + } + result_column = std::move(result); + return Status::OK(); + } + /* Stores a newly allocated expression built from the same block position and threshold into *cloned_expr (presumably another NullableIntGreaterThanExpr - the type argument was stripped). */ + Status clone_node(VExprSPtr* cloned_expr) const override { + DORIS_CHECK(cloned_expr != nullptr); + *cloned_expr = std::make_shared(_block_position, _value); + return Status::OK(); + } + +private: + size_t _block_position; + int32_t _value; + const std::string _name = "NullableIntGreaterThanExpr"; +}; + /* Wraps `expr` in a VExprContext, then calls prepare() with an empty RowDescriptor and open(). Failures are reported through EXPECT_TRUE only, so the context is returned even when a step failed. */ +VExprContextSPtr prepared_conjunct(RuntimeState* state, const VExprSPtr& expr) { + auto context = VExprContext::create_shared(expr); + auto status = context->prepare(state, RowDescriptor()); + EXPECT_TRUE(status.ok()) << status; + status = context->open(state); + EXPECT_TRUE(status.ok()) << status; + return context; +} + } // namespace TEST(JsonReaderTest, ReadsRequestedColumnsInFileScanRequestOrder) { @@ -399,4 +453,50 @@ TEST(JsonReaderTest, SkipsEmptyJsonLine) { EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 0), "nancy"); } +// Scenario: JSON, Native, CSV, and Hive text all share the same file-local filter order: +// delete conjuncts run 
first, ordinary conjuncts run second, and only ordinary conjuncts contribute +// to IOContext::predicate_filtered_rows. This guards the JSON caller of the shared helper because +// CSV/Text already assert the optional profile-counter path. +TEST(JsonReaderTest, AppliesDeleteAndNormalConjunctsWithPredicateFilterAccounting) { + ObjectPool pool; + auto slots = build_slots(&pool); + const auto file_path = write_json_file("filters.jsonl", R"({"id":1,"name":"alice"})" + "\n" + R"({"id":2,"name":"bob"})" + "\n" + R"({"id":3,"name":"carol"})" + "\n"); /* three JSON lines with ids 1, 2, 3 */ + auto params = json_scan_params(); + auto range = file_range(file_path); + auto system_properties = std::make_shared(); + system_properties->system_type = TFileType::FILE_LOCAL; + auto desc = file_description(file_path.string()); + RuntimeProfile profile("json_v2_reader_filter_test"); + MockRuntimeState state; + auto io_ctx = std::make_shared(); + JsonReader reader(system_properties, desc, io_ctx, &profile, &params, range, slots); /* the scan params declared above are passed by address, like the profile */ + + ASSERT_TRUE(reader.init(&state).ok()); + std::vector schema; + ASSERT_TRUE(reader.get_schema(&schema).ok()); + + auto request = std::make_shared(); + request->local_positions.emplace(LocalColumnId(0), LocalIndex(0)); + request->local_positions.emplace(LocalColumnId(1), LocalIndex(1)); + request->delete_conjuncts = { /* id > 1: the expectations below imply this stage removes id = 1 without touching predicate_filtered_rows */ + prepared_conjunct(&state, std::make_shared(0, 1))}; + request->conjuncts = { /* id > 2: removes id = 2, the single row counted in predicate_filtered_rows */ + prepared_conjunct(&state, std::make_shared(0, 2))}; + ASSERT_TRUE(reader.open(request).ok()); + + auto block = make_block(schema, {0, 1}); + size_t rows = 0; + bool eof = false; + ASSERT_TRUE(reader.get_block(&block, &rows, &eof).ok()); + ASSERT_EQ(rows, 1); /* only (3, "carol") survives both filter stages */ + EXPECT_EQ(nullable_int_at(*block.get_by_position(0).column, 0), 3); + EXPECT_EQ(nullable_string_at(*block.get_by_position(1).column, 0), "carol"); + EXPECT_EQ(io_ctx->predicate_filtered_rows, 1); +} + } // namespace doris::format::json diff --git a/be/test/format_v2/native/native_reader_test.cpp b/be/test/format_v2/native/native_reader_test.cpp new 
file mode 100644 index 00000000000000..aaa7aa90e0681e --- /dev/null +++ b/be/test/format_v2/native/native_reader_test.cpp @@ -0,0 +1,419 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + /* NOTE(review): extraction stripped everything between angle brackets in this text - the system include names below and every template argument (make_unique, make_shared, vector, assert_cast, static_cast, reinterpret_cast). Restore them from the original patch before building. */ +#include "format_v2/native/native_reader.h" + +#include + +#include +#include +#include +#include +#include + +#include "agent/be_exec_version_manager.h" +#include "core/assert_cast.h" +#include "core/block/block.h" +#include "core/column/column_nullable.h" +#include "core/column/column_string.h" +#include "core/column/column_vector.h" +#include "core/data_type/data_type_nullable.h" +#include "core/data_type/data_type_number.h" +#include "core/data_type/data_type_string.h" +#include "exprs/vexpr.h" +#include "exprs/vexpr_context.h" +#include "format/native/native_format.h" +#include "format_v2/column_mapper.h" +#include "io/fs/local_file_system.h" +#include "io/io_common.h" +#include "runtime/descriptors.h" +#include "runtime/runtime_profile.h" +#include "runtime/runtime_state.h" +#include "util/coding.h" +#include "util/uid_util.h" + +namespace doris::format::native { +namespace { + /* Describes the file at `path` with a scan range covering the whole file: offset 0, size taken from the filesystem. */ +std::unique_ptr file_description(const std::string& path) { + auto desc = std::make_unique(); + desc->path = path; + desc->file_size = 
static_cast(std::filesystem::file_size(path)); + desc->range_start_offset = 0; + desc->range_size = desc->file_size; + return desc; +} + /* Creates `path` on the local filesystem, appends `content` unless it is empty, and closes the writer; the first failing Status is returned. */ +Status write_file(const std::string& path, std::string_view content) { + io::FileWriterPtr writer; + RETURN_IF_ERROR(io::global_local_filesystem()->create_file(path, &writer)); + if (!content.empty()) { + RETURN_IF_ERROR(writer->append({content.data(), content.size()})); + } + return writer->close(); +} + /* Builds a reader over the local file `path` (system type FILE_LOCAL). `io_ctx` may be left null; `state` is not used here, callers pass it to init() themselves. */ +std::unique_ptr create_reader(const std::string& path, RuntimeState* state, + RuntimeProfile* profile, + std::shared_ptr io_ctx = nullptr) { + auto system_properties = std::make_shared(); + system_properties->system_type = TFileType::FILE_LOCAL; + auto desc = file_description(path); + return std::make_unique(system_properties, desc, std::move(io_ctx), profile); +} + /* Two-row fixture block: column "id" = {10, 20} (ColumnInt32) and column "name" = {"alice", "bob"} (ColumnString). */ +Block make_source_block() { + auto id_column = ColumnInt32::create(); + id_column->insert_value(10); + id_column->insert_value(20); + + auto name_column = ColumnString::create(); + name_column->insert_data("alice", 5); + name_column->insert_data("bob", 3); + + Block block; + block.insert({id_column->get_ptr(), std::make_shared(), "id"}); + block.insert({name_column->get_ptr(), std::make_shared(), "name"}); + return block; +} + /* Writes a single-block native file: DORIS_NATIVE_MAGIC, the format version as little-endian fixed32, then the block as a little-endian fixed64 payload length followed by the PBlock bytes (serialized with the newest BE exec version and SNAPPY compression). */ +Status write_native_file(const std::string& path, const Block& block) { + io::FileWriterPtr writer; + RETURN_IF_ERROR(io::global_local_filesystem()->create_file(path, &writer)); + RETURN_IF_ERROR(writer->append({DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)})); + + uint8_t version_buffer[sizeof(uint32_t)]; + encode_fixed32_le(version_buffer, DORIS_NATIVE_FORMAT_VERSION); + RETURN_IF_ERROR(writer->append({version_buffer, sizeof(version_buffer)})); + + PBlock pblock; + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + int64_t compressed_time = 0; + RETURN_IF_ERROR(block.serialize(BeExecVersionManager::get_newest_version(), &pblock, + &uncompressed_bytes, &compressed_bytes, &compressed_time, + 
segment_v2::CompressionTypePB::SNAPPY)); + + const std::string payload = pblock.SerializeAsString(); + uint8_t len_buffer[sizeof(uint64_t)]; + encode_fixed64_le(len_buffer, payload.size()); + RETURN_IF_ERROR(writer->append({len_buffer, sizeof(len_buffer)})); + RETURN_IF_ERROR(writer->append(payload)); + return writer->close(); +} + /* Builds an empty output block with one column per entry of `local_ids`, in that order, using the type and name of the schema entry with the matching local_id. DORIS_CHECK-fails when an id is not in `schema`. */ +Block make_request_block(const std::vector& schema, + const std::vector& local_ids) { + Block block; + for (const auto local_id : local_ids) { + const auto it = std::find_if(schema.begin(), schema.end(), [&](const auto& column) { + return column.local_id == local_id; + }); + DORIS_CHECK(it != schema.end()); + block.insert({it->type->create_column(), it->type, it->name}); + } + return block; +} + /* Returns the nested int value at `row` of a nullable column; the null map is not consulted. */ +int32_t nullable_int_at(const IColumn& column, size_t row) { + const auto& nullable = assert_cast(column); + const auto& nested = assert_cast(nullable.get_nested_column()); + return nested.get_data()[row]; +} + /* Returns the nested string value at `row` of a nullable column; the null map is not consulted. */ +std::string nullable_string_at(const IColumn& column, size_t row) { + const auto& nullable = assert_cast(column); + const auto& nested = assert_cast(nullable.get_nested_column()); + return nested.get_data_at(row).to_string(); +} + /* Test-only predicate: a row passes when the nullable int column at _block_position is non-null and greater than _value. NOTE(review): this class and prepared_conjunct() below repeat, token for token, the helpers this same patch adds to json_reader_test.cpp; consider a shared test utility. */ +class NullableIntGreaterThanExpr final : public VExpr { +public: + NullableIntGreaterThanExpr(size_t block_position, int32_t value) + : VExpr(std::make_shared(), false), + _block_position(block_position), + _value(value) {} + + const std::string& expr_name() const override { return _name; } + + bool is_constant() const override { return false; } + /* Builds a ColumnUInt8 with `count` entries. Entry i looks at source row (*selector)[i] when a selector is given, otherwise row i. */ + Status execute_column_impl(VExprContext*, const Block* block, const Selector* selector, + size_t count, ColumnPtr& result_column) const override { + DORIS_CHECK(block != nullptr); + const auto& nullable = + assert_cast(*block->get_by_position(_block_position).column); + const auto& data = assert_cast(nullable.get_nested_column()); + + auto result = ColumnUInt8::create(); + auto& result_data = result->get_data(); + result_data.resize(count); + for (size_t row = 0; 
row < count; ++row) { + const auto source_row = selector == nullptr ? row : (*selector)[row]; + result_data[row] = + !nullable.is_null_at(source_row) && data.get_element(source_row) > _value; + } + result_column = std::move(result); + return Status::OK(); + } + /* Stores a newly allocated expression built from the same block position and threshold into *cloned_expr. */ + Status clone_node(VExprSPtr* cloned_expr) const override { + DORIS_CHECK(cloned_expr != nullptr); + *cloned_expr = std::make_shared(_block_position, _value); + return Status::OK(); + } + +private: + size_t _block_position; + int32_t _value; + const std::string _name = "NullableIntGreaterThanExpr"; +}; + /* Wraps `expr` in a VExprContext, then calls prepare() with an empty RowDescriptor and open(). Failures are reported through EXPECT_TRUE only, so the context is returned even when a step failed. */ +VExprContextSPtr prepared_conjunct(RuntimeState* state, const VExprSPtr& expr) { + auto context = VExprContext::create_shared(expr); + auto status = context->prepare(state, RowDescriptor()); + EXPECT_TRUE(status.ok()) << status; + status = context->open(state); + EXPECT_TRUE(status.ok()) << status; + return context; +} + +} // namespace + /* get_schema() must report both file columns (local ids 0 and 1) with nullable types, and the first get_block() must still deliver the block already read for the schema probe, projected into the requested order (name, id). The following call must report rows == 0 and eof == true. NOTE(review): in every test of this file a failing ASSERT_* returns before the trailing delete_file(), leaving the temp file under ./log. */ +TEST(NativeV2ReaderTest, SchemaProbeReplaysFirstBlockAndProjectsColumns) { + const auto path = "./log/native_v2_reader_" + UniqueId::gen_uid().to_string() + ".native"; + std::filesystem::create_directories("./log"); + ASSERT_TRUE(write_native_file(path, make_source_block()).ok()); + + RuntimeState state; + RuntimeProfile profile("native_v2_reader_test"); + auto reader = create_reader(path, &state, &profile); + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + ASSERT_EQ(schema.size(), 2); + EXPECT_EQ(schema[0].name, "id"); + EXPECT_EQ(schema[0].local_id, 0); + EXPECT_EQ(schema[1].name, "name"); + EXPECT_EQ(schema[1].local_id, 1); + EXPECT_TRUE(schema[0].type->is_nullable()); + EXPECT_TRUE(schema[1].type->is_nullable()); + + auto request = std::make_shared(); + FileScanRequestBuilder builder(request.get()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(1)).ok()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(0)).ok()); + ASSERT_TRUE(reader->open(request).ok()); + + auto block = 
make_request_block(schema, {1, 0}); + size_t rows = 0; + bool eof = false; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + ASSERT_EQ(rows, 2); + EXPECT_FALSE(eof); + EXPECT_EQ(nullable_string_at(*block.get_by_position(0).column, 0), "alice"); + EXPECT_EQ(nullable_string_at(*block.get_by_position(0).column, 1), "bob"); + EXPECT_EQ(nullable_int_at(*block.get_by_position(1).column, 0), 10); + EXPECT_EQ(nullable_int_at(*block.get_by_position(1).column, 1), 20); + + block.clear_column_data(2); + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + EXPECT_EQ(rows, 0); + EXPECT_TRUE(eof); + ASSERT_TRUE(reader->close().ok()); + static_cast(io::global_local_filesystem()->delete_file(path)); +} + /* With the conjunct id > 10 applied to the fixture rows (ids 10 and 20), only (20, "bob") must survive and the dropped row must be counted in io_ctx->predicate_filtered_rows. */ +TEST(NativeV2ReaderTest, AppliesConjunctsAndTracksPredicateFilteredRows) { + const auto path = + "./log/native_v2_reader_filter_" + UniqueId::gen_uid().to_string() + ".native"; + std::filesystem::create_directories("./log"); + ASSERT_TRUE(write_native_file(path, make_source_block()).ok()); + + RuntimeState state; + RuntimeProfile profile("native_v2_reader_filter_test"); + auto io_ctx = std::make_shared(); + auto reader = create_reader(path, &state, &profile, io_ctx); + ASSERT_TRUE(reader->init(&state).ok()); + + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + + auto request = std::make_shared(); + FileScanRequestBuilder builder(request.get()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(0)).ok()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(1)).ok()); + request->conjuncts = { + prepared_conjunct(&state, std::make_shared(0, 10))}; + ASSERT_TRUE(reader->open(request).ok()); + + auto block = make_request_block(schema, {0, 1}); + size_t rows = 0; + bool eof = false; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + ASSERT_EQ(rows, 1); + EXPECT_EQ(nullable_int_at(*block.get_by_position(0).column, 0), 20); + EXPECT_EQ(nullable_string_at(*block.get_by_position(1).column, 0), "bob"); + 
EXPECT_EQ(io_ctx->predicate_filtered_rows, 1); + ASSERT_TRUE(reader->close().ok()); + static_cast(io::global_local_filesystem()->delete_file(path)); +} + /* init() must fail both for a header-sized file whose first four bytes are "BAD!" (the rest zero) and for a zero-byte file. */ +TEST(NativeV2ReaderTest, RejectsInvalidHeaderAndEmptyFile) { + std::filesystem::create_directories("./log"); + RuntimeState state; + RuntimeProfile profile("native_v2_reader_bad_header_test"); + + const auto bad_magic_path = + "./log/native_v2_bad_magic_" + UniqueId::gen_uid().to_string() + ".native"; + std::string bad_magic(sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t), '\0'); + bad_magic.replace(0, 4, "BAD!"); + ASSERT_TRUE(write_file(bad_magic_path, bad_magic).ok()); + auto bad_magic_reader = create_reader(bad_magic_path, &state, &profile); + EXPECT_FALSE(bad_magic_reader->init(&state).ok()); + static_cast(io::global_local_filesystem()->delete_file(bad_magic_path)); + + const auto empty_path = "./log/native_v2_empty_" + UniqueId::gen_uid().to_string() + ".native"; + ASSERT_TRUE(write_file(empty_path, "").ok()); + auto empty_reader = create_reader(empty_path, &state, &profile); + EXPECT_FALSE(empty_reader->init(&state).ok()); + static_cast(io::global_local_filesystem()->delete_file(empty_path)); +} + /* init() must fail for version DORIS_NATIVE_FORMAT_VERSION + 1. A valid header with no block after it must pass init() but make get_schema() fail. */ +TEST(NativeV2ReaderTest, RejectsUnsupportedVersionAndHeaderOnlyFile) { + std::filesystem::create_directories("./log"); + RuntimeState state; + RuntimeProfile profile("native_v2_reader_header_boundary_test"); + + const auto bad_version_path = + "./log/native_v2_bad_version_" + UniqueId::gen_uid().to_string() + ".native"; + std::string bad_version; + bad_version.append(DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)); + uint8_t version_buffer[sizeof(uint32_t)]; + encode_fixed32_le(version_buffer, DORIS_NATIVE_FORMAT_VERSION + 1); + bad_version.append(reinterpret_cast(version_buffer), sizeof(version_buffer)); + ASSERT_TRUE(write_file(bad_version_path, bad_version).ok()); + auto bad_version_reader = create_reader(bad_version_path, &state, &profile); + EXPECT_FALSE(bad_version_reader->init(&state).ok()); + 
static_cast(io::global_local_filesystem()->delete_file(bad_version_path)); + + const auto header_only_path = + "./log/native_v2_header_only_" + UniqueId::gen_uid().to_string() + ".native"; + std::string header_only; + header_only.append(DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)); + encode_fixed32_le(version_buffer, DORIS_NATIVE_FORMAT_VERSION); + header_only.append(reinterpret_cast(version_buffer), sizeof(version_buffer)); + ASSERT_TRUE(write_file(header_only_path, header_only).ok()); + auto header_only_reader = create_reader(header_only_path, &state, &profile); + ASSERT_TRUE(header_only_reader->init(&state).ok()); + std::vector schema; + EXPECT_FALSE(header_only_reader->get_schema(&schema).ok()); + static_cast(io::global_local_filesystem()->delete_file(header_only_path)); +} + /* The block length prefix announces 8 payload bytes but only one byte ("x") follows; init() must pass and get_schema() must fail. */ +TEST(NativeV2ReaderTest, RejectsTruncatedBlockDuringSchemaProbe) { + const auto path = "./log/native_v2_truncated_" + UniqueId::gen_uid().to_string() + ".native"; + std::filesystem::create_directories("./log"); + + std::string content; + content.append(DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)); + uint8_t version_buffer[sizeof(uint32_t)]; + encode_fixed32_le(version_buffer, DORIS_NATIVE_FORMAT_VERSION); + content.append(reinterpret_cast(version_buffer), sizeof(version_buffer)); + uint8_t len_buffer[sizeof(uint64_t)]; + encode_fixed64_le(len_buffer, 8); + content.append(reinterpret_cast(len_buffer), sizeof(len_buffer)); + content.append("x"); + ASSERT_TRUE(write_file(path, content).ok()); + + RuntimeState state; + RuntimeProfile profile("native_v2_reader_truncated_test"); + auto reader = create_reader(path, &state, &profile); + ASSERT_TRUE(reader->init(&state).ok()); + std::vector schema; + EXPECT_FALSE(reader->get_schema(&schema).ok()); + static_cast(io::global_local_filesystem()->delete_file(path)); +} + /* get_schema() must fail for a block whose length prefix is 0 and for a complete one-byte payload ("x"), which the test expects to be rejected as a PBlock. */ +TEST(NativeV2ReaderTest, RejectsZeroLengthBlockAndInvalidPBlock) { + std::filesystem::create_directories("./log"); + RuntimeState state; + RuntimeProfile 
profile("native_v2_reader_bad_block_test"); + + auto build_header = [] { + std::string content; + content.append(DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)); + uint8_t version_buffer[sizeof(uint32_t)]; + encode_fixed32_le(version_buffer, DORIS_NATIVE_FORMAT_VERSION); + content.append(reinterpret_cast(version_buffer), sizeof(version_buffer)); + return content; + }; + + const auto zero_len_path = + "./log/native_v2_zero_len_" + UniqueId::gen_uid().to_string() + ".native"; + auto zero_len_content = build_header(); + uint8_t len_buffer[sizeof(uint64_t)]; + encode_fixed64_le(len_buffer, 0); + zero_len_content.append(reinterpret_cast(len_buffer), sizeof(len_buffer)); + ASSERT_TRUE(write_file(zero_len_path, zero_len_content).ok()); + auto zero_len_reader = create_reader(zero_len_path, &state, &profile); + ASSERT_TRUE(zero_len_reader->init(&state).ok()); + std::vector schema; + EXPECT_FALSE(zero_len_reader->get_schema(&schema).ok()); + static_cast(io::global_local_filesystem()->delete_file(zero_len_path)); + + const auto invalid_pblock_path = + "./log/native_v2_invalid_pblock_" + UniqueId::gen_uid().to_string() + ".native"; + auto invalid_pblock_content = build_header(); + encode_fixed64_le(len_buffer, 1); + invalid_pblock_content.append(reinterpret_cast(len_buffer), sizeof(len_buffer)); + invalid_pblock_content.append("x"); + ASSERT_TRUE(write_file(invalid_pblock_path, invalid_pblock_content).ok()); + auto invalid_pblock_reader = create_reader(invalid_pblock_path, &state, &profile); + ASSERT_TRUE(invalid_pblock_reader->init(&state).ok()); + schema.clear(); + EXPECT_FALSE(invalid_pblock_reader->get_schema(&schema).ok()); + static_cast(io::global_local_filesystem()->delete_file(invalid_pblock_path)); +} + /* Requesting local column id 42, which the two-column fixture does not have, must be accepted by the builder and by open() but make get_block() fail. NOTE(review): unlike the two successful-read tests above, this test never calls close() on the opened reader. */ +TEST(NativeV2ReaderTest, RejectsUnknownRequestedLocalColumn) { + const auto path = + "./log/native_v2_unknown_column_" + UniqueId::gen_uid().to_string() + ".native"; + std::filesystem::create_directories("./log"); + ASSERT_TRUE(write_native_file(path, 
make_source_block()).ok()); + + RuntimeState state; + RuntimeProfile profile("native_v2_reader_unknown_column_test"); + auto reader = create_reader(path, &state, &profile); + ASSERT_TRUE(reader->init(&state).ok()); + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + + auto request = std::make_shared(); + FileScanRequestBuilder builder(request.get()); + ASSERT_TRUE(builder.add_non_predicate_column(LocalColumnId(42)).ok()); + ASSERT_TRUE(reader->open(request).ok()); + Block block; + block.insert({schema[0].type->create_column(), schema[0].type, schema[0].name}); + size_t rows = 0; + bool eof = false; + EXPECT_FALSE(reader->get_block(&block, &rows, &eof).ok()); + static_cast(io::global_local_filesystem()->delete_file(path)); +} + +} // namespace doris::format::native