feat: Fixes some tests and docker builds

This commit is contained in:
2024-11-14 14:58:09 -05:00
parent e7a849b003
commit 163f603b69
9 changed files with 176 additions and 282 deletions

View File

@@ -33,6 +33,17 @@ public struct MQTTConnectionManager: Sendable {
/// Create a stream of connection events.
public var stream: @Sendable () throws -> AsyncStream<Event>
/// Perform an operation with the underlying MQTTClient, this can be useful in
/// tests, so this module needs imported with `@_spi(Testing) import` to use this method.
private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void
@_spi(Internal)
public func withClient(
_ callback: @Sendable (MQTTClient) async throws -> Void
) async throws {
try await _withClient(callback)
}
/// Represents connection events that clients can listen for and
/// react accordingly.
public enum Event: Sendable {
@@ -61,6 +72,8 @@ public struct MQTTConnectionManager: Sendable {
.start()
.removeDuplicates()
.eraseToStream()
} _withClient: { callback in
try await callback(client)
}
}
}
@@ -73,18 +86,19 @@ extension MQTTConnectionManager: TestDependencyKey {
// MARK: - Helpers
final class MQTTConnectionStream: AsyncSequence, Sendable {
@_spi(Internal)
public final actor MQTTConnectionStream: Sendable {
typealias AsyncIterator = AsyncStream<Element>.AsyncIterator
typealias Element = MQTTConnectionManager.Event
public typealias Element = MQTTConnectionManager.Event
private let client: MQTTClient
private let continuation: AsyncStream<Element>.Continuation
private let logger: Logger?
private let name: String
private let stream: AsyncStream<Element>
private var isShuttingDown = false
init(client: MQTTClient, logger: Logger?) {
public init(client: MQTTClient, logger: Logger?) {
let (stream, continuation) = AsyncStream<Element>.makeStream()
self.client = client
self.continuation = continuation
@@ -95,12 +109,19 @@ final class MQTTConnectionStream: AsyncSequence, Sendable {
deinit { stop() }
func start(
isolation: isolated (any Actor)? = #isolation
) -> AsyncStream<Element> {
// Check if the client is active and yield the result.
public nonisolated func start() -> AsyncStream<Element> {
// Check if the client is active and yield the initial result.
continuation.yield(client.isActive() ? .connected : .disconnected)
// Continually check if the client is active.
let task = Task {
let isShuttingDown = await self.isShuttingDown
while !Task.isCancelled, !isShuttingDown {
try await Task.sleep(for: .milliseconds(100))
continuation.yield(client.isActive() ? .connected : .disconnected)
}
}
// Register listener on the client for when the connection
// closes.
client.addCloseListener(named: name) { _ in
@@ -111,24 +132,26 @@ final class MQTTConnectionStream: AsyncSequence, Sendable {
// Register listener on the client for when the client
// is shutdown.
client.addShutdownListener(named: name) { _ in
self.logger?.trace("Client is shutting down.")
self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)")
self.continuation.yield(.shuttingDown)
Task { await self.setIsShuttingDown() }
task.cancel()
self.stop()
}
return stream
}
func stop() {
private func setIsShuttingDown() {
isShuttingDown = true
}
public nonisolated func stop() {
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
continuation.finish()
}
public __consuming func makeAsyncIterator() -> AsyncIterator {
start().makeAsyncIterator()
}
}
actor ConnectionManager {
@@ -160,13 +183,12 @@ actor ConnectionManager {
}
func connect(
isolation: isolated (any Actor)? = #isolation,
cleanSession: Bool
) async throws {
guard !(await hasConnected) else { return }
guard !hasConnected else { return }
do {
try await client.connect(cleanSession: cleanSession)
await setHasConnected()
setHasConnected()
client.addCloseListener(named: name) { [weak self] _ in
guard let `self` else { return }