Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
536529e
discovery: v5 initial commit. No network, not working but compiling
zilm13 Sep 16, 2019
c040729
discovery: network implementation using Netty added
zilm13 Sep 17, 2019
d3f2051
discovery: call top network abstraction a parcel to avoid confusing w…
zilm13 Sep 18, 2019
fe91d72
discovery: add draft for sign() method and signature recovery
zilm13 Sep 18, 2019
d0394f2
discovery: cleanup DiscoveryServer interface
zilm13 Sep 18, 2019
c5fa4fa
discovery: enums cleanup
zilm13 Sep 18, 2019
88c540e
discovery: resolve all NodeTable TODOs
zilm13 Sep 18, 2019
f7c6f18
discovery: organize structures by usage
zilm13 Sep 18, 2019
2955275
discovery: fix NodeTable test
zilm13 Sep 18, 2019
19a8729
discovery: add cleanup of old contexts
zilm13 Sep 18, 2019
82854c6
discovery: refactor message handling with context
zilm13 Sep 18, 2019
5f48ed7
discovery: refactor packet handling
zilm13 Sep 19, 2019
39c5f35
discovery: add ExpirationScheduler test + fix bug
zilm13 Sep 21, 2019
5a78fa1
util: add javadoc to ExpirationScheduler
zilm13 Sep 23, 2019
e09ddc7
discovery: recurrent task fixes and refactoring
zilm13 Sep 23, 2019
1e775ea
discovery: noderecordv4, v5 refactoring. Typo fixes, redundant conver…
zilm13 Sep 25, 2019
840ac96
discovery: NodeRecord: add guaranteed fields to the interface, fix se…
zilm13 Sep 25, 2019
59180c9
util: remove Bytes33 type
zilm13 Sep 25, 2019
930e994
discovery: make all node record fields accessible. Remove ssz usage f…
zilm13 Sep 25, 2019
e6f734e
discovery: fix logDistance to use bit index instead of logarithm
zilm13 Sep 29, 2019
159457e
discovery: make NODES answer use Buckets
zilm13 Oct 1, 2019
7e90aef
discover: fix ECDSA signature sign and recover
zilm13 Oct 1, 2019
f593c22
discovery: remove NodeRecordV5 as V4 is the only available scheme
zilm13 Oct 1, 2019
25f222f
discovery: refactor Ethereum Node Record to further detach it from ot…
zilm13 Oct 1, 2019
bfd48b1
discovery: make udp client independent of server
zilm13 Oct 2, 2019
84dabf5
discovery: use enr scheme interpreter instead of full enr abstractions
zilm13 Oct 2, 2019
69c33f5
discovery: pipeline draft
zilm13 Oct 3, 2019
429930a
discovery: fix pipeline draft
zilm13 Oct 3, 2019
9a8ff0e
discovery: refactor packet flow to pipeline with actor-like packet ha…
zilm13 Oct 6, 2019
382db53
discovery: fix node availability statuses
zilm13 Oct 6, 2019
34df11a
discovery: remove obsolete condition
zilm13 Oct 6, 2019
cd32bc3
discovery: add bad packet logging
zilm13 Oct 6, 2019
14ce2d4
discovery: context renamed to session to be more intuitive
zilm13 Oct 6, 2019
ec36b97
discovery: fix bucket max distance
zilm13 Oct 14, 2019
3dac85e
discovery: fix DH hkdf_expand function
zilm13 Oct 15, 2019
077717a
discovery: fixes of AuthHeaderMessage handshake part + tests
zilm13 Oct 17, 2019
eb50643
discovery: handler logging improved
zilm13 Oct 18, 2019
15a841f
discovery: fix message request-response in discovery
zilm13 Oct 18, 2019
85209d1
discovery: remove duplicated commented out test
zilm13 Oct 21, 2019
89d054e
discovery: Clarifying 65-bytes public key issue
zilm13 Oct 21, 2019
5eb8c21
discovery: remove TODO on merge NodeRecord, because we should prefer …
zilm13 Oct 21, 2019
68b1cba
discovery: fix whoareyou received in case of session expiration was h…
zilm13 Oct 21, 2019
9e01ed3
discovery: fix requestId size
zilm13 Oct 21, 2019
4af79dd
discovery: clarify NodeSession class goal in Javadoc
zilm13 Oct 21, 2019
4cced86
discovery: fix message handlers had state and were created for each case
zilm13 Oct 21, 2019
c28a9b7
discovery: add toString to requestInfo
zilm13 Oct 21, 2019
b90e80a
discovery: refactor and remove side task from AuthHeaderMessagePacket
zilm13 Oct 21, 2019
80c6641
discovery: replace TODO in NodeSessionRequestHandler with appropriate…
zilm13 Oct 21, 2019
2acb0d1
discovery: this logic should be here
zilm13 Oct 21, 2019
b293d39
discovery: fix task manager didn't use recursive tasks
zilm13 Oct 21, 2019
8d54fe0
discovery: queue tasks, several simultaneous tasks in session capability
zilm13 Oct 23, 2019
4fcce9d
Merge branch 'develop' into feature/discv5-client
zilm13 Oct 24, 2019
35935d8
discovery: fix that dead nodes were not removed
zilm13 Oct 24, 2019
b5d3b56
discovery: live check task cleanup and comments
zilm13 Oct 24, 2019
2b62afa
discovery: bad packet handler cleanup
zilm13 Oct 24, 2019
01a60ad
discovery: extend packet handlers test
zilm13 Oct 24, 2019
5e64379
discovery: fix enrSeq behavior
zilm13 Oct 24, 2019
e0c22d8
discovery: clarifies homeNode provider goal plus javadoc
zilm13 Oct 24, 2019
fcdf192
discovery enr signature verification + tests
zilm13 Oct 24, 2019
cc59e3b
discovery: add packet and message community encoding tests
zilm13 Oct 31, 2019
134d754
discovery: fix encodeNodes test as result is clarified to be correct
zilm13 Nov 3, 2019
599108f
discovery: finish community tests + fixes
zilm13 Nov 3, 2019
66fcb7f
discovery: update idNonce signing test with correct community vector
zilm13 Nov 5, 2019
2eb78e2
discovery: fix node encoding in AuthHeaderMessagePacket and tests
zilm13 Nov 5, 2019
524833e
discovery: add sign to the enr scheme interface as it's a part of scheme
zilm13 Nov 11, 2019
1c4d0b6
discovery: spec fixes, interop test with geth started but not finished
zilm13 Nov 19, 2019
f29395f
discovery: fixed maximum packet size verification + nodes sent in fin…
zilm13 Nov 19, 2019
2bf36a7
discovery: fixes encoding flow, plus new task api
zilm13 Nov 19, 2019
f3412e3
discovery: fix and add appropriate asserts in Geth interop test
zilm13 Nov 19, 2019
310c825
discovery: interop test with geth fixed
zilm13 Nov 21, 2019
000a312
discovery: refactor ENR routines to have abstractions according to sp…
zilm13 Nov 22, 2019
2c61b55
discovery: better interface for NodeRecord encoding routines
zilm13 Nov 22, 2019
ccc23a1
discovery: javadocs and final cleanup
zilm13 Nov 22, 2019
dc52d39
discovery: fix tests when one node is not ready on init
zilm13 Nov 22, 2019
51d0185
Refactor method name to distinguish by parameter name
shahankhatch Nov 23, 2019
665e10d
Doc-ed changed method name
shahankhatch Nov 23, 2019
9b9f283
Doc-ed changed method name
shahankhatch Nov 23, 2019
9985d2e
Merge pull request #210 from shahankhatch/feature/discv5-client
zilm13 Nov 25, 2019
4eaa680
discovery: simplify NodeRecord creation from fields. Use field setup …
zilm13 Nov 26, 2019
f52cf8a
Merge branch 'feature/discv5-client' of https://github.com/harmony-de…
zilm13 Nov 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions discovery/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
dependencies {
implementation project(':types')
implementation project(':util')
implementation project(':db:core')
implementation project(':chain')

implementation 'com.google.guava:guava'
implementation 'io.projectreactor:reactor-core'
implementation 'io.netty:netty-all'
implementation 'org.apache.logging.log4j:log4j-core'
implementation 'org.web3j:core'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.ethereum.beacon.discovery;

import org.ethereum.beacon.discovery.enr.NodeRecord;

import java.util.concurrent.CompletableFuture;

/**
* Discovery Manager, top interface for peer discovery mechanism as described at <a
* href="https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md">https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md</a>
*/
public interface DiscoveryManager {

void start();

void stop();

/**
* Initiates FINDNODE with node `nodeRecord`
*
* @param nodeRecord Ethereum Node record
* @param distance Distance to search for
* @return Future which is fired when reply is received or fails in timeout/not successful
* handshake/bad message exchange.
*/
CompletableFuture<Void> findNodes(NodeRecord nodeRecord, int distance);

/**
* Initiates PING with node `nodeRecord`
*
* @param nodeRecord Ethereum Node record
* @return Future which is fired when reply is received or fails in timeout/not successful
* handshake/bad message exchange.
*/
CompletableFuture<Void> ping(NodeRecord nodeRecord);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package org.ethereum.beacon.discovery;

import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.discovery.enr.EnrField;
import org.ethereum.beacon.discovery.enr.NodeRecord;
import org.ethereum.beacon.discovery.enr.NodeRecordFactory;
import org.ethereum.beacon.discovery.network.DiscoveryClient;
import org.ethereum.beacon.discovery.network.NettyDiscoveryClientImpl;
import org.ethereum.beacon.discovery.network.NettyDiscoveryServer;
import org.ethereum.beacon.discovery.network.NettyDiscoveryServerImpl;
import org.ethereum.beacon.discovery.network.NetworkParcel;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.Field;
import org.ethereum.beacon.discovery.pipeline.Pipeline;
import org.ethereum.beacon.discovery.pipeline.PipelineImpl;
import org.ethereum.beacon.discovery.pipeline.handler.AuthHeaderMessagePacketHandler;
import org.ethereum.beacon.discovery.pipeline.handler.BadPacketHandler;
import org.ethereum.beacon.discovery.pipeline.handler.IncomingDataPacker;
import org.ethereum.beacon.discovery.pipeline.handler.MessageHandler;
import org.ethereum.beacon.discovery.pipeline.handler.MessagePacketHandler;
import org.ethereum.beacon.discovery.pipeline.handler.NewTaskHandler;
import org.ethereum.beacon.discovery.pipeline.handler.NextTaskHandler;
import org.ethereum.beacon.discovery.pipeline.handler.NodeIdToSession;
import org.ethereum.beacon.discovery.pipeline.handler.NodeSessionRequestHandler;
import org.ethereum.beacon.discovery.pipeline.handler.NotExpectedIncomingPacketHandler;
import org.ethereum.beacon.discovery.pipeline.handler.OutgoingParcelHandler;
import org.ethereum.beacon.discovery.pipeline.handler.UnknownPacketTagToSender;
import org.ethereum.beacon.discovery.pipeline.handler.UnknownPacketTypeByStatus;
import org.ethereum.beacon.discovery.pipeline.handler.WhoAreYouAttempt;
import org.ethereum.beacon.discovery.pipeline.handler.WhoAreYouPacketHandler;
import org.ethereum.beacon.discovery.pipeline.handler.WhoAreYouSessionResolver;
import org.ethereum.beacon.discovery.storage.AuthTagRepository;
import org.ethereum.beacon.discovery.storage.NodeBucketStorage;
import org.ethereum.beacon.discovery.storage.NodeTable;
import org.ethereum.beacon.discovery.task.TaskOptions;
import org.ethereum.beacon.discovery.task.TaskType;
import org.ethereum.beacon.schedulers.Scheduler;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;
import tech.pegasys.artemis.util.bytes.Bytes4;
import tech.pegasys.artemis.util.bytes.BytesValue;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class DiscoveryManagerImpl implements DiscoveryManager {
private static final Logger logger = LogManager.getLogger(DiscoveryManagerImpl.class);
private final ReplayProcessor<NetworkParcel> outgoingMessages = ReplayProcessor.cacheLast();
private final FluxSink<NetworkParcel> outgoingSink = outgoingMessages.sink();
private final NettyDiscoveryServer discoveryServer;
private final Scheduler scheduler;
private final Pipeline incomingPipeline = new PipelineImpl();
private final Pipeline outgoingPipeline = new PipelineImpl();
private final NodeRecordFactory nodeRecordFactory;
private DiscoveryClient discoveryClient;
private CountDownLatch clientStarted = new CountDownLatch(1);

public DiscoveryManagerImpl(
NodeTable nodeTable,
NodeBucketStorage nodeBucketStorage,
NodeRecord homeNode,
BytesValue homeNodePrivateKey,
NodeRecordFactory nodeRecordFactory,
Scheduler serverScheduler,
Scheduler taskScheduler) {
AuthTagRepository authTagRepo = new AuthTagRepository();
this.scheduler = serverScheduler;
this.nodeRecordFactory = nodeRecordFactory;
this.discoveryServer =
new NettyDiscoveryServerImpl(
((Bytes4) homeNode.get(EnrField.IP_V4)), (int) homeNode.get(EnrField.UDP_V4));
discoveryServer.useDatagramChannel(
channel -> {
discoveryClient = new NettyDiscoveryClientImpl(outgoingMessages, channel);
clientStarted.countDown();
});
NodeIdToSession nodeIdToSession =
new NodeIdToSession(
homeNode,
homeNodePrivateKey,
nodeBucketStorage,
authTagRepo,
nodeTable,
outgoingPipeline);
incomingPipeline
.addHandler(new IncomingDataPacker())
.addHandler(new WhoAreYouAttempt(homeNode.getNodeId()))
.addHandler(new WhoAreYouSessionResolver(authTagRepo))
.addHandler(new UnknownPacketTagToSender(homeNode))
.addHandler(nodeIdToSession)
.addHandler(new UnknownPacketTypeByStatus())
.addHandler(new NotExpectedIncomingPacketHandler())
.addHandler(new WhoAreYouPacketHandler(outgoingPipeline, taskScheduler))
.addHandler(
new AuthHeaderMessagePacketHandler(outgoingPipeline, taskScheduler, nodeRecordFactory))
.addHandler(new MessagePacketHandler())
.addHandler(new MessageHandler(nodeRecordFactory))
.addHandler(new BadPacketHandler());
outgoingPipeline
.addHandler(new OutgoingParcelHandler(outgoingSink))
.addHandler(new NodeSessionRequestHandler())
.addHandler(nodeIdToSession)
.addHandler(new NewTaskHandler())
.addHandler(new NextTaskHandler(outgoingPipeline, taskScheduler));
}

@Override
public void start() {
incomingPipeline.build();
outgoingPipeline.build();
Flux.from(discoveryServer.getIncomingPackets()).subscribe(incomingPipeline::push);
discoveryServer.start(scheduler);
try {
clientStarted.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("Failed to start client", e);
}
}

@Override
public void stop() {
discoveryServer.stop();
}

private CompletableFuture<Void> executeTaskImpl(
NodeRecord nodeRecord, TaskType taskType, TaskOptions taskOptions) {
Envelope envelope = new Envelope();
envelope.put(Field.NODE, nodeRecord);
CompletableFuture<Void> future = new CompletableFuture<>();
envelope.put(Field.TASK, taskType);
envelope.put(Field.FUTURE, future);
envelope.put(Field.TASK_OPTIONS, taskOptions);
outgoingPipeline.push(envelope);
return future;
}

@Override
public CompletableFuture<Void> findNodes(NodeRecord nodeRecord, int distance) {
return executeTaskImpl(nodeRecord, TaskType.FINDNODE, new TaskOptions(true, distance));
}

@Override
public CompletableFuture<Void> ping(NodeRecord nodeRecord) {
return executeTaskImpl(nodeRecord, TaskType.PING, new TaskOptions(true));
}

@VisibleForTesting
Publisher<NetworkParcel> getOutgoingMessages() {
return outgoingMessages;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.ethereum.beacon.discovery;

import org.ethereum.beacon.discovery.message.DiscoveryMessage;

/** Handles discovery messages of several types */
public interface DiscoveryMessageProcessor<M extends DiscoveryMessage> {
Protocol getSupportedIdentity();

void handleMessage(M message, NodeSession session);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.ethereum.beacon.discovery;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.discovery.enr.NodeRecordFactory;
import org.ethereum.beacon.discovery.message.DiscoveryV5Message;
import org.ethereum.beacon.discovery.message.MessageCode;
import org.ethereum.beacon.discovery.message.handler.FindNodeHandler;
import org.ethereum.beacon.discovery.message.handler.MessageHandler;
import org.ethereum.beacon.discovery.message.handler.NodesHandler;
import org.ethereum.beacon.discovery.message.handler.PingHandler;
import org.ethereum.beacon.discovery.message.handler.PongHandler;

import java.util.HashMap;
import java.util.Map;

/**
* {@link DiscoveryV5Message} v5 messages processor. Uses several handlers, one fo each type of v5
* message to handle appropriate message.
*/
public class DiscoveryV5MessageProcessor implements DiscoveryMessageProcessor<DiscoveryV5Message> {
private static final Logger logger = LogManager.getLogger(DiscoveryV5MessageProcessor.class);
private final Map<MessageCode, MessageHandler> messageHandlers = new HashMap<>();
private final NodeRecordFactory nodeRecordFactory;

public DiscoveryV5MessageProcessor(NodeRecordFactory nodeRecordFactory) {
messageHandlers.put(MessageCode.PING, new PingHandler());
messageHandlers.put(MessageCode.PONG, new PongHandler());
messageHandlers.put(MessageCode.FINDNODE, new FindNodeHandler());
messageHandlers.put(MessageCode.NODES, new NodesHandler());
this.nodeRecordFactory = nodeRecordFactory;
}

@Override
public Protocol getSupportedIdentity() {
return Protocol.V5;
}

@Override
public void handleMessage(DiscoveryV5Message message, NodeSession session) {
MessageCode code = message.getCode();
MessageHandler messageHandler = messageHandlers.get(code);
logger.trace(() -> String.format("Handling message %s in session %s", message, session));
if (messageHandler == null) {
throw new RuntimeException("Not implemented yet");
}
messageHandler.handle(message.create(nodeRecordFactory), session);
}
}
Loading