83 Commits

Author SHA1 Message Date
c84427a9b3 feat: Renaming and moves some items around, listeners now manage reconnection events.
All checks were successful
CI / Run Tests (push) Successful in 4m16s
2024-11-15 17:15:01 -05:00
947472f62d feat: Minimal readme and cleans up docker files.
All checks were successful
CI / Run Tests (push) Successful in 27m31s
2024-11-14 22:21:02 -05:00
d16135dd50 feat: Minimal readme and cleans up docker files.
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-14 22:20:25 -05:00
19e97652fd feat: Minimal readme and cleans up docker files.
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-14 22:19:26 -05:00
1089452212 feat: Minimal readme and cleans up docker files.
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-14 22:17:08 -05:00
5e998a60d0 feat: Minimal readme and cleans up docker files.
Some checks failed
CI / Run Tests (push) Failing after 13s
2024-11-14 22:12:20 -05:00
9e2af22a36 feat: Fix CI
All checks were successful
CI / Run Tests (push) Successful in 15m25s
2024-11-14 21:36:56 -05:00
89f3601c2c fix: Fixes sensor service test that was flaky and moves docker stuff into it's own directory.
Some checks failed
CI / Run Tests (push) Failing after 12s
2024-11-14 21:34:48 -05:00
d4b6f6ad2b fix: Fixes sensor service test that was flaky and moves docker stuff into it's own directory.
Some checks failed
CI / Run Tests (push) Failing after 12s
2024-11-14 21:24:23 -05:00
ec3cd40fef feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3m57s
2024-11-14 21:00:01 -05:00
953c9d5b7c feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3m57s
2024-11-14 20:54:00 -05:00
00bb6ca1a6 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3m40s
2024-11-14 20:45:50 -05:00
41fb3c5715 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 12s
2024-11-14 20:44:31 -05:00
8e4430804c feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 4m31s
2024-11-14 20:02:50 -05:00
a8f689136d feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 20:01:45 -05:00
2607be6658 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 20:00:24 -05:00
b05e18b258 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:59:45 -05:00
394b49d1a0 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m14s
2024-11-14 19:55:19 -05:00
6bec0d6fa5 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m33s
2024-11-14 19:52:28 -05:00
63d65bd7cd feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 10s
2024-11-14 19:51:25 -05:00
320f3e792e feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:49:04 -05:00
74b73e7534 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:47:43 -05:00
7954fc5dcd feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 10s
2024-11-14 19:45:40 -05:00
115c4dc252 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:43:07 -05:00
853a157ae7 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:35:26 -05:00
30b8ea3661 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:32:47 -05:00
d26ab714ab feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 10s
2024-11-14 19:30:20 -05:00
b45ad76fff feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 12s
2024-11-14 19:24:58 -05:00
c4395b9089 feat: Run CI
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-14 19:00:22 -05:00
b3874b96c5 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 8m52s
2024-11-14 18:41:37 -05:00
4024bb624f feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3s
2024-11-14 18:39:10 -05:00
6371ffed47 fix: Fixing naming of dewpoint-controller, part 2.
Some checks failed
CI / Run Tests (push) Failing after 2m35s
2024-11-14 18:10:52 -05:00
76b06e86fa fix: Fixing naming of dewpoint-controller, part 2.
Some checks failed
CI / Run Tests (push) Failing after 1m41s
2024-11-14 18:01:48 -05:00
fccfa4d006 fix: Fixing naming of dewpoint-controller, part 1. 2024-11-14 18:01:18 -05:00
5df08d6c91 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m40s
2024-11-14 17:55:40 -05:00
1c99e4861d feat: Run CI
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-14 17:48:07 -05:00
a0b7053eae feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 33s
2024-11-14 17:41:42 -05:00
df3ed6a407 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 12s
2024-11-14 17:40:47 -05:00
1d9d8dc449 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m6s
2024-11-14 17:17:45 -05:00
9a53d36f4c feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 54s
2024-11-14 17:16:00 -05:00
44a6a878eb feat: Run CI
All checks were successful
CI / Run Tests (push) Successful in 33s
2024-11-14 17:12:34 -05:00
c13a1a14a3 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m39s
2024-11-14 17:09:31 -05:00
6c916215ea feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m37s
2024-11-14 17:02:35 -05:00
be7442c06a feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 8s
2024-11-14 17:00:41 -05:00
26a30c2a07 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 9s
2024-11-14 16:59:39 -05:00
5f131d8fa2 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 8s
2024-11-14 16:58:22 -05:00
d6e217f556 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 8s
2024-11-14 16:55:52 -05:00
b39ccafc92 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 9s
2024-11-14 16:54:04 -05:00
8336c56adf feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 14s
2024-11-14 16:41:24 -05:00
fac8945386 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 36s
2024-11-14 16:38:42 -05:00
5b319cae9b feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 7s
2024-11-14 16:37:35 -05:00
ca7024cb60 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3s
2024-11-14 16:32:48 -05:00
ce327a6f1c feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3s
2024-11-14 15:11:29 -05:00
95f8565cde feat: Adds initial CI
Some checks failed
CI / Test (push) Failing after 39s
2024-11-14 15:05:09 -05:00
163f603b69 feat: Fixes some tests and docker builds 2024-11-14 14:58:09 -05:00
e7a849b003 feat: Adding more tests 2024-11-14 07:43:40 -05:00
bd2a798320 feat: Seperates connection stream and moves connection manager out of the connection service module. 2024-11-13 17:12:56 -05:00
b8992b89b6 feat: Adds MQTT connection stream, need to clean up the manager and remove stream from it. 2024-11-13 10:06:28 -05:00
efd9907b4a feat: Cleans up some of the shutdown logic so that the MQTTClient is disconnected properly. 2024-11-12 22:19:09 -05:00
fbbd65f7ae feat: Cleaning up some unused code. 2024-11-12 21:18:02 -05:00
8067331ff8 feat: Removes sensor client in favor of more generic topic listener and publisher 2024-11-12 16:42:14 -05:00
b6db9b5322 feat: Begins breaking out topic listener and publisher as it's own dependency. 2024-11-12 11:12:34 -05:00
bf1126b06a feat: Adds MQTTConnectionManagerLive module. 2024-11-11 22:00:14 -05:00
ef552fb8bc feat: Removes unused files 2024-11-11 16:28:11 -05:00
1e62d7aac0 feat: Adds sensor client dependency 2024-11-11 15:23:45 -05:00
f68ac528e4 feat: Working on sensor client dependency 2024-11-10 22:27:59 -05:00
10294801fc feat: Working on sensor client dependency 2024-11-10 21:16:56 -05:00
a65605e9e7 feat: Adds graceful shutdown to services 2024-11-10 01:30:22 -05:00
320a733d12 feat: Removing old libraries 2024-11-09 12:15:59 -05:00
936dd0b816 feat: Fixes tests for sensor service since using newer psychrometrics version. 2024-11-09 11:51:20 -05:00
a87addaf0b feat: Updates to newer psychrometrics package. Not yet a working example. 2024-11-09 11:35:30 -05:00
e2683d3f06 feat: Adds MQTTConnectionServiceTests 2024-11-09 10:30:25 -05:00
6c5115dcde feat: Adds MQTTConnectionServiceTests 2024-11-09 09:42:03 -05:00
90c5b7c77f feat: Adds MQTTConnectionService 2024-11-09 09:03:31 -05:00
79bb162434 feat: Updates for Sendable conformance 2024-11-09 01:16:57 -05:00
529b9b0bc5 feat: Begins removing old interfaces and renaming 2024-11-09 00:17:28 -05:00
48d51419d7 feat: Adds SensorsService 2024-11-08 23:52:01 -05:00
adc7fc1295 feat: Working on async integrations. 2024-11-08 17:14:22 -05:00
f40c4ef859 feat: Working on async integrations. 2024-11-08 13:35:46 -05:00
e6d1d4578d feat: Working on async integrations. 2024-11-08 11:05:07 -05:00
408e0484cd feat: Begins more work on async integration 2024-11-07 17:10:21 -05:00
19b2eb42c5 feat: Commit due to moving to nas 2024-11-07 14:31:02 -05:00
7122fc818b feat: Commit due to moving to nas 2024-11-07 10:17:13 -05:00
53 changed files with 1725 additions and 1687 deletions

0
.dockerignore Normal file → Executable file
View File

7
.editorconfig Normal file
View File

@@ -0,0 +1,7 @@
root = true
[*.swift]
indent_style = space
indent_size = 2
tab_width = 2
trim_trailing_whitespace = true

21
.gitea/workflows/ci.yaml Normal file
View File

@@ -0,0 +1,21 @@
---
name: CI
on:
push:
pull_request:
jobs:
test:
name: Run Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Setup Docker buildx
uses: docker/setup-buildx-action@v3
- name: Run Test
run: make test-docker
- name: Cleanup.
if: always()
run: docker compose --file docker/docker-compose-test.yaml down

5
.github/workflows/release.yml vendored Normal file → Executable file
View File

@@ -35,10 +35,6 @@ jobs:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker-setup-buildx-action@v2
# This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels.
- name: Extract metadata (tags, labels) for Docker
id: meta
@@ -60,7 +56,6 @@ jobs:
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
platforms: linux/amd64,linux/arm64
# This step generates an artifact attestation for the image, which is an unforgeable statement about where and how it was built. It increases supply chain security for people who consume the image. For more information, see "[AUTOTITLE](/actions/security-guides/using-artifact-attestations-to-establish-provenance-for-builds)."
- name: Generate artifact attestation

1
.gitignore vendored Normal file → Executable file
View File

@@ -9,3 +9,4 @@ DerivedData/
.topics
mqtt_password.txt
.env
.smbdelete*

11
.swiftformat Normal file
View File

@@ -0,0 +1,11 @@
--self init-only
--indent 2
--ifdef indent
--trimwhitespace always
--wraparguments before-first
--wrapparameters before-first
--wrapcollections preserve
--wrapconditions after-first
--typeblanklines preserve
--commas inline
--stripunusedargs closure-only

View File

0
.swiftpm/xcode/xcshareddata/xcschemes/Client.xcscheme Normal file → Executable file
View File

View File

View File

0
.swiftpm/xcode/xcshareddata/xcschemes/EnvVars.xcscheme Normal file → Executable file
View File

0
.swiftpm/xcode/xcshareddata/xcschemes/Models.xcscheme Normal file → Executable file
View File

View File

@@ -146,6 +146,20 @@
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "SensorsService"
BuildableName = "SensorsService"
BlueprintName = "SensorsService"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
</BuildActionEntries>
</BuildAction>
<TestAction
@@ -174,6 +188,26 @@
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "MQTTConnectionServiceTests"
BuildableName = "MQTTConnectionServiceTests"
BlueprintName = "MQTTConnectionServiceTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "SensorsServiceTests"
BuildableName = "SensorsServiceTests"
BlueprintName = "SensorsServiceTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
</Testables>
</TestAction>
<LaunchAction

View File

0
Bootstrap/dewPoint-env-example Normal file → Executable file
View File

0
Bootstrap/topics-example Normal file → Executable file
View File

View File

@@ -1,14 +0,0 @@
# Build the executable
FROM swift:5.10 AS build
WORKDIR /build
COPY ./Package.* ./
RUN swift package resolve
COPY . .
RUN swift build --enable-test-discovery -c release -Xswiftc -g
# Run image
FROM swift:5.10-slim
WORKDIR /run
COPY --from=build /build/.build/release/dewPoint-controller /run
CMD ["/bin/bash", "-xc", "./dewPoint-controller"]

18
Makefile Normal file → Executable file
View File

@@ -8,17 +8,15 @@ bootstrap-topics:
bootstrap: bootstrap-env bootstrap-topics
build:
@swift build
@swift build -Xswiftc -strict-concurrency=complete
clean:
rm -rf .build
run:
@swift run dewPoint-controller
start-mosquitto:
@docker-compose start mosquitto
stop-mosquitto:
@docker-compose rm -f mosquitto || true
@swift run dewpoint-controller
test-docker:
@docker-compose run -i test
@docker-compose kill mosquitto-test
@docker compose --file docker/docker-compose-test.yaml \
run --build --remove-orphans -i --rm test
@docker compose --file docker/docker-compose-test.yaml down

223
Package.resolved Normal file → Executable file
View File

