feat: Adds MQTT connection stream, need to clean up the manager and remove stream from it.

This commit is contained in:
2024-11-13 10:06:28 -05:00
parent efd9907b4a
commit b8992b89b6
4 changed files with 166 additions and 84 deletions

View File

@@ -71,6 +71,7 @@ let package = Package(
name: "MQTTConnectionServiceTests", name: "MQTTConnectionServiceTests",
dependencies: [ dependencies: [
"MQTTConnectionService", "MQTTConnectionService",
"MQTTConnectionManagerLive",
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle") .product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
] ]
), ),

View File

@@ -25,6 +25,58 @@ public extension MQTTConnectionManager {
// MARK: - Helpers // MARK: - Helpers
final class MQTTConnectionStream: Sendable {
private let client: MQTTClient
private let continuation: AsyncStream<MQTTConnectionManager.Event>.Continuation
private var logger: Logger { client.logger }
private let name: String
private let stream: AsyncStream<MQTTConnectionManager.Event>
init(client: MQTTClient) {
let (stream, continuation) = AsyncStream<MQTTConnectionManager.Event>.makeStream()
self.client = client
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
continuation.yield(client.isActive() ? .connected : .disconnected)
}
deinit { stop() }
func start() -> AsyncStream<MQTTConnectionManager.Event> {
client.addCloseListener(named: name) { _ in
self.logger.trace("Client has disconnected.")
self.continuation.yield(.disconnected)
}
client.addShutdownListener(named: name) { _ in
self.logger.trace("Client is shutting down.")
self.continuation.yield(.shuttingDown)
self.stop()
}
let task = Task {
while !Task.isCancelled {
try? await Task.sleep(for: .milliseconds(100))
continuation.yield(
self.client.isActive() ? .connected : .disconnected
)
}
}
continuation.onTermination = { _ in
task.cancel()
}
return stream
}
func stop() {
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
continuation.finish()
}
}
// TODO: Remove stream stuff from this.
private actor ConnectionManager { private actor ConnectionManager {
private let client: MQTTClient private let client: MQTTClient
private let continuation: AsyncStream<MQTTConnectionManager.Event>.Continuation private let continuation: AsyncStream<MQTTConnectionManager.Event>.Continuation

View File

@@ -1,5 +1,6 @@
import Logging import Logging
import Models import Models
@testable import MQTTConnectionManagerLive
@testable import MQTTConnectionService @testable import MQTTConnectionService
import MQTTNIO import MQTTNIO
import NIO import NIO
@@ -17,14 +18,42 @@ final class MQTTConnectionServiceTests: XCTestCase {
return logger return logger
}() }()
func testGracefulShutdownWorks() async throws { // func testGracefulShutdownWorks() async throws {
let client = createClient(identifier: "testGracefulShutdown") // let client = createClient(identifier: "testGracefulShutdown")
let service = MQTTConnectionService(client: client) // let service = MQTTConnectionService(client: client)
await service.connect() // await service.connect()
try await Task.sleep(for: .seconds(1)) // try await Task.sleep(for: .seconds(1))
XCTAssert(client.isActive()) // XCTAssert(client.isActive())
service.shutdown() // service.shutdown()
XCTAssertFalse(client.isActive()) // XCTAssertFalse(client.isActive())
// }
func testMQTTConnectionStream() async throws {
let client = createClient(identifier: "testNonManagedStream")
let manager = MQTTConnectionManager.live(client: client, logger: Self.logger)
let stream = MQTTConnectionStream(client: client)
var events = [MQTTConnectionManager.Event]()
_ = try await manager.connect()
Task {
while !client.isActive() {
try await Task.sleep(for: .milliseconds(100))
}
try await Task.sleep(for: .milliseconds(200))
manager.shutdown()
try await client.disconnect()
try await Task.sleep(for: .milliseconds(200))
stream.stop()
}
for await event in stream.start().removeDuplicates() {
events.append(event)
}
XCTAssertEqual(events, [.disconnected, .connected, .disconnected])
try await client.shutdown()
} }
func createClient(identifier: String) -> MQTTClient { func createClient(identifier: String) -> MQTTClient {

View File

@@ -132,52 +132,52 @@ final class SensorsClientTests: XCTestCase {
// try await mqtt.shutdown() // try await mqtt.shutdown()
// } // }
func testCapturingSensorClient() async throws { // func testCapturingSensorClient() async throws {
class CapturedValues { // class CapturedValues {
var values = [(value: Double, topic: String)]() // var values = [(value: Double, topic: String)]()
var didShutdown = false // var didShutdown = false
//
init() {} // init() {}
} // }
//
let capturedValues = CapturedValues() // let capturedValues = CapturedValues()
//
try await withDependencies { // try await withDependencies {
$0.sensorsClient = .testing( // $0.sensorsClient = .testing(
yielding: [ // yielding: [
(value: 76, to: "not-listening"), // (value: 76, to: "not-listening"),
(value: 75, to: "test") // (value: 75, to: "test")
] // ]
) { value, topic in // ) { value, topic in
capturedValues.values.append((value, topic)) // capturedValues.values.append((value, topic))
} captureShutdownEvent: { // } captureShutdownEvent: {
capturedValues.didShutdown = $0 // capturedValues.didShutdown = $0
} // }
} operation: { // } operation: {
@Dependency(\.sensorsClient) var client // @Dependency(\.sensorsClient) var client
let stream = try await client.listen(to: ["test"]) // let stream = try await client.listen(to: ["test"])
//
for await result in stream { // for await result in stream {
var buffer = result.buffer // var buffer = result.buffer
guard let double = Double(buffer: &buffer) else { // guard let double = Double(buffer: &buffer) else {
XCTFail("Failed to decode double") // XCTFail("Failed to decode double")
return // return
} // }
//
XCTAssertEqual(double, 75) // XCTAssertEqual(double, 75)
XCTAssertEqual(result.topic, "test") // XCTAssertEqual(result.topic, "test")
try await client.publish(26, to: "publish") // try await client.publish(26, to: "publish")
try await Task.sleep(for: .milliseconds(100)) // try await Task.sleep(for: .milliseconds(100))
client.shutdown() // client.shutdown()
} // }
//
XCTAssertEqual(capturedValues.values.count, 1) // XCTAssertEqual(capturedValues.values.count, 1)
XCTAssertEqual(capturedValues.values.first?.value, 26) // XCTAssertEqual(capturedValues.values.first?.value, 26)
XCTAssertEqual(capturedValues.values.first?.topic, "publish") // XCTAssertEqual(capturedValues.values.first?.topic, "publish")
XCTAssertTrue(capturedValues.didShutdown) // XCTAssertTrue(capturedValues.didShutdown)
} // }
} // }
//
// func testSensorCapturesPublishedState() async throws { // func testSensorCapturesPublishedState() async throws {
// let client = createClient(identifier: "testSensorCapturesPublishedState") // let client = createClient(identifier: "testSensorCapturesPublishedState")
// let mqtt = client.client // let mqtt = client.client
@@ -258,35 +258,35 @@ class PublishInfoContainer {
} }
} }
extension SensorsClient { // extension SensorsClient {
//
static func testing( // static func testing(
yielding: [(value: Double, to: String)], // yielding: [(value: Double, to: String)],
capturePublishedValues: @escaping (Double, String) -> Void, // capturePublishedValues: @escaping (Double, String) -> Void,
captureShutdownEvent: @escaping (Bool) -> Void // captureShutdownEvent: @escaping (Bool) -> Void
) -> Self { // ) -> Self {
let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self) // let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self)
let logger = Logger(label: "\(Self.self).testing") // let logger = Logger(label: "\(Self.self).testing")
//
return .init( // return .init(
listen: { topics in // listen: { topics in
for (value, topic) in yielding where topics.contains(topic) { // for (value, topic) in yielding where topics.contains(topic) {
continuation.yield( // continuation.yield(
(buffer: ByteBuffer(string: "\(value)"), topic: topic) // (buffer: ByteBuffer(string: "\(value)"), topic: topic)
) // )
} // }
return stream // return stream
}, // },
logger: logger, // logger: logger,
publish: { value, topic in // publish: { value, topic in
capturePublishedValues(value, topic) // capturePublishedValues(value, topic)
}, // },
shutdown: { // shutdown: {
captureShutdownEvent(true) // captureShutdownEvent(true)
continuation.finish() // continuation.finish()
} // }
) // )
} // }
} // }
struct TopicNotFoundError: Error {} struct TopicNotFoundError: Error {}