@duongkame
Contributor

@duongkame duongkame commented Jan 18, 2024

What changes were proposed in this pull request?

See RATIS-2009

Found this issue when running tests for #1014: TestServerRestartWithGrpc failed because the zero-copy buffers of RaftConfiguration entries are released too early (while the log entries are still in the EntryCache).

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/RATIS-2009

How was this patch tested?

Existing tests.

@duongkame duongkame marked this pull request as ready for review January 18, 2024 20:34
@duongkame
Contributor Author

@szetszwo can you have a look?

…ing submitted to RaftServer by SlidingWindow.
@duongkame
Contributor Author

duongkame commented Jan 18, 2024

I also just attempted to solve the following test failure in this PR. @szetszwo

   org.apache.ratis.grpc.TestWatchRequestWithGrpc.testWatchRequestAsyncChangeLeader  Time elapsed: 34.557 s  <<< ERROR!
   java.util.concurrent.TimeoutException
        at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
        at org.apache.ratis.WatchRequestTests.checkMajority(WatchRequestTests.java:274)
        at org.apache.ratis.WatchRequestTests.runTestWatchRequestAsyncChangeLeader(WatchRequestTests.java:360)
        at org.apache.ratis.WatchRequestTests.runTest(WatchRequestTests.java:153)
        at org.apache.ratis.WatchRequestTests.lambda$testWatchRequestAsyncChangeLeader$6(WatchRequestTests.java:337)
        at org.apache.ratis.server.impl.MiniRaftCluster$Factory$Get.runWithNewCluster(MiniRaftCluster.java:140)
        at org.apache.ratis.server.impl.MiniRaftCluster$Factory$Get.runWithNewCluster(MiniRaftCluster.java:120)
        at org.apache.ratis.WatchRequestTests.testWatchRequestAsyncChangeLeader(WatchRequestTests.java:336)
        ...

testWatchRequestAsyncChangeLeader may time out as shown above. The log contains:

2024-01-18 01:41:51,651 [grpc-default-executor-0] ERROR impl.OrderedAsync (OrderedAsync.java:lambda$sendRequest$9(247)) - client-6F96E8AE8714: Failed* RaftClientRequest:client-6F96E8AE8714->s2@group-4F85081496F8, cid=64, seq=20, RW, m7
java.util.concurrent.CompletionException: java.io.IOException: java.lang.IllegalStateException: Failed to retain: object has already been completely released.
     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
     at org.apache.ratis.grpc.client.GrpcClientProtocolClient$AsyncStreamObservers.completeReplyExceptionally(GrpcClientProtocolClient.java:394)
     at org.apache.ratis.grpc.client.GrpcClientProtocolClient$AsyncStreamObservers.access$000(GrpcClientProtocolClient.java:300)
     at org.apache.ratis.grpc.client.GrpcClientProtocolClient$AsyncStreamObservers$1.onError(GrpcClientProtocolClient.java:331)
     at org.apache.ratis.thirdparty.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
     at org.apache.ratis.thirdparty.io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
     at org.apache.ratis.thirdparty.io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
     at org.apache.ratis.grpc.metrics.intercept.client.MetricClientCallListener.onClose(MetricClientCallListener.java:47)
     at org.apache.ratis.thirdparty.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
     at org.apache.ratis.thirdparty.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
     at org.apache.ratis.thirdparty.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
     at org.apache.ratis.thirdparty.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716)
     at org.apache.ratis.thirdparty.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
     at org.apache.ratis.thirdparty.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: java.lang.IllegalStateException: Failed to retain: object has already been completely released.
     at org.apache.ratis.grpc.GrpcUtil.unwrapException(GrpcUtil.java:103)
     at org.apache.ratis.grpc.GrpcUtil.unwrapIOException(GrpcUtil.java:163)
     at org.apache.ratis.grpc.client.GrpcClientProtocolClient$AsyncStreamObservers$1.onError(GrpcClientProtocolClient.java:330)
     ... 13 more
Caused by: java.lang.IllegalStateException: Failed to retain: object has already been completely released.
     at org.apache.ratis.util.ReferenceCountedObject$4.retain(ReferenceCountedObject.java:183)
     at org.apache.ratis.server.impl.RaftServerProxy.submitClientRequestAsync(RaftServerProxy.java:450)
     at org.apache.ratis.grpc.server.GrpcClientProtocolService$RequestStreamObserver.processClientRequest(GrpcClientProtocolService.java:248)
     at org.apache.ratis.grpc.server.GrpcClientProtocolService$OrderedRequestStreamObserver.processClientRequest(GrpcClientProtocolService.java:364)
     at org.apache.ratis.util.SlidingWindow$Server.processRequestsFromHead(SlidingWindow.java:459)
     at org.apache.ratis.util.SlidingWindow$Server.receivedRequest(SlidingWindow.java:451)
     at org.apache.ratis.grpc.server.GrpcClientProtocolService$OrderedRequestStreamObserver.processClientRequest(GrpcClientProtocolService.java:389)
     at org.apache.ratis.grpc.server.GrpcClientProtocolService$RequestStreamObserver.onNext(GrpcClientProtocolService.java:272)
     at org.apache.ratis.grpc.server.GrpcClientProtocolService$RequestStreamObserver.onNext(GrpcClientProtocolService.java:191)
     at org.apache.ratis.thirdparty.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
     at org.apache.ratis.thirdparty.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
     at org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:329)
     at org.apache.ratis.thirdparty.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:314)
     at org.apache.ratis.thirdparty.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:833)
     ... 5 more

Contributor

@szetszwo szetszwo left a comment

Thanks for also fixing testWatchRequestAsyncChangeLeader! Please see the inline comments.

Comment on lines 130 to 135
   default CompletableFuture<Long> appendEntry(LogEntryProto entry, TransactionContext context) {
-    return appendEntry(entry);
+    return appendEntry(ReferenceCountedObject.wrap(entry), context);
   }
+
+  CompletableFuture<Long> appendEntry(ReferenceCountedObject<LogEntryProto> entryRef, TransactionContext context);

Contributor

Since this is in ratis-server-api, we have to provide a default implementation of the new method for compatibility.

  /**
   * Append asynchronously an entry.
   * Used by the leader.
   */
  default CompletableFuture<Long> appendEntry(ReferenceCountedObject<LogEntryProto> entryRef, TransactionContext context) {
    return appendEntry(entryRef.get(), context);
  }

  /**
   * @deprecated use {@link #append(ReferenceCountedObject)}.
   */
  @Deprecated
  default CompletableFuture<Long> appendEntry(LogEntryProto entry, TransactionContext context) {
    throw new UnsupportedOperationException();
  }
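
The compatibility pattern suggested above can be illustrated in isolation. This is a minimal sketch, not the Ratis code: `LogSketch`, `LegacyLog`, `NewLog` and `CompatibilityDemo` are hypothetical names, and `java.util.function.Supplier<String>` stands in for `ReferenceCountedObject<LogEntryProto>`. It assumes the intent is that an implementation written before the new method existed keeps working, because the new default delegates to the old method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical API interface; Supplier<String> stands in for a reference-counted entry. */
interface LogSketch {
  /** New method: the default delegates to the old method so existing implementations still work. */
  default CompletableFuture<Long> appendEntry(Supplier<String> entryRef) {
    return appendEntry(entryRef.get());
  }

  /** @deprecated use {@link #appendEntry(Supplier)}. */
  @Deprecated
  default CompletableFuture<Long> appendEntry(String entry) {
    throw new UnsupportedOperationException();
  }
}

/** An implementation written before the new method existed: it overrides only the old method. */
class LegacyLog implements LogSketch {
  @Override
  @Deprecated
  public CompletableFuture<Long> appendEntry(String entry) {
    return CompletableFuture.completedFuture((long) entry.length());
  }
}

/** An implementation written against the new method only. */
class NewLog implements LogSketch {
  @Override
  public CompletableFuture<Long> appendEntry(Supplier<String> entryRef) {
    return CompletableFuture.completedFuture((long) entryRef.get().length());
  }
}

class CompatibilityDemo {
  public static void main(String[] args) {
    // The legacy implementation is reachable through the new method via the default.
    System.out.println(new LegacyLog().appendEntry(() -> "abc").join());
    // The new implementation handles the new method directly.
    System.out.println(new NewLog().appendEntry(() -> "abcd").join());
  }
}
```

In this sketch, an implementation that overrides neither method would throw `UnsupportedOperationException` from either entry point, so every implementation is expected to override at least one of the two.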

Contributor

Update also RaftLogBase:

@@ -345,15 +346,17 @@ public abstract class RaftLogBase implements RaftLog {
 
   @Override
   public final CompletableFuture<Long> appendEntry(LogEntryProto entry) {
-    return appendEntry(entry, null);
+    return appendEntry(ReferenceCountedObject.wrap(entry), null);
   }
 

final long seq = pending.getSeqNum();
processClientRequest(pending.getRequestRef(),
    reply -> slidingWindow.receiveReply(seq, reply, this::sendReply));
pending.release();

Contributor

We should use whenComplete(..).

      final ReferenceCountedObject<RaftClientRequest> ref = pending.getRequestRef();
      processClientRequest(ref, reply -> slidingWindow.receiveReply(seq, reply, this::sendReply))
          .whenComplete((v, e) -> ref.release());

Contributor Author

processClientRequest eventually calls submitClientRequestAsync(ReferenceCountedObject<RaftClientRequest>), which returns a Future.

I realized that when calling an asynchronous method that accepts a reference-counted object, we rarely need to wait for whenComplete to release the reference. Consider the example below:

  final RaftClientRequest request = requestRef.retain();
  try {
    // Do something with the request, then make the asynchronous call.
    return submitClientRequestAsync(requestRef);
    // We don't need to, and should not, do this:
    //     .whenComplete((r, e) -> requestRef.release());
  } finally {
    // Instead, just release the reference here.
    requestRef.release();
  }

This is because:

  1. Inside submitClientRequestAsync, there is already a retain where needed (in the LogSegment cache). In other words, if the internal logic of submitClientRequestAsync needs to extend the life of the referenced object, it should do so itself. The calling code is only responsible for ensuring that the reference is still retainable when it reaches submitClientRequestAsync, which the outer retain guarantees. The calling code then releases the reference as soon as it has finished using it.
  2. There are cases where whenComplete is never called. For example, when the client times out or disconnects, the returned future seems to be discarded (documented in RATIS-2007).
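
The two points above can be sketched in a self-contained form. This is only an illustration under stated assumptions, not the Ratis implementation: `CountedRef` is a hypothetical, simplified stand-in for `ReferenceCountedObject`, `EntryCacheSketch` is a hypothetical stand-in for the log cache, and the returned future is deliberately never completed to model a client that times out or disconnects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified reference counter (not the Ratis ReferenceCountedObject). */
final class CountedRef<T> {
  private final T value;
  private int count;         // number of outstanding retains
  private boolean released;  // true once the count has dropped back to zero

  CountedRef(T value) { this.value = value; }

  synchronized T retain() {
    if (released) {
      throw new IllegalStateException("Failed to retain: already completely released");
    }
    count++;
    return value;
  }

  synchronized void release() {
    if (released || count == 0) {
      throw new IllegalStateException("release without a matching retain");
    }
    if (--count == 0) {
      released = true;
    }
  }

  synchronized int count() { return count; }
  synchronized boolean isReleased() { return released; }
}

/** Hypothetical callee: it retains for as long as it keeps the entry cached. */
final class EntryCacheSketch {
  private final List<CountedRef<String>> cached = new ArrayList<>();

  CompletableFuture<Long> appendAsync(CountedRef<String> ref) {
    ref.retain();      // the callee extends the lifetime itself
    cached.add(ref);
    return new CompletableFuture<>();  // never completed in this sketch
  }

  void evictAll() {
    cached.forEach(CountedRef::release);  // the callee releases its own retain
    cached.clear();
  }
}

final class RetainReleaseSketch {
  /** Caller: retain for the synchronous part only and release in finally. */
  static CompletableFuture<Long> process(CountedRef<String> ref, EntryCacheSketch cache) {
    ref.retain();
    try {
      return cache.appendAsync(ref);
    } finally {
      ref.release();  // runs even though the returned future never completes
    }
  }

  public static void main(String[] args) {
    final CountedRef<String> ref = new CountedRef<>("entry");
    final EntryCacheSketch cache = new EntryCacheSketch();
    final CompletableFuture<Long> future = process(ref, cache);
    System.out.println("done=" + future.isDone() + ", count=" + ref.count());
    cache.evictAll();
    System.out.println("released=" + ref.isReleased());
  }
}
```

In this sketch the caller's release does not free the entry, because the cache still holds its own retain. The entry is freed only on eviction, regardless of whether anyone ever completes or observes the returned future.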

Comment on lines 70 to 72
void release() {
  requestRef.release();
}

Contributor

This method could be removed.

Contributor

@szetszwo szetszwo left a comment

@duongkame , thanks for the update! Just a minor comment inlined.

Comment on lines 126 to 133
   /**
    * Append asynchronously an entry.
-   * Used by the leader.
+   * Used by the leader for scenarios that there is no needs to clean up resources when the given entry is no
+   * longer used/referenced by this log.
    */
   default CompletableFuture<Long> appendEntry(LogEntryProto entry, TransactionContext context) {
-    return appendEntry(entry);
+    return appendEntry(ReferenceCountedObject.wrap(entry), context);
   }

Contributor

The method is not used anywhere except by the new append(ReferenceCountedObject). It should throw an exception and be annotated as deprecated.

  /**
   * @deprecated use {@link #append(ReferenceCountedObject)}.
   */
  @Deprecated
  default CompletableFuture<Long> appendEntry(LogEntryProto entry, TransactionContext context) {
    throw new UnsupportedOperationException();
  }

Contributor

@szetszwo szetszwo left a comment

+1 the change looks good.

@szetszwo szetszwo merged commit 82c31ea into apache:master Jan 19, 2024