import EnvVars import Logging import Models import MQTTNIO import NIO import Psychrometrics // TODO: Pass in eventLoopGroup and MQTTClient. public actor SensorsClient { public static let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) public let client: MQTTClient public private(set) var shuttingDown: Bool public private(set) var sensors: [TemperatureAndHumiditySensor] var logger: Logger { client.logger } public init( envVars: EnvVars, logger: Logger, sensors: [TemperatureAndHumiditySensor] = [] ) { let config = MQTTClient.Configuration( version: .v3_1_1, userName: envVars.userName, password: envVars.password, useSSL: false, useWebSockets: false, tlsConfiguration: nil, webSocketURLPath: nil ) self.client = MQTTClient( host: envVars.host, identifier: envVars.identifier, eventLoopGroupProvider: .shared(Self.eventLoopGroup), logger: logger, configuration: config ) self.shuttingDown = false self.sensors = sensors } public func addSensor(_ sensor: TemperatureAndHumiditySensor) async throws { guard sensors.firstIndex(where: { $0.location == sensor.location }) == nil else { throw SensorExists() } sensors.append(sensor) } public func connect(cleanSession: Bool = true) async { do { try await client.connect(cleanSession: cleanSession) client.addCloseListener(named: "SensorsClient") { [self] _ in guard !self.shuttingDown else { return } Task { self.logger.debug("Connection closed.") self.logger.debug("Reconnecting...") await self.connect() } } logger.debug("Connection successful.") } catch { logger.trace("Connection Failed.\n\(error)") } } public func start() async throws { do { try await subscribeToSensors() try await addSensorListeners() logger.debug("Begin listening to sensors...") } catch { logger.trace("Error:\n\(error)") throw error } } public func shutdown() async { shuttingDown = true try? await client.disconnect() try? await client.shutdown() } /// Subscribe to changes of the temperature and humidity sensors. func subscribeToSensors(qos: MQTTQoS = .exactlyOnce) async throws { for sensor in sensors { try await client.subscribeToSensor(sensor, qos: qos) } } func addSensorListeners(qos: MQTTQoS = .exactlyOnce) async throws { try await subscribeToSensors(qos: qos) client.addPublishListener(named: "SensorsClient") { result in do { switch result { case let .success(value): var buffer = value.payload let topic = value.topicName self.logger.trace("Received new value for topic: \(topic)") if topic.contains("temperature") { // Decode and update the temperature value guard let temperature = Temperature(buffer: &buffer) else { self.logger.debug("Failed to decode temperature from buffer: \(buffer)") throw DecodingError() } try self.sensors.update(topic: topic, keyPath: \.temperature, with: temperature) Task { try await self.publishUpdates() } } else if topic.contains("humidity") { // Decode and update the temperature value guard let humidity = RelativeHumidity(buffer: &buffer) else { self.logger.debug("Failed to decode humidity from buffer: \(buffer)") throw DecodingError() } try self.sensors.update(topic: topic, keyPath: \.humidity, with: humidity) Task { try await self.publishUpdates() } } case let .failure(error): self.logger.trace("Error:\n\(error)") throw error } } catch { self.logger.trace("Error:\n\(error)") } } } private func publish(double: Double?, to topic: String) async throws { guard let double else { return } let rounded = round(double * 100) / 100 logger.debug("Publishing \(rounded), to: \(topic)") try await client.publish( to: topic, payload: ByteBufferAllocator().buffer(string: "\(rounded)"), qos: .exactlyOnce, retain: true ) } func publishUpdates() async throws { for sensor in sensors.filter(\.needsProcessed) { try await publish(double: sensor.dewPoint?.rawValue, to: sensor.topics.dewPoint) try await publish(double: sensor.enthalpy?.rawValue, to: sensor.topics.enthalpy) try sensors.hasProcessed(sensor) } } } // MARK: - Helpers private extension MQTTClient { func subscribeToSensor( _ sensor: TemperatureAndHumiditySensor, qos: MQTTQoS = .exactlyOnce ) async throws { do { _ = try await subscribe(to: [ MQTTSubscribeInfo(topicFilter: sensor.topics.temperature, qos: qos), MQTTSubscribeInfo(topicFilter: sensor.topics.humidity, qos: qos) ]) logger.debug("Subscribed to temperature-humidity sensor: \(sensor.id)") } catch { logger.trace("Failed to subscribe to temperature-humidity sensor: \(sensor.id)") throw error } } } struct DecodingError: Error {} struct NotFoundError: Error {} struct SensorExists: Error {} private extension TemperatureAndHumiditySensor.Topics { func contains(_ topic: String) -> Bool { temperature == topic || humidity == topic } } // TODO: Move to dewpoint-controller/main.swift public extension Array where Element == TemperatureAndHumiditySensor { static var live: Self { TemperatureAndHumiditySensor.Location.allCases.map { TemperatureAndHumiditySensor(location: $0) } } } private extension Array where Element == TemperatureAndHumiditySensor { mutating func update( topic: String, keyPath: WritableKeyPath, with value: V ) throws { guard let index = firstIndex(where: { $0.topics.contains(topic) }) else { throw NotFoundError() } self[index][keyPath: keyPath] = value } mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws { guard let index = firstIndex(where: { $0.id == sensor.id }) else { throw NotFoundError() } self[index].needsProcessed = false } }