From 1ce41a9608ef19c22b595b42f38b16847ce34800 Mon Sep 17 00:00:00 2001
From: tugce-applied
Date: Mon, 16 Feb 2026 16:37:06 +0000
Subject: [PATCH 1/2] [SPARK-55552][SQL] Add VariantType support to ColumnarBatchRow.copy() and MutableColumnarRow

ColumnarBatchRow.copy() and MutableColumnarRow.copy()/get() did not
handle VariantType, causing RuntimeException
'Not implemented. VariantType' when using VariantType columns in
streaming custom data sources.

PR #53137 (SPARK-54427) fixed ColumnarRow but missed ColumnarBatchRow
and MutableColumnarRow. PR #54006 attempted this fix but was abandoned.

This patch adds:
- PhysicalVariantType branch in ColumnarBatchRow.copy()
- VariantType branch in MutableColumnarRow.copy() and get()
- Test in ColumnarBatchSuite validating VariantVal round-trip through copy()
---
 .../sql/vectorized/ColumnarBatchRow.java      |  2 ++
 .../vectorized/MutableColumnarRow.java        |  4 +++
 .../vectorized/ColumnarBatchSuite.scala       | 35 +++++++++++++++++++
 3 files changed, 41 insertions(+)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
index 4be45dc5d399d..3d1e780f6e057 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
@@ -86,6 +86,8 @@ public InternalRow copy() {
           row.update(i, getArray(i).copy());
         } else if (pdt instanceof PhysicalMapType) {
           row.update(i, getMap(i).copy());
+        } else if (pdt instanceof PhysicalVariantType) {
+          row.update(i, getVariant(i));
         } else {
           throw new RuntimeException("Not implemented. " + dt);
         }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
index 49c27f9775624..a46b5143eef6d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
@@ -96,6 +96,8 @@ public InternalRow copy() {
           row.update(i, getArray(i).copy());
         } else if (dt instanceof MapType) {
           row.update(i, getMap(i).copy());
+        } else if (dt instanceof VariantType) {
+          row.update(i, getVariant(i));
         } else {
           throw new RuntimeException("Not implemented. " + dt);
         }
@@ -217,6 +219,8 @@ public Object get(int ordinal, DataType dataType) {
       return getStruct(ordinal, structType.fields().length);
     } else if (dataType instanceof MapType) {
       return getMap(ordinal);
+    } else if (dataType instanceof VariantType) {
+      return getVariant(ordinal);
     } else {
       throw new SparkUnsupportedOperationException(
         "_LEGACY_ERROR_TEMP_3192", Map.of("dt", dataType.toString()));
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 97ad2c1f5bf96..6d90bb985e269 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -2025,4 +2025,39 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
     }
   }
+
+  testVector("[SPARK-55552] Variant", 3, VariantType) {
+    column =>
+      val valueChild = column.getChild(0)
+      val metadataChild = column.getChild(1)
+
+      column.putNotNull(0)
+      valueChild.appendByteArray(Array[Byte](1, 2, 3), 0, 3)
+      metadataChild.appendByteArray(Array[Byte](10, 11), 0, 2)
+
+      column.putNotNull(1)
+      valueChild.appendByteArray(Array[Byte](4, 5), 0, 2)
+      metadataChild.appendByteArray(Array[Byte](12, 13, 14), 0, 3)
+
+      column.putNull(2)
+      valueChild.appendNull()
+      metadataChild.appendNull()
+
+      val batchRow = new ColumnarBatchRow(Array(column))
+      (0 until 3).foreach { i =>
+        batchRow.rowId = i
+        val batchRowCopy = batchRow.copy()
+        if (i < 2) {
+          assert(!batchRow.isNullAt(0))
+          assert(!batchRowCopy.isNullAt(0))
+          val original = batchRow.getVariant(0)
+          val copied = batchRowCopy.get(0, VariantType).asInstanceOf[VariantVal]
+          assert(java.util.Arrays.equals(original.getValue, copied.getValue))
+          assert(java.util.Arrays.equals(original.getMetadata, copied.getMetadata))
+        } else {
+          assert(batchRow.isNullAt(0))
+          assert(batchRowCopy.isNullAt(0))
+        }
+      }
+  }
 }

From 28aa0e18f34317de9c3045725395bb5cfe25b8b8 Mon Sep 17 00:00:00 2001
From: tugce-applied
Date: Mon, 16 Feb 2026 16:43:22 +0000
Subject: [PATCH 2/2] Retrigger CI after enabling GitHub Actions