import Dependencies
// @_spi(Internal) import dewpoint_controller
import Logging
import Models
import MQTTConnectionService
@_spi(Internal) import MQTTManager
import MQTTNIO
import NIO
import PsychrometricClientLive
@_spi(Internal) import SensorsService
import ServiceLifecycle
import ServiceLifecycleTestKit
import XCTest

/// Integration tests that exercise the MQTT connection / sensors services against a
/// real broker.
///
/// The broker host is read from the `MOSQUITTO_SERVER` environment variable and falls
/// back to `localhost` when the variable is not set.
final class IntegrationTests: XCTestCase {

  /// Host name of the MQTT broker used by every test in this class.
  static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"

  /// Shared logger for the tests, configured at `.info` level.
  static let logger: Logger = {
    var logger = Logger(label: "IntegrationTests")
    logger.logLevel = .info
    return logger
  }()

  /// Runs every test with the live `mqtt` dependency (backed by a freshly created
  /// client) and the live psychrometric client installed.
  override func invokeTest() {
    let client = createClient(identifier: "\(Self.self)")
    withDependencies {
      $0.mqtt = .live(client: client, logger: Self.logger)
      $0.psychrometricClient = PsychrometricClient.liveValue
    } operation: {
      super.invokeTest()
    }
  }

  /// Starts the connection service, verifies the client becomes active, then shuts the
  /// manager down and verifies the client is no longer active.
  func testConnectionServiceShutdown() async throws {
    @Dependency(\.mqtt) var mqtt

    let service = MQTTConnectionService(logger: Self.logger)
    let task = Task { try await service.run() }
    defer { task.cancel() }

    // Give the service time to establish the connection.
    try await Task.sleep(for: .milliseconds(200))

    // Check the connection is active here.
    try await mqtt.withClient { client in
      XCTAssert(client.isActive())
    }

    mqtt.shutdown()
    try await Task.sleep(for: .milliseconds(500))

    // Check the connection is no longer active after shutdown.
    try await mqtt.withClient { client in
      XCTAssertFalse(client.isActive())
    }
  }

