Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit f8ee0e1

Browse files
sorpaasbkchrdvdplm
committed
pow: replace the thread-base mining loop with a future-based mining worker (#7060)
* New worker design * Remove unused thread import * Add back missing inherent data provider registration * Add function to get a Cloned metadata * Add some docs * Derive Eq and PartialEq for MiningMetadata * Fix cargo lock * Fix line width * Add docs and fix issues in UntilImportedOrTimeout * Update client/consensus/pow/src/lib.rs Co-authored-by: David <dvdplm@gmail.com> * Add back comments Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> Co-authored-by: David <dvdplm@gmail.com>
1 parent b7a3b2b commit f8ee0e1

File tree

4 files changed

+358
-168
lines changed

4 files changed

+358
-168
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/consensus/pow/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ sp-consensus-pow = { version = "0.8.0-rc6", path = "../../../primitives/consensu
2424
sp-consensus = { version = "0.8.0-rc6", path = "../../../primitives/consensus/common" }
2525
log = "0.4.8"
2626
futures = { version = "0.3.1", features = ["compat"] }
27+
futures-timer = "3.0.1"
28+
parking_lot = "0.10.0"
2729
sp-timestamp = { version = "2.0.0-rc6", path = "../../../primitives/timestamp" }
2830
derive_more = "0.99.2"
2931
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-rc6"}

client/consensus/pow/src/lib.rs

Lines changed: 141 additions & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,17 @@
3131
//! as the storage, but it is not recommended as it won't work well with light
3232
//! clients.
3333
34-
use std::sync::Arc;
35-
use std::any::Any;
36-
use std::borrow::Cow;
37-
use std::thread;
38-
use std::collections::HashMap;
39-
use std::marker::PhantomData;
40-
use std::cmp::Ordering;
41-
use sc_client_api::{BlockOf, backend::AuxStore};
34+
mod worker;
35+
36+
pub use crate::worker::{MiningWorker, MiningMetadata, MiningBuild};
37+
38+
use std::{
39+
sync::Arc, any::Any, borrow::Cow, collections::HashMap, marker::PhantomData,
40+
cmp::Ordering, time::Duration,
41+
};
42+
use futures::{prelude::*, future::Either};
43+
use parking_lot::Mutex;
44+
use sc_client_api::{BlockOf, backend::AuxStore, BlockchainEvents};
4245
use sp_blockchain::{HeaderBackend, ProvideCache, well_known_cache_keys::Id as CacheKeyId};
4346
use sp_block_builder::BlockBuilder as BlockBuilderApi;
4447
use sp_runtime::{Justification, RuntimeString};
@@ -61,6 +64,8 @@ use sc_client_api;
6164
use log::*;
6265
use sp_timestamp::{InherentError as TIError, TimestampInherentData};
6366

67+
use crate::worker::UntilImportedOrTimeout;
68+
6469
#[derive(derive_more::Display, Debug)]
6570
pub enum Error<B: BlockT> {
6671
#[display(fmt = "Header uses the wrong engine {:?}", _0)]
@@ -193,15 +198,6 @@ pub trait PowAlgorithm<B: BlockT> {
193198
seal: &Seal,
194199
difficulty: Self::Difficulty,
195200
) -> Result<bool, Error<B>>;
196-
/// Mine a seal that satisfies the given difficulty.
197-
fn mine(
198-
&self,
199-
parent: &BlockId<B>,
200-
pre_hash: &B::Hash,
201-
pre_digest: Option<&[u8]>,
202-
difficulty: Self::Difficulty,
203-
round: u32,
204-
) -> Result<Option<Seal>, Error<B>>;
205201
}
206202

207203
/// A block importer for PoW.
@@ -534,194 +530,171 @@ pub fn import_queue<B, Transaction, Algorithm>(
534530
))
535531
}
536532

