From a419352dd123da8136e998aa64427b6f9e0baa79 Mon Sep 17 00:00:00 2001 From: JLDLaughlin Date: Tue, 30 Mar 2021 14:56:55 -0400 Subject: [PATCH] sql: add AstDisplay impl for PgColumn --- src/sql-parser/src/ast/display.rs | 1 + src/sql/src/postgres_util.rs | 41 +++++++++++++------------------ src/sql/src/pure.rs | 10 ++++++-- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/sql-parser/src/ast/display.rs b/src/sql-parser/src/ast/display.rs index 0c379a03780e2..8d21bd4a79acc 100644 --- a/src/sql-parser/src/ast/display.rs +++ b/src/sql-parser/src/ast/display.rs @@ -117,6 +117,7 @@ pub trait AstDisplay { } // Derive a fmt::Display implementation for types implementing AstDisplay. +#[macro_export] macro_rules! impl_display { ($name:ident) => { impl std::fmt::Display for $name { diff --git a/src/sql/src/postgres_util.rs b/src/sql/src/postgres_util.rs index 9f52d42312630..726dd77113011 100644 --- a/src/sql/src/postgres_util.rs +++ b/src/sql/src/postgres_util.rs @@ -11,6 +11,8 @@ use anyhow::anyhow; +use sql_parser::ast::display::{AstDisplay, AstFormatter}; +use sql_parser::impl_display; use tokio_postgres::types::Type as PgType; use tokio_postgres::NoTls; @@ -21,6 +23,21 @@ pub struct PgColumn { nullable: bool, } +impl AstDisplay for PgColumn { + fn fmt(&self, f: &mut AstFormatter) { + f.write_str(&self.name); + f.write_str(" "); + f.write_str(&self.scalar_type); + f.write_str(" "); + if self.nullable { + f.write_str("NULL"); + } else { + f.write_str("NOT NULL"); + } + } +} +impl_display!(PgColumn); + /// Fetches column information from an upstream Postgres source, given /// a connection string, a namespace, and a target table. /// @@ -51,8 +68,6 @@ pub async fn fetch_columns( .ok_or_else(|| anyhow!("table not found in the upstream catalog"))? .get(0); - // todo@jldlaughlin: fetch all constraints, so we correctly error in `plan_create_source` if they - // are present. Ok(client .query( "SELECT a.attname, a.atttypid, a.attnotnull @@ -79,25 +94,3 @@ pub async fn fetch_columns( }) .collect::, anyhow::Error>>()?) } - -/// Stringifies `PgColumn` information to appear as they would have been written in text. -pub fn format_columns(columns: Vec) -> String { - let nullable = |nullable| { - if nullable { - "NULL" - } else { - "NOT NULL" - } - }; - - let mut formatted_columns = Vec::with_capacity(columns.len()); - for c in columns { - formatted_columns.push(format!( - "{} {} {}", - c.name, - c.scalar_type, - nullable(c.nullable) - )); - } - format!("({})", formatted_columns.join(",")) -} diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index 45722aa2a0187..f38b867d20f44 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -21,6 +21,7 @@ use tokio::task; use tokio::time::Duration; use repr::strconv; +use sql_parser::ast::display::AstDisplay; use sql_parser::ast::{ AvroSchema, ColumnOption, ColumnOptionDef, Connector, CreateSourceStatement, CsrSeed, Format, Ident, Raw, Statement, @@ -111,9 +112,14 @@ pub async fn purify(mut stmt: Statement) -> Result, anyhow:: columns, .. } => { - let fetched_columns = postgres_util::fetch_columns(conn, namespace, table).await?; + let fetched_columns = postgres_util::fetch_columns(conn, namespace, table) + .await? + .iter() + .map(|c| c.to_ast_string()) + .collect::>() + .join(", "); let (upstream_columns, _constraints) = - parse_columns(&postgres_util::format_columns(fetched_columns))?; + parse_columns(&format!("({})", fetched_columns))?; if columns.is_empty() { *columns = upstream_columns; } else {