Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ sc-transaction-graph = { version = "2.0.0-alpha.5", path = "./graph" }
sp-transaction-pool = { version = "2.0.0-alpha.5", path = "../../primitives/transaction-pool" }
sc-client-api = { version = "2.0.0-alpha.5", path = "../api" }
sp-blockchain = { version = "2.0.0-alpha.5", path = "../../primitives/blockchain" }
futures-timer = "2.0"
intervalier = "0.3"
Comment thread
bkchr marked this conversation as resolved.
parity-util-mem = { version = "0.6.0", default-features = false, features = ["primitive-types"] }

[dev-dependencies]
Expand Down
23 changes: 22 additions & 1 deletion client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub use sc_transaction_graph as txpool;
pub use crate::api::{FullChainApi, LightChainApi};

use std::{collections::HashMap, sync::Arc, pin::Pin};
use futures::{Future, FutureExt, future::ready, channel::oneshot};
use futures::{prelude::*, future::ready, channel::oneshot};
use parking_lot::Mutex;

use sp_runtime::{
Expand Down Expand Up @@ -151,6 +151,27 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
Self::with_revalidation_type(options, pool_api, RevalidationType::Full)
}

/// Create new basic transaction pool with provided api, for tests.
#[cfg(test)]
pub fn new_test(
pool_api: Arc<PoolApi>,
) -> (Self, Pin<Box<dyn Future<Output=()> + Send>>, intervalier::BackSignalControl) {
let pool = Arc::new(sc_transaction_graph::Pool::new(Default::default(), pool_api.clone()));
let (revalidation_queue, background_task, notifier) =
revalidation::RevalidationQueue::new_test(pool_api.clone(), pool.clone());
(
BasicPool {
api: pool_api,
pool,
revalidation_queue: Arc::new(revalidation_queue),
revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)),
ready_poll: Default::default(),
},
background_task,
notifier,
)
}

