75 lines
2.1 KiB
Swift
75 lines
2.1 KiB
Swift
import Foundation
|
|
import Logging
|
|
import MQTTNIO
|
|
|
|
@_spi(Internal)
|
|
public actor MQTTConnectionStream: Sendable {
|
|
|
|
public typealias Element = MQTTManager.Event
|
|
|
|
private let client: MQTTClient
|
|
private let continuation: AsyncStream<Element>.Continuation
|
|
private let logger: Logger?
|
|
nonisolated let name: String
|
|
private let stream: AsyncStream<Element>
|
|
private var isShuttingDown = false
|
|
|
|
public init(client: MQTTClient, logger: Logger?) {
|
|
var logger = logger
|
|
logger?[metadataKey: "type"] = "\(Self.self)"
|
|
self.logger = logger
|
|
|
|
let (stream, continuation) = AsyncStream<Element>.makeStream()
|
|
self.client = client
|
|
self.continuation = continuation
|
|
self.name = UUID().uuidString
|
|
self.stream = stream
|
|
}
|
|
|
|
deinit { stop() }
|
|
|
|
public nonisolated func start() -> AsyncStream<Element> {
|
|
// Check if the client is active and yield the initial result.
|
|
continuation.yield(client.isActive() ? .connected : .disconnected)
|
|
|
|
// Continually check if the client is active.
|
|
let task = Task {
|
|
let isShuttingDown = await self.isShuttingDown
|
|
while !Task.isCancelled, !isShuttingDown {
|
|
try await Task.sleep(for: .milliseconds(100))
|
|
continuation.yield(client.isActive() ? .connected : .disconnected)
|
|
}
|
|
}
|
|
|
|
// Register listener on the client for when the connection
|
|
// closes.
|
|
client.addCloseListener(named: name) { _ in
|
|
self.logger?.trace("Client has disconnected.")
|
|
self.continuation.yield(.disconnected)
|
|
}
|
|
|
|
// Register listener on the client for when the client
|
|
// is shutdown.
|
|
client.addShutdownListener(named: name) { _ in
|
|
self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)")
|
|
self.continuation.yield(.shuttingDown)
|
|
Task { await self.setIsShuttingDown() }
|
|
task.cancel()
|
|
self.stop()
|
|
}
|
|
|
|
return stream
|
|
}
|
|
|
|
private func setIsShuttingDown() {
|
|
isShuttingDown = true
|
|
}
|
|
|
|
public nonisolated func stop() {
|
|
client.removeCloseListener(named: name)
|
|
client.removeShutdownListener(named: name)
|
|
continuation.finish()
|
|
}
|
|
|
|
}
|