10 Commits

17 changed files with 907 additions and 680 deletions

View File

@@ -8,3 +8,4 @@
--wrapconditions after-first --wrapconditions after-first
--typeblanklines preserve --typeblanklines preserve
--commas inline --commas inline
--stripunusedargs closure-only

View File

@@ -11,4 +11,4 @@ RUN swift build --enable-test-discovery -c release -Xswiftc -g
FROM swift:5.10-slim FROM swift:5.10-slim
WORKDIR /run WORKDIR /run
COPY --from=build /build/.build/release/dewPoint-controller /run COPY --from=build /build/.build/release/dewPoint-controller /run
CMD ["/bin/bash", "-xc", "./dewPoint-controller"] CMD ["/bin/bash", "-xc", "./dewpoint-controller"]

View File

@@ -23,7 +23,7 @@ stop-mosquitto:
@docker-compose rm -f mosquitto || true @docker-compose rm -f mosquitto || true
test-docker: test-docker:
@docker-compose run --remove-orphans -i --rm test @docker-compose run --build --remove-orphans -i --rm test
@docker-compose kill mosquitto-test @docker-compose kill mosquitto-test
@docker-compose rm -f @docker-compose rm -f

View File

@@ -8,17 +8,20 @@ let swiftSettings: [SwiftSetting] = [
] ]
let package = Package( let package = Package(
name: "dewPoint-controller", name: "dewpoint-controller",
platforms: [ platforms: [
.macOS(.v14) .macOS(.v14)
], ],
products: [ products: [
.executable(name: "dewPoint-controller", targets: ["dewPoint-controller"]), .executable(name: "dewpoint-controller", targets: ["dewpoint-controller"]),
.library(name: "Models", targets: ["Models"]), .library(name: "Models", targets: ["Models"]),
.library(name: "MQTTConnectionManager", targets: ["MQTTConnectionManager"]),
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]), .library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
.library(name: "SensorsService", targets: ["SensorsService"]) .library(name: "SensorsService", targets: ["SensorsService"]),
.library(name: "TopicDependencies", targets: ["TopicDependencies"])
], ],
dependencies: [ dependencies: [
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"), .package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
.package(url: "https://github.com/apple/swift-log", from: "1.6.0"), .package(url: "https://github.com/apple/swift-log", from: "1.6.0"),
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.4.1"), .package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.4.1"),
@@ -28,20 +31,18 @@ let package = Package(
], ],
targets: [ targets: [
.executableTarget( .executableTarget(
name: "dewPoint-controller", name: "dewpoint-controller",
dependencies: [ dependencies: [
"Models", "Models",
"MQTTConnectionManager",
"MQTTConnectionService", "MQTTConnectionService",
"SensorsService", "SensorsService",
"TopicDependencies",
.product(name: "MQTTNIO", package: "mqtt-nio"), .product(name: "MQTTNIO", package: "mqtt-nio"),
.product(name: "NIO", package: "swift-nio"), .product(name: "NIO", package: "swift-nio"),
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics") .product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
] ]
), ),
.testTarget(
name: "dewPoint-controllerTests",
dependencies: ["dewPoint-controller"]
),
.target( .target(
name: "Models", name: "Models",
dependencies: [ dependencies: [
@@ -50,11 +51,21 @@ let package = Package(
], ],
swiftSettings: swiftSettings swiftSettings: swiftSettings
), ),
.target(
name: "MQTTConnectionManager",
dependencies: [
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio")
],
swiftSettings: swiftSettings
),
.target( .target(
name: "MQTTConnectionService", name: "MQTTConnectionService",
dependencies: [ dependencies: [
"Models", "Models",
.product(name: "MQTTNIO", package: "mqtt-nio"), "MQTTConnectionManager",
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle") .product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
], ],
swiftSettings: swiftSettings swiftSettings: swiftSettings
@@ -63,6 +74,7 @@ let package = Package(
name: "MQTTConnectionServiceTests", name: "MQTTConnectionServiceTests",
dependencies: [ dependencies: [
"MQTTConnectionService", "MQTTConnectionService",
"MQTTConnectionManager",
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle") .product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
] ]
), ),
@@ -71,6 +83,7 @@ let package = Package(
dependencies: [ dependencies: [
"Models", "Models",
"MQTTConnectionService", "MQTTConnectionService",
"TopicDependencies",
.product(name: "Dependencies", package: "swift-dependencies"), .product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"), .product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio"), .product(name: "MQTTNIO", package: "mqtt-nio"),
@@ -84,6 +97,15 @@ let package = Package(
"SensorsService", "SensorsService",
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics") .product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
] ]
),
.target(
name: "TopicDependencies",
dependencies: [
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio")
],
swiftSettings: swiftSettings
) )
] ]
) )

View File

