diff --git a/Cargo.toml b/Cargo.toml index 895c0407e..96ebd6d9d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ rust-version = "1.86" differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.22.0" } #timely = { version = "0.28", default-features = false } columnar = { version = "0.12", default-features = false } -timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } +timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false } #timely = { path = "../timely-dataflow/timely/", default-features = false } [workspace.lints.clippy] diff --git a/README.md b/README.md index 41cfe05f1..d6a1b3e93 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ Alternately, here is a fragment that computes the set of nodes reachable from a ```rust let reachable = roots.iterate(|scope, reach| - edges.enter(&scope) + edges.enter(scope) .semijoin(reach) .map(|(src, dst)| dst) .concat(reach) @@ -337,7 +337,7 @@ edges.iterate(|scope, inner| { .map(|(node,_)| node); // keep edges between active vertices - edges.enter(&scope) + edges.enter(scope) .semijoin(active) .map(|(src,dst)| (dst,src)) .semijoin(active) diff --git a/advent_of_code_2017/src/bin/day_06.rs b/advent_of_code_2017/src/bin/day_06.rs index 5cae0bef8..10017e224 100644 --- a/advent_of_code_2017/src/bin/day_06.rs +++ b/advent_of_code_2017/src/bin/day_06.rs @@ -24,7 +24,7 @@ fn main() { let stable = banks.iterate(|scope, iter| iter.map_in_place(|banks| recycle(banks)) - .concat(banks.enter(&scope)) + .concat(banks.enter(scope)) .distinct() ); @@ -43,7 +43,7 @@ fn main() { loop_point .iterate(|scope, iter| iter.map_in_place(|banks| recycle(banks)) - .concat(loop_point.enter(&scope)) + .concat(loop_point.enter(scope)) .distinct() ) .map(|_| ((),())) diff --git a/advent_of_code_2017/src/bin/day_07.rs b/advent_of_code_2017/src/bin/day_07.rs index 13b35094e..0d4a91be2 100644 --- a/advent_of_code_2017/src/bin/day_07.rs +++ b/advent_of_code_2017/src/bin/day_07.rs @@ -1104,10 +1104,10 @@ tvhftq (35)"; let total_weights: VecCollection<_,String> = weights .iterate(|scope, inner| { - parents.enter(&scope) + parents.enter(scope) .semijoin(inner) .map(|(_, parent)| parent) - .concat(weights.enter(&scope)) + .concat(weights.enter(scope)) }); parents diff --git a/advent_of_code_2017/src/bin/day_08.rs b/advent_of_code_2017/src/bin/day_08.rs index 385bc6489..e55aacee3 100644 --- a/advent_of_code_2017/src/bin/day_08.rs +++ b/advent_of_code_2017/src/bin/day_08.rs @@ -1107,7 +1107,7 @@ wui inc -120 if i > -2038"; .map(|_| ((0, String::new()), 0)) .iterate(|scope, valid| { - let edits = edits.enter(&scope); + let edits = edits.enter(scope); valid .prefix_sum_at(edits.map(|(key,_)| key), 0, |_k,x,y| *x + *y) diff --git a/advent_of_code_2017/src/bin/day_09.rs b/advent_of_code_2017/src/bin/day_09.rs index b462993cf..8a9a0f310 100644 --- a/advent_of_code_2017/src/bin/day_09.rs +++ b/advent_of_code_2017/src/bin/day_09.rs @@ -120,7 +120,7 @@ where if input.len() > 1 { result = combine(result, &(input[1].0).1); } output.push((result, 1)); }) - .concat(unit_ranges.enter(&scope)) + .concat(unit_ranges.enter(scope)) ) } @@ -154,10 +154,10 @@ where .iterate(|scope, state| { aggregates .filter(|&((_, log),_)| log < 64) // the log = 64 interval doesn't help us here (overflows). - .enter(&scope) + .enter(scope) .map(|((pos, log), data)| (pos, (log, data))) .join_map(state, move |&pos, &(log, ref data), state| (pos + (1 << log), combine(state, data))) - .concat(init_state.enter(&scope)) + .concat(init_state.enter(scope)) .distinct() }) .consolidate() diff --git a/advent_of_code_2017/src/bin/day_12.rs b/advent_of_code_2017/src/bin/day_12.rs index 4c92ee174..238b7bd51 100644 --- a/advent_of_code_2017/src/bin/day_12.rs +++ b/advent_of_code_2017/src/bin/day_12.rs @@ -2035,8 +2035,8 @@ fn main() { let labels = nodes .iterate(|scope, label| { - let edges = edges.enter(&scope); - let nodes = nodes.enter(&scope); + let edges = edges.enter(scope); + let nodes = nodes.enter(scope); label .join_map(edges, |_src, &lbl, &tgt| (tgt, lbl)) .concat(nodes) diff --git a/diagnostics/src/logging.rs b/diagnostics/src/logging.rs index c16490865..cc4b8331a 100644 --- a/diagnostics/src/logging.rs +++ b/diagnostics/src/logging.rs @@ -415,7 +415,7 @@ struct TimelyDemuxState { /// Build timely logging collections and arrangements. fn construct_timely<'scope>( - scope: &mut Scope<'scope, Duration>, + scope: Scope<'scope, Duration>, stream: Stream<'scope, Duration, Vec<(Duration, TimelyEvent)>>, ) -> (TimelyTraces, TimelyCollections<'scope>) { type OpUpdate = ((usize, String, Vec), Duration, i64); @@ -423,7 +423,7 @@ fn construct_timely<'scope>( type ElUpdate = (usize, Duration, i64); type MsgUpdate = (usize, Duration, i64); - let mut demux = OperatorBuilder::new("Timely Demux".to_string(), scope.clone()); + let mut demux = OperatorBuilder::new("Timely Demux".to_string(), scope); let mut input = demux.new_input(stream, Pipeline); let (op_out, operates) = demux.new_output::>(); @@ -546,12 +546,12 @@ struct DifferentialCollections<'scope> { /// Build differential logging collections and arrangements. fn construct_differential<'scope>( - scope: &mut Scope<'scope, Duration>, + scope: Scope<'scope, Duration>, stream: Stream<'scope, Duration, Vec<(Duration, DifferentialEvent)>>, ) -> (DifferentialTraces, DifferentialCollections<'scope>) { type Update = (usize, Duration, i64); - let mut demux = OperatorBuilder::new("Differential Demux".to_string(), scope.clone()); + let mut demux = OperatorBuilder::new("Differential Demux".to_string(), scope); let mut input = demux.new_input(stream, Pipeline); let (bat_out, batches) = demux.new_output::>(); diff --git a/differential-dataflow/examples/accumulate.rs b/differential-dataflow/examples/accumulate.rs index d8da7bdcd..588290acc 100644 --- a/differential-dataflow/examples/accumulate.rs +++ b/differential-dataflow/examples/accumulate.rs @@ -20,7 +20,7 @@ fn main() { scope.iterative::(|inner| { data.enter_at(inner, |_| 0) .consolidate() - .leave(&scope) + .leave(scope) }); input diff --git a/differential-dataflow/examples/arrange.rs b/differential-dataflow/examples/arrange.rs index d3f5abcec..2e5623679 100644 --- a/differential-dataflow/examples/arrange.rs +++ b/differential-dataflow/examples/arrange.rs @@ -2,7 +2,6 @@ use rand::{Rng, SeedableRng, StdRng}; use timely::dataflow::operators::*; use timely::order::Product; -use timely::scheduling::Scheduler; use differential_dataflow::input::Input; use differential_dataflow::AsCollection; @@ -109,8 +108,8 @@ fn main() { // repeatedly update minimal distances each node can be reached from each root roots.clone().iterate(|scope, dists| { - let edges = edges.enter(&scope); - let roots = roots.enter(&scope); + let edges = edges.enter(scope); + let roots = roots.enter(scope); dists.arrange_by_key() .join_core(edges, |_k,l,d| Some((*d, l+1))) @@ -175,4 +174,4 @@ fn main() { } } }).unwrap(); -} \ No newline at end of file +} diff --git a/differential-dataflow/examples/bfs.rs b/differential-dataflow/examples/bfs.rs index f8abe4873..aa87b72bc 100644 --- a/differential-dataflow/examples/bfs.rs +++ b/differential-dataflow/examples/bfs.rs @@ -100,8 +100,8 @@ where // repeatedly update minimal distances each node can be reached from each root nodes.clone().iterate(|scope, inner| { - let nodes = nodes.enter(&scope); - let edges = edges.enter(&scope); + let nodes = nodes.enter(scope); + let edges = edges.enter(scope); inner.join_map(edges, |_k,l,d| (*d, l+1)) .concat(nodes) diff --git a/differential-dataflow/examples/columnar/columnar_support.rs b/differential-dataflow/examples/columnar/columnar_support.rs index 03a260218..1241702e7 100644 --- a/differential-dataflow/examples/columnar/columnar_support.rs +++ b/differential-dataflow/examples/columnar/columnar_support.rs @@ -291,7 +291,7 @@ mod distributor { use timely::dataflow::channels::Message; use timely::dataflow::channels::pact::{LogPuller, LogPusher, ParallelizationContract}; use timely::progress::Timestamp; - use timely::worker::AsWorker; + use timely::worker::Worker; use crate::layout::ColumnarUpdate as Update; use crate::{Updates, RecordedUpdates}; @@ -361,15 +361,15 @@ mod distributor { >; type Puller = LogPuller>>>>; - fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { - let (senders, receiver) = allocator.allocate::>>(identifier, address); - let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); + fn connect(self, worker: &Worker, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { + let (senders, receiver) = worker.allocate::>>(identifier, address); + let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, worker.index(), i, identifier, logging.clone())).collect::>(); let distributor = ValDistributor { marker: std::marker::PhantomData, hashfunc: self.hashfunc, pre_lens: Vec::new(), }; - (Exchange::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) + (Exchange::new(senders, distributor), LogPuller::new(receiver, worker.index(), identifier, logging.clone())) } } } diff --git a/differential-dataflow/examples/columnar/main.rs b/differential-dataflow/examples/columnar/main.rs index 3ccaec945..56380089c 100644 --- a/differential-dataflow/examples/columnar/main.rs +++ b/differential-dataflow/examples/columnar/main.rs @@ -185,7 +185,7 @@ mod reachability { variable.set(result_col.clone()); // Leave the iterative scope. - result_col.leave(&outer) + result_col.leave(outer) }) } } diff --git a/differential-dataflow/examples/dynamic.rs b/differential-dataflow/examples/dynamic.rs index 89ca2b2ae..077662f48 100644 --- a/differential-dataflow/examples/dynamic.rs +++ b/differential-dataflow/examples/dynamic.rs @@ -126,7 +126,7 @@ where // Leave the dynamic iteration, stripping off the last timestamp coordinate. next.leave_dynamic(1) .inspect(|x| println!("{:?}", x)) - .leave(&outer) + .leave(outer) }) } diff --git a/differential-dataflow/examples/graspan.rs b/differential-dataflow/examples/graspan.rs index 2f4ae3763..7b6117d24 100644 --- a/differential-dataflow/examples/graspan.rs +++ b/differential-dataflow/examples/graspan.rs @@ -89,7 +89,7 @@ pub struct EdgeVariable<'scope, T: Timestamp + Lattice> { impl<'scope, T: Timestamp + Lattice> EdgeVariable<'scope, T> { /// Creates a new variable initialized with `source`. pub fn from(source: VecCollection<'scope, T, Edge>, step: T::Summary) -> Self { - let (variable, collection) = VecVariable::new(&mut source.scope(), step); + let (variable, collection) = VecVariable::new(source.scope(), step); EdgeVariable { variable, collection, @@ -150,7 +150,7 @@ impl Query { } /// Creates a dataflow implementing the query, and returns input and trace handles. - pub fn render_in(&self, scope: &mut Scope) -> BTreeMap> + pub fn render_in(&self, scope: Scope) -> BTreeMap> where T: Timestamp + Lattice + ::timely::order::TotalOrder, { @@ -170,7 +170,7 @@ impl Query { // create variables and result handles for each named relation. for (name, (input, collection)) in input_map { let edge_variable = EdgeVariable::from(collection.enter(subscope), Product::new(Default::default(), 1)); - let trace = edge_variable.collection.clone().leave(&scope).arrange_by_self().trace; + let trace = edge_variable.collection.clone().leave(scope).arrange_by_self().trace; result_map.insert(name.clone(), RelationHandles { input, trace }); variable_map.insert(name.clone(), edge_variable); } diff --git a/differential-dataflow/examples/iterate_container.rs b/differential-dataflow/examples/iterate_container.rs index d2fa85302..bd7492df0 100644 --- a/differential-dataflow/examples/iterate_container.rs +++ b/differential-dataflow/examples/iterate_container.rs @@ -77,7 +77,7 @@ fn main() { }).as_collection().consolidate(); let result = wrap(result.inner).as_collection(); variable.set(result); - collection.leave(&scope) + collection.leave(scope) }); }) } diff --git a/differential-dataflow/examples/monoid-bfs.rs b/differential-dataflow/examples/monoid-bfs.rs index 027384cd8..fc0022b5d 100644 --- a/differential-dataflow/examples/monoid-bfs.rs +++ b/differential-dataflow/examples/monoid-bfs.rs @@ -152,6 +152,6 @@ where .as_collection(|k,()| *k); variable.set(result.clone()); - result.leave(&outer) + result.leave(outer) }) } diff --git a/differential-dataflow/examples/pagerank.rs b/differential-dataflow/examples/pagerank.rs index b649316ad..08ab898a1 100644 --- a/differential-dataflow/examples/pagerank.rs +++ b/differential-dataflow/examples/pagerank.rs @@ -131,6 +131,6 @@ where // Bind the recursive variable, return its limit. ranks_bind.set(pushed.clone()); - pushed.leave(&outer) + pushed.leave(outer) }) } diff --git a/differential-dataflow/examples/progress.rs b/differential-dataflow/examples/progress.rs index 20550b7ea..809e50628 100644 --- a/differential-dataflow/examples/progress.rs +++ b/differential-dataflow/examples/progress.rs @@ -131,10 +131,10 @@ where .clone() .iterate(|scope, reach| { transitions - .enter(&scope) + .enter(scope) .join_map(reach, |_from, (dest, summ), time| (dest.clone(), summ.results_in(time))) .flat_map(|(dest, time)| time.map(move |time| (dest, time))) - .concat(times.enter(&scope)) + .concat(times.enter(scope)) .reduce(|_location, input, output: &mut Vec<(T, isize)>| { // retain the lower envelope of times. for (t1, _count1) in input.iter() { @@ -173,10 +173,10 @@ where .clone() .iterate(|scope, summaries| { transitions - .enter(&scope) + .enter(scope) .join_map(summaries, |_middle, (from, summ1), (to, summ2)| (from.clone(), to.clone(), summ1.followed_by(summ2))) .flat_map(|(from, to, summ)| summ.map(move |summ| (from, (to, summ)))) - .concat(zero_inputs.enter(&scope)) + .concat(zero_inputs.enter(scope)) .map(|(from, (to, summary))| ((from, to), summary)) .reduce(|_from_to, input, output| { for (summary, _count) in input.iter() { @@ -222,7 +222,7 @@ where .map(|(_source, target)| target) .distinct(); transitions - .enter(&scope) + .enter(scope) .semijoin(active) }) .consolidate() diff --git a/differential-dataflow/examples/stackoverflow.rs b/differential-dataflow/examples/stackoverflow.rs index 234930932..7aec81f29 100644 --- a/differential-dataflow/examples/stackoverflow.rs +++ b/differential-dataflow/examples/stackoverflow.rs @@ -114,8 +114,8 @@ where // repeatedly update minimal distances each node can be reached from each root nodes.clone().iterate(|scope, inner| { - let edges = edges.enter(&scope); - let nodes = nodes.enter(&scope); + let edges = edges.enter(scope); + let nodes = nodes.enter(scope); inner.join_map(edges, |_k,l,d| (*d, l+1)) .concat(nodes) diff --git a/differential-dataflow/src/algorithms/graphs/bfs.rs b/differential-dataflow/src/algorithms/graphs/bfs.rs index e1756b073..830cd0f6e 100644 --- a/differential-dataflow/src/algorithms/graphs/bfs.rs +++ b/differential-dataflow/src/algorithms/graphs/bfs.rs @@ -33,8 +33,8 @@ where // repeatedly update minimal distances each node can be reached from each root nodes.clone().iterate(|scope, inner| { - let edges = edges.enter(&scope); - let nodes = nodes.enter(&scope); + let edges = edges.enter(scope); + let nodes = nodes.enter(scope); inner.join_core(edges, |_k,l,d| Some((d.clone(), l+1))) .concat(nodes) diff --git a/differential-dataflow/src/algorithms/graphs/bijkstra.rs b/differential-dataflow/src/algorithms/graphs/bijkstra.rs index f6dfa520e..b9d2dd44c 100644 --- a/differential-dataflow/src/algorithms/graphs/bijkstra.rs +++ b/differential-dataflow/src/algorithms/graphs/bijkstra.rs @@ -118,6 +118,6 @@ where reverse_bind.set(reverse_next); - reached.leave(&outer) + reached.leave(outer) }) } diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs index 033ec3e9f..915bf545d 100644 --- a/differential-dataflow/src/algorithms/graphs/propagate.rs +++ b/differential-dataflow/src/algorithms/graphs/propagate.rs @@ -69,8 +69,8 @@ where // nodes.filter(|_| false) // .iterate(|scope, inner| { - // let edges = edges.enter(&scope); - // let nodes = nodes.enter_at(&scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as u64)); + // let edges = edges.enter(scope); + // let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as u64)); // inner.join_map(edges, |_k,l,d| (d.clone(),l.clone())) // .concat(nodes) // .reduce(|_, s, t| t.push((s[0].0.clone(), 1))) @@ -104,6 +104,6 @@ where labels .as_collection(|k,v| (k.clone(), v.clone())) - .leave(&outer) + .leave(outer) }) } diff --git a/differential-dataflow/src/algorithms/graphs/scc.rs b/differential-dataflow/src/algorithms/graphs/scc.rs index d50a5a05d..7dcb1de7f 100644 --- a/differential-dataflow/src/algorithms/graphs/scc.rs +++ b/differential-dataflow/src/algorithms/graphs/scc.rs @@ -24,7 +24,7 @@ where let outer = graph.scope(); outer.scoped::,_,_>("StronglyConnected", |scope| { // Bring in edges and transposed edges. - let edges = graph.enter(&scope); + let edges = graph.enter(scope); let trans = edges.clone().map_in_place(|x| mem::swap(&mut x.0, &mut x.1)); // Create a new variable that will be intra-scc edges. use crate::operators::iterate::Variable; @@ -32,7 +32,7 @@ where let result = trim_edges(trim_edges(inner, edges), trans); variable.set(result.clone()); - result.leave(&outer) + result.leave(outer) }) } @@ -64,6 +64,6 @@ where .join_core(labels, |e2,(e1,l1),l2| [((e1.clone(),e2.clone()),(l1.clone(),l2.clone()))]) .filter(|(_,(l1,l2))| l1 == l2) .map(|((x1,x2),_)| (x2,x1)) - .leave_region(&outer) + .leave_region(outer) }) } diff --git a/differential-dataflow/src/algorithms/graphs/sequential.rs b/differential-dataflow/src/algorithms/graphs/sequential.rs index e264ed421..9c58cac61 100644 --- a/differential-dataflow/src/algorithms/graphs/sequential.rs +++ b/differential-dataflow/src/algorithms/graphs/sequential.rs @@ -57,8 +57,8 @@ where .map(|(node, _state)| (node, None)) .iterate(|scope, new_state| { // immutable content: edges and initial state. - let edges = edges.enter(&scope); - let old_state = state.enter(&scope); + let edges = edges.enter(scope); + let old_state = state.enter(scope); // .map(|x| (x.0, Some(x.1))); // break edges into forward and reverse directions. diff --git a/differential-dataflow/src/algorithms/identifiers.rs b/differential-dataflow/src/algorithms/identifiers.rs index c348613e7..18292e607 100644 --- a/differential-dataflow/src/algorithms/identifiers.rs +++ b/differential-dataflow/src/algorithms/identifiers.rs @@ -55,11 +55,11 @@ where use crate::collection::AsCollection; let init = self.map(|record| (0, record)); - timely::dataflow::operators::generic::operator::empty(&init.scope()) + timely::dataflow::operators::generic::operator::empty(init.scope()) .as_collection() .iterate(|scope, diff| init.clone() - .enter(&scope) + .enter(scope) .concat(diff) .map(|pair| (pair.hashed(), pair)) .reduce(|_hash, input, output| { @@ -107,11 +107,11 @@ mod tests { use crate::collection::AsCollection; let init = input.map(|record| (0, record)); - timely::dataflow::operators::generic::operator::empty(&init.scope()) + timely::dataflow::operators::generic::operator::empty(init.scope()) .as_collection() .iterate(|scope, diff| init.clone() - .enter(&scope) + .enter(scope) .concat(diff) .map(|(round, num)| ((round + num) / 10, (round, num))) .reduce(|_hash, input, output| { diff --git a/differential-dataflow/src/algorithms/prefix_sum.rs b/differential-dataflow/src/algorithms/prefix_sum.rs index e8d1b0f4e..90156d688 100644 --- a/differential-dataflow/src/algorithms/prefix_sum.rs +++ b/differential-dataflow/src/algorithms/prefix_sum.rs @@ -59,7 +59,7 @@ where // most two elements, then summarizes itself using the `combine` function. Finally, we re-add // the initial `unit_ranges` intervals, so that the set of ranges grows monotonically. - let unit_ranges = unit_ranges.enter(&scope); + let unit_ranges = unit_ranges.enter(scope); ranges .filter(|&((_pos, log, _), _)| log < 64) .map(|((pos, log, key), data)| ((pos >> 1, log + 1, key), (pos, data))) @@ -141,9 +141,9 @@ where init_states .clone() .iterate(|scope, states| { - let init_states = init_states.enter(&scope); + let init_states = init_states.enter(scope); used_ranges - .enter(&scope) + .enter(scope) .map(|((pos, log, key), data)| ((pos << log, key), (log, data))) .join_map(states, move |&(pos, ref key), &(log, ref data), state| ((pos + (1 << log), key.clone()), combine(key, state, data))) diff --git a/differential-dataflow/src/capture.rs b/differential-dataflow/src/capture.rs index 65069bd74..def7c9ae4 100644 --- a/differential-dataflow/src/capture.rs +++ b/differential-dataflow/src/capture.rs @@ -230,7 +230,7 @@ pub mod source { use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}}; use timely::dataflow::operators::generic::OutputBuilder; use timely::progress::Timestamp; - use timely::scheduling::{Scheduler, SyncActivator}; + use timely::scheduling::SyncActivator; // TODO(guswynn): implement this generally in timely struct DropActivator { @@ -307,11 +307,11 @@ pub mod source { let shared_frontier2 = shared_frontier.clone(); // Step 1: The MESSAGES operator. - let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope.clone()); + let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope); let address = messages_op.operator_info().address; - let activator = scope.sync_activator_for(address.to_vec()); + let activator = scope.worker().sync_activator_for(address.to_vec()); let activator2 = scope.activator_for(Rc::clone(&address)); - let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(address.to_vec())) }; + let drop_activator = DropActivator { activator: Arc::new(scope.worker().sync_activator_for(address.to_vec())) }; let mut source = source_builder(activator); let (updates_out, updates) = messages_op.new_output(); let mut updates_out = OutputBuilder::from(updates_out); @@ -388,7 +388,7 @@ pub mod source { }); // Step 2: The UPDATES operator. - let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope.clone()); + let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope); let mut input = updates_op.new_input(updates, Exchange::new(|x: &(D, T, R)| x.hashed())); let (changes_out, changes) = updates_op.new_output(); let mut changes_out = OutputBuilder::from(changes_out); @@ -437,7 +437,7 @@ pub mod source { }); // Step 3: The PROGRESS operator. - let mut progress_op = OperatorBuilder::new("CDCV2_Progress".to_string(), scope.clone()); + let mut progress_op = OperatorBuilder::new("CDCV2_Progress".to_string(), scope); let mut input = progress_op.new_input( progress, Exchange::new(|x: &(usize, Progress)| x.0 as u64), @@ -521,7 +521,7 @@ pub mod source { }); // Step 4: The FEEDBACK operator. - let mut feedback_op = OperatorBuilder::new("CDCV2_Feedback".to_string(), scope.clone()); + let mut feedback_op = OperatorBuilder::new("CDCV2_Feedback".to_string(), scope); let mut input = feedback_op.new_input( frontier, Exchange::new(|x: &(usize, ChangeBatch)| x.0 as u64), @@ -562,7 +562,6 @@ pub mod sink { use timely::dataflow::Stream; use timely::dataflow::channels::pact::{Exchange, Pipeline}; use timely::dataflow::operators::generic::{builder_rc::OperatorBuilder, OutputBuilder}; - use timely::scheduling::Scheduler; use crate::{lattice::Lattice, ExchangeData}; use super::{Writer, Message, Progress}; diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 8d5377498..16755c27e 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -108,7 +108,7 @@ impl<'scope, T: Timestamp, C: Container> Collection<'scope, T, C> { /// /// This method is a specialization of `enter` to the case where the nested scope is a region. /// It removes the need for an operator that adjusts the timestamp. - pub fn enter_region<'inner>(self, child: &Scope<'inner, T>) -> Collection<'inner, T, C> { + pub fn enter_region<'inner>(self, child: Scope<'inner, T>) -> Collection<'inner, T, C> { self.inner .enter(child) .as_collection() @@ -208,13 +208,13 @@ impl<'scope, T: Timestamp, C: Container> Collection<'scope, T, C> { /// let result = scope.region(|child| { /// data.clone() /// .enter(child) - /// .leave(&scope) + /// .leave(scope) /// }); /// /// data.assert_eq(result); /// }); /// ``` - pub fn enter<'inner, TInner>(self, child: &Scope<'inner, TInner>) -> Collection<'inner, TInner, >::InnerContainer> + pub fn enter<'inner, TInner>(self, child: Scope<'inner, TInner>) -> Collection<'inner, TInner, >::InnerContainer> where C: containers::Enter, TInner: Refines, @@ -287,7 +287,7 @@ impl<'scope, T: Timestamp, C: Container> Collection<'scope, T, C> /// data.assert_eq(result); /// }); /// ``` - pub fn leave<'outer, TOuter>(self, outer: &Scope<'outer, TOuter>) -> Collection<'outer, TOuter, >::OuterContainer> + pub fn leave<'outer, TOuter>(self, outer: Scope<'outer, TOuter>) -> Collection<'outer, TOuter, >::OuterContainer> where TOuter: Timestamp, T: Refines, @@ -306,7 +306,7 @@ impl<'scope, T: Timestamp, C: Container> Collection<'scope, T, C> /// /// This method is a specialization of `leave` to the case that of a nested region. /// It removes the need for an operator that adjusts the timestamp. - pub fn leave_region<'outer>(self, outer: &Scope<'outer, T>) -> Collection<'outer, T, C> { + pub fn leave_region<'outer>(self, outer: Scope<'outer, T>) -> Collection<'outer, T, C> { self.inner .leave(outer) .as_collection() @@ -533,13 +533,13 @@ pub mod vec { /// let result = scope.iterative::(|child| { /// data.clone() /// .enter_at(child, |x| *x) - /// .leave(&scope) + /// .leave(scope) /// }); /// /// data.assert_eq(result); /// }); /// ``` - pub fn enter_at<'inner, TInner, F>(self, child: &Iterative<'inner, T, TInner>, mut initial: F) -> Collection<'inner, Product, D, R> + pub fn enter_at<'inner, TInner, F>(self, child: Iterative<'inner, T, TInner>, mut initial: F) -> Collection<'inner, Product, D, R> where TInner: Timestamp+Hash, F: FnMut(&D) -> TInner + Clone + 'static, @@ -1286,7 +1286,7 @@ impl<'scope, T: Timestamp, C> AsCollection<'scope, T, C> for Stream<'scope, T, C /// .assert_eq(data); /// }); /// ``` -pub fn concatenate<'scope, T, C, I>(scope: &mut Scope<'scope, T>, iterator: I) -> Collection<'scope, T, C> +pub fn concatenate<'scope, T, C, I>(scope: Scope<'scope, T>, iterator: I) -> Collection<'scope, T, C> where T: Timestamp, C: Container, diff --git a/differential-dataflow/src/input.rs b/differential-dataflow/src/input.rs index 03045270c..e43323c7d 100644 --- a/differential-dataflow/src/input.rs +++ b/differential-dataflow/src/input.rs @@ -41,7 +41,7 @@ pub trait Input<'scope> : TimelyInput<'scope> { /// /// }).unwrap(); /// ``` - fn new_collection(&mut self) -> (InputSession, VecCollection<'scope, Self::Timestamp, D, R>) + fn new_collection(&self) -> (InputSession, VecCollection<'scope, Self::Timestamp, D, R>) where D: Data, R: Semigroup+'static; /// Create a new collection and input handle from initial data. /// @@ -67,7 +67,7 @@ pub trait Input<'scope> : TimelyInput<'scope> { /// /// }).unwrap(); /// ``` - fn new_collection_from(&mut self, data: I) -> (InputSession, VecCollection<'scope, Self::Timestamp, I::Item, isize>) + fn new_collection_from(&self, data: I) -> (InputSession, VecCollection<'scope, Self::Timestamp, I::Item, isize>) where I: IntoIterator + 'static; /// Create a new collection and input handle from initial data. /// @@ -93,24 +93,24 @@ pub trait Input<'scope> : TimelyInput<'scope> { /// /// }).unwrap(); /// ``` - fn new_collection_from_raw(&mut self, data: I) -> (InputSession, VecCollection<'scope, Self::Timestamp, D, R>) + fn new_collection_from_raw(&self, data: I) -> (InputSession, VecCollection<'scope, Self::Timestamp, D, R>) where I: IntoIterator+'static, D: Data, R: Semigroup+'static; } use crate::lattice::Lattice; impl<'scope, T: Timestamp + Lattice + timely::order::TotalOrder> Input<'scope> for Scope<'scope, T> { - fn new_collection(&mut self) -> (InputSession, VecCollection<'scope, T, D, R>) + fn new_collection(&self) -> (InputSession, VecCollection<'scope, T, D, R>) where D: Data, R: Semigroup+'static, { let (handle, stream) = self.new_input(); (InputSession::from(handle), stream.as_collection()) } - fn new_collection_from(&mut self, data: I) -> (InputSession, VecCollection<'scope, T, I::Item, isize>) + fn new_collection_from(&self, data: I) -> (InputSession, VecCollection<'scope, T, I::Item, isize>) where I: IntoIterator+'static, I::Item: Data { self.new_collection_from_raw(data.into_iter().map(|d| (d, ::minimum(), 1))) } - fn new_collection_from_raw(&mut self, data: I) -> (InputSession, VecCollection<'scope, T, D, R>) + fn new_collection_from_raw(&self, data: I) -> (InputSession, VecCollection<'scope, T, D, R>) where D: Data, R: Semigroup+'static, @@ -119,7 +119,7 @@ impl<'scope, T: Timestamp + Lattice + timely::order::TotalOrder> Input<'scope> f use timely::dataflow::operators::ToStream; let (handle, stream) = self.new_input(); - let source = data.to_stream(self).as_collection(); + let source = data.to_stream(*self).as_collection(); (InputSession::from(handle), stream.as_collection().concat(source)) } @@ -199,7 +199,7 @@ impl InputSession { impl InputSession { /// Introduces a handle as collection. - pub fn to_collection<'scope>(&mut self, scope: &mut Scope<'scope, T>) -> VecCollection<'scope, T, D, R> + pub fn to_collection<'scope>(&mut self, scope: Scope<'scope, T>) -> VecCollection<'scope, T, D, R> where T: timely::order::TotalOrder, { diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index 61118a497..630c4258a 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -9,7 +9,6 @@ use timely::dataflow::operators::generic::{OperatorInfo, source}; use timely::progress::Timestamp; use timely::progress::{Antichain, frontier::AntichainRef}; use timely::dataflow::operators::CapabilitySet; -use timely::scheduling::Scheduler; use crate::trace::{Trace, TraceReader, BatchReader}; use crate::trace::wrappers::rc::TraceBox; @@ -216,13 +215,13 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import<'scope>(&mut self, scope: &Scope<'scope, Tr::Time>) -> Arranged<'scope, TraceAgent> + pub fn import<'scope>(&mut self, scope: Scope<'scope, Tr::Time>) -> Arranged<'scope, TraceAgent> { self.import_named(scope, "ArrangedSource") } /// Same as `import`, but allows to name the source. - pub fn import_named<'scope>(&mut self, scope: &Scope<'scope, Tr::Time>, name: &str) -> Arranged<'scope, TraceAgent> + pub fn import_named<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> Arranged<'scope, TraceAgent> { // Drop ShutdownButton and return only the arrangement. self.import_core(scope, name).0 @@ -275,7 +274,7 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import_core<'scope>(&mut self, scope: &Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceAgent>, ShutdownButton>) + pub fn import_core<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceAgent>, ShutdownButton>) { let trace = self.clone(); @@ -388,7 +387,7 @@ impl TraceAgent { /// /// }).unwrap(); /// ``` - pub fn import_frontier<'scope>(&mut self, scope: &Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceFrontier>>, ShutdownButton>) + pub fn import_frontier<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceFrontier>>, ShutdownButton>) where Tr: TraceReader, { @@ -405,7 +404,7 @@ impl TraceAgent { /// /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty /// frontier indicates the end of times. - pub fn import_frontier_core<'scope>(&mut self, scope: &Scope<'scope, Tr::Time>, name: &str, since: Antichain, until: Antichain) -> (Arranged<'scope, TraceFrontier>>, ShutdownButton>) + pub fn import_frontier_core<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str, since: Antichain, until: Antichain) -> (Arranged<'scope, TraceFrontier>>, ShutdownButton>) where Tr: TraceReader, { diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 7435e692e..de533a08f 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -25,8 +25,6 @@ use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline}; use timely::progress::Timestamp; use timely::progress::Antichain; use timely::dataflow::operators::Capability; -use timely::scheduling::Scheduler; -use timely::worker::AsWorker; use crate::{Data, VecCollection, AsCollection}; use crate::difference::Semigroup; @@ -83,7 +81,7 @@ where /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps /// have all been extended with an additional coordinate with the default value. The resulting collection does /// not vary with the new timestamp coordinate. - pub fn enter<'inner, TInner>(self, child: &Scope<'inner, TInner>) -> Arranged<'inner, TraceEnter> + pub fn enter<'inner, TInner>(self, child: Scope<'inner, TInner>) -> Arranged<'inner, TraceEnter> where TInner: Refines+Lattice, { @@ -97,7 +95,7 @@ where /// /// This method only applies to *regions*, which are subscopes with the same timestamp /// as their containing scope. In this case, the trace type does not need to change. - pub fn enter_region<'inner>(self, child: &Scope<'inner, Tr::Time>) -> Arranged<'inner, Tr> { + pub fn enter_region<'inner>(self, child: Scope<'inner, Tr::Time>) -> Arranged<'inner, Tr> { Arranged { stream: self.stream.enter(child), trace: self.trace, @@ -109,7 +107,7 @@ where /// This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps /// have all been extended with an additional coordinate with the default value. The resulting collection does /// not vary with the new timestamp coordinate. - pub fn enter_at<'inner, TInner, F, P>(self, child: &Scope<'inner, TInner>, logic: F, prior: P) -> Arranged<'inner, TraceEnterAt> + pub fn enter_at<'inner, TInner, F, P>(self, child: Scope<'inner, TInner>, logic: F, prior: P) -> Arranged<'inner, TraceEnterAt> where TInner: Refines+Lattice+'static, F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static, @@ -314,7 +312,7 @@ where /// /// This method only applies to *regions*, which are subscopes with the same timestamp /// as their containing scope. In this case, the trace type does not need to change. - pub fn leave_region<'outer>(self, outer: &Scope<'outer, Tr::Time>) -> Arranged<'outer, Tr> { + pub fn leave_region<'outer>(self, outer: Scope<'outer, Tr::Time>) -> Arranged<'outer, Tr> { use timely::dataflow::operators::Leave; Arranged { stream: self.stream.leave(outer), @@ -383,7 +381,7 @@ where let stream = stream.unary_frontier(pact, name, move |_capability, info| { // Acquire a logger for arrange events. - let logger = scope.logger_for::("differential/arrange").map(Into::into); + let logger = scope.worker().logger_for::("differential/arrange").map(Into::into); // Where we will deposit received updates, and from which we extract batches. let mut batcher = Ba::new(logger.clone(), info.global_id); @@ -394,7 +392,7 @@ where let activator = Some(scope.activator_for(info.address.clone())); let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); // If there is default exertion logic set, install it. - if let Some(exert_logic) = scope.config().get::("differential/default_exert_logic").cloned() { + if let Some(exert_logic) = scope.worker().config().get::("differential/default_exert_logic").cloned() { empty_trace.set_exert_logic(exert_logic); } diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index 0c88eebd9..5e336fc3d 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -106,8 +106,6 @@ use timely::dataflow::operators::generic::Operator; use timely::dataflow::channels::pact::Exchange; use timely::progress::{Antichain, Timestamp}; use timely::dataflow::operators::Capability; -use timely::scheduling::Scheduler; -use timely::worker::AsWorker; use crate::operators::arrange::arrangement::Arranged; use crate::trace::{Builder, Description}; @@ -156,7 +154,7 @@ where stream.unary_frontier(exchange, name, move |_capability, info| { // Acquire a logger for arrange events. - let logger = scope.logger_for::("differential/arrange").map(Into::into); + let logger = scope.worker().logger_for::("differential/arrange").map(Into::into); // Tracks the lower envelope of times in `priority_queue`. let mut capabilities = Antichain::>::new(); @@ -164,7 +162,7 @@ where let activator = Some(scope.activator_for(info.address.clone())); let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); - if let Some(exert_logic) = scope.config().get::("differential/default_exert_logic").cloned() { + if let Some(exert_logic) = scope.worker().config().get::("differential/default_exert_logic").cloned() { empty_trace.set_exert_logic(exert_logic); } diff --git a/differential-dataflow/src/operators/iterate.rs b/differential-dataflow/src/operators/iterate.rs index 496f9f5c1..0beae88bf 100644 --- a/differential-dataflow/src/operators/iterate.rs +++ b/differential-dataflow/src/operators/iterate.rs @@ -92,9 +92,9 @@ impl<'scope, T: Timestamp + Lattice, D: Ord+Data+Debug, R: Abelian+'static> Iter // diffs produced; `result` is post-consolidation, and means fewer // records are yielded out of the loop. let (variable, collection) = Variable::new_from(self.enter(subgraph), Product::new(Default::default(), 1)); - let result = logic(subgraph.clone(), collection); + let result = logic(subgraph, collection); variable.set(result.clone()); - result.leave(&outer) + result.leave(outer) }) } } @@ -104,7 +104,7 @@ impl<'scope, T: Timestamp + Lattice, D: Ord+Data+Debug, R: Semigroup+'static> It where for<'inner> F: FnOnce(Iterative<'inner, T, u64>, VecCollection<'inner, Product, D, R>)->VecCollection<'inner, Product, D, R>, { - let outer = self.clone(); + let outer = self; self.scoped("Iterate", |subgraph| { // create a new variable, apply logic, bind variable, return. // @@ -113,9 +113,9 @@ impl<'scope, T: Timestamp + Lattice, D: Ord+Data+Debug, R: Semigroup+'static> It // diffs produced; `result` is post-consolidation, and means fewer // records are yielded out of the loop. let (variable, collection) = Variable::new(subgraph, Product::new(Default::default(), 1)); - let result = logic(subgraph.clone(), collection); + let result = logic(subgraph, collection); variable.set(result.clone()); - result.leave(&outer) + result.leave(outer) } ) } @@ -148,7 +148,7 @@ impl<'scope, T: Timestamp + Lattice, D: Ord+Data+Debug, R: Semigroup+'static> It /// let result = collection.map(|x| if x % 2 == 0 { x/2 } else { x }) /// .consolidate(); /// variable.set(result.clone()); -/// result.leave(&scope) +/// result.leave(scope) /// }); /// }) /// ``` @@ -218,7 +218,7 @@ where /// will produce its fixed point in the outer scope. /// /// In a non-iterative scope the mechanics are the same, but the interpretation varies. - pub fn new(scope: &mut Scope<'scope, T>, step: T::Summary) -> (Self, Collection<'scope, T, C>) { + pub fn new(scope: Scope<'scope, T>, step: T::Summary) -> (Self, Collection<'scope, T, C>) { let (feedback, updates) = scope.feedback(step.clone()); let collection = Collection::::new(updates); (Self { feedback, source: None, step }, collection) diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index b01e3bf89..00277ced3 100644 --- a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -13,7 +13,6 @@ use timely::dataflow::Stream; use timely::dataflow::operators::generic::{Operator, OutputBuilderSession, Session}; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; -use timely::scheduling::Scheduler; use crate::lattice::Lattice; use crate::operators::arrange::Arranged; diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index a695527a8..06fbddc99 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -11,8 +11,6 @@ use timely::progress::frontier::Antichain; use timely::progress::Timestamp; use timely::dataflow::operators::Operator; use timely::dataflow::channels::pact::Pipeline; -use timely::scheduling::Scheduler; -use timely::worker::AsWorker; use crate::operators::arrange::{Arranged, TraceAgent}; use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; @@ -51,12 +49,12 @@ where trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| { // Acquire a logger for arrange events. - let logger = scope.logger_for::("differential/arrange").map(Into::into); + let logger = scope.worker().logger_for::("differential/arrange").map(Into::into); let activator = Some(scope.activator_for(operator_info.address.clone())); let mut empty = Tr2::new(operator_info.clone(), logger.clone(), activator); // If there is default exert logic set, install it. - if let Some(exert_logic) = scope.config().get::("differential/default_exert_logic").cloned() { + if let Some(exert_logic) = scope.worker().config().get::("differential/default_exert_logic").cloned() { empty.set_exert_logic(exert_logic); } diff --git a/differential-dataflow/tests/bfs.rs b/differential-dataflow/tests/bfs.rs index 721f7709d..2362bf6af 100644 --- a/differential-dataflow/tests/bfs.rs +++ b/differential-dataflow/tests/bfs.rs @@ -211,8 +211,8 @@ where // repeatedly update minimal distances each node can be reached from each root nodes.clone().iterate(|scope, inner| { - let edges = edges.enter(&scope); - let nodes = nodes.enter(&scope); + let edges = edges.enter(scope); + let nodes = nodes.enter(scope); inner.join_map(edges, |_k,l,d| (*d, l+1)) .concat(nodes) diff --git a/differential-dataflow/tests/scc.rs b/differential-dataflow/tests/scc.rs index 2c0b35d92..c1e702ff7 100644 --- a/differential-dataflow/tests/scc.rs +++ b/differential-dataflow/tests/scc.rs @@ -219,7 +219,7 @@ where T: timely::progress::Timestamp + Lattice + Ord + Hash, { graph.clone().iterate(|scope, inner| { - let edges = graph.enter(&scope); + let edges = graph.enter(scope); let trans = edges.clone().map_in_place(|x| mem::swap(&mut x.0, &mut x.1)); _trim_edges(_trim_edges(inner, edges), trans) }) @@ -250,8 +250,8 @@ where edges.clone() // <-- wth is this. .filter(|_| false) .iterate(|scope, inner| { - let edges = edges.enter(&scope); - let nodes = nodes.enter_at(&scope, |r| 256 * (64 - (r.0 as u64).leading_zeros() as u64)); + let edges = edges.enter(scope); + let nodes = nodes.enter_at(scope, |r| 256 * (64 - (r.0 as u64).leading_zeros() as u64)); inner.join_map(edges, |_k,l,d| (*d,*l)) .concat(nodes) diff --git a/dogsdogsdogs/examples/delta_query.rs b/dogsdogsdogs/examples/delta_query.rs index 74d24b246..bf5388fce 100644 --- a/dogsdogsdogs/examples/delta_query.rs +++ b/dogsdogsdogs/examples/delta_query.rs @@ -89,7 +89,7 @@ fn main() { let changes3 = validate(changes3, reverse_self_alt.clone(), key2.clone()); let changes3 = changes3.map(|((a,c),b)| (a,b,c)); - let prev_changes = changes1.concat(changes2).concat(changes3).leave(&scope); + let prev_changes = changes1.concat(changes2).concat(changes3).leave(scope); // New ideas let d_edges = edges.differentiate(inner); @@ -115,7 +115,7 @@ fn main() { .join_core(forward_key_alt, |a,c,b| Some(((*c, *b), *a))) .join_core(reverse_self_alt, |(c,b), a, &()| Some((*a,*b,*c))); - let next_changes = changes1.concat(changes2).concat(changes3).integrate(&scope); + let next_changes = changes1.concat(changes2).concat(changes3).integrate(scope); (prev_changes, next_changes) }); diff --git a/dogsdogsdogs/examples/delta_query_wcoj.rs b/dogsdogsdogs/examples/delta_query_wcoj.rs index 760c0afc1..cba34a71f 100644 --- a/dogsdogsdogs/examples/delta_query_wcoj.rs +++ b/dogsdogsdogs/examples/delta_query_wcoj.rs @@ -79,7 +79,7 @@ fn main() { ]) .map(|((a,c),b)| (a,b,c)); - changes1.concat(changes2).concat(changes3).leave(&scope) + changes1.concat(changes2).concat(changes3).leave(scope) }); triangles diff --git a/dogsdogsdogs/src/calculus.rs b/dogsdogsdogs/src/calculus.rs index a9bfe1589..f7697e1e6 100644 --- a/dogsdogsdogs/src/calculus.rs +++ b/dogsdogsdogs/src/calculus.rs @@ -22,12 +22,12 @@ use crate::altneu::AltNeu; /// Produce a collection containing the changes at the moments they happen. pub trait Differentiate<'scope, T: Timestamp, D: Data, R: Abelian> { - fn differentiate<'inner>(self, child: &Scope<'inner, AltNeu>) -> VecCollection<'inner, AltNeu, D, R>; + fn differentiate<'inner>(self, child: Scope<'inner, AltNeu>) -> VecCollection<'inner, AltNeu, D, R>; } /// Collect instantaneous changes back in to a collection. pub trait Integrate<'scope, T: Timestamp, D: Data, R: Abelian> { - fn integrate<'outer>(self, outer: &Scope<'outer, T>) -> VecCollection<'outer, T, D, R>; + fn integrate<'outer>(self, outer: Scope<'outer, T>) -> VecCollection<'outer, T, D, R>; } impl<'scope, T, D, R> Differentiate<'scope, T, D, R> for VecCollection<'scope, T, D, R> @@ -37,7 +37,7 @@ where R: Abelian + 'static, { // For each (data, Alt(time), diff) we add a (data, Neu(time), -diff). - fn differentiate<'inner>(self, child: &Scope<'inner, AltNeu>) -> VecCollection<'inner, AltNeu, D, R> { + fn differentiate<'inner>(self, child: Scope<'inner, AltNeu>) -> VecCollection<'inner, AltNeu, D, R> { self.enter(child) .inner .flat_map(|(data, time, diff)| { @@ -58,7 +58,7 @@ where R: Abelian + 'static, { // We discard each `neu` variant and strip off the `alt` wrapper. - fn integrate<'outer>(self, outer: &Scope<'outer, T>) -> VecCollection<'outer, T, D, R> { + fn integrate<'outer>(self, outer: Scope<'outer, T>) -> VecCollection<'outer, T, D, R> { self.inner .filter(|(_d,t,_r)| !t.neu) .as_collection() diff --git a/dogsdogsdogs/src/lib.rs b/dogsdogsdogs/src/lib.rs index 91549a6f3..ce484e3b7 100644 --- a/dogsdogsdogs/src/lib.rs +++ b/dogsdogsdogs/src/lib.rs @@ -192,17 +192,17 @@ where type Extension = V; fn count(&mut self, prefixes: VecCollection<'scope, T, (P, usize, usize), R>, index: usize) -> VecCollection<'scope, T, (P, usize, usize), R> { - let counts = self.indices.count_trace.import(&prefixes.scope()); + let counts = self.indices.count_trace.import(prefixes.scope()); operators::count::count(prefixes, counts, self.key_selector.clone(), index) } fn propose(&mut self, prefixes: VecCollection<'scope, T, P, R>) -> VecCollection<'scope, T, (P, V), R> { - let propose = self.indices.propose_trace.import(&prefixes.scope()); + let propose = self.indices.propose_trace.import(prefixes.scope()); operators::propose::propose(prefixes, propose, self.key_selector.clone()) } fn validate(&mut self, extensions: VecCollection<'scope, T, (P, V), R>) -> VecCollection<'scope, T, (P, V), R> { - let validate = self.indices.validate_trace.import(&extensions.scope()); + let validate = self.indices.validate_trace.import(extensions.scope()); operators::validate::validate(extensions, validate, self.key_selector.clone()) } } diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index e095d95a8..36836357c 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -38,7 +38,6 @@ use std::time::Instant; use timely::ContainerBuilder; use timely::container::CapacityContainerBuilder; use timely::dataflow::Stream; -use timely::scheduling::Scheduler; use timely::dataflow::channels::pact::{Pipeline, Exchange}; use timely::dataflow::operators::{Capability, Operator, generic::Session}; use timely::PartialOrder; diff --git a/dogsdogsdogs/src/operators/half_join2.rs b/dogsdogsdogs/src/operators/half_join2.rs index 2cc963f76..2c61b8e3f 100644 --- a/dogsdogsdogs/src/operators/half_join2.rs +++ b/dogsdogsdogs/src/operators/half_join2.rs @@ -23,7 +23,6 @@ use std::ops::Mul; use timely::ContainerBuilder; use timely::container::CapacityContainerBuilder; use timely::dataflow::Stream; -use timely::scheduling::Scheduler; use timely::dataflow::channels::pact::{Pipeline, Exchange}; use timely::dataflow::operators::Operator; use timely::PartialOrder; diff --git a/experiments/src/bin/attend.rs b/experiments/src/bin/attend.rs index 60e17a3f0..a78f8f276 100644 --- a/experiments/src/bin/attend.rs +++ b/experiments/src/bin/attend.rs @@ -24,11 +24,11 @@ fn main() { organizers .clone() .iterate(|scope, attend| { - graph2.enter(&scope) + graph2.enter(scope) .semijoin(attend) .map(|(_,y)| y) .threshold_total(|_,w| if w >= &3 { 1 } else { 0 }) - .concat(organizers.enter(&scope)) + .concat(organizers.enter(scope)) .consolidate() }) .map(|_| ()) diff --git a/experiments/src/bin/deals-interactive.rs b/experiments/src/bin/deals-interactive.rs index c3d2a524a..234152d59 100644 --- a/experiments/src/bin/deals-interactive.rs +++ b/experiments/src/bin/deals-interactive.rs @@ -217,9 +217,9 @@ fn interactive<'s, G: timely::progress::Timestamp + Lattice>( tc_1.map(|x| (x,x)) .iterate(|scope, inner| edges_q1 - .enter(&scope) + .enter(scope) .join_core(inner.arrange_by_key(), |_,&y,&q| [(y,q)]) - .concat(tc_1_enter.enter(&scope).map(|x| (x,x))) + .concat(tc_1_enter.enter(scope).map(|x| (x,x))) .distinct() ) .map(|(x,q)| (q,x)); @@ -232,9 +232,9 @@ fn interactive<'s, G: timely::progress::Timestamp + Lattice>( .iterate(|scope, inner| edges_q2 .as_collection(|&k,&v| (v,k)) - .enter(&scope) + .enter(scope) .join_core(inner.arrange_by_key(), |_,&y,&q| [(y,q)]) - .concat(tc_2_enter.enter(&scope).map(|x| (x,x))) + .concat(tc_2_enter.enter(scope).map(|x| (x,x))) .distinct() ) .map(|(x,q)| (q,x)); @@ -251,10 +251,10 @@ fn interactive<'s, G: timely::progress::Timestamp + Lattice>( sg_x.iterate(|scope, inner| edges_magic .as_collection(|&k,&v| (v,k)) - .enter(&scope) + .enter(scope) .semijoin(inner) .map(|(_x,y)| y) - .concat(sg_x_enter.enter(&scope)) + .concat(sg_x_enter.enter(scope)) .distinct() ); @@ -271,9 +271,9 @@ fn interactive<'s, G: timely::progress::Timestamp + Lattice>( .map(|x| (x,x)) // for query q, sg(x,x) .iterate(|scope, inner| { - let edges = edges.enter(&scope); - let magic = magic_enter.enter(&scope); - let magic_edges = magic_edges.enter(&scope); + let edges = edges.enter(scope); + let magic = magic_enter.enter(scope); + let magic_edges = magic_edges.enter(scope); let result = inner diff --git a/experiments/src/bin/deals.rs b/experiments/src/bin/deals.rs index 966cc4840..f20e3678f 100644 --- a/experiments/src/bin/deals.rs +++ b/experiments/src/bin/deals.rs @@ -102,7 +102,7 @@ fn tc<'s, T: timely::progress::Timestamp + Lattice + Default + timely::order::Em ; inner.set(result.clone()); - result.leave(&outer) + result.leave(outer) } ) } @@ -132,7 +132,7 @@ fn sg<'s, T: timely::progress::Timestamp + Lattice + Default + timely::order::Em ; inner.set(result.clone()); - result.leave(&outer) + result.leave(outer) } ) } diff --git a/experiments/src/bin/graphs-interactive-alt.rs b/experiments/src/bin/graphs-interactive-alt.rs index 032f5d5ec..838836d5e 100644 --- a/experiments/src/bin/graphs-interactive-alt.rs +++ b/experiments/src/bin/graphs-interactive-alt.rs @@ -357,7 +357,7 @@ fn _bidijkstra<'s, T: Timestamp + Lattice + Ord>( reverse.set(reverse_next); - reached.leave(&outer) + reached.leave(outer) }) } @@ -380,8 +380,8 @@ where T: Lattice + std::hash::Hash .filter(|_| false) .iterate(|scope, inner| { - let graph = graph.enter(&scope); - let nodes = nodes.enter_at(&scope, |r| 256 * (64 - r.1.leading_zeros() as u64)); + let graph = graph.enter(scope); + let nodes = nodes.enter_at(scope, |r| 256 * (64 - r.1.leading_zeros() as u64)); let inner = inner.arrange_by_key(); diff --git a/experiments/src/bin/graphs-interactive-neu.rs b/experiments/src/bin/graphs-interactive-neu.rs index c12e92845..e1f26ad29 100644 --- a/experiments/src/bin/graphs-interactive-neu.rs +++ b/experiments/src/bin/graphs-interactive-neu.rs @@ -390,6 +390,6 @@ fn _bidijkstra<'s, T: Timestamp + Lattice + Ord>( reverse.set(reverse_next); - reached.leave(&outer) + reached.leave(outer) }) } diff --git a/experiments/src/bin/graphs-interactive.rs b/experiments/src/bin/graphs-interactive.rs index d7914c66e..29a6ee6b8 100644 --- a/experiments/src/bin/graphs-interactive.rs +++ b/experiments/src/bin/graphs-interactive.rs @@ -294,6 +294,6 @@ fn _bidijkstra<'s, T: Timestamp + Lattice + Ord>( reverse.set(reverse_next); - reached.leave(&outer) + reached.leave(outer) }) } diff --git a/experiments/src/bin/graphs-static.rs b/experiments/src/bin/graphs-static.rs index 02de62956..fc6a5cb3d 100644 --- a/experiments/src/bin/graphs-static.rs +++ b/experiments/src/bin/graphs-static.rs @@ -109,7 +109,7 @@ fn reach<'s>( roots: VecCollection<'s, (), Node, Diff> ) -> VecCollection<'s, (), Node, Diff> { - let graph = graph.import(&roots.scope()); + let graph = graph.import(roots.scope()); let outer = roots.scope(); outer.iterative::(|scope| { @@ -125,7 +125,7 @@ fn reach<'s>( .threshold_total(|_,_| 1); inner.set(result.clone()); - result.leave(&outer) + result.leave(outer) }) } @@ -135,7 +135,7 @@ fn bfs<'s>( roots: VecCollection<'s, (), Node, Diff> ) -> VecCollection<'s, (), (Node, u32), Diff> { - let graph = graph.import(&roots.scope()); + let graph = graph.import(roots.scope()); let roots = roots.map(|r| (r,0)); let outer = roots.scope(); @@ -151,12 +151,12 @@ fn bfs<'s>( .reduce(|_key, input, output| output.push((*input[0].0,1))); inner.set(result.clone()); - result.leave(&outer) + result.leave(outer) }) } fn connected_components<'s>( - scope: &mut timely::dataflow::Scope<'s, ()>, + scope: timely::dataflow::Scope<'s, ()>, forward: &mut TraceHandle, reverse: &mut TraceHandle, ) -> VecCollection<'s, (), (Node, Node), Diff> { diff --git a/experiments/src/bin/graphs.rs b/experiments/src/bin/graphs.rs index c5294a37b..633bb986c 100644 --- a/experiments/src/bin/graphs.rs +++ b/experiments/src/bin/graphs.rs @@ -91,12 +91,12 @@ fn reach<'s>( roots: VecCollection<'s, (), Node> ) -> VecCollection<'s, (), Node> { - let graph = graph.import(&roots.scope()); + let graph = graph.import(roots.scope()); roots.clone().iterate(|scope, inner| { - let graph = graph.enter(&scope); - let roots = roots.enter(&scope); + let graph = graph.enter(scope); + let roots = roots.enter(scope); // let reach = inner.concat(roots).distinct_total().arrange_by_self(); // graph.join_core(reach, |_src,&dst,&()| Some(dst)) @@ -113,13 +113,13 @@ fn bfs<'s>( roots: VecCollection<'s, (), Node> ) -> VecCollection<'s, (), (Node, u32)> { - let graph = graph.import(&roots.scope()); + let graph = graph.import(roots.scope()); let roots = roots.map(|r| (r,0)); roots.clone().iterate(|scope, inner| { - let graph = graph.enter(&scope); - let roots = roots.enter(&scope); + let graph = graph.enter(scope); + let roots = roots.enter(scope); graph.join_core(inner.arrange_by_key(), |_src,&dest,&dist| [(dest, dist+1)]) .concat(roots) @@ -147,8 +147,8 @@ fn bfs<'s>( // // don't actually use these labels, just grab the type // nodes.filter(|_| false) // .iterate(|scope, inner| { -// let edges = edges.enter(&scope); -// let nodes = nodes.enter_at(&scope, |r| 256 * (64 - r.1.leading_zeros() as u64)); +// let edges = edges.enter(scope); +// let nodes = nodes.enter_at(scope, |r| 256 * (64 - r.1.leading_zeros() as u64)); // inner.join_map(edges, |_k,l,d| (*d,*l)) // .concat(nodes) diff --git a/experiments/src/bin/graspan-interactive.rs b/experiments/src/bin/graspan-interactive.rs index 198a7dfcb..93953bba7 100644 --- a/experiments/src/bin/graspan-interactive.rs +++ b/experiments/src/bin/graspan-interactive.rs @@ -30,8 +30,8 @@ fn main() { .filter(|_| false) .iterate(|scope, inner| { - let nodes = nodes2.enter(&scope); - let edges = edges.enter(&scope); + let nodes = nodes2.enter(scope); + let edges = edges.enter(scope); inner .map(|(a,b)| (b,a)) diff --git a/experiments/src/bin/graspan1.rs b/experiments/src/bin/graspan1.rs index 571b46f1b..ce04f2202 100644 --- a/experiments/src/bin/graspan1.rs +++ b/experiments/src/bin/graspan1.rs @@ -51,7 +51,7 @@ fn main() { .threshold_semigroup(|_,_,x: Option<&Present>| if x.is_none() { Some(Present) } else { None }); labels.set(next.clone()); - next.leave(&outer) + next.leave(outer) }); reached diff --git a/experiments/src/bin/graspan2.rs b/experiments/src/bin/graspan2.rs index f101ae246..fcd05ee65 100644 --- a/experiments/src/bin/graspan2.rs +++ b/experiments/src/bin/graspan2.rs @@ -107,7 +107,7 @@ fn unoptimized() { value_flow.set(value_flow_next.clone()); memory_alias.set(memory_alias_next.clone()); - (value_flow_next.leave(&scope), memory_alias_next.leave(&scope), value_alias_next.leave(&scope)) + (value_flow_next.leave(scope), memory_alias_next.leave(scope), value_alias_next.leave(scope)) }); value_flow.map(|_| ()).consolidate().inspect(|x| println!("VF: {:?}", x)); @@ -232,7 +232,7 @@ fn optimized() { value_flow.set(value_flow_next.clone()); memory_alias.set(memory_alias_next.clone()); - (value_flow_next.leave(&scope), memory_alias_next.leave(&scope)) + (value_flow_next.leave(scope), memory_alias_next.leave(scope)) }); value_flow.map(|_| ()).consolidate().inspect(|x| println!("VF: {:?}", x)); diff --git a/experiments/src/bin/multitemporal.rs b/experiments/src/bin/multitemporal.rs index eb602efe2..f926f5918 100644 --- a/experiments/src/bin/multitemporal.rs +++ b/experiments/src/bin/multitemporal.rs @@ -35,8 +35,8 @@ fn main() { roots.clone().iterate(|scope, inner| { - let edges = edges.enter(&scope); - let roots = roots.enter(&scope); + let edges = edges.enter(scope); + let roots = roots.enter(scope); edges .semijoin(inner) diff --git a/interactive/examples/ddir_col.rs b/interactive/examples/ddir_col.rs index 0d53fb609..6ece2eeae 100644 --- a/interactive/examples/ddir_col.rs +++ b/interactive/examples/ddir_col.rs @@ -131,7 +131,7 @@ mod render { } } - pub fn render_program<'scope>(program: &Program, scope: &mut Scope<'scope, ConcreteTime>, inputs: &[Col<'scope>]) -> HashMap> + pub fn render_program<'scope>(program: &Program, scope: Scope<'scope, ConcreteTime>, inputs: &[Col<'scope>]) -> HashMap> { let mut nodes: HashMap> = HashMap::new(); let mut level: usize = 0; diff --git a/interactive/examples/ddir_vec.rs b/interactive/examples/ddir_vec.rs index 941adc9cb..b3104f223 100644 --- a/interactive/examples/ddir_vec.rs +++ b/interactive/examples/ddir_vec.rs @@ -33,7 +33,7 @@ impl<'scope, T: timely::progress::Timestamp + differential_dataflow::lattice::La } -fn render_program<'scope>(program: &Program, scope: &mut Scope<'scope, DdirTime>, inputs: &[Col<'scope, DdirTime>]) -> HashMap> +fn render_program<'scope>(program: &Program, scope: Scope<'scope, DdirTime>, inputs: &[Col<'scope, DdirTime>]) -> HashMap> { let mut nodes: HashMap> = HashMap::new(); let mut level: usize = 0; diff --git a/mdbook/src/chapter_2/chapter_2_7.md b/mdbook/src/chapter_2/chapter_2_7.md index 18c58965a..80ca6ab8a 100644 --- a/mdbook/src/chapter_2/chapter_2_7.md +++ b/mdbook/src/chapter_2/chapter_2_7.md @@ -53,7 +53,7 @@ In the example above, we could rewrite .clone() .iterate(|scope, transitive| { - let manages = manages.enter(&scope); + let manages = manages.enter(scope); transitive .map(|(mk, m1)| (m1, mk)) @@ -99,7 +99,7 @@ As an example, the implementation of the `iterate` operator looks something like let (variable, collection) = VecVariable::new_from(collection.enter(subgraph), 1); let result = logic(collection); variable.set(result.clone()); - result.leave(&outer) + result.leave(outer) }); # } ``` diff --git a/mdbook/src/chapter_4/chapter_4_1.md b/mdbook/src/chapter_4/chapter_4_1.md index 268367b06..78de41853 100644 --- a/mdbook/src/chapter_4/chapter_4_1.md +++ b/mdbook/src/chapter_4/chapter_4_1.md @@ -15,8 +15,8 @@ Let's write this computation starting from a collection `edges`, using different labels .iterate(|scope, inner| { - let labels = labels.enter(&scope); - let edges = edges.enter(&scope); + let labels = labels.enter(scope); + let edges = edges.enter(scope); inner.join(edges) .map(|(_src,(lbl,dst))| (dst,lbl)) .concat(labels) diff --git a/mdbook/src/chapter_5/chapter_5_4.md b/mdbook/src/chapter_5/chapter_5_4.md index 35ebe9ecd..0f4c6bede 100644 --- a/mdbook/src/chapter_5/chapter_5_4.md +++ b/mdbook/src/chapter_5/chapter_5_4.md @@ -43,8 +43,8 @@ fn main() { // Reachability queries. query.clone().iterate(|scope, reach| { - let knows = knows.enter(&scope); - let query = query.enter(&scope); + let knows = knows.enter(scope); + let query = query.enter(scope); knows.join_core(reach.arrange_by_key(), |x,y,q| [(*y,*q)]) .concat(query) diff --git a/server/dataflows/random_graph/src/lib.rs b/server/dataflows/random_graph/src/lib.rs index cb527b25d..971646c04 100644 --- a/server/dataflows/random_graph/src/lib.rs +++ b/server/dataflows/random_graph/src/lib.rs @@ -3,7 +3,7 @@ use std::cell::RefCell; use rand::{Rng, SeedableRng, StdRng}; -use timely::scheduling::Scheduler; + use timely::dataflow::operators::Probe; use timely::dataflow::operators::generic::operator::source; use timely::progress::Antichain; diff --git a/server/dataflows/reachability/src/lib.rs b/server/dataflows/reachability/src/lib.rs index e5f46ad16..6ad5e9d16 100644 --- a/server/dataflows/reachability/src/lib.rs +++ b/server/dataflows/reachability/src/lib.rs @@ -20,8 +20,8 @@ pub fn build((dataflow, handles, probe, _timer, args): Environment) -> Result<() // repeatedly update minimal distances each node can be reached from each root roots.clone().iterate(|scope, dists| { - let edges = edges.enter(&scope); - let roots = roots.enter(&scope); + let edges = edges.enter(scope); + let roots = roots.enter(scope); dists.arrange_by_self() .join_core(edges, |_src, _, &dst| Some(dst)) .concat(roots) diff --git a/server/src/lib.rs b/server/src/lib.rs index b1a58c61f..cdf674b68 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -19,7 +19,7 @@ pub type TraceHandle = TraceAgent; /// Arguments provided to each shared library to help build their dataflows and register their results. pub type Environment<'a, 'b> = ( - &'a mut Scope<'b, usize>, + Scope<'b, usize>, &'a mut TraceHandler, &'a mut ProbeHandle, &'a Instant, diff --git a/tpchlike/src/bin/arrange.rs b/tpchlike/src/bin/arrange.rs index a96996bdc..c9ffcd48e 100644 --- a/tpchlike/src/bin/arrange.rs +++ b/tpchlike/src/bin/arrange.rs @@ -59,7 +59,7 @@ fn main() { // use timely::dataflow::ProbeHandle; - let mut context = Context::new(scope.clone(), collections); + let mut context = Context::new(scope, collections); context.index = arrange; diff --git a/tpchlike/src/bin/just-arrange.rs b/tpchlike/src/bin/just-arrange.rs index 60d82de74..ce78720ae 100644 --- a/tpchlike/src/bin/just-arrange.rs +++ b/tpchlike/src/bin/just-arrange.rs @@ -59,7 +59,7 @@ fn main() { // use timely::dataflow::ProbeHandle; - let mut context = Context::new(scope.clone(), collections); + let mut context = Context::new(scope, collections); context.index = arrange;