diff --git a/Sources/MQTTConnectionManagerLive/Live.swift b/Sources/MQTTConnectionManagerLive/Live.swift index 540c6c4..47f54a4 100644 --- a/Sources/MQTTConnectionManagerLive/Live.swift +++ b/Sources/MQTTConnectionManagerLive/Live.swift @@ -12,7 +12,11 @@ public extension MQTTConnectionManager { let manager = ConnectionManager(client: client, logger: logger) return .init { _ in try await manager.connect(cleanSession: cleanSession) + return manager.stream + .removeDuplicates() + .eraseToStream() + } shutdown: { manager.shutdown() } @@ -26,6 +30,7 @@ private actor ConnectionManager { private let continuation: AsyncStream.Continuation private nonisolated let logger: Logger? private let name: String + private var started: Bool = false let stream: AsyncStream init( @@ -42,6 +47,7 @@ private actor ConnectionManager { deinit { client.removeCloseListener(named: name) + client.removeShutdownListener(named: name) } func connect(cleanSession: Bool) async throws { @@ -51,13 +57,16 @@ private actor ConnectionManager { continuation.yield(.connected) client.addCloseListener(named: name) { _ in - Task { - self.continuation.yield(.disconnected) - self.logger?.debug("Connection closed.") - self.logger?.debug("Reconnecting...") - try await self.connect(cleanSession: cleanSession) - } + self.continuation.yield(.disconnected) + self.logger?.debug("Connection closed.") + self.logger?.debug("Reconnecting...") + Task { try await self.connect(cleanSession: cleanSession) } } + + client.addShutdownListener(named: name) { _ in + self.shutdown() + } + } catch { client.logger.trace("Failed to connect: \(error)") continuation.yield(.disconnected) @@ -66,6 +75,7 @@ private actor ConnectionManager { } nonisolated func shutdown() { + client.logger.trace("Shutting down connection.") continuation.yield(.shuttingDown) continuation.finish() } diff --git a/Sources/MQTTConnectionService/MQTTConnectionService.swift b/Sources/MQTTConnectionService/MQTTConnectionService.swift index 5070e7d..f755d60 100644 --- a/Sources/MQTTConnectionService/MQTTConnectionService.swift +++ b/Sources/MQTTConnectionService/MQTTConnectionService.swift @@ -15,6 +15,7 @@ public struct MQTTConnectionManager: Sendable { public var shutdown: () -> Void public enum Event: Sendable { + case notStarted case connected case disconnected case shuttingDown @@ -65,9 +66,9 @@ public actor MQTTConnectionService: Service { // continue to run and handle graceful shutdowns. logger?.trace("Received connection event: \(event)") } + manager.shutdown() } onGracefulShutdown: { self.logger?.trace("Received graceful shutdown.") - Task { await self.manager.shutdown() } } } } diff --git a/Sources/dewPoint-controller/Application.swift b/Sources/dewPoint-controller/Application.swift index 8b5e19d..a2ba83d 100644 --- a/Sources/dewPoint-controller/Application.swift +++ b/Sources/dewPoint-controller/Application.swift @@ -36,7 +36,6 @@ struct Application { try await withDependencies { $0.psychrometricClient = .liveValue - // $0.sensorsClient = .live(client: mqtt) $0.topicListener = .live(client: mqtt) $0.topicPublisher = .live(client: mqtt) $0.mqttConnectionManager = .live(client: mqtt, logger: logger) @@ -59,6 +58,8 @@ struct Application { try await serviceGroup.run() } + + try await mqtt.shutdown() } }