From 3cd6600c1c1e31c6a53590eaee61b4ffaaf3b546 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 25 Jun 2026 11:25:50 +0800 Subject: [PATCH 1/7] [feature](be) Support json reader in file scanner v2 ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: File scanner v2 did not have a native JSON file reader. This change adds JSON as a supported v2 file format, wires it through table reader creation and hive reader schema mapping, and implements JSON parsing/materialization directly in the v2 reader without delegating to the legacy NewJsonReader path. Unit tests cover line JSON, outer array documents, json_root, jsonpaths, requested column ordering, nullable missing fields, required missing fields, strict malformed JSON errors, and ignore-malformed null rows. ### Release note Support JSON reader in file scanner v2. ### Check List (For Author) - Test: Unit Test / Manual test - Added JsonReaderTest coverage for different JSON input scenarios. - Ran git diff --check. - Ran build-support/check-format.sh. - Attempted ./run-be-ut.sh --run --filter='JsonReaderTest.*', but sandbox execution failed because nproc is unavailable, .git/modules submodule config writes are denied, and GitHub dependency download DNS is blocked. Retried with escalated permissions twice, but approval review timed out before execution. - Behavior changed: Yes. File scanner v2 can create a native JSON reader for FORMAT_JSON. - Does this need documentation: No --- be/src/exec/scan/file_scanner_v2.cpp | 10 +- be/src/format_v2/file_reader.h | 1 + be/src/format_v2/json/json_reader.cpp | 1109 ++++++++++++++++++ be/src/format_v2/json/json_reader.h | 161 +++ be/src/format_v2/table/hive_reader.cpp | 11 +- be/src/format_v2/table_reader.cpp | 14 + be/src/format_v2/table_reader.h | 1 + be/test/exec/scan/file_scanner_v2_test.cpp | 3 + be/test/format_v2/json/json_reader_test.cpp | 360 ++++++ be/test/format_v2/table/hive_reader_test.cpp | 11 + 10 files changed, 1675 insertions(+), 6 deletions(-) create mode 100644 be/src/format_v2/json/json_reader.cpp create mode 100644 be/src/format_v2/json/json_reader.h create mode 100644 be/test/format_v2/json/json_reader_test.cpp diff --git a/be/src/exec/scan/file_scanner_v2.cpp b/be/src/exec/scan/file_scanner_v2.cpp index 83b2c84e37c0f1..7b25f4e1c6929b 100644 --- a/be/src/exec/scan/file_scanner_v2.cpp +++ b/be/src/exec/scan/file_scanner_v2.cpp @@ -120,6 +120,10 @@ bool is_text_format(TFileFormatType::type format_type) { return format_type == TFileFormatType::FORMAT_TEXT; } +bool is_json_format(TFileFormatType::type format_type) { + return format_type == TFileFormatType::FORMAT_JSON; +} + bool is_partition_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) { if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) || column_name == BeConsts::ICEBERG_ROWID_COL) { @@ -213,7 +217,8 @@ bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFile return is_supported_table_format(range); } else if (format_type == TFileFormatType::FORMAT_JNI) { return is_supported_jni_table_format(range); - } else if (is_csv_format(format_type) || is_text_format(format_type)) { + } else if (is_csv_format(format_type) || is_text_format(format_type) || + is_json_format(format_type)) { return is_supported_table_format(range); } else { LOG(WARNING) << "Unsupported file format type " << format_type << " for file scanner v2"; @@ -597,6 +602,9 @@ Status FileScannerV2::_to_file_format(TFileFormatType::type format_type, case TFileFormatType::FORMAT_TEXT: *file_format = format::FileFormat::TEXT; return Status::OK(); + case TFileFormatType::FORMAT_JSON: + *file_format = format::FileFormat::JSON; + return Status::OK(); default: return Status::NotSupported("FileScannerV2 does not support file format {}", to_string(format_type)); diff --git a/be/src/format_v2/file_reader.h b/be/src/format_v2/file_reader.h index 2de374b83150f1..3f192ae093a47b 100644 --- a/be/src/format_v2/file_reader.h +++ b/be/src/format_v2/file_reader.h @@ -114,6 +114,7 @@ enum class FileFormat { PARQUET, ORC, CSV, + JSON, TEXT, JNI, }; diff --git a/be/src/format_v2/json/json_reader.cpp b/be/src/format_v2/json/json_reader.cpp new file mode 100644 index 00000000000000..f061d809554f8e --- /dev/null +++ b/be/src/format_v2/json/json_reader.cpp @@ -0,0 +1,1109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format_v2/json/json_reader.h" + +#include + +#include +#include +#include +#include +#include +#include + +#include "common/cast_set.h" +#include "core/assert_cast.h" +#include "core/block/block.h" +#include "core/column/column_array.h" +#include "core/column/column_map.h" +#include "core/column/column_nullable.h" +#include "core/column/column_string.h" +#include "core/column/column_struct.h" +#include "core/data_type/data_type_array.h" +#include "core/data_type/data_type_map.h" +#include "core/data_type/data_type_nullable.h" +#include "core/data_type/data_type_string.h" +#include "core/data_type/data_type_struct.h" +#include "exprs/vexpr_context.h" +#include "format/file_reader/new_plain_text_line_reader.h" +#include "format_v2/column_mapper.h" +#include "io/file_factory.h" +#include "io/fs/file_reader.h" +#include "io/fs/stream_load_pipe.h" +#include "io/fs/tracing_file_reader.h" +#include "runtime/descriptors.h" +#include "runtime/runtime_state.h" +#include "util/decompressor.h" +#include "util/slice.h" + +namespace doris::format::json { +namespace { + +DataTypePtr nullable_type(DataTypePtr type) { + return type != nullptr && type->is_nullable() ? std::move(type) + : make_nullable(std::move(type)); +} + +DataTypePtr json_file_type_from_slot_type(const DataTypePtr& type) { + if (type == nullptr) { + return nullptr; + } + + const bool is_nullable = type->is_nullable(); + const auto nested_type = remove_nullable(type); + DataTypePtr file_type; + switch (nested_type->get_primitive_type()) { + case TYPE_CHAR: + case TYPE_VARCHAR: + file_type = std::make_shared(); + break; + case TYPE_ARRAY: { + const auto* array_type = assert_cast(nested_type.get()); + file_type = std::make_shared( + json_file_type_from_slot_type(array_type->get_nested_type())); + break; + } + case TYPE_MAP: { + const auto* map_type = assert_cast(nested_type.get()); + file_type = std::make_shared( + json_file_type_from_slot_type(map_type->get_key_type()), + json_file_type_from_slot_type(map_type->get_value_type())); + break; + } + case TYPE_STRUCT: { + const auto* struct_type = assert_cast(nested_type.get()); + DataTypes file_children; + file_children.reserve(struct_type->get_elements().size()); + for (const auto& child_type : struct_type->get_elements()) { + file_children.push_back(json_file_type_from_slot_type(child_type)); + } + file_type = + std::make_shared(file_children, struct_type->get_element_names()); + break; + } + default: + file_type = nested_type; + break; + } + + return is_nullable ? make_nullable(file_type) : file_type; +} + +ColumnDefinition synthetic_file_child(const std::string& name, DataTypePtr type, int32_t local_id); + +std::vector synthesize_file_children_from_type(const DataTypePtr& type) { + std::vector children; + if (type == nullptr) { + return children; + } + const auto nested_type = remove_nullable(type); + switch (nested_type->get_primitive_type()) { + case TYPE_ARRAY: { + const auto* array_type = assert_cast(nested_type.get()); + children.push_back(synthetic_file_child("element", array_type->get_nested_type(), 0)); + break; + } + case TYPE_MAP: { + const auto* map_type = assert_cast(nested_type.get()); + children.push_back(synthetic_file_child("key", map_type->get_key_type(), 0)); + children.push_back(synthetic_file_child("value", map_type->get_value_type(), 1)); + break; + } + case TYPE_STRUCT: { + const auto* struct_type = assert_cast(nested_type.get()); + children.reserve(struct_type->get_elements().size()); + for (size_t idx = 0; idx < struct_type->get_elements().size(); ++idx) { + children.push_back(synthetic_file_child(struct_type->get_element_name(idx), + struct_type->get_element(idx), + cast_set(idx))); + } + break; + } + default: + break; + } + return children; +} + +ColumnDefinition synthetic_file_child(const std::string& name, DataTypePtr type, int32_t local_id) { + ColumnDefinition child; + child.identifier = Field::create_field(name); + child.local_id = local_id; + child.name = name; + child.type = std::move(type); + child.children = synthesize_file_children_from_type(child.type); + return child; +} + +std::string lower_key(std::string_view key) { + std::string lowered(key.data(), key.size()); + std::transform(lowered.begin(), lowered.end(), lowered.begin(), ::tolower); + return lowered; +} + +} // namespace + +JsonReader::JsonReader(std::shared_ptr& system_properties, + std::unique_ptr& file_description, + std::shared_ptr io_ctx, RuntimeProfile* profile, + const TFileScanRangeParams* scan_params, const TFileRangeDesc& range, + const std::vector& file_slot_descs, + TFileCompressType::type range_compress_type, + std::optional stream_load_id) + : FileReader(system_properties, file_description, std::move(io_ctx), profile), + _scan_params(scan_params), + _range(range), + _source_file_slot_descs(file_slot_descs), + _range_compress_type(range_compress_type), + _stream_load_id(std::move(stream_load_id)) {} + +JsonReader::~JsonReader() { + static_cast(close()); +} + +Status JsonReader::init(RuntimeState* state) { + _runtime_state = state; + if (_scan_params == nullptr) { + return Status::InvalidArgument("JSON v2 reader requires scan params"); + } + if (_file_description == nullptr) { + return Status::InvalidArgument("JSON v2 reader requires file description"); + } + if (_runtime_state == nullptr) { + return Status::InvalidArgument("JSON v2 reader requires runtime state"); + } + if (!_scan_params->__isset.file_attributes) { + return Status::InvalidArgument("JSON v2 reader requires file attributes"); + } + + const auto& attributes = _scan_params->file_attributes; + if (attributes.__isset.text_params && attributes.text_params.__isset.line_delimiter) { + _line_delimiter = attributes.text_params.line_delimiter; + } else { + _line_delimiter = "\n"; + } + _line_delimiter_length = _line_delimiter.size(); + _jsonpaths = attributes.__isset.jsonpaths ? attributes.jsonpaths : ""; + _json_root = attributes.__isset.json_root ? attributes.json_root : ""; + _read_json_by_line = attributes.__isset.read_json_by_line && attributes.read_json_by_line; + _strip_outer_array = attributes.__isset.strip_outer_array && attributes.strip_outer_array; + _num_as_string = attributes.__isset.num_as_string && attributes.num_as_string; + _fuzzy_parse = attributes.__isset.fuzzy_parse && attributes.fuzzy_parse; + _openx_json_ignore_malformed = attributes.__isset.openx_json_ignore_malformed && + attributes.openx_json_ignore_malformed; + _is_hive_table = _range.table_format_params.table_format_type == "hive"; + _file_compress_type = _range_compress_type != TFileCompressType::UNKNOWN + ? _range_compress_type + : _scan_params->compress_type; + + _source_serdes = create_data_type_serdes(_source_file_slot_descs); + _file_schema.clear(); + _file_schema.reserve(_source_file_slot_descs.size()); + for (size_t idx = 0; idx < _source_file_slot_descs.size(); ++idx) { + const auto* slot = _source_file_slot_descs[idx]; + DORIS_CHECK(slot != nullptr); + ColumnDefinition field; + field.identifier = Field::create_field(slot->col_name()); + field.local_id = cast_set(idx); + field.name = slot->col_name(); + field.type = nullable_type(json_file_type_from_slot_type(slot->get_data_type_ptr())); + field.children = synthesize_file_children_from_type(field.type); + _file_schema.push_back(std::move(field)); + } + _eof = false; + return Status::OK(); +} + +Status JsonReader::get_schema(std::vector* file_schema) const { + if (file_schema == nullptr) { + return Status::InvalidArgument("JSON v2 file_schema is null"); + } + *file_schema = _file_schema; + return Status::OK(); +} + +std::unique_ptr JsonReader::create_column_mapper( + TableColumnMapperOptions options) const { + return std::make_unique(std::move(options)); +} + +Status JsonReader::open(std::shared_ptr request) { + RETURN_IF_ERROR(FileReader::open(std::move(request))); + DORIS_CHECK(_request != nullptr); + RETURN_IF_ERROR(_build_requested_columns(*_request, &_requested_columns)); + _slot_name_to_index.clear(); + _slot_name_to_index.reserve(_requested_columns.size()); + for (size_t idx = 0; idx < _requested_columns.size(); ++idx) { + auto name = _requested_columns[idx].slot_desc->col_name(); + _slot_name_to_index.emplace(_is_hive_table ? lower_key(name) : name, idx); + } + _previous_positions.clear(); + _reader_range = _json_range(); + RETURN_IF_ERROR(_open_file_reader()); + RETURN_IF_ERROR(_create_decompressor()); + if (_read_json_by_line) { + RETURN_IF_ERROR(_create_line_reader()); + } + RETURN_IF_ERROR(_parse_jsonpath_and_json_root()); + _json_parser = std::make_unique(); + _padding_buffer.resize(_padded_size); + _reader_eof = false; + _single_document_read = false; + _eof = false; + return Status::OK(); +} + +Status JsonReader::get_block(Block* file_block, size_t* rows, bool* eof) { + DORIS_CHECK(file_block != nullptr); + DORIS_CHECK(rows != nullptr); + DORIS_CHECK(eof != nullptr); + if (_json_parser == nullptr || _physical_file_reader == nullptr) { + return Status::InternalError("JSON v2 reader is not open"); + } + + const auto batch_size = _runtime_state->batch_size(); + const auto max_block_bytes = _runtime_state->preferred_block_size_bytes(); + *rows = 0; + *eof = false; + + while (file_block->rows() < batch_size && !_reader_eof && + file_block->bytes() < max_block_bytes) { + if (_read_json_by_line && _skip_first_line) { + size_t skipped_size = 0; + const uint8_t* skipped_line = nullptr; + RETURN_IF_ERROR(_line_reader->read_line(&skipped_line, &skipped_size, &_reader_eof, + _io_ctx.get())); + _skip_first_line = false; + continue; + } + + const size_t original_rows = file_block->rows(); + size_t size = 0; + bool is_empty_row = false; + Status st = Status::OK(); + try { + st = _parse_next_json(&size, &_reader_eof); + if (st.ok() && !_reader_eof && size > 0) { + st = _extract_json_value(size, &_reader_eof, &is_empty_row); + } + if (st.ok() && !_reader_eof && !is_empty_row) { + st = _append_rows_from_current_value(file_block, &is_empty_row, &_reader_eof); + } + } catch (simdjson::simdjson_error& e) { + st = Status::DataQualityError("Parse json data failed. code: {}, error info: {}", + e.error(), e.what()); + } + if (!st.ok()) { + RETURN_IF_ERROR(_handle_json_error(st, file_block, original_rows, &is_empty_row)); + } + if (!is_empty_row && file_block->rows() == original_rows) { + break; + } + } + + *rows = file_block->rows(); + RETURN_IF_ERROR(_apply_filters(file_block, rows)); + _reader_statistics.read_rows += *rows; + *eof = _reader_eof && *rows == 0; + _eof = *eof; + return Status::OK(); +} + +Status JsonReader::close() { + if (_line_reader != nullptr) { + _line_reader->close(); + _line_reader.reset(); + } + _json_parser.reset(); + _decompressor.reset(); + _physical_file_reader.reset(); + _tracing_file_reader.reset(); + _file_reader.reset(); + _requested_columns.clear(); + _slot_name_to_index.clear(); + _previous_positions.clear(); + _cached_string_values.clear(); + return Status::OK(); +} + +Status JsonReader::_build_requested_columns(const FileScanRequest& request, + std::vector* columns) const { + DORIS_CHECK(columns != nullptr); + columns->clear(); + std::vector by_position(request.local_positions.size()); + for (const auto& [file_column_id, block_position] : request.local_positions) { + if (file_column_id.value() < 0 || + static_cast(file_column_id.value()) >= _source_file_slot_descs.size()) { + return Status::InvalidArgument("JSON v2 request references unknown local column id {}", + file_column_id.value()); + } + if (block_position.value() >= by_position.size()) { + return Status::InvalidArgument("JSON v2 request has invalid block position {}", + block_position.value()); + } + const auto source_index = cast_set(file_column_id.value()); + RequestedColumn requested_column; + requested_column.file_column_id = file_column_id; + requested_column.block_position = block_position; + requested_column.source_index = source_index; + requested_column.slot_desc = _source_file_slot_descs[source_index]; + requested_column.serde = _source_serdes[source_index]; + by_position[block_position.value()] = std::move(requested_column); + } + for (size_t pos = 0; pos < by_position.size(); ++pos) { + if (!by_position[pos].file_column_id.is_valid()) { + return Status::InvalidArgument("JSON v2 request misses block position {}", pos); + } + } + *columns = std::move(by_position); + return Status::OK(); +} + +TFileRangeDesc JsonReader::_json_range() const { + auto range = _range; + range.__set_path(_file_description->path); + range.__set_start_offset(_file_description->range_start_offset); + range.__set_size(_file_description->range_size); + if (_file_description->file_size >= 0) { + range.__set_file_size(_file_description->file_size); + } + if (!_file_description->fs_name.empty()) { + range.__set_fs_name(_file_description->fs_name); + } + range.__set_file_cache_admission(_file_description->file_cache_admission); + if (_range_compress_type != TFileCompressType::UNKNOWN) { + range.__set_compress_type(_range_compress_type); + } + if (_stream_load_id.has_value()) { + range.__set_load_id(*_stream_load_id); + } + return range; +} + +Status JsonReader::_open_file_reader() { + _current_offset = _reader_range.start_offset; + if (_current_offset != 0) { + --_current_offset; + } + if (_scan_params->file_type == TFileType::FILE_STREAM) { + if (!_stream_load_id.has_value()) { + return Status::InvalidArgument("JSON v2 stream reader requires load id"); + } + RETURN_IF_ERROR(FileFactory::create_pipe_reader(*_stream_load_id, &_physical_file_reader, + _runtime_state, /*need_schema=*/false)); + } else { + _file_description->mtime = + _reader_range.__isset.modification_time ? _reader_range.modification_time : 0; + auto reader_options = FileFactory::get_reader_options(_runtime_state->query_options(), + *_file_description); + auto file_reader = DORIS_TRY(FileFactory::create_file_reader( + *_system_properties, *_file_description, reader_options, _profile)); + _physical_file_reader = + _io_ctx && _io_ctx->file_reader_stats + ? std::make_shared(std::move(file_reader), + _io_ctx->file_reader_stats) + : file_reader; + } + _file_reader = _physical_file_reader; + _tracing_file_reader = _physical_file_reader; + return Status::OK(); +} + +Status JsonReader::_create_decompressor() { + return Decompressor::create_decompressor(_file_compress_type, &_decompressor); +} + +Status JsonReader::_create_line_reader() { + int64_t size = _reader_range.size; + if (_reader_range.start_offset != 0) { + ++size; + _skip_first_line = true; + } else { + _skip_first_line = false; + } + _line_reader = NewPlainTextLineReader::create_unique( + _profile, _physical_file_reader, _decompressor.get(), + std::make_shared(_line_delimiter, _line_delimiter_length, + false), + size, _current_offset); + return Status::OK(); +} + +Status JsonReader::_parse_jsonpath_and_json_root() { + _parsed_jsonpaths.clear(); + _parsed_json_root.clear(); + if (!_jsonpaths.empty()) { + rapidjson::Document jsonpaths_doc; + if (jsonpaths_doc.Parse(_jsonpaths.c_str(), _jsonpaths.length()).HasParseError() || + !jsonpaths_doc.IsArray()) { + return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths); + } + for (int i = 0; i < jsonpaths_doc.Size(); ++i) { + const rapidjson::Value& path = jsonpaths_doc[i]; + if (!path.IsString()) { + return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths); + } + std::string json_path = path.GetString(); + if (json_path.size() == 1 && json_path[0] == '$') { + json_path.insert(1, "."); + } + std::vector parsed_paths; + JsonFunctions::parse_json_paths(json_path, &parsed_paths); + _parsed_jsonpaths.push_back(std::move(parsed_paths)); + } + } + if (!_json_root.empty()) { + std::string json_root = _json_root; + if (json_root.size() == 1 && json_root[0] == '$') { + json_root.insert(1, "."); + } + JsonFunctions::parse_json_paths(json_root, &_parsed_json_root); + } + return Status::OK(); +} + +Status JsonReader::_read_one_document(size_t* size, bool* eof) { + DORIS_CHECK(size != nullptr); + DORIS_CHECK(eof != nullptr); + *size = 0; + *eof = false; + if (_line_reader != nullptr) { + const uint8_t* line = nullptr; + RETURN_IF_ERROR(_line_reader->read_line(&line, size, eof, _io_ctx.get())); + if (*eof) { + return Status::OK(); + } + _document_buffer.assign(reinterpret_cast(line), *size); + return Status::OK(); + } + if (_single_document_read) { + *eof = true; + return Status::OK(); + } + _single_document_read = true; + if (_scan_params->file_type == TFileType::FILE_STREAM) { + return _read_one_document_from_pipe(size); + } + + auto read_size = _reader_range.size; + if (read_size <= 0 && _reader_range.__isset.file_size) { + read_size = _reader_range.file_size - _current_offset; + } + if (read_size <= 0) { + *eof = true; + return Status::OK(); + } + _document_buffer.resize(cast_set(read_size)); + Slice result(_document_buffer.data(), _document_buffer.size()); + RETURN_IF_ERROR(_physical_file_reader->read_at(_current_offset, result, size, _io_ctx.get())); + _document_buffer.resize(*size); + if (*size == 0) { + *eof = true; + } + return Status::OK(); +} + +Status JsonReader::_read_one_document_from_pipe(size_t* read_size) { + auto* stream_load_pipe = dynamic_cast(_physical_file_reader.get()); + if (stream_load_pipe == nullptr) { + return Status::InternalError("JSON v2 stream reader requires StreamLoadPipe"); + } + DorisUniqueBufferPtr file_buf; + RETURN_IF_ERROR(stream_load_pipe->read_one_message(&file_buf, read_size)); + _document_buffer.assign(reinterpret_cast(file_buf.get()), *read_size); + if (!stream_load_pipe->is_chunked_transfer()) { + return Status::OK(); + } + + while (true) { + DorisUniqueBufferPtr next_buf; + size_t next_size = 0; + RETURN_IF_ERROR(stream_load_pipe->read_one_message(&next_buf, &next_size)); + if (next_size == 0) { + break; + } + _document_buffer.append(reinterpret_cast(next_buf.get()), next_size); + *read_size += next_size; + } + return Status::OK(); +} + +Status JsonReader::_parse_next_json(size_t* size, bool* eof) { + RETURN_IF_ERROR(_read_one_document(size, eof)); + if (*eof) { + return Status::OK(); + } + if (*size >= 3 && static_cast(_document_buffer[0]) == 0xEF && + static_cast(_document_buffer[1]) == 0xBB && + static_cast(_document_buffer[2]) == 0xBF) { + _document_buffer.erase(0, 3); + *size -= 3; + } + if (*size + simdjson::SIMDJSON_PADDING > _padded_size) { + _padded_size = *size + simdjson::SIMDJSON_PADDING; + _padding_buffer.resize(_padded_size); + } + std::memcpy(_padding_buffer.data(), _document_buffer.data(), *size); + _original_doc_size = *size; + const auto error = + _json_parser->iterate(std::string_view(_padding_buffer.data(), *size), _padded_size) + .get(_original_json_doc); + if (error != simdjson::error_code::SUCCESS) { + return Status::DataQualityError( + "Parse json data for JsonDoc failed. code: {}, error info: {}", error, + simdjson::error_message(error)); + } + return Status::OK(); +} + +Status JsonReader::_extract_json_value(size_t size, bool* eof, bool* is_empty_row) { + DORIS_CHECK(eof != nullptr); + DORIS_CHECK(is_empty_row != nullptr); + *is_empty_row = false; + if (size == 0 || *eof) { + *is_empty_row = true; + return Status::OK(); + } + auto type_res = _original_json_doc.type(); + if (type_res.error() != simdjson::error_code::SUCCESS) { + return Status::DataQualityError( + "Parse json data for JsonDoc failed. code: {}, error info: {}", type_res.error(), + simdjson::error_message(type_res.error())); + } + const auto type = type_res.value(); + if (type != simdjson::ondemand::json_type::object && + type != simdjson::ondemand::json_type::array) { + return Status::DataQualityError("Not an json object or json array"); + } + _parsed_from_json_root = false; + if (!_parsed_json_root.empty() && type == simdjson::ondemand::json_type::object) { + simdjson::ondemand::object object = _original_json_doc; + Status st = JsonFunctions::extract_from_object(object, _parsed_json_root, &_json_value); + if (!st.ok()) { + return Status::DataQualityError("{}", st.to_string()); + } + _parsed_from_json_root = true; + } else { + _json_value = _original_json_doc; + } + + const auto value_type = _json_value.type().value(); + if (value_type == simdjson::ondemand::json_type::array && !_strip_outer_array) { + return Status::DataQualityError( + "JSON data is array-object, `strip_outer_array` must be TRUE."); + } + if (value_type != simdjson::ondemand::json_type::array && _strip_outer_array) { + return Status::DataQualityError( + "JSON data is not an array-object, `strip_outer_array` must be FALSE."); + } + if (!_parsed_jsonpaths.empty() && _strip_outer_array && + _json_value.count_elements().value() == 0) { + *is_empty_row = true; + } + return Status::OK(); +} + +Status JsonReader::_append_rows_from_current_value(Block* block, bool* is_empty_row, bool* eof) { + if (_parsed_jsonpaths.empty()) { + return _append_simple_json_rows(block, is_empty_row, eof); + } + if (_strip_outer_array) { + return _append_flat_array_jsonpath_rows(block, is_empty_row, eof); + } + return _append_nested_jsonpath_row(block, is_empty_row, eof); +} + +Status JsonReader::_append_simple_json_rows(Block* block, bool* is_empty_row, bool* eof) { + DORIS_CHECK(block != nullptr); + DORIS_CHECK(is_empty_row != nullptr); + DORIS_CHECK(eof != nullptr); + bool valid = false; + if (_json_value.type().value() == simdjson::ondemand::json_type::array) { + _array = _json_value.get_array(); + if (_array.count_elements() == 0) { + *is_empty_row = true; + return Status::OK(); + } + _array_iter = _array.begin(); + while (_array_iter != _array.end()) { + simdjson::ondemand::object object_value = (*_array_iter).get_object(); + RETURN_IF_ERROR(_set_column_values_from_object(&object_value, block, &valid)); + ++_array_iter; + if (!valid) { + *is_empty_row = true; + return Status::OK(); + } + } + } else { + simdjson::ondemand::object object_value = _json_value.get_object(); + RETURN_IF_ERROR(_set_column_values_from_object(&object_value, block, &valid)); + if (!valid) { + *is_empty_row = true; + return Status::OK(); + } + } + *is_empty_row = false; + return Status::OK(); +} + +Status JsonReader::_append_flat_array_jsonpath_rows(Block* block, bool* is_empty_row, bool* eof) { + DORIS_CHECK(block != nullptr); + DORIS_CHECK(is_empty_row != nullptr); + DORIS_CHECK(eof != nullptr); + const size_t original_rows = block->rows(); + bool valid = true; + _array = _json_value.get_array(); + _array_iter = _array.begin(); + while (_array_iter != _array.end()) { + simdjson::ondemand::object object_value = (*_array_iter).get_object(); + if (!_parsed_from_json_root && !_parsed_json_root.empty()) { + simdjson::ondemand::value rooted_value; + Status st = JsonFunctions::extract_from_object(object_value, _parsed_json_root, + &rooted_value); + if (!st.ok()) { + if (st.is()) { + ++_array_iter; + continue; + } + return st; + } + if (rooted_value.type().value() != simdjson::ondemand::json_type::object) { + ++_array_iter; + continue; + } + object_value = rooted_value.get_object(); + } + RETURN_IF_ERROR(_write_columns_by_jsonpath(&object_value, block, &valid)); + ++_array_iter; + } + *is_empty_row = block->rows() == original_rows; + return Status::OK(); +} + +Status JsonReader::_append_nested_jsonpath_row(Block* block, bool* is_empty_row, bool* eof) { + DORIS_CHECK(block != nullptr); + DORIS_CHECK(is_empty_row != nullptr); + DORIS_CHECK(eof != nullptr); + if (_json_value.type().value() != simdjson::ondemand::json_type::object) { + return Status::DataQualityError("Not object item"); + } + bool valid = true; + simdjson::ondemand::object object_value = _json_value.get_object(); + RETURN_IF_ERROR(_write_columns_by_jsonpath(&object_value, block, &valid)); + *is_empty_row = !valid; + return Status::OK(); +} + +Status JsonReader::_set_column_values_from_object(simdjson::ondemand::object* object_value, + Block* block, bool* valid) { + DORIS_CHECK(object_value != nullptr); + DORIS_CHECK(block != nullptr); + DORIS_CHECK(valid != nullptr); + std::vector seen_columns(block->columns(), false); + const size_t cur_row_count = block->rows(); + bool has_valid_value = false; + size_t key_index = 0; + + for (auto field : *object_value) { + std::string_view key = field.unescaped_key().value(); + const size_t column_index = _column_index(key, key_index++); + if (column_index == static_cast(-1)) { + continue; + } + if (seen_columns[column_index]) { + if (_is_hive_table) { + _pop_back_last_inserted_value(block, column_index); + } else { + continue; + } + } + simdjson::ondemand::value value = field.value().value(); + const auto& requested = _requested_columns[column_index]; + auto* column_ptr = block->get_by_position(column_index).column->assert_mutable().get(); + RETURN_IF_ERROR(_write_data_to_column( + value, requested.slot_desc->get_data_type_ptr(), column_ptr, + requested.slot_desc->col_name(), requested.serde, valid)); + if (!*valid) { + return Status::OK(); + } + seen_columns[column_index] = true; + has_valid_value = true; + } + + for (size_t i = 0; i < _requested_columns.size(); ++i) { + if (seen_columns[i]) { + continue; + } + auto* column_ptr = block->get_by_position(i).column->assert_mutable().get(); + RETURN_IF_ERROR(_fill_missing_column(_requested_columns[i], column_ptr, valid)); + if (!*valid) { + _truncate_block_to_rows(block, cur_row_count); + return Status::OK(); + } + } + *valid = true; + if (!has_valid_value) { + return Status::OK(); + } + return Status::OK(); +} + +Status JsonReader::_write_columns_by_jsonpath(simdjson::ondemand::object* object_value, + Block* block, bool* valid) { + DORIS_CHECK(object_value != nullptr); + DORIS_CHECK(block != nullptr); + DORIS_CHECK(valid != nullptr); + bool has_valid_value = false; + const size_t cur_row_count = block->rows(); + _cached_string_values.clear(); + + for (size_t i = 0; i < _requested_columns.size(); ++i) { + const auto& requested = _requested_columns[i]; + auto* column_ptr = block->get_by_position(i).column->assert_mutable().get(); + simdjson::ondemand::value json_value; + Status st = Status::OK(); + if (requested.source_index < _parsed_jsonpaths.size()) { + st = JsonFunctions::extract_from_object( + *object_value, _parsed_jsonpaths[requested.source_index], &json_value); + if (!st.ok() && !st.is()) { + return st; + } + } + if (_is_root_path_for_column(requested)) { + if (is_column_nullable(*column_ptr)) { + auto* nullable_column = assert_cast(column_ptr); + nullable_column->get_null_map_data().push_back(0); + auto* column_string = + assert_cast(nullable_column->get_nested_column_ptr().get()); + column_string->insert_data(_padding_buffer.data(), _original_doc_size); + } else { + auto* column_string = assert_cast(column_ptr); + column_string->insert_data(_padding_buffer.data(), _original_doc_size); + } + has_valid_value = true; + } else if (requested.source_index >= _parsed_jsonpaths.size() || + st.is()) { + RETURN_IF_ERROR(_fill_missing_column(requested, column_ptr, valid)); + if (!*valid) { + _truncate_block_to_rows(block, cur_row_count); + return Status::OK(); + } + } else { + RETURN_IF_ERROR(_write_data_to_column( + json_value, requested.slot_desc->get_data_type_ptr(), column_ptr, + requested.slot_desc->col_name(), requested.serde, valid)); + if (!*valid) { + _truncate_block_to_rows(block, cur_row_count); + return Status::OK(); + } + has_valid_value = true; + } + } + + if (!has_valid_value) { + _truncate_block_to_rows(block, cur_row_count); + *valid = false; + return Status::OK(); + } + *valid = true; + return Status::OK(); +} + +template +Status JsonReader::_write_data_to_column(simdjson::ondemand::value& value, + const DataTypePtr& type_desc, IColumn* column_ptr, + const std::string& column_name, + const DataTypeSerDeSPtr& serde, bool* valid) { + ColumnNullable* nullable_column = nullptr; + IColumn* data_column_ptr = column_ptr; + DataTypeSerDeSPtr data_serde = serde; + const auto value_type = value.type().value(); + + if (is_column_nullable(*column_ptr)) { + nullable_column = assert_cast(column_ptr); + data_column_ptr = nullable_column->get_nested_column().get_ptr().get(); + data_serde = serde->get_nested_serdes()[0]; + if (value_type == simdjson::ondemand::json_type::null) { + nullable_column->insert_default(); + *valid = true; + return Status::OK(); + } + } else if (value_type == simdjson::ondemand::json_type::null) { + return Status::DataQualityError("Json value is null, but the column `{}` is not nullable.", + column_name); + } + + const auto primitive_type = type_desc->get_primitive_type(); + if (!is_complex_type(primitive_type)) { + if (value_type == simdjson::ondemand::json_type::string) { + std::string_view value_string; + if constexpr (use_string_cache) { + const auto cache_key = value.raw_json().value(); + if (_cached_string_values.contains(cache_key)) { + value_string = _cached_string_values[cache_key]; + } else { + value_string = value.get_string(); + _cached_string_values.emplace(cache_key, value_string); + } + } else { + value_string = value.get_string(); + } + Slice slice {value_string.data(), value_string.size()}; + RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice, + _serde_options)); + } else if (value_type == simdjson::ondemand::json_type::boolean) { + const char* str_value = value.get_bool() ? "1" : "0"; + Slice slice {str_value, 1}; + RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice, + _serde_options)); + } else { + std::string_view json_str = simdjson::to_json_string(value); + Slice slice {json_str.data(), json_str.size()}; + RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice, + _serde_options)); + } + } else if (primitive_type == TYPE_STRUCT) { + if (value_type != simdjson::ondemand::json_type::object) { + return Status::DataQualityError( + "Json value isn't object, but the column `{}` is struct.", column_name); + } + const auto* type_struct = + assert_cast(remove_nullable(type_desc).get()); + auto* struct_column_ptr = assert_cast(data_column_ptr); + const auto sub_serdes = data_serde->get_nested_serdes(); + std::map sub_col_name_to_idx; + for (size_t sub_col_idx = 0; sub_col_idx < type_struct->get_elements().size(); + ++sub_col_idx) { + sub_col_name_to_idx.emplace(lower_key(type_struct->get_element_name(sub_col_idx)), + sub_col_idx); + } + std::vector has_value(type_struct->get_elements().size(), false); + simdjson::ondemand::object struct_value = value.get_object(); + for (auto sub : struct_value) { + const auto sub_key = lower_key(sub.unescaped_key().value()); + const auto it = sub_col_name_to_idx.find(sub_key); + if (it == sub_col_name_to_idx.end()) { + continue; + } + const auto sub_column_idx = it->second; + auto sub_column_ptr = struct_column_ptr->get_column(sub_column_idx).get_ptr(); + if (has_value[sub_column_idx]) { + sub_column_ptr->pop_back(1); + } + has_value[sub_column_idx] = true; + auto sub_value = sub.value().value(); + RETURN_IF_ERROR(_write_data_to_column( + sub_value, type_struct->get_element(sub_column_idx), sub_column_ptr.get(), + column_name + "." + sub_key, sub_serdes[sub_column_idx], valid)); + } + for (size_t sub_col_idx = 0; sub_col_idx < type_struct->get_elements().size(); + ++sub_col_idx) { + if (has_value[sub_col_idx]) { + continue; + } + auto sub_column_ptr = struct_column_ptr->get_column(sub_col_idx).get_ptr(); + if (!is_column_nullable(*sub_column_ptr)) { + return Status::DataQualityError( + "Json file structColumn miss field {} and this column isn't nullable.", + column_name + "." + type_struct->get_element_name(sub_col_idx)); + } + sub_column_ptr->insert_default(); + } + } else if (primitive_type == TYPE_MAP) { + if (value_type != simdjson::ondemand::json_type::object) { + return Status::DataQualityError("Json value isn't object, but the column `{}` is map.", + column_name); + } + const auto* map_type = assert_cast(remove_nullable(type_desc).get()); + auto* map_column_ptr = assert_cast(data_column_ptr); + const auto sub_serdes = data_serde->get_nested_serdes(); + size_t field_count = 0; + simdjson::ondemand::object object_value = value.get_object(); + for (auto member_value : object_value) { + auto* key_column = map_column_ptr->get_keys_ptr()->assert_mutable()->get_ptr().get(); + auto key_serde = sub_serdes[0]; + if (is_column_nullable(*key_column)) { + auto* nullable_key = assert_cast(key_column); + nullable_key->get_null_map_data().push_back(0); + key_column = nullable_key->get_nested_column().get_ptr().get(); + key_serde = key_serde->get_nested_serdes()[0]; + } + std::string_view key_view = member_value.unescaped_key().value(); + Slice key_slice(key_view.data(), key_view.size()); + RETURN_IF_ERROR(key_serde->deserialize_one_cell_from_json(*key_column, key_slice, + _serde_options)); + simdjson::ondemand::value field_value = member_value.value().value(); + RETURN_IF_ERROR(_write_data_to_column( + field_value, map_type->get_value_type(), + map_column_ptr->get_values_ptr()->assert_mutable()->get_ptr().get(), + column_name + ".value", sub_serdes[1], valid)); + ++field_count; + } + auto& offsets = map_column_ptr->get_offsets(); + offsets.emplace_back(offsets.back() + field_count); + } else if (primitive_type == TYPE_ARRAY) { + if (value_type != simdjson::ondemand::json_type::array) { + return Status::DataQualityError("Json value isn't array, but the column `{}` is array.", + column_name); + } + const auto* array_type = + assert_cast(remove_nullable(type_desc).get()); + auto* array_column_ptr = assert_cast(data_column_ptr); + const auto sub_serdes = data_serde->get_nested_serdes(); + size_t field_count = 0; + simdjson::ondemand::array array_value = value.get_array(); + for (simdjson::ondemand::value sub_value : array_value) { + RETURN_IF_ERROR(_write_data_to_column( + sub_value, array_type->get_nested_type(), + array_column_ptr->get_data().get_ptr().get(), column_name + ".element", + sub_serdes[0], valid)); + ++field_count; + } + auto& offsets = array_column_ptr->get_offsets(); + offsets.emplace_back(offsets.back() + field_count); + } else { + return Status::InternalError("Not support JSON value to complex column"); + } + + if (nullable_column && value_type != simdjson::ondemand::json_type::null) { + nullable_column->get_null_map_data().push_back(0); + } + *valid = true; + return Status::OK(); +} + +Status JsonReader::_fill_missing_column(const RequestedColumn& column, IColumn* column_ptr, + bool* valid) { + if (column.slot_desc->is_nullable()) { + auto* nullable_column = assert_cast(column_ptr); + nullable_column->insert_default(); + *valid = true; + return Status::OK(); + } + return Status::DataQualityError( + "The column `{}` is not nullable, but it's not found in jsondata.", + column.slot_desc->col_name()); +} + +Status JsonReader::_append_null_for_malformed_json(Block* block) { + DORIS_CHECK(block != nullptr); + for (int i = 0; i < block->columns(); ++i) { + auto& column_with_type = block->get_by_position(i); + if (!is_column_nullable(*column_with_type.column)) { + return Status::DataQualityError("malformed json, but the column `{}` is not nullable.", + column_with_type.column->get_name()); + } + auto column = IColumn::mutate(std::move(column_with_type.column)); + assert_cast(column.get())->insert_default(); + column_with_type.column = std::move(column); + } + return Status::OK(); +} + +Status JsonReader::_handle_json_error(const Status& status, Block* block, size_t original_rows, + bool* is_empty_row) { + DORIS_CHECK(block != nullptr); + DORIS_CHECK(is_empty_row != nullptr); + _truncate_block_to_rows(block, original_rows); + if (_openx_json_ignore_malformed && status.is()) { + RETURN_IF_ERROR(_append_null_for_malformed_json(block)); + *is_empty_row = false; + return Status::OK(); + } + return status; +} + +Status JsonReader::_apply_filters(Block* file_block, size_t* rows) { + DORIS_CHECK(file_block != nullptr); + DORIS_CHECK(rows != nullptr); + const size_t rows_before_filter = *rows; + size_t rows_after_delete_filter = rows_before_filter; + if (_request != nullptr && rows_before_filter > 0 && !_request->delete_conjuncts.empty()) { + RETURN_IF_ERROR(VExprContext::filter_block(_request->delete_conjuncts, file_block, + file_block->columns())); + rows_after_delete_filter = + file_block->columns() == 0 ? rows_before_filter : file_block->rows(); + } + + size_t rows_after_filter = rows_after_delete_filter; + if (_request != nullptr && rows_after_delete_filter > 0 && !_request->conjuncts.empty()) { + RETURN_IF_ERROR( + VExprContext::filter_block(_request->conjuncts, file_block, file_block->columns())); + rows_after_filter = + file_block->columns() == 0 ? rows_after_delete_filter : file_block->rows(); + if (_io_ctx != nullptr) { + _io_ctx->predicate_filtered_rows += rows_after_delete_filter - rows_after_filter; + } + } + *rows = rows_after_filter; + return Status::OK(); +} + +void JsonReader::_truncate_block_to_rows(Block* block, size_t num_rows) { + DORIS_CHECK(block != nullptr); + for (int i = 0; i < block->columns(); ++i) { + auto& column_with_type = block->get_by_position(i); + auto column = IColumn::mutate(std::move(column_with_type.column)); + if (column->size() > num_rows) { + column->pop_back(column->size() - num_rows); + } + column_with_type.column = std::move(column); + } +} + +void JsonReader::_pop_back_last_inserted_value(Block* block, size_t column_index) { + DORIS_CHECK(block != nullptr); + auto& column = block->get_by_position(column_index).column; + auto mutable_column = IColumn::mutate(std::move(column)); + mutable_column->pop_back(1); + column = std::move(mutable_column); +} + +size_t JsonReader::_column_index(std::string_view key, size_t key_index) { + std::string hive_key; + std::string_view lookup_key = key; + if (_is_hive_table) { + hive_key = lower_key(key); + lookup_key = hive_key; + } + if (key_index < _previous_positions.size()) { + const auto previous = _previous_positions[key_index]; + if (previous < _requested_columns.size()) { + const auto previous_name = _requested_columns[previous].slot_desc->col_name(); + if ((_is_hive_table ? lower_key(previous_name) : previous_name) == lookup_key) { + return previous; + } + } + } + const auto it = _slot_name_to_index.find(std::string(lookup_key)); + if (it == _slot_name_to_index.end()) { + return static_cast(-1); + } + if (key_index >= _previous_positions.size()) { + _previous_positions.resize(key_index + 1, static_cast(-1)); + } + _previous_positions[key_index] = it->second; + return it->second; +} + +bool JsonReader::_is_root_path_for_column(const RequestedColumn& column) const { + return column.source_index < _parsed_jsonpaths.size() && + JsonFunctions::is_root_path(_parsed_jsonpaths[column.source_index]); +} + +} // namespace doris::format::json diff --git a/be/src/format_v2/json/json_reader.h b/be/src/format_v2/json/json_reader.h new file mode 100644 index 00000000000000..fc78eb2f71bdbd --- /dev/null +++ b/be/src/format_v2/json/json_reader.h @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include // IWYU pragma: keep + +#include +#include +#include +#include +#include +#include + +#include "core/custom_allocator.h" +#include "core/data_type_serde/data_type_serde.h" +#include "exprs/json_functions.h" +#include "format_v2/file_reader.h" +#include "gen_cpp/PlanNodes_types.h" +#include "runtime/runtime_profile.h" + +namespace doris { +class Decompressor; +class LineReader; +class SlotDescriptor; +class IColumn; +} // namespace doris + +namespace doris::format::json { + +// FileScannerV2 JSON reader. +// +// JSON files do not carry an embedded physical schema. The v2 table layer still needs a +// file-local schema and FileScanRequest contract, so this reader exposes FE-provided file slots as +// v2 file-local columns and performs JSON parsing/materialization directly in the v2 path. +class JsonReader final : public FileReader { +public: + JsonReader(std::shared_ptr& system_properties, + std::unique_ptr& file_description, + std::shared_ptr io_ctx, RuntimeProfile* profile, + const TFileScanRangeParams* scan_params, const TFileRangeDesc& range, + const std::vector& file_slot_descs, + TFileCompressType::type range_compress_type = TFileCompressType::UNKNOWN, + std::optional stream_load_id = std::nullopt); + ~JsonReader() override; + + Status init(RuntimeState* state) override; + Status get_schema(std::vector* file_schema) const override; + std::unique_ptr create_column_mapper( + TableColumnMapperOptions options) const override; + Status open(std::shared_ptr request) override; + Status get_block(Block* file_block, size_t* rows, bool* eof) override; + Status close() override; + +private: + struct RequestedColumn { + LocalColumnId file_column_id = LocalColumnId::invalid(); + LocalIndex block_position; + size_t source_index = 0; + SlotDescriptor* slot_desc = nullptr; + DataTypeSerDeSPtr serde; + }; + + Status _build_requested_columns(const FileScanRequest& request, + std::vector* columns) const; + TFileRangeDesc _json_range() const; + Status _open_file_reader(); + Status _create_decompressor(); + Status _create_line_reader(); + Status _parse_jsonpath_and_json_root(); + Status _read_one_document(size_t* size, bool* eof); + Status _read_one_document_from_pipe(size_t* read_size); + Status _parse_next_json(size_t* size, bool* eof); + Status _extract_json_value(size_t size, bool* eof, bool* is_empty_row); + Status _append_rows_from_current_value(Block* block, bool* is_empty_row, bool* eof); + Status _append_simple_json_rows(Block* block, bool* is_empty_row, bool* eof); + Status _append_flat_array_jsonpath_rows(Block* block, bool* is_empty_row, bool* eof); + Status _append_nested_jsonpath_row(Block* block, bool* is_empty_row, bool* eof); + Status _set_column_values_from_object(simdjson::ondemand::object* object_value, Block* block, + bool* valid); + Status _write_columns_by_jsonpath(simdjson::ondemand::object* object_value, Block* block, + bool* valid); + template + Status _write_data_to_column(simdjson::ondemand::value& value, const DataTypePtr& type_desc, + IColumn* column_ptr, const std::string& column_name, + const DataTypeSerDeSPtr& serde, bool* valid); + Status _fill_missing_column(const RequestedColumn& column, IColumn* column_ptr, bool* valid); + Status _append_null_for_malformed_json(Block* block); + Status _handle_json_error(const Status& status, Block* block, size_t original_rows, + bool* is_empty_row); + Status _apply_filters(Block* file_block, size_t* rows); + void _truncate_block_to_rows(Block* block, size_t num_rows); + void _pop_back_last_inserted_value(Block* block, size_t column_index); + size_t _column_index(std::string_view key, size_t key_index); + bool _is_root_path_for_column(const RequestedColumn& column) const; + + const TFileScanRangeParams* _scan_params = nullptr; + TFileRangeDesc _range; + TFileRangeDesc _reader_range; + std::vector _source_file_slot_descs; + DataTypeSerDeSPtrs _source_serdes; + std::vector _file_schema; + RuntimeState* _runtime_state = nullptr; + TFileCompressType::type _range_compress_type = TFileCompressType::UNKNOWN; + std::optional _stream_load_id; + std::vector _requested_columns; + std::unordered_map _slot_name_to_index; + std::vector _previous_positions; + + io::FileReaderSPtr _physical_file_reader; + std::unique_ptr _decompressor; + std::unique_ptr _line_reader; + int64_t _current_offset = 0; + bool _reader_eof = false; + bool _skip_first_line = false; + bool _single_document_read = false; + + std::string _line_delimiter; + size_t _line_delimiter_length = 0; + std::string _jsonpaths; + std::string _json_root; + bool _read_json_by_line = false; + bool _strip_outer_array = false; + bool _num_as_string = false; + bool _fuzzy_parse = false; + bool _is_hive_table = false; + bool _openx_json_ignore_malformed = false; + TFileCompressType::type _file_compress_type = TFileCompressType::UNKNOWN; + + std::vector> _parsed_jsonpaths; + std::vector _parsed_json_root; + bool _parsed_from_json_root = false; + DataTypeSerDe::FormatOptions _serde_options; + + std::unique_ptr _json_parser; + simdjson::ondemand::document _original_json_doc; + simdjson::ondemand::value _json_value; + simdjson::ondemand::array _array; + simdjson::ondemand::array_iterator _array_iter; + std::string _document_buffer; + std::string _padding_buffer; + size_t _original_doc_size = 0; + size_t _padded_size = 1024 * 1024 * 8 + simdjson::SIMDJSON_PADDING; + std::unordered_map _cached_string_values; +}; + +} // namespace doris::format::json diff --git a/be/src/format_v2/table/hive_reader.cpp b/be/src/format_v2/table/hive_reader.cpp index 72940971152af4..ad4b75b00856ea 100644 --- a/be/src/format_v2/table/hive_reader.cpp +++ b/be/src/format_v2/table/hive_reader.cpp @@ -81,11 +81,12 @@ Status HiveReader::init(format::TableReadOptions&& options) { use_column_names = query_options.hive_orc_use_column_names; } else if (file_format == format::FileFormat::PARQUET) { use_column_names = query_options.hive_parquet_use_column_names; - } else if (file_format == format::FileFormat::CSV || file_format == format::FileFormat::TEXT) { - // Hive CSV/TEXT readers synthesize a file-local schema from FE-provided file slots because - // delimited text files do not carry embedded column names or field ids. The scan params' - // column_idxs still tell CsvReader/TextReader which physical field ordinal to read, while - // the table-level mapper can safely match the synthesized file schema by table column name. + } else if (file_format == format::FileFormat::CSV || file_format == format::FileFormat::TEXT || + file_format == format::FileFormat::JSON) { + // Hive CSV/TEXT/JSON readers synthesize a file-local schema from FE-provided file slots + // because these formats do not carry embedded column names or field ids. The scan params' + // format-specific attributes still tell the physical reader how to read values, while the + // table-level mapper can safely match the synthesized file schema by table column name. use_column_names = true; } else { return Status::NotSupported("HiveReader does not support file reader format {}", diff --git a/be/src/format_v2/table_reader.cpp b/be/src/format_v2/table_reader.cpp index 97fa4145c427a2..d90d4f6ea337d1 100644 --- a/be/src/format_v2/table_reader.cpp +++ b/be/src/format_v2/table_reader.cpp @@ -43,6 +43,7 @@ #include "format_v2/column_mapper.h" #include "format_v2/delimited_text/csv_reader.h" #include "format_v2/delimited_text/text_reader.h" +#include "format_v2/json/json_reader.h" #include "format_v2/parquet/parquet_reader.h" #include "roaring/roaring64map.hh" #include "storage/segment/condition_cache.h" @@ -73,6 +74,8 @@ std::string file_format_to_string(FileFormat format) { return "ORC"; case FileFormat::CSV: return "CSV"; + case FileFormat::JSON: + return "JSON"; case FileFormat::TEXT: return "TEXT"; case FileFormat::JNI: @@ -725,6 +728,16 @@ Status TableReader::create_file_reader(std::unique_ptr* reader) { _current_range_load_id); return Status::OK(); } + if (_format == FileFormat::JSON) { + if (_file_slot_descs == nullptr) { + return Status::InvalidArgument("JSON reader requires file slot descriptors"); + } + *reader = std::make_unique( + _system_properties, _current_task->data_file, _io_ctx, _scanner_profile, + _scan_params, _current_file_range_desc, *_file_slot_descs, + _current_range_compress_type, _current_range_load_id); + return Status::OK(); + } return Status::NotSupported("TableReader does not support file format {}", file_format_to_string(_format)); } @@ -751,6 +764,7 @@ Status TableReader::prepare_split(const SplitReadOptions& options) { _current_task = std::make_unique(); _current_task->data_file = create_file_description(options.current_range); _current_file_description = *_current_task->data_file; + _current_file_range_desc = options.current_range; _current_range_compress_type = options.current_range.__isset.compress_type ? options.current_range.compress_type : TFileCompressType::UNKNOWN; diff --git a/be/src/format_v2/table_reader.h b/be/src/format_v2/table_reader.h index 23486e0fdf8b1d..a94cae621c9546 100644 --- a/be/src/format_v2/table_reader.h +++ b/be/src/format_v2/table_reader.h @@ -1422,6 +1422,7 @@ class TableReader { // each TFileRangeDesc, matching the old FileScanner reader contract. TFileCompressType::type _current_range_compress_type = TFileCompressType::UNKNOWN; std::optional _current_range_load_id; + TFileRangeDesc _current_file_range_desc; std::shared_ptr _system_properties; // partition key -> value std::map _partition_values; diff --git a/be/test/exec/scan/file_scanner_v2_test.cpp b/be/test/exec/scan/file_scanner_v2_test.cpp index 8a5d31e6fb5062..d3f0507aca1122 100644 --- a/be/test/exec/scan/file_scanner_v2_test.cpp +++ b/be/test/exec/scan/file_scanner_v2_test.cpp @@ -101,10 +101,12 @@ TEST(FileScannerV2Test, SupportedFormatMatrix) { {"hive", TFileFormatType::FORMAT_CSV_SNAPPYBLOCK, std::nullopt, true}, {"hive", TFileFormatType::FORMAT_PROTO, std::nullopt, true}, {"hive", TFileFormatType::FORMAT_TEXT, std::nullopt, true}, + {"hive", TFileFormatType::FORMAT_JSON, std::nullopt, true}, {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_ORC, false}, {"hive", TFileFormatType::FORMAT_ORC, TFileFormatType::FORMAT_PARQUET, true}, {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_CSV_PLAIN, true}, {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_TEXT, true}, + {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_JSON, true}, }; for (const auto& test_case : cases) { @@ -177,6 +179,7 @@ TEST(FileScannerV2Test, FileFormatConversionMatrix) { {TFileFormatType::FORMAT_CSV_SNAPPYBLOCK, format::FileFormat::CSV}, {TFileFormatType::FORMAT_PROTO, format::FileFormat::CSV}, {TFileFormatType::FORMAT_TEXT, format::FileFormat::TEXT}, + {TFileFormatType::FORMAT_JSON, format::FileFormat::JSON}, {TFileFormatType::FORMAT_ORC, std::nullopt}, }; diff --git a/be/test/format_v2/json/json_reader_test.cpp b/be/test/format_v2/json/json_reader_test.cpp new file mode 100644 index 00000000000000..30c49676603212 --- /dev/null +++ b/be/test/format_v2/json/json_reader_test.cpp @@ -0,0 +1,360 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format_v2/json/json_reader.h" + +#include + +#include +#include +#include +#include +#include +#include + +#include "common/object_pool.h" +#include "core/assert_cast.h" +#include "core/block/block.h" +#include "core/column/column_nullable.h" +#include "core/column/column_string.h" +#include "core/column/column_vector.h" +#include "core/data_type/data_type_nullable.h" +#include "core/data_type/data_type_number.h" +#include "core/data_type/data_type_string.h" +#include "format_v2/column_data.h" +#include "runtime/descriptors.h" +#include "runtime/runtime_profile.h" +#include "testutil/mock/mock_runtime_state.h" + +namespace doris::format::json { +namespace { + +TFileScanRangeParams json_scan_params(bool read_json_by_line = true, bool strip_outer_array = false, + std::string jsonpaths = "", std::string json_root = "", + bool ignore_malformed = false) { + TFileScanRangeParams params; + params.__set_format_type(TFileFormatType::FORMAT_JSON); + params.__set_file_type(TFileType::FILE_LOCAL); + params.__set_compress_type(TFileCompressType::PLAIN); + TFileAttributes attributes; + TFileTextScanRangeParams text_params; + text_params.__set_line_delimiter("\n"); + attributes.__set_text_params(std::move(text_params)); + attributes.__set_read_json_by_line(read_json_by_line); + attributes.__set_strip_outer_array(strip_outer_array); + attributes.__set_num_as_string(false); + attributes.__set_fuzzy_parse(false); + if (!jsonpaths.empty()) { + attributes.__set_jsonpaths(std::move(jsonpaths)); + } + if (!json_root.empty()) { + attributes.__set_json_root(std::move(json_root)); + } + if (ignore_malformed) { + attributes.__set_openx_json_ignore_malformed(true); + } + params.__set_file_attributes(std::move(attributes)); + return params; +} + +SlotDescriptor* make_test_slot(ObjectPool* pool, int slot_id, int slot_idx, DataTypePtr type, + const std::string& name) { + TSlotDescriptor slot_desc; + slot_desc.__set_id(slot_id); + slot_desc.__set_parent(0); + slot_desc.__set_slotType(type->to_thrift()); + slot_desc.__set_columnPos(slot_idx); + slot_desc.__set_byteOffset(0); + slot_desc.__set_nullIndicatorByte(slot_idx / 8); + slot_desc.__set_nullIndicatorBit(slot_idx % 8); + slot_desc.__set_slotIdx(slot_idx); + slot_desc.__set_isMaterialized(true); + slot_desc.__set_colName(name); + return pool->add(new SlotDescriptor(slot_desc)); +} + +std::vector build_slots(ObjectPool* pool) { + return {make_test_slot(pool, 0, 0, make_nullable(std::make_shared()), "id"), + make_test_slot(pool, 1, 1, make_nullable(std::make_shared()), "name")}; +} + +std::vector build_slots_with_required_name(ObjectPool* pool) { + return {make_test_slot(pool, 0, 0, make_nullable(std::make_shared()), "id"), + make_test_slot(pool, 1, 1, std::make_shared(), "name")}; +} + +std::unique_ptr file_description(const std::string& path) { + auto desc = std::make_unique(); + desc->path = path; + desc->file_size = static_cast(std::filesystem::file_size(path)); + desc->range_start_offset = 0; + desc->range_size = desc->file_size; + return desc; +} + +std::filesystem::path write_json_file(const std::string& name, const std::string& content) { + const auto test_dir = std::filesystem::temp_directory_path() / "doris_format_v2_json_reader"; + std::filesystem::create_directories(test_dir); + const auto file_path = test_dir / name; + std::ofstream out(file_path); + out << content; + return file_path; +} + +TFileRangeDesc file_range(const std::filesystem::path& file_path) { + TFileRangeDesc range; + range.__set_path(file_path.string()); + range.__set_start_offset(0); + range.__set_size(static_cast(std::filesystem::file_size(file_path))); + range.__set_file_size(static_cast(std::filesystem::file_size(file_path))); + return range; +} + +Block make_block(const std::vector& schema, + const std::vector& local_ids) { + Block block; + for (const auto local_id : local_ids) { + const auto it = std::ranges::find_if( + schema, [&](const auto& column) { return column.local_id == local_id; }); + EXPECT_TRUE(it != schema.end()); + block.insert({it->type->create_column(), it->type, it->name}); + } + return block; +} + +struct ReadResult { + Status status; + Status second_status = Status::OK(); + Block block; + size_t rows = 0; + bool eof = false; + size_t second_rows = 0; + bool second_eof = false; + std::vector schema; +}; + +ReadResult read_once(const std::string& file_name, const std::string& content, + TFileScanRangeParams params, const std::vector& slots, + const std::vector& requested_local_ids, bool read_twice = false) { + const auto file_path = write_json_file(file_name, content); + auto range = file_range(file_path); + + auto system_properties = std::make_shared(); + system_properties->system_type = TFileType::FILE_LOCAL; + auto desc = file_description(file_path.string()); + RuntimeProfile profile("json_v2_reader_test"); + MockRuntimeState state; + JsonReader reader(system_properties, desc, nullptr, &profile, ¶ms, range, slots); + + ReadResult result; + result.status = reader.init(&state); + if (!result.status.ok()) { + return result; + } + result.status = reader.get_schema(&result.schema); + if (!result.status.ok()) { + return result; + } + + auto request = std::make_shared(); + for (size_t i = 0; i < requested_local_ids.size(); ++i) { + request->local_positions.emplace(LocalColumnId(requested_local_ids[i]), LocalIndex(i)); + } + result.status = reader.open(request); + if (!result.status.ok()) { + return result; + } + + result.block = make_block(result.schema, requested_local_ids); + result.status = reader.get_block(&result.block, &result.rows, &result.eof); + if (result.status.ok() && read_twice) { + auto eof_block = make_block(result.schema, requested_local_ids); + result.second_status = + reader.get_block(&eof_block, &result.second_rows, &result.second_eof); + } + return result; +} + +std::string nullable_string_at(const IColumn& column, size_t row) { + const auto& nullable = assert_cast(column); + const auto& nested = assert_cast(nullable.get_nested_column()); + return nested.get_data_at(row).to_string(); +} + +int32_t nullable_int_at(const IColumn& column, size_t row) { + const auto& nullable = assert_cast(column); + const auto& nested = assert_cast(nullable.get_nested_column()); + return nested.get_data()[row]; +} + +bool nullable_is_null_at(const IColumn& column, size_t row) { + const auto& nullable = assert_cast(column); + return nullable.is_null_at(row); +} + +} // namespace + +TEST(JsonReaderTest, ReadsRequestedColumnsInFileScanRequestOrder) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once("order.jsonl", + R"({"id":1,"name":"alice"})" + "\n" + R"({"id":2,"name":"bob"})" + "\n", + json_scan_params(), slots, {1, 0}, true); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.schema.size(), 2); + EXPECT_EQ(result.schema[0].name, "id"); + EXPECT_EQ(result.schema[0].local_id, 0); + EXPECT_EQ(result.schema[1].name, "name"); + EXPECT_EQ(result.schema[1].local_id, 1); + ASSERT_EQ(result.rows, 2); + ASSERT_EQ(result.block.columns(), 2); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(0).column, 0), "alice"); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(0).column, 1), "bob"); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(1).column, 0), 1); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(1).column, 1), 2); + ASSERT_TRUE(result.second_status.ok()) << result.second_status.to_string(); + EXPECT_EQ(result.second_rows, 0); + EXPECT_TRUE(result.second_eof); +} + +TEST(JsonReaderTest, ReadsSingleDocumentOuterArray) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = + read_once("outer_array.json", R"([{"id":3,"name":"carol"},{"id":4,"name":"dave"}])", + json_scan_params(false, true), slots, {0, 1}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.rows, 2); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 0), 3); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 0), "carol"); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 1), 4); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 1), "dave"); +} + +TEST(JsonReaderTest, ReadsJsonRootByLine) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once("json_root.jsonl", + R"({"payload":{"id":5,"name":"eve"}})" + "\n" + R"({"payload":{"id":6,"name":"frank"}})" + "\n", + json_scan_params(true, false, "", "$.payload"), slots, {0, 1}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.rows, 2); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 0), 5); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 0), "eve"); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 1), 6); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 1), "frank"); +} + +TEST(JsonReaderTest, ReadsJsonPathsBySourceSlotAndReturnsRequestedBlockOrder) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once("jsonpaths.jsonl", + R"({"payload":{"id":7,"user":"grace"}})" + "\n" + R"({"payload":{"id":8,"user":"heidi"}})" + "\n", + json_scan_params(true, false, R"(["$.payload.id","$.payload.user"])"), + slots, {1, 0}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.rows, 2); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(0).column, 0), "grace"); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(0).column, 1), "heidi"); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(1).column, 0), 7); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(1).column, 1), 8); +} + +TEST(JsonReaderTest, ReadsJsonPathsFromSingleDocumentOuterArray) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once( + "outer_array_jsonpaths.json", + R"([{"payload":{"id":12,"user":"kate"}},{"payload":{"id":13,"user":"leo"}}])", + json_scan_params(false, true, R"(["$.payload.id","$.payload.user"])"), slots, {0, 1}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.rows, 2); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 0), 12); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 0), "kate"); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 1), 13); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 1), "leo"); +} + +TEST(JsonReaderTest, FillsMissingNullableColumnWithNull) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once("missing_nullable.jsonl", + R"({"id":9})" + "\n", + json_scan_params(), slots, {0, 1}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.rows, 1); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 0), 9); + EXPECT_TRUE(nullable_is_null_at(*result.block.get_by_position(1).column, 0)); +} + +TEST(JsonReaderTest, ReturnsErrorForMissingRequiredColumn) { + ObjectPool pool; + auto slots = build_slots_with_required_name(&pool); + auto result = read_once("missing_required.jsonl", + R"({"id":10})" + "\n", + json_scan_params(), slots, {0, 1}); + + EXPECT_FALSE(result.status.ok()); +} + +TEST(JsonReaderTest, ReturnsErrorForMalformedJsonByDefault) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once("malformed_strict.jsonl", + "not-json\n" + R"({"id":11,"name":"judy"})" + "\n", + json_scan_params(), slots, {0, 1}); + + EXPECT_FALSE(result.status.ok()); +} + +TEST(JsonReaderTest, IgnoresMalformedJsonAsNullRowsWhenConfigured) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once("ignore_malformed.jsonl", + "not-json\n" + R"({"id":11,"name":"judy"})" + "\n", + json_scan_params(true, false, "", "", true), slots, {0, 1}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.rows, 2); + EXPECT_TRUE(nullable_is_null_at(*result.block.get_by_position(0).column, 0)); + EXPECT_TRUE(nullable_is_null_at(*result.block.get_by_position(1).column, 0)); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 1), 11); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 1), "judy"); +} + +} // namespace doris::format::json diff --git a/be/test/format_v2/table/hive_reader_test.cpp b/be/test/format_v2/table/hive_reader_test.cpp index 03b87db0c5ccb6..a41effaa91a3a9 100644 --- a/be/test/format_v2/table/hive_reader_test.cpp +++ b/be/test/format_v2/table/hive_reader_test.cpp @@ -89,6 +89,17 @@ TEST_F(HiveV2ReaderTest, InitSupportsTextFileFormat) { EXPECT_EQ(reader.mapping_mode(), TableColumnMappingMode::BY_NAME); } +// Scenario: Hive JSON files also synthesize a file-local schema from FE slots, so they should use +// name mapping at the table-reader layer while JsonReader consumes JSON attributes. +TEST_F(HiveV2ReaderTest, InitSupportsJsonFileFormat) { + TFileScanRangeParams params; + params.__set_format_type(TFileFormatType::FORMAT_JSON); + HiveReader reader; + + ASSERT_TRUE(init_hive_reader(FileFormat::JSON, ¶ms, &state, &profile, &reader).ok()); + EXPECT_EQ(reader.mapping_mode(), TableColumnMappingMode::BY_NAME); +} + // Scenario: positional mapping is only for Hive Parquet/ORC sessions that disable name mapping. // CSV keeps the synthesized file-column names and leaves column_idxs for the CsvReader itself. TEST_F(HiveV2ReaderTest, CsvDoesNotConsumeColumnIdxsAsPositionalSchemaMapping) { From dfe6a5ea421b8fdd03ec1f3e608478cd34097c1d Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 25 Jun 2026 11:31:46 +0800 Subject: [PATCH 2/7] [doc](be) Add comments for json reader ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: Add comments to the file scanner v2 JSON reader interfaces and non-obvious implementation paths, including synthetic schema handling, requested column mapping, simdjson buffer lifetime, json_root/jsonpaths behavior, duplicate key handling, and malformed-row rollback. ### Release note None ### Check List (For Author) - Test: No need to test (comment-only change) - Ran git diff --check. - Ran build-support/check-format.sh. - Behavior changed: No - Does this need documentation: No --- be/src/format_v2/json/json_reader.cpp | 33 +++++++++++++++++++++++++++ be/src/format_v2/json/json_reader.h | 20 ++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/be/src/format_v2/json/json_reader.cpp b/be/src/format_v2/json/json_reader.cpp index f061d809554f8e..ff247d5c70d62d 100644 --- a/be/src/format_v2/json/json_reader.cpp +++ b/be/src/format_v2/json/json_reader.cpp @@ -64,6 +64,9 @@ DataTypePtr json_file_type_from_slot_type(const DataTypePtr& type) { return nullptr; } + // Text-like file readers expose CHAR/VARCHAR as STRING and let the table column mapper cast to + // the destination slot type. JSON follows the same file-schema convention so that v2 mapping + // behaves consistently across text formats. const bool is_nullable = type->is_nullable(); const auto nested_type = remove_nullable(type); DataTypePtr file_type; @@ -214,6 +217,8 @@ Status JsonReader::init(RuntimeState* state) { _source_serdes = create_data_type_serdes(_source_file_slot_descs); _file_schema.clear(); _file_schema.reserve(_source_file_slot_descs.size()); + // JSON has no physical footer schema. The FE file slots are therefore the authoritative schema + // for both field names and source local ids. for (size_t idx = 0; idx < _source_file_slot_descs.size(); ++idx) { const auto* slot = _source_file_slot_descs[idx]; DORIS_CHECK(slot != nullptr); @@ -311,6 +316,8 @@ Status JsonReader::get_block(Block* file_block, size_t* rows, bool* eof) { if (!st.ok()) { RETURN_IF_ERROR(_handle_json_error(st, file_block, original_rows, &is_empty_row)); } + // An ignored or empty JSON object can produce no row. Avoid spinning forever on a document + // that was consumed but produced no materialized value. if (!is_empty_row && file_block->rows() == original_rows) { break; } @@ -345,6 +352,9 @@ Status JsonReader::_build_requested_columns(const FileScanRequest& request, std::vector* columns) const { DORIS_CHECK(columns != nullptr); columns->clear(); + // FileScanRequest stores a map from file-local id to output block position. Materialization is + // position-driven, so normalize it into a dense vector ordered by block position while keeping + // the original source index for jsonpaths. std::vector by_position(request.local_positions.size()); for (const auto& [file_column_id, block_position] : request.local_positions) { if (file_column_id.value() < 0 || @@ -431,6 +441,8 @@ Status JsonReader::_create_decompressor() { Status JsonReader::_create_line_reader() { int64_t size = _reader_range.size; if (_reader_range.start_offset != 0) { + // Start one byte earlier and discard the first partial line, matching split semantics used + // by text readers. ++size; _skip_first_line = true; } else { @@ -491,6 +503,8 @@ Status JsonReader::_read_one_document(size_t* size, bool* eof) { _document_buffer.assign(reinterpret_cast(line), *size); return Status::OK(); } + // Non-line mode treats the split as one JSON document. This supports a single object or an + // array with strip_outer_array=true. if (_single_document_read) { *eof = true; return Status::OK(); @@ -558,6 +572,8 @@ Status JsonReader::_parse_next_json(size_t* size, bool* eof) { _padded_size = *size + simdjson::SIMDJSON_PADDING; _padding_buffer.resize(_padded_size); } + // Ondemand values reference the input buffer. Keep the padded bytes in a member buffer until the + // current document is fully materialized. std::memcpy(_padding_buffer.data(), _document_buffer.data(), *size); _original_doc_size = *size; const auto error = @@ -592,6 +608,8 @@ Status JsonReader::_extract_json_value(size_t size, bool* eof, bool* is_empty_ro } _parsed_from_json_root = false; if (!_parsed_json_root.empty() && type == simdjson::ondemand::json_type::object) { + // In object mode json_root can be applied once here. In outer-array mode each array element + // needs its own root extraction, which is handled while iterating the array. simdjson::ondemand::object object = _original_json_doc; Status st = JsonFunctions::extract_from_object(object, _parsed_json_root, &_json_value); if (!st.ok()) { @@ -672,6 +690,8 @@ Status JsonReader::_append_flat_array_jsonpath_rows(Block* block, bool* is_empty while (_array_iter != _array.end()) { simdjson::ondemand::object object_value = (*_array_iter).get_object(); if (!_parsed_from_json_root && !_parsed_json_root.empty()) { + // For strip_outer_array, json_root is evaluated against each element. Elements without + // the requested root do not produce rows, matching the load reader behavior. simdjson::ondemand::value rooted_value; Status st = JsonFunctions::extract_from_object(object_value, _parsed_json_root, &rooted_value); @@ -727,6 +747,8 @@ Status JsonReader::_set_column_values_from_object(simdjson::ondemand::object* ob } if (seen_columns[column_index]) { if (_is_hive_table) { + // Hive JSON keeps the last duplicate key ignoring case. The earlier value has + // already been appended, so remove it before writing the replacement. _pop_back_last_inserted_value(block, column_index); } else { continue; @@ -785,6 +807,8 @@ Status JsonReader::_write_columns_by_jsonpath(simdjson::ondemand::object* object } } if (_is_root_path_for_column(requested)) { + // A root jsonpath means "materialize the whole current JSON document" instead of a + // field under it. Use the original bytes so callers receive the same document text. if (is_column_nullable(*column_ptr)) { auto* nullable_column = assert_cast(column_ptr); nullable_column->get_null_map_data().push_back(0); @@ -816,6 +840,8 @@ Status JsonReader::_write_columns_by_jsonpath(simdjson::ondemand::object* object } if (!has_valid_value) { + // jsonpaths can legally match nothing. Roll the row back so an all-missing path set does + // not create a synthetic row of nulls. _truncate_block_to_rows(block, cur_row_count); *valid = false; return Status::OK(); @@ -903,6 +929,8 @@ Status JsonReader::_write_data_to_column(simdjson::ondemand::value& value, const auto sub_column_idx = it->second; auto sub_column_ptr = struct_column_ptr->get_column(sub_column_idx).get_ptr(); if (has_value[sub_column_idx]) { + // Struct fields follow Hive-style duplicate handling: the last matching nested key + // wins. Remove the earlier nested value before appending the new one. sub_column_ptr->pop_back(1); } has_value[sub_column_idx] = true; @@ -1019,6 +1047,9 @@ Status JsonReader::_handle_json_error(const Status& status, Block* block, size_t bool* is_empty_row) { DORIS_CHECK(block != nullptr); DORIS_CHECK(is_empty_row != nullptr); + // Deserialization can fail after several columns have already appended data. Always restore the + // block to the row count before this document before either surfacing the error or appending + // the ignore-malformed null row. _truncate_block_to_rows(block, original_rows); if (_openx_json_ignore_malformed && status.is()) { RETURN_IF_ERROR(_append_null_for_malformed_json(block)); @@ -1082,6 +1113,8 @@ size_t JsonReader::_column_index(std::string_view key, size_t key_index) { lookup_key = hive_key; } if (key_index < _previous_positions.size()) { + // Most JSON lines share field order. Reuse the previous line's key-position mapping before + // falling back to the hash table lookup. const auto previous = _previous_positions[key_index]; if (previous < _requested_columns.size()) { const auto previous_name = _requested_columns[previous].slot_desc->col_name(); diff --git a/be/src/format_v2/json/json_reader.h b/be/src/format_v2/json/json_reader.h index fc78eb2f71bdbd..c415961866277c 100644 --- a/be/src/format_v2/json/json_reader.h +++ b/be/src/format_v2/json/json_reader.h @@ -49,6 +49,9 @@ namespace doris::format::json { // v2 file-local columns and performs JSON parsing/materialization directly in the v2 path. class JsonReader final : public FileReader { public: + // `file_slot_descs` is the FE-planned file schema. JSON has no physical schema, so the reader + // exposes these slots as synthetic file-local columns and materializes only the columns + // requested by FileScanRequest. JsonReader(std::shared_ptr& system_properties, std::unique_ptr& file_description, std::shared_ptr io_ctx, RuntimeProfile* profile, @@ -58,15 +61,23 @@ class JsonReader final : public FileReader { std::optional stream_load_id = std::nullopt); ~JsonReader() override; + // Initializes scan attributes and builds the synthetic schema from FE slots. Status init(RuntimeState* state) override; Status get_schema(std::vector* file_schema) const override; std::unique_ptr create_column_mapper( TableColumnMapperOptions options) const override; + // Opens the underlying file or stream and binds requested local column ids to output block + // positions. After this call, `get_block` can be called until it returns eof. Status open(std::shared_ptr request) override; + // Appends rows into `file_block` according to the FileScanRequest order. The block must already + // contain columns matching the requested positions. Status get_block(Block* file_block, size_t* rows, bool* eof) override; Status close() override; private: + // A requested column keeps both identities: + // - `source_index`: index in FE file slots, used for jsonpaths and SerDe lookup. + // - `block_position`: index in the caller's output block, used for materialization. struct RequestedColumn { LocalColumnId file_column_id = LocalColumnId::invalid(); LocalIndex block_position; @@ -77,14 +88,19 @@ class JsonReader final : public FileReader { Status _build_requested_columns(const FileScanRequest& request, std::vector* columns) const; + // Reconciles TableReader's split/range descriptor with FileReader's concrete file description. TFileRangeDesc _json_range() const; Status _open_file_reader(); Status _create_decompressor(); Status _create_line_reader(); Status _parse_jsonpath_and_json_root(); + // Reads one logical JSON document: one line for JSON Lines, or the whole range/pipe payload for + // single-document mode. Status _read_one_document(size_t* size, bool* eof); Status _read_one_document_from_pipe(size_t* read_size); + // Moves the logical document into a simdjson-padded buffer and creates an ondemand document. Status _parse_next_json(size_t* size, bool* eof); + // Applies json_root and validates the object/array shape required by strip_outer_array. Status _extract_json_value(size_t size, bool* eof, bool* is_empty_row); Status _append_rows_from_current_value(Block* block, bool* is_empty_row, bool* eof); Status _append_simple_json_rows(Block* block, bool* is_empty_row, bool* eof); @@ -99,6 +115,8 @@ class JsonReader final : public FileReader { IColumn* column_ptr, const std::string& column_name, const DataTypeSerDeSPtr& serde, bool* valid); Status _fill_missing_column(const RequestedColumn& column, IColumn* column_ptr, bool* valid); + // Implements openx_json_ignore_malformed by appending a null row after rolling back any partial + // writes for the malformed document. Status _append_null_for_malformed_json(Block* block); Status _handle_json_error(const Status& status, Block* block, size_t original_rows, bool* is_empty_row); @@ -146,6 +164,8 @@ class JsonReader final : public FileReader { bool _parsed_from_json_root = false; DataTypeSerDe::FormatOptions _serde_options; + // simdjson ondemand values point into `_padding_buffer`, so the buffer must outlive all values + // created from the current document. std::unique_ptr _json_parser; simdjson::ondemand::document _original_json_doc; simdjson::ondemand::value _json_value; From f4307ddaad231bcf29993c2c88348a0e4febe08e Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 25 Jun 2026 11:42:07 +0800 Subject: [PATCH 3/7] [test](be) Fix json reader required slot test ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: The JsonReaderTest helper always set a valid null indicator for constructed slot descriptors, so SlotDescriptor treated even non-nullable DataTypeString slots as nullable. This made the missing-required-column test expect an error while the test input actually described a nullable slot. Fix the helper to set nullIndicatorBit to -1 for non-nullable types. ### Release note None ### Check List (For Author) - Test: Unit Test / Manual test - Ran git diff --check. - Ran build-support/check-format.sh. - Attempted ./run-be-ut.sh --run --filter='JsonReaderTest.ReturnsErrorForMissingRequiredColumn', but sandbox execution failed because nproc is unavailable, .git/modules submodule config writes are denied, and GitHub dependency download DNS is blocked. Retried with escalated permissions twice, but approval review timed out before execution. - Behavior changed: No - Does this need documentation: No --- be/test/format_v2/json/json_reader_test.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/be/test/format_v2/json/json_reader_test.cpp b/be/test/format_v2/json/json_reader_test.cpp index 30c49676603212..a455f696b71d0f 100644 --- a/be/test/format_v2/json/json_reader_test.cpp +++ b/be/test/format_v2/json/json_reader_test.cpp @@ -79,8 +79,13 @@ SlotDescriptor* make_test_slot(ObjectPool* pool, int slot_id, int slot_idx, Data slot_desc.__set_slotType(type->to_thrift()); slot_desc.__set_columnPos(slot_idx); slot_desc.__set_byteOffset(0); - slot_desc.__set_nullIndicatorByte(slot_idx / 8); - slot_desc.__set_nullIndicatorBit(slot_idx % 8); + if (type->is_nullable()) { + slot_desc.__set_nullIndicatorByte(slot_idx / 8); + slot_desc.__set_nullIndicatorBit(slot_idx % 8); + } else { + slot_desc.__set_nullIndicatorByte(0); + slot_desc.__set_nullIndicatorBit(-1); + } slot_desc.__set_slotIdx(slot_idx); slot_desc.__set_isMaterialized(true); slot_desc.__set_colName(name); From abdd6b9621c26051d9aa43c8e06055ae0779acc4 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 25 Jun 2026 14:20:52 +0800 Subject: [PATCH 4/7] [fix](be) Fix json reader malformed and nullable handling ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: File scanner v2 JSON reader had three regressions. First, openx_json_ignore_malformed appended an all-null row for malformed records, while Hive/OpenX semantics skip malformed records. Second, empty JSON lines were still passed to simdjson and failed with EMPTY. Third, the reader assumed nullable output columns always had nullable source serdes and called get_nested_serdes on scalar serdes, which broke CDC TVF JSON rows whose output file column is nullable but source slot serde is not. This change skips malformed rows, treats empty JSON lines as empty rows, and only unwraps serdes when the source type is actually nullable. ### Release note None ### Check List (For Author) - Test: Unit Test / Manual test - Added JsonReaderTest coverage for present required columns, ignored malformed rows, and empty JSON lines. - Ran git diff --check. - Ran build-support/check-format.sh. - Attempted ./run-be-ut.sh --run --filter='JsonReaderTest.*', but sandbox execution failed because nproc is unavailable, .git/modules submodule config writes are denied, and GitHub dependency download DNS is blocked. Retried with escalated permissions twice, but approval review timed out before execution. - Behavior changed: No - Does this need documentation: No --- be/src/format_v2/json/json_reader.cpp | 40 +++++++++------------ be/src/format_v2/json/json_reader.h | 3 -- be/test/format_v2/json/json_reader_test.cpp | 39 ++++++++++++++++---- 3 files changed, 49 insertions(+), 33 deletions(-) diff --git a/be/src/format_v2/json/json_reader.cpp b/be/src/format_v2/json/json_reader.cpp index ff247d5c70d62d..5462e04694c8ba 100644 --- a/be/src/format_v2/json/json_reader.cpp +++ b/be/src/format_v2/json/json_reader.cpp @@ -303,8 +303,12 @@ Status JsonReader::get_block(Block* file_block, size_t* rows, bool* eof) { Status st = Status::OK(); try { st = _parse_next_json(&size, &_reader_eof); - if (st.ok() && !_reader_eof && size > 0) { - st = _extract_json_value(size, &_reader_eof, &is_empty_row); + if (st.ok() && !_reader_eof) { + if (size == 0) { + is_empty_row = true; + } else { + st = _extract_json_value(size, &_reader_eof, &is_empty_row); + } } if (st.ok() && !_reader_eof && !is_empty_row) { st = _append_rows_from_current_value(file_block, &is_empty_row, &_reader_eof); @@ -559,7 +563,7 @@ Status JsonReader::_read_one_document_from_pipe(size_t* read_size) { Status JsonReader::_parse_next_json(size_t* size, bool* eof) { RETURN_IF_ERROR(_read_one_document(size, eof)); - if (*eof) { + if (*eof || *size == 0) { return Status::OK(); } if (*size >= 3 && static_cast(_document_buffer[0]) == 0xEF && @@ -863,7 +867,9 @@ Status JsonReader::_write_data_to_column(simdjson::ondemand::value& value, if (is_column_nullable(*column_ptr)) { nullable_column = assert_cast(column_ptr); data_column_ptr = nullable_column->get_nested_column().get_ptr().get(); - data_serde = serde->get_nested_serdes()[0]; + if (type_desc->is_nullable()) { + data_serde = serde->get_nested_serdes()[0]; + } if (value_type == simdjson::ondemand::json_type::null) { nullable_column->insert_default(); *valid = true; @@ -969,7 +975,9 @@ Status JsonReader::_write_data_to_column(simdjson::ondemand::value& value, auto* nullable_key = assert_cast(key_column); nullable_key->get_null_map_data().push_back(0); key_column = nullable_key->get_nested_column().get_ptr().get(); - key_serde = key_serde->get_nested_serdes()[0]; + if (map_type->get_key_type()->is_nullable()) { + key_serde = key_serde->get_nested_serdes()[0]; + } } std::string_view key_view = member_value.unescaped_key().value(); Slice key_slice(key_view.data(), key_view.size()); @@ -1028,32 +1036,16 @@ Status JsonReader::_fill_missing_column(const RequestedColumn& column, IColumn* column.slot_desc->col_name()); } -Status JsonReader::_append_null_for_malformed_json(Block* block) { - DORIS_CHECK(block != nullptr); - for (int i = 0; i < block->columns(); ++i) { - auto& column_with_type = block->get_by_position(i); - if (!is_column_nullable(*column_with_type.column)) { - return Status::DataQualityError("malformed json, but the column `{}` is not nullable.", - column_with_type.column->get_name()); - } - auto column = IColumn::mutate(std::move(column_with_type.column)); - assert_cast(column.get())->insert_default(); - column_with_type.column = std::move(column); - } - return Status::OK(); -} - Status JsonReader::_handle_json_error(const Status& status, Block* block, size_t original_rows, bool* is_empty_row) { DORIS_CHECK(block != nullptr); DORIS_CHECK(is_empty_row != nullptr); // Deserialization can fail after several columns have already appended data. Always restore the - // block to the row count before this document before either surfacing the error or appending - // the ignore-malformed null row. + // block to the row count before this document before either surfacing the error or skipping the + // ignored malformed document. _truncate_block_to_rows(block, original_rows); if (_openx_json_ignore_malformed && status.is()) { - RETURN_IF_ERROR(_append_null_for_malformed_json(block)); - *is_empty_row = false; + *is_empty_row = true; return Status::OK(); } return status; diff --git a/be/src/format_v2/json/json_reader.h b/be/src/format_v2/json/json_reader.h index c415961866277c..c50357efb87534 100644 --- a/be/src/format_v2/json/json_reader.h +++ b/be/src/format_v2/json/json_reader.h @@ -115,9 +115,6 @@ class JsonReader final : public FileReader { IColumn* column_ptr, const std::string& column_name, const DataTypeSerDeSPtr& serde, bool* valid); Status _fill_missing_column(const RequestedColumn& column, IColumn* column_ptr, bool* valid); - // Implements openx_json_ignore_malformed by appending a null row after rolling back any partial - // writes for the malformed document. - Status _append_null_for_malformed_json(Block* block); Status _handle_json_error(const Status& status, Block* block, size_t original_rows, bool* is_empty_row); Status _apply_filters(Block* file_block, size_t* rows); diff --git a/be/test/format_v2/json/json_reader_test.cpp b/be/test/format_v2/json/json_reader_test.cpp index a455f696b71d0f..ef2a2782156eed 100644 --- a/be/test/format_v2/json/json_reader_test.cpp +++ b/be/test/format_v2/json/json_reader_test.cpp @@ -333,6 +333,20 @@ TEST(JsonReaderTest, ReturnsErrorForMissingRequiredColumn) { EXPECT_FALSE(result.status.ok()); } +TEST(JsonReaderTest, ReadsPresentRequiredColumn) { + ObjectPool pool; + auto slots = build_slots_with_required_name(&pool); + auto result = read_once("present_required.jsonl", + R"({"id":14,"name":"mallory"})" + "\n", + json_scan_params(), slots, {0, 1}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.rows, 1); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 0), 14); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 0), "mallory"); +} + TEST(JsonReaderTest, ReturnsErrorForMalformedJsonByDefault) { ObjectPool pool; auto slots = build_slots(&pool); @@ -345,7 +359,7 @@ TEST(JsonReaderTest, ReturnsErrorForMalformedJsonByDefault) { EXPECT_FALSE(result.status.ok()); } -TEST(JsonReaderTest, IgnoresMalformedJsonAsNullRowsWhenConfigured) { +TEST(JsonReaderTest, IgnoresMalformedJsonWhenConfigured) { ObjectPool pool; auto slots = build_slots(&pool); auto result = read_once("ignore_malformed.jsonl", @@ -355,11 +369,24 @@ TEST(JsonReaderTest, IgnoresMalformedJsonAsNullRowsWhenConfigured) { json_scan_params(true, false, "", "", true), slots, {0, 1}); ASSERT_TRUE(result.status.ok()) << result.status.to_string(); - ASSERT_EQ(result.rows, 2); - EXPECT_TRUE(nullable_is_null_at(*result.block.get_by_position(0).column, 0)); - EXPECT_TRUE(nullable_is_null_at(*result.block.get_by_position(1).column, 0)); - EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 1), 11); - EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 1), "judy"); + ASSERT_EQ(result.rows, 1); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 0), 11); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 0), "judy"); +} + +TEST(JsonReaderTest, SkipsEmptyJsonLine) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once("empty_line.jsonl", + "\n" + R"({"id":15,"name":"nancy"})" + "\n", + json_scan_params(), slots, {0, 1}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.rows, 1); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 0), 15); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 0), "nancy"); } } // namespace doris::format::json From 30792124ea46ec519ba18ee4ad8fa6ea5acee9ac Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 25 Jun 2026 17:10:37 +0800 Subject: [PATCH 5/7] [fix](be) Preserve json reader slot nullability ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: The file scanner v2 JSON reader exposed every synthetic file column as nullable. For CDC TVF sources with non-nullable columns, this made the table mapper see Nullable(INT) where the source slot was INT and produced a mapping projection cast failure. This change preserves the source slot nullability in the JSON file schema while keeping the existing runtime handling for nullable output columns. ### Release note None ### Check List (For Author) - Test: Unit Test / Manual test - Updated JsonReaderTest to assert nullable and non-nullable file schema columns. - Ran git diff --check. - Ran build-support/check-format.sh. - Attempted ./run-be-ut.sh --run --filter='JsonReaderTest.*', but sandbox execution failed because nproc is unavailable, .git/modules submodule config writes are denied, and GitHub dependency download DNS is blocked. Retried with escalated permissions twice, but approval review timed out before execution. - Behavior changed: No - Does this need documentation: No --- be/src/format_v2/json/json_reader.cpp | 7 +------ be/test/format_v2/json/json_reader_test.cpp | 10 +++++++++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/be/src/format_v2/json/json_reader.cpp b/be/src/format_v2/json/json_reader.cpp index 5462e04694c8ba..6fe9d71c51c1c2 100644 --- a/be/src/format_v2/json/json_reader.cpp +++ b/be/src/format_v2/json/json_reader.cpp @@ -54,11 +54,6 @@ namespace doris::format::json { namespace { -DataTypePtr nullable_type(DataTypePtr type) { - return type != nullptr && type->is_nullable() ? std::move(type) - : make_nullable(std::move(type)); -} - DataTypePtr json_file_type_from_slot_type(const DataTypePtr& type) { if (type == nullptr) { return nullptr; @@ -226,7 +221,7 @@ Status JsonReader::init(RuntimeState* state) { field.identifier = Field::create_field(slot->col_name()); field.local_id = cast_set(idx); field.name = slot->col_name(); - field.type = nullable_type(json_file_type_from_slot_type(slot->get_data_type_ptr())); + field.type = json_file_type_from_slot_type(slot->get_data_type_ptr()); field.children = synthesize_file_children_from_type(field.type); _file_schema.push_back(std::move(field)); } diff --git a/be/test/format_v2/json/json_reader_test.cpp b/be/test/format_v2/json/json_reader_test.cpp index ef2a2782156eed..f81f57dd739a0a 100644 --- a/be/test/format_v2/json/json_reader_test.cpp +++ b/be/test/format_v2/json/json_reader_test.cpp @@ -200,6 +200,11 @@ std::string nullable_string_at(const IColumn& column, size_t row) { return nested.get_data_at(row).to_string(); } +std::string string_at(const IColumn& column, size_t row) { + const auto& nested = assert_cast(column); + return nested.get_data_at(row).to_string(); +} + int32_t nullable_int_at(const IColumn& column, size_t row) { const auto& nullable = assert_cast(column); const auto& nested = assert_cast(nullable.get_nested_column()); @@ -342,9 +347,12 @@ TEST(JsonReaderTest, ReadsPresentRequiredColumn) { json_scan_params(), slots, {0, 1}); ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.schema.size(), 2); + EXPECT_TRUE(result.schema[0].type->is_nullable()); + EXPECT_FALSE(result.schema[1].type->is_nullable()); ASSERT_EQ(result.rows, 1); EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 0), 14); - EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 0), "mallory"); + EXPECT_EQ(string_at(*result.block.get_by_position(1).column, 0), "mallory"); } TEST(JsonReaderTest, ReturnsErrorForMalformedJsonByDefault) { From 35bff7cfd8af5ffc16ae1993b4c4cc876f4ca002 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 26 Jun 2026 09:17:46 +0800 Subject: [PATCH 6/7] [test](regression) Update openx json malformed expectations ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: The file scanner v2 JSON reader now skips malformed rows when openx_json_ignore_malformed is enabled. The Hive openx JSON regression expected the old behavior that materialized malformed rows as all NULL values, so the expected output failed against the corrected result. This updates the expected q1 output to contain only the valid JSON rows. ### Release note None ### Check List (For Author) - Test: Manual test - Ran git diff --check. - Did not run the external Hive regression locally because the external Hive test environment is not available in this workspace. - Behavior changed: No - Does this need documentation: No --- .../external_table_p0/hive/test_hive_openx_json.out | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/regression-test/data/external_table_p0/hive/test_hive_openx_json.out b/regression-test/data/external_table_p0/hive/test_hive_openx_json.out index 6eadea56694c85..c5e191d779307c 100644 --- a/regression-test/data/external_table_p0/hive/test_hive_openx_json.out +++ b/regression-test/data/external_table_p0/hive/test_hive_openx_json.out @@ -1,15 +1,5 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !q1 -- -\N \N \N \N \N -\N \N \N \N \N -\N \N \N \N \N -\N \N \N \N \N -\N \N \N \N \N -\N \N \N \N \N -\N \N \N \N \N -\N \N \N \N \N -\N \N \N \N \N -\N \N \N \N \N 1 Alice [1, 2, 3] {"math":90, "english":85} {"a":100, "b":"test1", "c":1234567890} 2 Bob [4, 5] {"math":80, "science":95} {"a":200, "b":"test2", "c":9876543210} @@ -22,4 +12,3 @@ [1,2,3] \N \N \N \N {"name":"bad1","id":5,"numbers":[1,2,3] \N \N \N \N {"name":"bad2","id":6,"numbers":"not an array","scores":{"key4":40},"details":{"a":4,"b":"text","c":4000000}} \N \N \N \N - From 1f90db7181bad140197565cd59de75c556b989ef Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 26 Jun 2026 09:28:28 +0800 Subject: [PATCH 7/7] [fix](be) Keep openx json malformed null rows ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: File scanner v2 JSON reader incorrectly skipped malformed JSON documents when openx_json_ignore_malformed was enabled. The existing OpenX JSON reader semantics materialize one all-NULL row for each ignored malformed document when all projected columns are nullable. This change restores that compatibility by rolling back any partial writes and appending a NULL row for malformed documents, and updates the JsonReader unit test and Hive regression expected output accordingly. ### Release note None ### Check List (For Author) - Test: Unit Test / Manual test - Ran git diff --check. - Formatted changed BE C++ files with Homebrew clang-format 16.0.6. - Attempted ./run-be-ut.sh --run --filter='JsonReaderTest.*' with JDK17; sandbox execution failed because nproc is unavailable, submodule config writes are denied, and GitHub dependency download DNS is blocked. Escalated retries timed out before execution. - Behavior changed: No - Does this need documentation: No --- be/src/format_v2/json/json_reader.cpp | 22 ++++++++++++++++--- be/src/format_v2/json/json_reader.h | 1 + be/test/format_v2/json/json_reader_test.cpp | 10 +++++---- .../hive/test_hive_openx_json.out | 10 +++++++++ 4 files changed, 36 insertions(+), 7 deletions(-) diff --git a/be/src/format_v2/json/json_reader.cpp b/be/src/format_v2/json/json_reader.cpp index 6fe9d71c51c1c2..04f74f52aefa9a 100644 --- a/be/src/format_v2/json/json_reader.cpp +++ b/be/src/format_v2/json/json_reader.cpp @@ -1031,16 +1031,32 @@ Status JsonReader::_fill_missing_column(const RequestedColumn& column, IColumn* column.slot_desc->col_name()); } +Status JsonReader::_append_null_for_malformed_json(Block* block) { + DORIS_CHECK(block != nullptr); + for (int i = 0; i < block->columns(); ++i) { + auto& column_with_type = block->get_by_position(i); + if (!is_column_nullable(*column_with_type.column)) { + return Status::DataQualityError("malformed json, but the column `{}` is not nullable.", + column_with_type.column->get_name()); + } + auto column = IColumn::mutate(std::move(column_with_type.column)); + assert_cast(column.get())->insert_default(); + column_with_type.column = std::move(column); + } + return Status::OK(); +} + Status JsonReader::_handle_json_error(const Status& status, Block* block, size_t original_rows, bool* is_empty_row) { DORIS_CHECK(block != nullptr); DORIS_CHECK(is_empty_row != nullptr); // Deserialization can fail after several columns have already appended data. Always restore the - // block to the row count before this document before either surfacing the error or skipping the - // ignored malformed document. + // block to the row count before this document before either surfacing the error or appending + // the ignore-malformed null row. _truncate_block_to_rows(block, original_rows); if (_openx_json_ignore_malformed && status.is()) { - *is_empty_row = true; + RETURN_IF_ERROR(_append_null_for_malformed_json(block)); + *is_empty_row = false; return Status::OK(); } return status; diff --git a/be/src/format_v2/json/json_reader.h b/be/src/format_v2/json/json_reader.h index c50357efb87534..52cdfad6728d64 100644 --- a/be/src/format_v2/json/json_reader.h +++ b/be/src/format_v2/json/json_reader.h @@ -115,6 +115,7 @@ class JsonReader final : public FileReader { IColumn* column_ptr, const std::string& column_name, const DataTypeSerDeSPtr& serde, bool* valid); Status _fill_missing_column(const RequestedColumn& column, IColumn* column_ptr, bool* valid); + Status _append_null_for_malformed_json(Block* block); Status _handle_json_error(const Status& status, Block* block, size_t original_rows, bool* is_empty_row); Status _apply_filters(Block* file_block, size_t* rows); diff --git a/be/test/format_v2/json/json_reader_test.cpp b/be/test/format_v2/json/json_reader_test.cpp index f81f57dd739a0a..994d71c0e6aefc 100644 --- a/be/test/format_v2/json/json_reader_test.cpp +++ b/be/test/format_v2/json/json_reader_test.cpp @@ -367,7 +367,7 @@ TEST(JsonReaderTest, ReturnsErrorForMalformedJsonByDefault) { EXPECT_FALSE(result.status.ok()); } -TEST(JsonReaderTest, IgnoresMalformedJsonWhenConfigured) { +TEST(JsonReaderTest, IgnoresMalformedJsonAsNullRowsWhenConfigured) { ObjectPool pool; auto slots = build_slots(&pool); auto result = read_once("ignore_malformed.jsonl", @@ -377,9 +377,11 @@ TEST(JsonReaderTest, IgnoresMalformedJsonWhenConfigured) { json_scan_params(true, false, "", "", true), slots, {0, 1}); ASSERT_TRUE(result.status.ok()) << result.status.to_string(); - ASSERT_EQ(result.rows, 1); - EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 0), 11); - EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 0), "judy"); + ASSERT_EQ(result.rows, 2); + EXPECT_TRUE(nullable_is_null_at(*result.block.get_by_position(0).column, 0)); + EXPECT_TRUE(nullable_is_null_at(*result.block.get_by_position(1).column, 0)); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 1), 11); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 1), "judy"); } TEST(JsonReaderTest, SkipsEmptyJsonLine) { diff --git a/regression-test/data/external_table_p0/hive/test_hive_openx_json.out b/regression-test/data/external_table_p0/hive/test_hive_openx_json.out index c5e191d779307c..aa7220b55f16b2 100644 --- a/regression-test/data/external_table_p0/hive/test_hive_openx_json.out +++ b/regression-test/data/external_table_p0/hive/test_hive_openx_json.out @@ -1,5 +1,15 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !q1 -- +\N \N \N \N \N +\N \N \N \N \N +\N \N \N \N \N +\N \N \N \N \N +\N \N \N \N \N +\N \N \N \N \N +\N \N \N \N \N +\N \N \N \N \N +\N \N \N \N \N +\N \N \N \N \N 1 Alice [1, 2, 3] {"math":90, "english":85} {"a":100, "b":"test1", "c":1234567890} 2 Bob [4, 5] {"math":80, "science":95} {"a":200, "b":"test2", "c":9876543210}