Skip to content

Commit 7bcf536

Browse files
committed
fix: fix flaky test testNonBlockingWithMultipleMessages
1 parent 31c40fb commit 7bcf536

File tree

2 files changed

+90
-90
lines changed

2 files changed

+90
-90
lines changed

tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java

Lines changed: 89 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1274,144 +1274,144 @@ public void testDeletePushNotificationConfigSetWithoutConfigId() throws Exceptio
12741274
@Test
12751275
@Timeout(value = 1, unit = TimeUnit.MINUTES)
12761276
public void testNonBlockingWithMultipleMessages() throws Exception {
1277-
String multiEventTaskId = "multi-event-test";
1277+
String multiEventTaskId = "multi-event-test-" + java.util.UUID.randomUUID();
12781278
try {
1279-
// 1. Send first non-blocking message to create task in WORKING state
1280-
Message message1 = Message.builder(MESSAGE)
1279+
// 1. Send first non-blocking message to create task in WORKING state
1280+
Message message1 = Message.builder(MESSAGE)
12811281
.taskId(multiEventTaskId)
12821282
.contextId("test-context")
12831283
.parts(new TextPart("First request"))
12841284
.build();
12851285

1286-
AtomicReference<String> taskIdRef = new AtomicReference<>();
1287-
CountDownLatch firstTaskLatch = new CountDownLatch(1);
1286+
AtomicReference<String> taskIdRef = new AtomicReference<>();
1287+
CountDownLatch firstTaskLatch = new CountDownLatch(1);
12881288

1289-
BiConsumer<ClientEvent, AgentCard> firstMessageConsumer = (event, agentCard) -> {
1290-
if (event instanceof TaskEvent te) {
1291-
taskIdRef.set(te.getTask().id());
1292-
firstTaskLatch.countDown();
1293-
} else if (event instanceof TaskUpdateEvent tue && tue.getUpdateEvent() instanceof TaskStatusUpdateEvent status) {
1294-
taskIdRef.set(status.taskId());
1295-
firstTaskLatch.countDown();
1296-
}
1297-
};
1298-
1299-
// Non-blocking message creates task in WORKING state and returns immediately
1300-
// Queue stays open because task is not in final state
1301-
getPollingClient().sendMessage(message1, List.of(firstMessageConsumer), null);
1302-
1303-
assertTrue(firstTaskLatch.await(10, TimeUnit.SECONDS));
1304-
String taskId = taskIdRef.get();
1305-
assertNotNull(taskId);
1306-
assertEquals(multiEventTaskId, taskId);
1307-
1308-
// 2. Resubscribe to task (queue should still be open)
1309-
CountDownLatch resubEventLatch = new CountDownLatch(2); // artifact-2 + completion
1310-
List<io.a2a.spec.UpdateEvent> resubReceivedEvents = new CopyOnWriteArrayList<>();
1311-
AtomicBoolean resubUnexpectedEvent = new AtomicBoolean(false);
1312-
AtomicReference<Throwable> resubErrorRef = new AtomicReference<>();
1289+
BiConsumer<ClientEvent, AgentCard> firstMessageConsumer = (event, agentCard) -> {
1290+
if (event instanceof TaskEvent te) {
1291+
taskIdRef.set(te.getTask().id());
1292+
firstTaskLatch.countDown();
1293+
} else if (event instanceof TaskUpdateEvent tue && tue.getUpdateEvent() instanceof TaskStatusUpdateEvent status) {
1294+
taskIdRef.set(status.taskId());
1295+
firstTaskLatch.countDown();
1296+
}
1297+
};
13131298

1314-
BiConsumer<ClientEvent, AgentCard> resubConsumer = (event, agentCard) -> {
1315-
if (event instanceof TaskUpdateEvent tue) {
1316-
resubReceivedEvents.add(tue.getUpdateEvent());
1317-
resubEventLatch.countDown();
1318-
} else {
1319-
resubUnexpectedEvent.set(true);
1320-
}
1321-
};
1299+
// Non-blocking message creates task in WORKING state and returns immediately
1300+
// Queue stays open because task is not in final state
1301+
getPollingClient().sendMessage(message1, List.of(firstMessageConsumer), null);
1302+
1303+
assertTrue(firstTaskLatch.await(10, TimeUnit.SECONDS));
1304+
String taskId = taskIdRef.get();
1305+
assertNotNull(taskId);
1306+
assertEquals(multiEventTaskId, taskId);
1307+
1308+
// 2. Resubscribe to task (queue should still be open)
1309+
CountDownLatch resubEventLatch = new CountDownLatch(2); // artifact-2 + completion
1310+
List<io.a2a.spec.UpdateEvent> resubReceivedEvents = new CopyOnWriteArrayList<>();
1311+
AtomicBoolean resubUnexpectedEvent = new AtomicBoolean(false);
1312+
AtomicReference<Throwable> resubErrorRef = new AtomicReference<>();
1313+
1314+
BiConsumer<ClientEvent, AgentCard> resubConsumer = (event, agentCard) -> {
1315+
if (event instanceof TaskUpdateEvent tue) {
1316+
resubReceivedEvents.add(tue.getUpdateEvent());
1317+
resubEventLatch.countDown();
1318+
} else {
1319+
resubUnexpectedEvent.set(true);
1320+
}
1321+
};
13221322

1323-
Consumer<Throwable> resubErrorHandler = error -> {
1324-
if (!isStreamClosedError(error)) {
1325-
resubErrorRef.set(error);
1326-
}
1327-
};
1323+
Consumer<Throwable> resubErrorHandler = error -> {
1324+
if (!isStreamClosedError(error)) {
1325+
resubErrorRef.set(error);
1326+
}
1327+
};
13281328

1329-
// Wait for subscription to be active
1330-
CountDownLatch subscriptionLatch = new CountDownLatch(1);
1331-
awaitStreamingSubscription()
1329+
// Wait for subscription to be active
1330+
CountDownLatch subscriptionLatch = new CountDownLatch(1);
1331+
awaitStreamingSubscription()
13321332
.whenComplete((unused, throwable) -> subscriptionLatch.countDown());
13331333

1334-
getClient().resubscribe(new TaskIdParams(taskId),
1334+
getClient().resubscribe(new TaskIdParams(taskId),
13351335
List.of(resubConsumer),
13361336
resubErrorHandler);
13371337

1338-
assertTrue(subscriptionLatch.await(15, TimeUnit.SECONDS));
1338+
assertTrue(subscriptionLatch.await(15, TimeUnit.SECONDS));
13391339

1340-
// 3. Send second streaming message to same taskId
1341-
Message message2 = Message.builder(MESSAGE)
1340+
// 3. Send second streaming message to same taskId
1341+
Message message2 = Message.builder(MESSAGE)
13421342
.taskId(multiEventTaskId) // Same taskId
13431343
.contextId("test-context")
13441344
.parts(new TextPart("Second request"))
13451345
.build();
13461346

1347-
CountDownLatch streamEventLatch = new CountDownLatch(2); // artifact-2 + completion
1348-
List<io.a2a.spec.UpdateEvent> streamReceivedEvents = new CopyOnWriteArrayList<>();
1349-
AtomicBoolean streamUnexpectedEvent = new AtomicBoolean(false);
1347+
CountDownLatch streamEventLatch = new CountDownLatch(2); // artifact-2 + completion
1348+
List<io.a2a.spec.UpdateEvent> streamReceivedEvents = new CopyOnWriteArrayList<>();
1349+
AtomicBoolean streamUnexpectedEvent = new AtomicBoolean(false);
13501350

1351-
BiConsumer<ClientEvent, AgentCard> streamConsumer = (event, agentCard) -> {
1352-
if (event instanceof TaskUpdateEvent tue) {
1353-
streamReceivedEvents.add(tue.getUpdateEvent());
1354-
streamEventLatch.countDown();
1355-
} else {
1356-
streamUnexpectedEvent.set(true);
1357-
}
1358-
};
1351+
BiConsumer<ClientEvent, AgentCard> streamConsumer = (event, agentCard) -> {
1352+
if (event instanceof TaskUpdateEvent tue) {
1353+
streamReceivedEvents.add(tue.getUpdateEvent());
1354+
streamEventLatch.countDown();
1355+
} else {
1356+
streamUnexpectedEvent.set(true);
1357+
}
1358+
};
13591359

1360-
// Streaming message adds artifact-2 and completes task
1361-
getClient().sendMessage(message2, List.of(streamConsumer), null);
1360+
// Streaming message adds artifact-2 and completes task
1361+
getClient().sendMessage(message2, List.of(streamConsumer), null);
13621362

1363-
// 4. Verify both consumers received artifact-2 and completion
1364-
assertTrue(resubEventLatch.await(10, TimeUnit.SECONDS));
1365-
assertTrue(streamEventLatch.await(10, TimeUnit.SECONDS));
1363+
// 4. Verify both consumers received artifact-2 and completion
1364+
assertTrue(resubEventLatch.await(10, TimeUnit.SECONDS));
1365+
assertTrue(streamEventLatch.await(10, TimeUnit.SECONDS));
13661366

1367-
assertFalse(resubUnexpectedEvent.get());
1368-
assertFalse(streamUnexpectedEvent.get());
1369-
assertNull(resubErrorRef.get());
1367+
assertFalse(resubUnexpectedEvent.get());
1368+
assertFalse(streamUnexpectedEvent.get());
1369+
assertNull(resubErrorRef.get());
13701370

1371-
// Both should have received 2 events: artifact-2 and completion
1372-
assertEquals(2, resubReceivedEvents.size());
1373-
assertEquals(2, streamReceivedEvents.size());
1371+
// Both should have received 2 events: artifact-2 and completion
1372+
assertEquals(2, resubReceivedEvents.size());
1373+
assertEquals(2, streamReceivedEvents.size());
13741374

1375-
// Verify resubscription events
1376-
long resubArtifactCount = resubReceivedEvents.stream()
1375+
// Verify resubscription events
1376+
long resubArtifactCount = resubReceivedEvents.stream()
13771377
.filter(e -> e instanceof TaskArtifactUpdateEvent)
13781378
.count();
1379-
assertEquals(1, resubArtifactCount);
1379+
assertEquals(1, resubArtifactCount);
13801380

1381-
long resubCompletionCount = resubReceivedEvents.stream()
1381+
long resubCompletionCount = resubReceivedEvents.stream()
13821382
.filter(e -> e instanceof TaskStatusUpdateEvent)
13831383
.filter(e -> ((TaskStatusUpdateEvent) e).isFinal())
13841384
.count();
1385-
assertEquals(1, resubCompletionCount);
1385+
assertEquals(1, resubCompletionCount);
13861386

1387-
// Verify streaming events
1388-
long streamArtifactCount = streamReceivedEvents.stream()
1387+
// Verify streaming events
1388+
long streamArtifactCount = streamReceivedEvents.stream()
13891389
.filter(e -> e instanceof TaskArtifactUpdateEvent)
13901390
.count();
1391-
assertEquals(1, streamArtifactCount);
1391+
assertEquals(1, streamArtifactCount);
13921392

1393-
long streamCompletionCount = streamReceivedEvents.stream()
1393+
long streamCompletionCount = streamReceivedEvents.stream()
13941394
.filter(e -> e instanceof TaskStatusUpdateEvent)
13951395
.filter(e -> ((TaskStatusUpdateEvent) e).isFinal())
13961396
.count();
1397-
assertEquals(1, streamCompletionCount);
1397+
assertEquals(1, streamCompletionCount);
13981398

1399-
// Verify artifact-2 details from resubscription
1400-
TaskArtifactUpdateEvent resubArtifact = (TaskArtifactUpdateEvent) resubReceivedEvents.stream()
1399+
// Verify artifact-2 details from resubscription
1400+
TaskArtifactUpdateEvent resubArtifact = (TaskArtifactUpdateEvent) resubReceivedEvents.stream()
14011401
.filter(e -> e instanceof TaskArtifactUpdateEvent)
14021402
.findFirst()
14031403
.orElseThrow();
1404-
assertEquals("artifact-2", resubArtifact.artifact().artifactId());
1405-
assertEquals("Second message artifact",
1404+
assertEquals("artifact-2", resubArtifact.artifact().artifactId());
1405+
assertEquals("Second message artifact",
14061406
((TextPart) resubArtifact.artifact().parts().get(0)).text());
14071407

1408-
// Verify artifact-2 details from streaming
1409-
TaskArtifactUpdateEvent streamArtifact = (TaskArtifactUpdateEvent) streamReceivedEvents.stream()
1408+
// Verify artifact-2 details from streaming
1409+
TaskArtifactUpdateEvent streamArtifact = (TaskArtifactUpdateEvent) streamReceivedEvents.stream()
14101410
.filter(e -> e instanceof TaskArtifactUpdateEvent)
14111411
.findFirst()
14121412
.orElseThrow();
1413-
assertEquals("artifact-2", streamArtifact.artifact().artifactId());
1414-
assertEquals("Second message artifact",
1413+
assertEquals("artifact-2", streamArtifact.artifact().artifactId());
1414+
assertEquals("Second message artifact",
14151415
((TextPart) streamArtifact.artifact().parts().get(0)).text());
14161416
} finally {
14171417
deleteTaskInTaskStore(multiEventTaskId);

tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public void execute(RequestContext context, EventQueue eventQueue) throws A2AErr
2727
String taskId = context.getTaskId();
2828

2929
// Special handling for multi-event test
30-
if ("multi-event-test".equals(taskId)) {
30+
if (taskId != null && taskId.startsWith("multi-event-test")) {
3131
// First call: context.getTask() == null (new task)
3232
if (context.getTask() == null) {
3333
updater.startWork();

0 commit comments

Comments
 (0)