diff --git a/.swiftformat b/.swiftformat index b7d8e75..08f338e 100644 --- a/.swiftformat +++ b/.swiftformat @@ -8,3 +8,4 @@ --wrapconditions after-first --typeblanklines preserve --commas inline +--stripunusedargs closure-only diff --git a/Package.swift b/Package.swift index 0585415..f968129 100755 --- a/Package.swift +++ b/Package.swift @@ -15,12 +15,13 @@ let package = Package( products: [ .executable(name: "dewpoint-controller", targets: ["dewpoint-controller"]), .library(name: "Models", targets: ["Models"]), - .library(name: "MQTTConnectionManagerLive", targets: ["MQTTConnectionManagerLive"]), + .library(name: "MQTTConnectionManager", targets: ["MQTTConnectionManager"]), .library(name: "MQTTConnectionService", targets: ["MQTTConnectionService"]), .library(name: "SensorsService", targets: ["SensorsService"]), .library(name: "TopicDependencies", targets: ["TopicDependencies"]) ], dependencies: [ + .package(url: "https://github.com/apple/swift-async-algorithms", from: "1.0.0"), .package(url: "https://github.com/apple/swift-nio", from: "2.0.0"), .package(url: "https://github.com/apple/swift-log", from: "1.6.0"), .package(url: "https://github.com/pointfreeco/swift-dependencies", from: "1.4.1"), @@ -33,7 +34,8 @@ let package = Package( name: "dewpoint-controller", dependencies: [ "Models", - "MQTTConnectionManagerLive", + "MQTTConnectionManager", + "MQTTConnectionService", "SensorsService", "TopicDependencies", .product(name: "MQTTNIO", package: "mqtt-nio"), @@ -50,9 +52,11 @@ let package = Package( swiftSettings: swiftSettings ), .target( - name: "MQTTConnectionManagerLive", + name: "MQTTConnectionManager", dependencies: [ - "MQTTConnectionService", + .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), + .product(name: "Dependencies", package: "swift-dependencies"), + .product(name: "DependenciesMacros", package: "swift-dependencies"), .product(name: "MQTTNIO", package: "mqtt-nio") ], swiftSettings: swiftSettings @@ -61,8 +65,7 @@ let package = Package( name: "MQTTConnectionService", dependencies: [ "Models", - .product(name: "Dependencies", package: "swift-dependencies"), - .product(name: "DependenciesMacros", package: "swift-dependencies"), + "MQTTConnectionManager", .product(name: "ServiceLifecycle", package: "swift-service-lifecycle") ], swiftSettings: swiftSettings @@ -71,7 +74,7 @@ let package = Package( name: "MQTTConnectionServiceTests", dependencies: [ "MQTTConnectionService", - "MQTTConnectionManagerLive", + "MQTTConnectionManager", .product(name: "ServiceLifecycleTestKit", package: "swift-service-lifecycle") ] ), diff --git a/Sources/MQTTConnectionManager/Live.swift b/Sources/MQTTConnectionManager/Live.swift new file mode 100644 index 0000000..0053ed8 --- /dev/null +++ b/Sources/MQTTConnectionManager/Live.swift @@ -0,0 +1,194 @@ +import AsyncAlgorithms +import Dependencies +import DependenciesMacros +import Foundation +import Logging +import MQTTNIO +import NIO + +public extension DependencyValues { + + /// A dependency that is responsible for managing the connection to + /// an MQTT broker. + var mqttConnectionManager: MQTTConnectionManager { + get { self[MQTTConnectionManager.self] } + set { self[MQTTConnectionManager.self] = newValue } + } +} + +/// Represents the interface needed for the ``MQTTConnectionService``. +/// +/// See ``MQTTConnectionManagerLive`` module for live implementation. +@DependencyClient +public struct MQTTConnectionManager: Sendable { + + /// Connect to the MQTT broker. + public var connect: @Sendable () async throws -> Void + + /// Shutdown the connection to the MQTT broker. + /// + /// - Note: You should cancel any tasks that are listening to the connection stream first. + public var shutdown: @Sendable () -> Void + + /// Create a stream of connection events. + public var stream: @Sendable () throws -> AsyncStream + + /// Represents connection events that clients can listen for and + /// react accordingly. + public enum Event: Sendable { + case connected + case disconnected + case shuttingDown + } + + public static func live( + client: MQTTClient, + cleanSession: Bool = false, + logger: Logger? = nil, + alwaysReconnect: Bool = true + ) -> Self { + let manager = ConnectionManager( + client: client, + logger: logger, + alwaysReconnect: alwaysReconnect + ) + return .init { + try await manager.connect(cleanSession: cleanSession) + } shutdown: { + manager.shutdown() + } stream: { + MQTTConnectionStream(client: client) + .start() + .removeDuplicates() + .eraseToStream() + } + } +} + +extension MQTTConnectionManager: TestDependencyKey { + public static var testValue: MQTTConnectionManager { + Self() + } +} + +// MARK: - Helpers + +final class MQTTConnectionStream: AsyncSequence, Sendable { + + typealias AsyncIterator = AsyncStream.AsyncIterator + typealias Element = MQTTConnectionManager.Event + + private let client: MQTTClient + private let continuation: AsyncStream.Continuation + private var logger: Logger { client.logger } + private let name: String + private let stream: AsyncStream + + init(client: MQTTClient) { + let (stream, continuation) = AsyncStream.makeStream() + self.client = client + self.continuation = continuation + self.name = UUID().uuidString + self.stream = stream + continuation.yield(client.isActive() ? .connected : .disconnected) + } + + deinit { stop() } + + func start( + isolation: isolated (any Actor)? = #isolation + ) -> AsyncStream { + client.addCloseListener(named: name) { _ in + self.logger.trace("Client has disconnected.") + self.continuation.yield(.disconnected) + } + client.addShutdownListener(named: name) { _ in + self.logger.trace("Client is shutting down.") + self.continuation.yield(.shuttingDown) + self.stop() + } + let task = Task { + while !Task.isCancelled { + try? await Task.sleep(for: .milliseconds(100)) + continuation.yield( + self.client.isActive() ? .connected : .disconnected + ) + } + } + continuation.onTermination = { _ in + task.cancel() + } + return stream + } + + func stop() { + client.removeCloseListener(named: name) + client.removeShutdownListener(named: name) + continuation.finish() + } + + public __consuming func makeAsyncIterator() -> AsyncIterator { + start().makeAsyncIterator() + } + +} + +final class ConnectionManager: Sendable { + private let client: MQTTClient + private let logger: Logger? + private let name: String + private let shouldReconnect: Bool + + init( + client: MQTTClient, + logger: Logger?, + alwaysReconnect: Bool + ) { + self.client = client + self.logger = logger + self.name = UUID().uuidString + self.shouldReconnect = alwaysReconnect + } + + deinit { + // We should've already logged that we're shutting down if + // the manager was shutdown properly, so don't log it twice. + self.shutdown(withLogging: false) + } + + func connect( + isolation: isolated (any Actor)? = #isolation, + cleanSession: Bool + ) async throws { + do { + try await client.connect(cleanSession: cleanSession) + + client.addCloseListener(named: name) { [weak self] _ in + guard let `self` else { return } + self.logger?.debug("Connection closed.") + if self.shouldReconnect { + self.logger?.debug("Reconnecting...") + Task { + try await self.connect(cleanSession: cleanSession) + } + } + } + + client.addShutdownListener(named: name) { [weak self] _ in + self?.shutdown() + } + + } catch { + logger?.trace("Failed to connect: \(error)") + throw error + } + } + + func shutdown(withLogging: Bool = true) { + if withLogging { + logger?.trace("Shutting down connection.") + } + client.removeCloseListener(named: name) + client.removeShutdownListener(named: name) + } +} diff --git a/Sources/MQTTConnectionManagerLive/Live.swift b/Sources/MQTTConnectionManagerLive/Live.swift deleted file mode 100644 index 6c8c5f5..0000000 --- a/Sources/MQTTConnectionManagerLive/Live.swift +++ /dev/null @@ -1,136 +0,0 @@ -import Foundation -import Logging -@_exported import MQTTConnectionService -import MQTTNIO - -public extension MQTTConnectionManager { - static func live( - client: MQTTClient, - cleanSession: Bool = false, - logger: Logger? = nil - ) -> Self { - let manager = ConnectionManager(client: client, logger: logger) - return .init { - try await manager.connect(cleanSession: cleanSession) - - return manager.stream - .removeDuplicates() - .eraseToStream() - - } shutdown: { - manager.shutdown() - } - } -} - -// MARK: - Helpers - -final class MQTTConnectionStream: Sendable { - private let client: MQTTClient - private let continuation: AsyncStream.Continuation - private var logger: Logger { client.logger } - private let name: String - private let stream: AsyncStream - - init(client: MQTTClient) { - let (stream, continuation) = AsyncStream.makeStream() - self.client = client - self.continuation = continuation - self.name = UUID().uuidString - self.stream = stream - continuation.yield(client.isActive() ? .connected : .disconnected) - } - - deinit { stop() } - - func start() -> AsyncStream { - client.addCloseListener(named: name) { _ in - self.logger.trace("Client has disconnected.") - self.continuation.yield(.disconnected) - } - client.addShutdownListener(named: name) { _ in - self.logger.trace("Client is shutting down.") - self.continuation.yield(.shuttingDown) - self.stop() - } - let task = Task { - while !Task.isCancelled { - try? await Task.sleep(for: .milliseconds(100)) - continuation.yield( - self.client.isActive() ? .connected : .disconnected - ) - } - } - continuation.onTermination = { _ in - task.cancel() - } - return stream - } - - func stop() { - client.removeCloseListener(named: name) - client.removeShutdownListener(named: name) - continuation.finish() - } - -} - -// TODO: Remove stream stuff from this. - -private actor ConnectionManager { - private let client: MQTTClient - private let continuation: AsyncStream.Continuation - private nonisolated let logger: Logger? - private let name: String - private var started: Bool = false - let stream: AsyncStream - - init( - client: MQTTClient, - logger: Logger? - ) { - let (stream, continuation) = AsyncStream.makeStream() - self.client = client - self.continuation = continuation - self.logger = logger - self.name = UUID().uuidString - self.stream = stream - } - - deinit { - client.removeCloseListener(named: name) - client.removeShutdownListener(named: name) - } - - func connect(cleanSession: Bool) async throws { - do { - try await client.connect(cleanSession: cleanSession) - - continuation.yield(.connected) - - client.addCloseListener(named: name) { _ in - self.continuation.yield(.disconnected) - self.logger?.debug("Connection closed.") - self.logger?.debug("Reconnecting...") - Task { try await self.connect(cleanSession: cleanSession) } - } - - client.addShutdownListener(named: name) { _ in - self.shutdown() - } - - } catch { - client.logger.trace("Failed to connect: \(error)") - continuation.yield(.disconnected) - throw error - } - } - - nonisolated func shutdown() { - client.logger.trace("Shutting down connection.") - client.removeCloseListener(named: name) - client.removeShutdownListener(named: name) - continuation.yield(.shuttingDown) - continuation.finish() - } -} diff --git a/Sources/MQTTConnectionService/MQTTConnectionService.swift b/Sources/MQTTConnectionService/MQTTConnectionService.swift index da75c3b..9bbc620 100644 --- a/Sources/MQTTConnectionService/MQTTConnectionService.swift +++ b/Sources/MQTTConnectionService/MQTTConnectionService.swift @@ -1,44 +1,9 @@ import Dependencies -import DependenciesMacros -import Foundation import Logging import Models +import MQTTConnectionManager import ServiceLifecycle -/// Represents the interface needed for the ``MQTTConnectionService``. -/// -/// See ``MQTTConnectionManagerLive`` module for live implementation. -@DependencyClient -public struct MQTTConnectionManager: Sendable { - - public var connect: @Sendable () async throws -> AsyncStream - public var shutdown: () -> Void - - public enum Event: Sendable { - case connected - case disconnected - case shuttingDown - } -} - -extension MQTTConnectionManager: TestDependencyKey { - public static var testValue: MQTTConnectionManager { - Self() - } -} - -public extension DependencyValues { - - /// A dependency that is responsible for managing the connection to - /// an MQTT broker. - var mqttConnectionManager: MQTTConnectionManager { - get { self[MQTTConnectionManager.self] } - set { self[MQTTConnectionManager.self] = newValue } - } -} - -// MARK: - MQTTConnectionService - public actor MQTTConnectionService: Service { @Dependency(\.mqttConnectionManager) var manager @@ -55,13 +20,15 @@ public actor MQTTConnectionService: Service { /// connection. public func run() async throws { try await withGracefulShutdownHandler { - let stream = try await manager.connect() - for await event in stream.cancelOnGracefulShutdown() { + try await manager.connect() + for await event in try manager.stream().cancelOnGracefulShutdown() { // We don't really need to do anything with the events, so just logging // for now. But we need to iterate on an async stream for the service to // continue to run and handle graceful shutdowns. logger?.trace("Received connection event: \(event)") } + // when we reach here we are shutting down, so we shutdown + // the manager. manager.shutdown() } onGracefulShutdown: { self.logger?.trace("Received graceful shutdown.") diff --git a/Sources/TopicDependencies/TopicPublisher.swift b/Sources/TopicDependencies/TopicPublisher.swift index 9ddfeee..77da633 100644 --- a/Sources/TopicDependencies/TopicPublisher.swift +++ b/Sources/TopicDependencies/TopicPublisher.swift @@ -50,7 +50,10 @@ public struct TopicPublisher: Sendable { public static func live(client: MQTTClient) -> Self { .init( publish: { request in - assert(client.isActive(), "Client not connected.") + guard client.isActive() else { + client.logger.trace("Client is not connected, unable to publish to \(request.topicName)") + return + } client.logger.trace("Begin publishing to topic: \(request.topicName)") defer { client.logger.trace("Done publishing to topic: \(request.topicName)") } try await client.publish( diff --git a/Sources/dewPoint-controller/Application.swift b/Sources/dewPoint-controller/Application.swift index 9880970..6c218bc 100644 --- a/Sources/dewPoint-controller/Application.swift +++ b/Sources/dewPoint-controller/Application.swift @@ -2,7 +2,8 @@ import Dependencies import Foundation import Logging import Models -import MQTTConnectionManagerLive +import MQTTConnectionManager +import MQTTConnectionService import MQTTNIO import NIO import PsychrometricClientLive @@ -15,7 +16,7 @@ struct Application { /// The main entry point of the application. static func main() async throws { - let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) + let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) var logger = Logger(label: "dewpoint-controller") logger.logLevel = .trace @@ -34,32 +35,36 @@ struct Application { logger: logger ) - try await withDependencies { - $0.psychrometricClient = .liveValue - $0.topicListener = .live(client: mqtt) - $0.topicPublisher = .live(client: mqtt) - $0.mqttConnectionManager = .live(client: mqtt, logger: logger) - } operation: { - let mqttConnection = MQTTConnectionService(logger: logger) - let sensors = SensorsService(sensors: .live, logger: logger) + do { + try await withDependencies { + $0.psychrometricClient = .liveValue + $0.topicListener = .live(client: mqtt) + $0.topicPublisher = .live(client: mqtt) + $0.mqttConnectionManager = .live(client: mqtt, logger: logger) + } operation: { + let mqttConnection = MQTTConnectionService(logger: logger) + let sensors = SensorsService(sensors: .live, logger: logger) - var serviceGroupConfiguration = ServiceGroupConfiguration( - services: [ - mqttConnection, - sensors - ], - gracefulShutdownSignals: [.sigterm, .sigint], - logger: logger - ) - serviceGroupConfiguration.maximumCancellationDuration = .seconds(5) - serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10) + var serviceGroupConfiguration = ServiceGroupConfiguration( + services: [ + mqttConnection, + sensors + ], + gracefulShutdownSignals: [.sigterm, .sigint], + logger: logger + ) + serviceGroupConfiguration.maximumCancellationDuration = .seconds(5) + serviceGroupConfiguration.maximumGracefulShutdownDuration = .seconds(10) - let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) + let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) - try await serviceGroup.run() + try await serviceGroup.run() + } + + try await mqtt.shutdown() + } catch { + try await eventloopGroup.shutdownGracefully() } - - try await mqtt.shutdown() } } diff --git a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift b/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift index 1411b5a..bce5ec5 100644 --- a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift +++ b/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift @@ -1,7 +1,7 @@ import Logging import Models -@testable import MQTTConnectionManagerLive -@testable import MQTTConnectionService +@testable import MQTTConnectionManager +import MQTTConnectionService import MQTTNIO import NIO import ServiceLifecycle @@ -30,7 +30,11 @@ final class MQTTConnectionServiceTests: XCTestCase { func testMQTTConnectionStream() async throws { let client = createClient(identifier: "testNonManagedStream") - let manager = MQTTConnectionManager.live(client: client, logger: Self.logger) + let manager = MQTTConnectionManager.live( + client: client, + logger: Self.logger, + alwaysReconnect: false + ) let stream = MQTTConnectionStream(client: client) var events = [MQTTConnectionManager.Event]() @@ -44,16 +48,16 @@ final class MQTTConnectionServiceTests: XCTestCase { manager.shutdown() try await client.disconnect() try await Task.sleep(for: .milliseconds(200)) + try await client.shutdown() + try await Task.sleep(for: .milliseconds(200)) stream.stop() } - for await event in stream.start().removeDuplicates() { + for await event in stream.removeDuplicates() { events.append(event) } - XCTAssertEqual(events, [.disconnected, .connected, .disconnected]) - - try await client.shutdown() + XCTAssertEqual(events, [.disconnected, .connected, .disconnected, .shuttingDown]) } func createClient(identifier: String) -> MQTTClient {