Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions start/common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ dependencies {
implementation 'com.google.guava:guava'
implementation 'io.projectreactor:reactor-core'

implementation 'io.prometheus:simpleclient'
implementation 'io.prometheus:simpleclient_hotspot'
implementation 'io.prometheus:simpleclient_httpserver'

testImplementation 'org.mockito:mockito-core'
testImplementation 'org.apache.logging.log4j:log4j-core'

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package org.ethereum.beacon.node.metrics;

import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.exporter.HTTPServer;
import org.ethereum.beacon.chain.observer.ObservableBeaconState;
import org.ethereum.beacon.consensus.BeaconChainSpec;
import org.ethereum.beacon.consensus.BeaconStateEx;
import org.ethereum.beacon.core.operations.Attestation;
import org.ethereum.beacon.core.operations.attestation.AttestationData;
import org.ethereum.beacon.core.state.PendingAttestation;
import org.ethereum.beacon.core.types.SlotNumber;
import tech.pegasys.artemis.util.collections.Bitlist;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Implements (some) metrics from Beacon chain metrics specs.
* @See <a href="https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md"/>
*
*/
public class Metrics {
static Gauge PEERS =
Gauge.build().name("beaconchain_peers").help("Tracks number of peers").register();
static Gauge CURRENT_SLOT =
Gauge.build()
.name("beaconchain_current_slot")
.help("Latest slot recorded by the beacon chain")
.register();
// extra metric, not required by spec
static Gauge CURRENT_EPOCH =
Gauge.build()
.name("beaconchain_current_epoch")
.help("Latest epoch recorded by the beacon chain")
.register();
static Gauge CURRENT_JUSTIFIED_EPOCH =
Gauge.build()
.name("beaconchain_current_justified_epoch")
.help("Current justified epoch")
.register();
static Gauge CURRENT_FINALIZED_EPOCH =
Gauge.build()
.name("beaconchain_current_finalized_epoch")
.help("Current finalized epoch")
.register();
static Gauge CURRENT_PREV_JUSTIFIED_EPOCH =
Gauge.build()
.name("beaconchain_current_prev_justified_epoch")
.help("Current previously justified epoch")
.register();
// the metric implementation is probably partially correct, since it accounts only validators,
// which have been included in the chain
static Gauge CURRENT_EPOCH_LIVE_VALIDATORS =
Gauge.build()
.name("beaconchain_current_epoch_live_validators")
.help("Number of active validators who reported for the current epoch")
.register();
// the metric implementation is probably partially correct, since it accounts only validators,
// which have been included in the chain
static Gauge PREVIOUS_EPOCH_LIVE_VALIDATORS =
Gauge.build()
.name("beaconchain_previous_epoch_live_validators")
.help("Number of active validators who reported for the previous epoch")
.register();
// not yet implemented
static Counter REORG_EVENTS_TOTAL =
Counter.build()
.name("beaconchain_reorg_events_total")
.help("Occurrence of a reorganization of the chain")
.register();
// not yet supported by beacon chain yet
static Gauge PENDING_DEPOSITS =
Gauge.build()
.name("beaconchain_pending_deposits")
.help("Number of pending deposits")
.register();
// not yet supported by beacon chain yet
static Gauge PENDING_EXITS =
Gauge.build().name("beaconchain_pending_exits").help("Number of pending exits").register();
static Gauge TOTAL_DEPOSITS =
Gauge.build().name("beaconchain_total_deposits").help("Number of total deposits").register();
// not yet implemented
static Gauge PREVIOUS_EPOCH_STALE_BLOCKS =
Gauge.build()
.name("beaconchain_previous_epoch_stale_blocks")
.help("Number of blocks not included into canonical chain in the previous epoch")
.register();
// simple implementation currently - counts attestations arrived from wire during a slot
static Gauge PROPAGATED_ATTESTATIONS =
Gauge.build()
.name("beaconchain_propagated_attestations")
.help("Number of distinct attestations to a slot received from the wire")
.register();
private static HTTPServer metricsServer;

private static final Object attestation_lock = new Object();
private static SlotNumber currentSlot = null;
/**
* Accumulates atetstations received during a slot. Cleared on a new slot.
*/
private static final Map<AttestationData, Bitlist> currentSlotAttestations = new HashMap<>();

// setting initial values explicitly (zeros by default)
static {
PEERS.set(0);
CURRENT_SLOT.set(Double.NaN);
CURRENT_EPOCH.set(Double.NaN);
CURRENT_JUSTIFIED_EPOCH.set(Double.NaN);
CURRENT_FINALIZED_EPOCH.set(Double.NaN);
CURRENT_PREV_JUSTIFIED_EPOCH.set(Double.NaN);
CURRENT_EPOCH_LIVE_VALIDATORS.set(Double.NaN);
PREVIOUS_EPOCH_LIVE_VALIDATORS.set(Double.NaN);
PENDING_DEPOSITS.set(0);
PENDING_EXITS.set(0);
TOTAL_DEPOSITS.set(Double.NaN);
PREVIOUS_EPOCH_STALE_BLOCKS.set(Double.NaN);
PROPAGATED_ATTESTATIONS.set(Double.NaN);
}

public static void startMetricsServer(String host, int port) {
if (metricsServer != null) {
throw new IllegalStateException("Metrics server already started");
}
try {
metricsServer = new HTTPServer(host, port);
} catch (IOException e) {
new RuntimeException("Cannot start metrics server", e);
}
}

public static void stopMetricsServer() {
if (metricsServer == null) {
throw new IllegalStateException("Metrics server has not been started");
} else {
metricsServer.stop();
metricsServer = null;
}
}

public static void peerAdded() {
PEERS.inc();
}

public static void peerRemoved() {
PEERS.dec();
}

public static void onNewState(BeaconChainSpec spec, ObservableBeaconState obs) {
BeaconStateEx state = obs.getLatestSlotState();

synchronized (attestation_lock) {
if (currentSlot == null) {
currentSlot = state.getSlot();
} else if (currentSlot.less(state.getSlot())) {
currentSlot = state.getSlot();
currentSlotAttestations.clear();
PROPAGATED_ATTESTATIONS.set(0);
}
}

CURRENT_SLOT.set(state.getSlot().doubleValue());
CURRENT_EPOCH.set(spec.get_current_epoch(state).doubleValue());
CURRENT_JUSTIFIED_EPOCH.set(state.getCurrentJustifiedCheckpoint().getEpoch().doubleValue());
CURRENT_FINALIZED_EPOCH.set(state.getFinalizedCheckpoint().getEpoch().doubleValue());
CURRENT_PREV_JUSTIFIED_EPOCH.set(
state.getPreviousJustifiedCheckpoint().getEpoch().doubleValue());

CURRENT_EPOCH_LIVE_VALIDATORS.set(
calculateEpochValidators(state.getCurrentEpochAttestations().listCopy()));
PREVIOUS_EPOCH_LIVE_VALIDATORS.set(
calculateEpochValidators(state.getPreviousEpochAttestations().listCopy()));

TOTAL_DEPOSITS.set(state.getEth1Data().getDepositCount().doubleValue());
}

private static int calculateEpochValidators(List<PendingAttestation> epochAttestations) {
return epochAttestations.stream()
.mapToInt(pa -> pa.getAggregationBits().getBits().size())
.sum();
}

public static void attestationPropagated(Attestation attestation) {
Bitlist aggregationBits = attestation.getAggregationBits();
AttestationData attestationData = attestation.getData();

int attestationCount;
synchronized (attestation_lock) {
Bitlist attestations;
if (!currentSlotAttestations.containsKey(attestationData)) {
attestations = aggregationBits.cappedCopy(aggregationBits.maxSize());
} else {
attestations = currentSlotAttestations.get(attestationData).or(aggregationBits);
}
currentSlotAttestations.put(attestationData, attestations);
attestationCount =
currentSlotAttestations.values().stream().mapToInt(bl -> bl.getBits().size()).sum();
}

PROPAGATED_ATTESTATIONS.set(attestationCount);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
package org.ethereum.beacon.start.common;

import static org.ethereum.beacon.chain.observer.ObservableStateProcessorImpl.DEFAULT_EMPTY_SLOT_TRANSITIONS_LIMIT;

import java.nio.file.Paths;
import java.time.Duration;
import java.util.List;
import org.ethereum.beacon.chain.DefaultBeaconChain;
import org.ethereum.beacon.chain.MutableBeaconChain;
import org.ethereum.beacon.chain.ProposedBlockProcessor;
Expand All @@ -29,14 +24,15 @@
import org.ethereum.beacon.core.spec.SpecConstants;
import org.ethereum.beacon.core.spec.SpecConstantsResolver;
import org.ethereum.beacon.db.Database;
import org.ethereum.beacon.node.metrics.Metrics;
import org.ethereum.beacon.pow.DepositContract;
import org.ethereum.beacon.schedulers.Schedulers;
import org.ethereum.beacon.ssz.SSZBuilder;
import org.ethereum.beacon.ssz.SSZSerializer;
import org.ethereum.beacon.validator.BeaconChainProposer;
import org.ethereum.beacon.validator.local.MultiValidatorService;
import org.ethereum.beacon.validator.attester.BeaconChainAttesterImpl;
import org.ethereum.beacon.validator.crypto.BLS381Credentials;
import org.ethereum.beacon.validator.local.MultiValidatorService;
import org.ethereum.beacon.validator.proposer.BeaconChainProposerImpl;
import org.ethereum.beacon.wire.Feedback;
import org.ethereum.beacon.wire.MessageSerializer;
Expand All @@ -55,13 +51,20 @@
import reactor.core.publisher.Mono;
import tech.pegasys.artemis.util.uint.UInt64;

import java.nio.file.Paths;
import java.time.Duration;
import java.util.List;

import static org.ethereum.beacon.chain.observer.ObservableStateProcessorImpl.DEFAULT_EMPTY_SLOT_TRANSITIONS_LIMIT;

public class NodeLauncher {

private final static long DB_BUFFER_SIZE = 64L << 20; // 64Mb

private final BeaconChainSpec spec;
private final DepositContract depositContract;
private final List<BLS381Credentials> validatorCred;
private final String dbPrefix;
private final BeaconChainStorageFactory storageFactory;
private final Schedulers schedulers;

Expand Down Expand Up @@ -101,6 +104,7 @@ public NodeLauncher(
DepositContract depositContract,
List<BLS381Credentials> validatorCred,
ConnectionManager<?> connectionManager,
String dbPrefix,
BeaconChainStorageFactory storageFactory,
Schedulers schedulers,
boolean startSyncManager) {
Expand All @@ -109,6 +113,7 @@ public NodeLauncher(
this.depositContract = depositContract;
this.validatorCred = validatorCred;
this.connectionManager = connectionManager;
this.dbPrefix = dbPrefix;
this.storageFactory = storageFactory;
this.schedulers = schedulers;
this.startSyncManager = startSyncManager;
Expand All @@ -127,7 +132,13 @@ void chainStarted(ChainStart chainStartEvent) {
new ExtendedSlotTransition(perEpochTransition, perSlotTransition, spec);
emptySlotTransition = new EmptySlotTransition(extendedSlotTransition);

db = Database.rocksDB(Paths.get(computeDbName(chainStartEvent)).toString(), DB_BUFFER_SIZE);
if (dbPrefix == null) {
db = Database.inMemoryDB();
} else {
db =
Database.rocksDB(
Paths.get(computeDbName(dbPrefix, chainStartEvent)).toString(), DB_BUFFER_SIZE);
}
beaconChainStorage = storageFactory.create(db);

blockVerifier = BeaconBlockVerifier.createDefault(spec);
Expand Down Expand Up @@ -161,6 +172,12 @@ void chainStarted(ChainStart chainStartEvent) {
emptySlotTransition,
schedulers,
validatorCred != null ? Integer.MAX_VALUE : DEFAULT_EMPTY_SLOT_TRANSITIONS_LIMIT);

Flux.from(observableStateProcessor.getObservableStateStream())
.subscribe(
obs -> {
Metrics.onNewState(spec, obs);
});
observableStateProcessor.start();

SSZSerializer ssz = new SSZBuilder()
Expand All @@ -184,6 +201,12 @@ void chainStarted(ChainStart chainStartEvent) {
wireApiSub = peerManager.getWireApiSub();
wireApiSyncRemote = peerManager.getWireApiSync();

Flux.from(wireApiSub.inboundAttestationsStream())
.subscribe(
a -> {
Metrics.attestationPropagated(a);
});

blockTree = new BeaconBlockTree(spec.getObjectHasher());
syncQueue = new SyncQueueImpl(blockTree);

Expand Down Expand Up @@ -239,10 +262,12 @@ void chainStarted(ChainStart chainStartEvent) {
// .subscribe(beaconChain::insert);
}

private String computeDbName(ChainStart chainStart) {
private String computeDbName(String dbPrefix, ChainStart chainStart) {
return String.format(
"db_start_time_%d_dep_root_%s",
chainStart.getTime().getValue(), chainStart.getEth1Data().getDepositRoot().toStringShort());
"%s_start_time_%d_dep_root_%s",
dbPrefix,
chainStart.getTime().getValue(),
chainStart.getEth1Data().getDepositRoot().toStringShort());
}

public void stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class Configuration {
private String db;
private List<Network> networks = new ArrayList<>();
private Validator validator;
String metricsEndpoint;

public String getName() {
return name;
Expand Down Expand Up @@ -42,4 +43,12 @@ public Validator getValidator() {
public void setValidator(Validator validator) {
this.validator = validator;
}

public String getMetricsEndpoint() {
return metricsEndpoint;
}

public void setMetricsEndpoint(String metricsEndpoint) {
this.metricsEndpoint = metricsEndpoint;
}
}
15 changes: 15 additions & 0 deletions start/node/src/main/java/org/ethereum/beacon/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ public class Node implements Runnable {
)
private String specConstantsFile;

@CommandLine.Option(
names = "--metrics-endpoint",
paramLabel = "matrics-endpoint",
description = {
"Interface and port, Prometheus collection endpoint will be served from.",
"Should have form of interface:port.",
"Default endpoint is 0.0.0.0:8008."
}
)
private String metricsEndpoint;

public String getName() {
return name;
}
Expand All @@ -122,6 +133,10 @@ public String getSpecConstantsFile() {
return specConstantsFile;
}

public String getMetricsEndpoint() {
return metricsEndpoint;
}

public static void main(String[] args) {
try {
CommandLine commandLine = new CommandLine(new Node());
Expand Down
Loading