31 Commits

Author SHA1 Message Date
ce327a6f1c feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3s
2024-11-14 15:11:29 -05:00
95f8565cde feat: Adds initial CI
Some checks failed
CI / Test (push) Failing after 39s
2024-11-14 15:05:09 -05:00
163f603b69 feat: Fixes some tests and docker builds 2024-11-14 14:58:09 -05:00
e7a849b003 feat: Adding more tests 2024-11-14 07:43:40 -05:00
bd2a798320 feat: Seperates connection stream and moves connection manager out of the connection service module. 2024-11-13 17:12:56 -05:00
b8992b89b6 feat: Adds MQTT connection stream, need to clean up the manager and remove stream from it. 2024-11-13 10:06:28 -05:00
efd9907b4a feat: Cleans up some of the shutdown logic so that the MQTTClient is disconnected properly. 2024-11-12 22:19:09 -05:00
fbbd65f7ae feat: Cleaning up some unused code. 2024-11-12 21:18:02 -05:00
8067331ff8 feat: Removes sensor client in favor of more generic topic listener and publisher 2024-11-12 16:42:14 -05:00
b6db9b5322 feat: Begins breaking out topic listener and publisher as it's own dependency. 2024-11-12 11:12:34 -05:00
bf1126b06a feat: Adds MQTTConnectionManagerLive module. 2024-11-11 22:00:14 -05:00
ef552fb8bc feat: Removes unused files 2024-11-11 16:28:11 -05:00
1e62d7aac0 feat: Adds sensor client dependency 2024-11-11 15:23:45 -05:00
f68ac528e4 feat: Working on sensor client dependency 2024-11-10 22:27:59 -05:00
10294801fc feat: Working on sensor client dependency 2024-11-10 21:16:56 -05:00
a65605e9e7 feat: Adds graceful shutdown to services 2024-11-10 01:30:22 -05:00
320a733d12 feat: Removing old libraries 2024-11-09 12:15:59 -05:00
936dd0b816 feat: Fixes tests for sensor service since using newer psychrometrics version. 2024-11-09 11:51:20 -05:00
a87addaf0b feat: Updates to newer psychrometrics package. Not yet a working example. 2024-11-09 11:35:30 -05:00
e2683d3f06 feat: Adds MQTTConnectionServiceTests 2024-11-09 10:30:25 -05:00
6c5115dcde feat: Adds MQTTConnectionServiceTests 2024-11-09 09:42:03 -05:00
90c5b7c77f feat: Adds MQTTConnectionService 2024-11-09 09:03:31 -05:00
79bb162434 feat: Updates for Sendable conformance 2024-11-09 01:16:57 -05:00
529b9b0bc5 feat: Begins removing old interfaces and renaming 2024-11-09 00:17:28 -05:00
48d51419d7 feat: Adds SensorsService 2024-11-08 23:52:01 -05:00
adc7fc1295 feat: Working on async integrations. 2024-11-08 17:14:22 -05:00
f40c4ef859 feat: Working on async integrations. 2024-11-08 13:35:46 -05:00
e6d1d4578d feat: Working on async integrations. 2024-11-08 11:05:07 -05:00
408e0484cd feat: Begins more work on async integration 2024-11-07 17:10:21 -05:00
19b2eb42c5 feat: Commit due to moving to nas 2024-11-07 14:31:02 -05:00
7122fc818b feat: Commit due to moving to nas 2024-11-07 10:17:13 -05:00
49 changed files with 1733 additions and 1621 deletions

0
.dockerignore Normal file → Executable file
View File

7
.editorconfig Normal file
View File

@@ -0,0 +1,7 @@
root = true
[*.swift]
indent_style = space
indent_size = 2
tab_width = 2
trim_trailing_whitespace = true

14
.gitea/workflows/ci.yaml Normal file
View File

@@ -0,0 +1,14 @@
---
name: CI
on:
push:
pull_request:
jobs:
test:
name: Run Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Test
run: make test

0
.github/workflows/release.yml vendored Normal file → Executable file
View File

1
.gitignore vendored Normal file → Executable file
View File

@@ -9,3 +9,4 @@ DerivedData/
.topics
mqtt_password.txt
.env
.smbdelete*

11
.swiftformat Normal file
View File

@@ -0,0 +1,11 @@
--self init-only
--indent 2
--ifdef indent
--trimwhitespace always
--wraparguments before-first
--wrapparameters before-first
--wrapcollections preserve
--wrapconditions after-first
--typeblanklines preserve
--commas inline
--stripunusedargs closure-only

View File

0
.swiftpm/xcode/xcshareddata/xcschemes/Client.xcscheme Normal file → Executable file
View File

View File

View File

0
.swiftpm/xcode/xcshareddata/xcschemes/EnvVars.xcscheme Normal file → Executable file
View File

0
.swiftpm/xcode/xcshareddata/xcschemes/Models.xcscheme Normal file → Executable file
View File

View File

@@ -146,6 +146,20 @@
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "SensorsService"
BuildableName = "SensorsService"
BlueprintName = "SensorsService"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
</BuildActionEntries>
</BuildAction>
<TestAction
@@ -174,6 +188,26 @@
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "MQTTConnectionServiceTests"
BuildableName = "MQTTConnectionServiceTests"
BlueprintName = "MQTTConnectionServiceTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "SensorsServiceTests"
BuildableName = "SensorsServiceTests"
BlueprintName = "SensorsServiceTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
</Testables>
</TestAction>
<LaunchAction

View File

0
Bootstrap/dewPoint-env-example Normal file → Executable file
View File

0
Bootstrap/topics-example Normal file → Executable file
View File

6
Dockerfile Normal file → Executable file
View File

@@ -5,10 +5,10 @@ WORKDIR /build
COPY ./Package.* ./
RUN swift package resolve
COPY . .
RUN swift build --enable-test-discovery -c release -Xswiftc -g
RUN swift build -c release -Xswiftc -g
# Run image
FROM swift:5.10-slim
WORKDIR /run
COPY --from=build /build/.build/release/dewPoint-controller /run
CMD ["/bin/bash", "-xc", "./dewPoint-controller"]
COPY --from=build /build/.build/release/dewpoint-controller /run
CMD ["/bin/bash", "-xc", "./dewpoint-controller"]

6
Dockerfile.test Normal file
View File

@@ -0,0 +1,6 @@
FROM swift:5.10
WORKDIR /app
COPY ./Package.* ./
RUN swift package resolve
COPY . .
CMD ["/bin/bash", "-xc", "swift", "test"]

10
Makefile Normal file → Executable file
View File

@@ -8,7 +8,10 @@ bootstrap-topics:
bootstrap: bootstrap-env bootstrap-topics
build:
@swift build
@swift build -Xswiftc -strict-concurrency=complete
clean:
rm -rf .build
run:
@swift run dewPoint-controller
@@ -20,5 +23,8 @@ stop-mosquitto:
@docker-compose rm -f mosquitto || true
test-docker:
@docker-compose run -i test
@docker-compose run --build --remove-orphans -i --rm test
@docker-compose kill mosquitto-test
@docker-compose rm -f
test: test-docker

223
Package.resolved Normal file → Executable file
View File

@@ -1,61 +1,168 @@
{
"object": {
"pins": [
{
"package": "mqtt-nio",
"repositoryURL": "https://github.com/swift-server-community/mqtt-nio.git",
"state": {
"branch": null,
"revision": "ca8af7a30c4690456ce7de276cd0f037489ba707",
"version": "2.5.3"
}
},
{
"package": "swift-log",
"repositoryURL": "https://github.com/apple/swift-log.git",
"state": {
"branch": null,
"revision": "5d66f7ba25daf4f94100e7022febf3c75e37a6c7",
"version": "1.4.2"
}
},
{
"package": "swift-nio",
"repositoryURL": "https://github.com/apple/swift-nio",
"state": {
"branch": null,
"revision": "6aa9347d9bc5bbfe6a84983aec955c17ffea96ef",
"version": "2.33.0"
}
},
{
"package": "swift-nio-ssl",
"repositoryURL": "https://github.com/apple/swift-nio-ssl.git",
"state": {
"branch": null,
"revision": "b5260a31c2a72a89fa684f5efb3054d8725a2316",
"version": "2.18.0"
}
},
{
"package": "swift-nio-transport-services",
"repositoryURL": "https://github.com/apple/swift-nio-transport-services.git",
"state": {
"branch": null,
"revision": "8ab824b140d0ebcd87e9149266ddc353e3705a3e",
"version": "1.11.4"
}
},
{
"package": "swift-psychrometrics",
"repositoryURL": "https://github.com/swift-psychrometrics/swift-psychrometrics",
"state": {
"branch": null,
"revision": "03573545c3750b406921eb22a9575c8062beef88",
"version": "0.1.2"
}
"originHash" : "c2538e3229d6c80f3d6a979c2f3605d63b4973e1e49786819017473eb2916f4e",
"pins" : [
{
"identity" : "combine-schedulers",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/combine-schedulers",
"state" : {
"revision" : "9fa31f4403da54855f1e2aeaeff478f4f0e40b13",
"version" : "1.0.2"
}
]
},
"version": 1
},
{
"identity" : "mqtt-nio",
"kind" : "remoteSourceControl",
"location" : "https://github.com/swift-server-community/mqtt-nio.git",
"state" : {
"revision" : "267b83ab5690d463ff00585a4fd6dc54b698e1d2",
"version" : "2.11.0"
}
},
{
"identity" : "swift-async-algorithms",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-async-algorithms.git",
"state" : {
"revision" : "5c8bd186f48c16af0775972700626f0b74588278",
"version" : "1.0.2"
}
},
{
"identity" : "swift-atomics",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-atomics.git",
"state" : {
"revision" : "cd142fd2f64be2100422d658e7411e39489da985",
"version" : "1.2.0"
}
},
{
"identity" : "swift-clocks",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-clocks",
"state" : {
"revision" : "b9b24b69e2adda099a1fa381cda1eeec272d5b53",
"version" : "1.0.5"
}
},
{
"identity" : "swift-collections",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-collections.git",
"state" : {
"revision" : "671108c96644956dddcd89dd59c203dcdb36cec7",
"version" : "1.1.4"
}
},
{
"identity" : "swift-concurrency-extras",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-concurrency-extras",
"state" : {
"revision" : "6054df64b55186f08b6d0fd87152081b8ad8d613",
"version" : "1.2.0"
}
},
{
"identity" : "swift-dependencies",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-dependencies",
"state" : {
"revision" : "96eecd47660e8307877acb8c41cc5295ba7350a7",
"version" : "1.5.2"
}
},
{
"identity" : "swift-log",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-log.git",
"state" : {
"revision" : "9cb486020ebf03bfa5b5df985387a14a98744537",
"version" : "1.6.1"
}
},
{
"identity" : "swift-nio",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio",
"state" : {
"revision" : "914081701062b11e3bb9e21accc379822621995e",
"version" : "2.76.1"
}
},
{
"identity" : "swift-nio-ssl",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio-ssl.git",
"state" : {
"revision" : "b5260a31c2a72a89fa684f5efb3054d8725a2316",
"version" : "2.18.0"
}
},
{
"identity" : "swift-nio-transport-services",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio-transport-services.git",
"state" : {
"revision" : "8ab824b140d0ebcd87e9149266ddc353e3705a3e",
"version" : "1.11.4"
}
},
{
"identity" : "swift-psychrometrics",
"kind" : "remoteSourceControl",
"location" : "https://github.com/swift-psychrometrics/swift-psychrometrics",
"state" : {
"revision" : "6a457f3cefd9477f7aa76b2fb8ad557988c447bd",
"version" : "0.2.3"
}
},
{
"identity" : "swift-service-lifecycle",
"kind" : "remoteSourceControl",
"location" : "https://github.com/swift-server/swift-service-lifecycle.git",
"state" : {
"revision" : "f70b838872863396a25694d8b19fe58bcd0b7903",
"version" : "2.6.2"
}
},
{
"identity" : "swift-syntax",
"kind" : "remoteSourceControl",
"location" : "https://github.com/swiftlang/swift-syntax",
"state" : {
"revision" : "0687f71944021d616d34d922343dcef086855920",
"version" : "600.0.1"
}
},
{
"identity" : "swift-system",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-system.git",
"state" : {
"revision" : "c8a44d836fe7913603e246acab7c528c2e780168",
"version" : "1.4.0"
}
},
{
"identity" : "swift-tagged",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-tagged",
"state" : {
"revision" : "3907a9438f5b57d317001dc99f3f11b46882272b",
"version" : "0.10.0"
}
},
{
"identity" : "xctest-dynamic-overlay",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/xctest-dynamic-overlay",
"state" : {
"revision" : "770f990d3e4eececb57ac04a6076e22f8c97daeb",
"version" : "1.4.2"
}
}
],
"version" : 3
}

135
Package.swift Normal file → Executable file
View File

@@ -2,99 +2,110 @@
import PackageDescription
let swiftSettings: [SwiftSetting] = [
.enableExperimentalFeature("StrictConcurrency"),
.enableUpcomingFeature("InferSendableCaptures")
]
let package = Package(
name: "dewPoint-controller",
name: "dewpoint-controller",
platforms: [
.macOS(.v12)
.macOS(.v14)
],
products: [
.executable(name: "dewPoint-controller", targets: ["dewPoint-controller"]),
.library(name: "Bootstrap", targets: ["Bootstrap"]),
.library(name: "DewPointEnvironment", targets: ["DewPointEnvironment"]),
.library(name: "EnvVars", targets: ["EnvVars"]),
.executable(name: "dewpoint-controller", targets: ["dewpoint-controller"]),
.library(name: "Models", targets: ["Models"]),
.library(name: "Client", targets: ["Client"]),
.library(name: "ClientLive", targets: ["ClientLive"]),
.library(name: "MQTTConnectionManager", targets: ["MQTTConnectionManager"]),
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
.library(name: "SensorsService", targets: ["SensorsService"]),
.library(name: "TopicDependencies", targets: ["TopicDependencies"])
],
dependencies: [
.package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"),
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
.package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", from: "0.1.0")
.package(url: "https://github.com/apple/swift-log", from: "1.6.0"),
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.5.2"),
.package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", exact: "0.2.3"),
.package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"),
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.3.0")
],
targets: [
.executableTarget(
name: "dewPoint-controller",
name: "dewpoint-controller",
dependencies: [
"Bootstrap",
"ClientLive",
"TopicsLive",
.product(name: "MQTTNIO", package: "mqtt-nio"),
.product(name: "NIO", package: "swift-nio")
]
),
.testTarget(
name: "dewPoint-controllerTests",
dependencies: ["dewPoint-controller"]
),
.target(
name: "Bootstrap",
dependencies: [
"DewPointEnvironment",
"EnvVars",
"ClientLive",
"Models",
"MQTTConnectionManager",
"MQTTConnectionService",
"SensorsService",
"TopicDependencies",
.product(name: "MQTTNIO", package: "mqtt-nio"),
.product(name: "NIO", package: "swift-nio")
.product(name: "NIO", package: "swift-nio"),
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
]
),
.target(
name: "DewPointEnvironment",
dependencies: [
"EnvVars",
"Client",
"Models",
.product(name: "MQTTNIO", package: "mqtt-nio"),
]
),
.target(
name: "EnvVars",
dependencies: []
),
.target(
name: "Models",
dependencies: [
.product(name: "Psychrometrics", package: "swift-psychrometrics"),
]
.product(name: "Logging", package: "swift-log"),
.product(name: "PsychrometricClient", package: "swift-psychrometrics")
],
swiftSettings: swiftSettings
),
.target(
name: "Client",
name: "MQTTConnectionManager",
dependencies: [
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio")
],
swiftSettings: swiftSettings
),
.target(
name: "MQTTConnectionService",
dependencies: [
"Models",
.product(name: "CoreUnitTypes", package: "swift-psychrometrics"),
.product(name: "NIO", package: "swift-nio"),
.product(name: "Psychrometrics", package: "swift-psychrometrics")
]
),
.target(
name: "ClientLive",
dependencies: [
"Client",
"EnvVars",
.product(name: "MQTTNIO", package: "mqtt-nio")
]
"MQTTConnectionManager",
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
],
swiftSettings: swiftSettings
),
.testTarget(
name: "ClientTests",
name: "MQTTConnectionServiceTests",
dependencies: [
"Client",
"ClientLive"
"MQTTConnectionService",
"MQTTConnectionManager",
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
]
),
.target(
name: "TopicsLive",
name: "SensorsService",
dependencies: [
"Models"
"Models",
"MQTTConnectionManager",
"TopicDependencies",
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio"),
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
],
swiftSettings: swiftSettings
),
.testTarget(
name: "SensorsServiceTests",
dependencies: [
"SensorsService",
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
]
),
.target(
name: "TopicDependencies",
dependencies: [
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio")
],
swiftSettings: swiftSettings
)
]
)

