13 Commits

Author SHA1 Message Date
ce327a6f1c feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3s
2024-11-14 15:11:29 -05:00
95f8565cde feat: Adds initial CI
Some checks failed
CI / Test (push) Failing after 39s
2024-11-14 15:05:09 -05:00
163f603b69 feat: Fixes some tests and docker builds 2024-11-14 14:58:09 -05:00
e7a849b003 feat: Adding more tests 2024-11-14 07:43:40 -05:00
bd2a798320 feat: Seperates connection stream and moves connection manager out of the connection service module. 2024-11-13 17:12:56 -05:00
b8992b89b6 feat: Adds MQTT connection stream, need to clean up the manager and remove stream from it. 2024-11-13 10:06:28 -05:00
efd9907b4a feat: Cleans up some of the shutdown logic so that the MQTTClient is disconnected properly. 2024-11-12 22:19:09 -05:00
fbbd65f7ae feat: Cleaning up some unused code. 2024-11-12 21:18:02 -05:00
8067331ff8 feat: Removes sensor client in favor of more generic topic listener and publisher 2024-11-12 16:42:14 -05:00
b6db9b5322 feat: Begins breaking out topic listener and publisher as it's own dependency. 2024-11-12 11:12:34 -05:00
bf1126b06a feat: Adds MQTTConnectionManagerLive module. 2024-11-11 22:00:14 -05:00
ef552fb8bc feat: Removes unused files 2024-11-11 16:28:11 -05:00
1e62d7aac0 feat: Adds sensor client dependency 2024-11-11 15:23:45 -05:00
19 changed files with 923 additions and 788 deletions

14
.gitea/workflows/ci.yaml Normal file
View File

@@ -0,0 +1,14 @@
---
name: CI
on:
push:
pull_request:
jobs:
test:
name: Run Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Test
run: make test

View File

@@ -8,3 +8,4 @@
--wrapconditions after-first
--typeblanklines preserve
--commas inline
--stripunusedargs closure-only

View File

@@ -5,10 +5,10 @@ WORKDIR /build
COPY ./Package.* ./
RUN swift package resolve
COPY . .
RUN swift build --enable-test-discovery -c release -Xswiftc -g
RUN swift build -c release -Xswiftc -g
# Run image
FROM swift:5.10-slim
WORKDIR /run
COPY --from=build /build/.build/release/dewPoint-controller /run
CMD ["/bin/bash", "-xc", "./dewPoint-controller"]
COPY --from=build /build/.build/release/dewpoint-controller /run
CMD ["/bin/bash", "-xc", "./dewpoint-controller"]

View File

@@ -23,7 +23,7 @@ stop-mosquitto:
@docker-compose rm -f mosquitto || true
test-docker:
@docker-compose run --remove-orphans -i --rm test
@docker-compose run --build --remove-orphans -i --rm test
@docker-compose kill mosquitto-test
@docker-compose rm -f

View File

