Data: Enable Parquet variant shredding for Record writes#16370

Open
soumilshah1995 wants to merge 6 commits into apache:main from soumilshah1995:kafka-connect-shredding

@soumilshah1995

Summary

Kafka Connect uses Iceberg’s generic Record model with a Void engine schema. Parquet variant shredding was ineffective on that path because the generic ParquetFormatModel did not use a variant shredding analyzer / row copier, and RecordVariantShreddingAnalyzer could not resolve VARIANT columns (engine resolveColumnIndex is a dead end for Void).

This PR wires RecordVariantShreddingAnalyzer and Record::copy into GenericFormatModels and analyzes VARIANT columns by Iceberg Schema#columns() order so buffered inference and shredded Parquet columns work for Connect.

Changes

  • GenericFormatModels: register ParquetFormatModel with RecordVariantShreddingAnalyzer + Record::copy.
  • RecordVariantShreddingAnalyzer: implement analyzeVariantColumns using positional indices aligned with Record#get.

Config (Connect)

Table write properties (e.g. via iceberg.tables.write-props):

  • write.parquet.shred-variants=true
  • write.parquet.variant-inference-buffer-size=<rows>
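
As a rough illustration of the configuration above, the sketch below builds the two table write properties as a plain map. The property names come from the description; the `ShredConfigSketch` helper, the example buffer size, and the `.` separator used when re-keying under a connector-level prefix are assumptions made only for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ShredConfigSketch {
  /** The two table write properties listed above; bufferRows is a caller-chosen example value. */
  static Map<String, String> tableWriteProps(int bufferRows) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("write.parquet.shred-variants", "true");
    props.put("write.parquet.variant-inference-buffer-size", Integer.toString(bufferRows));
    return props;
  }

  /** Re-keys properties under a connector-level prefix; the "." separator is an assumption. */
  static Map<String, String> withPrefix(String prefix, Map<String, String> props) {
    Map<String, String> out = new LinkedHashMap<>();
    props.forEach((key, value) -> out.put(prefix + "." + key, value));
    return out;
  }
}
```

Calling `withPrefix("iceberg.tables.write-props", tableWriteProps(500))` would give the Connect-side spelling hinted at in the description, if that prefix is indeed how the sink passes write properties through.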

Test plan

  • ./gradlew :iceberg-data:check :iceberg-kafka-connect:iceberg-kafka-connect:check (or CI green).
  • Connect sink writing VARIANT with write.parquet.shred-variants=true; inspect Parquet for typed_value paths / higher physical column count vs false.
  • Regression: Connect append with shredding disabled still succeeds.

@github-actions github-actions Bot added the data label May 16, 2026
@soumilshah1995 soumilshah1995 force-pushed the kafka-connect-shredding branch 2 times, most recently from 8ce1a9d to ccb48f1 May 16, 2026 22:00
Register ParquetFormatModel with RecordVariantShreddingAnalyzer and Record::copy, and analyze VARIANT columns using Iceberg schema column order so shredding works with Void engine schemas (Kafka Connect).
@nssalian

Collaborator

Thanks for the contribution @soumilshah1995. Please fix the tests and get CI into good shape.

soumilshah199500 and others added 2 commits May 19, 2026 14:26
Cover Iceberg column-order resolution and FormatModelRegistry Parquet
shredding round-trip for generic Record writes (Kafka Connect path).

Co-authored-by: Cursor <cursoragent@cursor.com>
Rename setup() to before() per Iceberg test naming convention.

Co-authored-by: Cursor <cursoragent@cursor.com>
@soumilshah1995

Author

@nssalian
Thank you very much for taking the time to review my changes. This is my first contribution, and I truly appreciate your patience and guidance. I have addressed the changes you mentioned. Please let me know if there is anything else you would like me to update or improve.

@soumilshah199500


Hi @laskoviymishka, could you review this PR? Thank you.

@laskoviymishka laskoviymishka self-requested a review May 21, 2026 14:42
@laskoviymishka

Contributor

I will take a look (was waiting for CI to become green :D)

@nssalian nssalian left a comment

Collaborator

Left a few comments.
Separately, I think it would be great to update the title to something like "Data: Enable Parquet variant shredding for Record writes", since this is specifically in the data module.

* <p>Shredding extracts frequently-occurring fields from variant data into typed Parquet columns
* for improved query performance while maintaining the full variant data in the raw value field.
*/
class RecordVariantShreddingAnalyzer extends VariantShreddingAnalyzer<Record, Void> {

Collaborator

A few issues to note in the doc string here:

  1. "used by Kafka Connect and other tools" misframes this class as Connect-specific. PR title needs a change too.
  2. The "Shredding extracts frequently-occurring fields..." paragraph describes what shredding is; that belongs on the base class VariantShreddingAnalyzer, not the Record-specific subclass. I don't think this is needed here.
  3. The doc string doesn't actually describe what's specific about this implementation (positional indexing aligned with Record.get(int)).
  4. Would be great to be consistent with SparkVariantShreddingAnalyzer

Thanks a lot for reviewing this. I’ll make the necessary changes.

* -1}, so variant columns were never analyzed and Parquet shredding never activated for Kafka
* Connect and other Record-based writers.
*/
@Override

Collaborator

This javadoc describes the pre-fix state ("is unused", "always produced -1", "never activated") rather than what the method does. Suggest dropping it: {@inheritDoc} is the default for overrides, and it is the override's behavior that needs explaining instead.

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class TestRecordVariantShreddingAnalyzer {

Collaborator

No test covers a record with a null variant value. Could you add one?

* -1}, so variant columns were never analyzed and Parquet shredding never activated for Kafka
* Connect and other Record-based writers.
*/
@Override

Collaborator

This override duplicates the loop from VariantShreddingAnalyzer.analyzeVariantColumns. Spark/Flink override only the protected hooks (resolveColumnIndex, extractVariantValues), not the public template.

}

@Override
protected int resolveColumnIndex(Void engineSchema, String columnName) {

Collaborator

Is this getting called?

}

@Override
protected List<VariantValue> extractVariantValues(

Collaborator

instanceof Variant silently skips both nulls and non-Variant values. Should this throw on non-Variant so caller bugs surface instead of silently shrinking the analysis set? Worth referring to the Spark and Flink implementation.
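
One way to read the suggestion above is sketched below: nulls are skipped, while any other non-variant value is treated as a caller bug and raises. `VariantStub`, `StrictExtractSketch`, and the `Object[]` rows are dependency-free stand-ins invented for this example, not Iceberg's `Variant` or `Record`, and the choice of `IllegalStateException` is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Iceberg's Variant type; not the real class.
final class VariantStub {
  final String json;

  VariantStub(String json) {
    this.json = json;
  }
}

final class StrictExtractSketch {
  /**
   * Collects the variant at position idx from each buffered row.
   * Nulls are skipped because a null variant carries no shape to infer from;
   * any other non-variant value fails fast instead of shrinking the sample.
   */
  static List<VariantStub> extract(List<Object[]> rows, int idx) {
    List<VariantStub> values = new ArrayList<>();
    for (Object[] row : rows) {
      Object value = row[idx];
      if (value == null) {
        continue;
      }
      if (!(value instanceof VariantStub)) {
        throw new IllegalStateException(
            "Expected a variant at position " + idx + " but found " + value.getClass().getName());
      }
      values.add((VariantStub) value);
    }
    return values;
  }
}
```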

@soumilshah1995 soumilshah1995 changed the title Kafka Connect: Enable Parquet variant shredding for generic Record writes Data: Enable Parquet variant shredding for Record writes May 21, 2026
Use Schema-based resolveColumnIndex instead of overriding analyzeVariantColumns,
set engine schema in ParquetFormatModel for generic Record writes, reject
non-Variant column values, and add null/wrong-type tests.

Co-authored-by: Cursor <cursoragent@cursor.com>

@laskoviymishka laskoviymishka left a comment

Contributor

Nice rework @soumilshah1995 -- the analyzer is much cleaner now, and wiring it through ParquetFormatModel.schema() feels like the right shape.

Two things before merge:

First, I don’t think the KC path is actually wired yet. RegistryBasedFileWriterFactory.newDataWriter() calls .schema(s).engineSchema(inputSchema()). In the Connect path, inputSchema() is null, so we first auto-populate engineSchema = icebergSchema, then immediately overwrite it with null.

After that, buildShreddedAppender() gets a null engine schema, resolveColumnIndex returns -1 for every column, and shredding silently does not activate for KC writes.

The current test misses this because it builds the writer directly through dataWriteBuilder(...).schema(...).build(), not through GenericFileWriterFactory.

Simplest fix is probably a null guard in WriteBuilderWrapper.engineSchema(), so the auto-populated value sticks. Or RegistryBasedFileWriterFactory can skip .engineSchema(...) when inputSchema() is null.

Can we also add a KC-shaped test through:

GenericFileWriterFactory.Builder.writerProperties("write.parquet.shred-variants", "true").build()

and assert typed_value groups show up in the output Parquet?

I’ll ping @AnatolyPopov too since he knows the kafka-connect side deeper.

Second, RecordVariantShreddingAnalyzer.resolveColumnIndex should fail loudly on null engineSchema, not return -1. Once the writer fix lands this should be unreachable, but silent fallback is exactly how this slipped through. Spark does a checkNotNull here too.

A few non-blockers:

  • Connect creates one TaskWriter per partition, so different partitions can infer different typed_value schemas in the same snapshot. Spec allows it, but worth a doc note.
  • PARQUET_VARIANT_BUFFER_SIZE_DEFAULT = 100 feels small for KC, where max.poll.records is often 500+. Worth documenting write.parquet.variant-inference-buffer-size as a knob operators may need to bump.
  • Two small inline nits: O(n) lookup for wide schemas, and raw-row assertion only checks row 0. Fine as follow-ups.

Fix the engineSchema overwrite, add the null check + KC-path test, and this should be good to land.

public ModelWriteBuilder<D, S> schema(Schema newSchema) {
this.schema = newSchema;
internal.schema(newSchema);
if (Schema.class.equals(schemaType)) {

Contributor

Right idea, but I think two things need to happen here for this to actually work end-to-end.

First, this auto-set fires every time schema(...) is called and silently clobbers a caller-set engineSchema if the order happens to be engineSchema(custom).schema(s). I'd gate it so explicit wins:

if (this.engineSchema == null && Schema.class.equals(schemaType)) {
  this.engineSchema = (S) newSchema;
}

Second — and this is the bigger one — RegistryBasedFileWriterFactory.newDataWriter() calls .schema(s).engineSchema(inputSchema()) back-to-back, and for the KC path GenericFileWriterFactory.Builder.build() goes through the deprecated 10-arg constructor and passes null for inputSchema. So this auto-populate does the right thing, then .engineSchema(null) immediately overwrites it because WriteBuilderWrapper.engineSchema(...) (just below) has no null guard. By the time buildShreddedAppender() runs, resolveColumnIndex gets null, returns -1 for every column, and shredding silently doesn't activate for KC writes. A null guard on the engineSchema(...) setter — skip the assignment if newSchema == null — would make the auto-populated value stick.

Also worth a @SuppressWarnings("unchecked") on the cast (or pulling it into a small helper) — it currently emits an unchecked-cast warning.

I'll ping @AnatolyPopov to take a look too since he's closer to the KC side and can sanity-check the wiring claim.
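
The two guards described in this comment can be sketched on a toy builder. `GuardedBuilderSketch` is hypothetical and uses `Object` in place of both schema types; it is not `WriteBuilderWrapper`, only an illustration of why the `.schema(s).engineSchema(null)` sequence stops losing the auto-populated value once both guards are in place.

```java
// Toy builder, not Iceberg's WriteBuilderWrapper; Object stands in for both schema types.
final class GuardedBuilderSketch {
  private Object schema;
  private Object engineSchema;

  GuardedBuilderSketch schema(Object newSchema) {
    this.schema = newSchema;
    // Gate the auto-populate so an explicitly set engine schema wins.
    if (this.engineSchema == null) {
      this.engineSchema = newSchema;
    }
    return this;
  }

  GuardedBuilderSketch engineSchema(Object newSchema) {
    // Null guard: keep the auto-populated value when the caller has nothing to offer.
    if (newSchema != null) {
      this.engineSchema = newSchema;
    }
    return this;
  }

  Object engineSchema() {
    return engineSchema;
  }
}
```

Note that a later review in this thread argues against putting the null guard in the setter itself, so this shows the suggestion as made here rather than the final shape.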

@Override
protected int resolveColumnIndex(Schema engineSchema, String columnName) {
if (engineSchema == null) {
return -1;

Contributor

Returning -1 on a null engineSchema silently disables shredding for the whole column. Once the writer-side fix is in (see my comment in ParquetFormatModel.java), this should be unreachable in normal paths, so I'd rather it fail loudly than degrade silently:

Preconditions.checkNotNull(engineSchema, "Invalid engine schema: null");

Spark's analyzer does the same — calls sparkSchema.fieldIndex(name) with no null handling.

}

List<NestedField> cols = engineSchema.columns();
for (int i = 0; i < cols.size(); i++) {

Contributor

Non-blocker, but worth thinking about: this is O(n) per variant column, so O(n·k) for wide schemas (CDC tables with 200+ columns are not uncommon). Spark uses sparkSchema.fieldIndex(name) which is O(1). A precomputed Map<String, Integer> cached on the instance would match that.

Also, while we're here — the class Javadoc says "positional indices aligned with Schema#columns()", and extractVariantValues then uses record.get(int) against that same index. That's a real contract: the records being analyzed must have been built against the same schema passed as engineSchema. Worth one sentence in the Javadoc making that explicit so a future caller doesn't pass a projected schema and get silent misalignment.
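
The misalignment risk mentioned here is easy to reproduce with plain arrays. In the sketch below, a list of column names stands in for the schema and an `Object[]` stands in for a record; `PositionalContractSketch` and the sample values are hypothetical. The lookup is purely positional, so resolving the index against a projected schema silently returns the wrong field.

```java
import java.util.List;

final class PositionalContractSketch {
  /**
   * Resolves a column by name against schemaColumns, then reads the row by position.
   * Like record.get(int), the read does no name check, so the row must have been
   * built against the same column list. An unknown name would fail with an
   * ArrayIndexOutOfBoundsException in this simplified version.
   */
  static Object valueAt(Object[] row, List<String> schemaColumns, String name) {
    int pos = schemaColumns.indexOf(name);
    return row[pos];
  }
}
```

With a row built as `{1L, "alice", "variant-bytes"}` against columns `id, name, v`, resolving `v` against the full column list returns the variant, while resolving it against a projected `id, v` list returns `"alice"`.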

assertThat(variantData.getFieldRepetitionCount("value")).isEqualTo(0);

Group typedValue = variantData.getGroup("typed_value", 0);
assertThat(typedValue.getGroup("a", 0).getInteger("typed_value", 0)).isEqualTo(42);

Contributor

Two things on this round-trip test.

The high-level read-back via InternalTestHelpers.assertEquals covers all three records, which is great. The raw Parquet read here only checks row 0 though, and with BUFFER_SIZE=2 and 3 input rows the second buffer flush (rows 2→3 boundary) is exactly what we'd want physical evidence for. A short while ((row = rawReader.read()) != null) loop asserting each row's typed_value would close that. Fine as a follow-up.

More importantly: there's no test that goes through RegistryBasedFileWriterFactory (the actual KC production path). This test calls dataWriteBuilder(...).schema(...).build() directly and never touches .engineSchema(...), so it exercises exactly the path the PR's schema() fix targets but not the KC shape (.schema(s).engineSchema(null)) — which is why the engineSchema overwrite I flagged in ParquetFormatModel.java slipped through. A test that builds via GenericFileWriterFactory.Builder.writerProperties("write.parquet.shred-variants", "true").build() and asserts typed_value groups appear in the output would catch that regression.

soumilshah199500 and others added 2 commits May 21, 2026 17:26
Preserve auto-populated engine schema when callers pass null, cache column
index lookups, and add KC factory plus multi-row shredding round-trip tests.

Co-authored-by: Cursor <cursoragent@cursor.com>
Reconcile ParquetFormatModel with upstream copyFuncFactory API while
keeping engineSchema auto-population for generic Record writes.

Co-authored-by: Cursor <cursoragent@cursor.com>

@laskoviymishka laskoviymishka left a comment

Contributor

This is a clean, well-scoped feature: the analyzer resolves variant columns by Iceberg column order, the null-skip in extractVariantValues is right, and the round-trip tests genuinely write a shredded file and read it back through both the GenericFileWriterFactory and the registry paths. I like that you covered both the variant-first and variant-after-id layouts.

I'd hold this before merging. Two things are load-bearing.

RecordVariantShreddingAnalyzer is built once in GenericFormatModels.register() and stored on the ParquetFormatModel singleton, but it carries a mutable columnIndicesBySchema HashMap that resolveColumnIndex writes to via computeIfAbsent. With multiple writer threads sharing that singleton (a KC sink, say) that's a plain HashMap mutated concurrently — corruption or a spin, not just a lost update. The Spark analyzer stays stateless to avoid exactly this, and since the cache only saves one linear scan over <100 fields, I'd just drop it.

The engineSchema(null) change turns a public setter into a silent no-op, and the whole thing only works because schema() is called before engineSchema(): schema() auto-derives, then the null gets dropped. That's a fragile, order-dependent protocol to bake into an interface method, and no test pins it down. I'd move the null guard into RegistryBasedFileWriterFactory, keep the setter a plain assignment, and make the auto-derive order-independent via an explicit flag in build().

A few things I'd want settled before merge:

  • make the analyzer thread-safe (stateless, or a per-writer instance)
  • move the engineSchema(null) guard out of the interface setter
  • make the engine-schema auto-derive order-independent and not keyed on Schema.class identity
  • add a test where a real non-null engine schema survives schema(), plus one with multiple variant columns
  • fix the misleading "KC path" comment on the partition-arg null

Once those are addressed I'm happy to take another pass and approve.

*/
class RecordVariantShreddingAnalyzer extends VariantShreddingAnalyzer<Record, Schema> {

private final Map<Schema, Map<String, Integer>> columnIndicesBySchema = Maps.newHashMap();

Contributor

This analyzer gets built once in GenericFormatModels.register() and lives on the ParquetFormatModel singleton, so this map is effectively process-global mutable state. resolveColumnIndex writes to it with computeIfAbsent, and if two writer threads (a KC sink with multiple workers, say) hit it at once we get concurrent structural modification on a plain HashMap — corruption or a spin, not just a lost update.

There's a second problem stacked on it: Schema doesn't override equals/hashCode, so this is keyed on object identity and never evicts. A long-lived sink across schema evolution grows it without bound.

The Spark analyzer stays stateless precisely to dodge both. I'd drop the cache entirely and build the index map locally in resolveColumnIndex — it's one linear scan over <100 top-level fields per buffered batch, which is noise on this path. wdyt?
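
A stateless version along these lines might look like the sketch below, where the name-to-position map is rebuilt inside each call and never stored on the instance. A list of column names stands in for `Schema#columns()`, and `StatelessResolverSketch` is a hypothetical name; the real method signature and types differ.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StatelessResolverSketch {
  // No fields: nothing is shared between writer threads, and nothing can grow unbounded.

  /** One linear scan that maps each top-level column name to its position. */
  static Map<String, Integer> indexByName(List<String> columnNames) {
    Map<String, Integer> index = new HashMap<>();
    for (int i = 0; i < columnNames.size(); i++) {
      index.put(columnNames.get(i), i);
    }
    return index;
  }

  /** Returns the column position, or -1 when the name is absent. */
  static int resolveColumnIndex(List<String> columnNames, String columnName) {
    // The map is local to this call, so concurrent callers never touch the same HashMap.
    return indexByName(columnNames).getOrDefault(columnName, -1);
  }
}
```

The trade-off is one extra scan per lookup, which the comment above argues is negligible for schemas of this width.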

@Override
public ModelWriteBuilder<D, S> engineSchema(S newSchema) {
this.engineSchema = newSchema;
if (newSchema != null) {

Contributor

This makes a public setter on ModelWriteBuilder silently ignore null, with nothing in the contract saying so. Any caller that passes null to reset/clear now gets a no-op and no signal.

I think the cleaner home for this guard is RegistryBasedFileWriterFactory.newDataWriter — only call .engineSchema(inputSchema()) when inputSchema() is non-null, and leave this setter as a plain assignment. That keeps the KC-null intent without overloading the interface method. wdyt?

public ModelWriteBuilder<D, S> schema(Schema newSchema) {
this.schema = newSchema;
internal.schema(newSchema);
if (this.engineSchema == null && Schema.class.equals(schemaType)) {

Contributor

The correctness of this whole change hinges on schema() being called before engineSchema(): schema() auto-derives the engine schema, then engineSchema(null) is dropped by the null-guard in engineSchema() below, so the derived value survives. Reverse the call order and the auto-derive sees a non-null engineSchema and skips. That ordering is invisible to anyone implementing the builder.

I'd make this order-independent: keep an explicit engineSchemaWasSet flag, and do the auto-derive in build() only when the flag is false. Then schema()/engineSchema() can be called in either order and a real engine schema is never clobbered.

Separately, using Schema.class.equals(schemaType) as the signal leaks schema-type semantics into the parquet layer — any future model that uses Schema as its engine-schema token inherits an auto-derive it might not want. An explicit flag on create() (or a shouldDeriveEngineSchema() hook) would be more honest than a class-identity check. Thoughts?
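
The order-independent protocol proposed here can be sketched with a toy builder. Everything below is hypothetical: `OrderIndependentBuilderSketch` is not the real builder, `Object` stands in for both schema types, the constructor flag plays the role of the suggested opt-in, and `FactorySketch.newDataWriter` only imitates the caller-side guard on a null input schema.

```java
// Toy builder, not the real WriteBuilderWrapper; Object stands in for both schema types.
final class OrderIndependentBuilderSketch {
  private final boolean deriveEngineSchema; // explicit opt-in instead of a class-identity check
  private Object schema;
  private Object engineSchema;
  private boolean engineSchemaWasSet;

  OrderIndependentBuilderSketch(boolean deriveEngineSchema) {
    this.deriveEngineSchema = deriveEngineSchema;
  }

  OrderIndependentBuilderSketch schema(Object newSchema) {
    this.schema = newSchema; // plain assignment, no side effect on engineSchema
    return this;
  }

  OrderIndependentBuilderSketch engineSchema(Object newSchema) {
    this.engineSchema = newSchema; // plain assignment, null included
    this.engineSchemaWasSet = true;
    return this;
  }

  /** Returns the engine schema the writer would see. */
  Object build() {
    if (!engineSchemaWasSet && deriveEngineSchema) {
      return schema; // derive only when the caller never touched engineSchema()
    }
    return engineSchema;
  }
}

final class FactorySketch {
  /** Caller-side guard: skip engineSchema(...) entirely when there is no input schema. */
  static Object newDataWriter(Object icebergSchema, Object inputSchema) {
    OrderIndependentBuilderSketch builder =
        new OrderIndependentBuilderSketch(true).schema(icebergSchema);
    if (inputSchema != null) {
      builder.engineSchema(inputSchema);
    }
    return builder.build();
  }
}
```

Because the derive step runs in `build()`, calling `schema()` and `engineSchema()` in either order gives the same result, and an explicit `engineSchema(null)` stays a real assignment rather than a silent no-op.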

}

@Override
@SuppressWarnings("unchecked")

Contributor

The only unchecked cast is (S) newSchema on the auto-derive line. I'd narrow the suppression to that statement (or a tiny helper) rather than the whole method, so a future cast in here doesn't get silently covered.
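
For reference, Java allows `@SuppressWarnings` on a local variable declaration, which is one way to narrow the suppression to a single statement. The class and method names below are hypothetical and the surrounding builder logic is omitted.

```java
final class NarrowSuppressionSketch<S> {
  private S engineSchema;

  void deriveFrom(Object newSchema) {
    // The annotation covers only this declaration, so any other unchecked
    // cast added to the method later would still produce a warning.
    @SuppressWarnings("unchecked")
    S derived = (S) newSchema;
    this.engineSchema = derived;
  }

  S engineSchema() {
    return engineSchema;
  }
}
```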

GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant),
new RecordVariantShreddingAnalyzer(),
(Function<Schema, UnaryOperator<Record>>) engineSchema -> Record::copy));

Contributor

The lambda takes engineSchema and ignores it, returning Record::copy unconditionally. That's fine for this path, but the Function<S, UnaryOperator<D>> abstraction exists so engines can produce a schema-aware copy — here it's pure indirection, and the parameter name reads as if it's used.

If the generic path will never use the engine schema for copying, could we pass a plain UnaryOperator<Record> and drop the factory wrapper? If the factory has to stay for signature reasons, I'd rename the parameter to ignored so it doesn't mislead. wdyt?
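
The two shapes being compared can be put side by side in a small sketch. `CopyFactorySketch` is hypothetical, and a `String` copy stands in for `Record::copy` so the example needs no Iceberg types.

```java
import java.util.function.Function;
import java.util.function.UnaryOperator;

final class CopyFactorySketch {
  // Stand-in for Record::copy; a String copy keeps the example dependency-free.
  static String copy(String record) {
    return new String(record);
  }

  // Factory shape: the schema argument exists only to satisfy the signature,
  // so it is named "ignored" to make that visible.
  static final Function<Object, UnaryOperator<String>> FACTORY =
      ignored -> CopyFactorySketch::copy;

  // Plain shape: all the generic path needs if the schema never affects copying.
  static final UnaryOperator<String> PLAIN = CopyFactorySketch::copy;
}
```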

}

@Test
public void testFormatModelRegistryShreddingRoundTrip() throws IOException {

Contributor

Both round-trip tests only exercise the auto-derive path — neither ever calls engineSchema() with a real, non-null Schema. Given the schema()/engineSchema() precedence logic is the riskiest part of this PR, I'd add a WriteBuilderWrapper-level test that sets an explicit engine schema and then calls schema(...), asserting the explicit one wins (and ideally one that calls them in the reverse order). That's the case that pins down the protocol we're relying on.

OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PARQUET).build();
EncryptedOutputFile encryptedOutputFile = fileFactory.newOutputFile();

// KC path: RegistryBasedFileWriterFactory passes inputSchema=null as engineSchema.

Contributor

The comment points at the wrong null — the third arg here is the StructLike partition, not the engine schema. The engine-schema null comes from GenericFileWriterFactory not forwarding an inputSchema to the superclass, so inputSchema() returns null and RegistryBasedFileWriterFactory calls builder.engineSchema(null). I'd reword so a reader doesn't go looking for the null in the partition arg.

Map<Integer, Type> shreddedTypes =
analyzer.analyzeVariantColumns(records, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA);

assertThat(shreddedTypes).containsOnlyKeys(2);

Contributor

Every test here shreds exactly one variant column. Could we add one with two variant columns at non-adjacent positions (say id LONG, v1 VARIANT, other STRING, v2 VARIANT) and assert both field IDs come back shredded? That's the case that would catch an off-by-one in indexByName or a "only the first variant gets shredded" regression — the current single-column tests can't.

While we're here, the literal 2 is the field ID of v; pulling it into a named constant would read better than the bare key.

private void assertAllRawParquetRowsShredded(OutputFile outputFile) throws IOException {
try (ParquetReader<Group> rawReader =
ParquetReader.builder(
new GroupReadSupport(), new org.apache.hadoop.fs.Path(outputFile.location()))

Contributor

org.apache.hadoop.fs.Path is fully qualified inline to dodge the java.nio.file.Path import used for @TempDir. The idiomatic fix is to import org.apache.hadoop.fs.Path and declare the temp dir as @TempDir private java.nio.file.Path temp; — same trick applies to the inline org.apache.iceberg.data.parquet.GenericParquetReaders in assertRecordsRoundTrip. Minor, but it cleans up two FQNs.

columnIndicesBySchema.computeIfAbsent(
engineSchema, RecordVariantShreddingAnalyzer::indexByName);
Integer index = indices.get(columnName);
return index != null ? index : -1;

Contributor

When resolveColumnIndex returns -1 the base class quietly skips shredding for that column. If an engine schema and the Iceberg schema disagree on a name (a rename, say), we'd silently write unshredded with no diagnostic. A LOG.warn here with the column name would make that failure mode discoverable instead of a silent fallback. Not blocking — just easy to lose a day to otherwise.
