99 lines
2.4 KiB
Swift
99 lines
2.4 KiB
Swift
import Foundation
|
|
import Logging
|
|
import MQTTNIO
|
|
|
|
/// Manages the connection lifecycle of a single `MQTTClient`.
///
/// The manager connects the client, optionally reconnects when the connection
/// closes, and keeps track of the `TopicListenerStream`s it has handed out so
/// they can be shut down together.
///
/// Call `shutdown(withLogging:)` when you are done with the manager. If the
/// manager is deallocated without it, `deinit` logs a warning and performs a
/// best-effort cleanup.
actor ConnectionManager {
    /// The client whose connection is being managed.
    private let client: MQTTClient

    /// Optional logger, tagged with this type's name under the `instance` metadata key.
    private let logger: Logger?

    /// Unique key under which this manager registers its close and shutdown
    /// listeners on `client`, so that they can be removed again by name.
    private let name: String

    /// Whether a closed connection should automatically be re-established.
    private let shouldReconnect: Bool

    /// `true` once `connect(cleanSession:)` has succeeded; cleared again right
    /// before an automatic reconnect attempt.
    private var hasConnected: Bool = false

    /// Listener streams created through `listen(to:qos:)` that are still tracked.
    private var listeners: [TopicListenerStream] = []

    /// Set once the tracked listener streams have been shut down.
    private var isShuttingDown = false

    /// Creates a manager for the given client.
    ///
    /// - Parameters:
    ///   - client: The MQTT client to manage.
    ///   - logger: Optional logger; a copy tagged with the type name is stored.
    ///   - alwaysReconnect: When `true`, the manager reconnects whenever the
    ///     client reports that its connection was closed.
    init(
        client: MQTTClient,
        logger: Logger?,
        alwaysReconnect: Bool
    ) {
        var logger = logger
        logger?[metadataKey: "instance"] = "\(Self.self)"
        self.logger = logger

        self.client = client
        self.name = UUID().uuidString
        self.shouldReconnect = alwaysReconnect
    }

    deinit {
        guard !isShuttingDown else { return }

        // Snapshot the isolated state first; everything below only touches
        // immutable properties and the local copy.
        let remainingListeners = listeners

        let message = """
            Did not properly close the connection manager. This can lead to
            dangling references.

            Please call `shutdown` to properly close all connections and listener streams.
            """
        logger?.warning("\(message)")
        logger?.trace("Shutting down connection.")

        // Clean up synchronously. Calling `shutdown()` here would spawn a task
        // that captures `self`, letting the instance escape its own `deinit`.
        client.removeCloseListener(named: name)
        client.removeShutdownListener(named: name)
        remainingListeners.forEach { $0.shutdown() }
    }

    /// Creates a listener stream for the given topics, starts it, and returns its stream.
    ///
    /// The listener is tracked by the manager so that it is shut down together
    /// with the manager.
    ///
    /// - Parameters:
    ///   - topics: The topics to listen to.
    ///   - qos: The quality of service to use for the listener.
    /// - Returns: The stream exposed by the newly created listener.
    func listen(
        to topics: [String],
        qos: MQTTQoS
    ) async throws -> MQTTManager.ListenStream {
        let listener = TopicListenerStream(client: client, logger: logger, topics: topics, qos: qos)
        listeners.append(listener)
        await listener.start()
        return listener.stream
    }

    /// Connects the client and registers the close and shutdown listeners.
    ///
    /// Does nothing when the manager is already marked as connected.
    ///
    /// - Parameter cleanSession: Forwarded to the client's `connect(cleanSession:)`;
    ///   the same value is reused for automatic reconnects.
    /// - Throws: Rethrows any error thrown by the client while connecting.
    func connect(
        cleanSession: Bool
    ) async throws {
        guard !hasConnected else { return }

        do {
            try await client.connect(cleanSession: cleanSession)
        } catch {
            logger?.trace("Failed to connect: \(error)")
            throw error
        }

        hasConnected = true
        registerClientListeners(cleanSession: cleanSession)
    }

    /// Registers the close and shutdown listeners on the client under `name`.
    ///
    /// Both closures capture the manager weakly: the client stores them, and the
    /// manager stores the client, so a strong capture would form a retain cycle.
    /// NOTE(review): on reconnect this registers again under the same name, which
    /// is expected to replace the previous listener — confirm against MQTTNIO.
    private func registerClientListeners(cleanSession: Bool) {
        client.addCloseListener(named: name) { [weak self] _ in
            guard let self else { return }
            self.logger?.debug("Connection closed.")
            guard self.shouldReconnect else { return }
            self.logger?.debug("Reconnecting...")
            Task { await self.reconnect(cleanSession: cleanSession) }
        }

        client.addShutdownListener(named: name) { [weak self] _ in
            self?.shutdown()
        }
    }

    /// Re-establishes the connection after the client reported it as closed.
    ///
    /// Clears `hasConnected` first; otherwise the guard in `connect(cleanSession:)`
    /// would turn the reconnect into a no-op.
    private func reconnect(cleanSession: Bool) async {
        // A close that races with a shutdown must not bring the connection back.
        guard !isShuttingDown else { return }

        hasConnected = false
        do {
            try await connect(cleanSession: cleanSession)
        } catch {
            logger?.debug("Failed to reconnect: \(error)")
        }
    }

    /// Shuts down every tracked listener stream and marks the manager as shut down.
    private func shutdownListeners() {
        listeners.forEach { $0.shutdown() }
        listeners.removeAll()
        isShuttingDown = true
    }

    /// Removes this manager's listeners from the client and shuts down all
    /// tracked listener streams.
    ///
    /// The listener streams are shut down asynchronously on the actor, so they
    /// may still be active when this method returns.
    ///
    /// - Parameter withLogging: When `true`, a trace message is logged first.
    nonisolated func shutdown(withLogging: Bool = true) {
        if withLogging {
            logger?.trace("Shutting down connection.")
        }
        client.removeCloseListener(named: name)
        client.removeShutdownListener(named: name)
        Task { await shutdownListeners() }
    }
}
|