Skip to content

Commit 0d06aa5

Browse files
committed
docs(udf): add user-guide page and runnable AddOne example
1 parent 086cba7 commit 0d06aa5

3 files changed

Lines changed: 180 additions & 0 deletions

File tree

docs/source/user-guide/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ sessioncontext
3737
dataframe
3838
parquet
3939
proto-plans
40+
scalar-udf
4041
api-reference
4142
```
4243

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# Scalar UDFs
21+
22+
A scalar UDF is a Java-implemented SQL function that operates on one row at a
23+
time, expressed in vectorised form: each invocation receives a batch of input
24+
columns and returns a single output column of the same length.
25+
26+
## Implement
27+
28+
Implement the `ScalarUdf` interface:
29+
30+
```java
31+
import java.util.List;
32+
import org.apache.arrow.memory.BufferAllocator;
33+
import org.apache.arrow.vector.FieldVector;
34+
import org.apache.arrow.vector.IntVector;
35+
import org.apache.datafusion.ScalarUdf;
36+
37+
public final class AddOne implements ScalarUdf {
38+
@Override
39+
public FieldVector evaluate(BufferAllocator allocator, List<FieldVector> args) {
40+
IntVector in = (IntVector) args.get(0);
41+
IntVector out = new IntVector("add_one", allocator);
42+
out.allocateNew(in.getValueCount());
43+
for (int i = 0; i < in.getValueCount(); i++) {
44+
if (in.isNull(i)) out.setNull(i);
45+
else out.set(i, in.get(i) + 1);
46+
}
47+
out.setValueCount(in.getValueCount());
48+
return out;
49+
}
50+
}
51+
```
52+
53+
Allocate any new vectors — including the result — from the supplied
54+
`BufferAllocator`. The input vectors are read-only views; do not close them.
55+
Ownership of the returned vector transfers to the framework on return.
56+
57+
## Register
58+
59+
```java
60+
try (SessionContext ctx = new SessionContext()) {
61+
ctx.registerUdf(
62+
"add_one",
63+
new AddOne(),
64+
new ArrowType.Int(32, true),
65+
List.of(new ArrowType.Int(32, true)),
66+
Volatility.IMMUTABLE);
67+
68+
try (DataFrame df = ctx.sql("SELECT add_one(x) FROM t");
69+
ArrowReader r = df.collect(allocator)) {
70+
// ...
71+
}
72+
}
73+
```
74+
75+
The signature is exact: a call must match the declared `argTypes` exactly. Use
76+
`Volatility.IMMUTABLE` for pure functions, `STABLE` for functions that are
77+
deterministic within a single query, and `VOLATILE` for non-deterministic
78+
functions.
79+
80+
## Errors
81+
82+
If the UDF throws, the exception class and message surface in the
83+
`RuntimeException` raised from `collect()`. If the returned vector is `null`,
84+
has the wrong row count, or the wrong type, the runtime raises a
85+
`RuntimeException` with a descriptive message.
86+
87+
## Threading
88+
89+
DataFusion may invoke a UDF concurrently from multiple worker threads. If the
90+
implementation carries mutable state, the implementation must synchronize it.
91+
92+
## Limitations (v1)
93+
94+
- Scalar UDFs only — no aggregates, window functions, or table functions.
95+
- Exact-signature only — no variadic or polymorphic argument lists.
96+
- No nullable-argument short-circuiting; null inputs are passed through to the
97+
UDF as nulls in the input vector.
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.datafusion.examples;
21+
22+
import java.util.List;
23+
import org.apache.arrow.memory.BufferAllocator;
24+
import org.apache.arrow.memory.RootAllocator;
25+
import org.apache.arrow.vector.FieldVector;
26+
import org.apache.arrow.vector.IntVector;
27+
import org.apache.arrow.vector.VectorSchemaRoot;
28+
import org.apache.arrow.vector.ipc.ArrowReader;
29+
import org.apache.arrow.vector.types.pojo.ArrowType;
30+
import org.apache.datafusion.DataFrame;
31+
import org.apache.datafusion.ScalarUdf;
32+
import org.apache.datafusion.SessionContext;
33+
import org.apache.datafusion.Volatility;
34+
35+
/** Demonstrates registering a Java scalar UDF and invoking it from SQL. */
36+
public final class AddOneExample {
37+
38+
/** Adds 1 to each value of an Int32 column. */
39+
public static final class AddOne implements ScalarUdf {
40+
@Override
41+
public FieldVector evaluate(BufferAllocator allocator, List<FieldVector> args) {
42+
IntVector in = (IntVector) args.get(0);
43+
IntVector out = new IntVector("add_one_out", allocator);
44+
int n = in.getValueCount();
45+
out.allocateNew(n);
46+
for (int i = 0; i < n; i++) {
47+
if (in.isNull(i)) {
48+
out.setNull(i);
49+
} else {
50+
out.set(i, in.get(i) + 1);
51+
}
52+
}
53+
out.setValueCount(n);
54+
return out;
55+
}
56+
}
57+
58+
public static void main(String[] args) throws Exception {
59+
try (SessionContext ctx = new SessionContext();
60+
BufferAllocator allocator = new RootAllocator()) {
61+
ctx.registerUdf(
62+
"add_one",
63+
new AddOne(),
64+
new ArrowType.Int(32, true),
65+
List.of(new ArrowType.Int(32, true)),
66+
Volatility.IMMUTABLE);
67+
68+
try (DataFrame df =
69+
ctx.sql(
70+
"SELECT add_one(x) AS y FROM (VALUES (CAST(1 AS INT)),(CAST(2 AS INT)),(CAST(3 AS INT))) AS t(x)");
71+
ArrowReader reader = df.collect(allocator)) {
72+
while (reader.loadNextBatch()) {
73+
VectorSchemaRoot root = reader.getVectorSchemaRoot();
74+
IntVector y = (IntVector) root.getVector("y");
75+
for (int i = 0; i < y.getValueCount(); i++) {
76+
System.out.println("y = " + y.get(i));
77+
}
78+
}
79+
}
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)