diff --git a/be/src/exec/scan/file_scanner_v2.cpp b/be/src/exec/scan/file_scanner_v2.cpp index 87bc1bf4be6133..92398fe0bf5d88 100644 --- a/be/src/exec/scan/file_scanner_v2.cpp +++ b/be/src/exec/scan/file_scanner_v2.cpp @@ -669,6 +669,20 @@ void FileScannerV2::_collect_profile_before_close() { _report_condition_cache_profile(); } +bool FileScannerV2::_should_update_load_counters() const { + if (_is_load) { + return true; + } + // TVF based loads (e.g. http_stream, group commit relay) plan the load source as a + // tvf query scan without src tuple desc, so _is_load is false. But rows filtered by + // the load's WHERE clause still need to be reported as unselected rows. FILE_STREAM + // is only reachable from such load entries, never from normal queries, so use it to + // identify these scanners. + return (_params != nullptr && _params->__isset.file_type && + _params->file_type == TFileType::FILE_STREAM) || + (_current_range.__isset.file_type && _current_range.file_type == TFileType::FILE_STREAM); +} + void FileScannerV2::_report_file_reader_predicate_filtered_rows() { const int64_t filtered_rows = _io_ctx != nullptr ? 
_io_ctx->predicate_filtered_rows : 0; const int64_t filtered_delta = filtered_rows - _reported_predicate_filtered_rows; diff --git a/be/src/exec/scan/file_scanner_v2.h b/be/src/exec/scan/file_scanner_v2.h index cfdd687837270c..7140842b12ff3d 100644 --- a/be/src/exec/scan/file_scanner_v2.h +++ b/be/src/exec/scan/file_scanner_v2.h @@ -81,6 +81,7 @@ class FileScannerV2 final : public Scanner { protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; void _collect_profile_before_close() override; + bool _should_update_load_counters() const override; private: TFileFormatType::type _get_current_format_type() const; diff --git a/be/src/format_v2/delimited_text/delimited_text_reader.cpp b/be/src/format_v2/delimited_text/delimited_text_reader.cpp index 2a7b0b5d67e097..ba4986ee531740 100644 --- a/be/src/format_v2/delimited_text/delimited_text_reader.cpp +++ b/be/src/format_v2/delimited_text/delimited_text_reader.cpp @@ -399,7 +399,8 @@ Status DelimitedTextReader::get_aggregate_result(const FileAggregateRequest& req } if (line.size == 0) { update_counter(_text_profile.empty_lines_read, 1); - if (_runtime_state != nullptr && _runtime_state->is_read_csv_empty_line_as_null()) { + if (_empty_line_as_record() || + (_runtime_state != nullptr && _runtime_state->is_read_csv_empty_line_as_null())) { ++count; } continue; @@ -595,14 +596,16 @@ Status DelimitedTextReader::_fill_columns_from_line(const Slice& line, DORIS_CHECK(columns != nullptr); if (line.size == 0) { update_counter(_text_profile.empty_lines_read, 1); - if (_runtime_state != nullptr && _runtime_state->is_read_csv_empty_line_as_null()) { - for (const auto& column : _requested_columns) { - RETURN_IF_ERROR(_append_null((*columns)[column.block_position.value()].get())); - update_counter(_text_profile.cells_deserialized, 1); + if (!_empty_line_as_record()) { + if (_runtime_state != nullptr && _runtime_state->is_read_csv_empty_line_as_null()) { + for (const auto& column : _requested_columns) { 
+ RETURN_IF_ERROR(_append_null((*columns)[column.block_position.value()].get())); + update_counter(_text_profile.cells_deserialized, 1); + } + ++(*rows); } - ++(*rows); + return Status::OK(); } - return Status::OK(); } RETURN_IF_ERROR(_validate_line(line)); @@ -635,6 +638,10 @@ Slice DelimitedTextReader::_normalize_value(Slice value) const { return value; } +bool DelimitedTextReader::_empty_line_as_record() const { + return false; +} + bool DelimitedTextReader::_can_split() const { return _file_compress_type == TFileCompressType::PLAIN; } diff --git a/be/src/format_v2/delimited_text/delimited_text_reader.h b/be/src/format_v2/delimited_text/delimited_text_reader.h index 1c6b0b0bef0fa3..06cb93dd7f7b65 100644 --- a/be/src/format_v2/delimited_text/delimited_text_reader.h +++ b/be/src/format_v2/delimited_text/delimited_text_reader.h @@ -117,6 +117,9 @@ class DelimitedTextReader : public FileReader { // text keeps the raw field because empty string and NULL are distinct unless null_format // matches exactly. virtual Slice _normalize_value(Slice value) const; + // Whether an empty physical line is one logical record. CSV keeps the existing default + // skip behavior, while Hive TEXTFILE treats an empty line as a record with one empty field. + virtual bool _empty_line_as_record() const; // Whether this file can start at a non-zero split offset. Compressed delimited files cannot be // split because the decompressor needs the stream from the beginning. 
virtual bool _can_split() const; diff --git a/be/src/format_v2/delimited_text/text_reader.cpp b/be/src/format_v2/delimited_text/text_reader.cpp index ed54020a1802b3..930052a14f1229 100644 --- a/be/src/format_v2/delimited_text/text_reader.cpp +++ b/be/src/format_v2/delimited_text/text_reader.cpp @@ -155,4 +155,10 @@ Status TextReader::_deserialize_one_cell(const RequestedColumn& column, IColumn* return column.serde->deserialize_one_cell_from_hive_text(*output, value, _options); } +bool TextReader::_empty_line_as_record() const { + // Hive TEXTFILE treats an empty physical line as a record. The splitter maps it + // to one empty field and missing trailing fields are filled with null_format. + return true; +} + } // namespace doris::format::text diff --git a/be/src/format_v2/delimited_text/text_reader.h b/be/src/format_v2/delimited_text/text_reader.h index a8e4e7b9f8626d..8efbfe359c7e64 100644 --- a/be/src/format_v2/delimited_text/text_reader.h +++ b/be/src/format_v2/delimited_text/text_reader.h @@ -56,6 +56,7 @@ class TextReader final : public ::doris::format::DelimitedTextReader { void _split_line_multi_char(const Slice& line); Status _deserialize_one_cell(const RequestedColumn& column, IColumn* output, Slice value) override; + bool _empty_line_as_record() const override; }; } // namespace doris::format::text diff --git a/be/test/format_v2/delimited_text/text_reader_test.cpp b/be/test/format_v2/delimited_text/text_reader_test.cpp index 2c68f501b95422..b6402cab5d86d6 100644 --- a/be/test/format_v2/delimited_text/text_reader_test.cpp +++ b/be/test/format_v2/delimited_text/text_reader_test.cpp @@ -782,32 +782,89 @@ TEST_F(TextV2ReaderTest, SkipLinesUsedWhenHeaderTypeUnset) { EXPECT_EQ(nullable_int_at(*block.get_by_position(0).column, 0), 3); } -// Scenario: empty physical lines are skipped by default, but read_csv_empty_line_as_null turns one -// empty text line into one all-null logical row through the shared delimited text base. 
-TEST_F(TextV2ReaderTest, EmptyLineAsNullWhenQueryOptionEnabled) { +// Scenario: Hive TEXTFILE treats an empty physical line as a record. For the first field it +// deserializes an empty value; missing trailing fields are filled with null_format. +TEST_F(TextV2ReaderTest, EmptyLineAsRecordByDefault) { const auto empty_line_path = (_test_dir / "empty_line.text").string(); std::ofstream output(empty_line_path, std::ios::binary); output << "\n"; output << "4,erin,40\n"; output.close(); - _state._query_options.__set_read_csv_empty_line_as_null(true); auto reader = create_reader(empty_line_path, &_params, _slots, &_state, &_profile); std::vector schema; ASSERT_TRUE(reader->get_schema(&schema).ok()); auto request = std::make_shared(); - request->non_predicate_columns = {LocalColumnIndex::top_level(LocalColumnId(0))}; + request->non_predicate_columns = {LocalColumnIndex::top_level(LocalColumnId(0)), + LocalColumnIndex::top_level(LocalColumnId(1)), + LocalColumnIndex::top_level(LocalColumnId(2))}; request->local_positions.emplace(LocalColumnId(0), LocalIndex(0)); + request->local_positions.emplace(LocalColumnId(1), LocalIndex(1)); + request->local_positions.emplace(LocalColumnId(2), LocalIndex(2)); ASSERT_TRUE(reader->open(request).ok()); - auto block = make_block(schema, {0}); + auto block = make_block(schema, {0, 1, 2}); size_t rows = 0; bool eof = false; ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok()); ASSERT_EQ(rows, 2); EXPECT_TRUE(is_null_at(*block.get_by_position(0).column, 0)); + EXPECT_TRUE(is_null_at(*block.get_by_position(1).column, 0)); + EXPECT_TRUE(is_null_at(*block.get_by_position(2).column, 0)); EXPECT_EQ(nullable_int_at(*block.get_by_position(0).column, 1), 4); + EXPECT_EQ(nullable_string_at(*block.get_by_position(1).column, 1), "erin"); + EXPECT_EQ(nullable_int_at(*block.get_by_position(2).column, 1), 40); +} + +// Scenario: for a single-column Hive TEXTFILE table, an empty physical line is one empty string +// field rather than a skipped row. 
+TEST_F(TextV2ReaderTest, EmptyLineAsSingleEmptyStringField) {
+    const auto empty_line_path = (_test_dir / "empty_line_single_string.text").string();
+    std::ofstream output(empty_line_path, std::ios::binary);
+    output << "\n"; // Row 0: an empty physical line.
+    output << "erin\n"; // Row 1: an ordinary value.
+    output.close();
+
+    _params.__set_column_idxs({0}); // Only column index 0 is configured for this test.
+    const std::vector slots {make_test_slot(
+            &_pool, 0, 0, make_nullable(std::make_shared()), "value")};
+    auto reader = create_reader(empty_line_path, &_params, slots, &_state, &_profile);
+    std::vector schema; // NOTE(review): element type (and make_shared args) look stripped - confirm.
+    ASSERT_TRUE(reader->get_schema(&schema).ok());
+
+    auto request = std::make_shared();
+    request->non_predicate_columns = {LocalColumnIndex::top_level(LocalColumnId(0))};
+    request->local_positions.emplace(LocalColumnId(0), LocalIndex(0));
+    ASSERT_TRUE(reader->open(request).ok());
+
+    auto block = make_block(schema, {0});
+    size_t rows = 0;
+    bool eof = false;
+    ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
+    ASSERT_EQ(rows, 2); // The empty line is returned as a row, not skipped.
+    EXPECT_FALSE(is_null_at(*block.get_by_position(0).column, 0)); // Empty string, not NULL.
+    EXPECT_EQ(nullable_string_at(*block.get_by_position(0).column, 0), "");
+    EXPECT_EQ(nullable_string_at(*block.get_by_position(0).column, 1), "erin");
+}
+
+// Scenario: text v2 COUNT pushdown counts empty physical lines as Hive TEXTFILE records.
+TEST_F(TextV2ReaderTest, CountAggregatePreservesEmptyLines) {
+    const auto empty_line_path = (_test_dir / "empty_line_count.text").string();
+    std::ofstream output(empty_line_path, std::ios::binary);
+    output << "\n"; // Line 1: an empty physical line.
+    output << "4,erin,40\n"; // Line 2: a regular three-field row.
+    output.close();
+
+    auto reader = create_reader(empty_line_path, &_params, _slots, &_state, &_profile);
+    auto request = std::make_shared(); // NOTE(review): template argument looks stripped - confirm.
+    ASSERT_TRUE(reader->open(request).ok()); // No columns are set on the request in this test.
+
+    FileAggregateRequest aggregate_request;
+    aggregate_request.agg_type = TPushAggOp::type::COUNT;
+    FileAggregateResult aggregate_result;
+    ASSERT_TRUE(reader->get_aggregate_result(aggregate_request, &aggregate_result).ok());
+    EXPECT_EQ(aggregate_result.count, 2); // The empty line and the data row are both counted.
 }
 
 // Scenario: Text v2 COUNT pushdown scans rows because text files do not expose row-count metadata.