/// Create new basic transaction pool with provided api and custom
/// revalidation type.
pub fn with_revalidation_type(
Expand Down
47 changes: 33 additions & 14 deletions client/transaction-pool/src/revalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ use sp_runtime::traits::{Zero, SaturatedConversion};
use sp_runtime::generic::BlockId;
use sp_runtime::transaction_validity::TransactionValidityError;

use futures::{prelude::*, channel::mpsc, stream::unfold};
use futures::{prelude::*, channel::mpsc};
use std::time::Duration;
use futures_timer::Delay;

#[cfg(not(test))]
const BACKGROUND_REVALIDATION_INTERVAL: Duration = Duration::from_millis(200);
Expand Down Expand Up @@ -53,12 +52,6 @@ struct RevalidationWorker<Api: ChainApi> {

impl<Api: ChainApi> Unpin for RevalidationWorker<Api> {}

fn interval(duration: Duration) -> impl Stream<Item=()> + Unpin {
unfold((), move |_| {
Delay::new(duration).map(|_| Some(((), ())))
}).map(drop)
}

/// Revalidate batch of transaction.
///
/// Each transaction is validated against chain, and invalid are
Expand Down Expand Up @@ -207,8 +200,13 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
/// It does two things: periodically tries to process some transactions
/// from the queue and also accepts messages to enqueue some more
/// transactions from the pool.
pub async fn run(mut self, from_queue: mpsc::UnboundedReceiver<WorkerPayload<Api>>) {
let interval = interval(BACKGROUND_REVALIDATION_INTERVAL).fuse();
pub async fn run<R: intervalier::IntoStream>(
mut self,
from_queue: mpsc::UnboundedReceiver<WorkerPayload<Api>>,
interval: R,
) where R: Send, R::Guard: Send
{
let interval = interval.into_stream().fuse();
let from_queue = from_queue.fuse();
futures::pin_mut!(interval, from_queue);
let this = &mut self;
Expand Down Expand Up @@ -270,9 +268,12 @@ where
}
}

/// New revalidation queue with background worker.
pub fn new_background(api: Arc<Api>, pool: Arc<Pool<Api>>) ->
(Self, Pin<Box<dyn Future<Output=()> + Send>>)
pub fn new_with_interval<R: intervalier::IntoStream>(
api: Arc<Api>,
pool: Arc<Pool<Api>>,
interval: R,
) -> (Self, Pin<Box<dyn Future<Output=()> + Send>>)
where R: Send + 'static, R::Guard: Send
{
let (to_worker, from_queue) = mpsc::unbounded();

Expand All @@ -285,7 +286,25 @@ where
background: Some(to_worker),
};

(queue, worker.run(from_queue).boxed())
(queue, worker.run(from_queue, interval).boxed())
}

/// New revalidation queue with background worker.
pub fn new_background(api: Arc<Api>, pool: Arc<Pool<Api>>) ->
(Self, Pin<Box<dyn Future<Output=()> + Send>>)
{
Self::new_with_interval(api, pool, intervalier::Interval::new(BACKGROUND_REVALIDATION_INTERVAL))
}

/// New revalidation queue with background worker and test signal.
#[cfg(test)]
pub fn new_test(api: Arc<Api>, pool: Arc<Pool<Api>>) ->
(Self, Pin<Box<dyn Future<Output=()> + Send>>, intervalier::BackSignalControl)
{
let (interval, notifier) = intervalier::BackSignalInterval::new(BACKGROUND_REVALIDATION_INTERVAL);
let (queue, background) = Self::new_with_interval(api, pool, interval);

(queue, background, notifier)
}

/// Queue some transaction for later revalidation.
Expand Down
74 changes: 39 additions & 35 deletions client/transaction-pool/src/testing/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,25 @@ use substrate_test_runtime_client::{
AccountKeyring::*,
};
use substrate_test_runtime_transaction_pool::{TestApi, uxt};
use crate::revalidation::BACKGROUND_REVALIDATION_INTERVAL;
use futures::task::Poll;
use futures::{prelude::*, task::Poll};
use codec::Encode;

fn pool() -> Pool<TestApi> {
Pool::new(Default::default(), TestApi::with_alice_nonce(209).into())
}

fn maintained_pool() -> (BasicPool<TestApi, Block>, futures::executor::ThreadPool) {
let (pool, background_task) = BasicPool::new(Default::default(), std::sync::Arc::new(TestApi::with_alice_nonce(209)));
fn maintained_pool() -> (
BasicPool<TestApi, Block>,
futures::executor::ThreadPool,
intervalier::BackSignalControl,
) {
let (pool, background_task, notifier) = BasicPool::new_test(
std::sync::Arc::new(TestApi::with_alice_nonce(209))
);

let thread_pool = futures::executor::ThreadPool::new().unwrap();
thread_pool.spawn_ok(background_task.expect("basic pool have background task"));
(pool, thread_pool)
thread_pool.spawn_ok(background_task);
(pool, thread_pool, notifier)
}

fn header(number: u64) -> Header {
Expand Down Expand Up @@ -190,7 +195,7 @@ fn block_event_with_retracted(id: u64, retracted: Vec<Hash>) -> ChainEvent<Block
fn should_prune_old_during_maintenance() {
let xt = uxt(Alice, 209);

let (pool, _guard) = maintained_pool();
let (pool, _guard, _notifier) = maintained_pool();

block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
Expand All @@ -206,21 +211,19 @@ fn should_revalidate_during_maintenance() {
let xt1 = uxt(Alice, 209);
let xt2 = uxt(Alice, 210);

let (pool, _guard) = maintained_pool();
let (pool, _guard, mut notifier) = maintained_pool();
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt1.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt2.clone())).expect("2. Imported");
assert_eq!(pool.status().ready, 2);
assert_eq!(pool.api.validation_requests().len(), 2);

pool.api.push_block(1, vec![xt1.clone()]);

block_on(pool.maintain(block_event(1)));

// maintaince is in background
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));

notifier.clear();
block_on(pool.maintain(block_event(1)));
assert_eq!(pool.status().ready, 1);
block_on(notifier.next());

// test that pool revalidated transaction that left ready and not included in the block
assert_eq!(pool.api.validation_requests().len(), 3);
}
Expand All @@ -230,7 +233,7 @@ fn should_resubmit_from_retracted_during_maintenance() {
let xt = uxt(Alice, 209);
let retracted_hash = Hash::random();

let (pool, _guard) = maintained_pool();
let (pool, _guard, _notifier) = maintained_pool();

block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
Expand All @@ -249,7 +252,7 @@ fn should_not_retain_invalid_hashes_from_retracted() {
let xt = uxt(Alice, 209);
let retracted_hash = Hash::random();

let (pool, _guard) = maintained_pool();
let (pool, _guard, mut notifier) = maintained_pool();

block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);
Expand All @@ -260,10 +263,10 @@ fn should_not_retain_invalid_hashes_from_retracted() {

let event = block_event_with_retracted(1, vec![retracted_hash]);

notifier.clear();
block_on(pool.maintain(event));

// maintenance is in background
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));
block_on(notifier.next());

