From cf286052fb54f0198a5a7c5bc7a4055d71b5aac3 Mon Sep 17 00:00:00 2001 From: Wang Date: Thu, 17 Mar 2022 16:07:07 +0800 Subject: [PATCH 1/4] Refactor SessionContext, SessionState add SessionConfig to support multi-tenancy configurations - Part 2 --- .../rust/core/src/serde/logical_plan/mod.rs | 6 +- ballista/rust/core/src/serde/mod.rs | 8 +- .../src/serde/physical_plan/from_proto.rs | 8 +- .../rust/core/src/serde/physical_plan/mod.rs | 2 +- ballista/rust/core/src/utils.rs | 12 +- .../scheduler/src/scheduler_server/mod.rs | 12 +- datafusion/src/dataframe.rs | 13 +- datafusion/src/datasource/object_store/mod.rs | 2 +- datafusion/src/execution/context.rs | 594 +++++++++--------- datafusion/src/execution/runtime_env.rs | 81 ++- .../aggregate_statistics.rs | 2 +- .../physical_optimizer/coalesce_batches.rs | 2 +- .../src/physical_plan/file_format/csv.rs | 8 +- .../src/physical_plan/file_format/json.rs | 2 +- .../src/physical_plan/file_format/parquet.rs | 10 +- datafusion/src/physical_plan/planner.rs | 7 +- datafusion/src/physical_plan/sorts/sort.rs | 13 +- .../sorts/sort_preserving_merge.rs | 13 +- datafusion/tests/order_spill_fuzz.rs | 6 +- datafusion/tests/user_defined_plan.rs | 9 +- 20 files changed, 418 insertions(+), 392 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index 737979355afe8..20ca69be7e2a8 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -224,6 +224,7 @@ impl AsLogicalPlan for LogicalPlanNode { }; let object_store = ctx + .runtime_env() .object_store(scan.path.as_str()) .map_err(|e| { BallistaError::NotImplemented(format!( @@ -1256,9 +1257,10 @@ mod roundtrip_tests { let codec: BallistaCodec = BallistaCodec::default(); let custom_object_store = Arc::new(TestObjectStore {}); - ctx.register_object_store("test", custom_object_store.clone()); + ctx.runtime_env() + .register_object_store("test", custom_object_store.clone()); - let (os, _) = ctx.object_store("test://foo.csv")?; + let (os, _) = ctx.runtime_env().object_store("test://foo.csv")?; println!("Object Store {:?}", os); diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs index cc1bbb4174295..00b3a5fa8698c 100644 --- a/ballista/rust/core/src/serde/mod.rs +++ b/ballista/rust/core/src/serde/mod.rs @@ -360,6 +360,7 @@ mod tests { use prost::Message; use std::any::Any; + use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use std::convert::TryInto; use std::fmt; use std::fmt::{Debug, Formatter}; @@ -692,10 +693,11 @@ mod tests { #[tokio::test] async fn test_extension_plan() -> crate::error::Result<()> { let store = Arc::new(LocalFileSystem {}); - let config = - SessionConfig::new().with_query_planner(Arc::new(TopKQueryPlanner {})); + let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()); + let session_state = SessionState::with_config(SessionConfig::new(), runtime) + .with_query_planner(Arc::new(TopKQueryPlanner {})); - let ctx = SessionContext::with_config(config); + let ctx = SessionContext::with_state(session_state); let scan = LogicalPlanBuilder::scan_csv( store, diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 79161a315a47c..97c31b9a9d018 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -29,7 +29,7 @@ use chrono::{TimeZone, Utc}; use datafusion::datasource::object_store::local::LocalFileSystem; use datafusion::datasource::object_store::{FileMeta, SizedFile}; use datafusion::datasource::PartitionedFile; -use datafusion::execution::context::SessionState; +use datafusion::execution::context::ExecutionProps; use datafusion::physical_plan::file_format::FileScanConfig; @@ -153,12 +153,12 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc { .map(|x| x.try_into()) .collect::, _>>()?; - // TODO Do not create new the SessionState - let session_state = SessionState::new(); + // TODO Do not create new the ExecutionProps + let execution_props = ExecutionProps::new(); let fun_expr = functions::create_physical_fun( &(&scalar_function).into(), - &session_state.execution_props, + &execution_props, )?; Arc::new(ScalarFunctionExpr::new( diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs index 9c151229dbc3d..a907bd5ff4026 100644 --- a/ballista/rust/core/src/serde/physical_plan/mod.rs +++ b/ballista/rust/core/src/serde/physical_plan/mod.rs @@ -905,7 +905,7 @@ fn decode_scan_config( .collect::, _>>()?; let object_store = if let Some(file) = file_groups.get(0).and_then(|h| h.get(0)) { - ctx.object_store(file.file_meta.path())?.0 + ctx.runtime_env().object_store(file.file_meta.path())?.0 } else { Arc::new(LocalFileSystem {}) }; diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index a668b7334cfd5..b769cd60b10b7 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -47,6 +47,7 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::common::batch_byte_size; use datafusion::physical_plan::empty::EmptyExec; +use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::physical_plan::file_format::{CsvExec, ParquetExec}; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::hash_aggregate::HashAggregateExec; @@ -234,11 +235,16 @@ pub fn create_df_ctx_with_ballista_query_planner( let scheduler_url = format!("http://{}:{}", scheduler_host, scheduler_port); let planner: Arc> = Arc::new(BallistaQueryPlanner::new(scheduler_url, config.clone())); - let config = SessionConfig::new() - .with_query_planner(planner) + + let session_config = SessionConfig::new() .with_target_partitions(config.default_shuffle_partitions()) .with_information_schema(true); - SessionContext::with_config(config) + let session_state = SessionState::with_config( + session_config, + Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()), + ) + .with_query_planner(planner); + SessionContext::with_state(session_state) } pub struct BallistaQueryPlanner { diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs index 51f6fe4180b41..2228b768bb4d6 100644 --- a/ballista/rust/scheduler/src/scheduler_server/mod.rs +++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs @@ -160,9 +160,19 @@ impl SchedulerServer SessionContext { let config = SessionConfig::new().with_target_partitions(config.default_shuffle_partitions()); SessionContext::with_config(config) } + +/// Update the existing DataFusion session context with Ballista Configuration +pub fn update_datafusion_context( + session_ctx: Arc, + config: &BallistaConfig, +) -> Arc { + session_ctx.state.lock().config.target_partitions = + config.default_shuffle_partitions(); + session_ctx +} diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs index 643a7cea929dc..36076c06d25c1 100644 --- a/datafusion/src/dataframe.rs +++ b/datafusion/src/dataframe.rs @@ -34,7 +34,7 @@ use crate::arrow::datatypes::SchemaRef; use crate::arrow::util::pretty; use crate::datasource::TableProvider; use crate::datasource::TableType; -use crate::execution::context::{SessionContext, SessionState, TaskContext}; +use crate::execution::context::{SessionState, TaskContext}; use crate::physical_plan::file_format::{plan_to_csv, plan_to_parquet}; use crate::physical_plan::{collect, collect_partitioned}; use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan}; @@ -85,9 +85,8 @@ impl DataFrame { /// Create a physical plan pub async fn create_physical_plan(&self) -> Result> { let state = self.session_state.lock().clone(); - let ctx = SessionContext::from(Arc::new(Mutex::new(state))); - let plan = ctx.optimize(&self.plan)?; - ctx.create_physical_plan(&plan).await + let optimized_plan = state.optimize(&self.plan)?; + state.create_physical_plan(&optimized_plan).await } /// Filter the DataFrame by column. Returns a new DataFrame only containing the @@ -564,8 +563,7 @@ impl DataFrame { pub async fn write_csv(&self, path: &str) -> Result<()> { let plan = self.create_physical_plan().await?; let state = self.session_state.lock().clone(); - let ctx = SessionContext::from(Arc::new(Mutex::new(state))); - plan_to_csv(&ctx, plan, path).await + plan_to_csv(&state, plan, path).await } /// Write a `DataFrame` to a Parquet file. @@ -576,8 +574,7 @@ impl DataFrame { ) -> Result<()> { let plan = self.create_physical_plan().await?; let state = self.session_state.lock().clone(); - let ctx = SessionContext::from(Arc::new(Mutex::new(state))); - plan_to_parquet(&ctx, plan, path, writer_properties).await + plan_to_parquet(&state, plan, path, writer_properties).await } } diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index aad70e70a3087..3a9da67017001 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -163,7 +163,7 @@ pub trait ObjectStore: Sync + Send + Debug { static LOCAL_SCHEME: &str = "file"; -/// A Registry holds all the object stores at runtime with a scheme for each store. +/// A Registry holds all the object stores at Runtime with a scheme for each store. /// This allows the user to extend DataFusion with different storage systems such as S3 or HDFS /// and query data inside these systems. pub struct ObjectStoreRegistry { diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 1361637270124..12cda260b989b 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -42,7 +42,6 @@ use crate::{ use log::{debug, trace}; use parking_lot::Mutex; use std::collections::{HashMap, HashSet}; -use std::path::PathBuf; use std::string::String; use std::sync::Arc; @@ -55,7 +54,6 @@ use crate::catalog::{ }; use crate::dataframe::DataFrame; use crate::datasource::listing::ListingTableConfig; -use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry}; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; use crate::logical_plan::{ @@ -93,12 +91,7 @@ use chrono::{DateTime, Utc}; use parquet::file::properties::WriterProperties; use uuid::Uuid; -use super::{ - disk_manager::DiskManagerConfig, - memory_manager::MemoryManagerConfig, - options::{AvroReadOptions, CsvReadOptions}, - DiskManager, MemoryManager, -}; +use super::options::{AvroReadOptions, CsvReadOptions}; /// SessionContext is the main interface for executing queries with DataFusion. It stands for /// the connection between user and DataFusion/Ballista cluster. @@ -145,7 +138,9 @@ use super::{ pub struct SessionContext { /// Uuid for the session session_id: String, - /// Internal state for the context + /// Session start time + pub session_start_time: DateTime, + /// Shared session state for the session pub state: Arc>, } @@ -156,49 +151,32 @@ impl Default for SessionContext { } impl SessionContext { - /// Creates a new execution context using a default configuration. + /// Creates a new execution context using a default session configuration. pub fn new() -> Self { Self::with_config(SessionConfig::new()) } - /// Creates a new session context using the provided configuration. + /// Creates a new session context using the provided session configuration. pub fn with_config(config: SessionConfig) -> Self { - let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc; - - if config.create_default_catalog_and_schema { - let default_catalog = MemoryCatalogProvider::new(); - - default_catalog.register_schema( - config.default_schema.clone(), - Arc::new(MemorySchemaProvider::new()), - ); - - let default_catalog: Arc = if config.information_schema { - Arc::new(CatalogWithInformationSchema::new( - Arc::downgrade(&catalog_list), - Arc::new(default_catalog), - )) - } else { - Arc::new(default_catalog) - }; + let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()); + Self::with_config_rt(config, runtime) + } - catalog_list - .register_catalog(config.default_catalog.clone(), default_catalog); + /// Creates a new session context using the provided configuration and RuntimeEnv. + pub fn with_config_rt(config: SessionConfig, runtime: Arc) -> Self { + let state = SessionState::with_config(config, runtime); + Self { + session_id: state.session_id.clone(), + session_start_time: chrono::Utc::now(), + state: Arc::new(Mutex::new(state)), } + } - let runtime_env = Arc::new(RuntimeEnv::new(config.runtime.clone()).unwrap()); - let state = SessionState { - session_id: Uuid::new_v4().to_string(), - catalog_list, - scalar_functions: HashMap::new(), - aggregate_functions: HashMap::new(), - config, - execution_props: ExecutionProps::new(), - object_store_registry: Arc::new(ObjectStoreRegistry::new()), - runtime_env, - }; + /// Creates a new session context using the provided session state. + pub fn with_state(state: SessionState) -> Self { Self { session_id: state.session_id.clone(), + session_start_time: chrono::Utc::now(), state: Arc::new(Mutex::new(state)), } } @@ -208,6 +186,11 @@ impl SessionContext { self.state.lock().runtime_env.clone() } + /// Return a copied version of config for this Session + pub fn copied_config(&self) -> SessionConfig { + self.state.lock().config.clone() + } + /// Creates a dataframe that will execute a SQL query. /// /// This method is `async` because queries of type `CREATE EXTERNAL TABLE` @@ -246,7 +229,7 @@ impl SessionContext { format: file_format, collect_stat: false, file_extension: file_extension.to_owned(), - target_partitions: self.state.lock().config.target_partitions, + target_partitions: self.copied_config().target_partitions, table_partition_cols: vec![], }; @@ -366,8 +349,8 @@ impl SessionContext { options: AvroReadOptions<'_>, ) -> Result> { let uri: String = uri.into(); - let (object_store, path) = self.object_store(&uri)?; - let target_partitions = self.state.lock().config.target_partitions; + let (object_store, path) = self.runtime_env().object_store(&uri)?; + let target_partitions = self.copied_config().target_partitions; Ok(Arc::new(DataFrame::new( self.state.clone(), &LogicalPlanBuilder::scan_avro( @@ -397,8 +380,8 @@ impl SessionContext { options: CsvReadOptions<'_>, ) -> Result> { let uri: String = uri.into(); - let (object_store, path) = self.object_store(&uri)?; - let target_partitions = self.state.lock().config.target_partitions; + let (object_store, path) = self.runtime_env().object_store(&uri)?; + let target_partitions = self.copied_config().target_partitions; Ok(Arc::new(DataFrame::new( self.state.clone(), &LogicalPlanBuilder::scan_csv( @@ -419,8 +402,8 @@ impl SessionContext { uri: impl Into, ) -> Result> { let uri: String = uri.into(); - let (object_store, path) = self.object_store(&uri)?; - let target_partitions = self.state.lock().config.target_partitions; + let (object_store, path) = self.runtime_env().object_store(&uri)?; + let target_partitions = self.copied_config().target_partitions; let logical_plan = LogicalPlanBuilder::scan_parquet(object_store, path, None, target_partitions) .await? @@ -449,7 +432,7 @@ impl SessionContext { options: ListingOptions, provided_schema: Option, ) -> Result<()> { - let (object_store, path) = self.object_store(uri)?; + let (object_store, path) = self.runtime_env().object_store(uri)?; let resolved_schema = match provided_schema { None => { options @@ -475,7 +458,7 @@ impl SessionContext { options: CsvReadOptions<'_>, ) -> Result<()> { let listing_options = - options.to_listing_options(self.state.lock().config.target_partitions); + options.to_listing_options(self.copied_config().target_partitions); self.register_listing_table( name, @@ -492,8 +475,8 @@ impl SessionContext { /// executed against this context. pub async fn register_parquet(&mut self, name: &str, uri: &str) -> Result<()> { let (target_partitions, enable_pruning) = { - let m = self.state.lock(); - (m.config.target_partitions, m.config.parquet_pruning) + let conf = self.copied_config(); + (conf.target_partitions, conf.parquet_pruning) }; let file_format = ParquetFormat::default().with_enable_pruning(enable_pruning); @@ -519,7 +502,7 @@ impl SessionContext { options: AvroReadOptions<'_>, ) -> Result<()> { let listing_options = - options.to_listing_options(self.state.lock().config.target_partitions); + options.to_listing_options(self.copied_config().target_partitions); self.register_listing_table(name, uri, listing_options, options.schema) .await?; @@ -538,9 +521,9 @@ impl SessionContext { catalog: Arc, ) -> Option> { let name = name.into(); - + let information_schema = self.copied_config().information_schema; let state = self.state.lock(); - let catalog = if state.config.information_schema { + let catalog = if information_schema { Arc::new(CatalogWithInformationSchema::new( Arc::downgrade(&state.catalog_list), catalog, @@ -557,35 +540,6 @@ impl SessionContext { self.state.lock().catalog_list.catalog(name) } - /// Registers a object store with scheme using a custom `ObjectStore` so that - /// an external file system or object storage system could be used against this context. - /// - /// Returns the `ObjectStore` previously registered for this scheme, if any - pub fn register_object_store( - &self, - scheme: impl Into, - object_store: Arc, - ) -> Option> { - let scheme = scheme.into(); - - self.state - .lock() - .object_store_registry - .register_store(scheme, object_store) - } - - /// Retrieves a `ObjectStore` instance by scheme - pub fn object_store<'a>( - &self, - uri: &'a str, - ) -> Result<(Arc, &'a str)> { - self.state - .lock() - .object_store_registry - .get_by_uri(uri) - .map_err(DataFusionError::from) - } - /// Registers a table using a custom `TableProvider` so that /// it can be referenced from SQL statements executed against this /// context. @@ -618,6 +572,20 @@ impl SessionContext { .deregister_table(table_ref.table()) } + /// Check whether the given table exists in the schema provider or not + /// Returns true if the table exists. + pub fn table_exist<'a>( + &'a self, + table_ref: impl Into>, + ) -> Result { + let table_ref = table_ref.into(); + Ok(self + .state + .lock() + .schema_for_ref(table_ref)? + .table_exist(table_ref.table())) + } + /// Retrieves a DataFrame representing a table previously registered by calling the /// register_table function. /// @@ -667,26 +635,7 @@ impl SessionContext { /// Optimizes the logical plan by applying optimizer rules. pub fn optimize(&self, plan: &LogicalPlan) -> Result { - if let LogicalPlan::Explain(e) = plan { - let mut stringified_plans = e.stringified_plans.clone(); - - // optimize the child plan, capturing the output of each optimizer - let plan = - self.optimize_internal(e.plan.as_ref(), |optimized_plan, optimizer| { - let optimizer_name = optimizer.name().to_string(); - let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; - stringified_plans.push(optimized_plan.to_stringified(plan_type)); - })?; - - Ok(LogicalPlan::Explain(Explain { - verbose: e.verbose, - plan: Arc::new(plan), - stringified_plans, - schema: e.schema.clone(), - })) - } else { - self.optimize_internal(plan, |_, _| {}) - } + self.state.lock().optimize(plan) } /// Creates a physical plan from a logical plan. @@ -694,7 +643,7 @@ impl SessionContext { &self, logical_plan: &LogicalPlan, ) -> Result> { - let (state, planner) = { + let state_cloned = { let mut state = self.state.lock(); state.execution_props.start_execution(); @@ -707,10 +656,10 @@ impl SessionContext { // original state after it has been cloned, they will not be picked up by the // clone but that is okay, as it is equivalent to postponing the state update // by keeping the lock until the end of the function scope. - (state.clone(), Arc::clone(&state.config.query_planner)) + state.clone() }; - planner.create_physical_plan(logical_plan, &state).await + state_cloned.create_physical_plan(logical_plan).await } /// Executes a query and writes the results to a partitioned CSV file. @@ -719,7 +668,8 @@ impl SessionContext { plan: Arc, path: impl AsRef, ) -> Result<()> { - plan_to_csv(self, plan, path).await + let state = self.state.lock().clone(); + plan_to_csv(&state, plan, path).await } /// Executes a query and writes the results to a partitioned Parquet file. @@ -729,35 +679,8 @@ impl SessionContext { path: impl AsRef, writer_properties: Option, ) -> Result<()> { - plan_to_parquet(self, plan, path, writer_properties).await - } - - /// Optimizes the logical plan by applying optimizer rules, and - /// invoking observer function after each call - fn optimize_internal( - &self, - plan: &LogicalPlan, - mut observer: F, - ) -> Result - where - F: FnMut(&LogicalPlan, &dyn OptimizerRule), - { - let state = &mut self.state.lock(); - let execution_props = &mut state.execution_props.clone(); - let optimizers = &state.config.optimizers; - - let execution_props = execution_props.start_execution(); - - let mut new_plan = plan.clone(); - debug!("Input logical plan:\n{}\n", plan.display_indent()); - trace!("Full input logical plan:\n{:?}", plan); - for optimizer in optimizers { - new_plan = optimizer.optimize(&new_plan, execution_props)?; - observer(&new_plan, optimizer.as_ref()); - } - debug!("Optimized logical plan:\n{}\n", new_plan.display_indent()); - trace!("Full Optimized logical plan:\n {:?}", plan); - Ok(new_plan) + let state = self.state.lock().clone(); + plan_to_parquet(&state, plan, path, writer_properties).await } /// Get a new TaskContext to run in this session @@ -766,13 +689,6 @@ impl SessionContext { } } -impl From>> for SessionContext { - fn from(state: Arc>) -> Self { - let session_id = state.lock().session_id.clone(); - SessionContext { session_id, state } - } -} - impl FunctionRegistry for SessionContext { fn udfs(&self) -> HashSet { self.state.lock().udfs() @@ -816,17 +732,28 @@ impl QueryPlanner for DefaultQueryPlanner { } } -/// Configuration options for execution context +/// Session Configuration entry name +pub const BATCH_SIZE: &str = "batch_size"; +/// Session Configuration entry name +pub const TARGET_PARTITIONS: &str = "target_partitions"; +/// Session Configuration entry name +pub const REPARTITION_JOINS: &str = "repartition_joins"; +/// Session Configuration entry name +pub const REPARTITION_AGGREGATIONS: &str = "repartition_aggregations"; +/// Session Configuration entry name +pub const REPARTITION_WINDOWS: &str = "repartition_windows"; +/// Session Configuration entry name +pub const PARQUET_PRUNING: &str = "parquet_pruning"; + +/// Configuration options for session context #[derive(Clone)] pub struct SessionConfig { + /// Default batch size while creating new batches, it's especially useful + /// for buffer-in-memory batches since creating tiny batches would results + /// in too much metadata memory consumption. + pub batch_size: usize, /// Number of partitions for query execution. Increasing partitions can increase concurrency. pub target_partitions: usize, - /// Responsible for optimizing a logical plan - optimizers: Vec>, - /// Responsible for optimizing a physical execution plan - pub physical_optimizers: Vec>, - /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan` - query_planner: Arc, /// Default catalog name for table resolution default_catalog: String, /// Default schema name for table resolution @@ -846,39 +773,14 @@ pub struct SessionConfig { /// parallel using the provided `target_partitions` level pub repartition_windows: bool, /// Should DataFusion parquet reader using the predicate to prune data - parquet_pruning: bool, - /// Runtime configurations such as memory threshold and local disk for spill - pub runtime: RuntimeConfig, + pub parquet_pruning: bool, } impl Default for SessionConfig { fn default() -> Self { Self { + batch_size: 8192, target_partitions: num_cpus::get(), - optimizers: vec![ - // Simplify expressions first to maximize the chance - // of applying other optimizations - Arc::new(SimplifyExpressions::new()), - Arc::new(EliminateFilter::new()), - Arc::new(CommonSubexprEliminate::new()), - Arc::new(EliminateLimit::new()), - Arc::new(ProjectionPushDown::new()), - Arc::new(FilterPushDown::new()), - Arc::new(LimitPushDown::new()), - Arc::new(SingleDistinctToGroupBy::new()), - // ToApproxPerc must be applied last because - // it rewrites only the function and may interfere with - // other rules - Arc::new(ToApproxPerc::new()), - ], - physical_optimizers: vec![ - Arc::new(AggregateStatistics::new()), - Arc::new(HashBuildProbeOrder::new()), - Arc::new(CoalesceBatches::new()), - Arc::new(Repartition::new()), - Arc::new(AddCoalescePartitionsExec::new()), - ], - query_planner: Arc::new(DefaultQueryPlanner {}), default_catalog: "datafusion".to_owned(), default_schema: "public".to_owned(), create_default_catalog_and_schema: true, @@ -887,7 +789,6 @@ impl Default for SessionConfig { repartition_aggregations: true, repartition_windows: true, parquet_pruning: true, - runtime: RuntimeConfig::default(), } } } @@ -898,64 +799,19 @@ impl SessionConfig { Default::default() } - /// Customize target_partitions - pub fn with_target_partitions(mut self, n: usize) -> Self { - // partition count must be greater than zero - assert!(n > 0); - self.target_partitions = n; - self - } - /// Customize batch size pub fn with_batch_size(mut self, n: usize) -> Self { // batch size must be greater than zero assert!(n > 0); - self.runtime.batch_size = n; - self - } - - /// Replace the default query planner - pub fn with_query_planner( - mut self, - query_planner: Arc, - ) -> Self { - self.query_planner = query_planner; - self - } - - /// Replace the optimizer rules - pub fn with_optimizer_rules( - mut self, - optimizers: Vec>, - ) -> Self { - self.optimizers = optimizers; + self.batch_size = n; self } - /// Replace the physical optimizer rules - pub fn with_physical_optimizer_rules( - mut self, - physical_optimizers: Vec>, - ) -> Self { - self.physical_optimizers = physical_optimizers; - self - } - - /// Adds a new [`OptimizerRule`] - pub fn add_optimizer_rule( - mut self, - optimizer_rule: Arc, - ) -> Self { - self.optimizers.push(optimizer_rule); - self - } - - /// Adds a new [`PhysicalOptimizerRule`] - pub fn add_physical_optimizer_rule( - mut self, - optimizer_rule: Arc, - ) -> Self { - self.physical_optimizers.push(optimizer_rule); + /// Customize target_partitions + pub fn with_target_partitions(mut self, n: usize) -> Self { + // partition count must be greater than zero + assert!(n > 0); + self.target_partitions = n; self } @@ -1005,54 +861,6 @@ impl SessionConfig { self.parquet_pruning = enabled; self } - - /// Customize runtime config - pub fn with_runtime_config(mut self, config: RuntimeConfig) -> Self { - self.runtime = config; - self - } - - /// Use an an existing [MemoryManager] - pub fn with_existing_memory_manager(mut self, existing: Arc) -> Self { - self.runtime = self - .runtime - .with_memory_manager(MemoryManagerConfig::new_existing(existing)); - self - } - - /// Specify the total memory to use while running the DataFusion - /// plan to `max_memory * memory_fraction` in bytes. - /// - /// Note DataFusion does not yet respect this limit in all cases. - pub fn with_memory_limit( - mut self, - max_memory: usize, - memory_fraction: f64, - ) -> Result { - self.runtime = - self.runtime - .with_memory_manager(MemoryManagerConfig::try_new_limit( - max_memory, - memory_fraction, - )?); - Ok(self) - } - - /// Use an an existing [DiskManager] - pub fn with_existing_disk_manager(mut self, existing: Arc) -> Self { - self.runtime = self - .runtime - .with_disk_manager(DiskManagerConfig::new_existing(existing)); - self - } - - /// Use the specified path to create any needed temporary files - pub fn with_temp_file_path(mut self, path: impl Into) -> Self { - self.runtime = self - .runtime - .with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()])); - self - } } /// Holds per-execution properties and data (such as starting timestamps, etc). @@ -1122,41 +930,84 @@ impl ExecutionProps { pub struct SessionState { /// Uuid for the session session_id: String, + /// Responsible for optimizing a logical plan + pub optimizers: Vec>, + /// Responsible for optimizing a physical execution plan + pub physical_optimizers: Vec>, + /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan` + pub query_planner: Arc, /// Collection of catalogs containing schemas and ultimately TableProviders pub catalog_list: Arc, /// Scalar functions that are registered with the context pub scalar_functions: HashMap>, /// Aggregate functions registered in the context pub aggregate_functions: HashMap>, - /// Context configuration + /// Session configuration pub config: SessionConfig, /// Execution properties pub execution_props: ExecutionProps, - /// Object Store that are registered with the context - pub object_store_registry: Arc, /// Runtime environment pub runtime_env: Arc, } -impl Default for SessionState { - fn default() -> Self { - Self::new() - } -} - impl SessionState { - /// Returns new SessionState - pub fn new() -> Self { + /// Returns new SessionState using the provided configuration and runtime + pub fn with_config(config: SessionConfig, runtime: Arc) -> Self { let session_id = Uuid::new_v4().to_string(); + + let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc; + if config.create_default_catalog_and_schema { + let default_catalog = MemoryCatalogProvider::new(); + + default_catalog.register_schema( + config.default_schema.clone(), + Arc::new(MemorySchemaProvider::new()), + ); + + let default_catalog: Arc = if config.information_schema { + Arc::new(CatalogWithInformationSchema::new( + Arc::downgrade(&catalog_list), + Arc::new(default_catalog), + )) + } else { + Arc::new(default_catalog) + }; + catalog_list + .register_catalog(config.default_catalog.clone(), default_catalog); + } + SessionState { session_id, - catalog_list: Arc::new(MemoryCatalogList::new()), + optimizers: vec![ + // Simplify expressions first to maximize the chance + // of applying other optimizations + Arc::new(SimplifyExpressions::new()), + Arc::new(EliminateFilter::new()), + Arc::new(CommonSubexprEliminate::new()), + Arc::new(EliminateLimit::new()), + Arc::new(ProjectionPushDown::new()), + Arc::new(FilterPushDown::new()), + Arc::new(LimitPushDown::new()), + Arc::new(SingleDistinctToGroupBy::new()), + // ToApproxPerc must be applied last because + // it rewrites only the function and may interfere with + // other rules + Arc::new(ToApproxPerc::new()), + ], + physical_optimizers: vec![ + Arc::new(AggregateStatistics::new()), + Arc::new(HashBuildProbeOrder::new()), + Arc::new(CoalesceBatches::new()), + Arc::new(Repartition::new()), + Arc::new(AddCoalescePartitionsExec::new()), + ], + query_planner: Arc::new(DefaultQueryPlanner {}), + catalog_list, scalar_functions: HashMap::new(), aggregate_functions: HashMap::new(), - config: SessionConfig::new(), + config, execution_props: ExecutionProps::new(), - object_store_registry: Arc::new(ObjectStoreRegistry::new()), - runtime_env: Arc::new(RuntimeEnv::default()), + runtime_env: runtime, } } @@ -1173,7 +1024,7 @@ impl SessionState { &'a self, table_ref: impl Into>, ) -> Result> { - let resolved_ref = self.resolve_table_ref(table_ref.into()); + let resolved_ref = self.resolve_table_ref(table_ref); self.catalog_list .catalog(resolved_ref.catalog) @@ -1191,6 +1042,111 @@ impl SessionState { )) }) } + + /// Replace the default query planner + pub fn with_query_planner( + mut self, + query_planner: Arc, + ) -> Self { + self.query_planner = query_planner; + self + } + + /// Replace the optimizer rules + pub fn with_optimizer_rules( + mut self, + optimizers: Vec>, + ) -> Self { + self.optimizers = optimizers; + self + } + + /// Replace the physical optimizer rules + pub fn with_physical_optimizer_rules( + mut self, + physical_optimizers: Vec>, + ) -> Self { + self.physical_optimizers = physical_optimizers; + self + } + + /// Adds a new [`OptimizerRule`] + pub fn add_optimizer_rule( + mut self, + optimizer_rule: Arc, + ) -> Self { + self.optimizers.push(optimizer_rule); + self + } + + /// Adds a new [`PhysicalOptimizerRule`] + pub fn add_physical_optimizer_rule( + mut self, + optimizer_rule: Arc, + ) -> Self { + self.physical_optimizers.push(optimizer_rule); + self + } + + /// Optimizes the logical plan by applying optimizer rules. + pub fn optimize(&self, plan: &LogicalPlan) -> Result { + if let LogicalPlan::Explain(e) = plan { + let mut stringified_plans = e.stringified_plans.clone(); + + // optimize the child plan, capturing the output of each optimizer + let plan = + self.optimize_internal(e.plan.as_ref(), |optimized_plan, optimizer| { + let optimizer_name = optimizer.name().to_string(); + let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; + stringified_plans.push(optimized_plan.to_stringified(plan_type)); + })?; + + Ok(LogicalPlan::Explain(Explain { + verbose: e.verbose, + plan: Arc::new(plan), + stringified_plans, + schema: e.schema.clone(), + })) + } else { + self.optimize_internal(plan, |_, _| {}) + } + } + + /// Optimizes the logical plan by applying optimizer rules, and + /// invoking observer function after each call + fn optimize_internal( + &self, + plan: &LogicalPlan, + mut observer: F, + ) -> Result + where + F: FnMut(&LogicalPlan, &dyn OptimizerRule), + { + let execution_props = &mut self.execution_props.clone(); + let optimizers = &self.optimizers; + + let execution_props = execution_props.start_execution(); + + let mut new_plan = plan.clone(); + debug!("Input logical plan:\n{}\n", plan.display_indent()); + trace!("Full input logical plan:\n{:?}", plan); + for optimizer in optimizers { + new_plan = optimizer.optimize(&new_plan, execution_props)?; + observer(&new_plan, optimizer.as_ref()); + } + debug!("Optimized logical plan:\n{}\n", new_plan.display_indent()); + trace!("Full Optimized logical plan:\n {:?}", plan); + Ok(new_plan) + } + + /// Creates a physical plan from a logical plan. + pub async fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + ) -> Result> { + let planner = self.query_planner.clone(); + planner.create_physical_plan(logical_plan, self).await + } } impl ContextProvider for SessionState { @@ -1289,14 +1245,45 @@ impl TaskContext { runtime, } } + + /// Return the SessionConfig associated with the Task + pub fn session_config(&self) -> SessionConfig { + let task_settings = &self.task_settings; + match task_settings { + TaskProperties::KVPairs(props) => { + let session_config = SessionConfig::new(); + session_config + .with_batch_size(props.get(BATCH_SIZE).unwrap().parse().unwrap()) + .with_target_partitions( + props.get(TARGET_PARTITIONS).unwrap().parse().unwrap(), + ) + .with_repartition_joins( + props.get(REPARTITION_JOINS).unwrap().parse().unwrap(), + ) + .with_repartition_aggregations( + props + .get(REPARTITION_AGGREGATIONS) + .unwrap() + .parse() + .unwrap(), + ) + .with_repartition_windows( + props.get(REPARTITION_WINDOWS).unwrap().parse().unwrap(), + ) + .with_parquet_pruning( + props.get(PARQUET_PRUNING).unwrap().parse().unwrap(), + ) + } + TaskProperties::SessionConfig(session_config) => session_config.clone(), + } + } } /// Create a new task context instance from SessionContext impl From<&SessionContext> for TaskContext { fn from(session: &SessionContext) -> Self { - let state_clone = session.state.lock().clone(); let session_id = session.session_id.clone(); - let config = state_clone.config; + let config = session.state.lock().config.clone(); let runtime = session.runtime_env(); Self { task_id: None, @@ -1362,11 +1349,9 @@ mod tests { // configure with same memory / disk manager let memory_manager = ctx1.runtime_env().memory_manager.clone(); let disk_manager = ctx1.runtime_env().disk_manager.clone(); - let config = SessionConfig::new() - .with_existing_memory_manager(memory_manager.clone()) - .with_existing_disk_manager(disk_manager.clone()); - let ctx2 = SessionContext::with_config(config); + let ctx2 = + SessionContext::with_config_rt(SessionConfig::new(), ctx1.runtime_env()); assert!(std::ptr::eq( Arc::as_ptr(&memory_manager), @@ -2864,9 +2849,10 @@ mod tests { #[tokio::test] async fn custom_query_planner() -> Result<()> { - let mut ctx = SessionContext::with_config( - SessionConfig::new().with_query_planner(Arc::new(MyQueryPlanner {})), - ); + let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()); + let session_state = SessionState::with_config(SessionConfig::new(), runtime) + .with_query_planner(Arc::new(MyQueryPlanner {})); + let mut ctx = SessionContext::with_state(session_state); let df = ctx.sql("SELECT 1").await?; df.collect().await.expect_err("query not supported"); diff --git a/datafusion/src/execution/runtime_env.rs b/datafusion/src/execution/runtime_env.rs index e993b385ecd4c..b5925d24f8833 100644 --- a/datafusion/src/execution/runtime_env.rs +++ b/datafusion/src/execution/runtime_env.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -//! Execution runtime environment that tracks memory, disk and various configurations -//! that are used during physical plan execution. +//! Execution runtime environment that holds object Store, memory manager, disk manager +//! and various system level components that are used during physical plan execution. use crate::{ error::Result, @@ -26,19 +26,21 @@ use crate::{ }, }; +use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry}; +use datafusion_common::DataFusionError; use std::fmt::{Debug, Formatter}; +use std::path::PathBuf; use std::sync::Arc; #[derive(Clone)] -/// Execution runtime environment. This structure is passed to the -/// physical plans when they are run. +/// Execution runtime environment. pub struct RuntimeEnv { - /// Default batch size while creating new batches - pub batch_size: usize, /// Runtime memory management pub memory_manager: Arc, /// Manage temporary files during query execution pub disk_manager: Arc, + /// Object Store Registry + pub object_store_registry: Arc, } impl Debug for RuntimeEnv { @@ -51,23 +53,17 @@ impl RuntimeEnv { /// Create env based on configuration pub fn new(config: RuntimeConfig) -> Result { let RuntimeConfig { - batch_size, memory_manager, disk_manager, } = config; Ok(Self { - batch_size, memory_manager: MemoryManager::new(memory_manager), disk_manager: DiskManager::try_new(disk_manager)?, + object_store_registry: Arc::new(ObjectStoreRegistry::new()), }) } - /// Get execution batch size based on config - pub fn batch_size(&self) -> usize { - self.batch_size - } - /// Register the consumer to get it tracked pub fn register_requester(&self, id: &MemoryConsumerId) { self.memory_manager.register_requester(id); @@ -87,6 +83,30 @@ impl RuntimeEnv { pub fn shrink_tracker_usage(&self, delta: usize) { self.memory_manager.shrink_tracker_usage(delta) } + + /// Registers a object store with scheme using a custom `ObjectStore` so that + /// an external file system or object storage system could be used against this context. + /// + /// Returns the `ObjectStore` previously registered for this scheme, if any + pub fn register_object_store( + &self, + scheme: impl Into, + object_store: Arc, + ) -> Option> { + let scheme = scheme.into(); + self.object_store_registry + .register_store(scheme, object_store) + } + + /// Retrieves a `ObjectStore` instance by scheme + pub fn object_store<'a>( + &self, + uri: &'a str, + ) -> Result<(Arc, &'a str)> { + self.object_store_registry + .get_by_uri(uri) + .map_err(DataFusionError::from) + } } impl Default for RuntimeEnv { @@ -95,13 +115,9 @@ impl Default for RuntimeEnv { } } -#[derive(Clone)] +#[derive(Clone, Default)] /// Execution runtime configuration pub struct RuntimeConfig { - /// Default batch size while creating new batches, it's especially useful - /// for buffer-in-memory batches since creating tiny batches would results - /// in too much metadata memory consumption. - pub batch_size: usize, /// DiskManager to manage temporary disk file usage pub disk_manager: DiskManagerConfig, /// MemoryManager to limit access to memory @@ -114,14 +130,6 @@ impl RuntimeConfig { Default::default() } - /// Customize batch size - pub fn with_batch_size(mut self, n: usize) -> Self { - // batch size must be greater than zero - assert!(n > 0); - self.batch_size = n; - self - } - /// Customize disk manager pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self { self.disk_manager = disk_manager; @@ -133,14 +141,19 @@ impl RuntimeConfig { self.memory_manager = memory_manager; self } -} -impl Default for RuntimeConfig { - fn default() -> Self { - Self { - batch_size: 8192, - disk_manager: DiskManagerConfig::default(), - memory_manager: MemoryManagerConfig::default(), - } + /// Specify the total memory to use while running the DataFusion + /// plan to `max_memory * memory_fraction` in bytes. + /// + /// Note DataFusion does not yet respect this limit in all cases. + pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self { + self.with_memory_manager( + MemoryManagerConfig::try_new_limit(max_memory, memory_fraction).unwrap(), + ) + } + + /// Use the specified path to create any needed temporary files + pub fn with_temp_file_path(self, path: impl Into) -> Self { + self.with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()])) } } diff --git a/datafusion/src/physical_optimizer/aggregate_statistics.rs b/datafusion/src/physical_optimizer/aggregate_statistics.rs index 8a7d790b4e1bd..9af053f934fb9 100644 --- a/datafusion/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/src/physical_optimizer/aggregate_statistics.rs @@ -297,7 +297,7 @@ mod tests { ) -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let conf = session_ctx.state.lock().clone().config; + let conf = session_ctx.copied_config(); let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?; let (col, count) = match nulls { diff --git a/datafusion/src/physical_optimizer/coalesce_batches.rs b/datafusion/src/physical_optimizer/coalesce_batches.rs index 47d87d35f0e65..cf76c7d140bda 100644 --- a/datafusion/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/src/physical_optimizer/coalesce_batches.rs @@ -75,7 +75,7 @@ impl PhysicalOptimizerRule for CoalesceBatches { // we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is // implemented. For now, we choose half the configured batch size to avoid copies // when a small number of rows are removed from a batch - let target_batch_size = config.runtime.batch_size / 2; + let target_batch_size = config.batch_size / 2; Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size)) } else { plan.clone() diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index 0f38291e5ee3d..37418b0e976e2 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -18,7 +18,7 @@ //! Execution plan for reading CSV files use crate::error::{DataFusionError, Result}; -use crate::execution::context::{SessionContext, TaskContext}; +use crate::execution::context::{SessionState, TaskContext}; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, @@ -124,7 +124,7 @@ impl ExecutionPlan for CsvExec { partition: usize, context: Arc, ) -> Result { - let batch_size = context.runtime.batch_size(); + let batch_size = context.session_config().batch_size; let file_schema = Arc::clone(&self.base_config.file_schema); let file_projection = self.base_config.file_column_projection_indices(); let has_header = self.has_header; @@ -180,7 +180,7 @@ impl ExecutionPlan for CsvExec { } pub async fn plan_to_csv( - context: &SessionContext, + state: &SessionState, plan: Arc, path: impl AsRef, ) -> Result<()> { @@ -196,7 +196,7 @@ pub async fn plan_to_csv( let path = fs_path.join(&filename); let file = fs::File::create(path)?; let mut writer = csv::Writer::new(file); - let task_ctx = context.task_ctx(); + let task_ctx = Arc::new(TaskContext::from(state)); let stream = plan.execute(i, task_ctx).await?; let handle: JoinHandle> = task::spawn(async move { stream diff --git a/datafusion/src/physical_plan/file_format/json.rs b/datafusion/src/physical_plan/file_format/json.rs index 9c3a2e1ecaff4..52a9e9f99cff5 100644 --- a/datafusion/src/physical_plan/file_format/json.rs +++ b/datafusion/src/physical_plan/file_format/json.rs @@ -99,7 +99,7 @@ impl ExecutionPlan for NdJsonExec { ) -> Result { let proj = self.base_config.projected_file_column_names(); - let batch_size = context.runtime.batch_size(); + let batch_size = context.session_config().batch_size; let file_schema = Arc::clone(&self.base_config.file_schema); // The json reader cannot limit the number of records, so `remaining` is ignored. diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index f532f6a88d277..2c90d5bbaaded 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -27,7 +27,7 @@ use std::{any::Any, convert::TryInto}; use crate::datasource::file_format::parquet::ChunkObjectReader; use crate::datasource::object_store::ObjectStore; use crate::datasource::PartitionedFile; -use crate::execution::context::{SessionContext, TaskContext}; +use crate::execution::context::{SessionState, TaskContext}; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::{ error::{DataFusionError, Result}, @@ -225,7 +225,7 @@ impl ExecutionPlan for ParquetExec { None => (0..self.base_config.file_schema.fields().len()).collect(), }; let pruning_predicate = self.pruning_predicate.clone(); - let batch_size = context.runtime.batch_size(); + let batch_size = context.session_config().batch_size; let limit = self.base_config.limit; let object_store = Arc::clone(&self.base_config.object_store); let partition_col_proj = PartitionColumnProjector::new( @@ -527,7 +527,7 @@ fn read_partition( /// Executes a query and writes the results to a partitioned Parquet file. pub async fn plan_to_parquet( - context: &SessionContext, + state: &SessionState, plan: Arc, path: impl AsRef, writer_properties: Option, @@ -548,7 +548,7 @@ pub async fn plan_to_parquet( plan.schema(), writer_properties.clone(), )?; - let task_ctx = context.task_ctx(); + let task_ctx = Arc::new(TaskContext::from(state)); let stream = plan.execute(i, task_ctx).await?; let handle: JoinHandle> = task::spawn(async move { stream @@ -588,7 +588,7 @@ mod tests { use super::*; use crate::execution::options::CsvReadOptions; - use crate::prelude::SessionConfig; + use crate::prelude::{SessionConfig, SessionContext}; use arrow::array::Float32Array; use arrow::{ array::{Int64Array, Int8Array, StringArray}, diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 37b303469b8ea..fcd88ce2ca7b4 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -1399,7 +1399,7 @@ impl DefaultPhysicalPlanner { where F: FnMut(&dyn ExecutionPlan, &dyn PhysicalOptimizerRule), { - let optimizers = &session_state.config.physical_optimizers; + let optimizers = &session_state.physical_optimizers; debug!( "Input physical plan:\n{}\n", displayable(plan.as_ref()).indent() @@ -1435,10 +1435,12 @@ mod tests { use crate::datasource::object_store::local::LocalFileSystem; use crate::execution::context::TaskContext; use crate::execution::options::CsvReadOptions; + use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use crate::logical_plan::plan::Extension; use crate::physical_plan::{ expressions, DisplayFormatType, Partitioning, Statistics, }; + use crate::prelude::SessionConfig; use crate::scalar::ScalarValue; use crate::{ logical_plan::LogicalPlanBuilder, physical_plan::SendableRecordBatchStream, @@ -1454,7 +1456,8 @@ mod tests { use std::{any::Any, fmt}; fn make_session_state() -> SessionState { - SessionState::new() + let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()); + SessionState::with_config(SessionConfig::new(), runtime) } async fn plan(logical_plan: &LogicalPlan) -> Result> { diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs index b2bf604665a08..84a07695a813b 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ b/datafusion/src/physical_plan/sorts/sort.rs @@ -37,6 +37,7 @@ use crate::physical_plan::{ common, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; +use crate::prelude::SessionConfig; use arrow::array::ArrayRef; pub use arrow::compute::SortOptions; use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions}; @@ -75,6 +76,7 @@ struct ExternalSorter { spills: Mutex>, /// Sort expressions expr: Vec, + session_config: Arc, runtime: Arc, metrics_set: CompositeMetricsSet, metrics: BaselineMetrics, @@ -86,6 +88,7 @@ impl ExternalSorter { schema: SchemaRef, expr: Vec, metrics_set: CompositeMetricsSet, + session_config: Arc, runtime: Arc, ) -> Self { let metrics = metrics_set.new_intermediate_baseline(partition_id); @@ -95,6 +98,7 @@ impl ExternalSorter { in_mem_batches: Mutex::new(vec![]), spills: Mutex::new(vec![]), expr, + session_config, runtime, metrics_set, metrics, @@ -152,7 +156,7 @@ impl ExternalSorter { self.schema.clone(), &self.expr, tracking_metrics, - self.runtime.clone(), + self.session_config.batch_size, ))) } else if in_mem_batches.len() > 0 { let tracking_metrics = self @@ -577,6 +581,7 @@ async fn do_sort( schema.clone(), expr, metrics_set, + Arc::new(context.session_config()), context.runtime.clone(), ); context.runtime.register_requester(sorter.id()); @@ -592,6 +597,7 @@ mod tests { use super::*; use crate::datasource::object_store::local::LocalFileSystem; use crate::execution::context::SessionConfig; + use crate::execution::runtime_env::RuntimeConfig; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::expressions::col; use crate::physical_plan::memory::MemoryExec; @@ -678,8 +684,9 @@ mod tests { #[tokio::test] async fn test_sort_spill() -> Result<()> { // trigger spill there will be 4 batches with 5.5KB for each - let config = SessionConfig::new().with_memory_limit(12288, 1.0)?; - let session_ctx = SessionContext::with_config(config); + let config = RuntimeConfig::new().with_memory_limit(12288, 1.0); + let runtime = Arc::new(RuntimeEnv::new(config)?); + let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime); let schema = test_util::aggr_test_schema(); let partitions = 4; diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs index b04af04ba5bfd..5082b9f7ffc15 100644 --- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs @@ -43,7 +43,6 @@ use futures::{Stream, StreamExt}; use crate::error::{DataFusionError, Result}; use crate::execution::context::TaskContext; -use crate::execution::runtime_env::RuntimeEnv; use crate::physical_plan::sorts::{RowIndex, SortKeyCursor, SortedStream, StreamWrapper}; use crate::physical_plan::{ common::spawn_execution, expressions::PhysicalSortExpr, DisplayFormatType, @@ -201,7 +200,7 @@ impl ExecutionPlan for SortPreservingMergeExec { self.schema(), &self.expr, tracking_metrics, - context.runtime.clone(), + context.session_config().batch_size, ))) } } @@ -303,7 +302,7 @@ impl SortPreservingMergeStream { schema: SchemaRef, expressions: &[PhysicalSortExpr], tracking_metrics: MemTrackingMetrics, - runtime: Arc, + batch_size: usize, ) -> Self { let stream_count = receivers.len(); let batches = (0..stream_count) @@ -325,7 +324,7 @@ impl SortPreservingMergeStream { in_progress: vec![], next_batch_id: 0, min_heap: BinaryHeap::with_capacity(stream_count), - batch_size: runtime.batch_size(), + batch_size, } } @@ -334,7 +333,7 @@ impl SortPreservingMergeStream { schema: SchemaRef, expressions: &[PhysicalSortExpr], tracking_metrics: MemTrackingMetrics, - runtime: Arc, + batch_size: usize, ) -> Self { let stream_count = streams.len(); let batches = (0..stream_count) @@ -360,7 +359,7 @@ impl SortPreservingMergeStream { in_progress: vec![], next_batch_id: 0, min_heap: BinaryHeap::with_capacity(stream_count), - batch_size: runtime.batch_size(), + batch_size, } } @@ -1228,7 +1227,7 @@ mod tests { batches.schema(), sort.as_slice(), tracking_metrics, - task_ctx.runtime.clone(), + task_ctx.session_config().batch_size, ); let mut merged = common::collect(Box::pin(merge_stream)).await.unwrap(); diff --git a/datafusion/tests/order_spill_fuzz.rs b/datafusion/tests/order_spill_fuzz.rs index 53dadb7f17438..c052382d5eac7 100644 --- a/datafusion/tests/order_spill_fuzz.rs +++ b/datafusion/tests/order_spill_fuzz.rs @@ -23,7 +23,7 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion::execution::memory_manager::MemoryManagerConfig; -use datafusion::execution::runtime_env::RuntimeConfig; +use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::physical_plan::expressions::{col, PhysicalSortExpr}; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::sorts::sort::SortExec; @@ -78,8 +78,8 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { let runtime_config = RuntimeConfig::new().with_memory_manager( MemoryManagerConfig::try_new_limit(pool_size, 1.0).unwrap(), ); - let session_config = SessionConfig::new().with_runtime_config(runtime_config); - let session_ctx = SessionContext::with_config(session_config); + let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap()); + let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime); let task_ctx = session_ctx.task_ctx(); let collected = collect(sort.clone(), task_ctx).await.unwrap(); diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index 37c47969fbca9..30b422925938e 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -87,6 +87,7 @@ use std::{any::Any, collections::BTreeMap, fmt, sync::Arc}; use async_trait::async_trait; use datafusion::execution::context::{ExecutionProps, TaskContext}; +use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::logical_plan::plan::{Extension, Sort}; use datafusion::logical_plan::{DFSchemaRef, Limit}; @@ -242,12 +243,12 @@ async fn topk_plan() -> Result<()> { } fn make_topk_context() -> SessionContext { - let config = SessionConfig::new() + let config = SessionConfig::new().with_target_partitions(48); + let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()); + let state = SessionState::with_config(config, runtime) .with_query_planner(Arc::new(TopKQueryPlanner {})) - .with_target_partitions(48) .add_optimizer_rule(Arc::new(TopKOptimizerRule {})); - - SessionContext::with_config(config) + SessionContext::with_state(state) } // ------ The implementation of the TopK code follows ----- From a5dd501e2a549d80d22172b2aa0666918f867e1c Mon Sep 17 00:00:00 2001 From: Wang Date: Thu, 17 Mar 2022 19:00:07 +0800 Subject: [PATCH 2/4] fix UT --- datafusion/src/physical_plan/file_format/avro.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/file_format/avro.rs b/datafusion/src/physical_plan/file_format/avro.rs index 89cae4e143e5a..3ee23a95af673 100644 --- a/datafusion/src/physical_plan/file_format/avro.rs +++ b/datafusion/src/physical_plan/file_format/avro.rs @@ -120,7 +120,7 @@ impl ExecutionPlan for AvroExec { ) -> Result { let proj = self.base_config.projected_file_column_names(); - let batch_size = context.runtime.batch_size(); + let batch_size = context.session_config().batch_size; let file_schema = Arc::clone(&self.base_config.file_schema); // The avro reader cannot limit the number of records, so `remaining` is ignored. From b9a82584b54ffc1f12a4df52d7312aa3768cab70 Mon Sep 17 00:00:00 2001 From: Wang Date: Mon, 21 Mar 2022 14:39:51 +0800 Subject: [PATCH 3/4] Resolve review comments --- .../scheduler/src/scheduler_server/mod.rs | 2 +- datafusion/benches/sort_limit_query_sql.rs | 2 +- datafusion/src/dataframe.rs | 22 +++--- datafusion/src/execution/context.rs | 78 +++++++++---------- 4 files changed, 52 insertions(+), 52 deletions(-) diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs index 2228b768bb4d6..20443c20aab31 100644 --- a/ballista/rust/scheduler/src/scheduler_server/mod.rs +++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs @@ -172,7 +172,7 @@ pub fn update_datafusion_context( session_ctx: Arc, config: &BallistaConfig, ) -> Arc { - session_ctx.state.lock().config.target_partitions = + session_ctx.state.write().config.target_partitions = config.default_shuffle_partitions(); session_ctx } diff --git a/datafusion/benches/sort_limit_query_sql.rs b/datafusion/benches/sort_limit_query_sql.rs index 097191e0f02b1..7860ebc450be8 100644 --- a/datafusion/benches/sort_limit_query_sql.rs +++ b/datafusion/benches/sort_limit_query_sql.rs @@ -84,7 +84,7 @@ fn create_context() -> Arc> { rt.block_on(async { // create local session context let mut ctx = SessionContext::new(); - ctx.state.lock().config.target_partitions = 1; + ctx.state.write().config.target_partitions = 1; let task_ctx = ctx.task_ctx(); let mem_table = MemTable::load(Arc::new(csv.await), Some(partitions), task_ctx) diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs index 36076c06d25c1..ae2cc765194ab 100644 --- a/datafusion/src/dataframe.rs +++ b/datafusion/src/dataframe.rs @@ -40,7 +40,7 @@ use crate::physical_plan::{collect, collect_partitioned}; use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan}; use crate::scalar::ScalarValue; use crate::sql::utils::find_window_exprs; -use parking_lot::Mutex; +use parking_lot::RwLock; use std::any::Any; /// DataFrame represents a logical set of rows with the same named columns. @@ -69,13 +69,13 @@ use std::any::Any; /// # } /// ``` pub struct DataFrame { - session_state: Arc>, + session_state: Arc>, plan: LogicalPlan, } impl DataFrame { /// Create a new Table based on an existing logical plan - pub fn new(session_state: Arc>, plan: &LogicalPlan) -> Self { + pub fn new(session_state: Arc>, plan: &LogicalPlan) -> Self { Self { session_state, plan: plan.clone(), @@ -84,7 +84,7 @@ impl DataFrame { /// Create a physical plan pub async fn create_physical_plan(&self) -> Result> { - let state = self.session_state.lock().clone(); + let state = self.session_state.read().clone(); let optimized_plan = state.optimize(&self.plan)?; state.create_physical_plan(&optimized_plan).await } @@ -350,7 +350,7 @@ impl DataFrame { /// ``` pub async fn collect(&self) -> Result> { let plan = self.create_physical_plan().await?; - let task_ctx = Arc::new(TaskContext::from(&self.session_state.lock().clone())); + let task_ctx = Arc::new(TaskContext::from(&self.session_state.read().clone())); Ok(collect(plan, task_ctx).await?) } @@ -405,7 +405,7 @@ impl DataFrame { /// ``` pub async fn execute_stream(&self) -> Result { let plan = self.create_physical_plan().await?; - let task_ctx = Arc::new(TaskContext::from(&self.session_state.lock().clone())); + let task_ctx = Arc::new(TaskContext::from(&self.session_state.read().clone())); execute_stream(plan, task_ctx).await } @@ -425,7 +425,7 @@ impl DataFrame { /// ``` pub async fn collect_partitioned(&self) -> Result>> { let plan = self.create_physical_plan().await?; - let task_ctx = Arc::new(TaskContext::from(&self.session_state.lock().clone())); + let task_ctx = Arc::new(TaskContext::from(&self.session_state.read().clone())); Ok(collect_partitioned(plan, task_ctx).await?) } @@ -446,7 +446,7 @@ impl DataFrame { &self, ) -> Result> { let plan = self.create_physical_plan().await?; - let task_ctx = Arc::new(TaskContext::from(&self.session_state.lock().clone())); + let task_ctx = Arc::new(TaskContext::from(&self.session_state.read().clone())); Ok(execute_stream_partitioned(plan, task_ctx).await?) } @@ -510,7 +510,7 @@ impl DataFrame { /// # } /// ``` pub fn registry(&self) -> Arc { - let registry = self.session_state.lock().clone(); + let registry = self.session_state.read().clone(); Arc::new(registry) } @@ -562,7 +562,7 @@ impl DataFrame { /// Write a `DataFrame` to a CSV file. pub async fn write_csv(&self, path: &str) -> Result<()> { let plan = self.create_physical_plan().await?; - let state = self.session_state.lock().clone(); + let state = self.session_state.read().clone(); plan_to_csv(&state, plan, path).await } @@ -573,7 +573,7 @@ impl DataFrame { writer_properties: Option, ) -> Result<()> { let plan = self.create_physical_plan().await?; - let state = self.session_state.lock().clone(); + let state = self.session_state.read().clone(); plan_to_parquet(&state, plan, path, writer_properties).await } } diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 12cda260b989b..15b445b8fda00 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -40,7 +40,7 @@ use crate::{ }, }; use log::{debug, trace}; -use parking_lot::Mutex; +use parking_lot::RwLock; use std::collections::{HashMap, HashSet}; use std::string::String; use std::sync::Arc; @@ -141,7 +141,7 @@ pub struct SessionContext { /// Session start time pub session_start_time: DateTime, /// Shared session state for the session - pub state: Arc>, + pub state: Arc>, } impl Default for SessionContext { @@ -168,7 +168,7 @@ impl SessionContext { Self { session_id: state.session_id.clone(), session_start_time: chrono::Utc::now(), - state: Arc::new(Mutex::new(state)), + state: Arc::new(RwLock::new(state)), } } @@ -177,18 +177,18 @@ impl SessionContext { Self { session_id: state.session_id.clone(), session_start_time: chrono::Utc::now(), - state: Arc::new(Mutex::new(state)), + state: Arc::new(RwLock::new(state)), } } /// Return the [RuntimeEnv] used to run queries with this [SessionContext] pub fn runtime_env(&self) -> Arc { - self.state.lock().runtime_env.clone() + self.state.read().runtime_env.clone() } /// Return a copied version of config for this Session pub fn copied_config(&self) -> SessionConfig { - self.state.lock().config.clone() + self.state.read().config.clone() } /// Creates a dataframe that will execute a SQL query. @@ -296,7 +296,7 @@ impl SessionContext { } // create a query planner - let state = self.state.lock().clone(); + let state = self.state.read().clone(); let query_planner = SqlToRel::new(&state); query_planner.statement_to_plan(statements.pop_front().unwrap()) } @@ -308,7 +308,7 @@ impl SessionContext { provider: Arc, ) { self.state - .lock() + .write() .execution_props .add_var_provider(variable_type, provider); } @@ -322,7 +322,7 @@ impl SessionContext { /// `SELECT "my_FUNC"(x)` will look for a function named `"my_FUNC"` pub fn register_udf(&mut self, f: ScalarUDF) { self.state - .lock() + .write() .scalar_functions .insert(f.name.clone(), Arc::new(f)); } @@ -336,7 +336,7 @@ impl SessionContext { /// `SELECT "my_UDAF"(x)` will look for an aggregate named `"my_UDAF"` pub fn register_udaf(&mut self, f: AggregateUDF) { self.state - .lock() + .write() .aggregate_functions .insert(f.name.clone(), Arc::new(f)); } @@ -522,7 +522,7 @@ impl SessionContext { ) -> Option> { let name = name.into(); let information_schema = self.copied_config().information_schema; - let state = self.state.lock(); + let state = self.state.read(); let catalog = if information_schema { Arc::new(CatalogWithInformationSchema::new( Arc::downgrade(&state.catalog_list), @@ -537,7 +537,7 @@ impl SessionContext { /// Retrieves a `CatalogProvider` instance by name pub fn catalog(&self, name: &str) -> Option> { - self.state.lock().catalog_list.catalog(name) + self.state.read().catalog_list.catalog(name) } /// Registers a table using a custom `TableProvider` so that @@ -553,7 +553,7 @@ impl SessionContext { ) -> Result>> { let table_ref = table_ref.into(); self.state - .lock() + .read() .schema_for_ref(table_ref)? .register_table(table_ref.table().to_owned(), provider) } @@ -567,7 +567,7 @@ impl SessionContext { ) -> Result>> { let table_ref = table_ref.into(); self.state - .lock() + .read() .schema_for_ref(table_ref)? .deregister_table(table_ref.table()) } @@ -581,7 +581,7 @@ impl SessionContext { let table_ref = table_ref.into(); Ok(self .state - .lock() + .read() .schema_for_ref(table_ref)? .table_exist(table_ref.table())) } @@ -595,7 +595,7 @@ impl SessionContext { table_ref: impl Into>, ) -> Result> { let table_ref = table_ref.into(); - let schema = self.state.lock().schema_for_ref(table_ref)?; + let schema = self.state.read().schema_for_ref(table_ref)?; match schema.table(table_ref.table()) { Some(ref provider) => { let plan = LogicalPlanBuilder::scan( @@ -624,7 +624,7 @@ impl SessionContext { pub fn tables(&self) -> Result> { Ok(self .state - .lock() + .read() // a bare reference will always resolve to the default catalog and schema .schema_for_ref(TableReference::Bare { table: "" })? .table_names() @@ -635,7 +635,7 @@ impl SessionContext { /// Optimizes the logical plan by applying optimizer rules. pub fn optimize(&self, plan: &LogicalPlan) -> Result { - self.state.lock().optimize(plan) + self.state.read().optimize(plan) } /// Creates a physical plan from a logical plan. @@ -644,7 +644,7 @@ impl SessionContext { logical_plan: &LogicalPlan, ) -> Result> { let state_cloned = { - let mut state = self.state.lock(); + let mut state = self.state.write(); state.execution_props.start_execution(); // We need to clone `state` to release the lock that is not `Send`. We could @@ -668,7 +668,7 @@ impl SessionContext { plan: Arc, path: impl AsRef, ) -> Result<()> { - let state = self.state.lock().clone(); + let state = self.state.read().clone(); plan_to_csv(&state, plan, path).await } @@ -679,7 +679,7 @@ impl SessionContext { path: impl AsRef, writer_properties: Option, ) -> Result<()> { - let state = self.state.lock().clone(); + let state = self.state.read().clone(); plan_to_parquet(&state, plan, path, writer_properties).await } @@ -691,15 +691,15 @@ impl SessionContext { impl FunctionRegistry for SessionContext { fn udfs(&self) -> HashSet { - self.state.lock().udfs() + self.state.read().udfs() } fn udf(&self, name: &str) -> Result> { - self.state.lock().udf(name) + self.state.read().udf(name) } fn udaf(&self, name: &str) -> Result> { - self.state.lock().udaf(name) + self.state.read().udaf(name) } } @@ -732,17 +732,17 @@ impl QueryPlanner for DefaultQueryPlanner { } } -/// Session Configuration entry name +/// Session Configuration entry name for 'BATCH_SIZE' pub const BATCH_SIZE: &str = "batch_size"; -/// Session Configuration entry name +/// Session Configuration entry name for 'TARGET_PARTITIONS' pub const TARGET_PARTITIONS: &str = "target_partitions"; -/// Session Configuration entry name +/// Session Configuration entry name for 'REPARTITION_JOINS' pub const REPARTITION_JOINS: &str = "repartition_joins"; -/// Session Configuration entry name +/// Session Configuration entry name for 'REPARTITION_AGGREGATIONS' pub const REPARTITION_AGGREGATIONS: &str = "repartition_aggregations"; -/// Session Configuration entry name +/// Session Configuration entry name for 'REPARTITION_WINDOWS' pub const REPARTITION_WINDOWS: &str = "repartition_windows"; -/// Session Configuration entry name +/// Session Configuration entry name for 'PARQUET_PRUNING' pub const PARQUET_PRUNING: &str = "parquet_pruning"; /// Configuration options for session context @@ -1224,8 +1224,8 @@ pub struct TaskContext { pub session_id: String, /// Optional Task Identify pub task_id: Option, - /// Task settings - pub task_settings: TaskProperties, + /// Task properties + pub properties: TaskProperties, /// Runtime environment associated with this task context pub runtime: Arc, } @@ -1241,15 +1241,15 @@ impl TaskContext { Self { task_id: Some(task_id), session_id, - task_settings: TaskProperties::KVPairs(task_settings), + properties: TaskProperties::KVPairs(task_settings), runtime, } } /// Return the SessionConfig associated with the Task pub fn session_config(&self) -> SessionConfig { - let task_settings = &self.task_settings; - match task_settings { + let task_props = &self.properties; + match task_props { TaskProperties::KVPairs(props) => { let session_config = SessionConfig::new(); session_config @@ -1283,12 +1283,12 @@ impl TaskContext { impl From<&SessionContext> for TaskContext { fn from(session: &SessionContext) -> Self { let session_id = session.session_id.clone(); - let config = session.state.lock().config.clone(); + let config = session.state.read().config.clone(); let runtime = session.runtime_env(); Self { task_id: None, session_id, - task_settings: TaskProperties::SessionConfig(config), + properties: TaskProperties::SessionConfig(config), runtime, } } @@ -1303,7 +1303,7 @@ impl From<&SessionState> for TaskContext { Self { task_id: None, session_id, - task_settings: TaskProperties::SessionConfig(config), + properties: TaskProperties::SessionConfig(config), runtime, } } @@ -2969,7 +2969,7 @@ mod tests { ctx.register_catalog("my_catalog", catalog); let catalog_list_weak = { - let state = ctx.state.lock(); + let state = ctx.state.read(); Arc::downgrade(&state.catalog_list) }; From a2bedc70f2524a62bfb164c6e5ca55865f39f2ad Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 21 Mar 2022 14:54:10 -0400 Subject: [PATCH 4/4] Fix minor merge issues --- ballista/rust/core/src/serde/mod.rs | 1 + datafusion-proto/src/from_proto.rs | 4 ++-- datafusion/src/execution/context.rs | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs index a95bdb5308dd9..9e60c1b64b62a 100644 --- a/ballista/rust/core/src/serde/mod.rs +++ b/ballista/rust/core/src/serde/mod.rs @@ -350,6 +350,7 @@ mod tests { use datafusion::datasource::object_store::local::LocalFileSystem; use datafusion::error::DataFusionError; use datafusion::execution::context::{QueryPlanner, SessionState, TaskContext}; + use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::logical_plan::plan::Extension; use datafusion::logical_plan::{ col, DFSchemaRef, Expr, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode, diff --git a/datafusion-proto/src/from_proto.rs b/datafusion-proto/src/from_proto.rs index 1502f5fe4a772..f351aa85c1f38 100644 --- a/datafusion-proto/src/from_proto.rs +++ b/datafusion-proto/src/from_proto.rs @@ -1168,7 +1168,7 @@ pub fn parse_expr( ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode { fun_name, args }) => { let scalar_fn = ctx .state - .lock() + .read() .get_function_meta(fun_name.as_str()).ok_or_else(|| Error::General(format!("invalid aggregate function message, function {} is not registered in the ExecutionContext", fun_name)))?; Ok(Expr::ScalarUDF { @@ -1182,7 +1182,7 @@ pub fn parse_expr( ExprType::AggregateUdfExpr(protobuf::AggregateUdfExprNode { fun_name, args }) => { let agg_fn = ctx .state - .lock() + .read() .get_aggregate_meta(fun_name.as_str()).ok_or_else(|| Error::General(format!("invalid aggregate function message, function {} is not registered in the ExecutionContext", fun_name)))?; Ok(Expr::AggregateUDF { diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 3bf67f4b0a0f6..b484f2565b60e 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -992,7 +992,7 @@ impl SessionState { let default_catalog = MemoryCatalogProvider::new(); default_catalog.register_schema( - config.default_schema.clone(), + &config.default_schema, Arc::new(MemorySchemaProvider::new()), );