Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions be/src/exec/scan/file_scanner_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,20 @@ void FileScannerV2::_collect_profile_before_close() {
_report_condition_cache_profile();
}

bool FileScannerV2::_should_update_load_counters() const {
if (_is_load) {
return true;
}
// TVF based loads (e.g. http_stream, group commit relay) plan the load source as a
// tvf query scan without src tuple desc, so _is_load is false. But rows filtered by
// the load's WHERE clause still need to be reported as unselected rows. FILE_STREAM
// is only reachable from such load entries, never from normal queries, so use it to
// identify these scanners.
return (_params != nullptr && _params->__isset.file_type &&
_params->file_type == TFileType::FILE_STREAM) ||
(_current_range.__isset.file_type && _current_range.file_type == TFileType::FILE_STREAM);
}

void FileScannerV2::_report_file_reader_predicate_filtered_rows() {
const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->predicate_filtered_rows : 0;
const int64_t filtered_delta = filtered_rows - _reported_predicate_filtered_rows;
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/scan/file_scanner_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class FileScannerV2 final : public Scanner {
protected:
Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
void _collect_profile_before_close() override;
bool _should_update_load_counters() const override;

private:
TFileFormatType::type _get_current_format_type() const;
Expand Down
21 changes: 14 additions & 7 deletions be/src/format_v2/delimited_text/delimited_text_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ Status DelimitedTextReader::get_aggregate_result(const FileAggregateRequest& req
}
if (line.size == 0) {
update_counter(_text_profile.empty_lines_read, 1);
if (_runtime_state != nullptr && _runtime_state->is_read_csv_empty_line_as_null()) {
if (_empty_line_as_record() ||
(_runtime_state != nullptr && _runtime_state->is_read_csv_empty_line_as_null())) {
++count;
}
continue;
Expand Down Expand Up @@ -595,14 +596,16 @@ Status DelimitedTextReader::_fill_columns_from_line(const Slice& line,
DORIS_CHECK(columns != nullptr);
if (line.size == 0) {
update_counter(_text_profile.empty_lines_read, 1);
if (_runtime_state != nullptr && _runtime_state->is_read_csv_empty_line_as_null()) {
for (const auto& column : _requested_columns) {
RETURN_IF_ERROR(_append_null((*columns)[column.block_position.value()].get()));
update_counter(_text_profile.cells_deserialized, 1);
if (!_empty_line_as_record()) {
if (_runtime_state != nullptr && _runtime_state->is_read_csv_empty_line_as_null()) {
for (const auto& column : _requested_columns) {
RETURN_IF_ERROR(_append_null((*columns)[column.block_position.value()].get()));
update_counter(_text_profile.cells_deserialized, 1);
}
++(*rows);
}
++(*rows);
return Status::OK();
}
return Status::OK();
}
RETURN_IF_ERROR(_validate_line(line));

Expand Down Expand Up @@ -635,6 +638,10 @@ Slice DelimitedTextReader::_normalize_value(Slice value) const {
return value;
}

bool DelimitedTextReader::_empty_line_as_record() const {
return false;
}

bool DelimitedTextReader::_can_split() const {
return _file_compress_type == TFileCompressType::PLAIN;
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/format_v2/delimited_text/delimited_text_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ class DelimitedTextReader : public FileReader {
// text keeps the raw field because empty string and NULL are distinct unless null_format
// matches exactly.
virtual Slice _normalize_value(Slice value) const;
// Whether an empty physical line is one logical record. CSV keeps the existing default
// skip behavior, while Hive TEXTFILE treats an empty line as a record with one empty field.
virtual bool _empty_line_as_record() const;
// Whether this file can start at a non-zero split offset. Compressed delimited files cannot be
// split because the decompressor needs the stream from the beginning.
virtual bool _can_split() const;
Expand Down
6 changes: 6 additions & 0 deletions be/src/format_v2/delimited_text/text_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,10 @@ Status TextReader::_deserialize_one_cell(const RequestedColumn& column, IColumn*
return column.serde->deserialize_one_cell_from_hive_text(*output, value, _options);
}

bool TextReader::_empty_line_as_record() const {
// Hive TEXTFILE treats an empty physical line as a record. The splitter maps it
// to one empty field and missing trailing fields are filled with null_format.
return true;
}

} // namespace doris::format::text
1 change: 1 addition & 0 deletions be/src/format_v2/delimited_text/text_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class TextReader final : public ::doris::format::DelimitedTextReader {
void _split_line_multi_char(const Slice& line);
Status _deserialize_one_cell(const RequestedColumn& column, IColumn* output,
Slice value) override;
bool _empty_line_as_record() const override;
};

} // namespace doris::format::text
69 changes: 63 additions & 6 deletions be/test/format_v2/delimited_text/text_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -782,32 +782,89 @@ TEST_F(TextV2ReaderTest, SkipLinesUsedWhenHeaderTypeUnset) {
EXPECT_EQ(nullable_int_at(*block.get_by_position(0).column, 0), 3);
}

// Scenario: empty physical lines are skipped by default, but read_csv_empty_line_as_null turns one
// empty text line into one all-null logical row through the shared delimited text base.
TEST_F(TextV2ReaderTest, EmptyLineAsNullWhenQueryOptionEnabled) {
// Scenario: Hive TEXTFILE treats an empty physical line as a record. For the first field it
// deserializes an empty value; missing trailing fields are filled with null_format.
TEST_F(TextV2ReaderTest, EmptyLineAsRecordByDefault) {
const auto empty_line_path = (_test_dir / "empty_line.text").string();
std::ofstream output(empty_line_path, std::ios::binary);
output << "\n";
output << "4,erin,40\n";
output.close();

_state._query_options.__set_read_csv_empty_line_as_null(true);
auto reader = create_reader(empty_line_path, &_params, _slots, &_state, &_profile);
std::vector<ColumnDefinition> schema;
ASSERT_TRUE(reader->get_schema(&schema).ok());

auto request = std::make_shared<FileScanRequest>();
request->non_predicate_columns = {LocalColumnIndex::top_level(LocalColumnId(0))};
request->non_predicate_columns = {LocalColumnIndex::top_level(LocalColumnId(0)),
LocalColumnIndex::top_level(LocalColumnId(1)),
LocalColumnIndex::top_level(LocalColumnId(2))};
request->local_positions.emplace(LocalColumnId(0), LocalIndex(0));
request->local_positions.emplace(LocalColumnId(1), LocalIndex(1));
request->local_positions.emplace(LocalColumnId(2), LocalIndex(2));
ASSERT_TRUE(reader->open(request).ok());

auto block = make_block(schema, {0});
auto block = make_block(schema, {0, 1, 2});
size_t rows = 0;
bool eof = false;
ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
ASSERT_EQ(rows, 2);
EXPECT_TRUE(is_null_at(*block.get_by_position(0).column, 0));
EXPECT_TRUE(is_null_at(*block.get_by_position(1).column, 0));
EXPECT_TRUE(is_null_at(*block.get_by_position(2).column, 0));
EXPECT_EQ(nullable_int_at(*block.get_by_position(0).column, 1), 4);
EXPECT_EQ(nullable_string_at(*block.get_by_position(1).column, 1), "erin");
EXPECT_EQ(nullable_int_at(*block.get_by_position(2).column, 1), 40);
}

// Scenario: for a single-column Hive TEXTFILE table, an empty physical line is one empty string
// field rather than a skipped row.
TEST_F(TextV2ReaderTest, EmptyLineAsSingleEmptyStringField) {
const auto empty_line_path = (_test_dir / "empty_line_single_string.text").string();
std::ofstream output(empty_line_path, std::ios::binary);
output << "\n";
output << "erin\n";
output.close();

_params.__set_column_idxs({0});
const std::vector<SlotDescriptor*> slots {make_test_slot(
&_pool, 0, 0, make_nullable(std::make_shared<DataTypeString>()), "value")};
auto reader = create_reader(empty_line_path, &_params, slots, &_state, &_profile);
std::vector<ColumnDefinition> schema;
ASSERT_TRUE(reader->get_schema(&schema).ok());

auto request = std::make_shared<FileScanRequest>();
request->non_predicate_columns = {LocalColumnIndex::top_level(LocalColumnId(0))};
request->local_positions.emplace(LocalColumnId(0), LocalIndex(0));
ASSERT_TRUE(reader->open(request).ok());

auto block = make_block(schema, {0});
size_t rows = 0;
bool eof = false;
ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
ASSERT_EQ(rows, 2);
EXPECT_FALSE(is_null_at(*block.get_by_position(0).column, 0));
EXPECT_EQ(nullable_string_at(*block.get_by_position(0).column, 0), "");
EXPECT_EQ(nullable_string_at(*block.get_by_position(0).column, 1), "erin");
}

// Scenario: text v2 COUNT pushdown counts empty physical lines as Hive TEXTFILE records.
TEST_F(TextV2ReaderTest, CountAggregatePreservesEmptyLines) {
const auto empty_line_path = (_test_dir / "empty_line_count.text").string();
std::ofstream output(empty_line_path, std::ios::binary);
output << "\n";
output << "4,erin,40\n";
output.close();

auto reader = create_reader(empty_line_path, &_params, _slots, &_state, &_profile);
auto request = std::make_shared<FileScanRequest>();
ASSERT_TRUE(reader->open(request).ok());

FileAggregateRequest aggregate_request;
aggregate_request.agg_type = TPushAggOp::type::COUNT;
FileAggregateResult aggregate_result;
ASSERT_TRUE(reader->get_aggregate_result(aggregate_request, &aggregate_result).ok());
EXPECT_EQ(aggregate_result.count, 2);
}

// Scenario: Text v2 COUNT pushdown scans rows because text files do not expose row-count metadata.
Expand Down
Loading