From ddaa296ef12a1c07110a295b619e20a1956ad57c Mon Sep 17 00:00:00 2001 From: rajani Date: Mon, 22 Aug 2022 15:04:21 -0400 Subject: [PATCH 01/17] Mqtt: added dataset and table unittests (skipped at the moment)) --- test/mqtt_sink_test.cpp | 85 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/test/mqtt_sink_test.cpp b/test/mqtt_sink_test.cpp index d70510d13..eb228a572 100644 --- a/test/mqtt_sink_test.cpp +++ b/test/mqtt_sink_test.cpp @@ -331,3 +331,88 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Asset) waitFor(3s, [&gotControllerDataItem]() { return gotControllerDataItem; }); } + +TEST_F(MqttSinkTest, mqtt_sink_should_publish_Dataset) +{ + GTEST_SKIP(); + + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + + entity::JsonParser parser; + + auto handler = make_unique(); + bool gotControllerDataItem = false; + handler->m_receive = [&gotControllerDataItem, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ("MTConnect/Observation/000/Controller[Controller]/Path/VARIABLE[vars]", topic); + auto jdoc = json::parse(payload); + string id = jdoc.at("/Part/a"_json_pointer).get(); + // ASSERT_DATA_SET_ENTRY(doc, "VariableDataSet[2]", "a", "1"); + if (id == string("1")) + { + EXPECT_TRUE(true); + gotControllerDataItem = true; + } + }; + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + + createAgent("/samples/data_set.xml"); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + auto time = chrono::system_clock::now(); + + m_agentTestHelper->m_adapter->processData("TIME|vars|a=1 b=2 c=3"); + + m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/Path/VARIABLE[vars]"); + + waitFor(3s, [&gotControllerDataItem]() { return gotControllerDataItem; }); +} + +TEST_F(MqttSinkTest, mqtt_sink_should_publish_Table) +{ + GTEST_SKIP(); + + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + + entity::JsonParser parser; + + auto handler = make_unique(); + bool gotControllerDataItem = false; + handler->m_receive = [&gotControllerDataItem, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ("MTConnect/Observation/000/Controller[Controller]/Path/WORK_OFFSET[wpo]", topic); + auto jdoc = json::parse(payload); + //ASSERT_TABLE_ENTRY(doc, "WorkOffsetTable[@dataItemId='wp1']", "G53.2", "X", "4"); + + string id = jdoc.at("/EntryDefinition/key"_json_pointer).get(); + if (id == string("G53.1")) + { + EXPECT_TRUE(true); + gotControllerDataItem = true; + } + }; + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + + createAgent("/samples/data_set.xml"); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + auto time = chrono::system_clock::now(); + + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|wpo|G53.1={X=1.0 Y=2.0 Z=3.0} G53.2={X=4.0 Y=5.0 Z=6.0} G53.3={X=7.0 " + "Y=8.0 Z=9 U=10.0}"); + + m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/Path/WORK_OFFSET[wpo]"); + + waitFor(3s, [&gotControllerDataItem]() { return gotControllerDataItem; }); +} From 49438240de0294a5c702de9c10eff9fa6a40434e Mon Sep 17 00:00:00 2001 From: rajani Date: Thu, 25 Aug 2022 18:33:03 -0400 Subject: [PATCH 02/17] added few more tests into mqtt_sink unit test --- test/mqtt_sink_test.cpp | 102 ++++++++++++++++++++++++++++++++++------ 1 file changed, 87 insertions(+), 15 deletions(-) diff --git a/test/mqtt_sink_test.cpp b/test/mqtt_sink_test.cpp index eb228a572..d274d7d56 100644 --- a/test/mqtt_sink_test.cpp +++ b/test/mqtt_sink_test.cpp @@ -280,10 +280,6 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Streams) m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/Path/Line[line]"); - //m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|lp|NORMAL||||"); - - //m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/LogicProgram"); - waitFor(2s, [&foundLineDataItem]() { return foundLineDataItem; }); } @@ -300,7 +296,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Asset) bool gotControllerDataItem = false; handler->m_receive = [&gotControllerDataItem, &parser](std::shared_ptr, const std::string &topic, - const std::string &payload) { + const std::string &payload) { EXPECT_EQ("MTConnect/Asset/0001", topic); auto jdoc = json::parse(payload); string id = jdoc.at("/Part/assetId"_json_pointer).get(); @@ -309,13 +305,8 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Asset) EXPECT_TRUE(true); gotControllerDataItem = true; } - /*ErrorList list; - auto ptr = parser.parse(Asset::getRoot(), payload, "2.0", list); - EXPECT_EQ(0, list.size()); - auto asset = dynamic_cast(ptr.get()); - EXPECT_TRUE(asset);*/ - }; + createClient(options, move(handler)); ASSERT_TRUE(startClient()); @@ -332,6 +323,87 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Asset) waitFor(3s, [&gotControllerDataItem]() { return gotControllerDataItem; }); } +TEST_F(MqttSinkTest, mqtt_sink_should_publish_LinearLoad) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + + entity::JsonParser parser; + + auto handler = make_unique(); + bool gotLinearLoad = false; + handler->m_receive = [&gotLinearLoad, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ("MTConnect/Observation/000/Axes[Axes]/Linear[X]/Load[Xload]", topic); + auto jdoc = json::parse(payload); + + auto value = jdoc.at("/value"_json_pointer); + double load = 50.0; + if (value.is_number()) + { + if (load == double(value)) + { + EXPECT_TRUE(true); + gotLinearLoad = true; + } + } + }; + + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + + createAgent(); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + + m_agentTestHelper->m_adapter->processData("2018-04-27T05:00:26.555666|Xload|50"); + + m_client->subscribe("MTConnect/Observation/000/Axes[Axes]/Linear[X]/Load[Xload]"); + + waitFor(3s, [&gotLinearLoad]() { return gotLinearLoad; }); +} + +TEST_F(MqttSinkTest, mqtt_sink_should_publish_PathPosition) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + + entity::JsonParser parser; + + auto handler = make_unique(); + bool gotPathPosition = false; + handler->m_receive = [&gotPathPosition, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ("MTConnect/Observation/000/Controller[Controller]/Path/PathPosition[Ppos]", topic); + auto jdoc = json::parse(payload); + string id = jdoc.at("/value"_json_pointer).get(); + if (id == string("20")) + { + EXPECT_TRUE(true); + gotPathPosition = true; + } + }; + + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + + createAgent("/samples/filter_example.xml"); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + + m_agentTestHelper->m_adapter->processData("2018-04-27T05:00:26.555666|Ppos|20"); + + m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/Path/PathPosition[Ppos]"); + + waitFor(3s, [&gotPathPosition]() { return gotPathPosition; }); +} + TEST_F(MqttSinkTest, mqtt_sink_should_publish_Dataset) { GTEST_SKIP(); @@ -351,12 +423,12 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Dataset) EXPECT_EQ("MTConnect/Observation/000/Controller[Controller]/Path/VARIABLE[vars]", topic); auto jdoc = json::parse(payload); string id = jdoc.at("/Part/a"_json_pointer).get(); - // ASSERT_DATA_SET_ENTRY(doc, "VariableDataSet[2]", "a", "1"); + // ASSERT_DATA_SET_ENTRY(doc, "VariableDataSet[2]", "a", "1"); if (id == string("1")) { EXPECT_TRUE(true); gotControllerDataItem = true; - } + } }; createClient(options, move(handler)); ASSERT_TRUE(startClient()); @@ -391,14 +463,14 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Table) const std::string &payload) { EXPECT_EQ("MTConnect/Observation/000/Controller[Controller]/Path/WORK_OFFSET[wpo]", topic); auto jdoc = json::parse(payload); - //ASSERT_TABLE_ENTRY(doc, "WorkOffsetTable[@dataItemId='wp1']", "G53.2", "X", "4"); + // ASSERT_TABLE_ENTRY(doc, "WorkOffsetTable[@dataItemId='wp1']", "G53.2", "X", "4"); string id = jdoc.at("/EntryDefinition/key"_json_pointer).get(); if (id == string("G53.1")) { EXPECT_TRUE(true); gotControllerDataItem = true; - } + } }; createClient(options, move(handler)); ASSERT_TRUE(startClient()); From 32b6d0678936fa821cf68fabc6772edf0027c3df Mon Sep 17 00:00:00 2001 From: rajani Date: Tue, 30 Aug 2022 09:13:01 -0400 Subject: [PATCH 03/17] added Tls connection unit test in MqttIsolatedUnitTest currently it skipped --- test/mqtt_isolated_test.cpp | 75 +++++++++++++++++++++++++++++-------- 1 file changed, 60 insertions(+), 15 deletions(-) diff --git a/test/mqtt_isolated_test.cpp b/test/mqtt_isolated_test.cpp index 733a10f5c..1cf1edc63 100644 --- a/test/mqtt_isolated_test.cpp +++ b/test/mqtt_isolated_test.cpp @@ -69,15 +69,26 @@ class MqttIsolatedUnitTest : public testing::Test void createServer(const ConfigOptions &options) { using namespace mtconnect::configuration; + + bool withTlsOption = IsOptionSet(options, configuration::MqttTls); + ConfigOptions opts(options); MergeOptions(opts, {{ServerIp, "127.0.0.1"s}, {MqttPort, 0}, - {MqttTls, false}, + {MqttTls, withTlsOption}, {AutoAvailable, false}, {RealTime, false}}); - m_server = - make_shared(m_agentTestHelper->m_ioContext, opts); + if (withTlsOption) + { + m_server = + make_shared(m_agentTestHelper->m_ioContext, opts); + } + else + { + m_server = + make_shared(m_agentTestHelper->m_ioContext, opts); + } } template @@ -118,14 +129,26 @@ class MqttIsolatedUnitTest : public testing::Test void createClient(const ConfigOptions &options, unique_ptr &&handler) { using namespace mtconnect::configuration; + + bool withTlsOption = IsOptionSet(options, configuration::MqttTls); + ConfigOptions opts(options); MergeOptions(opts, {{MqttHost, "127.0.0.1"s}, {MqttPort, m_port}, - {MqttTls, false}, + {MqttTls, withTlsOption}, {AutoAvailable, false}, {RealTime, false}}); - m_client = make_shared(m_agentTestHelper->m_ioContext, - opts, move(handler)); + + if (withTlsOption) + { + m_client = make_shared(m_agentTestHelper->m_ioContext, + opts, move(handler)); + } + else + { + m_client = make_shared(m_agentTestHelper->m_ioContext, + opts, move(handler)); + } } bool startClient() @@ -170,7 +193,7 @@ TEST_F(MqttIsolatedUnitTest, mqtt_client_should_connect_to_broker) ASSERT_TRUE(m_client->isConnected()); } -TEST_F(MqttIsolatedUnitTest, mqtt_client_should_receive_loopback_publication) +TEST_F(MqttIsolatedUnitTest, mqtt_tcp_client_should_receive_loopback_publication) { ConfigOptions options { {configuration::Host, "localhost"s}, {configuration::Port, 0}, @@ -199,11 +222,11 @@ TEST_F(MqttIsolatedUnitTest, mqtt_client_should_receive_loopback_publication) { pid_sub1 = client->acquire_unique_packet_id(); - client->async_subscribe(pid_sub1, "mqtt_client_cpp/topic1", MQTT_NS::qos::at_most_once, + client->async_subscribe(pid_sub1, "mqtt_tcp_client_cpp/topic1", MQTT_NS::qos::at_most_once, //[optional] checking async_subscribe completion code [](MQTT_NS::error_code ec) { EXPECT_FALSE(ec); - std::cout << "async_subscribe callback: " << ec.message() + std::cout << "async_tcp_subscribe callback: " << ec.message() << std::endl; }); } @@ -221,12 +244,12 @@ TEST_F(MqttIsolatedUnitTest, mqtt_client_should_receive_loopback_publication) if (packet_id == pid_sub1) { - client->async_publish("mqtt_client_cpp/topic1", "test1", MQTT_NS::qos::at_most_once, + client->async_publish("mqtt_tcp_client_cpp/topic1", "test1", MQTT_NS::qos::at_most_once, //[optional] checking async_publish completion code [](MQTT_NS::error_code ec) { EXPECT_FALSE(ec); - std::cout << "async_publish callback: " << ec.message() << std::endl; + std::cout << "async_tcp_publish callback: " << ec.message() << std::endl; EXPECT_EQ(ec.message(), "Success"); }); return true; @@ -247,12 +270,12 @@ TEST_F(MqttIsolatedUnitTest, mqtt_client_should_receive_loopback_publication) if (packet_id == pid_sub1) { - client->async_publish("mqtt_client_cpp/topic1", "test1", MQTT_NS::qos::at_most_once, + client->async_publish("mqtt_tcp_client_cpp/topic1", "test1", MQTT_NS::qos::at_most_once, //[optional] checking async_publish completion code [packet_id](MQTT_NS::error_code ec) { EXPECT_FALSE(ec); - std::cout << "async_publish callback: " << ec.message() << std::endl; + std::cout << "async_tcp_publish callback: " << ec.message() << std::endl; ASSERT_TRUE(packet_id); }); } @@ -272,7 +295,7 @@ TEST_F(MqttIsolatedUnitTest, mqtt_client_should_receive_loopback_publication) std::cout << "topic_name: " << topic_name << std::endl; std::cout << "contents: " << contents << std::endl; - EXPECT_EQ("mqtt_client_cpp/topic1", topic_name); + EXPECT_EQ("mqtt_tcp_client_cpp/topic1", topic_name); EXPECT_EQ("test1", contents); client->async_disconnect(); @@ -287,6 +310,28 @@ TEST_F(MqttIsolatedUnitTest, mqtt_client_should_receive_loopback_publication) ASSERT_TRUE(received); } -TEST_F(MqttIsolatedUnitTest, should_connect_using_tls) { GTEST_SKIP(); } +TEST_F(MqttIsolatedUnitTest, should_connect_using_tls) +{ + GTEST_SKIP(); + + ConfigOptions options { + {configuration::Host, "localhost"s}, {configuration::Port, 0}, + {configuration::MqttTls, true}, {configuration::AutoAvailable, false}, + {configuration::RealTime, false}, {configuration::MqttCaCert, MqttCACert}}; + + createServer(options); + + startServer(); + + ASSERT_NE(0, m_port); + + auto handler = make_unique(); + + createClient(options, move(handler)); + + ASSERT_TRUE(startClient()); + + ASSERT_TRUE(m_client->isConnected()); +} TEST_F(MqttIsolatedUnitTest, should_conenct_using_authentication) { GTEST_SKIP(); } From 46ca43b3da13e5d334a1110f2f851521d3b7c5dc Mon Sep 17 00:00:00 2001 From: rajani Date: Thu, 1 Sep 2022 16:17:02 -0400 Subject: [PATCH 04/17] added Mqtt Unittest for rotatrymode --- test/mqtt_sink_test.cpp | 47 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/test/mqtt_sink_test.cpp b/test/mqtt_sink_test.cpp index d274d7d56..76b626dcc 100644 --- a/test/mqtt_sink_test.cpp +++ b/test/mqtt_sink_test.cpp @@ -75,16 +75,13 @@ class MqttSinkTest : public testing::Test m_jsonPrinter.reset(); } - void createAgent(std::string testFile = {}, ConfigOptions options = {}) + void createAgent(const std::string &testFile = "/samples/test_config.xml", ConfigOptions options = {}) { - if (testFile == "") - testFile = "/samples/test_config.xml"; - ConfigOptions opts(options); MergeOptions(opts, {{"MqttSink", true}, {configuration::MqttPort, m_port}, {configuration::MqttHost, "127.0.0.1"s}}); - m_agentTestHelper->createAgent("/samples/test_config.xml", 8, 4, "2.0", 25, false, true, opts); + m_agentTestHelper->createAgent(testFile, 8, 4, "2.0", 25, false, true, opts); addAdapter(); m_agentTestHelper->getAgent()->start(); @@ -404,6 +401,46 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_PathPosition) waitFor(3s, [&gotPathPosition]() { return gotPathPosition; }); } +TEST_F(MqttSinkTest, mqtt_sink_should_publish_RotaryMode) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + + entity::JsonParser parser; + + auto handler = make_unique(); + bool gotRotaryMode = false; + handler->m_receive = [&gotRotaryMode, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ("MTConnect/Observation/000/Axes[Axes]/Rotary[C]/Events/RotaryMode[Smode]", topic); + auto jdoc = json::parse(payload); + + string id = jdoc.at("/value"_json_pointer).get(); + if (id == string("SPINDLE")) + { + EXPECT_TRUE(true); + gotRotaryMode = true; + } + }; + + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + + createAgent("/samples/discrete_example.xml"); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|block|G01X00|Smode|INDEX|line|204"); + + m_client->subscribe("MTConnect/Observation/000/Axes[Axes]/Rotary[C]/Events/RotaryMode[Smode]"); + + waitFor(3s, [&gotRotaryMode]() { return gotRotaryMode; }); +} + TEST_F(MqttSinkTest, mqtt_sink_should_publish_Dataset) { GTEST_SKIP(); From cf05806b91529c316742a6de1384bba72994469c Mon Sep 17 00:00:00 2001 From: rajani Date: Wed, 7 Sep 2022 16:06:21 -0400 Subject: [PATCH 05/17] added some more unittest to check mqtt functionality now dataset works fine... --- test/mqtt_sink_test.cpp | 125 +++++++++++++++++++++++++++++++++------- 1 file changed, 105 insertions(+), 20 deletions(-) diff --git a/test/mqtt_sink_test.cpp b/test/mqtt_sink_test.cpp index 9d44e99c4..36b0539d7 100644 --- a/test/mqtt_sink_test.cpp +++ b/test/mqtt_sink_test.cpp @@ -257,14 +257,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Streams) { EXPECT_TRUE(true); foundLineDataItem = true; - } - //this below code not working currently - /*ErrorList list; - auto ptr = parser.parse(device_model::data_item::DataItem::getRoot(), payload, "2.0", list); - auto dataItem = dynamic_pointer_cast(ptr); - if (dataItem) - EXPECT_EQ(string("204"),dataItem->getValue());*/ - + } }; createClient(options, move(handler)); ASSERT_TRUE(startClient()); @@ -320,6 +313,52 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Asset) waitFor(3s, [&gotControllerDataItem]() { return gotControllerDataItem; }); } +TEST_F(MqttSinkTest, mqtt_sink_should_publish_DynamicCalibration) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + + entity::JsonParser parser; + + auto handler = make_unique(); + bool gotCalibration = false; + handler->m_receive = [&gotCalibration, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ("MTConnect/Observation/000/Axes[Axes]/Linear[X]/Samples/PositionTimeSeries.Actual[Xts]", topic); + auto jdoc = json::parse(payload); + + auto value = jdoc.at("/value"_json_pointer); + + if (value.is_array()) + { + if (value.size() == 25) + { + EXPECT_TRUE(true); + gotCalibration = true; + } + } + }; + + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + + createAgent(); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|Xts|25|| 5118 5118 5118 5118 5118 5118 5118 5118 5118 5118 5118 5118 " + "5119 5119 5118 " + "5118 5117 5117 5119 5119 5118 5118 5118 5118 5118"); + + m_client->subscribe("MTConnect/Observation/000/Axes[Axes]/Linear[X]/Samples/PositionTimeSeries.Actual[Xts]"); + + waitFor(3s, [&gotCalibration]() { return gotCalibration; }); +} + TEST_F(MqttSinkTest, mqtt_sink_should_publish_LinearLoad) { ConfigOptions options; @@ -363,6 +402,49 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_LinearLoad) waitFor(3s, [&gotLinearLoad]() { return gotLinearLoad; }); } +TEST_F(MqttSinkTest, mqtt_sink_should_publish_Temperature) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + + entity::JsonParser parser; + + auto handler = make_unique(); + bool gotTemperature = false; + handler->m_receive = [&gotTemperature, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ("MTConnect/Observation/000/Axes[Axes]/Linear[Z]/Motor[motor_name]/Samples/Temperature[z_motor_temp]", topic); + auto jdoc = json::parse(payload); + + auto value = jdoc.at("/value"_json_pointer); + double load = 81.0; + if (value.is_number()) + { + if (load == double(value)) + { + EXPECT_TRUE(true); + gotTemperature = true; + } + } + }; + + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + + createAgent(); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + + m_agentTestHelper->m_adapter->processData("2018-04-27T05:00:26.555666|z_motor_temp|81"); + + m_client->subscribe("MTConnect/Observation/000/Axes[Axes]/Linear[Z]/Motor[motor_name]/Samples/Temperature[z_motor_temp]"); + + waitFor(3s, [&gotTemperature]() { return gotTemperature; }); +} + TEST_F(MqttSinkTest, mqtt_sink_should_publish_PathPosition) { ConfigOptions options; @@ -443,8 +525,6 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_RotaryMode) TEST_F(MqttSinkTest, mqtt_sink_should_publish_Dataset) { - GTEST_SKIP(); - ConfigOptions options; createServer(options); startServer(); @@ -457,14 +537,20 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Dataset) handler->m_receive = [&gotControllerDataItem, &parser](std::shared_ptr, const std::string &topic, const std::string &payload) { - EXPECT_EQ("MTConnect/Observation/000/Controller[Controller]/Path/VARIABLE[vars]", topic); + EXPECT_EQ("MTConnect/Observation/000/Controller[Controller]/Path[path]/Events/VariableDataSet[vars]", topic); auto jdoc = json::parse(payload); - string id = jdoc.at("/Part/a"_json_pointer).get(); - // ASSERT_DATA_SET_ENTRY(doc, "VariableDataSet[2]", "a", "1"); - if (id == string("1")) + auto value = jdoc.at("/value"_json_pointer); + if (value.is_object()) { - EXPECT_TRUE(true); - gotControllerDataItem = true; + for (auto &[key, value] : value.items()) + { + if (key == "a" && (value.get() == 1) || key == "b" && value.get() == 2 || + key == "c" && value.get() == 3) + { + EXPECT_TRUE(true); + gotControllerDataItem = true; + } + } } }; createClient(options, move(handler)); @@ -477,7 +563,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Dataset) m_agentTestHelper->m_adapter->processData("TIME|vars|a=1 b=2 c=3"); - m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/Path/VARIABLE[vars]"); + m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/Path[path]/Events/VariableDataSet[vars]"); waitFor(3s, [&gotControllerDataItem]() { return gotControllerDataItem; }); } @@ -498,9 +584,8 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Table) handler->m_receive = [&gotControllerDataItem, &parser](std::shared_ptr, const std::string &topic, const std::string &payload) { - EXPECT_EQ("MTConnect/Observation/000/Controller[Controller]/Path/WORK_OFFSET[wpo]", topic); + EXPECT_EQ("MTConnect/Observation/000/Controller[Controller]/Path[path]/Events/WorkOffsetTable[wpo]", topic); auto jdoc = json::parse(payload); - // ASSERT_TABLE_ENTRY(doc, "WorkOffsetTable[@dataItemId='wp1']", "G53.2", "X", "4"); string id = jdoc.at("/EntryDefinition/key"_json_pointer).get(); if (id == string("G53.1")) @@ -521,7 +606,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Table) "2021-02-01T12:00:00Z|wpo|G53.1={X=1.0 Y=2.0 Z=3.0} G53.2={X=4.0 Y=5.0 Z=6.0} G53.3={X=7.0 " "Y=8.0 Z=9 U=10.0}"); - m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/Path/WORK_OFFSET[wpo]"); + m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/Path[path]/Events/WorkOffsetTable[wpo]"); waitFor(3s, [&gotControllerDataItem]() { return gotControllerDataItem; }); } From 3e2d4353b6a7a0bf15150cab0fa83015b9e4b133 Mon Sep 17 00:00:00 2001 From: rajani Date: Thu, 8 Sep 2022 15:13:30 -0400 Subject: [PATCH 06/17] Mqtt unit test to publish table... --- test/mqtt_sink_test.cpp | 47 +++++++++++++++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/test/mqtt_sink_test.cpp b/test/mqtt_sink_test.cpp index 36b0539d7..88055c1ad 100644 --- a/test/mqtt_sink_test.cpp +++ b/test/mqtt_sink_test.cpp @@ -570,8 +570,6 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Dataset) TEST_F(MqttSinkTest, mqtt_sink_should_publish_Table) { - GTEST_SKIP(); - ConfigOptions options; createServer(options); startServer(); @@ -586,12 +584,43 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Table) const std::string &payload) { EXPECT_EQ("MTConnect/Observation/000/Controller[Controller]/Path[path]/Events/WorkOffsetTable[wpo]", topic); auto jdoc = json::parse(payload); - - string id = jdoc.at("/EntryDefinition/key"_json_pointer).get(); - if (id == string("G53.1")) + auto jValue = jdoc.at("/value"_json_pointer); + int count = 0; + if (jValue.is_object()) { - EXPECT_TRUE(true); - gotControllerDataItem = true; + for (auto &[key, value] : jValue.items()) + { + if (key == "G53.1" || key == "G53.2" || key == "G53.3") + { + for (auto &[subKey, subValue] : value.items()) + { + if (key == "G53.1" && (subKey == "X" && subValue.get() == 1 || + subKey == "Y" && subValue.get() == 2 || + subKey == "Z" && subValue.get() == 3)) + { + count++; + } + else if (key == "G53.2" && (subKey == "X" && subValue.get() == 4 || + subKey == "Y" && subValue.get() == 5 || + subKey == "Z" && subValue.get() == 6)) + { + count++; + } + else if (key == "G53.3" && (subKey == "X" && subValue.get() == 7.0 || + subKey == "Y" && subValue.get() == 8.0 || + subKey == "Z" && subValue.get() == 9.0 || + subKey == "U" && subValue.get() == 10.0)) + { + count++; + } + } + } + } + if (count == 10) + { + EXPECT_TRUE(true); + gotControllerDataItem = true; + } } }; createClient(options, move(handler)); @@ -603,8 +632,8 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Table) auto time = chrono::system_clock::now(); m_agentTestHelper->m_adapter->processData( - "2021-02-01T12:00:00Z|wpo|G53.1={X=1.0 Y=2.0 Z=3.0} G53.2={X=4.0 Y=5.0 Z=6.0} G53.3={X=7.0 " - "Y=8.0 Z=9 U=10.0}"); + "2021-02-01T12:00:00Z|wpo|G53.1={X=1.0 Y=2.0 Z=3.0} G53.2={X=4.0 Y=5.0 Z=6.0}" + "G53.3={X=7.0 Y=8.0 Z=9 U=10.0}"); m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/Path[path]/Events/WorkOffsetTable[wpo]"); From 0227adbf146053f8e3f30eb9f6f24a734aa86af5 Mon Sep 17 00:00:00 2001 From: rajani Date: Thu, 20 Oct 2022 09:52:01 -0400 Subject: [PATCH 07/17] added Mqtt TLS use case... --- src/mqtt/mqtt_server_impl.hpp | 2 + test/mqtt_isolated_test.cpp | 121 +++++++++++++++++++++++++++++++++- 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/src/mqtt/mqtt_server_impl.hpp b/src/mqtt/mqtt_server_impl.hpp index b1f5a3ac4..518fdbb24 100644 --- a/src/mqtt/mqtt_server_impl.hpp +++ b/src/mqtt/mqtt_server_impl.hpp @@ -298,6 +298,8 @@ namespace mtconnect { if (!m_server) { boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12); + ctx.set_options(boost::asio::ssl::context::default_workarounds | + boost::asio::ssl::context::single_dh_use); m_server.emplace(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), m_port), std::move(ctx), m_ioContext); } diff --git a/test/mqtt_isolated_test.cpp b/test/mqtt_isolated_test.cpp index 7fe7676a6..e68251904 100644 --- a/test/mqtt_isolated_test.cpp +++ b/test/mqtt_isolated_test.cpp @@ -312,7 +312,7 @@ TEST_F(MqttIsolatedUnitTest, mqtt_tcp_client_should_receive_loopback_publication TEST_F(MqttIsolatedUnitTest, should_connect_using_tls) { - GTEST_SKIP(); + //GTEST_SKIP(); ConfigOptions options { {configuration::Host, "localhost"s}, {configuration::Port, 0}, @@ -334,4 +334,123 @@ TEST_F(MqttIsolatedUnitTest, should_connect_using_tls) ASSERT_TRUE(m_client->isConnected()); } +TEST_F(MqttIsolatedUnitTest, mqtt_tls_client_should_receive_loopback_publication) +{ + ConfigOptions options { + {configuration::Host, "localhost"s}, {configuration::Port, 0}, + {configuration::MqttTls, true}, {configuration::AutoAvailable, false}, + {configuration::RealTime, false}, {configuration::MqttCaCert, MqttCACert}}; + + createServer(options); + startServer(); + + ASSERT_NE(0, m_port); + + std::uint16_t pid_sub1; + + auto client = mqtt::make_async_client(m_agentTestHelper->m_ioContext, "localhost", m_port); + + client->set_client_id("cliendId1"); + client->set_clean_session(true); + client->set_keep_alive_sec(30); + + client->set_connack_handler([&client, &pid_sub1](bool sp, + mqtt::connect_return_code connack_return_code) { + std::cout << "Connack handler called" << std::endl; + std::cout << "Session Present: " << std::boolalpha << sp << std::endl; + std::cout << "Connack Return Code: " << connack_return_code << std::endl; + if (connack_return_code == mqtt::connect_return_code::accepted) + { + pid_sub1 = client->acquire_unique_packet_id(); + + client->async_subscribe(pid_sub1, "mqtt_tls_client_cpp/topic1", MQTT_NS::qos::at_most_once, + //[optional] checking async_subscribe completion code + [](MQTT_NS::error_code ec) { + EXPECT_FALSE(ec); + std::cout << "async_tcp_subscribe callback: " << ec.message() + << std::endl; + }); + } + return true; + }); + client->set_close_handler([] { std::cout << "closed" << std::endl; }); + + client->set_suback_handler( + [&client, &pid_sub1](std::uint16_t packet_id, std::vector results) { + std::cout << "suback received. packet_id: " << packet_id << std::endl; + for (auto const &e : results) + { + std::cout << "subscribe result: " << e << std::endl; + } + + if (packet_id == pid_sub1) + { + client->async_publish("mqtt_tls_client_cpp/topic1", "test1", MQTT_NS::qos::at_most_once, + //[optional] checking async_publish completion code + [](MQTT_NS::error_code ec) { + EXPECT_FALSE(ec); + + std::cout << "async_tls_publish callback: " << ec.message() + << std::endl; + EXPECT_EQ(ec.message(), "Success"); + }); + return true; + } + + return true; + }); + + client->set_close_handler([] { std::cout << "closed" << std::endl; }); + + client->set_suback_handler( + [&client, &pid_sub1](std::uint16_t packet_id, std::vector results) { + std::cout << "suback received. packet_id: " << packet_id << std::endl; + for (auto const &e : results) + { + std::cout << "subscribe result: " << e << std::endl; + } + + if (packet_id == pid_sub1) + { + client->async_publish("mqtt_tls_client_cpp/topic1", "test1", MQTT_NS::qos::at_most_once, + //[optional] checking async_publish completion code + [packet_id](MQTT_NS::error_code ec) { + EXPECT_FALSE(ec); + + std::cout << "async_tls_publish callback: " << ec.message() + << std::endl; + ASSERT_TRUE(packet_id); + }); + } + + return true; + }); + + bool received = false; + client->set_publish_handler([&client, &received](mqtt::optional packet_id, + mqtt::publish_options pubopts, + mqtt::buffer topic_name, mqtt::buffer contents) { + std::cout << "publish received." + << " dup: " << pubopts.get_dup() << " qos: " << pubopts.get_qos() + << " retain: " << pubopts.get_retain() << std::endl; + if (packet_id) + std::cout << "packet_id: " << *packet_id << std::endl; + std::cout << "topic_name: " << topic_name << std::endl; + std::cout << "contents: " << contents << std::endl; + + EXPECT_EQ("mqtt_tls_client_cpp/topic1", topic_name); + EXPECT_EQ("test1", contents); + + client->async_disconnect(); + received = true; + return true; + }); + + client->async_connect(); + + m_agentTestHelper->m_ioContext.run(); + + ASSERT_TRUE(received); +} + TEST_F(MqttIsolatedUnitTest, should_conenct_using_authentication) { GTEST_SKIP(); } From c649e42d612ba86b7109b97b15dd2360a89df8cf Mon Sep 17 00:00:00 2001 From: rajani Date: Thu, 20 Oct 2022 09:54:34 -0400 Subject: [PATCH 08/17] skipped MQTT TLS Unit test --- test/mqtt_isolated_test.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/mqtt_isolated_test.cpp b/test/mqtt_isolated_test.cpp index e68251904..044b22805 100644 --- a/test/mqtt_isolated_test.cpp +++ b/test/mqtt_isolated_test.cpp @@ -312,7 +312,7 @@ TEST_F(MqttIsolatedUnitTest, mqtt_tcp_client_should_receive_loopback_publication TEST_F(MqttIsolatedUnitTest, should_connect_using_tls) { - //GTEST_SKIP(); + GTEST_SKIP(); ConfigOptions options { {configuration::Host, "localhost"s}, {configuration::Port, 0}, @@ -336,6 +336,8 @@ TEST_F(MqttIsolatedUnitTest, should_connect_using_tls) TEST_F(MqttIsolatedUnitTest, mqtt_tls_client_should_receive_loopback_publication) { + GTEST_SKIP(); + ConfigOptions options { {configuration::Host, "localhost"s}, {configuration::Port, 0}, {configuration::MqttTls, true}, {configuration::AutoAvailable, false}, From 7090c176a10e8567b4acc66e4e908f236eba13c7 Mon Sep 17 00:00:00 2001 From: rajani Date: Sun, 6 Nov 2022 03:23:48 -0600 Subject: [PATCH 09/17] modifications with MQTT tls mode... --- src/mqtt/mqtt_client_impl.hpp | 6 +- test/mqtt_isolated_test.cpp | 150 ++++------------------------------ 2 files changed, 18 insertions(+), 138 deletions(-) diff --git a/src/mqtt/mqtt_client_impl.hpp b/src/mqtt/mqtt_client_impl.hpp index a882ebbd9..b76f669b6 100644 --- a/src/mqtt/mqtt_client_impl.hpp +++ b/src/mqtt/mqtt_client_impl.hpp @@ -50,6 +50,8 @@ namespace mtconnect { mqtt::protocol_version>; using mqtt_tls_client = mqtt_tls_client_ptr; + using mqtt_tls_client_ws = mqtt_tls_client_ws_ptr; template class MqttClientImpl : public MqttClient @@ -342,7 +344,7 @@ namespace mtconnect { { if (!m_client) { - m_client = mqtt::make_tls_async_client(m_ioContext, m_host, m_port); + m_client = mqtt::make_tls_async_client_ws(m_ioContext, m_host, m_port); auto cacert = GetOption(m_options, configuration::MqttCaCert); if (cacert) { @@ -362,7 +364,7 @@ namespace mtconnect { } protected: - mqtt_tls_client m_client; + mqtt_tls_client_ws m_client; }; } // namespace mqtt_client diff --git a/test/mqtt_isolated_test.cpp b/test/mqtt_isolated_test.cpp index 044b22805..3b326a9f6 100644 --- a/test/mqtt_isolated_test.cpp +++ b/test/mqtt_isolated_test.cpp @@ -39,6 +39,10 @@ using namespace mtconnect::sink::rest_sink; using json = nlohmann::json; +const string MqttCACert(PROJECT_ROOT_DIR "/test/resources/clientca.crt"); +const string MqttCCert(PROJECT_ROOT_DIR "/test/resources/client.crt"); +const string MqttCPrivateKey(PROJECT_ROOT_DIR "/test/resources/client.key"); + class MqttIsolatedUnitTest : public testing::Test { protected: @@ -77,6 +81,9 @@ class MqttIsolatedUnitTest : public testing::Test {MqttPort, 0}, {MqttTls, withTlsOption}, {AutoAvailable, false}, + {MqttCaCert, MqttCACert}, + {MqttCert, MqttCCert}, + {MqttPrivateKey, MqttCPrivateKey}, {RealTime, false}}); if (withTlsOption) @@ -137,6 +144,9 @@ class MqttIsolatedUnitTest : public testing::Test {MqttPort, m_port}, {MqttTls, withTlsOption}, {AutoAvailable, false}, + {MqttCaCert, MqttCACert}, + {MqttCert, MqttCCert}, + {MqttPrivateKey, MqttCPrivateKey}, {RealTime, false}}); if (withTlsOption) @@ -169,15 +179,10 @@ class MqttIsolatedUnitTest : public testing::Test uint16_t m_port {0}; }; -const string MqttCACert(PROJECT_ROOT_DIR "/test/resources/clientca.crt"); TEST_F(MqttIsolatedUnitTest, mqtt_client_should_connect_to_broker) { - ConfigOptions options { - {configuration::Host, "localhost"s}, {configuration::Port, 0}, - {configuration::MqttTls, false}, {configuration::AutoAvailable, false}, - {configuration::RealTime, false}, {configuration::MqttCaCert, MqttCACert}}; - + ConfigOptions options; createServer(options); startServer(); @@ -195,10 +200,7 @@ TEST_F(MqttIsolatedUnitTest, mqtt_client_should_connect_to_broker) TEST_F(MqttIsolatedUnitTest, mqtt_tcp_client_should_receive_loopback_publication) { - ConfigOptions options { - {configuration::Host, "localhost"s}, {configuration::Port, 0}, - {configuration::MqttTls, false}, {configuration::AutoAvailable, false}, - {configuration::RealTime, false}, {configuration::MqttCaCert, MqttCACert}}; + ConfigOptions options; createServer(options); startServer(); @@ -312,12 +314,9 @@ TEST_F(MqttIsolatedUnitTest, mqtt_tcp_client_should_receive_loopback_publication TEST_F(MqttIsolatedUnitTest, should_connect_using_tls) { - GTEST_SKIP(); + //GTEST_SKIP(); - ConfigOptions options { - {configuration::Host, "localhost"s}, {configuration::Port, 0}, - {configuration::MqttTls, true}, {configuration::AutoAvailable, false}, - {configuration::RealTime, false}, {configuration::MqttCaCert, MqttCACert}}; + ConfigOptions options {{configuration::MqttTls, true}}; createServer(options); @@ -334,125 +333,4 @@ TEST_F(MqttIsolatedUnitTest, should_connect_using_tls) ASSERT_TRUE(m_client->isConnected()); } -TEST_F(MqttIsolatedUnitTest, mqtt_tls_client_should_receive_loopback_publication) -{ - GTEST_SKIP(); - - ConfigOptions options { - {configuration::Host, "localhost"s}, {configuration::Port, 0}, - {configuration::MqttTls, true}, {configuration::AutoAvailable, false}, - {configuration::RealTime, false}, {configuration::MqttCaCert, MqttCACert}}; - - createServer(options); - startServer(); - - ASSERT_NE(0, m_port); - - std::uint16_t pid_sub1; - - auto client = mqtt::make_async_client(m_agentTestHelper->m_ioContext, "localhost", m_port); - - client->set_client_id("cliendId1"); - client->set_clean_session(true); - client->set_keep_alive_sec(30); - - client->set_connack_handler([&client, &pid_sub1](bool sp, - mqtt::connect_return_code connack_return_code) { - std::cout << "Connack handler called" << std::endl; - std::cout << "Session Present: " << std::boolalpha << sp << std::endl; - std::cout << "Connack Return Code: " << connack_return_code << std::endl; - if (connack_return_code == mqtt::connect_return_code::accepted) - { - pid_sub1 = client->acquire_unique_packet_id(); - - client->async_subscribe(pid_sub1, "mqtt_tls_client_cpp/topic1", MQTT_NS::qos::at_most_once, - //[optional] checking async_subscribe completion code - [](MQTT_NS::error_code ec) { - EXPECT_FALSE(ec); - std::cout << "async_tcp_subscribe callback: " << ec.message() - << std::endl; - }); - } - return true; - }); - client->set_close_handler([] { std::cout << "closed" << std::endl; }); - - client->set_suback_handler( - [&client, &pid_sub1](std::uint16_t packet_id, std::vector results) { - std::cout << "suback received. packet_id: " << packet_id << std::endl; - for (auto const &e : results) - { - std::cout << "subscribe result: " << e << std::endl; - } - - if (packet_id == pid_sub1) - { - client->async_publish("mqtt_tls_client_cpp/topic1", "test1", MQTT_NS::qos::at_most_once, - //[optional] checking async_publish completion code - [](MQTT_NS::error_code ec) { - EXPECT_FALSE(ec); - - std::cout << "async_tls_publish callback: " << ec.message() - << std::endl; - EXPECT_EQ(ec.message(), "Success"); - }); - return true; - } - - return true; - }); - - client->set_close_handler([] { std::cout << "closed" << std::endl; }); - - client->set_suback_handler( - [&client, &pid_sub1](std::uint16_t packet_id, std::vector results) { - std::cout << "suback received. packet_id: " << packet_id << std::endl; - for (auto const &e : results) - { - std::cout << "subscribe result: " << e << std::endl; - } - - if (packet_id == pid_sub1) - { - client->async_publish("mqtt_tls_client_cpp/topic1", "test1", MQTT_NS::qos::at_most_once, - //[optional] checking async_publish completion code - [packet_id](MQTT_NS::error_code ec) { - EXPECT_FALSE(ec); - - std::cout << "async_tls_publish callback: " << ec.message() - << std::endl; - ASSERT_TRUE(packet_id); - }); - } - - return true; - }); - - bool received = false; - client->set_publish_handler([&client, &received](mqtt::optional packet_id, - mqtt::publish_options pubopts, - mqtt::buffer topic_name, mqtt::buffer contents) { - std::cout << "publish received." - << " dup: " << pubopts.get_dup() << " qos: " << pubopts.get_qos() - << " retain: " << pubopts.get_retain() << std::endl; - if (packet_id) - std::cout << "packet_id: " << *packet_id << std::endl; - std::cout << "topic_name: " << topic_name << std::endl; - std::cout << "contents: " << contents << std::endl; - - EXPECT_EQ("mqtt_tls_client_cpp/topic1", topic_name); - EXPECT_EQ("test1", contents); - - client->async_disconnect(); - received = true; - return true; - }); - - client->async_connect(); - - m_agentTestHelper->m_ioContext.run(); - - ASSERT_TRUE(received); -} - TEST_F(MqttIsolatedUnitTest, should_conenct_using_authentication) { GTEST_SKIP(); } From 930470da05e521576a5b6e49b26b7e5bf9ebc4a0 Mon Sep 17 00:00:00 2001 From: rajani Date: Tue, 8 Nov 2022 11:20:26 -0600 Subject: [PATCH 10/17] mqtt_tls renamed mqtt_certificates --- test/mqtt_isolated_test.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/test/mqtt_isolated_test.cpp b/test/mqtt_isolated_test.cpp index 3b326a9f6..10979dd34 100644 --- a/test/mqtt_isolated_test.cpp +++ b/test/mqtt_isolated_test.cpp @@ -39,9 +39,9 @@ using namespace mtconnect::sink::rest_sink; using json = nlohmann::json; -const string MqttCACert(PROJECT_ROOT_DIR "/test/resources/clientca.crt"); -const string MqttCCert(PROJECT_ROOT_DIR "/test/resources/client.crt"); -const string MqttCPrivateKey(PROJECT_ROOT_DIR "/test/resources/client.key"); +const string MqttClientCACert(PROJECT_ROOT_DIR "/test/resources/clientca.crt"); +const string MqttClientCert(PROJECT_ROOT_DIR "/test/resources/client.crt"); +const string MqttClientKey(PROJECT_ROOT_DIR "/test/resources/client.key"); class MqttIsolatedUnitTest : public testing::Test { @@ -81,9 +81,9 @@ class MqttIsolatedUnitTest : public testing::Test {MqttPort, 0}, {MqttTls, withTlsOption}, {AutoAvailable, false}, - {MqttCaCert, MqttCACert}, + /* {MqttCaCert, MqttCACert}, {MqttCert, MqttCCert}, - {MqttPrivateKey, MqttCPrivateKey}, + {MqttPrivateKey, MqttCPrivateKey},*/ {RealTime, false}}); if (withTlsOption) @@ -144,9 +144,9 @@ class MqttIsolatedUnitTest : public testing::Test {MqttPort, m_port}, {MqttTls, withTlsOption}, {AutoAvailable, false}, - {MqttCaCert, MqttCACert}, - {MqttCert, MqttCCert}, - {MqttPrivateKey, MqttCPrivateKey}, + {MqttCaCert, MqttClientCACert}, + {MqttCert, MqttClientCert}, + {MqttPrivateKey, MqttClientKey}, {RealTime, false}}); if (withTlsOption) From 04bb491a4439302cedbdee91f8322424d0b56787 Mon Sep 17 00:00:00 2001 From: rajani Date: Tue, 15 Nov 2022 02:00:36 -0600 Subject: [PATCH 11/17] added mqtt tls with websocket... --- src/mqtt/mqtt_client_impl.hpp | 36 ++++++++++++++++++++++ src/mqtt/mqtt_server_impl.hpp | 58 +++++++++++++++++++++++++++++++++++ test/mqtt_isolated_test.cpp | 58 ++++++++++++++++++++++++++++------- 3 files changed, 141 insertions(+), 11 deletions(-) diff --git a/src/mqtt/mqtt_client_impl.hpp b/src/mqtt/mqtt_client_impl.hpp index b76f669b6..aa4b39a1b 100644 --- a/src/mqtt/mqtt_client_impl.hpp +++ b/src/mqtt/mqtt_client_impl.hpp @@ -15,6 +15,7 @@ // limitations under the License. // +#include #include #include @@ -340,6 +341,40 @@ namespace mtconnect { using base = MqttClientImpl; using base::base; + auto &getClient() + { + if (!m_client) + { + m_client = mqtt::make_tls_async_client(m_ioContext, m_host, m_port); + auto cacert = GetOption(m_options, configuration::MqttCaCert); + if (cacert) + { + m_client->get_ssl_context().load_verify_file(*cacert); + } + /* auto private_key = GetOption(m_options, configuration::MqttPrivateKey); + auto cert = GetOption(m_options, configuration::MqttCert); + if (private_key && cert) + { + m_client->get_ssl_context().set_verify_mode(boost::asio::ssl::verify_peer); + m_client->get_ssl_context().use_certificate_chain_file(*cert); + m_client->get_ssl_context().use_private_key_file(*private_key, + boost::asio::ssl::context::pem); + }*/ + } + + return m_client; + } + + protected: + mqtt_tls_client m_client; + }; + + class MqttTlsWSClient : public MqttClientImpl + { + public: + using base = MqttClientImpl; + using base::base; + auto &getClient() { if (!m_client) @@ -354,6 +389,7 @@ namespace mtconnect { auto cert = GetOption(m_options, configuration::MqttCert); if (private_key && cert) { + m_client->get_ssl_context().set_verify_mode(boost::asio::ssl::verify_peer); m_client->get_ssl_context().use_certificate_chain_file(*cert); m_client->get_ssl_context().use_private_key_file(*private_key, boost::asio::ssl::context::pem); diff --git a/src/mqtt/mqtt_server_impl.hpp b/src/mqtt/mqtt_server_impl.hpp index 518fdbb24..718310ff0 100644 --- a/src/mqtt/mqtt_server_impl.hpp +++ b/src/mqtt/mqtt_server_impl.hpp @@ -288,6 +288,45 @@ namespace mtconnect { m_port = GetOption(options, configuration::Port).value_or(8883); } + using base::base; + using server = MQTT_NS::server_tls<>; + + auto &getServer() { return m_server; } + + auto &createServer() + { + if (!m_server) + { + boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12); + ctx.set_options(boost::asio::ssl::context::default_workarounds | + boost::asio::ssl::context::single_dh_use); + auto serverPrivateKey = GetOption(m_options, configuration::TlsPrivateKey); + auto serverCert = GetOption(m_options, configuration::TlsCertificateChain); + ctx.use_certificate_chain_file(*serverCert); + ctx.use_tmp_dh_file(*GetOption(m_options, configuration::TlsDHKey)); + ctx.use_private_key_file(*serverPrivateKey, boost::asio::ssl::context::pem); + + m_server.emplace(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), m_port), + std::move(ctx), m_ioContext); + } + + return *m_server; + } + + protected: + std::optional m_server; + }; + + class MqttTlsWSServer : public MqttServerImpl + { + public: + using base = MqttServerImpl; + MqttTlsWSServer(boost::asio::io_context &ioContext, const ConfigOptions &options) + : base(ioContext, options) + { + m_port = GetOption(options, configuration::Port).value_or(8883); + } + using base::base; using server = MQTT_NS::server_tls_ws<>; @@ -300,6 +339,25 @@ namespace mtconnect { boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12); ctx.set_options(boost::asio::ssl::context::default_workarounds | boost::asio::ssl::context::single_dh_use); + auto serverPrivateKey = GetOption(m_options, configuration::TlsPrivateKey); + auto serverCert = GetOption(m_options, configuration::TlsCertificateChain); + ctx.use_certificate_chain_file(*serverCert); + //ctx.use_tmp_dh_file(*GetOption(m_options, configuration::TlsDHKey)); + ctx.use_private_key_file(*serverPrivateKey, boost::asio::ssl::context::pem); + + if (IsOptionSet(m_options, configuration::TlsVerifyClientCertificate)) + { + LOG(info) << "Will only accept client connections with valid certificates"; + + ctx.set_verify_mode(boost::asio::ssl::verify_peer | + boost::asio::ssl::verify_fail_if_no_peer_cert); + if (HasOption(m_options, configuration::MqttCaCert)) + { + LOG(info) << "Adding Client Certificates."; + ctx.load_verify_file(*GetOption(m_options, configuration::MqttCaCert)); + } + } + m_server.emplace(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), m_port), std::move(ctx), m_ioContext); } diff --git a/test/mqtt_isolated_test.cpp b/test/mqtt_isolated_test.cpp index 10979dd34..a0a5bf271 100644 --- a/test/mqtt_isolated_test.cpp +++ b/test/mqtt_isolated_test.cpp @@ -33,6 +33,7 @@ using namespace std; using namespace mtconnect; +using namespace mtconnect::configuration; using namespace mtconnect::device_model::data_item; using namespace mtconnect::sink::mqtt_sink; using namespace mtconnect::sink::rest_sink; @@ -41,7 +42,12 @@ using json = nlohmann::json; const string MqttClientCACert(PROJECT_ROOT_DIR "/test/resources/clientca.crt"); const string MqttClientCert(PROJECT_ROOT_DIR "/test/resources/client.crt"); -const string MqttClientKey(PROJECT_ROOT_DIR "/test/resources/client.key"); +const string MqttClientKey {PROJECT_ROOT_DIR "/test/resources/client.key"}; + +const string ServerCertFile(PROJECT_ROOT_DIR "/test/resources/user.crt"); +const string ServerKeyFile {PROJECT_ROOT_DIR "/test/resources/user.key"}; +const string ServerDhFile {PROJECT_ROOT_DIR "/test/resources/dh2048.pem"}; +const string ServerRootCertFile(PROJECT_ROOT_DIR "/test/resources/rootca.crt"); class MqttIsolatedUnitTest : public testing::Test { @@ -71,19 +77,19 @@ class MqttIsolatedUnitTest : public testing::Test } void createServer(const ConfigOptions &options) - { - using namespace mtconnect::configuration; - + { bool withTlsOption = IsOptionSet(options, configuration::MqttTls); ConfigOptions opts(options); MergeOptions(opts, {{ServerIp, "127.0.0.1"s}, {MqttPort, 0}, {MqttTls, withTlsOption}, - {AutoAvailable, false}, - /* {MqttCaCert, MqttCACert}, - {MqttCert, MqttCCert}, - {MqttPrivateKey, MqttCPrivateKey},*/ + {AutoAvailable, false}, + {TlsCertificateChain, ServerCertFile}, + {TlsPrivateKey, ServerKeyFile}, + {TlsDHKey, ServerDhFile}, + {TlsVerifyClientCertificate, true}, + {MqttCaCert, MqttClientCACert}, {RealTime, false}}); if (withTlsOption) @@ -135,9 +141,7 @@ class MqttIsolatedUnitTest : public testing::Test void createClient(const ConfigOptions &options, unique_ptr &&handler) { - using namespace mtconnect::configuration; - - bool withTlsOption = IsOptionSet(options, configuration::MqttTls); + bool withTlsOption = IsOptionSet(options, configuration::MqttTls); ConfigOptions opts(options); MergeOptions(opts, {{MqttHost, "127.0.0.1"s}, @@ -333,4 +337,36 @@ TEST_F(MqttIsolatedUnitTest, should_connect_using_tls) ASSERT_TRUE(m_client->isConnected()); } +TEST_F(MqttIsolatedUnitTest, should_connect_using_tls_ws) +{ + // GTEST_SKIP(); + + ConfigOptions options; + MergeOptions(options, {{ServerIp, "127.0.0.1"s}, + {MqttPort, 0}, + {MqttTls, true}, + {AutoAvailable, false}, + {TlsCertificateChain, ServerCertFile}, + {TlsPrivateKey, ServerKeyFile}, + {TlsDHKey, ServerDhFile}, + {TlsVerifyClientCertificate, true}, + {MqttCaCert, MqttClientCACert}, + {RealTime, false}}); + + m_server = + make_shared(m_agentTestHelper->m_ioContext, options); + + startServer(); + + ASSERT_NE(0, m_port); + + auto handler = make_unique(); + + createClient(options, move(handler)); + + ASSERT_TRUE(startClient()); + + ASSERT_TRUE(m_client->isConnected()); +} + TEST_F(MqttIsolatedUnitTest, should_conenct_using_authentication) { GTEST_SKIP(); } From ab7def51b371b326b4622352790a9a2ec37ef17d Mon Sep 17 00:00:00 2001 From: rajani Date: Thu, 17 Nov 2022 14:37:21 -0600 Subject: [PATCH 12/17] MQTT: TLS support and with tls_ws --- src/mqtt/mqtt_client_impl.hpp | 7 +++++-- src/mqtt/mqtt_server_impl.hpp | 31 +++++++++++++++++++++---------- test/mqtt_isolated_test.cpp | 28 ++++++++++++---------------- 3 files changed, 38 insertions(+), 28 deletions(-) diff --git a/src/mqtt/mqtt_client_impl.hpp b/src/mqtt/mqtt_client_impl.hpp index aa4b39a1b..9cbbd5644 100644 --- a/src/mqtt/mqtt_client_impl.hpp +++ b/src/mqtt/mqtt_client_impl.hpp @@ -346,12 +346,14 @@ namespace mtconnect { if (!m_client) { m_client = mqtt::make_tls_async_client(m_ioContext, m_host, m_port); + auto cacert = GetOption(m_options, configuration::MqttCaCert); if (cacert) { m_client->get_ssl_context().load_verify_file(*cacert); } - /* auto private_key = GetOption(m_options, configuration::MqttPrivateKey); + + auto private_key = GetOption(m_options, configuration::MqttPrivateKey); auto cert = GetOption(m_options, configuration::MqttCert); if (private_key && cert) { @@ -359,7 +361,7 @@ namespace mtconnect { m_client->get_ssl_context().use_certificate_chain_file(*cert); m_client->get_ssl_context().use_private_key_file(*private_key, boost::asio::ssl::context::pem); - }*/ + } } return m_client; @@ -380,6 +382,7 @@ namespace mtconnect { if (!m_client) { m_client = mqtt::make_tls_async_client_ws(m_ioContext, m_host, m_port); + auto cacert = GetOption(m_options, configuration::MqttCaCert); if (cacert) { diff --git a/src/mqtt/mqtt_server_impl.hpp b/src/mqtt/mqtt_server_impl.hpp index 718310ff0..079ec6fd9 100644 --- a/src/mqtt/mqtt_server_impl.hpp +++ b/src/mqtt/mqtt_server_impl.hpp @@ -298,14 +298,25 @@ namespace mtconnect { if (!m_server) { boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12); - ctx.set_options(boost::asio::ssl::context::default_workarounds | + ctx.set_options(boost::asio::ssl::context::default_workarounds | boost::asio::ssl::context::single_dh_use); - auto serverPrivateKey = GetOption(m_options, configuration::TlsPrivateKey); - auto serverCert = GetOption(m_options, configuration::TlsCertificateChain); + auto serverPrivateKey = GetOption(m_options, configuration::TlsPrivateKey); + auto serverCert = GetOption(m_options, configuration::TlsCertificateChain); ctx.use_certificate_chain_file(*serverCert); - ctx.use_tmp_dh_file(*GetOption(m_options, configuration::TlsDHKey)); - ctx.use_private_key_file(*serverPrivateKey, boost::asio::ssl::context::pem); - + //ctx.use_tmp_dh_file(*GetOption(m_options, configuration::TlsDHKey)); + ctx.use_private_key_file(*serverPrivateKey, boost::asio::ssl::context::pem); + /*if (IsOptionSet(m_options, configuration::TlsVerifyClientCertificate)) + { + LOG(info) << "Server: Will only accept client connections with valid certificates"; + + ctx.set_verify_mode(boost::asio::ssl::verify_peer | + boost::asio::ssl::verify_fail_if_no_peer_cert); + if (HasOption(m_options, configuration::MqttCaCert)) + { + LOG(info) << "Server: Adding Client Certificates."; + ctx.load_verify_file(*GetOption(m_options, configuration::MqttCaCert)); + } + }*/ m_server.emplace(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), m_port), std::move(ctx), m_ioContext); } @@ -345,18 +356,18 @@ namespace mtconnect { //ctx.use_tmp_dh_file(*GetOption(m_options, configuration::TlsDHKey)); ctx.use_private_key_file(*serverPrivateKey, boost::asio::ssl::context::pem); - if (IsOptionSet(m_options, configuration::TlsVerifyClientCertificate)) + /*if (IsOptionSet(m_options, configuration::TlsVerifyClientCertificate)) { - LOG(info) << "Will only accept client connections with valid certificates"; + LOG(info) << "Server: Will only accept client connections with valid certificates"; ctx.set_verify_mode(boost::asio::ssl::verify_peer | boost::asio::ssl::verify_fail_if_no_peer_cert); if (HasOption(m_options, configuration::MqttCaCert)) { - LOG(info) << "Adding Client Certificates."; + LOG(info) << "Server: Adding Client Certificates."; ctx.load_verify_file(*GetOption(m_options, configuration::MqttCaCert)); } - } + }*/ m_server.emplace(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), m_port), std::move(ctx), m_ioContext); diff --git a/test/mqtt_isolated_test.cpp b/test/mqtt_isolated_test.cpp index a0a5bf271..06918358c 100644 --- a/test/mqtt_isolated_test.cpp +++ b/test/mqtt_isolated_test.cpp @@ -40,14 +40,13 @@ using namespace mtconnect::sink::rest_sink; using json = nlohmann::json; -const string MqttClientCACert(PROJECT_ROOT_DIR "/test/resources/clientca.crt"); +const string MqttClientCACert(PROJECT_ROOT_DIR "/test/resources/rootca.crt"); const string MqttClientCert(PROJECT_ROOT_DIR "/test/resources/client.crt"); const string MqttClientKey {PROJECT_ROOT_DIR "/test/resources/client.key"}; const string ServerCertFile(PROJECT_ROOT_DIR "/test/resources/user.crt"); const string ServerKeyFile {PROJECT_ROOT_DIR "/test/resources/user.key"}; const string ServerDhFile {PROJECT_ROOT_DIR "/test/resources/dh2048.pem"}; -const string ServerRootCertFile(PROJECT_ROOT_DIR "/test/resources/rootca.crt"); class MqttIsolatedUnitTest : public testing::Test { @@ -62,14 +61,17 @@ class MqttIsolatedUnitTest : public testing::Test { if (m_client) { - m_client->stop(); - m_agentTestHelper->m_ioContext.run_for(100ms); - m_client.reset(); + if (m_client->isConnected()) + { + m_client->stop(); + m_agentTestHelper->m_ioContext.run_for(100ms); + m_client.reset(); + } } if (m_server) { m_server->stop(); - m_agentTestHelper->m_ioContext.run_for(500ms); + m_agentTestHelper->m_ioContext.run_for(100ms); m_server.reset(); } m_agentTestHelper.reset(); @@ -86,10 +88,7 @@ class MqttIsolatedUnitTest : public testing::Test {MqttTls, withTlsOption}, {AutoAvailable, false}, {TlsCertificateChain, ServerCertFile}, - {TlsPrivateKey, ServerKeyFile}, - {TlsDHKey, ServerDhFile}, - {TlsVerifyClientCertificate, true}, - {MqttCaCert, MqttClientCACert}, + {TlsPrivateKey, ServerKeyFile}, {RealTime, false}}); if (withTlsOption) @@ -150,7 +149,7 @@ class MqttIsolatedUnitTest : public testing::Test {AutoAvailable, false}, {MqttCaCert, MqttClientCACert}, {MqttCert, MqttClientCert}, - {MqttPrivateKey, MqttClientKey}, + {MqttPrivateKey, MqttClientKey}, {RealTime, false}}); if (withTlsOption) @@ -166,7 +165,7 @@ class MqttIsolatedUnitTest : public testing::Test } bool startClient() - { + { bool started = m_client && m_client->start(); if (started) { @@ -347,10 +346,7 @@ TEST_F(MqttIsolatedUnitTest, should_connect_using_tls_ws) {MqttTls, true}, {AutoAvailable, false}, {TlsCertificateChain, ServerCertFile}, - {TlsPrivateKey, ServerKeyFile}, - {TlsDHKey, ServerDhFile}, - {TlsVerifyClientCertificate, true}, - {MqttCaCert, MqttClientCACert}, + {TlsPrivateKey, ServerKeyFile}, {RealTime, false}}); m_server = From a837898e93e1ffc7f83b071f5454d9530e180209 Mon Sep 17 00:00:00 2001 From: rajani Date: Fri, 18 Nov 2022 14:19:11 -0600 Subject: [PATCH 13/17] modification with MQTT TLS WS --- test/mqtt_isolated_test.cpp | 40 ++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/test/mqtt_isolated_test.cpp b/test/mqtt_isolated_test.cpp index 86a11dfab..cfc372672 100644 --- a/test/mqtt_isolated_test.cpp +++ b/test/mqtt_isolated_test.cpp @@ -61,17 +61,14 @@ class MqttIsolatedUnitTest : public testing::Test { if (m_client) { - if (m_client->isConnected()) - { - m_client->stop(); - m_agentTestHelper->m_ioContext.run_for(100ms); - m_client.reset(); - } + m_client->stop(); + m_agentTestHelper->m_ioContext.run_for(100ms); + m_client.reset(); } if (m_server) { m_server->stop(); - m_agentTestHelper->m_ioContext.run_for(100ms); + m_agentTestHelper->m_ioContext.run_for(500ms); m_server.reset(); } m_agentTestHelper.reset(); @@ -338,16 +335,16 @@ TEST_F(MqttIsolatedUnitTest, should_connect_using_tls) TEST_F(MqttIsolatedUnitTest, should_connect_using_tls_ws) { - // GTEST_SKIP(); + //GTEST_SKIP(); ConfigOptions options; MergeOptions(options, {{ServerIp, "127.0.0.1"s}, - {MqttPort, 0}, - {MqttTls, true}, - {AutoAvailable, false}, - {TlsCertificateChain, ServerCertFile}, - {TlsPrivateKey, ServerKeyFile}, - {RealTime, false}}); + {MqttPort, 0}, + {MqttTls, true}, + {AutoAvailable, false}, + {TlsCertificateChain, ServerCertFile}, + {TlsPrivateKey, ServerKeyFile}, + {RealTime, false}}); m_server = make_shared(m_agentTestHelper->m_ioContext, options); @@ -358,8 +355,19 @@ TEST_F(MqttIsolatedUnitTest, should_connect_using_tls_ws) auto handler = make_unique(); - createClient(options, move(handler)); - + ConfigOptions opts; + MergeOptions(opts, {{MqttHost, "127.0.0.1"s}, + {MqttPort, m_port}, + {MqttTls, true}, + {AutoAvailable, false}, + {MqttCaCert, MqttClientCACert}, + {MqttCert, MqttClientCert}, + {MqttPrivateKey, MqttClientKey}, + {RealTime, false}}); + + m_client = make_shared(m_agentTestHelper->m_ioContext, + opts, move(handler)); + ASSERT_TRUE(startClient()); ASSERT_TRUE(m_client->isConnected()); From 4d714d87087572fa9fdef3ee8b5a50aa2f94fab8 Mon Sep 17 00:00:00 2001 From: rajani Date: Tue, 22 Nov 2022 22:42:15 -0600 Subject: [PATCH 14/17] Fix: #236 MQTT Sink / Source Username and Password Config --- src/configuration/config_options.hpp | 2 ++ src/mqtt/mqtt_client_impl.hpp | 14 +++++++++++++- src/mqtt/mqtt_server_impl.hpp | 18 +++++++----------- test/mqtt_sink_test.cpp | 27 +++++++++------------------ 4 files changed, 31 insertions(+), 30 deletions(-) diff --git a/src/configuration/config_options.hpp b/src/configuration/config_options.hpp index e8b491137..38454d923 100644 --- a/src/configuration/config_options.hpp +++ b/src/configuration/config_options.hpp @@ -67,6 +67,8 @@ namespace mtconnect { DECLARE_CONFIGURATION(MqttPort); DECLARE_CONFIGURATION(MqttHost); DECLARE_CONFIGURATION(MqttConnectInterval); + DECLARE_CONFIGURATION(MqttUserName); + DECLARE_CONFIGURATION(MqttPassword); // Adapter Configuration DECLARE_CONFIGURATION(AdapterIdentity); diff --git a/src/mqtt/mqtt_client_impl.hpp b/src/mqtt/mqtt_client_impl.hpp index 9cbbd5644..5bca6651c 100644 --- a/src/mqtt/mqtt_client_impl.hpp +++ b/src/mqtt/mqtt_client_impl.hpp @@ -64,6 +64,8 @@ namespace mtconnect { m_options(options), m_host(GetOption(options, configuration::MqttHost).value_or("localhost")), m_port(GetOption(options, configuration::MqttPort).value_or(1883)), + m_username(GetOption(options, configuration::MqttUserName).value_or("none")), + m_password(GetOption(options, configuration::MqttPassword).value_or("none")), m_reconnectTimer(ioContext) { std::stringstream url; @@ -312,6 +314,10 @@ namespace mtconnect { std::uint16_t m_clientId {0}; + std::string m_username; + + std::string m_password; + boost::asio::steady_timer m_reconnectTimer; }; @@ -326,6 +332,8 @@ namespace mtconnect { if (!m_client) { m_client = mqtt::make_async_client(m_ioContext, m_host, m_port); + m_client->set_user_name(m_username); + m_client->set_password(m_password); } return m_client; @@ -346,6 +354,8 @@ namespace mtconnect { if (!m_client) { m_client = mqtt::make_tls_async_client(m_ioContext, m_host, m_port); + m_client->set_user_name(m_username); + m_client->set_password(m_password); auto cacert = GetOption(m_options, configuration::MqttCaCert); if (cacert) @@ -382,7 +392,9 @@ namespace mtconnect { if (!m_client) { m_client = mqtt::make_tls_async_client_ws(m_ioContext, m_host, m_port); - + m_client->set_user_name(m_username); + m_client->set_password(m_password); + auto cacert = GetOption(m_options, configuration::MqttCaCert); if (cacert) { diff --git a/src/mqtt/mqtt_server_impl.hpp b/src/mqtt/mqtt_server_impl.hpp index 4b26be008..6b2dab9ac 100644 --- a/src/mqtt/mqtt_server_impl.hpp +++ b/src/mqtt/mqtt_server_impl.hpp @@ -305,18 +305,14 @@ namespace mtconnect { ctx.use_certificate_chain_file(*serverCert); //ctx.use_tmp_dh_file(*GetOption(m_options, configuration::TlsDHKey)); ctx.use_private_key_file(*serverPrivateKey, boost::asio::ssl::context::pem); - /*if (IsOptionSet(m_options, configuration::TlsVerifyClientCertificate)) + + if (HasOption(m_options, configuration::TlsCertificatePassword)) { - LOG(info) << "Server: Will only accept client connections with valid certificates"; - - ctx.set_verify_mode(boost::asio::ssl::verify_peer | - boost::asio::ssl::verify_fail_if_no_peer_cert); - if (HasOption(m_options, configuration::MqttCaCert)) - { - LOG(info) << "Server: Adding Client Certificates."; - ctx.load_verify_file(*GetOption(m_options, configuration::MqttCaCert)); - } - }*/ + ctx.set_password_callback( + [this](size_t, boost::asio::ssl::context_base::password_purpose) -> string { + return *GetOption(m_options, configuration::TlsCertificatePassword); + }); + } m_server.emplace(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), m_port), std::move(ctx), m_ioContext); } diff --git a/test/mqtt_sink_test.cpp b/test/mqtt_sink_test.cpp index c3b5d7362..7edf32b30 100644 --- a/test/mqtt_sink_test.cpp +++ b/test/mqtt_sink_test.cpp @@ -83,6 +83,8 @@ class MqttSinkTest : public testing::Test ConfigOptions opts(options); MergeOptions(opts, {{"MqttSink", true}, {configuration::MqttPort, m_port}, + {configuration::MqttUserName, "MQTT-SINK"s}, + {configuration::MqttPassword, "mtconnect"s}, {configuration::MqttHost, "127.0.0.1"s}}); m_agentTestHelper->createAgent("/samples/test_config.xml", 8, 4, "2.0", 25, false, true, opts); addAdapter(); @@ -98,6 +100,8 @@ class MqttSinkTest : public testing::Test {MqttPort, 0}, {MqttTls, false}, {AutoAvailable, false}, + {configuration::MqttUserName, "MQTT-SINK"s}, + {configuration::MqttPassword, "mtconnect"s}, {RealTime, false}}); m_server = @@ -147,6 +151,8 @@ class MqttSinkTest : public testing::Test {MqttPort, m_port}, {MqttTls, false}, {AutoAvailable, false}, + {MqttUserName, "MQTT-SINK"s}, + {MqttPassword, "mtconnect"s}, {RealTime, false}}); m_client = make_shared(m_agentTestHelper->m_ioContext, opts, move(handler)); @@ -259,13 +265,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Streams) { EXPECT_TRUE(true); foundLineDataItem = true; - } - // this below code not working currently - /*ErrorList list; - auto ptr = parser.parse(device_model::data_item::DataItem::getRoot(), payload, "2.0", list); - auto dataItem = dynamic_pointer_cast(ptr); - if (dataItem) - EXPECT_EQ(string("204"),dataItem->getValue());*/ + } }; createClient(options, move(handler)); ASSERT_TRUE(startClient()); @@ -277,11 +277,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Streams) m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|line|204"); m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/Path/Line[line]"); - - // m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|lp|NORMAL||||"); - - // m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/LogicProgram"); - + waitFor(2s, [&foundLineDataItem]() { return foundLineDataItem; }); } @@ -306,12 +302,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Asset) { EXPECT_TRUE(true); gotControllerDataItem = true; - } - /*ErrorList list; - auto ptr = parser.parse(Asset::getRoot(), payload, "2.0", list); - EXPECT_EQ(0, list.size()); - auto asset = dynamic_cast(ptr.get()); - EXPECT_TRUE(asset);*/ + } }; createClient(options, move(handler)); ASSERT_TRUE(startClient()); From fb1e7eab617435a06cba970424fb63f06d31df32 Mon Sep 17 00:00:00 2001 From: rajani Date: Wed, 23 Nov 2022 00:03:44 -0600 Subject: [PATCH 15/17] added back missing unit tests for mqtt table, dataset, rotatrymode, tempa, dynamic caliber and linear load use case... looks like these units are not merged properly... --- test/mqtt_isolated_test.cpp | 5 +- test/mqtt_sink_test.cpp | 279 ++++++++++++++++++++++++++++++++++++ 2 files changed, 282 insertions(+), 2 deletions(-) diff --git a/test/mqtt_isolated_test.cpp b/test/mqtt_isolated_test.cpp index cfc372672..5fb9d5b3e 100644 --- a/test/mqtt_isolated_test.cpp +++ b/test/mqtt_isolated_test.cpp @@ -85,7 +85,8 @@ class MqttIsolatedUnitTest : public testing::Test {MqttTls, withTlsOption}, {AutoAvailable, false}, {TlsCertificateChain, ServerCertFile}, - {TlsPrivateKey, ServerKeyFile}, + {TlsPrivateKey, ServerKeyFile}, + {TlsCertificatePassword, "mtconnect"s}, {RealTime, false}}); if (withTlsOption) @@ -335,7 +336,7 @@ TEST_F(MqttIsolatedUnitTest, should_connect_using_tls) TEST_F(MqttIsolatedUnitTest, should_connect_using_tls_ws) { - //GTEST_SKIP(); + GTEST_SKIP(); ConfigOptions options; MergeOptions(options, {{ServerIp, "127.0.0.1"s}, diff --git a/test/mqtt_sink_test.cpp b/test/mqtt_sink_test.cpp index 7edf32b30..198938568 100644 --- a/test/mqtt_sink_test.cpp +++ b/test/mqtt_sink_test.cpp @@ -318,3 +318,282 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Asset) waitFor(3s, [&gotControllerDataItem]() { return gotControllerDataItem; }); } + +TEST_F(MqttSinkTest, mqtt_sink_should_publish_RotaryMode) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + + entity::JsonParser parser; + + auto handler = make_unique(); + bool gotRotaryMode = false; + handler->m_receive = [&gotRotaryMode, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ("MTConnect/Observation/000/Axes[Axes]/Rotary[C]/Events/RotaryMode[Smode]", topic); + auto jdoc = json::parse(payload); + + string id = jdoc.at("/value"_json_pointer).get(); + if (id == string("SPINDLE")) + { + EXPECT_TRUE(true); + gotRotaryMode = true; + } + }; + + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + + createAgent("/samples/discrete_example.xml"); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|block|G01X00|Smode|INDEX|line|204"); + + m_client->subscribe("MTConnect/Observation/000/Axes[Axes]/Rotary[C]/Events/RotaryMode[Smode]"); + + waitFor(3s, [&gotRotaryMode]() { return gotRotaryMode; }); +} + +TEST_F(MqttSinkTest, mqtt_sink_should_publish_Dataset) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + entity::JsonParser parser; + auto handler = make_unique(); + bool gotControllerDataItem = false; + handler->m_receive = [&gotControllerDataItem, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ("MTConnect/Observation/000/Controller[Controller]/Path/VARIABLE[vars]", topic); + auto jdoc = json::parse(payload); + string id = jdoc.at("/Part/a"_json_pointer).get(); + + if (id == string("1")) + { + EXPECT_TRUE(true); + gotControllerDataItem = true; + } + }; + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + createAgent("/samples/data_set.xml"); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + auto time = chrono::system_clock::now(); + m_agentTestHelper->m_adapter->processData("TIME|vars|a=1 b=2 c=3"); + m_client->subscribe("MTConnect/Observation/000/Controller[Controller]/Path/VARIABLE[vars]"); + waitFor(3s, [&gotControllerDataItem]() { return gotControllerDataItem; }); +} + +TEST_F(MqttSinkTest, mqtt_sink_should_publish_Table) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + entity::JsonParser parser; + auto handler = make_unique(); + bool gotControllerDataItem = false; + handler->m_receive = [&gotControllerDataItem, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ( + "MTConnect/Observation/000/Controller[Controller]/Path[path]/Events/WorkOffsetTable[wpo]", + topic); + auto jdoc = json::parse(payload); + + auto jValue = jdoc.at("/value"_json_pointer); + int count = 0; + if (jValue.is_object()) + { + for (auto &[key, value] : jValue.items()) + { + if (key == "G53.1" || key == "G53.2" || key == "G53.3") + { + for (auto &[subKey, subValue] : value.items()) + { + if (key == "G53.1" && (subKey == "X" && subValue.get() == 1 || + subKey == "Y" && subValue.get() == 2 || + subKey == "Z" && subValue.get() == 3)) + { + count++; + } + else if (key == "G53.2" && (subKey == "X" && subValue.get() == 4 || + subKey == "Y" && subValue.get() == 5 || + subKey == "Z" && subValue.get() == 6)) + { + count++; + } + else if (key == "G53.3" && (subKey == "X" && subValue.get() == 7.0 || + subKey == "Y" && subValue.get() == 8.0 || + subKey == "Z" && subValue.get() == 9.0 || + subKey == "U" && subValue.get() == 10.0)) + { + count++; + } + } + } + } + if (count == 10) + { + EXPECT_TRUE(true); + gotControllerDataItem = true; + } + } + }; + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + createAgent("/samples/data_set.xml"); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + auto time = chrono::system_clock::now(); + + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|wpo|G53.1={X=1.0 Y=2.0 Z=3.0} G53.2={X=4.0 Y=5.0 Z=6.0}" + "G53.3={X=7.0 Y=8.0 Z=9 U=10.0}"); + + m_client->subscribe( + "MTConnect/Observation/000/Controller[Controller]/Path[path]/Events/" + "WorkOffsetTable[wpo]"); + + waitFor(3s, [&gotControllerDataItem]() { return gotControllerDataItem; }); +} + +TEST_F(MqttSinkTest, mqtt_sink_should_publish_Temperature) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + + entity::JsonParser parser; + + auto handler = make_unique(); + bool gotTemperature = false; + handler->m_receive = [&gotTemperature, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ( + "MTConnect/Observation/000/Axes[Axes]/Linear[Z]/Motor[motor_name]/Samples/" + "Temperature[z_motor_temp]", + topic); + auto jdoc = json::parse(payload); + + auto value = jdoc.at("/value"_json_pointer); + double load = 81.0; + if (value.is_number()) + { + if (load == double(value)) + { + EXPECT_TRUE(true); + gotTemperature = true; + } + } + }; + + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + + createAgent(); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + + m_agentTestHelper->m_adapter->processData("2018-04-27T05:00:26.555666|z_motor_temp|81"); + + m_client->subscribe( + "MTConnect/Observation/000/Axes[Axes]/Linear[Z]/Motor[motor_name]/Samples/" + "Temperature[z_motor_temp]"); + + waitFor(3s, [&gotTemperature]() { return gotTemperature; }); +} + +TEST_F(MqttSinkTest, mqtt_sink_should_publish_LinearLoad) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + entity::JsonParser parser; + auto handler = make_unique(); + bool gotLinearLoad = false; + handler->m_receive = [&gotLinearLoad, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ("MTConnect/Observation/000/Axes[Axes]/Linear[X]/Load[Xload]", topic); + auto jdoc = json::parse(payload); + auto value = jdoc.at("/value"_json_pointer); + double load = 50.0; + if (value.is_number()) + { + if (load == double(value)) + { + EXPECT_TRUE(true); + gotLinearLoad = true; + } + } + }; + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + createAgent(); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + m_agentTestHelper->m_adapter->processData("2018-04-27T05:00:26.555666|Xload|50"); + m_client->subscribe("MTConnect/Observation/000/Axes[Axes]/Linear[X]/Load[Xload]"); + waitFor(3s, [&gotLinearLoad]() { return gotLinearLoad; }); +} + +TEST_F(MqttSinkTest, mqtt_sink_should_publish_DynamicCalibration) +{ + ConfigOptions options; + createServer(options); + startServer(); + ASSERT_NE(0, m_port); + + entity::JsonParser parser; + + auto handler = make_unique(); + bool gotCalibration = false; + handler->m_receive = [&gotCalibration, &parser](std::shared_ptr, + const std::string &topic, + const std::string &payload) { + EXPECT_EQ( + "MTConnect/Observation/000/Axes[Axes]/Linear[X]/Samples/PositionTimeSeries.Actual[Xts]", + topic); + auto jdoc = json::parse(payload); + + auto value = jdoc.at("/value"_json_pointer); + + if (value.is_array()) + { + if (value.size() == 25) + { + EXPECT_TRUE(true); + gotCalibration = true; + } + } + }; + + createClient(options, move(handler)); + ASSERT_TRUE(startClient()); + + createAgent(); + auto service = m_agentTestHelper->getMqttService(); + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); + + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|Xts|25|| 5118 5118 5118 5118 5118 5118 5118 5118 5118 5118 5118 5118 " + "5119 5119 5118 " + "5118 5117 5117 5119 5119 5118 5118 5118 5118 5118"); + + m_client->subscribe( + "MTConnect/Observation/000/Axes[Axes]/Linear[X]/Samples/PositionTimeSeries.Actual[Xts]"); + + waitFor(3s, [&gotCalibration]() { return gotCalibration; }); +} From 391a67fda5f2602bb01452f4284f62a41f5d481c Mon Sep 17 00:00:00 2001 From: rajani Date: Thu, 24 Nov 2022 00:25:10 -0600 Subject: [PATCH 16/17] added unit test for mqtt-sink with username and password --- src/mqtt/mqtt_client_impl.hpp | 31 +++++++++++++++-------- test/mqtt_sink_test.cpp | 47 +++++++++++++++++++++++++---------- 2 files changed, 54 insertions(+), 24 deletions(-) diff --git a/src/mqtt/mqtt_client_impl.hpp b/src/mqtt/mqtt_client_impl.hpp index 5bca6651c..eb9c59c43 100644 --- a/src/mqtt/mqtt_client_impl.hpp +++ b/src/mqtt/mqtt_client_impl.hpp @@ -63,11 +63,14 @@ namespace mtconnect { : MqttClient(ioContext, move(handler)), m_options(options), m_host(GetOption(options, configuration::MqttHost).value_or("localhost")), - m_port(GetOption(options, configuration::MqttPort).value_or(1883)), - m_username(GetOption(options, configuration::MqttUserName).value_or("none")), - m_password(GetOption(options, configuration::MqttPassword).value_or("none")), + m_port(GetOption(options, configuration::MqttPort).value_or(1883)), m_reconnectTimer(ioContext) { + if (auto userName = GetOption(options, configuration::MqttUserName)) + m_username = *userName; + if (auto pswd = GetOption(options, configuration::MqttPassword)) + m_password = *pswd; + std::stringstream url; url << "mqtt://" << m_host << ':' << m_port; m_url = url.str(); @@ -314,9 +317,9 @@ namespace mtconnect { std::uint16_t m_clientId {0}; - std::string m_username; + std::string m_username = ""; - std::string m_password; + std::string m_password = ""; boost::asio::steady_timer m_reconnectTimer; }; @@ -332,8 +335,10 @@ namespace mtconnect { if (!m_client) { m_client = mqtt::make_async_client(m_ioContext, m_host, m_port); - m_client->set_user_name(m_username); - m_client->set_password(m_password); + if (!m_username.empty()) + m_client->set_user_name(m_username); + if (!m_password.empty()) + m_client->set_password(m_password); } return m_client; @@ -354,8 +359,10 @@ namespace mtconnect { if (!m_client) { m_client = mqtt::make_tls_async_client(m_ioContext, m_host, m_port); - m_client->set_user_name(m_username); - m_client->set_password(m_password); + if (!m_username.empty()) + m_client->set_user_name(m_username); + if (!m_password.empty()) + m_client->set_password(m_password); auto cacert = GetOption(m_options, configuration::MqttCaCert); if (cacert) @@ -392,8 +399,10 @@ namespace mtconnect { if (!m_client) { m_client = mqtt::make_tls_async_client_ws(m_ioContext, m_host, m_port); - m_client->set_user_name(m_username); - m_client->set_password(m_password); + if (!m_username.empty()) + m_client->set_user_name(m_username); + if (!m_password.empty()) + m_client->set_password(m_password); auto cacert = GetOption(m_options, configuration::MqttCaCert); if (cacert) diff --git a/test/mqtt_sink_test.cpp b/test/mqtt_sink_test.cpp index 198938568..79f61fff3 100644 --- a/test/mqtt_sink_test.cpp +++ b/test/mqtt_sink_test.cpp @@ -36,9 +36,8 @@ using namespace std; using namespace mtconnect; using namespace mtconnect::device_model::data_item; using namespace mtconnect::sink::mqtt_sink; -using namespace mtconnect::sink::rest_sink; using namespace mtconnect::asset; -using namespace mtconnect::entity; +using namespace mtconnect::configuration; using json = nlohmann::json; @@ -82,9 +81,7 @@ class MqttSinkTest : public testing::Test ConfigOptions opts(options); MergeOptions(opts, {{"MqttSink", true}, - {configuration::MqttPort, m_port}, - {configuration::MqttUserName, "MQTT-SINK"s}, - {configuration::MqttPassword, "mtconnect"s}, + {configuration::MqttPort, m_port}, {configuration::MqttHost, "127.0.0.1"s}}); m_agentTestHelper->createAgent("/samples/test_config.xml", 8, 4, "2.0", 25, false, true, opts); addAdapter(); @@ -99,9 +96,7 @@ class MqttSinkTest : public testing::Test MergeOptions(opts, {{ServerIp, "127.0.0.1"s}, {MqttPort, 0}, {MqttTls, false}, - {AutoAvailable, false}, - {configuration::MqttUserName, "MQTT-SINK"s}, - {configuration::MqttPassword, "mtconnect"s}, + {AutoAvailable, false}, {RealTime, false}}); m_server = @@ -144,15 +139,12 @@ class MqttSinkTest : public testing::Test } void createClient(const ConfigOptions &options, unique_ptr &&handler) - { - using namespace mtconnect::configuration; + { ConfigOptions opts(options); MergeOptions(opts, {{MqttHost, "127.0.0.1"s}, {MqttPort, m_port}, {MqttTls, false}, - {AutoAvailable, false}, - {MqttUserName, "MQTT-SINK"s}, - {MqttPassword, "mtconnect"s}, + {AutoAvailable, false}, {RealTime, false}}); m_client = make_shared(m_agentTestHelper->m_ioContext, opts, move(handler)); @@ -204,6 +196,35 @@ TEST_F(MqttSinkTest, mqtt_sink_should_connect_to_broker) ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); } +TEST_F(MqttSinkTest, mqtt_sink_should_connect_to_broker_with_UserNameandPassword) +{ + ConfigOptions options {{MqttUserName, "MQTT-SINK"s}, + {MqttPassword, "mtconnect"s}}; + createServer(options); + startServer(); + + ASSERT_NE(0, m_port); + + createAgent("", options); + auto service = m_agentTestHelper->getMqttService(); + + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); +} + +TEST_F(MqttSinkTest, mqtt_sink_should_connect_to_broker_without_UserNameandPassword) +{ + ConfigOptions options; + createServer(options); + startServer(); + + ASSERT_NE(0, m_port); + + createAgent(); + auto service = m_agentTestHelper->getMqttService(); + + ASSERT_TRUE(waitFor(1s, [&service]() { return service->isConnected(); })); +} + TEST_F(MqttSinkTest, mqtt_sink_should_publish_device) { ConfigOptions options; From 3289334a74302bd477f3426de4e655a93393f603 Mon Sep 17 00:00:00 2001 From: rajani Date: Tue, 29 Nov 2022 14:54:24 -0600 Subject: [PATCH 17/17] skipped mqtt_tls unittest... --- test/mqtt_isolated_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/mqtt_isolated_test.cpp b/test/mqtt_isolated_test.cpp index 5fb9d5b3e..d7e4eaa4f 100644 --- a/test/mqtt_isolated_test.cpp +++ b/test/mqtt_isolated_test.cpp @@ -315,7 +315,7 @@ TEST_F(MqttIsolatedUnitTest, mqtt_tcp_client_should_receive_loopback_publication TEST_F(MqttIsolatedUnitTest, should_connect_using_tls) { - //GTEST_SKIP(); + GTEST_SKIP(); ConfigOptions options {{configuration::MqttTls, true}};