From 68a6c11eae9fa829893d974b8230f311ae561a3f Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Sun, 2 Apr 2023 16:10:56 -0700 Subject: [PATCH 1/4] Checkpoint before data set refactor --- src/mtconnect/agent.hpp | 5 ++ src/mtconnect/buffer/checkpoint.cpp | 1 - src/mtconnect/buffer/checkpoint.hpp | 84 +++++++++++++++++++ src/mtconnect/buffer/circular_buffer.hpp | 9 ++ src/mtconnect/entity/entity.hpp | 2 +- src/mtconnect/mqtt/mqtt_client_impl.hpp | 2 +- src/mtconnect/observation/observation.hpp | 14 ++++ src/mtconnect/pipeline/duplicate_filter.hpp | 25 ++---- src/mtconnect/pipeline/pipeline_contract.hpp | 6 ++ src/mtconnect/printer/json_printer_helper.hpp | 9 +- src/mtconnect/sink/mqtt_sink/mqtt_service.cpp | 6 +- src/mtconnect/sink/rest_sink/rest_service.cpp | 10 +-- src/mtconnect/sink/rest_sink/server.cpp | 8 +- src/mtconnect/sink/rest_sink/server.hpp | 4 +- src/mtconnect/sink/rest_sink/session_impl.cpp | 16 ++-- src/mtconnect/sink/rest_sink/session_impl.hpp | 3 +- .../source/adapter/adapter_pipeline.cpp | 8 +- .../source/adapter/mqtt/mqtt_adapter.cpp | 4 +- test/agent_adapter_test.cpp | 1 + test/agent_test.cpp | 3 +- test/agent_test_helper.hpp | 8 +- test/data_item_mapping_test.cpp | 1 + test/duplicate_filter_test.cpp | 19 ++++- test/embedded_ruby_test.cpp | 1 + test/http_server_test.cpp | 18 ++-- test/mqtt_isolated_test.cpp | 12 +-- test/mqtt_sink_test.cpp | 20 ++--- test/mtconnect_xml_transform_test.cpp | 1 + test/period_filter_test.cpp | 1 + test/response_document_test.cpp | 1 + test/testsink_service.hpp | 2 +- test/tls_http_server_test.cpp | 8 +- test/topic_mapping_test.cpp | 1 + 33 files changed, 223 insertions(+), 90 deletions(-) diff --git a/src/mtconnect/agent.hpp b/src/mtconnect/agent.hpp index c893d1f6..994251fd 100644 --- a/src/mtconnect/agent.hpp +++ b/src/mtconnect/agent.hpp @@ -551,6 +551,11 @@ namespace mtconnect { void sourceFailed(const std::string &identity) override { m_agent->sourceFailed(identity); } + bool isDuplicate(const ObservationPtr &obs) const override + { + return m_agent->getCircularBuffer().isDuplicate(obs); + } + protected: Agent *m_agent; }; diff --git a/src/mtconnect/buffer/checkpoint.cpp b/src/mtconnect/buffer/checkpoint.cpp index da052170..106c11e2 100644 --- a/src/mtconnect/buffer/checkpoint.cpp +++ b/src/mtconnect/buffer/checkpoint.cpp @@ -271,6 +271,5 @@ namespace mtconnect { return true; } - } // namespace buffer } // namespace mtconnect diff --git a/src/mtconnect/buffer/checkpoint.hpp b/src/mtconnect/buffer/checkpoint.hpp index e2888e0a..4b4d91c6 100644 --- a/src/mtconnect/buffer/checkpoint.hpp +++ b/src/mtconnect/buffer/checkpoint.hpp @@ -51,6 +51,90 @@ namespace mtconnect::buffer { /// @return `true` if the data set changed bool dataSetDifference(observation::ObservationPtr observation) const; + /// @brief Checks if the observation is a duplicate with existing observations + /// @param[in] obs the observation + /// @return `true` if the obsrvation is a duplicate + bool isDuplicate(const observation::ObservationPtr &obs) const + { + using namespace observation; + using namespace std; + + auto di = obs->getDataItem(); + const auto &id = di->getId(); + auto old = m_observations.find(id); + + if (old != m_observations.end()) + { + auto &oldObs = old->second; + if (obs->isUnavailable() != oldObs->isUnavailable()) + return false; + + if (di->isCondition()) + { + auto *cond = dynamic_cast(obs.get()); + auto *oldCond = dynamic_cast(oldObs.get()); + + // Check for normal resetting all conditions. If there are + // no active conditions, then this is a duplicate normal + if (cond->getLevel() == Condition::NORMAL && cond->getCode().empty()) + { + if (oldCond->getLevel() == Condition::NORMAL && oldCond->getCode().empty()) + return true; + else + return false; + } + + // If there is already an active condition with this code, + // then check if nothing has changed between activations. + if (const auto &e = oldCond->find(cond->getCode())) + { + if (cond->getLevel() != e->getLevel()) + return false; + + if ((cond->hasValue() != e->hasValue()) || + (cond->hasValue() && cond->getValue() != e->getValue())) + return false; + + if ((cond->hasProperty("qualifier") != e->hasProperty("qualifier")) || + (cond->hasProperty("qualifier") && + cond->get("qualifier") != e->get("qualifier"))) + return false; + + if ((cond->hasProperty("nativeSeverity") != e->hasProperty("nativeSeverity")) || + (cond->hasProperty("nativeSeverity") && + cond->get("nativeSeverity") != e->get("nativeSeverity"))) + return false; + + return true; + } + else + { + return false; + } + } + else if (!di->isDiscrete()) + { + if (di->isDataSet()) + { + auto set = dynamic_pointer_cast(obs); + if (!obs->hasProperty("resetTriggered") && !set->getDataSet().empty()) + { + auto oldSet = dynamic_pointer_cast(oldObs); + return set->getDataSet() != oldSet->getDataSet(); + } + } + else + { + auto &value = obs->getValue(); + auto &oldValue = oldObs->getValue(); + + return value == oldValue; + } + } + } + return false; + } + /// @brief copy another checkpoint to this checkpoint /// @param[in] checkpoint a checkpoint to copy /// @param[in] filterSet an optional filter set diff --git a/src/mtconnect/buffer/circular_buffer.hpp b/src/mtconnect/buffer/circular_buffer.hpp index e4e33d73..8b47b0a9 100644 --- a/src/mtconnect/buffer/circular_buffer.hpp +++ b/src/mtconnect/buffer/circular_buffer.hpp @@ -173,6 +173,15 @@ namespace mtconnect::buffer { auto getCheckpointFreq() const { return m_checkpointFreq; } auto getCheckpointCount() const { return m_checkpointCount; } + /// @brief Check if observation is a duplicate by validating against the latest checkpoint + /// @param[in] obs the observation to check + /// @return `true` if the observation is a duplicate + bool isDuplicate(const observation::ObservationPtr &obs) const + { + std::lock_guard lock(m_sequenceLock); + return m_latest.isDuplicate(obs); + } + /// @brief Get a checkpoint at a sequence number /// @param at the sequence number to get the checkpoint at /// @param filterSet the filter to apply to the new checkpoint diff --git a/src/mtconnect/entity/entity.hpp b/src/mtconnect/entity/entity.hpp index 31c28e4d..f5eebb56 100644 --- a/src/mtconnect/entity/entity.hpp +++ b/src/mtconnect/entity/entity.hpp @@ -386,7 +386,7 @@ namespace mtconnect { AttributeSet m_attributes; }; - /// @brief variant visitor to compare two entities for equality + /// @brief variant visitor to compare two entity parameter values for equality struct ValueEqualVisitor { ValueEqualVisitor(const Value &t) : m_this(t) {} diff --git a/src/mtconnect/mqtt/mqtt_client_impl.hpp b/src/mtconnect/mqtt/mqtt_client_impl.hpp index 45782bb1..706810c0 100644 --- a/src/mtconnect/mqtt/mqtt_client_impl.hpp +++ b/src/mtconnect/mqtt/mqtt_client_impl.hpp @@ -70,7 +70,7 @@ namespace mtconnect { /// - MqttHost, defaults to LocalHost MqttClientImpl(boost::asio::io_context &ioContext, const ConfigOptions &options, std::unique_ptr &&handler) - : MqttClient(ioContext, move(handler)), + : MqttClient(ioContext, std::move(handler)), m_options(options), m_host(GetOption(options, configuration::MqttHost).value_or("localhost")), m_port(GetOption(options, configuration::MqttPort).value_or(1883)), diff --git a/src/mtconnect/observation/observation.hpp b/src/mtconnect/observation/observation.hpp index 32131f48..ecb85d63 100644 --- a/src/mtconnect/observation/observation.hpp +++ b/src/mtconnect/observation/observation.hpp @@ -351,6 +351,20 @@ namespace mtconnect::observation { return nullptr; } + /// @brief const find a condition by code in the condition list + /// @param[in] code te code + /// @return shared pointer to the condition if found + const ConditionPtr find(const std::string &code) const + { + if (m_code == code) + return std::dynamic_pointer_cast(Entity::getptr()); + + if (m_prev) + return m_prev->find(code); + + return nullptr; + } + /// @brief replace a condition with another in the condition list /// @param[in] old the condition to be placed /// @param[in] _new the replacement condition diff --git a/src/mtconnect/pipeline/duplicate_filter.hpp b/src/mtconnect/pipeline/duplicate_filter.hpp index 2b7540c6..05800514 100644 --- a/src/mtconnect/pipeline/duplicate_filter.hpp +++ b/src/mtconnect/pipeline/duplicate_filter.hpp @@ -34,15 +34,13 @@ namespace mtconnect::pipeline { DuplicateFilter(const DuplicateFilter &) = default; /// @brief Create a duplicate filter with shared state from the context /// @param context the context - DuplicateFilter(PipelineContextPtr context) - : Transform("DuplicateFilter"), m_state(context->getSharedState(m_name)) + DuplicateFilter(PipelineContextPtr context) : Transform("DuplicateFilter"), m_context(context) { using namespace observation; static constexpr auto lambda = [](const Observation &o) { - return !o.isOrphan() && !o.getDataItem()->isDiscrete(); + return !o.isOrphan() && !o.getDataItem()->isDiscrete() && !o.getDataItem()->isDataSet(); }; - m_guard = LambdaGuard>( - lambda, RUN) || + m_guard = LambdaGuard>(lambda, RUN) || TypeGuard(SKIP); } ~DuplicateFilter() override = default; @@ -50,29 +48,18 @@ namespace mtconnect::pipeline { const entity::EntityPtr operator()(const entity::EntityPtr entity) override { using namespace observation; - std::lock_guard guard(*m_state); auto o = std::dynamic_pointer_cast(entity); if (o->isOrphan()) return entity::EntityPtr(); - auto di = o->getDataItem(); - auto &id = di->getId(); - - auto &values = m_state->m_values; - auto old = values.find(id); - if (old != values.end() && old->second == o->getValue()) + if (m_context->m_contract->isDuplicate(o)) return entity::EntityPtr(); - - if (old == values.end()) - values[id] = o->getValue(); else - old->second = o->getValue(); - - return next(entity); + return next(entity); } protected: - std::shared_ptr m_state; + PipelineContextPtr m_context; }; } // namespace mtconnect::pipeline diff --git a/src/mtconnect/pipeline/pipeline_contract.hpp b/src/mtconnect/pipeline/pipeline_contract.hpp index 77976f63..f5dc11ea 100644 --- a/src/mtconnect/pipeline/pipeline_contract.hpp +++ b/src/mtconnect/pipeline/pipeline_contract.hpp @@ -34,8 +34,12 @@ namespace mtconnect { class Asset; using AssetPtr = std::shared_ptr; } // namespace asset + namespace observation { + class Observation; + } using DataItemPtr = std::shared_ptr; using DevicePtr = std::shared_ptr; + using ObservationPtr = std::shared_ptr; using StringList = std::list; namespace observation { @@ -96,6 +100,8 @@ namespace mtconnect { /// @brief The source is no longer viable, do not try to reconnect /// @param[in] identity the identity of the source virtual void sourceFailed(const std::string &identity) = 0; + + virtual bool isDuplicate(const ObservationPtr &obs) const = 0; }; } // namespace pipeline } // namespace mtconnect diff --git a/src/mtconnect/printer/json_printer_helper.hpp b/src/mtconnect/printer/json_printer_helper.hpp index 9e65d5fe..fb6210a0 100644 --- a/src/mtconnect/printer/json_printer_helper.hpp +++ b/src/mtconnect/printer/json_printer_helper.hpp @@ -363,13 +363,14 @@ namespace mtconnect::printer { using ObjectPtr = std::optional; /// @brief alias for a smart pointer to an AutoJsonArray type for the WriterType using ArrayPtr = std::optional; - + /// @brief Structure that holds either an object or an array - struct StackMember { + struct StackMember + { ObjectPtr m_object; ArrayPtr m_array; }; - + /// @brief alias for a list of variants using Stack = std::list; @@ -398,7 +399,7 @@ namespace mtconnect::printer { auto &member = m_stack.emplace_back(); member.m_array.emplace(base::m_writer); } - + /// @brief Closes open objects and arrays, stopping when the size is `<=` to `to` /// @param[in] to stop when the size of the stack is equal to `to`, defaults to `0` void clear(size_t to = 0) diff --git a/src/mtconnect/sink/mqtt_sink/mqtt_service.cpp b/src/mtconnect/sink/mqtt_sink/mqtt_service.cpp index 5174d8a8..a27fdb25 100644 --- a/src/mtconnect/sink/mqtt_sink/mqtt_service.cpp +++ b/src/mtconnect/sink/mqtt_sink/mqtt_service.cpp @@ -41,7 +41,7 @@ namespace mtconnect { MqttService::MqttService(boost::asio::io_context &context, sink::SinkContractPtr &&contract, const ConfigOptions &options, const ptree &config) - : Sink("MqttService", move(contract)), m_context(context), m_options(options) + : Sink("MqttService", std::move(contract)), m_context(context), m_options(options) { auto jsonPrinter = dynamic_cast(m_sinkContract->getPrinter("json")); m_jsonPrinter = make_unique(jsonPrinter->getJsonVersion()); @@ -93,11 +93,11 @@ namespace mtconnect { if (IsOptionSet(m_options, configuration::MqttTls)) { - m_client = make_shared(m_context, m_options, move(clientHandler)); + m_client = make_shared(m_context, m_options, std::move(clientHandler)); } else { - m_client = make_shared(m_context, m_options, move(clientHandler)); + m_client = make_shared(m_context, m_options, std::move(clientHandler)); } } diff --git a/src/mtconnect/sink/rest_sink/rest_service.cpp b/src/mtconnect/sink/rest_sink/rest_service.cpp index 6ba9067d..66fe262d 100644 --- a/src/mtconnect/sink/rest_sink/rest_service.cpp +++ b/src/mtconnect/sink/rest_sink/rest_service.cpp @@ -41,7 +41,7 @@ namespace mtconnect { namespace sink::rest_sink { RestService::RestService(asio::io_context &context, SinkContractPtr &&contract, const ConfigOptions &options, const ptree &config) - : Sink("RestService", move(contract)), + : Sink("RestService", std::move(contract)), m_context(context), m_strand(context), m_schemaVersion(GetOption(options, config::SchemaVersion).value_or("x.y")), @@ -68,7 +68,7 @@ namespace mtconnect { auto printer = m_sinkContract->getPrinter("xml"); auto doc = printError(printer, "INVALID_REQUEST", msg); ResponsePtr resp = std::make_unique(st, doc, printer->mimeType()); - session->writeFailureResponse(move(resp)); + session->writeFailureResponse(std::move(resp)); }); auto xmlPrinter = dynamic_cast(m_sinkContract->getPrinter("xml")); @@ -366,7 +366,7 @@ namespace mtconnect { static inline void respond(rest_sink::SessionPtr session, rest_sink::ResponsePtr &&response) { - session->writeResponse(move(response)); + session->writeResponse(std::move(response)); } void RestService::createFileRoutings() @@ -381,12 +381,12 @@ namespace mtconnect { ResponsePtr response = make_unique(rest_sink::status::permanent_redirect, file->m_buffer, file->m_mimeType); response->m_location = *file->m_redirect; - session->writeResponse(move(response)); + session->writeResponse(std::move(response)); } else { ResponsePtr response = make_unique(rest_sink::status::ok, file); - session->writeResponse(move(response)); + session->writeResponse(std::move(response)); } } return bool(file); diff --git a/src/mtconnect/sink/rest_sink/server.cpp b/src/mtconnect/sink/rest_sink/server.cpp index 0ba986f7..a1728ade 100644 --- a/src/mtconnect/sink/rest_sink/server.cpp +++ b/src/mtconnect/sink/rest_sink/server.cpp @@ -197,7 +197,7 @@ namespace mtconnect::sink::rest_sink { if (m_tlsEnabled) { auto dectector = - make_shared(move(socket), m_sslContext, m_tlsOnly, m_allowPuts, + make_shared(std::move(socket), m_sslContext, m_tlsOnly, m_allowPuts, m_allowPutsFrom, m_fields, dispatcher, m_errorFunction); dectector->run(); @@ -205,9 +205,9 @@ namespace mtconnect::sink::rest_sink { else { boost::beast::flat_buffer buffer; - boost::beast::tcp_stream stream(move(socket)); - auto session = make_shared(move(stream), move(buffer), m_fields, dispatcher, - m_errorFunction); + boost::beast::tcp_stream stream(std::move(socket)); + auto session = make_shared(std::move(stream), std::move(buffer), m_fields, + dispatcher, m_errorFunction); if (!m_allowPutsFrom.empty()) session->allowPutsFrom(m_allowPutsFrom); diff --git a/src/mtconnect/sink/rest_sink/server.hpp b/src/mtconnect/sink/rest_sink/server.hpp index d923134e..c7c84557 100644 --- a/src/mtconnect/sink/rest_sink/server.hpp +++ b/src/mtconnect/sink/rest_sink/server.hpp @@ -77,7 +77,7 @@ namespace mtconnect::sink::rest_sink { m_errorFunction = [](SessionPtr session, status st, const std::string &msg) { ResponsePtr response = std::make_unique(st, msg, "text/plain"); - session->writeFailureResponse(move(response)); + session->writeFailureResponse(std::move(response)); return true; }; @@ -174,7 +174,7 @@ namespace mtconnect::sink::rest_sink { { LOG(error) << session->getRemote().address() << ": Error processing request: " << re.what(); ResponsePtr resp = std::make_unique(re); - session->writeResponse(move(resp)); + session->writeResponse(std::move(resp)); } catch (ParameterError &pe) { diff --git a/src/mtconnect/sink/rest_sink/session_impl.cpp b/src/mtconnect/sink/rest_sink/session_impl.cpp index 2f6d6e43..ec5f210c 100644 --- a/src/mtconnect/sink/rest_sink/session_impl.cpp +++ b/src/mtconnect/sink/rest_sink/session_impl.cpp @@ -376,9 +376,10 @@ namespace mtconnect::sink::rest_sink { using namespace std; using namespace http; namespace fs = std::filesystem; + using std::move; m_complete = complete; - m_outgoing = move(responsePtr); + m_outgoing = std::move(responsePtr); if (m_outgoing->m_file && !m_outgoing->m_file->m_cached) { @@ -452,12 +453,12 @@ namespace mtconnect::sink::rest_sink { { if (m_streaming) { - m_outgoing = move(response); + m_outgoing = std::move(response); writeChunk(m_outgoing->m_body, [this] { closeStream(); }); } else { - writeResponse(move(response)); + writeResponse(std::move(response)); } } @@ -475,7 +476,7 @@ namespace mtconnect::sink::rest_sink { HttpsSession(boost::beast::tcp_stream &&socket, boost::asio::ssl::context &context, boost::beast::flat_buffer &&buffer, const FieldList &list, Dispatch dispatch, ErrorFunction error) - : SessionImpl(move(buffer), list, dispatch, error), + : SessionImpl(std::move(buffer), list, dispatch, error), m_stream(std::move(socket), context) { m_remote = beast::get_lowest_layer(m_stream).socket().remote_endpoint(); @@ -571,14 +572,15 @@ namespace mtconnect::sink::rest_sink { { LOG(debug) << "Received HTTPS request"; // Create https session - session = std::make_shared(move(m_stream), m_tlsContext, move(m_buffer), - m_fields, m_dispatch, m_errorFunction); + session = + std::make_shared(std::move(m_stream), m_tlsContext, std::move(m_buffer), + m_fields, m_dispatch, m_errorFunction); } else { LOG(debug) << "Received HTTP request"; // Create http session - session = std::make_shared(move(m_stream), move(m_buffer), m_fields, + session = std::make_shared(std::move(m_stream), std::move(m_buffer), m_fields, m_dispatch, m_errorFunction); // Start the session, but set diff --git a/src/mtconnect/sink/rest_sink/session_impl.hpp b/src/mtconnect/sink/rest_sink/session_impl.hpp index 6ab2b982..404d25b9 100644 --- a/src/mtconnect/sink/rest_sink/session_impl.hpp +++ b/src/mtconnect/sink/rest_sink/session_impl.hpp @@ -118,7 +118,8 @@ namespace mtconnect { /// @param error error format function HttpSession(boost::beast::tcp_stream &&stream, boost::beast::flat_buffer &&buffer, const FieldList &list, Dispatch dispatch, ErrorFunction error) - : SessionImpl(move(buffer), list, dispatch, error), m_stream(std::move(stream)) + : SessionImpl(std::move(buffer), list, dispatch, error), + m_stream(std::move(stream)) { m_remote = m_stream.socket().remote_endpoint(); } diff --git a/src/mtconnect/source/adapter/adapter_pipeline.cpp b/src/mtconnect/source/adapter/adapter_pipeline.cpp index acf2a1d3..fb841f5e 100644 --- a/src/mtconnect/source/adapter/adapter_pipeline.cpp +++ b/src/mtconnect/source/adapter/adapter_pipeline.cpp @@ -125,15 +125,15 @@ namespace mtconnect { if (IsOptionSet(m_options, configuration::UpcaseDataItemValue)) next = next->bind(make_shared()); + // Convert values + if (IsOptionSet(m_options, configuration::ConversionRequired)) + next = next->bind(make_shared()); + // Filter dups, by delta, and by period next = next->bind(make_shared(m_context)); next = next->bind(make_shared(m_context)); next = next->bind(make_shared(m_context, m_strand)); - // Convert values - if (IsOptionSet(m_options, configuration::ConversionRequired)) - next = next->bind(make_shared()); - // Deliver std::optional obsMetrics; obsMetrics = m_identity + "_observation_update_rate"; diff --git a/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp b/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp index d17ce64b..55892a4c 100644 --- a/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp +++ b/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp @@ -109,10 +109,10 @@ namespace mtconnect { if (IsOptionSet(m_options, configuration::MqttTls)) m_client = make_shared(m_ioContext, m_options, - move(clientHandler)); + std::move(clientHandler)); else m_client = make_shared(m_ioContext, m_options, - move(clientHandler)); + std::move(clientHandler)); m_identity = m_client->getIdentity(); m_name = m_client->getUrl(); diff --git a/test/agent_adapter_test.cpp b/test/agent_adapter_test.cpp index b193b5fa..d672f77c 100644 --- a/test/agent_adapter_test.cpp +++ b/test/agent_adapter_test.cpp @@ -82,6 +82,7 @@ struct MockPipelineContract : public PipelineContract void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &dev, bool flag) override {} void sourceFailed(const std::string &id) override { m_failed = true; } + bool isDuplicate(const ObservationPtr &obs) const override { return false; } bool m_failed = false; std::string m_result; diff --git a/test/agent_test.cpp b/test/agent_test.cpp index f023f1fa..1ed4fc5c 100644 --- a/test/agent_test.cpp +++ b/test/agent_test.cpp @@ -1483,7 +1483,8 @@ TEST_F(AgentTest, ConditionSequence) } m_agentTestHelper->m_adapter->processData( - "2021-02-01T12:00:00Z|lp|FAULT|4200|ALARM_D||4200 ALARM_D Power on effective parameter set"); + "2021-02-01T12:00:00Z|lp|FAULT|4200|ALARM_D|LOW|4200 ALARM_D Power on effective parameter " + "set"); { PARSE_XML_RESPONSE("/current"); diff --git a/test/agent_test_helper.hpp b/test/agent_test_helper.hpp index 6289828e..bb83eac2 100644 --- a/test/agent_test_helper.hpp +++ b/test/agent_test_helper.hpp @@ -74,7 +74,7 @@ namespace mtconnect { } else { - writeResponse(move(response), complete); + writeResponse(std::move(response), complete); } } void beginStreaming(const std::string &mimeType, Complete complete) override @@ -212,8 +212,8 @@ class AgentTestHelper auto sinkContract = m_agent->makeSinkContract(); sinkContract->m_pipelineContext = m_context; - auto sink = m_sinkFactory.make("RestService", "RestService", m_ioContext, move(sinkContract), - options, ptree {}); + auto sink = m_sinkFactory.make("RestService", "RestService", m_ioContext, + std::move(sinkContract), options, ptree {}); m_restService = std::dynamic_pointer_cast(sink); m_agent->addSink(m_restService); @@ -222,7 +222,7 @@ class AgentTestHelper auto mqttContract = m_agent->makeSinkContract(); mqttContract->m_pipelineContext = m_context; auto mqttsink = m_sinkFactory.make("MqttService", "MqttService", m_ioContext, - move(mqttContract), options, ptree {}); + std::move(mqttContract), options, ptree {}); m_mqttService = std::dynamic_pointer_cast(mqttsink); m_agent->addSink(m_mqttService); } diff --git a/test/data_item_mapping_test.cpp b/test/data_item_mapping_test.cpp index eda6f922..f4033fdf 100644 --- a/test/data_item_mapping_test.cpp +++ b/test/data_item_mapping_test.cpp @@ -58,6 +58,7 @@ class MockPipelineContract : public PipelineContract void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} + bool isDuplicate(const ObservationPtr &obs) const override { return false; } std::map &m_dataItems; }; diff --git a/test/duplicate_filter_test.cpp b/test/duplicate_filter_test.cpp index a96d8e95..8f47b879 100644 --- a/test/duplicate_filter_test.cpp +++ b/test/duplicate_filter_test.cpp @@ -21,7 +21,9 @@ #include +#include "mtconnect/buffer/checkpoint.hpp" #include "mtconnect/observation/observation.hpp" +#include "mtconnect/pipeline/deliver.hpp" #include "mtconnect/pipeline/delta_filter.hpp" #include "mtconnect/pipeline/duplicate_filter.hpp" #include "mtconnect/pipeline/period_filter.hpp" @@ -56,15 +58,23 @@ class MockPipelineContract : public PipelineContract return m_dataItems[name]; } void eachDataItem(EachDataItem fun) override {} - void deliverObservation(observation::ObservationPtr obs) override {} + void deliverObservation(observation::ObservationPtr obs) override + { + m_checkpoint.addObservation(obs); + } void deliverAsset(AssetPtr) override {} void deliverDevice(DevicePtr) override {} void deliverAssetCommand(entity::EntityPtr) override {} void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} + bool isDuplicate(const ObservationPtr &obs) const override + { + return m_checkpoint.isDuplicate(obs); + } std::map &m_dataItems; + buffer::Checkpoint m_checkpoint; }; class DuplicateFilterTest : public testing::Test @@ -115,6 +125,7 @@ TEST_F(DuplicateFilterTest, test_simple_event) auto filter = make_shared(m_context); m_mapper->bind(filter); + filter->bind(make_shared(m_context)); auto os1 = observe({"a", "READY"}); auto list1 = os1->getValue(); @@ -136,6 +147,7 @@ TEST_F(DuplicateFilterTest, test_simple_sample) auto filter = make_shared(m_context); m_mapper->bind(filter); + filter->bind(make_shared(m_context)); auto os1 = observe({"a", "1.5"}); auto list1 = os1->getValue(); @@ -169,6 +181,7 @@ TEST_F(DuplicateFilterTest, test_minimum_delta) auto rate = make_shared(m_context); filter->bind(rate); + rate->bind(make_shared(m_context)); { auto os = observe({"a", "1.5"}); @@ -201,3 +214,7 @@ TEST_F(DuplicateFilterTest, test_minimum_delta) ASSERT_EQ(1, list.size()); } } + +TEST_F(DuplicateFilterTest, test_condition_duplicates) { GTEST_SKIP(); } + +TEST_F(DuplicateFilterTest, test_data_set_duplicates) { GTEST_SKIP(); } diff --git a/test/embedded_ruby_test.cpp b/test/embedded_ruby_test.cpp index 944be9b2..b2c20d93 100644 --- a/test/embedded_ruby_test.cpp +++ b/test/embedded_ruby_test.cpp @@ -93,6 +93,7 @@ namespace { void deliverCommand(entity::EntityPtr c) override { m_command = c; } void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} + bool isDuplicate(const ObservationPtr &obs) const override { return false; } const Agent *m_agent; ObservationPtr m_observation; diff --git a/test/http_server_test.cpp b/test/http_server_test.cpp index 9c64f9c3..a0e86274 100644 --- a/test/http_server_test.cpp +++ b/test/http_server_test.cpp @@ -336,7 +336,7 @@ TEST_F(RestServiceTest, simple_request_response) string("Device given as: ") + get(request->m_parameters.find("device")->second); else resp->m_body = "All Devices"; - session->writeResponse(move(resp), []() { cout << "Written" << endl; }); + session->writeResponse(std::move(resp), []() { cout << "Written" << endl; }); return true; }; @@ -374,7 +374,7 @@ TEST_F(RestServiceTest, request_response_with_query_parameters) ResponsePtr resp = make_unique(status::ok); resp->m_body = "Done"; - session->writeResponse(move(resp), []() { cout << "Written" << endl; }); + session->writeResponse(std::move(resp), []() { cout << "Written" << endl; }); return true; }; @@ -423,7 +423,7 @@ TEST_F(RestServiceTest, request_put_when_put_allowed) auto handler = [&](SessionPtr session, RequestPtr request) -> bool { EXPECT_EQ(http::verb::put, request->m_verb); ResponsePtr resp = make_unique(status::ok, "Put ok"); - session->writeResponse(move(resp), []() { + session->writeResponse(std::move(resp), []() { cout << "Written" << endl; return true; }); @@ -468,7 +468,7 @@ TEST_F(RestServiceTest, request_put_when_put_allowed_from_ip_address) auto handler = [&](SessionPtr session, RequestPtr request) -> bool { EXPECT_EQ(http::verb::put, request->m_verb); ResponsePtr resp = make_unique(status::ok, "Put ok"); - session->writeResponse(move(resp), []() { + session->writeResponse(std::move(resp), []() { cout << "Written" << endl; return true; }); @@ -494,7 +494,7 @@ TEST_F(RestServiceTest, request_with_connect_close) auto handler = [&](SessionPtr session, RequestPtr request) -> bool { savedSession = session; ResponsePtr resp = make_unique(status::ok, "Probe"); - session->writeResponse(move(resp), []() { + session->writeResponse(std::move(resp), []() { cout << "Written" << endl; return true; }); @@ -524,7 +524,7 @@ TEST_F(RestServiceTest, put_content_to_server) body = request->m_body; ResponsePtr resp = make_unique(status::ok); - session->writeResponse(move(resp), []() { cout << "Written" << endl; }); + session->writeResponse(std::move(resp), []() { cout << "Written" << endl; }); return true; }; @@ -549,7 +549,7 @@ TEST_F(RestServiceTest, put_content_with_put_values) ct = request->m_contentType; ResponsePtr resp = make_unique(status::ok); - session->writeResponse(move(resp), []() { cout << "Written" << endl; }); + session->writeResponse(std::move(resp), []() { cout << "Written" << endl; }); return true; }; @@ -654,7 +654,7 @@ TEST_F(RestServiceTest, additional_header_fields) auto probe = [&](SessionPtr session, RequestPtr request) -> bool { ResponsePtr resp = make_unique(status::ok, "Done"); - session->writeResponse(move(resp), []() { cout << "Written" << endl; }); + session->writeResponse(std::move(resp), []() { cout << "Written" << endl; }); return true; }; @@ -695,7 +695,7 @@ TEST_F(RestServiceTest, failure_when_tls_only) auto probe = [&](SessionPtr session, RequestPtr request) -> bool { ResponsePtr resp = make_unique(status::ok, "Done"); - session->writeResponse(move(resp), []() { cout << "Written" << endl; }); + session->writeResponse(std::move(resp), []() { cout << "Written" << endl; }); return true; }; diff --git a/test/mqtt_isolated_test.cpp b/test/mqtt_isolated_test.cpp index 94b49b40..b5db41c4 100644 --- a/test/mqtt_isolated_test.cpp +++ b/test/mqtt_isolated_test.cpp @@ -157,12 +157,12 @@ class MqttIsolatedUnitTest : public testing::Test if (withTlsOption) { m_client = make_shared(m_agentTestHelper->m_ioContext, - opts, move(handler)); + opts, std::move(handler)); } else { m_client = make_shared(m_agentTestHelper->m_ioContext, - opts, move(handler)); + opts, std::move(handler)); } } @@ -198,7 +198,7 @@ TEST_F(MqttIsolatedUnitTest, mqtt_client_should_connect_to_broker) auto handler = make_unique(); - createClient(options, move(handler)); + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); @@ -351,7 +351,7 @@ TEST_F(MqttIsolatedUnitTest, should_connect_using_tls) auto handler = make_unique(); - createClient(options, move(handler)); + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); @@ -384,7 +384,7 @@ TEST_F(MqttIsolatedUnitTest, should_connect_using_tls_ws) MergeOptions(opts, {{MqttPort, m_port}}); m_client = make_shared(m_agentTestHelper->m_ioContext, - opts, move(handler)); + opts, std::move(handler)); ASSERT_TRUE(startClient()); @@ -420,7 +420,7 @@ TEST_F(MqttIsolatedUnitTest, should_conenct_using_tls_authentication) ConfigOptions opts(options); MergeOptions(opts, {{MqttPort, m_port}}); - createClient(opts, move(handler)); + createClient(opts, std::move(handler)); ASSERT_TRUE(startClient()); diff --git a/test/mqtt_sink_test.cpp b/test/mqtt_sink_test.cpp index a0d04ed6..22902711 100644 --- a/test/mqtt_sink_test.cpp +++ b/test/mqtt_sink_test.cpp @@ -154,7 +154,7 @@ class MqttSinkTest : public testing::Test {AutoAvailable, false}, {RealTime, false}}); m_client = make_shared(m_agentTestHelper->m_ioContext, - opts, move(handler)); + opts, std::move(handler)); } bool startClient() @@ -257,7 +257,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_device) gotDevice = true; }; - createClient(options, move(handler)); + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); m_client->subscribe("MTConnect/Device/000"); @@ -293,7 +293,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Streams) foundLineDataItem = true; } }; - createClient(options, move(handler)); + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); createAgent(); @@ -330,7 +330,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Asset) gotControllerDataItem = true; } }; - createClient(options, move(handler)); + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); createAgent(); @@ -369,7 +369,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_RotaryMode) } }; - createClient(options, move(handler)); + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); createAgent("/samples/discrete_example.xml"); @@ -406,7 +406,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Dataset) gotControllerDataItem = true; } }; - createClient(options, move(handler)); + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); createAgent("/samples/data_set.xml"); auto service = m_agentTestHelper->getMqttService(); @@ -472,7 +472,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Table) } } }; - createClient(options, move(handler)); + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); createAgent("/samples/data_set.xml"); auto service = m_agentTestHelper->getMqttService(); @@ -520,7 +520,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_Temperature) } }; - createClient(options, move(handler)); + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); createAgent(); @@ -560,7 +560,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_LinearLoad) } } }; - createClient(options, move(handler)); + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); createAgent(); auto service = m_agentTestHelper->getMqttService(); @@ -600,7 +600,7 @@ TEST_F(MqttSinkTest, mqtt_sink_should_publish_DynamicCalibration) } }; - createClient(options, move(handler)); + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); createAgent(); diff --git a/test/mtconnect_xml_transform_test.cpp b/test/mtconnect_xml_transform_test.cpp index 32e9cdc0..9c273876 100644 --- a/test/mtconnect_xml_transform_test.cpp +++ b/test/mtconnect_xml_transform_test.cpp @@ -58,6 +58,7 @@ class MockPipelineContract : public PipelineContract void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} + bool isDuplicate(const ObservationPtr &obs) const override { return false; } DevicePtr m_device; }; diff --git a/test/period_filter_test.cpp b/test/period_filter_test.cpp index 351a950c..51d0032b 100644 --- a/test/period_filter_test.cpp +++ b/test/period_filter_test.cpp @@ -66,6 +66,7 @@ struct MockPipelineContract : public PipelineContract void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} + bool isDuplicate(const ObservationPtr &obs) const override { return false; } std::map &m_dataItems; diff --git a/test/response_document_test.cpp b/test/response_document_test.cpp index 92a65079..b548d661 100644 --- a/test/response_document_test.cpp +++ b/test/response_document_test.cpp @@ -60,6 +60,7 @@ class MockPipelineContract : public PipelineContract void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} + bool isDuplicate(const ObservationPtr &obs) const override { return false; } DevicePtr m_device; }; diff --git a/test/testsink_service.hpp b/test/testsink_service.hpp index 96a72630..affd7c3f 100644 --- a/test/testsink_service.hpp +++ b/test/testsink_service.hpp @@ -29,7 +29,7 @@ namespace mtconnect { public: sink_plugin_test(const string &name, boost::asio::io_context &context, sink::SinkContractPtr &&contract, const ConfigOptions &config) - : sink::Sink(name, move(contract)) + : sink::Sink(name, std::move(contract)) {} ~sink_plugin_test() = default; diff --git a/test/tls_http_server_test.cpp b/test/tls_http_server_test.cpp index 393a0eae..d3f4fa19 100644 --- a/test/tls_http_server_test.cpp +++ b/test/tls_http_server_test.cpp @@ -396,7 +396,7 @@ TEST_F(TlsRestServiceTest, create_server_and_load_certificates) string("Device given as: ") + get(request->m_parameters.find("device")->second); else resp->m_body = "All Devices"; - session->writeResponse(move(resp), []() { cout << "Written" << endl; }); + session->writeResponse(std::move(resp), []() { cout << "Written" << endl; }); return true; }; @@ -519,7 +519,7 @@ TEST_F(TlsRestServiceTest, check_failed_client_certificate) auto probe = [&](SessionPtr session, RequestPtr request) -> bool { ResponsePtr resp = make_unique(status::ok); resp->m_body = "Done"; - session->writeResponse(move(resp), []() { cout << "Written" << endl; }); + session->writeResponse(std::move(resp), []() { cout << "Written" << endl; }); return true; }; @@ -548,7 +548,7 @@ TEST_F(TlsRestServiceTest, check_valid_client_certificate) auto probe = [&](SessionPtr session, RequestPtr request) -> bool { ResponsePtr resp = make_unique(status::ok); resp->m_body = "Done"; - session->writeResponse(move(resp), []() { cout << "Written" << endl; }); + session->writeResponse(std::move(resp), []() { cout << "Written" << endl; }); return true; }; @@ -578,7 +578,7 @@ TEST_F(TlsRestServiceTest, check_valid_client_certificate_without_server_ca) auto probe = [&](SessionPtr session, RequestPtr request) -> bool { ResponsePtr resp = make_unique(status::ok); resp->m_body = "Done"; - session->writeResponse(move(resp), []() { cout << "Written" << endl; }); + session->writeResponse(std::move(resp), []() { cout << "Written" << endl; }); return true; }; diff --git a/test/topic_mapping_test.cpp b/test/topic_mapping_test.cpp index ab277ad0..b33158aa 100644 --- a/test/topic_mapping_test.cpp +++ b/test/topic_mapping_test.cpp @@ -60,6 +60,7 @@ class MockPipelineContract : public PipelineContract void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} + bool isDuplicate(const ObservationPtr &obs) const override { return false; } std::map &m_dataItems; std::map &m_devices; From 0859c714243ce1570e30ca5d789d7a3f785d5a32 Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Sun, 2 Apr 2023 18:20:04 -0700 Subject: [PATCH 2/4] Refactored duplicate filtering to include conditions and data sets --- src/mtconnect/agent.hpp | 4 +- src/mtconnect/buffer/checkpoint.cpp | 21 ++++++---- src/mtconnect/buffer/checkpoint.hpp | 40 ++++++++++---------- src/mtconnect/buffer/circular_buffer.hpp | 14 +------ src/mtconnect/entity/requirement.hpp | 1 + src/mtconnect/observation/observation.hpp | 1 + src/mtconnect/pipeline/duplicate_filter.hpp | 12 ++---- src/mtconnect/pipeline/pipeline_contract.hpp | 2 +- test/agent_adapter_test.cpp | 2 +- test/data_item_mapping_test.cpp | 2 +- test/duplicate_filter_test.cpp | 4 +- test/embedded_ruby_test.cpp | 2 +- test/mtconnect_xml_transform_test.cpp | 2 +- test/period_filter_test.cpp | 2 +- test/response_document_test.cpp | 2 +- test/topic_mapping_test.cpp | 2 +- 16 files changed, 54 insertions(+), 59 deletions(-) diff --git a/src/mtconnect/agent.hpp b/src/mtconnect/agent.hpp index 994251fd..8483855c 100644 --- a/src/mtconnect/agent.hpp +++ b/src/mtconnect/agent.hpp @@ -551,9 +551,9 @@ namespace mtconnect { void sourceFailed(const std::string &identity) override { m_agent->sourceFailed(identity); } - bool isDuplicate(const ObservationPtr &obs) const override + const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { - return m_agent->getCircularBuffer().isDuplicate(obs); + return m_agent->getCircularBuffer().checkDuplicate(obs); } protected: diff --git a/src/mtconnect/buffer/checkpoint.cpp b/src/mtconnect/buffer/checkpoint.cpp index 106c11e2..8652da0f 100644 --- a/src/mtconnect/buffer/checkpoint.cpp +++ b/src/mtconnect/buffer/checkpoint.cpp @@ -225,12 +225,12 @@ namespace mtconnect { } } - bool Checkpoint::dataSetDifference(ObservationPtr obs) const + ObservationPtr Checkpoint::dataSetDifference(const ObservationPtr &obs) const { if (obs->isOrphan()) - return false; + return nullptr; - auto setEvent = dynamic_pointer_cast(obs); + auto setEvent = dynamic_pointer_cast(obs); auto item = obs->getDataItem(); if (item->isDataSet() && !setEvent->getDataSet().empty() && !obs->hasProperty("resetTriggered")) @@ -262,14 +262,21 @@ namespace mtconnect { if (changed) { - setEvent->setDataSet(eventSet); + if (!eventSet.empty()) + { + auto copy = dynamic_pointer_cast(setEvent->copy()); + copy->setDataSet(eventSet); + return copy; + } + else + { + return nullptr; + } } - - return !eventSet.empty(); } } - return true; + return obs; } } // namespace buffer } // namespace mtconnect diff --git a/src/mtconnect/buffer/checkpoint.hpp b/src/mtconnect/buffer/checkpoint.hpp index 4b4d91c6..226b9f54 100644 --- a/src/mtconnect/buffer/checkpoint.hpp +++ b/src/mtconnect/buffer/checkpoint.hpp @@ -48,13 +48,13 @@ namespace mtconnect::buffer { /// @brief If this is a data set event, diff the value /// @param[in] observation the data set observation - /// @return `true` if the data set changed - bool dataSetDifference(observation::ObservationPtr observation) const; + /// @return The observation or a copy if the data set changed + observation::ObservationPtr dataSetDifference(const observation::ObservationPtr &observation) const; /// @brief Checks if the observation is a duplicate with existing observations /// @param[in] obs the observation /// @return `true` if the obsrvation is a duplicate - bool isDuplicate(const observation::ObservationPtr &obs) const + const observation::ObservationPtr checkDuplicate(const observation::ObservationPtr &obs) const { using namespace observation; using namespace std; @@ -67,7 +67,9 @@ namespace mtconnect::buffer { { auto &oldObs = old->second; if (obs->isUnavailable() != oldObs->isUnavailable()) - return false; + return obs; + else if (obs->isUnavailable()) + return nullptr; if (di->isCondition()) { @@ -79,9 +81,9 @@ namespace mtconnect::buffer { if (cond->getLevel() == Condition::NORMAL && cond->getCode().empty()) { if (oldCond->getLevel() == Condition::NORMAL && oldCond->getCode().empty()) - return true; + return nullptr; else - return false; + return obs; } // If there is already an active condition with this code, @@ -89,50 +91,48 @@ namespace mtconnect::buffer { if (const auto &e = oldCond->find(cond->getCode())) { if (cond->getLevel() != e->getLevel()) - return false; + return obs; if ((cond->hasValue() != e->hasValue()) || (cond->hasValue() && cond->getValue() != e->getValue())) - return false; + return obs; if ((cond->hasProperty("qualifier") != e->hasProperty("qualifier")) || (cond->hasProperty("qualifier") && cond->get("qualifier") != e->get("qualifier"))) - return false; + return obs; if ((cond->hasProperty("nativeSeverity") != e->hasProperty("nativeSeverity")) || (cond->hasProperty("nativeSeverity") && cond->get("nativeSeverity") != e->get("nativeSeverity"))) - return false; + return obs; - return true; + return nullptr; } else { - return false; + return obs; } } else if (!di->isDiscrete()) { if (di->isDataSet()) { - auto set = dynamic_pointer_cast(obs); - if (!obs->hasProperty("resetTriggered") && !set->getDataSet().empty()) - { - auto oldSet = dynamic_pointer_cast(oldObs); - return set->getDataSet() != oldSet->getDataSet(); - } + return dataSetDifference(obs); } else { auto &value = obs->getValue(); auto &oldValue = oldObs->getValue(); - return value == oldValue; + if (value == oldValue) + return nullptr; + else + return obs; } } } - return false; + return obs; } /// @brief copy another checkpoint to this checkpoint diff --git a/src/mtconnect/buffer/circular_buffer.hpp b/src/mtconnect/buffer/circular_buffer.hpp index 8b47b0a9..b2d4f64e 100644 --- a/src/mtconnect/buffer/circular_buffer.hpp +++ b/src/mtconnect/buffer/circular_buffer.hpp @@ -121,16 +121,6 @@ namespace mtconnect::buffer { std::lock_guard lock(m_sequenceLock); auto dataItem = observation->getDataItem(); - - if (!dataItem->isDiscrete()) - { - if (!observation->isUnavailable() && dataItem->isDataSet() && - !m_latest.dataSetDifference(observation)) - { - return 0; - } - } - auto seq = m_sequence; observation->setSequence(seq); @@ -176,10 +166,10 @@ namespace mtconnect::buffer { /// @brief Check if observation is a duplicate by validating against the latest checkpoint /// @param[in] obs the observation to check /// @return `true` if the observation is a duplicate - bool isDuplicate(const observation::ObservationPtr &obs) const + const observation::ObservationPtr checkDuplicate(const observation::ObservationPtr &obs) const { std::lock_guard lock(m_sequenceLock); - return m_latest.isDuplicate(obs); + return m_latest.checkDuplicate(obs); } /// @brief Get a checkpoint at a sequence number diff --git a/src/mtconnect/entity/requirement.hpp b/src/mtconnect/entity/requirement.hpp index b68c305f..e92cf6e5 100644 --- a/src/mtconnect/entity/requirement.hpp +++ b/src/mtconnect/entity/requirement.hpp @@ -46,6 +46,7 @@ namespace mtconnect::entity { class Entity; using EntityPtr = std::shared_ptr; + using ConstEntityPtr = std::shared_ptr; /// @brief List of shared entities using EntityList = std::list>; /// @brief Vector of doubles diff --git a/src/mtconnect/observation/observation.hpp b/src/mtconnect/observation/observation.hpp index ecb85d63..7548b3dd 100644 --- a/src/mtconnect/observation/observation.hpp +++ b/src/mtconnect/observation/observation.hpp @@ -36,6 +36,7 @@ namespace mtconnect::observation { class Observation; using ObservationPtr = std::shared_ptr; + using ConstObservationPtr = std::shared_ptr; using ObservationList = std::list; /// @brief Abstract observation diff --git a/src/mtconnect/pipeline/duplicate_filter.hpp b/src/mtconnect/pipeline/duplicate_filter.hpp index 05800514..9ca9bfdd 100644 --- a/src/mtconnect/pipeline/duplicate_filter.hpp +++ b/src/mtconnect/pipeline/duplicate_filter.hpp @@ -36,12 +36,7 @@ namespace mtconnect::pipeline { /// @param context the context DuplicateFilter(PipelineContextPtr context) : Transform("DuplicateFilter"), m_context(context) { - using namespace observation; - static constexpr auto lambda = [](const Observation &o) { - return !o.isOrphan() && !o.getDataItem()->isDiscrete() && !o.getDataItem()->isDataSet(); - }; - m_guard = LambdaGuard>(lambda, RUN) || - TypeGuard(SKIP); + m_guard = TypeGuard(RUN); } ~DuplicateFilter() override = default; @@ -53,10 +48,11 @@ namespace mtconnect::pipeline { if (o->isOrphan()) return entity::EntityPtr(); - if (m_context->m_contract->isDuplicate(o)) + auto o2 = m_context->m_contract->checkDuplicate(o); + if (!o2) return entity::EntityPtr(); else - return next(entity); + return next(o2); } protected: diff --git a/src/mtconnect/pipeline/pipeline_contract.hpp b/src/mtconnect/pipeline/pipeline_contract.hpp index f5dc11ea..76c345d4 100644 --- a/src/mtconnect/pipeline/pipeline_contract.hpp +++ b/src/mtconnect/pipeline/pipeline_contract.hpp @@ -101,7 +101,7 @@ namespace mtconnect { /// @param[in] identity the identity of the source virtual void sourceFailed(const std::string &identity) = 0; - virtual bool isDuplicate(const ObservationPtr &obs) const = 0; + virtual const ObservationPtr checkDuplicate(const ObservationPtr &obs) const = 0; }; } // namespace pipeline } // namespace mtconnect diff --git a/test/agent_adapter_test.cpp b/test/agent_adapter_test.cpp index d672f77c..386e2787 100644 --- a/test/agent_adapter_test.cpp +++ b/test/agent_adapter_test.cpp @@ -82,7 +82,7 @@ struct MockPipelineContract : public PipelineContract void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &dev, bool flag) override {} void sourceFailed(const std::string &id) override { m_failed = true; } - bool isDuplicate(const ObservationPtr &obs) const override { return false; } + const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } bool m_failed = false; std::string m_result; diff --git a/test/data_item_mapping_test.cpp b/test/data_item_mapping_test.cpp index f4033fdf..4a7f9a96 100644 --- a/test/data_item_mapping_test.cpp +++ b/test/data_item_mapping_test.cpp @@ -58,7 +58,7 @@ class MockPipelineContract : public PipelineContract void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} - bool isDuplicate(const ObservationPtr &obs) const override { return false; } + const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } std::map &m_dataItems; }; diff --git a/test/duplicate_filter_test.cpp b/test/duplicate_filter_test.cpp index 8f47b879..1385d1ac 100644 --- a/test/duplicate_filter_test.cpp +++ b/test/duplicate_filter_test.cpp @@ -68,9 +68,9 @@ class MockPipelineContract : public PipelineContract void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} - bool isDuplicate(const ObservationPtr &obs) const override + const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { - return m_checkpoint.isDuplicate(obs); + return m_checkpoint.checkDuplicate(obs); } std::map &m_dataItems; diff --git a/test/embedded_ruby_test.cpp b/test/embedded_ruby_test.cpp index b2c20d93..e5aa6df2 100644 --- a/test/embedded_ruby_test.cpp +++ b/test/embedded_ruby_test.cpp @@ -93,7 +93,7 @@ namespace { void deliverCommand(entity::EntityPtr c) override { m_command = c; } void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} - bool isDuplicate(const ObservationPtr &obs) const override { return false; } + const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } const Agent *m_agent; ObservationPtr m_observation; diff --git a/test/mtconnect_xml_transform_test.cpp b/test/mtconnect_xml_transform_test.cpp index 9c273876..2e8f947a 100644 --- a/test/mtconnect_xml_transform_test.cpp +++ b/test/mtconnect_xml_transform_test.cpp @@ -58,7 +58,7 @@ class MockPipelineContract : public PipelineContract void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} - bool isDuplicate(const ObservationPtr &obs) const override { return false; } + const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } DevicePtr m_device; }; diff --git a/test/period_filter_test.cpp b/test/period_filter_test.cpp index 51d0032b..c908c656 100644 --- a/test/period_filter_test.cpp +++ b/test/period_filter_test.cpp @@ -66,7 +66,7 @@ struct MockPipelineContract : public PipelineContract void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} - bool isDuplicate(const ObservationPtr &obs) const override { return false; } + const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } std::map &m_dataItems; diff --git a/test/response_document_test.cpp b/test/response_document_test.cpp index b548d661..9926cd0a 100644 --- a/test/response_document_test.cpp +++ b/test/response_document_test.cpp @@ -60,7 +60,7 @@ class MockPipelineContract : public PipelineContract void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} - bool isDuplicate(const ObservationPtr &obs) const override { return false; } + const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } DevicePtr m_device; }; diff --git a/test/topic_mapping_test.cpp b/test/topic_mapping_test.cpp index b33158aa..a24856a2 100644 --- a/test/topic_mapping_test.cpp +++ b/test/topic_mapping_test.cpp @@ -60,7 +60,7 @@ class MockPipelineContract : public PipelineContract void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} - bool isDuplicate(const ObservationPtr &obs) const override { return false; } + const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } std::map &m_dataItems; std::map &m_devices; From 171ab1f7c3f82a5843f6484c98ef81a2afba2a26 Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Sun, 2 Apr 2023 19:00:06 -0700 Subject: [PATCH 3/4] Formatted and cleaned up the data set diff --- src/mtconnect/buffer/checkpoint.cpp | 62 +++++++++++++---------------- src/mtconnect/buffer/checkpoint.hpp | 14 +++++-- 2 files changed, 38 insertions(+), 38 deletions(-) diff --git a/src/mtconnect/buffer/checkpoint.cpp b/src/mtconnect/buffer/checkpoint.cpp index 8652da0f..6c6fecb2 100644 --- a/src/mtconnect/buffer/checkpoint.cpp +++ b/src/mtconnect/buffer/checkpoint.cpp @@ -225,53 +225,47 @@ namespace mtconnect { } } - ObservationPtr Checkpoint::dataSetDifference(const ObservationPtr &obs) const + ObservationPtr Checkpoint::dataSetDifference(const ObservationPtr &obs, + const ConstObservationPtr &old) const { if (obs->isOrphan()) return nullptr; auto setEvent = dynamic_pointer_cast(obs); auto item = obs->getDataItem(); - if (item->isDataSet() && !setEvent->getDataSet().empty() && - !obs->hasProperty("resetTriggered")) + if (!setEvent->getDataSet().empty() && !obs->hasProperty("resetTriggered")) { - const auto &id = item->getId(); - const auto ptr = m_observations.find(id); + auto oldEvent = dynamic_pointer_cast(old); + auto &oldSet = oldEvent->getDataSet(); + DataSet eventSet = setEvent->getDataSet(); + bool changed = false; - if (ptr != m_observations.end() && !ptr->second->isUnavailable()) + for (auto it = eventSet.begin(); it != eventSet.end();) { - const auto old = dynamic_pointer_cast(ptr->second); - auto &set = old->getDataSet(); - - DataSet eventSet = setEvent->getDataSet(); - bool changed = false; - - for (auto it = eventSet.begin(); it != eventSet.end();) + const auto v = oldSet.find(*it); + if (v == oldSet.end() || !v->same(*it)) { - const auto v = set.find(*it); - if (v == set.end() || !v->same(*it)) - { - it++; - } - else - { - changed = true; - eventSet.erase(it++); - } + it++; + } + else + { + changed = true; + eventSet.erase(it++); } + } - if (changed) + // If the data set has changed and been edited by delting it against the current latest. + if (changed) + { + if (!eventSet.empty()) { - if (!eventSet.empty()) - { - auto copy = dynamic_pointer_cast(setEvent->copy()); - copy->setDataSet(eventSet); - return copy; - } - else - { - return nullptr; - } + auto copy = dynamic_pointer_cast(setEvent->copy()); + copy->setDataSet(eventSet); + return copy; + } + else + { + return nullptr; } } } diff --git a/src/mtconnect/buffer/checkpoint.hpp b/src/mtconnect/buffer/checkpoint.hpp index 226b9f54..fae05755 100644 --- a/src/mtconnect/buffer/checkpoint.hpp +++ b/src/mtconnect/buffer/checkpoint.hpp @@ -48,12 +48,16 @@ namespace mtconnect::buffer { /// @brief If this is a data set event, diff the value /// @param[in] observation the data set observation + /// @param[in] old the previous value of the data set /// @return The observation or a copy if the data set changed - observation::ObservationPtr dataSetDifference(const observation::ObservationPtr &observation) const; + observation::ObservationPtr dataSetDifference( + const observation::ObservationPtr &observation, + const observation::ConstObservationPtr &old) const; /// @brief Checks if the observation is a duplicate with existing observations /// @param[in] obs the observation - /// @return `true` if the obsrvation is a duplicate + /// @return an observation, possibly changed if it is not a duplicate. `nullptr` if it is a + /// duplicate.. const observation::ObservationPtr checkDuplicate(const observation::ObservationPtr &obs) const { using namespace observation; @@ -66,9 +70,11 @@ namespace mtconnect::buffer { if (old != m_observations.end()) { auto &oldObs = old->second; + // Filter out unavailable duplicates, only allow through changed + // state. If both are unavailable, disregard. if (obs->isUnavailable() != oldObs->isUnavailable()) return obs; - else if (obs->isUnavailable()) + else if (obs->isUnavailable()) return nullptr; if (di->isCondition()) @@ -118,7 +124,7 @@ namespace mtconnect::buffer { { if (di->isDataSet()) { - return dataSetDifference(obs); + return dataSetDifference(obs, oldObs); } else { From 875a56da72465ca5d2e4ae9827ac83903e0578b4 Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Sun, 2 Apr 2023 20:05:40 -0700 Subject: [PATCH 4/4] Added tests for duplicate conditions --- src/mtconnect/buffer/checkpoint.hpp | 4 ++ test/duplicate_filter_test.cpp | 94 ++++++++++++++++++++++++++++- 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/src/mtconnect/buffer/checkpoint.hpp b/src/mtconnect/buffer/checkpoint.hpp index fae05755..7e2e6c5e 100644 --- a/src/mtconnect/buffer/checkpoint.hpp +++ b/src/mtconnect/buffer/checkpoint.hpp @@ -115,6 +115,10 @@ namespace mtconnect::buffer { return nullptr; } + else if (cond->getLevel() == Condition::NORMAL) + { + return nullptr; + } else { return obs; diff --git a/test/duplicate_filter_test.cpp b/test/duplicate_filter_test.cpp index 1385d1ac..4481aa55 100644 --- a/test/duplicate_filter_test.cpp +++ b/test/duplicate_filter_test.cpp @@ -215,6 +215,96 @@ TEST_F(DuplicateFilterTest, test_minimum_delta) } } -TEST_F(DuplicateFilterTest, test_condition_duplicates) { GTEST_SKIP(); } +TEST_F(DuplicateFilterTest, test_condition_duplicates) +{ + auto filter = make_shared(m_context); + m_mapper->bind(filter); + filter->bind(make_shared(m_context)); + + auto *contract = dynamic_cast(m_context->m_contract.get()); + makeDataItem({{"id", "c1"s}, {"type", "SYSTEM"s}, {"category", "CONDITION"s}}); + { + auto os = observe({"c1", "warning", "XXX", "100", "HIGH", "XXX Happened"}); + auto list = os->getValue(); + ASSERT_EQ(1, list.size()); + } + + { + auto os = observe({"c1", "warning", "XXX", "100", "HIGH", "XXX Happened"}); + auto list = os->getValue(); + ASSERT_EQ(0, list.size()); + } + + { + auto os = observe({"c1", "warning", "YYY", "100", "HIGH", "XXX Happened"}); + auto list = os->getValue(); + ASSERT_EQ(1, list.size()); + auto obs = contract->m_checkpoint.getObservation("c1"); + ASSERT_TRUE(obs); + auto cond = dynamic_pointer_cast(obs); + ASSERT_TRUE(cond); + ASSERT_EQ("YYY", cond->get("nativeCode")); + ASSERT_EQ(Condition::WARNING, cond->getLevel()); + auto prev = cond->getPrev(); + ASSERT_TRUE(prev); + ASSERT_FALSE(prev->getPrev()); + ASSERT_EQ("XXX", prev->get("nativeCode")); + ASSERT_EQ("100", prev->get("nativeSeverity")); + } + + { + auto os = observe({"c1", "warning", "XXX", "101", "HIGH", "XXX Happened"}); + auto list = os->getValue(); + ASSERT_EQ(1, list.size()); -TEST_F(DuplicateFilterTest, test_data_set_duplicates) { GTEST_SKIP(); } + auto obs = contract->m_checkpoint.getObservation("c1"); + ASSERT_TRUE(obs); + auto cond = dynamic_pointer_cast(obs); + ASSERT_TRUE(cond); + ASSERT_EQ("101", cond->get("nativeSeverity")); + ASSERT_EQ("XXX", cond->get("nativeCode")); + + auto prev = cond->getPrev(); + ASSERT_TRUE(prev); + ASSERT_EQ("YYY", prev->get("nativeCode")); + ASSERT_FALSE(prev->getPrev()); + } + + { + auto os = observe({"c1", "normal", "XXX", "", "", "Normal"}); + auto list = os->getValue(); + ASSERT_EQ(1, list.size()); + + auto obs = contract->m_checkpoint.getObservation("c1"); + ASSERT_TRUE(obs); + auto cond = dynamic_pointer_cast(obs); + ASSERT_EQ("YYY", cond->get("nativeCode")); + ASSERT_TRUE(cond); + ASSERT_FALSE(cond->getPrev()); + } + + { + auto os = observe({"c1", "normal", "XXX", "", "", ""}); + auto list = os->getValue(); + ASSERT_EQ(0, list.size()); + } + + { + auto os = observe({"c1", "normal", "", "", "", ""}); + auto list = os->getValue(); + ASSERT_EQ(1, list.size()); + + auto obs = contract->m_checkpoint.getObservation("c1"); + ASSERT_TRUE(obs); + auto cond = dynamic_pointer_cast(obs); + ASSERT_TRUE(cond); + ASSERT_EQ(Condition::NORMAL, cond->getLevel()); + ASSERT_FALSE(cond->getPrev()); + } + + { + auto os = observe({"c1", "normal", "", "", "", ""}); + auto list = os->getValue(); + ASSERT_EQ(0, list.size()); + } +}