Cleaned up / removed old MQTT client.

This commit is contained in:
2021-10-31 09:19:10 -04:00
parent 7181476aaf
commit 31eaa6ade1
12 changed files with 236 additions and 775 deletions

View File

@@ -3,6 +3,7 @@ import Logging
import Models
import MQTTNIO
import NIO
import Psychrometrics
/// Represents a type that can be initialized by a ``ByteBuffer``.
protocol BufferInitalizable {
@@ -33,154 +34,219 @@ extension RelativeHumidity: BufferInitalizable {
}
}
/// Represents errors thrown while communicating with the MQTT Broker.
enum MQTTError: Error {
extension MQTTNIO.MQTTClient {
/// Sensor error.
case sensor(reason: String, error: Error?)
/// Relay error.
case relay(reason: String, error: Error?)
}
protocol FetchableTopic {
associatedtype Value: BufferInitalizable
var topic: String { get }
}
extension Double: BufferInitalizable {
init?(buffer: inout ByteBuffer) {
guard let string = buffer.readString(length: buffer.readableBytes) else {
return nil
}
self.init(string)
func logFailure(topic: String, error: Error) {
logger.error("\(topic): \(error)")
}
}
//extension SetPoint: FetchableTopic {
// typealias Value = Double
//}
extension Result where Success == MQTTPublishInfo {
func logIfFailure(client: MQTTNIO.MQTTClient, topic: String) -> ByteBuffer? {
switch self {
case let .success(value):
guard value.topicName == topic else { return nil }
return value.payload
case let .failure(error):
client.logFailure(topic: topic, error: error)
return nil
}
}
}
extension Sensor: FetchableTopic where Reading: BufferInitalizable {
typealias Value = Reading
extension Optional where Wrapped == ByteBuffer {
func parse<T>(as type: T.Type) -> T? where T: BufferInitalizable {
switch self {
case var .some(buffer):
return T.init(buffer: &buffer)
case .none:
return nil
}
}
}
fileprivate struct TemperatureAndHumiditySensorKeyPathEnvelope {
let humidityTopic: KeyPath<Topics.Sensors, String>
let temperatureTopic: KeyPath<Topics.Sensors, String>
let temperatureState: WritableKeyPath<State.Sensors, Temperature?>
let humidityState: WritableKeyPath<State.Sensors, RelativeHumidity?>
func addListener(to client: MQTTNIO.MQTTClient, topics: Topics, state: State) {
let temperatureTopic = topics.sensors[keyPath: temperatureTopic]
client.logger.trace("Adding listener for topic: \(temperatureTopic)")
client.addPublishListener(named: temperatureTopic) { result in
result.logIfFailure(client: client, topic: temperatureTopic)
.parse(as: Temperature.self)
.map { temperature in
state.sensors[keyPath: temperatureState] = temperature
}
}
let humidityTopic = topics.sensors[keyPath: humidityTopic]
client.logger.trace("Adding listener for topic: \(humidityTopic)")
client.addPublishListener(named: humidityTopic) { result in
result.logIfFailure(client: client, topic: humidityTopic)
.parse(as: RelativeHumidity.self)
.map { humidity in
state.sensors[keyPath: humidityState] = humidity
}
}
}
}
extension Array where Element == TemperatureAndHumiditySensorKeyPathEnvelope {
func addListeners(to client: MQTTNIO.MQTTClient, topics: Topics, state: State) {
_ = self.map { envelope in
envelope.addListener(to: client, topics: topics, state: state)
}
}
}
extension Array where Element == MQTTSubscribeInfo {
static func sensors(topics: Topics) -> Self {
[
.init(topicFilter: topics.sensors.mixedAirSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.mixedAirSensor.humidity, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.postCoilSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.postCoilSensor.humidity, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.returnAirSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.returnAirSensor.humidity, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.supplyAirSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.supplyAirSensor.humidity, qos: .atLeastOnce),
]
}
}
extension State {
func addSensorListeners(to client: MQTTNIO.MQTTClient, topics: Topics) {
let envelopes: [TemperatureAndHumiditySensorKeyPathEnvelope] = [
.init(
humidityTopic: \.mixedAirSensor.humidity,
temperatureTopic: \.mixedAirSensor.temperature,
temperatureState: \.mixedAirSensor.temperature,
humidityState: \.mixedAirSensor.humidity
),
.init(
humidityTopic: \.postCoilSensor.humidity,
temperatureTopic: \.postCoilSensor.temperature,
temperatureState: \.postCoilSensor.temperature,
humidityState: \.postCoilSensor.humidity
),
.init(
humidityTopic: \.returnAirSensor.humidity,
temperatureTopic: \.returnAirSensor.temperature,
temperatureState: \.returnAirSensor.temperature,
humidityState: \.returnAirSensor.humidity
),
.init(
humidityTopic: \.supplyAirSensor.humidity,
temperatureTopic: \.supplyAirSensor.temperature,
temperatureState: \.supplyAirSensor.temperature,
humidityState: \.supplyAirSensor.humidity
),
]
envelopes.addListeners(to: client, topics: topics, state: self)
}
}
extension Client.SensorPublishRequest {
func dewPointData(topics: Topics, units: PsychrometricEnvironment.Units?) -> (DewPoint, String)? {
switch self {
case let .mixed(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.mixedAirSensor.dewPoint)
case let .postCoil(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.postCoilSensor.dewPoint)
case let .return(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.returnAirSensor.dewPoint)
case let .supply(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.supplyAirSensor.dewPoint)
}
}
func enthalpyData(altitude: Length, topics: Topics, units: PsychrometricEnvironment.Units?) -> (EnthalpyOf<MoistAir>, String)? {
switch self {
case let .mixed(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.mixedAirSensor.enthalpy)
case let .postCoil(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.postCoilSensor.enthalpy)
case let .return(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.returnAirSensor.enthalpy)
case let .supply(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.supplyAirSensor.enthalpy)
}
}
func setHasProcessed(state: State) {
switch self {
case .mixed:
state.sensors.mixedAirSensor.needsProcessed = false
case .postCoil:
state.sensors.postCoilSensor.needsProcessed = false
case .return:
state.sensors.returnAirSensor.needsProcessed = false
case .supply:
state.sensors.supplyAirSensor.needsProcessed = false
}
}
}
extension MQTTNIO.MQTTClient {
func mqttSubscription(
topic: String,
qos: MQTTQoS = .atLeastOnce,
retainAsPublished: Bool = true,
retainHandling: MQTTSubscribeInfoV5.RetainHandling = .sendAlways
) -> MQTTSubscribeInfoV5 {
.init(topicFilter: topic, qos: qos, retainAsPublished: retainAsPublished, retainHandling: retainHandling)
}
func fetch<Value>(
_ subscription: MQTTSubscribeInfoV5
) -> EventLoopFuture<Value> where Value: BufferInitalizable {
logger.debug("Fetching data for: \(subscription.topicFilter)")
return v5.subscribe(to: [subscription])
.flatMap { _ in
let promise = self.eventLoopGroup.next().makePromise(of: Value.self)
self.addPublishListener(named: subscription.topicFilter + "-listener") { result in
result.mapBuffer(to: Value.self)
.unwrap(or: MQTTError.sensor(reason: "Invalid sensor reading", error: nil))
.fullfill(promise: promise)
self.logger.debug("Done fetching data for: \(subscription.topicFilter)")
}
return promise.futureResult
}
}
/// Fetch a sensor state and convert it appropriately, when the sensor type is ``BufferInitializable``.
///
/// - Parameters:
/// - sensor: The sensor to fetch the state of.
func fetch<S>(
sensor: Sensor<S>
) -> EventLoopFuture<S> where S: BufferInitalizable {
return fetch(mqttSubscription(topic: sensor.topic))
}
func fetch(setPoint: KeyPath<Topics.SetPoints, String>, setPoints: Topics.SetPoints) -> EventLoopFuture<Double> {
return fetch(mqttSubscription(topic: setPoints[keyPath: setPoint]))
}
func `set`(relay relayTopic: String, to state: Relay.State, qos: MQTTQoS = .atLeastOnce) -> EventLoopFuture<Void> {
publish(
to: relayTopic,
payload: ByteBufferAllocator().buffer(string: state.rawValue),
qos: qos
func publishDewPoint(
request: Client.SensorPublishRequest,
state: State,
topics: Topics
) -> EventLoopFuture<(MQTTNIO.MQTTClient, Client.SensorPublishRequest, State, Topics)> {
guard let (dewPoint, topic) = request.dewPointData(topics: topics, units: state.units)
else {
logger.trace("No dew point for sensor.")
return eventLoopGroup.next().makeSucceededFuture((self, request, state, topics))
}
logger.debug("Publishing dew-point: \(dewPoint), to: \(topic)")
return publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(dewPoint.rawValue)"),
qos: .atLeastOnce
)
.map { (self, request, state, topics) }
}
}
extension Result where Success == MQTTPublishInfo, Failure == Error {
func mapBuffer<S>(to type: S.Type) -> Result<S?, Error> where S: BufferInitalizable {
map { info in
var buffer = info.payload
return S.init(buffer: &buffer)
extension EventLoopFuture where Value == (Client.SensorPublishRequest, State) {
func setHasProcessed() -> EventLoopFuture<Void> {
map { request, state in
request.setHasProcessed(state: state)
}
}
}
extension Result {
func fullfill(promise: EventLoopPromise<Success>) {
switch self {
case let.success(value):
promise.succeed(value)
case let .failure(error):
promise.fail(error)
}
}
}
extension Result where Failure == Error {
func unwrap<S, F>(
or error: @autoclosure @escaping () -> F
) -> Result<S, Error> where Success == Optional<S>, Failure == F {
flatMap { optionalResult in
guard let value = optionalResult else {
return .failure(error())
extension EventLoopFuture where Value == (MQTTNIO.MQTTClient, Client.SensorPublishRequest, State, Topics) {
func publishEnthalpy() -> EventLoopFuture<(Client.SensorPublishRequest, State)> {
flatMap { client, request, state, topics in
guard let (enthalpy, topic) = request.enthalpyData(altitude: state.altitude, topics: topics, units: state.units)
else {
client.logger.trace("No enthalpy for sensor.")
return client.eventLoopGroup.next().makeSucceededFuture((request, state))
}
return .success(value)
}
}
}
extension Temperature {
func convert(to units: PsychrometricEnvironment.Units) -> Self {
let temperatureUnits = Units.defaultFor(units: units)
return .init(self[temperatureUnits], units: temperatureUnits)
}
}
extension EventLoopFuture where Value == Temperature {
func convertIfNeeded(to units: PsychrometricEnvironment.Units?) -> EventLoopFuture<Temperature> {
map { currentTemperature in
guard let units = units else { return currentTemperature }
return currentTemperature.convert(to: units)
}
}
}
extension EventLoopFuture {
func debug(logger: Logger?) -> EventLoopFuture<Value> {
map { value in
logger?.debug("Value: \(value)")
return value
client.logger.debug("Publishing enthalpy: \(enthalpy), to: \(topic)")
return client.publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(enthalpy.rawValue)"),
qos: .atLeastOnce
)
.map { (request, state) }
}
}
}

View File

@@ -6,47 +6,7 @@ import MQTTNIO
import NIO
import Psychrometrics
extension Client.MQTTClient {
/// Creates the live implementation of our ``Client.MQTTClient`` for the application.
///
/// - Parameters:
/// - client: The ``MQTTNIO.MQTTClient`` used to send and recieve messages from the MQTT Broker.
public static func live(client: MQTTNIO.MQTTClient, topics: Topics) -> Self {
.init(
fetchHumidity: { sensor in
client.fetch(sensor: sensor)
.debug(logger: client.logger)
},
fetchSetPoint: { setPointKeyPath in
client.fetch(client.mqttSubscription(topic: topics.setPoints[keyPath: setPointKeyPath]))
.debug(logger: client.logger)
},
fetchTemperature: { sensor, units in
client.fetch(sensor: sensor)
.debug(logger: client.logger)
.convertIfNeeded(to: units)
.debug(logger: client.logger)
},
setRelay: { relayKeyPath, state in
client.set(relay: topics.commands.relays[keyPath: relayKeyPath], to: state)
},
shutdown: {
client.disconnect()
.map { try? client.syncShutdownGracefully() }
},
publishDewPoint: { dewPoint, topic in
client.publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(dewPoint.rawValue)"),
qos: .atLeastOnce
)
}
)
}
}
extension Client2 {
extension Client {
// The state passed in here needs to be a class or we get escaping errors in the `addListeners` method.
public static func live(
@@ -79,227 +39,3 @@ extension Client2 {
)
}
}
// MARK: - Client2 Helpers.
extension MQTTNIO.MQTTClient {
func logFailure(topic: String, error: Error) {
logger.error("\(topic): \(error)")
}
}
extension Result where Success == MQTTPublishInfo {
func logIfFailure(client: MQTTNIO.MQTTClient, topic: String) -> ByteBuffer? {
switch self {
case let .success(value):
guard value.topicName == topic else { return nil }
return value.payload
case let .failure(error):
client.logFailure(topic: topic, error: error)
return nil
}
}
}
extension Optional where Wrapped == ByteBuffer {
func parse<T>(as type: T.Type) -> T? where T: BufferInitalizable {
switch self {
case var .some(buffer):
return T.init(buffer: &buffer)
case .none:
return nil
}
}
}
struct TemperatureAndHumiditySensorKeyPathEnvelope {
let humidityTopic: KeyPath<Topics.Sensors, String>
let temperatureTopic: KeyPath<Topics.Sensors, String>
let temperatureState: WritableKeyPath<State.Sensors, Temperature?>
let humidityState: WritableKeyPath<State.Sensors, RelativeHumidity?>
func addListener(to client: MQTTNIO.MQTTClient, topics: Topics, state: State) {
let temperatureTopic = topics.sensors[keyPath: temperatureTopic]
client.logger.trace("Adding listener for topic: \(temperatureTopic)")
client.addPublishListener(named: temperatureTopic) { result in
result.logIfFailure(client: client, topic: temperatureTopic)
.parse(as: Temperature.self)
.map { temperature in
state.sensors[keyPath: temperatureState] = temperature
}
}
let humidityTopic = topics.sensors[keyPath: humidityTopic]
client.logger.trace("Adding listener for topic: \(humidityTopic)")
client.addPublishListener(named: humidityTopic) { result in
result.logIfFailure(client: client, topic: humidityTopic)
.parse(as: RelativeHumidity.self)
.map { humidity in
state.sensors[keyPath: humidityState] = humidity
}
}
}
}
extension Array where Element == TemperatureAndHumiditySensorKeyPathEnvelope {
func addListeners(to client: MQTTNIO.MQTTClient, topics: Topics, state: State) {
_ = self.map { envelope in
envelope.addListener(to: client, topics: topics, state: state)
}
}
}
extension Array where Element == MQTTSubscribeInfo {
static func sensors(topics: Topics) -> Self {
[
.init(topicFilter: topics.sensors.mixedAirSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.mixedAirSensor.humidity, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.postCoilSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.postCoilSensor.humidity, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.returnAirSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.returnAirSensor.humidity, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.supplyAirSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.supplyAirSensor.humidity, qos: .atLeastOnce),
]
}
}
extension State {
func addSensorListeners(to client: MQTTNIO.MQTTClient, topics: Topics) {
let envelopes: [TemperatureAndHumiditySensorKeyPathEnvelope] = [
.init(
humidityTopic: \.mixedAirSensor.humidity,
temperatureTopic: \.mixedAirSensor.temperature,
temperatureState: \.mixedAirSensor.temperature,
humidityState: \.mixedAirSensor.humidity
),
.init(
humidityTopic: \.postCoilSensor.humidity,
temperatureTopic: \.postCoilSensor.temperature,
temperatureState: \.postCoilSensor.temperature,
humidityState: \.postCoilSensor.humidity
),
.init(
humidityTopic: \.returnAirSensor.humidity,
temperatureTopic: \.returnAirSensor.temperature,
temperatureState: \.returnAirSensor.temperature,
humidityState: \.returnAirSensor.humidity
),
.init(
humidityTopic: \.supplyAirSensor.humidity,
temperatureTopic: \.supplyAirSensor.temperature,
temperatureState: \.supplyAirSensor.temperature,
humidityState: \.supplyAirSensor.humidity
),
]
envelopes.addListeners(to: client, topics: topics, state: self)
}
}
extension Client2.SensorPublishRequest {
func dewPointData(topics: Topics, units: PsychrometricEnvironment.Units?) -> (DewPoint, String)? {
switch self {
case let .mixed(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.mixedAirSensor.dewPoint)
case let .postCoil(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.postCoilSensor.dewPoint)
case let .return(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.returnAirSensor.dewPoint)
case let .supply(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.supplyAirSensor.dewPoint)
}
}
func enthalpyData(altitude: Length, topics: Topics, units: PsychrometricEnvironment.Units?) -> (EnthalpyOf<MoistAir>, String)? {
switch self {
case let .mixed(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.mixedAirSensor.enthalpy)
case let .postCoil(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.postCoilSensor.enthalpy)
case let .return(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.returnAirSensor.enthalpy)
case let .supply(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.supplyAirSensor.enthalpy)
}
}
func setHasProcessed(state: State) {
switch self {
case .mixed:
state.sensors.mixedAirSensor.needsProcessed = false
case .postCoil:
state.sensors.postCoilSensor.needsProcessed = false
case .return:
state.sensors.returnAirSensor.needsProcessed = false
case .supply:
state.sensors.supplyAirSensor.needsProcessed = false
}
}
}
extension MQTTNIO.MQTTClient {
func publishDewPoint(
request: Client2.SensorPublishRequest,
state: State,
topics: Topics
) -> EventLoopFuture<(MQTTNIO.MQTTClient, Client2.SensorPublishRequest, State, Topics)> {
guard let (dewPoint, topic) = request.dewPointData(topics: topics, units: state.units)
else {
logger.trace("No dew point for sensor.")
return eventLoopGroup.next().makeSucceededFuture((self, request, state, topics))
}
logger.debug("Publishing dew-point: \(dewPoint), to: \(topic)")
return publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(dewPoint.rawValue)"),
qos: .atLeastOnce
)
.map { (self, request, state, topics) }
}
}
extension EventLoopFuture where Value == (Client2.SensorPublishRequest, State) {
func setHasProcessed(
// request: Client2.SensorPublishRequest, state: State
) -> EventLoopFuture<Void> {
map { request, state in
request.setHasProcessed(state: state)
}
}
}
extension EventLoopFuture where Value == (MQTTNIO.MQTTClient, Client2.SensorPublishRequest, State, Topics) {
func publishEnthalpy(
// request: Client2.SensorPublishRequest,
// state: State,
// topics: Topics
) -> EventLoopFuture<(Client2.SensorPublishRequest, State)> {
flatMap { client, request, state, topics in
guard let (enthalpy, topic) = request.enthalpyData(altitude: state.altitude, topics: topics, units: state.units)
else {
client.logger.trace("No enthalpy for sensor.")
return client.eventLoopGroup.next().makeSucceededFuture((request, state))
}
client.logger.debug("Publishing enthalpy: \(enthalpy), to: \(topic)")
return client.publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(enthalpy.rawValue)"),
qos: .atLeastOnce
)
.map { (request, state) }
}
}
}