Skip to content
This repository was archived by the owner on Oct 28, 2021. It is now read-only.

Commit 5d2ac04

Browse files
committed
Only send new blocks in certain cases
Don't send new blocks if we're not syncing or we're too far behind. This new code follows the logic in EthereumCapability::maintainBlockHashes. Also make some global constants constexpr and rename some constants for consistency reasons.
1 parent e390d3c commit 5d2ac04

File tree

6 files changed

+65
-43
lines changed

6 files changed

+65
-43
lines changed

libdevcore/Common.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,6 @@ mt19937_64 g_randomGenerator(random_device{}());
140140

141141
int randomNumber(int _min, int _max)
142142
{
143-
return std::uniform_int_distribution<int>{_min, _max}(g_randomGenerator);
143+
return uniform_int_distribution<int>{_min, _max}(g_randomGenerator);
144144
}
145145
} // namespace dev

libethereum/BlockChain.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -428,12 +428,12 @@ tuple<ImportRoute, bool, unsigned> BlockChain::sync(
428428
_bq.drain(blocks, _max);
429429

430430
h256s badBlockHashes;
431-
std::tuple<ImportRoute, unsigned> importResult = sync(blocks, badBlockHashes, _stateDB);
432-
bool moreBlocks = _bq.doneDrain(badBlockHashes);
431+
tuple<ImportRoute, unsigned> const importResult = sync(blocks, badBlockHashes, _stateDB);
432+
bool const moreBlocks = _bq.doneDrain(badBlockHashes);
433433
return {get<0>(importResult), moreBlocks, get<1>(importResult)};
434434
}
435435

