From 48d51419d7cb3fc44867f87985f9e1ba367c6ca7 Mon Sep 17 00:00:00 2001 From: Michael Housh Date: Fri, 8 Nov 2024 23:52:01 -0500 Subject: [PATCH] feat: Adds SensorsService --- Package.resolved | 142 +++++++++++------- Package.swift | 16 +- Sources/ClientLive/SensorsClient.swift | 67 ++++++++- Sources/ClientLive/SensorsService.swift | 131 ++++++++++++++++ Sources/DewPointEnvironment/Environment.swift | 2 + Sources/EnvVars/EnvVars.swift | 2 + Tests/ClientTests/SensorsClientTests.swift | 62 +++++++- 7 files changed, 348 insertions(+), 74 deletions(-) create mode 100644 Sources/ClientLive/SensorsService.swift diff --git a/Package.resolved b/Package.resolved index 7777430..3a1e32a 100755 --- a/Package.resolved +++ b/Package.resolved @@ -1,61 +1,87 @@ { - "object": { - "pins": [ - { - "package": "mqtt-nio", - "repositoryURL": "https://github.com/swift-server-community/mqtt-nio.git", - "state": { - "branch": null, - "revision": "ca8af7a30c4690456ce7de276cd0f037489ba707", - "version": "2.5.3" - } - }, - { - "package": "swift-log", - "repositoryURL": "https://github.com/apple/swift-log.git", - "state": { - "branch": null, - "revision": "5d66f7ba25daf4f94100e7022febf3c75e37a6c7", - "version": "1.4.2" - } - }, - { - "package": "swift-nio", - "repositoryURL": "https://github.com/apple/swift-nio", - "state": { - "branch": null, - "revision": "6aa9347d9bc5bbfe6a84983aec955c17ffea96ef", - "version": "2.33.0" - } - }, - { - "package": "swift-nio-ssl", - "repositoryURL": "https://github.com/apple/swift-nio-ssl.git", - "state": { - "branch": null, - "revision": "b5260a31c2a72a89fa684f5efb3054d8725a2316", - "version": "2.18.0" - } - }, - { - "package": "swift-nio-transport-services", - "repositoryURL": "https://github.com/apple/swift-nio-transport-services.git", - "state": { - "branch": null, - "revision": "8ab824b140d0ebcd87e9149266ddc353e3705a3e", - "version": "1.11.4" - } - }, - { - "package": "swift-psychrometrics", - "repositoryURL": "https://github.com/swift-psychrometrics/swift-psychrometrics", - "state": { - "branch": null, - "revision": "03573545c3750b406921eb22a9575c8062beef88", - "version": "0.1.2" - } + "originHash" : "869c04e4de5c59d3c7c00851dffa2bfde39aa29dc5be3cbbb759ad950fffd706", + "pins" : [ + { + "identity" : "mqtt-nio", + "kind" : "remoteSourceControl", + "location" : "https://github.com/swift-server-community/mqtt-nio.git", + "state" : { + "revision" : "ca8af7a30c4690456ce7de276cd0f037489ba707", + "version" : "2.5.3" } - ] - }, - "version": 1 + }, + { + "identity" : "swift-async-algorithms", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-async-algorithms.git", + "state" : { + "revision" : "5c8bd186f48c16af0775972700626f0b74588278", + "version" : "1.0.2" + } + }, + { + "identity" : "swift-collections", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-collections.git", + "state" : { + "revision" : "671108c96644956dddcd89dd59c203dcdb36cec7", + "version" : "1.1.4" + } + }, + { + "identity" : "swift-log", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-log.git", + "state" : { + "revision" : "9cb486020ebf03bfa5b5df985387a14a98744537", + "version" : "1.6.1" + } + }, + { + "identity" : "swift-nio", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio", + "state" : { + "revision" : "6aa9347d9bc5bbfe6a84983aec955c17ffea96ef", + "version" : "2.33.0" + } + }, + { + "identity" : "swift-nio-ssl", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-ssl.git", + "state" : { + "revision" : "b5260a31c2a72a89fa684f5efb3054d8725a2316", + "version" : "2.18.0" + } + }, + { + "identity" : "swift-nio-transport-services", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-transport-services.git", + "state" : { + "revision" : "8ab824b140d0ebcd87e9149266ddc353e3705a3e", + "version" : "1.11.4" + } + }, + { + "identity" : "swift-psychrometrics", + "kind" : "remoteSourceControl", + "location" : "https://github.com/swift-psychrometrics/swift-psychrometrics", + "state" : { + "revision" : "03573545c3750b406921eb22a9575c8062beef88", + "version" : "0.1.2" + } + }, + { + "identity" : "swift-service-lifecycle", + "kind" : "remoteSourceControl", + "location" : "https://github.com/swift-server/swift-service-lifecycle.git", + "state" : { + "revision" : "f70b838872863396a25694d8b19fe58bcd0b7903", + "version" : "2.6.2" + } + } + ], + "version" : 3 } diff --git a/Package.swift b/Package.swift index 036b79a..300eac6 100755 --- a/Package.swift +++ b/Package.swift @@ -5,7 +5,7 @@ import PackageDescription let package = Package( name: "dewPoint-controller", platforms: [ - .macOS(.v12) + .macOS(.v14) ], products: [ .executable(name: "dewPoint-controller", targets: ["dewPoint-controller"]), @@ -14,12 +14,13 @@ let package = Package( .library(name: "EnvVars", targets: ["EnvVars"]), .library(name: "Models", targets: ["Models"]), .library(name: "Client", targets: ["Client"]), - .library(name: "ClientLive", targets: ["ClientLive"]), + .library(name: "ClientLive", targets: ["ClientLive"]) ], dependencies: [ .package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"), .package(url: "https://github.com/apple/swift-nio", from: "2.0.0"), - .package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", from: "0.1.0") + .package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", from: "0.1.0"), + .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.3.0") ], targets: [ .executableTarget( @@ -53,7 +54,7 @@ let package = Package( "EnvVars", "Client", "Models", - .product(name: "MQTTNIO", package: "mqtt-nio"), + .product(name: "MQTTNIO", package: "mqtt-nio") ] ), .target( @@ -63,7 +64,7 @@ let package = Package( .target( name: "Models", dependencies: [ - .product(name: "Psychrometrics", package: "swift-psychrometrics"), + .product(name: "Psychrometrics", package: "swift-psychrometrics") ] ), .target( @@ -80,7 +81,8 @@ let package = Package( dependencies: [ "Client", "EnvVars", - .product(name: "MQTTNIO", package: "mqtt-nio") + .product(name: "MQTTNIO", package: "mqtt-nio"), + .product(name: "ServiceLifecycle", package: "swift-service-lifecycle") ] ), .testTarget( @@ -95,6 +97,6 @@ let package = Package( dependencies: [ "Models" ] - ), + ) ] ) diff --git a/Sources/ClientLive/SensorsClient.swift b/Sources/ClientLive/SensorsClient.swift index e64eb8b..030f0df 100644 --- a/Sources/ClientLive/SensorsClient.swift +++ b/Sources/ClientLive/SensorsClient.swift @@ -4,6 +4,7 @@ import Models import MQTTNIO import NIO import Psychrometrics +import ServiceLifecycle // TODO: Pass in eventLoopGroup and MQTTClient. public actor SensorsClient { @@ -65,14 +66,22 @@ public actor SensorsClient { } public func start() async throws { - do { - try await subscribeToSensors() - try await addSensorListeners() - logger.debug("Begin listening to sensors...") - } catch { - logger.trace("Error:\n\(error)") - throw error + await withGracefulShutdownHandler { + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { try await self.subscribeToSensors() } + group.addTask { try await self.addSensorListeners() } + } + } onGracefulShutdown: { + Task { await self.shutdown() } } +// do { +// try await subscribeToSensors() +// try await addSensorListeners() +// logger.debug("Begin listening to sensors...") +// } catch { +// logger.trace("Error:\n(error)") +// throw error +// } } public func shutdown() async { @@ -88,6 +97,48 @@ public actor SensorsClient { } } + private func _addSensorListeners(qos _: MQTTQoS = .exactlyOnce) async throws { + // try await withThrowingDiscardingTaskGroup { group in + // group.addTask { try await self.subscribeToSensors(qos: qos) } + + for await result in client.createPublishListener() { + switch result { + case let .failure(error): + logger.trace("Error:\n\(error)") + case let .success(value): + let topic = value.topicName + logger.trace("Received new value for topic: \(topic)") + if topic.contains("temperature") { + // do something. + var buffer = value.payload + guard let temperature = Temperature(buffer: &buffer) else { + logger.trace("Decoding error for topic: \(topic)") + throw DecodingError() + } + try sensors.update(topic: topic, keyPath: \.temperature, with: temperature) + // group.addTask { + Task { + try await self.publishUpdates() + } + + } else if topic.contains("humidity") { + var buffer = value.payload + // Decode and update the temperature value + guard let humidity = RelativeHumidity(buffer: &buffer) else { + logger.debug("Failed to decode humidity from buffer: \(buffer)") + throw DecodingError() + } + try sensors.update(topic: topic, keyPath: \.humidity, with: humidity) + // group.addTask { + Task { + try await self.publishUpdates() + } + } + // } + } + } + } + func addSensorListeners(qos: MQTTQoS = .exactlyOnce) async throws { try await subscribeToSensors(qos: qos) client.addPublishListener(named: "SensorsClient") { result in @@ -138,7 +189,7 @@ public actor SensorsClient { ) } - func publishUpdates() async throws { + private func publishUpdates() async throws { for sensor in sensors.filter(\.needsProcessed) { try await publish(double: sensor.dewPoint?.rawValue, to: sensor.topics.dewPoint) try await publish(double: sensor.enthalpy?.rawValue, to: sensor.topics.enthalpy) diff --git a/Sources/ClientLive/SensorsService.swift b/Sources/ClientLive/SensorsService.swift new file mode 100644 index 0000000..fa74b38 --- /dev/null +++ b/Sources/ClientLive/SensorsService.swift @@ -0,0 +1,131 @@ +import Foundation +import Logging +@preconcurrency import Models +@preconcurrency import MQTTNIO +import NIO +import Psychrometrics +import ServiceLifecycle + +public actor SensorsService: Service { + private var sensors: [TemperatureAndHumiditySensor] + private let client: MQTTClient + var logger: Logger { client.logger } + + public init( + client: MQTTClient, + sensors: [TemperatureAndHumiditySensor] + ) { + self.client = client + self.sensors = sensors + } + + public func run() async throws { + guard client.isActive() else { + throw MQTTClientNotConnected() + } + try await withThrowingDiscardingTaskGroup { group in + group.addTask { try await self.subscribeToSensors() } + for await result in client.createPublishListener().cancelOnGracefulShutdown() { + group.addTask { + try await self.handleResult(result) + } + } + } + } + + private func handleResult( + _ result: Result + ) async throws { + switch result { + case let .failure(error): + logger.debug("Failed receiving sensor: \(error)") + throw error + case let .success(value): + // do something. + let topic = value.topicName + logger.trace("Received new value for topic: \(topic)") + if topic.contains("temperature") { + // do something. + var buffer = value.payload + guard let temperature = Temperature(buffer: &buffer) else { + logger.trace("Decoding error for topic: \(topic)") + throw DecodingError() + } + try sensors.update(topic: topic, keyPath: \.temperature, with: temperature) + try await publishUpdates() + + } else if topic.contains("humidity") { + var buffer = value.payload + // Decode and update the temperature value + guard let humidity = RelativeHumidity(buffer: &buffer) else { + logger.debug("Failed to decode humidity from buffer: \(buffer)") + throw DecodingError() + } + try sensors.update(topic: topic, keyPath: \.humidity, with: humidity) + try await publishUpdates() + } + } + } + + private func subscribeToSensors() async throws { + for sensor in sensors { + _ = try await client.subscribe(to: [ + MQTTSubscribeInfo(topicFilter: sensor.topics.temperature, qos: .atLeastOnce), + MQTTSubscribeInfo(topicFilter: sensor.topics.humidity, qos: .atLeastOnce) + ]) + logger.debug("Subscribed to sensor: \(sensor.location)") + } + } + + private func publish(double: Double?, to topic: String) async throws { + guard let double else { return } + let rounded = round(double * 100) / 100 + logger.debug("Publishing \(rounded), to: \(topic)") + try await client.publish( + to: topic, + payload: ByteBufferAllocator().buffer(string: "\(rounded)"), + qos: .exactlyOnce, + retain: true + ) + } + + private func publishUpdates() async throws { + for sensor in sensors.filter(\.needsProcessed) { + try await publish(double: sensor.dewPoint?.rawValue, to: sensor.topics.dewPoint) + try await publish(double: sensor.enthalpy?.rawValue, to: sensor.topics.enthalpy) + try sensors.hasProcessed(sensor) + } + } + +} + +// MARK: - Helpers + +struct MQTTClientNotConnected: Error {} + +private extension TemperatureAndHumiditySensor.Topics { + func contains(_ topic: String) -> Bool { + temperature == topic || humidity == topic + } +} + +private extension Array where Element == TemperatureAndHumiditySensor { + + mutating func update( + topic: String, + keyPath: WritableKeyPath, + with value: V + ) throws { + guard let index = firstIndex(where: { $0.topics.contains(topic) }) else { + throw NotFoundError() + } + self[index][keyPath: keyPath] = value + } + + mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws { + guard let index = firstIndex(where: { $0.id == sensor.id }) else { + throw NotFoundError() + } + self[index].needsProcessed = false + } +} diff --git a/Sources/DewPointEnvironment/Environment.swift b/Sources/DewPointEnvironment/Environment.swift index 4fa9e2f..a522660 100755 --- a/Sources/DewPointEnvironment/Environment.swift +++ b/Sources/DewPointEnvironment/Environment.swift @@ -3,6 +3,8 @@ import EnvVars import Models import MQTTNIO +// TODO: Remove + public struct DewPointEnvironment { public var envVars: EnvVars diff --git a/Sources/EnvVars/EnvVars.swift b/Sources/EnvVars/EnvVars.swift index d0d9fe1..1927992 100755 --- a/Sources/EnvVars/EnvVars.swift +++ b/Sources/EnvVars/EnvVars.swift @@ -1,5 +1,7 @@ import Foundation +// TODO: Move to Models + /// Holds common settings for connecting to your MQTT broker. The default values can be used, /// they can be loaded from the shell environment, or from a file located in the root directory. /// diff --git a/Tests/ClientTests/SensorsClientTests.swift b/Tests/ClientTests/SensorsClientTests.swift index bb8bd07..1c12cc3 100755 --- a/Tests/ClientTests/SensorsClientTests.swift +++ b/Tests/ClientTests/SensorsClientTests.swift @@ -7,7 +7,7 @@ import NIO import Psychrometrics import XCTest -final class AsyncClientTests: XCTestCase { +final class SensorsClientTests: XCTestCase { static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost" @@ -35,6 +35,66 @@ final class AsyncClientTests: XCTestCase { await client.shutdown() } + func testSensorService() async throws { + let client = createClient(identifier: "testSensorService") + let mqtt = await client.client + let sensor = TemperatureAndHumiditySensor(location: .mixedAir, units: .metric) + let publishInfo = PublishInfoContainer(topicFilters: [ + sensor.topics.dewPoint, + sensor.topics.enthalpy + ]) + let service = SensorsService(client: mqtt, sensors: [sensor]) + + // fix to connect the mqtt client. + await client.connect() + let task = Task { try await service.run() } + + _ = try await mqtt.subscribe(to: [ + .init(topicFilter: sensor.topics.dewPoint, qos: .exactlyOnce), + .init(topicFilter: sensor.topics.enthalpy, qos: .exactlyOnce) + ]) + + let listener = mqtt.createPublishListener() + Task { + for await result in listener { + switch result { + case let .failure(error): + XCTFail("\(error)") + case let .success(value): + await publishInfo.addPublishInfo(value) + } + } + } + + try await mqtt.publish( + to: sensor.topics.temperature, + payload: ByteBufferAllocator().buffer(string: "75.123"), + qos: .exactlyOnce, + retain: true + ) + + try await Task.sleep(for: .seconds(1)) + + // XCTAssert(client.sensors.first!.needsProcessed) +// let firstSensor = await client.sensors.first! +// XCTAssertEqual(firstSensor.temperature, .init(75.123, units: .celsius)) + + try await mqtt.publish( + to: sensor.topics.humidity, + payload: ByteBufferAllocator().buffer(string: "50"), + qos: .exactlyOnce, + retain: true + ) + + try await Task.sleep(for: .seconds(1)) + + XCTAssertEqual(publishInfo.info.count, 2) + + // fix to shutdown the mqtt client. + task.cancel() + await client.shutdown() + } + func testSensorCapturesPublishedState() async throws { let client = createClient(identifier: "testSensorCapturesPublishedState") let mqtt = await client.client