This repository was archived by the owner on Oct 28, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Expand file tree
/
Copy pathHost.h
More file actions
363 lines (268 loc) · 13.7 KB
/
Host.h
File metadata and controls
363 lines (268 loc) · 13.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
// Aleth: Ethereum C++ client, tools and libraries.
// Copyright 2018 Aleth Authors.
// Licensed under the GNU General Public License, Version 3.
#pragma once
#include "Common.h"
#include "Network.h"
#include "NodeTable.h"
#include "Peer.h"
#include "RLPXFrameCoder.h"
#include "RLPXSocket.h"
#include <libdevcore/Guards.h>
#include <libdevcore/Worker.h>
#include <libdevcrypto/Common.h>
#include <chrono>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <thread>
#include <utility>
#include <vector>
// Short aliases for the Boost.Asio namespaces used throughout this header.
namespace io = boost::asio;
// `bi` expands to boost::asio::ip (endpoint / address types).
namespace bi = io::ip;
namespace std
{
/// Hash support for (NodeID, string) pairs, so such pairs can be used as keys
/// in unordered containers.
/// The NodeID hash is mixed with the string hash using the constant 0x9e3779b9
/// and two shifts of the NodeID hash.
template <>
struct hash<pair<dev::p2p::NodeID, string>>
{
    size_t operator()(pair<dev::p2p::NodeID, string> const& _value) const
    {
        size_t const idHash = hash<dev::p2p::NodeID>{}(_value.first);
        size_t const nameHash = hash<string>{}(_value.second);
        // Mixing term: string hash plus constant plus shifted copies of the id hash.
        size_t const mix = nameHash + 0x9e3779b9 + (idHash << 6) + (idHash >> 2);
        return idHash ^ mix;
    }
};
}
namespace dev
{
namespace p2p
{
// Forward declarations: this header only needs these names for pointers,
// references and smart-pointer template arguments.
// Host is defined further down in this file; the other three are not defined
// in this header.
class CapabilityFace;
class CapabilityHostFace;
class Host;
class SessionFace;
/// A NodeTableEventHandler that carries a reference to a Host.
/// NOTE(review): presumably forwards node-table events to that Host (Host
/// declares this class a friend) - confirm in the implementation file.
class HostNodeTableHandler : public NodeTableEventHandler
{
public:
    /// @param _host Host to associate with this handler. The handler keeps only
    /// a reference (m_host), so the Host must outlive the handler.
    HostNodeTableHandler(Host& _host);

    /// @returns read-only access to the associated Host.
    Host const& host() const
    {
        return m_host;
    }

private:
    /// Handles a single node-table event for node @a _n of type @a _e.
    /// Private: not callable directly on this class from outside.
    /// NOTE(review): looks like an override of a NodeTableEventHandler virtual -
    /// confirm against the base class before adding `override`.
    virtual void processEvent(NodeID const& _n, NodeTableEventType const& _e);

    Host& m_host;  ///< The associated Host (non-owning reference).
};
/// Reputation record for one named sub-category (see Reputation::subs, which
/// keys these records by string).
struct SubReputation
{
    bool isRude{false};  ///< Rudeness flag; starts out false.
    int utility{0};      ///< Utility score; starts out at 0.
    bytes data;          ///< Opaque byte payload attached to this record.
};
/// Collection of SubReputation records, looked up by a string key.
struct Reputation
{
    /// Sub-reputation records keyed by name.
    std::unordered_map<std::string, SubReputation> subs;
};
/// Keeps Reputation records in a map keyed by (NodeID, string) pairs.
/// All member functions are defined outside this header, so the notes below
/// describe the declared interface only.
class ReputationManager
{
public:
    /// Record rudeness for the peer behind session @a _s, under sub-key @a _sub
    /// (empty string by default).
    void noteRude(SessionFace const& _s, std::string const& _sub = std::string());

    /// Query the rudeness flag for session @a _s under sub-key @a _sub
    /// (empty string by default).
    bool isRude(SessionFace const& _s, std::string const& _sub = std::string()) const;

    /// Attach the byte payload @a _data to session @a _s under sub-key @a _sub.
    void setData(SessionFace const& _s, std::string const& _sub, bytes const& _data);

