From 35f9e108c93eb6850f5a69c99185fba5f286f146 Mon Sep 17 00:00:00 2001 From: Lin Ma Date: Mon, 10 Jan 2022 23:15:44 -0800 Subject: [PATCH 01/12] initial change --- ballista/rust/core/proto/ballista.proto | 6 +- .../core/src/serde/logical_plan/to_proto.rs | 8 +- ballista/rust/core/src/serde/mod.rs | 2 + datafusion/src/physical_plan/aggregates.rs | 39 +- .../coercion_rule/aggregate_rule.rs | 20 +- .../physical_plan/expressions/covariance.rs | 552 ++++++++++++++++++ .../src/physical_plan/expressions/mod.rs | 5 + datafusion/tests/sql/aggregates.rs | 12 + 8 files changed, 637 insertions(+), 7 deletions(-) create mode 100644 datafusion/src/physical_plan/expressions/covariance.rs diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index aa7b6a9f900fe..c12dbd3dfa959 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -171,8 +171,10 @@ enum AggregateFunction { ARRAY_AGG = 6; VARIANCE=7; VARIANCE_POP=8; - STDDEV=9; - STDDEV_POP=10; + COVARIANCE=9; + COVARIANCE_POP=10; + STDDEV=11; + STDDEV_POP=12; } message AggregateExprNode { diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 01428d9ba7a77..9a28606617546 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -1027,9 +1027,9 @@ impl TryInto for &Expr { AggregateFunction::Avg => protobuf::AggregateFunction::Avg, AggregateFunction::Count => protobuf::AggregateFunction::Count, AggregateFunction::Variance => protobuf::AggregateFunction::Variance, - AggregateFunction::VariancePop => { - protobuf::AggregateFunction::VariancePop - } + AggregateFunction::VariancePop => protobuf::AggregateFunction::VariancePop, + AggregateFunction::Covariance => protobuf::AggregateFunction::Covariance, + AggregateFunction::CovariancePop => protobuf::AggregateFunction::CovariancePop, AggregateFunction::Stddev 
=> protobuf::AggregateFunction::Stddev, AggregateFunction::StddevPop => { protobuf::AggregateFunction::StddevPop @@ -1266,6 +1266,8 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction { AggregateFunction::ArrayAgg => Self::ArrayAgg, AggregateFunction::Variance => Self::Variance, AggregateFunction::VariancePop => Self::VariancePop, + AggregateFunction::Covariance => Self::Covariance, + AggregateFunction::CovariancePop => Self::CovariancePop, AggregateFunction::Stddev => Self::Stddev, AggregateFunction::StddevPop => Self::StddevPop, } diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs index fd3b57b3deda1..83ba3511f9c17 100644 --- a/ballista/rust/core/src/serde/mod.rs +++ b/ballista/rust/core/src/serde/mod.rs @@ -121,6 +121,8 @@ impl From for AggregateFunction { protobuf::AggregateFunction::ArrayAgg => AggregateFunction::ArrayAgg, protobuf::AggregateFunction::Variance => AggregateFunction::Variance, protobuf::AggregateFunction::VariancePop => AggregateFunction::VariancePop, + protobuf::AggregateFunction::Covariance => AggregateFunction::Covariance, + protobuf::AggregateFunction::CovariancePop => AggregateFunction::CovariancePop, protobuf::AggregateFunction::Stddev => AggregateFunction::Stddev, protobuf::AggregateFunction::StddevPop => AggregateFunction::StddevPop, } diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs index 07b0ff8b33b29..5fb631c67aeaf 100644 --- a/datafusion/src/physical_plan/aggregates.rs +++ b/datafusion/src/physical_plan/aggregates.rs @@ -36,7 +36,7 @@ use crate::physical_plan::distinct_expressions; use crate::physical_plan::expressions; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use expressions::{ - avg_return_type, stddev_return_type, sum_return_type, variance_return_type, + avg_return_type, stddev_return_type, sum_return_type, variance_return_type, covariance_return_type, }; use std::{fmt, str::FromStr, sync::Arc}; @@ -74,6 +74,10 @@ 
pub enum AggregateFunction { Stddev, /// Standard Deviation (Population) StddevPop, + /// Covariance (Sample) + Covariance, + /// Covariance (Population) + CovariancePop, } impl fmt::Display for AggregateFunction { @@ -100,6 +104,9 @@ impl FromStr for AggregateFunction { "stddev" => AggregateFunction::Stddev, "stddev_samp" => AggregateFunction::Stddev, "stddev_pop" => AggregateFunction::StddevPop, + "covar" => AggregateFunction::Covariance, + "covar_samp" => AggregateFunction::Covariance, + "covar_pop" => AggregateFunction::CovariancePop, _ => { return Err(DataFusionError::Plan(format!( "There is no built-in function named {}", @@ -134,6 +141,8 @@ pub fn return_type( AggregateFunction::Sum => sum_return_type(&coerced_data_types[0]), AggregateFunction::Variance => variance_return_type(&coerced_data_types[0]), AggregateFunction::VariancePop => variance_return_type(&coerced_data_types[0]), + AggregateFunction::Covariance => covariance_return_type(&coerced_data_types[0]), + AggregateFunction::CovariancePop => covariance_return_type(&coerced_data_types[0]), AggregateFunction::Stddev => stddev_return_type(&coerced_data_types[0]), AggregateFunction::StddevPop => stddev_return_type(&coerced_data_types[0]), AggregateFunction::Avg => avg_return_type(&coerced_data_types[0]), @@ -254,6 +263,30 @@ pub fn create_aggregate_expr( "VAR_POP(DISTINCT) aggregations are not available".to_string(), )); } + (AggregateFunction::Covariance, false) => Arc::new(expressions::Covariance::new( + coerced_phy_exprs[0].clone(), + coerced_phy_exprs[1].clone(), + name, + return_type, + )), + (AggregateFunction::Covariance, true) => { + return Err(DataFusionError::NotImplemented( + "COVAR(DISTINCT) aggregations are not available".to_string(), + )); + } + (AggregateFunction::CovariancePop, false) => { + Arc::new(expressions::CovariancePop::new( + coerced_phy_exprs[0].clone(), + coerced_phy_exprs[1].clone(), + name, + return_type, + )) + } + (AggregateFunction::CovariancePop, true) => { + return 
Err(DataFusionError::NotImplemented( + "COVAR_POP(DISTINCT) aggregations are not available".to_string(), + )); + } (AggregateFunction::Stddev, false) => Arc::new(expressions::Stddev::new( coerced_phy_exprs[0].clone(), name, @@ -326,6 +359,10 @@ pub fn signature(fun: &AggregateFunction) -> Signature { | AggregateFunction::StddevPop => { Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable) } + AggregateFunction::Covariance + | AggregateFunction::CovariancePop => { + Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable) + } } } diff --git a/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs b/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs index d74b4e465c891..8f842b7037d7e 100644 --- a/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs +++ b/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs @@ -22,7 +22,7 @@ use crate::error::{DataFusionError, Result}; use crate::physical_plan::aggregates::AggregateFunction; use crate::physical_plan::expressions::{ is_avg_support_arg_type, is_stddev_support_arg_type, is_sum_support_arg_type, - is_variance_support_arg_type, try_cast, + is_variance_support_arg_type, is_covariance_support_arg_type, try_cast, }; use crate::physical_plan::functions::{Signature, TypeSignature}; use crate::physical_plan::PhysicalExpr; @@ -105,6 +105,24 @@ pub(crate) fn coerce_types( } Ok(input_types.to_vec()) } + AggregateFunction::Covariance => { + if !is_covariance_support_arg_type(&input_types[0]) { + return Err(DataFusionError::Plan(format!( + "The function {:?} does not support inputs of type {:?}.", + agg_fun, input_types[0] + ))); + } + Ok(input_types.to_vec()) + } + AggregateFunction::CovariancePop => { + if !is_covariance_support_arg_type(&input_types[0]) { + return Err(DataFusionError::Plan(format!( + "The function {:?} does not support inputs of type {:?}.", + agg_fun, input_types[0] + ))); + } + Ok(input_types.to_vec()) + } AggregateFunction::Stddev => { if 
!is_stddev_support_arg_type(&input_types[0]) { return Err(DataFusionError::Plan(format!( diff --git a/datafusion/src/physical_plan/expressions/covariance.rs b/datafusion/src/physical_plan/expressions/covariance.rs new file mode 100644 index 0000000000000..1ae08e92ec332 --- /dev/null +++ b/datafusion/src/physical_plan/expressions/covariance.rs @@ -0,0 +1,552 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Defines physical expressions that can evaluated at runtime during query execution + +use std::any::Any; +use std::sync::Arc; + +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; +use crate::scalar::ScalarValue; +use arrow::array::Float64Array; +use arrow::{ + array::{ArrayRef, UInt64Array}, + compute::cast, + datatypes::DataType, + datatypes::Field, +}; + +use super::{format_state_name, StatsType}; + +/// COVAR and COVAR_SAMP aggregate expression +#[derive(Debug)] +pub struct Covariance { + name: String, + expr1: Arc, + expr2: Arc, +} + +/// COVAR_POP aggregate expression +#[derive(Debug)] +pub struct CovariancePop { + name: String, + expr1: Arc, + expr2: Arc, +} + +/// function return type of covariance +pub(crate) fn covariance_return_type(arg_type: &DataType) -> Result { + match arg_type { + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float32 + | DataType::Float64 => Ok(DataType::Float64), + other => Err(DataFusionError::Plan(format!( + "VARIANCE does not support {:?}", + other + ))), + } +} + +pub(crate) fn is_covariance_support_arg_type(arg_type: &DataType) -> bool { + matches!( + arg_type, + DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Float32 + | DataType::Float64 + ) +} + +impl Covariance { + /// Create a new COVAR aggregate function + pub fn new( + expr1: Arc, + expr2: Arc, + name: impl Into, + data_type: DataType, + ) -> Self { + // the result of covariance just support FLOAT64 data type. 
+ assert!(matches!(data_type, DataType::Float64)); + Self { + name: name.into(), + expr1, + expr2, + } + } +} + +impl AggregateExpr for Covariance { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn field(&self) -> Result { + Ok(Field::new(&self.name, DataType::Float64, true)) + } + + fn create_accumulator(&self) -> Result> { + Ok(Box::new(CovarianceAccumulator::try_new(StatsType::Sample)?)) + } + + fn state_fields(&self) -> Result> { + Ok(vec![ + Field::new( + &format_state_name(&self.name, "count"), + DataType::UInt64, + true, + ), + Field::new( + &format_state_name(&self.name, "mean1"), + DataType::Float64, + true, + ), + Field::new( + &format_state_name(&self.name, "mean2"), + DataType::Float64, + true, + ), + Field::new( + &format_state_name(&self.name, "algo_const"), + DataType::Float64, + true, + ), + ]) + } + + fn expressions(&self) -> Vec> { + vec![self.expr1.clone(), self.expr2.clone()] + } + + fn name(&self) -> &str { + &self.name + } +} + +impl CovariancePop { + /// Create a new COVAR_POP aggregate function + pub fn new( + expr1: Arc, + expr2: Arc, + name: impl Into, + data_type: DataType, + ) -> Self { + // the result of covariance just support FLOAT64 data type. 
+ assert!(matches!(data_type, DataType::Float64)); + Self { + name: name.into(), + expr1, + expr2, + } + } +} + +impl AggregateExpr for CovariancePop { + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn field(&self) -> Result { + Ok(Field::new(&self.name, DataType::Float64, true)) + } + + fn create_accumulator(&self) -> Result> { + Ok(Box::new(CovarianceAccumulator::try_new( + StatsType::Population, + )?)) + } + + fn state_fields(&self) -> Result> { + Ok(vec![ + Field::new( + &format_state_name(&self.name, "count"), + DataType::UInt64, + true, + ), + Field::new( + &format_state_name(&self.name, "mean1"), + DataType::Float64, + true, + ), + Field::new( + &format_state_name(&self.name, "mean2"), + DataType::Float64, + true, + ), + Field::new( + &format_state_name(&self.name, "algo_const"), + DataType::Float64, + true, + ), + ]) + } + + fn expressions(&self) -> Vec> { + vec![self.expr1.clone(), self.expr2.clone()] + } + + fn name(&self) -> &str { + &self.name + } +} + +/// An accumulator to compute covariance +/// The algrithm used is an online implementation and numerically stable. It is based on this paper: +/// Welford, B. P. (1962). "Note on a method for calculating corrected sums of squares and products". +/// Technometrics. 4 (3): 419–420. doi:10.2307/1266577. JSTOR 1266577. +/// +/// The algorithm has been analyzed here: +/// Ling, Robert F. (1974). "Comparison of Several Algorithms for Computing Sample Means and Variances". +/// Journal of the American Statistical Association. 69 (348): 859–866. doi:10.2307/2286154. JSTOR 2286154. 
+ +#[derive(Debug)] +pub struct CovarianceAccumulator { + algo_const: f64, + mean1: f64, + mean2: f64, + count: u64, + stats_type: StatsType, +} + +impl CovarianceAccumulator { + /// Creates a new `CovarianceAccumulator` + pub fn try_new(s_type: StatsType) -> Result { + Ok(Self { + algo_const: 0_f64, + mean1: 0_f64, + mean2: 0_f64, + count: 0_u64, + stats_type: s_type, + }) + } + + pub fn get_count(&self) -> u64 { + self.count + } + + pub fn get_mean1(&self) -> f64 { + self.mean1 + } + + pub fn get_mean2(&self) -> f64 { + self.mean2 + } + + pub fn get_algo_const(&self) -> f64 { + self.algo_const + } +} + +impl Accumulator for CovarianceAccumulator { + fn state(&self) -> Result> { + Ok(vec![ + ScalarValue::from(self.count), + ScalarValue::from(self.mean1), + ScalarValue::from(self.mean2), + ScalarValue::from(self.algo_const), + ]) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let values1 = &cast(&values[0], &DataType::Float64)?; + let values2 = &cast(&values[1], &DataType::Float64)?; + + let arr1 = values1.as_any().downcast_ref::().unwrap(); + let arr2 = values2.as_any().downcast_ref::().unwrap(); + + + for i in 0..arr1.len() { + let value1 = arr1.value(i); + let value2 = arr2.value(i); + + if (value1 == 0_f64 && values1.is_null(i)) || (value2 == 0_f64 && values2.is_null(i)) { + if values2.is_null(i) && values1.is_null(i) { + continue; + } else { + return Err(DataFusionError::Internal( + "The two columns are not aligned".to_string(), + )); + } + } + + let new_count = self.count + 1; + let delta1 = value1 - self.mean1; + let new_mean1 = delta1 / new_count as f64 + self.mean1; + let delta2 = value2 - self.mean2; + let new_mean2 = delta2 / new_count as f64 + self.mean2; + let new_c = delta1 * (value2 - new_mean2) + self.algo_const; + + self.count += 1; + self.mean1 = new_mean1; + self.mean2 = new_mean2; + self.algo_const = new_c; + } + + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let counts = 
states[0].as_any().downcast_ref::().unwrap(); + let means1 = states[1].as_any().downcast_ref::().unwrap(); + let means2 = states[2].as_any().downcast_ref::().unwrap(); + let cs = states[3].as_any().downcast_ref::().unwrap(); + + for i in 0..counts.len() { + let c = counts.value(i); + if c == 0_u64 { + continue; + } + let new_count = self.count + c; + let new_mean1 = (self.mean1 + means1.value(i)) / 2 as f64; + let new_mean2 = (self.mean2 + means2.value(i)) / 2 as f64; + let delta1 = self.mean1 - means1.value(i); + let delta2 = self.mean2 - means2.value(i); + let new_c = self.algo_const + cs.value(i) + delta1 * delta2 * self.count as f64 * c as f64 / new_count as f64; + + self.count = new_count; + self.mean1 = new_mean1; + self.mean2 = new_mean1; + self.algo_const = new_c; + } + Ok(()) + } + + fn update(&mut self, values: &[ScalarValue]) -> Result<()> { + Ok(()) + } + + fn merge(&mut self, states: &[ScalarValue]) -> Result<()> { + Ok(()) + } + + fn evaluate(&self) -> Result { + let count = match self.stats_type { + StatsType::Population => self.count, + StatsType::Sample => { + if self.count > 0 { + self.count - 1 + } else { + self.count + } + } + }; + + if count <= 1 { + return Err(DataFusionError::Internal( + "At least two values are needed to calculate variance".to_string(), + )); + } + + if self.count == 0 { + Ok(ScalarValue::Float64(None)) + } else { + Ok(ScalarValue::Float64(Some(self.algo_const / count as f64))) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::physical_plan::expressions::col; + use crate::{error::Result, generic_test_op}; + use arrow::record_batch::RecordBatch; + use arrow::{array::*, datatypes::*}; + + #[test] + fn variance_f64_1() -> Result<()> { + let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64])); + generic_test_op!( + a, + DataType::Float64, + CovariancePop, + ScalarValue::from(0.25_f64), + DataType::Float64 + ) + } + + #[test] + fn variance_f64_2() -> Result<()> { + let a: ArrayRef = + 
Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); + generic_test_op!( + a, + DataType::Float64, + CovariancePop, + ScalarValue::from(2_f64), + DataType::Float64 + ) + } + + #[test] + fn variance_f64_3() -> Result<()> { + let a: ArrayRef = + Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); + generic_test_op!( + a, + DataType::Float64, + Covariance, + ScalarValue::from(2.5_f64), + DataType::Float64 + ) + } + + #[test] + fn variance_f64_4() -> Result<()> { + let a: ArrayRef = Arc::new(Float64Array::from(vec![1.1_f64, 2_f64, 3_f64])); + generic_test_op!( + a, + DataType::Float64, + Covariance, + ScalarValue::from(0.9033333333333333_f64), + DataType::Float64 + ) + } + + #[test] + fn variance_i32() -> Result<()> { + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); + generic_test_op!( + a, + DataType::Int32, + CovariancePop, + ScalarValue::from(2_f64), + DataType::Float64 + ) + } + + #[test] + fn variance_u32() -> Result<()> { + let a: ArrayRef = + Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); + generic_test_op!( + a, + DataType::UInt32, + CovariancePop, + ScalarValue::from(2.0f64), + DataType::Float64 + ) + } + + #[test] + fn variance_f32() -> Result<()> { + let a: ArrayRef = + Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); + generic_test_op!( + a, + DataType::Float32, + CovariancePop, + ScalarValue::from(2_f64), + DataType::Float64 + ) + } + + #[test] + fn test_variance_return_data_type() -> Result<()> { + let data_type = DataType::Float64; + let result_type = covariance_return_type(&data_type)?; + assert_eq!(DataType::Float64, result_type); + + let data_type = DataType::Decimal(36, 10); + assert!(covariance_return_type(&data_type).is_err()); + Ok(()) + } + + #[test] + fn test_variance_1_input() -> Result<()> { + let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64])); + let schema = Schema::new(vec![Field::new("a", DataType::Float64, false)]); + let batch = 
RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?; + + let agg = Arc::new(Covariance::new( + col("a", &schema)?, + col("b", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + let actual = aggregate(&batch, agg); + assert!(actual.is_err()); + + Ok(()) + } + + #[test] + fn variance_i32_with_nulls() -> Result<()> { + let a: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(1), + None, + Some(3), + Some(4), + Some(5), + ])); + generic_test_op!( + a, + DataType::Int32, + CovariancePop, + ScalarValue::from(2.1875f64), + DataType::Float64 + ) + } + + #[test] + fn variance_i32_all_nulls() -> Result<()> { + let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?; + + let agg = Arc::new(Covariance::new( + col("a", &schema)?, + col("b", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + let actual = aggregate(&batch, agg); + assert!(actual.is_err()); + + Ok(()) + } + + fn aggregate( + batch: &RecordBatch, + agg: Arc, + ) -> Result { + let mut accum = agg.create_accumulator()?; + let expr = agg.expressions(); + let values = expr + .iter() + .map(|e| e.evaluate(batch)) + .map(|r| r.map(|v| v.into_array(batch.num_rows()))) + .collect::>>()?; + accum.update_batch(&values)?; + accum.evaluate() + } +} diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs index a85d867085572..25a04a4433970 100644 --- a/datafusion/src/physical_plan/expressions/mod.rs +++ b/datafusion/src/physical_plan/expressions/mod.rs @@ -55,6 +55,8 @@ mod stddev; mod sum; mod try_cast; mod variance; +mod covariance; + /// Module with some convenient methods used in expression building pub mod helpers { @@ -97,6 +99,9 @@ pub use try_cast::{try_cast, TryCastExpr}; pub(crate) use variance::{ is_variance_support_arg_type, variance_return_type, Variance, VariancePop, }; +pub(crate) use 
covariance::{ + is_covariance_support_arg_type, covariance_return_type, Covariance, CovariancePop, +}; /// returns the name of the state pub fn format_state_name(name: &str, state_name: &str) -> String { diff --git a/datafusion/tests/sql/aggregates.rs b/datafusion/tests/sql/aggregates.rs index edf530be8b7d1..2818941f0b47c 100644 --- a/datafusion/tests/sql/aggregates.rs +++ b/datafusion/tests/sql/aggregates.rs @@ -49,6 +49,18 @@ async fn csv_query_avg() -> Result<()> { Ok(()) } +#[tokio::test] +async fn csv_query_covariance_1() -> Result<()> { + let mut ctx = ExecutionContext::new(); + register_aggregate_csv(&mut ctx).await?; + let sql = "SELECT covar_pop(c2, c12) FROM aggregate_test_100"; + let mut actual = execute(&mut ctx, sql).await; + actual.sort(); + let expected = vec![vec!["1.8675"]]; + assert_float_eq(&expected, &actual); + Ok(()) +} + #[tokio::test] async fn csv_query_variance_1() -> Result<()> { let mut ctx = ExecutionContext::new(); From c5221676cc6d0f4236024d8111b2bd9a504b1dfc Mon Sep 17 00:00:00 2001 From: Lin Ma Date: Tue, 11 Jan 2022 10:47:23 -0800 Subject: [PATCH 02/12] fix some logic --- datafusion/src/physical_plan/expressions/covariance.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/covariance.rs b/datafusion/src/physical_plan/expressions/covariance.rs index 1ae08e92ec332..a43809e36b49f 100644 --- a/datafusion/src/physical_plan/expressions/covariance.rs +++ b/datafusion/src/physical_plan/expressions/covariance.rs @@ -326,25 +326,25 @@ impl Accumulator for CovarianceAccumulator { continue; } let new_count = self.count + c; - let new_mean1 = (self.mean1 + means1.value(i)) / 2 as f64; - let new_mean2 = (self.mean2 + means2.value(i)) / 2 as f64; + let new_mean1 = self.mean1 * self.count as f64 / new_count as f64 + means1.value(i) * c as f64 / new_count as f64; + let new_mean2 = self.mean2 * self.count as f64 / new_count as f64 + means2.value(i) * c as f64 / new_count as f64; 
let delta1 = self.mean1 - means1.value(i); let delta2 = self.mean2 - means2.value(i); let new_c = self.algo_const + cs.value(i) + delta1 * delta2 * self.count as f64 * c as f64 / new_count as f64; self.count = new_count; self.mean1 = new_mean1; - self.mean2 = new_mean1; + self.mean2 = new_mean2; self.algo_const = new_c; } Ok(()) } - fn update(&mut self, values: &[ScalarValue]) -> Result<()> { + fn update(&mut self, _values: &[ScalarValue]) -> Result<()> { Ok(()) } - fn merge(&mut self, states: &[ScalarValue]) -> Result<()> { + fn merge(&mut self, _states: &[ScalarValue]) -> Result<()> { Ok(()) } From 081e8969cdc2aff63dbc11ed5029ffcaa54fdd04 Mon Sep 17 00:00:00 2001 From: Lin Ma Date: Tue, 11 Jan 2022 12:30:48 -0800 Subject: [PATCH 03/12] adding basic tests --- .../physical_plan/expressions/covariance.rs | 290 ++++++++++-------- .../src/physical_plan/expressions/mod.rs | 22 ++ 2 files changed, 177 insertions(+), 135 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/covariance.rs b/datafusion/src/physical_plan/expressions/covariance.rs index a43809e36b49f..94d203a141eb7 100644 --- a/datafusion/src/physical_plan/expressions/covariance.rs +++ b/datafusion/src/physical_plan/expressions/covariance.rs @@ -378,162 +378,182 @@ impl Accumulator for CovarianceAccumulator { mod tests { use super::*; use crate::physical_plan::expressions::col; - use crate::{error::Result, generic_test_op}; + use crate::{error::Result, generic_test_op2}; use arrow::record_batch::RecordBatch; use arrow::{array::*, datatypes::*}; #[test] - fn variance_f64_1() -> Result<()> { - let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64])); - generic_test_op!( - a, - DataType::Float64, - CovariancePop, - ScalarValue::from(0.25_f64), - DataType::Float64 - ) - } + fn covariance_f64_1() -> Result<()> { + let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64])); + let b: ArrayRef = Arc::new(Float64Array::from(vec![4_f64, 5_f64, 6_f64])); - #[test] - fn 
variance_f64_2() -> Result<()> { - let a: ArrayRef = - Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); - generic_test_op!( - a, - DataType::Float64, - CovariancePop, - ScalarValue::from(2_f64), - DataType::Float64 - ) - } - - #[test] - fn variance_f64_3() -> Result<()> { - let a: ArrayRef = - Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); - generic_test_op!( + generic_test_op2!( a, + b, DataType::Float64, - Covariance, - ScalarValue::from(2.5_f64), - DataType::Float64 - ) - } - - #[test] - fn variance_f64_4() -> Result<()> { - let a: ArrayRef = Arc::new(Float64Array::from(vec![1.1_f64, 2_f64, 3_f64])); - generic_test_op!( - a, DataType::Float64, - Covariance, - ScalarValue::from(0.9033333333333333_f64), - DataType::Float64 - ) - } - - #[test] - fn variance_i32() -> Result<()> { - let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); - generic_test_op!( - a, - DataType::Int32, CovariancePop, - ScalarValue::from(2_f64), + ScalarValue::from(0.6666666666666666), DataType::Float64 ) } #[test] - fn variance_u32() -> Result<()> { - let a: ArrayRef = - Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); - generic_test_op!( - a, - DataType::UInt32, - CovariancePop, - ScalarValue::from(2.0f64), - DataType::Float64 - ) - } + fn covariance_f64_2() -> Result<()> { + let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64])); + let b: ArrayRef = Arc::new(Float64Array::from(vec![4_f64, 5_f64, 6_f64])); - #[test] - fn variance_f32() -> Result<()> { - let a: ArrayRef = - Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); - generic_test_op!( + generic_test_op2!( a, - DataType::Float32, - CovariancePop, - ScalarValue::from(2_f64), - DataType::Float64 - ) - } - - #[test] - fn test_variance_return_data_type() -> Result<()> { - let data_type = DataType::Float64; - let result_type = covariance_return_type(&data_type)?; - assert_eq!(DataType::Float64, result_type); - - let 
data_type = DataType::Decimal(36, 10); - assert!(covariance_return_type(&data_type).is_err()); - Ok(()) - } - - #[test] - fn test_variance_1_input() -> Result<()> { - let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64])); - let schema = Schema::new(vec![Field::new("a", DataType::Float64, false)]); - let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?; - - let agg = Arc::new(Covariance::new( - col("a", &schema)?, - col("b", &schema)?, - "bla".to_string(), + b, DataType::Float64, - )); - let actual = aggregate(&batch, agg); - assert!(actual.is_err()); - - Ok(()) - } - - #[test] - fn variance_i32_with_nulls() -> Result<()> { - let a: ArrayRef = Arc::new(Int32Array::from(vec![ - Some(1), - None, - Some(3), - Some(4), - Some(5), - ])); - generic_test_op!( - a, - DataType::Int32, - CovariancePop, - ScalarValue::from(2.1875f64), + DataType::Float64, + Covariance, + ScalarValue::from(1_f64), DataType::Float64 ) } - #[test] - fn variance_i32_all_nulls() -> Result<()> { - let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?; - - let agg = Arc::new(Covariance::new( - col("a", &schema)?, - col("b", &schema)?, - "bla".to_string(), - DataType::Float64, - )); - let actual = aggregate(&batch, agg); - assert!(actual.is_err()); - - Ok(()) - } + // #[test] + // fn variance_f64_2() -> Result<()> { + // let a: ArrayRef = + // Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); + // generic_test_op!( + // a, + // DataType::Float64, + // CovariancePop, + // ScalarValue::from(2_f64), + // DataType::Float64 + // ) + // } + + // #[test] + // fn variance_f64_3() -> Result<()> { + // let a: ArrayRef = + // Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); + // generic_test_op!( + // a, + // DataType::Float64, + // Covariance, + // ScalarValue::from(2.5_f64), + // 
DataType::Float64 + // ) + // } + + // #[test] + // fn variance_f64_4() -> Result<()> { + // let a: ArrayRef = Arc::new(Float64Array::from(vec![1.1_f64, 2_f64, 3_f64])); + // generic_test_op!( + // a, + // DataType::Float64, + // Covariance, + // ScalarValue::from(0.9033333333333333_f64), + // DataType::Float64 + // ) + // } + + // #[test] + // fn variance_i32() -> Result<()> { + // let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); + // generic_test_op!( + // a, + // DataType::Int32, + // CovariancePop, + // ScalarValue::from(2_f64), + // DataType::Float64 + // ) + // } + + // #[test] + // fn variance_u32() -> Result<()> { + // let a: ArrayRef = + // Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); + // generic_test_op!( + // a, + // DataType::UInt32, + // CovariancePop, + // ScalarValue::from(2.0f64), + // DataType::Float64 + // ) + // } + + // #[test] + // fn variance_f32() -> Result<()> { + // let a: ArrayRef = + // Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); + // generic_test_op!( + // a, + // DataType::Float32, + // CovariancePop, + // ScalarValue::from(2_f64), + // DataType::Float64 + // ) + // } + + // #[test] + // fn test_variance_return_data_type() -> Result<()> { + // let data_type = DataType::Float64; + // let result_type = covariance_return_type(&data_type)?; + // assert_eq!(DataType::Float64, result_type); + + // let data_type = DataType::Decimal(36, 10); + // assert!(covariance_return_type(&data_type).is_err()); + // Ok(()) + // } + + // #[test] + // fn test_variance_1_input() -> Result<()> { + // let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64])); + // let schema = Schema::new(vec![Field::new("a", DataType::Float64, false)]); + // let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?; + + // let agg = Arc::new(Covariance::new( + // col("a", &schema)?, + // col("b", &schema)?, + // "bla".to_string(), + // DataType::Float64, + // )); + // let actual = 
aggregate(&batch, agg); + // assert!(actual.is_err()); + + // Ok(()) + // } + + // #[test] + // fn variance_i32_with_nulls() -> Result<()> { + // let a: ArrayRef = Arc::new(Int32Array::from(vec![ + // Some(1), + // None, + // Some(3), + // Some(4), + // Some(5), + // ])); + // generic_test_op!( + // a, + // DataType::Int32, + // CovariancePop, + // ScalarValue::from(2.1875f64), + // DataType::Float64 + // ) + // } + + // #[test] + // fn variance_i32_all_nulls() -> Result<()> { + // let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); + // let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + // let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?; + + // let agg = Arc::new(Covariance::new( + // col("a", &schema)?, + // col("b", &schema)?, + // "bla".to_string(), + // DataType::Float64, + // )); + // let actual = aggregate(&batch, agg); + // assert!(actual.is_err()); + + // Ok(()) + // } fn aggregate( batch: &RecordBatch, diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs index 25a04a4433970..eccf1937ac1f5 100644 --- a/datafusion/src/physical_plan/expressions/mod.rs +++ b/datafusion/src/physical_plan/expressions/mod.rs @@ -177,6 +177,28 @@ mod tests { }}; } + /// macro to perform an aggregation with two inputs and verify the result. + #[macro_export] + macro_rules! 
generic_test_op2 { + ($ARRAY1:expr, $ARRAY2:expr, $DATATYPE1:expr, $DATATYPE2:expr, $OP:ident, $EXPECTED:expr, $EXPECTED_DATATYPE:expr) => {{ + let schema = Schema::new(vec![Field::new("a", $DATATYPE1, false), Field::new("b", $DATATYPE2, false)]); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![$ARRAY1, $ARRAY2])?; + + let agg = Arc::new(<$OP>::new( + col("a", &schema)?, + col("b", &schema)?, + "bla".to_string(), + $EXPECTED_DATATYPE, + )); + let actual = aggregate(&batch, agg)?; + let expected = ScalarValue::from($EXPECTED); + + assert_eq!(expected, actual); + + Ok(()) + }}; + } + pub fn aggregate( batch: &RecordBatch, agg: Arc, From ab0886bcc99e5e664e672af332ca3305bc3ab92b Mon Sep 17 00:00:00 2001 From: Lin Ma Date: Tue, 11 Jan 2022 14:23:29 -0800 Subject: [PATCH 04/12] mark update and merge unimplemented --- .../physical_plan/expressions/covariance.rs | 308 ++++++++++-------- 1 file changed, 166 insertions(+), 142 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/covariance.rs b/datafusion/src/physical_plan/expressions/covariance.rs index 94d203a141eb7..16777a8cc27df 100644 --- a/datafusion/src/physical_plan/expressions/covariance.rs +++ b/datafusion/src/physical_plan/expressions/covariance.rs @@ -341,11 +341,11 @@ impl Accumulator for CovarianceAccumulator { } fn update(&mut self, _values: &[ScalarValue]) -> Result<()> { - Ok(()) + unimplemented!("update_batch is implemented instead"); } fn merge(&mut self, _states: &[ScalarValue]) -> Result<()> { - Ok(()) + unimplemented!("merge_batch is implemented instead"); } fn evaluate(&self) -> Result { @@ -414,146 +414,170 @@ mod tests { ) } - // #[test] - // fn variance_f64_2() -> Result<()> { - // let a: ArrayRef = - // Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); - // generic_test_op!( - // a, - // DataType::Float64, - // CovariancePop, - // ScalarValue::from(2_f64), - // DataType::Float64 - // ) - // } - - // #[test] - // fn variance_f64_3() -> Result<()> 
{ - // let a: ArrayRef = - // Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); - // generic_test_op!( - // a, - // DataType::Float64, - // Covariance, - // ScalarValue::from(2.5_f64), - // DataType::Float64 - // ) - // } - - // #[test] - // fn variance_f64_4() -> Result<()> { - // let a: ArrayRef = Arc::new(Float64Array::from(vec![1.1_f64, 2_f64, 3_f64])); - // generic_test_op!( - // a, - // DataType::Float64, - // Covariance, - // ScalarValue::from(0.9033333333333333_f64), - // DataType::Float64 - // ) - // } - - // #[test] - // fn variance_i32() -> Result<()> { - // let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); - // generic_test_op!( - // a, - // DataType::Int32, - // CovariancePop, - // ScalarValue::from(2_f64), - // DataType::Float64 - // ) - // } - - // #[test] - // fn variance_u32() -> Result<()> { - // let a: ArrayRef = - // Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32])); - // generic_test_op!( - // a, - // DataType::UInt32, - // CovariancePop, - // ScalarValue::from(2.0f64), - // DataType::Float64 - // ) - // } - - // #[test] - // fn variance_f32() -> Result<()> { - // let a: ArrayRef = - // Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32])); - // generic_test_op!( - // a, - // DataType::Float32, - // CovariancePop, - // ScalarValue::from(2_f64), - // DataType::Float64 - // ) - // } - - // #[test] - // fn test_variance_return_data_type() -> Result<()> { - // let data_type = DataType::Float64; - // let result_type = covariance_return_type(&data_type)?; - // assert_eq!(DataType::Float64, result_type); - - // let data_type = DataType::Decimal(36, 10); - // assert!(covariance_return_type(&data_type).is_err()); - // Ok(()) - // } - - // #[test] - // fn test_variance_1_input() -> Result<()> { - // let a: ArrayRef = Arc::new(Float64Array::from(vec![1_f64])); - // let schema = Schema::new(vec![Field::new("a", DataType::Float64, false)]); - // let batch = 
RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?; - - // let agg = Arc::new(Covariance::new( - // col("a", &schema)?, - // col("b", &schema)?, - // "bla".to_string(), - // DataType::Float64, - // )); - // let actual = aggregate(&batch, agg); - // assert!(actual.is_err()); - - // Ok(()) - // } - - // #[test] - // fn variance_i32_with_nulls() -> Result<()> { - // let a: ArrayRef = Arc::new(Int32Array::from(vec![ - // Some(1), - // None, - // Some(3), - // Some(4), - // Some(5), - // ])); - // generic_test_op!( - // a, - // DataType::Int32, - // CovariancePop, - // ScalarValue::from(2.1875f64), - // DataType::Float64 - // ) - // } - - // #[test] - // fn variance_i32_all_nulls() -> Result<()> { - // let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); - // let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - // let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?; - - // let agg = Arc::new(Covariance::new( - // col("a", &schema)?, - // col("b", &schema)?, - // "bla".to_string(), - // DataType::Float64, - // )); - // let actual = aggregate(&batch, agg); - // assert!(actual.is_err()); - - // Ok(()) - // } + #[test] + fn covariance_f64_4() -> Result<()> { + let a: ArrayRef = Arc::new(Float64Array::from(vec![1.1_f64, 2_f64, 3_f64])); + let b: ArrayRef = Arc::new(Float64Array::from(vec![4.1_f64, 5_f64, 6_f64])); + + generic_test_op2!( + a, + b, + DataType::Float64, + DataType::Float64, + Covariance, + ScalarValue::from(0.9033333333333335_f64), + DataType::Float64 + ) + } + + #[test] + fn covariance_f64_5() -> Result<()> { + let a: ArrayRef = Arc::new(Float64Array::from(vec![1.1_f64, 2_f64, 3_f64])); + let b: ArrayRef = Arc::new(Float64Array::from(vec![4.1_f64, 5_f64, 6_f64])); + + generic_test_op2!( + a, + b, + DataType::Float64, + DataType::Float64, + CovariancePop, + ScalarValue::from(0.6022222222222223_f64), + DataType::Float64 + ) + } + + #[test] + fn covariance_i32() -> Result<()> { + let a: ArrayRef = 
Arc::new(Int32Array::from(vec![1, 2, 3])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6])); + + generic_test_op2!( + a, + b, + DataType::Int32, + DataType::Int32, + CovariancePop, + ScalarValue::from(0.6666666666666666_f64), + DataType::Float64 + ) + } + + #[test] + fn covariance_u32() -> Result<()> { + let a: ArrayRef = + Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32])); + let b: ArrayRef = + Arc::new(UInt32Array::from(vec![4_u32, 5_u32, 6_u32])); + generic_test_op2!( + a, + b, + DataType::UInt32, + DataType::UInt32, + CovariancePop, + ScalarValue::from(0.6666666666666666_f64), + DataType::Float64 + ) + } + + #[test] + fn covariance_f32() -> Result<()> { + let a: ArrayRef = + Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32])); + let b: ArrayRef = + Arc::new(Float32Array::from(vec![4_f32, 5_f32, 6_f32])); + generic_test_op2!( + a, + b, + DataType::Float32, + DataType::Float32, + CovariancePop, + ScalarValue::from(0.6666666666666666_f64), + DataType::Float64 + ) + } + + #[test] + fn test_covariance_return_data_type() -> Result<()> { + let data_type = DataType::Float64; + let result_type = covariance_return_type(&data_type)?; + assert_eq!(DataType::Float64, result_type); + + let data_type = DataType::Decimal(36, 10); + assert!(covariance_return_type(&data_type).is_err()); + Ok(()) + } + + #[test] + fn variance_i32_with_nulls_1() -> Result<()> { + let a: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(1), + None, + Some(3), + ])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(4), + None, + Some(6), + ])); + + generic_test_op2!( + a, + b, + DataType::Int32, + DataType::Int32, + CovariancePop, + ScalarValue::from(1_f64), + DataType::Float64 + ) + } + + #[test] + fn variance_i32_with_nulls_2() -> Result<()> { + let a: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(1), + None, + Some(3), + ])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(4), + Some(5), + Some(6), + ])); + + let schema = 
Schema::new(vec![Field::new("b", DataType::Int32, false)]); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?; + + let agg = Arc::new(Covariance::new( + col("a", &schema)?, + col("b", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + let actual = aggregate(&batch, agg); + assert!(actual.is_err()); + + Ok(()) + } + + #[test] + fn variance_i32_all_nulls() -> Result<()> { + let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); + + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false), Field::new("b", DataType::Int32, false)]); + let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?; + + let agg = Arc::new(Covariance::new( + col("a", &schema)?, + col("b", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + let actual = aggregate(&batch, agg); + assert!(actual.is_err()); + + Ok(()) + } fn aggregate( batch: &RecordBatch, From ed31b9b82da594cdcb56ebc5fcc24dc96cd8f36a Mon Sep 17 00:00:00 2001 From: Lin Ma Date: Tue, 11 Jan 2022 14:38:31 -0800 Subject: [PATCH 05/12] all tests pass --- datafusion/src/physical_plan/expressions/covariance.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/covariance.rs b/datafusion/src/physical_plan/expressions/covariance.rs index 16777a8cc27df..d21582c9d1104 100644 --- a/datafusion/src/physical_plan/expressions/covariance.rs +++ b/datafusion/src/physical_plan/expressions/covariance.rs @@ -362,7 +362,7 @@ impl Accumulator for CovarianceAccumulator { if count <= 1 { return Err(DataFusionError::Internal( - "At least two values are needed to calculate variance".to_string(), + "At least two values are needed to calculate covariance".to_string(), )); } @@ -508,7 +508,7 @@ mod tests { } #[test] - fn variance_i32_with_nulls_1() -> Result<()> { + fn covariance_i32_with_nulls_1() -> Result<()> { let a: ArrayRef = 
Arc::new(Int32Array::from(vec![ Some(1), None, @@ -532,7 +532,7 @@ mod tests { } #[test] - fn variance_i32_with_nulls_2() -> Result<()> { + fn covariance_i32_with_nulls_2() -> Result<()> { let a: ArrayRef = Arc::new(Int32Array::from(vec![ Some(1), None, @@ -544,7 +544,7 @@ mod tests { Some(6), ])); - let schema = Schema::new(vec![Field::new("b", DataType::Int32, false)]); + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false), Field::new("b", DataType::Int32, false)]); let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?; let agg = Arc::new(Covariance::new( @@ -560,7 +560,7 @@ mod tests { } #[test] - fn variance_i32_all_nulls() -> Result<()> { + fn covariance_i32_all_nulls() -> Result<()> { let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); let b: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); From f1ff993d4c5e2a91dba700df4b941f9cea39864a Mon Sep 17 00:00:00 2001 From: Lin Ma Date: Tue, 11 Jan 2022 15:36:58 -0800 Subject: [PATCH 06/12] add doc and tests --- README.md | 2 +- .../core/src/serde/logical_plan/to_proto.rs | 12 ++- ballista/rust/core/src/serde/mod.rs | 4 +- datafusion/src/physical_plan/aggregates.rs | 10 +- .../coercion_rule/aggregate_rule.rs | 4 +- .../physical_plan/expressions/covariance.rs | 96 +++++++++---------- .../src/physical_plan/expressions/mod.rs | 17 ++-- datafusion/tests/sql/aggregates.rs | 14 ++- 8 files changed, 88 insertions(+), 71 deletions(-) diff --git a/README.md b/README.md index 82089f1bd08b3..19ccdef31f7ed 100644 --- a/README.md +++ b/README.md @@ -268,7 +268,7 @@ This library currently supports many SQL constructs, including - `CAST` to change types, including e.g. `Timestamp(Nanosecond, None)` - Many mathematical unary and binary expressions such as `+`, `/`, `sqrt`, `tan`, `>=`. 
- `WHERE` to filter -- `GROUP BY` together with one of the following aggregations: `MIN`, `MAX`, `COUNT`, `SUM`, `AVG`, `VAR`, `STDDEV` (sample and population) +- `GROUP BY` together with one of the following aggregations: `MIN`, `MAX`, `COUNT`, `SUM`, `AVG`, `VAR`, `COVAR`, `STDDEV` (sample and population) - `ORDER BY` together with an expression and optional `ASC` or `DESC` and also optional `NULLS FIRST` or `NULLS LAST` ## Supported Functions diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 9a28606617546..46cf10578b491 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -1027,9 +1027,15 @@ impl TryInto for &Expr { AggregateFunction::Avg => protobuf::AggregateFunction::Avg, AggregateFunction::Count => protobuf::AggregateFunction::Count, AggregateFunction::Variance => protobuf::AggregateFunction::Variance, - AggregateFunction::VariancePop => protobuf::AggregateFunction::VariancePop, - AggregateFunction::Covariance => protobuf::AggregateFunction::Covariance, - AggregateFunction::CovariancePop => protobuf::AggregateFunction::CovariancePop, + AggregateFunction::VariancePop => { + protobuf::AggregateFunction::VariancePop + } + AggregateFunction::Covariance => { + protobuf::AggregateFunction::Covariance + } + AggregateFunction::CovariancePop => { + protobuf::AggregateFunction::CovariancePop + } AggregateFunction::Stddev => protobuf::AggregateFunction::Stddev, AggregateFunction::StddevPop => { protobuf::AggregateFunction::StddevPop diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs index 83ba3511f9c17..95c3241822e49 100644 --- a/ballista/rust/core/src/serde/mod.rs +++ b/ballista/rust/core/src/serde/mod.rs @@ -122,7 +122,9 @@ impl From for AggregateFunction { protobuf::AggregateFunction::Variance => AggregateFunction::Variance, protobuf::AggregateFunction::VariancePop => 
AggregateFunction::VariancePop, protobuf::AggregateFunction::Covariance => AggregateFunction::Covariance, - protobuf::AggregateFunction::CovariancePop => AggregateFunction::CovariancePop, + protobuf::AggregateFunction::CovariancePop => { + AggregateFunction::CovariancePop + } protobuf::AggregateFunction::Stddev => AggregateFunction::Stddev, protobuf::AggregateFunction::StddevPop => AggregateFunction::StddevPop, } diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs index 5a51e3577f3d5..949dbb1e05bac 100644 --- a/datafusion/src/physical_plan/aggregates.rs +++ b/datafusion/src/physical_plan/aggregates.rs @@ -36,7 +36,8 @@ use crate::physical_plan::distinct_expressions; use crate::physical_plan::expressions; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use expressions::{ - avg_return_type, stddev_return_type, sum_return_type, variance_return_type, covariance_return_type, + avg_return_type, covariance_return_type, stddev_return_type, sum_return_type, + variance_return_type, }; use std::{fmt, str::FromStr, sync::Arc}; @@ -142,7 +143,9 @@ pub fn return_type( AggregateFunction::Variance => variance_return_type(&coerced_data_types[0]), AggregateFunction::VariancePop => variance_return_type(&coerced_data_types[0]), AggregateFunction::Covariance => covariance_return_type(&coerced_data_types[0]), - AggregateFunction::CovariancePop => covariance_return_type(&coerced_data_types[0]), + AggregateFunction::CovariancePop => { + covariance_return_type(&coerced_data_types[0]) + } AggregateFunction::Stddev => stddev_return_type(&coerced_data_types[0]), AggregateFunction::StddevPop => stddev_return_type(&coerced_data_types[0]), AggregateFunction::Avg => avg_return_type(&coerced_data_types[0]), @@ -364,8 +367,7 @@ pub fn signature(fun: &AggregateFunction) -> Signature { | AggregateFunction::StddevPop => { Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable) } - AggregateFunction::Covariance - | 
AggregateFunction::CovariancePop => { + AggregateFunction::Covariance | AggregateFunction::CovariancePop => { Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable) } } diff --git a/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs b/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs index 8f842b7037d7e..2b02ba0eba8a0 100644 --- a/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs +++ b/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs @@ -21,8 +21,8 @@ use crate::arrow::datatypes::Schema; use crate::error::{DataFusionError, Result}; use crate::physical_plan::aggregates::AggregateFunction; use crate::physical_plan::expressions::{ - is_avg_support_arg_type, is_stddev_support_arg_type, is_sum_support_arg_type, - is_variance_support_arg_type, is_covariance_support_arg_type, try_cast, + is_avg_support_arg_type, is_covariance_support_arg_type, is_stddev_support_arg_type, + is_sum_support_arg_type, is_variance_support_arg_type, try_cast, }; use crate::physical_plan::functions::{Signature, TypeSignature}; use crate::physical_plan::PhysicalExpr; diff --git a/datafusion/src/physical_plan/expressions/covariance.rs b/datafusion/src/physical_plan/expressions/covariance.rs index d21582c9d1104..6888f520e28bb 100644 --- a/datafusion/src/physical_plan/expressions/covariance.rs +++ b/datafusion/src/physical_plan/expressions/covariance.rs @@ -249,21 +249,22 @@ impl CovarianceAccumulator { }) } - pub fn get_count(&self) -> u64 { - self.count - } - - pub fn get_mean1(&self) -> f64 { - self.mean1 - } - - pub fn get_mean2(&self) -> f64 { - self.mean2 - } - - pub fn get_algo_const(&self) -> f64 { - self.algo_const - } + // These functions are commented out for now, they will be used in the next PR + // pub fn get_count(&self) -> u64 { + // self.count + // } + + // pub fn get_mean1(&self) -> f64 { + // self.mean1 + // } + + // pub fn get_mean2(&self) -> f64 { + // self.mean2 + // } + + // pub fn get_algo_const(&self) -> f64 { + // 
self.algo_const + // } } impl Accumulator for CovarianceAccumulator { @@ -283,18 +284,19 @@ impl Accumulator for CovarianceAccumulator { let arr1 = values1.as_any().downcast_ref::().unwrap(); let arr2 = values2.as_any().downcast_ref::().unwrap(); - for i in 0..arr1.len() { let value1 = arr1.value(i); let value2 = arr2.value(i); - if (value1 == 0_f64 && values1.is_null(i)) || (value2 == 0_f64 && values2.is_null(i)) { + if (value1 == 0_f64 && values1.is_null(i)) + || (value2 == 0_f64 && values2.is_null(i)) + { if values2.is_null(i) && values1.is_null(i) { continue; } else { return Err(DataFusionError::Internal( "The two columns are not aligned".to_string(), - )); + )); } } @@ -326,11 +328,15 @@ impl Accumulator for CovarianceAccumulator { continue; } let new_count = self.count + c; - let new_mean1 = self.mean1 * self.count as f64 / new_count as f64 + means1.value(i) * c as f64 / new_count as f64; - let new_mean2 = self.mean2 * self.count as f64 / new_count as f64 + means2.value(i) * c as f64 / new_count as f64; + let new_mean1 = self.mean1 * self.count as f64 / new_count as f64 + + means1.value(i) * c as f64 / new_count as f64; + let new_mean2 = self.mean2 * self.count as f64 / new_count as f64 + + means2.value(i) * c as f64 / new_count as f64; let delta1 = self.mean1 - means1.value(i); let delta2 = self.mean2 - means2.value(i); - let new_c = self.algo_const + cs.value(i) + delta1 * delta2 * self.count as f64 * c as f64 / new_count as f64; + let new_c = self.algo_const + + cs.value(i) + + delta1 * delta2 * self.count as f64 * c as f64 / new_count as f64; self.count = new_count; self.mean1 = new_mean1; @@ -464,10 +470,8 @@ mod tests { #[test] fn covariance_u32() -> Result<()> { - let a: ArrayRef = - Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32])); - let b: ArrayRef = - Arc::new(UInt32Array::from(vec![4_u32, 5_u32, 6_u32])); + let a: ArrayRef = Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32])); + let b: ArrayRef = Arc::new(UInt32Array::from(vec![4_u32, 
5_u32, 6_u32])); generic_test_op2!( a, b, @@ -481,10 +485,8 @@ mod tests { #[test] fn covariance_f32() -> Result<()> { - let a: ArrayRef = - Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32])); - let b: ArrayRef = - Arc::new(Float32Array::from(vec![4_f32, 5_f32, 6_f32])); + let a: ArrayRef = Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32])); + let b: ArrayRef = Arc::new(Float32Array::from(vec![4_f32, 5_f32, 6_f32])); generic_test_op2!( a, b, @@ -509,16 +511,8 @@ mod tests { #[test] fn covariance_i32_with_nulls_1() -> Result<()> { - let a: ArrayRef = Arc::new(Int32Array::from(vec![ - Some(1), - None, - Some(3), - ])); - let b: ArrayRef = Arc::new(Int32Array::from(vec![ - Some(4), - None, - Some(6), - ])); + let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![Some(4), None, Some(6)])); generic_test_op2!( a, @@ -533,18 +527,13 @@ mod tests { #[test] fn covariance_i32_with_nulls_2() -> Result<()> { - let a: ArrayRef = Arc::new(Int32Array::from(vec![ - Some(1), - None, - Some(3), - ])); - let b: ArrayRef = Arc::new(Int32Array::from(vec![ - Some(4), - Some(5), - Some(6), - ])); - - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false), Field::new("b", DataType::Int32, false)]); + let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![Some(4), Some(5), Some(6)])); + + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?; let agg = Arc::new(Covariance::new( @@ -564,7 +553,10 @@ mod tests { let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); let b: ArrayRef = Arc::new(Int32Array::from(vec![None, None])); - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false), Field::new("b", DataType::Int32, false)]); + let schema = Schema::new(vec![ + 
Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?; let agg = Arc::new(Covariance::new( diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs index eccf1937ac1f5..694326599eb37 100644 --- a/datafusion/src/physical_plan/expressions/mod.rs +++ b/datafusion/src/physical_plan/expressions/mod.rs @@ -44,6 +44,7 @@ mod lead_lag; mod literal; #[macro_use] mod min_max; +mod covariance; mod negative; mod not; mod nth_value; @@ -55,8 +56,6 @@ mod stddev; mod sum; mod try_cast; mod variance; -mod covariance; - /// Module with some convenient methods used in expression building pub mod helpers { @@ -74,6 +73,9 @@ pub use cast::{ }; pub use column::{col, Column}; pub use count::Count; +pub(crate) use covariance::{ + covariance_return_type, is_covariance_support_arg_type, Covariance, CovariancePop, +}; pub use cume_dist::cume_dist; pub use get_indexed_field::GetIndexedFieldExpr; pub use in_list::{in_list, InListExpr}; @@ -99,9 +101,6 @@ pub use try_cast::{try_cast, TryCastExpr}; pub(crate) use variance::{ is_variance_support_arg_type, variance_return_type, Variance, VariancePop, }; -pub(crate) use covariance::{ - is_covariance_support_arg_type, covariance_return_type, Covariance, CovariancePop, -}; /// returns the name of the state pub fn format_state_name(name: &str, state_name: &str) -> String { @@ -181,8 +180,12 @@ mod tests { #[macro_export] macro_rules! 
generic_test_op2 { ($ARRAY1:expr, $ARRAY2:expr, $DATATYPE1:expr, $DATATYPE2:expr, $OP:ident, $EXPECTED:expr, $EXPECTED_DATATYPE:expr) => {{ - let schema = Schema::new(vec![Field::new("a", $DATATYPE1, false), Field::new("b", $DATATYPE2, false)]); - let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![$ARRAY1, $ARRAY2])?; + let schema = Schema::new(vec![ + Field::new("a", $DATATYPE1, false), + Field::new("b", $DATATYPE2, false), + ]); + let batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![$ARRAY1, $ARRAY2])?; let agg = Arc::new(<$OP>::new( col("a", &schema)?, diff --git a/datafusion/tests/sql/aggregates.rs b/datafusion/tests/sql/aggregates.rs index 2818941f0b47c..eb656012c0a01 100644 --- a/datafusion/tests/sql/aggregates.rs +++ b/datafusion/tests/sql/aggregates.rs @@ -56,7 +56,19 @@ async fn csv_query_covariance_1() -> Result<()> { let sql = "SELECT covar_pop(c2, c12) FROM aggregate_test_100"; let mut actual = execute(&mut ctx, sql).await; actual.sort(); - let expected = vec![vec!["1.8675"]]; + let expected = vec![vec!["-0.07916932235380847"]]; + assert_float_eq(&expected, &actual); + Ok(()) +} + +#[tokio::test] +async fn csv_query_covariance_2() -> Result<()> { + let mut ctx = ExecutionContext::new(); + register_aggregate_csv(&mut ctx).await?; + let sql = "SELECT covar(c2, c12) FROM aggregate_test_100"; + let mut actual = execute(&mut ctx, sql).await; + actual.sort(); + let expected = vec![vec!["-0.07996901247859442"]]; assert_float_eq(&expected, &actual); Ok(()) } From bbb1244d074227edced5b68afba6b6944d66440f Mon Sep 17 00:00:00 2001 From: Lin Ma Date: Tue, 11 Jan 2022 22:02:03 -0800 Subject: [PATCH 07/12] lint --- .../physical_plan/expressions/covariance.rs | 94 ++++++++++++++++++- 1 file changed, 93 insertions(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/expressions/covariance.rs b/datafusion/src/physical_plan/expressions/covariance.rs index 6888f520e28bb..b4b41e37fe73b 100644 --- 
a/datafusion/src/physical_plan/expressions/covariance.rs +++ b/datafusion/src/physical_plan/expressions/covariance.rs @@ -220,13 +220,17 @@ impl AggregateExpr for CovariancePop { } /// An accumulator to compute covariance -/// The algrithm used is an online implementation and numerically stable. It is based on this paper: +/// The algorithm used is an online implementation and numerically stable. It is derived from the following paper +/// for calculating variance: /// Welford, B. P. (1962). "Note on a method for calculating corrected sums of squares and products". /// Technometrics. 4 (3): 419–420. doi:10.2307/1266577. JSTOR 1266577. /// /// The algorithm has been analyzed here: /// Ling, Robert F. (1974). "Comparison of Several Algorithms for Computing Sample Means and Variances". /// Journal of the American Statistical Association. 69 (348): 859–866. doi:10.2307/2286154. JSTOR 2286154. +/// +/// Although covariance is not covered in the original paper, this accumulator is based on the same idea; as a result the algorithm is online, +/// parallelizable and numerically stable. 
#[derive(Debug)] pub struct CovarianceAccumulator { @@ -452,6 +456,26 @@ mod tests { ) } + #[test] + fn covariance_f64_6() -> Result<()> { + let a = Arc::new(Float64Array::from(vec![ + 1_f64, 2_f64, 3_f64, 1.1_f64, 2.2_f64, 3.3_f64, + ])); + let b = Arc::new(Float64Array::from(vec![ + 4_f64, 5_f64, 6_f64, 4.4_f64, 5.5_f64, 6.6_f64, + ])); + + generic_test_op2!( + a, + b, + DataType::Float64, + DataType::Float64, + CovariancePop, + ScalarValue::from(0.7616666666666666), + DataType::Float64 + ) + } + #[test] fn covariance_i32() -> Result<()> { let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); @@ -571,6 +595,41 @@ mod tests { Ok(()) } + #[test] + fn covariance_f64_merge_1() -> Result<()> { + let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64])); + let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64, 6_f64])); + let c = Arc::new(Float64Array::from(vec![1.1_f64, 2.2_f64, 3.3_f64])); + let d = Arc::new(Float64Array::from(vec![4.4_f64, 5.5_f64, 6.6_f64])); + + let schema = Schema::new(vec![ + Field::new("a", DataType::Float64, false), + Field::new("b", DataType::Float64, false), + ]); + + let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?; + let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![c, d])?; + + let agg1 = Arc::new(CovariancePop::new( + col("a", &schema)?, + col("b", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + + let agg2 = Arc::new(CovariancePop::new( + col("a", &schema)?, + col("b", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + + let actual = merge(&batch1, &batch2, agg1, agg2)?; + assert!(actual == ScalarValue::from(0.7616666666666666)); + + Ok(()) + } + fn aggregate( batch: &RecordBatch, agg: Arc, @@ -585,4 +644,37 @@ mod tests { accum.update_batch(&values)?; accum.evaluate() } + + fn merge( + batch1: &RecordBatch, + batch2: &RecordBatch, + agg1: Arc, + agg2: Arc, + ) -> Result { + let mut accum1 = agg1.create_accumulator()?; + let mut accum2 = 
agg2.create_accumulator()?; + let expr1 = agg1.expressions(); + let expr2 = agg2.expressions(); + + let values1 = expr1 + .iter() + .map(|e| e.evaluate(batch1)) + .map(|r| r.map(|v| v.into_array(batch1.num_rows()))) + .collect::>>()?; + let values2 = expr2 + .iter() + .map(|e| e.evaluate(batch2)) + .map(|r| r.map(|v| v.into_array(batch2.num_rows()))) + .collect::>>()?; + accum1.update_batch(&values1)?; + accum2.update_batch(&values2)?; + let state2 = accum2 + .state()? + .iter() + .map(|v| vec![v.clone()]) + .map(|x| ScalarValue::iter_to_array(x.clone()).map(|e| e).unwrap()) + .collect::>(); + accum1.merge_batch(&state2)?; + accum1.evaluate() + } } From d5188b158f9e1e61ef947490c6f3940a208f799e Mon Sep 17 00:00:00 2001 From: Lin Ma Date: Tue, 11 Jan 2022 22:04:13 -0800 Subject: [PATCH 08/12] clipy --- datafusion/src/physical_plan/expressions/covariance.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/expressions/covariance.rs b/datafusion/src/physical_plan/expressions/covariance.rs index b4b41e37fe73b..98af2b7ef1857 100644 --- a/datafusion/src/physical_plan/expressions/covariance.rs +++ b/datafusion/src/physical_plan/expressions/covariance.rs @@ -672,7 +672,7 @@ mod tests { .state()? 
.iter() .map(|v| vec![v.clone()]) - .map(|x| ScalarValue::iter_to_array(x.clone()).map(|e| e).unwrap()) + .map(|x| ScalarValue::iter_to_array(x).unwrap()) .collect::>(); accum1.merge_batch(&state2)?; accum1.evaluate() From 4498faf99b32df68dec0d1617f979a0532affa85 Mon Sep 17 00:00:00 2001 From: Lin Ma Date: Tue, 11 Jan 2022 22:15:28 -0800 Subject: [PATCH 09/12] add more merge tests --- .../physical_plan/expressions/covariance.rs | 35 ++++++++ .../src/physical_plan/expressions/variance.rs | 89 +++++++++++++++++++ 2 files changed, 124 insertions(+) diff --git a/datafusion/src/physical_plan/expressions/covariance.rs b/datafusion/src/physical_plan/expressions/covariance.rs index 98af2b7ef1857..b2d78b8d4c72a 100644 --- a/datafusion/src/physical_plan/expressions/covariance.rs +++ b/datafusion/src/physical_plan/expressions/covariance.rs @@ -630,6 +630,41 @@ mod tests { Ok(()) } + #[test] + fn covariance_f64_merge_2() -> Result<()> { + let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64])); + let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64, 6_f64])); + let c = Arc::new(Float64Array::from(vec![None])); + let d = Arc::new(Float64Array::from(vec![None])); + + let schema = Schema::new(vec![ + Field::new("a", DataType::Float64, false), + Field::new("b", DataType::Float64, false), + ]); + + let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b])?; + let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![c, d])?; + + let agg1 = Arc::new(CovariancePop::new( + col("a", &schema)?, + col("b", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + + let agg2 = Arc::new(CovariancePop::new( + col("a", &schema)?, + col("b", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + + let actual = merge(&batch1, &batch2, agg1, agg2)?; + assert!(actual == ScalarValue::from(0.6666666666666666)); + + Ok(()) + } + fn aggregate( batch: &RecordBatch, agg: Arc, diff --git a/datafusion/src/physical_plan/expressions/variance.rs 
b/datafusion/src/physical_plan/expressions/variance.rs index 75164405e537f..b5a5642d5fb79 100644 --- a/datafusion/src/physical_plan/expressions/variance.rs +++ b/datafusion/src/physical_plan/expressions/variance.rs @@ -605,6 +605,62 @@ mod tests { Ok(()) } + #[test] + fn variance_f64_merge_1() -> Result<()> { + let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64])); + let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64])); + + let schema = Schema::new(vec![Field::new("a", DataType::Float64, false)]); + + let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?; + let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?; + + let agg1 = Arc::new(VariancePop::new( + col("a", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + + let agg2 = Arc::new(VariancePop::new( + col("a", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + + let actual = merge(&batch1, &batch2, agg1, agg2)?; + assert!(actual == ScalarValue::from(2_f64)); + + Ok(()) + } + + #[test] + fn variance_f64_merge_2() -> Result<()> { + let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); + let b = Arc::new(Float64Array::from(vec![None])); + + let schema = Schema::new(vec![Field::new("a", DataType::Float64, false)]); + + let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?; + let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?; + + let agg1 = Arc::new(VariancePop::new( + col("a", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + + let agg2 = Arc::new(VariancePop::new( + col("a", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + + let actual = merge(&batch1, &batch2, agg1, agg2)?; + assert!(actual == ScalarValue::from(2_f64)); + + Ok(()) + } + fn aggregate( batch: &RecordBatch, agg: Arc, @@ -619,4 +675,37 @@ mod tests { accum.update_batch(&values)?; accum.evaluate() } + + fn merge( + batch1: &RecordBatch, + batch2: &RecordBatch, + agg1: Arc, + agg2: Arc, + ) -> Result { + 
let mut accum1 = agg1.create_accumulator()?; + let mut accum2 = agg2.create_accumulator()?; + let expr1 = agg1.expressions(); + let expr2 = agg2.expressions(); + + let values1 = expr1 + .iter() + .map(|e| e.evaluate(batch1)) + .map(|r| r.map(|v| v.into_array(batch1.num_rows()))) + .collect::>>()?; + let values2 = expr2 + .iter() + .map(|e| e.evaluate(batch2)) + .map(|r| r.map(|v| v.into_array(batch2.num_rows()))) + .collect::>>()?; + accum1.update_batch(&values1)?; + accum2.update_batch(&values2)?; + let state2 = accum2 + .state()? + .iter() + .map(|v| vec![v.clone()]) + .map(|x| ScalarValue::iter_to_array(x).unwrap()) + .collect::>(); + accum1.merge_batch(&state2)?; + accum1.evaluate() + } } From e9e40d08d9519cb8f1e79d13c65f0d645341d13d Mon Sep 17 00:00:00 2001 From: Lin Ma Date: Tue, 11 Jan 2022 22:20:40 -0800 Subject: [PATCH 10/12] add merge tests for stddev --- .../src/physical_plan/expressions/stddev.rs | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/datafusion/src/physical_plan/expressions/stddev.rs b/datafusion/src/physical_plan/expressions/stddev.rs index 2c85b90491521..191ac07ecf07d 100644 --- a/datafusion/src/physical_plan/expressions/stddev.rs +++ b/datafusion/src/physical_plan/expressions/stddev.rs @@ -411,6 +411,62 @@ mod tests { Ok(()) } + #[test] + fn stddev_f64_merge_1() -> Result<()> { + let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64])); + let b = Arc::new(Float64Array::from(vec![4_f64, 5_f64])); + + let schema = Schema::new(vec![Field::new("a", DataType::Float64, false)]); + + let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?; + let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?; + + let agg1 = Arc::new(StddevPop::new( + col("a", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + + let agg2 = Arc::new(StddevPop::new( + col("a", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + + let actual = merge(&batch1, &batch2, agg1, agg2)?; + assert!(actual == 
ScalarValue::from(std::f64::consts::SQRT_2)); + + Ok(()) + } + + #[test] + fn stddev_f64_merge_2() -> Result<()> { + let a = Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64])); + let b = Arc::new(Float64Array::from(vec![None])); + + let schema = Schema::new(vec![Field::new("a", DataType::Float64, false)]); + + let batch1 = RecordBatch::try_new(Arc::new(schema.clone()), vec![a])?; + let batch2 = RecordBatch::try_new(Arc::new(schema.clone()), vec![b])?; + + let agg1 = Arc::new(StddevPop::new( + col("a", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + + let agg2 = Arc::new(StddevPop::new( + col("a", &schema)?, + "bla".to_string(), + DataType::Float64, + )); + + let actual = merge(&batch1, &batch2, agg1, agg2)?; + assert!(actual == ScalarValue::from(std::f64::consts::SQRT_2)); + + Ok(()) + } + fn aggregate( batch: &RecordBatch, agg: Arc, @@ -425,4 +481,37 @@ mod tests { accum.update_batch(&values)?; accum.evaluate() } + + fn merge( + batch1: &RecordBatch, + batch2: &RecordBatch, + agg1: Arc, + agg2: Arc, + ) -> Result { + let mut accum1 = agg1.create_accumulator()?; + let mut accum2 = agg2.create_accumulator()?; + let expr1 = agg1.expressions(); + let expr2 = agg2.expressions(); + + let values1 = expr1 + .iter() + .map(|e| e.evaluate(batch1)) + .map(|r| r.map(|v| v.into_array(batch1.num_rows()))) + .collect::>>()?; + let values2 = expr2 + .iter() + .map(|e| e.evaluate(batch2)) + .map(|r| r.map(|v| v.into_array(batch2.num_rows()))) + .collect::>>()?; + accum1.update_batch(&values1)?; + accum2.update_batch(&values2)?; + let state2 = accum2 + .state()? 
+ .iter() + .map(|v| vec![v.clone()]) + .map(|x| ScalarValue::iter_to_array(x).unwrap()) + .collect::>(); + accum1.merge_batch(&state2)?; + accum1.evaluate() + } } From 524eb8c6d25198038398df299d955cde1c7f4271 Mon Sep 17 00:00:00 2001 From: Lin Ma Date: Wed, 12 Jan 2022 22:41:58 -0800 Subject: [PATCH 11/12] lint --- .../physical_plan/expressions/covariance.rs | 32 ++++++++++++------- .../src/physical_plan/expressions/variance.rs | 14 ++++---- 2 files changed, 27 insertions(+), 19 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/covariance.rs b/datafusion/src/physical_plan/expressions/covariance.rs index b2d78b8d4c72a..62c86f1450f2f 100644 --- a/datafusion/src/physical_plan/expressions/covariance.rs +++ b/datafusion/src/physical_plan/expressions/covariance.rs @@ -285,17 +285,25 @@ impl Accumulator for CovarianceAccumulator { let values1 = &cast(&values[0], &DataType::Float64)?; let values2 = &cast(&values[1], &DataType::Float64)?; - let arr1 = values1.as_any().downcast_ref::().unwrap(); - let arr2 = values2.as_any().downcast_ref::().unwrap(); + let mut arr1 = values1 + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .filter_map(|v| v); + let mut arr2 = values2 + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .filter_map(|v| v); - for i in 0..arr1.len() { - let value1 = arr1.value(i); - let value2 = arr2.value(i); + for _i in 0..values1.len() { + let value1 = arr1.next(); + let value2 = arr2.next(); - if (value1 == 0_f64 && values1.is_null(i)) - || (value2 == 0_f64 && values2.is_null(i)) - { - if values2.is_null(i) && values1.is_null(i) { + if value1 == None || value2 == None { + if value1 == None && value2 == None { continue; } else { return Err(DataFusionError::Internal( @@ -305,11 +313,11 @@ impl Accumulator for CovarianceAccumulator { } let new_count = self.count + 1; - let delta1 = value1 - self.mean1; + let delta1 = value1.unwrap() - self.mean1; let new_mean1 = delta1 / new_count as f64 + self.mean1; - let delta2 = value2 - 
self.mean2; + let delta2 = value2.unwrap() - self.mean2; let new_mean2 = delta2 / new_count as f64 + self.mean2; - let new_c = delta1 * (value2 - new_mean2) + self.algo_const; + let new_c = delta1 * (value2.unwrap() - new_mean2) + self.algo_const; self.count += 1; self.mean1 = new_mean1; diff --git a/datafusion/src/physical_plan/expressions/variance.rs b/datafusion/src/physical_plan/expressions/variance.rs index b5a5642d5fb79..fc90c3b87189f 100644 --- a/datafusion/src/physical_plan/expressions/variance.rs +++ b/datafusion/src/physical_plan/expressions/variance.rs @@ -255,14 +255,14 @@ impl Accumulator for VarianceAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let values = &cast(&values[0], &DataType::Float64)?; - let arr = values.as_any().downcast_ref::().unwrap(); - - for i in 0..arr.len() { - let value = arr.value(i); + let arr = values + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .filter_map(|v| v); - if value == 0_f64 && values.is_null(i) { - continue; - } + for value in arr { let new_count = self.count + 1; let delta1 = value - self.mean; let new_mean = delta1 / new_count as f64 + self.mean; From 1b7810ec1e17ba13dcfac4ca9cd979759531cd44 Mon Sep 17 00:00:00 2001 From: Lin Ma Date: Wed, 12 Jan 2022 22:44:54 -0800 Subject: [PATCH 12/12] clippy --- datafusion/src/physical_plan/expressions/covariance.rs | 4 ++-- datafusion/src/physical_plan/expressions/variance.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/covariance.rs b/datafusion/src/physical_plan/expressions/covariance.rs index 62c86f1450f2f..454dc4351776a 100644 --- a/datafusion/src/physical_plan/expressions/covariance.rs +++ b/datafusion/src/physical_plan/expressions/covariance.rs @@ -290,13 +290,13 @@ impl Accumulator for CovarianceAccumulator { .downcast_ref::() .unwrap() .iter() - .filter_map(|v| v); + .flatten(); let mut arr2 = values2 .as_any() .downcast_ref::() .unwrap() .iter() - .filter_map(|v| 
v); + .flatten(); for _i in 0..values1.len() { let value1 = arr1.next(); diff --git a/datafusion/src/physical_plan/expressions/variance.rs b/datafusion/src/physical_plan/expressions/variance.rs index fc90c3b87189f..ff9b737fc1bc3 100644 --- a/datafusion/src/physical_plan/expressions/variance.rs +++ b/datafusion/src/physical_plan/expressions/variance.rs @@ -260,7 +260,7 @@ impl Accumulator for VarianceAccumulator { .downcast_ref::() .unwrap() .iter() - .filter_map(|v| v); + .flatten(); for value in arr { let new_count = self.count + 1;