diff --git a/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java b/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java index 391f3d959..82e06736a 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java +++ b/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java @@ -185,6 +185,9 @@ public static ClientContext create(StubSettings settings) throws IOException { if (executorProvider.shouldAutoClose()) { backgroundResources.add(new ExecutorAsBackgroundResource(executor)); } + if (watchdogProvider != null && watchdogProvider.shouldAutoClose()) { + backgroundResources.add(watchdog); + } return newBuilder() .setBackgroundResources(backgroundResources.build()) diff --git a/gax/src/main/java/com/google/api/gax/rpc/FixedWatchdogProvider.java b/gax/src/main/java/com/google/api/gax/rpc/FixedWatchdogProvider.java index 1533df71e..e22f14122 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/FixedWatchdogProvider.java +++ b/gax/src/main/java/com/google/api/gax/rpc/FixedWatchdogProvider.java @@ -82,4 +82,9 @@ public WatchdogProvider withExecutor(ScheduledExecutorService executor) { public Watchdog getWatchdog() { return watchdog; } + + @Override + public boolean shouldAutoClose() { + return false; + } } diff --git a/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java b/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java index b4f7c4f57..b33c57048 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java +++ b/gax/src/main/java/com/google/api/gax/rpc/InstantiatingWatchdogProvider.java @@ -33,7 +33,6 @@ import com.google.api.core.BetaApi; import com.google.common.base.Preconditions; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.threeten.bp.Duration; @@ -103,10 +102,11 @@ public Watchdog getWatchdog() { return null; } - Watchdog watchdog = new Watchdog(clock); - executor.scheduleAtFixedRate( - watchdog, checkInterval.toMillis(), checkInterval.toMillis(), TimeUnit.MILLISECONDS); + return Watchdog.create(clock, checkInterval, executor); + } - return watchdog; + @Override + public boolean shouldAutoClose() { + return true; } } diff --git a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java index 9ffff65e9..ddf399ff7 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java +++ b/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java @@ -31,11 +31,15 @@ import com.google.api.core.ApiClock; import com.google.api.core.InternalApi; +import com.google.api.gax.core.BackgroundResource; import com.google.common.base.Preconditions; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; import org.threeten.bp.Duration; @@ -56,15 +60,34 @@ * */ @InternalApi -public class Watchdog implements Runnable { +public class Watchdog implements Runnable, BackgroundResource { // Dummy value to convert the ConcurrentHashMap into a Set private static Object PRESENT = new Object(); private final ConcurrentHashMap openStreams = new ConcurrentHashMap<>(); private final ApiClock clock; + private final Duration scheduleInterval; + private final ScheduledExecutorService executor; + private ScheduledFuture future; + + /** returns a Watchdog which is scheduled at the provided interval. */ + public static Watchdog create( + ApiClock clock, Duration scheduleInterval, ScheduledExecutorService executor) { + Watchdog watchdog = new Watchdog(clock, scheduleInterval, executor); + watchdog.start(); + return watchdog; + } - public Watchdog(ApiClock clock) { + private Watchdog(ApiClock clock, Duration scheduleInterval, ScheduledExecutorService executor) { this.clock = Preconditions.checkNotNull(clock, "clock can't be null"); + this.scheduleInterval = scheduleInterval; + this.executor = executor; + } + + private void start() { + future = + executor.scheduleAtFixedRate( + this, scheduleInterval.toMillis(), scheduleInterval.toMillis(), TimeUnit.MILLISECONDS); } /** Wraps the target observer with timing constraints. */ @@ -98,6 +121,38 @@ public void run() { } } + @Override + public void shutdown() { + future.cancel(false); + executor.shutdown(); + } + + @Override + public boolean isShutdown() { + return executor.isShutdown(); + } + + @Override + public boolean isTerminated() { + return executor.isTerminated(); + } + + @Override + public void shutdownNow() { + future.cancel(true); + executor.shutdownNow(); + } + + @Override + public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { + return executor.awaitTermination(duration, unit); + } + + @Override + public void close() { + shutdown(); + } + enum State { /** Stream has been started, but doesn't have any outstanding requests. */ IDLE, diff --git a/gax/src/main/java/com/google/api/gax/rpc/WatchdogProvider.java b/gax/src/main/java/com/google/api/gax/rpc/WatchdogProvider.java index 14126c26d..a1fa227e8 100644 --- a/gax/src/main/java/com/google/api/gax/rpc/WatchdogProvider.java +++ b/gax/src/main/java/com/google/api/gax/rpc/WatchdogProvider.java @@ -50,4 +50,6 @@ public interface WatchdogProvider { WatchdogProvider withExecutor(ScheduledExecutorService executor); Watchdog getWatchdog(); + + boolean shouldAutoClose(); } diff --git a/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java b/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java index 2d0830a5e..17529fcb2 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/ClientContextTest.java @@ -29,6 +29,8 @@ */ package com.google.api.gax.rpc; +import static com.google.common.truth.Truth.assertThat; + import com.google.api.core.ApiClock; import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.core.ExecutorProvider; @@ -287,4 +289,41 @@ private void runTest( Truth.assertThat(executor.shutdownCalled).isEqualTo(shouldAutoClose); } } + + @Test + public void testWatchdogProvider() throws IOException { + FakeClientSettings.Builder builder = new FakeClientSettings.Builder(); + + InterceptingExecutor executor = new InterceptingExecutor(1); + FakeTransportChannel transportChannel = FakeTransportChannel.create(new FakeChannel()); + FakeTransportProvider transportProvider = + new FakeTransportProvider(transportChannel, executor, true, null, null); + ApiClock clock = Mockito.mock(ApiClock.class); + + builder.setClock(clock); + builder.setCredentialsProvider( + FixedCredentialsProvider.create(Mockito.mock(Credentials.class))); + builder.setExecutorProvider(new FakeExecutorProvider(executor, true)); + builder.setTransportChannelProvider(transportProvider); + + Duration watchdogCheckInterval = Duration.ofSeconds(11); + builder.setWatchdogProvider( + InstantiatingWatchdogProvider.create() + .withClock(clock) + .withCheckInterval(watchdogCheckInterval) + .withExecutor(executor)); + builder.setWatchdogCheckInterval(watchdogCheckInterval); + + HeaderProvider headerProvider = Mockito.mock(HeaderProvider.class); + Mockito.when(headerProvider.getHeaders()).thenReturn(ImmutableMap.of("k1", "v1")); + HeaderProvider internalHeaderProvider = Mockito.mock(HeaderProvider.class); + + Mockito.when(internalHeaderProvider.getHeaders()).thenReturn(ImmutableMap.of("k2", "v2")); + builder.setHeaderProvider(headerProvider); + builder.setInternalHeaderProvider(internalHeaderProvider); + + ClientContext context = ClientContext.create(builder.build()); + List resources = context.getBackgroundResources(); + assertThat(resources.get(2)).isInstanceOf(Watchdog.class); + } } diff --git a/gax/src/test/java/com/google/api/gax/rpc/FixedWatchdogProviderTest.java b/gax/src/test/java/com/google/api/gax/rpc/FixedWatchdogProviderTest.java index ba7d476b9..de1fd43ab 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/FixedWatchdogProviderTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/FixedWatchdogProviderTest.java @@ -61,6 +61,7 @@ public void testNoModifications() { assertThat(provider.needsCheckInterval()).isFalse(); assertThat(provider.needsClock()).isFalse(); assertThat(provider.needsExecutor()).isFalse(); + assertThat(provider.shouldAutoClose()).isFalse(); Throwable actualError = null; try { diff --git a/gax/src/test/java/com/google/api/gax/rpc/InstantiatingWatchdogProviderTest.java b/gax/src/test/java/com/google/api/gax/rpc/InstantiatingWatchdogProviderTest.java index b7c87a620..0607dc717 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/InstantiatingWatchdogProviderTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/InstantiatingWatchdogProviderTest.java @@ -60,6 +60,8 @@ public void happyPath() { assertThat(provider.needsCheckInterval()).isTrue(); provider = provider.withCheckInterval(checkInterval); + assertThat(provider.shouldAutoClose()).isTrue(); + Watchdog watchdog = provider.getWatchdog(); Mockito.verify(executor) .scheduleAtFixedRate( diff --git a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java index f1d4be8ff..b63e80cc6 100644 --- a/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java +++ b/gax/src/test/java/com/google/api/gax/rpc/WatchdogTest.java @@ -29,25 +29,34 @@ */ package com.google.api.gax.rpc; +import static com.google.common.truth.Truth.assertThat; + import com.google.api.core.SettableApiFuture; +import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.core.FakeApiClock; import com.google.api.gax.rpc.testing.MockStreamingApi.MockServerStreamingCall; import com.google.api.gax.rpc.testing.MockStreamingApi.MockServerStreamingCallable; import com.google.common.collect.Queues; -import com.google.common.truth.Truth; import java.util.Queue; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mockito; import org.threeten.bp.Duration; @RunWith(JUnit4.class) public class WatchdogTest { + private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(1); + private FakeApiClock clock; + private final Duration checkInterval = Duration.ofMillis(1000); private Duration waitTime = Duration.ofSeconds(10); private Duration idleTime = Duration.ofMinutes(5); @@ -59,7 +68,7 @@ public class WatchdogTest { @Before public void setUp() { clock = new FakeApiClock(0); - watchdog = new Watchdog(clock); + watchdog = Watchdog.create(clock, checkInterval, EXECUTOR); callable = new MockServerStreamingCallable<>(); innerObserver = new AccumulatingObserver<>(); @@ -70,7 +79,7 @@ public void setUp() { @Test public void testRequestPassthrough() throws Exception { innerObserver.controller.get().request(1); - Truth.assertThat(call.getController().popLastPull()).isEqualTo(1); + assertThat(call.getController().popLastPull()).isEqualTo(1); } @Test @@ -79,11 +88,11 @@ public void testWaitTimeout() throws Exception { clock.incrementNanoTime(waitTime.toNanos() - 1); watchdog.run(); - Truth.assertThat(call.getController().isCancelled()).isFalse(); + assertThat(call.getController().isCancelled()).isFalse(); clock.incrementNanoTime(1); watchdog.run(); - Truth.assertThat(call.getController().isCancelled()).isTrue(); + assertThat(call.getController().isCancelled()).isTrue(); call.getController() .getObserver() .onError(new RuntimeException("Some upstream exception representing cancellation")); @@ -94,18 +103,18 @@ public void testWaitTimeout() throws Exception { } catch (ExecutionException t) { actualError = t.getCause(); } - Truth.assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class); + assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class); } @Test public void testIdleTimeout() throws InterruptedException { clock.incrementNanoTime(idleTime.toNanos() - 1); watchdog.run(); - Truth.assertThat(call.getController().isCancelled()).isFalse(); + assertThat(call.getController().isCancelled()).isFalse(); clock.incrementNanoTime(1); watchdog.run(); - Truth.assertThat(call.getController().isCancelled()).isTrue(); + assertThat(call.getController().isCancelled()).isTrue(); call.getController() .getObserver() .onError(new RuntimeException("Some upstream exception representing cancellation")); @@ -116,7 +125,7 @@ public void testIdleTimeout() throws InterruptedException { } catch (ExecutionException t) { actualError = t.getCause(); } - Truth.assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class); + assertThat(actualError).isInstanceOf(WatchdogTimeoutException.class); } @Test @@ -141,12 +150,12 @@ public void testMultiple() throws Exception { watchdog.run(); // Call1 should be ok - Truth.assertThat(call1.getController().isCancelled()).isFalse(); + assertThat(call1.getController().isCancelled()).isFalse(); // Should not throw - Truth.assertThat(downstreamObserver1.done.isDone()).isFalse(); + assertThat(downstreamObserver1.done.isDone()).isFalse(); // Call2 should be timed out - Truth.assertThat(call2.getController().isCancelled()).isTrue(); + assertThat(call2.getController().isCancelled()).isTrue(); call2.getController().getObserver().onError(new CancellationException("User cancelled")); Throwable error = null; try { @@ -154,7 +163,36 @@ public void testMultiple() throws Exception { } catch (ExecutionException t) { error = t.getCause(); } - Truth.assertThat(error).isInstanceOf(WatchdogTimeoutException.class); + assertThat(error).isInstanceOf(WatchdogTimeoutException.class); + } + + @Test + @SuppressWarnings("unchecked") + public void testWatchdogBeingClosed() { + ScheduledFuture future = Mockito.mock(ScheduledFuture.class); + ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class); + Mockito.when( + mockExecutor.scheduleAtFixedRate( + Mockito.any(Watchdog.class), + Mockito.anyLong(), + Mockito.anyLong(), + Mockito.any(TimeUnit.class))) + .thenReturn(future); + Watchdog underTest = Watchdog.create(clock, checkInterval, mockExecutor); + assertThat(underTest).isInstanceOf(BackgroundResource.class); + + underTest.close(); + underTest.shutdown(); + + Mockito.verify(mockExecutor) + .scheduleAtFixedRate( + underTest, checkInterval.toMillis(), checkInterval.toMillis(), TimeUnit.MILLISECONDS); + Mockito.verify(future, Mockito.times(2)).cancel(false); + Mockito.verify(mockExecutor, Mockito.times(2)).shutdown(); + + underTest.shutdownNow(); + Mockito.verify(future).cancel(true); + Mockito.verify(mockExecutor).shutdownNow(); } static class AccumulatingObserver implements ResponseObserver {