diff --git a/Package.swift b/Package.swift index 2db2a77..83fcc74 100755 --- a/Package.swift +++ b/Package.swift @@ -3,7 +3,8 @@ import PackageDescription let swiftSettings: [SwiftSetting] = [ - .enableExperimentalFeature("StrictConcurrency") + .enableExperimentalFeature("StrictConcurrency"), + .enableUpcomingFeature("InferSendableCaptures") ] let package = Package( @@ -21,6 +22,7 @@ let package = Package( dependencies: [ .package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"), .package(url: "https://github.com/apple/swift-nio", from: "2.0.0"), + .package(url: "https://github.com/apple/swift-log", from: "1.6.0"), .package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", exact: "0.2.3"), .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.3.0") ], @@ -28,9 +30,12 @@ let package = Package( .executableTarget( name: "dewPoint-controller", dependencies: [ - "Bootstrap", + "Models", + "MQTTConnectionService", + "SensorsService", .product(name: "MQTTNIO", package: "mqtt-nio"), - .product(name: "NIO", package: "swift-nio") + .product(name: "NIO", package: "swift-nio"), + .product(name: "PsychrometricClientLive", package: "swift-psychrometrics") ] ), .testTarget( @@ -49,6 +54,7 @@ let package = Package( .target( name: "Models", dependencies: [ + .product(name: "Logging", package: "swift-log"), .product(name: "PsychrometricClient", package: "swift-psychrometrics") ], swiftSettings: swiftSettings @@ -73,6 +79,7 @@ let package = Package( name: "SensorsService", dependencies: [ "Models", + "MQTTConnectionService", .product(name: "MQTTNIO", package: "mqtt-nio"), .product(name: "ServiceLifecycle", package: "swift-service-lifecycle") ], diff --git a/Sources/MQTTConnectionService/MQTTConnectionService.swift b/Sources/MQTTConnectionService/MQTTConnectionService.swift index 305d990..290653a 100644 --- a/Sources/MQTTConnectionService/MQTTConnectionService.swift +++ b/Sources/MQTTConnectionService/MQTTConnectionService.swift @@ -1,15 +1,22 @@ -import EnvVars +@preconcurrency import Foundation import Logging +import Models import MQTTNIO import NIO import ServiceLifecycle +// TODO: This may not need to be an actor. + /// Manages the MQTT broker connection. public actor MQTTConnectionService: Service { + private let cleanSession: Bool public let client: MQTTClient - private var shuttingDown = false + private let continuation: AsyncStream.Continuation + public nonisolated let events: AsyncStream + private let internalEventStream: ConnectionStream nonisolated var logger: Logger { client.logger } + // private var shuttingDown = false public init( cleanSession: Bool = true, @@ -17,6 +24,16 @@ public actor MQTTConnectionService: Service { ) { self.cleanSession = cleanSession self.client = client + self.internalEventStream = .init() + let (stream, continuation) = AsyncStream.makeStream(of: Event.self) + self.events = stream + self.continuation = continuation + } + + deinit { + self.logger.debug("MQTTConnectionService is gone.") + self.internalEventStream.stop() + continuation.finish() } /// The entry-point of the service. @@ -25,29 +42,35 @@ public actor MQTTConnectionService: Service { /// It will attempt to gracefully shutdown the connection upon receiving /// `sigterm` signals. public func run() async throws { - await withDiscardingTaskGroup { group in - await withGracefulShutdownHandler { + await withGracefulShutdownHandler { + await withDiscardingTaskGroup { group in group.addTask { await self.connect() } - } onGracefulShutdown: { - // try? self.client.syncShutdownGracefully() - Task { await self.shutdown() } + group.addTask { + await self.internalEventStream.start { self.client.isActive() } + } + for await event in self.internalEventStream.events.cancelOnGracefulShutdown() { + if event == .shuttingDown { + self.shutdown() + break + } + self.logger.trace("Sending connection event: \(event)") + self.continuation.yield(event) + } + group.cancelAll() } + } onGracefulShutdown: { + self.logger.trace("Received graceful shutdown.") + self.shutdown() } } - func shutdown() async { - shuttingDown = true - try? await client.disconnect() - try? await client.shutdown() - } - func connect() async { do { try await withThrowingDiscardingTaskGroup { group in group.addTask { try await self.client.connect(cleanSession: self.cleanSession) } - client.addCloseListener(named: "SensorsClient") { [self] _ in + client.addCloseListener(named: "\(Self.self)") { _ in Task { self.logger.debug("Connection closed.") self.logger.debug("Reconnecting...") @@ -55,22 +78,75 @@ public actor MQTTConnectionService: Service { } } self.logger.debug("Connection successful.") + self.continuation.yield(.connected) } } catch { - logger.trace("Failed to connect.") + logger.trace("Failed to connect: \(error)") + continuation.yield(.disconnected) } -// do { -// try await client.connect(cleanSession: cleanSession) -// client.addCloseListener(named: "SensorsClient") { [self] _ in -// Task { -// self.logger.debug("Connection closed.") -// self.logger.debug("Reconnecting...") -// await self.connect() -// } -// } -// logger.debug("Connection successful.") -// } catch { -// logger.trace("Connection Failed.\(error)") -// } } + + private nonisolated func shutdown() { + logger.debug("Begin shutting down MQTT broker connection.") + client.removeCloseListener(named: "\(Self.self)") + internalEventStream.stop() + // continuation.yield(.shuttingDown) + _ = client.disconnect() + try? client.syncShutdownGracefully() + continuation.finish() + logger.info("MQTT broker connection closed.") + } + +} + +extension MQTTConnectionService { + + public enum Event: Sendable { + case connected + case disconnected + case shuttingDown + } + + // TODO: This functionality can probably move into the connection service. + + private final class ConnectionStream: Sendable { + + // private var cancellable: AnyCancellable? + private let continuation: AsyncStream.Continuation + let events: AsyncStream + + init() { + let (stream, continuation) = AsyncStream.makeStream(of: MQTTConnectionService.Event.self) + self.events = stream + self.continuation = continuation + } + + deinit { + stop() + } + + func start(isActive connectionIsActive: @escaping () -> Bool) async { + try? await Task.sleep(for: .seconds(1)) + let event: MQTTConnectionService.Event = connectionIsActive() + ? .connected + : .disconnected + + continuation.yield(event) +// cancellable = Timer.publish(every: 1.0, on: .main, in: .common) +// .autoconnect() +// .sink { [weak self] (_: Date) in +// let event: MQTTConnectionService.Event = connectionIsActive() +// ? .connected +// : .disconnected +// +// self?.continuation.yield(event) +// } + } + + func stop() { + continuation.yield(.shuttingDown) + continuation.finish() + } + } + } diff --git a/Sources/Models/EnvVars.swift b/Sources/Models/EnvVars.swift index 926f1f2..a7d4392 100755 --- a/Sources/Models/EnvVars.swift +++ b/Sources/Models/EnvVars.swift @@ -1,4 +1,5 @@ import Foundation +import Logging /// Holds common settings for connecting to your MQTT broker. The default values can be used, /// they can be loaded from the shell environment, or from a file located in the root directory. diff --git a/Sources/SensorsService/SensorsService.swift b/Sources/SensorsService/SensorsService.swift index e9d8fcf..36b9139 100644 --- a/Sources/SensorsService/SensorsService.swift +++ b/Sources/SensorsService/SensorsService.swift @@ -1,6 +1,7 @@ import Foundation import Logging import Models +import MQTTConnectionService @preconcurrency import MQTTNIO import NIO import PsychrometricClient @@ -9,13 +10,17 @@ import ServiceLifecycle public actor SensorsService: Service { private var sensors: [TemperatureAndHumiditySensor] private let client: MQTTClient - var logger: Logger { client.logger } + private let events: @Sendable () -> AsyncStream + nonisolated var logger: Logger { client.logger } + private var shuttingDown: Bool = false public init( client: MQTTClient, + events: @Sendable @escaping () -> AsyncStream, sensors: [TemperatureAndHumiditySensor] ) { self.client = client + self.events = events self.sensors = sensors } @@ -25,50 +30,90 @@ public actor SensorsService: Service { /// listening for sensor value changes then publishing the dew-point /// and enthalpy values of the sensors. public func run() async throws { - guard client.isActive() else { - throw MQTTClientNotConnected() - } - try await withThrowingDiscardingTaskGroup { group in - group.addTask { try await self.subscribeToSensors() } - for await result in client.createPublishListener().cancelOnGracefulShutdown() { - group.addTask { - try await self.handleResult(result) + do { + try await withGracefulShutdownHandler { + try await withThrowingDiscardingTaskGroup { group in + client.addPublishListener(named: "\(Self.self)") { result in + if self.shuttingDown { + self.logger.trace("Shutting down.") + } else if !self.client.isActive() { + self.logger.trace("Client is not currently active") + } else { + Task { try await self.handleResult(result) } + } + } + for await event in self.events().cancelOnGracefulShutdown() { + logger.trace("Received event: \(event)") + if event == .shuttingDown { + self.setIsShuttingDown() + } else if event == .connected { + group.addTask { try await self.subscribeToSensors() } + } else { + group.addTask { await self.unsubscribeToSensors() } + group.addTask { try? await Task.sleep(for: .milliseconds(100)) } + } + } } + } onGracefulShutdown: { + // do something. + self.logger.debug("Received graceful shutdown.") + Task { [weak self] in await self?.setIsShuttingDown() } } + } catch { + // WARN: We always get an MQTTNIO `noConnection` error here, which generally is not an issue, + // but causes service ServiceLifecycle to fail, so currently just ignoring errors that are thrown. + // However we do receive the unsubscribe message back from the MQTT broker, so it is likely safe + // to ignore the `noConnection` error. + logger.trace("Run error: \(error)") + // throw error } } + private func setIsShuttingDown() { + logger.debug("Received shut down event.") + Task { try await publishUpdates() } + Task { await self.unsubscribeToSensors() } + shuttingDown = true + client.removePublishListener(named: "\(Self.self)") + } + private func handleResult( _ result: Result ) async throws { - switch result { - case let .failure(error): - logger.debug("Failed receiving sensor: \(error)") - throw error - case let .success(value): - // do something. - let topic = value.topicName - logger.trace("Received new value for topic: \(topic)") - if topic.contains("temperature") { + logger.trace("Begin handling result") + do { + switch result { + case let .failure(error): + logger.debug("Failed receiving sensor: \(error)") + throw error + case let .success(value): // do something. - var buffer = value.payload - guard let temperature = DryBulb(buffer: &buffer) else { - logger.trace("Decoding error for topic: \(topic)") - throw DecodingError() - } - try sensors.update(topic: topic, keyPath: \.temperature, with: temperature) - try await publishUpdates() + let topic = value.topicName + logger.trace("Received new value for topic: \(topic)") + if topic.contains("temperature") { + // do something. + var buffer = value.payload + guard let temperature = DryBulb(buffer: &buffer) else { + logger.trace("Decoding error for topic: \(topic)") + throw DecodingError() + } + try sensors.update(topic: topic, keyPath: \.temperature, with: temperature) + try await publishUpdates() - } else if topic.contains("humidity") { - var buffer = value.payload - // Decode and update the temperature value - guard let humidity = RelativeHumidity(buffer: &buffer) else { - logger.debug("Failed to decode humidity from buffer: \(buffer)") - throw DecodingError() + } else if topic.contains("humidity") { + var buffer = value.payload + // Decode and update the temperature value + guard let humidity = RelativeHumidity(buffer: &buffer) else { + logger.debug("Failed to decode humidity from buffer: \(buffer)") + throw DecodingError() + } + try sensors.update(topic: topic, keyPath: \.humidity, with: humidity) + try await publishUpdates() } - try sensors.update(topic: topic, keyPath: \.humidity, with: humidity) - try await publishUpdates() } + } catch { + logger.trace("Handle Result error: \(error)") + throw error } } @@ -82,7 +127,26 @@ public actor SensorsService: Service { } } + private func unsubscribeToSensors() async { + logger.trace("Begin unsubscribe to sensors.") + guard client.isActive() else { + logger.debug("Client is not active, skipping.") + return + } + do { + let topics = sensors.reduce(into: [String]()) { array, sensor in + array.append(sensor.topics.temperature) + array.append(sensor.topics.humidity) + } + try await client.unsubscribe(from: topics) + logger.trace("Unsubscribed from sensors.") + } catch { + logger.trace("Unsubscribe error: \(error)") + } + } + private func publish(double: Double?, to topic: String) async throws { + guard client.isActive() else { return } guard let double else { return } let rounded = round(double * 100) / 100 logger.debug("Publishing \(rounded), to: \(topic)") diff --git a/Sources/dewPoint-controller/Application.swift b/Sources/dewPoint-controller/Application.swift new file mode 100644 index 0000000..e5f6827 --- /dev/null +++ b/Sources/dewPoint-controller/Application.swift @@ -0,0 +1,101 @@ +import Foundation +import Logging +import Models +import MQTTConnectionService +import MQTTNIO +import NIO +import PsychrometricClientLive +import SensorsService +import ServiceLifecycle + +@main +struct Application { + /// The main entry point of the application. + static func main() async throws { + let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + var logger = Logger(label: "dewpoint-controller") + logger.logLevel = .trace + + logger.info("Starting dewpoint-controller!") + + let environment = loadEnvVars(logger: logger) + + if environment.appEnv == .production { + logger.debug("Updating logging level to info.") + logger.logLevel = .info + } + + let mqtt = MQTTClient( + envVars: environment, + eventLoopGroup: eventloopGroup, + logger: logger + ) + + let mqttConnection = MQTTConnectionService(client: mqtt) + let sensors = SensorsService( + client: mqtt, + events: { mqttConnection.events }, + sensors: .live + ) + + let serviceGroup = ServiceGroup( + services: [ + mqttConnection, + sensors + ], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: logger + ) + + try await serviceGroup.run() + } +} + +// MARK: - Helpers + +private func loadEnvVars(logger: Logger) -> EnvVars { + let defaultEnvVars = EnvVars() + let encoder = JSONEncoder() + let decoder = JSONDecoder() + + let defaultEnvDict = (try? encoder.encode(defaultEnvVars)) + .flatMap { try? decoder.decode([String: String].self, from: $0) } + ?? [:] + + let envVarsDict = defaultEnvDict + .merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 }) + + let envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict)) + .flatMap { try? decoder.decode(EnvVars.self, from: $0) } + ?? defaultEnvVars + + logger.debug("Done loading EnvVars...") + + return envVars +} + +private extension MQTTNIO.MQTTClient { + convenience init(envVars: EnvVars, eventLoopGroup: EventLoopGroup, logger: Logger?) { + self.init( + host: envVars.host, + port: envVars.port != nil ? Int(envVars.port!) : nil, + identifier: envVars.identifier, + eventLoopGroupProvider: .shared(eventLoopGroup), + logger: logger, + configuration: .init( + version: .v5_0, + disablePing: false, + userName: envVars.userName, + password: envVars.password + ) + ) + } +} + +private extension Array where Element == TemperatureAndHumiditySensor { + static var live: Self { + TemperatureAndHumiditySensor.Location.allCases.map { location in + TemperatureAndHumiditySensor(location: location) + } + } +} diff --git a/Sources/dewPoint-controller/main.swift b/Sources/dewPoint-controller/main.swift.old similarity index 100% rename from Sources/dewPoint-controller/main.swift rename to Sources/dewPoint-controller/main.swift.old diff --git a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift b/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift index a3543e3..9ab7825 100644 --- a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift +++ b/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift @@ -1,5 +1,6 @@ -import EnvVars +import Combine import Logging +import Models @testable import MQTTConnectionService import MQTTNIO import NIO @@ -25,6 +26,8 @@ final class MQTTConnectionServiceTests: XCTestCase { try await Task.sleep(for: .seconds(1)) XCTAssert(client.isActive()) trigger.triggerGracefulShutdown() + // try await Task.sleep(for: .seconds(2)) + // XCTAssertFalse(client.isActive()) } } @@ -55,4 +58,65 @@ final class MQTTConnectionServiceTests: XCTestCase { ) } + func testEventStream() async throws { + var connection: ConnectionStream? = ConnectionStream() + + let task = Task { + guard let events = connection?.events else { return } + print("before loop") + for await event in events { + print("\(event)") + } + print("after loop") + } + + let ending = Task { + try await Task.sleep(for: .seconds(2)) + connection = nil + } + + connection?.start() + try await ending.value + task.cancel() + } + +} + +class ConnectionStream { + + enum Event { + case connected + case disconnected + case shuttingDown + } + + let events: AsyncStream + private let continuation: AsyncStream.Continuation + private var cancellable: AnyCancellable? + + init() { + let (stream, continuation) = AsyncStream.makeStream(of: Event.self) + self.events = stream + self.continuation = continuation + } + + deinit { + print("connection stream is gone.") + stop() + } + + func start() { + cancellable = Timer.publish(every: 1.0, on: .main, in: .common) + .autoconnect() + .sink { [weak self] _ in + print("will send event.") + self?.continuation.yield(.connected) + } + } + + func stop() { + continuation.yield(.shuttingDown) + cancellable = nil + continuation.finish() + } } diff --git a/Tests/SensorsServiceTests/SensorsClientTests.swift b/Tests/SensorsServiceTests/SensorsClientTests.swift index 2e96ce5..e95b9b1 100755 --- a/Tests/SensorsServiceTests/SensorsClientTests.swift +++ b/Tests/SensorsServiceTests/SensorsClientTests.swift @@ -1,6 +1,4 @@ -@testable import ClientLive import Dependencies -import EnvVars import Logging import Models import MQTTNIO diff --git a/docker-compose.yaml b/docker-compose.yaml index c0ff58c..0f04f6a 100755 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -5,6 +5,17 @@ services: restart: unless-stopped env_file: .env + local: + container_name: local-server + build: + context: . + dockerfile: Dockerfile + platform: linux/amd64 + depends_on: + - mosquitto + environment: + - MOSQUITTO_SERVER=mosquitto + test: build: context: .