From 921f48870b0a98f492da9d3005a9099edc08e2a6 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 26 Jun 2026 10:03:11 +0800 Subject: [PATCH] [fix](be) Apply scanner v2 load counter fixes ### What problem does this PR solve? Issue Number: None Related PR: #63781, #64671 Problem Summary: File scanner v2 did not carry the same fixes as the existing file scanner path. Predicate rows filtered inside v2 file readers were still reported through scanner load counters unless the scanner was a real load source, and Hive TEXTFILE empty physical lines were still skipped unless read_csv_empty_line_as_null was enabled. This change gates v2 load counter reporting with the same FILE_STREAM exception used by FileScanner and adds a delimited text hook so Hive Text v2 treats empty physical lines as records while CSV keeps the old default behavior. ### Release note Fix file scanner v2 load counter reporting and Hive TEXTFILE empty-line handling. ### Check List (For Author) - Test: Unit Test / Manual test - Added TextV2ReaderTest coverage for Hive TEXTFILE empty line records, single-column empty string fields, and COUNT pushdown. - Ran git diff --check. - Ran clang-format v16 through build-support/run_clang_format.py for changed files. - Attempted ./run-be-ut.sh --run --filter='TextV2ReaderTest.*:FileScannerV2Test.*', but the local run was blocked because the script needed to update/download datasketches-cpp and network access was unavailable; no BE UT binary was already built. - Attempted clang-tidy with the available compile_commands.json, but it pointed at a stale /mnt/disk3/gabriel path; the project clang-tidy wrapper also requires bash 4+ while only system bash is available. - Behavior changed: Yes. File scanner v2 now matches v1 load counter gating and Hive TEXTFILE empty-line semantics. - Does this need documentation: No --- be/src/exec/scan/file_scanner_v2.cpp | 14 ++++ be/src/exec/scan/file_scanner_v2.h | 1 + .../delimited_text/delimited_text_reader.cpp | 21 ++++-- .../delimited_text/delimited_text_reader.h | 3 + .../format_v2/delimited_text/text_reader.cpp | 6 ++ be/src/format_v2/delimited_text/text_reader.h | 1 + .../delimited_text/text_reader_test.cpp | 69 +++++++++++++++++-- 7 files changed, 102 insertions(+), 13 deletions(-) diff --git a/be/src/exec/scan/file_scanner_v2.cpp b/be/src/exec/scan/file_scanner_v2.cpp index 87bc1bf4be6133..92398fe0bf5d88 100644 --- a/be/src/exec/scan/file_scanner_v2.cpp +++ b/be/src/exec/scan/file_scanner_v2.cpp @@ -669,6 +669,20 @@ void FileScannerV2::_collect_profile_before_close() { _report_condition_cache_profile(); } +bool FileScannerV2::_should_update_load_counters() const { + if (_is_load) { + return true; + } + // TVF based loads (e.g. http_stream, group commit relay) plan the load source as a + // tvf query scan without src tuple desc, so _is_load is false. But rows filtered by + // the load's WHERE clause still need to be reported as unselected rows. FILE_STREAM + // is only reachable from such load entries, never from normal queries, so use it to + // identify these scanners. + return (_params != nullptr && _params->__isset.file_type && + _params->file_type == TFileType::FILE_STREAM) || + (_current_range.__isset.file_type && _current_range.file_type == TFileType::FILE_STREAM); +} + void FileScannerV2::_report_file_reader_predicate_filtered_rows() { const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->predicate_filtered_rows : 0; const int64_t filtered_delta = filtered_rows - _reported_predicate_filtered_rows; diff --git a/be/src/exec/scan/file_scanner_v2.h b/be/src/exec/scan/file_scanner_v2.h index cfdd687837270c..7140842b12ff3d 100644 --- a/be/src/exec/scan/file_scanner_v2.h +++ b/be/src/exec/scan/file_scanner_v2.h @@ -81,6 +81,7 @@ class FileScannerV2 final : public Scanner { protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; void _collect_profile_before_close() override; + bool _should_update_load_counters() const override; private: TFileFormatType::type _get_current_format_type() const; diff --git a/be/src/format_v2/delimited_text/delimited_text_reader.cpp b/be/src/format_v2/delimited_text/delimited_text_reader.cpp index 2a7b0b5d67e097..ba4986ee531740 100644 --- a/be/src/format_v2/delimited_text/delimited_text_reader.cpp +++ b/be/src/format_v2/delimited_text/delimited_text_reader.cpp @@ -399,7 +399,8 @@ Status DelimitedTextReader::get_aggregate_result(const FileAggregateRequest& req } if (line.size == 0) { update_counter(_text_profile.empty_lines_read, 1); - if (_runtime_state != nullptr && _runtime_state->is_read_csv_empty_line_as_null()) { + if (_empty_line_as_record() || + (_runtime_state != nullptr && _runtime_state->is_read_csv_empty_line_as_null())) { ++count; } continue; @@ -595,14 +596,16 @@ Status DelimitedTextReader::_fill_columns_from_line(const Slice& line, DORIS_CHECK(columns != nullptr); if (line.size == 0) { update_counter(_text_profile.empty_lines_read, 1); - if (_runtime_state != nullptr && _runtime_state->is_read_csv_empty_line_as_null()) { - for (const auto& column : _requested_columns) { - RETURN_IF_ERROR(_append_null((*columns)[column.block_position.value()].get())); - update_counter(_text_profile.cells_deserialized, 1); + if (!_empty_line_as_record()) { + if (_runtime_state != nullptr && _runtime_state->is_read_csv_empty_line_as_null()) { + for (const auto& column : _requested_columns) { + RETURN_IF_ERROR(_append_null((*columns)[column.block_position.value()].get())); + update_counter(_text_profile.cells_deserialized, 1); + } + ++(*rows); } - ++(*rows); + return Status::OK(); } - return Status::OK(); } RETURN_IF_ERROR(_validate_line(line)); @@ -635,6 +638,10 @@ Slice DelimitedTextReader::_normalize_value(Slice value) const { return value; } +bool DelimitedTextReader::_empty_line_as_record() const { + return false; +} + bool DelimitedTextReader::_can_split() const { return _file_compress_type == TFileCompressType::PLAIN; } diff --git a/be/src/format_v2/delimited_text/delimited_text_reader.h b/be/src/format_v2/delimited_text/delimited_text_reader.h index 1c6b0b0bef0fa3..06cb93dd7f7b65 100644 --- a/be/src/format_v2/delimited_text/delimited_text_reader.h +++ b/be/src/format_v2/delimited_text/delimited_text_reader.h @@ -117,6 +117,9 @@ class DelimitedTextReader : public FileReader { // text keeps the raw field because empty string and NULL are distinct unless null_format // matches exactly. virtual Slice _normalize_value(Slice value) const; + // Whether an empty physical line is one logical record. CSV keeps the existing default + // skip behavior, while Hive TEXTFILE treats an empty line as a record with one empty field. + virtual bool _empty_line_as_record() const; // Whether this file can start at a non-zero split offset. Compressed delimited files cannot be // split because the decompressor needs the stream from the beginning. virtual bool _can_split() const; diff --git a/be/src/format_v2/delimited_text/text_reader.cpp b/be/src/format_v2/delimited_text/text_reader.cpp index ed54020a1802b3..930052a14f1229 100644 --- a/be/src/format_v2/delimited_text/text_reader.cpp +++ b/be/src/format_v2/delimited_text/text_reader.cpp @@ -155,4 +155,10 @@ Status TextReader::_deserialize_one_cell(const RequestedColumn& column, IColumn* return column.serde->deserialize_one_cell_from_hive_text(*output, value, _options); } +bool TextReader::_empty_line_as_record() const { + // Hive TEXTFILE treats an empty physical line as a record. The splitter maps it + // to one empty field and missing trailing fields are filled with null_format. + return true; +} + } // namespace doris::format::text diff --git a/be/src/format_v2/delimited_text/text_reader.h b/be/src/format_v2/delimited_text/text_reader.h index a8e4e7b9f8626d..8efbfe359c7e64 100644 --- a/be/src/format_v2/delimited_text/text_reader.h +++ b/be/src/format_v2/delimited_text/text_reader.h @@ -56,6 +56,7 @@ class TextReader final : public ::doris::format::DelimitedTextReader { void _split_line_multi_char(const Slice& line); Status _deserialize_one_cell(const RequestedColumn& column, IColumn* output, Slice value) override; + bool _empty_line_as_record() const override; }; } // namespace doris::format::text diff --git a/be/test/format_v2/delimited_text/text_reader_test.cpp b/be/test/format_v2/delimited_text/text_reader_test.cpp index 2c68f501b95422..b6402cab5d86d6 100644 --- a/be/test/format_v2/delimited_text/text_reader_test.cpp +++ b/be/test/format_v2/delimited_text/text_reader_test.cpp @@ -782,32 +782,89 @@ TEST_F(TextV2ReaderTest, SkipLinesUsedWhenHeaderTypeUnset) { EXPECT_EQ(nullable_int_at(*block.get_by_position(0).column, 0), 3); } -// Scenario: empty physical lines are skipped by default, but read_csv_empty_line_as_null turns one -// empty text line into one all-null logical row through the shared delimited text base. -TEST_F(TextV2ReaderTest, EmptyLineAsNullWhenQueryOptionEnabled) { +// Scenario: Hive TEXTFILE treats an empty physical line as a record. For the first field it +// deserializes an empty value; missing trailing fields are filled with null_format. +TEST_F(TextV2ReaderTest, EmptyLineAsRecordByDefault) { const auto empty_line_path = (_test_dir / "empty_line.text").string(); std::ofstream output(empty_line_path, std::ios::binary); output << "\n"; output << "4,erin,40\n"; output.close(); - _state._query_options.__set_read_csv_empty_line_as_null(true); auto reader = create_reader(empty_line_path, &_params, _slots, &_state, &_profile); std::vector schema; ASSERT_TRUE(reader->get_schema(&schema).ok()); auto request = std::make_shared(); - request->non_predicate_columns = {LocalColumnIndex::top_level(LocalColumnId(0))}; + request->non_predicate_columns = {LocalColumnIndex::top_level(LocalColumnId(0)), + LocalColumnIndex::top_level(LocalColumnId(1)), + LocalColumnIndex::top_level(LocalColumnId(2))}; request->local_positions.emplace(LocalColumnId(0), LocalIndex(0)); + request->local_positions.emplace(LocalColumnId(1), LocalIndex(1)); + request->local_positions.emplace(LocalColumnId(2), LocalIndex(2)); ASSERT_TRUE(reader->open(request).ok()); - auto block = make_block(schema, {0}); + auto block = make_block(schema, {0, 1, 2}); size_t rows = 0; bool eof = false; ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); ASSERT_EQ(rows, 2); EXPECT_TRUE(is_null_at(*block.get_by_position(0).column, 0)); + EXPECT_TRUE(is_null_at(*block.get_by_position(1).column, 0)); + EXPECT_TRUE(is_null_at(*block.get_by_position(2).column, 0)); EXPECT_EQ(nullable_int_at(*block.get_by_position(0).column, 1), 4); + EXPECT_EQ(nullable_string_at(*block.get_by_position(1).column, 1), "erin"); + EXPECT_EQ(nullable_int_at(*block.get_by_position(2).column, 1), 40); +} + +// Scenario: for a single-column Hive TEXTFILE table, an empty physical line is one empty string +// field rather than a skipped row. +TEST_F(TextV2ReaderTest, EmptyLineAsSingleEmptyStringField) { + const auto empty_line_path = (_test_dir / "empty_line_single_string.text").string(); + std::ofstream output(empty_line_path, std::ios::binary); + output << "\n"; + output << "erin\n"; + output.close(); + + _params.__set_column_idxs({0}); + const std::vector slots {make_test_slot( + &_pool, 0, 0, make_nullable(std::make_shared()), "value")}; + auto reader = create_reader(empty_line_path, &_params, slots, &_state, &_profile); + std::vector schema; + ASSERT_TRUE(reader->get_schema(&schema).ok()); + + auto request = std::make_shared(); + request->non_predicate_columns = {LocalColumnIndex::top_level(LocalColumnId(0))}; + request->local_positions.emplace(LocalColumnId(0), LocalIndex(0)); + ASSERT_TRUE(reader->open(request).ok()); + + auto block = make_block(schema, {0}); + size_t rows = 0; + bool eof = false; + ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); + ASSERT_EQ(rows, 2); + EXPECT_FALSE(is_null_at(*block.get_by_position(0).column, 0)); + EXPECT_EQ(nullable_string_at(*block.get_by_position(0).column, 0), ""); + EXPECT_EQ(nullable_string_at(*block.get_by_position(0).column, 1), "erin"); +} + +// Scenario: text v2 COUNT pushdown counts empty physical lines as Hive TEXTFILE records. +TEST_F(TextV2ReaderTest, CountAggregatePreservesEmptyLines) { + const auto empty_line_path = (_test_dir / "empty_line_count.text").string(); + std::ofstream output(empty_line_path, std::ios::binary); + output << "\n"; + output << "4,erin,40\n"; + output.close(); + + auto reader = create_reader(empty_line_path, &_params, _slots, &_state, &_profile); + auto request = std::make_shared(); + ASSERT_TRUE(reader->open(request).ok()); + + FileAggregateRequest aggregate_request; + aggregate_request.agg_type = TPushAggOp::type::COUNT; + FileAggregateResult aggregate_result; + ASSERT_TRUE(reader->get_aggregate_result(aggregate_request, &aggregate_result).ok()); + EXPECT_EQ(aggregate_result.count, 2); } // Scenario: Text v2 COUNT pushdown scans rows because text files do not expose row-count metadata.