feat: Removes sensor client in favor of more generic topic listener and publisher

This commit is contained in:
2024-11-12 16:42:14 -05:00
parent b6db9b5322
commit 8067331ff8
7 changed files with 141 additions and 224 deletions

View File

@@ -10,8 +10,10 @@ import MQTTNIO
@DependencyClient
public struct TopicListener: Sendable {
public typealias Stream = AsyncStream<Result<MQTTPublishInfo, MQTTListenResultError>>
/// Create an async stream that listens for changes to the given topics.
private var _listen: @Sendable (_ topics: [String]) async throws -> AsyncThrowingStream<MQTTPublishInfo, any Error>
private var _listen: @Sendable ([String], MQTTQoS) async throws -> Stream
/// Shutdown the listener stream.
public var shutdown: @Sendable () -> Void
@@ -22,7 +24,7 @@ public struct TopicListener: Sendable {
/// - listen: Generate an async stream of changes for the given topics.
/// - shutdown: Shutdown the topic listener stream.
public init(
listen: @Sendable @escaping ([String]) async throws -> AsyncThrowingStream<MQTTPublishInfo, any Error>,
listen: @Sendable @escaping ([String], MQTTQoS) async throws -> Stream,
shutdown: @Sendable @escaping () -> Void
) {
self._listen = listen
@@ -33,16 +35,24 @@ public struct TopicListener: Sendable {
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
public func listen(to topics: [String]) async throws -> AsyncThrowingStream<MQTTPublishInfo, any Error> {
try await _listen(topics)
/// - qos: The MQTTQoS for the subscription.
public func listen(
to topics: [String],
qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
try await _listen(topics, qos)
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
public func listen(_ topics: String...) async throws -> AsyncThrowingStream<MQTTPublishInfo, any Error> {
try await listen(to: topics)
/// - qos: The MQTTQoS for the subscription.
public func listen(
_ topics: String...,
qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
try await listen(to: topics, qos: qos)
}
/// Create the live implementation of the topic listener with the given MQTTClient.
@@ -52,16 +62,14 @@ public struct TopicListener: Sendable {
public static func live(client: MQTTClient) -> Self {
let listener = MQTTTopicListener(client: client)
return .init(
listen: { await listener.listen($0) },
listen: { try await listener.listen($0, $1) },
shutdown: { listener.shutdown() }
)
}
}
extension TopicListener: TestDependencyKey {
public static var testValue: TopicListener {
Self()
}
public static var testValue: TopicListener { Self() }
}
public extension DependencyValues {
@@ -75,15 +83,15 @@ public extension DependencyValues {
private actor MQTTTopicListener {
private let client: MQTTClient
private let continuation: AsyncThrowingStream<MQTTPublishInfo, any Error>.Continuation
private let continuation: TopicListener.Stream.Continuation
private let name: String
let stream: AsyncThrowingStream<MQTTPublishInfo, any Error>
let stream: TopicListener.Stream
private var shuttingDown: Bool = false
init(
client: MQTTClient
) {
let (stream, continuation) = AsyncThrowingStream<MQTTPublishInfo, any Error>.makeStream()
let (stream, continuation) = TopicListener.Stream.makeStream()
self.client = client
self.continuation = continuation
self.name = UUID().uuidString
@@ -102,22 +110,53 @@ private actor MQTTTopicListener {
continuation.finish()
}
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
}
func listen(_ topics: [String]) async -> AsyncThrowingStream<MQTTPublishInfo, any Error> {
assert(client.isActive(), "The client is not connected.")
func listen(
_ topics: [String],
_ qos: MQTTQoS = .atLeastOnce
) async throws(TopicListenerError) -> TopicListener.Stream {
var sleepTimes = 0
while !client.isActive() {
guard sleepTimes < 10 else {
throw .connectionTimeout
}
try? await Task.sleep(for: .milliseconds(100))
sleepTimes += 1
}
client.logger.trace("Client is active, begin subscribing to topics.")
let subscription = try? await client.subscribe(to: topics.map {
MQTTSubscribeInfo(topicFilter: $0, qos: qos)
})
guard subscription != nil else {
client.logger.error("Error subscribing to topics: \(topics)")
throw .failedToSubscribe
}
client.logger.trace("Done subscribing, begin listening to topics.")
client.addPublishListener(named: name) { result in
switch result {
case let .failure(error):
self.client.logger.error("Received error while listening: \(error)")
self.continuation.yield(with: .failure(error))
self.continuation.yield(.failure(.init(error)))
case let .success(publishInfo):
if topics.contains(publishInfo.topicName) {
self.client.logger.trace("Recieved new value for topic: \(publishInfo.topicName)")
self.continuation.yield(publishInfo)
self.continuation.yield(.success(publishInfo))
}
}
}
client.addShutdownListener(named: name) { _ in
self.shutdown()
}
return stream
}
@@ -131,3 +170,16 @@ private actor MQTTTopicListener {
Task { await self.setIsShuttingDown() }
}
}
public enum TopicListenerError: Error {
case connectionTimeout
case failedToSubscribe
}
public struct MQTTListenResultError: Error {
let underlyingError: any Error
init(_ underlyingError: any Error) {
self.underlyingError = underlyingError
}
}