  /// Verifies that two independent connection streams observing the same client both
  /// report the same sequence of events across a connect, a disconnect and a manager
  /// shutdown.
  func testMQTTConnectionStream() async throws {
    let client = createClient(identifier: "testNonManagedStream")
    let manager = MQTTManager.live(
      client: client,
      logger: Self.logger,
      alwaysReconnect: false
    )
    defer { manager.shutdown() }

    let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger)
    let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger)
    var events1 = [MQTTManager.Event]()
    var events2 = [MQTTManager.Event]()
    let stream1 = connectionStream1.start()
    let stream2 = connectionStream2.start()

    _ = try await manager.connect()

    // Drives the connection through its lifecycle while the streams are consumed below.
    let driver = Task {
      // Always stop both streams, even when a step below throws; otherwise the
      // `for await` loops further down would never terminate and the test would hang
      // instead of failing.
      defer {
        connectionStream1.stop()
        connectionStream2.stop()
      }

      while !client.isActive() {
        try await Task.sleep(for: .milliseconds(100))
      }
      try await Task.sleep(for: .milliseconds(200))
      try await client.disconnect()
      try await Task.sleep(for: .milliseconds(500))
      manager.shutdown()
      try await Task.sleep(for: .milliseconds(500))
    }

    for await event in stream1.removeDuplicates() {
      events1.append(event)
    }
    for await event in stream2.removeDuplicates() {
      events2.append(event)
    }

    // Surface any error thrown by the driver task rather than silently dropping it.
    try await driver.value

    XCTAssertEqual(events1, [.disconnected, .connected, .disconnected, .shuttingDown])
    XCTAssertEqual(events2, [.disconnected, .connected, .disconnected, .shuttingDown])
  }

  /// Verifies that the sensors service keeps processing sensor readings after the
  /// client is disconnected and then reconnected.
  ///
  /// The `mqtt.publish` dependency is overridden to record publish requests, so the
  /// values the service would publish are captured in a `ResultContainer`.
  func testListeningResumesAfterDisconnectThenReconnect() async throws {
    struct TimeoutError: Error {}

    let sensor = TemperatureAndHumiditySensor(location: .return)
    let results = ResultContainer()

    try await withDependencies {
      $0.mqtt.publish = results.append
    } operation: {
      @Dependency(\.mqtt) var manager

      let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger)
      let task = Task { try await sensorsService.run() }
      defer { task.cancel() }

      try await manager.connect()
      defer { manager.shutdown() }

      try await manager.withClient { client in
        try await client.disconnect()
        try await client.connect()
        while !client.isActive() {
          try await Task.sleep(for: .milliseconds(100))
        }

        // Give time to re-subscribe.
        try await Task.sleep(for: .milliseconds(200))

        try await client.publish(
          to: sensor.topics.temperature,
          payload: ByteBufferAllocator().buffer(string: "25"),
          qos: .atLeastOnce,
          retain: false
        )
        try await client.publish(
          to: sensor.topics.humidity,
          payload: ByteBufferAllocator().buffer(string: "50"),
          qos: .atLeastOnce,
          retain: false
        )
      }

      // Poll (up to 20 x 100ms) until exactly two publish requests were recorded.
      var attempts = 0
      while await results.count != 2 {
        guard attempts < 20 else { throw TimeoutError() }
        try await Task.sleep(for: .milliseconds(100))
        attempts += 1
      }

      // Use a distinct name so the snapshot does not shadow the container above.
      let published = await results.results()
      XCTAssertEqual(published.count, 2)
      XCTAssert(published.contains(where: { $0.topicName == sensor.topics.dewPoint }))
      XCTAssert(published.contains(where: { $0.topicName == sensor.topics.enthalpy }))

      try await sensorsService.shutdown()
    }
  }

  /// Creates an MQTT client that targets `Self.hostname` using MQTT v3.1.1 without
  /// SSL or web sockets.
  ///
  /// - Parameter identifier: The client identifier to connect with.
  /// - Returns: A new, not yet connected client.
  func createClient(identifier: String) -> MQTTClient {
    // NOTE(review): `port` is only stored on `envVars`; it is not forwarded to the
    // client initializer below — confirm the client's default port is intended.
    let envVars = EnvVars(
      appEnv: .testing,
      host: Self.hostname,
      port: "1883",
      identifier: identifier,
      userName: nil,
      password: nil
    )
    // NOTE(review): this event loop group is passed as `.shared` and is never shut
    // down in the visible code — confirm whether that is acceptable for tests.
    let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    // return .init(envVars: envVars, eventLoopGroup: eventLoopGroup, logger: Self.logger)
    let config = MQTTClient.Configuration(
      version: .v3_1_1,
      userName: envVars.userName,
      password: envVars.password,
      useSSL: false,
      useWebSockets: false,
      tlsConfiguration: nil,
      webSocketURLPath: nil
    )
    return .init(
      host: Self.hostname,
      identifier: identifier,
      eventLoopGroupProvider: .shared(eventLoopGroup),
      logger: Self.logger,
      configuration: config
    )
  }
}

// MARK: - Helpers

/// Error type for a missing topic. Not referenced elsewhere in this part of the file.
struct TopicNotFoundError: Error {}

/// Actor that records publish requests so tests can inspect them afterwards.
actor ResultContainer: Sendable {

  /// Publish requests in the order they were appended.
  private var storage = [MQTTManager.PublishRequest]()

  init() {}

  /// Records a publish request.
  ///
  /// - Parameter result: The request to store.
  @Sendable func append(_ result: MQTTManager.PublishRequest) async {
    storage.append(result)
  }

  /// Number of requests recorded so far.
  var count: Int {
    get async {
      storage.count
    }
  }

  /// Returns all requests recorded so far.
  func results() async -> [MQTTManager.PublishRequest] {
    storage
  }
}