@@ -1,61 +1,168 @@
{
"object": {
"pins": [
{
"package": "mqtt-nio",
"repositoryURL": "https://github.com/swift-server-community/mqtt-nio.git",
"state": {
"branch": null,
"revision": "ca8af7a30c4690456ce7de276cd0f037489ba707",
"version": "2.5.3"
}
},
{
"package": "swift-log",
"repositoryURL": "https://github.com/apple/swift-log.git",
"state": {
"branch": null,
"revision": "5d66f7ba25daf4f94100e7022febf3c75e37a6c7",
"version": "1.4.2"
}
},
{
"package": "swift-nio",
"repositoryURL": "https://github.com/apple/swift-nio",
"state": {
"branch": null,
"revision": "6aa9347d9bc5bbfe6a84983aec955c17ffea96ef",
"version": "2.33.0"
}
},
{
"package": "swift-nio-ssl",
"repositoryURL": "https://github.com/apple/swift-nio-ssl.git",
"state": {
"branch": null,
"revision": "b5260a31c2a72a89fa684f5efb3054d8725a2316",
"version": "2.18.0"
}
},
{
"package": "swift-nio-transport-services",
"repositoryURL": "https://github.com/apple/swift-nio-transport-services.git",
"state": {
"branch": null,
"revision": "8ab824b140d0ebcd87e9149266ddc353e3705a3e",
"version": "1.11.4"
}
},
{
"package": "swift-psychrometrics",
"repositoryURL": "https://github.com/swift-psychrometrics/swift-psychrometrics",
"state": {
"branch": null,
"revision": "03573545c3750b406921eb22a9575c8062beef88",
"version": "0.1.2"
}
"originHash" : "33fdcea7245de36c7e638047a16bba6605bc9bac0117aab7cb9397289a33214e",
"pins" : [
{
"identity" : "combine-schedulers",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/combine-schedulers",
"state" : {
"revision" : "9fa31f4403da54855f1e2aeaeff478f4f0e40b13",
"version" : "1.0.2"
}
]
},
"version": 1
},
{
"identity" : "mqtt-nio",
"kind" : "remoteSourceControl",
"location" : "https://github.com/swift-server-community/mqtt-nio.git",
"state" : {
"revision" : "267b83ab5690d463ff00585a4fd6dc54b698e1d2",
"version" : "2.11.0"
}
},
{
"identity" : "swift-async-algorithms",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-async-algorithms.git",
"state" : {
"revision" : "5c8bd186f48c16af0775972700626f0b74588278",
"version" : "1.0.2"
}
},
{
"identity" : "swift-atomics",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-atomics.git",
"state" : {
"revision" : "cd142fd2f64be2100422d658e7411e39489da985",
"version" : "1.2.0"
}
},
{
"identity" : "swift-clocks",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-clocks",
"state" : {
"revision" : "b9b24b69e2adda099a1fa381cda1eeec272d5b53",
"version" : "1.0.5"
}
},
{
"identity" : "swift-collections",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-collections.git",
"state" : {
"revision" : "671108c96644956dddcd89dd59c203dcdb36cec7",
"version" : "1.1.4"
}
},
{
"identity" : "swift-concurrency-extras",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-concurrency-extras",
"state" : {
"revision" : "163409ef7dae9d960b87f34b51587b6609a76c1f",
"version" : "1.3.0"
}
},
{
"identity" : "swift-dependencies",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-dependencies",
"state" : {
"revision" : "96eecd47660e8307877acb8c41cc5295ba7350a7",
"version" : "1.5.2"
}
},
{
"identity" : "swift-log",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-log.git",
"state" : {
"revision" : "9cb486020ebf03bfa5b5df985387a14a98744537",
"version" : "1.6.1"
}
},
{
"identity" : "swift-nio",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio",
"state" : {
"revision" : "914081701062b11e3bb9e21accc379822621995e",
"version" : "2.76.1"
}
},
{
"identity" : "swift-nio-ssl",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio-ssl.git",
"state" : {
"revision" : "b5260a31c2a72a89fa684f5efb3054d8725a2316",
"version" : "2.18.0"
}
},
{
"identity" : "swift-nio-transport-services",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio-transport-services.git",
"state" : {
"revision" : "8ab824b140d0ebcd87e9149266ddc353e3705a3e",
"version" : "1.11.4"
}
},
{
"identity" : "swift-psychrometrics",
"kind" : "remoteSourceControl",
"location" : "https://github.com/swift-psychrometrics/swift-psychrometrics",
"state" : {
"revision" : "6a457f3cefd9477f7aa76b2fb8ad557988c447bd",
"version" : "0.2.3"
}
},
{
"identity" : "swift-service-lifecycle",
"kind" : "remoteSourceControl",
"location" : "https://github.com/swift-server/swift-service-lifecycle.git",
"state" : {
"revision" : "f70b838872863396a25694d8b19fe58bcd0b7903",
"version" : "2.6.2"
}
},
{
"identity" : "swift-syntax",
"kind" : "remoteSourceControl",
"location" : "https://github.com/swiftlang/swift-syntax",
"state" : {
"revision" : "0687f71944021d616d34d922343dcef086855920",
"version" : "600.0.1"
}
},
{
"identity" : "swift-system",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-system.git",
"state" : {
"revision" : "c8a44d836fe7913603e246acab7c528c2e780168",
"version" : "1.4.0"
}
},
{
"identity" : "swift-tagged",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-tagged",
"state" : {
"revision" : "3907a9438f5b57d317001dc99f3f11b46882272b",
"version" : "0.10.0"
}
},
{
"identity" : "xctest-dynamic-overlay",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/xctest-dynamic-overlay",
"state" : {
"revision" : "770f990d3e4eececb57ac04a6076e22f8c97daeb",
"version" : "1.4.2"
}
}
],
"version" : 3
}

125
Package.swift Normal file → Executable file
View File

@@ -2,99 +2,98 @@
import PackageDescription
let swiftSettings: [SwiftSetting] = [
.enableExperimentalFeature("StrictConcurrency"),
.enableUpcomingFeature("InferSendableCaptures")
]
let package = Package(
name: "dewPoint-controller",
name: "dewpoint-controller",
platforms: [
.macOS(.v12)
.macOS(.v14)
],
products: [
.executable(name: "dewPoint-controller", targets: ["dewPoint-controller"]),
.library(name: "Bootstrap", targets: ["Bootstrap"]),
.library(name: "DewPointEnvironment", targets: ["DewPointEnvironment"]),
.library(name: "EnvVars", targets: ["EnvVars"]),
.executable(name: "dewpoint-controller", targets: ["DewPointController"]),
.library(name: "Models", targets: ["Models"]),
.library(name: "Client", targets: ["Client"]),
.library(name: "ClientLive", targets: ["ClientLive"]),
.library(name: "MQTTManager", targets: ["MQTTManager"]),
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
.library(name: "SensorsService", targets: ["SensorsService"])
],
dependencies: [
.package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"),
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
.package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", from: "0.1.0")
.package(url: "https://github.com/apple/swift-log", from: "1.6.0"),
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.5.2"),
.package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", exact: "0.2.3"),
.package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"),
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.3.0")
],
targets: [
.executableTarget(
name: "dewPoint-controller",
name: "DewPointController",
dependencies: [
"Bootstrap",
"ClientLive",
"TopicsLive",
.product(name: "MQTTNIO", package: "mqtt-nio"),
.product(name: "NIO", package: "swift-nio")
]
),
.testTarget(
name: "dewPoint-controllerTests",
dependencies: ["dewPoint-controller"]
),
.target(
name: "Bootstrap",
dependencies: [
"DewPointEnvironment",
"EnvVars",
"ClientLive",
"Models",
"MQTTManager",
"MQTTConnectionService",
"SensorsService",
.product(name: "MQTTNIO", package: "mqtt-nio"),
.product(name: "NIO", package: "swift-nio")
.product(name: "NIO", package: "swift-nio"),
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
]
),
.target(
name: "DewPointEnvironment",
dependencies: [
"EnvVars",
"Client",
"Models",
.product(name: "MQTTNIO", package: "mqtt-nio"),
]
),
.target(
name: "EnvVars",
dependencies: []
),
.target(
name: "Models",
dependencies: [
.product(name: "Psychrometrics", package: "swift-psychrometrics"),
]
.product(name: "Logging", package: "swift-log"),
.product(name: "PsychrometricClient", package: "swift-psychrometrics")
],
swiftSettings: swiftSettings
),
.target(
name: "Client",
name: "MQTTManager",
dependencies: [
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio")
],
swiftSettings: swiftSettings
),
.target(
name: "MQTTConnectionService",
dependencies: [
"Models",
.product(name: "CoreUnitTypes", package: "swift-psychrometrics"),
.product(name: "NIO", package: "swift-nio"),
.product(name: "Psychrometrics", package: "swift-psychrometrics")
]
),
.target(
name: "ClientLive",
dependencies: [
"Client",
"EnvVars",
.product(name: "MQTTNIO", package: "mqtt-nio")
]
"MQTTManager",
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
],
swiftSettings: swiftSettings
),
.testTarget(
name: "ClientTests",
name: "MQTTConnectionServiceTests",
dependencies: [
"Client",
"ClientLive"
"MQTTConnectionService",
"MQTTManager",
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
]
),
.target(
name: "TopicsLive",
name: "SensorsService",
dependencies: [
"Models"
]
"Models",
"MQTTManager",
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio"),
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
],
swiftSettings: swiftSettings
),
.testTarget(
name: "SensorsServiceTests",
dependencies: [
"SensorsService",
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
]
)
]
)

8
README.md Normal file → Executable file
View File

