Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions Sources/OneWay/AnyEffect.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public struct AnyEffect<Element>: Effect where Element: Sendable {
public enum Method: Sendable {
case register(any EffectID, cancelInFlight: Bool)
case cancel(any EffectID)
case throttle(id: any EffectID, interval: Double, latest: Bool)
case none
}

Expand Down Expand Up @@ -158,6 +159,49 @@ public struct AnyEffect<Element>: Effect where Element: Sendable {
)
return copy
}

/// Creates an effect that emits elements from this effect, but only if a certain amount of time
/// has passed between emissions.
///
/// First, create a Hashable ID that will be used to identify the throttle effect:
///
/// ```swift
/// enum ThrottleID {
/// case button
/// }
/// ```
///
/// Then, apply the `throttle` modifier using the defined ID:
///
/// ```swift
/// func reduce(state: inout State, action: Action) -> AnyEffect<Action> {
/// switch action {
/// // ...
/// case .perform:
/// return .just(.increment)
/// .throttle(id: ThrottleID.button, for: 1.0)
/// // ...
/// }
/// }
/// ```
///
/// - Parameters:
/// - id: The effect's identifier.
/// - seconds: The duration for which the effect should wait before sending an element.
/// - latest: A boolean value that indicates whether to emit the most recent element.
/// If `false`, the effect emits the first element and ignores subsequent elements during the
/// time interval. If `true`, the effect emits the first element and then the most recent
/// element after the time interval has passed. Defaults to `false`.
/// - Returns: A new effect that emits elements according to the throttle behavior.
public consuming func throttle(
id: some EffectID,
for seconds: Double,
latest: Bool = false
) -> Self {
var copy = self
copy.method = .throttle(id: id, interval: seconds, latest: latest)
return copy
}
}

