435 changes: 234 additions & 201 deletions native/Cargo.lock

Large diffs are not rendered by default.

32 changes: 16 additions & 16 deletions native/Cargo.toml
@@ -31,26 +31,26 @@ license = "Apache-2.0"
 edition = "2021"

 # Comet uses the same minimum Rust version as DataFusion
-rust-version = "1.81"
+rust-version = "1.85"

 [workspace.dependencies]
-arrow = { version = "54.1.0", features = ["prettyprint", "ffi", "chrono-tz"] }
-arrow-array = { version = "54.1.0" }
-arrow-buffer = { version = "54.1.0" }
-arrow-data = { version = "54.1.0" }
-arrow-schema = { version = "54.1.0" }
+arrow = { version = "54.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
+arrow-array = { version = "54.2.0" }
+arrow-buffer = { version = "54.2.0" }
+arrow-data = { version = "54.2.0" }
+arrow-schema = { version = "54.2.0" }
 async-trait = { version = "0.1" }
 bytes = { version = "1.10.0" }
-parquet = { version = "54.1.0", default-features = false, features = ["experimental"] }
-datafusion = { version = "45.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions"] }
-datafusion-common = { version = "45.0.0", default-features = false }
-datafusion-functions = { version = "45.0.0", default-features = false, features = ["crypto_expressions"] }
-datafusion-functions-nested = { version = "45.0.0", default-features = false }
-datafusion-expr = { version = "45.0.0", default-features = false }
-datafusion-expr-common = { version = "45.0.0", default-features = false }
-datafusion-execution = { version = "45.0.0", default-features = false }
-datafusion-physical-plan = { version = "45.0.0", default-features = false }
-datafusion-physical-expr = { version = "45.0.0", default-features = false }
+parquet = { version = "54.2.0", default-features = false, features = ["experimental"] }
+datafusion = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false, features = ["unicode_expressions", "crypto_expressions"] }
+datafusion-common = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false }
+datafusion-datasource = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false }
+datafusion-functions = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false, features = ["crypto_expressions"] }
+datafusion-functions-nested = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false }
+datafusion-expr = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false }
+datafusion-expr-common = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false }
+datafusion-execution = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false }
+datafusion-physical-expr = { git = "https://github.com/apache/datafusion", rev = "46.0.0-rc2", default-features = false }
 datafusion-comet-spark-expr = { path = "spark-expr", version = "0.7.0" }
 datafusion-comet-proto = { path = "proto", version = "0.7.0" }
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
8 changes: 5 additions & 3 deletions native/core/benches/bloom_filter_agg.rs
@@ -21,9 +21,10 @@ use arrow_array::{ArrayRef, RecordBatch};
 use arrow_schema::SchemaRef;
 use comet::execution::expressions::bloom_filter_agg::BloomFilterAgg;
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::source::DataSourceExec;
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
-use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_common::ScalarValue;
 use datafusion_execution::TaskContext;
@@ -88,8 +89,9 @@ async fn agg_test(
     mode: AggregateMode,
 ) {
     let schema = &partitions[0][0].schema();
-    let scan: Arc<dyn ExecutionPlan> =
-        Arc::new(MemoryExec::try_new(partitions, Arc::clone(schema), None).unwrap());
+    let scan: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
+        MemorySourceConfig::try_new(partitions, Arc::clone(schema), None).unwrap(),
+    )));
     let aggregate = create_aggregate(scan, c0.clone(), schema, aggregate_udf, alias, mode);
     let mut stream = aggregate
         .execute(0, Arc::new(TaskContext::default()))
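DataFusion 46 removes `MemoryExec`; an in-memory scan is now expressed as a `DataSourceExec` wrapping a `MemorySourceConfig`, which is the pattern this PR applies in every benchmark and test below. A minimal, self-contained sketch of the new construction, assuming the DataFusion 46 APIs used in this diff (the schema and data are illustrative only):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::physical_plan::ExecutionPlan;

fn memory_scan() -> Arc<dyn ExecutionPlan> {
    // Build a single-partition, single-batch input.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![column]).unwrap();
    let partitions = vec![vec![batch]];

    // Old (DataFusion 45): Arc::new(MemoryExec::try_new(&partitions, schema, None).unwrap())
    // New (DataFusion 46): wrap a MemorySourceConfig in a DataSourceExec.
    Arc::new(DataSourceExec::new(Arc::new(
        MemorySourceConfig::try_new(&partitions, schema, None).unwrap(),
    )))
}
```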
8 changes: 6 additions & 2 deletions native/core/benches/shuffle_writer.rs
@@ -20,9 +20,11 @@ use arrow_array::{builder::StringBuilder, RecordBatch};
 use arrow_schema::{DataType, Field, Schema};
 use comet::execution::shuffle::{CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec};
 use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::source::DataSourceExec;
 use datafusion::physical_plan::metrics::Time;
 use datafusion::{
-    physical_plan::{common::collect, memory::MemoryExec, ExecutionPlan},
+    physical_plan::{common::collect, ExecutionPlan},
     prelude::SessionContext,
 };
 use datafusion_physical_expr::{expressions::Column, Partitioning};
@@ -88,7 +90,9 @@ fn create_shuffle_writer_exec(compression_codec: CompressionCodec) -> ShuffleWri
     let schema = batches[0].schema();
     let partitions = &[batches];
     ShuffleWriterExec::try_new(
-        Arc::new(MemoryExec::try_new(partitions, schema, None).unwrap()),
+        Arc::new(DataSourceExec::new(Arc::new(
+            MemorySourceConfig::try_new(partitions, Arc::clone(&schema), None).unwrap(),
+        ))),
         Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
         compression_codec,
         "/tmp/data.out".to_string(),
2 changes: 1 addition & 1 deletion native/core/src/execution/expressions/bloom_filter_agg.rs
@@ -25,7 +25,7 @@ use arrow::array::ArrayRef;
 use arrow_array::BinaryArray;
 use datafusion::error::Result;
 use datafusion::physical_expr::PhysicalExpr;
-use datafusion_common::{downcast_value, DataFusionError, ScalarValue};
+use datafusion_common::{downcast_value, ScalarValue};
 use datafusion_expr::{
     function::{AccumulatorArgs, StateFieldsArgs},
     Accumulator, AggregateUDFImpl, Signature,
38 changes: 21 additions & 17 deletions native/core/src/execution/planner.rs
@@ -77,8 +77,8 @@ use crate::execution::spark_plan::SparkPlan;
 use crate::parquet::parquet_support::{register_object_store, SparkParquetOptions};
 use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
 use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
-use datafusion::datasource::physical_plan::FileScanConfig;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
+use datafusion::datasource::source::DataSourceExec;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec;
 use datafusion_comet_proto::{
@@ -1274,16 +1274,6 @@ impl PhysicalPlanner {
                         Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                     })
                     .collect_vec();
-                let mut file_scan_config =
-                    FileScanConfig::new(object_store_url, Arc::clone(&data_schema))
-                        .with_file_groups(file_groups)
-                        .with_table_partition_cols(partition_fields);
-
-                assert_eq!(
-                    projection_vector.len(),
-                    required_schema.fields.len() + partition_schema.fields.len()
-                );
-                file_scan_config = file_scan_config.with_projection(Some(projection_vector));

                 let mut table_parquet_options = TableParquetOptions::new();
                 // TODO: Maybe these are configs?
@@ -1297,17 +1287,31 @@ impl PhysicalPlanner {
                 );
                 spark_parquet_options.allow_cast_unsigned_ints = true;

-                let mut builder = ParquetExecBuilder::new(file_scan_config)
-                    .with_table_parquet_options(table_parquet_options)
+                let mut parquet_source = ParquetSource::new(table_parquet_options)
                     .with_schema_adapter_factory(Arc::new(SparkSchemaAdapterFactory::new(
                         spark_parquet_options,
                     )));

                 if let Some(filter) = cnf_data_filters {
-                    builder = builder.with_predicate(filter);
+                    parquet_source =
+                        parquet_source.with_predicate(Arc::clone(&data_schema), filter);
                 }

-                let scan = builder.build();
+                let mut file_scan_config = FileScanConfig::new(
+                    object_store_url,
+                    Arc::clone(&data_schema),
+                    Arc::new(parquet_source),
+                )
+                .with_file_groups(file_groups)
+                .with_table_partition_cols(partition_fields);
+
+                assert_eq!(
+                    projection_vector.len(),
+                    required_schema.fields.len() + partition_schema.fields.len()
+                );
+                file_scan_config = file_scan_config.with_projection(Some(projection_vector));
+
+                let scan = DataSourceExec::new(Arc::new(file_scan_config));
                 Ok((
                     vec![],
                     Arc::new(SparkPlan::new(spark_plan.plan_id, Arc::new(scan), vec![])),
@@ -1604,8 +1608,8 @@ impl PhysicalPlanner {
                 let window_agg = Arc::new(BoundedWindowAggExec::try_new(
                     window_expr?,
                     Arc::clone(&child.native_plan),
-                    partition_exprs.to_vec(),
                     InputOrderMode::Sorted,
+                    !partition_exprs.is_empty(),
                 )?);
                 Ok((
                     scans,
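With `ParquetExecBuilder` gone in DataFusion 46, the Parquet scan above is assembled from three pieces: a `ParquetSource` carrying the Parquet options (plus optional predicate and schema adapter factory), a `FileScanConfig` naming the object store URL, file schema, files, and projection, and a `DataSourceExec` wrapping the config. A minimal sketch of that shape, assuming the DataFusion 46 APIs used in the hunks above (the file path, size, and schema are hypothetical):

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion_common::config::TableParquetOptions;

fn parquet_scan() -> DataSourceExec {
    // Options that used to go through ParquetExecBuilder::with_table_parquet_options.
    let mut options = TableParquetOptions::new();
    options.global.pushdown_filters = true;

    // The file-listing half of the scan: schema, files, and (optionally) projection.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let config = FileScanConfig::new(
        ObjectStoreUrl::local_filesystem(),
        schema,
        // The file-format half of the scan.
        Arc::new(ParquetSource::new(options)),
    )
    .with_file(PartitionedFile::new("/tmp/example.parquet".to_string(), 1024));

    // The ExecutionPlan that replaces the old ParquetExec.
    DataSourceExec::new(Arc::new(config))
}
```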
7 changes: 5 additions & 2 deletions native/core/src/execution/shuffle/shuffle_writer.rs
@@ -909,8 +909,9 @@ mod test {
     use super::*;
     use crate::execution::shuffle::read_ipc_compressed;
     use arrow_schema::{DataType, Field, Schema};
+    use datafusion::datasource::memory::MemorySourceConfig;
+    use datafusion::datasource::source::DataSourceExec;
     use datafusion::physical_plan::common::collect;
-    use datafusion::physical_plan::memory::MemoryExec;
     use datafusion::prelude::SessionContext;
     use datafusion_execution::config::SessionConfig;
     use datafusion_execution::runtime_env::RuntimeEnvBuilder;
@@ -1152,7 +1153,9 @@ mod test {

         let partitions = &[batches];
         let exec = ShuffleWriterExec::try_new(
-            Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
+            Arc::new(DataSourceExec::new(Arc::new(
+                MemorySourceConfig::try_new(partitions, batch.schema(), None).unwrap(),
+            ))),
             Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
             CompressionCodec::Zstd(1),
             "/tmp/data.out".to_string(),
41 changes: 21 additions & 20 deletions native/core/src/parquet/mod.rs
@@ -51,8 +51,8 @@ use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
 use arrow::buffer::{Buffer, MutableBuffer};
 use arrow_array::{Array, RecordBatch};
 use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
-use datafusion::datasource::physical_plan::FileScanConfig;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
+use datafusion::datasource::source::DataSourceExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_comet_spark_expr::EvalMode;
 use datafusion_common::config::TableParquetOptions;
@@ -666,18 +666,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
         start + length,
     );
     partitioned_file.object_meta.location = object_store_path;
-    // We build the file scan config with the *required* schema so that the reader knows
-    // the output schema we want
-    let file_scan_config = FileScanConfig::new(object_store_url, Arc::new(required_schema_arrow))
-        .with_file(partitioned_file)
-        // TODO: (ARROW NATIVE) - do partition columns in native
-        //  - will need partition schema and partition values to do so
-        // .with_table_partition_cols(partition_fields)
-        ;
-    let mut table_parquet_options = TableParquetOptions::new();
-    // TODO: Maybe these are configs?
-    table_parquet_options.global.pushdown_filters = true;
-    table_parquet_options.global.reorder_filters = true;
     let session_timezone: String = env
         .get_string(&JString::from_raw(session_timezone))
         .unwrap()
@@ -687,16 +675,29 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
         SparkParquetOptions::new(EvalMode::Legacy, session_timezone.as_str(), false);
     spark_parquet_options.allow_cast_unsigned_ints = true;

-    let builder2 = ParquetExecBuilder::new(file_scan_config)
-        .with_table_parquet_options(table_parquet_options)
-        .with_schema_adapter_factory(Arc::new(SparkSchemaAdapterFactory::new(
-            spark_parquet_options,
-        )));
+    let mut table_parquet_options = TableParquetOptions::new();
+    // TODO: Maybe these are configs?
+    table_parquet_options.global.pushdown_filters = true;
+    table_parquet_options.global.reorder_filters = true;
+
+    let parquet_source = ParquetSource::new(table_parquet_options).with_schema_adapter_factory(
+        Arc::new(SparkSchemaAdapterFactory::new(spark_parquet_options)),
+    );
+
+    // We build the file scan config with the *required* schema so that the reader knows
+    // the output schema we want
+    let file_scan_config = FileScanConfig::new(
+        object_store_url,
+        Arc::new(required_schema_arrow),
+        Arc::new(parquet_source),
+    )
+    .with_file(partitioned_file)
+    // TODO: (ARROW NATIVE) - do partition columns in native
+    //  - will need partition schema and partition values to do so
+    // .with_table_partition_cols(partition_fields)
+    ;

     //TODO: (ARROW NATIVE) - predicate pushdown??
     // builder = builder.with_predicate(filter);

-    let scan = builder2.build();
+    let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config)));

     let ctx = TaskContext::default();
     let partition_index: usize = 0;
     batch_stream = Some(scan.execute(partition_index, Arc::new(ctx))?);
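The remaining "predicate pushdown??" TODO above could presumably be handled the same way planner.rs now does it, by attaching the filter to the `ParquetSource` together with the file schema rather than to a builder. A hedged sketch under that assumption (`data_schema` and `filter_expr` are placeholder names, not identifiers from this file):

```rust
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::physical_expr::PhysicalExpr;
use datafusion_common::config::TableParquetOptions;

// Attach a pushed-down predicate to a ParquetSource (DataFusion 46 style),
// mirroring the with_predicate(Arc::clone(&data_schema), filter) call in planner.rs.
fn source_with_predicate(
    data_schema: SchemaRef,
    filter_expr: Arc<dyn PhysicalExpr>,
) -> ParquetSource {
    ParquetSource::new(TableParquetOptions::new()).with_predicate(data_schema, filter_expr)
}
```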
26 changes: 16 additions & 10 deletions native/core/src/parquet/schema_adapter.rs
@@ -279,12 +279,14 @@ mod test {
     use arrow_array::UInt32Array;
     use arrow_schema::SchemaRef;
     use datafusion::datasource::listing::PartitionedFile;
-    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
+    use datafusion::datasource::source::DataSourceExec;
     use datafusion::execution::object_store::ObjectStoreUrl;
     use datafusion::execution::TaskContext;
     use datafusion::physical_plan::ExecutionPlan;
     use datafusion_comet_spark_expr::test_common::file_util::get_temp_filename;
     use datafusion_comet_spark_expr::EvalMode;
+    use datafusion_common::config::TableParquetOptions;
     use datafusion_common::DataFusionError;
     use futures::StreamExt;
     use parquet::arrow::ArrowWriter;
@@ -341,19 +343,23 @@ mod test {
         writer.close()?;

         let object_store_url = ObjectStoreUrl::local_filesystem();
-        let file_scan_config = FileScanConfig::new(object_store_url, required_schema)
-            .with_file_groups(vec![vec![PartitionedFile::from_path(
-                filename.to_string(),
-            )?]]);

         let mut spark_parquet_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
         spark_parquet_options.allow_cast_unsigned_ints = true;

-        let parquet_exec = ParquetExec::builder(file_scan_config)
-            .with_schema_adapter_factory(Arc::new(SparkSchemaAdapterFactory::new(
-                spark_parquet_options,
-            )))
-            .build();
+        let parquet_source = Arc::new(
+            ParquetSource::new(TableParquetOptions::new()).with_schema_adapter_factory(Arc::new(
+                SparkSchemaAdapterFactory::new(spark_parquet_options),
+            )),
+        );
+
+        let file_scan_config =
+            FileScanConfig::new(object_store_url, required_schema, parquet_source)
+                .with_file_groups(vec![vec![PartitionedFile::from_path(
+                    filename.to_string(),
+                )?]]);
+
+        let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config));

         let mut stream = parquet_exec
             .execute(0, Arc::new(TaskContext::default()))
8 changes: 5 additions & 3 deletions native/spark-expr/benches/aggregate.rs
@@ -20,12 +20,13 @@ use arrow_array::builder::{Decimal128Builder, StringBuilder};
 use arrow_array::{ArrayRef, RecordBatch};
 use arrow_schema::SchemaRef;
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::source::DataSourceExec;
 use datafusion::execution::TaskContext;
 use datafusion::functions_aggregate::average::avg_udaf;
 use datafusion::functions_aggregate::sum::sum_udaf;
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
-use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_comet_spark_expr::AvgDecimal;
 use datafusion_comet_spark_expr::SumDecimal;
@@ -119,8 +120,9 @@ async fn agg_test(
     alias: &str,
 ) {
     let schema = &partitions[0][0].schema();
-    let scan: Arc<dyn ExecutionPlan> =
-        Arc::new(MemoryExec::try_new(partitions, Arc::clone(schema), None).unwrap());
+    let scan: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
+        MemorySourceConfig::try_new(partitions, Arc::clone(schema), None).unwrap(),
+    )));
     let aggregate = create_aggregate(scan, c0.clone(), c1.clone(), schema, aggregate_udf, alias);
     let mut stream = aggregate
         .execute(0, Arc::new(TaskContext::default()))
5 changes: 3 additions & 2 deletions native/spark-expr/src/agg_funcs/stddev.rs
@@ -27,6 +27,7 @@ use datafusion_common::types::NativeType;
 use datafusion_common::{internal_err, Result, ScalarValue};
 use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
 use datafusion_expr::{AggregateUDFImpl, Signature, Volatility};
+use datafusion_expr_common::signature::Coercion;
 use datafusion_physical_expr::expressions::format_state_name;
 use datafusion_physical_expr::expressions::StatsType;

@@ -56,11 +57,11 @@ impl Stddev {
         Self {
             name: name.into(),
             signature: Signature::coercible(
-                vec![
+                vec![Coercion::new_exact(
                     datafusion_expr_common::signature::TypeSignatureClass::Native(Arc::new(
                         NativeType::Float64,
                     )),
-                ],
+                )],
                 Volatility::Immutable,
             ),
             stats_type,

Review thread on the `Coercion::new_exact(` line:

Contributor: What does this Coercion do?

Member (PR author): This use of Coercion::new_exact means that we only accept the specific type (Float64) and will not perform any coercion. From the DataFusion docs:

/// There are two variants:
///
/// * `Exact` - Only accepts arguments that exactly match the desired type
/// * `Implicit` - Accepts the desired type and can coerce from specified source types
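A small illustration of the distinction discussed in the review thread above, assuming the `Coercion` API quoted from the DataFusion docs (the helper function name is hypothetical):

```rust
use std::sync::Arc;

use datafusion_common::types::NativeType;
use datafusion_expr::{Signature, Volatility};
use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};

// A signature that accepts exactly one Float64 argument and nothing else.
// Coercion::new_exact opts out of implicit casting, so a non-Float64 argument
// is rejected rather than coerced; the Implicit variant, by contrast, lists
// source types that DataFusion may cast from.
fn float64_only_signature() -> Signature {
    Signature::coercible(
        vec![Coercion::new_exact(TypeSignatureClass::Native(Arc::new(
            NativeType::Float64,
        )))],
        Volatility::Immutable,
    )
}
```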
8 changes: 5 additions & 3 deletions native/spark-expr/src/agg_funcs/sum_decimal.rs
@@ -465,9 +465,10 @@ mod tests {
     use arrow::datatypes::*;
     use arrow_array::builder::{Decimal128Builder, StringBuilder};
     use arrow_array::RecordBatch;
+    use datafusion::datasource::memory::MemorySourceConfig;
+    use datafusion::datasource::source::DataSourceExec;
     use datafusion::execution::TaskContext;
     use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
-    use datafusion::physical_plan::memory::MemoryExec;
     use datafusion::physical_plan::ExecutionPlan;
     use datafusion_common::Result;
     use datafusion_expr::AggregateUDF;
@@ -495,8 +496,9 @@ mod tests {

         let data_type = DataType::Decimal128(8, 2);
         let schema = Arc::clone(&partitions[0][0].schema());
-        let scan: Arc<dyn ExecutionPlan> =
-            Arc::new(MemoryExec::try_new(partitions, Arc::clone(&schema), None).unwrap());
+        let scan: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
+            MemorySourceConfig::try_new(partitions, Arc::clone(&schema), None).unwrap(),
+        )));

         let aggregate_udf = Arc::new(AggregateUDF::new_from_impl(SumDecimal::try_new(
             data_type.clone(),