0
README.md Normal file → Executable file
View File

View File

@@ -1,167 +0,0 @@
import ClientLive
import DewPointEnvironment
import EnvVars
import Logging
import Foundation
import Models
import MQTTNIO
import NIO
/// Sets up the application environment and connections required.
///
/// - Parameters:
/// - eventLoopGroup: The event loop group for the application.
/// - logger: An optional logger for debugging.
/// - autoConnect: A flag whether to auto-connect to the MQTT broker or not.
public func bootstrap(
eventLoopGroup: EventLoopGroup,
logger: Logger? = nil,
autoConnect: Bool = true
) -> EventLoopFuture<DewPointEnvironment> {
logger?.debug("Bootstrapping Dew Point Controller...")
return loadEnvVars(eventLoopGroup: eventLoopGroup, logger: logger)
.and(loadTopics(eventLoopGroup: eventLoopGroup, logger: logger))
.makeDewPointEnvironment(eventLoopGroup: eventLoopGroup, logger: logger)
.connectToMQTTBroker(autoConnect: autoConnect, logger: logger)
}
/// Loads the ``EnvVars`` either using the defualts, from a file in the root directory under `.dewPoint-env` or in the shell / application environment.
///
/// - Parameters:
/// - eventLoopGroup: The event loop group for the application.
/// - logger: An optional logger for debugging.
private func loadEnvVars(
eventLoopGroup: EventLoopGroup,
logger: Logger?
) -> EventLoopFuture<EnvVars> {
logger?.debug("Loading env vars...")
// TODO: Need to have the env file path passed in / dynamic.
let envFilePath = URL(fileURLWithPath: #file)
.deletingLastPathComponent()
.deletingLastPathComponent()
.deletingLastPathComponent()
.appendingPathComponent(".dewPoint-env")
let decoder = JSONDecoder()
let encoder = JSONEncoder()
let defaultEnvVars = EnvVars()
let defaultEnvDict = (try? encoder.encode(defaultEnvVars))
.flatMap { try? decoder.decode([String: String].self, from: $0) }
?? [:]
// Read from file `.dewPoint-env` file if it exists.
let localEnvVarsDict = (try? Data(contentsOf: envFilePath))
.flatMap { try? decoder.decode([String: String].self, from: $0) }
?? [:]
// Merge with variables in the shell environment.
let envVarsDict = defaultEnvDict
.merging(localEnvVarsDict, uniquingKeysWith: { $1 })
.merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 })
// Produces the final env vars from the merged items or uses defaults if something
// went wrong.
let envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict))
.flatMap { try? decoder.decode(EnvVars.self, from: $0) }
?? defaultEnvVars
logger?.debug("Done loading env vars...")
return eventLoopGroup.next().makeSucceededFuture(envVars)
}
// MARK: TODO perhaps make loading from file an option passed in when app is launched.
/// Load the topics from file in application root directory at `.topics`, if available or fall back to the defualt.
///
/// - Parameters:
/// - eventLoopGroup: The event loop group for the application.
/// - logger: An optional logger for debugging.
private func loadTopics(eventLoopGroup: EventLoopGroup, logger: Logger?) -> EventLoopFuture<Topics> {
logger?.debug("Loading topics from file...")
let topicsFilePath = URL(fileURLWithPath: #file)
.deletingLastPathComponent()
.deletingLastPathComponent()
.deletingLastPathComponent()
.appendingPathComponent(".topics")
let decoder = JSONDecoder()
// Attempt to load the topics from file in root directory.
let localTopics = (try? Data.init(contentsOf: topicsFilePath))
.flatMap { try? decoder.decode(Topics.self, from: $0) }
logger?.debug(
localTopics == nil
? "Failed to load topics from file, falling back to defaults."
: "Done loading topics from file."
)
// If we were able to load from file use that, else fallback to the defaults.
return eventLoopGroup.next().makeSucceededFuture(localTopics ?? .init())
}
extension EventLoopFuture where Value == (EnvVars, Topics) {
/// Creates the ``DewPointEnvironment`` for the application after the ``EnvVars`` have been loaded.
///
/// - Parameters:
/// - eventLoopGroup: The event loop group for the application.
/// - logger: An optional logger for the application.
fileprivate func makeDewPointEnvironment(
eventLoopGroup: EventLoopGroup,
logger: Logger?
) -> EventLoopFuture<DewPointEnvironment> {
map { envVars, topics in
let mqttClient = MQTTClient(envVars: envVars, eventLoopGroup: eventLoopGroup, logger: logger)
return DewPointEnvironment.init(
envVars: envVars,
mqttClient: mqttClient,
topics: topics
)
}
}
}
extension EventLoopFuture where Value == DewPointEnvironment {
/// Connects to the MQTT broker after the ``DewPointEnvironment`` has been setup.
///
/// - Parameters:
/// - logger: An optional logger for debugging.
fileprivate func connectToMQTTBroker(autoConnect: Bool, logger: Logger?) -> EventLoopFuture<DewPointEnvironment> {
guard autoConnect else { return self }
return flatMap { environment in
logger?.debug("Connecting to MQTT Broker...")
return environment.mqttClient.connect()
.map { _ in
logger?.debug("Successfully connected to MQTT Broker...")
return environment
}
}
}
}
extension MQTTNIO.MQTTClient {
fileprivate convenience init(envVars: EnvVars, eventLoopGroup: EventLoopGroup, logger: Logger?) {
self.init(
host: envVars.host,
port: envVars.port != nil ? Int(envVars.port!) : nil,
identifier: envVars.identifier,
eventLoopGroupProvider: .shared(eventLoopGroup),
logger: logger,
configuration: .init(
version: .v5_0,
userName: envVars.userName,
password: envVars.password
)
)
}
}

View File

@@ -1,44 +0,0 @@
import CoreUnitTypes
import Logging
import Foundation
import Models
import NIO
import Psychrometrics
public struct Client {
/// Add the publish listeners to the MQTT Broker, to be notified of published changes.
public var addListeners: () -> Void
/// Connect to the MQTT Broker.
public var connect: () -> EventLoopFuture<Void>
public var publishSensor: (SensorPublishRequest) -> EventLoopFuture<Void>
/// Subscribe to appropriate topics / events.
public var subscribe: () -> EventLoopFuture<Void>
/// Disconnect and close the connection to the MQTT Broker.
public var shutdown: () -> EventLoopFuture<Void>
public init(
addListeners: @escaping () -> Void,
connect: @escaping () -> EventLoopFuture<Void>,
publishSensor: @escaping (SensorPublishRequest) -> EventLoopFuture<Void>,
shutdown: @escaping () -> EventLoopFuture<Void>,
subscribe: @escaping () -> EventLoopFuture<Void>
) {
self.addListeners = addListeners
self.connect = connect
self.publishSensor = publishSensor
self.shutdown = shutdown
self.subscribe = subscribe
}
public enum SensorPublishRequest {
case mixed(State.Sensors.TemperatureHumiditySensor<State.Sensors.Mixed>)
case postCoil(State.Sensors.TemperatureHumiditySensor<State.Sensors.PostCoil>)
case `return`(State.Sensors.TemperatureHumiditySensor<State.Sensors.Return>)
case supply(State.Sensors.TemperatureHumiditySensor<State.Sensors.Supply>)
}
}

View File

@@ -1,261 +0,0 @@
import CoreUnitTypes
import Logging
import Models
import MQTTNIO
import NIO
import NIOFoundationCompat
import Psychrometrics
/// Represents a type that can be initialized by a ``ByteBuffer``.
protocol BufferInitalizable {
init?(buffer: inout ByteBuffer)
}
extension Double: BufferInitalizable {
/// Attempt to create / parse a double from a byte buffer.
init?(buffer: inout ByteBuffer) {
guard let string = buffer.readString(
length: buffer.readableBytes,
encoding: String.Encoding.utf8
)
else { return nil }
self.init(string)
}
}
extension Temperature: BufferInitalizable {
/// Attempt to create / parse a temperature from a byte buffer.
init?(buffer: inout ByteBuffer) {
guard let value = Double(buffer: &buffer) else { return nil }
self.init(value, units: .celsius)
}
}
extension RelativeHumidity: BufferInitalizable {
/// Attempt to create / parse a relative humidity from a byte buffer.
init?(buffer: inout ByteBuffer) {
guard let value = Double(buffer: &buffer) else { return nil }
self.init(value)
}
}
extension MQTTNIO.MQTTClient {
/// Logs a failure for a given topic and error.
func logFailure(topic: String, error: Error) {
logger.error("\(topic): \(error)")
}
}
extension Result where Success == MQTTPublishInfo {
func logIfFailure(client: MQTTNIO.MQTTClient, topic: String) -> ByteBuffer? {
switch self {
case let .success(value):
guard value.topicName == topic else { return nil }
return value.payload
case let .failure(error):
client.logFailure(topic: topic, error: error)
return nil
}
}
}
extension Optional where Wrapped == ByteBuffer {
func parse<T>(as type: T.Type) -> T? where T: BufferInitalizable {
switch self {
case var .some(buffer):
return T.init(buffer: &buffer)
case .none:
return nil
}
}
}
fileprivate struct TemperatureAndHumiditySensorKeyPathEnvelope {
let humidityTopic: KeyPath<Topics.Sensors, String>
let temperatureTopic: KeyPath<Topics.Sensors, String>
let temperatureState: WritableKeyPath<State.Sensors, Temperature?>
let humidityState: WritableKeyPath<State.Sensors, RelativeHumidity?>
func addListener(to client: MQTTNIO.MQTTClient, topics: Topics, state: State) {
let temperatureTopic = topics.sensors[keyPath: temperatureTopic]
client.logger.trace("Adding listener for topic: \(temperatureTopic)")
client.addPublishListener(named: temperatureTopic) { result in
result.logIfFailure(client: client, topic: temperatureTopic)
.parse(as: Temperature.self)
.map { temperature in
state.sensors[keyPath: temperatureState] = temperature
}
}
let humidityTopic = topics.sensors[keyPath: humidityTopic]
client.logger.trace("Adding listener for topic: \(humidityTopic)")
client.addPublishListener(named: humidityTopic) { result in
result.logIfFailure(client: client, topic: humidityTopic)
.parse(as: RelativeHumidity.self)
.map { humidity in
state.sensors[keyPath: humidityState] = humidity
}
}
}
}
extension Array where Element == TemperatureAndHumiditySensorKeyPathEnvelope {
func addListeners(to client: MQTTNIO.MQTTClient, topics: Topics, state: State) {
_ = self.map { envelope in
envelope.addListener(to: client, topics: topics, state: state)
}
}
}
extension Array where Element == MQTTSubscribeInfo {
static func sensors(topics: Topics) -> Self {
[
.init(topicFilter: topics.sensors.mixedAirSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.mixedAirSensor.humidity, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.postCoilSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.postCoilSensor.humidity, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.returnAirSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.returnAirSensor.humidity, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.supplyAirSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.supplyAirSensor.humidity, qos: .atLeastOnce),
]
}
}
extension State {
func addSensorListeners(to client: MQTTNIO.MQTTClient, topics: Topics) {
let envelopes: [TemperatureAndHumiditySensorKeyPathEnvelope] = [
.init(
humidityTopic: \.mixedAirSensor.humidity,
temperatureTopic: \.mixedAirSensor.temperature,
temperatureState: \.mixedAirSensor.temperature,
humidityState: \.mixedAirSensor.humidity
),
.init(
humidityTopic: \.postCoilSensor.humidity,
temperatureTopic: \.postCoilSensor.temperature,
temperatureState: \.postCoilSensor.temperature,
humidityState: \.postCoilSensor.humidity
),
.init(
humidityTopic: \.returnAirSensor.humidity,
temperatureTopic: \.returnAirSensor.temperature,
temperatureState: \.returnAirSensor.temperature,
humidityState: \.returnAirSensor.humidity
),
.init(
humidityTopic: \.supplyAirSensor.humidity,
temperatureTopic: \.supplyAirSensor.temperature,
temperatureState: \.supplyAirSensor.temperature,
humidityState: \.supplyAirSensor.humidity
),
]
envelopes.addListeners(to: client, topics: topics, state: self)
}
}
extension Client.SensorPublishRequest {
func dewPointData(topics: Topics, units: PsychrometricEnvironment.Units?) -> (DewPoint, String)? {
switch self {
case let .mixed(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.mixedAirSensor.dewPoint)
case let .postCoil(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.postCoilSensor.dewPoint)
case let .return(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.returnAirSensor.dewPoint)
case let .supply(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.supplyAirSensor.dewPoint)
}
}
func enthalpyData(altitude: Length, topics: Topics, units: PsychrometricEnvironment.Units?) -> (EnthalpyOf<MoistAir>, String)? {
switch self {
case let .mixed(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.mixedAirSensor.enthalpy)
case let .postCoil(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.postCoilSensor.enthalpy)
case let .return(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.returnAirSensor.enthalpy)
case let .supply(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.supplyAirSensor.enthalpy)
}
}
func setHasProcessed(state: State) {
switch self {
case .mixed:
state.sensors.mixedAirSensor.needsProcessed = false
case .postCoil:
state.sensors.postCoilSensor.needsProcessed = false
case .return:
state.sensors.returnAirSensor.needsProcessed = false
case .supply:
state.sensors.supplyAirSensor.needsProcessed = false
}
}
}
extension MQTTNIO.MQTTClient {
func publishDewPoint(
request: Client.SensorPublishRequest,
state: State,
topics: Topics
) -> EventLoopFuture<(MQTTNIO.MQTTClient, Client.SensorPublishRequest, State, Topics)> {
guard let (dewPoint, topic) = request.dewPointData(topics: topics, units: state.units)
else {
logger.trace("No dew point for sensor.")
return eventLoopGroup.next().makeSucceededFuture((self, request, state, topics))
}
let roundedDewPoint = round(dewPoint.rawValue * 100) / 100
logger.debug("Publishing dew-point: \(dewPoint), to: \(topic)")
return publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(roundedDewPoint)"),
qos: .atLeastOnce,
retain: true
)
.map { (self, request, state, topics) }
}
}
extension EventLoopFuture where Value == (Client.SensorPublishRequest, State) {
func setHasProcessed() -> EventLoopFuture<Void> {
map { request, state in
request.setHasProcessed(state: state)
}
}
}
extension EventLoopFuture where Value == (MQTTNIO.MQTTClient, Client.SensorPublishRequest, State, Topics) {
func publishEnthalpy() -> EventLoopFuture<(Client.SensorPublishRequest, State)> {
flatMap { client, request, state, topics in
guard let (enthalpy, topic) = request.enthalpyData(altitude: state.altitude, topics: topics, units: state.units)
else {
client.logger.trace("No enthalpy for sensor.")
return client.eventLoopGroup.next().makeSucceededFuture((request, state))
}
let roundedEnthalpy = round(enthalpy.rawValue * 100) / 100
client.logger.debug("Publishing enthalpy: \(enthalpy), to: \(topic)")
return client.publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(roundedEnthalpy)"),
qos: .atLeastOnce
)
.map { (request, state) }
}
}
}

