diff --git a/Package.swift b/Package.swift index 1ac6670..dbba5dd 100644 --- a/Package.swift +++ b/Package.swift @@ -15,7 +15,6 @@ let package = Package( .library(name: "Models", targets: ["Models"]), .library(name: "Client", targets: ["Client"]), .library(name: "ClientLive", targets: ["ClientLive"]), - .library(name: "MQTTStore", targets: ["MQTTStore"]), ], dependencies: [ .package(url: "https://github.com/adam-fowler/mqtt-nio.git", from: "2.0.0"), @@ -88,15 +87,5 @@ let package = Package( "ClientLive" ] ), - .target( - name: "MQTTStore", - dependencies: [ - .product(name: "MQTTNIO", package: "mqtt-nio") - ] - ), - .testTarget( - name: "MQTTStoreTests", - dependencies: ["MQTTStore"] - ) ] ) diff --git a/Sources/Bootstrap/Bootstrap.swift b/Sources/Bootstrap/Bootstrap.swift index 930bb6c..0f377b7 100644 --- a/Sources/Bootstrap/Bootstrap.swift +++ b/Sources/Bootstrap/Bootstrap.swift @@ -118,11 +118,10 @@ extension EventLoopFuture where Value == (EnvVars, Topics) { logger: Logger? ) -> EventLoopFuture { map { envVars, topics in - let nioClient = MQTTClient(envVars: envVars, eventLoopGroup: eventLoopGroup, logger: logger) + let mqttClient = MQTTClient(envVars: envVars, eventLoopGroup: eventLoopGroup, logger: logger) return DewPointEnvironment.init( - mqttClient: .live(client: nioClient, topics: topics), envVars: envVars, - nioClient: nioClient, + mqttClient: mqttClient, topics: topics ) } @@ -139,7 +138,7 @@ extension EventLoopFuture where Value == DewPointEnvironment { guard autoConnect else { return self } return flatMap { environment in logger?.debug("Connecting to MQTT Broker...") - return environment.nioClient.connect() + return environment.mqttClient.connect() .map { _ in logger?.debug("Successfully connected to MQTT Broker...") return environment diff --git a/Sources/Client/Interface.swift b/Sources/Client/Interface.swift index e7f94e8..def1cf3 100644 --- a/Sources/Client/Interface.swift +++ b/Sources/Client/Interface.swift @@ -5,90 +5,7 @@ import Models import NIO import Psychrometrics -/// Represents the applications interactions with the MQTT Broker. -/// -/// This is an abstraction around the ``MQTTNIO.MQTTClient``. -public struct MQTTClient { - - /// Retrieve the humidity from the MQTT Broker. - public var fetchHumidity: (Sensor) -> EventLoopFuture - - /// Retrieve a set point from the MQTT Broker. - public var fetchSetPoint: (KeyPath) -> EventLoopFuture - - /// Retrieve the temperature from the MQTT Broker. - public var fetchTemperature: (Sensor, PsychrometricEnvironment.Units?) -> EventLoopFuture - - /// Publish a change of state message for a relay. - public var setRelay: (KeyPath, Relay.State) -> EventLoopFuture - - /// Disconnect and close the connection to the MQTT Broker. - public var shutdown: () -> EventLoopFuture - - /// Publish the current dew point to the MQTT Broker - public var publishDewPoint: (DewPoint, String) -> EventLoopFuture - - public init( - fetchHumidity: @escaping (Sensor) -> EventLoopFuture, - fetchSetPoint: @escaping (KeyPath) -> EventLoopFuture, - fetchTemperature: @escaping (Sensor, PsychrometricEnvironment.Units?) -> EventLoopFuture, - setRelay: @escaping (KeyPath, Relay.State) -> EventLoopFuture, - shutdown: @escaping () -> EventLoopFuture, - publishDewPoint: @escaping (DewPoint, String) -> EventLoopFuture - ) { - self.fetchHumidity = fetchHumidity - self.fetchSetPoint = fetchSetPoint - self.fetchTemperature = fetchTemperature - self.setRelay = setRelay - self.shutdown = shutdown - self.publishDewPoint = publishDewPoint - } - - /// Fetches the current temperature and humidity and calculates the current dew point. - /// - /// - Parameters: - /// - temperature: The temperature sensor to fetch the temperature from. - /// - humidity: The humidity sensor to fetch the humidity from. - /// - units: Optional units for the dew point. - public func currentDewPoint( - temperature: Sensor, - humidity: Sensor, - units: PsychrometricEnvironment.Units? = nil - ) -> EventLoopFuture { - fetchTemperature(temperature, units) - .and(fetchHumidity(humidity)) - .convertToDewPoint(units: units) - } - - /// Convenience to send a change of state message to a relay. - /// - /// - Parameters: - /// - relay: The relay to send the message to. - /// - state: The state to change the relay to. - public func `set`(relay: KeyPath, to state: Relay.State) -> EventLoopFuture { - setRelay(relay, state) - } - - /// Convenience to publish the current dew point back to the MQTT Broker. - /// - /// This is synactic sugar around ``MQTTClient.publishDewPoint``. - /// - /// - Parameters: - /// - dewPoint: The dew point value to publish. - /// - topic: The dew point topic to publish to. - public func publish(dewPoint: DewPoint, to topic: String) -> EventLoopFuture { - publishDewPoint(dewPoint, topic) - } -} - -extension EventLoopFuture where Value == (Temperature, RelativeHumidity) { - - fileprivate func convertToDewPoint(units: PsychrometricEnvironment.Units?) -> EventLoopFuture { - map { .init(dryBulb: $0, humidity: $1, units: units) } - } -} - -public struct Client2 { +public struct Client { /// Add the publish listeners to the MQTT Broker, to be notified of published changes. public var addListeners: () -> Void diff --git a/Sources/ClientLive/Helpers.swift b/Sources/ClientLive/Helpers.swift index 70f1f33..660315e 100644 --- a/Sources/ClientLive/Helpers.swift +++ b/Sources/ClientLive/Helpers.swift @@ -3,6 +3,7 @@ import Logging import Models import MQTTNIO import NIO +import Psychrometrics /// Represents a type that can be initialized by a ``ByteBuffer``. protocol BufferInitalizable { @@ -33,154 +34,219 @@ extension RelativeHumidity: BufferInitalizable { } } -/// Represents errors thrown while communicating with the MQTT Broker. -enum MQTTError: Error { +extension MQTTNIO.MQTTClient { - /// Sensor error. - case sensor(reason: String, error: Error?) - - /// Relay error. - case relay(reason: String, error: Error?) -} - -protocol FetchableTopic { - associatedtype Value: BufferInitalizable - var topic: String { get } -} - -extension Double: BufferInitalizable { - - init?(buffer: inout ByteBuffer) { - guard let string = buffer.readString(length: buffer.readableBytes) else { - return nil - } - self.init(string) + func logFailure(topic: String, error: Error) { + logger.error("\(topic): \(error)") } } -//extension SetPoint: FetchableTopic { -// typealias Value = Double -//} +extension Result where Success == MQTTPublishInfo { + func logIfFailure(client: MQTTNIO.MQTTClient, topic: String) -> ByteBuffer? { + switch self { + case let .success(value): + guard value.topicName == topic else { return nil } + return value.payload + case let .failure(error): + client.logFailure(topic: topic, error: error) + return nil + } + } +} -extension Sensor: FetchableTopic where Reading: BufferInitalizable { - typealias Value = Reading +extension Optional where Wrapped == ByteBuffer { + + func parse(as type: T.Type) -> T? where T: BufferInitalizable { + switch self { + case var .some(buffer): + return T.init(buffer: &buffer) + case .none: + return nil + } + } +} + +fileprivate struct TemperatureAndHumiditySensorKeyPathEnvelope { + + let humidityTopic: KeyPath + let temperatureTopic: KeyPath + let temperatureState: WritableKeyPath + let humidityState: WritableKeyPath + + func addListener(to client: MQTTNIO.MQTTClient, topics: Topics, state: State) { + + let temperatureTopic = topics.sensors[keyPath: temperatureTopic] + client.logger.trace("Adding listener for topic: \(temperatureTopic)") + client.addPublishListener(named: temperatureTopic) { result in + result.logIfFailure(client: client, topic: temperatureTopic) + .parse(as: Temperature.self) + .map { temperature in + state.sensors[keyPath: temperatureState] = temperature + } + } + + let humidityTopic = topics.sensors[keyPath: humidityTopic] + client.logger.trace("Adding listener for topic: \(humidityTopic)") + client.addPublishListener(named: humidityTopic) { result in + result.logIfFailure(client: client, topic: humidityTopic) + .parse(as: RelativeHumidity.self) + .map { humidity in + state.sensors[keyPath: humidityState] = humidity + } + } + } +} + +extension Array where Element == TemperatureAndHumiditySensorKeyPathEnvelope { + func addListeners(to client: MQTTNIO.MQTTClient, topics: Topics, state: State) { + _ = self.map { envelope in + envelope.addListener(to: client, topics: topics, state: state) + } + } +} + +extension Array where Element == MQTTSubscribeInfo { + static func sensors(topics: Topics) -> Self { + [ + .init(topicFilter: topics.sensors.mixedAirSensor.temperature, qos: .atLeastOnce), + .init(topicFilter: topics.sensors.mixedAirSensor.humidity, qos: .atLeastOnce), + .init(topicFilter: topics.sensors.postCoilSensor.temperature, qos: .atLeastOnce), + .init(topicFilter: topics.sensors.postCoilSensor.humidity, qos: .atLeastOnce), + .init(topicFilter: topics.sensors.returnAirSensor.temperature, qos: .atLeastOnce), + .init(topicFilter: topics.sensors.returnAirSensor.humidity, qos: .atLeastOnce), + .init(topicFilter: topics.sensors.supplyAirSensor.temperature, qos: .atLeastOnce), + .init(topicFilter: topics.sensors.supplyAirSensor.humidity, qos: .atLeastOnce), + ] + } +} + +extension State { + func addSensorListeners(to client: MQTTNIO.MQTTClient, topics: Topics) { + let envelopes: [TemperatureAndHumiditySensorKeyPathEnvelope] = [ + .init( + humidityTopic: \.mixedAirSensor.humidity, + temperatureTopic: \.mixedAirSensor.temperature, + temperatureState: \.mixedAirSensor.temperature, + humidityState: \.mixedAirSensor.humidity + ), + .init( + humidityTopic: \.postCoilSensor.humidity, + temperatureTopic: \.postCoilSensor.temperature, + temperatureState: \.postCoilSensor.temperature, + humidityState: \.postCoilSensor.humidity + ), + .init( + humidityTopic: \.returnAirSensor.humidity, + temperatureTopic: \.returnAirSensor.temperature, + temperatureState: \.returnAirSensor.temperature, + humidityState: \.returnAirSensor.humidity + ), + .init( + humidityTopic: \.supplyAirSensor.humidity, + temperatureTopic: \.supplyAirSensor.temperature, + temperatureState: \.supplyAirSensor.temperature, + humidityState: \.supplyAirSensor.humidity + ), + ] + envelopes.addListeners(to: client, topics: topics, state: self) + } +} + +extension Client.SensorPublishRequest { + + func dewPointData(topics: Topics, units: PsychrometricEnvironment.Units?) -> (DewPoint, String)? { + switch self { + case let .mixed(sensor): + guard let dp = sensor.dewPoint(units: units) else { return nil } + return (dp, topics.sensors.mixedAirSensor.dewPoint) + case let .postCoil(sensor): + guard let dp = sensor.dewPoint(units: units) else { return nil } + return (dp, topics.sensors.postCoilSensor.dewPoint) + case let .return(sensor): + guard let dp = sensor.dewPoint(units: units) else { return nil } + return (dp, topics.sensors.returnAirSensor.dewPoint) + case let .supply(sensor): + guard let dp = sensor.dewPoint(units: units) else { return nil } + return (dp, topics.sensors.supplyAirSensor.dewPoint) + } + } + + func enthalpyData(altitude: Length, topics: Topics, units: PsychrometricEnvironment.Units?) -> (EnthalpyOf, String)? { + switch self { + case let .mixed(sensor): + guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil } + return (enthalpy, topics.sensors.mixedAirSensor.enthalpy) + case let .postCoil(sensor): + guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil } + return (enthalpy, topics.sensors.postCoilSensor.enthalpy) + case let .return(sensor): + guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil } + return (enthalpy, topics.sensors.returnAirSensor.enthalpy) + case let .supply(sensor): + guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil } + return (enthalpy, topics.sensors.supplyAirSensor.enthalpy) + } + } + + func setHasProcessed(state: State) { + switch self { + case .mixed: + state.sensors.mixedAirSensor.needsProcessed = false + case .postCoil: + state.sensors.postCoilSensor.needsProcessed = false + case .return: + state.sensors.returnAirSensor.needsProcessed = false + case .supply: + state.sensors.supplyAirSensor.needsProcessed = false + } + } } extension MQTTNIO.MQTTClient { - func mqttSubscription( - topic: String, - qos: MQTTQoS = .atLeastOnce, - retainAsPublished: Bool = true, - retainHandling: MQTTSubscribeInfoV5.RetainHandling = .sendAlways - ) -> MQTTSubscribeInfoV5 { - .init(topicFilter: topic, qos: qos, retainAsPublished: retainAsPublished, retainHandling: retainHandling) - } - - func fetch( - _ subscription: MQTTSubscribeInfoV5 - ) -> EventLoopFuture where Value: BufferInitalizable { - logger.debug("Fetching data for: \(subscription.topicFilter)") - return v5.subscribe(to: [subscription]) - .flatMap { _ in - let promise = self.eventLoopGroup.next().makePromise(of: Value.self) - self.addPublishListener(named: subscription.topicFilter + "-listener") { result in - - result.mapBuffer(to: Value.self) - .unwrap(or: MQTTError.sensor(reason: "Invalid sensor reading", error: nil)) - .fullfill(promise: promise) - - self.logger.debug("Done fetching data for: \(subscription.topicFilter)") - } - - return promise.futureResult - } - } - - /// Fetch a sensor state and convert it appropriately, when the sensor type is ``BufferInitializable``. - /// - /// - Parameters: - /// - sensor: The sensor to fetch the state of. - func fetch( - sensor: Sensor - ) -> EventLoopFuture where S: BufferInitalizable { - return fetch(mqttSubscription(topic: sensor.topic)) - } - - func fetch(setPoint: KeyPath, setPoints: Topics.SetPoints) -> EventLoopFuture { - return fetch(mqttSubscription(topic: setPoints[keyPath: setPoint])) - } - - func `set`(relay relayTopic: String, to state: Relay.State, qos: MQTTQoS = .atLeastOnce) -> EventLoopFuture { - publish( - to: relayTopic, - payload: ByteBufferAllocator().buffer(string: state.rawValue), - qos: qos + func publishDewPoint( + request: Client.SensorPublishRequest, + state: State, + topics: Topics + ) -> EventLoopFuture<(MQTTNIO.MQTTClient, Client.SensorPublishRequest, State, Topics)> { + guard let (dewPoint, topic) = request.dewPointData(topics: topics, units: state.units) + else { + logger.trace("No dew point for sensor.") + return eventLoopGroup.next().makeSucceededFuture((self, request, state, topics)) + } + logger.debug("Publishing dew-point: \(dewPoint), to: \(topic)") + return publish( + to: topic, + payload: ByteBufferAllocator().buffer(string: "\(dewPoint.rawValue)"), + qos: .atLeastOnce ) + .map { (self, request, state, topics) } } } -extension Result where Success == MQTTPublishInfo, Failure == Error { - - func mapBuffer(to type: S.Type) -> Result where S: BufferInitalizable { - map { info in - var buffer = info.payload - return S.init(buffer: &buffer) +extension EventLoopFuture where Value == (Client.SensorPublishRequest, State) { + func setHasProcessed() -> EventLoopFuture { + map { request, state in + request.setHasProcessed(state: state) } } } -extension Result { - - func fullfill(promise: EventLoopPromise) { - switch self { - case let.success(value): - promise.succeed(value) - case let .failure(error): - promise.fail(error) - } - } - -} - -extension Result where Failure == Error { - - func unwrap( - or error: @autoclosure @escaping () -> F - ) -> Result where Success == Optional, Failure == F { - flatMap { optionalResult in - guard let value = optionalResult else { - return .failure(error()) +extension EventLoopFuture where Value == (MQTTNIO.MQTTClient, Client.SensorPublishRequest, State, Topics) { + func publishEnthalpy() -> EventLoopFuture<(Client.SensorPublishRequest, State)> { + flatMap { client, request, state, topics in + guard let (enthalpy, topic) = request.enthalpyData(altitude: state.altitude, topics: topics, units: state.units) + else { + client.logger.trace("No enthalpy for sensor.") + return client.eventLoopGroup.next().makeSucceededFuture((request, state)) } - return .success(value) - } - } -} - -extension Temperature { - - func convert(to units: PsychrometricEnvironment.Units) -> Self { - let temperatureUnits = Units.defaultFor(units: units) - return .init(self[temperatureUnits], units: temperatureUnits) - } -} - -extension EventLoopFuture where Value == Temperature { - - func convertIfNeeded(to units: PsychrometricEnvironment.Units?) -> EventLoopFuture { - map { currentTemperature in - guard let units = units else { return currentTemperature } - return currentTemperature.convert(to: units) - } - } -} - -extension EventLoopFuture { - - func debug(logger: Logger?) -> EventLoopFuture { - map { value in - logger?.debug("Value: \(value)") - return value + client.logger.debug("Publishing enthalpy: \(enthalpy), to: \(topic)") + return client.publish( + to: topic, + payload: ByteBufferAllocator().buffer(string: "\(enthalpy.rawValue)"), + qos: .atLeastOnce + ) + .map { (request, state) } } } } diff --git a/Sources/ClientLive/Live.swift b/Sources/ClientLive/Live.swift index 4b5d459..0480176 100644 --- a/Sources/ClientLive/Live.swift +++ b/Sources/ClientLive/Live.swift @@ -6,47 +6,7 @@ import MQTTNIO import NIO import Psychrometrics -extension Client.MQTTClient { - - /// Creates the live implementation of our ``Client.MQTTClient`` for the application. - /// - /// - Parameters: - /// - client: The ``MQTTNIO.MQTTClient`` used to send and recieve messages from the MQTT Broker. - public static func live(client: MQTTNIO.MQTTClient, topics: Topics) -> Self { - .init( - fetchHumidity: { sensor in - client.fetch(sensor: sensor) - .debug(logger: client.logger) - }, - fetchSetPoint: { setPointKeyPath in - client.fetch(client.mqttSubscription(topic: topics.setPoints[keyPath: setPointKeyPath])) - .debug(logger: client.logger) - }, - fetchTemperature: { sensor, units in - client.fetch(sensor: sensor) - .debug(logger: client.logger) - .convertIfNeeded(to: units) - .debug(logger: client.logger) - }, - setRelay: { relayKeyPath, state in - client.set(relay: topics.commands.relays[keyPath: relayKeyPath], to: state) - }, - shutdown: { - client.disconnect() - .map { try? client.syncShutdownGracefully() } - }, - publishDewPoint: { dewPoint, topic in - client.publish( - to: topic, - payload: ByteBufferAllocator().buffer(string: "\(dewPoint.rawValue)"), - qos: .atLeastOnce - ) - } - ) - } -} - -extension Client2 { +extension Client { // The state passed in here needs to be a class or we get escaping errors in the `addListeners` method. public static func live( @@ -79,227 +39,3 @@ extension Client2 { ) } } - -// MARK: - Client2 Helpers. -extension MQTTNIO.MQTTClient { - - func logFailure(topic: String, error: Error) { - logger.error("\(topic): \(error)") - } -} - -extension Result where Success == MQTTPublishInfo { - func logIfFailure(client: MQTTNIO.MQTTClient, topic: String) -> ByteBuffer? { - switch self { - case let .success(value): - guard value.topicName == topic else { return nil } - return value.payload - case let .failure(error): - client.logFailure(topic: topic, error: error) - return nil - } - } -} - -extension Optional where Wrapped == ByteBuffer { - - func parse(as type: T.Type) -> T? where T: BufferInitalizable { - switch self { - case var .some(buffer): - return T.init(buffer: &buffer) - case .none: - return nil - } - } -} - -struct TemperatureAndHumiditySensorKeyPathEnvelope { - - let humidityTopic: KeyPath - let temperatureTopic: KeyPath - let temperatureState: WritableKeyPath - let humidityState: WritableKeyPath - - func addListener(to client: MQTTNIO.MQTTClient, topics: Topics, state: State) { - - let temperatureTopic = topics.sensors[keyPath: temperatureTopic] - client.logger.trace("Adding listener for topic: \(temperatureTopic)") - client.addPublishListener(named: temperatureTopic) { result in - result.logIfFailure(client: client, topic: temperatureTopic) - .parse(as: Temperature.self) - .map { temperature in - state.sensors[keyPath: temperatureState] = temperature - } - } - - let humidityTopic = topics.sensors[keyPath: humidityTopic] - client.logger.trace("Adding listener for topic: \(humidityTopic)") - client.addPublishListener(named: humidityTopic) { result in - result.logIfFailure(client: client, topic: humidityTopic) - .parse(as: RelativeHumidity.self) - .map { humidity in - state.sensors[keyPath: humidityState] = humidity - } - } - } -} - -extension Array where Element == TemperatureAndHumiditySensorKeyPathEnvelope { - func addListeners(to client: MQTTNIO.MQTTClient, topics: Topics, state: State) { - _ = self.map { envelope in - envelope.addListener(to: client, topics: topics, state: state) - } - } -} - -extension Array where Element == MQTTSubscribeInfo { - static func sensors(topics: Topics) -> Self { - [ - .init(topicFilter: topics.sensors.mixedAirSensor.temperature, qos: .atLeastOnce), - .init(topicFilter: topics.sensors.mixedAirSensor.humidity, qos: .atLeastOnce), - .init(topicFilter: topics.sensors.postCoilSensor.temperature, qos: .atLeastOnce), - .init(topicFilter: topics.sensors.postCoilSensor.humidity, qos: .atLeastOnce), - .init(topicFilter: topics.sensors.returnAirSensor.temperature, qos: .atLeastOnce), - .init(topicFilter: topics.sensors.returnAirSensor.humidity, qos: .atLeastOnce), - .init(topicFilter: topics.sensors.supplyAirSensor.temperature, qos: .atLeastOnce), - .init(topicFilter: topics.sensors.supplyAirSensor.humidity, qos: .atLeastOnce), - ] - } -} - -extension State { - func addSensorListeners(to client: MQTTNIO.MQTTClient, topics: Topics) { - let envelopes: [TemperatureAndHumiditySensorKeyPathEnvelope] = [ - .init( - humidityTopic: \.mixedAirSensor.humidity, - temperatureTopic: \.mixedAirSensor.temperature, - temperatureState: \.mixedAirSensor.temperature, - humidityState: \.mixedAirSensor.humidity - ), - .init( - humidityTopic: \.postCoilSensor.humidity, - temperatureTopic: \.postCoilSensor.temperature, - temperatureState: \.postCoilSensor.temperature, - humidityState: \.postCoilSensor.humidity - ), - .init( - humidityTopic: \.returnAirSensor.humidity, - temperatureTopic: \.returnAirSensor.temperature, - temperatureState: \.returnAirSensor.temperature, - humidityState: \.returnAirSensor.humidity - ), - .init( - humidityTopic: \.supplyAirSensor.humidity, - temperatureTopic: \.supplyAirSensor.temperature, - temperatureState: \.supplyAirSensor.temperature, - humidityState: \.supplyAirSensor.humidity - ), - ] - envelopes.addListeners(to: client, topics: topics, state: self) - } -} - -extension Client2.SensorPublishRequest { - - func dewPointData(topics: Topics, units: PsychrometricEnvironment.Units?) -> (DewPoint, String)? { - switch self { - case let .mixed(sensor): - guard let dp = sensor.dewPoint(units: units) else { return nil } - return (dp, topics.sensors.mixedAirSensor.dewPoint) - case let .postCoil(sensor): - guard let dp = sensor.dewPoint(units: units) else { return nil } - return (dp, topics.sensors.postCoilSensor.dewPoint) - case let .return(sensor): - guard let dp = sensor.dewPoint(units: units) else { return nil } - return (dp, topics.sensors.returnAirSensor.dewPoint) - case let .supply(sensor): - guard let dp = sensor.dewPoint(units: units) else { return nil } - return (dp, topics.sensors.supplyAirSensor.dewPoint) - } - } - - func enthalpyData(altitude: Length, topics: Topics, units: PsychrometricEnvironment.Units?) -> (EnthalpyOf, String)? { - switch self { - case let .mixed(sensor): - guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil } - return (enthalpy, topics.sensors.mixedAirSensor.enthalpy) - case let .postCoil(sensor): - guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil } - return (enthalpy, topics.sensors.postCoilSensor.enthalpy) - case let .return(sensor): - guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil } - return (enthalpy, topics.sensors.returnAirSensor.enthalpy) - case let .supply(sensor): - guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil } - return (enthalpy, topics.sensors.supplyAirSensor.enthalpy) - } - } - - func setHasProcessed(state: State) { - switch self { - case .mixed: - state.sensors.mixedAirSensor.needsProcessed = false - case .postCoil: - state.sensors.postCoilSensor.needsProcessed = false - case .return: - state.sensors.returnAirSensor.needsProcessed = false - case .supply: - state.sensors.supplyAirSensor.needsProcessed = false - } - } -} - -extension MQTTNIO.MQTTClient { - - func publishDewPoint( - request: Client2.SensorPublishRequest, - state: State, - topics: Topics - ) -> EventLoopFuture<(MQTTNIO.MQTTClient, Client2.SensorPublishRequest, State, Topics)> { - guard let (dewPoint, topic) = request.dewPointData(topics: topics, units: state.units) - else { - logger.trace("No dew point for sensor.") - return eventLoopGroup.next().makeSucceededFuture((self, request, state, topics)) - } - logger.debug("Publishing dew-point: \(dewPoint), to: \(topic)") - return publish( - to: topic, - payload: ByteBufferAllocator().buffer(string: "\(dewPoint.rawValue)"), - qos: .atLeastOnce - ) - .map { (self, request, state, topics) } - } -} - -extension EventLoopFuture where Value == (Client2.SensorPublishRequest, State) { - func setHasProcessed( -// request: Client2.SensorPublishRequest, state: State - ) -> EventLoopFuture { - map { request, state in - request.setHasProcessed(state: state) - } - } -} - -extension EventLoopFuture where Value == (MQTTNIO.MQTTClient, Client2.SensorPublishRequest, State, Topics) { - func publishEnthalpy( -// request: Client2.SensorPublishRequest, -// state: State, -// topics: Topics - ) -> EventLoopFuture<(Client2.SensorPublishRequest, State)> { - flatMap { client, request, state, topics in - guard let (enthalpy, topic) = request.enthalpyData(altitude: state.altitude, topics: topics, units: state.units) - else { - client.logger.trace("No enthalpy for sensor.") - return client.eventLoopGroup.next().makeSucceededFuture((request, state)) - } - client.logger.debug("Publishing enthalpy: \(enthalpy), to: \(topic)") - return client.publish( - to: topic, - payload: ByteBufferAllocator().buffer(string: "\(enthalpy.rawValue)"), - qos: .atLeastOnce - ) - .map { (request, state) } - } - } -} diff --git a/Sources/DewPointEnvironment/Environment.swift b/Sources/DewPointEnvironment/Environment.swift index fd804a3..4e22e9b 100644 --- a/Sources/DewPointEnvironment/Environment.swift +++ b/Sources/DewPointEnvironment/Environment.swift @@ -5,20 +5,17 @@ import MQTTNIO public struct DewPointEnvironment { - public var mqttClient: Client.MQTTClient public var envVars: EnvVars - public var nioClient: MQTTNIO.MQTTClient + public var mqttClient: MQTTNIO.MQTTClient public var topics: Topics public init( - mqttClient: Client.MQTTClient, envVars: EnvVars, - nioClient: MQTTNIO.MQTTClient, + mqttClient: MQTTNIO.MQTTClient, topics: Topics = .init() ) { - self.mqttClient = mqttClient self.envVars = envVars - self.nioClient = nioClient + self.mqttClient = mqttClient self.topics = topics } } diff --git a/Sources/MQTTStore/MQTTStore.swift b/Sources/MQTTStore/MQTTStore.swift deleted file mode 100644 index 0d92c2e..0000000 --- a/Sources/MQTTStore/MQTTStore.swift +++ /dev/null @@ -1,95 +0,0 @@ -import Logging -import Foundation -import MQTTNIO -import NIO - -// TODO: This works and allows tests to complete, but should potentially be simplified. - -typealias PublishTopicHandler = (inout State, Result) -> Void - -struct ServerDetails { - let identifier: String - let hostname: String - let port: Int - let version: MQTTClient.Version - let cleanSession: Bool - let useTLS: Bool - let useWebSocket: Bool - let webSocketUrl: String - let username: String? - let password: String? -} - -class MQTTStore { - typealias Subscription = (topic: String, onPublish: PublishTopicHandler) - - var state: State - var subscriptions: [Subscription] - var client: MQTTClient? - var serverDetails: ServerDetails - var eventLoopGroup: EventLoopGroup - var logger: Logger? - - init( - state: State, - subscriptions: [Subscription], - serverDetails: ServerDetails, - eventLoopGroup: EventLoopGroup, - logger: Logger? = nil - ) { - self.state = state - self.subscriptions = subscriptions - self.serverDetails = serverDetails - self.eventLoopGroup = eventLoopGroup - self.logger = logger - self.createClient() - } - - private func createClient() { - let client = MQTTClient( - host: serverDetails.hostname, - identifier: serverDetails.identifier, - eventLoopGroupProvider: .shared(eventLoopGroup), - logger: logger, - configuration: .init( - version: serverDetails.version, - userName: serverDetails.username, - password: serverDetails.password, - useSSL: serverDetails.useTLS, - useWebSockets: serverDetails.useWebSocket, - webSocketURLPath: serverDetails.webSocketUrl - ) - ) - for subscription in subscriptions { - client.addPublishListener( - named: subscription.topic, - { result in subscription.onPublish(&self.state, result) } - ) - } - self.client = client - } - - func createSubscriptions() -> EventLoopFuture { - let subscriptionInfo = subscriptions.map { MQTTSubscribeInfo.init(topicFilter: $0.0, qos: .atLeastOnce) } - return client?.subscribe(to: subscriptionInfo).map { _ in } ?? eventLoopGroup.next().makeSucceededVoidFuture() - } - - func connect(cleanSession: Bool) -> EventLoopFuture { - client?.connect(cleanSession: cleanSession) ?? eventLoopGroup.next().makeSucceededFuture(false) - } - - func connectAndSubscribe(cleanSession: Bool) -> EventLoopFuture { - connect(cleanSession: cleanSession) - .flatMap{ _ in self.createSubscriptions() } - } - - func destroy() -> EventLoopFuture { - guard let client = client else { - return eventLoopGroup.next().makeSucceededVoidFuture() - } - return client.disconnect().map { _ in - try? self.client?.syncShutdownGracefully() - self.client = nil - } - } -} diff --git a/Sources/Models/Relay.swift b/Sources/Models/Relay.swift deleted file mode 100644 index c337a08..0000000 --- a/Sources/Models/Relay.swift +++ /dev/null @@ -1,40 +0,0 @@ - -/// Represents a relay that can be controlled by the MQTT Broker. -public struct Relay { - - /// The topic for the relay. - public var topic: String - - /// Create a new relay at the given topic. - /// - /// - Parameters: - /// - topic: The topic for commanding the relay. - public init(topic: String) { - self.topic = topic - } -} - -public enum Relay2 { - - /// The topic to read the current state of the relay from. - case read(topic: String) - - /// The topic to command the relay state. - case command(topic: String) -} - -extension Relay { - - /// Represents the different commands that can be sent to a relay. - public enum State: String { - - /// Toggle the relay state on or off based on it's current state. - case toggle - - /// Turn the relay off. - case off - - /// Turn the relay on. - case on - } -} diff --git a/Sources/Models/Sensor.swift b/Sources/Models/Sensor.swift deleted file mode 100644 index e6341bb..0000000 --- a/Sources/Models/Sensor.swift +++ /dev/null @@ -1,15 +0,0 @@ - -/// Represents a sensor that provides a reading. -public struct Sensor: Equatable { - - /// The topic to retrieve the reading from. - public var topic: String - - /// Create a new sensor for the given topic. - /// - /// - Parameters: - /// - topic: The topic to retrieve the readings from. - public init(topic: String) { - self.topic = topic - } -} diff --git a/Sources/dewPoint-controller/main.swift b/Sources/dewPoint-controller/main.swift index ae761b3..6e335e1 100644 --- a/Sources/dewPoint-controller/main.swift +++ b/Sources/dewPoint-controller/main.swift @@ -22,13 +22,13 @@ if environment.envVars.appEnv == .production { logger.logLevel = .info } -let relay = Relay(topic: environment.topics.commands.relays.dehumidification1) -let tempSensor = Sensor(topic: environment.topics.sensors.returnAirSensor.temperature) -let humiditySensor = Sensor(topic: environment.topics.sensors.returnAirSensor.humidity) +//let relay = Relay(topic: environment.topics.commands.relays.dehumidification1) +//let tempSensor = Sensor(topic: environment.topics.sensors.returnAirSensor.temperature) +//let humiditySensor = Sensor(topic: environment.topics.sensors.returnAirSensor.humidity) defer { logger.debug("Disconnecting") - try? environment.mqttClient.shutdown().wait() +// try? environment.mqttClient.shutdown().wait() } while true { @@ -41,18 +41,18 @@ while true { // logger.debug("Fetching dew point...") - let dp = try environment.mqttClient.currentDewPoint( - temperature: tempSensor, - humidity: humiditySensor, - units: .imperial - ).wait() +// let dp = try environment.mqttClient.currentDewPoint( +// temperature: tempSensor, +// humidity: humiditySensor, +// units: .imperial +// ).wait() - logger.info("Dew Point: \(dp.rawValue) \(dp.units.symbol)") +// logger.info("Dew Point: \(dp.rawValue) \(dp.units.symbol)") - try environment.mqttClient.publish( - dewPoint: dp, - to: environment.topics.sensors.returnAirSensor.dewPoint - ).wait() +// try environment.mqttClient.publish( +// dewPoint: dp, +// to: environment.topics.sensors.returnAirSensor.dewPoint +// ).wait() logger.debug("Published dew point...") diff --git a/Tests/ClientTests/ClientTests.swift b/Tests/ClientTests/ClientTests.swift index a595948..71a16f1 100644 --- a/Tests/ClientTests/ClientTests.swift +++ b/Tests/ClientTests/ClientTests.swift @@ -14,16 +14,16 @@ final class ClientLiveTests: XCTestCase { static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost" let topics = Topics() - func test_mqtt_subscription() throws { - let mqttClient = createMQTTClient(identifier: "test_subscription") - _ = try mqttClient.connect().wait() - let sub = try mqttClient.v5.subscribe( - to: [mqttClient.mqttSubscription(topic: "test/subscription")] - ).wait() - XCTAssertEqual(sub.reasons[0], .grantedQoS1) - try mqttClient.disconnect().wait() - try mqttClient.syncShutdownGracefully() - } +// func test_mqtt_subscription() throws { +// let mqttClient = createMQTTClient(identifier: "test_subscription") +// _ = try mqttClient.connect().wait() +// let sub = try mqttClient.v5.subscribe( +// to: [mqttClient.mqttSubscription(topic: "test/subscription")] +// ).wait() +// XCTAssertEqual(sub.reasons[0], .grantedQoS1) +// try mqttClient.disconnect().wait() +// try mqttClient.syncShutdownGracefully() +// } func test_mqtt_listener() throws { let lock = Lock() @@ -65,7 +65,7 @@ final class ClientLiveTests: XCTestCase { let mqttClient = createMQTTClient(identifier: "return-temperature-tests") let state = State() let topics = Topics() - let client = Client2.live(client: mqttClient, state: state, topics: topics) + let client = Client.live(client: mqttClient, state: state, topics: topics) client.addListeners() try client.connect().wait() @@ -88,7 +88,7 @@ final class ClientLiveTests: XCTestCase { let mqttClient = createMQTTClient(identifier: "return-temperature-tests") let state = State() let topics = Topics() - let client = Client2.live(client: mqttClient, state: state, topics: topics) + let client = Client.live(client: mqttClient, state: state, topics: topics) client.addListeners() try client.connect().wait() @@ -165,12 +165,12 @@ final class ClientLiveTests: XCTestCase { // } // Uses default topic names. - func createClient(mqttClient: MQTTNIO.MQTTClient, autoConnect: Bool = true) throws -> Client.MQTTClient { - if autoConnect { - _ = try mqttClient.connect().wait() - } - return .live(client: mqttClient, topics: .init()) - } +// func createClient(mqttClient: MQTTNIO.MQTTClient, autoConnect: Bool = true) throws -> Client.MQTTClient { +// if autoConnect { +// _ = try mqttClient.connect().wait() +// } +// return .live(client: mqttClient, topics: .init()) +// } let logger: Logger = { var logger = Logger(label: "MQTTTests") diff --git a/Tests/MQTTStoreTests/MQTTStoreTests.swift b/Tests/MQTTStoreTests/MQTTStoreTests.swift deleted file mode 100644 index a88096c..0000000 --- a/Tests/MQTTStoreTests/MQTTStoreTests.swift +++ /dev/null @@ -1,93 +0,0 @@ -import Logging -import XCTest -import MQTTNIO -@testable import MQTTStore -import NIO - -final class ServerTests: XCTestCase { - - func testConnect() throws { - let store = createTestStore() - _ = try store.connect(cleanSession: true).wait() - try store.destroy().wait() - } - - func testSubscriptionHandler() throws { - let store = createTestStore() - _ = try store.connectAndSubscribe(cleanSession: true).wait() - - _ = try store.client?.publish( - to: "test/topic", - payload: ByteBufferAllocator().buffer(string: "test"), - qos: .atLeastOnce - ).wait() - - Thread.sleep(forTimeInterval: 2) - - XCTAssertEqual(store.state.messages.count, 1) - XCTAssertEqual(store.state.messages[0], "test") - try store.destroy().wait() - } - - func createClient(identifier: String? = nil) -> MQTTClient { - MQTTClient( - host: serverDetails.hostname, - identifier: identifier ?? serverDetails.identifier, - eventLoopGroupProvider: .createNew, - logger: logger, - configuration: .init( - version: serverDetails.version, - userName: serverDetails.username, - password: serverDetails.password, - useSSL: serverDetails.useTLS, - useWebSockets: serverDetails.useWebSocket, - webSocketURLPath: serverDetails.webSocketUrl - ) - ) - } - - func createTestStore() -> MQTTStore { - .init( - state: .init(messages: []), - subscriptions: [("test/topic", stateHandler(_:_:))], - serverDetails: serverDetails, - eventLoopGroup: MultiThreadedEventLoopGroup.init(numberOfThreads: 1), - logger: logger - ) - } - - let logger: Logger = { - var logger = Logger(label: "MQTT Test") - logger.logLevel = .trace - return logger - }() - - var serverDetails: ServerDetails { - .init( - identifier: "Test Server", - hostname: "localhost", - port: 1883, - version: .v3_1_1, - cleanSession: true, - useTLS: false, - useWebSocket: false, - webSocketUrl: "/mqtt", - username: nil, - password: nil - ) - } - - struct TestState { - var messages: [String] - } - - func stateHandler(_ state: inout TestState, _ result: Result) { - switch result { - case let .success(value): - let payload = String(buffer: value.payload) - state.messages.append(payload) - case .failure: - break - } - } -}