From c84427a9b32307f2d2c7f1aa2c11adbb921544aa Mon Sep 17 00:00:00 2001 From: Michael Housh Date: Fri, 15 Nov 2024 17:15:01 -0500 Subject: [PATCH] feat: Renaming and moves some items around, listeners now manage reconnection events. --- Package.swift | 26 +-- Sources/DewPointController/Application.swift | 7 +- Sources/MQTTConnectionManager/Live.swift | 221 ------------------ .../MQTTConnectionService.swift | 14 +- Sources/MQTTManager/Interface.swift | 197 ++++++++++++++++ .../Internal/ConnectionManager.swift | 98 ++++++++ .../Internal/ConnectionStream.swift | 74 ++++++ .../Internal/TopicListenerStream.swift | 177 ++++++++++++++ Sources/SensorsService/Helpers.swift | 42 ---- Sources/SensorsService/SensorsService.swift | 106 ++++----- Sources/TopicDependencies/TopicListener.swift | 186 --------------- .../TopicDependencies/TopicPublisher.swift | 117 ---------- .../MQTTConnectionServiceTests.swift | 9 +- .../SensorsClientTests.swift | 100 +++----- 14 files changed, 649 insertions(+), 725 deletions(-) delete mode 100644 Sources/MQTTConnectionManager/Live.swift create mode 100644 Sources/MQTTManager/Interface.swift create mode 100644 Sources/MQTTManager/Internal/ConnectionManager.swift create mode 100644 Sources/MQTTManager/Internal/ConnectionStream.swift create mode 100644 Sources/MQTTManager/Internal/TopicListenerStream.swift delete mode 100755 Sources/SensorsService/Helpers.swift delete mode 100644 Sources/TopicDependencies/TopicListener.swift delete mode 100644 Sources/TopicDependencies/TopicPublisher.swift diff --git a/Package.swift b/Package.swift index 5a71392..92bb0a8 100755 --- a/Package.swift +++ b/Package.swift @@ -15,10 +15,9 @@ let package = Package( products: [ .executable(name: "dewpoint-controller", targets: ["DewPointController"]), .library(name: "Models", targets: ["Models"]), - .library(name: "MQTTConnectionManager", targets: ["MQTTConnectionManager"]), + .library(name: "MQTTManager", targets: ["MQTTManager"]), .library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]), - .library(name: "SensorsService", targets: ["SensorsService"]), - .library(name: "TopicDependencies", targets: ["TopicDependencies"]) + .library(name: "SensorsService", targets: ["SensorsService"]) ], dependencies: [ .package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"), @@ -34,10 +33,9 @@ let package = Package( name: "DewPointController", dependencies: [ "Models", - "MQTTConnectionManager", + "MQTTManager", "MQTTConnectionService", "SensorsService", - "TopicDependencies", .product(name: "MQTTNIO", package: "mqtt-nio"), .product(name: "NIO", package: "swift-nio"), .product(name: "PsychrometricClientLive", package: "swift-psychrometrics") @@ -52,7 +50,7 @@ let package = Package( swiftSettings: swiftSettings ), .target( - name: "MQTTConnectionManager", + name: "MQTTManager", dependencies: [ .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), .product(name: "Dependencies", package: "swift-dependencies"), @@ -65,7 +63,7 @@ let package = Package( name: "MQTTConnectionService", dependencies: [ "Models", - "MQTTConnectionManager", + "MQTTManager", .product(name: "ServiceLifecycle", package: "swift-service-lifecycle") ], swiftSettings: swiftSettings @@ -74,7 +72,7 @@ let package = Package( name: "MQTTConnectionServiceTests", dependencies: [ "MQTTConnectionService", - "MQTTConnectionManager", + "MQTTManager", .product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle") ] ), @@ -82,8 +80,7 @@ let package = Package( name: "SensorsService", dependencies: [ "Models", - "MQTTConnectionManager", - "TopicDependencies", + "MQTTManager", .product(name: "Dependencies", package: "swift-dependencies"), .product(name: "DependenciesMacros", package: "swift-dependencies"), .product(name: "MQTTNIO", package: "mqtt-nio"), @@ -97,15 +94,6 @@ let package = Package( "SensorsService", .product(name: "PsychrometricClientLive", package: "swift-psychrometrics") ] - ), - .target( - name: "TopicDependencies", - dependencies: [ - .product(name: "Dependencies", package: "swift-dependencies"), - .product(name: "DependenciesMacros", package: "swift-dependencies"), - .product(name: "MQTTNIO", package: "mqtt-nio") - ], - swiftSettings: swiftSettings ) ] ) diff --git a/Sources/DewPointController/Application.swift b/Sources/DewPointController/Application.swift index a1853fb..a6cc71a 100644 --- a/Sources/DewPointController/Application.swift +++ b/Sources/DewPointController/Application.swift @@ -2,14 +2,13 @@ import Dependencies import Foundation import Logging import Models -import MQTTConnectionManager import MQTTConnectionService +import MQTTManager import MQTTNIO import NIO import PsychrometricClientLive import SensorsService import ServiceLifecycle -import TopicDependencies @main struct Application { @@ -38,9 +37,7 @@ struct Application { do { try await withDependencies { $0.psychrometricClient = .liveValue - $0.topicListener = .live(client: mqtt) - $0.topicPublisher = .live(client: mqtt) - $0.mqttConnectionManager = .live(client: mqtt, logger: logger) + $0.mqtt = .live(client: mqtt, logger: logger) } operation: { let mqttConnection = MQTTConnectionService(logger: logger) let sensors = SensorsService(sensors: .live, logger: logger) diff --git a/Sources/MQTTConnectionManager/Live.swift b/Sources/MQTTConnectionManager/Live.swift deleted file mode 100644 index 47e1fa2..0000000 --- a/Sources/MQTTConnectionManager/Live.swift +++ /dev/null @@ -1,221 +0,0 @@ -import AsyncAlgorithms -import Dependencies -import DependenciesMacros -import Foundation -import Logging -import MQTTNIO -import NIO - -public extension DependencyValues { - - /// A dependency that is responsible for managing the connection to - /// an MQTT broker. - var mqttConnectionManager: MQTTConnectionManager { - get { self[MQTTConnectionManager.self] } - set { self[MQTTConnectionManager.self] = newValue } - } -} - -/// Represents the interface needed for the ``MQTTConnectionService``. -/// -/// See ``MQTTConnectionManagerLive`` module for live implementation. -@DependencyClient -public struct MQTTConnectionManager: Sendable { - - /// Connect to the MQTT broker. - public var connect: @Sendable () async throws -> Void - - /// Shutdown the connection to the MQTT broker. - /// - /// - Note: You should cancel any tasks that are listening to the connection stream first. - public var shutdown: @Sendable () -> Void - - /// Create a stream of connection events. - public var stream: @Sendable () throws -> AsyncStream - - /// Perform an operation with the underlying MQTTClient, this can be useful in - /// tests, so this module needs imported with `@_spi(Testing) import` to use this method. - private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void - - @_spi(Internal) - public func withClient( - _ callback: @Sendable (MQTTClient) async throws -> Void - ) async throws { - try await _withClient(callback) - } - - /// Represents connection events that clients can listen for and - /// react accordingly. - public enum Event: Sendable { - case connected - case disconnected - case shuttingDown - } - - public static func live( - client: MQTTClient, - cleanSession: Bool = false, - logger: Logger? = nil, - alwaysReconnect: Bool = true - ) -> Self { - let manager = ConnectionManager( - client: client, - logger: logger, - alwaysReconnect: alwaysReconnect - ) - return .init { - try await manager.connect(cleanSession: cleanSession) - } shutdown: { - manager.shutdown() - } stream: { - MQTTConnectionStream(client: client, logger: logger) - .start() - .removeDuplicates() - .eraseToStream() - } _withClient: { callback in - try await callback(client) - } - } -} - -extension MQTTConnectionManager: TestDependencyKey { - public static var testValue: MQTTConnectionManager { - Self() - } -} - -// MARK: - Helpers - -@_spi(Internal) -public final actor MQTTConnectionStream: Sendable { - - public typealias Element = MQTTConnectionManager.Event - - private let client: MQTTClient - private let continuation: AsyncStream.Continuation - private let logger: Logger? - private let name: String - private let stream: AsyncStream - private var isShuttingDown = false - - public init(client: MQTTClient, logger: Logger?) { - let (stream, continuation) = AsyncStream.makeStream() - self.client = client - self.continuation = continuation - self.logger = logger - self.name = UUID().uuidString - self.stream = stream - } - - deinit { stop() } - - public nonisolated func start() -> AsyncStream { - // Check if the client is active and yield the initial result. - continuation.yield(client.isActive() ? .connected : .disconnected) - - // Continually check if the client is active. - let task = Task { - let isShuttingDown = await self.isShuttingDown - while !Task.isCancelled, !isShuttingDown { - try await Task.sleep(for: .milliseconds(100)) - continuation.yield(client.isActive() ? .connected : .disconnected) - } - } - - // Register listener on the client for when the connection - // closes. - client.addCloseListener(named: name) { _ in - self.logger?.trace("Client has disconnected.") - self.continuation.yield(.disconnected) - } - - // Register listener on the client for when the client - // is shutdown. - client.addShutdownListener(named: name) { _ in - self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)") - self.continuation.yield(.shuttingDown) - Task { await self.setIsShuttingDown() } - task.cancel() - self.stop() - } - - return stream - } - - private func setIsShuttingDown() { - isShuttingDown = true - } - - public nonisolated func stop() { - client.removeCloseListener(named: name) - client.removeShutdownListener(named: name) - continuation.finish() - } - -} - -actor ConnectionManager { - private let client: MQTTClient - private let logger: Logger? - private let name: String - private let shouldReconnect: Bool - private var hasConnected: Bool = false - - init( - client: MQTTClient, - logger: Logger?, - alwaysReconnect: Bool - ) { - self.client = client - self.logger = logger - self.name = UUID().uuidString - self.shouldReconnect = alwaysReconnect - } - - deinit { - // We should've already logged that we're shutting down if - // the manager was shutdown properly, so don't log it twice. - self.shutdown(withLogging: false) - } - - private func setHasConnected() { - hasConnected = true - } - - func connect( - cleanSession: Bool - ) async throws { - guard !hasConnected else { return } - do { - try await client.connect(cleanSession: cleanSession) - setHasConnected() - - client.addCloseListener(named: name) { [weak self] _ in - guard let `self` else { return } - self.logger?.debug("Connection closed.") - if self.shouldReconnect { - self.logger?.debug("Reconnecting...") - Task { - try await self.connect(cleanSession: cleanSession) - } - } - } - - client.addShutdownListener(named: name) { _ in - self.shutdown() - } - - } catch { - logger?.trace("Failed to connect: \(error)") - throw error - } - } - - nonisolated func shutdown(withLogging: Bool = true) { - if withLogging { - logger?.trace("Shutting down connection.") - } - client.removeCloseListener(named: name) - client.removeShutdownListener(named: name) - } -} diff --git a/Sources/MQTTConnectionService/MQTTConnectionService.swift b/Sources/MQTTConnectionService/MQTTConnectionService.swift index 9bbc620..438316c 100644 --- a/Sources/MQTTConnectionService/MQTTConnectionService.swift +++ b/Sources/MQTTConnectionService/MQTTConnectionService.swift @@ -1,13 +1,13 @@ import Dependencies import Logging import Models -import MQTTConnectionManager +import MQTTManager import ServiceLifecycle -public actor MQTTConnectionService: Service { - @Dependency(\.mqttConnectionManager) var manager +public struct MQTTConnectionService: Service { + @Dependency(\.mqtt) var mqtt - private nonisolated let logger: Logger? + private let logger: Logger? public init( logger: Logger? = nil @@ -20,8 +20,8 @@ public actor MQTTConnectionService: Service { /// connection. public func run() async throws { try await withGracefulShutdownHandler { - try await manager.connect() - for await event in try manager.stream().cancelOnGracefulShutdown() { + try await mqtt.connect() + for await event in try mqtt.connectionStream().cancelOnGracefulShutdown() { // We don't really need to do anything with the events, so just logging // for now. But we need to iterate on an async stream for the service to // continue to run and handle graceful shutdowns. @@ -29,7 +29,7 @@ public actor MQTTConnectionService: Service { } // when we reach here we are shutting down, so we shutdown // the manager. - manager.shutdown() + mqtt.shutdown() } onGracefulShutdown: { self.logger?.trace("Received graceful shutdown.") } diff --git a/Sources/MQTTManager/Interface.swift b/Sources/MQTTManager/Interface.swift new file mode 100644 index 0000000..2307aa1 --- /dev/null +++ b/Sources/MQTTManager/Interface.swift @@ -0,0 +1,197 @@ +import AsyncAlgorithms +import Dependencies +import DependenciesMacros +import Foundation +import Logging +import MQTTNIO +import NIO + +public extension DependencyValues { + + /// A dependency that is responsible for managing the connection to + /// an MQTT broker. + var mqtt: MQTTManager { + get { self[MQTTManager.self] } + set { self[MQTTManager.self] = newValue } + } +} + +/// Represents the interface needed for the ``MQTTConnectionService``. +/// +/// See ``MQTTConnectionManagerLive`` module for live implementation. +@DependencyClient +public struct MQTTManager: Sendable { + + public typealias ListenStream = AsyncStream + + /// Connect to the MQTT broker. + public var connect: @Sendable () async throws -> Void + + /// Create a stream of connection events. + public var connectionStream: @Sendable () throws -> AsyncStream + + private var _listen: @Sendable ([String], MQTTQoS) async throws -> ListenStream + + /// Publish a value to the MQTT broker for a given topic. + public var publish: @Sendable (PublishRequest) async throws -> Void + + /// Shutdown the connection to the MQTT broker. + public var shutdown: @Sendable () -> Void + + /// Perform an operation with the underlying MQTTClient, this can be useful in + /// tests, so this module needs imported with `@_spi(Testing) import` to use this method. + private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void + + /// Create an async stream that listens for changes to the given topics. + /// + /// - Parameters: + /// - topics: The topics to listen for changes to. + /// - qos: The MQTTQoS for the subscription. + public func listen( + to topics: [String], + qos: MQTTQoS = .atLeastOnce + ) async throws -> ListenStream { + try await _listen(topics, qos) + } + + /// Create an async stream that listens for changes to the given topics. + /// + /// - Parameters: + /// - topics: The topics to listen for changes to. + /// - qos: The MQTTQoS for the subscription. + public func listen( + _ topics: String..., + qos: MQTTQoS = .atLeastOnce + ) async throws -> ListenStream { + try await listen(to: topics, qos: qos) + } + + /// Publish a new value to the given topic. + /// + /// - Parameters: + /// - payload: The value to publish. + /// - topicName: The topic to publish the new value to. + /// - qos: The MQTTQoS. + /// - retain: The retain flag. + public func publish( + _ payload: ByteBuffer, + to topicName: String, + qos: MQTTQoS, + retain: Bool = false + ) async throws { + try await publish(.init( + topicName: topicName, + payload: payload, + qos: qos, + retain: retain + )) + } + + /// Perform an operation with the underlying MQTTClient, this can be useful in + /// tests, so this module needs imported with `@_spi(Testing) import` to use this method. + @_spi(Internal) + public func withClient( + _ callback: @Sendable (MQTTClient) async throws -> Void + ) async throws { + try await _withClient(callback) + } + + /// Represents connection events that clients can listen for and + /// react accordingly. + public enum Event: Sendable { + case connected + case disconnected + case shuttingDown + } + + /// Represents the parameters required to publish a new value to the + /// MQTT broker. + public struct PublishRequest: Equatable, Sendable { + + /// The topic to publish the new value to. + public let topicName: String + + /// The value to publish. + public let payload: ByteBuffer + + /// The qos of the request. + public let qos: MQTTQoS + + /// The retain flag for the request. + public let retain: Bool + + /// Create a new publish request. + /// + /// - Parameters: + /// - topicName: The topic to publish to. + /// - payload: The value to publish. + /// - qos: The qos of the request. + /// - retain: The retain flag of the request. + public init( + topicName: String, + payload: ByteBuffer, + qos: MQTTQoS, + retain: Bool + ) { + self.topicName = topicName + self.payload = payload + self.qos = qos + self.retain = retain + } + } + +} + +public extension MQTTManager { + /// Create the live manager. + /// + static func live( + client: MQTTClient, + cleanSession: Bool = false, + logger: Logger? = nil, + alwaysReconnect: Bool = true + ) -> Self { + let manager = ConnectionManager( + client: client, + logger: logger, + alwaysReconnect: alwaysReconnect + ) + return .init( + connect: { try await manager.connect(cleanSession: cleanSession) }, + connectionStream: { + MQTTConnectionStream(client: client, logger: logger) + .start() + .removeDuplicates() + .eraseToStream() + }, + _listen: { topics, qos in + try await manager.listen(to: topics, qos: qos) + }, + publish: { request in + let topic = request.topicName + guard client.isActive() else { + logger?.debug("Client is not active, unable to publish to topic: \(topic)") + return + } + logger?.trace("Begin publishing to topic: \(topic)") + defer { logger?.trace("Done publishing to topic: \(topic)") } + try await client.publish( + to: request.topicName, + payload: request.payload, + qos: request.qos, + retain: request.retain + ) + }, + shutdown: { + manager.shutdown() + }, + _withClient: { callback in + try await callback(client) + } + ) + } +} + +extension MQTTManager: TestDependencyKey { + public static let testValue: MQTTManager = Self() +} diff --git a/Sources/MQTTManager/Internal/ConnectionManager.swift b/Sources/MQTTManager/Internal/ConnectionManager.swift new file mode 100644 index 0000000..5b40817 --- /dev/null +++ b/Sources/MQTTManager/Internal/ConnectionManager.swift @@ -0,0 +1,98 @@ +import Foundation +import Logging +import MQTTNIO + +actor ConnectionManager { + private let client: MQTTClient + private let logger: Logger? + private let name: String + private let shouldReconnect: Bool + private var hasConnected: Bool = false + private var listeners: [TopicListenerStream] = [] + private var isShuttingDown = false + + init( + client: MQTTClient, + logger: Logger?, + alwaysReconnect: Bool + ) { + var logger = logger + logger?[metadataKey: "instance"] = "\(Self.self)" + self.logger = logger + + self.client = client + self.name = UUID().uuidString + self.shouldReconnect = alwaysReconnect + } + + deinit { + if !isShuttingDown { + let message = """ + Did not properly close the connection manager. This can lead to + dangling references. + + Please call `shutdown` to properly close all connections and listener streams. + """ + logger?.warning("\(message)") + self.shutdown() + } + } + + private func setHasConnected() { + hasConnected = true + } + + func listen( + to topics: [String], + qos: MQTTQoS + ) async throws -> MQTTManager.ListenStream { + let listener = TopicListenerStream(client: client, logger: logger, topics: topics, qos: qos) + listeners.append(listener) + await listener.start() + return listener.stream + } + + func connect( + cleanSession: Bool + ) async throws { + guard !hasConnected else { return } + do { + try await client.connect(cleanSession: cleanSession) + setHasConnected() + + client.addCloseListener(named: name) { [weak self] _ in + guard let `self` else { return } + self.logger?.debug("Connection closed.") + if self.shouldReconnect { + self.logger?.debug("Reconnecting...") + Task { + try await self.connect(cleanSession: cleanSession) + } + } + } + + client.addShutdownListener(named: name) { _ in + self.shutdown() + } + + } catch { + logger?.trace("Failed to connect: \(error)") + throw error + } + } + + private func shutdownListeners() { + _ = listeners.map { $0.shutdown() } + listeners = [] + isShuttingDown = true + } + + nonisolated func shutdown(withLogging: Bool = true) { + if withLogging { + logger?.trace("Shutting down connection.") + } + client.removeCloseListener(named: name) + client.removeShutdownListener(named: name) + Task { await shutdownListeners() } + } +} diff --git a/Sources/MQTTManager/Internal/ConnectionStream.swift b/Sources/MQTTManager/Internal/ConnectionStream.swift new file mode 100644 index 0000000..ad97dc7 --- /dev/null +++ b/Sources/MQTTManager/Internal/ConnectionStream.swift @@ -0,0 +1,74 @@ +import Foundation +import Logging +import MQTTNIO + +@_spi(Internal) +public actor MQTTConnectionStream: Sendable { + + public typealias Element = MQTTManager.Event + + private let client: MQTTClient + private let continuation: AsyncStream.Continuation + private let logger: Logger? + nonisolated let name: String + private let stream: AsyncStream + private var isShuttingDown = false + + public init(client: MQTTClient, logger: Logger?) { + var logger = logger + logger?[metadataKey: "type"] = "\(Self.self)" + self.logger = logger + + let (stream, continuation) = AsyncStream.makeStream() + self.client = client + self.continuation = continuation + self.name = UUID().uuidString + self.stream = stream + } + + deinit { stop() } + + public nonisolated func start() -> AsyncStream { + // Check if the client is active and yield the initial result. + continuation.yield(client.isActive() ? .connected : .disconnected) + + // Continually check if the client is active. + let task = Task { + let isShuttingDown = await self.isShuttingDown + while !Task.isCancelled, !isShuttingDown { + try await Task.sleep(for: .milliseconds(100)) + continuation.yield(client.isActive() ? .connected : .disconnected) + } + } + + // Register listener on the client for when the connection + // closes. + client.addCloseListener(named: name) { _ in + self.logger?.trace("Client has disconnected.") + self.continuation.yield(.disconnected) + } + + // Register listener on the client for when the client + // is shutdown. + client.addShutdownListener(named: name) { _ in + self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)") + self.continuation.yield(.shuttingDown) + Task { await self.setIsShuttingDown() } + task.cancel() + self.stop() + } + + return stream + } + + private func setIsShuttingDown() { + isShuttingDown = true + } + + public nonisolated func stop() { + client.removeCloseListener(named: name) + client.removeShutdownListener(named: name) + continuation.finish() + } + +} diff --git a/Sources/MQTTManager/Internal/TopicListenerStream.swift b/Sources/MQTTManager/Internal/TopicListenerStream.swift new file mode 100644 index 0000000..4db44c7 --- /dev/null +++ b/Sources/MQTTManager/Internal/TopicListenerStream.swift @@ -0,0 +1,177 @@ +import Foundation +import Logging +import MQTTNIO + +actor TopicListenerStream { + + typealias Stream = MQTTManager.ListenStream + + private let client: MQTTClient + private let configuration: Configuration + private let continuation: Stream.Continuation + private let logger: Logger? + private let name: String + let stream: Stream + private var shuttingDown: Bool = false + private var onShutdownHandler: (@Sendable () -> Void)? + + init( + client: MQTTClient, + logger: Logger?, + topics: [String], + qos: MQTTQoS + ) { + // Setup the logger so we can more easily decipher log messages. + var logger = logger + logger?[metadataKey: "type"] = "\(Self.self)" + self.logger = logger + + let (stream, continuation) = Stream.makeStream() + self.client = client + self.configuration = .init(qos: qos, topics: topics) + self.continuation = continuation + self.name = UUID().uuidString + self.stream = stream + } + + struct Configuration: Sendable { + let qos: MQTTQoS + let topics: [String] + } + + deinit { + if !shuttingDown { + let message = """ + Shutdown was not called on topic listener. This could lead to potential errors or + the stream never ending. + + Please ensure that you call shutdown on the listener. + """ + client.logger.warning("\(message)") + continuation.finish() + } + client.removePublishListener(named: name) + client.removeShutdownListener(named: name) + } + + private func subscribe() async throws { + guard !shuttingDown else { return } + logger?.debug("Begin subscribing to topics.") + do { + _ = try await client.subscribe(to: configuration.topics.map { + MQTTSubscribeInfo(topicFilter: $0, qos: configuration.qos) + }) + } catch { + logger?.error("Received error while subscribing to topics: \(configuration.topics)") + throw TopicListenerError.failedToSubscribe + } + logger?.debug("Done subscribing to topics.") + } + + public func start() { + logger?.trace("Starting listener for topics: \(configuration.topics)") + + let stream = MQTTConnectionStream(client: client, logger: logger) + .start() + .removeDuplicates() + .eraseToStream() + + let task = Task { + // Listen for connection events to restablish the stream upon a + // client becoming disconnected / reconnected, and properly shutdown + // the stream on the client being shutdown. + for await event in stream { + logger?.trace("Received event: \(event)") + switch event { + case .shuttingDown: + shutdown() + case .disconnected: + try await Task.sleep(for: .milliseconds(100)) + case .connected: + try await subscribe() + client.addPublishListener(named: name) { result in + switch result { + case let .failure(error): + self.logger?.error("Received error while listening: \(error)") + case let .success(publishInfo): + if self.configuration.topics.contains(publishInfo.topicName) { + self.logger?.debug("Recieved new value for topic: \(publishInfo.topicName)") + self.continuation.yield(publishInfo) + } + } + } + } + } + } + + onShutdownHandler = { task.cancel() } + } + + // TODO: remove. + func listen( + _ topics: [String], + _ qos: MQTTQoS = .atLeastOnce + ) async throws -> Stream { + var sleepTimes = 0 + + while !client.isActive() { + guard sleepTimes < 10 else { + throw TopicListenerError.connectionTimeout + } + try? await Task.sleep(for: .milliseconds(100)) + sleepTimes += 1 + } + + client.logger.trace("Client is active, begin subscribing to topics.") + + try await subscribe() + + client.logger.trace("Done subscribing, begin listening to topics.") + + client.addPublishListener(named: name) { result in + switch result { + case let .failure(error): + self.logger?.error("Received error while listening: \(error)") + case let .success(publishInfo): + if topics.contains(publishInfo.topicName) { + self.logger?.debug("Recieved new value for topic: \(publishInfo.topicName)") + self.continuation.yield(publishInfo) + } + } + } + + return stream + } + + private func setIsShuttingDown() { + shuttingDown = true + onShutdownHandler = nil + } + + public nonisolated func shutdown() { + client.logger.trace("Closing topic listener...") + continuation.finish() + client.removePublishListener(named: name) + client.removeShutdownListener(named: name) + Task { + await onShutdownHandler?() + await self.setIsShuttingDown() + } + } + +} + +// MARK: - Errors + +public enum TopicListenerError: Error { + case connectionTimeout + case failedToSubscribe +} + +public struct MQTTListenResultError: Error { + let underlyingError: any Error + + init(_ underlyingError: any Error) { + self.underlyingError = underlyingError + } +} diff --git a/Sources/SensorsService/Helpers.swift b/Sources/SensorsService/Helpers.swift deleted file mode 100755 index 5c5940a..0000000 --- a/Sources/SensorsService/Helpers.swift +++ /dev/null @@ -1,42 +0,0 @@ -import NIO -import NIOFoundationCompat -import PsychrometricClient - -/// Represents a type that can be initialized by a ``ByteBuffer``. -protocol BufferInitalizable { - init?(buffer: inout ByteBuffer) -} - -extension Double: BufferInitalizable { - - /// Attempt to create / parse a double from a byte buffer. - init?(buffer: inout ByteBuffer) { - guard let string = buffer.readString( - length: buffer.readableBytes, - encoding: String.Encoding.utf8 - ) - else { return nil } - self.init(string) - } -} - -extension Tagged: BufferInitalizable where RawValue: BufferInitalizable { - init?(buffer: inout ByteBuffer) { - guard let value = RawValue(buffer: &buffer) else { return nil } - self.init(value) - } -} - -extension Humidity: BufferInitalizable { - init?(buffer: inout ByteBuffer) { - guard let value = Double(buffer: &buffer) else { return nil } - self.init(value) - } -} - -extension Temperature: BufferInitalizable { - init?(buffer: inout ByteBuffer) { - guard let value = Double(buffer: &buffer) else { return nil } - self.init(value) - } -} diff --git a/Sources/SensorsService/SensorsService.swift b/Sources/SensorsService/SensorsService.swift index e45fe00..460a96e 100644 --- a/Sources/SensorsService/SensorsService.swift +++ b/Sources/SensorsService/SensorsService.swift @@ -3,12 +3,11 @@ import DependenciesMacros import Foundation import Logging import Models -import MQTTConnectionManager +import MQTTManager import MQTTNIO import NIO import PsychrometricClient import ServiceLifecycle -import TopicDependencies /// Service that is responsible for listening to changes of the temperature and humidity /// sensors, then publishing back the calculated dew-point temperature and enthalpy for @@ -17,9 +16,7 @@ import TopicDependencies /// public actor SensorsService: Service { - @Dependency(\.mqttConnectionManager.stream) var connectionStream - @Dependency(\.topicListener) var topicListener - @Dependency(\.topicPublisher) var topicPublisher + @Dependency(\.mqtt) var mqtt /// The logger to use for the service. private let logger: Logger? @@ -27,12 +24,9 @@ public actor SensorsService: Service { /// The sensors that we are listening for updates to, so /// that we can calculate the dew-point temperature and enthalpy /// values to publish back to the MQTT broker. - var sensors: [TemperatureAndHumiditySensor] + private var sensors: [TemperatureAndHumiditySensor] - @_spi(Internal) - public var isListening: Bool = false - - var topics: [String] { + private var topics: [String] { sensors.reduce(into: [String]()) { array, sensor in array.append(sensor.topics.temperature) array.append(sensor.topics.humidity) @@ -60,33 +54,16 @@ public actor SensorsService: Service { public func run() async throws { precondition(sensors.count > 0, "Sensors should not be empty.") - try await withGracefulShutdownHandler { - // Listen for connection events, so that we can automatically - // reconnect any sensor topics we're listening to upon a disconnect / reconnect - // event. We can also shutdown any topic listeners upon a shutdown event. - for await event in try connectionStream().cancelOnGracefulShutdown() { - switch event { - case .shuttingDown: - logger?.debug("Received shutdown event.") - isListening = false - try await self.shutdown() - case .disconnected: - logger?.debug("Received disconnected event.") - isListening = false - try await Task.sleep(for: .milliseconds(100)) - case .connected: - logger?.debug("Received connected event.") - let stream = try await makeStream() - isListening = true - for await result in stream.cancelOnGracefulShutdown() { - logger?.debug("Received result for topic: \(result.topic)") - await self.handleResult(result) - } - } + let stream = try await makeStream() + + await withGracefulShutdownHandler { + for await result in stream.cancelOnGracefulShutdown() { + logger?.debug("Received result for topic: \(result.topic)") + await handleResult(result) } } onGracefulShutdown: { + self.logger?.debug("Received graceful shutdown.") Task { - self.logger?.debug("Received graceful shutdown.") try await self.shutdown() } } @@ -95,24 +72,13 @@ public actor SensorsService: Service { @_spi(Internal) public func shutdown() async throws { try await publishUpdates() - topicListener.shutdown() } private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> { - try await topicListener.listen(to: topics) - // ignore errors, so that we continue to listen, but log them - // for debugging purposes. - .compactMap { result in - switch result { - case let .failure(error): - self.logger?.debug("Received error listening for sensors: \(error)") - return nil - case let .success(info): - return (info.payload, info.topicName) - } - } - // ignore duplicate values, to prevent publishing dew-point and enthalpy - // changes to frequently. + // ignore duplicate values, to prevent publishing dew-point and enthalpy + // changes to frequently. + try await mqtt.listen(to: topics) + .map { ($0.payload, $0.topicName) } .removeDuplicates { lhs, rhs in lhs.buffer == rhs.buffer && lhs.topic == rhs.topic @@ -127,8 +93,7 @@ public actor SensorsService: Service { logger?.debug("Begin handling result for topic: \(topic)") func decode(_: V.Type) -> V? { - var buffer = result.buffer - return V(buffer: &buffer) + return V(buffer: result.buffer) } if topic.contains("temperature") { @@ -159,9 +124,9 @@ public actor SensorsService: Service { private func publish(_ double: Double?, to topic: String) async throws { guard let double else { return } - try await topicPublisher.publish( + try await mqtt.publish( + ByteBufferAllocator().buffer(string: "\(double)"), to: topic, - payload: ByteBufferAllocator().buffer(string: "\(double)"), qos: .exactlyOnce, retain: true ) @@ -210,3 +175,38 @@ private extension Array where Element == TemperatureAndHumiditySensor { self[index].needsProcessed = false } } + +/// Represents a type that can be initialized by a ``ByteBuffer``. +protocol BufferInitalizable { + init?(buffer: ByteBuffer) +} + +extension Double: BufferInitalizable { + + /// Attempt to create / parse a double from a byte buffer. + init?(buffer: ByteBuffer) { + let string = String(buffer: buffer) + self.init(string) + } +} + +extension Tagged: BufferInitalizable where RawValue: BufferInitalizable { + init?(buffer: ByteBuffer) { + guard let value = RawValue(buffer: buffer) else { return nil } + self.init(value) + } +} + +extension Humidity: BufferInitalizable { + init?(buffer: ByteBuffer) { + guard let value = Double(buffer: buffer) else { return nil } + self.init(value) + } +} + +extension Temperature: BufferInitalizable { + init?(buffer: ByteBuffer) { + guard let value = Double(buffer: buffer) else { return nil } + self.init(value) + } +} diff --git a/Sources/TopicDependencies/TopicListener.swift b/Sources/TopicDependencies/TopicListener.swift deleted file mode 100644 index 6a50059..0000000 --- a/Sources/TopicDependencies/TopicListener.swift +++ /dev/null @@ -1,186 +0,0 @@ -import Dependencies -import DependenciesMacros -import Foundation -import MQTTNIO - -/// A dependency that can generate an async stream of changes to the given topics. -/// -/// - Note: This type only conforms to ``TestDependencyKey`` because it requires an MQTTClient -/// to generate the live dependency. -@DependencyClient -public struct TopicListener: Sendable { - - public typealias Stream = AsyncStream> - - /// Create an async stream that listens for changes to the given topics. - private var _listen: @Sendable ([String], MQTTQoS) async throws -> Stream - - /// Shutdown the listener stream. - public var shutdown: @Sendable () -> Void - - /// Create a new topic listener. - /// - /// - Parameters: - /// - listen: Generate an async stream of changes for the given topics. - /// - shutdown: Shutdown the topic listener stream. - public init( - listen: @Sendable @escaping ([String], MQTTQoS) async throws -> Stream, - shutdown: @Sendable @escaping () -> Void - ) { - self._listen = listen - self.shutdown = shutdown - } - - /// Create an async stream that listens for changes to the given topics. - /// - /// - Parameters: - /// - topics: The topics to listen for changes to. - /// - qos: The MQTTQoS for the subscription. - public func listen( - to topics: [String], - qos: MQTTQoS = .atLeastOnce - ) async throws -> Stream { - try await _listen(topics, qos) - } - - /// Create an async stream that listens for changes to the given topics. - /// - /// - Parameters: - /// - topics: The topics to listen for changes to. - /// - qos: The MQTTQoS for the subscription. - public func listen( - _ topics: String..., - qos: MQTTQoS = .atLeastOnce - ) async throws -> Stream { - try await listen(to: topics, qos: qos) - } - - /// Create the live implementation of the topic listener with the given MQTTClient. - /// - /// - Parameters: - /// - client: The MQTTClient to use. - public static func live(client: MQTTClient) -> Self { - let listener = MQTTTopicListener(client: client) - return .init( - listen: { try await listener.listen($0, $1) }, - shutdown: { listener.shutdown() } - ) - } -} - -extension TopicListener: TestDependencyKey { - public static var testValue: TopicListener { Self() } -} - -public extension DependencyValues { - var topicListener: TopicListener { - get { self[TopicListener.self] } - set { self[TopicListener.self] = newValue } - } -} - -// MARK: - Helpers - -private actor MQTTTopicListener { - - private let client: MQTTClient - private let continuation: TopicListener.Stream.Continuation - private let name: String - let stream: TopicListener.Stream - private var shuttingDown: Bool = false - - init( - client: MQTTClient - ) { - let (stream, continuation) = TopicListener.Stream.makeStream() - self.client = client - self.continuation = continuation - self.name = UUID().uuidString - self.stream = stream - } - - deinit { - if !shuttingDown { - let message = """ - Shutdown was not called on topic listener. This could lead to potential errors or - the stream never ending. - - Please ensure that you call shutdown on the listener. - """ - client.logger.warning("\(message)") - continuation.finish() - } - client.removePublishListener(named: name) - client.removeShutdownListener(named: name) - } - - func listen( - _ topics: [String], - _ qos: MQTTQoS = .atLeastOnce - ) async throws -> TopicListener.Stream { - var sleepTimes = 0 - - while !client.isActive() { - guard sleepTimes < 10 else { - throw TopicListenerError.connectionTimeout - } - try? await Task.sleep(for: .milliseconds(100)) - sleepTimes += 1 - } - - client.logger.trace("Client is active, begin subscribing to topics.") - - let subscription = try? await client.subscribe(to: topics.map { - MQTTSubscribeInfo(topicFilter: $0, qos: qos) - }) - - guard subscription != nil else { - client.logger.error("Error subscribing to topics: \(topics)") - throw TopicListenerError.failedToSubscribe - } - - client.logger.trace("Done subscribing, begin listening to topics.") - - client.addPublishListener(named: name) { result in - switch result { - case let .failure(error): - self.client.logger.error("Received error while listening: \(error)") - self.continuation.yield(.failure(MQTTListenResultError(error))) - case let .success(publishInfo): - if topics.contains(publishInfo.topicName) { - self.client.logger.trace("Recieved new value for topic: \(publishInfo.topicName)") - self.continuation.yield(.success(publishInfo)) - } - } - } - - return stream - } - - private func setIsShuttingDown() { - shuttingDown = true - } - - nonisolated func shutdown() { - client.logger.trace("Closing topic listener...") - continuation.finish() - client.removePublishListener(named: name) - client.removeShutdownListener(named: name) - Task { await self.setIsShuttingDown() } - } -} - -// MARK: - Errors - -public enum TopicListenerError: Error { - case connectionTimeout - case failedToSubscribe -} - -public struct MQTTListenResultError: Error { - let underlyingError: any Error - - init(_ underlyingError: any Error) { - self.underlyingError = underlyingError - } -} diff --git a/Sources/TopicDependencies/TopicPublisher.swift b/Sources/TopicDependencies/TopicPublisher.swift deleted file mode 100644 index 77da633..0000000 --- a/Sources/TopicDependencies/TopicPublisher.swift +++ /dev/null @@ -1,117 +0,0 @@ -import Dependencies -import DependenciesMacros -import MQTTNIO -import NIO - -/// A dependency that is responsible for publishing values to an MQTT broker. -/// -/// - Note: This dependency only conforms to `TestDependencyKey` because it -/// requires an active `MQTTClient` to generate the live dependency. -@DependencyClient -public struct TopicPublisher: Sendable { - - private var _publish: @Sendable (PublishRequest) async throws -> Void - - /// Create a new topic publisher. - /// - /// - Parameters: - /// - publish: Handle the publish request. - public init( - publish: @Sendable @escaping (PublishRequest) async throws -> Void - ) { - self._publish = publish - } - - /// Publish a new value to the given topic. - /// - /// - Parameters: - /// - topicName: The topic to publish the new value to. - /// - payload: The value to publish. - /// - qos: The MQTTQoS. - /// - retain: The retain flag. - public func publish( - to topicName: String, - payload: ByteBuffer, - qos: MQTTQoS, - retain: Bool = false - ) async throws { - try await _publish(.init( - topicName: topicName, - payload: payload, - qos: qos, - retain: retain - )) - } - - /// Create the live topic publisher with the given `MQTTClient`. - /// - /// - Parameters: - /// - client: The mqtt broker client to use. - public static func live(client: MQTTClient) -> Self { - .init( - publish: { request in - guard client.isActive() else { - client.logger.trace("Client is not connected, unable to publish to \(request.topicName)") - return - } - client.logger.trace("Begin publishing to topic: \(request.topicName)") - defer { client.logger.trace("Done publishing to topic: \(request.topicName)") } - try await client.publish( - to: request.topicName, - payload: request.payload, - qos: request.qos, - retain: request.retain - ) - } - ) - } - - /// Represents the parameters required to publish a new value to the - /// MQTT broker. - public struct PublishRequest: Equatable, Sendable { - - /// The topic to publish the new value to. - public let topicName: String - - /// The value to publish. - public let payload: ByteBuffer - - /// The qos of the request. - public let qos: MQTTQoS - - /// The retain flag for the request. - public let retain: Bool - - /// Create a new publish request. - /// - /// - Parameters: - /// - topicName: The topic to publish to. - /// - payload: The value to publish. - /// - qos: The qos of the request. - /// - retain: The retain flag of the request. - public init( - topicName: String, - payload: ByteBuffer, - qos: MQTTQoS, - retain: Bool - ) { - self.topicName = topicName - self.payload = payload - self.qos = qos - self.retain = retain - } - } -} - -extension TopicPublisher: TestDependencyKey { - public static var testValue: TopicPublisher { Self() } -} - -public extension DependencyValues { - - /// A dependency that is responsible for publishing values to an MQTT broker. - var topicPublisher: TopicPublisher { - get { self[TopicPublisher.self] } - set { self[TopicPublisher.self] = newValue } - } -} diff --git a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift b/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift index 1a4125c..bcd9651 100644 --- a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift +++ b/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift @@ -1,8 +1,8 @@ import AsyncAlgorithms import Logging import Models -@_spi(Internal) import MQTTConnectionManager import MQTTConnectionService +@_spi(Internal) import MQTTManager import MQTTNIO import NIO import ServiceLifecycle @@ -22,15 +22,16 @@ final class MQTTConnectionServiceTests: XCTestCase { // TODO: Move to integration tests. func testMQTTConnectionStream() async throws { let client = createClient(identifier: "testNonManagedStream") - let manager = MQTTConnectionManager.live( + let manager = MQTTManager.live( client: client, logger: Self.logger, alwaysReconnect: false ) + defer { manager.shutdown() } let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger) let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger) - var events1 = [MQTTConnectionManager.Event]() - var events2 = [MQTTConnectionManager.Event]() + var events1 = [MQTTManager.Event]() + var events2 = [MQTTManager.Event]() let stream1 = connectionStream1.start() let stream2 = connectionStream2.start() diff --git a/Tests/SensorsServiceTests/SensorsClientTests.swift b/Tests/SensorsServiceTests/SensorsClientTests.swift index 0006e21..8f6ec36 100755 --- a/Tests/SensorsServiceTests/SensorsClientTests.swift +++ b/Tests/SensorsServiceTests/SensorsClientTests.swift @@ -1,12 +1,11 @@ import Dependencies import Logging import Models -@_spi(Internal) import MQTTConnectionManager +@_spi(Internal) import MQTTManager import MQTTNIO import NIO import PsychrometricClientLive @_spi(Internal) import SensorsService -import TopicDependencies import XCTest final class SensorsClientTests: XCTestCase { @@ -23,25 +22,24 @@ final class SensorsClientTests: XCTestCase { let client = createClient(identifier: "\(Self.self)") withDependencies { - $0.mqttConnectionManager = .live(client: client, logger: Self.logger) + $0.mqtt = .live(client: client, logger: Self.logger) $0.psychrometricClient = PsychrometricClient.liveValue - $0.topicListener = .live(client: client) - $0.topicPublisher = .live(client: client) } operation: { super.invokeTest() } } func testListeningResumesAfterDisconnectThenReconnect() async throws { - @Dependency(\.mqttConnectionManager) var manager struct TimeoutError: Error {} let sensor = TemperatureAndHumiditySensor(location: .return) - var results = [TopicPublisher.PublishRequest]() + let results = ResultContainer() try await withDependencies { - $0.topicPublisher = .capturing { results.append($0) } + $0.mqtt.publish = results.append } operation: { + @Dependency(\.mqtt) var manager + let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger) let task = Task { try await sensorsService.run() } defer { task.cancel() } @@ -58,9 +56,7 @@ final class SensorsClientTests: XCTestCase { } // Give time to re-subscribe. - while !(await sensorsService.isListening) { - try await Task.sleep(for: .milliseconds(100)) - } + try await Task.sleep(for: .milliseconds(200)) try await client.publish( to: sensor.topics.temperature, @@ -77,7 +73,7 @@ final class SensorsClientTests: XCTestCase { } var timeoutCount = 0 - while !(results.count == 2) { + while !(await results.count == 2) { guard timeoutCount < 20 else { throw TimeoutError() } @@ -85,6 +81,8 @@ final class SensorsClientTests: XCTestCase { timeoutCount += 1 } + let results = await results.results() + XCTAssertEqual(results.count, 2) XCTAssert(results.contains(where: { $0.topicName == sensor.topics.dewPoint })) XCTAssert(results.contains(where: { $0.topicName == sensor.topics.enthalpy })) @@ -122,63 +120,23 @@ final class SensorsClientTests: XCTestCase { // MARK: Helpers for tests. -class PublishInfoContainer { - private(set) var info: [MQTTPublishInfo] - private var topicFilters: [String]? - - init(topicFilters: [String]? = nil) { - self.info = [] - self.topicFilters = topicFilters - } - - func addPublishInfo(_ info: MQTTPublishInfo) async { - guard let topicFilters else { - self.info.append(info) - return - } - if topicFilters.contains(info.topicName) { - self.info.append(info) - } - } -} - -extension TopicPublisher { - static func capturing( - _ callback: @escaping (PublishRequest) -> Void - ) -> Self { - .init { callback($0) } - } -} - -// extension SensorsClient { -// -// static func testing( -// yielding: [(value: Double, to: String)], -// capturePublishedValues: @escaping (Double, String) -> Void, -// captureShutdownEvent: @escaping (Bool) -> Void -// ) -> Self { -// let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self) -// let logger = Logger(label: "\(Self.self).testing") -// -// return .init( -// listen: { topics in -// for (value, topic) in yielding where topics.contains(topic) { -// continuation.yield( -// (buffer: ByteBuffer(string: "\(value)"), topic: topic) -// ) -// } -// return stream -// }, -// logger: logger, -// publish: { value, topic in -// capturePublishedValues(value, topic) -// }, -// shutdown: { -// captureShutdownEvent(true) -// continuation.finish() -// } -// ) -// } -// } - struct TopicNotFoundError: Error {} + +actor ResultContainer: Sendable { + + private var storage = [MQTTManager.PublishRequest]() + + init() {} + + @Sendable func append(_ result: MQTTManager.PublishRequest) async { + storage.append(result) + } + + var count: Int { + get async { storage.count } + } + + func results() async -> [MQTTManager.PublishRequest] { + storage + } +}