diff --git a/internal/xds/clients/lrsclient/lrsclient.go b/internal/xds/clients/lrsclient/lrsclient.go index 8f7e1852d4bf..3e4594957619 100644 --- a/internal/xds/clients/lrsclient/lrsclient.go +++ b/internal/xds/clients/lrsclient/lrsclient.go @@ -61,10 +61,7 @@ type LRSClient struct { // New returns a new LRS Client configured with the provided config. func New(config Config) (*LRSClient, error) { - switch { - case config.Node.ID == "": - return nil, errors.New("lrsclient: node ID in node is empty") - case config.TransportBuilder == nil: + if config.TransportBuilder == nil { return nil, errors.New("lrsclient: transport builder is nil") } diff --git a/internal/xds/xdsclient/clientimpl.go b/internal/xds/xdsclient/clientimpl.go index 9a73b96cf3de..dab08620ac1d 100644 --- a/internal/xds/xdsclient/clientimpl.go +++ b/internal/xds/xdsclient/clientimpl.go @@ -129,7 +129,21 @@ func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecor if err != nil { return nil, err } - c := &clientImpl{XDSClient: client, xdsClientConfig: gConfig, bootstrapConfig: config, target: target, refCount: 1} + lrsC, err := lrsclient.New(lrsclient.Config{ + Node: gConfig.Node, + TransportBuilder: gConfig.TransportBuilder, + }) + if err != nil { + return nil, err + } + c := &clientImpl{ + XDSClient: client, + xdsClientConfig: gConfig, + bootstrapConfig: config, + target: target, + refCount: 1, + lrsClient: lrsC, + } c.logger = prefixLogger(c) return c, nil } diff --git a/internal/xds/xdsclient/clientimpl_loadreport.go b/internal/xds/xdsclient/clientimpl_loadreport.go index 2cfe28d4a824..ffd0c90b8f54 100644 --- a/internal/xds/xdsclient/clientimpl_loadreport.go +++ b/internal/xds/xdsclient/clientimpl_loadreport.go @@ -32,18 +32,6 @@ import ( // // It returns a lrsclient.LoadStore for the user to report loads. func (c *clientImpl) ReportLoad(server *bootstrap.ServerConfig) (*lrsclient.LoadStore, func(context.Context)) { - if c.lrsClient == nil { - lrsC, err := lrsclient.New(lrsclient.Config{ - Node: c.xdsClientConfig.Node, - TransportBuilder: c.xdsClientConfig.TransportBuilder, - }) - if err != nil { - c.logger.Warningf("Failed to create an lrs client to the management server to report load: %v", server, err) - return nil, func(context.Context) {} - } - c.lrsClient = lrsC - } - load, err := c.lrsClient.ReportLoad(clients.ServerIdentifier{ ServerURI: server.ServerURI(), Extensions: grpctransport.ServerIdentifierExtension{ diff --git a/internal/xds/xdsclient/tests/loadreport_test.go b/internal/xds/xdsclient/tests/loadreport_test.go index 97ab891ab794..e33f3799cd8b 100644 --- a/internal/xds/xdsclient/tests/loadreport_test.go +++ b/internal/xds/xdsclient/tests/loadreport_test.go @@ -23,6 +23,7 @@ import ( "encoding/json" "fmt" "net" + "sync" "testing" "github.com/google/go-cmp/cmp" @@ -30,17 +31,24 @@ import ( "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/testutils/xds/fakeserver" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/internal/xds/clients" + "google.golang.org/grpc/internal/xds/xdsclient" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/status" "google.golang.org/protobuf/testing/protocmp" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/protobuf/types/known/durationpb" ) @@ -437,3 +445,113 @@ func (s) TestReportLoad_StreamCreation(t *testing.T) { defer sCancel3() cancel3(sCtx3) } + +// TestConcurrentReportLoad verifies that the client can safely handle concurrent +// requests to initiate load reporting streams. It launches multiple goroutines +// that all call client.ReportLoad simultaneously. +func (s) TestConcurrentReportLoad(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{SupportLoadReportingService: true}) + nodeID := uuid.New().String() + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + client := createXDSClient(t, bc) + + serverConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address}) + if err != nil { + t.Fatalf("Failed to create server config for testing: %v", err) + } + + // Call ReportLoad() concurrently from multiple go routines. + var wg sync.WaitGroup + const numGoroutines = 10 + wg.Add(numGoroutines) + for range numGoroutines { + go func() { + defer wg.Done() + _, cancelStore := client.ReportLoad(serverConfig) + defer cancelStore(ctx) + }() + } + wg.Wait() +} + +// TestConcurrentChannels verifies that we can create multiple gRPC channels +// concurrently with a shared XDSClient, each of which will create a new LRS +// stream without any race. +func (s) TestConcurrentChannels(t *testing.T) { + // TODO(emchandwani) : Unskip after https://github.com/grpc/grpc-go/pull/8526 gets merged. + t.Skip() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true, SupportLoadReportingService: true}) + + nodeID := uuid.New().String() + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + + if internal.NewXDSResolverWithPoolForTesting == nil { + t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil") + } + + config, err := bootstrap.NewConfigFromContents(bc) + if err != nil { + t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bc), err) + } + pool := xdsclient.NewPool(config) + + resolverBuilder := internal.NewXDSResolverWithPoolForTesting.(func(*xdsclient.Pool) (resolver.Builder, error)) + xdsResolver, err := resolverBuilder(pool) + if err != nil { + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } + + server := stubserver.StartTestService(t, nil) + defer server.Stop() + + // Configure the management server with resources that enable LRS. + const serviceName = "my-service-e2e-lrs-test" + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: "localhost", + Port: testutils.ParsePort(t, server.Address), + SecLevel: e2e.SecurityLevelNone, + }) + resources.Clusters[0].LrsServer = &v3corepb.ConfigSource{ + ConfigSourceSpecifier: &v3corepb.ConfigSource_Self{ + Self: &v3corepb.SelfConfigSource{}, + }, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + var wg sync.WaitGroup + const ( + numGoroutines = 10 + numRPCs = 10 + ) + for range numGoroutines { + wg.Add(1) + go func() { + defer wg.Done() + for range numRPCs { + cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolver)) + if err != nil { + t.Errorf("grpc.NewClient() failed: %v", err) + return + } + defer cc.Close() + + testClient := testgrpc.NewTestServiceClient(cc) + if _, err := testClient.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Errorf("EmptyCall() failed: %v", err) + return + } + } + }() + } + wg.Wait() +}