79 Commits

Author SHA1 Message Date
916fcb3584 fix: Update to release workflow
All checks were successful
Create and publish a Docker image / build-and-push-image (push) Successful in 8m12s
CI / Run Tests (pull_request) Successful in 4m28s
CI / Run Tests (push) Successful in 5m5s
2024-11-19 15:08:07 -05:00
d9af0b8b30 fix: Update to release workflow
Some checks failed
CI / Run Tests (push) Has been cancelled
Create and publish a Docker image / build-and-push-image (push) Failing after 7m15s
2024-11-19 14:52:37 -05:00
aa666d799a fix: Update to release workflow
Some checks failed
CI / Run Tests (push) Has been cancelled
Create and publish a Docker image / build-and-push-image (push) Failing after 7m42s
2024-11-19 14:24:43 -05:00
3825517dae fix: Update to release workflow
Some checks failed
CI / Run Tests (push) Has been cancelled
Create and publish a Docker image / build-and-push-image (push) Failing after 13s
2024-11-19 14:22:21 -05:00
c21695a37e fix: Update to release workflow
Some checks failed
CI / Run Tests (push) Has been cancelled
Create and publish a Docker image / build-and-push-image (push) Failing after 8s
2024-11-19 14:18:50 -05:00
3743eefa69 fix: Update to release workflow
Some checks failed
CI / Run Tests (push) Has been cancelled
Create and publish a Docker image / build-and-push-image (push) Failing after 6s
2024-11-19 14:02:10 -05:00
845d566c60 feat: Adds release workflow to gitea
Some checks failed
CI / Run Tests (push) Has been cancelled
Create and publish a Docker image / build-and-push-image (push) Has been cancelled
2024-11-19 13:43:04 -05:00
99f39b91af feat: More cli client tests and documentation.
All checks were successful
CI / Run Tests (push) Successful in 4m57s
2024-11-18 22:55:54 -05:00
55ea88a29f feat: Switch to old commit to checkout xcodebuild 2024-11-18 18:45:03 -05:00
756fd0bccf feat: cli client test updates
Some checks failed
CI / Run Tests (push) Failing after 3h8m1s
2024-11-18 17:16:44 -05:00
24f2ad63a7 feat: Moved cli client tests to XCTest to work in docker
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-18 15:53:23 -05:00
ce18c44363 feat: Working on cli client and tests
Some checks failed
CI / Run Tests (push) Failing after 3m7s
2024-11-17 22:23:44 -05:00
6472d3cd1e feat: Begins using swift argument parser and creating cli client dependency
All checks were successful
CI / Run Tests (push) Successful in 4m27s
2024-11-16 22:32:32 -05:00
3416ce1003 feat: Prep for moving tests into single integration suite 2024-11-16 01:42:29 -05:00
c84427a9b3 feat: Renaming and moves some items around, listeners now manage reconnection events.
All checks were successful
CI / Run Tests (push) Successful in 4m16s
2024-11-15 17:15:01 -05:00
947472f62d feat: Minimal readme and cleans up docker files.
All checks were successful
CI / Run Tests (push) Successful in 27m31s
2024-11-14 22:21:02 -05:00
d16135dd50 feat: Minimal readme and cleans up docker files.
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-14 22:20:25 -05:00
19e97652fd feat: Minimal readme and cleans up docker files.
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-14 22:19:26 -05:00
1089452212 feat: Minimal readme and cleans up docker files.
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-14 22:17:08 -05:00
5e998a60d0 feat: Minimal readme and cleans up docker files.
Some checks failed
CI / Run Tests (push) Failing after 13s
2024-11-14 22:12:20 -05:00
9e2af22a36 feat: Fix CI
All checks were successful
CI / Run Tests (push) Successful in 15m25s
2024-11-14 21:36:56 -05:00
89f3601c2c fix: Fixes sensor service test that was flaky and moves docker stuff into it's own directory.
Some checks failed
CI / Run Tests (push) Failing after 12s
2024-11-14 21:34:48 -05:00
d4b6f6ad2b fix: Fixes sensor service test that was flaky and moves docker stuff into it's own directory.
Some checks failed
CI / Run Tests (push) Failing after 12s
2024-11-14 21:24:23 -05:00
ec3cd40fef feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3m57s
2024-11-14 21:00:01 -05:00
953c9d5b7c feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3m57s
2024-11-14 20:54:00 -05:00
00bb6ca1a6 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3m40s
2024-11-14 20:45:50 -05:00
41fb3c5715 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 12s
2024-11-14 20:44:31 -05:00
8e4430804c feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 4m31s
2024-11-14 20:02:50 -05:00
a8f689136d feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 20:01:45 -05:00
2607be6658 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 20:00:24 -05:00
b05e18b258 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:59:45 -05:00
394b49d1a0 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m14s
2024-11-14 19:55:19 -05:00
6bec0d6fa5 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m33s
2024-11-14 19:52:28 -05:00
63d65bd7cd feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 10s
2024-11-14 19:51:25 -05:00
320f3e792e feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:49:04 -05:00
74b73e7534 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:47:43 -05:00
7954fc5dcd feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 10s
2024-11-14 19:45:40 -05:00
115c4dc252 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:43:07 -05:00
853a157ae7 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:35:26 -05:00
30b8ea3661 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:32:47 -05:00
d26ab714ab feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 10s
2024-11-14 19:30:20 -05:00
b45ad76fff feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 12s
2024-11-14 19:24:58 -05:00
c4395b9089 feat: Run CI
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-14 19:00:22 -05:00
b3874b96c5 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 8m52s
2024-11-14 18:41:37 -05:00
4024bb624f feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3s
2024-11-14 18:39:10 -05:00
6371ffed47 fix: Fixing naming of dewpoint-controller, part 2.
Some checks failed
CI / Run Tests (push) Failing after 2m35s
2024-11-14 18:10:52 -05:00
76b06e86fa fix: Fixing naming of dewpoint-controller, part 2.
Some checks failed
CI / Run Tests (push) Failing after 1m41s
2024-11-14 18:01:48 -05:00
fccfa4d006 fix: Fixing naming of dewpoint-controller, part 1. 2024-11-14 18:01:18 -05:00
5df08d6c91 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m40s
2024-11-14 17:55:40 -05:00
1c99e4861d feat: Run CI
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-14 17:48:07 -05:00
a0b7053eae feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 33s
2024-11-14 17:41:42 -05:00
df3ed6a407 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 12s
2024-11-14 17:40:47 -05:00
1d9d8dc449 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m6s
2024-11-14 17:17:45 -05:00
9a53d36f4c feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 54s
2024-11-14 17:16:00 -05:00
44a6a878eb feat: Run CI
All checks were successful
CI / Run Tests (push) Successful in 33s
2024-11-14 17:12:34 -05:00
c13a1a14a3 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m39s
2024-11-14 17:09:31 -05:00
6c916215ea feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m37s
2024-11-14 17:02:35 -05:00
be7442c06a feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 8s
2024-11-14 17:00:41 -05:00
26a30c2a07 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 9s
2024-11-14 16:59:39 -05:00
5f131d8fa2 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 8s
2024-11-14 16:58:22 -05:00
d6e217f556 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 8s
2024-11-14 16:55:52 -05:00
b39ccafc92 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 9s
2024-11-14 16:54:04 -05:00
8336c56adf feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 14s
2024-11-14 16:41:24 -05:00
fac8945386 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 36s
2024-11-14 16:38:42 -05:00
5b319cae9b feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 7s
2024-11-14 16:37:35 -05:00
ca7024cb60 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3s
2024-11-14 16:32:48 -05:00
ce327a6f1c feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3s
2024-11-14 15:11:29 -05:00
95f8565cde feat: Adds initial CI
Some checks failed
CI / Test (push) Failing after 39s
2024-11-14 15:05:09 -05:00
163f603b69 feat: Fixes some tests and docker builds 2024-11-14 14:58:09 -05:00
e7a849b003 feat: Adding more tests 2024-11-14 07:43:40 -05:00
bd2a798320 feat: Seperates connection stream and moves connection manager out of the connection service module. 2024-11-13 17:12:56 -05:00
b8992b89b6 feat: Adds MQTT connection stream, need to clean up the manager and remove stream from it. 2024-11-13 10:06:28 -05:00
efd9907b4a feat: Cleans up some of the shutdown logic so that the MQTTClient is disconnected properly. 2024-11-12 22:19:09 -05:00
fbbd65f7ae feat: Cleaning up some unused code. 2024-11-12 21:18:02 -05:00
8067331ff8 feat: Removes sensor client in favor of more generic topic listener and publisher 2024-11-12 16:42:14 -05:00
b6db9b5322 feat: Begins breaking out topic listener and publisher as it's own dependency. 2024-11-12 11:12:34 -05:00
bf1126b06a feat: Adds MQTTConnectionManagerLive module. 2024-11-11 22:00:14 -05:00
ef552fb8bc feat: Removes unused files 2024-11-11 16:28:11 -05:00
1e62d7aac0 feat: Adds sensor client dependency 2024-11-11 15:23:45 -05:00
50 changed files with 2087 additions and 1931 deletions

21
.gitea/workflows/ci.yaml Normal file
View File

@@ -0,0 +1,21 @@
---
name: CI
on:
push:
pull_request:
jobs:
test:
name: Run Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Setup Docker buildx
uses: docker/setup-buildx-action@v3
- name: Run Test
run: make test-docker
- name: Cleanup.
if: always()
run: docker compose --file docker/docker-compose-test.yaml down

68
.gitea/workflows/release.yml Executable file
View File

@@ -0,0 +1,68 @@
#
name: Create and publish a Docker image
# Configures this workflow to run every time a change is pushed to the branch called `release`.
on:
push:
branches: ['release']
tags:
- '*'
workflow_dispatch:
# Defines two custom environment variables for the workflow. These are used for the Container registry domain, and a name for the Docker image that this workflow builds.
env:
REGISTRY: git.housh.dev
IMAGE_NAME: ${{ gitea.repository }}
# There is a single job in this workflow. It's configured to run on the latest available version of Ubuntu.
jobs:
build-and-push-image:
runs-on: ubuntu-latest
# Sets the permissions granted to the `GITHUB_TOKEN` for the actions in this job.
permissions:
contents: read
packages: write
attestations: write
id-token: write
#
steps:
- name: Checkout repository
uses: actions/checkout@v4
# Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
- name: Log in to the Container registry
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
with:
registry: ${{ env.REGISTRY }}
username: ${{ gitea.actor }}
password: ${{ secrets.CONTAINER_TOKEN }}
# This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels.
- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=ref,event=branch
type=semver,pattern={{version}}
type=sha
# This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages.
# It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository.
# It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step.
- name: Build and push Docker image
id: push
uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4
with:
context: .
file: docker/Dockerfile
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
# This step generates an artifact attestation for the image, which is an unforgeable statement about where and how it was built. It increases supply chain security for people who consume the image. For more information, see "[AUTOTITLE](/actions/security-guides/using-artifact-attestations-to-establish-provenance-for-builds)."
# - name: Generate artifact attestation
# uses: actions/attest-build-provenance@v1
# with:
# subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME}}
# subject-digest: ${{ steps.push.outputs.digest }}
# push-to-registry: true
# github-token: ${{ secrets.CONTAINER_TOKEN }}

4
.gitignore vendored
View File

