Skip to content

Commit d688655

Browse files
authored
chore: migrate to DF 49.0.0 (#2040)
1 parent 9505478 commit d688655

File tree

18 files changed

+528
-311
lines changed

18 files changed

+528
-311
lines changed

native/Cargo.lock

Lines changed: 464 additions & 257 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@ edition = "2021"
3434
rust-version = "1.85"
3535

3636
[workspace.dependencies]
37-
arrow = { version = "55.1.0", features = ["prettyprint", "ffi", "chrono-tz"] }
37+
arrow = { version = "55.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
3838
async-trait = { version = "0.1" }
3939
bytes = { version = "1.10.0" }
40-
parquet = { version = "55.1.0", default-features = false, features = ["experimental"] }
41-
datafusion = { version = "48.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
42-
datafusion-spark = { version = "48.0.0" }
40+
parquet = { version = "55.2.0", default-features = false, features = ["experimental"] }
41+
datafusion = { version = "49.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
42+
datafusion-spark = { version = "49.0.0" }
4343
datafusion-comet-spark-expr = { path = "spark-expr" }
4444
datafusion-comet-proto = { path = "proto" }
4545
chrono = { version = "0.4", default-features = false, features = ["clock"] }
@@ -49,7 +49,7 @@ num = "0.4"
4949
rand = "0.9"
5050
regex = "1.9.6"
5151
thiserror = "2"
52-
object_store = { version = "0.12.0", features = ["gcp", "azure", "aws", "http"] }
52+
object_store = { version = "0.12.3", features = ["gcp", "azure", "aws", "http"] }
5353
url = "2.2"
5454
aws-config = "1.6.3"
5555
aws-credential-types = "1.2.3"

native/core/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,13 @@ datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default
7878
procfs = "0.17.0"
7979

8080
[dev-dependencies]
81-
pprof = { version = "0.14.0", features = ["flamegraph"] }
82-
criterion = { version = "0.5.1", features = ["async_tokio"] }
81+
pprof = { version = "0.15", features = ["flamegraph"] }
82+
criterion = { version = "0.7", features = ["async", "async_tokio", "async_std"] }
8383
jni = { version = "0.21", features = ["invocation"] }
8484
lazy_static = "1.4"
8585
assertables = "9"
8686
hex = "0.4.3"
87-
datafusion-functions-nested = { version = "48.0.0" }
87+
datafusion-functions-nested = { version = "49.0.0" }
8888

8989
[features]
9090
default = []

native/core/benches/bit_util.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ use comet::common::bit::{
2424
log2, read_num_bytes_u32, read_num_bytes_u64, read_u32, read_u64, set_bits, trailing_bits,
2525
BitReader, BitWriter,
2626
};
27-
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
27+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
28+
use std::hint::black_box;
2829

2930
/// Benchmark to measure bit_util performance.
3031
/// To run this benchmark:

native/core/benches/filter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ use arrow::array::{ArrayRef, RecordBatch};
2020
use arrow::compute::filter_record_batch;
2121
use arrow::datatypes::{DataType, Field, Schema};
2222
use comet::execution::operators::comet_filter_record_batch;
23-
use criterion::{black_box, criterion_group, criterion_main, Criterion};
23+
use criterion::{criterion_group, criterion_main, Criterion};
24+
use std::hint::black_box;
2425
use std::sync::Arc;
2526
use std::time::Duration;
2627

native/core/benches/shuffle_writer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ fn criterion_benchmark(c: &mut Criterion) {
8989
CometPartitioning::RangePartitioning(
9090
LexOrdering::new(vec![PhysicalSortExpr::new_default(
9191
col("c0", batch.schema().as_ref()).unwrap(),
92-
)]),
92+
)])
93+
.unwrap(),
9394
16,
9495
100,
9596
),

native/core/src/execution/operators/filter.rs

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -211,22 +211,16 @@ impl FilterExec {
211211
if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>() {
212212
if binary.op() == &Operator::Eq {
213213
// Filter evaluates to single value for all partitions
214-
if input_eqs.is_expr_constant(binary.left()) {
215-
let (expr, across_parts) = (
216-
binary.right(),
217-
input_eqs.get_expr_constant_value(binary.right()),
218-
);
219-
res_constants.push(
220-
ConstExpr::new(Arc::clone(expr)).with_across_partitions(across_parts),
221-
);
222-
} else if input_eqs.is_expr_constant(binary.right()) {
223-
let (expr, across_parts) = (
224-
binary.left(),
225-
input_eqs.get_expr_constant_value(binary.left()),
226-
);
227-
res_constants.push(
228-
ConstExpr::new(Arc::clone(expr)).with_across_partitions(across_parts),
229-
);
214+
if input_eqs.is_expr_constant(binary.left()).is_some() {
215+
let across = input_eqs
216+
.is_expr_constant(binary.right())
217+
.unwrap_or_default();
218+
res_constants.push(ConstExpr::new(Arc::clone(binary.right()), across));
219+
} else if input_eqs.is_expr_constant(binary.right()).is_some() {
220+
let across = input_eqs
221+
.is_expr_constant(binary.left())
222+
.unwrap_or_default();
223+
res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across));
230224
}
231225
}
232226
}
@@ -246,7 +240,7 @@ impl FilterExec {
246240
let mut eq_properties = input.equivalence_properties().clone();
247241
let (equal_pairs, _) = collect_columns_from_predicate(predicate);
248242
for (lhs, rhs) in equal_pairs {
249-
eq_properties.add_equal_conditions(lhs, rhs)?
243+
eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
250244
}
251245
// Add the columns that have only one viable value (singleton) after
252246
// filtering to constants.
@@ -258,14 +252,13 @@ impl FilterExec {
258252
.min_value
259253
.get_value();
260254
let expr = Arc::new(column) as _;
261-
ConstExpr::new(expr)
262-
.with_across_partitions(AcrossPartitions::Uniform(value.cloned()))
255+
ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned()))
263256
});
264257
// This is for statistics
265-
eq_properties = eq_properties.with_constants(constants);
258+
eq_properties.add_constants(constants)?;
266259
// This is for logical constant (for example: a = '1', then a could be marked as a constant)
267-
// to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0)
268-
eq_properties = eq_properties.with_constants(Self::extend_constants(input, predicate));
260+
// to do: how to deal with a multiple situation to represent = (for example, c1 between 0 and 0)
261+
eq_properties.add_constants(Self::extend_constants(input, predicate))?;
269262

