From 90c5b7c77f06d8027402e3dcbaf427ee07c0c482 Mon Sep 17 00:00:00 2001 From: Michael Housh Date: Sat, 9 Nov 2024 09:03:31 -0500 Subject: [PATCH] feat: Adds MQTTConnectionService --- Package.swift | 41 ++++++++++---- .../MQTTConnectionService.swift | 56 +++++++++++++++++++ .../Models/TemperatureAndHumiditySensor.swift | 11 +--- Sources/SensorsService/SensorsService.swift | 5 +- 4 files changed, 92 insertions(+), 21 deletions(-) create mode 100644 Sources/MQTTConnectionService/MQTTConnectionService.swift diff --git a/Package.swift b/Package.swift index a18f0e8..c3ce95e 100755 --- a/Package.swift +++ b/Package.swift @@ -2,6 +2,10 @@ import PackageDescription +let swiftSettings: [SwiftSetting] = [ + .enableExperimentalFeature("StrictConcurrency") +] + let package = Package( name: "dewPoint-controller", platforms: [ @@ -47,7 +51,8 @@ let package = Package( "Models", .product(name: "MQTTNIO", package: "mqtt-nio"), .product(name: "NIO", package: "swift-nio") - ] + ], + swiftSettings: swiftSettings ), .target( name: "DewPointEnvironment", @@ -56,17 +61,20 @@ let package = Package( "Client", "Models", .product(name: "MQTTNIO", package: "mqtt-nio") - ] + ], + swiftSettings: swiftSettings ), .target( name: "EnvVars", - dependencies: [] + dependencies: [], + swiftSettings: swiftSettings ), .target( name: "Models", dependencies: [ .product(name: "Psychrometrics", package: "swift-psychrometrics") - ] + ], + swiftSettings: swiftSettings ), .target( name: "Client", @@ -75,7 +83,8 @@ let package = Package( .product(name: "CoreUnitTypes", package: "swift-psychrometrics"), .product(name: "NIO", package: "swift-nio"), .product(name: "Psychrometrics", package: "swift-psychrometrics") - ] + ], + swiftSettings: swiftSettings ), .target( name: "ClientLive", @@ -84,7 +93,8 @@ let package = Package( "EnvVars", .product(name: "MQTTNIO", package: "mqtt-nio"), .product(name: "ServiceLifecycle", package: "swift-service-lifecycle") - ] + ], + swiftSettings: swiftSettings ), .testTarget( name: "ClientTests", @@ -94,10 +104,13 @@ let package = Package( ] ), .target( - name: "TopicsLive", + name: "MQTTConnectionService", dependencies: [ - "Models" - ] + "EnvVars", + .product(name: "MQTTNIO", package: "mqtt-nio"), + .product(name: "ServiceLifecycle", package: "swift-service-lifecycle") + ], + swiftSettings: swiftSettings ), .target( name: "SensorsService", @@ -105,7 +118,8 @@ let package = Package( "Models", .product(name: "MQTTNIO", package: "mqtt-nio"), .product(name: "ServiceLifecycle", package: "swift-service-lifecycle") - ] + ], + swiftSettings: swiftSettings ), .testTarget( name: "SensorsServiceTests", @@ -114,6 +128,13 @@ let package = Package( // TODO: Remove. "ClientLive" ] + ), + .target( + name: "TopicsLive", + dependencies: [ + "Models" + ], + swiftSettings: swiftSettings ) ] ) diff --git a/Sources/MQTTConnectionService/MQTTConnectionService.swift b/Sources/MQTTConnectionService/MQTTConnectionService.swift new file mode 100644 index 0000000..37cc1b5 --- /dev/null +++ b/Sources/MQTTConnectionService/MQTTConnectionService.swift @@ -0,0 +1,56 @@ +import EnvVars +import Logging +import MQTTNIO +import NIO +import ServiceLifecycle + +/// Manages the MQTT broker connection. +public actor MQTTConnectionService: Service { + private let cleanSession: Bool + public let client: MQTTClient + private var shuttingDown = false + var logger: Logger { client.logger } + + public init( + cleanSession: Bool = true, + client: MQTTClient + ) { + self.cleanSession = cleanSession + self.client = client + } + + /// The entry-point of the service. + /// + /// This method connects to the MQTT broker and manages the connection. + /// It will attempt to gracefully shutdown the connection upon receiving + /// `sigterm` signals. + public func run() async throws { + await withGracefulShutdownHandler { + await self.connect() + } onGracefulShutdown: { + Task { await self.shutdown() } + } + } + + private func shutdown() async { + shuttingDown = true + try? await client.disconnect() + try? await client.shutdown() + } + + private func connect() async { + do { + try await client.connect(cleanSession: cleanSession) + client.addCloseListener(named: "SensorsClient") { [self] _ in + Task { + self.logger.debug("Connection closed.") + self.logger.debug("Reconnecting...") + await self.connect() + } + } + logger.debug("Connection successful.") + } catch { + logger.trace("Connection Failed.\n\(error)") + } + } +} diff --git a/Sources/Models/TemperatureAndHumiditySensor.swift b/Sources/Models/TemperatureAndHumiditySensor.swift index 1fde3f7..f1675b2 100644 --- a/Sources/Models/TemperatureAndHumiditySensor.swift +++ b/Sources/Models/TemperatureAndHumiditySensor.swift @@ -3,6 +3,7 @@ /// Represents a temperature and humidity sensor that can be used to derive /// the dew-point temperature and enthalpy values. /// +/// > Note: Temperature values are received in `celsius`. public struct TemperatureAndHumiditySensor: Equatable, Hashable, Identifiable, @unchecked Sendable { /// The identifier of the sensor, same as the location. @@ -25,9 +26,6 @@ public struct TemperatureAndHumiditySensor: Equatable, Hashable, Identifiable, @ /// The topics to listen for updated sensor values. public let topics: Topics - /// The psychrometric units of the sensor. - public let units: PsychrometricEnvironment.Units - /// Create a new temperature and humidity sensor. /// /// - Parameters: @@ -36,21 +34,18 @@ public struct TemperatureAndHumiditySensor: Equatable, Hashable, Identifiable, @ /// - temperature: The current temperature value of the sensor. /// - humidity: The current relative humidity value of the sensor. /// - needsProcessed: If the sensor needs to be processed. - /// - units: The unit of measure for the sensor. public init( location: Location, altitude: Length = .feet(800.0), temperature: Temperature? = nil, humidity: RelativeHumidity? = nil, needsProcessed: Bool = false, - units: PsychrometricEnvironment.Units = .imperial, topics: Topics? = nil ) { self.altitude = altitude self.location = location self._temperature = TrackedChanges(wrappedValue: temperature, needsProcessed: needsProcessed) self._humidity = TrackedChanges(wrappedValue: humidity, needsProcessed: needsProcessed) - self.units = units self.topics = topics ?? .init(location: location) } @@ -61,7 +56,7 @@ public struct TemperatureAndHumiditySensor: Equatable, Hashable, Identifiable, @ !temperature.rawValue.isNaN, !humidity.rawValue.isNaN else { return nil } - return .init(dryBulb: temperature, humidity: humidity, units: units) + return .init(dryBulb: temperature, humidity: humidity) } /// The calculated enthalpy of the sensor. @@ -71,7 +66,7 @@ public struct TemperatureAndHumiditySensor: Equatable, Hashable, Identifiable, @ !temperature.rawValue.isNaN, !humidity.rawValue.isNaN else { return nil } - return .init(dryBulb: temperature, humidity: humidity, altitude: altitude, units: units) + return .init(dryBulb: temperature, humidity: humidity, altitude: altitude) } /// Check whether any of the sensor values have changed and need processed. diff --git a/Sources/SensorsService/SensorsService.swift b/Sources/SensorsService/SensorsService.swift index 7aecbfc..e133170 100644 --- a/Sources/SensorsService/SensorsService.swift +++ b/Sources/SensorsService/SensorsService.swift @@ -1,6 +1,6 @@ import Foundation import Logging -@preconcurrency import Models +import Models @preconcurrency import MQTTNIO import NIO import Psychrometrics @@ -102,13 +102,12 @@ public actor SensorsService: Service { // MARK: - Errors struct DecodingError: Error {} +struct MQTTClientNotConnected: Error {} struct NotFoundError: Error {} struct SensorExists: Error {} // MARK: - Helpers -struct MQTTClientNotConnected: Error {} - private extension TemperatureAndHumiditySensor.Topics { func contains(_ topic: String) -> Bool { temperature == topic || humidity == topic