diff --git a/.ci/doc/templates/print-result-array.tpl.java b/.ci/doc/templates/print-result-array.tpl.java index 9c751d0b..c4fd68cd 100644 --- a/.ci/doc/templates/print-result-array.tpl.java +++ b/.ci/doc/templates/print-result-array.tpl.java @@ -2,6 +2,7 @@ import io.kuzzle.sdk.Protocol.WebSocket; import io.kuzzle.sdk.Options.Protocol.WebSocketOptions; import io.kuzzle.sdk.Options.KuzzleOptions; +import io.kuzzle.sdk.Options.SubscribeOptions; import io.kuzzle.sdk.CoreClasses.Responses.Response; import java.util.concurrent.ConcurrentHashMap; diff --git a/.ci/doc/templates/print-result.tpl.java b/.ci/doc/templates/print-result.tpl.java index 12f427e3..86ff8f9c 100644 --- a/.ci/doc/templates/print-result.tpl.java +++ b/.ci/doc/templates/print-result.tpl.java @@ -2,6 +2,7 @@ import io.kuzzle.sdk.Protocol.WebSocket; import io.kuzzle.sdk.Options.Protocol.WebSocketOptions; import io.kuzzle.sdk.Options.KuzzleOptions; +import io.kuzzle.sdk.Options.SubscribeOptions; import io.kuzzle.sdk.Options.DocumentOptions; import io.kuzzle.sdk.CoreClasses.Responses.Response; diff --git a/doc/3/controllers/realtime/count/index.md b/doc/3/controllers/realtime/count/index.md new file mode 100644 index 00000000..36066ea6 --- /dev/null +++ b/doc/3/controllers/realtime/count/index.md @@ -0,0 +1,29 @@ +--- +code: true +type: page +title: count +description: Returns the number of other connections sharing the same subscription. +--- + +# count + +Returns the number of other connections sharing the same subscription. + +## Arguments + +```java +public CompletableFuture count(final String roomId) + throws NotConnectedException, InternalException +``` + +| Argument | Type | Description | +|-----------|-------------------|----------------------| +| `room_id` |
String
| Subscription room ID | + +## Return + +Returns the number of active connections using the same provided subscription room. + +## Usage + +<<< ./snippets/count.java diff --git a/doc/3/controllers/realtime/count/snippets/count.java b/doc/3/controllers/realtime/count/snippets/count.java new file mode 100644 index 00000000..f5748e60 --- /dev/null +++ b/doc/3/controllers/realtime/count/snippets/count.java @@ -0,0 +1,22 @@ +SubscribeOptions options = new SubscribeOptions(); +options.setSubscribeToSelf(true); + +ConcurrentHashMap filters = new ConcurrentHashMap<>(); +filters.put("exists", "name"); + +ConcurrentHashMap document = new ConcurrentHashMap<>(); +document.put("name", "nina-vkote"); + +final String roomId = kuzzle.getRealtimeController().subscribe( + "nyc-open-data", + "yellow-taxi", + filters, + notification -> { + if (notification.scope.equals(SubscribeOptions.Scope.IN.toString())) { + System.out.println("Document entered the scope"); + } else { + System.out.println("Document left the scope"); + } +}).get(); + +final Integer result = kuzzle.getRealtimeController().count(roomId).get(); \ No newline at end of file diff --git a/doc/3/controllers/realtime/count/snippets/count.test.yml b/doc/3/controllers/realtime/count/snippets/count.test.yml new file mode 100644 index 00000000..0392ea47 --- /dev/null +++ b/doc/3/controllers/realtime/count/snippets/count.test.yml @@ -0,0 +1,7 @@ +name: Realtime#count +description: Counts subscribers for a subscription room. +hooks: + before: + after: +template: print-result +expected: 1 \ No newline at end of file diff --git a/doc/3/controllers/realtime/publish/index.md b/doc/3/controllers/realtime/publish/index.md new file mode 100644 index 00000000..9df4a740 --- /dev/null +++ b/doc/3/controllers/realtime/publish/index.md @@ -0,0 +1,34 @@ +--- +code: true +type: page +title: publish +description: Publishes a real-time message. +--- + +# publish + +Sends a real-time message to Kuzzle. The message will be broadcasted to all clients with subscriptions matching the index, the collection and the message content. + +The index and collection are indicative and serve only to distinguish the rooms. They are not required to exist in the database. + +**Note:** real-time messages are not persisted in the database. + +## Arguments + +```java +public CompletableFuture publish( + final String index, + final String collection, + final ConcurrentHashMap message) + throws NotConnectedException, InternalException +``` + +| Argument | Type | Description | +|--------------|--------------------|-------------------------------------| +| `index` |
String
| Index name | +| `collection` |
String
| Collection name | +| `message` |
ConcurrentHashMap
| ConcurrentHashMap representing a JSON payload | + +## Usage + +<<< ./snippets/publish.java diff --git a/doc/3/controllers/realtime/publish/snippets/publish.java b/doc/3/controllers/realtime/publish/snippets/publish.java new file mode 100644 index 00000000..db209db1 --- /dev/null +++ b/doc/3/controllers/realtime/publish/snippets/publish.java @@ -0,0 +1,4 @@ +ConcurrentHashMap document = new ConcurrentHashMap<>(); +document.put("name", "nina-vkote"); + +kuzzle.getRealtimeController().publish("my-index", "my-collection", document); \ No newline at end of file diff --git a/doc/3/controllers/realtime/publish/snippets/publish.test.yml b/doc/3/controllers/realtime/publish/snippets/publish.test.yml new file mode 100644 index 00000000..579119b6 --- /dev/null +++ b/doc/3/controllers/realtime/publish/snippets/publish.test.yml @@ -0,0 +1,7 @@ +name: Realtime#publish +description: Publishes a realtime message. +hooks: + before: + after: +template: default +expected: Success \ No newline at end of file diff --git a/doc/3/controllers/realtime/unsubscribe/index.md b/doc/3/controllers/realtime/unsubscribe/index.md new file mode 100644 index 00000000..aeb14d05 --- /dev/null +++ b/doc/3/controllers/realtime/unsubscribe/index.md @@ -0,0 +1,25 @@ +--- +code: true +type: page +title: unsubscribe +description: Removes a subscription. +--- + +# unsubscribe + +Removes a subscription. + +## Arguments + +```java +public CompletableFuture unsubscribe(final String roomId) + throws NotConnectedException, InternalException +``` + +| Argument | Type | Description | +|-----------|--------------------|----------------------| +| `room_id` |
String
| Subscription room ID | + +## Usage + +<<< ./snippets/unsubscribe.java diff --git a/doc/3/controllers/realtime/unsubscribe/snippets/unsubscribe.java b/doc/3/controllers/realtime/unsubscribe/snippets/unsubscribe.java new file mode 100644 index 00000000..8efafdba --- /dev/null +++ b/doc/3/controllers/realtime/unsubscribe/snippets/unsubscribe.java @@ -0,0 +1,22 @@ +SubscribeOptions options = new SubscribeOptions(); +options.setSubscribeToSelf(true); + +ConcurrentHashMap filters = new ConcurrentHashMap<>(); +filters.put("exists", "name"); + +ConcurrentHashMap document = new ConcurrentHashMap<>(); +document.put("name", "nina-vkote"); + +final String roomId = kuzzle.getRealtimeController().subscribe( + "nyc-open-data", + "yellow-taxi", + filters, + notification -> { + if (notification.scope.equals(SubscribeOptions.Scope.IN.toString())) { + System.out.println("Document entered the scope"); + } else { + System.out.println("Document left the scope"); + } +}).get(); + +kuzzle.getRealtimeController().unsubscribe(roomId).get(); \ No newline at end of file diff --git a/doc/3/controllers/realtime/unsubscribe/snippets/unsubscribe.test.yml b/doc/3/controllers/realtime/unsubscribe/snippets/unsubscribe.test.yml new file mode 100644 index 00000000..909e8388 --- /dev/null +++ b/doc/3/controllers/realtime/unsubscribe/snippets/unsubscribe.test.yml @@ -0,0 +1,7 @@ +name: Realtime#unsubscribe +description: Removes a subscription. +hooks: + before: + after: +template: default +expected: Success \ No newline at end of file diff --git a/src/main/java/io/kuzzle/sdk/API/Controllers/RealtimeController.java b/src/main/java/io/kuzzle/sdk/API/Controllers/RealtimeController.java index eae90e18..d1aa0ca2 100644 --- a/src/main/java/io/kuzzle/sdk/API/Controllers/RealtimeController.java +++ b/src/main/java/io/kuzzle/sdk/API/Controllers/RealtimeController.java @@ -54,6 +54,58 @@ public RealtimeController(Kuzzle kuzzle) { }); } + /** + * Returns the number of other connections sharing the same subscription. + * + * @param roomId + * @return + * @throws NotConnectedException + * @throws InternalException + */ + public CompletableFuture count(final String roomId) throws NotConnectedException, InternalException { + return kuzzle + .query(new KuzzleMap() + .put("controller", "realtime") + .put("action", "count") + .put("body", new KuzzleMap().put("room_id", roomId))) + .thenApplyAsync((response) -> ((KuzzleMap) response.result).getNumber("count").intValue()); + } + + /** + * Sends a real-time message to Kuzzle. The message will be dispatched to + * all clients with subscriptions matching the index, the collection and + * the message content. + * + * @param index + * @param collection + * @param message + * @return + * @throws NotConnectedException + * @throws InternalException + */ + public CompletableFuture publish(final String index, final String collection, final ConcurrentHashMap message) throws NotConnectedException, InternalException { + return kuzzle + .query(new KuzzleMap() + .put("controller", "realtime") + .put("action", "publish") + .put("index", index) + .put("collection", collection) + .put("body", new KuzzleMap().put("message", message))) + .thenApplyAsync((response) -> null); + } + + /** + * Subscribe to a collection. + * + * @param index + * @param collection + * @param filters + * @param handler + * @param options + * @return + * @throws NotConnectedException + * @throws InternalException + */ public CompletableFuture subscribe(final String index, final String collection, final ConcurrentHashMap filters, final NotificationHandler handler, final SubscribeOptions options) throws NotConnectedException, InternalException { ConcurrentHashMap queryOptions = new ConcurrentHashMap<>(); boolean subscribeToSelf = true; @@ -98,4 +150,27 @@ public CompletableFuture subscribe(final String index, final String coll public CompletableFuture subscribe(final String index, final String collection, final ConcurrentHashMap filters, final NotificationHandler handler) throws NotConnectedException, InternalException { return this.subscribe(index, collection, filters, handler, null); } + + /** + * Removes a subscription + * + * @param roomId + * @return + * @throws NotConnectedException + * @throws InternalException + */ + public CompletableFuture unsubscribe(final String roomId) throws NotConnectedException, InternalException { + return kuzzle + .query(new KuzzleMap() + .put("controller", "realtime") + .put("action", "unsubscribe") + .put("body", new KuzzleMap().put("roomId", roomId))) + .thenApplyAsync((response) -> { + ArrayList subs = subscriptions.get(roomId); + if (subs != null) { + subscriptions.get(roomId).clear(); + } + return null; + }); + } } diff --git a/src/test/java/io/kuzzle/test/API/Controllers/RealtimeControllerTest.java b/src/test/java/io/kuzzle/test/API/Controllers/RealtimeControllerTest.java index d8859ef1..d04c03e0 100644 --- a/src/test/java/io/kuzzle/test/API/Controllers/RealtimeControllerTest.java +++ b/src/test/java/io/kuzzle/test/API/Controllers/RealtimeControllerTest.java @@ -23,6 +23,41 @@ public class RealtimeControllerTest { private AbstractProtocol networkProtocol = Mockito.mock(WebSocket.class); + @Test + public void countTest() throws NotConnectedException, InternalException { + Kuzzle kuzzleSpy = spy(new Kuzzle(networkProtocol)); + NotificationHandler notificationHandler = mock(NotificationHandler.class); + + ArgumentCaptor arg = ArgumentCaptor.forClass(KuzzleMap.class); + + kuzzleSpy.getRealtimeController().count("roomId"); + verify(kuzzleSpy).query((KuzzleMap) arg.capture()); + + assertEquals("realtime", ((KuzzleMap) arg.getValue()).getString("controller")); + assertEquals("count", ((KuzzleMap) arg.getValue()).getString("action")); + assertEquals("roomId", ((KuzzleMap)((KuzzleMap) arg.getValue()).get("body")).getString("room_id")); + } + + @Test + public void publishTest() throws NotConnectedException, InternalException { + Kuzzle kuzzleSpy = spy(new Kuzzle(networkProtocol)); + NotificationHandler notificationHandler = mock(NotificationHandler.class); + + ArgumentCaptor arg = ArgumentCaptor.forClass(KuzzleMap.class); + + ConcurrentHashMap message = new ConcurrentHashMap<>(); + message.put("foo", "bar"); + + kuzzleSpy.getRealtimeController().publish("index", "collection", message); + verify(kuzzleSpy).query((KuzzleMap) arg.capture()); + + assertEquals("realtime", ((KuzzleMap) arg.getValue()).getString("controller")); + assertEquals("publish", ((KuzzleMap) arg.getValue()).getString("action")); + assertEquals("index", ((KuzzleMap) arg.getValue()).getString("index")); + assertEquals("collection", ((KuzzleMap) arg.getValue()).getString("collection")); + assertEquals("bar", ((ConcurrentHashMap)((KuzzleMap)((KuzzleMap) arg.getValue()).get("body")).get("message")).get("foo").toString()); + } + @Test public void subscribeTest() throws NotConnectedException, InternalException { Kuzzle kuzzleSpy = spy(new Kuzzle(networkProtocol)); @@ -83,4 +118,19 @@ public void notificationHandlerWithoutSubscribeToSelfTest() throws NotConnectedE kuzzleSpy.trigger(Event.unhandledResponse, response); verify(notificationHandler, never()).run(any(Response.class)); } + + @Test + public void unsubscribeTest() throws NotConnectedException, InternalException { + Kuzzle kuzzleSpy = spy(new Kuzzle(networkProtocol)); + NotificationHandler notificationHandler = mock(NotificationHandler.class); + + ArgumentCaptor arg = ArgumentCaptor.forClass(KuzzleMap.class); + + kuzzleSpy.getRealtimeController().unsubscribe("roomId"); + verify(kuzzleSpy).query((KuzzleMap) arg.capture()); + + assertEquals("realtime", ((KuzzleMap) arg.getValue()).getString("controller")); + assertEquals("unsubscribe", ((KuzzleMap) arg.getValue()).getString("action")); + assertEquals("roomId", ((KuzzleMap)((KuzzleMap) arg.getValue()).get("body")).getString("roomId")); + } }