From b3276b6bcc059e6e9465e967d9cc3f539c039106 Mon Sep 17 00:00:00 2001 From: daemin Date: Tue, 2 Jul 2019 01:09:32 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BB=A5=E5=91=BD=E4=BB=A4=E8=A1=8C?= =?UTF-8?q?=E5=8F=82=E6=95=B0=E7=9A=84=E6=96=B9=E5=BC=8F=E6=8C=87=E5=AE=9A?= =?UTF-8?q?detached?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dtstack/flinkx/launcher/ClusterClientFactory.java | 4 ++-- .../dtstack/flinkx/launcher/LauncherOptionParser.java | 6 ++++++ .../com/dtstack/flinkx/launcher/LauncherOptions.java | 10 ++++++++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java b/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java index c368218e89..739394882b 100644 --- a/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java +++ b/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java @@ -75,7 +75,7 @@ public static ClusterClient createStandaloneClient(LauncherOptions launcherOptio InetSocketAddress address = AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress()); config.setString(JobManagerOptions.ADDRESS, address.getAddress().getHostName()); config.setInteger(JobManagerOptions.PORT, address.getPort()); - clusterClient.setDetached(true); + clusterClient.setDetached(launcherOptions.getDetached()); return clusterClient; } @@ -128,7 +128,7 @@ public static ClusterClient createYarnClient(LauncherOptions launcherOptions) { AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(config, yarnConf, ".", yarnClient, false); ClusterClient clusterClient = clusterDescriptor.retrieve(applicationId); - clusterClient.setDetached(true); + clusterClient.setDetached(launcherOptions.getDetached()); return clusterClient; } catch(Exception e) { throw new RuntimeException(e); diff --git a/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/LauncherOptionParser.java b/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/LauncherOptionParser.java index 6bbe2ae9dd..70e211bdaf 100644 --- a/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/LauncherOptionParser.java +++ b/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/LauncherOptionParser.java @@ -52,6 +52,8 @@ public class LauncherOptionParser { public static final String OPTION_FLINK_LIB_JAR = "flinkLibJar"; + public static final String OPTION_DETACHED = "detached"; + private Options options = new Options(); private BasicParser parser = new BasicParser(); @@ -67,6 +69,7 @@ public LauncherOptionParser(String[] args) { options.addOption(OPTION_QUEUE, true, "yarn job queue"); options.addOption(OPTION_FLINK_CONF_PROP, true, "flink perjob conf prop"); options.addOption(OPTION_FLINK_LIB_JAR, true, "flink lib jar"); + options.addOption(OPTION_DETACHED, false, "detached mode"); try { CommandLine cl = parser.parse(options, args); @@ -111,6 +114,9 @@ public LauncherOptionParser(String[] args) { launcherOptions.setConfProp(confProp); } + if (cl.hasOption(OPTION_DETACHED)) { + launcherOptions.setDetached(true); + } } catch (Exception e) { printUsage(); diff --git a/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/LauncherOptions.java b/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/LauncherOptions.java index 854794df0d..8d33254c42 100644 --- a/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/LauncherOptions.java +++ b/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/LauncherOptions.java @@ -48,6 +48,8 @@ public class LauncherOptions { private String confProp; + private boolean detached; + public int getParallelism() { return parallelism; } @@ -135,4 +137,12 @@ public String getConfProp() { public void setConfProp(String confProp) { this.confProp = confProp; } + + public boolean getDetached() { + return detached; + } + + public void setDetached(boolean detached) { + this.detached = detached; + } } From 1dfc70cccfa3a1584c9ddf70415b792124205bf3 Mon Sep 17 00:00:00 2001 From: daemin Date: Sat, 6 Jul 2019 12:16:33 +0800 Subject: [PATCH 2/2] fix --- flinkx-core/pom.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flinkx-core/pom.xml b/flinkx-core/pom.xml index 0babfab2a5..26d3863d5f 100644 --- a/flinkx-core/pom.xml +++ b/flinkx-core/pom.xml @@ -131,6 +131,8 @@ com.google.guava:* + ch.qos.logback:* + org.slf4j:*