Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/iceberg/arrow/literal_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <utility>
#include <vector>

#include <arrow/array/builder_base.h>
#include <arrow/array/util.h>
#include <arrow/buffer.h>
#include <arrow/compute/api.h>
Expand Down Expand Up @@ -195,4 +196,27 @@ Result<std::shared_ptr<::arrow::Array>> MakeDefaultArray(
return array;
}

Status AppendDefaultToBuilder(const Literal& literal, ::arrow::ArrayBuilder* builder) {
  // Materialize the literal as an Arrow scalar. The builder does not expose its own
  // memory pool, so the (small) scalar buffer is allocated from the default pool.
  ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::Scalar> default_scalar,
                          ToArrowScalar(literal, ::arrow::default_memory_pool()));

  // Decide which type the scalar must have before it can be appended. An extension
  // builder (e.g. `arrow.uuid`) is addressed through its storage type, because
  // ToArrowScalar produces the storage scalar (fixed_size_binary(16) for uuid) and
  // Scalar::CastTo has no kernel that targets an extension type. This mirrors the
  // extension handling in MakeDefaultArray.
  const std::shared_ptr<::arrow::DataType> builder_type = builder->type();
  const bool is_extension = builder_type->id() == ::arrow::Type::EXTENSION;
  const std::shared_ptr<::arrow::DataType> append_type =
      is_extension
          ? internal::checked_cast<const ::arrow::ExtensionType&>(*builder_type)
                .storage_type()
          : builder_type;

  // Only cast when the literal's natural Arrow type differs from the target type.
  const bool needs_cast = !default_scalar->type->Equals(*append_type);
  if (needs_cast) {
    ICEBERG_ARROW_ASSIGN_OR_RETURN(default_scalar, default_scalar->CastTo(append_type));
  }

  ICEBERG_ARROW_RETURN_NOT_OK(builder->AppendScalar(*default_scalar));
  return {};
}

} // namespace iceberg::arrow
4 changes: 4 additions & 0 deletions src/iceberg/arrow/literal_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ Result<std::shared_ptr<::arrow::Array>> MakeDefaultArray(
const Literal& literal, const std::shared_ptr<::arrow::DataType>& type,
int64_t num_rows, ::arrow::MemoryPool* pool);

/// \brief Append the literal value once to `builder`, e.g. to materialize a missing
/// field with a default value while building rows one at a time.
///
/// The literal is first converted to an Arrow scalar. If the scalar's type differs from
/// the builder's type (or, for an extension-typed builder, from its storage type), the
/// scalar is cast to that type before being appended.
///
/// \param literal The value to append.
/// \param builder The builder that receives the value; it is dereferenced
/// unconditionally, so it must not be null.
/// \return An error if the literal cannot be converted to an Arrow scalar, if the cast
/// fails, or if the builder rejects the scalar.
Status AppendDefaultToBuilder(const Literal& literal, ::arrow::ArrayBuilder* builder);

} // namespace iceberg::arrow
9 changes: 9 additions & 0 deletions src/iceberg/avro/avro_data_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <avro/Types.hh>

#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/arrow/literal_util_internal.h"
#include "iceberg/avro/avro_data_util_internal.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/metadata_columns.h"
Expand Down Expand Up @@ -88,6 +89,9 @@ Status AppendStructToBuilder(const ::avro::NodePtr& avro_node,
metadata_context, field_builder));
} else if (field_projection.kind == FieldProjection::Kind::kNull) {
ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull());
} else if (field_projection.kind == FieldProjection::Kind::kDefault) {
ICEBERG_RETURN_UNEXPECTED(arrow::AppendDefaultToBuilder(
std::get<Literal>(field_projection.from), field_builder));
} else if (field_projection.kind == FieldProjection::Kind::kMetadata) {
int32_t field_id = expected_field.field_id();
if (field_id == MetadataColumns::kFilePathColumnId) {
Expand Down Expand Up @@ -463,6 +467,11 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node,
return {};
}

if (projection.kind == FieldProjection::Kind::kDefault) {
return arrow::AppendDefaultToBuilder(std::get<Literal>(projection.from),
array_builder);
}

if (avro_node->type() == ::avro::AVRO_UNION) {
size_t branch = avro_datum.unionBranch();
if (avro_node->leafAt(branch)->type() == ::avro::AVRO_NULL) {
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/avro/avro_direct_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <avro/Types.hh>

#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/arrow/literal_util_internal.h"
#include "iceberg/avro/avro_direct_decoder_internal.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/metadata_columns.h"
Expand Down Expand Up @@ -209,6 +210,9 @@ Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
auto* field_builder = struct_builder->field_builder(static_cast<int>(proj_idx));
if (field_projection.kind == FieldProjection::Kind::kNull) {
ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull());
} else if (field_projection.kind == FieldProjection::Kind::kDefault) {
ICEBERG_RETURN_UNEXPECTED(arrow::AppendDefaultToBuilder(
std::get<Literal>(field_projection.from), field_builder));
} else if (field_projection.kind == FieldProjection::Kind::kMetadata) {
int32_t field_id = expected_field.field_id();
if (field_id == MetadataColumns::kFilePathColumnId) {
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/avro/avro_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,10 @@ Result<FieldProjection> ProjectStruct(const StructType& struct_type,
iter->second.local_index, prune_source));
} else if (MetadataColumns::IsMetadataColumn(field_id)) {
child_projection.kind = FieldProjection::Kind::kMetadata;
} else if (expected_field.initial_default() != nullptr) {
// Rows written before the field existed assume its `initial-default` value.
child_projection.kind = FieldProjection::Kind::kDefault;
child_projection.from = *expected_field.initial_default();
} else if (expected_field.optional()) {
child_projection.kind = FieldProjection::Kind::kNull;
} else {
Expand Down
38 changes: 38 additions & 0 deletions src/iceberg/test/avro_data_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

#include "iceberg/avro/avro_data_util_internal.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/expression/literal.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/schema_util.h"
Expand Down Expand Up @@ -662,6 +663,43 @@ TEST(AppendDatumToBuilderTest, StructWithMissingOptionalField) {
avro_data, expected_json));
}

// Fields that are absent from the Avro data but carry an initial-default in the
// expected schema must be filled with that default for every row.
TEST(AppendDatumToBuilderTest, StructWithMissingDefaultFields) {
  Schema iceberg_schema(std::vector<SchemaField>{
      SchemaField::MakeRequired(1, "id", iceberg::int32()),
      // Required + initial-default, absent from the data: the default is used.
      SchemaField(2, "score", iceberg::int64(), /*optional=*/false, /*doc=*/{},
                  std::make_shared<const Literal>(Literal::Long(100))),
      // Optional + initial-default, absent from the data: the default wins over null.
      SchemaField(3, "grade", iceberg::string(), /*optional=*/true, /*doc=*/{},
                  std::make_shared<const Literal>(Literal::String("A"))),
  });

  // The Avro record only carries `id`; `score` and `grade` are not written.
  std::string avro_schema_json = R"({
"type": "record",
"name": "person",
"fields": [
{"name": "id", "type": "int", "field-id": 1}
]
})";
  auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json);

  // Two rows with ids 1 and 2.
  std::vector<::avro::GenericDatum> avro_data;
  for (int32_t id = 1; id <= 2; ++id) {
    ::avro::GenericDatum row(avro_schema.root());
    row.value<::avro::GenericRecord>().fieldAt(0).value<int32_t>() = id;
    avro_data.push_back(row);
  }

  const std::string expected_json = R"([
{"id": 1, "score": 100, "grade": "A"},
{"id": 2, "score": 100, "grade": "A"}
])";
  ASSERT_NO_FATAL_FAILURE(VerifyAppendDatumToBuilder(iceberg_schema, avro_schema.root(),
                                                     avro_data, expected_json));
}

TEST(AppendDatumToBuilderTest, NestedStructWithMissingOptionalFields) {
Schema iceberg_schema({
SchemaField::MakeRequired(1, "id", iceberg::int32()),
Expand Down
29 changes: 29 additions & 0 deletions src/iceberg/test/avro_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "iceberg/avro/avro_register.h"
#include "iceberg/avro/avro_stream_internal.h"
#include "iceberg/avro/avro_writer.h"
#include "iceberg/expression/literal.h"
#include "iceberg/file_reader.h"
#include "iceberg/metadata_columns.h"
#include "iceberg/schema.h"
Expand Down Expand Up @@ -265,6 +266,34 @@ TEST_F(AvroReaderTest, ReadTwoFields) {
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}

// Reading a file through a projection that adds columns with initial-defaults must
// yield those defaults for every row written before the columns existed.
TEST_F(AvroReaderTest, ReadMissingFieldsWithDefaults) {
  // The file on disk only has fields 1 (`id`) and 2 (`name`).
  CreateSimpleAvroFile();

  // Fields 3 and 4 exist only in the projected schema and carry initial-defaults.
  std::vector<SchemaField> projected_fields{
      SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
      SchemaField::MakeOptional(2, "name", std::make_shared<StringType>()),
      SchemaField(3, "score", std::make_shared<LongType>(), /*optional=*/false,
                  /*doc=*/{}, std::make_shared<const Literal>(Literal::Long(100))),
      SchemaField(4, "status", std::make_shared<StringType>(), /*optional=*/true,
                  /*doc=*/{}, std::make_shared<const Literal>(Literal::String("active"))),
  };
  auto projected_schema = std::make_shared<Schema>(std::move(projected_fields));

  auto opened = ReaderFactoryRegistry::Open(
      FileFormatType::kAvro,
      {.path = temp_avro_file_, .io = file_io_, .projection = projected_schema});
  ASSERT_THAT(opened, IsOk());
  auto reader = std::move(opened.value());

  // Every row gets the same default values for the two missing columns.
  ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader,
                                          R"([[1, "Alice", 100, "active"],
[2, "Bob", 100, "active"],
[3, "Charlie", 100, "active"]])"));
  ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}

TEST_F(AvroReaderTest, RoundTripWithGenericFileIO) {
file_io_ = std::make_shared<iceberg::test::StdFileIO>();
temp_avro_file_ = CreateNewTempFilePathWithSuffix(".avro");
Expand Down
26 changes: 26 additions & 0 deletions src/iceberg/test/literal_util_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <vector>

#include <arrow/array.h>
#include <arrow/array/builder_primitive.h>
#include <arrow/buffer.h>
#include <arrow/extension/uuid.h>
#include <arrow/scalar.h>
Expand Down Expand Up @@ -186,6 +187,31 @@ TEST(LiteralUtilTest, MakeDefaultArrayWrapsExtensionType) {
}
}

// Each call appends exactly one copy of the literal to the builder.
TEST(LiteralUtilTest, AppendDefaultToBuilderAppendsValue) {
  constexpr int64_t kValue = 42;
  constexpr int kAppendCount = 2;

  ::arrow::Int64Builder builder;
  for (int i = 0; i < kAppendCount; ++i) {
    ASSERT_THAT(AppendDefaultToBuilder(Literal::Long(kValue), &builder), IsOk());
  }

  std::shared_ptr<::arrow::Array> finished;
  ASSERT_TRUE(builder.Finish(&finished).ok());
  ASSERT_EQ(finished->length(), kAppendCount);

  // Every appended slot holds the literal's value.
  const auto& values = static_cast<const ::arrow::Int64Array&>(*finished);
  for (int i = 0; i < kAppendCount; ++i) {
    ASSERT_EQ(values.Value(i), kValue);
  }
}

// An int32 literal appended to an int64 builder is cast to the builder's type rather
// than rejected.
TEST(LiteralUtilTest, AppendDefaultToBuilderCastsToBuilderType) {
  ::arrow::Int64Builder int64_builder;
  ASSERT_THAT(AppendDefaultToBuilder(Literal::Int(7), &int64_builder), IsOk());

  std::shared_ptr<::arrow::Array> finished;
  ASSERT_TRUE(int64_builder.Finish(&finished).ok());
  ASSERT_EQ(finished->length(), 1);

  const auto& values = static_cast<const ::arrow::Int64Array&>(*finished);
  ASSERT_EQ(values.Value(0), 7);
}

} // namespace

} // namespace iceberg::arrow