From 319afdcd4361f0ef88074df4232960b8904d8a86 Mon Sep 17 00:00:00 2001 From: Rui Fang Date: Thu, 1 Apr 2021 12:24:57 +0800 Subject: [PATCH 1/2] Revert "Update based on review comments" This reverts commit 04f7ef57c1de9ce4f313330059ed4f89ca26e258. --- .../spark/run/SparkBatchJobRunner.java | 14 +++++++------- .../azure/hdinsight/common/WasbUri.java | 2 +- .../spark/common/SparkSubmissionParameter.java | 18 ++++++++++++++---- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobRunner.java b/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobRunner.java index 21978fd26e6..df41eaeb731 100644 --- a/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobRunner.java +++ b/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobRunner.java @@ -75,11 +75,11 @@ protected SparkSubmissionParameter updateStorageConfigForSubmissionParameter(Spa // be of URI schema which starts with "https://". Then job submission will fail with error like // "Server returned HTTP response code: 401 for URL: https://accountName.dfs.core.windows.net/fs0/Reference.jar" // Therefore, we need to transform the Gen2 "https" URI to "abfs" url to avoid the error. - final SparkSubmissionParameter submissionParameter = submitModel.getSubmissionParameter(); - submissionParameter.setReferencedJars(submissionParameter.getReferencedJars().stream() + final SparkSubmissionParameter newParameter = SparkSubmissionParameter.copyOf(submitModel.getSubmissionParameter()); + newParameter.setReferencedJars(newParameter.getReferencedJars().stream() .map(this::transformToGen2Uri) .collect(Collectors.toList())); - submissionParameter.setReferencedFiles(submissionParameter.getReferencedFiles().stream() + newParameter.setReferencedFiles(newParameter.getReferencedFiles().stream() .map(this::transformToGen2Uri) .collect(Collectors.toList())); @@ -88,12 +88,12 @@ protected SparkSubmissionParameter updateStorageConfigForSubmissionParameter(Spa try { final WasbUri fsRoot = WasbUri.parse(submitModel.getJobUploadStorageModel().getUploadPath()); final String storageKey = submitModel.getJobUploadStorageModel().getStorageKey(); - final Object existingConfigEntry = submissionParameter.getJobConfig().get(SparkSubmissionParameter.Conf); + final Object existingConfigEntry = newParameter.getJobConfig().get(SparkSubmissionParameter.Conf); final SparkConfigures wrappedConfig = existingConfigEntry instanceof Map ? new SparkConfigures(existingConfigEntry) : new SparkConfigures(); - wrappedConfig.put("spark.hadoop." + fsRoot.getHadoopBlobFsPropertyKey(), storageKey); - submissionParameter.getJobConfig().put(SparkSubmissionParameter.Conf, wrappedConfig); + wrappedConfig.put(fsRoot.getHadoopBlobFsPropertyKey(), storageKey); + newParameter.updateJobConfig(SparkSubmissionParameter.Conf, wrappedConfig); } catch (final UnknownFormatConversionException error) { final String errorHint = "Azure blob storage uploading path is not in correct format"; log().warn(String.format("%s. Uploading Path: %s. Error message: %s. Stacktrace:\n%s", @@ -108,7 +108,7 @@ protected SparkSubmissionParameter updateStorageConfigForSubmissionParameter(Spa } } - return submissionParameter; + return newParameter; } @Override diff --git a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/common/WasbUri.java b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/common/WasbUri.java index 9ebe0a398ea..d2d025a37fc 100644 --- a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/common/WasbUri.java +++ b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/common/WasbUri.java @@ -73,7 +73,7 @@ public int hashCode() { } public String getHadoopBlobFsPropertyKey() { - return String.format("fs.azure.account.key.%s.blob.%s", getStorageAccount(), getEndpointSuffix()); + return String.format("spark.hadoop.fs.azure.account.key.%s.blob.%s", getStorageAccount(), getEndpointSuffix()); } public static WasbUri parse(final String blobUri) { diff --git a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkSubmissionParameter.java b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkSubmissionParameter.java index 8b8ca8e207a..833716c20ff 100644 --- a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkSubmissionParameter.java +++ b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkSubmissionParameter.java @@ -8,6 +8,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.microsoft.azure.hdinsight.sdk.rest.IConvertible; import com.microsoft.azuretools.azurecommons.helpers.NotNull; import com.microsoft.azuretools.azurecommons.helpers.Nullable; @@ -104,10 +106,10 @@ public static SparkSubmissionParameter copyOf(SparkSubmissionParameter parameter parameter.getLocalArtifactPath(), parameter.getFile(), parameter.getMainClassName(), - new ArrayList<>(parameter.getReferencedFiles()), - new ArrayList<>(parameter.getReferencedJars()), - new ArrayList<>(parameter.getArgs()), - new HashMap<>(parameter.getJobConfig())); + ImmutableList.copyOf(parameter.getReferencedFiles()), + ImmutableList.copyOf(parameter.getReferencedJars()), + ImmutableList.copyOf(parameter.getArgs()), + ImmutableMap.copyOf(parameter.getJobConfig())); copiedParameter.setName(parameter.getName()); return copiedParameter; } @@ -308,6 +310,14 @@ public static List checkJobConfigMap(Map updateJobConfig(String key, Object value) { + Map clonedJobConfig = new HashMap<>(this.jobConfig); + clonedJobConfig.put(key, value); + this.jobConfig = clonedJobConfig; + return this.jobConfig; + } + @NotNull public List> flatJobConfig() { List> flattedConfigs = new ArrayList<>(); From 12986e820f10daa050fd17f005612b38c2734e6d Mon Sep 17 00:00:00 2001 From: Rui Fang Date: Thu, 1 Apr 2021 12:24:57 +0800 Subject: [PATCH 2/2] Revert "Fix linked non-cluster-default blob storage not working for HDI cluster issue" This reverts commit 03717c54512054bbc3d7268124a2703d8d0ce1dd. --- .../spark/run/ArcadiaSparkBatchRunner.kt | 19 +++++++- .../spark/run/ArisSparkBatchRunner.kt | 3 +- .../run/CosmosServerlessSparkBatchRunner.kt | 6 +-- .../spark/run/CosmosSparkBatchRunner.kt | 5 ++- .../spark/run/SparkBatchJobRunner.java | 43 ++++--------------- .../azure/hdinsight/common/WasbUri.java | 2 +- .../common/SparkSubmissionParameter.java | 8 ---- 7 files changed, 36 insertions(+), 50 deletions(-) diff --git a/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/ArcadiaSparkBatchRunner.kt b/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/ArcadiaSparkBatchRunner.kt index eb953dc0276..88dbb5f672d 100644 --- a/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/ArcadiaSparkBatchRunner.kt +++ b/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/ArcadiaSparkBatchRunner.kt @@ -66,10 +66,27 @@ class ArcadiaSparkBatchRunner : SparkBatchJobRunner() { compute.workSpace.webUrl ) + if (submitModel.jobUploadStorageModel.storageAccountType == SparkSubmitStorageType.BLOB) { + val fsRoot = WasbUri.parse(arcadiaModel.jobUploadStorageModel.uploadPath + ?: throw ExecutionException("No uploading path set in Run Configuration")) + val storageKey = arcadiaModel.jobUploadStorageModel.storageKey + val configEntry = submitModel.submissionParameter.jobConfig[SparkSubmissionParameter.Conf] + val wrappedConfig = if (configEntry != null && configEntry is java.util.Map<*, *>) { + SparkConfigures(configEntry) + } else { + SparkConfigures() + } + submitModel.submissionParameter.jobConfig[SparkSubmissionParameter.Conf] = + wrappedConfig.apply { + put("spark.hadoop.fs.azure.account.key.${fsRoot.storageAccount}.blob.core.windows.net", + storageKey) + } + } + val jobDeploy = SparkBatchJobDeployFactory.getInstance().buildSparkBatchJobDeploy(submitModel, compute) ArcadiaSparkBatchJob( - updateStorageConfigForSubmissionParameter(submitModel), + prepareSubmissionParameterWithTransformedGen2Uri(submitModel.submissionParameter), submission, jobDeploy) }} diff --git a/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/ArisSparkBatchRunner.kt b/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/ArisSparkBatchRunner.kt index d9655dd5663..6814da8fe5a 100644 --- a/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/ArisSparkBatchRunner.kt +++ b/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/ArisSparkBatchRunner.kt @@ -89,7 +89,8 @@ class ArisSparkBatchRunner : SparkBatchJobRunner() { // Livy release notes: https://livy.apache.org/history/ // JIRA: https://issues.apache.org/jira/browse/LIVY-41 submitModel.submissionParameter.apply { - updateStorageConfigForSubmissionParameter(submitModel).apply { name = mainClassName + "_$currentUtcTime" } + name = mainClassName + "_$currentUtcTime" + prepareSubmissionParameterWithTransformedGen2Uri(this) }, SparkBatchSubmission.getInstance(), jobDeploy diff --git a/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/CosmosServerlessSparkBatchRunner.kt b/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/CosmosServerlessSparkBatchRunner.kt index 91d98aaf8aa..2c3c0071136 100644 --- a/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/CosmosServerlessSparkBatchRunner.kt +++ b/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/CosmosServerlessSparkBatchRunner.kt @@ -42,8 +42,8 @@ class CosmosServerlessSparkBatchRunner : SparkBatchJobRunner() { return "CosmosServerlessSparkBatchRun" } - override fun updateStorageConfigForSubmissionParameter(submitModel: SparkSubmitModel): SparkSubmissionParameter { - return CreateSparkBatchJobParameters.copyOf(submitModel.submissionParameter as CreateSparkBatchJobParameters).apply { + override fun prepareSubmissionParameterWithTransformedGen2Uri(parameter: SparkSubmissionParameter): SparkSubmissionParameter { + return CreateSparkBatchJobParameters.copyOf(parameter as CreateSparkBatchJobParameters).apply { referencedJars = this.referencedJars.stream() .map { transformToGen2Uri(it) } .collect(Collectors.toList()) @@ -70,7 +70,7 @@ class CosmosServerlessSparkBatchRunner : SparkBatchJobRunner() { CosmosServerlessSparkBatchJob( account, AdlsDeploy(storageRootPath, accessToken), - updateStorageConfigForSubmissionParameter(submitModel) as CreateSparkBatchJobParameters, + prepareSubmissionParameterWithTransformedGen2Uri(submissionParameter) as CreateSparkBatchJobParameters, SparkBatchSubmission.getInstance()) } } \ No newline at end of file diff --git a/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/CosmosSparkBatchRunner.kt b/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/CosmosSparkBatchRunner.kt index b0a4fcb037e..2cece1244c8 100644 --- a/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/CosmosSparkBatchRunner.kt +++ b/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/CosmosSparkBatchRunner.kt @@ -24,12 +24,15 @@ package com.microsoft.azure.hdinsight.spark.run import com.intellij.execution.ExecutionException import com.intellij.execution.configurations.RunProfile +import com.microsoft.azure.hdinsight.common.MessageInfoType import com.microsoft.azure.hdinsight.sdk.common.azure.serverless.AzureSparkCosmosClusterManager import com.microsoft.azure.hdinsight.spark.common.* import com.microsoft.azure.hdinsight.spark.run.configuration.CosmosSparkRunConfiguration import rx.Observable import rx.Observable.just +import rx.Observer import java.net.URI +import java.util.AbstractMap.SimpleImmutableEntry class CosmosSparkBatchRunner : SparkBatchJobRunner() { override fun canRun(executorId: String, profile: RunProfile): Boolean { @@ -52,7 +55,7 @@ class CosmosSparkBatchRunner : SparkBatchJobRunner() { ?.let { just(URI.create(it)) } ?: clusterDetail.get().map { it.livyUri } } .map { livyUri -> CosmosSparkBatchJob( - updateStorageConfigForSubmissionParameter(submitModel), + prepareSubmissionParameterWithTransformedGen2Uri(submitModel.submissionParameter), SparkBatchAzureSubmission(tenantId, accountName, clusterId, livyUri)) } } diff --git a/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobRunner.java b/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobRunner.java index df41eaeb731..d802e82e857 100644 --- a/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobRunner.java +++ b/PluginsAndFeatures/azure-toolkit-for-intellij/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobRunner.java @@ -21,7 +21,6 @@ import com.intellij.openapi.project.Project; import com.microsoft.azure.hdinsight.common.AbfsUri; import com.microsoft.azure.hdinsight.common.ClusterManagerEx; -import com.microsoft.azure.hdinsight.common.WasbUri; import com.microsoft.azure.hdinsight.common.logger.ILogger; import com.microsoft.azure.hdinsight.sdk.cluster.IClusterDetail; import com.microsoft.azure.hdinsight.spark.common.*; @@ -43,7 +42,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; -import java.util.UnknownFormatConversionException; import java.util.stream.Collectors; import static com.microsoft.azure.hdinsight.spark.common.SparkBatchSubmission.getClusterSubmission; @@ -70,44 +68,18 @@ protected String transformToGen2Uri(String url) { : url; } - protected SparkSubmissionParameter updateStorageConfigForSubmissionParameter(SparkSubmitModel submitModel) throws ExecutionException { - // If we use virtual file system to select referenced jars or files on ADLS Gen2 storage, the selected file path will - // be of URI schema which starts with "https://". Then job submission will fail with error like - // "Server returned HTTP response code: 401 for URL: https://accountName.dfs.core.windows.net/fs0/Reference.jar" - // Therefore, we need to transform the Gen2 "https" URI to "abfs" url to avoid the error. - final SparkSubmissionParameter newParameter = SparkSubmissionParameter.copyOf(submitModel.getSubmissionParameter()); + // If we use virtual file system to select referenced jars or files on ADLS Gen2 storage, the selected file path will + // be of URI schema which starts with "https://". Then job submission will fail with error like + // "Server returned HTTP response code: 401 for URL: https://accountName.dfs.core.windows.net/fs0/Reference.jar" + // Therefore, we need to transform the Gen2 "https" URI to "abfs" url to avoid the error. + protected SparkSubmissionParameter prepareSubmissionParameterWithTransformedGen2Uri(SparkSubmissionParameter parameter) { + final SparkSubmissionParameter newParameter = SparkSubmissionParameter.copyOf(parameter); newParameter.setReferencedJars(newParameter.getReferencedJars().stream() .map(this::transformToGen2Uri) .collect(Collectors.toList())); newParameter.setReferencedFiles(newParameter.getReferencedFiles().stream() .map(this::transformToGen2Uri) .collect(Collectors.toList())); - - // If job upload storage type is Azure Blob storage, we need to put blob storage credential into livy configuration - if (submitModel.getJobUploadStorageModel().getStorageAccountType() == SparkSubmitStorageType.BLOB) { - try { - final WasbUri fsRoot = WasbUri.parse(submitModel.getJobUploadStorageModel().getUploadPath()); - final String storageKey = submitModel.getJobUploadStorageModel().getStorageKey(); - final Object existingConfigEntry = newParameter.getJobConfig().get(SparkSubmissionParameter.Conf); - final SparkConfigures wrappedConfig = existingConfigEntry instanceof Map - ? new SparkConfigures(existingConfigEntry) - : new SparkConfigures(); - wrappedConfig.put(fsRoot.getHadoopBlobFsPropertyKey(), storageKey); - newParameter.updateJobConfig(SparkSubmissionParameter.Conf, wrappedConfig); - } catch (final UnknownFormatConversionException error) { - final String errorHint = "Azure blob storage uploading path is not in correct format"; - log().warn(String.format("%s. Uploading Path: %s. Error message: %s. Stacktrace:\n%s", - errorHint, submitModel.getJobUploadStorageModel().getUploadPath(), error.getMessage(), - ExceptionUtils.getStackTrace(error))); - throw new ExecutionException(errorHint); - } catch (final Exception error) { - final String errorHint = "Failed to update config for linked Azure Blob storage"; - log().warn(String.format("%s. Error message: %s. Stacktrace:\n%s", - errorHint, error.getMessage(), ExceptionUtils.getStackTrace(error))); - throw new ExecutionException(errorHint); - } - } - return newParameter; } @@ -134,7 +106,8 @@ public Observable buildSparkBatchJob(@NotNull SparkSubmitModel s final Deployable jobDeploy = SparkBatchJobDeployFactory.getInstance().buildSparkBatchJobDeploy( submitModel, clusterDetail); - final SparkSubmissionParameter submissionParameter = updateStorageConfigForSubmissionParameter(submitModel); + final SparkSubmissionParameter submissionParameter = + prepareSubmissionParameterWithTransformedGen2Uri(submitModel.getSubmissionParameter()); updateCurrentBackgroundableTaskIndicator(progressIndicator -> { progressIndicator.setFraction(1.0f); diff --git a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/common/WasbUri.java b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/common/WasbUri.java index d2d025a37fc..9ebe0a398ea 100644 --- a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/common/WasbUri.java +++ b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/common/WasbUri.java @@ -73,7 +73,7 @@ public int hashCode() { } public String getHadoopBlobFsPropertyKey() { - return String.format("spark.hadoop.fs.azure.account.key.%s.blob.%s", getStorageAccount(), getEndpointSuffix()); + return String.format("fs.azure.account.key.%s.blob.%s", getStorageAccount(), getEndpointSuffix()); } public static WasbUri parse(final String blobUri) { diff --git a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkSubmissionParameter.java b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkSubmissionParameter.java index 833716c20ff..4bf38dbe1ad 100644 --- a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkSubmissionParameter.java +++ b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkSubmissionParameter.java @@ -310,14 +310,6 @@ public static List checkJobConfigMap(Map updateJobConfig(String key, Object value) { - Map clonedJobConfig = new HashMap<>(this.jobConfig); - clonedJobConfig.put(key, value); - this.jobConfig = clonedJobConfig; - return this.jobConfig; - } - @NotNull public List> flatJobConfig() { List> flattedConfigs = new ArrayList<>();