import AsyncAlgorithms
import Dependencies
import DependenciesMacros
import Foundation
import Logging
import MQTTNIO
import NIO

public extension DependencyValues {
  /// A dependency that is responsible for managing the connection to
  /// an MQTT broker.
  var mqttConnectionManager: MQTTConnectionManager {
    get { self[MQTTConnectionManager.self] }
    set { self[MQTTConnectionManager.self] = newValue }
  }
}

/// Represents the interface needed for the ``MQTTConnectionService``.
///
/// See ``live(client:cleanSession:logger:alwaysReconnect:)`` for the live implementation.
@DependencyClient
public struct MQTTConnectionManager: Sendable {

  /// Connect to the MQTT broker.
  public var connect: @Sendable () async throws -> Void

  /// Shutdown the connection to the MQTT broker.
  ///
  /// - Note: You should cancel any tasks that are listening to the connection stream first.
  public var shutdown: @Sendable () -> Void

  /// Create a stream of connection events.
  public var stream: @Sendable () throws -> AsyncStream<Event>

  /// Perform an operation with the underlying MQTTClient, this can be useful in
  /// tests, so this module needs to be imported with `@_spi(Internal) import` to
  /// use ``withClient(_:)``.
  private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void

  /// Run `callback` with the underlying `MQTTClient`.
  ///
  /// - Parameter callback: The operation to perform with the client.
  /// - Throws: Whatever the underlying implementation or `callback` throws.
  @_spi(Internal)
  public func withClient(
    _ callback: @Sendable (MQTTClient) async throws -> Void
  ) async throws {
    try await _withClient(callback)
  }

  /// Represents connection events that clients can listen for and
  /// react accordingly.
  public enum Event: Sendable {
    case connected
    case disconnected
    case shuttingDown
  }

  /// Create the live connection manager backed by the given client.
  ///
  /// - Parameters:
  ///   - client: The MQTT client whose connection is managed.
  ///   - cleanSession: Forwarded to the client's `connect(cleanSession:)` call.
  ///   - logger: An optional logger for connection events.
  ///   - alwaysReconnect: Whether to attempt a reconnect when the connection closes.
  public static func live(
    client: MQTTClient,
    cleanSession: Bool = false,
    logger: Logger? = nil,
    alwaysReconnect: Bool = true
  ) -> Self {
    let manager = ConnectionManager(
      client: client,
      logger: logger,
      alwaysReconnect: alwaysReconnect
    )
    return .init {
      try await manager.connect(cleanSession: cleanSession)
    } shutdown: {
      manager.shutdown()
    } stream: {
      // Each call builds an independent stream with its own listener name.
      MQTTConnectionStream(client: client, logger: logger)
        .start()
        .removeDuplicates()
        .eraseToStream()
    } _withClient: { callback in
      try await callback(client)
    }
  }
}

extension MQTTConnectionManager: TestDependencyKey {
  public static var testValue: MQTTConnectionManager {
    Self()
  }
}

// MARK: - Helpers