@@ -0,0 +1,199 @@
import AsyncAlgorithms
import Dependencies
import DependenciesMacros
import Foundation
import Logging
import MQTTNIO
import NIO
public extension DependencyValues {
/// A dependency that is responsible for managing the connection to
/// an MQTT broker.
var mqttConnectionManager: MQTTConnectionManager {
get { self[MQTTConnectionManager.self] }
set { self[MQTTConnectionManager.self] = newValue }
}
}
/// Represents the interface needed for the ``MQTTConnectionService``.
///
/// See ``MQTTConnectionManagerLive`` module for live implementation.
@DependencyClient
public struct MQTTConnectionManager: Sendable {
/// Connect to the MQTT broker.
public var connect: @Sendable () async throws -> Void
/// Shutdown the connection to the MQTT broker.
///
/// - Note: You should cancel any tasks that are listening to the connection stream first.
public var shutdown: @Sendable () -> Void
/// Create a stream of connection events.
public var stream: @Sendable () throws -> AsyncStream<Event>
/// Represents connection events that clients can listen for and
/// react accordingly.
public enum Event: Sendable {
case connected
case disconnected
case shuttingDown
}
public static func live(
client: MQTTClient,
cleanSession: Bool = false,
logger: Logger? = nil,
alwaysReconnect: Bool = true
) -> Self {
let manager = ConnectionManager(
client: client,
logger: logger,
alwaysReconnect: alwaysReconnect
)
return .init {
try await manager.connect(cleanSession: cleanSession)
} shutdown: {
manager.shutdown()
} stream: {
MQTTConnectionStream(client: client, logger: logger)
.start()
.removeDuplicates()
.eraseToStream()
}
}
}
extension MQTTConnectionManager: TestDependencyKey {
public static var testValue: MQTTConnectionManager {
Self()
}
}
// MARK: - Helpers
final class MQTTConnectionStream: AsyncSequence, Sendable {
typealias AsyncIterator = AsyncStream<Element>.AsyncIterator
typealias Element = MQTTConnectionManager.Event
private let client: MQTTClient
private let continuation: AsyncStream<Element>.Continuation
private let logger: Logger?
private let name: String
private let stream: AsyncStream<Element>
init(client: MQTTClient, logger: Logger?) {
let (stream, continuation) = AsyncStream<Element>.makeStream()
self.client = client
self.continuation = continuation
self.logger = logger
self.name = UUID().uuidString
self.stream = stream
}
deinit { stop() }
func start(
isolation: isolated (any Actor)? = #isolation
) -> AsyncStream<Element> {
// Check if the client is active and yield the result.
continuation.yield(client.isActive() ? .connected : .disconnected)
// Register listener on the client for when the connection
// closes.
client.addCloseListener(named: name) { _ in
self.logger?.trace("Client has disconnected.")
self.continuation.yield(.disconnected)
}
// Register listener on the client for when the client
// is shutdown.
client.addShutdownListener(named: name) { _ in
self.logger?.trace("Client is shutting down.")
self.continuation.yield(.shuttingDown)
self.stop()
}
return stream
}
func stop() {
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
continuation.finish()
}
public __consuming func makeAsyncIterator() -> AsyncIterator {
start().makeAsyncIterator()
}
}
actor ConnectionManager {
private let client: MQTTClient
private let logger: Logger?
private let name: String
private let shouldReconnect: Bool
private var hasConnected: Bool = false
init(
client: MQTTClient,
logger: Logger?,
alwaysReconnect: Bool
) {
self.client = client
self.logger = logger
self.name = UUID().uuidString
self.shouldReconnect = alwaysReconnect
}
deinit {
// We should've already logged that we're shutting down if
// the manager was shutdown properly, so don't log it twice.
self.shutdown(withLogging: false)
}
private func setHasConnected() {
hasConnected = true
}
func connect(
isolation: isolated (any Actor)? = #isolation,
cleanSession: Bool
) async throws {
guard !(await hasConnected) else { return }
do {
try await client.connect(cleanSession: cleanSession)
await setHasConnected()
client.addCloseListener(named: name) { [weak self] _ in
guard let `self` else { return }
self.logger?.debug("Connection closed.")
if self.shouldReconnect {
self.logger?.debug("Reconnecting...")
Task {
try await self.connect(cleanSession: cleanSession)
}
}
}
client.addShutdownListener(named: name) { _ in
self.shutdown()
}
} catch {
logger?.trace("Failed to connect: \(error)")
throw error
}
}
nonisolated func shutdown(withLogging: Bool = true) {
if withLogging {
logger?.trace("Shutting down connection.")
}
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
}
}

View File

@@ -1,151 +1,37 @@
@preconcurrency import Foundation import Dependencies
import Logging import Logging
import Models import Models
import MQTTNIO import MQTTConnectionManager
import NIO
import ServiceLifecycle import ServiceLifecycle
// TODO: This may not need to be an actor.
/// Manages the MQTT broker connection.
public actor MQTTConnectionService: Service { public actor MQTTConnectionService: Service {
@Dependency(\.mqttConnectionManager) var manager
private let cleanSession: Bool private nonisolated let logger: Logger?
public let client: MQTTClient
private let continuation: AsyncStream<Event>.Continuation
public nonisolated let events: AsyncStream<Event>
private let internalEventStream: ConnectionStream
nonisolated var logger: Logger { client.logger }
// private var shuttingDown = false
public init( public init(
cleanSession: Bool = true, logger: Logger? = nil
client: MQTTClient
) { ) {
self.cleanSession = cleanSession self.logger = logger
self.client = client
self.internalEventStream = .init()
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
self.events = stream
self.continuation = continuation
} }
deinit { /// The entry-point of the service which starts the connection
self.logger.debug("MQTTConnectionService is gone.") /// to the MQTT broker and handles graceful shutdown of the
self.internalEventStream.stop() /// connection.
continuation.finish()
}
/// The entry-point of the service.
///
/// This method connects to the MQTT broker and manages the connection.
/// It will attempt to gracefully shutdown the connection upon receiving
/// `sigterm` signals.
public func run() async throws { public func run() async throws {
await withGracefulShutdownHandler { try await withGracefulShutdownHandler {
await withDiscardingTaskGroup { group in try await manager.connect()
group.addTask { await self.connect() } for await event in try manager.stream().cancelOnGracefulShutdown() {
group.addTask { // We don't really need to do anything with the events, so just logging
await self.internalEventStream.start { self.client.isActive() } // for now. But we need to iterate on an async stream for the service to
} // continue to run and handle graceful shutdowns.
for await event in self.internalEventStream.events.cancelOnGracefulShutdown() { logger?.trace("Received connection event: \(event)")
if event == .shuttingDown {
self.shutdown()
break
}
self.logger.trace("Sending connection event: \(event)")
self.continuation.yield(event)
}
group.cancelAll()
} }
// when we reach here we are shutting down, so we shutdown
// the manager.
manager.shutdown()
} onGracefulShutdown: { } onGracefulShutdown: {
self.logger.trace("Received graceful shutdown.") self.logger?.trace("Received graceful shutdown.")
self.shutdown()
} }
} }
func connect() async {
do {
try await withThrowingDiscardingTaskGroup { group in
group.addTask {
try await self.client.connect(cleanSession: self.cleanSession)
}
client.addCloseListener(named: "\(Self.self)") { _ in
Task {
self.logger.debug("Connection closed.")
self.logger.debug("Reconnecting...")
await self.connect()
}
}
self.logger.debug("Connection successful.")
self.continuation.yield(.connected)
}
} catch {
logger.trace("Failed to connect: \(error)")
continuation.yield(.disconnected)
}
}
private nonisolated func shutdown() {
logger.debug("Begin shutting down MQTT broker connection.")
client.removeCloseListener(named: "\(Self.self)")
internalEventStream.stop()
_ = client.disconnect()
try? client.syncShutdownGracefully()
continuation.finish()
logger.info("MQTT broker connection closed.")
}
}
extension MQTTConnectionService {
public enum Event: Sendable {
case connected
case disconnected
case shuttingDown
}
// TODO: This functionality can probably move into the connection service.
private final class ConnectionStream: Sendable {
// private var cancellable: AnyCancellable?
private let continuation: AsyncStream<MQTTConnectionService.Event>.Continuation
let events: AsyncStream<MQTTConnectionService.Event>
init() {
let (stream, continuation) = AsyncStream.makeStream(of: MQTTConnectionService.Event.self)
self.events = stream
self.continuation = continuation
}
deinit {
stop()
}
func start(isActive connectionIsActive: @escaping () -> Bool) async {
try? await Task.sleep(for: .seconds(1))
let event: MQTTConnectionService.Event = connectionIsActive()
? .connected
: .disconnected
continuation.yield(event)
// cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
// .autoconnect()
// .sink { [weak self] (_: Date) in
// let event: MQTTConnectionService.Event = connectionIsActive()
// ? .connected
// : .disconnected
//
// self?.continuation.yield(event)
// }
}
func stop() {
continuation.yield(.shuttingDown)
continuation.finish()
}
}
} }

