This repository was archived by the owner on Oct 28, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Expand file tree
/
Copy pathTransactionQueue.h
More file actions
223 lines (179 loc) · 9.95 KB
/
TransactionQueue.h
File metadata and controls
223 lines (179 loc) · 9.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file TransactionQueue.h
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
#pragma once
#include "Transaction.h"
#include <libdevcore/Common.h>
#include <libdevcore/Guards.h>
#include <libdevcore/Log.h>
#include <libdevcore/LruCache.h>
#include <libethcore/Common.h>
#include <condition_variable>
#include <deque>
#include <functional>
#include <thread>
namespace dev
{
namespace eth
{
/**
* @brief A queue of Transactions, each stored as RLP.
* Maintains a transaction queue sorted by nonce diff and gas price.
* @threadsafe
*/
class TransactionQueue
{
public:
struct Limits { size_t current; size_t future; };
/// @brief TransactionQueue
/// @param _limit Maximum number of pending transactions in the queue.
/// @param _futureLimit Maximum number of future nonce transactions.
TransactionQueue(unsigned _limit = 1024, unsigned _futureLimit = 1024);
TransactionQueue(Limits const& _l): TransactionQueue(_l.current, _l.future) {}
~TransactionQueue();
/// Add transaction to the queue to be verified and imported.
/// @param _data RLP encoded transaction data.
/// @param _nodeId Optional network identified of a node transaction comes from.
void enqueue(RLP const& _data, h512 const& _nodeId);
/// Verify and add transaction to the queue synchronously.
/// @param _tx RLP encoded transaction data.
/// @param _ik Set to Retry to force re-addinga transaction that was previously dropped.
/// @returns Import result code.
ImportResult import(bytes const& _tx, IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _ik); }
/// Verify and add transaction to the queue synchronously.
/// @param _tx Trasnaction data.
/// @param _ik Set to Retry to force re-addinga transaction that was previously dropped.
/// @returns Import result code.
ImportResult import(Transaction const& _tx, IfDropped _ik = IfDropped::Ignore);
/// Remove transaction from the queue
/// @param _txHash Trasnaction hash
void drop(h256 const& _txHash);
/// Get number of pending transactions for account.
/// @returns Pending transaction count.
unsigned waiting(Address const& _a) const;
/// Get top transactions from the queue. Returned transactions are not removed from the queue automatically.
/// @param _limit Max number of transactions to return.
/// @param _avoid Transactions to avoid returning.
/// @returns up to _limit transactions ordered by nonce and gas price.
Transactions topTransactions(unsigned _limit, h256Hash const& _avoid = h256Hash()) const;
/// Get a hash set of transactions in the queue
/// @returns A hash set of all transactions in the queue
h256Hash knownTransactions() const;
/// Get max nonce for an account
/// @returns Max transaction nonce for account in the queue
u256 maxNonce(Address const& _a) const;
/// Mark transaction as future. It wont be retured in topTransactions list until a transaction with a preceeding nonce is imported or marked with dropGood
/// @param _t Transaction hash
void setFuture(h256 const& _t);
/// Drop a trasnaction from the list if exists and move following future trasnactions to current (if any)
/// @param _t Transaction hash
void dropGood(Transaction const& _t);
struct Status
{
size_t current;
size_t future;
size_t unverified;
size_t dropped;
};
/// @returns the status of the transaction queue.
Status status() const { Status ret; DEV_GUARDED(x_queue) { ret.unverified = m_unverified.size(); } ReadGuard l(m_lock); ret.dropped = m_dropped.size(); ret.current = m_currentByHash.size(); ret.future = m_future.size(); return ret; }
/// @returns the transacrtion limits on current/future.
Limits limits() const { return Limits{m_limit, m_futureLimit}; }
/// Clear the queue
void clear();
/// Register a handler that will be called once there is a new transaction imported
template <class T> Handler<> onReady(T const& _t) { return m_onReady.add(_t); }
/// Register a handler that will be called once asynchronous verification is comeplte an transaction has been imported
template <class T> Handler<ImportResult, h256 const&, h512 const&> onImport(T const& _t) { return m_onImport.add(_t); }
/// Register a handler that will be called once asynchronous verification is comeplte an transaction has been imported
template <class T> Handler<h256 const&> onReplaced(T const& _t) { return m_onReplaced.add(_t); }
private:
/// Verified and imported transaction
struct VerifiedTransaction
{
VerifiedTransaction(Transaction const& _t): transaction(_t) {}
VerifiedTransaction(VerifiedTransaction&& _t): transaction(std::move(_t.transaction)) {}
VerifiedTransaction(VerifiedTransaction const&) = delete;
VerifiedTransaction& operator=(VerifiedTransaction const&) = delete;
Transaction transaction; ///< Transaction data
};
/// Transaction pending verification
struct UnverifiedTransaction
{
UnverifiedTransaction() {}
UnverifiedTransaction(bytesConstRef const& _t, h512 const& _nodeId): transaction(_t.toBytes()), nodeId(_nodeId) {}
UnverifiedTransaction(UnverifiedTransaction&& _t): transaction(std::move(_t.transaction)), nodeId(std::move(_t.nodeId)) {}
UnverifiedTransaction& operator=(UnverifiedTransaction&& _other)
{
assert(&_other != this);
transaction = std::move(_other.transaction);
nodeId = std::move(_other.nodeId);
return *this;
}
UnverifiedTransaction(UnverifiedTransaction const&) = delete;
UnverifiedTransaction& operator=(UnverifiedTransaction const&) = delete;
bytes transaction; ///< RLP encoded transaction data
h512 nodeId; ///< Network Id of the peer transaction comes from
};
struct PriorityCompare
{
TransactionQueue& queue;
/// Compare transaction by nonce height and gas price.
bool operator()(VerifiedTransaction const& _first, VerifiedTransaction const& _second) const
{
u256 const& height1 = _first.transaction.nonce() - queue.m_currentByAddressAndNonce[_first.transaction.sender()].begin()->first;
u256 const& height2 = _second.transaction.nonce() - queue.m_currentByAddressAndNonce[_second.transaction.sender()].begin()->first;
return height1 < height2 || (height1 == height2 && _first.transaction.gasPrice() > _second.transaction.gasPrice());
}
};
// Use a set with dynamic comparator for minmax priority queue. The comparator takes into account min account nonce. Updating it does not affect the order.
using PriorityQueue = std::multiset<VerifiedTransaction, PriorityCompare>;
ImportResult import(bytesConstRef _tx, IfDropped _ik = IfDropped::Ignore);
ImportResult check_WITH_LOCK(h256 const& _h, IfDropped _ik);
ImportResult manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction);
void insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p);
void makeCurrent_WITH_LOCK(Transaction const& _t);
bool remove_WITH_LOCK(h256 const& _txHash);
u256 maxNonce_WITH_LOCK(Address const& _a) const;
void verifierBody();
mutable SharedMutex m_lock; ///< General lock.
h256Hash m_known; ///< Headers of transactions in both sets.
std::unordered_map<h256, std::function<void(ImportResult)>> m_callbacks; ///< Called once.
///< Transactions that have previously been dropped. We technically only need to store the tx
///< hash, but we also store bool as a placeholder value so that we can use an LRU cache to cap
///< the number of transaction hashes stored.
LruCache<h256, bool> m_dropped;
PriorityQueue m_current;
std::unordered_map<h256, PriorityQueue::iterator> m_currentByHash; ///< Transaction hash to set ref
std::unordered_map<Address, std::map<u256, PriorityQueue::iterator>> m_currentByAddressAndNonce; ///< Transactions grouped by account and nonce
std::unordered_map<Address, std::map<u256, VerifiedTransaction>> m_future; /// Future transactions
Signal<> m_onReady; ///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast.
Signal<ImportResult, h256 const&, h512 const&> m_onImport; ///< Called for each import attempt. Arguments are result, transaction id an node id. Be nice and exit fast.
Signal<h256 const&> m_onReplaced; ///< Called whan transction is dropped during a call to import() to make room for another transaction.
unsigned m_limit; ///< Max number of pending transactions
unsigned m_futureLimit; ///< Max number of future transactions
unsigned m_futureSize = 0; ///< Current number of future transactions
std::condition_variable m_queueReady; ///< Signaled when m_unverified has a new entry.
std::vector<std::thread> m_verifiers;
std::deque<UnverifiedTransaction> m_unverified; ///< Pending verification queue
mutable Mutex x_queue; ///< Verification queue mutex
std::atomic<bool> m_aborting = {false}; ///< Exit condition for verifier.
Logger m_logger{createLogger(VerbosityInfo, "tq")};
Logger m_loggerDetail{createLogger(VerbosityDebug, "tq")};
};
}
}