Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .ci/doc/templates/print-result-array.tpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import io.kuzzle.sdk.Protocol.WebSocket;
import io.kuzzle.sdk.Options.Protocol.WebSocketOptions;
import io.kuzzle.sdk.Options.KuzzleOptions;
import io.kuzzle.sdk.Options.SubscribeOptions;
import io.kuzzle.sdk.CoreClasses.Responses.Response;

import java.util.concurrent.ConcurrentHashMap;
Expand Down
1 change: 1 addition & 0 deletions .ci/doc/templates/print-result.tpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import io.kuzzle.sdk.Protocol.WebSocket;
import io.kuzzle.sdk.Options.Protocol.WebSocketOptions;
import io.kuzzle.sdk.Options.KuzzleOptions;
import io.kuzzle.sdk.Options.SubscribeOptions;
import io.kuzzle.sdk.Options.DocumentOptions;
import io.kuzzle.sdk.CoreClasses.Responses.Response;

Expand Down
29 changes: 29 additions & 0 deletions doc/3/controllers/realtime/count/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
---
code: true
type: page
title: count
description: Returns the number of other connections sharing the same subscription.
---

# count

Returns the number of other connections sharing the same subscription.

## Arguments

```java
public CompletableFuture<Integer> count(final String roomId)
throws NotConnectedException, InternalException
```

| Argument | Type | Description |
|-----------|-------------------|----------------------|
| `room_id` | <pre>String</pre> | Subscription room ID |

## Return

Returns the number of active connections using the same provided subscription room.

## Usage

<<< ./snippets/count.java
22 changes: 22 additions & 0 deletions doc/3/controllers/realtime/count/snippets/count.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
SubscribeOptions options = new SubscribeOptions();
options.setSubscribeToSelf(true);

ConcurrentHashMap<String, Object> filters = new ConcurrentHashMap<>();
filters.put("exists", "name");

ConcurrentHashMap<String, Object> document = new ConcurrentHashMap<>();
document.put("name", "nina-vkote");

final String roomId = kuzzle.getRealtimeController().subscribe(
"nyc-open-data",
"yellow-taxi",
filters,
notification -> {
if (notification.scope.equals(SubscribeOptions.Scope.IN.toString())) {
System.out.println("Document entered the scope");
} else {
System.out.println("Document left the scope");
}
}).get();

final Integer result = kuzzle.getRealtimeController().count(roomId).get();
7 changes: 7 additions & 0 deletions doc/3/controllers/realtime/count/snippets/count.test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
name: Realtime#count
description: Counts subscribers for a subscription room.
hooks:
before:
after:
template: print-result
expected: 1
34 changes: 34 additions & 0 deletions doc/3/controllers/realtime/publish/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
---
code: true
type: page
title: publish
description: Publishes a real-time message.
---

# publish

Sends a real-time message to Kuzzle. The message will be broadcasted to all clients with subscriptions matching the index, the collection and the message content.

The index and collection are indicative and serve only to distinguish the rooms. They are not required to exist in the database.

**Note:** real-time messages are not persisted in the database.

## Arguments

```java
public CompletableFuture<Integer> publish(
final String index,
final String collection,
final ConcurrentHashMap<String, Object> message)
throws NotConnectedException, InternalException
```

| Argument | Type | Description |
|--------------|--------------------|-------------------------------------|
| `index` | <pre>String</pre> | Index name |
| `collection` | <pre>String</pre> | Collection name |
| `message` | <pre>ConcurrentHashMap<String, Object></pre> | ConcurrentHashMap representing a JSON payload |

## Usage

<<< ./snippets/publish.java
4 changes: 4 additions & 0 deletions doc/3/controllers/realtime/publish/snippets/publish.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ConcurrentHashMap<String, Object> document = new ConcurrentHashMap<>();
document.put("name", "nina-vkote");

