feat: Adds graceful shutdown to services
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import EnvVars
|
||||
import Combine
|
||||
import Logging
|
||||
import Models
|
||||
@testable import MQTTConnectionService
|
||||
import MQTTNIO
|
||||
import NIO
|
||||
@@ -25,6 +26,8 @@ final class MQTTConnectionServiceTests: XCTestCase {
|
||||
try await Task.sleep(for: .seconds(1))
|
||||
XCTAssert(client.isActive())
|
||||
trigger.triggerGracefulShutdown()
|
||||
// try await Task.sleep(for: .seconds(2))
|
||||
// XCTAssertFalse(client.isActive())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,4 +58,65 @@ final class MQTTConnectionServiceTests: XCTestCase {
|
||||
)
|
||||
}
|
||||
|
||||
func testEventStream() async throws {
|
||||
var connection: ConnectionStream? = ConnectionStream()
|
||||
|
||||
let task = Task {
|
||||
guard let events = connection?.events else { return }
|
||||
print("before loop")
|
||||
for await event in events {
|
||||
print("\(event)")
|
||||
}
|
||||
print("after loop")
|
||||
}
|
||||
|
||||
let ending = Task {
|
||||
try await Task.sleep(for: .seconds(2))
|
||||
connection = nil
|
||||
}
|
||||
|
||||
connection?.start()
|
||||
try await ending.value
|
||||
task.cancel()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ConnectionStream {
|
||||
|
||||
enum Event {
|
||||
case connected
|
||||
case disconnected
|
||||
case shuttingDown
|
||||
}
|
||||
|
||||
let events: AsyncStream<Event>
|
||||
private let continuation: AsyncStream<Event>.Continuation
|
||||
private var cancellable: AnyCancellable?
|
||||
|
||||
init() {
|
||||
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
|
||||
self.events = stream
|
||||
self.continuation = continuation
|
||||
}
|
||||
|
||||
deinit {
|
||||
print("connection stream is gone.")
|
||||
stop()
|
||||
}
|
||||
|
||||
func start() {
|
||||
cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
|
||||
.autoconnect()
|
||||
.sink { [weak self] _ in
|
||||
print("will send event.")
|
||||
self?.continuation.yield(.connected)
|
||||
}
|
||||
}
|
||||
|
||||
func stop() {
|
||||
continuation.yield(.shuttingDown)
|
||||
cancellable = nil
|
||||
continuation.finish()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user