Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions bin/interpreter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# limitations under the License.
#


bin=$(dirname "${BASH_SOURCE-$0}")
bin=$(cd "${bin}">/dev/null; pwd)

Expand Down Expand Up @@ -50,11 +51,6 @@ while getopts "hc:p:r:d:l:v:u:g:" o; do
;;
u)
ZEPPELIN_IMPERSONATE_USER="${OPTARG}"
if [[ -z "$ZEPPELIN_IMPERSONATE_CMD" ]]; then
ZEPPELIN_IMPERSONATE_RUN_CMD=`echo "ssh ${ZEPPELIN_IMPERSONATE_USER}@localhost" `
else
ZEPPELIN_IMPERSONATE_RUN_CMD=$(eval "echo ${ZEPPELIN_IMPERSONATE_CMD} ")
fi
;;
g)
INTERPRETER_SETTING_NAME=${OPTARG}
Expand Down Expand Up @@ -96,6 +92,15 @@ INTERPRETER_ID=$(basename "${INTERPRETER_DIR}")
ZEPPELIN_PID="${ZEPPELIN_PID_DIR}/zeppelin-interpreter-${INTERPRETER_ID}-${ZEPPELIN_IDENT_STRING}-${HOSTNAME}.pid"
ZEPPELIN_LOGFILE="${ZEPPELIN_LOG_DIR}/zeppelin-interpreter-${INTERPRETER_SETTING_NAME}-"

if [[ -z "$ZEPPELIN_IMPERSONATE_CMD" ]]; then
if [[ "${INTERPRETER_ID}" != "spark" || "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" == "false" ]]; then
ZEPPELIN_IMPERSONATE_RUN_CMD=`echo "ssh ${ZEPPELIN_IMPERSONATE_USER}@localhost" `
fi
else
ZEPPELIN_IMPERSONATE_RUN_CMD=$(eval "echo ${ZEPPELIN_IMPERSONATE_CMD} ")
fi


if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]]; then
ZEPPELIN_LOGFILE+="${ZEPPELIN_IMPERSONATE_USER}-"
fi
Expand Down Expand Up @@ -195,7 +200,7 @@ fi

addJarInDirForIntp "${LOCAL_INTERPRETER_REPO}"

if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]]; then
if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" && "${INTERPRETER_ID}" != "spark" ]]; then
suid="$(id -u ${ZEPPELIN_IMPERSONATE_USER})"
if [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then
INTERPRETER_RUN_COMMAND=${ZEPPELIN_IMPERSONATE_RUN_CMD}" '"
Expand All @@ -206,15 +211,12 @@ if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]]; then
fi

if [[ -n "${SPARK_SUBMIT}" ]]; then
if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" != "false" ]]; then
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By removing this ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER you disable the feature that was introduced with #1840 (# export ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER #Optional, by default is true; can be set to false if you don't want to use --proxy-user option with Spark interpreter when impersonation enabled), is that intended? If so, we will need to update the doc.

INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}`
else
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}`
fi
INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}`
else
INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} ${INTP_PORT}`
fi


