Files
swift-mqtt-dewpoint/Tests/IntegrationTests/IntegrationTests.swift
Michael Housh ce18c44363
Some checks failed
CI / Run Tests (push) Failing after 3m7s
feat: Working on cli client and tests
2024-11-17 22:23:44 -05:00

215 lines
6.0 KiB
Swift

import Dependencies
// @_spi(Internal) import dewpoint_controller
import Logging
import Models
import MQTTConnectionService
@_spi(Internal) import MQTTManager
import MQTTNIO
import NIO
import PsychrometricClientLive
@_spi(Internal) import SensorsService
import ServiceLifecycle
import ServiceLifecycleTestKit
import XCTest
/// Integration tests that talk to a real MQTT broker.
///
/// The broker host is read from the `MOSQUITTO_SERVER` environment variable and
/// falls back to `localhost` when the variable is not set.
final class IntegrationTests: XCTestCase {

  /// Host name of the broker used by every client created in this class.
  static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"

  /// Logger handed to every client, manager and service created in this class.
  static let logger: Logger = {
    var logger = Logger(label: "IntegrationTests")
    logger.logLevel = .info
    return logger
  }()

  /// Wraps every test invocation so that the `mqtt` dependency is a live manager
  /// backed by a freshly created client and the psychrometric client is the live value.
  override func invokeTest() {
    let client = createClient(identifier: "\(Self.self)")
    withDependencies {
      $0.mqtt = .live(client: client, logger: Self.logger)
      $0.psychrometricClient = PsychrometricClient.liveValue
    } operation: {
      super.invokeTest()
    }
  }

  /// Starts an `MQTTConnectionService`, asserts the client is active, shuts the
  /// manager down and asserts the client is no longer active.
  ///
  /// - Throws: Any error raised while running the scenario. The manager is shut
  ///   down before the error is rethrown so the failure is reported instead of
  ///   being silently swallowed.
  func testConnectionServiceShutdown() async throws {
    @Dependency(\.mqtt) var mqtt

    do {
      let service = MQTTConnectionService(logger: Self.logger)
      let task = Task { try await service.run() }
      defer { task.cancel() }
      try await Task.sleep(for: .milliseconds(200))

      // The client is expected to be active once the service has had time to run.
      try await mqtt.withClient { client in
        XCTAssert(client.isActive())
      }

      mqtt.shutdown()
      try await Task.sleep(for: .milliseconds(500))

      // After shutdown the client must no longer be active.
      try await mqtt.withClient { client in
        XCTAssertFalse(client.isActive())
      }
    } catch {
      // Best-effort cleanup. The sleep may itself fail (for example on
      // cancellation); that must not mask the original error.
      mqtt.shutdown()
      try? await Task.sleep(for: .milliseconds(500))
      throw error
    }
  }

