diff --git a/build.gradle b/build.gradle
index 554214c37..7ea498ea7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -27,7 +27,9 @@ allprojects {
}
repositories {
+ mavenLocal()
jcenter()
+ maven { url "https://jitpack.io" }
}
task allDependencies(type: DependencyReportTask) {}
diff --git a/chain/src/main/java/org/ethereum/beacon/chain/DefaultBeaconChain.java b/chain/src/main/java/org/ethereum/beacon/chain/DefaultBeaconChain.java
index a8636349d..d3991216c 100644
--- a/chain/src/main/java/org/ethereum/beacon/chain/DefaultBeaconChain.java
+++ b/chain/src/main/java/org/ethereum/beacon/chain/DefaultBeaconChain.java
@@ -8,9 +8,9 @@
import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.chain.storage.BeaconChainStorage;
import org.ethereum.beacon.chain.storage.BeaconTupleStorage;
+import org.ethereum.beacon.consensus.BeaconChainSpec;
import org.ethereum.beacon.consensus.BeaconStateEx;
import org.ethereum.beacon.consensus.BlockTransition;
-import org.ethereum.beacon.consensus.BeaconChainSpec;
import org.ethereum.beacon.consensus.transition.EmptySlotTransition;
import org.ethereum.beacon.consensus.verifier.BeaconBlockVerifier;
import org.ethereum.beacon.consensus.verifier.BeaconStateVerifier;
diff --git a/chain/src/test/java/org/ethereum/beacon/chain/DefaultBeaconChainTest.java b/chain/src/test/java/org/ethereum/beacon/chain/DefaultBeaconChainTest.java
index 561e78255..12a585e51 100644
--- a/chain/src/test/java/org/ethereum/beacon/chain/DefaultBeaconChainTest.java
+++ b/chain/src/test/java/org/ethereum/beacon/chain/DefaultBeaconChainTest.java
@@ -1,5 +1,7 @@
package org.ethereum.beacon.chain;
+import java.util.Collections;
+import java.util.stream.IntStream;
import org.ethereum.beacon.chain.MutableBeaconChain.ImportResult;
import org.ethereum.beacon.chain.storage.BeaconChainStorage;
import org.ethereum.beacon.chain.storage.impl.SSZBeaconChainStorageFactory;
@@ -32,9 +34,6 @@
import tech.pegasys.artemis.ethereum.core.Hash32;
import tech.pegasys.artemis.util.uint.UInt64;
-import java.util.Collections;
-import java.util.stream.IntStream;
-
public class DefaultBeaconChainTest {
@Test
diff --git a/chain/src/test/java/org/ethereum/beacon/chain/util/SampleObservableState.java b/chain/src/test/java/org/ethereum/beacon/chain/util/SampleObservableState.java
index 75ec7d14e..5eda4f77d 100644
--- a/chain/src/test/java/org/ethereum/beacon/chain/util/SampleObservableState.java
+++ b/chain/src/test/java/org/ethereum/beacon/chain/util/SampleObservableState.java
@@ -14,6 +14,7 @@
import org.ethereum.beacon.chain.storage.util.StorageUtils;
import org.ethereum.beacon.consensus.BeaconChainSpec;
import org.ethereum.beacon.consensus.BeaconStateEx;
+import org.ethereum.beacon.consensus.ChainStart;
import org.ethereum.beacon.consensus.StateTransitions;
import org.ethereum.beacon.consensus.TestUtils;
import org.ethereum.beacon.consensus.transition.EmptySlotTransition;
@@ -30,7 +31,6 @@
import org.ethereum.beacon.core.types.Time;
import org.ethereum.beacon.crypto.BLS381.KeyPair;
import org.ethereum.beacon.db.InMemoryDatabase;
-import org.ethereum.beacon.consensus.ChainStart;
import org.ethereum.beacon.schedulers.Schedulers;
import org.javatuples.Pair;
import org.reactivestreams.Publisher;
diff --git a/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java b/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java
index c451e8eca..a6fae4014 100644
--- a/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java
+++ b/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java
@@ -3,6 +3,10 @@
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.exporter.HTTPServer;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.ethereum.beacon.chain.observer.ObservableBeaconState;
import org.ethereum.beacon.consensus.BeaconChainSpec;
import org.ethereum.beacon.consensus.BeaconStateEx;
@@ -12,11 +16,6 @@
import org.ethereum.beacon.core.types.SlotNumber;
import tech.pegasys.artemis.util.collections.Bitlist;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* Implements (some) metrics from Beacon chain metrics specs.
* @See
diff --git a/start/common/src/main/java/org/ethereum/beacon/start/common/DatabaseManager.java b/start/common/src/main/java/org/ethereum/beacon/start/common/DatabaseManager.java
index b9f0c32ea..0531051e6 100644
--- a/start/common/src/main/java/org/ethereum/beacon/start/common/DatabaseManager.java
+++ b/start/common/src/main/java/org/ethereum/beacon/start/common/DatabaseManager.java
@@ -1,14 +1,13 @@
package org.ethereum.beacon.start.common;
-import org.ethereum.beacon.core.types.Time;
-import org.ethereum.beacon.db.Database;
-import org.ethereum.beacon.db.InMemoryDatabase;
-import tech.pegasys.artemis.ethereum.core.Hash32;
-
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import org.ethereum.beacon.core.types.Time;
+import org.ethereum.beacon.db.Database;
+import org.ethereum.beacon.db.InMemoryDatabase;
+import tech.pegasys.artemis.ethereum.core.Hash32;
/**
* DB Manager which incapsulates logic to create or remove/clean DB. This is probably an
diff --git a/start/common/src/main/java/org/ethereum/beacon/start/common/Launcher.java b/start/common/src/main/java/org/ethereum/beacon/start/common/Launcher.java
index e75d9f627..7d0214464 100644
--- a/start/common/src/main/java/org/ethereum/beacon/start/common/Launcher.java
+++ b/start/common/src/main/java/org/ethereum/beacon/start/common/Launcher.java
@@ -1,5 +1,6 @@
package org.ethereum.beacon.start.common;
+import java.util.List;
import org.ethereum.beacon.bench.BenchmarkController;
import org.ethereum.beacon.bench.BenchmarkController.BenchmarkRoutine;
import org.ethereum.beacon.chain.DefaultBeaconChain;
@@ -11,6 +12,7 @@
import org.ethereum.beacon.chain.observer.ObservableStateProcessorImpl;
import org.ethereum.beacon.chain.storage.BeaconChainStorage;
import org.ethereum.beacon.chain.storage.BeaconChainStorageFactory;
+import org.ethereum.beacon.chain.storage.util.StorageUtils;
import org.ethereum.beacon.consensus.BeaconChainSpec;
import org.ethereum.beacon.consensus.BeaconStateEx;
import org.ethereum.beacon.consensus.ChainStart;
@@ -29,7 +31,6 @@
import org.ethereum.beacon.db.InMemoryDatabase;
import org.ethereum.beacon.pow.DepositContract;
import org.ethereum.beacon.schedulers.Schedulers;
-import org.ethereum.beacon.chain.storage.util.StorageUtils;
import org.ethereum.beacon.util.stats.MeasurementsCollector;
import org.ethereum.beacon.validator.BeaconChainProposer;
import org.ethereum.beacon.validator.attester.BeaconChainAttesterImpl;
@@ -41,8 +42,6 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import java.util.List;
-
public class Launcher {
private final BeaconChainSpec spec;
private final DepositContract depositContract;
diff --git a/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java b/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java
index cbcd1c03c..61a4dd9f1 100644
--- a/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java
+++ b/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java
@@ -1,5 +1,12 @@
package org.ethereum.beacon.start.common;
+import static org.ethereum.beacon.chain.observer.ObservableStateProcessorImpl.DEFAULT_EMPTY_SLOT_TRANSITIONS_LIMIT;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.chain.DefaultBeaconChain;
import org.ethereum.beacon.chain.MutableBeaconChain;
import org.ethereum.beacon.chain.ProposedBlockProcessor;
@@ -33,13 +40,11 @@
import org.ethereum.beacon.validator.local.MultiValidatorService;
import org.ethereum.beacon.validator.proposer.BeaconChainProposerImpl;
import org.ethereum.beacon.wire.Feedback;
-import org.ethereum.beacon.wire.MessageSerializer;
-import org.ethereum.beacon.wire.SimplePeerManagerImpl;
+import org.ethereum.beacon.wire.PeerManager;
import org.ethereum.beacon.wire.WireApiSub;
import org.ethereum.beacon.wire.WireApiSync;
import org.ethereum.beacon.wire.WireApiSyncServer;
-import org.ethereum.beacon.wire.message.SSZMessageSerializer;
-import org.ethereum.beacon.wire.net.ConnectionManager;
+import org.ethereum.beacon.wire.impl.libp2p.Libp2pLauncher;
import org.ethereum.beacon.wire.sync.BeaconBlockTree;
import org.ethereum.beacon.wire.sync.SyncManagerImpl;
import org.ethereum.beacon.wire.sync.SyncQueue;
@@ -47,14 +52,11 @@
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import tech.pegasys.artemis.util.bytes.Bytes4;
import tech.pegasys.artemis.util.uint.UInt64;
-import java.time.Duration;
-import java.util.List;
-
-import static org.ethereum.beacon.chain.observer.ObservableStateProcessorImpl.DEFAULT_EMPTY_SLOT_TRANSITIONS_LIMIT;
-
public class NodeLauncher {
+ private static final Logger logger = LogManager.getLogger(NodeLauncher.class);
private final BeaconChainSpec spec;
private final DepositContract depositContract;
@@ -80,12 +82,13 @@ public class NodeLauncher {
private byte networkId = 1;
private UInt64 chainId = UInt64.valueOf(1);
+ private Bytes4 fork = Bytes4.ZERO;
private boolean startSyncManager = false;
private WireApiSub wireApiSub;
private WireApiSync wireApiSyncRemote;
- private final ConnectionManager> connectionManager;
- private SimplePeerManagerImpl peerManager;
+ private final Libp2pLauncher networkLauncher;
+ private PeerManager peerManager;
private BeaconBlockTree blockTree;
private SyncQueue syncQueue;
private SyncManagerImpl syncManager;
@@ -95,7 +98,7 @@ public NodeLauncher(
BeaconChainSpec spec,
DepositContract depositContract,
List validatorCred,
- ConnectionManager> connectionManager,
+ Libp2pLauncher networkLauncher,
Database db,
BeaconChainStorage beaconChainStorage,
Schedulers schedulers,
@@ -104,7 +107,7 @@ public NodeLauncher(
this.spec = spec;
this.depositContract = depositContract;
this.validatorCred = validatorCred;
- this.connectionManager = connectionManager;
+ this.networkLauncher = networkLauncher;
this.db = db;
this.beaconChainStorage = beaconChainStorage;
this.schedulers = schedulers;
@@ -167,19 +170,20 @@ void chainStarted(ChainStart chainStartEvent) {
.withExternalVarResolver(new SpecConstantsResolver(spec.getConstants()))
.withExtraObjectCreator(SpecConstants.class, spec.getConstants())
.buildSerializer();
- MessageSerializer messageSerializer = new SSZMessageSerializer(ssz);
syncServer = new WireApiSyncServer(beaconChainStorage);
- peerManager = new SimplePeerManagerImpl(
- networkId,
- chainId,
- connectionManager.channelsStream(),
- ssz,
- spec,
- messageSerializer,
- schedulers,
- syncServer,
- beaconChain.getBlockStatesStream());
+ networkLauncher.setSpec(spec);
+ networkLauncher.setSszSerializer(ssz);
+ networkLauncher.setSchedulers(schedulers);
+ networkLauncher.setWireApiSyncServer(syncServer);
+ networkLauncher.setHeadStream(beaconChain.getBlockStatesStream());
+ networkLauncher.setFork(fork);
+
+ networkLauncher.init();
+ peerManager = networkLauncher.getPeerManager();
+
+ Flux.from(peerManager.connectedPeerStream()).subscribe(ch -> Metrics.peerAdded());
+ Flux.from(peerManager.disconnectedPeerStream()).subscribe(ch -> Metrics.peerRemoved());
wireApiSub = peerManager.getWireApiSub();
wireApiSyncRemote = peerManager.getWireApiSync();
@@ -240,9 +244,11 @@ void chainStarted(ChainStart chainStartEvent) {
syncManager.start();
}
-// Flux.from(wireApiSub.inboundBlocksStream())
-// .publishOn(schedulers.reactorEvents())
-// .subscribe(beaconChain::insert);
+ try {
+ networkLauncher.start().get(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ logger.error("Problem with starting network", e);
+ }
}
public void stop() {
diff --git a/start/common/src/main/java/org/ethereum/beacon/start/common/util/SimulationKeyPairGenerator.java b/start/common/src/main/java/org/ethereum/beacon/start/common/util/SimulationKeyPairGenerator.java
index 9fc6e633b..d2ea476ba 100644
--- a/start/common/src/main/java/org/ethereum/beacon/start/common/util/SimulationKeyPairGenerator.java
+++ b/start/common/src/main/java/org/ethereum/beacon/start/common/util/SimulationKeyPairGenerator.java
@@ -1,17 +1,16 @@
package org.ethereum.beacon.start.common.util;
import com.google.common.primitives.Bytes;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
import org.ethereum.beacon.crypto.BLS381;
import org.ethereum.beacon.crypto.Hashes;
import org.ethereum.beacon.crypto.bls.bc.BCParameters;
import tech.pegasys.artemis.util.bytes.Bytes32;
import tech.pegasys.artemis.util.bytes.BytesValue;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
/**
* Key pair generation utilities for simulation purposes.
*/
diff --git a/start/config/src/main/java/org/ethereum/beacon/emulator/config/main/network/Libp2pNetwork.java b/start/config/src/main/java/org/ethereum/beacon/emulator/config/main/network/Libp2pNetwork.java
new file mode 100644
index 000000000..936535f66
--- /dev/null
+++ b/start/config/src/main/java/org/ethereum/beacon/emulator/config/main/network/Libp2pNetwork.java
@@ -0,0 +1,147 @@
+package org.ethereum.beacon.emulator.config.main.network;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Libp2pNetwork extends Network {
+
+ public static class Peer {
+ private String addr;
+ private String id;
+
+ public Peer(String addr, String id) {
+ this.addr = addr;
+ this.id = id;
+ }
+
+ public String getAddr() {
+ return addr;
+ }
+
+ public void setAddr(String addr) {
+ this.addr = addr;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+ }
+
+ public static class GossipOptions {
+ Integer gossipD;
+ Integer gossipDLow;
+ Integer gossipDHigh;
+ Integer gossipDLazy;
+ Integer fanoutTTL;
+ Integer gossipAdvertise;
+ Integer gossipHistory;
+ Integer heartbeatIntervalMillis;
+
+ public Integer getGossipD() {
+ return gossipD;
+ }
+
+ public void setGossipD(Integer gossipD) {
+ this.gossipD = gossipD;
+ }
+
+ public Integer getGossipDLow() {
+ return gossipDLow;
+ }
+
+ public void setGossipDLow(Integer gossipDLow) {
+ this.gossipDLow = gossipDLow;
+ }
+
+ public Integer getGossipDHigh() {
+ return gossipDHigh;
+ }
+
+ public void setGossipDHigh(Integer gossipDHigh) {
+ this.gossipDHigh = gossipDHigh;
+ }
+
+ public Integer getGossipDLazy() {
+ return gossipDLazy;
+ }
+
+ public void setGossipDLazy(Integer gossipDLazy) {
+ this.gossipDLazy = gossipDLazy;
+ }
+
+ public Integer getFanoutTTL() {
+ return fanoutTTL;
+ }
+
+ public void setFanoutTTL(Integer fanoutTTL) {
+ this.fanoutTTL = fanoutTTL;
+ }
+
+ public Integer getGossipAdvertise() {
+ return gossipAdvertise;
+ }
+
+ public void setGossipAdvertise(Integer gossipAdvertise) {
+ this.gossipAdvertise = gossipAdvertise;
+ }
+
+ public Integer getGossipHistory() {
+ return gossipHistory;
+ }
+
+ public void setGossipHistory(Integer gossipHistory) {
+ this.gossipHistory = gossipHistory;
+ }
+
+ public Integer getHeartbeatIntervalMillis() {
+ return heartbeatIntervalMillis;
+ }
+
+ public void setHeartbeatIntervalMillis(Integer heartbeatIntervalMillis) {
+ this.heartbeatIntervalMillis = heartbeatIntervalMillis;
+ }
+ }
+
+ private Integer listenPort;
+ private String privateKey;
+ private List activePeers = new ArrayList<>();
+ private GossipOptions gossipOptions;
+
+ public Integer getListenPort() {
+ return listenPort;
+ }
+
+ public void setListenPort(Integer listenPort) {
+ this.listenPort = listenPort;
+ }
+
+ public List getActivePeers() {
+ return activePeers;
+ }
+
+ public void setActivePeers(
+ List activePeers) {
+ this.activePeers = activePeers;
+ }
+
+ public GossipOptions getGossipOptions() {
+ return gossipOptions;
+ }
+
+ public void setGossipOptions(
+ GossipOptions gossipOptions) {
+ this.gossipOptions = gossipOptions;
+ }
+
+ public String getPrivateKey() {
+ return privateKey;
+ }
+
+ public void setPrivateKey(String privateKey) {
+ this.privateKey = privateKey;
+ }
+}
diff --git a/start/config/src/main/java/org/ethereum/beacon/emulator/config/main/network/Network.java b/start/config/src/main/java/org/ethereum/beacon/emulator/config/main/network/Network.java
index 93d51ca20..b2f6e99d3 100644
--- a/start/config/src/main/java/org/ethereum/beacon/emulator/config/main/network/Network.java
+++ b/start/config/src/main/java/org/ethereum/beacon/emulator/config/main/network/Network.java
@@ -6,5 +6,6 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = NettyNetwork.class, name = "netty"),
+ @JsonSubTypes.Type(value = Libp2pNetwork.class, name = "libp2p"),
})
public abstract class Network {}
diff --git a/start/node/src/main/java/org/ethereum/beacon/node/ConfigUtils.java b/start/node/src/main/java/org/ethereum/beacon/node/ConfigUtils.java
index 368387866..08f08cdab 100644
--- a/start/node/src/main/java/org/ethereum/beacon/node/ConfigUtils.java
+++ b/start/node/src/main/java/org/ethereum/beacon/node/ConfigUtils.java
@@ -1,5 +1,8 @@
package org.ethereum.beacon.node;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
import org.ethereum.beacon.consensus.BeaconChainSpec;
import org.ethereum.beacon.consensus.ChainStart;
import org.ethereum.beacon.core.operations.Deposit;
@@ -28,10 +31,6 @@
import tech.pegasys.artemis.util.collections.ReadList;
import tech.pegasys.artemis.util.uint.UInt64;
-import java.util.List;
-import java.util.Random;
-import java.util.stream.Collectors;
-
public class ConfigUtils {
public static List createCredentials(Signer config, boolean isBlsSign) {
diff --git a/start/node/src/main/java/org/ethereum/beacon/node/Node.java b/start/node/src/main/java/org/ethereum/beacon/node/Node.java
index 9ea04bfe9..92f59f1b9 100644
--- a/start/node/src/main/java/org/ethereum/beacon/node/Node.java
+++ b/start/node/src/main/java/org/ethereum/beacon/node/Node.java
@@ -1,5 +1,7 @@
package org.ethereum.beacon.node;
+import java.io.File;
+import java.util.List;
import org.ethereum.beacon.node.Node.VersionProvider;
import org.ethereum.beacon.node.command.LogLevel;
import org.ethereum.beacon.start.common.ClientInfo;
@@ -7,9 +9,6 @@
import picocli.CommandLine.IVersionProvider;
import picocli.CommandLine.RunLast;
-import java.io.File;
-import java.util.List;
-
@CommandLine.Command(
description = "Beacon chain node",
name = "node",
@@ -53,7 +52,8 @@ public class Node implements Runnable {
split = ",",
description = {
"Peers that node is actively connecting to.",
- "URL format: tcp://:"
+ "URL format: :",
+ "URL sample: /ip4/10.0.0.128/tcp/40001:11111111111111111111111111111111111111111111111111111111111111111111"
}
)
private List activePeers;
@@ -147,6 +147,13 @@ public class Node implements Runnable {
)
private boolean forceDBClean = false;
+ @CommandLine.Option(
+ names = "--db-prefix",
+ paramLabel = "db-prefix",
+ description = "Specifies db-prefix, used to construct db directory"
+ )
+ private String dbPrefix;
+
public String getName() {
return name;
}
@@ -187,6 +194,10 @@ public String getStartMode() {
return startMode;
}
+ public String getDbPrefix() {
+ return dbPrefix;
+ }
+
public static void main(String[] args) {
try {
CommandLine commandLine = new CommandLine(new Node());
diff --git a/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java b/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java
index f86887495..e9d0f323e 100644
--- a/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java
+++ b/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java
@@ -1,6 +1,21 @@
package org.ethereum.beacon.node;
-import io.netty.channel.ChannelFutureListener;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.TimeZone;
+import java.util.concurrent.ThreadFactory;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -11,6 +26,7 @@
import org.ethereum.beacon.chain.storage.BeaconChainStorage;
import org.ethereum.beacon.chain.storage.impl.SSZBeaconChainStorageFactory;
import org.ethereum.beacon.chain.storage.impl.SerializerFactory;
+import org.ethereum.beacon.chain.storage.util.StorageUtils;
import org.ethereum.beacon.consensus.BeaconChainSpec;
import org.ethereum.beacon.consensus.BeaconStateEx;
import org.ethereum.beacon.consensus.ChainStart;
@@ -33,47 +49,25 @@
import org.ethereum.beacon.emulator.config.main.Signer.Insecure;
import org.ethereum.beacon.emulator.config.main.ValidatorKeys.Private;
import org.ethereum.beacon.emulator.config.main.conract.EmulatorContract;
+import org.ethereum.beacon.emulator.config.main.network.Libp2pNetwork;
+import org.ethereum.beacon.emulator.config.main.network.Libp2pNetwork.Peer;
import org.ethereum.beacon.emulator.config.main.network.NettyNetwork;
import org.ethereum.beacon.emulator.config.main.network.Network;
import org.ethereum.beacon.node.metrics.Metrics;
import org.ethereum.beacon.pow.DepositContract;
import org.ethereum.beacon.schedulers.DefaultSchedulers;
-import org.ethereum.beacon.schedulers.Scheduler;
import org.ethereum.beacon.schedulers.Schedulers;
import org.ethereum.beacon.start.common.DatabaseManager;
import org.ethereum.beacon.start.common.NodeLauncher;
import org.ethereum.beacon.start.common.util.MDCControlledSchedulers;
import org.ethereum.beacon.start.common.util.SimpleDepositContract;
-import org.ethereum.beacon.chain.storage.util.StorageUtils;
import org.ethereum.beacon.util.Objects;
import org.ethereum.beacon.validator.crypto.BLS381Credentials;
-import org.ethereum.beacon.wire.net.ConnectionManager;
-import org.ethereum.beacon.wire.net.netty.NettyClient;
-import org.ethereum.beacon.wire.net.netty.NettyServer;
+import org.ethereum.beacon.wire.impl.libp2p.Libp2pLauncher;
import org.jetbrains.annotations.NotNull;
-import reactor.core.publisher.Flux;
import tech.pegasys.artemis.ethereum.core.Hash32;
import tech.pegasys.artemis.util.bytes.BytesValue;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Random;
-import java.util.TimeZone;
-import java.util.concurrent.ThreadFactory;
-import java.util.stream.IntStream;
-
public class NodeCommandLauncher implements Runnable {
private static final Logger logger = LogManager.getLogger("node");
@@ -189,12 +183,15 @@ protected ThreadFactory createThreadFactory(String namePattern) {
config.getConfig().getValidator().getSigner(),
config.getChainSpec().getSpecHelpersOptions().isBlsSign());
- ConnectionManager> connectionManager;
if (config.getConfig().getNetworks().size() != 1) {
throw new IllegalArgumentException("1 network should be specified in config");
}
Network networkCfg = config.getConfig().getNetworks().get(0);
+ Libp2pLauncher libp2pLauncher = new Libp2pLauncher();
+
if (networkCfg instanceof NettyNetwork) {
+ throw new UnsupportedOperationException("Netty network is not supported anymore");
+ /*
NettyNetwork nettyConfig = (NettyNetwork) networkCfg;
NettyServer nettyServer = null;
if (nettyConfig.getListenPort() != null) {
@@ -215,15 +212,22 @@ protected ThreadFactory createThreadFactory(String namePattern) {
connectionManager = tcpConnectionManager;
for (String addr : nettyConfig.getActivePeers()) {
URI uri = URI.create(addr);
- tcpConnectionManager.addActivePeer(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort()));
+ tcpConnectionManager
+ .addActivePeer(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort()));
}
+ */
+ } else if (networkCfg instanceof Libp2pNetwork) {
+ Libp2pNetwork cfg = (Libp2pNetwork) networkCfg;
- Flux.from(connectionManager.channelsStream())
- .subscribe(
- ch -> {
- Metrics.peerAdded();
- ch.getCloseFuture().thenAccept(v -> Metrics.peerRemoved());
- });
+ if (cfg.getListenPort() != null) {
+ libp2pLauncher.setListenPort(cfg.getListenPort());
+ }
+ for (Peer peer : cfg.getActivePeers()) {
+ libp2pLauncher.addActivePeer(peer.getAddr(), peer.getId());
+ }
+ if (cfg.getPrivateKey() != null) {
+ libp2pLauncher.setPrivKey(BytesValue.fromHexString(cfg.getPrivateKey()));
+ }
} else {
throw new IllegalArgumentException(
"This type of network is not supported yet: " + networkCfg.getClass());
@@ -320,7 +324,7 @@ protected ThreadFactory createThreadFactory(String namePattern) {
specBuilder.buildSpec(),
depositContract,
credentials,
- connectionManager,
+ libp2pLauncher, //connectionManager,
db,
beaconChainStorage,
schedulers,
@@ -376,15 +380,15 @@ public NodeCommandLauncher build() {
if (cliOptions.getListenPort() != null || cliOptions.getActivePeers() != null) {
- NettyNetwork network = (NettyNetwork) config
+ Libp2pNetwork network = (Libp2pNetwork) config
.getConfig()
.getNetworks()
.stream()
- .filter(n -> n instanceof NettyNetwork)
+ .filter(n -> n instanceof Libp2pNetwork)
.findFirst().orElse(null);
if (network == null) {
- network = new NettyNetwork();
+ network = new Libp2pNetwork();
config.getConfig().getNetworks().add(network);
}
@@ -393,7 +397,18 @@ public NodeCommandLauncher build() {
}
if (cliOptions.getActivePeers() != null) {
- network.setActivePeers(cliOptions.getActivePeers());
+ network.setActivePeers(
+ cliOptions
+ .getActivePeers()
+ .stream()
+ .map(
+ a -> {
+ int idx = a.indexOf(":");
+ if (idx < 0)
+ throw new ConfigException("Invalid peer URL formal: '" + a + "'");
+ return new Peer(a.substring(0, idx), a.substring(idx + 1));
+ })
+ .collect(Collectors.toList()));
}
}
@@ -487,6 +502,10 @@ public NodeCommandLauncher build() {
config.getConfig().setMetricsEndpoint(cliOptions.getMetricsEndpoint());
}
+ if (cliOptions.getDbPrefix() != null) {
+ config.getConfig().setDb(cliOptions.getDbPrefix());
+ }
+
return new NodeCommandLauncher(
config,
specBuilder,
diff --git a/start/node/src/main/resources/config/default-node-config.yml b/start/node/src/main/resources/config/default-node-config.yml
index b423429fc..9f8ea67fd 100644
--- a/start/node/src/main/resources/config/default-node-config.yml
+++ b/start/node/src/main/resources/config/default-node-config.yml
@@ -1,7 +1,10 @@
config:
db: db
networks:
- - type: netty
+ # Libp2p based network conforming to spec
+ - type: libp2p
+ # TCP port the node should listen for incoming connections
+# listenPort: 40001
validator:
contract: !emulator
diff --git a/start/node/src/main/resources/config/sample-node-config.yml b/start/node/src/main/resources/config/sample-node-config.yml
index 1114c5d80..dc6d75d72 100644
--- a/start/node/src/main/resources/config/sample-node-config.yml
+++ b/start/node/src/main/resources/config/sample-node-config.yml
@@ -4,14 +4,16 @@ config:
# the list of networks
networks:
- # Simple proprietary protocol base on Netty TCP stack
- - type: netty
+ # Libp2p based network conforming to spec
+ - type: libp2p
# TCP port the node should listen for incoming connections
listenPort: 40001
# list of remote peers this peer will be actively connecting to
activePeers:
- - tcp://localhost:40002
- - tcp://localhost:40003
+ - addr: /ip4/127.0.0.1/tcp/40002
+ id: 0x11111111111111111111111111111111111111111111111111111111111111111111
+ - addr: /ip4/127.0.0.1/tcp/40003
+ id: 0x22222222222222222222222222222222222222222222222222222222222222222222
# Config for a validator service
validator:
diff --git a/test/src/test/resources/eth2.0-spec-tests b/test/src/test/resources/eth2.0-spec-tests
index ae6dd9011..c9015f535 160000
--- a/test/src/test/resources/eth2.0-spec-tests
+++ b/test/src/test/resources/eth2.0-spec-tests
@@ -1 +1 @@
-Subproject commit ae6dd9011df05fab8c7e651c09cf9c940973bf81
+Subproject commit c9015f53590ce36de849980a0b6fc77a1a09defc
diff --git a/versions.gradle b/versions.gradle
index 7deb8ce43..b58d7fbae 100644
--- a/versions.gradle
+++ b/versions.gradle
@@ -40,6 +40,8 @@ dependencyManagement {
dependency "org.rocksdb:rocksdbjni:6.2.2"
dependency "com.googlecode.concurrent-locks:concurrent-locks:1.0.0"
+ dependency 'io.libp2p:jvm-libp2p-minimal:0.0.1-SNAPSHOT'
+
dependency "io.prometheus:simpleclient:0.6.0"
dependency "io.prometheus:simpleclient_hotspot:0.6.0"
diff --git a/wire/build.gradle b/wire/build.gradle
index a2cfeda68..b59fa3655 100644
--- a/wire/build.gradle
+++ b/wire/build.gradle
@@ -11,6 +11,7 @@ dependencies {
implementation 'io.projectreactor:reactor-core'
implementation 'io.netty:netty-all'
implementation 'io.vertx:vertx-core'
+ implementation 'io.libp2p:jvm-libp2p-minimal'
testImplementation 'org.mockito:mockito-core'
testImplementation 'io.projectreactor:reactor-test'
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/Feedback.java b/wire/src/main/java/org/ethereum/beacon/wire/Feedback.java
index 62bbe1faa..ae6fb0401 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/Feedback.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/Feedback.java
@@ -1,6 +1,7 @@
package org.ethereum.beacon.wire;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import java.util.function.Function;
/**
@@ -17,6 +18,16 @@ static Feedback of(T result) {
return new Impl<>(result);
}
+ static Feedback of(T result, Consumer errorFeedbackHandler) {
+ Feedback ret = of(result);
+ ret.getFeedback().whenComplete((v, t) -> {
+ if (t != null) {
+ errorFeedbackHandler.accept(t);
+ }
+ });
+ return ret;
+ }
+
/**
* Return the wrapped value
*/
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/Peer.java b/wire/src/main/java/org/ethereum/beacon/wire/Peer.java
index f021da60d..4939d6c96 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/Peer.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/Peer.java
@@ -1,17 +1,20 @@
package org.ethereum.beacon.wire;
-import org.ethereum.beacon.wire.channel.Channel;
-import tech.pegasys.artemis.util.bytes.BytesValue;
+import java.util.concurrent.CompletableFuture;
+import org.ethereum.beacon.wire.impl.plain.channel.Channel;
+import org.ethereum.beacon.wire.message.payload.HelloMessage;
/**
* Represent connected peer
*/
public interface Peer {
+ CompletableFuture getRemoteHelloMessage();
+
/**
* Returns raw bytes {@link Channel} of this peer
*/
- Channel getRawChannel();
+ PeerConnection getConnection();
/**
* Returns {@link WireApiSync} instance linked to this peer
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/PeerConnection.java b/wire/src/main/java/org/ethereum/beacon/wire/PeerConnection.java
new file mode 100644
index 000000000..dbfd5dd70
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/PeerConnection.java
@@ -0,0 +1,16 @@
+package org.ethereum.beacon.wire;
+
+import java.util.concurrent.CompletableFuture;
+
+public interface PeerConnection {
+
+ /**
+ * Returns the future which completes when this connection is closed.
+ */
+ CompletableFuture getCloseFuture();
+
+ /**
+ * Closes this connection.
+ */
+ void close();
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/PeerManager.java b/wire/src/main/java/org/ethereum/beacon/wire/PeerManager.java
index 3c9359706..0772f5517 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/PeerManager.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/PeerManager.java
@@ -1,11 +1,10 @@
package org.ethereum.beacon.wire;
+import java.util.List;
import org.ethereum.beacon.core.types.SlotNumber;
import org.ethereum.beacon.stream.RxUtil;
import org.reactivestreams.Publisher;
-import java.util.List;
-
/** Manages connected peers and aggregates their `high-level` APIs */
public interface PeerManager {
@@ -32,6 +31,10 @@ default Publisher> activePeersStream() {
return RxUtil.collect(activatedPeerStream(), disconnectedPeerStream());
}
+ default Publisher> connectedPeersStream() {
+ return RxUtil.collect(connectedPeerStream(), disconnectedPeerStream());
+ }
+
/**
* Returns WireApiSync instance which is the aggregation of all connected peer APIs. When
* currently no active peers the API enqueues invocations and execute them upon any active peer
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/SimplePeerManagerImpl.java b/wire/src/main/java/org/ethereum/beacon/wire/SimplePeerManagerImpl.java
deleted file mode 100644
index c909f6831..000000000
--- a/wire/src/main/java/org/ethereum/beacon/wire/SimplePeerManagerImpl.java
+++ /dev/null
@@ -1,147 +0,0 @@
-package org.ethereum.beacon.wire;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.ethereum.beacon.chain.BeaconTupleDetails;
-import org.ethereum.beacon.consensus.BeaconChainSpec;
-import org.ethereum.beacon.core.types.SlotNumber;
-import org.ethereum.beacon.schedulers.Schedulers;
-import org.ethereum.beacon.ssz.SSZSerializer;
-import org.ethereum.beacon.stream.SimpleProcessor;
-import org.ethereum.beacon.wire.channel.Channel;
-import org.ethereum.beacon.wire.message.payload.HelloMessage;
-import org.ethereum.beacon.wire.sync.WireApiSyncRouter;
-import org.reactivestreams.Publisher;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import tech.pegasys.artemis.util.bytes.BytesValue;
-import tech.pegasys.artemis.util.uint.UInt64;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class SimplePeerManagerImpl implements PeerManager {
- private static final Logger logger = LogManager.getLogger(SimplePeerManagerImpl.class);
-
- private final byte networkId;
- private final UInt64 chainId;
-
- private final Publisher> channelsStream;
- private final SSZSerializer ssz;
- private final BeaconChainSpec spec;
- private final MessageSerializer messageSerializer;
- private final Schedulers schedulers;
- private final WireApiSync syncServer;
- private final Publisher headStream;
- private final WireApiSyncRouter wireApiSyncRouter;
- private final WireApiSubRouter wireApiSubRouter;
-
- private SlotNumber maxKnownSlot;
- private final SimpleProcessor maxSlotStream;
-
- private final Flux connectedPeersStream;
- private final List activePeers = Collections.synchronizedList(new ArrayList<>());
-
- public SimplePeerManagerImpl(
- byte networkId,
- UInt64 chainId,
- Publisher> channelsStream,
- SSZSerializer ssz,
- BeaconChainSpec spec,
- MessageSerializer messageSerializer,
- Schedulers schedulers,
- WireApiSync syncServer,
- Publisher headStream) {
-
- this.networkId = networkId;
- this.chainId = chainId;
- this.channelsStream = channelsStream;
- this.ssz = ssz;
- this.spec = spec;
- this.messageSerializer = messageSerializer;
- this.schedulers = schedulers;
- this.syncServer = syncServer;
- this.headStream = headStream;
-
- this.maxSlotStream = new SimpleProcessor<>(schedulers.events(), "PeerManager.maxSlot");
- connectedPeersStream = Flux.from(channelsStream)
- .map(this::createPeer)
- .doOnNext(this::updateBestSlot)
- .replay(1).autoConnect();
-
- Flux.from(activatedPeerStream()).subscribe(this::onNewActivePeer);
-
- wireApiSyncRouter = new WireApiSyncRouter(
- Flux.from(activatedPeerStream()).map(Peer::getSyncApi),
- Flux.from(disconnectedPeerStream()).map(Peer::getSyncApi));
-
- wireApiSubRouter = new WireApiSubRouter(
- Flux.from(activatedPeerStream()).map(Peer::getSubApi),
- Flux.from(disconnectedPeerStream()).map(Peer::getSubApi));
- }
-
- protected HelloMessage createLocalHello() {
- BeaconTupleDetails head = Mono.from(headStream).block(Duration.ofSeconds(10)); // TODO
- return new HelloMessage(
- networkId,
- chainId,
- head.getFinalState().getFinalizedCheckpoint().getRoot(),
- head.getFinalState().getFinalizedCheckpoint().getEpoch(),
- spec.getObjectHasher().getHashTruncateLast(head.getBlock()),
- head.getBlock().getSlot());
- }
-
- private void updateBestSlot(PeerImpl peer) {
- peer.getRemoteHelloMessage().thenAccept(helloMessage -> {
- if (helloMessage.getBestSlot().greater(maxKnownSlot)) {
- maxKnownSlot = helloMessage.getBestSlot();
- maxSlotStream.onNext(maxKnownSlot);
- }
- });
- }
-
- protected PeerImpl createPeer(Channel channel) {
- logger.info("Creating a peer from new channel: " + channel);
- return new PeerImpl(channel, createLocalHello(), ssz, messageSerializer, syncServer, schedulers);
- }
-
- @Override
- public Publisher connectedPeerStream() {
- return connectedPeersStream.map(p -> p);
- }
-
- @Override
- public Publisher disconnectedPeerStream() {
- return connectedPeersStream.flatMap(
- peer -> Mono.fromFuture(peer.getRawChannel().getCloseFuture().thenApply(v -> peer)));
- }
-
- @Override
- public Publisher activatedPeerStream() {
- return connectedPeersStream.flatMap(
- peer -> Mono.fromFuture(peer.getPeerActiveFuture().thenApply(v -> peer)));
- }
-
- protected void onNewActivePeer(Peer peer) {
- logger.info("New active peer: " + peer);
- activePeers.add(peer);
- peer.getRawChannel().getCloseFuture().thenAccept(v -> activePeers.remove(peer));
- }
-
- @Override
- public Publisher getMaxSlotStream() {
- return maxSlotStream;
- }
-
- @Override
- public WireApiSync getWireApiSync() {
- return wireApiSyncRouter;
- }
-
- @Override
- public WireApiSub getWireApiSub() {
- return wireApiSubRouter;
- }
-}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/WireApiSync.java b/wire/src/main/java/org/ethereum/beacon/wire/WireApiSync.java
index 12be015b0..8a100c9f7 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/WireApiSync.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/WireApiSync.java
@@ -13,9 +13,10 @@
import org.ethereum.beacon.core.BeaconBlockHeader;
import org.ethereum.beacon.wire.message.payload.BlockBodiesRequestMessage;
import org.ethereum.beacon.wire.message.payload.BlockBodiesResponseMessage;
-import org.ethereum.beacon.wire.message.payload.BlockRootsRequestMessage;
-import org.ethereum.beacon.wire.message.payload.BlockHeadersResponseMessage;
import org.ethereum.beacon.wire.message.payload.BlockHeadersRequestMessage;
+import org.ethereum.beacon.wire.message.payload.BlockHeadersResponseMessage;
+import org.ethereum.beacon.wire.message.payload.BlockRequestMessage;
+import org.ethereum.beacon.wire.message.payload.BlockRootsRequestMessage;
import org.ethereum.beacon.wire.message.payload.BlockRootsResponseMessage;
import tech.pegasys.artemis.ethereum.core.Hash32;
@@ -28,29 +29,41 @@ public interface WireApiSync {
/**
* Requests block roots from remote peer(s)
*/
- CompletableFuture requestBlockRoots(
- BlockRootsRequestMessage requestMessage);
+ default CompletableFuture requestBlockRoots(
+ BlockRootsRequestMessage requestMessage) {
+ throw new UnsupportedOperationException();
+ }
/**
* Requests block headers from remote peer(s)
*/
- CompletableFuture requestBlockHeaders(
- BlockHeadersRequestMessage requestMessage);
+ default CompletableFuture requestBlockHeaders(
+ BlockHeadersRequestMessage requestMessage) {
+ throw new UnsupportedOperationException();
+ }
/**
* Requests block bodies from remote peer(s)
*/
- CompletableFuture> requestBlockBodies(
- BlockBodiesRequestMessage requestMessage);
+ default CompletableFuture> requestBlockBodies(
+ BlockBodiesRequestMessage requestMessage) {
+ throw new UnsupportedOperationException();
+ }
/**
* Handy shortcut to download headers+bodies
*/
default CompletableFuture>> requestBlocks(
- BlockHeadersRequestMessage requestMessage, ObjectHasher hasher) {
+ BlockRequestMessage requestMessage, ObjectHasher hasher) {
+ BlockHeadersRequestMessage hReq = new BlockHeadersRequestMessage(
+ requestMessage.getHeadBlockRoot(),
+ requestMessage.getStartSlot(),
+ requestMessage.getCount(),
+ requestMessage.getStep()
+ );
CompletableFuture> headersFuture = requestBlockHeaders(
- requestMessage).thenApply(BlockHeadersResponseMessage::getHeaders);
+ hReq).thenApply(BlockHeadersResponseMessage::getHeaders);
CompletableFuture>> bodiesFuture =
headersFuture.thenCompose(
@@ -74,4 +87,9 @@ default CompletableFuture>> requestBlocks(
.collect(Collectors.toList()));
});
}
+
+ default CompletableFuture>> requestRecentBlocks(
+ List blockRoots, ObjectHasher hasher) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/exceptions/WireRpcMalformedException.java b/wire/src/main/java/org/ethereum/beacon/wire/exceptions/WireRpcMalformedException.java
new file mode 100644
index 000000000..9bac0b6ce
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/exceptions/WireRpcMalformedException.java
@@ -0,0 +1,11 @@
+package org.ethereum.beacon.wire.exceptions;
+
+/**
+ * Indicates remote side RPC protocol violation
+ */
+public class WireRpcMalformedException extends WireRpcException {
+
+ public WireRpcMalformedException(String message) {
+ super(message);
+ }
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/exceptions/WireRemoteRpcError.java b/wire/src/main/java/org/ethereum/beacon/wire/exceptions/WireRpcRemoteError.java
similarity index 61%
rename from wire/src/main/java/org/ethereum/beacon/wire/exceptions/WireRemoteRpcError.java
rename to wire/src/main/java/org/ethereum/beacon/wire/exceptions/WireRpcRemoteError.java
index 6188a0b1a..2e9c4695a 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/exceptions/WireRemoteRpcError.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/exceptions/WireRpcRemoteError.java
@@ -3,9 +3,9 @@
/**
* This exception is a 'deserialized version' of error answer from remote RPC party
*/
-public class WireRemoteRpcError extends WireRpcException {
+public class WireRpcRemoteError extends WireRpcException {
- public WireRemoteRpcError(String message) {
+ public WireRpcRemoteError(String message) {
super(message);
}
}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/impl/AbstractPeerManager.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/AbstractPeerManager.java
new file mode 100644
index 000000000..9758b99ca
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/AbstractPeerManager.java
@@ -0,0 +1,113 @@
+package org.ethereum.beacon.wire.impl;
+
+import java.time.Duration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.ethereum.beacon.chain.BeaconTupleDetails;
+import org.ethereum.beacon.consensus.BeaconChainSpec;
+import org.ethereum.beacon.core.types.SlotNumber;
+import org.ethereum.beacon.schedulers.Schedulers;
+import org.ethereum.beacon.stream.SimpleProcessor;
+import org.ethereum.beacon.wire.Peer;
+import org.ethereum.beacon.wire.PeerManager;
+import org.ethereum.beacon.wire.WireApiSub;
+import org.ethereum.beacon.wire.WireApiSync;
+import org.ethereum.beacon.wire.message.payload.HelloMessage;
+import org.ethereum.beacon.wire.sync.WireApiSyncRouter;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import tech.pegasys.artemis.util.bytes.Bytes4;
+
+public abstract class AbstractPeerManager implements PeerManager {
+ private static final Logger logger = LogManager.getLogger(AbstractPeerManager.class);
+
+ protected final Bytes4 fork;
+ protected final BeaconChainSpec spec;
+ protected final Publisher headStream;
+ protected final Schedulers schedulers;
+ protected final WireApiSyncRouter wireApiSyncRouter;
+
+ protected SlotNumber maxKnownSlot;
+ protected final SimpleProcessor maxSlotStream;
+ protected final SimpleProcessor connectedPeersStream;
+
+ public AbstractPeerManager(
+ BeaconChainSpec spec,
+ Bytes4 fork,
+ Schedulers schedulers,
+ Publisher headStream) {
+
+ this.spec = spec;
+ this.fork = fork;
+ this.headStream = headStream;
+ this.schedulers = schedulers;
+
+ this.maxSlotStream = new SimpleProcessor<>(schedulers.events(), "PeerManager.maxSlot");
+ this.connectedPeersStream = new SimpleProcessor<>(schedulers.events(), "PeerManager.connectedPeers");
+
+ Flux.from(activatedPeerStream()).subscribe(this::onNewActivePeer);
+
+ wireApiSyncRouter = new WireApiSyncRouter(
+ Flux.from(activatedPeerStream()).map(Peer::getSyncApi),
+ Flux.from(disconnectedPeerStream()).map(Peer::getSyncApi));
+ }
+
+ protected void onNewPeer(Peer peer) {
+ connectedPeersStream.onNext(peer);
+ updateBestSlot(peer);
+ }
+
+ protected HelloMessage createLocalHello() {
+ BeaconTupleDetails head = Mono.from(headStream).block(Duration.ofSeconds(10)); // TODO
+ return new HelloMessage(
+ Bytes4.ZERO,
+ head.getFinalState().getFinalizedCheckpoint().getRoot(),
+ head.getFinalState().getFinalizedCheckpoint().getEpoch(),
+ spec.getObjectHasher().getHashTruncateLast(head.getBlock()),
+ head.getBlock().getSlot());
+ }
+
+ private void updateBestSlot(Peer peer) {
+ peer.getRemoteHelloMessage().thenAccept(helloMessage -> {
+ if (helloMessage.getHeadSlot().greater(maxKnownSlot)) {
+ maxKnownSlot = helloMessage.getHeadSlot();
+ maxSlotStream.onNext(maxKnownSlot);
+ }
+ });
+ }
+
+ @Override
+ public Publisher connectedPeerStream() {
+ return connectedPeersStream;
+ }
+
+ @Override
+ public Publisher disconnectedPeerStream() {
+ return Flux.from(connectedPeersStream).flatMap(
+ peer -> Mono.fromFuture(peer.getConnection().getCloseFuture().thenApply(v -> peer)));
+ }
+
+ @Override
+ public Publisher activatedPeerStream() {
+ return Flux.from(connectedPeersStream).flatMap(
+ peer -> Mono.fromFuture(peer.getRemoteHelloMessage().thenApply(v -> peer)));
+ }
+
+ protected void onNewActivePeer(Peer peer) {
+ logger.info("New active peer: " + peer);
+ }
+
+ @Override
+ public Publisher getMaxSlotStream() {
+ return maxSlotStream;
+ }
+
+ @Override
+ public WireApiSync getWireApiSync() {
+ return wireApiSyncRouter;
+ }
+
+ @Override
+ public abstract WireApiSub getWireApiSub();
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/GossipWireApiSub.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/GossipWireApiSub.java
new file mode 100644
index 000000000..544d1ef8a
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/GossipWireApiSub.java
@@ -0,0 +1,76 @@
+package org.ethereum.beacon.wire.impl.libp2p;
+
+import io.libp2p.core.crypto.PrivKey;
+import io.libp2p.core.pubsub.MessageApi;
+import io.libp2p.core.pubsub.PubsubApi;
+import io.libp2p.core.pubsub.PubsubPublisherApi;
+import io.libp2p.core.pubsub.PubsubSubscription;
+import io.libp2p.core.pubsub.Topic;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.Random;
+import org.ethereum.beacon.core.BeaconBlock;
+import org.ethereum.beacon.core.operations.Attestation;
+import org.ethereum.beacon.ssz.SSZSerializer;
+import org.ethereum.beacon.wire.WireApiSub;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.FluxSink;
+import tech.pegasys.artemis.util.bytes.BytesValue;
+
+public class GossipWireApiSub implements WireApiSub {
+ private final SSZSerializer sszSerializer;
+ private final PubsubApi gossip;
+ private final PubsubPublisherApi publisher;
+ private final Topic blocksTopic = new Topic("/eth2/beacon_block/ssz");
+ private final Topic attestationsTopic = new Topic("/eth2/beacon_attestation/ssz");
+ private final PubsubSubscription subscription;
+ private final EmitterProcessor blocksStream = EmitterProcessor.create();
+ private final FluxSink blocksSink = blocksStream.sink();
+ private final EmitterProcessor attestationsStream = EmitterProcessor.create();
+ private final FluxSink attestationsSink = attestationsStream.sink();
+
+ public GossipWireApiSub(SSZSerializer sszSerializer, PubsubApi gossip,
+ PrivKey publisherKey) {
+ this.sszSerializer = sszSerializer;
+ this.gossip = gossip;
+ subscription = gossip.subscribe(this::onNewMessage, blocksTopic, attestationsTopic);
+ publisher = gossip.createPublisher(publisherKey, new Random().nextLong());
+ }
+
+ private void onNewMessage(MessageApi msg) {
+ if (msg.getTopics().contains(blocksTopic)) {
+ BeaconBlock block = sszSerializer
+ .decode(BytesValue.wrapBuffer(msg.getData()), BeaconBlock.class);
+ blocksSink.next(block);
+ } else if (msg.getTopics().contains(attestationsTopic)) {
+ Attestation attest = sszSerializer
+ .decode(BytesValue.wrapBuffer(msg.getData()), Attestation.class);
+ attestationsSink.next(attest);
+ }
+ }
+
+ @Override
+ public void sendProposedBlock(BeaconBlock block) {
+ byte[] bytes = sszSerializer.encode(block);
+ ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
+ publisher.publish(byteBuf, blocksTopic);
+ }
+
+ @Override
+ public void sendAttestation(Attestation attestation) {
+ byte[] bytes = sszSerializer.encode(attestation);
+ ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
+ publisher.publish(byteBuf, attestationsTopic);
+ }
+
+ @Override
+ public Publisher inboundBlocksStream() {
+ return blocksStream;
+ }
+
+ @Override
+ public Publisher inboundAttestationsStream() {
+ return attestationsStream;
+ }
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/Libp2pLauncher.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/Libp2pLauncher.java
new file mode 100644
index 000000000..bfebc2b88
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/Libp2pLauncher.java
@@ -0,0 +1,163 @@
+package org.ethereum.beacon.wire.impl.libp2p;
+
+import io.libp2p.core.Host;
+import io.libp2p.core.PeerId;
+import io.libp2p.core.crypto.KEY_TYPE;
+import io.libp2p.core.crypto.KeyKt;
+import io.libp2p.core.crypto.PrivKey;
+import io.libp2p.core.dsl.BuildersJKt;
+import io.libp2p.core.multiformats.Multiaddr;
+import io.libp2p.crypto.keys.Secp256k1Kt;
+import io.libp2p.mux.mplex.MplexStreamMuxer;
+import io.libp2p.protocol.Ping;
+import io.libp2p.pubsub.gossip.Gossip;
+import io.libp2p.security.secio.SecIoSecureChannel;
+import io.libp2p.transport.tcp.TcpTransport;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.ethereum.beacon.chain.BeaconTupleDetails;
+import org.ethereum.beacon.consensus.BeaconChainSpec;
+import org.ethereum.beacon.schedulers.Schedulers;
+import org.ethereum.beacon.ssz.SSZSerializer;
+import org.ethereum.beacon.wire.WireApiSub;
+import org.ethereum.beacon.wire.WireApiSync;
+import org.ethereum.beacon.wire.impl.libp2p.encoding.RpcMessageCodecFactory;
+import org.ethereum.beacon.wire.impl.libp2p.encoding.SSZMessageCodec;
+import org.javatuples.Pair;
+import org.reactivestreams.Publisher;
+import tech.pegasys.artemis.util.bytes.Bytes4;
+import tech.pegasys.artemis.util.bytes.BytesValue;
+
+public class Libp2pLauncher {
+ private static final Logger logger = LogManager.getLogger(Libp2pLauncher.class);
+
+ int listenPort = 40002;
+ List> activePeers = new ArrayList<>();
+ PrivKey privKey;
+ int activePeerReconnectTimeoutSec = 10;
+
+ // TODO gossip params
+ int gossipD = 3;
+ int gossipDLow = 2;
+ int gossipDHigh = 4;
+ int gossipDGossip = 3;
+
+ BeaconChainSpec spec;
+ Bytes4 fork;
+ SSZSerializer sszSerializer;
+ Schedulers schedulers;
+ WireApiSync wireApiSyncServer;
+ Publisher headStream;
+
+ Libp2pPeerManager peerManager;
+ Host host;
+
+ public void init() {
+ if (privKey == null) {
+ privKey = KeyKt.generateKeyPair(KEY_TYPE.SECP256K1).component1();
+ }
+ Gossip gossip = new Gossip(); // TODO gossip params
+ RpcMessageCodecFactory rpcCodecFactory = SSZMessageCodec.createFactory(sszSerializer);
+ WireApiSub wireApiSub = new GossipWireApiSub(sszSerializer, gossip.getApi(), privKey);
+ peerManager = new Libp2pPeerManager(
+ spec, fork, schedulers, headStream, wireApiSub, rpcCodecFactory, wireApiSyncServer);
+
+ host = BuildersJKt.hostJ(
+ b -> {
+ b.getIdentity().setFactory(() -> privKey);
+ b.getTransports().add(TcpTransport::new);
+ b.getSecureChannels().add(SecIoSecureChannel::new);
+ b.getMuxers().add(MplexStreamMuxer::new);
+ b.getNetwork().listen("/ip4/0.0.0.0/tcp/" + listenPort);
+
+ b.getProtocols().add(new Ping());
+ b.getProtocols().add(gossip);
+ b.getProtocols().addAll(peerManager.rpcMethods.all());
+
+ b.getConnectionHandlers().add(peerManager);
+ });
+ }
+
+ public CompletableFuture start() {
+ logger.info("Starting libp2p network...");
+ CompletableFuture ret = host.start().thenApply(i -> {
+ logger.info("Listening for connections on port " + listenPort + " with peerId " + PeerId
+ .fromPubKey(privKey.publicKey()).toHex());
+ return null;
+ });
+
+ for (Pair activePeer : activePeers) {
+ connectActively(activePeer.getValue0(), activePeer.getValue1());
+ }
+
+ return ret;
+ }
+
+ void connectActively(Multiaddr addr, PeerId id) {
+ logger.info("Connecting to " + addr + " peerId: " + id.toHex());
+ host.getNetwork().connect(id, addr).whenComplete((conn, t) -> {
+ if (t != null) {
+ logger.info("Connection to " + addr + " failed. Will retry shortly : " + t);
+ schedulers.events().executeWithDelay(Duration.ofSeconds(activePeerReconnectTimeoutSec), () -> {
+ connectActively(addr, id);
+ });
+ } else {
+ conn.closeFuture().thenAccept(ignore -> {
+ logger.info("Connection to " + addr + " closed. Will retry shortly");
+ schedulers.events().executeWithDelay(Duration.ofSeconds(activePeerReconnectTimeoutSec), () -> {
+ connectActively(addr, id);
+ });
+ });
+ }
+ });
+ }
+
+ public void setListenPort(int listenPort) {
+ this.listenPort = listenPort;
+ }
+
+ public void setPrivKey(BytesValue secp256k1PrivateKeyBytes) {
+ this.privKey = Secp256k1Kt.unmarshalSecp256k1PrivateKey(secp256k1PrivateKeyBytes.getArrayUnsafe());
+ }
+
+ public void addActivePeer(String multiaddr, String hexPeerId) {
+ activePeers.add(Pair.with(new Multiaddr(multiaddr), PeerId.fromHex(hexPeerId)));
+ }
+
+ public void setSpec(BeaconChainSpec spec) {
+ this.spec = spec;
+ }
+
+ public void setFork(Bytes4 fork) {
+ this.fork = fork;
+ }
+
+ public void setSszSerializer(SSZSerializer sszSerializer) {
+ this.sszSerializer = sszSerializer;
+ }
+
+ public void setSchedulers(Schedulers schedulers) {
+ this.schedulers = schedulers;
+ }
+
+ public void setWireApiSyncServer(WireApiSync wireApiSyncServer) {
+ this.wireApiSyncServer = wireApiSyncServer;
+ }
+
+ public void setHeadStream(
+ Publisher headStream) {
+ this.headStream = headStream;
+ }
+
+ public Libp2pPeerManager getPeerManager() {
+ return peerManager;
+ }
+
+ public Host getHost() {
+ return host;
+ }
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/Libp2pMethodHandler.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/Libp2pMethodHandler.java
new file mode 100644
index 000000000..ba1cba73d
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/Libp2pMethodHandler.java
@@ -0,0 +1,172 @@
+package org.ethereum.beacon.wire.impl.libp2p;
+
+import io.libp2p.core.Connection;
+import io.libp2p.core.P2PAbstractChannel;
+import io.libp2p.core.Stream;
+import io.libp2p.core.multistream.Mode;
+import io.libp2p.core.multistream.Multistream;
+import io.libp2p.core.multistream.ProtocolBinding;
+import io.libp2p.core.multistream.ProtocolMatcher;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import java.util.concurrent.CompletableFuture;
+import org.ethereum.beacon.wire.exceptions.WireRpcClosedException;
+import org.ethereum.beacon.wire.exceptions.WireRpcException;
+import org.ethereum.beacon.wire.exceptions.WireRpcMalformedException;
+import org.ethereum.beacon.wire.impl.libp2p.Libp2pMethodHandler.Controller;
+import org.ethereum.beacon.wire.impl.libp2p.encoding.MessageCodec;
+import org.ethereum.beacon.wire.impl.libp2p.encoding.RpcMessageCodec;
+import org.javatuples.Pair;
+import org.jetbrains.annotations.NotNull;
+
+public abstract class Libp2pMethodHandler
+ implements ProtocolBinding> {
+
+ private final String methodMultistreamId;
+ private final MessageCodec requestCodec;
+ private final MessageCodec> responseCodec;
+ private boolean notification = false;
+
+ public Libp2pMethodHandler(String methodMultistreamId,
+ RpcMessageCodec codec) {
+ this.methodMultistreamId = methodMultistreamId;
+ this.requestCodec = codec.getRequestMessageCodec();
+ this.responseCodec = codec.getResponseMessageCodec();
+ }
+
+ public CompletableFuture invokeRemote(Connection connection, TRequest request) {
+ return connection
+ .getMuxerSession()
+ .createStream(Multistream.create(this.toInitiator(methodMultistreamId)).toStreamHandler())
+ .getControler()
+ .thenCompose(ctr -> ctr.invoke(request));
+ }
+
+ protected abstract CompletableFuture invokeLocal(Connection connection, TRequest request);
+
+
+ public Libp2pMethodHandler setNotification() {
+ this.notification = true;
+ return this;
+ }
+
+ @NotNull
+ @Override
+ public String getAnnounce() {
+ return methodMultistreamId;
+ }
+
+ @NotNull
+ @Override
+ public ProtocolMatcher getMatcher() {
+ return new ProtocolMatcher(Mode.STRICT, getAnnounce(), null);
+ }
+
+ @NotNull
+ @Override
+ public CompletableFuture initChannel(P2PAbstractChannel channel, String s) {
+ // TODO timeout handlers
+ AbstractHandler handler;
+ if (channel.isInitiator()) {
+ handler = new RequesterHandler();
+ } else {
+ handler = new ResponderHandler(((Stream)channel).getConn());
+ }
+ channel.getNettyChannel().pipeline().addLast(handler);
+ return handler.activeFuture;
+ }
+
+ interface Controller {
+ CompletableFuture invoke(TRequest request);
+ }
+
+ abstract class AbstractHandler extends SimpleChannelInboundHandler
+ implements Controller {
+
+ final CompletableFuture activeFuture = new CompletableFuture<>();
+ }
+
+ class ResponderHandler extends AbstractHandler {
+ private final Connection connection;
+
+ public ResponderHandler(Connection connection) {
+ this.connection = connection;
+ activeFuture.complete(this);
+ }
+
+ @Override
+ public CompletableFuture invoke(TRequest tRequest) {
+ throw new IllegalStateException("This method shouldn't be called for Responder");
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
+ TRequest request = requestCodec.deserialize(byteBuf);
+ invokeLocal(connection, request)
+ .whenComplete(
+ (resp, err) -> {
+ ByteBuf respBuf = Unpooled.buffer();
+ responseCodec.serialize(Pair.with(resp, err), respBuf);
+ ctx.writeAndFlush(respBuf);
+ ctx.channel().disconnect();
+ });
+ }
+ }
+
+ class RequesterHandler extends AbstractHandler {
+ private ChannelHandlerContext ctx;
+ private CompletableFuture respFuture;
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
+ if (respFuture == null) {
+ throw new WireRpcMalformedException("Some data received prior to request: " + byteBuf);
+ }
+ Pair response = responseCodec.deserialize(byteBuf);
+ if (response.getValue0() != null) {
+ respFuture.complete(response.getValue0());
+ } else {
+ respFuture.completeExceptionally(response.getValue1());
+ }
+ }
+
+ @Override
+ public CompletableFuture invoke(TRequest tRequest) {
+ ByteBuf reqByteBuf = Unpooled.buffer();
+ requestCodec.serialize(tRequest, reqByteBuf);
+ respFuture = new CompletableFuture<>();
+ ctx.writeAndFlush(reqByteBuf);
+ if (notification) {
+ ctx.channel().close();
+ return CompletableFuture.completedFuture(null);
+ } else {
+ ctx.channel().disconnect();
+ return respFuture;
+ }
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ this.ctx = ctx;
+ activeFuture.complete(this);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ WireRpcException exception = new WireRpcException("Channel exception", cause);
+ activeFuture.completeExceptionally(exception);
+ respFuture.completeExceptionally(exception);
+ ctx.channel().close();
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ WireRpcClosedException exception = new WireRpcClosedException("Stream closed.");
+ activeFuture.completeExceptionally(exception);
+ respFuture.completeExceptionally(exception);
+ ctx.channel().close();
+ }
+ }
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/Libp2pPeer.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/Libp2pPeer.java
new file mode 100644
index 000000000..91ad96c1a
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/Libp2pPeer.java
@@ -0,0 +1,92 @@
+package org.ethereum.beacon.wire.impl.libp2p;
+
+import io.libp2p.core.Connection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.ethereum.beacon.consensus.hasher.ObjectHasher;
+import org.ethereum.beacon.core.BeaconBlock;
+import org.ethereum.beacon.wire.Feedback;
+import org.ethereum.beacon.wire.Peer;
+import org.ethereum.beacon.wire.PeerConnection;
+import org.ethereum.beacon.wire.WireApiSub;
+import org.ethereum.beacon.wire.WireApiSync;
+import org.ethereum.beacon.wire.message.payload.BlockRequestMessage;
+import org.ethereum.beacon.wire.message.payload.HelloMessage;
+import org.ethereum.beacon.wire.message.payload.RecentBlockRequestMessage;
+import tech.pegasys.artemis.ethereum.core.Hash32;
+
+public class Libp2pPeer implements Peer {
+
+ class Libp2pPeerConnection implements PeerConnection {
+
+ @Override
+ public CompletableFuture getCloseFuture() {
+ return connection.closeFuture().thenApply(unit -> null);
+ }
+
+ @Override
+ public void close() {
+ connection.getNettyChannel().close();
+ }
+ }
+
+ class Libp2pWireSync implements WireApiSync {
+
+ @Override
+ public CompletableFuture>> requestBlocks(
+ BlockRequestMessage requestMessage, ObjectHasher hasher) {
+ return rpcMethods
+ .blocks
+ .invokeRemote(connection, requestMessage)
+ .thenApply(
+ resp -> Feedback.of(resp.getBlocks(), Libp2pPeer.this::invalidBlockReported));
+ }
+
+ @Override
+ public CompletableFuture>> requestRecentBlocks(
+ List blockRoots,
+ ObjectHasher hasher) {
+ return rpcMethods
+ .recentBlocks
+ .invokeRemote(connection, new RecentBlockRequestMessage(blockRoots))
+ .thenApply(
+ resp -> Feedback.of(resp.getBlocks(), Libp2pPeer.this::invalidBlockReported));
+ }
+ }
+
+ final Connection connection;
+ private final RpcMethods rpcMethods;
+ final CompletableFuture remoteHello = new CompletableFuture<>();
+ private final Libp2pPeerConnection peerConnection = new Libp2pPeerConnection();
+ private final Libp2pWireSync wireSync = new Libp2pWireSync();
+
+ public Libp2pPeer(Connection connection, RpcMethods rpcMethods) {
+ this.connection = connection;
+ this.rpcMethods = rpcMethods;
+ }
+
+ private void invalidBlockReported(Throwable err) {
+
+ }
+
+ @Override
+ public CompletableFuture getRemoteHelloMessage() {
+ return remoteHello;
+ }
+
+ @Override
+ public PeerConnection getConnection() {
+ return peerConnection;
+ }
+
+ @Override
+ public WireApiSync getSyncApi() {
+ return wireSync;
+ }
+
+ @Override
+ public WireApiSub getSubApi() {
+ throw new UnsupportedOperationException(
+ "SubApi is managed by Libp2p gossip and is not available on per peer basis");
+ }
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/Libp2pPeerManager.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/Libp2pPeerManager.java
new file mode 100644
index 000000000..b52fcf7f2
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/Libp2pPeerManager.java
@@ -0,0 +1,88 @@
+package org.ethereum.beacon.wire.impl.libp2p;
+
+import io.libp2p.core.Connection;
+import io.libp2p.core.ConnectionHandler;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.ethereum.beacon.chain.BeaconTupleDetails;
+import org.ethereum.beacon.consensus.BeaconChainSpec;
+import org.ethereum.beacon.schedulers.Schedulers;
+import org.ethereum.beacon.wire.WireApiSub;
+import org.ethereum.beacon.wire.WireApiSync;
+import org.ethereum.beacon.wire.exceptions.WireRpcMalformedException;
+import org.ethereum.beacon.wire.impl.AbstractPeerManager;
+import org.ethereum.beacon.wire.impl.libp2p.encoding.RpcMessageCodecFactory;
+import org.ethereum.beacon.wire.message.payload.GoodbyeMessage;
+import org.ethereum.beacon.wire.message.payload.HelloMessage;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import tech.pegasys.artemis.util.bytes.Bytes4;
+
+public class Libp2pPeerManager extends AbstractPeerManager implements ConnectionHandler {
+ private static final Logger logger = LogManager.getLogger(Libp2pPeerManager.class);
+
+ private final WireApiSub wireApiSub;
+
+ final RpcMethods rpcMethods;
+ private volatile List connectedPeers = Collections.emptyList();
+
+ public Libp2pPeerManager(
+ BeaconChainSpec spec,
+ Bytes4 fork,
+ Schedulers schedulers,
+ Publisher headStream,
+ WireApiSub wireApiSub,
+ RpcMessageCodecFactory codecFactory,
+ WireApiSync server) {
+ super(spec, fork, schedulers, headStream);
+
+ this.wireApiSub = wireApiSub;
+
+ rpcMethods = new RpcMethods(spec.getObjectHasher(), codecFactory, server, this::hello, this::goodbye);
+
+ Flux.from(connectedPeersStream()).subscribe(l ->
+ connectedPeers = l.stream().map(p -> (Libp2pPeer) p).collect(Collectors.toList()));
+ }
+
+ private Void goodbye(Connection connection, GoodbyeMessage message) {
+ logger.info("Peer " + connection + " said goodbye: " + message);
+ return null;
+ }
+
+ private HelloMessage hello(Connection connection, HelloMessage helloMessage) {
+ logger.info("Peer " + connection + " said hello: " + helloMessage);
+ if (connection.isInitiator()) {
+ throw new WireRpcMalformedException(
+ "Responder peer shouldn't initiate Hello message: " + helloMessage);
+ } else {
+ getPeer(connection).getRemoteHelloMessage().complete(helloMessage);
+ return createLocalHello();
+ }
+ }
+
+ private Libp2pPeer getPeer(Connection conn) {
+ return connectedPeers.stream()
+ .filter(p -> conn == p.connection)
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("Can't find a peer for connection: " + conn));
+ }
+
+ @Override
+ public void handleConnection(Connection connection) {
+ logger.info("New connection: " + connection);
+ Libp2pPeer peer = new Libp2pPeer(connection, rpcMethods);
+ onNewPeer(peer);
+ if (connection.isInitiator()) {
+ rpcMethods.hello.invokeRemote(connection, createLocalHello())
+ .thenApply(resp -> peer.getRemoteHelloMessage().complete(resp));
+ }
+ }
+
+ @Override
+ public WireApiSub getWireApiSub() {
+ return wireApiSub;
+ }
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/RpcMethods.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/RpcMethods.java
new file mode 100644
index 000000000..4c64d503e
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/RpcMethods.java
@@ -0,0 +1,84 @@
+package org.ethereum.beacon.wire.impl.libp2p;
+
+import io.libp2p.core.Connection;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import org.ethereum.beacon.consensus.hasher.ObjectHasher;
+import org.ethereum.beacon.wire.WireApiSync;
+import org.ethereum.beacon.wire.impl.libp2p.encoding.RpcMessageCodecFactory;
+import org.ethereum.beacon.wire.message.payload.BlockRequestMessage;
+import org.ethereum.beacon.wire.message.payload.BlockResponseMessage;
+import org.ethereum.beacon.wire.message.payload.GoodbyeMessage;
+import org.ethereum.beacon.wire.message.payload.HelloMessage;
+import org.ethereum.beacon.wire.message.payload.RecentBlockRequestMessage;
+import org.ethereum.beacon.wire.message.payload.RecentBlockResponseMessage;
+import tech.pegasys.artemis.ethereum.core.Hash32;
+
+public class RpcMethods {
+
+ final Libp2pMethodHandler hello;
+ final Libp2pMethodHandler goodbye;
+ final Libp2pMethodHandler blocks;
+ final Libp2pMethodHandler recentBlocks;
+
+
+ public RpcMethods(
+ ObjectHasher hasher,
+ RpcMessageCodecFactory codecFactory, WireApiSync server,
+ BiFunction helloHandler,
+ BiFunction goodbyeHandler) {
+
+ hello = new Libp2pMethodHandler(
+ "/eth2/beacon_chain/req/hello/1/ssz",
+ codecFactory.create(HelloMessage.class, HelloMessage.class)) {
+ @Override
+ protected CompletableFuture invokeLocal(Connection connection,
+ HelloMessage helloMessage) {
+ return CompletableFuture.completedFuture(helloHandler.apply(connection, helloMessage));
+ }
+ };
+
+ goodbye = new Libp2pMethodHandler(
+ "/eth2/beacon_chain/req/goodbye/1/ssz",
+ codecFactory.create(GoodbyeMessage.class, Void.class)) {
+ @Override
+ protected CompletableFuture invokeLocal(Connection connection,
+ GoodbyeMessage msg) {
+ goodbyeHandler.apply(connection, msg);
+ return null;
+ }
+ }.setNotification();
+
+ blocks = new Libp2pMethodHandler(
+ "/eth2/beacon_chain/req/beacon_blocks/1/ssz",
+ codecFactory.create(BlockRequestMessage.class, BlockResponseMessage.class)) {
+ @Override
+ protected CompletableFuture invokeLocal(Connection connection,
+ BlockRequestMessage msg) {
+ return server.requestBlocks(msg, hasher).thenApply(l -> new BlockResponseMessage(l.get()));
+ }
+ };
+
+ recentBlocks = new Libp2pMethodHandler(
+ "/eth2/beacon_chain/req/recent_beacon_blocks/1/ssz",
+ codecFactory.create(RecentBlockRequestMessage.class, RecentBlockResponseMessage.class)) {
+ @Override
+ protected CompletableFuture invokeLocal(Connection connection,
+ RecentBlockRequestMessage msg) {
+ return server.requestRecentBlocks(msg.getBlockRoots(), hasher)
+ .thenApply(l -> new RecentBlockResponseMessage(l.get()));
+ }
+ };
+ }
+
+ public List> all() {
+ return Arrays.asList(
+ hello,
+ goodbye,
+ blocks,
+ recentBlocks
+ );
+ }
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/encoding/MessageCodec.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/encoding/MessageCodec.java
new file mode 100644
index 000000000..78cd2d4c9
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/encoding/MessageCodec.java
@@ -0,0 +1,11 @@
+package org.ethereum.beacon.wire.impl.libp2p.encoding;
+
+import io.netty.buffer.ByteBuf;
+
+public interface MessageCodec {
+
+ void serialize(TMessage msg, ByteBuf buf);
+
+ TMessage deserialize(ByteBuf buf);
+}
+
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/encoding/RpcMessageCodec.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/encoding/RpcMessageCodec.java
new file mode 100644
index 000000000..f12f258a2
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/encoding/RpcMessageCodec.java
@@ -0,0 +1,10 @@
+package org.ethereum.beacon.wire.impl.libp2p.encoding;
+
+import org.javatuples.Pair;
+
+public interface RpcMessageCodec {
+
+ MessageCodec getRequestMessageCodec();
+
+ MessageCodec> getResponseMessageCodec();
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/encoding/RpcMessageCodecFactory.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/encoding/RpcMessageCodecFactory.java
new file mode 100644
index 000000000..1814bfdce
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/encoding/RpcMessageCodecFactory.java
@@ -0,0 +1,7 @@
+package org.ethereum.beacon.wire.impl.libp2p.encoding;
+
+public interface RpcMessageCodecFactory {
+
+ RpcMessageCodec create(Class reqClass,
+ Class respClass);
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/encoding/SSZMessageCodec.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/encoding/SSZMessageCodec.java
new file mode 100644
index 000000000..c3f6229ee
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/libp2p/encoding/SSZMessageCodec.java
@@ -0,0 +1,178 @@
+package org.ethereum.beacon.wire.impl.libp2p.encoding;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.CorruptedFrameException;
+import org.ethereum.beacon.ssz.SSZSerializer;
+import org.ethereum.beacon.wire.exceptions.WireRpcMalformedException;
+import org.ethereum.beacon.wire.exceptions.WireRpcRemoteError;
+import org.ethereum.beacon.wire.message.ErrorCode;
+import org.javatuples.Pair;
+import tech.pegasys.artemis.util.bytes.BytesValue;
+
+public class SSZMessageCodec implements RpcMessageCodec {
+
+ public static RpcMessageCodecFactory createFactory(SSZSerializer sszSerializer) {
+ return new RpcMessageCodecFactory() {
+ @Override
+ public RpcMessageCodec create(
+ Class reqClass,
+ Class respClass) {
+ return new SSZMessageCodec<>(sszSerializer, reqClass, respClass);
+ }
+ };
+ }
+
+ private final SSZSerializer sszSerializer;
+ private final Class requestClass;
+ private final Class responseClass;
+
+ public SSZMessageCodec(SSZSerializer sszSerializer, Class requestClass,
+ Class responseClass) {
+ this.sszSerializer = sszSerializer;
+ this.requestClass = requestClass;
+ this.responseClass = responseClass;
+ }
+
+ @Override
+ public MessageCodec getRequestMessageCodec() {
+ return new Request();
+ }
+
+ @Override
+ public MessageCodec> getResponseMessageCodec() {
+ return new Response();
+ }
+
+ private void serializeMsg(Object msg, ByteBuf buf) {
+ byte[] msgBytes = sszSerializer.encode(msg);
+ writeRawVarint32(buf, msgBytes.length);
+ buf.writeBytes(msgBytes);
+ }
+
+ private TMessage deserializeMsg(ByteBuf buf, Class clazz) {
+ int msgSize = readRawVarint32(buf);
+ if (msgSize != buf.readableBytes()) {
+ throw new WireRpcMalformedException("Size in header (" + msgSize + ") doesn't match payload size: " + buf.readableBytes());
+ }
+ return sszSerializer.decode(BytesValue.wrapBuffer(buf), clazz);
+ }
+
+ class Request implements MessageCodec {
+
+ @Override
+ public void serialize(TRequest msg, ByteBuf buf) {
+ serializeMsg(msg, buf);
+ }
+
+ @Override
+ public TRequest deserialize(ByteBuf buf) {
+ return deserializeMsg(buf, requestClass);
+ }
+ }
+
+ class Response implements MessageCodec> {
+
+ @Override
+ public void serialize(Pair msg, ByteBuf buf) {
+ if (msg.getValue1() != null) {
+ buf.writeByte(ErrorCode.ServerError.getCode());
+ byte[] errMsgBytes = msg.getValue1().getMessage().getBytes();
+ writeRawVarint32(buf, errMsgBytes.length);
+ buf.writeBytes(errMsgBytes);
+ } else {
+ buf.writeByte(ErrorCode.OK.getCode());
+ serializeMsg(msg.getValue0(), buf);
+ }
+ }
+
+ @Override
+ public Pair deserialize(ByteBuf buf) {
+ byte errCode = buf.readByte();
+ ErrorCode error = ErrorCode.fromCode(errCode);
+ TResponse message = null;
+ Throwable err = null;
+ if (error == ErrorCode.OK) {
+ message = deserializeMsg(buf, responseClass);
+ } else {
+ int msgSize = readRawVarint32(buf);
+ if (msgSize != buf.readableBytes()) {
+ throw new WireRpcMalformedException("Size in header (" + msgSize + ") doesn't match payload size: " + buf.readableBytes());
+ }
+ byte[] msgBytes = new byte[buf.readableBytes()];
+ buf.readBytes(msgBytes);
+ String errMsg = new String(msgBytes);
+ err = new WireRpcRemoteError("Error: " + err + ": " + errMsg);
+ }
+ return Pair.with(message, err);
+ }
+ }
+
+ /**
+ * Encodes int as Protobuf varint
+ * Copied from https://github.com/netty/netty/blob/00afb19d7a37de21b35ce4f6cb3fa7f74809f2ab/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java#L58
+ */
+ static void writeRawVarint32(ByteBuf out, int value) {
+ while (true) {
+ if ((value & ~0x7F) == 0) {
+ out.writeByte(value);
+ return;
+ } else {
+ out.writeByte((value & 0x7F) | 0x80);
+ value >>>= 7;
+ }
+ }
+ }
+
+ /**
+ * Decodes Protobuf varint
+ * Copied from: https://github.com/netty/netty/blob/00afb19d7a37de21b35ce4f6cb3fa7f74809f2ab/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java#L73
+ */
+ static int readRawVarint32(ByteBuf buffer) {
+ if (!buffer.isReadable()) {
+ return 0;
+ }
+ buffer.markReaderIndex();
+ byte tmp = buffer.readByte();
+ if (tmp >= 0) {
+ return tmp;
+ } else {
+ int result = tmp & 127;
+ if (!buffer.isReadable()) {
+ buffer.resetReaderIndex();
+ return 0;
+ }
+ if ((tmp = buffer.readByte()) >= 0) {
+ result |= tmp << 7;
+ } else {
+ result |= (tmp & 127) << 7;
+ if (!buffer.isReadable()) {
+ buffer.resetReaderIndex();
+ return 0;
+ }
+ if ((tmp = buffer.readByte()) >= 0) {
+ result |= tmp << 14;
+ } else {
+ result |= (tmp & 127) << 14;
+ if (!buffer.isReadable()) {
+ buffer.resetReaderIndex();
+ return 0;
+ }
+ if ((tmp = buffer.readByte()) >= 0) {
+ result |= tmp << 21;
+ } else {
+ result |= (tmp & 127) << 21;
+ if (!buffer.isReadable()) {
+ buffer.resetReaderIndex();
+ return 0;
+ }
+ result |= (tmp = buffer.readByte()) << 28;
+ if (tmp < 0) {
+ throw new CorruptedFrameException("malformed varint.");
+ }
+ }
+ }
+ }
+ return result;
+ }
+ }
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/PeerImpl.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/PeerImpl.java
similarity index 76%
rename from wire/src/main/java/org/ethereum/beacon/wire/PeerImpl.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/PeerImpl.java
index 966569496..ca809cd64 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/PeerImpl.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/PeerImpl.java
@@ -1,12 +1,16 @@
-package org.ethereum.beacon.wire;
+package org.ethereum.beacon.wire.impl.plain;
import java.util.concurrent.CompletableFuture;
-import org.ethereum.beacon.core.types.SlotNumber;
import org.ethereum.beacon.schedulers.Schedulers;
import org.ethereum.beacon.ssz.SSZSerializer;
-import org.ethereum.beacon.wire.channel.Channel;
-import org.ethereum.beacon.wire.channel.beacon.BeaconPipeline;
-import org.ethereum.beacon.wire.channel.beacon.WireApiSubAdapter;
+import org.ethereum.beacon.wire.MessageSerializer;
+import org.ethereum.beacon.wire.Peer;
+import org.ethereum.beacon.wire.WireApiPeer;
+import org.ethereum.beacon.wire.WireApiSub;
+import org.ethereum.beacon.wire.WireApiSync;
+import org.ethereum.beacon.wire.impl.plain.channel.Channel;
+import org.ethereum.beacon.wire.impl.plain.channel.beacon.BeaconPipeline;
+import org.ethereum.beacon.wire.impl.plain.channel.beacon.WireApiSubAdapter;
import org.ethereum.beacon.wire.message.payload.GoodbyeMessage;
import org.ethereum.beacon.wire.message.payload.HelloMessage;
import tech.pegasys.artemis.util.bytes.BytesValue;
@@ -53,10 +57,11 @@ public void goodbye(GoodbyeMessage message) {
}
@Override
- public Channel getRawChannel() {
+ public Channel getConnection() {
return channel;
}
+ @Override
public CompletableFuture getRemoteHelloMessage() {
return remoteHelloMessageFut;
}
@@ -68,12 +73,12 @@ public CompletableFuture getPeerActiveFuture() {
private void onHello(HelloMessage message) {
remoteHelloMessageFut.complete(message);
- if (localHelloMessage.getNetworkId() != message.getNetworkId()) {
- disconnect(new GoodbyeMessage(GoodbyeMessage.IRRELEVANT_NETWORK));
- }
- if (!localHelloMessage.getChainId().equals(message.getChainId())) {
- disconnect(new GoodbyeMessage(GoodbyeMessage.IRRELEVANT_NETWORK));
- }
+// if (localHelloMessage.getNetworkId() != message.getNetworkId()) {
+// disconnect(new GoodbyeMessage(GoodbyeMessage.IRRELEVANT_NETWORK));
+// }
+// if (!localHelloMessage.getChainId().equals(message.getChainId())) {
+// disconnect(new GoodbyeMessage(GoodbyeMessage.IRRELEVANT_NETWORK));
+// }
peerActiveFut.complete(message);
}
@@ -103,7 +108,7 @@ public String toString() {
String bestSlot;
try {
bestSlot =
- getRemoteHelloMessage().isDone() ? getRemoteHelloMessage().get().getBestSlot().toString() : null;
+ getRemoteHelloMessage().isDone() ? getRemoteHelloMessage().get().getHeadSlot().toString() : null;
} catch (Exception e) {
bestSlot = "(err )" + e;
}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/SimplePeerManagerImpl.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/SimplePeerManagerImpl.java
new file mode 100644
index 000000000..6d64feb13
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/SimplePeerManagerImpl.java
@@ -0,0 +1,58 @@
+package org.ethereum.beacon.wire.impl.plain;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.ethereum.beacon.chain.BeaconTupleDetails;
+import org.ethereum.beacon.consensus.BeaconChainSpec;
+import org.ethereum.beacon.schedulers.Schedulers;
+import org.ethereum.beacon.ssz.SSZSerializer;
+import org.ethereum.beacon.wire.MessageSerializer;
+import org.ethereum.beacon.wire.Peer;
+import org.ethereum.beacon.wire.WireApiSub;
+import org.ethereum.beacon.wire.WireApiSync;
+import org.ethereum.beacon.wire.impl.AbstractPeerManager;
+import org.ethereum.beacon.wire.impl.plain.channel.Channel;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import tech.pegasys.artemis.util.bytes.Bytes4;
+import tech.pegasys.artemis.util.bytes.BytesValue;
+
+public class SimplePeerManagerImpl extends AbstractPeerManager {
+ private static final Logger logger = LogManager.getLogger(SimplePeerManagerImpl.class);
+
+ private final SSZSerializer ssz;
+ private final MessageSerializer messageSerializer;
+ private final WireApiSync syncServer;
+ private final WireApiSubRouter wireApiSubRouter;
+
+ public SimplePeerManagerImpl(
+ Publisher> channelsStream,
+ SSZSerializer ssz,
+ BeaconChainSpec spec,
+ MessageSerializer messageSerializer,
+ Schedulers schedulers,
+ WireApiSync syncServer,
+ Publisher headStream) {
+ super(spec, Bytes4.ZERO, schedulers, headStream);
+
+ this.ssz = ssz;
+ this.messageSerializer = messageSerializer;
+ this.syncServer = syncServer;
+
+ Flux.from(channelsStream).subscribe(ch -> onNewPeer(createPeer(ch)));
+
+ wireApiSubRouter = new WireApiSubRouter(
+ Flux.from(activatedPeerStream()).map(Peer::getSubApi),
+ Flux.from(disconnectedPeerStream()).map(Peer::getSubApi));
+ }
+
+ protected PeerImpl createPeer(Channel channel) {
+ logger.info("Creating a peer from new channel: " + channel);
+ return new PeerImpl(channel, createLocalHello(), ssz, messageSerializer, syncServer, schedulers);
+ }
+
+ @Override
+ public WireApiSub getWireApiSub() {
+ return wireApiSubRouter;
+ }
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/WireApiSubRouter.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/WireApiSubRouter.java
similarity index 97%
rename from wire/src/main/java/org/ethereum/beacon/wire/WireApiSubRouter.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/WireApiSubRouter.java
index f3ed98232..84d6bd473 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/WireApiSubRouter.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/WireApiSubRouter.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire;
+package org.ethereum.beacon.wire.impl.plain;
import java.util.Collections;
import java.util.List;
@@ -8,7 +8,7 @@
import org.ethereum.beacon.core.operations.Attestation;
import org.ethereum.beacon.stream.RxUtil;
import org.ethereum.beacon.util.Utils;
-import org.javatuples.Pair;
+import org.ethereum.beacon.wire.WireApiSub;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/Channel.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/Channel.java
similarity index 91%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/Channel.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/Channel.java
index 71a91d4a0..211d4c6fc 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/Channel.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/Channel.java
@@ -1,6 +1,7 @@
-package org.ethereum.beacon.wire.channel;
+package org.ethereum.beacon.wire.impl.plain.channel;
import java.util.concurrent.CompletableFuture;
+import org.ethereum.beacon.wire.PeerConnection;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
@@ -9,7 +10,7 @@
* The channel is assumed closed when {@link #inboundMessageStream()} is in Complete
* state.
*/
-public interface Channel {
+public interface Channel extends PeerConnection {
/**
* Returns the steam of inbound messages. When the stream completes the channel is assumed
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/ChannelCodec.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/ChannelCodec.java
similarity index 95%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/ChannelCodec.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/ChannelCodec.java
index 6c26434f6..c06cb1086 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/ChannelCodec.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/ChannelCodec.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire.channel;
+package org.ethereum.beacon.wire.impl.plain.channel;
import java.util.function.Function;
import org.reactivestreams.Publisher;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/ChannelFilter.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/ChannelFilter.java
similarity index 94%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/ChannelFilter.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/ChannelFilter.java
index 51198c405..6fb08da4c 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/ChannelFilter.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/ChannelFilter.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire.channel;
+package org.ethereum.beacon.wire.impl.plain.channel;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/ChannelHub.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/ChannelHub.java
similarity index 94%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/ChannelHub.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/ChannelHub.java
index 8c38d348c..c86b0a8a6 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/ChannelHub.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/ChannelHub.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire.channel;
+package org.ethereum.beacon.wire.impl.plain.channel;
import org.reactivestreams.Publisher;
import reactor.core.publisher.ConnectableFlux;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/ChannelOp.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/ChannelOp.java
similarity index 87%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/ChannelOp.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/ChannelOp.java
index dd9037552..bbe306a35 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/ChannelOp.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/ChannelOp.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire.channel;
+package org.ethereum.beacon.wire.impl.plain.channel;
/**
* Represents {@link Channel} operation which has inbound {@link Channel} with messages of
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/IdentityChannel.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/IdentityChannel.java
similarity index 91%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/IdentityChannel.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/IdentityChannel.java
index c10712db4..60bfbdcbf 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/IdentityChannel.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/IdentityChannel.java
@@ -1,6 +1,5 @@
-package org.ethereum.beacon.wire.channel;
+package org.ethereum.beacon.wire.impl.plain.channel;
-import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/RpcChannel.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/RpcChannel.java
similarity index 92%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/RpcChannel.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/RpcChannel.java
index 45a323e46..f94f115d5 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/RpcChannel.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/RpcChannel.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire.channel;
+package org.ethereum.beacon.wire.impl.plain.channel;
import org.reactivestreams.Publisher;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/RpcChannelAdapter.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/RpcChannelAdapter.java
similarity index 98%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/RpcChannelAdapter.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/RpcChannelAdapter.java
index 26530b82e..0d055f550 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/RpcChannelAdapter.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/RpcChannelAdapter.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire.channel;
+package org.ethereum.beacon.wire.impl.plain.channel;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
@@ -9,7 +9,6 @@
import org.ethereum.beacon.wire.exceptions.WireException;
import org.ethereum.beacon.wire.exceptions.WireRpcClosedException;
import org.ethereum.beacon.wire.exceptions.WireRpcTimeoutException;
-import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/RpcChannelClassFilter.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/RpcChannelClassFilter.java
similarity index 97%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/RpcChannelClassFilter.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/RpcChannelClassFilter.java
index 3b35a9ff6..a85d1b956 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/RpcChannelClassFilter.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/RpcChannelClassFilter.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire.channel;
+package org.ethereum.beacon.wire.impl.plain.channel;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/RpcChannelMapper.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/RpcChannelMapper.java
similarity index 98%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/RpcChannelMapper.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/RpcChannelMapper.java
index a9bd0752c..0189fc6eb 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/RpcChannelMapper.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/RpcChannelMapper.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire.channel;
+package org.ethereum.beacon.wire.impl.plain.channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/RpcMessage.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/RpcMessage.java
similarity index 98%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/RpcMessage.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/RpcMessage.java
index 9e7b2db1d..fd8cb61c3 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/RpcMessage.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/RpcMessage.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire.channel;
+package org.ethereum.beacon.wire.impl.plain.channel;
import java.util.HashMap;
import java.util.Map;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/beacon/BeaconPayloadCodec.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/beacon/BeaconPayloadCodec.java
similarity index 89%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/beacon/BeaconPayloadCodec.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/beacon/BeaconPayloadCodec.java
index 2f792254e..2f7d0fb72 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/beacon/BeaconPayloadCodec.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/beacon/BeaconPayloadCodec.java
@@ -1,12 +1,12 @@
-package org.ethereum.beacon.wire.channel.beacon;
+package org.ethereum.beacon.wire.impl.plain.channel.beacon;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.ssz.SSZSerializer;
-import org.ethereum.beacon.wire.channel.Channel;
-import org.ethereum.beacon.wire.channel.ChannelCodec;
-import org.ethereum.beacon.wire.channel.RpcMessage;
-import org.ethereum.beacon.wire.exceptions.WireRemoteRpcError;
+import org.ethereum.beacon.wire.exceptions.WireRpcRemoteError;
+import org.ethereum.beacon.wire.impl.plain.channel.Channel;
+import org.ethereum.beacon.wire.impl.plain.channel.ChannelCodec;
+import org.ethereum.beacon.wire.impl.plain.channel.RpcMessage;
import org.ethereum.beacon.wire.message.RequestMessage;
import org.ethereum.beacon.wire.message.RequestMessagePayload;
import org.ethereum.beacon.wire.message.ResponseMessage;
@@ -74,6 +74,6 @@ protected static ResponseMessage serializeError(SSZSerializer sszSerializer, Thr
}
protected static Throwable deserializeError(SSZSerializer sszSerializer, int respCode, BytesValue data) {
- return new WireRemoteRpcError("Remote peer call error: code = " + respCode + ", payload: " + data);
+ return new WireRpcRemoteError("Remote peer call error: code = " + respCode + ", payload: " + data);
}
}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/beacon/BeaconPipeline.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/beacon/BeaconPipeline.java
similarity index 93%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/beacon/BeaconPipeline.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/beacon/BeaconPipeline.java
index 2d4c3c1e4..08e053b52 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/beacon/BeaconPipeline.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/beacon/BeaconPipeline.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire.channel.beacon;
+package org.ethereum.beacon.wire.impl.plain.channel.beacon;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
@@ -12,15 +12,15 @@
import org.ethereum.beacon.wire.MessageSerializer;
import org.ethereum.beacon.wire.WireApiPeer;
import org.ethereum.beacon.wire.WireApiSync;
-import org.ethereum.beacon.wire.channel.Channel;
-import org.ethereum.beacon.wire.channel.ChannelCodec;
-import org.ethereum.beacon.wire.channel.ChannelHub;
-import org.ethereum.beacon.wire.channel.IdentityChannel;
-import org.ethereum.beacon.wire.channel.RpcChannel;
-import org.ethereum.beacon.wire.channel.RpcChannelAdapter;
-import org.ethereum.beacon.wire.channel.RpcChannelClassFilter;
-import org.ethereum.beacon.wire.channel.RpcChannelMapper;
-import org.ethereum.beacon.wire.channel.RpcMessage;
+import org.ethereum.beacon.wire.impl.plain.channel.Channel;
+import org.ethereum.beacon.wire.impl.plain.channel.ChannelCodec;
+import org.ethereum.beacon.wire.impl.plain.channel.ChannelHub;
+import org.ethereum.beacon.wire.impl.plain.channel.IdentityChannel;
+import org.ethereum.beacon.wire.impl.plain.channel.RpcChannel;
+import org.ethereum.beacon.wire.impl.plain.channel.RpcChannelAdapter;
+import org.ethereum.beacon.wire.impl.plain.channel.RpcChannelClassFilter;
+import org.ethereum.beacon.wire.impl.plain.channel.RpcChannelMapper;
+import org.ethereum.beacon.wire.impl.plain.channel.RpcMessage;
import org.ethereum.beacon.wire.message.Message;
import org.ethereum.beacon.wire.message.RequestMessage;
import org.ethereum.beacon.wire.message.RequestMessagePayload;
@@ -83,7 +83,7 @@ public void initFromMessageChannel(Channel messageChannel) {
messageChannel.getCloseFuture().thenAccept(v -> System.out.println("### Closed"));
ChannelCodec<
- RpcMessage,
+ RpcMessage,
RpcMessage>
payloadCodec = new BeaconPayloadCodec(rpcMessageChannel, sszSerializer);
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/beacon/BeaconRpcMapper.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/beacon/BeaconRpcMapper.java
similarity index 85%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/beacon/BeaconRpcMapper.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/beacon/BeaconRpcMapper.java
index 20c7be6ec..00eba3951 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/beacon/BeaconRpcMapper.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/beacon/BeaconRpcMapper.java
@@ -1,8 +1,8 @@
-package org.ethereum.beacon.wire.channel.beacon;
+package org.ethereum.beacon.wire.impl.plain.channel.beacon;
import java.util.concurrent.atomic.AtomicLong;
-import org.ethereum.beacon.wire.channel.Channel;
-import org.ethereum.beacon.wire.channel.RpcChannelMapper;
+import org.ethereum.beacon.wire.impl.plain.channel.Channel;
+import org.ethereum.beacon.wire.impl.plain.channel.RpcChannelMapper;
import org.ethereum.beacon.wire.message.Message;
import org.ethereum.beacon.wire.message.RequestMessage;
import org.ethereum.beacon.wire.message.ResponseMessage;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/beacon/WireApiSubAdapter.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/beacon/WireApiSubAdapter.java
similarity index 92%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/beacon/WireApiSubAdapter.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/beacon/WireApiSubAdapter.java
index 5fe7c6a68..10cc0fbe4 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/beacon/WireApiSubAdapter.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/beacon/WireApiSubAdapter.java
@@ -1,9 +1,8 @@
-package org.ethereum.beacon.wire.channel.beacon;
+package org.ethereum.beacon.wire.impl.plain.channel.beacon;
import org.ethereum.beacon.core.BeaconBlock;
import org.ethereum.beacon.core.operations.Attestation;
import org.ethereum.beacon.wire.WireApiSub;
-import org.ethereum.beacon.wire.channel.beacon.WireApiSubRpc;
import org.reactivestreams.Publisher;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/channel/beacon/WireApiSubRpc.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/beacon/WireApiSubRpc.java
similarity index 72%
rename from wire/src/main/java/org/ethereum/beacon/wire/channel/beacon/WireApiSubRpc.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/beacon/WireApiSubRpc.java
index 9467dc40e..085c251d9 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/channel/beacon/WireApiSubRpc.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/channel/beacon/WireApiSubRpc.java
@@ -1,8 +1,7 @@
-package org.ethereum.beacon.wire.channel.beacon;
+package org.ethereum.beacon.wire.impl.plain.channel.beacon;
import org.ethereum.beacon.core.BeaconBlock;
import org.ethereum.beacon.core.operations.Attestation;
-import org.reactivestreams.Publisher;
public interface WireApiSubRpc {
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/net/Client.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/Client.java
similarity index 82%
rename from wire/src/main/java/org/ethereum/beacon/wire/net/Client.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/Client.java
index db0274d49..4168a2bb8 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/net/Client.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/Client.java
@@ -1,7 +1,7 @@
-package org.ethereum.beacon.wire.net;
+package org.ethereum.beacon.wire.impl.plain.net;
import java.util.concurrent.CompletableFuture;
-import org.ethereum.beacon.wire.channel.Channel;
+import org.ethereum.beacon.wire.impl.plain.channel.Channel;
import tech.pegasys.artemis.util.bytes.BytesValue;
/**
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/net/ConnectionManager.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/ConnectionManager.java
similarity index 97%
rename from wire/src/main/java/org/ethereum/beacon/wire/net/ConnectionManager.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/ConnectionManager.java
index b3748ff9c..9ddf05bf0 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/net/ConnectionManager.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/ConnectionManager.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire.net;
+package org.ethereum.beacon.wire.impl.plain.net;
import java.time.Duration;
import java.util.Collections;
@@ -10,7 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.schedulers.Scheduler;
-import org.ethereum.beacon.wire.channel.Channel;
+import org.ethereum.beacon.wire.impl.plain.channel.Channel;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/net/Server.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/Server.java
similarity index 89%
rename from wire/src/main/java/org/ethereum/beacon/wire/net/Server.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/Server.java
index e7104c3c9..0f0caadcd 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/net/Server.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/Server.java
@@ -1,7 +1,7 @@
-package org.ethereum.beacon.wire.net;
+package org.ethereum.beacon.wire.impl.plain.net;
import io.netty.channel.ChannelFuture;
-import org.ethereum.beacon.wire.channel.Channel;
+import org.ethereum.beacon.wire.impl.plain.channel.Channel;
import org.reactivestreams.Publisher;
import tech.pegasys.artemis.util.bytes.BytesValue;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyChannel.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/netty/NettyChannel.java
similarity index 94%
rename from wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyChannel.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/netty/NettyChannel.java
index 078be81ad..a61680009 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyChannel.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/netty/NettyChannel.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire.net.netty;
+package org.ethereum.beacon.wire.impl.plain.net.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
@@ -8,7 +8,7 @@
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.ethereum.beacon.wire.channel.Channel;
+import org.ethereum.beacon.wire.impl.plain.channel.Channel;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
@@ -16,7 +16,8 @@
import reactor.core.publisher.ReplayProcessor;
import tech.pegasys.artemis.util.bytes.BytesValue;
-public class NettyChannel extends SimpleChannelInboundHandler implements Channel {
+public class NettyChannel extends SimpleChannelInboundHandler implements
+ Channel {
private static final Logger logger = LogManager.getLogger(NettyChannel.class);
private final Consumer activeChannelListener;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyChannelInitializer.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/netty/NettyChannelInitializer.java
similarity index 96%
rename from wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyChannelInitializer.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/netty/NettyChannelInitializer.java
index 3e7fe67da..34e36997f 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyChannelInitializer.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/netty/NettyChannelInitializer.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire.net.netty;
+package org.ethereum.beacon.wire.impl.plain.net.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyClient.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/netty/NettyClient.java
similarity index 93%
rename from wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyClient.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/netty/NettyClient.java
index 154b5555b..006dff2cf 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyClient.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/netty/NettyClient.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire.net.netty;
+package org.ethereum.beacon.wire.impl.plain.net.netty;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.Bootstrap;
@@ -10,7 +10,7 @@
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import org.ethereum.beacon.wire.net.Client;
+import org.ethereum.beacon.wire.impl.plain.net.Client;
public class NettyClient implements Client {
private final NioEventLoopGroup workerGroup;
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyServer.java b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/netty/NettyServer.java
similarity index 97%
rename from wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyServer.java
rename to wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/netty/NettyServer.java
index 6e7341588..984168e69 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyServer.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/impl/plain/net/netty/NettyServer.java
@@ -1,4 +1,4 @@
-package org.ethereum.beacon.wire.net.netty;
+package org.ethereum.beacon.wire.impl.plain.net.netty;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.ServerBootstrap;
@@ -9,9 +9,10 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
+import java.util.concurrent.Executor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.ethereum.beacon.wire.net.Server;
+import org.ethereum.beacon.wire.impl.plain.net.Server;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
@@ -20,8 +21,6 @@
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.UnicastProcessor;
-import java.util.concurrent.Executor;
-
public class NettyServer implements Server {
private static final Logger logger = LogManager.getLogger(NettyServer.class);
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/message/ErrorCode.java b/wire/src/main/java/org/ethereum/beacon/wire/message/ErrorCode.java
new file mode 100644
index 000000000..5918ea42b
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/message/ErrorCode.java
@@ -0,0 +1,25 @@
+package org.ethereum.beacon.wire.message;
+
+public enum ErrorCode {
+ OK(0),
+ IvalidRequest(1),
+ ServerError(2),
+ Unknown(-1);
+
+ private final int code;
+
+ ErrorCode(int code) {
+ this.code = code;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public static ErrorCode fromCode(int code) {
+ for (ErrorCode errorCode : values()) {
+ if (errorCode.getCode() == code) return errorCode;
+ }
+ return Unknown;
+ }
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/message/payload/BlockRequestMessage.java b/wire/src/main/java/org/ethereum/beacon/wire/message/payload/BlockRequestMessage.java
new file mode 100644
index 000000000..d17590f80
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/message/payload/BlockRequestMessage.java
@@ -0,0 +1,57 @@
+package org.ethereum.beacon.wire.message.payload;
+
+import org.ethereum.beacon.core.types.SlotNumber;
+import org.ethereum.beacon.ssz.annotation.SSZ;
+import org.ethereum.beacon.ssz.annotation.SSZSerializable;
+import org.ethereum.beacon.wire.message.RequestMessagePayload;
+import tech.pegasys.artemis.ethereum.core.Hash32;
+import tech.pegasys.artemis.util.uint.UInt64;
+
+@SSZSerializable
+public class BlockRequestMessage extends RequestMessagePayload {
+ public static final int METHOD_ID = 0x0D;
+
+ @SSZ private final Hash32 headBlockRoot;
+ @SSZ private final SlotNumber startSlot;
+ @SSZ private final UInt64 count;
+ @SSZ private final UInt64 step;
+
+ public BlockRequestMessage(Hash32 headBlockRoot,
+ SlotNumber startSlot, UInt64 count, UInt64 step) {
+ this.headBlockRoot = headBlockRoot;
+ this.startSlot = startSlot;
+ this.count = count;
+ this.step = step;
+ }
+
+ @Override
+ public int getMethodId() {
+ return METHOD_ID;
+ }
+
+ public Hash32 getHeadBlockRoot() {
+ return headBlockRoot;
+ }
+
+ public SlotNumber getStartSlot() {
+ return startSlot;
+ }
+
+ public UInt64 getCount() {
+ return count;
+ }
+
+ public UInt64 getStep() {
+ return step;
+ }
+
+ @Override
+ public String toString() {
+ return "BlockRequestMessage{" +
+ "headBlockRoot=" + headBlockRoot +
+ ", startSlot=" + startSlot +
+ ", count=" + count +
+ ", step=" + step +
+ '}';
+ }
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/message/payload/BlockResponseMessage.java b/wire/src/main/java/org/ethereum/beacon/wire/message/payload/BlockResponseMessage.java
new file mode 100644
index 000000000..35e9bc52a
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/message/payload/BlockResponseMessage.java
@@ -0,0 +1,28 @@
+package org.ethereum.beacon.wire.message.payload;
+
+import java.util.List;
+import org.ethereum.beacon.core.BeaconBlock;
+import org.ethereum.beacon.ssz.annotation.SSZ;
+import org.ethereum.beacon.ssz.annotation.SSZSerializable;
+import org.ethereum.beacon.wire.message.ResponseMessagePayload;
+
+@SSZSerializable
+public class BlockResponseMessage extends ResponseMessagePayload {
+
+ @SSZ private final List blocks;
+
+ public BlockResponseMessage(List blocks) {
+ this.blocks = blocks;
+ }
+
+ public List getBlocks() {
+ return blocks;
+ }
+
+ @Override
+ public String toString() {
+ return "BlockResponseMessage{" +
+ "blocks=" + blocks +
+ '}';
+ }
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/message/payload/HelloMessage.java b/wire/src/main/java/org/ethereum/beacon/wire/message/payload/HelloMessage.java
index 0b47eeffa..c26608166 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/message/payload/HelloMessage.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/message/payload/HelloMessage.java
@@ -6,29 +6,25 @@
import org.ethereum.beacon.ssz.annotation.SSZSerializable;
import org.ethereum.beacon.wire.message.RequestMessagePayload;
import tech.pegasys.artemis.ethereum.core.Hash32;
-import tech.pegasys.artemis.util.uint.UInt64;
+import tech.pegasys.artemis.util.bytes.Bytes4;
@SSZSerializable
public class HelloMessage extends RequestMessagePayload {
public static final int METHOD_ID = 0x0;
- @SSZ(type = "uint8")
- private final int networkId;
- @SSZ private final UInt64 chainId;
- @SSZ private final Hash32 latestFinalizedRoot;
- @SSZ private final EpochNumber latestFinalizedEpoch;
- @SSZ private final Hash32 bestRoot;
- @SSZ private final SlotNumber bestSlot;
+ @SSZ private final Bytes4 fork;
+ @SSZ private final Hash32 finalizedRoot;
+ @SSZ private final EpochNumber finalizedEpoch;
+ @SSZ private final Hash32 headRoot;
+ @SSZ private final SlotNumber headSlot;
- public HelloMessage(int networkId, UInt64 chainId,
- Hash32 latestFinalizedRoot, EpochNumber latestFinalizedEpoch,
- Hash32 bestRoot, SlotNumber bestSlot) {
- this.networkId = networkId;
- this.chainId = chainId;
- this.latestFinalizedRoot = latestFinalizedRoot;
- this.latestFinalizedEpoch = latestFinalizedEpoch;
- this.bestRoot = bestRoot;
- this.bestSlot = bestSlot;
+ public HelloMessage(Bytes4 fork, Hash32 finalizedRoot,
+ EpochNumber finalizedEpoch, Hash32 headRoot, SlotNumber headSlot) {
+ this.fork = fork;
+ this.finalizedRoot = finalizedRoot;
+ this.finalizedEpoch = finalizedEpoch;
+ this.headRoot = headRoot;
+ this.headSlot = headSlot;
}
@Override
@@ -36,39 +32,34 @@ public int getMethodId() {
return METHOD_ID;
}
- public int getNetworkId() {
- return (byte) networkId;
+ public Bytes4 getFork() {
+ return fork;
}
- public UInt64 getChainId() {
- return chainId;
+ public Hash32 getFinalizedRoot() {
+ return finalizedRoot;
}
- public Hash32 getLatestFinalizedRoot() {
- return latestFinalizedRoot;
+ public EpochNumber getFinalizedEpoch() {
+ return finalizedEpoch;
}
- public EpochNumber getLatestFinalizedEpoch() {
- return latestFinalizedEpoch;
+ public Hash32 getHeadRoot() {
+ return headRoot;
}
- public Hash32 getBestRoot() {
- return bestRoot;
- }
-
- public SlotNumber getBestSlot() {
- return bestSlot;
+ public SlotNumber getHeadSlot() {
+ return headSlot;
}
@Override
public String toString() {
return "HelloMessage{" +
- "networkId=" + networkId +
- ", chainId=" + chainId +
- ", latestFinalizedRoot=" + latestFinalizedRoot +
- ", latestFinalizedEpoch=" + latestFinalizedEpoch +
- ", bestRoot=" + bestRoot +
- ", bestSlot=" + bestSlot +
+ "fork=" + fork +
+ ", finalizedRoot=" + finalizedRoot +
+ ", finalizedEpoch=" + finalizedEpoch +
+ ", headRoot=" + headRoot +
+ ", headSlot=" + headSlot +
'}';
}
}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/message/payload/RecentBlockRequestMessage.java b/wire/src/main/java/org/ethereum/beacon/wire/message/payload/RecentBlockRequestMessage.java
new file mode 100644
index 000000000..57105489b
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/message/payload/RecentBlockRequestMessage.java
@@ -0,0 +1,35 @@
+package org.ethereum.beacon.wire.message.payload;
+
+import java.util.List;
+import org.ethereum.beacon.ssz.annotation.SSZ;
+import org.ethereum.beacon.ssz.annotation.SSZSerializable;
+import org.ethereum.beacon.wire.message.RequestMessagePayload;
+import tech.pegasys.artemis.ethereum.core.Hash32;
+
+@SSZSerializable
+public class RecentBlockRequestMessage extends RequestMessagePayload {
+ public static final int METHOD_ID = 0x0E;
+
+ @SSZ private final List blockRoots;
+
+ public RecentBlockRequestMessage(
+ List blockRoots) {
+ this.blockRoots = blockRoots;
+ }
+
+ @Override
+ public int getMethodId() {
+ return METHOD_ID;
+ }
+
+ public List getBlockRoots() {
+ return blockRoots;
+ }
+
+ @Override
+ public String toString() {
+ return "RecentBlockRequestMessage{" +
+ "blockRoots=" + blockRoots +
+ '}';
+ }
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/message/payload/RecentBlockResponseMessage.java b/wire/src/main/java/org/ethereum/beacon/wire/message/payload/RecentBlockResponseMessage.java
new file mode 100644
index 000000000..374c2879a
--- /dev/null
+++ b/wire/src/main/java/org/ethereum/beacon/wire/message/payload/RecentBlockResponseMessage.java
@@ -0,0 +1,28 @@
+package org.ethereum.beacon.wire.message.payload;
+
+import java.util.List;
+import org.ethereum.beacon.core.BeaconBlock;
+import org.ethereum.beacon.ssz.annotation.SSZ;
+import org.ethereum.beacon.ssz.annotation.SSZSerializable;
+import org.ethereum.beacon.wire.message.ResponseMessagePayload;
+
+@SSZSerializable
+public class RecentBlockResponseMessage extends ResponseMessagePayload {
+
+ @SSZ private final List blocks;
+
+ public RecentBlockResponseMessage(List blocks) {
+ this.blocks = blocks;
+ }
+
+ public List getBlocks() {
+ return blocks;
+ }
+
+ @Override
+ public String toString() {
+ return "RecentBlockResponseMessage{" +
+ "blocks=" + blocks +
+ '}';
+ }
+}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/sync/SyncManagerImpl.java b/wire/src/main/java/org/ethereum/beacon/wire/sync/SyncManagerImpl.java
index 3d0424930..1b8658559 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/sync/SyncManagerImpl.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/sync/SyncManagerImpl.java
@@ -1,5 +1,19 @@
package org.ethereum.beacon.wire.sync;
+import static java.lang.Math.max;
+import static org.ethereum.beacon.chain.MutableBeaconChain.ImportResult.ExistingBlock;
+import static org.ethereum.beacon.chain.MutableBeaconChain.ImportResult.ExpiredBlock;
+import static org.ethereum.beacon.chain.MutableBeaconChain.ImportResult.InvalidBlock;
+import static org.ethereum.beacon.chain.MutableBeaconChain.ImportResult.NoParent;
+import static org.ethereum.beacon.chain.MutableBeaconChain.ImportResult.OK;
+import static org.ethereum.beacon.chain.MutableBeaconChain.ImportResult.StateMismatch;
+import static org.ethereum.beacon.stream.RxUtil.fromOptional;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.chain.BeaconTuple;
@@ -18,6 +32,7 @@
import org.ethereum.beacon.wire.WireApiSync;
import org.ethereum.beacon.wire.exceptions.WireInvalidConsensusDataException;
import org.ethereum.beacon.wire.message.payload.BlockHeadersRequestMessage;
+import org.ethereum.beacon.wire.message.payload.BlockRequestMessage;
import org.ethereum.beacon.wire.sync.SyncQueue.BlockRequest;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
@@ -27,21 +42,6 @@
import tech.pegasys.artemis.ethereum.core.Hash32;
import tech.pegasys.artemis.util.uint.UInt64s;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-
-import static java.lang.Math.max;
-import static org.ethereum.beacon.chain.MutableBeaconChain.ImportResult.ExistingBlock;
-import static org.ethereum.beacon.chain.MutableBeaconChain.ImportResult.ExpiredBlock;
-import static org.ethereum.beacon.chain.MutableBeaconChain.ImportResult.InvalidBlock;
-import static org.ethereum.beacon.chain.MutableBeaconChain.ImportResult.NoParent;
-import static org.ethereum.beacon.chain.MutableBeaconChain.ImportResult.OK;
-import static org.ethereum.beacon.chain.MutableBeaconChain.ImportResult.StateMismatch;
-import static org.ethereum.beacon.stream.RxUtil.fromOptional;
-
public class SyncManagerImpl implements SyncManager {
private static final Logger logger = LogManager.getLogger(SyncManagerImpl.class);
@@ -180,7 +180,7 @@ public void start() {
blockRequestFlux
.map(
req ->
- new BlockHeadersRequestMessage(
+ new BlockRequestMessage(
req.getStartRoot().orElse(BlockHeadersRequestMessage.NULL_START_ROOT),
req.getStartSlot().orElse(BlockHeadersRequestMessage.NULL_START_SLOT),
req.getMaxCount(),
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/sync/WireApiSyncRouter.java b/wire/src/main/java/org/ethereum/beacon/wire/sync/WireApiSyncRouter.java
index 35f595d5b..3425ebf65 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/sync/WireApiSyncRouter.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/sync/WireApiSyncRouter.java
@@ -1,11 +1,14 @@
package org.ethereum.beacon.wire.sync;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.ethereum.beacon.consensus.hasher.ObjectHasher;
+import org.ethereum.beacon.core.BeaconBlock;
import org.ethereum.beacon.stream.RxUtil;
import org.ethereum.beacon.util.Utils;
import org.ethereum.beacon.wire.Feedback;
@@ -14,12 +17,14 @@
import org.ethereum.beacon.wire.message.payload.BlockBodiesResponseMessage;
import org.ethereum.beacon.wire.message.payload.BlockHeadersRequestMessage;
import org.ethereum.beacon.wire.message.payload.BlockHeadersResponseMessage;
+import org.ethereum.beacon.wire.message.payload.BlockRequestMessage;
import org.ethereum.beacon.wire.message.payload.BlockRootsRequestMessage;
import org.ethereum.beacon.wire.message.payload.BlockRootsResponseMessage;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;
+import tech.pegasys.artemis.ethereum.core.Hash32;
/**
* Tracks and aggregates {@link WireApiSync} instances from separate peers
@@ -75,4 +80,16 @@ public CompletableFuture> requestBlockBodie
BlockBodiesRequestMessage requestMessage) {
return submitAsyncTask(api -> api.requestBlockBodies(requestMessage));
}
+
+ @Override
+ public CompletableFuture>> requestBlocks(
+ BlockRequestMessage requestMessage, ObjectHasher hasher) {
+ return submitAsyncTask(api -> api.requestBlocks(requestMessage, hasher));
+ }
+
+ @Override
+ public CompletableFuture>> requestRecentBlocks(List blockRoots,
+ ObjectHasher hasher) {
+ return submitAsyncTask(api -> api.requestRecentBlocks(blockRoots, hasher));
+ }
}
diff --git a/wire/src/test/java/org/ethereum/beacon/wire/NodeTest.java b/wire/src/test/java/org/ethereum/beacon/wire/NodeTest.java
index b8522ea8c..fba2f439b 100644
--- a/wire/src/test/java/org/ethereum/beacon/wire/NodeTest.java
+++ b/wire/src/test/java/org/ethereum/beacon/wire/NodeTest.java
@@ -1,5 +1,19 @@
package org.ethereum.beacon.wire;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.time.Duration;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import org.ethereum.beacon.chain.storage.BeaconChainStorage;
import org.ethereum.beacon.chain.storage.impl.MemBeaconChainStorageFactory;
import org.ethereum.beacon.chain.storage.util.StorageUtils;
@@ -21,13 +35,14 @@
import org.ethereum.beacon.start.common.util.SimpleDepositContract;
import org.ethereum.beacon.start.common.util.SimulateUtils;
import org.ethereum.beacon.validator.crypto.BLS381Credentials;
-import org.ethereum.beacon.wire.channel.Channel;
-import org.ethereum.beacon.wire.net.ConnectionManager;
-import org.ethereum.beacon.wire.net.netty.NettyClient;
-import org.ethereum.beacon.wire.net.netty.NettyServer;
+import org.ethereum.beacon.wire.impl.plain.channel.Channel;
+import org.ethereum.beacon.wire.impl.plain.net.ConnectionManager;
+import org.ethereum.beacon.wire.impl.plain.net.netty.NettyClient;
+import org.ethereum.beacon.wire.impl.plain.net.netty.NettyServer;
import org.ethereum.beacon.wire.sync.SyncManager;
import org.javatuples.Pair;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -35,23 +50,9 @@
import tech.pegasys.artemis.util.bytes.BytesValue;
import tech.pegasys.artemis.util.uint.UInt64;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.time.Duration;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
public class NodeTest {
+ @Ignore
@Test
public void test1() throws Exception {
Random rnd = new Random();
@@ -104,7 +105,7 @@ public void test1() throws Exception {
.stream()
.map(BLS381Credentials::createWithDummySigner)
.collect(Collectors.toList()),
- connectionManager,
+ null,
db,
chainStorage,
schedulers,
@@ -133,7 +134,7 @@ public void test1() throws Exception {
specBuilder.buildSpec(),
depositContract,
null,
- slaveConnectionManager,
+ null,
db,
chainStorage,
schedulers,
diff --git a/wire/src/test/java/org/ethereum/beacon/wire/PeersTest.java b/wire/src/test/java/org/ethereum/beacon/wire/PeersTest.java
index 0fe4cbfc8..4e87a8d0f 100644
--- a/wire/src/test/java/org/ethereum/beacon/wire/PeersTest.java
+++ b/wire/src/test/java/org/ethereum/beacon/wire/PeersTest.java
@@ -6,34 +6,34 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-
import org.ethereum.beacon.core.spec.SpecConstants;
import org.ethereum.beacon.core.spec.SpecConstantsResolver;
import org.ethereum.beacon.schedulers.Schedulers;
-import org.ethereum.beacon.start.common.Launcher;
import org.ethereum.beacon.simulator.SimulatorLauncher;
import org.ethereum.beacon.simulator.SimulatorLauncher.Builder;
import org.ethereum.beacon.ssz.SSZBuilder;
import org.ethereum.beacon.ssz.SSZSerializer;
+import org.ethereum.beacon.start.common.Launcher;
import org.ethereum.beacon.stream.SimpleProcessor;
-import org.ethereum.beacon.wire.channel.Channel;
+import org.ethereum.beacon.wire.impl.plain.SimplePeerManagerImpl;
+import org.ethereum.beacon.wire.impl.plain.channel.Channel;
+import org.ethereum.beacon.wire.impl.plain.net.ConnectionManager;
+import org.ethereum.beacon.wire.impl.plain.net.Server;
+import org.ethereum.beacon.wire.impl.plain.net.netty.NettyClient;
+import org.ethereum.beacon.wire.impl.plain.net.netty.NettyServer;
import org.ethereum.beacon.wire.message.SSZMessageSerializer;
-import org.ethereum.beacon.wire.net.ConnectionManager;
-import org.ethereum.beacon.wire.net.netty.NettyClient;
-import org.ethereum.beacon.wire.net.netty.NettyServer;
-import org.ethereum.beacon.wire.net.Server;
import org.ethereum.beacon.wire.sync.BeaconBlockTree;
import org.ethereum.beacon.wire.sync.SyncManagerImpl;
import org.ethereum.beacon.wire.sync.SyncQueue;
import org.ethereum.beacon.wire.sync.SyncQueueImpl;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import tech.pegasys.artemis.util.bytes.BytesValue;
-import tech.pegasys.artemis.util.uint.UInt64;
public class PeersTest {
@@ -66,6 +66,7 @@ public void test01() {
sink.complete();
}
+ @Ignore
@Test
public void test1() throws Exception {
int slotCount = 32;
@@ -93,8 +94,6 @@ public void test1() throws Exception {
MessageSerializer messageSerializer = new SSZMessageSerializer(ssz);
WireApiSyncServer syncServer = new WireApiSyncServer(peer0.getBeaconChainStorage());
SimplePeerManagerImpl peerManager = new SimplePeerManagerImpl(
- (byte) 1,
- UInt64.valueOf(1),
connectionManager.channelsStream(),
ssz,
peer0.getSpec(),
@@ -107,7 +106,7 @@ public void test1() throws Exception {
.subscribe(
peer -> {
System.out.println("Remote peer connected: " + peer);
- Flux.from(peer.getRawChannel().inboundMessageStream())
+ Flux.from(((Channel)peer.getConnection()).inboundMessageStream())
.doOnError(e -> System.out.println("#### Error: " + e))
.doOnComplete(() -> System.out.println("#### Complete"))
.doOnNext(msg -> System.out.println("#### on message"))
@@ -131,8 +130,6 @@ public void test1() throws Exception {
.buildSerializer();
MessageSerializer messageSerializer = new SSZMessageSerializer(ssz);
SimplePeerManagerImpl peerManager = new SimplePeerManagerImpl(
- (byte) 1,
- UInt64.valueOf(1),
connectionManager.channelsStream(),
ssz,
peer1.getSpec(),
diff --git a/wire/src/test/java/org/ethereum/beacon/wire/WireApiSubRouterTest.java b/wire/src/test/java/org/ethereum/beacon/wire/WireApiSubRouterTest.java
index 769cc5b4c..cc0182b0e 100644
--- a/wire/src/test/java/org/ethereum/beacon/wire/WireApiSubRouterTest.java
+++ b/wire/src/test/java/org/ethereum/beacon/wire/WireApiSubRouterTest.java
@@ -9,7 +9,8 @@
import org.ethereum.beacon.core.operations.Attestation;
import org.ethereum.beacon.core.util.TestDataFactory;
import org.ethereum.beacon.wire.WireApiSubRouterTest.TestRouter.Connection;
-import org.ethereum.beacon.wire.channel.beacon.WireApiSubAdapter;
+import org.ethereum.beacon.wire.impl.plain.WireApiSubRouter;
+import org.ethereum.beacon.wire.impl.plain.channel.beacon.WireApiSubAdapter;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Flux;
diff --git a/wire/src/test/java/org/ethereum/beacon/wire/channel/BeaconPipelineChannelTest.java b/wire/src/test/java/org/ethereum/beacon/wire/channel/BeaconPipelineChannelTest.java
index e543987ec..8400faac0 100644
--- a/wire/src/test/java/org/ethereum/beacon/wire/channel/BeaconPipelineChannelTest.java
+++ b/wire/src/test/java/org/ethereum/beacon/wire/channel/BeaconPipelineChannelTest.java
@@ -6,8 +6,6 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.ethereum.beacon.core.types.SlotNumber;
import org.ethereum.beacon.schedulers.ControlledSchedulers;
@@ -17,7 +15,8 @@
import org.ethereum.beacon.ssz.SSZSerializer;
import org.ethereum.beacon.wire.Feedback;
import org.ethereum.beacon.wire.WireApiSync;
-import org.ethereum.beacon.wire.channel.beacon.BeaconPipeline;
+import org.ethereum.beacon.wire.impl.plain.channel.Channel;
+import org.ethereum.beacon.wire.impl.plain.channel.beacon.BeaconPipeline;
import org.ethereum.beacon.wire.message.Message;
import org.ethereum.beacon.wire.message.payload.BlockBodiesRequestMessage;
import org.ethereum.beacon.wire.message.payload.BlockBodiesResponseMessage;
@@ -28,7 +27,6 @@
import org.ethereum.beacon.wire.message.payload.BlockRootsResponseMessage.BlockRootSlot;
import org.junit.Assert;
import org.junit.Test;
-import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
diff --git a/wire/src/test/java/org/ethereum/beacon/wire/net/ConnectionManagerTest.java b/wire/src/test/java/org/ethereum/beacon/wire/net/ConnectionManagerTest.java
index e6bb3ec46..02710cb3e 100644
--- a/wire/src/test/java/org/ethereum/beacon/wire/net/ConnectionManagerTest.java
+++ b/wire/src/test/java/org/ethereum/beacon/wire/net/ConnectionManagerTest.java
@@ -7,13 +7,14 @@
import java.util.concurrent.CompletableFuture;
import org.ethereum.beacon.schedulers.ControlledSchedulers;
import org.ethereum.beacon.schedulers.Schedulers;
-import org.ethereum.beacon.wire.channel.Channel;
+import org.ethereum.beacon.wire.impl.plain.channel.Channel;
+import org.ethereum.beacon.wire.impl.plain.net.Client;
+import org.ethereum.beacon.wire.impl.plain.net.ConnectionManager;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
import tech.pegasys.artemis.util.bytes.BytesValue;
public class ConnectionManagerTest {
diff --git a/wire/src/test/java/org/ethereum/beacon/wire/net/NettyChannelTest.java b/wire/src/test/java/org/ethereum/beacon/wire/net/NettyChannelTest.java
index c227702df..0d65a083c 100644
--- a/wire/src/test/java/org/ethereum/beacon/wire/net/NettyChannelTest.java
+++ b/wire/src/test/java/org/ethereum/beacon/wire/net/NettyChannelTest.java
@@ -6,9 +6,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import org.ethereum.beacon.wire.net.netty.NettyChannel;
-import org.ethereum.beacon.wire.net.netty.NettyClient;
-import org.ethereum.beacon.wire.net.netty.NettyServer;
+import org.ethereum.beacon.wire.impl.plain.net.netty.NettyChannel;
+import org.ethereum.beacon.wire.impl.plain.net.netty.NettyClient;
+import org.ethereum.beacon.wire.impl.plain.net.netty.NettyServer;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Flux;