extension AnyEffect {
Expand Down
162 changes: 135 additions & 27 deletions Sources/OneWay/Store.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
#if canImport(Foundation)
import Foundation
#endif
#if canImport(CoreFoundation)
import CoreFoundation
#endif

/// `Store` is an actor that holds and manages state values.
///
Expand Down Expand Up @@ -53,6 +56,9 @@ where R.Action: Sendable, R.State: Sendable & Equatable {
private var bindingTask: Task<Void, Never>?
private var tasks: [TaskID: Task<Void, Never>] = [:]
private var cancellables: [EffectIDWrapper: Set<TaskID>] = [:]
private var throttleTimestamps: [EffectIDWrapper: any Sendable] = [:]
private var throttleTimestampsUsingAbsoluteTime: [EffectIDWrapper: CFAbsoluteTime] = [:]
private var trailingThrottledEffects: [EffectIDWrapper: AnyEffect<Action>] = [:]

/// Initializes a store from a reducer and an initial state.
///
Expand Down Expand Up @@ -86,35 +92,15 @@ where R.Action: Sendable, R.State: Sendable & Equatable {
isProcessing = true
await Task.yield()
for action in actionQueue {
let taskID = TaskID()
let effect = reducer.reduce(state: &state, action: action)
let task = Task { [weak self, taskID] in
guard !Task.isCancelled else { return }
for await value in effect.values {
guard let self else { break }
guard !Task.isCancelled else { break }
await send(value)
}
await self?.removeTask(taskID)
let isThrottled: Bool
if #available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) {
isThrottled = await throttleIfNeeded(for: effect)
} else {
isThrottled = await throttleIfNeededUsingAbsoluteTime(for: effect)
}
tasks[taskID] = task

switch effect.method {
case let .register(id, cancelInFlight):
let effectID = EffectIDWrapper(id)
if cancelInFlight {
let taskIDs = cancellables[effectID, default: []]
taskIDs.forEach { removeTask($0) }
cancellables.removeValue(forKey: effectID)
}
cancellables[effectID, default: []].insert(taskID)
case let .cancel(id):
let effectID = EffectIDWrapper(id)
let taskIDs = cancellables[effectID, default: []]
taskIDs.forEach { removeTask($0) }
cancellables.removeValue(forKey: effectID)
case .none:
break
if !isThrottled {
await execute(effect: effect)
}
}
actionQueue = []
Expand All @@ -130,6 +116,128 @@ where R.Action: Sendable, R.State: Sendable & Equatable {
tasks.forEach { $0.value.cancel() }
tasks.removeAll()
actionQueue.removeAll()
cancellables.removeAll()
trailingThrottledEffects.removeAll()
if #available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) {
throttleTimestamps.removeAll()
} else {
throttleTimestampsUsingAbsoluteTime.removeAll()
}
}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
private func throttleIfNeeded(for effect: AnyEffect<Action>) async -> Bool {
guard case let .throttle(id, interval, latest) = effect.method else {
return false
}
let effectID = EffectIDWrapper(id)
let clock = ContinuousClock()
let now = clock.now
if let last = throttleTimestamps[effectID] as? ContinuousClock.Instant {
if last.duration(to: now) < .seconds(interval) {
print("πŸ”₯ interval1", last.duration(to: now), "<" , interval, state)
} else {
print("πŸ”₯ interval1", last.duration(to: now), ">" , interval, state)
}
}
if let last = throttleTimestamps[effectID] as? ContinuousClock.Instant,
last.duration(to: now) < .seconds(interval) {
if latest {
trailingThrottledEffects[effectID] = effect
}
return true
} else {
if throttleTimestamps[effectID] == nil {
print("πŸ”₯ new", state)
}
throttleTimestamps[effectID] = now
if latest {
Task { [weak self] in
do {
try await clock.sleep(for: .seconds(interval))
await self?.executeTrailingThrottledEffects(effectID)
}
catch {
await self?.removeTrailingThrottledEffects(effectID)
}
}
}
return false
}
}

private func throttleIfNeededUsingAbsoluteTime(for effect: AnyEffect<Action>) async -> Bool {
guard case let .throttle(id, interval, latest) = effect.method else {
return false
}
let effectID = EffectIDWrapper(id)
let now = CFAbsoluteTimeGetCurrent()
if let last = throttleTimestampsUsingAbsoluteTime[effectID],
(now - last) < interval {
if latest {
trailingThrottledEffects[effectID] = effect
}
return true
} else {
throttleTimestampsUsingAbsoluteTime[effectID] = now
if latest {
Task { [weak self] in
do {
let NSEC_PER_SEC: Double = 1_000_000_000
let nanoseconds = UInt64(interval * NSEC_PER_SEC)
try await Task.sleep(nanoseconds: nanoseconds)
await self?.executeTrailingThrottledEffects(effectID)
}
catch {
await self?.removeTrailingThrottledEffects(effectID)
}
}
}
return false
}
}

private func execute(effect: AnyEffect<Action>) async {
let taskID = TaskID()
let task = Task { [weak self, taskID] in
guard !Task.isCancelled else { return }
for await value in effect.values {
guard let self else { break }
guard !Task.isCancelled else { break }
await send(value)
}
await self?.removeTask(taskID)
}
tasks[taskID] = task

switch effect.method {
case let .register(id, cancelInFlight):
let effectID = EffectIDWrapper(id)
if cancelInFlight {
let taskIDs = cancellables[effectID, default: []]
taskIDs.forEach { removeTask($0) }
cancellables.removeValue(forKey: effectID)
}
cancellables[effectID, default: []].insert(taskID)
case let .cancel(id):
let effectID = EffectIDWrapper(id)
let taskIDs = cancellables[effectID, default: []]
taskIDs.forEach { removeTask($0) }
cancellables.removeValue(forKey: effectID)
case .throttle,
.none:
break
}
}

private func executeTrailingThrottledEffects(_ effectID: EffectIDWrapper) async {
if let effect = trailingThrottledEffects.removeValue(forKey: effectID) {
await execute(effect: effect)
}
}

private func removeTrailingThrottledEffects(_ effectID: EffectIDWrapper) async {
trailingThrottledEffects.removeValue(forKey: effectID)
}

private func bindExternalEffect() {
Expand Down
8 changes: 4 additions & 4 deletions Sources/OneWayTesting/Store+Testing.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ extension Store {
#if canImport(XCTest)
if isTimeout && result != input {
XCTFail(
"Exceeded timeout of \(timeout) seconds",
"Timeout exceeded \(timeout) seconds: received \(input), expected \(result)",
file: filePath,
line: line
)
Expand All @@ -87,7 +87,7 @@ extension Store {
case .testing:
if isTimeout && result != input {
Issue.record(
"Exceeded timeout of \(timeout) seconds",
"Timeout exceeded \(timeout) seconds: received \(input), expected \(result)",
sourceLocation: Testing.SourceLocation(
fileID: String(describing: fileID),
filePath: String(describing: filePath),
Expand Down Expand Up @@ -156,7 +156,7 @@ extension Store {
#if canImport(XCTest)
if isTimeout && result != input {
XCTFail(
"Exceeded timeout of \(timeout) seconds",
"Timeout exceeded \(timeout) seconds: received \(input), expected \(result)",
file: filePath,
line: line
)
Expand All @@ -174,7 +174,7 @@ extension Store {
case .testing:
if isTimeout && result != input {
Issue.record(
"Exceeded timeout of \(timeout) seconds",
"Timeout exceeded \(timeout) seconds: received \(input), expected \(result)",
sourceLocation: Testing.SourceLocation(
fileID: String(describing: fileID),
filePath: String(describing: filePath),
Expand Down
50 changes: 50 additions & 0 deletions Tests/OneWayTests/StoreTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,41 @@ final class StoreTests: XCTestCase {

await sut.expect(\.count, 10)
}

func test_throttle() async {
let clock = ContinuousClock()
await sut.send(.throttledIncrement)
await sut.send(.throttledIncrement)
try! await clock.sleep(for: .seconds(0.01))
await sut.send(.throttledIncrement)
await sut.expect(\.count, 1)

try! await clock.sleep(for: .seconds(0.1))
await sut.expect(\.count, 1)

await sut.send(.throttledIncrement)
await sut.expect(\.count, 2)
}

func test_throttle_latest() async {
let clock = ContinuousClock()
await sut.send(.throttledIncrementLatest)
await sut.expect(\.count, 1)

await sut.send(.throttledIncrementLatest)
await sut.expect(\.count, 1)

try! await clock.sleep(for: .seconds(0.1))
await sut.expect(\.count, 2)

await sut.send(.throttledIncrementLatest)
try! await clock.sleep(for: .seconds(0.01))
await sut.send(.throttledIncrementLatest)
await sut.expect(\.count, 3)

try! await clock.sleep(for: .seconds(0.1))
await sut.expect(\.count, 4)
}
}

#if canImport(Combine)
Expand All @@ -274,6 +309,8 @@ private struct TestReducer: Reducer {
case debouncedIncrementWithClock
case debouncedSequence
case debouncedSequenceWithClock
case throttledIncrement
case throttledIncrementLatest
}

struct State: Equatable {
Expand All @@ -296,6 +333,11 @@ private struct TestReducer: Reducer {
case incrementSequence
}

enum Throttle {
case increment
case incrementLatest
}

func reduce(state: inout State, action: Action) -> AnyEffect<Action> {
switch action {
case .increment:
Expand Down Expand Up @@ -358,6 +400,14 @@ private struct TestReducer: Reducer {
send(.increment)
}
.debounce(id: Debounce.incrementSequence, for: .seconds(100), clock: clock)

case .throttledIncrement:
return .just(.increment)
.throttle(id: Throttle.increment, for: 0.1)

case .throttledIncrementLatest:
return .just(.increment)
.throttle(id: Throttle.incrementLatest, for: 0.1, latest: true)
}
}

Expand Down
Loading