From d01b515be4051fc6061709628288154f6cdaaf15 Mon Sep 17 00:00:00 2001 From: Michael Housh Date: Tue, 19 Apr 2022 16:19:08 -0400 Subject: [PATCH] Working on async version. --- Package.swift | 3 +- Sources/ClientLive/Live.swift | 116 +++++++++++++++++++++++ Tests/ClientTests/AsyncClientTests.swift | 45 +++++++++ 3 files changed, 163 insertions(+), 1 deletion(-) create mode 100644 Tests/ClientTests/AsyncClientTests.swift diff --git a/Package.swift b/Package.swift index 1196a3c..46ce0a8 100644 --- a/Package.swift +++ b/Package.swift @@ -5,7 +5,7 @@ import PackageDescription let package = Package( name: "dewPoint-controller", platforms: [ - .macOS(.v10_15) + .macOS(.v12) ], products: [ .executable(name: "dewPoint-controller", targets: ["dewPoint-controller"]), @@ -79,6 +79,7 @@ let package = Package( name: "ClientLive", dependencies: [ "Client", + "EnvVars", .product(name: "MQTTNIO", package: "mqtt-nio") ] ), diff --git a/Sources/ClientLive/Live.swift b/Sources/ClientLive/Live.swift index 0480176..44dbd31 100644 --- a/Sources/ClientLive/Live.swift +++ b/Sources/ClientLive/Live.swift @@ -39,3 +39,119 @@ extension Client { ) } } + +import Logging +import NIOTransportServices +import EnvVars + +public class AsyncClient { + public static let eventLoopGroup = NIOTSEventLoopGroup() + public let client: MQTTClient + public private(set) var shuttingDown: Bool + + var logger: Logger { client.logger } + + public init(envVars: EnvVars, logger: Logger) { + let config = MQTTClient.Configuration.init( + version: .v3_1_1, + userName: envVars.userName, + password: envVars.password, + useSSL: false, + useWebSockets: false, + tlsConfiguration: nil, + webSocketURLPath: nil + ) + self.client = .init( + host: envVars.host, + identifier: envVars.identifier, + eventLoopGroupProvider: .shared(Self.eventLoopGroup), + logger: logger, + configuration: config + ) + self.shuttingDown = false + } + + public func connect() async { + do { + try await self.client.connect() + self.client.addCloseListener(named: "AsyncClient") { [self] result in + guard !self.shuttingDown else { return } + Task { + self.logger.debug("Connection closed.") + self.logger.debug("Reconnecting...") + await self.connect() + } + } + logger.debug("Connection successful.") + } catch { + logger.trace("Connection Failed.\n\(error)") + } + } + + public func shutdown() async { + self.shuttingDown = true + try? await self.client.disconnect() + try? await self.client.shutdown() + } + + func addSensorListeners() async { + + } + + // Need to save the recieved values somewhere. + func addPublishListener( + topic: String, + decoding: T.Type + ) async throws where T: BufferInitalizable { + _ = try await self.client.subscribe(to: [.init(topicFilter: topic, qos: .atLeastOnce)]) + Task { + let listener = self.client.createPublishListener() + for await result in listener { + switch result { + case let .success(packet): + var buffer = packet.payload + guard let value = T.init(buffer: &buffer) else { + logger.debug("Could not decode buffer: \(buffer)") + return + } + logger.debug("Recieved value: \(value)") + case let .failure(error): + logger.trace("Error:\n\(error)") + } + } + } + } + + + private func publish(string: String, to topic: String) async throws { + try await self.client.publish( + to: topic, + payload: ByteBufferAllocator().buffer(string: string), + qos: .atLeastOnce + ) + } + + private func publish(double: Double, to topic: String) async throws { + let rounded = round(double * 100) / 100 + try await publish(string: "\(rounded)", to: topic) + } + + func publishDewPoint(_ request: Client.SensorPublishRequest) async throws { + // fix + guard let (dewPoint, topic) = request.dewPointData(topics: .init(), units: nil) else { return } + try await self.publish(double: dewPoint.rawValue, to: topic) + logger.debug("Published dewpoint: \(dewPoint.rawValue), to: \(topic)") + } + + func publishEnthalpy(_ request: Client.SensorPublishRequest) async throws { + // fix + guard let (enthalpy, topic) = request.enthalpyData(altitude: .seaLevel, topics: .init(), units: nil) else { return } + try await self.publish(double: enthalpy.rawValue, to: topic) + logger.debug("Publihsed enthalpy: \(enthalpy.rawValue), to: \(topic)") + } + + public func publishSensor(_ request: Client.SensorPublishRequest) async throws { + try await publishDewPoint(request) + try await publishEnthalpy(request) + } +} diff --git a/Tests/ClientTests/AsyncClientTests.swift b/Tests/ClientTests/AsyncClientTests.swift new file mode 100644 index 0000000..afd9a1d --- /dev/null +++ b/Tests/ClientTests/AsyncClientTests.swift @@ -0,0 +1,45 @@ +import XCTest +import EnvVars +import Logging +import Models +@testable import ClientLive +import Psychrometrics + +final class AsyncClientTests: XCTestCase { + + static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost" + + static let logger: Logger = { + var logger = Logger(label: "AsyncClientTests") + logger.logLevel = .trace + return logger + }() + + func createClient(identifier: String) -> AsyncClient { + let envVars = EnvVars.init( + appEnv: .testing, + host: Self.hostname, + port: "1883", + identifier: identifier, + userName: nil, + password: nil + ) + return .init(envVars: envVars, logger: Self.logger) + } + + func testConnectAndShutdown() async throws { + let client = createClient(identifier: "testConnectAndShutdown") + await client.connect() + await client.shutdown() + } + + func testPublishingSensor() async throws { + let client = createClient(identifier: "testPublishingSensor") + await client.connect() + let topic = Topics().sensors.mixedAirSensor.dewPoint + try await client.addPublishListener(topic: topic, decoding: Temperature.self) + try await client.publishSensor(.mixed(.init(temperature: 71.123, humidity: 50.5, needsProcessed: true))) + try await client.publishSensor(.mixed(.init(temperature: 72.123, humidity: 50.5, needsProcessed: true))) + await client.shutdown() + } +}