Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,27 @@
package com.dtstack.flinkx.launcher;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterClient;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.io.File;
import java.io.FilenameFilter;
import java.lang.reflect.Field;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static com.dtstack.flinkx.launcher.LauncherOptions.OPTION_MODE;

/**
* The Factory of ClusterClient
*
Expand All @@ -54,52 +48,46 @@
*/
public class ClusterClientFactory {

public static ClusterClient createClusterClient(Properties props) {
String clientType = props.getProperty(OPTION_MODE);
if(clientType.equals(ClusterMode.MODE_STANDALONE)) {
return createStandaloneClient(props);
} else if(clientType.equals(ClusterMode.MODE_YARN)) {
return createYarnClient(props);
public static ClusterClient createClusterClient(LauncherOptions launcherOptions) {
String clientType = launcherOptions.getMode();
if(ClusterMode.standalone.name().equals(clientType)) {
return createStandaloneClient(launcherOptions);
} else if(ClusterMode.yarn.name().equals(clientType)) {
return createYarnClient(launcherOptions);
}else if(ClusterMode.yarnPer.name().equals(clientType)){
return createPerYarnClient(launcherOptions);
}
throw new IllegalArgumentException("Unsupported cluster client type: ");
}

public static StandaloneClusterClient createStandaloneClient(Properties props) {
String flinkConfDir = props.getProperty(LauncherOptions.OPTION_FLINK_CONF_DIR);
private static StandaloneClusterClient createStandaloneClient(LauncherOptions launcherOptions) {
String flinkConfDir = launcherOptions.getFlinkconf();
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
StandaloneClusterClient clusterClient = descriptor.retrieve(null);
clusterClient.setDetached(true);
return clusterClient;
}

public static YarnClusterClient createYarnClient(Properties props) {
String flinkConfDir = props.getProperty(LauncherOptions.OPTION_FLINK_CONF_DIR);
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
String yarnConfDir = props.getProperty(LauncherOptions.OPTION_YARN_CONF_DIR);
org.apache.hadoop.conf.Configuration yarnConf = new YarnConfiguration();
private static YarnClusterClient createPerYarnClient(LauncherOptions launcherOptions) {
YarnClusterClient cluster = null;
try {
Configuration flinkConf = FlinkUtil.getFlinkConfiguration(launcherOptions.getFlinkconf());
ClusterSpecification clusterSpecification = FlinkUtil.createDefaultClusterSpecification(flinkConf,launcherOptions.getPriority());
AbstractYarnClusterDescriptor descriptor = FlinkUtil.createPerJobClusterDescriptor(flinkConf,FlinkUtil.getYarnConfiguration(flinkConf,launcherOptions.getYarnconf()),launcherOptions.getFlinkLibJar(),launcherOptions.getQueue());
cluster = descriptor.deploySessionCluster(clusterSpecification);
} catch (Exception e){
throw new RuntimeException("Couldn't deploy Yarn session cluster" + e.getMessage());
}
return cluster;
}

private static YarnClusterClient createYarnClient(LauncherOptions launcherOptions) {
Configuration config = FlinkUtil.getFlinkConfiguration(launcherOptions.getFlinkconf());
String yarnConfDir = launcherOptions.getYarnconf();
if(StringUtils.isNotBlank(yarnConfDir)) {
try {

config.setString(ConfigConstants.PATH_HADOOP_CONFIG, yarnConfDir);
FileSystem.initialize(config);

File dir = new File(yarnConfDir);
if(dir.exists() && dir.isDirectory()) {
File[] xmlFileList = new File(yarnConfDir).listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
if(name.endsWith(".xml"))
return true;
return false;
}
});
if(xmlFileList != null) {
for(File xmlFile : xmlFileList) {
yarnConf.addResource(xmlFile.toURI().toURL());
}
}

org.apache.hadoop.conf.Configuration yarnConf = FlinkUtil.getYarnConfiguration(config,yarnConfDir);
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConf);
yarnClient.start();
Expand Down Expand Up @@ -147,14 +135,11 @@ public boolean accept(File dir, String name) {
YarnClusterClient clusterClient = clusterDescriptor.retrieve(applicationId);
clusterClient.setDetached(true);
return clusterClient;
}
} catch(Exception e) {
throw new RuntimeException(e);
}
}



throw new UnsupportedOperationException("Haven't been developed yet!");
}

Expand All @@ -177,31 +162,4 @@ private static org.apache.hadoop.conf.Configuration haYarnConf(org.apache.hadoop
}
return yarnConf;
}

private static org.apache.hadoop.conf.Configuration getYarnConf(String yarnConfDir) {
org.apache.hadoop.conf.Configuration yarnConf = new YarnConfiguration();
try {

File dir = new File(yarnConfDir);
if(dir.exists() && dir.isDirectory()) {
File[] xmlFileList = new File(yarnConfDir).listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
if(name.endsWith(".xml"))
return true;
return false;
}
});
if(xmlFileList != null) {
for(File xmlFile : xmlFileList) {
yarnConf.addResource(xmlFile.toURI().toURL());
}
}
}
} catch(Exception e) {
throw new RuntimeException(e);
}
return yarnConf;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
* Company: www.dtstack.com
* @author huyifan.zju@163.com
*/
public class ClusterMode {
public enum ClusterMode {

public static final String MODE_LOCAL = "local";
local(0),standalone(1),yarn(2),yarnPer(3);

public static final String MODE_STANDALONE = "standalone";
private int type;

public static final String MODE_YARN = "yarn";
ClusterMode(int type){
this.type = type;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package com.dtstack.flinkx.launcher;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.*;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptorV2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.io.File;
import java.io.FilenameFilter;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
* Created by sishu.yss on 2018/9/26.
*/
public class FlinkUtil {

public static Configuration getFlinkConfiguration(String flinkConfDir) {
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
return config;
}


public static org.apache.hadoop.conf.Configuration getYarnConfiguration(Configuration config, String yarnConfDir) throws Exception {
org.apache.hadoop.conf.Configuration yarnConf = new YarnConfiguration();
config.setString(ConfigConstants.PATH_HADOOP_CONFIG, yarnConfDir);
FileSystem.initialize(config);
File dir = new File(yarnConfDir);
if (dir.exists() && dir.isDirectory()) {
File[] xmlFileList = new File(yarnConfDir).listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
if (name.endsWith(".xml")) {
return true;
}
return false;
}
});
if (xmlFileList != null) {
for (File xmlFile : xmlFileList) {
yarnConf.addResource(xmlFile.toURI().toURL());
}
}
}
return yarnConf;
}

public static ClusterSpecification createDefaultClusterSpecification(Configuration configuration,int priority) {
final int numberTaskManagers = 1;

// JobManager Memory
final int jobManagerMemoryMB = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);

// Task Managers memory
final int taskManagerMemoryMB = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);