View File

@@ -56,10 +56,11 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
public var dewPoint: DewPoint? { public var dewPoint: DewPoint? {
get async { get async {
guard let temperature = temperature, guard let temperature = temperature,
let humidity = humidity let humidity = humidity,
!temperature.value.isNaN,
!humidity.value.isNaN
else { return nil } else { return nil }
return try? await psychrometrics.dewPoint(.dryBulb(temperature, relativeHumidity: humidity)) return try? await psychrometrics.dewPoint(.dryBulb(temperature, relativeHumidity: humidity))
// return .init(dryBulb: temperature, humidity: humidity)
} }
} }
@@ -67,12 +68,13 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
public var enthalpy: EnthalpyOf<MoistAir>? { public var enthalpy: EnthalpyOf<MoistAir>? {
get async { get async {
guard let temperature = temperature, guard let temperature = temperature,
let humidity = humidity let humidity = humidity,
!temperature.value.isNaN,
!humidity.value.isNaN
else { return nil } else { return nil }
return try? await psychrometrics.enthalpy.moistAir( return try? await psychrometrics.enthalpy.moistAir(
.dryBulb(temperature, relativeHumidity: humidity, altitude: altitude) .dryBulb(temperature, relativeHumidity: humidity, altitude: altitude)
) )
// return .init(dryBulb: temperature, humidity: humidity, altitude: altitude)
} }
} }
@@ -129,10 +131,10 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
prefix = "\(prefix.dropLast())" prefix = "\(prefix.dropLast())"
} }
self.init( self.init(
dewPoint: "\(prefix)/sensors/\(location.rawValue)_dew_point/state", dewPoint: "\(prefix)/sensor/\(location.rawValue)_dew_point/state",
enthalpy: "\(prefix)/sensors/\(location.rawValue)_enthalpy/state", enthalpy: "\(prefix)/sensor/\(location.rawValue)_enthalpy/state",
humidity: "\(prefix)/sensors/\(location.rawValue)_humidity/state", humidity: "\(prefix)/sensor/\(location.rawValue)_humidity/state",
temperature: "\(prefix)/sensors/\(location.rawValue)_temperature/state" temperature: "\(prefix)/sensor/\(location.rawValue)_temperature/state"
) )
} }
} }

View File

@@ -3,7 +3,7 @@
/// This allows values to only publish changes if they have changed since the /// This allows values to only publish changes if they have changed since the
/// last time they were recieved. /// last time they were recieved.
@propertyWrapper @propertyWrapper
public struct TrackedChanges<Value> { public struct TrackedChanges<Value: Sendable>: Sendable {
/// The current tracking state. /// The current tracking state.
private var tracking: TrackingState private var tracking: TrackingState
@@ -12,7 +12,7 @@ public struct TrackedChanges<Value> {
private var value: Value private var value: Value
/// Used to check if a new value is equal to an old value. /// Used to check if a new value is equal to an old value.
private var isEqual: (Value, Value) -> Bool private var isEqual: @Sendable (Value, Value) -> Bool
/// Access to the underlying property that we are wrapping. /// Access to the underlying property that we are wrapping.
public var wrappedValue: Value { public var wrappedValue: Value {
@@ -35,7 +35,7 @@ public struct TrackedChanges<Value> {
public init( public init(
wrappedValue: Value, wrappedValue: Value,
needsProcessed: Bool = false, needsProcessed: Bool = false,
isEqual: @escaping (Value, Value) -> Bool isEqual: @escaping @Sendable (Value, Value) -> Bool
) { ) {
self.value = wrappedValue self.value = wrappedValue
self.tracking = needsProcessed ? .needsProcessed : .hasProcessed self.tracking = needsProcessed ? .needsProcessed : .hasProcessed
@@ -85,7 +85,9 @@ extension TrackedChanges: Equatable where Value: Equatable {
wrappedValue: Value, wrappedValue: Value,
needsProcessed: Bool = false needsProcessed: Bool = false
) { ) {
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed, isEqual: ==) self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed) {
$0 == $1
}
} }
} }
@@ -96,5 +98,3 @@ extension TrackedChanges: Hashable where Value: Hashable {
hasher.combine(needsProcessed) hasher.combine(needsProcessed)
} }
} }
extension TrackedChanges: Sendable where Value: Sendable {}

View File

@@ -1,6 +1,3 @@
import Logging
import Models
import MQTTNIO
import NIO import NIO
import NIOFoundationCompat import NIOFoundationCompat
import PsychrometricClient import PsychrometricClient

View File

