Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -5313,6 +5313,49 @@
],
"sqlState" : "0A000"
},
"NEAREST_BY_JOIN" : {
"message" : [
"Invalid nearest-by join."
],
"subClass" : {
"EXACT_WITH_NONDETERMINISTIC_EXPRESSION" : {
"message" : [
"EXACT nearest-by join is incompatible with the nondeterministic ranking expression <expression>. Use APPROX, or replace the expression with a deterministic one."
]
},
"NON_ORDERABLE_RANKING_EXPRESSION" : {
"message" : [
"The ranking expression <expression> of type <type> is not orderable. Provide an expression that returns an orderable type, such as a numeric distance like abs(a.col - b.col) or a numeric similarity score."
]
},
"NUM_RESULTS_OUT_OF_RANGE" : {
"message" : [
"The number of results <numResults> must be between <min> and <max>. Update the literal in `APPROX NEAREST <numResults> BY ...` (or `EXACT NEAREST <numResults> BY ...`) to fall within that range."
]
},
"STREAMING_NOT_SUPPORTED" : {
"message" : [
"Nearest-by join is not supported with streaming DataFrames/Datasets."
]
},
"UNSUPPORTED_DIRECTION" : {
"message" : [
"Unsupported nearest-by join direction '<direction>'. Supported nearest-by join directions include: <supported>."
]
},
"UNSUPPORTED_JOIN_TYPE" : {
"message" : [
"Unsupported nearest-by join type <joinType>. Supported types: <supported>."
]
},
"UNSUPPORTED_MODE" : {
"message" : [
"Unsupported nearest-by join mode '<mode>'. Supported modes include: <supported>."
]
}
},
"sqlState" : "42604"
},
"NEGATIVE_SCALE_DISALLOWED" : {
"message" : [
"Negative scale is not allowed: '<scale>'. Set the config <sqlConf> to \"true\" to allow it."
Expand Down Expand Up @@ -7837,6 +7880,11 @@
"Referencing a lateral column alias <lca> in window expression <windowExpr>."
]
},
"LATERAL_JOIN_NEAREST_BY" : {
"message" : [
"LATERAL correlation with NEAREST BY clause."

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This covers explicit LATERAL JOIN. Do we care about lateral column alias usage for queries over the results of the nearest-neighbor join as well, or is that orthogonal?

]
},
"LATERAL_JOIN_USING" : {
"message" : [
"JOIN USING with LATERAL correlation."
Expand Down
5 changes: 5 additions & 0 deletions docs/sql-ref-ansi-compliance.md
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ Below is a list of all the keywords in Spark SQL.
|ANTI|non-reserved|strict-non-reserved|non-reserved|
|ANY|reserved|non-reserved|reserved|
|ANY_VALUE|non-reserved|non-reserved|non-reserved|
|APPROX|non-reserved|non-reserved|non-reserved|
|ARCHIVE|non-reserved|non-reserved|non-reserved|
|ARRAY|non-reserved|non-reserved|reserved|
|AS|reserved|non-reserved|reserved|
Expand Down Expand Up @@ -515,6 +516,7 @@ Below is a list of all the keywords in Spark SQL.
|DFS|non-reserved|non-reserved|non-reserved|
|DIRECTORIES|non-reserved|non-reserved|non-reserved|
|DIRECTORY|non-reserved|non-reserved|non-reserved|
|DISTANCE|non-reserved|non-reserved|non-reserved|
|DISTINCT|reserved|non-reserved|reserved|
|DISTRIBUTE|non-reserved|non-reserved|non-reserved|
|DIV|non-reserved|non-reserved|not a keyword|
Expand All @@ -528,6 +530,7 @@ Below is a list of all the keywords in Spark SQL.
|ESCAPE|reserved|non-reserved|reserved|
|ESCAPED|non-reserved|non-reserved|non-reserved|
|EVOLUTION|non-reserved|non-reserved|non-reserved|
|EXACT|non-reserved|non-reserved|non-reserved|
|EXCEPT|reserved|strict-non-reserved|reserved|
|EXCHANGE|non-reserved|non-reserved|non-reserved|
|EXCLUDE|non-reserved|non-reserved|non-reserved|
Expand Down Expand Up @@ -648,6 +651,7 @@ Below is a list of all the keywords in Spark SQL.
|NANOSECOND|non-reserved|non-reserved|non-reserved|
|NANOSECONDS|non-reserved|non-reserved|non-reserved|
|NATURAL|reserved|strict-non-reserved|reserved|
|NEAREST|non-reserved|non-reserved|non-reserved|
|NEXT|non-reserved|non-reserved|non-reserved|
|NO|non-reserved|non-reserved|reserved|
|NONE|non-reserved|non-reserved|reserved|
Expand Down Expand Up @@ -738,6 +742,7 @@ Below is a list of all the keywords in Spark SQL.
|SETS|non-reserved|non-reserved|non-reserved|
|SHORT|non-reserved|non-reserved|non-reserved|
|SHOW|non-reserved|non-reserved|non-reserved|
|SIMILARITY|non-reserved|non-reserved|non-reserved|
|SINGLE|non-reserved|non-reserved|non-reserved|
|SKEWED|non-reserved|non-reserved|non-reserved|
|SMALLINT|non-reserved|non-reserved|reserved|
Expand Down
26 changes: 25 additions & 1 deletion docs/sql-ref-syntax-qry-select-join.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ A SQL join is used to combine rows from two relations based on join criteria. Th
### Syntax

```sql
relation { [ join_type ] JOIN [ LATERAL ] relation [ join_criteria ] | NATURAL join_type JOIN [ LATERAL ] relation }
relation { [ join_type ] JOIN [ LATERAL ] relation [ join_criteria | nearest_by_clause ] | NATURAL join_type JOIN [ LATERAL ] relation }
```

### Parameters
Expand All @@ -53,6 +53,30 @@ relation { [ join_type ] JOIN [ LATERAL ] relation [ join_criteria ] | NATURAL j

Specifies an expression with a return type of boolean.

* **nearest_by_clause**

Specifies a nearest-by top-K ranking join. For each row on the left (query side), returns up to `num_results` rows from the right (base side), ranked by `ranking_expression`. Only `INNER` (the default) and `LEFT OUTER` join types are supported with this clause, and it cannot be combined with `LATERAL`.

**Syntax:** `{ APPROX | EXACT } NEAREST [ num_results ] BY { DISTANCE | SIMILARITY } ranking_expression`

`APPROX | EXACT`

Controls the search algorithm contract. `APPROX` allows the optimizer to use faster approximate strategies (such as indexed nearest-neighbor search when available). `EXACT` forces brute-force evaluation and requires `ranking_expression` to be deterministic.

`num_results`

A positive integer literal between 1 and 100000 that limits the number of matches per left row. Defaults to 1 when omitted.

`DISTANCE | SIMILARITY`

`DISTANCE` ranks rows by smallest value of `ranking_expression` first. `SIMILARITY` ranks rows by largest value first. Matched right-side rows are emitted in best-first order: smallest ranking value first under `DISTANCE`, largest first under `SIMILARITY`. (Downstream operators may reorder; add an explicit `ORDER BY` if you need to lock in the ordering.)

`ranking_expression`

A scalar expression that returns an orderable type. Must be deterministic with `EXACT`; may be nondeterministic with `APPROX` (e.g., `rand()` for randomized tie-breaking). The expression is evaluated once per (left, right) pair on the brute-force path, so avoid expensive or side-effecting UDFs in ranking expressions.

**Performance note.** The current implementation evaluates the full cross-product of the left and right sides and bounds memory per left row by `num_results`. Per-query work is `O(|left| × |right| × log num_results)`. Index-backed approximate strategies (transparent to `APPROX` queries) are planned in a future release; until then, pre-filter the right side (e.g. via a subquery) when it is large.

### Join Types

#### **Inner Join**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ AND: 'AND';
ANTI: 'ANTI';
ANY: 'ANY';
ANY_VALUE: 'ANY_VALUE';
APPROX: 'APPROX';
ARCHIVE: 'ARCHIVE';
ARRAY: 'ARRAY' {incComplexTypeLevelCounter();};
AS: 'AS';
Expand Down Expand Up @@ -234,6 +235,7 @@ DETERMINISTIC: 'DETERMINISTIC';
DFS: 'DFS';
DIRECTORIES: 'DIRECTORIES';
DIRECTORY: 'DIRECTORY';
DISTANCE: 'DISTANCE';
DISTINCT: 'DISTINCT';
DISTRIBUTE: 'DISTRIBUTE';
DIV: 'DIV';
Expand All @@ -247,6 +249,7 @@ ENFORCED: 'ENFORCED';
ESCAPE: 'ESCAPE';
ESCAPED: 'ESCAPED';
EVOLUTION: 'EVOLUTION';
EXACT: 'EXACT';
EXCEPT: 'EXCEPT';
EXCHANGE: 'EXCHANGE';
EXCLUDE: 'EXCLUDE';
Expand Down Expand Up @@ -366,6 +369,7 @@ NAMESPACES: 'NAMESPACES';
NANOSECOND: 'NANOSECOND';
NANOSECONDS: 'NANOSECONDS';
NATURAL: 'NATURAL';
NEAREST: 'NEAREST';
NEXT: 'NEXT';
NO: 'NO';
NONE: 'NONE';
Expand Down Expand Up @@ -456,6 +460,7 @@ SETMINUS: 'MINUS';
SETS: 'SETS';
SHORT: 'SHORT';
SHOW: 'SHOW';
SIMILARITY: 'SIMILARITY';
SINGLE: 'SINGLE';
SKEWED: 'SKEWED';
SMALLINT: 'SMALLINT';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ relationExtension
;

joinRelation
: (joinType) JOIN LATERAL? right=relationPrimary joinCriteria?
: (joinType) JOIN LATERAL? right=relationPrimary (joinCriteria | nearestByClause)?
| NATURAL joinType JOIN LATERAL? right=relationPrimary
;

Expand All @@ -1068,6 +1068,10 @@ joinCriteria
| USING identifierList
;

nearestByClause
: (APPROX | EXACT) NEAREST num=INTEGER_VALUE? BY (DISTANCE | SIMILARITY) expression
;

sample
: TABLESAMPLE LEFT_PAREN sampleMethod? RIGHT_PAREN (REPEATABLE LEFT_PAREN seed=integerValue RIGHT_PAREN)?
;
Expand Down Expand Up @@ -1930,6 +1934,7 @@ ansiNonReserved
| ANALYZE
| ANTI
| ANY_VALUE
| APPROX
| ARCHIVE
| ARRAY
| ASC
Expand Down Expand Up @@ -2006,6 +2011,7 @@ ansiNonReserved
| DFS
| DIRECTORIES
| DIRECTORY
| DISTANCE
| DISTRIBUTE
| DIV
| DO
Expand All @@ -2015,6 +2021,7 @@ ansiNonReserved
| ENFORCED
| ESCAPED
| EVOLUTION
| EXACT
| EXCHANGE
| EXCLUDE
| EXCLUSIVE
Expand Down Expand Up @@ -2112,6 +2119,7 @@ ansiNonReserved
| NAMESPACES
| NANOSECOND
| NANOSECONDS
| NEAREST
| NEXT
| NO
| NONE
Expand Down Expand Up @@ -2187,6 +2195,7 @@ ansiNonReserved
| SETS
| SHORT
| SHOW
| SIMILARITY
| SINGLE
| SKEWED
| SMALLINT
Expand Down Expand Up @@ -2303,6 +2312,7 @@ nonReserved
| AND
| ANY
| ANY_VALUE
| APPROX
| ARCHIVE
| ARRAY
| AS
Expand Down Expand Up @@ -2398,6 +2408,7 @@ nonReserved
| DFS
| DIRECTORIES
| DIRECTORY
| DISTANCE
| DISTINCT
| DISTRIBUTE
| DIV
Expand All @@ -2411,6 +2422,7 @@ nonReserved
| ESCAPE
| ESCAPED
| EVOLUTION
| EXACT
| EXCHANGE
| EXCLUDE
| EXCLUSIVE
Expand Down Expand Up @@ -2523,6 +2535,7 @@ nonReserved
| NAMESPACES
| NANOSECOND
| NANOSECONDS
| NEAREST
| NEXT
| NO
| NONE
Expand Down Expand Up @@ -2609,6 +2622,7 @@ nonReserved
| SETS
| SHORT
| SHOW
| SIMILARITY
| SINGLE
| SKEWED
| SMALLINT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,33 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
ctx)
}

/**
 * Builds the parse error for a NEAREST BY clause used together with LATERAL correlation.
 *
 * @param ctx parser rule context forwarded to the exception
 * @return a `ParseException` carrying error class
 *         `UNSUPPORTED_FEATURE.LATERAL_JOIN_NEAREST_BY` with no message parameters
 */
def nearestByJoinWithLateralUnsupportedError(ctx: ParserRuleContext): Throwable = {
  // This error condition's message template takes no placeholders.
  val noParams = Map.empty[String, String]
  new ParseException(
    errorClass = "UNSUPPORTED_FEATURE.LATERAL_JOIN_NEAREST_BY",
    messageParameters = noParams,
    ctx)
}

def unsupportedNearestByJoinTypeError(
ctx: ParserRuleContext,
joinType: String,
supported: String): Throwable = {
new ParseException(
errorClass = "NEAREST_BY_JOIN.UNSUPPORTED_JOIN_TYPE",
messageParameters = Map("joinType" -> toSQLStmt(joinType), "supported" -> supported),
ctx)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use NearestByJoinType.supportedDisplay instead of hardcoding the same string in two places — that constant exists precisely so the SQL and DataFrame paths stay in sync.

Suggested change
ctx)
Map("joinType" -> toSQLStmt(joinType), "supported" -> NearestByJoinType.supportedDisplay),

(Will need a NearestByJoinType import.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryParsingErrors lives in sql/api, which can't depend on sql/catalyst (where NearestByJoinType is defined). The constant is now passed through as a parameter from AstBuilder rather than being referenced directly here. I hope that is okay?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move NearestByJoinType to sql/api?

}

/**
 * Builds the parse error for a nearest-by join whose number of results is out of range.
 *
 * @param ctx parser rule context forwarded to the exception
 * @param numResults the offending value, reported verbatim as `numResults`
 * @param max the upper bound, reported as `max`; the lower bound is always reported as "1"
 * @return a `ParseException` carrying error class `NEAREST_BY_JOIN.NUM_RESULTS_OUT_OF_RANGE`
 */
def nearestByJoinNumResultsOutOfRangeError(
    ctx: ParserRuleContext,
    numResults: String,
    max: Int): Throwable = {
  val lowerBound = "1"
  val params = Map(
    "numResults" -> numResults,
    "min" -> lowerBound,
    "max" -> max.toString)
  new ParseException(
    errorClass = "NEAREST_BY_JOIN.NUM_RESULTS_OUT_OF_RANGE",
    messageParameters = params,
    ctx)
}

def repetitiveWindowDefinitionError(name: String, ctx: WindowClauseContext): Throwable = {
new ParseException(
errorClass = "INVALID_SQL_SYNTAX.REPETITIVE_WINDOW_DEFINITION",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,34 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
messageParameters = Map.empty)
}

// Reject streaming inputs early. The optimizer rewrite is built around an
// unconditioned cross-product fed into a global `Aggregate` keyed by a per-row
// identifier (`__qid`). That shape doesn't compose cleanly with structured-streaming
// semantics: a stateful aggregate keyed by a freshly-generated identifier accumulates
// state indefinitely (every batch creates new keys, old keys never match again) and a
// cross-product against a streaming right side has no bounded state model today.
// Failing at analysis time is clearer than letting either fail at runtime. Streaming
// support is tracked as a follow-up; resolving it likely comes from a different
// grouping strategy or a dedicated physical operator.
case j: NearestByJoin if j.isStreaming =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the streaming guard: the current comment frames the issue as "MID is per-batch only", but MID itself is fine within a batch - the real blocker is that the rewrite uses a global Aggregate keyed by __qid, which Spark turns into a stateful streaming aggregation. Across micro-batches MID values restart, so state entries from old batches get merged with new rows for the same __qid, producing wrong top-K results.

The MID is just an implementation detail (we only need a per-row group key), so streaming support doesn't have to wait on a streaming-aware MID.

A few directions for the follow-up:

  1. Group by struct(left.*) instead of MID. Pure Catalyst change - every distinct left row is its own group. Need to handle duplicate left rows (carry a count, expand at the end) and bail out on map-typed left columns. Lowest-risk path.
  2. Dedicated physical operator that does per-row top-K against a broadcast/streaming right side, no cross-join + aggregate. This is also the operator the SPIP calls out as future work for performance, so it solves two problems at once.
  3. Batch-scoped aggregate (include batch_id in the key, or a non-incremental aggregate variant) - doable but tangles us up with streaming state/watermark semantics, not worth it IMO.

Happy to leave the guard in this PR; just suggesting we update the comment to reflect the actual reason so future-us isn't misled.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhidongqu-db Thanks for the suggestion. I will try to address this in a follow-up.

j.failAnalysis(
errorClass = "NEAREST_BY_JOIN.STREAMING_NOT_SUPPORTED",
messageParameters = Map.empty)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the CheckCartesianProducts skip was the right call, but the user-visible failure now leaks rewrite internals. With spark.sql.crossJoin.enabled = false, a JOIN ... APPROX NEAREST 1 BY ... query fails with _LEGACY_ERROR_TEMP_1211:

Detected implicit cartesian product for LEFT OUTER join between logical plans
Project [... uuid(Some(<random>)) AS __qid#x] +- ...
and ...
Join condition is missing or trivial.
Either: use the CROSS JOIN syntax ..., or: enable implicit cartesian products by setting spark.sql.crossJoin.enabled=true.

The user wrote JOIN ... APPROX NEAREST 1 BY ... and gets back: LEFT OUTER (a join type they didn't write — the rewrite's synthetic outer), __qid and uuid(Some(...)) (purely internal), and "use the CROSS JOIN syntax" advice that doesn't apply to NEAREST BY. Only the conf-flip suggestion is actionable.

The new test golden output (sql/core/src/test/resources/sql-tests/results/join-nearest-by.sql.out, around the new _LEGACY_ERROR_TEMP_1211 block) makes this concrete — that's exactly what the user sees.

Could this be gated here, alongside STREAMING_NOT_SUPPORTED, with a NEAREST BY-specific arm — e.g.:

case j: NearestByJoin if !conf.crossJoinEnabled =>
  j.failAnalysis(
    errorClass = "NEAREST_BY_JOIN.CROSS_JOIN_NOT_ENABLED",
    messageParameters = Map.empty)

plus a new sub-condition explaining that NEAREST BY is implemented as a bounded cross-product and pointing the user at spark.sql.crossJoin.enabled = true. Since CheckAnalysis runs before the optimizer rewrite, the synthetic plan whose internals leak into _LEGACY_ERROR_TEMP_1211 would never be produced for this case.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can have a follow-up on this

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gengliangwang Thanks !! Will follow-up on this.


case j @ NearestByJoin(_, _, _, _, _, rankingExpression, _)
if !RowOrdering.isOrderable(rankingExpression.dataType) =>
j.failAnalysis(
errorClass = "NEAREST_BY_JOIN.NON_ORDERABLE_RANKING_EXPRESSION",
messageParameters = Map(
"expression" -> toSQLExpr(rankingExpression),
"type" -> toSQLType(rankingExpression.dataType)))

case j @ NearestByJoin(_, _, _, false, _, rankingExpression, _)
if !rankingExpression.deterministic =>
j.failAnalysis(

@sigmod sigmod May 4, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to fail this case?
We still call the result of the following query "exact results" rather than "approximate results"?

SELECT any_value(t.v)
FROM t

I view them as

  • exact results: can be deterministic or non-deterministic, but deliver a well-defined semantics w.r.t. input/output.
  • approx results: there's no well-defined semantics w.r.t. input/output.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this depends on how we define the EXACT semantics here. We explicitly mentioned in the SPIP that EXACT with a non-deterministic ordering expr should fail. The intention was to have the EXACT keyword express the semantics of deterministic ordering given a deterministic input and scoring expr. If the scoring expr is not deterministic in the first place (e.g. LLM-generated scores), the query would fail and the user should use APPROX, where the keyword explicitly does not imply deterministic results.

errorClass = "NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION",
messageParameters = Map("expression" -> toSQLExpr(rankingExpression)))

case a: Aggregate =>
a.groupingExpressions.foreach(
expression =>
Expand Down Expand Up @@ -949,6 +977,17 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
context = j.origin.getQueryContext,
summary = j.origin.context.summary)

case j: NearestByJoin if !j.duplicateResolved =>
val conflictingAttributes =
j.left.outputSet.intersect(j.right.outputSet).map(toSQLExpr(_)).mkString(", ")
throw SparkException.internalError(
msg = s"""
|Failure when resolving conflicting references in ${j.nodeName}:
|${planToString(plan)}
|Conflicting attributes: $conflictingAttributes.""".stripMargin,
context = j.origin.getQueryContext,
summary = j.origin.context.summary)

// TODO: although map type is not orderable, technically map type should be able to be
// used in equality comparison, remove this type check once we support it.
case o if mapColumnInSetOperation(o).isDefined =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
def noMissingInput(p: LogicalPlan) = !p.exists(_.missingInput.nonEmpty)

newPlan.resolveOperatorsUpWithPruning(
_.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, UNION, COMMAND),
_.containsAnyPattern(
JOIN, LATERAL_JOIN, AS_OF_JOIN, NEAREST_BY_JOIN, INTERSECT, EXCEPT, UNION, COMMAND),
ruleId) {
case p: LogicalPlan if !p.childrenResolved => p
// To resolve duplicate expression IDs for Join.
Expand All @@ -50,6 +51,10 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
case j @ AsOfJoin(left, right, _, _, _, _, _)
if !j.duplicateResolved && noMissingInput(right) =>
j.copy(right = dedupRight(left, right))
// Resolve duplicate output for NearestByJoin.
case j @ NearestByJoin(left, right, _, _, _, _, _)
if !j.duplicateResolved && noMissingInput(right) =>
j.copy(right = dedupRight(left, right))
// intersect/except will be rewritten to join at the beginning of optimizer. Here we need to
// deduplicate the right side plan, so that we won't produce an invalid self-join later.
case i @ Intersect(left, right, _) if !i.duplicateResolved && noMissingInput(right) =>
Expand Down
Loading