    /// @returns the byte payload stored for session @a _s under sub-key @a _subs.
    bytes data(SessionFace const& _s, std::string const& _subs) const;

private:
    /// Nodes that were impolite while syncing. We avoid syncing from these if possible.
    std::unordered_map<std::pair<p2p::NodeID, std::string>, Reputation> m_nodes;

    /// Mutex declared alongside m_nodes; mutable so const members can lock it.
    /// NOTE(review): presumably guards m_nodes - confirm in the implementation.
    mutable SharedMutex x_nodes;
};
/// Plain description of a node: its ID, address, port and version string.
struct NodeInfo
{
    /// Creates an empty description. `port` is zero thanks to its default
    /// member initializer, so enode() never reads an indeterminate value.
    NodeInfo() = default;

    /// @param _id Node identifier.
    /// @param _address Address as text; it is used verbatim by enode().
    /// @param _port Port number.
    /// @param _version Version string.
    NodeInfo(NodeID const& _id, std::string const& _address, unsigned _port, std::string const& _version):
        id(_id), address(_address), port(_port), version(_version) {}

    /// @returns the textual form "enode://<id hex>@<address>:<port>".
    std::string enode() const { return "enode://" + id.hex() + "@" + address + ":" + toString(port); }

    NodeID id;
    std::string address;
    unsigned port = 0;  ///< Initialised explicitly: a defaulted constructor would otherwise leave it indeterminate.
    std::string version;
};
/**
* @brief The Host class
* Capabilities should be registered prior to startNetwork, since m_capabilities is not thread-safe.
*
* @todo determinePublic: ipv6, udp
* @todo per-session keepalive/ping instead of broadcast; set ping-timeout via median-latency
*/
class Host: public Worker
{
friend class HostNodeTableHandler;
friend class RLPXHandshake;
friend class Session;
public:
/// Start server, listening for connections on the given port.
Host(
std::string const& _clientVersion,
NetworkConfig const& _n = NetworkConfig{},
bytesConstRef _restoreNetwork = bytesConstRef()
);
/// Alternative constructor that allows providing the node key directly
/// without restoring the network.
Host(
std::string const& _clientVersion,
KeyPair const& _alias,
NetworkConfig const& _n = NetworkConfig{}
);
/// Will block on network process events.
virtual ~Host();
/// Default hosts for current version of client.
static std::unordered_map<Public, std::string> pocHosts();
/// Register a host capability; all new peer connections will see this capability.
void registerCapability(std::shared_ptr<CapabilityFace> const& _cap);
/// Register a host capability with arbitrary name and version.
/// Might be useful when you want to handle several subprotocol versions with a single
/// capability class.
void registerCapability(std::shared_ptr<CapabilityFace> const& _cap, std::string const& _name,
unsigned _version);
bool haveCapability(CapDesc const& _name) const { return m_capabilities.count(_name) != 0; }
bool haveCapabilities() const { return !caps().empty(); }
CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; }
/// Add a potential peer.
void addPeer(NodeSpec const& _s, PeerType _t);
/// Add node as a peer candidate. Node is added if discovery ping is successful and table has capacity.
void addNode(NodeID const& _node, NodeIPEndpoint const& _endpoint);
/// Create Peer and attempt keeping peer connected.
void requirePeer(NodeID const& _node, NodeIPEndpoint const& _endpoint);
/// Create Peer and attempt keeping peer connected.
void requirePeer(NodeID const& _node, bi::address const& _addr, unsigned short _udpPort, unsigned short _tcpPort) { requirePeer(_node, NodeIPEndpoint(_addr, _udpPort, _tcpPort)); }
/// returns true if a member of m_requiredPeers
bool isRequiredPeer(NodeID const&) const;
/// Note peer as no longer being required.
void relinquishPeer(NodeID const& _node);
/// Set ideal number of peers.
void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; }
/// Set multipier for max accepted connections.
void setPeerStretch(unsigned _n) { m_stretchPeers = _n; }
/// Get peer information.
PeerSessionInfos peerSessionInfo() const;
/// Get number of peers connected.
size_t peerCount() const;
/// Get the address we're listening on currently.
std::string listenAddress() const { return m_tcpPublic.address().is_unspecified() ? "0.0.0.0" : m_tcpPublic.address().to_string(); }
/// Get the port we're listening on currently.
unsigned short listenPort() const { return std::max(0, m_listenPort.load()); }
/// Serialise the set of known peers.
bytes saveNetwork() const;
// TODO: P2P this should be combined with peers into a HostStat object of some kind; coalesce data, as it's only used for status information.
Peers getPeers() const { RecursiveGuard l(x_sessions); Peers ret; for (auto const& i: m_peers) ret.push_back(*i.second); return ret; }
NetworkConfig const& networkConfig() const { return m_netConfig; }
void setNetworkConfig(NetworkConfig const& _p, bool _dropPeers = false) { m_dropPeers = _dropPeers; auto had = isStarted(); if (had) stop(); m_netConfig = _p; if (had) start(); }
/// Start network. @threadsafe
void start();
/// Stop network. @threadsafe
/// Resets acceptor, socket, and IO service. Called by deallocator.
void stop();
/// @returns if network has been started.
bool isStarted() const { return isWorking(); }
/// @returns our reputation manager.
ReputationManager& repMan() { return m_repMan; }
/// @returns if network is started and interactive.
bool haveNetwork() const { return m_run; }
/// Validates and starts peer session, taking ownership of _io. Disconnects and returns false upon error.
void startPeerSession(Public const& _id, RLP const& _hello, std::unique_ptr<RLPXFrameCoder>&& _io, std::shared_ptr<RLPXSocket> const& _s);
/// Get session by id
std::shared_ptr<SessionFace> peerSession(NodeID const& _id) const
{
RecursiveGuard l(x_sessions);
return m_sessions.count(_id) ? m_sessions[_id].lock() : std::shared_ptr<SessionFace>();
}
/// Get our current node ID.
NodeID id() const { return m_alias.pub(); }
/// Get the public TCP endpoint.
bi::tcp::endpoint const& tcpPublic() const { return m_tcpPublic; }
/// Get the public endpoint information.
std::string enode() const { return "enode://" + id().hex() + "@" + (networkConfig().publicIPAddress.empty() ? m_tcpPublic.address().to_string() : networkConfig().publicIPAddress) + ":" + toString(m_tcpPublic.port()); }
/// Get the node information.
p2p::NodeInfo nodeInfo() const { return NodeInfo(id(), (networkConfig().publicIPAddress.empty() ? m_tcpPublic.address().to_string() : networkConfig().publicIPAddress), m_tcpPublic.port(), m_clientVersion); }
/// Apply function to each session
void forEachPeer(
std::string const& _capabilityName, std::function<bool(NodeID const&)> _f) const;
void scheduleExecution(int _delayMs, std::function<void()> _f);
std::shared_ptr<CapabilityHostFace> capabilityHost() const { return m_capabilityHost; }
protected:
void onNodeTableEvent(NodeID const& _n, NodeTableEventType const& _e);
/// Deserialise the data and populate the set of known peers.
void restoreNetwork(bytesConstRef _b);
private:
enum PeerSlotType { Egress, Ingress };
unsigned peerSlots(PeerSlotType _type) { return _type == Egress ? m_idealPeerCount : m_idealPeerCount * m_stretchPeers; }
bool havePeerSession(NodeID const& _id) { return !!peerSession(_id); }
/// Determines and sets m_tcpPublic to publicly advertised address.
void determinePublic();
void connect(std::shared_ptr<Peer> const& _p);
/// Returns true if pending and connected peer count is less than maximum
bool peerSlotsAvailable(PeerSlotType _type = Ingress);
/// Ping the peers to update the latency information and disconnect peers which have timed out.
void keepAlivePeers();
/// Disconnect peers which didn't respond to keepAlivePeers ping prior to c_keepAliveTimeOut.
void disconnectLatePeers();
/// Called only from startedWorking().
void runAcceptor();
/// Called by Worker. Not thread-safe; to be called only by worker.
virtual void startedWorking();
/// Called by startedWorking. Not thread-safe; to be called only be Worker.
void run(boost::system::error_code const& error); ///< Run network. Called serially via ASIO deadline timer. Manages connection state transitions.
/// Run network. Not thread-safe; to be called only by worker.
virtual void doWork();
/// Shutdown network. Not thread-safe; to be called only by worker.
virtual void doneWorking();
/// Get or create host identifier (KeyPair).
static KeyPair networkAlias(bytesConstRef _b);
bool nodeTableHasNode(Public const& _id) const;
Node nodeFromNodeTable(Public const& _id) const;
bool addNodeToNodeTable(Node const& _node, NodeTable::NodeRelation _relation = NodeTable::NodeRelation::Unknown);
/// Determines if a node with the supplied endpoint should be included in or restored from the
/// serialized network configuration data
bool isAllowedEndpoint(NodeIPEndpoint const& _endpointToCheck) const
{
return dev::p2p::isAllowedEndpoint(m_netConfig.allowLocalDiscovery, _endpointToCheck);
}
bytes m_restoreNetwork; ///< Set by constructor and used to set Host key and restore network peers & nodes.
std::atomic<bool> m_run{false}; ///< Whether network is running.
std::string m_clientVersion; ///< Our version string.
NetworkConfig m_netConfig; ///< Network settings.
/// Interface addresses (private, public)
std::set<bi::address> m_ifAddresses; ///< Interface addresses.
std::atomic<int> m_listenPort{-1}; ///< What port are we listening on. -1 means binding failed or acceptor hasn't been initialized.
io::io_service m_ioService; ///< IOService for network stuff.
bi::tcp::acceptor m_tcp4Acceptor; ///< Listening acceptor.
/// Timer which, when network is running, calls run() every c_timerInterval ms.
io::deadline_timer m_timer;
static constexpr unsigned c_timerInterval = 100; ///< Interval which m_timer is run when network is connected.
std::set<Peer*> m_pendingPeerConns; /// Used only by connect(Peer&) to limit concurrently connecting to same node. See connect(shared_ptr<Peer>const&).
bi::tcp::endpoint m_tcpPublic; ///< Our public listening endpoint.
KeyPair m_alias; ///< Alias for network communication. Network address is k*G. k is key material. TODO: Replace KeyPair.
std::shared_ptr<NodeTable> m_nodeTable; ///< Node table (uses kademlia-like discovery).
mutable std::mutex x_nodeTable;
std::shared_ptr<NodeTable> nodeTable() const { Guard l(x_nodeTable); return m_nodeTable; }
/// Shared storage of Peer objects. Peers are created or destroyed on demand by the Host. Active sessions maintain a shared_ptr to a Peer;
std::unordered_map<NodeID, std::shared_ptr<Peer>> m_peers;
/// Peers we try to connect regardless of p2p network.
std::set<NodeID> m_requiredPeers;
mutable Mutex x_requiredPeers;
/// The nodes to which we are currently connected. Used by host to service peer requests and keepAlivePeers and for shutdown. (see run())
/// Mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method.
mutable std::unordered_map<NodeID, std::weak_ptr<SessionFace>> m_sessions;
mutable RecursiveMutex x_sessions;
std::list<std::weak_ptr<RLPXHandshake>> m_connecting; ///< Pending connections.
Mutex x_connecting; ///< Mutex for m_connecting.
unsigned m_idealPeerCount = 11; ///< Ideal number of peers to be connected to.
unsigned m_stretchPeers = 7; ///< Accepted connection multiplier (max peers = ideal*stretch).
/// Each of the capabilities we support.
std::map<CapDesc, std::shared_ptr<CapabilityFace>> m_capabilities;
/// Deadline timers used for isolated network events. GC'd by run.
std::list<std::unique_ptr<io::deadline_timer>> m_timers;
Mutex x_timers;
std::chrono::steady_clock::time_point m_lastPing; ///< Time we sent the last ping to all peers.
bool m_accepting = false;
bool m_dropPeers = false;
ReputationManager m_repMan;
std::shared_ptr<CapabilityHostFace> m_capabilityHost;
Logger m_logger{createLogger(VerbosityDebug, "net")};
};
}
}