Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions src/Discovery/src/Eureka/EurekaDiscoveryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -373,20 +373,19 @@ internal async Task FetchRegistryAsync(bool doFullUpdate, CancellationToken canc

private static ReadOnlyDictionary<string, IReadOnlyList<IServiceInstance>> ToServiceInstanceMap(ApplicationInfoCollection apps)
{
// @formatter:wrap_chained_method_calls chop_always
// @formatter:wrap_before_first_method_call true

return apps
.SelectMany(app => app.Instances)
.GroupBy(instance => instance.AppName, StringComparer.OrdinalIgnoreCase)
.ToDictionary(grouping => grouping.Key, grouping => (IReadOnlyList<IServiceInstance>)grouping
.Select(instance => instance.ToServiceInstance())
.ToList()
.AsReadOnly(), StringComparer.OrdinalIgnoreCase)
.AsReadOnly();

// @formatter:wrap_before_first_method_call restore
// @formatter:wrap_chained_method_calls restore
var dictionary = new Dictionary<string, IReadOnlyList<IServiceInstance>>(StringComparer.OrdinalIgnoreCase);

foreach (string vipAddress in apps.VipInstanceMap.Keys.ToArray())
{
ReadOnlyCollection<InstanceInfo> instancesByVipAddress = apps.GetInstancesByVipAddress(vipAddress);

if (instancesByVipAddress.Count > 0)
{
dictionary[vipAddress] = instancesByVipAddress.Select(instance => instance.ToServiceInstance()).ToList().AsReadOnly();
}
}

return new ReadOnlyDictionary<string, IReadOnlyList<IServiceInstance>>(dictionary);
}

private void RaiseFetchEvents(ApplicationsFetchedEventArgs? applicationsEventArgs, DiscoveryInstancesFetchedEventArgs? instancesEventArgs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public async Task<IList<IServiceInstance>> ResolveInstancesAsync(string serviceI

if (instancesFromCache != null)
{
instancesFromCache = RemoveDuplicatesByUri(instancesFromCache);
LogReturningInstancesFromCache(instancesFromCache.Count);
return instancesFromCache;
}
Expand All @@ -103,6 +104,8 @@ public async Task<IList<IServiceInstance>> ResolveInstancesAsync(string serviceI
}
}

instances = RemoveDuplicatesByUri(instances);

if (_distributedCache != null)
{
byte[] cacheValue = ToCacheValue(instances);
Expand All @@ -112,6 +115,22 @@ public async Task<IList<IServiceInstance>> ResolveInstancesAsync(string serviceI
return instances;
}

private static List<IServiceInstance> RemoveDuplicatesByUri(List<IServiceInstance> instances)
{
var seenUris = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var result = new List<IServiceInstance>();

foreach (IServiceInstance instance in instances)
{
if (seenUris.Add(instance.Uri.AbsoluteUri))
{
result.Add(instance);
}
}

return result;
}

private static List<IServiceInstance>? FromCacheValue(byte[]? cacheValue)
{
if (cacheValue is { Length: > 0 })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,13 @@ public async Task InstancesFetched_event_is_raised_after_configuration_change()
await using WebApplication webApplication = builder.Build();

ConfigurationDiscoveryClient discoveryClient = webApplication.Services.GetServices<IDiscoveryClient>().OfType<ConfigurationDiscoveryClient>().Single();
int eventCount = 0;
DiscoveryInstancesFetchedEventArgs? eventArgs = null;
int eventCount = 0;

discoveryClient.InstancesFetched += (_, args) =>
{
eventCount++;
eventArgs = args;
Interlocked.Increment(ref eventCount);
};

fileProvider.ReplaceAppSettingsJsonFile("""
Expand Down
110 changes: 100 additions & 10 deletions src/Discovery/test/Eureka.Test/EurekaDiscoveryClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Globalization;
using System.Net;
using FluentAssertions.Extensions;
using Microsoft.AspNetCore.Builder;
Expand Down Expand Up @@ -655,38 +656,127 @@ public async Task ApplicationEventsFireAfterFetch()
webApplication.Services.GetRequiredService<HttpClientHandlerFactory>().Using(handler);

var discoveryClient = webApplication.Services.GetRequiredService<EurekaDiscoveryClient>();
int applicationsEventCount = 0;
ApplicationsFetchedEventArgs? applicationsEventArgs = null;
int instancesEventCount = 0;
int applicationsEventCount = 0;
DiscoveryInstancesFetchedEventArgs? instancesEventArgs = null;
int instancesEventCount = 0;

discoveryClient.ApplicationsFetched += (_, args) =>
{
applicationsEventCount++;
applicationsEventArgs = args;
Interlocked.Increment(ref applicationsEventCount);
};

discoveryClient.InstancesFetched += (_, args) =>
{
instancesEventCount++;
instancesEventArgs = args;
Interlocked.Increment(ref instancesEventCount);
};

await discoveryClient.FetchRegistryAsync(true, TestContext.Current.CancellationToken);
SpinWait.SpinUntil(() => applicationsEventCount == 1 && instancesEventCount == 1, 5.Seconds()).Should().BeTrue();

applicationsEventArgs.Should().NotBeNull();
InstanceInfo oldInstanceFromAppEvent = applicationsEventArgs.Applications.Should().ContainSingle().Which.Instances.Should().ContainSingle().Which;
oldInstanceFromAppEvent.ActionType.Should().Be(ActionType.Added);

instancesEventArgs.Should().NotBeNull();
IServiceInstance oldInstanceFromEvent = instancesEventArgs.InstancesByServiceId.Should().ContainKey("foo").WhoseValue.Should().ContainSingle().Which;
oldInstanceFromEvent.Uri.ToString().Should().Be("http://localhost:8080/");

IList<IServiceInstance> oldInstancesFromGet = await discoveryClient.GetInstancesAsync("foo", TestContext.Current.CancellationToken);
oldInstancesFromGet.Should().ContainSingle().Which.Uri.Should().Be(oldInstanceFromEvent.Uri);

await discoveryClient.FetchRegistryAsync(false, TestContext.Current.CancellationToken);
SpinWait.SpinUntil(() => applicationsEventCount == 2 && instancesEventCount == 2, 5.Seconds()).Should().BeTrue();

InstanceInfo newInstanceFromAppEvent = applicationsEventArgs.Applications.Should().ContainSingle().Which.Instances.Should().ContainSingle().Which;
newInstanceFromAppEvent.ActionType.Should().Be(ActionType.Modified);

IServiceInstance newInstanceFromEvent = instancesEventArgs.InstancesByServiceId.Should().ContainKey("foo").WhoseValue.Should().ContainSingle().Which;
newInstanceFromEvent.Uri.ToString().Should().Be("http://modified-host:8080/");

IList<IServiceInstance> newInstancesFromGet = await discoveryClient.GetInstancesAsync("foo", TestContext.Current.CancellationToken);
newInstancesFromGet.Should().ContainSingle().Which.Uri.Should().Be(newInstanceFromEvent.Uri);

handler.Mock.VerifyNoOutstandingExpectation();
}

applicationsEventArgs.Should().NotBeNull();
InstanceInfo newInstanceInfo = applicationsEventArgs.Applications.Should().ContainSingle().Which.Instances.Should().ContainSingle().Which;
newInstanceInfo.ActionType.Should().Be(ActionType.Modified);
[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task InstancesFetched_returns_same_data_as_GetInstancesAsync(bool filterOnlyUpInstances)
{
const string registryJson = """
{
"applications": {
"application": [
{
"name": "ignored",
"instance": [
{
"instanceId": "id1",
"hostName": "h1",
"app": "app1",
"ipAddr": "10.0.0.1",
"status": "UP",
"dataCenterInfo": {
"@class": "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo",
"name": "MyOwn"
},
"vipAddress": "vapp1"
},
{
"instanceId": "id2",
"hostName": "h2",
"app": "app1",
"ipAddr": "10.0.0.2",
"status": "DOWN",
"dataCenterInfo": {
"@class": "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo",
"name": "MyOwn"
},
"vipAddress": "vapp1"
}
]
}
]
}
}
""";

instancesEventArgs.Should().NotBeNull();
IServiceInstance newServiceInstance = instancesEventArgs.InstancesByServiceId.Should().ContainKey("foo").WhoseValue.Should().ContainSingle().Which;
newServiceInstance.Uri.ToString().Should().Be("http://modified-host:8080/");
var appSettings = new Dictionary<string, string?>
{
["Eureka:Client:ShouldFetchRegistry"] = "false",
["Eureka:Client:ShouldRegisterWithEureka"] = "false",
["Eureka:Client:ShouldFilterOnlyUpInstances"] = filterOnlyUpInstances.ToString(CultureInfo.InvariantCulture)
};

WebApplicationBuilder builder = TestWebApplicationBuilderFactory.Create();
builder.Configuration.AddInMemoryCollection(appSettings);
builder.Services.AddEurekaDiscoveryClient();

var handler = new DelegateToMockHttpClientHandler();
handler.Mock.Expect(HttpMethod.Get, "http://localhost:8761/eureka/apps").Respond("application/json", registryJson);

await using WebApplication webApplication = builder.Build();
webApplication.Services.GetRequiredService<HttpClientHandlerFactory>().Using(handler);

var discoveryClient = webApplication.Services.GetRequiredService<EurekaDiscoveryClient>();
DiscoveryInstancesFetchedEventArgs? eventArgs = null;
discoveryClient.InstancesFetched += (_, args) => eventArgs = args;

await discoveryClient.FetchRegistryAsync(true, TestContext.Current.CancellationToken);
SpinWait.SpinUntil(() => eventArgs != null, 5.Seconds()).Should().BeTrue();

eventArgs.Should().NotBeNull();

IList<IServiceInstance> instancesFromGet = await discoveryClient.GetInstancesAsync("vapp1", TestContext.Current.CancellationToken);
IReadOnlyList<IServiceInstance> instancesFromEvent = eventArgs.InstancesByServiceId.Should().ContainKey("vapp1").WhoseValue;

instancesFromEvent.Should().BeEquivalentTo(instancesFromGet);

handler.Mock.VerifyNoOutstandingExpectation();
}

private sealed class ExtraRequestHeadersDelegatingHandler : DelegatingHandler
Expand Down
Loading