Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions datafusion/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,6 @@ pub enum DataFusionError {
External(GenericError),
}

impl DataFusionError {
/// Wraps this [DataFusionError] as an [arrow::error::ArrowError].
///
/// TODO this can be removed in favor if the conversion below
pub fn into_arrow_external_error(self) -> ArrowError {
ArrowError::from_external_error(Box::new(self))
}
}

impl From<io::Error> for DataFusionError {
fn from(e: io::Error) -> Self {
DataFusionError::IoError(e)
Expand Down
3 changes: 1 addition & 2 deletions datafusion/src/physical_plan/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,7 @@ fn build_batch(
let scalar = ScalarValue::try_from_array(arr, left_index)?;
Ok(scalar.to_array_of_size(batch.num_rows()))
})
.collect::<Result<Vec<_>>>()
.map_err(|x| x.into_arrow_external_error())?;
.collect::<Result<Vec<_>>>()?;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a pretty good exemplar of how this change makes the DataFusion codebase less confusing


RecordBatch::try_new(
Arc::new(schema.clone()),
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ fn batch_filter(
predicate
.evaluate(batch)
.map(|v| v.into_array(batch.num_rows()))
.map_err(DataFusionError::into_arrow_external_error)
.map_err(DataFusionError::into)
.and_then(|array| {
array
.as_any()
Expand All @@ -185,7 +185,7 @@ fn batch_filter(
DataFusionError::Internal(
"Filter predicate evaluated to non-boolean value".to_string(),
)
.into_arrow_external_error()
.into()
})
// apply filter array to record batch
.and_then(|filter_array| filter_record_batch(batch, filter_array))
Expand Down
34 changes: 11 additions & 23 deletions datafusion/src/physical_plan/hash_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,7 @@ fn group_aggregate_batch(
}
// 1.2 Need to create new entry
None => {
let accumulator_set = create_accumulators(aggr_expr)
.map_err(DataFusionError::into_arrow_external_error)?;
let accumulator_set = create_accumulators(aggr_expr)?;

// Copy group values out of arrays into `ScalarValue`s
let group_by_values = group_values
Expand Down Expand Up @@ -516,8 +515,7 @@ async fn compute_grouped_hash_aggregate(
// Assume create_schema() always put group columns in front of aggr columns, we set
// col_idx_base to group expression count.
let aggregate_expressions =
aggregate_expressions(&aggr_expr, &mode, group_expr.len())
.map_err(DataFusionError::into_arrow_external_error)?;
aggregate_expressions(&aggr_expr, &mode, group_expr.len())?;

let random_state = RandomState::new();

Expand All @@ -535,8 +533,7 @@ async fn compute_grouped_hash_aggregate(
batch,
accumulators,
&aggregate_expressions,
)
.map_err(DataFusionError::into_arrow_external_error)?;
)?;
timer.done();
}

Expand Down Expand Up @@ -754,10 +751,8 @@ async fn compute_hash_aggregate(
elapsed_compute: metrics::Time,
) -> ArrowResult<RecordBatch> {
let timer = elapsed_compute.timer();
let mut accumulators = create_accumulators(&aggr_expr)
.map_err(DataFusionError::into_arrow_external_error)?;
let expressions = aggregate_expressions(&aggr_expr, &mode, 0)
.map_err(DataFusionError::into_arrow_external_error)?;
let mut accumulators = create_accumulators(&aggr_expr)?;
let expressions = aggregate_expressions(&aggr_expr, &mode, 0)?;
let expressions = Arc::new(expressions);
timer.done();

Expand All @@ -766,16 +761,14 @@ async fn compute_hash_aggregate(
while let Some(batch) = input.next().await {
let batch = batch?;
let timer = elapsed_compute.timer();
aggregate_batch(&mode, &batch, &mut accumulators, &expressions)
.map_err(DataFusionError::into_arrow_external_error)?;
aggregate_batch(&mode, &batch, &mut accumulators, &expressions)?;
timer.done();
}

// 2. convert values to a record batch
let timer = elapsed_compute.timer();
let batch = finalize_aggregation(&accumulators, &mode)
.map(|columns| RecordBatch::try_new(schema.clone(), columns))
.map_err(DataFusionError::into_arrow_external_error)?;
.map(|columns| RecordBatch::try_new(schema.clone(), columns))?;
timer.done();
batch
}
Expand Down Expand Up @@ -904,9 +897,7 @@ fn create_batch_from_map(
match mode {
AggregateMode::Partial => {
for acc in accs.iter() {
let state = acc
.state()
.map_err(DataFusionError::into_arrow_external_error)?;
let state = acc.state()?;
acc_data_types.push(state.len());
}
}
Expand All @@ -924,8 +915,7 @@ fn create_batch_from_map(
.map(|group_state| group_state.group_by_values[i].clone()),
)
})
.collect::<Result<Vec<_>>>()
.map_err(|x| x.into_arrow_external_error())?;
.collect::<Result<Vec<_>>>()?;

// add state / evaluated arrays
for (x, &state_len) in acc_data_types.iter().enumerate() {
Expand All @@ -937,8 +927,7 @@ fn create_batch_from_map(
let x = group_state.accumulator_set[x].state().unwrap();
x[y].clone()
}),
)
.map_err(DataFusionError::into_arrow_external_error)?;
)?;

columns.push(res);
}
Expand All @@ -947,8 +936,7 @@ fn create_batch_from_map(
accumulators.group_states.iter().map(|group_state| {
group_state.accumulator_set[x].evaluate().unwrap()
}),
)
.map_err(DataFusionError::into_arrow_external_error)?;
)?;
columns.push(res);
}
}
Expand Down
11 changes: 5 additions & 6 deletions datafusion/src/physical_plan/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,14 @@ impl ProjectionStream {
fn batch_project(&self, batch: &RecordBatch) -> ArrowResult<RecordBatch> {
// records time on drop
let _timer = self.baseline_metrics.elapsed_compute().timer();
self.expr
let arrays = self
.expr
.iter()
.map(|expr| expr.evaluate(batch))
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
.collect::<Result<Vec<_>>>()
.map_or_else(
|e| Err(DataFusionError::into_arrow_external_error(e)),
|arrays| RecordBatch::try_new(self.schema.clone(), arrays),
)
.collect::<Result<Vec<_>>>()?;

RecordBatch::try_new(self.schema.clone(), arrays)
}
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ impl RepartitionExec {
Err(e) => {
for (_, tx) in txs {
let err = DataFusionError::Execution(format!("Join Error: {}", e));
let err = Err(err.into_arrow_external_error());
let err = Err(err.into());
tx.send(Some(err)).ok();
}
}
Expand All @@ -425,7 +425,7 @@ impl RepartitionExec {
for (_, tx) in txs {
// wrap it because need to send error to all output partitions
let err = DataFusionError::Execution(e.to_string());
let err = Err(err.into_arrow_external_error());
let err = Err(err.into());
tx.send(Some(err)).ok();
}
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,7 @@ fn sort_batch(
&expr
.iter()
.map(|e| e.evaluate_to_sort_column(&batch))
.collect::<Result<Vec<SortColumn>>>()
.map_err(DataFusionError::into_arrow_external_error)?,
.collect::<Result<Vec<SortColumn>>>()?,
None,
)?;

Expand Down
7 changes: 2 additions & 5 deletions datafusion/src/physical_plan/windows/window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,18 +273,15 @@ impl WindowAggStream {
elapsed_compute: crate::physical_plan::metrics::Time,
) -> ArrowResult<RecordBatch> {
let input_schema = input.schema();
let batches = common::collect(input)
.await
.map_err(DataFusionError::into_arrow_external_error)?;
let batches = common::collect(input).await?;

// record compute time on drop
let _timer = elapsed_compute.timer();

let batch = common::combine_batches(&batches, input_schema.clone())?;
if let Some(batch) = batch {
// calculate window cols
let mut columns = compute_window_aggregates(window_expr, &batch)
.map_err(DataFusionError::into_arrow_external_error)?;
let mut columns = compute_window_aggregates(window_expr, &batch)?;
// combine with the original cols
// note the setup of window aggregates is that they newly calculated window
// expressions are always prepended to the columns
Expand Down