@@ -1,3 +1,7 @@
# dewPoint-controller
# dewpoint-controller
A description of this package.
![CI](https://git.housh.dev/michael/swift-mqtt-dewpoint/actions/workflows/ci.yaml/badge.svg?branch=main)
Listens to an MQTT broker for temperature and humidity sensors and calculates
the dew-point temperature and enthalpy for the sensor, then publishes those back
to the MQTT broker.

View File

@@ -1,167 +0,0 @@
import ClientLive
import DewPointEnvironment
import EnvVars
import Logging
import Foundation
import Models
import MQTTNIO
import NIO
/// Sets up the application environment and connections required.
///
/// - Parameters:
/// - eventLoopGroup: The event loop group for the application.
/// - logger: An optional logger for debugging.
/// - autoConnect: A flag whether to auto-connect to the MQTT broker or not.
public func bootstrap(
eventLoopGroup: EventLoopGroup,
logger: Logger? = nil,
autoConnect: Bool = true
) -> EventLoopFuture<DewPointEnvironment> {
logger?.debug("Bootstrapping Dew Point Controller...")
return loadEnvVars(eventLoopGroup: eventLoopGroup, logger: logger)
.and(loadTopics(eventLoopGroup: eventLoopGroup, logger: logger))
.makeDewPointEnvironment(eventLoopGroup: eventLoopGroup, logger: logger)
.connectToMQTTBroker(autoConnect: autoConnect, logger: logger)
}
/// Loads the ``EnvVars`` either using the defualts, from a file in the root directory under `.dewPoint-env` or in the shell / application environment.
///
/// - Parameters:
/// - eventLoopGroup: The event loop group for the application.
/// - logger: An optional logger for debugging.
private func loadEnvVars(
eventLoopGroup: EventLoopGroup,
logger: Logger?
) -> EventLoopFuture<EnvVars> {
logger?.debug("Loading env vars...")
// TODO: Need to have the env file path passed in / dynamic.
let envFilePath = URL(fileURLWithPath: #file)
.deletingLastPathComponent()
.deletingLastPathComponent()
.deletingLastPathComponent()
.appendingPathComponent(".dewPoint-env")
let decoder = JSONDecoder()
let encoder = JSONEncoder()
let defaultEnvVars = EnvVars()
let defaultEnvDict = (try? encoder.encode(defaultEnvVars))
.flatMap { try? decoder.decode([String: String].self, from: $0) }
?? [:]
// Read from file `.dewPoint-env` file if it exists.
let localEnvVarsDict = (try? Data(contentsOf: envFilePath))
.flatMap { try? decoder.decode([String: String].self, from: $0) }
?? [:]
// Merge with variables in the shell environment.
let envVarsDict = defaultEnvDict
.merging(localEnvVarsDict, uniquingKeysWith: { $1 })
.merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 })
// Produces the final env vars from the merged items or uses defaults if something
// went wrong.
let envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict))
.flatMap { try? decoder.decode(EnvVars.self, from: $0) }
?? defaultEnvVars
logger?.debug("Done loading env vars...")
return eventLoopGroup.next().makeSucceededFuture(envVars)
}
// MARK: TODO perhaps make loading from file an option passed in when app is launched.
/// Load the topics from file in application root directory at `.topics`, if available or fall back to the defualt.
///
/// - Parameters:
/// - eventLoopGroup: The event loop group for the application.
/// - logger: An optional logger for debugging.
private func loadTopics(eventLoopGroup: EventLoopGroup, logger: Logger?) -> EventLoopFuture<Topics> {
logger?.debug("Loading topics from file...")
let topicsFilePath = URL(fileURLWithPath: #file)
.deletingLastPathComponent()
.deletingLastPathComponent()
.deletingLastPathComponent()
.appendingPathComponent(".topics")
let decoder = JSONDecoder()
// Attempt to load the topics from file in root directory.
let localTopics = (try? Data.init(contentsOf: topicsFilePath))
.flatMap { try? decoder.decode(Topics.self, from: $0) }
logger?.debug(
localTopics == nil
? "Failed to load topics from file, falling back to defaults."
: "Done loading topics from file."
)
// If we were able to load from file use that, else fallback to the defaults.
return eventLoopGroup.next().makeSucceededFuture(localTopics ?? .init())
}
extension EventLoopFuture where Value == (EnvVars, Topics) {
/// Creates the ``DewPointEnvironment`` for the application after the ``EnvVars`` have been loaded.
///
/// - Parameters:
/// - eventLoopGroup: The event loop group for the application.
/// - logger: An optional logger for the application.
fileprivate func makeDewPointEnvironment(
eventLoopGroup: EventLoopGroup,
logger: Logger?
) -> EventLoopFuture<DewPointEnvironment> {
map { envVars, topics in
let mqttClient = MQTTClient(envVars: envVars, eventLoopGroup: eventLoopGroup, logger: logger)
return DewPointEnvironment.init(
envVars: envVars,
mqttClient: mqttClient,
topics: topics
)
}
}
}
extension EventLoopFuture where Value == DewPointEnvironment {
/// Connects to the MQTT broker after the ``DewPointEnvironment`` has been setup.
///
/// - Parameters:
/// - logger: An optional logger for debugging.
fileprivate func connectToMQTTBroker(autoConnect: Bool, logger: Logger?) -> EventLoopFuture<DewPointEnvironment> {
guard autoConnect else { return self }
return flatMap { environment in
logger?.debug("Connecting to MQTT Broker...")
return environment.mqttClient.connect()
.map { _ in
logger?.debug("Successfully connected to MQTT Broker...")
return environment
}
}
}
}
extension MQTTNIO.MQTTClient {
fileprivate convenience init(envVars: EnvVars, eventLoopGroup: EventLoopGroup, logger: Logger?) {
self.init(
host: envVars.host,
port: envVars.port != nil ? Int(envVars.port!) : nil,
identifier: envVars.identifier,
eventLoopGroupProvider: .shared(eventLoopGroup),
logger: logger,
configuration: .init(
version: .v5_0,
userName: envVars.userName,
password: envVars.password
)
)
}
}

View File

@@ -1,44 +0,0 @@
import CoreUnitTypes
import Logging
import Foundation
import Models
import NIO
import Psychrometrics
public struct Client {
/// Add the publish listeners to the MQTT Broker, to be notified of published changes.
public var addListeners: () -> Void
/// Connect to the MQTT Broker.
public var connect: () -> EventLoopFuture<Void>
public var publishSensor: (SensorPublishRequest) -> EventLoopFuture<Void>
/// Subscribe to appropriate topics / events.
public var subscribe: () -> EventLoopFuture<Void>
/// Disconnect and close the connection to the MQTT Broker.
public var shutdown: () -> EventLoopFuture<Void>
public init(
addListeners: @escaping () -> Void,
connect: @escaping () -> EventLoopFuture<Void>,
publishSensor: @escaping (SensorPublishRequest) -> EventLoopFuture<Void>,
shutdown: @escaping () -> EventLoopFuture<Void>,
subscribe: @escaping () -> EventLoopFuture<Void>
) {
self.addListeners = addListeners
self.connect = connect
self.publishSensor = publishSensor
self.shutdown = shutdown
self.subscribe = subscribe
}
public enum SensorPublishRequest {
case mixed(State.Sensors.TemperatureHumiditySensor<State.Sensors.Mixed>)
case postCoil(State.Sensors.TemperatureHumiditySensor<State.Sensors.PostCoil>)
case `return`(State.Sensors.TemperatureHumiditySensor<State.Sensors.Return>)
case supply(State.Sensors.TemperatureHumiditySensor<State.Sensors.Supply>)
}
}

View File

@@ -1,261 +0,0 @@
import CoreUnitTypes
import Logging
import Models
import MQTTNIO
import NIO
import NIOFoundationCompat
import Psychrometrics
/// Represents a type that can be initialized by a ``ByteBuffer``.
protocol BufferInitalizable {
init?(buffer: inout ByteBuffer)
}
extension Double: BufferInitalizable {
/// Attempt to create / parse a double from a byte buffer.
init?(buffer: inout ByteBuffer) {
guard let string = buffer.readString(
length: buffer.readableBytes,
encoding: String.Encoding.utf8
)
else { return nil }
self.init(string)
}
}
extension Temperature: BufferInitalizable {
/// Attempt to create / parse a temperature from a byte buffer.
init?(buffer: inout ByteBuffer) {
guard let value = Double(buffer: &buffer) else { return nil }
self.init(value, units: .celsius)
}
}
extension RelativeHumidity: BufferInitalizable {
/// Attempt to create / parse a relative humidity from a byte buffer.
init?(buffer: inout ByteBuffer) {
guard let value = Double(buffer: &buffer) else { return nil }
self.init(value)
}
}
extension MQTTNIO.MQTTClient {
/// Logs a failure for a given topic and error.
func logFailure(topic: String, error: Error) {
logger.error("\(topic): \(error)")
}
}
extension Result where Success == MQTTPublishInfo {
func logIfFailure(client: MQTTNIO.MQTTClient, topic: String) -> ByteBuffer? {
switch self {
case let .success(value):
guard value.topicName == topic else { return nil }
return value.payload
case let .failure(error):
client.logFailure(topic: topic, error: error)
return nil
}
}
}
extension Optional where Wrapped == ByteBuffer {
func parse<T>(as type: T.Type) -> T? where T: BufferInitalizable {
switch self {
case var .some(buffer):
return T.init(buffer: &buffer)
case .none:
return nil
}
}
}
fileprivate struct TemperatureAndHumiditySensorKeyPathEnvelope {
let humidityTopic: KeyPath<Topics.Sensors, String>
let temperatureTopic: KeyPath<Topics.Sensors, String>
let temperatureState: WritableKeyPath<State.Sensors, Temperature?>
let humidityState: WritableKeyPath<State.Sensors, RelativeHumidity?>
func addListener(to client: MQTTNIO.MQTTClient, topics: Topics, state: State) {
let temperatureTopic = topics.sensors[keyPath: temperatureTopic]
client.logger.trace("Adding listener for topic: \(temperatureTopic)")
client.addPublishListener(named: temperatureTopic) { result in
result.logIfFailure(client: client, topic: temperatureTopic)
.parse(as: Temperature.self)
.map { temperature in
state.sensors[keyPath: temperatureState] = temperature
}
}
let humidityTopic = topics.sensors[keyPath: humidityTopic]
client.logger.trace("Adding listener for topic: \(humidityTopic)")
client.addPublishListener(named: humidityTopic) { result in
result.logIfFailure(client: client, topic: humidityTopic)
.parse(as: RelativeHumidity.self)
.map { humidity in
state.sensors[keyPath: humidityState] = humidity
}
}
}
}
extension Array where Element == TemperatureAndHumiditySensorKeyPathEnvelope {
func addListeners(to client: MQTTNIO.MQTTClient, topics: Topics, state: State) {
_ = self.map { envelope in
envelope.addListener(to: client, topics: topics, state: state)
}
}
}
extension Array where Element == MQTTSubscribeInfo {
static func sensors(topics: Topics) -> Self {
[
.init(topicFilter: topics.sensors.mixedAirSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.mixedAirSensor.humidity, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.postCoilSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.postCoilSensor.humidity, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.returnAirSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.returnAirSensor.humidity, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.supplyAirSensor.temperature, qos: .atLeastOnce),
.init(topicFilter: topics.sensors.supplyAirSensor.humidity, qos: .atLeastOnce),
]
}
}
extension State {
func addSensorListeners(to client: MQTTNIO.MQTTClient, topics: Topics) {
let envelopes: [TemperatureAndHumiditySensorKeyPathEnvelope] = [
.init(
humidityTopic: \.mixedAirSensor.humidity,
temperatureTopic: \.mixedAirSensor.temperature,
temperatureState: \.mixedAirSensor.temperature,
humidityState: \.mixedAirSensor.humidity
),
.init(
humidityTopic: \.postCoilSensor.humidity,
temperatureTopic: \.postCoilSensor.temperature,
temperatureState: \.postCoilSensor.temperature,
humidityState: \.postCoilSensor.humidity
),
.init(
humidityTopic: \.returnAirSensor.humidity,
temperatureTopic: \.returnAirSensor.temperature,
temperatureState: \.returnAirSensor.temperature,
humidityState: \.returnAirSensor.humidity
),
.init(
humidityTopic: \.supplyAirSensor.humidity,
temperatureTopic: \.supplyAirSensor.temperature,
temperatureState: \.supplyAirSensor.temperature,
humidityState: \.supplyAirSensor.humidity
),
]
envelopes.addListeners(to: client, topics: topics, state: self)
}
}
extension Client.SensorPublishRequest {
func dewPointData(topics: Topics, units: PsychrometricEnvironment.Units?) -> (DewPoint, String)? {
switch self {
case let .mixed(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.mixedAirSensor.dewPoint)
case let .postCoil(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.postCoilSensor.dewPoint)
case let .return(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.returnAirSensor.dewPoint)
case let .supply(sensor):
guard let dp = sensor.dewPoint(units: units) else { return nil }
return (dp, topics.sensors.supplyAirSensor.dewPoint)
}
}
func enthalpyData(altitude: Length, topics: Topics, units: PsychrometricEnvironment.Units?) -> (EnthalpyOf<MoistAir>, String)? {
switch self {
case let .mixed(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.mixedAirSensor.enthalpy)
case let .postCoil(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.postCoilSensor.enthalpy)
case let .return(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.returnAirSensor.enthalpy)
case let .supply(sensor):
guard let enthalpy = sensor.enthalpy(altitude: altitude, units: units) else { return nil }
return (enthalpy, topics.sensors.supplyAirSensor.enthalpy)
}
}
func setHasProcessed(state: State) {
switch self {
case .mixed:
state.sensors.mixedAirSensor.needsProcessed = false
case .postCoil:
state.sensors.postCoilSensor.needsProcessed = false
case .return:
state.sensors.returnAirSensor.needsProcessed = false
case .supply:
state.sensors.supplyAirSensor.needsProcessed = false
}
}
}
extension MQTTNIO.MQTTClient {
func publishDewPoint(
request: Client.SensorPublishRequest,
state: State,
topics: Topics
) -> EventLoopFuture<(MQTTNIO.MQTTClient, Client.SensorPublishRequest, State, Topics)> {
guard let (dewPoint, topic) = request.dewPointData(topics: topics, units: state.units)
else {
logger.trace("No dew point for sensor.")
return eventLoopGroup.next().makeSucceededFuture((self, request, state, topics))
}
let roundedDewPoint = round(dewPoint.rawValue * 100) / 100
logger.debug("Publishing dew-point: \(dewPoint), to: \(topic)")
return publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(roundedDewPoint)"),
qos: .atLeastOnce,
retain: true
)
.map { (self, request, state, topics) }
}
}
extension EventLoopFuture where Value == (Client.SensorPublishRequest, State) {
func setHasProcessed() -> EventLoopFuture<Void> {
map { request, state in
request.setHasProcessed(state: state)
}
}
}
extension EventLoopFuture where Value == (MQTTNIO.MQTTClient, Client.SensorPublishRequest, State, Topics) {
func publishEnthalpy() -> EventLoopFuture<(Client.SensorPublishRequest, State)> {
flatMap { client, request, state, topics in
guard let (enthalpy, topic) = request.enthalpyData(altitude: state.altitude, topics: topics, units: state.units)
else {
client.logger.trace("No enthalpy for sensor.")
return client.eventLoopGroup.next().makeSucceededFuture((request, state))
}
let roundedEnthalpy = round(enthalpy.rawValue * 100) / 100
client.logger.debug("Publishing enthalpy: \(enthalpy), to: \(topic)")
return client.publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(roundedEnthalpy)"),
qos: .atLeastOnce
)
.map { (request, state) }
}
}
}

View File

