-
Notifications
You must be signed in to change notification settings - Fork 2k
feat: Add evaluate_to_arrays function #18446
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
0717ab9
8eb7788
9fb6daa
396de6c
0791062
2541120
e13afbc
b73ac2e
778c971
3597bcc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -113,10 +113,10 @@ impl ColumnarValue { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| /// Convert a columnar value into an Arrow [`ArrayRef`] with the specified | ||||||
| /// number of rows. [`Self::Scalar`] is converted by repeating the same | ||||||
| /// scalar multiple times which is not as efficient as handling the scalar | ||||||
| /// directly. | ||||||
| /// Convert any [`Self::Scalar`] into an Arrow [`ArrayRef`] with the specified | ||||||
| /// number of rows by repeating the same scalar multiple times, | ||||||
| /// which is not as efficient as handling the scalar directly. | ||||||
| /// [`Self::Array`] will just be returned as is. | ||||||
| /// | ||||||
| /// See [`Self::values_to_arrays`] to convert multiple columnar values into | ||||||
| /// arrays of the same length. | ||||||
|
|
@@ -135,6 +135,36 @@ impl ColumnarValue { | |||||
| /// number of rows. [`Self::Scalar`] is converted by repeating the same | ||||||
| /// scalar multiple times which is not as efficient as handling the scalar | ||||||
| /// directly. | ||||||
| /// This validates that [`Self::Array`], if it exists, has the expected length. | ||||||
|
||||||
| /// This validates that [`Self::Array`], if it exists, has the expected length. | |
| /// This validates that if this is [`Self::Array`] it has the expected length. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you validate the exact error as it can return different error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -22,6 +22,7 @@ use crate::tree_node::ExprContext; | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use arrow::compute::{and_kleene, is_not_null, SlicesIterator}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use arrow::record_batch::RecordBatch; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use datafusion_common::Result; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use datafusion_expr_common::sort_properties::ExprProperties; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -91,6 +92,26 @@ pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Ok(make_array(data)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// Evaluates expressions against a record batch. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// This will convert the resulting ColumnarValues to ArrayRefs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// duplicating any ScalarValues that may have been returned, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// and validating that the returned arrays all have the same | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// number of rows as the input batch. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| #[inline] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pub fn evaluate_expressions_to_arrays<'a>( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| exprs: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| batch: &RecordBatch, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) -> Result<Vec<ArrayRef>> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let num_rows = batch.num_rows(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| exprs | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .into_iter() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .map(|e| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| e.evaluate(batch) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .and_then(|col| col.into_array_of_size(num_rows)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .collect::<Result<Vec<ArrayRef>>>() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
100
to
113
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we handle the footgun of Also since this is going to be used pretty widely maybe a docstring example, etc. would help
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, I had a few busy days.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But I do think that's another feature/function that is missing in my opinion
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe there is
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to_array is the same as into_array except it takes by reference, which is an extra unneeded clone in this case.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To avoid bike shedding I'll just give my code suggestion:
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems reasonable to me, I do still think this should be inside ColumnarValue, I will prob make this PR in the near future
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed I think
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have decided to just add the required functions in ColumnarValue, as well as make this function more generic, so it can be used directly with iterators such as the group_by and nulls exprs case |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| #[cfg(test)] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| mod tests { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use std::sync::Arc; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ use arrow::record_batch::RecordBatch; | |
| use datafusion_common::Result; | ||
| use datafusion_expr::{LimitEffect, PartitionEvaluator}; | ||
|
|
||
| use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; | ||
| use std::any::Any; | ||
| use std::sync::Arc; | ||
|
|
||
|
|
@@ -57,13 +58,7 @@ pub trait StandardWindowFunctionExpr: Send + Sync + std::fmt::Debug { | |
| /// | ||
| /// Typically, the resulting vector is a single element vector. | ||
| fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> { | ||
| self.expressions() | ||
| .iter() | ||
| .map(|e| { | ||
| e.evaluate(batch) | ||
| .and_then(|v| v.into_array(batch.num_rows())) | ||
| }) | ||
| .collect() | ||
| evaluate_expressions_to_arrays(&self.expressions(), batch) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 😍 |
||
| } | ||
|
|
||
| /// Create a [`PartitionEvaluator`] for evaluating the function on | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,6 +59,7 @@ use datafusion_physical_expr_common::sort_expr::{ | |
| }; | ||
|
|
||
| use datafusion_expr::utils::AggregateOrderSensitivity; | ||
| use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; | ||
| use itertools::Itertools; | ||
|
|
||
| pub mod group_values; | ||
|
|
@@ -1434,25 +1435,14 @@ pub fn finalize_aggregation( | |
| } | ||
| } | ||
|
|
||
| /// Evaluates expressions against a record batch. | ||
| fn evaluate( | ||
| expr: &[Arc<dyn PhysicalExpr>], | ||
| batch: &RecordBatch, | ||
| ) -> Result<Vec<ArrayRef>> { | ||
| expr.iter() | ||
| .map(|expr| { | ||
| expr.evaluate(batch) | ||
| .and_then(|v| v.into_array(batch.num_rows())) | ||
| }) | ||
| .collect() | ||
| } | ||
|
|
||
| /// Evaluates expressions against a record batch. | ||
| /// Evaluates groups of expressions against a record batch. | ||
| pub fn evaluate_many( | ||
| expr: &[Vec<Arc<dyn PhysicalExpr>>], | ||
| batch: &RecordBatch, | ||
| ) -> Result<Vec<Vec<ArrayRef>>> { | ||
| expr.iter().map(|expr| evaluate(expr, batch)).collect() | ||
| expr.iter() | ||
| .map(|expr| evaluate_expressions_to_arrays(expr, batch)) | ||
| .collect() | ||
| } | ||
|
|
||
| fn evaluate_optional( | ||
|
|
@@ -1506,23 +1496,14 @@ pub fn evaluate_group_by( | |
| group_by: &PhysicalGroupBy, | ||
| batch: &RecordBatch, | ||
| ) -> Result<Vec<Vec<ArrayRef>>> { | ||
| let exprs: Vec<ArrayRef> = group_by | ||
| .expr | ||
| .iter() | ||
| .map(|(expr, _)| { | ||
| let value = expr.evaluate(batch)?; | ||
| value.into_array(batch.num_rows()) | ||
| }) | ||
| .collect::<Result<Vec<_>>>()?; | ||
|
|
||
| let null_exprs: Vec<ArrayRef> = group_by | ||
| .null_expr | ||
| .iter() | ||
| .map(|(expr, _)| { | ||
| let value = expr.evaluate(batch)?; | ||
| value.into_array(batch.num_rows()) | ||
| }) | ||
| .collect::<Result<Vec<_>>>()?; | ||
| let exprs = evaluate_expressions_to_arrays( | ||
| group_by.expr.iter().map(|(expr, _)| expr), | ||
| batch, | ||
| )?; | ||
| let null_exprs = evaluate_expressions_to_arrays( | ||
| group_by.null_expr.iter().map(|(expr, _)| expr), | ||
| batch, | ||
| )?; | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe these could make use of the new helper function too: let exprs_list: Vec<_> = group_by.expr.iter().map(|(e, _)| Arc::clone(e)).collect();
let null_exprs_list: Vec<_> = group_by.null_expr.iter().map(|(e, _)| Arc::clone(e)).collect();
let exprs: Vec<ArrayRef> = evaluate_expressions_to_arrays(&exprs_list, batch)?;
let null_exprs: Vec<ArrayRef> = evaluate_expressions_to_arrays(&null_exprs_list, batch)?;
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I actually considered it, just didn't want to allocate new Vecs if I could help it, but maybe worth adding anyway |
||
| group_by | ||
| .groups | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.