From efd9907b4aad81b3c3e87565a85e14626e1be708 Mon Sep 17 00:00:00 2001 From: Michael Housh Date: Tue, 12 Nov 2024 22:19:09 -0500 Subject: [PATCH] feat: Cleans up some of the shutdown logic so that the MQTTClient is disconnected properly. --- Sources/MQTTConnectionManagerLive/Live.swift | 4 +++- Sources/MQTTConnectionService/MQTTConnectionService.swift | 8 ++------ Sources/TopicDependencies/TopicListener.swift | 6 ++---- Sources/dewPoint-controller/Application.swift | 2 +- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/Sources/MQTTConnectionManagerLive/Live.swift b/Sources/MQTTConnectionManagerLive/Live.swift index 47f54a4..68eac47 100644 --- a/Sources/MQTTConnectionManagerLive/Live.swift +++ b/Sources/MQTTConnectionManagerLive/Live.swift @@ -10,7 +10,7 @@ public extension MQTTConnectionManager { logger: Logger? = nil ) -> Self { let manager = ConnectionManager(client: client, logger: logger) - return .init { _ in + return .init { try await manager.connect(cleanSession: cleanSession) return manager.stream @@ -76,6 +76,8 @@ private actor ConnectionManager { nonisolated func shutdown() { client.logger.trace("Shutting down connection.") + client.removeCloseListener(named: name) + client.removeShutdownListener(named: name) continuation.yield(.shuttingDown) continuation.finish() } diff --git a/Sources/MQTTConnectionService/MQTTConnectionService.swift b/Sources/MQTTConnectionService/MQTTConnectionService.swift index f755d60..da75c3b 100644 --- a/Sources/MQTTConnectionService/MQTTConnectionService.swift +++ b/Sources/MQTTConnectionService/MQTTConnectionService.swift @@ -11,11 +11,10 @@ import ServiceLifecycle @DependencyClient public struct MQTTConnectionManager: Sendable { - public var connect: @Sendable (_ cleanSession: Bool) async throws -> AsyncStream + public var connect: @Sendable () async throws -> AsyncStream public var shutdown: () -> Void public enum Event: Sendable { - case notStarted case connected case disconnected case shuttingDown @@ -43,14 +42,11 @@ public extension DependencyValues { public actor MQTTConnectionService: Service { @Dependency(\.mqttConnectionManager) var manager - private let cleanSession: Bool private nonisolated let logger: Logger? public init( - cleanSession: Bool = false, logger: Logger? = nil ) { - self.cleanSession = cleanSession self.logger = logger } @@ -59,7 +55,7 @@ public actor MQTTConnectionService: Service { /// connection. public func run() async throws { try await withGracefulShutdownHandler { - let stream = try await manager.connect(cleanSession) + let stream = try await manager.connect() for await event in stream.cancelOnGracefulShutdown() { // We don't really need to do anything with the events, so just logging // for now. But we need to iterate on an async stream for the service to diff --git a/Sources/TopicDependencies/TopicListener.swift b/Sources/TopicDependencies/TopicListener.swift index 56bcbf3..c3c986f 100644 --- a/Sources/TopicDependencies/TopicListener.swift +++ b/Sources/TopicDependencies/TopicListener.swift @@ -153,10 +153,6 @@ private actor MQTTTopicListener { } } - client.addShutdownListener(named: name) { _ in - self.shutdown() - } - return stream } @@ -167,6 +163,8 @@ private actor MQTTTopicListener { nonisolated func shutdown() { client.logger.trace("Closing topic listener...") continuation.finish() + client.removePublishListener(named: name) + client.removeShutdownListener(named: name) Task { await self.setIsShuttingDown() } } } diff --git a/Sources/dewPoint-controller/Application.swift b/Sources/dewPoint-controller/Application.swift index a2ba83d..9880970 100644 --- a/Sources/dewPoint-controller/Application.swift +++ b/Sources/dewPoint-controller/Application.swift @@ -40,7 +40,7 @@ struct Application { $0.topicPublisher = .live(client: mqtt) $0.mqttConnectionManager = .live(client: mqtt, logger: logger) } operation: { - let mqttConnection = MQTTConnectionService(cleanSession: false, logger: logger) + let mqttConnection = MQTTConnectionService(logger: logger) let sensors = SensorsService(sensors: .live, logger: logger) var serviceGroupConfiguration = ServiceGroupConfiguration(