kuzzle.getRealtimeController().publish("my-index", "my-collection", document);
7 changes: 7 additions & 0 deletions doc/3/controllers/realtime/publish/snippets/publish.test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
name: Realtime#publish
description: Publishes a realtime message.
hooks:
before:
after:
template: default
expected: Success
25 changes: 25 additions & 0 deletions doc/3/controllers/realtime/unsubscribe/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
code: true
type: page
title: unsubscribe
description: Removes a subscription.
---

# unsubscribe

Removes a subscription.

## Arguments

```java
public CompletableFuture<Void> unsubscribe(final String roomId)
throws NotConnectedException, InternalException
```

| Argument | Type | Description |
|-----------|--------------------|----------------------|
| `room_id` | <pre>String</pre> | Subscription room ID |

## Usage

<<< ./snippets/unsubscribe.java
22 changes: 22 additions & 0 deletions doc/3/controllers/realtime/unsubscribe/snippets/unsubscribe.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
SubscribeOptions options = new SubscribeOptions();
options.setSubscribeToSelf(true);

ConcurrentHashMap<String, Object> filters = new ConcurrentHashMap<>();
filters.put("exists", "name");

ConcurrentHashMap<String, Object> document = new ConcurrentHashMap<>();
document.put("name", "nina-vkote");

final String roomId = kuzzle.getRealtimeController().subscribe(
"nyc-open-data",
"yellow-taxi",
filters,
notification -> {
if (notification.scope.equals(SubscribeOptions.Scope.IN.toString())) {
System.out.println("Document entered the scope");
} else {
System.out.println("Document left the scope");
}
}).get();

kuzzle.getRealtimeController().unsubscribe(roomId).get();
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
name: Realtime#unsubscribe
description: Removes a subscription.
hooks:
before:
after:
template: default
expected: Success
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,58 @@ public RealtimeController(Kuzzle kuzzle) {
});
}

/**
* Returns the number of other connections sharing the same subscription.
*
* @param roomId
* @return
* @throws NotConnectedException
* @throws InternalException
*/
public CompletableFuture<Integer> count(final String roomId) throws NotConnectedException, InternalException {
return kuzzle
.query(new KuzzleMap()
.put("controller", "realtime")
.put("action", "count")
.put("body", new KuzzleMap().put("room_id", roomId)))
.thenApplyAsync((response) -> ((KuzzleMap) response.result).getNumber("count").intValue());
}

/**
* Sends a real-time message to Kuzzle. The message will be dispatched to
* all clients with subscriptions matching the index, the collection and
* the message content.
*
* @param index
* @param collection
* @param message
* @return
* @throws NotConnectedException
* @throws InternalException
*/
public CompletableFuture<Void> publish(final String index, final String collection, final ConcurrentHashMap<String, Object> message) throws NotConnectedException, InternalException {
return kuzzle
.query(new KuzzleMap()
.put("controller", "realtime")
.put("action", "publish")
.put("index", index)
.put("collection", collection)
.put("body", new KuzzleMap().put("message", message)))
.thenApplyAsync((response) -> null);
}