let event = block_event_with_retracted(1, vec![retracted_hash]);

Expand All @@ -275,26 +278,26 @@ fn should_not_retain_invalid_hashes_from_retracted() {
fn should_revalidate_transaction_multiple_times() {
let xt = uxt(Alice, 209);

let (pool, _guard) = maintained_pool();
let (pool, _guard, mut notifier) = maintained_pool();

block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);

pool.api.push_block(1, vec![xt.clone()]);

// maintenance is in background
notifier.clear();
block_on(pool.maintain(block_event(1)));
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));
block_on(notifier.next());

block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);

pool.api.push_block(2, vec![]);
pool.api.add_invalid(&xt);

// maintenance is in background
notifier.clear();
block_on(pool.maintain(block_event(2)));
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));
block_on(notifier.next());

assert_eq!(pool.status().ready, 0);
}
Expand All @@ -305,23 +308,24 @@ fn should_revalidate_across_many_blocks() {
let xt2 = uxt(Alice, 210);
let xt3 = uxt(Alice, 211);

let (pool, _guard) = maintained_pool();
let (pool, _guard, mut notifier) = maintained_pool();

block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt2.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 2);

pool.api.push_block(1, vec![]);
notifier.clear();
block_on(pool.maintain(block_event(1)));
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));

block_on(notifier.next());

block_on(pool.submit_one(&BlockId::number(2), SOURCE, xt3.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 3);

pool.api.push_block(2, vec![xt1.clone()]);
notifier.clear();
block_on(pool.maintain(block_event(2)));
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));
block_on(notifier.next());

assert_eq!(pool.status().ready, 2);
// xt1 and xt2 validated twice, then xt3 once, then xt2 and xt3 again
Expand All @@ -336,7 +340,7 @@ fn should_push_watchers_during_maintaince() {
}

// given
let (pool, _guard) = maintained_pool();
let (pool, _guard, mut notifier) = maintained_pool();

let tx0 = alice_uxt(0);
let watcher0 = block_on(
Expand All @@ -363,15 +367,15 @@ fn should_push_watchers_during_maintaince() {
// when
pool.api.add_invalid(&tx3);
pool.api.add_invalid(&tx4);
block_on(pool.maintain(block_event(0)));

// revalidation is in background
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));
// clear timer events if any
notifier.clear();
block_on(pool.maintain(block_event(0)));
block_on(notifier.next());

// then
// hash3 is now invalid
// hash4 is now invalid

assert_eq!(pool.status().ready, 3);
assert_eq!(
futures::executor::block_on_stream(watcher3).collect::<Vec<_>>(),
Expand Down Expand Up @@ -409,7 +413,7 @@ fn should_push_watchers_during_maintaince() {

#[test]
fn can_track_heap_size() {
let (pool, _guard) = maintained_pool();
let (pool, _guard, _notifier) = maintained_pool();
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 209))).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 210))).expect("1. Imported");
block_on(pool.submit_one(&BlockId::number(0), SOURCE, uxt(Alice, 211))).expect("1. Imported");
Expand Down Expand Up @@ -629,7 +633,7 @@ fn fork_aware_finalization() {

#[test]
fn ready_set_should_not_resolve_before_block_update() {
let (pool, _guard) = maintained_pool();
let (pool, _guard, _notifier) = maintained_pool();
let xt1 = uxt(Alice, 209);
block_on(pool.submit_one(&BlockId::number(1), SOURCE, xt1.clone())).expect("1. Imported");

Expand All @@ -638,7 +642,7 @@ fn ready_set_should_not_resolve_before_block_update() {

#[test]
fn ready_set_should_resolve_after_block_update() {
let (pool, _guard) = maintained_pool();
let (pool, _guard, _notifier) = maintained_pool();
pool.api.push_block(1, vec![]);

let xt1 = uxt(Alice, 209);
Expand All @@ -651,7 +655,7 @@ fn ready_set_should_resolve_after_block_update() {

#[test]
fn ready_set_should_eventually_resolve_when_block_update_arrives() {
let (pool, _guard) = maintained_pool();
let (pool, _guard, _notifier) = maintained_pool();
pool.api.push_block(1, vec![]);

let xt1 = uxt(Alice, 209);
Expand Down