Skip to content
This repository was archived by the owner on Oct 17, 2022. It is now read-only.

Commit c59d202

Browse files
[refactor] replace the certificate_store with a wrapper struct (#825)
* first iteration * fix after rebase * update mysten-infra to use the new additions in the typed-store reverse iterator * fix workerspace-hack issue * fix: remove some double iterations from the read_all, delete_all operations * fix: improve the efficiency of `after_round` * fix: improve consensus comment Co-authored-by: François Garillot <francois@garillot.net>
1 parent 65ef540 commit c59d202

38 files changed

+882
-209
lines changed

Cargo.lock

Lines changed: 27 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[workspace]
2-
members = ["config", "consensus", "crypto", "dag", "examples", "executor", "network", "node", "primary", "test_utils", "types", "worker", "workspace-hack"]
2+
members = ["config", "consensus", "crypto", "dag", "examples", "executor", "network", "node", "primary", "storage", "test_utils", "types", "worker", "workspace-hack"]
33

44
[profile.release]
55
codegen-units = 1

consensus/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ bytes = "1.2.1"
1313
match_opt = "0.1.2"
1414
rand = { version = "0.8.5", optional = true }
1515
serde = { version = "1.0.144", features = ["derive"] }
16-
store = { git = "https://github.com/mystenlabs/mysten-infra.git", package = "typed-store", rev = "8d090689be14078f2ca41c356e7bbc0af21f73ab" }
16+
store = { git = "https://github.com/mystenlabs/mysten-infra.git", package = "typed-store", rev = "696fa25982acdc87ab64ec6a33cb111edc63a331" }
1717
thiserror = "1.0.32"
1818
tokio = { version = "1.20.1", features = ["sync"] }
1919
tracing = "0.1.36"
2020

2121
config = { path = "../config" }
2222
fastcrypto = { git = "https://github.com/MystenLabs/fastcrypto", rev = "c022a2ae23ca7cc2778293fd3b1db42e8cd02d3b" }
2323
crypto = { path = "../crypto" }
24+
storage = { path = "../storage" }
2425
dag = { path = "../dag" }
2526
prometheus = "0.13.1"
2627
types = { path = "../types" }

consensus/src/consensus.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::{
1010
collections::HashMap,
1111
sync::Arc,
1212
};
13-
use store::Store;
13+
use storage::CertificateStore;
1414
use tokio::{sync::watch, task::JoinHandle};
1515
use tracing::{info, instrument};
1616
use types::{
@@ -61,7 +61,7 @@ impl ConsensusState {
6161
genesis: Vec<Certificate>,
6262
metrics: Arc<ConsensusMetrics>,
6363
recover_last_committed: HashMap<PublicKey, Round>,
64-
cert_store: Store<CertificateDigest, Certificate>,
64+
cert_store: CertificateStore,
6565
gc_depth: Round,
6666
) -> Self {
6767
let last_committed_round = *recover_last_committed
@@ -88,7 +88,7 @@ impl ConsensusState {
8888

8989
#[instrument(level = "info", skip_all)]
9090
pub async fn construct_dag_from_cert_store(
91-
cert_store: Store<CertificateDigest, Certificate>,
91+
cert_store: CertificateStore,
9292
last_committed_round: Round,
9393
gc_depth: Round,
9494
) -> Dag {
@@ -99,14 +99,11 @@ impl ConsensusState {
9999
);
100100

101101
let min_round = last_committed_round.saturating_sub(gc_depth);
102-
let cert_map = cert_store
103-
.iter(Some(Box::new(move |(_dig, cert)| {
104-
cert.header.round > min_round
105-
})))
106-
.await;
102+
// get all certificates at a round > min_round
103+
let cert_map = cert_store.after_round(min_round + 1).unwrap();
107104

108105
let num_certs = cert_map.len();
109-
for (digest, cert) in cert_map {
106+
for (digest, cert) in cert_map.into_iter().map(|c| (c.digest(), c)) {
110107
let inner = dag.get_mut(&cert.header.round);
111108
match inner {
112109
Some(m) => {
@@ -205,7 +202,7 @@ where
205202
pub fn spawn(
206203
committee: Committee,
207204
store: Arc<ConsensusStore>,
208-
cert_store: Store<CertificateDigest, Certificate>,
205+
cert_store: CertificateStore,
209206
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
210207
rx_primary: metered_channel::Receiver<Certificate>,
211208
tx_primary: metered_channel::Sender<Certificate>,
@@ -248,7 +245,7 @@ where
248245
async fn run(
249246
&mut self,
250247
recover_last_committed: HashMap<PublicKey, Round>,
251-
cert_store: Store<CertificateDigest, Certificate>,
248+
cert_store: CertificateStore,
252249
gc_depth: Round,
253250
) -> StoreResult<()> {
254251
// The consensus state (everything else is immutable).

consensus/src/tests/bullshark_tests.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use fastcrypto::traits::KeyPair;
1010
use prometheus::Registry;
1111
#[cfg(test)]
1212
use std::collections::{BTreeSet, VecDeque};
13+
use storage::CertificateStore;
1314
use store::{reopen, rocks, rocks::DBMap};
1415
use test_utils::mock_committee;
1516
#[allow(unused_imports)]
@@ -32,19 +33,23 @@ pub fn make_consensus_store(store_path: &std::path::Path) -> Arc<ConsensusStore>
3233
Arc::new(ConsensusStore::new(last_committed_map, sequence_map))
3334
}
3435

35-
pub fn make_certificate_store(
36-
store_path: &std::path::Path,
37-
) -> store::Store<CertificateDigest, Certificate> {
36+
pub fn make_certificate_store(store_path: &std::path::Path) -> CertificateStore {
3837
const CERTIFICATES_CF: &str = "certificates";
39-
40-
let rocksdb =
41-
rocks::open_cf(store_path, None, &[CERTIFICATES_CF]).expect("Failed creating database");
42-
43-
let certificate_map = reopen!(&rocksdb,
44-
CERTIFICATES_CF;<CertificateDigest, Certificate>
38+
const CERTIFICATE_ID_BY_ROUND_CF: &str = "certificate_id_by_round";
39+
40+
let rocksdb = rocks::open_cf(
41+
store_path,
42+
None,
43+
&[CERTIFICATES_CF, CERTIFICATE_ID_BY_ROUND_CF],
44+
)
45+
.expect("Failed creating database");
46+
47+
let (certificate_map, certificate_id_by_round_map) = reopen!(&rocksdb,
48+
CERTIFICATES_CF;<CertificateDigest, Certificate>,
49+
CERTIFICATE_ID_BY_ROUND_CF;<(Round, CertificateDigest), u8>
4550
);
4651

47-
store::Store::new(certificate_map)
52+
CertificateStore::new(certificate_map, certificate_id_by_round_map)
4853
}
4954

5055
// Run for 4 dag rounds in ideal conditions (all nodes reference all other nodes). We should commit

consensus/src/tests/tusk_tests.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use fastcrypto::traits::KeyPair;
1010
use prometheus::Registry;
1111
#[cfg(test)]
1212
use std::collections::{BTreeSet, VecDeque};
13+
use storage::CertificateStore;
1314
use store::{reopen, rocks, rocks::DBMap};
1415
use test_utils::mock_committee;
1516
#[allow(unused_imports)]
@@ -32,19 +33,23 @@ pub fn make_consensus_store(store_path: &std::path::Path) -> Arc<ConsensusStore>
3233
Arc::new(ConsensusStore::new(last_committed_map, sequence_map))
3334
}
3435

35-
pub fn make_certificate_store(
36-
store_path: &std::path::Path,
37-
) -> store::Store<CertificateDigest, Certificate> {
36+
pub fn make_certificate_store(store_path: &std::path::Path) -> CertificateStore {
3837
const CERTIFICATES_CF: &str = "certificates";
39-
40-
let rocksdb =
41-
rocks::open_cf(store_path, None, &[CERTIFICATES_CF]).expect("Failed creating database");
42-
43-
let certificate_map = reopen!(&rocksdb,
44-
CERTIFICATES_CF;<CertificateDigest, Certificate>
38+
const CERTIFICATE_ID_BY_ROUND_CF: &str = "certificate_id_by_round";
39+
40+
let rocksdb = rocks::open_cf(
41+
store_path,
42+
None,
43+
&[CERTIFICATES_CF, CERTIFICATE_ID_BY_ROUND_CF],
44+
)
45+
.expect("Failed creating database");
46+
47+
let (certificate_map, certificate_id_by_round_map) = reopen!(&rocksdb,
48+
CERTIFICATES_CF;<CertificateDigest, Certificate>,
49+
CERTIFICATE_ID_BY_ROUND_CF;<(Round, CertificateDigest), u8>
4550
);
4651

47-
store::Store::new(certificate_map)
52+
CertificateStore::new(certificate_map, certificate_id_by_round_map)
4853
}
4954

5055
// Run for 4 dag rounds in ideal conditions (all nodes reference all other nodes). We should commit

executor/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ prometheus = "0.13.1"
2626
backoff = { version = "0.4.0", features = ["tokio"] }
2727

2828
types = { path = "../types" }
29-
mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "8d090689be14078f2ca41c356e7bbc0af21f73ab" }
29+
mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "696fa25982acdc87ab64ec6a33cb111edc63a331" }
3030

31-
store = { git = "https://github.com/mystenlabs/mysten-infra.git", package = "typed-store", rev = "8d090689be14078f2ca41c356e7bbc0af21f73ab" }
31+
store = { git = "https://github.com/mystenlabs/mysten-infra.git", package = "typed-store", rev = "696fa25982acdc87ab64ec6a33cb111edc63a331" }
3232
workspace-hack = { version = "0.1", path = "../workspace-hack" }
3333
match_opt = "0.1.2"
3434

@@ -39,4 +39,4 @@ tempfile = "3.3.0"
3939
primary = { path = "../primary" }
4040
test_utils = { path = "../test_utils" }
4141
types = { path = "../types" }
42-
telemetry-subscribers = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "8d090689be14078f2ca41c356e7bbc0af21f73ab" }
42+
telemetry-subscribers = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "696fa25982acdc87ab64ec6a33cb111edc63a331" }

network/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ tonic = { version = "0.7.2", features = ["tls"] }
2222
tracing = "0.1.36"
2323
types = { path = "../types" }
2424

25-
mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "8d090689be14078f2ca41c356e7bbc0af21f73ab" }
2625
serde = "1.0.144"
26+
mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "696fa25982acdc87ab64ec6a33cb111edc63a331" }
2727
workspace-hack = { version = "0.1", path = "../workspace-hack" }
2828
eyre = "0.6.8"
2929

node/Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ clap = "2.34"
1515
dhat = { version = "0.3.0", optional = true }
1616
futures = "0.3.23"
1717
multiaddr = "0.14.0"
18-
mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "8d090689be14078f2ca41c356e7bbc0af21f73ab" }
18+
mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "696fa25982acdc87ab64ec6a33cb111edc63a331" }
1919
rand = "0.8.5"
20-
store = { git = "https://github.com/mystenlabs/mysten-infra.git", package = "typed-store", rev = "8d090689be14078f2ca41c356e7bbc0af21f73ab" }
21-
telemetry-subscribers = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "8d090689be14078f2ca41c356e7bbc0af21f73ab" }
20+
store = { git = "https://github.com/mystenlabs/mysten-infra.git", package = "typed-store", rev = "696fa25982acdc87ab64ec6a33cb111edc63a331" }
21+
telemetry-subscribers = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "696fa25982acdc87ab64ec6a33cb111edc63a331" }
2222
thiserror = "1.0.32"
2323
tokio = { version = "1.20.1", features = ["full"] }
2424
tokio-stream = "0.1.9"
@@ -37,6 +37,7 @@ executor = { path = "../executor" }
3737
network = { path = "../network" }
3838
primary = { path = "../primary" }
3939
prometheus = "0.13.1"
40+
storage = { path = "../storage" }
4041
types = { path = "../types" }
4142
worker = { path = "../worker" }
4243
workspace-hack = { version = "0.1", path = "../workspace-hack" }

node/src/lib.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use fastcrypto::traits::{KeyPair as _, VerifyingKey};
1414
use primary::{BlockCommand, NetworkModel, PayloadToken, Primary, PrimaryChannelMetrics};
1515
use prometheus::{IntGauge, Registry};
1616
use std::{fmt::Debug, sync::Arc};
17+
use storage::{CertificateStore, CertificateToken};
1718
use store::{
1819
reopen,
1920
rocks::{open_cf, DBMap},
@@ -34,7 +35,7 @@ pub mod restarter;
3435
/// All the data stores of the node.
3536
pub struct NodeStorage {
3637
pub header_store: Store<HeaderDigest, Header>,
37-
pub certificate_store: Store<CertificateDigest, Certificate>,
38+
pub certificate_store: CertificateStore,
3839
pub payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
3940
pub batch_store: Store<BatchDigest, SerializedBatchMessage>,
4041
pub consensus_store: Arc<ConsensusStore>,
@@ -45,6 +46,7 @@ impl NodeStorage {
4546
/// The datastore column family names.
4647
const HEADERS_CF: &'static str = "headers";
4748
const CERTIFICATES_CF: &'static str = "certificates";
49+
const CERTIFICATE_ID_BY_ROUND_CF: &'static str = "certificate_id_by_round";
4850
const PAYLOAD_CF: &'static str = "payload";
4951
const BATCHES_CF: &'static str = "batches";
5052
const LAST_COMMITTED_CF: &'static str = "last_committed";
@@ -59,6 +61,7 @@ impl NodeStorage {
5961
&[
6062
Self::HEADERS_CF,
6163
Self::CERTIFICATES_CF,
64+
Self::CERTIFICATE_ID_BY_ROUND_CF,
6265
Self::PAYLOAD_CF,
6366
Self::BATCHES_CF,
6467
Self::LAST_COMMITTED_CF,
@@ -71,6 +74,7 @@ impl NodeStorage {
7174
let (
7275
header_map,
7376
certificate_map,
77+
certificate_id_by_round_map,
7478
payload_map,
7579
batch_map,
7680
last_committed_map,
@@ -79,6 +83,7 @@ impl NodeStorage {
7983
) = reopen!(&rocksdb,
8084
Self::HEADERS_CF;<HeaderDigest, Header>,
8185
Self::CERTIFICATES_CF;<CertificateDigest, Certificate>,
86+
Self::CERTIFICATE_ID_BY_ROUND_CF;<(Round, CertificateDigest), CertificateToken>,
8287
Self::PAYLOAD_CF;<(BatchDigest, WorkerId), PayloadToken>,
8388
Self::BATCHES_CF;<BatchDigest, SerializedBatchMessage>,
8489
Self::LAST_COMMITTED_CF;<PublicKey, Round>,
@@ -87,7 +92,7 @@ impl NodeStorage {
8792
);
8893

8994
let header_store = Store::new(header_map);
90-
let certificate_store = Store::new(certificate_map);
95+
let certificate_store = CertificateStore::new(certificate_map, certificate_id_by_round_map);
9196
let payload_store = Store::new(payload_map);
9297
let batch_store = Store::new(batch_map);
9398
let consensus_store = Arc::new(ConsensusStore::new(last_committed_map, sequence_map));

0 commit comments

Comments
 (0)