Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pow/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ dependencies {
implementation project(':core')
implementation project(':ssz')
implementation project(':util')
implementation project(':crypto')

implementation 'io.projectreactor:reactor-core'
implementation 'com.google.guava:guava'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package org.ethereum.beacon.pow;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.ethereum.beacon.core.operations.Deposit;
import org.ethereum.beacon.core.operations.deposit.DepositData;
import org.ethereum.beacon.core.state.Eth1Data;
Expand All @@ -14,74 +8,136 @@
import org.ethereum.beacon.core.types.Gwei;
import org.ethereum.beacon.core.types.Time;
import org.ethereum.beacon.schedulers.Schedulers;
import org.ethereum.beacon.ssz.SSZBuilder;
import org.ethereum.beacon.ssz.SSZSerializer;
import org.ethereum.beacon.stream.SimpleProcessor;
import org.javatuples.Pair;
import org.javatuples.Triplet;
import org.reactivestreams.Publisher;
import reactor.core.publisher.MonoProcessor;
import tech.pegasys.artemis.ethereum.core.Hash32;
import tech.pegasys.artemis.util.bytes.Bytes32;
import tech.pegasys.artemis.util.bytes.Bytes48;
import tech.pegasys.artemis.util.bytes.Bytes8;
import tech.pegasys.artemis.util.bytes.Bytes96;
import tech.pegasys.artemis.util.bytes.BytesValue;
import tech.pegasys.artemis.util.collections.ReadVector;
import tech.pegasys.artemis.util.uint.UInt64;

public abstract class AbstractDepositContract implements DepositContract {
protected class DepositEventData {
public final byte[] deposit_root;
public final byte[] data;
public final byte[] merkle_tree_index;
public final byte[][] merkle_branch;

public DepositEventData(byte[] deposit_root, byte[] data, byte[] merkle_tree_index,
byte[][] merkle_branch) {
this.deposit_root = deposit_root;
this.data = data;
this.merkle_tree_index = merkle_tree_index;
this.merkle_branch = merkle_branch;
}
}

private final SSZSerializer ssz = new SSZBuilder().buildSerializer();

private long distanceFromHead;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

public abstract class AbstractDepositContract implements DepositContract {
protected final Schedulers schedulers;
private final MonoProcessor<ChainStart> chainStartSink = MonoProcessor.create();
private final Publisher<ChainStart> chainStartStream;

private final SimpleProcessor<Deposit> depositStream;

private final MerkleTree<DepositData> tree;
private long distanceFromHead;
private List<Deposit> initialDeposits = new ArrayList<>();
private boolean startChainSubscribed;

public AbstractDepositContract(Schedulers schedulers) {
public AbstractDepositContract(
Schedulers schedulers, Function<BytesValue, Hash32> hashFunction, int treeDepth) {
this.schedulers = schedulers;

chainStartStream = chainStartSink
.publishOn(this.schedulers.reactorEvents())
.doOnSubscribe(s -> chainStartSubscribedPriv())
.name("PowClient.chainStart");
chainStartStream =
chainStartSink
.publishOn(this.schedulers.reactorEvents())
.doOnSubscribe(s -> chainStartSubscribedPriv())
.name("PowClient.chainStart");
depositStream = new SimpleProcessor<>(this.schedulers.reactorEvents(), "PowClient.deposit");
this.tree = new DepositBufferedMerkle(hashFunction, treeDepth, 1000);
}

protected synchronized void newDeposit(DepositEventData eventData, byte[] blockHash) {
if (startChainSubscribed && !chainStartSink.isTerminated()) {
DepositInfo depositInfo = createDepositInfo(eventData, blockHash);
initialDeposits.add(depositInfo.getDeposit());
depositStream.onNext(depositInfo.getDeposit());
/**
* Stores deposits data from invocation list eventDataList
*
* @param eventDataList All deposit events in blockHash
* @param blockHash Block hash
*/
protected synchronized void newDeposits(List<DepositEventData> eventDataList, byte[] blockHash) {
List<Deposit> deposits =
eventDataList.stream().map(this::newDeposit).collect(Collectors.toList());
if (deposits.isEmpty()) {
return;
}
int size = deposits.get(deposits.size() - 1).getIndex().increment().intValue();
for (Deposit deposit : deposits) {
Deposit depositProofed =
new Deposit(
ReadVector.wrap(tree.getProof(deposit.getIndex().intValue(), size), Integer::new),
deposit.getIndex(),
deposit.getData());
if (startChainSubscribed && !chainStartSink.isTerminated()) {
initialDeposits.add(depositProofed);
}
depositStream.onNext(depositProofed);
}
}

/**
* Same as {@link #newDeposits(List, byte[])} but doesn't store deposits data, instead expects its
* already stored
*/
private List<DepositInfo> restoreDeposits(List<DepositEventData> eventData, byte[] blockHash) {
List<Deposit> deposits =
eventData.stream().map(this::createUnProofedDeposit).collect(Collectors.toList());
if (deposits.isEmpty()) {
return Collections.emptyList();
}
int size = deposits.get(0).getIndex().plus(deposits.size()).intValue();
return deposits.stream()
.map(
deposit ->
new Deposit(
ReadVector.wrap(tree.getProof(deposit.getIndex().intValue(), size), Integer::new),
deposit.getIndex(),
deposit.getData()))
.map(
d ->
new DepositInfo(
d,
new Eth1Data(
tree.getRoot(d.getIndex().intValue()),
d.getIndex().decrement(),
Hash32.wrap(Bytes32.wrap(blockHash)))))
.collect(Collectors.toList());
}

protected synchronized void chainStart(byte[] deposit_root, byte[] time, byte[] blockHash) {
ChainStart chainStart = new ChainStart(
Time.castFrom(UInt64.fromBytesBigEndian(Bytes8.wrap(time))),
new Eth1Data(Hash32.wrap(Bytes32.wrap(deposit_root)),
UInt64.valueOf(initialDeposits.size()),
Hash32.wrap(Bytes32.wrap(blockHash))),
initialDeposits);
/**
* Inserts deposit in storage and returns it NOTE: returns Deposit with empty proof, proof should
* be filled by someone else
*
* @param eventData Deposit event
* @return Deposit
*/
private Deposit newDeposit(DepositEventData eventData) {
Deposit deposit = createUnProofedDeposit(eventData);
tree.addValue(deposit.getData());
return deposit;
}

public Hash32 getDepositRoot(byte[] merkleTreeIndex) {
UInt64 index = UInt64.fromBytesLittleEndian(Bytes8.wrap(merkleTreeIndex));
return tree.getRoot(index.intValue());
}

protected synchronized void chainStart(
byte[] deposit_root, byte[] deposit_count, byte[] time, byte[] blockHash) {
assert UInt64.fromBytesLittleEndian(Bytes8.wrap(deposit_count)).intValue()
== initialDeposits.size();
ChainStart chainStart =
new ChainStart(
Time.castFrom(UInt64.fromBytesLittleEndian(Bytes8.wrap(time))),
new Eth1Data(
Hash32.wrap(Bytes32.wrap(deposit_root)),
UInt64.valueOf(initialDeposits.size()),
Hash32.wrap(Bytes32.wrap(blockHash))),
initialDeposits);
chainStartSink.onNext(chainStart);
chainStartSink.onComplete();
chainStartDone();
Expand All @@ -108,27 +164,15 @@ public Publisher<Deposit> getDepositStream() {
return depositStream;
}

private DepositInfo createDepositInfo(DepositEventData eventData, byte[] blockHash) {
List<Hash32> merkleBranch = Arrays.stream(eventData.merkle_branch)
.map(bytes -> Hash32.wrap(Bytes32.wrap(bytes)))
.collect(Collectors.toList());
Deposit deposit = new Deposit(ReadVector.wrap(merkleBranch, Function.identity()),
UInt64.fromBytesBigEndian(Bytes8.wrap(eventData.merkle_tree_index)),
parseDepositData(eventData.data));
return new DepositInfo(deposit,
new Eth1Data(Hash32.wrap(Bytes32.wrap(eventData.deposit_root)),
UInt64.ZERO,
Hash32.wrap(Bytes32.wrap(blockHash))));
}

private DepositData parseDepositData(byte[] data) {
BLSPubkey pubkey = BLSPubkey.wrap(Bytes48.wrap(data, 0));
Hash32 withdrawalCredentials = Hash32.wrap(Bytes32.wrap(data, 48));
Gwei amount =
Gwei.castFrom(UInt64.fromBytesLittleEndian(Bytes8.wrap(data, Bytes48.SIZE + Bytes32.SIZE)));
BLSSignature signature =
BLSSignature.wrap(Bytes96.wrap(data, Bytes48.SIZE + Bytes32.SIZE + Bytes8.SIZE));
return new DepositData(pubkey, withdrawalCredentials, amount, signature);
private Deposit createUnProofedDeposit(DepositEventData eventData) {
UInt64 index = UInt64.fromBytesLittleEndian(Bytes8.wrap(eventData.merkleTreeIndex));
DepositData depositData =
new DepositData(
BLSPubkey.wrap(Bytes48.wrap(eventData.pubkey)),
Hash32.wrap(Bytes32.wrap(eventData.withdrawalCredentials)),
Gwei.castFrom(UInt64.fromBytesLittleEndian(Bytes8.wrap(eventData.amount))),
BLSSignature.wrap(Bytes96.wrap(eventData.signature)));
return new Deposit(ReadVector.wrap(Collections.emptyList(), Integer::new), index, depositData);
}

@Override
Expand All @@ -140,39 +184,62 @@ public boolean hasDepositRoot(Hash32 blockHash, Hash32 depositRoot) {

@Override
public Optional<Eth1Data> getLatestEth1Data() {
return getLatestBlockHashDepositRoot().map(
r -> new Eth1Data(
Hash32.wrap(Bytes32.wrap(r.getValue1())),
UInt64.ZERO,
Hash32.wrap(Bytes32.wrap(r.getValue0()))));
return getLatestBlockHashDepositRoot()
.map(
r ->
new Eth1Data(
Hash32.wrap(Bytes32.wrap(r.getValue0())),
UInt64.valueOf(r.getValue1()),
Hash32.wrap(Bytes32.wrap(r.getValue2()))));
}

protected abstract Optional<Pair<byte[], byte[]>> getLatestBlockHashDepositRoot();
protected abstract Optional<Triplet<byte[], Integer, byte[]>> getLatestBlockHashDepositRoot();

@Override
public List<DepositInfo> peekDeposits(int count, Eth1Data fromDepositExclusive,
Eth1Data tillDepositInclusive) {
return peekDepositsImpl(count,
fromDepositExclusive.getBlockHash().extractArray(),
fromDepositExclusive.getDepositRoot().extractArray(),
tillDepositInclusive.getBlockHash().extractArray(),
tillDepositInclusive.getDepositRoot().extractArray())
public List<DepositInfo> peekDeposits(
int count, Eth1Data fromDepositExclusive, Eth1Data tillDepositInclusive) {
return peekDepositsImpl(
count,
fromDepositExclusive.getBlockHash().extractArray(),
tillDepositInclusive.getBlockHash().extractArray())
.stream()
.map(blockDepositPair -> createDepositInfo(blockDepositPair.getValue1(), blockDepositPair.getValue0()))
.map(
blockDepositPair ->
restoreDeposits(blockDepositPair.getValue1(), blockDepositPair.getValue0()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
}

protected abstract List<Pair<byte[], DepositEventData>> peekDepositsImpl(
int count,
byte[] startBlockHash, byte[] startDepositRoot,
byte[] endBlockHash, byte[] endDepositRoot);
protected abstract List<Pair<byte[], List<DepositEventData>>> peekDepositsImpl(
int count, byte[] startBlockHash, byte[] endBlockHash);

protected long getDistanceFromHead() {
return distanceFromHead;
}

@Override
public void setDistanceFromHead(long distanceFromHead) {
this.distanceFromHead = distanceFromHead;
}

protected long getDistanceFromHead() {
return distanceFromHead;
protected class DepositEventData {
public final byte[] pubkey;
public final byte[] withdrawalCredentials;
public final byte[] amount;
public final byte[] signature;
public final byte[] merkleTreeIndex;

public DepositEventData(
byte[] pubkey,
byte[] withdrawalCredentials,
byte[] amount,
byte[] signature,
byte[] merkleTreeIndex) {
this.pubkey = pubkey;
this.withdrawalCredentials = withdrawalCredentials;
this.amount = amount;
this.signature = signature;
this.merkleTreeIndex = merkleTreeIndex;
}
}
}
}
Loading