222 lines
6.3 KiB
Swift
222 lines
6.3 KiB
Swift
import AsyncAlgorithms
|
|
import Dependencies
|
|
import DependenciesMacros
|
|
import Foundation
|
|
import Logging
|
|
import MQTTNIO
|
|
import NIO
|
|
|
|
public extension DependencyValues {
|
|
|
|
/// A dependency that is responsible for managing the connection to
|
|
/// an MQTT broker, listen to topics, and publish values back to the
|
|
/// broker.
|
|
var mqtt: MQTTManager {
|
|
get { self[MQTTManager.self] }
|
|
set { self[MQTTManager.self] = newValue }
|
|
}
|
|
}
|
|
|
|
/// Represents the interface needed to connect, listen, and publish to an MQTT broker.
|
|
///
|
|
@DependencyClient
|
|
public struct MQTTManager: Sendable {
|
|
|
|
public typealias ListenStream = AsyncStream<MQTTPublishInfo>
|
|
|
|
/// Connect to the MQTT broker.
|
|
public var connect: @Sendable () async throws -> Void
|
|
|
|
/// Create a stream of connection events.
|
|
///
|
|
/// - SeeAlso: ``Event``
|
|
public var connectionStream: @Sendable () throws -> AsyncStream<Event>
|
|
|
|
private var _listen: @Sendable ([String], MQTTQoS) async throws -> ListenStream
|
|
|
|
/// Publish a value to the MQTT broker for a given topic.
|
|
public var publish: @Sendable (PublishRequest) async throws -> Void
|
|
|
|
/// Shutdown the connection to the MQTT broker.
|
|
public var shutdown: @Sendable () -> Void
|
|
|
|
private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void
|
|
|
|
public init(
|
|
connect: @escaping @Sendable () async throws -> Void,
|
|
connectionStream: @escaping @Sendable () throws -> AsyncStream<MQTTManager.Event>,
|
|
listen: @escaping @Sendable ([String], MQTTQoS) async throws -> MQTTManager.ListenStream,
|
|
publish: @escaping @Sendable (MQTTManager.PublishRequest) async throws -> Void,
|
|
shutdown: @escaping @Sendable () -> Void,
|
|
withClient: @escaping @Sendable ((MQTTClient) async throws -> Void) async throws -> Void = { _ in unimplemented() }
|
|
) {
|
|
self.connect = connect
|
|
self.connectionStream = connectionStream
|
|
self._listen = listen
|
|
self.publish = publish
|
|
self.shutdown = shutdown
|
|
self._withClient = withClient
|
|
}
|
|
|
|
/// Create an async stream that listens for changes to the given topics.
|
|
///
|
|
/// - Parameters:
|
|
/// - topics: The topics to listen for changes to.
|
|
/// - qos: The MQTTQoS for the subscription.
|
|
public func listen(
|
|
to topics: [String],
|
|
qos: MQTTQoS = .atLeastOnce
|
|
) async throws -> ListenStream {
|
|
try await _listen(topics, qos)
|
|
}
|
|
|
|
/// Create an async stream that listens for changes to the given topics.
|
|
///
|
|
/// - Parameters:
|
|
/// - topics: The topics to listen for changes to.
|
|
/// - qos: The MQTTQoS for the subscription.
|
|
public func listen(
|
|
_ topics: String...,
|
|
qos: MQTTQoS = .atLeastOnce
|
|
) async throws -> ListenStream {
|
|
try await listen(to: topics, qos: qos)
|
|
}
|
|
|
|
/// Publish a new value to the given topic.
|
|
///
|
|
/// - Parameters:
|
|
/// - payload: The value to publish.
|
|
/// - topicName: The topic to publish the new value to.
|
|
/// - qos: The MQTTQoS.
|
|
/// - retain: The retain flag.
|
|
public func publish(
|
|
_ payload: ByteBuffer,
|
|
to topicName: String,
|
|
qos: MQTTQoS,
|
|
retain: Bool = false,
|
|
properties: MQTTProperties = .init()
|
|
) async throws {
|
|
try await publish(.init(
|
|
topicName: topicName,
|
|
payload: payload,
|
|
qos: qos,
|
|
retain: retain,
|
|
properties: properties
|
|
))
|
|
}
|
|
|
|
/// Perform an operation with the underlying MQTTClient, this can be useful in
|
|
/// tests, so this module needs imported with `@_spi(Internal) import MQTTManager` to use this method.
|
|
@_spi(Internal)
|
|
public func withClient(
|
|
_ callback: @Sendable (MQTTClient) async throws -> Void
|
|
) async throws {
|
|
try await _withClient(callback)
|
|
}
|
|
|
|
/// Represents connection events that clients can listen for and
|
|
/// react accordingly.
|
|
public enum Event: Equatable, Sendable {
|
|
case connected
|
|
case disconnected
|
|
case shuttingDown
|
|
}
|
|
|
|
/// Represents the parameters required to publish a new value to the
|
|
/// MQTT broker.
|
|
public struct PublishRequest: Sendable {
|
|
|
|
/// The topic to publish the new value to.
|
|
public let topicName: String
|
|
|
|
/// The value to publish.
|
|
public let payload: ByteBuffer
|
|
|
|
/// The qos of the request.
|
|
public let qos: MQTTQoS
|
|
|
|
/// The retain flag for the request.
|
|
public let retain: Bool
|
|
|
|
public let properties: MQTTProperties
|
|
|
|
/// Create a new publish request.
|
|
///
|
|
/// - Parameters:
|
|
/// - topicName: The topic to publish to.
|
|
/// - payload: The value to publish.
|
|
/// - qos: The qos of the request.
|
|
/// - retain: The retain flag of the request.
|
|
public init(
|
|
topicName: String,
|
|
payload: ByteBuffer,
|
|
qos: MQTTQoS,
|
|
retain: Bool,
|
|
properties: MQTTProperties
|
|
) {
|
|
self.topicName = topicName
|
|
self.payload = payload
|
|
self.qos = qos
|
|
self.retain = retain
|
|
self.properties = properties
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
public extension MQTTManager {
|
|
/// Create the live manager.
|
|
///
|
|
static func live(
|
|
client: MQTTClient,
|
|
cleanSession: Bool = false,
|
|
logger: Logger? = nil,
|
|
alwaysReconnect: Bool = true
|
|
) -> Self {
|
|
let manager = ConnectionManager(
|
|
client: client,
|
|
logger: logger,
|
|
alwaysReconnect: alwaysReconnect
|
|
)
|
|
return .init(
|
|
connect: { try await manager.connect(cleanSession: cleanSession) },
|
|
connectionStream: {
|
|
MQTTConnectionStream(client: client, logger: logger)
|
|
.start()
|
|
.removeDuplicates()
|
|
.eraseToStream()
|
|
},
|
|
listen: { topics, qos in
|
|
try await manager.listen(to: topics, qos: qos)
|
|
},
|
|
publish: { request in
|
|
let topic = request.topicName
|
|
guard client.isActive() else {
|
|
logger?.debug("Client is not active, unable to publish to topic: \(topic)")
|
|
return
|
|
}
|
|
logger?.trace("Begin publishing to topic: \(topic)")
|
|
defer { logger?.debug("Done publishing to topic: \(topic)") }
|
|
try await client.publish(
|
|
to: request.topicName,
|
|
payload: request.payload,
|
|
qos: request.qos,
|
|
retain: request.retain,
|
|
properties: request.properties
|
|
).get()
|
|
},
|
|
shutdown: {
|
|
Task { try await client.shutdown() }
|
|
manager.shutdown()
|
|
},
|
|
withClient: { callback in
|
|
try await callback(client)
|
|
}
|
|
)
|
|
}
|
|
}
|
|
|
|
extension MQTTManager: TestDependencyKey {
|
|
public static let testValue: MQTTManager = Self()
|
|
}
|