@@ -1,158 +0,0 @@
import Foundation
@_exported import Client
import CoreUnitTypes
import Models
import MQTTNIO
import NIO
import Psychrometrics
extension Client {
// The state passed in here needs to be a class or we get escaping errors in the `addListeners` method.
public static func live(
client: MQTTNIO.MQTTClient,
state: State,
topics: Topics
) -> Self {
.init(
addListeners: {
state.addSensorListeners(to: client, topics: topics)
},
connect: {
client.connect()
.map { _ in }
},
publishSensor: { request in
client.publishDewPoint(request: request, state: state, topics: topics)
.publishEnthalpy()
.setHasProcessed()
},
shutdown: {
client.disconnect()
.map { try? client.syncShutdownGracefully() }
},
subscribe: {
// Sensor subscriptions
client.subscribe(to: .sensors(topics: topics))
.map { _ in }
}
)
}
}
import Logging
import NIOTransportServices
import EnvVars
public class AsyncClient {
//public static let eventLoopGroup = NIOTSEventLoopGroup()
public static let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
public let client: MQTTClient
public private(set) var shuttingDown: Bool
var logger: Logger { client.logger }
public init(envVars: EnvVars, logger: Logger) {
let config = MQTTClient.Configuration.init(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
self.client = .init(
host: envVars.host,
identifier: envVars.identifier,
eventLoopGroupProvider: .shared(Self.eventLoopGroup),
logger: logger,
configuration: config
)
self.shuttingDown = false
}
public func connect() async {
do {
try await self.client.connect()
self.client.addCloseListener(named: "AsyncClient") { [self] result in
guard !self.shuttingDown else { return }
Task {
self.logger.debug("Connection closed.")
self.logger.debug("Reconnecting...")
await self.connect()
}
}
logger.debug("Connection successful.")
} catch {
logger.trace("Connection Failed.\n\(error)")
}
}
public func shutdown() async {
self.shuttingDown = true
try? await self.client.disconnect()
try? await self.client.shutdown()
}
func addSensorListeners() async {
}
// Need to save the recieved values somewhere.
func addPublishListener<T>(
topic: String,
decoding: T.Type
) async throws where T: BufferInitalizable {
_ = try await self.client.subscribe(to: [.init(topicFilter: topic, qos: .atLeastOnce)])
Task {
let listener = self.client.createPublishListener()
for await result in listener {
switch result {
case let .success(packet):
var buffer = packet.payload
guard let value = T.init(buffer: &buffer) else {
logger.debug("Could not decode buffer: \(buffer)")
return
}
logger.debug("Recieved value: \(value)")
case let .failure(error):
logger.trace("Error:\n\(error)")
}
}
}
}
private func publish(string: String, to topic: String) async throws {
try await self.client.publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: string),
qos: .atLeastOnce
)
}
private func publish(double: Double, to topic: String) async throws {
let rounded = round(double * 100) / 100
try await publish(string: "\(rounded)", to: topic)
}
func publishDewPoint(_ request: Client.SensorPublishRequest) async throws {
// fix
guard let (dewPoint, topic) = request.dewPointData(topics: .init(), units: nil) else { return }
try await self.publish(double: dewPoint.rawValue, to: topic)
logger.debug("Published dewpoint: \(dewPoint.rawValue), to: \(topic)")
}
func publishEnthalpy(_ request: Client.SensorPublishRequest) async throws {
// fix
guard let (enthalpy, topic) = request.enthalpyData(altitude: .seaLevel, topics: .init(), units: nil) else { return }
try await self.publish(double: enthalpy.rawValue, to: topic)
logger.debug("Publihsed enthalpy: \(enthalpy.rawValue), to: \(topic)")
}
public func publishSensor(_ request: Client.SensorPublishRequest) async throws {
try await publishDewPoint(request)
try await publishEnthalpy(request)
}
}

View File

@@ -0,0 +1,116 @@
import Dependencies
import Foundation
import Logging
import Models
import MQTTConnectionService
import MQTTManager
import MQTTNIO
import NIO
import PsychrometricClientLive
import SensorsService
import ServiceLifecycle
@main
struct Application {
/// The main entry point of the application.
static func main() async throws {
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
var logger = Logger(label: "dewpoint-controller")
logger.logLevel = .trace
logger.info("Starting dewpoint-controller!")
let environment = loadEnvVars(logger: logger)
if environment.appEnv == .production {
logger.debug("Updating logging level to info.")
logger.logLevel = .info
}
let mqtt = MQTTClient(
envVars: environment,
eventLoopGroup: eventloopGroup,
logger: logger
)
do {
try await withDependencies {
$0.psychrometricClient = .liveValue
$0.mqtt = .live(client: mqtt, logger: logger)
} operation: {
let mqttConnection = MQTTConnectionService(logger: logger)
let sensors = SensorsService(sensors: .live, logger: logger)
var serviceGroupConfiguration = ServiceGroupConfiguration(
services: [
mqttConnection,
sensors
],
gracefulShutdownSignals: [.sigterm, .sigint],
logger: logger
)
serviceGroupConfiguration.maximumCancellationDuration = .seconds(5)
serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10)
let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
try await serviceGroup.run()
}
try await mqtt.shutdown()
try await eventloopGroup.shutdownGracefully()
} catch {
try await eventloopGroup.shutdownGracefully()
}
}
}
// MARK: - Helpers
private func loadEnvVars(logger: Logger) -> EnvVars {
let defaultEnvVars = EnvVars()
let encoder = JSONEncoder()
let decoder = JSONDecoder()
let defaultEnvDict = (try? encoder.encode(defaultEnvVars))
.flatMap { try? decoder.decode([String: String].self, from: $0) }
?? [:]
let envVarsDict = defaultEnvDict
.merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 })
let envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict))
.flatMap { try? decoder.decode(EnvVars.self, from: $0) }
?? defaultEnvVars
logger.debug("Done loading EnvVars...")
return envVars
}
private extension MQTTNIO.MQTTClient {
convenience init(envVars: EnvVars, eventLoopGroup: EventLoopGroup, logger: Logger?) {
self.init(
host: envVars.host,
port: envVars.port != nil ? Int(envVars.port!) : nil,
identifier: envVars.identifier,
eventLoopGroupProvider: .shared(eventLoopGroup),
logger: logger,
configuration: .init(
version: .v3_1_1,
disablePing: false,
userName: envVars.userName,
password: envVars.password
)
)
}
}
private extension Array where Element == TemperatureAndHumiditySensor {
static var live: Self {
TemperatureAndHumiditySensor.Location.allCases.map { location in
TemperatureAndHumiditySensor(location: location)
}
}
}

View File

@@ -1,21 +0,0 @@
import Client
import EnvVars
import Models
import MQTTNIO
public struct DewPointEnvironment {
public var envVars: EnvVars
public var mqttClient: MQTTNIO.MQTTClient
public var topics: Topics
public init(
envVars: EnvVars,
mqttClient: MQTTNIO.MQTTClient,
topics: Topics = .init()
) {
self.envVars = envVars
self.mqttClient = mqttClient
self.topics = topics
}
}

View File

@@ -0,0 +1,37 @@
import Dependencies
import Logging
import Models
import MQTTManager
import ServiceLifecycle
public struct MQTTConnectionService: Service {
@Dependency(\.mqtt) var mqtt
private let logger: Logger?
public init(
logger: Logger? = nil
) {
self.logger = logger
}
/// The entry-point of the service which starts the connection
/// to the MQTT broker and handles graceful shutdown of the
/// connection.
public func run() async throws {
try await withGracefulShutdownHandler {
try await mqtt.connect()
for await event in try mqtt.connectionStream().cancelOnGracefulShutdown() {
// We don't really need to do anything with the events, so just logging
// for now. But we need to iterate on an async stream for the service to
// continue to run and handle graceful shutdowns.
logger?.trace("Received connection event: \(event)")
}
// when we reach here we are shutting down, so we shutdown
// the manager.
mqtt.shutdown()
} onGracefulShutdown: {
self.logger?.trace("Received graceful shutdown.")
}
}
}

View File

@@ -0,0 +1,197 @@
import AsyncAlgorithms
import Dependencies
import DependenciesMacros
import Foundation
import Logging
import MQTTNIO
import NIO
public extension DependencyValues {
/// A dependency that is responsible for managing the connection to
/// an MQTT broker.
var mqtt: MQTTManager {
get { self[MQTTManager.self] }
set { self[MQTTManager.self] = newValue }
}
}
/// Represents the interface needed for the ``MQTTConnectionService``.
///
/// See ``MQTTConnectionManagerLive`` module for live implementation.
@DependencyClient
public struct MQTTManager: Sendable {
public typealias ListenStream = AsyncStream<MQTTPublishInfo>
/// Connect to the MQTT broker.
public var connect: @Sendable () async throws -> Void
/// Create a stream of connection events.
public var connectionStream: @Sendable () throws -> AsyncStream<Event>
private var _listen: @Sendable ([String], MQTTQoS) async throws -> ListenStream
/// Publish a value to the MQTT broker for a given topic.
public var publish: @Sendable (PublishRequest) async throws -> Void
/// Shutdown the connection to the MQTT broker.
public var shutdown: @Sendable () -> Void
/// Perform an operation with the underlying MQTTClient, this can be useful in
/// tests, so this module needs imported with `@_spi(Testing) import` to use this method.
private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
to topics: [String],
qos: MQTTQoS = .atLeastOnce
) async throws -> ListenStream {
try await _listen(topics, qos)
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
_ topics: String...,
qos: MQTTQoS = .atLeastOnce
) async throws -> ListenStream {
try await listen(to: topics, qos: qos)
}
/// Publish a new value to the given topic.
///
/// - Parameters:
/// - payload: The value to publish.
/// - topicName: The topic to publish the new value to.
/// - qos: The MQTTQoS.
/// - retain: The retain flag.
public func publish(
_ payload: ByteBuffer,
to topicName: String,
qos: MQTTQoS,
retain: Bool = false
) async throws {
try await publish(.init(
topicName: topicName,
payload: payload,
qos: qos,
retain: retain
))
}
/// Perform an operation with the underlying MQTTClient, this can be useful in
/// tests, so this module needs imported with `@_spi(Testing) import` to use this method.
@_spi(Internal)
public func withClient(
_ callback: @Sendable (MQTTClient) async throws -> Void
) async throws {
try await _withClient(callback)
}
/// Represents connection events that clients can listen for and
/// react accordingly.
public enum Event: Sendable {
case connected
case disconnected
case shuttingDown
}
/// Represents the parameters required to publish a new value to the
/// MQTT broker.
public struct PublishRequest: Equatable, Sendable {
/// The topic to publish the new value to.
public let topicName: String
/// The value to publish.
public let payload: ByteBuffer
/// The qos of the request.
public let qos: MQTTQoS
/// The retain flag for the request.
public let retain: Bool
/// Create a new publish request.
///
/// - Parameters:
/// - topicName: The topic to publish to.
/// - payload: The value to publish.
/// - qos: The qos of the request.
/// - retain: The retain flag of the request.
public init(
topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool
) {
self.topicName = topicName
self.payload = payload
self.qos = qos
self.retain = retain
}
}
}
public extension MQTTManager {
/// Create the live manager.
///
static func live(
client: MQTTClient,
cleanSession: Bool = false,
logger: Logger? = nil,
alwaysReconnect: Bool = true
) -> Self {
let manager = ConnectionManager(
client: client,
logger: logger,
alwaysReconnect: alwaysReconnect
)
return .init(
connect: { try await manager.connect(cleanSession: cleanSession) },
connectionStream: {
MQTTConnectionStream(client: client, logger: logger)
.start()
.removeDuplicates()
.eraseToStream()
},
_listen: { topics, qos in
try await manager.listen(to: topics, qos: qos)
},
publish: { request in
let topic = request.topicName
guard client.isActive() else {
logger?.debug("Client is not active, unable to publish to topic: \(topic)")
return
}
logger?.trace("Begin publishing to topic: \(topic)")
defer { logger?.trace("Done publishing to topic: \(topic)") }
try await client.publish(
to: request.topicName,
payload: request.payload,
qos: request.qos,
retain: request.retain
)
},
shutdown: {
manager.shutdown()
},
_withClient: { callback in
try await callback(client)
}
)
}
}
extension MQTTManager: TestDependencyKey {
public static let testValue: MQTTManager = Self()
}

View File

@@ -0,0 +1,98 @@
import Foundation
import Logging
import MQTTNIO
actor ConnectionManager {
private let client: MQTTClient
private let logger: Logger?
private let name: String
private let shouldReconnect: Bool
private var hasConnected: Bool = false
private var listeners: [TopicListenerStream] = []
private var isShuttingDown = false
init(
client: MQTTClient,
logger: Logger?,
alwaysReconnect: Bool
) {
var logger = logger
logger?[metadataKey: "instance"] = "\(Self.self)"
self.logger = logger
self.client = client
self.name = UUID().uuidString
self.shouldReconnect = alwaysReconnect
}
deinit {
if !isShuttingDown {
let message = """
Did not properly close the connection manager. This can lead to
dangling references.
Please call `shutdown` to properly close all connections and listener streams.
"""
logger?.warning("\(message)")
self.shutdown()
}
}
private func setHasConnected() {
hasConnected = true
}
func listen(
to topics: [String],
qos: MQTTQoS
) async throws -> MQTTManager.ListenStream {
let listener = TopicListenerStream(client: client, logger: logger, topics: topics, qos: qos)
listeners.append(listener)
await listener.start()
return listener.stream
}
func connect(
cleanSession: Bool
) async throws {
guard !hasConnected else { return }
do {
try await client.connect(cleanSession: cleanSession)
setHasConnected()
client.addCloseListener(named: name) { [weak self] _ in
guard let `self` else { return }
self.logger?.debug("Connection closed.")
if self.shouldReconnect {
self.logger?.debug("Reconnecting...")
Task {
try await self.connect(cleanSession: cleanSession)
}
}
}
client.addShutdownListener(named: name) { _ in
self.shutdown()
}
} catch {
logger?.trace("Failed to connect: \(error)")
throw error
}
}
private func shutdownListeners() {
_ = listeners.map { $0.shutdown() }
listeners = []
isShuttingDown = true
}
nonisolated func shutdown(withLogging: Bool = true) {
if withLogging {
logger?.trace("Shutting down connection.")
}
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
Task { await shutdownListeners() }
}
}

