Files
swift-mqtt-dewpoint/Sources/MQTTManager/Internal/ConnectionStream.swift
2024-11-15 17:15:01 -05:00

75 lines
2.1 KiB
Swift

import Foundation
import Logging
import MQTTNIO
/// Publishes connection-state events for an `MQTTClient` as an `AsyncStream`.
///
/// Events come from two sources: a background task that samples
/// `client.isActive()` every 100 milliseconds, and close / shutdown listeners
/// registered on the client under this instance's unique `name`.
@_spi(Internal)
public actor MQTTConnectionStream: Sendable {

  public typealias Element = MQTTManager.Event

  private let client: MQTTClient
  private let continuation: AsyncStream<Element>.Continuation
  private let logger: Logger?
  /// Unique identifier used as the listener name on the client.
  nonisolated let name: String
  private let stream: AsyncStream<Element>
  private var isShuttingDown = false

  /// Creates a connection stream for the given client.
  ///
  /// - Parameters:
  ///   - client: The client whose connection state is observed.
  ///   - logger: Optional logger; a `type` metadata entry naming this type
  ///     is added to it.
  public init(client: MQTTClient, logger: Logger?) {
    var logger = logger
    logger?[metadataKey: "type"] = "\(Self.self)"
    self.logger = logger

    let (stream, continuation) = AsyncStream<Element>.makeStream()
    self.client = client
    self.continuation = continuation
    self.name = UUID().uuidString
    self.stream = stream
  }

  deinit { stop() }

  /// Starts observing the client and returns the stream of events.
  ///
  /// The current state is yielded immediately, then re-sampled every
  /// 100 milliseconds. Polling stops, and the client listeners are removed,
  /// once the stream terminates: via ``stop()``, a client shutdown, or
  /// cancellation of the consuming task.
  ///
  /// - Returns: The stream of connection events.
  public nonisolated func start() -> AsyncStream<Element> {
    // Check if the client is active and yield the initial result.
    continuation.yield(client.isActive() ? .connected : .disconnected)

    // Continually check if the client is active.
    let task = Task {
      while !Task.isCancelled {
        try await Task.sleep(for: .milliseconds(100))

        // Re-read the flag on every pass; a snapshot taken once before the
        // loop would never observe the shutdown listener flipping it.
        let shuttingDown = await self.isShuttingDown
        if shuttingDown { break }

        let result = self.continuation.yield(
          self.client.isActive() ? .connected : .disconnected
        )
        // Nothing can receive further events once the stream has ended, so
        // stop polling instead of spinning forever.
        if case .terminated = result { break }
      }
    }

    // Tie the polling task and the client listeners to the lifetime of the
    // stream, so a direct `stop()` or a cancelled consumer does not leave the
    // task running or the listeners registered.
    continuation.onTermination = { [self] _ in
      task.cancel()
      self.removeListeners()
    }

    // Register listener on the client for when the connection
    // closes.
    client.addCloseListener(named: name) { _ in
      self.logger?.trace("Client has disconnected.")
      self.continuation.yield(.disconnected)
    }

    // Register listener on the client for when the client
    // is shutdown.
    client.addShutdownListener(named: name) { _ in
      self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)")
      self.continuation.yield(.shuttingDown)
      Task { await self.setIsShuttingDown() }
      task.cancel()
      self.stop()
    }

    return stream
  }

  /// Marks this stream as shutting down (actor-isolated mutation).
  private func setIsShuttingDown() {
    isShuttingDown = true
  }

  /// Removes the close and shutdown listeners registered under `name`.
  private nonisolated func removeListeners() {
    client.removeCloseListener(named: name)
    client.removeShutdownListener(named: name)
  }

  /// Removes the client listeners and finishes the stream.
  ///
  /// Finishing the stream also cancels the polling task created by
  /// ``start()`` through the continuation's termination handler.
  public nonisolated func stop() {
    removeListeners()
    continuation.finish()
  }
}