From bf1126b06a6892b3c18b32c534d31f050917a9a8 Mon Sep 17 00:00:00 2001 From: Michael Housh Date: Mon, 11 Nov 2024 22:00:14 -0500 Subject: [PATCH] feat: Adds MQTTConnectionManagerLive module. --- Package.swift | 15 +- Sources/MQTTConnectionManagerLive/Live.swift | 72 +++++++++ .../MQTTConnectionService.swift | 151 ++++++------------ Sources/dewPoint-controller/Application.swift | 7 +- 4 files changed, 137 insertions(+), 108 deletions(-) create mode 100644 Sources/MQTTConnectionManagerLive/Live.swift diff --git a/Package.swift b/Package.swift index ee0f1d7..344da28 100755 --- a/Package.swift +++ b/Package.swift @@ -15,6 +15,7 @@ let package = Package( products: [ .executable(name: "dewPoint-controller", targets: ["dewPoint-controller"]), .library(name: "Models", targets: ["Models"]), + .library(name: "MQTTConnectionManagerLive", targets: ["MQTTConnectionManagerLive"]), .library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]), .library(name: "SensorsClientLive", targets: ["SensorsClientLive"]), .library(name: "SensorsService", targets: ["SensorsService"]) @@ -32,8 +33,7 @@ let package = Package( name: "dewPoint-controller", dependencies: [ "Models", - "MQTTConnectionService", - "SensorsService", + "MQTTConnectionManagerLive", "SensorsClientLive", .product(name: "MQTTNIO", package: "mqtt-nio"), .product(name: "NIO", package: "swift-nio"), @@ -48,11 +48,20 @@ let package = Package( ], swiftSettings: swiftSettings ), + .target( + name: "MQTTConnectionManagerLive", + dependencies: [ + "MQTTConnectionService", + .product(name: "MQTTNIO", package: "mqtt-nio") + ], + swiftSettings: swiftSettings + ), .target( name: "MQTTConnectionService", dependencies: [ "Models", - .product(name: "MQTTNIO", package: "mqtt-nio"), + .product(name: "Dependencies", package: "swift-dependencies"), + .product(name: "DependenciesMacros", package: "swift-dependencies"), .product(name: "ServiceLifecycle", package: "swift-service-lifecycle") ], swiftSettings: swiftSettings diff --git a/Sources/MQTTConnectionManagerLive/Live.swift b/Sources/MQTTConnectionManagerLive/Live.swift new file mode 100644 index 0000000..540c6c4 --- /dev/null +++ b/Sources/MQTTConnectionManagerLive/Live.swift @@ -0,0 +1,72 @@ +import Foundation +import Logging +@_exported import MQTTConnectionService +import MQTTNIO + +public extension MQTTConnectionManager { + static func live( + client: MQTTClient, + cleanSession: Bool = false, + logger: Logger? = nil + ) -> Self { + let manager = ConnectionManager(client: client, logger: logger) + return .init { _ in + try await manager.connect(cleanSession: cleanSession) + return manager.stream + } shutdown: { + manager.shutdown() + } + } +} + +// MARK: - Helpers + +private actor ConnectionManager { + private let client: MQTTClient + private let continuation: AsyncStream.Continuation + private nonisolated let logger: Logger? + private let name: String + let stream: AsyncStream + + init( + client: MQTTClient, + logger: Logger? + ) { + let (stream, continuation) = AsyncStream.makeStream() + self.client = client + self.continuation = continuation + self.logger = logger + self.name = UUID().uuidString + self.stream = stream + } + + deinit { + client.removeCloseListener(named: name) + } + + func connect(cleanSession: Bool) async throws { + do { + try await client.connect(cleanSession: cleanSession) + + continuation.yield(.connected) + + client.addCloseListener(named: name) { _ in + Task { + self.continuation.yield(.disconnected) + self.logger?.debug("Connection closed.") + self.logger?.debug("Reconnecting...") + try await self.connect(cleanSession: cleanSession) + } + } + } catch { + client.logger.trace("Failed to connect: \(error)") + continuation.yield(.disconnected) + throw error + } + } + + nonisolated func shutdown() { + continuation.yield(.shuttingDown) + continuation.finish() + } +} diff --git a/Sources/MQTTConnectionService/MQTTConnectionService.swift b/Sources/MQTTConnectionService/MQTTConnectionService.swift index ded1dd4..4cea3e3 100644 --- a/Sources/MQTTConnectionService/MQTTConnectionService.swift +++ b/Sources/MQTTConnectionService/MQTTConnectionService.swift @@ -1,123 +1,70 @@ +import Dependencies +import DependenciesMacros import Foundation import Logging import Models -import MQTTNIO -import NIO import ServiceLifecycle -/// Manages the MQTT broker connection. -public actor MQTTConnectionService: Service { +/// Represents the interface needed for the ``MQTTConnectionService``. +/// +/// See ``MQTTConnectionManagerLive`` module for live implementation. +@DependencyClient +public struct MQTTConnectionManager: Sendable { - private let cleanSession: Bool - private let client: MQTTClient - private let internalEventStream: ConnectionStream - nonisolated var logger: Logger { client.logger } - private let name: String - - public init( - cleanSession: Bool = true, - client: MQTTClient - ) { - self.cleanSession = cleanSession - self.client = client - self.internalEventStream = .init() - self.name = UUID().uuidString - } - - deinit { - self.logger.debug("MQTTConnectionService is gone.") - self.internalEventStream.stop() - } - - /// The entry-point of the service. - /// - /// This method connects to the MQTT broker and manages the connection. - /// It will attempt to gracefully shutdown the connection upon receiving - /// a shutdown signals. - public func run() async throws { - await withGracefulShutdownHandler { - await withDiscardingTaskGroup { group in - group.addTask { await self.connect() } - group.addTask { - await self.internalEventStream.start { self.client.isActive() } - } - for await event in self.internalEventStream.events.cancelOnGracefulShutdown() { - if event == .shuttingDown { - break - } - self.logger.trace("Sending connection event: \(event)") - } - group.cancelAll() - } - } onGracefulShutdown: { - self.logger.trace("Received graceful shutdown.") - self.shutdown() - } - } - - func connect() async { - do { - try await client.connect(cleanSession: cleanSession) - client.addCloseListener(named: name) { _ in - Task { - self.logger.debug("Connection closed.") - self.logger.debug("Reconnecting...") - await self.connect() - } - } - logger.debug("Connection successful.") - } catch { - logger.trace("Failed to connect: \(error)") - } - } - - nonisolated func shutdown() { - logger.debug("Begin shutting down MQTT broker connection.") - client.removeCloseListener(named: name) - internalEventStream.stop() - _ = client.disconnect() - try? client.syncShutdownGracefully() - logger.info("MQTT broker connection closed.") - } - -} - -extension MQTTConnectionService { + public var connect: @Sendable (_ cleanSession: Bool) async throws -> AsyncStream + public var shutdown: () -> Void public enum Event: Sendable { case connected case disconnected case shuttingDown } +} - private actor ConnectionStream: Sendable { +extension MQTTConnectionManager: TestDependencyKey { + public static var testValue: MQTTConnectionManager { + Self() + } +} - private let continuation: AsyncStream.Continuation - let events: AsyncStream +public extension DependencyValues { + var mqttConnectionManager: MQTTConnectionManager { + get { self[MQTTConnectionManager.self] } + set { self[MQTTConnectionManager.self] = newValue } + } +} - init() { - let (stream, continuation) = AsyncStream.makeStream(of: MQTTConnectionService.Event.self) - self.events = stream - self.continuation = continuation - } +// MARK: - MQTTConnectionService - deinit { - stop() - } +public actor MQTTConnectionService: Service { + @Dependency(\.mqttConnectionManager) var manager - func start(isActive connectionIsActive: @escaping () -> Bool) async { - try? await Task.sleep(for: .seconds(1)) - let event: MQTTConnectionService.Event = connectionIsActive() - ? .connected - : .disconnected + private let cleanSession: Bool + private nonisolated let logger: Logger? - continuation.yield(event) - } - - nonisolated func stop() { - continuation.yield(.shuttingDown) - continuation.finish() - } + public init( + cleanSession: Bool = false, + logger: Logger? = nil + ) { + self.cleanSession = cleanSession + self.logger = logger } + /// The entry-point of the service which starts the connection + /// to the MQTT broker and handles graceful shutdown of the + /// connection. + public func run() async throws { + try await withGracefulShutdownHandler { + let stream = try await manager.connect(cleanSession) + for await event in stream.cancelOnGracefulShutdown() { + // We don't really need to do anything with the events, so just logging + // for now. But we need to iterate on an async stream for the service to + // continue to run and handle graceful shutdowns. + logger?.trace("Received connection event: \(event)") + } + } onGracefulShutdown: { + self.logger?.trace("Received graceful shutdown.") + Task { await self.manager.shutdown() } + } + } } diff --git a/Sources/dewPoint-controller/Application.swift b/Sources/dewPoint-controller/Application.swift index 1c6af48..8e6662f 100644 --- a/Sources/dewPoint-controller/Application.swift +++ b/Sources/dewPoint-controller/Application.swift @@ -2,7 +2,7 @@ import Dependencies import Foundation import Logging import Models -import MQTTConnectionService +import MQTTConnectionManagerLive import MQTTNIO import NIO import PsychrometricClientLive @@ -33,11 +33,12 @@ struct Application { logger: logger ) - let mqttConnection = MQTTConnectionService(client: mqtt) try await withDependencies { - $0.psychrometricClient = PsychrometricClient.liveValue + $0.psychrometricClient = .liveValue $0.sensorsClient = .live(client: mqtt) + $0.mqttConnectionManager = .live(client: mqtt, logger: logger) } operation: { + let mqttConnection = MQTTConnectionService(cleanSession: false, logger: logger) let sensors = SensorsService(sensors: .live) var serviceGroupConfiguration = ServiceGroupConfiguration(