From f0909e5c56974e61d3ff01a8dda85b5852e8c8d0 Mon Sep 17 00:00:00 2001 From: eshitachandwani <59800922+eshitachandwani@users.noreply.github.com> Date: Tue, 22 Jul 2025 12:22:48 +0530 Subject: [PATCH 01/11] Update cdsbalancer_test.go --- xds/internal/balancer/cdsbalancer/cdsbalancer_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index a152b78d7df5..8e48265cf432 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -207,7 +207,10 @@ func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *gr func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) { t.Helper() - cdsResourceRequestedCh := make(chan []string, 1) + // If more than 10 requests are received and no reader is available, + // additional requests will be dropped due to the use of a non-blocking + // send using select with a default case. 
+ cdsResourceRequestedCh := make(chan []string, 10) cdsResourceCanceledCh := make(chan struct{}, 1) mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ Listener: lis, From 2410c79b314896e47efbd85f60954b5d1c8cdbf5 Mon Sep 17 00:00:00 2001 From: Eshita Chandwani Date: Tue, 5 Aug 2025 19:35:40 +0530 Subject: [PATCH 02/11] changes --- .../cdsbalancer/aggregate_cluster_test.go | 3 +- .../balancer/cdsbalancer/cdsbalancer_test.go | 96 ++++++++++++++++--- 2 files changed, 85 insertions(+), 14 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go b/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go index f1dead24d5ce..770fc2afb043 100644 --- a/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go +++ b/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go @@ -872,10 +872,9 @@ func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) { // removed from the tree no longer has a watcher and the new cluster added has a // new watcher. func (s) TestWatchers(t *testing.T) { - mgmtServer, nodeID, _, _, _, cdsResourceRequestedCh, _ := setupWithManagementServer(t) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + mgmtServer, nodeID, _, _, _, cdsResourceRequestedCh, _ := setupWithManagementServerWithResourceCheck(ctx, t) const ( clusterA = clusterName diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 8e48265cf432..aa04752acc69 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -274,6 +274,78 @@ func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e. 
return mgmtServer, nodeID, cc, r, xdsC, cdsResourceRequestedCh, cdsResourceCanceledCh } +func setupWithManagementServerWithResourceCheck(ctx context.Context, t *testing.T) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) { + return setupWithManagementServerAndListenerWithResourceCheck(ctx, t, nil) +} + +// Same as setupWithManagementServer, but also allows the caller to specify +// a listener to be used by the management server. +func setupWithManagementServerAndListenerWithResourceCheck(ctx context.Context, t *testing.T, lis net.Listener) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) { + t.Helper() + cdsResourceRequestedCh := make(chan []string, 1) + cdsResourceCanceledCh := make(chan struct{}, 1) + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + Listener: lis, + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3ClusterURL { + switch len(req.GetResourceNames()) { + case 0: + select { + case cdsResourceCanceledCh <- struct{}{}: + default: + } + default: + select { + case cdsResourceRequestedCh <- req.GetResourceNames(): + case <-ctx.Done(): + } + } + } + return nil + }, + // Required for aggregate clusters as all resources cannot be requested + // at once. + AllowResourceSubset: true, + }) + + // Create bootstrap configuration pointing to the above management server. 
+ nodeID := uuid.New().String() + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + + config, err := bootstrap.NewConfigFromContents(bc) + if err != nil { + t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bc), err) + } + pool := xdsclient.NewPool(config) + xdsC, xdsClose, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + t.Cleanup(xdsClose) + + r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cds_experimental":{ + "cluster": "%s" + } + }] + }`, clusterName) + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsC)) + + cc, err := grpc.NewClient(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("grpc.NewClient(%q) = %v", lis.Addr().String(), err) + } + cc.Connect() + t.Cleanup(func() { cc.Close() }) + + return mgmtServer, nodeID, cc, r, xdsC, cdsResourceRequestedCh, cdsResourceCanceledCh +} + // Helper function to compare the load balancing configuration received on the // channel with the expected one. Both configs are marshalled to JSON and then // compared. @@ -324,11 +396,11 @@ func verifyRPCError(gotErr error, wantCode codes.Code, wantErr, wantNodeID strin // configuration changes, it stops requesting the old cluster resource and // starts requesting the new one. func (s) TestConfigurationUpdate_Success(t *testing.T) { - _, _, _, r, xdsClient, cdsResourceRequestedCh, _ := setupWithManagementServer(t) - - // Verify that the specified cluster resource is requested. 
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + _, _, _, r, xdsClient, cdsResourceRequestedCh, _ := setupWithManagementServerWithResourceCheck(ctx, t) + + // Verify that the specified cluster resource is requested. wantNames := []string{clusterName} if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil { t.Fatal(err) @@ -692,11 +764,11 @@ func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) { // continue using the previous good update. func (s) TestClusterUpdate_Failure(t *testing.T) { _, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t) - - // Verify that the specified cluster resource is requested. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerWithResourceCheck(ctx, t) + + // Verify that the specified cluster resource is requested. wantNames := []string{clusterName} if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil { t.Fatal(err) @@ -809,12 +881,12 @@ func (s) TestClusterUpdate_Failure(t *testing.T) { func (s) TestResolverError(t *testing.T) { _, resolverErrCh, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t) lis := testutils.NewListenerWrapper(t, nil) - mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerAndListener(t, lis) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerAndListenerWithResourceCheck(ctx, t, lis) // Grab the wrapped connection from the listener wrapper. This will be used // to verify the connection is closed. 
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() val, err := lis.NewConnCh.Receive(ctx) if err != nil { t.Fatalf("Failed to receive new connection from wrapped listener: %v", err) @@ -952,11 +1024,11 @@ func (s) TestResolverError(t *testing.T) { // - when the cluster resource is re-sent by the management server, RPCs // should start succeeding. func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) { - mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t) - - // Verify that the specified cluster resource is requested. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerWithResourceCheck(ctx, t) + + // Verify that the specified cluster resource is requested. wantNames := []string{clusterName} if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil { t.Fatal(err) From 916870d6354b380286612c15effe24fd56b29295 Mon Sep 17 00:00:00 2001 From: Eshita Chandwani Date: Tue, 5 Aug 2025 19:55:00 +0530 Subject: [PATCH 03/11] changes --- .../cdsbalancer/aggregate_cluster_test.go | 20 +++---- .../balancer/cdsbalancer/cdsbalancer_test.go | 57 ++++++++----------- 2 files changed, 34 insertions(+), 43 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go b/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go index 770fc2afb043..1ef88230f9c3 100644 --- a/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go +++ b/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go @@ -131,7 +131,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, 
_, _ := setupWithManagementServer(t) // Push the first cluster resource through the management server and // verify the configuration pushed to the child policy. @@ -174,7 +174,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) { // contains the expected discovery mechanisms. func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t) // Configure the management server with the aggregate cluster resource // pointing to two child clusters, one EDS and one LogicalDNS. Include the @@ -281,7 +281,7 @@ func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { // policy contains a single discovery mechanism. func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t) // Configure the management server with the aggregate cluster resource // pointing to two child clusters. @@ -356,7 +356,7 @@ func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) { // discovery mechanisms. func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t) // Start off with the requested cluster being a leaf EDS cluster. resources := e2e.UpdateOptions{ @@ -450,7 +450,7 @@ func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T // longer exceed maximum depth, but be at the maximum allowed depth, and // verifies that an RPC can be made successfully. 
func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { - mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t) resources := e2e.UpdateOptions{ NodeID: nodeID, @@ -538,7 +538,7 @@ func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { // pushed only after all child clusters are resolved. func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t) // Configure the management server with an aggregate cluster resource having // a diamond dependency pattern, (A->[B,C]; B->D; C->D). Includes resources @@ -605,7 +605,7 @@ func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { // pushed only after all child clusters are resolved. func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t) // Configure the management server with an aggregate cluster resource that // has duplicates in the graph, (A->[B, C]; B->[C, D]). Include resources @@ -683,7 +683,7 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { // child policy and that an RPC can be successfully made. func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t) const ( clusterNameA = clusterName // cluster name in cds LB policy config @@ -768,7 +768,7 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) { // that the aggregate cluster graph has no leaf clusters. 
func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t) const ( clusterNameA = clusterName // cluster name in cds LB policy config @@ -816,7 +816,7 @@ func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) { // child policy and RPCs should get routed to that leaf cluster. func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t) // Start a test service backend. server := stubserver.StartTestService(t, nil) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index aa04752acc69..f5abb104936d 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -195,42 +195,17 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer { // - the grpc channel to the test backend service // - the manual resolver configured on the channel // - the xDS client used the grpc channel -// - a channel on which requested cluster resource names are sent -// - a channel used to signal that previously requested cluster resources are -// no longer requested -func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) { +func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { return setupWithManagementServerAndListener(t, nil) } // Same as setupWithManagementServer, but also allows the caller to specify // a listener to be used by the management server. 
-func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) { +func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { t.Helper() - // If more than 10 requests are received and no reader is available, - // additional requests will be dropped due to the use of a non-blocking - // send using select with a default case. - cdsResourceRequestedCh := make(chan []string, 10) - cdsResourceCanceledCh := make(chan struct{}, 1) mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ Listener: lis, - OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { - if req.GetTypeUrl() == version.V3ClusterURL { - switch len(req.GetResourceNames()) { - case 0: - select { - case cdsResourceCanceledCh <- struct{}{}: - default: - } - default: - select { - case cdsResourceRequestedCh <- req.GetResourceNames(): - default: - } - } - } - return nil - }, // Required for aggregate clusters as all resources cannot be requested // at once. AllowResourceSubset: true, @@ -271,14 +246,30 @@ func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e. 
cc.Connect() t.Cleanup(func() { cc.Close() }) - return mgmtServer, nodeID, cc, r, xdsC, cdsResourceRequestedCh, cdsResourceCanceledCh + return mgmtServer, nodeID, cc, r, xdsC } +// Performs the following setup required for tests: +// - Spins up an xDS management server +// - Creates an xDS client talking to this management server +// - Creates a manual resolver that configures the cds LB policy as the +// top-level policy, and pushes an initial configuration to it +// - Creates a gRPC channel with the above manual resolver +// +// Returns the following: +// - the xDS management server +// - the nodeID expected by the management server +// - the grpc channel to the test backend service +// - the manual resolver configured on the channel +// - the xDS client used the grpc channel +// - a channel on which requested cluster resource names are sent +// - a channel used to signal that previously requested cluster resources are +// no longer requested func setupWithManagementServerWithResourceCheck(ctx context.Context, t *testing.T) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) { return setupWithManagementServerAndListenerWithResourceCheck(ctx, t, nil) } -// Same as setupWithManagementServer, but also allows the caller to specify +// Same as setupWithManagementServerWithResourceCheck, but also allows the caller to specify // a listener to be used by the management server. 
func setupWithManagementServerAndListenerWithResourceCheck(ctx context.Context, t *testing.T, lis net.Listener) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) { t.Helper() @@ -691,7 +682,7 @@ func (s) TestClusterUpdate_Success(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -715,7 +706,7 @@ func (s) TestClusterUpdate_Success(t *testing.T) { // balancing configuration pushed to the child is as expected. func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t) clusterResource := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ ClusterName: clusterName, @@ -1103,7 +1094,7 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) { func (s) TestClose(t *testing.T) { cdsBalancerCh := registerWrappedCDSPolicy(t) _, _, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t) // Start a test service backend. server := stubserver.StartTestService(t, nil) @@ -1150,7 +1141,7 @@ func (s) TestClose(t *testing.T) { func (s) TestExitIdle(t *testing.T) { cdsBalancerCh := registerWrappedCDSPolicy(t) _, _, exitIdleCh, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t) // Start a test service backend. 
server := stubserver.StartTestService(t, nil) From 6ab5c5e197be53e54b4daafdd7471f4282317e82 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Thu, 14 Aug 2025 01:51:26 +0530 Subject: [PATCH 04/11] change test --- .../cdsbalancer/aggregate_cluster_test.go | 16 ++- .../balancer/cdsbalancer/cdsbalancer_test.go | 111 ++++++++++++------ 2 files changed, 87 insertions(+), 40 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go b/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go index 1ef88230f9c3..79fc4432ab76 100644 --- a/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go +++ b/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go @@ -35,9 +35,11 @@ import ( "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/balancer/clusterresolver" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" ) @@ -874,7 +876,19 @@ func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) { func (s) TestWatchers(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - mgmtServer, nodeID, _, _, _, cdsResourceRequestedCh, _ := setupWithManagementServerWithResourceCheck(ctx, t) + cdsResourceRequestedCh := make(chan []string, 1) + onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3ClusterURL { + if len(req.GetResourceNames()) > 0 { + select { + case cdsResourceRequestedCh <- req.GetResourceNames(): + case <-ctx.Done(): + } + } + } + return nil + } + mgmtServer, nodeID, _, _, _ := 
setupWithManagementServerWithResourceCheck(ctx, t, onStreamReq) const ( clusterA = clusterName diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index f5abb104936d..ac5416cf67fb 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -88,6 +88,7 @@ func waitForResourceNames(ctx context.Context, resourceNamesCh chan []string, wa case gotNames := <-resourceNamesCh: // Sort both slices before comparing them, as the order of clusters // does not matter. + fmt.Printf("eshita : gotNames: %v, wantNames: %v", gotNames, wantNames) if cmp.Equal(gotNames, wantNames, cmpopts.SortSlices(func(a, b string) bool { return a < b })) { return nil } @@ -265,35 +266,17 @@ func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e. // - a channel on which requested cluster resource names are sent // - a channel used to signal that previously requested cluster resources are // no longer requested -func setupWithManagementServerWithResourceCheck(ctx context.Context, t *testing.T) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) { - return setupWithManagementServerAndListenerWithResourceCheck(ctx, t, nil) +func setupWithManagementServerWithResourceCheck(ctx context.Context, t *testing.T, nStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { + return setupWithManagementServerAndListenerWithResourceCheck(ctx, t, nil, nStreamRequest) } // Same as setupWithManagementServerWithResourceCheck, but also allows the caller to specify // a listener to be used by the management server. 
-func setupWithManagementServerAndListenerWithResourceCheck(ctx context.Context, t *testing.T, lis net.Listener) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) { +func setupWithManagementServerAndListenerWithResourceCheck(ctx context.Context, t *testing.T, lis net.Listener, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { t.Helper() - cdsResourceRequestedCh := make(chan []string, 1) - cdsResourceCanceledCh := make(chan struct{}, 1) mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ - Listener: lis, - OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { - if req.GetTypeUrl() == version.V3ClusterURL { - switch len(req.GetResourceNames()) { - case 0: - select { - case cdsResourceCanceledCh <- struct{}{}: - default: - } - default: - select { - case cdsResourceRequestedCh <- req.GetResourceNames(): - case <-ctx.Done(): - } - } - } - return nil - }, + Listener: lis, + OnStreamRequest: onStreamRequest, // Required for aggregate clusters as all resources cannot be requested // at once. AllowResourceSubset: true, @@ -334,7 +317,7 @@ func setupWithManagementServerAndListenerWithResourceCheck(ctx context.Context, cc.Connect() t.Cleanup(func() { cc.Close() }) - return mgmtServer, nodeID, cc, r, xdsC, cdsResourceRequestedCh, cdsResourceCanceledCh + return mgmtServer, nodeID, cc, r, xdsC } // Helper function to compare the load balancing configuration received on the @@ -386,10 +369,29 @@ func verifyRPCError(gotErr error, wantCode codes.Code, wantErr, wantNodeID strin // configuration again, it does not send out a new request, and when the // configuration changes, it stops requesting the old cluster resource and // starts requesting the new one. 
-func (s) TestConfigurationUpdate_Success(t *testing.T) { +func TestConfigurationUpdate_Success(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - _, _, _, r, xdsClient, cdsResourceRequestedCh, _ := setupWithManagementServerWithResourceCheck(ctx, t) + cdsResourceCanceledCh := make(chan struct{}, 1) + cdsResourceRequestedCh := make(chan []string, 1) + onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3ClusterURL { + switch len(req.GetResourceNames()) { + case 0: + select { + case cdsResourceCanceledCh <- struct{}{}: + default: + } + default: + select { + case cdsResourceRequestedCh <- req.GetResourceNames(): + case <-ctx.Done(): + } + } + } + return nil + } + _, _, _, r, xdsClient := setupWithManagementServerWithResourceCheck(ctx, t, onStreamReq) // Verify that the specified cluster resource is requested. wantNames := []string{clusterName} @@ -757,13 +759,19 @@ func (s) TestClusterUpdate_Failure(t *testing.T) { _, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerWithResourceCheck(ctx, t) - - // Verify that the specified cluster resource is requested. 
- wantNames := []string{clusterName} - if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil { - t.Fatal(err) + cdsResourceCanceledCh := make(chan struct{}, 1) + onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3ClusterURL { + if len(req.GetResourceNames()) == 0 { + select { + case cdsResourceCanceledCh <- struct{}{}: + default: + } + } + } + return nil } + mgmtServer, nodeID, cc, _, _ := setupWithManagementServerWithResourceCheck(ctx, t, onStreamReq) // Configure the management server to return a cluster resource that // contains a config_source_specifier for the `lrs_server` field which is not @@ -874,7 +882,26 @@ func (s) TestResolverError(t *testing.T) { lis := testutils.NewListenerWrapper(t, nil) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerAndListenerWithResourceCheck(ctx, t, lis) + cdsResourceCanceledCh := make(chan struct{}, 1) + cdsResourceRequestedCh := make(chan []string, 1) + onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3ClusterURL { + switch len(req.GetResourceNames()) { + case 0: + select { + case cdsResourceCanceledCh <- struct{}{}: + default: + } + default: + select { + case cdsResourceRequestedCh <- req.GetResourceNames(): + case <-ctx.Done(): + } + } + } + return nil + } + mgmtServer, nodeID, cc, r, _ := setupWithManagementServerAndListenerWithResourceCheck(ctx, t, lis, onStreamReq) // Grab the wrapped connection from the listener wrapper. This will be used // to verify the connection is closed. 
@@ -1017,13 +1044,19 @@ func (s) TestResolverError(t *testing.T) { func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerWithResourceCheck(ctx, t) - - // Verify that the specified cluster resource is requested. - wantNames := []string{clusterName} - if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil { - t.Fatal(err) + cdsResourceCanceledCh := make(chan struct{}, 1) + onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3ClusterURL { + if len(req.GetResourceNames()) == 0 { + select { + case cdsResourceCanceledCh <- struct{}{}: + default: + } + } + } + return nil } + mgmtServer, nodeID, cc, _, _ := setupWithManagementServerWithResourceCheck(ctx, t, onStreamReq) // Start a test service backend. server := stubserver.StartTestService(t, nil) From ba6af81d6864c3bc76f10b238161eecebe4621ad Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Thu, 14 Aug 2025 02:20:06 +0530 Subject: [PATCH 05/11] remove ctx --- .../cdsbalancer/aggregate_cluster_test.go | 2 +- .../balancer/cdsbalancer/cdsbalancer_test.go | 26 ++++++------------- 2 files changed, 9 insertions(+), 19 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go b/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go index 79fc4432ab76..e96db537a346 100644 --- a/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go +++ b/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go @@ -888,7 +888,7 @@ func (s) TestWatchers(t *testing.T) { } return nil } - mgmtServer, nodeID, _, _, _ := setupWithManagementServerWithResourceCheck(ctx, t, onStreamReq) + mgmtServer, nodeID, _, _, _ := setupWithManagementServerWithResourceCheck(t, onStreamReq) const ( clusterA = clusterName diff --git 
a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index ac5416cf67fb..0300cb76da92 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -263,16 +263,13 @@ func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e. // - the grpc channel to the test backend service // - the manual resolver configured on the channel // - the xDS client used the grpc channel -// - a channel on which requested cluster resource names are sent -// - a channel used to signal that previously requested cluster resources are -// no longer requested -func setupWithManagementServerWithResourceCheck(ctx context.Context, t *testing.T, nStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { - return setupWithManagementServerAndListenerWithResourceCheck(ctx, t, nil, nStreamRequest) +func setupWithManagementServerWithResourceCheck(t *testing.T, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { + return setupWithManagementServerAndListenerWithResourceCheck(t, nil, onStreamRequest) } // Same as setupWithManagementServerWithResourceCheck, but also allows the caller to specify // a listener to be used by the management server. 
-func setupWithManagementServerAndListenerWithResourceCheck(ctx context.Context, t *testing.T, lis net.Listener, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { +func setupWithManagementServerAndListenerWithResourceCheck(t *testing.T, lis net.Listener, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { t.Helper() mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ Listener: lis, @@ -372,17 +369,10 @@ func verifyRPCError(gotErr error, wantCode codes.Code, wantErr, wantNodeID strin func TestConfigurationUpdate_Success(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - cdsResourceCanceledCh := make(chan struct{}, 1) cdsResourceRequestedCh := make(chan []string, 1) onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error { if req.GetTypeUrl() == version.V3ClusterURL { - switch len(req.GetResourceNames()) { - case 0: - select { - case cdsResourceCanceledCh <- struct{}{}: - default: - } - default: + if len(req.GetResourceNames()) > 0 { select { case cdsResourceRequestedCh <- req.GetResourceNames(): case <-ctx.Done(): @@ -391,7 +381,7 @@ func TestConfigurationUpdate_Success(t *testing.T) { } return nil } - _, _, _, r, xdsClient := setupWithManagementServerWithResourceCheck(ctx, t, onStreamReq) + _, _, _, r, xdsClient := setupWithManagementServerWithResourceCheck(t, onStreamReq) // Verify that the specified cluster resource is requested. 
wantNames := []string{clusterName} @@ -771,7 +761,7 @@ func (s) TestClusterUpdate_Failure(t *testing.T) { } return nil } - mgmtServer, nodeID, cc, _, _ := setupWithManagementServerWithResourceCheck(ctx, t, onStreamReq) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServerWithResourceCheck(t, onStreamReq) // Configure the management server to return a cluster resource that // contains a config_source_specifier for the `lrs_server` field which is not @@ -901,7 +891,7 @@ func (s) TestResolverError(t *testing.T) { } return nil } - mgmtServer, nodeID, cc, r, _ := setupWithManagementServerAndListenerWithResourceCheck(ctx, t, lis, onStreamReq) + mgmtServer, nodeID, cc, r, _ := setupWithManagementServerAndListenerWithResourceCheck(t, lis, onStreamReq) // Grab the wrapped connection from the listener wrapper. This will be used // to verify the connection is closed. @@ -1056,7 +1046,7 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) { } return nil } - mgmtServer, nodeID, cc, _, _ := setupWithManagementServerWithResourceCheck(ctx, t, onStreamReq) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServerWithResourceCheck(t, onStreamReq) // Start a test service backend. server := stubserver.StartTestService(t, nil) From 2974342ce48bab0cfc058a57c8018f838f16599b Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Thu, 14 Aug 2025 02:24:54 +0530 Subject: [PATCH 06/11] add comment --- xds/internal/balancer/cdsbalancer/cdsbalancer_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 0300cb76da92..2e948bf2d59c 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -256,6 +256,7 @@ func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e. 
// - Creates a manual resolver that configures the cds LB policy as the // top-level policy, and pushes an initial configuration to it // - Creates a gRPC channel with the above manual resolver +// - Executes OnStreamRequest callback provided by the caller // // Returns the following: // - the xDS management server From e74f6d15252da45f4a669c550115c3266b52f908 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Thu, 14 Aug 2025 02:39:08 +0530 Subject: [PATCH 07/11] change from to 2 function --- .../cdsbalancer/aggregate_cluster_test.go | 22 ++--- .../balancer/cdsbalancer/cdsbalancer_test.go | 93 +++---------------- 2 files changed, 24 insertions(+), 91 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go b/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go index e96db537a346..c86e73ca1c63 100644 --- a/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go +++ b/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go @@ -133,7 +133,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) // Push the first cluster resource through the management server and // verify the configuration pushed to the child policy. @@ -176,7 +176,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) { // contains the expected discovery mechanisms. func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) // Configure the management server with the aggregate cluster resource // pointing to two child clusters, one EDS and one LogicalDNS. 
Include the @@ -283,7 +283,7 @@ func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { // policy contains a single discovery mechanism. func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) // Configure the management server with the aggregate cluster resource // pointing to two child clusters. @@ -358,7 +358,7 @@ func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) { // discovery mechanisms. func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) // Start off with the requested cluster being a leaf EDS cluster. resources := e2e.UpdateOptions{ @@ -452,7 +452,7 @@ func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T // longer exceed maximum depth, but be at the maximum allowed depth, and // verifies that an RPC can be made successfully. func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil) resources := e2e.UpdateOptions{ NodeID: nodeID, @@ -540,7 +540,7 @@ func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { // pushed only after all child clusters are resolved. 
func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) // Configure the management server with an aggregate cluster resource having // a diamond dependency pattern, (A->[B,C]; B->D; C->D). Includes resources @@ -607,7 +607,7 @@ func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { // pushed only after all child clusters are resolved. func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) // Configure the management server with an aggregate cluster resource that // has duplicates in the graph, (A->[B, C]; B->[C, D]). Include resources @@ -685,7 +685,7 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { // child policy and that an RPC can be successfully made. func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil) const ( clusterNameA = clusterName // cluster name in cds LB policy config @@ -770,7 +770,7 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) { // that the aggregate cluster graph has no leaf clusters. 
func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil) const ( clusterNameA = clusterName // cluster name in cds LB policy config @@ -818,7 +818,7 @@ func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) { // child policy and RPCs should get routed to that leaf cluster. func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil) // Start a test service backend. server := stubserver.StartTestService(t, nil) @@ -888,7 +888,7 @@ func (s) TestWatchers(t *testing.T) { } return nil } - mgmtServer, nodeID, _, _, _ := setupWithManagementServerWithResourceCheck(t, onStreamReq) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, onStreamReq) const ( clusterA = clusterName diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 2e948bf2d59c..a18635e4eb17 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -183,73 +183,6 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer { return cdsBalancerCh } -// Performs the following setup required for tests: -// - Spins up an xDS management server -// - Creates an xDS client talking to this management server -// - Creates a manual resolver that configures the cds LB policy as the -// top-level policy, and pushes an initial configuration to it -// - Creates a gRPC channel with the above manual resolver -// -// Returns the following: -// - the xDS management server -// - the nodeID expected by the management server -// - the grpc channel to the test backend 
service -// - the manual resolver configured on the channel -// - the xDS client used the grpc channel -func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { - return setupWithManagementServerAndListener(t, nil) -} - -// Same as setupWithManagementServer, but also allows the caller to specify -// a listener to be used by the management server. -func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { - t.Helper() - - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ - Listener: lis, - // Required for aggregate clusters as all resources cannot be requested - // at once. - AllowResourceSubset: true, - }) - - // Create bootstrap configuration pointing to the above management server. - nodeID := uuid.New().String() - bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) - - config, err := bootstrap.NewConfigFromContents(bc) - if err != nil { - t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bc), err) - } - pool := xdsclient.NewPool(config) - xdsC, xdsClose, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{ - Name: t.Name(), - }) - if err != nil { - t.Fatalf("Failed to create xDS client: %v", err) - } - t.Cleanup(xdsClose) - - r := manual.NewBuilderWithScheme("whatever") - jsonSC := fmt.Sprintf(`{ - "loadBalancingConfig":[{ - "cds_experimental":{ - "cluster": "%s" - } - }] - }`, clusterName) - scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) - r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsC)) - - cc, err := grpc.NewClient(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) - if err != nil { - t.Fatalf("grpc.NewClient(%q) = %v", lis.Addr().String(), err) - } - cc.Connect() - t.Cleanup(func() { 
cc.Close() }) - - return mgmtServer, nodeID, cc, r, xdsC -} - // Performs the following setup required for tests: // - Spins up an xDS management server // - Creates an xDS client talking to this management server @@ -264,13 +197,13 @@ func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e. // - the grpc channel to the test backend service // - the manual resolver configured on the channel // - the xDS client used the grpc channel -func setupWithManagementServerWithResourceCheck(t *testing.T, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { - return setupWithManagementServerAndListenerWithResourceCheck(t, nil, onStreamRequest) +func setupWithManagementServer(t *testing.T, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { + return setupWithManagementServerAndListener(t, nil, onStreamRequest) } -// Same as setupWithManagementServerWithResourceCheck, but also allows the caller to specify +// Same as setupWithManagementServer, but also allows the caller to specify // a listener to be used by the management server. 
-func setupWithManagementServerAndListenerWithResourceCheck(t *testing.T, lis net.Listener, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { +func setupWithManagementServerAndListener(t *testing.T, lis net.Listener, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { t.Helper() mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ Listener: lis, @@ -382,7 +315,7 @@ func TestConfigurationUpdate_Success(t *testing.T) { } return nil } - _, _, _, r, xdsClient := setupWithManagementServerWithResourceCheck(t, onStreamReq) + _, _, _, r, xdsClient := setupWithManagementServer(t, onStreamReq) // Verify that the specified cluster resource is requested. wantNames := []string{clusterName} @@ -530,7 +463,7 @@ func (s) TestConfigurationUpdate_MissingXdsClient(t *testing.T) { // Tests success scenarios where the cds LB policy receives a cluster resource // from the management server. Verifies that the load balancing configuration // pushed to the child is as expected. -func (s) TestClusterUpdate_Success(t *testing.T) { +func TestClusterUpdate_Success(t *testing.T) { tests := []struct { name string clusterResource *v3clusterpb.Cluster @@ -675,7 +608,7 @@ func (s) TestClusterUpdate_Success(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -699,7 +632,7 @@ func (s) TestClusterUpdate_Success(t *testing.T) { // balancing configuration pushed to the child is as expected. 
func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) clusterResource := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ ClusterName: clusterName, @@ -762,7 +695,7 @@ func (s) TestClusterUpdate_Failure(t *testing.T) { } return nil } - mgmtServer, nodeID, cc, _, _ := setupWithManagementServerWithResourceCheck(t, onStreamReq) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, onStreamReq) // Configure the management server to return a cluster resource that // contains a config_source_specifier for the `lrs_server` field which is not @@ -892,7 +825,7 @@ func (s) TestResolverError(t *testing.T) { } return nil } - mgmtServer, nodeID, cc, r, _ := setupWithManagementServerAndListenerWithResourceCheck(t, lis, onStreamReq) + mgmtServer, nodeID, cc, r, _ := setupWithManagementServerAndListener(t, lis, onStreamReq) // Grab the wrapped connection from the listener wrapper. This will be used // to verify the connection is closed. @@ -1047,7 +980,7 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) { } return nil } - mgmtServer, nodeID, cc, _, _ := setupWithManagementServerWithResourceCheck(t, onStreamReq) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, onStreamReq) // Start a test service backend. server := stubserver.StartTestService(t, nil) @@ -1118,7 +1051,7 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) { func (s) TestClose(t *testing.T) { cdsBalancerCh := registerWrappedCDSPolicy(t) _, _, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil) // Start a test service backend. 
server := stubserver.StartTestService(t, nil) @@ -1165,7 +1098,7 @@ func (s) TestClose(t *testing.T) { func (s) TestExitIdle(t *testing.T) { cdsBalancerCh := registerWrappedCDSPolicy(t) _, _, exitIdleCh, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil) // Start a test service backend. server := stubserver.StartTestService(t, nil) From 2c3d63bb5037b6edd3405ab68b0f915f1d174344 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Thu, 14 Aug 2025 02:50:22 +0530 Subject: [PATCH 08/11] make only one test --- .../cdsbalancer/aggregate_cluster_test.go | 22 +++++++-------- .../balancer/cdsbalancer/cdsbalancer_test.go | 27 +++++++------------ 2 files changed, 21 insertions(+), 28 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go b/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go index c86e73ca1c63..4db998b4c543 100644 --- a/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go +++ b/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go @@ -133,7 +133,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) // Push the first cluster resource through the management server and // verify the configuration pushed to the child policy. @@ -176,7 +176,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) { // contains the expected discovery mechanisms. 
func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) // Configure the management server with the aggregate cluster resource // pointing to two child clusters, one EDS and one LogicalDNS. Include the @@ -283,7 +283,7 @@ func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { // policy contains a single discovery mechanism. func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) // Configure the management server with the aggregate cluster resource // pointing to two child clusters. @@ -358,7 +358,7 @@ func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) { // discovery mechanisms. func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) // Start off with the requested cluster being a leaf EDS cluster. resources := e2e.UpdateOptions{ @@ -452,7 +452,7 @@ func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T // longer exceed maximum depth, but be at the maximum allowed depth, and // verifies that an RPC can be made successfully. 
func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) resources := e2e.UpdateOptions{ NodeID: nodeID, @@ -540,7 +540,7 @@ func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { // pushed only after all child clusters are resolved. func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) // Configure the management server with an aggregate cluster resource having // a diamond dependency pattern, (A->[B,C]; B->D; C->D). Includes resources @@ -607,7 +607,7 @@ func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { // pushed only after all child clusters are resolved. func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) // Configure the management server with an aggregate cluster resource that // has duplicates in the graph, (A->[B, C]; B->[C, D]). Include resources @@ -685,7 +685,7 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { // child policy and that an RPC can be successfully made. 
func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) const ( clusterNameA = clusterName // cluster name in cds LB policy config @@ -770,7 +770,7 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) { // that the aggregate cluster graph has no leaf clusters. func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) const ( clusterNameA = clusterName // cluster name in cds LB policy config @@ -818,7 +818,7 @@ func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) { // child policy and RPCs should get routed to that leaf cluster. func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) // Start a test service backend. 
server := stubserver.StartTestService(t, nil) @@ -888,7 +888,7 @@ func (s) TestWatchers(t *testing.T) { } return nil } - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, onStreamReq) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, onStreamReq) const ( clusterA = clusterName diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index a18635e4eb17..8c08a03847a1 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -88,7 +88,6 @@ func waitForResourceNames(ctx context.Context, resourceNamesCh chan []string, wa case gotNames := <-resourceNamesCh: // Sort both slices before comparing them, as the order of clusters // does not matter. - fmt.Printf("eshita : gotNames: %v, wantNames: %v", gotNames, wantNames) if cmp.Equal(gotNames, wantNames, cmpopts.SortSlices(func(a, b string) bool { return a < b })) { return nil } @@ -197,13 +196,7 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer { // - the grpc channel to the test backend service // - the manual resolver configured on the channel // - the xDS client used the grpc channel -func setupWithManagementServer(t *testing.T, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { - return setupWithManagementServerAndListener(t, nil, onStreamRequest) -} - -// Same as setupWithManagementServer, but also allows the caller to specify -// a listener to be used by the management server. 
-func setupWithManagementServerAndListener(t *testing.T, lis net.Listener, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { +func setupWithManagementServer(t *testing.T, lis net.Listener, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { t.Helper() mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ Listener: lis, @@ -300,7 +293,7 @@ func verifyRPCError(gotErr error, wantCode codes.Code, wantErr, wantNodeID strin // configuration again, it does not send out a new request, and when the // configuration changes, it stops requesting the old cluster resource and // starts requesting the new one. -func TestConfigurationUpdate_Success(t *testing.T) { +func (s) TestConfigurationUpdate_Success(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() cdsResourceRequestedCh := make(chan []string, 1) @@ -315,7 +308,7 @@ func TestConfigurationUpdate_Success(t *testing.T) { } return nil } - _, _, _, r, xdsClient := setupWithManagementServer(t, onStreamReq) + _, _, _, r, xdsClient := setupWithManagementServer(t, nil, onStreamReq) // Verify that the specified cluster resource is requested. wantNames := []string{clusterName} @@ -608,7 +601,7 @@ func TestClusterUpdate_Success(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -632,7 +625,7 @@ func TestClusterUpdate_Success(t *testing.T) { // balancing configuration pushed to the child is as expected. 
func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) clusterResource := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ ClusterName: clusterName, @@ -695,7 +688,7 @@ func (s) TestClusterUpdate_Failure(t *testing.T) { } return nil } - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, onStreamReq) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, onStreamReq) // Configure the management server to return a cluster resource that // contains a config_source_specifier for the `lrs_server` field which is not @@ -825,7 +818,7 @@ func (s) TestResolverError(t *testing.T) { } return nil } - mgmtServer, nodeID, cc, r, _ := setupWithManagementServerAndListener(t, lis, onStreamReq) + mgmtServer, nodeID, cc, r, _ := setupWithManagementServer(t, lis, onStreamReq) // Grab the wrapped connection from the listener wrapper. This will be used // to verify the connection is closed. @@ -980,7 +973,7 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) { } return nil } - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, onStreamReq) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, onStreamReq) // Start a test service backend. server := stubserver.StartTestService(t, nil) @@ -1051,7 +1044,7 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) { func (s) TestClose(t *testing.T) { cdsBalancerCh := registerWrappedCDSPolicy(t) _, _, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) // Start a test service backend. 
server := stubserver.StartTestService(t, nil) @@ -1098,7 +1091,7 @@ func (s) TestClose(t *testing.T) { func (s) TestExitIdle(t *testing.T) { cdsBalancerCh := registerWrappedCDSPolicy(t) _, _, exitIdleCh, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) // Start a test service backend. server := stubserver.StartTestService(t, nil) From 2b471783bd3f2d3e7cae8a0da7c75954f437ae43 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Thu, 14 Aug 2025 02:57:34 +0530 Subject: [PATCH 09/11] add s --- xds/internal/balancer/cdsbalancer/cdsbalancer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 8c08a03847a1..e1844a385f9f 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -456,7 +456,7 @@ func (s) TestConfigurationUpdate_MissingXdsClient(t *testing.T) { // Tests success scenarios where the cds LB policy receives a cluster resource // from the management server. Verifies that the load balancing configuration // pushed to the child is as expected. 
-func TestClusterUpdate_Success(t *testing.T) { +func (s) TestClusterUpdate_Success(t *testing.T) { tests := []struct { name string clusterResource *v3clusterpb.Cluster From 715d076593364ececb3ac51bbf666538d9b52509 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Thu, 14 Aug 2025 13:10:20 +0530 Subject: [PATCH 10/11] remove defaults --- xds/internal/balancer/cdsbalancer/cdsbalancer_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index e1844a385f9f..dd4f0b4ced2d 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -183,7 +183,8 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer { } // Performs the following setup required for tests: -// - Spins up an xDS management server +// - Spins up an xDS management server and the provided onStreamRequest +// function is set to be called for every incoming request on the ADS stream. 
// - Creates an xDS client talking to this management server // - Creates a manual resolver that configures the cds LB policy as the // top-level policy, and pushes an initial configuration to it @@ -682,7 +683,7 @@ func (s) TestClusterUpdate_Failure(t *testing.T) { if len(req.GetResourceNames()) == 0 { select { case cdsResourceCanceledCh <- struct{}{}: - default: + case <-ctx.Done(): } } } @@ -807,7 +808,7 @@ func (s) TestResolverError(t *testing.T) { case 0: select { case cdsResourceCanceledCh <- struct{}{}: - default: + case <-ctx.Done(): } default: select { @@ -967,7 +968,7 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) { if len(req.GetResourceNames()) == 0 { select { case cdsResourceCanceledCh <- struct{}{}: - default: + case <-ctx.Done(): } } } From 97b05cec6dcedbbd99839d8a59c5b9f98c5ebf1e Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Mon, 18 Aug 2025 08:01:16 +0530 Subject: [PATCH 11/11] remove unwanted comment --- xds/internal/balancer/cdsbalancer/cdsbalancer_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index dd4f0b4ced2d..ac2324628056 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -189,7 +189,6 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer { // - Creates a manual resolver that configures the cds LB policy as the // top-level policy, and pushes an initial configuration to it // - Creates a gRPC channel with the above manual resolver -// - Executes OnStreamRequest callback provided by the caller // // Returns the following: // - the xDS management server