feat: Adds overloads for invoking stream's that require an argument from the current reducer's state.

This commit is contained in:
2024-02-24 14:51:35 -05:00
parent 7ee77bd6b7
commit 632e3eb9fc
4 changed files with 677 additions and 20 deletions

View File

@@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1520"
version = "1.7">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ComposableSubscriber"
BuildableName = "ComposableSubscriber"
BlueprintName = "ComposableSubscriber"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
</BuildActionEntries>
</BuildAction>
<TestAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES"
shouldAutocreateTestPlan = "YES">
<Testables>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "swift-composable-subscriberTests"
BuildableName = "swift-composable-subscriberTests"
BlueprintName = "swift-composable-subscriberTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
</Testables>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"
useCustomWorkingDirectory = "NO"
ignoresPersistentStateOnLaunch = "NO"
debugDocumentVersioning = "YES"
debugServiceExtension = "internal"
allowLocationSimulation = "YES">
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"
shouldUseLaunchSchemeArgsEnv = "YES"
savedToolIdentifier = ""
useCustomWorkingDirectory = "NO"
debugDocumentVersioning = "YES">
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ComposableSubscriber"
BuildableName = "ComposableSubscriber"
BlueprintName = "ComposableSubscriber"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>
</ProfileAction>
<AnalyzeAction
buildConfiguration = "Debug">
</AnalyzeAction>
<ArchiveAction
buildConfiguration = "Release"
revealArchiveInOrganizer = "YES">
</ArchiveAction>
</Scheme>

View File

@@ -24,5 +24,9 @@ let package = Package(
.product(name: "ComposableArchitecture", package: "swift-composable-architecture"), .product(name: "ComposableArchitecture", package: "swift-composable-architecture"),
] ]
), ),
.testTarget(
name: "swift-composable-subscriberTests",
dependencies: ["ComposableSubscriber"]
)
] ]
) )

View File

