Compare commits
13 Commits
f68ac528e4
...
psychromet
| Author | SHA1 | Date | |
|---|---|---|---|
|
ce327a6f1c
|
|||
|
95f8565cde
|
|||
|
163f603b69
|
|||
|
e7a849b003
|
|||
|
bd2a798320
|
|||
|
b8992b89b6
|
|||
|
efd9907b4a
|
|||
|
fbbd65f7ae
|
|||
|
8067331ff8
|
|||
|
b6db9b5322
|
|||
|
bf1126b06a
|
|||
|
ef552fb8bc
|
|||
|
1e62d7aac0
|
14
.gitea/workflows/ci.yaml
Normal file
14
.gitea/workflows/ci.yaml
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
---
|
||||||
|
name: CI
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
pull_request:
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
test:
|
||||||
|
name: Run Tests
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
- name: Test
|
||||||
|
run: make test
|
||||||
@@ -8,3 +8,4 @@
|
|||||||
--wrapconditions after-first
|
--wrapconditions after-first
|
||||||
--typeblanklines preserve
|
--typeblanklines preserve
|
||||||
--commas inline
|
--commas inline
|
||||||
|
--stripunusedargs closure-only
|
||||||
|
|||||||
@@ -5,10 +5,10 @@ WORKDIR /build
|
|||||||
COPY ./Package.* ./
|
COPY ./Package.* ./
|
||||||
RUN swift package resolve
|
RUN swift package resolve
|
||||||
COPY . .
|
COPY . .
|
||||||
RUN swift build --enable-test-discovery -c release -Xswiftc -g
|
RUN swift build -c release -Xswiftc -g
|
||||||
|
|
||||||
# Run image
|
# Run image
|
||||||
FROM swift:5.10-slim
|
FROM swift:5.10-slim
|
||||||
WORKDIR /run
|
WORKDIR /run
|
||||||
COPY --from=build /build/.build/release/dewPoint-controller /run
|
COPY --from=build /build/.build/release/dewpoint-controller /run
|
||||||
CMD ["/bin/bash", "-xc", "./dewPoint-controller"]
|
CMD ["/bin/bash", "-xc", "./dewpoint-controller"]
|
||||||
|
|||||||
2
Makefile
2
Makefile
@@ -23,7 +23,7 @@ stop-mosquitto:
|
|||||||
@docker-compose rm -f mosquitto || true
|
@docker-compose rm -f mosquitto || true
|
||||||
|
|
||||||
test-docker:
|
test-docker:
|
||||||
@docker-compose run --remove-orphans -i --rm test
|
@docker-compose run --build --remove-orphans -i --rm test
|
||||||
@docker-compose kill mosquitto-test
|
@docker-compose kill mosquitto-test
|
||||||
@docker-compose rm -f
|
@docker-compose rm -f
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
{
|
{
|
||||||
"originHash" : "d3104d51323f6bc98cf3ab2930e5a26f72c9a4fdb7640360ba27628672397841",
|
"originHash" : "c2538e3229d6c80f3d6a979c2f3605d63b4973e1e49786819017473eb2916f4e",
|
||||||
"pins" : [
|
"pins" : [
|
||||||
{
|
{
|
||||||
"identity" : "combine-schedulers",
|
"identity" : "combine-schedulers",
|
||||||
@@ -69,8 +69,8 @@
|
|||||||
"kind" : "remoteSourceControl",
|
"kind" : "remoteSourceControl",
|
||||||
"location" : "https://github.com/pointfreeco/swift-dependencies",
|
"location" : "https://github.com/pointfreeco/swift-dependencies",
|
||||||
"state" : {
|
"state" : {
|
||||||
"revision" : "0fc0255e780bf742abeef29dec80924f5f0ae7b9",
|
"revision" : "96eecd47660e8307877acb8c41cc5295ba7350a7",
|
||||||
"version" : "1.4.1"
|
"version" : "1.5.2"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -8,40 +8,41 @@ let swiftSettings: [SwiftSetting] = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
let package = Package(
|
let package = Package(
|
||||||
name: "dewPoint-controller",
|
name: "dewpoint-controller",
|
||||||
platforms: [
|
platforms: [
|
||||||
.macOS(.v14)
|
.macOS(.v14)
|
||||||
],
|
],
|
||||||
products: [
|
products: [
|
||||||
.executable(name: "dewPoint-controller", targets: ["dewPoint-controller"]),
|
.executable(name: "dewpoint-controller", targets: ["dewpoint-controller"]),
|
||||||
.library(name: "Models", targets: ["Models"]),
|
.library(name: "Models", targets: ["Models"]),
|
||||||
|
.library(name: "MQTTConnectionManager", targets: ["MQTTConnectionManager"]),
|
||||||
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
|
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
|
||||||
.library(name: "SensorsService", targets: ["SensorsService"])
|
.library(name: "SensorsService", targets: ["SensorsService"]),
|
||||||
|
.library(name: "TopicDependencies", targets: ["TopicDependencies"])
|
||||||
],
|
],
|
||||||
dependencies: [
|
dependencies: [
|
||||||
|
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
|
||||||
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
|
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
|
||||||
.package(url: "https://github.com/apple/swift-log", from: "1.6.0"),
|
.package(url: "https://github.com/apple/swift-log", from: "1.6.0"),
|
||||||
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.4.1"),
|
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.5.2"),
|
||||||
.package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", exact: "0.2.3"),
|
.package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", exact: "0.2.3"),
|
||||||
.package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"),
|
.package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"),
|
||||||
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.3.0")
|
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.3.0")
|
||||||
],
|
],
|
||||||
targets: [
|
targets: [
|
||||||
.executableTarget(
|
.executableTarget(
|
||||||
name: "dewPoint-controller",
|
name: "dewpoint-controller",
|
||||||
dependencies: [
|
dependencies: [
|
||||||
"Models",
|
"Models",
|
||||||
|
"MQTTConnectionManager",
|
||||||
"MQTTConnectionService",
|
"MQTTConnectionService",
|
||||||
"SensorsService",
|
"SensorsService",
|
||||||
|
"TopicDependencies",
|
||||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
||||||
.product(name: "NIO", package: "swift-nio"),
|
.product(name: "NIO", package: "swift-nio"),
|
||||||
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
|
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
|
||||||
]
|
]
|
||||||
),
|
),
|
||||||
.testTarget(
|
|
||||||
name: "dewPoint-controllerTests",
|
|
||||||
dependencies: ["dewPoint-controller"]
|
|
||||||
),
|
|
||||||
.target(
|
.target(
|
||||||
name: "Models",
|
name: "Models",
|
||||||
dependencies: [
|
dependencies: [
|
||||||
@@ -50,11 +51,21 @@ let package = Package(
|
|||||||
],
|
],
|
||||||
swiftSettings: swiftSettings
|
swiftSettings: swiftSettings
|
||||||
),
|
),
|
||||||
|
.target(
|
||||||
|
name: "MQTTConnectionManager",
|
||||||
|
dependencies: [
|
||||||
|
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
|
||||||
|
.product(name: "Dependencies", package: "swift-dependencies"),
|
||||||
|
.product(name: "DependenciesMacros", package: "swift-dependencies"),
|
||||||
|
.product(name: "MQTTNIO", package: "mqtt-nio")
|
||||||
|
],
|
||||||
|
swiftSettings: swiftSettings
|
||||||
|
),
|
||||||
.target(
|
.target(
|
||||||
name: "MQTTConnectionService",
|
name: "MQTTConnectionService",
|
||||||
dependencies: [
|
dependencies: [
|
||||||
"Models",
|
"Models",
|
||||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
"MQTTConnectionManager",
|
||||||
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
|
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
|
||||||
],
|
],
|
||||||
swiftSettings: swiftSettings
|
swiftSettings: swiftSettings
|
||||||
@@ -63,6 +74,7 @@ let package = Package(
|
|||||||
name: "MQTTConnectionServiceTests",
|
name: "MQTTConnectionServiceTests",
|
||||||
dependencies: [
|
dependencies: [
|
||||||
"MQTTConnectionService",
|
"MQTTConnectionService",
|
||||||
|
"MQTTConnectionManager",
|
||||||
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
|
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
|
||||||
]
|
]
|
||||||
),
|
),
|
||||||
@@ -70,7 +82,8 @@ let package = Package(
|
|||||||
name: "SensorsService",
|
name: "SensorsService",
|
||||||
dependencies: [
|
dependencies: [
|
||||||
"Models",
|
"Models",
|
||||||
"MQTTConnectionService",
|
"MQTTConnectionManager",
|
||||||
|
"TopicDependencies",
|
||||||
.product(name: "Dependencies", package: "swift-dependencies"),
|
.product(name: "Dependencies", package: "swift-dependencies"),
|
||||||
.product(name: "DependenciesMacros", package: "swift-dependencies"),
|
.product(name: "DependenciesMacros", package: "swift-dependencies"),
|
||||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
||||||
@@ -84,6 +97,15 @@ let package = Package(
|
|||||||
"SensorsService",
|
"SensorsService",
|
||||||
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
|
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
|
||||||
]
|
]
|
||||||
|
),
|
||||||
|
.target(
|
||||||
|
name: "TopicDependencies",
|
||||||
|
dependencies: [
|
||||||
|
.product(name: "Dependencies", package: "swift-dependencies"),
|
||||||
|
.product(name: "DependenciesMacros", package: "swift-dependencies"),
|
||||||
|
.product(name: "MQTTNIO", package: "mqtt-nio")
|
||||||
|
],
|
||||||
|
swiftSettings: swiftSettings
|
||||||
)
|
)
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|||||||
221
Sources/MQTTConnectionManager/Live.swift
Normal file
221
Sources/MQTTConnectionManager/Live.swift
Normal file
@@ -0,0 +1,221 @@
|
|||||||
|
import AsyncAlgorithms
|
||||||
|
import Dependencies
|
||||||
|
import DependenciesMacros
|
||||||
|
import Foundation
|
||||||
|
import Logging
|
||||||
|
import MQTTNIO
|
||||||
|
import NIO
|
||||||
|
|
||||||
|
public extension DependencyValues {
|
||||||
|
|
||||||
|
/// A dependency that is responsible for managing the connection to
|
||||||
|
/// an MQTT broker.
|
||||||
|
var mqttConnectionManager: MQTTConnectionManager {
|
||||||
|
get { self[MQTTConnectionManager.self] }
|
||||||
|
set { self[MQTTConnectionManager.self] = newValue }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Represents the interface needed for the ``MQTTConnectionService``.
|
||||||
|
///
|
||||||
|
/// See ``MQTTConnectionManagerLive`` module for live implementation.
|
||||||
|
@DependencyClient
|
||||||
|
public struct MQTTConnectionManager: Sendable {
|
||||||
|
|
||||||
|
/// Connect to the MQTT broker.
|
||||||
|
public var connect: @Sendable () async throws -> Void
|
||||||
|
|
||||||
|
/// Shutdown the connection to the MQTT broker.
|
||||||
|
///
|
||||||
|
/// - Note: You should cancel any tasks that are listening to the connection stream first.
|
||||||
|
public var shutdown: @Sendable () -> Void
|
||||||
|
|
||||||
|
/// Create a stream of connection events.
|
||||||
|
public var stream: @Sendable () throws -> AsyncStream<Event>
|
||||||
|
|
||||||
|
/// Perform an operation with the underlying MQTTClient, this can be useful in
|
||||||
|
/// tests, so this module needs imported with `@_spi(Testing) import` to use this method.
|
||||||
|
private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void
|
||||||
|
|
||||||
|
@_spi(Internal)
|
||||||
|
public func withClient(
|
||||||
|
_ callback: @Sendable (MQTTClient) async throws -> Void
|
||||||
|
) async throws {
|
||||||
|
try await _withClient(callback)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Represents connection events that clients can listen for and
|
||||||
|
/// react accordingly.
|
||||||
|
public enum Event: Sendable {
|
||||||
|
case connected
|
||||||
|
case disconnected
|
||||||
|
case shuttingDown
|
||||||
|
}
|
||||||
|
|
||||||
|
public static func live(
|
||||||
|
client: MQTTClient,
|
||||||
|
cleanSession: Bool = false,
|
||||||
|
logger: Logger? = nil,
|
||||||
|
alwaysReconnect: Bool = true
|
||||||
|
) -> Self {
|
||||||
|
let manager = ConnectionManager(
|
||||||
|
client: client,
|
||||||
|
logger: logger,
|
||||||
|
alwaysReconnect: alwaysReconnect
|
||||||
|
)
|
||||||
|
return .init {
|
||||||
|
try await manager.connect(cleanSession: cleanSession)
|
||||||
|
} shutdown: {
|
||||||
|
manager.shutdown()
|
||||||
|
} stream: {
|
||||||
|
MQTTConnectionStream(client: client, logger: logger)
|
||||||
|
.start()
|
||||||
|
.removeDuplicates()
|
||||||
|
.eraseToStream()
|
||||||
|
} _withClient: { callback in
|
||||||
|
try await callback(client)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extension MQTTConnectionManager: TestDependencyKey {
|
||||||
|
public static var testValue: MQTTConnectionManager {
|
||||||
|
Self()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MARK: - Helpers
|
||||||
|
|
||||||
|
@_spi(Internal)
|
||||||
|
public final actor MQTTConnectionStream: Sendable {
|
||||||
|
|
||||||
|
public typealias Element = MQTTConnectionManager.Event
|
||||||
|
|
||||||
|
private let client: MQTTClient
|
||||||
|
private let continuation: AsyncStream<Element>.Continuation
|
||||||
|
private let logger: Logger?
|
||||||
|
private let name: String
|
||||||
|
private let stream: AsyncStream<Element>
|
||||||
|
private var isShuttingDown = false
|
||||||
|
|
||||||
|
public init(client: MQTTClient, logger: Logger?) {
|
||||||
|
let (stream, continuation) = AsyncStream<Element>.makeStream()
|
||||||
|
self.client = client
|
||||||
|
self.continuation = continuation
|
||||||
|
self.logger = logger
|
||||||
|
self.name = UUID().uuidString
|
||||||
|
self.stream = stream
|
||||||
|
}
|
||||||
|
|
||||||
|
deinit { stop() }
|
||||||
|
|
||||||
|
public nonisolated func start() -> AsyncStream<Element> {
|
||||||
|
// Check if the client is active and yield the initial result.
|
||||||
|
continuation.yield(client.isActive() ? .connected : .disconnected)
|
||||||
|
|
||||||
|
// Continually check if the client is active.
|
||||||
|
let task = Task {
|
||||||
|
let isShuttingDown = await self.isShuttingDown
|
||||||
|
while !Task.isCancelled, !isShuttingDown {
|
||||||
|
try await Task.sleep(for: .milliseconds(100))
|
||||||
|
continuation.yield(client.isActive() ? .connected : .disconnected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register listener on the client for when the connection
|
||||||
|
// closes.
|
||||||
|
client.addCloseListener(named: name) { _ in
|
||||||
|
self.logger?.trace("Client has disconnected.")
|
||||||
|
self.continuation.yield(.disconnected)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register listener on the client for when the client
|
||||||
|
// is shutdown.
|
||||||
|
client.addShutdownListener(named: name) { _ in
|
||||||
|
self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)")
|
||||||
|
self.continuation.yield(.shuttingDown)
|
||||||
|
Task { await self.setIsShuttingDown() }
|
||||||
|
task.cancel()
|
||||||
|
self.stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
return stream
|
||||||
|
}
|
||||||
|
|
||||||
|
private func setIsShuttingDown() {
|
||||||
|
isShuttingDown = true
|
||||||
|
}
|
||||||
|
|
||||||
|
public nonisolated func stop() {
|
||||||
|
client.removeCloseListener(named: name)
|
||||||
|
client.removeShutdownListener(named: name)
|
||||||
|
continuation.finish()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
actor ConnectionManager {
|
||||||
|
private let client: MQTTClient
|
||||||
|
private let logger: Logger?
|
||||||
|
private let name: String
|
||||||
|
private let shouldReconnect: Bool
|
||||||
|
private var hasConnected: Bool = false
|
||||||
|
|
||||||
|
init(
|
||||||
|
client: MQTTClient,
|
||||||
|
logger: Logger?,
|
||||||
|
alwaysReconnect: Bool
|
||||||
|
) {
|
||||||
|
self.client = client
|
||||||
|
self.logger = logger
|
||||||
|
self.name = UUID().uuidString
|
||||||
|
self.shouldReconnect = alwaysReconnect
|
||||||
|
}
|
||||||
|
|
||||||
|
deinit {
|
||||||
|
// We should've already logged that we're shutting down if
|
||||||
|
// the manager was shutdown properly, so don't log it twice.
|
||||||
|
self.shutdown(withLogging: false)
|
||||||
|
}
|
||||||
|
|
||||||
|
private func setHasConnected() {
|
||||||
|
hasConnected = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func connect(
|
||||||
|
cleanSession: Bool
|
||||||
|
) async throws {
|
||||||
|
guard !hasConnected else { return }
|
||||||
|
do {
|
||||||
|
try await client.connect(cleanSession: cleanSession)
|
||||||
|
setHasConnected()
|
||||||
|
|
||||||
|
client.addCloseListener(named: name) { [weak self] _ in
|
||||||
|
guard let `self` else { return }
|
||||||
|
self.logger?.debug("Connection closed.")
|
||||||
|
if self.shouldReconnect {
|
||||||
|
self.logger?.debug("Reconnecting...")
|
||||||
|
Task {
|
||||||
|
try await self.connect(cleanSession: cleanSession)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
client.addShutdownListener(named: name) { _ in
|
||||||
|
self.shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch {
|
||||||
|
logger?.trace("Failed to connect: \(error)")
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nonisolated func shutdown(withLogging: Bool = true) {
|
||||||
|
if withLogging {
|
||||||
|
logger?.trace("Shutting down connection.")
|
||||||
|
}
|
||||||
|
client.removeCloseListener(named: name)
|
||||||
|
client.removeShutdownListener(named: name)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,151 +1,37 @@
|
|||||||
@preconcurrency import Foundation
|
import Dependencies
|
||||||
import Logging
|
import Logging
|
||||||
import Models
|
import Models
|
||||||
import MQTTNIO
|
import MQTTConnectionManager
|
||||||
import NIO
|
|
||||||
import ServiceLifecycle
|
import ServiceLifecycle
|
||||||
|
|
||||||
// TODO: This may not need to be an actor.
|
|
||||||
|
|
||||||
/// Manages the MQTT broker connection.
|
|
||||||
public actor MQTTConnectionService: Service {
|
public actor MQTTConnectionService: Service {
|
||||||
|
@Dependency(\.mqttConnectionManager) var manager
|
||||||
|
|
||||||
private let cleanSession: Bool
|
private nonisolated let logger: Logger?
|
||||||
public let client: MQTTClient
|
|
||||||
private let continuation: AsyncStream<Event>.Continuation
|
|
||||||
public nonisolated let events: AsyncStream<Event>
|
|
||||||
private let internalEventStream: ConnectionStream
|
|
||||||
nonisolated var logger: Logger { client.logger }
|
|
||||||
// private var shuttingDown = false
|
|
||||||
|
|
||||||
public init(
|
public init(
|
||||||
cleanSession: Bool = true,
|
logger: Logger? = nil
|
||||||
client: MQTTClient
|
|
||||||
) {
|
) {
|
||||||
self.cleanSession = cleanSession
|
self.logger = logger
|
||||||
self.client = client
|
|
||||||
self.internalEventStream = .init()
|
|
||||||
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
|
|
||||||
self.events = stream
|
|
||||||
self.continuation = continuation
|
|
||||||
}
|
}
|
||||||
|
|
||||||
deinit {
|
/// The entry-point of the service which starts the connection
|
||||||
self.logger.debug("MQTTConnectionService is gone.")
|
/// to the MQTT broker and handles graceful shutdown of the
|
||||||
self.internalEventStream.stop()
|
/// connection.
|
||||||
continuation.finish()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The entry-point of the service.
|
|
||||||
///
|
|
||||||
/// This method connects to the MQTT broker and manages the connection.
|
|
||||||
/// It will attempt to gracefully shutdown the connection upon receiving
|
|
||||||
/// `sigterm` signals.
|
|
||||||
public func run() async throws {
|
public func run() async throws {
|
||||||
await withGracefulShutdownHandler {
|
try await withGracefulShutdownHandler {
|
||||||
await withDiscardingTaskGroup { group in
|
try await manager.connect()
|
||||||
group.addTask { await self.connect() }
|
for await event in try manager.stream().cancelOnGracefulShutdown() {
|
||||||
group.addTask {
|
// We don't really need to do anything with the events, so just logging
|
||||||
await self.internalEventStream.start { self.client.isActive() }
|
// for now. But we need to iterate on an async stream for the service to
|
||||||
}
|
// continue to run and handle graceful shutdowns.
|
||||||
for await event in self.internalEventStream.events.cancelOnGracefulShutdown() {
|
logger?.trace("Received connection event: \(event)")
|
||||||
if event == .shuttingDown {
|
|
||||||
self.shutdown()
|
|
||||||
break
|
|
||||||
}
|
|
||||||
self.logger.trace("Sending connection event: \(event)")
|
|
||||||
self.continuation.yield(event)
|
|
||||||
}
|
|
||||||
group.cancelAll()
|
|
||||||
}
|
}
|
||||||
|
// when we reach here we are shutting down, so we shutdown
|
||||||
|
// the manager.
|
||||||
|
manager.shutdown()
|
||||||
} onGracefulShutdown: {
|
} onGracefulShutdown: {
|
||||||
self.logger.trace("Received graceful shutdown.")
|
self.logger?.trace("Received graceful shutdown.")
|
||||||
self.shutdown()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func connect() async {
|
|
||||||
do {
|
|
||||||
try await withThrowingDiscardingTaskGroup { group in
|
|
||||||
group.addTask {
|
|
||||||
try await self.client.connect(cleanSession: self.cleanSession)
|
|
||||||
}
|
|
||||||
client.addCloseListener(named: "\(Self.self)") { _ in
|
|
||||||
Task {
|
|
||||||
self.logger.debug("Connection closed.")
|
|
||||||
self.logger.debug("Reconnecting...")
|
|
||||||
await self.connect()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.logger.debug("Connection successful.")
|
|
||||||
self.continuation.yield(.connected)
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
logger.trace("Failed to connect: \(error)")
|
|
||||||
continuation.yield(.disconnected)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private nonisolated func shutdown() {
|
|
||||||
logger.debug("Begin shutting down MQTT broker connection.")
|
|
||||||
client.removeCloseListener(named: "\(Self.self)")
|
|
||||||
internalEventStream.stop()
|
|
||||||
_ = client.disconnect()
|
|
||||||
try? client.syncShutdownGracefully()
|
|
||||||
continuation.finish()
|
|
||||||
logger.info("MQTT broker connection closed.")
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
extension MQTTConnectionService {
|
|
||||||
|
|
||||||
public enum Event: Sendable {
|
|
||||||
case connected
|
|
||||||
case disconnected
|
|
||||||
case shuttingDown
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: This functionality can probably move into the connection service.
|
|
||||||
|
|
||||||
private final class ConnectionStream: Sendable {
|
|
||||||
|
|
||||||
// private var cancellable: AnyCancellable?
|
|
||||||
private let continuation: AsyncStream<MQTTConnectionService.Event>.Continuation
|
|
||||||
let events: AsyncStream<MQTTConnectionService.Event>
|
|
||||||
|
|
||||||
init() {
|
|
||||||
let (stream, continuation) = AsyncStream.makeStream(of: MQTTConnectionService.Event.self)
|
|
||||||
self.events = stream
|
|
||||||
self.continuation = continuation
|
|
||||||
}
|
|
||||||
|
|
||||||
deinit {
|
|
||||||
stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func start(isActive connectionIsActive: @escaping () -> Bool) async {
|
|
||||||
try? await Task.sleep(for: .seconds(1))
|
|
||||||
let event: MQTTConnectionService.Event = connectionIsActive()
|
|
||||||
? .connected
|
|
||||||
: .disconnected
|
|
||||||
|
|
||||||
continuation.yield(event)
|
|
||||||
// cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
|
|
||||||
// .autoconnect()
|
|
||||||
// .sink { [weak self] (_: Date) in
|
|
||||||
// let event: MQTTConnectionService.Event = connectionIsActive()
|
|
||||||
// ? .connected
|
|
||||||
// : .disconnected
|
|
||||||
//
|
|
||||||
// self?.continuation.yield(event)
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
func stop() {
|
|
||||||
continuation.yield(.shuttingDown)
|
|
||||||
continuation.finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,10 +56,11 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
|
|||||||
public var dewPoint: DewPoint? {
|
public var dewPoint: DewPoint? {
|
||||||
get async {
|
get async {
|
||||||
guard let temperature = temperature,
|
guard let temperature = temperature,
|
||||||
let humidity = humidity
|
let humidity = humidity,
|
||||||
|
!temperature.value.isNaN,
|
||||||
|
!humidity.value.isNaN
|
||||||
else { return nil }
|
else { return nil }
|
||||||
return try? await psychrometrics.dewPoint(.dryBulb(temperature, relativeHumidity: humidity))
|
return try? await psychrometrics.dewPoint(.dryBulb(temperature, relativeHumidity: humidity))
|
||||||
// return .init(dryBulb: temperature, humidity: humidity)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -67,12 +68,13 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
|
|||||||
public var enthalpy: EnthalpyOf<MoistAir>? {
|
public var enthalpy: EnthalpyOf<MoistAir>? {
|
||||||
get async {
|
get async {
|
||||||
guard let temperature = temperature,
|
guard let temperature = temperature,
|
||||||
let humidity = humidity
|
let humidity = humidity,
|
||||||
|
!temperature.value.isNaN,
|
||||||
|
!humidity.value.isNaN
|
||||||
else { return nil }
|
else { return nil }
|
||||||
return try? await psychrometrics.enthalpy.moistAir(
|
return try? await psychrometrics.enthalpy.moistAir(
|
||||||
.dryBulb(temperature, relativeHumidity: humidity, altitude: altitude)
|
.dryBulb(temperature, relativeHumidity: humidity, altitude: altitude)
|
||||||
)
|
)
|
||||||
// return .init(dryBulb: temperature, humidity: humidity, altitude: altitude)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -129,10 +131,10 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
|
|||||||
prefix = "\(prefix.dropLast())"
|
prefix = "\(prefix.dropLast())"
|
||||||
}
|
}
|
||||||
self.init(
|
self.init(
|
||||||
dewPoint: "\(prefix)/sensors/\(location.rawValue)_dew_point/state",
|
dewPoint: "\(prefix)/sensor/\(location.rawValue)_dew_point/state",
|
||||||
enthalpy: "\(prefix)/sensors/\(location.rawValue)_enthalpy/state",
|
enthalpy: "\(prefix)/sensor/\(location.rawValue)_enthalpy/state",
|
||||||
humidity: "\(prefix)/sensors/\(location.rawValue)_humidity/state",
|
humidity: "\(prefix)/sensor/\(location.rawValue)_humidity/state",
|
||||||
temperature: "\(prefix)/sensors/\(location.rawValue)_temperature/state"
|
temperature: "\(prefix)/sensor/\(location.rawValue)_temperature/state"
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
/// This allows values to only publish changes if they have changed since the
|
/// This allows values to only publish changes if they have changed since the
|
||||||
/// last time they were recieved.
|
/// last time they were recieved.
|
||||||
@propertyWrapper
|
@propertyWrapper
|
||||||
public struct TrackedChanges<Value> {
|
public struct TrackedChanges<Value: Sendable>: Sendable {
|
||||||
|
|
||||||
/// The current tracking state.
|
/// The current tracking state.
|
||||||
private var tracking: TrackingState
|
private var tracking: TrackingState
|
||||||
@@ -12,7 +12,7 @@ public struct TrackedChanges<Value> {
|
|||||||
private var value: Value
|
private var value: Value
|
||||||
|
|
||||||
/// Used to check if a new value is equal to an old value.
|
/// Used to check if a new value is equal to an old value.
|
||||||
private var isEqual: (Value, Value) -> Bool
|
private var isEqual: @Sendable (Value, Value) -> Bool
|
||||||
|
|
||||||
/// Access to the underlying property that we are wrapping.
|
/// Access to the underlying property that we are wrapping.
|
||||||
public var wrappedValue: Value {
|
public var wrappedValue: Value {
|
||||||
@@ -35,7 +35,7 @@ public struct TrackedChanges<Value> {
|
|||||||
public init(
|
public init(
|
||||||
wrappedValue: Value,
|
wrappedValue: Value,
|
||||||
needsProcessed: Bool = false,
|
needsProcessed: Bool = false,
|
||||||
isEqual: @escaping (Value, Value) -> Bool
|
isEqual: @escaping @Sendable (Value, Value) -> Bool
|
||||||
) {
|
) {
|
||||||
self.value = wrappedValue
|
self.value = wrappedValue
|
||||||
self.tracking = needsProcessed ? .needsProcessed : .hasProcessed
|
self.tracking = needsProcessed ? .needsProcessed : .hasProcessed
|
||||||
@@ -85,7 +85,9 @@ extension TrackedChanges: Equatable where Value: Equatable {
|
|||||||
wrappedValue: Value,
|
wrappedValue: Value,
|
||||||
needsProcessed: Bool = false
|
needsProcessed: Bool = false
|
||||||
) {
|
) {
|
||||||
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed, isEqual: ==)
|
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed) {
|
||||||
|
$0 == $1
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -96,5 +98,3 @@ extension TrackedChanges: Hashable where Value: Hashable {
|
|||||||
hasher.combine(needsProcessed)
|
hasher.combine(needsProcessed)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extension TrackedChanges: Sendable where Value: Sendable {}
|
|
||||||
|
|||||||
@@ -1,6 +1,3 @@
|
|||||||
import Logging
|
|
||||||
import Models
|
|
||||||
import MQTTNIO
|
|
||||||
import NIO
|
import NIO
|
||||||
import NIOFoundationCompat
|
import NIOFoundationCompat
|
||||||
import PsychrometricClient
|
import PsychrometricClient
|
||||||
|
|||||||
@@ -3,305 +3,178 @@ import DependenciesMacros
|
|||||||
import Foundation
|
import Foundation
|
||||||
import Logging
|
import Logging
|
||||||
import Models
|
import Models
|
||||||
import MQTTConnectionService
|
import MQTTConnectionManager
|
||||||
@preconcurrency import MQTTNIO
|
import MQTTNIO
|
||||||
import NIO
|
import NIO
|
||||||
import PsychrometricClient
|
import PsychrometricClient
|
||||||
import ServiceLifecycle
|
import ServiceLifecycle
|
||||||
|
import TopicDependencies
|
||||||
|
|
||||||
@DependencyClient
|
/// Service that is responsible for listening to changes of the temperature and humidity
|
||||||
public struct SensorsClient: Sendable {
|
/// sensors, then publishing back the calculated dew-point temperature and enthalpy for
|
||||||
|
/// the sensor location.
|
||||||
|
///
|
||||||
|
///
|
||||||
|
public actor SensorsService: Service {
|
||||||
|
|
||||||
public var listen: @Sendable ([String]) async throws -> AsyncStream<MQTTPublishInfo>
|
@Dependency(\.mqttConnectionManager.stream) var connectionStream
|
||||||
public var logger: Logger?
|
@Dependency(\.topicListener) var topicListener
|
||||||
public var publish: @Sendable (Double, String) async throws -> Void
|
@Dependency(\.topicPublisher) var topicPublisher
|
||||||
public var shutdown: @Sendable () -> Void = {}
|
|
||||||
|
|
||||||
public func listen(to topics: [String]) async throws -> AsyncStream<MQTTPublishInfo> {
|
/// The logger to use for the service.
|
||||||
try await listen(topics)
|
private let logger: Logger?
|
||||||
}
|
|
||||||
|
|
||||||
public func publish(_ value: Double, to topic: String) async throws {
|
/// The sensors that we are listening for updates to, so
|
||||||
try await publish(value, topic)
|
/// that we can calculate the dew-point temperature and enthalpy
|
||||||
}
|
/// values to publish back to the MQTT broker.
|
||||||
}
|
var sensors: [TemperatureAndHumiditySensor]
|
||||||
|
|
||||||
extension SensorsClient: TestDependencyKey {
|
var topics: [String] {
|
||||||
public static var testValue: SensorsClient {
|
|
||||||
Self()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public extension DependencyValues {
|
|
||||||
var sensorsClient: SensorsClient {
|
|
||||||
get { self[SensorsClient.self] }
|
|
||||||
set { self[SensorsClient.self] = newValue }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public actor SensorsService2: Service {
|
|
||||||
|
|
||||||
@Dependency(\.sensorsClient) var client
|
|
||||||
|
|
||||||
private var sensors: [TemperatureAndHumiditySensor]
|
|
||||||
|
|
||||||
public init(
|
|
||||||
sensors: [TemperatureAndHumiditySensor]
|
|
||||||
) {
|
|
||||||
self.sensors = sensors
|
|
||||||
}
|
|
||||||
|
|
||||||
public func run() async throws {
|
|
||||||
guard sensors.count > 0 else {
|
|
||||||
throw SensorCountError()
|
|
||||||
}
|
|
||||||
|
|
||||||
let stream = try await client.listen(to: topics)
|
|
||||||
|
|
||||||
do {
|
|
||||||
try await withGracefulShutdownHandler {
|
|
||||||
try await withThrowingDiscardingTaskGroup { group in
|
|
||||||
for await result in stream.cancelOnGracefulShutdown() {
|
|
||||||
group.addTask { try await self.handleResult(result) }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} onGracefulShutdown: {
|
|
||||||
Task {
|
|
||||||
await self.client.logger?.trace("Received graceful shutdown.")
|
|
||||||
try? await self.publishUpdates()
|
|
||||||
await self.client.shutdown()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
client.logger?.trace("Error: \(error)")
|
|
||||||
client.shutdown()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private var topics: [String] {
|
|
||||||
sensors.reduce(into: [String]()) { array, sensor in
|
sensors.reduce(into: [String]()) { array, sensor in
|
||||||
array.append(sensor.topics.temperature)
|
array.append(sensor.topics.temperature)
|
||||||
array.append(sensor.topics.humidity)
|
array.append(sensor.topics.humidity)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func handleResult(_ result: MQTTPublishInfo) async throws {
|
/// Create a new sensors service that listens to the passed in
|
||||||
let topic = result.topicName
|
/// sensors.
|
||||||
client.logger?.trace("Begin handling result for topic: \(topic)")
|
///
|
||||||
|
/// - Note: The service will fail to start if the array of sensors is not greater than 0.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - sensors: The sensors to listen for changes to.
|
||||||
|
/// - logger: An optional logger to use.
|
||||||
|
public init(
|
||||||
|
sensors: [TemperatureAndHumiditySensor],
|
||||||
|
logger: Logger? = nil
|
||||||
|
) {
|
||||||
|
self.sensors = sensors
|
||||||
|
self.logger = logger
|
||||||
|
}
|
||||||
|
|
||||||
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
|
/// Start the service with graceful shutdown, which will attempt to publish
|
||||||
var buffer = result.payload
|
/// any pending changes to the MQTT broker, upon a shutdown signal.
|
||||||
return V(buffer: &buffer)
|
public func run() async throws {
|
||||||
}
|
precondition(sensors.count > 0, "Sensors should not be empty.")
|
||||||
|
|
||||||
if topic.contains("temperature") {
|
try await withGracefulShutdownHandler {
|
||||||
client.logger?.trace("Begin handling temperature result.")
|
// Listen for connection events, so that we can automatically
|
||||||
guard let temperature = decode(DryBulb.self) else {
|
// reconnect any sensor topics we're listening to upon a disconnect / reconnect
|
||||||
client.logger?.trace("Failed to decode temperature: \(result.payload)")
|
// event. We can also shutdown any topic listeners upon a shutdown event.
|
||||||
throw DecodingError()
|
for await event in try connectionStream().cancelOnGracefulShutdown() {
|
||||||
|
switch event {
|
||||||
|
case .shuttingDown:
|
||||||
|
logger?.debug("Received shutdown event.")
|
||||||
|
try await self.shutdown()
|
||||||
|
case .disconnected:
|
||||||
|
logger?.debug("Received disconnected event.")
|
||||||
|
try await Task.sleep(for: .milliseconds(100))
|
||||||
|
case .connected:
|
||||||
|
logger?.debug("Received connected event.")
|
||||||
|
let stream = try await makeStream()
|
||||||
|
for await result in stream.cancelOnGracefulShutdown() {
|
||||||
|
logger?.debug("Received result for topic: \(result.topic)")
|
||||||
|
await self.handleResult(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
client.logger?.trace("Decoded temperature: \(temperature)")
|
} onGracefulShutdown: {
|
||||||
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
|
Task {
|
||||||
|
self.logger?.debug("Received graceful shutdown.")
|
||||||
} else if topic.contains("humidity") {
|
try await self.shutdown()
|
||||||
client.logger?.trace("Begin handling humidity result.")
|
|
||||||
guard let humidity = decode(RelativeHumidity.self) else {
|
|
||||||
client.logger?.trace("Failed to decode humidity: \(result.payload)")
|
|
||||||
throw DecodingError()
|
|
||||||
}
|
}
|
||||||
client.logger?.trace("Decoded humidity: \(humidity)")
|
|
||||||
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
|
|
||||||
} else {
|
|
||||||
client.logger?.error("Received unexpected topic, expected topic to contain 'temperature' or 'humidity'!")
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@_spi(Internal)
|
||||||
|
public func shutdown() async throws {
|
||||||
try await publishUpdates()
|
try await publishUpdates()
|
||||||
client.logger?.trace("Done handling result for topic: \(topic)")
|
topicListener.shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> {
|
||||||
|
try await topicListener.listen(to: topics)
|
||||||
|
// ignore errors, so that we continue to listen, but log them
|
||||||
|
// for debugging purposes.
|
||||||
|
.compactMap { result in
|
||||||
|
switch result {
|
||||||
|
case let .failure(error):
|
||||||
|
self.logger?.debug("Received error listening for sensors: \(error)")
|
||||||
|
return nil
|
||||||
|
case let .success(info):
|
||||||
|
return (info.payload, info.topicName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// ignore duplicate values, to prevent publishing dew-point and enthalpy
|
||||||
|
// changes to frequently.
|
||||||
|
.removeDuplicates { lhs, rhs in
|
||||||
|
lhs.buffer == rhs.buffer
|
||||||
|
&& lhs.topic == rhs.topic
|
||||||
|
}
|
||||||
|
.eraseToStream()
|
||||||
|
}
|
||||||
|
|
||||||
|
private func handleResult(_ result: (buffer: ByteBuffer, topic: String)) async {
|
||||||
|
do {
|
||||||
|
let topic = result.topic
|
||||||
|
assert(topics.contains(topic))
|
||||||
|
logger?.debug("Begin handling result for topic: \(topic)")
|
||||||
|
|
||||||
|
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
|
||||||
|
var buffer = result.buffer
|
||||||
|
return V(buffer: &buffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
if topic.contains("temperature") {
|
||||||
|
logger?.debug("Begin handling temperature result.")
|
||||||
|
guard let temperature = decode(DryBulb.self) else {
|
||||||
|
logger?.debug("Failed to decode temperature: \(result.buffer)")
|
||||||
|
throw DecodingError()
|
||||||
|
}
|
||||||
|
logger?.debug("Decoded temperature: \(temperature)")
|
||||||
|
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
|
||||||
|
|
||||||
|
} else if topic.contains("humidity") {
|
||||||
|
logger?.debug("Begin handling humidity result.")
|
||||||
|
guard let humidity = decode(RelativeHumidity.self) else {
|
||||||
|
logger?.debug("Failed to decode humidity: \(result.buffer)")
|
||||||
|
throw DecodingError()
|
||||||
|
}
|
||||||
|
logger?.debug("Decoded humidity: \(humidity)")
|
||||||
|
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
|
||||||
|
}
|
||||||
|
|
||||||
|
try await publishUpdates()
|
||||||
|
logger?.debug("Done handling result for topic: \(topic)")
|
||||||
|
} catch {
|
||||||
|
logger?.error("Received error while handling result: \(error)")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func publish(_ double: Double?, to topic: String) async throws {
|
private func publish(_ double: Double?, to topic: String) async throws {
|
||||||
guard let double else { return }
|
guard let double else { return }
|
||||||
try await client.publish(double, to: topic)
|
try await topicPublisher.publish(
|
||||||
client.logger?.trace("Published update to topic: \(topic)")
|
to: topic,
|
||||||
|
payload: ByteBufferAllocator().buffer(string: "\(double)"),
|
||||||
|
qos: .exactlyOnce,
|
||||||
|
retain: true
|
||||||
|
)
|
||||||
|
logger?.debug("Published update to topic: \(topic)")
|
||||||
}
|
}
|
||||||
|
|
||||||
private func publishUpdates() async throws {
|
private func publishUpdates() async throws {
|
||||||
for sensor in sensors.filter(\.needsProcessed) {
|
for sensor in sensors.filter(\.needsProcessed) {
|
||||||
try await publish(sensor.dewPoint?.value, to: sensor.topics.dewPoint)
|
try await publish(sensor.dewPoint?.value, to: sensor.topics.dewPoint)
|
||||||
try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy)
|
try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy)
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public actor SensorsService: Service {
|
|
||||||
private var sensors: [TemperatureAndHumiditySensor]
|
|
||||||
private let client: MQTTClient
|
|
||||||
private let events: @Sendable () -> AsyncStream<MQTTConnectionService.Event>
|
|
||||||
nonisolated var logger: Logger { client.logger }
|
|
||||||
private var shuttingDown: Bool = false
|
|
||||||
|
|
||||||
public init(
|
|
||||||
client: MQTTClient,
|
|
||||||
events: @Sendable @escaping () -> AsyncStream<MQTTConnectionService.Event>,
|
|
||||||
sensors: [TemperatureAndHumiditySensor]
|
|
||||||
) {
|
|
||||||
self.client = client
|
|
||||||
self.events = events
|
|
||||||
self.sensors = sensors
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The entry-point of the service.
|
|
||||||
///
|
|
||||||
/// This method is called to start the service and begin
|
|
||||||
/// listening for sensor value changes then publishing the dew-point
|
|
||||||
/// and enthalpy values of the sensors.
|
|
||||||
public func run() async throws {
|
|
||||||
do {
|
|
||||||
try await withGracefulShutdownHandler {
|
|
||||||
try await withThrowingDiscardingTaskGroup { group in
|
|
||||||
client.addPublishListener(named: "\(Self.self)") { result in
|
|
||||||
if self.shuttingDown {
|
|
||||||
self.logger.trace("Shutting down.")
|
|
||||||
} else if !self.client.isActive() {
|
|
||||||
self.logger.trace("Client is not currently active")
|
|
||||||
} else {
|
|
||||||
Task { try await self.handleResult(result) }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for await event in self.events().cancelOnGracefulShutdown() {
|
|
||||||
logger.trace("Received event: \(event)")
|
|
||||||
if event == .shuttingDown {
|
|
||||||
self.setIsShuttingDown()
|
|
||||||
} else if event == .connected {
|
|
||||||
group.addTask { try await self.subscribeToSensors() }
|
|
||||||
} else {
|
|
||||||
group.addTask { await self.unsubscribeToSensors() }
|
|
||||||
group.addTask { try? await Task.sleep(for: .milliseconds(100)) }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} onGracefulShutdown: {
|
|
||||||
// do something.
|
|
||||||
self.logger.debug("Received graceful shutdown.")
|
|
||||||
Task { [weak self] in await self?.setIsShuttingDown() }
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
// WARN: We always get an MQTTNIO `noConnection` error here, which generally is not an issue,
|
|
||||||
// but causes service ServiceLifecycle to fail, so currently just ignoring errors that are thrown.
|
|
||||||
// However we do receive the unsubscribe message back from the MQTT broker, so it is likely safe
|
|
||||||
// to ignore the `noConnection` error.
|
|
||||||
logger.trace("Run error: \(error)")
|
|
||||||
// throw error
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private func setIsShuttingDown() {
|
|
||||||
logger.debug("Received shut down event.")
|
|
||||||
Task { try await publishUpdates() }
|
|
||||||
Task { await self.unsubscribeToSensors() }
|
|
||||||
shuttingDown = true
|
|
||||||
client.removePublishListener(named: "\(Self.self)")
|
|
||||||
}
|
|
||||||
|
|
||||||
private func handleResult(
|
|
||||||
_ result: Result<MQTTPublishInfo, any Error>
|
|
||||||
) async throws {
|
|
||||||
logger.trace("Begin handling result")
|
|
||||||
do {
|
|
||||||
switch result {
|
|
||||||
case let .failure(error):
|
|
||||||
logger.debug("Failed receiving sensor: \(error)")
|
|
||||||
throw error
|
|
||||||
case let .success(value):
|
|
||||||
// do something.
|
|
||||||
let topic = value.topicName
|
|
||||||
logger.trace("Received new value for topic: \(topic)")
|
|
||||||
if topic.contains("temperature") {
|
|
||||||
// do something.
|
|
||||||
var buffer = value.payload
|
|
||||||
guard let temperature = DryBulb(buffer: &buffer) else {
|
|
||||||
logger.trace("Decoding error for topic: \(topic)")
|
|
||||||
throw DecodingError()
|
|
||||||
}
|
|
||||||
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
|
|
||||||
try await publishUpdates()
|
|
||||||
|
|
||||||
} else if topic.contains("humidity") {
|
|
||||||
var buffer = value.payload
|
|
||||||
// Decode and update the temperature value
|
|
||||||
guard let humidity = RelativeHumidity(buffer: &buffer) else {
|
|
||||||
logger.debug("Failed to decode humidity from buffer: \(buffer)")
|
|
||||||
throw DecodingError()
|
|
||||||
}
|
|
||||||
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
|
|
||||||
try await publishUpdates()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
logger.trace("Handle Result error: \(error)")
|
|
||||||
throw error
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private func subscribeToSensors() async throws {
|
|
||||||
for sensor in sensors {
|
|
||||||
_ = try await client.subscribe(to: [
|
|
||||||
MQTTSubscribeInfo(topicFilter: sensor.topics.temperature, qos: .atLeastOnce),
|
|
||||||
MQTTSubscribeInfo(topicFilter: sensor.topics.humidity, qos: .atLeastOnce)
|
|
||||||
])
|
|
||||||
logger.debug("Subscribed to sensor: \(sensor.location)")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private func unsubscribeToSensors() async {
|
|
||||||
logger.trace("Begin unsubscribe to sensors.")
|
|
||||||
guard client.isActive() else {
|
|
||||||
logger.debug("Client is not active, skipping.")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
do {
|
|
||||||
let topics = sensors.reduce(into: [String]()) { array, sensor in
|
|
||||||
array.append(sensor.topics.temperature)
|
|
||||||
array.append(sensor.topics.humidity)
|
|
||||||
}
|
|
||||||
try await client.unsubscribe(from: topics)
|
|
||||||
logger.trace("Unsubscribed from sensors.")
|
|
||||||
} catch {
|
|
||||||
logger.trace("Unsubscribe error: \(error)")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private func publish(double: Double?, to topic: String) async throws {
|
|
||||||
guard client.isActive() else { return }
|
|
||||||
guard let double else { return }
|
|
||||||
let rounded = round(double * 100) / 100
|
|
||||||
logger.debug("Publishing \(rounded), to: \(topic)")
|
|
||||||
try await client.publish(
|
|
||||||
to: topic,
|
|
||||||
payload: ByteBufferAllocator().buffer(string: "\(rounded)"),
|
|
||||||
qos: .exactlyOnce,
|
|
||||||
retain: true
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
private func publishUpdates() async throws {
|
|
||||||
for sensor in sensors.filter(\.needsProcessed) {
|
|
||||||
try await publish(double: sensor.dewPoint?.value, to: sensor.topics.dewPoint)
|
|
||||||
try await publish(double: sensor.enthalpy?.value, to: sensor.topics.enthalpy)
|
|
||||||
try sensors.hasProcessed(sensor)
|
try sensors.hasProcessed(sensor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MARK: - Errors
|
// MARK: - Errors
|
||||||
|
|
||||||
struct DecodingError: Error {}
|
struct DecodingError: Error {}
|
||||||
struct MQTTClientNotConnected: Error {}
|
struct SensorNotFoundError: Error {}
|
||||||
struct NotFoundError: Error {}
|
|
||||||
struct SensorExists: Error {}
|
|
||||||
struct SensorCountError: Error {}
|
|
||||||
|
|
||||||
// MARK: - Helpers
|
// MARK: - Helpers
|
||||||
|
|
||||||
@@ -319,14 +192,14 @@ private extension Array where Element == TemperatureAndHumiditySensor {
|
|||||||
with value: V
|
with value: V
|
||||||
) throws {
|
) throws {
|
||||||
guard let index = firstIndex(where: { $0.topics.contains(topic) }) else {
|
guard let index = firstIndex(where: { $0.topics.contains(topic) }) else {
|
||||||
throw NotFoundError()
|
throw SensorNotFoundError()
|
||||||
}
|
}
|
||||||
self[index][keyPath: keyPath] = value
|
self[index][keyPath: keyPath] = value
|
||||||
}
|
}
|
||||||
|
|
||||||
mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws {
|
mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws {
|
||||||
guard let index = firstIndex(where: { $0.id == sensor.id }) else {
|
guard let index = firstIndex(where: { $0.id == sensor.id }) else {
|
||||||
throw NotFoundError()
|
throw SensorNotFoundError()
|
||||||
}
|
}
|
||||||
self[index].needsProcessed = false
|
self[index].needsProcessed = false
|
||||||
}
|
}
|
||||||
|
|||||||
186
Sources/TopicDependencies/TopicListener.swift
Normal file
186
Sources/TopicDependencies/TopicListener.swift
Normal file
@@ -0,0 +1,186 @@
|
|||||||
|
import Dependencies
|
||||||
|
import DependenciesMacros
|
||||||
|
import Foundation
|
||||||
|
import MQTTNIO
|
||||||
|
|
||||||
|
/// A dependency that can generate an async stream of changes to the given topics.
|
||||||
|
///
|
||||||
|
/// - Note: This type only conforms to ``TestDependencyKey`` because it requires an MQTTClient
|
||||||
|
/// to generate the live dependency.
|
||||||
|
@DependencyClient
|
||||||
|
public struct TopicListener: Sendable {
|
||||||
|
|
||||||
|
public typealias Stream = AsyncStream<Result<MQTTPublishInfo, any Error>>
|
||||||
|
|
||||||
|
/// Create an async stream that listens for changes to the given topics.
|
||||||
|
private var _listen: @Sendable ([String], MQTTQoS) async throws -> Stream
|
||||||
|
|
||||||
|
/// Shutdown the listener stream.
|
||||||
|
public var shutdown: @Sendable () -> Void
|
||||||
|
|
||||||
|
/// Create a new topic listener.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - listen: Generate an async stream of changes for the given topics.
|
||||||
|
/// - shutdown: Shutdown the topic listener stream.
|
||||||
|
public init(
|
||||||
|
listen: @Sendable @escaping ([String], MQTTQoS) async throws -> Stream,
|
||||||
|
shutdown: @Sendable @escaping () -> Void
|
||||||
|
) {
|
||||||
|
self._listen = listen
|
||||||
|
self.shutdown = shutdown
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create an async stream that listens for changes to the given topics.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - topics: The topics to listen for changes to.
|
||||||
|
/// - qos: The MQTTQoS for the subscription.
|
||||||
|
public func listen(
|
||||||
|
to topics: [String],
|
||||||
|
qos: MQTTQoS = .atLeastOnce
|
||||||
|
) async throws -> Stream {
|
||||||
|
try await _listen(topics, qos)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create an async stream that listens for changes to the given topics.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - topics: The topics to listen for changes to.
|
||||||
|
/// - qos: The MQTTQoS for the subscription.
|
||||||
|
public func listen(
|
||||||
|
_ topics: String...,
|
||||||
|
qos: MQTTQoS = .atLeastOnce
|
||||||
|
) async throws -> Stream {
|
||||||
|
try await listen(to: topics, qos: qos)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create the live implementation of the topic listener with the given MQTTClient.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - client: The MQTTClient to use.
|
||||||
|
public static func live(client: MQTTClient) -> Self {
|
||||||
|
let listener = MQTTTopicListener(client: client)
|
||||||
|
return .init(
|
||||||
|
listen: { try await listener.listen($0, $1) },
|
||||||
|
shutdown: { listener.shutdown() }
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extension TopicListener: TestDependencyKey {
|
||||||
|
public static var testValue: TopicListener { Self() }
|
||||||
|
}
|
||||||
|
|
||||||
|
public extension DependencyValues {
|
||||||
|
var topicListener: TopicListener {
|
||||||
|
get { self[TopicListener.self] }
|
||||||
|
set { self[TopicListener.self] = newValue }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MARK: - Helpers
|
||||||
|
|
||||||
|
private actor MQTTTopicListener {
|
||||||
|
|
||||||
|
private let client: MQTTClient
|
||||||
|
private let continuation: TopicListener.Stream.Continuation
|
||||||
|
private let name: String
|
||||||
|
let stream: TopicListener.Stream
|
||||||
|
private var shuttingDown: Bool = false
|
||||||
|
|
||||||
|
init(
|
||||||
|
client: MQTTClient
|
||||||
|
) {
|
||||||
|
let (stream, continuation) = TopicListener.Stream.makeStream()
|
||||||
|
self.client = client
|
||||||
|
self.continuation = continuation
|
||||||
|
self.name = UUID().uuidString
|
||||||
|
self.stream = stream
|
||||||
|
}
|
||||||
|
|
||||||
|
deinit {
|
||||||
|
if !shuttingDown {
|
||||||
|
let message = """
|
||||||
|
Shutdown was not called on topic listener. This could lead to potential errors or
|
||||||
|
the stream never ending.
|
||||||
|
|
||||||
|
Please ensure that you call shutdown on the listener.
|
||||||
|
"""
|
||||||
|
client.logger.warning("\(message)")
|
||||||
|
continuation.finish()
|
||||||
|
}
|
||||||
|
client.removePublishListener(named: name)
|
||||||
|
client.removeShutdownListener(named: name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func listen(
|
||||||
|
_ topics: [String],
|
||||||
|
_ qos: MQTTQoS = .atLeastOnce
|
||||||
|
) async throws -> TopicListener.Stream {
|
||||||
|
var sleepTimes = 0
|
||||||
|
|
||||||
|
while !client.isActive() {
|
||||||
|
guard sleepTimes < 10 else {
|
||||||
|
throw TopicListenerError.connectionTimeout
|
||||||
|
}
|
||||||
|
try? await Task.sleep(for: .milliseconds(100))
|
||||||
|
sleepTimes += 1
|
||||||
|
}
|
||||||
|
|
||||||
|
client.logger.trace("Client is active, begin subscribing to topics.")
|
||||||
|
|
||||||
|
let subscription = try? await client.subscribe(to: topics.map {
|
||||||
|
MQTTSubscribeInfo(topicFilter: $0, qos: qos)
|
||||||
|
})
|
||||||
|
|
||||||
|
guard subscription != nil else {
|
||||||
|
client.logger.error("Error subscribing to topics: \(topics)")
|
||||||
|
throw TopicListenerError.failedToSubscribe
|
||||||
|
}
|
||||||
|
|
||||||
|
client.logger.trace("Done subscribing, begin listening to topics.")
|
||||||
|
|
||||||
|
client.addPublishListener(named: name) { result in
|
||||||
|
switch result {
|
||||||
|
case let .failure(error):
|
||||||
|
self.client.logger.error("Received error while listening: \(error)")
|
||||||
|
self.continuation.yield(.failure(MQTTListenResultError(error)))
|
||||||
|
case let .success(publishInfo):
|
||||||
|
if topics.contains(publishInfo.topicName) {
|
||||||
|
self.client.logger.trace("Recieved new value for topic: \(publishInfo.topicName)")
|
||||||
|
self.continuation.yield(.success(publishInfo))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return stream
|
||||||
|
}
|
||||||
|
|
||||||
|
private func setIsShuttingDown() {
|
||||||
|
shuttingDown = true
|
||||||
|
}
|
||||||
|
|
||||||
|
nonisolated func shutdown() {
|
||||||
|
client.logger.trace("Closing topic listener...")
|
||||||
|
continuation.finish()
|
||||||
|
client.removePublishListener(named: name)
|
||||||
|
client.removeShutdownListener(named: name)
|
||||||
|
Task { await self.setIsShuttingDown() }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MARK: - Errors
|
||||||
|
|
||||||
|
public enum TopicListenerError: Error {
|
||||||
|
case connectionTimeout
|
||||||
|
case failedToSubscribe
|
||||||
|
}
|
||||||
|
|
||||||
|
public struct MQTTListenResultError: Error {
|
||||||
|
let underlyingError: any Error
|
||||||
|
|
||||||
|
init(_ underlyingError: any Error) {
|
||||||
|
self.underlyingError = underlyingError
|
||||||
|
}
|
||||||
|
}
|
||||||
117
Sources/TopicDependencies/TopicPublisher.swift
Normal file
117
Sources/TopicDependencies/TopicPublisher.swift
Normal file
@@ -0,0 +1,117 @@
|
|||||||
|
import Dependencies
|
||||||
|
import DependenciesMacros
|
||||||
|
import MQTTNIO
|
||||||
|
import NIO
|
||||||
|
|
||||||
|
/// A dependency that is responsible for publishing values to an MQTT broker.
|
||||||
|
///
|
||||||
|
/// - Note: This dependency only conforms to `TestDependencyKey` because it
|
||||||
|
/// requires an active `MQTTClient` to generate the live dependency.
|
||||||
|
@DependencyClient
|
||||||
|
public struct TopicPublisher: Sendable {
|
||||||
|
|
||||||
|
private var _publish: @Sendable (PublishRequest) async throws -> Void
|
||||||
|
|
||||||
|
/// Create a new topic publisher.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - publish: Handle the publish request.
|
||||||
|
public init(
|
||||||
|
publish: @Sendable @escaping (PublishRequest) async throws -> Void
|
||||||
|
) {
|
||||||
|
self._publish = publish
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Publish a new value to the given topic.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - topicName: The topic to publish the new value to.
|
||||||
|
/// - payload: The value to publish.
|
||||||
|
/// - qos: The MQTTQoS.
|
||||||
|
/// - retain: The retain flag.
|
||||||
|
public func publish(
|
||||||
|
to topicName: String,
|
||||||
|
payload: ByteBuffer,
|
||||||
|
qos: MQTTQoS,
|
||||||
|
retain: Bool = false
|
||||||
|
) async throws {
|
||||||
|
try await _publish(.init(
|
||||||
|
topicName: topicName,
|
||||||
|
payload: payload,
|
||||||
|
qos: qos,
|
||||||
|
retain: retain
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create the live topic publisher with the given `MQTTClient`.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - client: The mqtt broker client to use.
|
||||||
|
public static func live(client: MQTTClient) -> Self {
|
||||||
|
.init(
|
||||||
|
publish: { request in
|
||||||
|
guard client.isActive() else {
|
||||||
|
client.logger.trace("Client is not connected, unable to publish to \(request.topicName)")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
client.logger.trace("Begin publishing to topic: \(request.topicName)")
|
||||||
|
defer { client.logger.trace("Done publishing to topic: \(request.topicName)") }
|
||||||
|
try await client.publish(
|
||||||
|
to: request.topicName,
|
||||||
|
payload: request.payload,
|
||||||
|
qos: request.qos,
|
||||||
|
retain: request.retain
|
||||||
|
)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Represents the parameters required to publish a new value to the
|
||||||
|
/// MQTT broker.
|
||||||
|
public struct PublishRequest: Equatable, Sendable {
|
||||||
|
|
||||||
|
/// The topic to publish the new value to.
|
||||||
|
public let topicName: String
|
||||||
|
|
||||||
|
/// The value to publish.
|
||||||
|
public let payload: ByteBuffer
|
||||||
|
|
||||||
|
/// The qos of the request.
|
||||||
|
public let qos: MQTTQoS
|
||||||
|
|
||||||
|
/// The retain flag for the request.
|
||||||
|
public let retain: Bool
|
||||||
|
|
||||||
|
/// Create a new publish request.
|
||||||
|
///
|
||||||
|
/// - Parameters:
|
||||||
|
/// - topicName: The topic to publish to.
|
||||||
|
/// - payload: The value to publish.
|
||||||
|
/// - qos: The qos of the request.
|
||||||
|
/// - retain: The retain flag of the request.
|
||||||
|
public init(
|
||||||
|
topicName: String,
|
||||||
|
payload: ByteBuffer,
|
||||||
|
qos: MQTTQoS,
|
||||||
|
retain: Bool
|
||||||
|
) {
|
||||||
|
self.topicName = topicName
|
||||||
|
self.payload = payload
|
||||||
|
self.qos = qos
|
||||||
|
self.retain = retain
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extension TopicPublisher: TestDependencyKey {
|
||||||
|
public static var testValue: TopicPublisher { Self() }
|
||||||
|
}
|
||||||
|
|
||||||
|
public extension DependencyValues {
|
||||||
|
|
||||||
|
/// A dependency that is responsible for publishing values to an MQTT broker.
|
||||||
|
var topicPublisher: TopicPublisher {
|
||||||
|
get { self[TopicPublisher.self] }
|
||||||
|
set { self[TopicPublisher.self] = newValue }
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,15 +1,19 @@
|
|||||||
|
import Dependencies
|
||||||
import Foundation
|
import Foundation
|
||||||
import Logging
|
import Logging
|
||||||
import Models
|
import Models
|
||||||
|
import MQTTConnectionManager
|
||||||
import MQTTConnectionService
|
import MQTTConnectionService
|
||||||
import MQTTNIO
|
import MQTTNIO
|
||||||
import NIO
|
import NIO
|
||||||
import PsychrometricClientLive
|
import PsychrometricClientLive
|
||||||
import SensorsService
|
import SensorsService
|
||||||
import ServiceLifecycle
|
import ServiceLifecycle
|
||||||
|
import TopicDependencies
|
||||||
|
|
||||||
@main
|
@main
|
||||||
struct Application {
|
struct Application {
|
||||||
|
|
||||||
/// The main entry point of the application.
|
/// The main entry point of the application.
|
||||||
static func main() async throws {
|
static func main() async throws {
|
||||||
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||||
@@ -31,23 +35,37 @@ struct Application {
|
|||||||
logger: logger
|
logger: logger
|
||||||
)
|
)
|
||||||
|
|
||||||
let mqttConnection = MQTTConnectionService(client: mqtt)
|
do {
|
||||||
let sensors = SensorsService(
|
try await withDependencies {
|
||||||
client: mqtt,
|
$0.psychrometricClient = .liveValue
|
||||||
events: { mqttConnection.events },
|
$0.topicListener = .live(client: mqtt)
|
||||||
sensors: .live
|
$0.topicPublisher = .live(client: mqtt)
|
||||||
)
|
$0.mqttConnectionManager = .live(client: mqtt, logger: logger)
|
||||||
|
} operation: {
|
||||||
|
let mqttConnection = MQTTConnectionService(logger: logger)
|
||||||
|
let sensors = SensorsService(sensors: .live, logger: logger)
|
||||||
|
|
||||||
let serviceGroup = ServiceGroup(
|
var serviceGroupConfiguration = ServiceGroupConfiguration(
|
||||||
services: [
|
services: [
|
||||||
mqttConnection,
|
mqttConnection,
|
||||||
sensors
|
sensors
|
||||||
],
|
],
|
||||||
gracefulShutdownSignals: [.sigterm, .sigint],
|
gracefulShutdownSignals: [.sigterm, .sigint],
|
||||||
logger: logger
|
logger: logger
|
||||||
)
|
)
|
||||||
|
serviceGroupConfiguration.maximumCancellationDuration = .seconds(5)
|
||||||
|
serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10)
|
||||||
|
|
||||||
try await serviceGroup.run()
|
let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
|
||||||
|
|
||||||
|
try await serviceGroup.run()
|
||||||
|
}
|
||||||
|
|
||||||
|
try await mqtt.shutdown()
|
||||||
|
try await eventloopGroup.shutdownGracefully()
|
||||||
|
} catch {
|
||||||
|
try await eventloopGroup.shutdownGracefully()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,8 @@
|
|||||||
import Combine
|
import AsyncAlgorithms
|
||||||
import Logging
|
import Logging
|
||||||
import Models
|
import Models
|
||||||
@testable import MQTTConnectionService
|
@_spi(Internal) import MQTTConnectionManager
|
||||||
|
import MQTTConnectionService
|
||||||
import MQTTNIO
|
import MQTTNIO
|
||||||
import NIO
|
import NIO
|
||||||
import ServiceLifecycle
|
import ServiceLifecycle
|
||||||
@@ -13,22 +14,52 @@ final class MQTTConnectionServiceTests: XCTestCase {
|
|||||||
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
||||||
|
|
||||||
static let logger: Logger = {
|
static let logger: Logger = {
|
||||||
var logger = Logger(label: "AsyncClientTests")
|
var logger = Logger(label: "MQTTConnectionServiceTests")
|
||||||
logger.logLevel = .trace
|
logger.logLevel = .trace
|
||||||
return logger
|
return logger
|
||||||
}()
|
}()
|
||||||
|
|
||||||
func testGracefulShutdownWorks() async throws {
|
// TODO: Move to integration tests.
|
||||||
try await testGracefulShutdown { trigger in
|
func testMQTTConnectionStream() async throws {
|
||||||
let client = createClient(identifier: "testGracefulShutdown")
|
let client = createClient(identifier: "testNonManagedStream")
|
||||||
let service = MQTTConnectionService(client: client)
|
let manager = MQTTConnectionManager.live(
|
||||||
try await service.run()
|
client: client,
|
||||||
|
logger: Self.logger,
|
||||||
|
alwaysReconnect: false
|
||||||
|
)
|
||||||
|
let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger)
|
||||||
|
let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger)
|
||||||
|
var events1 = [MQTTConnectionManager.Event]()
|
||||||
|
var events2 = [MQTTConnectionManager.Event]()
|
||||||
|
|
||||||
|
let stream1 = connectionStream1.start()
|
||||||
|
let stream2 = connectionStream2.start()
|
||||||
|
|
||||||
|
_ = try await manager.connect()
|
||||||
|
|
||||||
|
Task {
|
||||||
|
while !client.isActive() {
|
||||||
|
try await Task.sleep(for: .milliseconds(100))
|
||||||
|
}
|
||||||
|
try await Task.sleep(for: .milliseconds(200))
|
||||||
|
manager.shutdown()
|
||||||
|
try await client.disconnect()
|
||||||
try await Task.sleep(for: .seconds(1))
|
try await Task.sleep(for: .seconds(1))
|
||||||
XCTAssert(client.isActive())
|
try await client.shutdown()
|
||||||
trigger.triggerGracefulShutdown()
|
try await Task.sleep(for: .seconds(1))
|
||||||
// try await Task.sleep(for: .seconds(2))
|
connectionStream1.stop()
|
||||||
// XCTAssertFalse(client.isActive())
|
connectionStream2.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for await event in stream1.removeDuplicates() {
|
||||||
|
events1.append(event)
|
||||||
|
}
|
||||||
|
for await event in stream2.removeDuplicates() {
|
||||||
|
events2.append(event)
|
||||||
|
}
|
||||||
|
|
||||||
|
XCTAssertEqual(events1, [.disconnected, .connected, .disconnected, .shuttingDown])
|
||||||
|
XCTAssertEqual(events2, [.disconnected, .connected, .disconnected, .shuttingDown])
|
||||||
}
|
}
|
||||||
|
|
||||||
func createClient(identifier: String) -> MQTTClient {
|
func createClient(identifier: String) -> MQTTClient {
|
||||||
@@ -58,65 +89,4 @@ final class MQTTConnectionServiceTests: XCTestCase {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testEventStream() async throws {
|
|
||||||
var connection: ConnectionStream? = ConnectionStream()
|
|
||||||
|
|
||||||
let task = Task {
|
|
||||||
guard let events = connection?.events else { return }
|
|
||||||
print("before loop")
|
|
||||||
for await event in events {
|
|
||||||
print("\(event)")
|
|
||||||
}
|
|
||||||
print("after loop")
|
|
||||||
}
|
|
||||||
|
|
||||||
let ending = Task {
|
|
||||||
try await Task.sleep(for: .seconds(2))
|
|
||||||
connection = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
connection?.start()
|
|
||||||
try await ending.value
|
|
||||||
task.cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
class ConnectionStream {
|
|
||||||
|
|
||||||
enum Event {
|
|
||||||
case connected
|
|
||||||
case disconnected
|
|
||||||
case shuttingDown
|
|
||||||
}
|
|
||||||
|
|
||||||
let events: AsyncStream<Event>
|
|
||||||
private let continuation: AsyncStream<Event>.Continuation
|
|
||||||
private var cancellable: AnyCancellable?
|
|
||||||
|
|
||||||
init() {
|
|
||||||
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
|
|
||||||
self.events = stream
|
|
||||||
self.continuation = continuation
|
|
||||||
}
|
|
||||||
|
|
||||||
deinit {
|
|
||||||
print("connection stream is gone.")
|
|
||||||
stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func start() {
|
|
||||||
cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
|
|
||||||
.autoconnect()
|
|
||||||
.sink { [weak self] _ in
|
|
||||||
print("will send event.")
|
|
||||||
self?.continuation.yield(.connected)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func stop() {
|
|
||||||
continuation.yield(.shuttingDown)
|
|
||||||
cancellable = nil
|
|
||||||
continuation.finish()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,10 +1,12 @@
|
|||||||
import Dependencies
|
import Dependencies
|
||||||
import Logging
|
import Logging
|
||||||
import Models
|
import Models
|
||||||
|
@_spi(Internal) import MQTTConnectionManager
|
||||||
import MQTTNIO
|
import MQTTNIO
|
||||||
import NIO
|
import NIO
|
||||||
import PsychrometricClientLive
|
import PsychrometricClientLive
|
||||||
@testable import SensorsService
|
@_spi(Internal) import SensorsService
|
||||||
|
import TopicDependencies
|
||||||
import XCTest
|
import XCTest
|
||||||
|
|
||||||
final class SensorsClientTests: XCTestCase {
|
final class SensorsClientTests: XCTestCase {
|
||||||
@@ -12,30 +14,75 @@ final class SensorsClientTests: XCTestCase {
|
|||||||
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
||||||
|
|
||||||
static let logger: Logger = {
|
static let logger: Logger = {
|
||||||
var logger = Logger(label: "AsyncClientTests")
|
var logger = Logger(label: "SensorsClientTests")
|
||||||
logger.logLevel = .debug
|
logger.logLevel = .trace
|
||||||
return logger
|
return logger
|
||||||
}()
|
}()
|
||||||
|
|
||||||
override func invokeTest() {
|
override func invokeTest() {
|
||||||
|
let client = createClient(identifier: "\(Self.self)")
|
||||||
|
|
||||||
withDependencies {
|
withDependencies {
|
||||||
|
$0.mqttConnectionManager = .live(client: client, logger: Self.logger)
|
||||||
$0.psychrometricClient = PsychrometricClient.liveValue
|
$0.psychrometricClient = PsychrometricClient.liveValue
|
||||||
|
$0.topicListener = .live(client: client)
|
||||||
|
$0.topicPublisher = .live(client: client)
|
||||||
} operation: {
|
} operation: {
|
||||||
super.invokeTest()
|
super.invokeTest()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// func createClient(identifier: String) -> SensorsClient {
|
func testListeningResumesAfterDisconnectThenReconnect() async throws {
|
||||||
// let envVars = EnvVars(
|
@Dependency(\.mqttConnectionManager) var manager
|
||||||
// appEnv: .testing,
|
struct TimeoutError: Error {}
|
||||||
// host: Self.hostname,
|
|
||||||
// port: "1883",
|
let sensor = TemperatureAndHumiditySensor(location: .return)
|
||||||
// identifier: identifier,
|
var results = [TopicPublisher.PublishRequest]()
|
||||||
// userName: nil,
|
|
||||||
// password: nil
|
try await withDependencies {
|
||||||
// )
|
$0.topicPublisher = .capturing { results.append($0) }
|
||||||
// return .init(envVars: envVars, logger: Self.logger)
|
} operation: {
|
||||||
// }
|
let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger)
|
||||||
|
let task = Task { try await sensorsService.run() }
|
||||||
|
defer { task.cancel() }
|
||||||
|
|
||||||
|
try await manager.connect()
|
||||||
|
defer { manager.shutdown() }
|
||||||
|
|
||||||
|
try await manager.withClient { client in
|
||||||
|
try await client.disconnect()
|
||||||
|
try await client.connect()
|
||||||
|
try await Task.sleep(for: .milliseconds(100))
|
||||||
|
try await client.publish(
|
||||||
|
to: sensor.topics.temperature,
|
||||||
|
payload: ByteBufferAllocator().buffer(string: "25"),
|
||||||
|
qos: .atLeastOnce,
|
||||||
|
retain: false
|
||||||
|
)
|
||||||
|
try await client.publish(
|
||||||
|
to: sensor.topics.humidity,
|
||||||
|
payload: ByteBufferAllocator().buffer(string: "50"),
|
||||||
|
qos: .atLeastOnce,
|
||||||
|
retain: false
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
var timeoutCount = 0
|
||||||
|
while !(results.count == 2) {
|
||||||
|
guard timeoutCount < 20 else {
|
||||||
|
throw TimeoutError()
|
||||||
|
}
|
||||||
|
try await Task.sleep(for: .milliseconds(100))
|
||||||
|
timeoutCount += 1
|
||||||
|
}
|
||||||
|
|
||||||
|
XCTAssertEqual(results.count, 2)
|
||||||
|
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.dewPoint }))
|
||||||
|
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.enthalpy }))
|
||||||
|
try await sensorsService.shutdown()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func createClient(identifier: String) -> MQTTClient {
|
func createClient(identifier: String) -> MQTTClient {
|
||||||
let envVars = EnvVars(
|
let envVars = EnvVars(
|
||||||
appEnv: .testing,
|
appEnv: .testing,
|
||||||
@@ -62,178 +109,6 @@ final class SensorsClientTests: XCTestCase {
|
|||||||
configuration: config
|
configuration: config
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// func testConnectAndShutdown() async throws {
|
|
||||||
// let client = createClient(identifier: "testConnectAndShutdown")
|
|
||||||
// await client.connect()
|
|
||||||
// await client.shutdown()
|
|
||||||
// }
|
|
||||||
|
|
||||||
// func testSensorService() async throws {
|
|
||||||
// let mqtt = createClient(identifier: "testSensorService")
|
|
||||||
// // let mqtt = await client.client
|
|
||||||
// let sensor = TemperatureAndHumiditySensor(location: .mixedAir)
|
|
||||||
// let publishInfo = PublishInfoContainer(topicFilters: [
|
|
||||||
// sensor.topics.dewPoint,
|
|
||||||
// sensor.topics.enthalpy
|
|
||||||
// ])
|
|
||||||
// let service = SensorsService(client: mqtt, sensors: [sensor])
|
|
||||||
//
|
|
||||||
// // fix to connect the mqtt client.
|
|
||||||
// try await mqtt.connect()
|
|
||||||
// let task = Task { try await service.run() }
|
|
||||||
//
|
|
||||||
// _ = try await mqtt.subscribe(to: [
|
|
||||||
// MQTTSubscribeInfo(topicFilter: sensor.topics.dewPoint, qos: .exactlyOnce),
|
|
||||||
// MQTTSubscribeInfo(topicFilter: sensor.topics.enthalpy, qos: .exactlyOnce)
|
|
||||||
// ])
|
|
||||||
//
|
|
||||||
// let listener = mqtt.createPublishListener()
|
|
||||||
// Task {
|
|
||||||
// for await result in listener {
|
|
||||||
// switch result {
|
|
||||||
// case let .failure(error):
|
|
||||||
// XCTFail("\(error)")
|
|
||||||
// case let .success(value):
|
|
||||||
// await publishInfo.addPublishInfo(value)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// try await mqtt.publish(
|
|
||||||
// to: sensor.topics.temperature,
|
|
||||||
// payload: ByteBufferAllocator().buffer(string: "75.123"),
|
|
||||||
// qos: MQTTQoS.exactlyOnce,
|
|
||||||
// retain: true
|
|
||||||
// )
|
|
||||||
//
|
|
||||||
// try await Task.sleep(for: .seconds(1))
|
|
||||||
//
|
|
||||||
// // XCTAssert(client.sensors.first!.needsProcessed)
|
|
||||||
// // let firstSensor = await client.sensors.first!
|
|
||||||
// // XCTAssertEqual(firstSensor.temperature, .init(75.123, units: .celsius))
|
|
||||||
//
|
|
||||||
// try await mqtt.publish(
|
|
||||||
// to: sensor.topics.humidity,
|
|
||||||
// payload: ByteBufferAllocator().buffer(string: "50"),
|
|
||||||
// qos: MQTTQoS.exactlyOnce,
|
|
||||||
// retain: true
|
|
||||||
// )
|
|
||||||
//
|
|
||||||
// try await Task.sleep(for: .seconds(1))
|
|
||||||
//
|
|
||||||
// // not working for some reason
|
|
||||||
// // XCTAssertEqual(publishInfo.info.count, 2)
|
|
||||||
//
|
|
||||||
// XCTAssert(publishInfo.info.count > 1)
|
|
||||||
//
|
|
||||||
// // fix to shutdown the mqtt client.
|
|
||||||
// task.cancel()
|
|
||||||
// try await mqtt.shutdown()
|
|
||||||
// }
|
|
||||||
|
|
||||||
func testCapturingSensorClient() async throws {
|
|
||||||
class CapturedValues {
|
|
||||||
var values = [(value: Double, topic: String)]()
|
|
||||||
var didShutdown = false
|
|
||||||
|
|
||||||
init() {}
|
|
||||||
}
|
|
||||||
|
|
||||||
let capturedValues = CapturedValues()
|
|
||||||
|
|
||||||
try await withDependencies {
|
|
||||||
$0.sensorsClient = .testing(
|
|
||||||
yielding: [
|
|
||||||
(value: 76, to: "not-listening"),
|
|
||||||
(value: 75, to: "test")
|
|
||||||
]
|
|
||||||
) { value, topic in
|
|
||||||
capturedValues.values.append((value, topic))
|
|
||||||
} captureShutdownEvent: {
|
|
||||||
capturedValues.didShutdown = $0
|
|
||||||
}
|
|
||||||
} operation: {
|
|
||||||
@Dependency(\.sensorsClient) var client
|
|
||||||
let stream = try await client.listen(to: ["test"])
|
|
||||||
|
|
||||||
for await value in stream {
|
|
||||||
var buffer = value.payload
|
|
||||||
guard let double = Double(buffer: &buffer) else {
|
|
||||||
XCTFail("Failed to decode double")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
XCTAssertEqual(double, 75)
|
|
||||||
XCTAssertEqual(value.topicName, "test")
|
|
||||||
try await client.publish(26, to: "publish")
|
|
||||||
try await Task.sleep(for: .milliseconds(100))
|
|
||||||
client.shutdown()
|
|
||||||
}
|
|
||||||
|
|
||||||
XCTAssertEqual(capturedValues.values.count, 1)
|
|
||||||
XCTAssertEqual(capturedValues.values.first?.value, 26)
|
|
||||||
XCTAssertEqual(capturedValues.values.first?.topic, "publish")
|
|
||||||
XCTAssertTrue(capturedValues.didShutdown)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// func testSensorCapturesPublishedState() async throws {
|
|
||||||
// let client = createClient(identifier: "testSensorCapturesPublishedState")
|
|
||||||
// let mqtt = client.client
|
|
||||||
// let sensor = TemperatureAndHumiditySensor(location: .mixedAir)
|
|
||||||
// let publishInfo = PublishInfoContainer(topicFilters: [
|
|
||||||
// sensor.topics.dewPoint,
|
|
||||||
// sensor.topics.enthalpy
|
|
||||||
// ])
|
|
||||||
//
|
|
||||||
// try await client.addSensor(sensor)
|
|
||||||
// await client.connect()
|
|
||||||
// try await client.start()
|
|
||||||
//
|
|
||||||
// _ = try await mqtt.subscribe(to: [
|
|
||||||
// MQTTSubscribeInfo(topicFilter: sensor.topics.dewPoint, qos: MQTTQoS.exactlyOnce),
|
|
||||||
// MQTTSubscribeInfo(topicFilter: sensor.topics.enthalpy, qos: MQTTQoS.exactlyOnce)
|
|
||||||
// ])
|
|
||||||
//
|
|
||||||
// let listener = mqtt.createPublishListener()
|
|
||||||
// Task {
|
|
||||||
// for await result in listener {
|
|
||||||
// switch result {
|
|
||||||
// case let .failure(error):
|
|
||||||
// XCTFail("\(error)")
|
|
||||||
// case let .success(value):
|
|
||||||
// await publishInfo.addPublishInfo(value)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// try await mqtt.publish(
|
|
||||||
// to: sensor.topics.temperature,
|
|
||||||
// payload: ByteBufferAllocator().buffer(string: "75.123"),
|
|
||||||
// qos: MQTTQoS.exactlyOnce,
|
|
||||||
// retain: true
|
|
||||||
// )
|
|
||||||
//
|
|
||||||
// try await Task.sleep(for: .seconds(1))
|
|
||||||
//
|
|
||||||
// // XCTAssert(client.sensors.first!.needsProcessed)
|
|
||||||
// let firstSensor = client.sensors.first!
|
|
||||||
// XCTAssertEqual(firstSensor.temperature, DryBulb.celsius(75.123))
|
|
||||||
//
|
|
||||||
// try await mqtt.publish(
|
|
||||||
// to: sensor.topics.humidity,
|
|
||||||
// payload: ByteBufferAllocator().buffer(string: "50"),
|
|
||||||
// qos: MQTTQoS.exactlyOnce,
|
|
||||||
// retain: true
|
|
||||||
// )
|
|
||||||
//
|
|
||||||
// try await Task.sleep(for: .seconds(1))
|
|
||||||
//
|
|
||||||
// XCTAssertEqual(publishInfo.info.count, 2)
|
|
||||||
//
|
|
||||||
// await client.shutdown()
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MARK: Helpers for tests.
|
// MARK: Helpers for tests.
|
||||||
@@ -258,41 +133,43 @@ class PublishInfoContainer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extension SensorsClient {
|
extension TopicPublisher {
|
||||||
|
static func capturing(
|
||||||
static func testing(
|
_ callback: @escaping (PublishRequest) -> Void
|
||||||
yielding: [(value: Double, to: String)],
|
|
||||||
capturePublishedValues: @escaping (Double, String) -> Void,
|
|
||||||
captureShutdownEvent: @escaping (Bool) -> Void
|
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (stream, continuation) = AsyncStream.makeStream(of: MQTTPublishInfo.self)
|
.init { callback($0) }
|
||||||
let logger = Logger(label: "\(Self.self).testing")
|
|
||||||
|
|
||||||
return .init(
|
|
||||||
listen: { topics in
|
|
||||||
for (value, topic) in yielding where topics.contains(topic) {
|
|
||||||
continuation.yield(
|
|
||||||
MQTTPublishInfo(
|
|
||||||
qos: .atLeastOnce,
|
|
||||||
retain: true,
|
|
||||||
topicName: topic,
|
|
||||||
payload: ByteBuffer(string: "\(value)"),
|
|
||||||
properties: MQTTProperties()
|
|
||||||
)
|
|
||||||
)
|
|
||||||
}
|
|
||||||
return stream
|
|
||||||
},
|
|
||||||
logger: logger,
|
|
||||||
publish: { value, topic in
|
|
||||||
capturePublishedValues(value, topic)
|
|
||||||
},
|
|
||||||
shutdown: {
|
|
||||||
captureShutdownEvent(true)
|
|
||||||
continuation.finish()
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// extension SensorsClient {
|
||||||
|
//
|
||||||
|
// static func testing(
|
||||||
|
// yielding: [(value: Double, to: String)],
|
||||||
|
// capturePublishedValues: @escaping (Double, String) -> Void,
|
||||||
|
// captureShutdownEvent: @escaping (Bool) -> Void
|
||||||
|
// ) -> Self {
|
||||||
|
// let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self)
|
||||||
|
// let logger = Logger(label: "\(Self.self).testing")
|
||||||
|
//
|
||||||
|
// return .init(
|
||||||
|
// listen: { topics in
|
||||||
|
// for (value, topic) in yielding where topics.contains(topic) {
|
||||||
|
// continuation.yield(
|
||||||
|
// (buffer: ByteBuffer(string: "\(value)"), topic: topic)
|
||||||
|
// )
|
||||||
|
// }
|
||||||
|
// return stream
|
||||||
|
// },
|
||||||
|
// logger: logger,
|
||||||
|
// publish: { value, topic in
|
||||||
|
// capturePublishedValues(value, topic)
|
||||||
|
// },
|
||||||
|
// shutdown: {
|
||||||
|
// captureShutdownEvent(true)
|
||||||
|
// continuation.finish()
|
||||||
|
// }
|
||||||
|
// )
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
struct TopicNotFoundError: Error {}
|
struct TopicNotFoundError: Error {}
|
||||||
|
|||||||
@@ -1,47 +0,0 @@
|
|||||||
import XCTest
|
|
||||||
import class Foundation.Bundle
|
|
||||||
|
|
||||||
//final class dewPoint_controllerTests: XCTestCase {
|
|
||||||
// func testExample() throws {
|
|
||||||
// // This is an example of a functional test case.
|
|
||||||
// // Use XCTAssert and related functions to verify your tests produce the correct
|
|
||||||
// // results.
|
|
||||||
//
|
|
||||||
// // Some of the APIs that we use below are available in macOS 10.13 and above.
|
|
||||||
// guard #available(macOS 10.13, *) else {
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // Mac Catalyst won't have `Process`, but it is supported for executables.
|
|
||||||
// #if !targetEnvironment(macCatalyst)
|
|
||||||
//
|
|
||||||
// let fooBinary = productsDirectory.appendingPathComponent("dewPoint-controller")
|
|
||||||
//
|
|
||||||
// let process = Process()
|
|
||||||
// process.executableURL = fooBinary
|
|
||||||
//
|
|
||||||
// let pipe = Pipe()
|
|
||||||
// process.standardOutput = pipe
|
|
||||||
//
|
|
||||||
// try process.run()
|
|
||||||
// process.waitUntilExit()
|
|
||||||
//
|
|
||||||
// let data = pipe.fileHandleForReading.readDataToEndOfFile()
|
|
||||||
// let output = String(data: data, encoding: .utf8)
|
|
||||||
//
|
|
||||||
// XCTAssertEqual(output, "Hello, world!\n")
|
|
||||||
// #endif
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// /// Returns path to the built products directory.
|
|
||||||
// var productsDirectory: URL {
|
|
||||||
// #if os(macOS)
|
|
||||||
// for bundle in Bundle.allBundles where bundle.bundlePath.hasSuffix(".xctest") {
|
|
||||||
// return bundle.bundleURL.deletingLastPathComponent()
|
|
||||||
// }
|
|
||||||
// fatalError("couldn't find the products directory")
|
|
||||||
// #else
|
|
||||||
// return Bundle.main.bundleURL
|
|
||||||
// #endif
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
@@ -10,17 +10,15 @@ services:
|
|||||||
build:
|
build:
|
||||||
context: .
|
context: .
|
||||||
dockerfile: Dockerfile
|
dockerfile: Dockerfile
|
||||||
platform: linux/amd64
|
|
||||||
depends_on:
|
depends_on:
|
||||||
- mosquitto
|
- mosquitto
|
||||||
environment:
|
environment:
|
||||||
- MOSQUITTO_SERVER=mosquitto
|
- MQTT_HOST=mosquitto
|
||||||
|
|
||||||
test:
|
test:
|
||||||
build:
|
build:
|
||||||
context: .
|
context: .
|
||||||
dockerfile: Dockerfile.test
|
dockerfile: Dockerfile.test
|
||||||
platform: linux/amd64
|
|
||||||
working_dir: /app
|
working_dir: /app
|
||||||
networks:
|
networks:
|
||||||
- test
|
- test
|
||||||
|
|||||||
Reference in New Issue
Block a user