Skip to content

Commit 78d551c

Browse files
authored
update to rs3 proto w expected_min_written_offset (#454)
1 parent 08ec90c commit 78d551c

File tree

4 files changed

+11
-11
lines changed

4 files changed

+11
-11
lines changed
Submodule rs3 updated from da6498b to fb2d8f8

kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3Client.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ private <K extends Comparable<K>> KeyValueIterator<K, byte[]> sendRange(
260260
.setStoreId(uuidToProto(storeId))
261261
.setLssId(lssIdProto(lssId))
262262
.setPssId(pssId)
263-
.setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset));
263+
.setExpectedMinWrittenOffset(walOffsetProto(expectedWrittenOffset));
264264
final Supplier<String> rangeDescription =
265265
() -> "Range(storeId=" + storeId + ", lssId=" + lssId + ", pssId=" + pssId + ")";
266266
final var asyncStub = stubs.stubs(storeId, pssId).asyncStub();
@@ -335,7 +335,7 @@ private Optional<Rs3.KeyValue> sendGet(
335335
requestBuilder.setStoreId(uuidToProto(storeId))
336336
.setLssId(lssIdProto(lssId))
337337
.setPssId(pssId)
338-
.setExpectedWrittenOffset(walOffsetProto(expectedWrittenOffset));
338+
.setExpectedMinWrittenOffset(walOffsetProto(expectedWrittenOffset));
339339

340340
final var request = requestBuilder.build();
341341
final RS3Grpc.RS3BlockingStub stub = stubs.stubs(storeId, pssId).syncStub();

kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/GrpcRS3ClientTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ public void shouldGetWithExpectedWrittenOffset() {
666666
.setLssId(lssIdProto(LSS_ID))
667667
.setPssId(PSS_ID)
668668
.setStoreId(uuidToProto(STORE_ID))
669-
.setExpectedWrittenOffset(GrpcRs3Util.walOffsetProto(123L))
669+
.setExpectedMinWrittenOffset(GrpcRs3Util.walOffsetProto(123L))
670670
.setKey(Rs3.Key.newBuilder().setBasicKey(
671671
GrpcRs3Util.basicKeyProto("foo".getBytes(StandardCharsets.UTF_8))
672672
))
@@ -706,7 +706,7 @@ public void shouldGet() {
706706
.setKey(Rs3.Key.newBuilder().setBasicKey(
707707
GrpcRs3Util.basicKeyProto("foo".getBytes(StandardCharsets.UTF_8))
708708
))
709-
.setExpectedWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET)
709+
.setExpectedMinWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET)
710710
.build()
711711
);
712712
}
@@ -825,7 +825,7 @@ public void shouldWindowedGet() {
825825
verify(stub).get(Rs3.GetRequest.newBuilder()
826826
.setLssId(lssIdProto(LSS_ID))
827827
.setPssId(PSS_ID)
828-
.setExpectedWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET)
828+
.setExpectedMinWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET)
829829
.setStoreId(uuidToProto(STORE_ID))
830830
.setKey(Rs3.Key.newBuilder().setWindowKey(keyProto))
831831
.build()
@@ -866,7 +866,7 @@ public void shouldRetryWindowedGet() {
866866
.get(Rs3.GetRequest.newBuilder()
867867
.setLssId(lssIdProto(LSS_ID))
868868
.setPssId(PSS_ID)
869-
.setExpectedWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET)
869+
.setExpectedMinWrittenOffset(GrpcRs3Util.UNWRITTEN_WAL_OFFSET)
870870
.setStoreId(uuidToProto(STORE_ID))
871871
.setKey(Rs3.Key.newBuilder().setWindowKey(keyProto))
872872
.build()

kafka-client/src/test/java/dev/responsive/kafka/internal/db/rs3/client/grpc/TestGrpcRs3Service.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public void get(
8383
return;
8484
}
8585

86-
if (req.getExpectedWrittenOffset().getIsWritten()) {
87-
if (offset.get() < req.getExpectedWrittenOffset().getOffset()) {
86+
if (req.getExpectedMinWrittenOffset().getIsWritten()) {
87+
if (offset.get() < req.getExpectedMinWrittenOffset().getOffset()) {
8888
responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT));
8989
return;
9090
}
@@ -113,8 +113,8 @@ public void range(
113113
return;
114114
}
115115

116-
if (req.getExpectedWrittenOffset().getIsWritten()) {
117-
if (offset.get() < req.getExpectedWrittenOffset().getOffset()) {
116+
if (req.getExpectedMinWrittenOffset().getIsWritten()) {
117+
if (offset.get() < req.getExpectedMinWrittenOffset().getOffset()) {
118118
responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT));
119119
return;
120120
}

0 commit comments

Comments
 (0)