From b8992b89b648f41ac15298a43ff7dae5b0245e98 Mon Sep 17 00:00:00 2001 From: Michael Housh Date: Wed, 13 Nov 2024 10:06:28 -0500 Subject: [PATCH] feat: Adds MQTT connection stream, need to clean up the manager and remove stream from it. --- Package.swift | 1 + Sources/MQTTConnectionManagerLive/Live.swift | 52 ++++++ .../MQTTConnectionServiceTests.swift | 45 +++++- .../SensorsClientTests.swift | 152 +++++++++--------- 4 files changed, 166 insertions(+), 84 deletions(-) diff --git a/Package.swift b/Package.swift index 3653fb4..0585415 100755 --- a/Package.swift +++ b/Package.swift @@ -71,6 +71,7 @@ let package = Package( name: "MQTTConnectionServiceTests", dependencies: [ "MQTTConnectionService", + "MQTTConnectionManagerLive", .product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle") ] ), diff --git a/Sources/MQTTConnectionManagerLive/Live.swift b/Sources/MQTTConnectionManagerLive/Live.swift index 68eac47..6c8c5f5 100644 --- a/Sources/MQTTConnectionManagerLive/Live.swift +++ b/Sources/MQTTConnectionManagerLive/Live.swift @@ -25,6 +25,58 @@ public extension MQTTConnectionManager { // MARK: - Helpers +final class MQTTConnectionStream: Sendable { + private let client: MQTTClient + private let continuation: AsyncStream.Continuation + private var logger: Logger { client.logger } + private let name: String + private let stream: AsyncStream + + init(client: MQTTClient) { + let (stream, continuation) = AsyncStream.makeStream() + self.client = client + self.continuation = continuation + self.name = UUID().uuidString + self.stream = stream + continuation.yield(client.isActive() ? .connected : .disconnected) + } + + deinit { stop() } + + func start() -> AsyncStream { + client.addCloseListener(named: name) { _ in + self.logger.trace("Client has disconnected.") + self.continuation.yield(.disconnected) + } + client.addShutdownListener(named: name) { _ in + self.logger.trace("Client is shutting down.") + self.continuation.yield(.shuttingDown) + self.stop() + } + let task = Task { + while !Task.isCancelled { + try? await Task.sleep(for: .milliseconds(100)) + continuation.yield( + self.client.isActive() ? .connected : .disconnected + ) + } + } + continuation.onTermination = { _ in + task.cancel() + } + return stream + } + + func stop() { + client.removeCloseListener(named: name) + client.removeShutdownListener(named: name) + continuation.finish() + } + +} + +// TODO: Remove stream stuff from this. + private actor ConnectionManager { private let client: MQTTClient private let continuation: AsyncStream.Continuation diff --git a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift b/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift index 75f7cd0..1411b5a 100644 --- a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift +++ b/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift @@ -1,5 +1,6 @@ import Logging import Models +@testable import MQTTConnectionManagerLive @testable import MQTTConnectionService import MQTTNIO import NIO @@ -17,14 +18,42 @@ final class MQTTConnectionServiceTests: XCTestCase { return logger }() - func testGracefulShutdownWorks() async throws { - let client = createClient(identifier: "testGracefulShutdown") - let service = MQTTConnectionService(client: client) - await service.connect() - try await Task.sleep(for: .seconds(1)) - XCTAssert(client.isActive()) - service.shutdown() - XCTAssertFalse(client.isActive()) +// func testGracefulShutdownWorks() async throws { +// let client = createClient(identifier: "testGracefulShutdown") +// let service = MQTTConnectionService(client: client) +// await service.connect() +// try await Task.sleep(for: .seconds(1)) +// XCTAssert(client.isActive()) +// service.shutdown() +// XCTAssertFalse(client.isActive()) +// } + + func testMQTTConnectionStream() async throws { + let client = createClient(identifier: "testNonManagedStream") + let manager = MQTTConnectionManager.live(client: client, logger: Self.logger) + let stream = MQTTConnectionStream(client: client) + var events = [MQTTConnectionManager.Event]() + + _ = try await manager.connect() + + Task { + while !client.isActive() { + try await Task.sleep(for: .milliseconds(100)) + } + try await Task.sleep(for: .milliseconds(200)) + manager.shutdown() + try await client.disconnect() + try await Task.sleep(for: .milliseconds(200)) + stream.stop() + } + + for await event in stream.start().removeDuplicates() { + events.append(event) + } + + XCTAssertEqual(events, [.disconnected, .connected, .disconnected]) + + try await client.shutdown() } func createClient(identifier: String) -> MQTTClient { diff --git a/Tests/SensorsServiceTests/SensorsClientTests.swift b/Tests/SensorsServiceTests/SensorsClientTests.swift index 5f02291..99a84c2 100755 --- a/Tests/SensorsServiceTests/SensorsClientTests.swift +++ b/Tests/SensorsServiceTests/SensorsClientTests.swift @@ -132,52 +132,52 @@ final class SensorsClientTests: XCTestCase { // try await mqtt.shutdown() // } - func testCapturingSensorClient() async throws { - class CapturedValues { - var values = [(value: Double, topic: String)]() - var didShutdown = false - - init() {} - } - - let capturedValues = CapturedValues() - - try await withDependencies { - $0.sensorsClient = .testing( - yielding: [ - (value: 76, to: "not-listening"), - (value: 75, to: "test") - ] - ) { value, topic in - capturedValues.values.append((value, topic)) - } captureShutdownEvent: { - capturedValues.didShutdown = $0 - } - } operation: { - @Dependency(\.sensorsClient) var client - let stream = try await client.listen(to: ["test"]) - - for await result in stream { - var buffer = result.buffer - guard let double = Double(buffer: &buffer) else { - XCTFail("Failed to decode double") - return - } - - XCTAssertEqual(double, 75) - XCTAssertEqual(result.topic, "test") - try await client.publish(26, to: "publish") - try await Task.sleep(for: .milliseconds(100)) - client.shutdown() - } - - XCTAssertEqual(capturedValues.values.count, 1) - XCTAssertEqual(capturedValues.values.first?.value, 26) - XCTAssertEqual(capturedValues.values.first?.topic, "publish") - XCTAssertTrue(capturedValues.didShutdown) - } - } - +// func testCapturingSensorClient() async throws { +// class CapturedValues { +// var values = [(value: Double, topic: String)]() +// var didShutdown = false +// +// init() {} +// } +// +// let capturedValues = CapturedValues() +// +// try await withDependencies { +// $0.sensorsClient = .testing( +// yielding: [ +// (value: 76, to: "not-listening"), +// (value: 75, to: "test") +// ] +// ) { value, topic in +// capturedValues.values.append((value, topic)) +// } captureShutdownEvent: { +// capturedValues.didShutdown = $0 +// } +// } operation: { +// @Dependency(\.sensorsClient) var client +// let stream = try await client.listen(to: ["test"]) +// +// for await result in stream { +// var buffer = result.buffer +// guard let double = Double(buffer: &buffer) else { +// XCTFail("Failed to decode double") +// return +// } +// +// XCTAssertEqual(double, 75) +// XCTAssertEqual(result.topic, "test") +// try await client.publish(26, to: "publish") +// try await Task.sleep(for: .milliseconds(100)) +// client.shutdown() +// } +// +// XCTAssertEqual(capturedValues.values.count, 1) +// XCTAssertEqual(capturedValues.values.first?.value, 26) +// XCTAssertEqual(capturedValues.values.first?.topic, "publish") +// XCTAssertTrue(capturedValues.didShutdown) +// } +// } +// // func testSensorCapturesPublishedState() async throws { // let client = createClient(identifier: "testSensorCapturesPublishedState") // let mqtt = client.client @@ -258,35 +258,35 @@ class PublishInfoContainer { } } -extension SensorsClient { - - static func testing( - yielding: [(value: Double, to: String)], - capturePublishedValues: @escaping (Double, String) -> Void, - captureShutdownEvent: @escaping (Bool) -> Void - ) -> Self { - let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self) - let logger = Logger(label: "\(Self.self).testing") - - return .init( - listen: { topics in - for (value, topic) in yielding where topics.contains(topic) { - continuation.yield( - (buffer: ByteBuffer(string: "\(value)"), topic: topic) - ) - } - return stream - }, - logger: logger, - publish: { value, topic in - capturePublishedValues(value, topic) - }, - shutdown: { - captureShutdownEvent(true) - continuation.finish() - } - ) - } -} +// extension SensorsClient { +// +// static func testing( +// yielding: [(value: Double, to: String)], +// capturePublishedValues: @escaping (Double, String) -> Void, +// captureShutdownEvent: @escaping (Bool) -> Void +// ) -> Self { +// let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self) +// let logger = Logger(label: "\(Self.self).testing") +// +// return .init( +// listen: { topics in +// for (value, topic) in yielding where topics.contains(topic) { +// continuation.yield( +// (buffer: ByteBuffer(string: "\(value)"), topic: topic) +// ) +// } +// return stream +// }, +// logger: logger, +// publish: { value, topic in +// capturePublishedValues(value, topic) +// }, +// shutdown: { +// captureShutdownEvent(true) +// continuation.finish() +// } +// ) +// } +// } struct TopicNotFoundError: Error {}