diff --git a/Sources/MQTTConnectionService/MQTTConnectionService.swift b/Sources/MQTTConnectionService/MQTTConnectionService.swift index 37cc1b5..305d990 100644 --- a/Sources/MQTTConnectionService/MQTTConnectionService.swift +++ b/Sources/MQTTConnectionService/MQTTConnectionService.swift @@ -9,7 +9,7 @@ public actor MQTTConnectionService: Service { private let cleanSession: Bool public let client: MQTTClient private var shuttingDown = false - var logger: Logger { client.logger } + nonisolated var logger: Logger { client.logger } public init( cleanSession: Bool = true, @@ -25,32 +25,52 @@ public actor MQTTConnectionService: Service { /// It will attempt to gracefully shutdown the connection upon receiving /// `sigterm` signals. public func run() async throws { - await withGracefulShutdownHandler { - await self.connect() - } onGracefulShutdown: { - Task { await self.shutdown() } + await withDiscardingTaskGroup { group in + await withGracefulShutdownHandler { + group.addTask { await self.connect() } + } onGracefulShutdown: { + // try? self.client.syncShutdownGracefully() + Task { await self.shutdown() } + } } } - private func shutdown() async { + func shutdown() async { shuttingDown = true try? await client.disconnect() try? await client.shutdown() } - private func connect() async { + func connect() async { do { - try await client.connect(cleanSession: cleanSession) - client.addCloseListener(named: "SensorsClient") { [self] _ in - Task { - self.logger.debug("Connection closed.") - self.logger.debug("Reconnecting...") - await self.connect() + try await withThrowingDiscardingTaskGroup { group in + group.addTask { + try await self.client.connect(cleanSession: self.cleanSession) } + client.addCloseListener(named: "SensorsClient") { [self] _ in + Task { + self.logger.debug("Connection closed.") + self.logger.debug("Reconnecting...") + await self.connect() + } + } + self.logger.debug("Connection successful.") } - logger.debug("Connection successful.") } catch { - logger.trace("Connection Failed.\n\(error)") + logger.trace("Failed to connect.") } +// do { +// try await client.connect(cleanSession: cleanSession) +// client.addCloseListener(named: "SensorsClient") { [self] _ in +// Task { +// self.logger.debug("Connection closed.") +// self.logger.debug("Reconnecting...") +// await self.connect() +// } +// } +// logger.debug("Connection successful.") +// } catch { +// logger.trace("Connection Failed.\(error)") +// } } } diff --git a/Sources/SensorsService/SensorsService.swift b/Sources/SensorsService/SensorsService.swift index e133170..e6b2d2b 100644 --- a/Sources/SensorsService/SensorsService.swift +++ b/Sources/SensorsService/SensorsService.swift @@ -19,6 +19,11 @@ public actor SensorsService: Service { self.sensors = sensors } + /// The entry-point of the service. + /// + /// This method is called to start the service and begin + /// listening for sensor value changes then publishing the dew-point + /// and enthalpy values of the sensors. public func run() async throws { guard client.isActive() else { throw MQTTClientNotConnected() diff --git a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift b/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift index a2cc481..a3543e3 100644 --- a/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift +++ b/Tests/MQTTConnectionServiceTests/MQTTConnectionServiceTests.swift @@ -1,8 +1,9 @@ import EnvVars import Logging -import MQTTConnectionService +@testable import MQTTConnectionService import MQTTNIO import NIO +import ServiceLifecycle import ServiceLifecycleTestKit import XCTest @@ -12,22 +13,19 @@ final class MQTTConnectionServiceTests: XCTestCase { static let logger: Logger = { var logger = Logger(label: "AsyncClientTests") - logger.logLevel = .debug + logger.logLevel = .trace return logger }() func testGracefulShutdownWorks() async throws { - let client = createClient(identifier: "testGracefulShutdown") - try await testGracefulShutdown { trigger in + let client = createClient(identifier: "testGracefulShutdown") let service = MQTTConnectionService(client: client) try await service.run() + try await Task.sleep(for: .seconds(1)) + XCTAssert(client.isActive()) trigger.triggerGracefulShutdown() } - - try await Task.sleep(for: .seconds(1)) - - XCTAssertFalse(client.isActive()) } func createClient(identifier: String) -> MQTTClient {