@@ -2,6 +2,65 @@ import ComposableArchitecture
import SwiftUI import SwiftUI
extension Reducer { extension Reducer {
/// A higher order reducer for subscribing to an `AsyncStream` from your app.
///
/// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared.
///
/// Then the `Reducer` on a `task` action we can do something like...
/// ```swift
/// Reduce<State, Action> { state, action in
/// switch action {
/// case.task:
/// return .run { send in
/// for await value in await dependency.stream() {
/// await send(.responseAction(value))
/// }
/// }
/// }
/// }
/// ```
/// When you have a lot of publishers/subscribers this gets very repetetive.
///
/// This gives a new way to subscribe to an async stream using a higher order reducer.
///
/// Any dependency that returns an `AsyncStream` can be subscribed to in the following way.
///
/// ## Example
///
/// ```swift
/// @Reducer
/// struct MyFeature {
/// struct State: Equatable {
/// var numberFact: String?
/// }
///
/// enum Action {
/// case receiveNumberFact(String)
/// case task
/// }
///
/// @Dependency(\.numberFact.stream) var numberFactStream
///
/// var body: some Reducer<State, Action> {
/// Reduce<State, Action> { state, action in
/// switch action {
/// case let .receiveNumberFact(numberFact):
/// state.numberFact = numberFact
/// return .none
/// case .task:
/// return .none
/// }
/// }
/// .subscribe(to: numberFactStream, on: \.task, with: \.receive)
/// }
/// }
/// ```
///
/// - Parameters:
/// - stream: The async stream to subscribe to on the reducer
/// - triggerAction: The action to invoke the stream when received.
/// - responseAction: The action to invoke with the streamed elements.
/// - animation: Optional animation used when elements are received.
public func subscribe<TriggerAction, StreamElement>( public func subscribe<TriggerAction, StreamElement>(
to stream: @escaping @Sendable () async -> AsyncStream<StreamElement>, to stream: @escaping @Sendable () async -> AsyncStream<StreamElement>,
on triggerAction: CaseKeyPath<Action, TriggerAction>, on triggerAction: CaseKeyPath<Action, TriggerAction>,
@@ -11,12 +70,154 @@ extension Reducer {
.init( .init(
parent: self, parent: self,
on: triggerAction, on: triggerAction,
to: stream, to: .noState(stream: stream),
with: .action(action: AnyCasePath(responseAction), animation: animation), with: .action(action: AnyCasePath(responseAction), animation: animation),
transform: { $0 } transform: { $0 }
) )
} }
/// A higher order reducer for subscribing to an `AsyncStream` from your app.
///
/// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared.
///
/// Then the `Reducer` on a `task` action we can do something like...
/// ```swift
/// Reduce<State, Action> { state, action in
/// switch action {
/// case.task:
/// return .run { send in
/// for await value in await dependency.stream() {
/// await send(.responseAction(value))
/// }
/// }
/// }
/// }
/// ```
/// When you have a lot of publishers/subscribers this gets very repetitive.
///
/// This gives a new way to subscribe to an async stream using a higher order reducer.
///
/// Any dependency that returns an `AsyncStream` can be subscribed to in the following way.
///
/// ## Example
///
/// In this example, to invoke the stream we need a piece of information on the current `State` of the reducer.
///
/// ```swift
/// @Reducer
/// struct MyFeature {
/// struct State: Equatable {
/// var number: Int
/// var numberFact: String
/// }
///
/// enum Action {
/// case receiveNumberFact(String)
/// case task
/// }
///
/// @Dependency(\.numberFact.stream) var numberFactStream
///
/// var body: some Reducer<State, Action> {
/// Reduce<State, Action> { state, action in
/// switch action {
/// case let .receiveNumberFact(numberFact):
/// state.numberFact = numberFact
/// return .none
/// case .task:
/// return .none
/// }
/// }
/// .subscribe(using: \.number, to: numberFactStream, on: \.task, with: \.receive)
/// }
/// }
/// ```
///
/// - Parameters:
/// - toStreamArgument: The argument used to invoke the stream with.
/// - stream: The async stream to subscribe to on the reducer
/// - triggerAction: The action to invoke the stream when received.
/// - responseAction: The action to invoke with the streamed elements.
/// - animation: Optional animation used when elements are received.
public func subscribe<TriggerAction, StreamElement, StreamArgument>(
using toStreamArgument: @escaping @Sendable (State) -> StreamArgument,
to stream: @escaping @Sendable (StreamArgument) async -> AsyncStream<StreamElement>,
on triggerAction: CaseKeyPath<Action, TriggerAction>,
with responseAction: CaseKeyPath<Action, StreamElement>,
animation: Animation? = nil
) -> _SubscribeReducer<Self, TriggerAction, StreamElement, StreamElement> {
.init(
parent: self,
on: triggerAction,
to: .state(stream: { await stream(toStreamArgument($0)) }),
with: .action(action: AnyCasePath(responseAction), animation: animation),
transform: { $0 }
)
}
/// A higher order reducer for subscribing to an `AsyncStream` from your app.
///
/// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared.
///
/// Then the `Reducer` on a `task` action we can do something like...
/// ```swift
/// Reduce<State, Action> { state, action in
/// switch action {
/// case.task:
/// return .run { send in
/// for await value in await dependency.stream() {
/// await send(.responseAction(value))
/// }
/// }
/// }
/// }
/// ```
/// When you have a lot of publishers/subscribers this gets very repetetive.
///
/// This gives a new way to subscribe to an async stream using a higher order reducer.
///
/// Any dependency that returns an `AsyncStream` can be subscribed to in the following way.
///
/// ## Example
///
/// In this example, we transform the output of the stream that we subscribe to.
///
/// ```swift
/// @Reducer
/// struct MyFeature {
/// struct State: Equatable {
/// var numberFact: String?
/// }
///
/// enum Action {
/// case receiveNumberFact(String)
/// case task
/// }
///
/// @Dependency(\.numberFact.stream) var numberFactStream
///
/// var body: some Reducer<State, Action> {
/// Reduce<State, Action> { state, action in
/// switch action {
/// case let .receiveNumberFact(numberFact):
/// state.numberFact = numberFact
/// return .none
/// case .task:
/// return .none
/// }
/// }
/// .subscribe(to: numberFactStream, on: \.task, with: \.receive) { numberFact in
/// "\(numberFact) And my custom transformation"
/// }
/// }
/// }
/// ```
///
/// - Parameters:
/// - stream: The async stream to subscribe to on the reducer
/// - triggerAction: The action to invoke the stream when received.
/// - responseAction: The action to invoke with the streamed elements.
/// - animation: Optional animation used when elements are received.
public func subscribe<TriggerAction, StreamElement, Value>( public func subscribe<TriggerAction, StreamElement, Value>(
to stream: @escaping @Sendable () async -> AsyncStream<StreamElement>, to stream: @escaping @Sendable () async -> AsyncStream<StreamElement>,
on triggerAction: CaseKeyPath<Action, TriggerAction>, on triggerAction: CaseKeyPath<Action, TriggerAction>,
@@ -27,12 +228,158 @@ extension Reducer {
.init( .init(
parent: self, parent: self,
on: triggerAction, on: triggerAction,
to: stream, to: .noState(stream: stream),
with: .action(action: AnyCasePath(responseAction), animation: animation), with: .action(action: AnyCasePath(responseAction), animation: animation),
transform: transform transform: transform
) )
} }
/// A higher order reducer for subscribing to an `AsyncStream` from your app.
///
/// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared.
///
/// Then the `Reducer` on a `task` action we can do something like...
/// ```swift
/// Reduce<State, Action> { state, action in
/// switch action {
/// case.task:
/// return .run { send in
/// for await value in await dependency.stream() {
/// await send(.responseAction(value))
/// }
/// }
/// }
/// }
/// ```
/// When you have a lot of publishers/subscribers this gets very repetitive.
///
/// This gives a new way to subscribe to an async stream using a higher order reducer.
///
/// Any dependency that returns an `AsyncStream` can be subscribed to in the following way.
///
/// ## Example
///
/// In this example, to invoke the stream we need a piece of information on the current `State` of the reducer.
///
/// ```swift
/// @Reducer
/// struct MyFeature {
/// struct State: Equatable {
/// var number: Int
/// var numberFact: String
/// }
///
/// enum Action {
/// case receiveNumberFact(String)
/// case task
/// }
///
/// @Dependency(\.numberFact.stream) var numberFactStream
///
/// var body: some Reducer<State, Action> {
/// Reduce<State, Action> { state, action in
/// switch action {
/// case let .receiveNumberFact(numberFact):
/// state.numberFact = numberFact
/// return .none
/// case .task:
/// return .none
/// }
/// }
/// .subscribe(using: \.number, to: numberFactStream, on: \.task, with: \.receive) { numberFact in
/// "\(numberFact) Appended with my custom transformation."
/// }
/// }
/// }
/// ```
///
/// - Parameters:
/// - toStreamArgument: The argument used to invoke the stream with.
/// - stream: The async stream to subscribe to on the reducer
/// - triggerAction: The action to invoke the stream when received.
/// - responseAction: The action to invoke with the streamed elements.
/// - animation: Optional animation used when elements are received.
public func subscribe<TriggerAction, StreamElement, Value, StreamArgument>(
using toStreamArgument: @escaping @Sendable (State) -> StreamArgument,
to stream: @escaping @Sendable (StreamArgument) async -> AsyncStream<StreamElement>,
on triggerAction: CaseKeyPath<Action, TriggerAction>,
with responseAction: CaseKeyPath<Action, Value>,
animation: Animation? = nil,
transform: @escaping @Sendable (StreamElement) -> Value
) -> _SubscribeReducer<Self, TriggerAction, StreamElement, Value> {
.init(
parent: self,
on: triggerAction,
to: .state(stream: { await stream(toStreamArgument($0)) }),
with: .action(action: AnyCasePath(responseAction), animation: animation),
transform: transform
)
}
/// A higher order reducer for subscribing to an `AsyncStream` from your app.
///
/// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared.
///
/// Then the `Reducer` on a `task` action we can do something like...
/// ```swift
/// Reduce<State, Action> { state, action in
/// switch action {
/// case.task:
/// return .run { send in
/// for await value in await dependency.stream() {
/// await send(.responseAction(value))
/// }
/// }
/// }
/// }
/// ```
/// When you have a lot of publishers/subscribers this gets very repetetive.
///
/// This gives a new way to subscribe to an async stream using a higher order reducer.
///
/// Any dependency that returns an `AsyncStream` can be subscribed to in the following way.
///
/// ## Example
///
/// In this example, we use the stream element to also call another operation on an external dependency.
///
/// ```swift
/// @Reducer
/// struct MyFeature {
/// struct State: Equatable {
/// var numberFact: String?
/// }
///
/// enum Action {
/// case receiveNumberFact(String)
/// case task
/// }
///
/// @Dependency(\.numberFact.stream) var numberFactStream
///
/// var body: some Reducer<State, Action> {
/// Reduce<State, Action> { state, action in
/// switch action {
/// case let .receiveNumberFact(numberFact):
/// state.numberFact = numberFact
/// return .none
/// case .task:
/// return .none
/// }
/// }
/// .subscribe(on: \.task, with: \.receive) { send, numberFact in
/// await send(.receive(numberFact))
/// await otherDependency.doSomethingElse(with: numberFact)
/// }
/// }
/// }
/// ```
///
/// - Parameters:
/// - stream: The async stream to subscribe to on the reducer
/// - triggerAction: The action to invoke the stream when received.
/// - responseAction: The action to invoke with the streamed elements.
/// - animation: Optional animation used when elements are received.
public func subscribe<TriggerAction, StreamElement>( public func subscribe<TriggerAction, StreamElement>(
to stream: @escaping @Sendable () async -> AsyncStream<StreamElement>, to stream: @escaping @Sendable () async -> AsyncStream<StreamElement>,
on triggerAction: CaseKeyPath<Action, TriggerAction>, on triggerAction: CaseKeyPath<Action, TriggerAction>,
@@ -41,7 +388,88 @@ extension Reducer {
.init( .init(
parent: self, parent: self,
on: triggerAction, on: triggerAction,
to: stream, to: .noState(stream: stream),
with: .operation(f: operation),
transform: { $0 }
)
}
/// A higher order reducer for subscribing to an `AsyncStream` from your app.
///
/// A common pattern in our app for shared data is to create a dependency that exposes an `AsyncStream` of data that is shared.
///
/// Then the `Reducer` on a `task` action we can do something like...
/// ```swift
/// Reduce<State, Action> { state, action in
/// switch action {
/// case.task:
/// return .run { send in
/// for await value in await dependency.stream() {
/// await send(.responseAction(value))
/// }
/// }
/// }
/// }
/// ```
/// When you have a lot of publishers/subscribers this gets very repetetive.
///
/// This gives a new way to subscribe to an async stream using a higher order reducer.
///
/// Any dependency that returns an `AsyncStream` can be subscribed to in the following way.
///
/// ## Example
///
/// In this example, we use the stream element to also call another operation on an external dependency.
///
/// ```swift
/// @Reducer
/// struct MyFeature {
/// struct State: Equatable {
/// var number: Int
/// var numberFact: String?
/// }
///
/// enum Action {
/// case receiveNumberFact(String)
/// case task
/// }
///
/// @Dependency(\.numberFact.stream) var numberFactStream
///
/// var body: some Reducer<State, Action> {
/// Reduce<State, Action> { state, action in
/// switch action {
/// case let .receiveNumberFact(numberFact):
/// state.numberFact = numberFact
/// return .none
/// case .task:
/// return .none
/// }
/// }
/// .subscribe(using: \.number, on: \.task, with: \.receive) { send, numberFact in
/// await send(.receive(numberFact))
/// await otherDependency.doSomethingElse(with: numberFact)
/// }
/// }
/// }
/// ```
///
/// - Parameters:
/// - toStreamArgument: The argument used to invoke the stream with.
/// - stream: The async stream to subscribe to on the reducer
/// - triggerAction: The action to invoke the stream when received.
/// - responseAction: The action to invoke with the streamed elements.
/// - animation: Optional animation used when elements are received.
public func subscribe<TriggerAction, StreamElement, StreamArgument>(
using toStreamArgument: @escaping @Sendable (State) -> StreamArgument,
to stream: @escaping @Sendable (StreamArgument) async -> AsyncStream<StreamElement>,
on triggerAction: CaseKeyPath<Action, TriggerAction>,
operation: @escaping @Sendable (_ send: Send<Action>, StreamElement) async throws -> Void
) -> _SubscribeReducer<Self, TriggerAction, StreamElement, StreamElement> {
.init(
parent: self,
on: triggerAction,
to: .state(stream: { await stream(toStreamArgument($0)) }),
with: .operation(f: operation), with: .operation(f: operation),
transform: { $0 } transform: { $0 }
) )
@@ -54,6 +482,21 @@ enum Operation<Action, Value> {
case operation(f: (_ send: Send<Action>, Value) async throws -> Void) case operation(f: (_ send: Send<Action>, Value) async throws -> Void)
} }
@usableFromInline
enum Stream<State, Value> {
case noState(stream: (@Sendable () async -> AsyncStream<Value>))
case state(stream: (@Sendable (State) async -> AsyncStream<Value>))
fileprivate func callAsFunction(state: State) async -> AsyncStream<Value> {
switch self {
case let .noState(stream: stream):
return await stream()
case let .state(stream: stream):
return await stream(state)
}
}
}
public struct _SubscribeReducer<Parent: Reducer, TriggerAction, StreamElement, Value>: Reducer { public struct _SubscribeReducer<Parent: Reducer, TriggerAction, StreamElement, Value>: Reducer {
@usableFromInline @usableFromInline
let parent: Parent let parent: Parent
@@ -62,7 +505,7 @@ public struct _SubscribeReducer<Parent: Reducer, TriggerAction, StreamElement, V
let triggerAction: AnyCasePath<Parent.Action, TriggerAction> let triggerAction: AnyCasePath<Parent.Action, TriggerAction>
@usableFromInline @usableFromInline
let stream: () async -> AsyncStream<StreamElement> let stream: Stream<Parent.State, StreamElement>
@usableFromInline @usableFromInline
let operation: Operation<Parent.Action, Value> let operation: Operation<Parent.Action, Value>
@@ -73,7 +516,7 @@ public struct _SubscribeReducer<Parent: Reducer, TriggerAction, StreamElement, V
init( init(
parent: Parent, parent: Parent,
on triggerAction: CaseKeyPath<Parent.Action, TriggerAction>, on triggerAction: CaseKeyPath<Parent.Action, TriggerAction>,
to stream: @escaping @Sendable () async -> AsyncStream<StreamElement>, to stream: Stream<Parent.State, StreamElement>,
with operation: Operation<Parent.Action, Value>, with operation: Operation<Parent.Action, Value>,
transform: @escaping @Sendable (StreamElement) -> Value transform: @escaping @Sendable (StreamElement) -> Value
) { ) {
@@ -93,8 +536,8 @@ public struct _SubscribeReducer<Parent: Reducer, TriggerAction, StreamElement, V
return .merge( return .merge(
effects, effects,
.run { send in .run { [state = state] send in
for await value in await stream() { for await value in await stream(state: state) {
switch operation { switch operation {
case .action(let action, let animation): case .action(let action, let animation):
await send(action.embed(transform(value)), animation: animation) await send(action.embed(transform(value)), animation: animation)

View File

@@ -1,12 +1,144 @@
import ComposableArchitecture
import XCTest import XCTest
@testable import swift_composable_subscriber @testable import ComposableSubscriber
@DependencyClient
struct NumberClient {
var numberStreamWithoutArg: @Sendable () async -> AsyncStream<Int> = { .never }
var numberStreamWithArg: @Sendable (Int) async -> AsyncStream<Int> = { _ in .never }
}
extension NumberClient: TestDependencyKey {
static var live: NumberClient {
NumberClient(
numberStreamWithoutArg: {
AsyncStream { continuation in
continuation.yield(1)
continuation.finish()
}
},
numberStreamWithArg: { number in
AsyncStream { continuation in
continuation.yield(number)
continuation.finish()
}
}
)
}
static let testValue = Self()
}
extension DependencyValues {
var numberClient: NumberClient {
get { self[NumberClient.self] }
set { self[NumberClient.self] = newValue }
}
}
struct NumberState: Equatable {
var number: Int
var currentNumber: Int?
}
@CasePathable
enum NumberAction {
case receive(Int)
case task
}
@Reducer
struct ReducerWithArg {
typealias State = NumberState
typealias Action = NumberAction
@Dependency(\.numberClient) var numberClient
var body: some Reducer<State, Action> {
Reduce<State, Action> { state, action in
switch action {
case let .receive(number):
state.currentNumber = number
return .none
case .task:
return .none
}
}
.subscribe(
using: \.number,
to: numberClient.numberStreamWithArg,
on: \.task,
with: \.receive
)
}
}
@Reducer
struct ReducerWithTransform {
typealias State = NumberState
typealias Action = NumberAction
@Dependency(\.numberClient) var numberClient
var body: some Reducer<State, Action> {
Reduce<State, Action> { state, action in
switch action {
case let .receive(number):
state.currentNumber = number
return .none
case .task:
return .none
}
}
.subscribe(
using: \.number,
to: numberClient.numberStreamWithArg,
on: \.task,
with: \.receive
) {
$0 * 2
}
}
}
@MainActor
final class swift_composable_subscriberTests: XCTestCase { final class swift_composable_subscriberTests: XCTestCase {
func testExample() throws {
// XCTest Documentation
// https://developer.apple.com/documentation/xctest
// Defining Test Cases and Test Methods func testSubscribeWithArg() async throws {
// https://developer.apple.com/documentation/xctest/defining_test_cases_and_test_methods let store = TestStore(
initialState: ReducerWithArg.State(number: 19),
reducer: ReducerWithArg.init
) {
$0.numberClient = .live
} }
let task = await store.send(.task)
await store.receive(\.receive) {
$0.currentNumber = 19
}
await task.cancel()
await store.finish()
}
func testSubscribeWithArgAndTransform() async throws {
let store = TestStore(
initialState: ReducerWithTransform.State(number: 10),
reducer: ReducerWithTransform.init
) {
$0.numberClient = .live
}
let task = await store.send(.task)
await store.receive(\.receive) {
$0.currentNumber = 20
}
await task.cancel()
await store.finish()
}
} }