From bceb301c9583e356db269dd37580f15a9a9a692f Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Thu, 26 Jun 2025 20:17:36 +0800 Subject: [PATCH 01/14] support paimon system table (first step) use hadoop Authenticator get hadoop Authenticator Revert "get hadoop Authenticator" This reverts commit 3dfc44b0b19441faa48167cbbfe847dbc0308eb3. Revert "use hadoop Authenticator" This reverts commit 950e28b4ea15e78395fa0608ec9388df3f741710. suport read paimon system --- .../table/paimon_sys_table_jni_reader.cpp | 61 ++++ .../table/paimon_sys_table_jni_reader.h | 64 +++++ be/src/vec/exec/scan/meta_scanner.cpp | 7 + .../paimon/PaimonSysTableJniScanner.java | 195 +++++++++++++ .../catalog/BuiltinTableValuedFunctions.java | 2 + .../paimon/PaimonExternalCatalog.java | 27 ++ .../paimon/PaimonMetadataCache.java | 6 + .../doris/datasource/paimon/PaimonUtil.java | 31 +- .../paimon/source/PaimonScanNode.java | 20 +- .../datasource/systable/PaimonSysTable.java | 74 +++++ .../systable/SupportedSysTables.java | 2 +- .../functions/table/PaimonMeta.java | 70 +++++ .../visitor/TableValuedFunctionVisitor.java | 5 + .../PaimonTableValuedFunction.java | 167 +++++++++++ .../tablefunction/TableValuedFunctionIf.java | 2 + gensrc/thrift/PlanNodes.thrift | 8 + gensrc/thrift/Types.thrift | 1 + .../paimon/paimon_system_table.out | 85 ++++++ .../paimon/paimon_system_table.groovy | 264 ++++++++++++++++++ 19 files changed, 1075 insertions(+), 16 deletions(-) create mode 100644 be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp create mode 100644 be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h create mode 100644 fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/PaimonMeta.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java create mode 100644 regression-test/data/external_table_p0/paimon/paimon_system_table.out create mode 100644 regression-test/suites/external_table_p0/paimon/paimon_system_table.groovy diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp new file mode 100644 index 00000000000000..f8ee73b02843f8 --- /dev/null +++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "paimon_sys_table_jni_reader.h" + +#include "runtime/runtime_state.h" +#include "util/string_util.h" + +namespace doris::vectorized { +#include "common/compile_check_begin.h" + +const std::string PaimonSysTableJniReader::HADOOP_OPTION_PREFIX = "hadoop."; + +PaimonSysTableJniReader::PaimonSysTableJniReader( + const std::vector& file_slot_descs, RuntimeState* state, + RuntimeProfile* profile, const TPaimonMetadataParams& range_params) + : JniReader(file_slot_descs, state, profile), _range_params(range_params) { + std::vector required_fields; + std::vector required_types; + for (const auto& desc : _file_slot_descs) { + required_fields.emplace_back(desc->col_name()); + required_types.emplace_back(JniConnector::get_jni_type_with_different_string(desc->type())); + } + + std::map params; + params["serialized_split"] = _range_params.serialized_split; + params["serialized_table"] = _range_params.serialized_table; + params["required_fields"] = join(required_fields, ","); + params["required_types"] = join(required_types, "#"); + + for (const auto& kv : _range_params.hadoop_props) { + params[HADOOP_OPTION_PREFIX + kv.first] = kv.second; + } + + _jni_connector = std::make_unique( + "org/apache/doris/paimon/PaimonSysTableJniScanner", std::move(params), required_fields); +} + +Status PaimonSysTableJniReader::init_reader( + const std::unordered_map* colname_to_value_range) { + _colname_to_value_range = colname_to_value_range; + RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range)); + return _jni_connector->open(_state, _profile); +} + +#include "common/compile_check_end.h" +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h new file mode 100644 index 00000000000000..6c82db6eb83016 --- /dev/null +++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include +#include +#include +#include + +#include "common/status.h" +#include "exec/olap_common.h" +#include "vec/exec/format/jni_reader.h" + +namespace doris { +class RuntimeProfile; +class RuntimeState; +class SlotDescriptor; +namespace vectorized { +class Block; +} // namespace vectorized +} // namespace doris + +namespace doris::vectorized { +#include "common/compile_check_begin.h" + +class PaimonSysTableJniReader : public JniReader { + ENABLE_FACTORY_CREATOR(PaimonSysTableJniReader); + +public: + static const std::string HADOOP_OPTION_PREFIX; + PaimonSysTableJniReader(const std::vector& file_slot_descs, + RuntimeState* state, RuntimeProfile* profile, + const TPaimonMetadataParams& range_params); + + ~PaimonSysTableJniReader() override = default; + + Status init_reader( + const std::unordered_map* colname_to_value_range); + +private: + const std::unordered_map* _colname_to_value_range; + const TPaimonMetadataParams& _range_params; +}; + +#include "common/compile_check_end.h" +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/meta_scanner.cpp b/be/src/vec/exec/scan/meta_scanner.cpp index 843e09a4865f46..d64806b5e0879b 100644 --- a/be/src/vec/exec/scan/meta_scanner.cpp +++ b/be/src/vec/exec/scan/meta_scanner.cpp @@ -42,6 +42,7 @@ #include "vec/core/block.h" #include "vec/core/types.h" #include "vec/exec/format/table/iceberg_sys_table_jni_reader.h" +#include "vec/exec/format/table/paimon_sys_table_jni_reader.h" namespace doris { class RuntimeProfile; @@ -71,6 +72,12 @@ Status MetaScanner::open(RuntimeState* state) { const std::unordered_map colname_to_value_range; RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range)); _reader = std::move(reader); + } else if (_scan_range.meta_scan_range.metadata_type == TMetadataType::PAIMON) { + auto reader = PaimonSysTableJniReader::create_unique( + _tuple_desc->slots(), state, _profile, _scan_range.meta_scan_range.paimon_params); + const std::unordered_map colname_to_value_range; + RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range)); + _reader = std::move(reader); } else { RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range)); } diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java new file mode 100644 index 00000000000000..55f5354ec6a514 --- /dev/null +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.paimon; + +import org.apache.doris.common.jni.JniScanner; +import org.apache.doris.common.jni.vec.ColumnType; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.TimestampType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; + +/** + * JniScanner to read Paimon SysTables + */ +public class PaimonSysTableJniScanner extends JniScanner { + private static final Logger LOG = LoggerFactory.getLogger(PaimonSysTableJniScanner.class); + @Deprecated + private static final String HADOOP_OPTION_PREFIX = "hadoop."; + + private final Map params; + @Deprecated + private final Map hadoopOptionParams; + + private final ClassLoader classLoader; + private final Split paimonSplit; + private final Table table; + private RecordReader reader; + private final PaimonColumnValue columnValue = new PaimonColumnValue(); + private List paimonDataTypeList; + private final List paimonAllFieldNames; + private final PreExecutionAuthenticator preExecutionAuthenticator; + private RecordReader.RecordIterator recordIterator = null; + + public PaimonSysTableJniScanner(int batchSize, Map params) { + this.classLoader = this.getClass().getClassLoader(); + if (LOG.isDebugEnabled()) { + LOG.debug("params:{}", params); + } + this.params = params; + String[] requiredFields = params.get("required_fields").split(","); + String[] requiredTypes = params.get("required_types").split("#"); + ColumnType[] columnTypes = new ColumnType[requiredTypes.length]; + for (int i = 0; i < requiredTypes.length; i++) { + columnTypes[i] = ColumnType.parseType(requiredFields[i], requiredTypes[i]); + } + initTableInfo(columnTypes, requiredFields, batchSize); + this.paimonSplit = PaimonUtils.deserialize(params.get("serialized_split")); + this.table = PaimonUtils.deserialize(params.get("serialized_table")); + this.hadoopOptionParams = params.entrySet().stream() + .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX)) + .collect(Collectors + .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), Entry::getValue)); + this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams); + this.paimonAllFieldNames = PaimonUtils.getFieldNames(this.table.rowType()); + } + + + @Override + public void open() { + try { + // When the user does not specify hive-site.xml, Paimon will look for the file from the classpath: + // org.apache.paimon.hive.HiveCatalog.createHiveConf: + // `Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE)` + // so we need to provide a classloader, otherwise it will cause NPE. + Thread.currentThread().setContextClassLoader(classLoader); + preExecutionAuthenticator.execute(() -> { + initReader(); + return null; + }); + resetDatetimeV2Precision(); + + } catch (Throwable e) { + LOG.warn("Failed to open paimon_scanner: {}", e.getMessage(), e); + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + if (reader != null) { + reader.close(); + } + } + + @Override + protected int getNext() throws IOException { + try { + return preExecutionAuthenticator.execute(this::readAndProcessNextBatch); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + private void initReader() throws IOException { + ReadBuilder readBuilder = table.newReadBuilder(); + if (this.fields.length > this.paimonAllFieldNames.size()) { + throw new IOException( + String.format( + "The jni reader fields' size {%s} is not matched with paimon fields' size {%s}." + + " Please refresh table and try again", + fields.length, paimonAllFieldNames.size())); + } + int[] projected = getProjected(); + readBuilder.withProjection(projected); + reader = readBuilder.newRead().executeFilter().createReader(paimonSplit); + paimonDataTypeList = + Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList()); + } + + private int[] getProjected() { + return Arrays.stream(fields).mapToInt(paimonAllFieldNames::indexOf).toArray(); + } + + private void resetDatetimeV2Precision() { + for (int i = 0; i < types.length; i++) { + if (types[i].isDateTimeV2()) { + // paimon support precision > 6, but it has been reset as 6 in FE + // try to get the right precision for datetimev2 + int index = paimonAllFieldNames.indexOf(fields[i]); + if (index != -1) { + DataType dataType = table.rowType().getTypeAt(index); + if (dataType instanceof TimestampType) { + types[i].setPrecision(((TimestampType) dataType).getPrecision()); + } + } + } + } + } + + private int readAndProcessNextBatch() throws IOException { + int rows = 0; + try { + if (recordIterator == null) { + recordIterator = reader.readBatch(); + } + + while (recordIterator != null) { + InternalRow record; + while ((record = recordIterator.next()) != null) { + columnValue.setOffsetRow(record); + for (int i = 0; i < fields.length; i++) { + columnValue.setIdx(i, types[i], paimonDataTypeList.get(i)); + long l = System.nanoTime(); + appendData(i, columnValue); + appendDataTime += System.nanoTime() - l; + } + rows++; + if (rows >= batchSize) { + return rows; + } + } + recordIterator.releaseBatch(); + recordIterator = reader.readBatch(); + } + } catch (Exception e) { + close(); + LOG.warn("Failed to get the next batch of paimon. " + + "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}, dataType: {}", + paimonSplit, params.get("required_fields"), paimonAllFieldNames, paimonDataTypeList, e); + throw new IOException(e); + } + return rows; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index 97239e1191d466..d4bdf92696bc70 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Local; import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; +import org.apache.doris.nereids.trees.expressions.functions.table.PaimonMeta; import org.apache.doris.nereids.trees.expressions.functions.table.PartitionValues; import org.apache.doris.nereids.trees.expressions.functions.table.Partitions; import org.apache.doris.nereids.trees.expressions.functions.table.Query; @@ -55,6 +56,7 @@ public class BuiltinTableValuedFunctions implements FunctionHelper { tableValued(Local.class, "local"), tableValued(HudiMeta.class, "hudi_meta"), tableValued(IcebergMeta.class, "iceberg_meta"), + tableValued(PaimonMeta.class, "paimon_meta"), tableValued(Hdfs.class, "hdfs"), tableValued(HttpStream.class, "http_stream"), tableValued(Numbers.class, "numbers"), diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java index 7483ef2e477cf5..4e4ed38ecf54c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java @@ -20,6 +20,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.security.authentication.HadoopAuthenticator; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitCatalogLog; @@ -78,6 +79,15 @@ protected void initLocalObjectsImpl() { } authConf = AuthenticationConfig.getKerberosConfig(conf); hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(authConf); + initPreExecutionAuthenticator(); + } + + @Override + protected synchronized void initPreExecutionAuthenticator() { + if (preExecutionAuthenticator == null) { + preExecutionAuthenticator = new PreExecutionAuthenticator(); + preExecutionAuthenticator.setHadoopAuthenticator(hadoopAuthenticator); + } } public String getCatalogType() { @@ -142,6 +152,23 @@ public org.apache.paimon.table.Table getPaimonTable(NameMapping nameMapping) { } public List getPaimonPartitions(NameMapping nameMapping) { + public org.apache.paimon.table.Table getPaimonTable(String dbName, String tblName, String queryType) { + return getPaimonTable(dbName, tblName, null, queryType); + } + + public org.apache.paimon.table.Table getPaimonTable(String dbName, String tblName, String branch, + String queryType) { + makeSureInitialized(); + try { + return hadoopAuthenticator.doAs(() -> catalog.getTable(new Identifier(dbName, tblName, branch, queryType))); + } catch (Exception e) { + throw new RuntimeException("Failed to get Paimon table:" + getName() + "." + + dbName + "." + tblName + ", because " + e.getMessage(), e); + } + } + + + public List getPaimonPartitions(String dbName, String tblName) { makeSureInitialized(); try { return hadoopAuthenticator.doAs(() -> { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java index b36c532d2ed150..b3b6c5e0d31eab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -157,4 +157,10 @@ public Map> getCacheStats() { snapshotCache.estimatedSize())); return res; } + + public org.apache.paimon.table.Table getPaimonTable(CatalogIf catalog, String dbName, String tbName) { + PaimonSnapshotCacheKey paimonSnapshotCacheKey = new PaimonSnapshotCacheKey(catalog, dbName, tbName); + return ((PaimonExternalCatalog) paimonSnapshotCacheKey.getCatalog()).getPaimonTable( + paimonSnapshotCacheKey.getDbName(), paimonSnapshotCacheKey.getTableName()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index 2d9218a02ff323..aa0d18002f768d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -59,11 +59,13 @@ import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.utils.InstantiationUtil; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Projection; import java.io.IOException; import java.util.ArrayList; +import java.util.Base64; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -72,6 +74,7 @@ public class PaimonUtil { private static final Logger LOG = LogManager.getLogger(PaimonUtil.class); + private static final Base64.Encoder BASE64_ENCODER = java.util.Base64.getUrlEncoder().withoutPadding(); public static List read( Table table, @Nullable int[] projection, @Nullable Predicate predicate, @@ -104,7 +107,7 @@ public static List read( } public static PaimonPartitionInfo generatePartitionInfo(List partitionColumns, - List paimonPartitions) { + List paimonPartitions) { if (CollectionUtils.isEmpty(partitionColumns) || paimonPartitions.isEmpty()) { return PaimonPartitionInfo.EMPTY; @@ -237,6 +240,32 @@ public static Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) return paimonPrimitiveTypeToDorisType(type); } + public static List parseSchema(Table table) { + List primaryKeys = table.primaryKeys(); + return parseSchema(table.rowType(), primaryKeys); + } + + public static List parseSchema(RowType rowType, List primaryKeys) { + List resSchema = Lists.newArrayListWithCapacity(rowType.getFields().size()); + rowType.getFields().forEach(field -> { + resSchema.add(new Column(field.name().toLowerCase(), + PaimonUtil.paimonTypeToDorisType(field.type()), primaryKeys.contains(field.name()), null, + field.type().isNullable(), + field.description(), true, + field.id())); + }); + return resSchema; + } + + public static String encodeObjectToString(T t) { + try { + byte[] bytes = InstantiationUtil.serializeObject(t); + return new String(BASE64_ENCODER.encode(bytes), java.nio.charset.StandardCharsets.UTF_8); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + public static void updatePaimonColumnUniqueId(Column column, DataType dataType) { List columns = column.getChildren(); switch (dataType.getTypeRoot()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 21474a164d0fe3..a806aee7ec78e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -57,10 +57,10 @@ import org.apache.paimon.table.source.RawFile; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.utils.InstantiationUtil; +import org.apache.paimon.types.DataField; import java.io.IOException; import java.util.ArrayList; -import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -154,6 +154,9 @@ protected void doInitialize() throws UserException { serializedTable = encodeObjectToString(source.getPaimonTable()); // Todo: Get the current schema id of the table, instead of using -1. ExternalUtil.initSchemaInfo(params, -1L, source.getTargetTable().getColumns()); + serializedTable = PaimonUtil.encodeObjectToString(source.getPaimonTable()); + Preconditions.checkNotNull(source); + params.setHistorySchemaInfo(new ConcurrentHashMap<>()); } @VisibleForTesting @@ -168,17 +171,6 @@ protected void convertPredicate() { predicates = paimonPredicateConverter.convertToPaimonExpr(conjuncts); } - private static final Base64.Encoder BASE64_ENCODER = java.util.Base64.getUrlEncoder().withoutPadding(); - - public static String encodeObjectToString(T t) { - try { - byte[] bytes = InstantiationUtil.serializeObject(t); - return new String(BASE64_ENCODER.encode(bytes), java.nio.charset.StandardCharsets.UTF_8); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - @Override protected void setScanParams(TFileRangeDesc rangeDesc, Split split) { if (split instanceof PaimonSplit) { @@ -210,7 +202,7 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) if (split != null) { // use jni reader rangeDesc.setFormatType(TFileFormatType.FORMAT_JNI); - fileDesc.setPaimonSplit(encodeObjectToString(split)); + fileDesc.setPaimonSplit(PaimonUtil.encodeObjectToString(split)); rangeDesc.setSelfSplitWeight(paimonSplit.getSelfSplitWeight()); } else { // use native reader @@ -226,7 +218,7 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) fileDesc.setSchemaId(paimonSplit.getSchemaId()); } fileDesc.setFileFormat(fileFormat); - fileDesc.setPaimonPredicate(encodeObjectToString(predicates)); + fileDesc.setPaimonPredicate(PaimonUtil.encodeObjectToString(predicates)); fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> slot.getColumn().getName()) .collect(Collectors.joining(","))); fileDesc.setDbName(((PaimonExternalTable) source.getTargetTable()).getDbName()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java new file mode 100644 index 00000000000000..e5265987d565cb --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.systable; + +import org.apache.doris.analysis.TableValuedFunctionRef; +import org.apache.doris.nereids.trees.expressions.functions.table.PaimonMeta; +import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction; +import org.apache.doris.tablefunction.PaimonTableValuedFunction; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.paimon.table.system.SystemTableLoader; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class PaimonSysTable extends SysTable { + private static final Logger LOG = LogManager.getLogger(PaimonSysTable.class); + + private static final List SUPPORTED_PAIMON_SYS_TABLES = SystemTableLoader.SYSTEM_TABLES.stream() + .map(PaimonSysTable::new).collect(Collectors.toList()); + + private final String tableName; + + protected PaimonSysTable(String tableName) { + super(tableName, "paimon_meta"); + this.tableName = tableName; + } + + public static List getSupportedIcebergSysTables() { + return SUPPORTED_PAIMON_SYS_TABLES; + } + + @Override + public TableValuedFunction createFunction(String ctlName, String dbName, String sourceNameWithMetaName) { + List nameParts = Lists.newArrayList(ctlName, dbName, + getSourceTableName(sourceNameWithMetaName)); + return PaimonMeta.createPaimonMeta(nameParts, tableName); + } + + @Override + public TableValuedFunctionRef createFunctionRef(String ctlName, String dbName, String sourceNameWithMetaName) { + List nameParts = Lists.newArrayList(ctlName, dbName, + getSourceTableName(sourceNameWithMetaName)); + Map params = Maps.newHashMap(); + params.put(PaimonTableValuedFunction.TABLE, Joiner.on(".").join(nameParts)); + params.put(PaimonTableValuedFunction.QUERY_TYPE, tableName); + try { + return new TableValuedFunctionRef(tvfName, null, params); + } catch (org.apache.doris.common.AnalysisException e) { + LOG.warn("should not happen. {}.{}.{}", ctlName, dbName, sourceNameWithMetaName, e); + return null; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java index 9b471030d45eca..14c19f4ffdbdf0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java @@ -36,7 +36,7 @@ public class SupportedSysTables { ICEBERG_SUPPORTED_SYS_TABLES = Lists.newArrayList( IcebergSysTable.getSupportedIcebergSysTables()); // paimon - PAIMON_SUPPORTED_SYS_TABLES = Lists.newArrayList(); + PAIMON_SUPPORTED_SYS_TABLES = Lists.newArrayList(PaimonSysTable.getSupportedIcebergSysTables()); // hudi HUDI_SUPPORTED_SYS_TABLES = Lists.newArrayList(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/PaimonMeta.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/PaimonMeta.java new file mode 100644 index 00000000000000..4e2b3f1dfa037d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/PaimonMeta.java @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.PaimonTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import com.google.common.base.Joiner; +import com.google.common.collect.Maps; + +import java.util.List; +import java.util.Map; + +/** + * paimon meta + */ +public class PaimonMeta extends TableValuedFunction { + + public PaimonMeta(Properties properties) { + super("paimon_meta", properties); + } + + public static PaimonMeta createPaimonMeta(List nameParts, String queryType) { + Map prop = Maps.newHashMap(); + prop.put(PaimonTableValuedFunction.TABLE, Joiner.on(".").join(nameParts)); + prop.put(PaimonTableValuedFunction.QUERY_TYPE, queryType); + return new PaimonMeta(new Properties(prop)); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + return PaimonTableValuedFunction.create(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build PaimonTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitPaimonMeta(this, context); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index 311a6e83a85826..a1cd65a8e8fd17 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Local; import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; +import org.apache.doris.nereids.trees.expressions.functions.table.PaimonMeta; import org.apache.doris.nereids.trees.expressions.functions.table.PartitionValues; import org.apache.doris.nereids.trees.expressions.functions.table.Partitions; import org.apache.doris.nereids.trees.expressions.functions.table.Query; @@ -102,6 +103,10 @@ default R visitIcebergMeta(IcebergMeta icebergMeta, C context) { return visitTableValuedFunction(icebergMeta, context); } + default R visitPaimonMeta(PaimonMeta paimonMeta, C context) { + return visitTableValuedFunction(paimonMeta, context); + } + default R visitLocal(Local local, C context) { return visitTableValuedFunction(local, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java new file mode 100644 index 00000000000000..8636d37a1bfd79 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.paimon.PaimonExternalCatalog; +import org.apache.doris.datasource.paimon.PaimonUtil; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataType; +import org.apache.doris.thrift.TPaimonMetadataParams; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.Split; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class PaimonTableValuedFunction extends MetadataTableValuedFunction { + public static final String NAME = "paimon_meta"; + public static final String TABLE = "table"; + public static final String QUERY_TYPE = "query_type"; + + private static final ImmutableSet PROPERTIES_SET = ImmutableSet.of(TABLE, QUERY_TYPE); + + private final String queryType; + private final Table paimonSysTable; + private final List schema; + private final Map hadoopProps; + private final HadoopAuthenticator hadoopAuthenticator; + + public PaimonTableValuedFunction(TableName paimonTableName, String queryType) throws AnalysisException { + this.queryType = queryType; + CatalogIf dorisCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(paimonTableName.getCtl()); + if (!(dorisCatalog instanceof ExternalCatalog)) { + throw new AnalysisException("Catalog " + paimonTableName.getCtl() + " is not an external catalog"); + } + + if (!(dorisCatalog instanceof PaimonExternalCatalog)) { + throw new AnalysisException("Catalog " + paimonTableName.getCtl() + " is not an paimon catalog"); + } + + if (paimonTableName.getDb().equals("sys")) { + throw new AnalysisException("Paimon global system tables are only supported in Flink."); + } + + PaimonExternalCatalog paimonExternalCatalog = (PaimonExternalCatalog) dorisCatalog; + this.hadoopProps = paimonExternalCatalog.getCatalogProperty().getHadoopProperties(); + this.hadoopAuthenticator = paimonExternalCatalog.getPreExecutionAuthenticator().getHadoopAuthenticator(); + + boolean tableExist = paimonExternalCatalog.tableExist(ConnectContext.get().getSessionContext(), + paimonTableName.getDb(), + paimonTableName.getTbl()); + if (!tableExist) { + throw new AnalysisException("Paimon table " + paimonTableName + " does not exist"); + } + + this.paimonSysTable = paimonExternalCatalog.getPaimonTable(paimonTableName.getDb(), paimonTableName.getTbl(), + queryType); + // obtain all schema + this.schema = PaimonUtil.parseSchema(paimonSysTable); + + } + + public static PaimonTableValuedFunction create(Map params) throws AnalysisException { + Map validParams = Maps.newHashMap(); + for (String key : params.keySet()) { + if (!PROPERTIES_SET.contains(key.toLowerCase())) { + throw new AnalysisException("'" + key + "' is invalid property"); + } + // check ctl, db, tbl + validParams.put(key.toLowerCase(), params.get(key)); + } + + String tableName = validParams.get(TABLE); + String queryType = validParams.get(QUERY_TYPE); + if (tableName == null || queryType == null) { + throw new AnalysisException("Invalid paimon metadata query"); + } + + String[] names = tableName.split("\\."); + if (names.length != 3) { + throw new AnalysisException("The paimon table name contains the catalogName, databaseName, and tableName"); + } + TableName paimonTableName = new TableName(names[0], names[1], names[2]); + // check auth + if (!Env.getCurrentEnv().getAccessManager() + .checkTblPriv(ConnectContext.get(), paimonTableName, PrivPredicate.SELECT)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "SELECT", + ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), + paimonTableName.getDb() + ": " + paimonTableName.getTbl()); + } + return new PaimonTableValuedFunction(paimonTableName, queryType); + } + + @Override + public TMetadataType getMetadataType() { + return TMetadataType.PAIMON; + } + + @Override + public List getMetaScanRanges(List requiredFileds) { + int[] projections = IntStream.range(0, requiredFileds.size()).toArray(); + List splits; + + try { + splits = hadoopAuthenticator.doAs( + () -> paimonSysTable.newReadBuilder().withProjection(projections).newScan().plan().splits()); + } catch (Exception e) { + throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e)); + } + + return splits.stream().map(this::createMetaScanRange).collect(Collectors.toList()); + } + + @Override + public String getTableName() { + return "PaimonTableValuedFunction<" + queryType + ">"; + } + + @Override + public List getTableColumns() throws AnalysisException { + return schema; + } + + private TMetaScanRange createMetaScanRange(Split split) { + TMetaScanRange tMetaScanRange = new TMetaScanRange(); + tMetaScanRange.setMetadataType(TMetadataType.PAIMON); + + TPaimonMetadataParams tPaimonMetadataParams = new TPaimonMetadataParams(); + tPaimonMetadataParams.setHadoopProps(hadoopProps); + tPaimonMetadataParams.setSerializedTable(PaimonUtil.encodeObjectToString(paimonSysTable)); + tPaimonMetadataParams.setSerializedSplit(PaimonUtil.encodeObjectToString(split)); + + tMetaScanRange.setPaimonParams(tPaimonMetadataParams); + return tMetaScanRange; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index b57e322bb60f9f..2771189329c0cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -60,6 +60,8 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map hadoop_props } + +struct TPaimonMetadataParams { + 1: optional string serialized_split + 2: optional string serialized_table + 3: optional map hadoop_props +} + struct THudiMetadataParams { 1: optional Types.THudiQueryType hudi_query_type 2: optional string catalog @@ -608,6 +615,7 @@ struct TMetaScanRange { 10: optional TMetaCacheStatsParams meta_cache_stats_params 11: optional TPartitionValuesMetadataParams partition_values_params 12: optional THudiMetadataParams hudi_params + 13: optional TPaimonMetadataParams paimon_params } // Specification of an individual data range which is held in its entirety diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index deb34ea91dcb07..4fe2e1ab0825b1 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -748,6 +748,7 @@ enum TMetadataType { PARTITIONS = 9, PARTITION_VALUES = 10, HUDI = 11, + PAIMON = 12, } enum THudiQueryType { diff --git a/regression-test/data/external_table_p0/paimon/paimon_system_table.out b/regression-test/data/external_table_p0/paimon/paimon_system_table.out new file mode 100644 index 00000000000000..3bfc38f8bd133f --- /dev/null +++ b/regression-test/data/external_table_p0/paimon/paimon_system_table.out @@ -0,0 +1,85 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !direct_query__snapshots_result -- +1 0 87d5f82b-5368-418d-91e6-58d01545e29e 9223372036854775807 APPEND 2024-07-11T16:01:57.425 manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-0 manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-1 \N 1 1 0 -9223372036854775808 + +-- !meta_query__snapshots_result -- +1 0 87d5f82b-5368-418d-91e6-58d01545e29e 9223372036854775807 APPEND 2024-07-11T16:01:57.425 manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-0 manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-1 \N 1 1 0 -9223372036854775808 + +-- !paimon_snapshots_core_fields_direct_query -- +1 0 87d5f82b-5368-418d-91e6-58d01545e29e 9223372036854775807 APPEND manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-0 manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-1 \N 1 1 0 + +-- !paimon_snapshots_core_fields_meta_query -- +1 0 87d5f82b-5368-418d-91e6-58d01545e29e 9223372036854775807 APPEND manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-0 manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-1 \N 1 1 0 + +-- !paimon_snapshots_reordered_filed_direct_query -- +0 1 87d5f82b-5368-418d-91e6-58d01545e29e 9223372036854775807 APPEND manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-0 manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-1 \N 1 1 0 + +-- !paimon_snapshots_reordered_filed_meta_query -- +0 1 87d5f82b-5368-418d-91e6-58d01545e29e 9223372036854775807 APPEND manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-0 manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-1 \N 1 1 0 + +-- !snapshot_id_direct_query -- +1 + +-- !snapshot_id_meta_query -- +1 + +-- !direct_query_snapshot_id_predicate -- +0 1 87d5f82b-5368-418d-91e6-58d01545e29e 9223372036854775807 APPEND manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-0 manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-1 \N 1 1 0 + +-- !meta_query_snapshot_id_predicate -- +0 1 87d5f82b-5368-418d-91e6-58d01545e29e 9223372036854775807 APPEND manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-0 manifest-list-102a9c07-edbc-4cde-a2bf-a4253a232e10-1 \N 1 1 0 + +-- !direct_query_snapshot_id_count -- +1 + +-- !meta_query_snapshot_id_count -- +1 + +-- !direct_query_snapshots_join -- +1 0 [{"id":0,"name":"id","type":"INT"},{"id":1,"name":"ts1","type":"TIMESTAMP(1)"},{"id":2,"name":"ts2","type":"TIMESTAMP(2)"},{"id":3,"name":"ts3","type":"TIMESTAMP(3)"},{"id":4,"name":"ts4","type":"TIMESTAMP(4)"},{"id":5,"name":"ts5","type":"TIMESTAMP(5)"},{"id":6,"name":"ts6","type":"TIMESTAMP(6)"},{"id":7,"name":"ts7","type":"TIMESTAMP(7)"},{"id":8,"name":"ts8","type":"TIMESTAMP(8)"},{"id":9,"name":"ts9","type":"TIMESTAMP(9)"},{"id":10,"name":"ts11","type":"TIMESTAMP(1) WITH LOCAL TIME ZONE"},{"id":11,"name":"ts12","type":"TIMESTAMP(2) WITH LOCAL TIME ZONE"},{"id":12,"name":"ts13","type":"TIMESTAMP(3) WITH LOCAL TIME ZONE"},{"id":13,"name":"ts14","type":"TIMESTAMP(4) WITH LOCAL TIME ZONE"},{"id":14,"name":"ts15","type":"TIMESTAMP(5) WITH LOCAL TIME ZONE"},{"id":15,"name":"ts16","type":"TIMESTAMP(6) WITH LOCAL TIME ZONE"},{"id":16,"name":"ts17","type":"TIMESTAMP(7) WITH LOCAL TIME ZONE"},{"id":17,"name":"ts18","type":"TIMESTAMP(8) WITH LOCAL TIME ZONE"},{"id":18,"name":"ts19","type":"TIMESTAMP(9) WITH LOCAL TIME ZONE"}] + +-- !desc_direct_query_ctl_db_table -- +snapshot_id bigint No true \N NONE +schema_id bigint No false \N NONE +commit_user varchar(2147483647) No false \N NONE +commit_identifier bigint No false \N NONE +commit_kind varchar(2147483647) No false \N NONE +commit_time datetime(3) No false \N NONE +base_manifest_list varchar(2147483647) No false \N NONE +delta_manifest_list varchar(2147483647) No false \N NONE +changelog_manifest_list varchar(2147483647) Yes false \N NONE +total_record_count bigint Yes false \N NONE +delta_record_count bigint Yes false \N NONE +changelog_record_count bigint Yes false \N NONE +watermark bigint Yes false \N NONE + +-- !desc_direct_query_db_table -- +snapshot_id bigint No true \N NONE +schema_id bigint No false \N NONE +commit_user varchar(2147483647) No false \N NONE +commit_identifier bigint No false \N NONE +commit_kind varchar(2147483647) No false \N NONE +commit_time datetime(3) No false \N NONE +base_manifest_list varchar(2147483647) No false \N NONE +delta_manifest_list varchar(2147483647) No false \N NONE +changelog_manifest_list varchar(2147483647) Yes false \N NONE +total_record_count bigint Yes false \N NONE +delta_record_count bigint Yes false \N NONE +changelog_record_count bigint Yes false \N NONE +watermark bigint Yes false \N NONE + +-- !desc_direct_query_table -- +snapshot_id bigint No true \N NONE +schema_id bigint No false \N NONE +commit_user varchar(2147483647) No false \N NONE +commit_identifier bigint No false \N NONE +commit_kind varchar(2147483647) No false \N NONE +commit_time datetime(3) No false \N NONE +base_manifest_list varchar(2147483647) No false \N NONE +delta_manifest_list varchar(2147483647) No false \N NONE +changelog_manifest_list varchar(2147483647) Yes false \N NONE +total_record_count bigint Yes false \N NONE +delta_record_count bigint Yes false \N NONE +changelog_record_count bigint Yes false \N NONE +watermark bigint Yes false \N NONE + diff --git a/regression-test/suites/external_table_p0/paimon/paimon_system_table.groovy b/regression-test/suites/external_table_p0/paimon/paimon_system_table.groovy new file mode 100644 index 00000000000000..dffcb68e6f8d04 --- /dev/null +++ b/regression-test/suites/external_table_p0/paimon/paimon_system_table.groovy @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("paimon_system_table", "p0,external,doris,external_docker,external_docker_doris") { + + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disabled paimon test") + return + } + + def validateQueryResults = { List> result1, List> result2, String tableType -> + logger.info("${tableType} - Direct query result size: ${result1.size()}") + logger.info("${tableType} - Meta query result size: ${result2.size()}") + assertEquals(result1.size(), result2.size(), tableType + " query size mismatch") + + for (int i = 0; i < result1.size(); i++) { + List row1 = result1.get(i) + List row2 = result2.get(i) + + for (int j = 0; j < row1.size(); j++) { + assertEquals(row1.get(j), row2.get(j), + String.format("%s data mismatch at [%d][%d]", tableType, i, j)) + } + } + + logger.info(tableType + " validation passed: " + result1.size() + " rows verified") + } + String catalog_name = "paimon_timestamp_types" + try { + + String db_name = "flink_paimon" + String tableName = "ts_scale_orc" + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """drop catalog if exists ${catalog_name}""" + sql """CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='paimon', + 'warehouse' = 's3://warehouse/wh/', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + logger.info("catalog " + catalog_name + " created") + sql """switch ${catalog_name};""" + logger.info("switched to catalog " + catalog_name) + sql """use ${db_name};""" + logger.info("use " + db_name) + + // 1. test Paimon data system table + logger.info("query data from paimon system table") + List> paimonTableList = sql """ show tables; """ + boolean targetTableExists = paimonTableList.any { row -> + row.size() > 0 && row[0].toString().equals(tableName) + } + assertTrue(targetTableExists, "Target table '${tableName}' not found in database '${db_name}'") + + // test all paimon system table + List paimonSystemTableList = new ArrayList<>(Arrays.asList("manifests", "snapshots", "options", "schemas", + "partitions", "buckets", "audit_log", "files", "tags", "branches", "consumers", "ro", "aggregation_fields", "statistics", + "binlog", "table_indexes")) + + + // Iterate through all system tables for consistency testing + for (String systemTable : paimonSystemTableList) { + logger.info("Testing system table: " + systemTable) + + // Direct query on system table + List> directQuery = sql """select * from ${tableName}\$${systemTable}""" + + // // Query through paimon_meta function + List> metaQuery = sql """select * from paimon_meta( + "table" = "${catalog_name}.${db_name}.${tableName}", + "query_type" = "${systemTable}"); + """ + + validateQueryResults(directQuery, metaQuery, systemTable) + } + + // 2 Verify system table projection and predicate functionality + // 2.1 Column projection tests + qt_direct_query__snapshots_result """select * from ${tableName}\$snapshots order by snapshot_id""" + qt_meta_query__snapshots_result """select * from paimon_meta( + "table" = "${catalog_name}.${db_name}.${tableName}", + "query_type" = "snapshots") order by snapshot_id; + """ + // column name + qt_paimon_snapshots_core_fields_direct_query """ + select snapshot_id, + schema_id, + commit_user, + commit_identifier, + commit_kind, + base_manifest_list, + delta_manifest_list, + changelog_manifest_list, + total_record_count, + delta_record_count, + changelog_record_count from ${tableName}\$snapshots + order by snapshot_id; + """ + qt_paimon_snapshots_core_fields_meta_query """ + select snapshot_id, + schema_id, + commit_user, + commit_identifier, + commit_kind, + base_manifest_list, + delta_manifest_list, + changelog_manifest_list, + total_record_count, + delta_record_count, + changelog_record_count from paimon_meta( + "table" = "${catalog_name}.${db_name}.${tableName}", + "query_type" = "snapshots") + order by snapshot_id; + """ + + qt_paimon_snapshots_reordered_filed_direct_query """ + select schema_id, + snapshot_id, + commit_user, + commit_identifier, + commit_kind, + base_manifest_list, + delta_manifest_list, + changelog_manifest_list, + total_record_count, + delta_record_count, + changelog_record_count from ${tableName}\$snapshots + order by snapshot_id; + """ + + qt_paimon_snapshots_reordered_filed_meta_query """ + select schema_id, + snapshot_id, + commit_user, + commit_identifier, + commit_kind, + base_manifest_list, + delta_manifest_list, + changelog_manifest_list, + total_record_count, + delta_record_count, + changelog_record_count from paimon_meta( + "table" = "${catalog_name}.${db_name}.${tableName}", + "query_type" = "snapshots") + order by snapshot_id; + """ + // 2.2 Predicate filtering tests + List> res1 = sql """ select snapshot_id from ${tableName}\$snapshots order by snapshot_id;""" + List> res2 = sql """ select snapshot_id from paimon_meta( + "table" = "${catalog_name}.${db_name}.${tableName}", + "query_type" = "snapshots") + order by snapshot_id; + """ + + qt_snapshot_id_direct_query """select snapshot_id from ${tableName}\$snapshots order by snapshot_id; + """ + qt_snapshot_id_meta_query """ + select snapshot_id from paimon_meta( + "table" = "${catalog_name}.${db_name}.${tableName}", + "query_type" = "snapshots") + order by snapshot_id; + """ + + assertTrue(res1.size() > 0, "Direct query should return data") + assertTrue(res2.size() > 0, "Meta query should return data") + String direct_query_snapshot_id = String.valueOf(res1[0][0]); + String meta_query_snapshot_id = String.valueOf(res1[0][0]); + logger.info("snapshot_id=" + direct_query_snapshot_id) + qt_direct_query_snapshot_id_predicate """select schema_id, + snapshot_id, + commit_user, + commit_identifier, + commit_kind, + base_manifest_list, + delta_manifest_list, + changelog_manifest_list, + total_record_count, + delta_record_count, + changelog_record_count from ${tableName}\$snapshots + where snapshot_id=${direct_query_snapshot_id} + order by snapshot_id; + + """ + + qt_meta_query_snapshot_id_predicate """select schema_id, + snapshot_id, + commit_user, + commit_identifier, + commit_kind, + base_manifest_list, + delta_manifest_list, + changelog_manifest_list, + total_record_count, + delta_record_count, + changelog_record_count from paimon_meta( + "table" = "${catalog_name}.${db_name}.${tableName}", + "query_type" = "snapshots") + where snapshot_id=${meta_query_snapshot_id} + order by snapshot_id; + + """ + + //2.3 Aggregation functions + qt_direct_query_snapshot_id_count """ + select count(*) from ${tableName}\$snapshots + where snapshot_id=${direct_query_snapshot_id} + """ + qt_meta_query_snapshot_id_count """ + select count(*) from paimon_meta( + "table" = "${catalog_name}.${db_name}.${tableName}", + "query_type" = "snapshots") + where snapshot_id=${meta_query_snapshot_id} + """ + //2.4 Join operations between system tables + qt_direct_query_snapshots_join """ + SELECT s.snapshot_id, t.schema_id, t.fields + FROM ${tableName}\$snapshots s JOIN ${tableName}\$schemas t + ON s.schema_id=t.schema_id where s.snapshot_id=${direct_query_snapshot_id}; + """ + + //2.5 Table description queries + qt_desc_direct_query_ctl_db_table """ + desc ${catalog_name}.${db_name}.${tableName}\$snapshots + """ + qt_desc_direct_query_db_table """ + desc ${db_name}.${tableName}\$snapshots + """ + + qt_desc_direct_query_table """ + desc ${tableName}\$snapshots + """ + + } catch (Exception e) { + logger.error("Paimon system table test failed: " + e.getMessage()) + throw e + } finally { + // clean resource + try { + sql """drop catalog if exists ${catalog_name}""" + } catch (Exception e) { + logger.warn("Failed to cleanup catalog: " + e.getMessage()) + } + } +} \ No newline at end of file From 7717248f8230a4f2c4b3ac6b07a8ade6707a2237 Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Tue, 1 Jul 2025 19:33:49 +0800 Subject: [PATCH 02/14] delete unused code --- .../apache/doris/datasource/paimon/PaimonMetadataCache.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java index b3b6c5e0d31eab..b36c532d2ed150 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -157,10 +157,4 @@ public Map> getCacheStats() { snapshotCache.estimatedSize())); return res; } - - public org.apache.paimon.table.Table getPaimonTable(CatalogIf catalog, String dbName, String tbName) { - PaimonSnapshotCacheKey paimonSnapshotCacheKey = new PaimonSnapshotCacheKey(catalog, dbName, tbName); - return ((PaimonExternalCatalog) paimonSnapshotCacheKey.getCatalog()).getPaimonTable( - paimonSnapshotCacheKey.getDbName(), paimonSnapshotCacheKey.getTableName()); - } } From b7842456bc4b91987a6272159f497b51feb710c5 Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Wed, 2 Jul 2025 12:50:29 +0800 Subject: [PATCH 03/14] refactor code --- .../table/paimon_sys_table_jni_reader.cpp | 12 +++- .../table/paimon_sys_table_jni_reader.h | 1 + .../paimon/PaimonSysTableJniScanner.java | 52 ++++++++++++++--- .../apache/doris/paimon/PaimonTableCache.java | 58 ++++++++++++++----- .../datasource/systable/PaimonSysTable.java | 6 +- .../PaimonTableValuedFunction.java | 56 +++++++++++++----- gensrc/thrift/PlanNodes.thrift | 12 +++- 7 files changed, 152 insertions(+), 45 deletions(-) diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp index f8ee73b02843f8..f9b089be7e98df 100644 --- a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp @@ -24,6 +24,7 @@ namespace doris::vectorized { #include "common/compile_check_begin.h" const std::string PaimonSysTableJniReader::HADOOP_OPTION_PREFIX = "hadoop."; +const std::string PaimonSysTableJniReader::PAIMON_OPTION_PREFIX = "paimon."; PaimonSysTableJniReader::PaimonSysTableJniReader( const std::vector& file_slot_descs, RuntimeState* state, @@ -37,11 +38,20 @@ PaimonSysTableJniReader::PaimonSysTableJniReader( } std::map params; + params["db_name"] = _range_params.db_name; + params["tbl_name"] = _range_params.db_name; + params["query_type"] = _range_params.query_type; + params["ctl_id"] = _range_params.ctl_id; + params["db_id"] = _range_params.db_id; + params["tbl_id"] = _range_params.tbl_id; params["serialized_split"] = _range_params.serialized_split; - params["serialized_table"] = _range_params.serialized_table; params["required_fields"] = join(required_fields, ","); params["required_types"] = join(required_types, "#"); + for (const auto& kv : _range_params.paimon_props) { + params[PAIMON_OPTION_PREFIX + kv.first] = kv.second; + } + for (const auto& kv : _range_params.hadoop_props) { params[HADOOP_OPTION_PREFIX + kv.first] = kv.second; } diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h index 6c82db6eb83016..f7b3108f5afd6a 100644 --- a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h +++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h @@ -46,6 +46,7 @@ class PaimonSysTableJniReader : public JniReader { public: static const std::string HADOOP_OPTION_PREFIX; + static const std::string PAIMON_OPTION_PREFIX; PaimonSysTableJniReader(const std::vector& file_slot_descs, RuntimeState* state, RuntimeProfile* profile, const TPaimonMetadataParams& range_params); diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java index 55f5354ec6a514..d8cd6787f3ec0b 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java @@ -21,6 +21,8 @@ import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache; +import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey; +import org.apache.doris.paimon.PaimonTableCache.TableExt; import org.apache.paimon.data.InternalRow; import org.apache.paimon.reader.RecordReader; @@ -44,22 +46,29 @@ */ public class PaimonSysTableJniScanner extends JniScanner { private static final Logger LOG = LoggerFactory.getLogger(PaimonSysTableJniScanner.class); - @Deprecated + private static final String HADOOP_OPTION_PREFIX = "hadoop."; + private static final String PAIMON_OPTION_PREFIX = "paimon."; private final Map params; - @Deprecated private final Map hadoopOptionParams; + private final Map paimonOptionParams; private final ClassLoader classLoader; private final Split paimonSplit; - private final Table table; + private Table table; private RecordReader reader; private final PaimonColumnValue columnValue = new PaimonColumnValue(); private List paimonDataTypeList; - private final List paimonAllFieldNames; + private List paimonAllFieldNames; private final PreExecutionAuthenticator preExecutionAuthenticator; private RecordReader.RecordIterator recordIterator = null; + private final long ctlId; + private final long dbId; + private final long tblId; + private final String dbName; + private final String tblName; + private final String queryType; public PaimonSysTableJniScanner(int batchSize, Map params) { this.classLoader = this.getClass().getClassLoader(); @@ -75,13 +84,23 @@ public PaimonSysTableJniScanner(int batchSize, Map params) { } initTableInfo(columnTypes, requiredFields, batchSize); this.paimonSplit = PaimonUtils.deserialize(params.get("serialized_split")); - this.table = PaimonUtils.deserialize(params.get("serialized_table")); + this.ctlId = Long.parseLong(PaimonUtils.deserialize(params.get("ctl_id"))); + this.dbId = Long.parseLong(PaimonUtils.deserialize(params.get("db_id"))); + this.tblId = Long.parseLong(PaimonUtils.deserialize(params.get("tbl_id"))); + this.dbName = PaimonUtils.deserialize(params.get("db_name")); + this.tblName = PaimonUtils.deserialize(params.get("tbl_name")); + this.queryType = PaimonUtils.deserialize(params.get("query_type")); this.hadoopOptionParams = params.entrySet().stream() .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX)) .collect(Collectors - .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), Entry::getValue)); + .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), + Entry::getValue)); + this.paimonOptionParams = params.entrySet().stream() + .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX)) + .collect(Collectors + .toMap(kv1 -> kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()), + Entry::getValue)); this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams); - this.paimonAllFieldNames = PaimonUtils.getFieldNames(this.table.rowType()); } @@ -94,6 +113,7 @@ public void open() { // so we need to provide a classloader, otherwise it will cause NPE. Thread.currentThread().setContextClassLoader(classLoader); preExecutionAuthenticator.execute(() -> { + initTable(); initReader(); return null; }); @@ -122,6 +142,24 @@ protected int getNext() throws IOException { } + private void initTable() { + PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId, + paimonOptionParams, hadoopOptionParams, dbName, tblName, queryType); + TableExt tableExt = PaimonTableCache.getTable(key); + Table paimonTable = tableExt.getTable(); + if (paimonTable == null) { + throw new RuntimeException( + String.format( + "Failed to get Paimon system table {%s}.{%s}${%s}. ", + dbName, tblName, queryType)); + } + this.table = paimonTable; + this.paimonAllFieldNames = PaimonUtils.getFieldNames(this.table.rowType()); + if (LOG.isDebugEnabled()) { + LOG.debug("paimonAllFieldNames:{}", paimonAllFieldNames); + } + } + private void initReader() throws IOException { ReadBuilder readBuilder = table.newReadBuilder(); if (this.fields.length > this.paimonAllFieldNames.size()) { diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java index 12aac1533920b4..a6a05a91218482 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java @@ -56,7 +56,13 @@ private static TableExt loadTable(PaimonTableCacheKey key) { try { LOG.warn("load table:{}", key); Catalog catalog = createCatalog(key.getPaimonOptionParams(), key.getHadoopOptionParams()); - Table table = catalog.getTable(Identifier.create(key.getDbName(), key.getTblName())); + Table table; + if (key.getQueryType() != null) { + table = catalog.getTable(new Identifier(key.getDbName(), key.getTblName(), + null, key.getQueryType())); + } else { + table = catalog.getTable(Identifier.create(key.getDbName(), key.getTblName())); + } return new TableExt(table, System.currentTimeMillis()); } catch (Catalog.TableNotExistException e) { LOG.warn("failed to create paimon table ", e); @@ -107,15 +113,16 @@ public long getCreateTime() { public static class PaimonTableCacheKey { // in key - private long ctlId; - private long dbId; - private long tblId; + private final long ctlId; + private final long dbId; + private final long tblId; // not in key private Map paimonOptionParams; private Map hadoopOptionParams; private String dbName; private String tblName; + private String queryType; public PaimonTableCacheKey(long ctlId, long dbId, long tblId, Map paimonOptionParams, @@ -130,6 +137,20 @@ public PaimonTableCacheKey(long ctlId, long dbId, long tblId, this.tblName = tblName; } + public PaimonTableCacheKey(long ctlId, long dbId, long tblId, + Map paimonOptionParams, + Map hadoopOptionParams, + String dbName, String tblName, String queryType) { + this.ctlId = ctlId; + this.dbId = dbId; + this.tblId = tblId; + this.paimonOptionParams = paimonOptionParams; + this.hadoopOptionParams = hadoopOptionParams; + this.dbName = dbName; + this.tblName = tblName; + this.queryType = queryType; + } + public long getCtlId() { return ctlId; } @@ -158,6 +179,10 @@ public String getTblName() { return tblName; } + public String getQueryType() { + return queryType; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -167,9 +192,9 @@ public boolean equals(Object o) { return false; } PaimonTableCacheKey that = (PaimonTableCacheKey) o; - return ctlId == that.ctlId - && dbId == that.dbId - && tblId == that.tblId; + return ctlId == that.ctlId && dbId == that.dbId && tblId == that.tblId && Objects.equal( + queryType, + that.queryType); } @Override @@ -179,15 +204,16 @@ public int hashCode() { @Override public String toString() { - return "PaimonTableCacheKey{" - + "ctlId=" + ctlId - + ", dbId=" + dbId - + ", tblId=" + tblId - + ", paimonOptionParams=" + paimonOptionParams - + ", hadoopOptionParams=" + hadoopOptionParams - + ", dbName='" + dbName + '\'' - + ", tblName='" + tblName + '\'' - + '}'; + return "PaimonTableCacheKey{" + + "ctlId=" + ctlId + + ", dbId=" + dbId + + ", tblId=" + tblId + + ", paimonOptionParams=" + paimonOptionParams + + ", hadoopOptionParams=" + hadoopOptionParams + + ", dbName='" + dbName + '\'' + + ", tblName='" + tblName + '\'' + + ", queryType='" + queryType + '\'' + + '}'; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java index e5265987d565cb..b54d2f1b70e9ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java @@ -36,8 +36,10 @@ public class PaimonSysTable extends SysTable { private static final Logger LOG = LogManager.getLogger(PaimonSysTable.class); - private static final List SUPPORTED_PAIMON_SYS_TABLES = SystemTableLoader.SYSTEM_TABLES.stream() - .map(PaimonSysTable::new).collect(Collectors.toList()); + private static final List SUPPORTED_PAIMON_SYS_TABLES = SystemTableLoader.SYSTEM_TABLES + .stream() + .map(PaimonSysTable::new) + .collect(Collectors.toList()); private final String tableName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java index 8636d37a1bfd79..56cef276c2ac5f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java @@ -25,7 +25,8 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.ExternalDatabase; +import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.datasource.paimon.PaimonUtil; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -43,7 +44,6 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import java.util.stream.IntStream; public class PaimonTableValuedFunction extends MetadataTableValuedFunction { public static final String NAME = "paimon_meta"; @@ -56,37 +56,49 @@ public class PaimonTableValuedFunction extends MetadataTableValuedFunction { private final Table paimonSysTable; private final List schema; private final Map hadoopProps; + private final Map paimonProps; private final HadoopAuthenticator hadoopAuthenticator; + private final TableName paimonTableName; + private final long ctlId; + private final long dbId; + private final long tblId; public PaimonTableValuedFunction(TableName paimonTableName, String queryType) throws AnalysisException { this.queryType = queryType; - CatalogIf dorisCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(paimonTableName.getCtl()); - if (!(dorisCatalog instanceof ExternalCatalog)) { - throw new AnalysisException("Catalog " + paimonTableName.getCtl() + " is not an external catalog"); - } + CatalogIf dorisCatalog = Env.getCurrentEnv() + .getCatalogMgr() + .getCatalog(paimonTableName.getCtl()); if (!(dorisCatalog instanceof PaimonExternalCatalog)) { throw new AnalysisException("Catalog " + paimonTableName.getCtl() + " is not an paimon catalog"); } if (paimonTableName.getDb().equals("sys")) { - throw new AnalysisException("Paimon global system tables are only supported in Flink."); + throw new AnalysisException("Paimon global system table are only supported in Flink."); } + this.paimonTableName = paimonTableName; PaimonExternalCatalog paimonExternalCatalog = (PaimonExternalCatalog) dorisCatalog; this.hadoopProps = paimonExternalCatalog.getCatalogProperty().getHadoopProperties(); + this.paimonProps = paimonExternalCatalog.getPaimonOptionsMap(); this.hadoopAuthenticator = paimonExternalCatalog.getPreExecutionAuthenticator().getHadoopAuthenticator(); + this.ctlId = paimonExternalCatalog.getId(); - boolean tableExist = paimonExternalCatalog.tableExist(ConnectContext.get().getSessionContext(), - paimonTableName.getDb(), - paimonTableName.getTbl()); - if (!tableExist) { - throw new AnalysisException("Paimon table " + paimonTableName + " does not exist"); - } + ExternalDatabase database = paimonExternalCatalog.getDb(paimonTableName.getDb()) + .orElseThrow(() -> new AnalysisException( + String.format("Paimon catalog database '%s' does not exist", paimonTableName.getDb()) + )); + this.dbId = database.getId(); + + ExternalTable externalTable = database.getTable(paimonTableName.getTbl()) + .orElseThrow(() -> new AnalysisException( + String.format("Paimon catalog table '%s.%s' does not exist", + paimonTableName.getDb(), paimonTableName.getTbl()) + )); + this.tblId = externalTable.getId(); this.paimonSysTable = paimonExternalCatalog.getPaimonTable(paimonTableName.getDb(), paimonTableName.getTbl(), queryType); - // obtain all schema this.schema = PaimonUtil.parseSchema(paimonSysTable); } @@ -129,7 +141,13 @@ public TMetadataType getMetadataType() { @Override public List getMetaScanRanges(List requiredFileds) { - int[] projections = IntStream.range(0, requiredFileds.size()).toArray(); + int[] projections = requiredFileds.stream().mapToInt( + field -> paimonSysTable.rowType().getFieldNames() + .stream() + .map(String::toLowerCase) + .collect(Collectors.toList()) + .indexOf(field)) + .toArray(); List splits; try { @@ -157,8 +175,14 @@ private TMetaScanRange createMetaScanRange(Split split) { tMetaScanRange.setMetadataType(TMetadataType.PAIMON); TPaimonMetadataParams tPaimonMetadataParams = new TPaimonMetadataParams(); + tPaimonMetadataParams.setCtlId(ctlId); + tPaimonMetadataParams.setDbId(dbId); + tPaimonMetadataParams.setTblId(tblId); + tPaimonMetadataParams.setQueryType(queryType); + tPaimonMetadataParams.setDbName(paimonTableName.getDb()); + tPaimonMetadataParams.setTblName(paimonTableName.getTbl()); tPaimonMetadataParams.setHadoopProps(hadoopProps); - tPaimonMetadataParams.setSerializedTable(PaimonUtil.encodeObjectToString(paimonSysTable)); + tPaimonMetadataParams.setPaimonProps(paimonProps); tPaimonMetadataParams.setSerializedSplit(PaimonUtil.encodeObjectToString(split)); tMetaScanRange.setPaimonParams(tPaimonMetadataParams); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 42ee359e09c71e..ea86689f9f2345 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -542,9 +542,15 @@ struct TIcebergMetadataParams { struct TPaimonMetadataParams { - 1: optional string serialized_split - 2: optional string serialized_table - 3: optional map hadoop_props + 1: optional string db_name + 2: optional string tbl_name + 3: optional string query_type + 4: optional i64 ctl_id + 5: optional i64 db_id + 6: optional i64 tbl_id + 7: optional string serialized_split + 8: optional map hadoop_props + 9: optional map paimon_props } struct THudiMetadataParams { From d9fde7848dc2df89ed22b5ed0595d99cc0595ff5 Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Wed, 2 Jul 2025 13:20:54 +0800 Subject: [PATCH 04/14] resolve type cast error --- .../vec/exec/format/table/paimon_sys_table_jni_reader.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp index f9b089be7e98df..91c71149f530d7 100644 --- a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp @@ -41,9 +41,9 @@ PaimonSysTableJniReader::PaimonSysTableJniReader( params["db_name"] = _range_params.db_name; params["tbl_name"] = _range_params.db_name; params["query_type"] = _range_params.query_type; - params["ctl_id"] = _range_params.ctl_id; - params["db_id"] = _range_params.db_id; - params["tbl_id"] = _range_params.tbl_id; + params["ctl_id"] = std::to_string(_range_params.ctl_id); + params["db_id"] = std::to_string(_range_params.db_id); + params["tbl_id"] = std::to_string(_range_params.tbl_id); params["serialized_split"] = _range_params.serialized_split; params["required_fields"] = join(required_fields, ","); params["required_types"] = join(required_types, "#"); From 6aef0194a3fc36f3467fde117716cfc9a772d371 Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Wed, 2 Jul 2025 14:21:17 +0800 Subject: [PATCH 05/14] resolve paimon checkstyle --- .../paimon/PaimonSysTableJniScanner.java | 7 +++--- .../apache/doris/paimon/PaimonTableCache.java | 23 ++++++++++--------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java index d8cd6787f3ec0b..7f087271be7a5a 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java @@ -42,7 +42,7 @@ import java.util.stream.Collectors; /** - * JniScanner to read Paimon SysTables + * JNI-based scanner for reading Apache Paimon system tables */ public class PaimonSysTableJniScanner extends JniScanner { private static final Logger LOG = LoggerFactory.getLogger(PaimonSysTableJniScanner.class); @@ -70,6 +70,9 @@ public class PaimonSysTableJniScanner extends JniScanner { private final String tblName; private final String queryType; + /** + * Constructs a new PaimonSysTableJniScanner for reading Paimon system tables. + */ public PaimonSysTableJniScanner(int batchSize, Map params) { this.classLoader = this.getClass().getClassLoader(); if (LOG.isDebugEnabled()) { @@ -103,7 +106,6 @@ public PaimonSysTableJniScanner(int batchSize, Map params) { this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams); } - @Override public void open() { try { @@ -141,7 +143,6 @@ protected int getNext() throws IOException { } } - private void initTable() { PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId, paimonOptionParams, hadoopOptionParams, dbName, tblName, queryType); diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java index a6a05a91218482..e5f067af96b728 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java @@ -36,12 +36,13 @@ import java.util.concurrent.TimeUnit; public class PaimonTableCache { - private static final Logger LOG = LoggerFactory.getLogger(PaimonTableCache.class); // Max cache num of paimon table public static final long max_external_schema_cache_num = 50; // The expiration time of a cache object after last access of it. public static final long external_cache_expire_time_minutes_after_access = 100; + private static final Logger LOG = LoggerFactory.getLogger(PaimonTableCache.class); + private static LoadingCache tableCache = CacheBuilder.newBuilder() .maximumSize(max_external_schema_cache_num) .expireAfterAccess(external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) @@ -204,16 +205,16 @@ public int hashCode() { @Override public String toString() { - return "PaimonTableCacheKey{" + - "ctlId=" + ctlId + - ", dbId=" + dbId + - ", tblId=" + tblId + - ", paimonOptionParams=" + paimonOptionParams + - ", hadoopOptionParams=" + hadoopOptionParams + - ", dbName='" + dbName + '\'' + - ", tblName='" + tblName + '\'' + - ", queryType='" + queryType + '\'' + - '}'; + return "PaimonTableCacheKey{" + + "ctlId=" + ctlId + + ", dbId=" + dbId + + ", tblId=" + tblId + + ", paimonOptionParams=" + paimonOptionParams + + ", hadoopOptionParams=" + hadoopOptionParams + + ", dbName='" + dbName + '\'' + + ", tblName='" + tblName + '\'' + + ", queryType='" + queryType + '\'' + + '}'; } } From 7ccfd860b87bcb308340897f956a5050097b4929 Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Wed, 2 Jul 2025 14:44:32 +0800 Subject: [PATCH 06/14] resolve serialization issues --- .../doris/paimon/PaimonSysTableJniScanner.java | 12 ++++++------ .../doris/datasource/systable/PaimonSysTable.java | 8 ++++++++ .../expressions/functions/table/PaimonMeta.java | 2 +- .../tablefunction/PaimonTableValuedFunction.java | 10 ++++++++++ 4 files changed, 25 insertions(+), 7 deletions(-) diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java index 7f087271be7a5a..0344c8b2db4194 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java @@ -87,12 +87,12 @@ public PaimonSysTableJniScanner(int batchSize, Map params) { } initTableInfo(columnTypes, requiredFields, batchSize); this.paimonSplit = PaimonUtils.deserialize(params.get("serialized_split")); - this.ctlId = Long.parseLong(PaimonUtils.deserialize(params.get("ctl_id"))); - this.dbId = Long.parseLong(PaimonUtils.deserialize(params.get("db_id"))); - this.tblId = Long.parseLong(PaimonUtils.deserialize(params.get("tbl_id"))); - this.dbName = PaimonUtils.deserialize(params.get("db_name")); - this.tblName = PaimonUtils.deserialize(params.get("tbl_name")); - this.queryType = PaimonUtils.deserialize(params.get("query_type")); + this.ctlId = Long.parseLong(params.get("ctl_id")); + this.dbId = Long.parseLong(params.get("db_id")); + this.tblId = Long.parseLong(params.get("tbl_id")); + this.dbName = params.get("db_name"); + this.tblName = params.get("tbl_name"); + this.queryType = params.get("query_type"); this.hadoopOptionParams = params.entrySet().stream() .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX)) .collect(Collectors diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java index b54d2f1b70e9ba..a961e1a6d63dc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java @@ -33,6 +33,9 @@ import java.util.Map; import java.util.stream.Collectors; +/** + * System table implementation for Paimon metadata tables. + */ public class PaimonSysTable extends SysTable { private static final Logger LOG = LogManager.getLogger(PaimonSysTable.class); @@ -43,6 +46,11 @@ public class PaimonSysTable extends SysTable { private final String tableName; + /** + * Creates a new Paimon system table instance. + * + * @param tableName the name of the system table + */ protected PaimonSysTable(String tableName) { super(tableName, "paimon_meta"); this.tableName = tableName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/PaimonMeta.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/PaimonMeta.java index 4e2b3f1dfa037d..9c0ac35af41e20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/PaimonMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/PaimonMeta.java @@ -32,7 +32,7 @@ import java.util.Map; /** - * paimon meta + * Table-valued function for accessing Paimon metadata tables. */ public class PaimonMeta extends TableValuedFunction { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java index 56cef276c2ac5f..e2fdac969a957a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java @@ -45,6 +45,9 @@ import java.util.Map; import java.util.stream.Collectors; +/** + * Table-valued function for querying Paimon system tables metadata. + */ public class PaimonTableValuedFunction extends MetadataTableValuedFunction { public static final String NAME = "paimon_meta"; public static final String TABLE = "table"; @@ -63,6 +66,13 @@ public class PaimonTableValuedFunction extends MetadataTableValuedFunction { private final long dbId; private final long tblId; + /** + * Creates a new Paimon table-valued function instance. + * + * @param paimonTableName the target Paimon table name + * @param queryType the type of metadata query to perform + * @throws AnalysisException if table validation or initialization fails + */ public PaimonTableValuedFunction(TableName paimonTableName, String queryType) throws AnalysisException { this.queryType = queryType; CatalogIf dorisCatalog = Env.getCurrentEnv() From 38b43c2295adb78d3f48a44b6c042e13d1ce8456 Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Wed, 2 Jul 2025 15:22:33 +0800 Subject: [PATCH 07/14] fix jni error --- be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp index 91c71149f530d7..e6b0263ed25053 100644 --- a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp @@ -39,7 +39,7 @@ PaimonSysTableJniReader::PaimonSysTableJniReader( std::map params; params["db_name"] = _range_params.db_name; - params["tbl_name"] = _range_params.db_name; + params["tbl_name"] = _range_params.tbl_name; params["query_type"] = _range_params.query_type; params["ctl_id"] = std::to_string(_range_params.ctl_id); params["db_id"] = std::to_string(_range_params.db_id); From 05f306632dca9ee7c6899d333af875cd3560e67a Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Wed, 2 Jul 2025 15:53:23 +0800 Subject: [PATCH 08/14] optimize code --- .../apache/doris/datasource/systable/SupportedSysTables.java | 3 ++- .../apache/doris/tablefunction/PaimonTableValuedFunction.java | 4 ---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java index 14c19f4ffdbdf0..e15463fd38332e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java @@ -36,7 +36,8 @@ public class SupportedSysTables { ICEBERG_SUPPORTED_SYS_TABLES = Lists.newArrayList( IcebergSysTable.getSupportedIcebergSysTables()); // paimon - PAIMON_SUPPORTED_SYS_TABLES = Lists.newArrayList(PaimonSysTable.getSupportedIcebergSysTables()); + PAIMON_SUPPORTED_SYS_TABLES = Lists.newArrayList( + PaimonSysTable.getSupportedIcebergSysTables()); // hudi HUDI_SUPPORTED_SYS_TABLES = Lists.newArrayList(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java index e2fdac969a957a..7867f229e7558a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java @@ -83,10 +83,6 @@ public PaimonTableValuedFunction(TableName paimonTableName, String queryType) th throw new AnalysisException("Catalog " + paimonTableName.getCtl() + " is not an paimon catalog"); } - if (paimonTableName.getDb().equals("sys")) { - throw new AnalysisException("Paimon global system table are only supported in Flink."); - } - this.paimonTableName = paimonTableName; PaimonExternalCatalog paimonExternalCatalog = (PaimonExternalCatalog) dorisCatalog; this.hadoopProps = paimonExternalCatalog.getCatalogProperty().getHadoopProperties(); From fd6ffa8f78b6addc7ce1aac2aa3b40ff07310dea Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Thu, 3 Jul 2025 17:07:44 +0800 Subject: [PATCH 09/14] modify variables --- .../org/apache/doris/datasource/systable/PaimonSysTable.java | 2 +- .../apache/doris/datasource/systable/SupportedSysTables.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java index a961e1a6d63dc1..cf52c78f4edbb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java @@ -56,7 +56,7 @@ protected PaimonSysTable(String tableName) { this.tableName = tableName; } - public static List getSupportedIcebergSysTables() { + public static List getSupportedPaimonSysTables() { return SUPPORTED_PAIMON_SYS_TABLES; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java index e15463fd38332e..fc635113a1df4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java @@ -37,7 +37,7 @@ public class SupportedSysTables { IcebergSysTable.getSupportedIcebergSysTables()); // paimon PAIMON_SUPPORTED_SYS_TABLES = Lists.newArrayList( - PaimonSysTable.getSupportedIcebergSysTables()); + PaimonSysTable.getSupportedPaimonSysTables()); // hudi HUDI_SUPPORTED_SYS_TABLES = Lists.newArrayList(); } From 9a92ccc7d976d12f06e9aa164d9691f66712fd95 Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Mon, 7 Jul 2025 11:52:44 +0800 Subject: [PATCH 10/14] resovle conflict --- .../paimon/PaimonExternalCatalog.java | 34 ++++++------ .../doris/datasource/paimon/PaimonUtil.java | 55 ++++++++++--------- .../paimon/source/PaimonScanNode.java | 23 +++----- .../PaimonTableValuedFunction.java | 4 +- 4 files changed, 58 insertions(+), 58 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java index 4e4ed38ecf54c4..7e73639b6dd1ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java @@ -152,23 +152,6 @@ public org.apache.paimon.table.Table getPaimonTable(NameMapping nameMapping) { } public List getPaimonPartitions(NameMapping nameMapping) { - public org.apache.paimon.table.Table getPaimonTable(String dbName, String tblName, String queryType) { - return getPaimonTable(dbName, tblName, null, queryType); - } - - public org.apache.paimon.table.Table getPaimonTable(String dbName, String tblName, String branch, - String queryType) { - makeSureInitialized(); - try { - return hadoopAuthenticator.doAs(() -> catalog.getTable(new Identifier(dbName, tblName, branch, queryType))); - } catch (Exception e) { - throw new RuntimeException("Failed to get Paimon table:" + getName() + "." - + dbName + "." + tblName + ", because " + e.getMessage(), e); - } - } - - - public List getPaimonPartitions(String dbName, String tblName) { makeSureInitialized(); try { return hadoopAuthenticator.doAs(() -> { @@ -188,6 +171,23 @@ public List getPaimonPartitions(String dbName, String tblName) { } } + public org.apache.paimon.table.Table getPaimonSystemTable(NameMapping nameMapping, String queryType) { + return getPaimonSystemTable(nameMapping, null, queryType); + } + + public org.apache.paimon.table.Table getPaimonSystemTable(NameMapping nameMapping, String branch, + String queryType) { + makeSureInitialized(); + try { + return hadoopAuthenticator.doAs(() -> catalog.getTable(new Identifier(nameMapping.getRemoteDbName(), + nameMapping.getRemoteTblName(), branch, queryType))); + } catch (Exception e) { + throw new RuntimeException("Failed to get Paimon system table:" + getName() + "." + + nameMapping.getRemoteDbName() + "." + nameMapping.getRemoteTblName() + "$" + queryType + + ", because " + e.getMessage(), e); + } + } + protected String getPaimonCatalogType(String catalogType) { if (PAIMON_HMS.equalsIgnoreCase(catalogType)) { return PaimonProperties.PAIMON_HMS_CATALOG; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index aa0d18002f768d..a0ede02a32a0ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -240,32 +240,6 @@ public static Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) return paimonPrimitiveTypeToDorisType(type); } - public static List parseSchema(Table table) { - List primaryKeys = table.primaryKeys(); - return parseSchema(table.rowType(), primaryKeys); - } - - public static List parseSchema(RowType rowType, List primaryKeys) { - List resSchema = Lists.newArrayListWithCapacity(rowType.getFields().size()); - rowType.getFields().forEach(field -> { - resSchema.add(new Column(field.name().toLowerCase(), - PaimonUtil.paimonTypeToDorisType(field.type()), primaryKeys.contains(field.name()), null, - field.type().isNullable(), - field.description(), true, - field.id())); - }); - return resSchema; - } - - public static String encodeObjectToString(T t) { - try { - byte[] bytes = InstantiationUtil.serializeObject(t); - return new String(BASE64_ENCODER.encode(bytes), java.nio.charset.StandardCharsets.UTF_8); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - public static void updatePaimonColumnUniqueId(Column column, DataType dataType) { List columns = column.getChildren(); switch (dataType.getTypeRoot()) { @@ -368,4 +342,33 @@ public static TSchema getSchemaInfo(TableSchema paimonTableSchema) { return tSchema; } + public static List parseSchema(Table table) { + List primaryKeys = table.primaryKeys(); + return parseSchema(table.rowType(), primaryKeys); + } + + public static List parseSchema(RowType rowType, List primaryKeys) { + List resSchema = Lists.newArrayListWithCapacity(rowType.getFields().size()); + rowType.getFields().forEach(field -> { + resSchema.add(new Column(field.name().toLowerCase(), + PaimonUtil.paimonTypeToDorisType(field.type()), + primaryKeys.contains(field.name()), + null, + field.type().isNullable(), + field.description(), + true, + field.id())); + }); + return resSchema; + } + + public static String encodeObjectToString(T t) { + try { + byte[] bytes = InstantiationUtil.serializeObject(t); + return new String(BASE64_ENCODER.encode(bytes), java.nio.charset.StandardCharsets.UTF_8); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index a806aee7ec78e4..0d0bf8d3b32ba8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -56,8 +56,6 @@ import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.table.source.RawFile; import org.apache.paimon.table.source.ReadBuilder; -import org.apache.paimon.utils.InstantiationUtil; -import org.apache.paimon.types.DataField; import java.io.IOException; import java.util.ArrayList; @@ -151,12 +149,9 @@ public PaimonScanNode(PlanNodeId id, protected void doInitialize() throws UserException { super.doInitialize(); source = new PaimonSource(desc); - serializedTable = encodeObjectToString(source.getPaimonTable()); + serializedTable = PaimonUtil.encodeObjectToString(source.getPaimonTable()); // Todo: Get the current schema id of the table, instead of using -1. ExternalUtil.initSchemaInfo(params, -1L, source.getTargetTable().getColumns()); - serializedTable = PaimonUtil.encodeObjectToString(source.getPaimonTable()); - Preconditions.checkNotNull(source); - params.setHistorySchemaInfo(new ConcurrentHashMap<>()); } @VisibleForTesting @@ -187,7 +182,7 @@ private void putHistorySchemaInfo(Long schemaId) { if (currentQuerySchema.putIfAbsent(schemaId, Boolean.TRUE) == null) { PaimonExternalTable table = (PaimonExternalTable) source.getTargetTable(); TableSchema tableSchema = Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() - .getPaimonSchemaCacheValue(table.getOrBuildNameMapping(), schemaId).getTableSchema(); + .getPaimonSchemaCacheValue(table.getOrBuildNameMapping(), schemaId).getTableSchema(); params.addToHistorySchemaInfo(PaimonUtil.getSchemaInfo(tableSchema)); } } @@ -372,13 +367,13 @@ public List getPaimonSplitFromAPI() throws return Collections.emptyList(); } int[] projected = desc.getSlots().stream().mapToInt( - slot -> source.getPaimonTable().rowType() - .getFieldNames() - .stream() - .map(String::toLowerCase) - .collect(Collectors.toList()) - .indexOf(slot.getColumn().getName())) - .toArray(); + slot -> source.getPaimonTable().rowType() + .getFieldNames() + .stream() + .map(String::toLowerCase) + .collect(Collectors.toList()) + .indexOf(slot.getColumn().getName())) + .toArray(); Table paimonTable = source.getPaimonTable(); Map incrReadParams = getIncrReadParams(); paimonTable = paimonTable.copy(incrReadParams); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java index 7867f229e7558a..0c44800b943a5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java @@ -27,6 +27,7 @@ import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.NameMapping; import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.datasource.paimon.PaimonUtil; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -101,9 +102,10 @@ public PaimonTableValuedFunction(TableName paimonTableName, String queryType) th String.format("Paimon catalog table '%s.%s' does not exist", paimonTableName.getDb(), paimonTableName.getTbl()) )); + NameMapping buildNameMapping = externalTable.getOrBuildNameMapping(); this.tblId = externalTable.getId(); - this.paimonSysTable = paimonExternalCatalog.getPaimonTable(paimonTableName.getDb(), paimonTableName.getTbl(), + this.paimonSysTable = paimonExternalCatalog.getPaimonSystemTable(buildNameMapping, queryType); this.schema = PaimonUtil.parseSchema(paimonSysTable); From 857caf6419192b72db5ee3b95ee43720c613733b Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Tue, 8 Jul 2025 21:14:06 +0800 Subject: [PATCH 11/14] remove ro binlog audit_log --- .../apache/doris/datasource/systable/PaimonSysTable.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java index cf52c78f4edbb1..68d98aead5fcc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java @@ -29,8 +29,11 @@ import org.apache.logging.log4j.Logger; import org.apache.paimon.table.system.SystemTableLoader; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** @@ -39,8 +42,11 @@ public class PaimonSysTable extends SysTable { private static final Logger LOG = LogManager.getLogger(PaimonSysTable.class); + private static final Set EXCLUDED_SYS_TABLES = new HashSet<>(Arrays.asList("binlog", "ro", "audit_log")); + private static final List SUPPORTED_PAIMON_SYS_TABLES = SystemTableLoader.SYSTEM_TABLES .stream() + .filter(table -> !EXCLUDED_SYS_TABLES.contains(table)) .map(PaimonSysTable::new) .collect(Collectors.toList()); From 8d01a17b879f163ec0c0a0d651ef654eb3ae29fb Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Tue, 8 Jul 2025 21:50:59 +0800 Subject: [PATCH 12/14] optimize print error info --- .../java/org/apache/doris/nereids/util/RelationUtil.java | 5 ++++- .../external_table_p0/paimon/paimon_system_table.groovy | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java index e796149b748208..f74c788894221f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java @@ -141,9 +141,12 @@ public static Pair, TableIf> getDbAndTable( "Table [" + tableName + "] does not exist in database [" + dbName + "]." + (origin.map(loc -> "(" + loc + ")").orElse(""))) ); + Optional sysTable = tbl.getSysTableFunction(catalogName, dbName, tableName); if (!Strings.isNullOrEmpty(tableNameWithSysTableName.second) && !sysTable.isPresent()) { - throw new AnalysisException("Unknown sys table '" + tableName + "'"); + throw new AnalysisException(String.format( + "System table '%s' is not supported for table '%s.%s.%s'. Available system tables: %s", + tableName, catalogName, dbName, tableName, tbl.getSupportedSysTables())); } return Pair.of(db, tbl); } catch (Throwable e) { diff --git a/regression-test/suites/external_table_p0/paimon/paimon_system_table.groovy b/regression-test/suites/external_table_p0/paimon/paimon_system_table.groovy index dffcb68e6f8d04..57fb6c45f10593 100644 --- a/regression-test/suites/external_table_p0/paimon/paimon_system_table.groovy +++ b/regression-test/suites/external_table_p0/paimon/paimon_system_table.groovy @@ -74,8 +74,8 @@ suite("paimon_system_table", "p0,external,doris,external_docker,external_docker_ // test all paimon system table List paimonSystemTableList = new ArrayList<>(Arrays.asList("manifests", "snapshots", "options", "schemas", - "partitions", "buckets", "audit_log", "files", "tags", "branches", "consumers", "ro", "aggregation_fields", "statistics", - "binlog", "table_indexes")) + "partitions", "buckets", "files", "tags", "branches", "consumers", "aggregation_fields", + "statistics", "table_indexes")) // Iterate through all system tables for consistency testing From 45e892ac8ecdd62df17a0646375b612624ca110d Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Tue, 8 Jul 2025 22:23:52 +0800 Subject: [PATCH 13/14] format code --- .../java/org/apache/doris/nereids/util/RelationUtil.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java index f74c788894221f..2f8c83297b6504 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java @@ -144,9 +144,13 @@ public static Pair, TableIf> getDbAndTable( Optional sysTable = tbl.getSysTableFunction(catalogName, dbName, tableName); if (!Strings.isNullOrEmpty(tableNameWithSysTableName.second) && !sysTable.isPresent()) { + String supportedTables = tbl.getSupportedSysTables() + .stream() + .map(SysTable::getSysTableName) + .collect(Collectors.joining(", ", "[", "]")); throw new AnalysisException(String.format( - "System table '%s' is not supported for table '%s.%s.%s'. Available system tables: %s", - tableName, catalogName, dbName, tableName, tbl.getSupportedSysTables())); + "System table '%s' is not supported for table '%s.%s.%s'. Supported system tables: %s", + tableName, catalogName, dbName, tableName, supportedTables)); } return Pair.of(db, tbl); } catch (Throwable e) { From 0bd6be646cb9e998621e449bca649d4c58356ad6 Mon Sep 17 00:00:00 2001 From: vinlee19 Date: Wed, 9 Jul 2025 09:27:33 +0800 Subject: [PATCH 14/14] revert print info --- .../java/org/apache/doris/nereids/util/RelationUtil.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java index 2f8c83297b6504..e796149b748208 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java @@ -141,16 +141,9 @@ public static Pair, TableIf> getDbAndTable( "Table [" + tableName + "] does not exist in database [" + dbName + "]." + (origin.map(loc -> "(" + loc + ")").orElse(""))) ); - Optional sysTable = tbl.getSysTableFunction(catalogName, dbName, tableName); if (!Strings.isNullOrEmpty(tableNameWithSysTableName.second) && !sysTable.isPresent()) { - String supportedTables = tbl.getSupportedSysTables() - .stream() - .map(SysTable::getSysTableName) - .collect(Collectors.joining(", ", "[", "]")); - throw new AnalysisException(String.format( - "System table '%s' is not supported for table '%s.%s.%s'. Supported system tables: %s", - tableName, catalogName, dbName, tableName, supportedTables)); + throw new AnalysisException("Unknown sys table '" + tableName + "'"); } return Pair.of(db, tbl); } catch (Throwable e) {