Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions doc/3/core-classes/kuzzle-options/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,11 @@ public int getMaxRequestDelay()
public KuzzleOptions setMaxRequestDelay(int maxRequestDelay)
```

### queueFilter
### autoResubscribe

Function to filter the request queue before replaying requests.
Automatically renew all subscriptions on a `reconnected` event.

```java
public Predicate<ConcurrentHashMap<String, Object>> getQueueFilter()
public KuzzleOptions setQueueFilter(
Predicate<ConcurrentHashMap<String, Object>> filter
)
public boolean isAutoResubscribe();
public KuzzleOptions setAutoResubscribe(boolean autoResubscribe);
```
19 changes: 19 additions & 0 deletions doc/3/essentials/network-loss-resilience/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
code: false
type: page
title: Network Loss Resilience
description: Learn how to use the Kuzzle JAVA SDK with an instable network
order: 400
---

# Network Loss Resilience

The Kuzzle JAVA SDK provides tools that allow it to be used with an unstable network connection.

## Automatic reconnection & resubscription

The Kuzzle JAVA SDK can automatically reconnect in case of a network disconnection and it can renew realtime subscriptions if there are any.

To control the auto reconnection feature with WebSocket (enabled by default), refer to the `autoReconnect` option of the [WebSocketOptions](/sdk/java/3/protocols/websocket-options) object.

To control the auto resubscription (enabled by default), refer to the `autoResubscribe` option of the [KuzzleOptions](/sdk/java/3/core-classes/kuzzle-options) object.
20 changes: 19 additions & 1 deletion doc/3/protocols/websocket-options/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,23 @@ If the websocket auto reconnects.

```java
public boolean getAutoReconnect()
public KuzzleOptions setAutoReconnect(boolean autoReconnect)
public WebSocketOptions setAutoReconnect(boolean autoReconnect)
```

### reconnectionDelay

Time between each reconnection attempt.

```java
public long getReconnectionDelay();
public WebSocketOptions setReconnectionDelay(long reconnectionDelay);
```

### reconnectionRetries

Number of attempts to try and reconnect. -1 for infinite attempts until the connection is established again.

```java
public long getReconnectionRetries();
public WebSocketOptions setReconnectionRetries(long reconnectionRetries);
```
91 changes: 71 additions & 20 deletions src/main/java/io/kuzzle/sdk/API/Controllers/RealtimeController.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,36 @@
import io.kuzzle.sdk.Handlers.NotificationHandler;
import io.kuzzle.sdk.Kuzzle;
import io.kuzzle.sdk.Options.SubscribeOptions;
import io.kuzzle.sdk.Protocol.ProtocolState;

import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class RealtimeController extends BaseController {
private class Subscription {
public boolean subscribeToSelf;
public String index;
public String collection;
public ConcurrentHashMap<String, Object> filter;
public NotificationHandler handler;

public Subscription(final boolean subscribeToSelf, final NotificationHandler handler) {
this.subscribeToSelf = subscribeToSelf;
public SubscribeOptions options;

public Subscription(final String index,
final String collection,
final ConcurrentHashMap<String, Object> filter,
final NotificationHandler handler,
final SubscribeOptions options) {
this.index = index;
this.collection = collection;
this.filter = filter;
this.handler = handler;
this.options = (options != null ? options : new SubscribeOptions());
}
}

private ConcurrentHashMap<String, ArrayList<Subscription>> subscriptions = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, ArrayList<Subscription>> currentSubscriptions = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, ArrayList<Subscription>> subscriptionsCache = new ConcurrentHashMap<>();

public RealtimeController(Kuzzle kuzzle) {
super(kuzzle);
Expand All @@ -41,17 +54,22 @@ public RealtimeController(Kuzzle kuzzle) {
sdkInstanceId = response.Volatile.get("sdkInstanceId").toString();
}

ArrayList<Subscription> subs = RealtimeController.this.subscriptions.get(((Response) args[0]).room);
ArrayList<Subscription> subs = RealtimeController.this.currentSubscriptions.get(((Response) args[0]).room);

if (subs != null) {
final String instanceId = sdkInstanceId;
subs.forEach(sub -> {
if (sub != null && (instanceId.equals(kuzzle.instanceId) && sub.subscribeToSelf || !instanceId.equals(kuzzle.instanceId))) {
if (sub != null && (instanceId.equals(kuzzle.instanceId) && sub.options.isSubscribeToSelf() || !instanceId.equals(kuzzle.instanceId))) {
sub.handler.run(response);
}
});
}
});
kuzzle.register(Event.networkStateChange, state -> {
if (state[0] == ProtocolState.CLOSE) {
this.currentSubscriptions.clear();
}
});
}

/**
Expand All @@ -67,7 +85,7 @@ public CompletableFuture<Integer> count(final String roomId) throws NotConnected
.query(new KuzzleMap()
.put("controller", "realtime")
.put("action", "count")
.put("body", new KuzzleMap().put("room_id", roomId)))
.put("body", new KuzzleMap().put("roomId", roomId)))
.thenApplyAsync((response) -> ((KuzzleMap) response.result).getNumber("count").intValue());
}

Expand All @@ -94,6 +112,22 @@ public CompletableFuture<Void> publish(final String index, final String collecti
.thenApplyAsync((response) -> null);
}

public void renewSubscriptions() {
for (Map.Entry sub : subscriptionsCache.entrySet()) {
subscriptionsCache.get(sub.getKey()).clear();
((ArrayList<Subscription>) sub.getValue()).forEach(subscription ->
{
try {
subscribe(subscription);
} catch (NotConnectedException e) {
e.printStackTrace();
} catch (InternalException e) {
e.printStackTrace();
}
});
}
}

/**
* Subscribe to a collection.
*
Expand All @@ -108,12 +142,11 @@ public CompletableFuture<Void> publish(final String index, final String collecti
*/
public CompletableFuture<String> subscribe(final String index, final String collection, final ConcurrentHashMap<String, Object> filters, final NotificationHandler handler, final SubscribeOptions options) throws NotConnectedException, InternalException {
ConcurrentHashMap<String, Object> queryOptions = new ConcurrentHashMap<>();
boolean subscribeToSelf = true;
final SubscribeOptions opts = (options == null ? new SubscribeOptions() : new SubscribeOptions(options));

synchronized (RealtimeController.class) {
if (options != null) {
subscribeToSelf = options.isSubscribeToSelf();
queryOptions = options.toHashMap();
if (opts != null) {
queryOptions = opts.toHashMap();
}
}

Expand All @@ -124,29 +157,43 @@ public CompletableFuture<String> subscribe(final String index, final String coll
.put("collection", collection)
.put("body", filters);

boolean finalSubscribeToSelf = subscribeToSelf;
return kuzzle
.query(query)
.thenApplyAsync(
(response) -> {
String channel = ((ConcurrentHashMap<String, Object>) response.result).get("channel").toString();
Subscription subscription = new Subscription(
options == null ? new SubscribeOptions().isSubscribeToSelf() : finalSubscribeToSelf,
handler
index,
collection,
filters,
handler,
opts
);

if (subscriptions.get(channel) == null) {
if (currentSubscriptions.get(channel) == null) {
ArrayList<Subscription> item = new ArrayList<>();
item.add(subscription);
subscriptions.put(channel, item);
currentSubscriptions.put(channel, item);
subscriptionsCache.put(channel, item);
} else {
subscriptions.get(channel).add(subscription);
currentSubscriptions.get(channel).add(subscription);
subscriptionsCache.get(channel).add(subscription);
}

return ((ConcurrentHashMap<String, Object>) response.result).get("roomId").toString();
});
}

private CompletableFuture<String> subscribe(final Subscription subscribe) throws NotConnectedException, InternalException {
return subscribe(
subscribe.index,
subscribe.collection,
subscribe.filter,
subscribe.handler,
subscribe.options
);
}

public CompletableFuture<String> subscribe(final String index, final String collection, final ConcurrentHashMap<String, Object> filters, final NotificationHandler handler) throws NotConnectedException, InternalException {
return this.subscribe(index, collection, filters, handler, null);
}
Expand All @@ -166,9 +213,13 @@ public CompletableFuture<Void> unsubscribe(final String roomId) throws NotConnec
.put("action", "unsubscribe")
.put("body", new KuzzleMap().put("roomId", roomId)))
.thenApplyAsync((response) -> {
ArrayList<Subscription> subs = subscriptions.get(roomId);
ArrayList<Subscription> subs = currentSubscriptions.get(roomId);
if (subs != null) {
currentSubscriptions.get(roomId).clear();
}
subs = subscriptionsCache.get(roomId);
if (subs != null) {
subscriptions.get(roomId).clear();
subscriptionsCache.get(roomId).clear();
}
return null;
});
Expand Down
16 changes: 15 additions & 1 deletion src/main/java/io/kuzzle/sdk/Kuzzle.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public class Kuzzle extends EventManager {

private RealtimeController realtimeController;

private boolean autoResubscribe;

/**
* Initialize a new instance of Kuzzle
*
Expand Down Expand Up @@ -134,6 +136,8 @@ public Kuzzle(final AbstractProtocol networkProtocol,
kOptions.getRefreshedTokenDuration());
this.maxRequestDelay = new AtomicInteger(kOptions.getMaxRequestDelay());

this.autoResubscribe = options.isAutoResubscribe();

this.version = "3";
this.instanceId = UUID.randomUUID().toString();
this.sdkName = "java@" + version;
Expand Down Expand Up @@ -199,11 +203,13 @@ protected void onResponseReceived(final Object... payload) {

protected void onStateChanged(final Object... args) {
// If not connected anymore: close tasks and clean up the requests buffer
if ((ProtocolState) args[0] == ProtocolState.CLOSE) {
if (args[0] == ProtocolState.CLOSE) {
for (final Task<Response> task : requests.values()) {
task.setException(new ConnectionLostException());
}
requests.clear();
} else if (args[0] == ProtocolState.OPEN && realtimeController != null && autoResubscribe) {
realtimeController.renewSubscriptions();
}
}

Expand Down Expand Up @@ -281,4 +287,12 @@ public void setAuthenticationToken(final String token) {
}
authenticationToken.set(token);
}

public boolean isAutoResubscribe() {
return autoResubscribe;
}

public void setAutoResubscribe(boolean autoResubscribe) {
this.autoResubscribe = autoResubscribe;
}
}
12 changes: 12 additions & 0 deletions src/main/java/io/kuzzle/sdk/Options/KuzzleOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class KuzzleOptions {
private Predicate<ConcurrentHashMap<String, Object>> queueFilter = (
ConcurrentHashMap<String, Object> obj) -> true;

private boolean autoResubscribe = true;

/**
* Initialize a new KuzzleOptions instance.
*/
Expand All @@ -48,6 +50,7 @@ public KuzzleOptions(KuzzleOptions options) {
this.maxQueueSize = options.maxQueueSize;
this.minTokenDuration = options.minTokenDuration;
this.refreshedTokenDuration = options.refreshedTokenDuration;
this.autoResubscribe = options.autoResubscribe;

this.maxRequestDelay = options.maxRequestDelay;

Expand Down Expand Up @@ -147,4 +150,13 @@ public KuzzleOptions setQueueFilter(
(ConcurrentHashMap<String, Object> obj) -> true);
return this;
}

public boolean isAutoResubscribe() {
return autoResubscribe;
}

public KuzzleOptions setAutoResubscribe(boolean autoResubscribe) {
this.autoResubscribe = autoResubscribe;
return this;
}
}
30 changes: 30 additions & 0 deletions src/main/java/io/kuzzle/sdk/Options/Protocol/WebSocketOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ public class WebSocketOptions {
*/
private boolean autoReconnect = true;

/**
* The number of milliseconds between 2 automatic reconnection attempts.
*/
private long reconnectionDelay = 1000;

/**
* The maximum number of automatic reconnection attempts.
*/
private long reconnectionRetries = 20;

/**
* Initialize a new WebSocketOptions instance.
*/
Expand All @@ -39,6 +49,8 @@ public WebSocketOptions(WebSocketOptions other) {
this.ssl = other.ssl;
this.connectionTimeout = other.connectionTimeout;
this.autoReconnect = other.autoReconnect;
this.reconnectionDelay = other.reconnectionDelay;
this.reconnectionRetries = other.reconnectionRetries;
}

/**
Expand Down Expand Up @@ -112,4 +124,22 @@ public WebSocketOptions setAutoReconnect(boolean autoReconnect) {
this.autoReconnect = autoReconnect;
return this;
}

public long getReconnectionDelay() {
return reconnectionDelay;
}

public WebSocketOptions setReconnectionDelay(long reconnectionDelay) {
this.reconnectionDelay = reconnectionDelay;
return this;
}

public long getReconnectionRetries() {
return reconnectionRetries;
}

public WebSocketOptions setReconnectionRetries(long reconnectionRetries) {
this.reconnectionRetries = reconnectionRetries;
return this;
}
}
1 change: 1 addition & 0 deletions src/main/java/io/kuzzle/sdk/Protocol/ProtocolState.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
public enum ProtocolState {
CLOSE, // The network protocol does not accept requests.
OPEN, // The network protocol accepts new requests.
RECONNECTING,
}
Loading