Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
100 commits
Select commit Hold shift + click to select a range
183744b
Fix BenchmarkLauncher compilation
Nashatyrev Apr 18, 2019
6c16c13
Add a dedicated storage for blockHeaders. For now it just refers to b…
Nashatyrev Apr 18, 2019
8cc3cc5
Initial draft drop of wire implementation
Nashatyrev Apr 18, 2019
97ac010
Differentiate raw Message and MessagePayload. Minor wire refactorings…
Nashatyrev Apr 18, 2019
b01d419
Differentiate Response and Request MessagePayload.
Nashatyrev Apr 18, 2019
24c7cd3
Draft intermediate commit. Add some sync related code
Nashatyrev Apr 22, 2019
a2edd67
Intermediate sync manager draft commit
Nashatyrev Apr 23, 2019
ad2261d
Implement simple sync strategy
Nashatyrev Apr 25, 2019
98c922a
Channel draft implementation
Nashatyrev Apr 26, 2019
731571e
Complete BeaconPipeline. Bunch of Channel fixes. Create and pass Beac…
Nashatyrev Apr 30, 2019
698a344
Fix assertion
Nashatyrev Apr 30, 2019
c330326
Add toString()
Nashatyrev May 6, 2019
7ee4d64
Refactor SimulatorLauncher for using from tests
Nashatyrev May 6, 2019
98fe2f3
Add SyncTest
Nashatyrev May 6, 2019
d41de50
Fix block root calculation (should be truncated hash)
Nashatyrev May 6, 2019
313da29
Fix several sync bugs
Nashatyrev May 6, 2019
06272b5
Fix another couple of bugs in sync code
Nashatyrev May 6, 2019
d708f5c
Complete the SyncTest simple case
Nashatyrev May 7, 2019
597e474
Add missing WireApi's. Add notified new blocks to the SyncManager
Nashatyrev May 7, 2019
36491f7
Add simple netty server/client Channel implementation
Nashatyrev May 7, 2019
c243cca
Extract Server/Client interfaces. Fix a couple of issues
Nashatyrev May 8, 2019
1dc9a83
Add Peer implementation and simple PeerManager
Nashatyrev May 8, 2019
27f12de
Some refactorings.
Nashatyrev May 9, 2019
bd6dea6
Further work on sync by wire
Nashatyrev May 9, 2019
494b72c
Add some toString's
Nashatyrev May 10, 2019
8222598
Add SyncManager Long/Short mode
Nashatyrev May 10, 2019
7ed10a7
Move MDCControlledSchedulers to a separate class. Remove unnecessary …
Nashatyrev May 13, 2019
062fa13
Add NodeLauncher and NodeTest where one peer syncs to another by TCP.…
Nashatyrev May 15, 2019
cc62328
Refactor: move netty related code to a separate package
Nashatyrev May 15, 2019
5f1850c
WireApiSyncRouter feeds active peers in cycle
Nashatyrev May 16, 2019
1bf6cd0
Add missing hashCode() methods
Nashatyrev May 16, 2019
b9156a6
Implement flood pub for new blocks and attestations
Nashatyrev May 16, 2019
1042271
Fix ConnectionManager, add test
Nashatyrev May 16, 2019
f3fda15
Minor cleanup
Nashatyrev May 17, 2019
1a06c0c
Draft template for Node launcher
Nashatyrev May 17, 2019
178706f
Merge remote-tracking branch 'origin/develop' into feature/wire
Nashatyrev May 17, 2019
e401c85
Resolve merge compile conflicts
Nashatyrev May 17, 2019
1d6823a
Fix build error
Nashatyrev May 17, 2019
4e25524
Use common classes for BenchmarkRunner
Nashatyrev May 20, 2019
89b5755
Fix test dependencies
Nashatyrev May 20, 2019
6b547fa
Filter 'future' attestations in ObservableState
Nashatyrev May 20, 2019
234f1da
Expose 'syncMode' from SyncManager
Nashatyrev May 20, 2019
7ea8888
Add toString()
Nashatyrev May 20, 2019
e6966f9
Make NodeTest a unit test
Nashatyrev May 20, 2019
9d7f0a1
Merge remote-tracking branch 'origin/develop' into feature/wire
Nashatyrev May 20, 2019
215c78e
Fix mode switching
Nashatyrev May 20, 2019
1701eb5
Fix NodeTest: check sync mode on different stages
Nashatyrev May 20, 2019
13d4804
Add specHelpers cache control via config
Nashatyrev May 20, 2019
069f448
Add DebugCacheFactory
Nashatyrev May 20, 2019
9d6bc62
Temporarily disable cache for tests (due to invalid fork cache handling)
Nashatyrev May 20, 2019
e4e3da1
Fix the test
Nashatyrev May 20, 2019
0a32744
Add hashCode()
Nashatyrev May 20, 2019
2903a44
Extract useful TestDataFactory test util from ModelSerializerTest
Nashatyrev May 20, 2019
9d6ac85
Close server on test complete
Nashatyrev May 20, 2019
52ba766
Remove obsolete test
Nashatyrev May 20, 2019
c51831f
Disable specHelpers cache for tests due to incorrect fork handling
Nashatyrev May 20, 2019
b129310
Removed @Test from the test util class
Nashatyrev May 20, 2019
feffd76
Remove obsolete test util field
Nashatyrev May 20, 2019
4a748a1
Refactor Validator config
Nashatyrev May 20, 2019
99d331e
Create NodeLauncher from config
Nashatyrev May 20, 2019
094b39f
Reduce loglevel for StateCachingTransition
Nashatyrev May 21, 2019
35e3fea
Add 'node' command line options
Nashatyrev May 21, 2019
d2c370f
Fix connectionManager
Nashatyrev May 21, 2019
cabbeac
Fix peerManager
Nashatyrev May 21, 2019
de938b9
Fix SyncServer to correctly manage empty slots
Nashatyrev May 21, 2019
ad208da
Enable syncManager
Nashatyrev May 21, 2019
a41153b
Use SyncQueue production params
Nashatyrev May 21, 2019
8ebb472
Synchronize the BlockTree class
Nashatyrev May 21, 2019
9ae41e5
Fix test compile error
Nashatyrev May 21, 2019
1d28c91
Don't recalculate empty slot transitions from head on every slot
Nashatyrev May 22, 2019
5a69b83
Fix typo
Nashatyrev May 22, 2019
f585230
Fix: bodies can be the same for different blocks. Thus duplicates are…
Nashatyrev May 22, 2019
ba58fde
Rely on SpecHelpers.verify_attestation when populating pending operat…
Nashatyrev May 22, 2019
37d63dd
Temporarily make the sync less aggressive
Nashatyrev May 22, 2019
1dd9506
Make deposit amount configurable
Nashatyrev May 22, 2019
f59a680
Move netty to our Schedulers. Add node name for logs and thread namin…
Nashatyrev May 22, 2019
95d76ba
Return meaningful result from BeaconChain.insert()
Nashatyrev May 22, 2019
f317c9b
Submit to SyncManager not only inbound new blocks but own created and…
Nashatyrev May 22, 2019
941b61a
Fix concurrent modification, make stream elements (arrays) immutable
Nashatyrev May 23, 2019
14905c9
Write unhandled exceptions to log by default
Nashatyrev May 23, 2019
4aaf917
Fix Log4j warnings at startup
Nashatyrev May 23, 2019
f7702f1
Set test genesis slot to 0
Nashatyrev May 23, 2019
56b12db
Wire javadoc added
Nashatyrev May 23, 2019
1da43be
Make syncManager sync mode delays configurable (should be 0 delay for…
Nashatyrev May 23, 2019
322f7be
Fix tests
Nashatyrev May 23, 2019
485b05d
Add --genesis-time cli option. Add default genesis time generation
Nashatyrev May 24, 2019
b227b27
Add uncaught exceptions handler for DefaultScheduler Threads to repor…
Nashatyrev May 24, 2019
56af85d
Don't stop Validator service subscription on internal validator error
Nashatyrev May 24, 2019
f8b7dcd
Don't generate ObservableState if the current slot is far above the l…
Nashatyrev May 24, 2019
c4d232d
Fix build script for node module
mkalinin May 24, 2019
59a0b23
Modify zip() operator prefetch parameter to 1. By default it is 32 an…
Nashatyrev May 24, 2019
df8b760
Add ability to disconnect when removing activePeer
Nashatyrev May 24, 2019
257403b
Merge remote-tracking branch 'origin/feature/wire' into feature/wire
Nashatyrev May 24, 2019
124780f
New line at the end
Nashatyrev May 24, 2019
481a170
Fix the test
Nashatyrev May 24, 2019
cea0aad
Reword comment
Nashatyrev May 24, 2019
2d77a30
Rename OtherError to UnexpectedError
Nashatyrev May 24, 2019
0d477ef
Merge remote-tracking branch 'origin/develop' into feature/wire
Nashatyrev May 24, 2019
8013b77
Add some test out
Nashatyrev May 24, 2019
be9c3aa
Fix tests: due to finalization changes we need a bit longer chain to …
Nashatyrev May 27, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ public BeaconStateEx getFinalState() {
return getState();
}

@Override
public String toString() {
return getFinalState().toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,17 @@ private void initializeStorage() {
}

@Override
public synchronized boolean insert(BeaconBlock block) {
public synchronized ImportResult insert(BeaconBlock block) {
if (rejectedByTime(block)) {
return false;
return ImportResult.ExpiredBlock;
}

if (exist(block)) {
return false;
return ImportResult.ExistingBlock;
}

if (!hasParent(block)) {
return false;
return ImportResult.NoParent;
}

long s = System.nanoTime();
Expand All @@ -121,7 +121,7 @@ public synchronized boolean insert(BeaconBlock block) {
if (!blockVerification.isPassed()) {
logger.warn("Block verification failed: " + blockVerification + ": " +
block.toString(spec.getConstants(), parentState.getGenesisTime(), spec::signing_root));
return false;
return ImportResult.InvalidBlock;
}

BeaconStateEx postBlockState = blockTransition.apply(preBlockState, block);
Expand All @@ -130,7 +130,7 @@ public synchronized boolean insert(BeaconBlock block) {
stateVerifier.verify(postBlockState, block);
if (!stateVerification.isPassed()) {
logger.warn("State verification failed: " + stateVerification);
return false;
return ImportResult.StateMismatch;
}

BeaconTuple newTuple = BeaconTuple.of(block, postBlockState);
Expand All @@ -154,7 +154,7 @@ public synchronized boolean insert(BeaconBlock block) {
spec::signing_root),
String.format("%.3f", ((double) total) / 1_000_000_000d));

return true;
return ImportResult.OK;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,21 @@

public interface MutableBeaconChain extends BeaconChain {

enum ImportResult {
OK,
ExistingBlock,
NoParent,
ExpiredBlock,
InvalidBlock,
StateMismatch,
UnexpectedError
}

/**
* Inserts new block into a chain.
*
* @param block a block.
* @return whether a block was inserted or not.
*/
boolean insert(BeaconBlock block);
ImportResult insert(BeaconBlock block);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.ethereum.beacon.chain;

import org.ethereum.beacon.chain.MutableBeaconChain.ImportResult;
import org.ethereum.beacon.core.BeaconBlock;
import org.ethereum.beacon.schedulers.Schedulers;
import org.ethereum.beacon.stream.SimpleProcessor;
Expand All @@ -19,10 +20,10 @@ public ProposedBlockProcessorImpl(MutableBeaconChain beaconChain, Schedulers sch
}

@Override
public void newBlockProposed(BeaconBlock newBlcok) {
boolean result = beaconChain.insert(newBlcok);
if (result) {
blocksStream.onNext(newBlcok);
public void newBlockProposed(BeaconBlock newBlock) {
ImportResult result = beaconChain.insert(newBlock);
if (result == ImportResult.OK) {
blocksStream.onNext(newBlock);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ethereum.beacon.chain.BeaconChainHead;
import org.ethereum.beacon.chain.BeaconTuple;
import org.ethereum.beacon.chain.BeaconTupleDetails;
Expand Down Expand Up @@ -39,7 +41,12 @@
import reactor.core.publisher.Flux;

public class ObservableStateProcessorImpl implements ObservableStateProcessor {
private static final Logger logger = LogManager.getLogger(ObservableStateProcessorImpl.class);

private static final int MAX_TUPLE_CACHE_SIZE = 32;

private final int maxEmptySlotTransitions = 256;

private final BeaconTupleStorage tupleStorage;

private final HeadFunction headFunction;
Expand Down Expand Up @@ -218,22 +225,40 @@ private void newHead(BeaconTupleDetails head) {

private void newSlot(SlotNumber newSlot) {
if (head.getBlock().getSlot().greater(newSlot)) {
logger.info("Ignore new slot " + newSlot + " below head block: " + head.getBlock());
return;
}
if (newSlot.greater(head.getBlock().getSlot().plus(maxEmptySlotTransitions))) {
logger.debug("Ignore new slot " + newSlot + " far above head block: " + head.getBlock());
return;
}

updateCurrentObservableState(head, newSlot);
}

private void updateCurrentObservableState(BeaconTupleDetails head, SlotNumber slot) {
assert slot.greaterEqual(head.getBlock().getSlot());

PendingOperations pendingOperations =
getPendingOperations(head.getFinalState(), copyAttestationCache());
if (slot.greater(head.getBlock().getSlot())) {
BeaconStateEx stateUponASlot = emptySlotTransition.apply(head.getFinalState(), slot);
BeaconStateEx stateUponASlot;
if (latestState.getSlot().greater(spec.getConstants().getGenesisSlot())
&& spec.getObjectHasher()
.getHashTruncateLast(head.getBlock())
.equals(
spec.get_block_root_at_slot(latestState, latestState.getSlot().decrement()))) {

// latestState is actual with respect to current head
stateUponASlot = emptySlotTransition.apply(latestState, slot);
} else {
// recalculate all empty slots starting from the head
stateUponASlot = emptySlotTransition.apply(head.getFinalState(), slot);
}
latestState = stateUponASlot;
PendingOperations pendingOperations = getPendingOperations(stateUponASlot, copyAttestationCache());
observableStateStream.onNext(
new ObservableBeaconState(head.getBlock(), stateUponASlot, pendingOperations));
} else {
PendingOperations pendingOperations = getPendingOperations(head.getFinalState(), copyAttestationCache());
if (head.getPostSlotState().isPresent()) {
latestState = head.getPostSlotState().get();
observableStateStream.onNext(new ObservableBeaconState(
Expand All @@ -255,16 +280,9 @@ private PendingOperations getPendingOperations(
BeaconState state, Map<BLSPubkey, List<Attestation>> attestationMap) {
List<Attestation> attestations = attestationMap.values().stream()
.flatMap(Collection::stream)
.filter(attestation -> {
/* attestation_slot = get_attestation_slot(state, attestation)
assert attestation_slot + MIN_ATTESTATION_INCLUSION_DELAY <= state.slot <= attestation_slot + SLOTS_PER_EPOCH */
SlotNumber attestationSlot = spec.get_attestation_slot(state, attestation.getData());
SlotNumber lowerBoundary =
attestationSlot.plus(spec.getConstants().getMinAttestationInclusionDelay());
SlotNumber upperBoundary = attestationSlot.plus(spec.getConstants().getSlotsPerEpoch());
return lowerBoundary.lessEqual(state.getSlot())
&& state.getSlot().lessEqual(upperBoundary);
})
.filter(attestation ->
attestation.getData().getTargetEpoch().lessEqual(spec.get_current_epoch(state)))
.filter(attestation -> spec.verify_attestation(state, attestation))
.sorted(Comparator.comparing(attestation -> attestation.getData().getTargetEpoch()))
.collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public List<Attestation> peekAggregateAttestations(int maxCount) {

private Attestation aggregateAttestations(List<Attestation> attestations) {
assert !attestations.isEmpty();
assert attestations.stream().skip(1).allMatch(a -> a.equals(attestations.get(0)));
assert attestations.stream().skip(1).allMatch(a -> a.getData().equals(attestations.get(0).getData()));

Bitfield participants =
attestations.stream()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package org.ethereum.beacon.chain.storage;

import org.ethereum.beacon.core.BeaconBlockHeader;
import org.ethereum.beacon.db.source.DataSource;
import org.ethereum.beacon.db.source.SingleValueSource;
import tech.pegasys.artemis.ethereum.core.Hash32;

public interface BeaconChainStorage {

BeaconBlockStorage getBlockStorage();

DataSource<Hash32, BeaconBlockHeader> getBlockHeaderStorage();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not wrap it with dedicated interface like others. Or, lets postpone it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's postpone, since I'm not yet sure how this header storage should be organized


BeaconStateStorage getStateStorage();

BeaconTupleStorage getTupleStorage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,30 @@
import org.ethereum.beacon.chain.storage.BeaconChainStorage;
import org.ethereum.beacon.chain.storage.BeaconStateStorage;
import org.ethereum.beacon.chain.storage.BeaconTupleStorage;
import org.ethereum.beacon.core.BeaconBlockHeader;
import org.ethereum.beacon.db.source.DataSource;
import org.ethereum.beacon.db.source.SingleValueSource;
import tech.pegasys.artemis.ethereum.core.Hash32;

/** A default implementation of {@link BeaconChainStorage}. */
public class BeaconChainStorageImpl implements BeaconChainStorage {

private final BeaconBlockStorage blockStorage;
private final DataSource<Hash32, BeaconBlockHeader> blockHeaderStorage;
private final BeaconStateStorage stateStorage;
private final BeaconTupleStorage tupleStorage;
private final SingleValueSource<Hash32> justifiedStorage;
private final SingleValueSource<Hash32> finalizedStorage;

public BeaconChainStorageImpl(
BeaconBlockStorage blockStorage,
DataSource<Hash32, BeaconBlockHeader> blockHeaderStorage,
BeaconStateStorage stateStorage,
BeaconTupleStorage tupleStorage,
SingleValueSource<Hash32> justifiedStorage,
SingleValueSource<Hash32> finalizedStorage) {
this.blockStorage = blockStorage;
this.blockHeaderStorage = blockHeaderStorage;
this.stateStorage = stateStorage;
this.tupleStorage = tupleStorage;
this.justifiedStorage = justifiedStorage;
Expand All @@ -34,6 +39,11 @@ public BeaconBlockStorage getBlockStorage() {
return blockStorage;
}

@Override
public DataSource<Hash32, BeaconBlockHeader> getBlockHeaderStorage() {
return blockHeaderStorage;
}

@Override
public BeaconStateStorage getStateStorage() {
return stateStorage;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.ethereum.beacon.chain.storage.impl;

import java.util.Optional;
import javax.annotation.Nonnull;
import org.ethereum.beacon.chain.storage.BeaconBlockStorage;
import org.ethereum.beacon.consensus.hasher.ObjectHasher;
import org.ethereum.beacon.core.BeaconBlock;
import org.ethereum.beacon.core.BeaconBlockHeader;
import org.ethereum.beacon.db.source.DataSource;
import tech.pegasys.artemis.ethereum.core.Hash32;

public class DelegateBlockHeaderStorageImpl implements DataSource<Hash32, BeaconBlockHeader> {

private final BeaconBlockStorage delegateBlockStorage;
private final ObjectHasher<Hash32> objectHasher;

public DelegateBlockHeaderStorageImpl(
BeaconBlockStorage delegateBlockStorage,
ObjectHasher<Hash32> objectHasher) {
this.delegateBlockStorage = delegateBlockStorage;
this.objectHasher = objectHasher;
}

@Override
public Optional<BeaconBlockHeader> get(@Nonnull Hash32 key) {
return delegateBlockStorage
.get(key)
.map(this::createHeader);
}

private BeaconBlockHeader createHeader(BeaconBlock block) {
return new BeaconBlockHeader(
block.getSlot(),
block.getPreviousBlockRoot(),
block.getStateRoot(),
objectHasher.getHash(block.getBody()),
block.getSignature());
}

@Override
public void put(@Nonnull Hash32 key, @Nonnull BeaconBlockHeader value) {
throw new UnsupportedOperationException();
}

@Override
public void remove(@Nonnull Hash32 key) {
throw new UnsupportedOperationException();
}

@Override
public void flush() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ public BeaconChainStorage create(Database database) {
new BeaconStateStorageImpl(new HashMapDataSource<>(), objectHasher);
BeaconTupleStorage tupleStorage = new BeaconTupleStorageImpl(blockStorage, stateStorage);

return new BeaconChainStorageImpl(blockStorage, stateStorage, tupleStorage,
SingleValueSource.memSource(), SingleValueSource.memSource());
return new BeaconChainStorageImpl(
blockStorage,
new DelegateBlockHeaderStorageImpl(blockStorage, objectHasher),
stateStorage,
tupleStorage,
SingleValueSource.memSource(),
SingleValueSource.memSource());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ public BeaconChainStorage create(Database database) {
SingleValueSource<Hash32> finalizedStorage = createHash32Storage(database, "finalized-hash");

return new BeaconChainStorageImpl(
blockStorage, stateStorage, tupleStorage, justifiedStorage, finalizedStorage);
blockStorage,
new DelegateBlockHeaderStorageImpl(blockStorage, objectHasher),
stateStorage,
tupleStorage,
justifiedStorage,
finalizedStorage);
}

private SingleValueSource<Hash32> createHash32Storage(Database database, String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ public interface SerializerFactory {
<T> Function<T, BytesValue> getSerializer(Class<T> objectClass);

static SerializerFactory createSSZ(SpecConstants specConstants) {
return new SSZSerializerFactory(
new SSZBuilder()
return new SSZSerializerFactory(new SSZBuilder()
.withExternalVarResolver(new SpecConstantsResolver(specConstants))
.buildSerializer());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Collections;
import java.util.stream.IntStream;
import org.ethereum.beacon.chain.MutableBeaconChain.ImportResult;
import org.ethereum.beacon.chain.storage.BeaconChainStorage;
import org.ethereum.beacon.chain.storage.impl.SSZBeaconChainStorageFactory;
import org.ethereum.beacon.chain.storage.impl.SerializerFactory;
Expand Down Expand Up @@ -54,7 +55,7 @@ public void insertAChain() {
BeaconTuple recentlyProcessed = beaconChain.getRecentlyProcessed();
BeaconBlock aBlock = createBlock(recentlyProcessed, spec,
schedulers.getCurrentTime(), perSlotTransition);
Assert.assertTrue(beaconChain.insert(aBlock));
Assert.assertEquals(ImportResult.OK, beaconChain.insert(aBlock));
Assert.assertEquals(aBlock, beaconChain.getRecentlyProcessed().getBlock());

System.out.println("Inserted block: " + (idx + 1));
Expand Down
Loading