diff --git a/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go b/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go index f1dead24d5ce..4db998b4c543 100644 --- a/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go +++ b/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go @@ -35,9 +35,11 @@ import ( "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/balancer/clusterresolver" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" ) @@ -131,7 +133,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) // Push the first cluster resource through the management server and // verify the configuration pushed to the child policy. @@ -174,7 +176,7 @@ func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) { // contains the expected discovery mechanisms. func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) // Configure the management server with the aggregate cluster resource // pointing to two child clusters, one EDS and one LogicalDNS. 
Include the @@ -281,7 +283,7 @@ func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { // policy contains a single discovery mechanism. func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) // Configure the management server with the aggregate cluster resource // pointing to two child clusters. @@ -356,7 +358,7 @@ func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) { // discovery mechanisms. func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) // Start off with the requested cluster being a leaf EDS cluster. resources := e2e.UpdateOptions{ @@ -450,7 +452,7 @@ func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T // longer exceed maximum depth, but be at the maximum allowed depth, and // verifies that an RPC can be made successfully. func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { - mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) resources := e2e.UpdateOptions{ NodeID: nodeID, @@ -538,7 +540,7 @@ func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { // pushed only after all child clusters are resolved. 
func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) // Configure the management server with an aggregate cluster resource having // a diamond dependency pattern, (A->[B,C]; B->D; C->D). Includes resources @@ -605,7 +607,7 @@ func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { // pushed only after all child clusters are resolved. func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) // Configure the management server with an aggregate cluster resource that // has duplicates in the graph, (A->[B, C]; B->[C, D]). Include resources @@ -683,7 +685,7 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { // child policy and that an RPC can be successfully made. func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) const ( clusterNameA = clusterName // cluster name in cds LB policy config @@ -768,7 +770,7 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) { // that the aggregate cluster graph has no leaf clusters. 
func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) const ( clusterNameA = clusterName // cluster name in cds LB policy config @@ -816,7 +818,7 @@ func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) { // child policy and RPCs should get routed to that leaf cluster. func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) // Start a test service backend. server := stubserver.StartTestService(t, nil) @@ -872,10 +874,21 @@ func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) { // removed from the tree no longer has a watcher and the new cluster added has a // new watcher. 
func (s) TestWatchers(t *testing.T) { - mgmtServer, nodeID, _, _, _, cdsResourceRequestedCh, _ := setupWithManagementServer(t) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + cdsResourceRequestedCh := make(chan []string, 1) + onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3ClusterURL { + if len(req.GetResourceNames()) > 0 { + select { + case cdsResourceRequestedCh <- req.GetResourceNames(): + case <-ctx.Done(): + } + } + } + return nil + } + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, onStreamReq) const ( clusterA = clusterName diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index a152b78d7df5..ac2324628056 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -183,7 +183,8 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer { } // Performs the following setup required for tests: -// - Spins up an xDS management server +// - Spins up an xDS management server and the provided onStreamRequest +// function is set to be called for every incoming request on the ADS stream. 
// - Creates an xDS client talking to this management server // - Creates a manual resolver that configures the cds LB policy as the // top-level policy, and pushes an initial configuration to it @@ -195,39 +196,11 @@ func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer { // - the grpc channel to the test backend service // - the manual resolver configured on the channel // - the xDS client used the grpc channel -// - a channel on which requested cluster resource names are sent -// - a channel used to signal that previously requested cluster resources are -// no longer requested -func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) { - return setupWithManagementServerAndListener(t, nil) -} - -// Same as setupWithManagementServer, but also allows the caller to specify -// a listener to be used by the management server. -func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) { +func setupWithManagementServer(t *testing.T, lis net.Listener, onStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient) { t.Helper() - - cdsResourceRequestedCh := make(chan []string, 1) - cdsResourceCanceledCh := make(chan struct{}, 1) mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ - Listener: lis, - OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { - if req.GetTypeUrl() == version.V3ClusterURL { - switch len(req.GetResourceNames()) { - case 0: - select { - case cdsResourceCanceledCh <- struct{}{}: - default: - } - default: - select { - case cdsResourceRequestedCh <- req.GetResourceNames(): - default: - } - } - } - return nil - }, + Listener: lis, + OnStreamRequest: onStreamRequest, // 
Required for aggregate clusters as all resources cannot be requested // at once. AllowResourceSubset: true, @@ -268,7 +241,7 @@ func setupWithManagementServerAndListener(t *testing.T, lis net.Listener) (*e2e. cc.Connect() t.Cleanup(func() { cc.Close() }) - return mgmtServer, nodeID, cc, r, xdsC, cdsResourceRequestedCh, cdsResourceCanceledCh + return mgmtServer, nodeID, cc, r, xdsC } // Helper function to compare the load balancing configuration received on the @@ -321,11 +294,23 @@ func verifyRPCError(gotErr error, wantCode codes.Code, wantErr, wantNodeID strin // configuration changes, it stops requesting the old cluster resource and // starts requesting the new one. func (s) TestConfigurationUpdate_Success(t *testing.T) { - _, _, _, r, xdsClient, cdsResourceRequestedCh, _ := setupWithManagementServer(t) - - // Verify that the specified cluster resource is requested. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + cdsResourceRequestedCh := make(chan []string, 1) + onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3ClusterURL { + if len(req.GetResourceNames()) > 0 { + select { + case cdsResourceRequestedCh <- req.GetResourceNames(): + case <-ctx.Done(): + } + } + } + return nil + } + _, _, _, r, xdsClient := setupWithManagementServer(t, nil, onStreamReq) + + // Verify that the specified cluster resource is requested. 
wantNames := []string{clusterName} if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil { t.Fatal(err) @@ -616,7 +601,7 @@ func (s) TestClusterUpdate_Success(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -640,7 +625,7 @@ func (s) TestClusterUpdate_Success(t *testing.T) { // balancing configuration pushed to the child is as expected. func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, _, _, _ := setupWithManagementServer(t, nil, nil) clusterResource := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ ClusterName: clusterName, @@ -689,15 +674,21 @@ func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) { // continue using the previous good update. func (s) TestClusterUpdate_Failure(t *testing.T) { _, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t) - - // Verify that the specified cluster resource is requested. 
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - wantNames := []string{clusterName} - if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil { - t.Fatal(err) + cdsResourceCanceledCh := make(chan struct{}, 1) + onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3ClusterURL { + if len(req.GetResourceNames()) == 0 { + select { + case cdsResourceCanceledCh <- struct{}{}: + case <-ctx.Done(): + } + } + } + return nil } + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, onStreamReq) // Configure the management server to return a cluster resource that // contains a config_source_specifier for the `lrs_server` field which is not @@ -806,12 +797,31 @@ func (s) TestClusterUpdate_Failure(t *testing.T) { func (s) TestResolverError(t *testing.T) { _, resolverErrCh, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t) lis := testutils.NewListenerWrapper(t, nil) - mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServerAndListener(t, lis) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cdsResourceCanceledCh := make(chan struct{}, 1) + cdsResourceRequestedCh := make(chan []string, 1) + onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3ClusterURL { + switch len(req.GetResourceNames()) { + case 0: + select { + case cdsResourceCanceledCh <- struct{}{}: + case <-ctx.Done(): + } + default: + select { + case cdsResourceRequestedCh <- req.GetResourceNames(): + case <-ctx.Done(): + } + } + } + return nil + } + mgmtServer, nodeID, cc, r, _ := setupWithManagementServer(t, lis, onStreamReq) // Grab the wrapped connection from the listener wrapper. This will be used // to verify the connection is closed. 
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() val, err := lis.NewConnCh.Receive(ctx) if err != nil { t.Fatalf("Failed to receive new connection from wrapped listener: %v", err) @@ -949,15 +959,21 @@ func (s) TestResolverError(t *testing.T) { // - when the cluster resource is re-sent by the management server, RPCs // should start succeeding. func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) { - mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t) - - // Verify that the specified cluster resource is requested. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - wantNames := []string{clusterName} - if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil { - t.Fatal(err) + cdsResourceCanceledCh := make(chan struct{}, 1) + onStreamReq := func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() == version.V3ClusterURL { + if len(req.GetResourceNames()) == 0 { + select { + case cdsResourceCanceledCh <- struct{}{}: + case <-ctx.Done(): + } + } + } + return nil } + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, onStreamReq) // Start a test service backend. server := stubserver.StartTestService(t, nil) @@ -1028,7 +1044,7 @@ func (s) TestClusterUpdate_ResourceNotFound(t *testing.T) { func (s) TestClose(t *testing.T) { cdsBalancerCh := registerWrappedCDSPolicy(t) _, _, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) // Start a test service backend. 
server := stubserver.StartTestService(t, nil) @@ -1075,7 +1091,7 @@ func (s) TestClose(t *testing.T) { func (s) TestExitIdle(t *testing.T) { cdsBalancerCh := registerWrappedCDSPolicy(t) _, _, exitIdleCh, _ := registerWrappedClusterResolverPolicy(t) - mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t) + mgmtServer, nodeID, cc, _, _ := setupWithManagementServer(t, nil, nil) // Start a test service backend. server := stubserver.StartTestService(t, nil)