diff --git a/.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller-Package.xcscheme b/.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller-Package.xcscheme index e1db4cc..8f82c6f 100755 --- a/.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller-Package.xcscheme +++ b/.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller-Package.xcscheme @@ -146,6 +146,20 @@ ReferencedContainer = "container:"> + + + + + + + + + + + + EventLoopFuture { -// logger?.debug("Bootstrapping Dew Point Controller...") -// -// return loadEnvVars(eventLoopGroup: eventLoopGroup, logger: logger) -// .and(loadTopics(eventLoopGroup: eventLoopGroup, logger: logger)) -// .makeDewPointEnvironment(eventLoopGroup: eventLoopGroup, logger: logger) -// .connectToMQTTBroker(autoConnect: autoConnect, logger: logger) -// } -// -// /// Loads the ``EnvVars`` either using the defualts, from a file in the root directory under `.dewPoint-env` or in the shell / application environment. -// /// -// /// - Parameters: -// /// - eventLoopGroup: The event loop group for the application. -// /// - logger: An optional logger for debugging. -// private func loadEnvVars( -// eventLoopGroup: EventLoopGroup, -// logger: Logger? -// ) -> EventLoopFuture { -// logger?.debug("Loading env vars...") -// -// // TODO: Need to have the env file path passed in / dynamic. -// let envFilePath = URL(fileURLWithPath: #file) -// .deletingLastPathComponent() -// .deletingLastPathComponent() -// .deletingLastPathComponent() -// .appendingPathComponent(".dewPoint-env") -// -// let decoder = JSONDecoder() -// let encoder = JSONEncoder() -// -// let defaultEnvVars = EnvVars() -// -// let defaultEnvDict = (try? encoder.encode(defaultEnvVars)) -// .flatMap { try? decoder.decode([String: String].self, from: $0) } -// ?? [:] -// -// // Read from file `.dewPoint-env` file if it exists. -// let localEnvVarsDict = (try? Data(contentsOf: envFilePath)) -// .flatMap { try? decoder.decode([String: String].self, from: $0) } -// ?? [:] -// -// // Merge with variables in the shell environment. -// let envVarsDict = defaultEnvDict -// .merging(localEnvVarsDict, uniquingKeysWith: { $1 }) -// .merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 }) -// -// // Produces the final env vars from the merged items or uses defaults if something -// // went wrong. -// let envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict)) -// .flatMap { try? decoder.decode(EnvVars.self, from: $0) } -// ?? defaultEnvVars -// -// logger?.debug("Done loading env vars...") -// return eventLoopGroup.next().makeSucceededFuture(envVars) -// } -// -// // MARK: TODO perhaps make loading from file an option passed in when app is launched. -// -// /// Load the topics from file in application root directory at `.topics`, if available or fall back to the defualt. -// /// -// /// - Parameters: -// /// - eventLoopGroup: The event loop group for the application. -// /// - logger: An optional logger for debugging. -// private func loadTopics(eventLoopGroup: EventLoopGroup, logger: Logger?) -> EventLoopFuture { -// logger?.debug("Loading topics from file...") -// -// let topicsFilePath = URL(fileURLWithPath: #file) -// .deletingLastPathComponent() -// .deletingLastPathComponent() -// .deletingLastPathComponent() -// .appendingPathComponent(".topics") -// -// let decoder = JSONDecoder() -// -// // Attempt to load the topics from file in root directory. -// let localTopics = (try? Data(contentsOf: topicsFilePath)) -// .flatMap { try? decoder.decode(Topics.self, from: $0) } -// -// logger?.debug( -// localTopics == nil -// ? "Failed to load topics from file, falling back to defaults." -// : "Done loading topics from file." -// ) -// -// // If we were able to load from file use that, else fallback to the defaults. -// return eventLoopGroup.next().makeSucceededFuture(localTopics ?? .init()) -// } -// -// private extension EventLoopFuture where Value == (EnvVars, Topics) { -// -// /// Creates the ``DewPointEnvironment`` for the application after the ``EnvVars`` have been loaded. -// /// -// /// - Parameters: -// /// - eventLoopGroup: The event loop group for the application. -// /// - logger: An optional logger for the application. -// func makeDewPointEnvironment( -// eventLoopGroup: EventLoopGroup, -// logger: Logger? -// ) -> EventLoopFuture { -// map { envVars, topics in -// let mqttClient = MQTTClient(envVars: envVars, eventLoopGroup: eventLoopGroup, logger: logger) -// return DewPointEnvironment( -// envVars: envVars, -// mqttClient: mqttClient, -// topics: topics -// ) -// } -// } -// } -// -// private extension EventLoopFuture where Value == DewPointEnvironment { -// -// /// Connects to the MQTT broker after the ``DewPointEnvironment`` has been setup. -// /// -// /// - Parameters: -// /// - logger: An optional logger for debugging. -// func connectToMQTTBroker(autoConnect: Bool, logger: Logger?) -> EventLoopFuture { -// guard autoConnect else { return self } -// return flatMap { environment in -// logger?.debug("Connecting to MQTT Broker...") -// return environment.mqttClient.connect() -// .map { _ in -// logger?.debug("Successfully connected to MQTT Broker...") -// return environment -// } -// } -// } -// } -// -// private extension MQTTNIO.MQTTClient { -// -// convenience init(envVars: EnvVars, eventLoopGroup: EventLoopGroup, logger: Logger?) { -// self.init( -// host: envVars.host, -// port: envVars.port != nil ? Int(envVars.port!) : nil, -// identifier: envVars.identifier, -// eventLoopGroupProvider: .shared(eventLoopGroup), -// logger: logger, -// configuration: .init( -// version: .v5_0, -// userName: envVars.userName, -// password: envVars.password -// ) -// ) -// } -// } diff --git a/Sources/MQTTConnectionService/MQTTConnectionService.swift b/Sources/MQTTConnectionService/MQTTConnectionService.swift index 290653a..c499713 100644 --- a/Sources/MQTTConnectionService/MQTTConnectionService.swift +++ b/Sources/MQTTConnectionService/MQTTConnectionService.swift @@ -90,7 +90,6 @@ public actor MQTTConnectionService: Service { logger.debug("Begin shutting down MQTT broker connection.") client.removeCloseListener(named: "\(Self.self)") internalEventStream.stop() - // continuation.yield(.shuttingDown) _ = client.disconnect() try? client.syncShutdownGracefully() continuation.finish() diff --git a/Sources/SensorsService/Helpers.swift b/Sources/SensorsService/Helpers.swift index fd336c8..f66311e 100755 --- a/Sources/SensorsService/Helpers.swift +++ b/Sources/SensorsService/Helpers.swift @@ -3,7 +3,7 @@ import Models import MQTTNIO import NIO import NIOFoundationCompat -import SharedModels +import PsychrometricClient /// Represents a type that can be initialized by a ``ByteBuffer``. protocol BufferInitalizable { @@ -23,14 +23,6 @@ extension Double: BufferInitalizable { } } -// extension DryBulb: BufferInitalizable { -// /// Attempt to create / parse a temperature from a byte buffer. -// init?(buffer: inout ByteBuffer) { -// guard let value = Double(buffer: &buffer) else { return nil } -// self.init(.init(value, units: .celsius)) -// } -// } - extension Tagged: BufferInitalizable where RawValue: BufferInitalizable { init?(buffer: inout ByteBuffer) { guard let value = RawValue(buffer: &buffer) else { return nil } @@ -51,11 +43,3 @@ extension Temperature: BufferInitalizable { self.init(value) } } - -// extension RelativeHumidity: BufferInitalizable { -// /// Attempt to create / parse a relative humidity from a byte buffer. -// init?(buffer: inout ByteBuffer) { -// guard let value = Double(buffer: &buffer) else { return nil } -// self.init(value) -// } -// } diff --git a/Sources/SensorsService/SensorsService.swift b/Sources/SensorsService/SensorsService.swift index 36b9139..9b5df72 100644 --- a/Sources/SensorsService/SensorsService.swift +++ b/Sources/SensorsService/SensorsService.swift @@ -1,3 +1,5 @@ +import Dependencies +import DependenciesMacros import Foundation import Logging import Models @@ -7,6 +9,129 @@ import NIO import PsychrometricClient import ServiceLifecycle +@DependencyClient +public struct SensorsClient: Sendable { + + public var listen: @Sendable (_ topics: [String]) async throws -> AsyncStream + public var logger: Logger? + public var publish: @Sendable (_ value: Double, _ topic: String) async throws -> Void + public var shutdown: @Sendable () -> Void = {} + + public func listen(to topics: [String]) async throws -> AsyncStream { + try await listen(topics) + } + + public func publish(_ value: Double, to topic: String) async throws { + try await publish(value, topic) + } +} + +extension SensorsClient: TestDependencyKey { + public static var testValue: SensorsClient { + Self() + } +} + +public extension DependencyValues { + var sensorsClient: SensorsClient { + get { self[SensorsClient.self] } + set { self[SensorsClient.self] = newValue } + } +} + +public actor SensorsService2: Service { + + @Dependency(\.sensorsClient) var client + + private var sensors: [TemperatureAndHumiditySensor] + + public init(sensors: [TemperatureAndHumiditySensor]) { + self.sensors = sensors + } + + public func run() async throws { + guard sensors.count > 0 else { + throw SensorCountError() + } + + let stream = try await client.listen(to: topics) + + do { + try await withGracefulShutdownHandler { + try await withThrowingDiscardingTaskGroup { group in + for await result in stream.cancelOnGracefulShutdown() { + group.addTask { try await self.handleResult(result) } + } + } + } onGracefulShutdown: { + Task { + await self.client.logger?.trace("Received graceful shutdown.") + try? await self.publishUpdates() + await self.client.shutdown() + } + } + } catch { + client.logger?.trace("Error: \(error)") + client.shutdown() + } + } + + private var topics: [String] { + sensors.reduce(into: [String]()) { array, sensor in + array.append(sensor.topics.temperature) + array.append(sensor.topics.humidity) + } + } + + private func handleResult(_ result: MQTTPublishInfo) async throws { + let topic = result.topicName + client.logger?.trace("Begin handling result for topic: \(topic)") + + func decode(_: V.Type) -> V? { + var buffer = result.payload + return V(buffer: &buffer) + } + + if topic.contains("temperature") { + client.logger?.trace("Begin handling temperature result.") + guard let temperature = decode(DryBulb.self) else { + client.logger?.trace("Failed to decode temperature: \(result.payload)") + throw DecodingError() + } + client.logger?.trace("Decoded temperature: \(temperature)") + try sensors.update(topic: topic, keyPath: \.temperature, with: temperature) + + } else if topic.contains("humidity") { + client.logger?.trace("Begin handling humidity result.") + guard let humidity = decode(RelativeHumidity.self) else { + client.logger?.trace("Failed to decode humidity: \(result.payload)") + throw DecodingError() + } + client.logger?.trace("Decoded humidity: \(humidity)") + try sensors.update(topic: topic, keyPath: \.humidity, with: humidity) + } else { + client.logger?.error("Received unexpected topic, expected topic to contain 'temperature' or 'humidity'!") + return + } + + try await publishUpdates() + client.logger?.trace("Done handling result for topic: \(topic)") + } + + private func publish(_ double: Double?, to topic: String) async throws { + guard let double else { return } + try await client.publish(double, to: topic) + client.logger?.trace("Published update to topic: \(topic)") + } + + private func publishUpdates() async throws { + for sensor in sensors.filter(\.needsProcessed) { + try await publish(sensor.dewPoint?.value, to: sensor.topics.dewPoint) + try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy) + } + } +} + public actor SensorsService: Service { private var sensors: [TemperatureAndHumiditySensor] private let client: MQTTClient @@ -174,6 +299,7 @@ struct DecodingError: Error {} struct MQTTClientNotConnected: Error {} struct NotFoundError: Error {} struct SensorExists: Error {} +struct SensorCountError: Error {} // MARK: - Helpers diff --git a/Sources/dewPoint-controller/Application.swift b/Sources/dewPoint-controller/Application.swift index e5f6827..c459087 100644 --- a/Sources/dewPoint-controller/Application.swift +++ b/Sources/dewPoint-controller/Application.swift @@ -83,7 +83,7 @@ private extension MQTTNIO.MQTTClient { eventLoopGroupProvider: .shared(eventLoopGroup), logger: logger, configuration: .init( - version: .v5_0, + version: .v3_1_1, disablePing: false, userName: envVars.userName, password: envVars.password diff --git a/Sources/dewPoint-controller/main.swift.old b/Sources/dewPoint-controller/main.swift.old deleted file mode 100755 index c82840e..0000000 --- a/Sources/dewPoint-controller/main.swift.old +++ /dev/null @@ -1,74 +0,0 @@ -// import Bootstrap -// import ClientLive -// import CoreUnitTypes -// import Logging -// import Models -// import MQTTNIO -// import NIO -// import TopicsLive -// import Foundation -// -// var logger: Logger = { -// var logger = Logger(label: "dewPoint-logger") -// logger.logLevel = .debug -// return logger -// }() -// -// logger.info("Starting Swift Dew Point Controller!") -// -// let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) -// var environment = try bootstrap(eventLoopGroup: eventLoopGroup, logger: logger, autoConnect: false).wait() -// -// // Set the log level to info only in production mode. -// if environment.envVars.appEnv == .production { -// logger.debug("Updating logging level to info.") -// logger.logLevel = .info -// } -// -// // Set up the client, topics and state. -// environment.topics = .live -// let state = State() -// let client = Client.live(client: environment.mqttClient, state: state, topics: environment.topics) -// -// defer { -// logger.debug("Disconnecting") -// } -// -// // Add topic listeners. -// client.addListeners() -// -// while true { -// if !environment.mqttClient.isActive() { -// logger.trace("Connecting to MQTT broker...") -// try client.connect().wait() -// try client.subscribe().wait() -// Thread.sleep(forTimeInterval: 1) -// } -// -// // Check if sensors need processed. -// if state.sensors.needsProcessed { -// logger.debug("Sensor state has changed...") -// if state.sensors.mixedAirSensor.needsProcessed { -// logger.trace("Publishing mixed air sensor.") -// try client.publishSensor(.mixed(state.sensors.mixedAirSensor)).wait() -// } -// if state.sensors.postCoilSensor.needsProcessed { -// logger.trace("Publishing post coil sensor.") -// try client.publishSensor(.postCoil(state.sensors.postCoilSensor)).wait() -// } -// if state.sensors.returnAirSensor.needsProcessed { -// logger.trace("Publishing return air sensor.") -// try client.publishSensor(.return(state.sensors.returnAirSensor)).wait() -// } -// if state.sensors.supplyAirSensor.needsProcessed { -// logger.trace("Publishing supply air sensor.") -// try client.publishSensor(.supply(state.sensors.supplyAirSensor)).wait() -// } -// } -// -// // logger.debug("Fetching dew point...") -// // -// // logger.debug("Published dew point...") -// -// Thread.sleep(forTimeInterval: 5) -// } diff --git a/Tests/SensorsServiceTests/SensorsClientTests.swift b/Tests/SensorsServiceTests/SensorsClientTests.swift index e95b9b1..917a0e6 100755 --- a/Tests/SensorsServiceTests/SensorsClientTests.swift +++ b/Tests/SensorsServiceTests/SensorsClientTests.swift @@ -69,67 +69,105 @@ final class SensorsClientTests: XCTestCase { // await client.shutdown() // } - func testSensorService() async throws { - let mqtt = createClient(identifier: "testSensorService") - // let mqtt = await client.client - let sensor = TemperatureAndHumiditySensor(location: .mixedAir) - let publishInfo = PublishInfoContainer(topicFilters: [ - sensor.topics.dewPoint, - sensor.topics.enthalpy - ]) - let service = SensorsService(client: mqtt, sensors: [sensor]) +// func testSensorService() async throws { +// let mqtt = createClient(identifier: "testSensorService") +// // let mqtt = await client.client +// let sensor = TemperatureAndHumiditySensor(location: .mixedAir) +// let publishInfo = PublishInfoContainer(topicFilters: [ +// sensor.topics.dewPoint, +// sensor.topics.enthalpy +// ]) +// let service = SensorsService(client: mqtt, sensors: [sensor]) +// +// // fix to connect the mqtt client. +// try await mqtt.connect() +// let task = Task { try await service.run() } +// +// _ = try await mqtt.subscribe(to: [ +// MQTTSubscribeInfo(topicFilter: sensor.topics.dewPoint, qos: .exactlyOnce), +// MQTTSubscribeInfo(topicFilter: sensor.topics.enthalpy, qos: .exactlyOnce) +// ]) +// +// let listener = mqtt.createPublishListener() +// Task { +// for await result in listener { +// switch result { +// case let .failure(error): +// XCTFail("\(error)") +// case let .success(value): +// await publishInfo.addPublishInfo(value) +// } +// } +// } +// +// try await mqtt.publish( +// to: sensor.topics.temperature, +// payload: ByteBufferAllocator().buffer(string: "75.123"), +// qos: MQTTQoS.exactlyOnce, +// retain: true +// ) +// +// try await Task.sleep(for: .seconds(1)) +// +// // XCTAssert(client.sensors.first!.needsProcessed) + // // let firstSensor = await client.sensors.first! + // // XCTAssertEqual(firstSensor.temperature, .init(75.123, units: .celsius)) +// +// try await mqtt.publish( +// to: sensor.topics.humidity, +// payload: ByteBufferAllocator().buffer(string: "50"), +// qos: MQTTQoS.exactlyOnce, +// retain: true +// ) +// +// try await Task.sleep(for: .seconds(1)) +// +// // not working for some reason +// // XCTAssertEqual(publishInfo.info.count, 2) +// +// XCTAssert(publishInfo.info.count > 1) +// +// // fix to shutdown the mqtt client. +// task.cancel() +// try await mqtt.shutdown() +// } - // fix to connect the mqtt client. - try await mqtt.connect() - let task = Task { try await service.run() } + func testCapturingSensorClient() async throws { + class CapturedValues { + var values = [(value: Double, topic: String)]() + var didShutdown = false - _ = try await mqtt.subscribe(to: [ - MQTTSubscribeInfo(topicFilter: sensor.topics.dewPoint, qos: .exactlyOnce), - MQTTSubscribeInfo(topicFilter: sensor.topics.enthalpy, qos: .exactlyOnce) - ]) - - let listener = mqtt.createPublishListener() - Task { - for await result in listener { - switch result { - case let .failure(error): - XCTFail("\(error)") - case let .success(value): - await publishInfo.addPublishInfo(value) - } - } + init() {} } - try await mqtt.publish( - to: sensor.topics.temperature, - payload: ByteBufferAllocator().buffer(string: "75.123"), - qos: MQTTQoS.exactlyOnce, - retain: true - ) + let capturedValues = CapturedValues() - try await Task.sleep(for: .seconds(1)) - - // XCTAssert(client.sensors.first!.needsProcessed) -// let firstSensor = await client.sensors.first! -// XCTAssertEqual(firstSensor.temperature, .init(75.123, units: .celsius)) - - try await mqtt.publish( - to: sensor.topics.humidity, - payload: ByteBufferAllocator().buffer(string: "50"), - qos: MQTTQoS.exactlyOnce, - retain: true - ) - - try await Task.sleep(for: .seconds(1)) - - // not working for some reason - // XCTAssertEqual(publishInfo.info.count, 2) - - XCTAssert(publishInfo.info.count > 1) - - // fix to shutdown the mqtt client. - task.cancel() - try await mqtt.shutdown() + try await withDependencies { + $0.sensorsClient = .testing { value, topic in + capturedValues.values.append((value, topic)) + } captureShutdownEvent: { + capturedValues.didShutdown = $0 + } + } operation: { + @Dependency(\.sensorsClient) var client + let stream = try await client.listen(to: ["test"]) + for await value in stream { + var buffer = value.payload + guard let double = Double(buffer: &buffer) else { + XCTFail("Failed to decode double") + return + } + XCTAssertEqual(double, 75) + XCTAssertEqual(value.topicName, "test") + try await client.publish(26, to: "publish") + try await Task.sleep(for: .milliseconds(100)) + client.shutdown() + } + XCTAssertEqual(capturedValues.values.count, 1) + XCTAssertEqual(capturedValues.values.first?.value, 26) + XCTAssertEqual(capturedValues.values.first?.topic, "publish") + XCTAssertTrue(capturedValues.didShutdown) + } } // func testSensorCapturesPublishedState() async throws { @@ -211,3 +249,42 @@ class PublishInfoContainer { } } } + +extension SensorsClient { + + static func testing( + capturePublishedValues: @escaping (Double, String) -> Void, + captureShutdownEvent: @escaping (Bool) -> Void + ) -> Self { + let (stream, continuation) = AsyncStream.makeStream(of: MQTTPublishInfo.self) + let logger = Logger(label: "\(Self.self).testing") + + return .init( + listen: { topics in + guard let topic = topics.randomElement() else { + throw TopicNotFoundError() + } + continuation.yield( + MQTTPublishInfo( + qos: .atLeastOnce, + retain: true, + topicName: topic, + payload: ByteBuffer(string: "75"), + properties: MQTTProperties() + ) + ) + return stream + }, + logger: logger, + publish: { value, topic in + capturePublishedValues(value, topic) + }, + shutdown: { + captureShutdownEvent(true) + continuation.finish() + } + ) + } +} + +struct TopicNotFoundError: Error {}