Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion be/src/exec/scan/file_scanner_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ bool is_json_format(TFileFormatType::type format_type) {
return format_type == TFileFormatType::FORMAT_JSON;
}

bool is_native_format(TFileFormatType::type format_type) {
return format_type == TFileFormatType::FORMAT_NATIVE;
}

bool is_partition_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) {
if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) ||
column_name == BeConsts::ICEBERG_ROWID_COL) {
Expand Down Expand Up @@ -221,7 +225,7 @@ bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFile
} else if (format_type == TFileFormatType::FORMAT_JNI) {
return is_supported_jni_table_format(range);
} else if (is_csv_format(format_type) || is_text_format(format_type) ||
is_json_format(format_type)) {
is_json_format(format_type) || is_native_format(format_type)) {
return is_supported_table_format(range);
} else {
LOG(WARNING) << "Unsupported file format type " << format_type << " for file scanner v2";
Expand Down Expand Up @@ -615,6 +619,9 @@ Status FileScannerV2::_to_file_format(TFileFormatType::type format_type,
case TFileFormatType::FORMAT_JSON:
*file_format = format::FileFormat::JSON;
return Status::OK();
case TFileFormatType::FORMAT_NATIVE:
*file_format = format::FileFormat::NATIVE;
return Status::OK();
default:
return Status::NotSupported("FileScannerV2 does not support file format {}",
to_string(format_type));
Expand Down
40 changes: 9 additions & 31 deletions be/src/format_v2/delimited_text/delimited_text_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_string.h"
#include "core/data_type/data_type_struct.h"
#include "exprs/vexpr_context.h"
#include "format/line_reader.h"
#include "format_v2/column_mapper.h"
#include "format_v2/materialized_reader_util.h"
#include "io/file_factory.h"
#include "io/fs/tracing_file_reader.h"
#include "runtime/descriptors.h"
Expand Down Expand Up @@ -341,36 +341,14 @@ Status DelimitedTextReader::get_block(Block* file_block, size_t* rows, bool* eof
const size_t rows_before_filter = *rows;
update_counter(_text_profile.rows_read_before_filter, rows_before_filter);

size_t rows_after_delete_filter = rows_before_filter;
if (_request != nullptr && rows_before_filter > 0 && !_request->delete_conjuncts.empty()) {
{
SCOPED_TIMER(_text_profile.delete_conjunct_filter_time);
RETURN_IF_ERROR(VExprContext::filter_block(_request->delete_conjuncts, file_block,
file_block->columns()));
}
rows_after_delete_filter =
file_block->columns() == 0 ? rows_before_filter : file_block->rows();
update_counter(_text_profile.rows_filtered_by_delete_conjunct,
rows_before_filter - rows_after_delete_filter);
}

size_t rows_after_filter = rows_after_delete_filter;
if (_request != nullptr && rows_after_delete_filter > 0 && !_request->conjuncts.empty()) {
{
SCOPED_TIMER(_text_profile.conjunct_filter_time);
RETURN_IF_ERROR(VExprContext::filter_block(_request->conjuncts, file_block,
file_block->columns()));
}
rows_after_filter =
file_block->columns() == 0 ? rows_after_delete_filter : file_block->rows();
const auto rows_filtered_by_conjunct = rows_after_delete_filter - rows_after_filter;
update_counter(_text_profile.rows_filtered_by_conjunct, rows_filtered_by_conjunct);
if (_io_ctx != nullptr) {
_io_ctx->predicate_filtered_rows += rows_filtered_by_conjunct;
}
}

*rows = rows_after_filter;
MaterializedReaderFilterProfile filter_profile;
filter_profile.delete_conjunct_filter_time = _text_profile.delete_conjunct_filter_time;
filter_profile.conjunct_filter_time = _text_profile.conjunct_filter_time;
filter_profile.rows_filtered_by_delete_conjunct =
_text_profile.rows_filtered_by_delete_conjunct;
filter_profile.rows_filtered_by_conjunct = _text_profile.rows_filtered_by_conjunct;
RETURN_IF_ERROR(apply_materialized_reader_filters(_request.get(), _io_ctx.get(), file_block,
rows, &filter_profile));
update_counter(_text_profile.rows_returned, *rows);
_reader_statistics.read_rows += *rows;
*eof = _line_reader_eof && *rows == 0;
Expand Down
1 change: 1 addition & 0 deletions be/src/format_v2/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ enum class FileFormat {
JSON,
TEXT,
JNI,
NATIVE,
};

// 通用文件层 scan 请求。
Expand Down
3 changes: 1 addition & 2 deletions be/src/format_v2/jni/max_compute_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ std::string MaxComputeJniReader::connector_class() const {
return "org/apache/doris/maxcompute/MaxComputeJniScanner";
}

Status MaxComputeJniReader::build_scanner_params(
std::map<std::string, std::string>* params) const {
Status MaxComputeJniReader::build_scanner_params(std::map<std::string, std::string>* params) const {
DORIS_CHECK(params != nullptr);
DORIS_CHECK(_table_desc != nullptr);
params->clear();
Expand Down
18 changes: 7 additions & 11 deletions be/src/format_v2/jni/trino_connector_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ constexpr std::string_view TRINO_CONNECTOR_NAME = "connector.name";

Status TrinoConnectorJniReader::validate_scan_range(const TFileRangeDesc& range) const {
if (!range.__isset.table_format_params) {
return Status::InternalError(
"missing table_format_params for trino connector jni reader");
return Status::InternalError("missing table_format_params for trino connector jni reader");
}
if (!range.table_format_params.__isset.trino_connector_params) {
return Status::InternalError(
Expand All @@ -52,8 +51,7 @@ Status TrinoConnectorJniReader::validate_scan_range(const TFileRangeDesc& range)
"missing trino connector.name option for trino connector jni reader, possibly "
"caused by FE/BE protocol mismatch");
}
if (!trino_params.__isset.trino_connector_split ||
trino_params.trino_connector_split.empty()) {
if (!trino_params.__isset.trino_connector_split || trino_params.trino_connector_split.empty()) {
return Status::InternalError(
"missing trino_connector_split for trino connector jni reader, possibly caused "
"by FE/BE protocol mismatch");
Expand Down Expand Up @@ -109,8 +107,7 @@ Status TrinoConnectorJniReader::build_scanner_params(
(*params)["trino_connector_column_handles"] = trino_params.trino_connector_column_handles;
(*params)["trino_connector_column_metadata"] = trino_params.trino_connector_column_metadata;
(*params)["trino_connector_predicate"] = trino_params.trino_connector_predicate;
(*params)["trino_connector_trascation_handle"] =
trino_params.trino_connector_trascation_handle;
(*params)["trino_connector_trascation_handle"] = trino_params.trino_connector_trascation_handle;

for (const auto& kv : trino_params.trino_connector_options) {
(*params)[std::string(TRINO_CONNECTOR_OPTION_PREFIX) + kv.first] = kv.second;
Expand All @@ -125,13 +122,12 @@ Status TrinoConnectorJniReader::_set_spi_plugins_dir() const {
Jni::LocalClass plugin_loader_cls;
const std::string plugin_loader_class =
"org/apache/doris/trinoconnector/TrinoConnectorPluginLoader";
RETURN_IF_ERROR(Jni::Util::get_jni_scanner_class(env, plugin_loader_class.c_str(),
&plugin_loader_cls));
RETURN_IF_ERROR(
Jni::Util::get_jni_scanner_class(env, plugin_loader_class.c_str(), &plugin_loader_cls));

Jni::MethodId set_plugins_dir_method;
RETURN_IF_ERROR(plugin_loader_cls.get_static_method(env, "setPluginsDir",
"(Ljava/lang/String;)V",
&set_plugins_dir_method));
RETURN_IF_ERROR(plugin_loader_cls.get_static_method(
env, "setPluginsDir", "(Ljava/lang/String;)V", &set_plugins_dir_method));

Jni::LocalString trino_connector_plugin_path;
RETURN_IF_ERROR(Jni::LocalString::new_string(
Expand Down
26 changes: 2 additions & 24 deletions be/src/format_v2/json/json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_string.h"
#include "core/data_type/data_type_struct.h"
#include "exprs/vexpr_context.h"
#include "format/file_reader/new_plain_text_line_reader.h"
#include "format_v2/column_mapper.h"
#include "format_v2/materialized_reader_util.h"
#include "io/file_factory.h"
#include "io/fs/file_reader.h"
#include "io/fs/stream_load_pipe.h"
Expand Down Expand Up @@ -1063,29 +1063,7 @@ Status JsonReader::_handle_json_error(const Status& status, Block* block, size_t
}

Status JsonReader::_apply_filters(Block* file_block, size_t* rows) {
DORIS_CHECK(file_block != nullptr);
DORIS_CHECK(rows != nullptr);
const size_t rows_before_filter = *rows;
size_t rows_after_delete_filter = rows_before_filter;
if (_request != nullptr && rows_before_filter > 0 && !_request->delete_conjuncts.empty()) {
RETURN_IF_ERROR(VExprContext::filter_block(_request->delete_conjuncts, file_block,
file_block->columns()));
rows_after_delete_filter =
file_block->columns() == 0 ? rows_before_filter : file_block->rows();
}

size_t rows_after_filter = rows_after_delete_filter;
if (_request != nullptr && rows_after_delete_filter > 0 && !_request->conjuncts.empty()) {
RETURN_IF_ERROR(
VExprContext::filter_block(_request->conjuncts, file_block, file_block->columns()));
rows_after_filter =
file_block->columns() == 0 ? rows_after_delete_filter : file_block->rows();
if (_io_ctx != nullptr) {
_io_ctx->predicate_filtered_rows += rows_after_delete_filter - rows_after_filter;
}
}
*rows = rows_after_filter;
return Status::OK();
return apply_materialized_reader_filters(_request.get(), _io_ctx.get(), file_block, rows);
}

void JsonReader::_truncate_block_to_rows(Block* block, size_t num_rows) {
Expand Down
89 changes: 89 additions & 0 deletions be/src/format_v2/materialized_reader_util.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "format_v2/materialized_reader_util.h"

#include <utility>

#include "core/block/block.h"
#include "core/data_type/data_type_nullable.h"
#include "exprs/vexpr_context.h"
#include "format_v2/file_reader.h"
#include "io/io_common.h"

namespace doris::format {
namespace {

void update_counter(RuntimeProfile::Counter* counter, int64_t value) {
if (counter != nullptr) {
COUNTER_UPDATE(counter, value);
}
}

} // namespace

ColumnPtr make_column_nullable_if_needed(ColumnPtr column, const DataTypePtr& target_type) {
if (target_type != nullptr && target_type->is_nullable() && column.get() != nullptr &&
!column->is_nullable()) {
return make_nullable(std::move(column));
}
return column;
}

Status apply_materialized_reader_filters(const FileScanRequest* request, io::IOContext* io_ctx,
Block* file_block, size_t* rows,
const MaterializedReaderFilterProfile* profile) {
DORIS_CHECK(file_block != nullptr);
DORIS_CHECK(rows != nullptr);
const size_t rows_before_filter = *rows;
size_t rows_after_delete_filter = rows_before_filter;
if (request != nullptr && rows_before_filter > 0 && !request->delete_conjuncts.empty()) {
{
SCOPED_TIMER(profile == nullptr ? nullptr : profile->delete_conjunct_filter_time);
RETURN_IF_ERROR(VExprContext::filter_block(request->delete_conjuncts, file_block,
file_block->columns()));
}
rows_after_delete_filter =
file_block->columns() == 0 ? rows_before_filter : file_block->rows();
if (profile != nullptr) {
update_counter(profile->rows_filtered_by_delete_conjunct,
rows_before_filter - rows_after_delete_filter);
}
}

size_t rows_after_filter = rows_after_delete_filter;
if (request != nullptr && rows_after_delete_filter > 0 && !request->conjuncts.empty()) {
{
SCOPED_TIMER(profile == nullptr ? nullptr : profile->conjunct_filter_time);
RETURN_IF_ERROR(VExprContext::filter_block(request->conjuncts, file_block,
file_block->columns()));
}
rows_after_filter =
file_block->columns() == 0 ? rows_after_delete_filter : file_block->rows();
const auto rows_filtered_by_conjunct = rows_after_delete_filter - rows_after_filter;
if (profile != nullptr) {
update_counter(profile->rows_filtered_by_conjunct, rows_filtered_by_conjunct);
}
if (io_ctx != nullptr) {
io_ctx->predicate_filtered_rows += rows_filtered_by_conjunct;
}
}
*rows = rows_after_filter;
return Status::OK();
}

} // namespace doris::format
63 changes: 63 additions & 0 deletions be/src/format_v2/materialized_reader_util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>

#include "common/status.h"
#include "core/column/column.h"
#include "core/data_type/data_type.h"
#include "runtime/runtime_profile.h"

namespace doris {
class Block;

namespace io {
struct IOContext;
} // namespace io

namespace format {
struct FileScanRequest;

// Shared helpers for FileReader implementations that deserialize or build already materialized
// Doris columns and then hand those columns to TableReader for final mapping.
ColumnPtr make_column_nullable_if_needed(ColumnPtr column, const DataTypePtr& target_type);

// Optional profile counters for text-like readers. Native/JSON do not expose per-reader filter
// counters today, so they call apply_materialized_reader_filters() without this struct.
struct MaterializedReaderFilterProfile {
RuntimeProfile::Counter* delete_conjunct_filter_time = nullptr;
RuntimeProfile::Counter* conjunct_filter_time = nullptr;
RuntimeProfile::Counter* rows_filtered_by_delete_conjunct = nullptr;
RuntimeProfile::Counter* rows_filtered_by_conjunct = nullptr;
};

// Applies file-local filters in the same order used by FileScannerV2 readers:
// 1. delete_conjuncts remove rows that should not be visible to the scan output;
// 2. conjuncts apply ordinary file-local predicates.
//
// Only ordinary conjunct filtering contributes to IOContext::predicate_filtered_rows. This matches
// the previous JSON/Text/CSV behavior and keeps scanner accounting separate from delete filtering.
// When `profile` is provided, the helper also updates text-reader timer and row counters so CSV
// and Hive text keep their existing observability after sharing this implementation.
Status apply_materialized_reader_filters(const FileScanRequest* request, io::IOContext* io_ctx,
Block* file_block, size_t* rows,
const MaterializedReaderFilterProfile* profile = nullptr);

} // namespace format
} // namespace doris
Loading
Loading