Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 95 additions & 85 deletions src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,28 @@
//! Often we find ourselves with collections of records with associated weights (often
//! integers) where we want to reduce the collection to the point that each record occurs
//! at most once, with the accumulated weights. These methods supply that functionality.
//!
//! Importantly, these methods are used internally by differential dataflow, but are made
//! public for the convenience of others. Their precise behavior is driven by the needs of
//! differential dataflow (chiefly: canonicalizing sequences of non-zero updates); should
//! you need specific behavior, it may be best to defensively copy, paste, and maintain the
//! specific behavior you require.

use crate::difference::Semigroup;

/// Sorts and consolidates `vec`.
///
/// This method will sort `vec` and then consolidate runs of more than one entry with
/// identical first elements by accumulating the second elements of the pairs. Should the final
/// accumulation be zero, the element is discarded.
pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
consolidate_from(vec, 0);
}

/// Sorts and consolidate `vec[offset..]`.
///
/// This method will sort `vec[offset..]` and then consolidate runs with the same first
/// element of a pair by accumulating the second elements of the pairs. Should the final
/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
/// identical first elements by accumulating the second elements of the pairs. Should the final
/// accumulation be zero, the element is discarded.
pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
let length = consolidate_slice(&mut vec[offset..]);
Expand All @@ -26,62 +36,60 @@ pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {

// We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
// In a world where there are not many results, we may never even need to call in to merge sort.
if slice.len() > 1 {

slice.sort_by(|x,y| x.0.cmp(&y.0));

let mut offset = 0;
for index in 1 .. slice.len() {

// The following unsafe block elides various bounds checks, using the reasoning that `offset`
// is always strictly less than `index` at the beginning of each iteration. This is initially
// true, and in each iteration `offset` can increase by at most one (whereas `index` always
// increases by one). As `index` is always in bounds, and `offset` starts at zero, it too is
// always in bounds.
//
// LLVM appears to struggle to optimize out Rust's split_at_mut, which would prove disjointness
// using run-time tests.
unsafe {

assert!(offset < index);

// LOOP INVARIANT: offset < index
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
let ptr2 = slice.as_mut_ptr().offset(index as isize);

if (*ptr1).0 == (*ptr2).0 {
(*ptr1).1 += &(*ptr2).1;
}
else {
if !(*ptr1).1.is_zero() {
offset += 1;
}
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
std::mem::swap(&mut *ptr1, &mut *ptr2);
slice.sort_by(|x,y| x.0.cmp(&y.0));

// Counts the number of distinct known-non-zero accumulations. Indexes the write location.
let mut offset = 0;
for index in 1 .. slice.len() {

// The following unsafe block elides various bounds checks, using the reasoning that `offset`
// is always strictly less than `index` at the beginning of each iteration. This is initially
// true, and in each iteration `offset` can increase by at most one (whereas `index` always
// increases by one). As `index` is always in bounds, and `offset` starts at zero, it too is
// always in bounds.
//
// LLVM appears to struggle to optimize out Rust's split_at_mut, which would prove disjointness
// using run-time tests.
unsafe {

assert!(offset < index);

// LOOP INVARIANT: offset < index
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
let ptr2 = slice.as_mut_ptr().offset(index as isize);

if (*ptr1).0 == (*ptr2).0 {
(*ptr1).1 += &(*ptr2).1;
}
else {
if !(*ptr1).1.is_zero() {
offset += 1;
}
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
std::mem::swap(&mut *ptr1, &mut *ptr2);
}
}
if !slice[offset].1.is_zero() {
offset += 1;
}

offset
} else if slice.len() == 1 && !slice[0].1.is_zero() {
1
} else {
0
}
if offset < slice.len() && !slice[offset].1.is_zero() {
offset += 1;
}

offset
}

/// Sorts and consolidates `vec`.
///
/// This method will sort `vec` and then consolidate runs of more than one entry with
/// identical first two elements by accumulating the third elements of the triples. Should the final
/// accumulation be zero, the element is discarded.
pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
consolidate_updates_from(vec, 0);
}

/// Sorts and consolidate `vec[offset..]`.
///
/// This method will sort `vec[offset..]` and then consolidate runs with the same first
/// element of a pair by accumulating the second elements of the pairs. Should the final
/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
/// identical first two elements by accumulating the third elements of the triples. Should the final
/// accumulation be zero, the element is discarded.
pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
let length = consolidate_updates_slice(&mut vec[offset..]);
Expand All @@ -93,50 +101,44 @@ pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D,

// We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
// In a world where there are not many results, we may never even need to call in to merge sort.
if slice.len() > 1 {

slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));

let mut offset = 0;
for index in 1 .. slice.len() {

// The following unsafe block elides various bounds checks, using the reasoning that `offset`
// is always strictly less than `index` at the beginning of each iteration. This is initially
// true, and in each iteration `offset` can increase by at most one (whereas `index` always
// increases by one). As `index` is always in bounds, and `offset` starts at zero, it too is
// always in bounds.
//
// LLVM appears to struggle to optimize out Rust's split_at_mut, which would prove disjointness
// using run-time tests.
unsafe {

// LOOP INVARIANT: offset < index
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
let ptr2 = slice.as_mut_ptr().offset(index as isize);

if (*ptr1).0 == (*ptr2).0 && (*ptr1).1 == (*ptr2).1 {
(*ptr1).2 += &(*ptr2).2;
}
else {
if !(*ptr1).2.is_zero() {
offset += 1;
}
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
std::mem::swap(&mut *ptr1, &mut *ptr2);
slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));

// Counts the number of distinct known-non-zero accumulations. Indexes the write location.
let mut offset = 0;
for index in 1 .. slice.len() {

// The following unsafe block elides various bounds checks, using the reasoning that `offset`
// is always strictly less than `index` at the beginning of each iteration. This is initially
// true, and in each iteration `offset` can increase by at most one (whereas `index` always
// increases by one). As `index` is always in bounds, and `offset` starts at zero, it too is
// always in bounds.
//
// LLVM appears to struggle to optimize out Rust's split_at_mut, which would prove disjointness
// using run-time tests.
unsafe {

// LOOP INVARIANT: offset < index
let ptr1 = slice.as_mut_ptr().offset(offset as isize);
let ptr2 = slice.as_mut_ptr().offset(index as isize);

if (*ptr1).0 == (*ptr2).0 && (*ptr1).1 == (*ptr2).1 {
(*ptr1).2 += &(*ptr2).2;
}
else {
if !(*ptr1).2.is_zero() {
offset += 1;
}

let ptr1 = slice.as_mut_ptr().offset(offset as isize);
std::mem::swap(&mut *ptr1, &mut *ptr2);
}
}
if !slice[offset].2.is_zero() {
offset += 1;
}

offset
} else if slice.len() == 1 && !slice[0].2.is_zero() {
1
} else {
0
}
}
if offset < slice.len() && !slice[offset].2.is_zero() {
offset += 1;
}

offset
}

#[cfg(test)]
Expand All @@ -158,6 +160,10 @@ mod tests {
vec![("a", 0)],
vec![],
),
(
vec![("a", 0), ("b", 0)],
vec![],
),
];

for (mut input, output) in test_cases {
Expand All @@ -182,6 +188,10 @@ mod tests {
vec![("a", 1, 0)],
vec![],
),
(
vec![("a", 1, 0), ("b", 1, 0)],
vec![],
),
];

for (mut input, output) in test_cases {
Expand Down