feat: Seperates connection stream and moves connection manager out of the connection service module.

This commit is contained in:
2024-11-13 17:12:56 -05:00
parent b8992b89b6
commit bd2a798320
8 changed files with 254 additions and 213 deletions

View File

@@ -8,3 +8,4 @@
--wrapconditions after-first --wrapconditions after-first
--typeblanklines preserve --typeblanklines preserve
--commas inline --commas inline
--stripunusedargs closure-only

View File

@@ -15,12 +15,13 @@ let package = Package(
products: [ products: [
.executable(name: "dewpoint-controller", targets: ["dewpoint-controller"]), .executable(name: "dewpoint-controller", targets: ["dewpoint-controller"]),
.library(name: "Models", targets: ["Models"]), .library(name: "Models", targets: ["Models"]),
.library(name: "MQTTConnectionManagerLive", targets: ["MQTTConnectionManagerLive"]), .library(name: "MQTTConnectionManager", targets: ["MQTTConnectionManager"]),
.library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]), .library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]),
.library(name: "SensorsService", targets: ["SensorsService"]), .library(name: "SensorsService", targets: ["SensorsService"]),
.library(name: "TopicDependencies", targets: ["TopicDependencies"]) .library(name: "TopicDependencies", targets: ["TopicDependencies"])
], ],
dependencies: [ dependencies: [
.package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"), .package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
.package(url: "https://github.com/apple/swift-log", from: "1.6.0"), .package(url: "https://github.com/apple/swift-log", from: "1.6.0"),
.package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.4.1"), .package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.4.1"),
@@ -33,7 +34,8 @@ let package = Package(
name: "dewpoint-controller", name: "dewpoint-controller",
dependencies: [ dependencies: [
"Models", "Models",
"MQTTConnectionManagerLive", "MQTTConnectionManager",
"MQTTConnectionService",
"SensorsService", "SensorsService",
"TopicDependencies", "TopicDependencies",
.product(name: "MQTTNIO", package: "mqtt-nio"), .product(name: "MQTTNIO", package: "mqtt-nio"),
@@ -50,9 +52,11 @@ let package = Package(
swiftSettings: swiftSettings swiftSettings: swiftSettings
), ),
.target( .target(
name: "MQTTConnectionManagerLive", name: "MQTTConnectionManager",
dependencies: [ dependencies: [
"MQTTConnectionService", .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
.product(name: "Dependencies", package: "swift-dependencies"),
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "MQTTNIO", package: "mqtt-nio") .product(name: "MQTTNIO", package: "mqtt-nio")
], ],
swiftSettings: swiftSettings swiftSettings: swiftSettings
@@ -61,8 +65,7 @@ let package = Package(
name: "MQTTConnectionService", name: "MQTTConnectionService",
dependencies: [ dependencies: [
"Models", "Models",
.product(name: "Dependencies", package: "swift-dependencies"), "MQTTConnectionManager",
.product(name: "DependenciesMacros", package: "swift-dependencies"),
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle") .product(name: "ServiceLifecycle", package: "swift-service-lifecycle")
], ],
swiftSettings: swiftSettings swiftSettings: swiftSettings
@@ -71,7 +74,7 @@ let package = Package(
name: "MQTTConnectionServiceTests", name: "MQTTConnectionServiceTests",
dependencies: [ dependencies: [
"MQTTConnectionService", "MQTTConnectionService",
"MQTTConnectionManagerLive", "MQTTConnectionManager",
.product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle") .product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle")
] ]
), ),

View File

@@ -0,0 +1,194 @@
import AsyncAlgorithms
import Dependencies
import DependenciesMacros
import Foundation
import Logging
import MQTTNIO
import NIO
public extension DependencyValues {
/// A dependency that is responsible for managing the connection to
/// an MQTT broker.
var mqttConnectionManager: MQTTConnectionManager {
get { self[MQTTConnectionManager.self] }
set { self[MQTTConnectionManager.self] = newValue }
}
}
/// Represents the interface needed for the ``MQTTConnectionService``.
///
/// See ``MQTTConnectionManagerLive`` module for live implementation.
@DependencyClient
public struct MQTTConnectionManager: Sendable {
/// Connect to the MQTT broker.
public var connect: @Sendable () async throws -> Void
/// Shutdown the connection to the MQTT broker.
///
/// - Note: You should cancel any tasks that are listening to the connection stream first.
public var shutdown: @Sendable () -> Void
/// Create a stream of connection events.
public var stream: @Sendable () throws -> AsyncStream<Event>
/// Represents connection events that clients can listen for and
/// react accordingly.
public enum Event: Sendable {
case connected
case disconnected
case shuttingDown
}
public static func live(
client: MQTTClient,
cleanSession: Bool = false,
logger: Logger? = nil,
alwaysReconnect: Bool = true
) -> Self {
let manager = ConnectionManager(
client: client,
logger: logger,
alwaysReconnect: alwaysReconnect
)
return .init {
try await manager.connect(cleanSession: cleanSession)
} shutdown: {
manager.shutdown()
} stream: {
MQTTConnectionStream(client: client)
.start()
.removeDuplicates()
.eraseToStream()
}
}
}
extension MQTTConnectionManager: TestDependencyKey {
public static var testValue: MQTTConnectionManager {
Self()
}
}
// MARK: - Helpers
final class MQTTConnectionStream: AsyncSequence, Sendable {
typealias AsyncIterator = AsyncStream<Element>.AsyncIterator
typealias Element = MQTTConnectionManager.Event
private let client: MQTTClient
private let continuation: AsyncStream<Element>.Continuation
private var logger: Logger { client.logger }
private let name: String
private let stream: AsyncStream<Element>
init(client: MQTTClient) {
let (stream, continuation) = AsyncStream<Element>.makeStream()
self.client = client
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
continuation.yield(client.isActive() ? .connected : .disconnected)
}
deinit { stop() }
func start(
isolation: isolated (any Actor)? = #isolation
) -> AsyncStream<Element> {
client.addCloseListener(named: name) { _ in
self.logger.trace("Client has disconnected.")
self.continuation.yield(.disconnected)
}
client.addShutdownListener(named: name) { _ in
self.logger.trace("Client is shutting down.")
self.continuation.yield(.shuttingDown)
self.stop()
}
let task = Task {
while !Task.isCancelled {
try? await Task.sleep(for: .milliseconds(100))
continuation.yield(
self.client.isActive() ? .connected : .disconnected
)
}
}
continuation.onTermination = { _ in
task.cancel()
}
return stream
}
func stop() {
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
continuation.finish()
}
public __consuming func makeAsyncIterator() -> AsyncIterator {
start().makeAsyncIterator()
}
}
final class ConnectionManager: Sendable {
private let client: MQTTClient
private let logger: Logger?
private let name: String
private let shouldReconnect: Bool
init(
client: MQTTClient,
logger: Logger?,
alwaysReconnect: Bool
) {
self.client = client
self.logger = logger
self.name = UUID().uuidString
self.shouldReconnect = alwaysReconnect
}
deinit {
// We should've already logged that we're shutting down if
// the manager was shutdown properly, so don't log it twice.
self.shutdown(withLogging: false)
}
func connect(
isolation: isolated (any Actor)? = #isolation,
cleanSession: Bool
) async throws {
do {
try await client.connect(cleanSession: cleanSession)
client.addCloseListener(named: name) { [weak self] _ in
guard let `self` else { return }
self.logger?.debug("Connection closed.")
if self.shouldReconnect {
self.logger?.debug("Reconnecting...")
Task {
try await self.connect(cleanSession: cleanSession)
}
}
}
client.addShutdownListener(named: name) { [weak self] _ in
self?.shutdown()
}
} catch {
logger?.trace("Failed to connect: \(error)")
throw error
}
}
func shutdown(withLogging: Bool = true) {
if withLogging {
logger?.trace("Shutting down connection.")
}
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
}
}

View File

@@ -1,136 +0,0 @@
import Foundation
import Logging
@_exported import MQTTConnectionService
import MQTTNIO
public extension MQTTConnectionManager {
static func live(
client: MQTTClient,
cleanSession: Bool = false,
logger: Logger? = nil
) -> Self {
let manager = ConnectionManager(client: client, logger: logger)
return .init {
try await manager.connect(cleanSession: cleanSession)
return manager.stream
.removeDuplicates()
.eraseToStream()
} shutdown: {
manager.shutdown()
}
}
}
// MARK: - Helpers
final class MQTTConnectionStream: Sendable {
private let client: MQTTClient
private let continuation: AsyncStream<MQTTConnectionManager.Event>.Continuation
private var logger: Logger { client.logger }
private let name: String
private let stream: AsyncStream<MQTTConnectionManager.Event>
init(client: MQTTClient) {
let (stream, continuation) = AsyncStream<MQTTConnectionManager.Event>.makeStream()
self.client = client
self.continuation = continuation
self.name = UUID().uuidString
self.stream = stream
continuation.yield(client.isActive() ? .connected : .disconnected)
}
deinit { stop() }
func start() -> AsyncStream<MQTTConnectionManager.Event> {
client.addCloseListener(named: name) { _ in
self.logger.trace("Client has disconnected.")
self.continuation.yield(.disconnected)
}
client.addShutdownListener(named: name) { _ in
self.logger.trace("Client is shutting down.")
self.continuation.yield(.shuttingDown)
self.stop()
}
let task = Task {
while !Task.isCancelled {
try? await Task.sleep(for: .milliseconds(100))
continuation.yield(
self.client.isActive() ? .connected : .disconnected
)
}
}
continuation.onTermination = { _ in
task.cancel()
}
return stream
}
func stop() {
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
continuation.finish()
}
}
// TODO: Remove stream stuff from this.
private actor ConnectionManager {
private let client: MQTTClient
private let continuation: AsyncStream<MQTTConnectionManager.Event>.Continuation
private nonisolated let logger: Logger?
private let name: String
private var started: Bool = false
let stream: AsyncStream<MQTTConnectionManager.Event>
init(
client: MQTTClient,
logger: Logger?
) {
let (stream, continuation) = AsyncStream<MQTTConnectionManager.Event>.makeStream()
self.client = client
self.continuation = continuation
self.logger = logger
self.name = UUID().uuidString
self.stream = stream
}
deinit {
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
}
func connect(cleanSession: Bool) async throws {
do {
try await client.connect(cleanSession: cleanSession)
continuation.yield(.connected)
client.addCloseListener(named: name) { _ in
self.continuation.yield(.disconnected)
self.logger?.debug("Connection closed.")
self.logger?.debug("Reconnecting...")
Task { try await self.connect(cleanSession: cleanSession) }
}
client.addShutdownListener(named: name) { _ in
self.shutdown()
}
} catch {
client.logger.trace("Failed to connect: \(error)")
continuation.yield(.disconnected)
throw error
}
}
nonisolated func shutdown() {
client.logger.trace("Shutting down connection.")
client.removeCloseListener(named: name)
client.removeShutdownListener(named: name)
continuation.yield(.shuttingDown)
continuation.finish()
}
}

View File

@@ -1,44 +1,9 @@
import Dependencies import Dependencies
import DependenciesMacros
import Foundation
import Logging import Logging
import Models import Models
import MQTTConnectionManager
import ServiceLifecycle import ServiceLifecycle
/// Represents the interface needed for the ``MQTTConnectionService``.
///
/// See ``MQTTConnectionManagerLive`` module for live implementation.
@DependencyClient
public struct MQTTConnectionManager: Sendable {
public var connect: @Sendable () async throws -> AsyncStream<Event>
public var shutdown: () -> Void
public enum Event: Sendable {
case connected
case disconnected
case shuttingDown
}
}
extension MQTTConnectionManager: TestDependencyKey {
public static var testValue: MQTTConnectionManager {
Self()
}
}
public extension DependencyValues {
/// A dependency that is responsible for managing the connection to
/// an MQTT broker.
var mqttConnectionManager: MQTTConnectionManager {
get { self[MQTTConnectionManager.self] }
set { self[MQTTConnectionManager.self] = newValue }
}
}
// MARK: - MQTTConnectionService
public actor MQTTConnectionService: Service { public actor MQTTConnectionService: Service {
@Dependency(\.mqttConnectionManager) var manager @Dependency(\.mqttConnectionManager) var manager
@@ -55,13 +20,15 @@ public actor MQTTConnectionService: Service {
/// connection. /// connection.
public func run() async throws { public func run() async throws {
try await withGracefulShutdownHandler { try await withGracefulShutdownHandler {
let stream = try await manager.connect() try await manager.connect()
for await event in stream.cancelOnGracefulShutdown() { for await event in try manager.stream().cancelOnGracefulShutdown() {
// We don't really need to do anything with the events, so just logging // We don't really need to do anything with the events, so just logging
// for now. But we need to iterate on an async stream for the service to // for now. But we need to iterate on an async stream for the service to
// continue to run and handle graceful shutdowns. // continue to run and handle graceful shutdowns.
logger?.trace("Received connection event: \(event)") logger?.trace("Received connection event: \(event)")
} }
// when we reach here we are shutting down, so we shutdown
// the manager.
manager.shutdown() manager.shutdown()
} onGracefulShutdown: { } onGracefulShutdown: {
self.logger?.trace("Received graceful shutdown.") self.logger?.trace("Received graceful shutdown.")

View File

@@ -50,7 +50,10 @@ public struct TopicPublisher: Sendable {
public static func live(client: MQTTClient) -> Self { public static func live(client: MQTTClient) -> Self {
.init( .init(
publish: { request in publish: { request in
assert(client.isActive(), "Client not connected.") guard client.isActive() else {
client.logger.trace("Client is not connected, unable to publish to \(request.topicName)")
return
}
client.logger.trace("Begin publishing to topic: \(request.topicName)") client.logger.trace("Begin publishing to topic: \(request.topicName)")
defer { client.logger.trace("Done publishing to topic: \(request.topicName)") } defer { client.logger.trace("Done publishing to topic: \(request.topicName)") }
try await client.publish( try await client.publish(

View File

@@ -2,7 +2,8 @@ import Dependencies
import Foundation import Foundation
import Logging import Logging
import Models import Models
import MQTTConnectionManagerLive import MQTTConnectionManager
import MQTTConnectionService
import MQTTNIO import MQTTNIO
import NIO import NIO
import PsychrometricClientLive import PsychrometricClientLive
@@ -15,7 +16,7 @@ struct Application {
/// The main entry point of the application. /// The main entry point of the application.
static func main() async throws { static func main() async throws {
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
var logger = Logger(label: "dewpoint-controller") var logger = Logger(label: "dewpoint-controller")
logger.logLevel = .trace logger.logLevel = .trace
@@ -34,6 +35,7 @@ struct Application {
logger: logger logger: logger
) )
do {
try await withDependencies { try await withDependencies {
$0.psychrometricClient = .liveValue $0.psychrometricClient = .liveValue
$0.topicListener = .live(client: mqtt) $0.topicListener = .live(client: mqtt)
@@ -60,6 +62,9 @@ struct Application {
} }
try await mqtt.shutdown() try await mqtt.shutdown()
} catch {
try await eventloopGroup.shutdownGracefully()
}
} }
} }

View File

@@ -1,7 +1,7 @@
import Logging import Logging
import Models import Models
@testable import MQTTConnectionManagerLive @testable import MQTTConnectionManager
@testable import MQTTConnectionService import MQTTConnectionService
import MQTTNIO import MQTTNIO
import NIO import NIO
import ServiceLifecycle import ServiceLifecycle
@@ -30,7 +30,11 @@ final class MQTTConnectionServiceTests: XCTestCase {
func testMQTTConnectionStream() async throws { func testMQTTConnectionStream() async throws {
let client = createClient(identifier: "testNonManagedStream") let client = createClient(identifier: "testNonManagedStream")
let manager = MQTTConnectionManager.live(client: client, logger: Self.logger) let manager = MQTTConnectionManager.live(
client: client,
logger: Self.logger,
alwaysReconnect: false
)
let stream = MQTTConnectionStream(client: client) let stream = MQTTConnectionStream(client: client)
var events = [MQTTConnectionManager.Event]() var events = [MQTTConnectionManager.Event]()
@@ -44,16 +48,16 @@ final class MQTTConnectionServiceTests: XCTestCase {
manager.shutdown() manager.shutdown()
try await client.disconnect() try await client.disconnect()
try await Task.sleep(for: .milliseconds(200)) try await Task.sleep(for: .milliseconds(200))
try await client.shutdown()
try await Task.sleep(for: .milliseconds(200))
stream.stop() stream.stop()
} }
for await event in stream.start().removeDuplicates() { for await event in stream.removeDuplicates() {
events.append(event) events.append(event)
} }
XCTAssertEqual(events, [.disconnected, .connected, .disconnected]) XCTAssertEqual(events, [.disconnected, .connected, .disconnected, .shuttingDown])
try await client.shutdown()
} }
func createClient(identifier: String) -> MQTTClient { func createClient(identifier: String) -> MQTTClient {