/// Produces an `AsyncStream` of ``MQTTConnectionManager/Event`` values for a client by
/// combining a periodic `isActive()` check with the client's close and shutdown listeners.
@_spi(Internal)
public final actor MQTTConnectionStream: Sendable {

  public typealias Element = MQTTConnectionManager.Event

  private let client: MQTTClient
  private let continuation: AsyncStream<Element>.Continuation
  private let logger: Logger?
  /// Unique name used to register and remove this stream's listeners on the client.
  private let name: String
  private let stream: AsyncStream<Element>
  private var isShuttingDown = false

  public init(client: MQTTClient, logger: Logger?) {
    let (stream, continuation) = AsyncStream<Element>.makeStream()
    self.client = client
    self.continuation = continuation
    self.logger = logger
    self.name = UUID().uuidString
    self.stream = stream
  }

  deinit { stop() }

  /// Start emitting connection events.
  ///
  /// Yields the current connection state immediately, then re-checks it every
  /// 100 milliseconds, and additionally yields from the client's close and
  /// shutdown listeners.
  ///
  /// - Returns: The stream that receives the events.
  public nonisolated func start() -> AsyncStream<Element> {
    // Check if the client is active and yield the initial result.
    continuation.yield(client.isActive() ? .connected : .disconnected)

    // Continually check if the client is active.
    let task = Task {
      while !Task.isCancelled {
        // Re-read the flag on every pass; reading it once before the loop
        // would never observe a later change.
        let shuttingDown = await self.isShuttingDown
        guard !shuttingDown else { break }
        try await Task.sleep(for: .milliseconds(100))
        continuation.yield(client.isActive() ? .connected : .disconnected)
      }
    }

    // Register listener on the client for when the connection
    // closes.
    client.addCloseListener(named: name) { _ in
      self.logger?.trace("Client has disconnected.")
      self.continuation.yield(.disconnected)
    }

    // Register listener on the client for when the client
    // is shutdown.
    client.addShutdownListener(named: name) { _ in
      self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)")
      self.continuation.yield(.shuttingDown)
      Task { await self.setIsShuttingDown() }
      task.cancel()
      self.stop()
    }

    // When the stream ends (finished, or the consumer stops iterating), stop
    // polling and drop the listeners so nothing keeps running in the background.
    // Captures `client` and `name` rather than `self` so the continuation does
    // not hold a reference back to this actor.
    continuation.onTermination = { [client, name] _ in
      task.cancel()
      client.removeCloseListener(named: name)
      client.removeShutdownListener(named: name)
    }

    return stream
  }

  private func setIsShuttingDown() {
    isShuttingDown = true
  }

  /// Remove this stream's listeners from the client and finish the stream.
  public nonisolated func stop() {
    client.removeCloseListener(named: name)
    client.removeShutdownListener(named: name)
    continuation.finish()
  }
}

/// Connects the client and, when configured, reconnects after the connection closes.
actor ConnectionManager {

  private let client: MQTTClient
  private let logger: Logger?
  /// Unique name used to register and remove this manager's listeners on the client.
  private let name: String
  private let shouldReconnect: Bool
  private var hasConnected: Bool = false

  init(
    client: MQTTClient,
    logger: Logger?,
    alwaysReconnect: Bool
  ) {
    self.client = client
    self.logger = logger
    self.name = UUID().uuidString
    self.shouldReconnect = alwaysReconnect
  }

  deinit {
    // We should've already logged that we're shutting down if
    // the manager was shutdown properly, so don't log it twice.
    self.shutdown(withLogging: false)
  }

  /// Connect the client and register the close / shutdown listeners.
  ///
  /// Does nothing when a previous call has already connected successfully.
  ///
  /// - Parameter cleanSession: Forwarded to the client's `connect(cleanSession:)`.
  /// - Throws: Rethrows the error from the client's connect call after logging it.
  func connect(
    cleanSession: Bool
  ) async throws {
    guard !hasConnected else { return }
    do {
      try await client.connect(cleanSession: cleanSession)
      hasConnected = true

      client.addCloseListener(named: name) { [weak self] _ in
        guard let self else { return }
        self.logger?.debug("Connection closed.")
        guard self.shouldReconnect else { return }
        self.logger?.debug("Reconnecting...")
        Task {
          try await self.reconnect(cleanSession: cleanSession)
        }
      }

      // Weak capture, matching the close listener, so the client's listener
      // table does not keep the manager alive.
      client.addShutdownListener(named: name) { [weak self] _ in
        self?.shutdown()
      }
    } catch {
      logger?.trace("Failed to connect: \(error)")
      throw error
    }
  }

  /// Clear the connected flag and connect again.
  ///
  /// Without clearing the flag, the guard in ``connect(cleanSession:)`` returns
  /// early and no reconnect is attempted.
  private func reconnect(cleanSession: Bool) async throws {
    hasConnected = false
    try await connect(cleanSession: cleanSession)
  }

  /// Remove this manager's listeners from the client.
  ///
  /// - Parameter withLogging: When `true`, logs a trace message first.
  nonisolated func shutdown(withLogging: Bool = true) {
    if withLogging {
      logger?.trace("Shutting down connection.")
    }
    client.removeCloseListener(named: name)
    client.removeShutdownListener(named: name)
  }
}