View File

@@ -1,158 +0,0 @@
import Foundation
@_exported import Client
import CoreUnitTypes
import Models
import MQTTNIO
import NIO
import Psychrometrics
extension Client {
// The state passed in here needs to be a class or we get escaping errors in the `addListeners` method.
public static func live(
client: MQTTNIO.MQTTClient,
state: State,
topics: Topics
) -> Self {
.init(
addListeners: {
state.addSensorListeners(to: client, topics: topics)
},
connect: {
client.connect()
.map { _ in }
},
publishSensor: { request in
client.publishDewPoint(request: request, state: state, topics: topics)
.publishEnthalpy()
.setHasProcessed()
},
shutdown: {
client.disconnect()
.map { try? client.syncShutdownGracefully() }
},
subscribe: {
// Sensor subscriptions
client.subscribe(to: .sensors(topics: topics))
.map { _ in }
}
)
}
}
import Logging
import NIOTransportServices
import EnvVars
public class AsyncClient {
//public static let eventLoopGroup = NIOTSEventLoopGroup()
public static let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
public let client: MQTTClient
public private(set) var shuttingDown: Bool
var logger: Logger { client.logger }
public init(envVars: EnvVars, logger: Logger) {
let config = MQTTClient.Configuration.init(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
self.client = .init(
host: envVars.host,
identifier: envVars.identifier,
eventLoopGroupProvider: .shared(Self.eventLoopGroup),
logger: logger,
configuration: config
)
self.shuttingDown = false
}
public func connect() async {
do {
try await self.client.connect()
self.client.addCloseListener(named: "AsyncClient") { [self] result in
guard !self.shuttingDown else { return }
Task {
self.logger.debug("Connection closed.")
self.logger.debug("Reconnecting...")
await self.connect()
}
}
logger.debug("Connection successful.")
} catch {
logger.trace("Connection Failed.\n\(error)")
}
}
public func shutdown() async {
self.shuttingDown = true
try? await self.client.disconnect()
try? await self.client.shutdown()
}
func addSensorListeners() async {
}
// Need to save the recieved values somewhere.
func addPublishListener<T>(
topic: String,
decoding: T.Type
) async throws where T: BufferInitalizable {
_ = try await self.client.subscribe(to: [.init(topicFilter: topic, qos: .atLeastOnce)])
Task {
let listener = self.client.createPublishListener()
for await result in listener {
switch result {
case let .success(packet):
var buffer = packet.payload
guard let value = T.init(buffer: &buffer) else {
logger.debug("Could not decode buffer: \(buffer)")
return
}
logger.debug("Recieved value: \(value)")
case let .failure(error):
logger.trace("Error:\n\(error)")
}
}
}
}
private func publish(string: String, to topic: String) async throws {
try await self.client.publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: string),
qos: .atLeastOnce
)
}
private func publish(double: Double, to topic: String) async throws {
let rounded = round(double * 100) / 100
try await publish(string: "\(rounded)", to: topic)
}
func publishDewPoint(_ request: Client.SensorPublishRequest) async throws {
// fix
guard let (dewPoint, topic) = request.dewPointData(topics: .init(), units: nil) else { return }
try await self.publish(double: dewPoint.rawValue, to: topic)
logger.debug("Published dewpoint: \(dewPoint.rawValue), to: \(topic)")
}
func publishEnthalpy(_ request: Client.SensorPublishRequest) async throws {
// fix
guard let (enthalpy, topic) = request.enthalpyData(altitude: .seaLevel, topics: .init(), units: nil) else { return }
try await self.publish(double: enthalpy.rawValue, to: topic)
logger.debug("Publihsed enthalpy: \(enthalpy.rawValue), to: \(topic)")
}
public func publishSensor(_ request: Client.SensorPublishRequest) async throws {
try await publishDewPoint(request)
try await publishEnthalpy(request)
}
}

View File

@@ -1,21 +0,0 @@
import Client
import EnvVars
import Models
import MQTTNIO
public struct DewPointEnvironment {
public var envVars: EnvVars
public var mqttClient: MQTTNIO.MQTTClient
public var topics: Topics
public init(
envVars: EnvVars,
mqttClient: MQTTNIO.MQTTClient,
topics: Topics = .init()
) {
self.envVars = envVars
self.mqttClient = mqttClient
self.topics = topics
}
}

View File

