diff --git a/Dockerfile b/Dockerfile index 775cde6..11db039 100755 --- a/Dockerfile +++ b/Dockerfile @@ -11,4 +11,4 @@ RUN swift build --enable-test-discovery -c release -Xswiftc -g FROM swift:5.10-slim WORKDIR /run COPY --from=build /build/.build/release/dewPoint-controller /run -CMD ["/bin/bash", "-xc", "./dewPoint-controller"] +CMD ["/bin/bash", "-xc", "./dewpoint-controller"] diff --git a/Package.swift b/Package.swift index 344da28..4c71d4e 100755 --- a/Package.swift +++ b/Package.swift @@ -8,17 +8,18 @@ let swiftSettings: [SwiftSetting] = [ ] let package = Package( - name: "dewPoint-controller", + name: "dewpoint-controller", platforms: [ .macOS(.v14) ], products: [ - .executable(name: "dewPoint-controller", targets: ["dewPoint-controller"]), + .executable(name: "dewpoint-controller", targets: ["dewPoint-controller"]), .library(name: "Models", targets: ["Models"]), .library(name: "MQTTConnectionManagerLive", targets: ["MQTTConnectionManagerLive"]), .library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]), .library(name: "SensorsClientLive", targets: ["SensorsClientLive"]), - .library(name: "SensorsService", targets: ["SensorsService"]) + .library(name: "SensorsService", targets: ["SensorsService"]), + .library(name: "TopicDependencies", targets: ["TopicDependencies"]) ], dependencies: [ .package(url: "https://github.com/apple/swift-nio", from: "2.0.0"), @@ -100,6 +101,15 @@ let package = Package( "SensorsService", .product(name: "PsychrometricClientLive", package: "swift-psychrometrics") ] + ), + .target( + name: "TopicDependencies", + dependencies: [ + .product(name: "Dependencies", package: "swift-dependencies"), + .product(name: "DependenciesMacros", package: "swift-dependencies"), + .product(name: "MQTTNIO", package: "mqtt-nio") + ], + swiftSettings: swiftSettings ) ] ) diff --git a/Sources/SensorsClientLive/Live.swift b/Sources/SensorsClientLive/Live.swift index c1cf051..e1d665e 100644 --- a/Sources/SensorsClientLive/Live.swift +++ b/Sources/SensorsClientLive/Live.swift @@ -20,7 +20,6 @@ public extension SensorsClient { return .init( listen: { try await listener.listen($0) }, - logger: client.logger, publish: { try await listener.publish($0, $1) }, shutdown: { listener.shutdown() } ) diff --git a/Sources/SensorsService/SensorsService.swift b/Sources/SensorsService/SensorsService.swift index 29cad79..92a9626 100644 --- a/Sources/SensorsService/SensorsService.swift +++ b/Sources/SensorsService/SensorsService.swift @@ -22,9 +22,6 @@ public struct SensorsClient: Sendable { /// Start listening for changes to sensor values on the MQTT broker. public var listen: @Sendable ([String]) async throws -> AsyncStream - /// A logger to use for the service. - public var logger: Logger? - /// Publish dew-point or enthalpy values back to the MQTT broker. public var publish: @Sendable (Double, String) async throws -> Void @@ -68,6 +65,8 @@ public actor SensorsService: Service { private var sensors: [TemperatureAndHumiditySensor] + private let logger: Logger? + /// Create a new sensors service that listens to the passed in /// sensors. /// @@ -76,9 +75,11 @@ public actor SensorsService: Service { /// - Parameters: /// - sensors: The sensors to listen for changes to. public init( - sensors: [TemperatureAndHumiditySensor] + sensors: [TemperatureAndHumiditySensor], + logger: Logger? = nil ) { self.sensors = sensors + self.logger = logger } /// Start the service with graceful shutdown, which will attempt to publish @@ -96,7 +97,7 @@ public actor SensorsService: Service { } } onGracefulShutdown: { Task { - await self.client.logger?.trace("Received graceful shutdown.") + self.logger?.trace("Received graceful shutdown.") try? await self.publishUpdates() await self.client.shutdown() } @@ -114,7 +115,7 @@ public actor SensorsService: Service { do { let topic = result.topic assert(topics.contains(topic)) - client.logger?.trace("Begin handling result for topic: \(topic)") + logger?.trace("Begin handling result for topic: \(topic)") func decode(_: V.Type) -> V? { var buffer = result.buffer @@ -122,35 +123,35 @@ public actor SensorsService: Service { } if topic.contains("temperature") { - client.logger?.trace("Begin handling temperature result.") + logger?.trace("Begin handling temperature result.") guard let temperature = decode(DryBulb.self) else { - client.logger?.trace("Failed to decode temperature: \(result.buffer)") + logger?.trace("Failed to decode temperature: \(result.buffer)") throw DecodingError() } - client.logger?.trace("Decoded temperature: \(temperature)") + logger?.trace("Decoded temperature: \(temperature)") try sensors.update(topic: topic, keyPath: \.temperature, with: temperature) } else if topic.contains("humidity") { - client.logger?.trace("Begin handling humidity result.") + logger?.trace("Begin handling humidity result.") guard let humidity = decode(RelativeHumidity.self) else { - client.logger?.trace("Failed to decode humidity: \(result.buffer)") + logger?.trace("Failed to decode humidity: \(result.buffer)") throw DecodingError() } - client.logger?.trace("Decoded humidity: \(humidity)") + logger?.trace("Decoded humidity: \(humidity)") try sensors.update(topic: topic, keyPath: \.humidity, with: humidity) } try await publishUpdates() - client.logger?.trace("Done handling result for topic: \(topic)") + logger?.trace("Done handling result for topic: \(topic)") } catch { - client.logger?.error("Received error: \(error)") + logger?.error("Received error: \(error)") } } private func publish(_ double: Double?, to topic: String) async throws { guard let double else { return } try await client.publish(double, to: topic) - client.logger?.trace("Published update to topic: \(topic)") + logger?.trace("Published update to topic: \(topic)") } private func publishUpdates() async throws { diff --git a/Sources/TopicDependencies/TopicListener.swift b/Sources/TopicDependencies/TopicListener.swift new file mode 100644 index 0000000..31e5236 --- /dev/null +++ b/Sources/TopicDependencies/TopicListener.swift @@ -0,0 +1,133 @@ +import Dependencies +import DependenciesMacros +import Foundation +import MQTTNIO + +/// A dependency that can generate an async stream of changes to the given topics. +/// +/// - Note: This type only conforms to ``TestDependencyKey`` because it requires an MQTTClient +/// to generate the live dependency. +@DependencyClient +public struct TopicListener: Sendable { + + /// Create an async stream that listens for changes to the given topics. + private var _listen: @Sendable (_ topics: [String]) async throws -> AsyncThrowingStream + + /// Shutdown the listener stream. + public var shutdown: @Sendable () -> Void + + /// Create a new topic listener. + /// + /// - Parameters: + /// - listen: Generate an async stream of changes for the given topics. + /// - shutdown: Shutdown the topic listener stream. + public init( + listen: @Sendable @escaping ([String]) async throws -> AsyncThrowingStream, + shutdown: @Sendable @escaping () -> Void + ) { + self._listen = listen + self.shutdown = shutdown + } + + /// Create an async stream that listens for changes to the given topics. + /// + /// - Parameters: + /// - topics: The topics to listen for changes to. + public func listen(to topics: [String]) async throws -> AsyncThrowingStream { + try await _listen(topics) + } + + /// Create an async stream that listens for changes to the given topics. + /// + /// - Parameters: + /// - topics: The topics to listen for changes to. + public func listen(_ topics: String...) async throws -> AsyncThrowingStream { + try await listen(to: topics) + } + + /// Create the live implementation of the topic listener with the given MQTTClient. + /// + /// - Parameters: + /// - client: The MQTTClient to use. + public static func live(client: MQTTClient) -> Self { + let listener = MQTTTopicListener(client: client) + return .init( + listen: { await listener.listen($0) }, + shutdown: { listener.shutdown() } + ) + } +} + +extension TopicListener: TestDependencyKey { + public static var testValue: TopicListener { + Self() + } +} + +public extension DependencyValues { + var topicListener: TopicListener { + get { self[TopicListener.self] } + set { self[TopicListener.self] = newValue } + } +} + +// MARK: - Helpers + +private actor MQTTTopicListener { + private let client: MQTTClient + private let continuation: AsyncThrowingStream.Continuation + private let name: String + let stream: AsyncThrowingStream + private var shuttingDown: Bool = false + + init( + client: MQTTClient + ) { + let (stream, continuation) = AsyncThrowingStream.makeStream() + self.client = client + self.continuation = continuation + self.name = UUID().uuidString + self.stream = stream + } + + deinit { + if !shuttingDown { + let message = """ + Shutdown was not called on topic listener. This could lead to potential errors or + the stream never ending. + + Please ensure that you call shutdown on the listener. + """ + client.logger.warning("\(message)") + continuation.finish() + } + client.removePublishListener(named: name) + } + + func listen(_ topics: [String]) async -> AsyncThrowingStream { + assert(client.isActive(), "The client is not connected.") + client.addPublishListener(named: name) { result in + switch result { + case let .failure(error): + self.client.logger.error("Received error while listening: \(error)") + self.continuation.yield(with: .failure(error)) + case let .success(publishInfo): + if topics.contains(publishInfo.topicName) { + self.client.logger.trace("Recieved new value for topic: \(publishInfo.topicName)") + self.continuation.yield(publishInfo) + } + } + } + return stream + } + + private func setIsShuttingDown() { + shuttingDown = true + } + + nonisolated func shutdown() { + client.logger.trace("Closing topic listener...") + continuation.finish() + Task { await self.setIsShuttingDown() } + } +} diff --git a/Sources/TopicDependencies/TopicPublisher.swift b/Sources/TopicDependencies/TopicPublisher.swift new file mode 100644 index 0000000..9ddfeee --- /dev/null +++ b/Sources/TopicDependencies/TopicPublisher.swift @@ -0,0 +1,114 @@ +import Dependencies +import DependenciesMacros +import MQTTNIO +import NIO + +/// A dependency that is responsible for publishing values to an MQTT broker. +/// +/// - Note: This dependency only conforms to `TestDependencyKey` because it +/// requires an active `MQTTClient` to generate the live dependency. +@DependencyClient +public struct TopicPublisher: Sendable { + + private var _publish: @Sendable (PublishRequest) async throws -> Void + + /// Create a new topic publisher. + /// + /// - Parameters: + /// - publish: Handle the publish request. + public init( + publish: @Sendable @escaping (PublishRequest) async throws -> Void + ) { + self._publish = publish + } + + /// Publish a new value to the given topic. + /// + /// - Parameters: + /// - topicName: The topic to publish the new value to. + /// - payload: The value to publish. + /// - qos: The MQTTQoS. + /// - retain: The retain flag. + public func publish( + to topicName: String, + payload: ByteBuffer, + qos: MQTTQoS, + retain: Bool = false + ) async throws { + try await _publish(.init( + topicName: topicName, + payload: payload, + qos: qos, + retain: retain + )) + } + + /// Create the live topic publisher with the given `MQTTClient`. + /// + /// - Parameters: + /// - client: The mqtt broker client to use. + public static func live(client: MQTTClient) -> Self { + .init( + publish: { request in + assert(client.isActive(), "Client not connected.") + client.logger.trace("Begin publishing to topic: \(request.topicName)") + defer { client.logger.trace("Done publishing to topic: \(request.topicName)") } + try await client.publish( + to: request.topicName, + payload: request.payload, + qos: request.qos, + retain: request.retain + ) + } + ) + } + + /// Represents the parameters required to publish a new value to the + /// MQTT broker. + public struct PublishRequest: Equatable, Sendable { + + /// The topic to publish the new value to. + public let topicName: String + + /// The value to publish. + public let payload: ByteBuffer + + /// The qos of the request. + public let qos: MQTTQoS + + /// The retain flag for the request. + public let retain: Bool + + /// Create a new publish request. + /// + /// - Parameters: + /// - topicName: The topic to publish to. + /// - payload: The value to publish. + /// - qos: The qos of the request. + /// - retain: The retain flag of the request. + public init( + topicName: String, + payload: ByteBuffer, + qos: MQTTQoS, + retain: Bool + ) { + self.topicName = topicName + self.payload = payload + self.qos = qos + self.retain = retain + } + } +} + +extension TopicPublisher: TestDependencyKey { + public static var testValue: TopicPublisher { Self() } +} + +public extension DependencyValues { + + /// A dependency that is responsible for publishing values to an MQTT broker. + var topicPublisher: TopicPublisher { + get { self[TopicPublisher.self] } + set { self[TopicPublisher.self] = newValue } + } +} diff --git a/Sources/dewPoint-controller/Application.swift b/Sources/dewPoint-controller/Application.swift index 8e6662f..6b8b8e4 100644 --- a/Sources/dewPoint-controller/Application.swift +++ b/Sources/dewPoint-controller/Application.swift @@ -39,7 +39,7 @@ struct Application { $0.mqttConnectionManager = .live(client: mqtt, logger: logger) } operation: { let mqttConnection = MQTTConnectionService(cleanSession: false, logger: logger) - let sensors = SensorsService(sensors: .live) + let sensors = SensorsService(sensors: .live, logger: logger) var serviceGroupConfiguration = ServiceGroupConfiguration( services: [