
Commit e0a1211

adriangb authored and LiaCastaneda committed
replace HashTableLookupExpr with lit(true) in proto serialization (apache#19300)
Serializing a HashTableLookupExpr *errors* now, and would break any users using joins + protobuf. (cherry picked from commit d61f1a7)
1 parent 115313c commit e0a1211
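The failure mode described above is hit when a physical plan containing joins is round-tripped through `datafusion-proto`. Below is a minimal sketch of that scenario, not code from this commit: the tables `t1`/`t2` and the helper `roundtrip_join_plan` are hypothetical, while `physical_plan_to_bytes` / `physical_plan_from_bytes` are the round-trip helpers in `datafusion_proto::bytes`. With this change, any `HashTableLookupExpr` in the plan is serialized as `lit(true)` instead of producing an error, so the round trip succeeds.

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};

/// Round-trips a join plan through its protobuf representation.
async fn roundtrip_join_plan(ctx: &SessionContext) -> Result<Arc<dyn ExecutionPlan>> {
    // Hypothetical tables `t1` and `t2`; any join query exercises the same path.
    let df = ctx
        .sql("SELECT t1.id FROM t1 JOIN t2 ON t1.id = t2.id")
        .await?;
    let plan = df.create_physical_plan().await?;

    // Serialize to protobuf bytes and back. If the plan carries a
    // HashTableLookupExpr (from dynamic filter pushdown), it is now
    // serialized as lit(true) rather than returning an error.
    let bytes = physical_plan_to_bytes(plan)?;
    physical_plan_from_bytes(&bytes, ctx)
}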

File tree

5 files changed: +87 additions, -8 deletions

datafusion/physical-plan/src/joins/hash_join/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@
 //! [`HashJoinExec`] Partitioned Hash Join Operator

 pub use exec::HashJoinExec;
+pub use partitioned_hash_eval::HashTableLookupExpr;

 mod exec;
 mod inlist_builder;

datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs

Lines changed: 6 additions & 2 deletions
@@ -42,7 +42,7 @@ use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType};
 /// This is used for:
 /// - Computing routing hashes (with RepartitionExec's 0,0,0,0 seeds)
 /// - Computing lookup hashes (with HashJoin's 'J','O','I','N' seeds)
-pub(super) struct HashExpr {
+pub struct HashExpr {
     /// Columns to hash
     on_columns: Vec<PhysicalExprRef>,
     /// Random state for hashing
@@ -179,7 +179,11 @@ impl HashTableLookupExpr {
     /// * `hash_expr` - Expression that computes hash values
     /// * `hash_map` - Hash table to check membership
     /// * `description` - Description for debugging
-    pub(super) fn new(
+    ///
+    /// # Note
+    /// This is public for internal testing purposes only and is not
+    /// guaranteed to be stable across versions.
+    pub fn new(
         hash_expr: PhysicalExprRef,
         hash_map: Arc<dyn JoinHashMapType>,
         description: String,

datafusion/physical-plan/src/joins/mod.rs

Lines changed: 6 additions & 2 deletions
@@ -20,7 +20,7 @@
 use arrow::array::BooleanBufferBuilder;
 pub use cross_join::CrossJoinExec;
 use datafusion_physical_expr::PhysicalExprRef;
-pub use hash_join::HashJoinExec;
+pub use hash_join::{HashJoinExec, HashTableLookupExpr};
 pub use nested_loop_join::NestedLoopJoinExec;
 use parking_lot::Mutex;
 // Note: SortMergeJoin is not used in plans yet
@@ -35,7 +35,11 @@ mod symmetric_hash_join;
 pub mod utils;

 mod join_filter;
-mod join_hash_map;
+/// Hash map implementations for join operations.
+///
+/// Note: This module is public for internal testing purposes only
+/// and is not guaranteed to be stable across versions.
+pub mod join_hash_map;

 #[cfg(test)]
 pub mod test_utils;

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 25 additions & 0 deletions
@@ -33,6 +33,7 @@ use datafusion::physical_plan::expressions::{
 };
 use datafusion::physical_plan::udaf::AggregateFunctionExpr;
 use datafusion::physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
+use datafusion::physical_plan::joins::HashTableLookupExpr;
 use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
 use datafusion::{
     datasource::{
@@ -226,6 +227,30 @@ pub fn serialize_physical_expr(
     let value = snapshot_physical_expr(Arc::clone(value))?;
     let expr = value.as_any();

+    // HashTableLookupExpr is used for dynamic filter pushdown in hash joins.
+    // It contains an Arc<dyn JoinHashMapType> (the build-side hash table) which
+    // cannot be serialized - the hash table is a runtime structure built during
+    // execution on the build side.
+    //
+    // We replace it with lit(true) which is safe because:
+    // 1. The filter is a performance optimization, not a correctness requirement
+    // 2. lit(true) passes all rows, so no valid rows are incorrectly filtered out
+    // 3. The join itself will still produce correct results, just without the
+    //    benefit of early filtering on the probe side
+    //
+    // In distributed execution, the remote worker won't have access to the hash
+    // table anyway, so the best we can do is skip this optimization.
+    if expr.downcast_ref::<HashTableLookupExpr>().is_some() {
+        let value = datafusion_proto_common::ScalarValue {
+            value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
+                true,
+            )),
+        };
+        return Ok(protobuf::PhysicalExprNode {
+            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)),
+        });
+    }
+
     if let Some(expr) = expr.downcast_ref::<Column>() {
         Ok(protobuf::PhysicalExprNode {
             expr_type: Some(protobuf::physical_expr_node::ExprType::Column(

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 49 additions & 4 deletions
@@ -75,8 +75,8 @@ use datafusion::physical_plan::expressions::{
 };
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::joins::{
-    HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
-    StreamJoinPartitionMode, SymmetricHashJoinExec,
+    HashJoinExec, HashTableLookupExpr, NestedLoopJoinExec, PartitionMode,
+    SortMergeJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
 };
 use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
@@ -103,12 +103,12 @@ use datafusion_common::{
     internal_err, not_impl_err, DataFusionError, NullEquality, Result, UnnestOptions,
 };
 use datafusion_expr::{
-    Accumulator, AccumulatorFactoryFunction, AggregateUDF, ColumnarValue, ScalarUDF,
-    Signature, SimpleAggregateUDF, WindowFrame, WindowFrameBound, WindowUDF,
+    Accumulator, AccumulatorFactoryFunction, AggregateUDF, ColumnarValue, ScalarUDF, Signature, SimpleAggregateUDF, WindowFrame, WindowFrameBound, WindowUDF
 };
 use datafusion_functions_aggregate::average::avg_udaf;
 use datafusion_functions_aggregate::nth_value::nth_value_udaf;
 use datafusion_functions_aggregate::string_agg::string_agg_udaf;
+use datafusion::physical_plan::joins::join_hash_map::JoinHashMapU32;
 use datafusion_proto::physical_plan::{
     AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
 };
@@ -2238,3 +2238,48 @@ async fn roundtrip_memory_source() -> Result<()> {
         .await?;
     roundtrip_test(plan)
 }
+
+/// Test that HashTableLookupExpr serializes to lit(true)
+///
+/// HashTableLookupExpr contains a runtime hash table that cannot be serialized.
+/// The serialization code replaces it with lit(true) which is safe because
+/// it's a performance optimization filter, not a correctness requirement.
+#[test]
+fn roundtrip_hash_table_lookup_expr_to_lit() -> Result<()> {
+    // Create a simple schema and input plan
+    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, false)]));
+    let input = Arc::new(EmptyExec::new(schema.clone()));
+
+    // Create a HashTableLookupExpr - it will be replaced with lit(true) during serialization
+    let hash_map = Arc::new(JoinHashMapU32::with_capacity(0));
+    let hash_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("col", 0));
+    let lookup_expr: Arc<dyn PhysicalExpr> = Arc::new(HashTableLookupExpr::new(
+        hash_expr,
+        hash_map,
+        "test_lookup".to_string(),
+    ));
+
+    // Create a filter with the lookup expression
+    let filter = Arc::new(FilterExec::try_new(lookup_expr, input)?);
+
+    // Serialize
+    let ctx = SessionContext::new();
+    let codec = DefaultPhysicalExtensionCodec {};
+    let proto: protobuf::PhysicalPlanNode =
+        protobuf::PhysicalPlanNode::try_from_physical_plan(filter.clone(), &codec)
+            .expect("serialization should succeed");
+
+    // Deserialize
+    let result: Arc<dyn ExecutionPlan> = proto
+        .try_into_physical_plan(&ctx, &ctx.runtime_env(), &codec)
+        .expect("deserialization should succeed");
+
+    // The deserialized plan should have lit(true) instead of HashTableLookupExpr
+    // Verify the filter predicate is a Literal(true)
+    let result_filter = result.as_any().downcast_ref::<FilterExec>().unwrap();
+    let predicate = result_filter.predicate();
+    let literal = predicate.as_any().downcast_ref::<Literal>().unwrap();
+    assert_eq!(*literal.value(), ScalarValue::Boolean(Some(true)));
+
+    Ok(())
+}