View File

@@ -0,0 +1,74 @@
import Foundation
import Logging
import MQTTNIO
@_spi(Internal)
public actor MQTTConnectionStream: Sendable {
public typealias Element = MQTTManager.Event
private let client: MQTTClient
private let continuation: AsyncStream<Element>.Continuation
private let logger: Logger?
nonisolated let name: String
private let stream: AsyncStream<Element>
private var isShuttingDown = false
public init(client: MQTTClient, logger: Logger?) {
var logger = logger
logger?[metadataKey: "type"] = "\(Self.self)"
self.logger = logger
let (stream, continuation) = AsyncStream<Element>.makeStream()
self.client = client
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
}
deinit { stop() }
public nonisolated func start() -> AsyncStream<Element> {
// Check if the client is active and yield the initial result.
continuation.yield(client.isActive() ? .connected : .disconnected)
// Continually check if the client is active.
let task = Task {
let isShuttingDown = await self.isShuttingDown
while !Task.isCancelled, !isShuttingDown {
try await Task.sleep(for: .milliseconds(100))
continuation.yield(client.isActive() ? .connected : .disconnected)
}
}
// Register listener on the client for when the connection
// closes.
client.addCloseListener(named: name) { _ in
self.logger?.trace("Client has disconnected.")
self.continuation.yield(.disconnected)
}
// Register listener on the client for when the client
// is shutdown.
client.addShutdownListener(named: name) { _ in
self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)")
self.continuation.yield(.shuttingDown)
Task { await self.setIsShuttingDown() }
task.cancel()
self.stop()
}
return stream
}
private func setIsShuttingDown() {
isShuttingDown = true
}
public nonisolated func stop() {
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
continuation.finish()
}
}

View File

@@ -0,0 +1,177 @@
import Foundation
import Logging
import MQTTNIO
actor TopicListenerStream {
typealias Stream = MQTTManager.ListenStream
private let client: MQTTClient
private let configuration: Configuration
private let continuation: Stream.Continuation
private let logger: Logger?
private let name: String
let stream: Stream
private var shuttingDown: Bool = false
private var onShutdownHandler: (@Sendable () -> Void)?
init(
client: MQTTClient,
logger: Logger?,
topics: [String],
qos: MQTTQoS
) {
// Setup the logger so we can more easily decipher log messages.
var logger = logger
logger?[metadataKey: "type"] = "\(Self.self)"
self.logger = logger
let (stream, continuation) = Stream.makeStream()
self.client = client
self.configuration = .init(qos: qos, topics: topics)
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
}
struct Configuration: Sendable {
let qos: MQTTQoS
let topics: [String]
}
deinit {
if !shuttingDown {
let message = """
Shutdown was not called on topic listener. This could lead to potential errors or
the stream never ending.
Please ensure that you call shutdown on the listener.
"""
client.logger.warning("\(message)")
continuation.finish()
}
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
}
private func subscribe() async throws {
guard !shuttingDown else { return }
logger?.debug("Begin subscribing to topics.")
do {
_ = try await client.subscribe(to: configuration.topics.map {
MQTTSubscribeInfo(topicFilter: $0, qos: configuration.qos)
})
} catch {
logger?.error("Received error while subscribing to topics: \(configuration.topics)")
throw TopicListenerError.failedToSubscribe
}
logger?.debug("Done subscribing to topics.")
}
public func start() {
logger?.trace("Starting listener for topics: \(configuration.topics)")
let stream = MQTTConnectionStream(client: client, logger: logger)
.start()
.removeDuplicates()
.eraseToStream()
let task = Task {
// Listen for connection events to restablish the stream upon a
// client becoming disconnected / reconnected, and properly shutdown
// the stream on the client being shutdown.
for await event in stream {
logger?.trace("Received event: \(event)")
switch event {
case .shuttingDown:
shutdown()
case .disconnected:
try await Task.sleep(for: .milliseconds(100))
case .connected:
try await subscribe()
client.addPublishListener(named: name) { result in
switch result {
case let .failure(error):
self.logger?.error("Received error while listening: \(error)")
case let .success(publishInfo):
if self.configuration.topics.contains(publishInfo.topicName) {
self.logger?.debug("Recieved new value for topic: \(publishInfo.topicName)")
self.continuation.yield(publishInfo)
}
}
}
}
}
}
onShutdownHandler = { task.cancel() }
}
// TODO: remove.
func listen(
_ topics: [String],
_ qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
var sleepTimes = 0
while !client.isActive() {
guard sleepTimes < 10 else {
throw TopicListenerError.connectionTimeout
}
try? await Task.sleep(for: .milliseconds(100))
sleepTimes += 1
}
client.logger.trace("Client is active, begin subscribing to topics.")
try await subscribe()
client.logger.trace("Done subscribing, begin listening to topics.")
client.addPublishListener(named: name) { result in
switch result {
case let .failure(error):
self.logger?.error("Received error while listening: \(error)")
case let .success(publishInfo):
if topics.contains(publishInfo.topicName) {
self.logger?.debug("Recieved new value for topic: \(publishInfo.topicName)")
self.continuation.yield(publishInfo)
}
}
}
return stream
}
private func setIsShuttingDown() {
shuttingDown = true
onShutdownHandler = nil
}
public nonisolated func shutdown() {
client.logger.trace("Closing topic listener...")
continuation.finish()
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
Task {
await onShutdownHandler?()
await self.setIsShuttingDown()
}
}
}
// MARK: - Errors
public enum TopicListenerError: Error {
case connectionTimeout
case failedToSubscribe
}
public struct MQTTListenResultError: Error {
let underlyingError: any Error
init(_ underlyingError: any Error) {
self.underlyingError = underlyingError
}
}

View File

