Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ pub trait Input : TimelyInput {
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use timely::Configuration;
/// use timely::Config;
/// use differential_dataflow::input::Input;
///
/// fn main() {
/// ::timely::execute(Configuration::Thread, |worker| {
/// ::timely::execute(Config::thread(), |worker| {
///
/// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
/// // create input handle and collection.
Expand All @@ -56,11 +56,11 @@ pub trait Input : TimelyInput {
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use timely::Configuration;
/// use timely::Config;
/// use differential_dataflow::input::Input;
///
/// fn main() {
/// ::timely::execute(Configuration::Thread, |worker| {
/// ::timely::execute(Config::thread(), |worker| {
///
/// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
/// // create input handle and collection.
Expand All @@ -87,11 +87,11 @@ pub trait Input : TimelyInput {
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use timely::Configuration;
/// use timely::Config;
/// use differential_dataflow::input::Input;
///
/// fn main() {
/// ::timely::execute(Configuration::Thread, |worker| {
/// ::timely::execute(Config::thread(), |worker| {
///
/// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
/// // create input handle and collection.
Expand Down Expand Up @@ -150,11 +150,11 @@ impl<G: TimelyInput> Input for G where <G as ScopeParent>::Timestamp: Lattice {
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use timely::Configuration;
/// use timely::Config;
/// use differential_dataflow::input::Input;
///
/// fn main() {
/// ::timely::execute(Configuration::Thread, |worker| {
/// ::timely::execute(Config::thread(), |worker| {
///
/// let (mut handle, probe) = worker.dataflow(|scope| {
/// // create input handle and collection.
Expand Down
30 changes: 29 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,32 @@ pub mod difference;
pub mod collection;
pub mod logging;
pub mod consolidation;
pub mod capture;
pub mod capture;

/// Configuration options for differential dataflow.
#[derive(Default)]
pub struct Config {
/// An amount of arrangement effort to spend each scheduling quantum.
///
/// The default value of `None` will not schedule operators that maintain arrangements
/// other than when computation is required. Setting the value to `Some(effort)` will
/// cause these operators to reschedule themselves as long as their arrangemnt has not
/// reached a compact representation, and each scheduling quantum they will perform
/// compaction work as if `effort` records had been added to the arrangement.
pub idle_merge_effort: Option<isize>
}

impl Config {
/// Assign an amount of effort to apply to idle arrangement operators.
pub fn idle_merge_effort(mut self, effort: Option<isize>) -> Self {
self.idle_merge_effort = effort;
self
}
}

/// Introduces differential options to a timely configuration.
pub fn configure(config: &mut timely::Config, options: &Config) {
if let Some(effort) = options.idle_merge_effort {
config.worker.set("differential/idle_merge_effort".to_string(), effort);
}
}
12 changes: 6 additions & 6 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ where
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use timely::Configuration;
/// use timely::Config;
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::arrange::ArrangeBySelf;
/// use differential_dataflow::operators::reduce::Reduce;
Expand All @@ -189,7 +189,7 @@ where
/// use differential_dataflow::hashable::OrdWrapper;
///
/// fn main() {
/// ::timely::execute(Configuration::Thread, |worker| {
/// ::timely::execute(Config::thread(), |worker| {
///
/// // create a first dataflow
/// let mut trace = worker.dataflow::<u32,_,_>(|scope| {
Expand Down Expand Up @@ -238,7 +238,7 @@ where
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use timely::Configuration;
/// use timely::Config;
/// use timely::dataflow::ProbeHandle;
/// use timely::dataflow::operators::Probe;
/// use differential_dataflow::input::InputSession;
Expand All @@ -249,7 +249,7 @@ where
/// use differential_dataflow::hashable::OrdWrapper;
///
/// fn main() {
/// ::timely::execute(Configuration::Thread, |worker| {
/// ::timely::execute(Config::thread(), |worker| {
///
/// let mut input = InputSession::<_,(),isize>::new();
/// let mut probe = ProbeHandle::new();
Expand Down Expand Up @@ -349,7 +349,7 @@ where
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use timely::Configuration;
/// use timely::Config;
/// use timely::progress::frontier::AntichainRef;
/// use timely::dataflow::ProbeHandle;
/// use timely::dataflow::operators::Probe;
Expand All @@ -364,7 +364,7 @@ where
/// use differential_dataflow::input::Input;
///
/// fn main() {
/// ::timely::execute(Configuration::Thread, |worker| {
/// ::timely::execute(Config::thread(), |worker| {
///
/// let mut probe = ProbeHandle::new();
///
Expand Down
3 changes: 1 addition & 2 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,7 @@ where


let (activator, effort) =
if let Ok(text) = ::std::env::var("DIFFERENTIAL_EAGER_MERGE") {
let effort = text.parse::<isize>().expect("DIFFERENTIAL_EAGER_MERGE must be set to an integer");
if let Some(effort) = self.inner.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(self.scope().activator_for(&info.address[..])), Some(effort))
}
else {
Expand Down
3 changes: 1 addition & 2 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,7 @@ where

// Establish compaction effort to apply even without updates.
let (activator, effort) =
if let Ok(text) = ::std::env::var("DIFFERENTIAL_EAGER_MERGE") {
let effort = text.parse::<isize>().expect("DIFFERENTIAL_EAGER_MERGE must be set to an integer");
if let Some(effort) = stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(stream.scope().activator_for(&info.address[..])), Some(effort))
}
else {
Expand Down
3 changes: 1 addition & 2 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,7 @@ where
// Determine if we should regularly exert the trace maintenance machinery,
// and with what amount of effort each time.
let (activator, effort) =
if let Ok(text) = ::std::env::var("DIFFERENTIAL_EAGER_MERGE") {
let effort = text.parse::<isize>().expect("DIFFERENTIAL_EAGER_MERGE must be set to an integer");
if let Some(effort) = self.stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(self.stream.scope().activator_for(&operator_info.address[..])), Some(effort))
}
else {
Expand Down
12 changes: 6 additions & 6 deletions tests/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use rand::{Rng, SeedableRng, StdRng};

use std::sync::{Arc, Mutex};

use timely::Configuration;
use timely::Config;

use timely::dataflow::*;
use timely::dataflow::operators::Capture;
Expand All @@ -21,11 +21,11 @@ use differential_dataflow::lattice::Lattice;
type Node = usize;
type Edge = (Node, Node);

#[test] fn bfs_10_20_1000() { test_sizes(10, 20, 1000, Configuration::Process(3)); }
#[test] fn bfs_100_200_10() { test_sizes(100, 200, 10, Configuration::Process(3)); }
#[test] fn bfs_100_2000_1() { test_sizes(100, 2000, 1, Configuration::Process(3)); }
#[test] fn bfs_10_20_1000() { test_sizes(10, 20, 1000, Config::process(3)); }
#[test] fn bfs_100_200_10() { test_sizes(100, 200, 10, Config::process(3)); }
#[test] fn bfs_100_2000_1() { test_sizes(100, 2000, 1, Config::process(3)); }

fn test_sizes(nodes: usize, edges: usize, rounds: usize, config: Configuration) {
fn test_sizes(nodes: usize, edges: usize, rounds: usize, config: Config) {

let root_list = vec![(1, 0, 1)];
let mut edge_list = Vec::new();
Expand Down Expand Up @@ -143,7 +143,7 @@ fn bfs_sequential(
fn bfs_differential(
roots_list: Vec<(usize, usize, isize)>,
edges_list: Vec<((usize, usize), usize, isize)>,
config: Configuration,
config: Config,
)
-> Vec<((usize, usize), usize, isize)>
{
Expand Down
8 changes: 4 additions & 4 deletions tests/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn run_test<T>(test: T, expected: Vec<(usize, Vec<((u64, i64), i64)>)>) -> ()
#[test]
fn test_import_vanilla() {
run_test(|input_epochs| {
timely::execute(timely::Configuration::Process(4), move |worker| {
timely::execute(timely::Config::process(4), move |worker| {
let ref input_epochs = input_epochs;
let index = worker.index();
let peers = worker.peers();
Expand Down Expand Up @@ -102,7 +102,7 @@ fn test_import_vanilla() {
fn test_import_completed_dataflow() {
// Runs the first dataflow to completion before constructing the subscriber.
run_test(|input_epochs| {
timely::execute(timely::Configuration::Process(4), move |worker| {
timely::execute(timely::Config::process(4), move |worker| {
let ref input_epochs = input_epochs;
let index = worker.index();
let peers = worker.peers();
Expand Down Expand Up @@ -161,7 +161,7 @@ fn test_import_completed_dataflow() {
#[test]
fn test_import_stalled_dataflow() {
// Runs the first dataflow to completion before constructing the subscriber.
timely::execute(timely::Configuration::Thread, move |worker| {
timely::execute(timely::Config::thread(), move |worker| {

let mut input = InputSession::new();

Expand Down Expand Up @@ -208,7 +208,7 @@ fn test_import_stalled_dataflow() {
fn import_skewed() {

run_test(|_input| {
let captured = timely::execute(timely::Configuration::Process(4), |worker| {
let captured = timely::execute(timely::Config::process(4), |worker| {
let index = worker.index();
let peers = worker.peers();

Expand Down
12 changes: 6 additions & 6 deletions tests/scc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::mem;

use timely::Configuration;
use timely::Config;

use timely::dataflow::*;
use timely::dataflow::operators::Capture;
Expand All @@ -24,11 +24,11 @@ use differential_dataflow::lattice::Lattice;
type Node = usize;
type Edge = (Node, Node);

#[test] fn scc_10_20_1000() { test_sizes(10, 20, 1000, Configuration::Process(3)); }
#[test] fn scc_100_200_10() { test_sizes(100, 200, 10, Configuration::Process(3)); }
#[test] fn scc_100_2000_1() { test_sizes(100, 2000, 1, Configuration::Process(3)); }
#[test] fn scc_10_20_1000() { test_sizes(10, 20, 1000, Config::process(3)); }
#[test] fn scc_100_200_10() { test_sizes(100, 200, 10, Config::process(3)); }
#[test] fn scc_100_2000_1() { test_sizes(100, 2000, 1, Config::process(3)); }

fn test_sizes(nodes: usize, edges: usize, rounds: usize, config: Configuration) {
fn test_sizes(nodes: usize, edges: usize, rounds: usize, config: Config) {

let mut edge_list = Vec::new();

Expand Down Expand Up @@ -167,7 +167,7 @@ fn assign(node: usize, root: usize, reverse: &HashMap<usize, Vec<usize>>, compon

fn scc_differential(
edges_list: Vec<((usize, usize), usize, isize)>,
config: Configuration,
config: Config,
)
-> Vec<((usize, usize), usize, isize)>
{
Expand Down