diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml new file mode 100644 index 000000000..babc088a8 --- /dev/null +++ b/.github/workflows/deploy.yml @@ -0,0 +1,18 @@ +name: deploy + +on: + push: + branches: + - master + +jobs: + deploy: + runs-on: ubuntu-22.04 + steps: + - uses: actions/checkout@v3 + - run: cargo install mdbook --version 0.4.31 + - run: cd mdbook && mdbook build + - uses: JamesIves/github-pages-deploy-action@v4 + with: + branch: gh-pages + folder: mdbook/book diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 000000000..b3cef797e --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,18 @@ +name: test + +on: [push, pull_request] + +jobs: + test: + runs-on: ubuntu-22.04 + steps: + - uses: actions/checkout@v3 + - run: rustup update 1.70 --no-self-update && rustup default 1.70 + - run: cargo build + - name: test mdBook + # rustdoc doesn't build dependencies, so it needs to run after `cargo build`, + # but its dependency search gets confused if there are multiple copies of any + # dependency in target/debug/deps, so it needs to run before `cargo test` et al. + # clutter target/debug/deps with multiple copies of things. + run: for file in $(find mdbook -name '*.md' | sort); do rustdoc --test $file -L ./target/debug/deps; done + - run: cargo test diff --git a/mdbook/src/chapter_0/chapter_0.md b/mdbook/src/chapter_0/chapter_0.md index 2d8e4ce3d..35d518657 100644 --- a/mdbook/src/chapter_0/chapter_0.md +++ b/mdbook/src/chapter_0/chapter_0.md @@ -2,9 +2,9 @@ Differential dataflow programs are structured as two easy steps: - 1. Write a program. - 2. Change its input. +1. Write a program. +2. Change its input. We will work through an example program, and then interact with it by changing its inputs. Our goal is foremost to show you what a program looks like, and to give you a sense for what interactions look like. 
-Once we've done this, in the next chapter we will jazz things up a bit with an increased scale of data, computation, and interaction! \ No newline at end of file +Once we've done this, in the next chapter we will jazz things up a bit with an increased scale of data, computation, and interaction! diff --git a/mdbook/src/chapter_0/chapter_0_0.md b/mdbook/src/chapter_0/chapter_0_0.md index ef45ef039..f72ef11d3 100644 --- a/mdbook/src/chapter_0/chapter_0_0.md +++ b/mdbook/src/chapter_0/chapter_0_0.md @@ -4,34 +4,39 @@ The first thing you will need to do, if you want to follow along with the exampl With Rust in hand, crack open a shell and make a new project using Rust build manager `cargo`. - Echidnatron% cargo new my_project +```shell +cargo new my_project +``` This should create a new folder called `my_project`, and you can wander in there and type - Echidnatron% cargo run +```shell +cargo run +``` This will do something reassuring but pointless, like print `Hello, world!`, because we haven't gotten differential dataflow involved yet. I mean, it's Rust and you could learn that, but you probably want to read a different web page in that case. Instead, edit your `Cargo.toml` file, which tells Rust about your dependencies, to look like this: - Echidnatron% cat Cargo.toml - [package] - name = "my_project" - version = "0.1.0" - authors = ["Your Name "] +```toml +[package] +name = "my_project" +version = "0.1.0" +authors = ["Your Name "] - [dependencies] - timely = "0.11.1" - differential-dataflow = "0.11.0" - Echidnatron% +[dependencies] +timely = "0.11.1" +differential-dataflow = "0.11.0" +``` You should only need to add those last two lines there, which bring in dependencies on both [timely dataflow](https://github.com/TimelyDataflow/timely-dataflow) and [differential dataflow](https://github.com/TimelyDataflow/differential-dataflow). We will be using both of those. 
If you would like to point at the most current code release, hosted on github, you can replace the dependencies with: - [dependencies] - timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } - differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow" } - +```toml +[dependencies] +timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" } +differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow" } +``` You should now be ready to go. Code examples should mostly work, and you should complain (or [file an issue](https://github.com/TimelyDataflow/differential-dataflow/issues)) if they do not! diff --git a/mdbook/src/chapter_0/chapter_0_1.md b/mdbook/src/chapter_0/chapter_0_1.md index 60b16e51a..5e69289a5 100644 --- a/mdbook/src/chapter_0/chapter_0_1.md +++ b/mdbook/src/chapter_0/chapter_0_1.md @@ -6,50 +6,50 @@ Let's write a program with one input: a collection `manages` of pairs `(manager, If you are following along at home, put this in your `src/main.rs` file. -```rust,no_run - extern crate timely; - extern crate differential_dataflow; +```rust +extern crate timely; +extern crate differential_dataflow; - use differential_dataflow::input::InputSession; - use differential_dataflow::operators::Join; +use differential_dataflow::input::InputSession; +use differential_dataflow::operators::Join; - fn main() { +fn main() { // define a new timely dataflow computation. timely::execute_from_args(std::env::args(), move |worker| { - // create an input collection of data. - let mut input = InputSession::new(); + // create an input collection of data. + let mut input = InputSession::new(); - // define a new computation. - worker.dataflow(|scope| { + // define a new computation. + worker.dataflow(|scope| { - // create a new collection from our input. - let manages = input.to_collection(scope); + // create a new collection from our input. 
+ let manages = input.to_collection(scope); - // if (m2, m1) and (m1, p), then output (m1, (m2, p)) - manages - .map(|(m2, m1)| (m1, m2)) - .join(&manages) - .inspect(|x| println!("{:?}", x)); - }); + // if (m2, m1) and (m1, p), then output (m1, (m2, p)) + manages + .map(|(m2, m1)| (m1, m2)) + .join(&manages) + .inspect(|x| println!("{:?}", x)); + }); - // Read a size for our organization from the arguments. - let size = std::env::args().nth(1).unwrap().parse().unwrap(); + // Set an arbitrary size for our organization. + let size = 100; - // Load input (a binary tree). - input.advance_to(0); - for person in 0 .. size { - input.insert((person/2, person)); - } + // Load input (a binary tree). + input.advance_to(0); + for person in 0 .. size { + input.insert((person/2, person)); + } - }).expect("Computation terminated abnormally"); - } + }).expect("Computation terminated abnormally"); +} ``` This program has a bit of boilerplate, but at its heart it defines a new input `manages` and then joins it with itself, once the fields have been re-ordered. 
The intent is as stated in the comment: -```rust,no_run +```rust // if (m2, m1) and (m1, p), then output (m1, (m2, p)) ``` @@ -57,24 +57,27 @@ We want to report each pair `(m2, p)`, and we happen to also produce as evidence When we execute this program we get to see the skip-level reports for the small binary tree we loaded as input: - Echidnatron% cargo run -- 10 - Running `target/debug/my_project` - ((0, (0, 0)), 0, 1) - ((0, (0, 1)), 0, 1) - ((1, (0, 2)), 0, 1) - ((1, (0, 3)), 0, 1) - ((2, (1, 4)), 0, 1) - ((2, (1, 5)), 0, 1) - ((3, (1, 6)), 0, 1) - ((3, (1, 7)), 0, 1) - ((4, (2, 8)), 0, 1) - ((4, (2, 9)), 0, 1) - Echidnatron% +```ignore +Echidnatron% cargo run -- 10 + Running `target/debug/my_project` + ((0, (0, 0)), 0, 1) + ((0, (0, 1)), 0, 1) + ((1, (0, 2)), 0, 1) + ((1, (0, 3)), 0, 1) + ((2, (1, 4)), 0, 1) + ((2, (1, 5)), 0, 1) + ((3, (1, 6)), 0, 1) + ((3, (1, 7)), 0, 1) + ((4, (2, 8)), 0, 1) + ((4, (2, 9)), 0, 1) +Echidnatron% +``` This is a bit crazy, but what we are seeing is many triples of the form - (data, time, diff) - +```ignore +(data, time, diff) +``` describing how the data have *changed*. That's right; our input is actually a *change* from the initially empty input. The output is showing us that at time `(Root, 0)` several tuples have had their frequency incremented by one. That is a fancy way of saying they are the output. -This may make more sense in just a moment, when we want to *change* the input. \ No newline at end of file +This may make more sense in just a moment, when we want to *change* the input. 
diff --git a/mdbook/src/chapter_0/chapter_0_2.md b/mdbook/src/chapter_0/chapter_0_2.md index 2fb18f8c2..a30bd6f01 100644 --- a/mdbook/src/chapter_0/chapter_0_2.md +++ b/mdbook/src/chapter_0/chapter_0_2.md @@ -6,7 +6,7 @@ Our organization has gone from one where each manager has at most two reports, t The only change we'll make is to add the following just after we load up our initial org chart: -```rust,no_run +```rust,ignore for person in 1 .. size { input.advance_to(person); input.remove((person/2, person)); @@ -16,7 +16,7 @@ The only change we'll make is to add the following just after we load up our ini This moves us through new times, indicated by the line -```rust,no_run +```rust,ignore input.advance_to(person); ``` @@ -24,7 +24,7 @@ which advances the state of the `input` collection up to a timestamp `person`, w Once we've advanced the time, we make some changes. -```rust,no_run +```rust,ignore input.remove((person/2, person)); input.insert((person/3, person)); ``` @@ -33,6 +33,7 @@ This removes the prior management relation, and introduces a new one where the p We do this for each of the non-boss employees and get to see a bunch of outputs. +```ignore Echidnatron% cargo run -- 10 Running `target/debug/my_project` ((0, (0, 0)), 0, 1) @@ -68,6 +69,7 @@ We do this for each of the non-boss employees and get to see a bunch of outputs. ((4, (2, 9)), 0, 1) ((4, (2, 9)), 4, -1) Echidnatron% +``` Gaaaaaaah! What in the !#$!? @@ -81,20 +83,24 @@ It turns out our input changes result in output changes. Let's try and break thi Let's look at the entries for time `4`. +```ignore ((1, (0, 4)), 4, 1) ((2, (0, 4)), 4, -1) ((4, (1, 8)), 4, 1) ((4, (1, 9)), 4, 1) ((4, (2, 8)), 4, -1) ((4, (2, 9)), 4, -1) +``` There is a bit going on here. Four's manager changed from two to one, and while their skip-level manager remained zero the explanation changed. The first two lines record this change. 
The next four lines record the change in the skip-level manager of four's reports, eight and nine. At the end, time `9`, things are a bit simpler because we have reached the employees with no reports, and so the only changes are their skip-level manager, without any implications for other people. +```ignore ((3, (1, 9)), 9, 1) ((4, (1, 9)), 9, -1) +``` Oof. Well, we probably *could* have figured these things out by hand, right? -Let's check out some ways this gets more interesting. \ No newline at end of file +Let's check out some ways this gets more interesting. diff --git a/mdbook/src/chapter_0/chapter_0_3.md b/mdbook/src/chapter_0/chapter_0_3.md index c8999e5fb..d47d6f395 100644 --- a/mdbook/src/chapter_0/chapter_0_3.md +++ b/mdbook/src/chapter_0/chapter_0_3.md @@ -12,17 +12,21 @@ We'll break down the results of our modified computation two ways, just loading First, if we just produce the collection of skip-level management (with the step two code from before): +``` ignore Echidnatron% time cargo run --release -- 10000000 cargo run --release --example hello 10000000 -w1 2.74s user 1.00s system 98% cpu 3.786 total Echidnatron% +``` Four seconds. We have no clue if this is a good or bad time. Second, if we produce the skip-level management and then modify it 10 million times (including the step two code from before): +``` ignore Echidnatron% time cargo run --release -- 10000000 cargo run --release --example hello 10000000 10.64s user 2.22s system 99% cpu 12.939 total Echidnatron% +``` About thirteen seconds now. Just over a microsecond per modification, though these are throughput rather than latency numbers. @@ -32,28 +36,31 @@ Differential dataflow works great using multiple threads. Produces the same outp For this to work out, we'll want to ask each worker to load up a fraction of the input. Each timely dataflow worker has methods `index()` and `peers()`, which indicate the workers number and out of how many total workers. 
We can load our input up like so: -```rust,no_run +```rust,ignore let mut person = worker.index(); while person < people { input.insert((person/2, person)); person += worker.peers(); } - ``` We can also make the same changes to the code that supplies the change, where each worker is responsible for those people whose number equals `worker.index()` modulo `worker.peers()`. I'm on a laptop with two cores. Let's load the data again, without modifying it, but let's use two worker threads (with the `-w2` argument) +``` ignore Echidnatron% time cargo run --release -- 10000000 -w2 cargo run --release --example hello 10000000 -w2 3.34s user 1.27s system 191% cpu 2.402 total Echidnatron% +``` Now let's try loading and doing ten million modifications, but with two worker threads. +``` ignore Echidnatron% time cargo run --release -- 10000000 -w2 cargo run --release --example hello 10000000 -w2 13.06s user 3.14s system 196% cpu 8.261 total Echidnatron% +``` Each of these improve on the single-threaded execution (they do more total work, because). Perhaps amazingly, they even improve the case where we need to do ten million *sequential* modifications. We get exactly the same answer, too. @@ -61,7 +68,7 @@ Each of these improve on the single-threaded execution (they do more total work, Instead of loading all of our changes and only waiting for the result, we can load each change and await its results before supplying the next change. This requires a bit of timely dataflow magic, where we add a probe to the end of our dataflow: -```rust,no_run +```rust,ignore // create a manager let probe = worker.dataflow(|scope| { @@ -79,7 +86,7 @@ Instead of loading all of our changes and only waiting for the result, we can lo We can then use this probe to limit the introduction of new data, by waiting for it to catch up with our input before we insert new data: -```rust,no_run +```rust,ignore // wait for data loading. 
input.advance_to(1); input.flush(); while probe.less_than(&input.time()) { worker.step(); } @@ -99,6 +106,7 @@ We can then use this probe to limit the introduction of new data, by waiting for This starts to print out a mess of data, indicating not only how long it takes to start up the computation, but also how long each individual round of updates takes. +``` ignore Echidnatron% cargo run --release --example hello 10000000 Finished release [optimized + debuginfo] target(s) in 0.06s Running `target/release/examples/hello 10000000` @@ -112,13 +120,16 @@ This starts to print out a mess of data, indicating not only how long it takes t 4.093208245s step 8 complete 4.093236460s step 9 complete 4.093281793s step 10 complete +``` which continues for quite a while. +``` ignore 21.689493445s step 397525 complete 21.689522815s step 397526 complete 21.689553410s step 397527 complete 21.689593500s step 397528 complete 21.689643055s step 397529 complete +``` -You can see that this is pretty prompt; the latencies are in the tens of microseconds, but also that the whole computation is clearly going to take a bit longer. This is because we've forced some work to finish before we start the next work, which we haven't done before. \ No newline at end of file +You can see that this is pretty prompt; the latencies are in the tens of microseconds, but also that the whole computation is clearly going to take a bit longer. This is because we've forced some work to finish before we start the next work, which we haven't done before. diff --git a/mdbook/src/chapter_1/chapter_1_0.md b/mdbook/src/chapter_1/chapter_1_0.md index 57767980d..bc5495183 100644 --- a/mdbook/src/chapter_1/chapter_1_0.md +++ b/mdbook/src/chapter_1/chapter_1_0.md @@ -2,7 +2,7 @@ Differential dataflow computations are really just [timely dataflow](https://github.com/frankmcsherry/timely-dataflow) computations where we supply a sweet set of operators and idioms for you. 
As such, when you build a new differential dataflow computation it will need to have a timely dataflow skeleton built first. For example: -```rust,ignore +```rust extern crate timely; extern crate differential_dataflow; @@ -26,4 +26,4 @@ fn main() { This is a pretty standard skeleton, where our program immediately starts up a timely dataflow instance by defining what each independent worker should do. A standard pattern, seen above, is to have each worker construct a dataflow and then drive the inputs around somehow. -We'll get more specific in just a moment. \ No newline at end of file +We'll get more specific in just a moment. diff --git a/mdbook/src/chapter_1/chapter_1_1.md b/mdbook/src/chapter_1/chapter_1_1.md index ed0e66448..6d1b589fb 100644 --- a/mdbook/src/chapter_1/chapter_1_1.md +++ b/mdbook/src/chapter_1/chapter_1_1.md @@ -8,7 +8,7 @@ For now, let's think of an input collection as a multiset (or a "bag"): a collec Let's take our skeleton from the previous subsection and add an input collection. -```rust,no_run +```rust extern crate timely; extern crate differential_dataflow; @@ -22,7 +22,7 @@ Let's take our skeleton from the previous subsection and add an input collection // create a counting differential dataflow. let mut input = worker.dataflow::(|scope| { // create inputs, build dataflow, return stuff. - let (input, words) = scope.new_collection(); + let (input, words) = scope.new_collection::(); words.inspect(|x| println!("seen: {:?}", x)); input }); @@ -37,13 +37,15 @@ Here we've created a new input collection in `scope`, which returns a pair `(inp This isn't a wildly interesting program yet, because we haven't actually changed `input`. Let's do that now, where the code currently says +``` // drive the input around here. +``` Differential dataflow inputs are similar to timely dataflow inputs, if you are familiar with those, but with a few important tweaks. Each input has a "time" it is currently set to. 
You can `insert(item)` and `remove(item)` to your heart's content, and these changes will take effect at the time currently associated with the input. For example, we could write: -```rust,no_run +```rust,ignore // drive the input around here. input.insert("hello".to_string()); input.insert("world".to_string()); @@ -67,4 +69,4 @@ It is crucial to call `input.advance_to(time)` if you want to see the output cha ## Flushing -The calls to `insert()`, `remove()`, and `advance_to()` are buffered in the interest of efficiency, and you may need to call `input.flush()` to ensure that every change you've applied to the input is visible to the system. This is unlike timely dataflow, which does not buffer its `advance_to` calls. \ No newline at end of file +The calls to `insert()`, `remove()`, and `advance_to()` are buffered in the interest of efficiency, and you may need to call `input.flush()` to ensure that every change you've applied to the input is visible to the system. This is unlike timely dataflow, which does not buffer its `advance_to` calls. diff --git a/mdbook/src/chapter_2/chapter_2_1.md b/mdbook/src/chapter_2/chapter_2_1.md index bb9c5d99f..6a3ac46bd 100644 --- a/mdbook/src/chapter_2/chapter_2_1.md +++ b/mdbook/src/chapter_2/chapter_2_1.md @@ -4,18 +4,37 @@ The `map` operator applies a supplied function to each element of a collection, As an example, our example program used `map` to reverse the pairs of identifiers in the `manages` collection, to place the second element first. 
-```rust,no_run +```rust +# extern crate timely; +# extern crate differential_dataflow; +# use timely::dataflow::Scope; +# use differential_dataflow::Collection; +# use differential_dataflow::lattice::Lattice; +# use differential_dataflow::operators::Join; +# fn example(manages: &Collection) +# where G::Timestamp: Lattice +# { manages .map(|(m2, m1)| (m1, m2)) .join(&manages) .inspect(|x| println!("{:?}", x)); +# } ``` If instead we had just written -```rust,no_run +```rust +# extern crate timely; +# extern crate differential_dataflow; +# use timely::dataflow::Scope; +# use differential_dataflow::Collection; +# use differential_dataflow::lattice::Lattice; +# fn example(manages: &Collection) +# where G::Timestamp: Lattice +# { manages .map(|(m2, m1)| m2); +# } ``` -we would have a collection containing each manager with a multiplicity equal to the number of individuals they manage. \ No newline at end of file +we would have a collection containing each manager with a multiplicity equal to the number of individuals they manage. diff --git a/mdbook/src/chapter_2/chapter_2_2.md b/mdbook/src/chapter_2/chapter_2_2.md index ba1b4b38f..014984279 100644 --- a/mdbook/src/chapter_2/chapter_2_2.md +++ b/mdbook/src/chapter_2/chapter_2_2.md @@ -4,9 +4,18 @@ The `filter` operator applies a supplied predicate to each element of a collecti As an example, we might select out those management relations where the manager has a greater employee id than the managee, by writing -```rust,no_run +```rust +# extern crate timely; +# extern crate differential_dataflow; +# use timely::dataflow::Scope; +# use differential_dataflow::Collection; +# use differential_dataflow::lattice::Lattice; +# fn example(manages: &Collection) +# where G::Timestamp: Lattice +# { manages .filter(|&(m2, m1)| m2 > m1); +# } ``` -Rust makes it very clear when a method is provided with data, or only the ability to look at the data. 
The filter operator is only allowed to look at the data, which is where the `&` glyph comes from. This allows us to be more efficient in execution, but it is a subtle concept that further Rust reading may illuminate. \ No newline at end of file +Rust makes it very clear when a method is provided with data, or only the ability to look at the data. The filter operator is only allowed to look at the data, which is where the `&` glyph comes from. This allows us to be more efficient in execution, but it is a subtle concept that further Rust reading may illuminate. diff --git a/mdbook/src/chapter_2/chapter_2_3.md b/mdbook/src/chapter_2/chapter_2_3.md index 0a7782b66..16f3c7679 100644 --- a/mdbook/src/chapter_2/chapter_2_3.md +++ b/mdbook/src/chapter_2/chapter_2_3.md @@ -4,17 +4,28 @@ The `concat` operator takes two collections whose element have the same type, an For example, we might form the symmetric "management relation" by concatenating the `manages` collection with the same collection with its fields flipped: -```rust,no_run +```rust +# extern crate timely; +# extern crate differential_dataflow; +# use timely::dataflow::Scope; +# use differential_dataflow::Collection; +# use differential_dataflow::lattice::Lattice; +# fn example(manages: &Collection) +# where G::Timestamp: Lattice +# { manages .map(|(m2, m1)| (m1, m2)) .concat(&manages); +# } ``` This collection likely has at most one copy of each record, unless perhaps any manager manages itself. In fact, zero manages itself, and the element `(0, 0)` would have count two. Importantly, `concat` doesn't do the hard work of ensuring that there is only one physical copy of each element. If we inspect the output of the `concat` above, we might see +```ignore ((0, 0), 0, 1) ((0, 0), 0, 1) +``` -Although these are two updates to the same element at the same time, `concat` is a bit lazy (read: efficient) and doesn't do the hard work until we ask it. For that, we'll need the `consolidate` operator. 
\ No newline at end of file +Although these are two updates to the same element at the same time, `concat` is a bit lazy (read: efficient) and doesn't do the hard work until we ask it. For that, we'll need the `consolidate` operator. diff --git a/mdbook/src/chapter_2/chapter_2_4.md b/mdbook/src/chapter_2/chapter_2_4.md index f61cebce6..b9149792e 100644 --- a/mdbook/src/chapter_2/chapter_2_4.md +++ b/mdbook/src/chapter_2/chapter_2_4.md @@ -6,30 +6,53 @@ What `consolidate` does do is ensure that each element at each time has at most As an example, if we were to inspect -```rust,no_run +```rust +# extern crate timely; +# extern crate differential_dataflow; +# use timely::dataflow::Scope; +# use differential_dataflow::Collection; +# use differential_dataflow::lattice::Lattice; +# use differential_dataflow::operators::Reduce; +# fn example(manages: &Collection) +# where G::Timestamp: Lattice +# { manages .map(|(m2, m1)| (m1, m2)) .concat(&manages) .inspect(|x| println!("{:?}", x)); +# } ``` we might see two copies of the same element: - ((0, 0), 0, 1) - ((0, 0), 0, 1) +```ignore +((0, 0), 0, 1) +((0, 0), 0, 1) +``` However, by introducing `consolidate` -```rust,no_run +```rust +# extern crate timely; +# extern crate differential_dataflow; +# use timely::dataflow::Scope; +# use differential_dataflow::Collection; +# use differential_dataflow::lattice::Lattice; +# fn example(manages: &Collection) +# where G::Timestamp: Lattice +# { manages .map(|(m2, m1)| (m1, m2)) .concat(&manages) .consolidate() .inspect(|x| println!("{:?}", x)); +# } ``` we are guaranteed to see at most one `(0,0)` update at each time: - ((0, 0), 0, 2) +```ignore +((0, 0), 0, 2) +``` -The `consolidate` operator is mostly useful before `inspect`ing data, but it can also be important for efficiency; knowing when to spend the additional computation to consolidate the representation of your data is an advanced topic! 
+The `consolidate` function is mostly useful before `inspect`ing data, but it can also be important for efficiency; knowing when to spend the additional computation to consolidate the representation of your data is an advanced topic! diff --git a/mdbook/src/chapter_2/chapter_2_5.md b/mdbook/src/chapter_2/chapter_2_5.md index 48a45ba79..85065e927 100644 --- a/mdbook/src/chapter_2/chapter_2_5.md +++ b/mdbook/src/chapter_2/chapter_2_5.md @@ -4,11 +4,21 @@ The `join` operator takes two input collections, each of which must have records Our example from earlier uses a join to match up pairs `(m2, m1)` and `(m1, p)` when the `m1` is in common. To do this, we first have to switch the records in the first collection around, so that they are keyed by `m1` instead of `m2`. -```rust,no_run +```rust +# extern crate timely; +# extern crate differential_dataflow; +# use timely::dataflow::Scope; +# use differential_dataflow::Collection; +# use differential_dataflow::operators::Join; +# use differential_dataflow::lattice::Lattice; +# fn example(manages: &Collection) +# where G::Timestamp: Lattice +# { manages .map(|(m2, m1)| (m1, m2)) .join(&manages) .inspect(|x| println!("{:?}", x)); +# } ``` The join operator multiplies frequencies, so if a record `(key, val1)` has multiplicity five, and a matching record `(key, val2)` has multiplicity three, the output result will be `(key, (val1, val2))` with multiplicity fifteen. 
diff --git a/mdbook/src/chapter_2/chapter_2_6.md b/mdbook/src/chapter_2/chapter_2_6.md index ec8fbb0e5..10ff885b1 100644 --- a/mdbook/src/chapter_2/chapter_2_6.md +++ b/mdbook/src/chapter_2/chapter_2_6.md @@ -4,7 +4,16 @@ The `reduce` operator takes an input collection whose records have a `(key, valu For example, to produce for each manager their managee with the lowest identifier, we might write -```rust,no_run +```rust +# extern crate timely; +# extern crate differential_dataflow; +# use timely::dataflow::Scope; +# use differential_dataflow::Collection; +# use differential_dataflow::lattice::Lattice; +# use differential_dataflow::operators::Reduce; +# fn example(manages: &Collection) +# where G::Timestamp: Lattice +# { manages .reduce(|_key, input, output| { let mut min_index = 0; @@ -19,19 +28,20 @@ For example, to produce for each manager their managee with the lowest identifie // Must produce outputs as `(Value, Count)`. output.push((*input[min_index].0, 1)); }); +# } ``` The reduce operator has some tricky Rust details about how it is expressed. The type of closure you must provide is technically -```rust,no_run - Fn(&Key, &[(&Val, Cnt)], &mut Vec<(Val2, Cnt2)>) +```ignore + Fn(&Key, &[(&Val, Cnt)], &mut Vec<(Val2, Cnt2)>) ``` which means a function of three arguments: - 1. A reference to the common key (`_key` above). - 2. A slice (list) of pairs of value references and counts. - 3. A mutable vector into which one can put pairs of values and counts. +1. A reference to the common key (`_key` above). +2. A slice (list) of pairs of value references and counts. +3. A mutable vector into which one can put pairs of values and counts. The method is structured this way so that you can efficiently observe and manipulate records with large multiplicities without actually walking through that number of records. 
For example, we can write a `count` operator much more efficiently with the count looking at us than if we had to traverse as many copies of each record as we were counting up. @@ -47,4 +57,4 @@ The `distinct` operator is another convenience operator, and it takes any input ### Threshold -More general than `distinct`, the `threshold` operator takes any function from one count to another count and yields the collection with counts correspondingly updated. This is used to implement the distinct operator, but also operators like "records with count at least three". \ No newline at end of file +More general than `distinct`, the `threshold` operator takes any function from one count to another count and yields the collection with counts correspondingly updated. This is used to implement the distinct operator, but also operators like "records with count at least three". diff --git a/mdbook/src/chapter_2/chapter_2_7.md b/mdbook/src/chapter_2/chapter_2_7.md index f593e1ff7..d079d7924 100644 --- a/mdbook/src/chapter_2/chapter_2_7.md +++ b/mdbook/src/chapter_2/chapter_2_7.md @@ -4,7 +4,16 @@ The `iterate` operator takes a starting input collection and a closure to repeat As an example, we can take our `manages` relation and determine for all employees all managers above them in the organizational chart. To do this, we start from the `manages` relation and write a closure that extends any transitive management pairs by "one hop" along the management relation, using a join operation. -```rust,no_run +```rust +# extern crate timely; +# extern crate differential_dataflow; +# use timely::dataflow::Scope; +# use differential_dataflow::Collection; +# use differential_dataflow::operators::{Join, Iterate, Threshold}; +# use differential_dataflow::lattice::Lattice; +# fn example(manages: &Collection) +# where G::Timestamp: Lattice +# { manages // transitive contains (manager, person) for many hops. 
.iterate(|transitive| { transitive @@ -14,6 +23,7 @@ As an example, we can take our `manages` relation and determine for all employee .concat(&transitive) .distinct() }); +# } ``` Although the first three lines of the closure may look like our skip-level management example, we have three more steps that are very important. @@ -30,11 +40,21 @@ The `enter` operator is a helpful method that brings collections outside a loop In the example above, we could rewrite -```rust,no_run +```rust +# extern crate timely; +# extern crate differential_dataflow; +# use timely::dataflow::Scope; +# use differential_dataflow::Collection; +# use differential_dataflow::operators::{Join, Threshold}; +# use differential_dataflow::operators::{Iterate, iterate::Variable}; +# use differential_dataflow::lattice::Lattice; +# fn example(manages: &Collection) +# where G::Timestamp: Lattice +# { manages // transitive contains (manager, person) for many hops. .iterate(|transitive| { - let manages = manages.enter(transitive.scope()); + let manages = manages.enter(&transitive.scope()); transitive .map(|(mk, m1)| (m1, mk)) @@ -43,6 +63,7 @@ In the example above, we could rewrite .concat(&manages) .distinct() }); +# } ``` This modified version extends `transitive` by one step along `manages`, rather than by a step along `transitive`. It also concatenates in `manages` rather than `transitive`. This modified version can perform better, as while it takes shorter steps, they are also more measured. 
@@ -59,11 +80,28 @@ Manual construction can be important when you have mutual recursion, perhaps amo As an example, the implementation of the `iterate` operator looks something like this: -```rust,no_run - collection.scope().scoped(|subgraph| { - let variable = Variable::from(collection.enter(subgraph)); +```rust +# extern crate timely; +# extern crate differential_dataflow; +# use timely::dataflow::Scope; +# use timely::dataflow::scopes::Child; +# use timely::progress::Antichain; +# use differential_dataflow::Collection; +# use differential_dataflow::operators::{Iterate, iterate::Variable}; +# use differential_dataflow::lattice::Lattice; +# fn logic<'a, G: Scope>(variable: &Variable<Child<'a, G, G::Timestamp>, (u64, u64), isize>) -> Collection<Child<'a, G, G::Timestamp>, (u64, u64)> +# where G::Timestamp: Lattice +# { +# (*variable).clone() +# } +# fn example<'a, G: Scope<Timestamp=u64>>(collection: &Collection<G, (u64, u64)>) //, logic: impl Fn(&Variable<Child<'a, G, G::Timestamp>, (u64, u64), isize>) -> Collection<Child<'a, G, G::Timestamp>, (u64, u64)>) +# where G::Timestamp: Lattice +# { + collection.scope().scoped("inner", |subgraph| { + let variable = Variable::new_from(collection.enter(subgraph), 1); let result = logic(&variable); variable.set(&result); result.leave() - }) -``` \ No newline at end of file + }); +# } +``` diff --git a/mdbook/src/chapter_3/chapter_3.md b/mdbook/src/chapter_3/chapter_3.md index 7e4aad342..2b36388d1 100644 --- a/mdbook/src/chapter_3/chapter_3.md +++ b/mdbook/src/chapter_3/chapter_3.md @@ -4,7 +4,7 @@ Once a computation is written, we have only to interact with it. At its heart, t Our goal is to go in-order through each of the elements of the code from our interactive example. -```rust,no_run +```rust,ignore // make changes, but await completion. let mut person = index; while person < people { @@ -19,4 +19,4 @@ Our goal is to go in-order through each of the elements of the code from our int } ``` -Each of these parts, more or less, do something interesting and important. There is also some flexibility in how they are used, which we will also try to highlight. 
\ No newline at end of file +Each of these parts, more or less, do something interesting and important. There is also some flexibility in how they are used, which we will also try to highlight. diff --git a/mdbook/src/chapter_3/chapter_3_1.md b/mdbook/src/chapter_3/chapter_3_1.md index 017626beb..469ad8fdc 100644 --- a/mdbook/src/chapter_3/chapter_3_1.md +++ b/mdbook/src/chapter_3/chapter_3_1.md @@ -2,7 +2,7 @@ We've seen already one example of creating a differential dataflow input in our management example. -```rust,no_run +```rust,ignore // create an input collection of data. let mut input = InputSession::new(); @@ -23,7 +23,7 @@ You can also create input sessions from the `new_collection` and `new_collection For example, above we could have written the above as: -```rust,no_run +```rust,ignore // define a new computation. let mut input = worker.dataflow(|scope| { @@ -42,4 +42,4 @@ Notice that we need to return the input from the closure, and bind it as the res Any timely dataflow stream of the correct record type, specifically `(data, time, diff)`, can be re-interpreted as a differential dataflow collection using the `AsCollection` trait, which provides a method `as_collection()`. -This operator is helpful in the implementation of differential dataflow operators, when you need to dive in to timely dataflow specializations, and when you need to interoperate with timely dataflow computations. Perhaps you bring your data in from Kafka using timely dataflow; you must change it from a timely dataflow stream to a differential dataflow collection. \ No newline at end of file +This operator is helpful in the implementation of differential dataflow operators, when you need to dive in to timely dataflow specializations, and when you need to interoperate with timely dataflow computations. Perhaps you bring your data in from Kafka using timely dataflow; you must change it from a timely dataflow stream to a differential dataflow collection. 
diff --git a/mdbook/src/chapter_3/chapter_3_2.md b/mdbook/src/chapter_3/chapter_3_2.md index f71fcef36..e61f66664 100644 --- a/mdbook/src/chapter_3/chapter_3_2.md +++ b/mdbook/src/chapter_3/chapter_3_2.md @@ -6,7 +6,7 @@ Dataflow computations differ from imperative computations in that you do not *fo For example, recall our example of interacting with our management computation, where we wrote -```rust,no_run +```rust,ignore // create a manager let probe = worker.dataflow(|scope| { @@ -23,7 +23,7 @@ For example, recall our example of interacting with our management computation, The returned probe allows us to ask whether the computation has stabilized to the point that there will be no more changes at certain query timestamps. We used the probe later on, when we wrote -```rust,no_run +```rust,ignore while probe.less_than(&input.time()) { worker.step(); } ``` diff --git a/mdbook/src/chapter_3/chapter_3_5.md b/mdbook/src/chapter_3/chapter_3_5.md index 40372ad11..c982a6b08 100644 --- a/mdbook/src/chapter_3/chapter_3_5.md +++ b/mdbook/src/chapter_3/chapter_3_5.md @@ -2,7 +2,7 @@ All of the differential dataflow computation happens in what seems like a fairly small and unobtrusive operation: -```rust,no_run +```rust,ignore worker.step(); ``` @@ -12,4 +12,4 @@ At the end of your differential dataflow computation, when we exit the closure s For example, our first example computations didn't call `worker.step()` explicitly, but just exited once it supplied the input changes. Exiting causes all of the work to happen (and complete, as the inputs are automatically closed as they are dropped). -Explicit calls to `worker.step()` are important when we are maintaining interactive access to probes, and do not want to simply complete the computation. \ No newline at end of file +Explicit calls to `worker.step()` are important when we are maintaining interactive access to probes, and do not want to simply complete the computation. 
diff --git a/mdbook/src/chapter_4/chapter_4_1.md b/mdbook/src/chapter_4/chapter_4_1.md index fadce2b49..d6726b4b1 100644 --- a/mdbook/src/chapter_4/chapter_4_1.md +++ b/mdbook/src/chapter_4/chapter_4_1.md @@ -8,7 +8,7 @@ One algorithm for this graph connectivity is "label propagation", in which each Let's write this computation starting from a collection `edges`, using differential dataflow. -```rust,no_run +```rust,ignore // create initial labels from sources. let labels = edges.map(|(src,dst)| (src,src)) .distinct(); @@ -35,4 +35,4 @@ This computation first determines some initial labels, taking as the set of node The computation then iteratively develops the label collection, by joining whatever labels we have at any point with the set of edges, effectively "proposing" each node's label to each of the node's neighbors. All nodes fold in these proposals with their initial labels, and each retains one copy of the smallest label they are provided as input. -The resulting collection contains pairs `(node, label)` from which we can determine if two nodes are in the same connected component: do they have the same label? Let's see how to use this interactively next! \ No newline at end of file +The resulting collection contains pairs `(node, label)` from which we can determine if two nodes are in the same connected component: do they have the same label? Let's see how to use this interactively next! diff --git a/mdbook/src/chapter_4/chapter_4_2.md b/mdbook/src/chapter_4/chapter_4_2.md index 1f6e77378..e8735eca9 100644 --- a/mdbook/src/chapter_4/chapter_4_2.md +++ b/mdbook/src/chapter_4/chapter_4_2.md @@ -6,7 +6,7 @@ Instead, let's describe an extended computation that lets us query the results, Imagine `labels` contains the results of the iterative computation from before. Let's create a new input, `queries`, which will simply contain node identifiers. 
-```rust,no_run +```rust,ignore labels.semijoin(queries) .inspect(|x| println!("{:?}", x)); ``` diff --git a/mdbook/src/chapter_4/chapter_4_3.md b/mdbook/src/chapter_4/chapter_4_3.md index 556177565..1865b2ab4 100644 --- a/mdbook/src/chapter_4/chapter_4_3.md +++ b/mdbook/src/chapter_4/chapter_4_3.md @@ -4,7 +4,7 @@ Our examples so far have involved careful manipulation of the input, making chan Imagine an external data source that we can poll for changes, and when polled responds with all outstanding changes and the logical times at which each occurred. There is a fairly natural pattern we can write that exposes these changes to differential dataflow and asks it to resolve all changes concurrently, while retaining the logical times of each of the input changes. -```rust,no_run +```rust,ignore while !source.done() { // fetch a bounded amount of input changes. for (data, time, diff) in source.fetch() { diff --git a/mdbook/src/chapter_5/chapter_5_2.md b/mdbook/src/chapter_5/chapter_5_2.md index e79b23315..5704d399e 100644 --- a/mdbook/src/chapter_5/chapter_5_2.md +++ b/mdbook/src/chapter_5/chapter_5_2.md @@ -30,15 +30,14 @@ fn main() { let knows = knows.arrange_by_key(); // Same logic as before, with a new method name. - query.join_core(&knows, |x,q,y| Some((*y,(*x,*q)))) - .join_core(&knows, |y,(x,q),z| Some((*q,(*x,*y,*z)))) - .inspect(|result| println!("result {:?}", result)); - + query.join_core(&knows, |x, q, y| Some((*y, (*x, *q)))) + .join_core(&knows, |y, (x, q), z| Some((*q, (*x, *y, *z)))) + .inspect(|result| println!("result {:?}", result)); }); -# // to help with type inference ... -# knows.update_at((0,0), 0usize, 1isize); -# query.update_at((0,0), 0usize, 1isize); + # // to help with type inference ... 
+ # knows.update_at((0, 0), 0usize, 1isize); + # query.update_at((0, 0), 0usize, 1isize); }); } ``` @@ -108,7 +107,7 @@ You may need to return from an arrangement to a collection (a stream of updates) extern crate timely; extern crate differential_dataflow; -use differential_dataflow::operators::JoinCore; +use differential_dataflow::operators::{Join, JoinCore}; use differential_dataflow::operators::arrange::ArrangeByKey; fn main() { @@ -131,15 +130,15 @@ fn main() { let knows = knows.as_collection(|k,v| (*k,*v)); // Same logic as before, with a new method name. - query.join_map(&knows, |x,q,y| (*y,(*x,*q)))) + query.join_map(&knows, |x,q,y| (*y,(*x,*q))) .join_map(&knows, |y,(x,q),z| (*q,(*x,*y,*z))) .inspect(|result| println!("result {:?}", result)); }); -# // to help with type inference ... -# knows.update_at((0,0), 0usize, 1isize); -# query.update_at((0,0), 0usize, 1isize); + # // to help with type inference ... + # knows.update_at((0,0), 0usize, 1isize); + # query.update_at((0,0), 0usize, 1isize); }); } ``` diff --git a/mdbook/src/chapter_5/chapter_5_3.md b/mdbook/src/chapter_5/chapter_5_3.md index cbd4e8c70..a3d6f4587 100644 --- a/mdbook/src/chapter_5/chapter_5_3.md +++ b/mdbook/src/chapter_5/chapter_5_3.md @@ -31,9 +31,8 @@ fn main() { }); -# // to help with type inference ... -# knows.update_at((0,0), 0usize, 1isize); -# query.update_at((0,0), 0usize, 1isize); + # // to help with type inference ... + # knows.update_at((0,0), 0usize, 1isize); }); } ```