@@ -1,29 +1,30 @@
import Foundation
import Logging
/// Holds common settings for connecting to your MQTT broker. The default values can be used,
/// they can be loaded from the shell environment, or from a file located in the root directory.
///
/// This allows us to keep sensitve settings out of the repository.
public struct EnvVars: Codable, Equatable {
public struct EnvVars: Codable, Equatable, Sendable {
/// The current app environment.
public var appEnv: AppEnv
/// The MQTT host.
public var host: String
/// The MQTT port.
public var port: String?
/// The identifier to use when connecting to the MQTT broker.
public var identifier: String
/// The MQTT user name.
public var userName: String?
/// The MQTT user password.
public var password: String?
/// Create a new ``EnvVars``
///
/// - Parameters:
@@ -40,7 +41,7 @@ public struct EnvVars: Codable, Equatable {
identifier: String = "dewPoint-controller",
userName: String? = "mqtt_user",
password: String? = "secret!"
){
) {
self.appEnv = appEnv
self.host = host
self.port = port
@@ -48,7 +49,7 @@ public struct EnvVars: Codable, Equatable {
self.userName = userName
self.password = password
}
/// Custom coding keys.
private enum CodingKeys: String, CodingKey {
case appEnv = "APP_ENV"
@@ -58,9 +59,9 @@ public struct EnvVars: Codable, Equatable {
case userName = "MQTT_USERNAME"
case password = "MQTT_PASSWORD"
}
/// Represents the different app environments.
public enum AppEnv: String, Codable {
public enum AppEnv: String, Codable, Sendable {
case development
case production
case staging

View File

@@ -1,37 +0,0 @@
import CoreUnitTypes
/// Represents the different modes that the controller can be in.
public enum Mode: Equatable {
/// Allows controller to run in humidify or dehumidify mode.
case auto
/// Only handle humidify mode.
case humidifyOnly(HumidifyMode)
/// Only handle dehumidify mode.
case dehumidifyOnly(DehumidifyMode)
/// Don't control humidify or dehumidify modes.
case off
/// Represents the control modes for the humidify control state.
public enum HumidifyMode: Equatable {
/// Control humidifying based off dew-point.
case dewPoint(Temperature)
/// Control humidifying based off relative humidity.
case relativeHumidity(RelativeHumidity)
}
/// Represents the control modes for the dehumidify control state.
public enum DehumidifyMode: Equatable {
/// Control dehumidifying based off dew-point.
case dewPoint(high: Temperature, low: Temperature)
/// Control humidifying based off relative humidity.
case relativeHumidity(high: RelativeHumidity, low: RelativeHumidity)
}
}

View File

@@ -1,104 +0,0 @@
import Foundation
import Psychrometrics
// TODO: Make this a struct, then create a Store class that holds the state??
public final class State {
public var altitude: Length
public var sensors: Sensors
public var units: PsychrometricEnvironment.Units {
didSet {
PsychrometricEnvironment.shared.units = units
}
}
public init(
altitude: Length = .seaLevel,
sensors: Sensors = .init(),
units: PsychrometricEnvironment.Units = .imperial
) {
self.altitude = altitude
self.sensors = sensors
self.units = units
}
public struct Sensors: Equatable {
public var mixedAirSensor: TemperatureHumiditySensor<Mixed>
public var postCoilSensor: TemperatureHumiditySensor<PostCoil>
public var returnAirSensor: TemperatureHumiditySensor<Return>
public var supplyAirSensor: TemperatureHumiditySensor<Supply>
public init(
mixedAirSensor: TemperatureHumiditySensor<Mixed> = .init(),
postCoilSensor: TemperatureHumiditySensor<PostCoil> = .init(),
returnAirSensor: TemperatureHumiditySensor<Return> = .init(),
supplyAirSensor: TemperatureHumiditySensor<Supply> = .init()
) {
self.mixedAirSensor = mixedAirSensor
self.postCoilSensor = postCoilSensor
self.returnAirSensor = returnAirSensor
self.supplyAirSensor = supplyAirSensor
}
public var needsProcessed: Bool {
mixedAirSensor.needsProcessed
|| postCoilSensor.needsProcessed
|| returnAirSensor.needsProcessed
|| supplyAirSensor.needsProcessed
}
}
}
extension State.Sensors {
public struct TemperatureHumiditySensor<Location>: Equatable {
@TrackedChanges
public var temperature: Temperature?
@TrackedChanges
public var humidity: RelativeHumidity?
public var needsProcessed: Bool {
get { $temperature.needsProcessed || $humidity.needsProcessed }
set {
$temperature.needsProcessed = newValue
$humidity.needsProcessed = newValue
}
}
public func dewPoint(units: PsychrometricEnvironment.Units? = nil) -> DewPoint? {
guard let temperature = temperature,
let humidity = humidity,
!temperature.rawValue.isNaN,
!humidity.rawValue.isNaN
else { return nil }
return .init(dryBulb: temperature, humidity: humidity, units: units)
}
public func enthalpy(altitude: Length, units: PsychrometricEnvironment.Units? = nil) -> EnthalpyOf<MoistAir>? {
guard let temperature = temperature,
let humidity = humidity,
!temperature.rawValue.isNaN,
!humidity.rawValue.isNaN
else { return nil }
return .init(dryBulb: temperature, humidity: humidity, altitude: altitude, units: units)
}
public init(
temperature: Temperature? = nil,
humidity: RelativeHumidity? = nil,
needsProcessed: Bool = false
) {
self._temperature = .init(wrappedValue: temperature, needsProcessed: needsProcessed)
self._humidity = .init(wrappedValue: humidity, needsProcessed: needsProcessed)
}
}
// MARK: - Temperature / Humidity Sensor Location Namespaces
public enum Mixed { }
public enum PostCoil { }
public enum Return { }
public enum Supply { }
}

View File

@@ -0,0 +1,141 @@
import Dependencies
import PsychrometricClient
/// Represents a temperature and humidity sensor that can be used to derive
/// the dew-point temperature and enthalpy values.
///
/// > Note: Temperature values are received in `celsius`.
public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
@Dependency(\.psychrometricClient) private var psychrometrics
/// The identifier of the sensor, same as the location.
public var id: Location { location }
/// The altitude of the sensor.
public let altitude: Length
/// The current humidity value of the sensor.
@TrackedChanges
public var humidity: RelativeHumidity?
/// The location identifier of the sensor
public let location: Location
/// The current temperature value of the sensor.
@TrackedChanges
public var temperature: DryBulb?
/// The topics to listen for updated sensor values.
public let topics: Topics
/// Create a new temperature and humidity sensor.
///
/// - Parameters:
/// - location: The location of the sensor.
/// - altitude: The altitude of the sensor.
/// - temperature: The current temperature value of the sensor.
/// - humidity: The current relative humidity value of the sensor.
/// - needsProcessed: If the sensor needs to be processed.
public init(
location: Location,
altitude: Length = .feet(800.0),
temperature: DryBulb? = nil,
humidity: RelativeHumidity? = nil,
needsProcessed: Bool = false,
topics: Topics? = nil
) {
self.altitude = altitude
self.location = location
self._temperature = TrackedChanges(wrappedValue: temperature, needsProcessed: needsProcessed)
self._humidity = TrackedChanges(wrappedValue: humidity, needsProcessed: needsProcessed)
self.topics = topics ?? .init(location: location)
}
/// The calculated dew-point temperature of the sensor.
public var dewPoint: DewPoint? {
get async {
guard let temperature = temperature,
let humidity = humidity,
!temperature.value.isNaN,
!humidity.value.isNaN
else { return nil }
return try? await psychrometrics.dewPoint(.dryBulb(temperature, relativeHumidity: humidity))
}
}
/// The calculated enthalpy of the sensor.
public var enthalpy: EnthalpyOf<MoistAir>? {
get async {
guard let temperature = temperature,
let humidity = humidity,
!temperature.value.isNaN,
!humidity.value.isNaN
else { return nil }
return try? await psychrometrics.enthalpy.moistAir(
.dryBulb(temperature, relativeHumidity: humidity, altitude: altitude)
)
}
}
/// Check whether any of the sensor values have changed and need processed.
///
/// - Note: Setting a value will set to both the temperature and humidity properties.
public var needsProcessed: Bool {
get { $temperature.needsProcessed || $humidity.needsProcessed }
set {
$temperature.needsProcessed = newValue
$humidity.needsProcessed = newValue
}
}
/// Represents the different locations of a temperature and humidity sensor, which can
/// be used to derive the topic to both listen and publish new values to.
public enum Location: String, CaseIterable, Equatable, Hashable, Sendable {
case mixedAir = "mixed_air"
case postCoil = "post_coil"
case `return`
case supply
}
/// Represents the MQTT topics to listen for updated sensor values on.
public struct Topics: Equatable, Hashable, Sendable {
/// The dew-point temperature topic for the sensor.
public let dewPoint: String
/// The enthalpy topic for the sensor.
public let enthalpy: String
/// The humidity topic of the sensor.
public let humidity: String
/// The temperature topic of the sensor.
public let temperature: String
public init(
dewPoint: String,
enthalpy: String,
humidity: String,
temperature: String
) {
self.dewPoint = dewPoint
self.enthalpy = enthalpy
self.humidity = humidity
self.temperature = temperature
}
public init(topicPrefix: String? = "frankensystem", location: TemperatureAndHumiditySensor.Location) {
var prefix = topicPrefix ?? ""
if prefix.reversed().starts(with: "/") {
prefix = "\(prefix.dropLast())"
}
self.init(
dewPoint: "\(prefix)/sensor/\(location.rawValue)_dew_point/state",
enthalpy: "\(prefix)/sensor/\(location.rawValue)_enthalpy/state",
humidity: "\(prefix)/sensor/\(location.rawValue)_humidity/state",
temperature: "\(prefix)/sensor/\(location.rawValue)_temperature/state"
)
}
}
}

View File

@@ -1,269 +0,0 @@
/// A container for all the different topics that are needed by the application.
public struct Topics: Codable, Equatable {
/// The command topics the application can publish to.
public var commands: Commands
/// The sensor topics the application can read from / write to.
public var sensors: Sensors
/// The set point topics the application can read set point values from.
public var setPoints: SetPoints
/// The state topics the application can read state values from.
public var states: States
/// Create the topics required by the application.
///
/// - Parameters:
/// - sensors: The sensor topics.
/// - setPoints: The set point topics
/// - states: The states topics
/// - relays: The relay topics
public init(
commands: Commands = .init(),
sensors: Sensors = .init(),
setPoints: SetPoints = .init(),
states: States = .init()
) {
self.commands = commands
self.sensors = sensors
self.setPoints = setPoints
self.states = states
}
/// Represents the sensor topics.
public struct Sensors: Codable, Equatable {
public var mixedAirSensor: TemperatureAndHumiditySensor<State.Sensors.Mixed>
public var postCoilSensor: TemperatureAndHumiditySensor<State.Sensors.PostCoil>
public var returnAirSensor: TemperatureAndHumiditySensor<State.Sensors.Return>
public var supplyAirSensor: TemperatureAndHumiditySensor<State.Sensors.Supply>
public init(
mixedAirSensor: TemperatureAndHumiditySensor<State.Sensors.Mixed> = .default(location: "mixed=air"),
postCoilSensor: TemperatureAndHumiditySensor<State.Sensors.PostCoil> = .default(location: "post-coil"),
returnAirSensor: TemperatureAndHumiditySensor<State.Sensors.Return> = .default(location: "return"),
supplyAirSensor: TemperatureAndHumiditySensor<State.Sensors.Supply> = .default(location: "supply")
) {
self.mixedAirSensor = mixedAirSensor
self.postCoilSensor = postCoilSensor
self.returnAirSensor = returnAirSensor
self.supplyAirSensor = supplyAirSensor
}
public struct TemperatureAndHumiditySensor<Location>: Codable, Equatable {
public var temperature: String
public var humidity: String
public var dewPoint: String
public var enthalpy: String
/// Create a new sensor topic container.
///
/// - Parameters:
/// - temperature: The temperature sensor topic.
/// - humidity: The humidity sensor topic.
/// - dewPoint: The dew point sensor topic.
public init(
temperature: String,
humidity: String,
dewPoint: String,
enthalpy: String
) {
self.temperature = temperature
self.humidity = humidity
self.dewPoint = dewPoint
self.enthalpy = enthalpy
}
}
}
/// A container for set point related topics used by the application.
public struct SetPoints: Codable, Equatable {
/// The topic for the humidify set point.
public var humidify: Humidify
/// The topics for dehumidification set points.
public var dehumidify: Dehumidify
/// Create a new set point topic container.
///
/// - Parameters:
/// - humidify: The topic for humidification set points.
/// - dehumidify: The topics for dehumidification set points.
public init(
humidify: Humidify = .init(),
dehumidify: Dehumidify = .init()
) {
self.humidify = humidify
self.dehumidify = dehumidify
}
/// A container for the humidification set point topics used by the application.
public struct Humidify: Codable, Equatable {
/// The topic for dew point control mode set point.
public var dewPoint: String
/// The topic for relative humidity control mode set point.
public var relativeHumidity: String
/// Create a new container for the humidification set point topics.
///
/// - Parameters:
/// - dewPoint: The topic for dew point control mode set point.
/// - relativeHumidity: The topic for relative humidity control mode set point.
public init(
dewPoint: String = "set_points/humidify/dew_point",
relativeHumidity: String = "set_points/humidify/relative_humidity"
) {
self.dewPoint = dewPoint
self.relativeHumidity = relativeHumidity
}
}
/// A container for dehumidifcation set point topics.
public struct Dehumidify: Codable, Equatable {
/// A low setting for dew point control modes.
public var lowDewPoint: String
/// A high setting for dew point control modes.
public var highDewPoint: String
/// A low setting for relative humidity control modes.
public var lowRelativeHumidity: String
/// A high setting for relative humidity control modes.
public var highRelativeHumidity: String
/// Create a new container for dehumidification set point topics.
///
/// - Parameters:
/// - lowDewPoint: A low setting for dew point control modes.
/// - highDewPoint: A high setting for dew point control modes.
/// - lowRelativeHumidity: A low setting for relative humidity control modes.
/// - highRelativeHumidity: A high setting for relative humidity control modes.
public init(
lowDewPoint: String = "set_points/dehumidify/low_dew_point",
highDewPoint: String = "set_points/dehumidify/high_dew_point",
lowRelativeHumidity: String = "set_points/dehumidify/low_relative_humidity",
highRelativeHumidity: String = "set_points/dehumidify/high_relative_humidity"
) {
self.lowDewPoint = lowDewPoint
self.highDewPoint = highDewPoint
self.lowRelativeHumidity = lowRelativeHumidity
self.highRelativeHumidity = highRelativeHumidity
}
}
}
/// A container for control state topics used by the application.
public struct States: Codable, Equatable {
/// The topic for the control mode.
public var mode: String
/// The relay state topics.
public var relays: Relays
/// Create a new container for control state topics.
///
/// - Parameters:
/// - mode: The topic for the control mode.
public init(
mode: String = "states/mode",
relays: Relays = .init()
) {
self.mode = mode
self.relays = relays
}
/// A container for reading the current state of a relay.
public struct Relays: Codable, Equatable {
/// The dehumidification stage-1 relay topic.
public var dehumdification1: String
/// The dehumidification stage-2 relay topic.
public var dehumidification2: String
/// The humidification relay topic.
public var humdification: String
/// Create a new container for relay state topics.
///
/// - Parameters:
/// - dehumidification1: The dehumidification stage-1 relay topic.
/// - dehumidification2: The dehumidification stage-2 relay topic.
/// - humidification: The humidification relay topic.
public init(
dehumidefication1: String = "states/relays/dehumidification_1",
dehumidification2: String = "states/relays/dehumidification_2",
humidification: String = "states/relays/humidification"
) {
self.dehumdification1 = dehumidefication1
self.dehumidification2 = dehumidification2
self.humdification = humidification
}
}
}
/// A container for commands topics that the application can publish to.
public struct Commands: Codable, Equatable {
/// The relay command topics.
public var relays: Relays
/// Create a new command topics container.
///
/// - Parameters:
/// - relays: The relay command topics.
public init(relays: Relays = .init()) {
self.relays = relays
}
/// A container for relay command topics used by the application.
public struct Relays: Codable, Equatable {
/// The dehumidification stage-1 relay topic.
public var dehumidification1: String
/// The dehumidification stage-2 relay topic.
public var dehumidification2: String
/// The humidification relay topic.
public var humidification: String
/// Create a new container for commanding relays.
///
/// - Parameters:
/// - dehumidification1: The dehumidification stage-1 relay topic.
/// - dehumidification2: The dehumidification stage-2 relay topic.
/// - humidification: The humidification relay topic.
public init(
dehumidification1: String = "relays/dehumidification_1",
dehumidification2: String = "relays/dehumidification_2",
humidification: String = "relays/humidification"
) {
self.dehumidification1 = dehumidification1
self.dehumidification2 = dehumidification2
self.humidification = humidification
}
}
}
}
// MARK: Helpers
extension Topics.Sensors.TemperatureAndHumiditySensor {
public static func `default`(location: String) -> Self {
.init(
temperature: "sensors/\(location)/temperature",
humidity: "sensors/\(location)/humidity",
dewPoint: "sensors/\(location)/dew-point",
enthalpy: "sensors/\(location)/enthalpy"
)
}
}

60
Sources/Models/TrackedChanges.swift Normal file → Executable file
View File

@@ -1,11 +1,20 @@
/// A property wrapper that tracks changes of a property.
///
/// This allows values to only publish changes if they have changed since the
/// last time they were recieved.
@propertyWrapper
public struct TrackedChanges<Value> {
public struct TrackedChanges<Value: Sendable>: Sendable {
/// The current tracking state.
private var tracking: TrackingState
/// The current wrapped value.
private var value: Value
private var isEqual: (Value, Value) -> Bool
/// Used to check if a new value is equal to an old value.
private var isEqual: @Sendable (Value, Value) -> Bool
/// Access to the underlying property that we are wrapping.
public var wrappedValue: Value {
get { value }
set {
@@ -16,22 +25,34 @@ public struct TrackedChanges<Value> {
tracking = .needsProcessed
}
}
/// Create a new property that tracks it's changes.
///
/// - Parameters:
/// - wrappedValue: The value that we are wrapping.
/// - needsProcessed: Whether this value needs processed (default = false).
/// - isEqual: Method to compare old values against new values.
public init(
wrappedValue: Value,
needsProcessed: Bool = false,
isEqual: @escaping (Value, Value) -> Bool
isEqual: @escaping @Sendable (Value, Value) -> Bool
) {
self.value = wrappedValue
self.tracking = needsProcessed ? .needsProcessed : .hasProcessed
self.isEqual = isEqual
}
/// Represents whether a wrapped value has changed and needs processed or not.
enum TrackingState {
/// The state when nothing has changed and we've already processed the current value.
case hasProcessed
/// The state when the value has changed and has not been processed yet.
case needsProcessed
}
/// Whether the value needs processed.
public var needsProcessed: Bool {
get { tracking == .needsProcessed }
set {
@@ -42,7 +63,7 @@ public struct TrackedChanges<Value> {
}
}
}
public var projectedValue: Self {
get { self }
set { self = newValue }
@@ -54,11 +75,26 @@ extension TrackedChanges: Equatable where Value: Equatable {
lhs.wrappedValue == rhs.wrappedValue
&& lhs.needsProcessed == rhs.needsProcessed
}
/// Create a new property that tracks it's changes, using the default equality check.
///
/// - Parameters:
/// - wrappedValue: The value that we are wrapping.
/// - needsProcessed: Whether this value needs processed (default = false).
public init(
wrappedValue: Value,
needsProcessed: Bool = false
) {
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed, isEqual: ==)
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed) {
$0 == $1
}
}
}
extension TrackedChanges: Hashable where Value: Hashable {
public func hash(into hasher: inout Hasher) {
hasher.combine(wrappedValue)
hasher.combine(needsProcessed)
}
}

View File

@@ -0,0 +1,212 @@
import Dependencies
import DependenciesMacros
import Foundation
import Logging
import Models
import MQTTManager
import MQTTNIO
import NIO
import PsychrometricClient
import ServiceLifecycle
/// Service that is responsible for listening to changes of the temperature and humidity
/// sensors, then publishing back the calculated dew-point temperature and enthalpy for
/// the sensor location.
///
///
public actor SensorsService: Service {
@Dependency(\.mqtt) var mqtt
/// The logger to use for the service.
private let logger: Logger?
/// The sensors that we are listening for updates to, so
/// that we can calculate the dew-point temperature and enthalpy
/// values to publish back to the MQTT broker.
private var sensors: [TemperatureAndHumiditySensor]
private var topics: [String] {
sensors.reduce(into: [String]()) { array, sensor in
array.append(sensor.topics.temperature)
array.append(sensor.topics.humidity)
}
}
/// Create a new sensors service that listens to the passed in
/// sensors.
///
/// - Note: The service will fail to start if the array of sensors is not greater than 0.
///
/// - Parameters:
/// - sensors: The sensors to listen for changes to.
/// - logger: An optional logger to use.
public init(
sensors: [TemperatureAndHumiditySensor],
logger: Logger? = nil
) {
self.sensors = sensors
self.logger = logger
}
/// Start the service with graceful shutdown, which will attempt to publish
/// any pending changes to the MQTT broker, upon a shutdown signal.
public func run() async throws {
precondition(sensors.count > 0, "Sensors should not be empty.")
let stream = try await makeStream()
await withGracefulShutdownHandler {
for await result in stream.cancelOnGracefulShutdown() {
logger?.debug("Received result for topic: \(result.topic)")
await handleResult(result)
}
} onGracefulShutdown: {
self.logger?.debug("Received graceful shutdown.")
Task {
try await self.shutdown()
}
}
}
@_spi(Internal)
public func shutdown() async throws {
try await publishUpdates()
}
private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> {
// ignore duplicate values, to prevent publishing dew-point and enthalpy
// changes to frequently.
try await mqtt.listen(to: topics)
.map { ($0.payload, $0.topicName) }
.removeDuplicates { lhs, rhs in
lhs.buffer == rhs.buffer
&& lhs.topic == rhs.topic
}
.eraseToStream()
}
private func handleResult(_ result: (buffer: ByteBuffer, topic: String)) async {
do {
let topic = result.topic
assert(topics.contains(topic))
logger?.debug("Begin handling result for topic: \(topic)")
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
return V(buffer: result.buffer)
}
if topic.contains("temperature") {
logger?.debug("Begin handling temperature result.")
guard let temperature = decode(DryBulb.self) else {
logger?.debug("Failed to decode temperature: \(result.buffer)")
throw DecodingError()
}
logger?.debug("Decoded temperature: \(temperature)")
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
} else if topic.contains("humidity") {
logger?.debug("Begin handling humidity result.")
guard let humidity = decode(RelativeHumidity.self) else {
logger?.debug("Failed to decode humidity: \(result.buffer)")
throw DecodingError()
}
logger?.debug("Decoded humidity: \(humidity)")
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
}
try await publishUpdates()
logger?.debug("Done handling result for topic: \(topic)")
} catch {
logger?.error("Received error while handling result: \(error)")
}
}
private func publish(_ double: Double?, to topic: String) async throws {
guard let double else { return }
try await mqtt.publish(
ByteBufferAllocator().buffer(string: "\(double)"),
to: topic,
qos: .exactlyOnce,
retain: true
)
logger?.debug("Published update to topic: \(topic)")
}
private func publishUpdates() async throws {
for sensor in sensors.filter(\.needsProcessed) {
try await publish(sensor.dewPoint?.value, to: sensor.topics.dewPoint)
try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy)
try sensors.hasProcessed(sensor)
}
}
}
// MARK: - Errors
struct DecodingError: Error {}
struct SensorNotFoundError: Error {}
// MARK: - Helpers
private extension TemperatureAndHumiditySensor.Topics {
func contains(_ topic: String) -> Bool {
temperature == topic || humidity == topic
}
}
private extension Array where Element == TemperatureAndHumiditySensor {
mutating func update<V>(
topic: String,
keyPath: WritableKeyPath<TemperatureAndHumiditySensor, V>,
with value: V
) throws {
guard let index = firstIndex(where: { $0.topics.contains(topic) }) else {
throw SensorNotFoundError()
}
self[index][keyPath: keyPath] = value
}
mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws {
guard let index = firstIndex(where: { $0.id == sensor.id }) else {
throw SensorNotFoundError()
}
self[index].needsProcessed = false
}
}
/// Represents a type that can be initialized by a ``ByteBuffer``.
protocol BufferInitalizable {
init?(buffer: ByteBuffer)
}
extension Double: BufferInitalizable {
/// Attempt to create / parse a double from a byte buffer.
init?(buffer: ByteBuffer) {
let string = String(buffer: buffer)
self.init(string)
}
}
extension Tagged: BufferInitalizable where RawValue: BufferInitalizable {
init?(buffer: ByteBuffer) {
guard let value = RawValue(buffer: buffer) else { return nil }
self.init(value)
}
}
extension Humidity<Relative>: BufferInitalizable {
init?(buffer: ByteBuffer) {
guard let value = Double(buffer: buffer) else { return nil }
self.init(value)
}
}
extension Temperature<DryAir>: BufferInitalizable {
init?(buffer: ByteBuffer) {
guard let value = Double(buffer: buffer) else { return nil }
self.init(value)
}
}

View File

@@ -1,52 +0,0 @@
import Models
// TODO: Fix other live topics
extension Topics {
public static let live = Self.init(
commands: .init(),
sensors: .init(
mixedAirSensor: .live(location: .mixedAir),
postCoilSensor: .live(location: .postCoil),
returnAirSensor: .live(location: .return),
supplyAirSensor: .live(location: .supply)),
setPoints: .init(),
states: .init()
)
}
extension Topics.Sensors {
fileprivate enum Location: CustomStringConvertible {
case mixedAir
case postCoil
case `return`
case supply
var description: String {
switch self {
case .mixedAir:
return "mixed_air"
case .postCoil:
return "post_coil"
case .return:
return "return"
case .supply:
return "supply"
}
}
}
}
extension Topics.Sensors.TemperatureAndHumiditySensor {
fileprivate static func live(
prefix: String = "frankensystem",
location: Topics.Sensors.Location
) -> Self {
.init(
temperature: "\(prefix)/sensor/\(location.description)_temperature/state",
humidity: "\(prefix)/sensor/\(location.description)_humidity/state",
dewPoint: "\(prefix)/sensor/\(location.description)_dew_point/state",
enthalpy: "\(prefix)/sensor/\(location.description)_enthalpy/state"
)
}
}

View File

@@ -1,73 +0,0 @@
import Bootstrap
import ClientLive
import CoreUnitTypes
import Logging
import Models
import MQTTNIO
import NIO
import TopicsLive
import Foundation
var logger: Logger = {
var logger = Logger(label: "dewPoint-logger")
logger.logLevel = .debug
return logger
}()
logger.info("Starting Swift Dew Point Controller!")
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
var environment = try bootstrap(eventLoopGroup: eventLoopGroup, logger: logger, autoConnect: false).wait()
// Set the log level to info only in production mode.
if environment.envVars.appEnv == .production {
logger.logLevel = .info
}
// Set up the client, topics and state.
environment.topics = .live
let state = State()
let client = Client.live(client: environment.mqttClient, state: state, topics: environment.topics)
defer {
logger.debug("Disconnecting")
}
// Add topic listeners.
client.addListeners()
while true {
if !environment.mqttClient.isActive() {
logger.trace("Connecting to MQTT broker...")
try client.connect().wait()
try client.subscribe().wait()
Thread.sleep(forTimeInterval: 1)
}
// Check if sensors need processed.
if state.sensors.needsProcessed {
logger.debug("Sensor state has changed...")
if state.sensors.mixedAirSensor.needsProcessed {
logger.trace("Publishing mixed air sensor.")
try client.publishSensor(.mixed(state.sensors.mixedAirSensor)).wait()
}
if state.sensors.postCoilSensor.needsProcessed {
logger.trace("Publishing post coil sensor.")
try client.publishSensor(.postCoil(state.sensors.postCoilSensor)).wait()
}
if state.sensors.returnAirSensor.needsProcessed {
logger.trace("Publishing return air sensor.")
try client.publishSensor(.return(state.sensors.returnAirSensor)).wait()
}
if state.sensors.supplyAirSensor.needsProcessed {
logger.trace("Publishing supply air sensor.")
try client.publishSensor(.supply(state.sensors.supplyAirSensor)).wait()
}
}
// logger.debug("Fetching dew point...")
//
// logger.debug("Published dew point...")
Thread.sleep(forTimeInterval: 5)
}

View File

@@ -1,45 +0,0 @@
import XCTest
import EnvVars
import Logging
import Models
@testable import ClientLive
import Psychrometrics
final class AsyncClientTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "AsyncClientTests")
logger.logLevel = .trace
return logger
}()
func createClient(identifier: String) -> AsyncClient {
let envVars = EnvVars.init(
appEnv: .testing,
host: Self.hostname,
port: "1883",
identifier: identifier,
userName: nil,
password: nil
)
return .init(envVars: envVars, logger: Self.logger)
}
func testConnectAndShutdown() async throws {
let client = createClient(identifier: "testConnectAndShutdown")
await client.connect()
await client.shutdown()
}
func testPublishingSensor() async throws {
let client = createClient(identifier: "testPublishingSensor")
await client.connect()
let topic = Topics().sensors.mixedAirSensor.dewPoint
try await client.addPublishListener(topic: topic, decoding: Temperature.self)
try await client.publishSensor(.mixed(.init(temperature: 71.123, humidity: 50.5, needsProcessed: true)))
try await client.publishSensor(.mixed(.init(temperature: 72.123, humidity: 50.5, needsProcessed: true)))
await client.shutdown()
}
}

View File

@@ -1,186 +0,0 @@
import Client
@testable import ClientLive
import CoreUnitTypes
import Foundation
import Logging
import Models
import MQTTNIO
import NIO
import NIOConcurrencyHelpers
import XCTest
final class ClientLiveTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
let topics = Topics()
// func test_mqtt_subscription() throws {
// let mqttClient = createMQTTClient(identifier: "test_subscription")
// _ = try mqttClient.connect().wait()
// let sub = try mqttClient.v5.subscribe(
// to: [mqttClient.mqttSubscription(topic: "test/subscription")]
// ).wait()
// XCTAssertEqual(sub.reasons[0], .grantedQoS1)
// try mqttClient.disconnect().wait()
// try mqttClient.syncShutdownGracefully()
// }
func test_mqtt_listener() throws {
let lock = Lock()
var publishRecieved: [MQTTPublishInfo] = []
let payloadString = "test"
let payload = ByteBufferAllocator().buffer(string: payloadString)
let client = self.createMQTTClient(identifier: "testMQTTListener_publisher")
_ = try client.connect().wait()
client.addPublishListener(named: "test") { result in
switch result {
case .success(let publish):
var buffer = publish.payload
let string = buffer.readString(length: buffer.readableBytes)
XCTAssertEqual(string, payloadString)
lock.withLock {
publishRecieved.append(publish)
}
case .failure(let error):
XCTFail("\(error)")
}
}
try client.publish(to: "testMQTTSubscribe", payload: payload, qos: .atLeastOnce, retain: true).wait()
let sub = try client.v5.subscribe(to: [.init(topicFilter: "testMQTTSubscribe", qos: .atLeastOnce)]).wait()
XCTAssertEqual(sub.reasons[0], .grantedQoS1)
Thread.sleep(forTimeInterval: 2)
lock.withLock {
XCTAssertEqual(publishRecieved.count, 1)
}
try client.disconnect().wait()
try client.syncShutdownGracefully()
}
func test_client2_returnTemperature_listener() throws {
let mqttClient = createMQTTClient(identifier: "return-temperature-tests")
let state = State()
let topics = Topics()
let client = Client.live(client: mqttClient, state: state, topics: topics)
client.addListeners()
try client.connect().wait()
try client.subscribe().wait()
_ = try mqttClient.publish(
to: topics.sensors.returnAirSensor.temperature,
payload: ByteBufferAllocator().buffer(string: "75.1234"),
qos: .atLeastOnce
).wait()
Thread.sleep(forTimeInterval: 2)
XCTAssertEqual(state.sensors.returnAirSensor.temperature, .celsius(75.1234))
try mqttClient.disconnect().wait()
try mqttClient.syncShutdownGracefully()
// try client.shutdown().wait()
}
func test_client2_returnSensor_publish() throws {
let mqttClient = createMQTTClient(identifier: "return-temperature-tests")
let state = State()
let topics = Topics()
let client = Client.live(client: mqttClient, state: state, topics: topics)
client.addListeners()
try client.connect().wait()
try client.subscribe().wait()
_ = try mqttClient.publish(
to: topics.sensors.returnAirSensor.temperature,
payload: ByteBufferAllocator().buffer(string: "75.1234"),
qos: .atLeastOnce
).wait()
_ = try mqttClient.publish(
to: topics.sensors.returnAirSensor.humidity,
payload: ByteBufferAllocator().buffer(string: "\(50.0)"),
qos: .atLeastOnce
).wait()
Thread.sleep(forTimeInterval: 2)
XCTAssert(state.sensors.returnAirSensor.needsProcessed)
try client.publishSensor(.return(state.sensors.returnAirSensor)).wait()
XCTAssertFalse(state.sensors.returnAirSensor.needsProcessed)
try mqttClient.disconnect().wait()
try mqttClient.syncShutdownGracefully()
// try client.shutdown().wait()
}
// func test_fetch_humidity() throws {
// let lock = Lock()
// let publishClient = createMQTTClient(identifier: "publishHumidity")
// let mqttClient = createMQTTClient(identifier: "fetchHumidity")
// _ = try publishClient.connect().wait()
// let client = try createClient(mqttClient: mqttClient)
// var humidityRecieved: [RelativeHumidity] = []
//
// _ = try publishClient.publish(
// to: topics.sensors.humidity,
// payload: ByteBufferAllocator().buffer(string: "\(50.0)"),
// qos: .atLeastOnce
// ).wait()
//
// Thread.sleep(forTimeInterval: 2)
// try publishClient.disconnect().wait()
// let humidity = try client.fetchHumidity(.init(topic: self.topics.sensors.humidity)).wait()
// XCTAssertEqual(humidity, 50)
// Thread.sleep(forTimeInterval: 2)
// lock.withLock {
// humidityRecieved.append(humidity)
// }
// try mqttClient.disconnect().wait()
// try mqttClient.syncShutdownGracefully()
// }
// MARK: - Helpers
func createMQTTClient(identifier: String) -> MQTTNIO.MQTTClient {
MQTTNIO.MQTTClient(
host: Self.hostname,
port: 1883,
identifier: identifier,
eventLoopGroupProvider: .shared(eventLoopGroup),
logger: self.logger,
configuration: .init(version: .v5_0)
)
}
// func createWebSocketClient(identifier: String) -> MQTTNIO.MQTTClient {
// MQTTNIO.MQTTClient(
// host: Self.hostname,
// port: 8080,
// identifier: identifier,
// eventLoopGroupProvider: .createNew,
// logger: self.logger,
// configuration: .init(useWebSockets: true, webSocketURLPath: "/mqtt")
// )
// }
// Uses default topic names.
// func createClient(mqttClient: MQTTNIO.MQTTClient, autoConnect: Bool = true) throws -> Client.MQTTClient {
// if autoConnect {
// _ = try mqttClient.connect().wait()
// }
// return .live(client: mqttClient, topics: .init())
// }
let logger: Logger = {
var logger = Logger(label: "MQTTTests")
logger.logLevel = .trace
return logger
}()
let eventLoopGroup = MultiThreadedEventLoopGroup.init(numberOfThreads: 1)
}

