Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/sql-parser/src/ast/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ pub trait AstDisplay {
}

// Derive a fmt::Display implementation for types implementing AstDisplay.
#[macro_export]
macro_rules! impl_display {
($name:ident) => {
impl std::fmt::Display for $name {
Expand Down
41 changes: 17 additions & 24 deletions src/sql/src/postgres_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

use anyhow::anyhow;

use sql_parser::ast::display::{AstDisplay, AstFormatter};
use sql_parser::impl_display;
use tokio_postgres::types::Type as PgType;
use tokio_postgres::NoTls;

Expand All @@ -21,6 +23,21 @@ pub struct PgColumn {
nullable: bool,
}

impl AstDisplay for PgColumn {
fn fmt(&self, f: &mut AstFormatter) {
f.write_str(&self.name);
f.write_str(" ");
f.write_str(&self.scalar_type);
f.write_str(" ");
if self.nullable {
f.write_str("NULL");
} else {
f.write_str("NOT NULL");
}
}
}
impl_display!(PgColumn);

/// Fetches column information from an upstream Postgres source, given
/// a connection string, a namespace, and a target table.
///
Expand Down Expand Up @@ -51,8 +68,6 @@ pub async fn fetch_columns(
.ok_or_else(|| anyhow!("table not found in the upstream catalog"))?
.get(0);

// todo@jldlaughlin: fetch all constraints, so we correctly error in `plan_create_source` if they
// are present.
Ok(client
.query(
"SELECT a.attname, a.atttypid, a.attnotnull
Expand All @@ -79,25 +94,3 @@ pub async fn fetch_columns(
})
.collect::<Result<Vec<_>, anyhow::Error>>()?)
}

/// Stringifies `PgColumn` information to appear as they would have been written in text.
pub fn format_columns(columns: Vec<PgColumn>) -> String {
let nullable = |nullable| {
if nullable {
"NULL"
} else {
"NOT NULL"
}
};

let mut formatted_columns = Vec::with_capacity(columns.len());
for c in columns {
formatted_columns.push(format!(
"{} {} {}",
c.name,
c.scalar_type,
nullable(c.nullable)
));
}
format!("({})", formatted_columns.join(","))
}
10 changes: 8 additions & 2 deletions src/sql/src/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tokio::task;
use tokio::time::Duration;

use repr::strconv;
use sql_parser::ast::display::AstDisplay;
use sql_parser::ast::{
AvroSchema, ColumnOption, ColumnOptionDef, Connector, CreateSourceStatement, CsrSeed, Format,
Ident, Raw, Statement,
Expand Down Expand Up @@ -111,9 +112,14 @@ pub async fn purify(mut stmt: Statement<Raw>) -> Result<Statement<Raw>, anyhow::
columns,
..
} => {
let fetched_columns = postgres_util::fetch_columns(conn, namespace, table).await?;
let fetched_columns = postgres_util::fetch_columns(conn, namespace, table)
.await?
.iter()
.map(|c| c.to_ast_string())
.collect::<Vec<String>>()
.join(", ");
let (upstream_columns, _constraints) =
parse_columns(&postgres_util::format_columns(fetched_columns))?;
parse_columns(&format!("({})", fetched_columns))?;
if columns.is_empty() {
*columns = upstream_columns;
} else {
Expand Down