  /// Observes one client through two independent `MQTTConnectionStream`s and checks
  /// that both report the same de-duplicated sequence of connection events.
  ///
  /// - Throws: Errors from connecting, or from the background task that drives the
  ///   disconnect / shutdown sequence.
  func testMQTTConnectionStream() async throws {
    let client = createClient(identifier: "testNonManagedStream")
    let manager = MQTTManager.live(
      client: client,
      logger: Self.logger,
      alwaysReconnect: false
    )
    defer { manager.shutdown() }

    let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger)
    let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger)
    var events1 = [MQTTManager.Event]()
    var events2 = [MQTTManager.Event]()

    let stream1 = connectionStream1.start()
    let stream2 = connectionStream2.start()

    _ = try await manager.connect()

    // Drives the connection through disconnect and shutdown while the loops below
    // collect the resulting events.
    let driver = Task {
      // Always stop both streams, even when a step below throws. Without this an
      // early failure would leave nothing in this test to end the `for await` loops.
      defer {
        connectionStream1.stop()
        connectionStream2.stop()
      }
      while !client.isActive() {
        try await Task.sleep(for: .milliseconds(100))
      }
      try await Task.sleep(for: .milliseconds(200))
      try await client.disconnect()
      try await Task.sleep(for: .milliseconds(500))
      manager.shutdown()
      try await Task.sleep(for: .milliseconds(500))
    }

    for await event in stream1.removeDuplicates() {
      events1.append(event)
    }
    for await event in stream2.removeDuplicates() {
      events2.append(event)
    }

    // Surface any error thrown by the driver task instead of discarding it.
    try await driver.value

    XCTAssertEqual(events1, [.disconnected, .connected, .disconnected, .shuttingDown])
    XCTAssertEqual(events2, [.disconnected, .connected, .disconnected, .shuttingDown])
  }

  /// Disconnects and reconnects the client while a `SensorsService` is running, then
  /// publishes a temperature and a humidity value and expects exactly two publish
  /// requests (dew point and enthalpy) to be captured.
  ///
  /// - Throws: `TimeoutError` when two results have not arrived after 20 polls of
  ///   100 ms each, or any error raised by the client or the service.
  func testListeningResumesAfterDisconnectThenReconnect() async throws {
    struct TimeoutError: Error {}

    let sensor = TemperatureAndHumiditySensor(location: .return)
    let results = ResultContainer()

    try await withDependencies {
      // Capture publish requests instead of sending them.
      $0.mqtt.publish = results.append
    } operation: {
      @Dependency(\.mqtt) var manager
      let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger)
      let task = Task { try await sensorsService.run() }
      defer { task.cancel() }

      try await manager.connect()
      defer { manager.shutdown() }

      try await manager.withClient { client in
        try await client.disconnect()
        try await client.connect()
        while !client.isActive() {
          try await Task.sleep(for: .milliseconds(100))
        }
        // Give time to re-subscribe.
        try await Task.sleep(for: .milliseconds(200))

        try await client.publish(
          to: sensor.topics.temperature,
          payload: ByteBufferAllocator().buffer(string: "25"),
          qos: .atLeastOnce,
          retain: false
        )
        try await client.publish(
          to: sensor.topics.humidity,
          payload: ByteBufferAllocator().buffer(string: "50"),
          qos: .atLeastOnce,
          retain: false
        )
      }

      // Poll for the two expected results, giving up after 20 attempts.
      var timeoutCount = 0
      while await results.count != 2 {
        guard timeoutCount < 20 else {
          throw TimeoutError()
        }
        try await Task.sleep(for: .milliseconds(100))
        timeoutCount += 1
      }

      // Named differently from the container to avoid shadowing it.
      let published = await results.results()
      XCTAssertEqual(published.count, 2)
      XCTAssert(published.contains(where: { $0.topicName == sensor.topics.dewPoint }))
      XCTAssert(published.contains(where: { $0.topicName == sensor.topics.enthalpy }))

      try await sensorsService.shutdown()
    }
  }

  /// Builds an MQTT 3.1.1 client for `Self.hostname` without SSL, web sockets or
  /// credentials.
  ///
  /// - Parameter identifier: Client identifier passed to the client initializer.
  /// - Returns: A new `MQTTClient`; this method does not connect it.
  func createClient(identifier: String) -> MQTTClient {
    // NOTE(review): only `userName` and `password` are read from `envVars` below;
    // the `port` value is not forwarded to the client initializer.
    let envVars = EnvVars(
      appEnv: .testing,
      host: Self.hostname,
      port: "1883",
      identifier: identifier,
      userName: nil,
      password: nil
    )
    // NOTE(review): the group is handed over as `.shared` and nothing in the visible
    // code shuts it down — confirm who owns its lifetime.
    let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    let config = MQTTClient.Configuration(
      version: .v3_1_1,
      userName: envVars.userName,
      password: envVars.password,
      useSSL: false,
      useWebSockets: false,
      tlsConfiguration: nil,
      webSocketURLPath: nil
    )
    return .init(
      host: Self.hostname,
      identifier: identifier,
      eventLoopGroupProvider: .shared(eventLoopGroup),
      logger: Self.logger,
      configuration: config
    )
  }
}
// MARK: - Helpers
/// An error type that carries no payload.
///
/// NOTE(review): not referenced in the visible part of this file; presumably it
/// signals a missing topic — confirm against callers.
struct TopicNotFoundError: Error {
}
/// Actor that collects `MQTTManager.PublishRequest` values so a test can inspect
/// them afterwards.
actor ResultContainer: Sendable {

  /// Collected requests, in the order they were appended.
  private var requests: [MQTTManager.PublishRequest] = []

  init() {}

  /// Number of requests collected so far.
  var count: Int {
    get async {
      return requests.count
    }
  }

  /// Returns every collected request.
  func results() async -> [MQTTManager.PublishRequest] {
    return requests
  }

  /// Adds `result` after any previously collected requests.
  ///
  /// - Parameter result: The publish request to record.
  @Sendable func append(_ result: MQTTManager.PublishRequest) async {
    requests += [result]
  }
}