436-
std::tuple<ImportRoute, unsigned> BlockChain::sync(
436+
tuple<ImportRoute, unsigned> BlockChain::sync(
437437
VerifiedBlocks const& _blocks, h256s& _badBlockHashes, OverlayDB const& _stateDB)
438438
{
439439
h256s fresh;

libethereum/BlockChain.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,15 @@ class BlockChain
101101
void process();
102102

103103
/// Sync the chain with any incoming blocks. All blocks should, if processed in order.
104-
/// @returns fresh blocks, dead blocks and true iff there are additional blocks to be processed
105-
/// waiting.
104+
/// @returns tuple with three members - the first (ImportRoute) contains hashes of fresh blocks
105+
/// and dead blocks as well as a list of imported transactions. The second is a bool which is
106+
/// true iff there are additional blocks to be processed. The third is the imported block count.
106107
std::tuple<ImportRoute, bool, unsigned> sync(
107108
BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max);
109+
110+
/// Import the supplied blocks into the chain. Blocks should be processed in order.
111+
/// @returns a tuple with two members - the first (ImportRoute) contains fresh blocks, dead
112+
/// blocks and imported transactions. The second contains the imported block count.
108113
std::tuple<ImportRoute, unsigned> sync(
109114
VerifiedBlocks const& _blocks, h256s& _badBlockHashes, OverlayDB const& _stateDB);
110115

libethereum/Client.cpp

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ static_assert(BOOST_VERSION >= 106400, "Wrong boost headers version");
4343

4444
namespace
4545
{
46+
unsigned constexpr c_syncMin = 1;
47+
unsigned constexpr c_syncMax = 1000;
48+
double constexpr c_targetDuration = 1;
49+
4650
std::string filtersToString(h256Hash const& _fs)
4751
{
4852
std::stringstream str;
@@ -377,10 +381,6 @@ void Client::appendFromBlock(h256 const& _block, BlockPolarity _polarity, h256Ha
377381
}
378382
}
379383

380-
unsigned static const c_syncMin = 1;
381-
unsigned static const c_syncMax = 1000;
382-
double static const c_targetDuration = 1;
383-
384384
void Client::syncBlockQueue()
385385
{
386386
// cdebug << "syncBlockQueue()";
@@ -389,18 +389,14 @@ void Client::syncBlockQueue()
389389
unsigned count;
390390
Timer t;
391391

392-
std::shared_ptr<VerifiedBlocks> verifiedBlocks = std::make_shared<VerifiedBlocks>();
392+
shared_ptr<VerifiedBlocks> verifiedBlocks = make_shared<VerifiedBlocks>();
393393
m_bq.drain(*verifiedBlocks, m_syncAmount);
394394

395-
// Propagate new blocks to peers before importing them into the chain
395+
// Propagate new blocks to peers before importing them into the chain.
396396
auto h = m_host.lock();
397-
if (!h)
398-
{
399-
// TODO: Can we ever hit this?
400-
LOG(m_logger) << "Host unavailable??";
401-
return;
402-
}
403-
h->propagateBlocks(verifiedBlocks);
397+
assert(h); // capability is owned by Host and should be available for the duration of the
398+
// Client's lifetime
399+
h->propagateNewBlocks(verifiedBlocks);
404400

405401
h256s badBlockHashes;
406402
tie(ir, count) = bc().sync(*verifiedBlocks, badBlockHashes, m_stateDB);

libethereum/EthereumCapability.cpp

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ std::chrono::milliseconds constexpr EthereumCapability::c_backgroundWorkInterval
2626

2727
namespace
2828
{
29-
constexpr unsigned c_maxSendTransactions = 256;
30-
constexpr unsigned c_maxHeadersToSend = 1024;
31-
constexpr unsigned c_maxIncomingNewHashes = 1024;
29+
constexpr unsigned c_maxSendTransactionsCount = 256;
30+
constexpr unsigned c_maxSendHeadersCount = 1024;
31+
constexpr unsigned c_maxIncomingNewHashesCount = 1024;
3232
constexpr unsigned c_peerTimeoutSeconds = 10;
3333
constexpr int c_minBlockBroadcastPeers = 4;
34+
constexpr size_t c_maxSendNewBlocksCount = 20;
3435

3536
string toString(Asking _a)
3637
{
@@ -412,13 +413,14 @@ EthereumCapability::EthereumCapability(shared_ptr<p2p::CapabilityHostFace> _host
412413
m_tq(_tq),
413414
m_bq(_bq),
414415
m_networkId(_networkId),
415-
m_hostData(new EthereumHostData(m_chain, m_db))
416+
m_hostData(new EthereumHostData(m_chain, m_db)),
417+
m_latestBlockHashSent{_ch.currentHash()},
418+
m_latestBlockSent{_ch.currentHash()}
416419
{
417420
// TODO: Composition would be better. Left like that to avoid initialization
418421
// issues as BlockChainSync accesses other EthereumHost members.
419422
m_sync.reset(new BlockChainSync(*this));
420423
m_peerObserver.reset(new EthereumPeerObserver(m_sync, m_tq));
421-
m_latestBlockHashSent = _ch.currentHash();
422424
m_tq.onImport([this](ImportResult _ir, h256 const& _h, h512 const& _nodeId) { onTransactionImported(_ir, _h, _nodeId); });
423425
std::random_device seed;
424426
m_urng = std::mt19937_64(seed());
@@ -433,8 +435,10 @@ bool EthereumCapability::ensureInitialised()
433435
{
434436
if (!m_latestBlockHashSent)
435437
{
438+
assert(!m_latestBlockSent.load());
436439
// First time - just initialise.
437440
m_latestBlockHashSent = m_chain.currentHash();
441+
m_latestBlockSent = m_chain.currentHash();
438442
LOG(m_logger) << "Initialising: latest=" << m_latestBlockHashSent;
439443

440444
m_transactionsSent = m_tq.knownTransactions();
@@ -450,7 +454,8 @@ void EthereumCapability::reset()
450454
// reset() can be called from RPC handling thread,
451455
// but we access m_latestBlockHashSent and m_transactionsSent only from the network thread
452456
m_host->postWork([this]() {
453-
m_latestBlockHashSent = h256();
457+
m_latestBlockHashSent = {};
458+
m_latestBlockSent.exchange({});
454459
m_transactionsSent.clear();
455460
});
456461
}
@@ -464,7 +469,7 @@ void EthereumCapability::maintainTransactions()
464469
{
465470
// Send any new transactions.
466471
unordered_map<NodeID, std::vector<size_t>> peerTransactions;
467-
auto ts = m_tq.topTransactions(c_maxSendTransactions);
472+
auto ts = m_tq.topTransactions(c_maxSendTransactionsCount);
468473
{
469474
for (size_t i = 0; i < ts.size(); ++i)
470475
{
@@ -537,14 +542,14 @@ std::pair<std::vector<NodeID>, std::vector<NodeID>> EthereumCapability::randomPa
537542

538543
void EthereumCapability::maintainBlockHashes(h256 const& _currentHash)
539544
{
540-
// Send any new blocks.
545+
// Send any new block hashes
541546
auto detailsFrom = m_chain.details(m_latestBlockHashSent);
542547
auto detailsTo = m_chain.details(_currentHash);
543548
if (detailsFrom.totalDifficulty < detailsTo.totalDifficulty)
544549
{
545-
if (diff(detailsFrom.number, detailsTo.number) < 20)
550+
if (diff(detailsFrom.number, detailsTo.number) <= c_maxSendNewBlocksCount)
546551
{
547-
// don't be sending more than 20 "new" blocks. if there are any more we were probably
552+
// don't be sending more than c_maxSendNewBlocksCount "new" block hashes. if there are any more we were probably
548553
// waaaay behind.
549554
LOG(m_logger) << "Sending new block hashes (current is " << _currentHash << ", was "
550555
<< m_latestBlockHashSent << ")";
@@ -679,9 +684,9 @@ bool EthereumCapability::interpretCapabilityPacket(
679684
const auto skip = _r[2].toInt<u256>();
680685
const auto reverse = _r[3].toInt<bool>();
681686

682-
auto numHeadersToSend = maxHeaders <= c_maxHeadersToSend ?
687+
auto numHeadersToSend = maxHeaders <= c_maxSendHeadersCount ?
683688
static_cast<unsigned>(maxHeaders) :
684-
c_maxHeadersToSend;
689+
c_maxSendHeadersCount;
685690

686691
if (skip > std::numeric_limits<unsigned>::max() - 1)
687692
{
@@ -757,11 +762,11 @@ bool EthereumCapability::interpretCapabilityPacket(
757762
LOG(m_logger) << "BlockHashes (" << dec << itemCount << " entries) "
758763
<< (itemCount ? "" : " : NoMoreHashes") << " from " << _peerID;
759764

760-
if (itemCount > c_maxIncomingNewHashes)
765+
if (itemCount > c_maxIncomingNewHashesCount)
761766
{
762767
LOG(m_logger) << "Received too many hashes (" << itemCount << ") from " << _peerID
763-
<< ", only processing first " << c_maxIncomingNewHashes << " hashes";
764-
itemCount = c_maxIncomingNewHashes;
768+
<< ", only processing first " << c_maxIncomingNewHashesCount << " hashes";
769+
itemCount = c_maxIncomingNewHashesCount;
765770
}
766771

767772
vector<pair<h256, u256>> hashes(itemCount);
@@ -952,13 +957,21 @@ void EthereumCapability::removeSentTransactions(std::vector<h256> const& _txHash
952957
}
953958
}
954959

955-
void EthereumCapability::propagateBlocks(std::shared_ptr<VerifiedBlocks const> const& _blocks)
960+
void EthereumCapability::propagateNewBlocks(std::shared_ptr<VerifiedBlocks const> const& _newBlocks)
956961
{
957-
if (_blocks->empty())
962+
// Safe to call isSyncing() from a non-network thread since the underlying variable is
963+
// std::atomic
964+
if (_newBlocks->empty() || !isSyncing())
958965
return;
959966

960-
m_host->postWork([this, _blocks]() {
961-
auto const currentHash = _blocks->back().verified.info.hash();
967+
// Check if we're too far behind
968+
auto const currentHash = _newBlocks->back().verified.info.hash();
969+
auto const detailsFrom = m_chain.details(m_latestBlockSent);
970+
auto const detailsTo = m_chain.details(currentHash);
971+
if (diff(detailsFrom.number, detailsTo.number) > c_maxSendNewBlocksCount)
972+
return;
973+
974+
m_host->postWork([this, _newBlocks, currentHash]() {
962975
auto const peersWithoutBlock = selectPeers(
963976
[&](EthereumPeer const& _peer) { return !_peer.isBlockKnown(currentHash); });
964977

@@ -967,7 +980,7 @@ void EthereumCapability::propagateBlocks(std::shared_ptr<VerifiedBlocks const> c
967980

968981
std::vector<NodeID> const peersToSend = randomPeers(peersWithoutBlock, peersToSendNumber);
969982
for (NodeID const& peerID : peersToSend)
970-
for (auto const& b : *_blocks)
983+
for (auto const& b : *_newBlocks)
971984
{
972985
RLPStream ts;
973986
m_host->prep(peerID, name(), ts, NewBlockPacket, 2)
@@ -983,8 +996,11 @@ void EthereumCapability::propagateBlocks(std::shared_ptr<VerifiedBlocks const> c
983996
}
984997
}
985998
if (!peersToSend.empty())
986-
LOG(m_logger) << "Sent " << _blocks->size() << " block(s) to " << peersToSend.size()
999+
{
1000+
m_latestBlockSent = currentHash;
1001+
LOG(m_logger) << "Sent " << _newBlocks->size() << " block(s) to " << peersToSend.size()
9871002
<< " peers";
1003+
}
9881004
});
9891005
}
9901006

libethereum/EthereumCapability.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,9 @@ class EthereumCapability : public p2p::CapabilityFace
144144
void removeSentTransactions(std::vector<h256> const& _txHashes);
145145

146146
/// Send new blocks to peers. Should be done after we've verified the PoW but before we've
147-
/// imported the blocks into the chain (in order to reduce the uncle rate). Actual sending of
148-
/// blocks is done on the network thread.
149-
void propagateBlocks(std::shared_ptr<VerifiedBlocks const> const& _blocks);
147+
/// imported the blocks into the chain (in order to reduce the uncle rate). Thread-safe (actual
148+
/// sending of blocks is done on the network thread).
149+
void propagateNewBlocks(std::shared_ptr<VerifiedBlocks const> const& _newBlocks);
150150

151151
private:
152152
static char const* const c_stateNames[static_cast<int>(SyncState::Size)];
@@ -184,7 +184,12 @@ class EthereumCapability : public p2p::CapabilityFace
184184

185185
u256 m_networkId;
186186

187+
// We need to keep track of sent blocks and block hashes separately since we propagate new
188+
// blocks after we've verified their PoW (and a few other things i.e. they've been imported into
189+
// the block queue and verified) but we propagate new block hashes after blocks have been
190+
// imported into the chain
187191
h256 m_latestBlockHashSent;
192+
std::atomic<h256> m_latestBlockSent;
188193
h256Hash m_transactionsSent;
189194

190195
std::atomic<bool> m_newTransactions = {false};

0 commit comments

Comments
 (0)