diff --git a/Makefile b/Makefile index 8b586e4..d6dcfaf 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,10 @@ start-mosquitto: --name mosquitto \ -d \ -p 1883:1883 \ - -v $(PWD)/mosquitto/config:/mosquitto/config \ + -p 8883:8883 \ + -p 8080:8080 \ + -p 8081:8081 \ + -v "$(PWD)/mosquitto/config:/mosquitto/config" \ eclipse-mosquitto stop-mosquitto: diff --git a/Package.swift b/Package.swift index 3c18d97..4986053 100644 --- a/Package.swift +++ b/Package.swift @@ -15,6 +15,7 @@ let package = Package( .library(name: "Models", targets: ["Models"]), .library(name: "Client", targets: ["Client"]), .library(name: "ClientLive", targets: ["ClientLive"]), + .library(name: "MQTTStore", targets: ["MQTTStore"]), ], dependencies: [ .package(url: "https://github.com/adam-fowler/mqtt-nio.git", from: "2.0.0"), @@ -87,5 +88,15 @@ let package = Package( "ClientLive" ] ), + .target( + name: "MQTTStore", + dependencies: [ + .product(name: "MQTTNIO", package: "mqtt-nio") + ] + ), + .testTarget( + name: "MQTTStoreTests", + dependencies: ["MQTTStore"] + ) ] ) diff --git a/Sources/ClientLive/Helpers.swift b/Sources/ClientLive/Helpers.swift index f264aa8..70f1f33 100644 --- a/Sources/ClientLive/Helpers.swift +++ b/Sources/ClientLive/Helpers.swift @@ -68,7 +68,12 @@ extension Sensor: FetchableTopic where Reading: BufferInitalizable { extension MQTTNIO.MQTTClient { - func mqttSubscription(topic: String, qos: MQTTQoS = .atLeastOnce, retainAsPublished: Bool = true, retainHandling: MQTTSubscribeInfoV5.RetainHandling = .sendAlways) -> MQTTSubscribeInfoV5 { + func mqttSubscription( + topic: String, + qos: MQTTQoS = .atLeastOnce, + retainAsPublished: Bool = true, + retainHandling: MQTTSubscribeInfoV5.RetainHandling = .sendAlways + ) -> MQTTSubscribeInfoV5 { .init(topicFilter: topic, qos: qos, retainAsPublished: retainAsPublished, retainHandling: retainHandling) } @@ -103,7 +108,6 @@ extension MQTTNIO.MQTTClient { } func fetch(setPoint: KeyPath, setPoints: Topics.SetPoints) -> EventLoopFuture { -// logger.debug("Fetching data for set point: \(setPoint.topic)") return fetch(mqttSubscription(topic: setPoints[keyPath: setPoint])) } diff --git a/Sources/MQTTStore/MQTTStore.swift b/Sources/MQTTStore/MQTTStore.swift new file mode 100644 index 0000000..0d92c2e --- /dev/null +++ b/Sources/MQTTStore/MQTTStore.swift @@ -0,0 +1,95 @@ +import Logging +import Foundation +import MQTTNIO +import NIO + +// TODO: This works and allows tests to complete, but should potentially be simplified. + +typealias PublishTopicHandler = (inout State, Result) -> Void + +struct ServerDetails { + let identifier: String + let hostname: String + let port: Int + let version: MQTTClient.Version + let cleanSession: Bool + let useTLS: Bool + let useWebSocket: Bool + let webSocketUrl: String + let username: String? + let password: String? +} + +class MQTTStore { + typealias Subscription = (topic: String, onPublish: PublishTopicHandler) + + var state: State + var subscriptions: [Subscription] + var client: MQTTClient? + var serverDetails: ServerDetails + var eventLoopGroup: EventLoopGroup + var logger: Logger? + + init( + state: State, + subscriptions: [Subscription], + serverDetails: ServerDetails, + eventLoopGroup: EventLoopGroup, + logger: Logger? = nil + ) { + self.state = state + self.subscriptions = subscriptions + self.serverDetails = serverDetails + self.eventLoopGroup = eventLoopGroup + self.logger = logger + self.createClient() + } + + private func createClient() { + let client = MQTTClient( + host: serverDetails.hostname, + identifier: serverDetails.identifier, + eventLoopGroupProvider: .shared(eventLoopGroup), + logger: logger, + configuration: .init( + version: serverDetails.version, + userName: serverDetails.username, + password: serverDetails.password, + useSSL: serverDetails.useTLS, + useWebSockets: serverDetails.useWebSocket, + webSocketURLPath: serverDetails.webSocketUrl + ) + ) + for subscription in subscriptions { + client.addPublishListener( + named: subscription.topic, + { result in subscription.onPublish(&self.state, result) } + ) + } + self.client = client + } + + func createSubscriptions() -> EventLoopFuture { + let subscriptionInfo = subscriptions.map { MQTTSubscribeInfo.init(topicFilter: $0.0, qos: .atLeastOnce) } + return client?.subscribe(to: subscriptionInfo).map { _ in } ?? eventLoopGroup.next().makeSucceededVoidFuture() + } + + func connect(cleanSession: Bool) -> EventLoopFuture { + client?.connect(cleanSession: cleanSession) ?? eventLoopGroup.next().makeSucceededFuture(false) + } + + func connectAndSubscribe(cleanSession: Bool) -> EventLoopFuture { + connect(cleanSession: cleanSession) + .flatMap{ _ in self.createSubscriptions() } + } + + func destroy() -> EventLoopFuture { + guard let client = client else { + return eventLoopGroup.next().makeSucceededVoidFuture() + } + return client.disconnect().map { _ in + try? self.client?.syncShutdownGracefully() + self.client = nil + } + } +} diff --git a/Tests/ClientTests/ClientTests.swift b/Tests/ClientTests/ClientTests.swift index 97c6c31..3859ff4 100644 --- a/Tests/ClientTests/ClientTests.swift +++ b/Tests/ClientTests/ClientTests.swift @@ -14,40 +14,77 @@ final class ClientLiveTests: XCTestCase { static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost" let topics = Topics() + func test_mqtt_subscription() throws { + let mqttClient = createMQTTClient(identifier: "test_subscription") + _ = try mqttClient.connect().wait() + let sub = try mqttClient.v5.subscribe( + to: [mqttClient.mqttSubscription(topic: "test/subscription")] + ).wait() + XCTAssertEqual(sub.reasons[0], .grantedQoS1) + try mqttClient.disconnect().wait() + try mqttClient.syncShutdownGracefully() + } + + func test_mqtt_listener() throws { + let lock = Lock() + var publishRecieved: [MQTTPublishInfo] = [] + let payloadString = "test" + let payload = ByteBufferAllocator().buffer(string: payloadString) + + let client = self.createMQTTClient(identifier: "testMQTTListener_publisher") + _ = try client.connect().wait() + client.addPublishListener(named: "test") { result in + switch result { + case .success(let publish): + var buffer = publish.payload + let string = buffer.readString(length: buffer.readableBytes) + XCTAssertEqual(string, payloadString) + lock.withLock { + publishRecieved.append(publish) + } + case .failure(let error): + XCTFail("\(error)") + } + } + + try client.publish(to: "testMQTTSubscribe", payload: payload, qos: .atLeastOnce, retain: true).wait() + let sub = try client.v5.subscribe(to: [.init(topicFilter: "testMQTTSubscribe", qos: .atLeastOnce)]).wait() + XCTAssertEqual(sub.reasons[0], .grantedQoS1) + + Thread.sleep(forTimeInterval: 2) + lock.withLock { + XCTAssertEqual(publishRecieved.count, 1) + } + + try client.disconnect().wait() + try client.syncShutdownGracefully() + + } + // func test_fetch_humidity() throws { // let lock = Lock() +// let publishClient = createMQTTClient(identifier: "publishHumidity") // let mqttClient = createMQTTClient(identifier: "fetchHumidity") -// -//// let exp = XCTestExpectation(description: "fetchHumidity") -// +// _ = try publishClient.connect().wait() // let client = try createClient(mqttClient: mqttClient) // var humidityRecieved: [RelativeHumidity] = [] // -// _ = try mqttClient.publish( +// _ = try publishClient.publish( // to: topics.sensors.humidity, // payload: ByteBufferAllocator().buffer(string: "\(50.0)"), // qos: .atLeastOnce // ).wait() // // Thread.sleep(forTimeInterval: 2) -// -//// .flatMapThrowing { _ in -// let humidity = try client.fetchHumidity(.init(topic: self.topics.sensors.humidity)).wait() -// XCTAssertEqual(humidity, 50) -// lock.withLock { -// humidityRecieved.append(humidity) -// } -//// exp.fulfill() -//// }.wait() -// +// try publishClient.disconnect().wait() +// let humidity = try client.fetchHumidity(.init(topic: self.topics.sensors.humidity)).wait() +// XCTAssertEqual(humidity, 50) // Thread.sleep(forTimeInterval: 2) // lock.withLock { -// XCTAssertEqual(humidityRecieved.count, 1) +// humidityRecieved.append(humidity) // } -// // try mqttClient.disconnect().wait() // try mqttClient.syncShutdownGracefully() -// // } // MARK: - Helpers @@ -56,12 +93,23 @@ final class ClientLiveTests: XCTestCase { host: Self.hostname, port: 1883, identifier: identifier, - eventLoopGroupProvider: .createNew, + eventLoopGroupProvider: .shared(eventLoopGroup), logger: self.logger, configuration: .init(version: .v5_0) ) } +// func createWebSocketClient(identifier: String) -> MQTTNIO.MQTTClient { +// MQTTNIO.MQTTClient( +// host: Self.hostname, +// port: 8080, +// identifier: identifier, +// eventLoopGroupProvider: .createNew, +// logger: self.logger, +// configuration: .init(useWebSockets: true, webSocketURLPath: "/mqtt") +// ) +// } + // Uses default topic names. func createClient(mqttClient: MQTTNIO.MQTTClient, autoConnect: Bool = true) throws -> Client.MQTTClient { if autoConnect { @@ -75,4 +123,6 @@ final class ClientLiveTests: XCTestCase { logger.logLevel = .trace return logger }() + + let eventLoopGroup = MultiThreadedEventLoopGroup.init(numberOfThreads: 1) } diff --git a/Tests/MQTTStoreTests/MQTTStoreTests.swift b/Tests/MQTTStoreTests/MQTTStoreTests.swift new file mode 100644 index 0000000..a6ae10d --- /dev/null +++ b/Tests/MQTTStoreTests/MQTTStoreTests.swift @@ -0,0 +1,77 @@ +import Logging +import XCTest +import MQTTNIO +@testable import MQTTStore +import NIO + +final class ServerTests: XCTestCase { + + func testConnect() throws { + let store = createTestStore() + _ = try store.connect(cleanSession: true).wait() + try store.destroy().wait() + } + + func testSubscriptionHandler() throws { + let store = createTestStore() + _ = try store.connectAndSubscribe(cleanSession: true).wait() + + _ = try store.client?.publish( + to: "test/topic", + payload: ByteBufferAllocator().buffer(string: "test"), + qos: .atLeastOnce + ).wait() + + Thread.sleep(forTimeInterval: 2) + + XCTAssertEqual(store.state.messages.count, 1) + XCTAssertEqual(store.state.messages[0], "test") + try store.destroy().wait() + } + + + func createTestStore() -> MQTTStore { + .init( + state: .init(messages: []), + subscriptions: [("test/topic", stateHandler(_:_:))], + serverDetails: serverDetails, + eventLoopGroup: MultiThreadedEventLoopGroup.init(numberOfThreads: 1), + logger: logger + ) + } + + let logger: Logger = { + var logger = Logger(label: "MQTT Test") + logger.logLevel = .trace + return logger + }() + + var serverDetails: ServerDetails { + .init( + identifier: "Test Server", + hostname: "localhost", + port: 1883, + version: .v3_1_1, + cleanSession: true, + useTLS: false, + useWebSocket: false, + webSocketUrl: "/mqtt", + username: nil, + password: nil + ) + } + + struct TestState { + var messages: [String] + } + + func stateHandler(_ state: inout TestState, _ result: Result) { + switch result { + case let .success(value): + let payload = String(buffer: value.payload) + state.messages.append(payload) + case .failure: + break + } + } +} diff --git a/Tests/dewPoint-controllerTests/dewPoint_controllerTests.swift b/Tests/dewPoint-controllerTests/dewPoint_controllerTests.swift index 6eb2937..ab4b519 100644 --- a/Tests/dewPoint-controllerTests/dewPoint_controllerTests.swift +++ b/Tests/dewPoint-controllerTests/dewPoint_controllerTests.swift @@ -1,47 +1,47 @@ import XCTest import class Foundation.Bundle -final class dewPoint_controllerTests: XCTestCase { - func testExample() throws { - // This is an example of a functional test case. - // Use XCTAssert and related functions to verify your tests produce the correct - // results. - - // Some of the APIs that we use below are available in macOS 10.13 and above. - guard #available(macOS 10.13, *) else { - return - } - - // Mac Catalyst won't have `Process`, but it is supported for executables. - #if !targetEnvironment(macCatalyst) - - let fooBinary = productsDirectory.appendingPathComponent("dewPoint-controller") - - let process = Process() - process.executableURL = fooBinary - - let pipe = Pipe() - process.standardOutput = pipe - - try process.run() - process.waitUntilExit() - - let data = pipe.fileHandleForReading.readDataToEndOfFile() - let output = String(data: data, encoding: .utf8) - - XCTAssertEqual(output, "Hello, world!\n") - #endif - } - - /// Returns path to the built products directory. - var productsDirectory: URL { - #if os(macOS) - for bundle in Bundle.allBundles where bundle.bundlePath.hasSuffix(".xctest") { - return bundle.bundleURL.deletingLastPathComponent() - } - fatalError("couldn't find the products directory") - #else - return Bundle.main.bundleURL - #endif - } -} +//final class dewPoint_controllerTests: XCTestCase { +// func testExample() throws { +// // This is an example of a functional test case. +// // Use XCTAssert and related functions to verify your tests produce the correct +// // results. +// +// // Some of the APIs that we use below are available in macOS 10.13 and above. +// guard #available(macOS 10.13, *) else { +// return +// } +// +// // Mac Catalyst won't have `Process`, but it is supported for executables. +// #if !targetEnvironment(macCatalyst) +// +// let fooBinary = productsDirectory.appendingPathComponent("dewPoint-controller") +// +// let process = Process() +// process.executableURL = fooBinary +// +// let pipe = Pipe() +// process.standardOutput = pipe +// +// try process.run() +// process.waitUntilExit() +// +// let data = pipe.fileHandleForReading.readDataToEndOfFile() +// let output = String(data: data, encoding: .utf8) +// +// XCTAssertEqual(output, "Hello, world!\n") +// #endif +// } +// +// /// Returns path to the built products directory. +// var productsDirectory: URL { +// #if os(macOS) +// for bundle in Bundle.allBundles where bundle.bundlePath.hasSuffix(".xctest") { +// return bundle.bundleURL.deletingLastPathComponent() +// } +// fatalError("couldn't find the products directory") +// #else +// return Bundle.main.bundleURL +// #endif +// } +//} diff --git a/docker-compose.yaml b/docker-compose.yaml index 5f06f16..e821fe6 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -20,3 +20,6 @@ services: - ./mosquitto/certs:/mosquitto/certs ports: - "1883:1883" + - "8883:8883" + - "8080:8080" + - "8081:8081"