import Dependencies
import DependenciesMacros
import Foundation
import Logging
import Models
import MQTTManager
import MQTTNIO
import NIO
import PsychrometricClient
import ServiceLifecycle

/// Service that is responsible for listening to changes of the temperature and humidity
/// sensors, then publishing back the calculated dew-point temperature and enthalpy for
/// the sensor location.
public actor SensorsService: Service {

  @Dependency(\.mqtt) var mqtt

  /// The logger to use for the service.
  private let logger: Logger?

  /// The sensors that we are listening for updates to, so
  /// that we can calculate the dew-point temperature and enthalpy
  /// values to publish back to the MQTT broker.
  private var sensors: [TemperatureAndHumiditySensor]

  /// The temperature and humidity topics of every sensor, in sensor order
  /// (temperature first, then humidity, for each sensor).
  private var topics: [String] {
    sensors.flatMap { [$0.topics.temperature, $0.topics.humidity] }
  }

  /// Create a new sensors service that listens to the passed in
  /// sensors.
  ///
  /// - Note: ``run()`` traps with a precondition failure if `sensors` is empty.
  ///
  /// - Parameters:
  ///   - sensors: The sensors to listen for changes to.
  ///   - logger: An optional logger to use.
  public init(
    sensors: [TemperatureAndHumiditySensor],
    logger: Logger? = nil
  ) {
    self.sensors = sensors
    self.logger = logger
  }

  /// Start the service with graceful shutdown, which will attempt to publish
  /// any pending changes to the MQTT broker, upon a shutdown signal.
  ///
  /// - Throws: Any error thrown while setting up the listener stream.
  public func run() async throws {
    precondition(!sensors.isEmpty, "Sensors should not be empty.")
    let stream = try await makeStream()
    await withGracefulShutdownHandler {
      for await result in stream.cancelOnGracefulShutdown() {
        logger?.debug("Received result for topic: \(result.topic)")
        await handleResult(result)
      }
    } onGracefulShutdown: {
      self.logger?.debug("Received graceful shutdown.")
      // The shutdown handler is synchronous, so the final publish has to run in
      // an unstructured task. Catch the error here; otherwise it is discarded
      // together with the task handle.
      Task {
        do {
          try await self.shutdown()
        } catch {
          self.logger?.error("Failed to publish pending updates during shutdown: \(error)")
        }
      }
    }
  }

  /// Publish any updates that are still pending for the sensors.
  ///
  /// - Throws: Any error thrown while publishing or marking a sensor as processed.
  @_spi(Internal)
  public func shutdown() async throws {
    try await publishUpdates()
  }

  /// Create the stream of `(buffer, topic)` pairs for all of the sensor topics.
  ///
  /// - Throws: Any error thrown by the MQTT dependency when starting to listen.
  private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> {
    // Ignore duplicate values, to prevent publishing dew-point and enthalpy
    // changes too frequently.
    try await mqtt.listen(to: topics)
      .map { ($0.payload, $0.topicName) }
      .removeDuplicates { lhs, rhs in
        lhs.buffer == rhs.buffer && lhs.topic == rhs.topic
      }
      .eraseToStream()
  }

  /// Decode a received payload, store it on the sensor that owns the topic, and
  /// publish any resulting updates.
  ///
  /// Errors are logged rather than propagated, so one bad message does not end
  /// the listening loop in ``run()``.
  ///
  /// - Parameter result: The payload buffer and the topic it was received on.
  private func handleResult(_ result: (buffer: ByteBuffer, topic: String)) async {
    do {
      let topic = result.topic
      assert(topics.contains(topic))
      logger?.debug("Begin handling result for topic: \(topic)")

      func decode<V: BufferInitalizable>(_: V.Type) -> V? {
        V(buffer: result.buffer)
      }

      // Route on the sensors' configured topics instead of searching the topic
      // for the words "temperature" / "humidity". Topics are free-form strings,
      // so a substring test can miss a topic or pick the wrong branch.
      if sensors.contains(where: { $0.topics.temperature == topic }) {
        logger?.debug("Begin handling temperature result.")
        guard let temperature = decode(DryBulb.self) else {
          logger?.debug("Failed to decode temperature: \(result.buffer)")
          throw DecodingError()
        }
        logger?.debug("Decoded temperature: \(temperature)")
        try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
      } else if sensors.contains(where: { $0.topics.humidity == topic }) {
        logger?.debug("Begin handling humidity result.")
        guard let humidity = decode(RelativeHumidity.self) else {
          logger?.debug("Failed to decode humidity: \(result.buffer)")
          throw DecodingError()
        }
        logger?.debug("Decoded humidity: \(humidity)")
        try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
      } else {
        logger?.debug("Ignoring result for unrecognized topic: \(topic)")
      }

      try await publishUpdates()
      logger?.debug("Done handling result for topic: \(topic)")
    } catch {
      logger?.error("Received error while handling result: \(error)")
    }
  }

  /// Publish a value to the given topic as its string representation, retained,
  /// with exactly-once quality of service.
  ///
  /// - Parameters:
  ///   - double: The value to publish; nothing is published when it is `nil`.
  ///   - topic: The topic to publish the value to.
  /// - Throws: Any error thrown by the MQTT dependency while publishing.
  private func publish(_ double: Double?, to topic: String) async throws {
    guard let double else { return }
    try await mqtt.publish(
      ByteBufferAllocator().buffer(string: "\(double)"),
      to: topic,
      qos: .exactlyOnce,
      retain: true
    )
    logger?.debug("Published update to topic: \(topic)")
  }

  /// Publish the dew-point (in Fahrenheit) and enthalpy of every sensor that
  /// needs processing, then mark that sensor as processed.
  ///
  /// - Throws: Any publishing error, or ``SensorNotFoundError`` if a sensor can
  ///   no longer be found when marking it as processed.
  private func publishUpdates() async throws {
    for sensor in sensors where sensor.needsProcessed {
      try await publish(sensor.dewPoint?.fahrenheit, to: sensor.topics.dewPoint)
      try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy)
      try sensors.hasProcessed(sensor)
    }
  }
}

