54 Commits

Author SHA1 Message Date
6472d3cd1e feat: Begins using swift argument parser and creating cli client dependency
All checks were successful
CI / Run Tests (push) Successful in 4m27s
2024-11-16 22:32:32 -05:00
3416ce1003 feat: Prep for moving tests into single integration suite 2024-11-16 01:42:29 -05:00
c84427a9b3 feat: Renaming and moves some items around, listeners now manage reconnection events.
All checks were successful
CI / Run Tests (push) Successful in 4m16s
2024-11-15 17:15:01 -05:00
947472f62d feat: Minimal readme and cleans up docker files.
All checks were successful
CI / Run Tests (push) Successful in 27m31s
2024-11-14 22:21:02 -05:00
d16135dd50 feat: Minimal readme and cleans up docker files.
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-14 22:20:25 -05:00
19e97652fd feat: Minimal readme and cleans up docker files.
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-14 22:19:26 -05:00
1089452212 feat: Minimal readme and cleans up docker files.
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-14 22:17:08 -05:00
5e998a60d0 feat: Minimal readme and cleans up docker files.
Some checks failed
CI / Run Tests (push) Failing after 13s
2024-11-14 22:12:20 -05:00
9e2af22a36 feat: Fix CI
All checks were successful
CI / Run Tests (push) Successful in 15m25s
2024-11-14 21:36:56 -05:00
89f3601c2c fix: Fixes sensor service test that was flaky and moves docker stuff into it's own directory.
Some checks failed
CI / Run Tests (push) Failing after 12s
2024-11-14 21:34:48 -05:00
d4b6f6ad2b fix: Fixes sensor service test that was flaky and moves docker stuff into it's own directory.
Some checks failed
CI / Run Tests (push) Failing after 12s
2024-11-14 21:24:23 -05:00
ec3cd40fef feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3m57s
2024-11-14 21:00:01 -05:00
953c9d5b7c feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3m57s
2024-11-14 20:54:00 -05:00
00bb6ca1a6 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3m40s
2024-11-14 20:45:50 -05:00
41fb3c5715 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 12s
2024-11-14 20:44:31 -05:00
8e4430804c feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 4m31s
2024-11-14 20:02:50 -05:00
a8f689136d feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 20:01:45 -05:00
2607be6658 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 20:00:24 -05:00
b05e18b258 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:59:45 -05:00
394b49d1a0 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m14s
2024-11-14 19:55:19 -05:00
6bec0d6fa5 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m33s
2024-11-14 19:52:28 -05:00
63d65bd7cd feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 10s
2024-11-14 19:51:25 -05:00
320f3e792e feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:49:04 -05:00
74b73e7534 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:47:43 -05:00
7954fc5dcd feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 10s
2024-11-14 19:45:40 -05:00
115c4dc252 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:43:07 -05:00
853a157ae7 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:35:26 -05:00
30b8ea3661 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 11s
2024-11-14 19:32:47 -05:00
d26ab714ab feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 10s
2024-11-14 19:30:20 -05:00
b45ad76fff feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 12s
2024-11-14 19:24:58 -05:00
c4395b9089 feat: Run CI
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-14 19:00:22 -05:00
b3874b96c5 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 8m52s
2024-11-14 18:41:37 -05:00
4024bb624f feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3s
2024-11-14 18:39:10 -05:00
6371ffed47 fix: Fixing naming of dewpoint-controller, part 2.
Some checks failed
CI / Run Tests (push) Failing after 2m35s
2024-11-14 18:10:52 -05:00
76b06e86fa fix: Fixing naming of dewpoint-controller, part 2.
Some checks failed
CI / Run Tests (push) Failing after 1m41s
2024-11-14 18:01:48 -05:00
fccfa4d006 fix: Fixing naming of dewpoint-controller, part 1. 2024-11-14 18:01:18 -05:00
5df08d6c91 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m40s
2024-11-14 17:55:40 -05:00
1c99e4861d feat: Run CI
Some checks failed
CI / Run Tests (push) Has been cancelled
2024-11-14 17:48:07 -05:00
a0b7053eae feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 33s
2024-11-14 17:41:42 -05:00
df3ed6a407 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 12s
2024-11-14 17:40:47 -05:00
1d9d8dc449 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m6s
2024-11-14 17:17:45 -05:00
9a53d36f4c feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 54s
2024-11-14 17:16:00 -05:00
44a6a878eb feat: Run CI
All checks were successful
CI / Run Tests (push) Successful in 33s
2024-11-14 17:12:34 -05:00
c13a1a14a3 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m39s
2024-11-14 17:09:31 -05:00
6c916215ea feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 1m37s
2024-11-14 17:02:35 -05:00
be7442c06a feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 8s
2024-11-14 17:00:41 -05:00
26a30c2a07 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 9s
2024-11-14 16:59:39 -05:00
5f131d8fa2 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 8s
2024-11-14 16:58:22 -05:00
d6e217f556 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 8s
2024-11-14 16:55:52 -05:00
b39ccafc92 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 9s
2024-11-14 16:54:04 -05:00
8336c56adf feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 14s
2024-11-14 16:41:24 -05:00
fac8945386 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 36s
2024-11-14 16:38:42 -05:00
5b319cae9b feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 7s
2024-11-14 16:37:35 -05:00
ca7024cb60 feat: Run CI
Some checks failed
CI / Run Tests (push) Failing after 3s
2024-11-14 16:32:48 -05:00
43 changed files with 1565 additions and 1513 deletions

View File

@@ -10,5 +10,12 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Test
run: make test
- name: Setup QEMU
uses: docker/setup-qemu-action@v3
- name: Setup Docker buildx
uses: docker/setup-buildx-action@v3
- name: Run Test
run: make test-docker
- name: Cleanup.
if: always()
run: docker compose --file docker/docker-compose-test.yaml down

2
.gitignore vendored
View File

