From 248f258e54e1a5983b1da5735f4e8838b66d2417 Mon Sep 17 00:00:00 2001 From: Jasdeep Saini Date: Mon, 11 Jul 2022 22:25:36 -0500 Subject: [PATCH 1/4] Bug Fix: Fix a leak in CurrentValueRelay. --- Sources/Relays/CurrentValueRelay.swift | 3 +- Tests/CurrentValueRelayTests.swift | 101 +++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 1 deletion(-) diff --git a/Sources/Relays/CurrentValueRelay.swift b/Sources/Relays/CurrentValueRelay.swift index 8de54f6..234a263 100644 --- a/Sources/Relays/CurrentValueRelay.swift +++ b/Sources/Relays/CurrentValueRelay.swift @@ -75,6 +75,7 @@ private extension CurrentValueRelay { func forceFinish() { self.sink?.shouldForwardCompletion = true self.sink?.receive(completion: .finished) + self.sink = nil } func request(_ demand: Subscribers.Demand) { @@ -82,7 +83,7 @@ private extension CurrentValueRelay { } func cancel() { - sink = nil + forceFinish() } } } diff --git a/Tests/CurrentValueRelayTests.swift b/Tests/CurrentValueRelayTests.swift index 72dfb48..38428a6 100644 --- a/Tests/CurrentValueRelayTests.swift +++ b/Tests/CurrentValueRelayTests.swift @@ -120,5 +120,106 @@ class CurrentValueRelayTests: XCTestCase { XCTAssertFalse(completed) XCTAssertEqual(values, ["initial", "1", "2", "3"]) } + + // There was a race condition which caused the value of a relay + // to leak. Details of the race condition are in this PR: + // + // https://github.com/CombineCommunity/CombineExt/pull/137 + // + // The easiest way to reproduce the race condition is + // to initialize `cancellables` before `relay`. + // These two tests confirm the value of the relay is + // released regardless of when cancellables is created. + final class StoredObject { + static var storedObjectReleased = false + + let value = 10 + + init() { + Self.storedObjectReleased = false + } + + deinit { + Self.storedObjectReleased = true + } + } + + func testFinishesOnDeinitWhenRelayIsAfterCancellables() { + final class ContainerClass { + static var receivedCompletion = false + static var receivedCancel = false + + // Cancellables comes before the relay. + var cancellables = Set() + let relay = CurrentValueRelay(StoredObject()) + + init() { + relay + .handleEvents(receiveCancel: { + Self.receivedCancel = true + }) + .sink( + receiveCompletion: { _ in + Self.receivedCompletion = true + }, + receiveValue: { _ in } + ) + .store(in: &cancellables) + } + } + + var container: ContainerClass? = ContainerClass() + + XCTAssertFalse(ContainerClass.receivedCompletion) + XCTAssertFalse(StoredObject.storedObjectReleased) + container = nil + XCTAssertTrue(StoredObject.storedObjectReleased) + XCTAssertNil(container) + + // In this case the cancellables is deinited before the CurrentValueRelay. + // Deiniting cancellables calls cancel for all subscriptions. Completion + // will never be called. + XCTAssertFalse(ContainerClass.receivedCompletion) + XCTAssertTrue(ContainerClass.receivedCancel) + } + + func testFinishesOnDeinitWhenRelayIsBeforeCancellables() { + final class ContainerClass { + static var receivedCompletion = false + static var receivedCancel = false + + // Cancellables comes after the relay. + let relay = CurrentValueRelay(StoredObject()) + var cancellables = Set() + + init() { + relay + .handleEvents(receiveCancel: { + Self.receivedCancel = true + }) + .sink( + receiveCompletion: { _ in + Self.receivedCompletion = true + }, + receiveValue: { _ in } + ) + .store(in: &cancellables) + } + } + + var container: ContainerClass? = ContainerClass() + + XCTAssertFalse(ContainerClass.receivedCompletion) + XCTAssertFalse(StoredObject.storedObjectReleased) + container = nil + XCTAssertTrue(StoredObject.storedObjectReleased) + XCTAssertNil(container) + + // In this case the cancellables is deinited after the CurrentValueRelay, + // so completion will be called. Since the relay was completed, cancel will + // not be called. + XCTAssertTrue(ContainerClass.receivedCompletion) + XCTAssertFalse(ContainerClass.receivedCancel) + } } #endif From ff26c085a460847d7b3ab6d722203e1d53b2647d Mon Sep 17 00:00:00 2001 From: Jasdeep Saini Date: Tue, 19 Jul 2022 16:19:36 -0500 Subject: [PATCH 2/4] - Fix a crash in CombineExt caused by the fix for a leak in CurrentValueRelay. --- .DS_Store | Bin 6148 -> 6148 bytes Sources/Common/Sink.swift | 12 +++- Tests/CurrentValueRelayTests.swift | 109 ++++++++++++++++++++++++++++- 3 files changed, 117 insertions(+), 4 deletions(-) diff --git a/.DS_Store b/.DS_Store index cb6c1f7c249a173809466a51d3e9c1827024bf15..5a044c1dc5192629a7fa13c779f861fc34862250 100644 GIT binary patch delta 93 zcmZoMXffE}#>jN!!DJpr9g*C87nh`*{3Hej2984ATf3GxA8~|Aq+pfcFUT+qPR`FQ Y0P0|1U~f;dIGKl0M&heKY0Bs-`*#H0l diff --git a/Sources/Common/Sink.swift b/Sources/Common/Sink.swift index ba76529..3c19b76 100644 --- a/Sources/Common/Sink.swift +++ b/Sources/Common/Sink.swift @@ -21,6 +21,7 @@ class Sink: Subscriber { private var upstreamSubscription: Subscription? private let transformOutput: TransformOutput? private let transformFailure: TransformFailure? + private var upstreamIsCancelled = false /// Initialize a new sink subscribing to the upstream publisher and /// fulfilling the demand of the downstream subscriber using a backpresurre @@ -41,7 +42,14 @@ class Sink: Subscriber { self.buffer = DemandBuffer(subscriber: downstream) self.transformOutput = transformOutput self.transformFailure = transformFailure - upstream.subscribe(self) + + // A subscription can only be cancelled once. The `upstreamIsCancelled` value + // is used to suppress a second call to cancel when a Sink is `deinit`'ed, + // when a sink receives completion, and when any custom operator call + // `cancelUpstream()` manually. + upstream.handleEvents(receiveCancel: { [weak self] in + self?.upstreamIsCancelled = true + }).subscribe(self) } func demand(_ demand: Subscribers.Demand) { @@ -93,6 +101,8 @@ class Sink: Subscriber { } func cancelUpstream() { + guard upstreamIsCancelled == false else { return } + upstreamSubscription.kill() } diff --git a/Tests/CurrentValueRelayTests.swift b/Tests/CurrentValueRelayTests.swift index 38428a6..1f605be 100644 --- a/Tests/CurrentValueRelayTests.swift +++ b/Tests/CurrentValueRelayTests.swift @@ -128,8 +128,13 @@ class CurrentValueRelayTests: XCTestCase { // // The easiest way to reproduce the race condition is // to initialize `cancellables` before `relay`. - // These two tests confirm the value of the relay is + // The first two tests confirm the value of the relay is // released regardless of when cancellables is created. + // + // The last two tests checks the scenario where a relay is + // chained with a withLatestFrom operator. This leads + // to two objects being leaked if cancellables is declared + // before the relays. final class StoredObject { static var storedObjectReleased = false @@ -144,7 +149,21 @@ class CurrentValueRelayTests: XCTestCase { } } - func testFinishesOnDeinitWhenRelayIsAfterCancellables() { + final class StoredObject2 { + static var storedObjectReleased = false + + let value = 20 + + init() { + Self.storedObjectReleased = false + } + + deinit { + Self.storedObjectReleased = true + } + } + + func testStoredObjectIsDeallocatedWhenRelayIsDeallocatedAndDeclaredAfterCancellables() { final class ContainerClass { static var receivedCompletion = false static var receivedCancel = false @@ -183,7 +202,7 @@ class CurrentValueRelayTests: XCTestCase { XCTAssertTrue(ContainerClass.receivedCancel) } - func testFinishesOnDeinitWhenRelayIsBeforeCancellables() { + func testStoredObjectIsDeallocatedWhenRelayIsDeallocatedAndDeclaredBeforeCancellables() { final class ContainerClass { static var receivedCompletion = false static var receivedCancel = false @@ -221,5 +240,89 @@ class CurrentValueRelayTests: XCTestCase { XCTAssertTrue(ContainerClass.receivedCompletion) XCTAssertFalse(ContainerClass.receivedCancel) } + + func testBothStoredObjectsAreDeallocatedWhenRelayAndWithLatestFromOperatorAreDeallocatedAndDeclaredBeforeCancellables() { + final class ContainerClass { + static var receivedCompletion = false + static var receivedCancel = false + + // Cancellables comes after the relay. In this case, there + // is no leak. + let relay = CurrentValueRelay(StoredObject()) + let relay2 = CurrentValueRelay(StoredObject2()) + var cancellables: Set? = Set() + + init() { + relay + .withLatestFrom(relay2) + .handleEvents(receiveCancel: { + Self.receivedCancel = true + }) + .sink( + receiveCompletion: { _ in + Self.receivedCompletion = true + }, + receiveValue: { _ in } + ) + .store(in: &cancellables!) + } + } + + var container: ContainerClass? = ContainerClass() + + XCTAssertFalse(ContainerClass.receivedCompletion) + XCTAssertFalse(StoredObject.storedObjectReleased) + XCTAssertFalse(StoredObject2.storedObjectReleased) + // When the leak was fixed, the stream started crashing because cancel + // was called twice on relay. A fix for the crash was added, + // so setting the container to nil which deallocates cancellables + // should not cause a crash. + container = nil + XCTAssertTrue(StoredObject.storedObjectReleased) + XCTAssertTrue(StoredObject2.storedObjectReleased) + XCTAssertNil(container) + } + + func testBothStoredObjectsAreDeallocatedWhenRelayAndWithLatestFromOperatorAreDeallocatedAndDeclaredAfterCancellables() { + final class ContainerClass { + static var receivedCompletion = false + static var receivedCancel = false + + // Cancellables comes before the relay. In this case, the objects + // for both relays leak. + var cancellables: Set? = Set() + let relay = CurrentValueRelay(StoredObject()) + let relay2 = CurrentValueRelay(StoredObject2()) + + init() { + relay + .withLatestFrom(relay2) + .handleEvents(receiveCancel: { + Self.receivedCancel = true + }) + .sink( + receiveCompletion: { _ in + Self.receivedCompletion = true + }, + receiveValue: { _ in } + ) + .store(in: &cancellables!) + } + } + + var container: ContainerClass? = ContainerClass() + + XCTAssertFalse(ContainerClass.receivedCompletion) + XCTAssertFalse(StoredObject.storedObjectReleased) + XCTAssertFalse(StoredObject2.storedObjectReleased) + // When the leak was fixed, the stream started crashing because cancel + // was called twice on relay. A fix for the crash was added, + // so setting the container to nil which deallocates cancellables + // should not cause a crash. + container = nil + XCTAssertTrue(StoredObject.storedObjectReleased) + XCTAssertTrue(StoredObject2.storedObjectReleased) + XCTAssertNil(container) + } } #endif From fb5f27193ecc30830895390a9510214faa2cd2b2 Mon Sep 17 00:00:00 2001 From: Jasdeep Saini Date: Tue, 19 Jul 2022 23:50:45 -0500 Subject: [PATCH 3/4] Minor documentation updates. --- .../xcshareddata/swiftpm/Package.resolved | 33 +++++++++++-------- Sources/Common/Sink.swift | 6 ++-- Tests/CurrentValueRelayTests.swift | 16 ++++----- 3 files changed, 31 insertions(+), 24 deletions(-) diff --git a/CombineExt.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved b/CombineExt.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved index ed69403..3d67bf6 100644 --- a/CombineExt.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/CombineExt.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -1,16 +1,23 @@ { - "object": { - "pins": [ - { - "package": "combine-schedulers", - "repositoryURL": "https://github.com/pointfreeco/combine-schedulers", - "state": { - "branch": null, - "revision": "afc84b6a3639198b7b8b6d79f04eb3c2ee590d29", - "version": "0.1.1" - } + "pins" : [ + { + "identity" : "combine-schedulers", + "kind" : "remoteSourceControl", + "location" : "https://github.com/pointfreeco/combine-schedulers", + "state" : { + "revision" : "4cf088c29a20f52be0f2ca54992b492c54e0076b", + "version" : "0.5.3" } - ] - }, - "version": 1 + }, + { + "identity" : "xctest-dynamic-overlay", + "kind" : "remoteSourceControl", + "location" : "https://github.com/pointfreeco/xctest-dynamic-overlay", + "state" : { + "revision" : "ef8e14e7ce1c0c304c644c6ba365d06c468ded6b", + "version" : "0.3.3" + } + } + ], + "version" : 2 } diff --git a/Sources/Common/Sink.swift b/Sources/Common/Sink.swift index 3c19b76..a3418a4 100644 --- a/Sources/Common/Sink.swift +++ b/Sources/Common/Sink.swift @@ -44,9 +44,9 @@ class Sink: Subscriber { self.transformFailure = transformFailure // A subscription can only be cancelled once. The `upstreamIsCancelled` value - // is used to suppress a second call to cancel when a Sink is `deinit`'ed, - // when a sink receives completion, and when any custom operator call - // `cancelUpstream()` manually. + // is used to suppress a second call to cancel when the Sink is deallocated, + // when a sink receives completion, and when a custom operator like `withLatestFrom` + // calls `cancelUpstream()` manually. upstream.handleEvents(receiveCancel: { [weak self] in self?.upstreamIsCancelled = true }).subscribe(self) diff --git a/Tests/CurrentValueRelayTests.swift b/Tests/CurrentValueRelayTests.swift index 1f605be..d9a2579 100644 --- a/Tests/CurrentValueRelayTests.swift +++ b/Tests/CurrentValueRelayTests.swift @@ -129,11 +129,11 @@ class CurrentValueRelayTests: XCTestCase { // The easiest way to reproduce the race condition is // to initialize `cancellables` before `relay`. // The first two tests confirm the value of the relay is - // released regardless of when cancellables is created. + // released regardless of when cancellables is initialized. // - // The last two tests checks the scenario where a relay is + // The last two tests check the scenario where a relay is // chained with a withLatestFrom operator. This leads - // to two objects being leaked if cancellables is declared + // to two objects being leaked if cancellables is initialized // before the relays. final class StoredObject { static var storedObjectReleased = false @@ -195,9 +195,9 @@ class CurrentValueRelayTests: XCTestCase { XCTAssertTrue(StoredObject.storedObjectReleased) XCTAssertNil(container) - // In this case the cancellables is deinited before the CurrentValueRelay. - // Deiniting cancellables calls cancel for all subscriptions. Completion - // will never be called. + // In this case the cancellables is deallocated before the relay. + // The deinit method of AnyCancellable calls cancel for all subscriptions. + // Completion will never be called for a canceled subscription. XCTAssertFalse(ContainerClass.receivedCompletion) XCTAssertTrue(ContainerClass.receivedCancel) } @@ -276,7 +276,7 @@ class CurrentValueRelayTests: XCTestCase { // When the leak was fixed, the stream started crashing because cancel // was called twice on relay. A fix for the crash was added, // so setting the container to nil which deallocates cancellables - // should not cause a crash. + // confirms there is no crash. container = nil XCTAssertTrue(StoredObject.storedObjectReleased) XCTAssertTrue(StoredObject2.storedObjectReleased) @@ -318,7 +318,7 @@ class CurrentValueRelayTests: XCTestCase { // When the leak was fixed, the stream started crashing because cancel // was called twice on relay. A fix for the crash was added, // so setting the container to nil which deallocates cancellables - // should not cause a crash. + // confirms there is no crash. container = nil XCTAssertTrue(StoredObject.storedObjectReleased) XCTAssertTrue(StoredObject2.storedObjectReleased) From c522cd1548c28367e943714aecdbc4addf712cd2 Mon Sep 17 00:00:00 2001 From: Shai Mishali Date: Sat, 23 Jul 2022 10:30:50 +0300 Subject: [PATCH 4/4] Update Sources/Common/Sink.swift --- Sources/Common/Sink.swift | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/Sources/Common/Sink.swift b/Sources/Common/Sink.swift index a3418a4..813ec5c 100644 --- a/Sources/Common/Sink.swift +++ b/Sources/Common/Sink.swift @@ -47,9 +47,13 @@ class Sink: Subscriber { // is used to suppress a second call to cancel when the Sink is deallocated, // when a sink receives completion, and when a custom operator like `withLatestFrom` // calls `cancelUpstream()` manually. - upstream.handleEvents(receiveCancel: { [weak self] in - self?.upstreamIsCancelled = true - }).subscribe(self) + upstream + .handleEvents( + receiveCancel: { [weak self] in + self?.upstreamIsCancelled = true + } + ) + .subscribe(self) } func demand(_ demand: Subscribers.Demand) {