/**
* Subscribe to a collection.
*
* @param index
* @param collection
* @param filters
* @param handler
* @param options
* @return
* @throws NotConnectedException
* @throws InternalException
*/
public CompletableFuture<String> subscribe(final String index, final String collection, final ConcurrentHashMap<String, Object> filters, final NotificationHandler handler, final SubscribeOptions options) throws NotConnectedException, InternalException {
ConcurrentHashMap<String, Object> queryOptions = new ConcurrentHashMap<>();
boolean subscribeToSelf = true;
Expand Down Expand Up @@ -98,4 +150,27 @@ public CompletableFuture<String> subscribe(final String index, final String coll
public CompletableFuture<String> subscribe(final String index, final String collection, final ConcurrentHashMap<String, Object> filters, final NotificationHandler handler) throws NotConnectedException, InternalException {
return this.subscribe(index, collection, filters, handler, null);
}

/**
* Removes a subscription
*
* @param roomId
* @return
* @throws NotConnectedException
* @throws InternalException
*/
public CompletableFuture<Void> unsubscribe(final String roomId) throws NotConnectedException, InternalException {
return kuzzle
.query(new KuzzleMap()
.put("controller", "realtime")
.put("action", "unsubscribe")
.put("body", new KuzzleMap().put("roomId", roomId)))
.thenApplyAsync((response) -> {
ArrayList<Subscription> subs = subscriptions.get(roomId);
if (subs != null) {
subscriptions.get(roomId).clear();
}
return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,41 @@
public class RealtimeControllerTest {
private AbstractProtocol networkProtocol = Mockito.mock(WebSocket.class);

@Test
public void countTest() throws NotConnectedException, InternalException {
Kuzzle kuzzleSpy = spy(new Kuzzle(networkProtocol));
NotificationHandler notificationHandler = mock(NotificationHandler.class);

ArgumentCaptor arg = ArgumentCaptor.forClass(KuzzleMap.class);

kuzzleSpy.getRealtimeController().count("roomId");
verify(kuzzleSpy).query((KuzzleMap) arg.capture());

assertEquals("realtime", ((KuzzleMap) arg.getValue()).getString("controller"));
assertEquals("count", ((KuzzleMap) arg.getValue()).getString("action"));
assertEquals("roomId", ((KuzzleMap)((KuzzleMap) arg.getValue()).get("body")).getString("room_id"));
}

@Test
public void publishTest() throws NotConnectedException, InternalException {
Kuzzle kuzzleSpy = spy(new Kuzzle(networkProtocol));
NotificationHandler notificationHandler = mock(NotificationHandler.class);

ArgumentCaptor arg = ArgumentCaptor.forClass(KuzzleMap.class);

ConcurrentHashMap<String, Object> message = new ConcurrentHashMap<>();
message.put("foo", "bar");

kuzzleSpy.getRealtimeController().publish("index", "collection", message);
verify(kuzzleSpy).query((KuzzleMap) arg.capture());

assertEquals("realtime", ((KuzzleMap) arg.getValue()).getString("controller"));
assertEquals("publish", ((KuzzleMap) arg.getValue()).getString("action"));
assertEquals("index", ((KuzzleMap) arg.getValue()).getString("index"));
assertEquals("collection", ((KuzzleMap) arg.getValue()).getString("collection"));
assertEquals("bar", ((ConcurrentHashMap<String, Object>)((KuzzleMap)((KuzzleMap) arg.getValue()).get("body")).get("message")).get("foo").toString());
}

@Test
public void subscribeTest() throws NotConnectedException, InternalException {
Kuzzle kuzzleSpy = spy(new Kuzzle(networkProtocol));
Expand Down Expand Up @@ -83,4 +118,19 @@ public void notificationHandlerWithoutSubscribeToSelfTest() throws NotConnectedE
kuzzleSpy.trigger(Event.unhandledResponse, response);
verify(notificationHandler, never()).run(any(Response.class));
}

@Test
public void unsubscribeTest() throws NotConnectedException, InternalException {
Kuzzle kuzzleSpy = spy(new Kuzzle(networkProtocol));
NotificationHandler notificationHandler = mock(NotificationHandler.class);

ArgumentCaptor arg = ArgumentCaptor.forClass(KuzzleMap.class);

kuzzleSpy.getRealtimeController().unsubscribe("roomId");
verify(kuzzleSpy).query((KuzzleMap) arg.capture());

assertEquals("realtime", ((KuzzleMap) arg.getValue()).getString("controller"));
assertEquals("unsubscribe", ((KuzzleMap) arg.getValue()).getString("action"));
assertEquals("roomId", ((KuzzleMap)((KuzzleMap) arg.getValue()).get("body")).getString("roomId"));
}
}