@@ -3,305 +3,160 @@ import DependenciesMacros
import Foundation import Foundation
import Logging import Logging
import Models import Models
import MQTTConnectionService import MQTTNIO
@preconcurrency import MQTTNIO
import NIO import NIO
import PsychrometricClient import PsychrometricClient
import ServiceLifecycle import ServiceLifecycle
import TopicDependencies
@DependencyClient /// Service that is responsible for listening to changes of the temperature and humidity
public struct SensorsClient: Sendable { /// sensors, then publishing back the calculated dew-point temperature and enthalpy for
/// the sensor location.
///
///
public actor SensorsService: Service {
public var listen: @Sendable ([String]) async throws -> AsyncStream<MQTTPublishInfo> @Dependency(\.topicListener) var topicListener
public var logger: Logger? @Dependency(\.topicPublisher) var topicPublisher
public var publish: @Sendable (Double, String) async throws -> Void
public var shutdown: @Sendable () -> Void = {}
public func listen(to topics: [String]) async throws -> AsyncStream<MQTTPublishInfo> { /// The logger to use for the service.
try await listen(topics) private let logger: Logger?
}
public func publish(_ value: Double, to topic: String) async throws { /// The sensors that we are listening for updates to, so
try await publish(value, topic) /// that we can calculate the dew-point temperature and enthalpy
} /// values to publish back to the MQTT broker.
} var sensors: [TemperatureAndHumiditySensor]
extension SensorsClient: TestDependencyKey { var topics: [String] {
public static var testValue: SensorsClient {
Self()
}
}
public extension DependencyValues {
var sensorsClient: SensorsClient {
get { self[SensorsClient.self] }
set { self[SensorsClient.self] = newValue }
}
}
public actor SensorsService2: Service {
@Dependency(\.sensorsClient) var client
private var sensors: [TemperatureAndHumiditySensor]
public init(
sensors: [TemperatureAndHumiditySensor]
) {
self.sensors = sensors
}
public func run() async throws {
guard sensors.count > 0 else {
throw SensorCountError()
}
let stream = try await client.listen(to: topics)
do {
try await withGracefulShutdownHandler {
try await withThrowingDiscardingTaskGroup { group in
for await result in stream.cancelOnGracefulShutdown() {
group.addTask { try await self.handleResult(result) }
}
}
} onGracefulShutdown: {
Task {
await self.client.logger?.trace("Received graceful shutdown.")
try? await self.publishUpdates()
await self.client.shutdown()
}
}
} catch {
client.logger?.trace("Error: \(error)")
client.shutdown()
}
}
private var topics: [String] {
sensors.reduce(into: [String]()) { array, sensor in sensors.reduce(into: [String]()) { array, sensor in
array.append(sensor.topics.temperature) array.append(sensor.topics.temperature)
array.append(sensor.topics.humidity) array.append(sensor.topics.humidity)
} }
} }
private func handleResult(_ result: MQTTPublishInfo) async throws { /// Create a new sensors service that listens to the passed in
let topic = result.topicName /// sensors.
client.logger?.trace("Begin handling result for topic: \(topic)") ///
/// - Note: The service will fail to start if the array of sensors is not greater than 0.
///
/// - Parameters:
/// - sensors: The sensors to listen for changes to.
/// - logger: An optional logger to use.
public init(
sensors: [TemperatureAndHumiditySensor],
logger: Logger? = nil
) {
self.sensors = sensors
self.logger = logger
}
func decode<V: BufferInitalizable>(_: V.Type) -> V? { /// Start the service with graceful shutdown, which will attempt to publish
var buffer = result.payload /// any pending changes to the MQTT broker, upon a shutdown signal.
return V(buffer: &buffer) public func run() async throws {
} precondition(sensors.count > 0, "Sensors should not be empty.")
if topic.contains("temperature") { let stream = try await makeStream()
client.logger?.trace("Begin handling temperature result.")
guard let temperature = decode(DryBulb.self) else { await withGracefulShutdownHandler {
client.logger?.trace("Failed to decode temperature: \(result.payload)") await withDiscardingTaskGroup { group in
throw DecodingError() for await result in stream.cancelOnGracefulShutdown() {
logger?.trace("Received result for topic: \(result.topic)")
group.addTask { await self.handleResult(result) }
}
// group.cancelAll()
} }
client.logger?.trace("Decoded temperature: \(temperature)") } onGracefulShutdown: {
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature) Task {
self.logger?.trace("Received graceful shutdown.")
} else if topic.contains("humidity") { try? await self.publishUpdates()
client.logger?.trace("Begin handling humidity result.") await self.topicListener.shutdown()
guard let humidity = decode(RelativeHumidity.self) else {
client.logger?.trace("Failed to decode humidity: \(result.payload)")
throw DecodingError()
} }
client.logger?.trace("Decoded humidity: \(humidity)")
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
} else {
client.logger?.error("Received unexpected topic, expected topic to contain 'temperature' or 'humidity'!")
return
} }
}
try await publishUpdates() private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> {
client.logger?.trace("Done handling result for topic: \(topic)") try await topicListener.listen(to: topics)
// ignore errors, so that we continue to listen, but log them
// for debugging purposes.
.compactMap { result in
switch result {
case let .failure(error):
self.logger?.trace("Received error listening for sensors: \(error)")
return nil
case let .success(info):
return (info.payload, info.topicName)
}
}
// ignore duplicate values, to prevent publishing dew-point and enthalpy
// changes to frequently.
.removeDuplicates { lhs, rhs in
lhs.buffer == rhs.buffer
&& lhs.topic == rhs.topic
}
.eraseToStream()
}
private func handleResult(_ result: (buffer: ByteBuffer, topic: String)) async {
do {
let topic = result.topic
assert(topics.contains(topic))
logger?.trace("Begin handling result for topic: \(topic)")
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
var buffer = result.buffer
return V(buffer: &buffer)
}
if topic.contains("temperature") {
logger?.trace("Begin handling temperature result.")
guard let temperature = decode(DryBulb.self) else {
logger?.trace("Failed to decode temperature: \(result.buffer)")
throw DecodingError()
}
logger?.trace("Decoded temperature: \(temperature)")
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
} else if topic.contains("humidity") {
logger?.trace("Begin handling humidity result.")
guard let humidity = decode(RelativeHumidity.self) else {
logger?.trace("Failed to decode humidity: \(result.buffer)")
throw DecodingError()
}
logger?.trace("Decoded humidity: \(humidity)")
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
}
try await publishUpdates()
logger?.trace("Done handling result for topic: \(topic)")
} catch {
logger?.error("Received error: \(error)")
}
} }
private func publish(_ double: Double?, to topic: String) async throws { private func publish(_ double: Double?, to topic: String) async throws {
guard let double else { return } guard let double else { return }
try await client.publish(double, to: topic) try await topicPublisher.publish(
client.logger?.trace("Published update to topic: \(topic)") to: topic,
payload: ByteBufferAllocator().buffer(string: "\(double)"),
qos: .exactlyOnce,
retain: true
)
logger?.trace("Published update to topic: \(topic)")
} }
private func publishUpdates() async throws { private func publishUpdates() async throws {
for sensor in sensors.filter(\.needsProcessed) { for sensor in sensors.filter(\.needsProcessed) {
try await publish(sensor.dewPoint?.value, to: sensor.topics.dewPoint) try await publish(sensor.dewPoint?.value, to: sensor.topics.dewPoint)
try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy) try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy)
}
}
}
public actor SensorsService: Service {
private var sensors: [TemperatureAndHumiditySensor]
private let client: MQTTClient
private let events: @Sendable () -> AsyncStream<MQTTConnectionService.Event>
nonisolated var logger: Logger { client.logger }
private var shuttingDown: Bool = false
public init(
client: MQTTClient,
events: @Sendable @escaping () -> AsyncStream<MQTTConnectionService.Event>,
sensors: [TemperatureAndHumiditySensor]
) {
self.client = client
self.events = events
self.sensors = sensors
}
/// The entry-point of the service.
///
/// This method is called to start the service and begin
/// listening for sensor value changes then publishing the dew-point
/// and enthalpy values of the sensors.
public func run() async throws {
do {
try await withGracefulShutdownHandler {
try await withThrowingDiscardingTaskGroup { group in
client.addPublishListener(named: "\(Self.self)") { result in
if self.shuttingDown {
self.logger.trace("Shutting down.")
} else if !self.client.isActive() {
self.logger.trace("Client is not currently active")
} else {
Task { try await self.handleResult(result) }
}
}
for await event in self.events().cancelOnGracefulShutdown() {
logger.trace("Received event: \(event)")
if event == .shuttingDown {
self.setIsShuttingDown()
} else if event == .connected {
group.addTask { try await self.subscribeToSensors() }
} else {
group.addTask { await self.unsubscribeToSensors() }
group.addTask { try? await Task.sleep(for: .milliseconds(100)) }
}
}
}
} onGracefulShutdown: {
// do something.
self.logger.debug("Received graceful shutdown.")
Task { [weak self] in await self?.setIsShuttingDown() }
}
} catch {
// WARN: We always get an MQTTNIO `noConnection` error here, which generally is not an issue,
// but causes service ServiceLifecycle to fail, so currently just ignoring errors that are thrown.
// However we do receive the unsubscribe message back from the MQTT broker, so it is likely safe
// to ignore the `noConnection` error.
logger.trace("Run error: \(error)")
// throw error
}
}
private func setIsShuttingDown() {
logger.debug("Received shut down event.")
Task { try await publishUpdates() }
Task { await self.unsubscribeToSensors() }
shuttingDown = true
client.removePublishListener(named: "\(Self.self)")
}
private func handleResult(
_ result: Result<MQTTPublishInfo, any Error>
) async throws {
logger.trace("Begin handling result")
do {
switch result {
case let .failure(error):
logger.debug("Failed receiving sensor: \(error)")
throw error
case let .success(value):
// do something.
let topic = value.topicName
logger.trace("Received new value for topic: \(topic)")
if topic.contains("temperature") {
// do something.
var buffer = value.payload
guard let temperature = DryBulb(buffer: &buffer) else {
logger.trace("Decoding error for topic: \(topic)")
throw DecodingError()
}
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
try await publishUpdates()
} else if topic.contains("humidity") {
var buffer = value.payload
// Decode and update the temperature value
guard let humidity = RelativeHumidity(buffer: &buffer) else {
logger.debug("Failed to decode humidity from buffer: \(buffer)")
throw DecodingError()
}
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
try await publishUpdates()
}
}
} catch {
logger.trace("Handle Result error: \(error)")
throw error
}
}
private func subscribeToSensors() async throws {
for sensor in sensors {
_ = try await client.subscribe(to: [
MQTTSubscribeInfo(topicFilter: sensor.topics.temperature, qos: .atLeastOnce),
MQTTSubscribeInfo(topicFilter: sensor.topics.humidity, qos: .atLeastOnce)
])
logger.debug("Subscribed to sensor: \(sensor.location)")
}
}
private func unsubscribeToSensors() async {
logger.trace("Begin unsubscribe to sensors.")
guard client.isActive() else {
logger.debug("Client is not active, skipping.")
return
}
do {
let topics = sensors.reduce(into: [String]()) { array, sensor in
array.append(sensor.topics.temperature)
array.append(sensor.topics.humidity)
}
try await client.unsubscribe(from: topics)
logger.trace("Unsubscribed from sensors.")
} catch {
logger.trace("Unsubscribe error: \(error)")
}
}
private func publish(double: Double?, to topic: String) async throws {
guard client.isActive() else { return }
guard let double else { return }
let rounded = round(double * 100) / 100
logger.debug("Publishing \(rounded), to: \(topic)")
try await client.publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(rounded)"),
qos: .exactlyOnce,
retain: true
)
}
private func publishUpdates() async throws {
for sensor in sensors.filter(\.needsProcessed) {
try await publish(double: sensor.dewPoint?.value, to: sensor.topics.dewPoint)
try await publish(double: sensor.enthalpy?.value, to: sensor.topics.enthalpy)
try sensors.hasProcessed(sensor) try sensors.hasProcessed(sensor)
} }
} }
} }
// MARK: - Errors // MARK: - Errors
struct DecodingError: Error {} struct DecodingError: Error {}
struct MQTTClientNotConnected: Error {} struct SensorNotFoundError: Error {}
struct NotFoundError: Error {}
struct SensorExists: Error {}
struct SensorCountError: Error {}
// MARK: - Helpers // MARK: - Helpers
@@ -319,14 +174,14 @@ private extension Array where Element == TemperatureAndHumiditySensor {
with value: V with value: V
) throws { ) throws {
guard let index = firstIndex(where: { $0.topics.contains(topic) }) else { guard let index = firstIndex(where: { $0.topics.contains(topic) }) else {
throw NotFoundError() throw SensorNotFoundError()
} }
self[index][keyPath: keyPath] = value self[index][keyPath: keyPath] = value
} }
mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws { mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws {
guard let index = firstIndex(where: { $0.id == sensor.id }) else { guard let index = firstIndex(where: { $0.id == sensor.id }) else {
throw NotFoundError() throw SensorNotFoundError()
} }
self[index].needsProcessed = false self[index].needsProcessed = false
} }