@@ -0,0 +1,221 @@
import AsyncAlgorithms
import Dependencies
import DependenciesMacros
import Foundation
import Logging
import MQTTNIO
import NIO
public extension DependencyValues {
/// A dependency that is responsible for managing the connection to
/// an MQTT broker.
var mqttConnectionManager: MQTTConnectionManager {
get { self[MQTTConnectionManager.self] }
set { self[MQTTConnectionManager.self] = newValue }
}
}
/// Represents the interface needed for the ``MQTTConnectionService``.
///
/// See ``MQTTConnectionManagerLive`` module for live implementation.
@DependencyClient
public struct MQTTConnectionManager: Sendable {
/// Connect to the MQTT broker.
public var connect: @Sendable () async throws -> Void
/// Shutdown the connection to the MQTT broker.
///
/// - Note: You should cancel any tasks that are listening to the connection stream first.
public var shutdown: @Sendable () -> Void
/// Create a stream of connection events.
public var stream: @Sendable () throws -> AsyncStream<Event>
/// Perform an operation with the underlying MQTTClient, this can be useful in
/// tests, so this module needs imported with `@_spi(Testing) import` to use this method.
private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void
@_spi(Internal)
public func withClient(
_ callback: @Sendable (MQTTClient) async throws -> Void
) async throws {
try await _withClient(callback)
}
/// Represents connection events that clients can listen for and
/// react accordingly.
public enum Event: Sendable {
case connected
case disconnected
case shuttingDown
}
public static func live(
client: MQTTClient,
cleanSession: Bool = false,
logger: Logger? = nil,
alwaysReconnect: Bool = true
) -> Self {
let manager = ConnectionManager(
client: client,
logger: logger,
alwaysReconnect: alwaysReconnect
)
return .init {
try await manager.connect(cleanSession: cleanSession)
} shutdown: {
manager.shutdown()
} stream: {
MQTTConnectionStream(client: client, logger: logger)
.start()
.removeDuplicates()
.eraseToStream()
} _withClient: { callback in
try await callback(client)
}
}
}
extension MQTTConnectionManager: TestDependencyKey {
public static var testValue: MQTTConnectionManager {
Self()
}
}
// MARK: - Helpers
@_spi(Internal)
public final actor MQTTConnectionStream: Sendable {
public typealias Element = MQTTConnectionManager.Event
private let client: MQTTClient
private let continuation: AsyncStream<Element>.Continuation
private let logger: Logger?
private let name: String
private let stream: AsyncStream<Element>
private var isShuttingDown = false
public init(client: MQTTClient, logger: Logger?) {
let (stream, continuation) = AsyncStream<Element>.makeStream()
self.client = client
self.continuation = continuation
self.logger = logger
self.name = UUID().uuidString
self.stream = stream
}
deinit { stop() }
public nonisolated func start() -> AsyncStream<Element> {
// Check if the client is active and yield the initial result.
continuation.yield(client.isActive() ? .connected : .disconnected)
// Continually check if the client is active.
let task = Task {
let isShuttingDown = await self.isShuttingDown
while !Task.isCancelled, !isShuttingDown {
try await Task.sleep(for: .milliseconds(100))
continuation.yield(client.isActive() ? .connected : .disconnected)
}
}
// Register listener on the client for when the connection
// closes.
client.addCloseListener(named: name) { _ in
self.logger?.trace("Client has disconnected.")
self.continuation.yield(.disconnected)
}
// Register listener on the client for when the client
// is shutdown.
client.addShutdownListener(named: name) { _ in
self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)")
self.continuation.yield(.shuttingDown)
Task { await self.setIsShuttingDown() }
task.cancel()
self.stop()
}
return stream
}
private func setIsShuttingDown() {
isShuttingDown = true
}
public nonisolated func stop() {
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
continuation.finish()
}
}
actor ConnectionManager {
private let client: MQTTClient
private let logger: Logger?
private let name: String
private let shouldReconnect: Bool
private var hasConnected: Bool = false
init(
client: MQTTClient,
logger: Logger?,
alwaysReconnect: Bool
) {
self.client = client
self.logger = logger
self.name = UUID().uuidString
self.shouldReconnect = alwaysReconnect
}
deinit {
// We should've already logged that we're shutting down if
// the manager was shutdown properly, so don't log it twice.
self.shutdown(withLogging: false)
}
private func setHasConnected() {
hasConnected = true
}
func connect(
cleanSession: Bool
) async throws {
guard !hasConnected else { return }
do {
try await client.connect(cleanSession: cleanSession)
setHasConnected()
client.addCloseListener(named: name) { [weak self] _ in
guard let `self` else { return }
self.logger?.debug("Connection closed.")
if self.shouldReconnect {
self.logger?.debug("Reconnecting...")
Task {
try await self.connect(cleanSession: cleanSession)
}
}
}
client.addShutdownListener(named: name) { _ in
self.shutdown()
}
} catch {
logger?.trace("Failed to connect: \(error)")
throw error
}
}
nonisolated func shutdown(withLogging: Bool = true) {
if withLogging {
logger?.trace("Shutting down connection.")
}
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
}
}

View File

@@ -0,0 +1,37 @@
import Dependencies
import Logging
import Models
import MQTTConnectionManager
import ServiceLifecycle
public actor MQTTConnectionService: Service {
@Dependency(\.mqttConnectionManager) var manager
private nonisolated let logger: Logger?
public init(
logger: Logger? = nil
) {
self.logger = logger
}
/// The entry-point of the service which starts the connection
/// to the MQTT broker and handles graceful shutdown of the
/// connection.
public func run() async throws {
try await withGracefulShutdownHandler {
try await manager.connect()
for await event in try manager.stream().cancelOnGracefulShutdown() {
// We don't really need to do anything with the events, so just logging
// for now. But we need to iterate on an async stream for the service to
// continue to run and handle graceful shutdowns.
logger?.trace("Received connection event: \(event)")
}
// when we reach here we are shutting down, so we shutdown
// the manager.
manager.shutdown()
} onGracefulShutdown: {
self.logger?.trace("Received graceful shutdown.")
}
}
}

View File