@@ -10,3 +10,5 @@ DerivedData/
mqtt_password.txt
.env
.smbdelete*
buildServer.json
.nvim/*

2
.swiftlint.yml Normal file
View File

@@ -0,0 +1,2 @@
disabled_rules:
- closing_brace

View File

@@ -0,0 +1,29 @@
{
"configurations" : [
{
"id" : "AFB1047B-4742-43D2-AFB9-680C1CB2D273",
"name" : "Test Scheme Action",
"options" : {
}
}
],
"defaultOptions" : {
"targetForVariableExpansion" : {
"containerPath" : "container:",
"identifier" : "dewpoint-controller",
"name" : "dewpoint-controller"
}
},
"testTargets" : [
{
"parallelizable" : true,
"target" : {
"containerPath" : "container:",
"identifier" : "IntegrationTests",
"name" : "IntegrationTests"
}
}
],
"version" : 1
}

View File

@@ -1,67 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1330"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "DewPointEnvironment"
BuildableName = "DewPointEnvironment"
BlueprintName = "DewPointEnvironment"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
</BuildActionEntries>
</BuildAction>
<TestAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
</Testables>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"
useCustomWorkingDirectory = "NO"
ignoresPersistentStateOnLaunch = "NO"
debugDocumentVersioning = "YES"
debugServiceExtension = "internal"
allowLocationSimulation = "YES">
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"
shouldUseLaunchSchemeArgsEnv = "YES"
savedToolIdentifier = ""
useCustomWorkingDirectory = "NO"
debugDocumentVersioning = "YES">
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "DewPointEnvironment"
BuildableName = "DewPointEnvironment"
BlueprintName = "DewPointEnvironment"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>
</ProfileAction>
<AnalyzeAction
buildConfiguration = "Debug">
</AnalyzeAction>
<ArchiveAction
buildConfiguration = "Release"
revealArchiveInOrganizer = "YES">
</ArchiveAction>
</Scheme>

View File

@@ -1,67 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1330"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "EnvVars"
BuildableName = "EnvVars"
BlueprintName = "EnvVars"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
</BuildActionEntries>
</BuildAction>
<TestAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
</Testables>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"
useCustomWorkingDirectory = "NO"
ignoresPersistentStateOnLaunch = "NO"
debugDocumentVersioning = "YES"
debugServiceExtension = "internal"
allowLocationSimulation = "YES">
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"
shouldUseLaunchSchemeArgsEnv = "YES"
savedToolIdentifier = ""
useCustomWorkingDirectory = "NO"
debugDocumentVersioning = "YES">
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "EnvVars"
BuildableName = "EnvVars"
BlueprintName = "EnvVars"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>
</ProfileAction>
<AnalyzeAction
buildConfiguration = "Debug">
</AnalyzeAction>
<ArchiveAction
buildConfiguration = "Release"
revealArchiveInOrganizer = "YES">
</ArchiveAction>
</Scheme>

View File

@@ -1,10 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1330"
version = "1.3">
LastUpgradeVersion = "1610"
version = "1.7">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
buildImplicitDependencies = "YES"
buildArchitectures = "Automatic">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
@@ -14,9 +15,9 @@
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ClientLive"
BuildableName = "ClientLive"
BlueprintName = "ClientLive"
BlueprintIdentifier = "MQTTConnectionService"
BuildableName = "MQTTConnectionService"
BlueprintName = "MQTTConnectionService"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
@@ -26,9 +27,8 @@
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
</Testables>
shouldUseLaunchSchemeArgsEnv = "YES"
shouldAutocreateTestPlan = "YES">
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
@@ -50,9 +50,9 @@
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ClientLive"
BuildableName = "ClientLive"
BlueprintName = "ClientLive"
BlueprintIdentifier = "MQTTConnectionService"
BuildableName = "MQTTConnectionService"
BlueprintName = "MQTTConnectionService"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>

View File

@@ -1,10 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1330"
version = "1.3">
LastUpgradeVersion = "1610"
version = "1.7">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
buildImplicitDependencies = "YES"
buildArchitectures = "Automatic">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
@@ -14,9 +15,9 @@
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Bootstrap"
BuildableName = "Bootstrap"
BlueprintName = "Bootstrap"
BlueprintIdentifier = "MQTTManager"
BuildableName = "MQTTManager"
BlueprintName = "MQTTManager"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
@@ -26,9 +27,8 @@
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
</Testables>
shouldUseLaunchSchemeArgsEnv = "YES"
shouldAutocreateTestPlan = "YES">
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
@@ -50,9 +50,9 @@
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Bootstrap"
BuildableName = "Bootstrap"
BlueprintName = "Bootstrap"
BlueprintIdentifier = "MQTTManager"
BuildableName = "MQTTManager"
BlueprintName = "MQTTManager"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>

View File

@@ -1,10 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1330"
version = "1.3">
LastUpgradeVersion = "1610"
version = "1.7">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
buildImplicitDependencies = "YES"
buildArchitectures = "Automatic">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
@@ -14,9 +15,9 @@
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Client"
BuildableName = "Client"
BlueprintName = "Client"
BlueprintIdentifier = "SensorsService"
BuildableName = "SensorsService"
BlueprintName = "SensorsService"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
@@ -26,9 +27,8 @@
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
</Testables>
shouldUseLaunchSchemeArgsEnv = "YES"
shouldAutocreateTestPlan = "YES">
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
@@ -50,9 +50,9 @@
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Client"
BuildableName = "Client"
BlueprintName = "Client"
BlueprintIdentifier = "SensorsService"
BuildableName = "SensorsService"
BlueprintName = "SensorsService"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>

View File

@@ -1,10 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1330"
version = "1.3">
LastUpgradeVersion = "1610"
version = "1.7">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
buildImplicitDependencies = "YES"
buildArchitectures = "Automatic">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
@@ -14,9 +15,9 @@
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Bootstrap"
BuildableName = "Bootstrap"
BlueprintName = "Bootstrap"
BlueprintIdentifier = "MQTTConnectionService"
BuildableName = "MQTTConnectionService"
BlueprintName = "MQTTConnectionService"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
@@ -28,51 +29,9 @@
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Client"
BuildableName = "Client"
BlueprintName = "Client"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ClientLive"
BuildableName = "ClientLive"
BlueprintName = "ClientLive"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "DewPointEnvironment"
BuildableName = "DewPointEnvironment"
BlueprintName = "DewPointEnvironment"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "EnvVars"
BuildableName = "EnvVars"
BlueprintName = "EnvVars"
BlueprintIdentifier = "MQTTManager"
BuildableName = "MQTTManager"
BlueprintName = "MQTTManager"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
@@ -90,62 +49,6 @@
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controller"
BuildableName = "dewPoint-controller"
BlueprintName = "dewPoint-controller"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "NO"
buildForArchiving = "NO"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ClientTests"
BuildableName = "ClientTests"
BlueprintName = "ClientTests"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "NO"
buildForArchiving = "NO"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controllerTests"
BuildableName = "dewPoint-controllerTests"
BlueprintName = "dewPoint-controllerTests"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "TopicsLive"
BuildableName = "TopicsLive"
BlueprintName = "TopicsLive"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
@@ -160,6 +63,48 @@
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewpoint-controller"
BuildableName = "dewpoint-controller"
BlueprintName = "dewpoint-controller"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "IntegrationTests"
BuildableName = "IntegrationTests"
BlueprintName = "IntegrationTests"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "CliClient"
BuildableName = "CliClient"
BlueprintName = "CliClient"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
</BuildActionEntries>
</BuildAction>
<TestAction
@@ -167,44 +112,20 @@
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<TestPlans>
<TestPlanReference
reference = "container:.swiftpm/dewpoint-controller-Package.xctestplan"
default = "YES">
</TestPlanReference>
</TestPlans>
<Testables>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ClientTests"
BuildableName = "ClientTests"
BlueprintName = "ClientTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controllerTests"
BuildableName = "dewPoint-controllerTests"
BlueprintName = "dewPoint-controllerTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "MQTTConnectionServiceTests"
BuildableName = "MQTTConnectionServiceTests"
BlueprintName = "MQTTConnectionServiceTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "SensorsServiceTests"
BuildableName = "SensorsServiceTests"
BlueprintName = "SensorsServiceTests"
BlueprintIdentifier = "IntegrationTests"
BuildableName = "IntegrationTests"
BlueprintName = "IntegrationTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
@@ -223,9 +144,9 @@
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controller"
BuildableName = "dewPoint-controller"
BlueprintName = "dewPoint-controller"
BlueprintIdentifier = "dewpoint-controller"
BuildableName = "dewpoint-controller"
BlueprintName = "dewpoint-controller"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>
@@ -239,9 +160,9 @@
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controller"
BuildableName = "dewPoint-controller"
BlueprintName = "dewPoint-controller"
BlueprintIdentifier = "dewpoint-controller"
BuildableName = "dewpoint-controller"
BlueprintName = "dewpoint-controller"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>

View File

@@ -1,10 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1330"
version = "1.3">
LastUpgradeVersion = "1610"
version = "1.7">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
buildImplicitDependencies = "YES"
buildArchitectures = "Automatic">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
@@ -14,37 +15,9 @@
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controller"
BuildableName = "dewPoint-controller"
BlueprintName = "dewPoint-controller"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "NO"
buildForArchiving = "NO"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ClientTests"
BuildableName = "ClientTests"
BlueprintName = "ClientTests"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "NO"
buildForArchiving = "NO"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controllerTests"
BuildableName = "dewPoint-controllerTests"
BlueprintName = "dewPoint-controllerTests"
BlueprintIdentifier = "dewpoint-controller"
BuildableName = "dewpoint-controller"
BlueprintName = "dewpoint-controller"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
@@ -54,29 +27,8 @@
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "ClientTests"
BuildableName = "ClientTests"
BlueprintName = "ClientTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controllerTests"
BuildableName = "dewPoint-controllerTests"
BlueprintName = "dewPoint-controllerTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
</Testables>
shouldUseLaunchSchemeArgsEnv = "YES"
shouldAutocreateTestPlan = "YES">
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
@@ -92,9 +44,9 @@
runnableDebuggingMode = "0">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controller"
BuildableName = "dewPoint-controller"
BlueprintName = "dewPoint-controller"
BlueprintIdentifier = "dewpoint-controller"
BuildableName = "dewpoint-controller"
BlueprintName = "dewpoint-controller"
ReferencedContainer = "container:">
</BuildableReference>
</BuildableProductRunnable>
@@ -109,9 +61,9 @@
runnableDebuggingMode = "0">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "dewPoint-controller"
BuildableName = "dewPoint-controller"
BlueprintName = "dewPoint-controller"
BlueprintIdentifier = "dewpoint-controller"
BuildableName = "dewpoint-controller"
BlueprintName = "dewpoint-controller"
ReferencedContainer = "container:">
</BuildableReference>
</BuildableProductRunnable>

View File

@@ -1,14 +0,0 @@
# Build the executable
FROM swift:5.10 AS build
WORKDIR /build
COPY ./Package.* ./
RUN swift package resolve
COPY . .
RUN swift build -c release -Xswiftc -g
# Run image
FROM swift:5.10-slim
WORKDIR /run
COPY --from=build /build/.build/release/dewpoint-controller /run
CMD ["/bin/bash", "-xc", "./dewpoint-controller"]

View File

@@ -1,30 +1,39 @@
DOCKER_IMAGE_NAME?="swift-mqtt-dewpoint"
DOCKER_TAG_NAME?="latest"
bootstrap-env:
.PHONY: bootstrap
bootstrap:
@cp Bootstrap/dewPoint-env-example .dewPoint-env
bootstrap-topics:
@cp Bootstrap/topics-example .topics
bootstrap: bootstrap-env bootstrap-topics
.PHONY: build
build:
@swift build -Xswiftc -strict-concurrency=complete
.PHONY: build-docker
build-docker:
@docker build \
--file docker/Dockerfile \
--tag "${DOCKER_IMAGE_NAME}:${DOCKER_TAG_NAME}" .
.PHONY: clean
clean:
rm -rf .build
.PHONY: run
run:
@swift run dewPoint-controller
start-mosquitto:
@docker-compose start mosquitto
stop-mosquitto:
@docker-compose rm -f mosquitto || true
@swift run dewpoint-controller
.PHONY: test-docker
test-docker:
@docker-compose run --build --remove-orphans -i --rm test
@docker-compose kill mosquitto-test
@docker-compose rm -f
@docker compose --file docker/docker-compose-test.yaml \
run --build --remove-orphans -i --rm test
@docker compose --file docker/docker-compose-test.yaml down
test: test-docker
.PHONY: start-mosquitto
start-mosquitto:
@docker compose --file docker/docker-compose.yaml \
up -d mosquitto
.PHONY: test-swift
test-swift: start-mosquitto
@swift test --enable-code-coverage

View File

@@ -1,5 +1,5 @@
{
"originHash" : "c2538e3229d6c80f3d6a979c2f3605d63b4973e1e49786819017473eb2916f4e",
"originHash" : "486be5d69e4f0ba7b9f42046df31a727c7e394e4ecfae5671e1b194bed7c9e9b",
"pins" : [
{
"identity" : "combine-schedulers",
@@ -10,6 +10,15 @@
"version" : "1.0.2"
}
},
{
"identity" : "dotenv",
"kind" : "remoteSourceControl",
"location" : "https://github.com/swiftpackages/DotEnv.git",
"state" : {
"revision" : "1f15bb9de727d694af1d003a1a5d7a553752850f",
"version" : "3.0.0"
}
},
{
"identity" : "mqtt-nio",
"kind" : "remoteSourceControl",
@@ -19,6 +28,15 @@
"version" : "2.11.0"
}
},
{
"identity" : "swift-argument-parser",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-argument-parser.git",
"state" : {
"revision" : "41982a3656a71c768319979febd796c6fd111d5c",
"version" : "1.5.0"
}
},
{
"identity" : "swift-async-algorithms",
"kind" : "remoteSourceControl",
@@ -60,8 +78,17 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-concurrency-extras",
"state" : {
"revision" : "6054df64b55186f08b6d0fd87152081b8ad8d613",
"version" : "1.2.0"
"revision" : "163409ef7dae9d960b87f34b51587b6609a76c1f",
"version" : "1.3.0"
}
},
{
"identity" : "swift-custom-dump",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-custom-dump",
"state" : {
"revision" : "82645ec760917961cfa08c9c0c7104a57a0fa4b1",
"version" : "1.3.3"
}
},
{

View File

@@ -13,33 +13,44 @@ let package = Package(
.macOS(.v14)
],
products: [
.executable(name: "dewpoint-controller", targets: ["dewpoint-controller"]),
.executable(name: "dewpoint-controller", targets: ["DewPointController"]),
.library(name: "CliClient", targets: ["CliClient"]),
.library(name: "Models", targets: ["Models"]),
.library(name: "MQTTConnectionManager", targets: ["MQTTConnectionManager"]),
.library(name: "MQTTManager", targets: ["MQTTManager"]),
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
.library(name: "SensorsService", targets: ["SensorsService"]),
.library(name: "TopicDependencies", targets: ["TopicDependencies"])
.library(name: "SensorsService", targets: ["SensorsService"])
],
dependencies: [
.package(url: "https://github.com/apple/swift-argument-parser.git", from: "1.3.0"),
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
.package(url: "https://github.com/apple/swift-log", from: "1.6.0"),
.package(url: "https://github.com/swiftpackages/DotEnv.git", from: "3.0.0"),
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.5.2"),
.package(url: "https://github.com/pointfreeco/swift-custom-dump", from: "1.0.0"),
.package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", exact: "0.2.3"),
.package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"),
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.3.0")
],
targets: [
.executableTarget(
name: "dewpoint-controller",
.target(
name: "CliClient",
dependencies: [
"Models",
"MQTTConnectionManager",
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "DotEnv", package: "DotEnv"),
.product(name: "MQTTNIO", package: "mqtt-nio")
]
),
.executableTarget(
name: "DewPointController",
dependencies: [
"CliClient",
"MQTTConnectionService",
"SensorsService",
"TopicDependencies",
.product(name: "MQTTNIO", package: "mqtt-nio"),
.product(name: "NIO", package: "swift-nio"),
.product(name: "ArgumentParser", package: "swift-argument-parser"),
.product(name: "CustomDump", package: "swift-custom-dump"),
// .product(name: "DotEnv", package: "DotEnv"),
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
]
),
@@ -52,7 +63,7 @@ let package = Package(
swiftSettings: swiftSettings
),
.target(
name: "MQTTConnectionManager",
name: "MQTTManager",
dependencies: [
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
.product(name: "Dependencies", package: "swift-dependencies"),
@@ -65,16 +76,19 @@ let package = Package(
name: "MQTTConnectionService",
dependencies: [
"Models",
"MQTTConnectionManager",
"MQTTManager",
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
],
swiftSettings: swiftSettings
),
.testTarget(
name: "MQTTConnectionServiceTests",
name: "IntegrationTests",
dependencies: [
"DewPointController",
"MQTTConnectionService",
"MQTTConnectionManager",
"MQTTManager",
"SensorsService",
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics"),
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
]
),
@@ -82,30 +96,14 @@ let package = Package(
name: "SensorsService",
dependencies: [
"Models",
"MQTTConnectionManager",
"TopicDependencies",
"MQTTManager",
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio"),
.product(name: "PsychrometricClient", package: "swift-psychrometrics"),
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
],
swiftSettings: swiftSettings
),
.testTarget(
name: "SensorsServiceTests",
dependencies: [
"SensorsService",
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
]
),
.target(
name: "TopicDependencies",
dependencies: [
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio")
],
swiftSettings: swiftSettings
)
]
)

View File

@@ -1,3 +1,7 @@
# dewPoint-controller
# dewpoint-controller
A description of this package.
![CI](https://git.housh.dev/michael/swift-mqtt-dewpoint/actions/workflows/ci.yaml/badge.svg?branch=main)
Listens to an MQTT broker for temperature and humidity sensors and calculates
the dew-point temperature and enthalpy for the sensor, then publishes those back
to the MQTT broker.

View File

@@ -0,0 +1,188 @@
import Dependencies
import DependenciesMacros
import DotEnv
import Foundation
import Logging
import Models
import MQTTNIO
import NIO
public extension DependencyValues {
var cliClient: CliClient {
get { self[CliClient.self] }
set { self[CliClient.self] = newValue }
}
}
@DependencyClient
public struct CliClient {
public var logLevel: @Sendable (EnvVars) -> Logger.Level = { _ in .debug }
public var makeEnvVars: @Sendable (EnvVarsRequest) async throws -> EnvVars
public var makeClient: @Sendable (ClientRequest) throws -> MQTTClient
public var parseMqttClientVersion: @Sendable (String) -> MQTTClient.Version?
public struct ClientRequest: Sendable {
public let envVars: EnvVars
public let eventLoopGroup: MultiThreadedEventLoopGroup
public let logger: Logger?
public init(
envVars: EnvVars,
eventLoopGroup: MultiThreadedEventLoopGroup,
logger: Logger?
) {
self.envVars = envVars
self.eventLoopGroup = eventLoopGroup
self.logger = logger
}
}
public struct EnvVarsRequest: Sendable {
public let envFilePath: String?
public let logger: Logger?
public let mqttClientVersion: String?
public init(
envFilePath: String? = nil,
logger: Logger? = nil,
version mqttClientVersion: String? = nil
) {
self.envFilePath = envFilePath
self.logger = logger
self.mqttClientVersion = mqttClientVersion
}
}
}
extension CliClient: DependencyKey {
public static let testValue: CliClient = Self()
public static var liveValue: CliClient {
Self(
logLevel: { Logger.Level.from(environment: $0) },
makeEnvVars: {
try EnvVars.load(
dotEnvFile: $0.envFilePath,
logger: $0.logger,
version: $0.mqttClientVersion
)
},
makeClient: {
MQTTClient(
envVars: $0.envVars,
eventLoopGroup: $0.eventLoopGroup,
logger: $0.logger
)
},
parseMqttClientVersion: { .init(string: $0) }
)
}
}
extension EnvVars {
/// Load the `EnvVars` from the environment.
///
/// - Paramaters:
/// - logger: An optional logger to use for debugging.
/// - version: A version that is specified from command line, ignoring any environment variable.
static func load(
dotEnvFile: String?,
logger: Logger?,
version: String?
) throws -> EnvVars {
let defaultEnvVars = EnvVars()
let encoder = JSONEncoder()
let decoder = JSONDecoder()
if let dotEnvFile {
try DotEnv.load(path: dotEnvFile)
}
let defaultEnvDict = (try? encoder.encode(defaultEnvVars))
.flatMap { try? decoder.decode([String: String].self, from: $0) }
?? [:]
let envVarsDict = defaultEnvDict
.merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 })
var envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict))
.flatMap { try? decoder.decode(EnvVars.self, from: $0) }
?? defaultEnvVars
if let version {
envVars.version = version
}
logger?.debug("Done loading EnvVars...")
return envVars
}
}
@_spi(Internal)
public extension MQTTClient {
convenience init(
envVars: EnvVars,
eventLoopGroup: EventLoopGroup,
logger: Logger?
) {
self.init(
host: envVars.host,
port: envVars.port != nil ? Int(envVars.port!) : nil,
identifier: envVars.identifier,
eventLoopGroupProvider: .shared(eventLoopGroup),
logger: logger,
configuration: .init(
version: .parseOrDefualt(string: envVars.version),
disablePing: false,
userName: envVars.userName,
password: envVars.password
)
)
}
}
extension MQTTClient.Version {
static let `default` = Self.v3_1_1
static func parseOrDefualt(string: String?) -> Self {
guard let string, let value = Self(string: string) else {
return .default
}
return value
}
init?(string: String) {
if string.contains("5") {
self = .v5_0
} else if string.contains("3") {
self = .v3_1_1
} else {
return nil
}
}
}
@_spi(Internal)
public extension Logger.Level {
/// Parse a `Logger.Level` from the loaded `EnvVars`.
static func from(environment envVars: EnvVars) -> Self {
// If the log level was set via an environment variable.
if let logLevel = envVars.logLevel {
return logLevel
}
// Parse the appEnv to derive an log level.
switch envVars.appEnv {
case .staging, .development:
return .debug
case .production:
return .info
case .testing:
return .trace
}
}
}

View File

@@ -0,0 +1,21 @@
import ArgumentParser
import Dependencies
import Foundation
import Logging
import Models
import MQTTConnectionService
import MQTTManager
import MQTTNIO
import NIO
import PsychrometricClientLive
import SensorsService
import ServiceLifecycle
@main
struct Application: AsyncParsableCommand {
static let configuration = CommandConfiguration(
commandName: "dewpoint-controller",
abstract: "Command for running the dewpoint mqtt service.",
subcommands: [Run.self, Debug.self]
)
}

View File

@@ -0,0 +1,74 @@
import ArgumentParser
import CliClient
import CustomDump
import Dependencies
import DotEnv
import Foundation
import Logging
import Models
extension Application {
struct Debug: AsyncParsableCommand {
static let configuration: CommandConfiguration = .init(
commandName: "debug",
abstract: "Debug the environment variables."
)
@OptionGroup
var shared: SharedOptions
@Flag(
name: [.customLong("show-password")],
help: "Don't redact the password from the console."
)
var showPassword: Bool = false
mutating func run() async throws {
@Dependency(\.cliClient) var client
let logger = Logger(label: "debug-command")
print("--------------------------")
print("Running debug command...")
if let envFile = shared.envFile {
print("Reading env file: \(envFile)")
print("--------------------------")
} else {
print("No env file set.")
print("--------------------------")
}
print("Loading EnvVars")
print("--------------------------")
let envVars = try await client.makeEnvVars(shared.envVarsRequest(logger: logger))
printEnvVars(envVars: envVars, showPassword: showPassword)
print("--------------------------")
if let logLevel = shared.logLevel, let level = logLevel() {
print("Log Level option: \(level)")
print("--------------------------")
} else {
print("Log Level option: nil")
print("--------------------------")
}
}
private func printEnvVars(envVars: EnvVars, showPassword: Bool) {
// show the proper password to show depending on if it exists
// and if we should redact it or not.
var passwordString: String?
switch (showPassword, envVars.password) {
case (true, .none), (_, .none):
break
case (true, let .some(password)):
passwordString = password
case (false, .some):
passwordString = "<redacted>"
}
var envVars = envVars
envVars.password = passwordString
customDump(envVars)
}
}
}

View File

@@ -0,0 +1,86 @@
import ArgumentParser
import CliClient
import Dependencies
import Foundation
import Logging
import Models
import MQTTConnectionService
import MQTTManager
import MQTTNIO
import NIO
import PsychrometricClientLive
import SensorsService
import ServiceLifecycle
extension Application {
/// Run the controller.
///
struct Run: AsyncParsableCommand {
static let configuration = CommandConfiguration(
commandName: "run",
abstract: "Run the controller."
)
@OptionGroup
var shared: SharedOptions
mutating func run() async throws {
@Dependency(\.cliClient) var cliClient
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
var logger = Logger(label: "dewpoint-controller")
let mqtt = try await setup(eventLoopGroup: eventloopGroup, logger: &logger)
do {
try await withDependencies {
$0.psychrometricClient = .liveValue
$0.mqtt = .live(client: mqtt, logger: logger)
} operation: {
let mqttConnection = MQTTConnectionService(logger: logger)
let sensors = SensorsService(sensors: .live, logger: logger)
var serviceGroupConfiguration = ServiceGroupConfiguration(
services: [
mqttConnection,
sensors
],
gracefulShutdownSignals: [.sigterm, .sigint],
logger: logger
)
serviceGroupConfiguration.maximumCancellationDuration = .seconds(5)
serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10)
let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
logger.info("Starting dewpoint-controller!")
try await serviceGroup.run()
}
try await mqtt.shutdown()
try await eventloopGroup.shutdownGracefully()
} catch {
try await eventloopGroup.shutdownGracefully()
}
}
private func setup(
eventLoopGroup: MultiThreadedEventLoopGroup,
logger: inout Logger
) async throws -> MQTTClient {
@Dependency(\.cliClient) var cliClient
let environment = try await cliClient.makeEnvVars(shared.envVarsRequest(logger: logger))
logger.logLevel = cliClient.logLevel(environment)
return try cliClient.makeClient(.init(
envVars: environment,
eventLoopGroup: eventLoopGroup,
logger: logger
))
}
}
}
// MARK: - Helpers

View File

@@ -0,0 +1,50 @@
import ArgumentParser
import CliClient
import Logging
import Models
import MQTTNIO
extension Application {
struct SharedOptions: ParsableArguments {
@Option(
name: [.short, .customLong("env-file")],
help: "A file path to an env file."
)
var envFile: String?
@Option(
name: [.short, .customLong("log-level")],
help: "Set the logging level."
)
var logLevel: LogLevelContainer?
@Option(
name: [.short, .long],
help: "Set the MQTT connecition version."
)
var version: String?
func envVarsRequest(logger: Logger?) -> CliClient.EnvVarsRequest {
.init(envFilePath: envFile, logger: logger, version: version)
}
}
}
/// A container type for making `Logger.Level` into a type
/// that can be parsed as a command line argument. This is
/// to suppress warnings vs. having `Logger.Level` adopt the
/// protocol.
@_spi(Internal)
public struct LogLevelContainer: ExpressibleByArgument {
public let logLevel: Logger.Level?
public init?(argument: String) {
self.logLevel = .init(rawValue: argument.lowercased())
}
public func callAsFunction() -> Logger.Level? {
logLevel
}
}

View File

@@ -0,0 +1,10 @@
import Models
@_spi(Internal)
public extension Array where Element == TemperatureAndHumiditySensor {
static var live: Self {
TemperatureAndHumiditySensor.Location.allCases.map { location in
TemperatureAndHumiditySensor(location: location)
}
}
}

View File

@@ -1,221 +0,0 @@
import AsyncAlgorithms
import Dependencies
import DependenciesMacros
import Foundation
import Logging
import MQTTNIO
import NIO
public extension DependencyValues {
/// A dependency that is responsible for managing the connection to
/// an MQTT broker.
var mqttConnectionManager: MQTTConnectionManager {
get { self[MQTTConnectionManager.self] }
set { self[MQTTConnectionManager.self] = newValue }
}
}
/// Represents the interface needed for the ``MQTTConnectionService``.
///
/// See ``MQTTConnectionManagerLive`` module for live implementation.
@DependencyClient
public struct MQTTConnectionManager: Sendable {
/// Connect to the MQTT broker.
public var connect: @Sendable () async throws -> Void
/// Shutdown the connection to the MQTT broker.
///
/// - Note: You should cancel any tasks that are listening to the connection stream first.
public var shutdown: @Sendable () -> Void
/// Create a stream of connection events.
public var stream: @Sendable () throws -> AsyncStream<Event>
/// Perform an operation with the underlying MQTTClient, this can be useful in
/// tests, so this module needs imported with `@_spi(Testing) import` to use this method.
private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void
@_spi(Internal)
public func withClient(
_ callback: @Sendable (MQTTClient) async throws -> Void
) async throws {
try await _withClient(callback)
}
/// Represents connection events that clients can listen for and
/// react accordingly.
public enum Event: Sendable {
case connected
case disconnected
case shuttingDown
}
public static func live(
client: MQTTClient,
cleanSession: Bool = false,
logger: Logger? = nil,
alwaysReconnect: Bool = true
) -> Self {
let manager = ConnectionManager(
client: client,
logger: logger,
alwaysReconnect: alwaysReconnect
)
return .init {
try await manager.connect(cleanSession: cleanSession)
} shutdown: {
manager.shutdown()
} stream: {
MQTTConnectionStream(client: client, logger: logger)
.start()
.removeDuplicates()
.eraseToStream()
} _withClient: { callback in
try await callback(client)
}
}
}
extension MQTTConnectionManager: TestDependencyKey {
public static var testValue: MQTTConnectionManager {
Self()
}
}
// MARK: - Helpers
@_spi(Internal)
public final actor MQTTConnectionStream: Sendable {
public typealias Element = MQTTConnectionManager.Event
private let client: MQTTClient
private let continuation: AsyncStream<Element>.Continuation
private let logger: Logger?
private let name: String
private let stream: AsyncStream<Element>
private var isShuttingDown = false
public init(client: MQTTClient, logger: Logger?) {
let (stream, continuation) = AsyncStream<Element>.makeStream()
self.client = client
self.continuation = continuation
self.logger = logger
self.name = UUID().uuidString
self.stream = stream
}
deinit { stop() }
public nonisolated func start() -> AsyncStream<Element> {
// Check if the client is active and yield the initial result.
continuation.yield(client.isActive() ? .connected : .disconnected)
// Continually check if the client is active.
let task = Task {
let isShuttingDown = await self.isShuttingDown
while !Task.isCancelled, !isShuttingDown {
try await Task.sleep(for: .milliseconds(100))
continuation.yield(client.isActive() ? .connected : .disconnected)
}
}
// Register listener on the client for when the connection
// closes.
client.addCloseListener(named: name) { _ in
self.logger?.trace("Client has disconnected.")
self.continuation.yield(.disconnected)
}
// Register listener on the client for when the client
// is shutdown.
client.addShutdownListener(named: name) { _ in
self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)")
self.continuation.yield(.shuttingDown)
Task { await self.setIsShuttingDown() }
task.cancel()
self.stop()
}
return stream
}
private func setIsShuttingDown() {
isShuttingDown = true
}
public nonisolated func stop() {
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
continuation.finish()
}
}
actor ConnectionManager {
private let client: MQTTClient
private let logger: Logger?
private let name: String
private let shouldReconnect: Bool
private var hasConnected: Bool = false
init(
client: MQTTClient,
logger: Logger?,
alwaysReconnect: Bool
) {
self.client = client
self.logger = logger
self.name = UUID().uuidString
self.shouldReconnect = alwaysReconnect
}
deinit {
// We should've already logged that we're shutting down if
// the manager was shutdown properly, so don't log it twice.
self.shutdown(withLogging: false)
}
private func setHasConnected() {
hasConnected = true
}
func connect(
cleanSession: Bool
) async throws {
guard !hasConnected else { return }
do {
try await client.connect(cleanSession: cleanSession)
setHasConnected()
client.addCloseListener(named: name) { [weak self] _ in
guard let `self` else { return }
self.logger?.debug("Connection closed.")
if self.shouldReconnect {
self.logger?.debug("Reconnecting...")
Task {
try await self.connect(cleanSession: cleanSession)
}
}
}
client.addShutdownListener(named: name) { _ in
self.shutdown()
}
} catch {
logger?.trace("Failed to connect: \(error)")
throw error
}
}
nonisolated func shutdown(withLogging: Bool = true) {
if withLogging {
logger?.trace("Shutting down connection.")
}
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
}
}

View File

@@ -1,17 +1,18 @@
import Dependencies
import Logging
import Models
import MQTTConnectionManager
import MQTTManager
import ServiceLifecycle
public actor MQTTConnectionService: Service {
@Dependency(\.mqttConnectionManager) var manager
public struct MQTTConnectionService: Service {
private nonisolated let logger: Logger?
private let logger: Logger?
public init(
logger: Logger? = nil
) {
var logger = logger
logger?[metadataKey: "type"] = "mqtt-connection-service"
self.logger = logger
}
@@ -19,19 +20,19 @@ public actor MQTTConnectionService: Service {
/// to the MQTT broker and handles graceful shutdown of the
/// connection.
public func run() async throws {
@Dependency(\.mqtt) var mqtt
try await mqtt.connect()
try await withGracefulShutdownHandler {
try await manager.connect()
for await event in try manager.stream().cancelOnGracefulShutdown() {
for await event in try mqtt.connectionStream().cancelOnGracefulShutdown() {
// We don't really need to do anything with the events, so just logging
// for now. But we need to iterate on an async stream for the service to
// continue to run and handle graceful shutdowns.
logger?.trace("Received connection event: \(event)")
}
// when we reach here we are shutting down, so we shutdown
// the manager.
manager.shutdown()
} onGracefulShutdown: {
self.logger?.trace("Received graceful shutdown.")
mqtt.shutdown()
}
}
}

View File

@@ -0,0 +1,221 @@
import AsyncAlgorithms
import Dependencies
import DependenciesMacros
import Foundation
import Logging
import MQTTNIO
import NIO
public extension DependencyValues {
/// A dependency that is responsible for managing the connection to
/// an MQTT broker, listen to topics, and publish values back to the
/// broker.
var mqtt: MQTTManager {
get { self[MQTTManager.self] }
set { self[MQTTManager.self] = newValue }
}
}
/// Represents the interface needed to connect, listen, and publish to an MQTT broker.
///
@DependencyClient
public struct MQTTManager: Sendable {
public typealias ListenStream = AsyncStream<MQTTPublishInfo>
/// Connect to the MQTT broker.
public var connect: @Sendable () async throws -> Void
/// Create a stream of connection events.
///
/// - SeeAlso: ``Event``
public var connectionStream: @Sendable () throws -> AsyncStream<Event>
private var _listen: @Sendable ([String], MQTTQoS) async throws -> ListenStream
/// Publish a value to the MQTT broker for a given topic.
public var publish: @Sendable (PublishRequest) async throws -> Void
/// Shutdown the connection to the MQTT broker.
public var shutdown: @Sendable () -> Void
private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void
public init(
connect: @escaping @Sendable () async throws -> Void,
connectionStream: @escaping @Sendable () throws -> AsyncStream<MQTTManager.Event>,
listen: @escaping @Sendable ([String], MQTTQoS) async throws -> MQTTManager.ListenStream,
publish: @escaping @Sendable (MQTTManager.PublishRequest) async throws -> Void,
shutdown: @escaping @Sendable () -> Void,
withClient: @escaping @Sendable ((MQTTClient) async throws -> Void) async throws -> Void = { _ in unimplemented() }
) {
self.connect = connect
self.connectionStream = connectionStream
self._listen = listen
self.publish = publish
self.shutdown = shutdown
self._withClient = withClient
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
to topics: [String],
qos: MQTTQoS = .atLeastOnce
) async throws -> ListenStream {
try await _listen(topics, qos)
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
_ topics: String...,
qos: MQTTQoS = .atLeastOnce
) async throws -> ListenStream {
try await listen(to: topics, qos: qos)
}
/// Publish a new value to the given topic.
///
/// - Parameters:
/// - payload: The value to publish.
/// - topicName: The topic to publish the new value to.
/// - qos: The MQTTQoS.
/// - retain: The retain flag.
public func publish(
_ payload: ByteBuffer,
to topicName: String,
qos: MQTTQoS,
retain: Bool = false,
properties: MQTTProperties = .init()
) async throws {
try await publish(.init(
topicName: topicName,
payload: payload,
qos: qos,
retain: retain,
properties: properties
))
}
/// Perform an operation with the underlying MQTTClient, this can be useful in
/// tests, so this module needs imported with `@_spi(Internal) import MQTTManager` to use this method.
@_spi(Internal)
public func withClient(
_ callback: @Sendable (MQTTClient) async throws -> Void
) async throws {
try await _withClient(callback)
}
/// Represents connection events that clients can listen for and
/// react accordingly.
public enum Event: Equatable, Sendable {
case connected
case disconnected
case shuttingDown
}
/// Represents the parameters required to publish a new value to the
/// MQTT broker.
public struct PublishRequest: Sendable {
/// The topic to publish the new value to.
public let topicName: String
/// The value to publish.
public let payload: ByteBuffer
/// The qos of the request.
public let qos: MQTTQoS
/// The retain flag for the request.
public let retain: Bool
public let properties: MQTTProperties
/// Create a new publish request.
///
/// - Parameters:
/// - topicName: The topic to publish to.
/// - payload: The value to publish.
/// - qos: The qos of the request.
/// - retain: The retain flag of the request.
public init(
topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool,
properties: MQTTProperties
) {
self.topicName = topicName
self.payload = payload
self.qos = qos
self.retain = retain
self.properties = properties
}
}
}
public extension MQTTManager {
/// Create the live manager.
///
static func live(
client: MQTTClient,
cleanSession: Bool = false,
logger: Logger? = nil,
alwaysReconnect: Bool = true
) -> Self {
let manager = ConnectionManager(
client: client,
logger: logger,
alwaysReconnect: alwaysReconnect
)
return .init(
connect: { try await manager.connect(cleanSession: cleanSession) },
connectionStream: {
MQTTConnectionStream(client: client, logger: logger)
.start()
.removeDuplicates()
.eraseToStream()
},
listen: { topics, qos in
try await manager.listen(to: topics, qos: qos)
},
publish: { request in
let topic = request.topicName
guard client.isActive() else {
logger?.debug("Client is not active, unable to publish to topic: \(topic)")
return
}
logger?.trace("Begin publishing to topic: \(topic)")
defer { logger?.debug("Done publishing to topic: \(topic)") }
try await client.publish(
to: request.topicName,
payload: request.payload,
qos: request.qos,
retain: request.retain,
properties: request.properties
).get()
},
shutdown: {
Task { try await client.shutdown() }
manager.shutdown()
},
withClient: { callback in
try await callback(client)
}
)
}
}
extension MQTTManager: TestDependencyKey {
public static let testValue: MQTTManager = Self()
}

View File

@@ -0,0 +1,98 @@
import Foundation
import Logging
import MQTTNIO
actor ConnectionManager {
private let client: MQTTClient
private let logger: Logger?
private let name: String
private let shouldReconnect: Bool
private var hasConnected: Bool = false
private var listeners: [TopicListenerStream] = []
private var isShuttingDown = false
init(
client: MQTTClient,
logger: Logger?,
alwaysReconnect: Bool
) {
var logger = logger
logger?[metadataKey: "instance"] = "\(Self.self)"
self.logger = logger
self.client = client
self.name = UUID().uuidString
self.shouldReconnect = alwaysReconnect
}
deinit {
if !isShuttingDown {
let message = """
Did not properly close the connection manager. This can lead to
dangling references.
Please call `shutdown` to properly close all connections and listener streams.
"""
logger?.warning("\(message)")
self.shutdown()
}
}
private func setHasConnected() {
hasConnected = true
}
func listen(
to topics: [String],
qos: MQTTQoS
) async throws -> MQTTManager.ListenStream {
let listener = TopicListenerStream(client: client, logger: logger, topics: topics, qos: qos)
listeners.append(listener)
await listener.start()
return listener.stream
}
func connect(
cleanSession: Bool
) async throws {
guard !hasConnected else { return }
do {
try await client.connect(cleanSession: cleanSession)
setHasConnected()
client.addCloseListener(named: name) { [weak self] _ in
guard let `self` else { return }
self.logger?.debug("Connection closed.")
if self.shouldReconnect {
self.logger?.debug("Reconnecting...")
Task {
try await self.connect(cleanSession: cleanSession)
}
}
}
client.addShutdownListener(named: name) { _ in
self.shutdown()
}
} catch {
logger?.trace("Failed to connect: \(error)")
throw error
}
}
private func shutdownListeners() {
_ = listeners.map { $0.shutdown() }
listeners = []
isShuttingDown = true
}
nonisolated func shutdown(withLogging: Bool = true) {
if withLogging {
logger?.trace("Shutting down connection.")
}
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
Task { await shutdownListeners() }
}
}

View File

@@ -0,0 +1,74 @@
import Foundation
import Logging
import MQTTNIO
@_spi(Internal)
public actor MQTTConnectionStream: Sendable {
public typealias Element = MQTTManager.Event
private let client: MQTTClient
private let continuation: AsyncStream<Element>.Continuation
private let logger: Logger?
nonisolated let name: String
private let stream: AsyncStream<Element>
private var isShuttingDown = false
public init(client: MQTTClient, logger: Logger?) {
var logger = logger
logger?[metadataKey: "type"] = "\(Self.self)"
self.logger = logger
let (stream, continuation) = AsyncStream<Element>.makeStream()
self.client = client
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
}
deinit { stop() }
public nonisolated func start() -> AsyncStream<Element> {
// Check if the client is active and yield the initial result.
continuation.yield(client.isActive() ? .connected : .disconnected)
// Continually check if the client is active.
let task = Task {
let isShuttingDown = await self.isShuttingDown
while !Task.isCancelled, !isShuttingDown {
try await Task.sleep(for: .milliseconds(100))
continuation.yield(client.isActive() ? .connected : .disconnected)
}
}
// Register listener on the client for when the connection
// closes.
client.addCloseListener(named: name) { _ in
self.logger?.trace("Client has disconnected.")
self.continuation.yield(.disconnected)
}
// Register listener on the client for when the client
// is shutdown.
client.addShutdownListener(named: name) { _ in
self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)")
self.continuation.yield(.shuttingDown)
Task { await self.setIsShuttingDown() }
task.cancel()
self.stop()
}
return stream
}
private func setIsShuttingDown() {
isShuttingDown = true
}
public nonisolated func stop() {
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
continuation.finish()
}
}

View File

@@ -0,0 +1,141 @@
import Foundation
import Logging
import MQTTNIO
actor TopicListenerStream {
typealias Stream = MQTTManager.ListenStream
private let client: MQTTClient
private let configuration: Configuration
private let continuation: Stream.Continuation
private let logger: Logger?
private let name: String
let stream: Stream
private var shuttingDown: Bool = false
private var onShutdownHandler: (@Sendable () -> Void)?
init(
client: MQTTClient,
logger: Logger?,
topics: [String],
qos: MQTTQoS
) {
// Setup the logger so we can more easily decipher log messages.
var logger = logger
logger?[metadataKey: "type"] = "\(Self.self)"
self.logger = logger
let (stream, continuation) = Stream.makeStream()
self.client = client
self.configuration = .init(qos: qos, topics: topics)
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
}
struct Configuration: Sendable {
let qos: MQTTQoS
let topics: [String]
}
deinit {
if !shuttingDown {
let message = """
Shutdown was not called on topic listener. This could lead to potential errors or
the stream never ending.
Please ensure that you call shutdown on the listener.
"""
client.logger.warning("\(message)")
continuation.finish()
}
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
}
private func subscribe() async throws {
guard !shuttingDown else { return }
logger?.debug("Begin subscribing to topics.")
do {
_ = try await client.subscribe(to: configuration.topics.map {
MQTTSubscribeInfo(topicFilter: $0, qos: configuration.qos)
})
} catch {
logger?.error("Received error while subscribing to topics: \(configuration.topics)")
throw TopicListenerError.failedToSubscribe
}
logger?.debug("Done subscribing to topics.")
}
public func start() {
logger?.trace("Starting listener for topics: \(configuration.topics)")
let stream = MQTTConnectionStream(client: client, logger: logger)
.start()
.removeDuplicates()
.eraseToStream()
let task = Task {
// Listen for connection events to restablish the stream upon a
// client becoming disconnected / reconnected, and properly shutdown
// the stream on the client being shutdown.
for await event in stream {
logger?.trace("Received event: \(event)")
switch event {
case .shuttingDown:
shutdown()
case .disconnected:
try await Task.sleep(for: .milliseconds(100))
case .connected:
try await subscribe()
client.addPublishListener(named: name) { result in
switch result {
case let .failure(error):
self.logger?.error("Received error while listening: \(error)")
case let .success(publishInfo):
if self.configuration.topics.contains(publishInfo.topicName) {
self.logger?.debug("Recieved new value for topic: \(publishInfo.topicName)")
self.continuation.yield(publishInfo)
}
}
}
}
}
}
onShutdownHandler = { task.cancel() }
}
private func setIsShuttingDown() {
shuttingDown = true
onShutdownHandler = nil
}
public nonisolated func shutdown() {
client.logger.trace("Closing topic listener...")
continuation.finish()
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
Task {
await onShutdownHandler?()
await self.setIsShuttingDown()
}
}
}
// MARK: - Errors
public enum TopicListenerError: Error {
case connectionTimeout
case failedToSubscribe
}
public struct MQTTListenResultError: Error {
let underlyingError: any Error
init(_ underlyingError: any Error) {
self.underlyingError = underlyingError
}
}

View File

@@ -25,6 +25,12 @@ public struct EnvVars: Codable, Equatable, Sendable {
/// The MQTT user password.
public var password: String?
/// Set a custom logging level.
public var logLevel: Logger.Level?
/// Set the mqtt broker version.
public var version: String?
/// Create a new ``EnvVars``
///
/// - Parameters:
@@ -38,9 +44,11 @@ public struct EnvVars: Codable, Equatable, Sendable {
appEnv: AppEnv = .development,
host: String = "127.0.0.1",
port: String? = "1883",
identifier: String = "dewPoint-controller",
identifier: String = "dewpoint-controller",
userName: String? = "mqtt_user",
password: String? = "secret!"
password: String? = "secret!",
logLevel: Logger.Level? = nil,
version: String? = nil
) {
self.appEnv = appEnv
self.host = host
@@ -48,6 +56,8 @@ public struct EnvVars: Codable, Equatable, Sendable {
self.identifier = identifier
self.userName = userName
self.password = password
self.logLevel = logLevel
self.version = version
}
/// Custom coding keys.
@@ -58,6 +68,8 @@ public struct EnvVars: Codable, Equatable, Sendable {
case identifier = "MQTT_IDENTIFIER"
case userName = "MQTT_USERNAME"
case password = "MQTT_PASSWORD"
case logLevel = "LOG_LEVEL"
case version = "MQTT_VERSION"
}
/// Represents the different app environments.

View File

@@ -1,42 +0,0 @@
import NIO
import NIOFoundationCompat
import PsychrometricClient
/// Represents a type that can be initialized by a ``ByteBuffer``.
protocol BufferInitalizable {
init?(buffer: inout ByteBuffer)
}
extension Double: BufferInitalizable {
/// Attempt to create / parse a double from a byte buffer.
init?(buffer: inout ByteBuffer) {
guard let string = buffer.readString(
length: buffer.readableBytes,
encoding: String.Encoding.utf8
)
else { return nil }
self.init(string)
}
}
extension Tagged: BufferInitalizable where RawValue: BufferInitalizable {
init?(buffer: inout ByteBuffer) {
guard let value = RawValue(buffer: &buffer) else { return nil }
self.init(value)
}
}
extension Humidity<Relative>: BufferInitalizable {
init?(buffer: inout ByteBuffer) {
guard let value = Double(buffer: &buffer) else { return nil }
self.init(value)
}
}
extension Temperature<DryAir>: BufferInitalizable {
init?(buffer: inout ByteBuffer) {
guard let value = Double(buffer: &buffer) else { return nil }
self.init(value)
}
}

View File

@@ -3,12 +3,11 @@ import DependenciesMacros
import Foundation
import Logging
import Models
import MQTTConnectionManager
import MQTTManager
import MQTTNIO
import NIO
import PsychrometricClient
import ServiceLifecycle
import TopicDependencies
/// Service that is responsible for listening to changes of the temperature and humidity
/// sensors, then publishing back the calculated dew-point temperature and enthalpy for
@@ -17,9 +16,7 @@ import TopicDependencies
///
public actor SensorsService: Service {
@Dependency(\.mqttConnectionManager.stream) var connectionStream
@Dependency(\.topicListener) var topicListener
@Dependency(\.topicPublisher) var topicPublisher
@Dependency(\.mqtt) var mqtt
/// The logger to use for the service.
private let logger: Logger?
@@ -27,9 +24,9 @@ public actor SensorsService: Service {
/// The sensors that we are listening for updates to, so
/// that we can calculate the dew-point temperature and enthalpy
/// values to publish back to the MQTT broker.
var sensors: [TemperatureAndHumiditySensor]
private var sensors: [TemperatureAndHumiditySensor]
var topics: [String] {
private var topics: [String] {
sensors.reduce(into: [String]()) { array, sensor in
array.append(sensor.topics.temperature)
array.append(sensor.topics.humidity)
@@ -57,30 +54,16 @@ public actor SensorsService: Service {
public func run() async throws {
precondition(sensors.count > 0, "Sensors should not be empty.")
try await withGracefulShutdownHandler {
// Listen for connection events, so that we can automatically
// reconnect any sensor topics we're listening to upon a disconnect / reconnect
// event. We can also shutdown any topic listeners upon a shutdown event.
for await event in try connectionStream().cancelOnGracefulShutdown() {
switch event {
case .shuttingDown:
logger?.debug("Received shutdown event.")
try await self.shutdown()
case .disconnected:
logger?.debug("Received disconnected event.")
try await Task.sleep(for: .milliseconds(100))
case .connected:
logger?.debug("Received connected event.")
let stream = try await makeStream()
await withGracefulShutdownHandler {
for await result in stream.cancelOnGracefulShutdown() {
logger?.debug("Received result for topic: \(result.topic)")
await self.handleResult(result)
}
}
await handleResult(result)
}
} onGracefulShutdown: {
Task {
self.logger?.debug("Received graceful shutdown.")
Task {
try await self.shutdown()
}
}
@@ -89,24 +72,13 @@ public actor SensorsService: Service {
@_spi(Internal)
public func shutdown() async throws {
try await publishUpdates()
topicListener.shutdown()
}
private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> {
try await topicListener.listen(to: topics)
// ignore errors, so that we continue to listen, but log them
// for debugging purposes.
.compactMap { result in
switch result {
case let .failure(error):
self.logger?.debug("Received error listening for sensors: \(error)")
return nil
case let .success(info):
return (info.payload, info.topicName)
}
}
// ignore duplicate values, to prevent publishing dew-point and enthalpy
// changes to frequently.
try await mqtt.listen(to: topics)
.map { ($0.payload, $0.topicName) }
.removeDuplicates { lhs, rhs in
lhs.buffer == rhs.buffer
&& lhs.topic == rhs.topic
@@ -121,8 +93,7 @@ public actor SensorsService: Service {
logger?.debug("Begin handling result for topic: \(topic)")
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
var buffer = result.buffer
return V(buffer: &buffer)
return V(buffer: result.buffer)
}
if topic.contains("temperature") {
@@ -153,9 +124,9 @@ public actor SensorsService: Service {
private func publish(_ double: Double?, to topic: String) async throws {
guard let double else { return }
try await topicPublisher.publish(
try await mqtt.publish(
ByteBufferAllocator().buffer(string: "\(double)"),
to: topic,
payload: ByteBufferAllocator().buffer(string: "\(double)"),
qos: .exactlyOnce,
retain: true
)
@@ -204,3 +175,38 @@ private extension Array where Element == TemperatureAndHumiditySensor {
self[index].needsProcessed = false
}
}
/// Represents a type that can be initialized by a ``ByteBuffer``.
protocol BufferInitalizable {
init?(buffer: ByteBuffer)
}
extension Double: BufferInitalizable {
/// Attempt to create / parse a double from a byte buffer.
init?(buffer: ByteBuffer) {
let string = String(buffer: buffer)
self.init(string)
}
}
extension Tagged: BufferInitalizable where RawValue: BufferInitalizable {
init?(buffer: ByteBuffer) {
guard let value = RawValue(buffer: buffer) else { return nil }
self.init(value)
}
}
extension Humidity<Relative>: BufferInitalizable {
init?(buffer: ByteBuffer) {
guard let value = Double(buffer: buffer) else { return nil }
self.init(value)
}
}
extension Temperature<DryAir>: BufferInitalizable {
init?(buffer: ByteBuffer) {
guard let value = Double(buffer: buffer) else { return nil }
self.init(value)
}
}

View File

@@ -1,186 +0,0 @@
import Dependencies
import DependenciesMacros
import Foundation
import MQTTNIO
/// A dependency that can generate an async stream of changes to the given topics.
///
/// - Note: This type only conforms to ``TestDependencyKey`` because it requires an MQTTClient
/// to generate the live dependency.
@DependencyClient
public struct TopicListener: Sendable {
public typealias Stream = AsyncStream<Result<MQTTPublishInfo, any Error>>
/// Create an async stream that listens for changes to the given topics.
private var _listen: @Sendable ([String], MQTTQoS) async throws -> Stream
/// Shutdown the listener stream.
public var shutdown: @Sendable () -> Void
/// Create a new topic listener.
///
/// - Parameters:
/// - listen: Generate an async stream of changes for the given topics.
/// - shutdown: Shutdown the topic listener stream.
public init(
listen: @Sendable @escaping ([String], MQTTQoS) async throws -> Stream,
shutdown: @Sendable @escaping () -> Void
) {
self._listen = listen
self.shutdown = shutdown
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
to topics: [String],
qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
try await _listen(topics, qos)
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
_ topics: String...,
qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
try await listen(to: topics, qos: qos)
}
/// Create the live implementation of the topic listener with the given MQTTClient.
///
/// - Parameters:
/// - client: The MQTTClient to use.
public static func live(client: MQTTClient) -> Self {
let listener = MQTTTopicListener(client: client)
return .init(
listen: { try await listener.listen($0, $1) },
shutdown: { listener.shutdown() }
)
}
}
extension TopicListener: TestDependencyKey {
public static var testValue: TopicListener { Self() }
}
public extension DependencyValues {
var topicListener: TopicListener {
get { self[TopicListener.self] }
set { self[TopicListener.self] = newValue }
}
}
// MARK: - Helpers
private actor MQTTTopicListener {
private let client: MQTTClient
private let continuation: TopicListener.Stream.Continuation
private let name: String
let stream: TopicListener.Stream
private var shuttingDown: Bool = false
init(
client: MQTTClient
) {
let (stream, continuation) = TopicListener.Stream.makeStream()
self.client = client
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
}
deinit {
if !shuttingDown {
let message = """
Shutdown was not called on topic listener. This could lead to potential errors or
the stream never ending.
Please ensure that you call shutdown on the listener.
"""
client.logger.warning("\(message)")
continuation.finish()
}
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
}
func listen(
_ topics: [String],
_ qos: MQTTQoS = .atLeastOnce
) async throws -> TopicListener.Stream {
var sleepTimes = 0
while !client.isActive() {
guard sleepTimes < 10 else {
throw TopicListenerError.connectionTimeout
}
try? await Task.sleep(for: .milliseconds(100))
sleepTimes += 1
}
client.logger.trace("Client is active, begin subscribing to topics.")
let subscription = try? await client.subscribe(to: topics.map {
MQTTSubscribeInfo(topicFilter: $0, qos: qos)
})
guard subscription != nil else {
client.logger.error("Error subscribing to topics: \(topics)")
throw TopicListenerError.failedToSubscribe
}
client.logger.trace("Done subscribing, begin listening to topics.")
client.addPublishListener(named: name) { result in
switch result {
case let .failure(error):
self.client.logger.error("Received error while listening: \(error)")
self.continuation.yield(.failure(MQTTListenResultError(error)))
case let .success(publishInfo):
if topics.contains(publishInfo.topicName) {
self.client.logger.trace("Recieved new value for topic: \(publishInfo.topicName)")
self.continuation.yield(.success(publishInfo))
}
}
}
return stream
}
private func setIsShuttingDown() {
shuttingDown = true
}
nonisolated func shutdown() {
client.logger.trace("Closing topic listener...")
continuation.finish()
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
Task { await self.setIsShuttingDown() }
}
}
// MARK: - Errors
public enum TopicListenerError: Error {
case connectionTimeout
case failedToSubscribe
}
public struct MQTTListenResultError: Error {
let underlyingError: any Error
init(_ underlyingError: any Error) {
self.underlyingError = underlyingError
}
}

View File

@@ -1,117 +0,0 @@
import Dependencies
import DependenciesMacros
import MQTTNIO
import NIO
/// A dependency that is responsible for publishing values to an MQTT broker.
///
/// - Note: This dependency only conforms to `TestDependencyKey` because it
/// requires an active `MQTTClient` to generate the live dependency.
@DependencyClient
public struct TopicPublisher: Sendable {
private var _publish: @Sendable (PublishRequest) async throws -> Void
/// Create a new topic publisher.
///
/// - Parameters:
/// - publish: Handle the publish request.
public init(
publish: @Sendable @escaping (PublishRequest) async throws -> Void
) {
self._publish = publish
}
/// Publish a new value to the given topic.
///
/// - Parameters:
/// - topicName: The topic to publish the new value to.
/// - payload: The value to publish.
/// - qos: The MQTTQoS.
/// - retain: The retain flag.
public func publish(
to topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool = false
) async throws {
try await _publish(.init(
topicName: topicName,
payload: payload,
qos: qos,
retain: retain
))
}
/// Create the live topic publisher with the given `MQTTClient`.
///
/// - Parameters:
/// - client: The mqtt broker client to use.
public static func live(client: MQTTClient) -> Self {
.init(
publish: { request in
guard client.isActive() else {
client.logger.trace("Client is not connected, unable to publish to \(request.topicName)")
return
}
client.logger.trace("Begin publishing to topic: \(request.topicName)")
defer { client.logger.trace("Done publishing to topic: \(request.topicName)") }
try await client.publish(
to: request.topicName,
payload: request.payload,
qos: request.qos,
retain: request.retain
)
}
)
}
/// Represents the parameters required to publish a new value to the
/// MQTT broker.
public struct PublishRequest: Equatable, Sendable {
/// The topic to publish the new value to.
public let topicName: String
/// The value to publish.
public let payload: ByteBuffer
/// The qos of the request.
public let qos: MQTTQoS
/// The retain flag for the request.
public let retain: Bool
/// Create a new publish request.
///
/// - Parameters:
/// - topicName: The topic to publish to.
/// - payload: The value to publish.
/// - qos: The qos of the request.
/// - retain: The retain flag of the request.
public init(
topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool
) {
self.topicName = topicName
self.payload = payload
self.qos = qos
self.retain = retain
}
}
}
extension TopicPublisher: TestDependencyKey {
public static var testValue: TopicPublisher { Self() }
}
public extension DependencyValues {
/// A dependency that is responsible for publishing values to an MQTT broker.
var topicPublisher: TopicPublisher {
get { self[TopicPublisher.self] }
set { self[TopicPublisher.self] = newValue }
}
}

View File

@@ -1,119 +0,0 @@
import Dependencies
import Foundation
import Logging
import Models
import MQTTConnectionManager
import MQTTConnectionService
import MQTTNIO
import NIO
import PsychrometricClientLive
import SensorsService
import ServiceLifecycle
import TopicDependencies
@main
struct Application {
/// The main entry point of the application.
static func main() async throws {
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
var logger = Logger(label: "dewpoint-controller")
logger.logLevel = .trace
logger.info("Starting dewpoint-controller!")
let environment = loadEnvVars(logger: logger)
if environment.appEnv == .production {
logger.debug("Updating logging level to info.")
logger.logLevel = .info
}
let mqtt = MQTTClient(
envVars: environment,
eventLoopGroup: eventloopGroup,
logger: logger
)
do {
try await withDependencies {
$0.psychrometricClient = .liveValue
$0.topicListener = .live(client: mqtt)
$0.topicPublisher = .live(client: mqtt)
$0.mqttConnectionManager = .live(client: mqtt, logger: logger)
} operation: {
let mqttConnection = MQTTConnectionService(logger: logger)
let sensors = SensorsService(sensors: .live, logger: logger)
var serviceGroupConfiguration = ServiceGroupConfiguration(
services: [
mqttConnection,
sensors
],
gracefulShutdownSignals: [.sigterm, .sigint],
logger: logger
)
serviceGroupConfiguration.maximumCancellationDuration = .seconds(5)
serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10)
let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
try await serviceGroup.run()
}
try await mqtt.shutdown()
try await eventloopGroup.shutdownGracefully()
} catch {
try await eventloopGroup.shutdownGracefully()
}
}
}
// MARK: - Helpers
private func loadEnvVars(logger: Logger) -> EnvVars {
let defaultEnvVars = EnvVars()
let encoder = JSONEncoder()
let decoder = JSONDecoder()
let defaultEnvDict = (try? encoder.encode(defaultEnvVars))
.flatMap { try? decoder.decode([String: String].self, from: $0) }
?? [:]
let envVarsDict = defaultEnvDict
.merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 })
let envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict))
.flatMap { try? decoder.decode(EnvVars.self, from: $0) }
?? defaultEnvVars
logger.debug("Done loading EnvVars...")
return envVars
}
private extension MQTTNIO.MQTTClient {
convenience init(envVars: EnvVars, eventLoopGroup: EventLoopGroup, logger: Logger?) {
self.init(
host: envVars.host,
port: envVars.port != nil ? Int(envVars.port!) : nil,
identifier: envVars.identifier,
eventLoopGroupProvider: .shared(eventLoopGroup),
logger: logger,
configuration: .init(
version: .v3_1_1,
disablePing: false,
userName: envVars.userName,
password: envVars.password
)
)
}
}
private extension Array where Element == TemperatureAndHumiditySensor {
static var live: Self {
TemperatureAndHumiditySensor.Location.allCases.map { location in
TemperatureAndHumiditySensor(location: location)
}
}
}

View File

@@ -0,0 +1,215 @@
import Dependencies
// @_spi(Internal) import dewpoint_controller
import Logging
import Models
import MQTTConnectionService
@_spi(Internal) import MQTTManager
import MQTTNIO
import NIO
import PsychrometricClientLive
@_spi(Internal) import SensorsService
import ServiceLifecycle
import ServiceLifecycleTestKit
import XCTest
final class IntegrationTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "IntegrationTests")
logger.logLevel = .info
return logger
}()
override func invokeTest() {
let client = createClient(identifier: "\(Self.self)")
withDependencies {
$0.mqtt = .live(client: client, logger: Self.logger)
$0.psychrometricClient = PsychrometricClient.liveValue
} operation: {
super.invokeTest()
}
}
func testConnectionServiceShutdown() async throws {
@Dependency(\.mqtt) var mqtt
let service = MQTTConnectionService(logger: Self.logger)
let task = Task { try await service.run() }
defer { task.cancel() }
try await Task.sleep(for: .milliseconds(200))
// check the connection is active here.
try await mqtt.withClient { client in
XCTAssert(client.isActive())
}
mqtt.shutdown()
try await Task.sleep(for: .milliseconds(500))
// check the connection is active here.
try await mqtt.withClient { client in
XCTAssertFalse(client.isActive())
}
}
func testMQTTConnectionStream() async throws {
let client = createClient(identifier: "testNonManagedStream")
let manager = MQTTManager.live(
client: client,
logger: Self.logger,
alwaysReconnect: false
)
defer { manager.shutdown() }
let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger)
let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger)
var events1 = [MQTTManager.Event]()
var events2 = [MQTTManager.Event]()
let stream1 = connectionStream1.start()
let stream2 = connectionStream2.start()
_ = try await manager.connect()
Task {
while !client.isActive() {
try await Task.sleep(for: .milliseconds(100))
}
try await Task.sleep(for: .milliseconds(200))
try await client.disconnect()
try await Task.sleep(for: .milliseconds(500))
manager.shutdown()
try await Task.sleep(for: .milliseconds(500))
connectionStream1.stop()
connectionStream2.stop()
}
for await event in stream1.removeDuplicates() {
events1.append(event)
}
for await event in stream2.removeDuplicates() {
events2.append(event)
}
XCTAssertEqual(events1, [.disconnected, .connected, .disconnected, .shuttingDown])
XCTAssertEqual(events2, [.disconnected, .connected, .disconnected, .shuttingDown])
}
func testListeningResumesAfterDisconnectThenReconnect() async throws {
struct TimeoutError: Error {}
let sensor = TemperatureAndHumiditySensor(location: .return)
let results = ResultContainer()
try await withDependencies {
$0.mqtt.publish = results.append
} operation: {
@Dependency(\.mqtt) var manager
let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger)
let task = Task { try await sensorsService.run() }
defer { task.cancel() }
try await manager.connect()
defer { manager.shutdown() }
try await manager.withClient { client in
try await client.disconnect()
try await client.connect()
while !client.isActive() {
try await Task.sleep(for: .milliseconds(100))
}
// Give time to re-subscribe.
try await Task.sleep(for: .milliseconds(200))
try await client.publish(
to: sensor.topics.temperature,
payload: ByteBufferAllocator().buffer(string: "25"),
qos: .atLeastOnce,
retain: false
)
try await client.publish(
to: sensor.topics.humidity,
payload: ByteBufferAllocator().buffer(string: "50"),
qos: .atLeastOnce,
retain: false
)
}
var timeoutCount = 0
while !(await results.count == 2) {
guard timeoutCount < 20 else {
throw TimeoutError()
}
try await Task.sleep(for: .milliseconds(100))
timeoutCount += 1
}
let results = await results.results()
XCTAssertEqual(results.count, 2)
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.dewPoint }))
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.enthalpy }))
try await sensorsService.shutdown()
}
}
func createClient(identifier: String) -> MQTTClient {
let envVars = EnvVars(
appEnv: .testing,
host: Self.hostname,
port: "1883",
identifier: identifier,
userName: nil,
password: nil
)
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
// return .init(envVars: envVars, eventLoopGroup: eventLoopGroup, logger: Self.logger)
let config = MQTTClient.Configuration(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
return .init(
host: Self.hostname,
identifier: identifier,
eventLoopGroupProvider: .shared(eventLoopGroup),
logger: Self.logger,
configuration: config
)
}
}
// - MARK: Helpers
struct TopicNotFoundError: Error {}
actor ResultContainer: Sendable {
private var storage = [MQTTManager.PublishRequest]()
init() {}
@Sendable func append(_ result: MQTTManager.PublishRequest) async {
storage.append(result)
}
var count: Int {
get async { storage.count }
}
func results() async -> [MQTTManager.PublishRequest] {
storage
}
}

View File

@@ -1,92 +0,0 @@
import AsyncAlgorithms
import Logging
import Models
@_spi(Internal) import MQTTConnectionManager
import MQTTConnectionService
import MQTTNIO
import NIO
import ServiceLifecycle
import ServiceLifecycleTestKit
import XCTest
final class MQTTConnectionServiceTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "MQTTConnectionServiceTests")
logger.logLevel = .trace
return logger
}()
// TODO: Move to integration tests.
func testMQTTConnectionStream() async throws {
let client = createClient(identifier: "testNonManagedStream")
let manager = MQTTConnectionManager.live(
client: client,
logger: Self.logger,
alwaysReconnect: false
)
let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger)
let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger)
var events1 = [MQTTConnectionManager.Event]()
var events2 = [MQTTConnectionManager.Event]()
let stream1 = connectionStream1.start()
let stream2 = connectionStream2.start()
_ = try await manager.connect()
Task {
while !client.isActive() {
try await Task.sleep(for: .milliseconds(100))
}
try await Task.sleep(for: .milliseconds(200))
manager.shutdown()
try await client.disconnect()
try await Task.sleep(for: .seconds(1))
try await client.shutdown()
try await Task.sleep(for: .seconds(1))
connectionStream1.stop()
connectionStream2.stop()
}
for await event in stream1.removeDuplicates() {
events1.append(event)
}
for await event in stream2.removeDuplicates() {
events2.append(event)
}
XCTAssertEqual(events1, [.disconnected, .connected, .disconnected, .shuttingDown])
XCTAssertEqual(events2, [.disconnected, .connected, .disconnected, .shuttingDown])
}
func createClient(identifier: String) -> MQTTClient {
let envVars = EnvVars(
appEnv: .testing,
host: Self.hostname,
port: "1883",
identifier: identifier,
userName: nil,
password: nil
)
let config = MQTTClient.Configuration(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
return .init(
host: Self.hostname,
identifier: identifier,
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
logger: Self.logger,
configuration: config
)
}
}

View File

@@ -1,175 +0,0 @@
import Dependencies
import Logging
import Models
@_spi(Internal) import MQTTConnectionManager
import MQTTNIO
import NIO
import PsychrometricClientLive
@_spi(Internal) import SensorsService
import TopicDependencies
import XCTest
final class SensorsClientTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
static let logger: Logger = {
var logger = Logger(label: "SensorsClientTests")
logger.logLevel = .trace
return logger
}()
override func invokeTest() {
let client = createClient(identifier: "\(Self.self)")
withDependencies {
$0.mqttConnectionManager = .live(client: client, logger: Self.logger)
$0.psychrometricClient = PsychrometricClient.liveValue
$0.topicListener = .live(client: client)
$0.topicPublisher = .live(client: client)
} operation: {
super.invokeTest()
}
}
func testListeningResumesAfterDisconnectThenReconnect() async throws {
@Dependency(\.mqttConnectionManager) var manager
struct TimeoutError: Error {}
let sensor = TemperatureAndHumiditySensor(location: .return)
var results = [TopicPublisher.PublishRequest]()
try await withDependencies {
$0.topicPublisher = .capturing { results.append($0) }
} operation: {
let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger)
let task = Task { try await sensorsService.run() }
defer { task.cancel() }
try await manager.connect()
defer { manager.shutdown() }
try await manager.withClient { client in
try await client.disconnect()
try await client.connect()
try await Task.sleep(for: .milliseconds(100))
try await client.publish(
to: sensor.topics.temperature,
payload: ByteBufferAllocator().buffer(string: "25"),
qos: .atLeastOnce,
retain: false
)
try await client.publish(
to: sensor.topics.humidity,
payload: ByteBufferAllocator().buffer(string: "50"),
qos: .atLeastOnce,
retain: false
)
}
var timeoutCount = 0
while !(results.count == 2) {
guard timeoutCount < 20 else {
throw TimeoutError()
}
try await Task.sleep(for: .milliseconds(100))
timeoutCount += 1
}
XCTAssertEqual(results.count, 2)
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.dewPoint }))
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.enthalpy }))
try await sensorsService.shutdown()
}
}
func createClient(identifier: String) -> MQTTClient {
let envVars = EnvVars(
appEnv: .testing,
host: Self.hostname,
port: "1883",
identifier: identifier,
userName: nil,
password: nil
)
let config = MQTTClient.Configuration(
version: .v3_1_1,
userName: envVars.userName,
password: envVars.password,
useSSL: false,
useWebSockets: false,
tlsConfiguration: nil,
webSocketURLPath: nil
)
return .init(
host: Self.hostname,
identifier: identifier,
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
logger: Self.logger,
configuration: config
)
}
}
// MARK: Helpers for tests.
class PublishInfoContainer {
private(set) var info: [MQTTPublishInfo]
private var topicFilters: [String]?
init(topicFilters: [String]? = nil) {
self.info = []
self.topicFilters = topicFilters
}
func addPublishInfo(_ info: MQTTPublishInfo) async {
guard let topicFilters else {
self.info.append(info)
return
}
if topicFilters.contains(info.topicName) {
self.info.append(info)
}
}
}
extension TopicPublisher {
static func capturing(
_ callback: @escaping (PublishRequest) -> Void
) -> Self {
.init { callback($0) }
}
}
// extension SensorsClient {
//
// static func testing(
// yielding: [(value: Double, to: String)],
// capturePublishedValues: @escaping (Double, String) -> Void,
// captureShutdownEvent: @escaping (Bool) -> Void
// ) -> Self {
// let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self)
// let logger = Logger(label: "\(Self.self).testing")
//
// return .init(
// listen: { topics in
// for (value, topic) in yielding where topics.contains(topic) {
// continuation.yield(
// (buffer: ByteBuffer(string: "\(value)"), topic: topic)
// )
// }
// return stream
// },
// logger: logger,
// publish: { value, topic in
// capturePublishedValues(value, topic)
// },
// shutdown: {
// captureShutdownEvent(true)
// continuation.finish()
// }
// )
// }
// }
struct TopicNotFoundError: Error {}

View File

@@ -1,53 +0,0 @@
# run this with docker-compose run test
services:
server:
image: swift-mqtt-dewpoint:latest
restart: unless-stopped
env_file: .env
local:
container_name: local-server
build:
context: .
dockerfile: Dockerfile
depends_on:
- mosquitto
environment:
- MQTT_HOST=mosquitto
test:
build:
context: .
dockerfile: Dockerfile.test
working_dir: /app
networks:
- test
depends_on:
- mosquitto-test
environment:
- MOSQUITTO_SERVER=mosquitto-test
command: /bin/bash -xc "swift package clean && swift test"
mosquitto-test:
image: eclipse-mosquitto
networks:
- test
volumes:
- ./mosquitto/config:/mosquitto/config
- ./mosquitto/certs:/mosquitto/certs
mosquitto:
image: eclipse-mosquitto
volumes:
- ./mosquitto/config:/mosquitto/config
- ./mosquitto/certs:/mosquitto/certs
ports:
- "1883:1883"
- "8883:8883"
- "8080:8080"
- "8081:8081"
networks:
test:
driver: bridge
external: false

15
docker/Dockerfile Executable file
View File

@@ -0,0 +1,15 @@
# Used this to build the release version of the image.
# Build the executable
ARG SWIFT_IMAGE_VERSION="5.10"
FROM swift:${SWIFT_IMAGE_VERSION} AS build
WORKDIR /build
COPY ./Package.* ./
RUN swift package resolve
COPY . .
RUN swift build -c release -Xswiftc -g
# Run image
FROM swift:${SWIFT_IMAGE_VERSION}-slim
COPY --from=build /build/.build/release/dewpoint-controller /usr/local/bin
CMD ["/bin/bash", "-xc", "/usr/local/bin/dewpoint-controller run"]

View File

@@ -0,0 +1,5 @@
# Used to build a local MQTT broker for development and
# testing.
FROM eclipse-mosquitto:latest
COPY ./mosquitto/config/mosquitto.conf /mosquitto/config/mosquitto.conf
EXPOSE 1883

View File

@@ -1,6 +1,8 @@
# Used to build a test image.
FROM swift:5.10
WORKDIR /app
COPY ./Package.* ./
RUN swift package resolve
COPY . .
RUN swift build
CMD ["/bin/bash", "-xc", "swift", "test"]

17
docker/docker-compose-test.yaml Executable file
View File

@@ -0,0 +1,17 @@
# run this with docker-compose run test
services:
test:
build:
context: ..
dockerfile: docker/Dockerfile.test
working_dir: /app
depends_on:
- mosquitto
environment:
- MOSQUITTO_SERVER=mosquitto
command: /bin/bash -xc "swift test"
mosquitto:
build:
context: ..
dockerfile: docker/Dockerfile.mosquitto

18
docker/docker-compose.yaml Executable file
View File

@@ -0,0 +1,18 @@
# run this with docker-compose run dewpoint_controller
services:
dewpoint_controller:
container_name: dewpoint-controller
build:
context: ..
dockerfile: docker/Dockerfile
depends_on:
- mosquitto
environment:
- MQTT_HOST=mosquitto
mosquitto:
build:
context: ..
dockerfile: docker/Dockerfile.mosquitto
ports:
- "1883:1883"