diff --git a/.swiftlint.yml b/.swiftlint.yml new file mode 100644 index 0000000..e2ccc42 --- /dev/null +++ b/.swiftlint.yml @@ -0,0 +1,2 @@ +disabled_rules: + - closing_brace diff --git a/.swiftpm/dewpoint-controller-Package.xctestplan b/.swiftpm/dewpoint-controller-Package.xctestplan index 5e8bdf3..741526b 100644 --- a/.swiftpm/dewpoint-controller-Package.xctestplan +++ b/.swiftpm/dewpoint-controller-Package.xctestplan @@ -17,17 +17,11 @@ }, "testTargets" : [ { + "parallelizable" : true, "target" : { "containerPath" : "container:", - "identifier" : "MQTTConnectionServiceTests", - "name" : "MQTTConnectionServiceTests" - } - }, - { - "target" : { - "containerPath" : "container:", - "identifier" : "SensorsServiceTests", - "name" : "SensorsServiceTests" + "identifier" : "IntegrationTests", + "name" : "IntegrationTests" } } ], diff --git a/.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller-Package.xcscheme b/.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller-Package.xcscheme index 62a0843..4ff0e6e 100644 --- a/.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller-Package.xcscheme +++ b/.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller-Package.xcscheme @@ -77,6 +77,34 @@ ReferencedContainer = "container:"> + + + + + + + + - - - - diff --git a/Makefile b/Makefile index 9ec2468..d2c29b5 100755 --- a/Makefile +++ b/Makefile @@ -1,29 +1,39 @@ +DOCKER_IMAGE_NAME?="swift-mqtt-dewpoint" +DOCKER_TAG_NAME?="latest" -bootstrap-env: +.PHONY: bootstrap +bootstrap: @cp Bootstrap/dewPoint-env-example .dewPoint-env -bootstrap-topics: - @cp Bootstrap/topics-example .topics - -bootstrap: bootstrap-env bootstrap-topics - +.PHONY: build build: @swift build -Xswiftc -strict-concurrency=complete +.PHONY: build-docker +build-docker: + @docker build \ + --file docker/Dockerfile \ + --tag "${DOCKER_IMAGE_NAME}:${DOCKER_TAG_NAME}" . + +.PHONY: clean clean: rm -rf .build +.PHONY: run run: @swift run dewpoint-controller +.PHONY: test-docker test-docker: @docker compose --file docker/docker-compose-test.yaml \ run --build --remove-orphans -i --rm test @docker compose --file docker/docker-compose-test.yaml down +.PHONY: start-mosquitto start-mosquitto: @docker compose --file docker/docker-compose.yaml \ up -d mosquitto +.PHONY: test-swift test-swift: start-mosquitto @swift test --enable-code-coverage diff --git a/Package.resolved b/Package.resolved index 9feac26..520fd2b 100755 --- a/Package.resolved +++ b/Package.resolved @@ -1,5 +1,5 @@ { - "originHash" : "33fdcea7245de36c7e638047a16bba6605bc9bac0117aab7cb9397289a33214e", + "originHash" : "486be5d69e4f0ba7b9f42046df31a727c7e394e4ecfae5671e1b194bed7c9e9b", "pins" : [ { "identity" : "combine-schedulers", @@ -10,6 +10,15 @@ "version" : "1.0.2" } }, + { + "identity" : "dotenv", + "kind" : "remoteSourceControl", + "location" : "https://github.com/swiftpackages/DotEnv.git", + "state" : { + "revision" : "1f15bb9de727d694af1d003a1a5d7a553752850f", + "version" : "3.0.0" + } + }, { "identity" : "mqtt-nio", "kind" : "remoteSourceControl", @@ -19,6 +28,15 @@ "version" : "2.11.0" } }, + { + "identity" : "swift-argument-parser", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-argument-parser.git", + "state" : { + "revision" : "41982a3656a71c768319979febd796c6fd111d5c", + "version" : "1.5.0" + } + }, { "identity" : "swift-async-algorithms", "kind" : "remoteSourceControl", @@ -64,6 +82,15 @@ "version" : "1.3.0" } }, + { + "identity" : "swift-custom-dump", + "kind" : "remoteSourceControl", + "location" : "https://github.com/pointfreeco/swift-custom-dump", + "state" : { + "revision" : "82645ec760917961cfa08c9c0c7104a57a0fa4b1", + "version" : "1.3.3" + } + }, { "identity" : "swift-dependencies", "kind" : "remoteSourceControl", diff --git a/Package.swift b/Package.swift index 86e17da..1ed959a 100755 --- a/Package.swift +++ b/Package.swift @@ -14,30 +14,43 @@ let package = Package( ], products: [ .executable(name: "dewpoint-controller", targets: ["DewPointController"]), + .library(name: "CliClient", targets: ["CliClient"]), .library(name: "Models", targets: ["Models"]), .library(name: "MQTTManager", targets: ["MQTTManager"]), .library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]), .library(name: "SensorsService", targets: ["SensorsService"]) ], dependencies: [ + .package(url: "https://github.com/apple/swift-argument-parser.git", from: "1.3.0"), .package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"), - .package(url: "https://github.com/apple/swift-nio", from: "2.0.0"), .package(url: "https://github.com/apple/swift-log", from: "1.6.0"), + .package(url: "https://github.com/swiftpackages/DotEnv.git", from: "3.0.0"), .package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.5.2"), + .package(url: "https://github.com/pointfreeco/swift-custom-dump", from: "1.0.0"), .package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", exact: "0.2.3"), .package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"), .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.3.0") ], targets: [ + .target( + name: "CliClient", + dependencies: [ + "Models", + .product(name: "Dependencies", package: "swift-dependencies"), + .product(name: "DependenciesMacros", package: "swift-dependencies"), + .product(name: "DotEnv", package: "DotEnv"), + .product(name: "MQTTNIO", package: "mqtt-nio") + ] + ), .executableTarget( name: "DewPointController", dependencies: [ - "Models", - "MQTTManager", + "CliClient", "MQTTConnectionService", "SensorsService", - .product(name: "MQTTNIO", package: "mqtt-nio"), - .product(name: "NIO", package: "swift-nio"), + .product(name: "ArgumentParser", package: "swift-argument-parser"), + .product(name: "CustomDump", package: "swift-custom-dump"), + // .product(name: "DotEnv", package: "DotEnv"), .product(name: "PsychrometricClientLive", package: "swift-psychrometrics") ] ), @@ -69,10 +82,12 @@ let package = Package( swiftSettings: swiftSettings ), .testTarget( - name: "MQTTConnectionServiceTests", + name: "IntegrationTests", dependencies: [ + "DewPointController", "MQTTConnectionService", "MQTTManager", + "SensorsService", .product(name: "PsychrometricClientLive", package: "swift-psychrometrics"), .product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle") ] @@ -89,13 +104,6 @@ let package = Package( .product(name: "ServiceLifecycle", package: "swift-service-lifecycle") ], swiftSettings: swiftSettings - ), - .testTarget( - name: "SensorsServiceTests", - dependencies: [ - "SensorsService", - .product(name: "PsychrometricClientLive", package: "swift-psychrometrics") - ] ) ] ) diff --git a/Sources/CliClient/CliClient.swift b/Sources/CliClient/CliClient.swift new file mode 100644 index 0000000..986a83e --- /dev/null +++ b/Sources/CliClient/CliClient.swift @@ -0,0 +1,188 @@ +import Dependencies +import DependenciesMacros +import DotEnv +import Foundation +import Logging +import Models +import MQTTNIO +import NIO + +public extension DependencyValues { + var cliClient: CliClient { + get { self[CliClient.self] } + set { self[CliClient.self] = newValue } + } +} + +@DependencyClient +public struct CliClient { + + public var logLevel: @Sendable (EnvVars) -> Logger.Level = { _ in .debug } + public var makeEnvVars: @Sendable (EnvVarsRequest) async throws -> EnvVars + public var makeClient: @Sendable (ClientRequest) throws -> MQTTClient + public var parseMqttClientVersion: @Sendable (String) -> MQTTClient.Version? + + public struct ClientRequest: Sendable { + public let envVars: EnvVars + public let eventLoopGroup: MultiThreadedEventLoopGroup + public let logger: Logger? + + public init( + envVars: EnvVars, + eventLoopGroup: MultiThreadedEventLoopGroup, + logger: Logger? + ) { + self.envVars = envVars + self.eventLoopGroup = eventLoopGroup + self.logger = logger + } + + } + + public struct EnvVarsRequest: Sendable { + + public let envFilePath: String? + public let logger: Logger? + public let mqttClientVersion: String? + + public init( + envFilePath: String? = nil, + logger: Logger? = nil, + version mqttClientVersion: String? = nil + ) { + self.envFilePath = envFilePath + self.logger = logger + self.mqttClientVersion = mqttClientVersion + } + } +} + +extension CliClient: DependencyKey { + public static let testValue: CliClient = Self() + public static var liveValue: CliClient { + Self( + logLevel: { Logger.Level.from(environment: $0) }, + makeEnvVars: { + try EnvVars.load( + dotEnvFile: $0.envFilePath, + logger: $0.logger, + version: $0.mqttClientVersion + ) + }, + makeClient: { + MQTTClient( + envVars: $0.envVars, + eventLoopGroup: $0.eventLoopGroup, + logger: $0.logger + ) + }, + parseMqttClientVersion: { .init(string: $0) } + ) + } +} + +extension EnvVars { + + /// Load the `EnvVars` from the environment. + /// + /// - Paramaters: + /// - logger: An optional logger to use for debugging. + /// - version: A version that is specified from command line, ignoring any environment variable. + static func load( + dotEnvFile: String?, + logger: Logger?, + version: String? + ) throws -> EnvVars { + let defaultEnvVars = EnvVars() + let encoder = JSONEncoder() + let decoder = JSONDecoder() + + if let dotEnvFile { + try DotEnv.load(path: dotEnvFile) + } + + let defaultEnvDict = (try? encoder.encode(defaultEnvVars)) + .flatMap { try? decoder.decode([String: String].self, from: $0) } + ?? [:] + + let envVarsDict = defaultEnvDict + .merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 }) + + var envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict)) + .flatMap { try? decoder.decode(EnvVars.self, from: $0) } + ?? defaultEnvVars + + if let version { + envVars.version = version + } + + logger?.debug("Done loading EnvVars...") + + return envVars + } +} + +@_spi(Internal) +public extension MQTTClient { + convenience init( + envVars: EnvVars, + eventLoopGroup: EventLoopGroup, + logger: Logger? + ) { + self.init( + host: envVars.host, + port: envVars.port != nil ? Int(envVars.port!) : nil, + identifier: envVars.identifier, + eventLoopGroupProvider: .shared(eventLoopGroup), + logger: logger, + configuration: .init( + version: .parseOrDefualt(string: envVars.version), + disablePing: false, + userName: envVars.userName, + password: envVars.password + ) + ) + } +} + +extension MQTTClient.Version { + static let `default` = Self.v3_1_1 + + static func parseOrDefualt(string: String?) -> Self { + guard let string, let value = Self(string: string) else { + return .default + } + return value + } + + init?(string: String) { + if string.contains("5") { + self = .v5_0 + } else if string.contains("3") { + self = .v3_1_1 + } else { + return nil + } + } +} + +@_spi(Internal) +public extension Logger.Level { + + /// Parse a `Logger.Level` from the loaded `EnvVars`. + static func from(environment envVars: EnvVars) -> Self { + // If the log level was set via an environment variable. + if let logLevel = envVars.logLevel { + return logLevel + } + // Parse the appEnv to derive an log level. + switch envVars.appEnv { + case .staging, .development: + return .debug + case .production: + return .info + case .testing: + return .trace + } + } +} diff --git a/Sources/DewPointController/Application.swift b/Sources/DewPointController/Application.swift index a6cc71a..03eb2e5 100644 --- a/Sources/DewPointController/Application.swift +++ b/Sources/DewPointController/Application.swift @@ -1,3 +1,4 @@ +import ArgumentParser import Dependencies import Foundation import Logging @@ -11,106 +12,10 @@ import SensorsService import ServiceLifecycle @main -struct Application { - - /// The main entry point of the application. - static func main() async throws { - let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) - var logger = Logger(label: "dewpoint-controller") - logger.logLevel = .trace - - logger.info("Starting dewpoint-controller!") - - let environment = loadEnvVars(logger: logger) - - if environment.appEnv == .production { - logger.debug("Updating logging level to info.") - logger.logLevel = .info - } - - let mqtt = MQTTClient( - envVars: environment, - eventLoopGroup: eventloopGroup, - logger: logger - ) - - do { - try await withDependencies { - $0.psychrometricClient = .liveValue - $0.mqtt = .live(client: mqtt, logger: logger) - } operation: { - let mqttConnection = MQTTConnectionService(logger: logger) - let sensors = SensorsService(sensors: .live, logger: logger) - - var serviceGroupConfiguration = ServiceGroupConfiguration( - services: [ - mqttConnection, - sensors - ], - gracefulShutdownSignals: [.sigterm, .sigint], - logger: logger - ) - serviceGroupConfiguration.maximumCancellationDuration = .seconds(5) - serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10) - - let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) - - try await serviceGroup.run() - } - - try await mqtt.shutdown() - try await eventloopGroup.shutdownGracefully() - } catch { - try await eventloopGroup.shutdownGracefully() - } - } -} - -// MARK: - Helpers - -private func loadEnvVars(logger: Logger) -> EnvVars { - let defaultEnvVars = EnvVars() - let encoder = JSONEncoder() - let decoder = JSONDecoder() - - let defaultEnvDict = (try? encoder.encode(defaultEnvVars)) - .flatMap { try? decoder.decode([String: String].self, from: $0) } - ?? [:] - - let envVarsDict = defaultEnvDict - .merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 }) - - let envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict)) - .flatMap { try? decoder.decode(EnvVars.self, from: $0) } - ?? defaultEnvVars - - logger.debug("Done loading EnvVars...") - - return envVars -} - -private extension MQTTNIO.MQTTClient { - convenience init(envVars: EnvVars, eventLoopGroup: EventLoopGroup, logger: Logger?) { - self.init( - host: envVars.host, - port: envVars.port != nil ? Int(envVars.port!) : nil, - identifier: envVars.identifier, - eventLoopGroupProvider: .shared(eventLoopGroup), - logger: logger, - configuration: .init( - version: .v3_1_1, - disablePing: false, - userName: envVars.userName, - password: envVars.password - ) - ) - } -} - -private extension Array where Element == TemperatureAndHumiditySensor { - static var live: Self { - TemperatureAndHumiditySensor.Location.allCases.map { location in - TemperatureAndHumiditySensor(location: location) - } - } +struct Application: AsyncParsableCommand { + static let configuration = CommandConfiguration( + commandName: "dewpoint-controller", + abstract: "Command for running the dewpoint mqtt service.", + subcommands: [Run.self, Debug.self] + ) } diff --git a/Sources/DewPointController/DebugCommand.swift b/Sources/DewPointController/DebugCommand.swift new file mode 100644 index 0000000..bd7b371 --- /dev/null +++ b/Sources/DewPointController/DebugCommand.swift @@ -0,0 +1,74 @@ +import ArgumentParser +import CliClient +import CustomDump +import Dependencies +import DotEnv +import Foundation +import Logging +import Models + +extension Application { + + struct Debug: AsyncParsableCommand { + + static let configuration: CommandConfiguration = .init( + commandName: "debug", + abstract: "Debug the environment variables." + ) + + @OptionGroup + var shared: SharedOptions + + @Flag( + name: [.customLong("show-password")], + help: "Don't redact the password from the console." + ) + var showPassword: Bool = false + + mutating func run() async throws { + @Dependency(\.cliClient) var client + let logger = Logger(label: "debug-command") + + print("--------------------------") + print("Running debug command...") + if let envFile = shared.envFile { + print("Reading env file: \(envFile)") + print("--------------------------") + } else { + print("No env file set.") + print("--------------------------") + } + + print("Loading EnvVars") + print("--------------------------") + let envVars = try await client.makeEnvVars(shared.envVarsRequest(logger: logger)) + printEnvVars(envVars: envVars, showPassword: showPassword) + print("--------------------------") + + if let logLevel = shared.logLevel, let level = logLevel() { + print("Log Level option: \(level)") + print("--------------------------") + } else { + print("Log Level option: nil") + print("--------------------------") + } + } + + private func printEnvVars(envVars: EnvVars, showPassword: Bool) { + // show the proper password to show depending on if it exists + // and if we should redact it or not. + var passwordString: String? + switch (showPassword, envVars.password) { + case (true, .none), (_, .none): + break + case (true, let .some(password)): + passwordString = password + case (false, .some): + passwordString = "" + } + var envVars = envVars + envVars.password = passwordString + customDump(envVars) + } + } +} diff --git a/Sources/DewPointController/RunCommand.swift b/Sources/DewPointController/RunCommand.swift new file mode 100644 index 0000000..f3cbb6e --- /dev/null +++ b/Sources/DewPointController/RunCommand.swift @@ -0,0 +1,86 @@ +import ArgumentParser +import CliClient +import Dependencies +import Foundation +import Logging +import Models +import MQTTConnectionService +import MQTTManager +import MQTTNIO +import NIO +import PsychrometricClientLive +import SensorsService +import ServiceLifecycle + +extension Application { + /// Run the controller. + /// + struct Run: AsyncParsableCommand { + + static let configuration = CommandConfiguration( + commandName: "run", + abstract: "Run the controller." + ) + + @OptionGroup + var shared: SharedOptions + + mutating func run() async throws { + @Dependency(\.cliClient) var cliClient + + let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + var logger = Logger(label: "dewpoint-controller") + let mqtt = try await setup(eventLoopGroup: eventloopGroup, logger: &logger) + + do { + try await withDependencies { + $0.psychrometricClient = .liveValue + $0.mqtt = .live(client: mqtt, logger: logger) + } operation: { + let mqttConnection = MQTTConnectionService(logger: logger) + let sensors = SensorsService(sensors: .live, logger: logger) + + var serviceGroupConfiguration = ServiceGroupConfiguration( + services: [ + mqttConnection, + sensors + ], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: logger + ) + serviceGroupConfiguration.maximumCancellationDuration = .seconds(5) + serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10) + + let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) + + logger.info("Starting dewpoint-controller!") + try await serviceGroup.run() + } + + try await mqtt.shutdown() + try await eventloopGroup.shutdownGracefully() + } catch { + try await eventloopGroup.shutdownGracefully() + } + } + + private func setup( + eventLoopGroup: MultiThreadedEventLoopGroup, + logger: inout Logger + ) async throws -> MQTTClient { + @Dependency(\.cliClient) var cliClient + + let environment = try await cliClient.makeEnvVars(shared.envVarsRequest(logger: logger)) + logger.logLevel = cliClient.logLevel(environment) + + return try cliClient.makeClient(.init( + envVars: environment, + eventLoopGroup: eventLoopGroup, + logger: logger + )) + } + } + +} + +// MARK: - Helpers diff --git a/Sources/DewPointController/SharedOptions.swift b/Sources/DewPointController/SharedOptions.swift new file mode 100644 index 0000000..9260dce --- /dev/null +++ b/Sources/DewPointController/SharedOptions.swift @@ -0,0 +1,50 @@ +import ArgumentParser +import CliClient +import Logging +import Models +import MQTTNIO + +extension Application { + + struct SharedOptions: ParsableArguments { + @Option( + name: [.short, .customLong("env-file")], + help: "A file path to an env file." + ) + var envFile: String? + + @Option( + name: [.short, .customLong("log-level")], + help: "Set the logging level." + ) + var logLevel: LogLevelContainer? + + @Option( + name: [.short, .long], + help: "Set the MQTT connecition version." + ) + var version: String? + + func envVarsRequest(logger: Logger?) -> CliClient.EnvVarsRequest { + .init(envFilePath: envFile, logger: logger, version: version) + } + + } +} + +/// A container type for making `Logger.Level` into a type +/// that can be parsed as a command line argument. This is +/// to suppress warnings vs. having `Logger.Level` adopt the +/// protocol. +@_spi(Internal) +public struct LogLevelContainer: ExpressibleByArgument { + public let logLevel: Logger.Level? + + public init?(argument: String) { + self.logLevel = .init(rawValue: argument.lowercased()) + } + + public func callAsFunction() -> Logger.Level? { + logLevel + } +} diff --git a/Sources/DewPointController/Utils.swift b/Sources/DewPointController/Utils.swift new file mode 100644 index 0000000..9af4447 --- /dev/null +++ b/Sources/DewPointController/Utils.swift @@ -0,0 +1,10 @@ +import Models + +@_spi(Internal) +public extension Array where Element == TemperatureAndHumiditySensor { + static var live: Self { + TemperatureAndHumiditySensor.Location.allCases.map { location in + TemperatureAndHumiditySensor(location: location) + } + } +} diff --git a/Sources/MQTTConnectionService/MQTTConnectionService.swift b/Sources/MQTTConnectionService/MQTTConnectionService.swift index fd5e85f..3039a66 100644 --- a/Sources/MQTTConnectionService/MQTTConnectionService.swift +++ b/Sources/MQTTConnectionService/MQTTConnectionService.swift @@ -5,13 +5,14 @@ import MQTTManager import ServiceLifecycle public struct MQTTConnectionService: Service { - @Dependency(\.mqtt) var mqtt private let logger: Logger? public init( logger: Logger? = nil ) { + var logger = logger + logger?[metadataKey: "type"] = "mqtt-connection-service" self.logger = logger } @@ -19,6 +20,7 @@ public struct MQTTConnectionService: Service { /// to the MQTT broker and handles graceful shutdown of the /// connection. public func run() async throws { + @Dependency(\.mqtt) var mqtt try await mqtt.connect() try await withGracefulShutdownHandler { @@ -30,11 +32,7 @@ public struct MQTTConnectionService: Service { } } onGracefulShutdown: { self.logger?.trace("Received graceful shutdown.") - shutdown() + mqtt.shutdown() } } - - public func shutdown() { - mqtt.shutdown() - } } diff --git a/Sources/MQTTManager/Interface.swift b/Sources/MQTTManager/Interface.swift index c618782..4784e47 100644 --- a/Sources/MQTTManager/Interface.swift +++ b/Sources/MQTTManager/Interface.swift @@ -9,16 +9,16 @@ import NIO public extension DependencyValues { /// A dependency that is responsible for managing the connection to - /// an MQTT broker. + /// an MQTT broker, listen to topics, and publish values back to the + /// broker. var mqtt: MQTTManager { get { self[MQTTManager.self] } set { self[MQTTManager.self] = newValue } } } -/// Represents the interface needed for the ``MQTTConnectionService``. +/// Represents the interface needed to connect, listen, and publish to an MQTT broker. /// -/// See ``MQTTConnectionManagerLive`` module for live implementation. @DependencyClient public struct MQTTManager: Sendable { @@ -28,6 +28,8 @@ public struct MQTTManager: Sendable { public var connect: @Sendable () async throws -> Void /// Create a stream of connection events. + /// + /// - SeeAlso: ``Event`` public var connectionStream: @Sendable () throws -> AsyncStream private var _listen: @Sendable ([String], MQTTQoS) async throws -> ListenStream @@ -38,10 +40,24 @@ public struct MQTTManager: Sendable { /// Shutdown the connection to the MQTT broker. public var shutdown: @Sendable () -> Void - /// Perform an operation with the underlying MQTTClient, this can be useful in - /// tests, so this module needs imported with `@_spi(Testing) import` to use this method. private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void + public init( + connect: @escaping @Sendable () async throws -> Void, + connectionStream: @escaping @Sendable () throws -> AsyncStream, + listen: @escaping @Sendable ([String], MQTTQoS) async throws -> MQTTManager.ListenStream, + publish: @escaping @Sendable (MQTTManager.PublishRequest) async throws -> Void, + shutdown: @escaping @Sendable () -> Void, + withClient: @escaping @Sendable ((MQTTClient) async throws -> Void) async throws -> Void = { _ in unimplemented() } + ) { + self.connect = connect + self.connectionStream = connectionStream + self._listen = listen + self.publish = publish + self.shutdown = shutdown + self._withClient = withClient + } + /// Create an async stream that listens for changes to the given topics. /// /// - Parameters: @@ -77,18 +93,20 @@ public struct MQTTManager: Sendable { _ payload: ByteBuffer, to topicName: String, qos: MQTTQoS, - retain: Bool = false + retain: Bool = false, + properties: MQTTProperties = .init() ) async throws { try await publish(.init( topicName: topicName, payload: payload, qos: qos, - retain: retain + retain: retain, + properties: properties )) } /// Perform an operation with the underlying MQTTClient, this can be useful in - /// tests, so this module needs imported with `@_spi(Testing) import` to use this method. + /// tests, so this module needs imported with `@_spi(Internal) import MQTTManager` to use this method. @_spi(Internal) public func withClient( _ callback: @Sendable (MQTTClient) async throws -> Void @@ -98,7 +116,7 @@ public struct MQTTManager: Sendable { /// Represents connection events that clients can listen for and /// react accordingly. - public enum Event: Sendable { + public enum Event: Equatable, Sendable { case connected case disconnected case shuttingDown @@ -106,7 +124,7 @@ public struct MQTTManager: Sendable { /// Represents the parameters required to publish a new value to the /// MQTT broker. - public struct PublishRequest: Equatable, Sendable { + public struct PublishRequest: Sendable { /// The topic to publish the new value to. public let topicName: String @@ -120,6 +138,8 @@ public struct MQTTManager: Sendable { /// The retain flag for the request. public let retain: Bool + public let properties: MQTTProperties + /// Create a new publish request. /// /// - Parameters: @@ -131,12 +151,14 @@ public struct MQTTManager: Sendable { topicName: String, payload: ByteBuffer, qos: MQTTQoS, - retain: Bool + retain: Bool, + properties: MQTTProperties ) { self.topicName = topicName self.payload = payload self.qos = qos self.retain = retain + self.properties = properties } } @@ -164,7 +186,7 @@ public extension MQTTManager { .removeDuplicates() .eraseToStream() }, - _listen: { topics, qos in + listen: { topics, qos in try await manager.listen(to: topics, qos: qos) }, publish: { request in @@ -174,19 +196,20 @@ public extension MQTTManager { return } logger?.trace("Begin publishing to topic: \(topic)") - defer { logger?.trace("Done publishing to topic: \(topic)") } + defer { logger?.debug("Done publishing to topic: \(topic)") } try await client.publish( to: request.topicName, payload: request.payload, qos: request.qos, - retain: request.retain - ) + retain: request.retain, + properties: request.properties + ).get() }, shutdown: { Task { try await client.shutdown() } manager.shutdown() }, - _withClient: { callback in + withClient: { callback in try await callback(client) } ) diff --git a/Sources/MQTTManager/Internal/TopicListenerStream.swift b/Sources/MQTTManager/Internal/TopicListenerStream.swift index 4db44c7..ade6b24 100644 --- a/Sources/MQTTManager/Internal/TopicListenerStream.swift +++ b/Sources/MQTTManager/Internal/TopicListenerStream.swift @@ -107,42 +107,6 @@ actor TopicListenerStream { onShutdownHandler = { task.cancel() } } - // TODO: remove. - func listen( - _ topics: [String], - _ qos: MQTTQoS = .atLeastOnce - ) async throws -> Stream { - var sleepTimes = 0 - - while !client.isActive() { - guard sleepTimes < 10 else { - throw TopicListenerError.connectionTimeout - } - try? await Task.sleep(for: .milliseconds(100)) - sleepTimes += 1 - } - - client.logger.trace("Client is active, begin subscribing to topics.") - - try await subscribe() - - client.logger.trace("Done subscribing, begin listening to topics.") - - client.addPublishListener(named: name) { result in - switch result { - case let .failure(error): - self.logger?.error("Received error while listening: \(error)") - case let .success(publishInfo): - if topics.contains(publishInfo.topicName) { - self.logger?.debug("Recieved new value for topic: \(publishInfo.topicName)") - self.continuation.yield(publishInfo) - } - } - } - - return stream - } - private func setIsShuttingDown() { shuttingDown = true onShutdownHandler = nil diff --git a/Sources/Models/EnvVars.swift b/Sources/Models/EnvVars.swift index a7d4392..be211f3 100755 --- a/Sources/Models/EnvVars.swift +++ b/Sources/Models/EnvVars.swift @@ -25,6 +25,12 @@ public struct EnvVars: Codable, Equatable, Sendable { /// The MQTT user password. public var password: String? + /// Set a custom logging level. + public var logLevel: Logger.Level? + + /// Set the mqtt broker version. + public var version: String? + /// Create a new ``EnvVars`` /// /// - Parameters: @@ -38,9 +44,11 @@ public struct EnvVars: Codable, Equatable, Sendable { appEnv: AppEnv = .development, host: String = "127.0.0.1", port: String? = "1883", - identifier: String = "dewPoint-controller", + identifier: String = "dewpoint-controller", userName: String? = "mqtt_user", - password: String? = "secret!" + password: String? = "secret!", + logLevel: Logger.Level? = nil, + version: String? = nil ) { self.appEnv = appEnv self.host = host @@ -48,6 +56,8 @@ public struct EnvVars: Codable, Equatable, Sendable { self.identifier = identifier self.userName = userName self.password = password + self.logLevel = logLevel + self.version = version } /// Custom coding keys. @@ -58,6 +68,8 @@ public struct EnvVars: Codable, Equatable, Sendable { case identifier = "MQTT_IDENTIFIER" case userName = "MQTT_USERNAME" case password = "MQTT_PASSWORD" + case logLevel = "LOG_LEVEL" + case version = "MQTT_VERSION" } /// Represents the different app environments. diff --git a/Tests/SensorsServiceTests/SensorsClientTests.swift b/Tests/IntegrationTests/IntegrationTests.swift old mode 100755 new mode 100644 similarity index 58% rename from Tests/SensorsServiceTests/SensorsClientTests.swift rename to Tests/IntegrationTests/IntegrationTests.swift index 8f6ec36..c46e8cc --- a/Tests/SensorsServiceTests/SensorsClientTests.swift +++ b/Tests/IntegrationTests/IntegrationTests.swift @@ -1,20 +1,25 @@ import Dependencies + +// @_spi(Internal) import dewpoint_controller import Logging import Models +import MQTTConnectionService @_spi(Internal) import MQTTManager import MQTTNIO import NIO import PsychrometricClientLive @_spi(Internal) import SensorsService +import ServiceLifecycle +import ServiceLifecycleTestKit import XCTest -final class SensorsClientTests: XCTestCase { +final class IntegrationTests: XCTestCase { static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost" static let logger: Logger = { - var logger = Logger(label: "SensorsClientTests") - logger.logLevel = .trace + var logger = Logger(label: "IntegrationTests") + logger.logLevel = .info return logger }() @@ -29,6 +34,71 @@ final class SensorsClientTests: XCTestCase { } } + func testConnectionServiceShutdown() async throws { + @Dependency(\.mqtt) var mqtt + + let service = MQTTConnectionService(logger: Self.logger) + let task = Task { try await service.run() } + defer { task.cancel() } + + try await Task.sleep(for: .milliseconds(200)) + + // check the connection is active here. + try await mqtt.withClient { client in + XCTAssert(client.isActive()) + } + mqtt.shutdown() + + try await Task.sleep(for: .milliseconds(500)) + + // check the connection is active here. + try await mqtt.withClient { client in + XCTAssertFalse(client.isActive()) + } + } + + func testMQTTConnectionStream() async throws { + let client = createClient(identifier: "testNonManagedStream") + let manager = MQTTManager.live( + client: client, + logger: Self.logger, + alwaysReconnect: false + ) + defer { manager.shutdown() } + let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger) + let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger) + var events1 = [MQTTManager.Event]() + var events2 = [MQTTManager.Event]() + + let stream1 = connectionStream1.start() + let stream2 = connectionStream2.start() + + _ = try await manager.connect() + + Task { + while !client.isActive() { + try await Task.sleep(for: .milliseconds(100)) + } + try await Task.sleep(for: .milliseconds(200)) + try await client.disconnect() + try await Task.sleep(for: .milliseconds(500)) + manager.shutdown() + try await Task.sleep(for: .milliseconds(500)) + connectionStream1.stop() + connectionStream2.stop() + } + + for await event in stream1.removeDuplicates() { + events1.append(event) + } + for await event in stream2.removeDuplicates() { + events2.append(event) + } + + XCTAssertEqual(events1, [.disconnected, .connected, .disconnected, .shuttingDown]) + XCTAssertEqual(events2, [.disconnected, .connected, .disconnected, .shuttingDown]) + } + func testListeningResumesAfterDisconnectThenReconnect() async throws { struct TimeoutError: Error {} @@ -99,6 +169,8 @@ final class SensorsClientTests: XCTestCase { userName: nil, password: nil ) + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + // return .init(envVars: envVars, eventLoopGroup: eventLoopGroup, logger: Self.logger) let config = MQTTClient.Configuration( version: .v3_1_1, userName: envVars.userName, @@ -111,14 +183,15 @@ final class SensorsClientTests: XCTestCase { return .init( host: Self.hostname, identifier: identifier, - eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)), + eventLoopGroupProvider: .shared(eventLoopGroup), logger: Self.logger, configuration: config ) } + } -// MARK: Helpers for tests. +// - MARK: Helpers struct TopicNotFoundError: Error {} diff --git a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift b/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift deleted file mode 100644 index 8940623..0000000 --- a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift +++ /dev/null @@ -1,128 +0,0 @@ -import Dependencies -import Logging -import Models -import MQTTConnectionService -@_spi(Internal) import MQTTManager -import MQTTNIO -import NIO -import PsychrometricClientLive -import ServiceLifecycle -import ServiceLifecycleTestKit -import XCTest - -// TODO: Rename to integration test when other tests are moved. -final class MQTTConnectionServiceTests: XCTestCase { - - static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost" - - static let logger: Logger = { - var logger = Logger(label: "MQTTConnectionServiceTests") - logger.logLevel = .info - return logger - }() - - override func invokeTest() { - let client = createClient(identifier: "\(Self.self)") - - withDependencies { - $0.mqtt = .live(client: client, logger: Self.logger) - $0.psychrometricClient = PsychrometricClient.liveValue - } operation: { - super.invokeTest() - } - } - - func testConnectionServiceShutdown() async throws { - @Dependency(\.mqtt) var mqtt - - let service = MQTTConnectionService(logger: Self.logger) - let task = Task { try await service.run() } - defer { task.cancel() } - - try await Task.sleep(for: .milliseconds(200)) - - // check the connection is active here. - try await mqtt.withClient { client in - XCTAssert(client.isActive()) - } - service.shutdown() - - try await Task.sleep(for: .milliseconds(500)) - - // check the connection is active here. - try await mqtt.withClient { client in - XCTAssertFalse(client.isActive()) - } - } - - // TODO: Move to integration tests. - func testMQTTConnectionStream() async throws { - let client = createClient(identifier: "testNonManagedStream") - let manager = MQTTManager.live( - client: client, - logger: Self.logger, - alwaysReconnect: false - ) - defer { manager.shutdown() } - let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger) - let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger) - var events1 = [MQTTManager.Event]() - var events2 = [MQTTManager.Event]() - - let stream1 = connectionStream1.start() - let stream2 = connectionStream2.start() - - _ = try await manager.connect() - - Task { - while !client.isActive() { - try await Task.sleep(for: .milliseconds(100)) - } - try await Task.sleep(for: .milliseconds(200)) - try await client.disconnect() - try await Task.sleep(for: .milliseconds(500)) - manager.shutdown() - try await Task.sleep(for: .milliseconds(500)) - connectionStream1.stop() - connectionStream2.stop() - } - - for await event in stream1.removeDuplicates() { - events1.append(event) - } - for await event in stream2.removeDuplicates() { - events2.append(event) - } - - XCTAssertEqual(events1, [.disconnected, .connected, .disconnected, .shuttingDown]) - XCTAssertEqual(events2, [.disconnected, .connected, .disconnected, .shuttingDown]) - } - - func createClient(identifier: String) -> MQTTClient { - let envVars = EnvVars( - appEnv: .testing, - host: Self.hostname, - port: "1883", - identifier: identifier, - userName: nil, - password: nil - ) - let config = MQTTClient.Configuration( - version: .v3_1_1, - userName: envVars.userName, - password: envVars.password, - useSSL: false, - useWebSockets: false, - tlsConfiguration: nil, - webSocketURLPath: nil - ) - return .init( - host: Self.hostname, - identifier: identifier, - eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)), - logger: Self.logger, - configuration: config - ) - } - -} diff --git a/docker/Dockerfile b/docker/Dockerfile index 3f2437b..fb0fdba 100755 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,6 +1,8 @@ # Used this to build the release version of the image. # Build the executable -FROM swift:5.10 AS build +ARG SWIFT_IMAGE_VERSION="5.10" + +FROM swift:${SWIFT_IMAGE_VERSION} AS build WORKDIR /build COPY ./Package.* ./ RUN swift package resolve @@ -8,7 +10,6 @@ COPY . . RUN swift build -c release -Xswiftc -g # Run image -FROM swift:5.10-slim -WORKDIR /run -COPY --from=build /build/.build/release/dewpoint-controller /run -CMD ["/bin/bash", "-xc", "./dewpoint-controller"] +FROM swift:${SWIFT_IMAGE_VERSION}-slim +COPY --from=build /build/.build/release/dewpoint-controller /usr/local/bin +CMD ["/bin/bash", "-xc", "/usr/local/bin/dewpoint-controller run"]