Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
from resource_management.libraries.functions import StackFeature
from resource_management.libraries.functions import get_kinit_path
from resource_management.libraries.functions import stack_select
from resource_management.libraries.functions.check_process_status import check_process_status
from resource_management.libraries.functions.check_process_status import \
check_process_status
from resource_management.libraries.functions.default import default
from resource_management.libraries.functions.format import format
from resource_management.libraries.functions.stack_features import check_stack_feature
from resource_management.libraries.functions.stack_features import \
check_stack_feature
from resource_management.libraries.functions.version import format_stack_version
from resource_management.libraries.script.script import Script

Expand Down Expand Up @@ -362,22 +364,27 @@ def is_file_exists_in_HDFS(self, path, as_user):

return False

def get_interpreter_settings(self):
import params
import json

interpreter_config = os.path.join(params.conf_dir, "interpreter.json")
def copy_interpreter_from_HDFS_to_FS(self, params):
if params.conf_stored_in_hdfs:
zeppelin_conf_fs = self.get_zeppelin_conf_FS(params)

if self.is_file_exists_in_HDFS(zeppelin_conf_fs, params.zeppelin_user):
# copy from hdfs to /etc/zeppelin/conf/interpreter.json
kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths',None))
kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
kinit_if_needed = format("{kinit_path_local} -kt {zeppelin_kerberos_keytab} {zeppelin_kerberos_principal};")
interpreter_config = os.path.join(params.conf_dir, "interpreter.json")
shell.call(format("rm {interpreter_config};"
"{kinit_if_needed} hdfs --config {hadoop_conf_dir} dfs -get {zeppelin_conf_fs} {interpreter_config}"),
user=params.zeppelin_user)
"{kinit_if_needed} hdfs --config {hadoop_conf_dir} dfs -get {zeppelin_conf_fs} {interpreter_config}"),
user=params.zeppelin_user)
return True
return False

def get_interpreter_settings(self):
import params
import json

self.copy_interpreter_from_HDFS_to_FS(params)
interpreter_config = os.path.join(params.conf_dir, "interpreter.json")
config_content = sudo.read_file(interpreter_config)
config_data = json.loads(config_content)
return config_data
Expand Down Expand Up @@ -592,22 +599,23 @@ def create_interpreter_json(self):
import interpreter_json_template
import params

interpreter_json = interpreter_json_template.template
File(format("{params.conf_dir}/interpreter.json"),
content=interpreter_json,
owner=params.zeppelin_user,
group=params.zeppelin_group,
mode=0664)

if params.conf_stored_in_hdfs:
params.HdfsResource(self.get_zeppelin_conf_FS(params),
type="file",
action="create_on_execute",
source=format("{params.conf_dir}/interpreter.json"),
owner=params.zeppelin_user,
recursive_chown=True,
recursive_chmod=True,
replace_existing_files=True)
if not self.copy_interpreter_from_HDFS_to_FS(params):
interpreter_json = interpreter_json_template.template
File(format("{params.conf_dir}/interpreter.json"),
content=interpreter_json,
owner=params.zeppelin_user,
group=params.zeppelin_group,
mode=0664)

if params.conf_stored_in_hdfs:
params.HdfsResource(self.get_zeppelin_conf_FS(params),
type="file",
action="create_on_execute",
source=format("{params.conf_dir}/interpreter.json"),
owner=params.zeppelin_user,
recursive_chown=True,
recursive_chmod=True,
replace_existing_files=True)

def get_zeppelin_spark_dependencies(self):
import params
Expand Down