270263
let mut output_partitioning = input.output_partitioning().clone();
271264
// If contains projection, update the PlanProperties.

native/core/src/execution/planner.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ use crate::parquet::parquet_support::prepare_object_store_with_configs;
7272
use datafusion::common::scalar::ScalarStructBuilder;
7373
use datafusion::common::{
7474
tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter},
75-
JoinType as DFJoinType, ScalarValue,
75+
JoinType as DFJoinType, NullEquality, ScalarValue,
7676
};
7777
use datafusion::datasource::listing::PartitionedFile;
7878
use datafusion::logical_expr::type_coercion::other::get_coerce_type_for_case_expression;
@@ -594,6 +594,14 @@ impl PhysicalPlanner {
594594
true,
595595
false,
596596
))),
597+
// DataFusion 49 hardcodes return type for MD5 built in function as UTF8View
598+
// which is not yet supported in Comet
599+
// Converting forcibly to UTF8. To be removed after UTF8View supported
600+
"md5" => Ok(Arc::new(Cast::new(
601+
func?,
602+
DataType::Utf8,
603+
SparkCastOptions::new_without_timezone(EvalMode::Try, true),
604+
))),
597605
_ => func,
598606
}
599607
}
@@ -1153,7 +1161,7 @@ impl PhysicalPlanner {
11531161
let child_copied = Self::wrap_in_copy_exec(Arc::clone(&child.native_plan));
11541162

11551163
let sort = Arc::new(
1156-
SortExec::new(LexOrdering::new(exprs?), Arc::clone(&child_copied))
1164+
SortExec::new(LexOrdering::new(exprs?).unwrap(), Arc::clone(&child_copied))
11571165
.with_fetch(fetch),
11581166
);
11591167

@@ -1429,7 +1437,7 @@ impl PhysicalPlanner {
14291437
sort_options,
14301438
// null doesn't equal to null in Spark join key. If the join key is
14311439
// `EqualNullSafe`, Spark will rewrite it during planning.
1432-
false,
1440+
NullEquality::NullEqualsNothing,
14331441
)?);
14341442

14351443
if join.filter.is_some() {
@@ -1497,7 +1505,7 @@ impl PhysicalPlanner {
14971505
PartitionMode::Partitioned,
14981506
// null doesn't equal to null in Spark join key. If the join key is
14991507
// `EqualNullSafe`, Spark will rewrite it during planning.
1500-
false,
1508+
NullEquality::NullEqualsNothing,
15011509
)?);
15021510

15031511
// If the hash join is build right, we need to swap the left and right
@@ -2193,13 +2201,15 @@ impl PhysicalPlanner {
21932201
};
21942202

21952203
let window_frame = WindowFrame::new_bounds(units, lower_bound, upper_bound);
2204+
let lex_orderings = LexOrdering::new(sort_exprs.to_vec());
2205+
let sort_phy_exprs = lex_orderings.as_deref().unwrap_or(&[]);
21962206

21972207
datafusion::physical_plan::windows::create_window_expr(
21982208
&window_func,
21992209
window_func_name,
22002210
&window_args,
22012211
partition_by,
2202-
&LexOrdering::new(sort_exprs.to_vec()),
2212+
sort_phy_exprs,
22032213
window_frame.into(),
22042214
input_schema.as_ref(),
22052215
false, // TODO: Ignore nulls
@@ -2280,7 +2290,7 @@ impl PhysicalPlanner {
22802290
.iter()
22812291
.map(|expr| self.create_sort_expr(expr, Arc::clone(&input_schema)))
22822292
.collect();
2283-
let lex_ordering = LexOrdering::from(exprs?);
2293+
let lex_ordering = LexOrdering::new(exprs?).unwrap();
22842294
Ok(CometPartitioning::RangePartitioning(
22852295
lex_ordering,
22862296
range_partition.num_partitions as usize,

native/core/src/execution/shuffle/comet_partitioning.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub enum CometPartitioning {
2424
/// Allocate rows based on a hash of one or more expressions and the specified number of
2525
/// partitions
2626
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
27-
/// Allocate rows based on lexical order of one or more expressions and the specified number of
27+
/// Allocate rows based on the lexical order of one or more expressions and the specified number of
2828
/// partitions
2929
RangePartitioning(LexOrdering, usize, usize),
3030
}

native/core/src/execution/shuffle/range_partitioner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ mod test {
247247

248248
let (rows, row_converter) = RangePartitioner::generate_bounds(
249249
input_batch.columns().to_vec().as_ref(),
250-
&lex_ordering,
250+
&lex_ordering.unwrap(),
251251
10,
252252
input_batch.num_rows(),
253253
1000,

0 commit comments

Comments
 (0)