View File

@@ -0,0 +1,93 @@
import AsyncAlgorithms
import Logging
import Models
import MQTTConnectionService
@_spi(Internal) import MQTTManager
import MQTTNIO
import NIO
import ServiceLifecycle
import ServiceLifecycleTestKit
import XCTest
final class MQTTConnectionServiceTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "MQTTConnectionServiceTests")
logger.logLevel = .trace
return logger
}()
// TODO: Move to integration tests.
func testMQTTConnectionStream() async throws {
let client = createClient(identifier: "testNonManagedStream")
let manager = MQTTManager.live(
client: client,
logger: Self.logger,
alwaysReconnect: false
)
defer { manager.shutdown() }
let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger)
let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger)
var events1 = [MQTTManager.Event]()
var events2 = [MQTTManager.Event]()
let stream1 = connectionStream1.start()
let stream2 = connectionStream2.start()
_ = try await manager.connect()
Task {
while !client.isActive() {
try await Task.sleep(for: .milliseconds(100))
}
try await Task.sleep(for: .milliseconds(200))
manager.shutdown()
try await client.disconnect()
try await Task.sleep(for: .seconds(1))
try await client.shutdown()
try await Task.sleep(for: .seconds(1))
connectionStream1.stop()
connectionStream2.stop()
}
for await event in stream1.removeDuplicates() {
events1.append(event)
}
for await event in stream2.removeDuplicates() {
events2.append(event)
}
XCTAssertEqual(events1, [.disconnected, .connected, .disconnected, .shuttingDown])
XCTAssertEqual(events2, [.disconnected, .connected, .disconnected, .shuttingDown])
}
func createClient(identifier: String) -> MQTTClient {
let envVars = EnvVars(
appEnv: .testing,
host: Self.hostname,
port: "1883",
identifier: identifier,
userName: nil,
password: nil
)
let config = MQTTClient.Configuration(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
return .init(
host: Self.hostname,
identifier: identifier,
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
logger: Self.logger,
configuration: config
)
}
}

