diff --git a/start/common/build.gradle b/start/common/build.gradle
index 2d8aca98f..1d990d8b5 100644
--- a/start/common/build.gradle
+++ b/start/common/build.gradle
@@ -14,6 +14,10 @@ dependencies {
implementation 'com.google.guava:guava'
implementation 'io.projectreactor:reactor-core'
+ implementation 'io.prometheus:simpleclient'
+ implementation 'io.prometheus:simpleclient_hotspot'
+ implementation 'io.prometheus:simpleclient_httpserver'
+
testImplementation 'org.mockito:mockito-core'
testImplementation 'org.apache.logging.log4j:log4j-core'
diff --git a/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java b/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java
new file mode 100644
index 000000000..c451e8eca
--- /dev/null
+++ b/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java
@@ -0,0 +1,204 @@
+package org.ethereum.beacon.node.metrics;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.exporter.HTTPServer;
+import org.ethereum.beacon.chain.observer.ObservableBeaconState;
+import org.ethereum.beacon.consensus.BeaconChainSpec;
+import org.ethereum.beacon.consensus.BeaconStateEx;
+import org.ethereum.beacon.core.operations.Attestation;
+import org.ethereum.beacon.core.operations.attestation.AttestationData;
+import org.ethereum.beacon.core.state.PendingAttestation;
+import org.ethereum.beacon.core.types.SlotNumber;
+import tech.pegasys.artemis.util.collections.Bitlist;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implements (some) metrics from Beacon chain metrics specs.
+ * @See
+ *
+ */
+public class Metrics {
+ static Gauge PEERS =
+ Gauge.build().name("beaconchain_peers").help("Tracks number of peers").register();
+ static Gauge CURRENT_SLOT =
+ Gauge.build()
+ .name("beaconchain_current_slot")
+ .help("Latest slot recorded by the beacon chain")
+ .register();
+ // extra metric, not required by spec
+ static Gauge CURRENT_EPOCH =
+ Gauge.build()
+ .name("beaconchain_current_epoch")
+ .help("Latest epoch recorded by the beacon chain")
+ .register();
+ static Gauge CURRENT_JUSTIFIED_EPOCH =
+ Gauge.build()
+ .name("beaconchain_current_justified_epoch")
+ .help("Current justified epoch")
+ .register();
+ static Gauge CURRENT_FINALIZED_EPOCH =
+ Gauge.build()
+ .name("beaconchain_current_finalized_epoch")
+ .help("Current finalized epoch")
+ .register();
+ static Gauge CURRENT_PREV_JUSTIFIED_EPOCH =
+ Gauge.build()
+ .name("beaconchain_current_prev_justified_epoch")
+ .help("Current previously justified epoch")
+ .register();
+ // the metric implementation is probably partially correct, since it accounts only validators,
+ // which have been included in the chain
+ static Gauge CURRENT_EPOCH_LIVE_VALIDATORS =
+ Gauge.build()
+ .name("beaconchain_current_epoch_live_validators")
+ .help("Number of active validators who reported for the current epoch")
+ .register();
+ // the metric implementation is probably partially correct, since it accounts only validators,
+ // which have been included in the chain
+ static Gauge PREVIOUS_EPOCH_LIVE_VALIDATORS =
+ Gauge.build()
+ .name("beaconchain_previous_epoch_live_validators")
+ .help("Number of active validators who reported for the previous epoch")
+ .register();
+ // not yet implemented
+ static Counter REORG_EVENTS_TOTAL =
+ Counter.build()
+ .name("beaconchain_reorg_events_total")
+ .help("Occurrence of a reorganization of the chain")
+ .register();
+ // not yet supported by beacon chain yet
+ static Gauge PENDING_DEPOSITS =
+ Gauge.build()
+ .name("beaconchain_pending_deposits")
+ .help("Number of pending deposits")
+ .register();
+ // not yet supported by beacon chain yet
+ static Gauge PENDING_EXITS =
+ Gauge.build().name("beaconchain_pending_exits").help("Number of pending exits").register();
+ static Gauge TOTAL_DEPOSITS =
+ Gauge.build().name("beaconchain_total_deposits").help("Number of total deposits").register();
+ // not yet implemented
+ static Gauge PREVIOUS_EPOCH_STALE_BLOCKS =
+ Gauge.build()
+ .name("beaconchain_previous_epoch_stale_blocks")
+ .help("Number of blocks not included into canonical chain in the previous epoch")
+ .register();
+ // simple implementation currently - counts attestations arrived from wire during a slot
+ static Gauge PROPAGATED_ATTESTATIONS =
+ Gauge.build()
+ .name("beaconchain_propagated_attestations")
+ .help("Number of distinct attestations to a slot received from the wire")
+ .register();
+ private static HTTPServer metricsServer;
+
+ private static final Object attestation_lock = new Object();
+ private static SlotNumber currentSlot = null;
+ /**
+ * Accumulates atetstations received during a slot. Cleared on a new slot.
+ */
+ private static final Map currentSlotAttestations = new HashMap<>();
+
+ // setting initial values explicitly (zeros by default)
+ static {
+ PEERS.set(0);
+ CURRENT_SLOT.set(Double.NaN);
+ CURRENT_EPOCH.set(Double.NaN);
+ CURRENT_JUSTIFIED_EPOCH.set(Double.NaN);
+ CURRENT_FINALIZED_EPOCH.set(Double.NaN);
+ CURRENT_PREV_JUSTIFIED_EPOCH.set(Double.NaN);
+ CURRENT_EPOCH_LIVE_VALIDATORS.set(Double.NaN);
+ PREVIOUS_EPOCH_LIVE_VALIDATORS.set(Double.NaN);
+ PENDING_DEPOSITS.set(0);
+ PENDING_EXITS.set(0);
+ TOTAL_DEPOSITS.set(Double.NaN);
+ PREVIOUS_EPOCH_STALE_BLOCKS.set(Double.NaN);
+ PROPAGATED_ATTESTATIONS.set(Double.NaN);
+ }
+
+ public static void startMetricsServer(String host, int port) {
+ if (metricsServer != null) {
+ throw new IllegalStateException("Metrics server already started");
+ }
+ try {
+ metricsServer = new HTTPServer(host, port);
+ } catch (IOException e) {
+ new RuntimeException("Cannot start metrics server", e);
+ }
+ }
+
+ public static void stopMetricsServer() {
+ if (metricsServer == null) {
+ throw new IllegalStateException("Metrics server has not been started");
+ } else {
+ metricsServer.stop();
+ metricsServer = null;
+ }
+ }
+
+ public static void peerAdded() {
+ PEERS.inc();
+ }
+
+ public static void peerRemoved() {
+ PEERS.dec();
+ }
+
+ public static void onNewState(BeaconChainSpec spec, ObservableBeaconState obs) {
+ BeaconStateEx state = obs.getLatestSlotState();
+
+ synchronized (attestation_lock) {
+ if (currentSlot == null) {
+ currentSlot = state.getSlot();
+ } else if (currentSlot.less(state.getSlot())) {
+ currentSlot = state.getSlot();
+ currentSlotAttestations.clear();
+ PROPAGATED_ATTESTATIONS.set(0);
+ }
+ }
+
+ CURRENT_SLOT.set(state.getSlot().doubleValue());
+ CURRENT_EPOCH.set(spec.get_current_epoch(state).doubleValue());
+ CURRENT_JUSTIFIED_EPOCH.set(state.getCurrentJustifiedCheckpoint().getEpoch().doubleValue());
+ CURRENT_FINALIZED_EPOCH.set(state.getFinalizedCheckpoint().getEpoch().doubleValue());
+ CURRENT_PREV_JUSTIFIED_EPOCH.set(
+ state.getPreviousJustifiedCheckpoint().getEpoch().doubleValue());
+
+ CURRENT_EPOCH_LIVE_VALIDATORS.set(
+ calculateEpochValidators(state.getCurrentEpochAttestations().listCopy()));
+ PREVIOUS_EPOCH_LIVE_VALIDATORS.set(
+ calculateEpochValidators(state.getPreviousEpochAttestations().listCopy()));
+
+ TOTAL_DEPOSITS.set(state.getEth1Data().getDepositCount().doubleValue());
+ }
+
+ private static int calculateEpochValidators(List epochAttestations) {
+ return epochAttestations.stream()
+ .mapToInt(pa -> pa.getAggregationBits().getBits().size())
+ .sum();
+ }
+
+ public static void attestationPropagated(Attestation attestation) {
+ Bitlist aggregationBits = attestation.getAggregationBits();
+ AttestationData attestationData = attestation.getData();
+
+ int attestationCount;
+ synchronized (attestation_lock) {
+ Bitlist attestations;
+ if (!currentSlotAttestations.containsKey(attestationData)) {
+ attestations = aggregationBits.cappedCopy(aggregationBits.maxSize());
+ } else {
+ attestations = currentSlotAttestations.get(attestationData).or(aggregationBits);
+ }
+ currentSlotAttestations.put(attestationData, attestations);
+ attestationCount =
+ currentSlotAttestations.values().stream().mapToInt(bl -> bl.getBits().size()).sum();
+ }
+
+ PROPAGATED_ATTESTATIONS.set(attestationCount);
+ }
+}
diff --git a/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java b/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java
index 317d59ada..6456b085d 100644
--- a/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java
+++ b/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java
@@ -1,10 +1,5 @@
package org.ethereum.beacon.start.common;
-import static org.ethereum.beacon.chain.observer.ObservableStateProcessorImpl.DEFAULT_EMPTY_SLOT_TRANSITIONS_LIMIT;
-
-import java.nio.file.Paths;
-import java.time.Duration;
-import java.util.List;
import org.ethereum.beacon.chain.DefaultBeaconChain;
import org.ethereum.beacon.chain.MutableBeaconChain;
import org.ethereum.beacon.chain.ProposedBlockProcessor;
@@ -29,14 +24,15 @@
import org.ethereum.beacon.core.spec.SpecConstants;
import org.ethereum.beacon.core.spec.SpecConstantsResolver;
import org.ethereum.beacon.db.Database;
+import org.ethereum.beacon.node.metrics.Metrics;
import org.ethereum.beacon.pow.DepositContract;
import org.ethereum.beacon.schedulers.Schedulers;
import org.ethereum.beacon.ssz.SSZBuilder;
import org.ethereum.beacon.ssz.SSZSerializer;
import org.ethereum.beacon.validator.BeaconChainProposer;
-import org.ethereum.beacon.validator.local.MultiValidatorService;
import org.ethereum.beacon.validator.attester.BeaconChainAttesterImpl;
import org.ethereum.beacon.validator.crypto.BLS381Credentials;
+import org.ethereum.beacon.validator.local.MultiValidatorService;
import org.ethereum.beacon.validator.proposer.BeaconChainProposerImpl;
import org.ethereum.beacon.wire.Feedback;
import org.ethereum.beacon.wire.MessageSerializer;
@@ -55,6 +51,12 @@
import reactor.core.publisher.Mono;
import tech.pegasys.artemis.util.uint.UInt64;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.List;
+
+import static org.ethereum.beacon.chain.observer.ObservableStateProcessorImpl.DEFAULT_EMPTY_SLOT_TRANSITIONS_LIMIT;
+
public class NodeLauncher {
private final static long DB_BUFFER_SIZE = 64L << 20; // 64Mb
@@ -62,6 +64,7 @@ public class NodeLauncher {
private final BeaconChainSpec spec;
private final DepositContract depositContract;
private final List validatorCred;
+ private final String dbPrefix;
private final BeaconChainStorageFactory storageFactory;
private final Schedulers schedulers;
@@ -101,6 +104,7 @@ public NodeLauncher(
DepositContract depositContract,
List validatorCred,
ConnectionManager> connectionManager,
+ String dbPrefix,
BeaconChainStorageFactory storageFactory,
Schedulers schedulers,
boolean startSyncManager) {
@@ -109,6 +113,7 @@ public NodeLauncher(
this.depositContract = depositContract;
this.validatorCred = validatorCred;
this.connectionManager = connectionManager;
+ this.dbPrefix = dbPrefix;
this.storageFactory = storageFactory;
this.schedulers = schedulers;
this.startSyncManager = startSyncManager;
@@ -127,7 +132,13 @@ void chainStarted(ChainStart chainStartEvent) {
new ExtendedSlotTransition(perEpochTransition, perSlotTransition, spec);
emptySlotTransition = new EmptySlotTransition(extendedSlotTransition);
- db = Database.rocksDB(Paths.get(computeDbName(chainStartEvent)).toString(), DB_BUFFER_SIZE);
+ if (dbPrefix == null) {
+ db = Database.inMemoryDB();
+ } else {
+ db =
+ Database.rocksDB(
+ Paths.get(computeDbName(dbPrefix, chainStartEvent)).toString(), DB_BUFFER_SIZE);
+ }
beaconChainStorage = storageFactory.create(db);
blockVerifier = BeaconBlockVerifier.createDefault(spec);
@@ -161,6 +172,12 @@ void chainStarted(ChainStart chainStartEvent) {
emptySlotTransition,
schedulers,
validatorCred != null ? Integer.MAX_VALUE : DEFAULT_EMPTY_SLOT_TRANSITIONS_LIMIT);
+
+ Flux.from(observableStateProcessor.getObservableStateStream())
+ .subscribe(
+ obs -> {
+ Metrics.onNewState(spec, obs);
+ });
observableStateProcessor.start();
SSZSerializer ssz = new SSZBuilder()
@@ -184,6 +201,12 @@ void chainStarted(ChainStart chainStartEvent) {
wireApiSub = peerManager.getWireApiSub();
wireApiSyncRemote = peerManager.getWireApiSync();
+ Flux.from(wireApiSub.inboundAttestationsStream())
+ .subscribe(
+ a -> {
+ Metrics.attestationPropagated(a);
+ });
+
blockTree = new BeaconBlockTree(spec.getObjectHasher());
syncQueue = new SyncQueueImpl(blockTree);
@@ -239,10 +262,12 @@ void chainStarted(ChainStart chainStartEvent) {
// .subscribe(beaconChain::insert);
}
- private String computeDbName(ChainStart chainStart) {
+ private String computeDbName(String dbPrefix, ChainStart chainStart) {
return String.format(
- "db_start_time_%d_dep_root_%s",
- chainStart.getTime().getValue(), chainStart.getEth1Data().getDepositRoot().toStringShort());
+ "%s_start_time_%d_dep_root_%s",
+ dbPrefix,
+ chainStart.getTime().getValue(),
+ chainStart.getEth1Data().getDepositRoot().toStringShort());
}
public void stop() {
diff --git a/start/config/src/main/java/org/ethereum/beacon/emulator/config/main/Configuration.java b/start/config/src/main/java/org/ethereum/beacon/emulator/config/main/Configuration.java
index 1f6eb6ada..0b6363756 100644
--- a/start/config/src/main/java/org/ethereum/beacon/emulator/config/main/Configuration.java
+++ b/start/config/src/main/java/org/ethereum/beacon/emulator/config/main/Configuration.java
@@ -10,6 +10,7 @@ public class Configuration {
private String db;
private List networks = new ArrayList<>();
private Validator validator;
+ String metricsEndpoint;
public String getName() {
return name;
@@ -42,4 +43,12 @@ public Validator getValidator() {
public void setValidator(Validator validator) {
this.validator = validator;
}
+
+ public String getMetricsEndpoint() {
+ return metricsEndpoint;
+ }
+
+ public void setMetricsEndpoint(String metricsEndpoint) {
+ this.metricsEndpoint = metricsEndpoint;
+ }
}
diff --git a/start/node/src/main/java/org/ethereum/beacon/node/Node.java b/start/node/src/main/java/org/ethereum/beacon/node/Node.java
index 5ba769354..399156008 100644
--- a/start/node/src/main/java/org/ethereum/beacon/node/Node.java
+++ b/start/node/src/main/java/org/ethereum/beacon/node/Node.java
@@ -98,6 +98,17 @@ public class Node implements Runnable {
)
private String specConstantsFile;
+ @CommandLine.Option(
+ names = "--metrics-endpoint",
+ paramLabel = "matrics-endpoint",
+ description = {
+ "Interface and port, Prometheus collection endpoint will be served from.",
+ "Should have form of interface:port.",
+ "Default endpoint is 0.0.0.0:8008."
+ }
+ )
+ private String metricsEndpoint;
+
public String getName() {
return name;
}
@@ -122,6 +133,10 @@ public String getSpecConstantsFile() {
return specConstantsFile;
}
+ public String getMetricsEndpoint() {
+ return metricsEndpoint;
+ }
+
public static void main(String[] args) {
try {
CommandLine commandLine = new CommandLine(new Node());
diff --git a/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java b/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java
index 8eabcc51b..d18cffe5b 100644
--- a/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java
+++ b/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java
@@ -1,20 +1,6 @@
package org.ethereum.beacon.node;
import io.netty.channel.ChannelFutureListener;
-import java.io.File;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Random;
-import java.util.TimeZone;
-import java.util.concurrent.ThreadFactory;
-import java.util.stream.IntStream;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -41,6 +27,7 @@
import org.ethereum.beacon.emulator.config.main.conract.EmulatorContract;
import org.ethereum.beacon.emulator.config.main.network.NettyNetwork;
import org.ethereum.beacon.emulator.config.main.network.Network;
+import org.ethereum.beacon.node.metrics.Metrics;
import org.ethereum.beacon.pow.DepositContract;
import org.ethereum.beacon.schedulers.DefaultSchedulers;
import org.ethereum.beacon.schedulers.Scheduler;
@@ -53,9 +40,24 @@
import org.ethereum.beacon.wire.net.netty.NettyClient;
import org.ethereum.beacon.wire.net.netty.NettyServer;
import org.jetbrains.annotations.NotNull;
+import reactor.core.publisher.Flux;
+import java.io.File;
import java.lang.reflect.InvocationTargetException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
import java.nio.file.Paths;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.TimeZone;
+import java.util.concurrent.ThreadFactory;
+import java.util.stream.IntStream;
public class NodeCommandLauncher implements Runnable {
private static final Logger logger = LogManager.getLogger("node");
@@ -174,16 +176,41 @@ protected ThreadFactory createThreadFactory(String namePattern) {
URI uri = URI.create(addr);
tcpConnectionManager.addActivePeer(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort()));
}
+
+ Flux.from(connectionManager.channelsStream())
+ .subscribe(
+ ch -> {
+ Metrics.peerAdded();
+ ch.getCloseFuture().thenAccept(v -> Metrics.peerRemoved());
+ });
} else {
throw new IllegalArgumentException(
"This type of network is not supported yet: " + networkCfg.getClass());
}
+ String metricsEndpoint = config.getConfig().getMetricsEndpoint();
+ String metricsHost;
+ int metricsPort;
+ if (metricsEndpoint == null) {
+ metricsHost = "0.0.0.0";
+ metricsPort = 8008;
+ } else {
+ String[] parts = metricsEndpoint.split(":");
+ if (parts.length != 2) {
+ throw new IllegalArgumentException(
+ "Wrong metrics endpoint format: \"" + metricsEndpoint + "\"");
+ }
+ metricsHost = parts[0];
+ metricsPort = Integer.parseInt(parts[1]);
+ }
+ Metrics.startMetricsServer(metricsHost, metricsPort);
+
NodeLauncher node = new NodeLauncher(
specBuilder.buildSpec(),
this.depositContract,
credentials,
connectionManager,
+ config.getConfig().getDb(),
new SSZBeaconChainStorageFactory(
spec.getObjectHasher(),
SerializerFactory.createSSZ(specConstants)),
@@ -346,6 +373,10 @@ public NodeCommandLauncher build() {
}
}
+ if (cliOptions.getMetricsEndpoint() != null) {
+ config.getConfig().setMetricsEndpoint(cliOptions.getMetricsEndpoint());
+ }
+
return new NodeCommandLauncher(
config,
specBuilder,
diff --git a/start/node/src/main/resources/config/default-node-config.yml b/start/node/src/main/resources/config/default-node-config.yml
index dad4809a9..c8c2e40b7 100644
--- a/start/node/src/main/resources/config/default-node-config.yml
+++ b/start/node/src/main/resources/config/default-node-config.yml
@@ -1,5 +1,5 @@
config:
- db: file://db
+ db: db
networks:
- type: netty
diff --git a/start/node/src/main/resources/config/sample-node-config.yml b/start/node/src/main/resources/config/sample-node-config.yml
index 51c6cca71..1114c5d80 100644
--- a/start/node/src/main/resources/config/sample-node-config.yml
+++ b/start/node/src/main/resources/config/sample-node-config.yml
@@ -1,6 +1,6 @@
config:
# location of database
- db: file://db
+ db: db
# the list of networks
networks:
diff --git a/versions.gradle b/versions.gradle
index 73d0030f3..7deb8ce43 100644
--- a/versions.gradle
+++ b/versions.gradle
@@ -40,5 +40,9 @@ dependencyManagement {
dependency "org.rocksdb:rocksdbjni:6.2.2"
dependency "com.googlecode.concurrent-locks:concurrent-locks:1.0.0"
+
+ dependency "io.prometheus:simpleclient:0.6.0"
+ dependency "io.prometheus:simpleclient_hotspot:0.6.0"
+ dependency "io.prometheus:simpleclient_httpserver:0.6.0"
}
}
diff --git a/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyServer.java b/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyServer.java
index 3bdace966..6e7341588 100644
--- a/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyServer.java
+++ b/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyServer.java
@@ -9,23 +9,41 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
-import java.util.concurrent.Executor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.wire.net.Server;
+import org.jetbrains.annotations.NotNull;
+import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.UnicastProcessor;
+import java.util.concurrent.Executor;
+
public class NettyServer implements Server {
private static final Logger logger = LogManager.getLogger(NettyServer.class);
private UnicastProcessor channels = UnicastProcessor.create();
+ private Publisher channelsDispatcher = createDispatcherProcessor(channels);
private FluxSink channelsSink = channels.sink();
private final int port;
private ChannelFuture channelFuture;
private final NioEventLoopGroup workerGroup;
+ /**
+ * UnicastProcessor allows single subscriber only.
+ * Work around by attaching a DirectProcessor to it.
+ */
+ @NotNull
+ private static Publisher createDispatcherProcessor(
+ Publisher channels) {
+ Processor processor = ReplayProcessor.create();
+ Flux.from(channels).subscribe(processor);
+ return processor;
+ }
+
public NettyServer(int port, NioEventLoopGroup workerGroup) {
this.port = port;
this.workerGroup = workerGroup;
@@ -42,7 +60,7 @@ public NettyServer(int port) {
@Override
public Publisher channelsStream() {
- return channels;
+ return channelsDispatcher;
}
private void onChannelActive(NettyChannel channel) {
diff --git a/wire/src/test/java/org/ethereum/beacon/wire/NodeTest.java b/wire/src/test/java/org/ethereum/beacon/wire/NodeTest.java
index 78ad3814d..177bd1335 100644
--- a/wire/src/test/java/org/ethereum/beacon/wire/NodeTest.java
+++ b/wire/src/test/java/org/ethereum/beacon/wire/NodeTest.java
@@ -91,6 +91,7 @@ public void test1() throws Exception {
.map(BLS381Credentials::createWithDummySigner)
.collect(Collectors.toList()),
connectionManager,
+ null,
new MemBeaconChainStorageFactory(spec.getObjectHasher()),
schedulers,
false);
@@ -113,6 +114,7 @@ public void test1() throws Exception {
depositContract,
null,
slaveConnectionManager,
+ null,
new MemBeaconChainStorageFactory(spec.getObjectHasher()),
schedulers,
true);