Skip to content
This repository was archived by the owner on Jul 18, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ set(AGENT_VERSION_MAJOR 2)
set(AGENT_VERSION_MINOR 0)
set(AGENT_VERSION_PATCH 0)
set(AGENT_VERSION_BUILD 12)
set(AGENT_VERSION_RC "_RC24")
set(AGENT_VERSION_RC "_RC25")

# This minimum version is to support Visual Studio 2017 and C++ feature checking and FetchContent
cmake_minimum_required(VERSION 3.16 FATAL_ERROR)
Expand Down
88 changes: 58 additions & 30 deletions src/agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <boost/range/any_range.hpp>
#include <boost/range/functions.hpp>
#include <boost/range/metafunctions.hpp>
#include <boost/range/numeric.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
Expand Down Expand Up @@ -76,9 +77,8 @@ namespace mtconnect {
m_context(context),
m_strand(m_context),
m_xmlParser(make_unique<parser::XmlParser>()),
m_version(
GetOption<string>(options, config::SchemaVersion)
.value_or(to_string(AGENT_VERSION_MAJOR) + "." + to_string(AGENT_VERSION_MINOR))),
m_schemaVersion(
GetOption<string>(options, config::SchemaVersion)),
m_deviceXmlPath(deviceXmlPath),
m_circularBuffer(GetOption<int>(options, config::BufferSize).value_or(17),
GetOption<int>(options, config::CheckpointFrequency).value_or(1000)),
Expand All @@ -102,8 +102,14 @@ namespace mtconnect {
uint32_t(GetOption<int>(options, mtconnect::configuration::JsonVersion).value_or(2));

// Create the Printers
m_printers["xml"] = make_unique<printer::XmlPrinter>(m_version, m_pretty);
m_printers["json"] = make_unique<printer::JsonPrinter>(jsonVersion, m_version, m_pretty);
m_printers["xml"] = make_unique<printer::XmlPrinter>(m_pretty);
m_printers["json"] = make_unique<printer::JsonPrinter>(jsonVersion, m_pretty);

if (m_schemaVersion)
{
for (auto &[k, pr] : m_printers)
pr->setSchemaVersion(*m_schemaVersion);
}
}

void Agent::initialize(pipeline::PipelineContextPtr context)
Expand All @@ -114,19 +120,27 @@ namespace mtconnect {
m_loopback =
std::make_shared<source::LoopbackSource>("AgentSource", m_strand, context, m_options);

int major, minor;
char c;
stringstream vstr(m_version);
vstr >> major >> c >> minor;
auto devices = loadXMLDeviceFile(m_deviceXmlPath);
if (!m_schemaVersion)
{
m_schemaVersion.emplace(StrDefaultSchemaVersion());
}

auto version = IntSchemaVersion(*m_schemaVersion);
for (auto &[k, pr] : m_printers)
pr->setSchemaVersion(*m_schemaVersion);

auto disableAgentDevice = GetOption<bool>(m_options, config::DisableAgentDevice);
if (!(disableAgentDevice && *disableAgentDevice) && (major > 1 || (major == 1 && minor >= 7)))
if (!(disableAgentDevice && *disableAgentDevice) && version >= SCHEMA_VERSION(1, 7))
{
createAgentDevice();
}
loadXMLDeviceFile(m_deviceXmlPath);
loadCachedProbe();

// For the DeviceAdded event for each device
for (auto device : devices)
addDevice(device);

loadCachedProbe();
m_initialized = true;
}

Expand Down Expand Up @@ -277,13 +291,21 @@ namespace mtconnect {
}
}

