This repository was archived by the owner on Jun 18, 2025. It is now read-only.

fix: invoke reconnect when readInternal return error #188

Merged
YoEight merged 13 commits into kurrent-io:master from AwaedFintech:master
Nov 12, 2024

Conversation

@itgram
Contributor

@itgram itgram commented Oct 22, 2024

Fixed: Trigger a reconnect when readInternal returns an error.

@CLAassistant

CLAassistant commented Oct 22, 2024

CLA assistant check
All committers have signed the CLA.

@itgram
Contributor Author

itgram commented Oct 23, 2024

@w1am

Could you please merge this PR?

Let me know if this works for you!

@YoEight
Contributor

YoEight commented Oct 23, 2024

Hey @itgram, thanks for your contribution.

Why is that change needed in the first place? Could you expand on the reason a little?

@itgram
Contributor Author

itgram commented Oct 23, 2024

@YoEight

In a cluster-mode setup of EventStoreDB, when one of the cluster nodes drops and we attempt to use the ReadStream method, it raises an error and doesn’t rebalance with the updated cluster members.

This issue occurs here: https://github.com/EventStore/EventStore-Client-Go/blob/8206ac84067e5a6d591f71fafb82758015fb9f3c/esdb/client.go#L291.

If we call ReadStream again, it fails repeatedly because client.grpcClient.handleError is not invoked in case of an error, which prevents the reconnection process from being triggered.
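The failure mode can be sketched as a toy simulation. None of the types below are the real esdb internals; they only model the pattern the PR addresses: on error, `readInternal`'s caller must route the error through the connection error handler (the real `client.grpcClient.handleError`) so a retry reconnects instead of reusing a dead channel.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the client's internals.
type conn struct{ alive bool }

type client struct{ current *conn }

// handleError mimics grpcClient.handleError: it discards the broken
// connection so the next call establishes a fresh one.
func (c *client) handleError(err error) { c.current = nil }

// getConnection reconnects lazily when no live connection is cached,
// standing in for node rediscovery after a leader change.
func (c *client) getConnection() *conn {
	if c.current == nil {
		c.current = &conn{alive: true}
	}
	return c.current
}

// readInternal fails while the cached connection points at a dropped node.
func (c *client) readInternal() error {
	if !c.getConnection().alive {
		return errors.New("node unavailable")
	}
	return nil
}

// ReadStream is the patched entry point: on error it invokes handleError
// (the call the PR adds), so the retry triggers reconnection.
func (c *client) ReadStream() error {
	if err := c.readInternal(); err != nil {
		c.handleError(err)
		return err
	}
	return nil
}

func main() {
	c := &client{current: &conn{alive: false}} // cached connection to a dead node
	err1 := c.ReadStream()                     // fails, but drops the dead connection
	err2 := c.ReadStream()                     // succeeds after reconnect
	fmt.Println(err1 != nil, err2 == nil)      // prints: true true
}
```

Without the `handleError` call in `ReadStream`, every retry in this sketch would hit the same dead connection forever, which matches the repeated failures described above.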

@YoEight
Contributor

YoEight commented Oct 23, 2024

Could you come up with a test that shows your patch is fixing the behavior you are talking about?

Contributor

@YoEight YoEight left a comment


Thanks for your contribution.

As-is, I don't think your test reproduces the scenario you described earlier. I'd really like to replicate all the steps that lead to your issue before applying your patch.

Comment thread esdb/cluster_test.go Outdated
Comment thread esdb/cluster_test.go Outdated
Comment thread esdb/cluster_test.go
@itgram itgram requested a review from YoEight October 26, 2024 11:28
@itgram
Contributor Author

itgram commented Oct 27, 2024

@YoEight

I couldn’t find a way to stop the leader node of EventStoreDB programmatically from the code.

@YoEight
Contributor

YoEight commented Oct 27, 2024

If you already know the leader node of the cluster, what you can do is call these endpoints on it; the order matters:

  1. POST /admin/node/priority/-1
  2. POST /admin/node/resign

This will not stop the leader per se, but it will trigger a new election cycle and guarantee (in the context of your test) that a new leader is elected.

@itgram
Contributor Author

itgram commented Oct 27, 2024

> If you already know the leader node of the cluster, what you can do is call these endpoints on it; the order matters:
>
>   1. POST /admin/node/priority/-1
>   2. POST /admin/node/resign
>
> This will not stop the leader per se, but it will trigger a new election cycle and guarantee (in the context of your test) that a new leader is elected.

Interesting, but as you know, I haven't been able to determine the leader node programmatically yet.

@YoEight
Contributor

YoEight commented Oct 28, 2024

The best way to achieve this is to leverage the GET /gossip endpoint. It gives you something like this:

{
  "members": [
    {
      "instanceId": "cf2e423a-a664-43ed-806f-10745c6d0ea0",
      "timeStamp": "2024-10-28T00:53:05.55955Z",
      "state": "Leader",
      "isAlive": true,
      "internalTcpIp": "127.0.0.1",
      "internalTcpPort": 1112,
      "internalSecureTcpPort": 0,
      "externalTcpIp": "127.0.0.1",
      "externalTcpPort": 0,
      "externalSecureTcpPort": 0,
      "internalHttpEndPointIp": "127.0.0.1",
      "internalHttpEndPointPort": 2113,
      "httpEndPointIp": "127.0.0.1",
      "httpEndPointPort": 2113,
      "lastCommitPosition": 891,
      "writerCheckpoint": 1063,
      "chaserCheckpoint": 1063,
      "epochPosition": 0,
      "epochNumber": 0,
      "epochId": "ded879b8-d7e6-45a6-87b2-2f3ef8c1f47b",
      "nodePriority": 0,
      "isReadOnlyReplica": false,
      "esVersion": "24.10.0-prerelease"
    },
   {...}
  ]
}

The leader node will have its state property set to Leader.
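A minimal sketch of picking the leader out of that payload, assuming the JSON shape shown above (the client code shown later in this thread does the equivalent over gRPC with `gossip.MemberInfo_Leader`; `findLeader` and the struct names here are mine):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// member decodes only the fields needed to locate the leader in the
// GET /gossip response shown above.
type member struct {
	State            string `json:"state"`
	HttpEndPointIp   string `json:"httpEndPointIp"`
	HttpEndPointPort int    `json:"httpEndPointPort"`
}

type clusterInfo struct {
	Members []member `json:"members"`
}

// findLeader returns the member whose state is "Leader", if any.
func findLeader(gossipJSON []byte) (member, bool) {
	var info clusterInfo
	if err := json.Unmarshal(gossipJSON, &info); err != nil {
		return member{}, false
	}
	for _, m := range info.Members {
		if m.State == "Leader" {
			return m, true
		}
	}
	return member{}, false
}

func main() {
	payload := []byte(`{"members":[
		{"state":"Follower","httpEndPointIp":"127.0.0.1","httpEndPointPort":2111},
		{"state":"Leader","httpEndPointIp":"127.0.0.1","httpEndPointPort":2113}]}`)
	leader, ok := findLeader(payload)
	fmt.Println(ok, leader.HttpEndPointIp, leader.HttpEndPointPort) // prints: true 127.0.0.1 2113
}
```

The returned endpoint is what a test would then target with the admin calls discussed earlier.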

@YoEight
Contributor

YoEight commented Oct 29, 2024

Hey @itgram

I came up with a test case that I believe replicates the steps that you described earlier:

func readStreamNotLeaderExceptionButWorkAfterRetry(t *testing.T) {
	db := CreateClient("esdb://admin:changeit@localhost:2111,localhost:2112,localhost:2113?nodepreference=leader&tlsverifycert=false", t)
	defer db.Close()

	ctx := context.Background()
	streamID := NAME_GENERATOR.Generate()

	_, err := db.AppendToStream(ctx, streamID, esdb.AppendToStreamOptions{}, createTestEvent())

	assert.Nil(t, err)

	members, err := db.Gossip(ctx)

	assert.Nil(t, err)

	for _, member := range members {
		if member.State != gossip.MemberInfo_Leader {
			continue
		}

		url := fmt.Sprintf("https://%s:%d/admin/shutdown", member.HttpEndPoint.Address, member.HttpEndPoint.Port)
		req, err := http.NewRequest("POST", url, nil)
		assert.Nil(t, err)

		req.SetBasicAuth("admin", "changeit")
		client := &http.Client{
			Transport: &http.Transport{
				TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
			},
		}
		resp, err := client.Do(req)

		assert.Nil(t, err)
		resp.Body.Close()

		break
	}

	_, err = db.ReadStream(ctx, streamID, esdb.ReadStreamOptions{
		From:           esdb.Start{},
		RequiresLeader: true,
	}, math.MaxUint64)

	assert.Nil(t, err)
}

The Gossip function I used is one that reuses our internal gRPC gossip client:

func (client *Client) Gossip(ctx context.Context) ([]*gossip.MemberInfo, error) {
	handle, err := client.grpcClient.getConnectionHandle()

	if err != nil {
		return nil, err
	}

	gossipClient := gossip.NewGossipClient(handle.Connection())
	clusterInfo, err := gossipClient.Read(ctx, &shared.Empty{})

	if err != nil {
		return nil, err
	}

	return clusterInfo.Members, nil
}

I wasn’t able to reproduce the issue on my end. While I can’t rule out the possibility of a bug, it may help to explore other potential causes. Do you have any additional leads or context that could help narrow down the source?

@itgram itgram marked this pull request as draft October 29, 2024 17:25
@itgram itgram marked this pull request as ready for review October 29, 2024 17:33
@itgram
Contributor Author

itgram commented Oct 31, 2024

@YoEight
Please review the test case once more and share your feedback.

@itgram
Contributor Author

itgram commented Nov 7, 2024

Hello @YoEight, do you have any concerns?

@YoEight
Contributor

YoEight commented Nov 8, 2024

Hey @itgram, I haven’t forgotten about your PR! I’m finishing up a few other tasks first, then I’ll jump into reviewing it. Apologies for the delay, my time has been tighter than usual as we’re gearing up for the upcoming release. Thanks for your patience!

Contributor

@YoEight YoEight left a comment


I confirm your patch fixed the issue, nice work!

@w1am w1am self-requested a review November 12, 2024 15:50
w1am
w1am previously approved these changes Nov 12, 2024
Comment thread esdb/client.go Outdated
Co-authored-by: Yo Eight <yo.eight@gmail.com>
@YoEight YoEight merged commit c29aa7d into kurrent-io:master Nov 12, 2024
