Compare commits
10 Commits
f68ac528e4
...
e7a849b003
| Author | SHA1 | Date | |
|---|---|---|---|
|
e7a849b003
|
|||
|
bd2a798320
|
|||
|
b8992b89b6
|
|||
|
efd9907b4a
|
|||
|
fbbd65f7ae
|
|||
|
8067331ff8
|
|||
|
b6db9b5322
|
|||
|
bf1126b06a
|
|||
|
ef552fb8bc
|
|||
|
1e62d7aac0
|
@@ -8,3 +8,4 @@
|
||||
--wrapconditions after-first
|
||||
--typeblanklines preserve
|
||||
--commas inline
|
||||
--stripunusedargs closure-only
|
||||
|
||||
@@ -11,4 +11,4 @@ RUN swift build --enable-test-discovery -c release -Xswiftc -g
|
||||
FROM swift:5.10-slim
|
||||
WORKDIR /run
|
||||
COPY --from=build /build/.build/release/dewPoint-controller /run
|
||||
CMD ["/bin/bash", "-xc", "./dewPoint-controller"]
|
||||
CMD ["/bin/bash", "-xc", "./dewpoint-controller"]
|
||||
|
||||
2
Makefile
2
Makefile
@@ -23,7 +23,7 @@ stop-mosquitto:
|
||||
@docker-compose rm -f mosquitto || true
|
||||
|
||||
test-docker:
|
||||
@docker-compose run --remove-orphans -i --rm test
|
||||
@docker-compose run --build --remove-orphans -i --rm test
|
||||
@docker-compose kill mosquitto-test
|
||||
@docker-compose rm -f
|
||||
|
||||
|
||||
@@ -8,17 +8,20 @@ let swiftSettings: [SwiftSetting] = [
|
||||
]
|
||||
|
||||
let package = Package(
|
||||
name: "dewPoint-controller",
|
||||
name: "dewpoint-controller",
|
||||
platforms: [
|
||||
.macOS(.v14)
|
||||
],
|
||||
products: [
|
||||
.executable(name: "dewPoint-controller", targets: ["dewPoint-controller"]),
|
||||
.executable(name: "dewpoint-controller", targets: ["dewpoint-controller"]),
|
||||
.library(name: "Models", targets: ["Models"]),
|
||||
.library(name: "MQTTConnectionManager", targets: ["MQTTConnectionManager"]),
|
||||
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
|
||||
.library(name: "SensorsService", targets: ["SensorsService"])
|
||||
.library(name: "SensorsService", targets: ["SensorsService"]),
|
||||
.library(name: "TopicDependencies", targets: ["TopicDependencies"])
|
||||
],
|
||||
dependencies: [
|
||||
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
|
||||
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
|
||||
.package(url: "https://github.com/apple/swift-log", from: "1.6.0"),
|
||||
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.4.1"),
|
||||
@@ -28,20 +31,18 @@ let package = Package(
|
||||
],
|
||||
targets: [
|
||||
.executableTarget(
|
||||
name: "dewPoint-controller",
|
||||
name: "dewpoint-controller",
|
||||
dependencies: [
|
||||
"Models",
|
||||
"MQTTConnectionManager",
|
||||
"MQTTConnectionService",
|
||||
"SensorsService",
|
||||
"TopicDependencies",
|
||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
||||
.product(name: "NIO", package: "swift-nio"),
|
||||
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
|
||||
]
|
||||
),
|
||||
.testTarget(
|
||||
name: "dewPoint-controllerTests",
|
||||
dependencies: ["dewPoint-controller"]
|
||||
),
|
||||
.target(
|
||||
name: "Models",
|
||||
dependencies: [
|
||||
@@ -50,11 +51,21 @@ let package = Package(
|
||||
],
|
||||
swiftSettings: swiftSettings
|
||||
),
|
||||
.target(
|
||||
name: "MQTTConnectionManager",
|
||||
dependencies: [
|
||||
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
|
||||
.product(name: "Dependencies", package: "swift-dependencies"),
|
||||
.product(name: "DependenciesMacros", package: "swift-dependencies"),
|
||||
.product(name: "MQTTNIO", package: "mqtt-nio")
|
||||
],
|
||||
swiftSettings: swiftSettings
|
||||
),
|
||||
.target(
|
||||
name: "MQTTConnectionService",
|
||||
dependencies: [
|
||||
"Models",
|
||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
||||
"MQTTConnectionManager",
|
||||
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
|
||||
],
|
||||
swiftSettings: swiftSettings
|
||||
@@ -63,6 +74,7 @@ let package = Package(
|
||||
name: "MQTTConnectionServiceTests",
|
||||
dependencies: [
|
||||
"MQTTConnectionService",
|
||||
"MQTTConnectionManager",
|
||||
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
|
||||
]
|
||||
),
|
||||
@@ -71,6 +83,7 @@ let package = Package(
|
||||
dependencies: [
|
||||
"Models",
|
||||
"MQTTConnectionService",
|
||||
"TopicDependencies",
|
||||
.product(name: "Dependencies", package: "swift-dependencies"),
|
||||
.product(name: "DependenciesMacros", package: "swift-dependencies"),
|
||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
||||
@@ -84,6 +97,15 @@ let package = Package(
|
||||
"SensorsService",
|
||||
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
|
||||
]
|
||||
),
|
||||
.target(
|
||||
name: "TopicDependencies",
|
||||
dependencies: [
|
||||
.product(name: "Dependencies", package: "swift-dependencies"),
|
||||
.product(name: "DependenciesMacros", package: "swift-dependencies"),
|
||||
.product(name: "MQTTNIO", package: "mqtt-nio")
|
||||
],
|
||||
swiftSettings: swiftSettings
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
199
Sources/MQTTConnectionManager/Live.swift
Normal file
199
Sources/MQTTConnectionManager/Live.swift
Normal file
@@ -0,0 +1,199 @@
|
||||
import AsyncAlgorithms
|
||||
import Dependencies
|
||||
import DependenciesMacros
|
||||
import Foundation
|
||||
import Logging
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
|
||||
public extension DependencyValues {
|
||||
|
||||
/// A dependency that is responsible for managing the connection to
|
||||
/// an MQTT broker.
|
||||
var mqttConnectionManager: MQTTConnectionManager {
|
||||
get { self[MQTTConnectionManager.self] }
|
||||
set { self[MQTTConnectionManager.self] = newValue }
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents the interface needed for the ``MQTTConnectionService``.
|
||||
///
|
||||
/// See ``MQTTConnectionManagerLive`` module for live implementation.
|
||||
@DependencyClient
|
||||
public struct MQTTConnectionManager: Sendable {
|
||||
|
||||
/// Connect to the MQTT broker.
|
||||
public var connect: @Sendable () async throws -> Void
|
||||
|
||||
/// Shutdown the connection to the MQTT broker.
|
||||
///
|
||||
/// - Note: You should cancel any tasks that are listening to the connection stream first.
|
||||
public var shutdown: @Sendable () -> Void
|
||||
|
||||
/// Create a stream of connection events.
|
||||
public var stream: @Sendable () throws -> AsyncStream<Event>
|
||||
|
||||
/// Represents connection events that clients can listen for and
|
||||
/// react accordingly.
|
||||
public enum Event: Sendable {
|
||||
case connected
|
||||
case disconnected
|
||||
case shuttingDown
|
||||
}
|
||||
|
||||
public static func live(
|
||||
client: MQTTClient,
|
||||
cleanSession: Bool = false,
|
||||
logger: Logger? = nil,
|
||||
alwaysReconnect: Bool = true
|
||||
) -> Self {
|
||||
let manager = ConnectionManager(
|
||||
client: client,
|
||||
logger: logger,
|
||||
alwaysReconnect: alwaysReconnect
|
||||
)
|
||||
return .init {
|
||||
try await manager.connect(cleanSession: cleanSession)
|
||||
} shutdown: {
|
||||
manager.shutdown()
|
||||
} stream: {
|
||||
MQTTConnectionStream(client: client, logger: logger)
|
||||
.start()
|
||||
.removeDuplicates()
|
||||
.eraseToStream()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension MQTTConnectionManager: TestDependencyKey {
|
||||
public static var testValue: MQTTConnectionManager {
|
||||
Self()
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Helpers
|
||||
|
||||
final class MQTTConnectionStream: AsyncSequence, Sendable {
|
||||
|
||||
typealias AsyncIterator = AsyncStream<Element>.AsyncIterator
|
||||
typealias Element = MQTTConnectionManager.Event
|
||||
|
||||
private let client: MQTTClient
|
||||
private let continuation: AsyncStream<Element>.Continuation
|
||||
private let logger: Logger?
|
||||
private let name: String
|
||||
private let stream: AsyncStream<Element>
|
||||
|
||||
init(client: MQTTClient, logger: Logger?) {
|
||||
let (stream, continuation) = AsyncStream<Element>.makeStream()
|
||||
self.client = client
|
||||
self.continuation = continuation
|
||||
self.logger = logger
|
||||
self.name = UUID().uuidString
|
||||
self.stream = stream
|
||||
}
|
||||
|
||||
deinit { stop() }
|
||||
|
||||
func start(
|
||||
isolation: isolated (any Actor)? = #isolation
|
||||
) -> AsyncStream<Element> {
|
||||
// Check if the client is active and yield the result.
|
||||
continuation.yield(client.isActive() ? .connected : .disconnected)
|
||||
|
||||
// Register listener on the client for when the connection
|
||||
// closes.
|
||||
client.addCloseListener(named: name) { _ in
|
||||
self.logger?.trace("Client has disconnected.")
|
||||
self.continuation.yield(.disconnected)
|
||||
}
|
||||
|
||||
// Register listener on the client for when the client
|
||||
// is shutdown.
|
||||
client.addShutdownListener(named: name) { _ in
|
||||
self.logger?.trace("Client is shutting down.")
|
||||
self.continuation.yield(.shuttingDown)
|
||||
self.stop()
|
||||
}
|
||||
|
||||
return stream
|
||||
}
|
||||
|
||||
func stop() {
|
||||
client.removeCloseListener(named: name)
|
||||
client.removeShutdownListener(named: name)
|
||||
continuation.finish()
|
||||
}
|
||||
|
||||
public __consuming func makeAsyncIterator() -> AsyncIterator {
|
||||
start().makeAsyncIterator()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
actor ConnectionManager {
|
||||
private let client: MQTTClient
|
||||
private let logger: Logger?
|
||||
private let name: String
|
||||
private let shouldReconnect: Bool
|
||||
private var hasConnected: Bool = false
|
||||
|
||||
init(
|
||||
client: MQTTClient,
|
||||
logger: Logger?,
|
||||
alwaysReconnect: Bool
|
||||
) {
|
||||
self.client = client
|
||||
self.logger = logger
|
||||
self.name = UUID().uuidString
|
||||
self.shouldReconnect = alwaysReconnect
|
||||
}
|
||||
|
||||
deinit {
|
||||
// We should've already logged that we're shutting down if
|
||||
// the manager was shutdown properly, so don't log it twice.
|
||||
self.shutdown(withLogging: false)
|
||||
}
|
||||
|
||||
private func setHasConnected() {
|
||||
hasConnected = true
|
||||
}
|
||||
|
||||
func connect(
|
||||
isolation: isolated (any Actor)? = #isolation,
|
||||
cleanSession: Bool
|
||||
) async throws {
|
||||
guard !(await hasConnected) else { return }
|
||||
do {
|
||||
try await client.connect(cleanSession: cleanSession)
|
||||
await setHasConnected()
|
||||
|
||||
client.addCloseListener(named: name) { [weak self] _ in
|
||||
guard let `self` else { return }
|
||||
self.logger?.debug("Connection closed.")
|
||||
if self.shouldReconnect {
|
||||
self.logger?.debug("Reconnecting...")
|
||||
Task {
|
||||
try await self.connect(cleanSession: cleanSession)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
client.addShutdownListener(named: name) { _ in
|
||||
self.shutdown()
|
||||
}
|
||||
|
||||
} catch {
|
||||
logger?.trace("Failed to connect: \(error)")
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
nonisolated func shutdown(withLogging: Bool = true) {
|
||||
if withLogging {
|
||||
logger?.trace("Shutting down connection.")
|
||||
}
|
||||
client.removeCloseListener(named: name)
|
||||
client.removeShutdownListener(named: name)
|
||||
}
|
||||
}
|
||||
@@ -1,151 +1,37 @@
|
||||
@preconcurrency import Foundation
|
||||
import Dependencies
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import MQTTConnectionManager
|
||||
import ServiceLifecycle
|
||||
|
||||
// TODO: This may not need to be an actor.
|
||||
|
||||
/// Manages the MQTT broker connection.
|
||||
public actor MQTTConnectionService: Service {
|
||||
@Dependency(\.mqttConnectionManager) var manager
|
||||
|
||||
private let cleanSession: Bool
|
||||
public let client: MQTTClient
|
||||
private let continuation: AsyncStream<Event>.Continuation
|
||||
public nonisolated let events: AsyncStream<Event>
|
||||
private let internalEventStream: ConnectionStream
|
||||
nonisolated var logger: Logger { client.logger }
|
||||
// private var shuttingDown = false
|
||||
private nonisolated let logger: Logger?
|
||||
|
||||
public init(
|
||||
cleanSession: Bool = true,
|
||||
client: MQTTClient
|
||||
logger: Logger? = nil
|
||||
) {
|
||||
self.cleanSession = cleanSession
|
||||
self.client = client
|
||||
self.internalEventStream = .init()
|
||||
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
|
||||
self.events = stream
|
||||
self.continuation = continuation
|
||||
self.logger = logger
|
||||
}
|
||||
|
||||
deinit {
|
||||
self.logger.debug("MQTTConnectionService is gone.")
|
||||
self.internalEventStream.stop()
|
||||
continuation.finish()
|
||||
}
|
||||
|
||||
/// The entry-point of the service.
|
||||
///
|
||||
/// This method connects to the MQTT broker and manages the connection.
|
||||
/// It will attempt to gracefully shutdown the connection upon receiving
|
||||
/// `sigterm` signals.
|
||||
/// The entry-point of the service which starts the connection
|
||||
/// to the MQTT broker and handles graceful shutdown of the
|
||||
/// connection.
|
||||
public func run() async throws {
|
||||
await withGracefulShutdownHandler {
|
||||
await withDiscardingTaskGroup { group in
|
||||
group.addTask { await self.connect() }
|
||||
group.addTask {
|
||||
await self.internalEventStream.start { self.client.isActive() }
|
||||
}
|
||||
for await event in self.internalEventStream.events.cancelOnGracefulShutdown() {
|
||||
if event == .shuttingDown {
|
||||
self.shutdown()
|
||||
break
|
||||
}
|
||||
self.logger.trace("Sending connection event: \(event)")
|
||||
self.continuation.yield(event)
|
||||
}
|
||||
group.cancelAll()
|
||||
try await withGracefulShutdownHandler {
|
||||
try await manager.connect()
|
||||
for await event in try manager.stream().cancelOnGracefulShutdown() {
|
||||
// We don't really need to do anything with the events, so just logging
|
||||
// for now. But we need to iterate on an async stream for the service to
|
||||
// continue to run and handle graceful shutdowns.
|
||||
logger?.trace("Received connection event: \(event)")
|
||||
}
|
||||
// when we reach here we are shutting down, so we shutdown
|
||||
// the manager.
|
||||
manager.shutdown()
|
||||
} onGracefulShutdown: {
|
||||
self.logger.trace("Received graceful shutdown.")
|
||||
self.shutdown()
|
||||
self.logger?.trace("Received graceful shutdown.")
|
||||
}
|
||||
}
|
||||
|
||||
func connect() async {
|
||||
do {
|
||||
try await withThrowingDiscardingTaskGroup { group in
|
||||
group.addTask {
|
||||
try await self.client.connect(cleanSession: self.cleanSession)
|
||||
}
|
||||
client.addCloseListener(named: "\(Self.self)") { _ in
|
||||
Task {
|
||||
self.logger.debug("Connection closed.")
|
||||
self.logger.debug("Reconnecting...")
|
||||
await self.connect()
|
||||
}
|
||||
}
|
||||
self.logger.debug("Connection successful.")
|
||||
self.continuation.yield(.connected)
|
||||
}
|
||||
} catch {
|
||||
logger.trace("Failed to connect: \(error)")
|
||||
continuation.yield(.disconnected)
|
||||
}
|
||||
}
|
||||
|
||||
private nonisolated func shutdown() {
|
||||
logger.debug("Begin shutting down MQTT broker connection.")
|
||||
client.removeCloseListener(named: "\(Self.self)")
|
||||
internalEventStream.stop()
|
||||
_ = client.disconnect()
|
||||
try? client.syncShutdownGracefully()
|
||||
continuation.finish()
|
||||
logger.info("MQTT broker connection closed.")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
extension MQTTConnectionService {
|
||||
|
||||
public enum Event: Sendable {
|
||||
case connected
|
||||
case disconnected
|
||||
case shuttingDown
|
||||
}
|
||||
|
||||
// TODO: This functionality can probably move into the connection service.
|
||||
|
||||
private final class ConnectionStream: Sendable {
|
||||
|
||||
// private var cancellable: AnyCancellable?
|
||||
private let continuation: AsyncStream<MQTTConnectionService.Event>.Continuation
|
||||
let events: AsyncStream<MQTTConnectionService.Event>
|
||||
|
||||
init() {
|
||||
let (stream, continuation) = AsyncStream.makeStream(of: MQTTConnectionService.Event.self)
|
||||
self.events = stream
|
||||
self.continuation = continuation
|
||||
}
|
||||
|
||||
deinit {
|
||||
stop()
|
||||
}
|
||||
|
||||
func start(isActive connectionIsActive: @escaping () -> Bool) async {
|
||||
try? await Task.sleep(for: .seconds(1))
|
||||
let event: MQTTConnectionService.Event = connectionIsActive()
|
||||
? .connected
|
||||
: .disconnected
|
||||
|
||||
continuation.yield(event)
|
||||
// cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
|
||||
// .autoconnect()
|
||||
// .sink { [weak self] (_: Date) in
|
||||
// let event: MQTTConnectionService.Event = connectionIsActive()
|
||||
// ? .connected
|
||||
// : .disconnected
|
||||
//
|
||||
// self?.continuation.yield(event)
|
||||
// }
|
||||
}
|
||||
|
||||
func stop() {
|
||||
continuation.yield(.shuttingDown)
|
||||
continuation.finish()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -56,10 +56,11 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
|
||||
public var dewPoint: DewPoint? {
|
||||
get async {
|
||||
guard let temperature = temperature,
|
||||
let humidity = humidity
|
||||
let humidity = humidity,
|
||||
!temperature.value.isNaN,
|
||||
!humidity.value.isNaN
|
||||
else { return nil }
|
||||
return try? await psychrometrics.dewPoint(.dryBulb(temperature, relativeHumidity: humidity))
|
||||
// return .init(dryBulb: temperature, humidity: humidity)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,12 +68,13 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
|
||||
public var enthalpy: EnthalpyOf<MoistAir>? {
|
||||
get async {
|
||||
guard let temperature = temperature,
|
||||
let humidity = humidity
|
||||
let humidity = humidity,
|
||||
!temperature.value.isNaN,
|
||||
!humidity.value.isNaN
|
||||
else { return nil }
|
||||
return try? await psychrometrics.enthalpy.moistAir(
|
||||
.dryBulb(temperature, relativeHumidity: humidity, altitude: altitude)
|
||||
)
|
||||
// return .init(dryBulb: temperature, humidity: humidity, altitude: altitude)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -129,10 +131,10 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
|
||||
prefix = "\(prefix.dropLast())"
|
||||
}
|
||||
self.init(
|
||||
dewPoint: "\(prefix)/sensors/\(location.rawValue)_dew_point/state",
|
||||
enthalpy: "\(prefix)/sensors/\(location.rawValue)_enthalpy/state",
|
||||
humidity: "\(prefix)/sensors/\(location.rawValue)_humidity/state",
|
||||
temperature: "\(prefix)/sensors/\(location.rawValue)_temperature/state"
|
||||
dewPoint: "\(prefix)/sensor/\(location.rawValue)_dew_point/state",
|
||||
enthalpy: "\(prefix)/sensor/\(location.rawValue)_enthalpy/state",
|
||||
humidity: "\(prefix)/sensor/\(location.rawValue)_humidity/state",
|
||||
temperature: "\(prefix)/sensor/\(location.rawValue)_temperature/state"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
/// This allows values to only publish changes if they have changed since the
|
||||
/// last time they were recieved.
|
||||
@propertyWrapper
|
||||
public struct TrackedChanges<Value> {
|
||||
public struct TrackedChanges<Value: Sendable>: Sendable {
|
||||
|
||||
/// The current tracking state.
|
||||
private var tracking: TrackingState
|
||||
@@ -12,7 +12,7 @@ public struct TrackedChanges<Value> {
|
||||
private var value: Value
|
||||
|
||||
/// Used to check if a new value is equal to an old value.
|
||||
private var isEqual: (Value, Value) -> Bool
|
||||
private var isEqual: @Sendable (Value, Value) -> Bool
|
||||
|
||||
/// Access to the underlying property that we are wrapping.
|
||||
public var wrappedValue: Value {
|
||||
@@ -35,7 +35,7 @@ public struct TrackedChanges<Value> {
|
||||
public init(
|
||||
wrappedValue: Value,
|
||||
needsProcessed: Bool = false,
|
||||
isEqual: @escaping (Value, Value) -> Bool
|
||||
isEqual: @escaping @Sendable (Value, Value) -> Bool
|
||||
) {
|
||||
self.value = wrappedValue
|
||||
self.tracking = needsProcessed ? .needsProcessed : .hasProcessed
|
||||
@@ -85,7 +85,9 @@ extension TrackedChanges: Equatable where Value: Equatable {
|
||||
wrappedValue: Value,
|
||||
needsProcessed: Bool = false
|
||||
) {
|
||||
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed, isEqual: ==)
|
||||
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed) {
|
||||
$0 == $1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,5 +98,3 @@ extension TrackedChanges: Hashable where Value: Hashable {
|
||||
hasher.combine(needsProcessed)
|
||||
}
|
||||
}
|
||||
|
||||
extension TrackedChanges: Sendable where Value: Sendable {}
|
||||
|
||||
@@ -1,6 +1,3 @@
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import NIOFoundationCompat
|
||||
import PsychrometricClient
|
||||
|
||||
@@ -3,305 +3,160 @@ import DependenciesMacros
|
||||
import Foundation
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTConnectionService
|
||||
@preconcurrency import MQTTNIO
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import PsychrometricClient
|
||||
import ServiceLifecycle
|
||||
import TopicDependencies
|
||||
|
||||
@DependencyClient
|
||||
public struct SensorsClient: Sendable {
|
||||
/// Service that is responsible for listening to changes of the temperature and humidity
|
||||
/// sensors, then publishing back the calculated dew-point temperature and enthalpy for
|
||||
/// the sensor location.
|
||||
///
|
||||
///
|
||||
public actor SensorsService: Service {
|
||||
|
||||
public var listen: @Sendable ([String]) async throws -> AsyncStream<MQTTPublishInfo>
|
||||
public var logger: Logger?
|
||||
public var publish: @Sendable (Double, String) async throws -> Void
|
||||
public var shutdown: @Sendable () -> Void = {}
|
||||
@Dependency(\.topicListener) var topicListener
|
||||
@Dependency(\.topicPublisher) var topicPublisher
|
||||
|
||||
public func listen(to topics: [String]) async throws -> AsyncStream<MQTTPublishInfo> {
|
||||
try await listen(topics)
|
||||
}
|
||||
/// The logger to use for the service.
|
||||
private let logger: Logger?
|
||||
|
||||
public func publish(_ value: Double, to topic: String) async throws {
|
||||
try await publish(value, topic)
|
||||
}
|
||||
}
|
||||
/// The sensors that we are listening for updates to, so
|
||||
/// that we can calculate the dew-point temperature and enthalpy
|
||||
/// values to publish back to the MQTT broker.
|
||||
var sensors: [TemperatureAndHumiditySensor]
|
||||
|
||||
extension SensorsClient: TestDependencyKey {
|
||||
public static var testValue: SensorsClient {
|
||||
Self()
|
||||
}
|
||||
}
|
||||
|
||||
public extension DependencyValues {
|
||||
var sensorsClient: SensorsClient {
|
||||
get { self[SensorsClient.self] }
|
||||
set { self[SensorsClient.self] = newValue }
|
||||
}
|
||||
}
|
||||
|
||||
public actor SensorsService2: Service {
|
||||
|
||||
@Dependency(\.sensorsClient) var client
|
||||
|
||||
private var sensors: [TemperatureAndHumiditySensor]
|
||||
|
||||
public init(
|
||||
sensors: [TemperatureAndHumiditySensor]
|
||||
) {
|
||||
self.sensors = sensors
|
||||
}
|
||||
|
||||
public func run() async throws {
|
||||
guard sensors.count > 0 else {
|
||||
throw SensorCountError()
|
||||
}
|
||||
|
||||
let stream = try await client.listen(to: topics)
|
||||
|
||||
do {
|
||||
try await withGracefulShutdownHandler {
|
||||
try await withThrowingDiscardingTaskGroup { group in
|
||||
for await result in stream.cancelOnGracefulShutdown() {
|
||||
group.addTask { try await self.handleResult(result) }
|
||||
}
|
||||
}
|
||||
} onGracefulShutdown: {
|
||||
Task {
|
||||
await self.client.logger?.trace("Received graceful shutdown.")
|
||||
try? await self.publishUpdates()
|
||||
await self.client.shutdown()
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
client.logger?.trace("Error: \(error)")
|
||||
client.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
private var topics: [String] {
|
||||
var topics: [String] {
|
||||
sensors.reduce(into: [String]()) { array, sensor in
|
||||
array.append(sensor.topics.temperature)
|
||||
array.append(sensor.topics.humidity)
|
||||
}
|
||||
}
|
||||
|
||||
private func handleResult(_ result: MQTTPublishInfo) async throws {
|
||||
let topic = result.topicName
|
||||
client.logger?.trace("Begin handling result for topic: \(topic)")
|
||||
/// Create a new sensors service that listens to the passed in
|
||||
/// sensors.
|
||||
///
|
||||
/// - Note: The service will fail to start if the array of sensors is not greater than 0.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - sensors: The sensors to listen for changes to.
|
||||
/// - logger: An optional logger to use.
|
||||
public init(
|
||||
sensors: [TemperatureAndHumiditySensor],
|
||||
logger: Logger? = nil
|
||||
) {
|
||||
self.sensors = sensors
|
||||
self.logger = logger
|
||||
}
|
||||
|
||||
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
|
||||
var buffer = result.payload
|
||||
return V(buffer: &buffer)
|
||||
}
|
||||
/// Start the service with graceful shutdown, which will attempt to publish
|
||||
/// any pending changes to the MQTT broker, upon a shutdown signal.
|
||||
public func run() async throws {
|
||||
precondition(sensors.count > 0, "Sensors should not be empty.")
|
||||
|
||||
if topic.contains("temperature") {
|
||||
client.logger?.trace("Begin handling temperature result.")
|
||||
guard let temperature = decode(DryBulb.self) else {
|
||||
client.logger?.trace("Failed to decode temperature: \(result.payload)")
|
||||
throw DecodingError()
|
||||
let stream = try await makeStream()
|
||||
|
||||
await withGracefulShutdownHandler {
|
||||
await withDiscardingTaskGroup { group in
|
||||
for await result in stream.cancelOnGracefulShutdown() {
|
||||
logger?.trace("Received result for topic: \(result.topic)")
|
||||
group.addTask { await self.handleResult(result) }
|
||||
}
|
||||
// group.cancelAll()
|
||||
}
|
||||
client.logger?.trace("Decoded temperature: \(temperature)")
|
||||
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
|
||||
|
||||
} else if topic.contains("humidity") {
|
||||
client.logger?.trace("Begin handling humidity result.")
|
||||
guard let humidity = decode(RelativeHumidity.self) else {
|
||||
client.logger?.trace("Failed to decode humidity: \(result.payload)")
|
||||
throw DecodingError()
|
||||
} onGracefulShutdown: {
|
||||
Task {
|
||||
self.logger?.trace("Received graceful shutdown.")
|
||||
try? await self.publishUpdates()
|
||||
await self.topicListener.shutdown()
|
||||
}
|
||||
client.logger?.trace("Decoded humidity: \(humidity)")
|
||||
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
|
||||
} else {
|
||||
client.logger?.error("Received unexpected topic, expected topic to contain 'temperature' or 'humidity'!")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
try await publishUpdates()
|
||||
client.logger?.trace("Done handling result for topic: \(topic)")
|
||||
private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> {
|
||||
try await topicListener.listen(to: topics)
|
||||
// ignore errors, so that we continue to listen, but log them
|
||||
// for debugging purposes.
|
||||
.compactMap { result in
|
||||
switch result {
|
||||
case let .failure(error):
|
||||
self.logger?.trace("Received error listening for sensors: \(error)")
|
||||
return nil
|
||||
case let .success(info):
|
||||
return (info.payload, info.topicName)
|
||||
}
|
||||
}
|
||||
// ignore duplicate values, to prevent publishing dew-point and enthalpy
|
||||
// changes to frequently.
|
||||
.removeDuplicates { lhs, rhs in
|
||||
lhs.buffer == rhs.buffer
|
||||
&& lhs.topic == rhs.topic
|
||||
}
|
||||
.eraseToStream()
|
||||
}
|
||||
|
||||
private func handleResult(_ result: (buffer: ByteBuffer, topic: String)) async {
|
||||
do {
|
||||
let topic = result.topic
|
||||
assert(topics.contains(topic))
|
||||
logger?.trace("Begin handling result for topic: \(topic)")
|
||||
|
||||
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
|
||||
var buffer = result.buffer
|
||||
return V(buffer: &buffer)
|
||||
}
|
||||
|
||||
if topic.contains("temperature") {
|
||||
logger?.trace("Begin handling temperature result.")
|
||||
guard let temperature = decode(DryBulb.self) else {
|
||||
logger?.trace("Failed to decode temperature: \(result.buffer)")
|
||||
throw DecodingError()
|
||||
}
|
||||
logger?.trace("Decoded temperature: \(temperature)")
|
||||
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
|
||||
|
||||
} else if topic.contains("humidity") {
|
||||
logger?.trace("Begin handling humidity result.")
|
||||
guard let humidity = decode(RelativeHumidity.self) else {
|
||||
logger?.trace("Failed to decode humidity: \(result.buffer)")
|
||||
throw DecodingError()
|
||||
}
|
||||
logger?.trace("Decoded humidity: \(humidity)")
|
||||
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
|
||||
}
|
||||
|
||||
try await publishUpdates()
|
||||
logger?.trace("Done handling result for topic: \(topic)")
|
||||
} catch {
|
||||
logger?.error("Received error: \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
private func publish(_ double: Double?, to topic: String) async throws {
|
||||
guard let double else { return }
|
||||
try await client.publish(double, to: topic)
|
||||
client.logger?.trace("Published update to topic: \(topic)")
|
||||
try await topicPublisher.publish(
|
||||
to: topic,
|
||||
payload: ByteBufferAllocator().buffer(string: "\(double)"),
|
||||
qos: .exactlyOnce,
|
||||
retain: true
|
||||
)
|
||||
logger?.trace("Published update to topic: \(topic)")
|
||||
}
|
||||
|
||||
private func publishUpdates() async throws {
|
||||
for sensor in sensors.filter(\.needsProcessed) {
|
||||
try await publish(sensor.dewPoint?.value, to: sensor.topics.dewPoint)
|
||||
try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public actor SensorsService: Service {
|
||||
private var sensors: [TemperatureAndHumiditySensor]
|
||||
private let client: MQTTClient
|
||||
private let events: @Sendable () -> AsyncStream<MQTTConnectionService.Event>
|
||||
nonisolated var logger: Logger { client.logger }
|
||||
private var shuttingDown: Bool = false
|
||||
|
||||
public init(
|
||||
client: MQTTClient,
|
||||
events: @Sendable @escaping () -> AsyncStream<MQTTConnectionService.Event>,
|
||||
sensors: [TemperatureAndHumiditySensor]
|
||||
) {
|
||||
self.client = client
|
||||
self.events = events
|
||||
self.sensors = sensors
|
||||
}
|
||||
|
||||
/// The entry-point of the service.
|
||||
///
|
||||
/// This method is called to start the service and begin
|
||||
/// listening for sensor value changes then publishing the dew-point
|
||||
/// and enthalpy values of the sensors.
|
||||
public func run() async throws {
|
||||
do {
|
||||
try await withGracefulShutdownHandler {
|
||||
try await withThrowingDiscardingTaskGroup { group in
|
||||
client.addPublishListener(named: "\(Self.self)") { result in
|
||||
if self.shuttingDown {
|
||||
self.logger.trace("Shutting down.")
|
||||
} else if !self.client.isActive() {
|
||||
self.logger.trace("Client is not currently active")
|
||||
} else {
|
||||
Task { try await self.handleResult(result) }
|
||||
}
|
||||
}
|
||||
for await event in self.events().cancelOnGracefulShutdown() {
|
||||
logger.trace("Received event: \(event)")
|
||||
if event == .shuttingDown {
|
||||
self.setIsShuttingDown()
|
||||
} else if event == .connected {
|
||||
group.addTask { try await self.subscribeToSensors() }
|
||||
} else {
|
||||
group.addTask { await self.unsubscribeToSensors() }
|
||||
group.addTask { try? await Task.sleep(for: .milliseconds(100)) }
|
||||
}
|
||||
}
|
||||
}
|
||||
} onGracefulShutdown: {
|
||||
// do something.
|
||||
self.logger.debug("Received graceful shutdown.")
|
||||
Task { [weak self] in await self?.setIsShuttingDown() }
|
||||
}
|
||||
} catch {
|
||||
// WARN: We always get an MQTTNIO `noConnection` error here, which generally is not an issue,
|
||||
// but causes service ServiceLifecycle to fail, so currently just ignoring errors that are thrown.
|
||||
// However we do receive the unsubscribe message back from the MQTT broker, so it is likely safe
|
||||
// to ignore the `noConnection` error.
|
||||
logger.trace("Run error: \(error)")
|
||||
// throw error
|
||||
}
|
||||
}
|
||||
|
||||
private func setIsShuttingDown() {
|
||||
logger.debug("Received shut down event.")
|
||||
Task { try await publishUpdates() }
|
||||
Task { await self.unsubscribeToSensors() }
|
||||
shuttingDown = true
|
||||
client.removePublishListener(named: "\(Self.self)")
|
||||
}
|
||||
|
||||
private func handleResult(
|
||||
_ result: Result<MQTTPublishInfo, any Error>
|
||||
) async throws {
|
||||
logger.trace("Begin handling result")
|
||||
do {
|
||||
switch result {
|
||||
case let .failure(error):
|
||||
logger.debug("Failed receiving sensor: \(error)")
|
||||
throw error
|
||||
case let .success(value):
|
||||
// do something.
|
||||
let topic = value.topicName
|
||||
logger.trace("Received new value for topic: \(topic)")
|
||||
if topic.contains("temperature") {
|
||||
// do something.
|
||||
var buffer = value.payload
|
||||
guard let temperature = DryBulb(buffer: &buffer) else {
|
||||
logger.trace("Decoding error for topic: \(topic)")
|
||||
throw DecodingError()
|
||||
}
|
||||
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
|
||||
try await publishUpdates()
|
||||
|
||||
} else if topic.contains("humidity") {
|
||||
var buffer = value.payload
|
||||
// Decode and update the temperature value
|
||||
guard let humidity = RelativeHumidity(buffer: &buffer) else {
|
||||
logger.debug("Failed to decode humidity from buffer: \(buffer)")
|
||||
throw DecodingError()
|
||||
}
|
||||
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
|
||||
try await publishUpdates()
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
logger.trace("Handle Result error: \(error)")
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
private func subscribeToSensors() async throws {
|
||||
for sensor in sensors {
|
||||
_ = try await client.subscribe(to: [
|
||||
MQTTSubscribeInfo(topicFilter: sensor.topics.temperature, qos: .atLeastOnce),
|
||||
MQTTSubscribeInfo(topicFilter: sensor.topics.humidity, qos: .atLeastOnce)
|
||||
])
|
||||
logger.debug("Subscribed to sensor: \(sensor.location)")
|
||||
}
|
||||
}
|
||||
|
||||
private func unsubscribeToSensors() async {
|
||||
logger.trace("Begin unsubscribe to sensors.")
|
||||
guard client.isActive() else {
|
||||
logger.debug("Client is not active, skipping.")
|
||||
return
|
||||
}
|
||||
do {
|
||||
let topics = sensors.reduce(into: [String]()) { array, sensor in
|
||||
array.append(sensor.topics.temperature)
|
||||
array.append(sensor.topics.humidity)
|
||||
}
|
||||
try await client.unsubscribe(from: topics)
|
||||
logger.trace("Unsubscribed from sensors.")
|
||||
} catch {
|
||||
logger.trace("Unsubscribe error: \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
private func publish(double: Double?, to topic: String) async throws {
|
||||
guard client.isActive() else { return }
|
||||
guard let double else { return }
|
||||
let rounded = round(double * 100) / 100
|
||||
logger.debug("Publishing \(rounded), to: \(topic)")
|
||||
try await client.publish(
|
||||
to: topic,
|
||||
payload: ByteBufferAllocator().buffer(string: "\(rounded)"),
|
||||
qos: .exactlyOnce,
|
||||
retain: true
|
||||
)
|
||||
}
|
||||
|
||||
private func publishUpdates() async throws {
|
||||
for sensor in sensors.filter(\.needsProcessed) {
|
||||
try await publish(double: sensor.dewPoint?.value, to: sensor.topics.dewPoint)
|
||||
try await publish(double: sensor.enthalpy?.value, to: sensor.topics.enthalpy)
|
||||
try sensors.hasProcessed(sensor)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// MARK: - Errors
|
||||
|
||||
struct DecodingError: Error {}
|
||||
struct MQTTClientNotConnected: Error {}
|
||||
struct NotFoundError: Error {}
|
||||
struct SensorExists: Error {}
|
||||
struct SensorCountError: Error {}
|
||||
struct SensorNotFoundError: Error {}
|
||||
|
||||
// MARK: - Helpers
|
||||
|
||||
@@ -319,14 +174,14 @@ private extension Array where Element == TemperatureAndHumiditySensor {
|
||||
with value: V
|
||||
) throws {
|
||||
guard let index = firstIndex(where: { $0.topics.contains(topic) }) else {
|
||||
throw NotFoundError()
|
||||
throw SensorNotFoundError()
|
||||
}
|
||||
self[index][keyPath: keyPath] = value
|
||||
}
|
||||
|
||||
mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws {
|
||||
guard let index = firstIndex(where: { $0.id == sensor.id }) else {
|
||||
throw NotFoundError()
|
||||
throw SensorNotFoundError()
|
||||
}
|
||||
self[index].needsProcessed = false
|
||||
}
|
||||
|
||||
183
Sources/TopicDependencies/TopicListener.swift
Normal file
183
Sources/TopicDependencies/TopicListener.swift
Normal file
@@ -0,0 +1,183 @@
|
||||
import Dependencies
|
||||
import DependenciesMacros
|
||||
import Foundation
|
||||
import MQTTNIO
|
||||
|
||||
/// A dependency that can generate an async stream of changes to the given topics.
|
||||
///
|
||||
/// - Note: This type only conforms to ``TestDependencyKey`` because it requires an MQTTClient
|
||||
/// to generate the live dependency.
|
||||
@DependencyClient
|
||||
public struct TopicListener: Sendable {
|
||||
|
||||
public typealias Stream = AsyncStream<Result<MQTTPublishInfo, MQTTListenResultError>>
|
||||
|
||||
/// Create an async stream that listens for changes to the given topics.
|
||||
private var _listen: @Sendable ([String], MQTTQoS) async throws -> Stream
|
||||
|
||||
/// Shutdown the listener stream.
|
||||
public var shutdown: @Sendable () -> Void
|
||||
|
||||
/// Create a new topic listener.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - listen: Generate an async stream of changes for the given topics.
|
||||
/// - shutdown: Shutdown the topic listener stream.
|
||||
public init(
|
||||
listen: @Sendable @escaping ([String], MQTTQoS) async throws -> Stream,
|
||||
shutdown: @Sendable @escaping () -> Void
|
||||
) {
|
||||
self._listen = listen
|
||||
self.shutdown = shutdown
|
||||
}
|
||||
|
||||
/// Create an async stream that listens for changes to the given topics.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - topics: The topics to listen for changes to.
|
||||
/// - qos: The MQTTQoS for the subscription.
|
||||
public func listen(
|
||||
to topics: [String],
|
||||
qos: MQTTQoS = .atLeastOnce
|
||||
) async throws -> Stream {
|
||||
try await _listen(topics, qos)
|
||||
}
|
||||
|
||||
/// Create an async stream that listens for changes to the given topics.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - topics: The topics to listen for changes to.
|
||||
/// - qos: The MQTTQoS for the subscription.
|
||||
public func listen(
|
||||
_ topics: String...,
|
||||
qos: MQTTQoS = .atLeastOnce
|
||||
) async throws -> Stream {
|
||||
try await listen(to: topics, qos: qos)
|
||||
}
|
||||
|
||||
/// Create the live implementation of the topic listener with the given MQTTClient.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - client: The MQTTClient to use.
|
||||
public static func live(client: MQTTClient) -> Self {
|
||||
let listener = MQTTTopicListener(client: client)
|
||||
return .init(
|
||||
listen: { try await listener.listen($0, $1) },
|
||||
shutdown: { listener.shutdown() }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
extension TopicListener: TestDependencyKey {
|
||||
public static var testValue: TopicListener { Self() }
|
||||
}
|
||||
|
||||
public extension DependencyValues {
|
||||
var topicListener: TopicListener {
|
||||
get { self[TopicListener.self] }
|
||||
set { self[TopicListener.self] = newValue }
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Helpers
|
||||
|
||||
private actor MQTTTopicListener {
|
||||
private let client: MQTTClient
|
||||
private let continuation: TopicListener.Stream.Continuation
|
||||
private let name: String
|
||||
let stream: TopicListener.Stream
|
||||
private var shuttingDown: Bool = false
|
||||
|
||||
init(
|
||||
client: MQTTClient
|
||||
) {
|
||||
let (stream, continuation) = TopicListener.Stream.makeStream()
|
||||
self.client = client
|
||||
self.continuation = continuation
|
||||
self.name = UUID().uuidString
|
||||
self.stream = stream
|
||||
}
|
||||
|
||||
deinit {
|
||||
if !shuttingDown {
|
||||
let message = """
|
||||
Shutdown was not called on topic listener. This could lead to potential errors or
|
||||
the stream never ending.
|
||||
|
||||
Please ensure that you call shutdown on the listener.
|
||||
"""
|
||||
client.logger.warning("\(message)")
|
||||
continuation.finish()
|
||||
}
|
||||
client.removePublishListener(named: name)
|
||||
client.removeShutdownListener(named: name)
|
||||
}
|
||||
|
||||
func listen(
|
||||
_ topics: [String],
|
||||
_ qos: MQTTQoS = .atLeastOnce
|
||||
) async throws(TopicListenerError) -> TopicListener.Stream {
|
||||
var sleepTimes = 0
|
||||
|
||||
while !client.isActive() {
|
||||
guard sleepTimes < 10 else {
|
||||
throw .connectionTimeout
|
||||
}
|
||||
try? await Task.sleep(for: .milliseconds(100))
|
||||
sleepTimes += 1
|
||||
}
|
||||
|
||||
client.logger.trace("Client is active, begin subscribing to topics.")
|
||||
|
||||
let subscription = try? await client.subscribe(to: topics.map {
|
||||
MQTTSubscribeInfo(topicFilter: $0, qos: qos)
|
||||
})
|
||||
|
||||
guard subscription != nil else {
|
||||
client.logger.error("Error subscribing to topics: \(topics)")
|
||||
throw .failedToSubscribe
|
||||
}
|
||||
|
||||
client.logger.trace("Done subscribing, begin listening to topics.")
|
||||
|
||||
client.addPublishListener(named: name) { result in
|
||||
switch result {
|
||||
case let .failure(error):
|
||||
self.client.logger.error("Received error while listening: \(error)")
|
||||
self.continuation.yield(.failure(.init(error)))
|
||||
case let .success(publishInfo):
|
||||
if topics.contains(publishInfo.topicName) {
|
||||
self.client.logger.trace("Recieved new value for topic: \(publishInfo.topicName)")
|
||||
self.continuation.yield(.success(publishInfo))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return stream
|
||||
}
|
||||
|
||||
private func setIsShuttingDown() {
|
||||
shuttingDown = true
|
||||
}
|
||||
|
||||
nonisolated func shutdown() {
|
||||
client.logger.trace("Closing topic listener...")
|
||||
continuation.finish()
|
||||
client.removePublishListener(named: name)
|
||||
client.removeShutdownListener(named: name)
|
||||
Task { await self.setIsShuttingDown() }
|
||||
}
|
||||
}
|
||||
|
||||
public enum TopicListenerError: Error {
|
||||
case connectionTimeout
|
||||
case failedToSubscribe
|
||||
}
|
||||
|
||||
public struct MQTTListenResultError: Error {
|
||||
let underlyingError: any Error
|
||||
|
||||
init(_ underlyingError: any Error) {
|
||||
self.underlyingError = underlyingError
|
||||
}
|
||||
}
|
||||
117
Sources/TopicDependencies/TopicPublisher.swift
Normal file
117
Sources/TopicDependencies/TopicPublisher.swift
Normal file
@@ -0,0 +1,117 @@
|
||||
import Dependencies
|
||||
import DependenciesMacros
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
|
||||
/// A dependency that is responsible for publishing values to an MQTT broker.
|
||||
///
|
||||
/// - Note: This dependency only conforms to `TestDependencyKey` because it
|
||||
/// requires an active `MQTTClient` to generate the live dependency.
|
||||
@DependencyClient
|
||||
public struct TopicPublisher: Sendable {
|
||||
|
||||
private var _publish: @Sendable (PublishRequest) async throws -> Void
|
||||
|
||||
/// Create a new topic publisher.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - publish: Handle the publish request.
|
||||
public init(
|
||||
publish: @Sendable @escaping (PublishRequest) async throws -> Void
|
||||
) {
|
||||
self._publish = publish
|
||||
}
|
||||
|
||||
/// Publish a new value to the given topic.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - topicName: The topic to publish the new value to.
|
||||
/// - payload: The value to publish.
|
||||
/// - qos: The MQTTQoS.
|
||||
/// - retain: The retain flag.
|
||||
public func publish(
|
||||
to topicName: String,
|
||||
payload: ByteBuffer,
|
||||
qos: MQTTQoS,
|
||||
retain: Bool = false
|
||||
) async throws {
|
||||
try await _publish(.init(
|
||||
topicName: topicName,
|
||||
payload: payload,
|
||||
qos: qos,
|
||||
retain: retain
|
||||
))
|
||||
}
|
||||
|
||||
/// Create the live topic publisher with the given `MQTTClient`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - client: The mqtt broker client to use.
|
||||
public static func live(client: MQTTClient) -> Self {
|
||||
.init(
|
||||
publish: { request in
|
||||
guard client.isActive() else {
|
||||
client.logger.trace("Client is not connected, unable to publish to \(request.topicName)")
|
||||
return
|
||||
}
|
||||
client.logger.trace("Begin publishing to topic: \(request.topicName)")
|
||||
defer { client.logger.trace("Done publishing to topic: \(request.topicName)") }
|
||||
try await client.publish(
|
||||
to: request.topicName,
|
||||
payload: request.payload,
|
||||
qos: request.qos,
|
||||
retain: request.retain
|
||||
)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
/// Represents the parameters required to publish a new value to the
|
||||
/// MQTT broker.
|
||||
public struct PublishRequest: Equatable, Sendable {
|
||||
|
||||
/// The topic to publish the new value to.
|
||||
public let topicName: String
|
||||
|
||||
/// The value to publish.
|
||||
public let payload: ByteBuffer
|
||||
|
||||
/// The qos of the request.
|
||||
public let qos: MQTTQoS
|
||||
|
||||
/// The retain flag for the request.
|
||||
public let retain: Bool
|
||||
|
||||
/// Create a new publish request.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - topicName: The topic to publish to.
|
||||
/// - payload: The value to publish.
|
||||
/// - qos: The qos of the request.
|
||||
/// - retain: The retain flag of the request.
|
||||
public init(
|
||||
topicName: String,
|
||||
payload: ByteBuffer,
|
||||
qos: MQTTQoS,
|
||||
retain: Bool
|
||||
) {
|
||||
self.topicName = topicName
|
||||
self.payload = payload
|
||||
self.qos = qos
|
||||
self.retain = retain
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension TopicPublisher: TestDependencyKey {
|
||||
public static var testValue: TopicPublisher { Self() }
|
||||
}
|
||||
|
||||
public extension DependencyValues {
|
||||
|
||||
/// A dependency that is responsible for publishing values to an MQTT broker.
|
||||
var topicPublisher: TopicPublisher {
|
||||
get { self[TopicPublisher.self] }
|
||||
set { self[TopicPublisher.self] = newValue }
|
||||
}
|
||||
}
|
||||
@@ -1,15 +1,19 @@
|
||||
import Dependencies
|
||||
import Foundation
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTConnectionManager
|
||||
import MQTTConnectionService
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import PsychrometricClientLive
|
||||
import SensorsService
|
||||
import ServiceLifecycle
|
||||
import TopicDependencies
|
||||
|
||||
@main
|
||||
struct Application {
|
||||
|
||||
/// The main entry point of the application.
|
||||
static func main() async throws {
|
||||
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
@@ -31,23 +35,37 @@ struct Application {
|
||||
logger: logger
|
||||
)
|
||||
|
||||
let mqttConnection = MQTTConnectionService(client: mqtt)
|
||||
let sensors = SensorsService(
|
||||
client: mqtt,
|
||||
events: { mqttConnection.events },
|
||||
sensors: .live
|
||||
)
|
||||
do {
|
||||
try await withDependencies {
|
||||
$0.psychrometricClient = .liveValue
|
||||
$0.topicListener = .live(client: mqtt)
|
||||
$0.topicPublisher = .live(client: mqtt)
|
||||
$0.mqttConnectionManager = .live(client: mqtt, logger: logger)
|
||||
} operation: {
|
||||
let mqttConnection = MQTTConnectionService(logger: logger)
|
||||
let sensors = SensorsService(sensors: .live, logger: logger)
|
||||
|
||||
let serviceGroup = ServiceGroup(
|
||||
services: [
|
||||
mqttConnection,
|
||||
sensors
|
||||
],
|
||||
gracefulShutdownSignals: [.sigterm, .sigint],
|
||||
logger: logger
|
||||
)
|
||||
var serviceGroupConfiguration = ServiceGroupConfiguration(
|
||||
services: [
|
||||
mqttConnection,
|
||||
sensors
|
||||
],
|
||||
gracefulShutdownSignals: [.sigterm, .sigint],
|
||||
logger: logger
|
||||
)
|
||||
serviceGroupConfiguration.maximumCancellationDuration = .seconds(5)
|
||||
serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10)
|
||||
|
||||
try await serviceGroup.run()
|
||||
let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
|
||||
|
||||
try await serviceGroup.run()
|
||||
}
|
||||
|
||||
try await mqtt.shutdown()
|
||||
try await eventloopGroup.shutdownGracefully()
|
||||
} catch {
|
||||
try await eventloopGroup.shutdownGracefully()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import Combine
|
||||
import Logging
|
||||
import Models
|
||||
@testable import MQTTConnectionService
|
||||
@testable import MQTTConnectionManager
|
||||
import MQTTConnectionService
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import ServiceLifecycle
|
||||
@@ -13,22 +13,59 @@ final class MQTTConnectionServiceTests: XCTestCase {
|
||||
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
||||
|
||||
static let logger: Logger = {
|
||||
var logger = Logger(label: "AsyncClientTests")
|
||||
var logger = Logger(label: "MQTTConnectionServiceTests")
|
||||
logger.logLevel = .trace
|
||||
return logger
|
||||
}()
|
||||
|
||||
func testGracefulShutdownWorks() async throws {
|
||||
try await testGracefulShutdown { trigger in
|
||||
let client = createClient(identifier: "testGracefulShutdown")
|
||||
let service = MQTTConnectionService(client: client)
|
||||
try await service.run()
|
||||
try await Task.sleep(for: .seconds(1))
|
||||
XCTAssert(client.isActive())
|
||||
trigger.triggerGracefulShutdown()
|
||||
// try await Task.sleep(for: .seconds(2))
|
||||
// XCTAssertFalse(client.isActive())
|
||||
// func testGracefulShutdownWorks() async throws {
|
||||
// let client = createClient(identifier: "testGracefulShutdown")
|
||||
// let service = MQTTConnectionService(client: client)
|
||||
// await service.connect()
|
||||
// try await Task.sleep(for: .seconds(1))
|
||||
// XCTAssert(client.isActive())
|
||||
// service.shutdown()
|
||||
// XCTAssertFalse(client.isActive())
|
||||
// }
|
||||
|
||||
func testWhatHappensIfConnectIsCalledMultipleTimes() async throws {
|
||||
let client = createClient(identifier: "testWhatHappensIfConnectIsCalledMultipleTimes")
|
||||
let manager = MQTTConnectionManager.live(client: client)
|
||||
try await manager.connect()
|
||||
try await manager.connect()
|
||||
}
|
||||
|
||||
// TODO: Move to integration tests.
|
||||
func testMQTTConnectionStream() async throws {
|
||||
let client = createClient(identifier: "testNonManagedStream")
|
||||
let manager = MQTTConnectionManager.live(
|
||||
client: client,
|
||||
logger: Self.logger,
|
||||
alwaysReconnect: false
|
||||
)
|
||||
let stream = MQTTConnectionStream(client: client, logger: Self.logger)
|
||||
var events = [MQTTConnectionManager.Event]()
|
||||
|
||||
_ = try await manager.connect()
|
||||
|
||||
Task {
|
||||
while !client.isActive() {
|
||||
try await Task.sleep(for: .milliseconds(100))
|
||||
}
|
||||
try await Task.sleep(for: .milliseconds(200))
|
||||
manager.shutdown()
|
||||
try await client.disconnect()
|
||||
try await Task.sleep(for: .milliseconds(200))
|
||||
try await client.shutdown()
|
||||
try await Task.sleep(for: .milliseconds(200))
|
||||
stream.stop()
|
||||
}
|
||||
|
||||
for await event in stream.removeDuplicates() {
|
||||
events.append(event)
|
||||
}
|
||||
|
||||
XCTAssertEqual(events, [.disconnected, .connected, .disconnected, .shuttingDown])
|
||||
}
|
||||
|
||||
func createClient(identifier: String) -> MQTTClient {
|
||||
@@ -58,65 +95,4 @@ final class MQTTConnectionServiceTests: XCTestCase {
|
||||
)
|
||||
}
|
||||
|
||||
func testEventStream() async throws {
|
||||
var connection: ConnectionStream? = ConnectionStream()
|
||||
|
||||
let task = Task {
|
||||
guard let events = connection?.events else { return }
|
||||
print("before loop")
|
||||
for await event in events {
|
||||
print("\(event)")
|
||||
}
|
||||
print("after loop")
|
||||
}
|
||||
|
||||
let ending = Task {
|
||||
try await Task.sleep(for: .seconds(2))
|
||||
connection = nil
|
||||
}
|
||||
|
||||
connection?.start()
|
||||
try await ending.value
|
||||
task.cancel()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ConnectionStream {
|
||||
|
||||
enum Event {
|
||||
case connected
|
||||
case disconnected
|
||||
case shuttingDown
|
||||
}
|
||||
|
||||
let events: AsyncStream<Event>
|
||||
private let continuation: AsyncStream<Event>.Continuation
|
||||
private var cancellable: AnyCancellable?
|
||||
|
||||
init() {
|
||||
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
|
||||
self.events = stream
|
||||
self.continuation = continuation
|
||||
}
|
||||
|
||||
deinit {
|
||||
print("connection stream is gone.")
|
||||
stop()
|
||||
}
|
||||
|
||||
func start() {
|
||||
cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
|
||||
.autoconnect()
|
||||
.sink { [weak self] _ in
|
||||
print("will send event.")
|
||||
self?.continuation.yield(.connected)
|
||||
}
|
||||
}
|
||||
|
||||
func stop() {
|
||||
continuation.yield(.shuttingDown)
|
||||
cancellable = nil
|
||||
continuation.finish()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import MQTTNIO
|
||||
import NIO
|
||||
import PsychrometricClientLive
|
||||
@testable import SensorsService
|
||||
import TopicDependencies
|
||||
import XCTest
|
||||
|
||||
final class SensorsClientTests: XCTestCase {
|
||||
@@ -12,7 +13,7 @@ final class SensorsClientTests: XCTestCase {
|
||||
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
||||
|
||||
static let logger: Logger = {
|
||||
var logger = Logger(label: "AsyncClientTests")
|
||||
var logger = Logger(label: "SensorsClientTests")
|
||||
logger.logLevel = .debug
|
||||
return logger
|
||||
}()
|
||||
@@ -25,42 +26,28 @@ final class SensorsClientTests: XCTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
// func createClient(identifier: String) -> SensorsClient {
|
||||
// let envVars = EnvVars(
|
||||
// appEnv: .testing,
|
||||
// host: Self.hostname,
|
||||
// port: "1883",
|
||||
// identifier: identifier,
|
||||
// userName: nil,
|
||||
// password: nil
|
||||
// )
|
||||
// return .init(envVars: envVars, logger: Self.logger)
|
||||
// }
|
||||
func createClient(identifier: String) -> MQTTClient {
|
||||
let envVars = EnvVars(
|
||||
appEnv: .testing,
|
||||
host: Self.hostname,
|
||||
port: "1883",
|
||||
identifier: identifier,
|
||||
userName: nil,
|
||||
password: nil
|
||||
)
|
||||
let config = MQTTClient.Configuration(
|
||||
version: .v3_1_1,
|
||||
userName: envVars.userName,
|
||||
password: envVars.password,
|
||||
useSSL: false,
|
||||
useWebSockets: false,
|
||||
tlsConfiguration: nil,
|
||||
webSocketURLPath: nil
|
||||
)
|
||||
return .init(
|
||||
host: Self.hostname,
|
||||
identifier: identifier,
|
||||
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
|
||||
logger: Self.logger,
|
||||
configuration: config
|
||||
func testWhatHappensIfClientDisconnectsWhileListening() async throws {
|
||||
let client = createClient(identifier: "testWhatHappensIfClientDisconnectsWhileListening")
|
||||
let listener = TopicListener.live(client: client)
|
||||
try await client.connect()
|
||||
|
||||
let stream = try await listener.listen("/some/topic")
|
||||
|
||||
// try await Task.sleep(for: .seconds(1))
|
||||
// try await client.disconnect()
|
||||
//
|
||||
// try await client.connect()
|
||||
// try await Task.sleep(for: .seconds(1))
|
||||
try await client.publish(
|
||||
to: "/some/topic",
|
||||
payload: ByteBufferAllocator().buffer(string: "Foo"),
|
||||
qos: .atLeastOnce,
|
||||
retain: true
|
||||
)
|
||||
try await Task.sleep(for: .seconds(1))
|
||||
|
||||
listener.shutdown()
|
||||
try await client.shutdown()
|
||||
}
|
||||
|
||||
// func testConnectAndShutdown() async throws {
|
||||
@@ -132,52 +119,52 @@ final class SensorsClientTests: XCTestCase {
|
||||
// try await mqtt.shutdown()
|
||||
// }
|
||||
|
||||
func testCapturingSensorClient() async throws {
|
||||
class CapturedValues {
|
||||
var values = [(value: Double, topic: String)]()
|
||||
var didShutdown = false
|
||||
|
||||
init() {}
|
||||
}
|
||||
|
||||
let capturedValues = CapturedValues()
|
||||
|
||||
try await withDependencies {
|
||||
$0.sensorsClient = .testing(
|
||||
yielding: [
|
||||
(value: 76, to: "not-listening"),
|
||||
(value: 75, to: "test")
|
||||
]
|
||||
) { value, topic in
|
||||
capturedValues.values.append((value, topic))
|
||||
} captureShutdownEvent: {
|
||||
capturedValues.didShutdown = $0
|
||||
}
|
||||
} operation: {
|
||||
@Dependency(\.sensorsClient) var client
|
||||
let stream = try await client.listen(to: ["test"])
|
||||
|
||||
for await value in stream {
|
||||
var buffer = value.payload
|
||||
guard let double = Double(buffer: &buffer) else {
|
||||
XCTFail("Failed to decode double")
|
||||
return
|
||||
}
|
||||
|
||||
XCTAssertEqual(double, 75)
|
||||
XCTAssertEqual(value.topicName, "test")
|
||||
try await client.publish(26, to: "publish")
|
||||
try await Task.sleep(for: .milliseconds(100))
|
||||
client.shutdown()
|
||||
}
|
||||
|
||||
XCTAssertEqual(capturedValues.values.count, 1)
|
||||
XCTAssertEqual(capturedValues.values.first?.value, 26)
|
||||
XCTAssertEqual(capturedValues.values.first?.topic, "publish")
|
||||
XCTAssertTrue(capturedValues.didShutdown)
|
||||
}
|
||||
}
|
||||
|
||||
// func testCapturingSensorClient() async throws {
|
||||
// class CapturedValues {
|
||||
// var values = [(value: Double, topic: String)]()
|
||||
// var didShutdown = false
|
||||
//
|
||||
// init() {}
|
||||
// }
|
||||
//
|
||||
// let capturedValues = CapturedValues()
|
||||
//
|
||||
// try await withDependencies {
|
||||
// $0.sensorsClient = .testing(
|
||||
// yielding: [
|
||||
// (value: 76, to: "not-listening"),
|
||||
// (value: 75, to: "test")
|
||||
// ]
|
||||
// ) { value, topic in
|
||||
// capturedValues.values.append((value, topic))
|
||||
// } captureShutdownEvent: {
|
||||
// capturedValues.didShutdown = $0
|
||||
// }
|
||||
// } operation: {
|
||||
// @Dependency(\.sensorsClient) var client
|
||||
// let stream = try await client.listen(to: ["test"])
|
||||
//
|
||||
// for await result in stream {
|
||||
// var buffer = result.buffer
|
||||
// guard let double = Double(buffer: &buffer) else {
|
||||
// XCTFail("Failed to decode double")
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// XCTAssertEqual(double, 75)
|
||||
// XCTAssertEqual(result.topic, "test")
|
||||
// try await client.publish(26, to: "publish")
|
||||
// try await Task.sleep(for: .milliseconds(100))
|
||||
// client.shutdown()
|
||||
// }
|
||||
//
|
||||
// XCTAssertEqual(capturedValues.values.count, 1)
|
||||
// XCTAssertEqual(capturedValues.values.first?.value, 26)
|
||||
// XCTAssertEqual(capturedValues.values.first?.topic, "publish")
|
||||
// XCTAssertTrue(capturedValues.didShutdown)
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// func testSensorCapturesPublishedState() async throws {
|
||||
// let client = createClient(identifier: "testSensorCapturesPublishedState")
|
||||
// let mqtt = client.client
|
||||
@@ -234,10 +221,47 @@ final class SensorsClientTests: XCTestCase {
|
||||
//
|
||||
// await client.shutdown()
|
||||
// }
|
||||
|
||||
func createClient(identifier: String) -> MQTTClient {
|
||||
let envVars = EnvVars(
|
||||
appEnv: .testing,
|
||||
host: Self.hostname,
|
||||
port: "1883",
|
||||
identifier: identifier,
|
||||
userName: nil,
|
||||
password: nil
|
||||
)
|
||||
let config = MQTTClient.Configuration(
|
||||
version: .v3_1_1,
|
||||
userName: envVars.userName,
|
||||
password: envVars.password,
|
||||
useSSL: false,
|
||||
useWebSockets: false,
|
||||
tlsConfiguration: nil,
|
||||
webSocketURLPath: nil
|
||||
)
|
||||
return .init(
|
||||
host: Self.hostname,
|
||||
identifier: identifier,
|
||||
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
|
||||
logger: Self.logger,
|
||||
configuration: config
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: Helpers for tests.
|
||||
|
||||
extension AsyncStream {
|
||||
func first() async -> Element {
|
||||
var first: Element
|
||||
for await value in self {
|
||||
first = value
|
||||
}
|
||||
return first
|
||||
}
|
||||
}
|
||||
|
||||
class PublishInfoContainer {
|
||||
private(set) var info: [MQTTPublishInfo]
|
||||
private var topicFilters: [String]?
|
||||
@@ -258,41 +282,35 @@ class PublishInfoContainer {
|
||||
}
|
||||
}
|
||||
|
||||
extension SensorsClient {
|
||||
|
||||
static func testing(
|
||||
yielding: [(value: Double, to: String)],
|
||||
capturePublishedValues: @escaping (Double, String) -> Void,
|
||||
captureShutdownEvent: @escaping (Bool) -> Void
|
||||
) -> Self {
|
||||
let (stream, continuation) = AsyncStream.makeStream(of: MQTTPublishInfo.self)
|
||||
let logger = Logger(label: "\(Self.self).testing")
|
||||
|
||||
return .init(
|
||||
listen: { topics in
|
||||
for (value, topic) in yielding where topics.contains(topic) {
|
||||
continuation.yield(
|
||||
MQTTPublishInfo(
|
||||
qos: .atLeastOnce,
|
||||
retain: true,
|
||||
topicName: topic,
|
||||
payload: ByteBuffer(string: "\(value)"),
|
||||
properties: MQTTProperties()
|
||||
)
|
||||
)
|
||||
}
|
||||
return stream
|
||||
},
|
||||
logger: logger,
|
||||
publish: { value, topic in
|
||||
capturePublishedValues(value, topic)
|
||||
},
|
||||
shutdown: {
|
||||
captureShutdownEvent(true)
|
||||
continuation.finish()
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
// extension SensorsClient {
|
||||
//
|
||||
// static func testing(
|
||||
// yielding: [(value: Double, to: String)],
|
||||
// capturePublishedValues: @escaping (Double, String) -> Void,
|
||||
// captureShutdownEvent: @escaping (Bool) -> Void
|
||||
// ) -> Self {
|
||||
// let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self)
|
||||
// let logger = Logger(label: "\(Self.self).testing")
|
||||
//
|
||||
// return .init(
|
||||
// listen: { topics in
|
||||
// for (value, topic) in yielding where topics.contains(topic) {
|
||||
// continuation.yield(
|
||||
// (buffer: ByteBuffer(string: "\(value)"), topic: topic)
|
||||
// )
|
||||
// }
|
||||
// return stream
|
||||
// },
|
||||
// logger: logger,
|
||||
// publish: { value, topic in
|
||||
// capturePublishedValues(value, topic)
|
||||
// },
|
||||
// shutdown: {
|
||||
// captureShutdownEvent(true)
|
||||
// continuation.finish()
|
||||
// }
|
||||
// )
|
||||
// }
|
||||
// }
|
||||
|
||||
struct TopicNotFoundError: Error {}
|
||||
|
||||
@@ -1,47 +0,0 @@
|
||||
import XCTest
|
||||
import class Foundation.Bundle
|
||||
|
||||
//final class dewPoint_controllerTests: XCTestCase {
|
||||
// func testExample() throws {
|
||||
// // This is an example of a functional test case.
|
||||
// // Use XCTAssert and related functions to verify your tests produce the correct
|
||||
// // results.
|
||||
//
|
||||
// // Some of the APIs that we use below are available in macOS 10.13 and above.
|
||||
// guard #available(macOS 10.13, *) else {
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// // Mac Catalyst won't have `Process`, but it is supported for executables.
|
||||
// #if !targetEnvironment(macCatalyst)
|
||||
//
|
||||
// let fooBinary = productsDirectory.appendingPathComponent("dewPoint-controller")
|
||||
//
|
||||
// let process = Process()
|
||||
// process.executableURL = fooBinary
|
||||
//
|
||||
// let pipe = Pipe()
|
||||
// process.standardOutput = pipe
|
||||
//
|
||||
// try process.run()
|
||||
// process.waitUntilExit()
|
||||
//
|
||||
// let data = pipe.fileHandleForReading.readDataToEndOfFile()
|
||||
// let output = String(data: data, encoding: .utf8)
|
||||
//
|
||||
// XCTAssertEqual(output, "Hello, world!\n")
|
||||
// #endif
|
||||
// }
|
||||
//
|
||||
// /// Returns path to the built products directory.
|
||||
// var productsDirectory: URL {
|
||||
// #if os(macOS)
|
||||
// for bundle in Bundle.allBundles where bundle.bundlePath.hasSuffix(".xctest") {
|
||||
// return bundle.bundleURL.deletingLastPathComponent()
|
||||
// }
|
||||
// fatalError("couldn't find the products directory")
|
||||
// #else
|
||||
// return Bundle.main.bundleURL
|
||||
// #endif
|
||||
// }
|
||||
//}
|
||||
@@ -14,7 +14,7 @@ services:
|
||||
depends_on:
|
||||
- mosquitto
|
||||
environment:
|
||||
- MOSQUITTO_SERVER=mosquitto
|
||||
- MQTT_HOST=mosquitto
|
||||
|
||||
test:
|
||||
build:
|
||||
|
||||
Reference in New Issue
Block a user