View File

@@ -0,0 +1,183 @@
import Dependencies
import DependenciesMacros
import Foundation
import MQTTNIO
/// A dependency that can generate an async stream of changes to the given topics.
///
/// - Note: This type only conforms to ``TestDependencyKey`` because it requires an MQTTClient
/// to generate the live dependency.
@DependencyClient
public struct TopicListener: Sendable {
public typealias Stream = AsyncStream<Result<MQTTPublishInfo, MQTTListenResultError>>
/// Create an async stream that listens for changes to the given topics.
private var _listen: @Sendable ([String], MQTTQoS) async throws -> Stream
/// Shutdown the listener stream.
public var shutdown: @Sendable () -> Void
/// Create a new topic listener.
///
/// - Parameters:
/// - listen: Generate an async stream of changes for the given topics.
/// - shutdown: Shutdown the topic listener stream.
public init(
listen: @Sendable @escaping ([String], MQTTQoS) async throws -> Stream,
shutdown: @Sendable @escaping () -> Void
) {
self._listen = listen
self.shutdown = shutdown
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
to topics: [String],
qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
try await _listen(topics, qos)
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
_ topics: String...,
qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
try await listen(to: topics, qos: qos)
}
/// Create the live implementation of the topic listener with the given MQTTClient.
///
/// - Parameters:
/// - client: The MQTTClient to use.
public static func live(client: MQTTClient) -> Self {
let listener = MQTTTopicListener(client: client)
return .init(
listen: { try await listener.listen($0, $1) },
shutdown: { listener.shutdown() }
)
}
}
extension TopicListener: TestDependencyKey {
public static var testValue: TopicListener { Self() }
}
public extension DependencyValues {
var topicListener: TopicListener {
get { self[TopicListener.self] }
set { self[TopicListener.self] = newValue }
}
}
// MARK: - Helpers
private actor MQTTTopicListener {
private let client: MQTTClient
private let continuation: TopicListener.Stream.Continuation
private let name: String
let stream: TopicListener.Stream
private var shuttingDown: Bool = false
init(
client: MQTTClient
) {
let (stream, continuation) = TopicListener.Stream.makeStream()
self.client = client
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
}
deinit {
if !shuttingDown {
let message = """
Shutdown was not called on topic listener. This could lead to potential errors or
the stream never ending.
Please ensure that you call shutdown on the listener.
"""
client.logger.warning("\(message)")
continuation.finish()
}
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
}
func listen(
_ topics: [String],
_ qos: MQTTQoS = .atLeastOnce
) async throws(TopicListenerError) -> TopicListener.Stream {
var sleepTimes = 0
while !client.isActive() {
guard sleepTimes < 10 else {
throw .connectionTimeout
}
try? await Task.sleep(for: .milliseconds(100))
sleepTimes += 1
}
client.logger.trace("Client is active, begin subscribing to topics.")
let subscription = try? await client.subscribe(to: topics.map {
MQTTSubscribeInfo(topicFilter: $0, qos: qos)
})
guard subscription != nil else {
client.logger.error("Error subscribing to topics: \(topics)")
throw .failedToSubscribe
}
client.logger.trace("Done subscribing, begin listening to topics.")
client.addPublishListener(named: name) { result in
switch result {
case let .failure(error):
self.client.logger.error("Received error while listening: \(error)")
self.continuation.yield(.failure(.init(error)))
case let .success(publishInfo):
if topics.contains(publishInfo.topicName) {
self.client.logger.trace("Recieved new value for topic: \(publishInfo.topicName)")
self.continuation.yield(.success(publishInfo))
}
}
}
return stream
}
private func setIsShuttingDown() {
shuttingDown = true
}
nonisolated func shutdown() {
client.logger.trace("Closing topic listener...")
continuation.finish()
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
Task { await self.setIsShuttingDown() }
}
}
public enum TopicListenerError: Error {
case connectionTimeout
case failedToSubscribe
}
public struct MQTTListenResultError: Error {
let underlyingError: any Error
init(_ underlyingError: any Error) {
self.underlyingError = underlyingError
}
}

