Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import rx.Observable;

import java.io.IOException;
import java.net.URISyntaxException;

public interface ISparkBatchDebugJob extends ISparkBatchJob {
/**
Expand All @@ -34,6 +33,5 @@ public interface ISparkBatchDebugJob extends ISparkBatchJob {
* @return Spark driver node debugging port
* @throws IOException if the driver debugging port cannot be found
*/
public Observable<Integer> getSparkDriverDebuggingPort() throws IOException;

Observable<Integer> getSparkDriverDebuggingPort() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@

package com.microsoft.azure.hdinsight.spark.common;

import com.microsoft.azure.hdinsight.common.MessageInfoType;
import com.microsoft.azuretools.azurecommons.helpers.NotNull;
import rx.Observable;
import rx.Observer;

import java.io.IOException;
import com.microsoft.azure.hdinsight.common.MessageInfoType;
import com.microsoft.azuretools.azurecommons.helpers.NotNull;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.AbstractMap;
import java.util.AbstractMap.SimpleImmutableEntry;

public interface ISparkBatchJob {
Expand Down Expand Up @@ -100,6 +100,18 @@ public interface ISparkBatchJob {
@NotNull
Observable<SimpleImmutableEntry<String, Long>> getDriverLog(@NotNull String type, long logOffset, int size);

/**
 * Get the log observable of one specified container of the Spark job
 *
 * @param containerLogUrl the URL of the container log to fetch from
 * @param type the log type, such as {@code stderr} or {@code stdout}
 * @param logOffset the log offset at which fetching starts
 * @param size the fetching size, -1 for all
 * @return the observable of the fetched log paired with its starting offset
 */
@NotNull
Observable<SimpleImmutableEntry<String, Long>> getContainerLog(@NotNull String containerLogUrl,
                                                               @NotNull String type,
                                                               long logOffset,
                                                               int size);

/**
* Get Spark job submission log observable
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,21 +449,30 @@ public Observable<SimpleImmutableEntry<String, Long>> getDriverLog(@NotNull Stri
return Observable.empty();
}

String logGot = JobUtils.getInformationFromYarnLogDom(
getSubmission().getCredentialsProvider(),
getCurrentLogUrl(),
type,
offset,
size);

if (StringUtils.isEmpty(logGot)) {
return Observable.empty();
}

return Observable.just(new SimpleImmutableEntry<>(logGot, offset));
return getContainerLog(getCurrentLogUrl(), type, offset, size);
});
}

/**
 * Fetch a piece of the container log at the given URL through
 * {@code JobUtils.getInformationFromYarnLogDom}, using this job's submission credentials.
 *
 * @param containerLogUrl the container log URL to read from
 * @param type the log type handed to the Yarn log lookup
 * @param logOffset the offset the fetch starts from; also emitted back as the pair's value
 * @param size the fetching size handed to the Yarn log lookup
 * @return an observable emitting one (log text, {@code logOffset}) pair, or an empty
 *         observable when the fetched text is empty
 */
@NotNull
@Override
public Observable<SimpleImmutableEntry<String, Long>> getContainerLog(@NotNull String containerLogUrl,
                                                                      @NotNull String type,
                                                                      long logOffset,
                                                                      int size) {
    final String fetchedLog = JobUtils.getInformationFromYarnLogDom(
            getSubmission().getCredentialsProvider(), containerLogUrl, type, logOffset, size);

    // Only a non-empty fetch produces an item; otherwise the observable just completes.
    if (!StringUtils.isEmpty(fetchedLog)) {
        return Observable.just(new SimpleImmutableEntry<>(fetchedLog, logOffset));
    }

    return Observable.empty();
}

/**
* Parse host from host:port combination string
*
Expand Down Expand Up @@ -567,7 +576,7 @@ String getSparkJobApplicationId(URI batchBaseUri, int batchId) throws IOExceptio
/**
* Get Spark Job Yarn application with retries
*
* @param batchBaseUri the connection URI of HDInsight Livy batch job, http://livy:8998/batches, the function will help translate it to Yarn connection URI.
* @param yarnConnectUri the connection URI of HDInsight Livy batch job, http://livy:8998/batches, the function will help translate it to Yarn connection URI.
* @param applicationID the Yarn application ID
* @return the Yarn application got
* @throws IOException exceptions in transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ protected Observable<? extends ISparkBatchJob> prepareArtifact() {
return Observable.just(parentJob);
}

/**
 * Hands the given job straight back as a single-item observable; this override
 * performs no submission step of its own.
 *
 * @param parentJob the job to pass through
 * @return an observable emitting {@code parentJob}
 */
@Override
protected Observable<? extends ISparkBatchJob> submitJob(ISparkBatchJob parentJob) {
    final Observable<ISparkBatchJob> passThrough = Observable.just(parentJob);

    return passThrough;
}

@Override
public InputStream getInputStream() {
return stdOutInputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,26 @@
package com.microsoft.azure.hdinsight.spark.run;

import com.microsoft.azure.hdinsight.spark.common.ISparkBatchJob;
import com.microsoft.azure.hdinsight.spark.common.SparkBatchJob;
import com.microsoft.azuretools.azurecommons.helpers.NotNull;

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Optional;


class SparkJobExecutorLogInputStream extends SparkJobLogInputStream {
private final String logUrl;

/**
 * Creates an executor log stream of the given log type and records the log URL.
 *
 * @param logType the log type, passed to the superclass constructor
 * @param logUrl the log URL kept for this stream
 */
public SparkJobExecutorLogInputStream(@NotNull String logType, @NotNull String logUrl) {
super(logType);

// NOTE(review): the URL is stored twice here — once through setLogUrl(...) and once in this
// class's own final field. The two stores look redundant; confirm whether the setLogUrl
// call is still needed.
setLogUrl(logUrl);
this.logUrl = logUrl;
}

/**
 * Fetches a piece of the container log at this stream's URL from the attached job,
 * blocking until the job's log observable completes.
 *
 * @param logOffset the offset the fetch starts from
 * @param fetchSize the fetching size passed to the job
 * @return the fetched (log text, offset) pair; empty when no job is attached or the
 *         job's observable emits nothing
 */
@Override
protected synchronized Optional<SimpleImmutableEntry<String, Long>> fetchLog(long logOffset, int fetchSize) {
    return getAttachedJob().map(job -> {
        final String url = getLogUrl();
        final String type = getLogType();

        // A null default turns an empty observable into an empty Optional via Optional.map.
        return job.getContainerLog(url, type, logOffset, fetchSize)
                .toBlocking()
                .singleOrDefault(null);
    });
}

@Override
Expand All @@ -40,4 +51,8 @@ public ISparkBatchJob attachJob(@NotNull ISparkBatchJob sparkJob) {

return sparkJob;
}

/**
 * @return the log URL this stream was constructed with
 */
public String getLogUrl() {
    return this.logUrl;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@
package com.microsoft.azure.hdinsight.spark.run;

import com.microsoft.azure.hdinsight.spark.common.ISparkBatchJob;
import com.microsoft.azure.hdinsight.spark.common.SparkBatchJob;
import com.microsoft.azure.hdinsight.spark.jobs.JobUtils;
import com.microsoft.azuretools.azurecommons.helpers.NotNull;
import com.microsoft.azuretools.azurecommons.helpers.Nullable;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import java.io.IOException;
import java.io.InputStream;
import java.util.AbstractMap;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Optional;

Expand All @@ -43,8 +38,6 @@ public class SparkJobLogInputStream extends InputStream {
private String logType;
@Nullable
private ISparkBatchJob sparkBatchJob;
@Nullable
private String logUrl;

private long offset = 0;
@NotNull
Expand All @@ -61,7 +54,7 @@ public ISparkBatchJob attachJob(@NotNull ISparkBatchJob sparkJob) {
return sparkJob;
}

private synchronized Optional<SimpleImmutableEntry<String, Long>> fetchLog(long logOffset, int fetchSize) {
protected synchronized Optional<SimpleImmutableEntry<String, Long>> fetchLog(long logOffset, int fetchSize) {
return getAttachedJob()
.map(job -> job.getDriverLog(getLogType(), logOffset, fetchSize)
.toBlocking().singleOrDefault(null));
Expand Down Expand Up @@ -106,14 +99,6 @@ public int available() throws IOException {
}
}

void setLogUrl(@Nullable String logUrl) {
this.logUrl = logUrl;
}

public Optional<String> getLogUrl() {
return Optional.ofNullable(logUrl);
}

@NotNull
public String getLogType() {
return logType;
Expand Down