feat: Removes unused files
This commit is contained in:
2
Makefile
2
Makefile
@@ -23,7 +23,7 @@ stop-mosquitto:
|
||||
@docker-compose rm -f mosquitto || true
|
||||
|
||||
test-docker:
|
||||
@docker-compose run --remove-orphans -i --rm test
|
||||
@docker-compose run --build --remove-orphans -i --rm test
|
||||
@docker-compose kill mosquitto-test
|
||||
@docker-compose rm -f
|
||||
|
||||
|
||||
@@ -40,10 +40,6 @@ let package = Package(
|
||||
.product(name: "PsychrometricClientLive", package: "swift-psychrometrics")
|
||||
]
|
||||
),
|
||||
.testTarget(
|
||||
name: "dewPoint-controllerTests",
|
||||
dependencies: ["dewPoint-controller"]
|
||||
),
|
||||
.target(
|
||||
name: "Models",
|
||||
dependencies: [
|
||||
|
||||
@@ -71,7 +71,7 @@ public actor MQTTConnectionService: Service {
|
||||
}
|
||||
}
|
||||
|
||||
private nonisolated func shutdown() {
|
||||
nonisolated func shutdown() {
|
||||
logger.debug("Begin shutting down MQTT broker connection.")
|
||||
client.removeCloseListener(named: name)
|
||||
internalEventStream.stop()
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
/// This allows values to only publish changes if they have changed since the
|
||||
/// last time they were recieved.
|
||||
@propertyWrapper
|
||||
public struct TrackedChanges<Value> {
|
||||
public struct TrackedChanges<Value: Sendable>: Sendable {
|
||||
|
||||
/// The current tracking state.
|
||||
private var tracking: TrackingState
|
||||
@@ -12,7 +12,7 @@ public struct TrackedChanges<Value> {
|
||||
private var value: Value
|
||||
|
||||
/// Used to check if a new value is equal to an old value.
|
||||
private var isEqual: (Value, Value) -> Bool
|
||||
private var isEqual: @Sendable (Value, Value) -> Bool
|
||||
|
||||
/// Access to the underlying property that we are wrapping.
|
||||
public var wrappedValue: Value {
|
||||
@@ -35,7 +35,7 @@ public struct TrackedChanges<Value> {
|
||||
public init(
|
||||
wrappedValue: Value,
|
||||
needsProcessed: Bool = false,
|
||||
isEqual: @escaping (Value, Value) -> Bool
|
||||
isEqual: @escaping @Sendable (Value, Value) -> Bool
|
||||
) {
|
||||
self.value = wrappedValue
|
||||
self.tracking = needsProcessed ? .needsProcessed : .hasProcessed
|
||||
@@ -85,7 +85,9 @@ extension TrackedChanges: Equatable where Value: Equatable {
|
||||
wrappedValue: Value,
|
||||
needsProcessed: Bool = false
|
||||
) {
|
||||
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed, isEqual: ==)
|
||||
self.init(wrappedValue: wrappedValue, needsProcessed: needsProcessed) {
|
||||
$0 == $1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,5 +98,3 @@ extension TrackedChanges: Hashable where Value: Hashable {
|
||||
hasher.combine(needsProcessed)
|
||||
}
|
||||
}
|
||||
|
||||
extension TrackedChanges: Sendable where Value: Sendable {}
|
||||
|
||||
@@ -88,10 +88,10 @@ public actor SensorsService: Service {
|
||||
|
||||
let stream = try await client.listen(to: topics)
|
||||
|
||||
try await withGracefulShutdownHandler {
|
||||
try await withThrowingDiscardingTaskGroup { group in
|
||||
await withGracefulShutdownHandler {
|
||||
await withDiscardingTaskGroup { group in
|
||||
for await result in stream {
|
||||
group.addTask { try await self.handleResult(result) }
|
||||
group.addTask { await self.handleResult(result) }
|
||||
}
|
||||
}
|
||||
} onGracefulShutdown: {
|
||||
@@ -110,37 +110,41 @@ public actor SensorsService: Service {
|
||||
}
|
||||
}
|
||||
|
||||
private func handleResult(_ result: SensorsClient.PublishInfo) async throws {
|
||||
let topic = result.topic
|
||||
assert(topics.contains(topic))
|
||||
client.logger?.trace("Begin handling result for topic: \(topic)")
|
||||
private func handleResult(_ result: SensorsClient.PublishInfo) async {
|
||||
do {
|
||||
let topic = result.topic
|
||||
assert(topics.contains(topic))
|
||||
client.logger?.trace("Begin handling result for topic: \(topic)")
|
||||
|
||||
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
|
||||
var buffer = result.buffer
|
||||
return V(buffer: &buffer)
|
||||
}
|
||||
|
||||
if topic.contains("temperature") {
|
||||
client.logger?.trace("Begin handling temperature result.")
|
||||
guard let temperature = decode(DryBulb.self) else {
|
||||
client.logger?.trace("Failed to decode temperature: \(result.buffer)")
|
||||
throw DecodingError()
|
||||
func decode<V: BufferInitalizable>(_: V.Type) -> V? {
|
||||
var buffer = result.buffer
|
||||
return V(buffer: &buffer)
|
||||
}
|
||||
client.logger?.trace("Decoded temperature: \(temperature)")
|
||||
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
|
||||
|
||||
} else if topic.contains("humidity") {
|
||||
client.logger?.trace("Begin handling humidity result.")
|
||||
guard let humidity = decode(RelativeHumidity.self) else {
|
||||
client.logger?.trace("Failed to decode humidity: \(result.buffer)")
|
||||
throw DecodingError()
|
||||
if topic.contains("temperature") {
|
||||
client.logger?.trace("Begin handling temperature result.")
|
||||
guard let temperature = decode(DryBulb.self) else {
|
||||
client.logger?.trace("Failed to decode temperature: \(result.buffer)")
|
||||
throw DecodingError()
|
||||
}
|
||||
client.logger?.trace("Decoded temperature: \(temperature)")
|
||||
try sensors.update(topic: topic, keyPath: \.temperature, with: temperature)
|
||||
|
||||
} else if topic.contains("humidity") {
|
||||
client.logger?.trace("Begin handling humidity result.")
|
||||
guard let humidity = decode(RelativeHumidity.self) else {
|
||||
client.logger?.trace("Failed to decode humidity: \(result.buffer)")
|
||||
throw DecodingError()
|
||||
}
|
||||
client.logger?.trace("Decoded humidity: \(humidity)")
|
||||
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
|
||||
}
|
||||
client.logger?.trace("Decoded humidity: \(humidity)")
|
||||
try sensors.update(topic: topic, keyPath: \.humidity, with: humidity)
|
||||
}
|
||||
|
||||
try await publishUpdates()
|
||||
client.logger?.trace("Done handling result for topic: \(topic)")
|
||||
try await publishUpdates()
|
||||
client.logger?.trace("Done handling result for topic: \(topic)")
|
||||
} catch {
|
||||
client.logger?.error("Received error: \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
private func publish(_ double: Double?, to topic: String) async throws {
|
||||
|
||||
@@ -11,6 +11,7 @@ import ServiceLifecycle
|
||||
|
||||
@main
|
||||
struct Application {
|
||||
|
||||
/// The main entry point of the application.
|
||||
static func main() async throws {
|
||||
let eventloopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import Combine
|
||||
import Logging
|
||||
import Models
|
||||
@testable import MQTTConnectionService
|
||||
@@ -13,22 +12,19 @@ final class MQTTConnectionServiceTests: XCTestCase {
|
||||
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
||||
|
||||
static let logger: Logger = {
|
||||
var logger = Logger(label: "AsyncClientTests")
|
||||
var logger = Logger(label: "MQTTConnectionServiceTests")
|
||||
logger.logLevel = .trace
|
||||
return logger
|
||||
}()
|
||||
|
||||
func testGracefulShutdownWorks() async throws {
|
||||
try await testGracefulShutdown { trigger in
|
||||
let client = createClient(identifier: "testGracefulShutdown")
|
||||
let service = MQTTConnectionService(client: client)
|
||||
try await service.run()
|
||||
try await Task.sleep(for: .seconds(1))
|
||||
XCTAssert(client.isActive())
|
||||
trigger.triggerGracefulShutdown()
|
||||
// try await Task.sleep(for: .seconds(2))
|
||||
// XCTAssertFalse(client.isActive())
|
||||
}
|
||||
let client = createClient(identifier: "testGracefulShutdown")
|
||||
let service = MQTTConnectionService(client: client)
|
||||
await service.connect()
|
||||
try await Task.sleep(for: .seconds(1))
|
||||
XCTAssert(client.isActive())
|
||||
service.shutdown()
|
||||
XCTAssertFalse(client.isActive())
|
||||
}
|
||||
|
||||
func createClient(identifier: String) -> MQTTClient {
|
||||
@@ -58,65 +54,4 @@ final class MQTTConnectionServiceTests: XCTestCase {
|
||||
)
|
||||
}
|
||||
|
||||
func testEventStream() async throws {
|
||||
var connection: ConnectionStream? = ConnectionStream()
|
||||
|
||||
let task = Task {
|
||||
guard let events = connection?.events else { return }
|
||||
print("before loop")
|
||||
for await event in events {
|
||||
print("\(event)")
|
||||
}
|
||||
print("after loop")
|
||||
}
|
||||
|
||||
let ending = Task {
|
||||
try await Task.sleep(for: .seconds(2))
|
||||
connection = nil
|
||||
}
|
||||
|
||||
connection?.start()
|
||||
try await ending.value
|
||||
task.cancel()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ConnectionStream {
|
||||
|
||||
enum Event {
|
||||
case connected
|
||||
case disconnected
|
||||
case shuttingDown
|
||||
}
|
||||
|
||||
let events: AsyncStream<Event>
|
||||
private let continuation: AsyncStream<Event>.Continuation
|
||||
private var cancellable: AnyCancellable?
|
||||
|
||||
init() {
|
||||
let (stream, continuation) = AsyncStream.makeStream(of: Event.self)
|
||||
self.events = stream
|
||||
self.continuation = continuation
|
||||
}
|
||||
|
||||
deinit {
|
||||
print("connection stream is gone.")
|
||||
stop()
|
||||
}
|
||||
|
||||
func start() {
|
||||
cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
|
||||
.autoconnect()
|
||||
.sink { [weak self] _ in
|
||||
print("will send event.")
|
||||
self?.continuation.yield(.connected)
|
||||
}
|
||||
}
|
||||
|
||||
func stop() {
|
||||
continuation.yield(.shuttingDown)
|
||||
cancellable = nil
|
||||
continuation.finish()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ final class SensorsClientTests: XCTestCase {
|
||||
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
|
||||
|
||||
static let logger: Logger = {
|
||||
var logger = Logger(label: "AsyncClientTests")
|
||||
var logger = Logger(label: "SensorsClientTests")
|
||||
logger.logLevel = .debug
|
||||
return logger
|
||||
}()
|
||||
|
||||
@@ -1,47 +0,0 @@
|
||||
import XCTest
|
||||
import class Foundation.Bundle
|
||||
|
||||
//final class dewPoint_controllerTests: XCTestCase {
|
||||
// func testExample() throws {
|
||||
// // This is an example of a functional test case.
|
||||
// // Use XCTAssert and related functions to verify your tests produce the correct
|
||||
// // results.
|
||||
//
|
||||
// // Some of the APIs that we use below are available in macOS 10.13 and above.
|
||||
// guard #available(macOS 10.13, *) else {
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// // Mac Catalyst won't have `Process`, but it is supported for executables.
|
||||
// #if !targetEnvironment(macCatalyst)
|
||||
//
|
||||
// let fooBinary = productsDirectory.appendingPathComponent("dewPoint-controller")
|
||||
//
|
||||
// let process = Process()
|
||||
// process.executableURL = fooBinary
|
||||
//
|
||||
// let pipe = Pipe()
|
||||
// process.standardOutput = pipe
|
||||
//
|
||||
// try process.run()
|
||||
// process.waitUntilExit()
|
||||
//
|
||||
// let data = pipe.fileHandleForReading.readDataToEndOfFile()
|
||||
// let output = String(data: data, encoding: .utf8)
|
||||
//
|
||||
// XCTAssertEqual(output, "Hello, world!\n")
|
||||
// #endif
|
||||
// }
|
||||
//
|
||||
// /// Returns path to the built products directory.
|
||||
// var productsDirectory: URL {
|
||||
// #if os(macOS)
|
||||
// for bundle in Bundle.allBundles where bundle.bundlePath.hasSuffix(".xctest") {
|
||||
// return bundle.bundleURL.deletingLastPathComponent()
|
||||
// }
|
||||
// fatalError("couldn't find the products directory")
|
||||
// #else
|
||||
// return Bundle.main.bundleURL
|
||||
// #endif
|
||||
// }
|
||||
//}
|
||||
Reference in New Issue
Block a user