From 5af5bf8dd07b7e7890d7aefd1b9a89933c824f5e Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Wed, 24 Jun 2026 14:19:33 +0300 Subject: [PATCH 1/9] IGNITE-28813 wip --- .../query/calcite/CalciteQueryProcessor.java | 14 ++ .../calcite/exec/exp/agg/Accumulators.java | 87 +++++---- .../exp/agg/PluginAccumulatorsExtension.java | 38 ++++ ...AggregatFunctionViaPluginProviderTest.java | 167 ++++++++++++++++++ .../testsuites/IntegrationTestSuite.java | 2 + 5 files changed, 271 insertions(+), 37 deletions(-) create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorsExtension.java create mode 100644 modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index ca6fbd6f390a8..05ef25e19c21a 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -82,6 +82,8 @@ import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService; import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl; import org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulators; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorsExtension; import org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig; @@ -313,6 +315,8 @@ public CalciteQueryProcessor(GridKernalContext ctx) { } distrCfg = new DistributedCalciteConfiguration(ctx, log); + + extendAccumulatorsFromPlugins(ctx); } /** @@ -853,4 +857,14 @@ public FrameworkConfig frameworkConfig() { public InjectResourcesService injectService() { return injectSvc; } + + /** */ + private static void extendAccumulatorsFromPlugins(GridKernalContext ctx) { + PluginAccumulatorsExtension[] extensions = ctx.plugins().extensions(PluginAccumulatorsExtension.class); + + if (!F.isEmpty(extensions)) { + for (PluginAccumulatorsExtension extension : extensions) + Accumulators.addPluginAccumulatorFactories(extension.accumulatorFactories()); + } + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java index 9c15e4a7a2375..8af20faf6ff85 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java @@ -24,8 +24,10 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.function.Supplier; import org.apache.calcite.avatica.util.ByteString; @@ -37,6 +39,7 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorsExtension.PluginAccumulatorFactory; import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.util.typedef.F; @@ -54,6 +57,9 @@ * */ public class Accumulators { + /** */ + private static final Map> PLUGIN_FACTORY_BY_NAME = new ConcurrentHashMap<>(); + /** */ public static Supplier> accumulatorFactory(AggregateCall call, ExecutionContext ctx) { Supplier> supplier = accumulatorFunctionFactory(call, ctx); @@ -71,38 +77,29 @@ private static Supplier> accumulatorFunctionFactory( ) { RowHandler hnd = ctx.rowHandler(); - switch (call.getAggregation().getName()) { - case "COUNT": - return () -> new LongCount<>(call, hnd); - case "AVG": - return avgFactory(call, hnd); - case "SUM": - return sumFactory(call, hnd); - case "$SUM0": - return sumEmptyIsZeroFactory(call, hnd); - case "MIN": - case "EVERY": - return minFactory(call, hnd); - case "MAX": - case "SOME": - return maxFactory(call, hnd); - case "SINGLE_VALUE": - return () -> new SingleVal<>(call, hnd); - case "LITERAL_AGG": - return () -> new LiteralVal<>(call, hnd); - case "ANY_VALUE": - return () -> new AnyVal<>(call, hnd); - case "LISTAGG": - case "ARRAY_AGG": - case "ARRAY_CONCAT_AGG": - return listAggregateSupplier(call, ctx); - case "BIT_AND": - case "BIT_OR": - case "BIT_XOR": - return bitWiseFactory(call, hnd); - default: - throw new AssertionError(call.getAggregation().getName()); - } + String aggFunName = call.getAggregation().getName(); + + return switch (aggFunName) { + case "COUNT" -> () -> new LongCount<>(call, hnd); + case "AVG" -> avgFactory(call, hnd); + case "SUM" -> sumFactory(call, hnd); + case "$SUM0" -> sumEmptyIsZeroFactory(call, hnd); + case "MIN", "EVERY" -> minFactory(call, hnd); + case "MAX", "SOME" -> maxFactory(call, hnd); + case "SINGLE_VALUE" -> () -> new SingleVal<>(call, hnd); + case "LITERAL_AGG" -> () -> new LiteralVal<>(call, hnd); + case "ANY_VALUE" -> () -> new AnyVal<>(call, hnd); + case "LISTAGG", "ARRAY_AGG", "ARRAY_CONCAT_AGG" -> listAggregateSupplier(call, ctx); + case "BIT_AND", "BIT_OR", "BIT_XOR" -> bitWiseFactory(call, hnd); + default -> { + PluginAccumulatorFactory factory = (PluginAccumulatorFactory) PLUGIN_FACTORY_BY_NAME.get(aggFunName); + + if (factory == null) + throw new AssertionError("Accumulator factory not found for: " + aggFunName); + + yield () -> factory.create(call, ctx); + } + }; } /** */ @@ -280,7 +277,7 @@ private static Supplier> maxFactory(AggregateCall call, R } /** */ - private abstract static class AbstractAccumulator implements Accumulator { + public abstract static class AbstractAccumulator implements Accumulator { /** */ private final RowHandler hnd; @@ -288,13 +285,13 @@ private abstract static class AbstractAccumulator implements Accumulator hnd) { + protected AbstractAccumulator(AggregateCall aggCall, RowHandler hnd) { this.aggCall = aggCall; this.hnd = hnd; } /** */ - T get(int idx, Row row) { + protected T get(int idx, Row row) { assert idx < arguments().size() : "idx=" + idx + "; arguments=" + arguments(); return (T)hnd.get(arguments().get(idx), row); @@ -311,7 +308,7 @@ protected List arguments() { } /** */ - int columnCount(Row row) { + protected int columnCount(Row row) { return hnd.columnCount(row); } } @@ -1344,8 +1341,9 @@ public ListAggAccumulator(AggregateCall aggCall, RowHandler hnd) { if (builder == null) builder = new StringBuilder(); - if (builder.length() != 0) + if (!builder.isEmpty()) builder.append(extractSeparator(row)); + builder.append(val); } @@ -1510,4 +1508,19 @@ private DistinctAccumulator(AggregateCall aggCall, RowHandler hnd, Supplier return acc.returnType(typeFactory); } } + + /** */ + public static void addPluginAccumulatorFactories(Map> factoryByAggFunName) { + for (Map.Entry> e : factoryByAggFunName.entrySet()) { + String aggFunName = e.getKey().trim().toUpperCase(Locale.ROOT); + + if (aggFunName.isBlank()) + throw new AssertionError("Invalid aggregate function name: " + aggFunName); + + PluginAccumulatorFactory prev = PLUGIN_FACTORY_BY_NAME.putIfAbsent(aggFunName, e.getValue()); + +// if (prev != null) +// throw new AssertionError("Duplicate aggregate function name: " + aggFunName); + } + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorsExtension.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorsExtension.java new file mode 100644 index 0000000000000..aa80d7f11c782 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorsExtension.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg; + +import java.util.Map; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.plugin.Extension; +import org.apache.ignite.plugin.PluginProvider; + +/** Class for extending {@link Accumulators} via {@link PluginProvider plugins}. */ +@FunctionalInterface +public interface PluginAccumulatorsExtension extends Extension { + /** @return Accumulator factories by aggregate function name. Name must be non-empty and unique. */ + Map> accumulatorFactories(); + + /** */ + @FunctionalInterface + interface PluginAccumulatorFactory { + /** */ + Accumulator create(AggregateCall call, ExecutionContext ctx); + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java new file mode 100644 index 0000000000000..b53ee86118916 --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.integration; + +import java.util.List; +import java.util.Map; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable; +import org.apache.calcite.sql.util.SqlOperatorTables; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.util.Optionality; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulators.AbstractAccumulator; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorsExtension; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.plugin.PluginProvider; +import org.jspecify.annotations.Nullable; +import org.junit.Test; + +/** Test for adding aggregat function via {@link PluginProvider}. */ +public class AddAggregatFunctionViaPluginProviderTest extends AbstractBasicIntegrationTest { + /** */ + private static final String TEST_SUM_FUN_NAME = "TEST_SUM"; + + /** {@inheritDoc} */ + @Override protected int nodeCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setPluginProviders(new TestPluginProvider()); + } + + /** */ + @Test + public void test() { + assertQuery("SELECT TEST_SUM(x) FROM (VALUES (1), (2), (3)) t(x)") + .returns(6L) + .check(); + } + + /** */ + private static class TestPluginProvider extends AbstractTestPluginProvider { + /** {@inheritDoc} */ + @Override public String name() { + return getClass().getName(); + } + + /** {@inheritDoc} */ + @Override public @Nullable T createComponent(PluginContext ctx, Class cls) { + if (!FrameworkConfig.class.equals(cls)) + return null; + + FrameworkConfig cfg = CalciteQueryProcessor.FRAMEWORK_CONFIG; + + return (T) Frameworks.newConfigBuilder(cfg) + .operatorTable(SqlOperatorTables.chain( + new TestSqlOperatorTable().init(), cfg.getOperatorTable() + )) + .build(); + } + + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { + registry.registerExtension( + PluginAccumulatorsExtension.class, + () -> Map.of(TEST_SUM_FUN_NAME, (call, ctx1) -> new TestSum<>(call, ctx1.rowHandler())) + ); + } + } + + /** */ + public static class TestSqlSumAggFunction extends SqlAggFunction { + /** */ + public TestSqlSumAggFunction() { + super( + TEST_SUM_FUN_NAME, + null, + SqlKind.SUM, + ReturnTypes.AGG_SUM, + null, + OperandTypes.NUMERIC, + SqlFunctionCategory.NUMERIC, + false, + false, + Optionality.FORBIDDEN + ); + } + } + + /** */ + public static class TestSqlOperatorTable extends ReflectiveSqlOperatorTable { + /** */ + @SuppressWarnings("unused") + public static final SqlAggFunction TEST_SUM = new TestSqlSumAggFunction(); + } + + /** */ + private static class TestSum extends AbstractAccumulator { + /** */ + private long sum; + + /** */ + protected TestSum(AggregateCall aggCall, RowHandler hnd) { + super(aggCall, hnd); + } + + /** {@inheritDoc} */ + @Override public void add(Row row) { + Number val = get(0, row); + + if (val != null) + sum += val.longValue(); + } + + /** {@inheritDoc} */ + @Override public void apply(Accumulator other) { + sum += ((TestSum)other).sum; + } + + /** {@inheritDoc} */ + @Override public Object end() { + return sum; + } + + /** {@inheritDoc} */ + @Override public List argumentTypes(IgniteTypeFactory typeFactory) { + return List.of(typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT), true)); + } + + /** {@inheritDoc} */ + @Override public RelDataType returnType(IgniteTypeFactory typeFactory) { + return typeFactory.createSqlType(org.apache.calcite.sql.type.SqlTypeName.BIGINT); + } + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java index f4e123c6a7c40..177e07c04a712 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.query.calcite.CancelTest; import org.apache.ignite.internal.processors.query.calcite.IndexWithSameNameCalciteTest; import org.apache.ignite.internal.processors.query.calcite.SqlFieldsQueryUsageTest; +import org.apache.ignite.internal.processors.query.calcite.integration.AddAggregatFunctionViaPluginProviderTest; import org.apache.ignite.internal.processors.query.calcite.integration.AggregatesIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.AuthorizationIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.CacheStoreTest; @@ -183,6 +184,7 @@ CacheWithInterceptorIntegrationTest.class, TxThreadLockingTest.class, SelectByKeyFieldTest.class, + AddAggregatFunctionViaPluginProviderTest.class, }) public class IntegrationTestSuite { } From 747cdabe3f32e1d1ce9ef56b48247658a3fab3d7 Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Wed, 24 Jun 2026 17:35:23 +0300 Subject: [PATCH 2/9] IGNITE-28813 Wip --- .../query/calcite/CalciteQueryProcessor.java | 26 +++--- ...Extension.java => AccumulatorFactory.java} | 18 +---- .../calcite/exec/exp/agg/Accumulators.java | 27 ++----- .../agg/PluginAccumulatorFactoryRegistry.java | 81 +++++++++++++++++++ ...AggregatFunctionViaPluginProviderTest.java | 4 +- 5 files changed, 105 insertions(+), 51 deletions(-) rename modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/{PluginAccumulatorsExtension.java => AccumulatorFactory.java} (62%) create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorFactoryRegistry.java diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index 05ef25e19c21a..d2df50ac51afb 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -82,8 +82,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService; import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl; import org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl; -import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulators; -import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorsExtension; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorFactoryRegistry; import org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig; @@ -282,9 +281,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query public CalciteQueryProcessor(GridKernalContext ctx) { super(ctx); - FrameworkConfig customFrameworkCfg = ctx.plugins().createComponent(FrameworkConfig.class); - frameworkCfg = customFrameworkCfg != null ? customFrameworkCfg : FRAMEWORK_CONFIG; - + frameworkCfg = frameworkCfg(ctx); failureProcessor = ctx.failure(); schemaHolder = new SchemaHolderImpl(ctx, frameworkCfg); qryPlanCache = new QueryPlanCacheImpl(ctx); @@ -315,8 +312,6 @@ public CalciteQueryProcessor(GridKernalContext ctx) { } distrCfg = new DistributedCalciteConfiguration(ctx, log); - - extendAccumulatorsFromPlugins(ctx); } /** @@ -718,8 +713,7 @@ private T processQuery( if (timeout <= 0) timeout = distrCfg.defaultQueryTimeout(); - if (frameworkCfg != FRAMEWORK_CONFIG) - qryCtx = QueryContext.of(frameworkCfg, qryCtx); + qryCtx = QueryContext.of(frameworkCfg, qryCtx); RootQuery qry = new RootQuery<>( sql, @@ -859,12 +853,14 @@ public InjectResourcesService injectService() { } /** */ - private static void extendAccumulatorsFromPlugins(GridKernalContext ctx) { - PluginAccumulatorsExtension[] extensions = ctx.plugins().extensions(PluginAccumulatorsExtension.class); + private static FrameworkConfig frameworkCfg(GridKernalContext ctx) { + FrameworkConfig customFrameworkCfg = ctx.plugins().createComponent(FrameworkConfig.class); + customFrameworkCfg = customFrameworkCfg != null ? customFrameworkCfg : FRAMEWORK_CONFIG; - if (!F.isEmpty(extensions)) { - for (PluginAccumulatorsExtension extension : extensions) - Accumulators.addPluginAccumulatorFactories(extension.accumulatorFactories()); - } + PluginAccumulatorFactoryRegistry registry = new PluginAccumulatorFactoryRegistry(ctx); + + return Frameworks.newConfigBuilder(customFrameworkCfg) + .context(Contexts.chain(customFrameworkCfg.getContext(), Contexts.of(registry))) + .build(); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorsExtension.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorFactory.java similarity index 62% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorsExtension.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorFactory.java index aa80d7f11c782..807fff6beb81d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorsExtension.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorFactory.java @@ -17,22 +17,12 @@ package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg; -import java.util.Map; import org.apache.calcite.rel.core.AggregateCall; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; -import org.apache.ignite.plugin.Extension; -import org.apache.ignite.plugin.PluginProvider; -/** Class for extending {@link Accumulators} via {@link PluginProvider plugins}. */ +/** {@link Accumulator} factory. */ @FunctionalInterface -public interface PluginAccumulatorsExtension extends Extension { - /** @return Accumulator factories by aggregate function name. Name must be non-empty and unique. */ - Map> accumulatorFactories(); - - /** */ - @FunctionalInterface - interface PluginAccumulatorFactory { - /** */ - Accumulator create(AggregateCall call, ExecutionContext ctx); - } +public interface AccumulatorFactory { + /** @return New accumulator. */ + Accumulator create(AggregateCall call, ExecutionContext ctx); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java index 8af20faf6ff85..1ecc8aea4f7a8 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java @@ -24,10 +24,9 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.function.Supplier; import org.apache.calcite.avatica.util.ByteString; @@ -39,7 +38,6 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; -import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorsExtension.PluginAccumulatorFactory; import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.util.typedef.F; @@ -58,7 +56,10 @@ */ public class Accumulators { /** */ - private static final Map> PLUGIN_FACTORY_BY_NAME = new ConcurrentHashMap<>(); + static final Set BUILT_IN_AGGREGATE_NAMES = Set.of( + "COUNT", "AVG", "SUM", "$SUM0", "MIN", "EVERY", "MAX", "SOME", "SINGLE_VALUE", "LITERAL_AGG", + "ANY_VALUE", "LISTAGG", "ARRAY_AGG", "ARRAY_CONCAT_AGG", "BIT_AND", "BIT_OR", "BIT_XOR" + ); /** */ public static Supplier> accumulatorFactory(AggregateCall call, ExecutionContext ctx) { @@ -79,6 +80,7 @@ private static Supplier> accumulatorFunctionFactory( String aggFunName = call.getAggregation().getName(); + // When adding a new one, remember to add it to BUILT_IN_AGGREGATE_NAMES. return switch (aggFunName) { case "COUNT" -> () -> new LongCount<>(call, hnd); case "AVG" -> avgFactory(call, hnd); @@ -92,7 +94,7 @@ private static Supplier> accumulatorFunctionFactory( case "LISTAGG", "ARRAY_AGG", "ARRAY_CONCAT_AGG" -> listAggregateSupplier(call, ctx); case "BIT_AND", "BIT_OR", "BIT_XOR" -> bitWiseFactory(call, hnd); default -> { - PluginAccumulatorFactory factory = (PluginAccumulatorFactory) PLUGIN_FACTORY_BY_NAME.get(aggFunName); + AccumulatorFactory factory = ctx.unwrap(PluginAccumulatorFactoryRegistry.class).factory(aggFunName); if (factory == null) throw new AssertionError("Accumulator factory not found for: " + aggFunName); @@ -1508,19 +1510,4 @@ private DistinctAccumulator(AggregateCall aggCall, RowHandler hnd, Supplier return acc.returnType(typeFactory); } } - - /** */ - public static void addPluginAccumulatorFactories(Map> factoryByAggFunName) { - for (Map.Entry> e : factoryByAggFunName.entrySet()) { - String aggFunName = e.getKey().trim().toUpperCase(Locale.ROOT); - - if (aggFunName.isBlank()) - throw new AssertionError("Invalid aggregate function name: " + aggFunName); - - PluginAccumulatorFactory prev = PLUGIN_FACTORY_BY_NAME.putIfAbsent(aggFunName, e.getValue()); - -// if (prev != null) -// throw new AssertionError("Duplicate aggregate function name: " + aggFunName); - } - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorFactoryRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorFactoryRegistry.java new file mode 100644 index 0000000000000..e765d27b5b978 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorFactoryRegistry.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg; + +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.plugin.Extension; +import org.apache.ignite.plugin.PluginProvider; +import org.jetbrains.annotations.Nullable; + +/** Registry for {@link AccumulatorFactory}s. */ +public class PluginAccumulatorFactoryRegistry { + /** Factory by aggregate function name. */ + private final Map> factoryByAggFunName; + + /** */ + public PluginAccumulatorFactoryRegistry(GridKernalContext ctx) { + factoryByAggFunName = factories(ctx); + } + + /** @return Plugin accumulator factory by aggregate function name or {@code null} if not found. */ + public @Nullable AccumulatorFactory factory(String aggFunName) { + return (AccumulatorFactory) factoryByAggFunName.get(aggFunName); + } + + /** Extension for getting {@link AccumulatorFactory} from {@link PluginProvider}. */ + @FunctionalInterface + public interface PluginAccumulatorFactoryExtension extends Extension { + /** + * @return Accumulator factories by aggregate function name. Name must be non-empty, unique, and not reserved. + */ + Map> factories(); + } + + /** */ + private static Map> factories(GridKernalContext ctx) { + PluginAccumulatorFactoryExtension[] extensions = ctx.plugins().extensions( + PluginAccumulatorFactoryExtension.class + ); + + if (F.isEmpty(extensions)) + return Map.of(); + + Map> res = new HashMap<>(); + + for (PluginAccumulatorFactoryExtension extension : extensions) { + for (Map.Entry> e : extension.factories().entrySet()) { + String aggFunName = e.getKey(); + + if (aggFunName.isBlank()) + throw new AssertionError("Invalid aggregate function name: " + aggFunName); + else if (Accumulators.BUILT_IN_AGGREGATE_NAMES.contains(aggFunName)) + throw new AssertionError("Aggregate function name is reserved: " + aggFunName); + + AccumulatorFactory prev = res.putIfAbsent(aggFunName, e.getValue()); + + if (prev != null) + throw new AssertionError("Duplicate aggregate function name: " + aggFunName); + } + } + + return Map.copyOf(res); + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java index b53ee86118916..a4f7103a0bb4b 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java @@ -37,7 +37,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator; import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulators.AbstractAccumulator; -import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorsExtension; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorFactoryRegistry.PluginAccumulatorFactoryExtension; import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; import org.apache.ignite.plugin.AbstractTestPluginProvider; import org.apache.ignite.plugin.ExtensionRegistry; @@ -94,7 +94,7 @@ private static class TestPluginProvider extends AbstractTestPluginProvider { /** {@inheritDoc} */ @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { registry.registerExtension( - PluginAccumulatorsExtension.class, + PluginAccumulatorFactoryExtension.class, () -> Map.of(TEST_SUM_FUN_NAME, (call, ctx1) -> new TestSum<>(call, ctx1.rowHandler())) ); } From 65818a33950f69ce819bad04e27d71e003695474 Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Wed, 24 Jun 2026 18:15:25 +0300 Subject: [PATCH 3/9] IGNITE-28813 Wip --- .../query/calcite/CalciteQueryProcessor.java | 4 +- ...y.java => AccumulatorSupplierFactory.java} | 10 +-- .../calcite/exec/exp/agg/Accumulators.java | 64 +++++++++++-------- ...ry.java => PluginAccumulatorRegistry.java} | 43 ++++++------- ...AggregatFunctionViaPluginProviderTest.java | 6 +- 5 files changed, 65 insertions(+), 62 deletions(-) rename modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/{AccumulatorFactory.java => AccumulatorSupplierFactory.java} (75%) rename modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/{PluginAccumulatorFactoryRegistry.java => PluginAccumulatorRegistry.java} (54%) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index d2df50ac51afb..7fbb0ebbda299 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -82,7 +82,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService; import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl; import org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl; -import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorFactoryRegistry; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorRegistry; import org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig; @@ -857,7 +857,7 @@ private static FrameworkConfig frameworkCfg(GridKernalContext ctx) { FrameworkConfig customFrameworkCfg = ctx.plugins().createComponent(FrameworkConfig.class); customFrameworkCfg = customFrameworkCfg != null ? customFrameworkCfg : FRAMEWORK_CONFIG; - PluginAccumulatorFactoryRegistry registry = new PluginAccumulatorFactoryRegistry(ctx); + PluginAccumulatorRegistry registry = new PluginAccumulatorRegistry(ctx); return Frameworks.newConfigBuilder(customFrameworkCfg) .context(Contexts.chain(customFrameworkCfg.getContext(), Contexts.of(registry))) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorSupplierFactory.java similarity index 75% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorFactory.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorSupplierFactory.java index 807fff6beb81d..7b7970ca0b177 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorSupplierFactory.java @@ -17,12 +17,12 @@ package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg; +import java.util.function.Supplier; import org.apache.calcite.rel.core.AggregateCall; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; -/** {@link Accumulator} factory. */ -@FunctionalInterface -public interface AccumulatorFactory { - /** @return New accumulator. */ - Accumulator create(AggregateCall call, ExecutionContext ctx); +/** Factory that selects and creates an accumulator supplier for an aggregate call. */ +@FunctionalInterface public interface AccumulatorSupplierFactory { + /** @return Accumulator supplier. */ + Supplier> create(AggregateCall call, ExecutionContext ctx); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java index 1ecc8aea4f7a8..40742a811eafb 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java @@ -25,7 +25,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.function.Function; import java.util.function.Supplier; @@ -56,9 +55,24 @@ */ public class Accumulators { /** */ - static final Set BUILT_IN_AGGREGATE_NAMES = Set.of( - "COUNT", "AVG", "SUM", "$SUM0", "MIN", "EVERY", "MAX", "SOME", "SINGLE_VALUE", "LITERAL_AGG", - "ANY_VALUE", "LISTAGG", "ARRAY_AGG", "ARRAY_CONCAT_AGG", "BIT_AND", "BIT_OR", "BIT_XOR" + private static final Map> BUILT_IN_FACTORY_BY_NAME = Map.ofEntries( + Map.entry("COUNT", (call, ctx) -> () -> new LongCount<>(call, ctx.rowHandler())), + Map.entry("AVG", (call, ctx) -> avgFactory(call, ctx.rowHandler())), + Map.entry("SUM", (call, ctx) -> sumFactory(call, ctx.rowHandler())), + Map.entry("$SUM0", (call, ctx) -> sumEmptyIsZeroFactory(call, ctx.rowHandler())), + Map.entry("MIN", (call, ctx) -> minFactory(call, ctx.rowHandler())), + Map.entry("EVERY", (call, ctx) -> minFactory(call, ctx.rowHandler())), + Map.entry("MAX", (call, ctx) -> maxFactory(call, ctx.rowHandler())), + Map.entry("SOME", (call, ctx) -> maxFactory(call, ctx.rowHandler())), + Map.entry("SINGLE_VALUE", (call, ctx) -> () -> new SingleVal<>(call, ctx.rowHandler())), + Map.entry("LITERAL_AGG", (call, ctx) -> () -> new LiteralVal<>(call, ctx.rowHandler())), + Map.entry("ANY_VALUE", (call, ctx) -> () -> new AnyVal<>(call, ctx.rowHandler())), + Map.entry("LISTAGG", Accumulators::listAggregateSupplier), + Map.entry("ARRAY_AGG", Accumulators::listAggregateSupplier), + Map.entry("ARRAY_CONCAT_AGG", Accumulators::listAggregateSupplier), + Map.entry("BIT_AND", (call, ctx) -> bitWiseFactory(call, ctx.rowHandler())), + Map.entry("BIT_OR", (call, ctx) -> bitWiseFactory(call, ctx.rowHandler())), + Map.entry("BIT_XOR", (call, ctx) -> bitWiseFactory(call, ctx.rowHandler())) ); /** */ @@ -76,32 +90,21 @@ private static Supplier> accumulatorFunctionFactory( AggregateCall call, ExecutionContext ctx ) { - RowHandler hnd = ctx.rowHandler(); - String aggFunName = call.getAggregation().getName(); - // When adding a new one, remember to add it to BUILT_IN_AGGREGATE_NAMES. - return switch (aggFunName) { - case "COUNT" -> () -> new LongCount<>(call, hnd); - case "AVG" -> avgFactory(call, hnd); - case "SUM" -> sumFactory(call, hnd); - case "$SUM0" -> sumEmptyIsZeroFactory(call, hnd); - case "MIN", "EVERY" -> minFactory(call, hnd); - case "MAX", "SOME" -> maxFactory(call, hnd); - case "SINGLE_VALUE" -> () -> new SingleVal<>(call, hnd); - case "LITERAL_AGG" -> () -> new LiteralVal<>(call, hnd); - case "ANY_VALUE" -> () -> new AnyVal<>(call, hnd); - case "LISTAGG", "ARRAY_AGG", "ARRAY_CONCAT_AGG" -> listAggregateSupplier(call, ctx); - case "BIT_AND", "BIT_OR", "BIT_XOR" -> bitWiseFactory(call, hnd); - default -> { - AccumulatorFactory factory = ctx.unwrap(PluginAccumulatorFactoryRegistry.class).factory(aggFunName); - - if (factory == null) - throw new AssertionError("Accumulator factory not found for: " + aggFunName); - - yield () -> factory.create(call, ctx); - } - }; + AccumulatorSupplierFactory builtInFactory = + (AccumulatorSupplierFactory) BUILT_IN_FACTORY_BY_NAME.get(aggFunName); + + if (builtInFactory != null) + return builtInFactory.create(call, ctx); + + AccumulatorSupplierFactory pluginFactory = + ctx.unwrap(PluginAccumulatorRegistry.class).factory(aggFunName); + + if (pluginFactory == null) + throw new AssertionError("Accumulator factory not found for: " + aggFunName); + + return pluginFactory.create(call, ctx); } /** */ @@ -1510,4 +1513,9 @@ private DistinctAccumulator(AggregateCall aggCall, RowHandler hnd, Supplier return acc.returnType(typeFactory); } } + + /** */ + static boolean isBuiltInAggregate(String name) { + return BUILT_IN_FACTORY_BY_NAME.containsKey(name); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorFactoryRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java similarity index 54% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorFactoryRegistry.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java index e765d27b5b978..921ae8bbae196 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorFactoryRegistry.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java @@ -25,53 +25,48 @@ import org.apache.ignite.plugin.PluginProvider; import org.jetbrains.annotations.Nullable; -/** Registry for {@link AccumulatorFactory}s. */ -public class PluginAccumulatorFactoryRegistry { +/** Registry for {@link AccumulatorSupplierFactory}s. */ +public class PluginAccumulatorRegistry { /** Factory by aggregate function name. */ - private final Map> factoryByAggFunName; + private final Map> factoryByAggFunName; /** */ - public PluginAccumulatorFactoryRegistry(GridKernalContext ctx) { + public PluginAccumulatorRegistry(GridKernalContext ctx) { factoryByAggFunName = factories(ctx); } - /** @return Plugin accumulator factory by aggregate function name or {@code null} if not found. */ - public @Nullable AccumulatorFactory factory(String aggFunName) { - return (AccumulatorFactory) factoryByAggFunName.get(aggFunName); + /** @return Plugin accumulator supplier factory by aggregate function name or {@code null} if not found. */ + public @Nullable AccumulatorSupplierFactory factory(String aggFunName) { + return (AccumulatorSupplierFactory) factoryByAggFunName.get(aggFunName); } - /** Extension for getting {@link AccumulatorFactory} from {@link PluginProvider}. */ + /** Extension for getting {@link AccumulatorSupplierFactory} from {@link PluginProvider}. */ @FunctionalInterface - public interface PluginAccumulatorFactoryExtension extends Extension { - /** - * @return Accumulator factories by aggregate function name. Name must be non-empty, unique, and not reserved. - */ - Map> factories(); + public interface AccumulatorFactoryProvider extends Extension { + /** @return Factories by aggregate function name. Name must be non-empty, unique, and not reserved. */ + Map> factories(); } /** */ - private static Map> factories(GridKernalContext ctx) { - PluginAccumulatorFactoryExtension[] extensions = ctx.plugins().extensions( - PluginAccumulatorFactoryExtension.class + private static Map> factories(GridKernalContext ctx) { + AccumulatorFactoryProvider[] extensions = ctx.plugins().extensions( + AccumulatorFactoryProvider.class ); if (F.isEmpty(extensions)) return Map.of(); - Map> res = new HashMap<>(); + Map> res = new HashMap<>(); - for (PluginAccumulatorFactoryExtension extension : extensions) { - for (Map.Entry> e : extension.factories().entrySet()) { + for (AccumulatorFactoryProvider extension : extensions) { + for (Map.Entry> e : extension.factories().entrySet()) { String aggFunName = e.getKey(); if (aggFunName.isBlank()) throw new AssertionError("Invalid aggregate function name: " + aggFunName); - else if (Accumulators.BUILT_IN_AGGREGATE_NAMES.contains(aggFunName)) + else if (Accumulators.isBuiltInAggregate(aggFunName)) throw new AssertionError("Aggregate function name is reserved: " + aggFunName); - - AccumulatorFactory prev = res.putIfAbsent(aggFunName, e.getValue()); - - if (prev != null) + else if (res.putIfAbsent(aggFunName, e.getValue()) != null) throw new AssertionError("Duplicate aggregate function name: " + aggFunName); } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java index a4f7103a0bb4b..f1fdf00b4b435 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java @@ -37,7 +37,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator; import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulators.AbstractAccumulator; -import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorFactoryRegistry.PluginAccumulatorFactoryExtension; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorRegistry.AccumulatorFactoryProvider; import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; import org.apache.ignite.plugin.AbstractTestPluginProvider; import org.apache.ignite.plugin.ExtensionRegistry; @@ -94,8 +94,8 @@ private static class TestPluginProvider extends AbstractTestPluginProvider { /** {@inheritDoc} */ @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { registry.registerExtension( - PluginAccumulatorFactoryExtension.class, - () -> Map.of(TEST_SUM_FUN_NAME, (call, ctx1) -> new TestSum<>(call, ctx1.rowHandler())) + AccumulatorFactoryProvider.class, + () -> Map.of(TEST_SUM_FUN_NAME, (call, ctx1) -> () -> new TestSum<>(call, ctx1.rowHandler())) ); } } From 872cd088f25085cec82c2254a57633733b893378 Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Wed, 24 Jun 2026 18:24:42 +0300 Subject: [PATCH 4/9] IGNITE-28813 Codestyle --- .../processors/query/calcite/exec/exp/agg/Accumulators.java | 2 +- .../query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java | 2 +- .../integration/AddAggregatFunctionViaPluginProviderTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java index 40742a811eafb..b9cb4a3a75c72 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java @@ -93,7 +93,7 @@ private static Supplier> accumulatorFunctionFactory( String aggFunName = call.getAggregation().getName(); AccumulatorSupplierFactory builtInFactory = - (AccumulatorSupplierFactory) BUILT_IN_FACTORY_BY_NAME.get(aggFunName); + (AccumulatorSupplierFactory)BUILT_IN_FACTORY_BY_NAME.get(aggFunName); if (builtInFactory != null) return builtInFactory.create(call, ctx); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java index 921ae8bbae196..ed36741074a67 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java @@ -37,7 +37,7 @@ public PluginAccumulatorRegistry(GridKernalContext ctx) { /** @return Plugin accumulator supplier factory by aggregate function name or {@code null} if not found. */ public @Nullable AccumulatorSupplierFactory factory(String aggFunName) { - return (AccumulatorSupplierFactory) factoryByAggFunName.get(aggFunName); + return (AccumulatorSupplierFactory)factoryByAggFunName.get(aggFunName); } /** Extension for getting {@link AccumulatorSupplierFactory} from {@link PluginProvider}. */ diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java index f1fdf00b4b435..cb73d605b6a36 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java @@ -84,7 +84,7 @@ private static class TestPluginProvider extends AbstractTestPluginProvider { FrameworkConfig cfg = CalciteQueryProcessor.FRAMEWORK_CONFIG; - return (T) Frameworks.newConfigBuilder(cfg) + return (T)Frameworks.newConfigBuilder(cfg) .operatorTable(SqlOperatorTables.chain( new TestSqlOperatorTable().init(), cfg.getOperatorTable() )) From e0bf118c8815f6248f8437598db14595c8c20992 Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Thu, 25 Jun 2026 10:42:19 +0300 Subject: [PATCH 5/9] IGNITE-28813 Wip --- .../query/calcite/CalciteQueryProcessor.java | 33 ++++++++++++++++--- .../exp/agg/PluginAccumulatorRegistry.java | 3 +- .../processors/query/QueryContext.java | 11 ++++--- 3 files changed, 36 insertions(+), 11 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index 7fbb0ebbda299..7023666ecb132 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -272,6 +272,9 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query /** */ private final InjectResourcesService injectSvc; + /** */ + private final PluginAccumulatorRegistry pluginAccRegistry; + /** */ private volatile boolean started; @@ -281,6 +284,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query public CalciteQueryProcessor(GridKernalContext ctx) { super(ctx); + pluginAccRegistry = new PluginAccumulatorRegistry(ctx); frameworkCfg = frameworkCfg(ctx); failureProcessor = ctx.failure(); schemaHolder = new SchemaHolderImpl(ctx, frameworkCfg); @@ -713,7 +717,7 @@ private T processQuery( if (timeout <= 0) timeout = distrCfg.defaultQueryTimeout(); - qryCtx = QueryContext.of(frameworkCfg, qryCtx); + qryCtx = QueryContext.of(frameworkCfg(qryCtx), qryCtx); RootQuery qry = new RootQuery<>( sql, @@ -853,14 +857,33 @@ public InjectResourcesService injectService() { } /** */ - private static FrameworkConfig frameworkCfg(GridKernalContext ctx) { + private FrameworkConfig frameworkCfg(GridKernalContext ctx) { FrameworkConfig customFrameworkCfg = ctx.plugins().createComponent(FrameworkConfig.class); customFrameworkCfg = customFrameworkCfg != null ? customFrameworkCfg : FRAMEWORK_CONFIG; - PluginAccumulatorRegistry registry = new PluginAccumulatorRegistry(ctx); + return withPluginAccumulatorRegistry(customFrameworkCfg, pluginAccRegistry); + } + + /** */ + private FrameworkConfig frameworkCfg(@Nullable QueryContext ctx) { + FrameworkConfig customFrameworkCfg = ctx != null ? ctx.unwrap(FrameworkConfig.class) : null; + + if (customFrameworkCfg == null || customFrameworkCfg == frameworkCfg) + return frameworkCfg; - return Frameworks.newConfigBuilder(customFrameworkCfg) - .context(Contexts.chain(customFrameworkCfg.getContext(), Contexts.of(registry))) + if (customFrameworkCfg.getContext().unwrap(PluginAccumulatorRegistry.class) != null) + return customFrameworkCfg; + + return withPluginAccumulatorRegistry(customFrameworkCfg, pluginAccRegistry); + } + + /** */ + private static FrameworkConfig withPluginAccumulatorRegistry( + FrameworkConfig cfg, + PluginAccumulatorRegistry registry + ) { + return Frameworks.newConfigBuilder(cfg) + .context(Contexts.chain(cfg.getContext(), Contexts.of(registry))) .build(); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java index ed36741074a67..6d2d2d1a74315 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg; import java.util.HashMap; +import java.util.Locale; import java.util.Map; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.util.typedef.F; @@ -60,7 +61,7 @@ private static Map> factories(GridKernalCo for (AccumulatorFactoryProvider extension : extensions) { for (Map.Entry> e : extension.factories().entrySet()) { - String aggFunName = e.getKey(); + String aggFunName = e.getKey().trim().toUpperCase(Locale.ROOT); if (aggFunName.isBlank()) throw new AssertionError("Invalid aggregate function name: " + aggFunName); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryContext.java index a9dbe02255ece..ac7dfc2eb90e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryContext.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.List; import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; /** */ public final class QueryContext { @@ -36,10 +37,10 @@ private QueryContext(Object[] params) { } /** - * Finds an instance of an interface implemented by this object, - * or returns null if this object does not support that interface. + * Finds an instance of an interface implemented by this object + * or returns {@code null} if this object does not support that interface. */ - public C unwrap(Class aClass) { + public @Nullable C unwrap(Class aClass) { if (Object[].class == aClass) return aClass.cast(params); @@ -50,12 +51,12 @@ public C unwrap(Class aClass) { * @param params Context parameters. * @return Query context. */ - public static QueryContext of(Object... params) { + public static QueryContext of(@Nullable Object... params) { return !F.isEmpty(params) ? new QueryContext(build(null, params).toArray()) : new QueryContext(EMPTY); } /** */ - private static List build(List dst, Object[] src) { + private static List build(List dst, @Nullable Object[] src) { if (dst == null) dst = new ArrayList<>(); From 9bb0577789c19aa347ad215465735c64a9c41f8e Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Tue, 30 Jun 2026 10:51:07 +0300 Subject: [PATCH 6/9] IGNITE-28813 After review #1.0 --- .../query/calcite/CalciteQueryProcessor.java | 43 ++-------- ...y.java => AccumulatorFactoryProvider.java} | 19 ++++- .../calcite/exec/exp/agg/Accumulators.java | 82 ++++++++++--------- .../exp/agg/PluginAccumulatorRegistry.java | 77 ----------------- ...AggregatFunctionViaPluginProviderTest.java | 27 +++--- 5 files changed, 78 insertions(+), 170 deletions(-) rename modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/{AccumulatorSupplierFactory.java => AccumulatorFactoryProvider.java} (57%) delete mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index 7023666ecb132..ca6fbd6f390a8 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -82,7 +82,6 @@ import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService; import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl; import org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl; -import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorRegistry; import org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig; @@ -272,9 +271,6 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query /** */ private final InjectResourcesService injectSvc; - /** */ - private final PluginAccumulatorRegistry pluginAccRegistry; - /** */ private volatile boolean started; @@ -284,8 +280,9 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query public CalciteQueryProcessor(GridKernalContext ctx) { super(ctx); - pluginAccRegistry = new PluginAccumulatorRegistry(ctx); - frameworkCfg = frameworkCfg(ctx); + FrameworkConfig customFrameworkCfg = ctx.plugins().createComponent(FrameworkConfig.class); + frameworkCfg = customFrameworkCfg != null ? customFrameworkCfg : FRAMEWORK_CONFIG; + failureProcessor = ctx.failure(); schemaHolder = new SchemaHolderImpl(ctx, frameworkCfg); qryPlanCache = new QueryPlanCacheImpl(ctx); @@ -717,7 +714,8 @@ private T processQuery( if (timeout <= 0) timeout = distrCfg.defaultQueryTimeout(); - qryCtx = QueryContext.of(frameworkCfg(qryCtx), qryCtx); + if (frameworkCfg != FRAMEWORK_CONFIG) + qryCtx = QueryContext.of(frameworkCfg, qryCtx); RootQuery qry = new RootQuery<>( sql, @@ -855,35 +853,4 @@ public FrameworkConfig frameworkConfig() { public InjectResourcesService injectService() { return injectSvc; } - - /** */ - private FrameworkConfig frameworkCfg(GridKernalContext ctx) { - FrameworkConfig customFrameworkCfg = ctx.plugins().createComponent(FrameworkConfig.class); - customFrameworkCfg = customFrameworkCfg != null ? customFrameworkCfg : FRAMEWORK_CONFIG; - - return withPluginAccumulatorRegistry(customFrameworkCfg, pluginAccRegistry); - } - - /** */ - private FrameworkConfig frameworkCfg(@Nullable QueryContext ctx) { - FrameworkConfig customFrameworkCfg = ctx != null ? ctx.unwrap(FrameworkConfig.class) : null; - - if (customFrameworkCfg == null || customFrameworkCfg == frameworkCfg) - return frameworkCfg; - - if (customFrameworkCfg.getContext().unwrap(PluginAccumulatorRegistry.class) != null) - return customFrameworkCfg; - - return withPluginAccumulatorRegistry(customFrameworkCfg, pluginAccRegistry); - } - - /** */ - private static FrameworkConfig withPluginAccumulatorRegistry( - FrameworkConfig cfg, - PluginAccumulatorRegistry registry - ) { - return Frameworks.newConfigBuilder(cfg) - .context(Contexts.chain(cfg.getContext(), Contexts.of(registry))) - .build(); - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorSupplierFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorFactoryProvider.java similarity index 57% rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorSupplierFactory.java rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorFactoryProvider.java index 7b7970ca0b177..c1bce8b31947f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorSupplierFactory.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/AccumulatorFactoryProvider.java @@ -18,11 +18,22 @@ package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg; import java.util.function.Supplier; +import org.apache.calcite.plan.Context; import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.tools.Frameworks; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.plugin.PluginProvider; +import org.jetbrains.annotations.Nullable; -/** Factory that selects and creates an accumulator supplier for an aggregate call. */ -@FunctionalInterface public interface AccumulatorSupplierFactory { - /** @return Accumulator supplier. */ - Supplier> create(AggregateCall call, ExecutionContext ctx); +/** + * Factory that selects and creates an accumulator supplier for an aggregate call. Allows overriding standard aggregate + * functions. + * + *

It can be set via {@link PluginProvider} when creating a configuration using + * {@link PluginProvider#createComponent} via {@link Frameworks.ConfigBuilder#context(Context)}.

+ */ +@FunctionalInterface +public interface AccumulatorFactoryProvider { + /** @return Accumulator supplier, {@code null} if no accumulator is required for this aggregate call. */ + @Nullable Supplier> factory(AggregateCall call, ExecutionContext ctx); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java index b9cb4a3a75c72..085d212be25d2 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java @@ -54,27 +54,6 @@ * */ public class Accumulators { - /** */ - private static final Map> BUILT_IN_FACTORY_BY_NAME = Map.ofEntries( - Map.entry("COUNT", (call, ctx) -> () -> new LongCount<>(call, ctx.rowHandler())), - Map.entry("AVG", (call, ctx) -> avgFactory(call, ctx.rowHandler())), - Map.entry("SUM", (call, ctx) -> sumFactory(call, ctx.rowHandler())), - Map.entry("$SUM0", (call, ctx) -> sumEmptyIsZeroFactory(call, ctx.rowHandler())), - Map.entry("MIN", (call, ctx) -> minFactory(call, ctx.rowHandler())), - Map.entry("EVERY", (call, ctx) -> minFactory(call, ctx.rowHandler())), - Map.entry("MAX", (call, ctx) -> maxFactory(call, ctx.rowHandler())), - Map.entry("SOME", (call, ctx) -> maxFactory(call, ctx.rowHandler())), - Map.entry("SINGLE_VALUE", (call, ctx) -> () -> new SingleVal<>(call, ctx.rowHandler())), - Map.entry("LITERAL_AGG", (call, ctx) -> () -> new LiteralVal<>(call, ctx.rowHandler())), - Map.entry("ANY_VALUE", (call, ctx) -> () -> new AnyVal<>(call, ctx.rowHandler())), - Map.entry("LISTAGG", Accumulators::listAggregateSupplier), - Map.entry("ARRAY_AGG", Accumulators::listAggregateSupplier), - Map.entry("ARRAY_CONCAT_AGG", Accumulators::listAggregateSupplier), - Map.entry("BIT_AND", (call, ctx) -> bitWiseFactory(call, ctx.rowHandler())), - Map.entry("BIT_OR", (call, ctx) -> bitWiseFactory(call, ctx.rowHandler())), - Map.entry("BIT_XOR", (call, ctx) -> bitWiseFactory(call, ctx.rowHandler())) - ); - /** */ public static Supplier> accumulatorFactory(AggregateCall call, ExecutionContext ctx) { Supplier> supplier = accumulatorFunctionFactory(call, ctx); @@ -90,21 +69,49 @@ private static Supplier> accumulatorFunctionFactory( AggregateCall call, ExecutionContext ctx ) { - String aggFunName = call.getAggregation().getName(); - - AccumulatorSupplierFactory builtInFactory = - (AccumulatorSupplierFactory)BUILT_IN_FACTORY_BY_NAME.get(aggFunName); - - if (builtInFactory != null) - return builtInFactory.create(call, ctx); - - AccumulatorSupplierFactory pluginFactory = - ctx.unwrap(PluginAccumulatorRegistry.class).factory(aggFunName); - - if (pluginFactory == null) - throw new AssertionError("Accumulator factory not found for: " + aggFunName); + RowHandler hnd = ctx.rowHandler(); - return pluginFactory.create(call, ctx); + AccumulatorFactoryProvider prov = ctx.unwrap(AccumulatorFactoryProvider.class); + + if (prov != null) { + Supplier> fac = prov.factory(call, ctx); + + if (fac != null) + return fac; + } + + switch (call.getAggregation().getName()) { + case "COUNT": + return () -> new LongCount<>(call, hnd); + case "AVG": + return avgFactory(call, hnd); + case "SUM": + return sumFactory(call, hnd); + case "$SUM0": + return sumEmptyIsZeroFactory(call, hnd); + case "MIN": + case "EVERY": + return minFactory(call, hnd); + case "MAX": + case "SOME": + return maxFactory(call, hnd); + case "SINGLE_VALUE": + return () -> new SingleVal<>(call, hnd); + case "LITERAL_AGG": + return () -> new LiteralVal<>(call, hnd); + case "ANY_VALUE": + return () -> new AnyVal<>(call, hnd); + case "LISTAGG": + case "ARRAY_AGG": + case "ARRAY_CONCAT_AGG": + return listAggregateSupplier(call, ctx); + case "BIT_AND": + case "BIT_OR": + case "BIT_XOR": + return bitWiseFactory(call, hnd); + default: + throw new AssertionError(call.getAggregation().getName()); + } } /** */ @@ -1513,9 +1520,4 @@ private DistinctAccumulator(AggregateCall aggCall, RowHandler hnd, Supplier return acc.returnType(typeFactory); } } - - /** */ - static boolean isBuiltInAggregate(String name) { - return BUILT_IN_FACTORY_BY_NAME.containsKey(name); - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java deleted file mode 100644 index 6d2d2d1a74315..0000000000000 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorRegistry.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg; - -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.plugin.Extension; -import org.apache.ignite.plugin.PluginProvider; -import org.jetbrains.annotations.Nullable; - -/** Registry for {@link AccumulatorSupplierFactory}s. */ -public class PluginAccumulatorRegistry { - /** Factory by aggregate function name. */ - private final Map> factoryByAggFunName; - - /** */ - public PluginAccumulatorRegistry(GridKernalContext ctx) { - factoryByAggFunName = factories(ctx); - } - - /** @return Plugin accumulator supplier factory by aggregate function name or {@code null} if not found. */ - public @Nullable AccumulatorSupplierFactory factory(String aggFunName) { - return (AccumulatorSupplierFactory)factoryByAggFunName.get(aggFunName); - } - - /** Extension for getting {@link AccumulatorSupplierFactory} from {@link PluginProvider}. */ - @FunctionalInterface - public interface AccumulatorFactoryProvider extends Extension { - /** @return Factories by aggregate function name. Name must be non-empty, unique, and not reserved. */ - Map> factories(); - } - - /** */ - private static Map> factories(GridKernalContext ctx) { - AccumulatorFactoryProvider[] extensions = ctx.plugins().extensions( - AccumulatorFactoryProvider.class - ); - - if (F.isEmpty(extensions)) - return Map.of(); - - Map> res = new HashMap<>(); - - for (AccumulatorFactoryProvider extension : extensions) { - for (Map.Entry> e : extension.factories().entrySet()) { - String aggFunName = e.getKey().trim().toUpperCase(Locale.ROOT); - - if (aggFunName.isBlank()) - throw new AssertionError("Invalid aggregate function name: " + aggFunName); - else if (Accumulators.isBuiltInAggregate(aggFunName)) - throw new AssertionError("Aggregate function name is reserved: " + aggFunName); - else if (res.putIfAbsent(aggFunName, e.getValue()) != null) - throw new AssertionError("Duplicate aggregate function name: " + aggFunName); - } - } - - return Map.copyOf(res); - } -} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java index cb73d605b6a36..f451b1ddb95bc 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java @@ -18,7 +18,8 @@ package org.apache.ignite.internal.processors.query.calcite.integration; import java.util.List; -import java.util.Map; +import java.util.function.Supplier; +import org.apache.calcite.plan.Contexts; import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.SqlAggFunction; @@ -34,13 +35,13 @@ import org.apache.calcite.util.Optionality; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorFactoryProvider; import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulators.AbstractAccumulator; -import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorRegistry.AccumulatorFactoryProvider; import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; import org.apache.ignite.plugin.AbstractTestPluginProvider; -import org.apache.ignite.plugin.ExtensionRegistry; import org.apache.ignite.plugin.PluginContext; import org.apache.ignite.plugin.PluginProvider; import org.jspecify.annotations.Nullable; @@ -88,16 +89,9 @@ private static class TestPluginProvider extends AbstractTestPluginProvider { .operatorTable(SqlOperatorTables.chain( new TestSqlOperatorTable().init(), cfg.getOperatorTable() )) + .context(Contexts.chain(cfg.getContext(), Contexts.of(new TestSumAccumulatorFactoryProvider()))) .build(); } - - /** {@inheritDoc} */ - @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { - registry.registerExtension( - AccumulatorFactoryProvider.class, - () -> Map.of(TEST_SUM_FUN_NAME, (call, ctx1) -> () -> new TestSum<>(call, ctx1.rowHandler())) - ); - } } /** */ @@ -164,4 +158,15 @@ protected TestSum(AggregateCall aggCall, RowHandler hnd) { return typeFactory.createSqlType(org.apache.calcite.sql.type.SqlTypeName.BIGINT); } } + + /** */ + private static class TestSumAccumulatorFactoryProvider implements AccumulatorFactoryProvider { + /** {@inheritDoc} */ + @Override public @Nullable Supplier> factory(AggregateCall call, ExecutionContext ctx) { + if (call.getAggregation().getName().equals(TEST_SUM_FUN_NAME)) + return () -> new TestSum<>(call, ctx.rowHandler()); + + return null; + } + } } From 585e778e40a4202e3cc1084f0d88bae0467f6294 Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Tue, 30 Jun 2026 14:23:40 +0300 Subject: [PATCH 7/9] IGNITE-28813 After review #1.1 --- .../integration/AddAggregatFunctionViaPluginProviderTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java index f451b1ddb95bc..b88823e620498 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java @@ -47,14 +47,14 @@ import org.jspecify.annotations.Nullable; import org.junit.Test; -/** Test for adding aggregat function via {@link PluginProvider}. */ +/** Test for adding aggregate function via {@link PluginProvider}. */ public class AddAggregatFunctionViaPluginProviderTest extends AbstractBasicIntegrationTest { /** */ private static final String TEST_SUM_FUN_NAME = "TEST_SUM"; /** {@inheritDoc} */ @Override protected int nodeCount() { - return 1; + return 2; } /** {@inheritDoc} */ From f072a56b08f57f2e2dca40f781d2985b693d56da Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Tue, 30 Jun 2026 16:48:45 +0300 Subject: [PATCH 8/9] IGNITE-28813 After review #1.2 --- ...AggregatFunctionViaPluginProviderTest.java | 172 ------------------ .../OperatorsExtensionIntegrationTest.java | 96 ++++++++++ .../testsuites/IntegrationTestSuite.java | 2 - 3 files changed, 96 insertions(+), 174 deletions(-) delete mode 100644 modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java deleted file mode 100644 index b88823e620498..0000000000000 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.calcite.integration; - -import java.util.List; -import java.util.function.Supplier; -import org.apache.calcite.plan.Contexts; -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.sql.SqlAggFunction; -import org.apache.calcite.sql.SqlFunctionCategory; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.type.OperandTypes; -import org.apache.calcite.sql.type.ReturnTypes; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable; -import org.apache.calcite.sql.util.SqlOperatorTables; -import org.apache.calcite.tools.FrameworkConfig; -import org.apache.calcite.tools.Frameworks; -import org.apache.calcite.util.Optionality; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor; -import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; -import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; -import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator; -import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorFactoryProvider; -import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulators.AbstractAccumulator; -import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; -import org.apache.ignite.plugin.AbstractTestPluginProvider; -import org.apache.ignite.plugin.PluginContext; -import org.apache.ignite.plugin.PluginProvider; -import org.jspecify.annotations.Nullable; -import org.junit.Test; - -/** Test for adding aggregate function via {@link PluginProvider}. */ -public class AddAggregatFunctionViaPluginProviderTest extends AbstractBasicIntegrationTest { - /** */ - private static final String TEST_SUM_FUN_NAME = "TEST_SUM"; - - /** {@inheritDoc} */ - @Override protected int nodeCount() { - return 2; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - return super.getConfiguration(igniteInstanceName) - .setPluginProviders(new TestPluginProvider()); - } - - /** */ - @Test - public void test() { - assertQuery("SELECT TEST_SUM(x) FROM (VALUES (1), (2), (3)) t(x)") - .returns(6L) - .check(); - } - - /** */ - private static class TestPluginProvider extends AbstractTestPluginProvider { - /** {@inheritDoc} */ - @Override public String name() { - return getClass().getName(); - } - - /** {@inheritDoc} */ - @Override public @Nullable T createComponent(PluginContext ctx, Class cls) { - if (!FrameworkConfig.class.equals(cls)) - return null; - - FrameworkConfig cfg = CalciteQueryProcessor.FRAMEWORK_CONFIG; - - return (T)Frameworks.newConfigBuilder(cfg) - .operatorTable(SqlOperatorTables.chain( - new TestSqlOperatorTable().init(), cfg.getOperatorTable() - )) - .context(Contexts.chain(cfg.getContext(), Contexts.of(new TestSumAccumulatorFactoryProvider()))) - .build(); - } - } - - /** */ - public static class TestSqlSumAggFunction extends SqlAggFunction { - /** */ - public TestSqlSumAggFunction() { - super( - TEST_SUM_FUN_NAME, - null, - SqlKind.SUM, - ReturnTypes.AGG_SUM, - null, - OperandTypes.NUMERIC, - SqlFunctionCategory.NUMERIC, - false, - false, - Optionality.FORBIDDEN - ); - } - } - - /** */ - public static class TestSqlOperatorTable extends ReflectiveSqlOperatorTable { - /** */ - @SuppressWarnings("unused") - public static final SqlAggFunction TEST_SUM = new TestSqlSumAggFunction(); - } - - /** */ - private static class TestSum extends AbstractAccumulator { - /** */ - private long sum; - - /** */ - protected TestSum(AggregateCall aggCall, RowHandler hnd) { - super(aggCall, hnd); - } - - /** {@inheritDoc} */ - @Override public void add(Row row) { - Number val = get(0, row); - - if (val != null) - sum += val.longValue(); - } - - /** {@inheritDoc} */ - @Override public void apply(Accumulator other) { - sum += ((TestSum)other).sum; - } - - /** {@inheritDoc} */ - @Override public Object end() { - return sum; - } - - /** {@inheritDoc} */ - @Override public List argumentTypes(IgniteTypeFactory typeFactory) { - return List.of(typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT), true)); - } - - /** {@inheritDoc} */ - @Override public RelDataType returnType(IgniteTypeFactory typeFactory) { - return typeFactory.createSqlType(org.apache.calcite.sql.type.SqlTypeName.BIGINT); - } - } - - /** */ - private static class TestSumAccumulatorFactoryProvider implements AccumulatorFactoryProvider { - /** {@inheritDoc} */ - @Override public @Nullable Supplier> factory(AggregateCall call, ExecutionContext ctx) { - if (call.getAggregation().getName().equals(TEST_SUM_FUN_NAME)) - return () -> new TestSum<>(call, ctx.rowHandler()); - - return null; - } - } -} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/OperatorsExtensionIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/OperatorsExtensionIntegrationTest.java index 53b6d23bcf14f..655dbc911b496 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/OperatorsExtensionIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/OperatorsExtensionIntegrationTest.java @@ -18,12 +18,18 @@ import java.math.BigDecimal; import java.sql.Timestamp; +import java.util.List; +import java.util.function.Supplier; import com.google.common.collect.ImmutableList; import org.apache.calcite.adapter.enumerable.NullPolicy; import org.apache.calcite.avatica.util.TimeUnitRange; import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlFunctionCategory; @@ -43,12 +49,19 @@ import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; import org.apache.calcite.util.BuiltInMethod; +import org.apache.calcite.util.Optionality; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; import org.apache.ignite.internal.processors.query.calcite.exec.exp.RexImpTable; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorFactoryProvider; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulators; import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteConvertletTable; import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteSqlNodeRewriter; import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteSqlValidator; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; import org.apache.ignite.plugin.AbstractTestPluginProvider; import org.apache.ignite.plugin.PluginContext; import org.jetbrains.annotations.Nullable; @@ -75,6 +88,9 @@ public class OperatorsExtensionIntegrationTest extends AbstractBasicIntegrationT .sqlValidatorConfig( ((IgniteSqlValidator.Config)CalciteQueryProcessor.FRAMEWORK_CONFIG.getSqlValidatorConfig()) .withSqlNodeRewriter(new SqlRewriter())) + .context(Contexts.chain( + CalciteQueryProcessor.FRAMEWORK_CONFIG.getContext(), + Contexts.of(new AccumulatorFactoryProviderImpl()))) .build(); return (T)cfg; @@ -134,6 +150,14 @@ public void testOperatorsCallsInViews() { assertQuery("SELECT val_str from my_view").returns(new BigDecimal("0")).check(); } + /** */ + @Test + public void testCustomAggregateFunction() { + assertQuery("SELECT TEST_SUM(x) FROM (VALUES (1), (2), (3)) t(x)") + .returns(6L) + .check(); + } + /** Rewrites LTRIM with 2 parameters. */ public static SqlCall rewriteLtrim(SqlValidator validator, SqlCall call) { if (call.operandCount() != 2) @@ -193,6 +217,9 @@ public static class OperatorTable extends ReflectiveSqlOperatorTable { OperandTypes.STRING_STRING, SqlFunctionCategory.STRING ); + + /** */ + public static final SqlAggFunction TEST_SUM = new SqlTestSumAggFunction(); } /** Extended convertlet table. */ @@ -229,4 +256,73 @@ private static class SqlRewriter implements IgniteSqlNodeRewriter { return node; } } + + /** */ + private static class AccumulatorFactoryProviderImpl implements AccumulatorFactoryProvider { + /** {@inheritDoc} */ + @Override public @org.jspecify.annotations.Nullable Supplier> factory(AggregateCall call, ExecutionContext ctx) { + if (call.getAggregation().getName().equals(OperatorTable.TEST_SUM.getName())) + return () -> new TestSum<>(call, ctx.rowHandler()); + + return null; + } + } + + /** */ + public static class SqlTestSumAggFunction extends SqlAggFunction { + /** */ + public SqlTestSumAggFunction() { + super( + "TEST_SUM", + null, + SqlKind.SUM, + ReturnTypes.AGG_SUM, + null, + OperandTypes.NUMERIC, + SqlFunctionCategory.NUMERIC, + false, + false, + Optionality.FORBIDDEN + ); + } + } + + /** */ + private static class TestSum extends Accumulators.AbstractAccumulator { + /** */ + private long sum; + + /** */ + protected TestSum(AggregateCall aggCall, RowHandler hnd) { + super(aggCall, hnd); + } + + /** {@inheritDoc} */ + @Override public void add(Row row) { + Number val = get(0, row); + + if (val != null) + sum += val.longValue(); + } + + /** {@inheritDoc} */ + @Override public void apply(Accumulator other) { + sum += ((TestSum)other).sum; + } + + /** {@inheritDoc} */ + @Override public Object end() { + return sum; + } + + /** {@inheritDoc} */ + @Override public List argumentTypes(IgniteTypeFactory typeFactory) { + return List.of(typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT), true)); + } + + /** {@inheritDoc} */ + @Override public RelDataType returnType(IgniteTypeFactory typeFactory) { + return typeFactory.createSqlType(org.apache.calcite.sql.type.SqlTypeName.BIGINT); + } + } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java index 177e07c04a712..f4e123c6a7c40 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java @@ -25,7 +25,6 @@ import org.apache.ignite.internal.processors.query.calcite.CancelTest; import org.apache.ignite.internal.processors.query.calcite.IndexWithSameNameCalciteTest; import org.apache.ignite.internal.processors.query.calcite.SqlFieldsQueryUsageTest; -import org.apache.ignite.internal.processors.query.calcite.integration.AddAggregatFunctionViaPluginProviderTest; import org.apache.ignite.internal.processors.query.calcite.integration.AggregatesIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.AuthorizationIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.CacheStoreTest; @@ -184,7 +183,6 @@ CacheWithInterceptorIntegrationTest.class, TxThreadLockingTest.class, SelectByKeyFieldTest.class, - AddAggregatFunctionViaPluginProviderTest.class, }) public class IntegrationTestSuite { } From 780e99bea7345503176386c31da4304b9e8bfef9 Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Tue, 30 Jun 2026 17:02:40 +0300 Subject: [PATCH 9/9] IGNITE-28813 Fix codestyle --- .../calcite/integration/OperatorsExtensionIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/OperatorsExtensionIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/OperatorsExtensionIntegrationTest.java index 655dbc911b496..d08259d0877d6 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/OperatorsExtensionIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/OperatorsExtensionIntegrationTest.java @@ -260,7 +260,7 @@ private static class SqlRewriter implements IgniteSqlNodeRewriter { /** */ private static class AccumulatorFactoryProviderImpl implements AccumulatorFactoryProvider { /** {@inheritDoc} */ - @Override public @org.jspecify.annotations.Nullable Supplier> factory(AggregateCall call, ExecutionContext ctx) { + @Override public @Nullable Supplier> factory(AggregateCall call, ExecutionContext ctx) { if (call.getAggregation().getName().equals(OperatorTable.TEST_SUM.getName())) return () -> new TestSum<>(call, ctx.rowHandler());