import Foundation import Logging import MQTTNIO actor ConnectionManager { private let client: MQTTClient private let logger: Logger? private let name: String private let shouldReconnect: Bool private var hasConnected: Bool = false private var listeners: [TopicListenerStream] = [] private var isShuttingDown = false init( client: MQTTClient, logger: Logger?, alwaysReconnect: Bool ) { var logger = logger logger?[metadataKey: "instance"] = "\(Self.self)" self.logger = logger self.client = client self.name = UUID().uuidString self.shouldReconnect = alwaysReconnect } deinit { if !isShuttingDown { let message = """ Did not properly close the connection manager. This can lead to dangling references. Please call `shutdown` to properly close all connections and listener streams. """ logger?.warning("\(message)") self.shutdown() } } private func setHasConnected() { hasConnected = true } func listen( to topics: [String], qos: MQTTQoS ) async throws -> MQTTManager.ListenStream { let listener = TopicListenerStream(client: client, logger: logger, topics: topics, qos: qos) listeners.append(listener) await listener.start() return listener.stream } func connect( cleanSession: Bool ) async throws { guard !hasConnected else { return } do { try await client.connect(cleanSession: cleanSession) setHasConnected() client.addCloseListener(named: name) { [weak self] _ in guard let `self` else { return } self.logger?.debug("Connection closed.") if self.shouldReconnect { self.logger?.debug("Reconnecting...") Task { try await self.connect(cleanSession: cleanSession) } } } client.addShutdownListener(named: name) { _ in self.shutdown() } } catch { logger?.trace("Failed to connect: \(error)") throw error } } private func shutdownListeners() { _ = listeners.map { $0.shutdown() } listeners = [] isShuttingDown = true } nonisolated func shutdown(withLogging: Bool = true) { if withLogging { logger?.trace("Shutting down connection.") } client.removeCloseListener(named: name) client.removeShutdownListener(named: name) Task { await shutdownListeners() } } }