@@ -1,5 +1,5 @@
{
"originHash" : "d3104d51323f6bc98cf3ab2930e5a26f72c9a4fdb7640360ba27628672397841",
"originHash" : "c2538e3229d6c80f3d6a979c2f3605d63b4973e1e49786819017473eb2916f4e",
"pins" : [
{
"identity" : "combine-schedulers",
@@ -69,8 +69,8 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-dependencies",
"state" : {
"revision" : "0fc0255e780bf742abeef29dec80924f5f0ae7b9",
"version" : "1.4.1"
"revision" : "96eecd47660e8307877acb8c41cc5295ba7350a7",
"version" : "1.5.2"
}
},
{

View File

@@ -8,40 +8,41 @@ let swiftSettings: [SwiftSetting] = [
]
let package = Package(
name: "dewPoint-controller",
name: "dewpoint-controller",
platforms: [
.macOS(.v14)
],
products: [
.executable(name: "dewPoint-controller", targets: ["dewPoint-controller"]),
.executable(name: "dewpoint-controller", targets: ["dewpoint-controller"]),
.library(name: "Models", targets: ["Models"]),
.library(name: "MQTTConnectionManager", targets: ["MQTTConnectionManager"]),
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
.library(name: "SensorsService", targets: ["SensorsService"])
.library(name: "SensorsService", targets: ["SensorsService"]),
.library(name: "TopicDependencies", targets: ["TopicDependencies"])
],
dependencies: [
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
.package(url: "https://github.com/apple/swift-log", from: "1.6.0"),
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.4.1"),
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.5.2"),
.package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", exact: "0.2.3"),
.package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"),
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.3.0")
],
targets: [
.executableTarget(
name: "dewPoint-controller",
name: "dewpoint-controller",
dependencies: [
"Models",
"MQTTConnectionManager",
"MQTTConnectionService",
"SensorsService",
"TopicDependencies",
.product(name: "MQTTNIO", package: "mqtt-nio"),
.product(name: "NIO", package: "swift-nio"),
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
]
),
.testTarget(
name: "dewPoint-controllerTests",
dependencies: ["dewPoint-controller"]
),
.target(
name: "Models",
dependencies: [
@@ -50,11 +51,21 @@ let package = Package(
],
swiftSettings: swiftSettings
),
.target(
name: "MQTTConnectionManager",
dependencies: [
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio")
],
swiftSettings: swiftSettings
),
.target(
name: "MQTTConnectionService",
dependencies: [
"Models",
.product(name: "MQTTNIO", package: "mqtt-nio"),
"MQTTConnectionManager",
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
],
swiftSettings: swiftSettings
@@ -63,6 +74,7 @@ let package = Package(
name: "MQTTConnectionServiceTests",
dependencies: [
"MQTTConnectionService",
"MQTTConnectionManager",
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
]
),
@@ -70,7 +82,8 @@ let package = Package(
name: "SensorsService",
dependencies: [
"Models",
"MQTTConnectionService",
"MQTTConnectionManager",
"TopicDependencies",
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio"),
@@ -84,6 +97,15 @@ let package = Package(
"SensorsService",
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
]
),
.target(
name: "TopicDependencies",
dependencies: [
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio")
],
swiftSettings: swiftSettings
)
]
)

View File

@@ -0,0 +1,221 @@
import AsyncAlgorithms
import Dependencies
import DependenciesMacros
import Foundation
import Logging
import MQTTNIO
import NIO
public extension DependencyValues {
/// A dependency that is responsible for managing the connection to
/// an MQTT broker.
var mqttConnectionManager: MQTTConnectionManager {
get { self[MQTTConnectionManager.self] }
set { self[MQTTConnectionManager.self] = newValue }
}
}
/// Represents the interface needed for the ``MQTTConnectionService``.
///
/// See ``MQTTConnectionManagerLive`` module for live implementation.
@DependencyClient
public struct MQTTConnectionManager: Sendable {
/// Connect to the MQTT broker.
public var connect: @Sendable () async throws -> Void
/// Shutdown the connection to the MQTT broker.
///
/// - Note: You should cancel any tasks that are listening to the connection stream first.
public var shutdown: @Sendable () -> Void
/// Create a stream of connection events.
public var stream: @Sendable () throws -> AsyncStream<Event>
/// Perform an operation with the underlying MQTTClient, this can be useful in
/// tests, so this module needs imported with `@_spi(Testing) import` to use this method.
private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void
@_spi(Internal)
public func withClient(
_ callback: @Sendable (MQTTClient) async throws -> Void
) async throws {
try await _withClient(callback)
}
/// Represents connection events that clients can listen for and
/// react accordingly.
public enum Event: Sendable {
case connected
case disconnected
case shuttingDown
}
public static func live(
client: MQTTClient,
cleanSession: Bool = false,
logger: Logger? = nil,
alwaysReconnect: Bool = true
) -> Self {
let manager = ConnectionManager(
client: client,
logger: logger,
alwaysReconnect: alwaysReconnect
)
return .init {
try await manager.connect(cleanSession: cleanSession)
} shutdown: {
manager.shutdown()
} stream: {
MQTTConnectionStream(client: client, logger: logger)
.start()
.removeDuplicates()
.eraseToStream()
} _withClient: { callback in
try await callback(client)
}
}
}
extension MQTTConnectionManager: TestDependencyKey {
public static var testValue: MQTTConnectionManager {
Self()
}
}
// MARK: - Helpers
@_spi(Internal)
public final actor MQTTConnectionStream: Sendable {
public typealias Element = MQTTConnectionManager.Event
private let client: MQTTClient
private let continuation: AsyncStream<Element>.Continuation
private let logger: Logger?
private let name: String
private let stream: AsyncStream<Element>
private var isShuttingDown = false
public init(client: MQTTClient, logger: Logger?) {
let (stream, continuation) = AsyncStream<Element>.makeStream()
self.client = client
self.continuation = continuation
self.logger = logger
self.name = UUID().uuidString
self.stream = stream
}
deinit { stop() }
public nonisolated func start() -> AsyncStream<Element> {
// Check if the client is active and yield the initial result.
continuation.yield(client.isActive() ? .connected : .disconnected)
// Continually check if the client is active.
let task = Task {
let isShuttingDown = await self.isShuttingDown
while !Task.isCancelled, !isShuttingDown {
try await Task.sleep(for: .milliseconds(100))
continuation.yield(client.isActive() ? .connected : .disconnected)
}
}
// Register listener on the client for when the connection
// closes.
client.addCloseListener(named: name) { _ in
self.logger?.trace("Client has disconnected.")
self.continuation.yield(.disconnected)
}
// Register listener on the client for when the client
// is shutdown.
client.addShutdownListener(named: name) { _ in
self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)")
self.continuation.yield(.shuttingDown)
Task { await self.setIsShuttingDown() }
task.cancel()
self.stop()
}
return stream
}
private func setIsShuttingDown() {
isShuttingDown = true
}
public nonisolated func stop() {
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
continuation.finish()
}
}
actor ConnectionManager {
private let client: MQTTClient
private let logger: Logger?
private let name: String
private let shouldReconnect: Bool
private var hasConnected: Bool = false
init(
client: MQTTClient,
logger: Logger?,
alwaysReconnect: Bool
) {
self.client = client
self.logger = logger
self.name = UUID().uuidString
self.shouldReconnect = alwaysReconnect
}
deinit {
// We should've already logged that we're shutting down if
// the manager was shutdown properly, so don't log it twice.
self.shutdown(withLogging: false)
}
private func setHasConnected() {
hasConnected = true
}
func connect(
cleanSession: Bool
) async throws {
guard !hasConnected else { return }
do {
try await client.connect(cleanSession: cleanSession)
setHasConnected()
client.addCloseListener(named: name) { [weak self] _ in
guard let `self` else { return }
self.logger?.debug("Connection closed.")
if self.shouldReconnect {
self.logger?.debug("Reconnecting...")
Task {
try await self.connect(cleanSession: cleanSession)
}
}
}
client.addShutdownListener(named: name) { _ in
self.shutdown()
}
} catch {
logger?.trace("Failed to connect: \(error)")
throw error
}
}
nonisolated func shutdown(withLogging: Bool = true) {
if withLogging {
logger?.trace("Shutting down connection.")
}
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
}
}

View File

@@ -1,151 +1,37 @@
@preconcurrency import Foundation
import Dependencies
import Logging
import Models
import MQTTNIO
import NIO
import MQTTConnectionManager
import ServiceLifecycle
// TODO: This may not need to be an actor.
/// Manages the MQTT broker connection.
public actor MQTTConnectionService: Service {
@Dependency(\.mqttConnectionManager) var manager
private let cleanSession: Bool
public let client: MQTTClient
private let continuation: AsyncStream<Event>.Continuation
public nonisolated let events: AsyncStream<Event>
private let internalEventStream: ConnectionStream
nonisolated var logger: Logger { client.logger }
// private var shuttingDown = false
private nonisolated let logger: Logger?
public init(
cleanSession: Bool = true,
client: MQTTClient
logger: Logger? = nil
) {
self.cleanSession = cleanSession
self.client = client
self.internalEventStream = .init()
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
self.events = stream
self.continuation = continuation
self.logger = logger
}
deinit {
self.logger.debug("MQTTConnectionService is gone.")
self.internalEventStream.stop()
continuation.finish()
}
/// The entry-point of the service.
///
/// This method connects to the MQTT broker and manages the connection.
/// It will attempt to gracefully shutdown the connection upon receiving
/// `sigterm` signals.
/// The entry-point of the service which starts the connection
/// to the MQTT broker and handles graceful shutdown of the
/// connection.
public func run() async throws {
await withGracefulShutdownHandler {
await withDiscardingTaskGroup { group in
group.addTask { await self.connect() }
group.addTask {
await self.internalEventStream.start { self.client.isActive() }
}
for await event in self.internalEventStream.events.cancelOnGracefulShutdown() {
if event == .shuttingDown {
self.shutdown()
break
}
self.logger.trace("Sending connection event: \(event)")
self.continuation.yield(event)
}
group.cancelAll()
try await withGracefulShutdownHandler {
try await manager.connect()
for await event in try manager.stream().cancelOnGracefulShutdown() {
// We don't really need to do anything with the events, so just logging
// for now. But we need to iterate on an async stream for the service to
// continue to run and handle graceful shutdowns.
logger?.trace("Received connection event: \(event)")
}
// when we reach here we are shutting down, so we shutdown
// the manager.
manager.shutdown()
} onGracefulShutdown: {
self.logger.trace("Received graceful shutdown.")
self.shutdown()
self.logger?.trace("Received graceful shutdown.")
}
}
func connect() async {
do {
try await withThrowingDiscardingTaskGroup { group in
group.addTask {
try await self.client.connect(cleanSession: self.cleanSession)
}
client.addCloseListener(named: "\(Self.self)") { _ in
Task {
self.logger.debug("Connection closed.")
self.logger.debug("Reconnecting...")
await self.connect()
}
}
self.logger.debug("Connection successful.")
self.continuation.yield(.connected)
}
} catch {
logger.trace("Failed to connect: \(error)")
continuation.yield(.disconnected)
}
}
private nonisolated func shutdown() {
logger.debug("Begin shutting down MQTT broker connection.")
client.removeCloseListener(named: "\(Self.self)")
internalEventStream.stop()
_ = client.disconnect()
try? client.syncShutdownGracefully()
continuation.finish()
logger.info("MQTT broker connection closed.")
}
}
extension MQTTConnectionService {
public enum Event: Sendable {
case connected
case disconnected
case shuttingDown
}
// TODO: This functionality can probably move into the connection service.
private final class ConnectionStream: Sendable {
// private var cancellable: AnyCancellable?
private let continuation: AsyncStream<MQTTConnectionService.Event>.Continuation
let events: AsyncStream<MQTTConnectionService.Event>
init() {
let (stream, continuation) = AsyncStream.makeStream(of: MQTTConnectionService.Event.self)
self.events = stream
self.continuation = continuation
}
deinit {
stop()
}
func start(isActive connectionIsActive: @escaping () -> Bool) async {
try? await Task.sleep(for: .seconds(1))
let event: MQTTConnectionService.Event = connectionIsActive()
? .connected
: .disconnected
continuation.yield(event)
// cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
// .autoconnect()
// .sink { [weak self] (_: Date) in
// let event: MQTTConnectionService.Event = connectionIsActive()
// ? .connected
// : .disconnected
//
// self?.continuation.yield(event)
// }
}
func stop() {
continuation.yield(.shuttingDown)
continuation.finish()
}
}
}

View File

@@ -56,10 +56,11 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
public var dewPoint: DewPoint? {
get async {
guard let temperature = temperature,
let humidity = humidity
let humidity = humidity,
!temperature.value.isNaN,
!humidity.value.isNaN
else { return nil }
return try? await psychrometrics.dewPoint(.dryBulb(temperature, relativeHumidity: humidity))
// return .init(dryBulb: temperature, humidity: humidity)
}
}
@@ -67,12 +68,13 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
public var enthalpy: EnthalpyOf<MoistAir>? {
get async {
guard let temperature = temperature,
let humidity = humidity
let humidity = humidity,
!temperature.value.isNaN,
!humidity.value.isNaN
else { return nil }
return try? await psychrometrics.enthalpy.moistAir(
.dryBulb(temperature, relativeHumidity: humidity, altitude: altitude)
)
// return .init(dryBulb: temperature, humidity: humidity, altitude: altitude)
}
}
@@ -129,10 +131,10 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
prefix = "\(prefix.dropLast())"
}
self.init(
dewPoint: "\(prefix)/sensors/\(location.rawValue)_dew_point/state",
enthalpy: "\(prefix)/sensors/\(location.rawValue)_enthalpy/state",
humidity: "\(prefix)/sensors/\(location.rawValue)_humidity/state",
temperature: "\(prefix)/sensors/\(location.rawValue)_temperature/state"
dewPoint: "\(prefix)/sensor/\(location.rawValue)_dew_point/state",
enthalpy: "\(prefix)/sensor/\(location.rawValue)_enthalpy/state",
humidity: "\(prefix)/sensor/\(location.rawValue)_humidity/state",
temperature: "\(prefix)/sensor/\(location.rawValue)_temperature/state"
)
}
}

View File

@@ -3,7 +3,7 @@
/// This allows values to only publish changes if they have changed since the
/// last time they were recieved.
@propertyWrapper
public struct TrackedChanges<Value> {
public struct TrackedChanges<Value: Sendable>: Sendable {
/// The current tracking state.
private var tracking: TrackingState
@@ -12,7 +12,7 @@ public struct TrackedChanges<Value> {
private var value: Value
/// Used to check if a new value is equal to an old value.
private var isEqual: (Value, Value) -> Bool
private var isEqual: @Sendable (Value, Value) -> Bool
/// Access to the underlying property that we are wrapping.
public var wrappedValue: Value {
@@ -35,7 +35,7 @@ public struct TrackedChanges<Value> {
public init(
wrappedValue: Value,
needsProcessed: Bool = false,
isEqual: @escaping (Value, Value) -> Bool
isEqual: @escaping @Sendable (Value, Value) -> Bool
) {
self.value = wrappedValue
self.tracking = needsProcessed ? .needsProcessed : .hasProcessed
@@ -85,7 +85,9 @@ extension TrackedChanges: Equatable where Value: Equatable {
wrappedValue: Value,
needsProcessed: Bool = false
) {
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed, isEqual: ==)
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed) {
$0 == $1
}
}
}
@@ -96,5 +98,3 @@ extension TrackedChanges: Hashable where Value: Hashable {
hasher.combine(needsProcessed)
}
}
extension TrackedChanges: Sendable where Value: Sendable {}

View File

@@ -1,6 +1,3 @@
import Logging
import Models
import MQTTNIO
import NIO
import NIOFoundationCompat
import PsychrometricClient

View File

@@ -3,305 +3,178 @@ import DependenciesMacros
import Foundation
import Logging
import Models
import MQTTConnectionService
@preconcurrency import MQTTNIO
import MQTTConnectionManager
import MQTTNIO
import NIO
import PsychrometricClient
import ServiceLifecycle
import TopicDependencies
@DependencyClient
public struct SensorsClient: Sendable {
/// Service that is responsible for listening to changes of the temperature and humidity
/// sensors, then publishing back the calculated dew-point temperature and enthalpy for
/// the sensor location.
///
///
public actor SensorsService: Service {
public var listen: @Sendable ([String]) async throws -> AsyncStream<MQTTPublishInfo>
public var logger: Logger?
public var publish: @Sendable (Double, String) async throws -> Void
public var shutdown: @Sendable () -> Void = {}
@Dependency(\.mqttConnectionManager.stream) var connectionStream
@Dependency(\.topicListener) var topicListener
@Dependency(\.topicPublisher) var topicPublisher
public func listen(to topics: [String]) async throws -> AsyncStream<MQTTPublishInfo> {
try await listen(topics)
}
/// The logger to use for the service.
private let logger: Logger?
public func publish(_ value: Double, to topic: String) async throws {
try await publish(value, topic)
}
}
/// The sensors that we are listening for updates to, so
/// that we can calculate the dew-point temperature and enthalpy
/// values to publish back to the MQTT broker.
var sensors: [TemperatureAndHumiditySensor]
extension SensorsClient: TestDependencyKey {
public static var testValue: SensorsClient {
Self()
}
}
public extension DependencyValues {
var sensorsClient: SensorsClient {
get { self[SensorsClient.self] }
set { self[SensorsClient.self] = newValue }
}
}
public actor SensorsService2: Service {
@Dependency(\.sensorsClient) var client
private var sensors: [TemperatureAndHumiditySensor]
public init(
sensors: [TemperatureAndHumiditySensor]
) {
self.sensors = sensors
}
public func run() async throws {
guard sensors.count > 0 else {
throw SensorCountError()
}
let stream = try await client.listen(to: topics)
do {
try await withGracefulShutdownHandler {
try await withThrowingDiscardingTaskGroup { group in
for await result in stream.cancelOnGracefulShutdown() {
group.addTask { try await self.handleResult(result) }
}
}
} onGracefulShutdown: {
Task {
await self.client.logger?.trace("Received graceful shutdown.")
try? await self.publishUpdates()
await self.client.shutdown()
}
}
} catch {
client.logger?.trace("Error: \(error)")
client.shutdown()
}
}
private var topics: [String] {
var topics: [String] {
sensors.reduce(into: [String]()) { array, sensor in
array.append(sensor.topics.temperature)
array.append(sensor.topics.humidity)
}
}
private func handleResult(_ result: MQTTPublishInfo) async throws {
let topic = result.topicName
client.logger?.trace("Begin handling result for topic: \(topic)")
/// Create a new sensors service that listens to the passed in
/// sensors.
///
/// - Note: The service will fail to start if the array of sensors is not greater than 0.
///
/// - Parameters:
/// - sensors: The sensors to listen for changes to.
/// - logger: An optional logger to use.
public init(
sensors: [TemperatureAndHumiditySensor],
logger: Logger? = nil
) {
self.sensors = sensors
self.logger = logger
}
/// Start the service with graceful shutdown, which will attempt to publish
/// any pending changes to the MQTT broker, upon a shutdown signal.
public func run() async throws {
precondition(sensors.count > 0, "Sensors should not be empty.")
try await withGracefulShutdownHandler {
// Listen for connection events, so that we can automatically
// reconnect any sensor topics we're listening to upon a disconnect / reconnect
// event. We can also shutdown any topic listeners upon a shutdown event.
for await event in try connectionStream().cancelOnGracefulShutdown() {
switch event {
case .shuttingDown:
logger?.debug("Received shutdown event.")
try await self.shutdown()
case .disconnected:
logger?.debug("Received disconnected event.")
try await Task.sleep(for: .milliseconds(100))
case .connected:
logger?.debug("Received connected event.")
let stream = try await makeStream()
for await result in stream.cancelOnGracefulShutdown() {
logger?.debug("Received result for topic: \(result.topic)")
await self.handleResult(result)
}
}
}
} onGracefulShutdown: {
Task {
self.logger?.debug("Received graceful shutdown.")
try await self.shutdown()
}
}
}
@_spi(Internal)
public func shutdown() async throws {
try await publishUpdates()
topicListener.shutdown()
}
private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> {
try await topicListener.listen(to: topics)
// ignore errors, so that we continue to listen, but log them
// for debugging purposes.
.compactMap { result in
switch result {
case let .failure(error):
self.logger?.debug("Received error listening for sensors: \(error)")
return nil
case let .success(info):
return (info.payload, info.topicName)
}
}
// ignore duplicate values, to prevent publishing dew-point and enthalpy
// changes to frequently.
.removeDuplicates { lhs, rhs in
lhs.buffer == rhs.buffer
&& lhs.topic == rhs.topic
}
.eraseToStream()
}
private func handleResult(_ result: (buffer: ByteBuffer, topic: String)) async {
do {
let topic = result.topic
assert(topics.contains(topic))
logger?.debug("Begin handling result for topic: \(topic)")
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
var buffer = result.payload
var buffer = result.buffer
return V(buffer: &buffer)
}
if topic.contains("temperature") {
client.logger?.trace("Begin handling temperature result.")
logger?.debug("Begin handling temperature result.")
guard let temperature = decode(DryBulb.self) else {
client.logger?.trace("Failed to decode temperature: \(result.payload)")
logger?.debug("Failed to decode temperature: \(result.buffer)")
throw DecodingError()
}
client.logger?.trace("Decoded temperature: \(temperature)")
logger?.debug("Decoded temperature: \(temperature)")
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
} else if topic.contains("humidity") {
client.logger?.trace("Begin handling humidity result.")
logger?.debug("Begin handling humidity result.")
guard let humidity = decode(RelativeHumidity.self) else {
client.logger?.trace("Failed to decode humidity: \(result.payload)")
logger?.debug("Failed to decode humidity: \(result.buffer)")
throw DecodingError()
}
client.logger?.trace("Decoded humidity: \(humidity)")
logger?.debug("Decoded humidity: \(humidity)")
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
} else {
client.logger?.error("Received unexpected topic, expected topic to contain 'temperature' or 'humidity'!")
return
}
try await publishUpdates()
client.logger?.trace("Done handling result for topic: \(topic)")
logger?.debug("Done handling result for topic: \(topic)")
} catch {
logger?.error("Received error while handling result: \(error)")
}
}
private func publish(_ double: Double?, to topic: String) async throws {
guard let double else { return }
try await client.publish(double, to: topic)
client.logger?.trace("Published update to topic: \(topic)")
try await topicPublisher.publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(double)"),
qos: .exactlyOnce,
retain: true
)
logger?.debug("Published update to topic: \(topic)")
}
private func publishUpdates() async throws {
for sensor in sensors.filter(\.needsProcessed) {
try await publish(sensor.dewPoint?.value, to: sensor.topics.dewPoint)
try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy)
}
}
}
public actor SensorsService: Service {
private var sensors: [TemperatureAndHumiditySensor]
private let client: MQTTClient
private let events: @Sendable () -> AsyncStream<MQTTConnectionService.Event>
nonisolated var logger: Logger { client.logger }
private var shuttingDown: Bool = false
public init(
client: MQTTClient,
events: @Sendable @escaping () -> AsyncStream<MQTTConnectionService.Event>,
sensors: [TemperatureAndHumiditySensor]
) {
self.client = client
self.events = events
self.sensors = sensors
}
/// The entry-point of the service.
///
/// This method is called to start the service and begin
/// listening for sensor value changes then publishing the dew-point
/// and enthalpy values of the sensors.
public func run() async throws {
do {
try await withGracefulShutdownHandler {
try await withThrowingDiscardingTaskGroup { group in
client.addPublishListener(named: "\(Self.self)") { result in
if self.shuttingDown {
self.logger.trace("Shutting down.")
} else if !self.client.isActive() {
self.logger.trace("Client is not currently active")
} else {
Task { try await self.handleResult(result) }
}
}
for await event in self.events().cancelOnGracefulShutdown() {
logger.trace("Received event: \(event)")
if event == .shuttingDown {
self.setIsShuttingDown()
} else if event == .connected {
group.addTask { try await self.subscribeToSensors() }
} else {
group.addTask { await self.unsubscribeToSensors() }
group.addTask { try? await Task.sleep(for: .milliseconds(100)) }
}
}
}
} onGracefulShutdown: {
// do something.
self.logger.debug("Received graceful shutdown.")
Task { [weak self] in await self?.setIsShuttingDown() }
}
} catch {
// WARN: We always get an MQTTNIO `noConnection` error here, which generally is not an issue,
// but causes service ServiceLifecycle to fail, so currently just ignoring errors that are thrown.
// However we do receive the unsubscribe message back from the MQTT broker, so it is likely safe
// to ignore the `noConnection` error.
logger.trace("Run error: \(error)")
// throw error
}
}
private func setIsShuttingDown() {
logger.debug("Received shut down event.")
Task { try await publishUpdates() }
Task { await self.unsubscribeToSensors() }
shuttingDown = true
client.removePublishListener(named: "\(Self.self)")
}
private func handleResult(
_ result: Result<MQTTPublishInfo, any Error>
) async throws {
logger.trace("Begin handling result")
do {
switch result {
case let .failure(error):
logger.debug("Failed receiving sensor: \(error)")
throw error
case let .success(value):
// do something.
let topic = value.topicName
logger.trace("Received new value for topic: \(topic)")
if topic.contains("temperature") {
// do something.
var buffer = value.payload
guard let temperature = DryBulb(buffer: &buffer) else {
logger.trace("Decoding error for topic: \(topic)")
throw DecodingError()
}
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
try await publishUpdates()
} else if topic.contains("humidity") {
var buffer = value.payload
// Decode and update the temperature value
guard let humidity = RelativeHumidity(buffer: &buffer) else {
logger.debug("Failed to decode humidity from buffer: \(buffer)")
throw DecodingError()
}
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
try await publishUpdates()
}
}
} catch {
logger.trace("Handle Result error: \(error)")
throw error
}
}
private func subscribeToSensors() async throws {
for sensor in sensors {
_ = try await client.subscribe(to: [
MQTTSubscribeInfo(topicFilter: sensor.topics.temperature, qos: .atLeastOnce),
MQTTSubscribeInfo(topicFilter: sensor.topics.humidity, qos: .atLeastOnce)
])
logger.debug("Subscribed to sensor: \(sensor.location)")
}
}
private func unsubscribeToSensors() async {
logger.trace("Begin unsubscribe to sensors.")
guard client.isActive() else {
logger.debug("Client is not active, skipping.")
return
}
do {
let topics = sensors.reduce(into: [String]()) { array, sensor in
array.append(sensor.topics.temperature)
array.append(sensor.topics.humidity)
}
try await client.unsubscribe(from: topics)
logger.trace("Unsubscribed from sensors.")
} catch {
logger.trace("Unsubscribe error: \(error)")
}
}
private func publish(double: Double?, to topic: String) async throws {
guard client.isActive() else { return }
guard let double else { return }
let rounded = round(double * 100) / 100
logger.debug("Publishing \(rounded), to: \(topic)")
try await client.publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(rounded)"),
qos: .exactlyOnce,
retain: true
)
}
private func publishUpdates() async throws {
for sensor in sensors.filter(\.needsProcessed) {
try await publish(double: sensor.dewPoint?.value, to: sensor.topics.dewPoint)
try await publish(double: sensor.enthalpy?.value, to: sensor.topics.enthalpy)
try sensors.hasProcessed(sensor)
}
}
}
// MARK: - Errors
struct DecodingError: Error {}
struct MQTTClientNotConnected: Error {}
struct NotFoundError: Error {}
struct SensorExists: Error {}
struct SensorCountError: Error {}
struct SensorNotFoundError: Error {}
// MARK: - Helpers
@@ -319,14 +192,14 @@ private extension Array where Element == TemperatureAndHumiditySensor {
with value: V
) throws {
guard let index = firstIndex(where: { $0.topics.contains(topic) }) else {
throw NotFoundError()
throw SensorNotFoundError()
}
self[index][keyPath: keyPath] = value
}
mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws {
guard let index = firstIndex(where: { $0.id == sensor.id }) else {
throw NotFoundError()
throw SensorNotFoundError()
}
self[index].needsProcessed = false
}

View File

@@ -0,0 +1,186 @@
import Dependencies
import DependenciesMacros
import Foundation
import MQTTNIO
/// A dependency that can generate an async stream of changes to the given topics.
///
/// - Note: This type only conforms to ``TestDependencyKey`` because it requires an MQTTClient
/// to generate the live dependency.
@DependencyClient
public struct TopicListener: Sendable {
public typealias Stream = AsyncStream<Result<MQTTPublishInfo, any Error>>
/// Create an async stream that listens for changes to the given topics.
private var _listen: @Sendable ([String], MQTTQoS) async throws -> Stream
/// Shutdown the listener stream.
public var shutdown: @Sendable () -> Void
/// Create a new topic listener.
///
/// - Parameters:
/// - listen: Generate an async stream of changes for the given topics.
/// - shutdown: Shutdown the topic listener stream.
public init(
listen: @Sendable @escaping ([String], MQTTQoS) async throws -> Stream,
shutdown: @Sendable @escaping () -> Void
) {
self._listen = listen
self.shutdown = shutdown
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
to topics: [String],
qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
try await _listen(topics, qos)
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
_ topics: String...,
qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
try await listen(to: topics, qos: qos)
}
/// Create the live implementation of the topic listener with the given MQTTClient.
///
/// - Parameters:
/// - client: The MQTTClient to use.
public static func live(client: MQTTClient) -> Self {
let listener = MQTTTopicListener(client: client)
return .init(
listen: { try await listener.listen($0, $1) },
shutdown: { listener.shutdown() }
)
}
}
extension TopicListener: TestDependencyKey {
public static var testValue: TopicListener { Self() }
}
public extension DependencyValues {
var topicListener: TopicListener {
get { self[TopicListener.self] }
set { self[TopicListener.self] = newValue }
}
}
// MARK: - Helpers
private actor MQTTTopicListener {
private let client: MQTTClient
private let continuation: TopicListener.Stream.Continuation
private let name: String
let stream: TopicListener.Stream
private var shuttingDown: Bool = false
init(
client: MQTTClient
) {
let (stream, continuation) = TopicListener.Stream.makeStream()
self.client = client
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
}
deinit {
if !shuttingDown {
let message = """
Shutdown was not called on topic listener. This could lead to potential errors or
the stream never ending.
Please ensure that you call shutdown on the listener.
"""
client.logger.warning("\(message)")
continuation.finish()
}
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
}
func listen(
_ topics: [String],
_ qos: MQTTQoS = .atLeastOnce
) async throws -> TopicListener.Stream {
var sleepTimes = 0
while !client.isActive() {
guard sleepTimes < 10 else {
throw TopicListenerError.connectionTimeout
}
try? await Task.sleep(for: .milliseconds(100))
sleepTimes += 1
}
client.logger.trace("Client is active, begin subscribing to topics.")
let subscription = try? await client.subscribe(to: topics.map {
MQTTSubscribeInfo(topicFilter: $0, qos: qos)
})
guard subscription != nil else {
client.logger.error("Error subscribing to topics: \(topics)")
throw TopicListenerError.failedToSubscribe
}
client.logger.trace("Done subscribing, begin listening to topics.")
client.addPublishListener(named: name) { result in
switch result {
case let .failure(error):
self.client.logger.error("Received error while listening: \(error)")
self.continuation.yield(.failure(MQTTListenResultError(error)))
case let .success(publishInfo):
if topics.contains(publishInfo.topicName) {
self.client.logger.trace("Recieved new value for topic: \(publishInfo.topicName)")
self.continuation.yield(.success(publishInfo))
}
}
}
return stream
}
private func setIsShuttingDown() {
shuttingDown = true
}
nonisolated func shutdown() {
client.logger.trace("Closing topic listener...")
continuation.finish()
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
Task { await self.setIsShuttingDown() }
}
}
// MARK: - Errors
public enum TopicListenerError: Error {
case connectionTimeout
case failedToSubscribe
}
public struct MQTTListenResultError: Error {
let underlyingError: any Error
init(_ underlyingError: any Error) {
self.underlyingError = underlyingError
}
}

View File

@@ -0,0 +1,117 @@
import Dependencies
import DependenciesMacros
import MQTTNIO
import NIO
/// A dependency that is responsible for publishing values to an MQTT broker.
///
/// - Note: This dependency only conforms to `TestDependencyKey` because it
/// requires an active `MQTTClient` to generate the live dependency.
@DependencyClient
public struct TopicPublisher: Sendable {
private var _publish: @Sendable (PublishRequest) async throws -> Void
/// Create a new topic publisher.
///
/// - Parameters:
/// - publish: Handle the publish request.
public init(
publish: @Sendable @escaping (PublishRequest) async throws -> Void
) {
self._publish = publish
}
/// Publish a new value to the given topic.
///
/// - Parameters:
/// - topicName: The topic to publish the new value to.
/// - payload: The value to publish.
/// - qos: The MQTTQoS.
/// - retain: The retain flag.
public func publish(
to topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool = false
) async throws {
try await _publish(.init(
topicName: topicName,
payload: payload,
qos: qos,
retain: retain
))
}
/// Create the live topic publisher with the given `MQTTClient`.
///
/// - Parameters:
/// - client: The mqtt broker client to use.
public static func live(client: MQTTClient) -> Self {
.init(
publish: { request in
guard client.isActive() else {
client.logger.trace("Client is not connected, unable to publish to \(request.topicName)")
return
}
client.logger.trace("Begin publishing to topic: \(request.topicName)")
defer { client.logger.trace("Done publishing to topic: \(request.topicName)") }
try await client.publish(
to: request.topicName,
payload: request.payload,
qos: request.qos,
retain: request.retain
)
}
)
}
/// Represents the parameters required to publish a new value to the
/// MQTT broker.
public struct PublishRequest: Equatable, Sendable {
/// The topic to publish the new value to.
public let topicName: String
/// The value to publish.
public let payload: ByteBuffer
/// The qos of the request.
public let qos: MQTTQoS
/// The retain flag for the request.
public let retain: Bool
/// Create a new publish request.
///
/// - Parameters:
/// - topicName: The topic to publish to.
/// - payload: The value to publish.
/// - qos: The qos of the request.
/// - retain: The retain flag of the request.
public init(
topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool
) {
self.topicName = topicName
self.payload = payload
self.qos = qos
self.retain = retain
}
}
}
extension TopicPublisher: TestDependencyKey {
public static var testValue: TopicPublisher { Self() }
}
public extension DependencyValues {
/// A dependency that is responsible for publishing values to an MQTT broker.
var topicPublisher: TopicPublisher {
get { self[TopicPublisher.self] }
set { self[TopicPublisher.self] = newValue }
}
}

View File

@@ -1,15 +1,19 @@
import Dependencies
import Foundation
import Logging
import Models
import MQTTConnectionManager
import MQTTConnectionService
import MQTTNIO
import NIO
import PsychrometricClientLive
import SensorsService
import ServiceLifecycle
import TopicDependencies
@main
struct Application {
/// The main entry point of the application.
static func main() async throws {
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
@@ -31,14 +35,17 @@ struct Application {
logger: logger
)
let mqttConnection = MQTTConnectionService(client: mqtt)
let sensors = SensorsService(
client: mqtt,
events: { mqttConnection.events },
sensors: .live
)
do {
try await withDependencies {
$0.psychrometricClient = .liveValue
$0.topicListener = .live(client: mqtt)
$0.topicPublisher = .live(client: mqtt)
$0.mqttConnectionManager = .live(client: mqtt, logger: logger)
} operation: {
let mqttConnection = MQTTConnectionService(logger: logger)
let sensors = SensorsService(sensors: .live, logger: logger)
let serviceGroup = ServiceGroup(
var serviceGroupConfiguration = ServiceGroupConfiguration(
services: [
mqttConnection,
sensors
@@ -46,9 +53,20 @@ struct Application {
gracefulShutdownSignals: [.sigterm, .sigint],
logger: logger
)
serviceGroupConfiguration.maximumCancellationDuration = .seconds(5)
serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10)
let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
try await serviceGroup.run()
}
try await mqtt.shutdown()
try await eventloopGroup.shutdownGracefully()
} catch {
try await eventloopGroup.shutdownGracefully()
}
}
}
// MARK: - Helpers

View File

@@ -1,7 +1,8 @@
import Combine
import AsyncAlgorithms
import Logging
import Models
@testable import MQTTConnectionService
@_spi(Internal) import MQTTConnectionManager
import MQTTConnectionService
import MQTTNIO
import NIO
import ServiceLifecycle
@@ -13,22 +14,52 @@ final class MQTTConnectionServiceTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "AsyncClientTests")
var logger = Logger(label: "MQTTConnectionServiceTests")
logger.logLevel = .trace
return logger
}()
func testGracefulShutdownWorks() async throws {
try await testGracefulShutdown { trigger in
let client = createClient(identifier: "testGracefulShutdown")
let service = MQTTConnectionService(client: client)
try await service.run()
try await Task.sleep(for: .seconds(1))
XCTAssert(client.isActive())
trigger.triggerGracefulShutdown()
// try await Task.sleep(for: .seconds(2))
// XCTAssertFalse(client.isActive())
// TODO: Move to integration tests.
func testMQTTConnectionStream() async throws {
let client = createClient(identifier: "testNonManagedStream")
let manager = MQTTConnectionManager.live(
client: client,
logger: Self.logger,
alwaysReconnect: false
)
let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger)
let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger)
var events1 = [MQTTConnectionManager.Event]()
var events2 = [MQTTConnectionManager.Event]()
let stream1 = connectionStream1.start()
let stream2 = connectionStream2.start()
_ = try await manager.connect()
Task {
while !client.isActive() {
try await Task.sleep(for: .milliseconds(100))
}
try await Task.sleep(for: .milliseconds(200))
manager.shutdown()
try await client.disconnect()
try await Task.sleep(for: .seconds(1))
try await client.shutdown()
try await Task.sleep(for: .seconds(1))
connectionStream1.stop()
connectionStream2.stop()
}
for await event in stream1.removeDuplicates() {
events1.append(event)
}
for await event in stream2.removeDuplicates() {
events2.append(event)
}
XCTAssertEqual(events1, [.disconnected, .connected, .disconnected, .shuttingDown])
XCTAssertEqual(events2, [.disconnected, .connected, .disconnected, .shuttingDown])
}
func createClient(identifier: String) -> MQTTClient {
@@ -58,65 +89,4 @@ final class MQTTConnectionServiceTests: XCTestCase {
)
}
func testEventStream() async throws {
var connection: ConnectionStream? = ConnectionStream()
let task = Task {
guard let events = connection?.events else { return }
print("before loop")
for await event in events {
print("\(event)")
}
print("after loop")
}
let ending = Task {
try await Task.sleep(for: .seconds(2))
connection = nil
}
connection?.start()
try await ending.value
task.cancel()
}
}
class ConnectionStream {
enum Event {
case connected
case disconnected
case shuttingDown
}
let events: AsyncStream<Event>
private let continuation: AsyncStream<Event>.Continuation
private var cancellable: AnyCancellable?
init() {
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
self.events = stream
self.continuation = continuation
}
deinit {
print("connection stream is gone.")
stop()
}
func start() {
cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
.autoconnect()
.sink { [weak self] _ in
print("will send event.")
self?.continuation.yield(.connected)
}
}
func stop() {
continuation.yield(.shuttingDown)
cancellable = nil
continuation.finish()
}
}

View File

@@ -1,10 +1,12 @@
import Dependencies
import Logging
import Models
@_spi(Internal) import MQTTConnectionManager
import MQTTNIO
import NIO
import PsychrometricClientLive
@testable import SensorsService
@_spi(Internal) import SensorsService
import TopicDependencies
import XCTest
final class SensorsClientTests: XCTestCase {
@@ -12,30 +14,75 @@ final class SensorsClientTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "AsyncClientTests")
logger.logLevel = .debug
var logger = Logger(label: "SensorsClientTests")
logger.logLevel = .trace
return logger
}()
override func invokeTest() {
let client = createClient(identifier: "\(Self.self)")
withDependencies {
$0.mqttConnectionManager = .live(client: client, logger: Self.logger)
$0.psychrometricClient = PsychrometricClient.liveValue
$0.topicListener = .live(client: client)
$0.topicPublisher = .live(client: client)
} operation: {
super.invokeTest()
}
}
// func createClient(identifier: String) -> SensorsClient {
// let envVars = EnvVars(
// appEnv: .testing,
// host: Self.hostname,
// port: "1883",
// identifier: identifier,
// userName: nil,
// password: nil
// )
// return .init(envVars: envVars, logger: Self.logger)
// }
func testListeningResumesAfterDisconnectThenReconnect() async throws {
@Dependency(\.mqttConnectionManager) var manager
struct TimeoutError: Error {}
let sensor = TemperatureAndHumiditySensor(location: .return)
var results = [TopicPublisher.PublishRequest]()
try await withDependencies {
$0.topicPublisher = .capturing { results.append($0) }
} operation: {
let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger)
let task = Task { try await sensorsService.run() }
defer { task.cancel() }
try await manager.connect()
defer { manager.shutdown() }
try await manager.withClient { client in
try await client.disconnect()
try await client.connect()
try await Task.sleep(for: .milliseconds(100))
try await client.publish(
to: sensor.topics.temperature,
payload: ByteBufferAllocator().buffer(string: "25"),
qos: .atLeastOnce,
retain: false
)
try await client.publish(
to: sensor.topics.humidity,
payload: ByteBufferAllocator().buffer(string: "50"),
qos: .atLeastOnce,
retain: false
)
}
var timeoutCount = 0
while !(results.count == 2) {
guard timeoutCount < 20 else {
throw TimeoutError()
}
try await Task.sleep(for: .milliseconds(100))
timeoutCount += 1
}
XCTAssertEqual(results.count, 2)
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.dewPoint }))
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.enthalpy }))
try await sensorsService.shutdown()
}
}
func createClient(identifier: String) -> MQTTClient {
let envVars = EnvVars(
appEnv: .testing,
@@ -62,178 +109,6 @@ final class SensorsClientTests: XCTestCase {
configuration: config
)
}
// func testConnectAndShutdown() async throws {
// let client = createClient(identifier: "testConnectAndShutdown")
// await client.connect()
// await client.shutdown()
// }
// func testSensorService() async throws {
// let mqtt = createClient(identifier: "testSensorService")
// // let mqtt = await client.client
// let sensor = TemperatureAndHumiditySensor(location: .mixedAir)
// let publishInfo = PublishInfoContainer(topicFilters: [
// sensor.topics.dewPoint,
// sensor.topics.enthalpy
// ])
// let service = SensorsService(client: mqtt, sensors: [sensor])
//
// // fix to connect the mqtt client.
// try await mqtt.connect()
// let task = Task { try await service.run() }
//
// _ = try await mqtt.subscribe(to: [
// MQTTSubscribeInfo(topicFilter: sensor.topics.dewPoint, qos: .exactlyOnce),
// MQTTSubscribeInfo(topicFilter: sensor.topics.enthalpy, qos: .exactlyOnce)
// ])
//
// let listener = mqtt.createPublishListener()
// Task {
// for await result in listener {
// switch result {
// case let .failure(error):
// XCTFail("\(error)")
// case let .success(value):
// await publishInfo.addPublishInfo(value)
// }
// }
// }
//
// try await mqtt.publish(
// to: sensor.topics.temperature,
// payload: ByteBufferAllocator().buffer(string: "75.123"),
// qos: MQTTQoS.exactlyOnce,
// retain: true
// )
//
// try await Task.sleep(for: .seconds(1))
//
// // XCTAssert(client.sensors.first!.needsProcessed)
// // let firstSensor = await client.sensors.first!
// // XCTAssertEqual(firstSensor.temperature, .init(75.123, units: .celsius))
//
// try await mqtt.publish(
// to: sensor.topics.humidity,
// payload: ByteBufferAllocator().buffer(string: "50"),
// qos: MQTTQoS.exactlyOnce,
// retain: true
// )
//
// try await Task.sleep(for: .seconds(1))
//
// // not working for some reason
// // XCTAssertEqual(publishInfo.info.count, 2)
//
// XCTAssert(publishInfo.info.count > 1)
//
// // fix to shutdown the mqtt client.
// task.cancel()
// try await mqtt.shutdown()
// }
func testCapturingSensorClient() async throws {
class CapturedValues {
var values = [(value: Double, topic: String)]()
var didShutdown = false
init() {}
}
let capturedValues = CapturedValues()
try await withDependencies {
$0.sensorsClient = .testing(
yielding: [
(value: 76, to: "not-listening"),
(value: 75, to: "test")
]
) { value, topic in
capturedValues.values.append((value, topic))
} captureShutdownEvent: {
capturedValues.didShutdown = $0
}
} operation: {
@Dependency(\.sensorsClient) var client
let stream = try await client.listen(to: ["test"])
for await value in stream {
var buffer = value.payload
guard let double = Double(buffer: &buffer) else {
XCTFail("Failed to decode double")
return
}
XCTAssertEqual(double, 75)
XCTAssertEqual(value.topicName, "test")
try await client.publish(26, to: "publish")
try await Task.sleep(for: .milliseconds(100))
client.shutdown()
}
XCTAssertEqual(capturedValues.values.count, 1)
XCTAssertEqual(capturedValues.values.first?.value, 26)
XCTAssertEqual(capturedValues.values.first?.topic, "publish")
XCTAssertTrue(capturedValues.didShutdown)
}
}
// func testSensorCapturesPublishedState() async throws {
// let client = createClient(identifier: "testSensorCapturesPublishedState")
// let mqtt = client.client
// let sensor = TemperatureAndHumiditySensor(location: .mixedAir)
// let publishInfo = PublishInfoContainer(topicFilters: [
// sensor.topics.dewPoint,
// sensor.topics.enthalpy
// ])
//
// try await client.addSensor(sensor)
// await client.connect()
// try await client.start()
//
// _ = try await mqtt.subscribe(to: [
// MQTTSubscribeInfo(topicFilter: sensor.topics.dewPoint, qos: MQTTQoS.exactlyOnce),
// MQTTSubscribeInfo(topicFilter: sensor.topics.enthalpy, qos: MQTTQoS.exactlyOnce)
// ])
//
// let listener = mqtt.createPublishListener()
// Task {
// for await result in listener {
// switch result {
// case let .failure(error):
// XCTFail("\(error)")
// case let .success(value):
// await publishInfo.addPublishInfo(value)
// }
// }
// }
//
// try await mqtt.publish(
// to: sensor.topics.temperature,
// payload: ByteBufferAllocator().buffer(string: "75.123"),
// qos: MQTTQoS.exactlyOnce,
// retain: true
// )
//
// try await Task.sleep(for: .seconds(1))
//
// // XCTAssert(client.sensors.first!.needsProcessed)
// let firstSensor = client.sensors.first!
// XCTAssertEqual(firstSensor.temperature, DryBulb.celsius(75.123))
//
// try await mqtt.publish(
// to: sensor.topics.humidity,
// payload: ByteBufferAllocator().buffer(string: "50"),
// qos: MQTTQoS.exactlyOnce,
// retain: true
// )
//
// try await Task.sleep(for: .seconds(1))
//
// XCTAssertEqual(publishInfo.info.count, 2)
//
// await client.shutdown()
// }
}
// MARK: Helpers for tests.
@@ -258,41 +133,43 @@ class PublishInfoContainer {
}
}
extension SensorsClient {
static func testing(
yielding: [(value: Double, to: String)],
capturePublishedValues: @escaping (Double, String) -> Void,
captureShutdownEvent: @escaping (Bool) -> Void
extension TopicPublisher {
static func capturing(
_ callback: @escaping (PublishRequest) -> Void
) -> Self {
let (stream, continuation) = AsyncStream.makeStream(of: MQTTPublishInfo.self)
let logger = Logger(label: "\(Self.self).testing")
return .init(
listen: { topics in
for (value, topic) in yielding where topics.contains(topic) {
continuation.yield(
MQTTPublishInfo(
qos: .atLeastOnce,
retain: true,
topicName: topic,
payload: ByteBuffer(string: "\(value)"),
properties: MQTTProperties()
)
)
}
return stream
},
logger: logger,
publish: { value, topic in
capturePublishedValues(value, topic)
},
shutdown: {
captureShutdownEvent(true)
continuation.finish()
}
)
.init { callback($0) }
}
}
// extension SensorsClient {
//
// static func testing(
// yielding: [(value: Double, to: String)],
// capturePublishedValues: @escaping (Double, String) -> Void,
// captureShutdownEvent: @escaping (Bool) -> Void
// ) -> Self {
// let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self)
// let logger = Logger(label: "\(Self.self).testing")
//
// return .init(
// listen: { topics in
// for (value, topic) in yielding where topics.contains(topic) {
// continuation.yield(
// (buffer: ByteBuffer(string: "\(value)"), topic: topic)
// )
// }
// return stream
// },
// logger: logger,
// publish: { value, topic in
// capturePublishedValues(value, topic)
// },
// shutdown: {
// captureShutdownEvent(true)
// continuation.finish()
// }
// )
// }
// }
struct TopicNotFoundError: Error {}

View File

@@ -1,47 +0,0 @@
import XCTest
import class Foundation.Bundle
//final class dewPoint_controllerTests: XCTestCase {
// func testExample() throws {
// // This is an example of a functional test case.
// // Use XCTAssert and related functions to verify your tests produce the correct
// // results.
//
// // Some of the APIs that we use below are available in macOS 10.13 and above.
// guard #available(macOS 10.13, *) else {
// return
// }
//
// // Mac Catalyst won't have `Process`, but it is supported for executables.
// #if !targetEnvironment(macCatalyst)
//
// let fooBinary = productsDirectory.appendingPathComponent("dewPoint-controller")
//
// let process = Process()
// process.executableURL = fooBinary
//
// let pipe = Pipe()
// process.standardOutput = pipe
//
// try process.run()
// process.waitUntilExit()
//
// let data = pipe.fileHandleForReading.readDataToEndOfFile()
// let output = String(data: data, encoding: .utf8)
//
// XCTAssertEqual(output, "Hello, world!\n")
// #endif
// }
//
// /// Returns path to the built products directory.
// var productsDirectory: URL {
// #if os(macOS)
// for bundle in Bundle.allBundles where bundle.bundlePath.hasSuffix(".xctest") {
// return bundle.bundleURL.deletingLastPathComponent()
// }
// fatalError("couldn't find the products directory")
// #else
// return Bundle.main.bundleURL
// #endif
// }
//}

View File

@@ -10,17 +10,15 @@ services:
build:
context: .
dockerfile: Dockerfile
platform: linux/amd64
depends_on:
- mosquitto
environment:
- MOSQUITTO_SERVER=mosquitto
- MQTT_HOST=mosquitto
test:
build:
context: .
dockerfile: Dockerfile.test
platform: linux/amd64
working_dir: /app
networks:
- test