@@ -1,29 +1,30 @@
import Foundation
import Logging
/// Holds common settings for connecting to your MQTT broker. The default values can be used,
/// they can be loaded from the shell environment, or from a file located in the root directory.
///
/// This allows us to keep sensitve settings out of the repository.
public struct EnvVars: Codable, Equatable {
public struct EnvVars: Codable, Equatable, Sendable {
/// The current app environment.
public var appEnv: AppEnv
/// The MQTT host.
public var host: String
/// The MQTT port.
public var port: String?
/// The identifier to use when connecting to the MQTT broker.
public var identifier: String
/// The MQTT user name.
public var userName: String?
/// The MQTT user password.
public var password: String?
/// Create a new ``EnvVars``
///
/// - Parameters:
@@ -40,7 +41,7 @@ public struct EnvVars: Codable, Equatable {
identifier: String = "dewPoint-controller",
userName: String? = "mqtt_user",
password: String? = "secret!"
){
) {
self.appEnv = appEnv
self.host = host
self.port = port
@@ -48,7 +49,7 @@ public struct EnvVars: Codable, Equatable {
self.userName = userName
self.password = password
}
/// Custom coding keys.
private enum CodingKeys: String, CodingKey {
case appEnv = "APP_ENV"
@@ -58,9 +59,9 @@ public struct EnvVars: Codable, Equatable {
case userName = "MQTT_USERNAME"
case password = "MQTT_PASSWORD"
}
/// Represents the different app environments.
public enum AppEnv: String, Codable {
public enum AppEnv: String, Codable, Sendable {
case development
case production
case staging

View File

@@ -1,37 +0,0 @@
import CoreUnitTypes
/// Represents the different modes that the controller can be in.
public enum Mode: Equatable {
/// Allows controller to run in humidify or dehumidify mode.
case auto
/// Only handle humidify mode.
case humidifyOnly(HumidifyMode)
/// Only handle dehumidify mode.
case dehumidifyOnly(DehumidifyMode)
/// Don't control humidify or dehumidify modes.
case off
/// Represents the control modes for the humidify control state.
public enum HumidifyMode: Equatable {
/// Control humidifying based off dew-point.
case dewPoint(Temperature)
/// Control humidifying based off relative humidity.
case relativeHumidity(RelativeHumidity)
}
/// Represents the control modes for the dehumidify control state.
public enum DehumidifyMode: Equatable {
/// Control dehumidifying based off dew-point.
case dewPoint(high: Temperature, low: Temperature)
/// Control humidifying based off relative humidity.
case relativeHumidity(high: RelativeHumidity, low: RelativeHumidity)
}
}

View File

@@ -1,104 +0,0 @@
import Foundation
import Psychrometrics
// TODO: Make this a struct, then create a Store class that holds the state??
public final class State {
public var altitude: Length
public var sensors: Sensors
public var units: PsychrometricEnvironment.Units {
didSet {
PsychrometricEnvironment.shared.units = units
}
}
public init(
altitude: Length = .seaLevel,
sensors: Sensors = .init(),
units: PsychrometricEnvironment.Units = .imperial
) {
self.altitude = altitude
self.sensors = sensors
self.units = units
}
public struct Sensors: Equatable {
public var mixedAirSensor: TemperatureHumiditySensor<Mixed>
public var postCoilSensor: TemperatureHumiditySensor<PostCoil>
public var returnAirSensor: TemperatureHumiditySensor<Return>
public var supplyAirSensor: TemperatureHumiditySensor<Supply>
public init(
mixedAirSensor: TemperatureHumiditySensor<Mixed> = .init(),
postCoilSensor: TemperatureHumiditySensor<PostCoil> = .init(),
returnAirSensor: TemperatureHumiditySensor<Return> = .init(),
supplyAirSensor: TemperatureHumiditySensor<Supply> = .init()
) {
self.mixedAirSensor = mixedAirSensor
self.postCoilSensor = postCoilSensor
self.returnAirSensor = returnAirSensor
self.supplyAirSensor = supplyAirSensor
}
public var needsProcessed: Bool {
mixedAirSensor.needsProcessed
|| postCoilSensor.needsProcessed
|| returnAirSensor.needsProcessed
|| supplyAirSensor.needsProcessed
}
}
}
extension State.Sensors {
public struct TemperatureHumiditySensor<Location>: Equatable {
@TrackedChanges
public var temperature: Temperature?
@TrackedChanges
public var humidity: RelativeHumidity?
public var needsProcessed: Bool {
get { $temperature.needsProcessed || $humidity.needsProcessed }
set {
$temperature.needsProcessed = newValue
$humidity.needsProcessed = newValue
}
}
public func dewPoint(units: PsychrometricEnvironment.Units? = nil) -> DewPoint? {
guard let temperature = temperature,
let humidity = humidity,
!temperature.rawValue.isNaN,
!humidity.rawValue.isNaN
else { return nil }
return .init(dryBulb: temperature, humidity: humidity, units: units)
}
public func enthalpy(altitude: Length, units: PsychrometricEnvironment.Units? = nil) -> EnthalpyOf<MoistAir>? {
guard let temperature = temperature,
let humidity = humidity,
!temperature.rawValue.isNaN,
!humidity.rawValue.isNaN
else { return nil }
return .init(dryBulb: temperature, humidity: humidity, altitude: altitude, units: units)
}
public init(
temperature: Temperature? = nil,
humidity: RelativeHumidity? = nil,
needsProcessed: Bool = false
) {
self._temperature = .init(wrappedValue: temperature, needsProcessed: needsProcessed)
self._humidity = .init(wrappedValue: humidity, needsProcessed: needsProcessed)
}
}
// MARK: - Temperature / Humidity Sensor Location Namespaces
public enum Mixed { }
public enum PostCoil { }
public enum Return { }
public enum Supply { }
}

View File

@@ -0,0 +1,141 @@
import Dependencies
import PsychrometricClient
/// Represents a temperature and humidity sensor that can be used to derive
/// the dew-point temperature and enthalpy values.
///
/// > Note: Temperature values are received in `celsius`.
public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
@Dependency(\.psychrometricClient) private var psychrometrics
/// The identifier of the sensor, same as the location.
public var id: Location { location }
/// The altitude of the sensor.
public let altitude: Length
/// The current humidity value of the sensor.
@TrackedChanges
public var humidity: RelativeHumidity?
/// The location identifier of the sensor
public let location: Location
/// The current temperature value of the sensor.
@TrackedChanges
public var temperature: DryBulb?
/// The topics to listen for updated sensor values.
public let topics: Topics
/// Create a new temperature and humidity sensor.
///
/// - Parameters:
/// - location: The location of the sensor.
/// - altitude: The altitude of the sensor.
/// - temperature: The current temperature value of the sensor.
/// - humidity: The current relative humidity value of the sensor.
/// - needsProcessed: If the sensor needs to be processed.
public init(
location: Location,
altitude: Length = .feet(800.0),
temperature: DryBulb? = nil,
humidity: RelativeHumidity? = nil,
needsProcessed: Bool = false,
topics: Topics? = nil
) {
self.altitude = altitude
self.location = location
self._temperature = TrackedChanges(wrappedValue: temperature, needsProcessed: needsProcessed)
self._humidity = TrackedChanges(wrappedValue: humidity, needsProcessed: needsProcessed)
self.topics = topics ?? .init(location: location)
}
/// The calculated dew-point temperature of the sensor.
public var dewPoint: DewPoint? {
get async {
guard let temperature = temperature,
let humidity = humidity,
!temperature.value.isNaN,
!humidity.value.isNaN
else { return nil }
return try? await psychrometrics.dewPoint(.dryBulb(temperature, relativeHumidity: humidity))
}
}
/// The calculated enthalpy of the sensor.
public var enthalpy: EnthalpyOf<MoistAir>? {
get async {
guard let temperature = temperature,
let humidity = humidity,
!temperature.value.isNaN,
!humidity.value.isNaN
else { return nil }
return try? await psychrometrics.enthalpy.moistAir(
.dryBulb(temperature, relativeHumidity: humidity, altitude: altitude)
)
}
}
/// Check whether any of the sensor values have changed and need processed.
///
/// - Note: Setting a value will set to both the temperature and humidity properties.
public var needsProcessed: Bool {
get { $temperature.needsProcessed || $humidity.needsProcessed }
set {
$temperature.needsProcessed = newValue
$humidity.needsProcessed = newValue
}
}
/// Represents the different locations of a temperature and humidity sensor, which can
/// be used to derive the topic to both listen and publish new values to.
public enum Location: String, CaseIterable, Equatable, Hashable, Sendable {
case mixedAir = "mixed_air"
case postCoil = "post_coil"
case `return`
case supply
}
/// Represents the MQTT topics to listen for updated sensor values on.
public struct Topics: Equatable, Hashable, Sendable {
/// The dew-point temperature topic for the sensor.
public let dewPoint: String
/// The enthalpy topic for the sensor.
public let enthalpy: String
/// The humidity topic of the sensor.
public let humidity: String
/// The temperature topic of the sensor.
public let temperature: String
public init(
dewPoint: String,
enthalpy: String,
humidity: String,
temperature: String
) {
self.dewPoint = dewPoint
self.enthalpy = enthalpy
self.humidity = humidity
self.temperature = temperature
}
public init(topicPrefix: String? = "frankensystem", location: TemperatureAndHumiditySensor.Location) {
var prefix = topicPrefix ?? ""
if prefix.reversed().starts(with: "/") {
prefix = "\(prefix.dropLast())"
}
self.init(
dewPoint: "\(prefix)/sensor/\(location.rawValue)_dew_point/state",
enthalpy: "\(prefix)/sensor/\(location.rawValue)_enthalpy/state",
humidity: "\(prefix)/sensor/\(location.rawValue)_humidity/state",
temperature: "\(prefix)/sensor/\(location.rawValue)_temperature/state"
)
}
}
}

View File

@@ -1,269 +0,0 @@
/// A container for all the different topics that are needed by the application.
public struct Topics: Codable, Equatable {
/// The command topics the application can publish to.
public var commands: Commands
/// The sensor topics the application can read from / write to.
public var sensors: Sensors
/// The set point topics the application can read set point values from.
public var setPoints: SetPoints
/// The state topics the application can read state values from.
public var states: States
/// Create the topics required by the application.
///
/// - Parameters:
/// - sensors: The sensor topics.
/// - setPoints: The set point topics
/// - states: The states topics
/// - relays: The relay topics
public init(
commands: Commands = .init(),
sensors: Sensors = .init(),
setPoints: SetPoints = .init(),
states: States = .init()
) {
self.commands = commands
self.sensors = sensors
self.setPoints = setPoints
self.states = states
}
/// Represents the sensor topics.
public struct Sensors: Codable, Equatable {
public var mixedAirSensor: TemperatureAndHumiditySensor<State.Sensors.Mixed>
public var postCoilSensor: TemperatureAndHumiditySensor<State.Sensors.PostCoil>
public var returnAirSensor: TemperatureAndHumiditySensor<State.Sensors.Return>
public var supplyAirSensor: TemperatureAndHumiditySensor<State.Sensors.Supply>
public init(
mixedAirSensor: TemperatureAndHumiditySensor<State.Sensors.Mixed> = .default(location: "mixed=air"),
postCoilSensor: TemperatureAndHumiditySensor<State.Sensors.PostCoil> = .default(location: "post-coil"),
returnAirSensor: TemperatureAndHumiditySensor<State.Sensors.Return> = .default(location: "return"),
supplyAirSensor: TemperatureAndHumiditySensor<State.Sensors.Supply> = .default(location: "supply")
) {
self.mixedAirSensor = mixedAirSensor
self.postCoilSensor = postCoilSensor
self.returnAirSensor = returnAirSensor
self.supplyAirSensor = supplyAirSensor
}
public struct TemperatureAndHumiditySensor<Location>: Codable, Equatable {
public var temperature: String
public var humidity: String
public var dewPoint: String
public var enthalpy: String
/// Create a new sensor topic container.
///
/// - Parameters:
/// - temperature: The temperature sensor topic.
/// - humidity: The humidity sensor topic.
/// - dewPoint: The dew point sensor topic.
public init(
temperature: String,
humidity: String,
dewPoint: String,
enthalpy: String
) {
self.temperature = temperature
self.humidity = humidity
self.dewPoint = dewPoint
self.enthalpy = enthalpy
}
}
}
/// A container for set point related topics used by the application.
public struct SetPoints: Codable, Equatable {
/// The topic for the humidify set point.
public var humidify: Humidify
/// The topics for dehumidification set points.
public var dehumidify: Dehumidify
/// Create a new set point topic container.
///
/// - Parameters:
/// - humidify: The topic for humidification set points.
/// - dehumidify: The topics for dehumidification set points.
public init(
humidify: Humidify = .init(),
dehumidify: Dehumidify = .init()
) {
self.humidify = humidify
self.dehumidify = dehumidify
}
/// A container for the humidification set point topics used by the application.
public struct Humidify: Codable, Equatable {
/// The topic for dew point control mode set point.
public var dewPoint: String
/// The topic for relative humidity control mode set point.
public var relativeHumidity: String
/// Create a new container for the humidification set point topics.
///
/// - Parameters:
/// - dewPoint: The topic for dew point control mode set point.
/// - relativeHumidity: The topic for relative humidity control mode set point.
public init(
dewPoint: String = "set_points/humidify/dew_point",
relativeHumidity: String = "set_points/humidify/relative_humidity"
) {
self.dewPoint = dewPoint
self.relativeHumidity = relativeHumidity
}
}
/// A container for dehumidifcation set point topics.
public struct Dehumidify: Codable, Equatable {
/// A low setting for dew point control modes.
public var lowDewPoint: String
/// A high setting for dew point control modes.
public var highDewPoint: String
/// A low setting for relative humidity control modes.
public var lowRelativeHumidity: String
/// A high setting for relative humidity control modes.
public var highRelativeHumidity: String
/// Create a new container for dehumidification set point topics.
///
/// - Parameters:
/// - lowDewPoint: A low setting for dew point control modes.
/// - highDewPoint: A high setting for dew point control modes.
/// - lowRelativeHumidity: A low setting for relative humidity control modes.
/// - highRelativeHumidity: A high setting for relative humidity control modes.
public init(
lowDewPoint: String = "set_points/dehumidify/low_dew_point",
highDewPoint: String = "set_points/dehumidify/high_dew_point",
lowRelativeHumidity: String = "set_points/dehumidify/low_relative_humidity",
highRelativeHumidity: String = "set_points/dehumidify/high_relative_humidity"
) {
self.lowDewPoint = lowDewPoint
self.highDewPoint = highDewPoint
self.lowRelativeHumidity = lowRelativeHumidity
self.highRelativeHumidity = highRelativeHumidity
}
}
}
/// A container for control state topics used by the application.
public struct States: Codable, Equatable {
/// The topic for the control mode.
public var mode: String
/// The relay state topics.
public var relays: Relays
/// Create a new container for control state topics.
///
/// - Parameters:
/// - mode: The topic for the control mode.
public init(
mode: String = "states/mode",
relays: Relays = .init()
) {
self.mode = mode
self.relays = relays
}
/// A container for reading the current state of a relay.
public struct Relays: Codable, Equatable {
/// The dehumidification stage-1 relay topic.
public var dehumdification1: String
/// The dehumidification stage-2 relay topic.
public var dehumidification2: String
/// The humidification relay topic.
public var humdification: String
/// Create a new container for relay state topics.
///
/// - Parameters:
/// - dehumidification1: The dehumidification stage-1 relay topic.
/// - dehumidification2: The dehumidification stage-2 relay topic.
/// - humidification: The humidification relay topic.
public init(
dehumidefication1: String = "states/relays/dehumidification_1",
dehumidification2: String = "states/relays/dehumidification_2",
humidification: String = "states/relays/humidification"
) {
self.dehumdification1 = dehumidefication1
self.dehumidification2 = dehumidification2
self.humdification = humidification
}
}
}
/// A container for commands topics that the application can publish to.
public struct Commands: Codable, Equatable {
/// The relay command topics.
public var relays: Relays
/// Create a new command topics container.
///
/// - Parameters:
/// - relays: The relay command topics.
public init(relays: Relays = .init()) {
self.relays = relays
}
/// A container for relay command topics used by the application.
public struct Relays: Codable, Equatable {
/// The dehumidification stage-1 relay topic.
public var dehumidification1: String
/// The dehumidification stage-2 relay topic.
public var dehumidification2: String
/// The humidification relay topic.
public var humidification: String
/// Create a new container for commanding relays.
///
/// - Parameters:
/// - dehumidification1: The dehumidification stage-1 relay topic.
/// - dehumidification2: The dehumidification stage-2 relay topic.
/// - humidification: The humidification relay topic.
public init(
dehumidification1: String = "relays/dehumidification_1",
dehumidification2: String = "relays/dehumidification_2",
humidification: String = "relays/humidification"
) {
self.dehumidification1 = dehumidification1
self.dehumidification2 = dehumidification2
self.humidification = humidification
}
}
}
}
// MARK: Helpers
extension Topics.Sensors.TemperatureAndHumiditySensor {
public static func `default`(location: String) -> Self {
.init(
temperature: "sensors/\(location)/temperature",
humidity: "sensors/\(location)/humidity",
dewPoint: "sensors/\(location)/dew-point",
enthalpy: "sensors/\(location)/enthalpy"
)
}
}

60
Sources/Models/TrackedChanges.swift Normal file → Executable file
View File

@@ -1,11 +1,20 @@
/// A property wrapper that tracks changes of a property.
///
/// This allows values to only publish changes if they have changed since the
/// last time they were recieved.
@propertyWrapper
public struct TrackedChanges<Value> {
public struct TrackedChanges<Value: Sendable>: Sendable {
/// The current tracking state.
private var tracking: TrackingState
/// The current wrapped value.
private var value: Value
private var isEqual: (Value, Value) -> Bool
/// Used to check if a new value is equal to an old value.
private var isEqual: @Sendable (Value, Value) -> Bool
/// Access to the underlying property that we are wrapping.
public var wrappedValue: Value {
get { value }
set {
@@ -16,22 +25,34 @@ public struct TrackedChanges<Value> {
tracking = .needsProcessed
}
}
/// Create a new property that tracks it's changes.
///
/// - Parameters:
/// - wrappedValue: The value that we are wrapping.
/// - needsProcessed: Whether this value needs processed (default = false).
/// - isEqual: Method to compare old values against new values.
public init(
wrappedValue: Value,
needsProcessed: Bool = false,
isEqual: @escaping (Value, Value) -> Bool
isEqual: @escaping @Sendable (Value, Value) -> Bool
) {
self.value = wrappedValue
self.tracking = needsProcessed ? .needsProcessed : .hasProcessed
self.isEqual = isEqual
}
/// Represents whether a wrapped value has changed and needs processed or not.
enum TrackingState {
/// The state when nothing has changed and we've already processed the current value.
case hasProcessed
/// The state when the value has changed and has not been processed yet.
case needsProcessed
}
/// Whether the value needs processed.
public var needsProcessed: Bool {
get { tracking == .needsProcessed }
set {
@@ -42,7 +63,7 @@ public struct TrackedChanges<Value> {
}
}
}
public var projectedValue: Self {
get { self }
set { self = newValue }
@@ -54,11 +75,26 @@ extension TrackedChanges: Equatable where Value: Equatable {
lhs.wrappedValue == rhs.wrappedValue
&& lhs.needsProcessed == rhs.needsProcessed
}
/// Create a new property that tracks it's changes, using the default equality check.
///
/// - Parameters:
/// - wrappedValue: The value that we are wrapping.
/// - needsProcessed: Whether this value needs processed (default = false).
public init(
wrappedValue: Value,
needsProcessed: Bool = false
) {
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed, isEqual: ==)
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed) {
$0 == $1
}
}
}
extension TrackedChanges: Hashable where Value: Hashable {
public func hash(into hasher: inout Hasher) {
hasher.combine(wrappedValue)
hasher.combine(needsProcessed)
}
}

View File

@@ -0,0 +1,42 @@
import NIO
import NIOFoundationCompat
import PsychrometricClient
/// Represents a type that can be initialized by a ``ByteBuffer``.
protocol BufferInitalizable {
init?(buffer: inout ByteBuffer)
}
extension Double: BufferInitalizable {
/// Attempt to create / parse a double from a byte buffer.
init?(buffer: inout ByteBuffer) {
guard let string = buffer.readString(
length: buffer.readableBytes,
encoding: String.Encoding.utf8
)
else { return nil }
self.init(string)
}
}
extension Tagged: BufferInitalizable where RawValue: BufferInitalizable {
init?(buffer: inout ByteBuffer) {
guard let value = RawValue(buffer: &buffer) else { return nil }
self.init(value)
}
}
extension Humidity<Relative>: BufferInitalizable {
init?(buffer: inout ByteBuffer) {
guard let value = Double(buffer: &buffer) else { return nil }
self.init(value)
}
}
extension Temperature<DryAir>: BufferInitalizable {
init?(buffer: inout ByteBuffer) {
guard let value = Double(buffer: &buffer) else { return nil }
self.init(value)
}
}

View File

@@ -0,0 +1,206 @@
import Dependencies
import DependenciesMacros
import Foundation
import Logging
import Models
import MQTTConnectionManager
import MQTTNIO
import NIO
import PsychrometricClient
import ServiceLifecycle
import TopicDependencies
/// Service that is responsible for listening to changes of the temperature and humidity
/// sensors, then publishing back the calculated dew-point temperature and enthalpy for
/// the sensor location.
///
///
public actor SensorsService: Service {
@Dependency(\.mqttConnectionManager.stream) var connectionStream
@Dependency(\.topicListener) var topicListener
@Dependency(\.topicPublisher) var topicPublisher
/// The logger to use for the service.
private let logger: Logger?
/// The sensors that we are listening for updates to, so
/// that we can calculate the dew-point temperature and enthalpy
/// values to publish back to the MQTT broker.
var sensors: [TemperatureAndHumiditySensor]
var topics: [String] {
sensors.reduce(into: [String]()) { array, sensor in
array.append(sensor.topics.temperature)
array.append(sensor.topics.humidity)
}
}
/// Create a new sensors service that listens to the passed in
/// sensors.
///
/// - Note: The service will fail to start if the array of sensors is not greater than 0.
///
/// - Parameters:
/// - sensors: The sensors to listen for changes to.
/// - logger: An optional logger to use.
public init(
sensors: [TemperatureAndHumiditySensor],
logger: Logger? = nil
) {
self.sensors = sensors
self.logger = logger
}
/// Start the service with graceful shutdown, which will attempt to publish
/// any pending changes to the MQTT broker, upon a shutdown signal.
public func run() async throws {
precondition(sensors.count > 0, "Sensors should not be empty.")
try await withGracefulShutdownHandler {
// Listen for connection events, so that we can automatically
// reconnect any sensor topics we're listening to upon a disconnect / reconnect
// event. We can also shutdown any topic listeners upon a shutdown event.
for await event in try connectionStream().cancelOnGracefulShutdown() {
switch event {
case .shuttingDown:
logger?.debug("Received shutdown event.")
try await self.shutdown()
case .disconnected:
logger?.debug("Received disconnected event.")
try await Task.sleep(for: .milliseconds(100))
case .connected:
logger?.debug("Received connected event.")
let stream = try await makeStream()
for await result in stream.cancelOnGracefulShutdown() {
logger?.debug("Received result for topic: \(result.topic)")
await self.handleResult(result)
}
}
}
} onGracefulShutdown: {
Task {
self.logger?.debug("Received graceful shutdown.")
try await self.shutdown()
}
}
}
@_spi(Internal)
public func shutdown() async throws {
try await publishUpdates()
topicListener.shutdown()
}
private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> {
try await topicListener.listen(to: topics)
// ignore errors, so that we continue to listen, but log them
// for debugging purposes.
.compactMap { result in
switch result {
case let .failure(error):
self.logger?.debug("Received error listening for sensors: \(error)")
return nil
case let .success(info):
return (info.payload, info.topicName)
}
}
// ignore duplicate values, to prevent publishing dew-point and enthalpy
// changes to frequently.
.removeDuplicates { lhs, rhs in
lhs.buffer == rhs.buffer
&& lhs.topic == rhs.topic
}
.eraseToStream()
}
private func handleResult(_ result: (buffer: ByteBuffer, topic: String)) async {
do {
let topic = result.topic
assert(topics.contains(topic))
logger?.debug("Begin handling result for topic: \(topic)")
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
var buffer = result.buffer
return V(buffer: &buffer)
}
if topic.contains("temperature") {
logger?.debug("Begin handling temperature result.")
guard let temperature = decode(DryBulb.self) else {
logger?.debug("Failed to decode temperature: \(result.buffer)")
throw DecodingError()
}
logger?.debug("Decoded temperature: \(temperature)")
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
} else if topic.contains("humidity") {
logger?.debug("Begin handling humidity result.")
guard let humidity = decode(RelativeHumidity.self) else {
logger?.debug("Failed to decode humidity: \(result.buffer)")
throw DecodingError()
}
logger?.debug("Decoded humidity: \(humidity)")
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
}
try await publishUpdates()
logger?.debug("Done handling result for topic: \(topic)")
} catch {
logger?.error("Received error while handling result: \(error)")
}
}
private func publish(_ double: Double?, to topic: String) async throws {
guard let double else { return }
try await topicPublisher.publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(double)"),
qos: .exactlyOnce,
retain: true
)
logger?.debug("Published update to topic: \(topic)")
}
private func publishUpdates() async throws {
for sensor in sensors.filter(\.needsProcessed) {
try await publish(sensor.dewPoint?.value, to: sensor.topics.dewPoint)
try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy)
try sensors.hasProcessed(sensor)
}
}
}
// MARK: - Errors
struct DecodingError: Error {}
struct SensorNotFoundError: Error {}
// MARK: - Helpers
private extension TemperatureAndHumiditySensor.Topics {
func contains(_ topic: String) -> Bool {
temperature == topic || humidity == topic
}
}
private extension Array where Element == TemperatureAndHumiditySensor {
mutating func update<V>(
topic: String,
keyPath: WritableKeyPath<TemperatureAndHumiditySensor, V>,
with value: V
) throws {
guard let index = firstIndex(where: { $0.topics.contains(topic) }) else {
throw SensorNotFoundError()
}
self[index][keyPath: keyPath] = value
}
mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws {
guard let index = firstIndex(where: { $0.id == sensor.id }) else {
throw SensorNotFoundError()
}
self[index].needsProcessed = false
}
}

View File

@@ -0,0 +1,186 @@
import Dependencies
import DependenciesMacros
import Foundation
import MQTTNIO
/// A dependency that can generate an async stream of changes to the given topics.
///
/// - Note: This type only conforms to ``TestDependencyKey`` because it requires an MQTTClient
/// to generate the live dependency.
@DependencyClient
public struct TopicListener: Sendable {
public typealias Stream = AsyncStream<Result<MQTTPublishInfo, any Error>>
/// Create an async stream that listens for changes to the given topics.
private var _listen: @Sendable ([String], MQTTQoS) async throws -> Stream
/// Shutdown the listener stream.
public var shutdown: @Sendable () -> Void
/// Create a new topic listener.
///
/// - Parameters:
/// - listen: Generate an async stream of changes for the given topics.
/// - shutdown: Shutdown the topic listener stream.
public init(
listen: @Sendable @escaping ([String], MQTTQoS) async throws -> Stream,
shutdown: @Sendable @escaping () -> Void
) {
self._listen = listen
self.shutdown = shutdown
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
to topics: [String],
qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
try await _listen(topics, qos)
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
_ topics: String...,
qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
try await listen(to: topics, qos: qos)
}
/// Create the live implementation of the topic listener with the given MQTTClient.
///
/// - Parameters:
/// - client: The MQTTClient to use.
public static func live(client: MQTTClient) -> Self {
let listener = MQTTTopicListener(client: client)
return .init(
listen: { try await listener.listen($0, $1) },
shutdown: { listener.shutdown() }
)
}
}
extension TopicListener: TestDependencyKey {
public static var testValue: TopicListener { Self() }
}
public extension DependencyValues {
var topicListener: TopicListener {
get { self[TopicListener.self] }
set { self[TopicListener.self] = newValue }
}
}
// MARK: - Helpers
private actor MQTTTopicListener {
private let client: MQTTClient
private let continuation: TopicListener.Stream.Continuation
private let name: String
let stream: TopicListener.Stream
private var shuttingDown: Bool = false
init(
client: MQTTClient
) {
let (stream, continuation) = TopicListener.Stream.makeStream()
self.client = client
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
}
deinit {
if !shuttingDown {
let message = """
Shutdown was not called on topic listener. This could lead to potential errors or
the stream never ending.
Please ensure that you call shutdown on the listener.
"""
client.logger.warning("\(message)")
continuation.finish()
}
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
}
func listen(
_ topics: [String],
_ qos: MQTTQoS = .atLeastOnce
) async throws -> TopicListener.Stream {
var sleepTimes = 0
while !client.isActive() {
guard sleepTimes < 10 else {
throw TopicListenerError.connectionTimeout
}
try? await Task.sleep(for: .milliseconds(100))
sleepTimes += 1
}
client.logger.trace("Client is active, begin subscribing to topics.")
let subscription = try? await client.subscribe(to: topics.map {
MQTTSubscribeInfo(topicFilter: $0, qos: qos)
})
guard subscription != nil else {
client.logger.error("Error subscribing to topics: \(topics)")
throw TopicListenerError.failedToSubscribe
}
client.logger.trace("Done subscribing, begin listening to topics.")
client.addPublishListener(named: name) { result in
switch result {
case let .failure(error):
self.client.logger.error("Received error while listening: \(error)")
self.continuation.yield(.failure(MQTTListenResultError(error)))
case let .success(publishInfo):
if topics.contains(publishInfo.topicName) {
self.client.logger.trace("Recieved new value for topic: \(publishInfo.topicName)")
self.continuation.yield(.success(publishInfo))
}
}
}
return stream
}
private func setIsShuttingDown() {
shuttingDown = true
}
nonisolated func shutdown() {
client.logger.trace("Closing topic listener...")
continuation.finish()
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
Task { await self.setIsShuttingDown() }
}
}
// MARK: - Errors
public enum TopicListenerError: Error {
case connectionTimeout
case failedToSubscribe
}
public struct MQTTListenResultError: Error {
let underlyingError: any Error
init(_ underlyingError: any Error) {
self.underlyingError = underlyingError
}
}

View File

@@ -0,0 +1,117 @@
import Dependencies
import DependenciesMacros
import MQTTNIO
import NIO
/// A dependency that is responsible for publishing values to an MQTT broker.
///
/// - Note: This dependency only conforms to `TestDependencyKey` because it
/// requires an active `MQTTClient` to generate the live dependency.
@DependencyClient
public struct TopicPublisher: Sendable {
private var _publish: @Sendable (PublishRequest) async throws -> Void
/// Create a new topic publisher.
///
/// - Parameters:
/// - publish: Handle the publish request.
public init(
publish: @Sendable @escaping (PublishRequest) async throws -> Void
) {
self._publish = publish
}
/// Publish a new value to the given topic.
///
/// - Parameters:
/// - topicName: The topic to publish the new value to.
/// - payload: The value to publish.
/// - qos: The MQTTQoS.
/// - retain: The retain flag.
public func publish(
to topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool = false
) async throws {
try await _publish(.init(
topicName: topicName,
payload: payload,
qos: qos,
retain: retain
))
}
/// Create the live topic publisher with the given `MQTTClient`.
///
/// - Parameters:
/// - client: The mqtt broker client to use.
public static func live(client: MQTTClient) -> Self {
.init(
publish: { request in
guard client.isActive() else {
client.logger.trace("Client is not connected, unable to publish to \(request.topicName)")
return
}
client.logger.trace("Begin publishing to topic: \(request.topicName)")
defer { client.logger.trace("Done publishing to topic: \(request.topicName)") }
try await client.publish(
to: request.topicName,
payload: request.payload,
qos: request.qos,
retain: request.retain
)
}
)
}
/// Represents the parameters required to publish a new value to the
/// MQTT broker.
public struct PublishRequest: Equatable, Sendable {
/// The topic to publish the new value to.
public let topicName: String
/// The value to publish.
public let payload: ByteBuffer
/// The qos of the request.
public let qos: MQTTQoS
/// The retain flag for the request.
public let retain: Bool
/// Create a new publish request.
///
/// - Parameters:
/// - topicName: The topic to publish to.
/// - payload: The value to publish.
/// - qos: The qos of the request.
/// - retain: The retain flag of the request.
public init(
topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool
) {
self.topicName = topicName
self.payload = payload
self.qos = qos
self.retain = retain
}
}
}
extension TopicPublisher: TestDependencyKey {
public static var testValue: TopicPublisher { Self() }
}
public extension DependencyValues {
/// A dependency that is responsible for publishing values to an MQTT broker.
var topicPublisher: TopicPublisher {
get { self[TopicPublisher.self] }
set { self[TopicPublisher.self] = newValue }
}
}

View File

@@ -1,52 +0,0 @@
import Models
// TODO: Fix other live topics
extension Topics {
public static let live = Self.init(
commands: .init(),
sensors: .init(
mixedAirSensor: .live(location: .mixedAir),
postCoilSensor: .live(location: .postCoil),
returnAirSensor: .live(location: .return),
supplyAirSensor: .live(location: .supply)),
setPoints: .init(),
states: .init()
)
}
extension Topics.Sensors {
fileprivate enum Location: CustomStringConvertible {
case mixedAir
case postCoil
case `return`
case supply
var description: String {
switch self {
case .mixedAir:
return "mixed_air"
case .postCoil:
return "post_coil"
case .return:
return "return"
case .supply:
return "supply"
}
}
}
}
extension Topics.Sensors.TemperatureAndHumiditySensor {
fileprivate static func live(
prefix: String = "frankensystem",
location: Topics.Sensors.Location
) -> Self {
.init(
temperature: "\(prefix)/sensor/\(location.description)_temperature/state",
humidity: "\(prefix)/sensor/\(location.description)_humidity/state",
dewPoint: "\(prefix)/sensor/\(location.description)_dew_point/state",
enthalpy: "\(prefix)/sensor/\(location.description)_enthalpy/state"
)
}
}

View File

@@ -0,0 +1,119 @@
import Dependencies
import Foundation
import Logging
import Models
import MQTTConnectionManager
import MQTTConnectionService
import MQTTNIO
import NIO
import PsychrometricClientLive
import SensorsService
import ServiceLifecycle
import TopicDependencies
@main
struct Application {
/// The main entry point of the application.
static func main() async throws {
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
var logger = Logger(label: "dewpoint-controller")
logger.logLevel = .trace
logger.info("Starting dewpoint-controller!")
let environment = loadEnvVars(logger: logger)
if environment.appEnv == .production {
logger.debug("Updating logging level to info.")
logger.logLevel = .info
}
let mqtt = MQTTClient(
envVars: environment,
eventLoopGroup: eventloopGroup,
logger: logger
)
do {
try await withDependencies {
$0.psychrometricClient = .liveValue
$0.topicListener = .live(client: mqtt)
$0.topicPublisher = .live(client: mqtt)
$0.mqttConnectionManager = .live(client: mqtt, logger: logger)
} operation: {
let mqttConnection = MQTTConnectionService(logger: logger)
let sensors = SensorsService(sensors: .live, logger: logger)
var serviceGroupConfiguration = ServiceGroupConfiguration(
services: [
mqttConnection,
sensors
],
gracefulShutdownSignals: [.sigterm, .sigint],
logger: logger
)
serviceGroupConfiguration.maximumCancellationDuration = .seconds(5)
serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10)
let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
try await serviceGroup.run()
}
try await mqtt.shutdown()
try await eventloopGroup.shutdownGracefully()
} catch {
try await eventloopGroup.shutdownGracefully()
}
}
}
// MARK: - Helpers
private func loadEnvVars(logger: Logger) -> EnvVars {
let defaultEnvVars = EnvVars()
let encoder = JSONEncoder()
let decoder = JSONDecoder()
let defaultEnvDict = (try? encoder.encode(defaultEnvVars))
.flatMap { try? decoder.decode([String: String].self, from: $0) }
?? [:]
let envVarsDict = defaultEnvDict
.merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 })
let envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict))
.flatMap { try? decoder.decode(EnvVars.self, from: $0) }
?? defaultEnvVars
logger.debug("Done loading EnvVars...")
return envVars
}
private extension MQTTNIO.MQTTClient {
convenience init(envVars: EnvVars, eventLoopGroup: EventLoopGroup, logger: Logger?) {
self.init(
host: envVars.host,
port: envVars.port != nil ? Int(envVars.port!) : nil,
identifier: envVars.identifier,
eventLoopGroupProvider: .shared(eventLoopGroup),
logger: logger,
configuration: .init(
version: .v3_1_1,
disablePing: false,
userName: envVars.userName,
password: envVars.password
)
)
}
}
private extension Array where Element == TemperatureAndHumiditySensor {
static var live: Self {
TemperatureAndHumiditySensor.Location.allCases.map { location in
TemperatureAndHumiditySensor(location: location)
}
}
}

