Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,24 @@
*/
public interface SnapshotManagementApi {

/** trigger create snapshot file. */
RaftClientReply create(long timeoutMs) throws IOException;
/** The same as create(0, timeoutMs). */
default RaftClientReply create(long timeoutMs) throws IOException {
return create(0, timeoutMs);
}

/** The same as create(force? 1 : 0, timeoutMs). */
default RaftClientReply create(boolean force, long timeoutMs) throws IOException {
return create(force? 1 : 0, timeoutMs);
}

/**
* Trigger to create a snapshot.
*
* @param creationGap When (creationGap > 0) and (astAppliedIndex - lastSnapshotIndex < creationGap),
* return lastSnapshotIndex; otherwise, take a new snapshot and then return its index.
* When creationGap == 0, use the server configured value as the creationGap.
* @return a reply. When {@link RaftClientReply#isSuccess()} is true,
* {@link RaftClientReply#getLogIndex()} is the snapshot index fulfilling the operation.
*/
RaftClientReply create(long creationGap, long timeoutMs) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,8 @@ static SnapshotManagementRequest toSnapshotManagementRequest(SnapshotManagementR
switch(p.getOpCase()) {
case CREATE:
return SnapshotManagementRequest.newCreate(clientId, serverId,
ProtoUtils.toRaftGroupId(m.getRaftGroupId()), m.getCallId(), m.getTimeoutMs());
ProtoUtils.toRaftGroupId(m.getRaftGroupId()), m.getCallId(), m.getTimeoutMs(),
p.getCreate().getCreationGap());
default:
throw new IllegalArgumentException("Unexpected op " + p.getOpCase() + " in " + p);
}
Expand All @@ -671,7 +672,7 @@ static SnapshotManagementRequestProto toSnapshotManagementRequestProto(
.setRpcRequest(toRaftRpcRequestProtoBuilder(request));
final SnapshotManagementRequest.Create create = request.getCreate();
if (create != null) {
b.setCreate(SnapshotCreateRequestProto.newBuilder().build());
b.setCreate(SnapshotCreateRequestProto.newBuilder().setCreationGap(create.getCreationGap()).build());
}
return b.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ class SnapshotManagementImpl implements SnapshotManagementApi {
}

@Override
public RaftClientReply create(long timeoutMs) throws IOException {
public RaftClientReply create(long creationGap, long timeoutMs) throws IOException {
final long callId = CallId.getAndIncrement();
return client.io().sendRequestWithRetry(() -> SnapshotManagementRequest.newCreate(client.getId(),
Optional.ofNullable(server).orElseGet(client::getLeaderId), client.getGroupId(), callId, timeoutMs));
Optional.ofNullable(server).orElseGet(client::getLeaderId),
client.getGroupId(), callId, timeoutMs, creationGap));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,16 @@ public final class SnapshotManagementRequest extends RaftClientRequest {
public abstract static class Op {

}
public static class Create extends Op {

public static final class Create extends Op {
private final long creationGap;
private Create(long creationGap) {
this.creationGap = creationGap;
}

public long getCreationGap() {
return creationGap;
}

@Override
public String toString() {
Expand All @@ -35,8 +44,13 @@ public String toString() {

public static SnapshotManagementRequest newCreate(ClientId clientId,
RaftPeerId serverId, RaftGroupId groupId, long callId, long timeoutMs) {
return newCreate(clientId, serverId, groupId, callId, timeoutMs, 0);
}

public static SnapshotManagementRequest newCreate(ClientId clientId,
RaftPeerId serverId, RaftGroupId groupId, long callId, long timeoutMs, long creationGap) {
return new SnapshotManagementRequest(clientId,
serverId, groupId, callId, timeoutMs,new SnapshotManagementRequest.Create());
serverId, groupId, callId, timeoutMs, new SnapshotManagementRequest.Create(creationGap));
}

private final Op op;
Expand Down
2 changes: 1 addition & 1 deletion ratis-proto/src/main/proto/Raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ message SnapshotManagementRequestProto {
}

message SnapshotCreateRequestProto {

uint64 creationGap = 1;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to set this to optional for compatibility

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In proto 3, optional is generally discouraged; see https://cloud.google.com/apis/design/proto3

Beginning in protobuf v3.14, primitive fields can distinguish between the default value and unset value by using the optional keyword, although this is generally discouraged.

In our case, the default value 0 means that it will use server configuration. So, it is compatible to the previous code.

}

message StartLeaderElectionRequestProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1223,9 +1223,10 @@ CompletableFuture<RaftClientReply> takeSnapshotAsync(SnapshotManagementRequest r
LOG.info("{}: takeSnapshotAsync {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(getMemberId(), request);
Preconditions.assertNotNull(request.getCreate(), "create");

//TODO(liuyaolong): get the gap value from shell command
long minGapValue = RaftServerConfigKeys.Snapshot.creationGap(proxy.getProperties());
final long creationGap = request.getCreate().getCreationGap();
long minGapValue = creationGap > 0? creationGap : RaftServerConfigKeys.Snapshot.creationGap(proxy.getProperties());
final long lastSnapshotIndex = Optional.ofNullable(stateMachine.getLatestSnapshot())
.map(SnapshotInfo::getIndex)
.orElse(0L);
Expand Down