View File

@@ -0,0 +1,117 @@
import Dependencies
import DependenciesMacros
import MQTTNIO
import NIO
/// A dependency that is responsible for publishing values to an MQTT broker.
///
/// - Note: This dependency only conforms to `TestDependencyKey` because it
/// requires an active `MQTTClient` to generate the live dependency.
@DependencyClient
public struct TopicPublisher: Sendable {
private var _publish: @Sendable (PublishRequest) async throws -> Void
/// Create a new topic publisher.
///
/// - Parameters:
/// - publish: Handle the publish request.
public init(
publish: @Sendable @escaping (PublishRequest) async throws -> Void
) {
self._publish = publish
}
/// Publish a new value to the given topic.
///
/// - Parameters:
/// - topicName: The topic to publish the new value to.
/// - payload: The value to publish.
/// - qos: The MQTTQoS.
/// - retain: The retain flag.
public func publish(
to topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool = false
) async throws {
try await _publish(.init(
topicName: topicName,
payload: payload,
qos: qos,
retain: retain
))
}
/// Create the live topic publisher with the given `MQTTClient`.
///
/// - Parameters:
/// - client: The mqtt broker client to use.
public static func live(client: MQTTClient) -> Self {
.init(
publish: { request in
guard client.isActive() else {
client.logger.trace("Client is not connected, unable to publish to \(request.topicName)")
return
}
client.logger.trace("Begin publishing to topic: \(request.topicName)")
defer { client.logger.trace("Done publishing to topic: \(request.topicName)") }
try await client.publish(
to: request.topicName,
payload: request.payload,
qos: request.qos,
retain: request.retain
)
}
)
}
/// Represents the parameters required to publish a new value to the
/// MQTT broker.
public struct PublishRequest: Equatable, Sendable {
/// The topic to publish the new value to.
public let topicName: String
/// The value to publish.
public let payload: ByteBuffer
/// The qos of the request.
public let qos: MQTTQoS
/// The retain flag for the request.
public let retain: Bool
/// Create a new publish request.
///
/// - Parameters:
/// - topicName: The topic to publish to.
/// - payload: The value to publish.
/// - qos: The qos of the request.
/// - retain: The retain flag of the request.
public init(
topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool
) {
self.topicName = topicName
self.payload = payload
self.qos = qos
self.retain = retain
}
}
}
extension TopicPublisher: TestDependencyKey {
public static var testValue: TopicPublisher { Self() }
}
public extension DependencyValues {
/// A dependency that is responsible for publishing values to an MQTT broker.
var topicPublisher: TopicPublisher {
get { self[TopicPublisher.self] }
set { self[TopicPublisher.self] = newValue }
}
}

View File

@@ -1,15 +1,19 @@
import Dependencies
import Foundation import Foundation
import Logging import Logging
import Models import Models
import MQTTConnectionManager
import MQTTConnectionService import MQTTConnectionService
import MQTTNIO import MQTTNIO
import NIO import NIO
import PsychrometricClientLive import PsychrometricClientLive
import SensorsService import SensorsService
import ServiceLifecycle import ServiceLifecycle
import TopicDependencies
@main @main
struct Application { struct Application {
/// The main entry point of the application. /// The main entry point of the application.
static func main() async throws { static func main() async throws {
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
@@ -31,23 +35,37 @@ struct Application {
logger: logger logger: logger
) )
let mqttConnection = MQTTConnectionService(client: mqtt) do {
let sensors = SensorsService( try await withDependencies {
client: mqtt, $0.psychrometricClient = .liveValue
events: { mqttConnection.events }, $0.topicListener = .live(client: mqtt)
sensors: .live $0.topicPublisher = .live(client: mqtt)
) $0.mqttConnectionManager = .live(client: mqtt, logger: logger)
} operation: {
let mqttConnection = MQTTConnectionService(logger: logger)
let sensors = SensorsService(sensors: .live, logger: logger)
let serviceGroup = ServiceGroup( var serviceGroupConfiguration = ServiceGroupConfiguration(
services: [ services: [
mqttConnection, mqttConnection,
sensors sensors
], ],
gracefulShutdownSignals: [.sigterm, .sigint], gracefulShutdownSignals: [.sigterm, .sigint],
logger: logger logger: logger
) )
serviceGroupConfiguration.maximumCancellationDuration = .seconds(5)
serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10)
try await serviceGroup.run() let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
try await serviceGroup.run()
}
try await mqtt.shutdown()
try await eventloopGroup.shutdownGracefully()
} catch {
try await eventloopGroup.shutdownGracefully()
}
} }
} }

