diff --git a/be/src/exec/scan/file_scanner_v2.cpp b/be/src/exec/scan/file_scanner_v2.cpp index 83b2c84e37c0f1..7b25f4e1c6929b 100644 --- a/be/src/exec/scan/file_scanner_v2.cpp +++ b/be/src/exec/scan/file_scanner_v2.cpp @@ -120,6 +120,10 @@ bool is_text_format(TFileFormatType::type format_type) { return format_type == TFileFormatType::FORMAT_TEXT; } +bool is_json_format(TFileFormatType::type format_type) { + return format_type == TFileFormatType::FORMAT_JSON; +} + bool is_partition_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) { if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) || column_name == BeConsts::ICEBERG_ROWID_COL) { @@ -213,7 +217,8 @@ bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFile return is_supported_table_format(range); } else if (format_type == TFileFormatType::FORMAT_JNI) { return is_supported_jni_table_format(range); - } else if (is_csv_format(format_type) || is_text_format(format_type)) { + } else if (is_csv_format(format_type) || is_text_format(format_type) || + is_json_format(format_type)) { return is_supported_table_format(range); } else { LOG(WARNING) << "Unsupported file format type " << format_type << " for file scanner v2"; @@ -597,6 +602,9 @@ Status FileScannerV2::_to_file_format(TFileFormatType::type format_type, case TFileFormatType::FORMAT_TEXT: *file_format = format::FileFormat::TEXT; return Status::OK(); + case TFileFormatType::FORMAT_JSON: + *file_format = format::FileFormat::JSON; + return Status::OK(); default: return Status::NotSupported("FileScannerV2 does not support file format {}", to_string(format_type)); diff --git a/be/src/format_v2/file_reader.h b/be/src/format_v2/file_reader.h index 2de374b83150f1..3f192ae093a47b 100644 --- a/be/src/format_v2/file_reader.h +++ b/be/src/format_v2/file_reader.h @@ -114,6 +114,7 @@ enum class FileFormat { PARQUET, ORC, CSV, + JSON, TEXT, JNI, }; diff --git a/be/src/format_v2/json/json_reader.cpp 
b/be/src/format_v2/json/json_reader.cpp new file mode 100644 index 00000000000000..04f74f52aefa9a --- /dev/null +++ b/be/src/format_v2/json/json_reader.cpp @@ -0,0 +1,1145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format_v2/json/json_reader.h" + +#include + +#include +#include +#include +#include +#include +#include + +#include "common/cast_set.h" +#include "core/assert_cast.h" +#include "core/block/block.h" +#include "core/column/column_array.h" +#include "core/column/column_map.h" +#include "core/column/column_nullable.h" +#include "core/column/column_string.h" +#include "core/column/column_struct.h" +#include "core/data_type/data_type_array.h" +#include "core/data_type/data_type_map.h" +#include "core/data_type/data_type_nullable.h" +#include "core/data_type/data_type_string.h" +#include "core/data_type/data_type_struct.h" +#include "exprs/vexpr_context.h" +#include "format/file_reader/new_plain_text_line_reader.h" +#include "format_v2/column_mapper.h" +#include "io/file_factory.h" +#include "io/fs/file_reader.h" +#include "io/fs/stream_load_pipe.h" +#include "io/fs/tracing_file_reader.h" +#include "runtime/descriptors.h" +#include "runtime/runtime_state.h" +#include "util/decompressor.h" 
+#include "util/slice.h" + +namespace doris::format::json { +namespace { + +DataTypePtr json_file_type_from_slot_type(const DataTypePtr& type) { + if (type == nullptr) { + return nullptr; + } + + // Text-like file readers expose CHAR/VARCHAR as STRING and let the table column mapper cast to + // the destination slot type. JSON follows the same file-schema convention so that v2 mapping + // behaves consistently across text formats. + const bool is_nullable = type->is_nullable(); + const auto nested_type = remove_nullable(type); + DataTypePtr file_type; + switch (nested_type->get_primitive_type()) { + case TYPE_CHAR: + case TYPE_VARCHAR: + file_type = std::make_shared(); + break; + case TYPE_ARRAY: { + const auto* array_type = assert_cast(nested_type.get()); + file_type = std::make_shared( + json_file_type_from_slot_type(array_type->get_nested_type())); + break; + } + case TYPE_MAP: { + const auto* map_type = assert_cast(nested_type.get()); + file_type = std::make_shared( + json_file_type_from_slot_type(map_type->get_key_type()), + json_file_type_from_slot_type(map_type->get_value_type())); + break; + } + case TYPE_STRUCT: { + const auto* struct_type = assert_cast(nested_type.get()); + DataTypes file_children; + file_children.reserve(struct_type->get_elements().size()); + for (const auto& child_type : struct_type->get_elements()) { + file_children.push_back(json_file_type_from_slot_type(child_type)); + } + file_type = + std::make_shared(file_children, struct_type->get_element_names()); + break; + } + default: + file_type = nested_type; + break; + } + + return is_nullable ? 
make_nullable(file_type) : file_type;
+}
+
+ColumnDefinition synthetic_file_child(const std::string& name, DataTypePtr type, int32_t local_id);
+
+// Derives the child column definitions implied by a nested type:
+//   ARRAY  -> one child named "element" (local id 0)
+//   MAP    -> children "key" (local id 0) and "value" (local id 1)
+//   STRUCT -> one child per element, named after the element, local id = element index
+// The outer Nullable wrapper is stripped before dispatching. Any other type, or a null `type`,
+// yields an empty vector.
+std::vector synthesize_file_children_from_type(const DataTypePtr& type) {
+    std::vector children;
+    if (type == nullptr) {
+        return children;
+    }
+    const auto nested_type = remove_nullable(type);
+    switch (nested_type->get_primitive_type()) {
+    case TYPE_ARRAY: {
+        const auto* array_type = assert_cast(nested_type.get());
+        children.push_back(synthetic_file_child("element", array_type->get_nested_type(), 0));
+        break;
+    }
+    case TYPE_MAP: {
+        const auto* map_type = assert_cast(nested_type.get());
+        children.push_back(synthetic_file_child("key", map_type->get_key_type(), 0));
+        children.push_back(synthetic_file_child("value", map_type->get_value_type(), 1));
+        break;
+    }
+    case TYPE_STRUCT: {
+        const auto* struct_type = assert_cast(nested_type.get());
+        children.reserve(struct_type->get_elements().size());
+        for (size_t idx = 0; idx < struct_type->get_elements().size(); ++idx) {
+            children.push_back(synthetic_file_child(struct_type->get_element_name(idx),
+                                                    struct_type->get_element(idx),
+                                                    cast_set(idx)));
+        }
+        break;
+    }
+    default:
+        break;
+    }
+    return children;
+}
+
+// Builds the definition of one synthetic nested field. `name` is used for both the identifier and
+// the display name, and the field's own children are synthesized recursively from `type`.
+ColumnDefinition synthetic_file_child(const std::string& name, DataTypePtr type, int32_t local_id) {
+    ColumnDefinition child;
+    child.identifier = Field::create_field(name);
+    child.local_id = local_id;
+    child.name = name;
+    child.type = std::move(type);
+    child.children = synthesize_file_children_from_type(child.type);
+    return child;
+}
+
+// Returns a copy of `key` with every byte mapped through tolower().
+std::string lower_key(std::string_view key) {
+    std::string lowered(key.data(), key.size());
+    // tolower() is only defined for arguments representable as unsigned char (or EOF). JSON keys
+    // are UTF-8, so bytes >= 0x80 must be widened through unsigned char; passing a plain char
+    // that happens to be signed would hand tolower() a negative value, which is undefined.
+    std::transform(lowered.begin(), lowered.end(), lowered.begin(),
+                   [](unsigned char c) { return static_cast<char>(::tolower(c)); });
+    return lowered;
+}
+
+} // namespace
+
+JsonReader::JsonReader(std::shared_ptr& system_properties,
+                       std::unique_ptr& file_description,
+                       std::shared_ptr io_ctx, RuntimeProfile* profile,
+                       const TFileScanRangeParams* scan_params, const
TFileRangeDesc& range, + const std::vector& file_slot_descs, + TFileCompressType::type range_compress_type, + std::optional stream_load_id) + : FileReader(system_properties, file_description, std::move(io_ctx), profile), + _scan_params(scan_params), + _range(range), + _source_file_slot_descs(file_slot_descs), + _range_compress_type(range_compress_type), + _stream_load_id(std::move(stream_load_id)) {} + +JsonReader::~JsonReader() { + static_cast(close()); +} + +Status JsonReader::init(RuntimeState* state) { + _runtime_state = state; + if (_scan_params == nullptr) { + return Status::InvalidArgument("JSON v2 reader requires scan params"); + } + if (_file_description == nullptr) { + return Status::InvalidArgument("JSON v2 reader requires file description"); + } + if (_runtime_state == nullptr) { + return Status::InvalidArgument("JSON v2 reader requires runtime state"); + } + if (!_scan_params->__isset.file_attributes) { + return Status::InvalidArgument("JSON v2 reader requires file attributes"); + } + + const auto& attributes = _scan_params->file_attributes; + if (attributes.__isset.text_params && attributes.text_params.__isset.line_delimiter) { + _line_delimiter = attributes.text_params.line_delimiter; + } else { + _line_delimiter = "\n"; + } + _line_delimiter_length = _line_delimiter.size(); + _jsonpaths = attributes.__isset.jsonpaths ? attributes.jsonpaths : ""; + _json_root = attributes.__isset.json_root ? 
attributes.json_root : ""; + _read_json_by_line = attributes.__isset.read_json_by_line && attributes.read_json_by_line; + _strip_outer_array = attributes.__isset.strip_outer_array && attributes.strip_outer_array; + _num_as_string = attributes.__isset.num_as_string && attributes.num_as_string; + _fuzzy_parse = attributes.__isset.fuzzy_parse && attributes.fuzzy_parse; + _openx_json_ignore_malformed = attributes.__isset.openx_json_ignore_malformed && + attributes.openx_json_ignore_malformed; + _is_hive_table = _range.table_format_params.table_format_type == "hive"; + _file_compress_type = _range_compress_type != TFileCompressType::UNKNOWN + ? _range_compress_type + : _scan_params->compress_type; + + _source_serdes = create_data_type_serdes(_source_file_slot_descs); + _file_schema.clear(); + _file_schema.reserve(_source_file_slot_descs.size()); + // JSON has no physical footer schema. The FE file slots are therefore the authoritative schema + // for both field names and source local ids. + for (size_t idx = 0; idx < _source_file_slot_descs.size(); ++idx) { + const auto* slot = _source_file_slot_descs[idx]; + DORIS_CHECK(slot != nullptr); + ColumnDefinition field; + field.identifier = Field::create_field(slot->col_name()); + field.local_id = cast_set(idx); + field.name = slot->col_name(); + field.type = json_file_type_from_slot_type(slot->get_data_type_ptr()); + field.children = synthesize_file_children_from_type(field.type); + _file_schema.push_back(std::move(field)); + } + _eof = false; + return Status::OK(); +} + +Status JsonReader::get_schema(std::vector* file_schema) const { + if (file_schema == nullptr) { + return Status::InvalidArgument("JSON v2 file_schema is null"); + } + *file_schema = _file_schema; + return Status::OK(); +} + +std::unique_ptr JsonReader::create_column_mapper( + TableColumnMapperOptions options) const { + return std::make_unique(std::move(options)); +} + +Status JsonReader::open(std::shared_ptr request) { + 
RETURN_IF_ERROR(FileReader::open(std::move(request))); + DORIS_CHECK(_request != nullptr); + RETURN_IF_ERROR(_build_requested_columns(*_request, &_requested_columns)); + _slot_name_to_index.clear(); + _slot_name_to_index.reserve(_requested_columns.size()); + for (size_t idx = 0; idx < _requested_columns.size(); ++idx) { + auto name = _requested_columns[idx].slot_desc->col_name(); + _slot_name_to_index.emplace(_is_hive_table ? lower_key(name) : name, idx); + } + _previous_positions.clear(); + _reader_range = _json_range(); + RETURN_IF_ERROR(_open_file_reader()); + RETURN_IF_ERROR(_create_decompressor()); + if (_read_json_by_line) { + RETURN_IF_ERROR(_create_line_reader()); + } + RETURN_IF_ERROR(_parse_jsonpath_and_json_root()); + _json_parser = std::make_unique(); + _padding_buffer.resize(_padded_size); + _reader_eof = false; + _single_document_read = false; + _eof = false; + return Status::OK(); +} + +Status JsonReader::get_block(Block* file_block, size_t* rows, bool* eof) { + DORIS_CHECK(file_block != nullptr); + DORIS_CHECK(rows != nullptr); + DORIS_CHECK(eof != nullptr); + if (_json_parser == nullptr || _physical_file_reader == nullptr) { + return Status::InternalError("JSON v2 reader is not open"); + } + + const auto batch_size = _runtime_state->batch_size(); + const auto max_block_bytes = _runtime_state->preferred_block_size_bytes(); + *rows = 0; + *eof = false; + + while (file_block->rows() < batch_size && !_reader_eof && + file_block->bytes() < max_block_bytes) { + if (_read_json_by_line && _skip_first_line) { + size_t skipped_size = 0; + const uint8_t* skipped_line = nullptr; + RETURN_IF_ERROR(_line_reader->read_line(&skipped_line, &skipped_size, &_reader_eof, + _io_ctx.get())); + _skip_first_line = false; + continue; + } + + const size_t original_rows = file_block->rows(); + size_t size = 0; + bool is_empty_row = false; + Status st = Status::OK(); + try { + st = _parse_next_json(&size, &_reader_eof); + if (st.ok() && !_reader_eof) { + if (size == 0) { + 
is_empty_row = true; + } else { + st = _extract_json_value(size, &_reader_eof, &is_empty_row); + } + } + if (st.ok() && !_reader_eof && !is_empty_row) { + st = _append_rows_from_current_value(file_block, &is_empty_row, &_reader_eof); + } + } catch (simdjson::simdjson_error& e) { + st = Status::DataQualityError("Parse json data failed. code: {}, error info: {}", + e.error(), e.what()); + } + if (!st.ok()) { + RETURN_IF_ERROR(_handle_json_error(st, file_block, original_rows, &is_empty_row)); + } + // An ignored or empty JSON object can produce no row. Avoid spinning forever on a document + // that was consumed but produced no materialized value. + if (!is_empty_row && file_block->rows() == original_rows) { + break; + } + } + + *rows = file_block->rows(); + RETURN_IF_ERROR(_apply_filters(file_block, rows)); + _reader_statistics.read_rows += *rows; + *eof = _reader_eof && *rows == 0; + _eof = *eof; + return Status::OK(); +} + +Status JsonReader::close() { + if (_line_reader != nullptr) { + _line_reader->close(); + _line_reader.reset(); + } + _json_parser.reset(); + _decompressor.reset(); + _physical_file_reader.reset(); + _tracing_file_reader.reset(); + _file_reader.reset(); + _requested_columns.clear(); + _slot_name_to_index.clear(); + _previous_positions.clear(); + _cached_string_values.clear(); + return Status::OK(); +} + +Status JsonReader::_build_requested_columns(const FileScanRequest& request, + std::vector* columns) const { + DORIS_CHECK(columns != nullptr); + columns->clear(); + // FileScanRequest stores a map from file-local id to output block position. Materialization is + // position-driven, so normalize it into a dense vector ordered by block position while keeping + // the original source index for jsonpaths. 
+ std::vector by_position(request.local_positions.size()); + for (const auto& [file_column_id, block_position] : request.local_positions) { + if (file_column_id.value() < 0 || + static_cast(file_column_id.value()) >= _source_file_slot_descs.size()) { + return Status::InvalidArgument("JSON v2 request references unknown local column id {}", + file_column_id.value()); + } + if (block_position.value() >= by_position.size()) { + return Status::InvalidArgument("JSON v2 request has invalid block position {}", + block_position.value()); + } + const auto source_index = cast_set(file_column_id.value()); + RequestedColumn requested_column; + requested_column.file_column_id = file_column_id; + requested_column.block_position = block_position; + requested_column.source_index = source_index; + requested_column.slot_desc = _source_file_slot_descs[source_index]; + requested_column.serde = _source_serdes[source_index]; + by_position[block_position.value()] = std::move(requested_column); + } + for (size_t pos = 0; pos < by_position.size(); ++pos) { + if (!by_position[pos].file_column_id.is_valid()) { + return Status::InvalidArgument("JSON v2 request misses block position {}", pos); + } + } + *columns = std::move(by_position); + return Status::OK(); +} + +TFileRangeDesc JsonReader::_json_range() const { + auto range = _range; + range.__set_path(_file_description->path); + range.__set_start_offset(_file_description->range_start_offset); + range.__set_size(_file_description->range_size); + if (_file_description->file_size >= 0) { + range.__set_file_size(_file_description->file_size); + } + if (!_file_description->fs_name.empty()) { + range.__set_fs_name(_file_description->fs_name); + } + range.__set_file_cache_admission(_file_description->file_cache_admission); + if (_range_compress_type != TFileCompressType::UNKNOWN) { + range.__set_compress_type(_range_compress_type); + } + if (_stream_load_id.has_value()) { + range.__set_load_id(*_stream_load_id); + } + return range; +} + +Status 
JsonReader::_open_file_reader() { + _current_offset = _reader_range.start_offset; + if (_current_offset != 0) { + --_current_offset; + } + if (_scan_params->file_type == TFileType::FILE_STREAM) { + if (!_stream_load_id.has_value()) { + return Status::InvalidArgument("JSON v2 stream reader requires load id"); + } + RETURN_IF_ERROR(FileFactory::create_pipe_reader(*_stream_load_id, &_physical_file_reader, + _runtime_state, /*need_schema=*/false)); + } else { + _file_description->mtime = + _reader_range.__isset.modification_time ? _reader_range.modification_time : 0; + auto reader_options = FileFactory::get_reader_options(_runtime_state->query_options(), + *_file_description); + auto file_reader = DORIS_TRY(FileFactory::create_file_reader( + *_system_properties, *_file_description, reader_options, _profile)); + _physical_file_reader = + _io_ctx && _io_ctx->file_reader_stats + ? std::make_shared(std::move(file_reader), + _io_ctx->file_reader_stats) + : file_reader; + } + _file_reader = _physical_file_reader; + _tracing_file_reader = _physical_file_reader; + return Status::OK(); +} + +Status JsonReader::_create_decompressor() { + return Decompressor::create_decompressor(_file_compress_type, &_decompressor); +} + +Status JsonReader::_create_line_reader() { + int64_t size = _reader_range.size; + if (_reader_range.start_offset != 0) { + // Start one byte earlier and discard the first partial line, matching split semantics used + // by text readers. 
+ ++size; + _skip_first_line = true; + } else { + _skip_first_line = false; + } + _line_reader = NewPlainTextLineReader::create_unique( + _profile, _physical_file_reader, _decompressor.get(), + std::make_shared(_line_delimiter, _line_delimiter_length, + false), + size, _current_offset); + return Status::OK(); +} + +Status JsonReader::_parse_jsonpath_and_json_root() { + _parsed_jsonpaths.clear(); + _parsed_json_root.clear(); + if (!_jsonpaths.empty()) { + rapidjson::Document jsonpaths_doc; + if (jsonpaths_doc.Parse(_jsonpaths.c_str(), _jsonpaths.length()).HasParseError() || + !jsonpaths_doc.IsArray()) { + return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths); + } + for (int i = 0; i < jsonpaths_doc.Size(); ++i) { + const rapidjson::Value& path = jsonpaths_doc[i]; + if (!path.IsString()) { + return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths); + } + std::string json_path = path.GetString(); + if (json_path.size() == 1 && json_path[0] == '$') { + json_path.insert(1, "."); + } + std::vector parsed_paths; + JsonFunctions::parse_json_paths(json_path, &parsed_paths); + _parsed_jsonpaths.push_back(std::move(parsed_paths)); + } + } + if (!_json_root.empty()) { + std::string json_root = _json_root; + if (json_root.size() == 1 && json_root[0] == '$') { + json_root.insert(1, "."); + } + JsonFunctions::parse_json_paths(json_root, &_parsed_json_root); + } + return Status::OK(); +} + +Status JsonReader::_read_one_document(size_t* size, bool* eof) { + DORIS_CHECK(size != nullptr); + DORIS_CHECK(eof != nullptr); + *size = 0; + *eof = false; + if (_line_reader != nullptr) { + const uint8_t* line = nullptr; + RETURN_IF_ERROR(_line_reader->read_line(&line, size, eof, _io_ctx.get())); + if (*eof) { + return Status::OK(); + } + _document_buffer.assign(reinterpret_cast(line), *size); + return Status::OK(); + } + // Non-line mode treats the split as one JSON document. This supports a single object or an + // array with strip_outer_array=true. 
+ if (_single_document_read) { + *eof = true; + return Status::OK(); + } + _single_document_read = true; + if (_scan_params->file_type == TFileType::FILE_STREAM) { + return _read_one_document_from_pipe(size); + } + + auto read_size = _reader_range.size; + if (read_size <= 0 && _reader_range.__isset.file_size) { + read_size = _reader_range.file_size - _current_offset; + } + if (read_size <= 0) { + *eof = true; + return Status::OK(); + } + _document_buffer.resize(cast_set(read_size)); + Slice result(_document_buffer.data(), _document_buffer.size()); + RETURN_IF_ERROR(_physical_file_reader->read_at(_current_offset, result, size, _io_ctx.get())); + _document_buffer.resize(*size); + if (*size == 0) { + *eof = true; + } + return Status::OK(); +} + +Status JsonReader::_read_one_document_from_pipe(size_t* read_size) { + auto* stream_load_pipe = dynamic_cast(_physical_file_reader.get()); + if (stream_load_pipe == nullptr) { + return Status::InternalError("JSON v2 stream reader requires StreamLoadPipe"); + } + DorisUniqueBufferPtr file_buf; + RETURN_IF_ERROR(stream_load_pipe->read_one_message(&file_buf, read_size)); + _document_buffer.assign(reinterpret_cast(file_buf.get()), *read_size); + if (!stream_load_pipe->is_chunked_transfer()) { + return Status::OK(); + } + + while (true) { + DorisUniqueBufferPtr next_buf; + size_t next_size = 0; + RETURN_IF_ERROR(stream_load_pipe->read_one_message(&next_buf, &next_size)); + if (next_size == 0) { + break; + } + _document_buffer.append(reinterpret_cast(next_buf.get()), next_size); + *read_size += next_size; + } + return Status::OK(); +} + +Status JsonReader::_parse_next_json(size_t* size, bool* eof) { + RETURN_IF_ERROR(_read_one_document(size, eof)); + if (*eof || *size == 0) { + return Status::OK(); + } + if (*size >= 3 && static_cast(_document_buffer[0]) == 0xEF && + static_cast(_document_buffer[1]) == 0xBB && + static_cast(_document_buffer[2]) == 0xBF) { + _document_buffer.erase(0, 3); + *size -= 3; + } + if (*size + 
simdjson::SIMDJSON_PADDING > _padded_size) { + _padded_size = *size + simdjson::SIMDJSON_PADDING; + _padding_buffer.resize(_padded_size); + } + // Ondemand values reference the input buffer. Keep the padded bytes in a member buffer until the + // current document is fully materialized. + std::memcpy(_padding_buffer.data(), _document_buffer.data(), *size); + _original_doc_size = *size; + const auto error = + _json_parser->iterate(std::string_view(_padding_buffer.data(), *size), _padded_size) + .get(_original_json_doc); + if (error != simdjson::error_code::SUCCESS) { + return Status::DataQualityError( + "Parse json data for JsonDoc failed. code: {}, error info: {}", error, + simdjson::error_message(error)); + } + return Status::OK(); +} + +Status JsonReader::_extract_json_value(size_t size, bool* eof, bool* is_empty_row) { + DORIS_CHECK(eof != nullptr); + DORIS_CHECK(is_empty_row != nullptr); + *is_empty_row = false; + if (size == 0 || *eof) { + *is_empty_row = true; + return Status::OK(); + } + auto type_res = _original_json_doc.type(); + if (type_res.error() != simdjson::error_code::SUCCESS) { + return Status::DataQualityError( + "Parse json data for JsonDoc failed. code: {}, error info: {}", type_res.error(), + simdjson::error_message(type_res.error())); + } + const auto type = type_res.value(); + if (type != simdjson::ondemand::json_type::object && + type != simdjson::ondemand::json_type::array) { + return Status::DataQualityError("Not an json object or json array"); + } + _parsed_from_json_root = false; + if (!_parsed_json_root.empty() && type == simdjson::ondemand::json_type::object) { + // In object mode json_root can be applied once here. In outer-array mode each array element + // needs its own root extraction, which is handled while iterating the array. 
+ simdjson::ondemand::object object = _original_json_doc; + Status st = JsonFunctions::extract_from_object(object, _parsed_json_root, &_json_value); + if (!st.ok()) { + return Status::DataQualityError("{}", st.to_string()); + } + _parsed_from_json_root = true; + } else { + _json_value = _original_json_doc; + } + + const auto value_type = _json_value.type().value(); + if (value_type == simdjson::ondemand::json_type::array && !_strip_outer_array) { + return Status::DataQualityError( + "JSON data is array-object, `strip_outer_array` must be TRUE."); + } + if (value_type != simdjson::ondemand::json_type::array && _strip_outer_array) { + return Status::DataQualityError( + "JSON data is not an array-object, `strip_outer_array` must be FALSE."); + } + if (!_parsed_jsonpaths.empty() && _strip_outer_array && + _json_value.count_elements().value() == 0) { + *is_empty_row = true; + } + return Status::OK(); +} + +Status JsonReader::_append_rows_from_current_value(Block* block, bool* is_empty_row, bool* eof) { + if (_parsed_jsonpaths.empty()) { + return _append_simple_json_rows(block, is_empty_row, eof); + } + if (_strip_outer_array) { + return _append_flat_array_jsonpath_rows(block, is_empty_row, eof); + } + return _append_nested_jsonpath_row(block, is_empty_row, eof); +} + +Status JsonReader::_append_simple_json_rows(Block* block, bool* is_empty_row, bool* eof) { + DORIS_CHECK(block != nullptr); + DORIS_CHECK(is_empty_row != nullptr); + DORIS_CHECK(eof != nullptr); + bool valid = false; + if (_json_value.type().value() == simdjson::ondemand::json_type::array) { + _array = _json_value.get_array(); + if (_array.count_elements() == 0) { + *is_empty_row = true; + return Status::OK(); + } + _array_iter = _array.begin(); + while (_array_iter != _array.end()) { + simdjson::ondemand::object object_value = (*_array_iter).get_object(); + RETURN_IF_ERROR(_set_column_values_from_object(&object_value, block, &valid)); + ++_array_iter; + if (!valid) { + *is_empty_row = true; + return 
Status::OK(); + } + } + } else { + simdjson::ondemand::object object_value = _json_value.get_object(); + RETURN_IF_ERROR(_set_column_values_from_object(&object_value, block, &valid)); + if (!valid) { + *is_empty_row = true; + return Status::OK(); + } + } + *is_empty_row = false; + return Status::OK(); +} + +Status JsonReader::_append_flat_array_jsonpath_rows(Block* block, bool* is_empty_row, bool* eof) { + DORIS_CHECK(block != nullptr); + DORIS_CHECK(is_empty_row != nullptr); + DORIS_CHECK(eof != nullptr); + const size_t original_rows = block->rows(); + bool valid = true; + _array = _json_value.get_array(); + _array_iter = _array.begin(); + while (_array_iter != _array.end()) { + simdjson::ondemand::object object_value = (*_array_iter).get_object(); + if (!_parsed_from_json_root && !_parsed_json_root.empty()) { + // For strip_outer_array, json_root is evaluated against each element. Elements without + // the requested root do not produce rows, matching the load reader behavior. + simdjson::ondemand::value rooted_value; + Status st = JsonFunctions::extract_from_object(object_value, _parsed_json_root, + &rooted_value); + if (!st.ok()) { + if (st.is()) { + ++_array_iter; + continue; + } + return st; + } + if (rooted_value.type().value() != simdjson::ondemand::json_type::object) { + ++_array_iter; + continue; + } + object_value = rooted_value.get_object(); + } + RETURN_IF_ERROR(_write_columns_by_jsonpath(&object_value, block, &valid)); + ++_array_iter; + } + *is_empty_row = block->rows() == original_rows; + return Status::OK(); +} + +Status JsonReader::_append_nested_jsonpath_row(Block* block, bool* is_empty_row, bool* eof) { + DORIS_CHECK(block != nullptr); + DORIS_CHECK(is_empty_row != nullptr); + DORIS_CHECK(eof != nullptr); + if (_json_value.type().value() != simdjson::ondemand::json_type::object) { + return Status::DataQualityError("Not object item"); + } + bool valid = true; + simdjson::ondemand::object object_value = _json_value.get_object(); + 
RETURN_IF_ERROR(_write_columns_by_jsonpath(&object_value, block, &valid)); + *is_empty_row = !valid; + return Status::OK(); +} + +Status JsonReader::_set_column_values_from_object(simdjson::ondemand::object* object_value, + Block* block, bool* valid) { + DORIS_CHECK(object_value != nullptr); + DORIS_CHECK(block != nullptr); + DORIS_CHECK(valid != nullptr); + std::vector seen_columns(block->columns(), false); + const size_t cur_row_count = block->rows(); + bool has_valid_value = false; + size_t key_index = 0; + + for (auto field : *object_value) { + std::string_view key = field.unescaped_key().value(); + const size_t column_index = _column_index(key, key_index++); + if (column_index == static_cast(-1)) { + continue; + } + if (seen_columns[column_index]) { + if (_is_hive_table) { + // Hive JSON keeps the last duplicate key ignoring case. The earlier value has + // already been appended, so remove it before writing the replacement. + _pop_back_last_inserted_value(block, column_index); + } else { + continue; + } + } + simdjson::ondemand::value value = field.value().value(); + const auto& requested = _requested_columns[column_index]; + auto* column_ptr = block->get_by_position(column_index).column->assert_mutable().get(); + RETURN_IF_ERROR(_write_data_to_column( + value, requested.slot_desc->get_data_type_ptr(), column_ptr, + requested.slot_desc->col_name(), requested.serde, valid)); + if (!*valid) { + return Status::OK(); + } + seen_columns[column_index] = true; + has_valid_value = true; + } + + for (size_t i = 0; i < _requested_columns.size(); ++i) { + if (seen_columns[i]) { + continue; + } + auto* column_ptr = block->get_by_position(i).column->assert_mutable().get(); + RETURN_IF_ERROR(_fill_missing_column(_requested_columns[i], column_ptr, valid)); + if (!*valid) { + _truncate_block_to_rows(block, cur_row_count); + return Status::OK(); + } + } + *valid = true; + if (!has_valid_value) { + return Status::OK(); + } + return Status::OK(); +} + +Status 
JsonReader::_write_columns_by_jsonpath(simdjson::ondemand::object* object_value, + Block* block, bool* valid) { + DORIS_CHECK(object_value != nullptr); + DORIS_CHECK(block != nullptr); + DORIS_CHECK(valid != nullptr); + bool has_valid_value = false; + const size_t cur_row_count = block->rows(); + _cached_string_values.clear(); + + for (size_t i = 0; i < _requested_columns.size(); ++i) { + const auto& requested = _requested_columns[i]; + auto* column_ptr = block->get_by_position(i).column->assert_mutable().get(); + simdjson::ondemand::value json_value; + Status st = Status::OK(); + if (requested.source_index < _parsed_jsonpaths.size()) { + st = JsonFunctions::extract_from_object( + *object_value, _parsed_jsonpaths[requested.source_index], &json_value); + if (!st.ok() && !st.is()) { + return st; + } + } + if (_is_root_path_for_column(requested)) { + // A root jsonpath means "materialize the whole current JSON document" instead of a + // field under it. Use the original bytes so callers receive the same document text. 
+ if (is_column_nullable(*column_ptr)) { + auto* nullable_column = assert_cast(column_ptr); + nullable_column->get_null_map_data().push_back(0); + auto* column_string = + assert_cast(nullable_column->get_nested_column_ptr().get()); + column_string->insert_data(_padding_buffer.data(), _original_doc_size); + } else { + auto* column_string = assert_cast(column_ptr); + column_string->insert_data(_padding_buffer.data(), _original_doc_size); + } + has_valid_value = true; + } else if (requested.source_index >= _parsed_jsonpaths.size() || + st.is()) { + RETURN_IF_ERROR(_fill_missing_column(requested, column_ptr, valid)); + if (!*valid) { + _truncate_block_to_rows(block, cur_row_count); + return Status::OK(); + } + } else { + RETURN_IF_ERROR(_write_data_to_column( + json_value, requested.slot_desc->get_data_type_ptr(), column_ptr, + requested.slot_desc->col_name(), requested.serde, valid)); + if (!*valid) { + _truncate_block_to_rows(block, cur_row_count); + return Status::OK(); + } + has_valid_value = true; + } + } + + if (!has_valid_value) { + // jsonpaths can legally match nothing. Roll the row back so an all-missing path set does + // not create a synthetic row of nulls. 
+ _truncate_block_to_rows(block, cur_row_count); + *valid = false; + return Status::OK(); + } + *valid = true; + return Status::OK(); +} + +template +Status JsonReader::_write_data_to_column(simdjson::ondemand::value& value, + const DataTypePtr& type_desc, IColumn* column_ptr, + const std::string& column_name, + const DataTypeSerDeSPtr& serde, bool* valid) { + ColumnNullable* nullable_column = nullptr; + IColumn* data_column_ptr = column_ptr; + DataTypeSerDeSPtr data_serde = serde; + const auto value_type = value.type().value(); + + if (is_column_nullable(*column_ptr)) { + nullable_column = assert_cast(column_ptr); + data_column_ptr = nullable_column->get_nested_column().get_ptr().get(); + if (type_desc->is_nullable()) { + data_serde = serde->get_nested_serdes()[0]; + } + if (value_type == simdjson::ondemand::json_type::null) { + nullable_column->insert_default(); + *valid = true; + return Status::OK(); + } + } else if (value_type == simdjson::ondemand::json_type::null) { + return Status::DataQualityError("Json value is null, but the column `{}` is not nullable.", + column_name); + } + + const auto primitive_type = type_desc->get_primitive_type(); + if (!is_complex_type(primitive_type)) { + if (value_type == simdjson::ondemand::json_type::string) { + std::string_view value_string; + if constexpr (use_string_cache) { + const auto cache_key = value.raw_json().value(); + if (_cached_string_values.contains(cache_key)) { + value_string = _cached_string_values[cache_key]; + } else { + value_string = value.get_string(); + _cached_string_values.emplace(cache_key, value_string); + } + } else { + value_string = value.get_string(); + } + Slice slice {value_string.data(), value_string.size()}; + RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice, + _serde_options)); + } else if (value_type == simdjson::ondemand::json_type::boolean) { + const char* str_value = value.get_bool() ? 
"1" : "0"; + Slice slice {str_value, 1}; + RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice, + _serde_options)); + } else { + std::string_view json_str = simdjson::to_json_string(value); + Slice slice {json_str.data(), json_str.size()}; + RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice, + _serde_options)); + } + } else if (primitive_type == TYPE_STRUCT) { + if (value_type != simdjson::ondemand::json_type::object) { + return Status::DataQualityError( + "Json value isn't object, but the column `{}` is struct.", column_name); + } + const auto* type_struct = + assert_cast(remove_nullable(type_desc).get()); + auto* struct_column_ptr = assert_cast(data_column_ptr); + const auto sub_serdes = data_serde->get_nested_serdes(); + std::map sub_col_name_to_idx; + for (size_t sub_col_idx = 0; sub_col_idx < type_struct->get_elements().size(); + ++sub_col_idx) { + sub_col_name_to_idx.emplace(lower_key(type_struct->get_element_name(sub_col_idx)), + sub_col_idx); + } + std::vector has_value(type_struct->get_elements().size(), false); + simdjson::ondemand::object struct_value = value.get_object(); + for (auto sub : struct_value) { + const auto sub_key = lower_key(sub.unescaped_key().value()); + const auto it = sub_col_name_to_idx.find(sub_key); + if (it == sub_col_name_to_idx.end()) { + continue; + } + const auto sub_column_idx = it->second; + auto sub_column_ptr = struct_column_ptr->get_column(sub_column_idx).get_ptr(); + if (has_value[sub_column_idx]) { + // Struct fields follow Hive-style duplicate handling: the last matching nested key + // wins. Remove the earlier nested value before appending the new one. + sub_column_ptr->pop_back(1); + } + has_value[sub_column_idx] = true; + auto sub_value = sub.value().value(); + RETURN_IF_ERROR(_write_data_to_column( + sub_value, type_struct->get_element(sub_column_idx), sub_column_ptr.get(), + column_name + "." 
+ sub_key, sub_serdes[sub_column_idx], valid)); + } + for (size_t sub_col_idx = 0; sub_col_idx < type_struct->get_elements().size(); + ++sub_col_idx) { + if (has_value[sub_col_idx]) { + continue; + } + auto sub_column_ptr = struct_column_ptr->get_column(sub_col_idx).get_ptr(); + if (!is_column_nullable(*sub_column_ptr)) { + return Status::DataQualityError( + "Json file structColumn miss field {} and this column isn't nullable.", + column_name + "." + type_struct->get_element_name(sub_col_idx)); + } + sub_column_ptr->insert_default(); + } + } else if (primitive_type == TYPE_MAP) { + if (value_type != simdjson::ondemand::json_type::object) { + return Status::DataQualityError("Json value isn't object, but the column `{}` is map.", + column_name); + } + const auto* map_type = assert_cast(remove_nullable(type_desc).get()); + auto* map_column_ptr = assert_cast(data_column_ptr); + const auto sub_serdes = data_serde->get_nested_serdes(); + size_t field_count = 0; + simdjson::ondemand::object object_value = value.get_object(); + for (auto member_value : object_value) { + auto* key_column = map_column_ptr->get_keys_ptr()->assert_mutable()->get_ptr().get(); + auto key_serde = sub_serdes[0]; + if (is_column_nullable(*key_column)) { + auto* nullable_key = assert_cast(key_column); + nullable_key->get_null_map_data().push_back(0); + key_column = nullable_key->get_nested_column().get_ptr().get(); + if (map_type->get_key_type()->is_nullable()) { + key_serde = key_serde->get_nested_serdes()[0]; + } + } + std::string_view key_view = member_value.unescaped_key().value(); + Slice key_slice(key_view.data(), key_view.size()); + RETURN_IF_ERROR(key_serde->deserialize_one_cell_from_json(*key_column, key_slice, + _serde_options)); + simdjson::ondemand::value field_value = member_value.value().value(); + RETURN_IF_ERROR(_write_data_to_column( + field_value, map_type->get_value_type(), + map_column_ptr->get_values_ptr()->assert_mutable()->get_ptr().get(), + column_name + ".value", 
sub_serdes[1], valid)); + ++field_count; + } + auto& offsets = map_column_ptr->get_offsets(); + offsets.emplace_back(offsets.back() + field_count); + } else if (primitive_type == TYPE_ARRAY) { + if (value_type != simdjson::ondemand::json_type::array) { + return Status::DataQualityError("Json value isn't array, but the column `{}` is array.", + column_name); + } + const auto* array_type = + assert_cast(remove_nullable(type_desc).get()); + auto* array_column_ptr = assert_cast(data_column_ptr); + const auto sub_serdes = data_serde->get_nested_serdes(); + size_t field_count = 0; + simdjson::ondemand::array array_value = value.get_array(); + for (simdjson::ondemand::value sub_value : array_value) { + RETURN_IF_ERROR(_write_data_to_column( + sub_value, array_type->get_nested_type(), + array_column_ptr->get_data().get_ptr().get(), column_name + ".element", + sub_serdes[0], valid)); + ++field_count; + } + auto& offsets = array_column_ptr->get_offsets(); + offsets.emplace_back(offsets.back() + field_count); + } else { + return Status::InternalError("Not support JSON value to complex column"); + } + + if (nullable_column && value_type != simdjson::ondemand::json_type::null) { + nullable_column->get_null_map_data().push_back(0); + } + *valid = true; + return Status::OK(); +} + +Status JsonReader::_fill_missing_column(const RequestedColumn& column, IColumn* column_ptr, + bool* valid) { + if (column.slot_desc->is_nullable()) { + auto* nullable_column = assert_cast(column_ptr); + nullable_column->insert_default(); + *valid = true; + return Status::OK(); + } + return Status::DataQualityError( + "The column `{}` is not nullable, but it's not found in jsondata.", + column.slot_desc->col_name()); +} + +Status JsonReader::_append_null_for_malformed_json(Block* block) { + DORIS_CHECK(block != nullptr); + for (int i = 0; i < block->columns(); ++i) { + auto& column_with_type = block->get_by_position(i); + if (!is_column_nullable(*column_with_type.column)) { + return 
Status::DataQualityError("malformed json, but the column `{}` is not nullable.", + column_with_type.column->get_name()); + } + auto column = IColumn::mutate(std::move(column_with_type.column)); + assert_cast(column.get())->insert_default(); + column_with_type.column = std::move(column); + } + return Status::OK(); +} + +Status JsonReader::_handle_json_error(const Status& status, Block* block, size_t original_rows, + bool* is_empty_row) { + DORIS_CHECK(block != nullptr); + DORIS_CHECK(is_empty_row != nullptr); + // Deserialization can fail after several columns have already appended data. Always restore the + // block to the row count before this document before either surfacing the error or appending + // the ignore-malformed null row. + _truncate_block_to_rows(block, original_rows); + if (_openx_json_ignore_malformed && status.is()) { + RETURN_IF_ERROR(_append_null_for_malformed_json(block)); + *is_empty_row = false; + return Status::OK(); + } + return status; +} + +Status JsonReader::_apply_filters(Block* file_block, size_t* rows) { + DORIS_CHECK(file_block != nullptr); + DORIS_CHECK(rows != nullptr); + const size_t rows_before_filter = *rows; + size_t rows_after_delete_filter = rows_before_filter; + if (_request != nullptr && rows_before_filter > 0 && !_request->delete_conjuncts.empty()) { + RETURN_IF_ERROR(VExprContext::filter_block(_request->delete_conjuncts, file_block, + file_block->columns())); + rows_after_delete_filter = + file_block->columns() == 0 ? rows_before_filter : file_block->rows(); + } + + size_t rows_after_filter = rows_after_delete_filter; + if (_request != nullptr && rows_after_delete_filter > 0 && !_request->conjuncts.empty()) { + RETURN_IF_ERROR( + VExprContext::filter_block(_request->conjuncts, file_block, file_block->columns())); + rows_after_filter = + file_block->columns() == 0 ? 
rows_after_delete_filter : file_block->rows(); + if (_io_ctx != nullptr) { + _io_ctx->predicate_filtered_rows += rows_after_delete_filter - rows_after_filter; + } + } + *rows = rows_after_filter; + return Status::OK(); +} + +void JsonReader::_truncate_block_to_rows(Block* block, size_t num_rows) { + DORIS_CHECK(block != nullptr); + for (int i = 0; i < block->columns(); ++i) { + auto& column_with_type = block->get_by_position(i); + auto column = IColumn::mutate(std::move(column_with_type.column)); + if (column->size() > num_rows) { + column->pop_back(column->size() - num_rows); + } + column_with_type.column = std::move(column); + } +} + +void JsonReader::_pop_back_last_inserted_value(Block* block, size_t column_index) { + DORIS_CHECK(block != nullptr); + auto& column = block->get_by_position(column_index).column; + auto mutable_column = IColumn::mutate(std::move(column)); + mutable_column->pop_back(1); + column = std::move(mutable_column); +} + +size_t JsonReader::_column_index(std::string_view key, size_t key_index) { + std::string hive_key; + std::string_view lookup_key = key; + if (_is_hive_table) { + hive_key = lower_key(key); + lookup_key = hive_key; + } + if (key_index < _previous_positions.size()) { + // Most JSON lines share field order. Reuse the previous line's key-position mapping before + // falling back to the hash table lookup. + const auto previous = _previous_positions[key_index]; + if (previous < _requested_columns.size()) { + const auto previous_name = _requested_columns[previous].slot_desc->col_name(); + if ((_is_hive_table ? 
lower_key(previous_name) : previous_name) == lookup_key) { + return previous; + } + } + } + const auto it = _slot_name_to_index.find(std::string(lookup_key)); + if (it == _slot_name_to_index.end()) { + return static_cast(-1); + } + if (key_index >= _previous_positions.size()) { + _previous_positions.resize(key_index + 1, static_cast(-1)); + } + _previous_positions[key_index] = it->second; + return it->second; +} + +bool JsonReader::_is_root_path_for_column(const RequestedColumn& column) const { + return column.source_index < _parsed_jsonpaths.size() && + JsonFunctions::is_root_path(_parsed_jsonpaths[column.source_index]); +} + +} // namespace doris::format::json diff --git a/be/src/format_v2/json/json_reader.h b/be/src/format_v2/json/json_reader.h new file mode 100644 index 00000000000000..52cdfad6728d64 --- /dev/null +++ b/be/src/format_v2/json/json_reader.h @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include // IWYU pragma: keep + +#include +#include +#include +#include +#include +#include + +#include "core/custom_allocator.h" +#include "core/data_type_serde/data_type_serde.h" +#include "exprs/json_functions.h" +#include "format_v2/file_reader.h" +#include "gen_cpp/PlanNodes_types.h" +#include "runtime/runtime_profile.h" + +namespace doris { +class Decompressor; +class LineReader; +class SlotDescriptor; +class IColumn; +} // namespace doris + +namespace doris::format::json { + +// FileScannerV2 JSON reader. +// +// JSON files do not carry an embedded physical schema. The v2 table layer still needs a +// file-local schema and FileScanRequest contract, so this reader exposes FE-provided file slots as +// v2 file-local columns and performs JSON parsing/materialization directly in the v2 path. +class JsonReader final : public FileReader { +public: + // `file_slot_descs` is the FE-planned file schema. JSON has no physical schema, so the reader + // exposes these slots as synthetic file-local columns and materializes only the columns + // requested by FileScanRequest. + JsonReader(std::shared_ptr& system_properties, + std::unique_ptr& file_description, + std::shared_ptr io_ctx, RuntimeProfile* profile, + const TFileScanRangeParams* scan_params, const TFileRangeDesc& range, + const std::vector& file_slot_descs, + TFileCompressType::type range_compress_type = TFileCompressType::UNKNOWN, + std::optional stream_load_id = std::nullopt); + ~JsonReader() override; + + // Initializes scan attributes and builds the synthetic schema from FE slots. + Status init(RuntimeState* state) override; + Status get_schema(std::vector* file_schema) const override; + std::unique_ptr create_column_mapper( + TableColumnMapperOptions options) const override; + // Opens the underlying file or stream and binds requested local column ids to output block + // positions. After this call, `get_block` can be called until it returns eof. 
+ Status open(std::shared_ptr request) override; + // Appends rows into `file_block` according to the FileScanRequest order. The block must already + // contain columns matching the requested positions. + Status get_block(Block* file_block, size_t* rows, bool* eof) override; + Status close() override; + +private: + // A requested column keeps both identities: + // - `source_index`: index in FE file slots, used for jsonpaths and SerDe lookup. + // - `block_position`: index in the caller's output block, used for materialization. + struct RequestedColumn { + LocalColumnId file_column_id = LocalColumnId::invalid(); + LocalIndex block_position; + size_t source_index = 0; + SlotDescriptor* slot_desc = nullptr; + DataTypeSerDeSPtr serde; + }; + + Status _build_requested_columns(const FileScanRequest& request, + std::vector* columns) const; + // Reconciles TableReader's split/range descriptor with FileReader's concrete file description. + TFileRangeDesc _json_range() const; + Status _open_file_reader(); + Status _create_decompressor(); + Status _create_line_reader(); + Status _parse_jsonpath_and_json_root(); + // Reads one logical JSON document: one line for JSON Lines, or the whole range/pipe payload for + // single-document mode. + Status _read_one_document(size_t* size, bool* eof); + Status _read_one_document_from_pipe(size_t* read_size); + // Moves the logical document into a simdjson-padded buffer and creates an ondemand document. + Status _parse_next_json(size_t* size, bool* eof); + // Applies json_root and validates the object/array shape required by strip_outer_array. 
+ Status _extract_json_value(size_t size, bool* eof, bool* is_empty_row); + Status _append_rows_from_current_value(Block* block, bool* is_empty_row, bool* eof); + Status _append_simple_json_rows(Block* block, bool* is_empty_row, bool* eof); + Status _append_flat_array_jsonpath_rows(Block* block, bool* is_empty_row, bool* eof); + Status _append_nested_jsonpath_row(Block* block, bool* is_empty_row, bool* eof); + Status _set_column_values_from_object(simdjson::ondemand::object* object_value, Block* block, + bool* valid); + Status _write_columns_by_jsonpath(simdjson::ondemand::object* object_value, Block* block, + bool* valid); + template + Status _write_data_to_column(simdjson::ondemand::value& value, const DataTypePtr& type_desc, + IColumn* column_ptr, const std::string& column_name, + const DataTypeSerDeSPtr& serde, bool* valid); + Status _fill_missing_column(const RequestedColumn& column, IColumn* column_ptr, bool* valid); + Status _append_null_for_malformed_json(Block* block); + Status _handle_json_error(const Status& status, Block* block, size_t original_rows, + bool* is_empty_row); + Status _apply_filters(Block* file_block, size_t* rows); + void _truncate_block_to_rows(Block* block, size_t num_rows); + void _pop_back_last_inserted_value(Block* block, size_t column_index); + size_t _column_index(std::string_view key, size_t key_index); + bool _is_root_path_for_column(const RequestedColumn& column) const; + + const TFileScanRangeParams* _scan_params = nullptr; + TFileRangeDesc _range; + TFileRangeDesc _reader_range; + std::vector _source_file_slot_descs; + DataTypeSerDeSPtrs _source_serdes; + std::vector _file_schema; + RuntimeState* _runtime_state = nullptr; + TFileCompressType::type _range_compress_type = TFileCompressType::UNKNOWN; + std::optional _stream_load_id; + std::vector _requested_columns; + std::unordered_map _slot_name_to_index; + std::vector _previous_positions; + + io::FileReaderSPtr _physical_file_reader; + std::unique_ptr _decompressor; + 
std::unique_ptr _line_reader; + int64_t _current_offset = 0; + bool _reader_eof = false; + bool _skip_first_line = false; + bool _single_document_read = false; + + std::string _line_delimiter; + size_t _line_delimiter_length = 0; + std::string _jsonpaths; + std::string _json_root; + bool _read_json_by_line = false; + bool _strip_outer_array = false; + bool _num_as_string = false; + bool _fuzzy_parse = false; + bool _is_hive_table = false; + bool _openx_json_ignore_malformed = false; + TFileCompressType::type _file_compress_type = TFileCompressType::UNKNOWN; + + std::vector> _parsed_jsonpaths; + std::vector _parsed_json_root; + bool _parsed_from_json_root = false; + DataTypeSerDe::FormatOptions _serde_options; + + // simdjson ondemand values point into `_padding_buffer`, so the buffer must outlive all values + // created from the current document. + std::unique_ptr _json_parser; + simdjson::ondemand::document _original_json_doc; + simdjson::ondemand::value _json_value; + simdjson::ondemand::array _array; + simdjson::ondemand::array_iterator _array_iter; + std::string _document_buffer; + std::string _padding_buffer; + size_t _original_doc_size = 0; + size_t _padded_size = 1024 * 1024 * 8 + simdjson::SIMDJSON_PADDING; + std::unordered_map _cached_string_values; +}; + +} // namespace doris::format::json diff --git a/be/src/format_v2/table/hive_reader.cpp b/be/src/format_v2/table/hive_reader.cpp index 72940971152af4..ad4b75b00856ea 100644 --- a/be/src/format_v2/table/hive_reader.cpp +++ b/be/src/format_v2/table/hive_reader.cpp @@ -81,11 +81,12 @@ Status HiveReader::init(format::TableReadOptions&& options) { use_column_names = query_options.hive_orc_use_column_names; } else if (file_format == format::FileFormat::PARQUET) { use_column_names = query_options.hive_parquet_use_column_names; - } else if (file_format == format::FileFormat::CSV || file_format == format::FileFormat::TEXT) { - // Hive CSV/TEXT readers synthesize a file-local schema from FE-provided file slots 
because - // delimited text files do not carry embedded column names or field ids. The scan params' - // column_idxs still tell CsvReader/TextReader which physical field ordinal to read, while - // the table-level mapper can safely match the synthesized file schema by table column name. + } else if (file_format == format::FileFormat::CSV || file_format == format::FileFormat::TEXT || + file_format == format::FileFormat::JSON) { + // Hive CSV/TEXT/JSON readers synthesize a file-local schema from FE-provided file slots + // because these formats do not carry embedded column names or field ids. The scan params' + // format-specific attributes still tell the physical reader how to read values, while the + // table-level mapper can safely match the synthesized file schema by table column name. use_column_names = true; } else { return Status::NotSupported("HiveReader does not support file reader format {}", diff --git a/be/src/format_v2/table_reader.cpp b/be/src/format_v2/table_reader.cpp index 97fa4145c427a2..d90d4f6ea337d1 100644 --- a/be/src/format_v2/table_reader.cpp +++ b/be/src/format_v2/table_reader.cpp @@ -43,6 +43,7 @@ #include "format_v2/column_mapper.h" #include "format_v2/delimited_text/csv_reader.h" #include "format_v2/delimited_text/text_reader.h" +#include "format_v2/json/json_reader.h" #include "format_v2/parquet/parquet_reader.h" #include "roaring/roaring64map.hh" #include "storage/segment/condition_cache.h" @@ -73,6 +74,8 @@ std::string file_format_to_string(FileFormat format) { return "ORC"; case FileFormat::CSV: return "CSV"; + case FileFormat::JSON: + return "JSON"; case FileFormat::TEXT: return "TEXT"; case FileFormat::JNI: @@ -725,6 +728,16 @@ Status TableReader::create_file_reader(std::unique_ptr* reader) { _current_range_load_id); return Status::OK(); } + if (_format == FileFormat::JSON) { + if (_file_slot_descs == nullptr) { + return Status::InvalidArgument("JSON reader requires file slot descriptors"); + } + *reader = std::make_unique( + 
_system_properties, _current_task->data_file, _io_ctx, _scanner_profile, + _scan_params, _current_file_range_desc, *_file_slot_descs, + _current_range_compress_type, _current_range_load_id); + return Status::OK(); + } return Status::NotSupported("TableReader does not support file format {}", file_format_to_string(_format)); } @@ -751,6 +764,7 @@ Status TableReader::prepare_split(const SplitReadOptions& options) { _current_task = std::make_unique(); _current_task->data_file = create_file_description(options.current_range); _current_file_description = *_current_task->data_file; + _current_file_range_desc = options.current_range; _current_range_compress_type = options.current_range.__isset.compress_type ? options.current_range.compress_type : TFileCompressType::UNKNOWN; diff --git a/be/src/format_v2/table_reader.h b/be/src/format_v2/table_reader.h index 23486e0fdf8b1d..a94cae621c9546 100644 --- a/be/src/format_v2/table_reader.h +++ b/be/src/format_v2/table_reader.h @@ -1422,6 +1422,7 @@ class TableReader { // each TFileRangeDesc, matching the old FileScanner reader contract. 
TFileCompressType::type _current_range_compress_type = TFileCompressType::UNKNOWN; std::optional _current_range_load_id; + TFileRangeDesc _current_file_range_desc; std::shared_ptr _system_properties; // partition key -> value std::map _partition_values; diff --git a/be/test/exec/scan/file_scanner_v2_test.cpp b/be/test/exec/scan/file_scanner_v2_test.cpp index 8a5d31e6fb5062..d3f0507aca1122 100644 --- a/be/test/exec/scan/file_scanner_v2_test.cpp +++ b/be/test/exec/scan/file_scanner_v2_test.cpp @@ -101,10 +101,12 @@ TEST(FileScannerV2Test, SupportedFormatMatrix) { {"hive", TFileFormatType::FORMAT_CSV_SNAPPYBLOCK, std::nullopt, true}, {"hive", TFileFormatType::FORMAT_PROTO, std::nullopt, true}, {"hive", TFileFormatType::FORMAT_TEXT, std::nullopt, true}, + {"hive", TFileFormatType::FORMAT_JSON, std::nullopt, true}, {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_ORC, false}, {"hive", TFileFormatType::FORMAT_ORC, TFileFormatType::FORMAT_PARQUET, true}, {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_CSV_PLAIN, true}, {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_TEXT, true}, + {"hive", TFileFormatType::FORMAT_PARQUET, TFileFormatType::FORMAT_JSON, true}, }; for (const auto& test_case : cases) { @@ -177,6 +179,7 @@ TEST(FileScannerV2Test, FileFormatConversionMatrix) { {TFileFormatType::FORMAT_CSV_SNAPPYBLOCK, format::FileFormat::CSV}, {TFileFormatType::FORMAT_PROTO, format::FileFormat::CSV}, {TFileFormatType::FORMAT_TEXT, format::FileFormat::TEXT}, + {TFileFormatType::FORMAT_JSON, format::FileFormat::JSON}, {TFileFormatType::FORMAT_ORC, std::nullopt}, }; diff --git a/be/test/format_v2/json/json_reader_test.cpp b/be/test/format_v2/json/json_reader_test.cpp new file mode 100644 index 00000000000000..994d71c0e6aefc --- /dev/null +++ b/be/test/format_v2/json/json_reader_test.cpp @@ -0,0 +1,402 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format_v2/json/json_reader.h" + +#include + +#include +#include +#include +#include +#include +#include + +#include "common/object_pool.h" +#include "core/assert_cast.h" +#include "core/block/block.h" +#include "core/column/column_nullable.h" +#include "core/column/column_string.h" +#include "core/column/column_vector.h" +#include "core/data_type/data_type_nullable.h" +#include "core/data_type/data_type_number.h" +#include "core/data_type/data_type_string.h" +#include "format_v2/column_data.h" +#include "runtime/descriptors.h" +#include "runtime/runtime_profile.h" +#include "testutil/mock/mock_runtime_state.h" + +namespace doris::format::json { +namespace { + +TFileScanRangeParams json_scan_params(bool read_json_by_line = true, bool strip_outer_array = false, + std::string jsonpaths = "", std::string json_root = "", + bool ignore_malformed = false) { + TFileScanRangeParams params; + params.__set_format_type(TFileFormatType::FORMAT_JSON); + params.__set_file_type(TFileType::FILE_LOCAL); + params.__set_compress_type(TFileCompressType::PLAIN); + TFileAttributes attributes; + TFileTextScanRangeParams text_params; + text_params.__set_line_delimiter("\n"); + attributes.__set_text_params(std::move(text_params)); + attributes.__set_read_json_by_line(read_json_by_line); 
+ attributes.__set_strip_outer_array(strip_outer_array); + attributes.__set_num_as_string(false); + attributes.__set_fuzzy_parse(false); + if (!jsonpaths.empty()) { + attributes.__set_jsonpaths(std::move(jsonpaths)); + } + if (!json_root.empty()) { + attributes.__set_json_root(std::move(json_root)); + } + if (ignore_malformed) { + attributes.__set_openx_json_ignore_malformed(true); + } + params.__set_file_attributes(std::move(attributes)); + return params; +} + +SlotDescriptor* make_test_slot(ObjectPool* pool, int slot_id, int slot_idx, DataTypePtr type, + const std::string& name) { + TSlotDescriptor slot_desc; + slot_desc.__set_id(slot_id); + slot_desc.__set_parent(0); + slot_desc.__set_slotType(type->to_thrift()); + slot_desc.__set_columnPos(slot_idx); + slot_desc.__set_byteOffset(0); + if (type->is_nullable()) { + slot_desc.__set_nullIndicatorByte(slot_idx / 8); + slot_desc.__set_nullIndicatorBit(slot_idx % 8); + } else { + slot_desc.__set_nullIndicatorByte(0); + slot_desc.__set_nullIndicatorBit(-1); + } + slot_desc.__set_slotIdx(slot_idx); + slot_desc.__set_isMaterialized(true); + slot_desc.__set_colName(name); + return pool->add(new SlotDescriptor(slot_desc)); +} + +std::vector build_slots(ObjectPool* pool) { + return {make_test_slot(pool, 0, 0, make_nullable(std::make_shared()), "id"), + make_test_slot(pool, 1, 1, make_nullable(std::make_shared()), "name")}; +} + +std::vector build_slots_with_required_name(ObjectPool* pool) { + return {make_test_slot(pool, 0, 0, make_nullable(std::make_shared()), "id"), + make_test_slot(pool, 1, 1, std::make_shared(), "name")}; +} + +std::unique_ptr file_description(const std::string& path) { + auto desc = std::make_unique(); + desc->path = path; + desc->file_size = static_cast(std::filesystem::file_size(path)); + desc->range_start_offset = 0; + desc->range_size = desc->file_size; + return desc; +} + +std::filesystem::path write_json_file(const std::string& name, const std::string& content) { + const auto test_dir = 
std::filesystem::temp_directory_path() / "doris_format_v2_json_reader"; + std::filesystem::create_directories(test_dir); + const auto file_path = test_dir / name; + std::ofstream out(file_path); + out << content; + return file_path; +} + +TFileRangeDesc file_range(const std::filesystem::path& file_path) { + TFileRangeDesc range; + range.__set_path(file_path.string()); + range.__set_start_offset(0); + range.__set_size(static_cast(std::filesystem::file_size(file_path))); + range.__set_file_size(static_cast(std::filesystem::file_size(file_path))); + return range; +} + +Block make_block(const std::vector& schema, + const std::vector& local_ids) { + Block block; + for (const auto local_id : local_ids) { + const auto it = std::ranges::find_if( + schema, [&](const auto& column) { return column.local_id == local_id; }); + EXPECT_TRUE(it != schema.end()); + block.insert({it->type->create_column(), it->type, it->name}); + } + return block; +} + +struct ReadResult { + Status status; + Status second_status = Status::OK(); + Block block; + size_t rows = 0; + bool eof = false; + size_t second_rows = 0; + bool second_eof = false; + std::vector schema; +}; + +ReadResult read_once(const std::string& file_name, const std::string& content, + TFileScanRangeParams params, const std::vector& slots, + const std::vector& requested_local_ids, bool read_twice = false) { + const auto file_path = write_json_file(file_name, content); + auto range = file_range(file_path); + + auto system_properties = std::make_shared(); + system_properties->system_type = TFileType::FILE_LOCAL; + auto desc = file_description(file_path.string()); + RuntimeProfile profile("json_v2_reader_test"); + MockRuntimeState state; + JsonReader reader(system_properties, desc, nullptr, &profile, ¶ms, range, slots); + + ReadResult result; + result.status = reader.init(&state); + if (!result.status.ok()) { + return result; + } + result.status = reader.get_schema(&result.schema); + if (!result.status.ok()) { + return result; + } 
+ + auto request = std::make_shared(); + for (size_t i = 0; i < requested_local_ids.size(); ++i) { + request->local_positions.emplace(LocalColumnId(requested_local_ids[i]), LocalIndex(i)); + } + result.status = reader.open(request); + if (!result.status.ok()) { + return result; + } + + result.block = make_block(result.schema, requested_local_ids); + result.status = reader.get_block(&result.block, &result.rows, &result.eof); + if (result.status.ok() && read_twice) { + auto eof_block = make_block(result.schema, requested_local_ids); + result.second_status = + reader.get_block(&eof_block, &result.second_rows, &result.second_eof); + } + return result; +} + +std::string nullable_string_at(const IColumn& column, size_t row) { + const auto& nullable = assert_cast(column); + const auto& nested = assert_cast(nullable.get_nested_column()); + return nested.get_data_at(row).to_string(); +} + +std::string string_at(const IColumn& column, size_t row) { + const auto& nested = assert_cast(column); + return nested.get_data_at(row).to_string(); +} + +int32_t nullable_int_at(const IColumn& column, size_t row) { + const auto& nullable = assert_cast(column); + const auto& nested = assert_cast(nullable.get_nested_column()); + return nested.get_data()[row]; +} + +bool nullable_is_null_at(const IColumn& column, size_t row) { + const auto& nullable = assert_cast(column); + return nullable.is_null_at(row); +} + +} // namespace + +TEST(JsonReaderTest, ReadsRequestedColumnsInFileScanRequestOrder) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once("order.jsonl", + R"({"id":1,"name":"alice"})" + "\n" + R"({"id":2,"name":"bob"})" + "\n", + json_scan_params(), slots, {1, 0}, true); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.schema.size(), 2); + EXPECT_EQ(result.schema[0].name, "id"); + EXPECT_EQ(result.schema[0].local_id, 0); + EXPECT_EQ(result.schema[1].name, "name"); + EXPECT_EQ(result.schema[1].local_id, 1); + 
ASSERT_EQ(result.rows, 2); + ASSERT_EQ(result.block.columns(), 2); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(0).column, 0), "alice"); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(0).column, 1), "bob"); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(1).column, 0), 1); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(1).column, 1), 2); + ASSERT_TRUE(result.second_status.ok()) << result.second_status.to_string(); + EXPECT_EQ(result.second_rows, 0); + EXPECT_TRUE(result.second_eof); +} + +TEST(JsonReaderTest, ReadsSingleDocumentOuterArray) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = + read_once("outer_array.json", R"([{"id":3,"name":"carol"},{"id":4,"name":"dave"}])", + json_scan_params(false, true), slots, {0, 1}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.rows, 2); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 0), 3); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 0), "carol"); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 1), 4); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 1), "dave"); +} + +TEST(JsonReaderTest, ReadsJsonRootByLine) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once("json_root.jsonl", + R"({"payload":{"id":5,"name":"eve"}})" + "\n" + R"({"payload":{"id":6,"name":"frank"}})" + "\n", + json_scan_params(true, false, "", "$.payload"), slots, {0, 1}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.rows, 2); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 0), 5); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 0), "eve"); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 1), 6); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 1), "frank"); +} + +TEST(JsonReaderTest, 
ReadsJsonPathsBySourceSlotAndReturnsRequestedBlockOrder) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once("jsonpaths.jsonl", + R"({"payload":{"id":7,"user":"grace"}})" + "\n" + R"({"payload":{"id":8,"user":"heidi"}})" + "\n", + json_scan_params(true, false, R"(["$.payload.id","$.payload.user"])"), + slots, {1, 0}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.rows, 2); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(0).column, 0), "grace"); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(0).column, 1), "heidi"); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(1).column, 0), 7); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(1).column, 1), 8); +} + +TEST(JsonReaderTest, ReadsJsonPathsFromSingleDocumentOuterArray) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once( + "outer_array_jsonpaths.json", + R"([{"payload":{"id":12,"user":"kate"}},{"payload":{"id":13,"user":"leo"}}])", + json_scan_params(false, true, R"(["$.payload.id","$.payload.user"])"), slots, {0, 1}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.rows, 2); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 0), 12); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 0), "kate"); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 1), 13); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 1), "leo"); +} + +TEST(JsonReaderTest, FillsMissingNullableColumnWithNull) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once("missing_nullable.jsonl", + R"({"id":9})" + "\n", + json_scan_params(), slots, {0, 1}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.rows, 1); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 0), 9); + 
EXPECT_TRUE(nullable_is_null_at(*result.block.get_by_position(1).column, 0)); +} + +TEST(JsonReaderTest, ReturnsErrorForMissingRequiredColumn) { + ObjectPool pool; + auto slots = build_slots_with_required_name(&pool); + auto result = read_once("missing_required.jsonl", + R"({"id":10})" + "\n", + json_scan_params(), slots, {0, 1}); + + EXPECT_FALSE(result.status.ok()); +} + +TEST(JsonReaderTest, ReadsPresentRequiredColumn) { + ObjectPool pool; + auto slots = build_slots_with_required_name(&pool); + auto result = read_once("present_required.jsonl", + R"({"id":14,"name":"mallory"})" + "\n", + json_scan_params(), slots, {0, 1}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.schema.size(), 2); + EXPECT_TRUE(result.schema[0].type->is_nullable()); + EXPECT_FALSE(result.schema[1].type->is_nullable()); + ASSERT_EQ(result.rows, 1); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 0), 14); + EXPECT_EQ(string_at(*result.block.get_by_position(1).column, 0), "mallory"); +} + +TEST(JsonReaderTest, ReturnsErrorForMalformedJsonByDefault) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once("malformed_strict.jsonl", + "not-json\n" + R"({"id":11,"name":"judy"})" + "\n", + json_scan_params(), slots, {0, 1}); + + EXPECT_FALSE(result.status.ok()); +} + +TEST(JsonReaderTest, IgnoresMalformedJsonAsNullRowsWhenConfigured) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once("ignore_malformed.jsonl", + "not-json\n" + R"({"id":11,"name":"judy"})" + "\n", + json_scan_params(true, false, "", "", true), slots, {0, 1}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.rows, 2); + EXPECT_TRUE(nullable_is_null_at(*result.block.get_by_position(0).column, 0)); + EXPECT_TRUE(nullable_is_null_at(*result.block.get_by_position(1).column, 0)); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 1), 11); + 
EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 1), "judy"); +} + +TEST(JsonReaderTest, SkipsEmptyJsonLine) { + ObjectPool pool; + auto slots = build_slots(&pool); + auto result = read_once("empty_line.jsonl", + "\n" + R"({"id":15,"name":"nancy"})" + "\n", + json_scan_params(), slots, {0, 1}); + + ASSERT_TRUE(result.status.ok()) << result.status.to_string(); + ASSERT_EQ(result.rows, 1); + EXPECT_EQ(nullable_int_at(*result.block.get_by_position(0).column, 0), 15); + EXPECT_EQ(nullable_string_at(*result.block.get_by_position(1).column, 0), "nancy"); +} + +} // namespace doris::format::json diff --git a/be/test/format_v2/table/hive_reader_test.cpp b/be/test/format_v2/table/hive_reader_test.cpp index 03b87db0c5ccb6..a41effaa91a3a9 100644 --- a/be/test/format_v2/table/hive_reader_test.cpp +++ b/be/test/format_v2/table/hive_reader_test.cpp @@ -89,6 +89,17 @@ TEST_F(HiveV2ReaderTest, InitSupportsTextFileFormat) { EXPECT_EQ(reader.mapping_mode(), TableColumnMappingMode::BY_NAME); } +// Scenario: Hive JSON files also synthesize a file-local schema from FE slots, so they should use +// name mapping at the table-reader layer while JsonReader consumes JSON attributes. +TEST_F(HiveV2ReaderTest, InitSupportsJsonFileFormat) { + TFileScanRangeParams params; + params.__set_format_type(TFileFormatType::FORMAT_JSON); + HiveReader reader; + + ASSERT_TRUE(init_hive_reader(FileFormat::JSON, &params, &state, &profile, &reader).ok()); + EXPECT_EQ(reader.mapping_mode(), TableColumnMappingMode::BY_NAME); +} + // Scenario: positional mapping is only for Hive Parquet/ORC sessions that disable name mapping. // CSV keeps the synthesized file-column names and leaves column_idxs for the CsvReader itself. 
TEST_F(HiveV2ReaderTest, CsvDoesNotConsumeColumnIdxsAsPositionalSchemaMapping) { diff --git a/regression-test/data/external_table_p0/hive/test_hive_openx_json.out b/regression-test/data/external_table_p0/hive/test_hive_openx_json.out index 6eadea56694c85..aa7220b55f16b2 100644 --- a/regression-test/data/external_table_p0/hive/test_hive_openx_json.out +++ b/regression-test/data/external_table_p0/hive/test_hive_openx_json.out @@ -22,4 +22,3 @@ [1,2,3] \N \N \N \N {"name":"bad1","id":5,"numbers":[1,2,3] \N \N \N \N {"name":"bad2","id":6,"numbers":"not an array","scores":{"key4":40},"details":{"a":4,"b":"text","c":4000000}} \N \N \N \N -