int slotsPerTaskManager = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);

return new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMB)
.setTaskManagerMemoryMB(taskManagerMemoryMB)
.setNumberTaskManagers(numberTaskManagers)
.setSlotsPerTaskManager(slotsPerTaskManager)
.setPriority(priority)
.createClusterSpecification();
}

public static AbstractYarnClusterDescriptor createPerJobClusterDescriptor(Configuration flinkConfiguration, org.apache.hadoop.conf.Configuration yarnConf,String flinkJarPath,String queue) throws Exception {
Configuration newConf = new Configuration(flinkConfiguration);
newConf.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
AbstractYarnClusterDescriptor clusterDescriptor = getClusterDescriptor(newConf,yarnConf, false);
List<URL> classpaths = new ArrayList<URL>();
if (StringUtils.isNotBlank(flinkJarPath)) {
File[] jars = new File(flinkJarPath).listFiles();
for (File file : jars){
if (file.toURI().toURL().toString().contains("flink-dist")){
clusterDescriptor.setLocalJarPath(new Path(file.toURI().toURL().toString()));
} else {
classpaths.add(file.toURI().toURL());
}
}
} else {
throw new RuntimeException("The Flink jar path is null");
}
clusterDescriptor.setProvidedUserJarFiles(classpaths);
if(StringUtils.isNotBlank(queue)){
clusterDescriptor.setQueue(queue);
}
return clusterDescriptor;
}

private static AbstractYarnClusterDescriptor getClusterDescriptor(Configuration flinkConfiguration, org.apache.hadoop.conf.Configuration yarnConf, boolean flip6) throws NoSuchFieldException, IllegalAccessException {
AbstractYarnClusterDescriptor clusterDescriptor;
if (flip6) {
clusterDescriptor = new YarnClusterDescriptorV2(
flinkConfiguration,
".");
} else {
clusterDescriptor = new YarnClusterDescriptor(
flinkConfiguration,
".");
}
Field confField = AbstractYarnClusterDescriptor.class.getDeclaredField("conf");
confField.setAccessible(true);
confField.set(clusterDescriptor, yarnConf);
return clusterDescriptor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import static com.dtstack.flinkx.launcher.ClusterMode.MODE_LOCAL;
import static com.dtstack.flinkx.launcher.LauncherOptions.*;

/**
* FlinkX commandline Launcher
Expand All @@ -41,14 +38,14 @@
*/
public class Launcher {

private static List<String> initFlinkxArgList(Properties props) {
private static List<String> initFlinkxArgList(LauncherOptions launcherOptions) {
List<String> argList = new ArrayList<>();
argList.add("-job");
argList.add((String) props.get(OPTION_JOB));
argList.add(launcherOptions.getJob());
argList.add("-jobid");
argList.add((String) props.get(OPTION_JOB_ID));
argList.add(launcherOptions.getJobid());
argList.add("-pluginRoot");
argList.add((String) props.get(OPTION_PLUGIN_ROOT));
argList.add(launcherOptions.getPlugin());
return argList;
}

Expand Down Expand Up @@ -80,28 +77,25 @@ private static List<URL> analyzeUserClasspath(String content, String pluginRoot)


public static void main(String[] args) throws Exception {
Properties properties = new LauncherOptionParser(args).getProperties();
String mode = (String) properties.get(OPTION_MODE);
List<String> argList = initFlinkxArgList(properties);

if(mode.equals(MODE_LOCAL)) {
LauncherOptions launcherOptions = new LauncherOptionParser(args).getLauncherOptions();
String mode = launcherOptions.getMode();
List<String> argList = initFlinkxArgList(launcherOptions);
if(mode.equals(ClusterMode.local.name())) {
String[] localArgs = argList.toArray(new String[argList.size()]);
com.dtstack.flinkx.Main.main(localArgs);
} else {
ClusterClient clusterClient = ClusterClientFactory.createClusterClient(properties);
ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions);
String monitor = clusterClient.getWebInterfaceURL();
argList.add("-monitor");
argList.add(monitor);
String pluginRoot = properties.getProperty(OPTION_PLUGIN_ROOT);
String content = properties.getProperty(OPTION_JOB);
String pluginRoot = launcherOptions.getPlugin();
String content = launcherOptions.getJob();
File jarFile = new File(pluginRoot + File.separator + "flinkx.jar");
List<URL> urlList = analyzeUserClasspath(content, pluginRoot);
String[] remoteArgs = argList.toArray(new String[argList.size()]);
PackagedProgram program = new PackagedProgram(jarFile, urlList, remoteArgs);
clusterClient.run(program, 1);
clusterClient.run(program, launcherOptions.getParallelism());
clusterClient.shutdown();
}

}

}
Loading