Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions examples/columnation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
extern crate timely;
extern crate differential_dataflow;

use timely::dataflow::operators::probe::Handle;

use differential_dataflow::input::Input;

fn main() {

let keys: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let size: usize = std::env::args().nth(2).unwrap().parse().unwrap();

let mode = std::env::args().any(|a| a == "new");

if mode {
println!("Running NEW arrangement");
}
else {
println!("Running OLD arrangement");
}

let timer1 = ::std::time::Instant::now();
let timer2 = timer1.clone();

// define a new computational scope, in which to run BFS
timely::execute_from_args(std::env::args(), move |worker| {

// define BFS dataflow; return handles to roots and edges inputs
let mut probe = Handle::new();
let (mut data_input, mut keys_input) = worker.dataflow(|scope| {

use differential_dataflow::operators::{arrange::Arrange, JoinCore};
use differential_dataflow::trace::implementations::ord::{OrdKeySpine, ColKeySpine};

let (data_input, data) = scope.new_collection::<String, isize>();
let (keys_input, keys) = scope.new_collection::<String, isize>();

if mode {
let data = data.arrange::<ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
}
else {
let data = data.arrange::<OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
}

(data_input, keys_input)
});

// Load up data in batches.
let mut counter = 0;
while counter < 10 * keys {
let mut i = worker.index();
while i < size {
let val = (counter + i) % keys;
data_input.insert(format!("{:?}", val));
i += worker.peers();
}
counter += size;
data_input.advance_to(data_input.time() + 1);
data_input.flush();
keys_input.advance_to(keys_input.time() + 1);
keys_input.flush();
while probe.less_than(data_input.time()) {
worker.step();
}
}
println!("{:?}\tloading complete", timer1.elapsed());

let mut queries = 0;

while queries < 10 * keys {
let mut i = worker.index();
while i < size {
let val = (queries + i) % keys;
keys_input.insert(format!("{:?}", val));
i += worker.peers();
}
queries += size;
data_input.advance_to(data_input.time() + 1);
data_input.flush();
keys_input.advance_to(keys_input.time() + 1);
keys_input.flush();
while probe.less_than(data_input.time()) {
worker.step();
}
}

println!("{:?}\tqueries complete", timer1.elapsed());

// loop { }

}).unwrap();

println!("{:?}\tshut down", timer2.elapsed());

}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
//! of the new and old counts of the old and new degrees of the affected node).

#![forbid(missing_docs)]
#![allow(array_into_iter)]


use std::fmt::Debug;

Expand Down
6 changes: 1 addition & 5 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,9 +542,6 @@ where
// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

let mut buffer = Vec::new();


let (activator, effort) =
if let Some(effort) = self.inner.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(self.scope().activator_for(&info.address[..])), Some(effort))
Expand All @@ -569,8 +566,7 @@ where

input.for_each(|cap, data| {
capabilities.insert(cap.retain());
data.swap(&mut buffer);
batcher.push_batch(&mut buffer);
batcher.push_batch(data);
});

// The frontier may have advanced by multiple elements, which is an issue because
Expand Down
19 changes: 15 additions & 4 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! A general purpose `Batcher` implementation based on radix sort.

use timely::communication::message::RefOrMut;
use timely::progress::frontier::Antichain;

use ::difference::Semigroup;
Expand Down Expand Up @@ -33,8 +34,20 @@ where
}

#[inline(never)]
fn push_batch(&mut self, batch: &mut Vec<((B::Key,B::Val),B::Time,B::R)>) {
self.sorter.push(batch);
fn push_batch(&mut self, batch: RefOrMut<Vec<((B::Key,B::Val),B::Time,B::R)>>) {
// `batch` is either a shared reference or an owned allocations.
match batch {
RefOrMut::Ref(reference) => {
// This is a moment at which we could capture the allocations backing
// `batch` into a different form of region, rather than just cloning.
let mut owned: Vec<_> = self.sorter.empty();
owned.clone_from(reference);
self.sorter.push(&mut owned);
},
RefOrMut::Mut(reference) => {
self.sorter.push(reference);
}
}
}

// Sealing a batch means finding those updates with times not greater or equal to any time
Expand All @@ -58,7 +71,6 @@ where
for mut buffer in merged.drain(..) {
for ((key, val), time, diff) in buffer.drain(..) {
if upper.less_equal(&time) {
// keep_count += 1;
self.frontier.insert(time.clone());
if keep.len() == keep.capacity() {
if keep.len() > 0 {
Expand All @@ -69,7 +81,6 @@ where
keep.push(((key, val), time, diff));
}
else {
// seal_count += 1;
builder.push((key, val, time, diff));
}
}
Expand Down
Loading