From 0cfd59ffd08a87e782328fd3c2b292dbd82c1f92 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Thu, 2 May 2024 14:51:44 +0800 Subject: [PATCH 1/3] [FLINK-35281][hotfix][cdc-common] FlinkEnvironmentUtils#addJar add each jar only once --- .../composer/flink/FlinkEnvironmentUtils.java | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java index 1f1034c09b4..9b0df628950 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java @@ -22,29 +22,48 @@ import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.lang.reflect.Field; import java.net.URL; import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; /** Utilities for {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}. */ @Internal public class FlinkEnvironmentUtils { + private static final Logger LOG = LoggerFactory.getLogger(FlinkEnvironmentUtils.class); /** * Add the specified JAR to {@link StreamExecutionEnvironment} so that the JAR will be uploaded * together with the job graph. */ public static void addJar(StreamExecutionEnvironment env, URL jarUrl) { + addJar(env, Lists.newArrayList(jarUrl)); + } + + /** + * Add the specified JARs to {@link StreamExecutionEnvironment} so that the JAR will be uploaded + * together with the job graph. 
+ */ + public static void addJar(StreamExecutionEnvironment env, List<URL> jarUrls) { try { Class<StreamExecutionEnvironment> envClass = StreamExecutionEnvironment.class; Field field = envClass.getDeclaredField("configuration"); field.setAccessible(true); Configuration configuration = ((Configuration) field.get(env)); - List<String> jars = - configuration.getOptional(PipelineOptions.JARS).orElse(new ArrayList<>()); - jars.add(jarUrl.toString()); - configuration.set(PipelineOptions.JARS, jars); + Set<String> jars = + configuration.getOptional(PipelineOptions.JARS).orElse(new ArrayList<>()) + .stream() + .collect(Collectors.toSet()); + jars.addAll(jarUrls.stream().map(URL::toString).collect(Collectors.toList())); + LOG.info("add jar path is: " + String.join(",", jars)); + configuration.set(PipelineOptions.JARS, jars.stream().collect(Collectors.toList())); } catch (Exception e) { throw new RuntimeException("Failed to add JAR to Flink execution environment", e); } From 03f797e3885f38ca05f6f6c223cb8a067348c3fd Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Thu, 9 May 2024 14:16:17 +0800 Subject: [PATCH 2/3] adjust based on pr --- .../composer/flink/FlinkEnvironmentUtils.java | 25 ++++----- .../flink/FlinkEnvironmentUtilsTest.java | 53 +++++++++++++++++++ 2 files changed, 66 insertions(+), 12 deletions(-) create mode 100644 flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtilsTest.java diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java index 9b0df628950..5fa70552b79 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java @@ -22,17 +22,17 @@ import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.Field; import java.net.URL; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; -import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; /** Utilities for {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}. */ @Internal @@ -44,26 +44,27 @@ public class FlinkEnvironmentUtils { * together with the job graph. */ public static void addJar(StreamExecutionEnvironment env, URL jarUrl) { - addJar(env, Lists.newArrayList(jarUrl)); + addJar(env, Collections.singletonList(jarUrl)); } /** * Add the specified JARs to {@link StreamExecutionEnvironment} so that the JAR will be uploaded * together with the job graph. */ - public static void addJar(StreamExecutionEnvironment env, List<URL> jarUrls) { + public static void addJar(StreamExecutionEnvironment env, Collection<URL> jarUrls) { try { Class<StreamExecutionEnvironment> envClass = StreamExecutionEnvironment.class; Field field = envClass.getDeclaredField("configuration"); field.setAccessible(true); Configuration configuration = ((Configuration) field.get(env)); - Set<String> jars = - configuration.getOptional(PipelineOptions.JARS).orElse(new ArrayList<>()) - .stream() - .collect(Collectors.toSet()); - jars.addAll(jarUrls.stream().map(URL::toString).collect(Collectors.toList())); - LOG.info("add jar path is: " + String.join(",", jars)); - configuration.set(PipelineOptions.JARS, jars.stream().collect(Collectors.toList())); + List<String> previousJars = + configuration.getOptional(PipelineOptions.JARS).orElse(new ArrayList<>()); + List<String> currentJars = + Stream.concat(previousJars.stream(), jarUrls.stream().map(URL::toString)) + .distinct() + .collect(Collectors.toList()); + LOG.info("pipeline.jars is " + String.join(",", currentJars)); + configuration.set(PipelineOptions.JARS, currentJars); } catch (Exception e)
{ throw new RuntimeException("Failed to add JAR to Flink execution environment", e); } diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtilsTest.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtilsTest.java new file mode 100644 index 00000000000..70bf2fb3ef5 --- /dev/null +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtilsTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.composer.flink; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; + +import org.junit.Assert; +import org.junit.Test; + +import java.net.URL; +import java.util.Collections; +import java.util.List; + +/** Test for {@link FlinkEnvironmentUtils}. 
*/ +public class FlinkEnvironmentUtilsTest { + + @Test + public void testAddJars() throws Exception { + Configuration configuration = new Configuration(); + configuration.set(PipelineOptions.JARS, Collections.EMPTY_LIST); + StreamExecutionEnvironment env = + StreamExecutionEnvironment.createLocalEnvironment(configuration); + + FlinkEnvironmentUtils.addJar( + env, Lists.newArrayList(new URL("file://a.jar"), new URL("file://a.jar"))); + List<String> expectedJars = Lists.newArrayList("file://a.jar"); + Assert.assertEquals(expectedJars, env.getConfiguration().get(PipelineOptions.JARS)); + + FlinkEnvironmentUtils.addJar( + env, Lists.newArrayList(new URL("file://b.jar"), new URL("file://a.jar"))); + expectedJars.add("file://b.jar"); + Assert.assertEquals(expectedJars, env.getConfiguration().get(PipelineOptions.JARS)); + } +} From d0685ee1efeb84c2abd64a25bf00400c0ad8ace9 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Thu, 27 Jun 2024 20:11:49 +0800 Subject: [PATCH 3/3] add private constructor to FlinkEnvironmentUtils --- .../apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java index 5fa70552b79..b00717cb0a8 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkEnvironmentUtils.java @@ -39,6 +39,8 @@ public class FlinkEnvironmentUtils { private static final Logger LOG = LoggerFactory.getLogger(FlinkEnvironmentUtils.class); + private FlinkEnvironmentUtils() {} + /** * Add the specified JAR to {@link StreamExecutionEnvironment} so that the JAR will be uploaded * together with the job graph.