import AsyncAlgorithms
import Logging
import Models
@_spi(Internal) import MQTTConnectionManager
import MQTTConnectionService
import MQTTNIO
import NIO
import ServiceLifecycle
import ServiceLifecycleTestKit
import XCTest

/// Tests for `MQTTConnectionStream`, exercised through a real `MQTTClient` connection.
///
/// The broker host is taken from the `MOSQUITTO_SERVER` environment variable and
/// falls back to `localhost` when the variable is not set.
final class MQTTConnectionServiceTests: XCTestCase {

  /// Host name of the MQTT broker the tests connect to.
  static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"

  /// Logger shared by every client, manager and stream created in these tests,
  /// configured at `.trace` level.
  static let logger: Logger = {
    var logger = Logger(label: "MQTTConnectionServiceTests")
    logger.logLevel = .trace
    return logger
  }()

  // TODO: Move to integration tests.
  /// Verifies that two independent `MQTTConnectionStream`s attached to the same
  /// client both observe the same de-duplicated sequence of connection events
  /// across a connect / disconnect / shutdown cycle.
  ///
  /// - Throws: Any error from connecting, or from the background teardown task.
  func testMQTTConnectionStream() async throws {
    let client = createClient(identifier: "testNonManagedStream")
    let manager = MQTTConnectionManager.live(
      client: client,
      logger: Self.logger,
      alwaysReconnect: false
    )
    let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger)
    let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger)
    var events1 = [MQTTConnectionManager.Event]()
    var events2 = [MQTTConnectionManager.Event]()

    // Both streams are started before connecting so they observe the full cycle.
    let stream1 = connectionStream1.start()
    let stream2 = connectionStream2.start()

    _ = try await manager.connect()

    // Drives the disconnect / shutdown sequence concurrently with the
    // `for await` loops below, which only terminate once the streams end.
    let teardown = Task {
      // Always stop the streams, even when a step below throws; otherwise the
      // consuming loops could wait forever and the test would hang instead of
      // failing.
      defer {
        connectionStream1.stop()
        connectionStream2.stop()
      }

      while !client.isActive() {
        try await Task.sleep(for: .milliseconds(100))
      }
      try await Task.sleep(for: .milliseconds(200))

      manager.shutdown()
      try await client.disconnect()
      // The pauses give the streams time to observe each state change before
      // the next step (presumably — the exact delivery timing is not visible here).
      try await Task.sleep(for: .seconds(1))
      try await client.shutdown()
      try await Task.sleep(for: .seconds(1))
    }

    for await event in stream1.removeDuplicates() {
      events1.append(event)
    }
    for await event in stream2.removeDuplicates() {
      events2.append(event)
    }

    XCTAssertEqual(events1, [.disconnected, .connected, .disconnected, .shuttingDown])
    XCTAssertEqual(events2, [.disconnected, .connected, .disconnected, .shuttingDown])

    // Surface any teardown failure instead of silently discarding it.
    try await teardown.value
  }

  /// Builds an MQTT v3.1.1 client for `Self.hostname` with SSL and WebSockets
  /// disabled and no credentials.
  ///
  /// The port stored in the `EnvVars` value is not forwarded to the client
  /// initializer, so the client uses whatever default port `MQTTClient` selects.
  ///
  /// - Parameter identifier: The client identifier passed to the `MQTTClient`.
  /// - Returns: A client that has been configured but not connected.
  func createClient(identifier: String) -> MQTTClient {
    let envVars = EnvVars(
      appEnv: .testing,
      host: Self.hostname,
      port: "1883",
      identifier: identifier,
      userName: nil,
      password: nil
    )
    let config = MQTTClient.Configuration(
      version: .v3_1_1,
      userName: envVars.userName,
      password: envVars.password,
      useSSL: false,
      useWebSockets: false,
      tlsConfiguration: nil,
      webSocketURLPath: nil
    )
    // NOTE(review): a new single-threaded event loop group is created on every
    // call and handed over as `.shared`; nothing in this file shuts it down.
    // Confirm whether the client is expected to own it.
    return .init(
      host: Self.hostname,
      identifier: identifier,
      eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)),
      logger: Self.logger,
      configuration: config
    )
  }
}