[SPARK-56977][SQL] RewriteNearestByJoin should respect joinType in the synthetic join#56023
[SPARK-56977][SQL] RewriteNearestByJoin should respect joinType in the synthetic join#56023zhidongqu-db wants to merge 3 commits into
Conversation
cloud-fan
left a comment
There was a problem hiding this comment.
Summary
Prior state and problem. RewriteNearestByJoin hardcoded the synthetic cross-join to LeftOuter regardless of the user's joinType, reasoning that LeftOuter ≡ Inner for unconditioned joins with non-empty right, and that Generate(outer=false) + the inferred size(matches) > 0 filter would mop up the empty-right INNER case. That preserved semantics but hid a planner cliff: SparkStrategies only picks CartesianProductExec when joinType.isInstanceOf[InnerLike] (SparkStrategies.scala:394), so an unconditioned LeftOuter always falls through to BroadcastNestedLoopJoinExec. For large right sides, that means broadcast OOMs or the planner is wedged.
Design approach. Pass the user's joinType (already constrained to Inner or LeftOuter at NearestByJoin.scala:66) into the synthetic Join, and keep deriving Generate.outer as joinType == LeftOuter. Externally observable semantics are unchanged; EXPLAIN no longer misrepresents the user's INNER as LeftOuter, and INNER queries become eligible for CartesianProductExec. CheckCartesianProducts (Optimizer.scala:2548) already matches both Inner | LeftOuter, so the spark.sql.crossJoin.enabled = false gate behaves identically.
Key design decision. Routing the user's joinType directly into the synthetic Join (rather than e.g. introducing an Inner-only fast path) keeps the rewrite a single shape and lets the existing optimizer machinery do its job: EliminateOuterJoin can opportunistically narrow LeftOuter to Inner when downstream operators reject NULL right rows; the cross-join check fires uniformly; the SparkStrategies build-side logic picks the right physical operator per join type (the golden file's BuildLeft, Inner vs. previous BuildRight, LeftOuter reflects that).
Findings below are minor -- non-blocking.
General finding -- SQL coverage for INNER NEAREST BY against an empty right side
inputs/join-nearest-by.sql:35-37 covers LEFT OUTER NEAREST BY against an empty right side, but there's no analogous INNER NEAREST BY case. Before this PR, INNER against empty right ran through a join that materialized a row per left + NULL right and was dropped downstream via the Generate-side filter. After this PR, INNER empties at the join (0 rows -> 0 groups -> 0 output), which is a structurally different path. A trivial users INNER JOIN (SELECT * FROM products WHERE false) ... APPROX NEAREST 1 BY ... asserting an empty result would lock that in end-to-end.
|
@cloud-fan Added test coverage for both comments |
dtenedor
left a comment
There was a problem hiding this comment.
LGTM, thanks for adding the extra test coverage requested.
|
LGTM, merging to master and 4.2 |
…e synthetic join ### What changes were proposed in this pull request? This PR changes `RewriteNearestByJoin` to construct its synthetic cross-join with the user's `joinType` (`Inner` or `LeftOuter`) instead of always using `LeftOuter`. The `Generate` operator's `outer` flag continues to be derived from `joinType == LeftOuter`, so the externally observable semantics are unchanged. ### Why are the changes needed? The original implementation hardcoded the synthetic join to `LeftOuter` and justified it on the grounds that `LEFT OUTER` and `INNER` are equivalent for an unconditioned join when the right side is non-empty, and `Generate(outer = false)` would drop unwanted rows for `INNER` when right is empty. That reasoning holds for correctness but has a major performance cost: - **`INNER NEAREST BY` cannot be planned as a Cartesian product.** Spark's strategy picks `CartesianProductExec` only for `Inner` joins with no condition; an unconditioned `LeftOuter` join falls back to `BroadcastNestedLoopJoin`, which tries to broadcast the right side. When the right relation is large, the broadcast either OOMs or exceeds `spark.sql.autoBroadcastJoinThreshold` and the planner is left with no good option. `CartesianProductExec` partitions both sides and streams pairs, so it scales naturally with right-side size. Respecting the user's `INNER` join type re-enables this strategy for the common `INNER NEAREST BY` case. - It also makes the EXPLAIN output misleading (shows `LeftOuter` even though the user wrote `INNER`). - For `INNER` with an empty right side, the old plan generates one row per left input and then filters them away via `Generate(outer = false)` and the `size(matches) > 0` filter -- extra work that respecting `joinType` avoids at the source. ### Does this PR introduce _any_ user-facing change? No change in query results. `EXPLAIN` output for `INNER NEAREST BY` queries now shows `Inner` rather than `LeftOuter` for the synthetic join node, and the physical plan for such queries can now use `CartesianProductExec` instead of `BroadcastNestedLoopJoin` when the right relation is too large to broadcast. ### How was this patch tested? - `RewriteNearestByJoinSuite`: `expectedRewrite` now takes a `joinType: JoinType` and the existing tests (similarity/distance x inner/leftouter, EXACT, boundary k, self-join, nondeterministic ranking) assert the synthetic join matches the user's join type. - Golden file `sql-tests/results/join-nearest-by.sql.out` ### Was this patch authored or co-authored using generative AI tooling? Coauthored-by: Claude Code (Opus 4.7), human-reviewed and tested Closes #56023 from zhidongqu-db/respect-nn-join-type. Authored-by: Zero Qu <zhidong.qu@databricks.com> Signed-off-by: Daniel Tenedorio <daniel.tenedorio@databricks.com> (cherry picked from commit 525caed) Signed-off-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>
…py cherry-pick prompts ### What changes were proposed in this pull request? When a committer manually types `branch-M.N` at the cherry-pick prompt while `branch-M.x` exists and has not yet received the commit, the script now surfaces the Upstream-First policy and offers to pick into both branches in one step (the policy-compliant default). The committer can still pick only `branch-M.N` if the commit is genuinely a `branch-M.N`-only maintenance bugfix, or abort. Implementation notes: - Split `cherry_pick` into `_do_cherry_pick` (fetch + cherry-pick + push) and `cherry_pick` (prompt + policy check). The policy wrapper returns a list of refs so the main loop can advance its remaining-branches list correctly when one prompt consumes two branches. - Replace the `branch_iter` iterator with a mutable `remaining_branches` list in the main cherry-pick loop, so picks consumed by the two-branch path are accounted for in the next prompt's default. - Add an `already_picked` parameter to `cherry_pick` so the policy check skips its prompt when `branch-M.x` is in the set of refs already touched this session (e.g. when the PR was merged into `branch-M.x` and the loop is now picking into `branch-M.N`). ### Why are the changes needed? The Upstream-First backporting policy (documented in the header comment of `dev/merge_spark_pr.py`) requires non-bugfix commits to flow through `branch-M.x` before reaching `branch-M.N`. The merge script already orders `branch-M.x` ahead of `branch-M.N` as the cherry-pick default. However, when a committer types `branch-M.N` at the prompt, the script silently proceeds and `branch-M.x` is never revisited. This has led to commits landing on `branch-4.2` but missing `branch-4.x`. Six such commits observed on the current branches (as of 2026-05-22): - SPARK-56700 (#55651) - SPARK-56676 (#55623) - SPARK-56838 (#55836) - SPARK-56650 (#55589) - SPARK-56856 (#55969) - SPARK-56977 (#56023) All six landed on master and `branch-4.2` but were not cherry-picked to `branch-4.x`, requiring follow-up backports. ### Does this PR introduce _any_ user-facing change? Yes for committers using `dev/merge_spark_pr.py`. When the typed cherry-pick target is `branch-M.N` and `branch-M.x` exists and is not yet picked, an additional prompt asks whether to pick into both. Accepting the default ("both") preserves prior behavior plus an extra cherry-pick to `branch-M.x`. No change when the committer accepts the default `branch-M.x` target, or when picking into `branch-M.x` first and `branch-M.N` second (the typical policy-compliant flow). ### How was this patch tested? - `python3 -m doctest dev/merge_spark_pr.py` passes (34/34, all pre-existing tests — none cover the new policy logic). - New `cherry_pick` policy logic was reviewed for behavior but **not exercised end-to-end**: actually running `merge_spark_pr.py` requires committer privileges and a live open PR to merge. Edge cases were traced by reading the code (PR target = master with manual branch-M.N entry; PR target = branch-M.x with default branch-M.N pick; multiple iterations after a two-branch pick). - Reviewers familiar with the merge flow are encouraged to verify behavior on first real use, especially the abort path and the interaction with manual conflict resolution inside `_do_cherry_pick`. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (Opus 4.7) Closes #56058 from viirya/infra-merge-script-upstream-first-policy. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
What changes were proposed in this pull request?
This PR changes
RewriteNearestByJointo construct its synthetic cross-join with the user'sjoinType(InnerorLeftOuter) instead of always usingLeftOuter. TheGenerateoperator'souterflag continues to be derived fromjoinType == LeftOuter, so the externally observable semantics are unchanged.Why are the changes needed?
The original implementation hardcoded the synthetic join to
LeftOuterand justified it on the grounds thatLEFT OUTERandINNERare equivalent for an unconditioned join when the right side is non-empty, andGenerate(outer = false)would drop unwanted rows forINNERwhen right is empty.That reasoning holds for correctness but has a major performance cost:
INNER NEAREST BYcannot be planned as a Cartesian product. Spark's strategy picksCartesianProductExeconly forInnerjoins with no condition; an unconditionedLeftOuterjoin falls back toBroadcastNestedLoopJoin, which tries to broadcast the right side. When the right relation is large, the broadcast either OOMs or exceedsspark.sql.autoBroadcastJoinThresholdand the planner is left with no good option.CartesianProductExecpartitions both sides and streams pairs, so it scales naturally with right-side size. Respecting the user'sINNERjoin type re-enables this strategy for the commonINNER NEAREST BYcase.LeftOutereven though the user wroteINNER).INNERwith an empty right side, the old plan generates one row per left input and then filters them away viaGenerate(outer = false)and thesize(matches) > 0filter -- extra work that respectingjoinTypeavoids at the source.Does this PR introduce any user-facing change?
No change in query results.
EXPLAINoutput forINNER NEAREST BYqueries now showsInnerrather thanLeftOuterfor the synthetic join node, and the physical plan for such queries can now useCartesianProductExecinstead ofBroadcastNestedLoopJoinwhen the right relation is too large to broadcast.How was this patch tested?
RewriteNearestByJoinSuite:expectedRewritenow takes ajoinType: JoinTypeand the existing tests (similarity/distance x inner/leftouter, EXACT, boundary k, self-join, nondeterministic ranking) assert the synthetic join matches the user's join type.sql-tests/results/join-nearest-by.sql.outWas this patch authored or co-authored using generative AI tooling?
Coauthored-by: Claude Code (Opus 4.7), human-reviewed and tested