From 79dcb173d276e5c4ff71e998b02620d0112da899 Mon Sep 17 00:00:00 2001 From: Alex Vlasov/ericsson49 Date: Mon, 2 Sep 2019 23:33:58 +0300 Subject: [PATCH 1/5] Initial metrics implementation. Some metrics are not yet implemented. --- start/common/build.gradle | 4 + .../ethereum/beacon/node/metrics/Metrics.java | 167 ++++++++++++++++++ .../beacon/start/common/NodeLauncher.java | 26 ++- .../emulator/config/main/Configuration.java | 9 + .../java/org/ethereum/beacon/node/Node.java | 15 ++ .../beacon/node/NodeCommandLauncher.java | 58 ++++-- versions.gradle | 4 + .../beacon/wire/net/netty/NettyServer.java | 21 ++- 8 files changed, 282 insertions(+), 22 deletions(-) create mode 100644 start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java diff --git a/start/common/build.gradle b/start/common/build.gradle index 2d8aca98f..1d990d8b5 100644 --- a/start/common/build.gradle +++ b/start/common/build.gradle @@ -14,6 +14,10 @@ dependencies { implementation 'com.google.guava:guava' implementation 'io.projectreactor:reactor-core' + implementation 'io.prometheus:simpleclient' + implementation 'io.prometheus:simpleclient_hotspot' + implementation 'io.prometheus:simpleclient_httpserver' + testImplementation 'org.mockito:mockito-core' testImplementation 'org.apache.logging.log4j:log4j-core' diff --git a/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java b/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java new file mode 100644 index 000000000..04ddb3dd1 --- /dev/null +++ b/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java @@ -0,0 +1,167 @@ +package org.ethereum.beacon.node.metrics; + +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; +import io.prometheus.client.exporter.HTTPServer; +import org.ethereum.beacon.chain.observer.ObservableBeaconState; +import org.ethereum.beacon.consensus.BeaconChainSpec; +import org.ethereum.beacon.consensus.BeaconStateEx; +import org.ethereum.beacon.core.operations.Attestation; +import org.ethereum.beacon.core.state.PendingAttestation; + +import java.io.IOException; +import java.util.List; + +/** + * Implements (some) metrics from Beacon chain metrics specs. + * @See + * + */ +public class Metrics { + static Gauge PEERS = + Gauge.build().name("beaconchain_peers").help("Tracks number of peers").register(); + static Gauge CURRENT_SLOT = + Gauge.build() + .name("beaconchain_current_slot") + .help("Latest slot recorded by the beacon chain") + .register(); + // extra metric, not required by spec + static Gauge CURRENT_EPOCH = + Gauge.build() + .name("beaconchain_current_epoch") + .help("Latest epoch recorded by the beacon chain") + .register(); + static Gauge CURRENT_JUSTIFIED_EPOCH = + Gauge.build() + .name("beaconchain_current_justified_epoch") + .help("Current justified epoch") + .register(); + static Gauge CURRENT_FINALIZED_EPOCH = + Gauge.build() + .name("beaconchain_current_finalized_epoch") + .help("Current finalized epoch") + .register(); + static Gauge CURRENT_PREV_JUSTIFIED_EPOCH = + Gauge.build() + .name("beaconchain_current_prev_justified_epoch") + .help("Current previously justified epoch") + .register(); + // the metric implementation is probably partially correct, since it accounts only validators, + // which have been included in the chain + static Gauge CURRENT_EPOCH_LIVE_VALIDATORS = + Gauge.build() + .name("beaconchain_current_epoch_live_validators") + .help("Number of active validators who reported for the current epoch") + .register(); + // the metric implementation is probably partially correct, since it accounts only validators, + // which have been included in the chain + static Gauge PREVIOUS_EPOCH_LIVE_VALIDATORS = + Gauge.build() + .name("beaconchain_previous_epoch_live_validators") + .help("Number of active validators who reported for the previous epoch") + .register(); + // not yet implemented + static Counter REORG_EVENTS_TOTAL = + Counter.build() + .name("beaconchain_reorg_events_total") + .help("Occurrence of a reorganization of the chain") + .register(); + // not yet supported by beacon chain yet + static Gauge PENDING_DEPOSITS = + Gauge.build() + .name("beaconchain_pending_deposits") + .help("Number of pending deposits") + .register(); + // not yet supported by beacon chain yet + static Gauge PENDING_EXITS = + Gauge.build().name("beaconchain_pending_exits").help("Number of pending exits").register(); + static Gauge TOTAL_DEPOSITS = + Gauge.build().name("beaconchain_total_deposits").help("Number of total deposits").register(); + // not yet implemented + static Gauge PREVIOUS_EPOCH_STALE_BLOCKS = + Gauge.build() + .name("beaconchain_previous_epoch_stale_blocks") + .help("Number of blocks not included into canonical chain in the previous epoch") + .register(); + // not yet implemented + static Gauge PROPAGATED_ATTESTATIONS = + Gauge.build() + .name("beaconchain_propagated_attestations") + .help("Number of distinct attestations to a slot received from the wire") + .register(); + private static HTTPServer metricsServer; + + // setting initial values explicitly (zeros by default) + static { + PEERS.set(0); + CURRENT_SLOT.set(Double.NaN); + CURRENT_EPOCH.set(Double.NaN); + CURRENT_JUSTIFIED_EPOCH.set(Double.NaN); + CURRENT_FINALIZED_EPOCH.set(Double.NaN); + CURRENT_PREV_JUSTIFIED_EPOCH.set(Double.NaN); + CURRENT_EPOCH_LIVE_VALIDATORS.set(Double.NaN); + PREVIOUS_EPOCH_LIVE_VALIDATORS.set(Double.NaN); + PENDING_DEPOSITS.set(0); + PENDING_EXITS.set(0); + TOTAL_DEPOSITS.set(Double.NaN); + PREVIOUS_EPOCH_STALE_BLOCKS.set(Double.NaN); + PROPAGATED_ATTESTATIONS.set(Double.NaN); + } + + public static void startMetricsServer(String host, int port) { + if (metricsServer != null) { + throw new IllegalStateException("Metrics server already started"); + } + try { + metricsServer = new HTTPServer(host, port); + } catch (IOException e) { + new RuntimeException("Cannot start metrics server", e); + } + } + + public static void stopMetricsServer() { + if (metricsServer == null) { + throw new IllegalStateException("Metrics server has not been started"); + } else { + metricsServer.stop(); + metricsServer = null; + } + } + + public static void peerAdded() { + PEERS.inc(); + } + + public static void peerRemoved() { + PEERS.dec(); + } + + public static void onNewState(BeaconChainSpec spec, ObservableBeaconState obs) { + BeaconStateEx state = obs.getLatestSlotState(); + + CURRENT_SLOT.set(state.getSlot().doubleValue()); + CURRENT_EPOCH.set(spec.get_current_epoch(state).doubleValue()); + CURRENT_JUSTIFIED_EPOCH.set(state.getCurrentJustifiedCheckpoint().getEpoch().doubleValue()); + CURRENT_FINALIZED_EPOCH.set(state.getFinalizedCheckpoint().getEpoch().doubleValue()); + CURRENT_PREV_JUSTIFIED_EPOCH.set( + state.getPreviousJustifiedCheckpoint().getEpoch().doubleValue()); + + CURRENT_EPOCH_LIVE_VALIDATORS.set( + calculateEpochValidators(state.getCurrentEpochAttestations().listCopy())); + PREVIOUS_EPOCH_LIVE_VALIDATORS.set( + calculateEpochValidators(state.getPreviousEpochAttestations().listCopy())); + + TOTAL_DEPOSITS.set(state.getEth1Data().getDepositCount().doubleValue()); + } + + private static int calculateEpochValidators(List epochAttestations) { + return epochAttestations.stream() + .mapToInt(pa -> pa.getAggregationBits().getBits().size()) + .sum(); + } + + public static void attestationPropagated(Attestation attestation) { + // dummy implementation right now + PROPAGATED_ATTESTATIONS.inc(); + } +} diff --git a/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java b/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java index 317d59ada..faa19527f 100644 --- a/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java +++ b/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java @@ -1,10 +1,5 @@ package org.ethereum.beacon.start.common; -import static org.ethereum.beacon.chain.observer.ObservableStateProcessorImpl.DEFAULT_EMPTY_SLOT_TRANSITIONS_LIMIT; - -import java.nio.file.Paths; -import java.time.Duration; -import java.util.List; import org.ethereum.beacon.chain.DefaultBeaconChain; import org.ethereum.beacon.chain.MutableBeaconChain; import org.ethereum.beacon.chain.ProposedBlockProcessor; @@ -29,14 +24,15 @@ import org.ethereum.beacon.core.spec.SpecConstants; import org.ethereum.beacon.core.spec.SpecConstantsResolver; import org.ethereum.beacon.db.Database; +import org.ethereum.beacon.node.metrics.Metrics; import org.ethereum.beacon.pow.DepositContract; import org.ethereum.beacon.schedulers.Schedulers; import org.ethereum.beacon.ssz.SSZBuilder; import org.ethereum.beacon.ssz.SSZSerializer; import org.ethereum.beacon.validator.BeaconChainProposer; -import org.ethereum.beacon.validator.local.MultiValidatorService; import org.ethereum.beacon.validator.attester.BeaconChainAttesterImpl; import org.ethereum.beacon.validator.crypto.BLS381Credentials; +import org.ethereum.beacon.validator.local.MultiValidatorService; import org.ethereum.beacon.validator.proposer.BeaconChainProposerImpl; import org.ethereum.beacon.wire.Feedback; import org.ethereum.beacon.wire.MessageSerializer; @@ -55,6 +51,12 @@ import reactor.core.publisher.Mono; import tech.pegasys.artemis.util.uint.UInt64; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.List; + +import static org.ethereum.beacon.chain.observer.ObservableStateProcessorImpl.DEFAULT_EMPTY_SLOT_TRANSITIONS_LIMIT; + public class NodeLauncher { private final static long DB_BUFFER_SIZE = 64L << 20; // 64Mb @@ -161,6 +163,12 @@ void chainStarted(ChainStart chainStartEvent) { emptySlotTransition, schedulers, validatorCred != null ? Integer.MAX_VALUE : DEFAULT_EMPTY_SLOT_TRANSITIONS_LIMIT); + + Flux.from(observableStateProcessor.getObservableStateStream()) + .subscribe( + obs -> { + Metrics.onNewState(spec, obs); + }); observableStateProcessor.start(); SSZSerializer ssz = new SSZBuilder() @@ -184,6 +192,12 @@ void chainStarted(ChainStart chainStartEvent) { wireApiSub = peerManager.getWireApiSub(); wireApiSyncRemote = peerManager.getWireApiSync(); + Flux.from(wireApiSub.inboundAttestationsStream()) + .subscribe( + a -> { + Metrics.attestationPropagated(a); + }); + blockTree = new BeaconBlockTree(spec.getObjectHasher()); syncQueue = new SyncQueueImpl(blockTree); diff --git a/start/config/src/main/java/org/ethereum/beacon/emulator/config/main/Configuration.java b/start/config/src/main/java/org/ethereum/beacon/emulator/config/main/Configuration.java index 1f6eb6ada..0b6363756 100644 --- a/start/config/src/main/java/org/ethereum/beacon/emulator/config/main/Configuration.java +++ b/start/config/src/main/java/org/ethereum/beacon/emulator/config/main/Configuration.java @@ -10,6 +10,7 @@ public class Configuration { private String db; private List networks = new ArrayList<>(); private Validator validator; + String metricsEndpoint; public String getName() { return name; @@ -42,4 +43,12 @@ public Validator getValidator() { public void setValidator(Validator validator) { this.validator = validator; } + + public String getMetricsEndpoint() { + return metricsEndpoint; + } + + public void setMetricsEndpoint(String metricsEndpoint) { + this.metricsEndpoint = metricsEndpoint; + } } diff --git a/start/node/src/main/java/org/ethereum/beacon/node/Node.java b/start/node/src/main/java/org/ethereum/beacon/node/Node.java index 5ba769354..399156008 100644 --- a/start/node/src/main/java/org/ethereum/beacon/node/Node.java +++ b/start/node/src/main/java/org/ethereum/beacon/node/Node.java @@ -98,6 +98,17 @@ public class Node implements Runnable { ) private String specConstantsFile; + @CommandLine.Option( + names = "--metrics-endpoint", + paramLabel = "matrics-endpoint", + description = { + "Interface and port, Prometheus collection endpoint will be served from.", + "Should have form of interface:port.", + "Default endpoint is 0.0.0.0:8008." + } + ) + private String metricsEndpoint; + public String getName() { return name; } @@ -122,6 +133,10 @@ public String getSpecConstantsFile() { return specConstantsFile; } + public String getMetricsEndpoint() { + return metricsEndpoint; + } + public static void main(String[] args) { try { CommandLine commandLine = new CommandLine(new Node()); diff --git a/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java b/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java index 8eabcc51b..cbb561ed7 100644 --- a/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java +++ b/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java @@ -1,20 +1,6 @@ package org.ethereum.beacon.node; import io.netty.channel.ChannelFutureListener; -import java.io.File; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.URI; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Random; -import java.util.TimeZone; -import java.util.concurrent.ThreadFactory; -import java.util.stream.IntStream; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -41,6 +27,7 @@ import org.ethereum.beacon.emulator.config.main.conract.EmulatorContract; import org.ethereum.beacon.emulator.config.main.network.NettyNetwork; import org.ethereum.beacon.emulator.config.main.network.Network; +import org.ethereum.beacon.node.metrics.Metrics; import org.ethereum.beacon.pow.DepositContract; import org.ethereum.beacon.schedulers.DefaultSchedulers; import org.ethereum.beacon.schedulers.Scheduler; @@ -53,9 +40,24 @@ import org.ethereum.beacon.wire.net.netty.NettyClient; import org.ethereum.beacon.wire.net.netty.NettyServer; import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Flux; +import java.io.File; import java.lang.reflect.InvocationTargetException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.URI; import java.nio.file.Paths; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Random; +import java.util.TimeZone; +import java.util.concurrent.ThreadFactory; +import java.util.stream.IntStream; public class NodeCommandLauncher implements Runnable { private static final Logger logger = LogManager.getLogger("node"); @@ -174,11 +176,35 @@ protected ThreadFactory createThreadFactory(String namePattern) { URI uri = URI.create(addr); tcpConnectionManager.addActivePeer(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort())); } + + Flux.from(connectionManager.channelsStream()) + .subscribe( + ch -> { + Metrics.peerAdded(); + ch.getCloseFuture().thenAccept(v -> Metrics.peerRemoved()); + }); } else { throw new IllegalArgumentException( "This type of network is not supported yet: " + networkCfg.getClass()); } + String metricsEndpoint = config.getConfig().getMetricsEndpoint(); + String metricsHost; + int metricsPort; + if (metricsEndpoint == null) { + metricsHost = "0.0.0.0"; + metricsPort = 8008; + } else { + String[] parts = metricsEndpoint.split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException( + "Wrong metrics endpoint format: \"" + metricsEndpoint + "\""); + } + metricsHost = parts[0]; + metricsPort = Integer.parseInt(parts[1]); + } + Metrics.startMetricsServer(metricsHost, metricsPort); + NodeLauncher node = new NodeLauncher( specBuilder.buildSpec(), this.depositContract, @@ -346,6 +372,10 @@ public NodeCommandLauncher build() { } } + if (cliOptions.getMetricsEndpoint() != null) { + config.getConfig().setMetricsEndpoint(cliOptions.getMetricsEndpoint()); + } + return new NodeCommandLauncher( config, specBuilder, diff --git a/versions.gradle b/versions.gradle index 73d0030f3..7deb8ce43 100644 --- a/versions.gradle +++ b/versions.gradle @@ -40,5 +40,9 @@ dependencyManagement { dependency "org.rocksdb:rocksdbjni:6.2.2" dependency "com.googlecode.concurrent-locks:concurrent-locks:1.0.0" + + dependency "io.prometheus:simpleclient:0.6.0" + dependency "io.prometheus:simpleclient_hotspot:0.6.0" + dependency "io.prometheus:simpleclient_httpserver:0.6.0" } } diff --git a/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyServer.java b/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyServer.java index 3bdace966..4ab8d20c1 100644 --- a/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyServer.java +++ b/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyServer.java @@ -9,23 +9,40 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LoggingHandler; -import java.util.concurrent.Executor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.ethereum.beacon.wire.net.Server; +import org.jetbrains.annotations.NotNull; import org.reactivestreams.Publisher; +import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.UnicastProcessor; +import java.util.concurrent.Executor; + public class NettyServer implements Server { private static final Logger logger = LogManager.getLogger(NettyServer.class); private UnicastProcessor channels = UnicastProcessor.create(); + private DirectProcessor channelsDispatcher = createDispatcherProcessor(channels); private FluxSink channelsSink = channels.sink(); private final int port; private ChannelFuture channelFuture; private final NioEventLoopGroup workerGroup; + /** + * UnicastProcessor allows single subscriber only. + * Work around by attaching a DirectProcessor to it. + */ + @NotNull + private static DirectProcessor createDispatcherProcessor( + Publisher channels) { + DirectProcessor processor = DirectProcessor.create(); + Flux.from(channels).subscribe(processor); + return processor; + } + public NettyServer(int port, NioEventLoopGroup workerGroup) { this.port = port; this.workerGroup = workerGroup; @@ -42,7 +59,7 @@ public NettyServer(int port) { @Override public Publisher channelsStream() { - return channels; + return channelsDispatcher; } private void onChannelActive(NettyChannel channel) { From 78fa67c899b93510d10ed47cca8cf37e32077daf Mon Sep 17 00:00:00 2001 From: Alex Vlasov/ericsson49 Date: Tue, 3 Sep 2019 18:33:07 +0300 Subject: [PATCH 2/5] Replace DirectProcessor with ReplayProcessor. This is a temporary decision to fix failed test. --- .../org/ethereum/beacon/wire/net/netty/NettyServer.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyServer.java b/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyServer.java index 4ab8d20c1..6e7341588 100644 --- a/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyServer.java +++ b/wire/src/main/java/org/ethereum/beacon/wire/net/netty/NettyServer.java @@ -13,10 +13,11 @@ import org.apache.logging.log4j.Logger; import org.ethereum.beacon.wire.net.Server; import org.jetbrains.annotations.NotNull; +import org.reactivestreams.Processor; import org.reactivestreams.Publisher; -import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; +import reactor.core.publisher.ReplayProcessor; import reactor.core.publisher.UnicastProcessor; import java.util.concurrent.Executor; @@ -25,7 +26,7 @@ public class NettyServer implements Server { private static final Logger logger = LogManager.getLogger(NettyServer.class); private UnicastProcessor channels = UnicastProcessor.create(); - private DirectProcessor channelsDispatcher = createDispatcherProcessor(channels); + private Publisher channelsDispatcher = createDispatcherProcessor(channels); private FluxSink channelsSink = channels.sink(); private final int port; private ChannelFuture channelFuture; @@ -36,9 +37,9 @@ public class NettyServer implements Server { * Work around by attaching a DirectProcessor to it. */ @NotNull - private static DirectProcessor createDispatcherProcessor( + private static Publisher createDispatcherProcessor( Publisher channels) { - DirectProcessor processor = DirectProcessor.create(); + Processor processor = ReplayProcessor.create(); Flux.from(channels).subscribe(processor); return processor; } From 164df15eae276b9098193423fa7c796512d70297 Mon Sep 17 00:00:00 2001 From: Alex Vlasov/ericsson49 Date: Wed, 4 Sep 2019 01:23:46 +0300 Subject: [PATCH 3/5] Use config.db as a prefix for RocksDB path. Useful when starting several nodes from the same working directory. --- .../beacon/start/common/NodeLauncher.java | 19 +++++++++++++++---- .../beacon/node/NodeCommandLauncher.java | 1 + .../resources/config/default-node-config.yml | 2 +- .../resources/config/sample-node-config.yml | 2 +- .../org/ethereum/beacon/wire/NodeTest.java | 2 ++ 5 files changed, 20 insertions(+), 6 deletions(-) diff --git a/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java b/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java index faa19527f..6456b085d 100644 --- a/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java +++ b/start/common/src/main/java/org/ethereum/beacon/start/common/NodeLauncher.java @@ -64,6 +64,7 @@ public class NodeLauncher { private final BeaconChainSpec spec; private final DepositContract depositContract; private final List validatorCred; + private final String dbPrefix; private final BeaconChainStorageFactory storageFactory; private final Schedulers schedulers; @@ -103,6 +104,7 @@ public NodeLauncher( DepositContract depositContract, List validatorCred, ConnectionManager connectionManager, + String dbPrefix, BeaconChainStorageFactory storageFactory, Schedulers schedulers, boolean startSyncManager) { @@ -111,6 +113,7 @@ public NodeLauncher( this.depositContract = depositContract; this.validatorCred = validatorCred; this.connectionManager = connectionManager; + this.dbPrefix = dbPrefix; this.storageFactory = storageFactory; this.schedulers = schedulers; this.startSyncManager = startSyncManager; @@ -129,7 +132,13 @@ void chainStarted(ChainStart chainStartEvent) { new ExtendedSlotTransition(perEpochTransition, perSlotTransition, spec); emptySlotTransition = new EmptySlotTransition(extendedSlotTransition); - db = Database.rocksDB(Paths.get(computeDbName(chainStartEvent)).toString(), DB_BUFFER_SIZE); + if (dbPrefix == null) { + db = Database.inMemoryDB(); + } else { + db = + Database.rocksDB( + Paths.get(computeDbName(dbPrefix, chainStartEvent)).toString(), DB_BUFFER_SIZE); + } beaconChainStorage = storageFactory.create(db); blockVerifier = BeaconBlockVerifier.createDefault(spec); @@ -253,10 +262,12 @@ void chainStarted(ChainStart chainStartEvent) { // .subscribe(beaconChain::insert); } - private String computeDbName(ChainStart chainStart) { + private String computeDbName(String dbPrefix, ChainStart chainStart) { return String.format( - "db_start_time_%d_dep_root_%s", - chainStart.getTime().getValue(), chainStart.getEth1Data().getDepositRoot().toStringShort()); + "%s_start_time_%d_dep_root_%s", + dbPrefix, + chainStart.getTime().getValue(), + chainStart.getEth1Data().getDepositRoot().toStringShort()); } public void stop() { diff --git a/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java b/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java index cbb561ed7..d18cffe5b 100644 --- a/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java +++ b/start/node/src/main/java/org/ethereum/beacon/node/NodeCommandLauncher.java @@ -210,6 +210,7 @@ protected ThreadFactory createThreadFactory(String namePattern) { this.depositContract, credentials, connectionManager, + config.getConfig().getDb(), new SSZBeaconChainStorageFactory( spec.getObjectHasher(), SerializerFactory.createSSZ(specConstants)), diff --git a/start/node/src/main/resources/config/default-node-config.yml b/start/node/src/main/resources/config/default-node-config.yml index dad4809a9..c8c2e40b7 100644 --- a/start/node/src/main/resources/config/default-node-config.yml +++ b/start/node/src/main/resources/config/default-node-config.yml @@ -1,5 +1,5 @@ config: - db: file://db + db: db networks: - type: netty diff --git a/start/node/src/main/resources/config/sample-node-config.yml b/start/node/src/main/resources/config/sample-node-config.yml index 51c6cca71..1114c5d80 100644 --- a/start/node/src/main/resources/config/sample-node-config.yml +++ b/start/node/src/main/resources/config/sample-node-config.yml @@ -1,6 +1,6 @@ config: # location of database - db: file://db + db: db # the list of networks networks: diff --git a/wire/src/test/java/org/ethereum/beacon/wire/NodeTest.java b/wire/src/test/java/org/ethereum/beacon/wire/NodeTest.java index 78ad3814d..177bd1335 100644 --- a/wire/src/test/java/org/ethereum/beacon/wire/NodeTest.java +++ b/wire/src/test/java/org/ethereum/beacon/wire/NodeTest.java @@ -91,6 +91,7 @@ public void test1() throws Exception { .map(BLS381Credentials::createWithDummySigner) .collect(Collectors.toList()), connectionManager, + null, new MemBeaconChainStorageFactory(spec.getObjectHasher()), schedulers, false); @@ -113,6 +114,7 @@ public void test1() throws Exception { depositContract, null, slaveConnectionManager, + null, new MemBeaconChainStorageFactory(spec.getObjectHasher()), schedulers, true); From 9dcca7c9a7e5397f8c677b13de7f43afaf85e7b9 Mon Sep 17 00:00:00 2001 From: Alex Vlasov/ericsson49 Date: Wed, 4 Sep 2019 14:33:18 +0300 Subject: [PATCH 4/5] A better implementation of beaconchain_propagated_attestations. --- .../ethereum/beacon/node/metrics/Metrics.java | 42 +++++++++++++++++-- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java b/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java index 04ddb3dd1..6c292073a 100644 --- a/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java +++ b/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java @@ -7,10 +7,15 @@ import org.ethereum.beacon.consensus.BeaconChainSpec; import org.ethereum.beacon.consensus.BeaconStateEx; import org.ethereum.beacon.core.operations.Attestation; +import org.ethereum.beacon.core.operations.attestation.AttestationData; import org.ethereum.beacon.core.state.PendingAttestation; +import org.ethereum.beacon.core.types.SlotNumber; +import tech.pegasys.artemis.util.collections.Bitlist; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Implements (some) metrics from Beacon chain metrics specs. @@ -83,7 +88,7 @@ public class Metrics { .name("beaconchain_previous_epoch_stale_blocks") .help("Number of blocks not included into canonical chain in the previous epoch") .register(); - // not yet implemented + // simple implementation currently - counts attestations arrived from wire during a slot static Gauge PROPAGATED_ATTESTATIONS = Gauge.build() .name("beaconchain_propagated_attestations") @@ -91,6 +96,13 @@ public class Metrics { .register(); private static HTTPServer metricsServer; + private static final Object attestation_lock = new Object(); + private static SlotNumber currentSlot = null; + /** + * Accumulates atetstations received during a slot. Cleared on a new slot. + */ + private static final Map currentSlotAttestations = new HashMap<>(); + // setting initial values explicitly (zeros by default) static { PEERS.set(0); @@ -139,6 +151,15 @@ public static void peerRemoved() { public static void onNewState(BeaconChainSpec spec, ObservableBeaconState obs) { BeaconStateEx state = obs.getLatestSlotState(); + synchronized (attestation_lock) { + if (currentSlot == null) { + currentSlot = state.getSlot(); + } else if (currentSlot.less(state.getSlot())) { + currentSlot = state.getSlot(); + currentSlotAttestations.clear(); + } + } + CURRENT_SLOT.set(state.getSlot().doubleValue()); CURRENT_EPOCH.set(spec.get_current_epoch(state).doubleValue()); CURRENT_JUSTIFIED_EPOCH.set(state.getCurrentJustifiedCheckpoint().getEpoch().doubleValue()); @@ -161,7 +182,22 @@ private static int calculateEpochValidators(List epochAttest } public static void attestationPropagated(Attestation attestation) { - // dummy implementation right now - PROPAGATED_ATTESTATIONS.inc(); + Bitlist aggregationBits = attestation.getAggregationBits(); + AttestationData attestationData = attestation.getData(); + + int attestationCount; + synchronized (attestation_lock) { + Bitlist attestations; + if (!currentSlotAttestations.containsKey(attestationData)) { + attestations = aggregationBits.cappedCopy(aggregationBits.maxSize()); + } else { + attestations = currentSlotAttestations.get(attestationData).or(aggregationBits); + } + currentSlotAttestations.put(attestationData, attestations); + attestationCount = + currentSlotAttestations.values().stream().mapToInt(bl -> bl.getBits().size()).sum(); + } + + PROPAGATED_ATTESTATIONS.set(attestationCount); } } From 9bb47149b005c5c197495e56406987420210b28c Mon Sep 17 00:00:00 2001 From: Alex Vlasov/ericsson49 Date: Wed, 4 Sep 2019 18:06:49 +0300 Subject: [PATCH 5/5] Fix - reset propagated attestations metrics, when a new slot started. --- .../src/main/java/org/ethereum/beacon/node/metrics/Metrics.java | 1 + 1 file changed, 1 insertion(+) diff --git a/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java b/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java index 6c292073a..c451e8eca 100644 --- a/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java +++ b/start/common/src/main/java/org/ethereum/beacon/node/metrics/Metrics.java @@ -157,6 +157,7 @@ public static void onNewState(BeaconChainSpec spec, ObservableBeaconState obs) { } else if (currentSlot.less(state.getSlot())) { currentSlot = state.getSlot(); currentSlotAttestations.clear(); + PROPAGATED_ATTESTATIONS.set(0); } }