From 9e3cbdf0a37c7054f1f39df65070b85c13b9d5da Mon Sep 17 00:00:00 2001 From: Seungyeop Yeom Date: Sat, 11 Oct 2025 21:11:15 +0900 Subject: [PATCH] Add throttle operator --- Sources/OneWay/AnyEffect.swift | 44 ++++++ Sources/OneWay/Store.swift | 162 ++++++++++++++++++---- Sources/OneWayTesting/Store+Testing.swift | 8 +- Tests/OneWayTests/StoreTests.swift | 50 +++++++ 4 files changed, 233 insertions(+), 31 deletions(-) diff --git a/Sources/OneWay/AnyEffect.swift b/Sources/OneWay/AnyEffect.swift index eb9702f..937fcd3 100644 --- a/Sources/OneWay/AnyEffect.swift +++ b/Sources/OneWay/AnyEffect.swift @@ -14,6 +14,7 @@ public struct AnyEffect: Effect where Element: Sendable { public enum Method: Sendable { case register(any EffectID, cancelInFlight: Bool) case cancel(any EffectID) + case throttle(id: any EffectID, interval: Double, latest: Bool) case none } @@ -158,6 +159,49 @@ public struct AnyEffect: Effect where Element: Sendable { ) return copy } + + /// Creates an effect that emits elements from this effect, but only if a certain amount of time + /// has passed between emissions. + /// + /// First, create a Hashable ID that will be used to identify the throttle effect: + /// + /// ```swift + /// enum ThrottleID { + /// case button + /// } + /// ``` + /// + /// Then, apply the `throttle` modifier using the defined ID: + /// + /// ```swift + /// func reduce(state: inout State, action: Action) -> AnyEffect { + /// switch action { + /// // ... + /// case .perform: + /// return .just(.increment) + /// .throttle(id: ThrottleID.button, for: 1.0) + /// // ... + /// } + /// } + /// ``` + /// + /// - Parameters: + /// - id: The effect's identifier. + /// - seconds: The duration for which the effect should wait before sending an element. + /// - latest: A boolean value that indicates whether to emit the most recent element. + /// If `false`, the effect emits the first element and ignores subsequent elements during the + /// time interval. If `true`, the effect emits the first element and then the most recent + /// element after the time interval has passed. Defaults to `false`. + /// - Returns: A new effect that emits elements according to the throttle behavior. + public consuming func throttle( + id: some EffectID, + for seconds: Double, + latest: Bool = false + ) -> Self { + var copy = self + copy.method = .throttle(id: id, interval: seconds, latest: latest) + return copy + } } extension AnyEffect { diff --git a/Sources/OneWay/Store.swift b/Sources/OneWay/Store.swift index db1d7e6..4269712 100644 --- a/Sources/OneWay/Store.swift +++ b/Sources/OneWay/Store.swift @@ -8,6 +8,9 @@ #if canImport(Foundation) import Foundation #endif +#if canImport(CoreFoundation) +import CoreFoundation +#endif /// `Store` is an actor that holds and manages state values. /// @@ -53,6 +56,9 @@ where R.Action: Sendable, R.State: Sendable & Equatable { private var bindingTask: Task? private var tasks: [TaskID: Task] = [:] private var cancellables: [EffectIDWrapper: Set] = [:] + private var throttleTimestamps: [EffectIDWrapper: any Sendable] = [:] + private var throttleTimestampsUsingAbsoluteTime: [EffectIDWrapper: CFAbsoluteTime] = [:] + private var trailingThrottledEffects: [EffectIDWrapper: AnyEffect] = [:] /// Initializes a store from a reducer and an initial state. /// @@ -86,35 +92,15 @@ where R.Action: Sendable, R.State: Sendable & Equatable { isProcessing = true await Task.yield() for action in actionQueue { - let taskID = TaskID() let effect = reducer.reduce(state: &state, action: action) - let task = Task { [weak self, taskID] in - guard !Task.isCancelled else { return } - for await value in effect.values { - guard let self else { break } - guard !Task.isCancelled else { break } - await send(value) - } - await self?.removeTask(taskID) + let isThrottled: Bool + if #available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) { + isThrottled = await throttleIfNeeded(for: effect) + } else { + isThrottled = await throttleIfNeededUsingAbsoluteTime(for: effect) } - tasks[taskID] = task - - switch effect.method { - case let .register(id, cancelInFlight): - let effectID = EffectIDWrapper(id) - if cancelInFlight { - let taskIDs = cancellables[effectID, default: []] - taskIDs.forEach { removeTask($0) } - cancellables.removeValue(forKey: effectID) - } - cancellables[effectID, default: []].insert(taskID) - case let .cancel(id): - let effectID = EffectIDWrapper(id) - let taskIDs = cancellables[effectID, default: []] - taskIDs.forEach { removeTask($0) } - cancellables.removeValue(forKey: effectID) - case .none: - break + if !isThrottled { + await execute(effect: effect) } } actionQueue = [] @@ -130,6 +116,128 @@ where R.Action: Sendable, R.State: Sendable & Equatable { tasks.forEach { $0.value.cancel() } tasks.removeAll() actionQueue.removeAll() + cancellables.removeAll() + trailingThrottledEffects.removeAll() + if #available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) { + throttleTimestamps.removeAll() + } else { + throttleTimestampsUsingAbsoluteTime.removeAll() + } + } + + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + private func throttleIfNeeded(for effect: AnyEffect) async -> Bool { + guard case let .throttle(id, interval, latest) = effect.method else { + return false + } + let effectID = EffectIDWrapper(id) + let clock = ContinuousClock() + let now = clock.now + if let last = throttleTimestamps[effectID] as? ContinuousClock.Instant { + if last.duration(to: now) < .seconds(interval) { + print("🔥 interval1", last.duration(to: now), "<" , interval, state) + } else { + print("🔥 interval1", last.duration(to: now), ">" , interval, state) + } + } + if let last = throttleTimestamps[effectID] as? ContinuousClock.Instant, + last.duration(to: now) < .seconds(interval) { + if latest { + trailingThrottledEffects[effectID] = effect + } + return true + } else { + if throttleTimestamps[effectID] == nil { + print("🔥 new", state) + } + throttleTimestamps[effectID] = now + if latest { + Task { [weak self] in + do { + try await clock.sleep(for: .seconds(interval)) + await self?.executeTrailingThrottledEffects(effectID) + } + catch { + await self?.removeTrailingThrottledEffects(effectID) + } + } + } + return false + } + } + + private func throttleIfNeededUsingAbsoluteTime(for effect: AnyEffect) async -> Bool { + guard case let .throttle(id, interval, latest) = effect.method else { + return false + } + let effectID = EffectIDWrapper(id) + let now = CFAbsoluteTimeGetCurrent() + if let last = throttleTimestampsUsingAbsoluteTime[effectID], + (now - last) < interval { + if latest { + trailingThrottledEffects[effectID] = effect + } + return true + } else { + throttleTimestampsUsingAbsoluteTime[effectID] = now + if latest { + Task { [weak self] in + do { + let NSEC_PER_SEC: Double = 1_000_000_000 + let nanoseconds = UInt64(interval * NSEC_PER_SEC) + try await Task.sleep(nanoseconds: nanoseconds) + await self?.executeTrailingThrottledEffects(effectID) + } + catch { + await self?.removeTrailingThrottledEffects(effectID) + } + } + } + return false + } + } + + private func execute(effect: AnyEffect) async { + let taskID = TaskID() + let task = Task { [weak self, taskID] in + guard !Task.isCancelled else { return } + for await value in effect.values { + guard let self else { break } + guard !Task.isCancelled else { break } + await send(value) + } + await self?.removeTask(taskID) + } + tasks[taskID] = task + + switch effect.method { + case let .register(id, cancelInFlight): + let effectID = EffectIDWrapper(id) + if cancelInFlight { + let taskIDs = cancellables[effectID, default: []] + taskIDs.forEach { removeTask($0) } + cancellables.removeValue(forKey: effectID) + } + cancellables[effectID, default: []].insert(taskID) + case let .cancel(id): + let effectID = EffectIDWrapper(id) + let taskIDs = cancellables[effectID, default: []] + taskIDs.forEach { removeTask($0) } + cancellables.removeValue(forKey: effectID) + case .throttle, + .none: + break + } + } + + private func executeTrailingThrottledEffects(_ effectID: EffectIDWrapper) async { + if let effect = trailingThrottledEffects.removeValue(forKey: effectID) { + await execute(effect: effect) + } + } + + private func removeTrailingThrottledEffects(_ effectID: EffectIDWrapper) async { + trailingThrottledEffects.removeValue(forKey: effectID) } private func bindExternalEffect() { diff --git a/Sources/OneWayTesting/Store+Testing.swift b/Sources/OneWayTesting/Store+Testing.swift index 9d56de0..cd4a76c 100644 --- a/Sources/OneWayTesting/Store+Testing.swift +++ b/Sources/OneWayTesting/Store+Testing.swift @@ -69,7 +69,7 @@ extension Store { #if canImport(XCTest) if isTimeout && result != input { XCTFail( - "Exceeded timeout of \(timeout) seconds", + "Timeout exceeded \(timeout) seconds: received \(input), expected \(result)", file: filePath, line: line ) @@ -87,7 +87,7 @@ extension Store { case .testing: if isTimeout && result != input { Issue.record( - "Exceeded timeout of \(timeout) seconds", + "Timeout exceeded \(timeout) seconds: received \(input), expected \(result)", sourceLocation: Testing.SourceLocation( fileID: String(describing: fileID), filePath: String(describing: filePath), @@ -156,7 +156,7 @@ extension Store { #if canImport(XCTest) if isTimeout && result != input { XCTFail( - "Exceeded timeout of \(timeout) seconds", + "Timeout exceeded \(timeout) seconds: received \(input), expected \(result)", file: filePath, line: line ) @@ -174,7 +174,7 @@ extension Store { case .testing: if isTimeout && result != input { Issue.record( - "Exceeded timeout of \(timeout) seconds", + "Timeout exceeded \(timeout) seconds: received \(input), expected \(result)", sourceLocation: Testing.SourceLocation( fileID: String(describing: fileID), filePath: String(describing: filePath), diff --git a/Tests/OneWayTests/StoreTests.swift b/Tests/OneWayTests/StoreTests.swift index e327c93..3116a4c 100644 --- a/Tests/OneWayTests/StoreTests.swift +++ b/Tests/OneWayTests/StoreTests.swift @@ -249,6 +249,41 @@ final class StoreTests: XCTestCase { await sut.expect(\.count, 10) } + + func test_throttle() async { + let clock = ContinuousClock() + await sut.send(.throttledIncrement) + await sut.send(.throttledIncrement) + try! await clock.sleep(for: .seconds(0.01)) + await sut.send(.throttledIncrement) + await sut.expect(\.count, 1) + + try! await clock.sleep(for: .seconds(0.1)) + await sut.expect(\.count, 1) + + await sut.send(.throttledIncrement) + await sut.expect(\.count, 2) + } + + func test_throttle_latest() async { + let clock = ContinuousClock() + await sut.send(.throttledIncrementLatest) + await sut.expect(\.count, 1) + + await sut.send(.throttledIncrementLatest) + await sut.expect(\.count, 1) + + try! await clock.sleep(for: .seconds(0.1)) + await sut.expect(\.count, 2) + + await sut.send(.throttledIncrementLatest) + try! await clock.sleep(for: .seconds(0.01)) + await sut.send(.throttledIncrementLatest) + await sut.expect(\.count, 3) + + try! await clock.sleep(for: .seconds(0.1)) + await sut.expect(\.count, 4) + } } #if canImport(Combine) @@ -274,6 +309,8 @@ private struct TestReducer: Reducer { case debouncedIncrementWithClock case debouncedSequence case debouncedSequenceWithClock + case throttledIncrement + case throttledIncrementLatest } struct State: Equatable { @@ -296,6 +333,11 @@ private struct TestReducer: Reducer { case incrementSequence } + enum Throttle { + case increment + case incrementLatest + } + func reduce(state: inout State, action: Action) -> AnyEffect { switch action { case .increment: @@ -358,6 +400,14 @@ private struct TestReducer: Reducer { send(.increment) } .debounce(id: Debounce.incrementSequence, for: .seconds(100), clock: clock) + + case .throttledIncrement: + return .just(.increment) + .throttle(id: Throttle.increment, for: 0.1) + + case .throttledIncrementLatest: + return .just(.increment) + .throttle(id: Throttle.incrementLatest, for: 0.1, latest: true) } }