diff --git a/src/Discovery/src/Eureka/EurekaDiscoveryClient.cs b/src/Discovery/src/Eureka/EurekaDiscoveryClient.cs index e955b3c454..8076affe7a 100644 --- a/src/Discovery/src/Eureka/EurekaDiscoveryClient.cs +++ b/src/Discovery/src/Eureka/EurekaDiscoveryClient.cs @@ -373,20 +373,19 @@ internal async Task FetchRegistryAsync(bool doFullUpdate, CancellationToken canc private static ReadOnlyDictionary> ToServiceInstanceMap(ApplicationInfoCollection apps) { - // @formatter:wrap_chained_method_calls chop_always - // @formatter:wrap_before_first_method_call true - - return apps - .SelectMany(app => app.Instances) - .GroupBy(instance => instance.AppName, StringComparer.OrdinalIgnoreCase) - .ToDictionary(grouping => grouping.Key, grouping => (IReadOnlyList)grouping - .Select(instance => instance.ToServiceInstance()) - .ToList() - .AsReadOnly(), StringComparer.OrdinalIgnoreCase) - .AsReadOnly(); - - // @formatter:wrap_before_first_method_call restore - // @formatter:wrap_chained_method_calls restore + var dictionary = new Dictionary>(StringComparer.OrdinalIgnoreCase); + + foreach (string vipAddress in apps.VipInstanceMap.Keys.ToArray()) + { + ReadOnlyCollection instancesByVipAddress = apps.GetInstancesByVipAddress(vipAddress); + + if (instancesByVipAddress.Count > 0) + { + dictionary[vipAddress] = instancesByVipAddress.Select(instance => instance.ToServiceInstance()).ToList().AsReadOnly(); + } + } + + return new ReadOnlyDictionary>(dictionary); } private void RaiseFetchEvents(ApplicationsFetchedEventArgs? applicationsEventArgs, DiscoveryInstancesFetchedEventArgs? instancesEventArgs) diff --git a/src/Discovery/src/HttpClients/LoadBalancers/ServiceInstancesResolver.cs b/src/Discovery/src/HttpClients/LoadBalancers/ServiceInstancesResolver.cs index e1231d171d..79a97ddc46 100644 --- a/src/Discovery/src/HttpClients/LoadBalancers/ServiceInstancesResolver.cs +++ b/src/Discovery/src/HttpClients/LoadBalancers/ServiceInstancesResolver.cs @@ -83,6 +83,7 @@ public async Task> ResolveInstancesAsync(string serviceI if (instancesFromCache != null) { + instancesFromCache = RemoveDuplicatesByUri(instancesFromCache); LogReturningInstancesFromCache(instancesFromCache.Count); return instancesFromCache; } @@ -103,6 +104,8 @@ public async Task> ResolveInstancesAsync(string serviceI } } + instances = RemoveDuplicatesByUri(instances); + if (_distributedCache != null) { byte[] cacheValue = ToCacheValue(instances); @@ -112,6 +115,22 @@ public async Task> ResolveInstancesAsync(string serviceI return instances; } + private static List RemoveDuplicatesByUri(List instances) + { + var seenUris = new HashSet(StringComparer.OrdinalIgnoreCase); + var result = new List(); + + foreach (IServiceInstance instance in instances) + { + if (seenUris.Add(instance.Uri.AbsoluteUri)) + { + result.Add(instance); + } + } + + return result; + } + private static List? FromCacheValue(byte[]? cacheValue) { if (cacheValue is { Length: > 0 }) diff --git a/src/Discovery/test/Configuration.Test/ConfigurationDiscoveryClientTest.cs b/src/Discovery/test/Configuration.Test/ConfigurationDiscoveryClientTest.cs index 7fdf7aa3df..f83e5b555c 100644 --- a/src/Discovery/test/Configuration.Test/ConfigurationDiscoveryClientTest.cs +++ b/src/Discovery/test/Configuration.Test/ConfigurationDiscoveryClientTest.cs @@ -229,13 +229,13 @@ public async Task InstancesFetched_event_is_raised_after_configuration_change() await using WebApplication webApplication = builder.Build(); ConfigurationDiscoveryClient discoveryClient = webApplication.Services.GetServices().OfType().Single(); - int eventCount = 0; DiscoveryInstancesFetchedEventArgs? eventArgs = null; + int eventCount = 0; discoveryClient.InstancesFetched += (_, args) => { - eventCount++; eventArgs = args; + Interlocked.Increment(ref eventCount); }; fileProvider.ReplaceAppSettingsJsonFile(""" diff --git a/src/Discovery/test/Eureka.Test/EurekaDiscoveryClientTest.cs b/src/Discovery/test/Eureka.Test/EurekaDiscoveryClientTest.cs index 1891be394b..f98e97c2e0 100644 --- a/src/Discovery/test/Eureka.Test/EurekaDiscoveryClientTest.cs +++ b/src/Discovery/test/Eureka.Test/EurekaDiscoveryClientTest.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. +using System.Globalization; using System.Net; using FluentAssertions.Extensions; using Microsoft.AspNetCore.Builder; @@ -655,38 +656,127 @@ public async Task ApplicationEventsFireAfterFetch() webApplication.Services.GetRequiredService().Using(handler); var discoveryClient = webApplication.Services.GetRequiredService(); - int applicationsEventCount = 0; ApplicationsFetchedEventArgs? applicationsEventArgs = null; - int instancesEventCount = 0; + int applicationsEventCount = 0; DiscoveryInstancesFetchedEventArgs? instancesEventArgs = null; + int instancesEventCount = 0; discoveryClient.ApplicationsFetched += (_, args) => { - applicationsEventCount++; applicationsEventArgs = args; + Interlocked.Increment(ref applicationsEventCount); }; discoveryClient.InstancesFetched += (_, args) => { - instancesEventCount++; instancesEventArgs = args; + Interlocked.Increment(ref instancesEventCount); }; await discoveryClient.FetchRegistryAsync(true, TestContext.Current.CancellationToken); SpinWait.SpinUntil(() => applicationsEventCount == 1 && instancesEventCount == 1, 5.Seconds()).Should().BeTrue(); + applicationsEventArgs.Should().NotBeNull(); + InstanceInfo oldInstanceFromAppEvent = applicationsEventArgs.Applications.Should().ContainSingle().Which.Instances.Should().ContainSingle().Which; + oldInstanceFromAppEvent.ActionType.Should().Be(ActionType.Added); + + instancesEventArgs.Should().NotBeNull(); + IServiceInstance oldInstanceFromEvent = instancesEventArgs.InstancesByServiceId.Should().ContainKey("foo").WhoseValue.Should().ContainSingle().Which; + oldInstanceFromEvent.Uri.ToString().Should().Be("http://localhost:8080/"); + + IList oldInstancesFromGet = await discoveryClient.GetInstancesAsync("foo", TestContext.Current.CancellationToken); + oldInstancesFromGet.Should().ContainSingle().Which.Uri.Should().Be(oldInstanceFromEvent.Uri); + await discoveryClient.FetchRegistryAsync(false, TestContext.Current.CancellationToken); SpinWait.SpinUntil(() => applicationsEventCount == 2 && instancesEventCount == 2, 5.Seconds()).Should().BeTrue(); + InstanceInfo newInstanceFromAppEvent = applicationsEventArgs.Applications.Should().ContainSingle().Which.Instances.Should().ContainSingle().Which; + newInstanceFromAppEvent.ActionType.Should().Be(ActionType.Modified); + + IServiceInstance newInstanceFromEvent = instancesEventArgs.InstancesByServiceId.Should().ContainKey("foo").WhoseValue.Should().ContainSingle().Which; + newInstanceFromEvent.Uri.ToString().Should().Be("http://modified-host:8080/"); + + IList newInstancesFromGet = await discoveryClient.GetInstancesAsync("foo", TestContext.Current.CancellationToken); + newInstancesFromGet.Should().ContainSingle().Which.Uri.Should().Be(newInstanceFromEvent.Uri); + handler.Mock.VerifyNoOutstandingExpectation(); + } - applicationsEventArgs.Should().NotBeNull(); - InstanceInfo newInstanceInfo = applicationsEventArgs.Applications.Should().ContainSingle().Which.Instances.Should().ContainSingle().Which; - newInstanceInfo.ActionType.Should().Be(ActionType.Modified); + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task InstancesFetched_returns_same_data_as_GetInstancesAsync(bool filterOnlyUpInstances) + { + const string registryJson = """ + { + "applications": { + "application": [ + { + "name": "ignored", + "instance": [ + { + "instanceId": "id1", + "hostName": "h1", + "app": "app1", + "ipAddr": "10.0.0.1", + "status": "UP", + "dataCenterInfo": { + "@class": "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo", + "name": "MyOwn" + }, + "vipAddress": "vapp1" + }, + { + "instanceId": "id2", + "hostName": "h2", + "app": "app1", + "ipAddr": "10.0.0.2", + "status": "DOWN", + "dataCenterInfo": { + "@class": "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo", + "name": "MyOwn" + }, + "vipAddress": "vapp1" + } + ] + } + ] + } + } + """; - instancesEventArgs.Should().NotBeNull(); - IServiceInstance newServiceInstance = instancesEventArgs.InstancesByServiceId.Should().ContainKey("foo").WhoseValue.Should().ContainSingle().Which; - newServiceInstance.Uri.ToString().Should().Be("http://modified-host:8080/"); + var appSettings = new Dictionary + { + ["Eureka:Client:ShouldFetchRegistry"] = "false", + ["Eureka:Client:ShouldRegisterWithEureka"] = "false", + ["Eureka:Client:ShouldFilterOnlyUpInstances"] = filterOnlyUpInstances.ToString(CultureInfo.InvariantCulture) + }; + + WebApplicationBuilder builder = TestWebApplicationBuilderFactory.Create(); + builder.Configuration.AddInMemoryCollection(appSettings); + builder.Services.AddEurekaDiscoveryClient(); + + var handler = new DelegateToMockHttpClientHandler(); + handler.Mock.Expect(HttpMethod.Get, "http://localhost:8761/eureka/apps").Respond("application/json", registryJson); + + await using WebApplication webApplication = builder.Build(); + webApplication.Services.GetRequiredService().Using(handler); + + var discoveryClient = webApplication.Services.GetRequiredService(); + DiscoveryInstancesFetchedEventArgs? eventArgs = null; + discoveryClient.InstancesFetched += (_, args) => eventArgs = args; + + await discoveryClient.FetchRegistryAsync(true, TestContext.Current.CancellationToken); + SpinWait.SpinUntil(() => eventArgs != null, 5.Seconds()).Should().BeTrue(); + + eventArgs.Should().NotBeNull(); + + IList instancesFromGet = await discoveryClient.GetInstancesAsync("vapp1", TestContext.Current.CancellationToken); + IReadOnlyList instancesFromEvent = eventArgs.InstancesByServiceId.Should().ContainKey("vapp1").WhoseValue; + + instancesFromEvent.Should().BeEquivalentTo(instancesFromGet); + + handler.Mock.VerifyNoOutstandingExpectation(); } private sealed class ExtraRequestHeadersDelegatingHandler : DelegatingHandler diff --git a/src/Discovery/test/HttpClients.Test/LoadBalancers/RoundRobinLoadBalancerTest.cs b/src/Discovery/test/HttpClients.Test/LoadBalancers/RoundRobinLoadBalancerTest.cs index afb96b2624..c84ead2d37 100644 --- a/src/Discovery/test/HttpClients.Test/LoadBalancers/RoundRobinLoadBalancerTest.cs +++ b/src/Discovery/test/HttpClients.Test/LoadBalancers/RoundRobinLoadBalancerTest.cs @@ -112,6 +112,67 @@ public async Task ResolveServiceInstanceAsync_FindsService_ReturnsURI() result.Should().Be(new Uri("https://foundit:5555/test/bar/foo?test=1&test2=2")); } + [Fact] + public async Task ResolveServiceInstanceAsync_RemovesDuplicates_CaseInsensitive() + { + var client = new TestDiscoveryClient([ + new TestServiceInstance(new Uri("HTTPS://CASE-HOST:1234/")), + new TestServiceInstance(new Uri("https://case-host:1234/")) + ], "svc"); + + var resolver = new ServiceInstancesResolver([client], NullLogger.Instance); + var loadBalancer = new RoundRobinLoadBalancer(resolver, null, null, NullLogger.Instance); + + Uri first = await loadBalancer.ResolveServiceInstanceAsync(new Uri("https://svc/api"), TestContext.Current.CancellationToken); + Uri second = await loadBalancer.ResolveServiceInstanceAsync(new Uri("https://svc/api"), TestContext.Current.CancellationToken); + + first.Should().Be(second); + } + + [Fact] + public async Task ResolveServiceInstanceAsync_RemovesDuplicates_CaseInsensitive_MultipleDiscoveryClients() + { + var targetUri = new Uri("https://merged:1/"); + var clientA = new TestDiscoveryClient([new TestServiceInstance(targetUri)], "svc"); + var clientB = new TestDiscoveryClient([new TestServiceInstance(targetUri)], "svc"); + + var resolver = new ServiceInstancesResolver([ + clientA, + clientB + ], NullLogger.Instance); + + var loadBalancer = new RoundRobinLoadBalancer(resolver, null, null, NullLogger.Instance); + + Uri first = await loadBalancer.ResolveServiceInstanceAsync(new Uri("https://svc/api"), TestContext.Current.CancellationToken); + Uri second = await loadBalancer.ResolveServiceInstanceAsync(new Uri("https://svc/api"), TestContext.Current.CancellationToken); + + first.Should().Be(second); + } + + [Fact] + public async Task ResolveServiceInstanceAsync_RemovesDuplicates_Rotates() + { + var shared = new Uri("https://shared:100/"); + var other = new Uri("https://other:100/"); + + var client = new TestDiscoveryClient([ + new TestServiceInstance(shared), + new TestServiceInstance(shared), + new TestServiceInstance(other) + ], "svc"); + + var resolver = new ServiceInstancesResolver([client], NullLogger.Instance); + var loadBalancer = new RoundRobinLoadBalancer(resolver, null, null, NullLogger.Instance); + + Uri first = await loadBalancer.ResolveServiceInstanceAsync(new Uri("https://svc/api"), TestContext.Current.CancellationToken); + Uri second = await loadBalancer.ResolveServiceInstanceAsync(new Uri("https://svc/api"), TestContext.Current.CancellationToken); + Uri third = await loadBalancer.ResolveServiceInstanceAsync(new Uri("https://svc/api"), TestContext.Current.CancellationToken); + + first.Should().Be(new Uri(shared, "api")); + second.Should().Be(new Uri(other, "api")); + third.Should().Be(first); + } + [Fact] public async Task ResolveServiceInstanceAsync_SkipsOverThrowingDiscoveryClients() { @@ -134,15 +195,28 @@ public async Task ResolveServiceInstanceAsync_SkipsOverThrowingDiscoveryClients( [Fact] public async Task ResolveServiceInstanceAsync_CachesInstances() { - ConfigurationDiscoveryOptions options = CreateTestServiceInstances(); + var options = new ConfigurationDiscoveryOptions + { + Services = + { + new ConfigurationServiceInstance + { + ServiceId = "fruit-service", + Host = "before-reload", + Port = 8000, + IsSecure = true + } + } + }; + TestOptionsMonitor optionsMonitor = TestOptionsMonitor.Create(options); var client = new ConfigurationDiscoveryClient(optionsMonitor); IDistributedCache distributedCache = GetCache(); var resolver = new ServiceInstancesResolver([client], distributedCache, null, NullLogger.Instance); var loadBalancer = new RoundRobinLoadBalancer(resolver, null, null, NullLogger.Instance); - Uri fruitUri = await loadBalancer.ResolveServiceInstanceAsync(new Uri("https://fruit-service/api"), TestContext.Current.CancellationToken); - fruitUri.Should().Be("https://fruit-ball:8000/api"); + Uri first = await loadBalancer.ResolveServiceInstanceAsync(new Uri("https://fruit-service/api"), TestContext.Current.CancellationToken); + first.Should().Be("https://before-reload:8000/api"); optionsMonitor.Change(new ConfigurationDiscoveryOptions { @@ -151,15 +225,15 @@ public async Task ResolveServiceInstanceAsync_CachesInstances() new ConfigurationServiceInstance { ServiceId = "fruit-service", - Host = "CHANGED", + Host = "after-reload", Port = 8000, IsSecure = true } } }); - fruitUri = await loadBalancer.ResolveServiceInstanceAsync(new Uri("https://fruit-service/api"), TestContext.Current.CancellationToken); - fruitUri.Should().Be("https://fruit-ball:8000/api"); + Uri second = await loadBalancer.ResolveServiceInstanceAsync(new Uri("https://fruit-service/api"), TestContext.Current.CancellationToken); + second.Should().Be(first); } private static ConfigurationDiscoveryOptions CreateTestServiceInstances() @@ -251,9 +325,10 @@ private sealed class TestServiceInstance(Uri uri) : IServiceInstance public IReadOnlyDictionary Metadata => throw new NotImplementedException(); } - private sealed class TestDiscoveryClient(IServiceInstance? instance = null) : IDiscoveryClient + private sealed class TestDiscoveryClient(IList instances, string? serviceId = null) : IDiscoveryClient { - private readonly IServiceInstance? _instance = instance; + private readonly IList _instances = instances; + private readonly string? _serviceId = serviceId; public string Description => throw new NotImplementedException(); @@ -261,31 +336,34 @@ private sealed class TestDiscoveryClient(IServiceInstance? instance = null) : ID public event EventHandler? InstancesFetched; #pragma warning restore CS0067 // The event is never used - public Task> GetServiceIdsAsync(CancellationToken cancellationToken) + public TestDiscoveryClient() + : this([]) { - throw new NotImplementedException(); } - public Task> GetInstancesAsync(string serviceId, CancellationToken cancellationToken) + public TestDiscoveryClient(IServiceInstance instance, string? serviceId = null) + : this([instance], serviceId) { - IList instances = []; + } - if (_instance != null) - { - instances.Add(_instance); - } + public Task> GetServiceIdsAsync(CancellationToken cancellationToken) + { + throw new NotSupportedException(); + } - return Task.FromResult(instances); + public Task> GetInstancesAsync(string serviceId, CancellationToken cancellationToken) + { + return Task.FromResult(_serviceId == null || serviceId == _serviceId ? _instances : []); } - public IServiceInstance GetLocalServiceInstance() + public IServiceInstance? GetLocalServiceInstance() { - throw new NotImplementedException(); + return null; } public Task ShutdownAsync(CancellationToken cancellationToken) { - throw new NotImplementedException(); + return Task.CompletedTask; } }