diff --git a/Makefile b/Makefile
index 84ea4a2d..82305e40 100644
--- a/Makefile
+++ b/Makefile
@@ -52,7 +52,7 @@ clusterNode: ## Run tests against a cluster node.
 	@$(DOCKER_COMPOSE_CMD) -f cluster-docker-compose.yml up -d
 	@echo "Waiting for services to be fully ready..."
 	@sleep 5
-	@EVENTSTORE_INSECURE=false CLUSTER=true go test -count=1 -v ./esdb -run 'TestStreams|TestPersistentSubscriptions|TestProjections'
+	@EVENTSTORE_INSECURE=false CLUSTER=true go test -count=1 -v ./esdb -run 'TestStreams|TestPersistentSubscriptions|TestProjections|TestClusterRebalance'
 	@$(DOCKER_COMPOSE_CMD) -f cluster-docker-compose.yml down --remove-orphans
 
 .PHONY: misc
diff --git a/esdb/client.go b/esdb/client.go
index c435fe19..bd46b532 100644
--- a/esdb/client.go
+++ b/esdb/client.go
@@ -9,7 +9,9 @@ import (
 	"io"
 	"strconv"
 
+	"github.com/EventStore/EventStore-Client-Go/v4/protos/gossip"
 	persistentProto "github.com/EventStore/EventStore-Client-Go/v4/protos/persistent"
+	"github.com/EventStore/EventStore-Client-Go/v4/protos/shared"
 
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
@@ -759,7 +761,9 @@ func readInternal(
 	result, err := streamsClient.Read(ctx, readRequest, callOptions...)
 	if err != nil {
 		defer cancel()
-		return nil, err
+
+		err = client.grpcClient.handleError(handle, trailers, err)
+		return nil, fmt.Errorf("could not construct read operation. Reason: %w", err)
 	}
 
 	params := readStreamParams{
@@ -773,3 +777,20 @@ func readInternal(
 
 	return newReadStream(params), nil
 }
+
+func (client *Client) Gossip(ctx context.Context) ([]*gossip.MemberInfo, error) {
+	handle, err := client.grpcClient.getConnectionHandle()
+
+	if err != nil {
+		return nil, err
+	}
+
+	gossipClient := gossip.NewGossipClient(handle.Connection())
+	clusterInfo, err := gossipClient.Read(ctx, &shared.Empty{})
+
+	if err != nil {
+		return nil, err
+	}
+
+	return clusterInfo.Members, nil
+}
diff --git a/esdb/client_test.go b/esdb/client_test.go
index 8a2e3a09..1e16333b 100644
--- a/esdb/client_test.go
+++ b/esdb/client_test.go
@@ -126,3 +126,7 @@ func TestMisc(t *testing.T) {
 	TestPositionParsing(t)
 	UUIDParsingTests(t)
 }
+
+func TestClusterRebalance(t *testing.T) {
+	ClusterRebalanceTests(t)
+}
diff --git a/esdb/cluster_test.go b/esdb/cluster_test.go
index e51cec90..f90a1c35 100644
--- a/esdb/cluster_test.go
+++ b/esdb/cluster_test.go
@@ -2,10 +2,14 @@ package esdb_test
 
 import (
 	"context"
+	"crypto/tls"
+	"fmt"
+	"net/http"
 	"testing"
 	"time"
 
 	"github.com/EventStore/EventStore-Client-Go/v4/esdb"
+	"github.com/EventStore/EventStore-Client-Go/v4/protos/gossip"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -16,6 +20,12 @@ func ClusterTests(t *testing.T) {
 	})
 }
 
+func ClusterRebalanceTests(t *testing.T) {
+	t.Run("ClusterRebalanceTests", func(t *testing.T) {
+		t.Run("readStreamAfterClusterRebalance", readStreamAfterClusterRebalance)
+	})
+}
+
 func notLeaderExceptionButWorkAfterRetry(t *testing.T) {
 	// Seems on GHA, we need to try more that once because the name generator is not random enough.
 	for count := 0; count < 10; count++ {
@@ -57,3 +67,72 @@ func notLeaderExceptionButWorkAfterRetry(t *testing.T) {
 
 	t.Fatalf("we retried long enough but the test is still failing")
 }
+
+func readStreamAfterClusterRebalance(t *testing.T) {
+	// We purposely connect to a leader node.
+	db := CreateClient("esdb://admin:changeit@localhost:2111,localhost:2112,localhost:2113?nodepreference=leader&tlsverifycert=false", t)
+	defer db.Close()
+
+	ctx := context.Background()
+	streamID := NAME_GENERATOR.Generate()
+
+	// Start reading the stream.
+	options := esdb.ReadStreamOptions{From: esdb.Start{}}
+
+	stream, err := db.ReadStream(ctx, streamID, options, 10)
+	if err != nil {
+		t.Errorf("failed to read stream: %v", err)
+		return
+	}
+
+	stream.Close()
+
+	// Simulate a leader node failure.
+	members, err := db.Gossip(ctx)
+
+	assert.Nil(t, err)
+
+	for _, member := range members {
+		if member.State != gossip.MemberInfo_Leader || !member.GetIsAlive() {
+			continue
+		}
+
+		// Shut down the leader node.
+		url := fmt.Sprintf("https://%s:%d/admin/shutdown", member.HttpEndPoint.Address, member.HttpEndPoint.Port)
+		t.Log("Shutting down leader node: ", url)
+
+		req, err := http.NewRequest("POST", url, nil)
+		assert.Nil(t, err)
+
+		req.SetBasicAuth("admin", "changeit")
+		client := &http.Client{
+			Transport: &http.Transport{
+				TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+			},
+		}
+		resp, err := client.Do(req)
+		assert.Nil(t, err)
+		if resp != nil { // assert does not abort the test, so guard against a nil response.
+			resp.Body.Close()
+		}
+		break
+	}
+
+	// Wait for the cluster to rebalance.
+	time.Sleep(5 * time.Second)
+
+	// Try reading the stream again.
+	for count := 0; count < 10; count++ {
+		stream, err = db.ReadStream(ctx, streamID, options, 10)
+		if err != nil {
+			continue
+		}
+
+		stream.Close()
+
+		t.Logf("Successfully read stream after %d retries", count+1)
+		return
+	}
+
+	t.Fatalf("we retried long enough but the test is still failing")
+}
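For reviewers, a minimal sketch of how the Client.Gossip method introduced above might be called outside the test suite, e.g. to locate the current leader node. The connection string, credentials, and error handling below are illustrative assumptions and not part of this change:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/EventStore/EventStore-Client-Go/v4/esdb"
	"github.com/EventStore/EventStore-Client-Go/v4/protos/gossip"
)

func main() {
	// Hypothetical cluster connection string; adjust hosts, credentials, and TLS settings as needed.
	settings, err := esdb.ParseConnectionString("esdb://admin:changeit@localhost:2111,localhost:2112,localhost:2113?tlsverifycert=false")
	if err != nil {
		log.Fatalf("invalid connection string: %v", err)
	}

	db, err := esdb.NewClient(settings)
	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}
	defer db.Close()

	// Gossip returns the cluster members as seen by the node the client is connected to.
	members, err := db.Gossip(context.Background())
	if err != nil {
		log.Fatalf("failed to read gossip: %v", err)
	}

	// Report which member currently claims leadership.
	for _, member := range members {
		if member.State == gossip.MemberInfo_Leader && member.GetIsAlive() {
			fmt.Printf("leader: %s:%d\n", member.HttpEndPoint.Address, member.HttpEndPoint.Port)
		}
	}
}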