From 632e3eb9fc9789719287f00e8f2ea0b13921aa89 Mon Sep 17 00:00:00 2001 From: Michael Housh Date: Sat, 24 Feb 2024 14:51:35 -0500 Subject: [PATCH] feat: Adds overloads for invoking stream's that require an argument from the current reducer's state. --- .../swift-composable-subscriber.xcscheme | 78 +++ Package.swift | 4 + .../SubscriberReducer.swift | 465 +++++++++++++++++- .../swift_composable_subscriberTests.swift | 150 +++++- 4 files changed, 677 insertions(+), 20 deletions(-) create mode 100644 .swiftpm/xcode/xcshareddata/xcschemes/swift-composable-subscriber.xcscheme diff --git a/.swiftpm/xcode/xcshareddata/xcschemes/swift-composable-subscriber.xcscheme b/.swiftpm/xcode/xcshareddata/xcschemes/swift-composable-subscriber.xcscheme new file mode 100644 index 0000000..70249e8 --- /dev/null +++ b/.swiftpm/xcode/xcshareddata/xcschemes/swift-composable-subscriber.xcscheme @@ -0,0 +1,78 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/Package.swift b/Package.swift index 17472e4..bd17220 100644 --- a/Package.swift +++ b/Package.swift @@ -24,5 +24,9 @@ let package = Package( .product(name: "ComposableArchitecture", package: "swift-composable-architecture"), ] ), + .testTarget( + name: "swift-composable-subscriberTests", + dependencies: ["ComposableSubscriber"] + ) ] ) diff --git a/Sources/ComposableSubscriber/SubscriberReducer.swift b/Sources/ComposableSubscriber/SubscriberReducer.swift index 00db96b..c451d5b 100644 --- a/Sources/ComposableSubscriber/SubscriberReducer.swift +++ b/Sources/ComposableSubscriber/SubscriberReducer.swift @@ -2,7 +2,66 @@ import ComposableArchitecture import SwiftUI extension Reducer { - public func subscribe( + /// A higher order reducer for subscribing to an `AsyncStream` from your app. + /// + /// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared. + /// + /// Then the `Reducer` on a `task` action we can do something like... + /// ```swift + /// Reduce { state, action in + /// switch action { + /// case.task: + /// return .run { send in + /// for await value in await dependency.stream() { + /// await send(.responseAction(value)) + /// } + /// } + /// } + /// } + /// ``` + /// When you have a lot of publishers/subscribers this gets very repetetive. + /// + /// This gives a new way to subscribe to an async stream using a higher order reducer. + /// + /// Any dependency that returns an `AsyncStream` can be subscribed to in the following way. + /// + /// ## Example + /// + /// ```swift + /// @Reducer + /// struct MyFeature { + /// struct State: Equatable { + /// var numberFact: String? + /// } + /// + /// enum Action { + /// case receiveNumberFact(String) + /// case task + /// } + /// + /// @Dependency(\.numberFact.stream) var numberFactStream + /// + /// var body: some Reducer { + /// Reduce { state, action in + /// switch action { + /// case let .receiveNumberFact(numberFact): + /// state.numberFact = numberFact + /// return .none + /// case .task: + /// return .none + /// } + /// } + /// .subscribe(to: numberFactStream, on: \.task, with: \.receive) + /// } + /// } + /// ``` + /// + /// - Parameters: + /// - stream: The async stream to subscribe to on the reducer + /// - triggerAction: The action to invoke the stream when received. + /// - responseAction: The action to invoke with the streamed elements. + /// - animation: Optional animation used when elements are received. + public func subscribe( to stream: @escaping @Sendable () async -> AsyncStream, on triggerAction: CaseKeyPath, with responseAction: CaseKeyPath, @@ -11,12 +70,154 @@ extension Reducer { .init( parent: self, on: triggerAction, - to: stream, + to: .noState(stream: stream), with: .action(action: AnyCasePath(responseAction), animation: animation), transform: { $0 } ) } - + + /// A higher order reducer for subscribing to an `AsyncStream` from your app. + /// + /// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared. + /// + /// Then the `Reducer` on a `task` action we can do something like... + /// ```swift + /// Reduce { state, action in + /// switch action { + /// case.task: + /// return .run { send in + /// for await value in await dependency.stream() { + /// await send(.responseAction(value)) + /// } + /// } + /// } + /// } + /// ``` + /// When you have a lot of publishers/subscribers this gets very repetitive. + /// + /// This gives a new way to subscribe to an async stream using a higher order reducer. + /// + /// Any dependency that returns an `AsyncStream` can be subscribed to in the following way. + /// + /// ## Example + /// + /// In this example, to invoke the stream we need a piece of information on the current `State` of the reducer. + /// + /// ```swift + /// @Reducer + /// struct MyFeature { + /// struct State: Equatable { + /// var number: Int + /// var numberFact: String + /// } + /// + /// enum Action { + /// case receiveNumberFact(String) + /// case task + /// } + /// + /// @Dependency(\.numberFact.stream) var numberFactStream + /// + /// var body: some Reducer { + /// Reduce { state, action in + /// switch action { + /// case let .receiveNumberFact(numberFact): + /// state.numberFact = numberFact + /// return .none + /// case .task: + /// return .none + /// } + /// } + /// .subscribe(using: \.number, to: numberFactStream, on: \.task, with: \.receive) + /// } + /// } + /// ``` + /// + /// - Parameters: + /// - toStreamArgument: The argument used to invoke the stream with. + /// - stream: The async stream to subscribe to on the reducer + /// - triggerAction: The action to invoke the stream when received. + /// - responseAction: The action to invoke with the streamed elements. + /// - animation: Optional animation used when elements are received. + public func subscribe( + using toStreamArgument: @escaping @Sendable (State) -> StreamArgument, + to stream: @escaping @Sendable (StreamArgument) async -> AsyncStream, + on triggerAction: CaseKeyPath, + with responseAction: CaseKeyPath, + animation: Animation? = nil + ) -> _SubscribeReducer { + .init( + parent: self, + on: triggerAction, + to: .state(stream: { await stream(toStreamArgument($0)) }), + with: .action(action: AnyCasePath(responseAction), animation: animation), + transform: { $0 } + ) + } + + /// A higher order reducer for subscribing to an `AsyncStream` from your app. + /// + /// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared. + /// + /// Then the `Reducer` on a `task` action we can do something like... + /// ```swift + /// Reduce { state, action in + /// switch action { + /// case.task: + /// return .run { send in + /// for await value in await dependency.stream() { + /// await send(.responseAction(value)) + /// } + /// } + /// } + /// } + /// ``` + /// When you have a lot of publishers/subscribers this gets very repetetive. + /// + /// This gives a new way to subscribe to an async stream using a higher order reducer. + /// + /// Any dependency that returns an `AsyncStream` can be subscribed to in the following way. + /// + /// ## Example + /// + /// In this example, we transform the output of the stream that we subscribe to. + /// + /// ```swift + /// @Reducer + /// struct MyFeature { + /// struct State: Equatable { + /// var numberFact: String? + /// } + /// + /// enum Action { + /// case receiveNumberFact(String) + /// case task + /// } + /// + /// @Dependency(\.numberFact.stream) var numberFactStream + /// + /// var body: some Reducer { + /// Reduce { state, action in + /// switch action { + /// case let .receiveNumberFact(numberFact): + /// state.numberFact = numberFact + /// return .none + /// case .task: + /// return .none + /// } + /// } + /// .subscribe(to: numberFactStream, on: \.task, with: \.receive) { numberFact in + /// "\(numberFact) And my custom transformation" + /// } + /// } + /// } + /// ``` + /// + /// - Parameters: + /// - stream: The async stream to subscribe to on the reducer + /// - triggerAction: The action to invoke the stream when received. + /// - responseAction: The action to invoke with the streamed elements. + /// - animation: Optional animation used when elements are received. public func subscribe( to stream: @escaping @Sendable () async -> AsyncStream, on triggerAction: CaseKeyPath, @@ -27,12 +228,158 @@ extension Reducer { .init( parent: self, on: triggerAction, - to: stream, + to: .noState(stream: stream), with: .action(action: AnyCasePath(responseAction), animation: animation), transform: transform ) } - + + /// A higher order reducer for subscribing to an `AsyncStream` from your app. + /// + /// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared. + /// + /// Then the `Reducer` on a `task` action we can do something like... + /// ```swift + /// Reduce { state, action in + /// switch action { + /// case.task: + /// return .run { send in + /// for await value in await dependency.stream() { + /// await send(.responseAction(value)) + /// } + /// } + /// } + /// } + /// ``` + /// When you have a lot of publishers/subscribers this gets very repetitive. + /// + /// This gives a new way to subscribe to an async stream using a higher order reducer. + /// + /// Any dependency that returns an `AsyncStream` can be subscribed to in the following way. + /// + /// ## Example + /// + /// In this example, to invoke the stream we need a piece of information on the current `State` of the reducer. + /// + /// ```swift + /// @Reducer + /// struct MyFeature { + /// struct State: Equatable { + /// var number: Int + /// var numberFact: String + /// } + /// + /// enum Action { + /// case receiveNumberFact(String) + /// case task + /// } + /// + /// @Dependency(\.numberFact.stream) var numberFactStream + /// + /// var body: some Reducer { + /// Reduce { state, action in + /// switch action { + /// case let .receiveNumberFact(numberFact): + /// state.numberFact = numberFact + /// return .none + /// case .task: + /// return .none + /// } + /// } + /// .subscribe(using: \.number, to: numberFactStream, on: \.task, with: \.receive) { numberFact in + /// "\(numberFact) Appended with my custom transformation." + /// } + /// } + /// } + /// ``` + /// + /// - Parameters: + /// - toStreamArgument: The argument used to invoke the stream with. + /// - stream: The async stream to subscribe to on the reducer + /// - triggerAction: The action to invoke the stream when received. + /// - responseAction: The action to invoke with the streamed elements. + /// - animation: Optional animation used when elements are received. + public func subscribe( + using toStreamArgument: @escaping @Sendable (State) -> StreamArgument, + to stream: @escaping @Sendable (StreamArgument) async -> AsyncStream, + on triggerAction: CaseKeyPath, + with responseAction: CaseKeyPath, + animation: Animation? = nil, + transform: @escaping @Sendable (StreamElement) -> Value + ) -> _SubscribeReducer { + .init( + parent: self, + on: triggerAction, + to: .state(stream: { await stream(toStreamArgument($0)) }), + with: .action(action: AnyCasePath(responseAction), animation: animation), + transform: transform + ) + } + + /// A higher order reducer for subscribing to an `AsyncStream` from your app. + /// + /// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared. + /// + /// Then the `Reducer` on a `task` action we can do something like... + /// ```swift + /// Reduce { state, action in + /// switch action { + /// case.task: + /// return .run { send in + /// for await value in await dependency.stream() { + /// await send(.responseAction(value)) + /// } + /// } + /// } + /// } + /// ``` + /// When you have a lot of publishers/subscribers this gets very repetetive. + /// + /// This gives a new way to subscribe to an async stream using a higher order reducer. + /// + /// Any dependency that returns an `AsyncStream` can be subscribed to in the following way. + /// + /// ## Example + /// + /// In this example, we use the stream element to also call another operation on an external dependency. + /// + /// ```swift + /// @Reducer + /// struct MyFeature { + /// struct State: Equatable { + /// var numberFact: String? + /// } + /// + /// enum Action { + /// case receiveNumberFact(String) + /// case task + /// } + /// + /// @Dependency(\.numberFact.stream) var numberFactStream + /// + /// var body: some Reducer { + /// Reduce { state, action in + /// switch action { + /// case let .receiveNumberFact(numberFact): + /// state.numberFact = numberFact + /// return .none + /// case .task: + /// return .none + /// } + /// } + /// .subscribe(on: \.task, with: \.receive) { send, numberFact in + /// await send(.receive(numberFact)) + /// await otherDependency.doSomethingElse(with: numberFact) + /// } + /// } + /// } + /// ``` + /// + /// - Parameters: + /// - stream: The async stream to subscribe to on the reducer + /// - triggerAction: The action to invoke the stream when received. + /// - responseAction: The action to invoke with the streamed elements. + /// - animation: Optional animation used when elements are received. public func subscribe( to stream: @escaping @Sendable () async -> AsyncStream, on triggerAction: CaseKeyPath, @@ -41,11 +388,92 @@ extension Reducer { .init( parent: self, on: triggerAction, - to: stream, + to: .noState(stream: stream), with: .operation(f: operation), transform: { $0 } ) } + + /// A higher order reducer for subscribing to an `AsyncStream` from your app. + /// + /// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared. + /// + /// Then the `Reducer` on a `task` action we can do something like... + /// ```swift + /// Reduce { state, action in + /// switch action { + /// case.task: + /// return .run { send in + /// for await value in await dependency.stream() { + /// await send(.responseAction(value)) + /// } + /// } + /// } + /// } + /// ``` + /// When you have a lot of publishers/subscribers this gets very repetetive. + /// + /// This gives a new way to subscribe to an async stream using a higher order reducer. + /// + /// Any dependency that returns an `AsyncStream` can be subscribed to in the following way. + /// + /// ## Example + /// + /// In this example, we use the stream element to also call another operation on an external dependency. + /// + /// ```swift + /// @Reducer + /// struct MyFeature { + /// struct State: Equatable { + /// var number: Int + /// var numberFact: String? + /// } + /// + /// enum Action { + /// case receiveNumberFact(String) + /// case task + /// } + /// + /// @Dependency(\.numberFact.stream) var numberFactStream + /// + /// var body: some Reducer { + /// Reduce { state, action in + /// switch action { + /// case let .receiveNumberFact(numberFact): + /// state.numberFact = numberFact + /// return .none + /// case .task: + /// return .none + /// } + /// } + /// .subscribe(using: \.number, on: \.task, with: \.receive) { send, numberFact in + /// await send(.receive(numberFact)) + /// await otherDependency.doSomethingElse(with: numberFact) + /// } + /// } + /// } + /// ``` + /// + /// - Parameters: + /// - toStreamArgument: The argument used to invoke the stream with. + /// - stream: The async stream to subscribe to on the reducer + /// - triggerAction: The action to invoke the stream when received. + /// - responseAction: The action to invoke with the streamed elements. + /// - animation: Optional animation used when elements are received. + public func subscribe( + using toStreamArgument: @escaping @Sendable (State) -> StreamArgument, + to stream: @escaping @Sendable (StreamArgument) async -> AsyncStream, + on triggerAction: CaseKeyPath, + operation: @escaping @Sendable (_ send: Send, StreamElement) async throws -> Void + ) -> _SubscribeReducer { + .init( + parent: self, + on: triggerAction, + to: .state(stream: { await stream(toStreamArgument($0)) }), + with: .operation(f: operation), + transform: { $0 } + ) + } } @usableFromInline @@ -54,6 +482,21 @@ enum Operation { case operation(f: (_ send: Send, Value) async throws -> Void) } +@usableFromInline +enum Stream { + case noState(stream: (@Sendable () async -> AsyncStream)) + case state(stream: (@Sendable (State) async -> AsyncStream)) + + fileprivate func callAsFunction(state: State) async -> AsyncStream { + switch self { + case let .noState(stream: stream): + return await stream() + case let .state(stream: stream): + return await stream(state) + } + } +} + public struct _SubscribeReducer: Reducer { @usableFromInline let parent: Parent @@ -62,7 +505,7 @@ public struct _SubscribeReducer @usableFromInline - let stream: () async -> AsyncStream + let stream: Stream @usableFromInline let operation: Operation @@ -73,7 +516,7 @@ public struct _SubscribeReducer, - to stream: @escaping @Sendable () async -> AsyncStream, + to stream: Stream, with operation: Operation, transform: @escaping @Sendable (StreamElement) -> Value ) { @@ -90,11 +533,11 @@ public struct _SubscribeReducer AsyncStream = { .never } + var numberStreamWithArg: @Sendable (Int) async -> AsyncStream = { _ in .never } +} + +extension NumberClient: TestDependencyKey { + + static var live: NumberClient { + NumberClient( + numberStreamWithoutArg: { + AsyncStream { continuation in + continuation.yield(1) + continuation.finish() + } + }, + numberStreamWithArg: { number in + AsyncStream { continuation in + continuation.yield(number) + continuation.finish() + } + } + ) + } + + static let testValue = Self() +} + +extension DependencyValues { + var numberClient: NumberClient { + get { self[NumberClient.self] } + set { self[NumberClient.self] = newValue } + } +} + +struct NumberState: Equatable { + var number: Int + var currentNumber: Int? +} + +@CasePathable +enum NumberAction { + case receive(Int) + case task +} + +@Reducer +struct ReducerWithArg { + + typealias State = NumberState + typealias Action = NumberAction + + @Dependency(\.numberClient) var numberClient + + var body: some Reducer { + Reduce { state, action in + switch action { + case let .receive(number): + state.currentNumber = number + return .none + case .task: + return .none + } + } + .subscribe( + using: \.number, + to: numberClient.numberStreamWithArg, + on: \.task, + with: \.receive + ) + } +} + +@Reducer +struct ReducerWithTransform { + + typealias State = NumberState + typealias Action = NumberAction + + @Dependency(\.numberClient) var numberClient + + var body: some Reducer { + Reduce { state, action in + switch action { + case let .receive(number): + state.currentNumber = number + return .none + case .task: + return .none + } + } + .subscribe( + using: \.number, + to: numberClient.numberStreamWithArg, + on: \.task, + with: \.receive + ) { + $0 * 2 + } + } +} + +@MainActor +final class swift_composable_subscriberTests: XCTestCase { + + func testSubscribeWithArg() async throws { + let store = TestStore( + initialState: ReducerWithArg.State(number: 19), + reducer: ReducerWithArg.init + ) { + $0.numberClient = .live + } + + let task = await store.send(.task) + await store.receive(\.receive) { + $0.currentNumber = 19 + } + + await task.cancel() + await store.finish() + } + + func testSubscribeWithArgAndTransform() async throws { + let store = TestStore( + initialState: ReducerWithTransform.State(number: 10), + reducer: ReducerWithTransform.init + ) { + $0.numberClient = .live + } + + let task = await store.send(.task) + await store.receive(\.receive) { + $0.currentNumber = 20 + } + + await task.cancel() + await store.finish() + } + }