Skip to content

Commit 180f3a6

Browse files
authored
Attempt to fix flaky GrpcLoggingServiceTest (#37891)
1 parent 2c99a01 commit 180f3a6

File tree

1 file changed

+55
-61
lines changed

1 file changed

+55
-61
lines changed

runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingServiceTest.java

Lines changed: 55 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222

2323
import java.util.ArrayList;
2424
import java.util.Collection;
25-
import java.util.concurrent.BlockingQueue;
26-
import java.util.concurrent.Callable;
2725
import java.util.concurrent.ConcurrentLinkedQueue;
2826
import java.util.concurrent.CountDownLatch;
2927
import java.util.concurrent.ExecutorService;
@@ -63,39 +61,42 @@ public void testMultipleClientsSuccessfullyProcessed() throws Exception {
6361
GrpcLoggingService.forWriter(new CollectionAppendingLogWriter(logs));
6462
try (GrpcFnServer<GrpcLoggingService> server =
6563
GrpcFnServer.allocatePortAndCreateFor(service, InProcessServerFactory.create())) {
66-
67-
Collection<Callable<Void>> tasks = new ArrayList<>();
64+
ExecutorService executorService = Executors.newCachedThreadPool();
65+
Collection<Future<?>> futures = new ArrayList<>();
66+
CountDownLatch waitForServerHangup = new CountDownLatch(3);
6867
for (int i = 1; i <= 3; ++i) {
6968
final int instructionId = i;
70-
tasks.add(
71-
() -> {
72-
CountDownLatch waitForServerHangup = new CountDownLatch(1);
73-
String url = server.getApiServiceDescriptor().getUrl();
74-
ManagedChannel channel = InProcessChannelBuilder.forName(url).build();
75-
StreamObserver<LogEntry.List> outboundObserver =
76-
BeamFnLoggingGrpc.newStub(channel)
77-
.logging(
78-
TestStreams.withOnNext(messageDiscarder)
79-
.withOnCompleted(new CountDown(waitForServerHangup))
80-
.build());
81-
outboundObserver.onNext(createLogsWithIds(instructionId, -instructionId));
82-
outboundObserver.onCompleted();
83-
waitForServerHangup.await();
84-
return null;
85-
});
69+
futures.add(
70+
executorService.submit(
71+
() -> {
72+
String url = server.getApiServiceDescriptor().getUrl();
73+
ManagedChannel channel = InProcessChannelBuilder.forName(url).build();
74+
StreamObserver<LogEntry.List> outboundObserver =
75+
BeamFnLoggingGrpc.newStub(channel)
76+
.logging(
77+
TestStreams.withOnNext(messageDiscarder)
78+
.withOnCompleted(new CountDown(waitForServerHangup))
79+
.build());
80+
outboundObserver.onNext(createLogsWithIds(instructionId, -instructionId));
81+
outboundObserver.onCompleted();
82+
}));
8683
}
87-
ExecutorService executorService = Executors.newCachedThreadPool();
88-
executorService.invokeAll(tasks);
89-
assertThat(
90-
logs,
91-
containsInAnyOrder(
92-
createLogWithId(1L),
93-
createLogWithId(2L),
94-
createLogWithId(3L),
95-
createLogWithId(-1L),
96-
createLogWithId(-2L),
97-
createLogWithId(-3L)));
84+
// Make sure all streams were created and issued client operations.
85+
for (Future<?> f : futures) {
86+
f.get();
87+
}
88+
// Ensure all the streams were completed as expected before closing the server.
89+
waitForServerHangup.await();
9890
}
91+
assertThat(
92+
logs,
93+
containsInAnyOrder(
94+
createLogWithId(1L),
95+
createLogWithId(2L),
96+
createLogWithId(3L),
97+
createLogWithId(-1L),
98+
createLogWithId(-2L),
99+
createLogWithId(-3L)));
99100
}
100101

101102
@Test
@@ -107,32 +108,23 @@ public void testMultipleClientsFailingIsHandledGracefullyByServer() throws Excep
107108
GrpcFnServer.allocatePortAndCreateFor(service, InProcessServerFactory.create())) {
108109

109110
CountDownLatch waitForTermination = new CountDownLatch(3);
110-
final BlockingQueue<StreamObserver<LogEntry.List>> outboundObservers =
111-
new LinkedBlockingQueue<>();
112-
Collection<Callable<Void>> tasks = new ArrayList<>();
113-
for (int i = 1; i <= 3; ++i) {
114-
final int instructionId = i;
115-
tasks.add(
116-
() -> {
117-
ManagedChannel channel =
118-
InProcessChannelBuilder.forName(server.getApiServiceDescriptor().getUrl())
119-
.build();
120-
StreamObserver<LogEntry.List> outboundObserver =
121-
BeamFnLoggingGrpc.newStub(channel)
122-
.logging(
123-
TestStreams.withOnNext(messageDiscarder)
124-
.withOnError(new CountDown(waitForTermination))
125-
.build());
126-
outboundObserver.onNext(createLogsWithIds(instructionId, -instructionId));
127-
outboundObservers.add(outboundObserver);
128-
return null;
129-
});
111+
final Collection<StreamObserver<LogEntry.List>> outboundObservers = new ArrayList<>();
112+
// Create all the streams
113+
for (int instructionId = 1; instructionId <= 3; ++instructionId) {
114+
ManagedChannel channel =
115+
InProcessChannelBuilder.forName(server.getApiServiceDescriptor().getUrl()).build();
116+
StreamObserver<LogEntry.List> outboundObserver =
117+
BeamFnLoggingGrpc.newStub(channel)
118+
.logging(
119+
TestStreams.withOnNext(messageDiscarder)
120+
.withOnError(new CountDown(waitForTermination))
121+
.build());
122+
outboundObserver.onNext(createLogsWithIds(instructionId, -instructionId));
123+
outboundObservers.add(outboundObserver);
130124
}
131-
ExecutorService executorService = Executors.newCachedThreadPool();
132-
executorService.invokeAll(tasks);
133125

134-
for (int i = 1; i <= 3; ++i) {
135-
outboundObservers.take().onError(new RuntimeException("Client " + i));
126+
for (StreamObserver<LogEntry.List> outboundObserver : outboundObservers) {
127+
outboundObserver.onError(new RuntimeException("Client"));
136128
}
137129
waitForTermination.await();
138130
}
@@ -142,19 +134,19 @@ public void testMultipleClientsFailingIsHandledGracefullyByServer() throws Excep
142134
public void testServerCloseHangsUpClients() throws Exception {
143135
LinkedBlockingQueue<LogEntry> logs = new LinkedBlockingQueue<>();
144136
ExecutorService executorService = Executors.newCachedThreadPool();
145-
Collection<Future<Void>> futures = new ArrayList<>();
146137
final GrpcLoggingService service =
147138
GrpcLoggingService.forWriter(new CollectionAppendingLogWriter(logs));
139+
CountDownLatch waitForServerHangup = new CountDownLatch(3);
148140
try (GrpcFnServer<GrpcLoggingService> server =
149141
GrpcFnServer.allocatePortAndCreateFor(service, InProcessServerFactory.create())) {
150142

143+
Collection<Future<?>> futures = new ArrayList<>();
151144
for (int i = 1; i <= 3; ++i) {
152145
final long instructionId = i;
153146
futures.add(
154147
executorService.submit(
155148
() -> {
156149
{
157-
CountDownLatch waitForServerHangup = new CountDownLatch(1);
158150
ManagedChannel channel =
159151
InProcessChannelBuilder.forName(server.getApiServiceDescriptor().getUrl())
160152
.build();
@@ -165,19 +157,21 @@ public void testServerCloseHangsUpClients() throws Exception {
165157
.withOnCompleted(new CountDown(waitForServerHangup))
166158
.build());
167159
outboundObserver.onNext(createLogsWithIds(instructionId));
168-
waitForServerHangup.await();
169160
return null;
170161
}
171162
}));
172163
}
164+
// Ensure all the streams have started and sent their instruction.
165+
for (Future<?> f : futures) {
166+
f.get();
167+
}
173168
// Wait till each client has sent their message showing that they have connected.
174169
for (int i = 1; i <= 3; ++i) {
175170
logs.take();
176171
}
172+
// Close the server without closing the streams and ensure they observe the hangup.
177173
}
178-
for (Future<Void> future : futures) {
179-
future.get();
180-
}
174+
waitForServerHangup.await();
181175
}
182176

183177
private BeamFnApi.LogEntry.List createLogsWithIds(long... ids) {

0 commit comments

Comments
 (0)