void Agent::reloadDevices(const std::string &deviceFile)
bool Agent::reloadDevices(const std::string &deviceFile)
{
try
{
// Load the configuration for the Agent
auto devices = m_xmlParser->parseFile(
deviceFile, dynamic_cast<printer::XmlPrinter *>(m_printers["xml"].get()));

if (m_xmlParser->getSchemaVersion() &&
IntSchemaVersion(*m_xmlParser->getSchemaVersion()) != IntSchemaVersion(*m_schemaVersion))
{
LOG(info) << "Got version: " << *(m_xmlParser->getSchemaVersion());
LOG(warning) << "Schema version does not match agent schema version, restarting the agent";
return false;
}

// Fir the DeviceAdded event for each device
bool changed = false;
Expand All @@ -293,6 +315,8 @@ namespace mtconnect {
}
if (changed)
loadCachedProbe();

return true;
}
catch (runtime_error &e)
{
Expand Down Expand Up @@ -430,7 +454,7 @@ namespace mtconnect {
if (!fs::exists(backup))
fs::rename(file, backup);

printer::XmlPrinter printer(m_version, true);
printer::XmlPrinter printer(true);

std::list<DevicePtr> list;
copy_if(m_deviceIndex.begin(), m_deviceIndex.end(), back_inserter(list),
Expand Down Expand Up @@ -576,7 +600,7 @@ namespace mtconnect {

// Create the Agent Device
ErrorList errors;
Properties ps {{"uuid", uuid}, {"id", id}, {"name", "Agent"s}, {"mtconnectVersion", m_version}};
Properties ps {{"uuid", uuid}, {"id", id}, {"name", "Agent"s}, {"mtconnectVersion", *m_schemaVersion}};
m_agentDevice =
dynamic_pointer_cast<AgentDevice>(AgentDevice::getFactory()->make("Agent", ps, errors));
if (!errors.empty())
Expand All @@ -592,7 +616,7 @@ namespace mtconnect {
// Device management and Initialization
// ----------------------------------------------

void Agent::loadXMLDeviceFile(const std::string &configXmlPath)
std::list<device_model::DevicePtr> Agent::loadXMLDeviceFile(const std::string &configXmlPath)
{
NAMED_SCOPE("Agent::loadXMLDeviceFile");

Expand All @@ -602,9 +626,16 @@ namespace mtconnect {
auto devices = m_xmlParser->parseFile(
configXmlPath, dynamic_cast<printer::XmlPrinter *>(m_printers["xml"].get()));

// Fir the DeviceAdded event for each device
for (auto device : devices)
addDevice(device);
if (!m_schemaVersion && m_xmlParser->getSchemaVersion())
{
m_schemaVersion = m_xmlParser->getSchemaVersion();
}
else if (!m_schemaVersion && !m_xmlParser->getSchemaVersion())
{
m_schemaVersion = StrDefaultSchemaVersion();
}

return devices;
}
catch (runtime_error &e)
{
Expand All @@ -620,14 +651,15 @@ namespace mtconnect {
cerr << f.what() << endl;
throw f;
}

return {};
}

void Agent::verifyDevice(DevicePtr device)
{
NAMED_SCOPE("Agent::verifyDevice");

auto xmlPrinter = dynamic_cast<printer::XmlPrinter *>(m_printers["xml"].get());
const auto &schemaVersion = xmlPrinter->getSchemaVersion();

auto version = IntSchemaVersion(*m_schemaVersion);

// Add the devices to the device map and create availability and
// asset changed events if they don't exist
Expand All @@ -644,11 +676,7 @@ namespace mtconnect {
device->addDataItem(di, errors);
}

int major, minor;
char c;
stringstream ss(schemaVersion);
ss >> major >> c >> minor;
if (!device->getAssetChanged() && (major > 1 || (major == 1 && minor >= 2)))
if (!device->getAssetChanged() && version >= SCHEMA_VERSION(1, 2))
{
entity::ErrorList errors;
// Create asset change data item and add it to the device.
Expand All @@ -659,14 +687,14 @@ namespace mtconnect {
device->addDataItem(di, errors);
}

if (device->getAssetChanged() && (major > 1 || (major == 1 && minor >= 5)))
if (device->getAssetChanged() && version >= SCHEMA_VERSION(1, 5))
{
auto di = device->getAssetChanged();
if (!di->isDiscrete())
di->makeDiscrete();
}

if (!device->getAssetRemoved() && (major > 1 || (major == 1 && minor >= 3)))
if (!device->getAssetRemoved() && version >= SCHEMA_VERSION(1, 3))
{
// Create asset removed data item and add it to the device.
entity::ErrorList errors;
Expand All @@ -677,7 +705,7 @@ namespace mtconnect {
device->addDataItem(di, errors);
}

if (!device->getAssetCount() && (major >= 2))
if (!device->getAssetCount() && version >= SCHEMA_VERSION(2, 0))
{
entity::ErrorList errors;
auto di = DataItem::make({{"type", "ASSET_COUNT"s},
Expand Down
8 changes: 5 additions & 3 deletions src/agent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ namespace mtconnect {
const auto &getSources() const { return m_sources; }
const auto &getSinks() const { return m_sinks; }

const auto &getSchemaVersion() const { return m_schemaVersion; }

// Get device from device map
DevicePtr getDeviceByName(const std::string &name);
DevicePtr getDeviceByName(const std::string &name) const;
Expand Down Expand Up @@ -157,7 +159,7 @@ namespace mtconnect {
// Add the a device from a configuration file
void addDevice(DevicePtr device);
void deviceChanged(DevicePtr device, const std::string &oldUuid, const std::string &oldName);
void reloadDevices(const std::string &deviceFile);
bool reloadDevices(const std::string &deviceFile);

// Message when adapter has connected and disconnected
void connecting(const std::string &adapter);
Expand Down Expand Up @@ -221,7 +223,7 @@ namespace mtconnect {

// Initialization methods
void createAgentDevice();
void loadXMLDeviceFile(const std::string &config);
std::list<device_model::DevicePtr> loadXMLDeviceFile(const std::string &config);
void verifyDevice(DevicePtr device);
void initializeDataItems(DevicePtr device,
std::optional<std::set<std::string>> skip = std::nullopt);
Expand Down Expand Up @@ -296,7 +298,7 @@ namespace mtconnect {
std::unordered_map<std::string, WeakDataItemPtr> m_dataItemMap;

// Xml Config
std::string m_version;
std::optional<std::string> m_schemaVersion;
std::string m_deviceXmlPath;
bool m_versionDeviceXml = false;

Expand Down
58 changes: 44 additions & 14 deletions src/configuration/agent_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <boost/log/utility/setup/common_attributes.hpp>
#include <boost/log/utility/setup/console.hpp>
#include <boost/log/utility/setup/file.hpp>
#include <boost/filesystem.hpp>

#ifdef __APPLE__
#include <mach-o/dyld.h>
Expand Down Expand Up @@ -85,6 +86,7 @@

using namespace std;
namespace fs = std::filesystem;
namespace bfs = boost::filesystem;
namespace pt = boost::property_tree;
namespace logr = boost::log;
namespace dll = boost::dll;
Expand Down Expand Up @@ -276,18 +278,34 @@ namespace mtconnect::configuration {
LOG(warning)
<< "Detected change in configuration files. Will reload when youngest file is at least "
<< m_monitorDelay.count() << " seconds old";

if (devTime != *m_deviceTime)
{
auto t = bfs::last_write_time(m_devicesFile);
LOG(warning) << "Dected change in Devices file: " << m_devicesFile;
LOG(warning) << "... File changed at: " << put_time(localtime(&t), "%F %T");
}

if (cfgTime != *m_configTime)
{
auto t = bfs::last_write_time(m_devicesFile);
LOG(warning) << "Dected change in Config file: " << m_configFile;
LOG(warning) << "... File changed at: " << put_time(localtime(&t), "%F %T");
}

auto delta = min(now - cfgTime, now - devTime);
if (delta < m_monitorDelay)
{
LOG(warning) << "Changed, waiting " << int32_t((m_monitorDelay - delta).count());
LOG(warning) << "... Waiting " << int32_t((m_monitorDelay - delta).count()) << " seconds";
scheduleMonitorTimer();
}
else
{
if (cfgTime != *m_configTime)
{
LOG(warning)
<< "Monitor thread has detected change in configuration files, restarting agent.";
<< "Monitor thread has detected change in configuration files.";
LOG(warning) << ".... Restarting agent: " << m_configFile;

m_agent->stop();

Expand All @@ -311,21 +329,34 @@ namespace mtconnect::configuration {
}
},
true);

return;
}

// Handle device changed by delivering the device file to the agent
if (devTime != *m_deviceTime)
else if (devTime != *m_deviceTime)
{
// Handle device changed by delivering the device file to the agent
LOG(warning) << "Monitor thread has detected change in devices files.";
LOG(warning) << "... Reloading Devices File: " << m_devicesFile;

m_context->pause([this](AsyncContext &context) {
m_agent->reloadDevices(m_devicesFile);
m_deviceTime.reset();
if (!m_agent->reloadDevices(m_devicesFile))
{
m_configTime.emplace(m_configTime->min());
using namespace chrono;
using namespace chrono_literals;

using boost::placeholders::_1;

m_monitorTimer.expires_from_now(100ms);
m_monitorTimer.async_wait(boost::bind(&AgentConfiguration::monitorFiles, this, _1));
}
else
{
m_deviceTime.reset();
scheduleMonitorTimer();
}
});
}
}

scheduleMonitorTimer();
return;
}

Expand Down Expand Up @@ -624,8 +655,7 @@ namespace mtconnect::configuration {
{configuration::MaxCachedFileSize, "20k"s},
{configuration::MinCompressFileSize, "100k"s},
{configuration::ServiceName, "MTConnect Agent"s},
{configuration::SchemaVersion,
to_string(AGENT_VERSION_MAJOR) + "."s + to_string(AGENT_VERSION_MINOR)},
{configuration::SchemaVersion, ""s},
{configuration::LogStreams, false},
{configuration::ShdrVersion, 1},
{configuration::WorkerThreads, 1},
Expand Down Expand Up @@ -685,20 +715,20 @@ namespace mtconnect::configuration {
}

// Check for schema version
m_version = get<string>(options[configuration::SchemaVersion]);
auto port = get<int>(options[configuration::Port]);
LOG(info) << "Starting agent on port " << int(port);

// Make the Agent
m_agent = make_unique<Agent>(getAsyncContext(), m_devicesFile, options);

// Make the PipelineContext
m_pipelineContext = std::make_shared<pipeline::PipelineContext>();
m_pipelineContext->m_contract = m_agent->makePipelineContract();

loadSinks(config, options);

m_agent->initialize(m_pipelineContext);
m_version = *m_agent->getSchemaVersion();

DevicePtr device;
if (get<bool>(options[configuration::PreserveUUID]))
Expand Down
2 changes: 1 addition & 1 deletion src/mqtt/mqtt_server_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ namespace mtconnect {
// It makes sure wp.lock() never return nullptr in the handlers below
// including close_handler and error_handler.
ep.start_session(std::make_tuple(std::move(spep), std::move(g)));
ep.set_connect_handler([this, &server, wp](MQTT_NS::buffer client_id,
ep.set_connect_handler([this, wp](MQTT_NS::buffer client_id,
MQTT_NS::optional<MQTT_NS::buffer> username,
MQTT_NS::optional<MQTT_NS::buffer> password,
MQTT_NS::optional<MQTT_NS::will>,
Expand Down
Loading