import ComposableArchitecture import SwiftUI extension Reducer { public func subscribe( to stream: @escaping @Sendable () async -> AsyncStream, on triggerAction: CaseKeyPath, with responseAction: CaseKeyPath, animation: Animation? = nil ) -> _SubscribeReducer { .init( parent: self, on: triggerAction, to: stream, with: .action(action: AnyCasePath(responseAction), animation: animation), transform: { $0 } ) } public func subscribe( to stream: @escaping @Sendable () async -> AsyncStream, on triggerAction: CaseKeyPath, with responseAction: CaseKeyPath, animation: Animation? = nil, transform: @escaping @Sendable (StreamElement) -> Value ) -> _SubscribeReducer { .init( parent: self, on: triggerAction, to: stream, with: .action(action: AnyCasePath(responseAction), animation: animation), transform: transform ) } public func subscribe( to stream: @escaping @Sendable () async -> AsyncStream, on triggerAction: CaseKeyPath, operation: @escaping @Sendable (_ send: Send, StreamElement) async throws -> Void ) -> _SubscribeReducer { .init( parent: self, on: triggerAction, to: stream, with: .operation(f: operation), transform: { $0 } ) } } @usableFromInline enum Operation { case action(action: AnyCasePath, animation: Animation?) case operation(f: (_ send: Send, Value) async throws -> Void) } public struct _SubscribeReducer: Reducer { @usableFromInline let parent: Parent @usableFromInline let triggerAction: AnyCasePath @usableFromInline let stream: () async -> AsyncStream @usableFromInline let operation: Operation @usableFromInline let transform: (StreamElement) -> Value init( parent: Parent, on triggerAction: CaseKeyPath, to stream: @escaping @Sendable () async -> AsyncStream, with operation: Operation, transform: @escaping @Sendable (StreamElement) -> Value ) { self.parent = parent self.triggerAction = AnyCasePath(triggerAction) self.stream = stream self.transform = transform self.operation = operation } public func reduce(into state: inout Parent.State, action: Parent.Action) -> Effect { let effects = parent.reduce(into: &state, action: action) guard self.triggerAction.extract(from: action) != nil else { return effects } return .merge( effects, .run { send in for await value in await stream() { switch operation { case .action(let action, let animation): await send(action.embed(transform(value)), animation: animation) case .operation(let f): try await f(send, transform(value)) } } } ) } }