From 04bf513bff170ea300612667d142e5cf45239e6f Mon Sep 17 00:00:00 2001 From: Danny Guinther Date: Wed, 27 May 2020 09:36:50 -0400 Subject: [PATCH] Fix thread safety race conditions --- src/AsyncResourcePool.cs | 190 ++++++++++++++++++++++---------- tests/AsyncResourcePoolTests.cs | 36 +++++- 2 files changed, 164 insertions(+), 62 deletions(-) diff --git a/src/AsyncResourcePool.cs b/src/AsyncResourcePool.cs index 8374bbb..f08da93 100755 --- a/src/AsyncResourcePool.cs +++ b/src/AsyncResourcePool.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Collections.Concurrent; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -9,22 +10,26 @@ namespace AsyncResourcePool { public sealed class AsyncResourcePool : IAsyncResourcePool { + private readonly object _stateLock = new object(); + private readonly object _pendingResourceRequestLock = new object(); private readonly int _minNumResources; private readonly int _maxNumResources; private readonly TimeSpan? _resourcesExpireAfter; private readonly int _maxNumResourceCreationAttempts; private readonly TimeSpan _resourceCreationRetryInterval; private readonly Func> _resourceTaskFactory; - private readonly Queue _availableResources; - private readonly Queue _pendingResourceRequests; + private readonly ConcurrentQueue _availableResources; + private readonly ConcurrentQueue _pendingResourceRequests; private readonly ActionBlock _messageHandler; /// /// The total number of resources produced, including available resources and resources /// that are currently in use /// - private int _numResources = 0; + private int _numResourcesActive = 0; private int _numResourcesInUse = 0; + private int _numResourcesInLimbo = 0; + private ResourceRequestMessage _pendingResourceRequest = null; /// /// This constructor should be used only if asynchronous resource creation is not available @@ -48,8 +53,8 @@ public AsyncResourcePool(Func> resourceTaskFactory, AsyncResourc _resourcesExpireAfter = options.ResourcesExpireAfter; _maxNumResourceCreationAttempts = options.MaxNumResourceCreationAttempts; _resourceCreationRetryInterval = options.ResourceCreationRetryInterval; - _availableResources = new Queue(); - _pendingResourceRequests = new Queue(); + _availableResources = new ConcurrentQueue(); + _pendingResourceRequests = new ConcurrentQueue(); _resourceTaskFactory = resourceTaskFactory; // Important: These functions must be called after all instance members have been initialised! @@ -119,10 +124,21 @@ private async void SetupErrorHandling() private void ClearAllPendingRequests(Exception ex) { - while (_pendingResourceRequests.Count > 0) + lock(_pendingResourceRequestLock) { - var request = _pendingResourceRequests.Dequeue(); - request.TaskCompletionSource.SetException(ex); + while (!(_pendingResourceRequest == null && _pendingResourceRequests.IsEmpty)) + { + ResourceRequestMessage request = _pendingResourceRequest; + if (request == null) + { + _pendingResourceRequests.TryDequeue(out request); + } + else + { + _pendingResourceRequest = null; + } + request?.TaskCompletionSource.SetException(ex); + } } } @@ -168,17 +184,25 @@ private async void SetupPeriodicPurge() private ReusableResource TryGetReusableResource() { ReusableResource reusableResource = null; - while (_availableResources.Count > 0) + while (!_availableResources.IsEmpty) { - var timestampedResource = _availableResources.Dequeue(); - var resource = timestampedResource.Resource; - if (IsResourceExpired(timestampedResource)) - { - DisposeResource(resource); - } - else + TimestampedResource timestampedResource = null; + if (_availableResources.TryDequeue(out timestampedResource)) { - reusableResource = GetReusableResource(resource); + lock(_stateLock) + { + var resource = timestampedResource.Resource; + if (IsResourceExpired(timestampedResource)) + { + DisposeResource(resource); + } + else + { + Interlocked.Increment(ref _numResourcesInLimbo); + reusableResource = GetReusableResource(resource); + break; + } + } } } @@ -189,23 +213,38 @@ private ReusableResource TryGetReusableResource() private void HandlePendingResourceRequests() { - while (_pendingResourceRequests.Count > 0) + lock(_pendingResourceRequestLock) { - if (_pendingResourceRequests.Peek().CancellationToken.IsCancellationRequested) + while (!(_pendingResourceRequest == null && _pendingResourceRequests.IsEmpty)) { - _pendingResourceRequests.Dequeue(); // Throw away cancelled requests - continue; - } + if (_pendingResourceRequest == null) + { + _pendingResourceRequests.TryDequeue(out _pendingResourceRequest); + } - var result = TryGetReusableResource(); - if (result != null) - { - var request = _pendingResourceRequests.Dequeue(); - request.TaskCompletionSource.SetResult(result); - } - else - { - break; + // Throw away cancelled requests + if (_pendingResourceRequest == null || + _pendingResourceRequest.CancellationToken.IsCancellationRequested) + { + _pendingResourceRequest = null; + continue; + } + + var resource = TryGetReusableResource(); + if (resource != null) + { + lock(_stateLock) + { + Interlocked.Decrement(ref _numResourcesInLimbo); + Interlocked.Increment(ref _numResourcesInUse); + } + _pendingResourceRequest.TaskCompletionSource.SetResult(resource); + _pendingResourceRequest = null; + } + else + { + break; + } } } } @@ -214,22 +253,30 @@ private void HandleResourceAvailable(ResourceAvailableMessage resourceAvailableM { var resource = resourceAvailableMessage.Resource; var timestampedResource = TimestampedResource.Create(resource); - _availableResources.Enqueue(timestampedResource); + lock(_stateLock) + { + Interlocked.Decrement(ref _numResourcesInLimbo); + Interlocked.Increment(ref _numResourcesActive); + _availableResources.Enqueue(timestampedResource); + } } private void HandlePurgeExpiredResource(PurgeExpiredResourcesMessage purgeExpiredResourcesMessage) { var nonExpiredResources = new List(); - while (_availableResources.Count > 0) + while (!_availableResources.IsEmpty) { - var timestampedResource = _availableResources.Dequeue(); - if (IsResourceExpired(timestampedResource)) + TimestampedResource timestampedResource = null; + if (_availableResources.TryDequeue(out timestampedResource)) { - DisposeResource(timestampedResource.Resource); - } - else - { - nonExpiredResources.Add(timestampedResource); + if (IsResourceExpired(timestampedResource)) + { + DisposeResource(timestampedResource.Resource); + } + else + { + nonExpiredResources.Add(timestampedResource); + } } } @@ -246,12 +293,13 @@ private void HandlePurgeExpiredResource(PurgeExpiredResourcesMessage purgeExpire /// private async void HandleEnsureAvailableResourcesMessage(EnsureAvailableResourcesMessage ensureAvailableResourcesMessage) { - var effectiveNumResourcesAvailable = _numResources - _numResourcesInUse; - var availableResourcesGap = _minNumResources - effectiveNumResourcesAvailable; - var remainingCapacity = _maxNumResources - _numResources; + IEnumerable createResourceTasks = null; + var effectiveNumResourcesAvailable = _numResourcesActive - _numResourcesInUse; + var availableResourcesGap = _minNumResources - effectiveNumResourcesAvailable - _numResourcesInLimbo; + var remainingCapacity = _maxNumResources - _numResourcesActive - _numResourcesInLimbo; var numResourcesToCreate = Math.Max(0, Math.Min(availableResourcesGap, remainingCapacity)); - var createResourceTasks = Enumerable.Range(0, numResourcesToCreate) + createResourceTasks = Enumerable.Range(0, numResourcesToCreate) .Select(_ => TryCreateResource()); try @@ -270,10 +318,10 @@ private async Task TryCreateResource() var resourceTask = _resourceTaskFactory(); try { - // Increment before we wait for the task. Otherwise, while - // waiting, another thread may read the incorrect value. - Interlocked.Increment(ref _numResources); - + lock(_stateLock) + { + Interlocked.Increment(ref _numResourcesInLimbo); + } var resource = await resourceTask; MakeResourceAvailable(resource); @@ -281,7 +329,10 @@ private async Task TryCreateResource() catch (Exception) { // Roll back! - Interlocked.Decrement(ref _numResources); + lock(_stateLock) + { + Interlocked.Decrement(ref _numResourcesInLimbo); + } throw; } } @@ -303,10 +354,14 @@ private bool IsResourceExpired(TimestampedResource timestampedResource) private ReusableResource GetReusableResource(TResource resource) { - Interlocked.Increment(ref _numResourcesInUse); return new ReusableResource(resource, () => { - Interlocked.Decrement(ref _numResourcesInUse); + lock(_stateLock) + { + Interlocked.Decrement(ref _numResourcesInUse); + Interlocked.Decrement(ref _numResourcesActive); + Interlocked.Increment(ref _numResourcesInLimbo); + } MakeResourceAvailable(resource); }); } @@ -326,7 +381,10 @@ public Task> Get(CancellationToken cancellationToken private async void DisposeResource(TResource resource) { - Interlocked.Decrement(ref _numResources); + lock(_stateLock) + { + Interlocked.Decrement(ref _numResourcesActive); + } if (resource is IDisposable disposableResource) { @@ -340,17 +398,31 @@ public async void Dispose() await _messageHandler.Completion; // Even after we mark Complete, still need to finish processing. // Clean up any remaining requests. - while (_pendingResourceRequests.Count > 0) + lock(_pendingResourceRequestLock) { - var request = _pendingResourceRequests.Dequeue(); - request.TaskCompletionSource.SetException(GetObjectDisposedException()); + while (!(_pendingResourceRequest == null && _pendingResourceRequests.IsEmpty)) + { + ResourceRequestMessage request = _pendingResourceRequest; + if (request == null) + { + _pendingResourceRequests.TryDequeue(out request); + } + else + { + _pendingResourceRequest = null; + } + request?.TaskCompletionSource.SetException(GetObjectDisposedException()); + } } // Clean up any remaining resources. - while (_availableResources.Count > 0) + while (!_availableResources.IsEmpty) { - var timestampedResource = _availableResources.Dequeue(); - DisposeResource(timestampedResource.Resource); + TimestampedResource timestampedResource = null; + if (_availableResources.TryDequeue(out timestampedResource)) + { + DisposeResource(timestampedResource.Resource); + } } } @@ -429,4 +501,4 @@ public static TimestampedResource Create(TResource resource) } } } -} \ No newline at end of file +} diff --git a/tests/AsyncResourcePoolTests.cs b/tests/AsyncResourcePoolTests.cs index a17f273..c65480c 100644 --- a/tests/AsyncResourcePoolTests.cs +++ b/tests/AsyncResourcePoolTests.cs @@ -58,7 +58,14 @@ public async Task Get_ShouldCauseAdditionalResourceCreation_WhenResourcesAreRetr } // Allow some time for the non-awaited tasks to run until they either finish or get stuck waiting - await Task.Delay(100); + for (var j = 0; j < 10; ++j) + { + if (expectedNumResourcesCreated == testHarness.CreatedResources.Count) + { + break; + } + await Task.Delay(100); + } Assert.Equal(expectedNumResourcesCreated, testHarness.CreatedResources.Count); } @@ -171,6 +178,8 @@ async Task DisposeWatchFactory() // Wait for the 8 resources to be disposed from expiry. await Task.Delay(expiry * 1.5); + // Wait for disposal to finish + await initialResourcesDisposedTaskCompletionSource.Task; var numResourcesRemainingInPool = testHarness.CreatedResources.Count - numDisposals; Assert.Equal(minNumResources, numResourcesRemainingInPool); @@ -214,10 +223,28 @@ async Task ResourceFactory() [Fact(Timeout = Timeout)] public async Task AllResourcesShouldBeDisposedAfterConnectionPoolIsDisposedOnceReusableResourceIsDisposed() { - var testHarness = new TestHarness(); + var count = 0; + var numDisposals = 0; + const int numRequestedResources = 3; + + var initialResourcesDisposedTaskCompletionSource = new TaskCompletionSource(); + + async Task DisposeWatchFactory() + { + return new TestResource(++count, _ => + { + var num = Interlocked.Increment(ref numDisposals); + if (num == numRequestedResources) + { + initialResourcesDisposedTaskCompletionSource.SetResult(true); + } + }); + } + + var testHarness = new TestHarness(DisposeWatchFactory); ReusableResource reusableResource; - using (var sut = CreateSut(testHarness, 3, 5)) + using (var sut = CreateSut(testHarness, numRequestedResources, 5)) { reusableResource = await sut.Get(); @@ -226,6 +253,9 @@ public async Task AllResourcesShouldBeDisposedAfterConnectionPoolIsDisposedOnceR reusableResource.Dispose(); + // Wait for all the disposals to be handled async + await initialResourcesDisposedTaskCompletionSource.Task; + // The pool should still handle disposal of the resource wrapped by the disposed // ReusableResource even when the pool itself has already been disposed. Assert.All(testHarness.CreatedResources, resource => Assert.True(resource.IsDisposed));