Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 131 additions & 59 deletions src/AsyncResourcePool.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -9,22 +10,26 @@ namespace AsyncResourcePool
{
public sealed class AsyncResourcePool<TResource> : IAsyncResourcePool<TResource>
{
private readonly object _stateLock = new object();
private readonly object _pendingResourceRequestLock = new object();
private readonly int _minNumResources;
private readonly int _maxNumResources;
private readonly TimeSpan? _resourcesExpireAfter;
private readonly int _maxNumResourceCreationAttempts;
private readonly TimeSpan _resourceCreationRetryInterval;
private readonly Func<Task<TResource>> _resourceTaskFactory;
private readonly Queue<TimestampedResource> _availableResources;
private readonly Queue<ResourceRequestMessage> _pendingResourceRequests;
private readonly ConcurrentQueue<TimestampedResource> _availableResources;
private readonly ConcurrentQueue<ResourceRequestMessage> _pendingResourceRequests;
private readonly ActionBlock<IResourceMessage> _messageHandler;

/// <summary>
/// The total number of resources produced, including available resources and resources
/// that are currently in use
/// </summary>
private int _numResources = 0;
private int _numResourcesActive = 0;
private int _numResourcesInUse = 0;
private int _numResourcesInLimbo = 0;
private ResourceRequestMessage _pendingResourceRequest = null;

/// <summary>
/// This constructor should be used only if asynchronous resource creation is not available
Expand All @@ -48,8 +53,8 @@ public AsyncResourcePool(Func<Task<TResource>> resourceTaskFactory, AsyncResourc
_resourcesExpireAfter = options.ResourcesExpireAfter;
_maxNumResourceCreationAttempts = options.MaxNumResourceCreationAttempts;
_resourceCreationRetryInterval = options.ResourceCreationRetryInterval;
_availableResources = new Queue<TimestampedResource>();
_pendingResourceRequests = new Queue<ResourceRequestMessage>();
_availableResources = new ConcurrentQueue<TimestampedResource>();
_pendingResourceRequests = new ConcurrentQueue<ResourceRequestMessage>();
_resourceTaskFactory = resourceTaskFactory;

// Important: These functions must be called after all instance members have been initialised!
Expand Down Expand Up @@ -119,10 +124,21 @@ private async void SetupErrorHandling()

private void ClearAllPendingRequests(Exception ex)
{
while (_pendingResourceRequests.Count > 0)
lock(_pendingResourceRequestLock)
{
var request = _pendingResourceRequests.Dequeue();
request.TaskCompletionSource.SetException(ex);
while (!(_pendingResourceRequest == null && _pendingResourceRequests.IsEmpty))
{
ResourceRequestMessage request = _pendingResourceRequest;
if (request == null)
{
_pendingResourceRequests.TryDequeue(out request);
}
else
{
_pendingResourceRequest = null;
}
request?.TaskCompletionSource.SetException(ex);
}
}
}

Expand Down Expand Up @@ -168,17 +184,25 @@ private async void SetupPeriodicPurge()
private ReusableResource<TResource> TryGetReusableResource()
{
ReusableResource<TResource> reusableResource = null;
while (_availableResources.Count > 0)
while (!_availableResources.IsEmpty)
{
var timestampedResource = _availableResources.Dequeue();
var resource = timestampedResource.Resource;
if (IsResourceExpired(timestampedResource))
{
DisposeResource(resource);
}
else
TimestampedResource timestampedResource = null;
if (_availableResources.TryDequeue(out timestampedResource))
{
reusableResource = GetReusableResource(resource);
lock(_stateLock)
{
var resource = timestampedResource.Resource;
if (IsResourceExpired(timestampedResource))
{
DisposeResource(resource);
}
else
{
Interlocked.Increment(ref _numResourcesInLimbo);
reusableResource = GetReusableResource(resource);
break;
}
}
}
}

Expand All @@ -189,23 +213,38 @@ private ReusableResource<TResource> TryGetReusableResource()

private void HandlePendingResourceRequests()
{
while (_pendingResourceRequests.Count > 0)
lock(_pendingResourceRequestLock)
{
if (_pendingResourceRequests.Peek().CancellationToken.IsCancellationRequested)
while (!(_pendingResourceRequest == null && _pendingResourceRequests.IsEmpty))
{
_pendingResourceRequests.Dequeue(); // Throw away cancelled requests
continue;
}
if (_pendingResourceRequest == null)
{
_pendingResourceRequests.TryDequeue(out _pendingResourceRequest);
}

var result = TryGetReusableResource();
if (result != null)
{
var request = _pendingResourceRequests.Dequeue();
request.TaskCompletionSource.SetResult(result);
}
else
{
break;
// Throw away cancelled requests
if (_pendingResourceRequest == null ||
_pendingResourceRequest.CancellationToken.IsCancellationRequested)
{
_pendingResourceRequest = null;
continue;
}

var resource = TryGetReusableResource();
if (resource != null)
{
lock(_stateLock)
{
Interlocked.Decrement(ref _numResourcesInLimbo);
Interlocked.Increment(ref _numResourcesInUse);
}
_pendingResourceRequest.TaskCompletionSource.SetResult(resource);
_pendingResourceRequest = null;
}
else
{
break;
}
}
}
}
Expand All @@ -214,22 +253,30 @@ private void HandleResourceAvailable(ResourceAvailableMessage resourceAvailableM
{
var resource = resourceAvailableMessage.Resource;
var timestampedResource = TimestampedResource.Create(resource);
_availableResources.Enqueue(timestampedResource);
lock(_stateLock)
{
Interlocked.Decrement(ref _numResourcesInLimbo);
Interlocked.Increment(ref _numResourcesActive);
_availableResources.Enqueue(timestampedResource);
}
}

private void HandlePurgeExpiredResource(PurgeExpiredResourcesMessage purgeExpiredResourcesMessage)
{
var nonExpiredResources = new List<TimestampedResource>();
while (_availableResources.Count > 0)
while (!_availableResources.IsEmpty)
{
var timestampedResource = _availableResources.Dequeue();
if (IsResourceExpired(timestampedResource))
TimestampedResource timestampedResource = null;
if (_availableResources.TryDequeue(out timestampedResource))
{
DisposeResource(timestampedResource.Resource);
}
else
{
nonExpiredResources.Add(timestampedResource);
if (IsResourceExpired(timestampedResource))
{
DisposeResource(timestampedResource.Resource);
}
else
{
nonExpiredResources.Add(timestampedResource);
}
}
}

Expand All @@ -246,12 +293,13 @@ private void HandlePurgeExpiredResource(PurgeExpiredResourcesMessage purgeExpire
/// </summary>
private async void HandleEnsureAvailableResourcesMessage(EnsureAvailableResourcesMessage ensureAvailableResourcesMessage)
{
var effectiveNumResourcesAvailable = _numResources - _numResourcesInUse;
var availableResourcesGap = _minNumResources - effectiveNumResourcesAvailable;
var remainingCapacity = _maxNumResources - _numResources;
IEnumerable<Task> createResourceTasks = null;
var effectiveNumResourcesAvailable = _numResourcesActive - _numResourcesInUse;
var availableResourcesGap = _minNumResources - effectiveNumResourcesAvailable - _numResourcesInLimbo;
var remainingCapacity = _maxNumResources - _numResourcesActive - _numResourcesInLimbo;
var numResourcesToCreate = Math.Max(0, Math.Min(availableResourcesGap, remainingCapacity));

var createResourceTasks = Enumerable.Range(0, numResourcesToCreate)
createResourceTasks = Enumerable.Range(0, numResourcesToCreate)
.Select(_ => TryCreateResource());

try
Expand All @@ -270,18 +318,21 @@ private async Task TryCreateResource()
var resourceTask = _resourceTaskFactory();
try
{
// Increment before we wait for the task. Otherwise, while
// waiting, another thread may read the incorrect value.
Interlocked.Increment(ref _numResources);

lock(_stateLock)
{
Interlocked.Increment(ref _numResourcesInLimbo);
}
var resource = await resourceTask;

MakeResourceAvailable(resource);
}
catch (Exception)
{
// Roll back!
Interlocked.Decrement(ref _numResources);
lock(_stateLock)
{
Interlocked.Decrement(ref _numResourcesInLimbo);
}
throw;
}
}
Expand All @@ -303,10 +354,14 @@ private bool IsResourceExpired(TimestampedResource timestampedResource)

private ReusableResource<TResource> GetReusableResource(TResource resource)
{
Interlocked.Increment(ref _numResourcesInUse);
return new ReusableResource<TResource>(resource, () =>
{
Interlocked.Decrement(ref _numResourcesInUse);
lock(_stateLock)
{
Interlocked.Decrement(ref _numResourcesInUse);
Interlocked.Decrement(ref _numResourcesActive);
Interlocked.Increment(ref _numResourcesInLimbo);
}
MakeResourceAvailable(resource);
});
}
Expand All @@ -326,7 +381,10 @@ public Task<ReusableResource<TResource>> Get(CancellationToken cancellationToken

private async void DisposeResource(TResource resource)
{
Interlocked.Decrement(ref _numResources);
lock(_stateLock)
{
Interlocked.Decrement(ref _numResourcesActive);
}

if (resource is IDisposable disposableResource)
{
Expand All @@ -340,17 +398,31 @@ public async void Dispose()
await _messageHandler.Completion; // Even after we mark Complete, still need to finish processing.

// Clean up any remaining requests.
while (_pendingResourceRequests.Count > 0)
lock(_pendingResourceRequestLock)
{
var request = _pendingResourceRequests.Dequeue();
request.TaskCompletionSource.SetException(GetObjectDisposedException());
while (!(_pendingResourceRequest == null && _pendingResourceRequests.IsEmpty))
{
ResourceRequestMessage request = _pendingResourceRequest;
if (request == null)
{
_pendingResourceRequests.TryDequeue(out request);
}
else
{
_pendingResourceRequest = null;
}
request?.TaskCompletionSource.SetException(GetObjectDisposedException());
}
}

// Clean up any remaining resources.
while (_availableResources.Count > 0)
while (!_availableResources.IsEmpty)
{
var timestampedResource = _availableResources.Dequeue();
DisposeResource(timestampedResource.Resource);
TimestampedResource timestampedResource = null;
if (_availableResources.TryDequeue(out timestampedResource))
{
DisposeResource(timestampedResource.Resource);
}
}
}

Expand Down Expand Up @@ -429,4 +501,4 @@ public static TimestampedResource Create(TResource resource)
}
}
}
}
}
36 changes: 33 additions & 3 deletions tests/AsyncResourcePoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,14 @@ public async Task Get_ShouldCauseAdditionalResourceCreation_WhenResourcesAreRetr
}

// Allow some time for the non-awaited tasks to run until they either finish or get stuck waiting
await Task.Delay(100);
for (var j = 0; j < 10; ++j)
{
if (expectedNumResourcesCreated == testHarness.CreatedResources.Count)
{
break;
}
await Task.Delay(100);
}

Assert.Equal(expectedNumResourcesCreated, testHarness.CreatedResources.Count);
}
Expand Down Expand Up @@ -171,6 +178,8 @@ async Task<TestResource> DisposeWatchFactory()

// Wait for the 8 resources to be disposed from expiry.
await Task.Delay(expiry * 1.5);
// Wait for disposal to finish
await initialResourcesDisposedTaskCompletionSource.Task;

var numResourcesRemainingInPool = testHarness.CreatedResources.Count - numDisposals;
Assert.Equal(minNumResources, numResourcesRemainingInPool);
Expand Down Expand Up @@ -214,10 +223,28 @@ async Task<TestResource> ResourceFactory()
[Fact(Timeout = Timeout)]
public async Task AllResourcesShouldBeDisposedAfterConnectionPoolIsDisposedOnceReusableResourceIsDisposed()
{
var testHarness = new TestHarness();
var count = 0;
var numDisposals = 0;
const int numRequestedResources = 3;

var initialResourcesDisposedTaskCompletionSource = new TaskCompletionSource<bool>();

async Task<TestResource> DisposeWatchFactory()
{
return new TestResource(++count, _ =>
{
var num = Interlocked.Increment(ref numDisposals);
if (num == numRequestedResources)
{
initialResourcesDisposedTaskCompletionSource.SetResult(true);
}
});
}

var testHarness = new TestHarness(DisposeWatchFactory);

ReusableResource<TestResource> reusableResource;
using (var sut = CreateSut(testHarness, 3, 5))
using (var sut = CreateSut(testHarness, numRequestedResources, 5))
{
reusableResource = await sut.Get();

Expand All @@ -226,6 +253,9 @@ public async Task AllResourcesShouldBeDisposedAfterConnectionPoolIsDisposedOnceR

reusableResource.Dispose();

// Wait for all the disposals to be handled async
await initialResourcesDisposedTaskCompletionSource.Task;

// The pool should still handle disposal of the resource wrapped by the disposed
// ReusableResource even when the pool itself has already been disposed.
Assert.All(testHarness.CreatedResources, resource => Assert.True(resource.IsDisposed));
Expand Down