@@ -5,8 +5,10 @@
xcuserdata/ xcuserdata/
DerivedData/ DerivedData/
.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata .swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata
.dewPoint-env .dewPoint-env*
.topics .topics
mqtt_password.txt mqtt_password.txt
.env .env
.smbdelete* .smbdelete*
buildServer.json
.nvim/*

View File

@@ -8,3 +8,4 @@
--wrapconditions after-first --wrapconditions after-first
--typeblanklines preserve --typeblanklines preserve
--commas inline --commas inline
--stripunusedargs closure-only

9
.swiftlint.yml Normal file
View File

@@ -0,0 +1,9 @@
disabled_rules:
- closing_brace
- fuction_body_length
included:
- Sources
- Tests
ignore_multiline_statement_conditions: true

View File

@@ -0,0 +1,28 @@
{
"configurations" : [
{
"id" : "AFB1047B-4742-43D2-AFB9-680C1CB2D273",
"name" : "Test Scheme Action",
"options" : {
}
}
],
"defaultOptions" : {
"targetForVariableExpansion" : {
"containerPath" : "container:",
"identifier" : "dewpoint-controller",
"name" : "dewpoint-controller"
}
},
"testTargets" : [
{
"target" : {
"containerPath" : "container:",
"identifier" : "IntegrationTests",
"name" : "IntegrationTests"
}
}
],
"version" : 1
}

View File

@@ -1,67 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1330"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Bootstrap"
BuildableName = "Bootstrap"
BlueprintName = "Bootstrap"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
</BuildActionEntries>
</BuildAction>
<TestAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
</Testables>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"
useCustomWorkingDirectory = "NO"
ignoresPersistentStateOnLaunch = "NO"
debugDocumentVersioning = "YES"
debugServiceExtension = "internal"
allowLocationSimulation = "YES">
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"
shouldUseLaunchSchemeArgsEnv = "YES"
savedToolIdentifier = ""
useCustomWorkingDirectory = "NO"
debugDocumentVersioning = "YES">
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Bootstrap"
BuildableName = "Bootstrap"
BlueprintName = "Bootstrap"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>
</ProfileAction>
<AnalyzeAction
buildConfiguration = "Debug">
</AnalyzeAction>
<ArchiveAction
buildConfiguration = "Release"
revealArchiveInOrganizer = "YES">
</ArchiveAction>
</Scheme>

View File

@@ -1,67 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1330"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Client"
BuildableName = "Client"
BlueprintName = "Client"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
</BuildActionEntries>
</BuildAction>
<TestAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
</Testables>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"
useCustomWorkingDirectory = "NO"
ignoresPersistentStateOnLaunch = "NO"
debugDocumentVersioning = "YES"
debugServiceExtension = "internal"
allowLocationSimulation = "YES">
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"
shouldUseLaunchSchemeArgsEnv = "YES"
savedToolIdentifier = ""
useCustomWorkingDirectory = "NO"
debugDocumentVersioning = "YES">
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Client"
BuildableName = "Client"
BlueprintName = "Client"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>
</ProfileAction>
<AnalyzeAction
buildConfiguration = "Debug">
</AnalyzeAction>
<ArchiveAction
buildConfiguration = "Release"
revealArchiveInOrganizer = "YES">
</ArchiveAction>
</Scheme>

View File

@@ -1,67 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1330"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ClientLive"
BuildableName = "ClientLive"
BlueprintName = "ClientLive"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
</BuildActionEntries>
</BuildAction>
<TestAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
</Testables>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"
useCustomWorkingDirectory = "NO"
ignoresPersistentStateOnLaunch = "NO"
debugDocumentVersioning = "YES"
debugServiceExtension = "internal"
allowLocationSimulation = "YES">
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"
shouldUseLaunchSchemeArgsEnv = "YES"
savedToolIdentifier = ""
useCustomWorkingDirectory = "NO"
debugDocumentVersioning = "YES">
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ClientLive"
BuildableName = "ClientLive"
BlueprintName = "ClientLive"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>
</ProfileAction>
<AnalyzeAction
buildConfiguration = "Debug">
</AnalyzeAction>
<ArchiveAction
buildConfiguration = "Release"
revealArchiveInOrganizer = "YES">
</ArchiveAction>
</Scheme>

View File

@@ -1,67 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1330"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "DewPointEnvironment"
BuildableName = "DewPointEnvironment"
BlueprintName = "DewPointEnvironment"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
</BuildActionEntries>
</BuildAction>
<TestAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
</Testables>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"
useCustomWorkingDirectory = "NO"
ignoresPersistentStateOnLaunch = "NO"
debugDocumentVersioning = "YES"
debugServiceExtension = "internal"
allowLocationSimulation = "YES">
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"
shouldUseLaunchSchemeArgsEnv = "YES"
savedToolIdentifier = ""
useCustomWorkingDirectory = "NO"
debugDocumentVersioning = "YES">
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "DewPointEnvironment"
BuildableName = "DewPointEnvironment"
BlueprintName = "DewPointEnvironment"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>
</ProfileAction>
<AnalyzeAction
buildConfiguration = "Debug">
</AnalyzeAction>
<ArchiveAction
buildConfiguration = "Release"
revealArchiveInOrganizer = "YES">
</ArchiveAction>
</Scheme>

View File

@@ -1,67 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1330"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "EnvVars"
BuildableName = "EnvVars"
BlueprintName = "EnvVars"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
</BuildActionEntries>
</BuildAction>
<TestAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
</Testables>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"
useCustomWorkingDirectory = "NO"
ignoresPersistentStateOnLaunch = "NO"
debugDocumentVersioning = "YES"
debugServiceExtension = "internal"
allowLocationSimulation = "YES">
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"
shouldUseLaunchSchemeArgsEnv = "YES"
savedToolIdentifier = ""
useCustomWorkingDirectory = "NO"
debugDocumentVersioning = "YES">
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "EnvVars"
BuildableName = "EnvVars"
BlueprintName = "EnvVars"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>
</ProfileAction>
<AnalyzeAction
buildConfiguration = "Debug">
</AnalyzeAction>
<ArchiveAction
buildConfiguration = "Release"
revealArchiveInOrganizer = "YES">
</ArchiveAction>
</Scheme>

View File

@@ -1,67 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1330"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Models"
BuildableName = "Models"
BlueprintName = "Models"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
</BuildActionEntries>
</BuildAction>
<TestAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
</Testables>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"
useCustomWorkingDirectory = "NO"
ignoresPersistentStateOnLaunch = "NO"
debugDocumentVersioning = "YES"
debugServiceExtension = "internal"
allowLocationSimulation = "YES">
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"
shouldUseLaunchSchemeArgsEnv = "YES"
savedToolIdentifier = ""
useCustomWorkingDirectory = "NO"
debugDocumentVersioning = "YES">
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Models"
BuildableName = "Models"
BlueprintName = "Models"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>
</ProfileAction>
<AnalyzeAction
buildConfiguration = "Debug">
</AnalyzeAction>
<ArchiveAction
buildConfiguration = "Release"
revealArchiveInOrganizer = "YES">
</ArchiveAction>
</Scheme>

View File

@@ -1,256 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1330"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Bootstrap"
BuildableName = "Bootstrap"
BlueprintName = "Bootstrap"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Client"
BuildableName = "Client"
BlueprintName = "Client"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ClientLive"
BuildableName = "ClientLive"
BlueprintName = "ClientLive"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "DewPointEnvironment"
BuildableName = "DewPointEnvironment"
BlueprintName = "DewPointEnvironment"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "EnvVars"
BuildableName = "EnvVars"
BlueprintName = "EnvVars"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Models"
BuildableName = "Models"
BlueprintName = "Models"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controller"
BuildableName = "dewPoint-controller"
BlueprintName = "dewPoint-controller"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "NO"
buildForArchiving = "NO"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ClientTests"
BuildableName = "ClientTests"
BlueprintName = "ClientTests"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "NO"
buildForArchiving = "NO"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controllerTests"
BuildableName = "dewPoint-controllerTests"
BlueprintName = "dewPoint-controllerTests"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "TopicsLive"
BuildableName = "TopicsLive"
BlueprintName = "TopicsLive"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "SensorsService"
BuildableName = "SensorsService"
BlueprintName = "SensorsService"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
</BuildActionEntries>
</BuildAction>
<TestAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ClientTests"
BuildableName = "ClientTests"
BlueprintName = "ClientTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controllerTests"
BuildableName = "dewPoint-controllerTests"
BlueprintName = "dewPoint-controllerTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "MQTTConnectionServiceTests"
BuildableName = "MQTTConnectionServiceTests"
BlueprintName = "MQTTConnectionServiceTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "SensorsServiceTests"
BuildableName = "SensorsServiceTests"
BlueprintName = "SensorsServiceTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
</Testables>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"
useCustomWorkingDirectory = "NO"
ignoresPersistentStateOnLaunch = "NO"
debugDocumentVersioning = "YES"
debugServiceExtension = "internal"
allowLocationSimulation = "YES">
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controller"
BuildableName = "dewPoint-controller"
BlueprintName = "dewPoint-controller"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"
shouldUseLaunchSchemeArgsEnv = "YES"
savedToolIdentifier = ""
useCustomWorkingDirectory = "NO"
debugDocumentVersioning = "YES">
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controller"
BuildableName = "dewPoint-controller"
BlueprintName = "dewPoint-controller"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>
</ProfileAction>
<AnalyzeAction
buildConfiguration = "Debug">
</AnalyzeAction>
<ArchiveAction
buildConfiguration = "Release"
revealArchiveInOrganizer = "YES">
</ArchiveAction>
</Scheme>

View File

@@ -1,126 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1330"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controller"
BuildableName = "dewPoint-controller"
BlueprintName = "dewPoint-controller"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "NO"
buildForArchiving = "NO"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ClientTests"
BuildableName = "ClientTests"
BlueprintName = "ClientTests"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "NO"
buildForArchiving = "NO"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controllerTests"
BuildableName = "dewPoint-controllerTests"
BlueprintName = "dewPoint-controllerTests"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
</BuildActionEntries>
</BuildAction>
<TestAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ClientTests"
BuildableName = "ClientTests"
BlueprintName = "ClientTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controllerTests"
BuildableName = "dewPoint-controllerTests"
BlueprintName = "dewPoint-controllerTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
</Testables>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"
useCustomWorkingDirectory = "NO"
ignoresPersistentStateOnLaunch = "NO"
debugDocumentVersioning = "YES"
debugServiceExtension = "internal"
allowLocationSimulation = "YES">
<BuildableProductRunnable
runnableDebuggingMode = "0">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controller"
BuildableName = "dewPoint-controller"
BlueprintName = "dewPoint-controller"
ReferencedContainer = "container:">
</BuildableReference>
</BuildableProductRunnable>
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"
shouldUseLaunchSchemeArgsEnv = "YES"
savedToolIdentifier = ""
useCustomWorkingDirectory = "NO"
debugDocumentVersioning = "YES">
<BuildableProductRunnable
runnableDebuggingMode = "0">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controller"
BuildableName = "dewPoint-controller"
BlueprintName = "dewPoint-controller"
ReferencedContainer = "container:">
</BuildableReference>
</BuildableProductRunnable>
</ProfileAction>
<AnalyzeAction
buildConfiguration = "Debug">
</AnalyzeAction>
<ArchiveAction
buildConfiguration = "Release"
revealArchiveInOrganizer = "YES">
</ArchiveAction>
</Scheme>

View File

@@ -1,14 +0,0 @@
# Build the executable
FROM swift:5.10 AS build
WORKDIR /build
COPY ./Package.* ./
RUN swift package resolve
COPY . .
RUN swift build --enable-test-discovery -c release -Xswiftc -g
# Run image
FROM swift:5.10-slim
WORKDIR /run
COPY --from=build /build/.build/release/dewPoint-controller /run
CMD ["/bin/bash", "-xc", "./dewPoint-controller"]

View File

@@ -1,6 +0,0 @@
FROM swift:5.10
WORKDIR /app
COPY ./Package.* ./
RUN swift package resolve
COPY . .
CMD ["/bin/bash", "-xc", "swift", "test"]

View File

@@ -1,30 +1,39 @@
DOCKER_IMAGE_NAME?="swift-mqtt-dewpoint"
DOCKER_TAG_NAME?="latest"
bootstrap-env: .PHONY: bootstrap
bootstrap:
@cp Bootstrap/dewPoint-env-example .dewPoint-env @cp Bootstrap/dewPoint-env-example .dewPoint-env
bootstrap-topics: .PHONY: build
@cp Bootstrap/topics-example .topics
bootstrap: bootstrap-env bootstrap-topics
build: build:
@swift build -Xswiftc -strict-concurrency=complete @swift build -Xswiftc -strict-concurrency=complete
.PHONY: build-docker
build-docker:
@docker build \
--file docker/Dockerfile \
--tag "${DOCKER_IMAGE_NAME}:${DOCKER_TAG_NAME}" .
.PHONY: clean
clean: clean:
rm -rf .build rm -rf .build
.PHONY: run
run: run:
@swift run dewPoint-controller @swift run dewpoint-controller
start-mosquitto:
@docker-compose start mosquitto
stop-mosquitto:
@docker-compose rm -f mosquitto || true
.PHONY: test-docker
test-docker: test-docker:
@docker-compose run --remove-orphans -i --rm test @docker compose --file docker/docker-compose-test.yaml \
@docker-compose kill mosquitto-test run --build --remove-orphans -i --rm test
@docker-compose rm -f @docker compose --file docker/docker-compose-test.yaml down
test: test-docker .PHONY: start-mosquitto
start-mosquitto:
@docker compose --file docker/docker-compose.yaml \
up -d mosquitto
.PHONY: test-swift
test-swift: start-mosquitto
@swift test --enable-code-coverage

View File

@@ -1,5 +1,5 @@
{ {
"originHash" : "d3104d51323f6bc98cf3ab2930e5a26f72c9a4fdb7640360ba27628672397841", "originHash" : "486be5d69e4f0ba7b9f42046df31a727c7e394e4ecfae5671e1b194bed7c9e9b",
"pins" : [ "pins" : [
{ {
"identity" : "combine-schedulers", "identity" : "combine-schedulers",
@@ -10,6 +10,15 @@
"version" : "1.0.2" "version" : "1.0.2"
} }
}, },
{
"identity" : "dotenv",
"kind" : "remoteSourceControl",
"location" : "https://github.com/swiftpackages/DotEnv.git",
"state" : {
"revision" : "1f15bb9de727d694af1d003a1a5d7a553752850f",
"version" : "3.0.0"
}
},
{ {
"identity" : "mqtt-nio", "identity" : "mqtt-nio",
"kind" : "remoteSourceControl", "kind" : "remoteSourceControl",
@@ -19,6 +28,15 @@
"version" : "2.11.0" "version" : "2.11.0"
} }
}, },
{
"identity" : "swift-argument-parser",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-argument-parser.git",
"state" : {
"revision" : "41982a3656a71c768319979febd796c6fd111d5c",
"version" : "1.5.0"
}
},
{ {
"identity" : "swift-async-algorithms", "identity" : "swift-async-algorithms",
"kind" : "remoteSourceControl", "kind" : "remoteSourceControl",
@@ -60,8 +78,17 @@
"kind" : "remoteSourceControl", "kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-concurrency-extras", "location" : "https://github.com/pointfreeco/swift-concurrency-extras",
"state" : { "state" : {
"revision" : "6054df64b55186f08b6d0fd87152081b8ad8d613", "revision" : "163409ef7dae9d960b87f34b51587b6609a76c1f",
"version" : "1.2.0" "version" : "1.3.0"
}
},
{
"identity" : "swift-custom-dump",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-custom-dump",
"state" : {
"revision" : "82645ec760917961cfa08c9c0c7104a57a0fa4b1",
"version" : "1.3.3"
} }
}, },
{ {
@@ -69,8 +96,8 @@
"kind" : "remoteSourceControl", "kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-dependencies", "location" : "https://github.com/pointfreeco/swift-dependencies",
"state" : { "state" : {
"revision" : "0fc0255e780bf742abeef29dec80924f5f0ae7b9", "revision" : "96eecd47660e8307877acb8c41cc5295ba7350a7",
"version" : "1.4.1" "version" : "1.5.2"
} }
}, },
{ {

View File

@@ -8,39 +8,61 @@ let swiftSettings: [SwiftSetting] = [
] ]
let package = Package( let package = Package(
name: "dewPoint-controller", name: "dewpoint-controller",
platforms: [ platforms: [
.macOS(.v14) .macOS(.v14)
], ],
products: [ products: [
.executable(name: "dewPoint-controller", targets: ["dewPoint-controller"]), .executable(name: "dewpoint-controller", targets: ["DewPointController"]),
.library(name: "CliClient", targets: ["CliClient"]),
.library(name: "Models", targets: ["Models"]), .library(name: "Models", targets: ["Models"]),
.library(name: "MQTTManager", targets: ["MQTTManager"]),
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]), .library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
.library(name: "SensorsService", targets: ["SensorsService"]) .library(name: "SensorsService", targets: ["SensorsService"])
], ],
dependencies: [ dependencies: [
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"), .package(url: "https://github.com/apple/swift-argument-parser.git", from: "1.3.0"),
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-log", from: "1.6.0"), .package(url: "https://github.com/apple/swift-log", from: "1.6.0"),
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.4.1"), .package(url: "https://github.com/swiftpackages/DotEnv.git", from: "3.0.0"),
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.5.2"),
.package(url: "https://github.com/pointfreeco/swift-custom-dump", from: "1.0.0"),
.package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", exact: "0.2.3"), .package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", exact: "0.2.3"),
.package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"), .package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"),
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.3.0") .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.3.0")
], ],
targets: [ targets: [
.executableTarget( .target(
name: "dewPoint-controller", name: "CliClient",
dependencies: [ dependencies: [
"Models", "Models",
"MQTTConnectionService", .product(name: "Dependencies", package: "swift-dependencies"),
"SensorsService", .product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio"), .product(name: "DotEnv", package: "DotEnv"),
.product(name: "NIO", package: "swift-nio"), .product(name: "MQTTNIO", package: "mqtt-nio")
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
] ]
), ),
.testTarget( .testTarget(
name: "dewPoint-controllerTests", name: "CliClientTests",
dependencies: ["dewPoint-controller"] dependencies: [
"CliClient"
],
resources: [
.copy("test.env"),
.copy("test-env.json")
]
),
.executableTarget(
name: "DewPointController",
dependencies: [
"CliClient",
"MQTTConnectionService",
"SensorsService",
.product(name: "ArgumentParser", package: "swift-argument-parser"),
.product(name: "CustomDump", package: "swift-custom-dump"),
// .product(name: "DotEnv", package: "DotEnv"),
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
]
), ),
.target( .target(
name: "Models", name: "Models",
@@ -50,19 +72,33 @@ let package = Package(
], ],
swiftSettings: swiftSettings swiftSettings: swiftSettings
), ),
.target(
name: "MQTTManager",
dependencies: [
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio")
],
swiftSettings: swiftSettings
),
.target( .target(
name: "MQTTConnectionService", name: "MQTTConnectionService",
dependencies: [ dependencies: [
"Models", "Models",
.product(name: "MQTTNIO", package: "mqtt-nio"), "MQTTManager",
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle") .product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
], ],
swiftSettings: swiftSettings swiftSettings: swiftSettings
), ),
.testTarget( .testTarget(
name: "MQTTConnectionServiceTests", name: "IntegrationTests",
dependencies: [ dependencies: [
"DewPointController",
"MQTTConnectionService", "MQTTConnectionService",
"MQTTManager",
"SensorsService",
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics"),
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle") .product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
] ]
), ),
@@ -70,20 +106,14 @@ let package = Package(
name: "SensorsService", name: "SensorsService",
dependencies: [ dependencies: [
"Models", "Models",
"MQTTConnectionService", "MQTTManager",
.product(name: "Dependencies", package: "swift-dependencies"), .product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"), .product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio"), .product(name: "MQTTNIO", package: "mqtt-nio"),
.product(name: "PsychrometricClient", package: "swift-psychrometrics"),
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle") .product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
], ],
swiftSettings: swiftSettings swiftSettings: swiftSettings
),
.testTarget(
name: "SensorsServiceTests",
dependencies: [
"SensorsService",
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
]
) )
] ]
) )

View File

@@ -1,3 +1,7 @@
# dewPoint-controller # dewpoint-controller
A description of this package. ![CI](https://git.housh.dev/michael/swift-mqtt-dewpoint/actions/workflows/ci.yaml/badge.svg?branch=main)
Listens to an MQTT broker for temperature and humidity sensors and calculates
the dew-point temperature and enthalpy for the sensor, then publishes those back
to the MQTT broker.

View File

@@ -0,0 +1,224 @@
import Dependencies
import DependenciesMacros
import DotEnv
import Foundation
import Logging
import Models
import MQTTNIO
import NIO
public extension DependencyValues {
var cliClient: CliClient {
get { self[CliClient.self] }
set { self[CliClient.self] = newValue }
}
}
/// Represents the interface needed for the command line application.
///
///
@DependencyClient
public struct CliClient {
/// Parse a log level from the given `EnvVars`.
public var logLevel: @Sendable (EnvVars) -> Logger.Level = { _ in .debug }
/// Generate the `EnvVars` with the given parameters.
public var makeEnvVars: @Sendable (EnvVarsRequest) async throws -> EnvVars
/// Generate the `MQTTClient` with the given parameters.
public var makeClient: @Sendable (ClientRequest) throws -> MQTTClient
/// Attempt to parse a string to an `MQTTClient.Version`.
public var parseMqttClientVersion: @Sendable (String) -> MQTTClient.Version?
/// Represents the parameters needed to create an `MQTTClient`.
///
public struct ClientRequest: Sendable {
/// The environment variables used to create the client.
public let environment: EnvVars
/// The event loop group for the client.
public let eventLoopGroup: MultiThreadedEventLoopGroup
/// A logger to use with the client.
public let logger: Logger?
/// Create a new client request.
///
/// - Parameters:
/// - environment: The environment variables to use.
/// - eventLoopGroup: The event loop group to use on the client.
/// - logger: An optional logger to use on the client.
public init(
environment: EnvVars,
eventLoopGroup: MultiThreadedEventLoopGroup,
logger: Logger?
) {
self.environment = environment
self.eventLoopGroup = eventLoopGroup
self.logger = logger
}
}
public struct EnvVarsRequest: Sendable {
public let envFilePath: String?
public let logger: Logger?
public let mqttClientVersion: String?
public init(
envFilePath: String? = nil,
logger: Logger? = nil,
version mqttClientVersion: String? = nil
) {
self.envFilePath = envFilePath
self.logger = logger
self.mqttClientVersion = mqttClientVersion
}
}
}
extension CliClient: DependencyKey {
public static let testValue: CliClient = Self()
public static var liveValue: CliClient {
Self(
logLevel: { Logger.Level.from(environment: $0) },
makeEnvVars: {
try await EnvVars.load(
dotEnvFile: $0.envFilePath,
logger: $0.logger,
version: $0.mqttClientVersion
)
},
makeClient: {
MQTTClient(
environment: $0.environment,
eventLoopGroup: $0.eventLoopGroup,
logger: $0.logger
)
},
parseMqttClientVersion: { .init(string: $0) }
)
}
}
// MARK: - Helpers
extension EnvironmentDependency {
func dotEnvDict(path: String?) async throws -> [String: String] {
guard let path,
let file = FileType(path: path)
else { return [:] }
return try await load(file)
}
}
extension EnvVars {
/// Load the `EnvVars` from the environment.
///
/// - Paramaters:
/// - dotEnvFile: An optional environment file to load.
/// - logger: An optional logger to use for debugging.
/// - version: A version that is specified from command line, ignoring any environment variable.
static func load(
dotEnvFile: String?,
logger: Logger?,
version: String?
) async throws -> EnvVars {
@Dependency(\.environment) var environment
let defaultEnvVars = EnvVars()
let coders = environment.coders()
let defaultEnvDict = (try? coders.encode(defaultEnvVars))
.flatMap { try? coders.decode([String: String].self, from: $0) }
?? [:]
let dotEnvDict = try await environment.dotEnvDict(path: dotEnvFile)
let envVarsDict = defaultEnvDict
.merging(environment.processInfo(), uniquingKeysWith: { $1 })
.merging(dotEnvDict, uniquingKeysWith: { $1 })
var envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict))
.flatMap { try? coders.decode(EnvVars.self, from: $0) }
?? defaultEnvVars
if let version {
envVars.version = version
}
logger?.debug("Done loading EnvVars...")
return envVars
}
}
@_spi(Internal)
public extension MQTTClient {
convenience init(
environment envVars: EnvVars,
eventLoopGroup: EventLoopGroup,
logger: Logger?
) {
self.init(
host: envVars.host,
port: envVars.port != nil ? Int(envVars.port!) : nil,
identifier: envVars.identifier,
eventLoopGroupProvider: .shared(eventLoopGroup),
logger: logger,
configuration: .init(
version: .parseOrDefault(string: envVars.version),
disablePing: false,
userName: envVars.userName,
password: envVars.password
)
)
}
}
@_spi(Internal)
public extension MQTTClient.Version {
static let `default` = Self.v3_1_1
static func parseOrDefault(string: String?) -> Self {
guard let string, let value = Self(string: string) else {
return .default
}
return value
}
init?(string: String) {
if string.contains("5") {
self = .v5_0
} else if string.contains("3") {
self = .v3_1_1
} else {
return nil
}
}
}
extension Logger.Level {
/// Parse a `Logger.Level` from the loaded `EnvVars`.
static func from(environment envVars: EnvVars) -> Self {
// If the log level was set via an environment variable.
if let logLevel = envVars.logLevel {
return logLevel
}
// Parse the appEnv to derive an log level.
switch envVars.appEnv {
case .staging, .development:
return .debug
case .production:
return .info
case .testing:
return .trace
}
}
}

View File

@@ -0,0 +1,134 @@
import Dependencies
import DependenciesMacros
import DotEnv
import Foundation
import Models
@_spi(Internal)
public extension DependencyValues {
/// A dependecy responsible for loding environment variables.
///
/// This is just used internally of this module, but is useful to
/// override for testing purposes, so import using `@_spi(Internal)`.
var environment: EnvironmentDependency {
get { self[EnvironmentDependency.self] }
set { self[EnvironmentDependency.self] = newValue }
}
}
/// Responsible for loading environment variables and files.
///
///
@_spi(Internal)
@DependencyClient
public struct EnvironmentDependency: Sendable {
public var coders: @Sendable () -> any Coderable = { JSONCoders() }
/// Load the variables based on the request.
public var load: @Sendable (FileType) async throws -> [String: String] = { _ in [:] }
/// Load the environment variables based on the current process environment.
///
/// You can override this to use an empty environment, which is useful for testing purposes.
public var processInfo: @Sendable () -> [String: String] = { [:] }
/// Represents the types of files that can be loaded and decoded into
/// the environment.
public enum FileType: Equatable {
case dotEnv(path: String)
case json(path: String)
public init?(path: String) {
let strings = path.split(separator: ".")
guard let ext = strings.last else {
return nil
}
switch ext {
case "env":
self = .dotEnv(path: path)
case "json":
self = .json(path: path)
default:
return nil
}
}
}
}
struct DecodeError: Error {}
@_spi(Internal)
extension EnvironmentDependency: DependencyKey {
public static let testValue: EnvironmentDependency = Self()
public static func live(
decoder: JSONDecoder = .init(),
encoder: JSONEncoder = .init()
) -> Self {
Self(
coders: { JSONCoders(decoder: decoder, encoder: encoder) },
load: { file in
switch file {
case let .dotEnv(path: path):
let file = try DotEnv.read(path: path)
return file.lines.reduce(into: [String: String]()) { partialResult, line in
partialResult[line.key] = line.value
}
case let .json(path: path):
let url = url(for: path)
return try decoder.decode(
[String: String].self,
from: Data(contentsOf: url)
)
}
},
processInfo: { ProcessInfo.processInfo.environment }
)
}
public static let liveValue: EnvironmentDependency = .live()
}
/// A type that encode and decode values.
///
/// This is really just here to override tests with coders that will throw an error,
/// instead of encoding or decoding data.
///
@_spi(Internal)
public protocol Coderable {
func encode<T: Encodable>(_ instance: T) throws -> Data
func decode<T: Decodable>(_ type: T.Type, from data: Data) throws -> T
}
struct JSONCoders: Coderable {
let decoder: JSONDecoder
let encoder: JSONEncoder
init(
decoder: JSONDecoder = .init(),
encoder: JSONEncoder = .init()
) {
self.decoder = decoder
self.encoder = encoder
}
func encode<T>(_ instance: T) throws -> Data where T: Encodable {
try encoder.encode(instance)
}
func decode<T>(_ type: T.Type, from data: Data) throws -> T where T: Decodable {
try decoder.decode(T.self, from: data)
}
}
private func url(for path: String) -> URL {
#if os(Linux)
return URL(fileURLWithPath: path)
#else
return URL(filePath: path)
#endif
}

View File

@@ -0,0 +1,21 @@
import ArgumentParser
import Dependencies
import Foundation
import Logging
import Models
import MQTTConnectionService
import MQTTManager
import MQTTNIO
import NIO
import PsychrometricClientLive
import SensorsService
import ServiceLifecycle
@main
struct Application: AsyncParsableCommand {
static let configuration = CommandConfiguration(
commandName: "dewpoint-controller",
abstract: "Command for running the dewpoint mqtt service.",
subcommands: [Run.self, Debug.self]
)
}

View File

@@ -0,0 +1,74 @@
import ArgumentParser
import CliClient
import CustomDump
import Dependencies
import DotEnv
import Foundation
import Logging
import Models
extension Application {
struct Debug: AsyncParsableCommand {
static let configuration: CommandConfiguration = .init(
commandName: "debug",
abstract: "Debug the environment variables and command line arguments."
)
@OptionGroup
var options: SharedOptions
@Flag(
name: [.customLong("show-password")],
help: "Don't redact the password from the console."
)
var showPassword: Bool = false
mutating func run() async throws {
@Dependency(\.cliClient) var client
let logger = Logger(label: "debug-command")
print("--------------------------")
print("Running debug command...")
if let envFile = options.envFile {
print("Reading env file: \(envFile)")
print("--------------------------")
} else {
print("No env file set.")
print("--------------------------")
}
print("Loading EnvVars")
print("--------------------------")
let envVars = try await client.makeEnvVars(options.envVarsRequest(logger: logger))
printEnvVars(envVars: envVars, showPassword: showPassword)
print("--------------------------")
if let logLevel = options.logLevel {
print("Log Level option: \(logLevel)")
print("--------------------------")
} else {
print("Log Level option: nil")
print("--------------------------")
}
}
private func printEnvVars(envVars: EnvVars, showPassword: Bool) {
// show the proper password to show depending on if it exists
// and if we should redact it or not.
var passwordString: String?
switch (showPassword, envVars.password) {
case (true, .none), (_, .none):
break
case (true, let .some(password)):
passwordString = password
case (false, .some):
passwordString = "<redacted>"
}
var envVars = envVars
envVars.password = passwordString
customDump(envVars)
}
}
}

View File

@@ -0,0 +1,90 @@
import ArgumentParser
import CliClient
import Dependencies
import Foundation
import Logging
import Models
import MQTTConnectionService
import MQTTManager
import MQTTNIO
import NIO
import PsychrometricClientLive
import SensorsService
import ServiceLifecycle
extension Application {
/// Run the controller.
///
struct Run: AsyncParsableCommand {
static let configuration = CommandConfiguration(
commandName: "run",
abstract: "Run the controller."
)
@OptionGroup
var options: SharedOptions
mutating func run() async throws {
@Dependency(\.cliClient) var cliClient
let (mqtt, logger) = try await cliClient.setupRun(options: options)
logger.info("Setting up environment...")
do {
try await withDependencies {
$0.psychrometricClient = .liveValue
$0.mqtt = .live(client: mqtt, logger: logger)
} operation: {
let mqttConnection = MQTTConnectionService(logger: logger)
let sensors = SensorsService(sensors: .live, logger: logger)
var serviceGroupConfiguration = ServiceGroupConfiguration(
services: [
mqttConnection,
sensors
],
gracefulShutdownSignals: [.sigterm, .sigint],
logger: logger
)
// These settings prevent services from running forever after we've
// received a shutdown signal. In general it should not needed unless the
// services don't shutdown their async streams properly.
serviceGroupConfiguration.maximumCancellationDuration = .seconds(5)
serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10)
let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
logger.info("Starting dewpoint-controller!")
try await serviceGroup.run()
}
// Here we've received a shutdown signal and shutdown all the
// services.
try await mqtt.shutdown()
} catch {
// If something fails, shutdown the mqtt client.
try await mqtt.shutdown()
}
}
}
}
private extension CliClient {
func setupRun(
eventLoopGroup: MultiThreadedEventLoopGroup = .init(numberOfThreads: 1),
loggerLabel: String = "dewpoint-controller",
options: Application.SharedOptions
) async throws -> (MQTTClient, Logger) {
var logger = Logger(label: loggerLabel)
let environment = try await makeEnvVars(options.envVarsRequest(logger: logger))
logger.logLevel = logLevel(environment)
let client = try makeClient(.init(
environment: environment,
eventLoopGroup: eventLoopGroup,
logger: logger
))
return (client, logger)
}
}

View File

@@ -0,0 +1,52 @@
import ArgumentParser
import CliClient
import Logging
import Models
import MQTTNIO
extension Application {
struct SharedOptions: ParsableArguments {
@Option(
name: [.short, .customLong("env-file")],
help: "A file path to an env file."
)
var envFile: String?
@Option(
name: [.short, .customLong("log-level")],
help: "Set the logging level."
)
var logLevelContainer: LogLevelContainer?
@Option(
name: [.short, .long],
help: "Set the MQTT connecition version."
)
var version: String?
func envVarsRequest(logger: Logger?) -> CliClient.EnvVarsRequest {
.init(envFilePath: envFile, logger: logger, version: version)
}
var logLevel: Logger.Level? { logLevelContainer?.logLevel }
}
}
/// A container type for making `Logger.Level` into a type
/// that can be parsed as a command line argument. This is
/// to suppress warnings vs. having `Logger.Level` adopt the
/// protocol.
@_spi(Internal)
public struct LogLevelContainer: ExpressibleByArgument {
public let logLevel: Logger.Level?
public init?(argument: String) {
self.logLevel = .init(rawValue: argument.lowercased())
}
public func callAsFunction() -> Logger.Level? {
logLevel
}
}

View File

@@ -1,151 +1,38 @@
@preconcurrency import Foundation import Dependencies
import Logging import Logging
import Models import Models
import MQTTNIO import MQTTManager
import NIO
import ServiceLifecycle import ServiceLifecycle
// TODO: This may not need to be an actor. public struct MQTTConnectionService: Service {
/// Manages the MQTT broker connection. private let logger: Logger?
public actor MQTTConnectionService: Service {
private let cleanSession: Bool
public let client: MQTTClient
private let continuation: AsyncStream<Event>.Continuation
public nonisolated let events: AsyncStream<Event>
private let internalEventStream: ConnectionStream
nonisolated var logger: Logger { client.logger }
// private var shuttingDown = false
public init( public init(
cleanSession: Bool = true, logger: Logger? = nil
client: MQTTClient
) { ) {
self.cleanSession = cleanSession var logger = logger
self.client = client logger?[metadataKey: "type"] = "mqtt-connection-service"
self.internalEventStream = .init() self.logger = logger
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
self.events = stream
self.continuation = continuation
} }
deinit { /// The entry-point of the service which starts the connection
self.logger.debug("MQTTConnectionService is gone.") /// to the MQTT broker and handles graceful shutdown of the
self.internalEventStream.stop() /// connection.
continuation.finish()
}
/// The entry-point of the service.
///
/// This method connects to the MQTT broker and manages the connection.
/// It will attempt to gracefully shutdown the connection upon receiving
/// `sigterm` signals.
public func run() async throws { public func run() async throws {
await withGracefulShutdownHandler { @Dependency(\.mqtt) var mqtt
await withDiscardingTaskGroup { group in try await mqtt.connect()
group.addTask { await self.connect() }
group.addTask { try await withGracefulShutdownHandler {
await self.internalEventStream.start { self.client.isActive() } for await event in try mqtt.connectionStream().cancelOnGracefulShutdown() {
} // We don't really need to do anything with the events, so just logging
for await event in self.internalEventStream.events.cancelOnGracefulShutdown() { // for now. But we need to iterate on an async stream for the service to
if event == .shuttingDown { // continue to run and handle graceful shutdowns.
self.shutdown() logger?.trace("Received connection event: \(event)")
break
}
self.logger.trace("Sending connection event: \(event)")
self.continuation.yield(event)
}
group.cancelAll()
} }
} onGracefulShutdown: { } onGracefulShutdown: {
self.logger.trace("Received graceful shutdown.") self.logger?.trace("Received graceful shutdown.")
self.shutdown() mqtt.shutdown()
} }
} }
func connect() async {
do {
try await withThrowingDiscardingTaskGroup { group in
group.addTask {
try await self.client.connect(cleanSession: self.cleanSession)
}
client.addCloseListener(named: "\(Self.self)") { _ in
Task {
self.logger.debug("Connection closed.")
self.logger.debug("Reconnecting...")
await self.connect()
}
}
self.logger.debug("Connection successful.")
self.continuation.yield(.connected)
}
} catch {
logger.trace("Failed to connect: \(error)")
continuation.yield(.disconnected)
}
}
private nonisolated func shutdown() {
logger.debug("Begin shutting down MQTT broker connection.")
client.removeCloseListener(named: "\(Self.self)")
internalEventStream.stop()
_ = client.disconnect()
try? client.syncShutdownGracefully()
continuation.finish()
logger.info("MQTT broker connection closed.")
}
}
extension MQTTConnectionService {
public enum Event: Sendable {
case connected
case disconnected
case shuttingDown
}
// TODO: This functionality can probably move into the connection service.
private final class ConnectionStream: Sendable {
// private var cancellable: AnyCancellable?
private let continuation: AsyncStream<MQTTConnectionService.Event>.Continuation
let events: AsyncStream<MQTTConnectionService.Event>
init() {
let (stream, continuation) = AsyncStream.makeStream(of: MQTTConnectionService.Event.self)
self.events = stream
self.continuation = continuation
}
deinit {
stop()
}
func start(isActive connectionIsActive: @escaping () -> Bool) async {
try? await Task.sleep(for: .seconds(1))
let event: MQTTConnectionService.Event = connectionIsActive()
? .connected
: .disconnected
continuation.yield(event)
// cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
// .autoconnect()
// .sink { [weak self] (_: Date) in
// let event: MQTTConnectionService.Event = connectionIsActive()
// ? .connected
// : .disconnected
//
// self?.continuation.yield(event)
// }
}
func stop() {
continuation.yield(.shuttingDown)
continuation.finish()
}
}
} }

View File

@@ -0,0 +1,221 @@
import AsyncAlgorithms
import Dependencies
import DependenciesMacros
import Foundation
import Logging
import MQTTNIO
import NIO
public extension DependencyValues {
/// A dependency that is responsible for managing the connection to
/// an MQTT broker, listen to topics, and publish values back to the
/// broker.
var mqtt: MQTTManager {
get { self[MQTTManager.self] }
set { self[MQTTManager.self] = newValue }
}
}
/// Represents the interface needed to connect, listen, and publish to an MQTT broker.
///
@DependencyClient
public struct MQTTManager: Sendable {
public typealias ListenStream = AsyncStream<MQTTPublishInfo>
/// Connect to the MQTT broker.
public var connect: @Sendable () async throws -> Void
/// Create a stream of connection events.
///
/// - SeeAlso: ``Event``
public var connectionStream: @Sendable () throws -> AsyncStream<Event>
private var _listen: @Sendable ([String], MQTTQoS) async throws -> ListenStream
/// Publish a value to the MQTT broker for a given topic.
public var publish: @Sendable (PublishRequest) async throws -> Void
/// Shutdown the connection to the MQTT broker.
public var shutdown: @Sendable () -> Void
private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void
public init(
connect: @escaping @Sendable () async throws -> Void,
connectionStream: @escaping @Sendable () throws -> AsyncStream<MQTTManager.Event>,
listen: @escaping @Sendable ([String], MQTTQoS) async throws -> MQTTManager.ListenStream,
publish: @escaping @Sendable (MQTTManager.PublishRequest) async throws -> Void,
shutdown: @escaping @Sendable () -> Void,
withClient: @escaping @Sendable ((MQTTClient) async throws -> Void) async throws -> Void = { _ in unimplemented() }
) {
self.connect = connect
self.connectionStream = connectionStream
self._listen = listen
self.publish = publish
self.shutdown = shutdown
self._withClient = withClient
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
to topics: [String],
qos: MQTTQoS = .atLeastOnce
) async throws -> ListenStream {
try await _listen(topics, qos)
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
_ topics: String...,
qos: MQTTQoS = .atLeastOnce
) async throws -> ListenStream {
try await listen(to: topics, qos: qos)
}
/// Publish a new value to the given topic.
///
/// - Parameters:
/// - payload: The value to publish.
/// - topicName: The topic to publish the new value to.
/// - qos: The MQTTQoS.
/// - retain: The retain flag.
public func publish(
_ payload: ByteBuffer,
to topicName: String,
qos: MQTTQoS,
retain: Bool = false,
properties: MQTTProperties = .init()
) async throws {
try await publish(.init(
topicName: topicName,
payload: payload,
qos: qos,
retain: retain,
properties: properties
))
}
/// Perform an operation with the underlying MQTTClient, this can be useful in
/// tests, so this module needs imported with `@_spi(Internal) import MQTTManager` to use this method.
@_spi(Internal)
public func withClient(
_ callback: @Sendable (MQTTClient) async throws -> Void
) async throws {
try await _withClient(callback)
}
/// Represents connection events that clients can listen for and
/// react accordingly.
public enum Event: Equatable, Sendable {
case connected
case disconnected
case shuttingDown
}
/// Represents the parameters required to publish a new value to the
/// MQTT broker.
public struct PublishRequest: Sendable {
/// The topic to publish the new value to.
public let topicName: String
/// The value to publish.
public let payload: ByteBuffer
/// The qos of the request.
public let qos: MQTTQoS
/// The retain flag for the request.
public let retain: Bool
public let properties: MQTTProperties
/// Create a new publish request.
///
/// - Parameters:
/// - topicName: The topic to publish to.
/// - payload: The value to publish.
/// - qos: The qos of the request.
/// - retain: The retain flag of the request.
public init(
topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool,
properties: MQTTProperties
) {
self.topicName = topicName
self.payload = payload
self.qos = qos
self.retain = retain
self.properties = properties
}
}
}
public extension MQTTManager {
/// Create the live manager.
///
static func live(
client: MQTTClient,
cleanSession: Bool = false,
logger: Logger? = nil,
alwaysReconnect: Bool = true
) -> Self {
let manager = ConnectionManager(
client: client,
logger: logger,
alwaysReconnect: alwaysReconnect
)
return .init(
connect: { try await manager.connect(cleanSession: cleanSession) },
connectionStream: {
MQTTConnectionStream(client: client, logger: logger)
.start()
.removeDuplicates()
.eraseToStream()
},
listen: { topics, qos in
try await manager.listen(to: topics, qos: qos)
},
publish: { request in
let topic = request.topicName
guard client.isActive() else {
logger?.debug("Client is not active, unable to publish to topic: \(topic)")
return
}
logger?.trace("Begin publishing to topic: \(topic)")
defer { logger?.debug("Done publishing to topic: \(topic)") }
try await client.publish(
to: request.topicName,
payload: request.payload,
qos: request.qos,
retain: request.retain,
properties: request.properties
).get()
},
shutdown: {
Task { try await client.shutdown() }
manager.shutdown()
},
withClient: { callback in
try await callback(client)
}
)
}
}
extension MQTTManager: TestDependencyKey {
public static let testValue: MQTTManager = Self()
}

View File

@@ -0,0 +1,98 @@
import Foundation
import Logging
import MQTTNIO
actor ConnectionManager {
private let client: MQTTClient
private let logger: Logger?
private let name: String
private let shouldReconnect: Bool
private var hasConnected: Bool = false
private var listeners: [TopicListenerStream] = []
private var isShuttingDown = false
init(
client: MQTTClient,
logger: Logger?,
alwaysReconnect: Bool
) {
var logger = logger
logger?[metadataKey: "instance"] = "\(Self.self)"
self.logger = logger
self.client = client
self.name = UUID().uuidString
self.shouldReconnect = alwaysReconnect
}
deinit {
if !isShuttingDown {
let message = """
Did not properly close the connection manager. This can lead to
dangling references.
Please call `shutdown` to properly close all connections and listener streams.
"""
logger?.warning("\(message)")
self.shutdown()
}
}
private func setHasConnected() {
hasConnected = true
}
func listen(
to topics: [String],
qos: MQTTQoS
) async throws -> MQTTManager.ListenStream {
let listener = TopicListenerStream(client: client, logger: logger, topics: topics, qos: qos)
listeners.append(listener)
await listener.start()
return listener.stream
}
func connect(
cleanSession: Bool
) async throws {
guard !hasConnected else { return }
do {
try await client.connect(cleanSession: cleanSession)
setHasConnected()
client.addCloseListener(named: name) { [weak self] _ in
guard let `self` else { return }
self.logger?.debug("Connection closed.")
if self.shouldReconnect {
self.logger?.debug("Reconnecting...")
Task {
try await self.connect(cleanSession: cleanSession)
}
}
}
client.addShutdownListener(named: name) { _ in
self.shutdown()
}
} catch {
logger?.trace("Failed to connect: \(error)")
throw error
}
}
private func shutdownListeners() {
_ = listeners.map { $0.shutdown() }
listeners = []
isShuttingDown = true
}
nonisolated func shutdown(withLogging: Bool = true) {
if withLogging {
logger?.trace("Shutting down connection.")
}
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
Task { await shutdownListeners() }
}
}

View File

@@ -0,0 +1,74 @@
import Foundation
import Logging
import MQTTNIO
@_spi(Internal)
public actor MQTTConnectionStream: Sendable {
public typealias Element = MQTTManager.Event
private let client: MQTTClient
private let continuation: AsyncStream<Element>.Continuation
private let logger: Logger?
nonisolated let name: String
private let stream: AsyncStream<Element>
private var isShuttingDown = false
public init(client: MQTTClient, logger: Logger?) {
var logger = logger
logger?[metadataKey: "type"] = "\(Self.self)"
self.logger = logger
let (stream, continuation) = AsyncStream<Element>.makeStream()
self.client = client
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
}
deinit { stop() }
public nonisolated func start() -> AsyncStream<Element> {
// Check if the client is active and yield the initial result.
continuation.yield(client.isActive() ? .connected : .disconnected)
// Continually check if the client is active.
let task = Task {
let isShuttingDown = await self.isShuttingDown
while !Task.isCancelled, !isShuttingDown {
try await Task.sleep(for: .milliseconds(100))
continuation.yield(client.isActive() ? .connected : .disconnected)
}
}
// Register listener on the client for when the connection
// closes.
client.addCloseListener(named: name) { _ in
self.logger?.trace("Client has disconnected.")
self.continuation.yield(.disconnected)
}
// Register listener on the client for when the client
// is shutdown.
client.addShutdownListener(named: name) { _ in
self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)")
self.continuation.yield(.shuttingDown)
Task { await self.setIsShuttingDown() }
task.cancel()
self.stop()
}
return stream
}
private func setIsShuttingDown() {
isShuttingDown = true
}
public nonisolated func stop() {
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
continuation.finish()
}
}

View File

@@ -0,0 +1,141 @@
import Foundation
import Logging
import MQTTNIO
actor TopicListenerStream {
typealias Stream = MQTTManager.ListenStream
private let client: MQTTClient
private let configuration: Configuration
private let continuation: Stream.Continuation
private let logger: Logger?
private let name: String
let stream: Stream
private var shuttingDown: Bool = false
private var onShutdownHandler: (@Sendable () -> Void)?
init(
client: MQTTClient,
logger: Logger?,
topics: [String],
qos: MQTTQoS
) {
// Setup the logger so we can more easily decipher log messages.
var logger = logger
logger?[metadataKey: "type"] = "\(Self.self)"
self.logger = logger
let (stream, continuation) = Stream.makeStream()
self.client = client
self.configuration = .init(qos: qos, topics: topics)
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
}
struct Configuration: Sendable {
let qos: MQTTQoS
let topics: [String]
}
deinit {
if !shuttingDown {
let message = """
Shutdown was not called on topic listener. This could lead to potential errors or
the stream never ending.
Please ensure that you call shutdown on the listener.
"""
client.logger.warning("\(message)")
continuation.finish()
}
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
}
private func subscribe() async throws {
guard !shuttingDown else { return }
logger?.debug("Begin subscribing to topics.")
do {
_ = try await client.subscribe(to: configuration.topics.map {
MQTTSubscribeInfo(topicFilter: $0, qos: configuration.qos)
})
} catch {
logger?.error("Received error while subscribing to topics: \(configuration.topics)")
throw TopicListenerError.failedToSubscribe
}
logger?.debug("Done subscribing to topics.")
}
public func start() {
logger?.trace("Starting listener for topics: \(configuration.topics)")
let stream = MQTTConnectionStream(client: client, logger: logger)
.start()
.removeDuplicates()
.eraseToStream()
let task = Task {
// Listen for connection events to restablish the stream upon a
// client becoming disconnected / reconnected, and properly shutdown
// the stream on the client being shutdown.
for await event in stream {
logger?.trace("Received event: \(event)")
switch event {
case .shuttingDown:
shutdown()
case .disconnected:
try await Task.sleep(for: .milliseconds(100))
case .connected:
try await subscribe()
client.addPublishListener(named: name) { result in
switch result {
case let .failure(error):
self.logger?.error("Received error while listening: \(error)")
case let .success(publishInfo):
if self.configuration.topics.contains(publishInfo.topicName) {
self.logger?.debug("Recieved new value for topic: \(publishInfo.topicName)")
self.continuation.yield(publishInfo)
}
}
}
}
}
}
onShutdownHandler = { task.cancel() }
}
private func setIsShuttingDown() {
shuttingDown = true
onShutdownHandler = nil
}
public nonisolated func shutdown() {
client.logger.trace("Closing topic listener...")
continuation.finish()
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
Task {
await onShutdownHandler?()
await self.setIsShuttingDown()
}
}
}
// MARK: - Errors
public enum TopicListenerError: Error {
case connectionTimeout
case failedToSubscribe
}
public struct MQTTListenResultError: Error {
let underlyingError: any Error
init(_ underlyingError: any Error) {
self.underlyingError = underlyingError
}
}

View File

@@ -25,6 +25,12 @@ public struct EnvVars: Codable, Equatable, Sendable {
/// The MQTT user password. /// The MQTT user password.
public var password: String? public var password: String?
/// Set a custom logging level.
public var logLevel: Logger.Level?
/// Set the mqtt broker version.
public var version: String?
/// Create a new ``EnvVars`` /// Create a new ``EnvVars``
/// ///
/// - Parameters: /// - Parameters:
@@ -38,9 +44,11 @@ public struct EnvVars: Codable, Equatable, Sendable {
appEnv: AppEnv = .development, appEnv: AppEnv = .development,
host: String = "127.0.0.1", host: String = "127.0.0.1",
port: String? = "1883", port: String? = "1883",
identifier: String = "dewPoint-controller", identifier: String = "dewpoint-controller",
userName: String? = "mqtt_user", userName: String? = "mqtt_user",
password: String? = "secret!" password: String? = "secret!",
logLevel: Logger.Level? = nil,
version: String? = nil
) { ) {
self.appEnv = appEnv self.appEnv = appEnv
self.host = host self.host = host
@@ -48,6 +56,8 @@ public struct EnvVars: Codable, Equatable, Sendable {
self.identifier = identifier self.identifier = identifier
self.userName = userName self.userName = userName
self.password = password self.password = password
self.logLevel = logLevel
self.version = version
} }
/// Custom coding keys. /// Custom coding keys.
@@ -58,6 +68,8 @@ public struct EnvVars: Codable, Equatable, Sendable {
case identifier = "MQTT_IDENTIFIER" case identifier = "MQTT_IDENTIFIER"
case userName = "MQTT_USERNAME" case userName = "MQTT_USERNAME"
case password = "MQTT_PASSWORD" case password = "MQTT_PASSWORD"
case logLevel = "LOG_LEVEL"
case version = "MQTT_VERSION"
} }
/// Represents the different app environments. /// Represents the different app environments.

View File

@@ -56,10 +56,11 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
public var dewPoint: DewPoint? { public var dewPoint: DewPoint? {
get async { get async {
guard let temperature = temperature, guard let temperature = temperature,
let humidity = humidity let humidity = humidity,
!temperature.value.isNaN,
!humidity.value.isNaN
else { return nil } else { return nil }
return try? await psychrometrics.dewPoint(.dryBulb(temperature, relativeHumidity: humidity)) return try? await psychrometrics.dewPoint(.dryBulb(temperature, relativeHumidity: humidity))
// return .init(dryBulb: temperature, humidity: humidity)
} }
} }
@@ -67,12 +68,13 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
public var enthalpy: EnthalpyOf<MoistAir>? { public var enthalpy: EnthalpyOf<MoistAir>? {
get async { get async {
guard let temperature = temperature, guard let temperature = temperature,
let humidity = humidity let humidity = humidity,
!temperature.value.isNaN,
!humidity.value.isNaN
else { return nil } else { return nil }
return try? await psychrometrics.enthalpy.moistAir( return try? await psychrometrics.enthalpy.moistAir(
.dryBulb(temperature, relativeHumidity: humidity, altitude: altitude) .dryBulb(temperature, relativeHumidity: humidity, altitude: altitude)
) )
// return .init(dryBulb: temperature, humidity: humidity, altitude: altitude)
} }
} }
@@ -129,11 +131,19 @@ public struct TemperatureAndHumiditySensor: Identifiable, Sendable {
prefix = "\(prefix.dropLast())" prefix = "\(prefix.dropLast())"
} }
self.init( self.init(
dewPoint: "\(prefix)/sensors/\(location.rawValue)_dew_point/state", dewPoint: "\(prefix)/sensor/\(location.rawValue)_dew_point/state",
enthalpy: "\(prefix)/sensors/\(location.rawValue)_enthalpy/state", enthalpy: "\(prefix)/sensor/\(location.rawValue)_enthalpy/state",
humidity: "\(prefix)/sensors/\(location.rawValue)_humidity/state", humidity: "\(prefix)/sensor/\(location.rawValue)_humidity/state",
temperature: "\(prefix)/sensors/\(location.rawValue)_temperature/state" temperature: "\(prefix)/sensor/\(location.rawValue)_temperature/state"
) )
} }
} }
} }
public extension Array where Element == TemperatureAndHumiditySensor {
static var live: Self {
TemperatureAndHumiditySensor.Location.allCases.map {
TemperatureAndHumiditySensor(location: $0)
}
}
}

View File

@@ -3,7 +3,7 @@
/// This allows values to only publish changes if they have changed since the /// This allows values to only publish changes if they have changed since the
/// last time they were recieved. /// last time they were recieved.
@propertyWrapper @propertyWrapper
public struct TrackedChanges<Value> { public struct TrackedChanges<Value: Sendable>: Sendable {
/// The current tracking state. /// The current tracking state.
private var tracking: TrackingState private var tracking: TrackingState
@@ -12,7 +12,7 @@ public struct TrackedChanges<Value> {
private var value: Value private var value: Value
/// Used to check if a new value is equal to an old value. /// Used to check if a new value is equal to an old value.
private var isEqual: (Value, Value) -> Bool private var isEqual: @Sendable (Value, Value) -> Bool
/// Access to the underlying property that we are wrapping. /// Access to the underlying property that we are wrapping.
public var wrappedValue: Value { public var wrappedValue: Value {
@@ -35,7 +35,7 @@ public struct TrackedChanges<Value> {
public init( public init(
wrappedValue: Value, wrappedValue: Value,
needsProcessed: Bool = false, needsProcessed: Bool = false,
isEqual: @escaping (Value, Value) -> Bool isEqual: @escaping @Sendable (Value, Value) -> Bool
) { ) {
self.value = wrappedValue self.value = wrappedValue
self.tracking = needsProcessed ? .needsProcessed : .hasProcessed self.tracking = needsProcessed ? .needsProcessed : .hasProcessed
@@ -85,7 +85,9 @@ extension TrackedChanges: Equatable where Value: Equatable {
wrappedValue: Value, wrappedValue: Value,
needsProcessed: Bool = false needsProcessed: Bool = false
) { ) {
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed, isEqual: ==) self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed) {
$0 == $1
}
} }
} }
@@ -96,5 +98,3 @@ extension TrackedChanges: Hashable where Value: Hashable {
hasher.combine(needsProcessed) hasher.combine(needsProcessed)
} }
} }
extension TrackedChanges: Sendable where Value: Sendable {}

View File

@@ -1,45 +0,0 @@
import Logging
import Models
import MQTTNIO
import NIO
import NIOFoundationCompat
import PsychrometricClient
/// Represents a type that can be initialized by a ``ByteBuffer``.
protocol BufferInitalizable {
init?(buffer: inout ByteBuffer)
}
extension Double: BufferInitalizable {
/// Attempt to create / parse a double from a byte buffer.
init?(buffer: inout ByteBuffer) {
guard let string = buffer.readString(
length: buffer.readableBytes,
encoding: String.Encoding.utf8
)
else { return nil }
self.init(string)
}
}
extension Tagged: BufferInitalizable where RawValue: BufferInitalizable {
init?(buffer: inout ByteBuffer) {
guard let value = RawValue(buffer: &buffer) else { return nil }
self.init(value)
}
}
extension Humidity<Relative>: BufferInitalizable {
init?(buffer: inout ByteBuffer) {
guard let value = Double(buffer: &buffer) else { return nil }
self.init(value)
}
}
extension Temperature<DryAir>: BufferInitalizable {
init?(buffer: inout ByteBuffer) {
guard let value = Double(buffer: &buffer) else { return nil }
self.init(value)
}
}

View File

@@ -3,81 +3,29 @@ import DependenciesMacros
import Foundation import Foundation
import Logging import Logging
import Models import Models
import MQTTConnectionService import MQTTManager
@preconcurrency import MQTTNIO import MQTTNIO
import NIO import NIO
import PsychrometricClient import PsychrometricClient
import ServiceLifecycle import ServiceLifecycle
@DependencyClient /// Service that is responsible for listening to changes of the temperature and humidity
public struct SensorsClient: Sendable { /// sensors, then publishing back the calculated dew-point temperature and enthalpy for
/// the sensor location.
///
///
public actor SensorsService: Service {
public var listen: @Sendable ([String]) async throws -> AsyncStream<MQTTPublishInfo> @Dependency(\.mqtt) var mqtt
public var logger: Logger?
public var publish: @Sendable (Double, String) async throws -> Void
public var shutdown: @Sendable () -> Void = {}
public func listen(to topics: [String]) async throws -> AsyncStream<MQTTPublishInfo> { /// The logger to use for the service.
try await listen(topics) private let logger: Logger?
}
public func publish(_ value: Double, to topic: String) async throws {
try await publish(value, topic)
}
}
extension SensorsClient: TestDependencyKey {
public static var testValue: SensorsClient {
Self()
}
}
public extension DependencyValues {
var sensorsClient: SensorsClient {
get { self[SensorsClient.self] }
set { self[SensorsClient.self] = newValue }
}
}
public actor SensorsService2: Service {
@Dependency(\.sensorsClient) var client
/// The sensors that we are listening for updates to, so
/// that we can calculate the dew-point temperature and enthalpy
/// values to publish back to the MQTT broker.
private var sensors: [TemperatureAndHumiditySensor] private var sensors: [TemperatureAndHumiditySensor]
public init(
sensors: [TemperatureAndHumiditySensor]
) {
self.sensors = sensors
}
public func run() async throws {
guard sensors.count > 0 else {
throw SensorCountError()
}
let stream = try await client.listen(to: topics)
do {
try await withGracefulShutdownHandler {
try await withThrowingDiscardingTaskGroup { group in
for await result in stream.cancelOnGracefulShutdown() {
group.addTask { try await self.handleResult(result) }
}
}
} onGracefulShutdown: {
Task {
await self.client.logger?.trace("Received graceful shutdown.")
try? await self.publishUpdates()
await self.client.shutdown()
}
}
} catch {
client.logger?.trace("Error: \(error)")
client.shutdown()
}
}
private var topics: [String] { private var topics: [String] {
sensors.reduce(into: [String]()) { array, sensor in sensors.reduce(into: [String]()) { array, sensor in
array.append(sensor.topics.temperature) array.append(sensor.topics.temperature)
@@ -85,223 +33,119 @@ public actor SensorsService2: Service {
} }
} }
private func handleResult(_ result: MQTTPublishInfo) async throws { /// Create a new sensors service that listens to the passed in
let topic = result.topicName /// sensors.
client.logger?.trace("Begin handling result for topic: \(topic)") ///
/// - Note: The service will fail to start if the array of sensors is not greater than 0.
///
/// - Parameters:
/// - sensors: The sensors to listen for changes to.
/// - logger: An optional logger to use.
public init(
sensors: [TemperatureAndHumiditySensor],
logger: Logger? = nil
) {
self.sensors = sensors
self.logger = logger
}
func decode<V: BufferInitalizable>(_: V.Type) -> V? { /// Start the service with graceful shutdown, which will attempt to publish
var buffer = result.payload /// any pending changes to the MQTT broker, upon a shutdown signal.
return V(buffer: &buffer) public func run() async throws {
} precondition(sensors.count > 0, "Sensors should not be empty.")
if topic.contains("temperature") { let stream = try await makeStream()
client.logger?.trace("Begin handling temperature result.")
guard let temperature = decode(DryBulb.self) else { await withGracefulShutdownHandler {
client.logger?.trace("Failed to decode temperature: \(result.payload)") for await result in stream.cancelOnGracefulShutdown() {
throw DecodingError() logger?.debug("Received result for topic: \(result.topic)")
await handleResult(result)
} }
client.logger?.trace("Decoded temperature: \(temperature)") } onGracefulShutdown: {
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature) self.logger?.debug("Received graceful shutdown.")
Task {
} else if topic.contains("humidity") { try await self.shutdown()
client.logger?.trace("Begin handling humidity result.")
guard let humidity = decode(RelativeHumidity.self) else {
client.logger?.trace("Failed to decode humidity: \(result.payload)")
throw DecodingError()
} }
client.logger?.trace("Decoded humidity: \(humidity)")
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
} else {
client.logger?.error("Received unexpected topic, expected topic to contain 'temperature' or 'humidity'!")
return
} }
}
@_spi(Internal)
public func shutdown() async throws {
try await publishUpdates() try await publishUpdates()
client.logger?.trace("Done handling result for topic: \(topic)") }
private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> {
// ignore duplicate values, to prevent publishing dew-point and enthalpy
// changes to frequently.
try await mqtt.listen(to: topics)
.map { ($0.payload, $0.topicName) }
.removeDuplicates { lhs, rhs in
lhs.buffer == rhs.buffer
&& lhs.topic == rhs.topic
}
.eraseToStream()
}
private func handleResult(_ result: (buffer: ByteBuffer, topic: String)) async {
do {
let topic = result.topic
assert(topics.contains(topic))
logger?.debug("Begin handling result for topic: \(topic)")
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
return V(buffer: result.buffer)
}
if topic.contains("temperature") {
logger?.debug("Begin handling temperature result.")
guard let temperature = decode(DryBulb.self) else {
logger?.debug("Failed to decode temperature: \(result.buffer)")
throw DecodingError()
}
logger?.debug("Decoded temperature: \(temperature)")
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
} else if topic.contains("humidity") {
logger?.debug("Begin handling humidity result.")
guard let humidity = decode(RelativeHumidity.self) else {
logger?.debug("Failed to decode humidity: \(result.buffer)")
throw DecodingError()
}
logger?.debug("Decoded humidity: \(humidity)")
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
}
try await publishUpdates()
logger?.debug("Done handling result for topic: \(topic)")
} catch {
logger?.error("Received error while handling result: \(error)")
}
} }
private func publish(_ double: Double?, to topic: String) async throws { private func publish(_ double: Double?, to topic: String) async throws {
guard let double else { return } guard let double else { return }
try await client.publish(double, to: topic) try await mqtt.publish(
client.logger?.trace("Published update to topic: \(topic)") ByteBufferAllocator().buffer(string: "\(double)"),
to: topic,
qos: .exactlyOnce,
retain: true
)
logger?.debug("Published update to topic: \(topic)")
} }
private func publishUpdates() async throws { private func publishUpdates() async throws {
for sensor in sensors.filter(\.needsProcessed) { for sensor in sensors.filter(\.needsProcessed) {
try await publish(sensor.dewPoint?.value, to: sensor.topics.dewPoint) try await publish(sensor.dewPoint?.value, to: sensor.topics.dewPoint)
try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy) try await publish(sensor.enthalpy?.value, to: sensor.topics.enthalpy)
}
}
}
public actor SensorsService: Service {
private var sensors: [TemperatureAndHumiditySensor]
private let client: MQTTClient
private let events: @Sendable () -> AsyncStream<MQTTConnectionService.Event>
nonisolated var logger: Logger { client.logger }
private var shuttingDown: Bool = false
public init(
client: MQTTClient,
events: @Sendable @escaping () -> AsyncStream<MQTTConnectionService.Event>,
sensors: [TemperatureAndHumiditySensor]
) {
self.client = client
self.events = events
self.sensors = sensors
}
/// The entry-point of the service.
///
/// This method is called to start the service and begin
/// listening for sensor value changes then publishing the dew-point
/// and enthalpy values of the sensors.
public func run() async throws {
do {
try await withGracefulShutdownHandler {
try await withThrowingDiscardingTaskGroup { group in
client.addPublishListener(named: "\(Self.self)") { result in
if self.shuttingDown {
self.logger.trace("Shutting down.")
} else if !self.client.isActive() {
self.logger.trace("Client is not currently active")
} else {
Task { try await self.handleResult(result) }
}
}
for await event in self.events().cancelOnGracefulShutdown() {
logger.trace("Received event: \(event)")
if event == .shuttingDown {
self.setIsShuttingDown()
} else if event == .connected {
group.addTask { try await self.subscribeToSensors() }
} else {
group.addTask { await self.unsubscribeToSensors() }
group.addTask { try? await Task.sleep(for: .milliseconds(100)) }
}
}
}
} onGracefulShutdown: {
// do something.
self.logger.debug("Received graceful shutdown.")
Task { [weak self] in await self?.setIsShuttingDown() }
}
} catch {
// WARN: We always get an MQTTNIO `noConnection` error here, which generally is not an issue,
// but causes service ServiceLifecycle to fail, so currently just ignoring errors that are thrown.
// However we do receive the unsubscribe message back from the MQTT broker, so it is likely safe
// to ignore the `noConnection` error.
logger.trace("Run error: \(error)")
// throw error
}
}
private func setIsShuttingDown() {
logger.debug("Received shut down event.")
Task { try await publishUpdates() }
Task { await self.unsubscribeToSensors() }
shuttingDown = true
client.removePublishListener(named: "\(Self.self)")
}
private func handleResult(
_ result: Result<MQTTPublishInfo, any Error>
) async throws {
logger.trace("Begin handling result")
do {
switch result {
case let .failure(error):
logger.debug("Failed receiving sensor: \(error)")
throw error
case let .success(value):
// do something.
let topic = value.topicName
logger.trace("Received new value for topic: \(topic)")
if topic.contains("temperature") {
// do something.
var buffer = value.payload
guard let temperature = DryBulb(buffer: &buffer) else {
logger.trace("Decoding error for topic: \(topic)")
throw DecodingError()
}
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
try await publishUpdates()
} else if topic.contains("humidity") {
var buffer = value.payload
// Decode and update the temperature value
guard let humidity = RelativeHumidity(buffer: &buffer) else {
logger.debug("Failed to decode humidity from buffer: \(buffer)")
throw DecodingError()
}
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
try await publishUpdates()
}
}
} catch {
logger.trace("Handle Result error: \(error)")
throw error
}
}
private func subscribeToSensors() async throws {
for sensor in sensors {
_ = try await client.subscribe(to: [
MQTTSubscribeInfo(topicFilter: sensor.topics.temperature, qos: .atLeastOnce),
MQTTSubscribeInfo(topicFilter: sensor.topics.humidity, qos: .atLeastOnce)
])
logger.debug("Subscribed to sensor: \(sensor.location)")
}
}
private func unsubscribeToSensors() async {
logger.trace("Begin unsubscribe to sensors.")
guard client.isActive() else {
logger.debug("Client is not active, skipping.")
return
}
do {
let topics = sensors.reduce(into: [String]()) { array, sensor in
array.append(sensor.topics.temperature)
array.append(sensor.topics.humidity)
}
try await client.unsubscribe(from: topics)
logger.trace("Unsubscribed from sensors.")
} catch {
logger.trace("Unsubscribe error: \(error)")
}
}
private func publish(double: Double?, to topic: String) async throws {
guard client.isActive() else { return }
guard let double else { return }
let rounded = round(double * 100) / 100
logger.debug("Publishing \(rounded), to: \(topic)")
try await client.publish(
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(rounded)"),
qos: .exactlyOnce,
retain: true
)
}
private func publishUpdates() async throws {
for sensor in sensors.filter(\.needsProcessed) {
try await publish(double: sensor.dewPoint?.value, to: sensor.topics.dewPoint)
try await publish(double: sensor.enthalpy?.value, to: sensor.topics.enthalpy)
try sensors.hasProcessed(sensor) try sensors.hasProcessed(sensor)
} }
} }
} }
// MARK: - Errors // MARK: - Errors
struct DecodingError: Error {} struct DecodingError: Error {}
struct MQTTClientNotConnected: Error {} struct SensorNotFoundError: Error {}
struct NotFoundError: Error {}
struct SensorExists: Error {}
struct SensorCountError: Error {}
// MARK: - Helpers // MARK: - Helpers
@@ -319,15 +163,50 @@ private extension Array where Element == TemperatureAndHumiditySensor {
with value: V with value: V
) throws { ) throws {
guard let index = firstIndex(where: { $0.topics.contains(topic) }) else { guard let index = firstIndex(where: { $0.topics.contains(topic) }) else {
throw NotFoundError() throw SensorNotFoundError()
} }
self[index][keyPath: keyPath] = value self[index][keyPath: keyPath] = value
} }
mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws { mutating func hasProcessed(_ sensor: TemperatureAndHumiditySensor) throws {
guard let index = firstIndex(where: { $0.id == sensor.id }) else { guard let index = firstIndex(where: { $0.id == sensor.id }) else {
throw NotFoundError() throw SensorNotFoundError()
} }
self[index].needsProcessed = false self[index].needsProcessed = false
} }
} }
/// Represents a type that can be initialized by a ``ByteBuffer``.
protocol BufferInitalizable {
init?(buffer: ByteBuffer)
}
extension Double: BufferInitalizable {
/// Attempt to create / parse a double from a byte buffer.
init?(buffer: ByteBuffer) {
let string = String(buffer: buffer)
self.init(string)
}
}
extension Tagged: BufferInitalizable where RawValue: BufferInitalizable {
init?(buffer: ByteBuffer) {
guard let value = RawValue(buffer: buffer) else { return nil }
self.init(value)
}
}
extension Humidity<Relative>: BufferInitalizable {
init?(buffer: ByteBuffer) {
guard let value = Double(buffer: buffer) else { return nil }
self.init(value)
}
}
extension Temperature<DryAir>: BufferInitalizable {
init?(buffer: ByteBuffer) {
guard let value = Double(buffer: buffer) else { return nil }
self.init(value)
}
}

View File

@@ -1,101 +0,0 @@
import Foundation
import Logging
import Models
import MQTTConnectionService
import MQTTNIO
import NIO
import PsychrometricClientLive
import SensorsService
import ServiceLifecycle
@main
struct Application {
/// The main entry point of the application.
static func main() async throws {
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
var logger = Logger(label: "dewpoint-controller")
logger.logLevel = .trace
logger.info("Starting dewpoint-controller!")
let environment = loadEnvVars(logger: logger)
if environment.appEnv == .production {
logger.debug("Updating logging level to info.")
logger.logLevel = .info
}
let mqtt = MQTTClient(
envVars: environment,
eventLoopGroup: eventloopGroup,
logger: logger
)
let mqttConnection = MQTTConnectionService(client: mqtt)
let sensors = SensorsService(
client: mqtt,
events: { mqttConnection.events },
sensors: .live
)
let serviceGroup = ServiceGroup(
services: [
mqttConnection,
sensors
],
gracefulShutdownSignals: [.sigterm, .sigint],
logger: logger
)
try await serviceGroup.run()
}
}
// MARK: - Helpers
private func loadEnvVars(logger: Logger) -> EnvVars {
let defaultEnvVars = EnvVars()
let encoder = JSONEncoder()
let decoder = JSONDecoder()
let defaultEnvDict = (try? encoder.encode(defaultEnvVars))
.flatMap { try? decoder.decode([String: String].self, from: $0) }
?? [:]
let envVarsDict = defaultEnvDict
.merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 })
let envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict))
.flatMap { try? decoder.decode(EnvVars.self, from: $0) }
?? defaultEnvVars
logger.debug("Done loading EnvVars...")
return envVars
}
private extension MQTTNIO.MQTTClient {
convenience init(envVars: EnvVars, eventLoopGroup: EventLoopGroup, logger: Logger?) {
self.init(
host: envVars.host,
port: envVars.port != nil ? Int(envVars.port!) : nil,
identifier: envVars.identifier,
eventLoopGroupProvider: .shared(eventLoopGroup),
logger: logger,
configuration: .init(
version: .v3_1_1,
disablePing: false,
userName: envVars.userName,
password: envVars.password
)
)
}
}
private extension Array where Element == TemperatureAndHumiditySensor {
static var live: Self {
TemperatureAndHumiditySensor.Location.allCases.map { location in
TemperatureAndHumiditySensor(location: location)
}
}
}

View File

@@ -0,0 +1,212 @@
@_spi(Internal) import CliClient
import Dependencies
import Foundation
import Logging
import Models
import MQTTNIO
import XCTest
final class CliClientTests: XCTestCase {
override func invokeTest() {
withDependencies {
$0.cliClient = .liveValue
$0.environment = .liveValue
$0.environment.processInfo = { [:] }
} operation: {
super.invokeTest()
}
}
func testParsingMQTTVersion() {
@Dependency(\.cliClient) var cliClient
let arguments = [
(MQTTClient.Version.v3_1_1, ["3", "3.1", "3.1.1", "00367894"]),
(MQTTClient.Version.v5_0, ["5", "5.1", "5.1.1", "00000500012"]),
(nil, ["0", "0.1", "0.1.1", "0000000001267894", "blob"])
]
for (version, strings) in arguments {
for string in strings {
XCTAssertEqual(cliClient.parseMqttClientVersion(string), version)
}
}
XCTAssertEqual(MQTTClient.Version.parseOrDefault(string: nil), .v3_1_1)
}
func testLogLevelFromEnvironment() {
@Dependency(\.cliClient) var cliClient
let arguments = [
(Logger.Level.debug, EnvVars(appEnv: .staging, logLevel: nil)),
(Logger.Level.debug, EnvVars(appEnv: .development, logLevel: nil)),
(Logger.Level.info, EnvVars(appEnv: .production, logLevel: nil)),
(Logger.Level.trace, EnvVars(appEnv: .testing, logLevel: nil)),
(Logger.Level.info, EnvVars(appEnv: .staging, logLevel: .info)),
(Logger.Level.trace, EnvVars(appEnv: .development, logLevel: .trace)),
(Logger.Level.warning, EnvVars(appEnv: .production, logLevel: .warning)),
(Logger.Level.debug, EnvVars(appEnv: .testing, logLevel: .debug))
]
for (expected, envVars) in arguments {
XCTAssertEqual(expected, cliClient.logLevel(envVars))
}
}
func testMakeEnvVars() async throws {
@Dependency(\.cliClient) var cliClient
@Dependency(\.environment) var environment
let arguments = [
(
CliClient.EnvVarsRequest(envFilePath: nil, logger: nil, version: nil),
EnvVars()
),
(
CliClient.EnvVarsRequest(envFilePath: nil, logger: nil, version: "3"),
EnvVars(version: "3")
),
(
CliClient.EnvVarsRequest(
envFilePath: "test.env", // Needs to be a bundled resource.
logger: nil,
version: nil
),
EnvVars.test
),
(
CliClient.EnvVarsRequest(
envFilePath: "test-env.json", // Needs to be a bundled resource.
logger: nil,
version: nil
),
EnvVars.test
)
]
for (request, expectedEnvVars) in arguments {
var request = request
if let file = request.envFilePath {
request = .init(
envFilePath: cleanFilePath(file),
logger: request.logger,
version: request.mqttClientVersion
)
}
let result = try await cliClient.makeEnvVars(request)
XCTAssertEqual(result, expectedEnvVars)
}
}
func testMakeEnvVarsWithFailingDecoder() async throws {
try await withDependencies {
$0.environment.coders = { ThrowingDecoder() }
} operation: {
@Dependency(\.cliClient) var cliClient
let envVars = try await cliClient.makeEnvVars(.init())
XCTAssertEqual(envVars, EnvVars())
}
}
func testMakeEnvVarsWithFailingEncoder() async throws {
try await withDependencies {
$0.environment.coders = { ThrowingEncoder() }
} operation: {
@Dependency(\.cliClient) var cliClient
let envVars = try await cliClient.makeEnvVars(.init())
XCTAssertEqual(envVars, EnvVars())
}
}
func testFileType() {
let arguments = [
(EnvironmentDependency.FileType.dotEnv(path: "test.env"), "test.env"),
(EnvironmentDependency.FileType.json(path: "test.json"), "test.json"),
(nil, "test"),
(nil, "")
]
for (expected, file) in arguments {
XCTAssertEqual(EnvironmentDependency.FileType(path: file), expected)
}
}
func testEnvironmentLiveValueProcessInfo() {
let environment = EnvironmentDependency.liveValue
XCTAssertEqual(environment.processInfo(), ProcessInfo.processInfo.environment)
}
func testMakeClient() throws {
@Dependency(\.cliClient) var cliClient
let envVars = EnvVars.test
let client = try cliClient.makeClient(.init(
environment: envVars,
eventLoopGroup: .init(numberOfThreads: 1),
logger: nil
))
XCTAssertEqual(client.host, envVars.host)
XCTAssertEqual(client.port, Int(envVars.port!))
XCTAssertEqual(client.identifier, envVars.identifier)
XCTAssertEqual(client.configuration.version, .v5_0)
XCTAssertEqual(client.configuration.userName, envVars.userName)
XCTAssertEqual(client.configuration.password, envVars.password)
try client.syncShutdownGracefully()
}
}
// - MARK: Helper
private func cleanFilePath(_ path: String) -> String {
#if os(Linux)
return "Tests/CliClientTests/\(path)"
#else
let split = path.split(separator: ".")
let fileName = split.first!
let ext = split.last!
let url = Bundle.module.url(forResource: String(fileName), withExtension: String(ext))!.absoluteString
let cleaned = url.split(separator: "file://").last!
return String(cleaned)
#endif
}
extension EnvVars {
static let test = EnvVars(
appEnv: .testing,
host: "test.mqtt",
port: "1234",
identifier: "testing-mqtt",
userName: "test-user",
password: "super-secret",
logLevel: .debug,
version: "5.0"
)
}
struct ThrowingDecoder: Coderable {
func encode<T>(_ instance: T) throws -> Data where T: Encodable {
try JSONEncoder().encode(instance)
}
func decode<T>(_ type: T.Type, from data: Data) throws -> T where T: Decodable {
throw DecodeError()
}
}
struct ThrowingEncoder: Coderable {
func encode<T>(_ instance: T) throws -> Data where T: Encodable {
throw EncodeError()
}
func decode<T>(_ type: T.Type, from data: Data) throws -> T where T: Decodable {
try JSONDecoder().decode(T.self, from: data)
}
}
struct DecodeError: Error {}
struct EncodeError: Error {}

View File

@@ -0,0 +1,10 @@
{
"APP_ENV": "testing",
"MQTT_HOST": "test.mqtt",
"MQTT_PORT": "1234",
"MQTT_IDENTIFIER": "testing-mqtt",
"MQTT_USERNAME": "test-user",
"MQTT_PASSWORD": "super-secret",
"LOG_LEVEL": "debug",
"MQTT_VERSION": "5.0"
}

View File

@@ -0,0 +1,8 @@
APP_ENV="testing"
MQTT_HOST="test.mqtt"
MQTT_PORT="1234"
MQTT_IDENTIFIER="testing-mqtt"
MQTT_USERNAME="test-user"
MQTT_PASSWORD="super-secret"
LOG_LEVEL="debug"
MQTT_VERSION="5.0"

View File

@@ -0,0 +1,214 @@
import Dependencies
// @_spi(Internal) import dewpoint_controller
import Logging
import Models
import MQTTConnectionService
@_spi(Internal) import MQTTManager
import MQTTNIO
import NIO
import PsychrometricClientLive
@_spi(Internal) import SensorsService
import ServiceLifecycle
import ServiceLifecycleTestKit
import XCTest
final class IntegrationTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "IntegrationTests")
logger.logLevel = .info
return logger
}()
override func invokeTest() {
let client = createClient(identifier: "\(Self.self)")
withDependencies {
$0.mqtt = .live(client: client, logger: Self.logger)
$0.psychrometricClient = PsychrometricClient.liveValue
} operation: {
super.invokeTest()
}
}
func testConnectionServiceShutdown() async throws {
@Dependency(\.mqtt) var mqtt
do {
let service = MQTTConnectionService(logger: Self.logger)
let task = Task { try await service.run() }
defer { task.cancel() }
try await Task.sleep(for: .milliseconds(200))
// check the connection is active here.
try await mqtt.withClient { client in
XCTAssert(client.isActive())
}
mqtt.shutdown()
try await Task.sleep(for: .milliseconds(500))
// check the connection is active here.
try await mqtt.withClient { client in
XCTAssertFalse(client.isActive())
}
} catch {
mqtt.shutdown()
try await Task.sleep(for: .milliseconds(500))
}
}
func testMQTTConnectionStream() async throws {
let client = createClient(identifier: "testNonManagedStream")
let manager = MQTTManager.live(
client: client,
logger: Self.logger,
alwaysReconnect: false
)
defer { manager.shutdown() }
let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger)
let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger)
var events1 = [MQTTManager.Event]()
var events2 = [MQTTManager.Event]()
let stream1 = connectionStream1.start()
let stream2 = connectionStream2.start()
_ = try await manager.connect()
Task {
while !client.isActive() {
try await Task.sleep(for: .milliseconds(100))
}
try await Task.sleep(for: .milliseconds(200))
try await client.disconnect()
try await Task.sleep(for: .milliseconds(500))
manager.shutdown()
try await Task.sleep(for: .milliseconds(500))
connectionStream1.stop()
connectionStream2.stop()
}
for await event in stream1.removeDuplicates() {
events1.append(event)
}
for await event in stream2.removeDuplicates() {
events2.append(event)
}
XCTAssertEqual(events1, [.disconnected, .connected, .disconnected, .shuttingDown])
XCTAssertEqual(events2, [.disconnected, .connected, .disconnected, .shuttingDown])
}
func testListeningResumesAfterDisconnectThenReconnect() async throws {
struct TimeoutError: Error {}
let sensor = TemperatureAndHumiditySensor(location: .return)
let results = ResultContainer()
try await withDependencies {
$0.mqtt.publish = results.append
} operation: {
@Dependency(\.mqtt) var manager
let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger)
let task = Task { try await sensorsService.run() }
defer { task.cancel() }
try await manager.connect()
defer { manager.shutdown() }
try await manager.withClient { client in
try await client.disconnect()
try await client.connect()
while !client.isActive() {
try await Task.sleep(for: .milliseconds(100))
}
// Give time to re-subscribe.
try await Task.sleep(for: .milliseconds(200))
try await client.publish(
to: sensor.topics.temperature,
payload: ByteBufferAllocator().buffer(string: "25"),
qos: .atLeastOnce,
retain: false
)
try await client.publish(
to: sensor.topics.humidity,
payload: ByteBufferAllocator().buffer(string: "50"),
qos: .atLeastOnce,
retain: false
)
}
var timeoutCount = 0
while !(await results.count == 2) {
guard timeoutCount < 20 else {
throw TimeoutError()
}
try await Task.sleep(for: .milliseconds(100))
timeoutCount += 1
}
let results = await results.results()
XCTAssertEqual(results.count, 2)
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.dewPoint }))
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.enthalpy }))
try await sensorsService.shutdown()
}
}
func createClient(identifier: String) -> MQTTClient {
let envVars = EnvVars(
appEnv: .testing,
host: Self.hostname,
port: "1883",
identifier: identifier,
userName: nil,
password: nil
)
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
// return .init(envVars: envVars, eventLoopGroup: eventLoopGroup, logger: Self.logger)
let config = MQTTClient.Configuration(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
return .init(
host: Self.hostname,
identifier: identifier,
eventLoopGroupProvider: .shared(eventLoopGroup),
logger: Self.logger,
configuration: config
)
}
}
// - MARK: Helpers
struct TopicNotFoundError: Error {}
actor ResultContainer: Sendable {
private var storage = [MQTTManager.PublishRequest]()
init() {}
@Sendable func append(_ result: MQTTManager.PublishRequest) async {
storage.append(result)
}
var count: Int {
get async { storage.count }
}
func results() async -> [MQTTManager.PublishRequest] {
storage
}
}

View File

@@ -1,122 +0,0 @@
import Combine
import Logging
import Models
@testable import MQTTConnectionService
import MQTTNIO
import NIO
import ServiceLifecycle
import ServiceLifecycleTestKit
import XCTest
final class MQTTConnectionServiceTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "AsyncClientTests")
logger.logLevel = .trace
return logger
}()
func testGracefulShutdownWorks() async throws {
try await testGracefulShutdown { trigger in
let client = createClient(identifier: "testGracefulShutdown")
let service = MQTTConnectionService(client: client)
try await service.run()
try await Task.sleep(for: .seconds(1))
XCTAssert(client.isActive())
trigger.triggerGracefulShutdown()
// try await Task.sleep(for: .seconds(2))
// XCTAssertFalse(client.isActive())
}
}
func createClient(identifier: String) -> MQTTClient {
let envVars = EnvVars(
appEnv: .testing,
host: Self.hostname,
port: "1883",
identifier: identifier,
userName: nil,
password: nil
)
let config = MQTTClient.Configuration(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
return .init(
host: Self.hostname,
identifier: identifier,
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
logger: Self.logger,
configuration: config
)
}
func testEventStream() async throws {
var connection: ConnectionStream? = ConnectionStream()
let task = Task {
guard let events = connection?.events else { return }
print("before loop")
for await event in events {
print("\(event)")
}
print("after loop")
}
let ending = Task {
try await Task.sleep(for: .seconds(2))
connection = nil
}
connection?.start()
try await ending.value
task.cancel()
}
}
class ConnectionStream {
enum Event {
case connected
case disconnected
case shuttingDown
}
let events: AsyncStream<Event>
private let continuation: AsyncStream<Event>.Continuation
private var cancellable: AnyCancellable?
init() {
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
self.events = stream
self.continuation = continuation
}
deinit {
print("connection stream is gone.")
stop()
}
func start() {
cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
.autoconnect()
.sink { [weak self] _ in
print("will send event.")
self?.continuation.yield(.connected)
}
}
func stop() {
continuation.yield(.shuttingDown)
cancellable = nil
continuation.finish()
}
}

View File

@@ -1,298 +0,0 @@
import Dependencies
import Logging
import Models
import MQTTNIO
import NIO
import PsychrometricClientLive
@testable import SensorsService
import XCTest
final class SensorsClientTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "AsyncClientTests")
logger.logLevel = .debug
return logger
}()
override func invokeTest() {
withDependencies {
$0.psychrometricClient = PsychrometricClient.liveValue
} operation: {
super.invokeTest()
}
}
// func createClient(identifier: String) -> SensorsClient {
// let envVars = EnvVars(
// appEnv: .testing,
// host: Self.hostname,
// port: "1883",
// identifier: identifier,
// userName: nil,
// password: nil
// )
// return .init(envVars: envVars, logger: Self.logger)
// }
func createClient(identifier: String) -> MQTTClient {
let envVars = EnvVars(
appEnv: .testing,
host: Self.hostname,
port: "1883",
identifier: identifier,
userName: nil,
password: nil
)
let config = MQTTClient.Configuration(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
return .init(
host: Self.hostname,
identifier: identifier,
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
logger: Self.logger,
configuration: config
)
}
// func testConnectAndShutdown() async throws {
// let client = createClient(identifier: "testConnectAndShutdown")
// await client.connect()
// await client.shutdown()
// }
// func testSensorService() async throws {
// let mqtt = createClient(identifier: "testSensorService")
// // let mqtt = await client.client
// let sensor = TemperatureAndHumiditySensor(location: .mixedAir)
// let publishInfo = PublishInfoContainer(topicFilters: [
// sensor.topics.dewPoint,
// sensor.topics.enthalpy
// ])
// let service = SensorsService(client: mqtt, sensors: [sensor])
//
// // fix to connect the mqtt client.
// try await mqtt.connect()
// let task = Task { try await service.run() }
//
// _ = try await mqtt.subscribe(to: [
// MQTTSubscribeInfo(topicFilter: sensor.topics.dewPoint, qos: .exactlyOnce),
// MQTTSubscribeInfo(topicFilter: sensor.topics.enthalpy, qos: .exactlyOnce)
// ])
//
// let listener = mqtt.createPublishListener()
// Task {
// for await result in listener {
// switch result {
// case let .failure(error):
// XCTFail("\(error)")
// case let .success(value):
// await publishInfo.addPublishInfo(value)
// }
// }
// }
//
// try await mqtt.publish(
// to: sensor.topics.temperature,
// payload: ByteBufferAllocator().buffer(string: "75.123"),
// qos: MQTTQoS.exactlyOnce,
// retain: true
// )
//
// try await Task.sleep(for: .seconds(1))
//
// // XCTAssert(client.sensors.first!.needsProcessed)
// // let firstSensor = await client.sensors.first!
// // XCTAssertEqual(firstSensor.temperature, .init(75.123, units: .celsius))
//
// try await mqtt.publish(
// to: sensor.topics.humidity,
// payload: ByteBufferAllocator().buffer(string: "50"),
// qos: MQTTQoS.exactlyOnce,
// retain: true
// )
//
// try await Task.sleep(for: .seconds(1))
//
// // not working for some reason
// // XCTAssertEqual(publishInfo.info.count, 2)
//
// XCTAssert(publishInfo.info.count > 1)
//
// // fix to shutdown the mqtt client.
// task.cancel()
// try await mqtt.shutdown()
// }
func testCapturingSensorClient() async throws {
class CapturedValues {
var values = [(value: Double, topic: String)]()
var didShutdown = false
init() {}
}
let capturedValues = CapturedValues()
try await withDependencies {
$0.sensorsClient = .testing(
yielding: [
(value: 76, to: "not-listening"),
(value: 75, to: "test")
]
) { value, topic in
capturedValues.values.append((value, topic))
} captureShutdownEvent: {
capturedValues.didShutdown = $0
}
} operation: {
@Dependency(\.sensorsClient) var client
let stream = try await client.listen(to: ["test"])
for await value in stream {
var buffer = value.payload
guard let double = Double(buffer: &buffer) else {
XCTFail("Failed to decode double")
return
}
XCTAssertEqual(double, 75)
XCTAssertEqual(value.topicName, "test")
try await client.publish(26, to: "publish")
try await Task.sleep(for: .milliseconds(100))
client.shutdown()
}
XCTAssertEqual(capturedValues.values.count, 1)
XCTAssertEqual(capturedValues.values.first?.value, 26)
XCTAssertEqual(capturedValues.values.first?.topic, "publish")
XCTAssertTrue(capturedValues.didShutdown)
}
}
// func testSensorCapturesPublishedState() async throws {
// let client = createClient(identifier: "testSensorCapturesPublishedState")
// let mqtt = client.client
// let sensor = TemperatureAndHumiditySensor(location: .mixedAir)
// let publishInfo = PublishInfoContainer(topicFilters: [
// sensor.topics.dewPoint,
// sensor.topics.enthalpy
// ])
//
// try await client.addSensor(sensor)
// await client.connect()
// try await client.start()
//
// _ = try await mqtt.subscribe(to: [
// MQTTSubscribeInfo(topicFilter: sensor.topics.dewPoint, qos: MQTTQoS.exactlyOnce),
// MQTTSubscribeInfo(topicFilter: sensor.topics.enthalpy, qos: MQTTQoS.exactlyOnce)
// ])
//
// let listener = mqtt.createPublishListener()
// Task {
// for await result in listener {
// switch result {
// case let .failure(error):
// XCTFail("\(error)")
// case let .success(value):
// await publishInfo.addPublishInfo(value)
// }
// }
// }
//
// try await mqtt.publish(
// to: sensor.topics.temperature,
// payload: ByteBufferAllocator().buffer(string: "75.123"),
// qos: MQTTQoS.exactlyOnce,
// retain: true
// )
//
// try await Task.sleep(for: .seconds(1))
//
// // XCTAssert(client.sensors.first!.needsProcessed)
// let firstSensor = client.sensors.first!
// XCTAssertEqual(firstSensor.temperature, DryBulb.celsius(75.123))
//
// try await mqtt.publish(
// to: sensor.topics.humidity,
// payload: ByteBufferAllocator().buffer(string: "50"),
// qos: MQTTQoS.exactlyOnce,
// retain: true
// )
//
// try await Task.sleep(for: .seconds(1))
//
// XCTAssertEqual(publishInfo.info.count, 2)
//
// await client.shutdown()
// }
}
// MARK: Helpers for tests.
class PublishInfoContainer {
private(set) var info: [MQTTPublishInfo]
private var topicFilters: [String]?
init(topicFilters: [String]? = nil) {
self.info = []
self.topicFilters = topicFilters
}
func addPublishInfo(_ info: MQTTPublishInfo) async {
guard let topicFilters else {
self.info.append(info)
return
}
if topicFilters.contains(info.topicName) {
self.info.append(info)
}
}
}
extension SensorsClient {
static func testing(
yielding: [(value: Double, to: String)],
capturePublishedValues: @escaping (Double, String) -> Void,
captureShutdownEvent: @escaping (Bool) -> Void
) -> Self {
let (stream, continuation) = AsyncStream.makeStream(of: MQTTPublishInfo.self)
let logger = Logger(label: "\(Self.self).testing")
return .init(
listen: { topics in
for (value, topic) in yielding where topics.contains(topic) {
continuation.yield(
MQTTPublishInfo(
qos: .atLeastOnce,
retain: true,
topicName: topic,
payload: ByteBuffer(string: "\(value)"),
properties: MQTTProperties()
)
)
}
return stream
},
logger: logger,
publish: { value, topic in
capturePublishedValues(value, topic)
},
shutdown: {
captureShutdownEvent(true)
continuation.finish()
}
)
}
}
struct TopicNotFoundError: Error {}

View File

@@ -1,47 +0,0 @@
import XCTest
import class Foundation.Bundle
//final class dewPoint_controllerTests: XCTestCase {
// func testExample() throws {
// // This is an example of a functional test case.
// // Use XCTAssert and related functions to verify your tests produce the correct
// // results.
//
// // Some of the APIs that we use below are available in macOS 10.13 and above.
// guard #available(macOS 10.13, *) else {
// return
// }
//
// // Mac Catalyst won't have `Process`, but it is supported for executables.
// #if !targetEnvironment(macCatalyst)
//
// let fooBinary = productsDirectory.appendingPathComponent("dewPoint-controller")
//
// let process = Process()
// process.executableURL = fooBinary
//
// let pipe = Pipe()
// process.standardOutput = pipe
//
// try process.run()
// process.waitUntilExit()
//
// let data = pipe.fileHandleForReading.readDataToEndOfFile()
// let output = String(data: data, encoding: .utf8)
//
// XCTAssertEqual(output, "Hello, world!\n")
// #endif
// }
//
// /// Returns path to the built products directory.
// var productsDirectory: URL {
// #if os(macOS)
// for bundle in Bundle.allBundles where bundle.bundlePath.hasSuffix(".xctest") {
// return bundle.bundleURL.deletingLastPathComponent()
// }
// fatalError("couldn't find the products directory")
// #else
// return Bundle.main.bundleURL
// #endif
// }
//}

View File

@@ -1,55 +0,0 @@
# run this with docker-compose run test
services:
server:
image: swift-mqtt-dewpoint:latest
restart: unless-stopped
env_file: .env
local:
container_name: local-server
build:
context: .
dockerfile: Dockerfile
platform: linux/amd64
depends_on:
- mosquitto
environment:
- MOSQUITTO_SERVER=mosquitto
test:
build:
context: .
dockerfile: Dockerfile.test
platform: linux/amd64
working_dir: /app
networks:
- test
depends_on:
- mosquitto-test
environment:
- MOSQUITTO_SERVER=mosquitto-test
command: /bin/bash -xc "swift package clean && swift test"
mosquitto-test:
image: eclipse-mosquitto
networks:
- test
volumes:
- ./mosquitto/config:/mosquitto/config
- ./mosquitto/certs:/mosquitto/certs
mosquitto:
image: eclipse-mosquitto
volumes:
- ./mosquitto/config:/mosquitto/config
- ./mosquitto/certs:/mosquitto/certs
ports:
- "1883:1883"
- "8883:8883"
- "8080:8080"
- "8081:8081"
networks:
test:
driver: bridge
external: false

15
docker/Dockerfile Executable file
View File

@@ -0,0 +1,15 @@
# Used this to build the release version of the image.
# Build the executable
ARG SWIFT_IMAGE_VERSION="5.10"
FROM swift:${SWIFT_IMAGE_VERSION} AS build
WORKDIR /build
COPY ./Package.* ./
RUN swift package resolve
COPY . .
RUN swift build -c release -Xswiftc -g
# Run image
FROM swift:${SWIFT_IMAGE_VERSION}-slim
COPY --from=build /build/.build/release/dewpoint-controller /usr/local/bin
CMD ["/bin/bash", "-xc", "/usr/local/bin/dewpoint-controller run"]

View File

@@ -0,0 +1,5 @@
# Used to build a local MQTT broker for development and
# testing.
FROM eclipse-mosquitto:latest
COPY ./mosquitto/config/mosquitto.conf /mosquitto/config/mosquitto.conf
EXPOSE 1883

9
docker/Dockerfile.test Normal file
View File

@@ -0,0 +1,9 @@
# Used to build a test image.
ARG SWIFT_IMAGE_VERSION="5.10"
FROM swift:${SWIFT_IMAGE_VERSION}
WORKDIR /app
COPY ./Package.* ./
RUN swift package resolve
COPY . .
RUN swift build
CMD ["/bin/bash", "-xc", "swift", "test"]

19
docker/docker-compose-test.yaml Executable file
View File

@@ -0,0 +1,19 @@
# run this with docker-compose run test
name: swift-mqtt-dewpoint-test
services:
test:
build:
context: ..
dockerfile: docker/Dockerfile.test
working_dir: /app
depends_on:
- mosquitto
environment:
- MOSQUITTO_SERVER=mosquitto
command: /bin/bash -xc "swift test"
mosquitto:
build:
context: ..
dockerfile: docker/Dockerfile.mosquitto

20
docker/docker-compose.yaml Executable file
View File

@@ -0,0 +1,20 @@
# run this with docker-compose run dewpoint_controller
name: swift-mqtt-dewpoint
services:
dewpoint_controller:
container_name: dewpoint-controller
build:
context: ..
dockerfile: docker/Dockerfile
depends_on:
- mosquitto
environment:
- MQTT_HOST=mosquitto
mosquitto:
build:
context: ..
dockerfile: docker/Dockerfile.mosquitto
ports:
- "1883:1883"