Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ fn create_runtime_env() -> Result<RuntimeEnv> {
ObjectStoreRegistry::new_with_provider(Some(Arc::new(object_store_provider)));
let rn_config =
RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));
return RuntimeEnv::new(rn_config);
RuntimeEnv::new(rn_config)
}

fn is_valid_file(dir: &str) -> std::result::Result<(), String> {
Expand Down
12 changes: 7 additions & 5 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ fn build_gcs_object_store(url: &Url) -> Result<Arc<dyn object_store::ObjectStore
let host = get_host_name(url)?;
let mut builder = GoogleCloudStorageBuilder::new().with_bucket_name(host);

if let Some(path) = env::var("GCP_SERVICE_ACCOUNT_PATH").ok() {
if let Ok(path) = env::var("GCP_SERVICE_ACCOUNT_PATH") {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are nice cleanups, but they are not directly connected to the other tickets in this PR. Were they suggested by clippy, or are they just things you noticed while reviewing the code?

@mingmwang mingmwang Oct 19, 2022

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, they were suggested by clippy, so I had to change the code accordingly.

builder = builder.with_service_account_path(path);
}
match builder.build() {
Expand All @@ -78,10 +78,12 @@ fn build_gcs_object_store(url: &Url) -> Result<Arc<dyn object_store::ObjectStore
}

/// Returns the host component of `url` as reported by `url.host_str()`.
///
/// # Errors
///
/// Yields `DataFusionError::Execution` carrying the message
/// `"Not able to parse hostname from url, <url>"` when `url.host_str()` is `None`.
// NOTE(review): this span is a rendered diff, so the eager `ok_or(...)` form and the
// lazy `ok_or_else(|| ...)` form both appear below; presumably only the
// `ok_or_else` form is the live code — confirm against the actual file.
fn get_host_name(url: &Url) -> Result<&str> {
url.host_str().ok_or(DataFusionError::Execution(format!(
"Not able to parse hostname from url, {}",
url.as_str()
)))
url.host_str().ok_or_else(|| {
DataFusionError::Execution(format!(
"Not able to parse hostname from url, {}",
url.as_str()
))
})
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ impl ExecutionPlan for CustomExec {
None
}

/// Returns an empty list of column groups for this plan.
fn equivalence_properties(
    &self,
) -> Vec<Vec<datafusion::physical_expr::expressions::Column>> {
    Vec::new()
}

/// Returns an empty list of child plans.
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
    Vec::new()
}
Expand Down
80 changes: 80 additions & 0 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1519,4 +1519,84 @@ mod tests {

Ok(())
}

/// Unions two inner joins that use the same join keys and checks that the
/// union's physical plan reports the default `target_partitions` as its
/// output partition count.
#[tokio::test]
async fn partition_aware_union() -> Result<()> {
    let lhs = test_table().await?.select_columns(&["c1", "c2"])?;
    let rhs = test_table_with_name("c2")
        .await?
        .select_columns(&["c1", "c3"])?
        .with_column_renamed("c2.c1", "c2_c1")?;

    // Materialize both inputs so their row counts can be verified below.
    let lhs_batches = lhs.collect().await?;
    let rhs_batches = rhs.collect().await?;

    // Both joins are built on the identical key pair.
    let first_join =
        lhs.join(rhs.clone(), JoinType::Inner, &["c1"], &["c2_c1"], None)?;
    let second_join = lhs.join(rhs, JoinType::Inner, &["c1"], &["c2_c1"], None)?;
    let unioned = first_join.union(second_join)?;
    let union_batches = unioned.collect().await?;

    let lhs_total: usize = lhs_batches.iter().map(|batch| batch.num_rows()).sum();
    assert_eq!(100, lhs_total);
    let rhs_total: usize = rhs_batches.iter().map(|batch| batch.num_rows()).sum();
    assert_eq!(100, rhs_total);
    let union_total: usize = union_batches.iter().map(|batch| batch.num_rows()).sum();
    assert_eq!(4016, union_total);

    let plan = unioned.create_physical_plan().await?;
    let expected_partitions = SessionContext::new().copied_config().target_partitions;
    let actual_partitions = plan.output_partitioning().partition_count();
    assert_eq!(actual_partitions, expected_partitions);

    Ok(())
}

/// Unions two inner joins whose join keys are listed in a different order and
/// checks that the union's physical plan reports twice the default
/// `target_partitions` as its output partition count.
#[tokio::test]
async fn non_partition_aware_union() -> Result<()> {
    let lhs = test_table().await?.select_columns(&["c1", "c2"])?;
    let rhs = test_table_with_name("c2")
        .await?
        .select_columns(&["c1", "c2"])?
        .with_column_renamed("c2.c1", "c2_c1")?
        .with_column_renamed("c2.c2", "c2_c2")?;

    // Materialize both inputs so their row counts can be verified below.
    let lhs_batches = lhs.collect().await?;
    let rhs_batches = rhs.collect().await?;

    let first_join = lhs.join(
        rhs.clone(),
        JoinType::Inner,
        &["c1", "c2"],
        &["c2_c1", "c2_c2"],
        None,
    )?;

    // Same keys as the first join, but listed in the opposite order.
    let second_join = lhs.join(
        rhs,
        JoinType::Inner,
        &["c2", "c1"],
        &["c2_c2", "c2_c1"],
        None,
    )?;

    let unioned = first_join.union(second_join)?;
    let union_batches = unioned.collect().await?;

    let lhs_total: usize = lhs_batches.iter().map(|batch| batch.num_rows()).sum();
    assert_eq!(100, lhs_total);
    let rhs_total: usize = rhs_batches.iter().map(|batch| batch.num_rows()).sum();
    assert_eq!(100, rhs_total);
    let union_total: usize = union_batches.iter().map(|batch| batch.num_rows()).sum();
    assert_eq!(916, union_total);

    let plan = unioned.create_physical_plan().await?;
    let default_partitions = SessionContext::new().copied_config().target_partitions;

    // The union's output partition count is expected to combine the partition
    // counts of both inputs, i.e. twice the default.
    let actual_partitions = plan.output_partitioning().partition_count();
    assert_eq!(actual_partitions, default_partitions * 2);

    Ok(())
}
}
9 changes: 7 additions & 2 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
use datafusion_sql::{ResolvedTableReference, TableReference};

use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;

use crate::config::{
Expand All @@ -82,6 +81,7 @@ use crate::config::{
};
use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
use crate::physical_optimizer::enforcement::BasicEnforcement;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udaf::AggregateUDF;
Expand Down Expand Up @@ -1145,6 +1145,8 @@ pub struct SessionConfig {
pub parquet_pruning: bool,
/// Should DataFusion collect statistics after listing files
pub collect_statistics: bool,
/// Should DataFusion optimizer run a top down process to reorder the join keys
pub top_down_join_key_reordering: bool,
/// Configuration options
pub config_options: Arc<RwLock<ConfigOptions>>,
/// Opaque extensions.
Expand All @@ -1164,6 +1166,7 @@ impl Default for SessionConfig {
repartition_windows: true,
parquet_pruning: true,
collect_statistics: false,
top_down_join_key_reordering: true,
config_options: Arc::new(RwLock::new(ConfigOptions::new())),
// Assume no extensions by default.
extensions: HashMap::with_capacity_and_hasher(
Expand Down Expand Up @@ -1479,6 +1482,7 @@ impl SessionState {
Arc::new(AggregateStatistics::new()),
Arc::new(HashBuildProbeOrder::new()),
];
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
if config
.config_options
.read()
Expand All @@ -1496,7 +1500,8 @@ impl SessionState {
)));
}
physical_optimizers.push(Arc::new(Repartition::new()));
physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
// physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));

SessionState {
session_id,
Expand Down
35 changes: 14 additions & 21 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ use crate::{
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
repartition::RepartitionExec, with_new_children_if_necessary,
repartition::RepartitionExec, TreeNodeRewritable,
},
};

use std::sync::Arc;

/// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that
Expand All @@ -42,40 +43,32 @@ impl CoalesceBatches {
Self { target_batch_size }
}
}

impl PhysicalOptimizerRule for CoalesceBatches {
/// Rewrites `plan` so that `FilterExec`, `HashJoinExec` and `RepartitionExec`
/// nodes are wrapped in a `CoalesceBatchesExec` built with
/// `self.target_batch_size`; nodes of any other type are not wrapped.
///
/// Returns the rewritten plan, or the error propagated from the rewrite.
// NOTE(review): this span is a rendered diff that interleaves two versions of
// the method — an earlier one that recurses over `plan.children()` and calls
// `with_new_children_if_necessary`, and a later one that delegates traversal to
// `plan.transform_up(...)` and ignores the session config (`_config`).
// Presumably only the `transform_up` version is live and visits every node
// bottom-up — confirm against the actual file and `TreeNodeRewritable`.
fn optimize(
&self,
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
config: &crate::execution::context::SessionConfig,
_config: &crate::execution::context::SessionConfig,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
if plan.children().is_empty() {
// leaf node, children cannot be replaced
Ok(plan.clone())
} else {
// recurse down first
let children = plan
.children()
.iter()
.map(|child| self.optimize(child.clone(), config))
.collect::<Result<Vec<_>>>()?;
let plan = with_new_children_if_necessary(plan, children)?;
let target_batch_size = self.target_batch_size;
plan.transform_up(&|plan| {
let plan_any = plan.as_any();
// The goal here is to detect operators that could produce small batches and only
// wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
// would be to build the coalescing logic directly into the operators
// See https://github.com/apache/arrow-datafusion/issues/139
let plan_any = plan.as_any();
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
|| plan_any.downcast_ref::<RepartitionExec>().is_some();
Ok(if wrap_in_coalesce {
Arc::new(CoalesceBatchesExec::new(
if wrap_in_coalesce {
Some(Arc::new(CoalesceBatchesExec::new(
plan.clone(),
self.target_batch_size,
))
target_batch_size,
)))
} else {
plan.clone()
})
}
None
}
})
}

fn name(&self) -> &str {
Expand Down
Loading