import Foundation import Logging import MQTTNIO @_spi(Internal) public actor MQTTConnectionStream: Sendable { public typealias Element = MQTTManager.Event private let client: MQTTClient private let continuation: AsyncStream.Continuation private let logger: Logger? nonisolated let name: String private let stream: AsyncStream private var isShuttingDown = false public init(client: MQTTClient, logger: Logger?) { var logger = logger logger?[metadataKey: "type"] = "\(Self.self)" self.logger = logger let (stream, continuation) = AsyncStream.makeStream() self.client = client self.continuation = continuation self.name = UUID().uuidString self.stream = stream } deinit { stop() } public nonisolated func start() -> AsyncStream { // Check if the client is active and yield the initial result. continuation.yield(client.isActive() ? .connected : .disconnected) // Continually check if the client is active. let task = Task { let isShuttingDown = await self.isShuttingDown while !Task.isCancelled, !isShuttingDown { try await Task.sleep(for: .milliseconds(100)) continuation.yield(client.isActive() ? .connected : .disconnected) } } // Register listener on the client for when the connection // closes. client.addCloseListener(named: name) { _ in self.logger?.trace("Client has disconnected.") self.continuation.yield(.disconnected) } // Register listener on the client for when the client // is shutdown. client.addShutdownListener(named: name) { _ in self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)") self.continuation.yield(.shuttingDown) Task { await self.setIsShuttingDown() } task.cancel() self.stop() } return stream } private func setIsShuttingDown() { isShuttingDown = true } public nonisolated func stop() { client.removeCloseListener(named: name) client.removeShutdownListener(named: name) continuation.finish() } }