diff --git a/Package.swift b/Package.swift index eba01c6..120730b 100755 --- a/Package.swift +++ b/Package.swift @@ -16,6 +16,7 @@ let package = Package( .executable(name: "dewPoint-controller", targets: ["dewPoint-controller"]), .library(name: "Models", targets: ["Models"]), .library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]), + .library(name: "SensorsClientLive", targets: ["SensorsClientLive"]), .library(name: "SensorsService", targets: ["SensorsService"]) ], dependencies: [ @@ -33,6 +34,7 @@ let package = Package( "Models", "MQTTConnectionService", "SensorsService", + "SensorsClientLive", .product(name: "MQTTNIO", package: "mqtt-nio"), .product(name: "NIO", package: "swift-nio"), .product(name: "PsychrometricClientLive", package: "swift-psychrometrics") @@ -66,6 +68,15 @@ let package = Package( .product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle") ] ), + .target( + name: "SensorsClientLive", + dependencies: [ + "SensorsService", + .product(name: "Dependencies", package: "swift-dependencies"), + .product(name: "MQTTNIO", package: "mqtt-nio") + ], + swiftSettings: swiftSettings + ), .target( name: "SensorsService", dependencies: [ diff --git a/Sources/MQTTConnectionService/MQTTConnectionService.swift b/Sources/MQTTConnectionService/MQTTConnectionService.swift index c499713..b7e123d 100644 --- a/Sources/MQTTConnectionService/MQTTConnectionService.swift +++ b/Sources/MQTTConnectionService/MQTTConnectionService.swift @@ -1,22 +1,18 @@ -@preconcurrency import Foundation +import Foundation import Logging import Models import MQTTNIO import NIO import ServiceLifecycle -// TODO: This may not need to be an actor. - /// Manages the MQTT broker connection. public actor MQTTConnectionService: Service { private let cleanSession: Bool - public let client: MQTTClient - private let continuation: AsyncStream.Continuation - public nonisolated let events: AsyncStream + private let client: MQTTClient private let internalEventStream: ConnectionStream nonisolated var logger: Logger { client.logger } - // private var shuttingDown = false + private let name: String public init( cleanSession: Bool = true, @@ -25,22 +21,19 @@ public actor MQTTConnectionService: Service { self.cleanSession = cleanSession self.client = client self.internalEventStream = .init() - let (stream, continuation) = AsyncStream.makeStream(of: Event.self) - self.events = stream - self.continuation = continuation + self.name = UUID().uuidString } deinit { self.logger.debug("MQTTConnectionService is gone.") self.internalEventStream.stop() - continuation.finish() } /// The entry-point of the service. /// /// This method connects to the MQTT broker and manages the connection. /// It will attempt to gracefully shutdown the connection upon receiving - /// `sigterm` signals. + /// a shutdown signals. public func run() async throws { await withGracefulShutdownHandler { await withDiscardingTaskGroup { group in @@ -50,11 +43,9 @@ public actor MQTTConnectionService: Service { } for await event in self.internalEventStream.events.cancelOnGracefulShutdown() { if event == .shuttingDown { - self.shutdown() break } self.logger.trace("Sending connection event: \(event)") - self.continuation.yield(event) } group.cancelAll() } @@ -66,33 +57,26 @@ public actor MQTTConnectionService: Service { func connect() async { do { - try await withThrowingDiscardingTaskGroup { group in - group.addTask { - try await self.client.connect(cleanSession: self.cleanSession) + try await client.connect(cleanSession: cleanSession) + client.addCloseListener(named: name) { _ in + Task { + self.logger.debug("Connection closed.") + self.logger.debug("Reconnecting...") + await self.connect() } - client.addCloseListener(named: "\(Self.self)") { _ in - Task { - self.logger.debug("Connection closed.") - self.logger.debug("Reconnecting...") - await self.connect() - } - } - self.logger.debug("Connection successful.") - self.continuation.yield(.connected) } + logger.debug("Connection successful.") } catch { logger.trace("Failed to connect: \(error)") - continuation.yield(.disconnected) } } private nonisolated func shutdown() { logger.debug("Begin shutting down MQTT broker connection.") - client.removeCloseListener(named: "\(Self.self)") + client.removeCloseListener(named: name) internalEventStream.stop() _ = client.disconnect() try? client.syncShutdownGracefully() - continuation.finish() logger.info("MQTT broker connection closed.") } @@ -106,11 +90,8 @@ extension MQTTConnectionService { case shuttingDown } - // TODO: This functionality can probably move into the connection service. + private actor ConnectionStream: Sendable { - private final class ConnectionStream: Sendable { - - // private var cancellable: AnyCancellable? private let continuation: AsyncStream.Continuation let events: AsyncStream @@ -131,18 +112,9 @@ extension MQTTConnectionService { : .disconnected continuation.yield(event) -// cancellable = Timer.publish(every: 1.0, on: .main, in: .common) -// .autoconnect() -// .sink { [weak self] (_: Date) in -// let event: MQTTConnectionService.Event = connectionIsActive() -// ? .connected -// : .disconnected -// -// self?.continuation.yield(event) -// } } - func stop() { + nonisolated func stop() { continuation.yield(.shuttingDown) continuation.finish() } diff --git a/Sources/Models/TemperatureAndHumiditySensor.swift b/Sources/Models/TemperatureAndHumiditySensor.swift index e97f777..48a0f38 100644 --- a/Sources/Models/TemperatureAndHumiditySensor.swift +++ b/Sources/Models/TemperatureAndHumiditySensor.swift @@ -129,10 +129,10 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable { prefix = "\(prefix.dropLast())" } self.init( - dewPoint: "\(prefix)/sensors/\(location.rawValue)_dew_point/state", - enthalpy: "\(prefix)/sensors/\(location.rawValue)_enthalpy/state", - humidity: "\(prefix)/sensors/\(location.rawValue)_humidity/state", - temperature: "\(prefix)/sensors/\(location.rawValue)_temperature/state" + dewPoint: "\(prefix)/sensor/\(location.rawValue)_dew_point/state", + enthalpy: "\(prefix)/sensor/\(location.rawValue)_enthalpy/state", + humidity: "\(prefix)/sensor/\(location.rawValue)_humidity/state", + temperature: "\(prefix)/sensor/\(location.rawValue)_temperature/state" ) } } diff --git a/Sources/SensorsClientLive/Live.swift b/Sources/SensorsClientLive/Live.swift new file mode 100644 index 0000000..c1cf051 --- /dev/null +++ b/Sources/SensorsClientLive/Live.swift @@ -0,0 +1,127 @@ +import Dependencies +import Foundation +import MQTTNIO +import NIO +@_exported import SensorsService + +public extension SensorsClient { + + /// Creates the live implementation of the sensor client. + static func live( + client: MQTTClient, + publishQoS: MQTTQoS = .exactlyOnce, + subscribeQoS: MQTTQoS = .atLeastOnce + ) -> Self { + let listener = SensorClientListener( + client: client, + publishQoS: publishQoS, + subscribeQoS: subscribeQoS + ) + + return .init( + listen: { try await listener.listen($0) }, + logger: client.logger, + publish: { try await listener.publish($0, $1) }, + shutdown: { listener.shutdown() } + ) + } +} + +struct ConnectionTimeoutError: Error {} + +private actor SensorClientListener { + + let client: MQTTClient + private let continuation: AsyncStream.Continuation + let name: String + let publishQoS: MQTTQoS + let stream: AsyncStream + let subscribeQoS: MQTTQoS + + init( + client: MQTTClient, + publishQoS: MQTTQoS, + subscribeQoS: MQTTQoS + ) { + let (stream, continuation) = AsyncStream.makeStream() + self.client = client + self.continuation = continuation + self.name = UUID().uuidString + self.publishQoS = publishQoS + self.stream = stream + self.subscribeQoS = subscribeQoS + } + + deinit { + client.logger.trace("Sensor listener is gone.") + self.client.removeCloseListener(named: name) + self.client.removePublishListener(named: name) + } + + func listen(_ topics: [String]) async throws -> AsyncStream { + client.logger.trace("Begin listen...") + // Ensure we are subscribed to the topics. + var sleepTimes = 0 + + while !client.isActive() { + guard sleepTimes < 10 else { + throw ConnectionTimeoutError() + } + try await Task.sleep(for: .milliseconds(100)) + sleepTimes += 1 + } + + client.logger.trace("Connection is active, begin listening for updates.") + client.logger.trace("Topics: \(topics)") + + _ = try await client.subscribe(to: topics.map { topic in + MQTTSubscribeInfo(topicFilter: topic, qos: subscribeQoS) + }) + + client.logger.trace("Done subscribing to topics.") + + client.addPublishListener(named: name) { result in + self.client.logger.trace("Received new result...") + switch result { + case let .failure(error): + self.client.logger.error("Received error while listening: \(error)") + case let .success(publishInfo): + // Only publish values back to caller if they are listening to a + // the topic. + if topics.contains(publishInfo.topicName) { + self.client.logger.trace("Recieved published info for: \(publishInfo.topicName)") + self.continuation.yield((buffer: publishInfo.payload, topic: publishInfo.topicName)) + } else { + self.client.logger.trace("Skipping topic: \(publishInfo.topicName)") + } + } + } + + client.addShutdownListener(named: name) { _ in + self.continuation.finish() + } + + return stream + } + + func publish(_ double: Double, _ topic: String) async throws { + // Ensure the client is active before publishing values. + guard client.isActive() else { return } + + // Round the double and publish. + let rounded = round(double * 100) / 100 + client.logger.trace("Begin publishing: \(rounded) to: \(topic)") + try await client.publish( + to: topic, + payload: ByteBufferAllocator().buffer(string: "\(rounded)"), + qos: publishQoS, + retain: true + ) + client.logger.trace("Begin publishing: \(rounded) to: \(topic)") + } + + nonisolated func shutdown() { + continuation.finish() + } + +} diff --git a/Sources/SensorsService/Helpers.swift b/Sources/SensorsService/Helpers.swift index f66311e..5c5940a 100755 --- a/Sources/SensorsService/Helpers.swift +++ b/Sources/SensorsService/Helpers.swift @@ -1,6 +1,3 @@ -import Logging -import Models -import MQTTNIO import NIO import NIOFoundationCompat import PsychrometricClient diff --git a/Sources/SensorsService/SensorsService.swift b/Sources/SensorsService/SensorsService.swift index 828cfd5..4518029 100644 --- a/Sources/SensorsService/SensorsService.swift +++ b/Sources/SensorsService/SensorsService.swift @@ -3,24 +3,40 @@ import DependenciesMacros import Foundation import Logging import Models -import MQTTConnectionService -@preconcurrency import MQTTNIO import NIO import PsychrometricClient import ServiceLifecycle +/// Represents the interface required for the sensor service to operate. +/// +/// This allows the dependency to be controlled for testing purposes and +/// not rely on an active MQTT broker connection. +/// +/// For the live implementation see ``SensorsClientLive`` module. +/// @DependencyClient public struct SensorsClient: Sendable { - public var listen: @Sendable ([String]) async throws -> AsyncStream + public typealias PublishInfo = (buffer: ByteBuffer, topic: String) + + /// Start listening for changes to sensor values on the MQTT broker. + public var listen: @Sendable ([String]) async throws -> AsyncStream + + /// A logger to use for the service. public var logger: Logger? + + /// Publish dew-point or enthalpy values back to the MQTT broker. public var publish: @Sendable (Double, String) async throws -> Void + + /// Shutdown the service. public var shutdown: @Sendable () -> Void = {} - public func listen(to topics: [String]) async throws -> AsyncStream { + /// Start listening for changes to sensor values on the MQTT broker. + public func listen(to topics: [String]) async throws -> AsyncStream { try await listen(topics) } + /// Publish dew-point or enthalpy values back to the MQTT broker. public func publish(_ value: Double, to topic: String) async throws { try await publish(value, topic) } @@ -39,42 +55,51 @@ public extension DependencyValues { } } -public actor SensorsService2: Service { +// MARK: - SensorsService + +/// Service that is responsible for listening to changes of the temperature and humidity +/// sensors, then publishing back the calculated dew-point temperature and enthalpy for +/// the sensor location. +/// +/// +public actor SensorsService: Service { @Dependency(\.sensorsClient) var client private var sensors: [TemperatureAndHumiditySensor] + /// Create a new sensors service that listens to the passed in + /// sensors. + /// + /// - Note: The service will fail to start if the array of sensors is not greater than 0. + /// + /// - Parameters: + /// - sensors: The sensors to listen for changes to. public init( sensors: [TemperatureAndHumiditySensor] ) { self.sensors = sensors } + /// Start the service with graceful shutdown, which will attempt to publish + /// any pending changes to the MQTT broker, upon a shutdown signal. public func run() async throws { - guard sensors.count > 0 else { - throw SensorCountError() - } + precondition(sensors.count > 0) let stream = try await client.listen(to: topics) - do { - try await withGracefulShutdownHandler { - try await withThrowingDiscardingTaskGroup { group in - for await result in stream.cancelOnGracefulShutdown() { - group.addTask { try await self.handleResult(result) } - } - } - } onGracefulShutdown: { - Task { - await self.client.logger?.trace("Received graceful shutdown.") - try? await self.publishUpdates() - await self.client.shutdown() + try await withGracefulShutdownHandler { + try await withThrowingDiscardingTaskGroup { group in + for await result in stream { + group.addTask { try await self.handleResult(result) } } } - } catch { - client.logger?.trace("Error: \(error)") - client.shutdown() + } onGracefulShutdown: { + Task { + await self.client.logger?.trace("Received graceful shutdown.") + try? await self.publishUpdates() + await self.client.shutdown() + } } } @@ -85,19 +110,20 @@ public actor SensorsService2: Service { } } - private func handleResult(_ result: MQTTPublishInfo) async throws { - let topic = result.topicName + private func handleResult(_ result: SensorsClient.PublishInfo) async throws { + let topic = result.topic + assert(topics.contains(topic)) client.logger?.trace("Begin handling result for topic: \(topic)") func decode(_: V.Type) -> V? { - var buffer = result.payload + var buffer = result.buffer return V(buffer: &buffer) } if topic.contains("temperature") { client.logger?.trace("Begin handling temperature result.") guard let temperature = decode(DryBulb.self) else { - client.logger?.trace("Failed to decode temperature: \(result.payload)") + client.logger?.trace("Failed to decode temperature: \(result.buffer)") throw DecodingError() } client.logger?.trace("Decoded temperature: \(temperature)") @@ -106,14 +132,11 @@ public actor SensorsService2: Service { } else if topic.contains("humidity") { client.logger?.trace("Begin handling humidity result.") guard let humidity = decode(RelativeHumidity.self) else { - client.logger?.trace("Failed to decode humidity: \(result.payload)") + client.logger?.trace("Failed to decode humidity: \(result.buffer)") throw DecodingError() } client.logger?.trace("Decoded humidity: \(humidity)") try sensors.update(topic: topic, keyPath: \.humidity, with: humidity) - } else { - client.logger?.error("Received unexpected topic, expected topic to contain 'temperature' or 'humidity'!") - return } try await publishUpdates() @@ -134,174 +157,10 @@ public actor SensorsService2: Service { } } -public actor SensorsService: Service { - private var sensors: [TemperatureAndHumiditySensor] - private let client: MQTTClient - private let events: @Sendable () -> AsyncStream - nonisolated var logger: Logger { client.logger } - private var shuttingDown: Bool = false - - public init( - client: MQTTClient, - events: @Sendable @escaping () -> AsyncStream, - sensors: [TemperatureAndHumiditySensor] - ) { - self.client = client - self.events = events - self.sensors = sensors - } - - /// The entry-point of the service. - /// - /// This method is called to start the service and begin - /// listening for sensor value changes then publishing the dew-point - /// and enthalpy values of the sensors. - public func run() async throws { - do { - try await withGracefulShutdownHandler { - try await withThrowingDiscardingTaskGroup { group in - client.addPublishListener(named: "\(Self.self)") { result in - if self.shuttingDown { - self.logger.trace("Shutting down.") - } else if !self.client.isActive() { - self.logger.trace("Client is not currently active") - } else { - Task { try await self.handleResult(result) } - } - } - for await event in self.events().cancelOnGracefulShutdown() { - logger.trace("Received event: \(event)") - if event == .shuttingDown { - self.setIsShuttingDown() - } else if event == .connected { - group.addTask { try await self.subscribeToSensors() } - } else { - group.addTask { await self.unsubscribeToSensors() } - group.addTask { try? await Task.sleep(for: .milliseconds(100)) } - } - } - } - } onGracefulShutdown: { - // do something. - self.logger.debug("Received graceful shutdown.") - Task { [weak self] in await self?.setIsShuttingDown() } - } - } catch { - // WARN: We always get an MQTTNIO `noConnection` error here, which generally is not an issue, - // but causes service ServiceLifecycle to fail, so currently just ignoring errors that are thrown. - // However we do receive the unsubscribe message back from the MQTT broker, so it is likely safe - // to ignore the `noConnection` error. - logger.trace("Run error: \(error)") - // throw error - } - } - - private func setIsShuttingDown() { - logger.debug("Received shut down event.") - Task { try await publishUpdates() } - Task { await self.unsubscribeToSensors() } - shuttingDown = true - client.removePublishListener(named: "\(Self.self)") - } - - private func handleResult( - _ result: Result - ) async throws { - logger.trace("Begin handling result") - do { - switch result { - case let .failure(error): - logger.debug("Failed receiving sensor: \(error)") - throw error - case let .success(value): - // do something. - let topic = value.topicName - logger.trace("Received new value for topic: \(topic)") - if topic.contains("temperature") { - // do something. - var buffer = value.payload - guard let temperature = DryBulb(buffer: &buffer) else { - logger.trace("Decoding error for topic: \(topic)") - throw DecodingError() - } - try sensors.update(topic: topic, keyPath: \.temperature, with: temperature) - try await publishUpdates() - - } else if topic.contains("humidity") { - var buffer = value.payload - // Decode and update the temperature value - guard let humidity = RelativeHumidity(buffer: &buffer) else { - logger.debug("Failed to decode humidity from buffer: \(buffer)") - throw DecodingError() - } - try sensors.update(topic: topic, keyPath: \.humidity, with: humidity) - try await publishUpdates() - } - } - } catch { - logger.trace("Handle Result error: \(error)") - throw error - } - } - - private func subscribeToSensors() async throws { - for sensor in sensors { - _ = try await client.subscribe(to: [ - MQTTSubscribeInfo(topicFilter: sensor.topics.temperature, qos: .atLeastOnce), - MQTTSubscribeInfo(topicFilter: sensor.topics.humidity, qos: .atLeastOnce) - ]) - logger.debug("Subscribed to sensor: \(sensor.location)") - } - } - - private func unsubscribeToSensors() async { - logger.trace("Begin unsubscribe to sensors.") - guard client.isActive() else { - logger.debug("Client is not active, skipping.") - return - } - do { - let topics = sensors.reduce(into: [String]()) { array, sensor in - array.append(sensor.topics.temperature) - array.append(sensor.topics.humidity) - } - try await client.unsubscribe(from: topics) - logger.trace("Unsubscribed from sensors.") - } catch { - logger.trace("Unsubscribe error: \(error)") - } - } - - private func publish(double: Double?, to topic: String) async throws { - guard client.isActive() else { return } - guard let double else { return } - let rounded = round(double * 100) / 100 - logger.debug("Publishing \(rounded), to: \(topic)") - try await client.publish( - to: topic, - payload: ByteBufferAllocator().buffer(string: "\(rounded)"), - qos: .exactlyOnce, - retain: true - ) - } - - private func publishUpdates() async throws { - for sensor in sensors.filter(\.needsProcessed) { - try await publish(double: sensor.dewPoint?.value, to: sensor.topics.dewPoint) - try await publish(double: sensor.enthalpy?.value, to: sensor.topics.enthalpy) - try sensors.hasProcessed(sensor) - } - } - -} - // MARK: - Errors struct DecodingError: Error {} -struct MQTTClientNotConnected: Error {} -struct NotFoundError: Error {} -struct SensorExists: Error {} -struct SensorCountError: Error {} +struct SensorNotFoundError: Error {} // MARK: - Helpers @@ -319,14 +178,14 @@ private extension Array where Element == TemperatureAndHumiditySensor { with value: V ) throws { guard let index = firstIndex(where: { $0.topics.contains(topic) }) else { - throw NotFoundError() + throw SensorNotFoundError() } self[index][keyPath: keyPath] = value } mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws { guard let index = firstIndex(where: { $0.id == sensor.id }) else { - throw NotFoundError() + throw SensorNotFoundError() } self[index].needsProcessed = false } diff --git a/Sources/dewPoint-controller/Application.swift b/Sources/dewPoint-controller/Application.swift index c459087..59dab0d 100644 --- a/Sources/dewPoint-controller/Application.swift +++ b/Sources/dewPoint-controller/Application.swift @@ -1,3 +1,4 @@ +import Dependencies import Foundation import Logging import Models @@ -5,7 +6,7 @@ import MQTTConnectionService import MQTTNIO import NIO import PsychrometricClientLive -import SensorsService +import SensorsClientLive import ServiceLifecycle @main @@ -32,22 +33,27 @@ struct Application { ) let mqttConnection = MQTTConnectionService(client: mqtt) - let sensors = SensorsService( - client: mqtt, - events: { mqttConnection.events }, - sensors: .live - ) + try await withDependencies { + $0.psychrometricClient = PsychrometricClient.liveValue + $0.sensorsClient = .live(client: mqtt) + } operation: { + let sensors = SensorsService(sensors: .live) - let serviceGroup = ServiceGroup( - services: [ - mqttConnection, - sensors - ], - gracefulShutdownSignals: [.sigterm, .sigint], - logger: logger - ) + var serviceGroupConfiguration = ServiceGroupConfiguration( + services: [ + mqttConnection, + sensors + ], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: logger + ) + serviceGroupConfiguration.maximumCancellationDuration = .seconds(5) + serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10) - try await serviceGroup.run() + let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) + + try await serviceGroup.run() + } } } diff --git a/Tests/SensorsServiceTests/SensorsClientTests.swift b/Tests/SensorsServiceTests/SensorsClientTests.swift index d9bdc75..679d46c 100755 --- a/Tests/SensorsServiceTests/SensorsClientTests.swift +++ b/Tests/SensorsServiceTests/SensorsClientTests.swift @@ -157,15 +157,15 @@ final class SensorsClientTests: XCTestCase { @Dependency(\.sensorsClient) var client let stream = try await client.listen(to: ["test"]) - for await value in stream { - var buffer = value.payload + for await result in stream { + var buffer = result.buffer guard let double = Double(buffer: &buffer) else { XCTFail("Failed to decode double") return } XCTAssertEqual(double, 75) - XCTAssertEqual(value.topicName, "test") + XCTAssertEqual(result.topic, "test") try await client.publish(26, to: "publish") try await Task.sleep(for: .milliseconds(100)) client.shutdown() @@ -265,20 +265,14 @@ extension SensorsClient { capturePublishedValues: @escaping (Double, String) -> Void, captureShutdownEvent: @escaping (Bool) -> Void ) -> Self { - let (stream, continuation) = AsyncStream.makeStream(of: MQTTPublishInfo.self) + let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self) let logger = Logger(label: "\(Self.self).testing") return .init( listen: { topics in for (value, topic) in yielding where topics.contains(topic) { continuation.yield( - MQTTPublishInfo( - qos: .atLeastOnce, - retain: true, - topicName: topic, - payload: ByteBuffer(string: "\(value)"), - properties: MQTTProperties() - ) + (buffer: ByteBuffer(string: "\(value)"), topic: topic) ) } return stream diff --git a/docker-compose.yaml b/docker-compose.yaml index 0f04f6a..81008c6 100755 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -14,7 +14,7 @@ services: depends_on: - mosquitto environment: - - MOSQUITTO_SERVER=mosquitto + - MQTT_HOST=mosquitto test: build: