import ComposableArchitecture

// NOTE(review): the generic parameter lists in this file (`TriggerAction`,
// `StreamElement`, and the arguments to `AsyncStream`, `CaseKeyPath`,
// `AnyCasePath`, and `Effect`) were reconstructed from how the values are used
// below. Confirm the parameter names against the original declaration.

extension Reducer {
  /// Wraps this reducer so that receiving `triggerAction` starts iterating an
  /// `AsyncStream`, feeding every element back into the store as `responseAction`.
  ///
  /// The wrapped reducer always handles the action first; the stream effect is
  /// merged with whatever effects it returns.
  ///
  /// - Parameters:
  ///   - stream: An asynchronous factory for the stream to iterate. It is called
  ///     each time the trigger action is received.
  ///   - triggerAction: The action case that starts the subscription. Only the
  ///     presence of the case is checked; its payload is ignored.
  ///   - responseAction: The action case used to embed each stream element before
  ///     it is sent.
  /// - Returns: A reducer combining this reducer with the subscription behavior.
  public func subscribe<TriggerAction, StreamElement>(
    to stream: @escaping () async -> AsyncStream<StreamElement>,
    on triggerAction: CaseKeyPath<Action, TriggerAction>,
    with responseAction: CaseKeyPath<Action, StreamElement>
  ) -> _SubscribeReducer<Self, TriggerAction, StreamElement> {
    .init(
      parent: self,
      on: triggerAction,
      to: stream,
      with: responseAction
    )
  }
}

/// The reducer returned by `Reducer.subscribe(to:on:with:)`.
///
/// It forwards every action to `parent` and, when the action matches the trigger
/// case, additionally returns an effect that iterates the stream and sends each
/// element through the response case.
public struct _SubscribeReducer<Parent: Reducer, TriggerAction, StreamElement>: Reducer {
  /// The reducer being wrapped; it runs before the trigger check.
  @usableFromInline
  let parent: Parent

  /// Case path used to detect the action that starts the subscription.
  @usableFromInline
  let triggerAction: AnyCasePath<Parent.Action, TriggerAction>

  /// Factory that produces the stream to iterate.
  @usableFromInline
  let stream: () async -> AsyncStream<StreamElement>

  /// Case path used to embed stream elements into the parent's action type.
  @usableFromInline
  let responseAction: AnyCasePath<Parent.Action, StreamElement>

  /// Stores the wrapped reducer and stream factory, and type-erases both case
  /// key paths into `AnyCasePath` values.
  init(
    parent: Parent,
    on triggerAction: CaseKeyPath<Parent.Action, TriggerAction>,
    to stream: @escaping () async -> AsyncStream<StreamElement>,
    with responseAction: CaseKeyPath<Parent.Action, StreamElement>
  ) {
    self.parent = parent
    self.triggerAction = AnyCasePath(triggerAction)
    self.stream = stream
    self.responseAction = AnyCasePath(responseAction)
  }

  /// Runs the parent reducer, then starts the stream subscription if `action`
  /// matches the trigger case.
  ///
  /// - Returns: The parent's effects alone for non-trigger actions; otherwise
  ///   the parent's effects merged with the stream-iterating effect.
  public func reduce(into state: inout Parent.State, action: Parent.Action) -> Effect<Parent.Action> {
    // The parent always sees the action, whether or not it is the trigger.
    let effects = parent.reduce(into: &state, action: action)

    guard self.triggerAction.extract(from: action) != nil else {
      return effects
    }

    // No cancellation ID is attached to this effect, so each occurrence of the
    // trigger action starts a separate iteration of a freshly created stream.
    return .merge(
      effects,
      .run { send in
        for await value in await stream() {
          await send(responseAction.embed(value))
        }
      }
    )
  }
}