10 Commits

17 changed files with 907 additions and 680 deletions

View File

@@ -8,3 +8,4 @@
--wrapconditions after-first
--typeblanklines preserve
--commas inline
--stripunusedargs closure-only

View File

@@ -11,4 +11,4 @@ RUN swift build --enable-test-discovery -c release -Xswiftc -g
FROM swift:5.10-slim
WORKDIR /run
COPY --from=build /build/.build/release/dewPoint-controller /run
CMD ["/bin/bash", "-xc", "./dewPoint-controller"]
CMD ["/bin/bash", "-xc", "./dewpoint-controller"]

View File

@@ -23,7 +23,7 @@ stop-mosquitto:
@docker-compose rm -f mosquitto || true
test-docker:
@docker-compose run --remove-orphans -i --rm test
@docker-compose run --build --remove-orphans -i --rm test
@docker-compose kill mosquitto-test
@docker-compose rm -f

View File

@@ -8,17 +8,20 @@ let swiftSettings: [SwiftSetting] = [
]
let package = Package(
name: "dewPoint-controller",
name: "dewpoint-controller",
platforms: [
.macOS(.v14)
],
products: [
.executable(name: "dewPoint-controller", targets: ["dewPoint-controller"]),
.executable(name: "dewpoint-controller", targets: ["dewpoint-controller"]),
.library(name: "Models", targets: ["Models"]),
.library(name: "MQTTConnectionManager", targets: ["MQTTConnectionManager"]),
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
.library(name: "SensorsService", targets: ["SensorsService"])
.library(name: "SensorsService", targets: ["SensorsService"]),
.library(name: "TopicDependencies", targets: ["TopicDependencies"])
],
dependencies: [
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
.package(url: "https://github.com/apple/swift-log", from: "1.6.0"),
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.4.1"),
@@ -28,20 +31,18 @@ let package = Package(
],
targets: [
.executableTarget(
name: "dewPoint-controller",
name: "dewpoint-controller",
dependencies: [
"Models",
"MQTTConnectionManager",
"MQTTConnectionService",
"SensorsService",
"TopicDependencies",
.product(name: "MQTTNIO", package: "mqtt-nio"),
.product(name: "NIO", package: "swift-nio"),
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
]
),
.testTarget(
name: "dewPoint-controllerTests",
dependencies: ["dewPoint-controller"]
),
.target(
name: "Models",
dependencies: [
@@ -50,11 +51,21 @@ let package = Package(
],
swiftSettings: swiftSettings
),
.target(
name: "MQTTConnectionManager",
dependencies: [
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio")
],
swiftSettings: swiftSettings
),
.target(
name: "MQTTConnectionService",
dependencies: [
"Models",
.product(name: "MQTTNIO", package: "mqtt-nio"),
"MQTTConnectionManager",
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
],
swiftSettings: swiftSettings
@@ -63,6 +74,7 @@ let package = Package(
name: "MQTTConnectionServiceTests",
dependencies: [
"MQTTConnectionService",
"MQTTConnectionManager",
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
]
),
@@ -71,6 +83,7 @@ let package = Package(
dependencies: [
"Models",
"MQTTConnectionService",
"TopicDependencies",
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio"),
@@ -84,6 +97,15 @@ let package = Package(
"SensorsService",
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
]
),
.target(
name: "TopicDependencies",
dependencies: [
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio")
],
swiftSettings: swiftSettings
)
]
)

View File

@@ -0,0 +1,199 @@
import AsyncAlgorithms
import Dependencies
import DependenciesMacros
import Foundation
import Logging
import MQTTNIO
import NIO
public extension DependencyValues {
/// A dependency that is responsible for managing the connection to
/// an MQTT broker.
var mqttConnectionManager: MQTTConnectionManager {
get { self[MQTTConnectionManager.self] }
set { self[MQTTConnectionManager.self] = newValue }
}
}
/// Represents the interface needed for the ``MQTTConnectionService``.
///
/// See ``MQTTConnectionManagerLive`` module for live implementation.
@DependencyClient
public struct MQTTConnectionManager: Sendable {
/// Connect to the MQTT broker.
public var connect: @Sendable () async throws -> Void
/// Shutdown the connection to the MQTT broker.
///
/// - Note: You should cancel any tasks that are listening to the connection stream first.
public var shutdown: @Sendable () -> Void
/// Create a stream of connection events.
public var stream: @Sendable () throws -> AsyncStream<Event>
/// Represents connection events that clients can listen for and
/// react accordingly.
public enum Event: Sendable {
case connected
case disconnected
case shuttingDown
}
public static func live(
client: MQTTClient,
cleanSession: Bool = false,
logger: Logger? = nil,
alwaysReconnect: Bool = true
) -> Self {
let manager = ConnectionManager(
client: client,
logger: logger,
alwaysReconnect: alwaysReconnect
)
return .init {
try await manager.connect(cleanSession: cleanSession)
} shutdown: {
manager.shutdown()
} stream: {
MQTTConnectionStream(client: client, logger: logger)
.start()
.removeDuplicates()
.eraseToStream()
}
}
}
extension MQTTConnectionManager: TestDependencyKey {
public static var testValue: MQTTConnectionManager {
Self()
}
}
// MARK: - Helpers
final class MQTTConnectionStream: AsyncSequence, Sendable {
typealias AsyncIterator = AsyncStream<Element>.AsyncIterator
typealias Element = MQTTConnectionManager.Event
private let client: MQTTClient
private let continuation: AsyncStream<Element>.Continuation
private let logger: Logger?
private let name: String
private let stream: AsyncStream<Element>
init(client: MQTTClient, logger: Logger?) {
let (stream, continuation) = AsyncStream<Element>.makeStream()
self.client = client
self.continuation = continuation
self.logger = logger
self.name = UUID().uuidString
self.stream = stream
}
deinit { stop() }
func start(
isolation: isolated (any Actor)? = #isolation
) -> AsyncStream<Element> {
// Check if the client is active and yield the result.
continuation.yield(client.isActive() ? .connected : .disconnected)
// Register listener on the client for when the connection
// closes.
client.addCloseListener(named: name) { _ in
self.logger?.trace("Client has disconnected.")
self.continuation.yield(.disconnected)
}
// Register listener on the client for when the client
// is shutdown.
client.addShutdownListener(named: name) { _ in
self.logger?.trace("Client is shutting down.")
self.continuation.yield(.shuttingDown)
self.stop()
}
return stream
}
func stop() {
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
continuation.finish()
}
public __consuming func makeAsyncIterator() -> AsyncIterator {
start().makeAsyncIterator()
}
}
actor ConnectionManager {
private let client: MQTTClient
private let logger: Logger?
private let name: String
private let shouldReconnect: Bool
private var hasConnected: Bool = false
init(
client: MQTTClient,
logger: Logger?,
alwaysReconnect: Bool
) {
self.client = client
self.logger = logger
self.name = UUID().uuidString
self.shouldReconnect = alwaysReconnect
}
deinit {
// We should've already logged that we're shutting down if
// the manager was shutdown properly, so don't log it twice.
self.shutdown(withLogging: false)
}
private func setHasConnected() {
hasConnected = true
}
func connect(
isolation: isolated (any Actor)? = #isolation,
cleanSession: Bool
) async throws {
guard !(await hasConnected) else { return }
do {
try await client.connect(cleanSession: cleanSession)
await setHasConnected()
client.addCloseListener(named: name) { [weak self] _ in
guard let `self` else { return }
self.logger?.debug("Connection closed.")
if self.shouldReconnect {
self.logger?.debug("Reconnecting...")
Task {
try await self.connect(cleanSession: cleanSession)
}
}
}
client.addShutdownListener(named: name) { _ in
self.shutdown()
}
} catch {
logger?.trace("Failed to connect: \(error)")
throw error
}
}
nonisolated func shutdown(withLogging: Bool = true) {
if withLogging {
logger?.trace("Shutting down connection.")
}
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
}
}

View File

@@ -1,151 +1,37 @@
@preconcurrency import Foundation
import Dependencies
import Logging
import Models
import MQTTNIO
import NIO
import MQTTConnectionManager
import ServiceLifecycle
// TODO: This may not need to be an actor.
/// Manages the MQTT broker connection.
public actor MQTTConnectionService: Service {
@Dependency(\.mqttConnectionManager) var manager
private let cleanSession: Bool
public let client: MQTTClient
private let continuation: AsyncStream<Event>.Continuation
public nonisolated let events: AsyncStream<Event>
private let internalEventStream: ConnectionStream
nonisolated var logger: Logger { client.logger }
// private var shuttingDown = false
private nonisolated let logger: Logger?
public init(
cleanSession: Bool = true,
client: MQTTClient
logger: Logger? = nil
) {
self.cleanSession = cleanSession
self.client = client
self.internalEventStream = .init()
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
self.events = stream
self.continuation = continuation
self.logger = logger
}
deinit {
self.logger.debug("MQTTConnectionService is gone.")
self.internalEventStream.stop()
continuation.finish()
}
/// The entry-point of the service.
///
/// This method connects to the MQTT broker and manages the connection.
/// It will attempt to gracefully shutdown the connection upon receiving
/// `sigterm` signals.
/// The entry-point of the service which starts the connection
/// to the MQTT broker and handles graceful shutdown of the
/// connection.
public func run() async throws {
await withGracefulShutdownHandler {
await withDiscardingTaskGroup { group in
group.addTask { await self.connect() }
group.addTask {
await self.internalEventStream.start { self.client.isActive() }
}
for await event in self.internalEventStream.events.cancelOnGracefulShutdown() {
if event == .shuttingDown {
self.shutdown()
break
}
self.logger.trace("Sending connection event: \(event)")
self.continuation.yield(event)
}
group.cancelAll()
try await withGracefulShutdownHandler {
try await manager.connect()
for await event in try manager.stream().cancelOnGracefulShutdown() {
// We don't really need to do anything with the events, so just logging
// for now. But we need to iterate on an async stream for the service to
// continue to run and handle graceful shutdowns.
logger?.trace("Received connection event: \(event)")
}
// when we reach here we are shutting down, so we shutdown
// the manager.
manager.shutdown()
} onGracefulShutdown: {
self.logger.trace("Received graceful shutdown.")
self.shutdown()
self.logger?.trace("Received graceful shutdown.")
}
}
func connect() async {
do {
try await withThrowingDiscardingTaskGroup { group in
group.addTask {
try await self.client.connect(cleanSession: self.cleanSession)
}
client.addCloseListener(named: "\(Self.self)") { _ in
Task {
self.logger.debug("Connection closed.")
self.logger.debug("Reconnecting...")
await self.connect()
}
}
self.logger.debug("Connection successful.")
self.continuation.yield(.connected)
}
} catch {
logger.trace("Failed to connect: \(error)")
continuation.yield(.disconnected)
}
}
private nonisolated func shutdown() {
logger.debug("Begin shutting down MQTT broker connection.")
client.removeCloseListener(named: "\(Self.self)")
internalEventStream.stop()
_ = client.disconnect()
try? client.syncShutdownGracefully()
continuation.finish()
logger.info("MQTT broker connection closed.")
}
}
extension MQTTConnectionService {
public enum Event: Sendable {
case connected
case disconnected
case shuttingDown
}
// TODO: This functionality can probably move into the connection service.
private final class ConnectionStream: Sendable {
// private var cancellable: AnyCancellable?
private let continuation: AsyncStream<MQTTConnectionService.Event>.Continuation
let events: AsyncStream<MQTTConnectionService.Event>
init() {
let (stream, continuation) = AsyncStream.makeStream(of: MQTTConnectionService.Event.self)
self.events = stream
self.continuation = continuation
}
deinit {
stop()
}
func start(isActive connectionIsActive: @escaping () -> Bool) async {
try? await Task.sleep(for: .seconds(1))
let event: MQTTConnectionService.Event = connectionIsActive()
? .connected
: .disconnected
continuation.yield(event)
// cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
// .autoconnect()
// .sink { [weak self] (_: Date) in
// let event: MQTTConnectionService.Event = connectionIsActive()
// ? .connected
// : .disconnected
//
// self?.continuation.yield(event)
// }
}
func stop() {
continuation.yield(.shuttingDown)
continuation.finish()
}
}
}

View File

@@ -56,10 +56,11 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
public var dewPoint: DewPoint? {
get async {
guard let temperature = temperature,
let humidity = humidity
let humidity = humidity,
!temperature.value.isNaN,
!humidity.value.isNaN
else { return nil }
return try? await psychrometrics.dewPoint(.dryBulb(temperature, relativeHumidity: humidity))
// return .init(dryBulb: temperature, humidity: humidity)
}
}
@@ -67,12 +68,13 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
public var enthalpy: EnthalpyOf<MoistAir>? {
get async {
guard let temperature = temperature,
let humidity = humidity
let humidity = humidity,
!temperature.value.isNaN,
!humidity.value.isNaN
else { return nil }
return try? await psychrometrics.enthalpy.moistAir(
.dryBulb(temperature, relativeHumidity: humidity, altitude: altitude)
)
// return .init(dryBulb: temperature, humidity: humidity, altitude: altitude)
}
}
@@ -129,10 +131,10 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
prefix = "\(prefix.dropLast())"
}
self.init(
dewPoint: "\(prefix)/sensors/\(location.rawValue)_dew_point/state",
enthalpy: "\(prefix)/sensors/\(location.rawValue)_enthalpy/state",
humidity: "\(prefix)/sensors/\(location.rawValue)_humidity/state",
temperature: "\(prefix)/sensors/\(location.rawValue)_temperature/state"
dewPoint: "\(prefix)/sensor/\(location.rawValue)_dew_point/state",
enthalpy: "\(prefix)/sensor/\(location.rawValue)_enthalpy/state",
humidity: "\(prefix)/sensor/\(location.rawValue)_humidity/state",
temperature: "\(prefix)/sensor/\(location.rawValue)_temperature/state"
)
}
}

View File

@@ -3,7 +3,7 @@
/// This allows values to only publish changes if they have changed since the
/// last time they were recieved.
@propertyWrapper
public struct TrackedChanges<Value> {
public struct TrackedChanges<Value: Sendable>: Sendable {
/// The current tracking state.
private var tracking: TrackingState
@@ -12,7 +12,7 @@ public struct TrackedChanges<Value> {
private var value: Value
/// Used to check if a new value is equal to an old value.
private var isEqual: (Value, Value) -> Bool
private var isEqual: @Sendable (Value, Value) -> Bool
/// Access to the underlying property that we are wrapping.
public var wrappedValue: Value {
@@ -35,7 +35,7 @@ public struct TrackedChanges<Value> {
public init(
wrappedValue: Value,
needsProcessed: Bool = false,
isEqual: @escaping (Value, Value) -> Bool
isEqual: @escaping @Sendable (Value, Value) -> Bool
) {
self.value = wrappedValue
self.tracking = needsProcessed ? .needsProcessed : .hasProcessed
@@ -85,7 +85,9 @@ extension TrackedChanges: Equatable where Value: Equatable {
wrappedValue: Value,
needsProcessed: Bool = false
) {
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed, isEqual: ==)
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed) {
$0 == $1
}
}
}
@@ -96,5 +98,3 @@ extension TrackedChanges: Hashable where Value: Hashable {
hasher.combine(needsProcessed)
}
}
extension TrackedChanges: Sendable where Value: Sendable {}

View File

@@ -1,6 +1,3 @@
import Logging
import Models
import MQTTNIO
import NIO
import NIOFoundationCompat
import PsychrometricClient

View File

@@ -3,305 +3,160 @@ import DependenciesMacros
import Foundation
import Logging
import Models
import MQTTConnectionService
@preconcurrency import MQTTNIO
import MQTTNIO
import NIO
import PsychrometricClient
import ServiceLifecycle
import TopicDependencies
@DependencyClient
public struct SensorsClient: Sendable {
/// Service that is responsible for listening to changes of the temperature and humidity
/// sensors, then publishing back the calculated dew-point temperature and enthalpy for
/// the sensor location.
///
///
public actor SensorsService: Service {
public var listen: @Sendable ([String]) async throws -> AsyncStream<MQTTPublishInfo>
public var logger: Logger?
public var publish: @Sendable (Double, String) async throws -> Void
public var shutdown: @Sendable () -> Void = {}
@Dependency(\.topicListener) var topicListener
@Dependency(\.topicPublisher) var topicPublisher
public func listen(to topics: [String]) async throws -> AsyncStream<MQTTPublishInfo> {
try await listen(topics)
}
/// The logger to use for the service.
private let logger: Logger?
public func publish(_ value: Double, to topic: String) async throws {
try await publish(value, topic)
}
}
/// The sensors that we are listening for updates to, so
/// that we can calculate the dew-point temperature and enthalpy
/// values to publish back to the MQTT broker.
var sensors: [TemperatureAndHumiditySensor]
extension SensorsClient: TestDependencyKey {
public static var testValue: SensorsClient {
Self()
}
}
public extension DependencyValues {
var sensorsClient: SensorsClient {
get { self[SensorsClient.self] }
set { self[SensorsClient.self] = newValue }
}
}
public actor SensorsService2: Service {
@Dependency(\.sensorsClient) var client
private var sensors: [TemperatureAndHumiditySensor]
public init(
sensors: [TemperatureAndHumiditySensor]
) {
self.sensors = sensors
}
public func run() async throws {
guard sensors.count > 0 else {
throw SensorCountError()
}
let stream = try await client.listen(to: topics)
do {
try await withGracefulShutdownHandler {
try await withThrowingDiscardingTaskGroup { group in
for await result in stream.cancelOnGracefulShutdown() {
group.addTask { try await self.handleResult(result) }
}
}
} onGracefulShutdown: {
Task {
await self.client.logger?.trace("Received graceful shutdown.")
try? await self.publishUpdates()
await self.client.shutdown()
}
}
} catch {
client.logger?.trace("Error: \(error)")
client.shutdown()
}
}
private var topics: [String] {
var topics: [String] {
sensors.reduce(into: [String]()) { array, sensor in
array.append(sensor.topics.temperature)
array.append(sensor.topics.humidity)
}
}
private func handleResult(_ result: MQTTPublishInfo) async throws {
let topic = result.topicName
client.logger?.trace("Begin handling result for topic: \(topic)")
/// Create a new sensors service that listens to the passed in
/// sensors.
///
/// - Note: The service will fail to start if the array of sensors is not greater than 0.
///
/// - Parameters:
/// - sensors: The sensors to listen for changes to.
/// - logger: An optional logger to use.
public init(
sensors: [TemperatureAndHumiditySensor],
logger: Logger? = nil
) {
self.sensors = sensors
self.logger = logger
}
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
var buffer = result.payload
return V(buffer: &buffer)
}
/// Start the service with graceful shutdown, which will attempt to publish
/// any pending changes to the MQTT broker, upon a shutdown signal.
public func run() async throws {
precondition(sensors.count > 0, "Sensors should not be empty.")
if topic.contains("temperature") {
client.logger?.trace("Begin handling temperature result.")
guard let temperature = decode(DryBulb.self) else {
client.logger?.trace("Failed to decode temperature: \(result.payload)")
throw DecodingError()
let stream = try await makeStream()
await withGracefulShutdownHandler {
await withDiscardingTaskGroup { group in
for await result in stream.cancelOnGracefulShutdown() {
logger?.trace("Received result for topic: \(result.topic)")
group.addTask { await self.handleResult(result) }
}
// group.cancelAll()
}
client.logger?.trace("Decoded temperature: \(temperature)")
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
} else if topic.contains("humidity") {
client.logger?.trace("Begin handling humidity result.")
guard let humidity = decode(RelativeHumidity.self) else {
client.logger?.trace("Failed to decode humidity: \(result.payload)")
throw DecodingError()
} onGracefulShutdown: {
Task {
self.logger?.trace("Received graceful shutdown.")
try? await self.publishUpdates()
await self.topicListener.shutdown()
}
client.logger?.trace("Decoded humidity: \(humidity)")
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
} else {
client.logger?.error("Received unexpected topic, expected topic to contain 'temperature' or 'humidity'!")
return
}
}
try await publishUpdates()
client.logger?.trace("Done handling result for topic: \(topic)")
private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> {
try await topicListener.listen(to: topics)
// ignore errors, so that we continue to listen, but log them
// for debugging purposes.
.compactMap { result in
switch result {
case let .failure(error):
self.logger?.trace("Received error listening for sensors: \(error)")
return nil
case let .success(info):
return (info.payload, info.topicName)
}
}
// ignore duplicate values, to prevent publishing dew-point and enthalpy
// changes to frequently.
.removeDuplicates { lhs, rhs in
lhs.buffer == rhs.buffer
&& lhs.topic == rhs.topic
}
.eraseToStream()
}
private func handleResult(_ result: (buffer: ByteBuffer, topic: String)) async {
do {
let topic = result.topic
assert(topics.contains(topic))
logger?.trace("Begin handling result for topic: \(topic)")
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
var buffer = result.buffer
return V(buffer: &buffer)
}
if topic.contains("temperature") {
logger?.trace("Begin handling temperature result.")
guard let temperature = decode(DryBulb.self) else {
logger?.trace("Failed to decode temperature: \(result.buffer)")
throw DecodingError()
}
logger?.trace("Decoded temperature: \(temperature)")
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
} else if topic.contains("humidity") {
logger?.trace("Begin handling humidity result.")
guard let humidity = decode(RelativeHumidity.self) else {
logger?.trace("Failed to decode humidity: \(result.buffer)")
throw DecodingError()
}
logger?.trace("Decoded humidity: \(humidity)")
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
}
try await publishUpdates()
logger?.trace("Done handling result for topic: \(topic)")
} catch {
logger?.error("Received error: \(error)")
}
}
private func publish(_ double: Double?, to topic: String) async throws {
guard let double else { return }
try await client.publish(double, to: topic)
client.logger?.trace("Published update to topic: \(topic)")
try await topicPublisher.publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(double)"),
qos: .exactlyOnce,
retain: true
)
logger?.trace("Published update to topic: \(topic)")
}
private func publishUpdates() async throws {
for sensor in sensors.filter(\.needsProcessed) {
try await publish(sensor.dewPoint?.value, to: sensor.topics.dewPoint)
try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy)
}
}
}
public actor SensorsService: Service {
private var sensors: [TemperatureAndHumiditySensor]
private let client: MQTTClient
private let events: @Sendable () -> AsyncStream<MQTTConnectionService.Event>
nonisolated var logger: Logger { client.logger }
private var shuttingDown: Bool = false
public init(
client: MQTTClient,
events: @Sendable @escaping () -> AsyncStream<MQTTConnectionService.Event>,
sensors: [TemperatureAndHumiditySensor]
) {
self.client = client
self.events = events
self.sensors = sensors
}
/// The entry-point of the service.
///
/// This method is called to start the service and begin
/// listening for sensor value changes then publishing the dew-point
/// and enthalpy values of the sensors.
public func run() async throws {
do {
try await withGracefulShutdownHandler {
try await withThrowingDiscardingTaskGroup { group in
client.addPublishListener(named: "\(Self.self)") { result in
if self.shuttingDown {
self.logger.trace("Shutting down.")
} else if !self.client.isActive() {
self.logger.trace("Client is not currently active")
} else {
Task { try await self.handleResult(result) }
}
}
for await event in self.events().cancelOnGracefulShutdown() {
logger.trace("Received event: \(event)")
if event == .shuttingDown {
self.setIsShuttingDown()
} else if event == .connected {
group.addTask { try await self.subscribeToSensors() }
} else {
group.addTask { await self.unsubscribeToSensors() }
group.addTask { try? await Task.sleep(for: .milliseconds(100)) }
}
}
}
} onGracefulShutdown: {
// do something.
self.logger.debug("Received graceful shutdown.")
Task { [weak self] in await self?.setIsShuttingDown() }
}
} catch {
// WARN: We always get an MQTTNIO `noConnection` error here, which generally is not an issue,
// but causes service ServiceLifecycle to fail, so currently just ignoring errors that are thrown.
// However we do receive the unsubscribe message back from the MQTT broker, so it is likely safe
// to ignore the `noConnection` error.
logger.trace("Run error: \(error)")
// throw error
}
}
private func setIsShuttingDown() {
logger.debug("Received shut down event.")
Task { try await publishUpdates() }
Task { await self.unsubscribeToSensors() }
shuttingDown = true
client.removePublishListener(named: "\(Self.self)")
}
private func handleResult(
_ result: Result<MQTTPublishInfo, any Error>
) async throws {
logger.trace("Begin handling result")
do {
switch result {
case let .failure(error):
logger.debug("Failed receiving sensor: \(error)")
throw error
case let .success(value):
// do something.
let topic = value.topicName
logger.trace("Received new value for topic: \(topic)")
if topic.contains("temperature") {
// do something.
var buffer = value.payload
guard let temperature = DryBulb(buffer: &buffer) else {
logger.trace("Decoding error for topic: \(topic)")
throw DecodingError()
}
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
try await publishUpdates()
} else if topic.contains("humidity") {
var buffer = value.payload
// Decode and update the temperature value
guard let humidity = RelativeHumidity(buffer: &buffer) else {
logger.debug("Failed to decode humidity from buffer: \(buffer)")
throw DecodingError()
}
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
try await publishUpdates()
}
}
} catch {
logger.trace("Handle Result error: \(error)")
throw error
}
}
private func subscribeToSensors() async throws {
for sensor in sensors {
_ = try await client.subscribe(to: [
MQTTSubscribeInfo(topicFilter: sensor.topics.temperature, qos: .atLeastOnce),
MQTTSubscribeInfo(topicFilter: sensor.topics.humidity, qos: .atLeastOnce)
])
logger.debug("Subscribed to sensor: \(sensor.location)")
}
}
private func unsubscribeToSensors() async {
logger.trace("Begin unsubscribe to sensors.")
guard client.isActive() else {
logger.debug("Client is not active, skipping.")
return
}
do {
let topics = sensors.reduce(into: [String]()) { array, sensor in
array.append(sensor.topics.temperature)
array.append(sensor.topics.humidity)
}
try await client.unsubscribe(from: topics)
logger.trace("Unsubscribed from sensors.")
} catch {
logger.trace("Unsubscribe error: \(error)")
}
}
private func publish(double: Double?, to topic: String) async throws {
guard client.isActive() else { return }
guard let double else { return }
let rounded = round(double * 100) / 100
logger.debug("Publishing \(rounded), to: \(topic)")
try await client.publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(rounded)"),
qos: .exactlyOnce,
retain: true
)
}
private func publishUpdates() async throws {
for sensor in sensors.filter(\.needsProcessed) {
try await publish(double: sensor.dewPoint?.value, to: sensor.topics.dewPoint)
try await publish(double: sensor.enthalpy?.value, to: sensor.topics.enthalpy)
try sensors.hasProcessed(sensor)
}
}
}
// MARK: - Errors
struct DecodingError: Error {}
struct MQTTClientNotConnected: Error {}
struct NotFoundError: Error {}
struct SensorExists: Error {}
struct SensorCountError: Error {}
struct SensorNotFoundError: Error {}
// MARK: - Helpers
@@ -319,14 +174,14 @@ private extension Array where Element == TemperatureAndHumiditySensor {
with value: V
) throws {
guard let index = firstIndex(where: { $0.topics.contains(topic) }) else {
throw NotFoundError()
throw SensorNotFoundError()
}
self[index][keyPath: keyPath] = value
}
mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws {
guard let index = firstIndex(where: { $0.id == sensor.id }) else {
throw NotFoundError()
throw SensorNotFoundError()
}
self[index].needsProcessed = false
}

View File

@@ -0,0 +1,183 @@
import Dependencies
import DependenciesMacros
import Foundation
import MQTTNIO
/// A dependency that can generate an async stream of changes to the given topics.
///
/// - Note: This type only conforms to ``TestDependencyKey`` because it requires an MQTTClient
/// to generate the live dependency.
@DependencyClient
public struct TopicListener: Sendable {
public typealias Stream = AsyncStream<Result<MQTTPublishInfo, MQTTListenResultError>>
/// Create an async stream that listens for changes to the given topics.
private var _listen: @Sendable ([String], MQTTQoS) async throws -> Stream
/// Shutdown the listener stream.
public var shutdown: @Sendable () -> Void
/// Create a new topic listener.
///
/// - Parameters:
/// - listen: Generate an async stream of changes for the given topics.
/// - shutdown: Shutdown the topic listener stream.
public init(
listen: @Sendable @escaping ([String], MQTTQoS) async throws -> Stream,
shutdown: @Sendable @escaping () -> Void
) {
self._listen = listen
self.shutdown = shutdown
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
to topics: [String],
qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
try await _listen(topics, qos)
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
_ topics: String...,
qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
try await listen(to: topics, qos: qos)
}
/// Create the live implementation of the topic listener with the given MQTTClient.
///
/// - Parameters:
/// - client: The MQTTClient to use.
public static func live(client: MQTTClient) -> Self {
let listener = MQTTTopicListener(client: client)
return .init(
listen: { try await listener.listen($0, $1) },
shutdown: { listener.shutdown() }
)
}
}
extension TopicListener: TestDependencyKey {
public static var testValue: TopicListener { Self() }
}
public extension DependencyValues {
var topicListener: TopicListener {
get { self[TopicListener.self] }
set { self[TopicListener.self] = newValue }
}
}
// MARK: - Helpers
private actor MQTTTopicListener {
private let client: MQTTClient
private let continuation: TopicListener.Stream.Continuation
private let name: String
let stream: TopicListener.Stream
private var shuttingDown: Bool = false
init(
client: MQTTClient
) {
let (stream, continuation) = TopicListener.Stream.makeStream()
self.client = client
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
}
deinit {
if !shuttingDown {
let message = """
Shutdown was not called on topic listener. This could lead to potential errors or
the stream never ending.
Please ensure that you call shutdown on the listener.
"""
client.logger.warning("\(message)")
continuation.finish()
}
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
}
func listen(
_ topics: [String],
_ qos: MQTTQoS = .atLeastOnce
) async throws(TopicListenerError) -> TopicListener.Stream {
var sleepTimes = 0
while !client.isActive() {
guard sleepTimes < 10 else {
throw .connectionTimeout
}
try? await Task.sleep(for: .milliseconds(100))
sleepTimes += 1
}
client.logger.trace("Client is active, begin subscribing to topics.")
let subscription = try? await client.subscribe(to: topics.map {
MQTTSubscribeInfo(topicFilter: $0, qos: qos)
})
guard subscription != nil else {
client.logger.error("Error subscribing to topics: \(topics)")
throw .failedToSubscribe
}
client.logger.trace("Done subscribing, begin listening to topics.")
client.addPublishListener(named: name) { result in
switch result {
case let .failure(error):
self.client.logger.error("Received error while listening: \(error)")
self.continuation.yield(.failure(.init(error)))
case let .success(publishInfo):
if topics.contains(publishInfo.topicName) {
self.client.logger.trace("Recieved new value for topic: \(publishInfo.topicName)")
self.continuation.yield(.success(publishInfo))
}
}
}
return stream
}
private func setIsShuttingDown() {
shuttingDown = true
}
nonisolated func shutdown() {
client.logger.trace("Closing topic listener...")
continuation.finish()
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
Task { await self.setIsShuttingDown() }
}
}
public enum TopicListenerError: Error {
case connectionTimeout
case failedToSubscribe
}
public struct MQTTListenResultError: Error {
let underlyingError: any Error
init(_ underlyingError: any Error) {
self.underlyingError = underlyingError
}
}

View File

@@ -0,0 +1,117 @@
import Dependencies
import DependenciesMacros
import MQTTNIO
import NIO
/// A dependency that is responsible for publishing values to an MQTT broker.
///
/// - Note: This dependency only conforms to `TestDependencyKey` because it
/// requires an active `MQTTClient` to generate the live dependency.
@DependencyClient
public struct TopicPublisher: Sendable {
private var _publish: @Sendable (PublishRequest) async throws -> Void
/// Create a new topic publisher.
///
/// - Parameters:
/// - publish: Handle the publish request.
public init(
publish: @Sendable @escaping (PublishRequest) async throws -> Void
) {
self._publish = publish
}
/// Publish a new value to the given topic.
///
/// - Parameters:
/// - topicName: The topic to publish the new value to.
/// - payload: The value to publish.
/// - qos: The MQTTQoS.
/// - retain: The retain flag.
public func publish(
to topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool = false
) async throws {
try await _publish(.init(
topicName: topicName,
payload: payload,
qos: qos,
retain: retain
))
}
/// Create the live topic publisher with the given `MQTTClient`.
///
/// - Parameters:
/// - client: The mqtt broker client to use.
public static func live(client: MQTTClient) -> Self {
.init(
publish: { request in
guard client.isActive() else {
client.logger.trace("Client is not connected, unable to publish to \(request.topicName)")
return
}
client.logger.trace("Begin publishing to topic: \(request.topicName)")
defer { client.logger.trace("Done publishing to topic: \(request.topicName)") }
try await client.publish(
to: request.topicName,
payload: request.payload,
qos: request.qos,
retain: request.retain
)
}
)
}
/// Represents the parameters required to publish a new value to the
/// MQTT broker.
public struct PublishRequest: Equatable, Sendable {
/// The topic to publish the new value to.
public let topicName: String
/// The value to publish.
public let payload: ByteBuffer
/// The qos of the request.
public let qos: MQTTQoS
/// The retain flag for the request.
public let retain: Bool
/// Create a new publish request.
///
/// - Parameters:
/// - topicName: The topic to publish to.
/// - payload: The value to publish.
/// - qos: The qos of the request.
/// - retain: The retain flag of the request.
public init(
topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool
) {
self.topicName = topicName
self.payload = payload
self.qos = qos
self.retain = retain
}
}
}
extension TopicPublisher: TestDependencyKey {
public static var testValue: TopicPublisher { Self() }
}
public extension DependencyValues {
/// A dependency that is responsible for publishing values to an MQTT broker.
var topicPublisher: TopicPublisher {
get { self[TopicPublisher.self] }
set { self[TopicPublisher.self] = newValue }
}
}

View File

@@ -1,15 +1,19 @@
import Dependencies
import Foundation
import Logging
import Models
import MQTTConnectionManager
import MQTTConnectionService
import MQTTNIO
import NIO
import PsychrometricClientLive
import SensorsService
import ServiceLifecycle
import TopicDependencies
@main
struct Application {
/// The main entry point of the application.
static func main() async throws {
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
@@ -31,23 +35,37 @@ struct Application {
logger: logger
)
let mqttConnection = MQTTConnectionService(client: mqtt)
let sensors = SensorsService(
client: mqtt,
events: { mqttConnection.events },
sensors: .live
)
do {
try await withDependencies {
$0.psychrometricClient = .liveValue
$0.topicListener = .live(client: mqtt)
$0.topicPublisher = .live(client: mqtt)
$0.mqttConnectionManager = .live(client: mqtt, logger: logger)
} operation: {
let mqttConnection = MQTTConnectionService(logger: logger)
let sensors = SensorsService(sensors: .live, logger: logger)
let serviceGroup = ServiceGroup(
services: [
mqttConnection,
sensors
],
gracefulShutdownSignals: [.sigterm, .sigint],
logger: logger
)
var serviceGroupConfiguration = ServiceGroupConfiguration(
services: [
mqttConnection,
sensors
],
gracefulShutdownSignals: [.sigterm, .sigint],
logger: logger
)
serviceGroupConfiguration.maximumCancellationDuration = .seconds(5)
serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10)
try await serviceGroup.run()
let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
try await serviceGroup.run()
}
try await mqtt.shutdown()
try await eventloopGroup.shutdownGracefully()
} catch {
try await eventloopGroup.shutdownGracefully()
}
}
}

View File

@@ -1,7 +1,7 @@
import Combine
import Logging
import Models
@testable import MQTTConnectionService
@testable import MQTTConnectionManager
import MQTTConnectionService
import MQTTNIO
import NIO
import ServiceLifecycle
@@ -13,22 +13,59 @@ final class MQTTConnectionServiceTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "AsyncClientTests")
var logger = Logger(label: "MQTTConnectionServiceTests")
logger.logLevel = .trace
return logger
}()
func testGracefulShutdownWorks() async throws {
try await testGracefulShutdown { trigger in
let client = createClient(identifier: "testGracefulShutdown")
let service = MQTTConnectionService(client: client)
try await service.run()
try await Task.sleep(for: .seconds(1))
XCTAssert(client.isActive())
trigger.triggerGracefulShutdown()
// try await Task.sleep(for: .seconds(2))
// XCTAssertFalse(client.isActive())
// func testGracefulShutdownWorks() async throws {
// let client = createClient(identifier: "testGracefulShutdown")
// let service = MQTTConnectionService(client: client)
// await service.connect()
// try await Task.sleep(for: .seconds(1))
// XCTAssert(client.isActive())
// service.shutdown()
// XCTAssertFalse(client.isActive())
// }
func testWhatHappensIfConnectIsCalledMultipleTimes() async throws {
let client = createClient(identifier: "testWhatHappensIfConnectIsCalledMultipleTimes")
let manager = MQTTConnectionManager.live(client: client)
try await manager.connect()
try await manager.connect()
}
// TODO: Move to integration tests.
func testMQTTConnectionStream() async throws {
let client = createClient(identifier: "testNonManagedStream")
let manager = MQTTConnectionManager.live(
client: client,
logger: Self.logger,
alwaysReconnect: false
)
let stream = MQTTConnectionStream(client: client, logger: Self.logger)
var events = [MQTTConnectionManager.Event]()
_ = try await manager.connect()
Task {
while !client.isActive() {
try await Task.sleep(for: .milliseconds(100))
}
try await Task.sleep(for: .milliseconds(200))
manager.shutdown()
try await client.disconnect()
try await Task.sleep(for: .milliseconds(200))
try await client.shutdown()
try await Task.sleep(for: .milliseconds(200))
stream.stop()
}
for await event in stream.removeDuplicates() {
events.append(event)
}
XCTAssertEqual(events, [.disconnected, .connected, .disconnected, .shuttingDown])
}
func createClient(identifier: String) -> MQTTClient {
@@ -58,65 +95,4 @@ final class MQTTConnectionServiceTests: XCTestCase {
)
}
func testEventStream() async throws {
var connection: ConnectionStream? = ConnectionStream()
let task = Task {
guard let events = connection?.events else { return }
print("before loop")
for await event in events {
print("\(event)")
}
print("after loop")
}
let ending = Task {
try await Task.sleep(for: .seconds(2))
connection = nil
}
connection?.start()
try await ending.value
task.cancel()
}
}
class ConnectionStream {
enum Event {
case connected
case disconnected
case shuttingDown
}
let events: AsyncStream<Event>
private let continuation: AsyncStream<Event>.Continuation
private var cancellable: AnyCancellable?
init() {
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
self.events = stream
self.continuation = continuation
}
deinit {
print("connection stream is gone.")
stop()
}
func start() {
cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
.autoconnect()
.sink { [weak self] _ in
print("will send event.")
self?.continuation.yield(.connected)
}
}
func stop() {
continuation.yield(.shuttingDown)
cancellable = nil
continuation.finish()
}
}

View File

@@ -5,6 +5,7 @@ import MQTTNIO
import NIO
import PsychrometricClientLive
@testable import SensorsService
import TopicDependencies
import XCTest
final class SensorsClientTests: XCTestCase {
@@ -12,7 +13,7 @@ final class SensorsClientTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "AsyncClientTests")
var logger = Logger(label: "SensorsClientTests")
logger.logLevel = .debug
return logger
}()
@@ -25,42 +26,28 @@ final class SensorsClientTests: XCTestCase {
}
}
// func createClient(identifier: String) -> SensorsClient {
// let envVars = EnvVars(
// appEnv: .testing,
// host: Self.hostname,
// port: "1883",
// identifier: identifier,
// userName: nil,
// password: nil
// )
// return .init(envVars: envVars, logger: Self.logger)
// }
func createClient(identifier: String) -> MQTTClient {
let envVars = EnvVars(
appEnv: .testing,
host: Self.hostname,
port: "1883",
identifier: identifier,
userName: nil,
password: nil
)
let config = MQTTClient.Configuration(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
return .init(
host: Self.hostname,
identifier: identifier,
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
logger: Self.logger,
configuration: config
func testWhatHappensIfClientDisconnectsWhileListening() async throws {
let client = createClient(identifier: "testWhatHappensIfClientDisconnectsWhileListening")
let listener = TopicListener.live(client: client)
try await client.connect()
let stream = try await listener.listen("/some/topic")
// try await Task.sleep(for: .seconds(1))
// try await client.disconnect()
//
// try await client.connect()
// try await Task.sleep(for: .seconds(1))
try await client.publish(
to: "/some/topic",
payload: ByteBufferAllocator().buffer(string: "Foo"),
qos: .atLeastOnce,
retain: true
)
try await Task.sleep(for: .seconds(1))
listener.shutdown()
try await client.shutdown()
}
// func testConnectAndShutdown() async throws {
@@ -132,52 +119,52 @@ final class SensorsClientTests: XCTestCase {
// try await mqtt.shutdown()
// }
func testCapturingSensorClient() async throws {
class CapturedValues {
var values = [(value: Double, topic: String)]()
var didShutdown = false
init() {}
}
let capturedValues = CapturedValues()
try await withDependencies {
$0.sensorsClient = .testing(
yielding: [
(value: 76, to: "not-listening"),
(value: 75, to: "test")
]
) { value, topic in
capturedValues.values.append((value, topic))
} captureShutdownEvent: {
capturedValues.didShutdown = $0
}
} operation: {
@Dependency(\.sensorsClient) var client
let stream = try await client.listen(to: ["test"])
for await value in stream {
var buffer = value.payload
guard let double = Double(buffer: &buffer) else {
XCTFail("Failed to decode double")
return
}
XCTAssertEqual(double, 75)
XCTAssertEqual(value.topicName, "test")
try await client.publish(26, to: "publish")
try await Task.sleep(for: .milliseconds(100))
client.shutdown()
}
XCTAssertEqual(capturedValues.values.count, 1)
XCTAssertEqual(capturedValues.values.first?.value, 26)
XCTAssertEqual(capturedValues.values.first?.topic, "publish")
XCTAssertTrue(capturedValues.didShutdown)
}
}
// func testCapturingSensorClient() async throws {
// class CapturedValues {
// var values = [(value: Double, topic: String)]()
// var didShutdown = false
//
// init() {}
// }
//
// let capturedValues = CapturedValues()
//
// try await withDependencies {
// $0.sensorsClient = .testing(
// yielding: [
// (value: 76, to: "not-listening"),
// (value: 75, to: "test")
// ]
// ) { value, topic in
// capturedValues.values.append((value, topic))
// } captureShutdownEvent: {
// capturedValues.didShutdown = $0
// }
// } operation: {
// @Dependency(\.sensorsClient) var client
// let stream = try await client.listen(to: ["test"])
//
// for await result in stream {
// var buffer = result.buffer
// guard let double = Double(buffer: &buffer) else {
// XCTFail("Failed to decode double")
// return
// }
//
// XCTAssertEqual(double, 75)
// XCTAssertEqual(result.topic, "test")
// try await client.publish(26, to: "publish")
// try await Task.sleep(for: .milliseconds(100))
// client.shutdown()
// }
//
// XCTAssertEqual(capturedValues.values.count, 1)
// XCTAssertEqual(capturedValues.values.first?.value, 26)
// XCTAssertEqual(capturedValues.values.first?.topic, "publish")
// XCTAssertTrue(capturedValues.didShutdown)
// }
// }
//
// func testSensorCapturesPublishedState() async throws {
// let client = createClient(identifier: "testSensorCapturesPublishedState")
// let mqtt = client.client
@@ -234,10 +221,47 @@ final class SensorsClientTests: XCTestCase {
//
// await client.shutdown()
// }
func createClient(identifier: String) -> MQTTClient {
let envVars = EnvVars(
appEnv: .testing,
host: Self.hostname,
port: "1883",
identifier: identifier,
userName: nil,
password: nil
)
let config = MQTTClient.Configuration(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
return .init(
host: Self.hostname,
identifier: identifier,
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
logger: Self.logger,
configuration: config
)
}
}
// MARK: Helpers for tests.
extension AsyncStream {
func first() async -> Element {
var first: Element
for await value in self {
first = value
}
return first
}
}
class PublishInfoContainer {
private(set) var info: [MQTTPublishInfo]
private var topicFilters: [String]?
@@ -258,41 +282,35 @@ class PublishInfoContainer {
}
}
extension SensorsClient {
static func testing(
yielding: [(value: Double, to: String)],
capturePublishedValues: @escaping (Double, String) -> Void,
captureShutdownEvent: @escaping (Bool) -> Void
) -> Self {
let (stream, continuation) = AsyncStream.makeStream(of: MQTTPublishInfo.self)
let logger = Logger(label: "\(Self.self).testing")
return .init(
listen: { topics in
for (value, topic) in yielding where topics.contains(topic) {
continuation.yield(
MQTTPublishInfo(
qos: .atLeastOnce,
retain: true,
topicName: topic,
payload: ByteBuffer(string: "\(value)"),
properties: MQTTProperties()
)
)
}
return stream
},
logger: logger,
publish: { value, topic in
capturePublishedValues(value, topic)
},
shutdown: {
captureShutdownEvent(true)
continuation.finish()
}
)
}
}
// extension SensorsClient {
//
// static func testing(
// yielding: [(value: Double, to: String)],
// capturePublishedValues: @escaping (Double, String) -> Void,
// captureShutdownEvent: @escaping (Bool) -> Void
// ) -> Self {
// let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self)
// let logger = Logger(label: "\(Self.self).testing")
//
// return .init(
// listen: { topics in
// for (value, topic) in yielding where topics.contains(topic) {
// continuation.yield(
// (buffer: ByteBuffer(string: "\(value)"), topic: topic)
// )
// }
// return stream
// },
// logger: logger,
// publish: { value, topic in
// capturePublishedValues(value, topic)
// },
// shutdown: {
// captureShutdownEvent(true)
// continuation.finish()
// }
// )
// }
// }
struct TopicNotFoundError: Error {}

View File

@@ -1,47 +0,0 @@
import XCTest
import class Foundation.Bundle
//final class dewPoint_controllerTests: XCTestCase {
// func testExample() throws {
// // This is an example of a functional test case.
// // Use XCTAssert and related functions to verify your tests produce the correct
// // results.
//
// // Some of the APIs that we use below are available in macOS 10.13 and above.
// guard #available(macOS 10.13, *) else {
// return
// }
//
// // Mac Catalyst won't have `Process`, but it is supported for executables.
// #if !targetEnvironment(macCatalyst)
//
// let fooBinary = productsDirectory.appendingPathComponent("dewPoint-controller")
//
// let process = Process()
// process.executableURL = fooBinary
//
// let pipe = Pipe()
// process.standardOutput = pipe
//
// try process.run()
// process.waitUntilExit()
//
// let data = pipe.fileHandleForReading.readDataToEndOfFile()
// let output = String(data: data, encoding: .utf8)
//
// XCTAssertEqual(output, "Hello, world!\n")
// #endif
// }
//
// /// Returns path to the built products directory.
// var productsDirectory: URL {
// #if os(macOS)
// for bundle in Bundle.allBundles where bundle.bundlePath.hasSuffix(".xctest") {
// return bundle.bundleURL.deletingLastPathComponent()
// }
// fatalError("couldn't find the products directory")
// #else
// return Bundle.main.bundleURL
// #endif
// }
//}

View File

@@ -14,7 +14,7 @@ services:
depends_on:
- mosquitto
environment:
- MOSQUITTO_SERVER=mosquitto
- MQTT_HOST=mosquitto
test:
build: