import Dependencies
import Logging
import Models
@_spi(Internal) import MQTTConnectionManager
import MQTTNIO
import NIO
import PsychrometricClientLive
@_spi(Internal) import SensorsService
import TopicDependencies
import XCTest

/// Integration tests for `SensorsService` that run against an MQTT broker.
///
/// The broker host is taken from the `MOSQUITTO_SERVER` environment variable and
/// falls back to `localhost` when the variable is not set.
final class SensorsClientTests: XCTestCase {

  /// Host name of the MQTT broker used by the tests.
  static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"

  /// Logger shared by the tests, configured at `.trace` level.
  static let logger: Logger = {
    var logger = Logger(label: "SensorsClientTests")
    logger.logLevel = .trace
    return logger
  }()

  /// Single-threaded event loop group shared by every client created in this test case.
  ///
  /// The group is passed to `MQTTClient` as `.shared(...)`. Keeping one static instance
  /// avoids creating a new group for every test invocation that nothing in this file
  /// would ever shut down.
  static let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)

  /// Runs every test with the live connection manager, psychrometric client, topic
  /// listener and topic publisher, all backed by one freshly created MQTT client.
  override func invokeTest() {
    let client = createClient(identifier: "\(Self.self)")
    withDependencies {
      $0.mqttConnectionManager = .live(client: client, logger: Self.logger)
      $0.psychrometricClient = PsychrometricClient.liveValue
      $0.topicListener = .live(client: client)
      $0.topicPublisher = .live(client: client)
    } operation: {
      super.invokeTest()
    }
  }

  /// Disconnects and reconnects the client while the service is running, publishes a
  /// temperature and a humidity value, and expects exactly two captured publish
  /// requests: one on the sensor's dew point topic and one on its enthalpy topic.
  ///
  /// - Throws: `TimeoutError` when fewer than two requests were captured after
  ///   20 polls spaced 100 ms apart, or any error thrown by the MQTT/service calls.
  func testListeningResumesAfterDisconnectThenReconnect() async throws {
    @Dependency(\.mqttConnectionManager) var manager
    struct TimeoutError: Error {}

    let sensor = TemperatureAndHumiditySensor(location: .return)

    // The capturing publisher is invoked while the service runs in its own task and
    // this test polls the captured requests, so the storage has to be synchronized.
    let results = CapturedValues<TopicPublisher.PublishRequest>()

    try await withDependencies {
      $0.topicPublisher = .capturing { results.append($0) }
    } operation: {
      let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger)
      let task = Task { try await sensorsService.run() }
      defer { task.cancel() }

      try await manager.connect()
      defer { manager.shutdown() }

      try await manager.withClient { client in
        try await client.disconnect()
        try await client.connect()
        // Brief pause after reconnecting before publishing the sensor values.
        try await Task.sleep(for: .milliseconds(100))
        try await client.publish(
          to: sensor.topics.temperature,
          payload: ByteBufferAllocator().buffer(string: "25"),
          qos: .atLeastOnce,
          retain: false
        )
        try await client.publish(
          to: sensor.topics.humidity,
          payload: ByteBufferAllocator().buffer(string: "50"),
          qos: .atLeastOnce,
          retain: false
        )
      }

      // Poll until both requests arrived. `< 2` (instead of `!= 2`) lets an unexpected
      // extra request fall through to the assertion below rather than ending in a
      // timeout that hides the real count.
      var timeoutCount = 0
      while results.count < 2 {
        guard timeoutCount < 20 else { throw TimeoutError() }
        try await Task.sleep(for: .milliseconds(100))
        timeoutCount += 1
      }

      let captured = results.values
      XCTAssertEqual(captured.count, 2)
      XCTAssert(captured.contains(where: { $0.topicName == sensor.topics.dewPoint }))
      XCTAssert(captured.contains(where: { $0.topicName == sensor.topics.enthalpy }))

      try await sensorsService.shutdown()
    }
  }

  /// Creates an MQTT v3.1.1 client for `Self.hostname` without TLS or WebSockets.
  ///
  /// - Parameter identifier: The client identifier passed to `MQTTClient`.
  /// - Returns: A client that uses the shared event loop group and test logger.
  func createClient(identifier: String) -> MQTTClient {
    // NOTE(review): `EnvVars` is built with port "1883", but only its `userName` and
    // `password` are forwarded below; no port is passed to `MQTTClient`.
    let envVars = EnvVars(
      appEnv: .testing,
      host: Self.hostname,
      port: "1883",
      identifier: identifier,
      userName: nil,
      password: nil
    )
    let config = MQTTClient.Configuration(
      version: .v3_1_1,
      userName: envVars.userName,
      password: envVars.password,
      useSSL: false,
      useWebSockets: false,
      tlsConfiguration: nil,
      webSocketURLPath: nil
    )
    return .init(
      host: Self.hostname,
      identifier: identifier,
      eventLoopGroupProvider: .shared(Self.eventLoopGroup),
      logger: Self.logger,
      configuration: config
    )
  }
}

// MARK: Helpers for tests.

/// Lock-protected, append-only storage for values captured from concurrently running code.
private final class CapturedValues<Element>: @unchecked Sendable {
  private let lock = NSLock()
  private var storage: [Element] = []

  /// Number of values captured so far.
  var count: Int {
    lock.lock()
    defer { lock.unlock() }
    return storage.count
  }

  /// Snapshot of all values captured so far, in insertion order.
  var values: [Element] {
    lock.lock()
    defer { lock.unlock() }
    return storage
  }

  /// Appends a captured value.
  func append(_ element: Element) {
    lock.lock()
    defer { lock.unlock() }
    storage.append(element)
  }
}

/// Collects `MQTTPublishInfo` values, optionally restricted to a list of topic names.
class PublishInfoContainer {
  /// The collected publish infos, in the order they were added.
  private(set) var info: [MQTTPublishInfo]

  /// Topic names to accept; `nil` accepts every topic.
  private let topicFilters: [String]?

  /// - Parameter topicFilters: Topic names to accept, or `nil` to accept everything.
  init(topicFilters: [String]? = nil) {
    self.info = []
    self.topicFilters = topicFilters
  }

  /// Stores `info` when no filters are set or when its topic name is one of the filters.
  func addPublishInfo(_ info: MQTTPublishInfo) async {
    guard topicFilters?.contains(info.topicName) ?? true else { return }
    self.info.append(info)
  }
}

extension TopicPublisher {

  /// Creates a publisher that forwards every publish request to `callback`.
  ///
  /// - Parameter callback: Invoked with each request handed to the publisher.
  static func capturing(
    _ callback: @escaping (PublishRequest) -> Void
  ) -> Self {
    .init { callback($0) }
  }
}

// Commented-out `SensorsClient.testing` helper, retained as found in the file.
//
// extension SensorsClient {
//
//   static func testing(
//     yielding: [(value: Double, to: String)],
//     capturePublishedValues: @escaping (Double, String) -> Void,
//     captureShutdownEvent: @escaping (Bool) -> Void
//   ) -> Self {
//     let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self)
//     let logger = Logger(label: "\(Self.self).testing")
//
//     return .init(
//       listen: { topics in
//         for (value, topic) in yielding where topics.contains(topic) {
//           continuation.yield(
//             (buffer: ByteBuffer(string: "\(value)"), topic: topic)
//           )
//         }
//         return stream
//       },
//       logger: logger,
//       publish: { value, topic in
//         capturePublishedValues(value, topic)
//       },
//       shutdown: {
//         captureShutdownEvent(true)
//         continuation.finish()
//       }
//     )
//   }
// }

/// Error value with no payload; it is declared here but not referenced elsewhere in this file.
struct TopicNotFoundError: Error {}