[SPARK-47007][SQL][PYTHON][R][CONNECT] Add the map_sort function#45069

Closed
stefankandic wants to merge 22 commits into apache:master from stefankandic:SPARK-47007

stefankandic commented Feb 8, 2024

Contributor

What changes were proposed in this pull request?

Adding a new function map_sort to:

  • Scala API
  • Python API
  • R API
  • Spark Connect Scala Client
  • Spark Connect Python Client

Why are the changes needed?

To support GROUP BY on map types, we first need to be able to sort maps by their keys.
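The motivation can be illustrated with a minimal plain-Python sketch (this is not Spark code). It assumes a map is stored as an ordered list of key/value entries, so two maps with the same entries in a different physical order only compare equal after their entries are sorted by key. The names `normalize` and `group_count` are hypothetical and exist only for this illustration.

```python
from collections import Counter


def normalize(entries):
    """Return the map entries as a tuple sorted by key (a canonical form)."""
    return tuple(sorted(entries, key=lambda kv: kv[0]))


def group_count(rows):
    """Count rows per map, treating maps as equal regardless of entry order."""
    return Counter(normalize(row) for row in rows)


rows = [
    [(3, "c"), (1, "a")],
    [(1, "a"), (3, "c")],  # same map as above, different physical order
    [(2, "b")],
]
counts = group_count(rows)
print(counts)
```

Without the normalization step, the first two rows would land in different groups even though they describe the same map.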

Does this PR introduce any user-facing change?

Yes, a new function: map_sort.

How was this patch tested?

With new unit tests.

Was this patch authored or co-authored using generative AI tooling?

No

github-actions Bot added the SQL label Feb 8, 2024
HyukjinKwon changed the title from "[SPARK-47007] SortMap function" to "[SPARK-47007][SQL] SortMap function" Feb 10, 2024
@LuciferYang

Contributor

You should re-run SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly *ExpressionsSchemaSuite" to regenerate the golden files.

stefankandic changed the title from "[SPARK-47007][SQL] SortMap function" to "[SPARK-47007][SQL] MapSort function" Feb 28, 2024
zhengruifeng changed the title from "[SPARK-47007][SQL] MapSort function" to "[SPARK-47007][SQL][PYTHON][R][CONNECT] MapSort function" Mar 7, 2024
zhengruifeng commented Mar 7, 2024

Contributor

Updated the title since it also touches Python/R/Connect.



@_try_remote_functions
def map_sort(col: "ColumnOrName", asc: bool = True) -> Column:

Member

let's also put it in the docs at python/docs/source/reference/pyspark.sql/functions.rst.

* @since 4.0.0
*/
def map_sort(e: Column): Column = map_sort(e, asc = true)
// TODO: add test for this

Member

Let's remove this.

@HyukjinKwon

Member

cc @cloud-fan too

DataTypeMismatch(
errorSubClass = "UNEXPECTED_INPUT_TYPE",
messageParameters = Map(
"paramIndex" -> "2",

Member

Please, use ordinalNumber(1). See #45177

DataTypeMismatch(
errorSubClass = "UNEXPECTED_INPUT_TYPE",
messageParameters = Map(
"paramIndex" -> "1",

Member

ditto

errorSubClass = "UNEXPECTED_INPUT_TYPE",
messageParameters = Map(
"paramIndex" -> "1",
"requiredType" -> toSQLType(ArrayType),

Member

array? not the map type?

override def checkInputDataTypes(): TypeCheckResult = base.dataType match {
case MapType(kt, _, _) if RowOrdering.isOrderable(kt) =>
ascendingOrder match {
case Literal(_: Boolean, BooleanType) =>

Member

Is requiring a Literal too strict? How about accepting any foldable expression?

Contributor

This follows the convention of SortArray.checkInputDataTypes, which also requires a Literal.

Comment on lines +964 to +976
val sortedKeys = Array
.tabulate(numElements)(i => (keys.get(i, keyType).asInstanceOf[Any], i))
.sortBy(_._1)(ordering)

val newKeys = new Array[Any](numElements)
val newValues = new Array[Any](numElements)

sortedKeys.zipWithIndex.foreach { case (elem, index) =>
newKeys(index) = keys.get(elem._2, keyType)
newValues(index) = values.get(elem._2, valueType)
}

new ArrayBasedMapData(new GenericArrayData(newKeys), new GenericArrayData(newValues))

Member

Just wondering, wouldn't this be easier?

    val sorted = Array
      .tabulate(numElements)(i => (keys.get(i, keyType), values.get(i, valueType)))
      .sortBy(_._1)(ordering)

    ArrayBasedMapData(sorted.map(_._1), sorted.map(_._2))

Or is there another reason, such as a simpler Java implementation?

Contributor

I just wanted to replace the existing tree-based sorting.
I have now replaced it with this shortened implementation.
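The shortened approach discussed in this thread (pair each key with its value, sort the pairs by key, then split them back into two arrays) can be sketched in plain Python. This is only an illustration of the idea, not the Spark implementation: the function name `sort_map_arrays` is hypothetical, and parallel Python lists stand in for the key and value arrays of the map.

```python
def sort_map_arrays(keys, values, ascending=True):
    """Sort parallel key/value lists by key and return new parallel lists."""
    # Pair each key with its value so the value travels with its key.
    pairs = sorted(zip(keys, values), key=lambda kv: kv[0], reverse=not ascending)
    # Split the sorted pairs back into separate key and value lists.
    new_keys = [k for k, _ in pairs]
    new_values = [v for _, v in pairs]
    return new_keys, new_values


asc = sort_map_arrays([3, 1, 2], ["c", "a", "b"])
desc = sort_map_arrays([3, 1, 2], ["c", "a", "b"], ascending=False)
print(asc)
print(desc)
```

Carrying the values along with the keys avoids the second pass that looks values up by their original index.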


@ExpressionDescription(
usage = """
_FUNC_(map[, ascendingOrder]) - Sorts the input map in ascending or descending order

Member

Could you leave a high-level description of the expression here, and add the arguments with detailed descriptions of map and ascendingOrder?

Since the expression focuses on sorting, please also describe the characteristics of the sort algorithm, such as whether it is stable.

)
}

test("map_sort function") {

Member

Could you cover more corner cases like:

  1. empty map
  2. duplicate keys, see the config spark.sql.mapKeyDedupPolicy
  3. null keys
  4. check the error class INVALID_ORDERING_TYPE

Contributor

Updated map_sort tests

Comment thread sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala Outdated
exception = intercept[SparkRuntimeException] {
sql("SELECT map_sort(map(1, 1, 2, 2, 1, 1))").collect()
},
errorClass = "DUPLICATED_MAP_KEY",

Member

What's the behaviour when spark.sql.mapKeyDedupPolicy is LAST_WIN?

Contributor

It is my understanding that MAP_KEY_DEDUP_POLICY only applies to map creation. map_sort accepts maps that have already been created, so the policy is applied before the input even reaches the logic of this function (?)

Member

ok, let's remove this test.
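The point made in this thread, that key deduplication happens when a map is built and not when it is sorted, can be modeled with a small plain-Python sketch. It is an assumption-laden illustration rather than Spark code: `build_map` is a hypothetical helper, and only the two policy names mentioned for spark.sql.mapKeyDedupPolicy (EXCEPTION and LAST_WIN) are modeled.

```python
def build_map(pairs, dedup_policy="EXCEPTION"):
    """Build a dict from key/value pairs, applying a dedup policy at creation."""
    if dedup_policy not in ("EXCEPTION", "LAST_WIN"):
        raise ValueError(f"unknown policy: {dedup_policy!r}")
    result = {}
    for key, value in pairs:
        if key in result and dedup_policy == "EXCEPTION":
            raise ValueError(f"duplicated map key: {key!r}")
        result[key] = value  # under LAST_WIN the later value overwrites
    return result


def sort_built_map(m):
    """Sort an already-built map by key; it can never see duplicate keys."""
    return dict(sorted(m.items()))


deduped = build_map([(2, "b"), (1, "a"), (1, "z")], dedup_policy="LAST_WIN")
print(sort_built_map(deduped))
```

In this model the sorting step never needs its own duplicate-key handling, which matches the reasoning for dropping the test.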

usage = """
_FUNC_(map[, ascendingOrder]) - Sorts the input map in ascending or descending order
according to the natural ordering of the map keys. The sorting algorithm used is
an adaptive, stable and iterative merge sort algorithm. If the input map is empty,

Member

No need to expose the implementation details like merge-sort. It is enough to mention user-visible behavior like stable.

"""
Arguments:
* map - an expression. The map that will be sorted.
* ascendingOrder - an expression. The ordering in which the map will be sorted.

Member

All arguments are expressions in general. Could you mention that it is a boolean parameter, where true means ascending order and false means descending order?

Contributor

I was following the template of the Sequence ExpressionDescription, which states "- an expression".

Restructured the docs a bit now.

stevomitric and others added 3 commits March 18, 2024 16:08
…sSuite.scala

Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
…sSuite.scala

Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
}

override def nullSafeEval(array: Any, ascending: Any): Any = {
// put keys and their respective indices inside a tuple

Member

This comment is no longer relevant.

val boxedKeyType = CodeGenerator.boxedType(keyType)
val javaKeyType = CodeGenerator.javaType(keyType)

val simpleEntryType = s"java.util.AbstractMap.SimpleEntry<$boxedKeyType, Integer>"

Member

Could you align this implementation with the non-codegen one and store values instead of indexes?

Comment thread python/pyspark/sql/functions/builtin.py Outdated

>>> import pyspark.sql.functions as sf
>>> df = spark.sql("SELECT map(3, 'c', 1, 'a', 2, 'b') as data")
>>> df.select(sf.map_sort(df.data, False)).show()

Contributor

ditto

MaxGekk left a comment

Member

@stevomitric @stefankandic Could you update the PR's description and title, and fix the function and expression names according to your changes:

Function:

map_sort

Expression:

case class MapSort

stefankandic changed the title from "[SPARK-47007][SQL][PYTHON][R][CONNECT] MapSort function" to "[SPARK-47007][SQL][PYTHON][R][CONNECT] map_sort function" Mar 19, 2024
Comment thread python/pyspark/sql/functions/builtin.py Outdated
+--------------------+
|map_sort(data, true)|
+--------------------+
|{1 -> a, 2 -> b, ...|

Member

If truncate = False, why is the output truncated?

I have double checked locally:

>>> df.select(sf.map_sort(df.data)).show(truncate=False)
+------------------------+
|map_sort(data, true)    |
+------------------------+
|{1 -> a, 2 -> b, 3 -> c}|
+------------------------+
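The truncated cell in the docstring is consistent with the default 20-character truncation of show(). The following plain-Python sketch models that rule as I understand it (cells longer than the limit are cut to limit minus 3 characters plus "..."); `truncate_cell` is a hypothetical helper for illustration, not a PySpark function, and treating 0 as "no truncation" is an assumption about how truncate=False is handled.

```python
def truncate_cell(text, truncate=20):
    """Model of how a cell is shortened for display; 0 disables truncation."""
    if truncate > 0 and len(text) > truncate:
        if truncate < 4:
            # Too short to fit an ellipsis, so just cut the text.
            return text[:truncate]
        return text[: truncate - 3] + "..."
    return text


cell = "{1 -> a, 2 -> b, 3 -> c}"
print(truncate_cell(cell))      # -> {1 -> a, 2 -> b, ...
print(truncate_cell(cell, 0))   # -> {1 -> a, 2 -> b, 3 -> c}
```

Under this model, an example that passes truncate=False should show the full 24-character map, as in the locally verified output above.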

Comment thread python/pyspark/sql/functions/builtin.py Outdated
+---------------------+
|map_sort(data, false)|
+---------------------+
| {3 -> c, 2 -> b, ...|

Member

Probably, need to fix the output.

MaxGekk changed the title from "[SPARK-47007][SQL][PYTHON][R][CONNECT] map_sort function" to "[SPARK-47007][SQL][PYTHON][R][CONNECT] Add the map_sort function" Mar 19, 2024
"inputType" -> toSQLType(ascendingOrder.dataType))
)
}
case MapType(_, _, _) =>

Member

Let's not depend on the number of parameters in MapType:

Suggested change
- case MapType(_, _, _) =>
+ case _: MapType =>

See https://github.com/databricks/scala-style-guide?tab=readme-ov-file#pattern-matching
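The concern behind this suggestion can be shown with a rough Python analogy (the review itself is about Scala pattern matching). Positional destructuring depends on how many fields a type has, while a type check plus access by name does not. The classes `MapTypeV1` and `MapTypeV2` and the `extra` field are hypothetical and exist only for this sketch.

```python
from dataclasses import astuple, dataclass


@dataclass
class MapTypeV1:
    key_type: str
    value_type: str
    value_contains_null: bool


@dataclass
class MapTypeV2(MapTypeV1):
    extra: str = "hypothetical new field"


def key_type_by_position(m):
    # Depends on the type having exactly three fields.
    key_type, _, _ = astuple(m)
    return key_type


def key_type_by_name(m):
    # Only depends on the type and on the field name.
    if not isinstance(m, MapTypeV1):
        raise TypeError("expected a map type")
    return m.key_type


old = MapTypeV1("int", "string", True)
new = MapTypeV2("int", "string", True)
print(key_type_by_name(old), key_type_by_name(new))
```

Calling `key_type_by_position(new)` raises a ValueError because the extra field breaks the three-way unpacking, whereas the by-name version keeps working.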

override def dataType: DataType = base.dataType

override def checkInputDataTypes(): TypeCheckResult = base.dataType match {
case MapType(kt, _, _) if RowOrdering.isOrderable(kt) =>

Member

Suggested change
- case MapType(kt, _, _) if RowOrdering.isOrderable(kt) =>
+ case m: MapType if RowOrdering.isOrderable(m.keyType) =>

val c = ctx.freshName("c")
val newKeys = ctx.freshName("newKeys")
val newValues = ctx.freshName("newValues")
val originalIndex = ctx.freshName("originalIndex")

Member

Is it used somewhere?

MaxGekk commented Mar 20, 2024

Member

+1, LGTM. Merging to master.
Thank you, @stevomitric @stefankandic and @HyukjinKwon @zhengruifeng for review.

MaxGekk closed this in 747846b Mar 20, 2024
@cloud-fan

Contributor

Sorry I missed this. Why do we add this public function? Do other systems have it? To support GROUP BY map type, an internal MapSort expression is sufficient.

MaxGekk commented Mar 20, 2024

Member

Do other systems have it?

@stevomitric @stefankandic Could you check other systems, please?

@cloud-fan

Contributor

I can't find it in other systems, and it does not make sense as map elements are order-less. I'm reverting it, please re-submit it without exposing the function publicly.

@dongjoon-hyun

Member

+1 for Wenchen's decision. Thank you for reverting.

@stevomitric

Contributor

I created a new PR that does not add the public map_sort function - @MaxGekk @cloud-fan