View File

@@ -0,0 +1,142 @@
import Dependencies
import Logging
import Models
@_spi(Internal) import MQTTManager
import MQTTNIO
import NIO
import PsychrometricClientLive
@_spi(Internal) import SensorsService
import XCTest
final class SensorsClientTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "SensorsClientTests")
logger.logLevel = .trace
return logger
}()
override func invokeTest() {
let client = createClient(identifier: "\(Self.self)")
withDependencies {
$0.mqtt = .live(client: client, logger: Self.logger)
$0.psychrometricClient = PsychrometricClient.liveValue
} operation: {
super.invokeTest()
}
}
func testListeningResumesAfterDisconnectThenReconnect() async throws {
struct TimeoutError: Error {}
let sensor = TemperatureAndHumiditySensor(location: .return)
let results = ResultContainer()
try await withDependencies {
$0.mqtt.publish = results.append
} operation: {
@Dependency(\.mqtt) var manager
let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger)
let task = Task { try await sensorsService.run() }
defer { task.cancel() }
try await manager.connect()
defer { manager.shutdown() }
try await manager.withClient { client in
try await client.disconnect()
try await client.connect()
while !client.isActive() {
try await Task.sleep(for: .milliseconds(100))
}
// Give time to re-subscribe.
try await Task.sleep(for: .milliseconds(200))
try await client.publish(
to: sensor.topics.temperature,
payload: ByteBufferAllocator().buffer(string: "25"),
qos: .atLeastOnce,
retain: false
)
try await client.publish(
to: sensor.topics.humidity,
payload: ByteBufferAllocator().buffer(string: "50"),
qos: .atLeastOnce,
retain: false
)
}
var timeoutCount = 0
while !(await results.count == 2) {
guard timeoutCount < 20 else {
throw TimeoutError()
}
try await Task.sleep(for: .milliseconds(100))
timeoutCount += 1
}
let results = await results.results()
XCTAssertEqual(results.count, 2)
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.dewPoint }))
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.enthalpy }))
try await sensorsService.shutdown()
}
}
func createClient(identifier: String) -> MQTTClient {
let envVars = EnvVars(
appEnv: .testing,
host: Self.hostname,
port: "1883",
identifier: identifier,
userName: nil,
password: nil
)
let config = MQTTClient.Configuration(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
return .init(
host: Self.hostname,
identifier: identifier,
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
logger: Self.logger,
configuration: config
)
}
}
// MARK: Helpers for tests.
struct TopicNotFoundError: Error {}
actor ResultContainer: Sendable {
private var storage = [MQTTManager.PublishRequest]()
init() {}
@Sendable func append(_ result: MQTTManager.PublishRequest) async {
storage.append(result)
}
var count: Int {
get async { storage.count }
}
func results() async -> [MQTTManager.PublishRequest] {
storage
}
}

View File

@@ -1,47 +0,0 @@
import XCTest
import class Foundation.Bundle
//final class dewPoint_controllerTests: XCTestCase {
// func testExample() throws {
// // This is an example of a functional test case.
// // Use XCTAssert and related functions to verify your tests produce the correct
// // results.
//
// // Some of the APIs that we use below are available in macOS 10.13 and above.
// guard #available(macOS 10.13, *) else {
// return
// }
//
// // Mac Catalyst won't have `Process`, but it is supported for executables.
// #if !targetEnvironment(macCatalyst)
//
// let fooBinary = productsDirectory.appendingPathComponent("dewPoint-controller")
//
// let process = Process()
// process.executableURL = fooBinary
//
// let pipe = Pipe()
// process.standardOutput = pipe
//
// try process.run()
// process.waitUntilExit()
//
// let data = pipe.fileHandleForReading.readDataToEndOfFile()
// let output = String(data: data, encoding: .utf8)
//
// XCTAssertEqual(output, "Hello, world!\n")
// #endif
// }
//
// /// Returns path to the built products directory.
// var productsDirectory: URL {
// #if os(macOS)
// for bundle in Bundle.allBundles where bundle.bundlePath.hasSuffix(".xctest") {
// return bundle.bundleURL.deletingLastPathComponent()
// }
// fatalError("couldn't find the products directory")
// #else
// return Bundle.main.bundleURL
// #endif
// }
//}

View File

@@ -1,47 +0,0 @@
# run this with docker-compose -f docker/docker-compose.yml run test
services:
server:
image: swift-mqtt-dewpoint:latest
restart: unless-stopped
env_file: .env
test:
image: swift:latest
#build:
#context: ./
platform: linux/amd64
working_dir: /app
networks:
- test
volumes:
- .:/app
depends_on:
- mosquitto-test
environment:
- MOSQUITTO_SERVER=mosquitto-test
command: /bin/bash -xc "swift package clean && swift test"
mosquitto-test:
image: eclipse-mosquitto
networks:
- test
volumes:
- ./mosquitto/config:/mosquitto/config
- ./mosquitto/certs:/mosquitto/certs
mosquitto:
image: eclipse-mosquitto
volumes:
- ./mosquitto/config:/mosquitto/config
- ./mosquitto/certs:/mosquitto/certs
ports:
- "1883:1883"
- "8883:8883"
- "8080:8080"
- "8081:8081"
networks:
test:
driver: bridge
external: false

14
docker/Dockerfile Executable file
View File

@@ -0,0 +1,14 @@
# Used this to build the release version of the image.
# Build the executable
FROM swift:5.10 AS build
WORKDIR /build
COPY ./Package.* ./
RUN swift package resolve
COPY . .
RUN swift build -c release -Xswiftc -g
# Run image
FROM swift:5.10-slim
WORKDIR /run
COPY --from=build /build/.build/release/dewpoint-controller /run
CMD ["/bin/bash", "-xc", "./dewpoint-controller"]

View File

@@ -0,0 +1,5 @@
# Used to build a local MQTT broker for development and
# testing.
FROM eclipse-mosquitto:latest
COPY ./mosquitto/config/mosquitto.conf /mosquitto/config/mosquitto.conf
EXPOSE 1883

8
docker/Dockerfile.test Normal file
View File

@@ -0,0 +1,8 @@
# Used to build a test image.
FROM swift:5.10
WORKDIR /app
COPY ./Package.* ./
RUN swift package resolve
COPY . .
RUN swift build
CMD ["/bin/bash", "-xc", "swift", "test"]

17
docker/docker-compose-test.yaml Executable file
View File

@@ -0,0 +1,17 @@
# run this with docker-compose run test
services:
test:
build:
context: ..
dockerfile: docker/Dockerfile.test
working_dir: /app
depends_on:
- mosquitto
environment:
- MOSQUITTO_SERVER=mosquitto
command: /bin/bash -xc "swift test"
mosquitto:
build:
context: ..
dockerfile: docker/Dockerfile.mosquitto

18
docker/docker-compose.yaml Executable file
View File

@@ -0,0 +1,18 @@
# run this with docker-compose run dewpoint_controller
services:
dewpoint_controller:
container_name: dewpoint-controller
build:
context: ..
dockerfile: docker/Dockerfile
depends_on:
- mosquitto
environment:
- MQTT_HOST=mosquitto
mosquitto:
build:
context: ..
dockerfile: docker/Dockerfile.mosquitto
ports:
- "1883:1883"

0
mosquitto/config/mosquitto.conf Normal file → Executable file
View File