View File

@@ -1,7 +1,7 @@
import Combine
import Logging import Logging
import Models import Models
@testable import MQTTConnectionService @testable import MQTTConnectionManager
import MQTTConnectionService
import MQTTNIO import MQTTNIO
import NIO import NIO
import ServiceLifecycle import ServiceLifecycle
@@ -13,22 +13,59 @@ final class MQTTConnectionServiceTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost" static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = { static let logger: Logger = {
var logger = Logger(label: "AsyncClientTests") var logger = Logger(label: "MQTTConnectionServiceTests")
logger.logLevel = .trace logger.logLevel = .trace
return logger return logger
}() }()
func testGracefulShutdownWorks() async throws { // func testGracefulShutdownWorks() async throws {
try await testGracefulShutdown { trigger in // let client = createClient(identifier: "testGracefulShutdown")
let client = createClient(identifier: "testGracefulShutdown") // let service = MQTTConnectionService(client: client)
let service = MQTTConnectionService(client: client) // await service.connect()
try await service.run() // try await Task.sleep(for: .seconds(1))
try await Task.sleep(for: .seconds(1)) // XCTAssert(client.isActive())
XCTAssert(client.isActive()) // service.shutdown()
trigger.triggerGracefulShutdown() // XCTAssertFalse(client.isActive())
// try await Task.sleep(for: .seconds(2)) // }
// XCTAssertFalse(client.isActive())
func testWhatHappensIfConnectIsCalledMultipleTimes() async throws {
let client = createClient(identifier: "testWhatHappensIfConnectIsCalledMultipleTimes")
let manager = MQTTConnectionManager.live(client: client)
try await manager.connect()
try await manager.connect()
}
// TODO: Move to integration tests.
func testMQTTConnectionStream() async throws {
let client = createClient(identifier: "testNonManagedStream")
let manager = MQTTConnectionManager.live(
client: client,
logger: Self.logger,
alwaysReconnect: false
)
let stream = MQTTConnectionStream(client: client, logger: Self.logger)
var events = [MQTTConnectionManager.Event]()
_ = try await manager.connect()
Task {
while !client.isActive() {
try await Task.sleep(for: .milliseconds(100))
}
try await Task.sleep(for: .milliseconds(200))
manager.shutdown()
try await client.disconnect()
try await Task.sleep(for: .milliseconds(200))
try await client.shutdown()
try await Task.sleep(for: .milliseconds(200))
stream.stop()
} }
for await event in stream.removeDuplicates() {
events.append(event)
}
XCTAssertEqual(events, [.disconnected, .connected, .disconnected, .shuttingDown])
} }
func createClient(identifier: String) -> MQTTClient { func createClient(identifier: String) -> MQTTClient {
@@ -58,65 +95,4 @@ final class MQTTConnectionServiceTests: XCTestCase {
) )
} }
func testEventStream() async throws {
var connection: ConnectionStream? = ConnectionStream()
let task = Task {
guard let events = connection?.events else { return }
print("before loop")
for await event in events {
print("\(event)")
}
print("after loop")
}
let ending = Task {
try await Task.sleep(for: .seconds(2))
connection = nil
}
connection?.start()
try await ending.value
task.cancel()
}
}
class ConnectionStream {
enum Event {
case connected
case disconnected
case shuttingDown
}
let events: AsyncStream<Event>
private let continuation: AsyncStream<Event>.Continuation
private var cancellable: AnyCancellable?
init() {
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
self.events = stream
self.continuation = continuation
}
deinit {
print("connection stream is gone.")
stop()
}
func start() {
cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
.autoconnect()
.sink { [weak self] _ in
print("will send event.")
self?.continuation.yield(.connected)
}
}
func stop() {
continuation.yield(.shuttingDown)
cancellable = nil
continuation.finish()
}
} }

View File

