From 163f603b69f25a542b60e500cc74dbc41c5d0315 Mon Sep 17 00:00:00 2001 From: Michael Housh Date: Thu, 14 Nov 2024 14:58:09 -0500 Subject: [PATCH] feat: Fixes some tests and docker builds --- Dockerfile | 4 +- Package.resolved | 6 +- Package.swift | 4 +- Sources/MQTTConnectionManager/Live.swift | 56 ++-- Sources/SensorsService/SensorsService.swift | 62 ++-- Sources/TopicDependencies/TopicListener.swift | 13 +- .../MQTTConnectionServiceTests.swift | 46 ++- .../SensorsClientTests.swift | 265 ++++-------------- docker-compose.yaml | 2 - 9 files changed, 176 insertions(+), 282 deletions(-) diff --git a/Dockerfile b/Dockerfile index 11db039..9a03055 100755 --- a/Dockerfile +++ b/Dockerfile @@ -5,10 +5,10 @@ WORKDIR /build COPY ./Package.* ./ RUN swift package resolve COPY . . -RUN swift build --enable-test-discovery -c release -Xswiftc -g +RUN swift build -c release -Xswiftc -g # Run image FROM swift:5.10-slim WORKDIR /run -COPY --from=build /build/.build/release/dewPoint-controller /run +COPY --from=build /build/.build/release/dewpoint-controller /run CMD ["/bin/bash", "-xc", "./dewpoint-controller"] diff --git a/Package.resolved b/Package.resolved index 379a782..6da5981 100755 --- a/Package.resolved +++ b/Package.resolved @@ -1,5 +1,5 @@ { - "originHash" : "d3104d51323f6bc98cf3ab2930e5a26f72c9a4fdb7640360ba27628672397841", + "originHash" : "c2538e3229d6c80f3d6a979c2f3605d63b4973e1e49786819017473eb2916f4e", "pins" : [ { "identity" : "combine-schedulers", @@ -69,8 +69,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/pointfreeco/swift-dependencies", "state" : { - "revision" : "0fc0255e780bf742abeef29dec80924f5f0ae7b9", - "version" : "1.4.1" + "revision" : "96eecd47660e8307877acb8c41cc5295ba7350a7", + "version" : "1.5.2" } }, { diff --git a/Package.swift b/Package.swift index f968129..fb79fa9 100755 --- a/Package.swift +++ b/Package.swift @@ -24,7 +24,7 @@ let package = Package( .package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"), .package(url: "https://github.com/apple/swift-nio", from: "2.0.0"), .package(url: "https://github.com/apple/swift-log", from: "1.6.0"), - .package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.4.1"), + .package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.5.2"), .package(url: "https://github.com/swift-psychrometrics/swift-psychrometrics", exact: "0.2.3"), .package(url: "https://github.com/swift-server-community/mqtt-nio.git", from: "2.0.0"), .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.3.0") @@ -82,7 +82,7 @@ let package = Package( name: "SensorsService", dependencies: [ "Models", - "MQTTConnectionService", + "MQTTConnectionManager", "TopicDependencies", .product(name: "Dependencies", package: "swift-dependencies"), .product(name: "DependenciesMacros", package: "swift-dependencies"), diff --git a/Sources/MQTTConnectionManager/Live.swift b/Sources/MQTTConnectionManager/Live.swift index 723c17b..47e1fa2 100644 --- a/Sources/MQTTConnectionManager/Live.swift +++ b/Sources/MQTTConnectionManager/Live.swift @@ -33,6 +33,17 @@ public struct MQTTConnectionManager: Sendable { /// Create a stream of connection events. public var stream: @Sendable () throws -> AsyncStream + /// Perform an operation with the underlying MQTTClient, this can be useful in + /// tests, so this module needs imported with `@_spi(Testing) import` to use this method. + private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void + + @_spi(Internal) + public func withClient( + _ callback: @Sendable (MQTTClient) async throws -> Void + ) async throws { + try await _withClient(callback) + } + /// Represents connection events that clients can listen for and /// react accordingly. public enum Event: Sendable { @@ -61,6 +72,8 @@ public struct MQTTConnectionManager: Sendable { .start() .removeDuplicates() .eraseToStream() + } _withClient: { callback in + try await callback(client) } } } @@ -73,18 +86,19 @@ extension MQTTConnectionManager: TestDependencyKey { // MARK: - Helpers -final class MQTTConnectionStream: AsyncSequence, Sendable { +@_spi(Internal) +public final actor MQTTConnectionStream: Sendable { - typealias AsyncIterator = AsyncStream.AsyncIterator - typealias Element = MQTTConnectionManager.Event + public typealias Element = MQTTConnectionManager.Event private let client: MQTTClient private let continuation: AsyncStream.Continuation private let logger: Logger? private let name: String private let stream: AsyncStream + private var isShuttingDown = false - init(client: MQTTClient, logger: Logger?) { + public init(client: MQTTClient, logger: Logger?) { let (stream, continuation) = AsyncStream.makeStream() self.client = client self.continuation = continuation @@ -95,12 +109,19 @@ final class MQTTConnectionStream: AsyncSequence, Sendable { deinit { stop() } - func start( - isolation: isolated (any Actor)? = #isolation - ) -> AsyncStream { - // Check if the client is active and yield the result. + public nonisolated func start() -> AsyncStream { + // Check if the client is active and yield the initial result. continuation.yield(client.isActive() ? .connected : .disconnected) + // Continually check if the client is active. + let task = Task { + let isShuttingDown = await self.isShuttingDown + while !Task.isCancelled, !isShuttingDown { + try await Task.sleep(for: .milliseconds(100)) + continuation.yield(client.isActive() ? .connected : .disconnected) + } + } + // Register listener on the client for when the connection // closes. client.addCloseListener(named: name) { _ in @@ -111,24 +132,26 @@ final class MQTTConnectionStream: AsyncSequence, Sendable { // Register listener on the client for when the client // is shutdown. client.addShutdownListener(named: name) { _ in - self.logger?.trace("Client is shutting down.") + self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)") self.continuation.yield(.shuttingDown) + Task { await self.setIsShuttingDown() } + task.cancel() self.stop() } return stream } - func stop() { + private func setIsShuttingDown() { + isShuttingDown = true + } + + public nonisolated func stop() { client.removeCloseListener(named: name) client.removeShutdownListener(named: name) continuation.finish() } - public __consuming func makeAsyncIterator() -> AsyncIterator { - start().makeAsyncIterator() - } - } actor ConnectionManager { @@ -160,13 +183,12 @@ actor ConnectionManager { } func connect( - isolation: isolated (any Actor)? = #isolation, cleanSession: Bool ) async throws { - guard !(await hasConnected) else { return } + guard !hasConnected else { return } do { try await client.connect(cleanSession: cleanSession) - await setHasConnected() + setHasConnected() client.addCloseListener(named: name) { [weak self] _ in guard let `self` else { return } diff --git a/Sources/SensorsService/SensorsService.swift b/Sources/SensorsService/SensorsService.swift index 3e65664..978cfc2 100644 --- a/Sources/SensorsService/SensorsService.swift +++ b/Sources/SensorsService/SensorsService.swift @@ -3,6 +3,7 @@ import DependenciesMacros import Foundation import Logging import Models +import MQTTConnectionManager import MQTTNIO import NIO import PsychrometricClient @@ -16,6 +17,7 @@ import TopicDependencies /// public actor SensorsService: Service { + @Dependency(\.mqttConnectionManager.stream) var connectionStream @Dependency(\.topicListener) var topicListener @Dependency(\.topicPublisher) var topicPublisher @@ -55,25 +57,41 @@ public actor SensorsService: Service { public func run() async throws { precondition(sensors.count > 0, "Sensors should not be empty.") - let stream = try await makeStream() - - await withGracefulShutdownHandler { - await withDiscardingTaskGroup { group in - for await result in stream.cancelOnGracefulShutdown() { - logger?.trace("Received result for topic: \(result.topic)") - group.addTask { await self.handleResult(result) } + try await withGracefulShutdownHandler { + // Listen for connection events, so that we can automatically + // reconnect any sensor topics we're listening to upon a disconnect / reconnect + // event. We can also shutdown any topic listeners upon a shutdown event. + for await event in try connectionStream().cancelOnGracefulShutdown() { + switch event { + case .shuttingDown: + logger?.debug("Received shutdown event.") + try await self.shutdown() + case .disconnected: + logger?.debug("Received disconnected event.") + try await Task.sleep(for: .milliseconds(100)) + case .connected: + logger?.debug("Received connected event.") + let stream = try await makeStream() + for await result in stream.cancelOnGracefulShutdown() { + logger?.debug("Received result for topic: \(result.topic)") + await self.handleResult(result) + } } - // group.cancelAll() } } onGracefulShutdown: { Task { - self.logger?.trace("Received graceful shutdown.") - try? await self.publishUpdates() - await self.topicListener.shutdown() + self.logger?.debug("Received graceful shutdown.") + try await self.shutdown() } } } + @_spi(Internal) + public func shutdown() async throws { + try await publishUpdates() + topicListener.shutdown() + } + private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> { try await topicListener.listen(to: topics) // ignore errors, so that we continue to listen, but log them @@ -81,7 +99,7 @@ public actor SensorsService: Service { .compactMap { result in switch result { case let .failure(error): - self.logger?.trace("Received error listening for sensors: \(error)") + self.logger?.debug("Received error listening for sensors: \(error)") return nil case let .success(info): return (info.payload, info.topicName) @@ -100,7 +118,7 @@ public actor SensorsService: Service { do { let topic = result.topic assert(topics.contains(topic)) - logger?.trace("Begin handling result for topic: \(topic)") + logger?.debug("Begin handling result for topic: \(topic)") func decode(_: V.Type) -> V? { var buffer = result.buffer @@ -108,28 +126,28 @@ public actor SensorsService: Service { } if topic.contains("temperature") { - logger?.trace("Begin handling temperature result.") + logger?.debug("Begin handling temperature result.") guard let temperature = decode(DryBulb.self) else { - logger?.trace("Failed to decode temperature: \(result.buffer)") + logger?.debug("Failed to decode temperature: \(result.buffer)") throw DecodingError() } - logger?.trace("Decoded temperature: \(temperature)") + logger?.debug("Decoded temperature: \(temperature)") try sensors.update(topic: topic, keyPath: \.temperature, with: temperature) } else if topic.contains("humidity") { - logger?.trace("Begin handling humidity result.") + logger?.debug("Begin handling humidity result.") guard let humidity = decode(RelativeHumidity.self) else { - logger?.trace("Failed to decode humidity: \(result.buffer)") + logger?.debug("Failed to decode humidity: \(result.buffer)") throw DecodingError() } - logger?.trace("Decoded humidity: \(humidity)") + logger?.debug("Decoded humidity: \(humidity)") try sensors.update(topic: topic, keyPath: \.humidity, with: humidity) } try await publishUpdates() - logger?.trace("Done handling result for topic: \(topic)") + logger?.debug("Done handling result for topic: \(topic)") } catch { - logger?.error("Received error: \(error)") + logger?.error("Received error while handling result: \(error)") } } @@ -141,7 +159,7 @@ public actor SensorsService: Service { qos: .exactlyOnce, retain: true ) - logger?.trace("Published update to topic: \(topic)") + logger?.debug("Published update to topic: \(topic)") } private func publishUpdates() async throws { diff --git a/Sources/TopicDependencies/TopicListener.swift b/Sources/TopicDependencies/TopicListener.swift index c3c986f..6a50059 100644 --- a/Sources/TopicDependencies/TopicListener.swift +++ b/Sources/TopicDependencies/TopicListener.swift @@ -10,7 +10,7 @@ import MQTTNIO @DependencyClient public struct TopicListener: Sendable { - public typealias Stream = AsyncStream> + public typealias Stream = AsyncStream> /// Create an async stream that listens for changes to the given topics. private var _listen: @Sendable ([String], MQTTQoS) async throws -> Stream @@ -82,6 +82,7 @@ public extension DependencyValues { // MARK: - Helpers private actor MQTTTopicListener { + private let client: MQTTClient private let continuation: TopicListener.Stream.Continuation private let name: String @@ -116,12 +117,12 @@ private actor MQTTTopicListener { func listen( _ topics: [String], _ qos: MQTTQoS = .atLeastOnce - ) async throws(TopicListenerError) -> TopicListener.Stream { + ) async throws -> TopicListener.Stream { var sleepTimes = 0 while !client.isActive() { guard sleepTimes < 10 else { - throw .connectionTimeout + throw TopicListenerError.connectionTimeout } try? await Task.sleep(for: .milliseconds(100)) sleepTimes += 1 @@ -135,7 +136,7 @@ private actor MQTTTopicListener { guard subscription != nil else { client.logger.error("Error subscribing to topics: \(topics)") - throw .failedToSubscribe + throw TopicListenerError.failedToSubscribe } client.logger.trace("Done subscribing, begin listening to topics.") @@ -144,7 +145,7 @@ private actor MQTTTopicListener { switch result { case let .failure(error): self.client.logger.error("Received error while listening: \(error)") - self.continuation.yield(.failure(.init(error))) + self.continuation.yield(.failure(MQTTListenResultError(error))) case let .success(publishInfo): if topics.contains(publishInfo.topicName) { self.client.logger.trace("Recieved new value for topic: \(publishInfo.topicName)") @@ -169,6 +170,8 @@ private actor MQTTTopicListener { } } +// MARK: - Errors + public enum TopicListenerError: Error { case connectionTimeout case failedToSubscribe diff --git a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift b/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift index 060c6e3..1a4125c 100644 --- a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift +++ b/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift @@ -1,6 +1,7 @@ +import AsyncAlgorithms import Logging import Models -@testable import MQTTConnectionManager +@_spi(Internal) import MQTTConnectionManager import MQTTConnectionService import MQTTNIO import NIO @@ -18,23 +19,6 @@ final class MQTTConnectionServiceTests: XCTestCase { return logger }() -// func testGracefulShutdownWorks() async throws { -// let client = createClient(identifier: "testGracefulShutdown") -// let service = MQTTConnectionService(client: client) -// await service.connect() -// try await Task.sleep(for: .seconds(1)) -// XCTAssert(client.isActive()) -// service.shutdown() -// XCTAssertFalse(client.isActive()) -// } - - func testWhatHappensIfConnectIsCalledMultipleTimes() async throws { - let client = createClient(identifier: "testWhatHappensIfConnectIsCalledMultipleTimes") - let manager = MQTTConnectionManager.live(client: client) - try await manager.connect() - try await manager.connect() - } - // TODO: Move to integration tests. func testMQTTConnectionStream() async throws { let client = createClient(identifier: "testNonManagedStream") @@ -43,8 +27,13 @@ final class MQTTConnectionServiceTests: XCTestCase { logger: Self.logger, alwaysReconnect: false ) - let stream = MQTTConnectionStream(client: client, logger: Self.logger) - var events = [MQTTConnectionManager.Event]() + let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger) + let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger) + var events1 = [MQTTConnectionManager.Event]() + var events2 = [MQTTConnectionManager.Event]() + + let stream1 = connectionStream1.start() + let stream2 = connectionStream2.start() _ = try await manager.connect() @@ -55,17 +44,22 @@ final class MQTTConnectionServiceTests: XCTestCase { try await Task.sleep(for: .milliseconds(200)) manager.shutdown() try await client.disconnect() - try await Task.sleep(for: .milliseconds(200)) + try await Task.sleep(for: .seconds(1)) try await client.shutdown() - try await Task.sleep(for: .milliseconds(200)) - stream.stop() + try await Task.sleep(for: .seconds(1)) + connectionStream1.stop() + connectionStream2.stop() } - for await event in stream.removeDuplicates() { - events.append(event) + for await event in stream1.removeDuplicates() { + events1.append(event) + } + for await event in stream2.removeDuplicates() { + events2.append(event) } - XCTAssertEqual(events, [.disconnected, .connected, .disconnected, .shuttingDown]) + XCTAssertEqual(events1, [.disconnected, .connected, .disconnected, .shuttingDown]) + XCTAssertEqual(events2, [.disconnected, .connected, .disconnected, .shuttingDown]) } func createClient(identifier: String) -> MQTTClient { diff --git a/Tests/SensorsServiceTests/SensorsClientTests.swift b/Tests/SensorsServiceTests/SensorsClientTests.swift index 2b557be..a935cae 100755 --- a/Tests/SensorsServiceTests/SensorsClientTests.swift +++ b/Tests/SensorsServiceTests/SensorsClientTests.swift @@ -1,10 +1,11 @@ import Dependencies import Logging import Models +@_spi(Internal) import MQTTConnectionManager import MQTTNIO import NIO import PsychrometricClientLive -@testable import SensorsService +@_spi(Internal) import SensorsService import TopicDependencies import XCTest @@ -14,214 +15,74 @@ final class SensorsClientTests: XCTestCase { static let logger: Logger = { var logger = Logger(label: "SensorsClientTests") - logger.logLevel = .debug + logger.logLevel = .trace return logger }() override func invokeTest() { + let client = createClient(identifier: "\(Self.self)") + withDependencies { + $0.mqttConnectionManager = .live(client: client, logger: Self.logger) $0.psychrometricClient = PsychrometricClient.liveValue + $0.topicListener = .live(client: client) + $0.topicPublisher = .live(client: client) } operation: { super.invokeTest() } } - func testWhatHappensIfClientDisconnectsWhileListening() async throws { - let client = createClient(identifier: "testWhatHappensIfClientDisconnectsWhileListening") - let listener = TopicListener.live(client: client) - try await client.connect() + func testListeningResumesAfterDisconnectThenReconnect() async throws { + @Dependency(\.mqttConnectionManager) var manager + struct TimeoutError: Error {} - let stream = try await listener.listen("/some/topic") + let sensor = TemperatureAndHumiditySensor(location: .return) + var results = [TopicPublisher.PublishRequest]() -// try await Task.sleep(for: .seconds(1)) -// try await client.disconnect() -// -// try await client.connect() -// try await Task.sleep(for: .seconds(1)) - try await client.publish( - to: "/some/topic", - payload: ByteBufferAllocator().buffer(string: "Foo"), - qos: .atLeastOnce, - retain: true - ) - try await Task.sleep(for: .seconds(1)) + try await withDependencies { + $0.topicPublisher = .capturing { results.append($0) } + } operation: { + let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger) + let task = Task { try await sensorsService.run() } + defer { task.cancel() } - listener.shutdown() - try await client.shutdown() + try await manager.connect() + defer { manager.shutdown() } + + try await manager.withClient { client in + try await client.disconnect() + try await client.connect() + try await Task.sleep(for: .milliseconds(100)) + try await client.publish( + to: sensor.topics.temperature, + payload: ByteBufferAllocator().buffer(string: "25"), + qos: .atLeastOnce, + retain: false + ) + try await client.publish( + to: sensor.topics.humidity, + payload: ByteBufferAllocator().buffer(string: "50"), + qos: .atLeastOnce, + retain: false + ) + } + + var timeoutCount = 0 + while !(results.count == 2) { + guard timeoutCount < 20 else { + throw TimeoutError() + } + try await Task.sleep(for: .milliseconds(100)) + timeoutCount += 1 + } + + XCTAssertEqual(results.count, 2) + XCTAssert(results.contains(where: { $0.topicName == sensor.topics.dewPoint })) + XCTAssert(results.contains(where: { $0.topicName == sensor.topics.enthalpy })) + try await sensorsService.shutdown() + } } -// func testConnectAndShutdown() async throws { -// let client = createClient(identifier: "testConnectAndShutdown") -// await client.connect() -// await client.shutdown() -// } - -// func testSensorService() async throws { -// let mqtt = createClient(identifier: "testSensorService") -// // let mqtt = await client.client -// let sensor = TemperatureAndHumiditySensor(location: .mixedAir) -// let publishInfo = PublishInfoContainer(topicFilters: [ -// sensor.topics.dewPoint, -// sensor.topics.enthalpy -// ]) -// let service = SensorsService(client: mqtt, sensors: [sensor]) -// -// // fix to connect the mqtt client. -// try await mqtt.connect() -// let task = Task { try await service.run() } -// -// _ = try await mqtt.subscribe(to: [ -// MQTTSubscribeInfo(topicFilter: sensor.topics.dewPoint, qos: .exactlyOnce), -// MQTTSubscribeInfo(topicFilter: sensor.topics.enthalpy, qos: .exactlyOnce) -// ]) -// -// let listener = mqtt.createPublishListener() -// Task { -// for await result in listener { -// switch result { -// case let .failure(error): -// XCTFail("\(error)") -// case let .success(value): -// await publishInfo.addPublishInfo(value) -// } -// } -// } -// -// try await mqtt.publish( -// to: sensor.topics.temperature, -// payload: ByteBufferAllocator().buffer(string: "75.123"), -// qos: MQTTQoS.exactlyOnce, -// retain: true -// ) -// -// try await Task.sleep(for: .seconds(1)) -// -// // XCTAssert(client.sensors.first!.needsProcessed) - // // let firstSensor = await client.sensors.first! - // // XCTAssertEqual(firstSensor.temperature, .init(75.123, units: .celsius)) -// -// try await mqtt.publish( -// to: sensor.topics.humidity, -// payload: ByteBufferAllocator().buffer(string: "50"), -// qos: MQTTQoS.exactlyOnce, -// retain: true -// ) -// -// try await Task.sleep(for: .seconds(1)) -// -// // not working for some reason -// // XCTAssertEqual(publishInfo.info.count, 2) -// -// XCTAssert(publishInfo.info.count > 1) -// -// // fix to shutdown the mqtt client. -// task.cancel() -// try await mqtt.shutdown() -// } - -// func testCapturingSensorClient() async throws { -// class CapturedValues { -// var values = [(value: Double, topic: String)]() -// var didShutdown = false -// -// init() {} -// } -// -// let capturedValues = CapturedValues() -// -// try await withDependencies { -// $0.sensorsClient = .testing( -// yielding: [ -// (value: 76, to: "not-listening"), -// (value: 75, to: "test") -// ] -// ) { value, topic in -// capturedValues.values.append((value, topic)) -// } captureShutdownEvent: { -// capturedValues.didShutdown = $0 -// } -// } operation: { -// @Dependency(\.sensorsClient) var client -// let stream = try await client.listen(to: ["test"]) -// -// for await result in stream { -// var buffer = result.buffer -// guard let double = Double(buffer: &buffer) else { -// XCTFail("Failed to decode double") -// return -// } -// -// XCTAssertEqual(double, 75) -// XCTAssertEqual(result.topic, "test") -// try await client.publish(26, to: "publish") -// try await Task.sleep(for: .milliseconds(100)) -// client.shutdown() -// } -// -// XCTAssertEqual(capturedValues.values.count, 1) -// XCTAssertEqual(capturedValues.values.first?.value, 26) -// XCTAssertEqual(capturedValues.values.first?.topic, "publish") -// XCTAssertTrue(capturedValues.didShutdown) -// } -// } -// -// func testSensorCapturesPublishedState() async throws { -// let client = createClient(identifier: "testSensorCapturesPublishedState") -// let mqtt = client.client -// let sensor = TemperatureAndHumiditySensor(location: .mixedAir) -// let publishInfo = PublishInfoContainer(topicFilters: [ -// sensor.topics.dewPoint, -// sensor.topics.enthalpy -// ]) -// -// try await client.addSensor(sensor) -// await client.connect() -// try await client.start() -// -// _ = try await mqtt.subscribe(to: [ -// MQTTSubscribeInfo(topicFilter: sensor.topics.dewPoint, qos: MQTTQoS.exactlyOnce), -// MQTTSubscribeInfo(topicFilter: sensor.topics.enthalpy, qos: MQTTQoS.exactlyOnce) -// ]) -// -// let listener = mqtt.createPublishListener() -// Task { -// for await result in listener { -// switch result { -// case let .failure(error): -// XCTFail("\(error)") -// case let .success(value): -// await publishInfo.addPublishInfo(value) -// } -// } -// } -// -// try await mqtt.publish( -// to: sensor.topics.temperature, -// payload: ByteBufferAllocator().buffer(string: "75.123"), -// qos: MQTTQoS.exactlyOnce, -// retain: true -// ) -// -// try await Task.sleep(for: .seconds(1)) -// -// // XCTAssert(client.sensors.first!.needsProcessed) -// let firstSensor = client.sensors.first! -// XCTAssertEqual(firstSensor.temperature, DryBulb.celsius(75.123)) -// -// try await mqtt.publish( -// to: sensor.topics.humidity, -// payload: ByteBufferAllocator().buffer(string: "50"), -// qos: MQTTQoS.exactlyOnce, -// retain: true -// ) -// -// try await Task.sleep(for: .seconds(1)) -// -// XCTAssertEqual(publishInfo.info.count, 2) -// -// await client.shutdown() -// } - func createClient(identifier: String) -> MQTTClient { let envVars = EnvVars( appEnv: .testing, @@ -252,16 +113,6 @@ final class SensorsClientTests: XCTestCase { // MARK: Helpers for tests. -extension AsyncStream { - func first() async -> Element { - var first: Element - for await value in self { - first = value - } - return first - } -} - class PublishInfoContainer { private(set) var info: [MQTTPublishInfo] private var topicFilters: [String]? @@ -282,6 +133,14 @@ class PublishInfoContainer { } } +extension TopicPublisher { + static func capturing( + _ callback: @escaping (PublishRequest) -> Void + ) -> Self { + .init { callback($0) } + } +} + // extension SensorsClient { // // static func testing( diff --git a/docker-compose.yaml b/docker-compose.yaml index 81008c6..2391da5 100755 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -10,7 +10,6 @@ services: build: context: . dockerfile: Dockerfile - platform: linux/amd64 depends_on: - mosquitto environment: @@ -20,7 +19,6 @@ services: build: context: . dockerfile: Dockerfile.test - platform: linux/amd64 working_dir: /app networks: - test