From 822ada6b535f0fa50ffc3e364ecdc154b2466722 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 21 Mar 2022 14:23:33 -0400 Subject: [PATCH 1/3] Allow register_catalog to return an error --- datafusion/src/catalog/catalog.rs | 54 ++++++++++++++++++-- datafusion/src/catalog/information_schema.rs | 3 +- datafusion/src/catalog/schema.rs | 2 +- datafusion/src/execution/context.rs | 24 ++++++--- datafusion/tests/sql/information_schema.rs | 12 +++-- 5 files changed, 78 insertions(+), 17 deletions(-) diff --git a/datafusion/src/catalog/catalog.rs b/datafusion/src/catalog/catalog.rs index 35054dcb4292a..9a932ee35e1c1 100644 --- a/datafusion/src/catalog/catalog.rs +++ b/datafusion/src/catalog/catalog.rs @@ -19,6 +19,7 @@ //! representing collections of named schemas. use crate::catalog::schema::SchemaProvider; +use datafusion_common::{DataFusionError, Result}; use parking_lot::RwLock; use std::any::Any; use std::collections::HashMap; @@ -110,12 +111,23 @@ pub trait CatalogProvider: Sync + Send { fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>; /// Adds a new schema to this catalog. - /// If a schema of the same name existed before, it is replaced in the catalog and returned. + /// + /// If a schema of the same name existed before, it is replaced in + /// the catalog and returned. + /// + /// By default returns a "Not Implemented" error fn register_schema( &self, name: &str, schema: Arc<dyn SchemaProvider>, - ) -> Option<Arc<dyn SchemaProvider>>; + ) -> Result<Option<Arc<dyn SchemaProvider>>> { + // use variables to avoid unused variable warnings + let _ = name; + let _ = schema; + Err(DataFusionError::NotImplemented( + "Registering new schemas is not supported".to_string(), + )) + } } /// Simple in-memory implementation of a catalog. 
@@ -151,8 +163,42 @@ impl CatalogProvider for MemoryCatalogProvider { &self, name: &str, schema: Arc<dyn SchemaProvider>, - ) -> Option<Arc<dyn SchemaProvider>> { + ) -> Result<Option<Arc<dyn SchemaProvider>>> { let mut schemas = self.schemas.write(); - schemas.insert(name.into(), schema) + Ok(schemas.insert(name.into(), schema)) + } +} + +#[cfg(test)] +mod tests { + use crate::catalog::schema::MemorySchemaProvider; + + use super::*; + + #[test] + fn default_register_schema_not_supported() { + // mimic a new CatalogProvider and ensure it does not support registering schemas + struct TestProvider {} + impl CatalogProvider for TestProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec<String> { + unimplemented!() + } + + fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> { + unimplemented!() + } + } + + let schema = Arc::new(MemorySchemaProvider::new()) as _; + let catalog = Arc::new(TestProvider {}); + + match catalog.register_schema("foo", schema) { + Ok(_) => panic!("unexpected OK"), + Err(e) => assert_eq!(e.to_string(), "This feature is not implemented: Registering new schemas is not supported"), + }; } } diff --git a/datafusion/src/catalog/information_schema.rs b/datafusion/src/catalog/information_schema.rs index 4a899c91579de..38306150a9da3 100644 --- a/datafusion/src/catalog/information_schema.rs +++ b/datafusion/src/catalog/information_schema.rs @@ -29,6 +29,7 @@ use arrow::{ datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; +use datafusion_common::Result; use crate::datasource::{MemTable, TableProvider, TableType}; @@ -89,7 +90,7 @@ impl CatalogProvider for CatalogWithInformationSchema { &self, name: &str, schema: Arc<dyn SchemaProvider>, - ) -> Option<Arc<dyn SchemaProvider>> { + ) -> Result<Option<Arc<dyn SchemaProvider>>> { let catalog = &self.inner; catalog.register_schema(name, schema) } diff --git a/datafusion/src/catalog/schema.rs b/datafusion/src/catalog/schema.rs index ece61c8430b51..24830a8d60b7e 100644 --- a/datafusion/src/catalog/schema.rs +++ b/datafusion/src/catalog/schema.rs @@ -289,7 +289,7 @@ mod tests { .unwrap(); let catalog = 
MemoryCatalogProvider::new(); - catalog.register_schema("active", Arc::new(schema)); + catalog.register_schema("active", Arc::new(schema)).unwrap(); let mut ctx = SessionContext::new(); diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 7863eb46287f1..dbde751db1727 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -302,7 +302,7 @@ impl SessionContext { } (true, None) | (false, None) => { let schema = Arc::new(MemorySchemaProvider::new()); - catalog.register_schema(&schema_name, schema); + catalog.register_schema(&schema_name, schema)?; let plan = LogicalPlanBuilder::empty(false).build()?; Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) } @@ -996,10 +996,12 @@ impl SessionState { if config.create_default_catalog_and_schema { let default_catalog = MemoryCatalogProvider::new(); - default_catalog.register_schema( - &config.default_schema, - Arc::new(MemorySchemaProvider::new()), - ); + default_catalog + .register_schema( + &config.default_schema, + Arc::new(MemorySchemaProvider::new()), + ) + .expect("memory catalog provider can register schema"); let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema { Arc::new(CatalogWithInformationSchema::new( @@ -2948,7 +2950,9 @@ mod tests { schema .register_table("test".to_owned(), test::table_with_sequence(1, 1).unwrap()) .unwrap(); - catalog.register_schema("my_schema", Arc::new(schema)); + catalog + .register_schema("my_schema", Arc::new(schema)) + .unwrap(); ctx.register_catalog("my_catalog", Arc::new(catalog)); for table_ref in &["my_catalog.my_schema.test", "my_schema.test", "test"] { @@ -2978,14 +2982,18 @@ mod tests { let schema_a = MemorySchemaProvider::new(); schema_a .register_table("table_a".to_owned(), test::table_with_sequence(1, 1)?)?; - catalog_a.register_schema("schema_a", Arc::new(schema_a)); + catalog_a + .register_schema("schema_a", Arc::new(schema_a)) + .unwrap(); ctx.register_catalog("catalog_a", 
Arc::new(catalog_a)); let catalog_b = MemoryCatalogProvider::new(); let schema_b = MemorySchemaProvider::new(); schema_b .register_table("table_b".to_owned(), test::table_with_sequence(1, 2)?)?; - catalog_b.register_schema("schema_b", Arc::new(schema_b)); + catalog_b + .register_schema("schema_b", Arc::new(schema_b)) + .unwrap(); ctx.register_catalog("catalog_b", Arc::new(catalog_b)); let result = plan_and_collect( diff --git a/datafusion/tests/sql/information_schema.rs b/datafusion/tests/sql/information_schema.rs index 00c691aacb5d8..57ef9ba3b65d4 100644 --- a/datafusion/tests/sql/information_schema.rs +++ b/datafusion/tests/sql/information_schema.rs @@ -117,7 +117,9 @@ async fn information_schema_tables_tables_with_multiple_catalogs() { schema .register_table("t2".to_owned(), table_with_sequence(1, 1).unwrap()) .unwrap(); - catalog.register_schema("my_schema", Arc::new(schema)); + catalog + .register_schema("my_schema", Arc::new(schema)) + .unwrap(); ctx.register_catalog("my_catalog", Arc::new(catalog)); let catalog = MemoryCatalogProvider::new(); @@ -125,7 +127,9 @@ async fn information_schema_tables_tables_with_multiple_catalogs() { schema .register_table("t3".to_owned(), table_with_sequence(1, 1).unwrap()) .unwrap(); - catalog.register_schema("my_other_schema", Arc::new(schema)); + catalog + .register_schema("my_other_schema", Arc::new(schema)) + .unwrap(); ctx.register_catalog("my_other_catalog", Arc::new(catalog)); let result = plan_and_collect(&mut ctx, "SELECT * from information_schema.tables") @@ -460,7 +464,9 @@ async fn information_schema_columns() { schema .register_table("t2".to_owned(), table_with_many_types()) .unwrap(); - catalog.register_schema("my_schema", Arc::new(schema)); + catalog + .register_schema("my_schema", Arc::new(schema)) + .unwrap(); ctx.register_catalog("my_catalog", Arc::new(catalog)); let result = plan_and_collect(&mut ctx, "SELECT * from information_schema.columns") From e9badb258709132b3f30dbfd6e255785c1955b78 Mon Sep 17 
00:00:00 2001 From: Andrew Lamb Date: Tue, 22 Mar 2022 08:48:23 -0400 Subject: [PATCH 2/3] Apply suggestions from code review Co-authored-by: Yijie Shen --- datafusion/src/execution/context.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index dbde751db1727..dc19936055fe5 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -2983,8 +2983,7 @@ mod tests { schema_a .register_table("table_a".to_owned(), test::table_with_sequence(1, 1)?)?; catalog_a - .register_schema("schema_a", Arc::new(schema_a)) - .unwrap(); + .register_schema("schema_a", Arc::new(schema_a))?; ctx.register_catalog("catalog_a", Arc::new(catalog_a)); let catalog_b = MemoryCatalogProvider::new(); @@ -2992,8 +2991,7 @@ mod tests { schema_b .register_table("table_b".to_owned(), test::table_with_sequence(1, 2)?)?; catalog_b - .register_schema("schema_b", Arc::new(schema_b)) - .unwrap(); + .register_schema("schema_b", Arc::new(schema_b))?; ctx.register_catalog("catalog_b", Arc::new(catalog_b)); let result = plan_and_collect( From 2c48eb96eabe3778c532b08a845e3602d71d160a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 22 Mar 2022 09:01:52 -0400 Subject: [PATCH 3/3] fix: fmt --- datafusion/src/execution/context.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index dc19936055fe5..0b678437a1af5 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -2982,16 +2982,14 @@ mod tests { let schema_a = MemorySchemaProvider::new(); schema_a .register_table("table_a".to_owned(), test::table_with_sequence(1, 1)?)?; - catalog_a - .register_schema("schema_a", Arc::new(schema_a))?; + catalog_a.register_schema("schema_a", Arc::new(schema_a))?; ctx.register_catalog("catalog_a", Arc::new(catalog_a)); let catalog_b = 
MemoryCatalogProvider::new(); let schema_b = MemorySchemaProvider::new(); schema_b .register_table("table_b".to_owned(), test::table_with_sequence(1, 2)?)?; - catalog_b - .register_schema("schema_b", Arc::new(schema_b))?; + catalog_b.register_schema("schema_b", Arc::new(schema_b))?; ctx.register_catalog("catalog_b", Arc::new(catalog_b)); let result = plan_and_collect(