Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ use crate::{
joins::join_hash_map::JoinHashMapOffset,
joins::utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices,
build_batch_from_indices, build_join_schema, check_join_is_valid,
estimate_join_statistics, need_produce_result_in_final,
build_batch_empty_build_side, build_batch_from_indices, build_join_schema,
check_join_is_valid, estimate_join_statistics, need_produce_result_in_final,
symmetric_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex,
JoinFilter, JoinHashMapType, StatefulStreamResult,
},
Expand All @@ -70,8 +70,8 @@ use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::{
internal_datafusion_err, internal_err, plan_err, project_schema, DataFusionError,
JoinSide, JoinType, NullEquality, Result,
internal_datafusion_err, internal_err, plan_err, project_schema, JoinSide, JoinType,
NullEquality, Result,
};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
Expand Down Expand Up @@ -1363,11 +1363,9 @@ pub fn equal_rows_arr(
) -> Result<(UInt64Array, UInt32Array)> {
let mut iter = left_arrays.iter().zip(right_arrays.iter());

let (first_left, first_right) = iter.next().ok_or_else(|| {
DataFusionError::Internal(
"At least one array should be provided for both left and right".to_string(),
)
})?;
let Some((first_left, first_right)) = iter.next() else {
return Ok((Vec::<u64>::new().into(), Vec::<u32>::new().into()));
};

let arr_left = take(first_left.as_ref(), indices_left, None)?;
let arr_right = take(first_right.as_ref(), indices_right, None)?;
Expand Down Expand Up @@ -1498,6 +1496,23 @@ impl HashJoinStream {

let timer = self.join_metrics.join_time.timer();

// if the left side is empty, we can skip the (potentially expensive) join operation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we checked whether the left side is empty before retrieving probe batches, we could also remove the hash repartition 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can do this in a follow-up PR. What do you think, @nuno-faria?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. Can you point out where the probe repartition is being triggered? In process_probe_batch itself, I think we can also skip creating the hashes when the build side is empty, but I measured it and it didn't have a big impact on performance.

if build_side.left_data.hash_map.is_empty() && self.filter.is_none() {
let result = build_batch_empty_build_side(
&self.schema,
build_side.left_data.batch(),
&state.batch,
&self.column_indices,
self.join_type,
)?;
self.join_metrics.output_batches.add(1);
timer.done();

self.state = HashJoinStreamState::FetchProbeBatch;

return Ok(StatefulStreamResult::Ready(Some(result)));
}

// get the matched by join keys indices
let (left_indices, right_indices, next_offset) = lookup_join_hashmap(
build_side.left_data.hash_map(),
Expand Down
11 changes: 11 additions & 0 deletions datafusion/physical-plan/src/joins/join_hash_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ pub trait JoinHashMapType: Send + Sync {
limit: usize,
offset: JoinHashMapOffset,
) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>);

/// Returns `true` if the join hash map contains no entries.
fn is_empty(&self) -> bool;
}

pub struct JoinHashMapU32 {
Expand Down Expand Up @@ -176,6 +179,10 @@ impl JoinHashMapType for JoinHashMapU32 {
offset,
)
}

/// Reports whether this hash map holds no entries by delegating to the
/// `is_empty` method of the inner `map` field.
fn is_empty(&self) -> bool {
self.map.is_empty()
}
}

pub struct JoinHashMapU64 {
Expand Down Expand Up @@ -238,6 +245,10 @@ impl JoinHashMapType for JoinHashMapU64 {
offset,
)
}

/// Reports whether this hash map holds no entries by delegating to the
/// `is_empty` method of the inner `map` field.
fn is_empty(&self) -> bool {
self.map.is_empty()
}
}

// Type of offsets for obtaining indices from JoinHashMap.
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/joins/stream_join_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ impl JoinHashMapType for PruningJoinHashMap {
offset,
)
}

/// Reports whether this pruning hash map holds no entries by delegating to
/// the `is_empty` method of the inner `map` field.
fn is_empty(&self) -> bool {
self.map.is_empty()
}
}

