Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified .DS_Store
Binary file not shown.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion Sources/Common/Sink.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Sink<Upstream: Publisher, Downstream: Subscriber>: Subscriber {
private var upstreamSubscription: Subscription?
private let transformOutput: TransformOutput?
private let transformFailure: TransformFailure?
private var upstreamIsCancelled = false

/// Initialize a new sink subscribing to the upstream publisher and
/// fulfilling the demand of the downstream subscriber using a backpresurre
Expand All @@ -41,7 +42,18 @@ class Sink<Upstream: Publisher, Downstream: Subscriber>: Subscriber {
self.buffer = DemandBuffer(subscriber: downstream)
self.transformOutput = transformOutput
self.transformFailure = transformFailure
upstream.subscribe(self)

// A subscription can only be cancelled once. The `upstreamIsCancelled` value
// is used to suppress a second call to cancel when the Sink is deallocated,
// when a sink receives completion, and when a custom operator like `withLatestFrom`
// calls `cancelUpstream()` manually.
upstream
.handleEvents(
receiveCancel: { [weak self] in
self?.upstreamIsCancelled = true
}
)
.subscribe(self)
}

func demand(_ demand: Subscribers.Demand) {
Expand Down Expand Up @@ -93,6 +105,8 @@ class Sink<Upstream: Publisher, Downstream: Subscriber>: Subscriber {
}

func cancelUpstream() {
guard upstreamIsCancelled == false else { return }

upstreamSubscription.kill()
}

Expand Down
3 changes: 2 additions & 1 deletion Sources/Relays/CurrentValueRelay.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,15 @@ private extension CurrentValueRelay {
func forceFinish() {
self.sink?.shouldForwardCompletion = true
self.sink?.receive(completion: .finished)
self.sink = nil
}
Comment on lines 75 to 79
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since now cancel and forceFinish are the same you can simplify and maybe just keep the cancel method.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the intent of the method is clearer by leaving it in the forceFinish method. It is called by cancel method of CurrentValueRelay.Subscription and deinit method of CurrentValueRelay.

The method isn't cancelling the subscription, it is finishing the subscription. It just happens to be called from cancel, because of how a CurrentValueRelay works.


func request(_ demand: Subscribers.Demand) {
sink?.demand(demand)
}

func cancel() {
sink = nil
forceFinish()
}
}
}
Expand Down
204 changes: 204 additions & 0 deletions Tests/CurrentValueRelayTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,209 @@ class CurrentValueRelayTests: XCTestCase {
XCTAssertFalse(completed)
XCTAssertEqual(values, ["initial", "1", "2", "3"])
}

// There was a race condition which caused the value of a relay
// to leak. Details of the race condition are in this PR:
//
// https://github.com/CombineCommunity/CombineExt/pull/137
//
// The easiest way to reproduce the race condition is
// to initialize `cancellables` before `relay`.
// The first two tests confirm the value of the relay is
// released regardless of when cancellables is initialized.
//
// The last two tests check the scenario where a relay is
// chained with a withLatestFrom operator. This leads
// to two objects being leaked if cancellables is initialized
// before the relays.
final class StoredObject {
static var storedObjectReleased = false

let value = 10

init() {
Self.storedObjectReleased = false
}

deinit {
Self.storedObjectReleased = true
}
}

final class StoredObject2 {
static var storedObjectReleased = false

let value = 20

init() {
Self.storedObjectReleased = false
}

deinit {
Self.storedObjectReleased = true
}
}

func testStoredObjectIsDeallocatedWhenRelayIsDeallocatedAndDeclaredAfterCancellables() {
final class ContainerClass {
static var receivedCompletion = false
static var receivedCancel = false

// Cancellables comes before the relay.
var cancellables = Set<AnyCancellable>()
let relay = CurrentValueRelay(StoredObject())

init() {
relay
.handleEvents(receiveCancel: {
Self.receivedCancel = true
})
.sink(
receiveCompletion: { _ in
Self.receivedCompletion = true
},
receiveValue: { _ in }
)
.store(in: &cancellables)
}
}

var container: ContainerClass? = ContainerClass()

XCTAssertFalse(ContainerClass.receivedCompletion)
XCTAssertFalse(StoredObject.storedObjectReleased)
container = nil
XCTAssertTrue(StoredObject.storedObjectReleased)
XCTAssertNil(container)

// In this case the cancellables is deallocated before the relay.
// The deinit method of AnyCancellable calls cancel for all subscriptions.
// Completion will never be called for a canceled subscription.
XCTAssertFalse(ContainerClass.receivedCompletion)
XCTAssertTrue(ContainerClass.receivedCancel)
}

func testStoredObjectIsDeallocatedWhenRelayIsDeallocatedAndDeclaredBeforeCancellables() {
final class ContainerClass {
static var receivedCompletion = false
static var receivedCancel = false

// Cancellables comes after the relay.
let relay = CurrentValueRelay(StoredObject())
var cancellables = Set<AnyCancellable>()

init() {
relay
.handleEvents(receiveCancel: {
Self.receivedCancel = true
})
.sink(
receiveCompletion: { _ in
Self.receivedCompletion = true
},
receiveValue: { _ in }
)
.store(in: &cancellables)
}
}

var container: ContainerClass? = ContainerClass()

XCTAssertFalse(ContainerClass.receivedCompletion)
XCTAssertFalse(StoredObject.storedObjectReleased)
container = nil
XCTAssertTrue(StoredObject.storedObjectReleased)
XCTAssertNil(container)

// In this case the cancellables is deinited after the CurrentValueRelay,
// so completion will be called. Since the relay was completed, cancel will
// not be called.
XCTAssertTrue(ContainerClass.receivedCompletion)
XCTAssertFalse(ContainerClass.receivedCancel)
}

func testBothStoredObjectsAreDeallocatedWhenRelayAndWithLatestFromOperatorAreDeallocatedAndDeclaredBeforeCancellables() {
final class ContainerClass {
static var receivedCompletion = false
static var receivedCancel = false

// Cancellables comes after the relay. In this case, there
// is no leak.
let relay = CurrentValueRelay(StoredObject())
let relay2 = CurrentValueRelay(StoredObject2())
var cancellables: Set<AnyCancellable>? = Set<AnyCancellable>()

init() {
relay
.withLatestFrom(relay2)
.handleEvents(receiveCancel: {
Self.receivedCancel = true
})
.sink(
receiveCompletion: { _ in
Self.receivedCompletion = true
},
receiveValue: { _ in }
)
.store(in: &cancellables!)
}
}

var container: ContainerClass? = ContainerClass()

XCTAssertFalse(ContainerClass.receivedCompletion)
XCTAssertFalse(StoredObject.storedObjectReleased)
XCTAssertFalse(StoredObject2.storedObjectReleased)
// When the leak was fixed, the stream started crashing because cancel
// was called twice on relay. A fix for the crash was added,
// so setting the container to nil which deallocates cancellables
// confirms there is no crash.
container = nil
XCTAssertTrue(StoredObject.storedObjectReleased)
XCTAssertTrue(StoredObject2.storedObjectReleased)
XCTAssertNil(container)
}

func testBothStoredObjectsAreDeallocatedWhenRelayAndWithLatestFromOperatorAreDeallocatedAndDeclaredAfterCancellables() {
final class ContainerClass {
static var receivedCompletion = false
static var receivedCancel = false

// Cancellables comes before the relay. In this case, the objects
// for both relays leak.
var cancellables: Set<AnyCancellable>? = Set<AnyCancellable>()
let relay = CurrentValueRelay(StoredObject())
let relay2 = CurrentValueRelay(StoredObject2())

init() {
relay
.withLatestFrom(relay2)
.handleEvents(receiveCancel: {
Self.receivedCancel = true
})
.sink(
receiveCompletion: { _ in
Self.receivedCompletion = true
},
receiveValue: { _ in }
)
.store(in: &cancellables!)
}
}

var container: ContainerClass? = ContainerClass()

XCTAssertFalse(ContainerClass.receivedCompletion)
XCTAssertFalse(StoredObject.storedObjectReleased)
XCTAssertFalse(StoredObject2.storedObjectReleased)
// When the leak was fixed, the stream started crashing because cancel
// was called twice on relay. A fix for the crash was added,
// so setting the container to nil which deallocates cancellables
// confirms there is no crash.
container = nil
XCTAssertTrue(StoredObject.storedObjectReleased)
XCTAssertTrue(StoredObject2.storedObjectReleased)
XCTAssertNil(container)
}
}
#endif