// MARK: - Errors

/// Thrown when a received payload can not be decoded into the expected value.
struct DecodingError: Error {}

/// Thrown when no sensor matches the topic or identifier being looked up.
struct SensorNotFoundError: Error {}

// MARK: - Helpers

private extension TemperatureAndHumiditySensor.Topics {
  /// Whether `topic` is exactly this sensor's temperature or humidity topic.
  func contains(_ topic: String) -> Bool {
    temperature == topic || humidity == topic
  }
}

private extension Array where Element == TemperatureAndHumiditySensor {

  /// Set a value on the first sensor whose temperature or humidity topic equals `topic`.
  ///
  /// - Parameters:
  ///   - topic: The topic the value was received on.
  ///   - keyPath: The sensor property to write to.
  ///   - value: The new value for the property.
  /// - Throws: ``SensorNotFoundError`` when no sensor owns the topic.
  mutating func update<V>(
    topic: String,
    keyPath: WritableKeyPath<TemperatureAndHumiditySensor, V>,
    with value: V
  ) throws {
    guard let index = firstIndex(where: { $0.topics.contains(topic) }) else {
      throw SensorNotFoundError()
    }
    self[index][keyPath: keyPath] = value
  }

  /// Call `setHasProcessed()` on the stored sensor with the same `id` as `sensor`.
  ///
  /// - Parameter sensor: The sensor that has been processed.
  /// - Throws: ``SensorNotFoundError`` when no stored sensor has that `id`.
  mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws {
    guard let index = firstIndex(where: { $0.id == sensor.id }) else {
      throw SensorNotFoundError()
    }
    self[index].setHasProcessed()
  }
}

/// Represents a type that can be initialized by a ``ByteBuffer``.
protocol BufferInitalizable {
  init?(buffer: ByteBuffer)
}

extension Double: BufferInitalizable {

  /// Attempt to create / parse a double from a byte buffer.
  init?(buffer: ByteBuffer) {
    let string = String(buffer: buffer)
    self.init(string)
  }
}

extension Tagged: BufferInitalizable where RawValue: BufferInitalizable {
  /// Attempt to create a tagged value by parsing its raw value from a byte buffer.
  init?(buffer: ByteBuffer) {
    guard let value = RawValue(buffer: buffer) else { return nil }
    self.init(value)
  }
}

extension Humidity: BufferInitalizable {
  /// Attempt to create a humidity by parsing a double from a byte buffer.
  init?(buffer: ByteBuffer) {
    guard let value = Double(buffer: buffer) else { return nil }
    self.init(value)
  }
}

extension Temperature: BufferInitalizable {
  /// Attempt to create a temperature by parsing a double from a byte buffer;
  /// the parsed value is treated as degrees Celsius.
  init?(buffer: ByteBuffer) {
    guard let value = Double(buffer: buffer) else { return nil }
    self.init(value, units: .celsius)
  }
}