/// The `PruningJoinHashMap` is similar to a regular `JoinHashMap`, but with
Expand Down
52 changes: 51 additions & 1 deletion datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ pub use super::join_filter::JoinFilter;
pub use super::join_hash_map::JoinHashMapType;
pub use crate::joins::{JoinOn, JoinOnRef};

use arrow::array::BooleanArray;
use arrow::array::{
builder::UInt64Builder, downcast_array, new_null_array, Array, ArrowPrimitiveType,
BooleanBufferBuilder, NativeAdapter, PrimitiveArray, RecordBatch, RecordBatchOptions,
UInt32Array, UInt32Builder, UInt64Array,
};
use arrow::buffer::NullBuffer;
use arrow::buffer::{BooleanBuffer, NullBuffer};
use arrow::compute;
use arrow::datatypes::{
ArrowNativeType, Field, Schema, SchemaBuilder, UInt32Type, UInt64Type,
Expand Down Expand Up @@ -928,6 +929,55 @@ pub(crate) fn build_batch_from_indices(
Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
}

/// Returns a new [RecordBatch] resulting from a join where the build/left side is empty.
/// The resulting batch has [Schema] `schema`.
pub(crate) fn build_batch_empty_build_side(
schema: &Schema,
build_batch: &RecordBatch,
probe_batch: &RecordBatch,
column_indices: &[ColumnIndex],
join_type: JoinType,
) -> Result<RecordBatch> {
match join_type {
// these join types only return data if the left side is not empty, so we return an
// empty RecordBatch
JoinType::Inner
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. How about cross joins?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cross joins with an empty relation already appear to run well in the CrossJoinExec operator.

Here is the CrossJoinExec operator for SELECT * FROM t1, t2, where t1 has 100M rows and t2 has none:

CrossJoinExec, metrics=[
    output_rows=0,
    elapsed_compute=351.714µs,
    build_input_batches=0,
    build_input_rows=0,
    input_batches=0,
    input_rows=0,
    output_batches=0,
    build_mem_used=0,
    build_time=351.7µs,
    join_time=12ns
]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this makes sense; a cross join is not a join type that would go through creating a hash table.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this, I think a more generic version of this would be switching small left sides (e.g < 10 rows) to using cross join 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this, I think a more generic version of this would be switching small left sides (e.g < 10 rows) to using cross join 🤔

Does this include equijoin conditions? I think the performance seemed slow with a larger right table when doing this with a nested loop join, which follows a similar algorithm. It is probably a memory issue due to the cartesian product.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be relatively fast to do a cross join / NLJ instead of a hash join for those cases, but of course it depends on how the nested loop join is implemented; there is probably more room for optimizing the nested loop join.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking of opening a proposal to make the nested loop join faster; there are definitely some issues to work on there. I'll try to get to that when I have the time.

| JoinType::Left
| JoinType::LeftSemi
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::LeftMark => Ok(RecordBatch::new_empty(Arc::new(schema.clone()))),

// the remaining joins will return data for the right columns and null for the left ones
JoinType::Right | JoinType::Full | JoinType::RightAnti | JoinType::RightMark => {
let num_rows = probe_batch.num_rows();
let mut columns: Vec<Arc<dyn Array>> =
Vec::with_capacity(schema.fields().len());

for column_index in column_indices {
let array = match column_index.side {
// left -> null array
JoinSide::Left => new_null_array(
build_batch.column(column_index.index).data_type(),
num_rows,
),
// right -> respective right array
JoinSide::Right => Arc::clone(probe_batch.column(column_index.index)),
// right mark -> unset boolean array as there are no matches on the left side
JoinSide::None => Arc::new(BooleanArray::new(
BooleanBuffer::new_unset(num_rows),
None,
)),
};

columns.push(array);
}

Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
}
}
}

/// The input is the matched indices for left and right and
/// adjust the indices according to the join type
pub(crate) fn adjust_indices_by_join_type(
Expand Down
Loading