537-
/// Start the background mining thread for PoW. Note that because PoW mining
538-
/// is CPU-intensive, it is not possible to use an async future to define this.
539-
/// However, it's not recommended to use background threads in the rest of the
540-
/// codebase.
533+
/// Start the mining worker for PoW. This function provides the necessary helper functions that can
534+
/// be used to implement a miner. However, it does not do the CPU-intensive mining itself.
535+
///
536+
/// Two values are returned -- a worker, which contains functions that allows querying the current
537+
/// mining metadata and submitting mined blocks, and a future, which must be polled to fill in
538+
/// information in the worker.
541539
///
542-
/// `pre_runtime` is a parameter that allows a custom additional pre-runtime
543-
/// digest to be inserted for blocks being built. This can encode authorship
544-
/// information, or just be a graffiti. `round` is for number of rounds the
545-
/// CPU miner runs each time. This parameter should be tweaked so that each
546-
/// mining round is within sub-second time.
547-
pub fn start_mine<B: BlockT, C, Algorithm, E, SO, S, CAW>(
548-
mut block_import: BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
540+
/// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted
541+
/// for blocks being built. This can encode authorship information, or just be a graffiti.
542+
pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, CAW>(
543+
block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
549544
client: Arc<C>,
545+
select_chain: S,
550546
algorithm: Algorithm,
551547
mut env: E,
552-
pre_runtime: Option<Vec<u8>>,
553-
round: u32,
554548
mut sync_oracle: SO,
555-
build_time: std::time::Duration,
556-
select_chain: Option<S>,
549+
pre_runtime: Option<Vec<u8>>,
557550
inherent_data_providers: sp_inherents::InherentDataProviders,
551+
timeout: Duration,
552+
build_time: Duration,
558553
can_author_with: CAW,
559-
) where
560-
C: HeaderBackend<B> + AuxStore + ProvideRuntimeApi<B> + 'static,
561-
Algorithm: PowAlgorithm<B> + Send + Sync + 'static,
562-
E: Environment<B> + Send + Sync + 'static,
554+
) -> (Arc<Mutex<MiningWorker<Block, Algorithm, C>>>, impl Future<Output = ()>) where
555+
Block: BlockT,
556+
C: ProvideRuntimeApi<Block> + BlockchainEvents<Block> + 'static,
557+
S: SelectChain<Block> + 'static,
558+
Algorithm: PowAlgorithm<Block> + Clone,
559+
Algorithm::Difficulty: 'static,
560+
E: Environment<Block> + Send + Sync + 'static,
563561
E::Error: std::fmt::Debug,
564-
E::Proposer: Proposer<B, Transaction = sp_api::TransactionFor<C, B>>,
565-
SO: SyncOracle + Send + Sync + 'static,
566-
S: SelectChain<B> + 'static,
567-
CAW: CanAuthorWith<B> + Send + 'static,
562+
E::Proposer: Proposer<Block, Transaction = sp_api::TransactionFor<C, Block>>,
563+
SO: SyncOracle + Clone + Send + Sync + 'static,
564+
CAW: CanAuthorWith<Block> + Clone + Send + 'static,
568565
{
569566
if let Err(_) = register_pow_inherent_data_provider(&inherent_data_providers) {
570567
warn!("Registering inherent data provider for timestamp failed");
571568
}
572569

573-
thread::spawn(move || {
574-
loop {
575-
match mine_loop(
576-
&mut block_import,
577-
client.as_ref(),
578-
&algorithm,
579-
&mut env,
580-
pre_runtime.as_ref(),
581-
round,
582-
&mut sync_oracle,
583-
build_time.clone(),
584-
select_chain.as_ref(),
585-
&inherent_data_providers,
586-
&can_author_with,
587-
) {
588-
Ok(()) => (),
589-
Err(e) => error!(
590-
"Mining block failed with {:?}. Sleep for 1 second before restarting...",
591-
e
592-
),
593-
}
594-
std::thread::sleep(std::time::Duration::new(1, 0));
595-
}
596-
});
597-
}
570+
let timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
571+
let worker = Arc::new(Mutex::new(MiningWorker::<Block, Algorithm, C> {
572+
build: None,
573+
algorithm: algorithm.clone(),
574+
block_import,
575+
}));
576+
let worker_ret = worker.clone();
577+
578+
let task = timer.for_each(move |()| {
579+
let worker = worker.clone();
598580

599-
fn mine_loop<B: BlockT, C, Algorithm, E, SO, S, CAW>(
600-
block_import: &mut BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
601-
client: &C,
602-
algorithm: &Algorithm,
603-
env: &mut E,
604-
pre_runtime: Option<&Vec<u8>>,
605-
round: u32,
606-
sync_oracle: &mut SO,
607-
build_time: std::time::Duration,
608-
select_chain: Option<&S>,
609-
inherent_data_providers: &sp_inherents::InherentDataProviders,
610-
can_author_with: &CAW,
611-
) -> Result<(), Error<B>> where
612-
C: HeaderBackend<B> + AuxStore + ProvideRuntimeApi<B>,
613-
Algorithm: PowAlgorithm<B>,
614-
Algorithm::Difficulty: 'static,
615-
E: Environment<B>,
616-
E::Proposer: Proposer<B, Transaction = sp_api::TransactionFor<C, B>>,
617-
E::Error: std::fmt::Debug,
618-
SO: SyncOracle,
619-
S: SelectChain<B>,
620-
sp_api::TransactionFor<C, B>: 'static,
621-
CAW: CanAuthorWith<B>,
622-
{
623-
'outer: loop {
624581
if sync_oracle.is_major_syncing() {
625582
debug!(target: "pow", "Skipping proposal due to sync.");
626-
std::thread::sleep(std::time::Duration::new(1, 0));
627-
continue 'outer
583+
worker.lock().on_major_syncing();
584+
return Either::Left(future::ready(()))
628585
}
629586

630-
let (best_hash, best_header) = match select_chain {
631-
Some(select_chain) => {
632-
let header = select_chain.best_chain()
633-
.map_err(Error::BestHeaderSelectChain)?;
634-
let hash = header.hash();
635-
(hash, header)
636-
},
637-
None => {
638-
let hash = client.info().best_hash;
639-
let header = client.header(BlockId::Hash(hash))
640-
.map_err(Error::BestHeader)?
641-
.ok_or(Error::NoBestHeader)?;
642-
(hash, header)
587+
let best_header = match select_chain.best_chain() {
588+
Ok(x) => x,
589+
Err(err) => {
590+
warn!(
591+
target: "pow",
592+
"Unable to pull new block for authoring. \
593+
Select best chain error: {:?}",
594+
err
595+
);
596+
return Either::Left(future::ready(()))
643597
},
644598
};
599+
let best_hash = best_header.hash();
645600

646601
if let Err(err) = can_author_with.can_author_with(&BlockId::Hash(best_hash)) {
647602
warn!(
648603
target: "pow",
649604
"Skipping proposal `can_author_with` returned: {} \
650-
Probably a node update is required!",
605+
Probably a node update is required!",
651606
err,
652607
);
653-
std::thread::sleep(std::time::Duration::from_secs(1));
654-
continue 'outer
608+
return Either::Left(future::ready(()))
655609
}
656610

657-
let proposer = futures::executor::block_on(env.init(&best_header))
658-
.map_err(|e| Error::Environment(format!("{:?}", e)))?;
659-
660-
let inherent_data = inherent_data_providers
661-
.create_inherent_data().map_err(Error::CreateInherents)?;
662-
let mut inherent_digest = Digest::default();
663-
if let Some(pre_runtime) = &pre_runtime {
664-
inherent_digest.push(DigestItem::PreRuntime(POW_ENGINE_ID, pre_runtime.to_vec()));
611+
if worker.lock().best_hash() == Some(best_hash) {
612+
return Either::Left(future::ready(()))
665613
}
666-
let proposal = futures::executor::block_on(proposer.propose(
667-
inherent_data,
668-
inherent_digest,
669-
build_time.clone(),
670-
RecordProof::No,
671-
)).map_err(|e| Error::BlockProposingError(format!("{:?}", e)))?;
672-
673-
let (header, body) = proposal.block.deconstruct();
674-
let (difficulty, seal) = {
675-
let difficulty = algorithm.difficulty(best_hash)?;
676-
677-
loop {
678-
let seal = algorithm.mine(
679-
&BlockId::Hash(best_hash),
680-
&header.hash(),
681-
pre_runtime.map(|v| &v[..]),
682-
difficulty,
683-
round,
684-
)?;
685-
686-
if let Some(seal) = seal {
687-
break (difficulty, seal)
688-
}
689614

690-
if best_hash != client.info().best_hash {
691-
continue 'outer
692-
}
693-
}
615+
// The worker is locked for the duration of the whole proposing period. Within this period,
616+
// the mining target is outdated and useless anyway.
617+
618+
let difficulty = match algorithm.difficulty(best_hash) {
619+
Ok(x) => x,
620+
Err(err) => {
621+
warn!(
622+
target: "pow",
623+
"Unable to propose new block for authoring. \
624+
Fetch difficulty failed: {:?}",
625+
err,
626+
);
627+
return Either::Left(future::ready(()))
628+
},
694629
};
695630

696-
log::info!("✅ Successfully mined block: {}", best_hash);
697-
698-
let (hash, seal) = {
699-
let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
700-
let mut header = header.clone();
701-
header.digest_mut().push(seal);
702-
let hash = header.hash();
703-
let seal = header.digest_mut().pop()
704-
.expect("Pushed one seal above; length greater than zero; qed");
705-
(hash, seal)
631+
let awaiting_proposer = env.init(&best_header);
632+
let inherent_data = match inherent_data_providers.create_inherent_data() {
633+
Ok(x) => x,
634+
Err(err) => {
635+
warn!(
636+
target: "pow",
637+
"Unable to propose new block for authoring. \
638+
Creating inherent data failed: {:?}",
639+
err,
640+
);
641+
return Either::Left(future::ready(()))
642+
},
706643
};
644+
let mut inherent_digest = Digest::<Block::Hash>::default();
645+
if let Some(pre_runtime) = &pre_runtime {
646+
inherent_digest.push(DigestItem::PreRuntime(POW_ENGINE_ID, pre_runtime.to_vec()));
647+
}
707648

708-
let intermediate = PowIntermediate::<Algorithm::Difficulty> {
709-
difficulty: Some(difficulty),
710-
};
649+
let pre_runtime = pre_runtime.clone();
650+
651+
Either::Right(async move {
652+
let proposer = match awaiting_proposer.await {
653+
Ok(x) => x,
654+
Err(err) => {
655+
warn!(
656+
target: "pow",
657+
"Unable to propose new block for authoring. \
658+
Creating proposer failed: {:?}",
659+
err,
660+
);
661+
return
662+
},
663+
};
664+
665+
let proposal = match proposer.propose(
666+
inherent_data,
667+
inherent_digest,
668+
build_time.clone(),
669+
RecordProof::No,
670+
).await {
671+
Ok(x) => x,
672+
Err(err) => {
673+
warn!(
674+
target: "pow",
675+
"Unable to propose new block for authoring. \
676+
Creating proposal failed: {:?}",
677+
err,
678+
);
679+
return
680+
},
681+
};
682+
683+
let build = MiningBuild::<Block, Algorithm, C> {
684+
metadata: MiningMetadata {
685+
best_hash,
686+
pre_hash: proposal.block.header().hash(),
687+
pre_runtime: pre_runtime.clone(),
688+
difficulty,
689+
},
690+
proposal,
691+
};
711692

712-
let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
713-
import_block.post_digests.push(seal);
714-
import_block.body = Some(body);
715-
import_block.storage_changes = Some(proposal.storage_changes);
716-
import_block.intermediates.insert(
717-
Cow::from(INTERMEDIATE_KEY),
718-
Box::new(intermediate) as Box<dyn Any>
719-
);
720-
import_block.post_hash = Some(hash);
693+
worker.lock().on_build(build);
694+
})
695+
});
721696

722-
block_import.import_block(import_block, HashMap::default())
723-
.map_err(|e| Error::BlockBuiltError(best_hash, e))?;
724-
}
697+
(worker_ret, task)
725698
}
726699

727700
/// Find PoW pre-runtime.

0 commit comments

Comments
 (0)