@@ -5,6 +5,7 @@ import MQTTNIO
import NIO import NIO
import PsychrometricClientLive import PsychrometricClientLive
@testable import SensorsService @testable import SensorsService
import TopicDependencies
import XCTest import XCTest
final class SensorsClientTests: XCTestCase { final class SensorsClientTests: XCTestCase {
@@ -12,7 +13,7 @@ final class SensorsClientTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost" static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = { static let logger: Logger = {
var logger = Logger(label: "AsyncClientTests") var logger = Logger(label: "SensorsClientTests")
logger.logLevel = .debug logger.logLevel = .debug
return logger return logger
}() }()
@@ -25,42 +26,28 @@ final class SensorsClientTests: XCTestCase {
} }
} }
// func createClient(identifier: String) -> SensorsClient { func testWhatHappensIfClientDisconnectsWhileListening() async throws {
// let envVars = EnvVars( let client = createClient(identifier: "testWhatHappensIfClientDisconnectsWhileListening")
// appEnv: .testing, let listener = TopicListener.live(client: client)
// host: Self.hostname, try await client.connect()
// port: "1883",
// identifier: identifier, let stream = try await listener.listen("/some/topic")
// userName: nil,
// password: nil // try await Task.sleep(for: .seconds(1))
// ) // try await client.disconnect()
// return .init(envVars: envVars, logger: Self.logger) //
// } // try await client.connect()
func createClient(identifier: String) -> MQTTClient { // try await Task.sleep(for: .seconds(1))
let envVars = EnvVars( try await client.publish(
appEnv: .testing, to: "/some/topic",
host: Self.hostname, payload: ByteBufferAllocator().buffer(string: "Foo"),
port: "1883", qos: .atLeastOnce,
identifier: identifier, retain: true
userName: nil,
password: nil
)
let config = MQTTClient.Configuration(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
return .init(
host: Self.hostname,
identifier: identifier,
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
logger: Self.logger,
configuration: config
) )
try await Task.sleep(for: .seconds(1))
listener.shutdown()
try await client.shutdown()
} }
// func testConnectAndShutdown() async throws { // func testConnectAndShutdown() async throws {
@@ -132,52 +119,52 @@ final class SensorsClientTests: XCTestCase {
// try await mqtt.shutdown() // try await mqtt.shutdown()
// } // }
func testCapturingSensorClient() async throws { // func testCapturingSensorClient() async throws {
class CapturedValues { // class CapturedValues {
var values = [(value: Double, topic: String)]() // var values = [(value: Double, topic: String)]()
var didShutdown = false // var didShutdown = false
//
init() {} // init() {}
} // }
//
let capturedValues = CapturedValues() // let capturedValues = CapturedValues()
//
try await withDependencies { // try await withDependencies {
$0.sensorsClient = .testing( // $0.sensorsClient = .testing(
yielding: [ // yielding: [
(value: 76, to: "not-listening"), // (value: 76, to: "not-listening"),
(value: 75, to: "test") // (value: 75, to: "test")
] // ]
) { value, topic in // ) { value, topic in
capturedValues.values.append((value, topic)) // capturedValues.values.append((value, topic))
} captureShutdownEvent: { // } captureShutdownEvent: {
capturedValues.didShutdown = $0 // capturedValues.didShutdown = $0
} // }
} operation: { // } operation: {
@Dependency(\.sensorsClient) var client // @Dependency(\.sensorsClient) var client
let stream = try await client.listen(to: ["test"]) // let stream = try await client.listen(to: ["test"])
//
for await value in stream { // for await result in stream {
var buffer = value.payload // var buffer = result.buffer
guard let double = Double(buffer: &buffer) else { // guard let double = Double(buffer: &buffer) else {
XCTFail("Failed to decode double") // XCTFail("Failed to decode double")
return // return
} // }
//
XCTAssertEqual(double, 75) // XCTAssertEqual(double, 75)
XCTAssertEqual(value.topicName, "test") // XCTAssertEqual(result.topic, "test")
try await client.publish(26, to: "publish") // try await client.publish(26, to: "publish")
try await Task.sleep(for: .milliseconds(100)) // try await Task.sleep(for: .milliseconds(100))
client.shutdown() // client.shutdown()
} // }
//
XCTAssertEqual(capturedValues.values.count, 1) // XCTAssertEqual(capturedValues.values.count, 1)
XCTAssertEqual(capturedValues.values.first?.value, 26) // XCTAssertEqual(capturedValues.values.first?.value, 26)
XCTAssertEqual(capturedValues.values.first?.topic, "publish") // XCTAssertEqual(capturedValues.values.first?.topic, "publish")
XCTAssertTrue(capturedValues.didShutdown) // XCTAssertTrue(capturedValues.didShutdown)
} // }
} // }
//
// func testSensorCapturesPublishedState() async throws { // func testSensorCapturesPublishedState() async throws {
// let client = createClient(identifier: "testSensorCapturesPublishedState") // let client = createClient(identifier: "testSensorCapturesPublishedState")
// let mqtt = client.client // let mqtt = client.client
@@ -234,10 +221,47 @@ final class SensorsClientTests: XCTestCase {
// //
// await client.shutdown() // await client.shutdown()
// } // }
func createClient(identifier: String) -> MQTTClient {
let envVars = EnvVars(
appEnv: .testing,
host: Self.hostname,
port: "1883",
identifier: identifier,
userName: nil,
password: nil
)
let config = MQTTClient.Configuration(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
return .init(
host: Self.hostname,
identifier: identifier,
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
logger: Self.logger,
configuration: config
)
}
} }
// MARK: Helpers for tests. // MARK: Helpers for tests.
extension AsyncStream {
func first() async -> Element {
var first: Element
for await value in self {
first = value
}
return first
}
}
class PublishInfoContainer { class PublishInfoContainer {
private(set) var info: [MQTTPublishInfo] private(set) var info: [MQTTPublishInfo]
private var topicFilters: [String]? private var topicFilters: [String]?
@@ -258,41 +282,35 @@ class PublishInfoContainer {
} }
} }
extension SensorsClient { // extension SensorsClient {
//
static func testing( // static func testing(
yielding: [(value: Double, to: String)], // yielding: [(value: Double, to: String)],
capturePublishedValues: @escaping (Double, String) -> Void, // capturePublishedValues: @escaping (Double, String) -> Void,
captureShutdownEvent: @escaping (Bool) -> Void // captureShutdownEvent: @escaping (Bool) -> Void
) -> Self { // ) -> Self {
let (stream, continuation) = AsyncStream.makeStream(of: MQTTPublishInfo.self) // let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self)
let logger = Logger(label: "\(Self.self).testing") // let logger = Logger(label: "\(Self.self).testing")
//
return .init( // return .init(
listen: { topics in // listen: { topics in
for (value, topic) in yielding where topics.contains(topic) { // for (value, topic) in yielding where topics.contains(topic) {
continuation.yield( // continuation.yield(
MQTTPublishInfo( // (buffer: ByteBuffer(string: "\(value)"), topic: topic)
qos: .atLeastOnce, // )
retain: true, // }
topicName: topic, // return stream
payload: ByteBuffer(string: "\(value)"), // },
properties: MQTTProperties() // logger: logger,
) // publish: { value, topic in
) // capturePublishedValues(value, topic)
} // },
return stream // shutdown: {
}, // captureShutdownEvent(true)
logger: logger, // continuation.finish()
publish: { value, topic in // }
capturePublishedValues(value, topic) // )
}, // }
shutdown: { // }
captureShutdownEvent(true)
continuation.finish()
}
)
}
}
struct TopicNotFoundError: Error {} struct TopicNotFoundError: Error {}

View File

@@ -1,47 +0,0 @@
import XCTest
import class Foundation.Bundle
//final class dewPoint_controllerTests: XCTestCase {
// func testExample() throws {
// // This is an example of a functional test case.
// // Use XCTAssert and related functions to verify your tests produce the correct
// // results.
//
// // Some of the APIs that we use below are available in macOS 10.13 and above.
// guard #available(macOS 10.13, *) else {
// return
// }
//
// // Mac Catalyst won't have `Process`, but it is supported for executables.
// #if !targetEnvironment(macCatalyst)
//
// let fooBinary = productsDirectory.appendingPathComponent("dewPoint-controller")
//
// let process = Process()
// process.executableURL = fooBinary
//
// let pipe = Pipe()
// process.standardOutput = pipe
//
// try process.run()
// process.waitUntilExit()
//
// let data = pipe.fileHandleForReading.readDataToEndOfFile()
// let output = String(data: data, encoding: .utf8)
//
// XCTAssertEqual(output, "Hello, world!\n")
// #endif
// }
//
// /// Returns path to the built products directory.
// var productsDirectory: URL {
// #if os(macOS)
// for bundle in Bundle.allBundles where bundle.bundlePath.hasSuffix(".xctest") {
// return bundle.bundleURL.deletingLastPathComponent()
// }
// fatalError("couldn't find the products directory")
// #else
// return Bundle.main.bundleURL
// #endif
// }
//}

View File

@@ -14,7 +14,7 @@ services:
depends_on: depends_on:
- mosquitto - mosquitto
environment: environment:
- MOSQUITTO_SERVER=mosquitto - MQTT_HOST=mosquitto
test: test:
build: build: