Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 50 additions & 4 deletions datafusion/src/catalog/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! representing collections of named schemas.

use crate::catalog::schema::SchemaProvider;
use datafusion_common::{DataFusionError, Result};
use parking_lot::RwLock;
use std::any::Any;
use std::collections::HashMap;
Expand Down Expand Up @@ -110,12 +111,23 @@ pub trait CatalogProvider: Sync + Send {
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;

/// Adds a new schema to this catalog.
/// If a schema of the same name existed before, it is replaced in the catalog and returned.
///
/// If a schema of the same name existed before, it is replaced in
/// the catalog and returned.
///
/// By default returns a "Not Implemented" error
fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> Option<Arc<dyn SchemaProvider>>;
) -> Result<Option<Arc<dyn SchemaProvider>>> {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the change -- return Result here (and add a default impl that returns error)

// use variables to avoid unused variable warnings
let _ = name;
let _ = schema;
Err(DataFusionError::NotImplemented(
"Registering new schemas is not supported".to_string(),
))
}
}

/// Simple in-memory implementation of a catalog.
Expand Down Expand Up @@ -151,8 +163,42 @@ impl CatalogProvider for MemoryCatalogProvider {
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> Option<Arc<dyn SchemaProvider>> {
) -> Result<Option<Arc<dyn SchemaProvider>>> {
let mut schemas = self.schemas.write();
schemas.insert(name.into(), schema)
Ok(schemas.insert(name.into(), schema))
}
}

#[cfg(test)]
mod tests {
use crate::catalog::schema::MemorySchemaProvider;

use super::*;

#[test]
fn default_register_schema_not_supported() {
// mimic a new CatalogProvider and ensure it does not support registering schemas
struct TestProvider {}
impl CatalogProvider for TestProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema_names(&self) -> Vec<String> {
unimplemented!()
}

fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
unimplemented!()
}
}

let schema = Arc::new(MemorySchemaProvider::new()) as _;
let catalog = Arc::new(TestProvider {});

match catalog.register_schema("foo", schema) {
Ok(_) => panic!("unexpected OK"),
Err(e) => assert_eq!(e.to_string(), "This feature is not implemented: Registering new schemas is not supported"),
};
}
}
3 changes: 2 additions & 1 deletion datafusion/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use arrow::{
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
use datafusion_common::Result;

use crate::datasource::{MemTable, TableProvider, TableType};

Expand Down Expand Up @@ -89,7 +90,7 @@ impl CatalogProvider for CatalogWithInformationSchema {
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> Option<Arc<dyn SchemaProvider>> {
) -> Result<Option<Arc<dyn SchemaProvider>>> {
let catalog = &self.inner;
catalog.register_schema(name, schema)
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ mod tests {
.unwrap();

let catalog = MemoryCatalogProvider::new();
catalog.register_schema("active", Arc::new(schema));
catalog.register_schema("active", Arc::new(schema)).unwrap();

let mut ctx = SessionContext::new();

Expand Down
20 changes: 12 additions & 8 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl SessionContext {
}
(true, None) | (false, None) => {
let schema = Arc::new(MemorySchemaProvider::new());
catalog.register_schema(&schema_name, schema);
catalog.register_schema(&schema_name, schema)?;
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
Expand Down Expand Up @@ -996,10 +996,12 @@ impl SessionState {
if config.create_default_catalog_and_schema {
let default_catalog = MemoryCatalogProvider::new();

default_catalog.register_schema(
&config.default_schema,
Arc::new(MemorySchemaProvider::new()),
);
default_catalog
.register_schema(
&config.default_schema,
Arc::new(MemorySchemaProvider::new()),
)
.expect("memory catalog provider can register schema");

let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema {
Arc::new(CatalogWithInformationSchema::new(
Expand Down Expand Up @@ -2948,7 +2950,9 @@ mod tests {
schema
.register_table("test".to_owned(), test::table_with_sequence(1, 1).unwrap())
.unwrap();
catalog.register_schema("my_schema", Arc::new(schema));
catalog
.register_schema("my_schema", Arc::new(schema))
.unwrap();
ctx.register_catalog("my_catalog", Arc::new(catalog));

for table_ref in &["my_catalog.my_schema.test", "my_schema.test", "test"] {
Expand Down Expand Up @@ -2978,14 +2982,14 @@ mod tests {
let schema_a = MemorySchemaProvider::new();
schema_a
.register_table("table_a".to_owned(), test::table_with_sequence(1, 1)?)?;
catalog_a.register_schema("schema_a", Arc::new(schema_a));
catalog_a.register_schema("schema_a", Arc::new(schema_a))?;
ctx.register_catalog("catalog_a", Arc::new(catalog_a));

let catalog_b = MemoryCatalogProvider::new();
let schema_b = MemorySchemaProvider::new();
schema_b
.register_table("table_b".to_owned(), test::table_with_sequence(1, 2)?)?;
catalog_b.register_schema("schema_b", Arc::new(schema_b));
catalog_b.register_schema("schema_b", Arc::new(schema_b))?;
ctx.register_catalog("catalog_b", Arc::new(catalog_b));

let result = plan_and_collect(
Expand Down
12 changes: 9 additions & 3 deletions datafusion/tests/sql/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,19 @@ async fn information_schema_tables_tables_with_multiple_catalogs() {
schema
.register_table("t2".to_owned(), table_with_sequence(1, 1).unwrap())
.unwrap();
catalog.register_schema("my_schema", Arc::new(schema));
catalog
.register_schema("my_schema", Arc::new(schema))
.unwrap();
ctx.register_catalog("my_catalog", Arc::new(catalog));

let catalog = MemoryCatalogProvider::new();
let schema = MemorySchemaProvider::new();
schema
.register_table("t3".to_owned(), table_with_sequence(1, 1).unwrap())
.unwrap();
catalog.register_schema("my_other_schema", Arc::new(schema));
catalog
.register_schema("my_other_schema", Arc::new(schema))
.unwrap();
ctx.register_catalog("my_other_catalog", Arc::new(catalog));

let result = plan_and_collect(&mut ctx, "SELECT * from information_schema.tables")
Expand Down Expand Up @@ -460,7 +464,9 @@ async fn information_schema_columns() {
schema
.register_table("t2".to_owned(), table_with_many_types())
.unwrap();
catalog.register_schema("my_schema", Arc::new(schema));
catalog
.register_schema("my_schema", Arc::new(schema))
.unwrap();
ctx.register_catalog("my_catalog", Arc::new(catalog));

let result = plan_and_collect(&mut ctx, "SELECT * from information_schema.columns")
Expand Down