messageHandlers = new HashMap<>();
+ private final NodeRecordFactory nodeRecordFactory;
+
+ public DiscoveryV5MessageProcessor(NodeRecordFactory nodeRecordFactory) {
+ messageHandlers.put(MessageCode.PING, new PingHandler());
+ messageHandlers.put(MessageCode.PONG, new PongHandler());
+ messageHandlers.put(MessageCode.FINDNODE, new FindNodeHandler());
+ messageHandlers.put(MessageCode.NODES, new NodesHandler());
+ this.nodeRecordFactory = nodeRecordFactory;
+ }
+
+ @Override
+ public Protocol getSupportedIdentity() {
+ return Protocol.V5;
+ }
+
+ @Override
+ public void handleMessage(DiscoveryV5Message message, NodeSession session) {
+ MessageCode code = message.getCode();
+ MessageHandler messageHandler = messageHandlers.get(code);
+ logger.trace(() -> String.format("Handling message %s in session %s", message, session));
+ if (messageHandler == null) {
+ throw new RuntimeException("Not implemented yet");
+ }
+ messageHandler.handle(message.create(nodeRecordFactory), session);
+ }
+}
diff --git a/discovery/src/main/java/org/ethereum/beacon/discovery/Functions.java b/discovery/src/main/java/org/ethereum/beacon/discovery/Functions.java
new file mode 100644
index 000000000..f60ee7c46
--- /dev/null
+++ b/discovery/src/main/java/org/ethereum/beacon/discovery/Functions.java
@@ -0,0 +1,296 @@
+package org.ethereum.beacon.discovery;
+
+import com.google.common.base.Objects;
+import org.bouncycastle.crypto.Digest;
+import org.bouncycastle.crypto.digests.SHA256Digest;
+import org.bouncycastle.crypto.generators.HKDFBytesGenerator;
+import org.bouncycastle.crypto.params.ECDomainParameters;
+import org.bouncycastle.crypto.params.HKDFParameters;
+import org.bouncycastle.math.ec.ECPoint;
+import org.bouncycastle.util.Arrays;
+import org.ethereum.beacon.crypto.Hashes;
+import org.ethereum.beacon.util.Utils;
+import org.web3j.crypto.ECDSASignature;
+import org.web3j.crypto.ECKeyPair;
+import org.web3j.crypto.Hash;
+import org.web3j.crypto.Sign;
+import tech.pegasys.artemis.util.bytes.Bytes32;
+import tech.pegasys.artemis.util.bytes.Bytes32s;
+import tech.pegasys.artemis.util.bytes.BytesValue;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.math.BigInteger;
+import java.security.SecureRandom;
+import java.util.Random;
+
+import static org.ethereum.beacon.util.Utils.extractBytesFromUnsignedBigInt;
+import static org.web3j.crypto.Sign.CURVE_PARAMS;
+
+/** Set of cryptography and utilities functions used in discovery */
+public class Functions {
+ public static final ECDomainParameters SECP256K1_CURVE =
+ new ECDomainParameters(
+ CURVE_PARAMS.getCurve(), CURVE_PARAMS.getG(), CURVE_PARAMS.getN(), CURVE_PARAMS.getH());
+ public static final int PUBKEY_SIZE = 64;
+ private static final int RECIPIENT_KEY_LENGTH = 16;
+ private static final int INITIATOR_KEY_LENGTH = 16;
+ private static final int AUTH_RESP_KEY_LENGTH = 16;
+ private static final int MS_IN_SECOND = 1000;
+
+ /** SHA2 (SHA256) */
+ public static Bytes32 hash(BytesValue value) {
+ return Hashes.sha256(value);
+ }
+
+ /** SHA3 (Keccak256) */
+ public static Bytes32 hashKeccak(BytesValue value) {
+ return Bytes32.wrap(Hash.sha3(value.extractArray()));
+ }
+
+ /**
+ * Creates a signature of message `x` using the given key.
+ *
+ * @param key private key
+ * @param x message, hashed
+ * @return ECDSA signature with properties merged together: r || s
+ */
+ public static BytesValue sign(BytesValue key, BytesValue x) {
+ Sign.SignatureData signatureData =
+ Sign.signMessage(x.extractArray(), ECKeyPair.create(key.extractArray()), false);
+ Bytes32 r = Bytes32.wrap(signatureData.getR());
+ Bytes32 s = Bytes32.wrap(signatureData.getS());
+ return r.concat(s);
+ }
+
+ /**
+ * Verifies that signature is made by signer
+ *
+ * @param signature Signature, ECDSA
+ * @param x message, hashed
+ * @param pubKey Public key of supposed signer, compressed, 33 bytes
+ * @return whether `signature` reflects message `x` signed with `pubkey`
+ */
+ public static boolean verifyECDSASignature(
+ BytesValue signature, BytesValue x, BytesValue pubKey) {
+ assert pubKey.size() == 33;
+ ECPoint ecPoint = Functions.publicKeyToPoint(pubKey);
+ BytesValue pubKeyUncompressed = BytesValue.wrap(ecPoint.getEncoded(false)).slice(1);
+ ECDSASignature ecdsaSignature =
+ new ECDSASignature(
+ new BigInteger(1, signature.slice(0, 32).extractArray()),
+ new BigInteger(1, signature.slice(32).extractArray()));
+ for (int recId = 0; recId < 4; ++recId) {
+ BigInteger calculatedPubKey =
+ Sign.recoverFromSignature(recId, ecdsaSignature, x.extractArray());
+ if (calculatedPubKey == null) {
+ continue;
+ }
+ if (Arrays.areEqual(
+ pubKeyUncompressed.extractArray(),
+ extractBytesFromUnsignedBigInt(calculatedPubKey, PUBKEY_SIZE))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * AES-GCM encryption/authentication with the given `key`, `nonce` and additional authenticated
+ * data `ad`. Size of `key` is 16 bytes (AES-128), size of `nonce` 12 bytes.
+ */
+ public static BytesValue aesgcm_encrypt(
+ BytesValue privateKey, BytesValue nonce, BytesValue message, BytesValue aad) {
+ try {
+ Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
+ cipher.init(
+ Cipher.ENCRYPT_MODE,
+ new SecretKeySpec(privateKey.extractArray(), "AES"),
+ new GCMParameterSpec(128, nonce.extractArray()));
+ cipher.updateAAD(aad.extractArray());
+ return BytesValue.wrap(cipher.doFinal(message.extractArray()));
+ } catch (Exception e) {
+ throw new RuntimeException("No AES/GCM cipher provider", e);
+ }
+ }
+
+ /**
+ * AES-GCM decryption of `encoded` data with the given `key`, `nonce` and additional authenticated
+ * data `ad`. Size of `key` is 16 bytes (AES-128), size of `nonce` 12 bytes.
+ */
+ public static BytesValue aesgcm_decrypt(
+ BytesValue privateKey, BytesValue nonce, BytesValue encoded, BytesValue aad) {
+ try {
+ Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
+ cipher.init(
+ Cipher.DECRYPT_MODE,
+ new SecretKeySpec(privateKey.extractArray(), "AES"),
+ new GCMParameterSpec(128, nonce.extractArray()));
+ cipher.updateAAD(aad.extractArray());
+ return BytesValue.wrap(cipher.doFinal(encoded.extractArray()));
+ } catch (Exception e) {
+ throw new RuntimeException("No AES/GCM cipher provider", e);
+ }
+ }
+
+ /** Maps public key to point on {@link #SECP256K1_CURVE} */
+ public static ECPoint publicKeyToPoint(BytesValue pkey) {
+ byte[] destPubPointBytes;
+ if (pkey.size() == 64) { // uncompressed
+ destPubPointBytes = new byte[pkey.size() + 1];
+ destPubPointBytes[0] = 0x04; // default prefix
+ System.arraycopy(pkey.extractArray(), 0, destPubPointBytes, 1, pkey.size());
+ } else {
+ destPubPointBytes = pkey.extractArray();
+ }
+ return SECP256K1_CURVE.getCurve().decodePoint(destPubPointBytes);
+ }
+
+ /** Derives public key in SECP256K1, compressed */
+ public static BytesValue derivePublicKeyFromPrivate(BytesValue privateKey) {
+ ECKeyPair ecKeyPair = ECKeyPair.create(privateKey.extractArray());
+ final BytesValue pubKey =
+ BytesValue.wrap(
+ Utils.extractBytesFromUnsignedBigInt(ecKeyPair.getPublicKey(), PUBKEY_SIZE));
+ ECPoint ecPoint = Functions.publicKeyToPoint(pubKey);
+ return BytesValue.wrap(ecPoint.getEncoded(true));
+ }
+
+ /** Derives key agreement ECDH by multiplying private key by public */
+ public static BytesValue deriveECDHKeyAgreement(BytesValue srcPrivKey, BytesValue destPubKey) {
+ ECPoint pudDestPoint = publicKeyToPoint(destPubKey);
+ ECPoint mult = pudDestPoint.multiply(new BigInteger(1, srcPrivKey.extractArray()));
+ return BytesValue.wrap(mult.getEncoded(true));
+ }
+
+ /**
+ * The ephemeral key is used to perform Diffie-Hellman key agreement with B's static public key
+ * and the session keys are derived from it using the HKDF key derivation function.
+ *
+ *
+ * ephemeral-key = random private key
+ * ephemeral-pubkey = public key corresponding to ephemeral-key
+ * dest-pubkey = public key of B
+ * secret = agree(ephemeral-key, dest-pubkey)
+ * info = "discovery v5 key agreement" || node-id-A || node-id-B
+ * prk = HKDF-Extract(secret, id-nonce)
+ * initiator-key, recipient-key, auth-resp-key = HKDF-Expand(prk, info)
+ */
+ public static HKDFKeys hkdf_expand(
+ BytesValue srcNodeId,
+ BytesValue destNodeId,
+ BytesValue srcPrivKey,
+ BytesValue destPubKey,
+ BytesValue idNonce) {
+ BytesValue keyAgreement = deriveECDHKeyAgreement(srcPrivKey, destPubKey);
+ return hkdf_expand(srcNodeId, destNodeId, keyAgreement, idNonce);
+ }
+
+ /**
+ * {@link #hkdf_expand(BytesValue, BytesValue, BytesValue, BytesValue, BytesValue)} but with
+ * keyAgreement already derived by {@link #deriveECDHKeyAgreement(BytesValue, BytesValue)}
+ */
+ public static HKDFKeys hkdf_expand(
+ BytesValue srcNodeId, BytesValue destNodeId, BytesValue keyAgreement, BytesValue idNonce) {
+ try {
+ BytesValue info =
+ BytesValue.wrap("discovery v5 key agreement".getBytes())
+ .concat(srcNodeId)
+ .concat(destNodeId);
+ HKDFParameters hkdfParameters =
+ new HKDFParameters(
+ keyAgreement.extractArray(), idNonce.extractArray(), info.extractArray());
+ Digest digest = new SHA256Digest();
+ HKDFBytesGenerator hkdfBytesGenerator = new HKDFBytesGenerator(digest);
+ hkdfBytesGenerator.init(hkdfParameters);
+ // initiator-key || recipient-key || auth-resp-key
+ byte[] hkdfOutputBytes =
+ new byte[INITIATOR_KEY_LENGTH + RECIPIENT_KEY_LENGTH + AUTH_RESP_KEY_LENGTH];
+ hkdfBytesGenerator.generateBytes(
+ hkdfOutputBytes, 0, INITIATOR_KEY_LENGTH + RECIPIENT_KEY_LENGTH + AUTH_RESP_KEY_LENGTH);
+ BytesValue hkdfOutput = BytesValue.wrap(hkdfOutputBytes);
+ BytesValue initiatorKey = hkdfOutput.slice(0, INITIATOR_KEY_LENGTH);
+ BytesValue recipientKey = hkdfOutput.slice(INITIATOR_KEY_LENGTH, RECIPIENT_KEY_LENGTH);
+ BytesValue authRespKey = hkdfOutput.slice(INITIATOR_KEY_LENGTH + RECIPIENT_KEY_LENGTH);
+ return new HKDFKeys(initiatorKey, recipientKey, authRespKey);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /** Current time in seconds */
+ public static long getTime() {
+ return System.currentTimeMillis() / MS_IN_SECOND;
+ }
+
+ /** Random provider */
+ public static Random getRandom() {
+ return new SecureRandom();
+ }
+
+ /**
+ * The 'distance' between two node IDs is the bitwise XOR of the IDs, taken as the number.
+ *
+ *
distance(n₁, n₂) = n₁ XOR n₂
+ *
+ *
LogDistance is reverse of length of common prefix in bits (length - number of leftmost zeros
+ * in XOR)
+ */
+ public static int logDistance(Bytes32 nodeId1, Bytes32 nodeId2) {
+ BytesValue distance = Bytes32s.xor(nodeId1, nodeId2);
+ int logDistance = Byte.SIZE * distance.size(); // 256
+ final int maxLogDistance = logDistance;
+ for (int i = 0; i < maxLogDistance; ++i) {
+ if (distance.getHighBit(i)) {
+ break;
+ } else {
+ logDistance--;
+ }
+ }
+ return logDistance;
+ }
+
+ /**
+ * Stores set of keys derived by simple key derivation function (KDF) based on a hash-based
+ * message authentication code (HMAC)
+ */
+ public static class HKDFKeys {
+ private final BytesValue initiatorKey;
+ private final BytesValue recipientKey;
+ private final BytesValue authResponseKey;
+
+ public HKDFKeys(BytesValue initiatorKey, BytesValue recipientKey, BytesValue authResponseKey) {
+ this.initiatorKey = initiatorKey;
+ this.recipientKey = recipientKey;
+ this.authResponseKey = authResponseKey;
+ }
+
+ public BytesValue getInitiatorKey() {
+ return initiatorKey;
+ }
+
+ public BytesValue getRecipientKey() {
+ return recipientKey;
+ }
+
+ public BytesValue getAuthResponseKey() {
+ return authResponseKey;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ HKDFKeys hkdfKeys = (HKDFKeys) o;
+ return Objects.equal(initiatorKey, hkdfKeys.initiatorKey)
+ && Objects.equal(recipientKey, hkdfKeys.recipientKey)
+ && Objects.equal(authResponseKey, hkdfKeys.authResponseKey);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(initiatorKey, recipientKey, authResponseKey);
+ }
+ }
+}
diff --git a/discovery/src/main/java/org/ethereum/beacon/discovery/MessageProcessor.java b/discovery/src/main/java/org/ethereum/beacon/discovery/MessageProcessor.java
new file mode 100644
index 000000000..16a7bd3f7
--- /dev/null
+++ b/discovery/src/main/java/org/ethereum/beacon/discovery/MessageProcessor.java
@@ -0,0 +1,37 @@
+package org.ethereum.beacon.discovery;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.ethereum.beacon.discovery.message.DiscoveryMessage;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Highest level processor which knows several processors for different versions of {@link
+ * DiscoveryMessage}'s.
+ */
+public class MessageProcessor {
+ private static final Logger logger = LogManager.getLogger(MessageProcessor.class);
+ private final Map messageProcessors = new HashMap<>();
+
+ public MessageProcessor(DiscoveryMessageProcessor... messageProcessors) {
+ for (int i = 0; i < messageProcessors.length; ++i) {
+ this.messageProcessors.put(messageProcessors[i].getSupportedIdentity(), messageProcessors[i]);
+ }
+ }
+
+ public void handleIncoming(DiscoveryMessage message, NodeSession session) {
+ Protocol protocol = message.getProtocol();
+ DiscoveryMessageProcessor messageHandler = messageProcessors.get(protocol);
+ if (messageHandler == null) {
+ String error =
+ String.format(
+ "Message %s with identity %s received in session %s is not supported",
+ message, protocol, session);
+ logger.error(error);
+ throw new RuntimeException(error);
+ }
+ messageHandler.handleMessage(message, session);
+ }
+}
diff --git a/discovery/src/main/java/org/ethereum/beacon/discovery/NodeRecordInfo.java b/discovery/src/main/java/org/ethereum/beacon/discovery/NodeRecordInfo.java
new file mode 100644
index 000000000..5ec4fd91a
--- /dev/null
+++ b/discovery/src/main/java/org/ethereum/beacon/discovery/NodeRecordInfo.java
@@ -0,0 +1,101 @@
+package org.ethereum.beacon.discovery;
+
+import com.google.common.base.Objects;
+import org.ethereum.beacon.discovery.enr.NodeRecord;
+import org.ethereum.beacon.discovery.enr.NodeRecordFactory;
+import org.web3j.rlp.RlpDecoder;
+import org.web3j.rlp.RlpEncoder;
+import org.web3j.rlp.RlpList;
+import org.web3j.rlp.RlpString;
+import org.web3j.rlp.RlpType;
+import tech.pegasys.artemis.util.bytes.BytesValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Container for {@link NodeRecord}. Also saves all necessary data about presence of this node and
+ * last test of its availability
+ */
+public class NodeRecordInfo {
+ private final NodeRecord node;
+ private final Long lastRetry;
+ private final NodeStatus status;
+ private final Integer retry;
+
+ public NodeRecordInfo(NodeRecord node, Long lastRetry, NodeStatus status, Integer retry) {
+ this.node = node;
+ this.lastRetry = lastRetry;
+ this.status = status;
+ this.retry = retry;
+ }
+
+ public static NodeRecordInfo createDefault(NodeRecord nodeRecord) {
+ return new NodeRecordInfo(nodeRecord, -1L, NodeStatus.ACTIVE, 0);
+ }
+
+ public static NodeRecordInfo fromRlpBytes(BytesValue bytes, NodeRecordFactory nodeRecordFactory) {
+ RlpList internalList = (RlpList) RlpDecoder.decode(bytes.extractArray()).getValues().get(0);
+ return new NodeRecordInfo(
+ nodeRecordFactory.fromBytes(((RlpString) internalList.getValues().get(0)).getBytes()),
+ ((RlpString) internalList.getValues().get(1)).asPositiveBigInteger().longValue(),
+ NodeStatus.fromNumber(((RlpString) internalList.getValues().get(2)).getBytes()[0]),
+ ((RlpString) internalList.getValues().get(1)).asPositiveBigInteger().intValue());
+ }
+
+ public BytesValue toRlpBytes() {
+ List values = new ArrayList<>();
+ values.add(RlpString.create(getNode().serialize().extractArray()));
+ values.add(RlpString.create(getLastRetry()));
+ values.add(RlpString.create(getStatus().byteCode()));
+ values.add(RlpString.create(getRetry()));
+ byte[] bytes = RlpEncoder.encode(new RlpList(values));
+ return BytesValue.wrap(bytes);
+ }
+
+ public NodeRecord getNode() {
+ return (NodeRecord) node;
+ }
+
+ public Long getLastRetry() {
+ return lastRetry;
+ }
+
+ public NodeStatus getStatus() {
+ return status;
+ }
+
+ public Integer getRetry() {
+ return retry;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ NodeRecordInfo that = (NodeRecordInfo) o;
+ return Objects.equal(node, that.node)
+ && Objects.equal(lastRetry, that.lastRetry)
+ && status == that.status
+ && Objects.equal(retry, that.retry);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(node, lastRetry, status, retry);
+ }
+
+ @Override
+ public String toString() {
+ return "NodeRecordInfo{"
+ + "node="
+ + node
+ + ", lastRetry="
+ + lastRetry
+ + ", status="
+ + status
+ + ", retry="
+ + retry
+ + '}';
+ }
+}
diff --git a/discovery/src/main/java/org/ethereum/beacon/discovery/NodeSession.java b/discovery/src/main/java/org/ethereum/beacon/discovery/NodeSession.java
new file mode 100644
index 000000000..b1328e165
--- /dev/null
+++ b/discovery/src/main/java/org/ethereum/beacon/discovery/NodeSession.java
@@ -0,0 +1,332 @@
+package org.ethereum.beacon.discovery;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.ethereum.beacon.discovery.enr.NodeRecord;
+import org.ethereum.beacon.discovery.packet.Packet;
+import org.ethereum.beacon.discovery.pipeline.info.RequestInfo;
+import org.ethereum.beacon.discovery.pipeline.info.RequestInfoFactory;
+import org.ethereum.beacon.discovery.storage.AuthTagRepository;
+import org.ethereum.beacon.discovery.storage.NodeBucket;
+import org.ethereum.beacon.discovery.storage.NodeBucketStorage;
+import org.ethereum.beacon.discovery.storage.NodeTable;
+import org.ethereum.beacon.discovery.task.TaskOptions;
+import org.ethereum.beacon.discovery.task.TaskType;
+import org.ethereum.beacon.util.ExpirationScheduler;
+import tech.pegasys.artemis.util.bytes.Bytes32;
+import tech.pegasys.artemis.util.bytes.BytesValue;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import static org.ethereum.beacon.discovery.task.TaskStatus.AWAIT;
+
+/**
+ * Stores session status and all keys for discovery message exchange between us, `homeNode` and the
+ * other `node`
+ */
+public class NodeSession {
+ public static final int NONCE_SIZE = 12;
+ public static final int REQUEST_ID_SIZE = 8;
+ private static final Logger logger = LogManager.getLogger(NodeSession.class);
+ private static final int CLEANUP_DELAY_SECONDS = 60;
+ private final NodeRecord homeNodeRecord;
+ private final Bytes32 homeNodeId;
+ private final AuthTagRepository authTagRepo;
+ private final NodeTable nodeTable;
+ private final NodeBucketStorage nodeBucketStorage;
+ private final Consumer outgoing;
+ private final Random rnd;
+ private NodeRecord nodeRecord;
+ private SessionStatus status = SessionStatus.INITIAL;
+ private Bytes32 idNonce;
+ private BytesValue initiatorKey;
+ private BytesValue recipientKey;
+ private Map requestIdStatuses = new ConcurrentHashMap<>();
+ private ExpirationScheduler requestExpirationScheduler =
+ new ExpirationScheduler<>(CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
+ private CompletableFuture completableFuture = null;
+ private BytesValue staticNodeKey;
+
+ public NodeSession(
+ NodeRecord nodeRecord,
+ NodeRecord homeNodeRecord,
+ BytesValue staticNodeKey,
+ NodeTable nodeTable,
+ NodeBucketStorage nodeBucketStorage,
+ AuthTagRepository authTagRepo,
+ Consumer outgoing,
+ Random rnd) {
+ this.nodeRecord = nodeRecord;
+ this.outgoing = outgoing;
+ this.authTagRepo = authTagRepo;
+ this.nodeTable = nodeTable;
+ this.nodeBucketStorage = nodeBucketStorage;
+ this.homeNodeRecord = homeNodeRecord;
+ this.staticNodeKey = staticNodeKey;
+ this.homeNodeId = homeNodeRecord.getNodeId();
+ this.rnd = rnd;
+ }
+
+ public NodeRecord getNodeRecord() {
+ return nodeRecord;
+ }
+
+ public synchronized void updateNodeRecord(NodeRecord nodeRecord) {
+ logger.trace(
+ () ->
+ String.format(
+ "NodeRecord updated from %s to %s in session %s",
+ this.nodeRecord, nodeRecord, this));
+ this.nodeRecord = nodeRecord;
+ }
+
+ private void completeConnectFuture() {
+ if (completableFuture != null) {
+ completableFuture.complete(null);
+ completableFuture = null;
+ }
+ }
+
+ public void sendOutgoing(Packet packet) {
+ logger.trace(() -> String.format("Sending outgoing packet %s in session %s", packet, this));
+ outgoing.accept(packet);
+ }
+
+ /**
+ * Creates object with request information: requestId etc, RequestInfo, designed to maintain
+ * request status and its changes. Also stores info in session repository to track related
+ * messages.
+ *
+ * The value selected as request ID must allow for concurrent conversations. Using a timestamp
+ * can result in parallel conversations with the same id, so this should be avoided. Request IDs
+ * also prevent replay of responses. Using a simple counter would be fine if the implementation
+ * could ensure that restarts or even re-installs would increment the counter based on previously
+ * saved state in all circumstances. The easiest to implement is a random number.
+ *
+ * @param taskType Type of task, clarifies starting and reply message types
+ * @param taskOptions Task options
+ * @param future Future to be fired when task is successfully completed or exceptionally break
+ * when its failed
+ * @return info bundle.
+ */
+ public synchronized RequestInfo createNextRequest(
+ TaskType taskType, TaskOptions taskOptions, CompletableFuture future) {
+ byte[] requestId = new byte[REQUEST_ID_SIZE];
+ rnd.nextBytes(requestId);
+ BytesValue wrappedId = BytesValue.wrap(requestId);
+ if (taskOptions.isLivenessUpdate()) {
+ future.whenComplete(
+ (aVoid, throwable) -> {
+ if (throwable == null) {
+ updateLiveness();
+ }
+ });
+ }
+ RequestInfo requestInfo = RequestInfoFactory.create(taskType, wrappedId, taskOptions, future);
+ requestIdStatuses.put(wrappedId, requestInfo);
+ requestExpirationScheduler.put(
+ wrappedId,
+ new Runnable() {
+ @Override
+ public void run() {
+ logger.debug(
+ () ->
+ String.format(
+ "Request %s expired for id %s in session %s: no reply",
+ requestInfo, wrappedId, this));
+ requestIdStatuses.remove(wrappedId);
+ }
+ });
+ return requestInfo;
+ }
+
+ /** Updates request info. Thread-safe. */
+ public synchronized void updateRequestInfo(BytesValue requestId, RequestInfo newRequestInfo) {
+ RequestInfo oldRequestInfo = requestIdStatuses.remove(requestId);
+ if (oldRequestInfo == null) {
+ logger.debug(
+ () ->
+ String.format(
+ "An attempt to update requestId %s in session %s which does not exist",
+ requestId, this));
+ return;
+ }
+ requestIdStatuses.put(requestId, newRequestInfo);
+ requestExpirationScheduler.put(
+ requestId,
+ new Runnable() {
+ @Override
+ public void run() {
+ logger.debug(
+ String.format(
+ "Request %s expired for id %s in session %s: no reply",
+ newRequestInfo, requestId, this));
+ requestIdStatuses.remove(requestId);
+ }
+ });
+ }
+
+ public synchronized void cancelAllRequests(String message) {
+ logger.debug(() -> String.format("Cancelling all requests in session %s", this));
+ Set requestIdsCopy = new HashSet<>(requestIdStatuses.keySet());
+ requestIdsCopy.forEach(
+ requestId -> {
+ RequestInfo requestInfo = clearRequestId(requestId);
+ requestInfo
+ .getFuture()
+ .completeExceptionally(
+ new RuntimeException(
+ String.format(
+ "Request %s cancelled due to reason: %s", requestInfo, message)));
+ });
+ }
+
+ /** Generates random nonce of {@link #NONCE_SIZE} size */
+ public synchronized BytesValue generateNonce() {
+ byte[] nonce = new byte[NONCE_SIZE];
+ rnd.nextBytes(nonce);
+ return BytesValue.wrap(nonce);
+ }
+
+ /** If true indicates that handshake is complete */
+ public synchronized boolean isAuthenticated() {
+ return SessionStatus.AUTHENTICATED.equals(status);
+ }
+
+ /** Resets stored authTags for this session making them obsolete */
+ public void cleanup() {
+ authTagRepo.expire(this);
+ }
+
+ public Optional getAuthTag() {
+ return authTagRepo.getTag(this);
+ }
+
+ public void setAuthTag(BytesValue authTag) {
+ authTagRepo.put(authTag, this);
+ }
+
+ public Bytes32 getHomeNodeId() {
+ return homeNodeId;
+ }
+
+ /** @return initiator key, also known as write key */
+ public BytesValue getInitiatorKey() {
+ return initiatorKey;
+ }
+
+ public void setInitiatorKey(BytesValue initiatorKey) {
+ this.initiatorKey = initiatorKey;
+ }
+
+ /** @return recipient key, also known as read key */
+ public BytesValue getRecipientKey() {
+ return recipientKey;
+ }
+
+ public void setRecipientKey(BytesValue recipientKey) {
+ this.recipientKey = recipientKey;
+ }
+
+ public synchronized void clearRequestId(BytesValue requestId, TaskType taskType) {
+ RequestInfo requestInfo = clearRequestId(requestId);
+ requestInfo.getFuture().complete(null);
+ assert taskType.equals(requestInfo.getTaskType());
+ }
+
+ /** Updates nodeRecord {@link NodeStatus} to ACTIVE of the node associated with this session */
+ public synchronized void updateLiveness() {
+ NodeRecordInfo nodeRecordInfo =
+ new NodeRecordInfo(getNodeRecord(), Functions.getTime(), NodeStatus.ACTIVE, 0);
+ nodeTable.save(nodeRecordInfo);
+ nodeBucketStorage.put(nodeRecordInfo);
+ }
+
+ private synchronized RequestInfo clearRequestId(BytesValue requestId) {
+ RequestInfo requestInfo = requestIdStatuses.remove(requestId);
+ requestExpirationScheduler.cancel(requestId);
+ return requestInfo;
+ }
+
+ public synchronized Optional getRequestId(BytesValue requestId) {
+ RequestInfo requestInfo = requestIdStatuses.get(requestId);
+ return requestId == null ? Optional.empty() : Optional.of(requestInfo);
+ }
+
+ /**
+ * Returns any queued {@link RequestInfo} which was not started because session is not
+ * authenticated
+ */
+ public synchronized Optional getFirstAwaitRequestInfo() {
+ return requestIdStatuses.values().stream()
+ .filter(requestInfo -> AWAIT.equals(requestInfo.getTaskStatus()))
+ .findFirst();
+ }
+
+ public NodeTable getNodeTable() {
+ return nodeTable;
+ }
+
+ public void putRecordInBucket(NodeRecordInfo nodeRecordInfo) {
+ nodeBucketStorage.put(nodeRecordInfo);
+ }
+
+ public Optional getBucket(int index) {
+ return nodeBucketStorage.get(index);
+ }
+
+ public synchronized Bytes32 getIdNonce() {
+ return idNonce;
+ }
+
+ public synchronized void setIdNonce(Bytes32 idNonce) {
+ this.idNonce = idNonce;
+ }
+
+ public NodeRecord getHomeNodeRecord() {
+ return homeNodeRecord;
+ }
+
+ @Override
+ public String toString() {
+ return "NodeSession{"
+ + "nodeRecord="
+ + nodeRecord
+ + ", homeNodeId="
+ + homeNodeId
+ + ", status="
+ + status
+ + '}';
+ }
+
+ public synchronized SessionStatus getStatus() {
+ return status;
+ }
+
+ public synchronized void setStatus(SessionStatus newStatus) {
+ logger.debug(
+ () ->
+ String.format(
+ "Switching status of node %s from %s to %s", nodeRecord, status, newStatus));
+ this.status = newStatus;
+ }
+
+ public BytesValue getStaticNodeKey() {
+ return staticNodeKey;
+ }
+
+ public enum SessionStatus {
+ INITIAL, // other side is trying to connect, or we are initiating (before random packet is sent
+ WHOAREYOU_SENT, // other side is initiator, we've sent whoareyou in response
+ RANDOM_PACKET_SENT, // our node is initiator, we've sent random packet
+ AUTHENTICATED
+ }
+}
diff --git a/discovery/src/main/java/org/ethereum/beacon/discovery/NodeStatus.java b/discovery/src/main/java/org/ethereum/beacon/discovery/NodeStatus.java
new file mode 100644
index 000000000..c96dd707d
--- /dev/null
+++ b/discovery/src/main/java/org/ethereum/beacon/discovery/NodeStatus.java
@@ -0,0 +1,33 @@
+package org.ethereum.beacon.discovery;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Status of {@link org.ethereum.beacon.discovery.enr.NodeRecord} */
+public enum NodeStatus {
+ ACTIVE(0x01), // Alive
+ SLEEP(0x02), // Didn't answer last time(s)
+ DEAD(0x03); // Didnt' answer for a long time
+
+ private static final Map codeMap = new HashMap<>();
+
+ static {
+ for (NodeStatus type : NodeStatus.values()) {
+ codeMap.put(type.code, type);
+ }
+ }
+
+ private int code;
+
+ NodeStatus(int code) {
+ this.code = code;
+ }
+
+ public static NodeStatus fromNumber(int i) {
+ return codeMap.get(i);
+ }
+
+ public byte byteCode() {
+ return (byte) code;
+ }
+}
diff --git a/discovery/src/main/java/org/ethereum/beacon/discovery/Protocol.java b/discovery/src/main/java/org/ethereum/beacon/discovery/Protocol.java
new file mode 100644
index 000000000..beaf30f9a
--- /dev/null
+++ b/discovery/src/main/java/org/ethereum/beacon/discovery/Protocol.java
@@ -0,0 +1,32 @@
+package org.ethereum.beacon.discovery;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Discovery protocol versions */
+public enum Protocol {
+ V4("v4"),
+ V5("v5");
+
+ private static final Map nameMap = new HashMap<>();
+
+ static {
+ for (Protocol scheme : Protocol.values()) {
+ nameMap.put(scheme.name, scheme);
+ }
+ }
+
+ private String name;
+
+ private Protocol(String name) {
+ this.name = name;
+ }
+
+ public static Protocol fromString(String name) {
+ return nameMap.get(name);
+ }
+
+ public String stringName() {
+ return name;
+ }
+}
diff --git a/discovery/src/main/java/org/ethereum/beacon/discovery/RlpUtil.java b/discovery/src/main/java/org/ethereum/beacon/discovery/RlpUtil.java
new file mode 100644
index 000000000..b56190ac0
--- /dev/null
+++ b/discovery/src/main/java/org/ethereum/beacon/discovery/RlpUtil.java
@@ -0,0 +1,102 @@
+package org.ethereum.beacon.discovery;
+
+import org.ethereum.beacon.discovery.enr.IdentitySchema;
+import org.javatuples.Pair;
+import org.web3j.rlp.RlpDecoder;
+import org.web3j.rlp.RlpList;
+import org.web3j.rlp.RlpString;
+import tech.pegasys.artemis.util.bytes.Bytes8;
+import tech.pegasys.artemis.util.bytes.BytesValue;
+import tech.pegasys.artemis.util.uint.UInt64;
+
+import java.math.BigInteger;
+import java.util.function.Function;
+
+import static org.web3j.rlp.RlpDecoder.OFFSET_LONG_LIST;
+import static org.web3j.rlp.RlpDecoder.OFFSET_SHORT_LIST;
+
+/**
+ * Handy utilities used for RLP encoding and decoding and not fulfilled by {@link
+ * org.web3j.rlp.RlpEncoder} and {@link RlpDecoder}
+ */
+public class RlpUtil {
+ /**
+ * Calculates length of list beginning from the start of the data. So, there could everything else
+ * after first list in data, method helps to cut data in this case.
+ */
+ private static int calcListLen(BytesValue data) {
+ int prefix = data.get(0) & 0xFF;
+ int prefixAddon = 1;
+ if (prefix >= OFFSET_SHORT_LIST && prefix <= OFFSET_LONG_LIST) {
+
+ // 4. the data is a list if the range of the
+ // first byte is [0xc0, 0xf7], and the concatenation of
+ // the RLP encodings of all items of the list which the
+ // total payload is equal to the first byte minus 0xc0 follows the first byte;
+
+ byte listLen = (byte) (prefix - OFFSET_SHORT_LIST);
+ return listLen & 0xFF + prefixAddon;
+ } else if (prefix > OFFSET_LONG_LIST) {
+
+ // 5. the data is a list if the range of the
+ // first byte is [0xf8, 0xff], and the total payload of the
+ // list which length is equal to the
+ // first byte minus 0xf7 follows the first byte,
+ // and the concatenation of the RLP encodings of all items of
+ // the list follows the total payload of the list;
+
+ int lenOfListLen = (prefix - OFFSET_LONG_LIST) & 0xFF;
+ prefixAddon += lenOfListLen;
+ return UInt64.fromBytesBigEndian(Bytes8.leftPad(data.slice(1, lenOfListLen & 0xFF)))
+ .intValue()
+ + prefixAddon;
+ } else {
+ throw new RuntimeException("Not a start of RLP list!!");
+ }
+ }
+
+ /**
+ * @return first rlp list in provided data, plus remaining data starting from the end of this list
+ */
+ public static Pair decodeFirstList(BytesValue data) {
+ int len = RlpUtil.calcListLen(data);
+ return Pair.with(RlpDecoder.decode(data.slice(0, len).extractArray()), data.slice(len));
+ }
+
+ /**
+ * Encodes object to {@link RlpString}. Supports numbers, {@link BytesValue} etc.
+ *
+ * @throws RuntimeException with errorMessageFunction applied with `object` when encoding is not
+ * possible
+ */
+ public static RlpString encode(Object object, Function