Added MQTTStore.

This commit is contained in:
2021-10-25 16:52:34 -04:00
parent f45f667af6
commit 5c92213ee0
8 changed files with 308 additions and 65 deletions

View File

@@ -16,7 +16,10 @@ start-mosquitto:
--name mosquitto \
-d \
-p 1883:1883 \
-v $(PWD)/mosquitto/config:/mosquitto/config \
-p 8883:8883 \
-p 8080:8080 \
-p 8081:8081 \
-v "$(PWD)/mosquitto/config:/mosquitto/config" \
eclipse-mosquitto
stop-mosquitto:

View File

@@ -15,6 +15,7 @@ let package = Package(
.library(name: "Models", targets: ["Models"]),
.library(name: "Client", targets: ["Client"]),
.library(name: "ClientLive", targets: ["ClientLive"]),
.library(name: "MQTTStore", targets: ["MQTTStore"]),
],
dependencies: [
.package(url: "https://github.com/adam-fowler/mqtt-nio.git", from: "2.0.0"),
@@ -87,5 +88,15 @@ let package = Package(
"ClientLive"
]
),
.target(
name: "MQTTStore",
dependencies: [
.product(name: "MQTTNIO", package: "mqtt-nio")
]
),
.testTarget(
name: "MQTTStoreTests",
dependencies: ["MQTTStore"]
)
]
)

View File

