getColumns() {
return columns;
}
+ private String getTypeString() {
+ switch (getType()) {
+ case PRIMARY_KEY:
+ return "PRIMARY KEY";
+ case UNIQUE_KEY:
+ return "UNIQUE";
+ default:
+ throw new IllegalStateException("Unknown key type: " + getType());
+ }
+ }
+
@Override
public ConstraintType getType() {
return type;
@@ -67,27 +78,15 @@ public ConstraintType getType() {
*
* CONSTRAINT [constraint-name] [constraint-type] ([constraint-definition])
*
- * E.g CONSTRAINT pk PRIMARY KEY (f0, f1) NOT ENFORCED
+ * E.g CONSTRAINT pk PRIMARY KEY (`f0`, `f1`) NOT ENFORCED
*
*/
@Override
public final String asSummaryString() {
- final String typeString;
- switch (getType()) {
- case PRIMARY_KEY:
- typeString = "PRIMARY KEY";
- break;
- case UNIQUE_KEY:
- typeString = "UNIQUE";
- break;
- default:
- throw new IllegalStateException("Unknown key type: " + getType());
- }
-
return String.format(
"CONSTRAINT %s %s (%s)%s",
EncodingUtils.escapeIdentifier(getName()),
- typeString,
+ getTypeString(),
columns.stream()
.map(EncodingUtils::escapeIdentifier)
.collect(Collectors.joining(", ")),
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java
index 5179684e91d797..f1c6342223ae60 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/WatermarkSpec.java
@@ -68,7 +68,7 @@ public ResolvedExpression getWatermarkExpression() {
public String asSummaryString() {
return "WATERMARK FOR "
- + String.join(".", EncodingUtils.escapeIdentifier(rowtimeAttribute))
+ + EncodingUtils.escapeIdentifier(rowtimeAttribute)
+ ": "
+ watermarkExpression.getOutputDataType()
+ " AS "
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 538b9094e1c6c3..8d55d63adb408e 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -55,6 +55,7 @@
import org.apache.flink.sql.parser.dql.SqlRichDescribeTable;
import org.apache.flink.sql.parser.dql.SqlRichExplain;
import org.apache.flink.sql.parser.dql.SqlShowCatalogs;
+import org.apache.flink.sql.parser.dql.SqlShowCreateTable;
import org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog;
import org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase;
import org.apache.flink.sql.parser.dql.SqlShowDatabases;
@@ -94,6 +95,7 @@
import org.apache.flink.table.operations.LoadModuleOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ShowCatalogsOperation;
+import org.apache.flink.table.operations.ShowCreateTableOperation;
import org.apache.flink.table.operations.ShowCurrentCatalogOperation;
import org.apache.flink.table.operations.ShowCurrentDatabaseOperation;
import org.apache.flink.table.operations.ShowDatabasesOperation;
@@ -256,6 +258,8 @@ public static Optional convert(
return Optional.of(converter.convertDropFunction((SqlDropFunction) validated));
} else if (validated instanceof SqlAlterFunction) {
return Optional.of(converter.convertAlterFunction((SqlAlterFunction) validated));
+ } else if (validated instanceof SqlShowCreateTable) {
+ return Optional.of(converter.convertShowCreateTable((SqlShowCreateTable) validated));
} else if (validated instanceof SqlShowFunctions) {
return Optional.of(converter.convertShowFunctions((SqlShowFunctions) validated));
} else if (validated instanceof SqlShowPartitions) {
@@ -781,6 +785,14 @@ private Operation convertShowTables(SqlShowTables sqlShowTables) {
return new ShowTablesOperation();
}
+ /** Convert SHOW CREATE TABLE statement. */
+ private Operation convertShowCreateTable(SqlShowCreateTable sqlShowCreateTable) {
+ UnresolvedIdentifier unresolvedIdentifier =
+ UnresolvedIdentifier.of(sqlShowCreateTable.getFullTableName());
+ ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+ return new ShowCreateTableOperation(identifier);
+ }
+
/** Convert SHOW FUNCTIONS statement. */
private Operation convertShowFunctions(SqlShowFunctions sqlShowFunctions) {
return new ShowFunctionsOperation(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index da8a69c9f1b684..4f122e0cc4ca02 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.catalog
import org.apache.flink.table.api.config.{ExecutionConfigOptions, TableConfigOptions}
import org.apache.flink.table.api.internal.TableEnvironmentImpl
-import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, ValidationException}
+import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, TableException, ValidationException}
import org.apache.flink.table.catalog.{CatalogDatabaseImpl, CatalogFunctionImpl, GenericInMemoryCatalog, ObjectPath}
import org.apache.flink.table.planner.expressions.utils.Func0
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory
@@ -29,7 +29,7 @@ import org.apache.flink.table.planner.utils.DateTimeTestUtil.localDateTime
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row
import org.apache.flink.util.FileUtils
-import org.junit.Assert.{assertEquals, assertTrue, fail}
+import org.junit.Assert.{assertEquals, fail}
import org.junit.rules.ExpectedException
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -39,7 +39,6 @@ import java.io.File
import java.math.{BigDecimal => JBigDecimal}
import java.net.URI
import java.util
-
import scala.collection.JavaConversions._
/** Test cases for catalog table. */
@@ -985,6 +984,91 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
assertEquals(false, tableSchema2.getPrimaryKey.isPresent)
}
+ @Test
+ def testCreateTableAndShowCreateTable(): Unit = {
+ val executedDDL =
+ """
+ |create temporary table TBL1 (
+ | a bigint not null,
+ | h string,
+ | g as 2*(a+1),
+ | b string not null,
+ | c bigint metadata virtual,
+ | e row,
+ | f as myfunc(a),
+ | ts1 timestamp(3),
+ | ts2 timestamp_ltz(3) metadata from 'timestamp',
+ | `__source__` varchar(255),
+ | proc as proctime(),
+ | watermark for ts1 as cast(timestampadd(hour, 8, ts1) as timestamp(3)),
+ | constraint test_constraint primary key (a, b) not enforced
+ |) comment 'test show create table statement'
+ |partitioned by (b,h)
+ |with (
+ | 'connector' = 'kafka',
+ | 'kafka.topic' = 'log.test'
+ |)
+ |""".stripMargin
+
+ val expectedDDL =
+ """ |CREATE TEMPORARY TABLE `default_catalog`.`default_database`.`TBL1` (
+ | `a` BIGINT NOT NULL,
+ | `h` VARCHAR(2147483647),
+ | `g` AS 2 * (`a` + 1),
+ | `b` VARCHAR(2147483647) NOT NULL,
+ | `c` BIGINT METADATA VIRTUAL,
+ | `e` ROW<`name` VARCHAR(2147483647), `age` INT, `flag` BOOLEAN>,
+ | `f` AS `default_catalog`.`default_database`.`myfunc`(`a`),
+ | `ts1` TIMESTAMP(3),
+ | `ts2` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
+ | `__source__` VARCHAR(255),
+ | `proc` AS PROCTIME(),
+ | WATERMARK FOR `ts1` AS CAST(TIMESTAMPADD(HOUR, 8, `ts1`) AS TIMESTAMP(3)),
+ | CONSTRAINT `test_constraint` PRIMARY KEY (`a`, `b`) NOT ENFORCED
+ |) COMMENT 'test show create table statement'
+ |PARTITIONED BY (`b`, `h`)
+ |WITH (
+ | 'connector' = 'kafka',
+ | 'kafka.topic' = 'log.test'
+ |)
+ |""".stripMargin
+ tableEnv.executeSql(executedDDL)
+ val row = tableEnv.executeSql("SHOW CREATE TABLE `TBL1`").collect().next()
+ assertEquals(expectedDDL, row.getField(0))
+
+ expectedEx.expect(classOf[ValidationException])
+ expectedEx.expectMessage(
+ "Could not execute SHOW CREATE TABLE. " +
+ "Table with identifier `default_catalog`.`default_database`.`tmp` does not exist.")
+ tableEnv.executeSql("SHOW CREATE TABLE `tmp`")
+ }
+
+ @Test
+ def testCreateViewAndShowCreateTable(): Unit = {
+ val createTableDDL =
+ """ |create table `source` (
+ | `id` bigint not null,
+ | `group` string not null,
+ | `score` double
+ |) with (
+ | 'connector' = 'source-only'
+ |)
+ |""".stripMargin
+ val createViewDDL =
+ """ |create view `tmp` as
+ |select `group`, avg(`score`) as avg_score
+ |from `source`
+ |group by `group`
+ |""".stripMargin
+ tableEnv.executeSql(createTableDDL)
+ tableEnv.executeSql(createViewDDL)
+ expectedEx.expect(classOf[TableException])
+ expectedEx.expectMessage(
+ "SHOW CREATE TABLE does not support showing CREATE VIEW statement with " +
+ "identifier `default_catalog`.`default_database`.`tmp`.")
+ tableEnv.executeSql("SHOW CREATE TABLE `tmp`")
+ }
+
@Test
def testUseCatalogAndShowCurrentCatalog(): Unit = {
tableEnv.registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))