diff --git a/.swiftformat b/.swiftformat index d812125..b7d8e75 100644 --- a/.swiftformat +++ b/.swiftformat @@ -2,7 +2,8 @@ --indent 2 --ifdef indent --trimwhitespace always ---wraparguments preserve +--wraparguments before-first +--wrapparameters before-first --wrapcollections preserve --wrapconditions after-first --typeblanklines preserve diff --git a/Sources/ClientLive/AsyncClient.swift b/Sources/ClientLive/AsyncClient.swift new file mode 100644 index 0000000..396dcae --- /dev/null +++ b/Sources/ClientLive/AsyncClient.swift @@ -0,0 +1,226 @@ +import EnvVars +import Logging +import Models +import MQTTNIO +import NIO +import Psychrometrics + +public class AsyncClient { + + public static let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + public let client: MQTTClient + public private(set) var shuttingDown: Bool + public private(set) var sensors: [TemperatureAndHumiditySensor] + + var logger: Logger { client.logger } + + public init( + envVars: EnvVars, + logger: Logger, + sensors: [TemperatureAndHumiditySensor] = [] + ) { + let config = MQTTClient.Configuration( + version: .v3_1_1, + userName: envVars.userName, + password: envVars.password, + useSSL: false, + useWebSockets: false, + tlsConfiguration: nil, + webSocketURLPath: nil + ) + self.client = MQTTClient( + host: envVars.host, + identifier: envVars.identifier, + eventLoopGroupProvider: .shared(Self.eventLoopGroup), + logger: logger, + configuration: config + ) + self.shuttingDown = false + self.sensors = sensors + } + + public func addSensor(_ sensor: TemperatureAndHumiditySensor) throws { + guard sensors.firstIndex(where: { $0.location == sensor.location }) == nil else { + throw SensorExists() + } + sensors.append(sensor) + } + + public func connect() async { + do { + try await client.connect() + client.addCloseListener(named: "AsyncClient") { [self] _ in + guard !self.shuttingDown else { return } + Task { + self.logger.debug("Connection closed.") + self.logger.debug("Reconnecting...") + await self.connect() + } + } + logger.debug("Connection successful.") + } catch { + logger.trace("Connection Failed.\n\(error)") + } + } + + public func shutdown() async { + shuttingDown = true + try? await client.disconnect() + try? await client.shutdown() + } + + /// Subscribe to changes of the temperature and humidity sensors. + func subscribeToSensors(qos: MQTTQoS = .exactlyOnce) async throws { + for sensor in sensors { + try await client.subscribeToSensor(sensor, qos: qos) + } + } + + func addSensorListeners(qos: MQTTQoS = .exactlyOnce) async throws { + for sensor in sensors { + try await client.subscribeToSensor(sensor, qos: qos) + let listener = client.createPublishListener() + for await result in listener { + switch result { + case let .success(value): + var buffer = value.payload + let topic = value.topicName + logger.debug("Received new value for topic: \(topic)") + + if topic.contains("temperature") { + // Decode and update the temperature value + guard let temperature = Temperature(buffer: &buffer) else { + logger.debug("Failed to decode temperature from buffer: \(buffer)") + throw DecodingError() + } + try sensors.update(topic: topic, keyPath: \.temperature, with: temperature) + + } else if topic.contains("humidity") { + // Decode and update the temperature value + guard let humidity = RelativeHumidity(buffer: &buffer) else { + logger.debug("Failed to decode humidity from buffer: \(buffer)") + throw DecodingError() + } + try sensors.update(topic: topic, keyPath: \.humidity, with: humidity) + + } else { + let message = """ + Unexpected value for topic: \(topic) + Expected to contain either 'temperature' or 'humidity' + """ + logger.debug("\(message)") + } + + // TODO: Publish dew-point & enthalpy if needed. + + case let .failure(error): + logger.trace("Error:\n\(error)") + throw error + } + } + } + } + + // Need to save the recieved values somewhere. + // TODO: Remove. + func addPublishListener( + topic: String, + decoding _: T.Type + ) async throws where T: BufferInitalizable { + _ = try await client.subscribe(to: [.init(topicFilter: topic, qos: .atLeastOnce)]) + Task { + let listener = self.client.createPublishListener() + for await result in listener { + switch result { + case let .success(packet): + var buffer = packet.payload + guard let value = T(buffer: &buffer) else { + logger.debug("Could not decode buffer: \(buffer)") + return + } + logger.debug("Recieved value: \(value)") + case let .failure(error): + logger.trace("Error:\n\(error)") + } + } + } + } + + private func publish(string: String, to topic: String) async throws { + try await client.publish( + to: topic, + payload: ByteBufferAllocator().buffer(string: string), + qos: .atLeastOnce + ) + } + + private func publish(double: Double, to topic: String) async throws { + let rounded = round(double * 100) / 100 + try await publish(string: "\(rounded)", to: topic) + } + + func publishDewPoint(_ request: Client.SensorPublishRequest) async throws { + // fix + guard let (dewPoint, topic) = request.dewPointData(topics: .init(), units: nil) else { return } + try await publish(double: dewPoint.rawValue, to: topic) + logger.debug("Published dewpoint: \(dewPoint.rawValue), to: \(topic)") + } + + func publishEnthalpy(_ request: Client.SensorPublishRequest) async throws { + // fix + guard let (enthalpy, topic) = request.enthalpyData(altitude: .seaLevel, topics: .init(), units: nil) else { return } + try await publish(double: enthalpy.rawValue, to: topic) + logger.debug("Publihsed enthalpy: \(enthalpy.rawValue), to: \(topic)") + } + + public func publishSensor(_ request: Client.SensorPublishRequest) async throws { + try await publishDewPoint(request) + try await publishEnthalpy(request) + } +} + +// MARK: - Helpers + +private extension MQTTClient { + + func subscribeToSensor( + _ sensor: TemperatureAndHumiditySensor, + qos: MQTTQoS = .exactlyOnce + ) async throws { + do { + _ = try await subscribe(to: [ + MQTTSubscribeInfo(topicFilter: sensor.topics.temperature, qos: qos), + MQTTSubscribeInfo(topicFilter: sensor.topics.humidity, qos: qos) + ]) + logger.debug("Subscribed to temperature-humidity sensor: \(sensor.id)") + } catch { + logger.trace("Failed to subscribe to temperature-humidity sensor: \(sensor.id)") + throw error + } + } +} + +struct DecodingError: Error {} +struct NotFoundError: Error {} +struct SensorExists: Error {} + +extension TemperatureAndHumiditySensor.Topics { + func contains(_ topic: String) -> Bool { + temperature == topic || humidity == topic + } +} + +extension Array where Element == TemperatureAndHumiditySensor { + + mutating func update( + topic: String, + keyPath: WritableKeyPath, + with value: V + ) throws { + guard let index = firstIndex(where: { $0.topics.contains(topic) }) else { + throw NotFoundError() + } + self[index][keyPath: keyPath] = value + } + +} diff --git a/Sources/ClientLive/Live.swift b/Sources/ClientLive/Live.swift index 0a80053..068a983 100755 --- a/Sources/ClientLive/Live.swift +++ b/Sources/ClientLive/Live.swift @@ -1,15 +1,15 @@ -import Foundation @_exported import Client import CoreUnitTypes +import Foundation import Models import MQTTNIO import NIO import Psychrometrics -extension Client { +public extension Client { // The state passed in here needs to be a class or we get escaping errors in the `addListeners` method. - public static func live( + static func live( client: MQTTNIO.MQTTClient, state: State, topics: Topics @@ -34,124 +34,8 @@ extension Client { subscribe: { // Sensor subscriptions client.subscribe(to: .sensors(topics: topics)) - .map { _ in } + .map { _ in } } ) } } - -import Logging -import EnvVars - -public class AsyncClient { - - public static let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) - public let client: MQTTClient - public private(set) var shuttingDown: Bool - - var logger: Logger { client.logger } - - public init(envVars: EnvVars, logger: Logger) { - let config = MQTTClient.Configuration.init( - version: .v3_1_1, - userName: envVars.userName, - password: envVars.password, - useSSL: false, - useWebSockets: false, - tlsConfiguration: nil, - webSocketURLPath: nil - ) - self.client = MQTTClient( - host: envVars.host, - identifier: envVars.identifier, - eventLoopGroupProvider: .shared(Self.eventLoopGroup), - logger: logger, - configuration: config - ) - self.shuttingDown = false - } - - public func connect() async { - do { - try await self.client.connect() - self.client.addCloseListener(named: "AsyncClient") { [self] result in - guard !self.shuttingDown else { return } - Task { - self.logger.debug("Connection closed.") - self.logger.debug("Reconnecting...") - await self.connect() - } - } - logger.debug("Connection successful.") - } catch { - logger.trace("Connection Failed.\n\(error)") - } - } - - public func shutdown() async { - self.shuttingDown = true - try? await self.client.disconnect() - try? await self.client.shutdown() - } - - func addSensorListeners() async { - - } - - // Need to save the recieved values somewhere. - func addPublishListener( - topic: String, - decoding: T.Type - ) async throws where T: BufferInitalizable { - _ = try await self.client.subscribe(to: [.init(topicFilter: topic, qos: .atLeastOnce)]) - Task { - let listener = self.client.createPublishListener() - for await result in listener { - switch result { - case let .success(packet): - var buffer = packet.payload - guard let value = T.init(buffer: &buffer) else { - logger.debug("Could not decode buffer: \(buffer)") - return - } - logger.debug("Recieved value: \(value)") - case let .failure(error): - logger.trace("Error:\n\(error)") - } - } - } - } - - - private func publish(string: String, to topic: String) async throws { - try await self.client.publish( - to: topic, - payload: ByteBufferAllocator().buffer(string: string), - qos: .atLeastOnce - ) - } - - private func publish(double: Double, to topic: String) async throws { - let rounded = round(double * 100) / 100 - try await publish(string: "\(rounded)", to: topic) - } - - func publishDewPoint(_ request: Client.SensorPublishRequest) async throws { - // fix - guard let (dewPoint, topic) = request.dewPointData(topics: .init(), units: nil) else { return } - try await self.publish(double: dewPoint.rawValue, to: topic) - logger.debug("Published dewpoint: \(dewPoint.rawValue), to: \(topic)") - } - - func publishEnthalpy(_ request: Client.SensorPublishRequest) async throws { - // fix - guard let (enthalpy, topic) = request.enthalpyData(altitude: .seaLevel, topics: .init(), units: nil) else { return } - try await self.publish(double: enthalpy.rawValue, to: topic) - logger.debug("Publihsed enthalpy: \(enthalpy.rawValue), to: \(topic)") - } - - public func publishSensor(_ request: Client.SensorPublishRequest) async throws { - try await publishDewPoint(request) - try await publishEnthalpy(request) - } -} diff --git a/Sources/Models/TemperatureAndHumiditySensor.swift b/Sources/Models/TemperatureAndHumiditySensor.swift index fdd067a..40f071a 100644 --- a/Sources/Models/TemperatureAndHumiditySensor.swift +++ b/Sources/Models/TemperatureAndHumiditySensor.swift @@ -3,7 +3,7 @@ import Psychrometrics /// Represents a temperature and humidity sensor that can be used to derive /// the dew-point temperature and enthalpy values. /// -public struct TemperatureAndHumiditySensor: Equatable, Identifiable { +public struct TemperatureAndHumiditySensor: Equatable, Hashable, Identifiable { /// The identifier of the sensor, same as the location. public var id: Location { location } @@ -93,7 +93,7 @@ public struct TemperatureAndHumiditySensor: Equatable, Identifiable { } /// Represents the MQTT topics to listen for updated sensor values on. - public struct Topics: Equatable { + public struct Topics: Equatable, Hashable { /// The temperature topic of the sensor. public let temperature: String @@ -111,7 +111,7 @@ public struct TemperatureAndHumiditySensor: Equatable, Identifiable { init(location: TemperatureAndHumiditySensor.Location) { self.temperature = "sensors/\(location.rawValue)/temperature" - self.humidity = "sensors/\(location.rawValue)/temperature" + self.humidity = "sensors/\(location.rawValue)/humidity" } } } diff --git a/Sources/Models/TrackedChanges.swift b/Sources/Models/TrackedChanges.swift index 77f3ca8..e8ac271 100755 --- a/Sources/Models/TrackedChanges.swift +++ b/Sources/Models/TrackedChanges.swift @@ -88,3 +88,11 @@ extension TrackedChanges: Equatable where Value: Equatable { self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed, isEqual: ==) } } + +extension TrackedChanges: Hashable where Value: Hashable { + + public func hash(into hasher: inout Hasher) { + hasher.combine(wrappedValue) + hasher.combine(needsProcessed) + } +} diff --git a/Tests/ClientTests/AsyncClientTests.swift b/Tests/ClientTests/AsyncClientTests.swift index 847520a..b6f6a70 100755 --- a/Tests/ClientTests/AsyncClientTests.swift +++ b/Tests/ClientTests/AsyncClientTests.swift @@ -45,47 +45,69 @@ final class AsyncClientTests: XCTestCase { await client.shutdown() } - func testNewSensorSyntax() async throws { - let client = createClient(identifier: "testNewSensorSyntax") + func testSensor() async throws { + let client = createClient(identifier: "testSensor") let mqtt = client.client - let receivedPublishInfo = PublishInfoContainer() - let payload = ByteBufferAllocator().buffer(string: "75.123") - let sensor = TemperatureAndHumiditySensor(location: .return) - + try client.addSensor(.init(location: .mixedAir)) await client.connect() - try await mqtt.subscribeToTemperature(sensor: sensor) + Task { try await client.addSensorListeners() } - let listener = mqtt.createPublishListener() - - Task { [receivedPublishInfo] in - for await result in listener { - switch result { - case let .failure(error): - XCTFail("\(error)") - case let .success(publish): - await receivedPublishInfo.addPublishInfo(publish) - } - } - } - - try await mqtt.publish(to: sensor.topics.temperature, payload: payload, qos: .atLeastOnce) + try await mqtt.publish( + to: "sensors/mixed-air/temperture", + payload: ByteBufferAllocator().buffer(string: "75.123"), + qos: .atLeastOnce + ) try await Task.sleep(for: .seconds(2)) - XCTAssertEqual(receivedPublishInfo.count, 1) + XCTAssert(client.sensors.first!.needsProcessed) + XCTAssertEqual(client.sensors.first!.temperature, 75.123) - if let publish = receivedPublishInfo.first { - var buffer = publish.payload - let string = buffer.readString(length: buffer.readableBytes) - XCTAssertEqual(string, "75.123") - } else { - XCTFail("Did not receive any publish info.") - } - - try await mqtt.disconnect() - try mqtt.syncShutdownGracefully() + await client.shutdown() } + +// func testNewSensorSyntax() async throws { +// let client = createClient(identifier: "testNewSensorSyntax") +// let mqtt = client.client +// let receivedPublishInfo = PublishInfoContainer() +// let payload = ByteBufferAllocator().buffer(string: "75.123") +// let sensor = TemperatureAndHumiditySensor(location: .return) +// +// await client.connect() +// +// try await mqtt.subscribeToTemperature(sensor: sensor) +// +// let listener = mqtt.createPublishListener() +// +// Task { [receivedPublishInfo] in +// for await result in listener { +// switch result { +// case let .failure(error): +// XCTFail("\(error)") +// case let .success(publish): +// await receivedPublishInfo.addPublishInfo(publish) +// } +// } +// } +// +// try await mqtt.publish(to: sensor.topics.temperature, payload: payload, qos: .atLeastOnce) +// +// try await Task.sleep(for: .seconds(2)) +// +// XCTAssertEqual(receivedPublishInfo.count, 1) +// +// if let publish = receivedPublishInfo.first { +// var buffer = publish.payload +// let string = buffer.readString(length: buffer.readableBytes) +// XCTAssertEqual(string, "75.123") +// } else { +// XCTFail("Did not receive any publish info.") +// } +// +// try await mqtt.disconnect() +// try mqtt.syncShutdownGracefully() +// } } // MARK: Helpers for tests, some of these should be able to be removed once the AsyncClient interface is done.