Skip to content
This repository was archived by the owner on Oct 28, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- Changed: [#5568](https://github.com/ethereum/aleth/pull/5568) Improve rlpx handshake log messages and create new rlpx log channel.
- Changed: [#5576](https://github.com/ethereum/aleth/pull/5576) Moved sstore_combinations and static_Call50000_sha256 tests to stTimeConsuming test suite. (testeth runs them only with `--all` flag)
- Changed: [#5589](https://github.com/ethereum/aleth/pull/5589) Make aleth output always line-buffered even when redirected to file or pipe.
- Changed: [#5602](https://github.com/ethereum/aleth/pull/5602) Better predicting external IP address and UDP port.
- Fixed: [#5562](https://github.com/ethereum/aleth/pull/5562) Don't send header request messages to peers that haven't sent us Status yet.
- Fixed: [#5581](https://github.com/ethereum/aleth/pull/5581) Fixed finding neighbour nodes in Discovery.

Expand Down
75 changes: 75 additions & 0 deletions libp2p/EndpointTracker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Aleth: Ethereum C++ client, tools and libraries.
// Copyright 2019 Aleth Authors.
// Licensed under the GNU General Public License, Version 3.

#include "EndpointTracker.h"

namespace dev
{
namespace p2p
{
/// Register the statement about endpoint from one othe peers.
/// @returns number of currently kept statements in favor of @a _externalEndpoint
size_t EndpointTracker::addEndpointStatement(
bi::udp::endpoint const& _sourceEndpoint, bi::udp::endpoint const& _externalEndpoint)
{
// remove previous statement by this peer
auto it = m_statementsMap.find(_sourceEndpoint);
if (it != m_statementsMap.end())
removeStatement(it);

return addStatement(_sourceEndpoint, _externalEndpoint);
}

/// Find endpoint with max number of statemens
bi::udp::endpoint EndpointTracker::bestEndpoint() const
{
if (m_endpointStatementCountMap.empty())
return {};

// find endpoint with max count
auto itMax =
std::max_element(m_endpointStatementCountMap.begin(), m_endpointStatementCountMap.end(),
[](std::pair<bi::udp::endpoint const, size_t> const& _endpointAndCount1,
std::pair<bi::udp::endpoint const, size_t> const& _endpointAndCount2) {
return _endpointAndCount1.second < _endpointAndCount2.second;
});

return itMax->first;
}

/// Remove old statements
void EndpointTracker::garbageCollectStatements(std::chrono::seconds const& _timeToLive)
{
auto const expiration = std::chrono::steady_clock::now() - _timeToLive;
for (auto it = m_statementsMap.begin(); it != m_statementsMap.end();)
{
if (it->second.second < expiration)
it = removeStatement(it);
else
++it;
}
}

size_t EndpointTracker::addStatement(
bi::udp::endpoint const& _sourceEndpoint, bi::udp::endpoint const& _externalEndpoint)
{
EndpointAndTimePoint endpointAndTime{_externalEndpoint, std::chrono::steady_clock::now()};
m_statementsMap.insert({_sourceEndpoint, endpointAndTime});
return ++m_endpointStatementCountMap[_externalEndpoint];
}

EndpointTracker::SourceToStatementMap::iterator EndpointTracker::removeStatement(
SourceToStatementMap::iterator _it)
{
// first decrement statement counter
auto itCount = m_endpointStatementCountMap.find(_it->second.first);
assert(itCount != m_endpointStatementCountMap.end() && itCount->second > 0);
if (--itCount->second == 0)
m_endpointStatementCountMap.erase(itCount);

return m_statementsMap.erase(_it);
}

} // namespace p2p
} // namespace dev
47 changes: 47 additions & 0 deletions libp2p/EndpointTracker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Aleth: Ethereum C++ client, tools and libraries.
// Copyright 2019 Aleth Authors.
// Licensed under the GNU General Public License, Version 3.

#pragma once

#include "Common.h"

namespace dev
{
namespace p2p
{
/// Class for keeping track of our external endpoint as seen by our peers.
/// Keeps track of what external endpoint is seen by every peer
/// and finds which endpoint is reported most often
class EndpointTracker
{
public:
/// Register the statement about endpoint from one of the peers.
/// @returns number of currently kept statements in favor of @a _externalEndpoint
size_t addEndpointStatement(
bi::udp::endpoint const& _sourceEndpoint, bi::udp::endpoint const& _externalEndpoint);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) typo (statemens)

/// Find endpoint with max number of statements
bi::udp::endpoint bestEndpoint() const;

/// Remove statements older than _timeToLive
void garbageCollectStatements(std::chrono::seconds const& _timeToLive);

private:
using EndpointAndTimePoint =
std::pair<bi::udp::endpoint, std::chrono::steady_clock::time_point>;
using SourceToStatementMap = std::map<bi::udp::endpoint, EndpointAndTimePoint>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be unsorted map.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boost doesn't have a hashing function defined for udp::endpoint (but has operator<), so I used map to avoid defining it myself

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you check in the latest boost version? If not there we should report a bug.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it's a bug, maybe not every type should define hashing function?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like good candidate for being a key in a hash map.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link

@nbougalis nbougalis Nov 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resurrecting this from the dead, just to say that I'm surprised that boostorg/asio#245 got no responses so far. We have been carrying custom hashing support for these types in rippled so it's been "out of sight, out of mind" for that codebase, but this really is something that Boost should provide.

I'm happy to have Ripple sponsor a bounty to get support coded up, or I can have someone from my team do it if nobody else is interested.


size_t addStatement(
bi::udp::endpoint const& _sourceEndpoint, bi::udp::endpoint const& _externalEndpoint);

SourceToStatementMap::iterator removeStatement(SourceToStatementMap::iterator _it);

/// Statements about our external endpoint, maps statement source peer => endpoint, timestamp
SourceToStatementMap m_statementsMap;
/// map external endpoint => how many sources reported it
std::map<bi::udp::endpoint, size_t> m_endpointStatementCountMap;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be unsorted map.

};

} // namespace p2p
} // namespace dev
93 changes: 67 additions & 26 deletions libp2p/NodeTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ BOOST_LOG_INLINE_GLOBAL_LOGGER_CTOR_ARGS(g_discoveryWarnLogger,

// Cadence at which we timeout sent pings and evict unresponsive nodes
constexpr chrono::milliseconds c_handleTimeoutsIntervalMs{5000};
// Cadence at which we remove old records from EndpointTracker
constexpr chrono::milliseconds c_removeOldEndpointStatementsIntervalMs{5000};
// Change external endpoint after this number of peers report new one
constexpr size_t c_minEndpointTrackStatements{10};
// Interval during which each endpoint statement is kept
constexpr std::chrono::minutes c_endpointStatementTimeToLiveMin{5};

} // namespace

Expand All @@ -35,6 +41,7 @@ NodeTable::NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint
ENR const& _enr, bool _enabled, bool _allowLocalDiscovery)
: m_hostNodeID{_alias.pub()},
m_hostNodeIDHash{sha3(m_hostNodeID)},
m_hostStaticIP{isAllowedEndpoint(_endpoint) ? _endpoint.address() : bi::address{}},
m_hostNodeEndpoint{_endpoint},
m_hostENR{_enr},
m_secret{_alias.secret()},
Expand All @@ -44,6 +51,7 @@ NodeTable::NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint
m_allowLocalDiscovery{_allowLocalDiscovery},
m_discoveryTimer{make_shared<ba::steady_timer>(_io)},
m_timeoutsTimer{make_shared<ba::steady_timer>(_io)},
m_endpointTrackingTimer{make_shared<ba::steady_timer>(_io)},
m_io{_io}
{
for (unsigned i = 0; i < s_bins; i++)
Expand All @@ -60,6 +68,7 @@ NodeTable::NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint
m_socket->connect();
doDiscovery();
doHandleTimeouts();
doEndpointTracking();
}
catch (exception const& _e)
{
Expand Down Expand Up @@ -484,6 +493,7 @@ void NodeTable::onPacketReceived(
}
}


shared_ptr<NodeEntry> NodeTable::handlePong(
bi::udp::endpoint const& _from, DiscoveryDatagram const& _packet)
{
Expand Down Expand Up @@ -536,13 +546,20 @@ shared_ptr<NodeEntry> NodeTable::handlePong(

m_sentPings.erase(_from);

// update our endpoint address and UDP port
DEV_GUARDED(x_nodes)
// update our external endpoint address and UDP port
if (m_endpointTracker.addEndpointStatement(_from, pong.destination) >=
c_minEndpointTrackStatements)
{
if ((!m_hostNodeEndpoint || !isAllowedEndpoint(m_hostNodeEndpoint)) &&
isPublicAddress(pong.destination.address()))
m_hostNodeEndpoint.setAddress(pong.destination.address());
m_hostNodeEndpoint.setUdpPort(pong.destination.udpPort());
auto newUdpEndpoint = m_endpointTracker.bestEndpoint();
if (!m_hostStaticIP.is_unspecified())
newUdpEndpoint.address(m_hostStaticIP);

if (newUdpEndpoint != m_hostNodeEndpoint)
{
m_hostNodeEndpoint = NodeIPEndpoint{
newUdpEndpoint.address(), newUdpEndpoint.port(), m_hostNodeEndpoint.tcpPort()};
LOG(m_logger) << "New external endpoint found: " << m_hostNodeEndpoint;
}
}

return sourceNodeEntry;
Expand Down Expand Up @@ -735,7 +752,7 @@ void NodeTable::doDiscovery()
if (_ec.value() == boost::asio::error::operation_aborted ||
discoveryTimer->expires_at() == c_steadyClockMin)
{
clog(VerbosityDebug, "discov") << "Discovery timer was probably cancelled";
clog(VerbosityDebug, "discov") << "Discovery timer was cancelled";
return;
}
else if (_ec)
Expand All @@ -755,24 +772,7 @@ void NodeTable::doDiscovery()

void NodeTable::doHandleTimeouts()
{
m_timeoutsTimer->expires_from_now(c_handleTimeoutsIntervalMs);
auto timeoutsTimer{m_timeoutsTimer};
m_timeoutsTimer->async_wait([this, timeoutsTimer](boost::system::error_code const& _ec) {
// We can't use m_logger if an error occurred because captured this might be already
// destroyed
if (_ec.value() == boost::asio::error::operation_aborted ||
timeoutsTimer->expires_at() == c_steadyClockMin)
{
clog(VerbosityDebug, "discov") << "evictions timer was probably cancelled";
return;
}
else if (_ec)
{
clog(VerbosityDebug, "discov")
<< "evictions timer encountered an error: " << _ec.value() << " " << _ec.message();
return;
}

runBackgroundTask(c_handleTimeoutsIntervalMs, m_timeoutsTimer, [this]() {
vector<shared_ptr<NodeEntry>> nodesToActivate;
for (auto it = m_sentPings.begin(); it != m_sentPings.end();)
{
Expand All @@ -796,11 +796,52 @@ void NodeTable::doHandleTimeouts()
// activate replacement nodes and put them into buckets
for (auto const& n : nodesToActivate)
noteActiveNode(n);
});
}

doHandleTimeouts();
void NodeTable::doEndpointTracking()
{
runBackgroundTask(c_removeOldEndpointStatementsIntervalMs, m_endpointTrackingTimer,
[this]() { m_endpointTracker.garbageCollectStatements(c_endpointStatementTimeToLiveMin); });
}

void NodeTable::runBackgroundTask(std::chrono::milliseconds const& _period,
std::shared_ptr<ba::steady_timer> _timer, std::function<void()> _f)
{
_timer->expires_from_now(_period);
_timer->async_wait([=](boost::system::error_code const& _ec) {
// We can't use m_logger if an error occurred because captured this might be already
// destroyed
if (_ec.value() == boost::asio::error::operation_aborted ||
_timer->expires_at() == c_steadyClockMin)
{
clog(VerbosityDebug, "discov") << "Timer was cancelled";
return;
}
else if (_ec)
{
clog(VerbosityDebug, "discov")
<< "Timer error detected: " << _ec.value() << " " << _ec.message();
return;
}

_f();

runBackgroundTask(_period, move(_timer), move(_f));
});
}

void NodeTable::cancelTimer(std::shared_ptr<ba::steady_timer> _timer)
{
// We "cancel" the timers by setting c_steadyClockMin rather than calling cancel()
// because cancel won't set the boost error code if the timers have already expired and
// the handlers are in the ready queue.
//
// Note that we "cancel" via io_service::post to ensure thread safety when accessing the
// timers
m_io.post([_timer] { _timer->expires_at(c_steadyClockMin); });
}

unique_ptr<DiscoveryDatagram> DiscoveryDatagram::interpretUDP(bi::udp::endpoint const& _from, bytesConstRef _packet)
{
unique_ptr<DiscoveryDatagram> decoded;
Expand Down
39 changes: 23 additions & 16 deletions libp2p/NodeTable.h
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
// Aleth: Ethereum C++ client, tools and libraries.
// Copyright 2018 Aleth Authors.
// Copyright 2019 Aleth Authors.
// Licensed under the GNU General Public License, Version 3.

#pragma once

#include <algorithm>

#include <boost/integer/static_log2.hpp>

#include "Common.h"
#include "ENR.h"
#include "EndpointTracker.h"
#include <libp2p/UDP.h>
#include <boost/integer/static_log2.hpp>
#include <algorithm>

namespace dev
{
Expand Down Expand Up @@ -127,16 +126,9 @@ class NodeTable : UDPSocketEvents
{
if (m_socket->isOpen())
{
// We "cancel" the timers by setting c_steadyClockMin rather than calling cancel()
// because cancel won't set the boost error code if the timers have already expired and
// the handlers are in the ready queue.
//
// Note that we "cancel" via io_service::post to ensure thread safety when accessing the
// timers
auto discoveryTimer{m_discoveryTimer};
m_io.post([discoveryTimer] { discoveryTimer->expires_at(c_steadyClockMin); });
auto timeoutsTimer{m_timeoutsTimer};
m_io.post([timeoutsTimer] { timeoutsTimer->expires_at(c_steadyClockMin); });
cancelTimer(m_discoveryTimer);
cancelTimer(m_timeoutsTimer);
cancelTimer(m_endpointTrackingTimer);
m_socket->disconnect();
}
}
Expand Down Expand Up @@ -184,7 +176,13 @@ class NodeTable : UDPSocketEvents
/// Returns the Node to the corresponding node id or the empty Node if that id is not found.
Node node(NodeID const& _id);

// protected only for derived classes in tests

void runBackgroundTask(std::chrono::milliseconds const& _period,
std::shared_ptr<ba::steady_timer> _timer, std::function<void()> _f);

void cancelTimer(std::shared_ptr<ba::steady_timer> _timer);

// protected only for derived classes in tests
protected:
/**
* NodeValidation is used to record information about the nodes that we have sent Ping to.
Expand Down Expand Up @@ -312,6 +310,9 @@ class NodeTable : UDPSocketEvents
/// bring in their replacements
void doHandleTimeouts();

// Remove old records in m_endpointTracker.
void doEndpointTracking();

// Useful only for tests.
void setRequestTimeToLive(std::chrono::seconds const& _time) { m_requestTimeToLive = _time; }
uint32_t nextRequestExpirationTime() const
Expand All @@ -329,6 +330,9 @@ class NodeTable : UDPSocketEvents

NodeID const m_hostNodeID;
h256 const m_hostNodeIDHash;
// Host IP address given to constructor
bi::address const m_hostStaticIP;
// Dynamically updated host endpoint
NodeIPEndpoint m_hostNodeEndpoint;
ENR const m_hostENR;
Secret m_secret; ///< This nodes secret key.
Expand Down Expand Up @@ -358,8 +362,11 @@ class NodeTable : UDPSocketEvents

bool m_allowLocalDiscovery; ///< Allow nodes with local addresses to be included in the discovery process

EndpointTracker m_endpointTracker;

std::shared_ptr<ba::steady_timer> m_discoveryTimer;
std::shared_ptr<ba::steady_timer> m_timeoutsTimer;
std::shared_ptr<ba::steady_timer> m_endpointTrackingTimer;

ba::io_service& m_io;
};
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ set(unittest_sources

unittests/libp2p/capability.cpp
unittests/libp2p/eip-8.cpp
unittests/libp2p/EndpointTrackerTest.cpp
unittests/libp2p/ENRTest.cpp
unittests/libp2p/rlpx.cpp

Expand Down
Loading