diff --git a/src/input.rs b/src/input.rs index eb64f00e8..1bb182c6c 100644 --- a/src/input.rs +++ b/src/input.rs @@ -25,11 +25,11 @@ pub trait Input : TimelyInput { /// extern crate timely; /// extern crate differential_dataflow; /// - /// use timely::Configuration; + /// use timely::Config; /// use differential_dataflow::input::Input; /// /// fn main() { - /// ::timely::execute(Configuration::Thread, |worker| { + /// ::timely::execute(Config::thread(), |worker| { /// /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| { /// // create input handle and collection. @@ -56,11 +56,11 @@ pub trait Input : TimelyInput { /// extern crate timely; /// extern crate differential_dataflow; /// - /// use timely::Configuration; + /// use timely::Config; /// use differential_dataflow::input::Input; /// /// fn main() { - /// ::timely::execute(Configuration::Thread, |worker| { + /// ::timely::execute(Config::thread(), |worker| { /// /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| { /// // create input handle and collection. @@ -87,11 +87,11 @@ pub trait Input : TimelyInput { /// extern crate timely; /// extern crate differential_dataflow; /// - /// use timely::Configuration; + /// use timely::Config; /// use differential_dataflow::input::Input; /// /// fn main() { - /// ::timely::execute(Configuration::Thread, |worker| { + /// ::timely::execute(Config::thread(), |worker| { /// /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| { /// // create input handle and collection. @@ -150,11 +150,11 @@ impl Input for G where ::Timestamp: Lattice { /// extern crate timely; /// extern crate differential_dataflow; /// -/// use timely::Configuration; +/// use timely::Config; /// use differential_dataflow::input::Input; /// /// fn main() { -/// ::timely::execute(Configuration::Thread, |worker| { +/// ::timely::execute(Config::thread(), |worker| { /// /// let (mut handle, probe) = worker.dataflow(|scope| { /// // create input handle and collection. diff --git a/src/lib.rs b/src/lib.rs index 740a4e06a..55e33c62c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -111,4 +111,32 @@ pub mod difference; pub mod collection; pub mod logging; pub mod consolidation; -pub mod capture; \ No newline at end of file +pub mod capture; + +/// Configuration options for differential dataflow. +#[derive(Default)] +pub struct Config { + /// An amount of arrangement effort to spend each scheduling quantum. + /// + /// The default value of `None` will not schedule operators that maintain arrangements + /// other than when computation is required. Setting the value to `Some(effort)` will + /// cause these operators to reschedule themselves as long as their arrangemnt has not + /// reached a compact representation, and each scheduling quantum they will perform + /// compaction work as if `effort` records had been added to the arrangement. + pub idle_merge_effort: Option +} + +impl Config { + /// Assign an amount of effort to apply to idle arrangement operators. + pub fn idle_merge_effort(mut self, effort: Option) -> Self { + self.idle_merge_effort = effort; + self + } +} + +/// Introduces differential options to a timely configuration. +pub fn configure(config: &mut timely::Config, options: &Config) { + if let Some(effort) = options.idle_merge_effort { + config.worker.set("differential/idle_merge_effort".to_string(), effort); + } +} diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 105983fbe..42f7f0f36 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -180,7 +180,7 @@ where /// extern crate timely; /// extern crate differential_dataflow; /// - /// use timely::Configuration; + /// use timely::Config; /// use differential_dataflow::input::Input; /// use differential_dataflow::operators::arrange::ArrangeBySelf; /// use differential_dataflow::operators::reduce::Reduce; @@ -189,7 +189,7 @@ where /// use differential_dataflow::hashable::OrdWrapper; /// /// fn main() { - /// ::timely::execute(Configuration::Thread, |worker| { + /// ::timely::execute(Config::thread(), |worker| { /// /// // create a first dataflow /// let mut trace = worker.dataflow::(|scope| { @@ -238,7 +238,7 @@ where /// extern crate timely; /// extern crate differential_dataflow; /// - /// use timely::Configuration; + /// use timely::Config; /// use timely::dataflow::ProbeHandle; /// use timely::dataflow::operators::Probe; /// use differential_dataflow::input::InputSession; @@ -249,7 +249,7 @@ where /// use differential_dataflow::hashable::OrdWrapper; /// /// fn main() { - /// ::timely::execute(Configuration::Thread, |worker| { + /// ::timely::execute(Config::thread(), |worker| { /// /// let mut input = InputSession::<_,(),isize>::new(); /// let mut probe = ProbeHandle::new(); @@ -349,7 +349,7 @@ where /// extern crate timely; /// extern crate differential_dataflow; /// - /// use timely::Configuration; + /// use timely::Config; /// use timely::progress::frontier::AntichainRef; /// use timely::dataflow::ProbeHandle; /// use timely::dataflow::operators::Probe; @@ -364,7 +364,7 @@ where /// use differential_dataflow::input::Input; /// /// fn main() { - /// ::timely::execute(Configuration::Thread, |worker| { + /// ::timely::execute(Config::thread(), |worker| { /// /// let mut probe = ProbeHandle::new(); /// diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index c4cf6fb12..886bcb590 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -558,8 +558,7 @@ where let (activator, effort) = - if let Ok(text) = ::std::env::var("DIFFERENTIAL_EAGER_MERGE") { - let effort = text.parse::().expect("DIFFERENTIAL_EAGER_MERGE must be set to an integer"); + if let Some(effort) = self.inner.scope().config().get::("differential/idle_merge_effort").cloned() { (Some(self.scope().activator_for(&info.address[..])), Some(effort)) } else { diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 4ac873fec..88f137197 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -170,8 +170,7 @@ where // Establish compaction effort to apply even without updates. let (activator, effort) = - if let Ok(text) = ::std::env::var("DIFFERENTIAL_EAGER_MERGE") { - let effort = text.parse::().expect("DIFFERENTIAL_EAGER_MERGE must be set to an integer"); + if let Some(effort) = stream.scope().config().get::("differential/idle_merge_effort").cloned() { (Some(stream.scope().activator_for(&info.address[..])), Some(effort)) } else { diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index d9dab6873..66412a875 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -358,8 +358,7 @@ where // Determine if we should regularly exert the trace maintenance machinery, // and with what amount of effort each time. let (activator, effort) = - if let Ok(text) = ::std::env::var("DIFFERENTIAL_EAGER_MERGE") { - let effort = text.parse::().expect("DIFFERENTIAL_EAGER_MERGE must be set to an integer"); + if let Some(effort) = self.stream.scope().config().get::("differential/idle_merge_effort").cloned() { (Some(self.stream.scope().activator_for(&operator_info.address[..])), Some(effort)) } else { diff --git a/tests/bfs.rs b/tests/bfs.rs index 5f7668933..a8d0f61e4 100644 --- a/tests/bfs.rs +++ b/tests/bfs.rs @@ -6,7 +6,7 @@ use rand::{Rng, SeedableRng, StdRng}; use std::sync::{Arc, Mutex}; -use timely::Configuration; +use timely::Config; use timely::dataflow::*; use timely::dataflow::operators::Capture; @@ -21,11 +21,11 @@ use differential_dataflow::lattice::Lattice; type Node = usize; type Edge = (Node, Node); -#[test] fn bfs_10_20_1000() { test_sizes(10, 20, 1000, Configuration::Process(3)); } -#[test] fn bfs_100_200_10() { test_sizes(100, 200, 10, Configuration::Process(3)); } -#[test] fn bfs_100_2000_1() { test_sizes(100, 2000, 1, Configuration::Process(3)); } +#[test] fn bfs_10_20_1000() { test_sizes(10, 20, 1000, Config::process(3)); } +#[test] fn bfs_100_200_10() { test_sizes(100, 200, 10, Config::process(3)); } +#[test] fn bfs_100_2000_1() { test_sizes(100, 2000, 1, Config::process(3)); } -fn test_sizes(nodes: usize, edges: usize, rounds: usize, config: Configuration) { +fn test_sizes(nodes: usize, edges: usize, rounds: usize, config: Config) { let root_list = vec![(1, 0, 1)]; let mut edge_list = Vec::new(); @@ -143,7 +143,7 @@ fn bfs_sequential( fn bfs_differential( roots_list: Vec<(usize, usize, isize)>, edges_list: Vec<((usize, usize), usize, isize)>, - config: Configuration, + config: Config, ) -> Vec<((usize, usize), usize, isize)> { diff --git a/tests/import.rs b/tests/import.rs index 48c908458..5d36f2925 100644 --- a/tests/import.rs +++ b/tests/import.rs @@ -46,7 +46,7 @@ fn run_test(test: T, expected: Vec<(usize, Vec<((u64, i64), i64)>)>) -> () #[test] fn test_import_vanilla() { run_test(|input_epochs| { - timely::execute(timely::Configuration::Process(4), move |worker| { + timely::execute(timely::Config::process(4), move |worker| { let ref input_epochs = input_epochs; let index = worker.index(); let peers = worker.peers(); @@ -102,7 +102,7 @@ fn test_import_vanilla() { fn test_import_completed_dataflow() { // Runs the first dataflow to completion before constructing the subscriber. run_test(|input_epochs| { - timely::execute(timely::Configuration::Process(4), move |worker| { + timely::execute(timely::Config::process(4), move |worker| { let ref input_epochs = input_epochs; let index = worker.index(); let peers = worker.peers(); @@ -161,7 +161,7 @@ fn test_import_completed_dataflow() { #[test] fn test_import_stalled_dataflow() { // Runs the first dataflow to completion before constructing the subscriber. - timely::execute(timely::Configuration::Thread, move |worker| { + timely::execute(timely::Config::thread(), move |worker| { let mut input = InputSession::new(); @@ -208,7 +208,7 @@ fn test_import_stalled_dataflow() { fn import_skewed() { run_test(|_input| { - let captured = timely::execute(timely::Configuration::Process(4), |worker| { + let captured = timely::execute(timely::Config::process(4), |worker| { let index = worker.index(); let peers = worker.peers(); diff --git a/tests/scc.rs b/tests/scc.rs index 83a923525..76670d392 100644 --- a/tests/scc.rs +++ b/tests/scc.rs @@ -9,7 +9,7 @@ use std::collections::{HashMap, HashSet}; use std::hash::Hash; use std::mem; -use timely::Configuration; +use timely::Config; use timely::dataflow::*; use timely::dataflow::operators::Capture; @@ -24,11 +24,11 @@ use differential_dataflow::lattice::Lattice; type Node = usize; type Edge = (Node, Node); -#[test] fn scc_10_20_1000() { test_sizes(10, 20, 1000, Configuration::Process(3)); } -#[test] fn scc_100_200_10() { test_sizes(100, 200, 10, Configuration::Process(3)); } -#[test] fn scc_100_2000_1() { test_sizes(100, 2000, 1, Configuration::Process(3)); } +#[test] fn scc_10_20_1000() { test_sizes(10, 20, 1000, Config::process(3)); } +#[test] fn scc_100_200_10() { test_sizes(100, 200, 10, Config::process(3)); } +#[test] fn scc_100_2000_1() { test_sizes(100, 2000, 1, Config::process(3)); } -fn test_sizes(nodes: usize, edges: usize, rounds: usize, config: Configuration) { +fn test_sizes(nodes: usize, edges: usize, rounds: usize, config: Config) { let mut edge_list = Vec::new(); @@ -167,7 +167,7 @@ fn assign(node: usize, root: usize, reverse: &HashMap>, compon fn scc_differential( edges_list: Vec<((usize, usize), usize, isize)>, - config: Configuration, + config: Config, ) -> Vec<((usize, usize), usize, isize)> {