From c319fd56cabbdb9415f6d2bb3110468ef334bdfc Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Wed, 10 Dec 2025 14:55:51 +0800 Subject: [PATCH 1/9] implement create/drop table/db for paimon --- .../ExternalMetadataOperations.java | 6 + .../paimon/DorisToPaimonTypeVisitor.java | 109 +++++ .../paimon/PaimonExternalCatalog.java | 40 +- .../datasource/paimon/PaimonMetadataOps.java | 401 ++++++++++++++++++ .../plans/commands/info/ColumnDefinition.java | 12 + .../plans/commands/info/CreateTableInfo.java | 21 +- .../paimon/IcebergMetadataOpsTest.java | 216 ++++++++++ .../paimon/test_paimon_table.groovy | 97 +++++ 8 files changed, 864 insertions(+), 38 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/DorisToPaimonTypeVisitor.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/IcebergMetadataOpsTest.java create mode 100644 regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java index 7d63b18cd13ffb..513b3379177da9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java @@ -21,6 +21,7 @@ import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HiveMetadataOps; import org.apache.doris.datasource.iceberg.IcebergMetadataOps; +import org.apache.doris.datasource.paimon.PaimonMetadataOps; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.catalog.Catalog; @@ -35,4 +36,9 @@ public static HiveMetadataOps newHiveMetadataOps(HiveConf hiveConf, HMSExternalC public static IcebergMetadataOps newIcebergMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) { return new IcebergMetadataOps(dorisCatalog, catalog); } + + public static PaimonMetadataOps newPaimonMetaOps(ExternalCatalog dorisCatalog, + org.apache.paimon.catalog.Catalog catalog) { + return new PaimonMetadataOps(dorisCatalog, catalog); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/DorisToPaimonTypeVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/DorisToPaimonTypeVisitor.java new file mode 100644 index 00000000000000..aad8106563b4f4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/DorisToPaimonTypeVisitor.java @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.StructField; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Type; +import org.apache.doris.datasource.DorisTypeVisitor; + +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class DorisToPaimonTypeVisitor extends DorisTypeVisitor { + + @Override + public DataType struct(StructType struct, List fieldResults) { + List fields = struct.getFields(); + List newFields = new ArrayList<>(fields.size()); + AtomicInteger atomicInteger = new AtomicInteger(-1); + for (int i = 0; i < fields.size(); i++) { + StructField field = fields.get(i); + DataType fieldType = fieldResults.get(i).copy(field.getContainsNull()); + String comment = field.getComment(); + DataField dataField = new DataField(atomicInteger.incrementAndGet(), field.getName(), fieldType, comment); + newFields.add(dataField); + } + return new RowType(newFields); + } + + @Override + public DataType field(StructField field, DataType typeResult) { + return typeResult; + } + + @Override + public DataType array(ArrayType array, DataType elementResult) { + return new org.apache.paimon.types.ArrayType(elementResult.copy(array.getContainsNull())); + } + + @Override + public DataType map(MapType map, DataType keyResult, DataType valueResult) { + return new org.apache.paimon.types.MapType(keyResult.copy(false), + valueResult.copy(map.getIsValueContainsNull())); + } + + @Override + public DataType atomic(Type atomic) { + PrimitiveType primitiveType = atomic.getPrimitiveType(); + if (primitiveType.equals(PrimitiveType.BOOLEAN)) { + return new BooleanType(); + } else if (primitiveType.equals(PrimitiveType.INT)) { + return new IntType(); + } else if (primitiveType.equals(PrimitiveType.BIGINT)) { + return new BigIntType(); + } else if (primitiveType.equals(PrimitiveType.FLOAT)) { + return new FloatType(); + } else if (primitiveType.equals(PrimitiveType.DOUBLE)) { + return new DoubleType(); + } else if (primitiveType.isCharFamily()) { + return new VarCharType(VarCharType.MAX_LENGTH); + } else if (primitiveType.equals(PrimitiveType.DATE) || primitiveType.equals(PrimitiveType.DATEV2)) { + return new DateType(); + } else if (primitiveType.equals(PrimitiveType.DECIMALV2) || primitiveType.isDecimalV3Type()) { + return new DecimalType(((ScalarType) atomic).getScalarPrecision(), ((ScalarType) atomic).getScalarScale()); + } else if (primitiveType.equals(PrimitiveType.DATETIME) || primitiveType.equals(PrimitiveType.DATETIMEV2)) { + return new TimestampType(); + } else if (primitiveType.isVarbinaryType()) { + return new VarBinaryType(VarBinaryType.MAX_LENGTH); + } else if (primitiveType.isVariantType()) { + return new VariantType(); + } + throw new UnsupportedOperationException("Not a supported type: " + primitiveType); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java index 09ec08e904dc7b..f3bef8039fcfaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java @@ -23,13 +23,13 @@ import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.NameMapping; import org.apache.doris.datasource.SessionContext; +import org.apache.doris.datasource.operations.ExternalMetadataOperations; import org.apache.doris.datasource.property.metastore.AbstractPaimonProperties; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.Catalog.TableNotExistException; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.partition.Partition; @@ -62,6 +62,7 @@ protected void initLocalObjectsImpl() { catalogType = paimonProperties.getPaimonCatalogType(); catalog = createCatalog(); initPreExecutionAuthenticator(); + metadataOps = ExternalMetadataOperations.newPaimonMetaOps(this, catalog); } @Override @@ -76,49 +77,16 @@ public String getCatalogType() { return catalogType; } - protected List listDatabaseNames() { - try { - return executionAuthenticator.execute(() -> new ArrayList<>(catalog.listDatabases())); - } catch (Exception e) { - throw new RuntimeException("Failed to list databases names, catalog name: " + getName(), e); - } - } - @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { makeSureInitialized(); - try { - return executionAuthenticator.execute(() -> { - try { - catalog.getTable(Identifier.create(dbName, tblName)); - return true; - } catch (TableNotExistException e) { - return false; - } - }); - - } catch (Exception e) { - throw new RuntimeException("Failed to check table existence, catalog name: " + getName() - + "error message is:" + ExceptionUtils.getRootCauseMessage(e), e); - } + return metadataOps.tableExist(dbName, tblName); } @Override public List listTableNames(SessionContext ctx, String dbName) { makeSureInitialized(); - try { - return executionAuthenticator.execute(() -> { - List tableNames = null; - try { - tableNames = catalog.listTables(dbName); - } catch (Catalog.DatabaseNotExistException e) { - LOG.warn("DatabaseNotExistException", e); - } - return tableNames; - }); - } catch (Exception e) { - throw new RuntimeException("Failed to list table names, catalog name: " + getName(), e); - } + return metadataOps.listTableNames(dbName); } public List getPaimonPartitions(NameMapping nameMapping) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java new file mode 100644 index 00000000000000..c1687ce405e0fc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java @@ -0,0 +1,401 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.analysis.PartitionDesc; +import org.apache.doris.catalog.StructField; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.common.security.authentication.ExecutionAuthenticator; +import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.DorisTypeVisitor; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.ExternalDatabase; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.operations.ExternalMetadataOps; +import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition; +import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo; +import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo; +import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; +import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo; +import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Catalog.DatabaseNotEmptyException; +import org.apache.paimon.catalog.Catalog.DatabaseNotExistException; +import org.apache.paimon.catalog.Catalog.TableAlreadyExistException; +import org.apache.paimon.catalog.Catalog.TableNotExistException; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +public class PaimonMetadataOps implements ExternalMetadataOps { + + private static final Logger LOG = LogManager.getLogger(PaimonMetadataOps.class); + protected Catalog catalog; + protected ExternalCatalog dorisCatalog; + private ExecutionAuthenticator executionAuthenticator; + private static final String PRIMARY_KEY_IDENTIFIER = "primary-key"; + private static final String PROP_COMMENT = "comment"; + private static final String PROP_LOCATION = "location"; + + public PaimonMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) { + this.dorisCatalog = dorisCatalog; + this.catalog = catalog; + this.executionAuthenticator = dorisCatalog.getExecutionAuthenticator(); + } + + + @Override + public boolean createDbImpl(String dbName, boolean ifNotExists, Map properties) + throws DdlException { + try { + return executionAuthenticator.execute(() -> performCreateDb(dbName, ifNotExists, properties)); + } catch (Exception e) { + throw new DdlException("Failed to create database: " + + dbName + ": " + Util.getRootCauseMessage(e), e); + } + } + + private boolean performCreateDb(String dbName, boolean ifNotExists, Map properties) + throws DdlException, Catalog.DatabaseAlreadyExistException { + if (databaseExist(dbName)) { + if (ifNotExists) { + LOG.info("create database[{}] which already exists", dbName); + return true; + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, dbName); + } + } + + if (!properties.isEmpty() && dorisCatalog instanceof PaimonExternalCatalog) { + String catalogType = ((PaimonExternalCatalog) dorisCatalog).getCatalogType(); + if (!PaimonExternalCatalog.PAIMON_HMS.equals(catalogType)) { + throw new DdlException( + "Not supported: create database with properties for paimon catalog type: " + catalogType); + } + } + + catalog.createDatabase(dbName, ifNotExists, properties); + return false; + } + + @Override + public void afterCreateDb() { + dorisCatalog.resetMetaCacheNames(); + } + + @Override + public void dropDbImpl(String dbName, boolean ifExists, boolean force) throws DdlException { + try { + executionAuthenticator.execute(() -> { + performDropDb(dbName, ifExists, force); + return null; + }); + } catch (Exception e) { + throw new DdlException( + "Failed to drop database: " + dbName + ", error message is:" + e.getMessage(), e); + } + } + + private void performDropDb(String dbName, boolean ifExists, boolean force) throws DdlException { + ExternalDatabase dorisDb = dorisCatalog.getDbNullable(dbName); + if (dorisDb == null) { + if (ifExists) { + LOG.info("drop database[{}] which does not exist", dbName); + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS, dbName); + } + } + + if (force) { + List tableNames = listTableNames(dbName); + if (!tableNames.isEmpty()) { + LOG.info("drop database[{}] with force, drop all tables, num: {}", dbName, tableNames.size()); + } + for (String tableName : tableNames) { + performDropTable(dbName, tableName, true); + } + } + + try { + catalog.dropDatabase(dbName, ifExists, force); + } catch (DatabaseNotExistException e) { + throw new RuntimeException("database " + dbName + " does not exist!"); + } catch (DatabaseNotEmptyException e) { + throw new RuntimeException("database " + dbName + " does not empty! please check!"); + } + } + + @Override + public void afterDropDb(String dbName) { + dorisCatalog.unregisterDatabase(dbName); + } + + @Override + public boolean createTableImpl(CreateTableInfo createTableInfo) throws UserException { + try { + return executionAuthenticator.execute(() -> performCreateTable(createTableInfo)); + } catch (Exception e) { + throw new DdlException( + "Failed to create table: " + createTableInfo.getTableName() + ", error message is:" + e.getMessage(), + e); + } + } + + public boolean performCreateTable(CreateTableInfo createTableInfo) throws UserException { + String dbName = createTableInfo.getDbName(); + ExternalDatabase db = dorisCatalog.getDbNullable(dbName); + if (db == null) { + throw new UserException("Failed to get database: '" + dbName + "' in catalog: " + dorisCatalog.getName()); + } + String tableName = createTableInfo.getTableName(); + // 1. first, check if table exist in remote + if (tableExist(db.getRemoteName(), tableName)) { + if (createTableInfo.isIfNotExists()) { + LOG.info("create table[{}] which already exists", tableName); + return true; + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); + } + } + + // 2. second, check if table exist in local. + // This is because case sensibility issue, eg: + // 1. lower_case_table_name = 1 + // 2. create table tbl1; + // 3. create table TBL1; TBL1 does not exist in remote because the remote system is case-sensitive. + // but because lower_case_table_name = 1, the table can not be created in Doris because it is conflict with + // tbl1 + ExternalTable dorisTable = db.getTableNullable(tableName); + if (dorisTable != null) { + if (createTableInfo.isIfNotExists()) { + LOG.info("create table[{}] which already exists", tableName); + return true; + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); + } + } + List columns = createTableInfo.getColumnDefinitions(); + List collect = columns.stream() + .map(col -> new StructField(col.getName(), col.getType().toCatalogDataType(), + col.getComment(), col.isNullable())) + .collect(Collectors.toList()); + StructType structType = new StructType(new ArrayList<>(collect)); + Schema schema = toPaimonSchema(structType, createTableInfo.getPartitionDesc(), createTableInfo.getProperties()); + try { + catalog.createTable(new Identifier(createTableInfo.getDbName(), createTableInfo.getTableName()), + schema, createTableInfo.isIfNotExists()); + } catch (TableAlreadyExistException | DatabaseNotExistException e) { + throw new RuntimeException(e); + } + return false; + } + + private Schema toPaimonSchema(StructType structType, PartitionDesc partitionDesc, Map properties) { + Map normalizedProperties = new HashMap<>(properties); + normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER); + normalizedProperties.remove(PROP_COMMENT); + if (normalizedProperties.containsKey(PROP_LOCATION)) { + String path = normalizedProperties.remove(PROP_LOCATION); + normalizedProperties.put(CoreOptions.PATH.key(), path); + } + + String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER); + List primaryKeys = pkAsString == null ? Collections.emptyList() : Arrays.stream(pkAsString.split(",")) + .map(String::trim) + .collect(Collectors.toList()); + List partitionKeys = partitionDesc == null ? new ArrayList<>() : partitionDesc.getPartitionColNames(); + Schema.Builder schemaBuilder = Schema.newBuilder() + .options(normalizedProperties) + .primaryKey(primaryKeys) + .partitionKeys(partitionKeys) + .comment(properties.getOrDefault(PROP_COMMENT, null)); + for (StructField field : structType.getFields()) { + schemaBuilder.column(field.getName(), + toPaimontype(field.getType()).copy(field.getContainsNull()), + field.getComment()); + } + return schemaBuilder.build(); + } + + private DataType toPaimontype(Type type) { + return DorisTypeVisitor.visit(type, new DorisToPaimonTypeVisitor()); + } + + @Override + public void afterCreateTable(String dbName, String tblName) { + Optional> db = dorisCatalog.getDbForReplay(dbName); + if (db.isPresent()) { + db.get().resetMetaCacheNames(); + } + LOG.info("after create table {}.{}.{}, is db exists: {}", + dorisCatalog.getName(), dbName, tblName, db.isPresent()); + } + + @Override + public void dropTableImpl(ExternalTable dorisTable, boolean ifExists) throws DdlException { + try { + executionAuthenticator.execute(() -> { + performDropTable(dorisTable.getRemoteDbName(), dorisTable.getRemoteName(), ifExists); + return null; + }); + } catch (Exception e) { + throw new DdlException( + "Failed to drop table: " + dorisTable.getName() + ", error message is:" + e.getMessage(), e); + } + } + + private void performDropTable(String dBName, String tableName, boolean ifExists) throws DdlException { + if (!tableExist(dBName, tableName)) { + if (ifExists) { + LOG.info("drop table[{}] which does not exist", tableName); + return; + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dBName); + } + } + try { + catalog.dropTable(Identifier.create(dBName, tableName), ifExists); + } catch (TableNotExistException e) { + throw new RuntimeException("table " + tableName + " does not exist"); + } + } + + @Override + public void afterDropTable(String dbName, String tblName) { + Optional> db = dorisCatalog.getDbForReplay(dbName); + db.ifPresent(externalDatabase -> externalDatabase.unregisterTable(tblName)); + LOG.info("after drop table {}.{}.{}. is db exists: {}", + dorisCatalog.getName(), dbName, tblName, db.isPresent()); + } + + @Override + public void truncateTableImpl(ExternalTable dorisTable, List partitions) throws DdlException { + throw new UnsupportedOperationException("truncate table is not a supported operation!"); + } + + @Override + public void createOrReplaceBranchImpl(ExternalTable dorisTable, CreateOrReplaceBranchInfo branchInfo) + throws UserException { + throw new UnsupportedOperationException("create or replace branch is not a supported operation!"); + } + + @Override + public void createOrReplaceTagImpl(ExternalTable dorisTable, CreateOrReplaceTagInfo tagInfo) throws UserException { + throw new UnsupportedOperationException("create or replace tag is not a supported operation!"); + } + + @Override + public void dropTagImpl(ExternalTable dorisTable, DropTagInfo tagInfo) throws UserException { + throw new UnsupportedOperationException("drop tag is not a supported operation!"); + } + + @Override + public void dropBranchImpl(ExternalTable dorisTable, DropBranchInfo branchInfo) throws UserException { + throw new UnsupportedOperationException("drop branch is not a supported operation!"); + } + + @Override + public List listDatabaseNames() { + try { + return executionAuthenticator.execute(() -> new ArrayList<>(catalog.listDatabases())); + } catch (Exception e) { + throw new RuntimeException("Failed to list databases names, catalog name: " + dorisCatalog.getName(), e); + } + } + + @Override + public List listTableNames(String db) { + try { + return executionAuthenticator.execute(() -> { + List tableNames = new ArrayList<>(); + try { + tableNames.addAll(catalog.listTables(db)); + } catch (DatabaseNotExistException e) { + LOG.warn("DatabaseNotExistException", e); + } + return tableNames; + }); + } catch (Exception e) { + throw new RuntimeException("Failed to list table names, catalog name: " + dorisCatalog.getName(), e); + } + } + + @Override + public boolean tableExist(String dbName, String tblName) { + try { + return executionAuthenticator.execute(() -> { + try { + catalog.getTable(Identifier.create(dbName, tblName)); + return true; + } catch (TableNotExistException e) { + return false; + } + }); + + } catch (Exception e) { + throw new RuntimeException("Failed to check table existence, catalog name: " + dorisCatalog.getName() + + "error message is:" + ExceptionUtils.getRootCauseMessage(e), e); + } + } + + @Override + public boolean databaseExist(String dbName) { + try { + return executionAuthenticator.execute(() -> { + try { + catalog.getDatabase(dbName); + return true; + } catch (DatabaseNotExistException e) { + return false; + } + }); + } catch (Exception e) { + throw new RuntimeException("Failed to check database exist, error message is:" + e.getMessage(), e); + } + } + + public Catalog getCatalog() { + return catalog; + } + + @Override + public void close() { + if (catalog != null) { + catalog = null; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java index eb5893114c5875..1f181c71c431e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java @@ -192,6 +192,18 @@ public void setGeneratedColumnsThatReferToThis(Set generatedColumnsThatR this.generatedColumnsThatReferToThis = generatedColumnsThatReferToThis; } + public String getComment() { + return getComment(false); + } + + public String getComment(boolean escapeQuota) { + String comment = this.comment == null ? "" : this.comment; + if (!escapeQuota) { + return comment; + } + return SqlUtils.escapeQuota(comment); + } + /** * toSql */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java index 2fc3d849445664..d6bc4f98e1e0b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java @@ -47,6 +47,7 @@ import org.apache.doris.datasource.es.EsUtil; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.info.TableNameInfo; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.CascadesContext; @@ -119,6 +120,7 @@ public class CreateTableInfo { public static final String ENGINE_BROKER = "broker"; public static final String ENGINE_HIVE = "hive"; public static final String ENGINE_ICEBERG = "iceberg"; + public static final String ENGINE_PAIMON = "paimon"; private static final ImmutableSet GENERATED_COLUMN_ALLOW_AGG_TYPE = ImmutableSet.of(AggregateType.REPLACE, AggregateType.REPLACE_IF_NOT_NULL); @@ -378,6 +380,8 @@ private void checkEngineWithCatalog() { throw new AnalysisException("Hms type catalog can only use `hive` engine."); } else if (catalog instanceof IcebergExternalCatalog && !engineName.equals(ENGINE_ICEBERG)) { throw new AnalysisException("Iceberg type catalog can only use `iceberg` engine."); + } else if (catalog instanceof PaimonExternalCatalog && !engineName.equals(ENGINE_PAIMON)) { + throw new AnalysisException("Paimon type catalog can only use `paimon` engine."); } } @@ -772,7 +776,12 @@ public void validate(ConnectContext ctx) { throw new AnalysisException( "Iceberg doesn't support 'DISTRIBUTE BY', " + "and you can use 'bucket(num, column)' in 'PARTITIONED BY'."); + } else if (engineName.equalsIgnoreCase(ENGINE_PAIMON) && distribution != null) { + throw new AnalysisException( + "Paimon doesn't support 'DISTRIBUTE BY', " + + "and you can use 'bucket(num, column)' in 'PARTITIONED BY'."); } + for (ColumnDefinition columnDef : columns) { if (!columnDef.isNullable() && engineName.equalsIgnoreCase(ENGINE_HIVE)) { @@ -880,6 +889,8 @@ private void paddingEngineName(String ctlName, ConnectContext ctx) { engineName = ENGINE_HIVE; } else if (catalog instanceof IcebergExternalCatalog) { engineName = ENGINE_ICEBERG; + } else if (catalog instanceof PaimonExternalCatalog) { + engineName = ENGINE_PAIMON; } else { throw new AnalysisException("Current catalog does not support create table: " + ctlName); } @@ -909,7 +920,8 @@ public void validateCreateTableAsSelect(List qualifierTableName, List getColumnDefinitions() { + return columns; + } + public List getColumns() { return columns.stream() .map(ColumnDefinition::translateToCatalogStyle).collect(Collectors.toList()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/IcebergMetadataOpsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/IcebergMetadataOpsTest.java new file mode 100644 index 00000000000000..75345b8437c3fd --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/IcebergMetadataOpsTest.java @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.CatalogFactory; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.trees.plans.commands.CreateCatalogCommand; +import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand; +import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; + +import com.google.common.collect.Maps; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Catalog.TableNotExistException; +import org.apache.paimon.catalog.FileSystemCatalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.hive.HiveCatalog; +import org.apache.paimon.table.Table; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.VarCharType; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; + +public class IcebergMetadataOpsTest { + public static PaimonExternalCatalog paimonCatalog; + public static PaimonMetadataOps ops; + public static String dbName = "testdb"; + + @BeforeClass + public static void beforeClass() throws Throwable { + HashMap param = new HashMap<>(); + param.put("type", "paimon"); + param.put("paimon.catalog.type", "hms"); + // create catalog + CreateCatalogCommand createCatalogCommand = new CreateCatalogCommand("paimon", true, "", "comment", param); + paimonCatalog = (PaimonExternalCatalog) CatalogFactory.createFromCommand(1, createCatalogCommand); + paimonCatalog.makeSureInitialized(); + // create db + ops = new PaimonMetadataOps(paimonCatalog, paimonCatalog.catalog); + ops.createDb(dbName, true, Maps.newHashMap()); + } + + @Test + public void testSimpleTable() throws UserException, TableNotExistException { + Identifier identifier = new Identifier(dbName, getTableName()); + String sql = "create table " + dbName + "." + getTableName() + " (id int) engine = paimon"; + createTable(sql); + Catalog catalog = ops.getCatalog(); + Table table = catalog.getTable(identifier); + List columnNames = new ArrayList<>(); + if (catalog instanceof HiveCatalog) { + columnNames.addAll(((HiveCatalog) catalog).loadTableSchema(identifier).fieldNames()); + } else if (catalog instanceof FileSystemCatalog) { + columnNames.addAll(((FileSystemCatalog) catalog).loadTableSchema(identifier).fieldNames()); + } + + if (!columnNames.isEmpty()) { + Assert.assertEquals(1, columnNames.size()); + } + Assert.assertEquals(0, table.partitionKeys().size()); + } + + @Test + public void testProperties() throws UserException, TableNotExistException { + Identifier identifier = new Identifier(dbName, getTableName()); + String sql = "create table " + dbName + "." + getTableName() + " (id int) engine = paimon properties(\"primary-key\"=id)"; + createTable(sql); + Catalog catalog = ops.getCatalog(); + Table table = catalog.getTable(identifier); + + List columnNames = new ArrayList<>(); + if (catalog instanceof HiveCatalog) { + columnNames.addAll(((HiveCatalog) catalog).loadTableSchema(identifier).fieldNames()); + } else if (catalog instanceof FileSystemCatalog) { + columnNames.addAll(((FileSystemCatalog) catalog).loadTableSchema(identifier).fieldNames()); + } + + if (!columnNames.isEmpty()) { + Assert.assertEquals(1, columnNames.size()); + } + Assert.assertEquals(0, table.partitionKeys().size()); + Assert.assertEquals("id", table.options().get("primary-key")); + Assert.assertEquals(1, table.primaryKeys().size()); + } + + @Test + public void testType() throws UserException, TableNotExistException { + Identifier identifier = new Identifier(dbName, getTableName()); + String sql = "create table " + dbName + "." + getTableName() + " (" + + "c0 int, " + + "c1 bigint, " + + "c2 float, " + + "c3 double, " + + "c4 string, " + + "c5 date, " + + "c6 decimal(20, 10), " + + "c7 datetime" + + ") engine = paimon " + + "properties(\"primary-key\"=c0)"; + createTable(sql); + Catalog catalog = ops.getCatalog(); + Table table = catalog.getTable(identifier); + + List columns = new ArrayList<>(); + if (catalog instanceof HiveCatalog) { + columns.addAll(((HiveCatalog) catalog).loadTableSchema(identifier).fields()); + } else if (catalog instanceof FileSystemCatalog) { + columns.addAll(((FileSystemCatalog) catalog).loadTableSchema(identifier).fields()); + } + + if (!columns.isEmpty()) { + Assert.assertEquals(8, columns.size()); + Assert.assertEquals(new IntType().asSQLString(), columns.get(0).type().toString()); + Assert.assertEquals(new BigIntType().asSQLString(), columns.get(1).type().toString()); + Assert.assertEquals(new FloatType().asSQLString(), columns.get(2).type().toString()); + Assert.assertEquals(new DoubleType().asSQLString(), columns.get(3).type().toString()); + Assert.assertEquals(new VarCharType(VarCharType.MAX_LENGTH).asSQLString(), columns.get(4).type().toString()); + Assert.assertEquals(new DateType().asSQLString(), columns.get(5).type().toString()); + Assert.assertEquals(new DecimalType(20, 10).asSQLString(), columns.get(6).type().toString()); + Assert.assertEquals(new TimestampType().asSQLString(), columns.get(7).type().toString()); + } + + Assert.assertEquals(0, table.partitionKeys().size()); + Assert.assertEquals("c0", table.options().get("primary-key")); + Assert.assertEquals(1, table.primaryKeys().size()); + } + + @Test + public void testPartition() throws UserException, TableNotExistException { + Identifier identifier = new Identifier(dbName, getTableName()); + String sql = "create table " + dbName + "." + getTableName() + " (" + + "c0 int, " + + "c1 bigint, " + + "c2 float, " + + "c3 double, " + + "c4 string, " + + "c5 date, " + + "c6 decimal(20, 10), " + + "c7 datetime" + + ") engine = paimon " + + "partition by (" + + "c1 )" + + "properties(\"primary-key\"=c0)"; + createTable(sql); + Catalog catalog = ops.getCatalog(); + Table table = catalog.getTable(identifier); + Assert.assertEquals(1, table.partitionKeys().size()); + Assert.assertEquals("c0", table.options().get("primary-key")); + Assert.assertEquals(1, table.primaryKeys().size()); + } + + public void createTable(String sql) throws UserException { + LogicalPlan plan = new NereidsParser().parseSingle(sql); + Assertions.assertTrue(plan instanceof CreateTableCommand); + CreateTableInfo createTableInfo = ((CreateTableCommand) plan).getCreateTableInfo(); + createTableInfo.setIsExternal(true); + createTableInfo.analyzeEngine(); + ops.createTable(createTableInfo); + } + + public String getTableName() { + String s = "test_tb_" + UUID.randomUUID(); + return s.replaceAll("-", ""); + } + + @Test + public void testDropDB() { + try { + // create db success + ops.createDb("t_paimon", false, Maps.newHashMap()); + // drop db success + ops.dropDb("t_paimon", false, false); + } catch (Throwable t) { + Assert.fail(); + } + + try { + ops.dropDb("t_paimon", false, false); + Assert.fail(); + } catch (Throwable t) { + Assert.assertTrue(t instanceof DdlException); + Assert.assertTrue(t.getMessage().contains("database doesn't exist")); + } + } +} diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy new file mode 100644 index 00000000000000..d699bdf228166f --- /dev/null +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_paimon_table", "p0,external,doris,external_docker,external_docker_doris,new_catalog_property") { + + String hms_ctl_name = "paimon_hms_catalog"; + + // This is only for testing creating catalog + sql """DROP CATALOG IF EXISTS ${hms_ctl_name}""" + sql """ + CREATE CATALOG ${hms_ctl_name} PROPERTIES ( + "type" = "paimon", + "paimon.catalog.type"="hms", + "warehouse" = "hdfs://HDFS8000871/user/zhangdong/paimon3", + "hive.metastore.uris" = "thrift://172.21.0.44:7004", + "dfs.nameservices"="HDFS8000871", + "dfs.ha.namenodes.HDFS8000871"="nn1,nn2", + "dfs.namenode.rpc-address.HDFS8000871.nn1"="172.21.0.1:4007", + "dfs.namenode.rpc-address.HDFS8000871.nn2"="172.21.0.2:4007", + "dfs.client.failover.proxy.provider.HDFS8000871"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider", + "hadoop.username"="hadoop" + ); + """ + sql """switch${hms_ctl_name}""" + String db_name = "test_db" + sql """create database if not exists ${db_name}""" + sql """use ${db_name}""" + + sql """drop table if exists ${db_name}.test01""" + sql """ + CREATE TABLE ${db_name}.test01 ( + id int + ) engine=paimon; + """ + + sql """drop table if exists ${db_name}.test02""" + sql """ + CREATE TABLE ${db_name}.test02 ( + id int + ) engine=paimon + properties("primary-key"=id); + """ + + sql """drop table if exists ${db_name}.test03""" + sql """ + CREATE TABLE ${db_name}.test03 ( + c0 int, + c1 bigint, + c2 float, + c3 double, + c4 string, + c5 date, + c6 decimal(10,5), + c7 datetime + ) engine=paimon + properties("primary-key"=c0); + """ + + sql """drop table if exists ${db_name}.test04""" + sql """ + CREATE TABLE ${db_name}.test04 ( + c0 int, + c1 bigint, + c2 float, + c3 double, + c4 string, + c5 date, + c6 decimal(10,5), + c7 datetime + ) engine=paimon + partition by (c1) () + properties("primary-key"=c0); + """ + + sql """ drop table if exists ${db_name}.test01""" + sql """ drop table if exists ${db_name}.test02""" + sql """ drop table if exists ${db_name}.test03""" + sql """ drop table if exists ${db_name}.test04""" + sql """ drop database if exists ${db_name}""" + sql """DROP CATALOG IF EXISTS ${hms_ctl_name}""" +} + + From 94c4b9e483575f604024ca33b4275e4b53b33d4a Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Wed, 17 Dec 2025 21:36:24 +0800 Subject: [PATCH 2/9] optimize test --- ...ergMetadataOpsTest.java => PaimonMetadataOpsTest.java} | 8 +++++++- .../external_table_p0/paimon/test_paimon_table.groovy | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) rename fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/{IcebergMetadataOpsTest.java => PaimonMetadataOpsTest.java} (96%) diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/IcebergMetadataOpsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java similarity index 96% rename from fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/IcebergMetadataOpsTest.java rename to fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java index 75345b8437c3fd..419c677198a654 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/IcebergMetadataOpsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java @@ -47,21 +47,27 @@ import org.junit.Test; import org.junit.jupiter.api.Assertions; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.UUID; -public class IcebergMetadataOpsTest { +public class PaimonMetadataOpsTest { + public static String warehouse; public static PaimonExternalCatalog paimonCatalog; public static PaimonMetadataOps ops; public static String dbName = "testdb"; @BeforeClass public static void beforeClass() throws Throwable { + Path warehousePath = Files.createTempDirectory("test_warehouse_"); + warehouse = "file://" + warehousePath.toAbsolutePath() + "/"; HashMap param = new HashMap<>(); param.put("type", "paimon"); param.put("paimon.catalog.type", "hms"); + param.put("warehouse", warehouse); // create catalog CreateCatalogCommand createCatalogCommand = new CreateCatalogCommand("paimon", true, "", "comment", param); paimonCatalog = (PaimonExternalCatalog) CatalogFactory.createFromCommand(1, createCatalogCommand); diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy index d699bdf228166f..5e80611ff5a45e 100644 --- a/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy @@ -17,7 +17,7 @@ suite("test_paimon_table", "p0,external,doris,external_docker,external_docker_doris,new_catalog_property") { - String hms_ctl_name = "paimon_hms_catalog"; + String hms_ctl_name = "paimon_hms_catalog_test01"; // This is only for testing creating catalog sql """DROP CATALOG IF EXISTS ${hms_ctl_name}""" @@ -35,7 +35,7 @@ suite("test_paimon_table", "p0,external,doris,external_docker,external_docker_do "hadoop.username"="hadoop" ); """ - sql """switch${hms_ctl_name}""" + sql """switch ${hms_ctl_name}""" String db_name = "test_db" sql """create database if not exists ${db_name}""" sql """use ${db_name}""" From 28bceaaf501da722c21b8a1d1a37a5b3d20d4cc2 Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Wed, 17 Dec 2025 23:57:40 +0800 Subject: [PATCH 3/9] optimize test --- .../paimon/PaimonMetadataOpsTest.java | 2 +- .../paimon/test_paimon_table.groovy | 157 ++++++++++-------- 2 files changed, 87 insertions(+), 72 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java index 419c677198a654..2ef817c3c91828 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java @@ -66,7 +66,7 @@ public static void beforeClass() throws Throwable { warehouse = "file://" + warehousePath.toAbsolutePath() + "/"; HashMap param = new HashMap<>(); param.put("type", "paimon"); - param.put("paimon.catalog.type", "hms"); + param.put("paimon.catalog.type", "filesystem"); param.put("warehouse", warehouse); // create catalog CreateCatalogCommand createCatalogCommand = new CreateCatalogCommand("paimon", true, "", "comment", param); diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy index 5e80611ff5a45e..90d4ccb80fda13 100644 --- a/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy @@ -15,83 +15,98 @@ // specific language governing permissions and limitations // under the License. -suite("test_paimon_table", "p0,external,doris,external_docker,external_docker_doris,new_catalog_property") { +suite("test_create_paimon_table", "p0,external,doris,external_docker,external_docker_doris") { + String catalog_name = "paimon_hms_catalog_test01" - String hms_ctl_name = "paimon_hms_catalog_test01"; + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + for (String hivePrefix : ["hive2"]) { + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String hmsPort = context.config.otherConfigs.get(hivePrefix + "HmsPort") + String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort") + String default_fs = "hdfs://${externalEnvIp}:${hdfs_port}" + String warehouse = "${default_fs}/warehouse" - // This is only for testing creating catalog - sql """DROP CATALOG IF EXISTS ${hms_ctl_name}""" - sql """ - CREATE CATALOG ${hms_ctl_name} PROPERTIES ( - "type" = "paimon", - "paimon.catalog.type"="hms", - "warehouse" = "hdfs://HDFS8000871/user/zhangdong/paimon3", - "hive.metastore.uris" = "thrift://172.21.0.44:7004", - "dfs.nameservices"="HDFS8000871", - "dfs.ha.namenodes.HDFS8000871"="nn1,nn2", - "dfs.namenode.rpc-address.HDFS8000871.nn1"="172.21.0.1:4007", - "dfs.namenode.rpc-address.HDFS8000871.nn2"="172.21.0.2:4007", - "dfs.client.failover.proxy.provider.HDFS8000871"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider", - "hadoop.username"="hadoop" - ); - """ - sql """switch ${hms_ctl_name}""" - String db_name = "test_db" - sql """create database if not exists ${db_name}""" - sql """use ${db_name}""" + // 1. test create catalog + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog ${catalog_name} properties ( + 'type'='paimon', + 'paimon.catalog.type'='hms', + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}', + 'warehouse' = '${warehouse}' + ); + """ - sql """drop table if exists ${db_name}.test01""" - sql """ - CREATE TABLE ${db_name}.test01 ( - id int - ) engine=paimon; - """ + // 2. test create database + sql """switch ${catalog_name}""" + String db_name = "test_db" + sql """create database if not exists ${db_name}""" - sql """drop table if exists ${db_name}.test02""" - sql """ - CREATE TABLE ${db_name}.test02 ( - id int - ) engine=paimon - properties("primary-key"=id); - """ + // 3. test create table + sql """use ${db_name}""" + sql """drop table if exists ${db_name}.test01""" + sql """ + CREATE TABLE ${db_name}.test01 ( + id int + ) engine=paimon; + """ - sql """drop table if exists ${db_name}.test03""" - sql """ - CREATE TABLE ${db_name}.test03 ( - c0 int, - c1 bigint, - c2 float, - c3 double, - c4 string, - c5 date, - c6 decimal(10,5), - c7 datetime - ) engine=paimon - properties("primary-key"=c0); - """ + sql """drop table if exists ${db_name}.test02""" + sql """ + CREATE TABLE ${db_name}.test02 ( + id int + ) engine=paimon + properties("primary-key"=id); + """ - sql """drop table if exists ${db_name}.test04""" - sql """ - CREATE TABLE ${db_name}.test04 ( - c0 int, - c1 bigint, - c2 float, - c3 double, - c4 string, - c5 date, - c6 decimal(10,5), - c7 datetime - ) engine=paimon - partition by (c1) () - properties("primary-key"=c0); - """ + sql """drop table if exists ${db_name}.test03""" + sql """ + CREATE TABLE ${db_name}.test03 ( + c0 int, + c1 bigint, + c2 float, + c3 double, + c4 string, + c5 date, + c6 decimal(10,5), + c7 datetime + ) engine=paimon + properties("primary-key"=c0); + """ - sql """ drop table if exists ${db_name}.test01""" - sql """ drop table if exists ${db_name}.test02""" - sql """ drop table if exists ${db_name}.test03""" - sql """ drop table if exists ${db_name}.test04""" - sql """ drop database if exists ${db_name}""" - sql """DROP CATALOG IF EXISTS ${hms_ctl_name}""" -} + sql """drop table if exists ${db_name}.test04""" + sql """ + CREATE TABLE ${db_name}.test04 ( + c0 int, + c1 bigint, + c2 float, + c3 double, + c4 string, + c5 date, + c6 decimal(10,5), + c7 datetime + ) engine=paimon + partition by (c1) () + properties("primary-key"=c0); + """ + + sql """drop database if exists test_iceberg_meta_cache_db""" + sql """create database test_iceberg_meta_cache_db""" + sql """ + CREATE TABLE test_iceberg_meta_cache_db.sales ( + id INT, + amount DOUBLE + ); + """ + sql """ drop table if exists ${db_name}.test01""" + sql """ drop table if exists ${db_name}.test02""" + sql """ drop table if exists ${db_name}.test03""" + sql """ drop table if exists ${db_name}.test04""" + sql """ drop database if exists ${db_name}""" + sql """DROP CATALOG IF EXISTS ${catalog_name}""" + } + } +} From 8d2fc62f5eab8b7ab7976bec6e31085d3a1835f6 Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Thu, 18 Dec 2025 09:06:46 +0800 Subject: [PATCH 4/9] optimize test --- .../apache/doris/datasource/paimon/PaimonMetadataOpsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java index 2ef817c3c91828..ff8d5cdd5b8440 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java @@ -75,6 +75,7 @@ public static void beforeClass() throws Throwable { // create db ops = new PaimonMetadataOps(paimonCatalog, paimonCatalog.catalog); ops.createDb(dbName, true, Maps.newHashMap()); + paimonCatalog.makeSureInitialized(); } @Test From a74e1690e740cd83ef15ff5b21f500d94522074c Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Thu, 18 Dec 2025 22:44:03 +0800 Subject: [PATCH 5/9] optimize ut test --- .../paimon/PaimonMetadataOpsTest.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java index ff8d5cdd5b8440..a83e1206c02b02 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java @@ -80,8 +80,9 @@ public static void beforeClass() throws Throwable { @Test public void testSimpleTable() throws UserException, TableNotExistException { - Identifier identifier = new Identifier(dbName, getTableName()); - String sql = "create table " + dbName + "." + getTableName() + " (id int) engine = paimon"; + String tableName = getTableName(); + Identifier identifier = new Identifier(dbName, tableName); + String sql = "create table " + dbName + "." + tableName + " (id int) engine = paimon"; createTable(sql); Catalog catalog = ops.getCatalog(); Table table = catalog.getTable(identifier); @@ -100,8 +101,9 @@ public void testSimpleTable() throws UserException, TableNotExistException { @Test public void testProperties() throws UserException, TableNotExistException { - Identifier identifier = new Identifier(dbName, getTableName()); - String sql = "create table " + dbName + "." + getTableName() + " (id int) engine = paimon properties(\"primary-key\"=id)"; + String tableName = getTableName(); + Identifier identifier = new Identifier(dbName, tableName); + String sql = "create table " + dbName + "." + tableName + " (id int) engine = paimon properties(\"primary-key\"=id)"; createTable(sql); Catalog catalog = ops.getCatalog(); Table table = catalog.getTable(identifier); @@ -123,8 +125,9 @@ public void testProperties() throws UserException, TableNotExistException { @Test public void testType() throws UserException, TableNotExistException { - Identifier identifier = new Identifier(dbName, getTableName()); - String sql = "create table " + dbName + "." + getTableName() + " (" + String tableName = getTableName(); + Identifier identifier = new Identifier(dbName, tableName); + String sql = "create table " + dbName + "." + tableName + " (" + "c0 int, " + "c1 bigint, " + "c2 float, " @@ -165,8 +168,9 @@ public void testType() throws UserException, TableNotExistException { @Test public void testPartition() throws UserException, TableNotExistException { - Identifier identifier = new Identifier(dbName, getTableName()); - String sql = "create table " + dbName + "." + getTableName() + " (" + String tableName = getTableName(); + Identifier identifier = new Identifier(dbName, tableName); + String sql = "create table " + dbName + "." + tableName + " (" + "c0 int, " + "c1 bigint, " + "c2 float, " @@ -177,7 +181,7 @@ public void testPartition() throws UserException, TableNotExistException { + "c7 datetime" + ") engine = paimon " + "partition by (" - + "c1 )" + + "c1 ) ()" + "properties(\"primary-key\"=c0)"; createTable(sql); Catalog catalog = ops.getCatalog(); From ef9a29f9c7fd372f6f434909ec751c3f32e19e90 Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Fri, 19 Dec 2025 09:26:34 +0800 Subject: [PATCH 6/9] optimize ut --- .../doris/datasource/paimon/PaimonMetadataOpsTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java index a83e1206c02b02..b09f55a0bae2fa 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java @@ -119,7 +119,7 @@ public void testProperties() throws UserException, TableNotExistException { Assert.assertEquals(1, columnNames.size()); } Assert.assertEquals(0, table.partitionKeys().size()); - Assert.assertEquals("id", table.options().get("primary-key")); + Assert.assertTrue(table.primaryKeys().contains("id")); Assert.assertEquals(1, table.primaryKeys().size()); } @@ -162,7 +162,7 @@ public void testType() throws UserException, TableNotExistException { } Assert.assertEquals(0, table.partitionKeys().size()); - Assert.assertEquals("c0", table.options().get("primary-key")); + Assert.assertTrue(table.primaryKeys().contains("c0")); Assert.assertEquals(1, table.primaryKeys().size()); } @@ -187,7 +187,7 @@ public void testPartition() throws UserException, TableNotExistException { Catalog catalog = ops.getCatalog(); Table table = catalog.getTable(identifier); Assert.assertEquals(1, table.partitionKeys().size()); - Assert.assertEquals("c0", table.options().get("primary-key")); + Assert.assertTrue(table.primaryKeys().contains("c0")); Assert.assertEquals(1, table.primaryKeys().size()); } From 95102c556085032305b3438688cba675e44fb7fc Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Fri, 19 Dec 2025 11:05:03 +0800 Subject: [PATCH 7/9] optimize ut --- .../doris/datasource/paimon/PaimonMetadataOpsTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java index b09f55a0bae2fa..fc8c9ef9a55c97 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java @@ -25,6 +25,7 @@ import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand; import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.qe.ConnectContext; import com.google.common.collect.Maps; import org.apache.paimon.catalog.Catalog; @@ -59,6 +60,7 @@ public class PaimonMetadataOpsTest { public static PaimonExternalCatalog paimonCatalog; public static PaimonMetadataOps ops; public static String dbName = "testdb"; + public static ConnectContext connectContext; @BeforeClass public static void beforeClass() throws Throwable { @@ -76,6 +78,10 @@ public static void beforeClass() throws Throwable { ops = new PaimonMetadataOps(paimonCatalog, paimonCatalog.catalog); ops.createDb(dbName, true, Maps.newHashMap()); paimonCatalog.makeSureInitialized(); + + // context + connectContext = new ConnectContext(); + connectContext.setThreadLocalInfo(); } @Test From 4b7b784895abe2d60f1dbed92646ee098064204d Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Fri, 19 Dec 2025 20:59:13 +0800 Subject: [PATCH 8/9] optimize --- .../doris/datasource/paimon/PaimonMetadataOps.java | 6 +++++- .../external_table_p0/paimon/test_paimon_table.groovy | 9 --------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java index c1687ce405e0fc..e6c8177edcae47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataOps.java @@ -135,8 +135,12 @@ private void performDropDb(String dbName, boolean ifExists, boolean force) throw if (dorisDb == null) { if (ifExists) { LOG.info("drop database[{}] which does not exist", dbName); + // Database does not exist and IF EXISTS is specified; treat as no-op. + return; } else { ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS, dbName); + // ErrorReport.reportDdlException is expected to throw DdlException. + return; } } @@ -155,7 +159,7 @@ private void performDropDb(String dbName, boolean ifExists, boolean force) throw } catch (DatabaseNotExistException e) { throw new RuntimeException("database " + dbName + " does not exist!"); } catch (DatabaseNotEmptyException e) { - throw new RuntimeException("database " + dbName + " does not empty! please check!"); + throw new RuntimeException("database " + dbName + " is not empty! please check!"); } } diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy index 90d4ccb80fda13..660ddf84e5e701 100644 --- a/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy @@ -91,15 +91,6 @@ suite("test_create_paimon_table", "p0,external,doris,external_docker,external_do properties("primary-key"=c0); """ - sql """drop database if exists test_iceberg_meta_cache_db""" - sql """create database test_iceberg_meta_cache_db""" - sql """ - CREATE TABLE test_iceberg_meta_cache_db.sales ( - id INT, - amount DOUBLE - ); - """ - sql """ drop table if exists ${db_name}.test01""" sql """ drop table if exists ${db_name}.test02""" sql """ drop table if exists ${db_name}.test03""" From cafe6266cad8bae251d515631b5e3794f6aa09f8 Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Wed, 31 Dec 2025 10:35:49 +0800 Subject: [PATCH 9/9] create bucket table --- .../org/apache/doris/catalog/TableIf.java | 2 + .../paimon/PaimonMetadataOpsTest.java | 37 +++++++++++++++---- .../paimon/test_paimon_table.groovy | 19 ++++++++++ 3 files changed, 51 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index 8f445daf789c7d..151bc0d00ea1ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -485,6 +485,8 @@ public String toEngineName() { case ICEBERG: case ICEBERG_EXTERNAL_TABLE: return "iceberg"; + case PAIMON_EXTERNAL_TABLE: + return "paimon"; case DICTIONARY: return "dictionary"; case DORIS_EXTERNAL_TABLE: diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java index fc8c9ef9a55c97..e4146faa690ba9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/PaimonMetadataOpsTest.java @@ -29,7 +29,6 @@ import com.google.common.collect.Maps; import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.Catalog.TableNotExistException; import org.apache.paimon.catalog.FileSystemCatalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.hive.HiveCatalog; @@ -59,7 +58,7 @@ public class PaimonMetadataOpsTest { public static String warehouse; public static PaimonExternalCatalog paimonCatalog; public static PaimonMetadataOps ops; - public static String dbName = "testdb"; + public static String dbName = "test_db"; public static ConnectContext connectContext; @BeforeClass @@ -85,7 +84,7 @@ public static void beforeClass() throws Throwable { } @Test - public void testSimpleTable() throws UserException, TableNotExistException { + public void testSimpleTable() throws Exception { String tableName = getTableName(); Identifier identifier = new Identifier(dbName, tableName); String sql = "create table " + dbName + "." + tableName + " (id int) engine = paimon"; @@ -106,7 +105,7 @@ public void testSimpleTable() throws UserException, TableNotExistException { } @Test - public void testProperties() throws UserException, TableNotExistException { + public void testProperties() throws Exception { String tableName = getTableName(); Identifier identifier = new Identifier(dbName, tableName); String sql = "create table " + dbName + "." + tableName + " (id int) engine = paimon properties(\"primary-key\"=id)"; @@ -130,7 +129,7 @@ public void testProperties() throws UserException, TableNotExistException { } @Test - public void testType() throws UserException, TableNotExistException { + public void testType() throws Exception { String tableName = getTableName(); Identifier identifier = new Identifier(dbName, tableName); String sql = "create table " + dbName + "." + tableName + " (" @@ -173,8 +172,8 @@ public void testType() throws UserException, TableNotExistException { } @Test - public void testPartition() throws UserException, TableNotExistException { - String tableName = getTableName(); + public void testPartition() throws Exception { + String tableName = "test04"; Identifier identifier = new Identifier(dbName, tableName); String sql = "create table " + dbName + "." + tableName + " (" + "c0 int, " @@ -197,6 +196,30 @@ public void testPartition() throws UserException, TableNotExistException { Assert.assertEquals(1, table.primaryKeys().size()); } + @Test + public void testBucket() throws Exception { + String tableName = getTableName(); + Identifier identifier = new Identifier(dbName, tableName); + String sql = "create table " + dbName + "." + tableName + " (" + + "c0 int, " + + "c1 bigint, " + + "c2 float, " + + "c3 double, " + + "c4 string, " + + "c5 date, " + + "c6 decimal(20, 10), " + + "c7 datetime" + + ") engine = paimon " + + "properties(\"primary-key\"=c0," + + "\"bucket\" = 4," + + "\"bucket-key\" = c0)"; + createTable(sql); + Catalog catalog = ops.getCatalog(); + Table table = catalog.getTable(identifier); + Assert.assertEquals("4", table.options().get("bucket")); + Assert.assertEquals("c0", table.options().get("bucket-key")); + } + public void createTable(String sql) throws UserException { LogicalPlan plan = new NereidsParser().parseSingle(sql); Assertions.assertTrue(plan instanceof CreateTableCommand); diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy index 660ddf84e5e701..032176bd1033f1 100644 --- a/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_table.groovy @@ -90,11 +90,30 @@ suite("test_create_paimon_table", "p0,external,doris,external_docker,external_do partition by (c1) () properties("primary-key"=c0); """ + + sql """drop table if exists ${db_name}.test05""" + sql """ + CREATE TABLE ${db_name}.test05 ( + c0 int, + c1 bigint, + c2 float, + c3 double, + c4 string, + c5 date, + c6 decimal(10,5), + c7 datetime + ) engine=paimon + properties( + 'primary-key' = 'c0,c1', + 'bucket' = '4', + 'bucket-key' = 'c0,c1'); + """ sql """ drop table if exists ${db_name}.test01""" sql """ drop table if exists ${db_name}.test02""" sql """ drop table if exists ${db_name}.test03""" sql """ drop table if exists ${db_name}.test04""" + sql """ drop table if exists ${db_name}.test05""" sql """ drop database if exists ${db_name}""" sql """DROP CATALOG IF EXISTS ${catalog_name}""" }