diff --git a/src/configuration/config_options.hpp b/src/configuration/config_options.hpp index a0f471ac1..752040102 100644 --- a/src/configuration/config_options.hpp +++ b/src/configuration/config_options.hpp @@ -68,6 +68,8 @@ namespace mtconnect { DECLARE_CONFIGURATION(MqttPort); DECLARE_CONFIGURATION(MqttHost); DECLARE_CONFIGURATION(MqttConnectInterval); + DECLARE_CONFIGURATION(MqttUserName); + DECLARE_CONFIGURATION(MqttPassword); // Adapter Configuration DECLARE_CONFIGURATION(AdapterIdentity); diff --git a/src/mqtt/mqtt_client_impl.hpp b/src/mqtt/mqtt_client_impl.hpp index d11d01054..77479d8c0 100644 --- a/src/mqtt/mqtt_client_impl.hpp +++ b/src/mqtt/mqtt_client_impl.hpp @@ -15,6 +15,7 @@ // limitations under the License. // +#include #include #include @@ -50,6 +51,8 @@ namespace mtconnect { mqtt::protocol_version>; using mqtt_tls_client = mqtt_tls_client_ptr; + using mqtt_tls_client_ws = mqtt_tls_client_ws_ptr; template class MqttClientImpl : public MqttClient @@ -60,9 +63,14 @@ namespace mtconnect { : MqttClient(ioContext, move(handler)), m_options(options), m_host(GetOption(options, configuration::MqttHost).value_or("localhost")), - m_port(GetOption(options, configuration::MqttPort).value_or(1883)), + m_port(GetOption(options, configuration::MqttPort).value_or(1883)), m_reconnectTimer(ioContext) { + if (auto userName = GetOption(options, configuration::MqttUserName)) + m_username = *userName; + if (auto pswd = GetOption(options, configuration::MqttPassword)) + m_password = *pswd; + std::stringstream url; url << "mqtt://" << m_host << ':' << m_port; m_url = url.str(); @@ -320,6 +328,10 @@ namespace mtconnect { std::uint16_t m_clientId {0}; + std::string m_username = ""; + + std::string m_password = ""; + boost::asio::steady_timer m_reconnectTimer; }; @@ -334,6 +346,10 @@ namespace mtconnect { if (!m_client) { m_client = mqtt::make_async_client(m_ioContext, m_host, m_port); + if (!m_username.empty()) + m_client->set_user_name(m_username); + if (!m_password.empty()) + m_client->set_password(m_password); } return m_client; @@ -354,15 +370,22 @@ namespace mtconnect { if (!m_client) { m_client = mqtt::make_tls_async_client(m_ioContext, m_host, m_port); + if (!m_username.empty()) + m_client->set_user_name(m_username); + if (!m_password.empty()) + m_client->set_password(m_password); + auto cacert = GetOption(m_options, configuration::MqttCaCert); if (cacert) { m_client->get_ssl_context().load_verify_file(*cacert); } + auto private_key = GetOption(m_options, configuration::MqttPrivateKey); auto cert = GetOption(m_options, configuration::MqttCert); if (private_key && cert) { + m_client->get_ssl_context().set_verify_mode(boost::asio::ssl::verify_peer); m_client->get_ssl_context().use_certificate_chain_file(*cert); m_client->get_ssl_context().use_private_key_file(*private_key, boost::asio::ssl::context::pem); @@ -376,5 +399,44 @@ namespace mtconnect { mqtt_tls_client m_client; }; + class MqttTlsWSClient : public MqttClientImpl + { + public: + using base = MqttClientImpl; + using base::base; + + auto &getClient() + { + if (!m_client) + { + m_client = mqtt::make_tls_async_client_ws(m_ioContext, m_host, m_port); + if (!m_username.empty()) + m_client->set_user_name(m_username); + if (!m_password.empty()) + m_client->set_password(m_password); + + auto cacert = GetOption(m_options, configuration::MqttCaCert); + if (cacert) + { + m_client->get_ssl_context().load_verify_file(*cacert); + } + auto private_key = GetOption(m_options, configuration::MqttPrivateKey); + auto cert = GetOption(m_options, configuration::MqttCert); + if (private_key && cert) + { + m_client->get_ssl_context().set_verify_mode(boost::asio::ssl::verify_peer); + m_client->get_ssl_context().use_certificate_chain_file(*cert); + m_client->get_ssl_context().use_private_key_file(*private_key, + boost::asio::ssl::context::pem); + } + } + + return m_client; + } + + protected: + mqtt_tls_client_ws m_client; + }; + } // namespace mqtt_client } // namespace mtconnect diff --git a/src/mqtt/mqtt_server_impl.hpp b/src/mqtt/mqtt_server_impl.hpp index ca957c85b..6b2dab9ac 100644 --- a/src/mqtt/mqtt_server_impl.hpp +++ b/src/mqtt/mqtt_server_impl.hpp @@ -288,6 +288,52 @@ namespace mtconnect { m_port = GetOption(options, configuration::Port).value_or(8883); } + using base::base; + using server = MQTT_NS::server_tls<>; + + auto &getServer() { return m_server; } + + auto &createServer() + { + if (!m_server) + { + boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12); + ctx.set_options(boost::asio::ssl::context::default_workarounds | + boost::asio::ssl::context::single_dh_use); + auto serverPrivateKey = GetOption(m_options, configuration::TlsPrivateKey); + auto serverCert = GetOption(m_options, configuration::TlsCertificateChain); + ctx.use_certificate_chain_file(*serverCert); + //ctx.use_tmp_dh_file(*GetOption(m_options, configuration::TlsDHKey)); + ctx.use_private_key_file(*serverPrivateKey, boost::asio::ssl::context::pem); + + if (HasOption(m_options, configuration::TlsCertificatePassword)) + { + ctx.set_password_callback( + [this](size_t, boost::asio::ssl::context_base::password_purpose) -> string { + return *GetOption(m_options, configuration::TlsCertificatePassword); + }); + } + m_server.emplace(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), m_port), + std::move(ctx), m_ioContext); + } + + return *m_server; + } + + protected: + std::optional m_server; + }; + + class MqttTlsWSServer : public MqttServerImpl + { + public: + using base = MqttServerImpl; + MqttTlsWSServer(boost::asio::io_context &ioContext, const ConfigOptions &options) + : base(ioContext, options) + { + m_port = GetOption(options, configuration::Port).value_or(8883); + } + using base::base; using server = MQTT_NS::server_tls_ws<>; @@ -298,6 +344,27 @@ namespace mtconnect { if (!m_server) { boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12); + ctx.set_options(boost::asio::ssl::context::default_workarounds | + boost::asio::ssl::context::single_dh_use); + auto serverPrivateKey = GetOption(m_options, configuration::TlsPrivateKey); + auto serverCert = GetOption(m_options, configuration::TlsCertificateChain); + ctx.use_certificate_chain_file(*serverCert); + //ctx.use_tmp_dh_file(*GetOption(m_options, configuration::TlsDHKey)); + ctx.use_private_key_file(*serverPrivateKey, boost::asio::ssl::context::pem); + + /*if (IsOptionSet(m_options, configuration::TlsVerifyClientCertificate)) + { + LOG(info) << "Server: Will only accept client connections with valid certificates"; + + ctx.set_verify_mode(boost::asio::ssl::verify_peer | + boost::asio::ssl::verify_fail_if_no_peer_cert); + if (HasOption(m_options, configuration::MqttCaCert)) + { + LOG(info) << "Server: Adding Client Certificates."; + ctx.load_verify_file(*GetOption(m_options, configuration::MqttCaCert)); + } + }*/ + m_server.emplace(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), m_port), std::move(ctx), m_ioContext); } diff --git a/test/mqtt_isolated_test.cpp b/test/mqtt_isolated_test.cpp index 63d626c64..d7e4eaa4f 100644 --- a/test/mqtt_isolated_test.cpp +++ b/test/mqtt_isolated_test.cpp @@ -33,12 +33,21 @@ using namespace std; using namespace mtconnect; +using namespace mtconnect::configuration; using namespace mtconnect::device_model::data_item; using namespace mtconnect::sink::mqtt_sink; using namespace mtconnect::sink::rest_sink; using json = nlohmann::json; +const string MqttClientCACert(PROJECT_ROOT_DIR "/test/resources/rootca.crt"); +const string MqttClientCert(PROJECT_ROOT_DIR "/test/resources/client.crt"); +const string MqttClientKey {PROJECT_ROOT_DIR "/test/resources/client.key"}; + +const string ServerCertFile(PROJECT_ROOT_DIR "/test/resources/user.crt"); +const string ServerKeyFile {PROJECT_ROOT_DIR "/test/resources/user.key"}; +const string ServerDhFile {PROJECT_ROOT_DIR "/test/resources/dh2048.pem"}; + class MqttIsolatedUnitTest : public testing::Test { protected: @@ -67,17 +76,29 @@ class MqttIsolatedUnitTest : public testing::Test } void createServer(const ConfigOptions &options) - { - using namespace mtconnect::configuration; + { + bool withTlsOption = IsOptionSet(options, configuration::MqttTls); + ConfigOptions opts(options); MergeOptions(opts, {{ServerIp, "127.0.0.1"s}, {MqttPort, 0}, - {MqttTls, false}, - {AutoAvailable, false}, + {MqttTls, withTlsOption}, + {AutoAvailable, false}, + {TlsCertificateChain, ServerCertFile}, + {TlsPrivateKey, ServerKeyFile}, + {TlsCertificatePassword, "mtconnect"s}, {RealTime, false}}); - m_server = - make_shared(m_agentTestHelper->m_ioContext, opts); + if (withTlsOption) + { + m_server = + make_shared(m_agentTestHelper->m_ioContext, opts); + } + else + { + m_server = + make_shared(m_agentTestHelper->m_ioContext, opts); + } } template @@ -117,19 +138,32 @@ class MqttIsolatedUnitTest : public testing::Test void createClient(const ConfigOptions &options, unique_ptr &&handler) { - using namespace mtconnect::configuration; + bool withTlsOption = IsOptionSet(options, configuration::MqttTls); + ConfigOptions opts(options); MergeOptions(opts, {{MqttHost, "127.0.0.1"s}, {MqttPort, m_port}, - {MqttTls, false}, + {MqttTls, withTlsOption}, {AutoAvailable, false}, + {MqttCaCert, MqttClientCACert}, + {MqttCert, MqttClientCert}, + {MqttPrivateKey, MqttClientKey}, {RealTime, false}}); - m_client = make_shared(m_agentTestHelper->m_ioContext, - opts, move(handler)); + + if (withTlsOption) + { + m_client = make_shared(m_agentTestHelper->m_ioContext, + opts, move(handler)); + } + else + { + m_client = make_shared(m_agentTestHelper->m_ioContext, + opts, move(handler)); + } } bool startClient() - { + { bool started = m_client && m_client->start(); if (started) { @@ -146,15 +180,10 @@ class MqttIsolatedUnitTest : public testing::Test uint16_t m_port {0}; }; -const string MqttCACert(PROJECT_ROOT_DIR "/test/resources/clientca.crt"); TEST_F(MqttIsolatedUnitTest, mqtt_client_should_connect_to_broker) { - ConfigOptions options { - {configuration::Host, "localhost"s}, {configuration::Port, 0}, - {configuration::MqttTls, false}, {configuration::AutoAvailable, false}, - {configuration::RealTime, false}, {configuration::MqttCaCert, MqttCACert}}; - + ConfigOptions options; createServer(options); startServer(); @@ -170,12 +199,9 @@ TEST_F(MqttIsolatedUnitTest, mqtt_client_should_connect_to_broker) ASSERT_TRUE(m_client->isConnected()); } -TEST_F(MqttIsolatedUnitTest, mqtt_client_should_receive_loopback_publication) +TEST_F(MqttIsolatedUnitTest, mqtt_tcp_client_should_receive_loopback_publication) { - ConfigOptions options { - {configuration::Host, "localhost"s}, {configuration::Port, 0}, - {configuration::MqttTls, false}, {configuration::AutoAvailable, false}, - {configuration::RealTime, false}, {configuration::MqttCaCert, MqttCACert}}; + ConfigOptions options; createServer(options); startServer(); @@ -199,11 +225,11 @@ TEST_F(MqttIsolatedUnitTest, mqtt_client_should_receive_loopback_publication) { pid_sub1 = client->acquire_unique_packet_id(); - client->async_subscribe(pid_sub1, "mqtt_client_cpp/topic1", MQTT_NS::qos::at_most_once, + client->async_subscribe(pid_sub1, "mqtt_tcp_client_cpp/topic1", MQTT_NS::qos::at_most_once, //[optional] checking async_subscribe completion code [](MQTT_NS::error_code ec) { EXPECT_FALSE(ec); - std::cout << "async_subscribe callback: " << ec.message() + std::cout << "async_tcp_subscribe callback: " << ec.message() << std::endl; }); } @@ -221,12 +247,12 @@ TEST_F(MqttIsolatedUnitTest, mqtt_client_should_receive_loopback_publication) if (packet_id == pid_sub1) { - client->async_publish("mqtt_client_cpp/topic1", "test1", MQTT_NS::qos::at_most_once, + client->async_publish("mqtt_tcp_client_cpp/topic1", "test1", MQTT_NS::qos::at_most_once, //[optional] checking async_publish completion code [](MQTT_NS::error_code ec) { EXPECT_FALSE(ec); - std::cout << "async_publish callback: " << ec.message() << std::endl; + std::cout << "async_tcp_publish callback: " << ec.message() << std::endl; EXPECT_EQ(ec.message(), "Success"); }); return true; @@ -247,12 +273,12 @@ TEST_F(MqttIsolatedUnitTest, mqtt_client_should_receive_loopback_publication) if (packet_id == pid_sub1) { - client->async_publish("mqtt_client_cpp/topic1", "test1", MQTT_NS::qos::at_most_once, + client->async_publish("mqtt_tcp_client_cpp/topic1", "test1", MQTT_NS::qos::at_most_once, //[optional] checking async_publish completion code [packet_id](MQTT_NS::error_code ec) { EXPECT_FALSE(ec); - std::cout << "async_publish callback: " << ec.message() << std::endl; + std::cout << "async_tcp_publish callback: " << ec.message() << std::endl; ASSERT_TRUE(packet_id); }); } @@ -272,7 +298,7 @@ TEST_F(MqttIsolatedUnitTest, mqtt_client_should_receive_loopback_publication) std::cout << "topic_name: " << topic_name << std::endl; std::cout << "contents: " << contents << std::endl; - EXPECT_EQ("mqtt_client_cpp/topic1", topic_name); + EXPECT_EQ("mqtt_tcp_client_cpp/topic1", topic_name); EXPECT_EQ("test1", contents); client->async_disconnect(); @@ -287,6 +313,65 @@ TEST_F(MqttIsolatedUnitTest, mqtt_client_should_receive_loopback_publication) ASSERT_TRUE(received); } -TEST_F(MqttIsolatedUnitTest, should_connect_using_tls) { GTEST_SKIP(); } +TEST_F(MqttIsolatedUnitTest, should_connect_using_tls) +{ + GTEST_SKIP(); + + ConfigOptions options {{configuration::MqttTls, true}}; + + createServer(options); + + startServer(); + + ASSERT_NE(0, m_port); + + auto handler = make_unique(); + + createClient(options, move(handler)); + + ASSERT_TRUE(startClient()); + + ASSERT_TRUE(m_client->isConnected()); +} + +TEST_F(MqttIsolatedUnitTest, should_connect_using_tls_ws) +{ + GTEST_SKIP(); + + ConfigOptions options; + MergeOptions(options, {{ServerIp, "127.0.0.1"s}, + {MqttPort, 0}, + {MqttTls, true}, + {AutoAvailable, false}, + {TlsCertificateChain, ServerCertFile}, + {TlsPrivateKey, ServerKeyFile}, + {RealTime, false}}); + + m_server = + make_shared(m_agentTestHelper->m_ioContext, options); + + startServer(); + + ASSERT_NE(0, m_port); + + auto handler = make_unique(); + + ConfigOptions opts; + MergeOptions(opts, {{MqttHost, "127.0.0.1"s}, + {MqttPort, m_port}, + {MqttTls, true}, + {AutoAvailable, false}, + {MqttCaCert, MqttClientCACert}, + {MqttCert, MqttClientCert}, + {MqttPrivateKey, MqttClientKey}, + {RealTime, false}}); + + m_client = make_shared(m_agentTestHelper->m_ioContext, + opts, move(handler)); + + ASSERT_TRUE(startClient()); + + ASSERT_TRUE(m_client->isConnected()); +} TEST_F(MqttIsolatedUnitTest, should_conenct_using_authentication) { GTEST_SKIP(); } diff --git a/test/mqtt_sink_test.cpp b/test/mqtt_sink_test.cpp index c3b5d7362..79f61fff3 100644 --- a/test/mqtt_sink_test.cpp +++ b/test/mqtt_sink_test.cpp @@ -36,9 +36,8 @@ using namespace std; using namespace mtconnect; using namespace mtconnect::device_model::data_item; using namespace mtconnect::sink::mqtt_sink; -using namespace mtconnect::sink::rest_sink; using namespace mtconnect::asset; -using namespace mtconnect::entity; +using namespace mtconnect::configuration; using json = nlohmann::json; @@ -82,7 +81,7 @@ class MqttSinkTest : public testing::Test ConfigOptions opts(options); MergeOptions(opts, {{"MqttSink", true}, - {configuration::MqttPort, m_port}, + {configuration::MqttPort, m_port}, {configuration::MqttHost, "127.0.0.1"s}}); m_agentTestHelper->createAgent("/samples/test_config.xml", 8, 4, "2.0", 25, false, true, opts); addAdapter(); @@ -97,7 +96,7 @@ class MqttSinkTest : public testing::Test MergeOptions(opts, {{ServerIp, "127.0.0.1"s}, {MqttPort, 0}, {MqttTls, false}, - {AutoAvailable, false}, + {AutoAvailable, false}, {RealTime, false}}); m_server = @@ -140,13 +139,12 @@ class MqttSinkTest : public testing::Test } void createClient(const ConfigOptions &options, unique_ptr &&handler) - { - using namespace mtconnect::configuration; + { ConfigOptions opts(options); MergeOptions(opts, {{MqttHost, "127.0.0.1"s}, {MqttPort, m_port}, {MqttTls, false}, - {AutoAvailable, false}, + {AutoAvailable, false}, {RealTime, false}}); m_client = make_shared(m_agentTestHelper->m_ioContext, opts, move(handler)); @@ -198,6 +196,35 @@ TEST_F(MqttSinkTest, mqtt_sink_should_connect_to_broker) ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); } +TEST_F(MqttSinkTest, mqtt_sink_should_connect_to_broker_with_UserNameandPassword) +{ + ConfigOptions options {{MqttUserName, "MQTT-SINK"s}, + {MqttPassword, "mtconnect"s}}; + createServer(options); + startServer(); + + ASSERT_NE(0, m_port); + + createAgent("", options); + auto service = m_agentTestHelper->getMqttService(); + + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); +} + +TEST_F(MqttSinkTest, mqtt_sink_should_connect_to_broker_without_UserNameandPassword) +{ + ConfigOptions options; + createServer(options); + startServer(); + + ASSERT_NE(0, m_port); + + createAgent(); + auto service = m_agentTestHelper->getMqttService(); + + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); +} + TEST_F(MqttSinkTest, mqtt_sink_should_publish_device) { ConfigOptions options; @@ -259,13 +286,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Streams) { EXPECT_TRUE(true); foundLineDataItem = true; - } - // this below code not working currently - /*ErrorList list; - auto ptr = parser.parse(device_model::data_item::DataItem::getRoot(), payload, "2.0", list); - auto dataItem = dynamic_pointer_cast(ptr); - if (dataItem) - EXPECT_EQ(string("204"),dataItem->getValue());*/ + } }; createClient(options, move(handler)); ASSERT_TRUE(startClient()); @@ -277,11 +298,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Streams) m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|line|204"); m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/Path/Line[line]"); - - // m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|lp|NORMAL||||"); - - // m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/LogicProgram"); - + waitFor(2s, [&foundLineDataItem]() { return foundLineDataItem; }); } @@ -306,12 +323,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Asset) { EXPECT_TRUE(true); gotControllerDataItem = true; - } - /*ErrorList list; - auto ptr = parser.parse(Asset::getRoot(), payload, "2.0", list); - EXPECT_EQ(0, list.size()); - auto asset = dynamic_cast(ptr.get()); - EXPECT_TRUE(asset);*/ + } }; createClient(options, move(handler)); ASSERT_TRUE(startClient()); @@ -327,3 +339,282 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Asset) waitFor(3s, [&gotControllerDataItem]() { return gotControllerDataItem; }); } + +TEST_F(MqttSinkTest, mqtt_sink_should_publish_RotaryMode) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + + entity::JsonParser parser; + + auto handler = make_unique(); + bool gotRotaryMode = false; + handler->m_receive = [&gotRotaryMode, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ("MTConnect/Observation/000/Axes[Axes]/Rotary[C]/Events/RotaryMode[Smode]", topic); + auto jdoc = json::parse(payload); + + string id = jdoc.at("/value"_json_pointer).get(); + if (id == string("SPINDLE")) + { + EXPECT_TRUE(true); + gotRotaryMode = true; + } + }; + + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + + createAgent("/samples/discrete_example.xml"); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|block|G01X00|Smode|INDEX|line|204"); + + m_client->subscribe("MTConnect/Observation/000/Axes[Axes]/Rotary[C]/Events/RotaryMode[Smode]"); + + waitFor(3s, [&gotRotaryMode]() { return gotRotaryMode; }); +} + +TEST_F(MqttSinkTest, mqtt_sink_should_publish_Dataset) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + entity::JsonParser parser; + auto handler = make_unique(); + bool gotControllerDataItem = false; + handler->m_receive = [&gotControllerDataItem, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ("MTConnect/Observation/000/Controller[Controller]/Path/VARIABLE[vars]", topic); + auto jdoc = json::parse(payload); + string id = jdoc.at("/Part/a"_json_pointer).get(); + + if (id == string("1")) + { + EXPECT_TRUE(true); + gotControllerDataItem = true; + } + }; + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + createAgent("/samples/data_set.xml"); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + auto time = chrono::system_clock::now(); + m_agentTestHelper->m_adapter->processData("TIME|vars|a=1 b=2 c=3"); + m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/Path/VARIABLE[vars]"); + waitFor(3s, [&gotControllerDataItem]() { return gotControllerDataItem; }); +} + +TEST_F(MqttSinkTest, mqtt_sink_should_publish_Table) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + entity::JsonParser parser; + auto handler = make_unique(); + bool gotControllerDataItem = false; + handler->m_receive = [&gotControllerDataItem, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ( + "MTConnect/Observation/000/Controller[Controller]/Path[path]/Events/WorkOffsetTable[wpo]", + topic); + auto jdoc = json::parse(payload); + + auto jValue = jdoc.at("/value"_json_pointer); + int count = 0; + if (jValue.is_object()) + { + for (auto &[key, value] : jValue.items()) + { + if (key == "G53.1" || key == "G53.2" || key == "G53.3") + { + for (auto &[subKey, subValue] : value.items()) + { + if (key == "G53.1" && (subKey == "X" && subValue.get() == 1 || + subKey == "Y" && subValue.get() == 2 || + subKey == "Z" && subValue.get() == 3)) + { + count++; + } + else if (key == "G53.2" && (subKey == "X" && subValue.get() == 4 || + subKey == "Y" && subValue.get() == 5 || + subKey == "Z" && subValue.get() == 6)) + { + count++; + } + else if (key == "G53.3" && (subKey == "X" && subValue.get() == 7.0 || + subKey == "Y" && subValue.get() == 8.0 || + subKey == "Z" && subValue.get() == 9.0 || + subKey == "U" && subValue.get() == 10.0)) + { + count++; + } + } + } + } + if (count == 10) + { + EXPECT_TRUE(true); + gotControllerDataItem = true; + } + } + }; + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + createAgent("/samples/data_set.xml"); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + auto time = chrono::system_clock::now(); + + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|wpo|G53.1={X=1.0 Y=2.0 Z=3.0} G53.2={X=4.0 Y=5.0 Z=6.0}" + "G53.3={X=7.0 Y=8.0 Z=9 U=10.0}"); + + m_client->subscribe( + "MTConnect/Observation/000/Controller[Controller]/Path[path]/Events/" + "WorkOffsetTable[wpo]"); + + waitFor(3s, [&gotControllerDataItem]() { return gotControllerDataItem; }); +} + +TEST_F(MqttSinkTest, mqtt_sink_should_publish_Temperature) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + + entity::JsonParser parser; + + auto handler = make_unique(); + bool gotTemperature = false; + handler->m_receive = [&gotTemperature, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ( + "MTConnect/Observation/000/Axes[Axes]/Linear[Z]/Motor[motor_name]/Samples/" + "Temperature[z_motor_temp]", + topic); + auto jdoc = json::parse(payload); + + auto value = jdoc.at("/value"_json_pointer); + double load = 81.0; + if (value.is_number()) + { + if (load == double(value)) + { + EXPECT_TRUE(true); + gotTemperature = true; + } + } + }; + + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + + createAgent(); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + + m_agentTestHelper->m_adapter->processData("2018-04-27T05:00:26.555666|z_motor_temp|81"); + + m_client->subscribe( + "MTConnect/Observation/000/Axes[Axes]/Linear[Z]/Motor[motor_name]/Samples/" + "Temperature[z_motor_temp]"); + + waitFor(3s, [&gotTemperature]() { return gotTemperature; }); +} + +TEST_F(MqttSinkTest, mqtt_sink_should_publish_LinearLoad) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + entity::JsonParser parser; + auto handler = make_unique(); + bool gotLinearLoad = false; + handler->m_receive = [&gotLinearLoad, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ("MTConnect/Observation/000/Axes[Axes]/Linear[X]/Load[Xload]", topic); + auto jdoc = json::parse(payload); + auto value = jdoc.at("/value"_json_pointer); + double load = 50.0; + if (value.is_number()) + { + if (load == double(value)) + { + EXPECT_TRUE(true); + gotLinearLoad = true; + } + } + }; + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + createAgent(); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + m_agentTestHelper->m_adapter->processData("2018-04-27T05:00:26.555666|Xload|50"); + m_client->subscribe("MTConnect/Observation/000/Axes[Axes]/Linear[X]/Load[Xload]"); + waitFor(3s, [&gotLinearLoad]() { return gotLinearLoad; }); +} + +TEST_F(MqttSinkTest, mqtt_sink_should_publish_DynamicCalibration) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + + entity::JsonParser parser; + + auto handler = make_unique(); + bool gotCalibration = false; + handler->m_receive = [&gotCalibration, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ( + "MTConnect/Observation/000/Axes[Axes]/Linear[X]/Samples/PositionTimeSeries.Actual[Xts]", + topic); + auto jdoc = json::parse(payload); + + auto value = jdoc.at("/value"_json_pointer); + + if (value.is_array()) + { + if (value.size() == 25) + { + EXPECT_TRUE(true); + gotCalibration = true; + } + } + }; + + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + + createAgent(); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|Xts|25|| 5118 5118 5118 5118 5118 5118 5118 5118 5118 5118 5118 5118 " + "5119 5119 5118 " + "5118 5117 5117 5119 5119 5118 5118 5118 5118 5118"); + + m_client->subscribe( + "MTConnect/Observation/000/Axes[Axes]/Linear[X]/Samples/PositionTimeSeries.Actual[Xts]"); + + waitFor(3s, [&gotCalibration]() { return gotCalibration; }); +}