diff --git a/.DS_Store b/.DS_Store index cb6c1f7..5a044c1 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/CombineExt.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved b/CombineExt.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved index ed69403..3d67bf6 100644 --- a/CombineExt.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/CombineExt.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -1,16 +1,23 @@ { - "object": { - "pins": [ - { - "package": "combine-schedulers", - "repositoryURL": "https://github.com/pointfreeco/combine-schedulers", - "state": { - "branch": null, - "revision": "afc84b6a3639198b7b8b6d79f04eb3c2ee590d29", - "version": "0.1.1" - } + "pins" : [ + { + "identity" : "combine-schedulers", + "kind" : "remoteSourceControl", + "location" : "https://github.com/pointfreeco/combine-schedulers", + "state" : { + "revision" : "4cf088c29a20f52be0f2ca54992b492c54e0076b", + "version" : "0.5.3" } - ] - }, - "version": 1 + }, + { + "identity" : "xctest-dynamic-overlay", + "kind" : "remoteSourceControl", + "location" : "https://github.com/pointfreeco/xctest-dynamic-overlay", + "state" : { + "revision" : "ef8e14e7ce1c0c304c644c6ba365d06c468ded6b", + "version" : "0.3.3" + } + } + ], + "version" : 2 } diff --git a/Sources/Common/Sink.swift b/Sources/Common/Sink.swift index ba76529..813ec5c 100644 --- a/Sources/Common/Sink.swift +++ b/Sources/Common/Sink.swift @@ -21,6 +21,7 @@ class Sink: Subscriber { private var upstreamSubscription: Subscription? private let transformOutput: TransformOutput? private let transformFailure: TransformFailure? + private var upstreamIsCancelled = false /// Initialize a new sink subscribing to the upstream publisher and /// fulfilling the demand of the downstream subscriber using a backpresurre @@ -41,7 +42,18 @@ class Sink: Subscriber { self.buffer = DemandBuffer(subscriber: downstream) self.transformOutput = transformOutput self.transformFailure = transformFailure - upstream.subscribe(self) + + // A subscription can only be cancelled once. The `upstreamIsCancelled` value + // is used to suppress a second call to cancel when the Sink is deallocated, + // when a sink receives completion, and when a custom operator like `withLatestFrom` + // calls `cancelUpstream()` manually. + upstream + .handleEvents( + receiveCancel: { [weak self] in + self?.upstreamIsCancelled = true + } + ) + .subscribe(self) } func demand(_ demand: Subscribers.Demand) { @@ -93,6 +105,8 @@ class Sink: Subscriber { } func cancelUpstream() { + guard upstreamIsCancelled == false else { return } + upstreamSubscription.kill() } diff --git a/Sources/Relays/CurrentValueRelay.swift b/Sources/Relays/CurrentValueRelay.swift index 8de54f6..234a263 100644 --- a/Sources/Relays/CurrentValueRelay.swift +++ b/Sources/Relays/CurrentValueRelay.swift @@ -75,6 +75,7 @@ private extension CurrentValueRelay { func forceFinish() { self.sink?.shouldForwardCompletion = true self.sink?.receive(completion: .finished) + self.sink = nil } func request(_ demand: Subscribers.Demand) { @@ -82,7 +83,7 @@ private extension CurrentValueRelay { } func cancel() { - sink = nil + forceFinish() } } } diff --git a/Tests/CurrentValueRelayTests.swift b/Tests/CurrentValueRelayTests.swift index 72dfb48..d9a2579 100644 --- a/Tests/CurrentValueRelayTests.swift +++ b/Tests/CurrentValueRelayTests.swift @@ -120,5 +120,209 @@ class CurrentValueRelayTests: XCTestCase { XCTAssertFalse(completed) XCTAssertEqual(values, ["initial", "1", "2", "3"]) } + + // There was a race condition which caused the value of a relay + // to leak. Details of the race condition are in this PR: + // + // https://github.com/CombineCommunity/CombineExt/pull/137 + // + // The easiest way to reproduce the race condition is + // to initialize `cancellables` before `relay`. + // The first two tests confirm the value of the relay is + // released regardless of when cancellables is initialized. + // + // The last two tests check the scenario where a relay is + // chained with a withLatestFrom operator. This leads + // to two objects being leaked if cancellables is initialized + // before the relays. + final class StoredObject { + static var storedObjectReleased = false + + let value = 10 + + init() { + Self.storedObjectReleased = false + } + + deinit { + Self.storedObjectReleased = true + } + } + + final class StoredObject2 { + static var storedObjectReleased = false + + let value = 20 + + init() { + Self.storedObjectReleased = false + } + + deinit { + Self.storedObjectReleased = true + } + } + + func testStoredObjectIsDeallocatedWhenRelayIsDeallocatedAndDeclaredAfterCancellables() { + final class ContainerClass { + static var receivedCompletion = false + static var receivedCancel = false + + // Cancellables comes before the relay. + var cancellables = Set() + let relay = CurrentValueRelay(StoredObject()) + + init() { + relay + .handleEvents(receiveCancel: { + Self.receivedCancel = true + }) + .sink( + receiveCompletion: { _ in + Self.receivedCompletion = true + }, + receiveValue: { _ in } + ) + .store(in: &cancellables) + } + } + + var container: ContainerClass? = ContainerClass() + + XCTAssertFalse(ContainerClass.receivedCompletion) + XCTAssertFalse(StoredObject.storedObjectReleased) + container = nil + XCTAssertTrue(StoredObject.storedObjectReleased) + XCTAssertNil(container) + + // In this case the cancellables is deallocated before the relay. + // The deinit method of AnyCancellable calls cancel for all subscriptions. + // Completion will never be called for a canceled subscription. + XCTAssertFalse(ContainerClass.receivedCompletion) + XCTAssertTrue(ContainerClass.receivedCancel) + } + + func testStoredObjectIsDeallocatedWhenRelayIsDeallocatedAndDeclaredBeforeCancellables() { + final class ContainerClass { + static var receivedCompletion = false + static var receivedCancel = false + + // Cancellables comes after the relay. + let relay = CurrentValueRelay(StoredObject()) + var cancellables = Set() + + init() { + relay + .handleEvents(receiveCancel: { + Self.receivedCancel = true + }) + .sink( + receiveCompletion: { _ in + Self.receivedCompletion = true + }, + receiveValue: { _ in } + ) + .store(in: &cancellables) + } + } + + var container: ContainerClass? = ContainerClass() + + XCTAssertFalse(ContainerClass.receivedCompletion) + XCTAssertFalse(StoredObject.storedObjectReleased) + container = nil + XCTAssertTrue(StoredObject.storedObjectReleased) + XCTAssertNil(container) + + // In this case the cancellables is deinited after the CurrentValueRelay, + // so completion will be called. Since the relay was completed, cancel will + // not be called. + XCTAssertTrue(ContainerClass.receivedCompletion) + XCTAssertFalse(ContainerClass.receivedCancel) + } + + func testBothStoredObjectsAreDeallocatedWhenRelayAndWithLatestFromOperatorAreDeallocatedAndDeclaredBeforeCancellables() { + final class ContainerClass { + static var receivedCompletion = false + static var receivedCancel = false + + // Cancellables comes after the relay. In this case, there + // is no leak. + let relay = CurrentValueRelay(StoredObject()) + let relay2 = CurrentValueRelay(StoredObject2()) + var cancellables: Set? = Set() + + init() { + relay + .withLatestFrom(relay2) + .handleEvents(receiveCancel: { + Self.receivedCancel = true + }) + .sink( + receiveCompletion: { _ in + Self.receivedCompletion = true + }, + receiveValue: { _ in } + ) + .store(in: &cancellables!) + } + } + + var container: ContainerClass? = ContainerClass() + + XCTAssertFalse(ContainerClass.receivedCompletion) + XCTAssertFalse(StoredObject.storedObjectReleased) + XCTAssertFalse(StoredObject2.storedObjectReleased) + // When the leak was fixed, the stream started crashing because cancel + // was called twice on relay. A fix for the crash was added, + // so setting the container to nil which deallocates cancellables + // confirms there is no crash. + container = nil + XCTAssertTrue(StoredObject.storedObjectReleased) + XCTAssertTrue(StoredObject2.storedObjectReleased) + XCTAssertNil(container) + } + + func testBothStoredObjectsAreDeallocatedWhenRelayAndWithLatestFromOperatorAreDeallocatedAndDeclaredAfterCancellables() { + final class ContainerClass { + static var receivedCompletion = false + static var receivedCancel = false + + // Cancellables comes before the relay. In this case, the objects + // for both relays leak. + var cancellables: Set? = Set() + let relay = CurrentValueRelay(StoredObject()) + let relay2 = CurrentValueRelay(StoredObject2()) + + init() { + relay + .withLatestFrom(relay2) + .handleEvents(receiveCancel: { + Self.receivedCancel = true + }) + .sink( + receiveCompletion: { _ in + Self.receivedCompletion = true + }, + receiveValue: { _ in } + ) + .store(in: &cancellables!) + } + } + + var container: ContainerClass? = ContainerClass() + + XCTAssertFalse(ContainerClass.receivedCompletion) + XCTAssertFalse(StoredObject.storedObjectReleased) + XCTAssertFalse(StoredObject2.storedObjectReleased) + // When the leak was fixed, the stream started crashing because cancel + // was called twice on relay. A fix for the crash was added, + // so setting the container to nil which deallocates cancellables + // confirms there is no crash. + container = nil + XCTAssertTrue(StoredObject.storedObjectReleased) + XCTAssertTrue(StoredObject2.storedObjectReleased) + XCTAssertNil(container) + } } #endif