From 8a8660c073a85589e809794aa56c0699dee743c5 Mon Sep 17 00:00:00 2001 From: SeungYeop Yeom Date: Tue, 16 Jan 2024 22:48:05 +0900 Subject: [PATCH] Add `Create` effect that creates an asynchronous stream --- Sources/OneWay/AnyEffect.swift | 21 ++++++ Sources/OneWay/Effect.swift | 25 +++++++ Tests/OneWayTests/EffectTests.swift | 105 ++++++++++++++++++++++++++++ 3 files changed, 151 insertions(+) diff --git a/Sources/OneWay/AnyEffect.swift b/Sources/OneWay/AnyEffect.swift index 070152f..4fff697 100644 --- a/Sources/OneWay/AnyEffect.swift +++ b/Sources/OneWay/AnyEffect.swift @@ -246,4 +246,25 @@ extension AnyEffect { build() ).eraseToAnyEffect() } + + /// An effect that creates an asynchronous stream. + /// + /// - Parameters: + /// - bufferingPolicy: A `Continuation.BufferingPolicy` value to set the stream's buffering + /// behavior. By default, the stream buffers an unlimited number of elements. You can also set + /// the policy to buffer a specified number of oldest or newest elements. + /// - build: A custom closure that yields values to the `AsyncStream`. This closure receives + /// an `AsyncStream.Continuation` instance that it uses to provide elements to the stream and + /// terminate the stream when finished. + /// - Returns: A new effect. + @inlinable + public static func create( + bufferingPolicy: AsyncStream.Continuation.BufferingPolicy = .unbounded, + build: @escaping (AsyncStream.Continuation) -> Void + ) -> AnyEffect { + Effects.Create( + bufferingPolicy: bufferingPolicy, + build: build + ).eraseToAnyEffect() + } } diff --git a/Sources/OneWay/Effect.swift b/Sources/OneWay/Effect.swift index 2de79f6..ba4159f 100644 --- a/Sources/OneWay/Effect.swift +++ b/Sources/OneWay/Effect.swift @@ -203,4 +203,29 @@ public enum Effects { } } } + + /// An effect that creates an asynchronous stream. + public struct Create: Effect where Element: Sendable { + private let stream: AsyncStream + + /// Initializes a `Create` effect. + /// + /// - Parameters: + /// - bufferingPolicy: A `Continuation.BufferingPolicy` value to set the stream's + /// buffering behavior. By default, the stream buffers an unlimited number of elements. + /// You can also set the policy to buffer a specified number of oldest or newest elements. + /// - build: A custom closure that yields values to the `AsyncStream`. This closure + /// receives an `AsyncStream.Continuation` instance that it uses to provide elements to + /// the stream and terminate the stream when finished. + public init( + bufferingPolicy: AsyncStream.Continuation.BufferingPolicy = .unbounded, + build: @escaping (AsyncStream.Continuation) -> Void + ) { + self.stream = AsyncStream(bufferingPolicy: bufferingPolicy, build) + } + + public var values: AsyncStream { + stream + } + } } diff --git a/Tests/OneWayTests/EffectTests.swift b/Tests/OneWayTests/EffectTests.swift index afb0ce5..7a59ec6 100644 --- a/Tests/OneWayTests/EffectTests.swift +++ b/Tests/OneWayTests/EffectTests.swift @@ -301,4 +301,109 @@ final class EffectTests: XCTestCase { ] ) } + + func test_createSynchronously() async { + let values = Effects.Create { continuation in + continuation.yield(Action.first) + continuation.yield(Action.second) + continuation.yield(Action.third) + continuation.yield(Action.fourth) + continuation.yield(Action.fifth) + continuation.finish() + }.values + + var result: [Action] = [] + for await value in values { + result.append(value) + } + + XCTAssertEqual( + result, + [ + .first, + .second, + .third, + .fourth, + .fifth, + ] + ) + } + + func test_createAsynchronously() async { + let clock = TestClock() + + let values = Effects.Create { continuation in + Task { @MainActor in + try! await clock.sleep(for: .seconds(100)) + continuation.yield(Action.first) + continuation.yield(Action.second) + } + Task { @MainActor in + try! await clock.sleep(for: .seconds(200)) + continuation.yield(Action.third) + continuation.yield(Action.fourth) + continuation.yield(Action.fifth) + } + Task { @MainActor in + try! await clock.sleep(for: .seconds(300)) + continuation.finish() + } + }.values + + var result: [Action] = [] + await clock.advance(by: .seconds(300)) + for await value in values { + result.append(value) + } + + XCTAssertEqual( + result, + [ + .first, + .second, + .third, + .fourth, + .fifth, + ] + ) + } + + func test_createAsynchronouslyWithCompletionHandler() async { + let values = Effects.Create { continuation in + perform { action in + continuation.yield(action) + if action == .fifth { + continuation.finish() + } + } + }.values + + var result: [Action] = [] + for await value in values { + result.append(value) + } + + XCTAssertEqual( + result, + [ + .first, + .second, + .third, + .fourth, + .fifth, + ] + ) + + func perform(completionHandler: @Sendable @escaping (Action) -> Void) { + DispatchQueue.main.asyncAfter(deadline: .now() + 0.1) { + completionHandler(.first) + completionHandler(.second) + } + DispatchQueue.main.asyncAfter(deadline: .now() + 0.2) { + completionHandler(.third) + completionHandler(.fourth) + completionHandler(.fifth) + } + } + } }