From 8c0ff34ccd3572405e68c9f92b75e66d86d0f6de Mon Sep 17 00:00:00 2001 From: Pieter Wuille Date: Thu, 7 Feb 2019 20:04:29 -0800 Subject: [PATCH 01/21] Merge #14897: randomize GETDATA(tx) request order and introduce bias toward outbound MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1cff3d6cb0 Change in transaction pull scheduling to prevent InvBlock-related attacks (Gleb Naumenko) Pull request description: This code makes executing two particular (and potentially other) attacks harder. ### InvBlock This behavior was described well [here](https://www.cs.umd.edu/projects/coinscope/coinscope.pdf) (page 11). Per current implementation, if node A receives _INV_ (tx) from node B, node A sends _GETDATA_ to B and waits for _TX_ message back. Node A is likely to receive more _INVs_ (regarding the same tx) from other peers. But node A would not send another _GETDATA_ unless it does not hear _TX_ back from node B for next 2 minutes (to save bandwidth) Thus, if B is a malicious node, it can prevent node A from getting the transaction (even if all A’s peers have it) for 2 minutes. This behavior seems to be an inherent limitation of the current P2P relay protocol, and I don’t see how it can be fundamentally changed (I can see workarounds which involve rewriting a lot of P2P code though). ### What does this PR fix? The attacks I’m looking at involve preventing A from learning the transaction for 2*N minutes. To do that, an attacker has to spin up N nodes and send N _INVs_ simultaneously to node A (then InvBlocks will be queued with an interval of 2 minutes according to current implementation) More precisely, 2 scenarios I’m looking at are: 1. An attacker censors a particular transaction. By performing InvBlock from different nodes, an attacker can execute a network-wide censorship of a particular transaction (or all transactions). The earlier an attacker founds the transaction he wants to censor, the easier it is to perform an attack. As it was pointed out by @gwillen, this is even more dangerous in the case of lightning, where transactions are known in advance. 2. Topology inference described in papers [1](https://www.cs.umd.edu/projects/coinscope/coinscope.pdf), [2](https://arxiv.org/pdf/1812.00942.pdf) involve network-wide InvBlock. This fix would not mitigate this type of inference, but I believe it will make it more expensive to perform (an attacker would have to create more transactions and perform more rounds to learn the topology, the second paper itself notes that InvBlock isolation is important for the attack). ### How does it work This PR introduces bias toward outbound connections (they have higher priority when a node chooses from whom it should request a transaction) and randomizes the order. As per @gmaxwell suggestion, GETDATA requests queue is created after processing all incoming messages from all nodes. After this fix, if the incoming messages were [I1, I2, I3, O1, O2, O3, O4], the queue for _GETDATA_ may look like [O2, O1, O3, O4, I1, I3, I2, ….]. If {I1, I2, I3} were significantly earlier (but the difference is less than TX_TIMEOUT=60 s) than others, the queue for _GETDATA_ may look like [I2, O2, O1, O3, O4, I1, I3, ….]. ### Other comments: 1. This mitigation works better if the connectivity is higher (especially outbound, because it would be less likely that 2 _GETDATAs_ for inbound malicious nodes queued together) Tree-SHA512: 2ad1e80c3c7e16ff0f2d1160aa7d9a5eaae88baa88467f156b987fe2a387f767a41e11507d7f99ea02ab75e89ab93b6a278d138cb1054f1aaa2df336e9b2ca6a --- src/net.cpp | 50 ---------- src/net.h | 10 -- src/net_processing.cpp | 201 +++++++++++++++++++++++++++++++++++------ 3 files changed, 171 insertions(+), 90 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index b71c538862f5..8fc258e8200f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -96,8 +96,6 @@ std::map mapLocalHost; static bool vfLimited[NET_MAX] = {}; std::string strSubVersion; -unordered_limitedmap mapAlreadyAskedFor(MAX_INV_SZ, MAX_INV_SZ * 2); - void CConnman::AddOneShot(const std::string& strDest) { LOCK(cs_vOneShots); @@ -3271,54 +3269,6 @@ CNode::~CNode() CloseSocket(hSocket); } -void CNode::AskFor(const CInv& inv, int64_t doubleRequestDelay) -{ - if (queueAskFor.size() > MAPASKFOR_MAX_SZ || setAskFor.size() > SETASKFOR_MAX_SZ) { - int64_t nNow = GetTime(); - if(nNow - nLastWarningTime > WARNING_INTERVAL) { - LogPrintf("CNode::AskFor -- WARNING: inventory message dropped: vecAskFor.size = %d, setAskFor.size = %d, MAPASKFOR_MAX_SZ = %d, SETASKFOR_MAX_SZ = %d, nSkipped = %d, peer=%d\n", - queueAskFor.size(), setAskFor.size(), MAPASKFOR_MAX_SZ, SETASKFOR_MAX_SZ, nNumWarningsSkipped, id); - nLastWarningTime = nNow; - nNumWarningsSkipped = 0; - } - else { - ++nNumWarningsSkipped; - } - return; - } - // a peer may not have multiple non-responded queue positions for a single inv item - if (!setAskFor.emplace(inv.hash).second) - return; - - // We're using queueAskFor as a priority queue, - // the key is the earliest time the request can be sent - int64_t nRequestTime; - auto it = mapAlreadyAskedFor.find(inv.hash); - if (it != mapAlreadyAskedFor.end()) - nRequestTime = it->second; - else - nRequestTime = 0; - - LogPrint(BCLog::NET, "askfor %s %d (%s) peer=%d\n", inv.ToString(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000), id); - - // Make sure not to reuse time indexes to keep things in the same order - int64_t nNow = GetTimeMicros() - 1000000; - static int64_t nLastTime; - ++nLastTime; - nNow = std::max(nNow, nLastTime); - nLastTime = nNow; - - // Each retry is 2 minutes after the last - nRequestTime = std::max(nRequestTime + doubleRequestDelay, nNow); - if (it != mapAlreadyAskedFor.end()) - mapAlreadyAskedFor.update(it, nRequestTime); - else - mapAlreadyAskedFor.insert(std::make_pair(inv.hash, nRequestTime)); - - queueAskFor.emplace(nRequestTime, inv); - setAskForInQueue.emplace(inv.hash); -} - void CNode::RemoveAskFor(const uint256& hash) { setAskFor.erase(hash); diff --git a/src/net.h b/src/net.h index 8e8f9edb9aae..b0ac6feeff11 100644 --- a/src/net.h +++ b/src/net.h @@ -92,10 +92,6 @@ static const bool DEFAULT_UPNP = USE_UPNP; #else static const bool DEFAULT_UPNP = false; #endif -/** The maximum number of entries in mapAskFor */ -static const size_t MAPASKFOR_MAX_SZ = MAX_INV_SZ; -/** The maximum number of entries in setAskFor (larger due to getdata latency)*/ -static const size_t SETASKFOR_MAX_SZ = 2 * MAX_INV_SZ; /** The maximum number of peer connections to maintain. * Masternodes are forced to accept at least this many connections */ @@ -663,8 +659,6 @@ extern bool fDiscover; extern bool fListen; extern bool fRelayTxes; -extern unordered_limitedmap mapAlreadyAskedFor; - /** Subversion as sent to the P2P network in `version` messages */ extern std::string strSubVersion; @@ -861,9 +855,6 @@ class CNode // List of non-tx/non-block inventory items std::vector vInventoryOtherToSend; CCriticalSection cs_inventory; - std::unordered_set setAskFor; - std::unordered_set setAskForInQueue; - std::priority_queue, std::vector>, std::greater<>> queueAskFor; int64_t nNextInvSend; // Used for headers announcements - unfiltered blocks to relay // Also protected by cs_inventory @@ -1038,7 +1029,6 @@ class CNode vBlockHashesToAnnounce.push_back(hash); } - void AskFor(const CInv& inv, int64_t doubleRequestDelay = 2 * 60 * 1000000); void RemoveAskFor(const uint256& hash); void CloseSocketDisconnect(); diff --git a/src/net_processing.cpp b/src/net_processing.cpp index bcf4521ea3c8..66c41a82229b 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -57,6 +57,21 @@ # error "Dash Core cannot be compiled without assertions." #endif +/** Maximum number of in-flight transactions from a peer */ +static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100; +/** Maximum number of announced transactions from a peer */ +static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; +/** How many microseconds to delay requesting transactions from inbound peers */ +static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; +/** How long to wait (in microseconds) before downloading a transaction from an additional peer */ +static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; +/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ +static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; +static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, +"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY"); +/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */ +static const unsigned int MAX_GETDATA_SZ = 1000; + std::atomic nTimeBestReceived(0); // Used only to inform the wallet of when we last received a block struct IteratorComparator @@ -254,6 +269,66 @@ struct CNodeState { //! Time of last new block announcement int64_t m_last_block_announcement; + /* + * State associated with transaction download. + * + * Tx download algorithm: + * + * When inv comes in, queue up (process_time, txid) inside the peer's + * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer + * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS). + * + * The process_time for a transaction is set to nNow for outbound peers, + * nNow + 2 seconds for inbound peers. This is the time at which we'll + * consider trying to request the transaction from the peer in + * SendMessages(). The delay for inbound peers is to allow outbound peers + * a chance to announce before we request from inbound peers, to prevent + * an adversary from using inbound connections to blind us to a + * transaction (InvBlock). + * + * When we call SendMessages() for a given peer, + * we will loop over the transactions in m_tx_process_time, looking + * at the transactions whose process_time <= nNow. We'll request each + * such transaction that we don't have already and that hasn't been + * requested from another peer recently, up until we hit the + * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update + * g_already_asked_for for each requested txid, storing the time of the + * GETDATA request. We use g_already_asked_for to coordinate transaction + * requests amongst our peers. + * + * For transactions that we still need but we have already recently + * requested from some other peer, we'll reinsert (process_time, txid) + * back into the peer's m_tx_process_time at the point in the future at + * which the most recent GETDATA request would time out (ie + * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for). + * We add an additional delay for inbound peers, again to prefer + * attempting download from outbound peers first. + * We also add an extra small random delay up to 2 seconds + * to avoid biasing some peers over others. (e.g., due to fixed ordering + * of peer processing in ThreadMessageHandler). + * + * When we receive a transaction from a peer, we remove the txid from the + * peer's m_tx_in_flight set and from their recently announced set + * (m_tx_announced). We also clear g_already_asked_for for that entry, so + * that if somehow the transaction is not accepted but also not added to + * the reject filter, then we will eventually redownload from other + * peers. + */ + struct TxDownloadState { + /* Track when to attempt download of announced transactions (process + * time in micros -> txid) + */ + std::multimap m_tx_process_time; + + //! Store all the transactions a peer has recently announced + std::set m_tx_announced; + + //! Store transactions which were requested by us + std::set m_tx_in_flight; + }; + + TxDownloadState m_tx_download; + CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) { fCurrentlyConnected = false; nMisbehavior = 0; @@ -279,6 +354,9 @@ struct CNodeState { } }; +// Keeps track of the time (in microseconds) when transactions were requested last time +unordered_limitedmap g_already_asked_for(MAX_INV_SZ, MAX_INV_SZ * 2); + /** Map maintaining per-node state. Requires cs_main. */ std::map mapNodeState; @@ -572,6 +650,58 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vectorsecond; + } + return 0; +} + +void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + auto it = g_already_asked_for.find(txid); + if (it == g_already_asked_for.end()) { + g_already_asked_for.insert(std::make_pair(txid, request_time)); + } else { + g_already_asked_for.update(it, request_time); + } +} + + +void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; + if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(txid)) { + // Too many queued announcements from this peer, or we already have + // this announcement + return; + } + peer_download_state.m_tx_announced.insert(txid); + + int64_t process_time; + int64_t last_request_time = GetTxRequestTime(txid); + // First time requesting this tx + if (last_request_time == 0) { + process_time = nNow; + } else { + // Randomize the delay to avoid biasing some peers over others (such as due to + // fixed ordering of peer processing in ThreadMessageHandler) + process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY); + } + + // We delay processing announcements from non-preferred (eg inbound) peers + if (!state->fPreferredDownload) process_time += INBOUND_PEER_TX_DELAY; + + peer_download_state.m_tx_process_time.emplace(process_time, txid); +} + } // namespace // This function is used for testing the stale tip eviction logic, see @@ -2189,6 +2319,8 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr LOCK(cs_main); + int64_t nNow = GetTimeMicros(); + for (CInv &inv : vInv) { if(!inv.IsKnownType()) { @@ -2257,7 +2389,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr doubleRequestDelay = 10 * 1000000; break; } - pfrom->AskFor(inv, doubleRequestDelay); + RequestTx(State(pfrom->GetId()), inv.hash, nNow); } } } @@ -2489,10 +2621,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr CInv inv(nInvType, tx.GetHash()); pfrom->AddInventoryKnown(inv); - { - LOCK(cs_main); - connman->RemoveAskFor(inv.hash); - } // Process custom logic, no matter if tx will be accepted to mempool later or not if (nInvType == MSG_DSTX) { @@ -2546,6 +2674,11 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr bool fMissingInputs = false; CValidationState state; + CNodeState* nodestate = State(pfrom->GetId()); + nodestate->m_tx_download.m_tx_announced.erase(inv.hash); + nodestate->m_tx_download.m_tx_in_flight.erase(inv.hash); + EraseTxRequest(inv.hash); + if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, ptx, &fMissingInputs /* pfMissingInputs */, false /* bypass_limits */, 0 /* nAbsurdFee */)) { // Process custom txes, this changes AlreadyHave to "true" @@ -2587,14 +2720,16 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr } } if (!fRejectedParents) { + int64_t nNow = GetTimeMicros(); + for (const CTxIn& txin : tx.vin) { CInv _inv(MSG_TX, txin.prevout.hash); pfrom->AddInventoryKnown(_inv); - if (!AlreadyHave(_inv)) pfrom->AskFor(_inv); + if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, nNow); // We don't know if the previous tx was a regular or a mixing one, try both CInv _inv2(MSG_DSTX, txin.prevout.hash); pfrom->AddInventoryKnown(_inv2); - if (!AlreadyHave(_inv2)) pfrom->AskFor(_inv2); + if (!AlreadyHave(_inv2)) RequestTx(State(pfrom->GetId()), _inv2.hash, nNow); } AddOrphanTx(ptx, pfrom->GetId()); @@ -4051,33 +4186,39 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // // Message: getdata (non-blocks) // - while (!pto->queueAskFor.empty() && pto->queueAskFor.top().first <= nNow) - { - const CInv& inv = pto->queueAskFor.top().second; - auto jt = pto->setAskForInQueue.find(inv.hash); - if (jt == pto->setAskForInQueue.end()) { - pto->queueAskFor.pop(); - continue; - } - - if (!AlreadyHave(inv)) - { - LogPrint(BCLog::NET, "SendMessages -- GETDATA -- requesting inv = %s peer=%d\n", inv.ToString(), pto->GetId()); - vGetData.push_back(inv); - if (vGetData.size() >= 1000) - { - connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); - LogPrint(BCLog::NET, "SendMessages -- GETDATA -- pushed size = %lu peer=%d\n", vGetData.size(), pto->GetId()); - vGetData.clear(); + auto& tx_process_time = state.m_tx_download.m_tx_process_time; + while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { + const uint256& txid = tx_process_time.begin()->second; + CInv inv(MSG_TX | GetFetchFlags(pto), txid); + if (!AlreadyHave(inv)) { + // If this transaction was last requested more than 1 minute ago, + // then request. + int64_t last_request_time = GetTxRequestTime(inv.hash); + if (last_request_time <= nNow - GETDATA_TX_INTERVAL) { + LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); + vGetData.push_back(inv); + if (vGetData.size() >= MAX_GETDATA_SZ) { + connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + vGetData.clear(); + } + UpdateTxRequestTime(inv.hash, nNow); + state.m_tx_download.m_tx_in_flight.insert(inv.hash); + } else { + // This transaction is in flight from someone else; queue + // up processing to happen after the download times out + // (with a slight delay for inbound peers, to prefer + // requests to outbound peers). + RequestTx(&state, txid, nNow); } } else { - //If we're not going to ask, don't expect a response. - LogPrint(BCLog::NET, "SendMessages -- GETDATA -- already have inv = %s peer=%d\n", inv.ToString(), pto->GetId()); - pto->setAskFor.erase(inv.hash); + // We have already seen this transaction, no need to download. + state.m_tx_download.m_tx_announced.erase(inv.hash); + state.m_tx_download.m_tx_in_flight.erase(inv.hash); } - pto->queueAskFor.pop(); - pto->setAskForInQueue.erase(jt); + tx_process_time.erase(tx_process_time.begin()); } + + if (!vGetData.empty()) { connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); LogPrint(BCLog::NET, "SendMessages -- GETDATA -- pushed size = %lu peer=%d\n", vGetData.size(), pto->GetId()); From 74eabc23e54a1329b6c0231b8ea0e3df583ed4f8 Mon Sep 17 00:00:00 2001 From: MarcoFalke Date: Wed, 12 Jun 2019 12:32:52 -0400 Subject: [PATCH 02/21] Merge #15834: Fix transaction relay bugs introduced in #14897 and expire transactions from peer in-flight map 308b76732f Fix bug around transaction requests (Suhas Daftuar) f635a3ba11 Expire old entries from the in-flight tx map (Suhas Daftuar) e32e08407e Remove NOTFOUND transactions from in-flight data structures (Suhas Daftuar) 23163b7593 Add an explicit memory bound to m_tx_process_time (Suhas Daftuar) 218697b645 Improve NOTFOUND comment (Suhas Daftuar) Pull request description: #14897 introduced several bugs that could lead to a node no longer requesting transactions from one or more of its peers. Credit to ajtowns for originally reporting many of these bugs along with an originally proposed fix in #15776. This PR does a few things: - Fix a bug in NOTFOUND processing, where the in-flight map for a peer was keeping transactions it shouldn't - Eliminate the possibility of a memory attack on the CNodeState `m_tx_process_time` data structure by explicitly bounding its size - Remove entries from a peer's in-flight map after 10 minutes, so that we should always eventually resume transaction requests even if there are other bugs like the NOTFOUND one - Fix a bug relating to the coordination of request times when multiple peers announce the same transaction The expiry mechanism added here is something we'll likely want to remove in the future, but is belt-and-suspenders for now to try to ensure we don't have other bugs that could lead to transaction relay failing due to some unforeseen conditions. ACKs for commit 308b76: ajtowns: utACK 308b76732f97020c86977e29c854e8e27262cf7c morcos: light ACK 308b767 laanwj: Code review ACK 308b76732f97020c86977e29c854e8e27262cf7c jonatack: Light ACK 308b76732f97020c86977e29c854e8e27262cf7c. jamesob: ACK 308b76732f MarcoFalke: ACK 308b76732f97020c86977e29c854e8e27262cf7c (Tested two of the three bugs this pull fixes, see comment above) jamesob: Concept ACK https://github.com/bitcoin/bitcoin/pull/15834/commits/308b76732f97020c86977e29c854e8e27262cf7c MarcoFalke: ACK 308b76732f Tree-SHA512: 8865dca5294447859d95655e8699085643db60c22f0719e76e961651a1398251bc932494b68932e33f68d4f6084579ab3bed7d0e7dd4ac6c362590eaf9414eda --- src/net_processing.cpp | 125 +++++++++++++++++++++++++++++++---------- 1 file changed, 95 insertions(+), 30 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 66c41a82229b..854f1411e801 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -62,11 +62,13 @@ static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100; /** Maximum number of announced transactions from a peer */ static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; /** How many microseconds to delay requesting transactions from inbound peers */ -static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; +static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; // 2 seconds /** How long to wait (in microseconds) before downloading a transaction from an additional peer */ -static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; +static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; // 1 minute /** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ -static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; +static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; // 2 seconds +/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */ +static constexpr int64_t TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL; static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, "To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY"); /** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */ @@ -323,8 +325,11 @@ struct CNodeState { //! Store all the transactions a peer has recently announced std::set m_tx_announced; - //! Store transactions which were requested by us - std::set m_tx_in_flight; + //! Store transactions which were requested by us, with timestamp + std::map m_tx_in_flight; + + //! Periodically check for stuck getdata requests + int64_t m_check_expiry_timer{0}; }; TxDownloadState m_tx_download; @@ -674,30 +679,40 @@ void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LO } } - -void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; - if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(txid)) { - // Too many queued announcements from this peer, or we already have - // this announcement - return; - } - peer_download_state.m_tx_announced.insert(txid); - int64_t process_time; int64_t last_request_time = GetTxRequestTime(txid); // First time requesting this tx if (last_request_time == 0) { - process_time = nNow; + process_time = current_time; } else { // Randomize the delay to avoid biasing some peers over others (such as due to // fixed ordering of peer processing in ThreadMessageHandler) process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY); } - // We delay processing announcements from non-preferred (eg inbound) peers - if (!state->fPreferredDownload) process_time += INBOUND_PEER_TX_DELAY; + // We delay processing announcements from inbound peers + if (use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY; + + return process_time; +} + +void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; + if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || + peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS || + peer_download_state.m_tx_announced.count(txid)) { + // Too many queued announcements from this peer, or we already have + // this announcement + return; + } + peer_download_state.m_tx_announced.insert(txid); + + // Calculate the time to try requesting this transaction. Use + // fPreferredDownload as a proxy for outbound peers. + int64_t process_time = CalculateTxGetDataTime(txid, nNow, !state->fPreferredDownload); peer_download_state.m_tx_process_time.emplace(process_time, txid); } @@ -1580,12 +1595,19 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam if (!vNotFound.empty()) { // Let the peer know that we didn't find what it asked for, so it doesn't - // have to wait around forever. Currently only SPV clients actually care - // about this message: it's needed when they are recursively walking the - // dependencies of relevant unconfirmed transactions. SPV clients want to - // do that because they want to know about (and store and rebroadcast and - // risk analyze) the dependencies of transactions relevant to them, without - // having to download the entire memory pool. + // have to wait around forever. + // SPV clients care about this message: it's needed when they are + // recursively walking the dependencies of relevant unconfirmed + // transactions. SPV clients want to do that because they want to know + // about (and store and rebroadcast and risk analyze) the dependencies + // of transactions relevant to them, without having to download the + // entire memory pool. + // Also, other nodes can use these messages to automatically request a + // transaction from some other peer that annnounced it, and stop + // waiting for us to respond. + // In normal operation, we often send NOTFOUND messages for parents of + // transactions that we relay; if a peer is missing a parent, they may + // assume we have them and request the parents from us. connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::NOTFOUND, vNotFound)); } } @@ -3338,8 +3360,27 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr if (strCommand == NetMsgType::NOTFOUND) { - // We do not care about the NOTFOUND message, but logging an Unknown Command - // message would be undesirable as we transmit it ourselves. + // Remove the NOTFOUND transactions from the peer + LOCK(cs_main); + CNodeState *state = State(pfrom->GetId()); + std::vector vInv; + vRecv >> vInv; + if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + for (CInv &inv : vInv) { + if (inv.type == MSG_TX || inv.type == MSG_WITNESS_TX) { + // If we receive a NOTFOUND message for a txid we requested, erase + // it from our data structures for this peer. + auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv.hash); + if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) { + // Skip any further work if this is a spurious NOTFOUND + // message. + continue; + } + state->m_tx_download.m_tx_in_flight.erase(in_flight_it); + state->m_tx_download.m_tx_announced.erase(inv.hash); + } + } + } return true; } @@ -4186,9 +4227,33 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // // Message: getdata (non-blocks) // + + // For robustness, expire old requests after a long timeout, so that + // we can resume downloading transactions from a peer even if they + // were unresponsive in the past. + // Eventually we should consider disconnecting peers, but this is + // conservative. + if (state.m_tx_download.m_check_expiry_timer <= nNow) { + for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) { + if (it->second <= nNow - TX_EXPIRY_INTERVAL) { + LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId()); + state.m_tx_download.m_tx_announced.erase(it->first); + state.m_tx_download.m_tx_in_flight.erase(it++); + } else { + ++it; + } + } + // On average, we do this check every TX_EXPIRY_INTERVAL. Randomize + // so that we're not doing this for all peers at the same time. + state.m_tx_download.m_check_expiry_timer = nNow + TX_EXPIRY_INTERVAL/2 + GetRand(TX_EXPIRY_INTERVAL); + } + auto& tx_process_time = state.m_tx_download.m_tx_process_time; while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { - const uint256& txid = tx_process_time.begin()->second; + const uint256 txid = tx_process_time.begin()->second; + // Erase this entry from tx_process_time (it may be added back for + // processing at a later time, see below) + tx_process_time.erase(tx_process_time.begin()); CInv inv(MSG_TX | GetFetchFlags(pto), txid); if (!AlreadyHave(inv)) { // If this transaction was last requested more than 1 minute ago, @@ -4202,20 +4267,20 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM vGetData.clear(); } UpdateTxRequestTime(inv.hash, nNow); - state.m_tx_download.m_tx_in_flight.insert(inv.hash); + state.m_tx_download.m_tx_in_flight.emplace(inv.hash, nNow); } else { // This transaction is in flight from someone else; queue // up processing to happen after the download times out // (with a slight delay for inbound peers, to prefer // requests to outbound peers). - RequestTx(&state, txid, nNow); + int64_t next_process_time = CalculateTxGetDataTime(txid, nNow, !state.fPreferredDownload); + tx_process_time.emplace(next_process_time, txid); } } else { // We have already seen this transaction, no need to download. state.m_tx_download.m_tx_announced.erase(inv.hash); state.m_tx_download.m_tx_in_flight.erase(inv.hash); } - tx_process_time.erase(tx_process_time.begin()); } From 8c11a8e6986578bbb1ba8285e184db0ea2218aed Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Mon, 6 Apr 2020 13:58:50 +0200 Subject: [PATCH 03/21] Remove MSG_WITNESS_TX --- src/net_processing.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 854f1411e801..7b04e8ab895c 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -3367,7 +3367,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr vRecv >> vInv; if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { for (CInv &inv : vInv) { - if (inv.type == MSG_TX || inv.type == MSG_WITNESS_TX) { + if (inv.type == MSG_TX) { // If we receive a NOTFOUND message for a txid we requested, erase // it from our data structures for this peer. auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv.hash); @@ -4254,7 +4254,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // Erase this entry from tx_process_time (it may be added back for // processing at a later time, see below) tx_process_time.erase(tx_process_time.begin()); - CInv inv(MSG_TX | GetFetchFlags(pto), txid); + CInv inv(MSG_TX, txid); if (!AlreadyHave(inv)) { // If this transaction was last requested more than 1 minute ago, // then request. From 29d3b75f28f8b6a2333f737aff29fbc3a636f1ac Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Mon, 6 Apr 2020 13:25:38 +0200 Subject: [PATCH 04/21] Generalize TX request code --- src/net_processing.cpp | 86 +++++++++++++++++++++++++----------------- src/net_processing.h | 3 ++ 2 files changed, 54 insertions(+), 35 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 7b04e8ab895c..f3eb5ca9584c 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -315,18 +315,20 @@ struct CNodeState { * that if somehow the transaction is not accepted but also not added to * the reject filter, then we will eventually redownload from other * peers. + * + * DASH: For Dash, this does not only handles TXs but also all Dash specific objects */ struct TxDownloadState { /* Track when to attempt download of announced transactions (process * time in micros -> txid) */ - std::multimap m_tx_process_time; + std::multimap m_tx_process_time; //! Store all the transactions a peer has recently announced - std::set m_tx_announced; + std::set m_tx_announced; //! Store transactions which were requested by us, with timestamp - std::map m_tx_in_flight; + std::map m_tx_in_flight; //! Periodically check for stuck getdata requests int64_t m_check_expiry_timer{0}; @@ -654,35 +656,40 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vectorsecond; } return 0; } -void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void UpdateObjectRequestTime(const uint256& hash, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - auto it = g_already_asked_for.find(txid); + AssertLockHeld(cs_main); + auto it = g_already_asked_for.find(hash); if (it == g_already_asked_for.end()) { - g_already_asked_for.insert(std::make_pair(txid, request_time)); + g_already_asked_for.insert(std::make_pair(hash, request_time)); } else { g_already_asked_for.update(it, request_time); } } -int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +int64_t CalculateObjectGetDataTime(const uint256& hash, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { + AssertLockHeld(cs_main); int64_t process_time; - int64_t last_request_time = GetTxRequestTime(txid); + int64_t last_request_time = GetObjectRequestTime(hash); // First time requesting this tx if (last_request_time == 0) { process_time = current_time; @@ -698,26 +705,35 @@ int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool u return process_time; } -void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { + AssertLockHeld(cs_main); CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS || - peer_download_state.m_tx_announced.count(txid)) { + peer_download_state.m_tx_announced.count(inv)) { // Too many queued announcements from this peer, or we already have // this announcement return; } - peer_download_state.m_tx_announced.insert(txid); + peer_download_state.m_tx_announced.insert(inv); // Calculate the time to try requesting this transaction. Use // fPreferredDownload as a proxy for outbound peers. - int64_t process_time = CalculateTxGetDataTime(txid, nNow, !state->fPreferredDownload); + int64_t process_time = CalculateObjectGetDataTime(inv.hash, nNow, !state->fPreferredDownload); - peer_download_state.m_tx_process_time.emplace(process_time, txid); + peer_download_state.m_tx_process_time.emplace(process_time, inv); } -} // namespace +void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); + auto* state = State(nodeId); + if (!state) { + return; + } + RequestObject(state, inv, nNow); +} // This function is used for testing the stale tip eviction logic, see // DoS_tests.cpp @@ -2411,7 +2427,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr doubleRequestDelay = 10 * 1000000; break; } - RequestTx(State(pfrom->GetId()), inv.hash, nNow); + RequestObject(State(pfrom->GetId()), inv, nNow); } } } @@ -2697,9 +2713,9 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr CValidationState state; CNodeState* nodestate = State(pfrom->GetId()); - nodestate->m_tx_download.m_tx_announced.erase(inv.hash); - nodestate->m_tx_download.m_tx_in_flight.erase(inv.hash); - EraseTxRequest(inv.hash); + nodestate->m_tx_download.m_tx_announced.erase(inv); + nodestate->m_tx_download.m_tx_in_flight.erase(inv); + EraseObjectRequest(inv.hash); if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, ptx, &fMissingInputs /* pfMissingInputs */, false /* bypass_limits */, 0 /* nAbsurdFee */)) { @@ -2747,11 +2763,11 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr for (const CTxIn& txin : tx.vin) { CInv _inv(MSG_TX, txin.prevout.hash); pfrom->AddInventoryKnown(_inv); - if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, nNow); + if (!AlreadyHave(_inv)) RequestObject(State(pfrom->GetId()), _inv, nNow); // We don't know if the previous tx was a regular or a mixing one, try both CInv _inv2(MSG_DSTX, txin.prevout.hash); pfrom->AddInventoryKnown(_inv2); - if (!AlreadyHave(_inv2)) RequestTx(State(pfrom->GetId()), _inv2.hash, nNow); + if (!AlreadyHave(_inv2)) RequestObject(State(pfrom->GetId()), _inv2, nNow); } AddOrphanTx(ptx, pfrom->GetId()); @@ -3367,17 +3383,17 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr vRecv >> vInv; if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { for (CInv &inv : vInv) { - if (inv.type == MSG_TX) { + if (inv.IsKnownType()) { // If we receive a NOTFOUND message for a txid we requested, erase // it from our data structures for this peer. - auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv.hash); + auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv); if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) { // Skip any further work if this is a spurious NOTFOUND // message. continue; } state->m_tx_download.m_tx_in_flight.erase(in_flight_it); - state->m_tx_download.m_tx_announced.erase(inv.hash); + state->m_tx_download.m_tx_announced.erase(inv); } } } @@ -4248,17 +4264,17 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM state.m_tx_download.m_check_expiry_timer = nNow + TX_EXPIRY_INTERVAL/2 + GetRand(TX_EXPIRY_INTERVAL); } + // DASH this code also handles non-TXs (Dash specific messages) auto& tx_process_time = state.m_tx_download.m_tx_process_time; while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { - const uint256 txid = tx_process_time.begin()->second; + const CInv inv = tx_process_time.begin()->second; // Erase this entry from tx_process_time (it may be added back for // processing at a later time, see below) tx_process_time.erase(tx_process_time.begin()); - CInv inv(MSG_TX, txid); if (!AlreadyHave(inv)) { // If this transaction was last requested more than 1 minute ago, // then request. - int64_t last_request_time = GetTxRequestTime(inv.hash); + int64_t last_request_time = GetObjectRequestTime(inv.hash); if (last_request_time <= nNow - GETDATA_TX_INTERVAL) { LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); vGetData.push_back(inv); @@ -4266,20 +4282,20 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); vGetData.clear(); } - UpdateTxRequestTime(inv.hash, nNow); - state.m_tx_download.m_tx_in_flight.emplace(inv.hash, nNow); + UpdateObjectRequestTime(inv.hash, nNow); + state.m_tx_download.m_tx_in_flight.emplace(inv, nNow); } else { // This transaction is in flight from someone else; queue // up processing to happen after the download times out // (with a slight delay for inbound peers, to prefer // requests to outbound peers). - int64_t next_process_time = CalculateTxGetDataTime(txid, nNow, !state.fPreferredDownload); + int64_t next_process_time = CalculateObjectGetDataTime(txid, nNow, !state.fPreferredDownload); tx_process_time.emplace(next_process_time, txid); } } else { // We have already seen this transaction, no need to download. - state.m_tx_download.m_tx_announced.erase(inv.hash); - state.m_tx_download.m_tx_in_flight.erase(inv.hash); + state.m_tx_download.m_tx_announced.erase(inv); + state.m_tx_download.m_tx_in_flight.erase(inv); } } diff --git a/src/net_processing.h b/src/net_processing.h index 667bf913b548..6d98f0c30c3a 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -84,4 +84,7 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats); void Misbehaving(NodeId nodeid, int howmuch, const std::string& message=""); bool IsBanned(NodeId nodeid); +void EraseObjectRequest(const uint256& hash); +void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow); + #endif // BITCOIN_NET_PROCESSING_H From 56da433bd9877f4581fda77e00af7753c211d0bc Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Mon, 6 Apr 2020 13:58:01 +0200 Subject: [PATCH 05/21] Fix governance.cpp --- src/governance/governance.cpp | 4 ++-- src/net_processing.cpp | 10 ++++++++++ src/net_processing.h | 1 + 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/governance/governance.cpp b/src/governance/governance.cpp index 949da6b19b1f..d0fc86c8d995 100644 --- a/src/governance/governance.cpp +++ b/src/governance/governance.cpp @@ -1033,8 +1033,8 @@ int CGovernanceManager::RequestGovernanceObjectVotes(const std::vector& // stop early to prevent setAskFor overflow { LOCK(cs_main); - size_t nProjectedSize = pnode->setAskFor.size() + nProjectedVotes; - if (nProjectedSize > SETASKFOR_MAX_SZ / 2) continue; + size_t nProjectedSize = GetRequestedObjectCount(pnode->GetId()) + nProjectedVotes; + if (nProjectedSize > MAX_INV_SZ) continue; // to early to ask the same node if (mapAskedRecently[nHashGovobj].count(pnode->addr)) continue; } diff --git a/src/net_processing.cpp b/src/net_processing.cpp index f3eb5ca9584c..d164dfce7f67 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -735,6 +735,16 @@ void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow) EXCLUSIVE_LOCKS RequestObject(state, inv, nNow); } +size_t GetRequestedObjectCount(NodeId nodeId) +{ + AssertLockHeld(cs_main); + auto* state = State(nodeId); + if (!state) { + return 0; + } + return state->m_tx_download.m_tx_process_time.size(); +} + // This function is used for testing the stale tip eviction logic, see // DoS_tests.cpp void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) diff --git a/src/net_processing.h b/src/net_processing.h index 6d98f0c30c3a..ced80382e6f5 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -86,5 +86,6 @@ bool IsBanned(NodeId nodeid); void EraseObjectRequest(const uint256& hash); void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow); +size_t GetRequestedObjectCount(NodeId nodeId); #endif // BITCOIN_NET_PROCESSING_H From 414943b6110bf61a9be5cf2c270487981eddc6cf Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Mon, 6 Apr 2020 14:22:38 +0200 Subject: [PATCH 06/21] Make interval and timeout dependend on INV type --- src/net_processing.cpp | 64 ++++++++++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index d164dfce7f67..707b039687a1 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -67,8 +67,8 @@ static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; // 2 seconds static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; // 1 minute /** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; // 2 seconds -/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */ -static constexpr int64_t TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL; +/** How long to wait (expiry * factor microseconds) before expiring an in-flight getdata request to a peer */ +static constexpr int64_t TX_EXPIRY_INTERVAL_FACTOR = 10; static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, "To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY"); /** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */ @@ -685,18 +685,47 @@ void UpdateObjectRequestTime(const uint256& hash, int64_t request_time) EXCLUSIV } } -int64_t CalculateObjectGetDataTime(const uint256& hash, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +int64_t GetObjectInterval(int invType) +{ + // some messages need to be re-requested faster when the first announcing peer did not answer to GETDATA + switch(invType) + { + case MSG_QUORUM_RECOVERED_SIG: + return 15 * 1000000; + case MSG_CLSIG: + return 5 * 1000000; + case MSG_ISLOCK: + return 10 * 1000000; + default: + return GETDATA_TX_INTERVAL; + } +} + +int64_t GetObjectExpiryInterval(int invType) +{ + return GetObjectInterval(invType) * TX_EXPIRY_INTERVAL_FACTOR; +} + +int64_t GetObjectRandomDelay(int invType) +{ + if (invType == MSG_TX) { + return GetRand(MAX_GETDATA_RANDOM_DELAY); + } + return 0; +} + +int64_t CalculateObjectGetDataTime(const CInv& inv, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); int64_t process_time; - int64_t last_request_time = GetObjectRequestTime(hash); + int64_t last_request_time = GetObjectRequestTime(inv.hash); // First time requesting this tx if (last_request_time == 0) { process_time = current_time; } else { // Randomize the delay to avoid biasing some peers over others (such as due to // fixed ordering of peer processing in ThreadMessageHandler) - process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY); + process_time = last_request_time + GetObjectInterval(inv.type) + GetObjectRandomDelay(inv.type); } // We delay processing announcements from inbound peers @@ -720,7 +749,7 @@ void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow) EXCLUSIVE_L // Calculate the time to try requesting this transaction. Use // fPreferredDownload as a proxy for outbound peers. - int64_t process_time = CalculateObjectGetDataTime(inv.hash, nNow, !state->fPreferredDownload); + int64_t process_time = CalculateObjectGetDataTime(inv, nNow, !state->fPreferredDownload); peer_download_state.m_tx_process_time.emplace(process_time, inv); } @@ -2424,19 +2453,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr } else if (!fAlreadyHave) { bool allowWhileInIBD = allowWhileInIBDObjs.count(inv.type); if (allowWhileInIBD || (!fImporting && !fReindex && !IsInitialBlockDownload())) { - int64_t doubleRequestDelay = 2 * 60 * 1000000; - // some messages need to be re-requested faster when the first announcing peer did not answer to GETDATA - switch (inv.type) { - case MSG_QUORUM_RECOVERED_SIG: - doubleRequestDelay = 15 * 1000000; - break; - case MSG_CLSIG: - doubleRequestDelay = 5 * 1000000; - break; - case MSG_ISLOCK: - doubleRequestDelay = 10 * 1000000; - break; - } RequestObject(State(pfrom->GetId()), inv, nNow); } } @@ -4261,7 +4277,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // conservative. if (state.m_tx_download.m_check_expiry_timer <= nNow) { for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) { - if (it->second <= nNow - TX_EXPIRY_INTERVAL) { + if (it->second <= nNow - GetObjectExpiryInterval(it->first.type)) { LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId()); state.m_tx_download.m_tx_announced.erase(it->first); state.m_tx_download.m_tx_in_flight.erase(it++); @@ -4271,7 +4287,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM } // On average, we do this check every TX_EXPIRY_INTERVAL. Randomize // so that we're not doing this for all peers at the same time. - state.m_tx_download.m_check_expiry_timer = nNow + TX_EXPIRY_INTERVAL/2 + GetRand(TX_EXPIRY_INTERVAL); + state.m_tx_download.m_check_expiry_timer = nNow + GetObjectExpiryInterval(MSG_TX)/2 + GetRand(GetObjectExpiryInterval(MSG_TX)); } // DASH this code also handles non-TXs (Dash specific messages) @@ -4285,7 +4301,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // If this transaction was last requested more than 1 minute ago, // then request. int64_t last_request_time = GetObjectRequestTime(inv.hash); - if (last_request_time <= nNow - GETDATA_TX_INTERVAL) { + if (last_request_time <= nNow - GetObjectInterval(inv.type)) { LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); vGetData.push_back(inv); if (vGetData.size() >= MAX_GETDATA_SZ) { @@ -4299,8 +4315,8 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // up processing to happen after the download times out // (with a slight delay for inbound peers, to prefer // requests to outbound peers). - int64_t next_process_time = CalculateObjectGetDataTime(txid, nNow, !state.fPreferredDownload); - tx_process_time.emplace(next_process_time, txid); + int64_t next_process_time = CalculateObjectGetDataTime(inv, nNow, !state.fPreferredDownload); + tx_process_time.emplace(next_process_time, inv); } } else { // We have already seen this transaction, no need to download. From 004d9224c4f5cc4ef4dc3d33f528552fceac6c86 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Mon, 6 Apr 2020 14:37:11 +0200 Subject: [PATCH 07/21] Replace uses of CConnman::RemoveAskFor with EraseObjectRequest --- src/governance/governance.cpp | 4 ++-- src/llmq/quorums_blockprocessor.cpp | 2 +- src/llmq/quorums_chainlocks.cpp | 2 +- src/llmq/quorums_dkgsessionhandler.cpp | 4 ++-- src/llmq/quorums_instantsend.cpp | 2 +- src/llmq/quorums_signing.cpp | 2 +- src/net.cpp | 18 ------------------ src/net.h | 3 --- src/spork.cpp | 2 +- 9 files changed, 9 insertions(+), 30 deletions(-) diff --git a/src/governance/governance.cpp b/src/governance/governance.cpp index d0fc86c8d995..5edee2bcf899 100644 --- a/src/governance/governance.cpp +++ b/src/governance/governance.cpp @@ -135,7 +135,7 @@ void CGovernanceManager::ProcessMessage(CNode* pfrom, const std::string& strComm { LOCK(cs_main); - connman.RemoveAskFor(nHash); + EraseObjectRequest(nHash); } if (pfrom->nVersion < MIN_GOVERNANCE_PEER_PROTO_VERSION) { @@ -210,7 +210,7 @@ void CGovernanceManager::ProcessMessage(CNode* pfrom, const std::string& strComm { LOCK(cs_main); - connman.RemoveAskFor(nHash); + EraseObjectRequest(nHash); } if (pfrom->nVersion < MIN_GOVERNANCE_PEER_PROTO_VERSION) { diff --git a/src/llmq/quorums_blockprocessor.cpp b/src/llmq/quorums_blockprocessor.cpp index 86ab34752119..57ac4fc685bd 100644 --- a/src/llmq/quorums_blockprocessor.cpp +++ b/src/llmq/quorums_blockprocessor.cpp @@ -36,7 +36,7 @@ void CQuorumBlockProcessor::ProcessMessage(CNode* pfrom, const std::string& strC auto hash = ::SerializeHash(qc); { LOCK(cs_main); - connman.RemoveAskFor(hash); + EraseObjectRequest(hash); } if (qc.IsNull()) { diff --git a/src/llmq/quorums_chainlocks.cpp b/src/llmq/quorums_chainlocks.cpp index c43055505edf..148993a60507 100644 --- a/src/llmq/quorums_chainlocks.cpp +++ b/src/llmq/quorums_chainlocks.cpp @@ -103,7 +103,7 @@ void CChainLocksHandler::ProcessNewChainLock(NodeId from, const llmq::CChainLock { { LOCK(cs_main); - g_connman->RemoveAskFor(hash); + EraseObjectRequest(hash); } { diff --git a/src/llmq/quorums_dkgsessionhandler.cpp b/src/llmq/quorums_dkgsessionhandler.cpp index 45453f12ccfc..f80297c6bb3d 100644 --- a/src/llmq/quorums_dkgsessionhandler.cpp +++ b/src/llmq/quorums_dkgsessionhandler.cpp @@ -50,7 +50,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv) return; } - g_connman->RemoveAskFor(hash); + EraseObjectRequest(hash); pendingMessages.emplace_back(std::make_pair(from, std::move(pm))); } @@ -443,7 +443,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendi auto hash = ::SerializeHash(msg); { LOCK(cs_main); - g_connman->RemoveAskFor(hash); + EraseObjectRequest(hash); } bool ban = false; diff --git a/src/llmq/quorums_instantsend.cpp b/src/llmq/quorums_instantsend.cpp index 6e7a7bbf2c10..9f2be1855b26 100644 --- a/src/llmq/quorums_instantsend.cpp +++ b/src/llmq/quorums_instantsend.cpp @@ -880,7 +880,7 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has { { LOCK(cs_main); - g_connman->RemoveAskFor(hash); + EraseObjectRequest(hash); } CTransactionRef tx; diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index 79c1b7fc1caf..663ff7110c2e 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -669,7 +669,7 @@ void CSigningManager::ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& re { LOCK(cs_main); - connman.RemoveAskFor(recoveredSig.GetHash()); + EraseObjectRequest(recoveredSig.GetHash()); } if (db.HasRecoveredSigForHash(recoveredSig.GetHash())) { diff --git a/src/net.cpp b/src/net.cpp index 8fc258e8200f..3803ba56cf67 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -3054,16 +3054,6 @@ void CConnman::RelayInvFiltered(CInv &inv, const uint256& relatedTxHash, const i } } -void CConnman::RemoveAskFor(const uint256& hash) -{ - mapAlreadyAskedFor.erase(hash); - - LOCK(cs_vNodes); - for (const auto& pnode : vNodes) { - pnode->RemoveAskFor(hash); - } -} - void CConnman::RecordBytesRecv(uint64_t bytes) { LOCK(cs_totalBytesRecv); @@ -3269,14 +3259,6 @@ CNode::~CNode() CloseSocket(hSocket); } -void CNode::RemoveAskFor(const uint256& hash) -{ - setAskFor.erase(hash); - // we don't really remove it from queueAskFor as it would be too expensive to rebuild the heap - // instead, we're ignoring the entry later as it won't be found in setAskForInQueue anymore - setAskForInQueue.erase(hash); -} - bool CConnman::NodeFullyConnected(const CNode* pnode) { return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; diff --git a/src/net.h b/src/net.h index b0ac6feeff11..66c177ff8977 100644 --- a/src/net.h +++ b/src/net.h @@ -353,7 +353,6 @@ class CConnman void RelayInvFiltered(CInv &inv, const CTransaction &relatedTx, const int minProtoVersion = MIN_PEER_PROTO_VERSION, bool fAllowMasternodeConnections = false); // This overload will not update node filters, so use it only for the cases when other messages will update related transaction data in filters void RelayInvFiltered(CInv &inv, const uint256 &relatedTxHash, const int minProtoVersion = MIN_PEER_PROTO_VERSION, bool fAllowMasternodeConnections = false); - void RemoveAskFor(const uint256& hash); // Addrman functions size_t GetAddressCount() const; @@ -1029,8 +1028,6 @@ class CNode vBlockHashesToAnnounce.push_back(hash); } - void RemoveAskFor(const uint256& hash); - void CloseSocketDisconnect(); void copyStats(CNodeStats &stats); diff --git a/src/spork.cpp b/src/spork.cpp index acbb1d5fb821..d919c24a0589 100644 --- a/src/spork.cpp +++ b/src/spork.cpp @@ -131,7 +131,7 @@ void CSporkManager::ProcessSpork(CNode* pfrom, const std::string& strCommand, CD std::string strLogMsg; { LOCK(cs_main); - connman.RemoveAskFor(hash); + EraseObjectRequest(hash); if(!chainActive.Tip()) return; strLogMsg = strprintf("SPORK -- hash: %s id: %d value: %10d bestHeight: %d peer=%d", hash.ToString(), spork.nSporkID, spork.nValue, chainActive.Height(), pfrom->GetId()); } From 2443c8f582b0d8926afa99d5538c80d4259e64df Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Mon, 6 Apr 2020 14:37:31 +0200 Subject: [PATCH 08/21] Replace uses of CNode::AskFor with RequestObject --- src/llmq/quorums_instantsend.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/llmq/quorums_instantsend.cpp b/src/llmq/quorums_instantsend.cpp index 9f2be1855b26..b85cf3f26162 100644 --- a/src/llmq/quorums_instantsend.cpp +++ b/src/llmq/quorums_instantsend.cpp @@ -1345,7 +1345,7 @@ void CInstantSendManager::AskNodesForLockedTx(const uint256& txid) txid.ToString(), pnode->GetId()); CInv inv(MSG_TX, txid); - pnode->AskFor(inv); + RequestObject(pnode->GetId(), inv, GetTimeMicros()); } } for (CNode* pnode : nodesToAskFor) { From ef14b19f052196d77e6e990a9fb30a292eb05548 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Mon, 6 Apr 2020 15:17:22 +0200 Subject: [PATCH 09/21] Don't re-request erased object requests --- src/net_processing.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 707b039687a1..3f9d93765e54 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -363,6 +363,7 @@ struct CNodeState { // Keeps track of the time (in microseconds) when transactions were requested last time unordered_limitedmap g_already_asked_for(MAX_INV_SZ, MAX_INV_SZ * 2); +unordered_limitedmap g_erased_object_requests(MAX_INV_SZ, MAX_INV_SZ * 2); /** Map maintaining per-node state. Requires cs_main. */ std::map mapNodeState; @@ -662,6 +663,7 @@ void EraseObjectRequest(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); g_already_asked_for.erase(hash); + g_erased_object_requests.insert(std::make_pair(hash, GetTimeMillis())); } int64_t GetObjectRequestTime(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) @@ -4297,6 +4299,11 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // Erase this entry from tx_process_time (it may be added back for // processing at a later time, see below) tx_process_time.erase(tx_process_time.begin()); + if (g_erased_object_requests.count(inv.hash)) { + state.m_tx_download.m_tx_announced.erase(inv); + state.m_tx_download.m_tx_in_flight.erase(inv); + continue; + } if (!AlreadyHave(inv)) { // If this transaction was last requested more than 1 minute ago, // then request. From 4bfc20cb6782da74f92ac99fbac4c7eebbe34192 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Tue, 7 Apr 2020 13:23:29 +0200 Subject: [PATCH 10/21] Force re-requesting of IS locked TXs --- src/llmq/quorums_instantsend.cpp | 2 +- src/net_processing.cpp | 12 +++++++++--- src/net_processing.h | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/llmq/quorums_instantsend.cpp b/src/llmq/quorums_instantsend.cpp index b85cf3f26162..a4637fa95b80 100644 --- a/src/llmq/quorums_instantsend.cpp +++ b/src/llmq/quorums_instantsend.cpp @@ -1345,7 +1345,7 @@ void CInstantSendManager::AskNodesForLockedTx(const uint256& txid) txid.ToString(), pnode->GetId()); CInv inv(MSG_TX, txid); - RequestObject(pnode->GetId(), inv, GetTimeMicros()); + RequestObject(pnode->GetId(), inv, GetTimeMicros(), true); } } for (CNode* pnode : nodesToAskFor) { diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 3f9d93765e54..c7898f0e8bff 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -736,7 +736,7 @@ int64_t CalculateObjectGetDataTime(const CInv& inv, int64_t current_time, bool u return process_time; } -void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow, bool fForce = false) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; @@ -754,16 +754,22 @@ void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow) EXCLUSIVE_L int64_t process_time = CalculateObjectGetDataTime(inv, nNow, !state->fPreferredDownload); peer_download_state.m_tx_process_time.emplace(process_time, inv); + + if (fForce) { + // make sure this object is actually requested ASAP + g_erased_object_requests.erase(inv.hash); + g_already_asked_for.erase(inv.hash); + } } -void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow, bool fForce) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); auto* state = State(nodeId); if (!state) { return; } - RequestObject(state, inv, nNow); + RequestObject(state, inv, nNow, fForce); } size_t GetRequestedObjectCount(NodeId nodeId) diff --git a/src/net_processing.h b/src/net_processing.h index ced80382e6f5..55890f74f144 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -85,7 +85,7 @@ void Misbehaving(NodeId nodeid, int howmuch, const std::string& message=""); bool IsBanned(NodeId nodeid); void EraseObjectRequest(const uint256& hash); -void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow); +void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow, bool fForce=false); size_t GetRequestedObjectCount(NodeId nodeId); #endif // BITCOIN_NET_PROCESSING_H From 26fcd3f0bf726b9f50645e258dbc0ac18b086bd7 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Tue, 7 Apr 2020 13:25:29 +0200 Subject: [PATCH 11/21] Also remove m_tx_announced and m_tx_in_flight entries when EraseObjectRequest is called Otherwise they'll run into false-positive timeouts. --- src/governance/governance.cpp | 4 ++-- src/llmq/quorums_blockprocessor.cpp | 2 +- src/llmq/quorums_chainlocks.cpp | 2 +- src/llmq/quorums_dkgsessionhandler.cpp | 27 +++++++++++++------------- src/llmq/quorums_dkgsessionhandler.h | 3 ++- src/llmq/quorums_instantsend.cpp | 2 +- src/llmq/quorums_signing.cpp | 2 +- src/net_processing.cpp | 26 ++++++++++++++++++------- src/net_processing.h | 2 +- src/spork.cpp | 2 +- 10 files changed, 43 insertions(+), 29 deletions(-) diff --git a/src/governance/governance.cpp b/src/governance/governance.cpp index 5edee2bcf899..53675979cc43 100644 --- a/src/governance/governance.cpp +++ b/src/governance/governance.cpp @@ -135,7 +135,7 @@ void CGovernanceManager::ProcessMessage(CNode* pfrom, const std::string& strComm { LOCK(cs_main); - EraseObjectRequest(nHash); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_GOVERNANCE_OBJECT, nHash)); } if (pfrom->nVersion < MIN_GOVERNANCE_PEER_PROTO_VERSION) { @@ -210,7 +210,7 @@ void CGovernanceManager::ProcessMessage(CNode* pfrom, const std::string& strComm { LOCK(cs_main); - EraseObjectRequest(nHash); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_GOVERNANCE_OBJECT_VOTE, nHash)); } if (pfrom->nVersion < MIN_GOVERNANCE_PEER_PROTO_VERSION) { diff --git a/src/llmq/quorums_blockprocessor.cpp b/src/llmq/quorums_blockprocessor.cpp index 57ac4fc685bd..c13caa17372e 100644 --- a/src/llmq/quorums_blockprocessor.cpp +++ b/src/llmq/quorums_blockprocessor.cpp @@ -36,7 +36,7 @@ void CQuorumBlockProcessor::ProcessMessage(CNode* pfrom, const std::string& strC auto hash = ::SerializeHash(qc); { LOCK(cs_main); - EraseObjectRequest(hash); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_QUORUM_FINAL_COMMITMENT, hash)); } if (qc.IsNull()) { diff --git a/src/llmq/quorums_chainlocks.cpp b/src/llmq/quorums_chainlocks.cpp index 148993a60507..f31ecee17513 100644 --- a/src/llmq/quorums_chainlocks.cpp +++ b/src/llmq/quorums_chainlocks.cpp @@ -103,7 +103,7 @@ void CChainLocksHandler::ProcessNewChainLock(NodeId from, const llmq::CChainLock { { LOCK(cs_main); - EraseObjectRequest(hash); + EraseObjectRequest(from, CInv(MSG_CLSIG, hash)); } { diff --git a/src/llmq/quorums_dkgsessionhandler.cpp b/src/llmq/quorums_dkgsessionhandler.cpp index f80297c6bb3d..ef0369a05bc1 100644 --- a/src/llmq/quorums_dkgsessionhandler.cpp +++ b/src/llmq/quorums_dkgsessionhandler.cpp @@ -18,8 +18,9 @@ namespace llmq { -CDKGPendingMessages::CDKGPendingMessages(size_t _maxMessagesPerNode) : - maxMessagesPerNode(_maxMessagesPerNode) +CDKGPendingMessages::CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType) : + maxMessagesPerNode(_maxMessagesPerNode), + invType(_invType) { } @@ -50,7 +51,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv) return; } - EraseObjectRequest(hash); + EraseObjectRequest(from, CInv(invType, hash)); pendingMessages.emplace_back(std::make_pair(from, std::move(pm))); } @@ -90,10 +91,10 @@ CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, ctp blsWorker(_blsWorker), dkgManager(_dkgManager), curSession(std::make_shared(_params, _blsWorker, _dkgManager)), - pendingContributions((size_t)_params.size * 2), // we allow size*2 messages as we need to make sure we see bad behavior (double messages) - pendingComplaints((size_t)_params.size * 2), - pendingJustifications((size_t)_params.size * 2), - pendingPrematureCommitments((size_t)_params.size * 2) + pendingContributions((size_t)_params.size * 2, MSG_QUORUM_CONTRIB), // we allow size*2 messages as we need to make sure we see bad behavior (double messages) + pendingComplaints((size_t)_params.size * 2, MSG_QUORUM_COMPLAINT), + pendingJustifications((size_t)_params.size * 2, MSG_QUORUM_JUSTIFICATION), + pendingPrematureCommitments((size_t)_params.size * 2, MSG_QUORUM_PREMATURE_COMMITMENT) { phaseHandlerThread = std::thread([this] { RenameThread(strprintf("dash-q-phase-%d", (uint8_t)params.type).c_str()); @@ -416,7 +417,7 @@ std::set BatchVerifyMessageSigs(CDKGSession& session, const std::vector< return ret; } -template +template bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendingMessages, size_t maxCount) { auto msgs = pendingMessages.PopAndDeserializeMessages(maxCount); @@ -443,7 +444,7 @@ bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendi auto hash = ::SerializeHash(msg); { LOCK(cs_main); - EraseObjectRequest(hash); + EraseObjectRequest(p.first, CInv(MessageType, hash)); } bool ban = false; @@ -540,7 +541,7 @@ void CDKGSessionHandler::HandleDKGRound() curSession->Contribute(pendingContributions); }; auto fContributeWait = [this] { - return ProcessPendingMessageBatch(*curSession, pendingContributions, 8); + return ProcessPendingMessageBatch(*curSession, pendingContributions, 8); }; HandlePhase(QuorumPhase_Contribute, QuorumPhase_Complain, curQuorumHash, 0.05, fContributeStart, fContributeWait); @@ -549,7 +550,7 @@ void CDKGSessionHandler::HandleDKGRound() curSession->VerifyAndComplain(pendingComplaints); }; auto fComplainWait = [this] { - return ProcessPendingMessageBatch(*curSession, pendingComplaints, 8); + return ProcessPendingMessageBatch(*curSession, pendingComplaints, 8); }; HandlePhase(QuorumPhase_Complain, QuorumPhase_Justify, curQuorumHash, 0.05, fComplainStart, fComplainWait); @@ -558,7 +559,7 @@ void CDKGSessionHandler::HandleDKGRound() curSession->VerifyAndJustify(pendingJustifications); }; auto fJustifyWait = [this] { - return ProcessPendingMessageBatch(*curSession, pendingJustifications, 8); + return ProcessPendingMessageBatch(*curSession, pendingJustifications, 8); }; HandlePhase(QuorumPhase_Justify, QuorumPhase_Commit, curQuorumHash, 0.05, fJustifyStart, fJustifyWait); @@ -567,7 +568,7 @@ void CDKGSessionHandler::HandleDKGRound() curSession->VerifyAndCommit(pendingPrematureCommitments); }; auto fCommitWait = [this] { - return ProcessPendingMessageBatch(*curSession, pendingPrematureCommitments, 8); + return ProcessPendingMessageBatch(*curSession, pendingPrematureCommitments, 8); }; HandlePhase(QuorumPhase_Commit, QuorumPhase_Finalize, curQuorumHash, 0.1, fCommitStart, fCommitWait); diff --git a/src/llmq/quorums_dkgsessionhandler.h b/src/llmq/quorums_dkgsessionhandler.h index 5244c6d9ceb0..7ee399973bd8 100644 --- a/src/llmq/quorums_dkgsessionhandler.h +++ b/src/llmq/quorums_dkgsessionhandler.h @@ -40,13 +40,14 @@ class CDKGPendingMessages private: mutable CCriticalSection cs; + int invType; size_t maxMessagesPerNode; std::list pendingMessages; std::map messagesPerNode; std::set seenMessages; public: - explicit CDKGPendingMessages(size_t _maxMessagesPerNode); + explicit CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType); void PushPendingMessage(NodeId from, CDataStream& vRecv); std::list PopPendingMessages(size_t maxCount); diff --git a/src/llmq/quorums_instantsend.cpp b/src/llmq/quorums_instantsend.cpp index a4637fa95b80..a2781ede030a 100644 --- a/src/llmq/quorums_instantsend.cpp +++ b/src/llmq/quorums_instantsend.cpp @@ -880,7 +880,7 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has { { LOCK(cs_main); - EraseObjectRequest(hash); + EraseObjectRequest(from, CInv(MSG_ISLOCK, hash)); } CTransactionRef tx; diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index 663ff7110c2e..842ea20ff788 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -669,7 +669,7 @@ void CSigningManager::ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& re { LOCK(cs_main); - EraseObjectRequest(recoveredSig.GetHash()); + EraseObjectRequest(nodeId, CInv(MSG_QUORUM_RECOVERED_SIG, recoveredSig.GetHash())); } if (db.HasRecoveredSigForHash(recoveredSig.GetHash())) { diff --git a/src/net_processing.cpp b/src/net_processing.cpp index c7898f0e8bff..7b7dbc1d2092 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -659,11 +659,26 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vectorm_tx_download.m_tx_announced.erase(inv); + nodestate->m_tx_download.m_tx_in_flight.erase(inv); + } +} + +void EraseObjectRequest(NodeId nodeId, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); + auto* state = State(nodeId); + if (!state) { + return; + } + EraseObjectRequest(state, inv); } int64_t GetObjectRequestTime(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) @@ -2746,10 +2761,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr bool fMissingInputs = false; CValidationState state; - CNodeState* nodestate = State(pfrom->GetId()); - nodestate->m_tx_download.m_tx_announced.erase(inv); - nodestate->m_tx_download.m_tx_in_flight.erase(inv); - EraseObjectRequest(inv.hash); + EraseObjectRequest(pfrom->GetId(), inv); if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, ptx, &fMissingInputs /* pfMissingInputs */, false /* bypass_limits */, 0 /* nAbsurdFee */)) { diff --git a/src/net_processing.h b/src/net_processing.h index 55890f74f144..8c0260004aad 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -84,7 +84,7 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats); void Misbehaving(NodeId nodeid, int howmuch, const std::string& message=""); bool IsBanned(NodeId nodeid); -void EraseObjectRequest(const uint256& hash); +void EraseObjectRequest(NodeId nodeId, const CInv& inv); void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow, bool fForce=false); size_t GetRequestedObjectCount(NodeId nodeId); diff --git a/src/spork.cpp b/src/spork.cpp index d919c24a0589..f1c65133d2c2 100644 --- a/src/spork.cpp +++ b/src/spork.cpp @@ -131,7 +131,7 @@ void CSporkManager::ProcessSpork(CNode* pfrom, const std::string& strCommand, CD std::string strLogMsg; { LOCK(cs_main); - EraseObjectRequest(hash); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_SPORK, hash)); if(!chainActive.Tip()) return; strLogMsg = strprintf("SPORK -- hash: %s id: %d value: %10d bestHeight: %d peer=%d", hash.ToString(), spork.nSporkID, spork.nValue, chainActive.Height(), pfrom->GetId()); } From 81503598b93355ecc2e841d1653613c822d9f567 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Tue, 7 Apr 2020 13:26:34 +0200 Subject: [PATCH 12/21] No inbound delay for non-TX objects and masternodes --- src/net_processing.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 7b7dbc1d2092..391875c4e9ab 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -746,7 +746,7 @@ int64_t CalculateObjectGetDataTime(const CInv& inv, int64_t current_time, bool u } // We delay processing announcements from inbound peers - if (use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY; + if (inv.type == MSG_TX && !fMasternodeMode && use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY; return process_time; } From 2d4cc8a19ee095d5c248c67681dcd0a1fe74d45f Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Tue, 7 Apr 2020 13:26:47 +0200 Subject: [PATCH 13/21] More logging for object request handling --- src/net_processing.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 391875c4e9ab..cc7827df1b6e 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -662,6 +662,7 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector& interruptM // processing at a later time, see below) tx_process_time.erase(tx_process_time.begin()); if (g_erased_object_requests.count(inv.hash)) { + LogPrint(BCLog::NET, "%s -- GETDATA skipping inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId()); state.m_tx_download.m_tx_announced.erase(inv); state.m_tx_download.m_tx_in_flight.erase(inv); continue; @@ -4342,11 +4346,13 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // requests to outbound peers). int64_t next_process_time = CalculateObjectGetDataTime(inv, nNow, !state.fPreferredDownload); tx_process_time.emplace(next_process_time, inv); + LogPrint(BCLog::NET, "%s -- GETDATA re-queue inv=(%s), next_process_time=%d, delta=%d, peer=%d\n", __func__, inv.ToString(), next_process_time, next_process_time - nNow, pto->GetId()); } } else { // We have already seen this transaction, no need to download. state.m_tx_download.m_tx_announced.erase(inv); state.m_tx_download.m_tx_in_flight.erase(inv); + LogPrint(BCLog::NET, "%s -- GETDATA already seen inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId()); } } From a76bafb98c9ef9321c9ddef0f382c57ef36427d1 Mon Sep 17 00:00:00 2001 From: "Wladimir J. van der Laan" Date: Wed, 29 May 2019 13:39:45 +0200 Subject: [PATCH 14/21] Merge #16046: util: Add type safe GetTime fa013664ae23d0682a195b9bded85bc19c99536e util: Add type safe GetTime (MarcoFalke) Pull request description: There are basically two ways to get the time in Bitcoin Core: * get the system time (via `GetSystemTimeInSeconds` or `GetTime{Millis,Micros}`) * get the mockable time (via `GetTime`) Both return the same type (a plain int). This can lead to (test-only) bugs such as 99464bc38e9575ff47f8e33223b252dcea2055e3. Fix that by deprecating `GetTime` and adding a `GetTime<>` that returns the mockable time in a non-int type. The new util function is currently unused, but new code should it where possible. ACKs for commit fa0136: promag: utACK fa013664. Tree-SHA512: efab9c463f079fd8fd3030c479637c7b1e8be567a881234bd0f555c8f87e518e3b43ef2466128103db8fc40295aaf24e87ad76d91f338c631246fc703477e95c --- src/Makefile.bench.include | 1 + src/bench/util_time.cpp | 42 ++++++++++++++++++++++++++++++++++++++ src/test/util_tests.cpp | 21 +++++++++++++++++++ src/utiltime.cpp | 16 ++++++++++++++- src/utiltime.h | 25 +++++++++++++++-------- 5 files changed, 95 insertions(+), 10 deletions(-) create mode 100644 src/bench/util_time.cpp diff --git a/src/Makefile.bench.include b/src/Makefile.bench.include index 20192f9fed9f..b5ae95a3bb38 100644 --- a/src/Makefile.bench.include +++ b/src/Makefile.bench.include @@ -29,6 +29,7 @@ bench_bench_dash_SOURCES = \ bench/ccoins_caching.cpp \ bench/merkle_root.cpp \ bench/mempool_eviction.cpp \ + bench/util_time.cpp \ bench/base58.cpp \ bench/lockedpool.cpp \ bench/poly1305.cpp \ diff --git a/src/bench/util_time.cpp b/src/bench/util_time.cpp new file mode 100644 index 000000000000..72d97354aa55 --- /dev/null +++ b/src/bench/util_time.cpp @@ -0,0 +1,42 @@ +// Copyright (c) 2019 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include + +static void BenchTimeDeprecated(benchmark::State& state) +{ + while (state.KeepRunning()) { + (void)GetTime(); + } +} + +static void BenchTimeMock(benchmark::State& state) +{ + SetMockTime(111); + while (state.KeepRunning()) { + (void)GetTime(); + } + SetMockTime(0); +} + +static void BenchTimeMillis(benchmark::State& state) +{ + while (state.KeepRunning()) { + (void)GetTime(); + } +} + +static void BenchTimeMillisSys(benchmark::State& state) +{ + while (state.KeepRunning()) { + (void)GetTimeMillis(); + } +} + +BENCHMARK(BenchTimeDeprecated, 100000000); +BENCHMARK(BenchTimeMillis, 6000000); +BENCHMARK(BenchTimeMillisSys, 6000000); +BENCHMARK(BenchTimeMock, 300000000); diff --git a/src/test/util_tests.cpp b/src/test/util_tests.cpp index 1254391e0181..f7726420e2ee 100644 --- a/src/test/util_tests.cpp +++ b/src/test/util_tests.cpp @@ -354,6 +354,27 @@ BOOST_AUTO_TEST_CASE(gettime) BOOST_CHECK((GetTime() & ~0xFFFFFFFFLL) == 0); } +BOOST_AUTO_TEST_CASE(util_time_GetTime) +{ + SetMockTime(111); + // Check that mock time does not change after a sleep + for (const auto& num_sleep : {0, 1}) { + MilliSleep(num_sleep); + BOOST_CHECK_EQUAL(111, GetTime()); // Deprecated time getter + BOOST_CHECK_EQUAL(111, GetTime().count()); + BOOST_CHECK_EQUAL(111000, GetTime().count()); + BOOST_CHECK_EQUAL(111000000, GetTime().count()); + } + + SetMockTime(0); + // Check that system time changes after a sleep + const auto ms_0 = GetTime(); + const auto us_0 = GetTime(); + MilliSleep(1); + BOOST_CHECK(ms_0 < GetTime()); + BOOST_CHECK(us_0 < GetTime()); +} + BOOST_AUTO_TEST_CASE(test_ParseInt32) { int32_t n; diff --git a/src/utiltime.cpp b/src/utiltime.cpp index f9d910dd19f4..72692f09486b 100644 --- a/src/utiltime.cpp +++ b/src/utiltime.cpp @@ -1,5 +1,5 @@ // Copyright (c) 2009-2010 Satoshi Nakamoto -// Copyright (c) 2009-2015 The Bitcoin Core developers +// Copyright (c) 2009-2019 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. @@ -27,6 +27,20 @@ int64_t GetTime() return now; } +template +T GetTime() +{ + const std::chrono::seconds mocktime{nMockTime.load(std::memory_order_relaxed)}; + + return std::chrono::duration_cast( + mocktime.count() ? + mocktime : + std::chrono::microseconds{GetTimeMicros()}); +} +template std::chrono::seconds GetTime(); +template std::chrono::milliseconds GetTime(); +template std::chrono::microseconds GetTime(); + void SetMockTime(int64_t nMockTimeIn) { nMockTime.store(nMockTimeIn, std::memory_order_relaxed); diff --git a/src/utiltime.h b/src/utiltime.h index 8ae8540b89c7..b5e6b1d39f77 100644 --- a/src/utiltime.h +++ b/src/utiltime.h @@ -1,5 +1,5 @@ // Copyright (c) 2009-2010 Satoshi Nakamoto -// Copyright (c) 2009-2015 The Bitcoin Core developers +// Copyright (c) 2009-2019 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. @@ -8,25 +8,32 @@ #include #include +#include /** - * GetTimeMicros() and GetTimeMillis() both return the system time, but in - * different units. GetTime() returns the system time in seconds, but also - * supports mocktime, where the time can be specified by the user, eg for - * testing (eg with the setmocktime rpc, or -mocktime argument). - * - * TODO: Rework these functions to be type-safe (so that we don't inadvertently - * compare numbers with different units, or compare a mocktime to system time). + * DEPRECATED + * Use either GetSystemTimeInSeconds (not mockable) or GetTime (mockable) */ - int64_t GetTime(); + +/** Returns the system time (not mockable) */ int64_t GetTimeMillis(); +/** Returns the system time (not mockable) */ int64_t GetTimeMicros(); +/** Returns the system time (not mockable) */ int64_t GetSystemTimeInSeconds(); // Like GetTime(), but not mockable + +/** For testing. Set e.g. with the setmocktime rpc, or -mocktime argument */ void SetMockTime(int64_t nMockTimeIn); +/** For testing */ int64_t GetMockTime(); + void MilliSleep(int64_t n); +/** Return system time (or mocked time, if set) */ +template +T GetTime(); + std::string DateTimeStrFormat(const char* pszFormat, int64_t nTime); #endif // BITCOIN_UTILTIME_H From 8e5fbedf21b120d87a12bc6063454fa716fc18f8 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Wed, 8 Apr 2020 14:27:07 +0200 Subject: [PATCH 15/21] net: Use mockable time for tx download # Conflicts: # src/net_processing.cpp # src/random.cpp # src/random.h --- src/net_processing.cpp | 83 ++++++++++++++++++++++-------------------- src/random.cpp | 5 +++ src/random.h | 4 +- 3 files changed, 51 insertions(+), 41 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index cc7827df1b6e..91c9b839ef27 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -62,11 +62,11 @@ static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100; /** Maximum number of announced transactions from a peer */ static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; /** How many microseconds to delay requesting transactions from inbound peers */ -static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; // 2 seconds +static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::seconds{2}}; /** How long to wait (in microseconds) before downloading a transaction from an additional peer */ -static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; // 1 minute +static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{std::chrono::seconds{60}}; /** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ -static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; // 2 seconds +static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{std::chrono::seconds{2}}; /** How long to wait (expiry * factor microseconds) before expiring an in-flight getdata request to a peer */ static constexpr int64_t TX_EXPIRY_INTERVAL_FACTOR = 10; static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, @@ -322,16 +322,16 @@ struct CNodeState { /* Track when to attempt download of announced transactions (process * time in micros -> txid) */ - std::multimap m_tx_process_time; + std::multimap m_tx_process_time; //! Store all the transactions a peer has recently announced std::set m_tx_announced; //! Store transactions which were requested by us, with timestamp - std::map m_tx_in_flight; + std::map m_tx_in_flight; //! Periodically check for stuck getdata requests - int64_t m_check_expiry_timer{0}; + std::chrono::microseconds m_check_expiry_timer{0}; }; TxDownloadState m_tx_download; @@ -362,8 +362,8 @@ struct CNodeState { }; // Keeps track of the time (in microseconds) when transactions were requested last time -unordered_limitedmap g_already_asked_for(MAX_INV_SZ, MAX_INV_SZ * 2); -unordered_limitedmap g_erased_object_requests(MAX_INV_SZ, MAX_INV_SZ * 2); +unordered_limitedmap g_already_asked_for(MAX_INV_SZ, MAX_INV_SZ * 2); +unordered_limitedmap g_erased_object_requests(MAX_INV_SZ, MAX_INV_SZ * 2); /** Map maintaining per-node state. Requires cs_main. */ std::map mapNodeState; @@ -682,17 +682,17 @@ void EraseObjectRequest(NodeId nodeId, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED EraseObjectRequest(state, inv); } -int64_t GetObjectRequestTime(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +std::chrono::microseconds GetObjectRequestTime(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); auto it = g_already_asked_for.find(hash); if (it != g_already_asked_for.end()) { return it->second; } - return 0; + return {}; } -void UpdateObjectRequestTime(const uint256& hash, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void UpdateObjectRequestTime(const uint256& hash, std::chrono::microseconds request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); auto it = g_already_asked_for.find(hash); @@ -703,7 +703,7 @@ void UpdateObjectRequestTime(const uint256& hash, int64_t request_time) EXCLUSIV } } -int64_t GetObjectInterval(int invType) +std::chrono::microseconds GetObjectInterval(int invType) { // some messages need to be re-requested faster when the first announcing peer did not answer to GETDATA switch(invType) @@ -719,26 +719,26 @@ int64_t GetObjectInterval(int invType) } } -int64_t GetObjectExpiryInterval(int invType) +std::chrono::microseconds GetObjectExpiryInterval(int invType) { return GetObjectInterval(invType) * TX_EXPIRY_INTERVAL_FACTOR; } -int64_t GetObjectRandomDelay(int invType) +std::chrono::microseconds GetObjectRandomDelay(int invType) { if (invType == MSG_TX) { - return GetRand(MAX_GETDATA_RANDOM_DELAY); + return GetRandMicros(MAX_GETDATA_RANDOM_DELAY); } - return 0; + return {}; } -int64_t CalculateObjectGetDataTime(const CInv& inv, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +std::chrono::microseconds CalculateObjectGetDataTime(const CInv& inv, std::chrono::microseconds current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); - int64_t process_time; - int64_t last_request_time = GetObjectRequestTime(inv.hash); + std::chrono::microseconds process_time; + const auto last_request_time = GetObjectRequestTime(inv.hash); // First time requesting this tx - if (last_request_time == 0) { + if (last_request_time.count() == 0) { process_time = current_time; } else { // Randomize the delay to avoid biasing some peers over others (such as due to @@ -752,7 +752,7 @@ int64_t CalculateObjectGetDataTime(const CInv& inv, int64_t current_time, bool u return process_time; } -void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow, bool fForce = false) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void RequestObject(CNodeState* state, const CInv& inv, std::chrono::microseconds current_time, bool fForce = false) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; @@ -767,7 +767,7 @@ void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow, bool fForce // Calculate the time to try requesting this transaction. Use // fPreferredDownload as a proxy for outbound peers. - int64_t process_time = CalculateObjectGetDataTime(inv, nNow, !state->fPreferredDownload); + std::chrono::microseconds process_time = CalculateObjectGetDataTime(inv, current_time, !state->fPreferredDownload); peer_download_state.m_tx_process_time.emplace(process_time, inv); @@ -777,17 +777,17 @@ void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow, bool fForce g_already_asked_for.erase(inv.hash); } - LogPrint(BCLog::NET, "%s -- inv=(%s), nNow=%d, process_time=%d, delta=%d\n", __func__, inv.ToString(), nNow, process_time, process_time - nNow); + LogPrint(BCLog::NET, "%s -- inv=(%s), current_time=%d, process_time=%d, delta=%d\n", __func__, inv.ToString(), current_time.count(), process_time.count(), (process_time - current_time).count()); } -void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow, bool fForce) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void RequestObject(NodeId nodeId, const CInv& inv, std::chrono::microseconds current_time, bool fForce) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); auto* state = State(nodeId); if (!state) { return; } - RequestObject(state, inv, nNow, fForce); + RequestObject(state, inv, current_time, fForce); } size_t GetRequestedObjectCount(NodeId nodeId) @@ -2422,7 +2422,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr LOCK(cs_main); - int64_t nNow = GetTimeMicros(); + const auto current_time = GetTime(); for (CInv &inv : vInv) { @@ -2479,7 +2479,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr } else if (!fAlreadyHave) { bool allowWhileInIBD = allowWhileInIBDObjs.count(inv.type); if (allowWhileInIBD || (!fImporting && !fReindex && !IsInitialBlockDownload())) { - RequestObject(State(pfrom->GetId()), inv, nNow); + RequestObject(State(pfrom->GetId()), inv, current_time); } } } @@ -2807,16 +2807,16 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr } } if (!fRejectedParents) { - int64_t nNow = GetTimeMicros(); + const auto current_time = GetTime(); for (const CTxIn& txin : tx.vin) { CInv _inv(MSG_TX, txin.prevout.hash); pfrom->AddInventoryKnown(_inv); - if (!AlreadyHave(_inv)) RequestObject(State(pfrom->GetId()), _inv, nNow); + if (!AlreadyHave(_inv)) RequestObject(State(pfrom->GetId()), _inv, current_time); // We don't know if the previous tx was a regular or a mixing one, try both CInv _inv2(MSG_DSTX, txin.prevout.hash); pfrom->AddInventoryKnown(_inv2); - if (!AlreadyHave(_inv2)) RequestObject(State(pfrom->GetId()), _inv2, nNow); + if (!AlreadyHave(_inv2)) RequestObject(State(pfrom->GetId()), _inv2, current_time); } AddOrphanTx(ptx, pfrom->GetId()); @@ -4207,6 +4207,9 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); // Detect whether we're stalling + const auto current_time = GetTime(); + // nNow is the current system time (GetTimeMicros is not mockable) and + // should be replaced by the mockable current_time eventually nNow = GetTimeMicros(); if (state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) { // Stalling only triggers when the block download window cannot move. During normal steady state, @@ -4298,9 +4301,9 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // were unresponsive in the past. // Eventually we should consider disconnecting peers, but this is // conservative. - if (state.m_tx_download.m_check_expiry_timer <= nNow) { + if (state.m_tx_download.m_check_expiry_timer <= current_time) { for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) { - if (it->second <= nNow - GetObjectExpiryInterval(it->first.type)) { + if (it->second <= current_time - GetObjectExpiryInterval(it->first.type)) { LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId()); state.m_tx_download.m_tx_announced.erase(it->first); state.m_tx_download.m_tx_in_flight.erase(it++); @@ -4310,12 +4313,12 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM } // On average, we do this check every TX_EXPIRY_INTERVAL. Randomize // so that we're not doing this for all peers at the same time. - state.m_tx_download.m_check_expiry_timer = nNow + GetObjectExpiryInterval(MSG_TX)/2 + GetRand(GetObjectExpiryInterval(MSG_TX)); + state.m_tx_download.m_check_expiry_timer = current_time + GetObjectExpiryInterval(MSG_TX)/2 + GetRandMicros(GetObjectExpiryInterval(MSG_TX)); } // DASH this code also handles non-TXs (Dash specific messages) auto& tx_process_time = state.m_tx_download.m_tx_process_time; - while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { + while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { const CInv inv = tx_process_time.begin()->second; // Erase this entry from tx_process_time (it may be added back for // processing at a later time, see below) @@ -4329,24 +4332,24 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM if (!AlreadyHave(inv)) { // If this transaction was last requested more than 1 minute ago, // then request. - int64_t last_request_time = GetObjectRequestTime(inv.hash); - if (last_request_time <= nNow - GetObjectInterval(inv.type)) { + const auto last_request_time = GetObjectRequestTime(inv.hash); + if (last_request_time <= current_time - GetObjectInterval(inv.type)) { LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); vGetData.push_back(inv); if (vGetData.size() >= MAX_GETDATA_SZ) { connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); vGetData.clear(); } - UpdateObjectRequestTime(inv.hash, nNow); - state.m_tx_download.m_tx_in_flight.emplace(inv, nNow); + UpdateObjectRequestTime(inv.hash, current_time); + state.m_tx_download.m_tx_in_flight.emplace(inv, current_time); } else { // This transaction is in flight from someone else; queue // up processing to happen after the download times out // (with a slight delay for inbound peers, to prefer // requests to outbound peers). - int64_t next_process_time = CalculateObjectGetDataTime(inv, nNow, !state.fPreferredDownload); + const auto next_process_time = CalculateObjectGetDataTime(inv, current_time, !state.fPreferredDownload); tx_process_time.emplace(next_process_time, inv); - LogPrint(BCLog::NET, "%s -- GETDATA re-queue inv=(%s), next_process_time=%d, delta=%d, peer=%d\n", __func__, inv.ToString(), next_process_time, next_process_time - nNow, pto->GetId()); + LogPrint(BCLog::NET, "%s -- GETDATA re-queue inv=(%s), next_process_time=%d, delta=%d, peer=%d\n", __func__, inv.ToString(), next_process_time.count(), (next_process_time - current_time).count(), pto->GetId()); } } else { // We have already seen this transaction, no need to download. diff --git a/src/random.cpp b/src/random.cpp index 0a354adc5888..b562df6dc1ad 100644 --- a/src/random.cpp +++ b/src/random.cpp @@ -364,6 +364,11 @@ uint64_t GetRand(uint64_t nMax) return (nRand % nMax); } +std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept +{ + return std::chrono::microseconds{GetRand(duration_max.count())}; +} + int GetRandInt(int nMax) { return GetRand(nMax); diff --git a/src/random.h b/src/random.h index ff009047a1c4..51c1568d9e87 100644 --- a/src/random.h +++ b/src/random.h @@ -10,7 +10,8 @@ #include #include -#include +#include // For std::chrono::microseconds +#include /* Seed OpenSSL PRNG with additional entropy data */ void RandAddSeed(); @@ -20,6 +21,7 @@ void RandAddSeed(); */ void GetRandBytes(unsigned char* buf, int num); uint64_t GetRand(uint64_t nMax); +std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept; int GetRandInt(int nMax); uint256 GetRandHash(); From a7b38efb9872a0fa255c5c8efc7205a7bf74ac78 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Wed, 8 Apr 2020 14:49:27 +0200 Subject: [PATCH 16/21] Fix GetObjectInterval and EraseObjectRequest --- src/net_processing.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 91c9b839ef27..56e05496b95a 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -664,7 +664,7 @@ void EraseObjectRequest(CNodeState* nodestate, const CInv& inv) EXCLUSIVE_LOCKS_ AssertLockHeld(cs_main); LogPrint(BCLog::NET, "%s -- inv=(%s)\n", __func__, inv.ToString()); g_already_asked_for.erase(inv.hash); - g_erased_object_requests.insert(std::make_pair(inv.hash, GetTimeMillis())); + g_erased_object_requests.insert(std::make_pair(inv.hash, GetTime())); if (nodestate) { nodestate->m_tx_download.m_tx_announced.erase(inv); @@ -709,11 +709,11 @@ std::chrono::microseconds GetObjectInterval(int invType) switch(invType) { case MSG_QUORUM_RECOVERED_SIG: - return 15 * 1000000; + return std::chrono::seconds{15}; case MSG_CLSIG: - return 5 * 1000000; + return std::chrono::seconds{5}; case MSG_ISLOCK: - return 10 * 1000000; + return std::chrono::seconds{10}; default: return GETDATA_TX_INTERVAL; } From 775e4ba8230092b83502bcf21dfb5213357846b1 Mon Sep 17 00:00:00 2001 From: UdjinM6 Date: Wed, 8 Apr 2020 00:59:19 +0300 Subject: [PATCH 17/21] transactions -> objects + corresponding changes in comments --- src/net_processing.cpp | 142 ++++++++++++++++++++--------------------- 1 file changed, 70 insertions(+), 72 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 56e05496b95a..5b80d90ceebd 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -57,10 +57,10 @@ # error "Dash Core cannot be compiled without assertions." #endif -/** Maximum number of in-flight transactions from a peer */ -static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100; -/** Maximum number of announced transactions from a peer */ -static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; +/** Maximum number of in-flight objects from a peer */ +static constexpr int32_t MAX_PEER_OBJECT_IN_FLIGHT = 100; +/** Maximum number of announced objects from a peer */ +static constexpr int32_t MAX_PEER_OBJECT_ANNOUNCEMENTS = 2 * MAX_INV_SZ; /** How many microseconds to delay requesting transactions from inbound peers */ static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::seconds{2}}; /** How long to wait (in microseconds) before downloading a transaction from an additional peer */ @@ -272,69 +272,67 @@ struct CNodeState { int64_t m_last_block_announcement; /* - * State associated with transaction download. + * State associated with objects download. * * Tx download algorithm: * - * When inv comes in, queue up (process_time, txid) inside the peer's - * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer - * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS). + * When inv comes in, queue up (process_time, inv) inside the peer's + * CNodeState (m_object_process_time) as long as m_object_announced for the peer + * isn't too big (MAX_PEER_OBJECT_ANNOUNCEMENTS). * - * The process_time for a transaction is set to nNow for outbound peers, + * The process_time for a objects is set to nNow for outbound peers, * nNow + 2 seconds for inbound peers. This is the time at which we'll - * consider trying to request the transaction from the peer in + * consider trying to request the objects from the peer in * SendMessages(). The delay for inbound peers is to allow outbound peers * a chance to announce before we request from inbound peers, to prevent * an adversary from using inbound connections to blind us to a - * transaction (InvBlock). + * objects (InvBlock). * * When we call SendMessages() for a given peer, - * we will loop over the transactions in m_tx_process_time, looking - * at the transactions whose process_time <= nNow. We'll request each - * such transaction that we don't have already and that hasn't been + * we will loop over the objects in m_object_process_time, looking + * at the objects whose process_time <= nNow. We'll request each + * such objects that we don't have already and that hasn't been * requested from another peer recently, up until we hit the - * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update - * g_already_asked_for for each requested txid, storing the time of the - * GETDATA request. We use g_already_asked_for to coordinate transaction + * MAX_PEER_OBJECT_IN_FLIGHT limit for the peer. Then we'll update + * g_already_asked_for for each requested inv, storing the time of the + * GETDATA request. We use g_already_asked_for to coordinate objects * requests amongst our peers. * - * For transactions that we still need but we have already recently - * requested from some other peer, we'll reinsert (process_time, txid) - * back into the peer's m_tx_process_time at the point in the future at + * For objects that we still need but we have already recently + * requested from some other peer, we'll reinsert (process_time, inv) + * back into the peer's m_object_process_time at the point in the future at * which the most recent GETDATA request would time out (ie - * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for). + * GetObjectInterval + the request time stored in g_already_asked_for). * We add an additional delay for inbound peers, again to prefer * attempting download from outbound peers first. * We also add an extra small random delay up to 2 seconds * to avoid biasing some peers over others. (e.g., due to fixed ordering * of peer processing in ThreadMessageHandler). * - * When we receive a transaction from a peer, we remove the txid from the - * peer's m_tx_in_flight set and from their recently announced set - * (m_tx_announced). We also clear g_already_asked_for for that entry, so - * that if somehow the transaction is not accepted but also not added to + * When we receive a objects from a peer, we remove the inv from the + * peer's m_object_in_flight set and from their recently announced set + * (m_object_announced). We also clear g_already_asked_for for that entry, so + * that if somehow the objects is not accepted but also not added to * the reject filter, then we will eventually redownload from other * peers. - * - * DASH: For Dash, this does not only handles TXs but also all Dash specific objects */ - struct TxDownloadState { - /* Track when to attempt download of announced transactions (process - * time in micros -> txid) + struct ObjectDownloadState { + /* Track when to attempt download of announced objects (process + * time in micros -> inv) */ - std::multimap m_tx_process_time; + std::multimap m_object_process_time; - //! Store all the transactions a peer has recently announced - std::set m_tx_announced; + //! Store all the objects a peer has recently announced + std::set m_object_announced; - //! Store transactions which were requested by us, with timestamp - std::map m_tx_in_flight; + //! Store objects which were requested by us, with timestamp + std::map m_object_in_flight; //! Periodically check for stuck getdata requests std::chrono::microseconds m_check_expiry_timer{0}; }; - TxDownloadState m_tx_download; + ObjectDownloadState m_object_download; CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) { fCurrentlyConnected = false; @@ -667,8 +665,8 @@ void EraseObjectRequest(CNodeState* nodestate, const CInv& inv) EXCLUSIVE_LOCKS_ g_erased_object_requests.insert(std::make_pair(inv.hash, GetTime())); if (nodestate) { - nodestate->m_tx_download.m_tx_announced.erase(inv); - nodestate->m_tx_download.m_tx_in_flight.erase(inv); + nodestate->m_object_download.m_object_announced.erase(inv); + nodestate->m_object_download.m_object_in_flight.erase(inv); } } @@ -755,21 +753,21 @@ std::chrono::microseconds CalculateObjectGetDataTime(const CInv& inv, std::chron void RequestObject(CNodeState* state, const CInv& inv, std::chrono::microseconds current_time, bool fForce = false) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); - CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; - if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || - peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS || - peer_download_state.m_tx_announced.count(inv)) { + CNodeState::ObjectDownloadState& peer_download_state = state->m_object_download; + if (peer_download_state.m_object_announced.size() >= MAX_PEER_OBJECT_ANNOUNCEMENTS || + peer_download_state.m_object_process_time.size() >= MAX_PEER_OBJECT_ANNOUNCEMENTS || + peer_download_state.m_object_announced.count(inv)) { // Too many queued announcements from this peer, or we already have // this announcement return; } - peer_download_state.m_tx_announced.insert(inv); + peer_download_state.m_object_announced.insert(inv); // Calculate the time to try requesting this transaction. Use // fPreferredDownload as a proxy for outbound peers. std::chrono::microseconds process_time = CalculateObjectGetDataTime(inv, current_time, !state->fPreferredDownload); - peer_download_state.m_tx_process_time.emplace(process_time, inv); + peer_download_state.m_object_process_time.emplace(process_time, inv); if (fForce) { // make sure this object is actually requested ASAP @@ -797,7 +795,7 @@ size_t GetRequestedObjectCount(NodeId nodeId) if (!state) { return 0; } - return state->m_tx_download.m_tx_process_time.size(); + return state->m_object_download.m_object_process_time.size(); } // This function is used for testing the stale tip eviction logic, see @@ -3430,19 +3428,19 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr CNodeState *state = State(pfrom->GetId()); std::vector vInv; vRecv >> vInv; - if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + if (vInv.size() <= MAX_PEER_OBJECT_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { for (CInv &inv : vInv) { if (inv.IsKnownType()) { // If we receive a NOTFOUND message for a txid we requested, erase // it from our data structures for this peer. - auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv); - if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) { + auto in_flight_it = state->m_object_download.m_object_in_flight.find(inv); + if (in_flight_it == state->m_object_download.m_object_in_flight.end()) { // Skip any further work if this is a spurious NOTFOUND // message. continue; } - state->m_tx_download.m_tx_in_flight.erase(in_flight_it); - state->m_tx_download.m_tx_announced.erase(inv); + state->m_object_download.m_object_in_flight.erase(in_flight_it); + state->m_object_download.m_object_announced.erase(inv); } } } @@ -4297,40 +4295,40 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // // For robustness, expire old requests after a long timeout, so that - // we can resume downloading transactions from a peer even if they + // we can resume downloading objects from a peer even if they // were unresponsive in the past. // Eventually we should consider disconnecting peers, but this is // conservative. - if (state.m_tx_download.m_check_expiry_timer <= current_time) { - for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) { + if (state.m_object_download.m_check_expiry_timer <= current_time) { + for (auto it=state.m_object_download.m_object_in_flight.begin(); it != state.m_object_download.m_object_in_flight.end();) { if (it->second <= current_time - GetObjectExpiryInterval(it->first.type)) { - LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId()); - state.m_tx_download.m_tx_announced.erase(it->first); - state.m_tx_download.m_tx_in_flight.erase(it++); + LogPrint(BCLog::NET, "timeout of inflight object %s from peer=%d\n", it->first.ToString(), pto->GetId()); + state.m_object_download.m_object_announced.erase(it->first); + state.m_object_download.m_object_in_flight.erase(it++); } else { ++it; } } - // On average, we do this check every TX_EXPIRY_INTERVAL. Randomize + // On average, we do this check every GetObjectExpiryInterval. Randomize // so that we're not doing this for all peers at the same time. - state.m_tx_download.m_check_expiry_timer = current_time + GetObjectExpiryInterval(MSG_TX)/2 + GetRandMicros(GetObjectExpiryInterval(MSG_TX)); + state.m_object_download.m_check_expiry_timer = current_time + GetObjectExpiryInterval(MSG_TX)/2 + GetRandMicros(GetObjectExpiryInterval(MSG_TX)); } // DASH this code also handles non-TXs (Dash specific messages) - auto& tx_process_time = state.m_tx_download.m_tx_process_time; - while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { - const CInv inv = tx_process_time.begin()->second; - // Erase this entry from tx_process_time (it may be added back for + auto& object_process_time = state.m_object_download.m_object_process_time; + while (!object_process_time.empty() && object_process_time.begin()->first <= current_time && state.m_object_download.m_object_in_flight.size() < MAX_PEER_OBJECT_IN_FLIGHT) { + const CInv inv = object_process_time.begin()->second; + // Erase this entry from object_process_time (it may be added back for // processing at a later time, see below) - tx_process_time.erase(tx_process_time.begin()); + object_process_time.erase(object_process_time.begin()); if (g_erased_object_requests.count(inv.hash)) { LogPrint(BCLog::NET, "%s -- GETDATA skipping inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId()); - state.m_tx_download.m_tx_announced.erase(inv); - state.m_tx_download.m_tx_in_flight.erase(inv); + state.m_object_download.m_object_announced.erase(inv); + state.m_object_download.m_object_in_flight.erase(inv); continue; } if (!AlreadyHave(inv)) { - // If this transaction was last requested more than 1 minute ago, + // If this object was last requested more than GetObjectInterval ago, // then request. const auto last_request_time = GetObjectRequestTime(inv.hash); if (last_request_time <= current_time - GetObjectInterval(inv.type)) { @@ -4341,20 +4339,20 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM vGetData.clear(); } UpdateObjectRequestTime(inv.hash, current_time); - state.m_tx_download.m_tx_in_flight.emplace(inv, current_time); + state.m_object_download.m_object_in_flight.emplace(inv, current_time); } else { - // This transaction is in flight from someone else; queue + // This object is in flight from someone else; queue // up processing to happen after the download times out // (with a slight delay for inbound peers, to prefer // requests to outbound peers). const auto next_process_time = CalculateObjectGetDataTime(inv, current_time, !state.fPreferredDownload); - tx_process_time.emplace(next_process_time, inv); + object_process_time.emplace(next_process_time, inv); LogPrint(BCLog::NET, "%s -- GETDATA re-queue inv=(%s), next_process_time=%d, delta=%d, peer=%d\n", __func__, inv.ToString(), next_process_time.count(), (next_process_time - current_time).count(), pto->GetId()); } } else { - // We have already seen this transaction, no need to download. - state.m_tx_download.m_tx_announced.erase(inv); - state.m_tx_download.m_tx_in_flight.erase(inv); + // We have already seen this object, no need to download. + state.m_object_download.m_object_announced.erase(inv); + state.m_object_download.m_object_in_flight.erase(inv); LogPrint(BCLog::NET, "%s -- GETDATA already seen inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId()); } } From 6b32192bdc00a2cb96b3bf2c8a663581936c9ec2 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Wed, 8 Apr 2020 14:55:27 +0200 Subject: [PATCH 18/21] Fix compilation --- src/llmq/quorums_instantsend.cpp | 2 +- src/net_processing.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/llmq/quorums_instantsend.cpp b/src/llmq/quorums_instantsend.cpp index a2781ede030a..d335fa4cc417 100644 --- a/src/llmq/quorums_instantsend.cpp +++ b/src/llmq/quorums_instantsend.cpp @@ -1345,7 +1345,7 @@ void CInstantSendManager::AskNodesForLockedTx(const uint256& txid) txid.ToString(), pnode->GetId()); CInv inv(MSG_TX, txid); - RequestObject(pnode->GetId(), inv, GetTimeMicros(), true); + RequestObject(pnode->GetId(), inv, GetTime(), true); } } for (CNode* pnode : nodesToAskFor) { diff --git a/src/net_processing.h b/src/net_processing.h index 8c0260004aad..1c070160a3e8 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -85,7 +85,7 @@ void Misbehaving(NodeId nodeid, int howmuch, const std::string& message=""); bool IsBanned(NodeId nodeid); void EraseObjectRequest(NodeId nodeId, const CInv& inv); -void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow, bool fForce=false); +void RequestObject(NodeId nodeId, const CInv& inv, std::chrono::microseconds current_time, bool fForce=false); size_t GetRequestedObjectCount(NodeId nodeId); #endif // BITCOIN_NET_PROCESSING_H From 6d93b33f48b40a4ac00acbd50e6dd46374651287 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Wed, 8 Apr 2020 16:54:14 +0200 Subject: [PATCH 19/21] Fix compilation of util_time.cpp --- src/bench/util_time.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/bench/util_time.cpp b/src/bench/util_time.cpp index 72d97354aa55..6900ff3f3311 100644 --- a/src/bench/util_time.cpp +++ b/src/bench/util_time.cpp @@ -4,7 +4,7 @@ #include -#include +#include static void BenchTimeDeprecated(benchmark::State& state) { @@ -36,7 +36,7 @@ static void BenchTimeMillisSys(benchmark::State& state) } } -BENCHMARK(BenchTimeDeprecated, 100000000); -BENCHMARK(BenchTimeMillis, 6000000); -BENCHMARK(BenchTimeMillisSys, 6000000); -BENCHMARK(BenchTimeMock, 300000000); +BENCHMARK(BenchTimeDeprecated/*, 100000000*/); +BENCHMARK(BenchTimeMillis/*, 6000000*/); +BENCHMARK(BenchTimeMillisSys/*, 6000000*/); +BENCHMARK(BenchTimeMock/*, 300000000*/); From 24b25e13daa78a858815b6317d0a22c1ca235673 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Wed, 8 Apr 2020 22:13:54 +0200 Subject: [PATCH 20/21] Add wait_func to sync_mempool --- test/functional/test_framework/util.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/functional/test_framework/util.py b/test/functional/test_framework/util.py index 2f5cfba790cc..7ddc9a18234b 100644 --- a/test/functional/test_framework/util.py +++ b/test/functional/test_framework/util.py @@ -435,7 +435,7 @@ def sync_chain(rpc_connections, *, wait=1, timeout=60): timeout -= wait raise AssertionError("Chain sync failed: Best block hashes don't match") -def sync_mempools(rpc_connections, *, wait=1, timeout=60): +def sync_mempools(rpc_connections, *, wait=1, timeout=60, wait_func=None): """ Wait until everybody has the same transactions in their memory pools @@ -448,6 +448,8 @@ def sync_mempools(rpc_connections, *, wait=1, timeout=60): num_match = num_match + 1 if num_match == len(rpc_connections): return + if wait_func is not None: + wait_func() time.sleep(wait) timeout -= wait raise AssertionError("Mempool sync failed") From 5cf417bc3ad730d541aaecaf42f50183cad5f6af Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Wed, 8 Apr 2020 22:14:39 +0200 Subject: [PATCH 21/21] Bump mocktime while syncing mempools Otherwise the inv/getdata logic won't work with inbound connections due to the added delay of 2 seconds. --- test/functional/p2p-instantsend.py | 4 ++-- test/functional/test_framework/test_framework.py | 6 ++++-- test/functional/walletbackup.py | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/test/functional/p2p-instantsend.py b/test/functional/p2p-instantsend.py index 02f60115b978..c8f3815dc6b6 100755 --- a/test/functional/p2p-instantsend.py +++ b/test/functional/p2p-instantsend.py @@ -57,7 +57,7 @@ def test_block_doublespend(self): # wait for the transaction to propagate connected_nodes = self.nodes.copy() del connected_nodes[self.isolated_idx] - sync_mempools(connected_nodes) + sync_mempools(connected_nodes, wait=0.1, wait_func=lambda: self.bump_mocktime(3, True)) for node in connected_nodes: self.wait_for_instantlock(is_id, node) # send doublespend transaction to isolated node @@ -119,7 +119,7 @@ def test_mempool_doublespend(self): receiver_addr = receiver.getnewaddress() is_id = sender.sendtoaddress(receiver_addr, 0.9) # wait for the transaction to propagate - sync_mempools(self.nodes) + sync_mempools(self.nodes, wait=0.1, wait_func=lambda: self.bump_mocktime(3, True)) for node in self.nodes: self.wait_for_instantlock(is_id, node) assert_raises_rpc_error(-5, "No such mempool or blockchain transaction", isolated.getrawtransaction, dblspnd_txid) diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py index 94e37adc9ff2..30ae7979c205 100755 --- a/test/functional/test_framework/test_framework.py +++ b/test/functional/test_framework/test_framework.py @@ -352,13 +352,15 @@ def sync_all(self, node_groups=None): for group in node_groups: sync_blocks(group) - sync_mempools(group) + sync_mempools(group, wait=0.1, wait_func=lambda: self.bump_mocktime(3, True)) def disable_mocktime(self): self.mocktime = 0 - def bump_mocktime(self, t): + def bump_mocktime(self, t, update_nodes=False): self.mocktime += t + if update_nodes: + set_node_times(self.nodes, self.mocktime) def set_cache_mocktime(self): # For backwared compatibility of the python scripts diff --git a/test/functional/walletbackup.py b/test/functional/walletbackup.py index 6c79a0eb1a2f..8798c92fe896 100755 --- a/test/functional/walletbackup.py +++ b/test/functional/walletbackup.py @@ -70,7 +70,7 @@ def do_one_round(self): # Have the miner (node3) mine a block. # Must sync mempools before mining. - sync_mempools(self.nodes) + sync_mempools(self.nodes, wait=0.1, wait_func=lambda: self.bump_mocktime(3, True)) self.nodes[3].generate(1) sync_blocks(self.nodes)