Compare commits
54 Commits
psychromet
...
6472d3cd1e
| Author | SHA1 | Date | |
|---|---|---|---|
|
6472d3cd1e
|
|||
|
3416ce1003
|
|||
|
c84427a9b3
|
|||
|
947472f62d
|
|||
|
d16135dd50
|
|||
|
19e97652fd
|
|||
|
1089452212
|
|||
|
5e998a60d0
|
|||
|
9e2af22a36
|
|||
|
89f3601c2c
|
|||
|
d4b6f6ad2b
|
|||
|
ec3cd40fef
|
|||
|
953c9d5b7c
|
|||
|
00bb6ca1a6
|
|||
|
41fb3c5715
|
|||
|
8e4430804c
|
|||
|
a8f689136d
|
|||
|
2607be6658
|
|||
|
b05e18b258
|
|||
|
394b49d1a0
|
|||
|
6bec0d6fa5
|
|||
|
63d65bd7cd
|
|||
|
320f3e792e
|
|||
|
74b73e7534
|
|||
|
7954fc5dcd
|
|||
|
115c4dc252
|
|||
|
853a157ae7
|
|||
|
30b8ea3661
|
|||
|
d26ab714ab
|
|||
|
b45ad76fff
|
|||
|
c4395b9089
|
|||
|
b3874b96c5
|
|||
|
4024bb624f
|
|||
|
6371ffed47
|
|||
|
76b06e86fa
|
|||
|
fccfa4d006
|
|||
|
5df08d6c91
|
|||
|
1c99e4861d
|
|||
|
a0b7053eae
|
|||
|
df3ed6a407
|
|||
|
1d9d8dc449
|
|||
|
9a53d36f4c
|
|||
|
44a6a878eb
|
|||
|
c13a1a14a3
|
|||
|
6c916215ea
|
|||
|
be7442c06a
|
|||
|
26a30c2a07
|
|||
|
5f131d8fa2
|
|||
|
d6e217f556
|
|||
|
b39ccafc92
|
|||
|
8336c56adf
|
|||
|
fac8945386
|
|||
|
5b319cae9b
|
|||
|
ca7024cb60
|
@@ -10,5 +10,12 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Test
|
||||
run: make test
|
||||
- name: Setup QEMU
|
||||
uses: docker/setup-qemu-action@v3
|
||||
- name: Setup Docker buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
- name: Run Test
|
||||
run: make test-docker
|
||||
- name: Cleanup.
|
||||
if: always()
|
||||
run: docker compose --file docker/docker-compose-test.yaml down
|
||||
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@@ -10,3 +10,5 @@ DerivedData/
|
||||
mqtt_password.txt
|
||||
.env
|
||||
.smbdelete*
|
||||
buildServer.json
|
||||
.nvim/*
|
||||
|
||||
2
.swiftlint.yml
Normal file
2
.swiftlint.yml
Normal file
@@ -0,0 +1,2 @@
|
||||
disabled_rules:
|
||||
- closing_brace
|
||||
29
.swiftpm/dewpoint-controller-Package.xctestplan
Normal file
29
.swiftpm/dewpoint-controller-Package.xctestplan
Normal file
@@ -0,0 +1,29 @@
|
||||
{
|
||||
"configurations" : [
|
||||
{
|
||||
"id" : "AFB1047B-4742-43D2-AFB9-680C1CB2D273",
|
||||
"name" : "Test Scheme Action",
|
||||
"options" : {
|
||||
|
||||
}
|
||||
}
|
||||
],
|
||||
"defaultOptions" : {
|
||||
"targetForVariableExpansion" : {
|
||||
"containerPath" : "container:",
|
||||
"identifier" : "dewpoint-controller",
|
||||
"name" : "dewpoint-controller"
|
||||
}
|
||||
},
|
||||
"testTargets" : [
|
||||
{
|
||||
"parallelizable" : true,
|
||||
"target" : {
|
||||
"containerPath" : "container:",
|
||||
"identifier" : "IntegrationTests",
|
||||
"name" : "IntegrationTests"
|
||||
}
|
||||
}
|
||||
],
|
||||
"version" : 1
|
||||
}
|
||||
@@ -1,67 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Scheme
|
||||
LastUpgradeVersion = "1330"
|
||||
version = "1.3">
|
||||
<BuildAction
|
||||
parallelizeBuildables = "YES"
|
||||
buildImplicitDependencies = "YES">
|
||||
<BuildActionEntries>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
buildForProfiling = "YES"
|
||||
buildForArchiving = "YES"
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "DewPointEnvironment"
|
||||
BuildableName = "DewPointEnvironment"
|
||||
BlueprintName = "DewPointEnvironment"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
</BuildActionEntries>
|
||||
</BuildAction>
|
||||
<TestAction
|
||||
buildConfiguration = "Debug"
|
||||
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
|
||||
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
|
||||
shouldUseLaunchSchemeArgsEnv = "YES">
|
||||
<Testables>
|
||||
</Testables>
|
||||
</TestAction>
|
||||
<LaunchAction
|
||||
buildConfiguration = "Debug"
|
||||
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
|
||||
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
|
||||
launchStyle = "0"
|
||||
useCustomWorkingDirectory = "NO"
|
||||
ignoresPersistentStateOnLaunch = "NO"
|
||||
debugDocumentVersioning = "YES"
|
||||
debugServiceExtension = "internal"
|
||||
allowLocationSimulation = "YES">
|
||||
</LaunchAction>
|
||||
<ProfileAction
|
||||
buildConfiguration = "Release"
|
||||
shouldUseLaunchSchemeArgsEnv = "YES"
|
||||
savedToolIdentifier = ""
|
||||
useCustomWorkingDirectory = "NO"
|
||||
debugDocumentVersioning = "YES">
|
||||
<MacroExpansion>
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "DewPointEnvironment"
|
||||
BuildableName = "DewPointEnvironment"
|
||||
BlueprintName = "DewPointEnvironment"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</MacroExpansion>
|
||||
</ProfileAction>
|
||||
<AnalyzeAction
|
||||
buildConfiguration = "Debug">
|
||||
</AnalyzeAction>
|
||||
<ArchiveAction
|
||||
buildConfiguration = "Release"
|
||||
revealArchiveInOrganizer = "YES">
|
||||
</ArchiveAction>
|
||||
</Scheme>
|
||||
@@ -1,67 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Scheme
|
||||
LastUpgradeVersion = "1330"
|
||||
version = "1.3">
|
||||
<BuildAction
|
||||
parallelizeBuildables = "YES"
|
||||
buildImplicitDependencies = "YES">
|
||||
<BuildActionEntries>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
buildForProfiling = "YES"
|
||||
buildForArchiving = "YES"
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "EnvVars"
|
||||
BuildableName = "EnvVars"
|
||||
BlueprintName = "EnvVars"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
</BuildActionEntries>
|
||||
</BuildAction>
|
||||
<TestAction
|
||||
buildConfiguration = "Debug"
|
||||
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
|
||||
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
|
||||
shouldUseLaunchSchemeArgsEnv = "YES">
|
||||
<Testables>
|
||||
</Testables>
|
||||
</TestAction>
|
||||
<LaunchAction
|
||||
buildConfiguration = "Debug"
|
||||
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
|
||||
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
|
||||
launchStyle = "0"
|
||||
useCustomWorkingDirectory = "NO"
|
||||
ignoresPersistentStateOnLaunch = "NO"
|
||||
debugDocumentVersioning = "YES"
|
||||
debugServiceExtension = "internal"
|
||||
allowLocationSimulation = "YES">
|
||||
</LaunchAction>
|
||||
<ProfileAction
|
||||
buildConfiguration = "Release"
|
||||
shouldUseLaunchSchemeArgsEnv = "YES"
|
||||
savedToolIdentifier = ""
|
||||
useCustomWorkingDirectory = "NO"
|
||||
debugDocumentVersioning = "YES">
|
||||
<MacroExpansion>
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "EnvVars"
|
||||
BuildableName = "EnvVars"
|
||||
BlueprintName = "EnvVars"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</MacroExpansion>
|
||||
</ProfileAction>
|
||||
<AnalyzeAction
|
||||
buildConfiguration = "Debug">
|
||||
</AnalyzeAction>
|
||||
<ArchiveAction
|
||||
buildConfiguration = "Release"
|
||||
revealArchiveInOrganizer = "YES">
|
||||
</ArchiveAction>
|
||||
</Scheme>
|
||||
24
.swiftpm/xcode/xcshareddata/xcschemes/ClientLive.xcscheme → .swiftpm/xcode/xcshareddata/xcschemes/MQTTConnectionService.xcscheme
Executable file → Normal file
24
.swiftpm/xcode/xcshareddata/xcschemes/ClientLive.xcscheme → .swiftpm/xcode/xcshareddata/xcschemes/MQTTConnectionService.xcscheme
Executable file → Normal file
@@ -1,10 +1,11 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Scheme
|
||||
LastUpgradeVersion = "1330"
|
||||
version = "1.3">
|
||||
LastUpgradeVersion = "1610"
|
||||
version = "1.7">
|
||||
<BuildAction
|
||||
parallelizeBuildables = "YES"
|
||||
buildImplicitDependencies = "YES">
|
||||
buildImplicitDependencies = "YES"
|
||||
buildArchitectures = "Automatic">
|
||||
<BuildActionEntries>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
@@ -14,9 +15,9 @@
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "ClientLive"
|
||||
BuildableName = "ClientLive"
|
||||
BlueprintName = "ClientLive"
|
||||
BlueprintIdentifier = "MQTTConnectionService"
|
||||
BuildableName = "MQTTConnectionService"
|
||||
BlueprintName = "MQTTConnectionService"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
@@ -26,9 +27,8 @@
|
||||
buildConfiguration = "Debug"
|
||||
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
|
||||
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
|
||||
shouldUseLaunchSchemeArgsEnv = "YES">
|
||||
<Testables>
|
||||
</Testables>
|
||||
shouldUseLaunchSchemeArgsEnv = "YES"
|
||||
shouldAutocreateTestPlan = "YES">
|
||||
</TestAction>
|
||||
<LaunchAction
|
||||
buildConfiguration = "Debug"
|
||||
@@ -50,9 +50,9 @@
|
||||
<MacroExpansion>
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "ClientLive"
|
||||
BuildableName = "ClientLive"
|
||||
BlueprintName = "ClientLive"
|
||||
BlueprintIdentifier = "MQTTConnectionService"
|
||||
BuildableName = "MQTTConnectionService"
|
||||
BlueprintName = "MQTTConnectionService"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</MacroExpansion>
|
||||
24
.swiftpm/xcode/xcshareddata/xcschemes/Bootstrap.xcscheme → .swiftpm/xcode/xcshareddata/xcschemes/MQTTManager.xcscheme
Executable file → Normal file
24
.swiftpm/xcode/xcshareddata/xcschemes/Bootstrap.xcscheme → .swiftpm/xcode/xcshareddata/xcschemes/MQTTManager.xcscheme
Executable file → Normal file
@@ -1,10 +1,11 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Scheme
|
||||
LastUpgradeVersion = "1330"
|
||||
version = "1.3">
|
||||
LastUpgradeVersion = "1610"
|
||||
version = "1.7">
|
||||
<BuildAction
|
||||
parallelizeBuildables = "YES"
|
||||
buildImplicitDependencies = "YES">
|
||||
buildImplicitDependencies = "YES"
|
||||
buildArchitectures = "Automatic">
|
||||
<BuildActionEntries>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
@@ -14,9 +15,9 @@
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "Bootstrap"
|
||||
BuildableName = "Bootstrap"
|
||||
BlueprintName = "Bootstrap"
|
||||
BlueprintIdentifier = "MQTTManager"
|
||||
BuildableName = "MQTTManager"
|
||||
BlueprintName = "MQTTManager"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
@@ -26,9 +27,8 @@
|
||||
buildConfiguration = "Debug"
|
||||
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
|
||||
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
|
||||
shouldUseLaunchSchemeArgsEnv = "YES">
|
||||
<Testables>
|
||||
</Testables>
|
||||
shouldUseLaunchSchemeArgsEnv = "YES"
|
||||
shouldAutocreateTestPlan = "YES">
|
||||
</TestAction>
|
||||
<LaunchAction
|
||||
buildConfiguration = "Debug"
|
||||
@@ -50,9 +50,9 @@
|
||||
<MacroExpansion>
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "Bootstrap"
|
||||
BuildableName = "Bootstrap"
|
||||
BlueprintName = "Bootstrap"
|
||||
BlueprintIdentifier = "MQTTManager"
|
||||
BuildableName = "MQTTManager"
|
||||
BlueprintName = "MQTTManager"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</MacroExpansion>
|
||||
24
.swiftpm/xcode/xcshareddata/xcschemes/Client.xcscheme → .swiftpm/xcode/xcshareddata/xcschemes/SensorsService.xcscheme
Executable file → Normal file
24
.swiftpm/xcode/xcshareddata/xcschemes/Client.xcscheme → .swiftpm/xcode/xcshareddata/xcschemes/SensorsService.xcscheme
Executable file → Normal file
@@ -1,10 +1,11 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Scheme
|
||||
LastUpgradeVersion = "1330"
|
||||
version = "1.3">
|
||||
LastUpgradeVersion = "1610"
|
||||
version = "1.7">
|
||||
<BuildAction
|
||||
parallelizeBuildables = "YES"
|
||||
buildImplicitDependencies = "YES">
|
||||
buildImplicitDependencies = "YES"
|
||||
buildArchitectures = "Automatic">
|
||||
<BuildActionEntries>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
@@ -14,9 +15,9 @@
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "Client"
|
||||
BuildableName = "Client"
|
||||
BlueprintName = "Client"
|
||||
BlueprintIdentifier = "SensorsService"
|
||||
BuildableName = "SensorsService"
|
||||
BlueprintName = "SensorsService"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
@@ -26,9 +27,8 @@
|
||||
buildConfiguration = "Debug"
|
||||
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
|
||||
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
|
||||
shouldUseLaunchSchemeArgsEnv = "YES">
|
||||
<Testables>
|
||||
</Testables>
|
||||
shouldUseLaunchSchemeArgsEnv = "YES"
|
||||
shouldAutocreateTestPlan = "YES">
|
||||
</TestAction>
|
||||
<LaunchAction
|
||||
buildConfiguration = "Debug"
|
||||
@@ -50,9 +50,9 @@
|
||||
<MacroExpansion>
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "Client"
|
||||
BuildableName = "Client"
|
||||
BlueprintName = "Client"
|
||||
BlueprintIdentifier = "SensorsService"
|
||||
BuildableName = "SensorsService"
|
||||
BlueprintName = "SensorsService"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</MacroExpansion>
|
||||
213
.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller-Package.xcscheme
Executable file → Normal file
213
.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller-Package.xcscheme
Executable file → Normal file
@@ -1,10 +1,11 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Scheme
|
||||
LastUpgradeVersion = "1330"
|
||||
version = "1.3">
|
||||
LastUpgradeVersion = "1610"
|
||||
version = "1.7">
|
||||
<BuildAction
|
||||
parallelizeBuildables = "YES"
|
||||
buildImplicitDependencies = "YES">
|
||||
buildImplicitDependencies = "YES"
|
||||
buildArchitectures = "Automatic">
|
||||
<BuildActionEntries>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
@@ -14,9 +15,9 @@
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "Bootstrap"
|
||||
BuildableName = "Bootstrap"
|
||||
BlueprintName = "Bootstrap"
|
||||
BlueprintIdentifier = "MQTTConnectionService"
|
||||
BuildableName = "MQTTConnectionService"
|
||||
BlueprintName = "MQTTConnectionService"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
@@ -28,51 +29,9 @@
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "Client"
|
||||
BuildableName = "Client"
|
||||
BlueprintName = "Client"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
buildForProfiling = "YES"
|
||||
buildForArchiving = "YES"
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "ClientLive"
|
||||
BuildableName = "ClientLive"
|
||||
BlueprintName = "ClientLive"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
buildForProfiling = "YES"
|
||||
buildForArchiving = "YES"
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "DewPointEnvironment"
|
||||
BuildableName = "DewPointEnvironment"
|
||||
BlueprintName = "DewPointEnvironment"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
buildForProfiling = "YES"
|
||||
buildForArchiving = "YES"
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "EnvVars"
|
||||
BuildableName = "EnvVars"
|
||||
BlueprintName = "EnvVars"
|
||||
BlueprintIdentifier = "MQTTManager"
|
||||
BuildableName = "MQTTManager"
|
||||
BlueprintName = "MQTTManager"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
@@ -90,62 +49,6 @@
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
buildForProfiling = "YES"
|
||||
buildForArchiving = "YES"
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "dewPoint-controller"
|
||||
BuildableName = "dewPoint-controller"
|
||||
BlueprintName = "dewPoint-controller"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
buildForProfiling = "NO"
|
||||
buildForArchiving = "NO"
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "ClientTests"
|
||||
BuildableName = "ClientTests"
|
||||
BlueprintName = "ClientTests"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
buildForProfiling = "NO"
|
||||
buildForArchiving = "NO"
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "dewPoint-controllerTests"
|
||||
BuildableName = "dewPoint-controllerTests"
|
||||
BlueprintName = "dewPoint-controllerTests"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
buildForProfiling = "YES"
|
||||
buildForArchiving = "YES"
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "TopicsLive"
|
||||
BuildableName = "TopicsLive"
|
||||
BlueprintName = "TopicsLive"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
@@ -160,6 +63,48 @@
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
buildForProfiling = "YES"
|
||||
buildForArchiving = "YES"
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "dewpoint-controller"
|
||||
BuildableName = "dewpoint-controller"
|
||||
BlueprintName = "dewpoint-controller"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
buildForProfiling = "YES"
|
||||
buildForArchiving = "YES"
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "IntegrationTests"
|
||||
BuildableName = "IntegrationTests"
|
||||
BlueprintName = "IntegrationTests"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
buildForProfiling = "YES"
|
||||
buildForArchiving = "YES"
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "CliClient"
|
||||
BuildableName = "CliClient"
|
||||
BlueprintName = "CliClient"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
</BuildActionEntries>
|
||||
</BuildAction>
|
||||
<TestAction
|
||||
@@ -167,44 +112,20 @@
|
||||
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
|
||||
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
|
||||
shouldUseLaunchSchemeArgsEnv = "YES">
|
||||
<TestPlans>
|
||||
<TestPlanReference
|
||||
reference = "container:.swiftpm/dewpoint-controller-Package.xctestplan"
|
||||
default = "YES">
|
||||
</TestPlanReference>
|
||||
</TestPlans>
|
||||
<Testables>
|
||||
<TestableReference
|
||||
skipped = "NO">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "ClientTests"
|
||||
BuildableName = "ClientTests"
|
||||
BlueprintName = "ClientTests"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</TestableReference>
|
||||
<TestableReference
|
||||
skipped = "NO">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "dewPoint-controllerTests"
|
||||
BuildableName = "dewPoint-controllerTests"
|
||||
BlueprintName = "dewPoint-controllerTests"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</TestableReference>
|
||||
<TestableReference
|
||||
skipped = "NO">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "MQTTConnectionServiceTests"
|
||||
BuildableName = "MQTTConnectionServiceTests"
|
||||
BlueprintName = "MQTTConnectionServiceTests"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</TestableReference>
|
||||
<TestableReference
|
||||
skipped = "NO">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "SensorsServiceTests"
|
||||
BuildableName = "SensorsServiceTests"
|
||||
BlueprintName = "SensorsServiceTests"
|
||||
BlueprintIdentifier = "IntegrationTests"
|
||||
BuildableName = "IntegrationTests"
|
||||
BlueprintName = "IntegrationTests"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</TestableReference>
|
||||
@@ -223,9 +144,9 @@
|
||||
<MacroExpansion>
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "dewPoint-controller"
|
||||
BuildableName = "dewPoint-controller"
|
||||
BlueprintName = "dewPoint-controller"
|
||||
BlueprintIdentifier = "dewpoint-controller"
|
||||
BuildableName = "dewpoint-controller"
|
||||
BlueprintName = "dewpoint-controller"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</MacroExpansion>
|
||||
@@ -239,9 +160,9 @@
|
||||
<MacroExpansion>
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "dewPoint-controller"
|
||||
BuildableName = "dewPoint-controller"
|
||||
BlueprintName = "dewPoint-controller"
|
||||
BlueprintIdentifier = "dewpoint-controller"
|
||||
BuildableName = "dewpoint-controller"
|
||||
BlueprintName = "dewpoint-controller"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</MacroExpansion>
|
||||
|
||||
78
.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller.xcscheme
Executable file → Normal file
78
.swiftpm/xcode/xcshareddata/xcschemes/dewPoint-controller.xcscheme
Executable file → Normal file
@@ -1,10 +1,11 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Scheme
|
||||
LastUpgradeVersion = "1330"
|
||||
version = "1.3">
|
||||
LastUpgradeVersion = "1610"
|
||||
version = "1.7">
|
||||
<BuildAction
|
||||
parallelizeBuildables = "YES"
|
||||
buildImplicitDependencies = "YES">
|
||||
buildImplicitDependencies = "YES"
|
||||
buildArchitectures = "Automatic">
|
||||
<BuildActionEntries>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
@@ -14,37 +15,9 @@
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "dewPoint-controller"
|
||||
BuildableName = "dewPoint-controller"
|
||||
BlueprintName = "dewPoint-controller"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
buildForProfiling = "NO"
|
||||
buildForArchiving = "NO"
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "ClientTests"
|
||||
BuildableName = "ClientTests"
|
||||
BlueprintName = "ClientTests"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
<BuildActionEntry
|
||||
buildForTesting = "YES"
|
||||
buildForRunning = "YES"
|
||||
buildForProfiling = "NO"
|
||||
buildForArchiving = "NO"
|
||||
buildForAnalyzing = "YES">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "dewPoint-controllerTests"
|
||||
BuildableName = "dewPoint-controllerTests"
|
||||
BlueprintName = "dewPoint-controllerTests"
|
||||
BlueprintIdentifier = "dewpoint-controller"
|
||||
BuildableName = "dewpoint-controller"
|
||||
BlueprintName = "dewpoint-controller"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildActionEntry>
|
||||
@@ -54,29 +27,8 @@
|
||||
buildConfiguration = "Debug"
|
||||
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
|
||||
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
|
||||
shouldUseLaunchSchemeArgsEnv = "YES">
|
||||
<Testables>
|
||||
<TestableReference
|
||||
skipped = "NO">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "ClientTests"
|
||||
BuildableName = "ClientTests"
|
||||
BlueprintName = "ClientTests"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</TestableReference>
|
||||
<TestableReference
|
||||
skipped = "NO">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "dewPoint-controllerTests"
|
||||
BuildableName = "dewPoint-controllerTests"
|
||||
BlueprintName = "dewPoint-controllerTests"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</TestableReference>
|
||||
</Testables>
|
||||
shouldUseLaunchSchemeArgsEnv = "YES"
|
||||
shouldAutocreateTestPlan = "YES">
|
||||
</TestAction>
|
||||
<LaunchAction
|
||||
buildConfiguration = "Debug"
|
||||
@@ -92,9 +44,9 @@
|
||||
runnableDebuggingMode = "0">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "dewPoint-controller"
|
||||
BuildableName = "dewPoint-controller"
|
||||
BlueprintName = "dewPoint-controller"
|
||||
BlueprintIdentifier = "dewpoint-controller"
|
||||
BuildableName = "dewpoint-controller"
|
||||
BlueprintName = "dewpoint-controller"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildableProductRunnable>
|
||||
@@ -109,9 +61,9 @@
|
||||
runnableDebuggingMode = "0">
|
||||
<BuildableReference
|
||||
BuildableIdentifier = "primary"
|
||||
BlueprintIdentifier = "dewPoint-controller"
|
||||
BuildableName = "dewPoint-controller"
|
||||
BlueprintName = "dewPoint-controller"
|
||||
BlueprintIdentifier = "dewpoint-controller"
|
||||
BuildableName = "dewpoint-controller"
|
||||
BlueprintName = "dewpoint-controller"
|
||||
ReferencedContainer = "container:">
|
||||
</BuildableReference>
|
||||
</BuildableProductRunnable>
|
||||
|
||||
14
Dockerfile
14
Dockerfile
@@ -1,14 +0,0 @@
|
||||
|
||||
# Build the executable
|
||||
FROM swift:5.10 AS build
|
||||
WORKDIR /build
|
||||
COPY ./Package.* ./
|
||||
RUN swift package resolve
|
||||
COPY . .
|
||||
RUN swift build -c release -Xswiftc -g
|
||||
|
||||
# Run image
|
||||
FROM swift:5.10-slim
|
||||
WORKDIR /run
|
||||
COPY --from=build /build/.build/release/dewpoint-controller /run
|
||||
CMD ["/bin/bash", "-xc", "./dewpoint-controller"]
|
||||
43
Makefile
43
Makefile
@@ -1,30 +1,39 @@
|
||||
DOCKER_IMAGE_NAME?="swift-mqtt-dewpoint"
|
||||
DOCKER_TAG_NAME?="latest"
|
||||
|
||||
bootstrap-env:
|
||||
.PHONY: bootstrap
|
||||
bootstrap:
|
||||
@cp Bootstrap/dewPoint-env-example .dewPoint-env
|
||||
|
||||
bootstrap-topics:
|
||||
@cp Bootstrap/topics-example .topics
|
||||
|
||||
bootstrap: bootstrap-env bootstrap-topics
|
||||
|
||||
.PHONY: build
|
||||
build:
|
||||
@swift build -Xswiftc -strict-concurrency=complete
|
||||
|
||||
.PHONY: build-docker
|
||||
build-docker:
|
||||
@docker build \
|
||||
--file docker/Dockerfile \
|
||||
--tag "${DOCKER_IMAGE_NAME}:${DOCKER_TAG_NAME}" .
|
||||
|
||||
.PHONY: clean
|
||||
clean:
|
||||
rm -rf .build
|
||||
|
||||
.PHONY: run
|
||||
run:
|
||||
@swift run dewPoint-controller
|
||||
|
||||
start-mosquitto:
|
||||
@docker-compose start mosquitto
|
||||
|
||||
stop-mosquitto:
|
||||
@docker-compose rm -f mosquitto || true
|
||||
@swift run dewpoint-controller
|
||||
|
||||
.PHONY: test-docker
|
||||
test-docker:
|
||||
@docker-compose run --build --remove-orphans -i --rm test
|
||||
@docker-compose kill mosquitto-test
|
||||
@docker-compose rm -f
|
||||
@docker compose --file docker/docker-compose-test.yaml \
|
||||
run --build --remove-orphans -i --rm test
|
||||
@docker compose --file docker/docker-compose-test.yaml down
|
||||
|
||||
test: test-docker
|
||||
.PHONY: start-mosquitto
|
||||
start-mosquitto:
|
||||
@docker compose --file docker/docker-compose.yaml \
|
||||
up -d mosquitto
|
||||
|
||||
.PHONY: test-swift
|
||||
test-swift: start-mosquitto
|
||||
@swift test --enable-code-coverage
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"originHash" : "c2538e3229d6c80f3d6a979c2f3605d63b4973e1e49786819017473eb2916f4e",
|
||||
"originHash" : "486be5d69e4f0ba7b9f42046df31a727c7e394e4ecfae5671e1b194bed7c9e9b",
|
||||
"pins" : [
|
||||
{
|
||||
"identity" : "combine-schedulers",
|
||||
@@ -10,6 +10,15 @@
|
||||
"version" : "1.0.2"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "dotenv",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/swiftpackages/DotEnv.git",
|
||||
"state" : {
|
||||
"revision" : "1f15bb9de727d694af1d003a1a5d7a553752850f",
|
||||
"version" : "3.0.0"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "mqtt-nio",
|
||||
"kind" : "remoteSourceControl",
|
||||
@@ -19,6 +28,15 @@
|
||||
"version" : "2.11.0"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-argument-parser",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-argument-parser.git",
|
||||
"state" : {
|
||||
"revision" : "41982a3656a71c768319979febd796c6fd111d5c",
|
||||
"version" : "1.5.0"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-async-algorithms",
|
||||
"kind" : "remoteSourceControl",
|
||||
@@ -60,8 +78,17 @@
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/pointfreeco/swift-concurrency-extras",
|
||||
"state" : {
|
||||
"revision" : "6054df64b55186f08b6d0fd87152081b8ad8d613",
|
||||
"version" : "1.2.0"
|
||||
"revision" : "163409ef7dae9d960b87f34b51587b6609a76c1f",
|
||||
"version" : "1.3.0"
|
||||
}
|
||||
},
|
||||
{
|
||||
"identity" : "swift-custom-dump",
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/pointfreeco/swift-custom-dump",
|
||||
"state" : {
|
||||
"revision" : "82645ec760917961cfa08c9c0c7104a57a0fa4b1",
|
||||
"version" : "1.3.3"
|
||||
}
|
||||
},
|
||||
{
|
||||
|
||||
@@ -13,33 +13,44 @@ let package = Package(
|
||||
.macOS(.v14)
|
||||
],
|
||||
products: [
|
||||
.executable(name: "dewpoint-controller", targets: ["dewpoint-controller"]),
|
||||
.executable(name: "dewpoint-controller", targets: ["DewPointController"]),
|
||||
.library(name: "CliClient", targets: ["CliClient"]),
|
||||
.library(name: "Models", targets: ["Models"]),
|
||||
.library(name: "MQTTConnectionManager", targets: ["MQTTConnectionManager"]),
|
||||
.library(name: "MQTTManager", targets: ["MQTTManager"]),
|
||||
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
|
||||
.library(name: "SensorsService", targets: ["SensorsService"]),
|
||||
.library(name: "TopicDependencies", targets: ["TopicDependencies"])
|
||||
.library(name: "SensorsService", targets: ["SensorsService"])
|
||||
],
|
||||
dependencies: [
|
||||
.package(url: "https://github.com/apple/swift-argument-parser.git", from: "1.3.0"),
|
||||
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
|
||||
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
|
||||
.package(url: "https://github.com/apple/swift-log", from: "1.6.0"),
|
||||
.package(url: "https://github.com/swiftpackages/DotEnv.git", from: "3.0.0"),
|
||||
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.5.2"),
|
||||
.package(url: "https://github.com/pointfreeco/swift-custom-dump", from: "1.0.0"),
|
||||
.package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", exact: "0.2.3"),
|
||||
.package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"),
|
||||
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.3.0")
|
||||
],
|
||||
targets: [
|
||||
.executableTarget(
|
||||
name: "dewpoint-controller",
|
||||
.target(
|
||||
name: "CliClient",
|
||||
dependencies: [
|
||||
"Models",
|
||||
"MQTTConnectionManager",
|
||||
.product(name: "Dependencies", package: "swift-dependencies"),
|
||||
.product(name: "DependenciesMacros", package: "swift-dependencies"),
|
||||
.product(name: "DotEnv", package: "DotEnv"),
|
||||
.product(name: "MQTTNIO", package: "mqtt-nio")
|
||||
]
|
||||
),
|
||||
.executableTarget(
|
||||
name: "DewPointController",
|
||||
dependencies: [
|
||||
"CliClient",
|
||||
"MQTTConnectionService",
|
||||
"SensorsService",
|
||||
"TopicDependencies",
|
||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
||||
.product(name: "NIO", package: "swift-nio"),
|
||||
.product(name: "ArgumentParser", package: "swift-argument-parser"),
|
||||
.product(name: "CustomDump", package: "swift-custom-dump"),
|
||||
// .product(name: "DotEnv", package: "DotEnv"),
|
||||
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
|
||||
]
|
||||
),
|
||||
@@ -52,7 +63,7 @@ let package = Package(
|
||||
swiftSettings: swiftSettings
|
||||
),
|
||||
.target(
|
||||
name: "MQTTConnectionManager",
|
||||
name: "MQTTManager",
|
||||
dependencies: [
|
||||
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
|
||||
.product(name: "Dependencies", package: "swift-dependencies"),
|
||||
@@ -65,16 +76,19 @@ let package = Package(
|
||||
name: "MQTTConnectionService",
|
||||
dependencies: [
|
||||
"Models",
|
||||
"MQTTConnectionManager",
|
||||
"MQTTManager",
|
||||
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
|
||||
],
|
||||
swiftSettings: swiftSettings
|
||||
),
|
||||
.testTarget(
|
||||
name: "MQTTConnectionServiceTests",
|
||||
name: "IntegrationTests",
|
||||
dependencies: [
|
||||
"DewPointController",
|
||||
"MQTTConnectionService",
|
||||
"MQTTConnectionManager",
|
||||
"MQTTManager",
|
||||
"SensorsService",
|
||||
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics"),
|
||||
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
|
||||
]
|
||||
),
|
||||
@@ -82,30 +96,14 @@ let package = Package(
|
||||
name: "SensorsService",
|
||||
dependencies: [
|
||||
"Models",
|
||||
"MQTTConnectionManager",
|
||||
"TopicDependencies",
|
||||
"MQTTManager",
|
||||
.product(name: "Dependencies", package: "swift-dependencies"),
|
||||
.product(name: "DependenciesMacros", package: "swift-dependencies"),
|
||||
.product(name: "MQTTNIO", package: "mqtt-nio"),
|
||||
.product(name: "PsychrometricClient", package: "swift-psychrometrics"),
|
||||
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
|
||||
],
|
||||
swiftSettings: swiftSettings
|
||||
),
|
||||
.testTarget(
|
||||
name: "SensorsServiceTests",
|
||||
dependencies: [
|
||||
"SensorsService",
|
||||
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
|
||||
]
|
||||
),
|
||||
.target(
|
||||
name: "TopicDependencies",
|
||||
dependencies: [
|
||||
.product(name: "Dependencies", package: "swift-dependencies"),
|
||||
.product(name: "DependenciesMacros", package: "swift-dependencies"),
|
||||
.product(name: "MQTTNIO", package: "mqtt-nio")
|
||||
],
|
||||
swiftSettings: swiftSettings
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
@@ -1,3 +1,7 @@
|
||||
# dewPoint-controller
|
||||
# dewpoint-controller
|
||||
|
||||
A description of this package.
|
||||

|
||||
|
||||
Listens to an MQTT broker for temperature and humidity sensors and calculates
|
||||
the dew-point temperature and enthalpy for the sensor, then publishes those back
|
||||
to the MQTT broker.
|
||||
|
||||
188
Sources/CliClient/CliClient.swift
Normal file
188
Sources/CliClient/CliClient.swift
Normal file
@@ -0,0 +1,188 @@
|
||||
import Dependencies
|
||||
import DependenciesMacros
|
||||
import DotEnv
|
||||
import Foundation
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
|
||||
public extension DependencyValues {
|
||||
var cliClient: CliClient {
|
||||
get { self[CliClient.self] }
|
||||
set { self[CliClient.self] = newValue }
|
||||
}
|
||||
}
|
||||
|
||||
@DependencyClient
|
||||
public struct CliClient {
|
||||
|
||||
public var logLevel: @Sendable (EnvVars) -> Logger.Level = { _ in .debug }
|
||||
public var makeEnvVars: @Sendable (EnvVarsRequest) async throws -> EnvVars
|
||||
public var makeClient: @Sendable (ClientRequest) throws -> MQTTClient
|
||||
public var parseMqttClientVersion: @Sendable (String) -> MQTTClient.Version?
|
||||
|
||||
public struct ClientRequest: Sendable {
|
||||
public let envVars: EnvVars
|
||||
public let eventLoopGroup: MultiThreadedEventLoopGroup
|
||||
public let logger: Logger?
|
||||
|
||||
public init(
|
||||
envVars: EnvVars,
|
||||
eventLoopGroup: MultiThreadedEventLoopGroup,
|
||||
logger: Logger?
|
||||
) {
|
||||
self.envVars = envVars
|
||||
self.eventLoopGroup = eventLoopGroup
|
||||
self.logger = logger
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public struct EnvVarsRequest: Sendable {
|
||||
|
||||
public let envFilePath: String?
|
||||
public let logger: Logger?
|
||||
public let mqttClientVersion: String?
|
||||
|
||||
public init(
|
||||
envFilePath: String? = nil,
|
||||
logger: Logger? = nil,
|
||||
version mqttClientVersion: String? = nil
|
||||
) {
|
||||
self.envFilePath = envFilePath
|
||||
self.logger = logger
|
||||
self.mqttClientVersion = mqttClientVersion
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension CliClient: DependencyKey {
|
||||
public static let testValue: CliClient = Self()
|
||||
public static var liveValue: CliClient {
|
||||
Self(
|
||||
logLevel: { Logger.Level.from(environment: $0) },
|
||||
makeEnvVars: {
|
||||
try EnvVars.load(
|
||||
dotEnvFile: $0.envFilePath,
|
||||
logger: $0.logger,
|
||||
version: $0.mqttClientVersion
|
||||
)
|
||||
},
|
||||
makeClient: {
|
||||
MQTTClient(
|
||||
envVars: $0.envVars,
|
||||
eventLoopGroup: $0.eventLoopGroup,
|
||||
logger: $0.logger
|
||||
)
|
||||
},
|
||||
parseMqttClientVersion: { .init(string: $0) }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
extension EnvVars {
|
||||
|
||||
/// Load the `EnvVars` from the environment.
|
||||
///
|
||||
/// - Paramaters:
|
||||
/// - logger: An optional logger to use for debugging.
|
||||
/// - version: A version that is specified from command line, ignoring any environment variable.
|
||||
static func load(
|
||||
dotEnvFile: String?,
|
||||
logger: Logger?,
|
||||
version: String?
|
||||
) throws -> EnvVars {
|
||||
let defaultEnvVars = EnvVars()
|
||||
let encoder = JSONEncoder()
|
||||
let decoder = JSONDecoder()
|
||||
|
||||
if let dotEnvFile {
|
||||
try DotEnv.load(path: dotEnvFile)
|
||||
}
|
||||
|
||||
let defaultEnvDict = (try? encoder.encode(defaultEnvVars))
|
||||
.flatMap { try? decoder.decode([String: String].self, from: $0) }
|
||||
?? [:]
|
||||
|
||||
let envVarsDict = defaultEnvDict
|
||||
.merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 })
|
||||
|
||||
var envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict))
|
||||
.flatMap { try? decoder.decode(EnvVars.self, from: $0) }
|
||||
?? defaultEnvVars
|
||||
|
||||
if let version {
|
||||
envVars.version = version
|
||||
}
|
||||
|
||||
logger?.debug("Done loading EnvVars...")
|
||||
|
||||
return envVars
|
||||
}
|
||||
}
|
||||
|
||||
@_spi(Internal)
|
||||
public extension MQTTClient {
|
||||
convenience init(
|
||||
envVars: EnvVars,
|
||||
eventLoopGroup: EventLoopGroup,
|
||||
logger: Logger?
|
||||
) {
|
||||
self.init(
|
||||
host: envVars.host,
|
||||
port: envVars.port != nil ? Int(envVars.port!) : nil,
|
||||
identifier: envVars.identifier,
|
||||
eventLoopGroupProvider: .shared(eventLoopGroup),
|
||||
logger: logger,
|
||||
configuration: .init(
|
||||
version: .parseOrDefualt(string: envVars.version),
|
||||
disablePing: false,
|
||||
userName: envVars.userName,
|
||||
password: envVars.password
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
extension MQTTClient.Version {
|
||||
static let `default` = Self.v3_1_1
|
||||
|
||||
static func parseOrDefualt(string: String?) -> Self {
|
||||
guard let string, let value = Self(string: string) else {
|
||||
return .default
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
init?(string: String) {
|
||||
if string.contains("5") {
|
||||
self = .v5_0
|
||||
} else if string.contains("3") {
|
||||
self = .v3_1_1
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@_spi(Internal)
|
||||
public extension Logger.Level {
|
||||
|
||||
/// Parse a `Logger.Level` from the loaded `EnvVars`.
|
||||
static func from(environment envVars: EnvVars) -> Self {
|
||||
// If the log level was set via an environment variable.
|
||||
if let logLevel = envVars.logLevel {
|
||||
return logLevel
|
||||
}
|
||||
// Parse the appEnv to derive an log level.
|
||||
switch envVars.appEnv {
|
||||
case .staging, .development:
|
||||
return .debug
|
||||
case .production:
|
||||
return .info
|
||||
case .testing:
|
||||
return .trace
|
||||
}
|
||||
}
|
||||
}
|
||||
21
Sources/DewPointController/Application.swift
Normal file
21
Sources/DewPointController/Application.swift
Normal file
@@ -0,0 +1,21 @@
|
||||
import ArgumentParser
|
||||
import Dependencies
|
||||
import Foundation
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTConnectionService
|
||||
import MQTTManager
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import PsychrometricClientLive
|
||||
import SensorsService
|
||||
import ServiceLifecycle
|
||||
|
||||
@main
|
||||
struct Application: AsyncParsableCommand {
|
||||
static let configuration = CommandConfiguration(
|
||||
commandName: "dewpoint-controller",
|
||||
abstract: "Command for running the dewpoint mqtt service.",
|
||||
subcommands: [Run.self, Debug.self]
|
||||
)
|
||||
}
|
||||
74
Sources/DewPointController/DebugCommand.swift
Normal file
74
Sources/DewPointController/DebugCommand.swift
Normal file
@@ -0,0 +1,74 @@
|
||||
import ArgumentParser
|
||||
import CliClient
|
||||
import CustomDump
|
||||
import Dependencies
|
||||
import DotEnv
|
||||
import Foundation
|
||||
import Logging
|
||||
import Models
|
||||
|
||||
extension Application {
|
||||
|
||||
struct Debug: AsyncParsableCommand {
|
||||
|
||||
static let configuration: CommandConfiguration = .init(
|
||||
commandName: "debug",
|
||||
abstract: "Debug the environment variables."
|
||||
)
|
||||
|
||||
@OptionGroup
|
||||
var shared: SharedOptions
|
||||
|
||||
@Flag(
|
||||
name: [.customLong("show-password")],
|
||||
help: "Don't redact the password from the console."
|
||||
)
|
||||
var showPassword: Bool = false
|
||||
|
||||
mutating func run() async throws {
|
||||
@Dependency(\.cliClient) var client
|
||||
let logger = Logger(label: "debug-command")
|
||||
|
||||
print("--------------------------")
|
||||
print("Running debug command...")
|
||||
if let envFile = shared.envFile {
|
||||
print("Reading env file: \(envFile)")
|
||||
print("--------------------------")
|
||||
} else {
|
||||
print("No env file set.")
|
||||
print("--------------------------")
|
||||
}
|
||||
|
||||
print("Loading EnvVars")
|
||||
print("--------------------------")
|
||||
let envVars = try await client.makeEnvVars(shared.envVarsRequest(logger: logger))
|
||||
printEnvVars(envVars: envVars, showPassword: showPassword)
|
||||
print("--------------------------")
|
||||
|
||||
if let logLevel = shared.logLevel, let level = logLevel() {
|
||||
print("Log Level option: \(level)")
|
||||
print("--------------------------")
|
||||
} else {
|
||||
print("Log Level option: nil")
|
||||
print("--------------------------")
|
||||
}
|
||||
}
|
||||
|
||||
private func printEnvVars(envVars: EnvVars, showPassword: Bool) {
|
||||
// show the proper password to show depending on if it exists
|
||||
// and if we should redact it or not.
|
||||
var passwordString: String?
|
||||
switch (showPassword, envVars.password) {
|
||||
case (true, .none), (_, .none):
|
||||
break
|
||||
case (true, let .some(password)):
|
||||
passwordString = password
|
||||
case (false, .some):
|
||||
passwordString = "<redacted>"
|
||||
}
|
||||
var envVars = envVars
|
||||
envVars.password = passwordString
|
||||
customDump(envVars)
|
||||
}
|
||||
}
|
||||
}
|
||||
86
Sources/DewPointController/RunCommand.swift
Normal file
86
Sources/DewPointController/RunCommand.swift
Normal file
@@ -0,0 +1,86 @@
|
||||
import ArgumentParser
|
||||
import CliClient
|
||||
import Dependencies
|
||||
import Foundation
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTConnectionService
|
||||
import MQTTManager
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import PsychrometricClientLive
|
||||
import SensorsService
|
||||
import ServiceLifecycle
|
||||
|
||||
extension Application {
|
||||
/// Run the controller.
|
||||
///
|
||||
struct Run: AsyncParsableCommand {
|
||||
|
||||
static let configuration = CommandConfiguration(
|
||||
commandName: "run",
|
||||
abstract: "Run the controller."
|
||||
)
|
||||
|
||||
@OptionGroup
|
||||
var shared: SharedOptions
|
||||
|
||||
mutating func run() async throws {
|
||||
@Dependency(\.cliClient) var cliClient
|
||||
|
||||
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
var logger = Logger(label: "dewpoint-controller")
|
||||
let mqtt = try await setup(eventLoopGroup: eventloopGroup, logger: &logger)
|
||||
|
||||
do {
|
||||
try await withDependencies {
|
||||
$0.psychrometricClient = .liveValue
|
||||
$0.mqtt = .live(client: mqtt, logger: logger)
|
||||
} operation: {
|
||||
let mqttConnection = MQTTConnectionService(logger: logger)
|
||||
let sensors = SensorsService(sensors: .live, logger: logger)
|
||||
|
||||
var serviceGroupConfiguration = ServiceGroupConfiguration(
|
||||
services: [
|
||||
mqttConnection,
|
||||
sensors
|
||||
],
|
||||
gracefulShutdownSignals: [.sigterm, .sigint],
|
||||
logger: logger
|
||||
)
|
||||
serviceGroupConfiguration.maximumCancellationDuration = .seconds(5)
|
||||
serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10)
|
||||
|
||||
let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
|
||||
|
||||
logger.info("Starting dewpoint-controller!")
|
||||
try await serviceGroup.run()
|
||||
}
|
||||
|
||||
try await mqtt.shutdown()
|
||||
try await eventloopGroup.shutdownGracefully()
|
||||
} catch {
|
||||
try await eventloopGroup.shutdownGracefully()
|
||||
}
|
||||
}
|
||||
|
||||
private func setup(
|
||||
eventLoopGroup: MultiThreadedEventLoopGroup,
|
||||
logger: inout Logger
|
||||
) async throws -> MQTTClient {
|
||||
@Dependency(\.cliClient) var cliClient
|
||||
|
||||
let environment = try await cliClient.makeEnvVars(shared.envVarsRequest(logger: logger))
|
||||
logger.logLevel = cliClient.logLevel(environment)
|
||||
|
||||
return try cliClient.makeClient(.init(
|
||||
envVars: environment,
|
||||
eventLoopGroup: eventLoopGroup,
|
||||
logger: logger
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// MARK: - Helpers
|
||||
50
Sources/DewPointController/SharedOptions.swift
Normal file
50
Sources/DewPointController/SharedOptions.swift
Normal file
@@ -0,0 +1,50 @@
|
||||
import ArgumentParser
|
||||
import CliClient
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTNIO
|
||||
|
||||
extension Application {
|
||||
|
||||
struct SharedOptions: ParsableArguments {
|
||||
@Option(
|
||||
name: [.short, .customLong("env-file")],
|
||||
help: "A file path to an env file."
|
||||
)
|
||||
var envFile: String?
|
||||
|
||||
@Option(
|
||||
name: [.short, .customLong("log-level")],
|
||||
help: "Set the logging level."
|
||||
)
|
||||
var logLevel: LogLevelContainer?
|
||||
|
||||
@Option(
|
||||
name: [.short, .long],
|
||||
help: "Set the MQTT connecition version."
|
||||
)
|
||||
var version: String?
|
||||
|
||||
func envVarsRequest(logger: Logger?) -> CliClient.EnvVarsRequest {
|
||||
.init(envFilePath: envFile, logger: logger, version: version)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/// A container type for making `Logger.Level` into a type
|
||||
/// that can be parsed as a command line argument. This is
|
||||
/// to suppress warnings vs. having `Logger.Level` adopt the
|
||||
/// protocol.
|
||||
@_spi(Internal)
|
||||
public struct LogLevelContainer: ExpressibleByArgument {
|
||||
public let logLevel: Logger.Level?
|
||||
|
||||
public init?(argument: String) {
|
||||
self.logLevel = .init(rawValue: argument.lowercased())
|
||||
}
|
||||
|
||||
public func callAsFunction() -> Logger.Level? {
|
||||
logLevel
|
||||
}
|
||||
}
|
||||
10
Sources/DewPointController/Utils.swift
Normal file
10
Sources/DewPointController/Utils.swift
Normal file
@@ -0,0 +1,10 @@
|
||||
import Models
|
||||
|
||||
@_spi(Internal)
|
||||
public extension Array where Element == TemperatureAndHumiditySensor {
|
||||
static var live: Self {
|
||||
TemperatureAndHumiditySensor.Location.allCases.map { location in
|
||||
TemperatureAndHumiditySensor(location: location)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,221 +0,0 @@
|
||||
import AsyncAlgorithms
|
||||
import Dependencies
|
||||
import DependenciesMacros
|
||||
import Foundation
|
||||
import Logging
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
|
||||
public extension DependencyValues {
|
||||
|
||||
/// A dependency that is responsible for managing the connection to
|
||||
/// an MQTT broker.
|
||||
var mqttConnectionManager: MQTTConnectionManager {
|
||||
get { self[MQTTConnectionManager.self] }
|
||||
set { self[MQTTConnectionManager.self] = newValue }
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents the interface needed for the ``MQTTConnectionService``.
|
||||
///
|
||||
/// See ``MQTTConnectionManagerLive`` module for live implementation.
|
||||
@DependencyClient
|
||||
public struct MQTTConnectionManager: Sendable {
|
||||
|
||||
/// Connect to the MQTT broker.
|
||||
public var connect: @Sendable () async throws -> Void
|
||||
|
||||
/// Shutdown the connection to the MQTT broker.
|
||||
///
|
||||
/// - Note: You should cancel any tasks that are listening to the connection stream first.
|
||||
public var shutdown: @Sendable () -> Void
|
||||
|
||||
/// Create a stream of connection events.
|
||||
public var stream: @Sendable () throws -> AsyncStream<Event>
|
||||
|
||||
/// Perform an operation with the underlying MQTTClient, this can be useful in
|
||||
/// tests, so this module needs imported with `@_spi(Testing) import` to use this method.
|
||||
private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void
|
||||
|
||||
@_spi(Internal)
|
||||
public func withClient(
|
||||
_ callback: @Sendable (MQTTClient) async throws -> Void
|
||||
) async throws {
|
||||
try await _withClient(callback)
|
||||
}
|
||||
|
||||
/// Represents connection events that clients can listen for and
|
||||
/// react accordingly.
|
||||
public enum Event: Sendable {
|
||||
case connected
|
||||
case disconnected
|
||||
case shuttingDown
|
||||
}
|
||||
|
||||
public static func live(
|
||||
client: MQTTClient,
|
||||
cleanSession: Bool = false,
|
||||
logger: Logger? = nil,
|
||||
alwaysReconnect: Bool = true
|
||||
) -> Self {
|
||||
let manager = ConnectionManager(
|
||||
client: client,
|
||||
logger: logger,
|
||||
alwaysReconnect: alwaysReconnect
|
||||
)
|
||||
return .init {
|
||||
try await manager.connect(cleanSession: cleanSession)
|
||||
} shutdown: {
|
||||
manager.shutdown()
|
||||
} stream: {
|
||||
MQTTConnectionStream(client: client, logger: logger)
|
||||
.start()
|
||||
.removeDuplicates()
|
||||
.eraseToStream()
|
||||
} _withClient: { callback in
|
||||
try await callback(client)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension MQTTConnectionManager: TestDependencyKey {
|
||||
public static var testValue: MQTTConnectionManager {
|
||||
Self()
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Helpers
|
||||
|
||||
@_spi(Internal)
|
||||
public final actor MQTTConnectionStream: Sendable {
|
||||
|
||||
public typealias Element = MQTTConnectionManager.Event
|
||||
|
||||
private let client: MQTTClient
|
||||
private let continuation: AsyncStream<Element>.Continuation
|
||||
private let logger: Logger?
|
||||
private let name: String
|
||||
private let stream: AsyncStream<Element>
|
||||
private var isShuttingDown = false
|
||||
|
||||
public init(client: MQTTClient, logger: Logger?) {
|
||||
let (stream, continuation) = AsyncStream<Element>.makeStream()
|
||||
self.client = client
|
||||
self.continuation = continuation
|
||||
self.logger = logger
|
||||
self.name = UUID().uuidString
|
||||
self.stream = stream
|
||||
}
|
||||
|
||||
deinit { stop() }
|
||||
|
||||
public nonisolated func start() -> AsyncStream<Element> {
|
||||
// Check if the client is active and yield the initial result.
|
||||
continuation.yield(client.isActive() ? .connected : .disconnected)
|
||||
|
||||
// Continually check if the client is active.
|
||||
let task = Task {
|
||||
let isShuttingDown = await self.isShuttingDown
|
||||
while !Task.isCancelled, !isShuttingDown {
|
||||
try await Task.sleep(for: .milliseconds(100))
|
||||
continuation.yield(client.isActive() ? .connected : .disconnected)
|
||||
}
|
||||
}
|
||||
|
||||
// Register listener on the client for when the connection
|
||||
// closes.
|
||||
client.addCloseListener(named: name) { _ in
|
||||
self.logger?.trace("Client has disconnected.")
|
||||
self.continuation.yield(.disconnected)
|
||||
}
|
||||
|
||||
// Register listener on the client for when the client
|
||||
// is shutdown.
|
||||
client.addShutdownListener(named: name) { _ in
|
||||
self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)")
|
||||
self.continuation.yield(.shuttingDown)
|
||||
Task { await self.setIsShuttingDown() }
|
||||
task.cancel()
|
||||
self.stop()
|
||||
}
|
||||
|
||||
return stream
|
||||
}
|
||||
|
||||
private func setIsShuttingDown() {
|
||||
isShuttingDown = true
|
||||
}
|
||||
|
||||
public nonisolated func stop() {
|
||||
client.removeCloseListener(named: name)
|
||||
client.removeShutdownListener(named: name)
|
||||
continuation.finish()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
actor ConnectionManager {
|
||||
private let client: MQTTClient
|
||||
private let logger: Logger?
|
||||
private let name: String
|
||||
private let shouldReconnect: Bool
|
||||
private var hasConnected: Bool = false
|
||||
|
||||
init(
|
||||
client: MQTTClient,
|
||||
logger: Logger?,
|
||||
alwaysReconnect: Bool
|
||||
) {
|
||||
self.client = client
|
||||
self.logger = logger
|
||||
self.name = UUID().uuidString
|
||||
self.shouldReconnect = alwaysReconnect
|
||||
}
|
||||
|
||||
deinit {
|
||||
// We should've already logged that we're shutting down if
|
||||
// the manager was shutdown properly, so don't log it twice.
|
||||
self.shutdown(withLogging: false)
|
||||
}
|
||||
|
||||
private func setHasConnected() {
|
||||
hasConnected = true
|
||||
}
|
||||
|
||||
func connect(
|
||||
cleanSession: Bool
|
||||
) async throws {
|
||||
guard !hasConnected else { return }
|
||||
do {
|
||||
try await client.connect(cleanSession: cleanSession)
|
||||
setHasConnected()
|
||||
|
||||
client.addCloseListener(named: name) { [weak self] _ in
|
||||
guard let `self` else { return }
|
||||
self.logger?.debug("Connection closed.")
|
||||
if self.shouldReconnect {
|
||||
self.logger?.debug("Reconnecting...")
|
||||
Task {
|
||||
try await self.connect(cleanSession: cleanSession)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
client.addShutdownListener(named: name) { _ in
|
||||
self.shutdown()
|
||||
}
|
||||
|
||||
} catch {
|
||||
logger?.trace("Failed to connect: \(error)")
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
nonisolated func shutdown(withLogging: Bool = true) {
|
||||
if withLogging {
|
||||
logger?.trace("Shutting down connection.")
|
||||
}
|
||||
client.removeCloseListener(named: name)
|
||||
client.removeShutdownListener(named: name)
|
||||
}
|
||||
}
|
||||
@@ -1,17 +1,18 @@
|
||||
import Dependencies
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTConnectionManager
|
||||
import MQTTManager
|
||||
import ServiceLifecycle
|
||||
|
||||
public actor MQTTConnectionService: Service {
|
||||
@Dependency(\.mqttConnectionManager) var manager
|
||||
public struct MQTTConnectionService: Service {
|
||||
|
||||
private nonisolated let logger: Logger?
|
||||
private let logger: Logger?
|
||||
|
||||
public init(
|
||||
logger: Logger? = nil
|
||||
) {
|
||||
var logger = logger
|
||||
logger?[metadataKey: "type"] = "mqtt-connection-service"
|
||||
self.logger = logger
|
||||
}
|
||||
|
||||
@@ -19,19 +20,19 @@ public actor MQTTConnectionService: Service {
|
||||
/// to the MQTT broker and handles graceful shutdown of the
|
||||
/// connection.
|
||||
public func run() async throws {
|
||||
@Dependency(\.mqtt) var mqtt
|
||||
try await mqtt.connect()
|
||||
|
||||
try await withGracefulShutdownHandler {
|
||||
try await manager.connect()
|
||||
for await event in try manager.stream().cancelOnGracefulShutdown() {
|
||||
for await event in try mqtt.connectionStream().cancelOnGracefulShutdown() {
|
||||
// We don't really need to do anything with the events, so just logging
|
||||
// for now. But we need to iterate on an async stream for the service to
|
||||
// continue to run and handle graceful shutdowns.
|
||||
logger?.trace("Received connection event: \(event)")
|
||||
}
|
||||
// when we reach here we are shutting down, so we shutdown
|
||||
// the manager.
|
||||
manager.shutdown()
|
||||
} onGracefulShutdown: {
|
||||
self.logger?.trace("Received graceful shutdown.")
|
||||
mqtt.shutdown()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
221
Sources/MQTTManager/Interface.swift
Normal file
221
Sources/MQTTManager/Interface.swift
Normal file
@@ -0,0 +1,221 @@
|
||||
import AsyncAlgorithms
|
||||
import Dependencies
|
||||
import DependenciesMacros
|
||||
import Foundation
|
||||
import Logging
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
|
||||
public extension DependencyValues {
|
||||
|
||||
/// A dependency that is responsible for managing the connection to
|
||||
/// an MQTT broker, listen to topics, and publish values back to the
|
||||
/// broker.
|
||||
var mqtt: MQTTManager {
|
||||
get { self[MQTTManager.self] }
|
||||
set { self[MQTTManager.self] = newValue }
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents the interface needed to connect, listen, and publish to an MQTT broker.
|
||||
///
|
||||
@DependencyClient
|
||||
public struct MQTTManager: Sendable {
|
||||
|
||||
public typealias ListenStream = AsyncStream<MQTTPublishInfo>
|
||||
|
||||
/// Connect to the MQTT broker.
|
||||
public var connect: @Sendable () async throws -> Void
|
||||
|
||||
/// Create a stream of connection events.
|
||||
///
|
||||
/// - SeeAlso: ``Event``
|
||||
public var connectionStream: @Sendable () throws -> AsyncStream<Event>
|
||||
|
||||
private var _listen: @Sendable ([String], MQTTQoS) async throws -> ListenStream
|
||||
|
||||
/// Publish a value to the MQTT broker for a given topic.
|
||||
public var publish: @Sendable (PublishRequest) async throws -> Void
|
||||
|
||||
/// Shutdown the connection to the MQTT broker.
|
||||
public var shutdown: @Sendable () -> Void
|
||||
|
||||
private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void
|
||||
|
||||
public init(
|
||||
connect: @escaping @Sendable () async throws -> Void,
|
||||
connectionStream: @escaping @Sendable () throws -> AsyncStream<MQTTManager.Event>,
|
||||
listen: @escaping @Sendable ([String], MQTTQoS) async throws -> MQTTManager.ListenStream,
|
||||
publish: @escaping @Sendable (MQTTManager.PublishRequest) async throws -> Void,
|
||||
shutdown: @escaping @Sendable () -> Void,
|
||||
withClient: @escaping @Sendable ((MQTTClient) async throws -> Void) async throws -> Void = { _ in unimplemented() }
|
||||
) {
|
||||
self.connect = connect
|
||||
self.connectionStream = connectionStream
|
||||
self._listen = listen
|
||||
self.publish = publish
|
||||
self.shutdown = shutdown
|
||||
self._withClient = withClient
|
||||
}
|
||||
|
||||
/// Create an async stream that listens for changes to the given topics.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - topics: The topics to listen for changes to.
|
||||
/// - qos: The MQTTQoS for the subscription.
|
||||
public func listen(
|
||||
to topics: [String],
|
||||
qos: MQTTQoS = .atLeastOnce
|
||||
) async throws -> ListenStream {
|
||||
try await _listen(topics, qos)
|
||||
}
|
||||
|
||||
/// Create an async stream that listens for changes to the given topics.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - topics: The topics to listen for changes to.
|
||||
/// - qos: The MQTTQoS for the subscription.
|
||||
public func listen(
|
||||
_ topics: String...,
|
||||
qos: MQTTQoS = .atLeastOnce
|
||||
) async throws -> ListenStream {
|
||||
try await listen(to: topics, qos: qos)
|
||||
}
|
||||
|
||||
/// Publish a new value to the given topic.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - payload: The value to publish.
|
||||
/// - topicName: The topic to publish the new value to.
|
||||
/// - qos: The MQTTQoS.
|
||||
/// - retain: The retain flag.
|
||||
public func publish(
|
||||
_ payload: ByteBuffer,
|
||||
to topicName: String,
|
||||
qos: MQTTQoS,
|
||||
retain: Bool = false,
|
||||
properties: MQTTProperties = .init()
|
||||
) async throws {
|
||||
try await publish(.init(
|
||||
topicName: topicName,
|
||||
payload: payload,
|
||||
qos: qos,
|
||||
retain: retain,
|
||||
properties: properties
|
||||
))
|
||||
}
|
||||
|
||||
/// Perform an operation with the underlying MQTTClient, this can be useful in
|
||||
/// tests, so this module needs imported with `@_spi(Internal) import MQTTManager` to use this method.
|
||||
@_spi(Internal)
|
||||
public func withClient(
|
||||
_ callback: @Sendable (MQTTClient) async throws -> Void
|
||||
) async throws {
|
||||
try await _withClient(callback)
|
||||
}
|
||||
|
||||
/// Represents connection events that clients can listen for and
|
||||
/// react accordingly.
|
||||
public enum Event: Equatable, Sendable {
|
||||
case connected
|
||||
case disconnected
|
||||
case shuttingDown
|
||||
}
|
||||
|
||||
/// Represents the parameters required to publish a new value to the
|
||||
/// MQTT broker.
|
||||
public struct PublishRequest: Sendable {
|
||||
|
||||
/// The topic to publish the new value to.
|
||||
public let topicName: String
|
||||
|
||||
/// The value to publish.
|
||||
public let payload: ByteBuffer
|
||||
|
||||
/// The qos of the request.
|
||||
public let qos: MQTTQoS
|
||||
|
||||
/// The retain flag for the request.
|
||||
public let retain: Bool
|
||||
|
||||
public let properties: MQTTProperties
|
||||
|
||||
/// Create a new publish request.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - topicName: The topic to publish to.
|
||||
/// - payload: The value to publish.
|
||||
/// - qos: The qos of the request.
|
||||
/// - retain: The retain flag of the request.
|
||||
public init(
|
||||
topicName: String,
|
||||
payload: ByteBuffer,
|
||||
qos: MQTTQoS,
|
||||
retain: Bool,
|
||||
properties: MQTTProperties
|
||||
) {
|
||||
self.topicName = topicName
|
||||
self.payload = payload
|
||||
self.qos = qos
|
||||
self.retain = retain
|
||||
self.properties = properties
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public extension MQTTManager {
|
||||
/// Create the live manager.
|
||||
///
|
||||
static func live(
|
||||
client: MQTTClient,
|
||||
cleanSession: Bool = false,
|
||||
logger: Logger? = nil,
|
||||
alwaysReconnect: Bool = true
|
||||
) -> Self {
|
||||
let manager = ConnectionManager(
|
||||
client: client,
|
||||
logger: logger,
|
||||
alwaysReconnect: alwaysReconnect
|
||||
)
|
||||
return .init(
|
||||
connect: { try await manager.connect(cleanSession: cleanSession) },
|
||||
connectionStream: {
|
||||
MQTTConnectionStream(client: client, logger: logger)
|
||||
.start()
|
||||
.removeDuplicates()
|
||||
.eraseToStream()
|
||||
},
|
||||
listen: { topics, qos in
|
||||
try await manager.listen(to: topics, qos: qos)
|
||||
},
|
||||
publish: { request in
|
||||
let topic = request.topicName
|
||||
guard client.isActive() else {
|
||||
logger?.debug("Client is not active, unable to publish to topic: \(topic)")
|
||||
return
|
||||
}
|
||||
logger?.trace("Begin publishing to topic: \(topic)")
|
||||
defer { logger?.debug("Done publishing to topic: \(topic)") }
|
||||
try await client.publish(
|
||||
to: request.topicName,
|
||||
payload: request.payload,
|
||||
qos: request.qos,
|
||||
retain: request.retain,
|
||||
properties: request.properties
|
||||
).get()
|
||||
},
|
||||
shutdown: {
|
||||
Task { try await client.shutdown() }
|
||||
manager.shutdown()
|
||||
},
|
||||
withClient: { callback in
|
||||
try await callback(client)
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
extension MQTTManager: TestDependencyKey {
|
||||
public static let testValue: MQTTManager = Self()
|
||||
}
|
||||
98
Sources/MQTTManager/Internal/ConnectionManager.swift
Normal file
98
Sources/MQTTManager/Internal/ConnectionManager.swift
Normal file
@@ -0,0 +1,98 @@
|
||||
import Foundation
|
||||
import Logging
|
||||
import MQTTNIO
|
||||
|
||||
actor ConnectionManager {
|
||||
private let client: MQTTClient
|
||||
private let logger: Logger?
|
||||
private let name: String
|
||||
private let shouldReconnect: Bool
|
||||
private var hasConnected: Bool = false
|
||||
private var listeners: [TopicListenerStream] = []
|
||||
private var isShuttingDown = false
|
||||
|
||||
init(
|
||||
client: MQTTClient,
|
||||
logger: Logger?,
|
||||
alwaysReconnect: Bool
|
||||
) {
|
||||
var logger = logger
|
||||
logger?[metadataKey: "instance"] = "\(Self.self)"
|
||||
self.logger = logger
|
||||
|
||||
self.client = client
|
||||
self.name = UUID().uuidString
|
||||
self.shouldReconnect = alwaysReconnect
|
||||
}
|
||||
|
||||
deinit {
|
||||
if !isShuttingDown {
|
||||
let message = """
|
||||
Did not properly close the connection manager. This can lead to
|
||||
dangling references.
|
||||
|
||||
Please call `shutdown` to properly close all connections and listener streams.
|
||||
"""
|
||||
logger?.warning("\(message)")
|
||||
self.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
private func setHasConnected() {
|
||||
hasConnected = true
|
||||
}
|
||||
|
||||
func listen(
|
||||
to topics: [String],
|
||||
qos: MQTTQoS
|
||||
) async throws -> MQTTManager.ListenStream {
|
||||
let listener = TopicListenerStream(client: client, logger: logger, topics: topics, qos: qos)
|
||||
listeners.append(listener)
|
||||
await listener.start()
|
||||
return listener.stream
|
||||
}
|
||||
|
||||
func connect(
|
||||
cleanSession: Bool
|
||||
) async throws {
|
||||
guard !hasConnected else { return }
|
||||
do {
|
||||
try await client.connect(cleanSession: cleanSession)
|
||||
setHasConnected()
|
||||
|
||||
client.addCloseListener(named: name) { [weak self] _ in
|
||||
guard let `self` else { return }
|
||||
self.logger?.debug("Connection closed.")
|
||||
if self.shouldReconnect {
|
||||
self.logger?.debug("Reconnecting...")
|
||||
Task {
|
||||
try await self.connect(cleanSession: cleanSession)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
client.addShutdownListener(named: name) { _ in
|
||||
self.shutdown()
|
||||
}
|
||||
|
||||
} catch {
|
||||
logger?.trace("Failed to connect: \(error)")
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
private func shutdownListeners() {
|
||||
_ = listeners.map { $0.shutdown() }
|
||||
listeners = []
|
||||
isShuttingDown = true
|
||||
}
|
||||
|
||||
nonisolated func shutdown(withLogging: Bool = true) {
|
||||
if withLogging {
|
||||
logger?.trace("Shutting down connection.")
|
||||
}
|
||||
client.removeCloseListener(named: name)
|
||||
client.removeShutdownListener(named: name)
|
||||
Task { await shutdownListeners() }
|
||||
}
|
||||
}
|
||||
74
Sources/MQTTManager/Internal/ConnectionStream.swift
Normal file
74
Sources/MQTTManager/Internal/ConnectionStream.swift
Normal file
@@ -0,0 +1,74 @@
|
||||
import Foundation
|
||||
import Logging
|
||||
import MQTTNIO
|
||||
|
||||
@_spi(Internal)
|
||||
public actor MQTTConnectionStream: Sendable {
|
||||
|
||||
public typealias Element = MQTTManager.Event
|
||||
|
||||
private let client: MQTTClient
|
||||
private let continuation: AsyncStream<Element>.Continuation
|
||||
private let logger: Logger?
|
||||
nonisolated let name: String
|
||||
private let stream: AsyncStream<Element>
|
||||
private var isShuttingDown = false
|
||||
|
||||
public init(client: MQTTClient, logger: Logger?) {
|
||||
var logger = logger
|
||||
logger?[metadataKey: "type"] = "\(Self.self)"
|
||||
self.logger = logger
|
||||
|
||||
let (stream, continuation) = AsyncStream<Element>.makeStream()
|
||||
self.client = client
|
||||
self.continuation = continuation
|
||||
self.name = UUID().uuidString
|
||||
self.stream = stream
|
||||
}
|
||||
|
||||
deinit { stop() }
|
||||
|
||||
public nonisolated func start() -> AsyncStream<Element> {
|
||||
// Check if the client is active and yield the initial result.
|
||||
continuation.yield(client.isActive() ? .connected : .disconnected)
|
||||
|
||||
// Continually check if the client is active.
|
||||
let task = Task {
|
||||
let isShuttingDown = await self.isShuttingDown
|
||||
while !Task.isCancelled, !isShuttingDown {
|
||||
try await Task.sleep(for: .milliseconds(100))
|
||||
continuation.yield(client.isActive() ? .connected : .disconnected)
|
||||
}
|
||||
}
|
||||
|
||||
// Register listener on the client for when the connection
|
||||
// closes.
|
||||
client.addCloseListener(named: name) { _ in
|
||||
self.logger?.trace("Client has disconnected.")
|
||||
self.continuation.yield(.disconnected)
|
||||
}
|
||||
|
||||
// Register listener on the client for when the client
|
||||
// is shutdown.
|
||||
client.addShutdownListener(named: name) { _ in
|
||||
self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)")
|
||||
self.continuation.yield(.shuttingDown)
|
||||
Task { await self.setIsShuttingDown() }
|
||||
task.cancel()
|
||||
self.stop()
|
||||
}
|
||||
|
||||
return stream
|
||||
}
|
||||
|
||||
private func setIsShuttingDown() {
|
||||
isShuttingDown = true
|
||||
}
|
||||
|
||||
public nonisolated func stop() {
|
||||
client.removeCloseListener(named: name)
|
||||
client.removeShutdownListener(named: name)
|
||||
continuation.finish()
|
||||
}
|
||||
|
||||
}
|
||||
141
Sources/MQTTManager/Internal/TopicListenerStream.swift
Normal file
141
Sources/MQTTManager/Internal/TopicListenerStream.swift
Normal file
@@ -0,0 +1,141 @@
|
||||
import Foundation
|
||||
import Logging
|
||||
import MQTTNIO
|
||||
|
||||
actor TopicListenerStream {
|
||||
|
||||
typealias Stream = MQTTManager.ListenStream
|
||||
|
||||
private let client: MQTTClient
|
||||
private let configuration: Configuration
|
||||
private let continuation: Stream.Continuation
|
||||
private let logger: Logger?
|
||||
private let name: String
|
||||
let stream: Stream
|
||||
private var shuttingDown: Bool = false
|
||||
private var onShutdownHandler: (@Sendable () -> Void)?
|
||||
|
||||
init(
|
||||
client: MQTTClient,
|
||||
logger: Logger?,
|
||||
topics: [String],
|
||||
qos: MQTTQoS
|
||||
) {
|
||||
// Setup the logger so we can more easily decipher log messages.
|
||||
var logger = logger
|
||||
logger?[metadataKey: "type"] = "\(Self.self)"
|
||||
self.logger = logger
|
||||
|
||||
let (stream, continuation) = Stream.makeStream()
|
||||
self.client = client
|
||||
self.configuration = .init(qos: qos, topics: topics)
|
||||
self.continuation = continuation
|
||||
self.name = UUID().uuidString
|
||||
self.stream = stream
|
||||
}
|
||||
|
||||
struct Configuration: Sendable {
|
||||
let qos: MQTTQoS
|
||||
let topics: [String]
|
||||
}
|
||||
|
||||
deinit {
|
||||
if !shuttingDown {
|
||||
let message = """
|
||||
Shutdown was not called on topic listener. This could lead to potential errors or
|
||||
the stream never ending.
|
||||
|
||||
Please ensure that you call shutdown on the listener.
|
||||
"""
|
||||
client.logger.warning("\(message)")
|
||||
continuation.finish()
|
||||
}
|
||||
client.removePublishListener(named: name)
|
||||
client.removeShutdownListener(named: name)
|
||||
}
|
||||
|
||||
private func subscribe() async throws {
|
||||
guard !shuttingDown else { return }
|
||||
logger?.debug("Begin subscribing to topics.")
|
||||
do {
|
||||
_ = try await client.subscribe(to: configuration.topics.map {
|
||||
MQTTSubscribeInfo(topicFilter: $0, qos: configuration.qos)
|
||||
})
|
||||
} catch {
|
||||
logger?.error("Received error while subscribing to topics: \(configuration.topics)")
|
||||
throw TopicListenerError.failedToSubscribe
|
||||
}
|
||||
logger?.debug("Done subscribing to topics.")
|
||||
}
|
||||
|
||||
public func start() {
|
||||
logger?.trace("Starting listener for topics: \(configuration.topics)")
|
||||
|
||||
let stream = MQTTConnectionStream(client: client, logger: logger)
|
||||
.start()
|
||||
.removeDuplicates()
|
||||
.eraseToStream()
|
||||
|
||||
let task = Task {
|
||||
// Listen for connection events to restablish the stream upon a
|
||||
// client becoming disconnected / reconnected, and properly shutdown
|
||||
// the stream on the client being shutdown.
|
||||
for await event in stream {
|
||||
logger?.trace("Received event: \(event)")
|
||||
switch event {
|
||||
case .shuttingDown:
|
||||
shutdown()
|
||||
case .disconnected:
|
||||
try await Task.sleep(for: .milliseconds(100))
|
||||
case .connected:
|
||||
try await subscribe()
|
||||
client.addPublishListener(named: name) { result in
|
||||
switch result {
|
||||
case let .failure(error):
|
||||
self.logger?.error("Received error while listening: \(error)")
|
||||
case let .success(publishInfo):
|
||||
if self.configuration.topics.contains(publishInfo.topicName) {
|
||||
self.logger?.debug("Recieved new value for topic: \(publishInfo.topicName)")
|
||||
self.continuation.yield(publishInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
onShutdownHandler = { task.cancel() }
|
||||
}
|
||||
|
||||
private func setIsShuttingDown() {
|
||||
shuttingDown = true
|
||||
onShutdownHandler = nil
|
||||
}
|
||||
|
||||
public nonisolated func shutdown() {
|
||||
client.logger.trace("Closing topic listener...")
|
||||
continuation.finish()
|
||||
client.removePublishListener(named: name)
|
||||
client.removeShutdownListener(named: name)
|
||||
Task {
|
||||
await onShutdownHandler?()
|
||||
await self.setIsShuttingDown()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// MARK: - Errors
|
||||
|
||||
public enum TopicListenerError: Error {
|
||||
case connectionTimeout
|
||||
case failedToSubscribe
|
||||
}
|
||||
|
||||
public struct MQTTListenResultError: Error {
|
||||
let underlyingError: any Error
|
||||
|
||||
init(_ underlyingError: any Error) {
|
||||
self.underlyingError = underlyingError
|
||||
}
|
||||
}
|
||||
@@ -25,6 +25,12 @@ public struct EnvVars: Codable, Equatable, Sendable {
|
||||
/// The MQTT user password.
|
||||
public var password: String?
|
||||
|
||||
/// Set a custom logging level.
|
||||
public var logLevel: Logger.Level?
|
||||
|
||||
/// Set the mqtt broker version.
|
||||
public var version: String?
|
||||
|
||||
/// Create a new ``EnvVars``
|
||||
///
|
||||
/// - Parameters:
|
||||
@@ -38,9 +44,11 @@ public struct EnvVars: Codable, Equatable, Sendable {
|
||||
appEnv: AppEnv = .development,
|
||||
host: String = "127.0.0.1",
|
||||
port: String? = "1883",
|
||||
identifier: String = "dewPoint-controller",
|
||||
identifier: String = "dewpoint-controller",
|
||||
userName: String? = "mqtt_user",
|
||||
password: String? = "secret!"
|
||||
password: String? = "secret!",
|
||||
logLevel: Logger.Level? = nil,
|
||||
version: String? = nil
|
||||
) {
|
||||
self.appEnv = appEnv
|
||||
self.host = host
|
||||
@@ -48,6 +56,8 @@ public struct EnvVars: Codable, Equatable, Sendable {
|
||||
self.identifier = identifier
|
||||
self.userName = userName
|
||||
self.password = password
|
||||
self.logLevel = logLevel
|
||||
self.version = version
|
||||
}
|
||||
|
||||
/// Custom coding keys.
|
||||
@@ -58,6 +68,8 @@ public struct EnvVars: Codable, Equatable, Sendable {
|
||||
case identifier = "MQTT_IDENTIFIER"
|
||||
case userName = "MQTT_USERNAME"
|
||||
case password = "MQTT_PASSWORD"
|
||||
case logLevel = "LOG_LEVEL"
|
||||
case version = "MQTT_VERSION"
|
||||
}
|
||||
|
||||
/// Represents the different app environments.
|
||||
|
||||
@@ -1,42 +0,0 @@
|
||||
import NIO
|
||||
import NIOFoundationCompat
|
||||
import PsychrometricClient
|
||||
|
||||
/// Represents a type that can be initialized by a ``ByteBuffer``.
|
||||
protocol BufferInitalizable {
|
||||
init?(buffer: inout ByteBuffer)
|
||||
}
|
||||
|
||||
extension Double: BufferInitalizable {
|
||||
|
||||
/// Attempt to create / parse a double from a byte buffer.
|
||||
init?(buffer: inout ByteBuffer) {
|
||||
guard let string = buffer.readString(
|
||||
length: buffer.readableBytes,
|
||||
encoding: String.Encoding.utf8
|
||||
)
|
||||
else { return nil }
|
||||
self.init(string)
|
||||
}
|
||||
}
|
||||
|
||||
extension Tagged: BufferInitalizable where RawValue: BufferInitalizable {
|
||||
init?(buffer: inout ByteBuffer) {
|
||||
guard let value = RawValue(buffer: &buffer) else { return nil }
|
||||
self.init(value)
|
||||
}
|
||||
}
|
||||
|
||||
extension Humidity<Relative>: BufferInitalizable {
|
||||
init?(buffer: inout ByteBuffer) {
|
||||
guard let value = Double(buffer: &buffer) else { return nil }
|
||||
self.init(value)
|
||||
}
|
||||
}
|
||||
|
||||
extension Temperature<DryAir>: BufferInitalizable {
|
||||
init?(buffer: inout ByteBuffer) {
|
||||
guard let value = Double(buffer: &buffer) else { return nil }
|
||||
self.init(value)
|
||||
}
|
||||
}
|
||||
@@ -3,12 +3,11 @@ import DependenciesMacros
|
||||
import Foundation
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTConnectionManager
|
||||
import MQTTManager
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import PsychrometricClient
|
||||
import ServiceLifecycle
|
||||
import TopicDependencies
|
||||
|
||||
/// Service that is responsible for listening to changes of the temperature and humidity
|
||||
/// sensors, then publishing back the calculated dew-point temperature and enthalpy for
|
||||
@@ -17,9 +16,7 @@ import TopicDependencies
|
||||
///
|
||||
public actor SensorsService: Service {
|
||||
|
||||
@Dependency(\.mqttConnectionManager.stream) var connectionStream
|
||||
@Dependency(\.topicListener) var topicListener
|
||||
@Dependency(\.topicPublisher) var topicPublisher
|
||||
@Dependency(\.mqtt) var mqtt
|
||||
|
||||
/// The logger to use for the service.
|
||||
private let logger: Logger?
|
||||
@@ -27,9 +24,9 @@ public actor SensorsService: Service {
|
||||
/// The sensors that we are listening for updates to, so
|
||||
/// that we can calculate the dew-point temperature and enthalpy
|
||||
/// values to publish back to the MQTT broker.
|
||||
var sensors: [TemperatureAndHumiditySensor]
|
||||
private var sensors: [TemperatureAndHumiditySensor]
|
||||
|
||||
var topics: [String] {
|
||||
private var topics: [String] {
|
||||
sensors.reduce(into: [String]()) { array, sensor in
|
||||
array.append(sensor.topics.temperature)
|
||||
array.append(sensor.topics.humidity)
|
||||
@@ -57,30 +54,16 @@ public actor SensorsService: Service {
|
||||
public func run() async throws {
|
||||
precondition(sensors.count > 0, "Sensors should not be empty.")
|
||||
|
||||
try await withGracefulShutdownHandler {
|
||||
// Listen for connection events, so that we can automatically
|
||||
// reconnect any sensor topics we're listening to upon a disconnect / reconnect
|
||||
// event. We can also shutdown any topic listeners upon a shutdown event.
|
||||
for await event in try connectionStream().cancelOnGracefulShutdown() {
|
||||
switch event {
|
||||
case .shuttingDown:
|
||||
logger?.debug("Received shutdown event.")
|
||||
try await self.shutdown()
|
||||
case .disconnected:
|
||||
logger?.debug("Received disconnected event.")
|
||||
try await Task.sleep(for: .milliseconds(100))
|
||||
case .connected:
|
||||
logger?.debug("Received connected event.")
|
||||
let stream = try await makeStream()
|
||||
|
||||
await withGracefulShutdownHandler {
|
||||
for await result in stream.cancelOnGracefulShutdown() {
|
||||
logger?.debug("Received result for topic: \(result.topic)")
|
||||
await self.handleResult(result)
|
||||
}
|
||||
}
|
||||
await handleResult(result)
|
||||
}
|
||||
} onGracefulShutdown: {
|
||||
Task {
|
||||
self.logger?.debug("Received graceful shutdown.")
|
||||
Task {
|
||||
try await self.shutdown()
|
||||
}
|
||||
}
|
||||
@@ -89,24 +72,13 @@ public actor SensorsService: Service {
|
||||
@_spi(Internal)
|
||||
public func shutdown() async throws {
|
||||
try await publishUpdates()
|
||||
topicListener.shutdown()
|
||||
}
|
||||
|
||||
private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> {
|
||||
try await topicListener.listen(to: topics)
|
||||
// ignore errors, so that we continue to listen, but log them
|
||||
// for debugging purposes.
|
||||
.compactMap { result in
|
||||
switch result {
|
||||
case let .failure(error):
|
||||
self.logger?.debug("Received error listening for sensors: \(error)")
|
||||
return nil
|
||||
case let .success(info):
|
||||
return (info.payload, info.topicName)
|
||||
}
|
||||
}
|
||||
// ignore duplicate values, to prevent publishing dew-point and enthalpy
|
||||
// changes to frequently.
|
||||
try await mqtt.listen(to: topics)
|
||||
.map { ($0.payload, $0.topicName) }
|
||||
.removeDuplicates { lhs, rhs in
|
||||
lhs.buffer == rhs.buffer
|
||||
&& lhs.topic == rhs.topic
|
||||
@@ -121,8 +93,7 @@ public actor SensorsService: Service {
|
||||
logger?.debug("Begin handling result for topic: \(topic)")
|
||||
|
||||
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
|
||||
var buffer = result.buffer
|
||||
return V(buffer: &buffer)
|
||||
return V(buffer: result.buffer)
|
||||
}
|
||||
|
||||
if topic.contains("temperature") {
|
||||
@@ -153,9 +124,9 @@ public actor SensorsService: Service {
|
||||
|
||||
private func publish(_ double: Double?, to topic: String) async throws {
|
||||
guard let double else { return }
|
||||
try await topicPublisher.publish(
|
||||
try await mqtt.publish(
|
||||
ByteBufferAllocator().buffer(string: "\(double)"),
|
||||
to: topic,
|
||||
payload: ByteBufferAllocator().buffer(string: "\(double)"),
|
||||
qos: .exactlyOnce,
|
||||
retain: true
|
||||
)
|
||||
@@ -204,3 +175,38 @@ private extension Array where Element == TemperatureAndHumiditySensor {
|
||||
self[index].needsProcessed = false
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents a type that can be initialized by a ``ByteBuffer``.
|
||||
protocol BufferInitalizable {
|
||||
init?(buffer: ByteBuffer)
|
||||
}
|
||||
|
||||
extension Double: BufferInitalizable {
|
||||
|
||||
/// Attempt to create / parse a double from a byte buffer.
|
||||
init?(buffer: ByteBuffer) {
|
||||
let string = String(buffer: buffer)
|
||||
self.init(string)
|
||||
}
|
||||
}
|
||||
|
||||
extension Tagged: BufferInitalizable where RawValue: BufferInitalizable {
|
||||
init?(buffer: ByteBuffer) {
|
||||
guard let value = RawValue(buffer: buffer) else { return nil }
|
||||
self.init(value)
|
||||
}
|
||||
}
|
||||
|
||||
extension Humidity<Relative>: BufferInitalizable {
|
||||
init?(buffer: ByteBuffer) {
|
||||
guard let value = Double(buffer: buffer) else { return nil }
|
||||
self.init(value)
|
||||
}
|
||||
}
|
||||
|
||||
extension Temperature<DryAir>: BufferInitalizable {
|
||||
init?(buffer: ByteBuffer) {
|
||||
guard let value = Double(buffer: buffer) else { return nil }
|
||||
self.init(value)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,186 +0,0 @@
|
||||
import Dependencies
|
||||
import DependenciesMacros
|
||||
import Foundation
|
||||
import MQTTNIO
|
||||
|
||||
/// A dependency that can generate an async stream of changes to the given topics.
|
||||
///
|
||||
/// - Note: This type only conforms to ``TestDependencyKey`` because it requires an MQTTClient
|
||||
/// to generate the live dependency.
|
||||
@DependencyClient
|
||||
public struct TopicListener: Sendable {
|
||||
|
||||
public typealias Stream = AsyncStream<Result<MQTTPublishInfo, any Error>>
|
||||
|
||||
/// Create an async stream that listens for changes to the given topics.
|
||||
private var _listen: @Sendable ([String], MQTTQoS) async throws -> Stream
|
||||
|
||||
/// Shutdown the listener stream.
|
||||
public var shutdown: @Sendable () -> Void
|
||||
|
||||
/// Create a new topic listener.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - listen: Generate an async stream of changes for the given topics.
|
||||
/// - shutdown: Shutdown the topic listener stream.
|
||||
public init(
|
||||
listen: @Sendable @escaping ([String], MQTTQoS) async throws -> Stream,
|
||||
shutdown: @Sendable @escaping () -> Void
|
||||
) {
|
||||
self._listen = listen
|
||||
self.shutdown = shutdown
|
||||
}
|
||||
|
||||
/// Create an async stream that listens for changes to the given topics.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - topics: The topics to listen for changes to.
|
||||
/// - qos: The MQTTQoS for the subscription.
|
||||
public func listen(
|
||||
to topics: [String],
|
||||
qos: MQTTQoS = .atLeastOnce
|
||||
) async throws -> Stream {
|
||||
try await _listen(topics, qos)
|
||||
}
|
||||
|
||||
/// Create an async stream that listens for changes to the given topics.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - topics: The topics to listen for changes to.
|
||||
/// - qos: The MQTTQoS for the subscription.
|
||||
public func listen(
|
||||
_ topics: String...,
|
||||
qos: MQTTQoS = .atLeastOnce
|
||||
) async throws -> Stream {
|
||||
try await listen(to: topics, qos: qos)
|
||||
}
|
||||
|
||||
/// Create the live implementation of the topic listener with the given MQTTClient.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - client: The MQTTClient to use.
|
||||
public static func live(client: MQTTClient) -> Self {
|
||||
let listener = MQTTTopicListener(client: client)
|
||||
return .init(
|
||||
listen: { try await listener.listen($0, $1) },
|
||||
shutdown: { listener.shutdown() }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
extension TopicListener: TestDependencyKey {
|
||||
public static var testValue: TopicListener { Self() }
|
||||
}
|
||||
|
||||
public extension DependencyValues {
|
||||
var topicListener: TopicListener {
|
||||
get { self[TopicListener.self] }
|
||||
set { self[TopicListener.self] = newValue }
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Helpers
|
||||
|
||||
private actor MQTTTopicListener {
|
||||
|
||||
private let client: MQTTClient
|
||||
private let continuation: TopicListener.Stream.Continuation
|
||||
private let name: String
|
||||
let stream: TopicListener.Stream
|
||||
private var shuttingDown: Bool = false
|
||||
|
||||
init(
|
||||
client: MQTTClient
|
||||
) {
|
||||
let (stream, continuation) = TopicListener.Stream.makeStream()
|
||||
self.client = client
|
||||
self.continuation = continuation
|
||||
self.name = UUID().uuidString
|
||||
self.stream = stream
|
||||
}
|
||||
|
||||
deinit {
|
||||
if !shuttingDown {
|
||||
let message = """
|
||||
Shutdown was not called on topic listener. This could lead to potential errors or
|
||||
the stream never ending.
|
||||
|
||||
Please ensure that you call shutdown on the listener.
|
||||
"""
|
||||
client.logger.warning("\(message)")
|
||||
continuation.finish()
|
||||
}
|
||||
client.removePublishListener(named: name)
|
||||
client.removeShutdownListener(named: name)
|
||||
}
|
||||
|
||||
func listen(
|
||||
_ topics: [String],
|
||||
_ qos: MQTTQoS = .atLeastOnce
|
||||
) async throws -> TopicListener.Stream {
|
||||
var sleepTimes = 0
|
||||
|
||||
while !client.isActive() {
|
||||
guard sleepTimes < 10 else {
|
||||
throw TopicListenerError.connectionTimeout
|
||||
}
|
||||
try? await Task.sleep(for: .milliseconds(100))
|
||||
sleepTimes += 1
|
||||
}
|
||||
|
||||
client.logger.trace("Client is active, begin subscribing to topics.")
|
||||
|
||||
let subscription = try? await client.subscribe(to: topics.map {
|
||||
MQTTSubscribeInfo(topicFilter: $0, qos: qos)
|
||||
})
|
||||
|
||||
guard subscription != nil else {
|
||||
client.logger.error("Error subscribing to topics: \(topics)")
|
||||
throw TopicListenerError.failedToSubscribe
|
||||
}
|
||||
|
||||
client.logger.trace("Done subscribing, begin listening to topics.")
|
||||
|
||||
client.addPublishListener(named: name) { result in
|
||||
switch result {
|
||||
case let .failure(error):
|
||||
self.client.logger.error("Received error while listening: \(error)")
|
||||
self.continuation.yield(.failure(MQTTListenResultError(error)))
|
||||
case let .success(publishInfo):
|
||||
if topics.contains(publishInfo.topicName) {
|
||||
self.client.logger.trace("Recieved new value for topic: \(publishInfo.topicName)")
|
||||
self.continuation.yield(.success(publishInfo))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return stream
|
||||
}
|
||||
|
||||
private func setIsShuttingDown() {
|
||||
shuttingDown = true
|
||||
}
|
||||
|
||||
nonisolated func shutdown() {
|
||||
client.logger.trace("Closing topic listener...")
|
||||
continuation.finish()
|
||||
client.removePublishListener(named: name)
|
||||
client.removeShutdownListener(named: name)
|
||||
Task { await self.setIsShuttingDown() }
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Errors
|
||||
|
||||
public enum TopicListenerError: Error {
|
||||
case connectionTimeout
|
||||
case failedToSubscribe
|
||||
}
|
||||
|
||||
public struct MQTTListenResultError: Error {
|
||||
let underlyingError: any Error
|
||||
|
||||
init(_ underlyingError: any Error) {
|
||||
self.underlyingError = underlyingError
|
||||
}
|
||||
}
|
||||
@@ -1,117 +0,0 @@
|
||||
import Dependencies
|
||||
import DependenciesMacros
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
|
||||
/// A dependency that is responsible for publishing values to an MQTT broker.
|
||||
///
|
||||
/// - Note: This dependency only conforms to `TestDependencyKey` because it
|
||||
/// requires an active `MQTTClient` to generate the live dependency.
|
||||
@DependencyClient
|
||||
public struct TopicPublisher: Sendable {
|
||||
|
||||
private var _publish: @Sendable (PublishRequest) async throws -> Void
|
||||
|
||||
/// Create a new topic publisher.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - publish: Handle the publish request.
|
||||
public init(
|
||||
publish: @Sendable @escaping (PublishRequest) async throws -> Void
|
||||
) {
|
||||
self._publish = publish
|
||||
}
|
||||
|
||||
/// Publish a new value to the given topic.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - topicName: The topic to publish the new value to.
|
||||
/// - payload: The value to publish.
|
||||
/// - qos: The MQTTQoS.
|
||||
/// - retain: The retain flag.
|
||||
public func publish(
|
||||
to topicName: String,
|
||||
payload: ByteBuffer,
|
||||
qos: MQTTQoS,
|
||||
retain: Bool = false
|
||||
) async throws {
|
||||
try await _publish(.init(
|
||||
topicName: topicName,
|
||||
payload: payload,
|
||||
qos: qos,
|
||||
retain: retain
|
||||
))
|
||||
}
|
||||
|
||||
/// Create the live topic publisher with the given `MQTTClient`.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - client: The mqtt broker client to use.
|
||||
public static func live(client: MQTTClient) -> Self {
|
||||
.init(
|
||||
publish: { request in
|
||||
guard client.isActive() else {
|
||||
client.logger.trace("Client is not connected, unable to publish to \(request.topicName)")
|
||||
return
|
||||
}
|
||||
client.logger.trace("Begin publishing to topic: \(request.topicName)")
|
||||
defer { client.logger.trace("Done publishing to topic: \(request.topicName)") }
|
||||
try await client.publish(
|
||||
to: request.topicName,
|
||||
payload: request.payload,
|
||||
qos: request.qos,
|
||||
retain: request.retain
|
||||
)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
/// Represents the parameters required to publish a new value to the
|
||||
/// MQTT broker.
|
||||
public struct PublishRequest: Equatable, Sendable {
|
||||
|
||||
/// The topic to publish the new value to.
|
||||
public let topicName: String
|
||||
|
||||
/// The value to publish.
|
||||
public let payload: ByteBuffer
|
||||
|
||||
/// The qos of the request.
|
||||
public let qos: MQTTQoS
|
||||
|
||||
/// The retain flag for the request.
|
||||
public let retain: Bool
|
||||
|
||||
/// Create a new publish request.
|
||||
///
|
||||
/// - Parameters:
|
||||
/// - topicName: The topic to publish to.
|
||||
/// - payload: The value to publish.
|
||||
/// - qos: The qos of the request.
|
||||
/// - retain: The retain flag of the request.
|
||||
public init(
|
||||
topicName: String,
|
||||
payload: ByteBuffer,
|
||||
qos: MQTTQoS,
|
||||
retain: Bool
|
||||
) {
|
||||
self.topicName = topicName
|
||||
self.payload = payload
|
||||
self.qos = qos
|
||||
self.retain = retain
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension TopicPublisher: TestDependencyKey {
|
||||
public static var testValue: TopicPublisher { Self() }
|
||||
}
|
||||
|
||||
public extension DependencyValues {
|
||||
|
||||
/// A dependency that is responsible for publishing values to an MQTT broker.
|
||||
var topicPublisher: TopicPublisher {
|
||||
get { self[TopicPublisher.self] }
|
||||
set { self[TopicPublisher.self] = newValue }
|
||||
}
|
||||
}
|
||||
@@ -1,119 +0,0 @@
|
||||
import Dependencies
|
||||
import Foundation
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTConnectionManager
|
||||
import MQTTConnectionService
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import PsychrometricClientLive
|
||||
import SensorsService
|
||||
import ServiceLifecycle
|
||||
import TopicDependencies
|
||||
|
||||
@main
|
||||
struct Application {
|
||||
|
||||
/// The main entry point of the application.
|
||||
static func main() async throws {
|
||||
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
var logger = Logger(label: "dewpoint-controller")
|
||||
logger.logLevel = .trace
|
||||
|
||||
logger.info("Starting dewpoint-controller!")
|
||||
|
||||
let environment = loadEnvVars(logger: logger)
|
||||
|
||||
if environment.appEnv == .production {
|
||||
logger.debug("Updating logging level to info.")
|
||||
logger.logLevel = .info
|
||||
}
|
||||
|
||||
let mqtt = MQTTClient(
|
||||
envVars: environment,
|
||||
eventLoopGroup: eventloopGroup,
|
||||
logger: logger
|
||||
)
|
||||
|
||||
do {
|
||||
try await withDependencies {
|
||||
$0.psychrometricClient = .liveValue
|
||||
$0.topicListener = .live(client: mqtt)
|
||||
$0.topicPublisher = .live(client: mqtt)
|
||||
$0.mqttConnectionManager = .live(client: mqtt, logger: logger)
|
||||
} operation: {
|
||||
let mqttConnection = MQTTConnectionService(logger: logger)
|
||||
let sensors = SensorsService(sensors: .live, logger: logger)
|
||||
|
||||
var serviceGroupConfiguration = ServiceGroupConfiguration(
|
||||
services: [
|
||||
mqttConnection,
|
||||
sensors
|
||||
],
|
||||
gracefulShutdownSignals: [.sigterm, .sigint],
|
||||
logger: logger
|
||||
)
|
||||
serviceGroupConfiguration.maximumCancellationDuration = .seconds(5)
|
||||
serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10)
|
||||
|
||||
let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
|
||||
|
||||
try await serviceGroup.run()
|
||||
}
|
||||
|
||||
try await mqtt.shutdown()
|
||||
try await eventloopGroup.shutdownGracefully()
|
||||
} catch {
|
||||
try await eventloopGroup.shutdownGracefully()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Helpers
|
||||
|
||||
private func loadEnvVars(logger: Logger) -> EnvVars {
|
||||
let defaultEnvVars = EnvVars()
|
||||
let encoder = JSONEncoder()
|
||||
let decoder = JSONDecoder()
|
||||
|
||||
let defaultEnvDict = (try? encoder.encode(defaultEnvVars))
|
||||
.flatMap { try? decoder.decode([String: String].self, from: $0) }
|
||||
?? [:]
|
||||
|
||||
let envVarsDict = defaultEnvDict
|
||||
.merging(ProcessInfo.processInfo.environment, uniquingKeysWith: { $1 })
|
||||
|
||||
let envVars = (try? JSONSerialization.data(withJSONObject: envVarsDict))
|
||||
.flatMap { try? decoder.decode(EnvVars.self, from: $0) }
|
||||
?? defaultEnvVars
|
||||
|
||||
logger.debug("Done loading EnvVars...")
|
||||
|
||||
return envVars
|
||||
}
|
||||
|
||||
private extension MQTTNIO.MQTTClient {
|
||||
convenience init(envVars: EnvVars, eventLoopGroup: EventLoopGroup, logger: Logger?) {
|
||||
self.init(
|
||||
host: envVars.host,
|
||||
port: envVars.port != nil ? Int(envVars.port!) : nil,
|
||||
identifier: envVars.identifier,
|
||||
eventLoopGroupProvider: .shared(eventLoopGroup),
|
||||
logger: logger,
|
||||
configuration: .init(
|
||||
version: .v3_1_1,
|
||||
disablePing: false,
|
||||
userName: envVars.userName,
|
||||
password: envVars.password
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private extension Array where Element == TemperatureAndHumiditySensor {
|
||||
static var live: Self {
|
||||
TemperatureAndHumiditySensor.Location.allCases.map { location in
|
||||
TemperatureAndHumiditySensor(location: location)
|
||||
}
|
||||
}
|
||||
}
|
||||
215
Tests/IntegrationTests/IntegrationTests.swift
Normal file
215
Tests/IntegrationTests/IntegrationTests.swift
Normal file
@@ -0,0 +1,215 @@
|
||||
import Dependencies
|
||||
|
||||
// @_spi(Internal) import dewpoint_controller
|
||||
import Logging
|
||||
import Models
|
||||
import MQTTConnectionService
|
||||
@_spi(Internal) import MQTTManager
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import PsychrometricClientLive
|
||||
@_spi(Internal) import SensorsService
|
||||
import ServiceLifecycle
|
||||
import ServiceLifecycleTestKit
|
||||
import XCTest
|
||||
|
||||
final class IntegrationTests: XCTestCase {
|
||||
|
||||
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
||||
|
||||
static let logger: Logger = {
|
||||
var logger = Logger(label: "IntegrationTests")
|
||||
logger.logLevel = .info
|
||||
return logger
|
||||
}()
|
||||
|
||||
override func invokeTest() {
|
||||
let client = createClient(identifier: "\(Self.self)")
|
||||
|
||||
withDependencies {
|
||||
$0.mqtt = .live(client: client, logger: Self.logger)
|
||||
$0.psychrometricClient = PsychrometricClient.liveValue
|
||||
} operation: {
|
||||
super.invokeTest()
|
||||
}
|
||||
}
|
||||
|
||||
func testConnectionServiceShutdown() async throws {
|
||||
@Dependency(\.mqtt) var mqtt
|
||||
|
||||
let service = MQTTConnectionService(logger: Self.logger)
|
||||
let task = Task { try await service.run() }
|
||||
defer { task.cancel() }
|
||||
|
||||
try await Task.sleep(for: .milliseconds(200))
|
||||
|
||||
// check the connection is active here.
|
||||
try await mqtt.withClient { client in
|
||||
XCTAssert(client.isActive())
|
||||
}
|
||||
mqtt.shutdown()
|
||||
|
||||
try await Task.sleep(for: .milliseconds(500))
|
||||
|
||||
// check the connection is active here.
|
||||
try await mqtt.withClient { client in
|
||||
XCTAssertFalse(client.isActive())
|
||||
}
|
||||
}
|
||||
|
||||
func testMQTTConnectionStream() async throws {
|
||||
let client = createClient(identifier: "testNonManagedStream")
|
||||
let manager = MQTTManager.live(
|
||||
client: client,
|
||||
logger: Self.logger,
|
||||
alwaysReconnect: false
|
||||
)
|
||||
defer { manager.shutdown() }
|
||||
let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger)
|
||||
let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger)
|
||||
var events1 = [MQTTManager.Event]()
|
||||
var events2 = [MQTTManager.Event]()
|
||||
|
||||
let stream1 = connectionStream1.start()
|
||||
let stream2 = connectionStream2.start()
|
||||
|
||||
_ = try await manager.connect()
|
||||
|
||||
Task {
|
||||
while !client.isActive() {
|
||||
try await Task.sleep(for: .milliseconds(100))
|
||||
}
|
||||
try await Task.sleep(for: .milliseconds(200))
|
||||
try await client.disconnect()
|
||||
try await Task.sleep(for: .milliseconds(500))
|
||||
manager.shutdown()
|
||||
try await Task.sleep(for: .milliseconds(500))
|
||||
connectionStream1.stop()
|
||||
connectionStream2.stop()
|
||||
}
|
||||
|
||||
for await event in stream1.removeDuplicates() {
|
||||
events1.append(event)
|
||||
}
|
||||
for await event in stream2.removeDuplicates() {
|
||||
events2.append(event)
|
||||
}
|
||||
|
||||
XCTAssertEqual(events1, [.disconnected, .connected, .disconnected, .shuttingDown])
|
||||
XCTAssertEqual(events2, [.disconnected, .connected, .disconnected, .shuttingDown])
|
||||
}
|
||||
|
||||
func testListeningResumesAfterDisconnectThenReconnect() async throws {
|
||||
struct TimeoutError: Error {}
|
||||
|
||||
let sensor = TemperatureAndHumiditySensor(location: .return)
|
||||
let results = ResultContainer()
|
||||
|
||||
try await withDependencies {
|
||||
$0.mqtt.publish = results.append
|
||||
} operation: {
|
||||
@Dependency(\.mqtt) var manager
|
||||
|
||||
let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger)
|
||||
let task = Task { try await sensorsService.run() }
|
||||
defer { task.cancel() }
|
||||
|
||||
try await manager.connect()
|
||||
defer { manager.shutdown() }
|
||||
|
||||
try await manager.withClient { client in
|
||||
try await client.disconnect()
|
||||
try await client.connect()
|
||||
|
||||
while !client.isActive() {
|
||||
try await Task.sleep(for: .milliseconds(100))
|
||||
}
|
||||
|
||||
// Give time to re-subscribe.
|
||||
try await Task.sleep(for: .milliseconds(200))
|
||||
|
||||
try await client.publish(
|
||||
to: sensor.topics.temperature,
|
||||
payload: ByteBufferAllocator().buffer(string: "25"),
|
||||
qos: .atLeastOnce,
|
||||
retain: false
|
||||
)
|
||||
try await client.publish(
|
||||
to: sensor.topics.humidity,
|
||||
payload: ByteBufferAllocator().buffer(string: "50"),
|
||||
qos: .atLeastOnce,
|
||||
retain: false
|
||||
)
|
||||
}
|
||||
|
||||
var timeoutCount = 0
|
||||
while !(await results.count == 2) {
|
||||
guard timeoutCount < 20 else {
|
||||
throw TimeoutError()
|
||||
}
|
||||
try await Task.sleep(for: .milliseconds(100))
|
||||
timeoutCount += 1
|
||||
}
|
||||
|
||||
let results = await results.results()
|
||||
|
||||
XCTAssertEqual(results.count, 2)
|
||||
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.dewPoint }))
|
||||
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.enthalpy }))
|
||||
try await sensorsService.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
func createClient(identifier: String) -> MQTTClient {
|
||||
let envVars = EnvVars(
|
||||
appEnv: .testing,
|
||||
host: Self.hostname,
|
||||
port: "1883",
|
||||
identifier: identifier,
|
||||
userName: nil,
|
||||
password: nil
|
||||
)
|
||||
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
// return .init(envVars: envVars, eventLoopGroup: eventLoopGroup, logger: Self.logger)
|
||||
let config = MQTTClient.Configuration(
|
||||
version: .v3_1_1,
|
||||
userName: envVars.userName,
|
||||
password: envVars.password,
|
||||
useSSL: false,
|
||||
useWebSockets: false,
|
||||
tlsConfiguration: nil,
|
||||
webSocketURLPath: nil
|
||||
)
|
||||
return .init(
|
||||
host: Self.hostname,
|
||||
identifier: identifier,
|
||||
eventLoopGroupProvider: .shared(eventLoopGroup),
|
||||
logger: Self.logger,
|
||||
configuration: config
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// - MARK: Helpers
|
||||
|
||||
struct TopicNotFoundError: Error {}
|
||||
|
||||
actor ResultContainer: Sendable {
|
||||
|
||||
private var storage = [MQTTManager.PublishRequest]()
|
||||
|
||||
init() {}
|
||||
|
||||
@Sendable func append(_ result: MQTTManager.PublishRequest) async {
|
||||
storage.append(result)
|
||||
}
|
||||
|
||||
var count: Int {
|
||||
get async { storage.count }
|
||||
}
|
||||
|
||||
func results() async -> [MQTTManager.PublishRequest] {
|
||||
storage
|
||||
}
|
||||
}
|
||||
@@ -1,92 +0,0 @@
|
||||
import AsyncAlgorithms
|
||||
import Logging
|
||||
import Models
|
||||
@_spi(Internal) import MQTTConnectionManager
|
||||
import MQTTConnectionService
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import ServiceLifecycle
|
||||
import ServiceLifecycleTestKit
|
||||
import XCTest
|
||||
|
||||
final class MQTTConnectionServiceTests: XCTestCase {
|
||||
|
||||
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
||||
|
||||
static let logger: Logger = {
|
||||
var logger = Logger(label: "MQTTConnectionServiceTests")
|
||||
logger.logLevel = .trace
|
||||
return logger
|
||||
}()
|
||||
|
||||
// TODO: Move to integration tests.
|
||||
func testMQTTConnectionStream() async throws {
|
||||
let client = createClient(identifier: "testNonManagedStream")
|
||||
let manager = MQTTConnectionManager.live(
|
||||
client: client,
|
||||
logger: Self.logger,
|
||||
alwaysReconnect: false
|
||||
)
|
||||
let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger)
|
||||
let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger)
|
||||
var events1 = [MQTTConnectionManager.Event]()
|
||||
var events2 = [MQTTConnectionManager.Event]()
|
||||
|
||||
let stream1 = connectionStream1.start()
|
||||
let stream2 = connectionStream2.start()
|
||||
|
||||
_ = try await manager.connect()
|
||||
|
||||
Task {
|
||||
while !client.isActive() {
|
||||
try await Task.sleep(for: .milliseconds(100))
|
||||
}
|
||||
try await Task.sleep(for: .milliseconds(200))
|
||||
manager.shutdown()
|
||||
try await client.disconnect()
|
||||
try await Task.sleep(for: .seconds(1))
|
||||
try await client.shutdown()
|
||||
try await Task.sleep(for: .seconds(1))
|
||||
connectionStream1.stop()
|
||||
connectionStream2.stop()
|
||||
}
|
||||
|
||||
for await event in stream1.removeDuplicates() {
|
||||
events1.append(event)
|
||||
}
|
||||
for await event in stream2.removeDuplicates() {
|
||||
events2.append(event)
|
||||
}
|
||||
|
||||
XCTAssertEqual(events1, [.disconnected, .connected, .disconnected, .shuttingDown])
|
||||
XCTAssertEqual(events2, [.disconnected, .connected, .disconnected, .shuttingDown])
|
||||
}
|
||||
|
||||
func createClient(identifier: String) -> MQTTClient {
|
||||
let envVars = EnvVars(
|
||||
appEnv: .testing,
|
||||
host: Self.hostname,
|
||||
port: "1883",
|
||||
identifier: identifier,
|
||||
userName: nil,
|
||||
password: nil
|
||||
)
|
||||
let config = MQTTClient.Configuration(
|
||||
version: .v3_1_1,
|
||||
userName: envVars.userName,
|
||||
password: envVars.password,
|
||||
useSSL: false,
|
||||
useWebSockets: false,
|
||||
tlsConfiguration: nil,
|
||||
webSocketURLPath: nil
|
||||
)
|
||||
return .init(
|
||||
host: Self.hostname,
|
||||
identifier: identifier,
|
||||
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
|
||||
logger: Self.logger,
|
||||
configuration: config
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,175 +0,0 @@
|
||||
import Dependencies
|
||||
import Logging
|
||||
import Models
|
||||
@_spi(Internal) import MQTTConnectionManager
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
import PsychrometricClientLive
|
||||
@_spi(Internal) import SensorsService
|
||||
import TopicDependencies
|
||||
import XCTest
|
||||
|
||||
final class SensorsClientTests: XCTestCase {
|
||||
|
||||
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
||||
|
||||
static let logger: Logger = {
|
||||
var logger = Logger(label: "SensorsClientTests")
|
||||
logger.logLevel = .trace
|
||||
return logger
|
||||
}()
|
||||
|
||||
override func invokeTest() {
|
||||
let client = createClient(identifier: "\(Self.self)")
|
||||
|
||||
withDependencies {
|
||||
$0.mqttConnectionManager = .live(client: client, logger: Self.logger)
|
||||
$0.psychrometricClient = PsychrometricClient.liveValue
|
||||
$0.topicListener = .live(client: client)
|
||||
$0.topicPublisher = .live(client: client)
|
||||
} operation: {
|
||||
super.invokeTest()
|
||||
}
|
||||
}
|
||||
|
||||
func testListeningResumesAfterDisconnectThenReconnect() async throws {
|
||||
@Dependency(\.mqttConnectionManager) var manager
|
||||
struct TimeoutError: Error {}
|
||||
|
||||
let sensor = TemperatureAndHumiditySensor(location: .return)
|
||||
var results = [TopicPublisher.PublishRequest]()
|
||||
|
||||
try await withDependencies {
|
||||
$0.topicPublisher = .capturing { results.append($0) }
|
||||
} operation: {
|
||||
let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger)
|
||||
let task = Task { try await sensorsService.run() }
|
||||
defer { task.cancel() }
|
||||
|
||||
try await manager.connect()
|
||||
defer { manager.shutdown() }
|
||||
|
||||
try await manager.withClient { client in
|
||||
try await client.disconnect()
|
||||
try await client.connect()
|
||||
try await Task.sleep(for: .milliseconds(100))
|
||||
try await client.publish(
|
||||
to: sensor.topics.temperature,
|
||||
payload: ByteBufferAllocator().buffer(string: "25"),
|
||||
qos: .atLeastOnce,
|
||||
retain: false
|
||||
)
|
||||
try await client.publish(
|
||||
to: sensor.topics.humidity,
|
||||
payload: ByteBufferAllocator().buffer(string: "50"),
|
||||
qos: .atLeastOnce,
|
||||
retain: false
|
||||
)
|
||||
}
|
||||
|
||||
var timeoutCount = 0
|
||||
while !(results.count == 2) {
|
||||
guard timeoutCount < 20 else {
|
||||
throw TimeoutError()
|
||||
}
|
||||
try await Task.sleep(for: .milliseconds(100))
|
||||
timeoutCount += 1
|
||||
}
|
||||
|
||||
XCTAssertEqual(results.count, 2)
|
||||
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.dewPoint }))
|
||||
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.enthalpy }))
|
||||
try await sensorsService.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
func createClient(identifier: String) -> MQTTClient {
|
||||
let envVars = EnvVars(
|
||||
appEnv: .testing,
|
||||
host: Self.hostname,
|
||||
port: "1883",
|
||||
identifier: identifier,
|
||||
userName: nil,
|
||||
password: nil
|
||||
)
|
||||
let config = MQTTClient.Configuration(
|
||||
version: .v3_1_1,
|
||||
userName: envVars.userName,
|
||||
password: envVars.password,
|
||||
useSSL: false,
|
||||
useWebSockets: false,
|
||||
tlsConfiguration: nil,
|
||||
webSocketURLPath: nil
|
||||
)
|
||||
return .init(
|
||||
host: Self.hostname,
|
||||
identifier: identifier,
|
||||
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
|
||||
logger: Self.logger,
|
||||
configuration: config
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: Helpers for tests.
|
||||
|
||||
class PublishInfoContainer {
|
||||
private(set) var info: [MQTTPublishInfo]
|
||||
private var topicFilters: [String]?
|
||||
|
||||
init(topicFilters: [String]? = nil) {
|
||||
self.info = []
|
||||
self.topicFilters = topicFilters
|
||||
}
|
||||
|
||||
func addPublishInfo(_ info: MQTTPublishInfo) async {
|
||||
guard let topicFilters else {
|
||||
self.info.append(info)
|
||||
return
|
||||
}
|
||||
if topicFilters.contains(info.topicName) {
|
||||
self.info.append(info)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extension TopicPublisher {
|
||||
static func capturing(
|
||||
_ callback: @escaping (PublishRequest) -> Void
|
||||
) -> Self {
|
||||
.init { callback($0) }
|
||||
}
|
||||
}
|
||||
|
||||
// extension SensorsClient {
|
||||
//
|
||||
// static func testing(
|
||||
// yielding: [(value: Double, to: String)],
|
||||
// capturePublishedValues: @escaping (Double, String) -> Void,
|
||||
// captureShutdownEvent: @escaping (Bool) -> Void
|
||||
// ) -> Self {
|
||||
// let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self)
|
||||
// let logger = Logger(label: "\(Self.self).testing")
|
||||
//
|
||||
// return .init(
|
||||
// listen: { topics in
|
||||
// for (value, topic) in yielding where topics.contains(topic) {
|
||||
// continuation.yield(
|
||||
// (buffer: ByteBuffer(string: "\(value)"), topic: topic)
|
||||
// )
|
||||
// }
|
||||
// return stream
|
||||
// },
|
||||
// logger: logger,
|
||||
// publish: { value, topic in
|
||||
// capturePublishedValues(value, topic)
|
||||
// },
|
||||
// shutdown: {
|
||||
// captureShutdownEvent(true)
|
||||
// continuation.finish()
|
||||
// }
|
||||
// )
|
||||
// }
|
||||
// }
|
||||
|
||||
struct TopicNotFoundError: Error {}
|
||||
@@ -1,53 +0,0 @@
|
||||
# run this with docker-compose run test
|
||||
services:
|
||||
server:
|
||||
image: swift-mqtt-dewpoint:latest
|
||||
restart: unless-stopped
|
||||
env_file: .env
|
||||
|
||||
local:
|
||||
container_name: local-server
|
||||
build:
|
||||
context: .
|
||||
dockerfile: Dockerfile
|
||||
depends_on:
|
||||
- mosquitto
|
||||
environment:
|
||||
- MQTT_HOST=mosquitto
|
||||
|
||||
test:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: Dockerfile.test
|
||||
working_dir: /app
|
||||
networks:
|
||||
- test
|
||||
depends_on:
|
||||
- mosquitto-test
|
||||
environment:
|
||||
- MOSQUITTO_SERVER=mosquitto-test
|
||||
command: /bin/bash -xc "swift package clean && swift test"
|
||||
|
||||
mosquitto-test:
|
||||
image: eclipse-mosquitto
|
||||
networks:
|
||||
- test
|
||||
volumes:
|
||||
- ./mosquitto/config:/mosquitto/config
|
||||
- ./mosquitto/certs:/mosquitto/certs
|
||||
|
||||
mosquitto:
|
||||
image: eclipse-mosquitto
|
||||
volumes:
|
||||
- ./mosquitto/config:/mosquitto/config
|
||||
- ./mosquitto/certs:/mosquitto/certs
|
||||
ports:
|
||||
- "1883:1883"
|
||||
- "8883:8883"
|
||||
- "8080:8080"
|
||||
- "8081:8081"
|
||||
|
||||
networks:
|
||||
test:
|
||||
driver: bridge
|
||||
external: false
|
||||
15
docker/Dockerfile
Executable file
15
docker/Dockerfile
Executable file
@@ -0,0 +1,15 @@
|
||||
# Used this to build the release version of the image.
|
||||
# Build the executable
|
||||
ARG SWIFT_IMAGE_VERSION="5.10"
|
||||
|
||||
FROM swift:${SWIFT_IMAGE_VERSION} AS build
|
||||
WORKDIR /build
|
||||
COPY ./Package.* ./
|
||||
RUN swift package resolve
|
||||
COPY . .
|
||||
RUN swift build -c release -Xswiftc -g
|
||||
|
||||
# Run image
|
||||
FROM swift:${SWIFT_IMAGE_VERSION}-slim
|
||||
COPY --from=build /build/.build/release/dewpoint-controller /usr/local/bin
|
||||
CMD ["/bin/bash", "-xc", "/usr/local/bin/dewpoint-controller run"]
|
||||
5
docker/Dockerfile.mosquitto
Normal file
5
docker/Dockerfile.mosquitto
Normal file
@@ -0,0 +1,5 @@
|
||||
# Used to build a local MQTT broker for development and
|
||||
# testing.
|
||||
FROM eclipse-mosquitto:latest
|
||||
COPY ./mosquitto/config/mosquitto.conf /mosquitto/config/mosquitto.conf
|
||||
EXPOSE 1883
|
||||
@@ -1,6 +1,8 @@
|
||||
# Used to build a test image.
|
||||
FROM swift:5.10
|
||||
WORKDIR /app
|
||||
COPY ./Package.* ./
|
||||
RUN swift package resolve
|
||||
COPY . .
|
||||
RUN swift build
|
||||
CMD ["/bin/bash", "-xc", "swift", "test"]
|
||||
17
docker/docker-compose-test.yaml
Executable file
17
docker/docker-compose-test.yaml
Executable file
@@ -0,0 +1,17 @@
|
||||
# run this with docker-compose run test
|
||||
services:
|
||||
test:
|
||||
build:
|
||||
context: ..
|
||||
dockerfile: docker/Dockerfile.test
|
||||
working_dir: /app
|
||||
depends_on:
|
||||
- mosquitto
|
||||
environment:
|
||||
- MOSQUITTO_SERVER=mosquitto
|
||||
command: /bin/bash -xc "swift test"
|
||||
|
||||
mosquitto:
|
||||
build:
|
||||
context: ..
|
||||
dockerfile: docker/Dockerfile.mosquitto
|
||||
18
docker/docker-compose.yaml
Executable file
18
docker/docker-compose.yaml
Executable file
@@ -0,0 +1,18 @@
|
||||
# run this with docker-compose run dewpoint_controller
|
||||
services:
|
||||
dewpoint_controller:
|
||||
container_name: dewpoint-controller
|
||||
build:
|
||||
context: ..
|
||||
dockerfile: docker/Dockerfile
|
||||
depends_on:
|
||||
- mosquitto
|
||||
environment:
|
||||
- MQTT_HOST=mosquitto
|
||||
|
||||
mosquitto:
|
||||
build:
|
||||
context: ..
|
||||
dockerfile: docker/Dockerfile.mosquitto
|
||||
ports:
|
||||
- "1883:1883"
|
||||
Reference in New Issue
Block a user