From adc7fc12958cfd6ad72ff63bf9c146eaf8fc3bad Mon Sep 17 00:00:00 2001 From: Michael Housh Date: Fri, 8 Nov 2024 17:14:22 -0500 Subject: [PATCH] feat: Working on async integrations. --- Sources/Bootstrap/Bootstrap.swift | 72 +++++---- Sources/ClientLive/Helpers.swift | 1 + ...{AsyncClient.swift => SensorsClient.swift} | 138 ++++++++---------- Sources/DewPointEnvironment/Environment.swift | 4 +- Sources/EnvVars/EnvVars.swift | 20 +-- Sources/Models/Mode.swift | 22 +-- Sources/Models/State.swift | 14 +- .../Models/TemperatureAndHumiditySensor.swift | 42 ++++-- Sources/Models/Topics.swift | 2 + Sources/Models/TrackedChanges.swift | 2 +- Sources/TopicsLive/Live.swift | 19 +-- Tests/ClientTests/AsyncClientTests.swift | 138 ------------------ Tests/ClientTests/SensorsClientTests.swift | 116 +++++++++++++++ docker-compose.yaml | 2 - 14 files changed, 289 insertions(+), 303 deletions(-) rename Sources/ClientLive/{AsyncClient.swift => SensorsClient.swift} (55%) delete mode 100755 Tests/ClientTests/AsyncClientTests.swift create mode 100755 Tests/ClientTests/SensorsClientTests.swift diff --git a/Sources/Bootstrap/Bootstrap.swift b/Sources/Bootstrap/Bootstrap.swift index c560dd8..05058b5 100755 --- a/Sources/Bootstrap/Bootstrap.swift +++ b/Sources/Bootstrap/Bootstrap.swift @@ -1,8 +1,8 @@ import ClientLive import DewPointEnvironment import EnvVars -import Logging import Foundation +import Logging import Models import MQTTNIO import NIO @@ -18,9 +18,8 @@ public func bootstrap( logger: Logger? = nil, autoConnect: Bool = true ) -> EventLoopFuture { - logger?.debug("Bootstrapping Dew Point Controller...") - + return loadEnvVars(eventLoopGroup: eventLoopGroup, logger: logger) .and(loadTopics(eventLoopGroup: eventLoopGroup, logger: logger)) .makeDewPointEnvironment(eventLoopGroup: eventLoopGroup, logger: logger) @@ -36,106 +35,105 @@ private func loadEnvVars( eventLoopGroup: EventLoopGroup, logger: Logger? ) -> EventLoopFuture { - logger?.debug("Loading env vars...") - + // TODO: Need to have the env file path passed in / dynamic. let envFilePath = URL(fileURLWithPath: #file) .deletingLastPathComponent() .deletingLastPathComponent() .deletingLastPathComponent() .appendingPathComponent(".dewPoint-env") - + let decoder = JSONDecoder() let encoder = JSONEncoder() - + let defaultEnvVars = EnvVars() - + let defaultEnvDict = (try? encoder.encode(defaultEnvVars)) .flatMap { try? decoder.decode([String: String].self, from: $0) } ?? [:] - + // Read from file `.dewPoint-env` file if it exists. let localEnvVarsDict = (try? Data(contentsOf: envFilePath)) .flatMap { try? decoder.decode([String: String].self, from: $0) } ?? [:] - + // Merge with variables in the shell environment. let envVarsDict = defaultEnvDict .merging(localEnvVarsDict, uniquingKeysWith: { $1 }) .merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 }) - + // Produces the final env vars from the merged items or uses defaults if something // went wrong. let envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict)) .flatMap { try? decoder.decode(EnvVars.self, from: $0) } - ?? defaultEnvVars - + ?? defaultEnvVars + logger?.debug("Done loading env vars...") return eventLoopGroup.next().makeSucceededFuture(envVars) } // MARK: TODO perhaps make loading from file an option passed in when app is launched. + /// Load the topics from file in application root directory at `.topics`, if available or fall back to the defualt. /// /// - Parameters: /// - eventLoopGroup: The event loop group for the application. /// - logger: An optional logger for debugging. private func loadTopics(eventLoopGroup: EventLoopGroup, logger: Logger?) -> EventLoopFuture { - logger?.debug("Loading topics from file...") - + let topicsFilePath = URL(fileURLWithPath: #file) .deletingLastPathComponent() .deletingLastPathComponent() .deletingLastPathComponent() .appendingPathComponent(".topics") - + let decoder = JSONDecoder() - + // Attempt to load the topics from file in root directory. - let localTopics = (try? Data.init(contentsOf: topicsFilePath)) + let localTopics = (try? Data(contentsOf: topicsFilePath)) .flatMap { try? decoder.decode(Topics.self, from: $0) } - + logger?.debug( localTopics == nil ? "Failed to load topics from file, falling back to defaults." : "Done loading topics from file." ) - + // If we were able to load from file use that, else fallback to the defaults. return eventLoopGroup.next().makeSucceededFuture(localTopics ?? .init()) } -extension EventLoopFuture where Value == (EnvVars, Topics) { - +private extension EventLoopFuture where Value == (EnvVars, Topics) { + /// Creates the ``DewPointEnvironment`` for the application after the ``EnvVars`` have been loaded. /// /// - Parameters: /// - eventLoopGroup: The event loop group for the application. /// - logger: An optional logger for the application. - fileprivate func makeDewPointEnvironment( + func makeDewPointEnvironment( eventLoopGroup: EventLoopGroup, logger: Logger? ) -> EventLoopFuture { - map { envVars, topics in - let mqttClient = MQTTClient(envVars: envVars, eventLoopGroup: eventLoopGroup, logger: logger) - return DewPointEnvironment.init( - envVars: envVars, - mqttClient: mqttClient, - topics: topics - ) - } + map { envVars, topics in + let mqttClient = MQTTClient(envVars: envVars, eventLoopGroup: eventLoopGroup, logger: logger) + return DewPointEnvironment( + envVars: envVars, + mqttClient: mqttClient, + topics: topics + ) + } } } -extension EventLoopFuture where Value == DewPointEnvironment { - +private extension EventLoopFuture where Value == DewPointEnvironment { + /// Connects to the MQTT broker after the ``DewPointEnvironment`` has been setup. /// /// - Parameters: /// - logger: An optional logger for debugging. - fileprivate func connectToMQTTBroker(autoConnect: Bool, logger: Logger?) -> EventLoopFuture { + func connectToMQTTBroker(autoConnect: Bool, logger: Logger?) -> EventLoopFuture { guard autoConnect else { return self } return flatMap { environment in logger?.debug("Connecting to MQTT Broker...") @@ -148,9 +146,9 @@ extension EventLoopFuture where Value == DewPointEnvironment { } } -extension MQTTNIO.MQTTClient { - - fileprivate convenience init(envVars: EnvVars, eventLoopGroup: EventLoopGroup, logger: Logger?) { +private extension MQTTNIO.MQTTClient { + + convenience init(envVars: EnvVars, eventLoopGroup: EventLoopGroup, logger: Logger?) { self.init( host: envVars.host, port: envVars.port != nil ? Int(envVars.port!) : nil, diff --git a/Sources/ClientLive/Helpers.swift b/Sources/ClientLive/Helpers.swift index e047ed6..4998ca5 100755 --- a/Sources/ClientLive/Helpers.swift +++ b/Sources/ClientLive/Helpers.swift @@ -40,6 +40,7 @@ extension RelativeHumidity: BufferInitalizable { } } +// TODO: Remove below when migrated to async client. extension MQTTNIO.MQTTClient { /// Logs a failure for a given topic and error. func logFailure(topic: String, error: Error) { diff --git a/Sources/ClientLive/AsyncClient.swift b/Sources/ClientLive/SensorsClient.swift similarity index 55% rename from Sources/ClientLive/AsyncClient.swift rename to Sources/ClientLive/SensorsClient.swift index 396dcae..e64eb8b 100644 --- a/Sources/ClientLive/AsyncClient.swift +++ b/Sources/ClientLive/SensorsClient.swift @@ -5,7 +5,8 @@ import MQTTNIO import NIO import Psychrometrics -public class AsyncClient { +// TODO: Pass in eventLoopGroup and MQTTClient. +public actor SensorsClient { public static let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) public let client: MQTTClient @@ -39,17 +40,17 @@ public class AsyncClient { self.sensors = sensors } - public func addSensor(_ sensor: TemperatureAndHumiditySensor) throws { + public func addSensor(_ sensor: TemperatureAndHumiditySensor) async throws { guard sensors.firstIndex(where: { $0.location == sensor.location }) == nil else { throw SensorExists() } sensors.append(sensor) } - public func connect() async { + public func connect(cleanSession: Bool = true) async { do { - try await client.connect() - client.addCloseListener(named: "AsyncClient") { [self] _ in + try await client.connect(cleanSession: cleanSession) + client.addCloseListener(named: "SensorsClient") { [self] _ in guard !self.shuttingDown else { return } Task { self.logger.debug("Connection closed.") @@ -63,6 +64,17 @@ public class AsyncClient { } } + public func start() async throws { + do { + try await subscribeToSensors() + try await addSensorListeners() + logger.debug("Begin listening to sensors...") + } catch { + logger.trace("Error:\n\(error)") + throw error + } + } + public func shutdown() async { shuttingDown = true try? await client.disconnect() @@ -77,105 +89,61 @@ public class AsyncClient { } func addSensorListeners(qos: MQTTQoS = .exactlyOnce) async throws { - for sensor in sensors { - try await client.subscribeToSensor(sensor, qos: qos) - let listener = client.createPublishListener() - for await result in listener { + try await subscribeToSensors(qos: qos) + client.addPublishListener(named: "SensorsClient") { result in + do { switch result { case let .success(value): var buffer = value.payload let topic = value.topicName - logger.debug("Received new value for topic: \(topic)") + self.logger.trace("Received new value for topic: \(topic)") if topic.contains("temperature") { // Decode and update the temperature value guard let temperature = Temperature(buffer: &buffer) else { - logger.debug("Failed to decode temperature from buffer: \(buffer)") + self.logger.debug("Failed to decode temperature from buffer: \(buffer)") throw DecodingError() } - try sensors.update(topic: topic, keyPath: \.temperature, with: temperature) - + try self.sensors.update(topic: topic, keyPath: \.temperature, with: temperature) + Task { try await self.publishUpdates() } } else if topic.contains("humidity") { // Decode and update the temperature value guard let humidity = RelativeHumidity(buffer: &buffer) else { - logger.debug("Failed to decode humidity from buffer: \(buffer)") + self.logger.debug("Failed to decode humidity from buffer: \(buffer)") throw DecodingError() } - try sensors.update(topic: topic, keyPath: \.humidity, with: humidity) - - } else { - let message = """ - Unexpected value for topic: \(topic) - Expected to contain either 'temperature' or 'humidity' - """ - logger.debug("\(message)") + try self.sensors.update(topic: topic, keyPath: \.humidity, with: humidity) + Task { try await self.publishUpdates() } } - // TODO: Publish dew-point & enthalpy if needed. - case let .failure(error): - logger.trace("Error:\n\(error)") + self.logger.trace("Error:\n\(error)") throw error } + } catch { + self.logger.trace("Error:\n\(error)") } } } - // Need to save the recieved values somewhere. - // TODO: Remove. - func addPublishListener( - topic: String, - decoding _: T.Type - ) async throws where T: BufferInitalizable { - _ = try await client.subscribe(to: [.init(topicFilter: topic, qos: .atLeastOnce)]) - Task { - let listener = self.client.createPublishListener() - for await result in listener { - switch result { - case let .success(packet): - var buffer = packet.payload - guard let value = T(buffer: &buffer) else { - logger.debug("Could not decode buffer: \(buffer)") - return - } - logger.debug("Recieved value: \(value)") - case let .failure(error): - logger.trace("Error:\n\(error)") - } - } - } - } - - private func publish(string: String, to topic: String) async throws { + private func publish(double: Double?, to topic: String) async throws { + guard let double else { return } + let rounded = round(double * 100) / 100 + logger.debug("Publishing \(rounded), to: \(topic)") try await client.publish( to: topic, - payload: ByteBufferAllocator().buffer(string: string), - qos: .atLeastOnce + payload: ByteBufferAllocator().buffer(string: "\(rounded)"), + qos: .exactlyOnce, + retain: true ) } - private func publish(double: Double, to topic: String) async throws { - let rounded = round(double * 100) / 100 - try await publish(string: "\(rounded)", to: topic) - } - - func publishDewPoint(_ request: Client.SensorPublishRequest) async throws { - // fix - guard let (dewPoint, topic) = request.dewPointData(topics: .init(), units: nil) else { return } - try await publish(double: dewPoint.rawValue, to: topic) - logger.debug("Published dewpoint: \(dewPoint.rawValue), to: \(topic)") - } - - func publishEnthalpy(_ request: Client.SensorPublishRequest) async throws { - // fix - guard let (enthalpy, topic) = request.enthalpyData(altitude: .seaLevel, topics: .init(), units: nil) else { return } - try await publish(double: enthalpy.rawValue, to: topic) - logger.debug("Publihsed enthalpy: \(enthalpy.rawValue), to: \(topic)") - } - - public func publishSensor(_ request: Client.SensorPublishRequest) async throws { - try await publishDewPoint(request) - try await publishEnthalpy(request) + func publishUpdates() async throws { + for sensor in sensors.filter(\.needsProcessed) { + try await publish(double: sensor.dewPoint?.rawValue, to: sensor.topics.dewPoint) + try await publish(double: sensor.enthalpy?.rawValue, to: sensor.topics.enthalpy) + try sensors.hasProcessed(sensor) + } } } @@ -204,13 +172,22 @@ struct DecodingError: Error {} struct NotFoundError: Error {} struct SensorExists: Error {} -extension TemperatureAndHumiditySensor.Topics { +private extension TemperatureAndHumiditySensor.Topics { func contains(_ topic: String) -> Bool { temperature == topic || humidity == topic } } -extension Array where Element == TemperatureAndHumiditySensor { +// TODO: Move to dewpoint-controller/main.swift +public extension Array where Element == TemperatureAndHumiditySensor { + static var live: Self { + TemperatureAndHumiditySensor.Location.allCases.map { + TemperatureAndHumiditySensor(location: $0) + } + } +} + +private extension Array where Element == TemperatureAndHumiditySensor { mutating func update( topic: String, @@ -223,4 +200,11 @@ extension Array where Element == TemperatureAndHumiditySensor { self[index][keyPath: keyPath] = value } + mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws { + guard let index = firstIndex(where: { $0.id == sensor.id }) else { + throw NotFoundError() + } + self[index].needsProcessed = false + } + } diff --git a/Sources/DewPointEnvironment/Environment.swift b/Sources/DewPointEnvironment/Environment.swift index 4e22e9b..4fa9e2f 100755 --- a/Sources/DewPointEnvironment/Environment.swift +++ b/Sources/DewPointEnvironment/Environment.swift @@ -4,11 +4,11 @@ import Models import MQTTNIO public struct DewPointEnvironment { - + public var envVars: EnvVars public var mqttClient: MQTTNIO.MQTTClient public var topics: Topics - + public init( envVars: EnvVars, mqttClient: MQTTNIO.MQTTClient, diff --git a/Sources/EnvVars/EnvVars.swift b/Sources/EnvVars/EnvVars.swift index 47f4a26..d0d9fe1 100755 --- a/Sources/EnvVars/EnvVars.swift +++ b/Sources/EnvVars/EnvVars.swift @@ -5,25 +5,25 @@ import Foundation /// /// This allows us to keep sensitve settings out of the repository. public struct EnvVars: Codable, Equatable { - + /// The current app environment. public var appEnv: AppEnv - + /// The MQTT host. public var host: String - + /// The MQTT port. public var port: String? - + /// The identifier to use when connecting to the MQTT broker. public var identifier: String - + /// The MQTT user name. public var userName: String? - + /// The MQTT user password. public var password: String? - + /// Create a new ``EnvVars`` /// /// - Parameters: @@ -40,7 +40,7 @@ public struct EnvVars: Codable, Equatable { identifier: String = "dewPoint-controller", userName: String? = "mqtt_user", password: String? = "secret!" - ){ + ) { self.appEnv = appEnv self.host = host self.port = port @@ -48,7 +48,7 @@ public struct EnvVars: Codable, Equatable { self.userName = userName self.password = password } - + /// Custom coding keys. private enum CodingKeys: String, CodingKey { case appEnv = "APP_ENV" @@ -58,7 +58,7 @@ public struct EnvVars: Codable, Equatable { case userName = "MQTT_USERNAME" case password = "MQTT_PASSWORD" } - + /// Represents the different app environments. public enum AppEnv: String, Codable { case development diff --git a/Sources/Models/Mode.swift b/Sources/Models/Mode.swift index de7ef8d..d093e73 100755 --- a/Sources/Models/Mode.swift +++ b/Sources/Models/Mode.swift @@ -1,36 +1,38 @@ import CoreUnitTypes +// TODO: Remove + /// Represents the different modes that the controller can be in. public enum Mode: Equatable { - + /// Allows controller to run in humidify or dehumidify mode. case auto - + /// Only handle humidify mode. case humidifyOnly(HumidifyMode) - + /// Only handle dehumidify mode. case dehumidifyOnly(DehumidifyMode) - + /// Don't control humidify or dehumidify modes. case off - + /// Represents the control modes for the humidify control state. public enum HumidifyMode: Equatable { - + /// Control humidifying based off dew-point. case dewPoint(Temperature) - + /// Control humidifying based off relative humidity. case relativeHumidity(RelativeHumidity) } - + /// Represents the control modes for the dehumidify control state. public enum DehumidifyMode: Equatable { - + /// Control dehumidifying based off dew-point. case dewPoint(high: Temperature, low: Temperature) - + /// Control humidifying based off relative humidity. case relativeHumidity(high: RelativeHumidity, low: RelativeHumidity) } diff --git a/Sources/Models/State.swift b/Sources/Models/State.swift index 58fc585..7753ebc 100755 --- a/Sources/Models/State.swift +++ b/Sources/Models/State.swift @@ -1,6 +1,7 @@ import Foundation import Psychrometrics +// TODO: Remove // TODO: Make this a struct, then create a Store class that holds the state?? public final class State { @@ -50,9 +51,9 @@ public final class State { } } -extension State.Sensors { +public extension State.Sensors { - public struct TemperatureHumiditySensor: Equatable { + struct TemperatureHumiditySensor: Equatable { @TrackedChanges public var temperature: Temperature? @@ -97,8 +98,9 @@ extension State.Sensors { } // MARK: - Temperature / Humidity Sensor Location Namespaces - public enum MixedAir { } - public enum PostCoil { } - public enum Return { } - public enum Supply { } + + enum MixedAir {} + enum PostCoil {} + enum Return {} + enum Supply {} } diff --git a/Sources/Models/TemperatureAndHumiditySensor.swift b/Sources/Models/TemperatureAndHumiditySensor.swift index 40f071a..b382be8 100644 --- a/Sources/Models/TemperatureAndHumiditySensor.swift +++ b/Sources/Models/TemperatureAndHumiditySensor.swift @@ -75,6 +75,8 @@ public struct TemperatureAndHumiditySensor: Equatable, Hashable, Identifiable { } /// Check whether any of the sensor values have changed and need processed. + /// + /// - Note: Setting a value will set to both the temperature and humidity properties. public var needsProcessed: Bool { get { $temperature.needsProcessed || $humidity.needsProcessed } set { @@ -85,9 +87,9 @@ public struct TemperatureAndHumiditySensor: Equatable, Hashable, Identifiable { /// Represents the different locations of a temperature and humidity sensor, which can /// be used to derive the topic to both listen and publish new values to. - public enum Location: String, Equatable, Hashable { - case mixedAir = "mixed-air" - case postCoil = "post-coil" + public enum Location: String, CaseIterable, Equatable, Hashable { + case mixedAir = "mixed_air" + case postCoil = "post_coil" case `return` case supply } @@ -95,23 +97,41 @@ public struct TemperatureAndHumiditySensor: Equatable, Hashable, Identifiable { /// Represents the MQTT topics to listen for updated sensor values on. public struct Topics: Equatable, Hashable { - /// The temperature topic of the sensor. - public let temperature: String + /// The dew-point temperature topic for the sensor. + public let dewPoint: String + + /// The enthalpy topic for the sensor. + public let enthalpy: String /// The humidity topic of the sensor. public let humidity: String + /// The temperature topic of the sensor. + public let temperature: String + public init( - temperature: String, - humidity: String + dewPoint: String, + enthalpy: String, + humidity: String, + temperature: String ) { - self.temperature = temperature + self.dewPoint = dewPoint + self.enthalpy = enthalpy self.humidity = humidity + self.temperature = temperature } - init(location: TemperatureAndHumiditySensor.Location) { - self.temperature = "sensors/\(location.rawValue)/temperature" - self.humidity = "sensors/\(location.rawValue)/humidity" + public init(topicPrefix: String? = "frankensystem", location: TemperatureAndHumiditySensor.Location) { + var prefix = topicPrefix ?? "" + if prefix.reversed().starts(with: "/") { + prefix = "\(prefix.dropLast())" + } + self.init( + dewPoint: "\(prefix)/sensors/\(location.rawValue)_dew_point/state", + enthalpy: "\(prefix)/sensors/\(location.rawValue)_enthalpy/state", + humidity: "\(prefix)/sensors/\(location.rawValue)_humidity/state", + temperature: "\(prefix)/sensors/\(location.rawValue)_temperature/state" + ) } } } diff --git a/Sources/Models/Topics.swift b/Sources/Models/Topics.swift index 96aa85e..69e6dc3 100755 --- a/Sources/Models/Topics.swift +++ b/Sources/Models/Topics.swift @@ -1,3 +1,5 @@ +// TODO: Remove + /// A container for all the different MQTT topics that are needed by the application. public struct Topics: Codable, Equatable { /// The command topics the application can publish to. diff --git a/Sources/Models/TrackedChanges.swift b/Sources/Models/TrackedChanges.swift index e8ac271..800b505 100755 --- a/Sources/Models/TrackedChanges.swift +++ b/Sources/Models/TrackedChanges.swift @@ -52,7 +52,7 @@ public struct TrackedChanges { case needsProcessed } - /// Check whether the value needs processed. + /// Whether the value needs processed. public var needsProcessed: Bool { get { tracking == .needsProcessed } set { diff --git a/Sources/TopicsLive/Live.swift b/Sources/TopicsLive/Live.swift index 3a4ad57..a4efc8b 100755 --- a/Sources/TopicsLive/Live.swift +++ b/Sources/TopicsLive/Live.swift @@ -1,27 +1,28 @@ import Models // TODO: Fix other live topics -extension Topics { - - public static let live = Self.init( +public extension Topics { + + static let live = Self( commands: .init(), sensors: .init( mixedAirSensor: .live(location: .mixedAir), postCoilSensor: .live(location: .postCoil), returnAirSensor: .live(location: .return), - supplyAirSensor: .live(location: .supply)), + supplyAirSensor: .live(location: .supply) + ), setPoints: .init(), states: .init() ) } -extension Topics.Sensors { - fileprivate enum Location: CustomStringConvertible { +private extension Topics.Sensors { + enum Location: CustomStringConvertible { case mixedAir case postCoil case `return` case supply - + var description: String { switch self { case .mixedAir: @@ -37,8 +38,8 @@ extension Topics.Sensors { } } -extension Topics.Sensors.TemperatureAndHumiditySensor { - fileprivate static func live( +private extension Topics.Sensors.TemperatureAndHumiditySensor { + static func live( prefix: String = "frankensystem", location: Topics.Sensors.Location ) -> Self { diff --git a/Tests/ClientTests/AsyncClientTests.swift b/Tests/ClientTests/AsyncClientTests.swift deleted file mode 100755 index b6f6a70..0000000 --- a/Tests/ClientTests/AsyncClientTests.swift +++ /dev/null @@ -1,138 +0,0 @@ -@testable import ClientLive -import EnvVars -import Logging -import Models -import MQTTNIO -import NIO -import Psychrometrics -import XCTest - -final class AsyncClientTests: XCTestCase { - - static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost" - - static let logger: Logger = { - var logger = Logger(label: "AsyncClientTests") - logger.logLevel = .trace - return logger - }() - - func createClient(identifier: String) -> AsyncClient { - let envVars = EnvVars( - appEnv: .testing, - host: Self.hostname, - port: "1883", - identifier: identifier, - userName: nil, - password: nil - ) - return .init(envVars: envVars, logger: Self.logger) - } - - func testConnectAndShutdown() async throws { - let client = createClient(identifier: "testConnectAndShutdown") - await client.connect() - await client.shutdown() - } - - func testPublishingSensor() async throws { - let client = createClient(identifier: "testPublishingSensor") - await client.connect() - let topic = Topics().sensors.mixedAirSensor.dewPoint - try await client.addPublishListener(topic: topic, decoding: Temperature.self) - try await client.publishSensor(.mixed(.init(temperature: 71.123, humidity: 50.5, needsProcessed: true))) - try await client.publishSensor(.mixed(.init(temperature: 72.123, humidity: 50.5, needsProcessed: true))) - await client.shutdown() - } - - func testSensor() async throws { - let client = createClient(identifier: "testSensor") - let mqtt = client.client - try client.addSensor(.init(location: .mixedAir)) - await client.connect() - - Task { try await client.addSensorListeners() } - - try await mqtt.publish( - to: "sensors/mixed-air/temperture", - payload: ByteBufferAllocator().buffer(string: "75.123"), - qos: .atLeastOnce - ) - - try await Task.sleep(for: .seconds(2)) - - XCTAssert(client.sensors.first!.needsProcessed) - XCTAssertEqual(client.sensors.first!.temperature, 75.123) - - await client.shutdown() - } - -// func testNewSensorSyntax() async throws { -// let client = createClient(identifier: "testNewSensorSyntax") -// let mqtt = client.client -// let receivedPublishInfo = PublishInfoContainer() -// let payload = ByteBufferAllocator().buffer(string: "75.123") -// let sensor = TemperatureAndHumiditySensor(location: .return) -// -// await client.connect() -// -// try await mqtt.subscribeToTemperature(sensor: sensor) -// -// let listener = mqtt.createPublishListener() -// -// Task { [receivedPublishInfo] in -// for await result in listener { -// switch result { -// case let .failure(error): -// XCTFail("\(error)") -// case let .success(publish): -// await receivedPublishInfo.addPublishInfo(publish) -// } -// } -// } -// -// try await mqtt.publish(to: sensor.topics.temperature, payload: payload, qos: .atLeastOnce) -// -// try await Task.sleep(for: .seconds(2)) -// -// XCTAssertEqual(receivedPublishInfo.count, 1) -// -// if let publish = receivedPublishInfo.first { -// var buffer = publish.payload -// let string = buffer.readString(length: buffer.readableBytes) -// XCTAssertEqual(string, "75.123") -// } else { -// XCTFail("Did not receive any publish info.") -// } -// -// try await mqtt.disconnect() -// try mqtt.syncShutdownGracefully() -// } -} - -// MARK: Helpers for tests, some of these should be able to be removed once the AsyncClient interface is done. - -extension MQTTClient { - - func subscribeToTemperature(sensor: TemperatureAndHumiditySensor) async throws { - _ = try await subscribe(to: [ - .init(topicFilter: sensor.topics.temperature, qos: .atLeastOnce) - ]) - } -} - -class PublishInfoContainer { - private var receivedPublishInfo: [MQTTPublishInfo] - - init() { - self.receivedPublishInfo = [] - } - - func addPublishInfo(_ info: MQTTPublishInfo) async { - receivedPublishInfo.append(info) - } - - var count: Int { receivedPublishInfo.count } - - var first: MQTTPublishInfo? { receivedPublishInfo.first } -} diff --git a/Tests/ClientTests/SensorsClientTests.swift b/Tests/ClientTests/SensorsClientTests.swift new file mode 100755 index 0000000..bb8bd07 --- /dev/null +++ b/Tests/ClientTests/SensorsClientTests.swift @@ -0,0 +1,116 @@ +@testable import ClientLive +import EnvVars +import Logging +import Models +import MQTTNIO +import NIO +import Psychrometrics +import XCTest + +final class AsyncClientTests: XCTestCase { + + static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost" + + static let logger: Logger = { + var logger = Logger(label: "AsyncClientTests") + logger.logLevel = .debug + return logger + }() + + func createClient(identifier: String) -> SensorsClient { + let envVars = EnvVars( + appEnv: .testing, + host: Self.hostname, + port: "1883", + identifier: identifier, + userName: nil, + password: nil + ) + return .init(envVars: envVars, logger: Self.logger) + } + + func testConnectAndShutdown() async throws { + let client = createClient(identifier: "testConnectAndShutdown") + await client.connect() + await client.shutdown() + } + + func testSensorCapturesPublishedState() async throws { + let client = createClient(identifier: "testSensorCapturesPublishedState") + let mqtt = await client.client + let sensor = TemperatureAndHumiditySensor(location: .mixedAir, units: .metric) + let publishInfo = PublishInfoContainer(topicFilters: [ + sensor.topics.dewPoint, + sensor.topics.enthalpy + ]) + + try await client.addSensor(sensor) + await client.connect() + try await client.start() + + _ = try await mqtt.subscribe(to: [ + .init(topicFilter: sensor.topics.dewPoint, qos: .exactlyOnce), + .init(topicFilter: sensor.topics.enthalpy, qos: .exactlyOnce) + ]) + + let listener = mqtt.createPublishListener() + Task { + for await result in listener { + switch result { + case let .failure(error): + XCTFail("\(error)") + case let .success(value): + await publishInfo.addPublishInfo(value) + } + } + } + + try await mqtt.publish( + to: sensor.topics.temperature, + payload: ByteBufferAllocator().buffer(string: "75.123"), + qos: .exactlyOnce, + retain: true + ) + + try await Task.sleep(for: .seconds(1)) + + // XCTAssert(client.sensors.first!.needsProcessed) + let firstSensor = await client.sensors.first! + XCTAssertEqual(firstSensor.temperature, .init(75.123, units: .celsius)) + + try await mqtt.publish( + to: sensor.topics.humidity, + payload: ByteBufferAllocator().buffer(string: "50"), + qos: .exactlyOnce, + retain: true + ) + + try await Task.sleep(for: .seconds(1)) + + XCTAssertEqual(publishInfo.info.count, 2) + + await client.shutdown() + } +} + +// MARK: Helpers for tests. + +class PublishInfoContainer { + private(set) var info: [MQTTPublishInfo] + private var topicFilters: [String]? + + init(topicFilters: [String]? = nil) { + self.info = [] + self.topicFilters = topicFilters + } + + func addPublishInfo(_ info: MQTTPublishInfo) async { + guard let topicFilters else { + self.info.append(info) + return + } + if topicFilters.contains(info.topicName) { + self.info.append(info) + } + } +} diff --git a/docker-compose.yaml b/docker-compose.yaml index 10dc381..c0ff58c 100755 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -13,8 +13,6 @@ services: working_dir: /app networks: - test -# volumes: -# - .:/app depends_on: - mosquitto-test environment: