Compare commits
18 Commits
0.1.3
...
f68ac528e4
| Author | SHA1 | Date | |
|---|---|---|---|
|
f68ac528e4
|
|||
|
10294801fc
|
|||
|
a65605e9e7
|
|||
|
320a733d12
|
|||
|
936dd0b816
|
|||
|
a87addaf0b
|
|||
|
e2683d3f06
|
|||
|
6c5115dcde
|
|||
|
90c5b7c77f
|
|||
|
79bb162434
|
|||
|
529b9b0bc5
|
|||
|
48d51419d7
|
|||
|
adc7fc1295
|
|||
|
f40c4ef859
|
|||
|
e6d1d4578d
|
|||
|
408e0484cd
|
|||
|
19b2eb42c5
|
|||
|
7122fc818b
|
0
.dockerignore
Normal file → Executable file
0
.dockerignore
Normal file → Executable file
7
.editorconfig
Normal file
7
.editorconfig
Normal file
@@ -0,0 +1,7 @@
|
||||
root = true
|
||||
|
||||
[*.swift]
|
||||
indent_style = space
|
||||
indent_size = 2
|
||||
tab_width = 2
|
||||
trim_trailing_whitespace = true
|
||||
1
.github/workflows/release.yml
vendored
Normal file → Executable file
1
.github/workflows/release.yml
vendored
Normal file → Executable file
@@ -56,7 +56,6 @@ jobs:
|
||||
push: true
|
||||
tags: ${{ steps.meta.outputs.tags }}
|
||||
labels: ${{ steps.meta.outputs.labels }}
|
||||
platforms: linux/amd64,linux/arm64
|
||||
|
||||
# This step generates an artifact attestation for the image, which is an unforgeable statement about where and how it was built. It increases supply chain security for people who consume the image. For more information, see "[AUTOTITLE](/actions/security-guides/using-artifact-attestations-to-establish-provenance-for-builds)."
|
||||
- name: Generate artifact attestation
|
||||
|
||||
1
.gitignore
vendored
Normal file → Executable file
1
.gitignore
vendored
Normal file → Executable file
@@ -9,3 +9,4 @@ DerivedData/
|
||||
.topics
|
||||
mqtt_password.txt
|
||||
.env
|
||||
.smbdelete*
|
||||
|
||||
10
.swiftformat
Normal file
10
.swiftformat
Normal file
@@ -0,0 +1,10 @@
|
||||
--self init-only
|
||||
--indent 2
|
||||
--ifdef indent
|
||||
--trimwhitespace always
|
||||
--wraparguments before-first
|
||||
--wrapparameters before-first
|
||||
--wrapcollections preserve
|
||||
--wrapconditions after-first
|
||||
--typeblanklines preserve
|
||||
--commas inline
|
||||
0
.swiftpm/xcode/xcshareddata/xcschemes/Bootstrap.xcscheme
Normal file → Executable file
0
.swiftpm/xcode/xcshareddata/xcschemes/Bootstrap.xcscheme
Normal file → Executable file
0
.swiftpm/xcode/xcshareddata/xcschemes/Client.xcscheme
Normal file → Executable file
0
.swiftpm/xcode/xcshareddata/xcschemes/Client.xcscheme
Normal file → Executable file
0
.swiftpm/xcode/xcshareddata/xcschemes/ClientLive.xcscheme
Normal file → Executable file
0
.swiftpm/xcode/xcshareddata/xcschemes/ClientLive.xcscheme
Normal file → Executable file
0
.swiftpm/xcode/xcshareddata/xcschemes/DewPointEnvironment.xcscheme
Normal file → Executable file
0
.swiftpm/xcode/xcshareddata/xcschemes/DewPointEnvironment.xcscheme
Normal file → Executable file
0
.swiftpm/xcode/xcshareddata/xcschemes/EnvVars.xcscheme
Normal file → Executable file
0
.swiftpm/xcode/xcshareddata/xcschemes/EnvVars.xcscheme
Normal file → Executable file
0
.swiftpm/xcode/xcshareddata/xcschemes/Models.xcscheme
Normal file → Executable file
0
.swiftpm/xcode/xcshareddata/xcschemes/Models.xcscheme
Normal file → Executable file
34
.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller-Package.xcscheme
Normal file → Executable file
34
.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller-Package.xcscheme
Normal file → Executable file
@@ -146,6 +146,20 @@
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
buildForProfiling = "YES"
|
||||
buildForArchiving = "YES"
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "SensorsService"
|
||||
BuildableName = "SensorsService"
|
||||
BlueprintName = "SensorsService"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
</BuildActionEntries>
|
||||
</BuildAction>
|
||||
<TestAction
|
||||
@@ -174,6 +188,26 @@
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</TestableReference>
|
||||
<TestableReference
|
||||
skipped = "NO">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "MQTTConnectionServiceTests"
|
||||
BuildableName = "MQTTConnectionServiceTests"
|
||||
BlueprintName = "MQTTConnectionServiceTests"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</TestableReference>
|
||||
<TestableReference
|
||||
skipped = "NO">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "SensorsServiceTests"
|
||||
BuildableName = "SensorsServiceTests"
|
||||
BlueprintName = "SensorsServiceTests"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</TestableReference>
|
||||
</Testables>
|
||||
</TestAction>
|
||||
<LaunchAction
|
||||
|
||||
0
.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller.xcscheme
Normal file → Executable file
0
.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller.xcscheme
Normal file → Executable file
0
Bootstrap/dewPoint-env-example
Normal file → Executable file
0
Bootstrap/dewPoint-env-example
Normal file → Executable file
0
Bootstrap/topics-example
Normal file → Executable file
0
Bootstrap/topics-example
Normal file → Executable file
0
Dockerfile
Normal file → Executable file
0
Dockerfile
Normal file → Executable file
6
Dockerfile.test
Normal file
6
Dockerfile.test
Normal file
@@ -0,0 +1,6 @@
|
||||
FROM swift:5.10
|
||||
WORKDIR /app
|
||||
COPY ./Package.* ./
|
||||
RUN swift package resolve
|
||||
COPY . .
|
||||
CMD ["/bin/bash", "-xc", "swift", "test"]
|
||||
10
Makefile
Normal file → Executable file
10
Makefile
Normal file → Executable file
@@ -8,7 +8,10 @@ bootstrap-topics:
|
||||
bootstrap: bootstrap-env bootstrap-topics
|
||||
|
||||
build:
|
||||
@swift build
|
||||
@swift build -Xswiftc -strict-concurrency=complete
|
||||
|
||||
clean:
|
||||
rm -rf .build
|
||||
|
||||
run:
|
||||
@swift run dewPoint-controller
|
||||
@@ -20,5 +23,8 @@ stop-mosquitto:
|
||||
@docker-compose rm -f mosquitto || true
|
||||
|
||||
test-docker:
|
||||
@docker-compose run -i test
|
||||
@docker-compose run --remove-orphans -i --rm test
|
||||
@docker-compose kill mosquitto-test
|
||||
@docker-compose rm -f
|
||||
|
||||
test: test-docker
|
||||
|
||||
189
Package.resolved
Normal file → Executable file
189
Package.resolved
Normal file → Executable file
@@ -1,61 +1,168 @@
|
||||
{
|
||||
"object": {
|
||||
"pins": [
|
||||
"originHash" : "d3104d51323f6bc98cf3ab2930e5a26f72c9a4fdb7640360ba27628672397841",
|
||||
"pins" : [
|
||||
{
|
||||
"package": "mqtt-nio",
|
||||
"repositoryURL": "https://github.com/swift-server-community/mqtt-nio.git",
|
||||
"state": {
|
||||
"branch": null,
|
||||
"revision": "ca8af7a30c4690456ce7de276cd0f037489ba707",
|
||||
"version": "2.5.3"
|
||||
"identity" : "combine-schedulers",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/pointfreeco/combine-schedulers",
|
||||
"state" : {
|
||||
"revision" : "9fa31f4403da54855f1e2aeaeff478f4f0e40b13",
|
||||
"version" : "1.0.2"
|
||||
}
|
||||
},
|
||||
{
|
||||
"package": "swift-log",
|
||||
"repositoryURL": "https://github.com/apple/swift-log.git",
|
||||
"state": {
|
||||
"branch": null,
|
||||
"revision": "5d66f7ba25daf4f94100e7022febf3c75e37a6c7",
|
||||
"version": "1.4.2"
|
||||
"identity" : "mqtt-nio",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/swift-server-community/mqtt-nio.git",
|
||||
"state" : {
|
||||
"revision" : "267b83ab5690d463ff00585a4fd6dc54b698e1d2",
|
||||
"version" : "2.11.0"
|
||||
}
|
||||
},
|
||||
{
|
||||
"package": "swift-nio",
|
||||
"repositoryURL": "https://github.com/apple/swift-nio",
|
||||
"state": {
|
||||
"branch": null,
|
||||
"revision": "6aa9347d9bc5bbfe6a84983aec955c17ffea96ef",
|
||||
"version": "2.33.0"
|
||||
"identity" : "swift-async-algorithms",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-async-algorithms.git",
|
||||
"state" : {
|
||||
"revision" : "5c8bd186f48c16af0775972700626f0b74588278",
|
||||
"version" : "1.0.2"
|
||||
}
|
||||
},
|
||||
{
|
||||
"package": "swift-nio-ssl",
|
||||
"repositoryURL": "https://github.com/apple/swift-nio-ssl.git",
|
||||
"state": {
|
||||
"branch": null,
|
||||
"revision": "b5260a31c2a72a89fa684f5efb3054d8725a2316",
|
||||
"version": "2.18.0"
|
||||
"identity" : "swift-atomics",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-atomics.git",
|
||||
"state" : {
|
||||
"revision" : "cd142fd2f64be2100422d658e7411e39489da985",
|
||||
"version" : "1.2.0"
|
||||
}
|
||||
},
|
||||
{
|
||||
"package": "swift-nio-transport-services",
|
||||
"repositoryURL": "https://github.com/apple/swift-nio-transport-services.git",
|
||||
"state": {
|
||||
"branch": null,
|
||||
"revision": "8ab824b140d0ebcd87e9149266ddc353e3705a3e",
|
||||
"version": "1.11.4"
|
||||
"identity" : "swift-clocks",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/pointfreeco/swift-clocks",
|
||||
"state" : {
|
||||
"revision" : "b9b24b69e2adda099a1fa381cda1eeec272d5b53",
|
||||
"version" : "1.0.5"
|
||||
}
|
||||
},
|
||||
{
|
||||
"package": "swift-psychrometrics",
|
||||
"repositoryURL": "https://github.com/swift-psychrometrics/swift-psychrometrics",
|
||||
"state": {
|
||||
"branch": null,
|
||||
"revision": "03573545c3750b406921eb22a9575c8062beef88",
|
||||
"version": "0.1.2"
|
||||
"identity" : "swift-collections",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-collections.git",
|
||||
"state" : {
|
||||
"revision" : "671108c96644956dddcd89dd59c203dcdb36cec7",
|
||||
"version" : "1.1.4"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"version": 1
|
||||
{
|
||||
"identity" : "swift-concurrency-extras",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/pointfreeco/swift-concurrency-extras",
|
||||
"state" : {
|
||||
"revision" : "6054df64b55186f08b6d0fd87152081b8ad8d613",
|
||||
"version" : "1.2.0"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-dependencies",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/pointfreeco/swift-dependencies",
|
||||
"state" : {
|
||||
"revision" : "0fc0255e780bf742abeef29dec80924f5f0ae7b9",
|
||||
"version" : "1.4.1"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-log",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-log.git",
|
||||
"state" : {
|
||||
"revision" : "9cb486020ebf03bfa5b5df985387a14a98744537",
|
||||
"version" : "1.6.1"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-nio",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-nio",
|
||||
"state" : {
|
||||
"revision" : "914081701062b11e3bb9e21accc379822621995e",
|
||||
"version" : "2.76.1"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-nio-ssl",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-nio-ssl.git",
|
||||
"state" : {
|
||||
"revision" : "b5260a31c2a72a89fa684f5efb3054d8725a2316",
|
||||
"version" : "2.18.0"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-nio-transport-services",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-nio-transport-services.git",
|
||||
"state" : {
|
||||
"revision" : "8ab824b140d0ebcd87e9149266ddc353e3705a3e",
|
||||
"version" : "1.11.4"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-psychrometrics",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/swift-psychrometrics/swift-psychrometrics",
|
||||
"state" : {
|
||||
"revision" : "6a457f3cefd9477f7aa76b2fb8ad557988c447bd",
|
||||
"version" : "0.2.3"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-service-lifecycle",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/swift-server/swift-service-lifecycle.git",
|
||||
"state" : {
|
||||
"revision" : "f70b838872863396a25694d8b19fe58bcd0b7903",
|
||||
"version" : "2.6.2"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-syntax",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/swiftlang/swift-syntax",
|
||||
"state" : {
|
||||
"revision" : "0687f71944021d616d34d922343dcef086855920",
|
||||
"version" : "600.0.1"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-system",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-system.git",
|
||||
"state" : {
|
||||
"revision" : "c8a44d836fe7913603e246acab7c528c2e780168",
|
||||
"version" : "1.4.0"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-tagged",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/pointfreeco/swift-tagged",
|
||||
"state" : {
|
||||
"revision" : "3907a9438f5b57d317001dc99f3f11b46882272b",
|
||||
"version" : "0.10.0"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "xctest-dynamic-overlay",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/pointfreeco/xctest-dynamic-overlay",
|
||||
"state" : {
|
||||
"revision" : "770f990d3e4eececb57ac04a6076e22f8c97daeb",
|
||||
"version" : "1.4.2"
|
||||
}
|
||||
}
|
||||
],
|
||||
"version" : 3
|
||||
}
|
||||
|
||||
103
Package.swift
Normal file → Executable file
103
Package.swift
Normal file → Executable file
@@ -2,99 +2,88 @@
|
||||
|
||||
import PackageDescription
|
||||
|
||||
let swiftSettings: [SwiftSetting] = [
|
||||
.enableExperimentalFeature("StrictConcurrency"),
|
||||
.enableUpcomingFeature("InferSendableCaptures")
|
||||
]
|
||||
|
||||
let package = Package(
|
||||
name: "dewPoint-controller",
|
||||
platforms: [
|
||||
.macOS(.v12)
|
||||
.macOS(.v14)
|
||||
],
|
||||
products: [
|
||||
.executable(name: "dewPoint-controller", targets: ["dewPoint-controller"]),
|
||||
.library(name: "Bootstrap", targets: ["Bootstrap"]),
|
||||
.library(name: "DewPointEnvironment", targets: ["DewPointEnvironment"]),
|
||||
.library(name: "EnvVars", targets: ["EnvVars"]),
|
||||
.library(name: "Models", targets: ["Models"]),
|
||||
.library(name: "Client", targets: ["Client"]),
|
||||
.library(name: "ClientLive", targets: ["ClientLive"]),
|
||||
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
|
||||
.library(name: "SensorsService", targets: ["SensorsService"])
|
||||
],
|
||||
dependencies: [
|
||||
.package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"),
|
||||
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
|
||||
.package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", from: "0.1.0")
|
||||
.package(url: "https://github.com/apple/swift-log", from: "1.6.0"),
|
||||
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.4.1"),
|
||||
.package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", exact: "0.2.3"),
|
||||
.package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"),
|
||||
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.3.0")
|
||||
],
|
||||
targets: [
|
||||
.executableTarget(
|
||||
name: "dewPoint-controller",
|
||||
dependencies: [
|
||||
"Bootstrap",
|
||||
"ClientLive",
|
||||
"TopicsLive",
|
||||
"Models",
|
||||
"MQTTConnectionService",
|
||||
"SensorsService",
|
||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
||||
.product(name: "NIO", package: "swift-nio")
|
||||
.product(name: "NIO", package: "swift-nio"),
|
||||
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
|
||||
]
|
||||
),
|
||||
.testTarget(
|
||||
name: "dewPoint-controllerTests",
|
||||
dependencies: ["dewPoint-controller"]
|
||||
),
|
||||
.target(
|
||||
name: "Bootstrap",
|
||||
dependencies: [
|
||||
"DewPointEnvironment",
|
||||
"EnvVars",
|
||||
"ClientLive",
|
||||
"Models",
|
||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
||||
.product(name: "NIO", package: "swift-nio")
|
||||
]
|
||||
),
|
||||
.target(
|
||||
name: "DewPointEnvironment",
|
||||
dependencies: [
|
||||
"EnvVars",
|
||||
"Client",
|
||||
"Models",
|
||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
||||
]
|
||||
),
|
||||
.target(
|
||||
name: "EnvVars",
|
||||
dependencies: []
|
||||
),
|
||||
.target(
|
||||
name: "Models",
|
||||
dependencies: [
|
||||
.product(name: "Psychrometrics", package: "swift-psychrometrics"),
|
||||
]
|
||||
.product(name: "Logging", package: "swift-log"),
|
||||
.product(name: "PsychrometricClient", package: "swift-psychrometrics")
|
||||
],
|
||||
swiftSettings: swiftSettings
|
||||
),
|
||||
.target(
|
||||
name: "Client",
|
||||
name: "MQTTConnectionService",
|
||||
dependencies: [
|
||||
"Models",
|
||||
.product(name: "CoreUnitTypes", package: "swift-psychrometrics"),
|
||||
.product(name: "NIO", package: "swift-nio"),
|
||||
.product(name: "Psychrometrics", package: "swift-psychrometrics")
|
||||
]
|
||||
),
|
||||
.target(
|
||||
name: "ClientLive",
|
||||
dependencies: [
|
||||
"Client",
|
||||
"EnvVars",
|
||||
.product(name: "MQTTNIO", package: "mqtt-nio")
|
||||
]
|
||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
||||
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
|
||||
],
|
||||
swiftSettings: swiftSettings
|
||||
),
|
||||
.testTarget(
|
||||
name: "ClientTests",
|
||||
name: "MQTTConnectionServiceTests",
|
||||
dependencies: [
|
||||
"Client",
|
||||
"ClientLive"
|
||||
"MQTTConnectionService",
|
||||
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
|
||||
]
|
||||
),
|
||||
.target(
|
||||
name: "TopicsLive",
|
||||
name: "SensorsService",
|
||||
dependencies: [
|
||||
"Models"
|
||||
]
|
||||
"Models",
|
||||
"MQTTConnectionService",
|
||||
.product(name: "Dependencies", package: "swift-dependencies"),
|
||||
.product(name: "DependenciesMacros", package: "swift-dependencies"),
|
||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
||||
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
|
||||
],
|
||||
swiftSettings: swiftSettings
|
||||
),
|
||||
.testTarget(
|
||||
name: "SensorsServiceTests",
|
||||
dependencies: [
|
||||
"SensorsService",
|
||||
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
|
||||
]
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
@@ -1,167 +0,0 @@
|
||||
import ClientLive
|
||||
import DewPointEnvironment
|
||||
import EnvVars
|
||||
import Logging
|
||||
import Foundation
|
||||
import Models
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
|
||||
/// Sets up the application environment and connections required.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - eventLoopGroup: The event loop group for the application.
|
||||
/// - logger: An optional logger for debugging.
|
||||
/// - autoConnect: A flag whether to auto-connect to the MQTT broker or not.
|
||||
public func bootstrap(
|
||||
eventLoopGroup: EventLoopGroup,
|
||||
logger: Logger? = nil,
|
||||
autoConnect: Bool = true
|
||||
) -> EventLoopFuture<DewPointEnvironment> {
|
||||
|
||||
logger?.debug("Bootstrapping Dew Point Controller...")
|
||||
|
||||
return loadEnvVars(eventLoopGroup: eventLoopGroup, logger: logger)
|
||||
.and(loadTopics(eventLoopGroup: eventLoopGroup, logger: logger))
|
||||
.makeDewPointEnvironment(eventLoopGroup: eventLoopGroup, logger: logger)
|
||||
.connectToMQTTBroker(autoConnect: autoConnect, logger: logger)
|
||||
}
|
||||
|
||||
/// Loads the ``EnvVars`` either using the defualts, from a file in the root directory under `.dewPoint-env` or in the shell / application environment.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - eventLoopGroup: The event loop group for the application.
|
||||
/// - logger: An optional logger for debugging.
|
||||
private func loadEnvVars(
|
||||
eventLoopGroup: EventLoopGroup,
|
||||
logger: Logger?
|
||||
) -> EventLoopFuture<EnvVars> {
|
||||
|
||||
logger?.debug("Loading env vars...")
|
||||
|
||||
// TODO: Need to have the env file path passed in / dynamic.
|
||||
let envFilePath = URL(fileURLWithPath: #file)
|
||||
.deletingLastPathComponent()
|
||||
.deletingLastPathComponent()
|
||||
.deletingLastPathComponent()
|
||||
.appendingPathComponent(".dewPoint-env")
|
||||
|
||||
let decoder = JSONDecoder()
|
||||
let encoder = JSONEncoder()
|
||||
|
||||
let defaultEnvVars = EnvVars()
|
||||
|
||||
let defaultEnvDict = (try? encoder.encode(defaultEnvVars))
|
||||
.flatMap { try? decoder.decode([String: String].self, from: $0) }
|
||||
?? [:]
|
||||
|
||||
// Read from file `.dewPoint-env` file if it exists.
|
||||
let localEnvVarsDict = (try? Data(contentsOf: envFilePath))
|
||||
.flatMap { try? decoder.decode([String: String].self, from: $0) }
|
||||
?? [:]
|
||||
|
||||
// Merge with variables in the shell environment.
|
||||
let envVarsDict = defaultEnvDict
|
||||
.merging(localEnvVarsDict, uniquingKeysWith: { $1 })
|
||||
.merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 })
|
||||
|
||||
// Produces the final env vars from the merged items or uses defaults if something
|
||||
// went wrong.
|
||||
let envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict))
|
||||
.flatMap { try? decoder.decode(EnvVars.self, from: $0) }
|
||||
?? defaultEnvVars
|
||||
|
||||
logger?.debug("Done loading env vars...")
|
||||
return eventLoopGroup.next().makeSucceededFuture(envVars)
|
||||
}
|
||||
|
||||
// MARK: TODO perhaps make loading from file an option passed in when app is launched.
|
||||
/// Load the topics from file in application root directory at `.topics`, if available or fall back to the defualt.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - eventLoopGroup: The event loop group for the application.
|
||||
/// - logger: An optional logger for debugging.
|
||||
private func loadTopics(eventLoopGroup: EventLoopGroup, logger: Logger?) -> EventLoopFuture<Topics> {
|
||||
|
||||
logger?.debug("Loading topics from file...")
|
||||
|
||||
let topicsFilePath = URL(fileURLWithPath: #file)
|
||||
.deletingLastPathComponent()
|
||||
.deletingLastPathComponent()
|
||||
.deletingLastPathComponent()
|
||||
.appendingPathComponent(".topics")
|
||||
|
||||
let decoder = JSONDecoder()
|
||||
|
||||
// Attempt to load the topics from file in root directory.
|
||||
let localTopics = (try? Data.init(contentsOf: topicsFilePath))
|
||||
.flatMap { try? decoder.decode(Topics.self, from: $0) }
|
||||
|
||||
logger?.debug(
|
||||
localTopics == nil
|
||||
? "Failed to load topics from file, falling back to defaults."
|
||||
: "Done loading topics from file."
|
||||
)
|
||||
|
||||
// If we were able to load from file use that, else fallback to the defaults.
|
||||
return eventLoopGroup.next().makeSucceededFuture(localTopics ?? .init())
|
||||
}
|
||||
|
||||
extension EventLoopFuture where Value == (EnvVars, Topics) {
|
||||
|
||||
/// Creates the ``DewPointEnvironment`` for the application after the ``EnvVars`` have been loaded.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - eventLoopGroup: The event loop group for the application.
|
||||
/// - logger: An optional logger for the application.
|
||||
fileprivate func makeDewPointEnvironment(
|
||||
eventLoopGroup: EventLoopGroup,
|
||||
logger: Logger?
|
||||
) -> EventLoopFuture<DewPointEnvironment> {
|
||||
map { envVars, topics in
|
||||
let mqttClient = MQTTClient(envVars: envVars, eventLoopGroup: eventLoopGroup, logger: logger)
|
||||
return DewPointEnvironment.init(
|
||||
envVars: envVars,
|
||||
mqttClient: mqttClient,
|
||||
topics: topics
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension EventLoopFuture where Value == DewPointEnvironment {
|
||||
|
||||
/// Connects to the MQTT broker after the ``DewPointEnvironment`` has been setup.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - logger: An optional logger for debugging.
|
||||
fileprivate func connectToMQTTBroker(autoConnect: Bool, logger: Logger?) -> EventLoopFuture<DewPointEnvironment> {
|
||||
guard autoConnect else { return self }
|
||||
return flatMap { environment in
|
||||
logger?.debug("Connecting to MQTT Broker...")
|
||||
return environment.mqttClient.connect()
|
||||
.map { _ in
|
||||
logger?.debug("Successfully connected to MQTT Broker...")
|
||||
return environment
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension MQTTNIO.MQTTClient {
|
||||
|
||||
fileprivate convenience init(envVars: EnvVars, eventLoopGroup: EventLoopGroup, logger: Logger?) {
|
||||
self.init(
|
||||
host: envVars.host,
|
||||
port: envVars.port != nil ? Int(envVars.port!) : nil,
|
||||
identifier: envVars.identifier,
|
||||
eventLoopGroupProvider: .shared(eventLoopGroup),
|
||||
logger: logger,
|
||||
configuration: .init(
|
||||
version: .v5_0,
|
||||
userName: envVars.userName,
|
||||
password: envVars.password
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -1,44 +0,0 @@
|
||||
import CoreUnitTypes
|
||||
import Logging
|
||||
import Foundation
|
||||
import Models
|
||||
import NIO
|
||||
import Psychrometrics
|
||||
|
||||
public struct Client {
|
||||
|
||||
/// Add the publish listeners to the MQTT Broker, to be notified of published changes.
|
||||
public var addListeners: () -> Void
|
||||
|
||||
/// Connect to the MQTT Broker.
|
||||
public var connect: () -> EventLoopFuture<Void>
|
||||
|
||||
public var publishSensor: (SensorPublishRequest) -> EventLoopFuture<Void>
|
||||
|
||||
/// Subscribe to appropriate topics / events.
|
||||
public var subscribe: () -> EventLoopFuture<Void>
|
||||
|
||||
/// Disconnect and close the connection to the MQTT Broker.
|
||||
public var shutdown: () -> EventLoopFuture<Void>
|
||||
|
||||
public init(
|
||||
addListeners: @escaping () -> Void,
|
||||
connect: @escaping () -> EventLoopFuture<Void>,
|
||||
publishSensor: @escaping (SensorPublishRequest) -> EventLoopFuture<Void>,
|
||||
shutdown: @escaping () -> EventLoopFuture<Void>,
|
||||
subscribe: @escaping () -> EventLoopFuture<Void>
|
||||
) {
|
||||
self.addListeners = addListeners
|
||||
self.connect = connect
|
||||
self.publishSensor = publishSensor
|
||||
self.shutdown = shutdown
|
||||
self.subscribe = subscribe
|
||||
}
|
||||
|
||||
public enum SensorPublishRequest {
|
||||
case mixed(State.Sensors.TemperatureHumiditySensor<State.Sensors.Mixed>)
|
||||
case postCoil(State.Sensors.TemperatureHumiditySensor<State.Sensors.PostCoil>)
|
||||
case `return`(State.Sensors.TemperatureHumiditySensor<State.Sensors.Return>)
|
||||
case supply(State.Sensors.TemperatureHumiditySensor<State.Sensors.Supply>)
|
||||
}
|
||||
}
|
||||
@@ -1,261 +0,0 @@
|
||||
import CoreUnitTypes
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import NIOFoundationCompat
|
||||
import Psychrometrics
|
||||
|
||||
/// Represents a type that can be initialized by a ``ByteBuffer``.
|
||||
protocol BufferInitalizable {
|
||||
init?(buffer: inout ByteBuffer)
|
||||
}
|
||||
|
||||
extension Double: BufferInitalizable {
|
||||
|
||||
/// Attempt to create / parse a double from a byte buffer.
|
||||
init?(buffer: inout ByteBuffer) {
|
||||
guard let string = buffer.readString(
|
||||
length: buffer.readableBytes,
|
||||
encoding: String.Encoding.utf8
|
||||
)
|
||||
else { return nil }
|
||||
self.init(string)
|
||||
}
|
||||
}
|
||||
|
||||
extension Temperature: BufferInitalizable {
|
||||
/// Attempt to create / parse a temperature from a byte buffer.
|
||||
init?(buffer: inout ByteBuffer) {
|
||||
guard let value = Double(buffer: &buffer) else { return nil }
|
||||
self.init(value, units: .celsius)
|
||||
}
|
||||
}
|
||||
|
||||
extension RelativeHumidity: BufferInitalizable {
|
||||
/// Attempt to create / parse a relative humidity from a byte buffer.
|
||||
init?(buffer: inout ByteBuffer) {
|
||||
guard let value = Double(buffer: &buffer) else { return nil }
|
||||
self.init(value)
|
||||
}
|
||||
}
|
||||
|
||||
extension MQTTNIO.MQTTClient {
|
||||
/// Logs a failure for a given topic and error.
|
||||
func logFailure(topic: String, error: Error) {
|
||||
logger.error("\(topic): \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
extension Result where Success == MQTTPublishInfo {
|
||||
func logIfFailure(client: MQTTNIO.MQTTClient, topic: String) -> ByteBuffer? {
|
||||
switch self {
|
||||
case let .success(value):
|
||||
guard value.topicName == topic else { return nil }
|
||||
return value.payload
|
||||
case let .failure(error):
|
||||
client.logFailure(topic: topic, error: error)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension Optional where Wrapped == ByteBuffer {
|
||||
|
||||
func parse<T>(as type: T.Type) -> T? where T: BufferInitalizable {
|
||||
switch self {
|
||||
case var .some(buffer):
|
||||
return T.init(buffer: &buffer)
|
||||
case .none:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fileprivate struct TemperatureAndHumiditySensorKeyPathEnvelope {
|
||||
|
||||
let humidityTopic: KeyPath<Topics.Sensors, String>
|
||||
let temperatureTopic: KeyPath<Topics.Sensors, String>
|
||||
let temperatureState: WritableKeyPath<State.Sensors, Temperature?>
|
||||
let humidityState: WritableKeyPath<State.Sensors, RelativeHumidity?>
|
||||
|
||||
func addListener(to client: MQTTNIO.MQTTClient, topics: Topics, state: State) {
|
||||
|
||||
let temperatureTopic = topics.sensors[keyPath: temperatureTopic]
|
||||
client.logger.trace("Adding listener for topic: \(temperatureTopic)")
|
||||
client.addPublishListener(named: temperatureTopic) { result in
|
||||
result.logIfFailure(client: client, topic: temperatureTopic)
|
||||
.parse(as: Temperature.self)
|
||||
.map { temperature in
|
||||
state.sensors[keyPath: temperatureState] = temperature
|
||||
}
|
||||
}
|
||||
|
||||
let humidityTopic = topics.sensors[keyPath: humidityTopic]
|
||||
client.logger.trace("Adding listener for topic: \(humidityTopic)")
|
||||
client.addPublishListener(named: humidityTopic) { result in
|
||||
result.logIfFailure(client: client, topic: humidityTopic)
|
||||
.parse(as: RelativeHumidity.self)
|
||||
.map { humidity in
|
||||
state.sensors[keyPath: humidityState] = humidity
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension Array where Element == TemperatureAndHumiditySensorKeyPathEnvelope {
|
||||
func addListeners(to client: MQTTNIO.MQTTClient, topics: Topics, state: State) {
|
||||
_ = self.map { envelope in
|
||||
envelope.addListener(to: client, topics: topics, state: state)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension Array where Element == MQTTSubscribeInfo {
|
||||
static func sensors(topics: Topics) -> Self {
|
||||
[
|
||||
.init(topicFilter: topics.sensors.mixedAirSensor.temperature, qos: .atLeastOnce),
|
||||
.init(topicFilter: topics.sensors.mixedAirSensor.humidity, qos: .atLeastOnce),
|
||||
.init(topicFilter: topics.sensors.postCoilSensor.temperature, qos: .atLeastOnce),
|
||||
.init(topicFilter: topics.sensors.postCoilSensor.humidity, qos: .atLeastOnce),
|
||||
.init(topicFilter: topics.sensors.returnAirSensor.temperature, qos: .atLeastOnce),
|
||||
.init(topicFilter: topics.sensors.returnAirSensor.humidity, qos: .atLeastOnce),
|
||||
.init(topicFilter: topics.sensors.supplyAirSensor.temperature, qos: .atLeastOnce),
|
||||
.init(topicFilter: topics.sensors.supplyAirSensor.humidity, qos: .atLeastOnce),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
extension State {
|
||||
func addSensorListeners(to client: MQTTNIO.MQTTClient, topics: Topics) {
|
||||
let envelopes: [TemperatureAndHumiditySensorKeyPathEnvelope] = [
|
||||
.init(
|
||||
humidityTopic: \.mixedAirSensor.humidity,
|
||||
temperatureTopic: \.mixedAirSensor.temperature,
|
||||
temperatureState: \.mixedAirSensor.temperature,
|
||||
humidityState: \.mixedAirSensor.humidity
|
||||
),
|
||||
.init(
|
||||
humidityTopic: \.postCoilSensor.humidity,
|
||||
temperatureTopic: \.postCoilSensor.temperature,
|
||||
temperatureState: \.postCoilSensor.temperature,
|
||||
humidityState: \.postCoilSensor.humidity
|
||||
),
|
||||
.init(
|
||||
humidityTopic: \.returnAirSensor.humidity,
|
||||
temperatureTopic: \.returnAirSensor.temperature,
|
||||
temperatureState: \.returnAirSensor.temperature,
|
||||
humidityState: \.returnAirSensor.humidity
|
||||
),
|
||||
.init(
|
||||
humidityTopic: \.supplyAirSensor.humidity,
|
||||
temperatureTopic: \.supplyAirSensor.temperature,
|
||||
temperatureState: \.supplyAirSensor.temperature,
|
||||
humidityState: \.supplyAirSensor.humidity
|
||||
),
|
||||
]
|
||||
envelopes.addListeners(to: client, topics: topics, state: self)
|
||||
}
|
||||
}
|
||||
|
||||
extension Client.SensorPublishRequest {
|
||||
|
||||
func dewPointData(topics: Topics, units: PsychrometricEnvironment.Units?) -> (DewPoint, String)? {
|
||||
switch self {
|
||||
case let .mixed(sensor):
|
||||
guard let dp = sensor.dewPoint(units: units) else { return nil }
|
||||
return (dp, topics.sensors.mixedAirSensor.dewPoint)
|
||||
case let .postCoil(sensor):
|
||||
guard let dp = sensor.dewPoint(units: units) else { return nil }
|
||||
return (dp, topics.sensors.postCoilSensor.dewPoint)
|
||||
case let .return(sensor):
|
||||
guard let dp = sensor.dewPoint(units: units) else { return nil }
|
||||
return (dp, topics.sensors.returnAirSensor.dewPoint)
|
||||
case let .supply(sensor):
|
||||
guard let dp = sensor.dewPoint(units: units) else { return nil }
|
||||
return (dp, topics.sensors.supplyAirSensor.dewPoint)
|
||||
}
|
||||
}
|
||||
|
||||
func enthalpyData(altitude: Length, topics: Topics, units: PsychrometricEnvironment.Units?) -> (EnthalpyOf<MoistAir>, String)? {
|
||||
switch self {
|
||||
case let .mixed(sensor):
|
||||
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
|
||||
return (enthalpy, topics.sensors.mixedAirSensor.enthalpy)
|
||||
case let .postCoil(sensor):
|
||||
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
|
||||
return (enthalpy, topics.sensors.postCoilSensor.enthalpy)
|
||||
case let .return(sensor):
|
||||
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
|
||||
return (enthalpy, topics.sensors.returnAirSensor.enthalpy)
|
||||
case let .supply(sensor):
|
||||
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
|
||||
return (enthalpy, topics.sensors.supplyAirSensor.enthalpy)
|
||||
}
|
||||
}
|
||||
|
||||
func setHasProcessed(state: State) {
|
||||
switch self {
|
||||
case .mixed:
|
||||
state.sensors.mixedAirSensor.needsProcessed = false
|
||||
case .postCoil:
|
||||
state.sensors.postCoilSensor.needsProcessed = false
|
||||
case .return:
|
||||
state.sensors.returnAirSensor.needsProcessed = false
|
||||
case .supply:
|
||||
state.sensors.supplyAirSensor.needsProcessed = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension MQTTNIO.MQTTClient {
|
||||
|
||||
func publishDewPoint(
|
||||
request: Client.SensorPublishRequest,
|
||||
state: State,
|
||||
topics: Topics
|
||||
) -> EventLoopFuture<(MQTTNIO.MQTTClient, Client.SensorPublishRequest, State, Topics)> {
|
||||
guard let (dewPoint, topic) = request.dewPointData(topics: topics, units: state.units)
|
||||
else {
|
||||
logger.trace("No dew point for sensor.")
|
||||
return eventLoopGroup.next().makeSucceededFuture((self, request, state, topics))
|
||||
}
|
||||
let roundedDewPoint = round(dewPoint.rawValue * 100) / 100
|
||||
logger.debug("Publishing dew-point: \(dewPoint), to: \(topic)")
|
||||
return publish(
|
||||
to: topic,
|
||||
payload: ByteBufferAllocator().buffer(string: "\(roundedDewPoint)"),
|
||||
qos: .atLeastOnce,
|
||||
retain: true
|
||||
)
|
||||
.map { (self, request, state, topics) }
|
||||
}
|
||||
}
|
||||
|
||||
extension EventLoopFuture where Value == (Client.SensorPublishRequest, State) {
|
||||
func setHasProcessed() -> EventLoopFuture<Void> {
|
||||
map { request, state in
|
||||
request.setHasProcessed(state: state)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension EventLoopFuture where Value == (MQTTNIO.MQTTClient, Client.SensorPublishRequest, State, Topics) {
|
||||
func publishEnthalpy() -> EventLoopFuture<(Client.SensorPublishRequest, State)> {
|
||||
flatMap { client, request, state, topics in
|
||||
guard let (enthalpy, topic) = request.enthalpyData(altitude: state.altitude, topics: topics, units: state.units)
|
||||
else {
|
||||
client.logger.trace("No enthalpy for sensor.")
|
||||
return client.eventLoopGroup.next().makeSucceededFuture((request, state))
|
||||
}
|
||||
let roundedEnthalpy = round(enthalpy.rawValue * 100) / 100
|
||||
client.logger.debug("Publishing enthalpy: \(enthalpy), to: \(topic)")
|
||||
return client.publish(
|
||||
to: topic,
|
||||
payload: ByteBufferAllocator().buffer(string: "\(roundedEnthalpy)"),
|
||||
qos: .atLeastOnce
|
||||
)
|
||||
.map { (request, state) }
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,158 +0,0 @@
|
||||
import Foundation
|
||||
@_exported import Client
|
||||
import CoreUnitTypes
|
||||
import Models
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import Psychrometrics
|
||||
|
||||
extension Client {
|
||||
|
||||
// The state passed in here needs to be a class or we get escaping errors in the `addListeners` method.
|
||||
public static func live(
|
||||
client: MQTTNIO.MQTTClient,
|
||||
state: State,
|
||||
topics: Topics
|
||||
) -> Self {
|
||||
.init(
|
||||
addListeners: {
|
||||
state.addSensorListeners(to: client, topics: topics)
|
||||
},
|
||||
connect: {
|
||||
client.connect()
|
||||
.map { _ in }
|
||||
},
|
||||
publishSensor: { request in
|
||||
client.publishDewPoint(request: request, state: state, topics: topics)
|
||||
.publishEnthalpy()
|
||||
.setHasProcessed()
|
||||
},
|
||||
shutdown: {
|
||||
client.disconnect()
|
||||
.map { try? client.syncShutdownGracefully() }
|
||||
},
|
||||
subscribe: {
|
||||
// Sensor subscriptions
|
||||
client.subscribe(to: .sensors(topics: topics))
|
||||
.map { _ in }
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
import Logging
|
||||
import NIOTransportServices
|
||||
import EnvVars
|
||||
|
||||
public class AsyncClient {
|
||||
//public static let eventLoopGroup = NIOTSEventLoopGroup()
|
||||
public static let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
public let client: MQTTClient
|
||||
public private(set) var shuttingDown: Bool
|
||||
|
||||
var logger: Logger { client.logger }
|
||||
|
||||
public init(envVars: EnvVars, logger: Logger) {
|
||||
let config = MQTTClient.Configuration.init(
|
||||
version: .v3_1_1,
|
||||
userName: envVars.userName,
|
||||
password: envVars.password,
|
||||
useSSL: false,
|
||||
useWebSockets: false,
|
||||
tlsConfiguration: nil,
|
||||
webSocketURLPath: nil
|
||||
)
|
||||
self.client = .init(
|
||||
host: envVars.host,
|
||||
identifier: envVars.identifier,
|
||||
eventLoopGroupProvider: .shared(Self.eventLoopGroup),
|
||||
logger: logger,
|
||||
configuration: config
|
||||
)
|
||||
self.shuttingDown = false
|
||||
}
|
||||
|
||||
public func connect() async {
|
||||
do {
|
||||
try await self.client.connect()
|
||||
self.client.addCloseListener(named: "AsyncClient") { [self] result in
|
||||
guard !self.shuttingDown else { return }
|
||||
Task {
|
||||
self.logger.debug("Connection closed.")
|
||||
self.logger.debug("Reconnecting...")
|
||||
await self.connect()
|
||||
}
|
||||
}
|
||||
logger.debug("Connection successful.")
|
||||
} catch {
|
||||
logger.trace("Connection Failed.\n\(error)")
|
||||
}
|
||||
}
|
||||
|
||||
public func shutdown() async {
|
||||
self.shuttingDown = true
|
||||
try? await self.client.disconnect()
|
||||
try? await self.client.shutdown()
|
||||
}
|
||||
|
||||
func addSensorListeners() async {
|
||||
|
||||
}
|
||||
|
||||
// Need to save the recieved values somewhere.
|
||||
func addPublishListener<T>(
|
||||
topic: String,
|
||||
decoding: T.Type
|
||||
) async throws where T: BufferInitalizable {
|
||||
_ = try await self.client.subscribe(to: [.init(topicFilter: topic, qos: .atLeastOnce)])
|
||||
Task {
|
||||
let listener = self.client.createPublishListener()
|
||||
for await result in listener {
|
||||
switch result {
|
||||
case let .success(packet):
|
||||
var buffer = packet.payload
|
||||
guard let value = T.init(buffer: &buffer) else {
|
||||
logger.debug("Could not decode buffer: \(buffer)")
|
||||
return
|
||||
}
|
||||
logger.debug("Recieved value: \(value)")
|
||||
case let .failure(error):
|
||||
logger.trace("Error:\n\(error)")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private func publish(string: String, to topic: String) async throws {
|
||||
try await self.client.publish(
|
||||
to: topic,
|
||||
payload: ByteBufferAllocator().buffer(string: string),
|
||||
qos: .atLeastOnce
|
||||
)
|
||||
}
|
||||
|
||||
private func publish(double: Double, to topic: String) async throws {
|
||||
let rounded = round(double * 100) / 100
|
||||
try await publish(string: "\(rounded)", to: topic)
|
||||
}
|
||||
|
||||
func publishDewPoint(_ request: Client.SensorPublishRequest) async throws {
|
||||
// fix
|
||||
guard let (dewPoint, topic) = request.dewPointData(topics: .init(), units: nil) else { return }
|
||||
try await self.publish(double: dewPoint.rawValue, to: topic)
|
||||
logger.debug("Published dewpoint: \(dewPoint.rawValue), to: \(topic)")
|
||||
}
|
||||
|
||||
func publishEnthalpy(_ request: Client.SensorPublishRequest) async throws {
|
||||
// fix
|
||||
guard let (enthalpy, topic) = request.enthalpyData(altitude: .seaLevel, topics: .init(), units: nil) else { return }
|
||||
try await self.publish(double: enthalpy.rawValue, to: topic)
|
||||
logger.debug("Publihsed enthalpy: \(enthalpy.rawValue), to: \(topic)")
|
||||
}
|
||||
|
||||
public func publishSensor(_ request: Client.SensorPublishRequest) async throws {
|
||||
try await publishDewPoint(request)
|
||||
try await publishEnthalpy(request)
|
||||
}
|
||||
}
|
||||
@@ -1,21 +0,0 @@
|
||||
import Client
|
||||
import EnvVars
|
||||
import Models
|
||||
import MQTTNIO
|
||||
|
||||
public struct DewPointEnvironment {
|
||||
|
||||
public var envVars: EnvVars
|
||||
public var mqttClient: MQTTNIO.MQTTClient
|
||||
public var topics: Topics
|
||||
|
||||
public init(
|
||||
envVars: EnvVars,
|
||||
mqttClient: MQTTNIO.MQTTClient,
|
||||
topics: Topics = .init()
|
||||
) {
|
||||
self.envVars = envVars
|
||||
self.mqttClient = mqttClient
|
||||
self.topics = topics
|
||||
}
|
||||
}
|
||||
151
Sources/MQTTConnectionService/MQTTConnectionService.swift
Normal file
151
Sources/MQTTConnectionService/MQTTConnectionService.swift
Normal file
@@ -0,0 +1,151 @@
|
||||
@preconcurrency import Foundation
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import ServiceLifecycle
|
||||
|
||||
// TODO: This may not need to be an actor.
|
||||
|
||||
/// Manages the MQTT broker connection.
|
||||
public actor MQTTConnectionService: Service {
|
||||
|
||||
private let cleanSession: Bool
|
||||
public let client: MQTTClient
|
||||
private let continuation: AsyncStream<Event>.Continuation
|
||||
public nonisolated let events: AsyncStream<Event>
|
||||
private let internalEventStream: ConnectionStream
|
||||
nonisolated var logger: Logger { client.logger }
|
||||
// private var shuttingDown = false
|
||||
|
||||
public init(
|
||||
cleanSession: Bool = true,
|
||||
client: MQTTClient
|
||||
) {
|
||||
self.cleanSession = cleanSession
|
||||
self.client = client
|
||||
self.internalEventStream = .init()
|
||||
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
|
||||
self.events = stream
|
||||
self.continuation = continuation
|
||||
}
|
||||
|
||||
deinit {
|
||||
self.logger.debug("MQTTConnectionService is gone.")
|
||||
self.internalEventStream.stop()
|
||||
continuation.finish()
|
||||
}
|
||||
|
||||
/// The entry-point of the service.
|
||||
///
|
||||
/// This method connects to the MQTT broker and manages the connection.
|
||||
/// It will attempt to gracefully shutdown the connection upon receiving
|
||||
/// `sigterm` signals.
|
||||
public func run() async throws {
|
||||
await withGracefulShutdownHandler {
|
||||
await withDiscardingTaskGroup { group in
|
||||
group.addTask { await self.connect() }
|
||||
group.addTask {
|
||||
await self.internalEventStream.start { self.client.isActive() }
|
||||
}
|
||||
for await event in self.internalEventStream.events.cancelOnGracefulShutdown() {
|
||||
if event == .shuttingDown {
|
||||
self.shutdown()
|
||||
break
|
||||
}
|
||||
self.logger.trace("Sending connection event: \(event)")
|
||||
self.continuation.yield(event)
|
||||
}
|
||||
group.cancelAll()
|
||||
}
|
||||
} onGracefulShutdown: {
|
||||
self.logger.trace("Received graceful shutdown.")
|
||||
self.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
func connect() async {
|
||||
do {
|
||||
try await withThrowingDiscardingTaskGroup { group in
|
||||
group.addTask {
|
||||
try await self.client.connect(cleanSession: self.cleanSession)
|
||||
}
|
||||
client.addCloseListener(named: "\(Self.self)") { _ in
|
||||
Task {
|
||||
self.logger.debug("Connection closed.")
|
||||
self.logger.debug("Reconnecting...")
|
||||
await self.connect()
|
||||
}
|
||||
}
|
||||
self.logger.debug("Connection successful.")
|
||||
self.continuation.yield(.connected)
|
||||
}
|
||||
} catch {
|
||||
logger.trace("Failed to connect: \(error)")
|
||||
continuation.yield(.disconnected)
|
||||
}
|
||||
}
|
||||
|
||||
private nonisolated func shutdown() {
|
||||
logger.debug("Begin shutting down MQTT broker connection.")
|
||||
client.removeCloseListener(named: "\(Self.self)")
|
||||
internalEventStream.stop()
|
||||
_ = client.disconnect()
|
||||
try? client.syncShutdownGracefully()
|
||||
continuation.finish()
|
||||
logger.info("MQTT broker connection closed.")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
extension MQTTConnectionService {
|
||||
|
||||
public enum Event: Sendable {
|
||||
case connected
|
||||
case disconnected
|
||||
case shuttingDown
|
||||
}
|
||||
|
||||
// TODO: This functionality can probably move into the connection service.
|
||||
|
||||
private final class ConnectionStream: Sendable {
|
||||
|
||||
// private var cancellable: AnyCancellable?
|
||||
private let continuation: AsyncStream<MQTTConnectionService.Event>.Continuation
|
||||
let events: AsyncStream<MQTTConnectionService.Event>
|
||||
|
||||
init() {
|
||||
let (stream, continuation) = AsyncStream.makeStream(of: MQTTConnectionService.Event.self)
|
||||
self.events = stream
|
||||
self.continuation = continuation
|
||||
}
|
||||
|
||||
deinit {
|
||||
stop()
|
||||
}
|
||||
|
||||
func start(isActive connectionIsActive: @escaping () -> Bool) async {
|
||||
try? await Task.sleep(for: .seconds(1))
|
||||
let event: MQTTConnectionService.Event = connectionIsActive()
|
||||
? .connected
|
||||
: .disconnected
|
||||
|
||||
continuation.yield(event)
|
||||
// cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
|
||||
// .autoconnect()
|
||||
// .sink { [weak self] (_: Date) in
|
||||
// let event: MQTTConnectionService.Event = connectionIsActive()
|
||||
// ? .connected
|
||||
// : .disconnected
|
||||
//
|
||||
// self?.continuation.yield(event)
|
||||
// }
|
||||
}
|
||||
|
||||
func stop() {
|
||||
continuation.yield(.shuttingDown)
|
||||
continuation.finish()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
7
Sources/EnvVars/EnvVars.swift → Sources/Models/EnvVars.swift
Normal file → Executable file
7
Sources/EnvVars/EnvVars.swift → Sources/Models/EnvVars.swift
Normal file → Executable file
@@ -1,10 +1,11 @@
|
||||
import Foundation
|
||||
import Logging
|
||||
|
||||
/// Holds common settings for connecting to your MQTT broker. The default values can be used,
|
||||
/// they can be loaded from the shell environment, or from a file located in the root directory.
|
||||
///
|
||||
/// This allows us to keep sensitve settings out of the repository.
|
||||
public struct EnvVars: Codable, Equatable {
|
||||
public struct EnvVars: Codable, Equatable, Sendable {
|
||||
|
||||
/// The current app environment.
|
||||
public var appEnv: AppEnv
|
||||
@@ -40,7 +41,7 @@ public struct EnvVars: Codable, Equatable {
|
||||
identifier: String = "dewPoint-controller",
|
||||
userName: String? = "mqtt_user",
|
||||
password: String? = "secret!"
|
||||
){
|
||||
) {
|
||||
self.appEnv = appEnv
|
||||
self.host = host
|
||||
self.port = port
|
||||
@@ -60,7 +61,7 @@ public struct EnvVars: Codable, Equatable {
|
||||
}
|
||||
|
||||
/// Represents the different app environments.
|
||||
public enum AppEnv: String, Codable {
|
||||
public enum AppEnv: String, Codable, Sendable {
|
||||
case development
|
||||
case production
|
||||
case staging
|
||||
@@ -1,37 +0,0 @@
|
||||
import CoreUnitTypes
|
||||
|
||||
/// Represents the different modes that the controller can be in.
|
||||
public enum Mode: Equatable {
|
||||
|
||||
/// Allows controller to run in humidify or dehumidify mode.
|
||||
case auto
|
||||
|
||||
/// Only handle humidify mode.
|
||||
case humidifyOnly(HumidifyMode)
|
||||
|
||||
/// Only handle dehumidify mode.
|
||||
case dehumidifyOnly(DehumidifyMode)
|
||||
|
||||
/// Don't control humidify or dehumidify modes.
|
||||
case off
|
||||
|
||||
/// Represents the control modes for the humidify control state.
|
||||
public enum HumidifyMode: Equatable {
|
||||
|
||||
/// Control humidifying based off dew-point.
|
||||
case dewPoint(Temperature)
|
||||
|
||||
/// Control humidifying based off relative humidity.
|
||||
case relativeHumidity(RelativeHumidity)
|
||||
}
|
||||
|
||||
/// Represents the control modes for the dehumidify control state.
|
||||
public enum DehumidifyMode: Equatable {
|
||||
|
||||
/// Control dehumidifying based off dew-point.
|
||||
case dewPoint(high: Temperature, low: Temperature)
|
||||
|
||||
/// Control humidifying based off relative humidity.
|
||||
case relativeHumidity(high: RelativeHumidity, low: RelativeHumidity)
|
||||
}
|
||||
}
|
||||
@@ -1,104 +0,0 @@
|
||||
import Foundation
|
||||
import Psychrometrics
|
||||
|
||||
// TODO: Make this a struct, then create a Store class that holds the state??
|
||||
public final class State {
|
||||
|
||||
public var altitude: Length
|
||||
public var sensors: Sensors
|
||||
public var units: PsychrometricEnvironment.Units {
|
||||
didSet {
|
||||
PsychrometricEnvironment.shared.units = units
|
||||
}
|
||||
}
|
||||
|
||||
public init(
|
||||
altitude: Length = .seaLevel,
|
||||
sensors: Sensors = .init(),
|
||||
units: PsychrometricEnvironment.Units = .imperial
|
||||
) {
|
||||
self.altitude = altitude
|
||||
self.sensors = sensors
|
||||
self.units = units
|
||||
}
|
||||
|
||||
public struct Sensors: Equatable {
|
||||
|
||||
public var mixedAirSensor: TemperatureHumiditySensor<Mixed>
|
||||
public var postCoilSensor: TemperatureHumiditySensor<PostCoil>
|
||||
public var returnAirSensor: TemperatureHumiditySensor<Return>
|
||||
public var supplyAirSensor: TemperatureHumiditySensor<Supply>
|
||||
|
||||
public init(
|
||||
mixedAirSensor: TemperatureHumiditySensor<Mixed> = .init(),
|
||||
postCoilSensor: TemperatureHumiditySensor<PostCoil> = .init(),
|
||||
returnAirSensor: TemperatureHumiditySensor<Return> = .init(),
|
||||
supplyAirSensor: TemperatureHumiditySensor<Supply> = .init()
|
||||
) {
|
||||
self.mixedAirSensor = mixedAirSensor
|
||||
self.postCoilSensor = postCoilSensor
|
||||
self.returnAirSensor = returnAirSensor
|
||||
self.supplyAirSensor = supplyAirSensor
|
||||
}
|
||||
|
||||
public var needsProcessed: Bool {
|
||||
mixedAirSensor.needsProcessed
|
||||
|| postCoilSensor.needsProcessed
|
||||
|| returnAirSensor.needsProcessed
|
||||
|| supplyAirSensor.needsProcessed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension State.Sensors {
|
||||
|
||||
public struct TemperatureHumiditySensor<Location>: Equatable {
|
||||
|
||||
@TrackedChanges
|
||||
public var temperature: Temperature?
|
||||
|
||||
@TrackedChanges
|
||||
public var humidity: RelativeHumidity?
|
||||
|
||||
public var needsProcessed: Bool {
|
||||
get { $temperature.needsProcessed || $humidity.needsProcessed }
|
||||
set {
|
||||
$temperature.needsProcessed = newValue
|
||||
$humidity.needsProcessed = newValue
|
||||
}
|
||||
}
|
||||
|
||||
public func dewPoint(units: PsychrometricEnvironment.Units? = nil) -> DewPoint? {
|
||||
guard let temperature = temperature,
|
||||
let humidity = humidity,
|
||||
!temperature.rawValue.isNaN,
|
||||
!humidity.rawValue.isNaN
|
||||
else { return nil }
|
||||
return .init(dryBulb: temperature, humidity: humidity, units: units)
|
||||
}
|
||||
|
||||
public func enthalpy(altitude: Length, units: PsychrometricEnvironment.Units? = nil) -> EnthalpyOf<MoistAir>? {
|
||||
guard let temperature = temperature,
|
||||
let humidity = humidity,
|
||||
!temperature.rawValue.isNaN,
|
||||
!humidity.rawValue.isNaN
|
||||
else { return nil }
|
||||
return .init(dryBulb: temperature, humidity: humidity, altitude: altitude, units: units)
|
||||
}
|
||||
|
||||
public init(
|
||||
temperature: Temperature? = nil,
|
||||
humidity: RelativeHumidity? = nil,
|
||||
needsProcessed: Bool = false
|
||||
) {
|
||||
self._temperature = .init(wrappedValue: temperature, needsProcessed: needsProcessed)
|
||||
self._humidity = .init(wrappedValue: humidity, needsProcessed: needsProcessed)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Temperature / Humidity Sensor Location Namespaces
|
||||
public enum Mixed { }
|
||||
public enum PostCoil { }
|
||||
public enum Return { }
|
||||
public enum Supply { }
|
||||
}
|
||||
139
Sources/Models/TemperatureAndHumiditySensor.swift
Normal file
139
Sources/Models/TemperatureAndHumiditySensor.swift
Normal file
@@ -0,0 +1,139 @@
|
||||
import Dependencies
|
||||
import PsychrometricClient
|
||||
|
||||
/// Represents a temperature and humidity sensor that can be used to derive
|
||||
/// the dew-point temperature and enthalpy values.
|
||||
///
|
||||
/// > Note: Temperature values are received in `celsius`.
|
||||
public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
|
||||
|
||||
@Dependency(\.psychrometricClient) private var psychrometrics
|
||||
|
||||
/// The identifier of the sensor, same as the location.
|
||||
public var id: Location { location }
|
||||
|
||||
/// The altitude of the sensor.
|
||||
public let altitude: Length
|
||||
|
||||
/// The current humidity value of the sensor.
|
||||
@TrackedChanges
|
||||
public var humidity: RelativeHumidity?
|
||||
|
||||
/// The location identifier of the sensor
|
||||
public let location: Location
|
||||
|
||||
/// The current temperature value of the sensor.
|
||||
@TrackedChanges
|
||||
public var temperature: DryBulb?
|
||||
|
||||
/// The topics to listen for updated sensor values.
|
||||
public let topics: Topics
|
||||
|
||||
/// Create a new temperature and humidity sensor.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - location: The location of the sensor.
|
||||
/// - altitude: The altitude of the sensor.
|
||||
/// - temperature: The current temperature value of the sensor.
|
||||
/// - humidity: The current relative humidity value of the sensor.
|
||||
/// - needsProcessed: If the sensor needs to be processed.
|
||||
public init(
|
||||
location: Location,
|
||||
altitude: Length = .feet(800.0),
|
||||
temperature: DryBulb? = nil,
|
||||
humidity: RelativeHumidity? = nil,
|
||||
needsProcessed: Bool = false,
|
||||
topics: Topics? = nil
|
||||
) {
|
||||
self.altitude = altitude
|
||||
self.location = location
|
||||
self._temperature = TrackedChanges(wrappedValue: temperature, needsProcessed: needsProcessed)
|
||||
self._humidity = TrackedChanges(wrappedValue: humidity, needsProcessed: needsProcessed)
|
||||
self.topics = topics ?? .init(location: location)
|
||||
}
|
||||
|
||||
/// The calculated dew-point temperature of the sensor.
|
||||
public var dewPoint: DewPoint? {
|
||||
get async {
|
||||
guard let temperature = temperature,
|
||||
let humidity = humidity
|
||||
else { return nil }
|
||||
return try? await psychrometrics.dewPoint(.dryBulb(temperature, relativeHumidity: humidity))
|
||||
// return .init(dryBulb: temperature, humidity: humidity)
|
||||
}
|
||||
}
|
||||
|
||||
/// The calculated enthalpy of the sensor.
|
||||
public var enthalpy: EnthalpyOf<MoistAir>? {
|
||||
get async {
|
||||
guard let temperature = temperature,
|
||||
let humidity = humidity
|
||||
else { return nil }
|
||||
return try? await psychrometrics.enthalpy.moistAir(
|
||||
.dryBulb(temperature, relativeHumidity: humidity, altitude: altitude)
|
||||
)
|
||||
// return .init(dryBulb: temperature, humidity: humidity, altitude: altitude)
|
||||
}
|
||||
}
|
||||
|
||||
/// Check whether any of the sensor values have changed and need processed.
|
||||
///
|
||||
/// - Note: Setting a value will set to both the temperature and humidity properties.
|
||||
public var needsProcessed: Bool {
|
||||
get { $temperature.needsProcessed || $humidity.needsProcessed }
|
||||
set {
|
||||
$temperature.needsProcessed = newValue
|
||||
$humidity.needsProcessed = newValue
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents the different locations of a temperature and humidity sensor, which can
|
||||
/// be used to derive the topic to both listen and publish new values to.
|
||||
public enum Location: String, CaseIterable, Equatable, Hashable, Sendable {
|
||||
case mixedAir = "mixed_air"
|
||||
case postCoil = "post_coil"
|
||||
case `return`
|
||||
case supply
|
||||
}
|
||||
|
||||
/// Represents the MQTT topics to listen for updated sensor values on.
|
||||
public struct Topics: Equatable, Hashable, Sendable {
|
||||
|
||||
/// The dew-point temperature topic for the sensor.
|
||||
public let dewPoint: String
|
||||
|
||||
/// The enthalpy topic for the sensor.
|
||||
public let enthalpy: String
|
||||
|
||||
/// The humidity topic of the sensor.
|
||||
public let humidity: String
|
||||
|
||||
/// The temperature topic of the sensor.
|
||||
public let temperature: String
|
||||
|
||||
public init(
|
||||
dewPoint: String,
|
||||
enthalpy: String,
|
||||
humidity: String,
|
||||
temperature: String
|
||||
) {
|
||||
self.dewPoint = dewPoint
|
||||
self.enthalpy = enthalpy
|
||||
self.humidity = humidity
|
||||
self.temperature = temperature
|
||||
}
|
||||
|
||||
public init(topicPrefix: String? = "frankensystem", location: TemperatureAndHumiditySensor.Location) {
|
||||
var prefix = topicPrefix ?? ""
|
||||
if prefix.reversed().starts(with: "/") {
|
||||
prefix = "\(prefix.dropLast())"
|
||||
}
|
||||
self.init(
|
||||
dewPoint: "\(prefix)/sensors/\(location.rawValue)_dew_point/state",
|
||||
enthalpy: "\(prefix)/sensors/\(location.rawValue)_enthalpy/state",
|
||||
humidity: "\(prefix)/sensors/\(location.rawValue)_humidity/state",
|
||||
temperature: "\(prefix)/sensors/\(location.rawValue)_temperature/state"
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,269 +0,0 @@
|
||||
|
||||
/// A container for all the different topics that are needed by the application.
|
||||
public struct Topics: Codable, Equatable {
|
||||
|
||||
/// The command topics the application can publish to.
|
||||
public var commands: Commands
|
||||
|
||||
/// The sensor topics the application can read from / write to.
|
||||
public var sensors: Sensors
|
||||
|
||||
/// The set point topics the application can read set point values from.
|
||||
public var setPoints: SetPoints
|
||||
|
||||
/// The state topics the application can read state values from.
|
||||
public var states: States
|
||||
|
||||
/// Create the topics required by the application.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - sensors: The sensor topics.
|
||||
/// - setPoints: The set point topics
|
||||
/// - states: The states topics
|
||||
/// - relays: The relay topics
|
||||
public init(
|
||||
commands: Commands = .init(),
|
||||
sensors: Sensors = .init(),
|
||||
setPoints: SetPoints = .init(),
|
||||
states: States = .init()
|
||||
) {
|
||||
self.commands = commands
|
||||
self.sensors = sensors
|
||||
self.setPoints = setPoints
|
||||
self.states = states
|
||||
}
|
||||
|
||||
/// Represents the sensor topics.
|
||||
public struct Sensors: Codable, Equatable {
|
||||
|
||||
public var mixedAirSensor: TemperatureAndHumiditySensor<State.Sensors.Mixed>
|
||||
public var postCoilSensor: TemperatureAndHumiditySensor<State.Sensors.PostCoil>
|
||||
public var returnAirSensor: TemperatureAndHumiditySensor<State.Sensors.Return>
|
||||
public var supplyAirSensor: TemperatureAndHumiditySensor<State.Sensors.Supply>
|
||||
|
||||
public init(
|
||||
mixedAirSensor: TemperatureAndHumiditySensor<State.Sensors.Mixed> = .default(location: "mixed=air"),
|
||||
postCoilSensor: TemperatureAndHumiditySensor<State.Sensors.PostCoil> = .default(location: "post-coil"),
|
||||
returnAirSensor: TemperatureAndHumiditySensor<State.Sensors.Return> = .default(location: "return"),
|
||||
supplyAirSensor: TemperatureAndHumiditySensor<State.Sensors.Supply> = .default(location: "supply")
|
||||
) {
|
||||
self.mixedAirSensor = mixedAirSensor
|
||||
self.postCoilSensor = postCoilSensor
|
||||
self.returnAirSensor = returnAirSensor
|
||||
self.supplyAirSensor = supplyAirSensor
|
||||
}
|
||||
|
||||
public struct TemperatureAndHumiditySensor<Location>: Codable, Equatable {
|
||||
public var temperature: String
|
||||
public var humidity: String
|
||||
public var dewPoint: String
|
||||
public var enthalpy: String
|
||||
|
||||
/// Create a new sensor topic container.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - temperature: The temperature sensor topic.
|
||||
/// - humidity: The humidity sensor topic.
|
||||
/// - dewPoint: The dew point sensor topic.
|
||||
public init(
|
||||
temperature: String,
|
||||
humidity: String,
|
||||
dewPoint: String,
|
||||
enthalpy: String
|
||||
) {
|
||||
self.temperature = temperature
|
||||
self.humidity = humidity
|
||||
self.dewPoint = dewPoint
|
||||
self.enthalpy = enthalpy
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A container for set point related topics used by the application.
|
||||
public struct SetPoints: Codable, Equatable {
|
||||
|
||||
/// The topic for the humidify set point.
|
||||
public var humidify: Humidify
|
||||
|
||||
/// The topics for dehumidification set points.
|
||||
public var dehumidify: Dehumidify
|
||||
|
||||
/// Create a new set point topic container.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - humidify: The topic for humidification set points.
|
||||
/// - dehumidify: The topics for dehumidification set points.
|
||||
public init(
|
||||
humidify: Humidify = .init(),
|
||||
dehumidify: Dehumidify = .init()
|
||||
) {
|
||||
self.humidify = humidify
|
||||
self.dehumidify = dehumidify
|
||||
}
|
||||
|
||||
/// A container for the humidification set point topics used by the application.
|
||||
public struct Humidify: Codable, Equatable {
|
||||
|
||||
/// The topic for dew point control mode set point.
|
||||
public var dewPoint: String
|
||||
|
||||
/// The topic for relative humidity control mode set point.
|
||||
public var relativeHumidity: String
|
||||
|
||||
/// Create a new container for the humidification set point topics.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - dewPoint: The topic for dew point control mode set point.
|
||||
/// - relativeHumidity: The topic for relative humidity control mode set point.
|
||||
public init(
|
||||
dewPoint: String = "set_points/humidify/dew_point",
|
||||
relativeHumidity: String = "set_points/humidify/relative_humidity"
|
||||
) {
|
||||
self.dewPoint = dewPoint
|
||||
self.relativeHumidity = relativeHumidity
|
||||
}
|
||||
}
|
||||
|
||||
/// A container for dehumidifcation set point topics.
|
||||
public struct Dehumidify: Codable, Equatable {
|
||||
|
||||
/// A low setting for dew point control modes.
|
||||
public var lowDewPoint: String
|
||||
|
||||
/// A high setting for dew point control modes.
|
||||
public var highDewPoint: String
|
||||
|
||||
/// A low setting for relative humidity control modes.
|
||||
public var lowRelativeHumidity: String
|
||||
|
||||
/// A high setting for relative humidity control modes.
|
||||
public var highRelativeHumidity: String
|
||||
|
||||
/// Create a new container for dehumidification set point topics.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - lowDewPoint: A low setting for dew point control modes.
|
||||
/// - highDewPoint: A high setting for dew point control modes.
|
||||
/// - lowRelativeHumidity: A low setting for relative humidity control modes.
|
||||
/// - highRelativeHumidity: A high setting for relative humidity control modes.
|
||||
public init(
|
||||
lowDewPoint: String = "set_points/dehumidify/low_dew_point",
|
||||
highDewPoint: String = "set_points/dehumidify/high_dew_point",
|
||||
lowRelativeHumidity: String = "set_points/dehumidify/low_relative_humidity",
|
||||
highRelativeHumidity: String = "set_points/dehumidify/high_relative_humidity"
|
||||
) {
|
||||
self.lowDewPoint = lowDewPoint
|
||||
self.highDewPoint = highDewPoint
|
||||
self.lowRelativeHumidity = lowRelativeHumidity
|
||||
self.highRelativeHumidity = highRelativeHumidity
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A container for control state topics used by the application.
|
||||
public struct States: Codable, Equatable {
|
||||
|
||||
/// The topic for the control mode.
|
||||
public var mode: String
|
||||
|
||||
/// The relay state topics.
|
||||
public var relays: Relays
|
||||
|
||||
/// Create a new container for control state topics.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - mode: The topic for the control mode.
|
||||
public init(
|
||||
mode: String = "states/mode",
|
||||
relays: Relays = .init()
|
||||
) {
|
||||
self.mode = mode
|
||||
self.relays = relays
|
||||
}
|
||||
|
||||
/// A container for reading the current state of a relay.
|
||||
public struct Relays: Codable, Equatable {
|
||||
|
||||
/// The dehumidification stage-1 relay topic.
|
||||
public var dehumdification1: String
|
||||
|
||||
/// The dehumidification stage-2 relay topic.
|
||||
public var dehumidification2: String
|
||||
|
||||
/// The humidification relay topic.
|
||||
public var humdification: String
|
||||
|
||||
/// Create a new container for relay state topics.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - dehumidification1: The dehumidification stage-1 relay topic.
|
||||
/// - dehumidification2: The dehumidification stage-2 relay topic.
|
||||
/// - humidification: The humidification relay topic.
|
||||
public init(
|
||||
dehumidefication1: String = "states/relays/dehumidification_1",
|
||||
dehumidification2: String = "states/relays/dehumidification_2",
|
||||
humidification: String = "states/relays/humidification"
|
||||
) {
|
||||
self.dehumdification1 = dehumidefication1
|
||||
self.dehumidification2 = dehumidification2
|
||||
self.humdification = humidification
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A container for commands topics that the application can publish to.
|
||||
public struct Commands: Codable, Equatable {
|
||||
|
||||
/// The relay command topics.
|
||||
public var relays: Relays
|
||||
|
||||
/// Create a new command topics container.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - relays: The relay command topics.
|
||||
public init(relays: Relays = .init()) {
|
||||
self.relays = relays
|
||||
}
|
||||
|
||||
/// A container for relay command topics used by the application.
|
||||
public struct Relays: Codable, Equatable {
|
||||
|
||||
/// The dehumidification stage-1 relay topic.
|
||||
public var dehumidification1: String
|
||||
|
||||
/// The dehumidification stage-2 relay topic.
|
||||
public var dehumidification2: String
|
||||
|
||||
/// The humidification relay topic.
|
||||
public var humidification: String
|
||||
|
||||
/// Create a new container for commanding relays.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - dehumidification1: The dehumidification stage-1 relay topic.
|
||||
/// - dehumidification2: The dehumidification stage-2 relay topic.
|
||||
/// - humidification: The humidification relay topic.
|
||||
public init(
|
||||
dehumidification1: String = "relays/dehumidification_1",
|
||||
dehumidification2: String = "relays/dehumidification_2",
|
||||
humidification: String = "relays/humidification"
|
||||
) {
|
||||
self.dehumidification1 = dehumidification1
|
||||
self.dehumidification2 = dehumidification2
|
||||
self.humidification = humidification
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: Helpers
|
||||
extension Topics.Sensors.TemperatureAndHumiditySensor {
|
||||
public static func `default`(location: String) -> Self {
|
||||
.init(
|
||||
temperature: "sensors/\(location)/temperature",
|
||||
humidity: "sensors/\(location)/humidity",
|
||||
dewPoint: "sensors/\(location)/dew-point",
|
||||
enthalpy: "sensors/\(location)/enthalpy"
|
||||
)
|
||||
}
|
||||
}
|
||||
38
Sources/Models/TrackedChanges.swift
Normal file → Executable file
38
Sources/Models/TrackedChanges.swift
Normal file → Executable file
@@ -1,11 +1,20 @@
|
||||
|
||||
/// A property wrapper that tracks changes of a property.
|
||||
///
|
||||
/// This allows values to only publish changes if they have changed since the
|
||||
/// last time they were recieved.
|
||||
@propertyWrapper
|
||||
public struct TrackedChanges<Value> {
|
||||
|
||||
/// The current tracking state.
|
||||
private var tracking: TrackingState
|
||||
|
||||
/// The current wrapped value.
|
||||
private var value: Value
|
||||
|
||||
/// Used to check if a new value is equal to an old value.
|
||||
private var isEqual: (Value, Value) -> Bool
|
||||
|
||||
/// Access to the underlying property that we are wrapping.
|
||||
public var wrappedValue: Value {
|
||||
get { value }
|
||||
set {
|
||||
@@ -17,6 +26,12 @@ public struct TrackedChanges<Value> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new property that tracks it's changes.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - wrappedValue: The value that we are wrapping.
|
||||
/// - needsProcessed: Whether this value needs processed (default = false).
|
||||
/// - isEqual: Method to compare old values against new values.
|
||||
public init(
|
||||
wrappedValue: Value,
|
||||
needsProcessed: Bool = false,
|
||||
@@ -27,11 +42,17 @@ public struct TrackedChanges<Value> {
|
||||
self.isEqual = isEqual
|
||||
}
|
||||
|
||||
/// Represents whether a wrapped value has changed and needs processed or not.
|
||||
enum TrackingState {
|
||||
|
||||
/// The state when nothing has changed and we've already processed the current value.
|
||||
case hasProcessed
|
||||
|
||||
/// The state when the value has changed and has not been processed yet.
|
||||
case needsProcessed
|
||||
}
|
||||
|
||||
/// Whether the value needs processed.
|
||||
public var needsProcessed: Bool {
|
||||
get { tracking == .needsProcessed }
|
||||
set {
|
||||
@@ -55,6 +76,11 @@ extension TrackedChanges: Equatable where Value: Equatable {
|
||||
&& lhs.needsProcessed == rhs.needsProcessed
|
||||
}
|
||||
|
||||
/// Create a new property that tracks it's changes, using the default equality check.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - wrappedValue: The value that we are wrapping.
|
||||
/// - needsProcessed: Whether this value needs processed (default = false).
|
||||
public init(
|
||||
wrappedValue: Value,
|
||||
needsProcessed: Bool = false
|
||||
@@ -62,3 +88,13 @@ extension TrackedChanges: Equatable where Value: Equatable {
|
||||
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed, isEqual: ==)
|
||||
}
|
||||
}
|
||||
|
||||
extension TrackedChanges: Hashable where Value: Hashable {
|
||||
|
||||
public func hash(into hasher: inout Hasher) {
|
||||
hasher.combine(wrappedValue)
|
||||
hasher.combine(needsProcessed)
|
||||
}
|
||||
}
|
||||
|
||||
extension TrackedChanges: Sendable where Value: Sendable {}
|
||||
|
||||
45
Sources/SensorsService/Helpers.swift
Executable file
45
Sources/SensorsService/Helpers.swift
Executable file
@@ -0,0 +1,45 @@
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import NIOFoundationCompat
|
||||
import PsychrometricClient
|
||||
|
||||
/// Represents a type that can be initialized by a ``ByteBuffer``.
|
||||
protocol BufferInitalizable {
|
||||
init?(buffer: inout ByteBuffer)
|
||||
}
|
||||
|
||||
extension Double: BufferInitalizable {
|
||||
|
||||
/// Attempt to create / parse a double from a byte buffer.
|
||||
init?(buffer: inout ByteBuffer) {
|
||||
guard let string = buffer.readString(
|
||||
length: buffer.readableBytes,
|
||||
encoding: String.Encoding.utf8
|
||||
)
|
||||
else { return nil }
|
||||
self.init(string)
|
||||
}
|
||||
}
|
||||
|
||||
extension Tagged: BufferInitalizable where RawValue: BufferInitalizable {
|
||||
init?(buffer: inout ByteBuffer) {
|
||||
guard let value = RawValue(buffer: &buffer) else { return nil }
|
||||
self.init(value)
|
||||
}
|
||||
}
|
||||
|
||||
extension Humidity<Relative>: BufferInitalizable {
|
||||
init?(buffer: inout ByteBuffer) {
|
||||
guard let value = Double(buffer: &buffer) else { return nil }
|
||||
self.init(value)
|
||||
}
|
||||
}
|
||||
|
||||
extension Temperature<DryAir>: BufferInitalizable {
|
||||
init?(buffer: inout ByteBuffer) {
|
||||
guard let value = Double(buffer: &buffer) else { return nil }
|
||||
self.init(value)
|
||||
}
|
||||
}
|
||||
333
Sources/SensorsService/SensorsService.swift
Normal file
333
Sources/SensorsService/SensorsService.swift
Normal file
@@ -0,0 +1,333 @@
|
||||
import Dependencies
|
||||
import DependenciesMacros
|
||||
import Foundation
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTConnectionService
|
||||
@preconcurrency import MQTTNIO
|
||||
import NIO
|
||||
import PsychrometricClient
|
||||
import ServiceLifecycle
|
||||
|
||||
@DependencyClient
|
||||
public struct SensorsClient: Sendable {
|
||||
|
||||
public var listen: @Sendable ([String]) async throws -> AsyncStream<MQTTPublishInfo>
|
||||
public var logger: Logger?
|
||||
public var publish: @Sendable (Double, String) async throws -> Void
|
||||
public var shutdown: @Sendable () -> Void = {}
|
||||
|
||||
public func listen(to topics: [String]) async throws -> AsyncStream<MQTTPublishInfo> {
|
||||
try await listen(topics)
|
||||
}
|
||||
|
||||
public func publish(_ value: Double, to topic: String) async throws {
|
||||
try await publish(value, topic)
|
||||
}
|
||||
}
|
||||
|
||||
extension SensorsClient: TestDependencyKey {
|
||||
public static var testValue: SensorsClient {
|
||||
Self()
|
||||
}
|
||||
}
|
||||
|
||||
public extension DependencyValues {
|
||||
var sensorsClient: SensorsClient {
|
||||
get { self[SensorsClient.self] }
|
||||
set { self[SensorsClient.self] = newValue }
|
||||
}
|
||||
}
|
||||
|
||||
public actor SensorsService2: Service {
|
||||
|
||||
@Dependency(\.sensorsClient) var client
|
||||
|
||||
private var sensors: [TemperatureAndHumiditySensor]
|
||||
|
||||
public init(
|
||||
sensors: [TemperatureAndHumiditySensor]
|
||||
) {
|
||||
self.sensors = sensors
|
||||
}
|
||||
|
||||
public func run() async throws {
|
||||
guard sensors.count > 0 else {
|
||||
throw SensorCountError()
|
||||
}
|
||||
|
||||
let stream = try await client.listen(to: topics)
|
||||
|
||||
do {
|
||||
try await withGracefulShutdownHandler {
|
||||
try await withThrowingDiscardingTaskGroup { group in
|
||||
for await result in stream.cancelOnGracefulShutdown() {
|
||||
group.addTask { try await self.handleResult(result) }
|
||||
}
|
||||
}
|
||||
} onGracefulShutdown: {
|
||||
Task {
|
||||
await self.client.logger?.trace("Received graceful shutdown.")
|
||||
try? await self.publishUpdates()
|
||||
await self.client.shutdown()
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
client.logger?.trace("Error: \(error)")
|
||||
client.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
private var topics: [String] {
|
||||
sensors.reduce(into: [String]()) { array, sensor in
|
||||
array.append(sensor.topics.temperature)
|
||||
array.append(sensor.topics.humidity)
|
||||
}
|
||||
}
|
||||
|
||||
private func handleResult(_ result: MQTTPublishInfo) async throws {
|
||||
let topic = result.topicName
|
||||
client.logger?.trace("Begin handling result for topic: \(topic)")
|
||||
|
||||
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
|
||||
var buffer = result.payload
|
||||
return V(buffer: &buffer)
|
||||
}
|
||||
|
||||
if topic.contains("temperature") {
|
||||
client.logger?.trace("Begin handling temperature result.")
|
||||
guard let temperature = decode(DryBulb.self) else {
|
||||
client.logger?.trace("Failed to decode temperature: \(result.payload)")
|
||||
throw DecodingError()
|
||||
}
|
||||
client.logger?.trace("Decoded temperature: \(temperature)")
|
||||
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
|
||||
|
||||
} else if topic.contains("humidity") {
|
||||
client.logger?.trace("Begin handling humidity result.")
|
||||
guard let humidity = decode(RelativeHumidity.self) else {
|
||||
client.logger?.trace("Failed to decode humidity: \(result.payload)")
|
||||
throw DecodingError()
|
||||
}
|
||||
client.logger?.trace("Decoded humidity: \(humidity)")
|
||||
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
|
||||
} else {
|
||||
client.logger?.error("Received unexpected topic, expected topic to contain 'temperature' or 'humidity'!")
|
||||
return
|
||||
}
|
||||
|
||||
try await publishUpdates()
|
||||
client.logger?.trace("Done handling result for topic: \(topic)")
|
||||
}
|
||||
|
||||
private func publish(_ double: Double?, to topic: String) async throws {
|
||||
guard let double else { return }
|
||||
try await client.publish(double, to: topic)
|
||||
client.logger?.trace("Published update to topic: \(topic)")
|
||||
}
|
||||
|
||||
private func publishUpdates() async throws {
|
||||
for sensor in sensors.filter(\.needsProcessed) {
|
||||
try await publish(sensor.dewPoint?.value, to: sensor.topics.dewPoint)
|
||||
try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public actor SensorsService: Service {
|
||||
private var sensors: [TemperatureAndHumiditySensor]
|
||||
private let client: MQTTClient
|
||||
private let events: @Sendable () -> AsyncStream<MQTTConnectionService.Event>
|
||||
nonisolated var logger: Logger { client.logger }
|
||||
private var shuttingDown: Bool = false
|
||||
|
||||
public init(
|
||||
client: MQTTClient,
|
||||
events: @Sendable @escaping () -> AsyncStream<MQTTConnectionService.Event>,
|
||||
sensors: [TemperatureAndHumiditySensor]
|
||||
) {
|
||||
self.client = client
|
||||
self.events = events
|
||||
self.sensors = sensors
|
||||
}
|
||||
|
||||
/// The entry-point of the service.
|
||||
///
|
||||
/// This method is called to start the service and begin
|
||||
/// listening for sensor value changes then publishing the dew-point
|
||||
/// and enthalpy values of the sensors.
|
||||
public func run() async throws {
|
||||
do {
|
||||
try await withGracefulShutdownHandler {
|
||||
try await withThrowingDiscardingTaskGroup { group in
|
||||
client.addPublishListener(named: "\(Self.self)") { result in
|
||||
if self.shuttingDown {
|
||||
self.logger.trace("Shutting down.")
|
||||
} else if !self.client.isActive() {
|
||||
self.logger.trace("Client is not currently active")
|
||||
} else {
|
||||
Task { try await self.handleResult(result) }
|
||||
}
|
||||
}
|
||||
for await event in self.events().cancelOnGracefulShutdown() {
|
||||
logger.trace("Received event: \(event)")
|
||||
if event == .shuttingDown {
|
||||
self.setIsShuttingDown()
|
||||
} else if event == .connected {
|
||||
group.addTask { try await self.subscribeToSensors() }
|
||||
} else {
|
||||
group.addTask { await self.unsubscribeToSensors() }
|
||||
group.addTask { try? await Task.sleep(for: .milliseconds(100)) }
|
||||
}
|
||||
}
|
||||
}
|
||||
} onGracefulShutdown: {
|
||||
// do something.
|
||||
self.logger.debug("Received graceful shutdown.")
|
||||
Task { [weak self] in await self?.setIsShuttingDown() }
|
||||
}
|
||||
} catch {
|
||||
// WARN: We always get an MQTTNIO `noConnection` error here, which generally is not an issue,
|
||||
// but causes service ServiceLifecycle to fail, so currently just ignoring errors that are thrown.
|
||||
// However we do receive the unsubscribe message back from the MQTT broker, so it is likely safe
|
||||
// to ignore the `noConnection` error.
|
||||
logger.trace("Run error: \(error)")
|
||||
// throw error
|
||||
}
|
||||
}
|
||||
|
||||
private func setIsShuttingDown() {
|
||||
logger.debug("Received shut down event.")
|
||||
Task { try await publishUpdates() }
|
||||
Task { await self.unsubscribeToSensors() }
|
||||
shuttingDown = true
|
||||
client.removePublishListener(named: "\(Self.self)")
|
||||
}
|
||||
|
||||
private func handleResult(
|
||||
_ result: Result<MQTTPublishInfo, any Error>
|
||||
) async throws {
|
||||
logger.trace("Begin handling result")
|
||||
do {
|
||||
switch result {
|
||||
case let .failure(error):
|
||||
logger.debug("Failed receiving sensor: \(error)")
|
||||
throw error
|
||||
case let .success(value):
|
||||
// do something.
|
||||
let topic = value.topicName
|
||||
logger.trace("Received new value for topic: \(topic)")
|
||||
if topic.contains("temperature") {
|
||||
// do something.
|
||||
var buffer = value.payload
|
||||
guard let temperature = DryBulb(buffer: &buffer) else {
|
||||
logger.trace("Decoding error for topic: \(topic)")
|
||||
throw DecodingError()
|
||||
}
|
||||
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
|
||||
try await publishUpdates()
|
||||
|
||||
} else if topic.contains("humidity") {
|
||||
var buffer = value.payload
|
||||
// Decode and update the temperature value
|
||||
guard let humidity = RelativeHumidity(buffer: &buffer) else {
|
||||
logger.debug("Failed to decode humidity from buffer: \(buffer)")
|
||||
throw DecodingError()
|
||||
}
|
||||
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
|
||||
try await publishUpdates()
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
logger.trace("Handle Result error: \(error)")
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
private func subscribeToSensors() async throws {
|
||||
for sensor in sensors {
|
||||
_ = try await client.subscribe(to: [
|
||||
MQTTSubscribeInfo(topicFilter: sensor.topics.temperature, qos: .atLeastOnce),
|
||||
MQTTSubscribeInfo(topicFilter: sensor.topics.humidity, qos: .atLeastOnce)
|
||||
])
|
||||
logger.debug("Subscribed to sensor: \(sensor.location)")
|
||||
}
|
||||
}
|
||||
|
||||
private func unsubscribeToSensors() async {
|
||||
logger.trace("Begin unsubscribe to sensors.")
|
||||
guard client.isActive() else {
|
||||
logger.debug("Client is not active, skipping.")
|
||||
return
|
||||
}
|
||||
do {
|
||||
let topics = sensors.reduce(into: [String]()) { array, sensor in
|
||||
array.append(sensor.topics.temperature)
|
||||
array.append(sensor.topics.humidity)
|
||||
}
|
||||
try await client.unsubscribe(from: topics)
|
||||
logger.trace("Unsubscribed from sensors.")
|
||||
} catch {
|
||||
logger.trace("Unsubscribe error: \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
private func publish(double: Double?, to topic: String) async throws {
|
||||
guard client.isActive() else { return }
|
||||
guard let double else { return }
|
||||
let rounded = round(double * 100) / 100
|
||||
logger.debug("Publishing \(rounded), to: \(topic)")
|
||||
try await client.publish(
|
||||
to: topic,
|
||||
payload: ByteBufferAllocator().buffer(string: "\(rounded)"),
|
||||
qos: .exactlyOnce,
|
||||
retain: true
|
||||
)
|
||||
}
|
||||
|
||||
private func publishUpdates() async throws {
|
||||
for sensor in sensors.filter(\.needsProcessed) {
|
||||
try await publish(double: sensor.dewPoint?.value, to: sensor.topics.dewPoint)
|
||||
try await publish(double: sensor.enthalpy?.value, to: sensor.topics.enthalpy)
|
||||
try sensors.hasProcessed(sensor)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// MARK: - Errors
|
||||
|
||||
struct DecodingError: Error {}
|
||||
struct MQTTClientNotConnected: Error {}
|
||||
struct NotFoundError: Error {}
|
||||
struct SensorExists: Error {}
|
||||
struct SensorCountError: Error {}
|
||||
|
||||
// MARK: - Helpers
|
||||
|
||||
private extension TemperatureAndHumiditySensor.Topics {
|
||||
func contains(_ topic: String) -> Bool {
|
||||
temperature == topic || humidity == topic
|
||||
}
|
||||
}
|
||||
|
||||
private extension Array where Element == TemperatureAndHumiditySensor {
|
||||
|
||||
mutating func update<V>(
|
||||
topic: String,
|
||||
keyPath: WritableKeyPath<TemperatureAndHumiditySensor, V>,
|
||||
with value: V
|
||||
) throws {
|
||||
guard let index = firstIndex(where: { $0.topics.contains(topic) }) else {
|
||||
throw NotFoundError()
|
||||
}
|
||||
self[index][keyPath: keyPath] = value
|
||||
}
|
||||
|
||||
mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws {
|
||||
guard let index = firstIndex(where: { $0.id == sensor.id }) else {
|
||||
throw NotFoundError()
|
||||
}
|
||||
self[index].needsProcessed = false
|
||||
}
|
||||
}
|
||||
@@ -1,52 +0,0 @@
|
||||
import Models
|
||||
|
||||
// TODO: Fix other live topics
|
||||
extension Topics {
|
||||
|
||||
public static let live = Self.init(
|
||||
commands: .init(),
|
||||
sensors: .init(
|
||||
mixedAirSensor: .live(location: .mixedAir),
|
||||
postCoilSensor: .live(location: .postCoil),
|
||||
returnAirSensor: .live(location: .return),
|
||||
supplyAirSensor: .live(location: .supply)),
|
||||
setPoints: .init(),
|
||||
states: .init()
|
||||
)
|
||||
}
|
||||
|
||||
extension Topics.Sensors {
|
||||
fileprivate enum Location: CustomStringConvertible {
|
||||
case mixedAir
|
||||
case postCoil
|
||||
case `return`
|
||||
case supply
|
||||
|
||||
var description: String {
|
||||
switch self {
|
||||
case .mixedAir:
|
||||
return "mixed_air"
|
||||
case .postCoil:
|
||||
return "post_coil"
|
||||
case .return:
|
||||
return "return"
|
||||
case .supply:
|
||||
return "supply"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension Topics.Sensors.TemperatureAndHumiditySensor {
|
||||
fileprivate static func live(
|
||||
prefix: String = "frankensystem",
|
||||
location: Topics.Sensors.Location
|
||||
) -> Self {
|
||||
.init(
|
||||
temperature: "\(prefix)/sensor/\(location.description)_temperature/state",
|
||||
humidity: "\(prefix)/sensor/\(location.description)_humidity/state",
|
||||
dewPoint: "\(prefix)/sensor/\(location.description)_dew_point/state",
|
||||
enthalpy: "\(prefix)/sensor/\(location.description)_enthalpy/state"
|
||||
)
|
||||
}
|
||||
}
|
||||
101
Sources/dewPoint-controller/Application.swift
Normal file
101
Sources/dewPoint-controller/Application.swift
Normal file
@@ -0,0 +1,101 @@
|
||||
import Foundation
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTConnectionService
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import PsychrometricClientLive
|
||||
import SensorsService
|
||||
import ServiceLifecycle
|
||||
|
||||
@main
|
||||
struct Application {
|
||||
/// The main entry point of the application.
|
||||
static func main() async throws {
|
||||
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
var logger = Logger(label: "dewpoint-controller")
|
||||
logger.logLevel = .trace
|
||||
|
||||
logger.info("Starting dewpoint-controller!")
|
||||
|
||||
let environment = loadEnvVars(logger: logger)
|
||||
|
||||
if environment.appEnv == .production {
|
||||
logger.debug("Updating logging level to info.")
|
||||
logger.logLevel = .info
|
||||
}
|
||||
|
||||
let mqtt = MQTTClient(
|
||||
envVars: environment,
|
||||
eventLoopGroup: eventloopGroup,
|
||||
logger: logger
|
||||
)
|
||||
|
||||
let mqttConnection = MQTTConnectionService(client: mqtt)
|
||||
let sensors = SensorsService(
|
||||
client: mqtt,
|
||||
events: { mqttConnection.events },
|
||||
sensors: .live
|
||||
)
|
||||
|
||||
let serviceGroup = ServiceGroup(
|
||||
services: [
|
||||
mqttConnection,
|
||||
sensors
|
||||
],
|
||||
gracefulShutdownSignals: [.sigterm, .sigint],
|
||||
logger: logger
|
||||
)
|
||||
|
||||
try await serviceGroup.run()
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Helpers
|
||||
|
||||
private func loadEnvVars(logger: Logger) -> EnvVars {
|
||||
let defaultEnvVars = EnvVars()
|
||||
let encoder = JSONEncoder()
|
||||
let decoder = JSONDecoder()
|
||||
|
||||
let defaultEnvDict = (try? encoder.encode(defaultEnvVars))
|
||||
.flatMap { try? decoder.decode([String: String].self, from: $0) }
|
||||
?? [:]
|
||||
|
||||
let envVarsDict = defaultEnvDict
|
||||
.merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 })
|
||||
|
||||
let envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict))
|
||||
.flatMap { try? decoder.decode(EnvVars.self, from: $0) }
|
||||
?? defaultEnvVars
|
||||
|
||||
logger.debug("Done loading EnvVars...")
|
||||
|
||||
return envVars
|
||||
}
|
||||
|
||||
private extension MQTTNIO.MQTTClient {
|
||||
convenience init(envVars: EnvVars, eventLoopGroup: EventLoopGroup, logger: Logger?) {
|
||||
self.init(
|
||||
host: envVars.host,
|
||||
port: envVars.port != nil ? Int(envVars.port!) : nil,
|
||||
identifier: envVars.identifier,
|
||||
eventLoopGroupProvider: .shared(eventLoopGroup),
|
||||
logger: logger,
|
||||
configuration: .init(
|
||||
version: .v3_1_1,
|
||||
disablePing: false,
|
||||
userName: envVars.userName,
|
||||
password: envVars.password
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private extension Array where Element == TemperatureAndHumiditySensor {
|
||||
static var live: Self {
|
||||
TemperatureAndHumiditySensor.Location.allCases.map { location in
|
||||
TemperatureAndHumiditySensor(location: location)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,73 +0,0 @@
|
||||
import Bootstrap
|
||||
import ClientLive
|
||||
import CoreUnitTypes
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import TopicsLive
|
||||
import Foundation
|
||||
|
||||
var logger: Logger = {
|
||||
var logger = Logger(label: "dewPoint-logger")
|
||||
logger.logLevel = .debug
|
||||
return logger
|
||||
}()
|
||||
|
||||
logger.info("Starting Swift Dew Point Controller!")
|
||||
|
||||
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
var environment = try bootstrap(eventLoopGroup: eventLoopGroup, logger: logger, autoConnect: false).wait()
|
||||
|
||||
// Set the log level to info only in production mode.
|
||||
if environment.envVars.appEnv == .production {
|
||||
logger.logLevel = .info
|
||||
}
|
||||
|
||||
// Set up the client, topics and state.
|
||||
environment.topics = .live
|
||||
let state = State()
|
||||
let client = Client.live(client: environment.mqttClient, state: state, topics: environment.topics)
|
||||
|
||||
defer {
|
||||
logger.debug("Disconnecting")
|
||||
}
|
||||
|
||||
// Add topic listeners.
|
||||
client.addListeners()
|
||||
|
||||
while true {
|
||||
if !environment.mqttClient.isActive() {
|
||||
logger.trace("Connecting to MQTT broker...")
|
||||
try client.connect().wait()
|
||||
try client.subscribe().wait()
|
||||
Thread.sleep(forTimeInterval: 1)
|
||||
}
|
||||
|
||||
// Check if sensors need processed.
|
||||
if state.sensors.needsProcessed {
|
||||
logger.debug("Sensor state has changed...")
|
||||
if state.sensors.mixedAirSensor.needsProcessed {
|
||||
logger.trace("Publishing mixed air sensor.")
|
||||
try client.publishSensor(.mixed(state.sensors.mixedAirSensor)).wait()
|
||||
}
|
||||
if state.sensors.postCoilSensor.needsProcessed {
|
||||
logger.trace("Publishing post coil sensor.")
|
||||
try client.publishSensor(.postCoil(state.sensors.postCoilSensor)).wait()
|
||||
}
|
||||
if state.sensors.returnAirSensor.needsProcessed {
|
||||
logger.trace("Publishing return air sensor.")
|
||||
try client.publishSensor(.return(state.sensors.returnAirSensor)).wait()
|
||||
}
|
||||
if state.sensors.supplyAirSensor.needsProcessed {
|
||||
logger.trace("Publishing supply air sensor.")
|
||||
try client.publishSensor(.supply(state.sensors.supplyAirSensor)).wait()
|
||||
}
|
||||
}
|
||||
|
||||
// logger.debug("Fetching dew point...")
|
||||
//
|
||||
// logger.debug("Published dew point...")
|
||||
|
||||
Thread.sleep(forTimeInterval: 5)
|
||||
}
|
||||
@@ -1,45 +0,0 @@
|
||||
import XCTest
|
||||
import EnvVars
|
||||
import Logging
|
||||
import Models
|
||||
@testable import ClientLive
|
||||
import Psychrometrics
|
||||
|
||||
final class AsyncClientTests: XCTestCase {
|
||||
|
||||
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
||||
|
||||
static let logger: Logger = {
|
||||
var logger = Logger(label: "AsyncClientTests")
|
||||
logger.logLevel = .trace
|
||||
return logger
|
||||
}()
|
||||
|
||||
func createClient(identifier: String) -> AsyncClient {
|
||||
let envVars = EnvVars.init(
|
||||
appEnv: .testing,
|
||||
host: Self.hostname,
|
||||
port: "1883",
|
||||
identifier: identifier,
|
||||
userName: nil,
|
||||
password: nil
|
||||
)
|
||||
return .init(envVars: envVars, logger: Self.logger)
|
||||
}
|
||||
|
||||
func testConnectAndShutdown() async throws {
|
||||
let client = createClient(identifier: "testConnectAndShutdown")
|
||||
await client.connect()
|
||||
await client.shutdown()
|
||||
}
|
||||
|
||||
func testPublishingSensor() async throws {
|
||||
let client = createClient(identifier: "testPublishingSensor")
|
||||
await client.connect()
|
||||
let topic = Topics().sensors.mixedAirSensor.dewPoint
|
||||
try await client.addPublishListener(topic: topic, decoding: Temperature.self)
|
||||
try await client.publishSensor(.mixed(.init(temperature: 71.123, humidity: 50.5, needsProcessed: true)))
|
||||
try await client.publishSensor(.mixed(.init(temperature: 72.123, humidity: 50.5, needsProcessed: true)))
|
||||
await client.shutdown()
|
||||
}
|
||||
}
|
||||
@@ -1,186 +0,0 @@
|
||||
import Client
|
||||
@testable import ClientLive
|
||||
import CoreUnitTypes
|
||||
import Foundation
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import NIOConcurrencyHelpers
|
||||
import XCTest
|
||||
|
||||
final class ClientLiveTests: XCTestCase {
|
||||
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
||||
let topics = Topics()
|
||||
|
||||
// func test_mqtt_subscription() throws {
|
||||
// let mqttClient = createMQTTClient(identifier: "test_subscription")
|
||||
// _ = try mqttClient.connect().wait()
|
||||
// let sub = try mqttClient.v5.subscribe(
|
||||
// to: [mqttClient.mqttSubscription(topic: "test/subscription")]
|
||||
// ).wait()
|
||||
// XCTAssertEqual(sub.reasons[0], .grantedQoS1)
|
||||
// try mqttClient.disconnect().wait()
|
||||
// try mqttClient.syncShutdownGracefully()
|
||||
// }
|
||||
|
||||
func test_mqtt_listener() throws {
|
||||
let lock = Lock()
|
||||
var publishRecieved: [MQTTPublishInfo] = []
|
||||
let payloadString = "test"
|
||||
let payload = ByteBufferAllocator().buffer(string: payloadString)
|
||||
|
||||
let client = self.createMQTTClient(identifier: "testMQTTListener_publisher")
|
||||
_ = try client.connect().wait()
|
||||
client.addPublishListener(named: "test") { result in
|
||||
switch result {
|
||||
case .success(let publish):
|
||||
var buffer = publish.payload
|
||||
let string = buffer.readString(length: buffer.readableBytes)
|
||||
XCTAssertEqual(string, payloadString)
|
||||
lock.withLock {
|
||||
publishRecieved.append(publish)
|
||||
}
|
||||
case .failure(let error):
|
||||
XCTFail("\(error)")
|
||||
}
|
||||
}
|
||||
|
||||
try client.publish(to: "testMQTTSubscribe", payload: payload, qos: .atLeastOnce, retain: true).wait()
|
||||
let sub = try client.v5.subscribe(to: [.init(topicFilter: "testMQTTSubscribe", qos: .atLeastOnce)]).wait()
|
||||
XCTAssertEqual(sub.reasons[0], .grantedQoS1)
|
||||
|
||||
Thread.sleep(forTimeInterval: 2)
|
||||
lock.withLock {
|
||||
XCTAssertEqual(publishRecieved.count, 1)
|
||||
}
|
||||
|
||||
try client.disconnect().wait()
|
||||
try client.syncShutdownGracefully()
|
||||
|
||||
}
|
||||
|
||||
func test_client2_returnTemperature_listener() throws {
|
||||
let mqttClient = createMQTTClient(identifier: "return-temperature-tests")
|
||||
let state = State()
|
||||
let topics = Topics()
|
||||
let client = Client.live(client: mqttClient, state: state, topics: topics)
|
||||
|
||||
client.addListeners()
|
||||
try client.connect().wait()
|
||||
try client.subscribe().wait()
|
||||
|
||||
_ = try mqttClient.publish(
|
||||
to: topics.sensors.returnAirSensor.temperature,
|
||||
payload: ByteBufferAllocator().buffer(string: "75.1234"),
|
||||
qos: .atLeastOnce
|
||||
).wait()
|
||||
|
||||
Thread.sleep(forTimeInterval: 2)
|
||||
|
||||
XCTAssertEqual(state.sensors.returnAirSensor.temperature, .celsius(75.1234))
|
||||
try mqttClient.disconnect().wait()
|
||||
try mqttClient.syncShutdownGracefully()
|
||||
|
||||
// try client.shutdown().wait()
|
||||
}
|
||||
|
||||
func test_client2_returnSensor_publish() throws {
|
||||
let mqttClient = createMQTTClient(identifier: "return-temperature-tests")
|
||||
let state = State()
|
||||
let topics = Topics()
|
||||
let client = Client.live(client: mqttClient, state: state, topics: topics)
|
||||
|
||||
client.addListeners()
|
||||
try client.connect().wait()
|
||||
try client.subscribe().wait()
|
||||
|
||||
_ = try mqttClient.publish(
|
||||
to: topics.sensors.returnAirSensor.temperature,
|
||||
payload: ByteBufferAllocator().buffer(string: "75.1234"),
|
||||
qos: .atLeastOnce
|
||||
).wait()
|
||||
|
||||
_ = try mqttClient.publish(
|
||||
to: topics.sensors.returnAirSensor.humidity,
|
||||
payload: ByteBufferAllocator().buffer(string: "\(50.0)"),
|
||||
qos: .atLeastOnce
|
||||
).wait()
|
||||
|
||||
Thread.sleep(forTimeInterval: 2)
|
||||
XCTAssert(state.sensors.returnAirSensor.needsProcessed)
|
||||
|
||||
try client.publishSensor(.return(state.sensors.returnAirSensor)).wait()
|
||||
XCTAssertFalse(state.sensors.returnAirSensor.needsProcessed)
|
||||
|
||||
try mqttClient.disconnect().wait()
|
||||
try mqttClient.syncShutdownGracefully()
|
||||
|
||||
// try client.shutdown().wait()
|
||||
}
|
||||
|
||||
// func test_fetch_humidity() throws {
|
||||
// let lock = Lock()
|
||||
// let publishClient = createMQTTClient(identifier: "publishHumidity")
|
||||
// let mqttClient = createMQTTClient(identifier: "fetchHumidity")
|
||||
// _ = try publishClient.connect().wait()
|
||||
// let client = try createClient(mqttClient: mqttClient)
|
||||
// var humidityRecieved: [RelativeHumidity] = []
|
||||
//
|
||||
// _ = try publishClient.publish(
|
||||
// to: topics.sensors.humidity,
|
||||
// payload: ByteBufferAllocator().buffer(string: "\(50.0)"),
|
||||
// qos: .atLeastOnce
|
||||
// ).wait()
|
||||
//
|
||||
// Thread.sleep(forTimeInterval: 2)
|
||||
// try publishClient.disconnect().wait()
|
||||
// let humidity = try client.fetchHumidity(.init(topic: self.topics.sensors.humidity)).wait()
|
||||
// XCTAssertEqual(humidity, 50)
|
||||
// Thread.sleep(forTimeInterval: 2)
|
||||
// lock.withLock {
|
||||
// humidityRecieved.append(humidity)
|
||||
// }
|
||||
// try mqttClient.disconnect().wait()
|
||||
// try mqttClient.syncShutdownGracefully()
|
||||
// }
|
||||
|
||||
// MARK: - Helpers
|
||||
func createMQTTClient(identifier: String) -> MQTTNIO.MQTTClient {
|
||||
MQTTNIO.MQTTClient(
|
||||
host: Self.hostname,
|
||||
port: 1883,
|
||||
identifier: identifier,
|
||||
eventLoopGroupProvider: .shared(eventLoopGroup),
|
||||
logger: self.logger,
|
||||
configuration: .init(version: .v5_0)
|
||||
)
|
||||
}
|
||||
|
||||
// func createWebSocketClient(identifier: String) -> MQTTNIO.MQTTClient {
|
||||
// MQTTNIO.MQTTClient(
|
||||
// host: Self.hostname,
|
||||
// port: 8080,
|
||||
// identifier: identifier,
|
||||
// eventLoopGroupProvider: .createNew,
|
||||
// logger: self.logger,
|
||||
// configuration: .init(useWebSockets: true, webSocketURLPath: "/mqtt")
|
||||
// )
|
||||
// }
|
||||
|
||||
// Uses default topic names.
|
||||
// func createClient(mqttClient: MQTTNIO.MQTTClient, autoConnect: Bool = true) throws -> Client.MQTTClient {
|
||||
// if autoConnect {
|
||||
// _ = try mqttClient.connect().wait()
|
||||
// }
|
||||
// return .live(client: mqttClient, topics: .init())
|
||||
// }
|
||||
|
||||
let logger: Logger = {
|
||||
var logger = Logger(label: "MQTTTests")
|
||||
logger.logLevel = .trace
|
||||
return logger
|
||||
}()
|
||||
|
||||
let eventLoopGroup = MultiThreadedEventLoopGroup.init(numberOfThreads: 1)
|
||||
}
|
||||
@@ -0,0 +1,122 @@
|
||||
import Combine
|
||||
import Logging
|
||||
import Models
|
||||
@testable import MQTTConnectionService
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import ServiceLifecycle
|
||||
import ServiceLifecycleTestKit
|
||||
import XCTest
|
||||
|
||||
final class MQTTConnectionServiceTests: XCTestCase {
|
||||
|
||||
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
||||
|
||||
static let logger: Logger = {
|
||||
var logger = Logger(label: "AsyncClientTests")
|
||||
logger.logLevel = .trace
|
||||
return logger
|
||||
}()
|
||||
|
||||
func testGracefulShutdownWorks() async throws {
|
||||
try await testGracefulShutdown { trigger in
|
||||
let client = createClient(identifier: "testGracefulShutdown")
|
||||
let service = MQTTConnectionService(client: client)
|
||||
try await service.run()
|
||||
try await Task.sleep(for: .seconds(1))
|
||||
XCTAssert(client.isActive())
|
||||
trigger.triggerGracefulShutdown()
|
||||
// try await Task.sleep(for: .seconds(2))
|
||||
// XCTAssertFalse(client.isActive())
|
||||
}
|
||||
}
|
||||
|
||||
func createClient(identifier: String) -> MQTTClient {
|
||||
let envVars = EnvVars(
|
||||
appEnv: .testing,
|
||||
host: Self.hostname,
|
||||
port: "1883",
|
||||
identifier: identifier,
|
||||
userName: nil,
|
||||
password: nil
|
||||
)
|
||||
let config = MQTTClient.Configuration(
|
||||
version: .v3_1_1,
|
||||
userName: envVars.userName,
|
||||
password: envVars.password,
|
||||
useSSL: false,
|
||||
useWebSockets: false,
|
||||
tlsConfiguration: nil,
|
||||
webSocketURLPath: nil
|
||||
)
|
||||
return .init(
|
||||
host: Self.hostname,
|
||||
identifier: identifier,
|
||||
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
|
||||
logger: Self.logger,
|
||||
configuration: config
|
||||
)
|
||||
}
|
||||
|
||||
func testEventStream() async throws {
|
||||
var connection: ConnectionStream? = ConnectionStream()
|
||||
|
||||
let task = Task {
|
||||
guard let events = connection?.events else { return }
|
||||
print("before loop")
|
||||
for await event in events {
|
||||
print("\(event)")
|
||||
}
|
||||
print("after loop")
|
||||
}
|
||||
|
||||
let ending = Task {
|
||||
try await Task.sleep(for: .seconds(2))
|
||||
connection = nil
|
||||
}
|
||||
|
||||
connection?.start()
|
||||
try await ending.value
|
||||
task.cancel()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ConnectionStream {
|
||||
|
||||
enum Event {
|
||||
case connected
|
||||
case disconnected
|
||||
case shuttingDown
|
||||
}
|
||||
|
||||
let events: AsyncStream<Event>
|
||||
private let continuation: AsyncStream<Event>.Continuation
|
||||
private var cancellable: AnyCancellable?
|
||||
|
||||
init() {
|
||||
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
|
||||
self.events = stream
|
||||
self.continuation = continuation
|
||||
}
|
||||
|
||||
deinit {
|
||||
print("connection stream is gone.")
|
||||
stop()
|
||||
}
|
||||
|
||||
func start() {
|
||||
cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
|
||||
.autoconnect()
|
||||
.sink { [weak self] _ in
|
||||
print("will send event.")
|
||||
self?.continuation.yield(.connected)
|
||||
}
|
||||
}
|
||||
|
||||
func stop() {
|
||||
continuation.yield(.shuttingDown)
|
||||
cancellable = nil
|
||||
continuation.finish()
|
||||
}
|
||||
}
|
||||
298
Tests/SensorsServiceTests/SensorsClientTests.swift
Executable file
298
Tests/SensorsServiceTests/SensorsClientTests.swift
Executable file
@@ -0,0 +1,298 @@
|
||||
import Dependencies
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import PsychrometricClientLive
|
||||
@testable import SensorsService
|
||||
import XCTest
|
||||
|
||||
final class SensorsClientTests: XCTestCase {
|
||||
|
||||
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
||||
|
||||
static let logger: Logger = {
|
||||
var logger = Logger(label: "AsyncClientTests")
|
||||
logger.logLevel = .debug
|
||||
return logger
|
||||
}()
|
||||
|
||||
override func invokeTest() {
|
||||
withDependencies {
|
||||
$0.psychrometricClient = PsychrometricClient.liveValue
|
||||
} operation: {
|
||||
super.invokeTest()
|
||||
}
|
||||
}
|
||||
|
||||
// func createClient(identifier: String) -> SensorsClient {
|
||||
// let envVars = EnvVars(
|
||||
// appEnv: .testing,
|
||||
// host: Self.hostname,
|
||||
// port: "1883",
|
||||
// identifier: identifier,
|
||||
// userName: nil,
|
||||
// password: nil
|
||||
// )
|
||||
// return .init(envVars: envVars, logger: Self.logger)
|
||||
// }
|
||||
func createClient(identifier: String) -> MQTTClient {
|
||||
let envVars = EnvVars(
|
||||
appEnv: .testing,
|
||||
host: Self.hostname,
|
||||
port: "1883",
|
||||
identifier: identifier,
|
||||
userName: nil,
|
||||
password: nil
|
||||
)
|
||||
let config = MQTTClient.Configuration(
|
||||
version: .v3_1_1,
|
||||
userName: envVars.userName,
|
||||
password: envVars.password,
|
||||
useSSL: false,
|
||||
useWebSockets: false,
|
||||
tlsConfiguration: nil,
|
||||
webSocketURLPath: nil
|
||||
)
|
||||
return .init(
|
||||
host: Self.hostname,
|
||||
identifier: identifier,
|
||||
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
|
||||
logger: Self.logger,
|
||||
configuration: config
|
||||
)
|
||||
}
|
||||
|
||||
// func testConnectAndShutdown() async throws {
|
||||
// let client = createClient(identifier: "testConnectAndShutdown")
|
||||
// await client.connect()
|
||||
// await client.shutdown()
|
||||
// }
|
||||
|
||||
// func testSensorService() async throws {
|
||||
// let mqtt = createClient(identifier: "testSensorService")
|
||||
// // let mqtt = await client.client
|
||||
// let sensor = TemperatureAndHumiditySensor(location: .mixedAir)
|
||||
// let publishInfo = PublishInfoContainer(topicFilters: [
|
||||
// sensor.topics.dewPoint,
|
||||
// sensor.topics.enthalpy
|
||||
// ])
|
||||
// let service = SensorsService(client: mqtt, sensors: [sensor])
|
||||
//
|
||||
// // fix to connect the mqtt client.
|
||||
// try await mqtt.connect()
|
||||
// let task = Task { try await service.run() }
|
||||
//
|
||||
// _ = try await mqtt.subscribe(to: [
|
||||
// MQTTSubscribeInfo(topicFilter: sensor.topics.dewPoint, qos: .exactlyOnce),
|
||||
// MQTTSubscribeInfo(topicFilter: sensor.topics.enthalpy, qos: .exactlyOnce)
|
||||
// ])
|
||||
//
|
||||
// let listener = mqtt.createPublishListener()
|
||||
// Task {
|
||||
// for await result in listener {
|
||||
// switch result {
|
||||
// case let .failure(error):
|
||||
// XCTFail("\(error)")
|
||||
// case let .success(value):
|
||||
// await publishInfo.addPublishInfo(value)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// try await mqtt.publish(
|
||||
// to: sensor.topics.temperature,
|
||||
// payload: ByteBufferAllocator().buffer(string: "75.123"),
|
||||
// qos: MQTTQoS.exactlyOnce,
|
||||
// retain: true
|
||||
// )
|
||||
//
|
||||
// try await Task.sleep(for: .seconds(1))
|
||||
//
|
||||
// // XCTAssert(client.sensors.first!.needsProcessed)
|
||||
// // let firstSensor = await client.sensors.first!
|
||||
// // XCTAssertEqual(firstSensor.temperature, .init(75.123, units: .celsius))
|
||||
//
|
||||
// try await mqtt.publish(
|
||||
// to: sensor.topics.humidity,
|
||||
// payload: ByteBufferAllocator().buffer(string: "50"),
|
||||
// qos: MQTTQoS.exactlyOnce,
|
||||
// retain: true
|
||||
// )
|
||||
//
|
||||
// try await Task.sleep(for: .seconds(1))
|
||||
//
|
||||
// // not working for some reason
|
||||
// // XCTAssertEqual(publishInfo.info.count, 2)
|
||||
//
|
||||
// XCTAssert(publishInfo.info.count > 1)
|
||||
//
|
||||
// // fix to shutdown the mqtt client.
|
||||
// task.cancel()
|
||||
// try await mqtt.shutdown()
|
||||
// }
|
||||
|
||||
func testCapturingSensorClient() async throws {
|
||||
class CapturedValues {
|
||||
var values = [(value: Double, topic: String)]()
|
||||
var didShutdown = false
|
||||
|
||||
init() {}
|
||||
}
|
||||
|
||||
let capturedValues = CapturedValues()
|
||||
|
||||
try await withDependencies {
|
||||
$0.sensorsClient = .testing(
|
||||
yielding: [
|
||||
(value: 76, to: "not-listening"),
|
||||
(value: 75, to: "test")
|
||||
]
|
||||
) { value, topic in
|
||||
capturedValues.values.append((value, topic))
|
||||
} captureShutdownEvent: {
|
||||
capturedValues.didShutdown = $0
|
||||
}
|
||||
} operation: {
|
||||
@Dependency(\.sensorsClient) var client
|
||||
let stream = try await client.listen(to: ["test"])
|
||||
|
||||
for await value in stream {
|
||||
var buffer = value.payload
|
||||
guard let double = Double(buffer: &buffer) else {
|
||||
XCTFail("Failed to decode double")
|
||||
return
|
||||
}
|
||||
|
||||
XCTAssertEqual(double, 75)
|
||||
XCTAssertEqual(value.topicName, "test")
|
||||
try await client.publish(26, to: "publish")
|
||||
try await Task.sleep(for: .milliseconds(100))
|
||||
client.shutdown()
|
||||
}
|
||||
|
||||
XCTAssertEqual(capturedValues.values.count, 1)
|
||||
XCTAssertEqual(capturedValues.values.first?.value, 26)
|
||||
XCTAssertEqual(capturedValues.values.first?.topic, "publish")
|
||||
XCTAssertTrue(capturedValues.didShutdown)
|
||||
}
|
||||
}
|
||||
|
||||
// func testSensorCapturesPublishedState() async throws {
|
||||
// let client = createClient(identifier: "testSensorCapturesPublishedState")
|
||||
// let mqtt = client.client
|
||||
// let sensor = TemperatureAndHumiditySensor(location: .mixedAir)
|
||||
// let publishInfo = PublishInfoContainer(topicFilters: [
|
||||
// sensor.topics.dewPoint,
|
||||
// sensor.topics.enthalpy
|
||||
// ])
|
||||
//
|
||||
// try await client.addSensor(sensor)
|
||||
// await client.connect()
|
||||
// try await client.start()
|
||||
//
|
||||
// _ = try await mqtt.subscribe(to: [
|
||||
// MQTTSubscribeInfo(topicFilter: sensor.topics.dewPoint, qos: MQTTQoS.exactlyOnce),
|
||||
// MQTTSubscribeInfo(topicFilter: sensor.topics.enthalpy, qos: MQTTQoS.exactlyOnce)
|
||||
// ])
|
||||
//
|
||||
// let listener = mqtt.createPublishListener()
|
||||
// Task {
|
||||
// for await result in listener {
|
||||
// switch result {
|
||||
// case let .failure(error):
|
||||
// XCTFail("\(error)")
|
||||
// case let .success(value):
|
||||
// await publishInfo.addPublishInfo(value)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// try await mqtt.publish(
|
||||
// to: sensor.topics.temperature,
|
||||
// payload: ByteBufferAllocator().buffer(string: "75.123"),
|
||||
// qos: MQTTQoS.exactlyOnce,
|
||||
// retain: true
|
||||
// )
|
||||
//
|
||||
// try await Task.sleep(for: .seconds(1))
|
||||
//
|
||||
// // XCTAssert(client.sensors.first!.needsProcessed)
|
||||
// let firstSensor = client.sensors.first!
|
||||
// XCTAssertEqual(firstSensor.temperature, DryBulb.celsius(75.123))
|
||||
//
|
||||
// try await mqtt.publish(
|
||||
// to: sensor.topics.humidity,
|
||||
// payload: ByteBufferAllocator().buffer(string: "50"),
|
||||
// qos: MQTTQoS.exactlyOnce,
|
||||
// retain: true
|
||||
// )
|
||||
//
|
||||
// try await Task.sleep(for: .seconds(1))
|
||||
//
|
||||
// XCTAssertEqual(publishInfo.info.count, 2)
|
||||
//
|
||||
// await client.shutdown()
|
||||
// }
|
||||
}
|
||||
|
||||
// MARK: Helpers for tests.
|
||||
|
||||
class PublishInfoContainer {
|
||||
private(set) var info: [MQTTPublishInfo]
|
||||
private var topicFilters: [String]?
|
||||
|
||||
init(topicFilters: [String]? = nil) {
|
||||
self.info = []
|
||||
self.topicFilters = topicFilters
|
||||
}
|
||||
|
||||
func addPublishInfo(_ info: MQTTPublishInfo) async {
|
||||
guard let topicFilters else {
|
||||
self.info.append(info)
|
||||
return
|
||||
}
|
||||
if topicFilters.contains(info.topicName) {
|
||||
self.info.append(info)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension SensorsClient {
|
||||
|
||||
static func testing(
|
||||
yielding: [(value: Double, to: String)],
|
||||
capturePublishedValues: @escaping (Double, String) -> Void,
|
||||
captureShutdownEvent: @escaping (Bool) -> Void
|
||||
) -> Self {
|
||||
let (stream, continuation) = AsyncStream.makeStream(of: MQTTPublishInfo.self)
|
||||
let logger = Logger(label: "\(Self.self).testing")
|
||||
|
||||
return .init(
|
||||
listen: { topics in
|
||||
for (value, topic) in yielding where topics.contains(topic) {
|
||||
continuation.yield(
|
||||
MQTTPublishInfo(
|
||||
qos: .atLeastOnce,
|
||||
retain: true,
|
||||
topicName: topic,
|
||||
payload: ByteBuffer(string: "\(value)"),
|
||||
properties: MQTTProperties()
|
||||
)
|
||||
)
|
||||
}
|
||||
return stream
|
||||
},
|
||||
logger: logger,
|
||||
publish: { value, topic in
|
||||
capturePublishedValues(value, topic)
|
||||
},
|
||||
shutdown: {
|
||||
captureShutdownEvent(true)
|
||||
continuation.finish()
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
struct TopicNotFoundError: Error {}
|
||||
0
Tests/dewPoint-controllerTests/dewPoint_controllerTests.swift
Normal file → Executable file
0
Tests/dewPoint-controllerTests/dewPoint_controllerTests.swift
Normal file → Executable file
22
docker-compose.yaml
Normal file → Executable file
22
docker-compose.yaml
Normal file → Executable file
@@ -1,20 +1,29 @@
|
||||
# run this with docker-compose -f docker/docker-compose.yml run test
|
||||
# run this with docker-compose run test
|
||||
services:
|
||||
server:
|
||||
image: swift-mqtt-dewpoint:latest
|
||||
restart: unless-stopped
|
||||
env_file: .env
|
||||
|
||||
local:
|
||||
container_name: local-server
|
||||
build:
|
||||
context: .
|
||||
dockerfile: Dockerfile
|
||||
platform: linux/amd64
|
||||
depends_on:
|
||||
- mosquitto
|
||||
environment:
|
||||
- MOSQUITTO_SERVER=mosquitto
|
||||
|
||||
test:
|
||||
image: swift:latest
|
||||
#build:
|
||||
#context: ./
|
||||
build:
|
||||
context: .
|
||||
dockerfile: Dockerfile.test
|
||||
platform: linux/amd64
|
||||
working_dir: /app
|
||||
networks:
|
||||
- test
|
||||
volumes:
|
||||
- .:/app
|
||||
depends_on:
|
||||
- mosquitto-test
|
||||
environment:
|
||||
@@ -44,4 +53,3 @@ networks:
|
||||
test:
|
||||
driver: bridge
|
||||
external: false
|
||||
|
||||
|
||||
0
mosquitto/config/mosquitto.conf
Normal file → Executable file
0
mosquitto/config/mosquitto.conf
Normal file → Executable file
Reference in New Issue
Block a user