View File

@@ -1,73 +0,0 @@
import Bootstrap
import ClientLive
import CoreUnitTypes
import Logging
import Models
import MQTTNIO
import NIO
import TopicsLive
import Foundation
var logger: Logger = {
var logger = Logger(label: "dewPoint-logger")
logger.logLevel = .debug
return logger
}()
logger.info("Starting Swift Dew Point Controller!")
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
var environment = try bootstrap(eventLoopGroup: eventLoopGroup, logger: logger, autoConnect: false).wait()
// Set the log level to info only in production mode.
if environment.envVars.appEnv == .production {
logger.logLevel = .info
}
// Set up the client, topics and state.
environment.topics = .live
let state = State()
let client = Client.live(client: environment.mqttClient, state: state, topics: environment.topics)
defer {
logger.debug("Disconnecting")
}
// Add topic listeners.
client.addListeners()
while true {
if !environment.mqttClient.isActive() {
logger.trace("Connecting to MQTT broker...")
try client.connect().wait()
try client.subscribe().wait()
Thread.sleep(forTimeInterval: 1)
}
// Check if sensors need processed.
if state.sensors.needsProcessed {
logger.debug("Sensor state has changed...")
if state.sensors.mixedAirSensor.needsProcessed {
logger.trace("Publishing mixed air sensor.")
try client.publishSensor(.mixed(state.sensors.mixedAirSensor)).wait()
}
if state.sensors.postCoilSensor.needsProcessed {
logger.trace("Publishing post coil sensor.")
try client.publishSensor(.postCoil(state.sensors.postCoilSensor)).wait()
}
if state.sensors.returnAirSensor.needsProcessed {
logger.trace("Publishing return air sensor.")
try client.publishSensor(.return(state.sensors.returnAirSensor)).wait()
}
if state.sensors.supplyAirSensor.needsProcessed {
logger.trace("Publishing supply air sensor.")
try client.publishSensor(.supply(state.sensors.supplyAirSensor)).wait()
}
}
// logger.debug("Fetching dew point...")
//
// logger.debug("Published dew point...")
Thread.sleep(forTimeInterval: 5)
}

