diff --git a/.swiftpm/xcode/xcshareddata/xcschemes/swift-composable-subscriber.xcscheme b/.swiftpm/xcode/xcshareddata/xcschemes/swift-composable-subscriber.xcscheme
new file mode 100644
index 0000000..70249e8
--- /dev/null
+++ b/.swiftpm/xcode/xcshareddata/xcschemes/swift-composable-subscriber.xcscheme
@@ -0,0 +1,78 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Package.swift b/Package.swift
index 17472e4..bd17220 100644
--- a/Package.swift
+++ b/Package.swift
@@ -24,5 +24,9 @@ let package = Package(
.product(name: "ComposableArchitecture", package: "swift-composable-architecture"),
]
),
+ .testTarget(
+ name: "swift-composable-subscriberTests",
+ dependencies: ["ComposableSubscriber"]
+ )
]
)
diff --git a/Sources/ComposableSubscriber/SubscriberReducer.swift b/Sources/ComposableSubscriber/SubscriberReducer.swift
index 00db96b..c451d5b 100644
--- a/Sources/ComposableSubscriber/SubscriberReducer.swift
+++ b/Sources/ComposableSubscriber/SubscriberReducer.swift
@@ -2,7 +2,66 @@ import ComposableArchitecture
import SwiftUI
extension Reducer {
- public func subscribe(
+ /// A higher order reducer for subscribing to an `AsyncStream` from your app.
+ ///
+ /// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared.
+ ///
+ /// Then the `Reducer` on a `task` action we can do something like...
+ /// ```swift
+ /// Reduce { state, action in
+ /// switch action {
+ /// case.task:
+ /// return .run { send in
+ /// for await value in await dependency.stream() {
+ /// await send(.responseAction(value))
+ /// }
+ /// }
+ /// }
+ /// }
+ /// ```
+ /// When you have a lot of publishers/subscribers this gets very repetetive.
+ ///
+ /// This gives a new way to subscribe to an async stream using a higher order reducer.
+ ///
+ /// Any dependency that returns an `AsyncStream` can be subscribed to in the following way.
+ ///
+ /// ## Example
+ ///
+ /// ```swift
+ /// @Reducer
+ /// struct MyFeature {
+ /// struct State: Equatable {
+ /// var numberFact: String?
+ /// }
+ ///
+ /// enum Action {
+ /// case receiveNumberFact(String)
+ /// case task
+ /// }
+ ///
+ /// @Dependency(\.numberFact.stream) var numberFactStream
+ ///
+ /// var body: some Reducer {
+ /// Reduce { state, action in
+ /// switch action {
+ /// case let .receiveNumberFact(numberFact):
+ /// state.numberFact = numberFact
+ /// return .none
+ /// case .task:
+ /// return .none
+ /// }
+ /// }
+ /// .subscribe(to: numberFactStream, on: \.task, with: \.receive)
+ /// }
+ /// }
+ /// ```
+ ///
+ /// - Parameters:
+ /// - stream: The async stream to subscribe to on the reducer
+ /// - triggerAction: The action to invoke the stream when received.
+ /// - responseAction: The action to invoke with the streamed elements.
+ /// - animation: Optional animation used when elements are received.
+ public func subscribe(
to stream: @escaping @Sendable () async -> AsyncStream,
on triggerAction: CaseKeyPath,
with responseAction: CaseKeyPath,
@@ -11,12 +70,154 @@ extension Reducer {
.init(
parent: self,
on: triggerAction,
- to: stream,
+ to: .noState(stream: stream),
with: .action(action: AnyCasePath(responseAction), animation: animation),
transform: { $0 }
)
}
-
+
+ /// A higher order reducer for subscribing to an `AsyncStream` from your app.
+ ///
+ /// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared.
+ ///
+ /// Then the `Reducer` on a `task` action we can do something like...
+ /// ```swift
+ /// Reduce { state, action in
+ /// switch action {
+ /// case.task:
+ /// return .run { send in
+ /// for await value in await dependency.stream() {
+ /// await send(.responseAction(value))
+ /// }
+ /// }
+ /// }
+ /// }
+ /// ```
+ /// When you have a lot of publishers/subscribers this gets very repetitive.
+ ///
+ /// This gives a new way to subscribe to an async stream using a higher order reducer.
+ ///
+ /// Any dependency that returns an `AsyncStream` can be subscribed to in the following way.
+ ///
+ /// ## Example
+ ///
+ /// In this example, to invoke the stream we need a piece of information on the current `State` of the reducer.
+ ///
+ /// ```swift
+ /// @Reducer
+ /// struct MyFeature {
+ /// struct State: Equatable {
+ /// var number: Int
+ /// var numberFact: String
+ /// }
+ ///
+ /// enum Action {
+ /// case receiveNumberFact(String)
+ /// case task
+ /// }
+ ///
+ /// @Dependency(\.numberFact.stream) var numberFactStream
+ ///
+ /// var body: some Reducer {
+ /// Reduce { state, action in
+ /// switch action {
+ /// case let .receiveNumberFact(numberFact):
+ /// state.numberFact = numberFact
+ /// return .none
+ /// case .task:
+ /// return .none
+ /// }
+ /// }
+ /// .subscribe(using: \.number, to: numberFactStream, on: \.task, with: \.receive)
+ /// }
+ /// }
+ /// ```
+ ///
+ /// - Parameters:
+ /// - toStreamArgument: The argument used to invoke the stream with.
+ /// - stream: The async stream to subscribe to on the reducer
+ /// - triggerAction: The action to invoke the stream when received.
+ /// - responseAction: The action to invoke with the streamed elements.
+ /// - animation: Optional animation used when elements are received.
+ public func subscribe(
+ using toStreamArgument: @escaping @Sendable (State) -> StreamArgument,
+ to stream: @escaping @Sendable (StreamArgument) async -> AsyncStream,
+ on triggerAction: CaseKeyPath,
+ with responseAction: CaseKeyPath,
+ animation: Animation? = nil
+ ) -> _SubscribeReducer {
+ .init(
+ parent: self,
+ on: triggerAction,
+ to: .state(stream: { await stream(toStreamArgument($0)) }),
+ with: .action(action: AnyCasePath(responseAction), animation: animation),
+ transform: { $0 }
+ )
+ }
+
+ /// A higher order reducer for subscribing to an `AsyncStream` from your app.
+ ///
+ /// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared.
+ ///
+ /// Then the `Reducer` on a `task` action we can do something like...
+ /// ```swift
+ /// Reduce { state, action in
+ /// switch action {
+ /// case.task:
+ /// return .run { send in
+ /// for await value in await dependency.stream() {
+ /// await send(.responseAction(value))
+ /// }
+ /// }
+ /// }
+ /// }
+ /// ```
+ /// When you have a lot of publishers/subscribers this gets very repetetive.
+ ///
+ /// This gives a new way to subscribe to an async stream using a higher order reducer.
+ ///
+ /// Any dependency that returns an `AsyncStream` can be subscribed to in the following way.
+ ///
+ /// ## Example
+ ///
+ /// In this example, we transform the output of the stream that we subscribe to.
+ ///
+ /// ```swift
+ /// @Reducer
+ /// struct MyFeature {
+ /// struct State: Equatable {
+ /// var numberFact: String?
+ /// }
+ ///
+ /// enum Action {
+ /// case receiveNumberFact(String)
+ /// case task
+ /// }
+ ///
+ /// @Dependency(\.numberFact.stream) var numberFactStream
+ ///
+ /// var body: some Reducer {
+ /// Reduce { state, action in
+ /// switch action {
+ /// case let .receiveNumberFact(numberFact):
+ /// state.numberFact = numberFact
+ /// return .none
+ /// case .task:
+ /// return .none
+ /// }
+ /// }
+ /// .subscribe(to: numberFactStream, on: \.task, with: \.receive) { numberFact in
+ /// "\(numberFact) And my custom transformation"
+ /// }
+ /// }
+ /// }
+ /// ```
+ ///
+ /// - Parameters:
+ /// - stream: The async stream to subscribe to on the reducer
+ /// - triggerAction: The action to invoke the stream when received.
+ /// - responseAction: The action to invoke with the streamed elements.
+ /// - animation: Optional animation used when elements are received.
public func subscribe(
to stream: @escaping @Sendable () async -> AsyncStream,
on triggerAction: CaseKeyPath,
@@ -27,12 +228,158 @@ extension Reducer {
.init(
parent: self,
on: triggerAction,
- to: stream,
+ to: .noState(stream: stream),
with: .action(action: AnyCasePath(responseAction), animation: animation),
transform: transform
)
}
-
+
+ /// A higher order reducer for subscribing to an `AsyncStream` from your app.
+ ///
+ /// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared.
+ ///
+ /// Then the `Reducer` on a `task` action we can do something like...
+ /// ```swift
+ /// Reduce { state, action in
+ /// switch action {
+ /// case.task:
+ /// return .run { send in
+ /// for await value in await dependency.stream() {
+ /// await send(.responseAction(value))
+ /// }
+ /// }
+ /// }
+ /// }
+ /// ```
+ /// When you have a lot of publishers/subscribers this gets very repetitive.
+ ///
+ /// This gives a new way to subscribe to an async stream using a higher order reducer.
+ ///
+ /// Any dependency that returns an `AsyncStream` can be subscribed to in the following way.
+ ///
+ /// ## Example
+ ///
+ /// In this example, to invoke the stream we need a piece of information on the current `State` of the reducer.
+ ///
+ /// ```swift
+ /// @Reducer
+ /// struct MyFeature {
+ /// struct State: Equatable {
+ /// var number: Int
+ /// var numberFact: String
+ /// }
+ ///
+ /// enum Action {
+ /// case receiveNumberFact(String)
+ /// case task
+ /// }
+ ///
+ /// @Dependency(\.numberFact.stream) var numberFactStream
+ ///
+ /// var body: some Reducer {
+ /// Reduce { state, action in
+ /// switch action {
+ /// case let .receiveNumberFact(numberFact):
+ /// state.numberFact = numberFact
+ /// return .none
+ /// case .task:
+ /// return .none
+ /// }
+ /// }
+ /// .subscribe(using: \.number, to: numberFactStream, on: \.task, with: \.receive) { numberFact in
+ /// "\(numberFact) Appended with my custom transformation."
+ /// }
+ /// }
+ /// }
+ /// ```
+ ///
+ /// - Parameters:
+ /// - toStreamArgument: The argument used to invoke the stream with.
+ /// - stream: The async stream to subscribe to on the reducer
+ /// - triggerAction: The action to invoke the stream when received.
+ /// - responseAction: The action to invoke with the streamed elements.
+ /// - animation: Optional animation used when elements are received.
+ public func subscribe(
+ using toStreamArgument: @escaping @Sendable (State) -> StreamArgument,
+ to stream: @escaping @Sendable (StreamArgument) async -> AsyncStream,
+ on triggerAction: CaseKeyPath,
+ with responseAction: CaseKeyPath,
+ animation: Animation? = nil,
+ transform: @escaping @Sendable (StreamElement) -> Value
+ ) -> _SubscribeReducer {
+ .init(
+ parent: self,
+ on: triggerAction,
+ to: .state(stream: { await stream(toStreamArgument($0)) }),
+ with: .action(action: AnyCasePath(responseAction), animation: animation),
+ transform: transform
+ )
+ }
+
+ /// A higher order reducer for subscribing to an `AsyncStream` from your app.
+ ///
+ /// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared.
+ ///
+ /// Then the `Reducer` on a `task` action we can do something like...
+ /// ```swift
+ /// Reduce { state, action in
+ /// switch action {
+ /// case.task:
+ /// return .run { send in
+ /// for await value in await dependency.stream() {
+ /// await send(.responseAction(value))
+ /// }
+ /// }
+ /// }
+ /// }
+ /// ```
+ /// When you have a lot of publishers/subscribers this gets very repetetive.
+ ///
+ /// This gives a new way to subscribe to an async stream using a higher order reducer.
+ ///
+ /// Any dependency that returns an `AsyncStream` can be subscribed to in the following way.
+ ///
+ /// ## Example
+ ///
+ /// In this example, we use the stream element to also call another operation on an external dependency.
+ ///
+ /// ```swift
+ /// @Reducer
+ /// struct MyFeature {
+ /// struct State: Equatable {
+ /// var numberFact: String?
+ /// }
+ ///
+ /// enum Action {
+ /// case receiveNumberFact(String)
+ /// case task
+ /// }
+ ///
+ /// @Dependency(\.numberFact.stream) var numberFactStream
+ ///
+ /// var body: some Reducer {
+ /// Reduce { state, action in
+ /// switch action {
+ /// case let .receiveNumberFact(numberFact):
+ /// state.numberFact = numberFact
+ /// return .none
+ /// case .task:
+ /// return .none
+ /// }
+ /// }
+ /// .subscribe(on: \.task, with: \.receive) { send, numberFact in
+ /// await send(.receive(numberFact))
+ /// await otherDependency.doSomethingElse(with: numberFact)
+ /// }
+ /// }
+ /// }
+ /// ```
+ ///
+ /// - Parameters:
+ /// - stream: The async stream to subscribe to on the reducer
+ /// - triggerAction: The action to invoke the stream when received.
+ /// - responseAction: The action to invoke with the streamed elements.
+ /// - animation: Optional animation used when elements are received.
public func subscribe(
to stream: @escaping @Sendable () async -> AsyncStream,
on triggerAction: CaseKeyPath,
@@ -41,11 +388,92 @@ extension Reducer {
.init(
parent: self,
on: triggerAction,
- to: stream,
+ to: .noState(stream: stream),
with: .operation(f: operation),
transform: { $0 }
)
}
+
+ /// A higher order reducer for subscribing to an `AsyncStream` from your app.
+ ///
+ /// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared.
+ ///
+ /// Then the `Reducer` on a `task` action we can do something like...
+ /// ```swift
+ /// Reduce { state, action in
+ /// switch action {
+ /// case.task:
+ /// return .run { send in
+ /// for await value in await dependency.stream() {
+ /// await send(.responseAction(value))
+ /// }
+ /// }
+ /// }
+ /// }
+ /// ```
+ /// When you have a lot of publishers/subscribers this gets very repetetive.
+ ///
+ /// This gives a new way to subscribe to an async stream using a higher order reducer.
+ ///
+ /// Any dependency that returns an `AsyncStream` can be subscribed to in the following way.
+ ///
+ /// ## Example
+ ///
+ /// In this example, we use the stream element to also call another operation on an external dependency.
+ ///
+ /// ```swift
+ /// @Reducer
+ /// struct MyFeature {
+ /// struct State: Equatable {
+ /// var number: Int
+ /// var numberFact: String?
+ /// }
+ ///
+ /// enum Action {
+ /// case receiveNumberFact(String)
+ /// case task
+ /// }
+ ///
+ /// @Dependency(\.numberFact.stream) var numberFactStream
+ ///
+ /// var body: some Reducer {
+ /// Reduce { state, action in
+ /// switch action {
+ /// case let .receiveNumberFact(numberFact):
+ /// state.numberFact = numberFact
+ /// return .none
+ /// case .task:
+ /// return .none
+ /// }
+ /// }
+ /// .subscribe(using: \.number, on: \.task, with: \.receive) { send, numberFact in
+ /// await send(.receive(numberFact))
+ /// await otherDependency.doSomethingElse(with: numberFact)
+ /// }
+ /// }
+ /// }
+ /// ```
+ ///
+ /// - Parameters:
+ /// - toStreamArgument: The argument used to invoke the stream with.
+ /// - stream: The async stream to subscribe to on the reducer
+ /// - triggerAction: The action to invoke the stream when received.
+ /// - responseAction: The action to invoke with the streamed elements.
+ /// - animation: Optional animation used when elements are received.
+ public func subscribe(
+ using toStreamArgument: @escaping @Sendable (State) -> StreamArgument,
+ to stream: @escaping @Sendable (StreamArgument) async -> AsyncStream,
+ on triggerAction: CaseKeyPath,
+ operation: @escaping @Sendable (_ send: Send, StreamElement) async throws -> Void
+ ) -> _SubscribeReducer {
+ .init(
+ parent: self,
+ on: triggerAction,
+ to: .state(stream: { await stream(toStreamArgument($0)) }),
+ with: .operation(f: operation),
+ transform: { $0 }
+ )
+ }
}
@usableFromInline
@@ -54,6 +482,21 @@ enum Operation {
case operation(f: (_ send: Send, Value) async throws -> Void)
}
+@usableFromInline
+enum Stream {
+ case noState(stream: (@Sendable () async -> AsyncStream))
+ case state(stream: (@Sendable (State) async -> AsyncStream))
+
+ fileprivate func callAsFunction(state: State) async -> AsyncStream {
+ switch self {
+ case let .noState(stream: stream):
+ return await stream()
+ case let .state(stream: stream):
+ return await stream(state)
+ }
+ }
+}
+
public struct _SubscribeReducer: Reducer {
@usableFromInline
let parent: Parent
@@ -62,7 +505,7 @@ public struct _SubscribeReducer
@usableFromInline
- let stream: () async -> AsyncStream
+ let stream: Stream
@usableFromInline
let operation: Operation
@@ -73,7 +516,7 @@ public struct _SubscribeReducer,
- to stream: @escaping @Sendable () async -> AsyncStream,
+ to stream: Stream,
with operation: Operation,
transform: @escaping @Sendable (StreamElement) -> Value
) {
@@ -90,11 +533,11 @@ public struct _SubscribeReducer AsyncStream = { .never }
+ var numberStreamWithArg: @Sendable (Int) async -> AsyncStream = { _ in .never }
+}
+
+extension NumberClient: TestDependencyKey {
+
+ static var live: NumberClient {
+ NumberClient(
+ numberStreamWithoutArg: {
+ AsyncStream { continuation in
+ continuation.yield(1)
+ continuation.finish()
+ }
+ },
+ numberStreamWithArg: { number in
+ AsyncStream { continuation in
+ continuation.yield(number)
+ continuation.finish()
+ }
+ }
+ )
+ }
+
+ static let testValue = Self()
+}
+
+extension DependencyValues {
+ var numberClient: NumberClient {
+ get { self[NumberClient.self] }
+ set { self[NumberClient.self] = newValue }
+ }
+}
+
+struct NumberState: Equatable {
+ var number: Int
+ var currentNumber: Int?
+}
+
+@CasePathable
+enum NumberAction {
+ case receive(Int)
+ case task
+}
+
+@Reducer
+struct ReducerWithArg {
+
+ typealias State = NumberState
+ typealias Action = NumberAction
+
+ @Dependency(\.numberClient) var numberClient
+
+ var body: some Reducer {
+ Reduce { state, action in
+ switch action {
+ case let .receive(number):
+ state.currentNumber = number
+ return .none
+ case .task:
+ return .none
+ }
+ }
+ .subscribe(
+ using: \.number,
+ to: numberClient.numberStreamWithArg,
+ on: \.task,
+ with: \.receive
+ )
+ }
+}
+
+@Reducer
+struct ReducerWithTransform {
+
+ typealias State = NumberState
+ typealias Action = NumberAction
+
+ @Dependency(\.numberClient) var numberClient
+
+ var body: some Reducer {
+ Reduce { state, action in
+ switch action {
+ case let .receive(number):
+ state.currentNumber = number
+ return .none
+ case .task:
+ return .none
+ }
+ }
+ .subscribe(
+ using: \.number,
+ to: numberClient.numberStreamWithArg,
+ on: \.task,
+ with: \.receive
+ ) {
+ $0 * 2
+ }
+ }
+}
+
+@MainActor
+final class swift_composable_subscriberTests: XCTestCase {
+
+ func testSubscribeWithArg() async throws {
+ let store = TestStore(
+ initialState: ReducerWithArg.State(number: 19),
+ reducer: ReducerWithArg.init
+ ) {
+ $0.numberClient = .live
+ }
+
+ let task = await store.send(.task)
+ await store.receive(\.receive) {
+ $0.currentNumber = 19
+ }
+
+ await task.cancel()
+ await store.finish()
+ }
+
+ func testSubscribeWithArgAndTransform() async throws {
+ let store = TestStore(
+ initialState: ReducerWithTransform.State(number: 10),
+ reducer: ReducerWithTransform.init
+ ) {
+ $0.numberClient = .live
+ }
+
+ let task = await store.send(.task)
+ await store.receive(\.receive) {
+ $0.currentNumber = 20
+ }
+
+ await task.cancel()
+ await store.finish()
+ }
+
}