@@ -68,7 +68,12 @@ extension Sensor: FetchableTopic where Reading: BufferInitalizable {
extension MQTTNIO.MQTTClient {
func mqttSubscription(topic: String, qos: MQTTQoS = .atLeastOnce, retainAsPublished: Bool = true, retainHandling: MQTTSubscribeInfoV5.RetainHandling = .sendAlways) -> MQTTSubscribeInfoV5 {
func mqttSubscription(
topic: String,
qos: MQTTQoS = .atLeastOnce,
retainAsPublished: Bool = true,
retainHandling: MQTTSubscribeInfoV5.RetainHandling = .sendAlways
) -> MQTTSubscribeInfoV5 {
.init(topicFilter: topic, qos: qos, retainAsPublished: retainAsPublished, retainHandling: retainHandling)
}
@@ -103,7 +108,6 @@ extension MQTTNIO.MQTTClient {
}
func fetch(setPoint: KeyPath<Topics.SetPoints, String>, setPoints: Topics.SetPoints) -> EventLoopFuture<Double> {
// logger.debug("Fetching data for set point: \(setPoint.topic)")
return fetch(mqttSubscription(topic: setPoints[keyPath: setPoint]))
}

View File

@@ -0,0 +1,95 @@
import Logging
import Foundation
import MQTTNIO
import NIO
// TODO: This works and allows tests to complete, but should potentially be simplified.
typealias PublishTopicHandler<State> = (inout State, Result<MQTTPublishInfo, Error>) -> Void
struct ServerDetails {
let identifier: String
let hostname: String
let port: Int
let version: MQTTClient.Version
let cleanSession: Bool
let useTLS: Bool
let useWebSocket: Bool
let webSocketUrl: String
let username: String?
let password: String?
}
class MQTTStore<State> {
typealias Subscription = (topic: String, onPublish: PublishTopicHandler<State>)
var state: State
var subscriptions: [Subscription]
var client: MQTTClient?
var serverDetails: ServerDetails
var eventLoopGroup: EventLoopGroup
var logger: Logger?
init(
state: State,
subscriptions: [Subscription],
serverDetails: ServerDetails,
eventLoopGroup: EventLoopGroup,
logger: Logger? = nil
) {
self.state = state
self.subscriptions = subscriptions
self.serverDetails = serverDetails
self.eventLoopGroup = eventLoopGroup
self.logger = logger
self.createClient()
}
private func createClient() {
let client = MQTTClient(
host: serverDetails.hostname,
identifier: serverDetails.identifier,
eventLoopGroupProvider: .shared(eventLoopGroup),
logger: logger,
configuration: .init(
version: serverDetails.version,
userName: serverDetails.username,
password: serverDetails.password,
useSSL: serverDetails.useTLS,
useWebSockets: serverDetails.useWebSocket,
webSocketURLPath: serverDetails.webSocketUrl
)
)
for subscription in subscriptions {
client.addPublishListener(
named: subscription.topic,
{ result in subscription.onPublish(&self.state, result) }
)
}
self.client = client
}
func createSubscriptions() -> EventLoopFuture<Void> {
let subscriptionInfo = subscriptions.map { MQTTSubscribeInfo.init(topicFilter: $0.0, qos: .atLeastOnce) }
return client?.subscribe(to: subscriptionInfo).map { _ in } ?? eventLoopGroup.next().makeSucceededVoidFuture()
}
func connect(cleanSession: Bool) -> EventLoopFuture<Bool> {
client?.connect(cleanSession: cleanSession) ?? eventLoopGroup.next().makeSucceededFuture(false)
}
func connectAndSubscribe(cleanSession: Bool) -> EventLoopFuture<Void> {
connect(cleanSession: cleanSession)
.flatMap{ _ in self.createSubscriptions() }
}
func destroy() -> EventLoopFuture<Void> {
guard let client = client else {
return eventLoopGroup.next().makeSucceededVoidFuture()
}
return client.disconnect().map { _ in
try? self.client?.syncShutdownGracefully()
self.client = nil
}
}
}

View File

@@ -14,40 +14,77 @@ final class ClientLiveTests: XCTestCase {
static let hostname = ProcessInfo.processInfo.environment["MOSQUITTO_SERVER"] ?? "localhost"
let topics = Topics()
func test_mqtt_subscription() throws {
let mqttClient = createMQTTClient(identifier: "test_subscription")
_ = try mqttClient.connect().wait()
let sub = try mqttClient.v5.subscribe(
to: [mqttClient.mqttSubscription(topic: "test/subscription")]
).wait()
XCTAssertEqual(sub.reasons[0], .grantedQoS1)
try mqttClient.disconnect().wait()
try mqttClient.syncShutdownGracefully()
}
func test_mqtt_listener() throws {
let lock = Lock()
var publishRecieved: [MQTTPublishInfo] = []
let payloadString = "test"
let payload = ByteBufferAllocator().buffer(string: payloadString)
let client = self.createMQTTClient(identifier: "testMQTTListener_publisher")
_ = try client.connect().wait()
client.addPublishListener(named: "test") { result in
switch result {
case .success(let publish):
var buffer = publish.payload
let string = buffer.readString(length: buffer.readableBytes)
XCTAssertEqual(string, payloadString)
lock.withLock {
publishRecieved.append(publish)
}
case .failure(let error):
XCTFail("\(error)")
}
}
try client.publish(to: "testMQTTSubscribe", payload: payload, qos: .atLeastOnce, retain: true).wait()
let sub = try client.v5.subscribe(to: [.init(topicFilter: "testMQTTSubscribe", qos: .atLeastOnce)]).wait()
XCTAssertEqual(sub.reasons[0], .grantedQoS1)
Thread.sleep(forTimeInterval: 2)
lock.withLock {
XCTAssertEqual(publishRecieved.count, 1)
}
try client.disconnect().wait()
try client.syncShutdownGracefully()
}
// func test_fetch_humidity() throws {
// let lock = Lock()
// let publishClient = createMQTTClient(identifier: "publishHumidity")
// let mqttClient = createMQTTClient(identifier: "fetchHumidity")
//
//// let exp = XCTestExpectation(description: "fetchHumidity")
//
// _ = try publishClient.connect().wait()
// let client = try createClient(mqttClient: mqttClient)
// var humidityRecieved: [RelativeHumidity] = []
//
// _ = try mqttClient.publish(
// _ = try publishClient.publish(
// to: topics.sensors.humidity,
// payload: ByteBufferAllocator().buffer(string: "\(50.0)"),
// qos: .atLeastOnce
// ).wait()
//
// Thread.sleep(forTimeInterval: 2)
//
//// .flatMapThrowing { _ in
// let humidity = try client.fetchHumidity(.init(topic: self.topics.sensors.humidity)).wait()
// XCTAssertEqual(humidity, 50)
// lock.withLock {
// humidityRecieved.append(humidity)
// }
//// exp.fulfill()
//// }.wait()
//
// try publishClient.disconnect().wait()
// let humidity = try client.fetchHumidity(.init(topic: self.topics.sensors.humidity)).wait()
// XCTAssertEqual(humidity, 50)
// Thread.sleep(forTimeInterval: 2)
// lock.withLock {
// XCTAssertEqual(humidityRecieved.count, 1)
// humidityRecieved.append(humidity)
// }
//
// try mqttClient.disconnect().wait()
// try mqttClient.syncShutdownGracefully()
//
// }
// MARK: - Helpers
@@ -56,12 +93,23 @@ final class ClientLiveTests: XCTestCase {
host: Self.hostname,
port: 1883,
identifier: identifier,
eventLoopGroupProvider: .createNew,
eventLoopGroupProvider: .shared(eventLoopGroup),
logger: self.logger,
configuration: .init(version: .v5_0)
)
}
// func createWebSocketClient(identifier: String) -> MQTTNIO.MQTTClient {
// MQTTNIO.MQTTClient(
// host: Self.hostname,
// port: 8080,
// identifier: identifier,
// eventLoopGroupProvider: .createNew,
// logger: self.logger,
// configuration: .init(useWebSockets: true, webSocketURLPath: "/mqtt")
// )
// }
// Uses default topic names.
func createClient(mqttClient: MQTTNIO.MQTTClient, autoConnect: Bool = true) throws -> Client.MQTTClient {
if autoConnect {
@@ -75,4 +123,6 @@ final class ClientLiveTests: XCTestCase {
logger.logLevel = .trace
return logger
}()
let eventLoopGroup = MultiThreadedEventLoopGroup.init(numberOfThreads: 1)
}

View File

@@ -0,0 +1,77 @@
import Logging
import XCTest
import MQTTNIO
@testable import MQTTStore
import NIO
final class ServerTests: XCTestCase {
func testConnect() throws {
let store = createTestStore()
_ = try store.connect(cleanSession: true).wait()
try store.destroy().wait()
}
func testSubscriptionHandler() throws {
let store = createTestStore()
_ = try store.connectAndSubscribe(cleanSession: true).wait()
_ = try store.client?.publish(
to: "test/topic",
payload: ByteBufferAllocator().buffer(string: "test"),
qos: .atLeastOnce
).wait()
Thread.sleep(forTimeInterval: 2)
XCTAssertEqual(store.state.messages.count, 1)
XCTAssertEqual(store.state.messages[0], "test")
try store.destroy().wait()
}
func createTestStore() -> MQTTStore<TestState> {
.init(
state: .init(messages: []),
subscriptions: [("test/topic", stateHandler(_:_:))],
serverDetails: serverDetails,
eventLoopGroup: MultiThreadedEventLoopGroup.init(numberOfThreads: 1),
logger: logger
)
}
let logger: Logger = {
var logger = Logger(label: "MQTT Test")
logger.logLevel = .trace
return logger
}()
var serverDetails: ServerDetails {
.init(
identifier: "Test Server",
hostname: "localhost",
port: 1883,
version: .v3_1_1,
cleanSession: true,
useTLS: false,
useWebSocket: false,
webSocketUrl: "/mqtt",
username: nil,
password: nil
)
}
struct TestState {
var messages: [String]
}
func stateHandler(_ state: inout TestState, _ result: Result<MQTTPublishInfo, Error>) {
switch result {
case let .success(value):
let payload = String(buffer: value.payload)
state.messages.append(payload)
case .failure:
break
}
}
}

View File

@@ -1,47 +1,47 @@
import XCTest
import class Foundation.Bundle
final class dewPoint_controllerTests: XCTestCase {
func testExample() throws {
// This is an example of a functional test case.
// Use XCTAssert and related functions to verify your tests produce the correct
// results.
// Some of the APIs that we use below are available in macOS 10.13 and above.
guard #available(macOS 10.13, *) else {
return
}
// Mac Catalyst won't have `Process`, but it is supported for executables.
#if !targetEnvironment(macCatalyst)
let fooBinary = productsDirectory.appendingPathComponent("dewPoint-controller")
let process = Process()
process.executableURL = fooBinary
let pipe = Pipe()
process.standardOutput = pipe
try process.run()
process.waitUntilExit()
let data = pipe.fileHandleForReading.readDataToEndOfFile()
let output = String(data: data, encoding: .utf8)
XCTAssertEqual(output, "Hello, world!\n")
#endif
}
/// Returns path to the built products directory.
var productsDirectory: URL {
#if os(macOS)
for bundle in Bundle.allBundles where bundle.bundlePath.hasSuffix(".xctest") {
return bundle.bundleURL.deletingLastPathComponent()
}
fatalError("couldn't find the products directory")
#else
return Bundle.main.bundleURL
#endif
}
}
//final class dewPoint_controllerTests: XCTestCase {
// func testExample() throws {
// // This is an example of a functional test case.
// // Use XCTAssert and related functions to verify your tests produce the correct
// // results.
//
// // Some of the APIs that we use below are available in macOS 10.13 and above.
// guard #available(macOS 10.13, *) else {
// return
// }
//
// // Mac Catalyst won't have `Process`, but it is supported for executables.
// #if !targetEnvironment(macCatalyst)
//
// let fooBinary = productsDirectory.appendingPathComponent("dewPoint-controller")
//
// let process = Process()
// process.executableURL = fooBinary
//
// let pipe = Pipe()
// process.standardOutput = pipe
//
// try process.run()
// process.waitUntilExit()
//
// let data = pipe.fileHandleForReading.readDataToEndOfFile()
// let output = String(data: data, encoding: .utf8)
//
// XCTAssertEqual(output, "Hello, world!\n")
// #endif
// }
//
// /// Returns path to the built products directory.
// var productsDirectory: URL {
// #if os(macOS)
// for bundle in Bundle.allBundles where bundle.bundlePath.hasSuffix(".xctest") {
// return bundle.bundleURL.deletingLastPathComponent()
// }
// fatalError("couldn't find the products directory")
// #else
// return Bundle.main.bundleURL
// #endif
// }
//}

View File

@@ -20,3 +20,6 @@ services:
- ./mosquitto/certs:/mosquitto/certs
ports:
- "1883:1883"
- "8883:8883"
- "8080:8080"
- "8081:8081"