import ComposableArchitecture extension Reducer { public func subscribe( to stream: @escaping () async -> AsyncStream, on triggerAction: CaseKeyPath, with responseAction: CaseKeyPath ) -> _SubscribeReducer { .init( parent: self, on: triggerAction, to: stream, with: responseAction, transform: { $0 } ) } public func subscribe( to stream: @escaping () async -> AsyncStream, on triggerAction: CaseKeyPath, with responseAction: CaseKeyPath, transform: @escaping (StreamElement) -> Value ) -> _SubscribeReducer { .init( parent: self, on: triggerAction, to: stream, with: responseAction, transform: transform ) } } public struct _SubscribeReducer: Reducer { @usableFromInline let parent: Parent @usableFromInline let triggerAction: AnyCasePath @usableFromInline let stream: () async -> AsyncStream @usableFromInline let responseAction: AnyCasePath @usableFromInline let transform: (StreamElement) -> Value init( parent: Parent, on triggerAction: CaseKeyPath, to stream: @escaping () async -> AsyncStream, with responseAction: CaseKeyPath, transform: @escaping (StreamElement) -> Value ) { self.parent = parent self.triggerAction = AnyCasePath(triggerAction) self.stream = stream self.responseAction = AnyCasePath(responseAction) self.transform = transform } public func reduce(into state: inout Parent.State, action: Parent.Action) -> Effect { let effects = parent.reduce(into: &state, action: action) guard self.triggerAction.extract(from: action) != nil else { return effects } return .merge( effects, .run { send in for await value in await stream() { await send(responseAction.embed(transform(value))) } } ) } }