diff --git a/Makefile b/Makefile index 723185a..d4ee37f 100755 --- a/Makefile +++ b/Makefile @@ -23,7 +23,7 @@ stop-mosquitto: @docker-compose rm -f mosquitto || true test-docker: - @docker-compose run --remove-orphans -i --rm test + @docker-compose run --build --remove-orphans -i --rm test @docker-compose kill mosquitto-test @docker-compose rm -f diff --git a/Package.swift b/Package.swift index 120730b..ee0f1d7 100755 --- a/Package.swift +++ b/Package.swift @@ -40,10 +40,6 @@ let package = Package( .product(name: "PsychrometricClientLive", package: "swift-psychrometrics") ] ), - .testTarget( - name: "dewPoint-controllerTests", - dependencies: ["dewPoint-controller"] - ), .target( name: "Models", dependencies: [ diff --git a/Sources/MQTTConnectionService/MQTTConnectionService.swift b/Sources/MQTTConnectionService/MQTTConnectionService.swift index b7e123d..ded1dd4 100644 --- a/Sources/MQTTConnectionService/MQTTConnectionService.swift +++ b/Sources/MQTTConnectionService/MQTTConnectionService.swift @@ -71,7 +71,7 @@ public actor MQTTConnectionService: Service { } } - private nonisolated func shutdown() { + nonisolated func shutdown() { logger.debug("Begin shutting down MQTT broker connection.") client.removeCloseListener(named: name) internalEventStream.stop() diff --git a/Sources/Models/TrackedChanges.swift b/Sources/Models/TrackedChanges.swift index 6114ac4..a1b60cf 100755 --- a/Sources/Models/TrackedChanges.swift +++ b/Sources/Models/TrackedChanges.swift @@ -3,7 +3,7 @@ /// This allows values to only publish changes if they have changed since the /// last time they were recieved. @propertyWrapper -public struct TrackedChanges { +public struct TrackedChanges: Sendable { /// The current tracking state. private var tracking: TrackingState @@ -12,7 +12,7 @@ public struct TrackedChanges { private var value: Value /// Used to check if a new value is equal to an old value. - private var isEqual: (Value, Value) -> Bool + private var isEqual: @Sendable (Value, Value) -> Bool /// Access to the underlying property that we are wrapping. public var wrappedValue: Value { @@ -35,7 +35,7 @@ public struct TrackedChanges { public init( wrappedValue: Value, needsProcessed: Bool = false, - isEqual: @escaping (Value, Value) -> Bool + isEqual: @escaping @Sendable (Value, Value) -> Bool ) { self.value = wrappedValue self.tracking = needsProcessed ? .needsProcessed : .hasProcessed @@ -85,7 +85,9 @@ extension TrackedChanges: Equatable where Value: Equatable { wrappedValue: Value, needsProcessed: Bool = false ) { - self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed, isEqual: ==) + self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed) { + $0 == $1 + } } } @@ -96,5 +98,3 @@ extension TrackedChanges: Hashable where Value: Hashable { hasher.combine(needsProcessed) } } - -extension TrackedChanges: Sendable where Value: Sendable {} diff --git a/Sources/SensorsService/SensorsService.swift b/Sources/SensorsService/SensorsService.swift index 4518029..29cad79 100644 --- a/Sources/SensorsService/SensorsService.swift +++ b/Sources/SensorsService/SensorsService.swift @@ -88,10 +88,10 @@ public actor SensorsService: Service { let stream = try await client.listen(to: topics) - try await withGracefulShutdownHandler { - try await withThrowingDiscardingTaskGroup { group in + await withGracefulShutdownHandler { + await withDiscardingTaskGroup { group in for await result in stream { - group.addTask { try await self.handleResult(result) } + group.addTask { await self.handleResult(result) } } } } onGracefulShutdown: { @@ -110,37 +110,41 @@ public actor SensorsService: Service { } } - private func handleResult(_ result: SensorsClient.PublishInfo) async throws { - let topic = result.topic - assert(topics.contains(topic)) - client.logger?.trace("Begin handling result for topic: \(topic)") + private func handleResult(_ result: SensorsClient.PublishInfo) async { + do { + let topic = result.topic + assert(topics.contains(topic)) + client.logger?.trace("Begin handling result for topic: \(topic)") - func decode(_: V.Type) -> V? { - var buffer = result.buffer - return V(buffer: &buffer) - } - - if topic.contains("temperature") { - client.logger?.trace("Begin handling temperature result.") - guard let temperature = decode(DryBulb.self) else { - client.logger?.trace("Failed to decode temperature: \(result.buffer)") - throw DecodingError() + func decode(_: V.Type) -> V? { + var buffer = result.buffer + return V(buffer: &buffer) } - client.logger?.trace("Decoded temperature: \(temperature)") - try sensors.update(topic: topic, keyPath: \.temperature, with: temperature) - } else if topic.contains("humidity") { - client.logger?.trace("Begin handling humidity result.") - guard let humidity = decode(RelativeHumidity.self) else { - client.logger?.trace("Failed to decode humidity: \(result.buffer)") - throw DecodingError() + if topic.contains("temperature") { + client.logger?.trace("Begin handling temperature result.") + guard let temperature = decode(DryBulb.self) else { + client.logger?.trace("Failed to decode temperature: \(result.buffer)") + throw DecodingError() + } + client.logger?.trace("Decoded temperature: \(temperature)") + try sensors.update(topic: topic, keyPath: \.temperature, with: temperature) + + } else if topic.contains("humidity") { + client.logger?.trace("Begin handling humidity result.") + guard let humidity = decode(RelativeHumidity.self) else { + client.logger?.trace("Failed to decode humidity: \(result.buffer)") + throw DecodingError() + } + client.logger?.trace("Decoded humidity: \(humidity)") + try sensors.update(topic: topic, keyPath: \.humidity, with: humidity) } - client.logger?.trace("Decoded humidity: \(humidity)") - try sensors.update(topic: topic, keyPath: \.humidity, with: humidity) - } - try await publishUpdates() - client.logger?.trace("Done handling result for topic: \(topic)") + try await publishUpdates() + client.logger?.trace("Done handling result for topic: \(topic)") + } catch { + client.logger?.error("Received error: \(error)") + } } private func publish(_ double: Double?, to topic: String) async throws { diff --git a/Sources/dewPoint-controller/Application.swift b/Sources/dewPoint-controller/Application.swift index 59dab0d..1c6af48 100644 --- a/Sources/dewPoint-controller/Application.swift +++ b/Sources/dewPoint-controller/Application.swift @@ -11,6 +11,7 @@ import ServiceLifecycle @main struct Application { + /// The main entry point of the application. static func main() async throws { let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) diff --git a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift b/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift index 9ab7825..75f7cd0 100644 --- a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift +++ b/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift @@ -1,4 +1,3 @@ -import Combine import Logging import Models @testable import MQTTConnectionService @@ -13,22 +12,19 @@ final class MQTTConnectionServiceTests: XCTestCase { static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost" static let logger: Logger = { - var logger = Logger(label: "AsyncClientTests") + var logger = Logger(label: "MQTTConnectionServiceTests") logger.logLevel = .trace return logger }() func testGracefulShutdownWorks() async throws { - try await testGracefulShutdown { trigger in - let client = createClient(identifier: "testGracefulShutdown") - let service = MQTTConnectionService(client: client) - try await service.run() - try await Task.sleep(for: .seconds(1)) - XCTAssert(client.isActive()) - trigger.triggerGracefulShutdown() - // try await Task.sleep(for: .seconds(2)) - // XCTAssertFalse(client.isActive()) - } + let client = createClient(identifier: "testGracefulShutdown") + let service = MQTTConnectionService(client: client) + await service.connect() + try await Task.sleep(for: .seconds(1)) + XCTAssert(client.isActive()) + service.shutdown() + XCTAssertFalse(client.isActive()) } func createClient(identifier: String) -> MQTTClient { @@ -58,65 +54,4 @@ final class MQTTConnectionServiceTests: XCTestCase { ) } - func testEventStream() async throws { - var connection: ConnectionStream? = ConnectionStream() - - let task = Task { - guard let events = connection?.events else { return } - print("before loop") - for await event in events { - print("\(event)") - } - print("after loop") - } - - let ending = Task { - try await Task.sleep(for: .seconds(2)) - connection = nil - } - - connection?.start() - try await ending.value - task.cancel() - } - -} - -class ConnectionStream { - - enum Event { - case connected - case disconnected - case shuttingDown - } - - let events: AsyncStream - private let continuation: AsyncStream.Continuation - private var cancellable: AnyCancellable? - - init() { - let (stream, continuation) = AsyncStream.makeStream(of: Event.self) - self.events = stream - self.continuation = continuation - } - - deinit { - print("connection stream is gone.") - stop() - } - - func start() { - cancellable = Timer.publish(every: 1.0, on: .main, in: .common) - .autoconnect() - .sink { [weak self] _ in - print("will send event.") - self?.continuation.yield(.connected) - } - } - - func stop() { - continuation.yield(.shuttingDown) - cancellable = nil - continuation.finish() - } } diff --git a/Tests/SensorsServiceTests/SensorsClientTests.swift b/Tests/SensorsServiceTests/SensorsClientTests.swift index 679d46c..5f02291 100755 --- a/Tests/SensorsServiceTests/SensorsClientTests.swift +++ b/Tests/SensorsServiceTests/SensorsClientTests.swift @@ -12,7 +12,7 @@ final class SensorsClientTests: XCTestCase { static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost" static let logger: Logger = { - var logger = Logger(label: "AsyncClientTests") + var logger = Logger(label: "SensorsClientTests") logger.logLevel = .debug return logger }() diff --git a/Tests/dewPoint-controllerTests/dewPoint_controllerTests.swift b/Tests/dewPoint-controllerTests/dewPoint_controllerTests.swift deleted file mode 100755 index ab4b519..0000000 --- a/Tests/dewPoint-controllerTests/dewPoint_controllerTests.swift +++ /dev/null @@ -1,47 +0,0 @@ -import XCTest -import class Foundation.Bundle - -//final class dewPoint_controllerTests: XCTestCase { -// func testExample() throws { -// // This is an example of a functional test case. -// // Use XCTAssert and related functions to verify your tests produce the correct -// // results. -// -// // Some of the APIs that we use below are available in macOS 10.13 and above. -// guard #available(macOS 10.13, *) else { -// return -// } -// -// // Mac Catalyst won't have `Process`, but it is supported for executables. -// #if !targetEnvironment(macCatalyst) -// -// let fooBinary = productsDirectory.appendingPathComponent("dewPoint-controller") -// -// let process = Process() -// process.executableURL = fooBinary -// -// let pipe = Pipe() -// process.standardOutput = pipe -// -// try process.run() -// process.waitUntilExit() -// -// let data = pipe.fileHandleForReading.readDataToEndOfFile() -// let output = String(data: data, encoding: .utf8) -// -// XCTAssertEqual(output, "Hello, world!\n") -// #endif -// } -// -// /// Returns path to the built products directory. -// var productsDirectory: URL { -// #if os(macOS) -// for bundle in Bundle.allBundles where bundle.bundlePath.hasSuffix(".xctest") { -// return bundle.bundleURL.deletingLastPathComponent() -// } -// fatalError("couldn't find the products directory") -// #else -// return Bundle.main.bundleURL -// #endif -// } -//}