Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ where

inner.join_core(&edges, |_k,l,d| Some((d.clone(), l+1)))
.concat(&nodes)
.reduce(|_, s, t| t.push((s[0].0.clone(), 1)))
.reduce(|_, s, t| t.push((*s[0].0, 1)))
})
}
6 changes: 3 additions & 3 deletions src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ where
let reached =
forward
.join_map(&reverse, |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2))
.reduce(|_key, s, t| t.push((s[0].0.clone(), 1)))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.semijoin(&goals);

let active =
Expand All @@ -96,7 +96,7 @@ where
.join_core(&forward_edges, |_med, (src, dist), next| Some((next.clone(), (src.clone(), *dist+1))))
.concat(&forward)
.map(|(next, (src, dist))| ((next, src), dist))
.reduce(|_key, s, t| t.push((s[0].0.clone(), 1)))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.map(|((next, src), dist)| (next, (src, dist)));

forward_next.map(|_| ()).consolidate().inspect(|x| println!("forward_next: {:?}", x));
Expand All @@ -113,7 +113,7 @@ where
.join_core(&reverse_edges, |_med, (rev, dist), next| Some((next.clone(), (rev.clone(), *dist+1))))
.concat(&reverse)
.map(|(next, (rev, dist))| ((next, rev), dist))
.reduce(|_key, s, t| t.push((s[0].0.clone(), 1)))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.map(|((next,rev), dist)| (next, (rev, dist)));

reverse_next.map(|_| ()).consolidate().inspect(|x| println!("reverse_next: {:?}", x));
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ where
let labels =
proposals
.concat(&nodes)
.reduce_abelian::<_,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1 as i8))));
.reduce_abelian::<_,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));

let propagate: Collection<_, (N, L), R> =
labels
Expand Down
4 changes: 2 additions & 2 deletions src/algorithms/graphs/scc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ where
// keep edges from active edge destinations.
let active =
edges.map(|(_src,dst)| dst)
.threshold(|_,c| if c.is_zero() { R::from(0 as i8) } else { R::from(1 as i8) });
.threshold(|_,c| if c.is_zero() { R::from(0_i8) } else { R::from(1_i8) });

graph.enter(&edges.scope())
.semijoin(&active)
Expand Down Expand Up @@ -65,7 +65,7 @@ where

// NOTE: With a node -> int function, can be improved by:
// let labels = propagate_at(&cycle, &nodes, |x| *x as u64);
let labels = propagate(&cycle, &nodes);
let labels = propagate(cycle, &nodes);

edges.join_map(&labels, |e1,e2,l1| (e2.clone(),(e1.clone(),l1.clone())))
.join_map(&labels, |e2,(e1,l1),l2| ((e1.clone(),e2.clone()),(l1.clone(),l2.clone())))
Expand Down
5 changes: 2 additions & 3 deletions src/algorithms/graphs/sequential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ where
.distinct();

// repeatedly apply color-picking logic.
sequence(&start, &edges, |_node, vals| {
sequence(&start, edges, |_node, vals| {

// look for the first absent positive integer.
// start at 1 in case we ever use NonZero<u32>.

(1u32 ..)
.filter(|&i| vals.get(i as usize - 1).map(|x| *x.0) != Some(i))
.next()
.find(|&i| vals.get(i as usize - 1).map(|x| *x.0) != Some(i))
.unwrap()
})
}
Expand Down
22 changes: 10 additions & 12 deletions src/algorithms/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,16 @@ pub trait Identifiers<G: Scope, D: ExchangeData, R: ExchangeData+Abelian> {
/// use differential_dataflow::algorithms::identifiers::Identifiers;
/// use differential_dataflow::operators::Threshold;
///
/// fn main() {
/// ::timely::example(|scope| {
/// ::timely::example(|scope| {
///
/// let identifiers =
/// scope.new_collection_from(1 .. 10).1
/// .identifiers()
/// // assert no conflicts
/// .map(|(data, id)| id)
/// .threshold(|_id,cnt| if cnt > &1 { *cnt } else { 0 })
/// .assert_empty();
/// });
/// }
/// let identifiers =
/// scope.new_collection_from(1 .. 10).1
/// .identifiers()
/// // assert no conflicts
/// .map(|(data, id)| id)
/// .threshold(|_id,cnt| if cnt > &1 { *cnt } else { 0 })
/// .assert_empty();
/// });
/// ```
fn identifiers(&self) -> Collection<G, (D, u64), R>;
}
Expand Down Expand Up @@ -63,7 +61,7 @@ where
.as_collection()
.iterate(|diff|
init.enter(&diff.scope())
.concat(&diff)
.concat(diff)
.map(|pair| (pair.hashed(), pair))
.reduce(|_hash, input, output| {
// keep round-positive records as changes.
Expand Down
6 changes: 2 additions & 4 deletions src/algorithms/prefix_sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@ where
let combine1 = ::std::rc::Rc::new(combine);
let combine2 = combine1.clone();

let ranges = aggregate(self.clone(), move |k,x,y| (*combine1)(k,x,y));
let values = broadcast(ranges, locations, zero, move |k,x,y| (*combine2)(k,x,y));

values
let ranges = aggregate(self.clone(), move |k,x,y| (*combine1)(k,x,y));
broadcast(ranges, locations, zero, move |k,x,y| (*combine2)(k,x,y))
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ pub mod source {
let mut progress_session = progress.session(&progress_caps[0]);

// We presume the iterator will yield if appropriate.
while let Some(message) = source.next() {
for message in source.by_ref() {
match message {
Message::Updates(mut updates) => {
updates_session.give_vec(&mut updates);
Expand Down Expand Up @@ -624,7 +624,7 @@ pub mod sink {
// and so any record we see is in fact guaranteed to happen.
let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
let reactivator = stream.scope().activator_for(&builder.operator_info().address);
let mut input = builder.new_input(&stream, Pipeline);
let mut input = builder.new_input(stream, Pipeline);
let (mut updates_out, updates) = builder.new_output();

builder.build_reschedule(
Expand Down Expand Up @@ -655,7 +655,7 @@ pub mod sink {
if let Some(sink) = updates_sink.upgrade() {
let mut sink = sink.borrow_mut();
while let Some(message) = send_queue.front() {
if let Some(duration) = sink.poll(&message) {
if let Some(duration) = sink.poll(message) {
// Reschedule after `duration` and then bail.
reactivator.activate_after(duration);
return true;
Expand Down Expand Up @@ -736,7 +736,7 @@ pub mod sink {
frontier = input.frontier.frontier().to_owned();

while let Some(message) = send_queue.front() {
if let Some(duration) = sink.poll(&message) {
if let Some(duration) = sink.poll(message) {
// Reschedule after `duration` and then bail.
reactivator.activate_after(duration);
// Signal that work remains to be done.
Expand Down
Loading