View File

@@ -1,45 +0,0 @@
import XCTest
import EnvVars
import Logging
import Models
@testable import ClientLive
import Psychrometrics
final class AsyncClientTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "AsyncClientTests")
logger.logLevel = .trace
return logger
}()
func createClient(identifier: String) -> AsyncClient {
let envVars = EnvVars.init(
appEnv: .testing,
host: Self.hostname,
port: "1883",
identifier: identifier,
userName: nil,
password: nil
)
return .init(envVars: envVars, logger: Self.logger)
}
func testConnectAndShutdown() async throws {
let client = createClient(identifier: "testConnectAndShutdown")
await client.connect()
await client.shutdown()
}
func testPublishingSensor() async throws {
let client = createClient(identifier: "testPublishingSensor")
await client.connect()
let topic = Topics().sensors.mixedAirSensor.dewPoint
try await client.addPublishListener(topic: topic, decoding: Temperature.self)
try await client.publishSensor(.mixed(.init(temperature: 71.123, humidity: 50.5, needsProcessed: true)))
try await client.publishSensor(.mixed(.init(temperature: 72.123, humidity: 50.5, needsProcessed: true)))
await client.shutdown()
}
}

View File

@@ -1,186 +0,0 @@
import Client
@testable import ClientLive
import CoreUnitTypes
import Foundation
import Logging
import Models
import MQTTNIO
import NIO
import NIOConcurrencyHelpers
import XCTest
final class ClientLiveTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
let topics = Topics()
// func test_mqtt_subscription() throws {
// let mqttClient = createMQTTClient(identifier: "test_subscription")
// _ = try mqttClient.connect().wait()
// let sub = try mqttClient.v5.subscribe(
// to: [mqttClient.mqttSubscription(topic: "test/subscription")]
// ).wait()
// XCTAssertEqual(sub.reasons[0], .grantedQoS1)
// try mqttClient.disconnect().wait()
// try mqttClient.syncShutdownGracefully()
// }
func test_mqtt_listener() throws {
let lock = Lock()
var publishRecieved: [MQTTPublishInfo] = []
let payloadString = "test"
let payload = ByteBufferAllocator().buffer(string: payloadString)
let client = self.createMQTTClient(identifier: "testMQTTListener_publisher")
_ = try client.connect().wait()
client.addPublishListener(named: "test") { result in
switch result {
case .success(let publish):
var buffer = publish.payload
let string = buffer.readString(length: buffer.readableBytes)
XCTAssertEqual(string, payloadString)
lock.withLock {
publishRecieved.append(publish)
}
case .failure(let error):
XCTFail("\(error)")
}
}
try client.publish(to: "testMQTTSubscribe", payload: payload, qos: .atLeastOnce, retain: true).wait()
let sub = try client.v5.subscribe(to: [.init(topicFilter: "testMQTTSubscribe", qos: .atLeastOnce)]).wait()
XCTAssertEqual(sub.reasons[0], .grantedQoS1)
Thread.sleep(forTimeInterval: 2)
lock.withLock {
XCTAssertEqual(publishRecieved.count, 1)
}
try client.disconnect().wait()
try client.syncShutdownGracefully()
}
func test_client2_returnTemperature_listener() throws {
let mqttClient = createMQTTClient(identifier: "return-temperature-tests")
let state = State()
let topics = Topics()
let client = Client.live(client: mqttClient, state: state, topics: topics)
client.addListeners()
try client.connect().wait()
try client.subscribe().wait()
_ = try mqttClient.publish(
to: topics.sensors.returnAirSensor.temperature,
payload: ByteBufferAllocator().buffer(string: "75.1234"),
qos: .atLeastOnce
).wait()
Thread.sleep(forTimeInterval: 2)
XCTAssertEqual(state.sensors.returnAirSensor.temperature, .celsius(75.1234))
try mqttClient.disconnect().wait()
try mqttClient.syncShutdownGracefully()
// try client.shutdown().wait()
}
func test_client2_returnSensor_publish() throws {
let mqttClient = createMQTTClient(identifier: "return-temperature-tests")
let state = State()
let topics = Topics()
let client = Client.live(client: mqttClient, state: state, topics: topics)
client.addListeners()
try client.connect().wait()
try client.subscribe().wait()
_ = try mqttClient.publish(
to: topics.sensors.returnAirSensor.temperature,
payload: ByteBufferAllocator().buffer(string: "75.1234"),
qos: .atLeastOnce
).wait()
_ = try mqttClient.publish(
to: topics.sensors.returnAirSensor.humidity,
payload: ByteBufferAllocator().buffer(string: "\(50.0)"),
qos: .atLeastOnce
).wait()
Thread.sleep(forTimeInterval: 2)
XCTAssert(state.sensors.returnAirSensor.needsProcessed)
try client.publishSensor(.return(state.sensors.returnAirSensor)).wait()
XCTAssertFalse(state.sensors.returnAirSensor.needsProcessed)
try mqttClient.disconnect().wait()
try mqttClient.syncShutdownGracefully()
// try client.shutdown().wait()
}
// func test_fetch_humidity() throws {
// let lock = Lock()
// let publishClient = createMQTTClient(identifier: "publishHumidity")
// let mqttClient = createMQTTClient(identifier: "fetchHumidity")
// _ = try publishClient.connect().wait()
// let client = try createClient(mqttClient: mqttClient)
// var humidityRecieved: [RelativeHumidity] = []
//
// _ = try publishClient.publish(
// to: topics.sensors.humidity,
// payload: ByteBufferAllocator().buffer(string: "\(50.0)"),
// qos: .atLeastOnce
// ).wait()
//
// Thread.sleep(forTimeInterval: 2)
// try publishClient.disconnect().wait()
// let humidity = try client.fetchHumidity(.init(topic: self.topics.sensors.humidity)).wait()
// XCTAssertEqual(humidity, 50)
// Thread.sleep(forTimeInterval: 2)
// lock.withLock {
// humidityRecieved.append(humidity)
// }
// try mqttClient.disconnect().wait()
// try mqttClient.syncShutdownGracefully()
// }
// MARK: - Helpers
func createMQTTClient(identifier: String) -> MQTTNIO.MQTTClient {
MQTTNIO.MQTTClient(
host: Self.hostname,
port: 1883,
identifier: identifier,
eventLoopGroupProvider: .shared(eventLoopGroup),
logger: self.logger,
configuration: .init(version: .v5_0)
)
}
// func createWebSocketClient(identifier: String) -> MQTTNIO.MQTTClient {
// MQTTNIO.MQTTClient(
// host: Self.hostname,
// port: 8080,
// identifier: identifier,
// eventLoopGroupProvider: .createNew,
// logger: self.logger,
// configuration: .init(useWebSockets: true, webSocketURLPath: "/mqtt")
// )
// }
// Uses default topic names.
// func createClient(mqttClient: MQTTNIO.MQTTClient, autoConnect: Bool = true) throws -> Client.MQTTClient {
// if autoConnect {
// _ = try mqttClient.connect().wait()
// }
// return .live(client: mqttClient, topics: .init())
// }
let logger: Logger = {
var logger = Logger(label: "MQTTTests")
logger.logLevel = .trace
return logger
}()
let eventLoopGroup = MultiThreadedEventLoopGroup.init(numberOfThreads: 1)
}

