diff --git a/communication/Cargo.toml b/communication/Cargo.toml index 1dbf86193..db396014f 100644 --- a/communication/Cargo.toml +++ b/communication/Cargo.toml @@ -17,7 +17,7 @@ license = "MIT" default = ["getopts"] [dependencies] -getopts = { version = "0.2.14", optional = true} +getopts = { version = "0.2.14", optional = true } bincode = { version = "1.0", optional = true } serde_derive = "1.0" serde = "1.0" diff --git a/communication/examples/comm_hello.rs b/communication/examples/comm_hello.rs index 085c4e0ea..85d67faa2 100644 --- a/communication/examples/comm_hello.rs +++ b/communication/examples/comm_hello.rs @@ -6,7 +6,7 @@ use timely_communication::{Message, Allocate}; fn main() { // extract the configuration from user-supplied arguments, initialize the computation. - let config = timely_communication::Configuration::from_args(std::env::args()).unwrap(); + let config = timely_communication::Config::from_args(std::env::args()).unwrap(); let guards = timely_communication::initialize(config, |mut allocator| { println!("worker {} of {} started", allocator.index(), allocator.peers()); diff --git a/communication/src/initialize.rs b/communication/src/initialize.rs index f31076d2f..959913aeb 100644 --- a/communication/src/initialize.rs +++ b/communication/src/initialize.rs @@ -18,7 +18,7 @@ use logging_core::Logger; /// Possible configurations for the communication infrastructure. -pub enum Configuration { +pub enum Config { /// Use one thread. Thread, /// Use one process with an indicated number of threads. @@ -38,81 +38,97 @@ pub enum Configuration { } } -#[cfg(feature = "getopts")] -impl Configuration { - - /// Returns a `getopts::Options` struct that can be used to print - /// usage information in higher-level systems. - pub fn options() -> getopts::Options { - let mut opts = getopts::Options::new(); +impl Config { + /// Installs options into a [`getopts::Options`] struct that corresponds + /// to the parameters in the configuration. + /// + /// It is the caller's responsibility to ensure that the installed options + /// do not conflict with any other options that may exist in `opts`, or + /// that may be installed into `opts` in the future. + /// + /// This method is only available if the `getopts` feature is enabled, which + /// it is by default. + #[cfg(feature = "getopts")] + pub fn install_options(opts: &mut getopts::Options) { opts.optopt("w", "threads", "number of per-process worker threads", "NUM"); opts.optopt("p", "process", "identity of this process", "IDX"); opts.optopt("n", "processes", "number of processes", "NUM"); opts.optopt("h", "hostfile", "text file whose lines are process addresses", "FILE"); opts.optflag("r", "report", "reports connection progress"); - - opts } - /// Constructs a new configuration by parsing supplied text arguments. + /// Instantiates a configuration based upon the parsed options in `matches`. /// - /// Most commonly, this uses `std::env::Args()` as the supplied iterator. - pub fn from_args>(args: I) -> Result { - let opts = Configuration::options(); - - opts.parse(args) - .map_err(|e| format!("{:?}", e)) - .map(|matches| { - - // let mut config = Configuration::new(1, 0, Vec::new()); - let threads = matches.opt_str("w").map(|x| x.parse().unwrap_or(1)).unwrap_or(1); - let process = matches.opt_str("p").map(|x| x.parse().unwrap_or(0)).unwrap_or(0); - let processes = matches.opt_str("n").map(|x| x.parse().unwrap_or(1)).unwrap_or(1); - let report = matches.opt_present("report"); - - assert!(process < processes); + /// The `matches` object must have been constructed from a + /// [`getopts::Options`] which contained at least the options installed by + /// [`Self::install_options`]. + /// + /// This method is only available if the `getopts` feature is enabled, which + /// it is by default. + #[cfg(feature = "getopts")] + pub fn from_matches(matches: &getopts::Matches) -> Result { + let threads = matches.opt_get_default("w", 1_usize).map_err(|e| e.to_string())?; + let process = matches.opt_get_default("p", 0_usize).map_err(|e| e.to_string())?; + let processes = matches.opt_get_default("n", 1_usize).map_err(|e| e.to_string())?; + let report = matches.opt_present("report"); - if processes > 1 { - let mut addresses = Vec::new(); - if let Some(hosts) = matches.opt_str("h") { - let reader = ::std::io::BufReader::new(::std::fs::File::open(hosts.clone()).unwrap()); - for x in reader.lines().take(processes) { - addresses.push(x.unwrap()); - } - if addresses.len() < processes { - panic!("could only read {} addresses from {}, but -n: {}", addresses.len(), hosts, processes); - } + if processes > 1 { + let mut addresses = Vec::new(); + if let Some(hosts) = matches.opt_str("h") { + let file = ::std::fs::File::open(hosts.clone()).map_err(|e| e.to_string())?; + let reader = ::std::io::BufReader::new(file); + for line in reader.lines().take(processes) { + addresses.push(line.map_err(|e| e.to_string())?); } - else { - for index in 0..processes { - addresses.push(format!("localhost:{}", 2101 + index)); - } + if addresses.len() < processes { + return Err(format!("could only read {} addresses from {}, but -n: {}", addresses.len(), hosts, processes)); } - - assert!(processes == addresses.len()); - Configuration::Cluster { - threads, - process, - addresses, - report, - log_fn: Box::new( | _ | None), + } + else { + for index in 0..processes { + addresses.push(format!("localhost:{}", 2101 + index)); } } - else if threads > 1 { Configuration::Process(threads) } - else { Configuration::Thread } - }) + + assert!(processes == addresses.len()); + Ok(Config::Cluster { + threads, + process, + addresses, + report, + log_fn: Box::new( | _ | None), + }) + } else if threads > 1 { + Ok(Config::Process(threads)) + } else { + Ok(Config::Thread) + } + } + + /// Constructs a new configuration by parsing the supplied text arguments. + /// + /// Most commonly, callers supply `std::env::args()` as the iterator. + /// + /// This method is only available if the `getopts` feature is enabled, which + /// it is by default. + #[cfg(feature = "getopts")] + pub fn from_args>(args: I) -> Result { + let mut opts = getopts::Options::new(); + Config::install_options(&mut opts); + let matches = opts.parse(args).map_err(|e| e.to_string())?; + Config::from_matches(&matches) } /// Attempts to assemble the described communication infrastructure. pub fn try_build(self) -> Result<(Vec, Box), String> { match self { - Configuration::Thread => { + Config::Thread => { Ok((vec![GenericBuilder::Thread(ThreadBuilder)], Box::new(()))) }, - Configuration::Process(threads) => { + Config::Process(threads) => { Ok((Process::new_vector(threads).into_iter().map(|x| GenericBuilder::Process(x)).collect(), Box::new(()))) }, - Configuration::Cluster { threads, process, addresses, report, log_fn } => { + Config::Cluster { threads, process, addresses, report, log_fn } => { match initialize_networking(addresses, process, threads, report, log_fn) { Ok((stuff, guard)) => { Ok((stuff.into_iter().map(|x| GenericBuilder::ZeroCopy(x)).collect(), Box::new(guard))) @@ -137,7 +153,7 @@ impl Configuration { /// use timely_communication::Allocate; /// /// // configure for two threads, just one process. -/// let config = timely_communication::Configuration::Process(2); +/// let config = timely_communication::Config::Process(2); /// /// // initializes communication, spawns workers /// let guards = timely_communication::initialize(config, |mut allocator| { @@ -190,7 +206,7 @@ impl Configuration { /// result: Ok(1) /// ``` pub fn initializeT+Send+Sync+'static>( - config: Configuration, + config: Config, func: F, ) -> Result,String> { let (allocators, others) = config.try_build()?; diff --git a/communication/src/lib.rs b/communication/src/lib.rs index 4e4209b55..66ed3e0a3 100644 --- a/communication/src/lib.rs +++ b/communication/src/lib.rs @@ -19,7 +19,7 @@ //! use timely_communication::Allocate; //! //! // configure for two threads, just one process. -//! let config = timely_communication::Configuration::Process(2); +//! let config = timely_communication::Config::Process(2); //! //! // initializes communication, spawns workers //! let guards = timely_communication::initialize(config, |mut allocator| { @@ -104,7 +104,7 @@ use abomonation::Abomonation; pub use allocator::Generic as Allocator; pub use allocator::Allocate; -pub use initialize::{initialize, initialize_from, Configuration, WorkerGuards}; +pub use initialize::{initialize, initialize_from, Config, WorkerGuards}; pub use message::Message; /// A composite trait for types that may be used with channels. diff --git a/timely/Cargo.toml b/timely/Cargo.toml index 990863666..282025bad 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -16,9 +16,12 @@ keywords = ["timely", "dataflow"] license = "MIT" [features] +default = ["getopts"] bincode= ["timely_communication/bincode"] +getopts = ["getopts-dep", "timely_communication/getopts"] [dependencies] +getopts-dep = { package = "getopts", version = "0.2.14", optional = true } serde = "1.0" serde_derive = "1.0" abomonation = "0.7.3" diff --git a/timely/examples/logging-send.rs b/timely/examples/logging-send.rs index 5d4eae319..93e6a8867 100644 --- a/timely/examples/logging-send.rs +++ b/timely/examples/logging-send.rs @@ -9,8 +9,7 @@ use timely::logging::TimelyEvent; fn main() { // initializes and runs a timely dataflow. - let config = timely::Configuration::from_args(::std::env::args()).unwrap(); - timely::execute(config, |worker| { + timely::execute_from_args(std::env::args(), |worker| { let batch = std::env::args().nth(1).unwrap().parse::().unwrap(); let rounds = std::env::args().nth(2).unwrap().parse::().unwrap(); diff --git a/timely/examples/sequence.rs b/timely/examples/sequence.rs index 8b7053570..7cf318933 100644 --- a/timely/examples/sequence.rs +++ b/timely/examples/sequence.rs @@ -2,11 +2,11 @@ extern crate timely; use std::time::{Instant, Duration}; -use timely::Configuration; +use timely::Config; use timely::synchronization::Sequencer; fn main() { - timely::execute(Configuration::Process(4), |worker| { + timely::execute(Config::process(4), |worker| { let timer = Instant::now(); let mut sequencer = Sequencer::new(worker, Instant::now()); @@ -21,6 +21,6 @@ fn main() { } worker.step(); } - + }).unwrap(); // asserts error-free execution; } diff --git a/timely/examples/threadless.rs b/timely/examples/threadless.rs index 823aa777e..b5fc0da7b 100644 --- a/timely/examples/threadless.rs +++ b/timely/examples/threadless.rs @@ -2,12 +2,13 @@ extern crate timely; use timely::dataflow::{InputHandle, ProbeHandle}; use timely::dataflow::operators::{Inspect, Probe}; +use timely::WorkerConfig; fn main() { // create a naked single-threaded worker. let allocator = timely::communication::allocator::Thread::new(); - let mut worker = timely::worker::Worker::new(allocator); + let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator); // create input and probe handles. let mut input = InputHandle::new(); diff --git a/timely/examples/unordered_input.rs b/timely/examples/unordered_input.rs index 91fd98900..fb48fc1bb 100644 --- a/timely/examples/unordered_input.rs +++ b/timely/examples/unordered_input.rs @@ -2,11 +2,11 @@ extern crate timely; extern crate timely_communication; use timely::dataflow::operators::*; -use timely_communication::Configuration; +use timely::Config; // use timely::progress::timestamp::RootTimestamp; fn main() { - timely::execute(Configuration::Thread, |worker| { + timely::execute(Config::thread(), |worker| { let (mut input, mut cap) = worker.dataflow::(|scope| { let (input, stream) = scope.new_unordered_input(); stream.inspect_batch(|t, x| println!("{:?} -> {:?}", t, x)); diff --git a/timely/src/dataflow/operators/capture/capture.rs b/timely/src/dataflow/operators/capture/capture.rs index 993aa575e..f3f2181cb 100644 --- a/timely/src/dataflow/operators/capture/capture.rs +++ b/timely/src/dataflow/operators/capture/capture.rs @@ -36,7 +36,7 @@ pub trait Capture { /// let (send, recv) = ::std::sync::mpsc::channel(); /// let send = Arc::new(Mutex::new(send)); /// - /// timely::execute(timely::Configuration::Thread, move |worker| { + /// timely::execute(timely::Config::thread(), move |worker| { /// /// // this is only to validate the output. /// let send = send.lock().unwrap().clone(); @@ -76,7 +76,7 @@ pub trait Capture { /// let (send0, recv0) = ::std::sync::mpsc::channel(); /// let send0 = Arc::new(Mutex::new(send0)); /// - /// timely::execute(timely::Configuration::Thread, move |worker| { + /// timely::execute(timely::Config::thread(), move |worker| { /// /// // this is only to validate the output. /// let send0 = send0.lock().unwrap().clone(); diff --git a/timely/src/dataflow/operators/capture/extract.rs b/timely/src/dataflow/operators/capture/extract.rs index 11c51a326..e17146394 100644 --- a/timely/src/dataflow/operators/capture/extract.rs +++ b/timely/src/dataflow/operators/capture/extract.rs @@ -22,7 +22,7 @@ pub trait Extract { /// let (send, recv) = ::std::sync::mpsc::channel(); /// let send = Arc::new(Mutex::new(send)); /// - /// timely::execute(timely::Configuration::Thread, move |worker| { + /// timely::execute(timely::Config::thread(), move |worker| { /// /// // this is only to validate the output. /// let send = send.lock().unwrap().clone(); diff --git a/timely/src/dataflow/operators/capture/mod.rs b/timely/src/dataflow/operators/capture/mod.rs index 6343f415b..713b84c96 100644 --- a/timely/src/dataflow/operators/capture/mod.rs +++ b/timely/src/dataflow/operators/capture/mod.rs @@ -24,7 +24,7 @@ //! use timely::dataflow::operators::{Capture, ToStream, Inspect}; //! use timely::dataflow::operators::capture::{EventLink, Replay}; //! -//! timely::execute(timely::Configuration::Thread, |worker| { +//! timely::execute(timely::Config::thread(), |worker| { //! let handle1 = Rc::new(EventLink::new()); //! let handle2 = Some(handle1.clone()); //! @@ -52,7 +52,7 @@ //! use timely::dataflow::operators::{Capture, ToStream, Inspect}; //! use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay}; //! -//! timely::execute(timely::Configuration::Thread, |worker| { +//! timely::execute(timely::Config::thread(), |worker| { //! let list = TcpListener::bind("127.0.0.1:8000").unwrap(); //! let send = TcpStream::connect("127.0.0.1:8000").unwrap(); //! let recv = list.incoming().next().unwrap().unwrap(); diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index 7774e6611..6fb1e7e8c 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -189,7 +189,7 @@ fn notificator_delivers_notifications_in_topo_order() { /// use timely::dataflow::operators::generic::operator::Operator; /// use timely::dataflow::channels::pact::Pipeline; /// -/// timely::execute(timely::Configuration::Thread, |worker| { +/// timely::execute(timely::Config::thread(), |worker| { /// let (mut in1, mut in2) = worker.dataflow::(|scope| { /// let (in1_handle, in1) = scope.new_input(); /// let (in2_handle, in2) = scope.new_input(); diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 84905381c..53a0d247a 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -147,7 +147,7 @@ pub trait Operator { /// use timely::dataflow::operators::generic::operator::Operator; /// use timely::dataflow::channels::pact::Pipeline; /// - /// timely::execute(timely::Configuration::Thread, |worker| { + /// timely::execute(timely::Config::thread(), |worker| { /// let (mut in1, mut in2) = worker.dataflow::(|scope| { /// let (in1_handle, in1) = scope.new_input(); /// let (in2_handle, in2) = scope.new_input(); @@ -208,7 +208,7 @@ pub trait Operator { /// use timely::dataflow::operators::generic::operator::Operator; /// use timely::dataflow::channels::pact::Pipeline; /// - /// timely::execute(timely::Configuration::Thread, |worker| { + /// timely::execute(timely::Config::thread(), |worker| { /// let (mut in1, mut in2) = worker.dataflow::(|scope| { /// let (in1_handle, in1) = scope.new_input(); /// let (in2_handle, in2) = scope.new_input(); diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index 01b63e2e6..120e44abe 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -39,7 +39,7 @@ pub trait Input : Scope { /// use timely::dataflow::operators::{Input, Inspect}; /// /// // construct and execute a timely dataflow - /// timely::execute(Configuration::Thread, |worker| { + /// timely::execute(Config::thread(), |worker| { /// /// // add an input and base computation off of it /// let mut input = worker.dataflow(|scope| { @@ -71,7 +71,7 @@ pub trait Input : Scope { /// use timely::dataflow::operators::input::Handle; /// /// // construct and execute a timely dataflow - /// timely::execute(Configuration::Thread, |worker| { + /// timely::execute(Config::thread(), |worker| { /// /// // add an input and base computation off of it /// let mut input = Handle::new(); @@ -189,7 +189,7 @@ impl Handle { /// use timely::dataflow::operators::input::Handle; /// /// // construct and execute a timely dataflow - /// timely::execute(Configuration::Thread, |worker| { + /// timely::execute(Config::thread(), |worker| { /// /// // add an input and base computation off of it /// let mut input = Handle::new(); @@ -226,7 +226,7 @@ impl Handle { /// use timely::dataflow::operators::input::Handle; /// /// // construct and execute a timely dataflow - /// timely::execute(Configuration::Thread, |worker| { + /// timely::execute(Config::thread(), |worker| { /// /// // add an input and base computation off of it /// let mut input = Handle::new(); diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/probe.rs index 4272591f6..e3cfcabf3 100644 --- a/timely/src/dataflow/operators/probe.rs +++ b/timely/src/dataflow/operators/probe.rs @@ -26,7 +26,7 @@ pub trait Probe { /// use timely::dataflow::operators::{Input, Probe, Inspect}; /// /// // construct and execute a timely dataflow - /// timely::execute(Configuration::Thread, |worker| { + /// timely::execute(Config::thread(), |worker| { /// /// // add an input and base computation off of it /// let (mut input, probe) = worker.dataflow(|scope| { @@ -56,7 +56,7 @@ pub trait Probe { /// use timely::dataflow::operators::probe::Handle; /// /// // construct and execute a timely dataflow - /// timely::execute(Configuration::Thread, |worker| { + /// timely::execute(Config::thread(), |worker| { /// /// // add an input and base computation off of it /// let mut probe = Handle::new(); @@ -181,14 +181,14 @@ impl Clone for Handle { #[cfg(test)] mod tests { - use crate::communication::Configuration; + use crate::Config; use crate::dataflow::operators::{Input, Probe}; #[test] fn probe() { // initializes and runs a timely dataflow computation - crate::execute(Configuration::Thread, |worker| { + crate::execute(Config::thread(), |worker| { // create a new input, and inspect its output let (mut input, probe) = worker.dataflow(move |scope| { diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index 5db3a94df..d39fab9a2 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -49,7 +49,7 @@ pub trait UnorderedInput { /// let (send, recv) = ::std::sync::mpsc::channel(); /// let send = Arc::new(Mutex::new(send)); /// - /// timely::execute(Configuration::Thread, move |worker| { + /// timely::execute(Config::thread(), move |worker| { /// /// // this is only to validate the output. /// let send = send.lock().unwrap().clone(); diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index 3d2c28290..824490376 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -12,7 +12,7 @@ use crate::progress::{Source, Target}; use crate::progress::timestamp::Refines; use crate::order::Product; use crate::logging::TimelyLogger as Logger; -use crate::worker::AsWorker; +use crate::worker::{AsWorker, Config}; use super::{ScopeParent, Scope}; @@ -52,6 +52,7 @@ where G: ScopeParent, T: Timestamp+Refines { + fn config(&self) -> &Config { self.parent.config() } fn index(&self) -> usize { self.parent.index() } fn peers(&self) -> usize { self.parent.peers() } fn allocate(&mut self, identifier: usize, address: &[usize]) -> (Vec>>>, Box>>) { diff --git a/timely/src/execute.rs b/timely/src/execute.rs index ab9c82f57..277033a7c 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -1,8 +1,79 @@ //! Starts a timely dataflow execution from configuration information and per-worker logic. -use crate::communication::{initialize_from, Configuration, Allocator, allocator::AllocateBuilder, WorkerGuards}; +use crate::communication::{initialize_from, Allocator, allocator::AllocateBuilder, WorkerGuards}; use crate::dataflow::scopes::Child; use crate::worker::Worker; +use crate::{CommunicationConfig, WorkerConfig}; + +/// Configures the execution of a timely dataflow computation. +pub struct Config { + /// Configuration for the communication infrastructure. + pub communication: CommunicationConfig, + /// Configuration for the worker threads. + pub worker: WorkerConfig, +} + +impl Config { + /// Installs options into a [`getopts::Options`] struct that correspond + /// to the parameters in the configuration. + /// + /// It is the caller's responsibility to ensure that the installed options + /// do not conflict with any other options that may exist in `opts`, or + /// that may be installed into `opts` in the future. + /// + /// This method is only available if the `getopts` feature is enabled, which + /// it is by default. + #[cfg(feature = "getopts")] + pub fn install_options(opts: &mut getopts_dep::Options) { + CommunicationConfig::install_options(opts); + WorkerConfig::install_options(opts); + } + + /// Instantiates a configuration based upon the parsed options in `matches`. + /// + /// The `matches` object must have been constructed from a + /// [`getopts::Options`] which contained at least the options installed by + /// [`Self::install_options`]. + /// + /// This method is only available if the `getopts` feature is enabled, which + /// it is by default. + #[cfg(feature = "getopts")] + pub fn from_matches(matches: &getopts_dep::Matches) -> Result { + Ok(Config { + communication: CommunicationConfig::from_matches(matches)?, + worker: WorkerConfig::from_matches(matches)?, + }) + } + + /// Constructs a new configuration by parsing the supplied text arguments. + /// + /// Most commonly, callers supply `std::env::args()` as the iterator. + #[cfg(feature = "getopts")] + pub fn from_args>(args: I) -> Result { + let mut opts = getopts_dep::Options::new(); + Config::install_options(&mut opts); + let matches = opts.parse(args).map_err(|e| e.to_string())?; + Config::from_matches(&matches) + } + + /// Constructs a `Config` that uses one worker thread and the + /// defaults for all other parameters. + pub fn thread() -> Config { + Config { + communication: CommunicationConfig::Thread, + worker: WorkerConfig::default(), + } + } + + /// Constructs an `Config` that uses `n` worker threads and the + /// defaults for all other parameters. + pub fn process(n: usize) -> Config { + Config { + communication: CommunicationConfig::Process(n), + worker: WorkerConfig::default(), + } + } +} /// Executes a single-threaded timely dataflow computation. /// @@ -82,7 +153,7 @@ where F: FnOnce(&mut Worker)->T+Send+Sync+'static { let alloc = crate::communication::allocator::thread::Thread::new(); - let mut worker = crate::worker::Worker::new(alloc); + let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc); let result = func(&mut worker); while worker.step_or_park(None) { } result @@ -111,7 +182,7 @@ where /// use timely::dataflow::operators::{ToStream, Inspect}; /// /// // execute a timely dataflow using three worker threads. -/// timely::execute(timely::Configuration::Process(3), |worker| { +/// timely::execute(timely::Config::process(3), |worker| { /// worker.dataflow::<(),_,_>(|scope| { /// (0..10).to_stream(scope) /// .inspect(|x| println!("seen: {:?}", x)); @@ -133,7 +204,7 @@ where /// let send = Arc::new(Mutex::new(send)); /// /// // execute a timely dataflow using three worker threads. -/// timely::execute(timely::Configuration::Process(3), move |worker| { +/// timely::execute(timely::Config::process(3), move |worker| { /// let send = send.lock().unwrap().clone(); /// worker.dataflow::<(),_,_>(move |scope| { /// (0..10).to_stream(scope) @@ -145,12 +216,15 @@ where /// // the extracted data should have data (0..10) thrice at timestamp 0. /// assert_eq!(recv.extract()[0].1, (0..30).map(|x| x / 3).collect::>()); /// ``` -pub fn execute(mut config: Configuration, func: F) -> Result,String> +pub fn execute( + mut config: Config, + func: F +) -> Result,String> where T:Send+'static, F: Fn(&mut Worker)->T+Send+Sync+'static { - if let Configuration::Cluster { ref mut log_fn, .. } = config { + if let CommunicationConfig::Cluster { ref mut log_fn, .. } = config.communication { *log_fn = Box::new(|events_setup| { @@ -181,11 +255,11 @@ where }); } - let (allocators, other) = config.try_build()?; + let (allocators, other) = config.communication.try_build()?; initialize_from(allocators, other, move |allocator| { - let mut worker = Worker::new(allocator); + let mut worker = Worker::new(WorkerConfig::default(), allocator); // If an environment variable is set, use it as the default timely logging. if let Ok(addr) = ::std::env::var("TIMELY_WORKER_LOG_ADDR") { @@ -239,6 +313,9 @@ where /// If not specified, `localhost` will be used, with port numbers increasing from 2101 (chosen /// arbitrarily). /// +/// This method is only available if the `getopts` feature is enabled, which +/// it is by default. +/// /// # Examples /// /// ```rust @@ -265,12 +342,13 @@ where /// host2:port /// host3:port /// ``` +#[cfg(feature = "getopts")] pub fn execute_from_args(iter: I, func: F) -> Result,String> where I: Iterator, T:Send+'static, F: Fn(&mut Worker)->T+Send+Sync+'static, { - let configuration = Configuration::from_args(iter)?; - execute(configuration, func) + let config = Config::from_args(iter)?; + execute(config, func) } /// Executes a timely dataflow from supplied allocators and logging. @@ -279,23 +357,29 @@ pub fn execute_from_args(iter: I, func: F) -> Result,St /// /// ```rust /// use timely::dataflow::operators::{ToStream, Inspect}; +/// use timely::WorkerConfig; /// /// // execute a timely dataflow using command line parameters -/// let (builders, other) = timely::Configuration::Process(3).try_build().unwrap(); -/// timely::execute::execute_from(builders, other, |worker| { +/// let (builders, other) = timely::CommunicationConfig::Process(3).try_build().unwrap(); +/// timely::execute::execute_from(builders, other, WorkerConfig::default(), |worker| { /// worker.dataflow::<(),_,_>(|scope| { /// (0..10).to_stream(scope) /// .inspect(|x| println!("seen: {:?}", x)); /// }) /// }).unwrap(); /// ``` -pub fn execute_from(builders: Vec, others: Box, func: F) -> Result,String> +pub fn execute_from( + builders: Vec, + others: Box, + worker_config: WorkerConfig, + func: F, +) -> Result, String> where A: AllocateBuilder+'static, T: Send+'static, F: Fn(&mut Worker<::Allocator>)->T+Send+Sync+'static { initialize_from(builders, others, move |allocator| { - let mut worker = Worker::new(allocator); + let mut worker = Worker::new(worker_config.clone(), allocator); let result = func(&mut worker); while worker.step_or_park(None) { } result diff --git a/timely/src/lib.rs b/timely/src/lib.rs index 5ddbe19ed..2241ae39f 100644 --- a/timely/src/lib.rs +++ b/timely/src/lib.rs @@ -67,10 +67,14 @@ extern crate timely_communication; extern crate timely_bytes; extern crate timely_logging; -pub use execute::{execute, execute_directly, execute_from_args, example}; +pub use execute::{execute, execute_directly, example}; +#[cfg(feature = "getopts")] +pub use execute::execute_from_args; pub use order::PartialOrder; -pub use timely_communication::Configuration; +pub use timely_communication::Config as CommunicationConfig; +pub use worker::Config as WorkerConfig; +pub use execute::Config as Config; /// Re-export of the `timely_communication` crate. pub mod communication { diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index fa9535459..bf571d2da 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -24,6 +24,8 @@ use crate::progress::broadcast::Progcaster; use crate::progress::reachability; use crate::progress::timestamp::Refines; +use crate::worker::ProgressMode; + // IMPORTANT : by convention, a child identifier of zero is used to indicate inputs and outputs of // the Subgraph itself. An identifier greater than zero corresponds to an actual child, which can // be found at position (id - 1) in the `children` field of the Subgraph. @@ -198,7 +200,7 @@ where shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))), scope_summary, - eager_progress_send: ::std::env::var("DEFAULT_PROGRESS_MODE") != Ok("DEMAND".to_owned()), + progress_mode: worker.config().progress_mode, } } } @@ -249,7 +251,7 @@ where shared_progress: Rc>>, scope_summary: Vec>>, - eager_progress_send: bool, + progress_mode: ProgressMode, } impl Schedule for Subgraph @@ -482,7 +484,7 @@ where // If we are requested to eagerly send progress updates, or if there are // updates visible in the scope-wide frontier, we must send all updates. - let must_send = self.eager_progress_send || { + let must_send = self.progress_mode == ProgressMode::Eager || { let tracker = &mut self.pointstamp_tracker; self.local_pointstamp .iter() diff --git a/timely/src/synchronization/sequence.rs b/timely/src/synchronization/sequence.rs index 7b39fa5b9..be006de06 100644 --- a/timely/src/synchronization/sequence.rs +++ b/timely/src/synchronization/sequence.rs @@ -65,10 +65,10 @@ impl Sequencer { /// ```rust /// use std::time::{Instant, Duration}; /// - /// use timely::Configuration; + /// use timely::Config; /// use timely::synchronization::Sequencer; /// - /// timely::execute(Configuration::Process(4), |worker| { + /// timely::execute(Config::process(4), |worker| { /// let timer = Instant::now(); /// let mut sequencer = Sequencer::new(worker, timer); /// diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 317028d1f..4e5024d54 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -3,9 +3,11 @@ use std::rc::Rc; use std::cell::{RefCell, RefMut}; use std::any::Any; +use std::str::FromStr; use std::time::{Instant, Duration}; use std::collections::HashMap; use std::collections::hash_map::Entry; +use std::sync::Arc; use crate::communication::{Allocate, Data, Push, Pull}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; @@ -16,11 +18,109 @@ use crate::progress::operate::Operate; use crate::dataflow::scopes::Child; use crate::logging::TimelyLogger; +/// Progress mode. (???) +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum ProgressMode { + /// ??? + Eager, + /// ??? + Demand, +} + +impl Default for ProgressMode { + fn default() -> ProgressMode { + ProgressMode::Eager + } +} + +impl FromStr for ProgressMode { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "eager" => Ok(ProgressMode::Eager), + "demand" => Ok(ProgressMode::Demand), + _ => Err(format!("unknown progress mode: {}", s)), + } + } +} + +/// Worker configuration. +#[derive(Debug, Default, Clone)] +pub struct Config { + /// The progress mode to use. + pub(crate) progress_mode: ProgressMode, + /// A map from parameter name to typed parameter values. + registry: HashMap>, +} + +impl Config { + /// Installs options into a [`getopts::Options`] struct that correspond + /// to the parameters in the configuration. + /// + /// It is the caller's responsibility to ensure that the installed options + /// do not conflict with any other options that may exist in `opts`, or + /// that may be installed into `opts` in the future. + /// + /// This method is only available if the `getopts` feature is enabled, which + /// it is by default. + #[cfg(feature = "getopts")] + pub fn install_options(opts: &mut getopts_dep::Options) { + opts.optopt("", "progress-mode", "progress tracking mode (eager or demand)", "MODE"); + } + + /// Instantiates a configuration based upon the parsed options in `matches`. + /// + /// The `matches` object must have been constructed from a + /// [`getopts::Options`] which contained at least the options installed by + /// [`Self::install_options`]. + /// + /// This method is only available if the `getopts` feature is enabled, which + /// it is by default. + #[cfg(feature = "getopts")] + pub fn from_matches(matches: &getopts_dep::Matches) -> Result { + let progress_mode = matches + .opt_get_default("progress-mode", ProgressMode::Eager) + .map_err(|e| e.to_string())?; + Ok(Config::default().progress_mode(progress_mode)) + } + + /// Sets the progress mode to `progress_mode`. + pub fn progress_mode(mut self, progress_mode: ProgressMode) -> Self { + self.progress_mode = progress_mode; + self + } + + /// Sets a typed configuration parameter for the given `key`. + /// + /// It is recommended to install a single configuration struct using a key + /// that uniquely identifies your project, to avoid clashes. For example, + /// differential dataflow registers a configuration struct under the key + /// "differential". + pub fn set(&mut self, key: String, val: T) -> &mut Self + where + T: Send + Sync + 'static, + { + self.registry.insert(key, Arc::new(val)); + self + } + + /// Gets the value for configured parameter `key`. + /// + /// Returns `None` if `key` has not previously been set with + /// [`WorkerConfig::set`], or if the specified `T` does not match the `T` + /// from the call to `set`. + pub fn get(&self, key: &str) -> Option<&T> { + self.registry.get(key).and_then(|val| val.downcast_ref()) + } +} + /// Methods provided by the root Worker. /// /// These methods are often proxied by child scopes, and this trait provides access. pub trait AsWorker : Scheduler { - + /// Returns the worker configuration parameters. + fn config(&self) -> &Config; /// Index of the worker among its peers. fn index(&self) -> usize; /// Number of peer workers. @@ -53,6 +153,7 @@ pub trait AsWorker : Scheduler { /// A `Worker` is the entry point to a timely dataflow computation. It wraps a `Allocate`, /// and has a list of dataflows that it manages. pub struct Worker { + config: Config, timer: Instant, paths: Rc>>>, allocator: Rc>, @@ -71,6 +172,7 @@ pub struct Worker { } impl AsWorker for Worker { + fn config(&self) -> &Config { &self.config } fn index(&self) -> usize { self.allocator.borrow().index() } fn peers(&self) -> usize { self.allocator.borrow().peers() } fn allocate(&mut self, identifier: usize, address: &[usize]) -> (Vec>>>, Box>>) { @@ -102,10 +204,11 @@ impl Scheduler for Worker { impl Worker { /// Allocates a new `Worker` bound to a channel allocator. - pub fn new(c: A) -> Worker { + pub fn new(config: Config, c: A) -> Worker { let now = Instant::now(); let index = c.index(); Worker { + config, timer: now.clone(), paths: Default::default(), allocator: Rc::new(RefCell::new(c)), @@ -512,6 +615,7 @@ use crate::communication::Message; impl Clone for Worker { fn clone(&self) -> Self { Worker { + config: self.config.clone(), timer: self.timer, paths: self.paths.clone(), allocator: self.allocator.clone(), diff --git a/timely/tests/barrier.rs b/timely/tests/barrier.rs index f41e04920..9e627762b 100644 --- a/timely/tests/barrier.rs +++ b/timely/tests/barrier.rs @@ -1,16 +1,20 @@ extern crate timely; -use timely::Configuration; +use timely::{Config, CommunicationConfig, WorkerConfig}; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::{Feedback, ConnectLoop}; use timely::dataflow::operators::generic::operator::Operator; -#[test] fn barrier_sync_1w() { barrier_sync_helper(Configuration::Thread); } -#[test] fn barrier_sync_2w() { barrier_sync_helper(Configuration::Process(2)); } -#[test] fn barrier_sync_3w() { barrier_sync_helper(Configuration::Process(3)); } +#[test] fn barrier_sync_1w() { barrier_sync_helper(CommunicationConfig::Thread); } +#[test] fn barrier_sync_2w() { barrier_sync_helper(CommunicationConfig::Process(2)); } +#[test] fn barrier_sync_3w() { barrier_sync_helper(CommunicationConfig::Process(3)); } // This method asserts that each round of execution is notified of at most one time. -fn barrier_sync_helper(config: ::timely::Configuration) { +fn barrier_sync_helper(comm_config: ::timely::CommunicationConfig) { + let config = Config { + communication: comm_config, + worker: WorkerConfig::default(), + }; timely::execute(config, move |worker| { worker.dataflow(move |scope| { let (handle, stream) = scope.feedback::(1);