From f651c64d3b1286dc87e8262be415f49e8f3f2345 Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Tue, 13 Aug 2024 16:14:36 +0800 Subject: [PATCH 1/2] transform rules support substring function --- .../content.zh/docs/core-concept/transform.md | 1 + docs/content/docs/core-concept/transform.md | 1 + .../functions/SystemFunctionUtils.java | 57 ++++++++++++++++++- .../metadata/TransformSqlOperatorTable.java | 13 +++++ .../transform/PostTransformOperatorTest.java | 13 ++++- 5 files changed, 82 insertions(+), 3 deletions(-) diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md index 62f8f210eb3..82f8bfcd9d2 100644 --- a/docs/content.zh/docs/core-concept/transform.md +++ b/docs/content.zh/docs/core-concept/transform.md @@ -125,6 +125,7 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [ | LOWER(string) | lower(string) | Returns string in lowercase. | | TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. | | REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, string3) | Returns a string from STRING1 with all the substrings that match a regular expression STRING2 consecutively being replaced with STRING3. E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". | +| SUBSTR(string FROM integer1 [ FOR integer2 ]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position INT1 with length INT2 (to the end by default). | | SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position INT1 with length INT2 (to the end by default). | | CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. | diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md index 62f8f210eb3..82f8bfcd9d2 100644 --- a/docs/content/docs/core-concept/transform.md +++ b/docs/content/docs/core-concept/transform.md @@ -125,6 +125,7 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [ | LOWER(string) | lower(string) | Returns string in lowercase. | | TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. | | REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, string3) | Returns a string from STRING1 with all the substrings that match a regular expression STRING2 consecutively being replaced with STRING3. E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". | +| SUBSTR(string FROM integer1 [ FOR integer2 ]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position INT1 with length INT2 (to the end by default). | | SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position INT1 with length INT2 (to the end by default). | | CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. | diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java index ba569fc0890..649b9b8cce7 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java @@ -361,11 +361,64 @@ public static boolean notLike(String str, String regex) { } public static String substr(String str, int beginIndex) { - return str.substring(beginIndex); + return substring(str, beginIndex); } public static String substr(String str, int beginIndex, int length) { - return str.substring(beginIndex, beginIndex + length); + return substring(str, beginIndex, length); + } + + public static String substring(String str, int beginIndex) { + return substring(str, beginIndex, Integer.MAX_VALUE); + } + + public static String substring(String str, int beginIndex, int length) { + if (length < 0) { + LOG.error( + "length of 'substring(str, beginIndex, length)' must be >= 0 and Int type, but length = {}", + length); + throw new RuntimeException( + "length of 'substring(str, beginIndex, length)' must be >= 0 and Int type, but length = " + + length); + } + if (length > Integer.MAX_VALUE || beginIndex > Integer.MAX_VALUE) { + LOG.error( + "length or start of 'substring(str, beginIndex, length)' must be Int type, but length = {}, beginIndex = {}", + beginIndex, + length); + throw new RuntimeException( + "length or start of 'substring(str, beginIndex, length)' must be Int type, but length = " + + beginIndex + + ", beginIndex = " + + length); + } + if (str.isEmpty()) { + return ""; + } + + int startPos; + int endPos; + + if (beginIndex > 0) { + startPos = beginIndex - 1; + if (startPos >= str.length()) { + return ""; + } + } else if (beginIndex < 0) { + startPos = str.length() + beginIndex; + if (startPos < 0) { + return ""; + } + } else { + startPos = 0; + } + + if ((str.length() - startPos) < length) { + endPos = str.length(); + } else { + endPos = startPos + length; + } + return str.substring(startPos, endPos); } public static String upper(String str) { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java index 658550da0c9..2c25c9c0fde 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java @@ -191,6 +191,19 @@ public void lookupOperatorOverloads( SqlTypeFamily.INTEGER, SqlTypeFamily.INTEGER)), SqlFunctionCategory.STRING); + public static final SqlFunction SUBSTRING = + new SqlFunction( + "SUBSTRING", + SqlKind.OTHER_FUNCTION, + TransformSqlReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE, + null, + OperandTypes.or( + OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER), + OperandTypes.family( + SqlTypeFamily.CHARACTER, + SqlTypeFamily.INTEGER, + SqlTypeFamily.INTEGER)), + SqlFunctionCategory.STRING); // ------------------ // Temporal Functions diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java index 067842c313a..8aaa31cd1ce 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -1499,7 +1499,18 @@ void testBuildInFunctionTransform() throws Exception { testExpressionConditionTransform("concat('123', 'abc') = '123abc'"); testExpressionConditionTransform("upper('abc') = 'ABC'"); testExpressionConditionTransform("lower('ABC') = 'abc'"); - testExpressionConditionTransform("SUBSTR('ABC', 1, 1) = 'B'"); + testExpressionConditionTransform("SUBSTR('ABC', -1) = 'C'"); + testExpressionConditionTransform("SUBSTR('ABC', -2, 2) = 'BC'"); + testExpressionConditionTransform("SUBSTR('ABC', 0) = 'ABC'"); + testExpressionConditionTransform("SUBSTR('ABC', 1) = 'ABC'"); + testExpressionConditionTransform("SUBSTR('ABC', 2, 2) = 'BC'"); + testExpressionConditionTransform("SUBSTR('ABC', 2, 100) = 'BC'"); + testExpressionConditionTransform("SUBSTRING('ABC', -1) = 'C'"); + testExpressionConditionTransform("SUBSTRING('ABC', -2, 2) = 'BC'"); + testExpressionConditionTransform("SUBSTRING('ABC', 0) = 'ABC'"); + testExpressionConditionTransform("SUBSTRING('ABC', 1) = 'ABC'"); + testExpressionConditionTransform("SUBSTRING('ABC', 2, 2) = 'BC'"); + testExpressionConditionTransform("SUBSTRING('ABC', 2, 100) = 'BC'"); testExpressionConditionTransform("'ABC' like '^[a-zA-Z]'"); testExpressionConditionTransform("'123' not like '^[a-zA-Z]'"); testExpressionConditionTransform("abs(2) = 2"); From e0651d15341e2706b0bd3579c06609761a995ac2 Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Tue, 13 Aug 2024 19:57:49 +0800 Subject: [PATCH 2/2] fix comment --- .../content.zh/docs/core-concept/transform.md | 4 ++-- docs/content/docs/core-concept/transform.md | 20 +++++++++---------- .../metadata/TransformSqlOperatorTable.java | 14 +------------ .../transform/PostTransformOperatorTest.java | 6 ++++++ 4 files changed, 19 insertions(+), 25 deletions(-) diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md index 82f8bfcd9d2..8eb9b884942 100644 --- a/docs/content.zh/docs/core-concept/transform.md +++ b/docs/content.zh/docs/core-concept/transform.md @@ -125,8 +125,8 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [ | LOWER(string) | lower(string) | Returns string in lowercase. | | TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. | | REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, string3) | Returns a string from STRING1 with all the substrings that match a regular expression STRING2 consecutively being replaced with STRING3. E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". | -| SUBSTR(string FROM integer1 [ FOR integer2 ]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position INT1 with length INT2 (to the end by default). | -| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position INT1 with length INT2 (to the end by default). | +| SUBSTR(string, integer1[, integer2]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). | +| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). | | CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. | ## Temporal Functions diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md index 82f8bfcd9d2..dfccb63c15c 100644 --- a/docs/content/docs/core-concept/transform.md +++ b/docs/content/docs/core-concept/transform.md @@ -117,17 +117,17 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [ ## String Functions -| Function | Janino Code | Description | -| -------------------- | ------------------------ | ------------------------------------------------- | -| string1 || string2 | concat(string1, string2) | Returns the concatenation of STRING1 and STRING2. | -| CHAR_LENGTH(string) | charLength(string) | Returns the number of characters in STRING. | -| UPPER(string) | upper(string) | Returns string in uppercase. | -| LOWER(string) | lower(string) | Returns string in lowercase. | -| TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. | +| Function | Janino Code | Description | +| -------------------- | ------------------------ |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| string1 || string2 | concat(string1, string2) | Returns the concatenation of STRING1 and STRING2. | +| CHAR_LENGTH(string) | charLength(string) | Returns the number of characters in STRING. | +| UPPER(string) | upper(string) | Returns string in uppercase. | +| LOWER(string) | lower(string) | Returns string in lowercase. | +| TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. | | REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, string3) | Returns a string from STRING1 with all the substrings that match a regular expression STRING2 consecutively being replaced with STRING3. E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". | -| SUBSTR(string FROM integer1 [ FOR integer2 ]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position INT1 with length INT2 (to the end by default). | -| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position INT1 with length INT2 (to the end by default). | -| CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. | +| SUBSTR(string, integer1[, integer2]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). | +| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). | +| CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. | ## Temporal Functions diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java index 2c25c9c0fde..986337c8a3f 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java @@ -191,19 +191,7 @@ public void lookupOperatorOverloads( SqlTypeFamily.INTEGER, SqlTypeFamily.INTEGER)), SqlFunctionCategory.STRING); - public static final SqlFunction SUBSTRING = - new SqlFunction( - "SUBSTRING", - SqlKind.OTHER_FUNCTION, - TransformSqlReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE, - null, - OperandTypes.or( - OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER), - OperandTypes.family( - SqlTypeFamily.CHARACTER, - SqlTypeFamily.INTEGER, - SqlTypeFamily.INTEGER)), - SqlFunctionCategory.STRING); + public static final SqlFunction SUBSTRING = SqlStdOperatorTable.SUBSTRING; // ------------------ // Temporal Functions diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java index 8aaa31cd1ce..9e0e9843c33 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -1511,6 +1511,12 @@ void testBuildInFunctionTransform() throws Exception { testExpressionConditionTransform("SUBSTRING('ABC', 1) = 'ABC'"); testExpressionConditionTransform("SUBSTRING('ABC', 2, 2) = 'BC'"); testExpressionConditionTransform("SUBSTRING('ABC', 2, 100) = 'BC'"); + testExpressionConditionTransform("SUBSTRING('ABC' FROM -1) = 'C'"); + testExpressionConditionTransform("SUBSTRING('ABC' FROM -2 FOR 2) = 'BC'"); + testExpressionConditionTransform("SUBSTRING('ABC' FROM 0) = 'ABC'"); + testExpressionConditionTransform("SUBSTRING('ABC' FROM 1) = 'ABC'"); + testExpressionConditionTransform("SUBSTRING('ABC' FROM 2 FOR 2) = 'BC'"); + testExpressionConditionTransform("SUBSTRING('ABC' FROM 2 FOR 100) = 'BC'"); testExpressionConditionTransform("'ABC' like '^[a-zA-Z]'"); testExpressionConditionTransform("'123' not like '^[a-zA-Z]'"); testExpressionConditionTransform("abs(2) = 2");