Skip to content
This repository was archived by the owner on Jul 18, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/mtconnect/agent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,11 @@ namespace mtconnect {

void sourceFailed(const std::string &identity) override { m_agent->sourceFailed(identity); }

const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override
{
return m_agent->getCircularBuffer().checkDuplicate(obs);
}

protected:
Agent *m_agent;
};
Expand Down
62 changes: 31 additions & 31 deletions src/mtconnect/buffer/checkpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,52 +225,52 @@ namespace mtconnect {
}
}

bool Checkpoint::dataSetDifference(ObservationPtr obs) const
ObservationPtr Checkpoint::dataSetDifference(const ObservationPtr &obs,
const ConstObservationPtr &old) const
{
if (obs->isOrphan())
return false;
return nullptr;

auto setEvent = dynamic_pointer_cast<DataSetEvent>(obs);
auto setEvent = dynamic_pointer_cast<const DataSetEvent>(obs);
auto item = obs->getDataItem();
if (item->isDataSet() && !setEvent->getDataSet().empty() &&
!obs->hasProperty("resetTriggered"))
if (!setEvent->getDataSet().empty() && !obs->hasProperty("resetTriggered"))
{
const auto &id = item->getId();
const auto ptr = m_observations.find(id);
auto oldEvent = dynamic_pointer_cast<const DataSetEvent>(old);
auto &oldSet = oldEvent->getDataSet();
DataSet eventSet = setEvent->getDataSet();
bool changed = false;

if (ptr != m_observations.end() && !ptr->second->isUnavailable())
for (auto it = eventSet.begin(); it != eventSet.end();)
{
const auto old = dynamic_pointer_cast<DataSetEvent>(ptr->second);
auto &set = old->getDataSet();

DataSet eventSet = setEvent->getDataSet();
bool changed = false;

for (auto it = eventSet.begin(); it != eventSet.end();)
const auto v = oldSet.find(*it);
if (v == oldSet.end() || !v->same(*it))
{
const auto v = set.find(*it);
if (v == set.end() || !v->same(*it))
{
it++;
}
else
{
changed = true;
eventSet.erase(it++);
}
it++;
}

if (changed)
else
{
setEvent->setDataSet(eventSet);
changed = true;
eventSet.erase(it++);
}
}

return !eventSet.empty();
// If the data set has changed and been edited by delting it against the current latest.
if (changed)
{
if (!eventSet.empty())
{
auto copy = dynamic_pointer_cast<DataSetEvent>(setEvent->copy());
copy->setDataSet(eventSet);
return copy;
}
else
{
return nullptr;
}
}
}

return true;
return obs;
}

} // namespace buffer
} // namespace mtconnect
98 changes: 96 additions & 2 deletions src/mtconnect/buffer/checkpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,102 @@ namespace mtconnect::buffer {

/// @brief If this is a data set event, diff the value
/// @param[in] observation the data set observation
/// @return `true` if the data set changed
bool dataSetDifference(observation::ObservationPtr observation) const;
/// @param[in] old the previous value of the data set
/// @return The observation or a copy if the data set changed
observation::ObservationPtr dataSetDifference(
const observation::ObservationPtr &observation,
const observation::ConstObservationPtr &old) const;

/// @brief Checks if the observation is a duplicate with existing observations
/// @param[in] obs the observation
/// @return an observation, possibly changed if it is not a duplicate. `nullptr` if it is a
/// duplicate..
const observation::ObservationPtr checkDuplicate(const observation::ObservationPtr &obs) const
{
using namespace observation;
using namespace std;

auto di = obs->getDataItem();
const auto &id = di->getId();
auto old = m_observations.find(id);

if (old != m_observations.end())
{
auto &oldObs = old->second;
// Filter out unavailable duplicates, only allow through changed
// state. If both are unavailable, disregard.
if (obs->isUnavailable() != oldObs->isUnavailable())
return obs;
else if (obs->isUnavailable())
return nullptr;

if (di->isCondition())
{
auto *cond = dynamic_cast<Condition *>(obs.get());
auto *oldCond = dynamic_cast<Condition *>(oldObs.get());

// Check for normal resetting all conditions. If there are
// no active conditions, then this is a duplicate normal
if (cond->getLevel() == Condition::NORMAL && cond->getCode().empty())
{
if (oldCond->getLevel() == Condition::NORMAL && oldCond->getCode().empty())
return nullptr;
else
return obs;
}

// If there is already an active condition with this code,
// then check if nothing has changed between activations.
if (const auto &e = oldCond->find(cond->getCode()))
{
if (cond->getLevel() != e->getLevel())
return obs;

if ((cond->hasValue() != e->hasValue()) ||
(cond->hasValue() && cond->getValue() != e->getValue()))
return obs;

if ((cond->hasProperty("qualifier") != e->hasProperty("qualifier")) ||
(cond->hasProperty("qualifier") &&
cond->get<string>("qualifier") != e->get<string>("qualifier")))
return obs;

if ((cond->hasProperty("nativeSeverity") != e->hasProperty("nativeSeverity")) ||
(cond->hasProperty("nativeSeverity") &&
cond->get<string>("nativeSeverity") != e->get<string>("nativeSeverity")))
return obs;

return nullptr;
}
else if (cond->getLevel() == Condition::NORMAL)
{
return nullptr;
}
else
{
return obs;
}
}
else if (!di->isDiscrete())
{
if (di->isDataSet())
{
return dataSetDifference(obs, oldObs);
}
else
{
auto &value = obs->getValue();
auto &oldValue = oldObs->getValue();

if (value == oldValue)
return nullptr;
else
return obs;
}
}
}
return obs;
}

/// @brief copy another checkpoint to this checkpoint
/// @param[in] checkpoint a checkpoint to copy
Expand Down
19 changes: 9 additions & 10 deletions src/mtconnect/buffer/circular_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,6 @@ namespace mtconnect::buffer {

std::lock_guard<std::recursive_mutex> lock(m_sequenceLock);
auto dataItem = observation->getDataItem();

if (!dataItem->isDiscrete())
{
if (!observation->isUnavailable() && dataItem->isDataSet() &&
!m_latest.dataSetDifference(observation))
{
return 0;
}
}

auto seq = m_sequence;

observation->setSequence(seq);
Expand Down Expand Up @@ -173,6 +163,15 @@ namespace mtconnect::buffer {
auto getCheckpointFreq() const { return m_checkpointFreq; }
auto getCheckpointCount() const { return m_checkpointCount; }

/// @brief Check if observation is a duplicate by validating against the latest checkpoint
/// @param[in] obs the observation to check
/// @return `true` if the observation is a duplicate
const observation::ObservationPtr checkDuplicate(const observation::ObservationPtr &obs) const
{
std::lock_guard<std::recursive_mutex> lock(m_sequenceLock);
return m_latest.checkDuplicate(obs);
}

/// @brief Get a checkpoint at a sequence number
/// @param at the sequence number to get the checkpoint at
/// @param filterSet the filter to apply to the new checkpoint
Expand Down
2 changes: 1 addition & 1 deletion src/mtconnect/entity/entity.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ namespace mtconnect {
AttributeSet m_attributes;
};

/// @brief variant visitor to compare two entities for equality
/// @brief variant visitor to compare two entity parameter values for equality
struct ValueEqualVisitor
{
ValueEqualVisitor(const Value &t) : m_this(t) {}
Expand Down
1 change: 1 addition & 0 deletions src/mtconnect/entity/requirement.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
namespace mtconnect::entity {
class Entity;
using EntityPtr = std::shared_ptr<Entity>;
using ConstEntityPtr = std::shared_ptr<const Entity>;
/// @brief List of shared entities
using EntityList = std::list<std::shared_ptr<Entity>>;
/// @brief Vector of doubles
Expand Down
2 changes: 1 addition & 1 deletion src/mtconnect/mqtt/mqtt_client_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ namespace mtconnect {
/// - MqttHost, defaults to LocalHost
MqttClientImpl(boost::asio::io_context &ioContext, const ConfigOptions &options,
std::unique_ptr<ClientHandler> &&handler)
: MqttClient(ioContext, move(handler)),
: MqttClient(ioContext, std::move(handler)),
m_options(options),
m_host(GetOption<std::string>(options, configuration::MqttHost).value_or("localhost")),
m_port(GetOption<int>(options, configuration::MqttPort).value_or(1883)),
Expand Down
15 changes: 15 additions & 0 deletions src/mtconnect/observation/observation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace mtconnect::observation {

class Observation;
using ObservationPtr = std::shared_ptr<Observation>;
using ConstObservationPtr = std::shared_ptr<const Observation>;
using ObservationList = std::list<ObservationPtr>;

/// @brief Abstract observation
Expand Down Expand Up @@ -351,6 +352,20 @@ namespace mtconnect::observation {
return nullptr;
}

/// @brief const find a condition by code in the condition list
/// @param[in] code te code
/// @return shared pointer to the condition if found
const ConditionPtr find(const std::string &code) const
{
if (m_code == code)
return std::dynamic_pointer_cast<Condition>(Entity::getptr());

if (m_prev)
return m_prev->find(code);

return nullptr;
}

/// @brief replace a condition with another in the condition list
/// @param[in] old the condition to be placed
/// @param[in] _new the replacement condition
Expand Down
29 changes: 6 additions & 23 deletions src/mtconnect/pipeline/duplicate_filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,45 +34,28 @@ namespace mtconnect::pipeline {
DuplicateFilter(const DuplicateFilter &) = default;
/// @brief Create a duplicate filter with shared state from the context
/// @param context the context
DuplicateFilter(PipelineContextPtr context)
: Transform("DuplicateFilter"), m_state(context->getSharedState<State>(m_name))
DuplicateFilter(PipelineContextPtr context) : Transform("DuplicateFilter"), m_context(context)
{
using namespace observation;
static constexpr auto lambda = [](const Observation &o) {
return !o.isOrphan() && !o.getDataItem()->isDiscrete();
};
m_guard = LambdaGuard<Observation, ExactTypeGuard<Event, Sample, ThreeSpaceSample, Message>>(
lambda, RUN) ||
TypeGuard<Observation>(SKIP);
m_guard = TypeGuard<observation::Observation>(RUN);
}
~DuplicateFilter() override = default;

const entity::EntityPtr operator()(const entity::EntityPtr entity) override
{
using namespace observation;
std::lock_guard<TransformState> guard(*m_state);

auto o = std::dynamic_pointer_cast<Observation>(entity);
if (o->isOrphan())
return entity::EntityPtr();

auto di = o->getDataItem();
auto &id = di->getId();

auto &values = m_state->m_values;
auto old = values.find(id);
if (old != values.end() && old->second == o->getValue())
auto o2 = m_context->m_contract->checkDuplicate(o);
if (!o2)
return entity::EntityPtr();

if (old == values.end())
values[id] = o->getValue();
else
old->second = o->getValue();

return next(entity);
return next(o2);
}

protected:
std::shared_ptr<State> m_state;
PipelineContextPtr m_context;
};
} // namespace mtconnect::pipeline
6 changes: 6 additions & 0 deletions src/mtconnect/pipeline/pipeline_contract.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ namespace mtconnect {
class Asset;
using AssetPtr = std::shared_ptr<Asset>;
} // namespace asset
namespace observation {
class Observation;
}
using DataItemPtr = std::shared_ptr<device_model::data_item::DataItem>;
using DevicePtr = std::shared_ptr<device_model::Device>;
using ObservationPtr = std::shared_ptr<observation::Observation>;
using StringList = std::list<std::string>;

namespace observation {
Expand Down Expand Up @@ -96,6 +100,8 @@ namespace mtconnect {
/// @brief The source is no longer viable, do not try to reconnect
/// @param[in] identity the identity of the source
virtual void sourceFailed(const std::string &identity) = 0;

virtual const ObservationPtr checkDuplicate(const ObservationPtr &obs) const = 0;
};
} // namespace pipeline
} // namespace mtconnect
9 changes: 5 additions & 4 deletions src/mtconnect/printer/json_printer_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,13 +363,14 @@ namespace mtconnect::printer {
using ObjectPtr = std::optional<ObjectType>;
/// @brief alias for a smart pointer to an AutoJsonArray type for the WriterType
using ArrayPtr = std::optional<ArrayType>;

/// @brief Structure that holds either an object or an array
struct StackMember {
struct StackMember
{
ObjectPtr m_object;
ArrayPtr m_array;
};

/// @brief alias for a list of variants
using Stack = std::list<StackMember>;

Expand Down Expand Up @@ -398,7 +399,7 @@ namespace mtconnect::printer {
auto &member = m_stack.emplace_back();
member.m_array.emplace(base::m_writer);
}

/// @brief Closes open objects and arrays, stopping when the size is `<=` to `to`
/// @param[in] to stop when the size of the stack is equal to `to`, defaults to `0`
void clear(size_t to = 0)
Expand Down
Loading