feat: experimental Spark JSON support via codegen dispatcher #4305
Draft
andygrove wants to merge 4 commits into
Add `spark.comet.exec.json.engine` (default `rust`, experimental `java`) that routes the JSON expressions in scope through the JVM UDF framework introduced in apache#4232, delegating to Spark's own expression classes for byte-exact compatibility at the cost of JNI roundtrips per batch.

Expressions in scope when `engine=java`:
- `get_json_object` -> `GetJsonObjectUDF`
- `from_json` -> `FromJsonUDF`
- `to_json` -> `ToJsonUDF`

A fresh Spark expression is built per `evaluate` call. Spark's JSON evaluators (`GetJsonObjectEvaluator`, `StructsToJsonEvaluator`, `JsonToStructsEvaluator`) hold mutable per-row state, and the JVM UDF framework shares one UDF instance across native worker threads, so a cached cross-thread expression races on its evaluator state.

`from_json` / `to_json` use a serde-side `CometLambdaRegistry` to pass the configured Spark expression (schema, options, timezone) to the UDF. The serde rebinds the child to `BoundReference(0)` so the UDF can call `eval(row)` against a single-column wrapper row.

`json_array_length` and `json_object_keys` are out of scope: both are `RuntimeReplaceable` in Spark 4.x and Catalyst's `ReplaceExpressions` rule rewrites them to `StaticInvoke` before Comet sees the plan, so `classOf[LengthOfJsonArray]` / `classOf[JsonObjectKeys]` serde registrations never match. Adding support requires recognizing the rewritten `StaticInvoke` form in Comet's serde dispatch.

This PR was scaffolded with the project's brainstorming, writing-plans, and subagent-driven-development skills.
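The "fresh expression per `evaluate` call" rule in this commit message can be illustrated with a small stand-alone sketch. Nothing here is Comet or Spark code: `StatefulEvaluator` and `SharedUdf` are hypothetical stand-ins for an evaluator with mutable per-row state and for a UDF instance shared across worker threads.

```scala
object FreshPerCallSketch {
  // Hypothetical stand-in for an evaluator that keeps mutable per-row state.
  final class StatefulEvaluator {
    private val buffer = new StringBuilder // scratch state reused across rows

    def eval(input: String): String = {
      buffer.clear()
      buffer.append(input).append("!")
      buffer.toString
    }
  }

  // Hypothetical UDF: one instance may be called from many threads at once,
  // so it never caches an evaluator in a field.
  final class SharedUdf {
    def evaluate(batch: Seq[String]): Seq[String] = {
      // A new evaluator per call keeps the mutable buffer private to this call.
      val evaluator = new StatefulEvaluator
      batch.map(row => evaluator.eval(row))
    }
  }
}
```

Caching `StatefulEvaluator` in a field of `SharedUdf` would let two threads interleave writes to the same buffer, which is the race the commit message describes.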
This was referenced May 14, 2026
Resolve conflicts:
- The common module rename (PR apache#4325) moved the UDF files added by this branch under common/.../udf/ to spark/.../udf/. Git's location-conflict resolver guessed the wrong destination (.../shims/); fix manually by placing FromJsonUDF / GetJsonObjectUDF / ToJsonUDF under spark/src/main/scala/org/apache/comet/udf/. (Follow-up commit retires them in favor of the codegen dispatcher anyway.)
- CometConf.scala: keep both COMET_JSON_ENGINE (this PR) and COMET_SCALA_UDF_CODEGEN_ENABLED (main).
- serde/structs.scala: combine the JSON engine selector with main's improved native ignoreNullFields / options handling. engine=java keeps routing through convertViaJvmUdf; engine=rust uses main's tightened native path.
…f hand-written UDFs

Replace the three hand-written `GetJsonObjectUDF` / `FromJsonUDF` / `ToJsonUDF` JVM UDF implementations and the `CometLambdaRegistry` indirection with the Arrow-direct codegen dispatcher introduced in PR apache#4417 (`CometScalaUDF.emitJvmCodegenDispatch`). The dispatcher Janino-compiles Spark's own `doGenCode` (or `eval(row)` for CodegenFallback expressions) so the JSON family inherits Spark-identical semantics with no per-expression glue.

Changes:
- Delete the three hand-written UDF files under `spark/src/main/scala/org/apache/comet/udf/` and their unit-test suites. The codegen dispatcher's per-task `kernelCache` provides the same per-thread isolation that `CometLambdaRegistry` was working around.
- Rewrite the JSON serdes (`CometGetJsonObject` in `strings.scala`, `CometStructsToJson` and `CometJsonToStructs` in `structs.scala`) to go through a new `JsonRoute` helper. `engine=rust` keeps the native path; `engine=java` delegates to `CometScalaUDF.emitJvmCodegenDispatch` when `spark.comet.exec.scalaUDF.codegen.enabled=true`.
- Generalize the codegen dispatcher to accept `CodegenFallback` expressions. `CodegenFallback.doGenCode` emits `references[N].eval(row)`, the same shape the `HigherOrderFunction` carve-out already relied on; lifting the rejection lets `JsonToStructs` and `StructsToJson` (which are `CodegenFallback` in Spark 4) ride the same path.
- Unwrap `RuntimeReplaceable` expressions inside `CometScalaUDF.emitJvmCodegenDispatch` before binding. Spark 4's `StructsToJson` is `RuntimeReplaceable` and its `doGenCode` throws "Cannot generate code for expression"; calling `.replacement` gives the `Invoke(StructsToJsonEvaluator, ...)` form that does codegen.
- Update the JSON compatibility doc and the `CometJsonJvmSuite` config to reference the codegen flag.

Test plan:
- `CometJsonJvmSuite`: 3/3 pass (get_json_object, from_json, to_json round-trip via the codegen dispatcher).
- `CometJsonExpressionSuite`: 8/8 pass on the unchanged native path.
- `CometStringExpressionSuite`: 33/33, `CometCodegenSuite`: 60/60, `CometCodegenSourceSuite`: 50/50, `CometSqlFileTestSuite`: 284/284.
- `cargo clippy --all-targets --workspace -- -D warnings`: clean.
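The routing rule this commit describes for the `JsonRoute` helper can be sketched as a small decision function. This is an illustration under stated assumptions, not the PR's code: `Route`, `NativePath`, `JvmCodegenDispatch`, `FallBackToSpark`, and `chooseRoute` are hypothetical names, and the behavior when `engine=java` is set without the codegen flag is taken from the PR description (the operator falls back to Spark).

```scala
object JsonRouteSketch {
  // Hypothetical outcomes of the routing decision; names are illustrative only.
  sealed trait Route
  case object NativePath extends Route         // engine=rust: native implementation
  case object JvmCodegenDispatch extends Route // engine=java and codegen flag enabled
  case object FallBackToSpark extends Route    // engine=java but codegen flag disabled

  /** `engine` stands for spark.comet.exec.json.engine and
    * `codegenEnabled` for spark.comet.exec.scalaUDF.codegen.enabled.
    */
  def chooseRoute(engine: String, codegenEnabled: Boolean): Route =
    engine match {
      case "rust"                   => NativePath
      case "java" if codegenEnabled => JvmCodegenDispatch
      case "java"                   => FallBackToSpark
      // Assumption: unknown values are rejected; the PR does not say how
      // invalid engine names are handled.
      case other =>
        throw new IllegalArgumentException(s"unknown JSON engine: $other")
    }
}
```

Keeping the decision in one function means each JSON serde only has to supply its native conversion and its Spark expression; the flag handling is not repeated per expression.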
- Add CometJsonJvmSuite to pr_build_linux.yml so check-missing-suites passes.
- Remove unused HigherOrderFunction, LambdaFunction, NamedLambdaVariable imports from CometBatchKernelCodegen.scala (referenced only in comments).
- Remove unused serializeDataType import from strings.scala.
Which issue does this PR close?
Closes #.
Rationale for this change
The native Rust JSON expressions in Comet have known compatibility gaps and feature restrictions: `from_json` only supports PERMISSIVE mode with simple schemas, `to_json` does not handle map/array at the top level, and `get_json_object` differs from Spark on certain path expressions. Routing through Spark's own expression classes guarantees byte-exact compatibility, at the cost of a JNI roundtrip per batch.

This PR adds a `spark.comet.exec.json.engine` selector that routes the JSON expressions through the Arrow-direct codegen dispatcher introduced in #4417. The dispatcher Janino-compiles Spark's own `doGenCode` (or `eval(row)` for `CodegenFallback` expressions) so the JSON family inherits Spark-identical semantics with no per-expression glue code. The existing native path remains the default.

Configs
- `spark.comet.exec.json.engine` in `{rust, java}`, default `rust`
  - `rust`: native DataFusion implementation. Fast, but has known compatibility gaps.
  - `java` (experimental): routes through the codegen dispatcher so Spark's own `doGenCode` runs inside the Comet pipeline. Requires `spark.comet.exec.scalaUDF.codegen.enabled=true`; otherwise the operator falls back to Spark.
- Reuses the existing `spark.comet.exec.scalaUDF.codegen.enabled` (default `false`) gate introduced for the codegen dispatcher.

What changes are included in this PR?
- Add a `JsonRoute` helper in `structs.scala` that each JSON serde delegates to. It picks between the native path and the codegen dispatcher based on `engine` and `scalaUDF.codegen.enabled`.
- Rewrite `CometGetJsonObject` (`strings.scala`), `CometStructsToJson`, and `CometJsonToStructs` (`structs.scala`) to use the route helper. The JVM arm delegates to `CometScalaUDF.emitJvmCodegenDispatch`.
- Add the `spark.comet.exec.json.engine` config in `CometConf`.
- `CometBatchKernelCodegen.canHandle` now accepts `CodegenFallback` expressions. `CodegenFallback.doGenCode` emits `references[N].eval(row)`, the same mechanism the existing `HigherOrderFunction` carve-out relies on; lifting the rejection lets non-HOF CodegenFallback expressions (here `JsonToStructs` and `StructsToJson`) ride the same path.
- `CometScalaUDF.emitJvmCodegenDispatch` now unwraps `RuntimeReplaceable` before binding. Spark 4's `StructsToJson` is `RuntimeReplaceable` and its `doGenCode` always throws; calling `.replacement` gives the `Invoke(StructsToJsonEvaluator, ...)` form that does codegen.
- Update `docs/source/user-guide/latest/compatibility/json.md`.

`json_array_length` and `json_object_keys` are intentionally out of scope. Both are `RuntimeReplaceable` in Spark 4.x and Catalyst's `ReplaceExpressions` rewrites them to `StaticInvoke` before Comet sees the plan, so `classOf[LengthOfJsonArray]` / `classOf[JsonObjectKeys]` registrations never match. Adding support requires recognizing the rewritten `StaticInvoke` form in Comet's serde dispatch and is left to a follow-up.

How are these changes tested?
- `CometJsonJvmSuite`: 3 tests covering `get_json_object`, `from_json`, and `to_json(from_json(...))` round-trip with `engine=java` and the codegen flag enabled.
- `CometJsonExpressionSuite`: 8 native-path tests continue to pass on default `engine=rust`.
- `CometCodegenSuite`, `CometCodegenSourceSuite`, `CometStringExpressionSuite`, and `CometSqlFileTestSuite` continue to pass. `CometCodegenSourceSuite` has an updated test asserting the new CodegenFallback admission.

This PR was scaffolded with the project's brainstorming, writing-plans, and subagent-driven-development skills.
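For anyone trying the experimental path, the two settings named under Configs have to be set together. The sketch below only collects those two key/value pairs and renders them as `--conf` arguments for `spark-submit` or `spark-shell`; `JsonJvmConfSketch`, `settings`, and `asSubmitArgs` are hypothetical helper names, and it assumes Comet itself is already enabled in the session.

```scala
object JsonJvmConfSketch {
  // The two keys and values are the ones named in this PR description.
  val settings: Seq[(String, String)] = Seq(
    "spark.comet.exec.json.engine" -> "java",
    "spark.comet.exec.scalaUDF.codegen.enabled" -> "true"
  )

  /** Renders the settings as `--conf key=value` command-line arguments. */
  def asSubmitArgs: Seq[String] =
    settings.flatMap { case (key, value) => Seq("--conf", s"$key=$value") }
}
```

Leaving out the second setting while keeping `engine=java` would, per the description above, make the JSON operator fall back to Spark rather than use the codegen dispatcher.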