Compare commits
10 Commits
f68ac528e4
...
e7a849b003
| Author | SHA1 | Date | |
|---|---|---|---|
|
e7a849b003
|
|||
|
bd2a798320
|
|||
|
b8992b89b6
|
|||
|
efd9907b4a
|
|||
|
fbbd65f7ae
|
|||
|
8067331ff8
|
|||
|
b6db9b5322
|
|||
|
bf1126b06a
|
|||
|
ef552fb8bc
|
|||
|
1e62d7aac0
|
@@ -8,3 +8,4 @@
|
|||||||
--wrapconditions after-first
|
--wrapconditions after-first
|
||||||
--typeblanklines preserve
|
--typeblanklines preserve
|
||||||
--commas inline
|
--commas inline
|
||||||
|
--stripunusedargs closure-only
|
||||||
|
|||||||
@@ -11,4 +11,4 @@ RUN swift build --enable-test-discovery -c release -Xswiftc -g
|
|||||||
FROM swift:5.10-slim
|
FROM swift:5.10-slim
|
||||||
WORKDIR /run
|
WORKDIR /run
|
||||||
COPY --from=build /build/.build/release/dewPoint-controller /run
|
COPY --from=build /build/.build/release/dewPoint-controller /run
|
||||||
CMD ["/bin/bash", "-xc", "./dewPoint-controller"]
|
CMD ["/bin/bash", "-xc", "./dewpoint-controller"]
|
||||||
|
|||||||
2
Makefile
2
Makefile
@@ -23,7 +23,7 @@ stop-mosquitto:
|
|||||||
@docker-compose rm -f mosquitto || true
|
@docker-compose rm -f mosquitto || true
|
||||||
|
|
||||||
test-docker:
|
test-docker:
|
||||||
@docker-compose run --remove-orphans -i --rm test
|
@docker-compose run --build --remove-orphans -i --rm test
|
||||||
@docker-compose kill mosquitto-test
|
@docker-compose kill mosquitto-test
|
||||||
@docker-compose rm -f
|
@docker-compose rm -f
|
||||||
|
|
||||||
|
|||||||
@@ -8,17 +8,20 @@ let swiftSettings: [SwiftSetting] = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
let package = Package(
|
let package = Package(
|
||||||
name: "dewPoint-controller",
|
name: "dewpoint-controller",
|
||||||
platforms: [
|
platforms: [
|
||||||
.macOS(.v14)
|
.macOS(.v14)
|
||||||
],
|
],
|
||||||
products: [
|
products: [
|
||||||
.executable(name: "dewPoint-controller", targets: ["dewPoint-controller"]),
|
.executable(name: "dewpoint-controller", targets: ["dewpoint-controller"]),
|
||||||
.library(name: "Models", targets: ["Models"]),
|
.library(name: "Models", targets: ["Models"]),
|
||||||
|
.library(name: "MQTTConnectionManager", targets: ["MQTTConnectionManager"]),
|
||||||
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
|
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
|
||||||
.library(name: "SensorsService", targets: ["SensorsService"])
|
.library(name: "SensorsService", targets: ["SensorsService"]),
|
||||||
|
.library(name: "TopicDependencies", targets: ["TopicDependencies"])
|
||||||
],
|
],
|
||||||
dependencies: [
|
dependencies: [
|
||||||
|
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
|
||||||
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
|
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
|
||||||
.package(url: "https://github.com/apple/swift-log", from: "1.6.0"),
|
.package(url: "https://github.com/apple/swift-log", from: "1.6.0"),
|
||||||
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.4.1"),
|
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.4.1"),
|
||||||
@@ -28,20 +31,18 @@ let package = Package(
|
|||||||
],
|
],
|
||||||
targets: [
|
targets: [
|
||||||
.executableTarget(
|
.executableTarget(
|
||||||
name: "dewPoint-controller",
|
name: "dewpoint-controller",
|
||||||
dependencies: [
|
dependencies: [
|
||||||
"Models",
|
"Models",
|
||||||
|
"MQTTConnectionManager",
|
||||||
"MQTTConnectionService",
|
"MQTTConnectionService",
|
||||||
"SensorsService",
|
"SensorsService",
|
||||||
|
"TopicDependencies",
|
||||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
||||||
.product(name: "NIO", package: "swift-nio"),
|
.product(name: "NIO", package: "swift-nio"),
|
||||||
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
|
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
|
||||||
]
|
]
|
||||||
),
|
),
|
||||||
.testTarget(
|
|
||||||
name: "dewPoint-controllerTests",
|
|
||||||
dependencies: ["dewPoint-controller"]
|
|
||||||
),
|
|
||||||
.target(
|
.target(
|
||||||
name: "Models",
|
name: "Models",
|
||||||
dependencies: [
|
dependencies: [
|
||||||
@@ -50,11 +51,21 @@ let package = Package(
|
|||||||
],
|
],
|
||||||
swiftSettings: swiftSettings
|
swiftSettings: swiftSettings
|
||||||
),
|
),
|
||||||
|
.target(
|
||||||
|
name: "MQTTConnectionManager",
|
||||||
|
dependencies: [
|
||||||
|
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
|
||||||
|
.product(name: "Dependencies", package: "swift-dependencies"),
|
||||||
|
.product(name: "DependenciesMacros", package: "swift-dependencies"),
|
||||||
|
.product(name: "MQTTNIO", package: "mqtt-nio")
|
||||||
|
],
|
||||||
|
swiftSettings: swiftSettings
|
||||||
|
),
|
||||||
.target(
|
.target(
|
||||||
name: "MQTTConnectionService",
|
name: "MQTTConnectionService",
|
||||||
dependencies: [
|
dependencies: [
|
||||||
"Models",
|
"Models",
|
||||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
"MQTTConnectionManager",
|
||||||
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
|
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
|
||||||
],
|
],
|
||||||
swiftSettings: swiftSettings
|
swiftSettings: swiftSettings
|
||||||
@@ -63,6 +74,7 @@ let package = Package(
|
|||||||
name: "MQTTConnectionServiceTests",
|
name: "MQTTConnectionServiceTests",
|
||||||
dependencies: [
|
dependencies: [
|
||||||
"MQTTConnectionService",
|
"MQTTConnectionService",
|
||||||
|
"MQTTConnectionManager",
|
||||||
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
|
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
|
||||||
]
|
]
|
||||||
),
|
),
|
||||||
@@ -71,6 +83,7 @@ let package = Package(
|
|||||||
dependencies: [
|
dependencies: [
|
||||||
"Models",
|
"Models",
|
||||||
"MQTTConnectionService",
|
"MQTTConnectionService",
|
||||||
|
"TopicDependencies",
|
||||||
.product(name: "Dependencies", package: "swift-dependencies"),
|
.product(name: "Dependencies", package: "swift-dependencies"),
|
||||||
.product(name: "DependenciesMacros", package: "swift-dependencies"),
|
.product(name: "DependenciesMacros", package: "swift-dependencies"),
|
||||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
||||||
@@ -84,6 +97,15 @@ let package = Package(
|
|||||||
"SensorsService",
|
"SensorsService",
|
||||||
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
|
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
|
||||||
]
|
]
|
||||||
|
),
|
||||||
|
.target(
|
||||||
|
name: "TopicDependencies",
|
||||||
|
dependencies: [
|
||||||
|
.product(name: "Dependencies", package: "swift-dependencies"),
|
||||||
|
.product(name: "DependenciesMacros", package: "swift-dependencies"),
|
||||||
|
.product(name: "MQTTNIO", package: "mqtt-nio")
|
||||||
|
],
|
||||||
|
swiftSettings: swiftSettings
|
||||||
)
|
)
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|||||||
199
Sources/MQTTConnectionManager/Live.swift
Normal file
199
Sources/MQTTConnectionManager/Live.swift
Normal file
@@ -0,0 +1,199 @@
|
|||||||
|
import AsyncAlgorithms
|
||||||
|
import Dependencies
|
||||||
|
import DependenciesMacros
|
||||||
|
import Foundation
|
||||||
|
import Logging
|
||||||
|
import MQTTNIO
|
||||||
|
import NIO
|
||||||
|
|
||||||
|
public extension DependencyValues {
|
||||||
|
|
||||||
|
/// A dependency that is responsible for managing the connection to
|
||||||
|
/// an MQTT broker.
|
||||||
|
var mqttConnectionManager: MQTTConnectionManager {
|
||||||
|
get { self[MQTTConnectionManager.self] }
|
||||||
|
set { self[MQTTConnectionManager.self] = newValue }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Represents the interface needed for the ``MQTTConnectionService``.
|
||||||
|
///
|
||||||
|
/// See ``MQTTConnectionManagerLive`` module for live implementation.
|
||||||
|
@DependencyClient
|
||||||
|
public struct MQTTConnectionManager: Sendable {
|
||||||
|
|
||||||
|
/// Connect to the MQTT broker.
|
||||||
|
public var connect: @Sendable () async throws -> Void
|
||||||
|
|
||||||
|
/// Shutdown the connection to the MQTT broker.
|
||||||
|
///
|
||||||
|
/// - Note: You should cancel any tasks that are listening to the connection stream first.
|
||||||
|
public var shutdown: @Sendable () -> Void
|
||||||
|
|
||||||
|
/// Create a stream of connection events.
|
||||||
|
public var stream: @Sendable () throws -> AsyncStream<Event>
|
||||||
|
|
||||||
|
/// Represents connection events that clients can listen for and
|
||||||
|
/// react accordingly.
|
||||||
|
public enum Event: Sendable {
|
||||||
|
case connected
|
||||||
|
case disconnected
|
||||||
|
case shuttingDown
|
||||||
|
}
|
||||||
|
|
||||||
|
public static func live(
|
||||||
|
client: MQTTClient,
|
||||||
|
cleanSession: Bool = false,
|
||||||
|
logger: Logger? = nil,
|
||||||
|
alwaysReconnect: Bool = true
|
||||||
|
) -> Self {
|
||||||
|
let manager = ConnectionManager(
|
||||||
|
client: client,
|
||||||
|
logger: logger,
|
||||||
|
alwaysReconnect: alwaysReconnect
|
||||||
|
)
|
||||||
|
return .init {
|
||||||
|
try await manager.connect(cleanSession: cleanSession)
|
||||||
|
} shutdown: {
|
||||||
|
manager.shutdown()
|
||||||
|
} stream: {
|
||||||
|
MQTTConnectionStream(client: client, logger: logger)
|
||||||
|
.start()
|
||||||
|
.removeDuplicates()
|
||||||
|
.eraseToStream()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extension MQTTConnectionManager: TestDependencyKey {
|
||||||
|
public static var testValue: MQTTConnectionManager {
|
||||||
|
Self()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MARK: - Helpers
|
||||||
|
|
||||||
|
final class MQTTConnectionStream: AsyncSequence, Sendable {
|
||||||
|
|
||||||
|
typealias AsyncIterator = AsyncStream<Element>.AsyncIterator
|
||||||
|
typealias Element = MQTTConnectionManager.Event
|
||||||
|
|
||||||
|
private let client: MQTTClient
|
||||||
|
private let continuation: AsyncStream<Element>.Continuation
|
||||||
|
private let logger: Logger?
|
||||||
|
private let name: String
|
||||||
|
private let stream: AsyncStream<Element>
|
||||||
|
|
||||||
|
init(client: MQTTClient, logger: Logger?) {
|
||||||
|
let (stream, continuation) = AsyncStream<Element>.makeStream()
|
||||||
|
self.client = client
|
||||||
|
self.continuation = continuation
|
||||||
|
self.logger = logger
|
||||||
|
self.name = UUID().uuidString
|
||||||
|
self.stream = stream
|
||||||
|
}
|
||||||
|
|
||||||
|
deinit { stop() }
|
||||||
|
|
||||||
|
func start(
|
||||||
|
isolation: isolated (any Actor)? = #isolation
|
||||||
|
) -> AsyncStream<Element> {
|
||||||
|
// Check if the client is active and yield the result.
|
||||||
|
continuation.yield(client.isActive() ? .connected : .disconnected)
|
||||||
|
|
||||||
|
// Register listener on the client for when the connection
|
||||||
|
// closes.
|
||||||
|
client.addCloseListener(named: name) { _ in
|
||||||
|
self.logger?.trace("Client has disconnected.")
|
||||||
|
self.continuation.yield(.disconnected)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register listener on the client for when the client
|
||||||
|
// is shutdown.
|
||||||
|
client.addShutdownListener(named: name) { _ in
|
||||||
|
self.logger?.trace("Client is shutting down.")
|
||||||
|
self.continuation.yield(.shuttingDown)
|
||||||
|
self.stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
return stream
|
||||||
|
}
|
||||||
|
|
||||||
|
func stop() {
|
||||||
|
client.removeCloseListener(named: name)
|
||||||
|
client.removeShutdownListener(named: name)
|
||||||
|
continuation.finish()
|
||||||
|
}
|
||||||
|
|
||||||
|
public __consuming func makeAsyncIterator() -> AsyncIterator {
|
||||||
|
start().makeAsyncIterator()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
actor ConnectionManager {
|
||||||
|
private let client: MQTTClient
|
||||||
|
private let logger: Logger?
|
||||||
|
private let name: String
|
||||||
|
private let shouldReconnect: Bool
|
||||||
|
private var hasConnected: Bool = false
|
||||||
|
|
||||||
|
init(
|
||||||
|
client: MQTTClient,
|
||||||
|
logger: Logger?,
|
||||||
|
alwaysReconnect: Bool
|
||||||
|
) {
|
||||||
|
self.client = client
|
||||||
|
self.logger = logger
|
||||||
|
self.name = UUID().uuidString
|
||||||
|
self.shouldReconnect = alwaysReconnect
|
||||||
|
}
|
||||||
|
|
||||||
|
deinit {
|
||||||
|
// We should've already logged that we're shutting down if
|
||||||
|
// the manager was shutdown properly, so don't log it twice.
|
||||||
|
self.shutdown(withLogging: false)
|
||||||
|
}
|
||||||
|
|
||||||
|
private func setHasConnected() {
|
||||||
|
hasConnected = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func connect(
|
||||||
|
isolation: isolated (any Actor)? = #isolation,
|
||||||
|
cleanSession: Bool
|
||||||
|
) async throws {
|
||||||
|
guard !(await hasConnected) else { return }
|
||||||
|
do {
|
||||||
|
try await client.connect(cleanSession: cleanSession)
|
||||||
|
await setHasConnected()
|
||||||
|
|
||||||
|
client.addCloseListener(named: name) { [weak self] _ in
|
||||||
|
guard let `self` else { return }
|
||||||
|
self.logger?.debug("Connection closed.")
|
||||||
|
if self.shouldReconnect {
|
||||||
|
self.logger?.debug("Reconnecting...")
|
||||||
|
Task {
|
||||||
|
try await self.connect(cleanSession: cleanSession)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
client.addShutdownListener(named: name) { _ in
|
||||||
|
self.shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch {
|
||||||
|
logger?.trace("Failed to connect: \(error)")
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nonisolated func shutdown(withLogging: Bool = true) {
|
||||||
|
if withLogging {
|
||||||
|
logger?.trace("Shutting down connection.")
|
||||||
|
}
|
||||||
|
client.removeCloseListener(named: name)
|
||||||
|
client.removeShutdownListener(named: name)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,151 +1,37 @@
|
|||||||
@preconcurrency import Foundation
|
import Dependencies
|
||||||
import Logging
|
import Logging
|
||||||
import Models
|
import Models
|
||||||
import MQTTNIO
|
import MQTTConnectionManager
|
||||||
import NIO
|
|
||||||
import ServiceLifecycle
|
import ServiceLifecycle
|
||||||
|
|
||||||
// TODO: This may not need to be an actor.
|
|
||||||
|
|
||||||
/// Manages the MQTT broker connection.
|
|
||||||
public actor MQTTConnectionService: Service {
|
public actor MQTTConnectionService: Service {
|
||||||
|
@Dependency(\.mqttConnectionManager) var manager
|
||||||
|
|
||||||
private let cleanSession: Bool
|
private nonisolated let logger: Logger?
|
||||||
public let client: MQTTClient
|
|
||||||
private let continuation: AsyncStream<Event>.Continuation
|
|
||||||
public nonisolated let events: AsyncStream<Event>
|
|
||||||
private let internalEventStream: ConnectionStream
|
|
||||||
nonisolated var logger: Logger { client.logger }
|
|
||||||
// private var shuttingDown = false
|
|
||||||
|
|
||||||
public init(
|
public init(
|
||||||
cleanSession: Bool = true,
|
logger: Logger? = nil
|
||||||
client: MQTTClient
|
|
||||||
) {
|
) {
|
||||||
self.cleanSession = cleanSession
|
self.logger = logger
|
||||||
self.client = client
|
|
||||||
self.internalEventStream = .init()
|
|
||||||
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
|
|
||||||
self.events = stream
|
|
||||||
self.continuation = continuation
|
|
||||||
}
|
}
|
||||||
|
|
||||||
deinit {
|
/// The entry-point of the service which starts the connection
|
||||||
self.logger.debug("MQTTConnectionService is gone.")
|
/// to the MQTT broker and handles graceful shutdown of the
|
||||||
self.internalEventStream.stop()
|
/// connection.
|
||||||
continuation.finish()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The entry-point of the service.
|
|
||||||
///
|
|
||||||
/// This method connects to the MQTT broker and manages the connection.
|
|
||||||
/// It will attempt to gracefully shutdown the connection upon receiving
|
|
||||||
/// `sigterm` signals.
|
|
||||||
public func run() async throws {
|
public func run() async throws {
|
||||||
await withGracefulShutdownHandler {
|
try await withGracefulShutdownHandler {
|
||||||
await withDiscardingTaskGroup { group in
|
try await manager.connect()
|
||||||
group.addTask { await self.connect() }
|
for await event in try manager.stream().cancelOnGracefulShutdown() {
|
||||||
group.addTask {
|
// We don't really need to do anything with the events, so just logging
|
||||||
await self.internalEventStream.start { self.client.isActive() }
|
// for now. But we need to iterate on an async stream for the service to
|
||||||
}
|
// continue to run and handle graceful shutdowns.
|
||||||
for await event in self.internalEventStream.events.cancelOnGracefulShutdown() {
|
logger?.trace("Received connection event: \(event)")
|
||||||
if event == .shuttingDown {
|
|
||||||
self.shutdown()
|
|
||||||
break
|
|
||||||
}
|
|
||||||
self.logger.trace("Sending connection event: \(event)")
|
|
||||||
self.continuation.yield(event)
|
|
||||||
}
|
|
||||||
group.cancelAll()
|
|
||||||
}
|
}
|
||||||
|
// when we reach here we are shutting down, so we shutdown
|
||||||
|
// the manager.
|
||||||
|
manager.shutdown()
|
||||||
} onGracefulShutdown: {
|
} onGracefulShutdown: {
|
||||||
self.logger.trace("Received graceful shutdown.")
|
self.logger?.trace("Received graceful shutdown.")
|
||||||
self.shutdown()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func connect() async {
|
|
||||||
do {
|
|
||||||
try await withThrowingDiscardingTaskGroup { group in
|
|
||||||
group.addTask {
|
|
||||||
try await self.client.connect(cleanSession: self.cleanSession)
|
|
||||||
}
|
|
||||||
client.addCloseListener(named: "\(Self.self)") { _ in
|
|
||||||
Task {
|
|
||||||
self.logger.debug("Connection closed.")
|
|
||||||
self.logger.debug("Reconnecting...")
|
|
||||||
await self.connect()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.logger.debug("Connection successful.")
|
|
||||||
self.continuation.yield(.connected)
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
logger.trace("Failed to connect: \(error)")
|
|
||||||
continuation.yield(.disconnected)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private nonisolated func shutdown() {
|
|
||||||
logger.debug("Begin shutting down MQTT broker connection.")
|
|
||||||
client.removeCloseListener(named: "\(Self.self)")
|
|
||||||
internalEventStream.stop()
|
|
||||||
_ = client.disconnect()
|
|
||||||
try? client.syncShutdownGracefully()
|
|
||||||
continuation.finish()
|
|
||||||
logger.info("MQTT broker connection closed.")
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
extension MQTTConnectionService {
|
|
||||||
|
|
||||||
public enum Event: Sendable {
|
|
||||||
case connected
|
|
||||||
case disconnected
|
|
||||||
case shuttingDown
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: This functionality can probably move into the connection service.
|
|
||||||
|
|
||||||
private final class ConnectionStream: Sendable {
|
|
||||||
|
|
||||||
// private var cancellable: AnyCancellable?
|
|
||||||
private let continuation: AsyncStream<MQTTConnectionService.Event>.Continuation
|
|
||||||
let events: AsyncStream<MQTTConnectionService.Event>
|
|
||||||
|
|
||||||
init() {
|
|
||||||
let (stream, continuation) = AsyncStream.makeStream(of: MQTTConnectionService.Event.self)
|
|
||||||
self.events = stream
|
|
||||||
self.continuation = continuation
|
|
||||||
}
|
|
||||||
|
|
||||||
deinit {
|
|
||||||
stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func start(isActive connectionIsActive: @escaping () -> Bool) async {
|
|
||||||
try? await Task.sleep(for: .seconds(1))
|
|
||||||
let event: MQTTConnectionService.Event = connectionIsActive()
|
|
||||||
? .connected
|
|
||||||
: .disconnected
|
|
||||||
|
|
||||||
continuation.yield(event)
|
|
||||||
// cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
|
|
||||||
// .autoconnect()
|
|
||||||
// .sink { [weak self] (_: Date) in
|
|
||||||
// let event: MQTTConnectionService.Event = connectionIsActive()
|
|
||||||
// ? .connected
|
|
||||||
// : .disconnected
|
|
||||||
//
|
|
||||||
// self?.continuation.yield(event)
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
func stop() {
|
|
||||||
continuation.yield(.shuttingDown)
|
|
||||||
continuation.finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,10 +56,11 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
|
|||||||
public var dewPoint: DewPoint? {
|
public var dewPoint: DewPoint? {
|
||||||
get async {
|
get async {
|
||||||
guard let temperature = temperature,
|
guard let temperature = temperature,
|
||||||
let humidity = humidity
|
let humidity = humidity,
|
||||||
|
!temperature.value.isNaN,
|
||||||
|
!humidity.value.isNaN
|
||||||
else { return nil }
|
else { return nil }
|
||||||
return try? await psychrometrics.dewPoint(.dryBulb(temperature, relativeHumidity: humidity))
|
return try? await psychrometrics.dewPoint(.dryBulb(temperature, relativeHumidity: humidity))
|
||||||
// return .init(dryBulb: temperature, humidity: humidity)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -67,12 +68,13 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
|
|||||||
public var enthalpy: EnthalpyOf<MoistAir>? {
|
public var enthalpy: EnthalpyOf<MoistAir>? {
|
||||||
get async {
|
get async {
|
||||||
guard let temperature = temperature,
|
guard let temperature = temperature,
|
||||||
let humidity = humidity
|
let humidity = humidity,
|
||||||
|
!temperature.value.isNaN,
|
||||||
|
!humidity.value.isNaN
|
||||||
else { return nil }
|
else { return nil }
|
||||||
return try? await psychrometrics.enthalpy.moistAir(
|
return try? await psychrometrics.enthalpy.moistAir(
|
||||||
.dryBulb(temperature, relativeHumidity: humidity, altitude: altitude)
|
.dryBulb(temperature, relativeHumidity: humidity, altitude: altitude)
|
||||||
)
|
)
|
||||||
// return .init(dryBulb: temperature, humidity: humidity, altitude: altitude)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -129,10 +131,10 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
|
|||||||
prefix = "\(prefix.dropLast())"
|
prefix = "\(prefix.dropLast())"
|
||||||
}
|
}
|
||||||
self.init(
|
self.init(
|
||||||
dewPoint: "\(prefix)/sensors/\(location.rawValue)_dew_point/state",
|
dewPoint: "\(prefix)/sensor/\(location.rawValue)_dew_point/state",
|
||||||
enthalpy: "\(prefix)/sensors/\(location.rawValue)_enthalpy/state",
|
enthalpy: "\(prefix)/sensor/\(location.rawValue)_enthalpy/state",
|
||||||
humidity: "\(prefix)/sensors/\(location.rawValue)_humidity/state",
|
humidity: "\(prefix)/sensor/\(location.rawValue)_humidity/state",
|
||||||
temperature: "\(prefix)/sensors/\(location.rawValue)_temperature/state"
|
temperature: "\(prefix)/sensor/\(location.rawValue)_temperature/state"
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
/// This allows values to only publish changes if they have changed since the
|
/// This allows values to only publish changes if they have changed since the
|
||||||
/// last time they were recieved.
|
/// last time they were recieved.
|
||||||
@propertyWrapper
|
@propertyWrapper
|
||||||
public struct TrackedChanges<Value> {
|
public struct TrackedChanges<Value: Sendable>: Sendable {
|
||||||
|
|
||||||
/// The current tracking state.
|
/// The current tracking state.
|
||||||
private var tracking: TrackingState
|
private var tracking: TrackingState
|
||||||
@@ -12,7 +12,7 @@ public struct TrackedChanges<Value> {
|
|||||||
private var value: Value
|
private var value: Value
|
||||||
|
|
||||||
/// Used to check if a new value is equal to an old value.
|
/// Used to check if a new value is equal to an old value.
|
||||||
private var isEqual: (Value, Value) -> Bool
|
private var isEqual: @Sendable (Value, Value) -> Bool
|
||||||
|
|
||||||
/// Access to the underlying property that we are wrapping.
|
/// Access to the underlying property that we are wrapping.
|
||||||
public var wrappedValue: Value {
|
public var wrappedValue: Value {
|
||||||
@@ -35,7 +35,7 @@ public struct TrackedChanges<Value> {
|
|||||||
public init(
|
public init(
|
||||||
wrappedValue: Value,
|
wrappedValue: Value,
|
||||||
needsProcessed: Bool = false,
|
needsProcessed: Bool = false,
|
||||||
isEqual: @escaping (Value, Value) -> Bool
|
isEqual: @escaping @Sendable (Value, Value) -> Bool
|
||||||
) {
|
) {
|
||||||
self.value = wrappedValue
|
self.value = wrappedValue
|
||||||
self.tracking = needsProcessed ? .needsProcessed : .hasProcessed
|
self.tracking = needsProcessed ? .needsProcessed : .hasProcessed
|
||||||
@@ -85,7 +85,9 @@ extension TrackedChanges: Equatable where Value: Equatable {
|
|||||||
wrappedValue: Value,
|
wrappedValue: Value,
|
||||||
needsProcessed: Bool = false
|
needsProcessed: Bool = false
|
||||||
) {
|
) {
|
||||||
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed, isEqual: ==)
|
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed) {
|
||||||
|
$0 == $1
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -96,5 +98,3 @@ extension TrackedChanges: Hashable where Value: Hashable {
|
|||||||
hasher.combine(needsProcessed)
|
hasher.combine(needsProcessed)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extension TrackedChanges: Sendable where Value: Sendable {}
|
|
||||||
|
|||||||
@@ -1,6 +1,3 @@
|
|||||||
import Logging
|
|
||||||
import Models
|
|
||||||
import MQTTNIO
|
|
||||||
import NIO
|
import NIO
|
||||||
import NIOFoundationCompat
|
import NIOFoundationCompat
|
||||||
import PsychrometricClient
|
import PsychrometricClient
|
||||||
|
|||||||
@@ -3,305 +3,160 @@ import DependenciesMacros
|
|||||||
import Foundation
|
import Foundation
|
||||||
import Logging
|
import Logging
|
||||||
import Models
|
import Models
|
||||||
import MQTTConnectionService
|
import MQTTNIO
|
||||||
@preconcurrency import MQTTNIO
|
|
||||||
import NIO
|
import NIO
|
||||||
import PsychrometricClient
|
import PsychrometricClient
|
||||||
import ServiceLifecycle
|
import ServiceLifecycle
|
||||||
|
import TopicDependencies
|
||||||
|
|
||||||
@DependencyClient
|
/// Service that is responsible for listening to changes of the temperature and humidity
|
||||||
public struct SensorsClient: Sendable {
|
/// sensors, then publishing back the calculated dew-point temperature and enthalpy for
|
||||||
|
/// the sensor location.
|
||||||
|
///
|
||||||
|
///
|
||||||
|
public actor SensorsService: Service {
|
||||||
|
|
||||||
public var listen: @Sendable ([String]) async throws -> AsyncStream<MQTTPublishInfo>
|
@Dependency(\.topicListener) var topicListener
|
||||||
public var logger: Logger?
|
@Dependency(\.topicPublisher) var topicPublisher
|
||||||
public var publish: @Sendable (Double, String) async throws -> Void
|
|
||||||
public var shutdown: @Sendable () -> Void = {}
|
|
||||||
|
|
||||||
public func listen(to topics: [String]) async throws -> AsyncStream<MQTTPublishInfo> {
|
/// The logger to use for the service.
|
||||||
try await listen(topics)
|
private let logger: Logger?
|
||||||
}
|
|
||||||
|
|
||||||
public func publish(_ value: Double, to topic: String) async throws {
|
/// The sensors that we are listening for updates to, so
|
||||||
try await publish(value, topic)
|
/// that we can calculate the dew-point temperature and enthalpy
|
||||||
}
|
/// values to publish back to the MQTT broker.
|
||||||
}
|
var sensors: [TemperatureAndHumiditySensor]
|
||||||
|
|
||||||
extension SensorsClient: TestDependencyKey {
|
var topics: [String] {
|
||||||
public static var testValue: SensorsClient {
|
|
||||||
Self()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public extension DependencyValues {
|
|
||||||
var sensorsClient: SensorsClient {
|
|
||||||
get { self[SensorsClient.self] }
|
|
||||||
set { self[SensorsClient.self] = newValue }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public actor SensorsService2: Service {
|
|
||||||
|
|
||||||
@Dependency(\.sensorsClient) var client
|
|
||||||
|
|
||||||
private var sensors: [TemperatureAndHumiditySensor]
|
|
||||||
|
|
||||||
public init(
|
|
||||||
sensors: [TemperatureAndHumiditySensor]
|
|
||||||
) {
|
|
||||||
self.sensors = sensors
|
|
||||||
}
|
|
||||||
|
|
||||||
public func run() async throws {
|
|
||||||
guard sensors.count > 0 else {
|
|
||||||
throw SensorCountError()
|
|
||||||
}
|
|
||||||
|
|
||||||
let stream = try await client.listen(to: topics)
|
|
||||||
|
|
||||||
do {
|
|
||||||
try await withGracefulShutdownHandler {
|
|
||||||
try await withThrowingDiscardingTaskGroup { group in
|
|
||||||
for await result in stream.cancelOnGracefulShutdown() {
|
|
||||||
group.addTask { try await self.handleResult(result) }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} onGracefulShutdown: {
|
|
||||||
Task {
|
|
||||||
await self.client.logger?.trace("Received graceful shutdown.")
|
|
||||||
try? await self.publishUpdates()
|
|
||||||
await self.client.shutdown()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
client.logger?.trace("Error: \(error)")
|
|
||||||
client.shutdown()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private var topics: [String] {
|
|
||||||
sensors.reduce(into: [String]()) { array, sensor in
|
sensors.reduce(into: [String]()) { array, sensor in
|
||||||
array.append(sensor.topics.temperature)
|
array.append(sensor.topics.temperature)
|
||||||
array.append(sensor.topics.humidity)
|
array.append(sensor.topics.humidity)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func handleResult(_ result: MQTTPublishInfo) async throws {
|
/// Create a new sensors service that listens to the passed in
|
||||||
let topic = result.topicName
|
/// sensors.
|
||||||
client.logger?.trace("Begin handling result for topic: \(topic)")
|
///
|
||||||
|
/// - Note: The service will fail to start if the array of sensors is not greater than 0.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - sensors: The sensors to listen for changes to.
|
||||||
|
/// - logger: An optional logger to use.
|
||||||
|
public init(
|
||||||
|
sensors: [TemperatureAndHumiditySensor],
|
||||||
|
logger: Logger? = nil
|
||||||
|
) {
|
||||||
|
self.sensors = sensors
|
||||||
|
self.logger = logger
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Start the service with graceful shutdown, which will attempt to publish
|
||||||
|
/// any pending changes to the MQTT broker, upon a shutdown signal.
|
||||||
|
public func run() async throws {
|
||||||
|
precondition(sensors.count > 0, "Sensors should not be empty.")
|
||||||
|
|
||||||
|
let stream = try await makeStream()
|
||||||
|
|
||||||
|
await withGracefulShutdownHandler {
|
||||||
|
await withDiscardingTaskGroup { group in
|
||||||
|
for await result in stream.cancelOnGracefulShutdown() {
|
||||||
|
logger?.trace("Received result for topic: \(result.topic)")
|
||||||
|
group.addTask { await self.handleResult(result) }
|
||||||
|
}
|
||||||
|
// group.cancelAll()
|
||||||
|
}
|
||||||
|
} onGracefulShutdown: {
|
||||||
|
Task {
|
||||||
|
self.logger?.trace("Received graceful shutdown.")
|
||||||
|
try? await self.publishUpdates()
|
||||||
|
await self.topicListener.shutdown()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> {
|
||||||
|
try await topicListener.listen(to: topics)
|
||||||
|
// ignore errors, so that we continue to listen, but log them
|
||||||
|
// for debugging purposes.
|
||||||
|
.compactMap { result in
|
||||||
|
switch result {
|
||||||
|
case let .failure(error):
|
||||||
|
self.logger?.trace("Received error listening for sensors: \(error)")
|
||||||
|
return nil
|
||||||
|
case let .success(info):
|
||||||
|
return (info.payload, info.topicName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// ignore duplicate values, to prevent publishing dew-point and enthalpy
|
||||||
|
// changes to frequently.
|
||||||
|
.removeDuplicates { lhs, rhs in
|
||||||
|
lhs.buffer == rhs.buffer
|
||||||
|
&& lhs.topic == rhs.topic
|
||||||
|
}
|
||||||
|
.eraseToStream()
|
||||||
|
}
|
||||||
|
|
||||||
|
private func handleResult(_ result: (buffer: ByteBuffer, topic: String)) async {
|
||||||
|
do {
|
||||||
|
let topic = result.topic
|
||||||
|
assert(topics.contains(topic))
|
||||||
|
logger?.trace("Begin handling result for topic: \(topic)")
|
||||||
|
|
||||||
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
|
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
|
||||||
var buffer = result.payload
|
var buffer = result.buffer
|
||||||
return V(buffer: &buffer)
|
return V(buffer: &buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
if topic.contains("temperature") {
|
if topic.contains("temperature") {
|
||||||
client.logger?.trace("Begin handling temperature result.")
|
logger?.trace("Begin handling temperature result.")
|
||||||
guard let temperature = decode(DryBulb.self) else {
|
guard let temperature = decode(DryBulb.self) else {
|
||||||
client.logger?.trace("Failed to decode temperature: \(result.payload)")
|
logger?.trace("Failed to decode temperature: \(result.buffer)")
|
||||||
throw DecodingError()
|
throw DecodingError()
|
||||||
}
|
}
|
||||||
client.logger?.trace("Decoded temperature: \(temperature)")
|
logger?.trace("Decoded temperature: \(temperature)")
|
||||||
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
|
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
|
||||||
|
|
||||||
} else if topic.contains("humidity") {
|
} else if topic.contains("humidity") {
|
||||||
client.logger?.trace("Begin handling humidity result.")
|
logger?.trace("Begin handling humidity result.")
|
||||||
guard let humidity = decode(RelativeHumidity.self) else {
|
guard let humidity = decode(RelativeHumidity.self) else {
|
||||||
client.logger?.trace("Failed to decode humidity: \(result.payload)")
|
logger?.trace("Failed to decode humidity: \(result.buffer)")
|
||||||
throw DecodingError()
|
throw DecodingError()
|
||||||
}
|
}
|
||||||
client.logger?.trace("Decoded humidity: \(humidity)")
|
logger?.trace("Decoded humidity: \(humidity)")
|
||||||
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
|
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
|
||||||
} else {
|
|
||||||
client.logger?.error("Received unexpected topic, expected topic to contain 'temperature' or 'humidity'!")
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try await publishUpdates()
|
try await publishUpdates()
|
||||||
client.logger?.trace("Done handling result for topic: \(topic)")
|
logger?.trace("Done handling result for topic: \(topic)")
|
||||||
|
} catch {
|
||||||
|
logger?.error("Received error: \(error)")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func publish(_ double: Double?, to topic: String) async throws {
|
private func publish(_ double: Double?, to topic: String) async throws {
|
||||||
guard let double else { return }
|
guard let double else { return }
|
||||||
try await client.publish(double, to: topic)
|
try await topicPublisher.publish(
|
||||||
client.logger?.trace("Published update to topic: \(topic)")
|
to: topic,
|
||||||
|
payload: ByteBufferAllocator().buffer(string: "\(double)"),
|
||||||
|
qos: .exactlyOnce,
|
||||||
|
retain: true
|
||||||
|
)
|
||||||
|
logger?.trace("Published update to topic: \(topic)")
|
||||||
}
|
}
|
||||||
|
|
||||||
private func publishUpdates() async throws {
|
private func publishUpdates() async throws {
|
||||||
for sensor in sensors.filter(\.needsProcessed) {
|
for sensor in sensors.filter(\.needsProcessed) {
|
||||||
try await publish(sensor.dewPoint?.value, to: sensor.topics.dewPoint)
|
try await publish(sensor.dewPoint?.value, to: sensor.topics.dewPoint)
|
||||||
try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy)
|
try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy)
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public actor SensorsService: Service {
|
|
||||||
private var sensors: [TemperatureAndHumiditySensor]
|
|
||||||
private let client: MQTTClient
|
|
||||||
private let events: @Sendable () -> AsyncStream<MQTTConnectionService.Event>
|
|
||||||
nonisolated var logger: Logger { client.logger }
|
|
||||||
private var shuttingDown: Bool = false
|
|
||||||
|
|
||||||
public init(
|
|
||||||
client: MQTTClient,
|
|
||||||
events: @Sendable @escaping () -> AsyncStream<MQTTConnectionService.Event>,
|
|
||||||
sensors: [TemperatureAndHumiditySensor]
|
|
||||||
) {
|
|
||||||
self.client = client
|
|
||||||
self.events = events
|
|
||||||
self.sensors = sensors
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The entry-point of the service.
|
|
||||||
///
|
|
||||||
/// This method is called to start the service and begin
|
|
||||||
/// listening for sensor value changes then publishing the dew-point
|
|
||||||
/// and enthalpy values of the sensors.
|
|
||||||
public func run() async throws {
|
|
||||||
do {
|
|
||||||
try await withGracefulShutdownHandler {
|
|
||||||
try await withThrowingDiscardingTaskGroup { group in
|
|
||||||
client.addPublishListener(named: "\(Self.self)") { result in
|
|
||||||
if self.shuttingDown {
|
|
||||||
self.logger.trace("Shutting down.")
|
|
||||||
} else if !self.client.isActive() {
|
|
||||||
self.logger.trace("Client is not currently active")
|
|
||||||
} else {
|
|
||||||
Task { try await self.handleResult(result) }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for await event in self.events().cancelOnGracefulShutdown() {
|
|
||||||
logger.trace("Received event: \(event)")
|
|
||||||
if event == .shuttingDown {
|
|
||||||
self.setIsShuttingDown()
|
|
||||||
} else if event == .connected {
|
|
||||||
group.addTask { try await self.subscribeToSensors() }
|
|
||||||
} else {
|
|
||||||
group.addTask { await self.unsubscribeToSensors() }
|
|
||||||
group.addTask { try? await Task.sleep(for: .milliseconds(100)) }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} onGracefulShutdown: {
|
|
||||||
// do something.
|
|
||||||
self.logger.debug("Received graceful shutdown.")
|
|
||||||
Task { [weak self] in await self?.setIsShuttingDown() }
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
// WARN: We always get an MQTTNIO `noConnection` error here, which generally is not an issue,
|
|
||||||
// but causes service ServiceLifecycle to fail, so currently just ignoring errors that are thrown.
|
|
||||||
// However we do receive the unsubscribe message back from the MQTT broker, so it is likely safe
|
|
||||||
// to ignore the `noConnection` error.
|
|
||||||
logger.trace("Run error: \(error)")
|
|
||||||
// throw error
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private func setIsShuttingDown() {
|
|
||||||
logger.debug("Received shut down event.")
|
|
||||||
Task { try await publishUpdates() }
|
|
||||||
Task { await self.unsubscribeToSensors() }
|
|
||||||
shuttingDown = true
|
|
||||||
client.removePublishListener(named: "\(Self.self)")
|
|
||||||
}
|
|
||||||
|
|
||||||
private func handleResult(
|
|
||||||
_ result: Result<MQTTPublishInfo, any Error>
|
|
||||||
) async throws {
|
|
||||||
logger.trace("Begin handling result")
|
|
||||||
do {
|
|
||||||
switch result {
|
|
||||||
case let .failure(error):
|
|
||||||
logger.debug("Failed receiving sensor: \(error)")
|
|
||||||
throw error
|
|
||||||
case let .success(value):
|
|
||||||
// do something.
|
|
||||||
let topic = value.topicName
|
|
||||||
logger.trace("Received new value for topic: \(topic)")
|
|
||||||
if topic.contains("temperature") {
|
|
||||||
// do something.
|
|
||||||
var buffer = value.payload
|
|
||||||
guard let temperature = DryBulb(buffer: &buffer) else {
|
|
||||||
logger.trace("Decoding error for topic: \(topic)")
|
|
||||||
throw DecodingError()
|
|
||||||
}
|
|
||||||
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
|
|
||||||
try await publishUpdates()
|
|
||||||
|
|
||||||
} else if topic.contains("humidity") {
|
|
||||||
var buffer = value.payload
|
|
||||||
// Decode and update the temperature value
|
|
||||||
guard let humidity = RelativeHumidity(buffer: &buffer) else {
|
|
||||||
logger.debug("Failed to decode humidity from buffer: \(buffer)")
|
|
||||||
throw DecodingError()
|
|
||||||
}
|
|
||||||
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
|
|
||||||
try await publishUpdates()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
logger.trace("Handle Result error: \(error)")
|
|
||||||
throw error
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private func subscribeToSensors() async throws {
|
|
||||||
for sensor in sensors {
|
|
||||||
_ = try await client.subscribe(to: [
|
|
||||||
MQTTSubscribeInfo(topicFilter: sensor.topics.temperature, qos: .atLeastOnce),
|
|
||||||
MQTTSubscribeInfo(topicFilter: sensor.topics.humidity, qos: .atLeastOnce)
|
|
||||||
])
|
|
||||||
logger.debug("Subscribed to sensor: \(sensor.location)")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private func unsubscribeToSensors() async {
|
|
||||||
logger.trace("Begin unsubscribe to sensors.")
|
|
||||||
guard client.isActive() else {
|
|
||||||
logger.debug("Client is not active, skipping.")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
do {
|
|
||||||
let topics = sensors.reduce(into: [String]()) { array, sensor in
|
|
||||||
array.append(sensor.topics.temperature)
|
|
||||||
array.append(sensor.topics.humidity)
|
|
||||||
}
|
|
||||||
try await client.unsubscribe(from: topics)
|
|
||||||
logger.trace("Unsubscribed from sensors.")
|
|
||||||
} catch {
|
|
||||||
logger.trace("Unsubscribe error: \(error)")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private func publish(double: Double?, to topic: String) async throws {
|
|
||||||
guard client.isActive() else { return }
|
|
||||||
guard let double else { return }
|
|
||||||
let rounded = round(double * 100) / 100
|
|
||||||
logger.debug("Publishing \(rounded), to: \(topic)")
|
|
||||||
try await client.publish(
|
|
||||||
to: topic,
|
|
||||||
payload: ByteBufferAllocator().buffer(string: "\(rounded)"),
|
|
||||||
qos: .exactlyOnce,
|
|
||||||
retain: true
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
private func publishUpdates() async throws {
|
|
||||||
for sensor in sensors.filter(\.needsProcessed) {
|
|
||||||
try await publish(double: sensor.dewPoint?.value, to: sensor.topics.dewPoint)
|
|
||||||
try await publish(double: sensor.enthalpy?.value, to: sensor.topics.enthalpy)
|
|
||||||
try sensors.hasProcessed(sensor)
|
try sensors.hasProcessed(sensor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MARK: - Errors
|
// MARK: - Errors
|
||||||
|
|
||||||
struct DecodingError: Error {}
|
struct DecodingError: Error {}
|
||||||
struct MQTTClientNotConnected: Error {}
|
struct SensorNotFoundError: Error {}
|
||||||
struct NotFoundError: Error {}
|
|
||||||
struct SensorExists: Error {}
|
|
||||||
struct SensorCountError: Error {}
|
|
||||||
|
|
||||||
// MARK: - Helpers
|
// MARK: - Helpers
|
||||||
|
|
||||||
@@ -319,14 +174,14 @@ private extension Array where Element == TemperatureAndHumiditySensor {
|
|||||||
with value: V
|
with value: V
|
||||||
) throws {
|
) throws {
|
||||||
guard let index = firstIndex(where: { $0.topics.contains(topic) }) else {
|
guard let index = firstIndex(where: { $0.topics.contains(topic) }) else {
|
||||||
throw NotFoundError()
|
throw SensorNotFoundError()
|
||||||
}
|
}
|
||||||
self[index][keyPath: keyPath] = value
|
self[index][keyPath: keyPath] = value
|
||||||
}
|
}
|
||||||
|
|
||||||
mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws {
|
mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws {
|
||||||
guard let index = firstIndex(where: { $0.id == sensor.id }) else {
|
guard let index = firstIndex(where: { $0.id == sensor.id }) else {
|
||||||
throw NotFoundError()
|
throw SensorNotFoundError()
|
||||||
}
|
}
|
||||||
self[index].needsProcessed = false
|
self[index].needsProcessed = false
|
||||||
}
|
}
|
||||||
|
|||||||
183
Sources/TopicDependencies/TopicListener.swift
Normal file
183
Sources/TopicDependencies/TopicListener.swift
Normal file
@@ -0,0 +1,183 @@
|
|||||||
|
import Dependencies
|
||||||
|
import DependenciesMacros
|
||||||
|
import Foundation
|
||||||
|
import MQTTNIO
|
||||||
|
|
||||||
|
/// A dependency that can generate an async stream of changes to the given topics.
|
||||||
|
///
|
||||||
|
/// - Note: This type only conforms to ``TestDependencyKey`` because it requires an MQTTClient
|
||||||
|
/// to generate the live dependency.
|
||||||
|
@DependencyClient
|
||||||
|
public struct TopicListener: Sendable {
|
||||||
|
|
||||||
|
public typealias Stream = AsyncStream<Result<MQTTPublishInfo, MQTTListenResultError>>
|
||||||
|
|
||||||
|
/// Create an async stream that listens for changes to the given topics.
|
||||||
|
private var _listen: @Sendable ([String], MQTTQoS) async throws -> Stream
|
||||||
|
|
||||||
|
/// Shutdown the listener stream.
|
||||||
|
public var shutdown: @Sendable () -> Void
|
||||||
|
|
||||||
|
/// Create a new topic listener.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - listen: Generate an async stream of changes for the given topics.
|
||||||
|
/// - shutdown: Shutdown the topic listener stream.
|
||||||
|
public init(
|
||||||
|
listen: @Sendable @escaping ([String], MQTTQoS) async throws -> Stream,
|
||||||
|
shutdown: @Sendable @escaping () -> Void
|
||||||
|
) {
|
||||||
|
self._listen = listen
|
||||||
|
self.shutdown = shutdown
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create an async stream that listens for changes to the given topics.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - topics: The topics to listen for changes to.
|
||||||
|
/// - qos: The MQTTQoS for the subscription.
|
||||||
|
public func listen(
|
||||||
|
to topics: [String],
|
||||||
|
qos: MQTTQoS = .atLeastOnce
|
||||||
|
) async throws -> Stream {
|
||||||
|
try await _listen(topics, qos)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create an async stream that listens for changes to the given topics.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - topics: The topics to listen for changes to.
|
||||||
|
/// - qos: The MQTTQoS for the subscription.
|
||||||
|
public func listen(
|
||||||
|
_ topics: String...,
|
||||||
|
qos: MQTTQoS = .atLeastOnce
|
||||||
|
) async throws -> Stream {
|
||||||
|
try await listen(to: topics, qos: qos)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create the live implementation of the topic listener with the given MQTTClient.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - client: The MQTTClient to use.
|
||||||
|
public static func live(client: MQTTClient) -> Self {
|
||||||
|
let listener = MQTTTopicListener(client: client)
|
||||||
|
return .init(
|
||||||
|
listen: { try await listener.listen($0, $1) },
|
||||||
|
shutdown: { listener.shutdown() }
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extension TopicListener: TestDependencyKey {
|
||||||
|
public static var testValue: TopicListener { Self() }
|
||||||
|
}
|
||||||
|
|
||||||
|
public extension DependencyValues {
|
||||||
|
var topicListener: TopicListener {
|
||||||
|
get { self[TopicListener.self] }
|
||||||
|
set { self[TopicListener.self] = newValue }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MARK: - Helpers
|
||||||
|
|
||||||
|
private actor MQTTTopicListener {
|
||||||
|
private let client: MQTTClient
|
||||||
|
private let continuation: TopicListener.Stream.Continuation
|
||||||
|
private let name: String
|
||||||
|
let stream: TopicListener.Stream
|
||||||
|
private var shuttingDown: Bool = false
|
||||||
|
|
||||||
|
init(
|
||||||
|
client: MQTTClient
|
||||||
|
) {
|
||||||
|
let (stream, continuation) = TopicListener.Stream.makeStream()
|
||||||
|
self.client = client
|
||||||
|
self.continuation = continuation
|
||||||
|
self.name = UUID().uuidString
|
||||||
|
self.stream = stream
|
||||||
|
}
|
||||||
|
|
||||||
|
deinit {
|
||||||
|
if !shuttingDown {
|
||||||
|
let message = """
|
||||||
|
Shutdown was not called on topic listener. This could lead to potential errors or
|
||||||
|
the stream never ending.
|
||||||
|
|
||||||
|
Please ensure that you call shutdown on the listener.
|
||||||
|
"""
|
||||||
|
client.logger.warning("\(message)")
|
||||||
|
continuation.finish()
|
||||||
|
}
|
||||||
|
client.removePublishListener(named: name)
|
||||||
|
client.removeShutdownListener(named: name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func listen(
|
||||||
|
_ topics: [String],
|
||||||
|
_ qos: MQTTQoS = .atLeastOnce
|
||||||
|
) async throws(TopicListenerError) -> TopicListener.Stream {
|
||||||
|
var sleepTimes = 0
|
||||||
|
|
||||||
|
while !client.isActive() {
|
||||||
|
guard sleepTimes < 10 else {
|
||||||
|
throw .connectionTimeout
|
||||||
|
}
|
||||||
|
try? await Task.sleep(for: .milliseconds(100))
|
||||||
|
sleepTimes += 1
|
||||||
|
}
|
||||||
|
|
||||||
|
client.logger.trace("Client is active, begin subscribing to topics.")
|
||||||
|
|
||||||
|
let subscription = try? await client.subscribe(to: topics.map {
|
||||||
|
MQTTSubscribeInfo(topicFilter: $0, qos: qos)
|
||||||
|
})
|
||||||
|
|
||||||
|
guard subscription != nil else {
|
||||||
|
client.logger.error("Error subscribing to topics: \(topics)")
|
||||||
|
throw .failedToSubscribe
|
||||||
|
}
|
||||||
|
|
||||||
|
client.logger.trace("Done subscribing, begin listening to topics.")
|
||||||
|
|
||||||
|
client.addPublishListener(named: name) { result in
|
||||||
|
switch result {
|
||||||
|
case let .failure(error):
|
||||||
|
self.client.logger.error("Received error while listening: \(error)")
|
||||||
|
self.continuation.yield(.failure(.init(error)))
|
||||||
|
case let .success(publishInfo):
|
||||||
|
if topics.contains(publishInfo.topicName) {
|
||||||
|
self.client.logger.trace("Recieved new value for topic: \(publishInfo.topicName)")
|
||||||
|
self.continuation.yield(.success(publishInfo))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return stream
|
||||||
|
}
|
||||||
|
|
||||||
|
private func setIsShuttingDown() {
|
||||||
|
shuttingDown = true
|
||||||
|
}
|
||||||
|
|
||||||
|
nonisolated func shutdown() {
|
||||||
|
client.logger.trace("Closing topic listener...")
|
||||||
|
continuation.finish()
|
||||||
|
client.removePublishListener(named: name)
|
||||||
|
client.removeShutdownListener(named: name)
|
||||||
|
Task { await self.setIsShuttingDown() }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public enum TopicListenerError: Error {
|
||||||
|
case connectionTimeout
|
||||||
|
case failedToSubscribe
|
||||||
|
}
|
||||||
|
|
||||||
|
public struct MQTTListenResultError: Error {
|
||||||
|
let underlyingError: any Error
|
||||||
|
|
||||||
|
init(_ underlyingError: any Error) {
|
||||||
|
self.underlyingError = underlyingError
|
||||||
|
}
|
||||||
|
}
|
||||||
117
Sources/TopicDependencies/TopicPublisher.swift
Normal file
117
Sources/TopicDependencies/TopicPublisher.swift
Normal file
@@ -0,0 +1,117 @@
|
|||||||
|
import Dependencies
|
||||||
|
import DependenciesMacros
|
||||||
|
import MQTTNIO
|
||||||
|
import NIO
|
||||||
|
|
||||||
|
/// A dependency that is responsible for publishing values to an MQTT broker.
|
||||||
|
///
|
||||||
|
/// - Note: This dependency only conforms to `TestDependencyKey` because it
|
||||||
|
/// requires an active `MQTTClient` to generate the live dependency.
|
||||||
|
@DependencyClient
|
||||||
|
public struct TopicPublisher: Sendable {
|
||||||
|
|
||||||
|
private var _publish: @Sendable (PublishRequest) async throws -> Void
|
||||||
|
|
||||||
|
/// Create a new topic publisher.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - publish: Handle the publish request.
|
||||||
|
public init(
|
||||||
|
publish: @Sendable @escaping (PublishRequest) async throws -> Void
|
||||||
|
) {
|
||||||
|
self._publish = publish
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Publish a new value to the given topic.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - topicName: The topic to publish the new value to.
|
||||||
|
/// - payload: The value to publish.
|
||||||
|
/// - qos: The MQTTQoS.
|
||||||
|
/// - retain: The retain flag.
|
||||||
|
public func publish(
|
||||||
|
to topicName: String,
|
||||||
|
payload: ByteBuffer,
|
||||||
|
qos: MQTTQoS,
|
||||||
|
retain: Bool = false
|
||||||
|
) async throws {
|
||||||
|
try await _publish(.init(
|
||||||
|
topicName: topicName,
|
||||||
|
payload: payload,
|
||||||
|
qos: qos,
|
||||||
|
retain: retain
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create the live topic publisher with the given `MQTTClient`.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - client: The mqtt broker client to use.
|
||||||
|
public static func live(client: MQTTClient) -> Self {
|
||||||
|
.init(
|
||||||
|
publish: { request in
|
||||||
|
guard client.isActive() else {
|
||||||
|
client.logger.trace("Client is not connected, unable to publish to \(request.topicName)")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
client.logger.trace("Begin publishing to topic: \(request.topicName)")
|
||||||
|
defer { client.logger.trace("Done publishing to topic: \(request.topicName)") }
|
||||||
|
try await client.publish(
|
||||||
|
to: request.topicName,
|
||||||
|
payload: request.payload,
|
||||||
|
qos: request.qos,
|
||||||
|
retain: request.retain
|
||||||
|
)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Represents the parameters required to publish a new value to the
|
||||||
|
/// MQTT broker.
|
||||||
|
public struct PublishRequest: Equatable, Sendable {
|
||||||
|
|
||||||
|
/// The topic to publish the new value to.
|
||||||
|
public let topicName: String
|
||||||
|
|
||||||
|
/// The value to publish.
|
||||||
|
public let payload: ByteBuffer
|
||||||
|
|
||||||
|
/// The qos of the request.
|
||||||
|
public let qos: MQTTQoS
|
||||||
|
|
||||||
|
/// The retain flag for the request.
|
||||||
|
public let retain: Bool
|
||||||
|
|
||||||
|
/// Create a new publish request.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - topicName: The topic to publish to.
|
||||||
|
/// - payload: The value to publish.
|
||||||
|
/// - qos: The qos of the request.
|
||||||
|
/// - retain: The retain flag of the request.
|
||||||
|
public init(
|
||||||
|
topicName: String,
|
||||||
|
payload: ByteBuffer,
|
||||||
|
qos: MQTTQoS,
|
||||||
|
retain: Bool
|
||||||
|
) {
|
||||||
|
self.topicName = topicName
|
||||||
|
self.payload = payload
|
||||||
|
self.qos = qos
|
||||||
|
self.retain = retain
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extension TopicPublisher: TestDependencyKey {
|
||||||
|
public static var testValue: TopicPublisher { Self() }
|
||||||
|
}
|
||||||
|
|
||||||
|
public extension DependencyValues {
|
||||||
|
|
||||||
|
/// A dependency that is responsible for publishing values to an MQTT broker.
|
||||||
|
var topicPublisher: TopicPublisher {
|
||||||
|
get { self[TopicPublisher.self] }
|
||||||
|
set { self[TopicPublisher.self] = newValue }
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,15 +1,19 @@
|
|||||||
|
import Dependencies
|
||||||
import Foundation
|
import Foundation
|
||||||
import Logging
|
import Logging
|
||||||
import Models
|
import Models
|
||||||
|
import MQTTConnectionManager
|
||||||
import MQTTConnectionService
|
import MQTTConnectionService
|
||||||
import MQTTNIO
|
import MQTTNIO
|
||||||
import NIO
|
import NIO
|
||||||
import PsychrometricClientLive
|
import PsychrometricClientLive
|
||||||
import SensorsService
|
import SensorsService
|
||||||
import ServiceLifecycle
|
import ServiceLifecycle
|
||||||
|
import TopicDependencies
|
||||||
|
|
||||||
@main
|
@main
|
||||||
struct Application {
|
struct Application {
|
||||||
|
|
||||||
/// The main entry point of the application.
|
/// The main entry point of the application.
|
||||||
static func main() async throws {
|
static func main() async throws {
|
||||||
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||||
@@ -31,14 +35,17 @@ struct Application {
|
|||||||
logger: logger
|
logger: logger
|
||||||
)
|
)
|
||||||
|
|
||||||
let mqttConnection = MQTTConnectionService(client: mqtt)
|
do {
|
||||||
let sensors = SensorsService(
|
try await withDependencies {
|
||||||
client: mqtt,
|
$0.psychrometricClient = .liveValue
|
||||||
events: { mqttConnection.events },
|
$0.topicListener = .live(client: mqtt)
|
||||||
sensors: .live
|
$0.topicPublisher = .live(client: mqtt)
|
||||||
)
|
$0.mqttConnectionManager = .live(client: mqtt, logger: logger)
|
||||||
|
} operation: {
|
||||||
|
let mqttConnection = MQTTConnectionService(logger: logger)
|
||||||
|
let sensors = SensorsService(sensors: .live, logger: logger)
|
||||||
|
|
||||||
let serviceGroup = ServiceGroup(
|
var serviceGroupConfiguration = ServiceGroupConfiguration(
|
||||||
services: [
|
services: [
|
||||||
mqttConnection,
|
mqttConnection,
|
||||||
sensors
|
sensors
|
||||||
@@ -46,9 +53,20 @@ struct Application {
|
|||||||
gracefulShutdownSignals: [.sigterm, .sigint],
|
gracefulShutdownSignals: [.sigterm, .sigint],
|
||||||
logger: logger
|
logger: logger
|
||||||
)
|
)
|
||||||
|
serviceGroupConfiguration.maximumCancellationDuration = .seconds(5)
|
||||||
|
serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10)
|
||||||
|
|
||||||
|
let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
|
||||||
|
|
||||||
try await serviceGroup.run()
|
try await serviceGroup.run()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try await mqtt.shutdown()
|
||||||
|
try await eventloopGroup.shutdownGracefully()
|
||||||
|
} catch {
|
||||||
|
try await eventloopGroup.shutdownGracefully()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MARK: - Helpers
|
// MARK: - Helpers
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import Combine
|
|
||||||
import Logging
|
import Logging
|
||||||
import Models
|
import Models
|
||||||
@testable import MQTTConnectionService
|
@testable import MQTTConnectionManager
|
||||||
|
import MQTTConnectionService
|
||||||
import MQTTNIO
|
import MQTTNIO
|
||||||
import NIO
|
import NIO
|
||||||
import ServiceLifecycle
|
import ServiceLifecycle
|
||||||
@@ -13,22 +13,59 @@ final class MQTTConnectionServiceTests: XCTestCase {
|
|||||||
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
||||||
|
|
||||||
static let logger: Logger = {
|
static let logger: Logger = {
|
||||||
var logger = Logger(label: "AsyncClientTests")
|
var logger = Logger(label: "MQTTConnectionServiceTests")
|
||||||
logger.logLevel = .trace
|
logger.logLevel = .trace
|
||||||
return logger
|
return logger
|
||||||
}()
|
}()
|
||||||
|
|
||||||
func testGracefulShutdownWorks() async throws {
|
// func testGracefulShutdownWorks() async throws {
|
||||||
try await testGracefulShutdown { trigger in
|
// let client = createClient(identifier: "testGracefulShutdown")
|
||||||
let client = createClient(identifier: "testGracefulShutdown")
|
// let service = MQTTConnectionService(client: client)
|
||||||
let service = MQTTConnectionService(client: client)
|
// await service.connect()
|
||||||
try await service.run()
|
// try await Task.sleep(for: .seconds(1))
|
||||||
try await Task.sleep(for: .seconds(1))
|
// XCTAssert(client.isActive())
|
||||||
XCTAssert(client.isActive())
|
// service.shutdown()
|
||||||
trigger.triggerGracefulShutdown()
|
// XCTAssertFalse(client.isActive())
|
||||||
// try await Task.sleep(for: .seconds(2))
|
// }
|
||||||
// XCTAssertFalse(client.isActive())
|
|
||||||
|
func testWhatHappensIfConnectIsCalledMultipleTimes() async throws {
|
||||||
|
let client = createClient(identifier: "testWhatHappensIfConnectIsCalledMultipleTimes")
|
||||||
|
let manager = MQTTConnectionManager.live(client: client)
|
||||||
|
try await manager.connect()
|
||||||
|
try await manager.connect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Move to integration tests.
|
||||||
|
func testMQTTConnectionStream() async throws {
|
||||||
|
let client = createClient(identifier: "testNonManagedStream")
|
||||||
|
let manager = MQTTConnectionManager.live(
|
||||||
|
client: client,
|
||||||
|
logger: Self.logger,
|
||||||
|
alwaysReconnect: false
|
||||||
|
)
|
||||||
|
let stream = MQTTConnectionStream(client: client, logger: Self.logger)
|
||||||
|
var events = [MQTTConnectionManager.Event]()
|
||||||
|
|
||||||
|
_ = try await manager.connect()
|
||||||
|
|
||||||
|
Task {
|
||||||
|
while !client.isActive() {
|
||||||
|
try await Task.sleep(for: .milliseconds(100))
|
||||||
|
}
|
||||||
|
try await Task.sleep(for: .milliseconds(200))
|
||||||
|
manager.shutdown()
|
||||||
|
try await client.disconnect()
|
||||||
|
try await Task.sleep(for: .milliseconds(200))
|
||||||
|
try await client.shutdown()
|
||||||
|
try await Task.sleep(for: .milliseconds(200))
|
||||||
|
stream.stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
for await event in stream.removeDuplicates() {
|
||||||
|
events.append(event)
|
||||||
|
}
|
||||||
|
|
||||||
|
XCTAssertEqual(events, [.disconnected, .connected, .disconnected, .shuttingDown])
|
||||||
}
|
}
|
||||||
|
|
||||||
func createClient(identifier: String) -> MQTTClient {
|
func createClient(identifier: String) -> MQTTClient {
|
||||||
@@ -58,65 +95,4 @@ final class MQTTConnectionServiceTests: XCTestCase {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testEventStream() async throws {
|
|
||||||
var connection: ConnectionStream? = ConnectionStream()
|
|
||||||
|
|
||||||
let task = Task {
|
|
||||||
guard let events = connection?.events else { return }
|
|
||||||
print("before loop")
|
|
||||||
for await event in events {
|
|
||||||
print("\(event)")
|
|
||||||
}
|
|
||||||
print("after loop")
|
|
||||||
}
|
|
||||||
|
|
||||||
let ending = Task {
|
|
||||||
try await Task.sleep(for: .seconds(2))
|
|
||||||
connection = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
connection?.start()
|
|
||||||
try await ending.value
|
|
||||||
task.cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
class ConnectionStream {
|
|
||||||
|
|
||||||
enum Event {
|
|
||||||
case connected
|
|
||||||
case disconnected
|
|
||||||
case shuttingDown
|
|
||||||
}
|
|
||||||
|
|
||||||
let events: AsyncStream<Event>
|
|
||||||
private let continuation: AsyncStream<Event>.Continuation
|
|
||||||
private var cancellable: AnyCancellable?
|
|
||||||
|
|
||||||
init() {
|
|
||||||
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
|
|
||||||
self.events = stream
|
|
||||||
self.continuation = continuation
|
|
||||||
}
|
|
||||||
|
|
||||||
deinit {
|
|
||||||
print("connection stream is gone.")
|
|
||||||
stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func start() {
|
|
||||||
cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
|
|
||||||
.autoconnect()
|
|
||||||
.sink { [weak self] _ in
|
|
||||||
print("will send event.")
|
|
||||||
self?.continuation.yield(.connected)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func stop() {
|
|
||||||
continuation.yield(.shuttingDown)
|
|
||||||
cancellable = nil
|
|
||||||
continuation.finish()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import MQTTNIO
|
|||||||
import NIO
|
import NIO
|
||||||
import PsychrometricClientLive
|
import PsychrometricClientLive
|
||||||
@testable import SensorsService
|
@testable import SensorsService
|
||||||
|
import TopicDependencies
|
||||||
import XCTest
|
import XCTest
|
||||||
|
|
||||||
final class SensorsClientTests: XCTestCase {
|
final class SensorsClientTests: XCTestCase {
|
||||||
@@ -12,7 +13,7 @@ final class SensorsClientTests: XCTestCase {
|
|||||||
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
||||||
|
|
||||||
static let logger: Logger = {
|
static let logger: Logger = {
|
||||||
var logger = Logger(label: "AsyncClientTests")
|
var logger = Logger(label: "SensorsClientTests")
|
||||||
logger.logLevel = .debug
|
logger.logLevel = .debug
|
||||||
return logger
|
return logger
|
||||||
}()
|
}()
|
||||||
@@ -25,42 +26,28 @@ final class SensorsClientTests: XCTestCase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// func createClient(identifier: String) -> SensorsClient {
|
func testWhatHappensIfClientDisconnectsWhileListening() async throws {
|
||||||
// let envVars = EnvVars(
|
let client = createClient(identifier: "testWhatHappensIfClientDisconnectsWhileListening")
|
||||||
// appEnv: .testing,
|
let listener = TopicListener.live(client: client)
|
||||||
// host: Self.hostname,
|
try await client.connect()
|
||||||
// port: "1883",
|
|
||||||
// identifier: identifier,
|
let stream = try await listener.listen("/some/topic")
|
||||||
// userName: nil,
|
|
||||||
// password: nil
|
// try await Task.sleep(for: .seconds(1))
|
||||||
// )
|
// try await client.disconnect()
|
||||||
// return .init(envVars: envVars, logger: Self.logger)
|
//
|
||||||
// }
|
// try await client.connect()
|
||||||
func createClient(identifier: String) -> MQTTClient {
|
// try await Task.sleep(for: .seconds(1))
|
||||||
let envVars = EnvVars(
|
try await client.publish(
|
||||||
appEnv: .testing,
|
to: "/some/topic",
|
||||||
host: Self.hostname,
|
payload: ByteBufferAllocator().buffer(string: "Foo"),
|
||||||
port: "1883",
|
qos: .atLeastOnce,
|
||||||
identifier: identifier,
|
retain: true
|
||||||
userName: nil,
|
|
||||||
password: nil
|
|
||||||
)
|
|
||||||
let config = MQTTClient.Configuration(
|
|
||||||
version: .v3_1_1,
|
|
||||||
userName: envVars.userName,
|
|
||||||
password: envVars.password,
|
|
||||||
useSSL: false,
|
|
||||||
useWebSockets: false,
|
|
||||||
tlsConfiguration: nil,
|
|
||||||
webSocketURLPath: nil
|
|
||||||
)
|
|
||||||
return .init(
|
|
||||||
host: Self.hostname,
|
|
||||||
identifier: identifier,
|
|
||||||
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
|
|
||||||
logger: Self.logger,
|
|
||||||
configuration: config
|
|
||||||
)
|
)
|
||||||
|
try await Task.sleep(for: .seconds(1))
|
||||||
|
|
||||||
|
listener.shutdown()
|
||||||
|
try await client.shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
// func testConnectAndShutdown() async throws {
|
// func testConnectAndShutdown() async throws {
|
||||||
@@ -132,52 +119,52 @@ final class SensorsClientTests: XCTestCase {
|
|||||||
// try await mqtt.shutdown()
|
// try await mqtt.shutdown()
|
||||||
// }
|
// }
|
||||||
|
|
||||||
func testCapturingSensorClient() async throws {
|
// func testCapturingSensorClient() async throws {
|
||||||
class CapturedValues {
|
// class CapturedValues {
|
||||||
var values = [(value: Double, topic: String)]()
|
// var values = [(value: Double, topic: String)]()
|
||||||
var didShutdown = false
|
// var didShutdown = false
|
||||||
|
//
|
||||||
init() {}
|
// init() {}
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
let capturedValues = CapturedValues()
|
// let capturedValues = CapturedValues()
|
||||||
|
//
|
||||||
try await withDependencies {
|
// try await withDependencies {
|
||||||
$0.sensorsClient = .testing(
|
// $0.sensorsClient = .testing(
|
||||||
yielding: [
|
// yielding: [
|
||||||
(value: 76, to: "not-listening"),
|
// (value: 76, to: "not-listening"),
|
||||||
(value: 75, to: "test")
|
// (value: 75, to: "test")
|
||||||
]
|
// ]
|
||||||
) { value, topic in
|
// ) { value, topic in
|
||||||
capturedValues.values.append((value, topic))
|
// capturedValues.values.append((value, topic))
|
||||||
} captureShutdownEvent: {
|
// } captureShutdownEvent: {
|
||||||
capturedValues.didShutdown = $0
|
// capturedValues.didShutdown = $0
|
||||||
}
|
// }
|
||||||
} operation: {
|
// } operation: {
|
||||||
@Dependency(\.sensorsClient) var client
|
// @Dependency(\.sensorsClient) var client
|
||||||
let stream = try await client.listen(to: ["test"])
|
// let stream = try await client.listen(to: ["test"])
|
||||||
|
//
|
||||||
for await value in stream {
|
// for await result in stream {
|
||||||
var buffer = value.payload
|
// var buffer = result.buffer
|
||||||
guard let double = Double(buffer: &buffer) else {
|
// guard let double = Double(buffer: &buffer) else {
|
||||||
XCTFail("Failed to decode double")
|
// XCTFail("Failed to decode double")
|
||||||
return
|
// return
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
XCTAssertEqual(double, 75)
|
// XCTAssertEqual(double, 75)
|
||||||
XCTAssertEqual(value.topicName, "test")
|
// XCTAssertEqual(result.topic, "test")
|
||||||
try await client.publish(26, to: "publish")
|
// try await client.publish(26, to: "publish")
|
||||||
try await Task.sleep(for: .milliseconds(100))
|
// try await Task.sleep(for: .milliseconds(100))
|
||||||
client.shutdown()
|
// client.shutdown()
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
XCTAssertEqual(capturedValues.values.count, 1)
|
// XCTAssertEqual(capturedValues.values.count, 1)
|
||||||
XCTAssertEqual(capturedValues.values.first?.value, 26)
|
// XCTAssertEqual(capturedValues.values.first?.value, 26)
|
||||||
XCTAssertEqual(capturedValues.values.first?.topic, "publish")
|
// XCTAssertEqual(capturedValues.values.first?.topic, "publish")
|
||||||
XCTAssertTrue(capturedValues.didShutdown)
|
// XCTAssertTrue(capturedValues.didShutdown)
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
// func testSensorCapturesPublishedState() async throws {
|
// func testSensorCapturesPublishedState() async throws {
|
||||||
// let client = createClient(identifier: "testSensorCapturesPublishedState")
|
// let client = createClient(identifier: "testSensorCapturesPublishedState")
|
||||||
// let mqtt = client.client
|
// let mqtt = client.client
|
||||||
@@ -234,10 +221,47 @@ final class SensorsClientTests: XCTestCase {
|
|||||||
//
|
//
|
||||||
// await client.shutdown()
|
// await client.shutdown()
|
||||||
// }
|
// }
|
||||||
|
|
||||||
|
func createClient(identifier: String) -> MQTTClient {
|
||||||
|
let envVars = EnvVars(
|
||||||
|
appEnv: .testing,
|
||||||
|
host: Self.hostname,
|
||||||
|
port: "1883",
|
||||||
|
identifier: identifier,
|
||||||
|
userName: nil,
|
||||||
|
password: nil
|
||||||
|
)
|
||||||
|
let config = MQTTClient.Configuration(
|
||||||
|
version: .v3_1_1,
|
||||||
|
userName: envVars.userName,
|
||||||
|
password: envVars.password,
|
||||||
|
useSSL: false,
|
||||||
|
useWebSockets: false,
|
||||||
|
tlsConfiguration: nil,
|
||||||
|
webSocketURLPath: nil
|
||||||
|
)
|
||||||
|
return .init(
|
||||||
|
host: Self.hostname,
|
||||||
|
identifier: identifier,
|
||||||
|
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
|
||||||
|
logger: Self.logger,
|
||||||
|
configuration: config
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MARK: Helpers for tests.
|
// MARK: Helpers for tests.
|
||||||
|
|
||||||
|
extension AsyncStream {
|
||||||
|
func first() async -> Element {
|
||||||
|
var first: Element
|
||||||
|
for await value in self {
|
||||||
|
first = value
|
||||||
|
}
|
||||||
|
return first
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
class PublishInfoContainer {
|
class PublishInfoContainer {
|
||||||
private(set) var info: [MQTTPublishInfo]
|
private(set) var info: [MQTTPublishInfo]
|
||||||
private var topicFilters: [String]?
|
private var topicFilters: [String]?
|
||||||
@@ -258,41 +282,35 @@ class PublishInfoContainer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extension SensorsClient {
|
// extension SensorsClient {
|
||||||
|
//
|
||||||
static func testing(
|
// static func testing(
|
||||||
yielding: [(value: Double, to: String)],
|
// yielding: [(value: Double, to: String)],
|
||||||
capturePublishedValues: @escaping (Double, String) -> Void,
|
// capturePublishedValues: @escaping (Double, String) -> Void,
|
||||||
captureShutdownEvent: @escaping (Bool) -> Void
|
// captureShutdownEvent: @escaping (Bool) -> Void
|
||||||
) -> Self {
|
// ) -> Self {
|
||||||
let (stream, continuation) = AsyncStream.makeStream(of: MQTTPublishInfo.self)
|
// let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self)
|
||||||
let logger = Logger(label: "\(Self.self).testing")
|
// let logger = Logger(label: "\(Self.self).testing")
|
||||||
|
//
|
||||||
return .init(
|
// return .init(
|
||||||
listen: { topics in
|
// listen: { topics in
|
||||||
for (value, topic) in yielding where topics.contains(topic) {
|
// for (value, topic) in yielding where topics.contains(topic) {
|
||||||
continuation.yield(
|
// continuation.yield(
|
||||||
MQTTPublishInfo(
|
// (buffer: ByteBuffer(string: "\(value)"), topic: topic)
|
||||||
qos: .atLeastOnce,
|
// )
|
||||||
retain: true,
|
// }
|
||||||
topicName: topic,
|
// return stream
|
||||||
payload: ByteBuffer(string: "\(value)"),
|
// },
|
||||||
properties: MQTTProperties()
|
// logger: logger,
|
||||||
)
|
// publish: { value, topic in
|
||||||
)
|
// capturePublishedValues(value, topic)
|
||||||
}
|
// },
|
||||||
return stream
|
// shutdown: {
|
||||||
},
|
// captureShutdownEvent(true)
|
||||||
logger: logger,
|
// continuation.finish()
|
||||||
publish: { value, topic in
|
// }
|
||||||
capturePublishedValues(value, topic)
|
// )
|
||||||
},
|
// }
|
||||||
shutdown: {
|
// }
|
||||||
captureShutdownEvent(true)
|
|
||||||
continuation.finish()
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct TopicNotFoundError: Error {}
|
struct TopicNotFoundError: Error {}
|
||||||
|
|||||||
@@ -1,47 +0,0 @@
|
|||||||
import XCTest
|
|
||||||
import class Foundation.Bundle
|
|
||||||
|
|
||||||
//final class dewPoint_controllerTests: XCTestCase {
|
|
||||||
// func testExample() throws {
|
|
||||||
// // This is an example of a functional test case.
|
|
||||||
// // Use XCTAssert and related functions to verify your tests produce the correct
|
|
||||||
// // results.
|
|
||||||
//
|
|
||||||
// // Some of the APIs that we use below are available in macOS 10.13 and above.
|
|
||||||
// guard #available(macOS 10.13, *) else {
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // Mac Catalyst won't have `Process`, but it is supported for executables.
|
|
||||||
// #if !targetEnvironment(macCatalyst)
|
|
||||||
//
|
|
||||||
// let fooBinary = productsDirectory.appendingPathComponent("dewPoint-controller")
|
|
||||||
//
|
|
||||||
// let process = Process()
|
|
||||||
// process.executableURL = fooBinary
|
|
||||||
//
|
|
||||||
// let pipe = Pipe()
|
|
||||||
// process.standardOutput = pipe
|
|
||||||
//
|
|
||||||
// try process.run()
|
|
||||||
// process.waitUntilExit()
|
|
||||||
//
|
|
||||||
// let data = pipe.fileHandleForReading.readDataToEndOfFile()
|
|
||||||
// let output = String(data: data, encoding: .utf8)
|
|
||||||
//
|
|
||||||
// XCTAssertEqual(output, "Hello, world!\n")
|
|
||||||
// #endif
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /// Returns path to the built products directory.
|
|
||||||
// var productsDirectory: URL {
|
|
||||||
// #if os(macOS)
|
|
||||||
// for bundle in Bundle.allBundles where bundle.bundlePath.hasSuffix(".xctest") {
|
|
||||||
// return bundle.bundleURL.deletingLastPathComponent()
|
|
||||||
// }
|
|
||||||
// fatalError("couldn't find the products directory")
|
|
||||||
// #else
|
|
||||||
// return Bundle.main.bundleURL
|
|
||||||
// #endif
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
@@ -14,7 +14,7 @@ services:
|
|||||||
depends_on:
|
depends_on:
|
||||||
- mosquitto
|
- mosquitto
|
||||||
environment:
|
environment:
|
||||||
- MOSQUITTO_SERVER=mosquitto
|
- MQTT_HOST=mosquitto
|
||||||
|
|
||||||
test:
|
test:
|
||||||
build:
|
build:
|
||||||
|
|||||||
Reference in New Issue
Block a user