From 7cacc8ff70fedfb22e7a379c66316249dc57a107 Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Fri, 10 Jan 2025 16:37:11 +0800 Subject: [PATCH 1/4] [FLINK-35325][transform] Skip insufficient_quota error when running test case using ad model. --- .../flink/FlinkPipelineUdfITCase.java | 23 +++++++++++-------- .../runtime/model/TestOpenAIChatModel.java | 12 ++++++++-- .../model/TestOpenAIEmbeddingModel.java | 13 ++++++++--- 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java index 5cf424cd379..b6b8ff563a0 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java @@ -54,7 +54,9 @@ import java.util.stream.Stream; import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL; +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.params.provider.Arguments.arguments; /** Integration test for UDFs. */ @@ -896,15 +898,18 @@ void testTransformWithModel(ValuesDataSink.SinkApi sinkApi) throws Exception { // Execute the pipeline PipelineExecution execution = composer.compose(pipelineDef); - execution.execute(); - - // Check the order and content of all received events - String[] outputEvents = outCaptor.toString().trim().split("\n"); - assertThat(outputEvents) - .contains( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`emb` STRING}, primaryKeys=col1, options=({key1=value1})}") - // The result of transform by model is not fixed. - .hasSize(9); + assertThatThrownBy( + () -> { + execution.execute(); + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .contains( + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`emb` STRING}, primaryKeys=col1, options=({key1=value1})}") + // The result of transform by model is not fixed. + .hasSize(9); + }) + .satisfies(anyCauseMatches("quota")); } private static Stream testParams() { diff --git a/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java b/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java index bba2d8b25ba..c5c952abb77 100644 --- a/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java +++ b/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java @@ -23,6 +23,10 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + /** A test for {@link OpenAIChatModel}. */ public class TestOpenAIChatModel { @Test @@ -34,7 +38,11 @@ public void testEval() { configuration.set(ModelOptions.OPENAI_MODEL_NAME, "gpt-4o-mini"); UserDefinedFunctionContext userDefinedFunctionContext = () -> configuration; openAIChatModel.open(userDefinedFunctionContext); - String response = openAIChatModel.eval("Who invented the electric light?"); - Assertions.assertFalse(response.isEmpty()); + assertThatThrownBy( + () -> { + String response = openAIChatModel.eval("Who invented the electric light?"); + Assertions.assertFalse(response.isEmpty()); + }) + .satisfies(anyCauseMatches("quota")); } } diff --git a/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java b/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java index 118fb562824..71198960134 100644 --- a/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java +++ b/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java @@ -24,6 +24,9 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + /** A test for {@link OpenAIEmbeddingModel}. */ public class TestOpenAIEmbeddingModel { @@ -36,8 +39,12 @@ public void testEval() { configuration.set(ModelOptions.OPENAI_MODEL_NAME, "text-embedding-3-small"); UserDefinedFunctionContext userDefinedFunctionContext = () -> configuration; openAIEmbeddingModel.open(userDefinedFunctionContext); - ArrayData arrayData = - openAIEmbeddingModel.eval("Flink CDC is a streaming data integration tool"); - Assertions.assertNotNull(arrayData); + assertThatThrownBy( + () -> { + ArrayData arrayData = + openAIEmbeddingModel.eval("Flink CDC is a streaming data integration tool"); + Assertions.assertNotNull(arrayData); + }) + .satisfies(anyCauseMatches("quota")); } } From 523c2ad62ba93e6080adff4a8acc5abb4b564942 Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Fri, 10 Jan 2025 16:37:11 +0800 Subject: [PATCH 2/4] fix checkstyle. --- .../flink/cdc/runtime/model/TestOpenAIChatModel.java | 10 +++++----- .../cdc/runtime/model/TestOpenAIEmbeddingModel.java | 11 ++++++----- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java b/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java index c5c952abb77..c81aef44198 100644 --- a/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java +++ b/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java @@ -24,7 +24,6 @@ import org.junit.jupiter.api.Test; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; -import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** A test for {@link OpenAIChatModel}. */ @@ -39,10 +38,11 @@ public void testEval() { UserDefinedFunctionContext userDefinedFunctionContext = () -> configuration; openAIChatModel.open(userDefinedFunctionContext); assertThatThrownBy( - () -> { - String response = openAIChatModel.eval("Who invented the electric light?"); - Assertions.assertFalse(response.isEmpty()); - }) + () -> { + String response = + openAIChatModel.eval("Who invented the electric light?"); + Assertions.assertFalse(response.isEmpty()); + }) .satisfies(anyCauseMatches("quota")); } } diff --git a/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java b/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java index 71198960134..a933663c48b 100644 --- a/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java +++ b/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java @@ -40,11 +40,12 @@ public void testEval() { UserDefinedFunctionContext userDefinedFunctionContext = () -> configuration; openAIEmbeddingModel.open(userDefinedFunctionContext); assertThatThrownBy( - () -> { - ArrayData arrayData = - openAIEmbeddingModel.eval("Flink CDC is a streaming data integration tool"); - Assertions.assertNotNull(arrayData); - }) + () -> { + ArrayData arrayData = + openAIEmbeddingModel.eval( + "Flink CDC is a streaming data integration tool"); + Assertions.assertNotNull(arrayData); + }) .satisfies(anyCauseMatches("quota")); } } From c1a4247c471f6c89d3475b1fb3cd34a643d2897d Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Fri, 10 Jan 2025 16:48:40 +0800 Subject: [PATCH 3/4] Address comment. --- .../flink/FlinkPipelineUdfITCase.java | 24 ++++++++----------- .../runtime/model/TestOpenAIChatModel.java | 14 ++++------- .../model/TestOpenAIEmbeddingModel.java | 16 ++++--------- 3 files changed, 19 insertions(+), 35 deletions(-) diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java index b6b8ff563a0..a56d57c5340 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java @@ -40,6 +40,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -54,9 +55,7 @@ import java.util.stream.Stream; import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL; -import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.params.provider.Arguments.arguments; /** Integration test for UDFs. */ @@ -841,6 +840,7 @@ void testComplicatedFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) th @ParameterizedTest @MethodSource("testParams") + @Disabled("For manual test as there is a limit for quota.") void testTransformWithModel(ValuesDataSink.SinkApi sinkApi) throws Exception { FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); @@ -898,18 +898,14 @@ void testTransformWithModel(ValuesDataSink.SinkApi sinkApi) throws Exception { // Execute the pipeline PipelineExecution execution = composer.compose(pipelineDef); - assertThatThrownBy( - () -> { - execution.execute(); - // Check the order and content of all received events - String[] outputEvents = outCaptor.toString().trim().split("\n"); - assertThat(outputEvents) - .contains( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`emb` STRING}, primaryKeys=col1, options=({key1=value1})}") - // The result of transform by model is not fixed. - .hasSize(9); - }) - .satisfies(anyCauseMatches("quota")); + execution.execute(); + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .contains( + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`emb` STRING}, primaryKeys=col1, options=({key1=value1})}") + // The result of transform by model is not fixed. + .hasSize(9); } private static Stream testParams() { diff --git a/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java b/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java index c81aef44198..cf275935d84 100644 --- a/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java +++ b/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java @@ -21,14 +21,13 @@ import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - /** A test for {@link OpenAIChatModel}. */ public class TestOpenAIChatModel { @Test + @Disabled("For manual test as there is a limit for quota.") public void testEval() { OpenAIChatModel openAIChatModel = new OpenAIChatModel(); Configuration configuration = new Configuration(); @@ -37,12 +36,7 @@ public void testEval() { configuration.set(ModelOptions.OPENAI_MODEL_NAME, "gpt-4o-mini"); UserDefinedFunctionContext userDefinedFunctionContext = () -> configuration; openAIChatModel.open(userDefinedFunctionContext); - assertThatThrownBy( - () -> { - String response = - openAIChatModel.eval("Who invented the electric light?"); - Assertions.assertFalse(response.isEmpty()); - }) - .satisfies(anyCauseMatches("quota")); + String response = openAIChatModel.eval("Who invented the electric light?"); + Assertions.assertFalse(response.isEmpty()); } } diff --git a/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java b/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java index a933663c48b..f6003dd0a69 100644 --- a/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java +++ b/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java @@ -22,15 +22,14 @@ import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - /** A test for {@link OpenAIEmbeddingModel}. */ public class TestOpenAIEmbeddingModel { @Test + @Disabled("For manual test as there is a limit for quota.") public void testEval() { OpenAIEmbeddingModel openAIEmbeddingModel = new OpenAIEmbeddingModel(); Configuration configuration = new Configuration(); @@ -39,13 +38,8 @@ public void testEval() { configuration.set(ModelOptions.OPENAI_MODEL_NAME, "text-embedding-3-small"); UserDefinedFunctionContext userDefinedFunctionContext = () -> configuration; openAIEmbeddingModel.open(userDefinedFunctionContext); - assertThatThrownBy( - () -> { - ArrayData arrayData = - openAIEmbeddingModel.eval( - "Flink CDC is a streaming data integration tool"); - Assertions.assertNotNull(arrayData); - }) - .satisfies(anyCauseMatches("quota")); + ArrayData arrayData = + openAIEmbeddingModel.eval("Flink CDC is a streaming data integration tool"); + Assertions.assertNotNull(arrayData); } } From c2b75c57042a9eb7d59fd40c5b01c709b113c14b Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Fri, 10 Jan 2025 17:05:36 +0800 Subject: [PATCH 4/4] Address comment. --- .../apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java index a56d57c5340..f3161105e42 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java @@ -899,6 +899,7 @@ void testTransformWithModel(ValuesDataSink.SinkApi sinkApi) throws Exception { // Execute the pipeline PipelineExecution execution = composer.compose(pipelineDef); execution.execute(); + // Check the order and content of all received events String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents)