From 99b2ac7e2b6705a1bf3b2d087c399bce199380ee Mon Sep 17 00:00:00 2001 From: godfreyhe Date: Mon, 26 Apr 2021 11:55:02 +0800 Subject: [PATCH] [FLINK-22463][table-planner-blink] Fix IllegalArgumentException in WindowAttachedWindowingStrategy when two phase is enabled for distinct agg --- .../WindowAttachedWindowingStrategy.java | 2 +- .../stream/WindowAggregateJsonPlanTest.java | 33 + .../jsonplan/WindowAggregateJsonITCase.java | 45 +- .../testDistinctSplitEnabled.out | 1261 +++++++++++++++++ 4 files changed, 1339 insertions(+), 2 deletions(-) create mode 100644 flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/logical/WindowAttachedWindowingStrategy.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/logical/WindowAttachedWindowingStrategy.java index a28c77a6f4c52c..cb3cf7c3f61aa1 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/logical/WindowAttachedWindowingStrategy.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/logical/WindowAttachedWindowingStrategy.java @@ -48,7 +48,7 @@ public WindowAttachedWindowingStrategy( @JsonProperty(FIELD_NAME_WINDOW_START) int windowStart, @JsonProperty(FIELD_NAME_WINDOW_END) int windowEnd) { super(window, timeAttributeType); - checkArgument(windowEnd >= 0 && windowStart >= 0); + checkArgument(windowEnd >= 0); this.windowStart = windowStart; this.windowEnd = windowEnd; } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java index a0bc8d9313a9af..f71f991a08b7dd 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java @@ -20,6 +20,7 @@ import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction; import org.apache.flink.table.planner.utils.StreamTableTestUtil; import org.apache.flink.table.planner.utils.TableTestBase; @@ -185,4 +186,36 @@ public void testProcTimeCumulateWindow() { + " INTERVAL '15' SECOND))\n" + "GROUP BY b, window_start, window_end"); } + + @Test + public void testDistinctSplitEnabled() { + tEnv.getConfig() + .getConfiguration() + .setBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true); + String sinkTableDdl = + "CREATE TABLE MySink (\n" + + " a bigint,\n" + + " window_start timestamp(3),\n" + + " window_end timestamp(3),\n" + + " cnt_star bigint,\n" + + " sum_b bigint,\n" + + " cnt_distinct_c bigint\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'sink-insert-only' = 'false',\n" + + " 'table-sink-class' = 'DEFAULT')"; + tEnv.executeSql(sinkTableDdl); + + util.verifyJsonPlan( + "insert into MySink select a, " + + " window_start, " + + " window_end, " + + " count(*), " + + " sum(b), " + + " count(distinct c) AS uv " + + "FROM TABLE (" + + " CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) " + + "GROUP BY a, window_start, window_end"); + } } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java index 1e3e122199910e..e18d14af897aa2 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/WindowAggregateJsonITCase.java @@ -42,7 +42,7 @@ public class WindowAggregateJsonITCase extends JsonPlanTestBase { public static Object[] parameters() { return new Object[][] { new Object[] {AggregatePhaseStrategy.ONE_PHASE}, - new Object[] {AggregatePhaseStrategy.ONE_PHASE} + new Object[] {AggregatePhaseStrategy.TWO_PHASE} }; } @@ -180,4 +180,47 @@ public void testEventTimeCumulateWindow() throws Exception { "+I[null, 1]"), result); } + + @Test + public void testDistinctSplitEnabled() throws Exception { + tableEnv.getConfig() + .getConfiguration() + .setBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true); + createTestValuesSinkTable( + "MySink", "name STRING", "max_double DOUBLE", "cnt_distinct_int BIGINT"); + + String jsonPlan = + tableEnv.getJsonPlan( + "insert into MySink select name, " + + " max(`double`),\n" + + " count(distinct `int`) " + + "FROM TABLE (" + + " CUMULATE(\n" + + " TABLE MyTable,\n" + + " DESCRIPTOR(rowtime),\n" + + " INTERVAL '5' SECOND,\n" + + " INTERVAL '15' SECOND))" + + "GROUP BY name, window_start, window_end"); + tableEnv.executeJsonPlan(jsonPlan).await(); + + List result = TestValuesTableFactory.getResults("MySink"); + assertResult( + Arrays.asList( + "+I[a, 5.0, 3]", + "+I[a, 5.0, 4]", + "+I[a, 5.0, 4]", + "+I[b, 3.0, 1]", + "+I[b, 3.0, 1]", + "+I[b, 3.0, 1]", + "+I[b, 4.0, 1]", + "+I[b, 4.0, 1]", + "+I[b, 4.0, 1]", + "+I[b, 6.0, 2]", + "+I[b, 6.0, 2]", + "+I[null, 7.0, 1]", + "+I[null, 7.0, 1]", + "+I[null, 7.0, 1]"), + result); + } } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out new file mode 100644 index 00000000000000..f6b73fefcfc501 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out @@ -0,0 +1,1261 @@ +{ + "flinkVersion" : "", + "nodes" : [ { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan", + "scanTableSource" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MyTable" + }, + "catalogTable" : { + "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND", + "schema.4.expr" : "PROCTIME()", + "schema.0.data-type" : "INT", + "schema.2.name" : "c", + "schema.1.name" : "b", + "schema.4.name" : "proctime", + "schema.1.data-type" : "BIGINT", + "schema.3.data-type" : "TIMESTAMP(3)", + "schema.2.data-type" : "VARCHAR(2147483647)", + "schema.3.name" : "rowtime", + "connector" : "values", + "schema.watermark.0.rowtime" : "rowtime", + "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)", + "schema.3.expr" : "TO_TIMESTAMP(`c`)", + "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL", + "schema.0.name" : "a" + } + }, + "id" : 1, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "TO_TIMESTAMP", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "PROCTIME", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ ], + "type" : { + "timestampKind" : "PROCTIME", + "typeName" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false + } + } ], + "condition" : null, + "id" : 2, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[a, b, c, TO_TIMESTAMP(c) AS rowtime, PROCTIME() AS proctime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner", + "watermarkExpr" : { + "kind" : "REX_CALL", + "operator" : { + "name" : "-", + "kind" : "MINUS", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "LITERAL", + "value" : 1000, + "type" : { + "typeName" : "INTERVAL_SECOND", + "nullable" : false, + "precision" : 2, + "scale" : 6 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, + "rowtimeFieldIndex" : 3, + "id" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "proctime" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "MOD", + "kind" : "MOD", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "REX_CALL", + "operator" : { + "name" : "HASH_CODE", + "kind" : "OTHER_FUNCTION", + "syntax" : "FUNCTION" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "VARCHAR", + "nullable" : true, + "precision" : 2147483647 + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "LITERAL", + "value" : "1024", + "type" : { + "typeName" : "INTEGER", + "nullable" : false + } + } ], + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "timestampKind" : "ROWTIME", + "typeName" : "TIMESTAMP", + "nullable" : true + } + } ], + "condition" : null, + "id" : 4, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "$f5" : "INT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Calc(select=[a, b, c, MOD(HASH_CODE(c), 1024) AS $f5, rowtime])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate", + "grouping" : [ 0, 3 ], + "aggCalls" : [ { + "name" : null, + "aggFunction" : { + "name" : "COUNT", + "kind" : "COUNT", + "syntax" : "FUNCTION_STAR" + }, + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + }, { + "name" : null, + "aggFunction" : { + "name" : "SUM", + "kind" : "SUM", + "syntax" : "FUNCTION" + }, + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "name" : null, + "aggFunction" : { + "name" : "COUNT", + "kind" : "COUNT", + "syntax" : "FUNCTION_STAR" + }, + "argList" : [ 2 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + } ], + "windowing" : { + "strategy" : "TimeAttribute", + "window" : { + "type" : "CumulativeWindow", + "maxSize" : "PT1H", + "step" : "PT10M" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + }, + "timeAttributeIndex" : 4, + "isRowtime" : true + }, + "id" : 5, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "$f5" : "INT" + }, { + "count1$0" : "BIGINT" + }, { + "sum$1" : "BIGINT" + }, { + "count$2" : "BIGINT" + }, { + "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9sb2dpY2FsL0xvZ2ljYWxUeXBlO3hwdnIAK29yZy5hcGFjaGUuZmxpbmsudGFibGUuYXBpLmRhdGF2aWV3Lk1hcFZpZXcAAAAAAAAAAAAAAHhwc3IAM29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5TdHJ1Y3R1cmVkVHlwZQAAAAAAAAABAgAFWgAOaXNJbnN0YW50aWFibGVMAAphdHRyaWJ1dGVzcQB+AAFMAAtjb21wYXJpc2lvbnQAS0xvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvU3RydWN0dXJlZFR5cGUkU3RydWN0dXJlZENvbXBhcmlzaW9uO0wAE2ltcGxlbWVudGF0aW9uQ2xhc3NxAH4AA0wACXN1cGVyVHlwZXQANUxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvU3RydWN0dXJlZFR5cGU7eHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Vc2VyRGVmaW5lZFR5cGUAAAAAAAAAAQIAA1oAB2lzRmluYWxMAAtkZXNjcmlwdGlvbnQAEkxqYXZhL2xhbmcvU3RyaW5nO0wAEG9iamVjdElkZW50aWZpZXJ0ADFMb3JnL2FwYWNoZS9mbGluay90YWJsZS9jYXRhbG9nL09iamVjdElkZW50aWZpZXI7eHIAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Mb2dpY2FsVHlwZQAAAAAAAAABAgACWgAKaXNOdWxsYWJsZUwACHR5cGVSb290dAA2TG9yZy9hcGFjaGUvZmxpbmsvdGFibGUvdHlwZXMvbG9naWNhbC9Mb2dpY2FsVHlwZVJvb3Q7eHABfnIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Mb2dpY2FsVHlwZVJvb3QAAAAAAAAAABIAAHhyAA5qYXZhLmxhbmcuRW51bQAAAAAAAAAAEgAAeHB0AA9TVFJVQ1RVUkVEX1RZUEUBcHABc3IAJmphdmEudXRpbC5Db2xsZWN0aW9ucyRVbm1vZGlmaWFibGVMaXN0/A8lMbXsjhACAAFMAARsaXN0cQB+AAF4cgAsamF2YS51dGlsLkNvbGxlY3Rpb25zJFVubW9kaWZpYWJsZUNvbGxlY3Rpb24ZQgCAy173HgIAAUwAAWN0ABZMamF2YS91dGlsL0NvbGxlY3Rpb247eHBzcgATamF2YS51dGlsLkFycmF5TGlzdHiB0h2Zx2GdAwABSQAEc2l6ZXhwAAAAAXcEAAAAAXNyAEdvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuU3RydWN0dXJlZFR5cGUkU3RydWN0dXJlZEF0dHJpYnV0ZQAAAAAAAAABAgADTAALZGVzY3JpcHRpb25xAH4ADEwABG5hbWVxAH4ADEwABHR5cGVxAH4ABHhwcHQAA21hcHNyACxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTWFwVHlwZQAAAAAAAAABAgACTAAHa2V5VHlwZXEAfgAETAAJdmFsdWVUeXBlcQB+AAR4cQB+AA4BfnEAfgARdAADTUFQc3IAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5WYXJDaGFyVHlwZQAAAAAAAAABAgABSQAGbGVuZ3RoeHEAfgAOAX5xAH4AEXQAB1ZBUkNIQVJ/////c3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cQB+AA4AfnEAfgARdAAGQklHSU5UeHEAfgAafnIASW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5TdHJ1Y3R1cmVkVHlwZSRTdHJ1Y3R1cmVkQ29tcGFyaXNpb24AAAAAAAAAABIAAHhxAH4AEnQABE5PTkVxAH4AB3BzcQB+ABkAAAABdwQAAAABc3IALW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuS2V5VmFsdWVEYXRhVHlwZY4kybjNPKCeAgACTAALa2V5RGF0YVR5cGV0ACdMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9EYXRhVHlwZTtMAA12YWx1ZURhdGFUeXBlcQB+AC94cQB+AAJ2cgANamF2YS51dGlsLk1hcAAAAAAAAAAAAAAAeHBxAH4AH3NyACtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkF0b21pY0RhdGFUeXBlGohTKfp6IzICAAB4cQB+AAJ2cgAmb3JnLmFwYWNoZS5mbGluay50YWJsZS5kYXRhLlN0cmluZ0RhdGEAAAAAAAAAAAAAAHhwcQB+ACNzcQB+ADN2cgAOamF2YS5sYW5nLkxvbmc7i+SQzI8j3wIAAUoABXZhbHVleHIAEGphdmEubGFuZy5OdW1iZXKGrJUdC5TgiwIAAHhwcQB+ACd4AAAUV/0AAAABAAAAAQBUb3JnLmFwYWNoZS5mbGluay50YWJsZS5ydW50aW1lLnR5cGV1dGlscy5Sb3dEYXRhU2VyaWFsaXplciRSb3dEYXRhU2VyaWFsaXplclNuYXBzaG90AAAAAwAAAAGs7QAFc3IALG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5NYXBUeXBlAAAAAAAAAAECAAJMAAdrZXlUeXBldAAyTG9yZy9hcGFjaGUvZmxpbmsvdGFibGUvdHlwZXMvbG9naWNhbC9Mb2dpY2FsVHlwZTtMAAl2YWx1ZVR5cGVxAH4AAXhyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGUAAAAAAAAAAQIAAloACmlzTnVsbGFibGVMAAh0eXBlUm9vdHQANkxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvTG9naWNhbFR5cGVSb290O3hwAX5yADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGVSb290AAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAADTUFQc3IAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5WYXJDaGFyVHlwZQAAAAAAAAABAgABSQAGbGVuZ3RoeHEAfgACAX5xAH4ABXQAB1ZBUkNIQVJ/////c3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cQB+AAIAfnEAfgAFdAAGQklHSU5UABRX/QAAAAEAAAABAFRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLk1hcERhdGFTZXJpYWxpemVyJE1hcERhdGFTZXJpYWxpemVyU25hcHNob3QAAAADrO0ABXNyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuVmFyQ2hhclR5cGUAAAAAAAAAAQIAAUkABmxlbmd0aHhyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGUAAAAAAAAAAQIAAloACmlzTnVsbGFibGVMAAh0eXBlUm9vdHQANkxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvTG9naWNhbFR5cGVSb290O3hwAX5yADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGVSb290AAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAHVkFSQ0hBUn////+s7QAFc3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cgAwb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5sb2dpY2FsLkxvZ2ljYWxUeXBlAAAAAAAAAAECAAJaAAppc051bGxhYmxlTAAIdHlwZVJvb3R0ADZMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9sb2dpY2FsL0xvZ2ljYWxUeXBlUm9vdDt4cAB+cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5sb2dpY2FsLkxvZ2ljYWxUeXBlUm9vdAAAAAAAAAAAEgAAeHIADmphdmEubGFuZy5FbnVtAAAAAAAAAAASAAB4cHQABkJJR0lOVKztAAVzcgA9b3JnLmFwYWNoZS5mbGluay50YWJsZS5ydW50aW1lLnR5cGV1dGlscy5TdHJpbmdEYXRhU2VyaWFsaXplcgAAAAAAAAABAgAAeHIAQm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5UeXBlU2VyaWFsaXplclNpbmdsZXRvbnmph6rHLndFAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwrO0ABXNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhyAEJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuVHlwZVNlcmlhbGl6ZXJTaW5nbGV0b255qYeqxy53RQIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLlR5cGVTZXJpYWxpemVyAAAAAAAAAAECAAB4cA==')" + }, { + "$slice_end" : "BIGINT" + } ] + }, + "description" : "LocalWindowAggregate(groupBy=[a, $f5], window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[10 min])], select=[a, $f5, COUNT(*) AS count1$0, SUM(b) AS sum$1, COUNT(distinct$0 c) AS count$2, DISTINCT(c) AS distinct$0, slice_end('w$) AS $slice_end])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 6, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0, 1 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "$f5" : "INT" + }, { + "count1$0" : "BIGINT" + }, { + "sum$1" : "BIGINT" + }, { + "count$2" : "BIGINT" + }, { + "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9sb2dpY2FsL0xvZ2ljYWxUeXBlO3hwdnIAK29yZy5hcGFjaGUuZmxpbmsudGFibGUuYXBpLmRhdGF2aWV3Lk1hcFZpZXcAAAAAAAAAAAAAAHhwc3IAM29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5TdHJ1Y3R1cmVkVHlwZQAAAAAAAAABAgAFWgAOaXNJbnN0YW50aWFibGVMAAphdHRyaWJ1dGVzcQB+AAFMAAtjb21wYXJpc2lvbnQAS0xvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvU3RydWN0dXJlZFR5cGUkU3RydWN0dXJlZENvbXBhcmlzaW9uO0wAE2ltcGxlbWVudGF0aW9uQ2xhc3NxAH4AA0wACXN1cGVyVHlwZXQANUxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvU3RydWN0dXJlZFR5cGU7eHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Vc2VyRGVmaW5lZFR5cGUAAAAAAAAAAQIAA1oAB2lzRmluYWxMAAtkZXNjcmlwdGlvbnQAEkxqYXZhL2xhbmcvU3RyaW5nO0wAEG9iamVjdElkZW50aWZpZXJ0ADFMb3JnL2FwYWNoZS9mbGluay90YWJsZS9jYXRhbG9nL09iamVjdElkZW50aWZpZXI7eHIAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Mb2dpY2FsVHlwZQAAAAAAAAABAgACWgAKaXNOdWxsYWJsZUwACHR5cGVSb290dAA2TG9yZy9hcGFjaGUvZmxpbmsvdGFibGUvdHlwZXMvbG9naWNhbC9Mb2dpY2FsVHlwZVJvb3Q7eHABfnIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5Mb2dpY2FsVHlwZVJvb3QAAAAAAAAAABIAAHhyAA5qYXZhLmxhbmcuRW51bQAAAAAAAAAAEgAAeHB0AA9TVFJVQ1RVUkVEX1RZUEUBcHABc3IAJmphdmEudXRpbC5Db2xsZWN0aW9ucyRVbm1vZGlmaWFibGVMaXN0/A8lMbXsjhACAAFMAARsaXN0cQB+AAF4cgAsamF2YS51dGlsLkNvbGxlY3Rpb25zJFVubW9kaWZpYWJsZUNvbGxlY3Rpb24ZQgCAy173HgIAAUwAAWN0ABZMamF2YS91dGlsL0NvbGxlY3Rpb247eHBzcgATamF2YS51dGlsLkFycmF5TGlzdHiB0h2Zx2GdAwABSQAEc2l6ZXhwAAAAAXcEAAAAAXNyAEdvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuU3RydWN0dXJlZFR5cGUkU3RydWN0dXJlZEF0dHJpYnV0ZQAAAAAAAAABAgADTAALZGVzY3JpcHRpb25xAH4ADEwABG5hbWVxAH4ADEwABHR5cGVxAH4ABHhwcHQAA21hcHNyACxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTWFwVHlwZQAAAAAAAAABAgACTAAHa2V5VHlwZXEAfgAETAAJdmFsdWVUeXBlcQB+AAR4cQB+AA4BfnEAfgARdAADTUFQc3IAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5WYXJDaGFyVHlwZQAAAAAAAAABAgABSQAGbGVuZ3RoeHEAfgAOAX5xAH4AEXQAB1ZBUkNIQVJ/////c3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cQB+AA4AfnEAfgARdAAGQklHSU5UeHEAfgAafnIASW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5TdHJ1Y3R1cmVkVHlwZSRTdHJ1Y3R1cmVkQ29tcGFyaXNpb24AAAAAAAAAABIAAHhxAH4AEnQABE5PTkVxAH4AB3BzcQB+ABkAAAABdwQAAAABc3IALW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuS2V5VmFsdWVEYXRhVHlwZY4kybjNPKCeAgACTAALa2V5RGF0YVR5cGV0ACdMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9EYXRhVHlwZTtMAA12YWx1ZURhdGFUeXBlcQB+AC94cQB+AAJ2cgANamF2YS51dGlsLk1hcAAAAAAAAAAAAAAAeHBxAH4AH3NyACtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLkF0b21pY0RhdGFUeXBlGohTKfp6IzICAAB4cQB+AAJ2cgAmb3JnLmFwYWNoZS5mbGluay50YWJsZS5kYXRhLlN0cmluZ0RhdGEAAAAAAAAAAAAAAHhwcQB+ACNzcQB+ADN2cgAOamF2YS5sYW5nLkxvbmc7i+SQzI8j3wIAAUoABXZhbHVleHIAEGphdmEubGFuZy5OdW1iZXKGrJUdC5TgiwIAAHhwcQB+ACd4AAAUV/0AAAABAAAAAQBUb3JnLmFwYWNoZS5mbGluay50YWJsZS5ydW50aW1lLnR5cGV1dGlscy5Sb3dEYXRhU2VyaWFsaXplciRSb3dEYXRhU2VyaWFsaXplclNuYXBzaG90AAAAAwAAAAGs7QAFc3IALG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5NYXBUeXBlAAAAAAAAAAECAAJMAAdrZXlUeXBldAAyTG9yZy9hcGFjaGUvZmxpbmsvdGFibGUvdHlwZXMvbG9naWNhbC9Mb2dpY2FsVHlwZTtMAAl2YWx1ZVR5cGVxAH4AAXhyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGUAAAAAAAAAAQIAAloACmlzTnVsbGFibGVMAAh0eXBlUm9vdHQANkxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvTG9naWNhbFR5cGVSb290O3hwAX5yADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGVSb290AAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAADTUFQc3IAMG9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5WYXJDaGFyVHlwZQAAAAAAAAABAgABSQAGbGVuZ3RoeHEAfgACAX5xAH4ABXQAB1ZBUkNIQVJ/////c3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cQB+AAIAfnEAfgAFdAAGQklHSU5UABRX/QAAAAEAAAABAFRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLk1hcERhdGFTZXJpYWxpemVyJE1hcERhdGFTZXJpYWxpemVyU25hcHNob3QAAAADrO0ABXNyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuVmFyQ2hhclR5cGUAAAAAAAAAAQIAAUkABmxlbmd0aHhyADBvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGUAAAAAAAAAAQIAAloACmlzTnVsbGFibGVMAAh0eXBlUm9vdHQANkxvcmcvYXBhY2hlL2ZsaW5rL3RhYmxlL3R5cGVzL2xvZ2ljYWwvTG9naWNhbFR5cGVSb290O3hwAX5yADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnR5cGVzLmxvZ2ljYWwuTG9naWNhbFR5cGVSb290AAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAHVkFSQ0hBUn////+s7QAFc3IAL29yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMubG9naWNhbC5CaWdJbnRUeXBlAAAAAAAAAAECAAB4cgAwb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5sb2dpY2FsLkxvZ2ljYWxUeXBlAAAAAAAAAAECAAJaAAppc051bGxhYmxlTAAIdHlwZVJvb3R0ADZMb3JnL2FwYWNoZS9mbGluay90YWJsZS90eXBlcy9sb2dpY2FsL0xvZ2ljYWxUeXBlUm9vdDt4cAB+cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5sb2dpY2FsLkxvZ2ljYWxUeXBlUm9vdAAAAAAAAAAAEgAAeHIADmphdmEubGFuZy5FbnVtAAAAAAAAAAASAAB4cHQABkJJR0lOVKztAAVzcgA9b3JnLmFwYWNoZS5mbGluay50YWJsZS5ydW50aW1lLnR5cGV1dGlscy5TdHJpbmdEYXRhU2VyaWFsaXplcgAAAAAAAAABAgAAeHIAQm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5UeXBlU2VyaWFsaXplclNpbmdsZXRvbnmph6rHLndFAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwrO0ABXNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhyAEJvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuVHlwZVNlcmlhbGl6ZXJTaW5nbGV0b255qYeqxy53RQIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLlR5cGVTZXJpYWxpemVyAAAAAAAAAAECAAB4cA==')" + }, { + "$slice_end" : "BIGINT" + } ] + }, + "description" : "Exchange(distribution=[hash[a, $f5]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate", + "grouping" : [ 0, 1 ], + "aggCalls" : [ { + "name" : null, + "aggFunction" : { + "name" : "COUNT", + "kind" : "COUNT", + "syntax" : "FUNCTION_STAR" + }, + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + }, { + "name" : null, + "aggFunction" : { + "name" : "SUM", + "kind" : "SUM", + "syntax" : "FUNCTION" + }, + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "name" : null, + "aggFunction" : { + "name" : "COUNT", + "kind" : "COUNT", + "syntax" : "FUNCTION_STAR" + }, + "argList" : [ 2 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + } ], + "windowing" : { + "strategy" : "SliceAttached", + "window" : { + "type" : "CumulativeWindow", + "maxSize" : "PT1H", + "step" : "PT10M" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + }, + "sliceEnd" : 6, + "isRowtime" : true + }, + "namedWindowProperties" : [ { + "name" : "window_start", + "property" : { + "kind" : "WindowStart", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + }, { + "name" : "window_end", + "property" : { + "kind" : "WindowEnd", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + } ], + "id" : 7, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "localAggInputRowType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "b" : "BIGINT" + }, { + "c" : "VARCHAR(2147483647)" + }, { + "$f5" : "INT" + }, { + "rowtime" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "$f5" : "INT" + }, { + "$f2" : "BIGINT NOT NULL" + }, { + "$f3" : "BIGINT" + }, { + "$f4" : "BIGINT NOT NULL" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + }, + "description" : "GlobalWindowAggregate(groupBy=[a, $f5], window=[CUMULATE(slice_end=[$slice_end], max_size=[1 h], step=[10 min])], select=[a, $f5, COUNT(count1$0) AS $f2, SUM(sum$1) AS $f3, COUNT(distinct$0 count$2) AS $f4, start('w$) AS window_start, end('w$) AS window_end])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : false, + "precision" : 3 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : false, + "precision" : 3 + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + } ], + "condition" : null, + "id" : 8, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "$f5" : "INT" + }, { + "$f4" : "BIGINT NOT NULL" + }, { + "$f5_0" : "BIGINT" + }, { + "$f6" : "BIGINT NOT NULL" + } ] + }, + "description" : "Calc(select=[a, window_start, window_end, $f5, $f2 AS $f4, $f3 AS $f5_0, $f4 AS $f6])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate", + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : null, + "aggFunction" : { + "name" : "$SUM0", + "kind" : "SUM0", + "syntax" : "FUNCTION" + }, + "argList" : [ 4 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + }, { + "name" : null, + "aggFunction" : { + "name" : "SUM", + "kind" : "SUM", + "syntax" : "FUNCTION" + }, + "argList" : [ 5 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "name" : null, + "aggFunction" : { + "name" : "$SUM0", + "kind" : "SUM0", + "syntax" : "FUNCTION" + }, + "argList" : [ 6 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + } ], + "windowing" : { + "strategy" : "WindowAttached", + "window" : { + "type" : "CumulativeWindow", + "maxSize" : "PT1H", + "step" : "PT10M" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + }, + "windowStart" : 1, + "windowEnd" : 2, + "isRowtime" : true + }, + "id" : 9, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "sum$0" : "BIGINT" + }, { + "sum$1" : "BIGINT" + }, { + "sum$2" : "BIGINT" + }, { + "$window_end" : "BIGINT" + } ] + }, + "description" : "LocalWindowAggregate(groupBy=[a], window=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], select=[a, $SUM0($f4) AS sum$0, SUM($f5_0) AS sum$1, $SUM0($f6) AS sum$2, slice_end('w$) AS $window_end])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange", + "id" : 10, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "sum$0" : "BIGINT" + }, { + "sum$1" : "BIGINT" + }, { + "sum$2" : "BIGINT" + }, { + "$window_end" : "BIGINT" + } ] + }, + "description" : "Exchange(distribution=[hash[a]])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate", + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : null, + "aggFunction" : { + "name" : "$SUM0", + "kind" : "SUM0", + "syntax" : "FUNCTION" + }, + "argList" : [ 4 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + }, { + "name" : null, + "aggFunction" : { + "name" : "SUM", + "kind" : "SUM", + "syntax" : "FUNCTION" + }, + "argList" : [ 5 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "name" : null, + "aggFunction" : { + "name" : "$SUM0", + "kind" : "SUM0", + "syntax" : "FUNCTION" + }, + "argList" : [ 6 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + } ], + "windowing" : { + "strategy" : "WindowAttached", + "window" : { + "type" : "CumulativeWindow", + "maxSize" : "PT1H", + "step" : "PT10M" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + }, + "windowStart" : -1, + "windowEnd" : 4, + "isRowtime" : true + }, + "namedWindowProperties" : [ { + "name" : "window_start", + "property" : { + "kind" : "WindowStart", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + }, { + "name" : "window_end", + "property" : { + "kind" : "WindowEnd", + "reference" : { + "name" : "w$", + "type" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "ROWTIME" + } + } + } + } ], + "id" : 11, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "localAggInputRowType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "$f5" : "INT" + }, { + "$f4" : "BIGINT NOT NULL" + }, { + "$f5_0" : "BIGINT" + }, { + "$f6" : "BIGINT NOT NULL" + } ] + }, + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "INT" + }, { + "$f1" : "BIGINT NOT NULL" + }, { + "$f2" : "BIGINT" + }, { + "$f3" : "BIGINT NOT NULL" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "REGULAR" + } + } ] + }, + "description" : "GlobalWindowAggregate(groupBy=[a], window=[CUMULATE(win_end=[$window_end], max_size=[1 h], step=[10 min])], select=[a, $SUM0(sum$0) AS $f1, SUM(sum$1) AS $f2, $SUM0(sum$2) AS $f3, start('w$) AS window_start, end('w$) AS window_end])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc", + "projection" : [ { + "kind" : "REX_CALL", + "operator" : { + "name" : "CAST", + "kind" : "CAST", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : { + "typeName" : "INTEGER", + "nullable" : true + } + } ], + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "CAST", + "kind" : "CAST", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : false, + "precision" : 3 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "CAST", + "kind" : "CAST", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : false, + "precision" : 3 + } + } ], + "type" : { + "typeName" : "TIMESTAMP", + "nullable" : true, + "precision" : 3 + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "CAST", + "kind" : "CAST", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + } ], + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + }, { + "kind" : "REX_CALL", + "operator" : { + "name" : "CAST", + "kind" : "CAST", + "syntax" : "SPECIAL" + }, + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : { + "typeName" : "BIGINT", + "nullable" : false + } + } ], + "type" : { + "typeName" : "BIGINT", + "nullable" : true + } + } ], + "condition" : null, + "id" : 12, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "BIGINT" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "cnt_star" : "BIGINT" + }, { + "sum_b" : "BIGINT" + }, { + "cnt_distinct_c" : "BIGINT" + } ] + }, + "description" : "Calc(select=[CAST(a) AS a, CAST(window_start) AS window_start, CAST(window_end) AS window_end, CAST($f1) AS cnt_star, $f2 AS sum_b, CAST($f3) AS cnt_distinct_c])" + }, { + "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", + "dynamicTableSink" : { + "identifier" : { + "catalogName" : "default_catalog", + "databaseName" : "default_database", + "tableName" : "MySink" + }, + "catalogTable" : { + "schema.5.name" : "cnt_distinct_c", + "sink-insert-only" : "false", + "schema.0.data-type" : "BIGINT", + "schema.2.name" : "window_end", + "schema.1.name" : "window_start", + "schema.4.name" : "sum_b", + "schema.1.data-type" : "TIMESTAMP(3)", + "schema.3.data-type" : "BIGINT", + "table-sink-class" : "DEFAULT", + "schema.2.data-type" : "TIMESTAMP(3)", + "schema.3.name" : "cnt_star", + "connector" : "values", + "schema.5.data-type" : "BIGINT", + "schema.4.data-type" : "BIGINT", + "schema.0.name" : "a" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "id" : 13, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "nullable" : true, + "fields" : [ { + "a" : "BIGINT" + }, { + "window_start" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "window_end" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : true, + "precision" : 3, + "kind" : "REGULAR" + } + }, { + "cnt_star" : "BIGINT" + }, { + "sum_b" : "BIGINT" + }, { + "cnt_distinct_c" : "BIGINT" + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, window_start, window_end, cnt_star, sum_b, cnt_distinct_c])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 9, + "target" : 10, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 10, + "target" : 11, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 11, + "target" : 12, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file