From d41189f54aceb1e6f81d518a893ea8c3c8394b9c Mon Sep 17 00:00:00 2001 From: Wei Zhang Date: Thu, 19 Sep 2019 17:17:27 +0800 Subject: [PATCH] Fix Spark remote debugging regression --- .../spark/common/ISparkBatchDebugJob.java | 4 +-- .../spark/common/ISparkBatchJob.java | 20 ++++++++--- .../hdinsight/spark/common/SparkBatchJob.java | 35 ++++++++++++------- ...arkBatchJobRemoteDebugExecutorProcess.java | 5 +++ .../run/SparkJobExecutorLogInputStream.java | 19 ++++++++-- .../spark/run/SparkJobLogInputStream.java | 17 +-------- 6 files changed, 62 insertions(+), 38 deletions(-) diff --git a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/ISparkBatchDebugJob.java b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/ISparkBatchDebugJob.java index 4217b73b2a9..766cec9650b 100644 --- a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/ISparkBatchDebugJob.java +++ b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/ISparkBatchDebugJob.java @@ -25,7 +25,6 @@ import rx.Observable; import java.io.IOException; -import java.net.URISyntaxException; public interface ISparkBatchDebugJob extends ISparkBatchJob { /** @@ -34,6 +33,5 @@ public interface ISparkBatchDebugJob extends ISparkBatchJob { * @return Spark driver node debugging port * @throws IOException exceptions for the driver debugging port not found */ - public Observable getSparkDriverDebuggingPort() throws IOException; - + Observable getSparkDriverDebuggingPort() throws IOException; } diff --git a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/ISparkBatchJob.java b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/ISparkBatchJob.java index 0ab627b4f01..fbc294589b0 100644 --- a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/ISparkBatchJob.java +++ b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/ISparkBatchJob.java @@ -22,14 +22,14 @@ package com.microsoft.azure.hdinsight.spark.common; -import com.microsoft.azure.hdinsight.common.MessageInfoType; -import com.microsoft.azuretools.azurecommons.helpers.NotNull; import rx.Observable; import rx.Observer; -import java.io.IOException; +import com.microsoft.azure.hdinsight.common.MessageInfoType; +import com.microsoft.azuretools.azurecommons.helpers.NotNull; + import java.net.URI; -import java.net.URISyntaxException; +import java.util.AbstractMap; import java.util.AbstractMap.SimpleImmutableEntry; public interface ISparkBatchJob { @@ -100,6 +100,18 @@ public interface ISparkBatchJob { @NotNull Observable> getDriverLog(@NotNull String type, long logOffset, int size); + /** + * Get Spark job specified container log observable + * + * @param containerLogUrl the container log URL + * @param type the log type, such as `stderr`, `stdout` + * @param logOffset the log offset that fetching would start from + * @param size the fetching size, -1 for all. + * @return the log and its starting offset pair observable + */ + @NotNull + Observable> getContainerLog(@NotNull String containerLogUrl, @NotNull String type, long logOffset, int size); + /** * Get Spark job submission log observable * diff --git a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkBatchJob.java b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkBatchJob.java index e26bc39ccc8..607730a924b 100644 --- a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkBatchJob.java +++ b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/common/SparkBatchJob.java @@ -449,21 +449,30 @@ public Observable> getDriverLog(@NotNull Stri return Observable.empty(); } - String logGot = JobUtils.getInformationFromYarnLogDom( - getSubmission().getCredentialsProvider(), - getCurrentLogUrl(), - type, - offset, - size); - - if (StringUtils.isEmpty(logGot)) { - return Observable.empty(); - } - - return Observable.just(new SimpleImmutableEntry<>(logGot, offset)); + return getContainerLog(getCurrentLogUrl(), type, offset, size); }); } + @NotNull + @Override + public Observable> getContainerLog(@NotNull String containerLogUrl, + @NotNull String type, + long logOffset, + int size) { + String logGot = JobUtils.getInformationFromYarnLogDom( + getSubmission().getCredentialsProvider(), + containerLogUrl, + type, + logOffset, + size); + + if (StringUtils.isEmpty(logGot)) { + return Observable.empty(); + } + + return Observable.just(new SimpleImmutableEntry<>(logGot, logOffset)); + } + /** * Parse host from host:port combination string * @@ -567,7 +576,7 @@ String getSparkJobApplicationId(URI batchBaseUri, int batchId) throws IOExceptio /** * Get Spark Job Yarn application with retries * - * @param batchBaseUri the connection URI of HDInsight Livy batch job, http://livy:8998/batches, the function will help translate it to Yarn connection URI. + * @param yarnConnectUri the connection URI of HDInsight Livy batch job, http://livy:8998/batches, the function will help translate it to Yarn connection URI. * @param applicationID the Yarn application ID * @return the Yarn application got * @throws IOException exceptions in transaction diff --git a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobRemoteDebugExecutorProcess.java b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobRemoteDebugExecutorProcess.java index 9377fcf4928..f557ff19dfc 100644 --- a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobRemoteDebugExecutorProcess.java +++ b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/run/SparkBatchJobRemoteDebugExecutorProcess.java @@ -72,6 +72,11 @@ protected Observable prepareArtifact() { return Observable.just(parentJob); } + @Override + protected Observable submitJob(ISparkBatchJob parentJob) { + return Observable.just(parentJob); + } + @Override public InputStream getInputStream() { return stdOutInputStream; diff --git a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/run/SparkJobExecutorLogInputStream.java b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/run/SparkJobExecutorLogInputStream.java index 570c85be6f9..6fd0f28cae2 100644 --- a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/run/SparkJobExecutorLogInputStream.java +++ b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/run/SparkJobExecutorLogInputStream.java @@ -23,15 +23,26 @@ package com.microsoft.azure.hdinsight.spark.run; import com.microsoft.azure.hdinsight.spark.common.ISparkBatchJob; -import com.microsoft.azure.hdinsight.spark.common.SparkBatchJob; import com.microsoft.azuretools.azurecommons.helpers.NotNull; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.Optional; + class SparkJobExecutorLogInputStream extends SparkJobLogInputStream { + private final String logUrl; + public SparkJobExecutorLogInputStream(@NotNull String logType, @NotNull String logUrl) { super(logType); - setLogUrl(logUrl); + this.logUrl = logUrl; + } + + @Override + protected synchronized Optional> fetchLog(long logOffset, int fetchSize) { + return getAttachedJob() + .map(job -> job.getContainerLog(getLogUrl(), getLogType(), logOffset, fetchSize) + .toBlocking().singleOrDefault(null)); } @Override @@ -40,4 +51,8 @@ public ISparkBatchJob attachJob(@NotNull ISparkBatchJob sparkJob) { return sparkJob; } + + public String getLogUrl() { + return logUrl; + } } diff --git a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/run/SparkJobLogInputStream.java b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/run/SparkJobLogInputStream.java index 87d7a844564..1a55c7ddd47 100644 --- a/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/run/SparkJobLogInputStream.java +++ b/Utils/hdinsight-node-common/src/com/microsoft/azure/hdinsight/spark/run/SparkJobLogInputStream.java @@ -23,16 +23,11 @@ package com.microsoft.azure.hdinsight.spark.run; import com.microsoft.azure.hdinsight.spark.common.ISparkBatchJob; -import com.microsoft.azure.hdinsight.spark.common.SparkBatchJob; -import com.microsoft.azure.hdinsight.spark.jobs.JobUtils; import com.microsoft.azuretools.azurecommons.helpers.NotNull; import com.microsoft.azuretools.azurecommons.helpers.Nullable; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang3.tuple.Pair; import java.io.IOException; import java.io.InputStream; -import java.util.AbstractMap; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Optional; @@ -43,8 +38,6 @@ public class SparkJobLogInputStream extends InputStream { private String logType; @Nullable private ISparkBatchJob sparkBatchJob; - @Nullable - private String logUrl; private long offset = 0; @NotNull @@ -61,7 +54,7 @@ public ISparkBatchJob attachJob(@NotNull ISparkBatchJob sparkJob) { return sparkJob; } - private synchronized Optional> fetchLog(long logOffset, int fetchSize) { + protected synchronized Optional> fetchLog(long logOffset, int fetchSize) { return getAttachedJob() .map(job -> job.getDriverLog(getLogType(), logOffset, fetchSize) .toBlocking().singleOrDefault(null)); @@ -106,14 +99,6 @@ public int available() throws IOException { } } - void setLogUrl(@Nullable String logUrl) { - this.logUrl = logUrl; - } - - public Optional getLogUrl() { - return Optional.ofNullable(logUrl); - } - @NotNull public String getLogType() { return logType;