Cleaning up the api.
This commit is contained in:
139
Sources/ClientLive/Helpers.swift
Normal file
139
Sources/ClientLive/Helpers.swift
Normal file
@@ -0,0 +1,139 @@
|
||||
import CoreUnitTypes
|
||||
import Models
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
|
||||
/// Represents a type that can be initialized by a ``ByteBuffer``.
|
||||
protocol BufferInitalizable {
|
||||
init?(buffer: inout ByteBuffer)
|
||||
}
|
||||
|
||||
extension Temperature: BufferInitalizable {
|
||||
|
||||
init?(buffer: inout ByteBuffer) {
|
||||
guard let string = buffer.readString(length: buffer.readableBytes, encoding: .utf8),
|
||||
let value = Double(string)
|
||||
else {
|
||||
return nil
|
||||
}
|
||||
self.init(value, units: .celsius)
|
||||
}
|
||||
}
|
||||
|
||||
extension RelativeHumidity: BufferInitalizable {
|
||||
|
||||
init?(buffer: inout ByteBuffer) {
|
||||
guard let string = buffer.readString(length: buffer.readableBytes, encoding: .utf8),
|
||||
let value = Double(string)
|
||||
else {
|
||||
return nil
|
||||
}
|
||||
self.init(value)
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents errors thrown while communicating with the MQTT Broker.
|
||||
enum MQTTError: Error {
|
||||
|
||||
/// Sensor error.
|
||||
case sensor(reason: String, error: Error?)
|
||||
|
||||
/// Relay error.
|
||||
case relay(reason: String, error: Error?)
|
||||
}
|
||||
|
||||
extension MQTTNIO.MQTTClient {
|
||||
|
||||
/// Fetch a sensor state and convert it appropriately, when the sensor type is ``BufferInitializable``.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - sensor: The sensor to fetch the state of.
|
||||
func fetch<S>(
|
||||
sensor: Sensor<S>
|
||||
) -> EventLoopFuture<S> where S: BufferInitalizable {
|
||||
logger.debug("Fetching data for sensor: \(sensor.topic)")
|
||||
let subscription = MQTTSubscribeInfoV5.init(
|
||||
topicFilter: sensor.topic,
|
||||
qos: .atLeastOnce,
|
||||
retainAsPublished: true,
|
||||
retainHandling: .sendAlways
|
||||
)
|
||||
return v5.subscribe(to: [subscription])
|
||||
.flatMap { _ in
|
||||
let promise = self.eventLoopGroup.next().makePromise(of: S.self)
|
||||
self.addPublishListener(named: sensor.topic) { result in
|
||||
|
||||
result.mapBuffer(to: S.self)
|
||||
.unwrap(or: MQTTError.sensor(reason: "Invalid sensor reading", error: nil))
|
||||
.fullfill(promise: promise)
|
||||
|
||||
self.logger.debug("Done fetching data for sensor: \(sensor.topic)")
|
||||
}
|
||||
|
||||
return promise.futureResult
|
||||
}
|
||||
}
|
||||
|
||||
func `set`(relay: Relay, to state: Relay.State, qos: MQTTQoS = .atLeastOnce) -> EventLoopFuture<Void> {
|
||||
publish(
|
||||
to: relay.topic,
|
||||
payload: ByteBufferAllocator().buffer(string: state.rawValue),
|
||||
qos: qos
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
extension Result where Success == MQTTPublishInfo, Failure == Error {
|
||||
|
||||
func mapBuffer<S>(to type: S.Type) -> Result<S?, Error> where S: BufferInitalizable {
|
||||
map { info in
|
||||
var buffer = info.payload
|
||||
return S.init(buffer: &buffer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension Result {
|
||||
|
||||
func fullfill(promise: EventLoopPromise<Success>) {
|
||||
switch self {
|
||||
case let.success(value):
|
||||
promise.succeed(value)
|
||||
case let .failure(error):
|
||||
promise.fail(error)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
extension Result where Failure == Error {
|
||||
|
||||
func unwrap<S, F>(
|
||||
or error: @autoclosure @escaping () -> F
|
||||
) -> Result<S, Error> where Success == Optional<S>, Failure == F {
|
||||
flatMap { optionalResult in
|
||||
guard let value = optionalResult else {
|
||||
return .failure(error())
|
||||
}
|
||||
return .success(value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension Temperature {
|
||||
|
||||
func convert(to units: PsychrometricEnvironment.Units) -> Self {
|
||||
let temperatureUnits = Units.defaultFor(units: units)
|
||||
return .init(self[temperatureUnits], units: temperatureUnits)
|
||||
}
|
||||
}
|
||||
|
||||
extension EventLoopFuture where Value == Temperature {
|
||||
|
||||
func convertIfNeeded(to units: PsychrometricEnvironment.Units?) -> EventLoopFuture<Temperature> {
|
||||
map { currentTemperature in
|
||||
guard let units = units else { return currentTemperature }
|
||||
return currentTemperature.convert(to: units)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5,135 +5,35 @@ import Models
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
|
||||
extension Client {
|
||||
extension Client.MQTTClient {
|
||||
|
||||
public static func live(client: MQTTClient) -> Self {
|
||||
/// Creates the live implementation of our ``Client.MQTTClient`` for the application.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - client: The ``MQTTNIO.MQTTClient`` used to send and recieve messages from the MQTT Broker.
|
||||
public static func live(client: MQTTNIO.MQTTClient) -> Self {
|
||||
.init(
|
||||
fetchHumidity: { sensor in
|
||||
client.fetchHumidity(sensor: sensor)
|
||||
client.fetch(sensor: sensor)
|
||||
},
|
||||
fetchTemperature: { sensor, units in
|
||||
client.fetchTemperature(sensor: sensor, units: units)
|
||||
client.fetch(sensor: sensor)
|
||||
.convertIfNeeded(to: units)
|
||||
},
|
||||
toggleRelay: { relay in
|
||||
client.publish(relay: relay, state: .toggle, qos: .atLeastOnce)
|
||||
},
|
||||
turnOnRelay: { relay in
|
||||
client.publish(relay: relay, state: .on, qos: .atLeastOnce)
|
||||
},
|
||||
turnOffRelay: { relay in
|
||||
client.publish(relay: relay, state: .off, qos: .atLeastOnce)
|
||||
setRelay: { relay, state in
|
||||
client.set(relay: relay, to: state)
|
||||
},
|
||||
shutdown: {
|
||||
client.disconnect()
|
||||
.map { try? client.syncShutdownGracefully() }
|
||||
},
|
||||
publishDewPoint: { dewPoint, topic in
|
||||
client.publish(
|
||||
to: topic,
|
||||
payload: ByteBufferAllocator().buffer(string: "\(dewPoint.rawValue)"),
|
||||
qos: .atLeastOnce
|
||||
)
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Helpers
|
||||
enum TemperatureError: Error {
|
||||
case invalidTemperature
|
||||
}
|
||||
|
||||
enum HumidityError: Error {
|
||||
case invalidHumidity
|
||||
}
|
||||
|
||||
extension Relay {
|
||||
enum State: String {
|
||||
case toggle, on, off
|
||||
}
|
||||
}
|
||||
|
||||
extension MQTTClient {
|
||||
|
||||
fileprivate func publish(relay: Relay, state: Relay.State, qos: MQTTQoS = .atLeastOnce) -> EventLoopFuture<Void> {
|
||||
publish(
|
||||
to: relay.topic,
|
||||
payload: ByteBufferAllocator().buffer(string: state.rawValue),
|
||||
qos: qos
|
||||
)
|
||||
}
|
||||
|
||||
// MARK: - TODO it feels like the subscriptions should happen in the `bootstrap` process.
|
||||
fileprivate func fetchTemperature(
|
||||
sensor: TemperatureSensor,
|
||||
units: PsychrometricEnvironment.Units?
|
||||
) -> EventLoopFuture<Temperature> {
|
||||
logger.debug("Adding listener for temperature sensor...")
|
||||
let subscription = MQTTSubscribeInfoV5.init(
|
||||
topicFilter: sensor.topic,
|
||||
qos: .atLeastOnce,
|
||||
retainAsPublished: true,
|
||||
retainHandling: .sendAlways
|
||||
)
|
||||
return v5.subscribe(to: [subscription])
|
||||
.flatMap { _ in
|
||||
let promise = self.eventLoopGroup.next().makePromise(of: Temperature.self)
|
||||
self.addPublishListener(named: "temperature-sensor", { result in
|
||||
switch result.temperature() {
|
||||
case let .success(celsius):
|
||||
let userUnits = units ?? PsychrometricEnvironment.shared.units
|
||||
let temperatureUnits = Temperature.Units.defaultFor(units: userUnits)
|
||||
promise.succeed(.init(celsius[temperatureUnits], units: temperatureUnits))
|
||||
case let .failure(error):
|
||||
promise.fail(error)
|
||||
}
|
||||
})
|
||||
|
||||
return promise.futureResult
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - TODO it feels like the subscriptions should happen in the `bootstrap` process.
|
||||
fileprivate func fetchHumidity(sensor: HumiditySensor) -> EventLoopFuture<RelativeHumidity> {
|
||||
logger.debug("Adding listener for humidity sensor...")
|
||||
let subscription = MQTTSubscribeInfoV5.init(
|
||||
topicFilter: sensor.topic,
|
||||
qos: .atLeastOnce,
|
||||
retainAsPublished: true,
|
||||
retainHandling: .sendAlways
|
||||
)
|
||||
return v5.subscribe(to: [subscription])
|
||||
.flatMap { _ in
|
||||
let promise = self.eventLoopGroup.next().makePromise(of: RelativeHumidity.self)
|
||||
self.addPublishListener(named: "humidity-sensor", { result in
|
||||
switch result.humidity() {
|
||||
case let .success(humidity):
|
||||
promise.succeed(humidity)
|
||||
case let .failure(error):
|
||||
promise.fail(error)
|
||||
}
|
||||
})
|
||||
return promise.futureResult
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension Result where Success == MQTTPublishInfo, Failure == Error {
|
||||
|
||||
fileprivate func humidity() -> Result<RelativeHumidity, Error> {
|
||||
flatMap { info in
|
||||
var buffer = info.payload
|
||||
guard let string = buffer.readString(length: buffer.readableBytes),
|
||||
let double = Double(string)
|
||||
else {
|
||||
return .failure(HumidityError.invalidHumidity)
|
||||
}
|
||||
return .success(.init(double))
|
||||
}
|
||||
}
|
||||
|
||||
fileprivate func temperature() -> Result<Temperature, Error> {
|
||||
flatMap { info in
|
||||
var buffer = info.payload
|
||||
guard let string = buffer.readString(length: buffer.readableBytes),
|
||||
let temperatureValue = Double(string)
|
||||
else {
|
||||
return .failure(TemperatureError.invalidTemperature)
|
||||
}
|
||||
return .success(.celsius(temperatureValue))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user