From 75bd63fb091305b458866af5ceb2cf083728a15f Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Wed, 1 Jul 2026 20:51:39 -0700 Subject: [PATCH] feat(avro): apply column default values when reading missing fields (3/4) When a column is present in the read schema but missing from an Avro data file (written before the column existed), fill it with the column's v3 initial-default instead of null. Reuses the shared arrow/literal_util materializer (merged in #792) and adds AppendDefaultToBuilder for the row-by-row Avro decode paths, plus a kDefault projection branch in the Avro schema/data projection. Part 3 of the v3 column-default-values work (POC #731), built on the schema support in #746 and the Parquet read path in #792. --- src/iceberg/arrow/literal_util.cc | 24 ++++++++++++++ src/iceberg/arrow/literal_util_internal.h | 4 +++ src/iceberg/avro/avro_data_util.cc | 9 ++++++ src/iceberg/avro/avro_direct_decoder.cc | 4 +++ src/iceberg/avro/avro_schema_util.cc | 4 +++ src/iceberg/test/avro_data_test.cc | 38 +++++++++++++++++++++++ src/iceberg/test/avro_test.cc | 29 +++++++++++++++++ src/iceberg/test/literal_util_test.cc | 26 ++++++++++++++++ 8 files changed, 138 insertions(+) diff --git a/src/iceberg/arrow/literal_util.cc b/src/iceberg/arrow/literal_util.cc index c13338199..ad2638e9b 100644 --- a/src/iceberg/arrow/literal_util.cc +++ b/src/iceberg/arrow/literal_util.cc @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -195,4 +196,27 @@ Result> MakeDefaultArray( return array; } +Status AppendDefaultToBuilder(const Literal& literal, ::arrow::ArrayBuilder* builder) { + // The builder's own memory pool is not exposed, so the small scalar buffer uses the + // default pool. + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::Scalar> scalar, + ToArrowScalar(literal, ::arrow::default_memory_pool())); + + // For an extension builder (e.g. `arrow.uuid`) target its storage type: ToArrowScalar + // yields the storage scalar (fixed_size_binary(16) for uuid) and Scalar::CastTo has no + // kernel that targets an extension type. This mirrors MakeDefaultArray's extension + // handling. + std::shared_ptr<::arrow::DataType> target_type = builder->type(); + if (target_type->id() == ::arrow::Type::EXTENSION) { + target_type = internal::checked_cast(*target_type) + .storage_type(); + } + + if (!scalar->type->Equals(*target_type)) { + ICEBERG_ARROW_ASSIGN_OR_RETURN(scalar, scalar->CastTo(target_type)); + } + ICEBERG_ARROW_RETURN_NOT_OK(builder->AppendScalar(*scalar)); + return {}; +} + } // namespace iceberg::arrow diff --git a/src/iceberg/arrow/literal_util_internal.h b/src/iceberg/arrow/literal_util_internal.h index 4de60b528..d4c22a89a 100644 --- a/src/iceberg/arrow/literal_util_internal.h +++ b/src/iceberg/arrow/literal_util_internal.h @@ -44,4 +44,8 @@ Result> MakeDefaultArray( const Literal& literal, const std::shared_ptr<::arrow::DataType>& type, int64_t num_rows, ::arrow::MemoryPool* pool); +/// \brief Append the literal value once to `builder`, e.g. to materialize a missing +/// field with a default value while building rows one at a time. +Status AppendDefaultToBuilder(const Literal& literal, ::arrow::ArrayBuilder* builder); + } // namespace iceberg::arrow diff --git a/src/iceberg/avro/avro_data_util.cc b/src/iceberg/avro/avro_data_util.cc index b44e4b110..9a6fedc0c 100644 --- a/src/iceberg/avro/avro_data_util.cc +++ b/src/iceberg/avro/avro_data_util.cc @@ -31,6 +31,7 @@ #include #include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/arrow/literal_util_internal.h" #include "iceberg/avro/avro_data_util_internal.h" #include "iceberg/avro/avro_schema_util_internal.h" #include "iceberg/metadata_columns.h" @@ -88,6 +89,9 @@ Status AppendStructToBuilder(const ::avro::NodePtr& avro_node, metadata_context, field_builder)); } else if (field_projection.kind == FieldProjection::Kind::kNull) { ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull()); + } else if (field_projection.kind == FieldProjection::Kind::kDefault) { + ICEBERG_RETURN_UNEXPECTED(arrow::AppendDefaultToBuilder( + std::get(field_projection.from), field_builder)); } else if (field_projection.kind == FieldProjection::Kind::kMetadata) { int32_t field_id = expected_field.field_id(); if (field_id == MetadataColumns::kFilePathColumnId) { @@ -463,6 +467,11 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node, return {}; } + if (projection.kind == FieldProjection::Kind::kDefault) { + return arrow::AppendDefaultToBuilder(std::get(projection.from), + array_builder); + } + if (avro_node->type() == ::avro::AVRO_UNION) { size_t branch = avro_datum.unionBranch(); if (avro_node->leafAt(branch)->type() == ::avro::AVRO_NULL) { diff --git a/src/iceberg/avro/avro_direct_decoder.cc b/src/iceberg/avro/avro_direct_decoder.cc index 583b9ac5c..228ccfc44 100644 --- a/src/iceberg/avro/avro_direct_decoder.cc +++ b/src/iceberg/avro/avro_direct_decoder.cc @@ -29,6 +29,7 @@ #include #include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/arrow/literal_util_internal.h" #include "iceberg/avro/avro_direct_decoder_internal.h" #include "iceberg/avro/avro_schema_util_internal.h" #include "iceberg/metadata_columns.h" @@ -209,6 +210,9 @@ Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& auto* field_builder = struct_builder->field_builder(static_cast(proj_idx)); if (field_projection.kind == FieldProjection::Kind::kNull) { ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull()); + } else if (field_projection.kind == FieldProjection::Kind::kDefault) { + ICEBERG_RETURN_UNEXPECTED(arrow::AppendDefaultToBuilder( + std::get(field_projection.from), field_builder)); } else if (field_projection.kind == FieldProjection::Kind::kMetadata) { int32_t field_id = expected_field.field_id(); if (field_id == MetadataColumns::kFilePathColumnId) { diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc index 14b464cee..5e6bee957 100644 --- a/src/iceberg/avro/avro_schema_util.cc +++ b/src/iceberg/avro/avro_schema_util.cc @@ -752,6 +752,10 @@ Result ProjectStruct(const StructType& struct_type, iter->second.local_index, prune_source)); } else if (MetadataColumns::IsMetadataColumn(field_id)) { child_projection.kind = FieldProjection::Kind::kMetadata; + } else if (expected_field.initial_default() != nullptr) { + // Rows written before the field existed assume its `initial-default` value. + child_projection.kind = FieldProjection::Kind::kDefault; + child_projection.from = *expected_field.initial_default(); } else if (expected_field.optional()) { child_projection.kind = FieldProjection::Kind::kNull; } else { diff --git a/src/iceberg/test/avro_data_test.cc b/src/iceberg/test/avro_data_test.cc index 7731f58d3..a3e970edd 100644 --- a/src/iceberg/test/avro_data_test.cc +++ b/src/iceberg/test/avro_data_test.cc @@ -31,6 +31,7 @@ #include "iceberg/avro/avro_data_util_internal.h" #include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/expression/literal.h" #include "iceberg/schema.h" #include "iceberg/schema_internal.h" #include "iceberg/schema_util.h" @@ -662,6 +663,43 @@ TEST(AppendDatumToBuilderTest, StructWithMissingOptionalField) { avro_data, expected_json)); } +TEST(AppendDatumToBuilderTest, StructWithMissingDefaultFields) { + Schema iceberg_schema({ + SchemaField::MakeRequired(1, "id", iceberg::int32()), + // Missing required field with an initial-default: filled with the default. + SchemaField(2, "score", iceberg::int64(), /*optional=*/false, /*doc=*/{}, + std::make_shared(Literal::Long(100))), + // Missing optional field with an initial-default: also filled, not null. + SchemaField(3, "grade", iceberg::string(), /*optional=*/true, /*doc=*/{}, + std::make_shared(Literal::String("A"))), + }); + + // Create Avro schema that only has the id field (missing score and grade). + std::string avro_schema_json = R"({ + "type": "record", + "name": "person", + "fields": [ + {"name": "id", "type": "int", "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + std::vector<::avro::GenericDatum> avro_data; + for (int i = 0; i < 2; ++i) { + ::avro::GenericDatum avro_datum(avro_schema.root()); + auto& record = avro_datum.value<::avro::GenericRecord>(); + record.fieldAt(0).value() = i + 1; + avro_data.push_back(avro_datum); + } + + const std::string expected_json = R"([ + {"id": 1, "score": 100, "grade": "A"}, + {"id": 2, "score": 100, "grade": "A"} + ])"; + ASSERT_NO_FATAL_FAILURE(VerifyAppendDatumToBuilder(iceberg_schema, avro_schema.root(), + avro_data, expected_json)); +} + TEST(AppendDatumToBuilderTest, NestedStructWithMissingOptionalFields) { Schema iceberg_schema({ SchemaField::MakeRequired(1, "id", iceberg::int32()), diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc index 3ae1696d0..baa665849 100644 --- a/src/iceberg/test/avro_test.cc +++ b/src/iceberg/test/avro_test.cc @@ -42,6 +42,7 @@ #include "iceberg/avro/avro_register.h" #include "iceberg/avro/avro_stream_internal.h" #include "iceberg/avro/avro_writer.h" +#include "iceberg/expression/literal.h" #include "iceberg/file_reader.h" #include "iceberg/metadata_columns.h" #include "iceberg/schema.h" @@ -265,6 +266,34 @@ TEST_F(AvroReaderTest, ReadTwoFields) { ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); } +TEST_F(AvroReaderTest, ReadMissingFieldsWithDefaults) { + // The file contains only fields 1 and 2; the projected schema adds fields 3 and 4 + // with initial-defaults, which are filled for all rows written before the columns + // existed. + CreateSimpleAvroFile(); + + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeOptional(2, "name", std::make_shared()), + SchemaField(3, "score", std::make_shared(), /*optional=*/false, + /*doc=*/{}, std::make_shared(Literal::Long(100))), + SchemaField(4, "status", std::make_shared(), /*optional=*/true, + /*doc=*/{}, std::make_shared(Literal::String("active"))), + }); + + auto reader_result = ReaderFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = temp_avro_file_, .io = file_io_, .projection = schema}); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + + ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, + R"([[1, "Alice", 100, "active"], + [2, "Bob", 100, "active"], + [3, "Charlie", 100, "active"]])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); +} + TEST_F(AvroReaderTest, RoundTripWithGenericFileIO) { file_io_ = std::make_shared(); temp_avro_file_ = CreateNewTempFilePathWithSuffix(".avro"); diff --git a/src/iceberg/test/literal_util_test.cc b/src/iceberg/test/literal_util_test.cc index 9f21050f3..94cd536b5 100644 --- a/src/iceberg/test/literal_util_test.cc +++ b/src/iceberg/test/literal_util_test.cc @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -186,6 +187,31 @@ TEST(LiteralUtilTest, MakeDefaultArrayWrapsExtensionType) { } } +TEST(LiteralUtilTest, AppendDefaultToBuilderAppendsValue) { + ::arrow::Int64Builder builder; + ASSERT_THAT(AppendDefaultToBuilder(Literal::Long(42), &builder), IsOk()); + ASSERT_THAT(AppendDefaultToBuilder(Literal::Long(42), &builder), IsOk()); + + std::shared_ptr<::arrow::Array> array; + ASSERT_TRUE(builder.Finish(&array).ok()); + ASSERT_EQ(array->length(), 2); + const auto& long_array = static_cast(*array); + ASSERT_EQ(long_array.Value(0), 42); + ASSERT_EQ(long_array.Value(1), 42); +} + +TEST(LiteralUtilTest, AppendDefaultToBuilderCastsToBuilderType) { + // The literal's natural type (int32) differs from the builder type (int64); the value + // is cast to the builder type. + ::arrow::Int64Builder builder; + ASSERT_THAT(AppendDefaultToBuilder(Literal::Int(7), &builder), IsOk()); + + std::shared_ptr<::arrow::Array> array; + ASSERT_TRUE(builder.Finish(&array).ok()); + ASSERT_EQ(array->length(), 1); + ASSERT_EQ(static_cast(*array).Value(0), 7); +} + } // namespace } // namespace iceberg::arrow