diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 6f0b60556a751..7cad2d6747a50 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -302,41 +302,50 @@ impl PhysicalExpr for BinaryExpr { let rhs = self.right.evaluate(batch)?; return Ok(rhs); } - ShortCircuitStrategy::PreSelection(selection) => { - // The function `evaluate_selection` was not called for filtering and calculation, - // as it takes into account cases where the selection contains null values. - let batch = filter_record_batch(batch, selection)?; - let right_ret = self.right.evaluate(&batch)?; + ShortCircuitStrategy::PreSelection { mask, fill_value } => { + // `mask` selects the rows whose result depends on the RHS; the + // unselected rows are all `fill_value` (see `ShortCircuitStrategy`). + // + // Use `filter_record_batch` directly because `evaluate_selection` + // scatters the RHS back to the original batch length. + let selection_batch = filter_record_batch(batch, &mask)?; + let right_ret = self.right.evaluate(&selection_batch)?; match &right_ret { ColumnarValue::Array(array) => { - // When the array on the right is all true or all false, skip the scatter process let boolean_array = array.as_boolean(); - if boolean_array.null_count() == 0 && !boolean_array.has_false() { - return Ok(lhs); - } else if boolean_array.null_count() == 0 - && !boolean_array.has_true() - { - // If the right-hand array is returned at this point,the lengths will be inconsistent; - // returning a scalar can avoid this issue - return Ok(ColumnarValue::Scalar(ScalarValue::Boolean( - Some(false), - ))); + // If the RHS is uniform on the selected rows, the whole + // expression collapses and no scatter is needed. + if boolean_array.null_count() == 0 { + let rhs_value = if !boolean_array.has_false() { + Some(true) + } else if !boolean_array.has_true() { + Some(false) + } else { + None + }; + if let Some(rhs_value) = rhs_value { + return Ok(uniform_pre_selection_result( + rhs_value, fill_value, lhs, + )); + } } - return pre_selection_scatter(selection, Some(boolean_array)); + return pre_selection_scatter( + &mask, + Some(boolean_array), + fill_value, + ); } ColumnarValue::Scalar(scalar) => { if let ScalarValue::Boolean(v) = scalar { - // When the scalar is true or false, skip the scatter process + // A scalar RHS applies uniformly to all selected rows. if let Some(v) = v { - if *v { - return Ok(lhs); - } else { - return Ok(right_ret); - } + return Ok(uniform_pre_selection_result( + *v, fill_value, lhs, + )); } else { - return pre_selection_scatter(selection, None); + return pre_selection_scatter(&mask, None, fill_value); } } else { return internal_err!( @@ -846,16 +855,28 @@ impl BinaryExpr { } } -enum ShortCircuitStrategy<'a> { +enum ShortCircuitStrategy { None, ReturnLeft, ReturnRight, - PreSelection(&'a BooleanArray), + /// Evaluate the right-hand side only on the rows selected by `mask`, then + /// scatter the results back, filling the unselected rows with `fill_value`. + /// + /// - For `AND`, `mask` selects the rows where the LHS is `true` and + /// `fill_value` is `false` (rows where the LHS is `false` are `false`). + /// - For `OR`, `mask` selects the rows where the LHS is `false` and + /// `fill_value` is `true` (rows where the LHS is `true` are `true`). + PreSelection { + mask: BooleanArray, + fill_value: bool, + }, } /// Based on the results calculated from the left side of the short-circuit operation, -/// if the proportion of `true` is less than 0.2 and the current operation is an `and`, -/// the `RecordBatch` will be filtered in advance. +/// pre-selection filters the `RecordBatch` before evaluating the right-hand side when +/// the side that cannot short-circuit the operator is rare: +/// - for `AND`, when the proportion of `true` is less than or equal to 0.2 +/// - for `OR`, when the proportion of `false` is less than or equal to 0.2 const PRE_SELECTION_THRESHOLD: f32 = 0.2; /// Checks if a logical operator (`AND`/`OR`) can short-circuit evaluation based on the left-hand side (lhs) result. @@ -864,24 +885,21 @@ const PRE_SELECTION_THRESHOLD: f32 = 0.2; /// - For `AND`: /// - if LHS is all false => short-circuit → return LHS /// - if LHS is all true => short-circuit → return RHS -/// - if LHS is mixed and true_count/sum_count <= [`PRE_SELECTION_THRESHOLD`] -> pre-selection +/// - if LHS is mixed and true_count / len <= [`PRE_SELECTION_THRESHOLD`] -> pre-selection /// - For `OR`: /// - if LHS is all true => short-circuit → return LHS /// - if LHS is all false => short-circuit → return RHS +/// - if LHS is mixed and false_count / len <= [`PRE_SELECTION_THRESHOLD`] -> pre-selection /// # Arguments /// * `lhs` - The left-hand side (lhs) columnar value (array or scalar) -/// * `lhs` - The left-hand side (lhs) columnar value (array or scalar) /// * `op` - The logical operator (`AND` or `OR`) /// /// # Implementation Notes /// 1. Only works with Boolean-typed arguments (other types automatically return `false`) /// 2. Handles both scalar values and array values /// 3. For arrays, uses optimized bit counting techniques for boolean arrays -fn check_short_circuit<'a>( - lhs: &'a ColumnarValue, - op: &Operator, -) -> ShortCircuitStrategy<'a> { - // Quick reject for non-logical operators,and quick judgment when op is and +fn check_short_circuit(lhs: &ColumnarValue, op: &Operator) -> ShortCircuitStrategy { + // Only logical operators can use this path. let is_and = match op { Operator::And => true, Operator::Or => false, @@ -909,36 +927,42 @@ fn check_short_circuit<'a>( let true_count = bool_array.values().count_set_bits(); if is_and { - // For AND, prioritize checking for all-false (short circuit case) - // Uses optimized false_count() method provided by Arrow - - // Short circuit if all values are false if true_count == 0 { return ShortCircuitStrategy::ReturnLeft; } - // If no false values, then all must be true if true_count == len { return ShortCircuitStrategy::ReturnRight; } - // determine if we can pre-selection if true_count as f32 / len as f32 <= PRE_SELECTION_THRESHOLD { - return ShortCircuitStrategy::PreSelection(bool_array); + // Select rows where the LHS is true; rows where the LHS + // is false are false regardless of the RHS. + return ShortCircuitStrategy::PreSelection { + mask: bool_array.clone(), + fill_value: false, + }; } } else { - // For OR, prioritize checking for all-true (short circuit case) - // Uses optimized true_count() method provided by Arrow - - // Short circuit if all values are true if true_count == len { return ShortCircuitStrategy::ReturnLeft; } - // If no true values, then all must be false if true_count == 0 { return ShortCircuitStrategy::ReturnRight; } + + let false_count = len - true_count; + if false_count as f32 / len as f32 <= PRE_SELECTION_THRESHOLD { + // Select rows where the LHS is false; rows where the LHS + // is true are true regardless of the RHS. The LHS has no + // nulls here, so negating its bits is infallible. + let mask = BooleanArray::new(!bool_array.values(), None); + return ShortCircuitStrategy::PreSelection { + mask, + fill_value: true, + }; + } } } } @@ -961,62 +985,54 @@ fn check_short_circuit<'a>( ShortCircuitStrategy::None } -/// Creates a new boolean array based on the evaluation of the right expression, -/// but only for positions where the left_result is true. +/// Collapses a pre-selected expression whose RHS is uniformly `rhs_value` across +/// every selected row, avoiding a scatter: +/// - when it equals `fill_value`, every row is `fill_value` (a scalar); +/// - otherwise the selected rows already equal the RHS, which matches the LHS +/// there, and the unselected rows are the LHS value too, so the result is `lhs`. +fn uniform_pre_selection_result( + rhs_value: bool, + fill_value: bool, + lhs: ColumnarValue, +) -> ColumnarValue { + if rhs_value == fill_value { + ColumnarValue::Scalar(ScalarValue::Boolean(Some(fill_value))) + } else { + lhs + } +} + +/// Creates a boolean array by scattering compact RHS results into the positions +/// selected by `mask`. /// -/// This function is used for short-circuit evaluation optimization of logical AND operations: -/// - When left_result has few true values, we only evaluate the right expression for those positions -/// - Values are copied from right_array where left_result is true -/// - All other positions are filled with false values +/// This function is used for short-circuit evaluation optimization of logical AND/OR operations: +/// - Only selected rows are evaluated on the RHS +/// - Values are copied from `right_result` where `mask` is true +/// - All other positions are filled with `fill_value` (`false` for AND, `true` for OR) /// /// # Parameters -/// - `left_result` Boolean array with selection mask (typically from left side of AND) +/// - `mask` Boolean array with the rows whose result depends on the RHS /// - `right_result` Result of evaluating right side of expression (only for selected positions) +/// - `fill_value` The value for the unselected positions (`false` for AND, `true` for OR) /// /// # Returns -/// A combined ColumnarValue with values from right_result where left_result is true -/// -/// # Example -/// Initial Data: { 1, 2, 3, 4, 5 } -/// Left Evaluation -/// (Condition: Equal to 2 or 3) -/// ↓ -/// Filtered Data: {2, 3} -/// Left Bitmap: { 0, 1, 1, 0, 0 } -/// ↓ -/// Right Evaluation -/// (Condition: Even numbers) -/// ↓ -/// Right Data: { 2 } -/// Right Bitmap: { 1, 0 } -/// ↓ -/// Combine Results -/// Final Bitmap: { 0, 1, 0, 0, 0 } -/// -/// # Note -/// Perhaps it would be better to modify `left_result` directly without creating a copy? -/// In practice, `left_result` should have only one owner, so making changes should be safe. -/// However, this is difficult to achieve under the immutable constraints of [`Arc`] and [`BooleanArray`]. +/// A combined `ColumnarValue` with the same length as `mask`. fn pre_selection_scatter( - left_result: &BooleanArray, + mask: &BooleanArray, right_result: Option<&BooleanArray>, + fill_value: bool, ) -> Result { - let result_len = left_result.len(); + let result_len = mask.len(); let mut result_array_builder = BooleanArray::builder(result_len); - // keep track of current position we have in right boolean array let mut right_array_pos = 0; - - // keep track of how much is filled let mut last_end = 0; - // reduce if condition in for_each match right_result { Some(right_result) => { - SlicesIterator::new(left_result).for_each(|(start, end)| { - // the gap needs to be filled with false + SlicesIterator::new(mask).for_each(|(start, end)| { if start > last_end { - result_array_builder.append_n(start - last_end, false); + result_array_builder.append_n(start - last_end, fill_value); } // copy values from right array for this slice @@ -1030,13 +1046,11 @@ fn pre_selection_scatter( last_end = end; }); } - None => SlicesIterator::new(left_result).for_each(|(start, end)| { - // the gap needs to be filled with false + None => SlicesIterator::new(mask).for_each(|(start, end)| { if start > last_end { - result_array_builder.append_n(start - last_end, false); + result_array_builder.append_n(start - last_end, fill_value); } - // append nulls for this slice derictly let len = end - start; result_array_builder.append_nulls(len); @@ -1044,9 +1058,9 @@ fn pre_selection_scatter( }), } - // Fill any remaining positions with false + // Fill any remaining positions with `fill_value` if last_end < result_len { - result_array_builder.append_n(result_len - last_end, false); + result_array_builder.append_n(result_len - last_end, fill_value); } let boolean_result = result_array_builder.finish(); @@ -5215,14 +5229,17 @@ mod tests { let ColumnarValue::Array(array) = &left_value else { panic!("Expected ColumnarValue::Array"); }; - let ShortCircuitStrategy::PreSelection(value) = + let ShortCircuitStrategy::PreSelection { mask, fill_value } = check_short_circuit(&left_value, &Operator::And) else { panic!("Expected ShortCircuitStrategy::PreSelection"); }; + // For AND, the mask selects the rows where the LHS is true and the + // unselected rows are filled with `false`. + assert!(!fill_value); let expected_boolean_arr: Vec<_> = as_boolean_array(array).unwrap().iter().collect(); - let boolean_arr: Vec<_> = value.iter().collect(); + let boolean_arr: Vec<_> = mask.iter().collect(); assert_eq!(expected_boolean_arr, boolean_arr); // op: OR left: all true @@ -5233,10 +5250,33 @@ mod tests { ShortCircuitStrategy::ReturnLeft )); - // op: OR left: not all true + // 20% false: OR can pre-select the false rows. let left_expr: Arc = logical2physical(&logical_col("a").gt(expr_lit(2)), &schema); let left_value = left_expr.evaluate(&batch).unwrap(); + let ColumnarValue::Array(array) = &left_value else { + panic!("Expected ColumnarValue::Array"); + }; + let ShortCircuitStrategy::PreSelection { mask, fill_value } = + check_short_circuit(&left_value, &Operator::Or) + else { + panic!("Expected ShortCircuitStrategy::PreSelection"); + }; + // For OR, the mask selects the rows where the LHS is false (the negation + // of the LHS) and the unselected rows are filled with `true`. + assert!(fill_value); + let negated_lhs: Vec<_> = as_boolean_array(array) + .unwrap() + .iter() + .map(|v| v.map(|b| !b)) + .collect(); + let boolean_arr: Vec<_> = mask.iter().collect(); + assert_eq!(negated_lhs, boolean_arr); + + // 60% false: OR falls back to normal evaluation. + let left_expr: Arc = + logical2physical(&logical_col("a").gt(expr_lit(4)), &schema); + let left_value = left_expr.evaluate(&batch).unwrap(); assert!(matches!( check_short_circuit(&left_value, &Operator::Or), ShortCircuitStrategy::None @@ -5340,15 +5380,10 @@ mod tests { )); } - /// Test for [pre_selection_scatter] - /// Since [check_short_circuit] ensures that the left side does not contain null and is neither all_true nor all_false, as well as not being empty, - /// the following tests have been designed: - /// 1. Test sparse left with interleaved true/false - /// 2. Test multiple consecutive true blocks - /// 3. Test multiple consecutive true blocks - /// 4. Test single true at first position - /// 5. Test single true at last position - /// 6. Test nulls in right array + /// Test for [pre_selection_scatter]. + /// + /// `check_short_circuit` only calls this helper with a non-empty, + /// non-null mask that is neither all true nor all false. #[test] fn test_pre_selection_scatter() { fn create_bool_array(bools: Vec) -> BooleanArray { @@ -5361,7 +5396,7 @@ mod tests { let left = create_bool_array(vec![true, false, true, false, true]); let right = create_bool_array(vec![false, true, false]); - let result = pre_selection_scatter(&left, Some(&right)).unwrap(); + let result = pre_selection_scatter(&left, Some(&right), false).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = create_bool_array(vec![false, false, true, false, false]); @@ -5375,7 +5410,7 @@ mod tests { create_bool_array(vec![false, true, true, false, true, true, true]); let right = create_bool_array(vec![true, false, false, true, false]); - let result = pre_selection_scatter(&left, Some(&right)).unwrap(); + let result = pre_selection_scatter(&left, Some(&right), false).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = @@ -5389,7 +5424,7 @@ mod tests { let left = create_bool_array(vec![true, false, false]); let right = create_bool_array(vec![false]); - let result = pre_selection_scatter(&left, Some(&right)).unwrap(); + let result = pre_selection_scatter(&left, Some(&right), false).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = create_bool_array(vec![false, false, false]); @@ -5402,7 +5437,7 @@ mod tests { let left = create_bool_array(vec![false, false, true]); let right = create_bool_array(vec![false]); - let result = pre_selection_scatter(&left, Some(&right)).unwrap(); + let result = pre_selection_scatter(&left, Some(&right), false).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = create_bool_array(vec![false, false, false]); @@ -5415,7 +5450,7 @@ mod tests { let left = create_bool_array(vec![false, true, false, true]); let right = BooleanArray::from(vec![None, Some(false)]); - let result = pre_selection_scatter(&left, Some(&right)).unwrap(); + let result = pre_selection_scatter(&left, Some(&right), false).unwrap(); let result_arr = result.into_array(left.len()).unwrap(); let expected = BooleanArray::from(vec![ @@ -5426,6 +5461,38 @@ mod tests { ]); assert_eq!(&expected, result_arr.as_boolean()); } + // OR semantics: selected rows take the RHS, unselected rows become true. + { + // Selection (LHS false rows): [T, F, T, F, T] + // Right (RHS on those rows): [F, T, F] + let left = create_bool_array(vec![true, false, true, false, true]); + let right = create_bool_array(vec![false, true, false]); + + let result = pre_selection_scatter(&left, Some(&right), true).unwrap(); + let result_arr = result.into_array(left.len()).unwrap(); + + // selected rows take the RHS value; unselected rows are `true` + let expected = create_bool_array(vec![false, true, true, true, false]); + assert_eq!(&expected, result_arr.as_boolean()); + } + // OR semantics with nulls in the right array. + { + // Selection (LHS false rows): [F, T, F, T] + // Right: [None, Some(false)] + let left = create_bool_array(vec![false, true, false, true]); + let right = BooleanArray::from(vec![None, Some(false)]); + + let result = pre_selection_scatter(&left, Some(&right), true).unwrap(); + let result_arr = result.into_array(left.len()).unwrap(); + + let expected = BooleanArray::from(vec![ + Some(true), // unselected => true + None, // null from right + Some(true), // unselected => true + Some(false), + ]); + assert_eq!(&expected, result_arr.as_boolean()); + } } #[test] @@ -5452,6 +5519,89 @@ mod tests { ); } + #[test] + fn test_or_false_preselection_returns_lhs() { + // `c OR false` over a mostly-true `c` triggers OR pre-selection; the + // result must equal `c`. + let schema = + Arc::new(Schema::new(vec![Field::new("c", DataType::Boolean, false)])); + let c_array = + Arc::new(BooleanArray::from(vec![true, false, true, true, true])) as ArrayRef; + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::clone(&c_array)]) + .unwrap(); + + let expr = logical2physical(&logical_col("c").or(expr_lit(false)), &schema); + + let result = expr.evaluate(&batch).unwrap(); + let ColumnarValue::Array(result_arr) = result else { + panic!("Expected ColumnarValue::Array"); + }; + + let expected: Vec<_> = c_array.as_boolean().iter().collect(); + let actual: Vec<_> = result_arr.as_boolean().iter().collect(); + assert_eq!( + expected, actual, + "OR with FALSE must equal LHS even with PreSelection" + ); + } + + #[test] + fn test_or_preselection_matches_kleene() { + // The OR pre-selection path must match full-batch Kleene OR. + use arrow::compute::kernels::boolean::or_kleene; + + let schema = Arc::new(Schema::new(vec![ + Field::new("c", DataType::Boolean, true), + Field::new("d", DataType::Boolean, true), + ])); + + // `c` is mostly true (2/10 false => 20% <= threshold) so OR pre-selects. + let c = BooleanArray::from(vec![ + true, true, false, true, true, true, true, false, true, true, + ]); + + let d_cases = vec![ + // Mixed RHS with nulls exercises scatter and null copy. + BooleanArray::from(vec![ + Some(false), + Some(true), + Some(true), + Some(false), + Some(false), + Some(true), + Some(false), + None, + Some(true), + None, + ]), + // RHS true on selected rows exercises the uniform-fill path. + BooleanArray::from(vec![Some(true); 10]), + // RHS false on selected rows exercises the return-LHS path. + BooleanArray::from(vec![Some(false); 10]), + ]; + + for d in d_cases { + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(c.clone()) as ArrayRef, + Arc::new(d.clone()) as ArrayRef, + ], + ) + .unwrap(); + + let expr = logical2physical(&logical_col("c").or(logical_col("d")), &schema); + let result = expr.evaluate(&batch).unwrap().into_array(c.len()).unwrap(); + + let expected = or_kleene(&c, &d).unwrap(); + assert_eq!( + expected, + *result.as_boolean(), + "OR pre-selection must match Kleene OR for d = {d:?}" + ); + } + } + #[test] fn test_evaluate_bounds_int32() { let schema = Schema::new(vec![