feat: Renaming and moves some items around, listeners now manage reconnection events.
All checks were successful
CI / Run Tests (push) Successful in 4m16s

This commit is contained in:
2024-11-15 17:15:01 -05:00
parent 947472f62d
commit c84427a9b3
14 changed files with 649 additions and 725 deletions

View File

@@ -15,10 +15,9 @@ let package = Package(
products: [ products: [
.executable(name: "dewpoint-controller", targets: ["DewPointController"]), .executable(name: "dewpoint-controller", targets: ["DewPointController"]),
.library(name: "Models", targets: ["Models"]), .library(name: "Models", targets: ["Models"]),
.library(name: "MQTTConnectionManager", targets: ["MQTTConnectionManager"]), .library(name: "MQTTManager", targets: ["MQTTManager"]),
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]), .library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
.library(name: "SensorsService", targets: ["SensorsService"]), .library(name: "SensorsService", targets: ["SensorsService"])
.library(name: "TopicDependencies", targets: ["TopicDependencies"])
], ],
dependencies: [ dependencies: [
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"), .package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
@@ -34,10 +33,9 @@ let package = Package(
name: "DewPointController", name: "DewPointController",
dependencies: [ dependencies: [
"Models", "Models",
"MQTTConnectionManager", "MQTTManager",
"MQTTConnectionService", "MQTTConnectionService",
"SensorsService", "SensorsService",
"TopicDependencies",
.product(name: "MQTTNIO", package: "mqtt-nio"), .product(name: "MQTTNIO", package: "mqtt-nio"),
.product(name: "NIO", package: "swift-nio"), .product(name: "NIO", package: "swift-nio"),
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics") .product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
@@ -52,7 +50,7 @@ let package = Package(
swiftSettings: swiftSettings swiftSettings: swiftSettings
), ),
.target( .target(
name: "MQTTConnectionManager", name: "MQTTManager",
dependencies: [ dependencies: [
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
.product(name: "Dependencies", package: "swift-dependencies"), .product(name: "Dependencies", package: "swift-dependencies"),
@@ -65,7 +63,7 @@ let package = Package(
name: "MQTTConnectionService", name: "MQTTConnectionService",
dependencies: [ dependencies: [
"Models", "Models",
"MQTTConnectionManager", "MQTTManager",
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle") .product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
], ],
swiftSettings: swiftSettings swiftSettings: swiftSettings
@@ -74,7 +72,7 @@ let package = Package(
name: "MQTTConnectionServiceTests", name: "MQTTConnectionServiceTests",
dependencies: [ dependencies: [
"MQTTConnectionService", "MQTTConnectionService",
"MQTTConnectionManager", "MQTTManager",
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle") .product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
] ]
), ),
@@ -82,8 +80,7 @@ let package = Package(
name: "SensorsService", name: "SensorsService",
dependencies: [ dependencies: [
"Models", "Models",
"MQTTConnectionManager", "MQTTManager",
"TopicDependencies",
.product(name: "Dependencies", package: "swift-dependencies"), .product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"), .product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio"), .product(name: "MQTTNIO", package: "mqtt-nio"),
@@ -97,15 +94,6 @@ let package = Package(
"SensorsService", "SensorsService",
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics") .product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
] ]
),
.target(
name: "TopicDependencies",
dependencies: [
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio")
],
swiftSettings: swiftSettings
) )
] ]
) )

View File

@@ -2,14 +2,13 @@ import Dependencies
import Foundation import Foundation
import Logging import Logging
import Models import Models
import MQTTConnectionManager
import MQTTConnectionService import MQTTConnectionService
import MQTTManager
import MQTTNIO import MQTTNIO
import NIO import NIO
import PsychrometricClientLive import PsychrometricClientLive
import SensorsService import SensorsService
import ServiceLifecycle import ServiceLifecycle
import TopicDependencies
@main @main
struct Application { struct Application {
@@ -38,9 +37,7 @@ struct Application {
do { do {
try await withDependencies { try await withDependencies {
$0.psychrometricClient = .liveValue $0.psychrometricClient = .liveValue
$0.topicListener = .live(client: mqtt) $0.mqtt = .live(client: mqtt, logger: logger)
$0.topicPublisher = .live(client: mqtt)
$0.mqttConnectionManager = .live(client: mqtt, logger: logger)
} operation: { } operation: {
let mqttConnection = MQTTConnectionService(logger: logger) let mqttConnection = MQTTConnectionService(logger: logger)
let sensors = SensorsService(sensors: .live, logger: logger) let sensors = SensorsService(sensors: .live, logger: logger)

View File

@@ -1,221 +0,0 @@
import AsyncAlgorithms
import Dependencies
import DependenciesMacros
import Foundation
import Logging
import MQTTNIO
import NIO
public extension DependencyValues {
/// A dependency that is responsible for managing the connection to
/// an MQTT broker.
var mqttConnectionManager: MQTTConnectionManager {
get { self[MQTTConnectionManager.self] }
set { self[MQTTConnectionManager.self] = newValue }
}
}
/// Represents the interface needed for the ``MQTTConnectionService``.
///
/// See ``MQTTConnectionManagerLive`` module for live implementation.
@DependencyClient
public struct MQTTConnectionManager: Sendable {
/// Connect to the MQTT broker.
public var connect: @Sendable () async throws -> Void
/// Shutdown the connection to the MQTT broker.
///
/// - Note: You should cancel any tasks that are listening to the connection stream first.
public var shutdown: @Sendable () -> Void
/// Create a stream of connection events.
public var stream: @Sendable () throws -> AsyncStream<Event>
/// Perform an operation with the underlying MQTTClient, this can be useful in
/// tests, so this module needs imported with `@_spi(Testing) import` to use this method.
private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void
@_spi(Internal)
public func withClient(
_ callback: @Sendable (MQTTClient) async throws -> Void
) async throws {
try await _withClient(callback)
}
/// Represents connection events that clients can listen for and
/// react accordingly.
public enum Event: Sendable {
case connected
case disconnected
case shuttingDown
}
public static func live(
client: MQTTClient,
cleanSession: Bool = false,
logger: Logger? = nil,
alwaysReconnect: Bool = true
) -> Self {
let manager = ConnectionManager(
client: client,
logger: logger,
alwaysReconnect: alwaysReconnect
)
return .init {
try await manager.connect(cleanSession: cleanSession)
} shutdown: {
manager.shutdown()
} stream: {
MQTTConnectionStream(client: client, logger: logger)
.start()
.removeDuplicates()
.eraseToStream()
} _withClient: { callback in
try await callback(client)
}
}
}
extension MQTTConnectionManager: TestDependencyKey {
public static var testValue: MQTTConnectionManager {
Self()
}
}
// MARK: - Helpers
@_spi(Internal)
public final actor MQTTConnectionStream: Sendable {
public typealias Element = MQTTConnectionManager.Event
private let client: MQTTClient
private let continuation: AsyncStream<Element>.Continuation
private let logger: Logger?
private let name: String
private let stream: AsyncStream<Element>
private var isShuttingDown = false
public init(client: MQTTClient, logger: Logger?) {
let (stream, continuation) = AsyncStream<Element>.makeStream()
self.client = client
self.continuation = continuation
self.logger = logger
self.name = UUID().uuidString
self.stream = stream
}
deinit { stop() }
public nonisolated func start() -> AsyncStream<Element> {
// Check if the client is active and yield the initial result.
continuation.yield(client.isActive() ? .connected : .disconnected)
// Continually check if the client is active.
let task = Task {
let isShuttingDown = await self.isShuttingDown
while !Task.isCancelled, !isShuttingDown {
try await Task.sleep(for: .milliseconds(100))
continuation.yield(client.isActive() ? .connected : .disconnected)
}
}
// Register listener on the client for when the connection
// closes.
client.addCloseListener(named: name) { _ in
self.logger?.trace("Client has disconnected.")
self.continuation.yield(.disconnected)
}
// Register listener on the client for when the client
// is shutdown.
client.addShutdownListener(named: name) { _ in
self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)")
self.continuation.yield(.shuttingDown)
Task { await self.setIsShuttingDown() }
task.cancel()
self.stop()
}
return stream
}
private func setIsShuttingDown() {
isShuttingDown = true
}
public nonisolated func stop() {
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
continuation.finish()
}
}
actor ConnectionManager {
private let client: MQTTClient
private let logger: Logger?
private let name: String
private let shouldReconnect: Bool
private var hasConnected: Bool = false
init(
client: MQTTClient,
logger: Logger?,
alwaysReconnect: Bool
) {
self.client = client
self.logger = logger
self.name = UUID().uuidString
self.shouldReconnect = alwaysReconnect
}
deinit {
// We should've already logged that we're shutting down if
// the manager was shutdown properly, so don't log it twice.
self.shutdown(withLogging: false)
}
private func setHasConnected() {
hasConnected = true
}
func connect(
cleanSession: Bool
) async throws {
guard !hasConnected else { return }
do {
try await client.connect(cleanSession: cleanSession)
setHasConnected()
client.addCloseListener(named: name) { [weak self] _ in
guard let `self` else { return }
self.logger?.debug("Connection closed.")
if self.shouldReconnect {
self.logger?.debug("Reconnecting...")
Task {
try await self.connect(cleanSession: cleanSession)
}
}
}
client.addShutdownListener(named: name) { _ in
self.shutdown()
}
} catch {
logger?.trace("Failed to connect: \(error)")
throw error
}
}
nonisolated func shutdown(withLogging: Bool = true) {
if withLogging {
logger?.trace("Shutting down connection.")
}
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
}
}

View File

@@ -1,13 +1,13 @@
import Dependencies import Dependencies
import Logging import Logging
import Models import Models
import MQTTConnectionManager import MQTTManager
import ServiceLifecycle import ServiceLifecycle
public actor MQTTConnectionService: Service { public struct MQTTConnectionService: Service {
@Dependency(\.mqttConnectionManager) var manager @Dependency(\.mqtt) var mqtt
private nonisolated let logger: Logger? private let logger: Logger?
public init( public init(
logger: Logger? = nil logger: Logger? = nil
@@ -20,8 +20,8 @@ public actor MQTTConnectionService: Service {
/// connection. /// connection.
public func run() async throws { public func run() async throws {
try await withGracefulShutdownHandler { try await withGracefulShutdownHandler {
try await manager.connect() try await mqtt.connect()
for await event in try manager.stream().cancelOnGracefulShutdown() { for await event in try mqtt.connectionStream().cancelOnGracefulShutdown() {
// We don't really need to do anything with the events, so just logging // We don't really need to do anything with the events, so just logging
// for now. But we need to iterate on an async stream for the service to // for now. But we need to iterate on an async stream for the service to
// continue to run and handle graceful shutdowns. // continue to run and handle graceful shutdowns.
@@ -29,7 +29,7 @@ public actor MQTTConnectionService: Service {
} }
// when we reach here we are shutting down, so we shutdown // when we reach here we are shutting down, so we shutdown
// the manager. // the manager.
manager.shutdown() mqtt.shutdown()
} onGracefulShutdown: { } onGracefulShutdown: {
self.logger?.trace("Received graceful shutdown.") self.logger?.trace("Received graceful shutdown.")
} }

View File

@@ -0,0 +1,197 @@
import AsyncAlgorithms
import Dependencies
import DependenciesMacros
import Foundation
import Logging
import MQTTNIO
import NIO
public extension DependencyValues {
/// A dependency that is responsible for managing the connection to
/// an MQTT broker.
var mqtt: MQTTManager {
get { self[MQTTManager.self] }
set { self[MQTTManager.self] = newValue }
}
}
/// Represents the interface needed for the ``MQTTConnectionService``.
///
/// See ``MQTTConnectionManagerLive`` module for live implementation.
@DependencyClient
public struct MQTTManager: Sendable {
public typealias ListenStream = AsyncStream<MQTTPublishInfo>
/// Connect to the MQTT broker.
public var connect: @Sendable () async throws -> Void
/// Create a stream of connection events.
public var connectionStream: @Sendable () throws -> AsyncStream<Event>
private var _listen: @Sendable ([String], MQTTQoS) async throws -> ListenStream
/// Publish a value to the MQTT broker for a given topic.
public var publish: @Sendable (PublishRequest) async throws -> Void
/// Shutdown the connection to the MQTT broker.
public var shutdown: @Sendable () -> Void
/// Perform an operation with the underlying MQTTClient, this can be useful in
/// tests, so this module needs imported with `@_spi(Testing) import` to use this method.
private var _withClient: @Sendable ((MQTTClient) async throws -> Void) async throws -> Void
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
to topics: [String],
qos: MQTTQoS = .atLeastOnce
) async throws -> ListenStream {
try await _listen(topics, qos)
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
_ topics: String...,
qos: MQTTQoS = .atLeastOnce
) async throws -> ListenStream {
try await listen(to: topics, qos: qos)
}
/// Publish a new value to the given topic.
///
/// - Parameters:
/// - payload: The value to publish.
/// - topicName: The topic to publish the new value to.
/// - qos: The MQTTQoS.
/// - retain: The retain flag.
public func publish(
_ payload: ByteBuffer,
to topicName: String,
qos: MQTTQoS,
retain: Bool = false
) async throws {
try await publish(.init(
topicName: topicName,
payload: payload,
qos: qos,
retain: retain
))
}
/// Perform an operation with the underlying MQTTClient, this can be useful in
/// tests, so this module needs imported with `@_spi(Testing) import` to use this method.
@_spi(Internal)
public func withClient(
_ callback: @Sendable (MQTTClient) async throws -> Void
) async throws {
try await _withClient(callback)
}
/// Represents connection events that clients can listen for and
/// react accordingly.
public enum Event: Sendable {
case connected
case disconnected
case shuttingDown
}
/// Represents the parameters required to publish a new value to the
/// MQTT broker.
public struct PublishRequest: Equatable, Sendable {
/// The topic to publish the new value to.
public let topicName: String
/// The value to publish.
public let payload: ByteBuffer
/// The qos of the request.
public let qos: MQTTQoS
/// The retain flag for the request.
public let retain: Bool
/// Create a new publish request.
///
/// - Parameters:
/// - topicName: The topic to publish to.
/// - payload: The value to publish.
/// - qos: The qos of the request.
/// - retain: The retain flag of the request.
public init(
topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool
) {
self.topicName = topicName
self.payload = payload
self.qos = qos
self.retain = retain
}
}
}
public extension MQTTManager {
/// Create the live manager.
///
static func live(
client: MQTTClient,
cleanSession: Bool = false,
logger: Logger? = nil,
alwaysReconnect: Bool = true
) -> Self {
let manager = ConnectionManager(
client: client,
logger: logger,
alwaysReconnect: alwaysReconnect
)
return .init(
connect: { try await manager.connect(cleanSession: cleanSession) },
connectionStream: {
MQTTConnectionStream(client: client, logger: logger)
.start()
.removeDuplicates()
.eraseToStream()
},
_listen: { topics, qos in
try await manager.listen(to: topics, qos: qos)
},
publish: { request in
let topic = request.topicName
guard client.isActive() else {
logger?.debug("Client is not active, unable to publish to topic: \(topic)")
return
}
logger?.trace("Begin publishing to topic: \(topic)")
defer { logger?.trace("Done publishing to topic: \(topic)") }
try await client.publish(
to: request.topicName,
payload: request.payload,
qos: request.qos,
retain: request.retain
)
},
shutdown: {
manager.shutdown()
},
_withClient: { callback in
try await callback(client)
}
)
}
}
extension MQTTManager: TestDependencyKey {
public static let testValue: MQTTManager = Self()
}

View File

@@ -0,0 +1,98 @@
import Foundation
import Logging
import MQTTNIO
actor ConnectionManager {
private let client: MQTTClient
private let logger: Logger?
private let name: String
private let shouldReconnect: Bool
private var hasConnected: Bool = false
private var listeners: [TopicListenerStream] = []
private var isShuttingDown = false
init(
client: MQTTClient,
logger: Logger?,
alwaysReconnect: Bool
) {
var logger = logger
logger?[metadataKey: "instance"] = "\(Self.self)"
self.logger = logger
self.client = client
self.name = UUID().uuidString
self.shouldReconnect = alwaysReconnect
}
deinit {
if !isShuttingDown {
let message = """
Did not properly close the connection manager. This can lead to
dangling references.
Please call `shutdown` to properly close all connections and listener streams.
"""
logger?.warning("\(message)")
self.shutdown()
}
}
private func setHasConnected() {
hasConnected = true
}
func listen(
to topics: [String],
qos: MQTTQoS
) async throws -> MQTTManager.ListenStream {
let listener = TopicListenerStream(client: client, logger: logger, topics: topics, qos: qos)
listeners.append(listener)
await listener.start()
return listener.stream
}
func connect(
cleanSession: Bool
) async throws {
guard !hasConnected else { return }
do {
try await client.connect(cleanSession: cleanSession)
setHasConnected()
client.addCloseListener(named: name) { [weak self] _ in
guard let `self` else { return }
self.logger?.debug("Connection closed.")
if self.shouldReconnect {
self.logger?.debug("Reconnecting...")
Task {
try await self.connect(cleanSession: cleanSession)
}
}
}
client.addShutdownListener(named: name) { _ in
self.shutdown()
}
} catch {
logger?.trace("Failed to connect: \(error)")
throw error
}
}
private func shutdownListeners() {
_ = listeners.map { $0.shutdown() }
listeners = []
isShuttingDown = true
}
nonisolated func shutdown(withLogging: Bool = true) {
if withLogging {
logger?.trace("Shutting down connection.")
}
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
Task { await shutdownListeners() }
}
}

View File

@@ -0,0 +1,74 @@
import Foundation
import Logging
import MQTTNIO
@_spi(Internal)
public actor MQTTConnectionStream: Sendable {
public typealias Element = MQTTManager.Event
private let client: MQTTClient
private let continuation: AsyncStream<Element>.Continuation
private let logger: Logger?
nonisolated let name: String
private let stream: AsyncStream<Element>
private var isShuttingDown = false
public init(client: MQTTClient, logger: Logger?) {
var logger = logger
logger?[metadataKey: "type"] = "\(Self.self)"
self.logger = logger
let (stream, continuation) = AsyncStream<Element>.makeStream()
self.client = client
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
}
deinit { stop() }
public nonisolated func start() -> AsyncStream<Element> {
// Check if the client is active and yield the initial result.
continuation.yield(client.isActive() ? .connected : .disconnected)
// Continually check if the client is active.
let task = Task {
let isShuttingDown = await self.isShuttingDown
while !Task.isCancelled, !isShuttingDown {
try await Task.sleep(for: .milliseconds(100))
continuation.yield(client.isActive() ? .connected : .disconnected)
}
}
// Register listener on the client for when the connection
// closes.
client.addCloseListener(named: name) { _ in
self.logger?.trace("Client has disconnected.")
self.continuation.yield(.disconnected)
}
// Register listener on the client for when the client
// is shutdown.
client.addShutdownListener(named: name) { _ in
self.logger?.trace("Client is shutting down, ending connection stream: \(self.name)")
self.continuation.yield(.shuttingDown)
Task { await self.setIsShuttingDown() }
task.cancel()
self.stop()
}
return stream
}
private func setIsShuttingDown() {
isShuttingDown = true
}
public nonisolated func stop() {
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
continuation.finish()
}
}

View File

@@ -0,0 +1,177 @@
import Foundation
import Logging
import MQTTNIO
actor TopicListenerStream {
typealias Stream = MQTTManager.ListenStream
private let client: MQTTClient
private let configuration: Configuration
private let continuation: Stream.Continuation
private let logger: Logger?
private let name: String
let stream: Stream
private var shuttingDown: Bool = false
private var onShutdownHandler: (@Sendable () -> Void)?
init(
client: MQTTClient,
logger: Logger?,
topics: [String],
qos: MQTTQoS
) {
// Setup the logger so we can more easily decipher log messages.
var logger = logger
logger?[metadataKey: "type"] = "\(Self.self)"
self.logger = logger
let (stream, continuation) = Stream.makeStream()
self.client = client
self.configuration = .init(qos: qos, topics: topics)
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
}
struct Configuration: Sendable {
let qos: MQTTQoS
let topics: [String]
}
deinit {
if !shuttingDown {
let message = """
Shutdown was not called on topic listener. This could lead to potential errors or
the stream never ending.
Please ensure that you call shutdown on the listener.
"""
client.logger.warning("\(message)")
continuation.finish()
}
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
}
private func subscribe() async throws {
guard !shuttingDown else { return }
logger?.debug("Begin subscribing to topics.")
do {
_ = try await client.subscribe(to: configuration.topics.map {
MQTTSubscribeInfo(topicFilter: $0, qos: configuration.qos)
})
} catch {
logger?.error("Received error while subscribing to topics: \(configuration.topics)")
throw TopicListenerError.failedToSubscribe
}
logger?.debug("Done subscribing to topics.")
}
public func start() {
logger?.trace("Starting listener for topics: \(configuration.topics)")
let stream = MQTTConnectionStream(client: client, logger: logger)
.start()
.removeDuplicates()
.eraseToStream()
let task = Task {
// Listen for connection events to restablish the stream upon a
// client becoming disconnected / reconnected, and properly shutdown
// the stream on the client being shutdown.
for await event in stream {
logger?.trace("Received event: \(event)")
switch event {
case .shuttingDown:
shutdown()
case .disconnected:
try await Task.sleep(for: .milliseconds(100))
case .connected:
try await subscribe()
client.addPublishListener(named: name) { result in
switch result {
case let .failure(error):
self.logger?.error("Received error while listening: \(error)")
case let .success(publishInfo):
if self.configuration.topics.contains(publishInfo.topicName) {
self.logger?.debug("Recieved new value for topic: \(publishInfo.topicName)")
self.continuation.yield(publishInfo)
}
}
}
}
}
}
onShutdownHandler = { task.cancel() }
}
// TODO: remove.
func listen(
_ topics: [String],
_ qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
var sleepTimes = 0
while !client.isActive() {
guard sleepTimes < 10 else {
throw TopicListenerError.connectionTimeout
}
try? await Task.sleep(for: .milliseconds(100))
sleepTimes += 1
}
client.logger.trace("Client is active, begin subscribing to topics.")
try await subscribe()
client.logger.trace("Done subscribing, begin listening to topics.")
client.addPublishListener(named: name) { result in
switch result {
case let .failure(error):
self.logger?.error("Received error while listening: \(error)")
case let .success(publishInfo):
if topics.contains(publishInfo.topicName) {
self.logger?.debug("Recieved new value for topic: \(publishInfo.topicName)")
self.continuation.yield(publishInfo)
}
}
}
return stream
}
private func setIsShuttingDown() {
shuttingDown = true
onShutdownHandler = nil
}
public nonisolated func shutdown() {
client.logger.trace("Closing topic listener...")
continuation.finish()
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
Task {
await onShutdownHandler?()
await self.setIsShuttingDown()
}
}
}
// MARK: - Errors
public enum TopicListenerError: Error {
case connectionTimeout
case failedToSubscribe
}
public struct MQTTListenResultError: Error {
let underlyingError: any Error
init(_ underlyingError: any Error) {
self.underlyingError = underlyingError
}
}

View File

@@ -1,42 +0,0 @@
import NIO
import NIOFoundationCompat
import PsychrometricClient
/// Represents a type that can be initialized by a ``ByteBuffer``.
protocol BufferInitalizable {
init?(buffer: inout ByteBuffer)
}
extension Double: BufferInitalizable {
/// Attempt to create / parse a double from a byte buffer.
init?(buffer: inout ByteBuffer) {
guard let string = buffer.readString(
length: buffer.readableBytes,
encoding: String.Encoding.utf8
)
else { return nil }
self.init(string)
}
}
extension Tagged: BufferInitalizable where RawValue: BufferInitalizable {
init?(buffer: inout ByteBuffer) {
guard let value = RawValue(buffer: &buffer) else { return nil }
self.init(value)
}
}
extension Humidity<Relative>: BufferInitalizable {
init?(buffer: inout ByteBuffer) {
guard let value = Double(buffer: &buffer) else { return nil }
self.init(value)
}
}
extension Temperature<DryAir>: BufferInitalizable {
init?(buffer: inout ByteBuffer) {
guard let value = Double(buffer: &buffer) else { return nil }
self.init(value)
}
}

View File

@@ -3,12 +3,11 @@ import DependenciesMacros
import Foundation import Foundation
import Logging import Logging
import Models import Models
import MQTTConnectionManager import MQTTManager
import MQTTNIO import MQTTNIO
import NIO import NIO
import PsychrometricClient import PsychrometricClient
import ServiceLifecycle import ServiceLifecycle
import TopicDependencies
/// Service that is responsible for listening to changes of the temperature and humidity /// Service that is responsible for listening to changes of the temperature and humidity
/// sensors, then publishing back the calculated dew-point temperature and enthalpy for /// sensors, then publishing back the calculated dew-point temperature and enthalpy for
@@ -17,9 +16,7 @@ import TopicDependencies
/// ///
public actor SensorsService: Service { public actor SensorsService: Service {
@Dependency(\.mqttConnectionManager.stream) var connectionStream @Dependency(\.mqtt) var mqtt
@Dependency(\.topicListener) var topicListener
@Dependency(\.topicPublisher) var topicPublisher
/// The logger to use for the service. /// The logger to use for the service.
private let logger: Logger? private let logger: Logger?
@@ -27,12 +24,9 @@ public actor SensorsService: Service {
/// The sensors that we are listening for updates to, so /// The sensors that we are listening for updates to, so
/// that we can calculate the dew-point temperature and enthalpy /// that we can calculate the dew-point temperature and enthalpy
/// values to publish back to the MQTT broker. /// values to publish back to the MQTT broker.
var sensors: [TemperatureAndHumiditySensor] private var sensors: [TemperatureAndHumiditySensor]
@_spi(Internal) private var topics: [String] {
public var isListening: Bool = false
var topics: [String] {
sensors.reduce(into: [String]()) { array, sensor in sensors.reduce(into: [String]()) { array, sensor in
array.append(sensor.topics.temperature) array.append(sensor.topics.temperature)
array.append(sensor.topics.humidity) array.append(sensor.topics.humidity)
@@ -60,33 +54,16 @@ public actor SensorsService: Service {
public func run() async throws { public func run() async throws {
precondition(sensors.count > 0, "Sensors should not be empty.") precondition(sensors.count > 0, "Sensors should not be empty.")
try await withGracefulShutdownHandler {
// Listen for connection events, so that we can automatically
// reconnect any sensor topics we're listening to upon a disconnect / reconnect
// event. We can also shutdown any topic listeners upon a shutdown event.
for await event in try connectionStream().cancelOnGracefulShutdown() {
switch event {
case .shuttingDown:
logger?.debug("Received shutdown event.")
isListening = false
try await self.shutdown()
case .disconnected:
logger?.debug("Received disconnected event.")
isListening = false
try await Task.sleep(for: .milliseconds(100))
case .connected:
logger?.debug("Received connected event.")
let stream = try await makeStream() let stream = try await makeStream()
isListening = true
await withGracefulShutdownHandler {
for await result in stream.cancelOnGracefulShutdown() { for await result in stream.cancelOnGracefulShutdown() {
logger?.debug("Received result for topic: \(result.topic)") logger?.debug("Received result for topic: \(result.topic)")
await self.handleResult(result) await handleResult(result)
}
}
} }
} onGracefulShutdown: { } onGracefulShutdown: {
Task {
self.logger?.debug("Received graceful shutdown.") self.logger?.debug("Received graceful shutdown.")
Task {
try await self.shutdown() try await self.shutdown()
} }
} }
@@ -95,24 +72,13 @@ public actor SensorsService: Service {
@_spi(Internal) @_spi(Internal)
public func shutdown() async throws { public func shutdown() async throws {
try await publishUpdates() try await publishUpdates()
topicListener.shutdown()
} }
private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> { private func makeStream() async throws -> AsyncStream<(buffer: ByteBuffer, topic: String)> {
try await topicListener.listen(to: topics)
// ignore errors, so that we continue to listen, but log them
// for debugging purposes.
.compactMap { result in
switch result {
case let .failure(error):
self.logger?.debug("Received error listening for sensors: \(error)")
return nil
case let .success(info):
return (info.payload, info.topicName)
}
}
// ignore duplicate values, to prevent publishing dew-point and enthalpy // ignore duplicate values, to prevent publishing dew-point and enthalpy
// changes to frequently. // changes to frequently.
try await mqtt.listen(to: topics)
.map { ($0.payload, $0.topicName) }
.removeDuplicates { lhs, rhs in .removeDuplicates { lhs, rhs in
lhs.buffer == rhs.buffer lhs.buffer == rhs.buffer
&& lhs.topic == rhs.topic && lhs.topic == rhs.topic
@@ -127,8 +93,7 @@ public actor SensorsService: Service {
logger?.debug("Begin handling result for topic: \(topic)") logger?.debug("Begin handling result for topic: \(topic)")
func decode<V: BufferInitalizable>(_: V.Type) -> V? { func decode<V: BufferInitalizable>(_: V.Type) -> V? {
var buffer = result.buffer return V(buffer: result.buffer)
return V(buffer: &buffer)
} }
if topic.contains("temperature") { if topic.contains("temperature") {
@@ -159,9 +124,9 @@ public actor SensorsService: Service {
private func publish(_ double: Double?, to topic: String) async throws { private func publish(_ double: Double?, to topic: String) async throws {
guard let double else { return } guard let double else { return }
try await topicPublisher.publish( try await mqtt.publish(
ByteBufferAllocator().buffer(string: "\(double)"),
to: topic, to: topic,
payload: ByteBufferAllocator().buffer(string: "\(double)"),
qos: .exactlyOnce, qos: .exactlyOnce,
retain: true retain: true
) )
@@ -210,3 +175,38 @@ private extension Array where Element == TemperatureAndHumiditySensor {
self[index].needsProcessed = false self[index].needsProcessed = false
} }
} }
/// Represents a type that can be initialized by a ``ByteBuffer``.
protocol BufferInitalizable {
init?(buffer: ByteBuffer)
}
extension Double: BufferInitalizable {
/// Attempt to create / parse a double from a byte buffer.
init?(buffer: ByteBuffer) {
let string = String(buffer: buffer)
self.init(string)
}
}
extension Tagged: BufferInitalizable where RawValue: BufferInitalizable {
init?(buffer: ByteBuffer) {
guard let value = RawValue(buffer: buffer) else { return nil }
self.init(value)
}
}
extension Humidity<Relative>: BufferInitalizable {
init?(buffer: ByteBuffer) {
guard let value = Double(buffer: buffer) else { return nil }
self.init(value)
}
}
extension Temperature<DryAir>: BufferInitalizable {
init?(buffer: ByteBuffer) {
guard let value = Double(buffer: buffer) else { return nil }
self.init(value)
}
}

View File

@@ -1,186 +0,0 @@
import Dependencies
import DependenciesMacros
import Foundation
import MQTTNIO
/// A dependency that can generate an async stream of changes to the given topics.
///
/// - Note: This type only conforms to ``TestDependencyKey`` because it requires an MQTTClient
/// to generate the live dependency.
@DependencyClient
public struct TopicListener: Sendable {
public typealias Stream = AsyncStream<Result<MQTTPublishInfo, any Error>>
/// Create an async stream that listens for changes to the given topics.
private var _listen: @Sendable ([String], MQTTQoS) async throws -> Stream
/// Shutdown the listener stream.
public var shutdown: @Sendable () -> Void
/// Create a new topic listener.
///
/// - Parameters:
/// - listen: Generate an async stream of changes for the given topics.
/// - shutdown: Shutdown the topic listener stream.
public init(
listen: @Sendable @escaping ([String], MQTTQoS) async throws -> Stream,
shutdown: @Sendable @escaping () -> Void
) {
self._listen = listen
self.shutdown = shutdown
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
to topics: [String],
qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
try await _listen(topics, qos)
}
/// Create an async stream that listens for changes to the given topics.
///
/// - Parameters:
/// - topics: The topics to listen for changes to.
/// - qos: The MQTTQoS for the subscription.
public func listen(
_ topics: String...,
qos: MQTTQoS = .atLeastOnce
) async throws -> Stream {
try await listen(to: topics, qos: qos)
}
/// Create the live implementation of the topic listener with the given MQTTClient.
///
/// - Parameters:
/// - client: The MQTTClient to use.
public static func live(client: MQTTClient) -> Self {
let listener = MQTTTopicListener(client: client)
return .init(
listen: { try await listener.listen($0, $1) },
shutdown: { listener.shutdown() }
)
}
}
extension TopicListener: TestDependencyKey {
public static var testValue: TopicListener { Self() }
}
public extension DependencyValues {
var topicListener: TopicListener {
get { self[TopicListener.self] }
set { self[TopicListener.self] = newValue }
}
}
// MARK: - Helpers
private actor MQTTTopicListener {
private let client: MQTTClient
private let continuation: TopicListener.Stream.Continuation
private let name: String
let stream: TopicListener.Stream
private var shuttingDown: Bool = false
init(
client: MQTTClient
) {
let (stream, continuation) = TopicListener.Stream.makeStream()
self.client = client
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
}
deinit {
if !shuttingDown {
let message = """
Shutdown was not called on topic listener. This could lead to potential errors or
the stream never ending.
Please ensure that you call shutdown on the listener.
"""
client.logger.warning("\(message)")
continuation.finish()
}
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
}
func listen(
_ topics: [String],
_ qos: MQTTQoS = .atLeastOnce
) async throws -> TopicListener.Stream {
var sleepTimes = 0
while !client.isActive() {
guard sleepTimes < 10 else {
throw TopicListenerError.connectionTimeout
}
try? await Task.sleep(for: .milliseconds(100))
sleepTimes += 1
}
client.logger.trace("Client is active, begin subscribing to topics.")
let subscription = try? await client.subscribe(to: topics.map {
MQTTSubscribeInfo(topicFilter: $0, qos: qos)
})
guard subscription != nil else {
client.logger.error("Error subscribing to topics: \(topics)")
throw TopicListenerError.failedToSubscribe
}
client.logger.trace("Done subscribing, begin listening to topics.")
client.addPublishListener(named: name) { result in
switch result {
case let .failure(error):
self.client.logger.error("Received error while listening: \(error)")
self.continuation.yield(.failure(MQTTListenResultError(error)))
case let .success(publishInfo):
if topics.contains(publishInfo.topicName) {
self.client.logger.trace("Recieved new value for topic: \(publishInfo.topicName)")
self.continuation.yield(.success(publishInfo))
}
}
}
return stream
}
private func setIsShuttingDown() {
shuttingDown = true
}
nonisolated func shutdown() {
client.logger.trace("Closing topic listener...")
continuation.finish()
client.removePublishListener(named: name)
client.removeShutdownListener(named: name)
Task { await self.setIsShuttingDown() }
}
}
// MARK: - Errors
public enum TopicListenerError: Error {
case connectionTimeout
case failedToSubscribe
}
public struct MQTTListenResultError: Error {
let underlyingError: any Error
init(_ underlyingError: any Error) {
self.underlyingError = underlyingError
}
}

View File

@@ -1,117 +0,0 @@
import Dependencies
import DependenciesMacros
import MQTTNIO
import NIO
/// A dependency that is responsible for publishing values to an MQTT broker.
///
/// - Note: This dependency only conforms to `TestDependencyKey` because it
/// requires an active `MQTTClient` to generate the live dependency.
@DependencyClient
public struct TopicPublisher: Sendable {
private var _publish: @Sendable (PublishRequest) async throws -> Void
/// Create a new topic publisher.
///
/// - Parameters:
/// - publish: Handle the publish request.
public init(
publish: @Sendable @escaping (PublishRequest) async throws -> Void
) {
self._publish = publish
}
/// Publish a new value to the given topic.
///
/// - Parameters:
/// - topicName: The topic to publish the new value to.
/// - payload: The value to publish.
/// - qos: The MQTTQoS.
/// - retain: The retain flag.
public func publish(
to topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool = false
) async throws {
try await _publish(.init(
topicName: topicName,
payload: payload,
qos: qos,
retain: retain
))
}
/// Create the live topic publisher with the given `MQTTClient`.
///
/// - Parameters:
/// - client: The mqtt broker client to use.
public static func live(client: MQTTClient) -> Self {
.init(
publish: { request in
guard client.isActive() else {
client.logger.trace("Client is not connected, unable to publish to \(request.topicName)")
return
}
client.logger.trace("Begin publishing to topic: \(request.topicName)")
defer { client.logger.trace("Done publishing to topic: \(request.topicName)") }
try await client.publish(
to: request.topicName,
payload: request.payload,
qos: request.qos,
retain: request.retain
)
}
)
}
/// Represents the parameters required to publish a new value to the
/// MQTT broker.
public struct PublishRequest: Equatable, Sendable {
/// The topic to publish the new value to.
public let topicName: String
/// The value to publish.
public let payload: ByteBuffer
/// The qos of the request.
public let qos: MQTTQoS
/// The retain flag for the request.
public let retain: Bool
/// Create a new publish request.
///
/// - Parameters:
/// - topicName: The topic to publish to.
/// - payload: The value to publish.
/// - qos: The qos of the request.
/// - retain: The retain flag of the request.
public init(
topicName: String,
payload: ByteBuffer,
qos: MQTTQoS,
retain: Bool
) {
self.topicName = topicName
self.payload = payload
self.qos = qos
self.retain = retain
}
}
}
extension TopicPublisher: TestDependencyKey {
public static var testValue: TopicPublisher { Self() }
}
public extension DependencyValues {
/// A dependency that is responsible for publishing values to an MQTT broker.
var topicPublisher: TopicPublisher {
get { self[TopicPublisher.self] }
set { self[TopicPublisher.self] = newValue }
}
}

View File

@@ -1,8 +1,8 @@
import AsyncAlgorithms import AsyncAlgorithms
import Logging import Logging
import Models import Models
@_spi(Internal) import MQTTConnectionManager
import MQTTConnectionService import MQTTConnectionService
@_spi(Internal) import MQTTManager
import MQTTNIO import MQTTNIO
import NIO import NIO
import ServiceLifecycle import ServiceLifecycle
@@ -22,15 +22,16 @@ final class MQTTConnectionServiceTests: XCTestCase {
// TODO: Move to integration tests. // TODO: Move to integration tests.
func testMQTTConnectionStream() async throws { func testMQTTConnectionStream() async throws {
let client = createClient(identifier: "testNonManagedStream") let client = createClient(identifier: "testNonManagedStream")
let manager = MQTTConnectionManager.live( let manager = MQTTManager.live(
client: client, client: client,
logger: Self.logger, logger: Self.logger,
alwaysReconnect: false alwaysReconnect: false
) )
defer { manager.shutdown() }
let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger) let connectionStream1 = MQTTConnectionStream(client: client, logger: Self.logger)
let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger) let connectionStream2 = MQTTConnectionStream(client: client, logger: Self.logger)
var events1 = [MQTTConnectionManager.Event]() var events1 = [MQTTManager.Event]()
var events2 = [MQTTConnectionManager.Event]() var events2 = [MQTTManager.Event]()
let stream1 = connectionStream1.start() let stream1 = connectionStream1.start()
let stream2 = connectionStream2.start() let stream2 = connectionStream2.start()

View File

@@ -1,12 +1,11 @@
import Dependencies import Dependencies
import Logging import Logging
import Models import Models
@_spi(Internal) import MQTTConnectionManager @_spi(Internal) import MQTTManager
import MQTTNIO import MQTTNIO
import NIO import NIO
import PsychrometricClientLive import PsychrometricClientLive
@_spi(Internal) import SensorsService @_spi(Internal) import SensorsService
import TopicDependencies
import XCTest import XCTest
final class SensorsClientTests: XCTestCase { final class SensorsClientTests: XCTestCase {
@@ -23,25 +22,24 @@ final class SensorsClientTests: XCTestCase {
let client = createClient(identifier: "\(Self.self)") let client = createClient(identifier: "\(Self.self)")
withDependencies { withDependencies {
$0.mqttConnectionManager = .live(client: client, logger: Self.logger) $0.mqtt = .live(client: client, logger: Self.logger)
$0.psychrometricClient = PsychrometricClient.liveValue $0.psychrometricClient = PsychrometricClient.liveValue
$0.topicListener = .live(client: client)
$0.topicPublisher = .live(client: client)
} operation: { } operation: {
super.invokeTest() super.invokeTest()
} }
} }
func testListeningResumesAfterDisconnectThenReconnect() async throws { func testListeningResumesAfterDisconnectThenReconnect() async throws {
@Dependency(\.mqttConnectionManager) var manager
struct TimeoutError: Error {} struct TimeoutError: Error {}
let sensor = TemperatureAndHumiditySensor(location: .return) let sensor = TemperatureAndHumiditySensor(location: .return)
var results = [TopicPublisher.PublishRequest]() let results = ResultContainer()
try await withDependencies { try await withDependencies {
$0.topicPublisher = .capturing { results.append($0) } $0.mqtt.publish = results.append
} operation: { } operation: {
@Dependency(\.mqtt) var manager
let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger) let sensorsService = SensorsService(sensors: [sensor], logger: Self.logger)
let task = Task { try await sensorsService.run() } let task = Task { try await sensorsService.run() }
defer { task.cancel() } defer { task.cancel() }
@@ -58,9 +56,7 @@ final class SensorsClientTests: XCTestCase {
} }
// Give time to re-subscribe. // Give time to re-subscribe.
while !(await sensorsService.isListening) { try await Task.sleep(for: .milliseconds(200))
try await Task.sleep(for: .milliseconds(100))
}
try await client.publish( try await client.publish(
to: sensor.topics.temperature, to: sensor.topics.temperature,
@@ -77,7 +73,7 @@ final class SensorsClientTests: XCTestCase {
} }
var timeoutCount = 0 var timeoutCount = 0
while !(results.count == 2) { while !(await results.count == 2) {
guard timeoutCount < 20 else { guard timeoutCount < 20 else {
throw TimeoutError() throw TimeoutError()
} }
@@ -85,6 +81,8 @@ final class SensorsClientTests: XCTestCase {
timeoutCount += 1 timeoutCount += 1
} }
let results = await results.results()
XCTAssertEqual(results.count, 2) XCTAssertEqual(results.count, 2)
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.dewPoint })) XCTAssert(results.contains(where: { $0.topicName == sensor.topics.dewPoint }))
XCTAssert(results.contains(where: { $0.topicName == sensor.topics.enthalpy })) XCTAssert(results.contains(where: { $0.topicName == sensor.topics.enthalpy }))
@@ -122,63 +120,23 @@ final class SensorsClientTests: XCTestCase {
// MARK: Helpers for tests. // MARK: Helpers for tests.
class PublishInfoContainer {
private(set) var info: [MQTTPublishInfo]
private var topicFilters: [String]?
init(topicFilters: [String]? = nil) {
self.info = []
self.topicFilters = topicFilters
}
func addPublishInfo(_ info: MQTTPublishInfo) async {
guard let topicFilters else {
self.info.append(info)
return
}
if topicFilters.contains(info.topicName) {
self.info.append(info)
}
}
}
extension TopicPublisher {
static func capturing(
_ callback: @escaping (PublishRequest) -> Void
) -> Self {
.init { callback($0) }
}
}
// extension SensorsClient {
//
// static func testing(
// yielding: [(value: Double, to: String)],
// capturePublishedValues: @escaping (Double, String) -> Void,
// captureShutdownEvent: @escaping (Bool) -> Void
// ) -> Self {
// let (stream, continuation) = AsyncStream.makeStream(of: PublishInfo.self)
// let logger = Logger(label: "\(Self.self).testing")
//
// return .init(
// listen: { topics in
// for (value, topic) in yielding where topics.contains(topic) {
// continuation.yield(
// (buffer: ByteBuffer(string: "\(value)"), topic: topic)
// )
// }
// return stream
// },
// logger: logger,
// publish: { value, topic in
// capturePublishedValues(value, topic)
// },
// shutdown: {
// captureShutdownEvent(true)
// continuation.finish()
// }
// )
// }
// }
struct TopicNotFoundError: Error {} struct TopicNotFoundError: Error {}
actor ResultContainer: Sendable {
private var storage = [MQTTManager.PublishRequest]()
init() {}
@Sendable func append(_ result: MQTTManager.PublishRequest) async {
storage.append(result)
}
var count: Int {
get async { storage.count }
}
func results() async -> [MQTTManager.PublishRequest] {
storage
}
}