if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then
INTERPRETER_RUN_COMMAND+="'"
fi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public interface InterpreterClient {

String getInterpreterSettingName();

void start(String userName, Boolean isUserImpersonate);
void start(String userName);

void stop();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class InterpreterLaunchContext {
private Properties properties;
private InterpreterOption option;
private InterpreterRunner runner;
private String userName;
private String interpreterGroupId;
private String interpreterSettingId;
private String interpreterSettingGroup;
Expand All @@ -38,13 +39,15 @@ public class InterpreterLaunchContext {
public InterpreterLaunchContext(Properties properties,
InterpreterOption option,
InterpreterRunner runner,
String userName,
String interpreterGroupId,
String interpreterSettingId,
String interpreterSettingGroup,
String interpreterSettingName) {
this.properties = properties;
this.option = option;
this.runner = runner;
this.userName = userName;
this.interpreterGroupId = interpreterGroupId;
this.interpreterSettingId = interpreterSettingId;
this.interpreterSettingGroup = interpreterSettingGroup;
Expand Down Expand Up @@ -78,4 +81,8 @@ public String getInterpreterSettingGroup() {
public String getInterpreterSettingName() {
return interpreterSettingName;
}

public String getUserName() {
return userName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -699,13 +699,14 @@ List<Interpreter> createInterpreters(String user, String interpreterGroupId, Str
}

synchronized RemoteInterpreterProcess createInterpreterProcess(String interpreterGroupId,
String userName,
Properties properties)
throws IOException {
if (launcher == null) {
createLauncher();
}
InterpreterLaunchContext launchContext = new
InterpreterLaunchContext(properties, option, interpreterRunner,
InterpreterLaunchContext(properties, option, interpreterRunner, userName,
interpreterGroupId, id, group, name);
RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext);
process.setRemoteInterpreterEventPoller(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess(Strin
throws IOException {
if (remoteInterpreterProcess == null) {
LOGGER.info("Create InterpreterProcess for InterpreterGroup: " + getId());
remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(id, properties);
remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(id, userName,
properties);
synchronized (remoteInterpreterProcess) {
if (!remoteInterpreterProcess.isRunning()) {
remoteInterpreterProcess.start(userName, false);
remoteInterpreterProcess.start(userName);
remoteInterpreterProcess.getRemoteInterpreterEventPoller()
.setInterpreterProcess(remoteInterpreterProcess);
remoteInterpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep
runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
zConf.getCallbackPortRange(), zConf.getInterpreterPortRange(),
zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
buildEnvFromProperties(), connectTimeout, name);
buildEnvFromProperties(context), connectTimeout, name, option.isUserImpersonate());
}
}

protected Map<String, String> buildEnvFromProperties() {
protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
Map<String, String> env = new HashMap<>();
for (Object key : properties.keySet()) {
for (Object key : context.getProperties().keySet()) {
if (RemoteInterpreterUtils.isEnvString((String) key)) {
env.put((String) key, properties.getProperty((String) key));
env.put((String) key, context.getProperties().getProperty((String) key));
}
}
return env;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage rec
}

@Override
protected Map<String, String> buildEnvFromProperties() {
protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
Map<String, String> env = new HashMap<String, String>();
Properties sparkProperties = new Properties();
String sparkMaster = getSparkMaster(properties);
Expand Down Expand Up @@ -70,6 +70,11 @@ protected Map<String, String> buildEnvFromProperties() {
for (String name : sparkProperties.stringPropertyNames()) {
sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
}
String useProxyUserEnv = System.getenv("ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER");
if (context.getOption().isUserImpersonate() && (StringUtils.isBlank(useProxyUserEnv) ||
!useProxyUserEnv.equals("false"))) {
sparkConfBuilder.append(" --proxy-user " + context.getUserName());
}

env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());

Expand Down Expand Up @@ -194,12 +199,12 @@ private boolean isYarnMode() {
}

private String toShellFormat(String value) {
if (value.contains("\'") && value.contains("\"")) {
if (value.contains("'") && value.contains("\"")) {
throw new RuntimeException("Spark property value could not contain both \" and '");
} else if (value.contains("\'")) {
} else if (value.contains("'")) {
return "\"" + value + "\"";
} else {
return "\'" + value + "\'";
return "'" + value + "'";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
private final String interpreterDir;
private final String localRepoDir;
private final String interpreterSettingName;
private final boolean isUserImpersonated;

private Map<String, String> env;

Expand All @@ -74,7 +75,8 @@ public RemoteInterpreterManagedProcess(
String localRepoDir,
Map<String, String> env,
int connectTimeout,
String interpreterSettingName) {
String interpreterSettingName,
boolean isUserImpersonated) {
super(connectTimeout);
this.interpreterRunner = intpRunner;
this.callbackPortRange = callbackPortRange;
Expand All @@ -83,6 +85,7 @@ public RemoteInterpreterManagedProcess(
this.interpreterDir = intpDir;
this.localRepoDir = localRepoDir;
this.interpreterSettingName = interpreterSettingName;
this.isUserImpersonated = isUserImpersonated;
}

@Override
Expand All @@ -96,7 +99,7 @@ public int getPort() {
}

@Override
public void start(String userName, Boolean isUserImpersonate) {
public void start(String userName) {
// start server process
final String callbackHost;
final int callbackPort;
Expand Down Expand Up @@ -161,7 +164,7 @@ public void run() {
cmdLine.addArgument(Integer.toString(callbackPort), false);
cmdLine.addArgument("-r", false);
cmdLine.addArgument(interpreterPortRange, false);
if (isUserImpersonate && !userName.equals("anonymous")) {
if (isUserImpersonated && !userName.equals("anonymous")) {
cmdLine.addArgument("-u", false);
cmdLine.addArgument(userName, false);
}
Expand Down Expand Up @@ -272,6 +275,11 @@ public String getInterpreterRunner() {
return interpreterRunner;
}

@VisibleForTesting
public boolean isUserImpersonated() {
return isUserImpersonated;
}

public boolean isRunning() {
return running.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public String getInterpreterSettingName() {
}

@Override
public void start(String userName, Boolean isUserImpersonate) {
public void start(String userName) {
// assume process is externally managed. nothing to do
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public void testLauncher() throws IOException {
properties.setProperty("ENV_1", "VALUE_1");
properties.setProperty("property_1", "value_1");
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "groupName", "name");
option.setUserImpersonate(true);
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
Expand All @@ -48,6 +49,7 @@ public void testLauncher() throws IOException {
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertEquals(1, interpreterProcess.getEnv().size());
assertEquals("VALUE_1", interpreterProcess.getEnv().get("ENV_1"));
assertEquals(true, interpreterProcess.isUserImpersonated());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testLocalMode() throws IOException {
properties.setProperty("spark.jars", "jar_1");

InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
Expand All @@ -67,7 +67,7 @@ public void testYarnClientMode_1() throws IOException {
properties.setProperty("spark.jars", "jar_1");

InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
Expand All @@ -93,7 +93,7 @@ public void testYarnClientMode_2() throws IOException {
properties.setProperty("spark.jars", "jar_1");

InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
Expand All @@ -118,7 +118,7 @@ public void testYarnClusterMode_1() throws IOException {
properties.setProperty("spark.jars", "jar_1");

InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
Expand All @@ -145,7 +145,8 @@ public void testYarnClusterMode_2() throws IOException {
properties.setProperty("spark.jars", "jar_1");

InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
option.setUserImpersonate(true);
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
Expand All @@ -156,6 +157,6 @@ public void testYarnClusterMode_2() throws IOException {
assertEquals(3, interpreterProcess.getEnv().size());
assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --proxy-user user1", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
}