diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 034695237d83..d0724432f3c9 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -144,16 +144,23 @@ public static void main(String[] args) throws Exception { @VisibleForTesting public static void main(Function environmentVarGetter) throws Exception { JvmInitializers.runOnStartup(); - System.out.format("SDK Fn Harness started%n"); - System.out.format("Harness ID %s%n", environmentVarGetter.apply(HARNESS_ID)); - System.out.format( - "Logging location %s%n", environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR)); - System.out.format( - "Control location %s%n", environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR)); - System.out.format( - "Status location %s%n", environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR)); + + Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor = + getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR)); + Endpoints.ApiServiceDescriptor controlApiServiceDescriptor = + getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR)); + Endpoints.ApiServiceDescriptor statusApiServiceDescriptor = + environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null + ? null + : getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR)); String id = environmentVarGetter.apply(HARNESS_ID); + System.out.format("SDK Fn Harness started%n"); + System.out.format("Harness ID %s%n", id); + System.out.format("Logging location %s%n", loggingApiServiceDescriptor); + System.out.format("Control location %s%n", controlApiServiceDescriptor); + System.out.format("Status location %s%n", statusApiServiceDescriptor); + String pipelineOptionsJson = environmentVarGetter.apply(PIPELINE_OPTIONS); // Try looking for a file first. If that exists it should override PIPELINE_OPTIONS to avoid // maxing out the kernel's environment space @@ -179,16 +186,6 @@ public static void main(Function environmentVarGetter) throws Ex PipelineOptions options = PipelineOptionsTranslation.fromJson(pipelineOptionsJson); - Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor = - getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR)); - - Endpoints.ApiServiceDescriptor controlApiServiceDescriptor = - getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR)); - - Endpoints.ApiServiceDescriptor statusApiServiceDescriptor = - environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null - ? null - : getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR)); String runnerCapabilitesOrNull = environmentVarGetter.apply(RUNNER_CAPABILITIES); Set runnerCapabilites = runnerCapabilitesOrNull == null @@ -415,7 +412,7 @@ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) { // directExecutor() when building the channel. BeamFnControlClient control = new BeamFnControlClient( - controlStub.withExecutor(MoreExecutors.directExecutor()), + controlStub.withExecutor(MoreExecutors.directExecutor()).withWaitForReady(), outboundObserverFactory, executorService, handlers); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java index 99a7ee64832e..97179115c047 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java @@ -214,7 +214,8 @@ public StreamWriter(ManagedChannel channel) { this.streamPhaser = new AdvancingPhaser(1); this.channel = channel; - BeamFnLoggingGrpc.BeamFnLoggingStub stub = BeamFnLoggingGrpc.newStub(channel); + BeamFnLoggingGrpc.BeamFnLoggingStub stub = + BeamFnLoggingGrpc.newStub(channel).withWaitForReady(); this.inboundObserver = new LogControlObserver(); this.outboundObserver = new DirectStreamObserver(