Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,8 @@ func (n *Node) Run(ctx context.Context) error {
if rd.SoftState != nil {
if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
wasLeader = false
log.G(ctx).Infof("soft state changed for node %x to not longer a leader, resetting and cancelling all waits", n.opts.ID)

if atomic.LoadUint32(&n.signalledLeadership) == 1 {
atomic.StoreUint32(&n.signalledLeadership, 0)
n.leadershipBroadcast.Publish(IsFollower)
Expand Down Expand Up @@ -1679,6 +1681,7 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa

// Do this check after calling register to avoid a race.
if atomic.LoadUint32(&n.signalledLeadership) != 1 {
log.G(ctx).Errorf("node %x is no longer leader, aborting propose", n.opts.ID)
n.wait.cancel(r.ID)
return nil, ErrLostLeadership
}
Expand All @@ -1703,14 +1706,23 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
select {
case x, ok := <-ch:
if !ok {
// Wait notification channel was closed. This should only happen if the wait was cancelled.
log.G(ctx).Errorf("wait cancelled, likely because node %x lost leader position", n.opts.ID)
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
log.G(ctx).Errorf("wait cancelled but node %x is still a leader.", n.opts.ID)
}
return nil, ErrLostLeadership
}
return x.(proto.Message), nil
case <-waitCtx.Done():
n.wait.cancel(r.ID)
// if channel is closed, wait item was canceled, otherwise it was triggered
// If we can read from the channel, wait item was triggered. Otherwise it was cancelled.
x, ok := <-ch
if !ok {
log.G(ctx).WithError(waitCtx.Err()).Errorf("wait context cancelled, likeyly because node %x lost leader position", n.opts.ID)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"likeyly" is a misspelling of "likely".

if atomic.LoadUint32(&n.signalledLeadership) == 1 {
log.G(ctx).Errorf("wait context cancelled but node %x is still a leader", n.opts.ID)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This message may appear at shutdown, because that's when the context gets cancelled.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Will adjust the comment.

On a related note, don't we wait for all transactions to complete during shutdown?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, transactions can take an arbitrarily long time to reach consensus.

}
return nil, ErrLostLeadership
}
return x.(proto.Message), nil
Expand Down Expand Up @@ -1779,21 +1791,26 @@ func (n *Node) processEntry(ctx context.Context, entry raftpb.Entry) error {
}

if !n.wait.trigger(r.ID, r) {
log.G(ctx).Errorf("wait not found for raft id %x", r.ID)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"proposal id"?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, I'll fix this.


// There was no wait on this ID, meaning we don't have a
// transaction in progress that would be committed to the
// memory store by the "trigger" call. Either a different node
// wrote this to raft, or we wrote it before losing the leader
// position and cancelling the transaction. Create a new
// transaction to commit the data.
// position and cancelling the transaction. This entry still needs
// to be committed since other nodes have already committed it.
// Create a new transaction to commit this entry.

// It should not be possible for processInternalRaftRequest
// to be running in this situation, but out of caution we
// cancel any current invocations to avoid a deadlock.
// TODO(anshul) This call is likely redundant, remove after consideration.
n.wait.cancelAll()

err := n.memoryStore.ApplyStoreActions(r.Action)
if err != nil {
log.G(ctx).WithError(err).Error("failed to apply actions from raft")
// TODO(anshul) return err here ?
}
}
return nil
Expand Down