Skip to content
Merged
3 changes: 0 additions & 3 deletions continuous_integration/gpuci/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ python -m pip install git+https://github.com/dask/dask
gpuci_logger "Install distributed"
python -m pip install git+https://github.com/dask/distributed

gpuci_logger "Install latest dask-cuda"
gpuci_mamba_retry update -y -c rapidsai-nightly dask-cuda

Comment on lines -55 to -57

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be necessary anymore. Removing it because the recent pinning of dask/distributed in the dask-cuda nightlies is causing GPU failures due to the install order here.

gpuci_logger "Install dask-sql"
pip install -e ".[dev]"

Expand Down
41 changes: 32 additions & 9 deletions dask_planner/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,17 @@ pub struct DescribeModel {
/// Dask-SQL extension DDL for `SHOW SCHEMAS`
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShowSchemas {
/// like
/// optional catalog name
pub catalog_name: Option<String>,
/// optional LIKE identifier
pub like: Option<String>,
}

/// Dask-SQL extension DDL for `SHOW TABLES FROM`
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShowTables {
/// schema name
/// catalog and schema name, i.e. 'catalog_name.schema_name'
pub catalog_name: Option<String>,
pub schema_name: Option<String>,
}

Expand Down Expand Up @@ -1205,8 +1208,23 @@ impl<'a> DaskParser<'a> {

/// Parse Dask-SQL SHOW SCHEMAS statement
fn parse_show_schemas(&mut self) -> Result<DaskStatement, ParserError> {
// Check for existence of `LIKE` clause
let like_val = match self.parser.peek_token().token {
// parse optional `FROM` clause
let catalog_name = match self.parser.peek_token().token {
Token::Word(w) => {
match w.keyword {
Keyword::FROM => {
// move one token forward
self.parser.next_token();
// use custom parsing
Some(self.parser.parse_identifier()?.value)
}
_ => None,
}
}
_ => None,
};
// parse optional `LIKE` clause
let like = match self.parser.peek_token().token {
Token::Word(w) => {
match w.keyword {
Keyword::LIKE => {
Expand All @@ -1222,18 +1240,23 @@ impl<'a> DaskParser<'a> {
};

Ok(DaskStatement::ShowSchemas(Box::new(ShowSchemas {
like: like_val,
catalog_name,
like,
})))
}

/// Parse Dask-SQL SHOW TABLES [FROM] statement
fn parse_show_tables(&mut self) -> Result<DaskStatement, ParserError> {
let mut schema_name = None;
if !self.parser.consume_token(&Token::EOF) {
schema_name = Some(self.parser.parse_identifier()?.value);
if let Ok(obj_name) = &self.parser.parse_object_name() {
let (catalog_name, schema_name) = DaskParserUtils::elements_from_object_name(obj_name)?;
return Ok(DaskStatement::ShowTables(Box::new(ShowTables {
catalog_name,
schema_name: Some(schema_name),
})));
}
Ok(DaskStatement::ShowTables(Box::new(ShowTables {
schema_name,
catalog_name: None,
schema_name: None,
})))
}

Expand Down
4 changes: 3 additions & 1 deletion dask_planner/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use crate::{
predict_model::PredictModelPlanNode,
show_columns::ShowColumnsPlanNode,
show_models::ShowModelsPlanNode,
show_schema::ShowSchemasPlanNode,
show_schemas::ShowSchemasPlanNode,
show_tables::ShowTablesPlanNode,
PyLogicalPlan,
},
Expand Down Expand Up @@ -626,12 +626,14 @@ impl DaskSQLContext {
DaskStatement::ShowSchemas(show_schemas) => Ok(LogicalPlan::Extension(Extension {
node: Arc::new(ShowSchemasPlanNode {
schema: Arc::new(DFSchema::empty()),
catalog_name: show_schemas.catalog_name,
like: show_schemas.like,
}),
})),
DaskStatement::ShowTables(show_tables) => Ok(LogicalPlan::Extension(Extension {
node: Arc::new(ShowTablesPlanNode {
schema: Arc::new(DFSchema::empty()),
catalog_name: show_tables.catalog_name,
schema_name: show_tables.schema_name,
}),
})),
Expand Down
6 changes: 3 additions & 3 deletions dask_planner/src/sql/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub mod projection;
pub mod repartition_by;
pub mod show_columns;
pub mod show_models;
pub mod show_schema;
pub mod show_schemas;
pub mod show_tables;
pub mod sort;
pub mod subquery_alias;
Expand All @@ -54,7 +54,7 @@ use self::{
predict_model::PredictModelPlanNode,
show_columns::ShowColumnsPlanNode,
show_models::ShowModelsPlanNode,
show_schema::ShowSchemasPlanNode,
show_schemas::ShowSchemasPlanNode,
show_tables::ShowTablesPlanNode,
use_schema::UseSchemaPlanNode,
};
Expand Down Expand Up @@ -177,7 +177,7 @@ impl PyLogicalPlan {
}

/// LogicalPlan::Extension::ShowSchemas as PyShowSchemas
pub fn show_schemas(&self) -> PyResult<show_schema::PyShowSchema> {
pub fn show_schemas(&self) -> PyResult<show_schemas::PyShowSchema> {
to_py_plan(self.current_node.as_ref())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::sql::{exceptions::py_type_err, logical};
#[derive(Clone)]
pub struct ShowSchemasPlanNode {
pub schema: DFSchemaRef,
pub catalog_name: Option<String>,
pub like: Option<String>,
}

Expand Down Expand Up @@ -44,7 +45,7 @@ impl UserDefinedLogicalNode for ShowSchemasPlanNode {
}

fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "ShowSchema")
write!(f, "ShowSchema: catalog_name: {:?}", self.catalog_name)
}

fn from_template(
Expand All @@ -54,6 +55,7 @@ impl UserDefinedLogicalNode for ShowSchemasPlanNode {
) -> Arc<dyn UserDefinedLogicalNode> {
Arc::new(ShowSchemasPlanNode {
schema: Arc::new(DFSchema::empty()),
catalog_name: self.catalog_name.clone(),
like: self.like.clone(),
})
}
Expand All @@ -66,9 +68,14 @@ pub struct PyShowSchema {

#[pymethods]
impl PyShowSchema {
#[pyo3(name = "getCatalogName")]
fn get_from(&self) -> PyResult<Option<String>> {
Ok(self.show_schema.catalog_name.clone())
}

#[pyo3(name = "getLike")]
fn get_like(&self) -> PyResult<String> {
Ok(self.show_schema.like.as_ref().cloned().unwrap_or_default())
fn get_like(&self) -> PyResult<Option<String>> {
Comment thread
charlesbluca marked this conversation as resolved.
Ok(self.show_schema.like.clone())
}
}

Expand Down
13 changes: 12 additions & 1 deletion dask_planner/src/sql/logical/show_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::sql::{exceptions::py_type_err, logical};
#[derive(Clone)]
pub struct ShowTablesPlanNode {
pub schema: DFSchemaRef,
pub catalog_name: Option<String>,
pub schema_name: Option<String>,
}

Expand Down Expand Up @@ -44,7 +45,11 @@ impl UserDefinedLogicalNode for ShowTablesPlanNode {
}

fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "ShowTables: schema_name: {:?}", self.schema_name)
write!(
f,
"ShowTables: catalog_name: {:?}, schema_name: {:?}",
self.catalog_name, self.schema_name
)
}

fn from_template(
Expand All @@ -54,6 +59,7 @@ impl UserDefinedLogicalNode for ShowTablesPlanNode {
) -> Arc<dyn UserDefinedLogicalNode> {
Arc::new(ShowTablesPlanNode {
schema: Arc::new(DFSchema::empty()),
catalog_name: self.catalog_name.clone(),
schema_name: self.schema_name.clone(),
})
}
Expand All @@ -66,6 +72,11 @@ pub struct PyShowTables {

#[pymethods]
impl PyShowTables {
#[pyo3(name = "getCatalogName")]
fn get_catalog_name(&self) -> PyResult<Option<String>> {
Ok(self.show_tables.catalog_name.clone())
}

#[pyo3(name = "getSchemaName")]
fn get_schema_name(&self) -> PyResult<Option<String>> {
Ok(self.show_tables.schema_name.clone())
Expand Down
2 changes: 1 addition & 1 deletion dask_sql/physical/rel/custom/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
from .drop_table import DropTablePlugin
from .export_model import ExportModelPlugin
from .predict_model import PredictModelPlugin
from .schemas import ShowSchemasPlugin
from .show_columns import ShowColumnsPlugin
from .show_models import ShowModelsPlugin
from .show_schemas import ShowSchemasPlugin
from .show_tables import ShowTablesPlugin
from .use_schema import UseSchemaPlugin

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,29 @@ class ShowSchemasPlugin(BaseRelPlugin):
Show all schemas.
The SQL is:

SHOW SCHEMAS
SHOW SCHEMAS [FROM <catalog-name>] [LIKE '<schema-name>']

The result is also a table, although it is created on the fly.
"""

class_name = "ShowSchemas"

def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:

show_schemas = rel.show_schemas()

# "information_schema" is a schema which is found in every presto database
schemas = list(context.schema.keys())
schemas.append("information_schema")
df = pd.DataFrame({"Schema": schemas})

# We currently do not use the passed additional parameter FROM.
# currently catalogs other than the default `dask_sql` are not supported
catalog_name = show_schemas.getCatalogName() or context.catalog_name
if catalog_name != context.catalog_name:
raise RuntimeError(
f"A catalog with the name {catalog_name} is not present."
)

# filter by LIKE value
like = str(show_schemas.getLike()).strip("'")
if like and like != "None":
df = df[df.Schema == like]
Expand Down
13 changes: 11 additions & 2 deletions dask_sql/physical/rel/custom/show_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ShowTablesPlugin(BaseRelPlugin):
Show all tables currently defined for a given schema.
The SQL is:

SHOW TABLES FROM <schema>
SHOW TABLES FROM [<catalog>.]<schema>

Please note that dask-sql currently
only allows for a single schema (called "schema").
Expand All @@ -27,7 +27,16 @@ class ShowTablesPlugin(BaseRelPlugin):
class_name = "ShowTables"

def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContainer:
schema_name = rel.show_tables().getSchemaName() or context.schema_name
show_tables = rel.show_tables()

# currently catalogs other than the default `dask_sql` are not supported
catalog_name = show_tables.getCatalogName() or context.catalog_name
if catalog_name != context.catalog_name:
raise RuntimeError(
f"A catalog with the name {catalog_name} is not present."
)

schema_name = show_tables.getSchemaName() or context.schema_name

if schema_name not in context.schema:
raise AttributeError(f"Schema {schema_name} is not defined.")
Expand Down
41 changes: 32 additions & 9 deletions tests/integration/test_show.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,38 @@


def test_schemas(c):
result_df = c.sql("SHOW SCHEMAS")
expected_df = pd.DataFrame({"Schema": [c.schema_name, "information_schema"]})

assert_eq(result_df, expected_df)
assert_eq(c.sql("SHOW SCHEMAS"), expected_df)
assert_eq(c.sql(f"SHOW SCHEMAS FROM {c.catalog_name}"), expected_df)

result_df = c.sql("SHOW SCHEMAS LIKE 'information_schema'")
expected_df = pd.DataFrame({"Schema": ["information_schema"]})

assert_eq(result_df, expected_df, check_index=False)
assert_eq(
c.sql("SHOW SCHEMAS LIKE 'information_schema'"), expected_df, check_index=False
)
assert_eq(
c.sql(f"SHOW SCHEMAS FROM {c.catalog_name} LIKE 'information_schema'"),
expected_df,
check_index=False,
)


@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
def test_tables(gpu):
c = Context()
c.create_table("table", pd.DataFrame(), gpu=gpu)

result_df = c.sql(f'SHOW TABLES FROM "{c.schema_name}"')
expected_df = pd.DataFrame({"Table": ["table"]})

assert_eq(result_df, expected_df, check_index=False)
assert_eq(
c.sql(f'SHOW TABLES FROM "{c.schema_name}"'), expected_df, check_index=False
)
assert_eq(
c.sql(f'SHOW TABLES FROM "{c.catalog_name}"."{c.schema_name}"'),
expected_df,
check_index=False,
)


def test_columns(c):
Expand Down Expand Up @@ -59,14 +71,25 @@ def test_wrong_input(c):
c.sql(f'SHOW COLUMNS FROM "{c.schema_name}"."table"')
with pytest.raises(AttributeError):
c.sql('SHOW TABLES FROM "wrong"')
with pytest.raises(RuntimeError):
c.sql(f'SHOW TABLES FROM "wrong"."{c.schema_name}"')
Comment thread
charlesbluca marked this conversation as resolved.
with pytest.raises(RuntimeError):
c.sql('SHOW SCHEMAS FROM "wrong"')


def test_show_tables_no_schema(c):
def test_show_tables(c):
c = Context()

df = pd.DataFrame({"id": [0, 1]})
c.create_table("test", df)

actual_df = c.sql("show tables").compute()
expected_df = pd.DataFrame({"Table": ["test"]})
assert_eq(actual_df, expected_df)

# no schema specified
assert_eq(c.sql("show tables"), expected_df)

# unqualified schema
assert_eq(c.sql("show tables from root"), expected_df)

# qualified schema
assert_eq(c.sql("show tables from dask_sql.root"), expected_df)