diff --git a/Sources/OneWay/AnyEffect.swift b/Sources/OneWay/AnyEffect.swift index e7ebd15..d639b98 100644 --- a/Sources/OneWay/AnyEffect.swift +++ b/Sources/OneWay/AnyEffect.swift @@ -14,6 +14,7 @@ public struct AnyEffect: Effect where Element: Sendable { public enum Method: Sendable { case register(any EffectID, cancelInFlight: Bool) case cancel(any EffectID) + case throttle(id: any EffectID, interval: Duration, latest: Bool) case none } @@ -158,6 +159,49 @@ public struct AnyEffect: Effect where Element: Sendable { ) return copy } + + /// Creates an effect that emits elements from this effect, but only if a certain amount of time + /// has passed between emissions. + /// + /// First, create a `Hashable` ID that will be used to identify the throttle effect: + /// + /// ```swift + /// enum ThrottleID { + /// case button + /// } + /// ``` + /// + /// Then, apply the `throttle` modifier using the defined ID: + /// + /// ```swift + /// func reduce(state: inout State, action: Action) -> AnyEffect { + /// switch action { + /// // ... + /// case .perform: + /// return .just(.increment) + /// .throttle(id: ThrottleID.button, for: .seconds(1)) + /// // ... + /// } + /// } + /// ``` + /// + /// - Parameters: + /// - id: The effect’s identifier. + /// - interval: The duration that must elapse before another element can be emitted. + /// - latest: A Boolean value indicating whether to emit the most recent element. + /// If `false`, the effect emits the first element and ignores subsequent ones during the + /// interval. If `true`, it emits the first element and then the most recent element once + /// the interval has passed. Defaults to `false`. + /// - Returns: A new effect that emits elements according to the throttle behavior. + public consuming func throttle( + id: some EffectID, + for interval: Duration, + latest: Bool = false + ) -> Self { + var copy = self + copy.method = .throttle(id: id, interval: interval, latest: latest) + return copy + } } extension AnyEffect { diff --git a/Sources/OneWay/Store.swift b/Sources/OneWay/Store.swift index 316b59f..9b1f619 100644 --- a/Sources/OneWay/Store.swift +++ b/Sources/OneWay/Store.swift @@ -14,7 +14,7 @@ import Foundation /// It is fully thread-safe as it is implemented using an actor. It stores the `State` and can /// change the `State` by receiving `Actions`. You can define `Action` and `State` in ``Reducer``. /// If you create a data flow through `Store`, you can make it flow in one direction. -public actor Store +public actor Store> where R.Action: Sendable, R.State: Sendable & Equatable { /// A convenience type alias for referring to a action of a given reducer's action. public typealias Action = R.Action @@ -46,27 +46,34 @@ where R.Action: Sendable, R.State: Sendable & Equatable { !isProcessing && tasks.isEmpty } - private let reducer: any Reducer + private let reducer: R + private let clock: C private let continuation: AsyncStream.Continuation private var isProcessing: Bool = false private var actionQueue: [Action] = [] private var bindingTask: Task? private var tasks: [TaskID: Task] = [:] private var cancellables: [EffectIDWrapper: Set] = [:] + private var throttleTimestamps: [EffectIDWrapper: C.Instant] = [:] + private var trailingThrottledEffects: [EffectIDWrapper: AnyEffect] = [:] - /// Initializes a store from a reducer and an initial state. + /// Initializes a store from a reducer, an initial state, and a clock. /// /// - Parameters: - /// - reducer: The reducer is responsible for transitioning the current state to the next - /// state. - /// - state: The state to initialize a store. + /// - reducer: The reducer responsible for transitioning the current state to the next + /// state in response to actions. + /// - state: The initial state used to create the store. + /// - clock: The clock that determines how time-based effects (such as debounce or throttle) + /// are scheduled. Defaults to `ContinuousClock`. public init( reducer: @Sendable @autoclosure () -> R, - state: State + state: State, + clock: C = ContinuousClock() ) { self.initialState = state self.state = state self.reducer = reducer() + self.clock = clock (states, continuation) = AsyncStream.makeStream() Task { await bindExternalEffect() } defer { continuation.yield(state) } @@ -86,35 +93,10 @@ where R.Action: Sendable, R.State: Sendable & Equatable { isProcessing = true await Task.yield() for action in actionQueue { - let taskID = TaskID() let effect = reducer.reduce(state: &state, action: action) - let task = Task { [weak self, taskID] in - guard !Task.isCancelled else { return } - for await value in effect.values { - guard let self else { break } - guard !Task.isCancelled else { break } - await send(value) - } - await self?.removeTask(taskID) - } - tasks[taskID] = task - - switch effect.method { - case let .register(id, cancelInFlight): - let effectID = EffectIDWrapper(id) - if cancelInFlight { - let taskIDs = cancellables[effectID, default: []] - taskIDs.forEach { removeTask($0) } - cancellables.removeValue(forKey: effectID) - } - cancellables[effectID, default: []].insert(taskID) - case let .cancel(id): - let effectID = EffectIDWrapper(id) - let taskIDs = cancellables[effectID, default: []] - taskIDs.forEach { removeTask($0) } - cancellables.removeValue(forKey: effectID) - case .none: - break + let isThrottled = await throttleIfNeeded(for: effect) + if !isThrottled { + await execute(effect: effect) } } actionQueue = [] @@ -130,6 +112,81 @@ where R.Action: Sendable, R.State: Sendable & Equatable { tasks.forEach { $0.value.cancel() } tasks.removeAll() actionQueue.removeAll() + cancellables.removeAll() + trailingThrottledEffects.removeAll() + throttleTimestamps.removeAll() + } + + private func throttleIfNeeded(for effect: AnyEffect) async -> Bool { + guard case let .throttle(id, interval, latest) = effect.method else { + return false + } + let effectID = EffectIDWrapper(id) + let now = clock.now + if let last = throttleTimestamps[effectID], + last.duration(to: now) < interval { + if latest { + trailingThrottledEffects[effectID] = effect + } + return true + } else { + throttleTimestamps[effectID] = now + if latest { + Task { [weak self] in + do { + try await self?.clock.sleep(for: interval) + await self?.executeTrailingThrottledEffects(effectID) + } + catch { + await self?.removeTrailingThrottledEffects(effectID) + } + } + } + return false + } + } + + private func execute(effect: AnyEffect) async { + let taskID = TaskID() + let task = Task { [weak self, taskID] in + guard !Task.isCancelled else { return } + for await value in effect.values { + guard let self else { break } + guard !Task.isCancelled else { break } + await send(value) + } + await self?.removeTask(taskID) + } + tasks[taskID] = task + + switch effect.method { + case let .register(id, cancelInFlight): + let effectID = EffectIDWrapper(id) + if cancelInFlight { + let taskIDs = cancellables[effectID, default: []] + taskIDs.forEach { removeTask($0) } + cancellables.removeValue(forKey: effectID) + } + cancellables[effectID, default: []].insert(taskID) + case let .cancel(id): + let effectID = EffectIDWrapper(id) + let taskIDs = cancellables[effectID, default: []] + taskIDs.forEach { removeTask($0) } + cancellables.removeValue(forKey: effectID) + case .throttle, + .none: + break + } + } + + private func executeTrailingThrottledEffects(_ effectID: EffectIDWrapper) async { + if let effect = trailingThrottledEffects.removeValue(forKey: effectID) { + await execute(effect: effect) + } + } + + private func removeTrailingThrottledEffects(_ effectID: EffectIDWrapper) async { + trailingThrottledEffects.removeValue(forKey: effectID) } private func bindExternalEffect() { diff --git a/Sources/OneWay/ViewStore.swift b/Sources/OneWay/ViewStore.swift index 31df70c..fd7a542 100644 --- a/Sources/OneWay/ViewStore.swift +++ b/Sources/OneWay/ViewStore.swift @@ -15,7 +15,7 @@ import Combine /// It can observe state changes and send actions. It can primarily be used in SwiftUI's `View`, /// `UIView` or `UIViewController` operating on main thread. @MainActor -public final class ViewStore +public final class ViewStore> where R.Action: Sendable, R.State: Sendable & Equatable { /// A convenience type alias for referring to a action of a given reducer's action. public typealias Action = R.Action @@ -41,25 +41,29 @@ where R.Action: Sendable, R.State: Sendable & Equatable { /// state changes public let states: AsyncViewStateSequence - private let store: Store + private let store: Store private let continuation: AsyncStream.Continuation private var task: Task? - /// Initializes a view store from a reducer and an initial state. + /// Initializes a view store from a reducer, an initial state, and a clock. /// /// - Parameters: - /// - reducer: The reducer is responsible for transitioning the current state to the next - /// state. - /// - state: The state to initialize a store. + /// - reducer: The reducer responsible for transitioning the current state to the next + /// state in response to actions. + /// - state: The initial state used to create the store. + /// - clock: The clock that determines how time-based effects (such as debounce or throttle) + /// are scheduled. Defaults to `ContinuousClock`. public init( reducer: @Sendable @autoclosure () -> R, - state: State + state: State, + clock: C = ContinuousClock() ) { self.initialState = state self.state = state self.store = Store( reducer: reducer(), - state: state + state: state, + clock: clock ) let (stream, continuation) = AsyncStream.makeStream() self.states = AsyncViewStateSequence(stream) diff --git a/Sources/OneWayTesting/Store+Testing.swift b/Sources/OneWayTesting/Store+Testing.swift index c29751b..b4d92f5 100644 --- a/Sources/OneWayTesting/Store+Testing.swift +++ b/Sources/OneWayTesting/Store+Testing.swift @@ -69,7 +69,7 @@ extension Store { #if canImport(XCTest) if isTimeout && result != input { XCTFail( - "Exceeded timeout of \(timeout) seconds", + "Timeout exceeded \(timeout) seconds: received \(input), expected \(result)", file: filePath, line: line ) @@ -87,7 +87,7 @@ extension Store { case .testing: if isTimeout && result != input { Issue.record( - "Exceeded timeout of \(timeout) seconds", + "Timeout exceeded \(timeout) seconds: received \(input), expected \(result)", sourceLocation: Testing.SourceLocation( fileID: String(describing: fileID), filePath: String(describing: filePath), @@ -156,7 +156,7 @@ extension Store { #if canImport(XCTest) if isTimeout && result != input { XCTFail( - "Exceeded timeout of \(timeout) seconds", + "Timeout exceeded \(timeout) seconds: received \(input), expected \(result)", file: filePath, line: line ) @@ -174,7 +174,7 @@ extension Store { case .testing: if isTimeout && result != input { Issue.record( - "Exceeded timeout of \(timeout) seconds", + "Timeout exceeded \(timeout) seconds: received \(input), expected \(result)", sourceLocation: Testing.SourceLocation( fileID: String(describing: fileID), filePath: String(describing: filePath), diff --git a/Tests/OneWayTests/StoreTests.swift b/Tests/OneWayTests/StoreTests.swift index d6a74cf..9be0f12 100644 --- a/Tests/OneWayTests/StoreTests.swift +++ b/Tests/OneWayTests/StoreTests.swift @@ -14,7 +14,7 @@ import OneWayTesting import XCTest final class StoreTests: XCTestCase { - private var sut: Store! + private var sut: Store>! private var clock: TestClock! override func setUp() { @@ -23,7 +23,8 @@ final class StoreTests: XCTestCase { self.clock = clock sut = Store( reducer: TestReducer(clock: clock), - state: TestReducer.State(count: 0, text: "") + state: TestReducer.State(count: 0, text: ""), + clock: clock ) } @@ -202,6 +203,39 @@ final class StoreTests: XCTestCase { await sut.expect(\.count, 10) } + + func test_throttle() async { + await sut.send(.throttledIncrement) + await sut.send(.throttledIncrement) + await clock.advance(by: .seconds(10)) + await sut.send(.throttledIncrement) + await sut.expect(\.count, 1) + + await clock.advance(by: .seconds(100)) + await sut.expect(\.count, 1) + + await sut.send(.throttledIncrement) + await sut.expect(\.count, 2) + } + + func test_throttle_latest() async { + await sut.send(.throttledIncrementLatest) + await sut.expect(\.count, 1) + + await sut.send(.throttledIncrementLatest) + await sut.expect(\.count, 1) + + await clock.advance(by: .seconds(100)) + await sut.expect(\.count, 2) + + await sut.send(.throttledIncrementLatest) + await clock.advance(by: .seconds(10)) + await sut.send(.throttledIncrementLatest) + await sut.expect(\.count, 3) + + await clock.advance(by: .seconds(100)) + await sut.expect(\.count, 4) + } } #if canImport(Combine) @@ -224,6 +258,8 @@ private struct TestReducer: Reducer { case cancelLongTimeTask case debouncedIncrement case debouncedSequence + case throttledIncrement + case throttledIncrementLatest } struct State: Equatable { @@ -246,6 +282,11 @@ private struct TestReducer: Reducer { case incrementSequence } + enum Throttle { + case increment + case incrementLatest + } + func reduce(state: inout State, action: Action) -> AnyEffect { switch action { case .increment: @@ -294,6 +335,14 @@ private struct TestReducer: Reducer { send(.increment) } .debounce(id: Debounce.incrementSequence, for: .seconds(100), clock: clock) + + case .throttledIncrement: + return .just(.increment) + .throttle(id: Throttle.increment, for: .seconds(100)) + + case .throttledIncrementLatest: + return .just(.increment) + .throttle(id: Throttle.incrementLatest, for: .seconds(100), latest: true) } } diff --git a/Tests/OneWayTests/ViewStoreTests.swift b/Tests/OneWayTests/ViewStoreTests.swift index cc0a801..7c7038f 100644 --- a/Tests/OneWayTests/ViewStoreTests.swift +++ b/Tests/OneWayTests/ViewStoreTests.swift @@ -11,7 +11,7 @@ import XCTest #if !os(Linux) final class ViewStoreTests: XCTestCase { @MainActor - private var sut: ViewStore! + private var sut: ViewStore! @MainActor override func setUp() async throws {