View File

@@ -0,0 +1,92 @@
import AsyncAlgorithms
import Logging
import Models
@_spi(Internal) import MQTTConnectionManager
import MQTTConnectionService
import MQTTNIO
import NIO
import ServiceLifecycle
import ServiceLifecycleTestKit
import XCTest
final class MQTTConnectionServiceTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "MQTTConnectionServiceTests")
logger.logLevel = .trace
return logger
}()
// TODO: Move to integration tests.
func testMQTTConnectionStream() async throws {
let client = createClient(identifier: "testNonManagedStream")
let manager = MQTTConnectionManager.live(
client: client,
logger: Self.logger,
alwaysReconnect: false
)
let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger)
let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger)
var events1 = [MQTTConnectionManager.Event]()
var events2 = [MQTTConnectionManager.Event]()
let stream1 = connectionStream1.start()
let stream2 = connectionStream2.start()
_ = try await manager.connect()
Task {
while !client.isActive() {
try await Task.sleep(for: .milliseconds(100))
}
try await Task.sleep(for: .milliseconds(200))
manager.shutdown()
try await client.disconnect()
try await Task.sleep(for: .seconds(1))
try await client.shutdown()
try await Task.sleep(for: .seconds(1))
connectionStream1.stop()
connectionStream2.stop()
}
for await event in stream1.removeDuplicates() {
events1.append(event)
}
for await event in stream2.removeDuplicates() {
events2.append(event)
}
XCTAssertEqual(events1, [.disconnected, .connected, .disconnected, .shuttingDown])
XCTAssertEqual(events2, [.disconnected, .connected, .disconnected, .shuttingDown])
}
func createClient(identifier: String) -> MQTTClient {
let envVars = EnvVars(
appEnv: .testing,
host: Self.hostname,
port: "1883",
identifier: identifier,
userName: nil,
password: nil
)
let config = MQTTClient.Configuration(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
return .init(
host: Self.hostname,
identifier: identifier,
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
logger: Self.logger,
configuration: config
)
}
}

View File

@@ -0,0 +1,175 @@
import Dependencies
import Logging
import Models
@_spi(Internal) import MQTTConnectionManager
import MQTTNIO
import NIO
import PsychrometricClientLive
@_spi(Internal) import SensorsService
import TopicDependencies
import XCTest
final class SensorsClientTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "SensorsClientTests")
logger.logLevel = .trace
return logger
}()
override func invokeTest() {
let client = createClient(identifier: "\(Self.self)")
withDependencies {
$0.mqttConnectionManager = .live(client: client, logger: Self.logger)
$0.psychrometricClient = PsychrometricClient.liveValue
$0.topicListener = .live(client: client)
$0.topicPublisher = .live(client: client)
} operation: {
super.invokeTest()
}
}
func testListeningResumesAfterDisconnectThenReconnect() async throws {
@Dependency(\.mqttConnectionManager) var manager
struct TimeoutError: Error {}
let sensor = TemperatureAndHumiditySensor(location: .return)
var results = [TopicPublisher.PublishRequest]()
try await withDependencies {
$0.topicPublisher = .capturing { results.append($0) }
} operation: {
let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger)
let task = Task { try await sensorsService.run() }
defer { task.cancel() }
try await manager.connect()
defer { manager.shutdown() }
try await manager.withClient { client in
try await client.disconnect()
try await client.connect()
try await Task.sleep(for: .milliseconds(100))
try await client.publish(
to: sensor.topics.temperature,
payload: ByteBufferAllocator().buffer(string: "25"),
qos: .atLeastOnce,
retain: false
)
try await client.publish(
to: sensor.topics.humidity,
payload: ByteBufferAllocator().buffer(string: "50"),
qos: .atLeastOnce,
retain: false
)
}
var timeoutCount = 0
while !(results.count == 2) {
guard timeoutCount < 20 else {
throw TimeoutError()
}
try await Task.sleep(for: .milliseconds(100))
timeoutCount += 1
}
XCTAssertEqual(results.count, 2)
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.dewPoint }))
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.enthalpy }))
try await sensorsService.shutdown()
}
}
func createClient(identifier: String) -> MQTTClient {
let envVars = EnvVars(
appEnv: .testing,
host: Self.hostname,
port: "1883",
identifier: identifier,
userName: nil,
password: nil
)
let config = MQTTClient.Configuration(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
return .init(
host: Self.hostname,
identifier: identifier,
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
logger: Self.logger,
configuration: config
)
}
}
// MARK: Helpers for tests.
class PublishInfoContainer {
private(set) var info: [MQTTPublishInfo]
private var topicFilters: [String]?
init(topicFilters: [String]? = nil) {
self.info = []
self.topicFilters = topicFilters
}
func addPublishInfo(_ info: MQTTPublishInfo) async {
guard let topicFilters else {
self.info.append(info)
return
}
if topicFilters.contains(info.topicName) {
self.info.append(info)
}
}
}
extension TopicPublisher {
static func capturing(
_ callback: @escaping (PublishRequest) -> Void
) -> Self {
.init { callback($0) }
}
}
// extension SensorsClient {
//
// static func testing(
// yielding: [(value: Double, to: String)],
// capturePublishedValues: @escaping (Double, String) -> Void,
// captureShutdownEvent: @escaping (Bool) -> Void
// ) -> Self {
// let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self)
// let logger = Logger(label: "\(Self.self).testing")
//
// return .init(
// listen: { topics in
// for (value, topic) in yielding where topics.contains(topic) {
// continuation.yield(
// (buffer: ByteBuffer(string: "\(value)"), topic: topic)
// )
// }
// return stream
// },
// logger: logger,
// publish: { value, topic in
// capturePublishedValues(value, topic)
// },
// shutdown: {
// captureShutdownEvent(true)
// continuation.finish()
// }
// )
// }
// }
struct TopicNotFoundError: Error {}

View File

@@ -1,47 +0,0 @@
import XCTest
import class Foundation.Bundle
//final class dewPoint_controllerTests: XCTestCase {
// func testExample() throws {
// // This is an example of a functional test case.
// // Use XCTAssert and related functions to verify your tests produce the correct
// // results.
//
// // Some of the APIs that we use below are available in macOS 10.13 and above.
// guard #available(macOS 10.13, *) else {
// return
// }
//
// // Mac Catalyst won't have `Process`, but it is supported for executables.
// #if !targetEnvironment(macCatalyst)
//
// let fooBinary = productsDirectory.appendingPathComponent("dewPoint-controller")
//
// let process = Process()
// process.executableURL = fooBinary
//
// let pipe = Pipe()
// process.standardOutput = pipe
//
// try process.run()
// process.waitUntilExit()
//
// let data = pipe.fileHandleForReading.readDataToEndOfFile()
// let output = String(data: data, encoding: .utf8)
//
// XCTAssertEqual(output, "Hello, world!\n")
// #endif
// }
//
// /// Returns path to the built products directory.
// var productsDirectory: URL {
// #if os(macOS)
// for bundle in Bundle.allBundles where bundle.bundlePath.hasSuffix(".xctest") {
// return bundle.bundleURL.deletingLastPathComponent()
// }
// fatalError("couldn't find the products directory")
// #else
// return Bundle.main.bundleURL
// #endif
// }
//}

22
docker-compose.yaml Normal file → Executable file
View File

@@ -1,20 +1,27 @@
# run this with docker-compose -f docker/docker-compose.yml run test
# run this with docker-compose run test
services:
server:
image: swift-mqtt-dewpoint:latest
restart: unless-stopped
env_file: .env
local:
container_name: local-server
build:
context: .
dockerfile: Dockerfile
depends_on:
- mosquitto
environment:
- MQTT_HOST=mosquitto
test:
image: swift:latest
#build:
#context: ./
platform: linux/amd64
build:
context: .
dockerfile: Dockerfile.test
working_dir: /app
networks:
- test
volumes:
- .:/app
depends_on:
- mosquitto-test
environment:
@@ -44,4 +51,3 @@ networks:
test:
driver: bridge
external: false

0
mosquitto/config/mosquitto.conf Normal file → Executable file
View File