From 35a3b1a476dc8f2598828b633561e2d0d9cb85db Mon Sep 17 00:00:00 2001 From: Liebing Date: Fri, 3 Apr 2026 19:41:31 +0800 Subject: [PATCH 1/2] [server] Integrate remote directory selector into table/partition creation --- .../apache/fluss/config/FlussConfigUtils.java | 76 +++-- .../fluss/config/FlussConfigUtilsTest.java | 14 + .../fluss/server/DynamicServerConfig.java | 10 +- .../coordinator/AutoPartitionManager.java | 10 +- .../server/coordinator/CoordinatorServer.java | 15 +- .../coordinator/CoordinatorService.java | 20 +- .../server/coordinator/MetadataManager.java | 10 +- .../remote/RemoteDirDynamicLoader.java | 167 +++++++++++ .../coordinator/remote/RemoteDirSelector.java | 49 +++ .../remote/RoundRobinRemoteDirSelector.java | 55 ++++ .../WeightedRoundRobinRemoteDirSelector.java | 108 +++++++ .../fluss/server/DynamicConfigChangeTest.java | 55 ++++ .../coordinator/AutoPartitionManagerTest.java | 27 +- .../CoordinatorEventProcessorTest.java | 64 ++-- .../event/watcher/TableChangeWatcherTest.java | 17 +- .../rebalance/RebalanceManagerTest.java | 18 +- .../remote/RemoteDirDynamicLoaderTest.java | 110 +++++++ .../coordinator/remote/RemoteDirsITCase.java | 278 ++++++++++++++++++ .../RoundRobinRemoteDirSelectorTest.java | 163 ++++++++++ ...ightedRoundRobinRemoteDirSelectorTest.java | 246 ++++++++++++++++ .../TableBucketStateMachineTest.java | 2 + .../metadata/ZkBasedMetadataProviderTest.java | 4 +- .../testutils/FlussClusterExtension.java | 35 ++- 23 files changed, 1476 insertions(+), 77 deletions(-) create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoader.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RemoteDirSelector.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RoundRobinRemoteDirSelector.java create mode 100644 
fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/WeightedRoundRobinRemoteDirSelector.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoaderTest.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirsITCase.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RoundRobinRemoteDirSelectorTest.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/WeightedRoundRobinRemoteDirSelectorTest.java diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index adbbce0af4..7daa48c2bf 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -128,34 +128,9 @@ public static void validateTabletConfigs(Configuration conf) { validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0); } - /** Validate common server configs. 
*/ - protected static void validateServerConfigs(Configuration conf) { - // Validate remote.data.dir and remote.data.dirs - String remoteDataDir = conf.get(ConfigOptions.REMOTE_DATA_DIR); - List remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS); - if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null - && conf.get(ConfigOptions.REMOTE_DATA_DIRS).isEmpty()) { - throw new IllegalConfigurationException( - String.format( - "Either %s or %s must be configured.", - ConfigOptions.REMOTE_DATA_DIR.key(), - ConfigOptions.REMOTE_DATA_DIRS.key())); - } - - if (remoteDataDir != null) { - // Must validate that remote.data.dir is a valid FsPath - try { - new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR)); - } catch (Exception e) { - throw new IllegalConfigurationException( - String.format( - "Invalid configuration for %s.", - ConfigOptions.REMOTE_DATA_DIR.key()), - e); - } - } - + public static void validateRemoteDataDirs(Configuration conf) { // Validate remote.data.dirs + List remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS); for (int i = 0; i < remoteDataDirs.size(); i++) { String dir = remoteDataDirs.get(i); try { @@ -185,19 +160,56 @@ protected static void validateServerConfigs(Configuration conf) { weights.size())); } - // Validate all weights are no less than 0 + // Verify that each weight is non-negative and that the total weight is greater than + // 0. 
+ int totalWeight = 0; for (int i = 0; i < weights.size(); i++) { - if (weights.get(i) < 0) { + int weight = weights.get(i); + if (weight < 0) { throw new IllegalConfigurationException( String.format( "All weights in '%s' must be no less than 0, but found %d at index %d.", - ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(), - weights.get(i), - i)); + ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(), weight, i)); } + totalWeight += weight; + } + if (totalWeight <= 0) { + throw new IllegalConfigurationException( + String.format( + "The sum of all weights in '%s' must be greater than 0, but the current sum is %d.", + ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(), totalWeight)); } } } + } + + /** Validate common server configs. */ + protected static void validateServerConfigs(Configuration conf) { + // Validate remote.data.dir and remote.data.dirs + String remoteDataDir = conf.get(ConfigOptions.REMOTE_DATA_DIR); + List remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS); + if (remoteDataDir == null && remoteDataDirs.isEmpty()) { + throw new IllegalConfigurationException( + String.format( + "Either %s or %s must be configured.", + ConfigOptions.REMOTE_DATA_DIR.key(), + ConfigOptions.REMOTE_DATA_DIRS.key())); + } + + if (remoteDataDir != null) { + // Must validate that remote.data.dir is a valid FsPath + try { + new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR)); + } catch (Exception e) { + throw new IllegalConfigurationException( + String.format( + "Invalid configuration for %s.", + ConfigOptions.REMOTE_DATA_DIR.key()), + e); + } + } + + validateRemoteDataDirs(conf); validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1); validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1); diff --git a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java index e2690c54ba..2783eea7af 100644 --- 
a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java @@ -137,6 +137,20 @@ void testValidateCoordinatorConfigs() { .hasMessageContaining( "All weights in 'remote.data.dirs.weights' must be no less than 0"); + // Test all zero weights + Configuration zeroWeightsConf = new Configuration(); + zeroWeightsConf.set( + ConfigOptions.REMOTE_DATA_DIRS_STRATEGY, + ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN); + zeroWeightsConf.set( + ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2")); + zeroWeightsConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(0, 0)); + assertThatThrownBy(() -> validateCoordinatorConfigs(zeroWeightsConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining("The sum of all weights") + .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key()) + .hasMessageContaining("must be greater than 0"); + // Test invalid DEFAULT_REPLICATION_FACTOR Configuration invalidReplicationConf = new Configuration(); invalidReplicationConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path"); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java index e130c392d5..4a0617c72b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java @@ -46,6 +46,9 @@ import static org.apache.fluss.config.ConfigOptions.KV_SNAPSHOT_INTERVAL; import static org.apache.fluss.config.ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER; import static org.apache.fluss.config.ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO; +import static org.apache.fluss.config.ConfigOptions.REMOTE_DATA_DIRS; +import static org.apache.fluss.config.ConfigOptions.REMOTE_DATA_DIRS_STRATEGY; +import static 
org.apache.fluss.config.ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS; import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; @@ -66,7 +69,11 @@ class DynamicServerConfig { LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER.key(), KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), KV_SNAPSHOT_INTERVAL.key(), - SERVER_DATA_DISK_WRITE_LIMIT_RATIO.key())); + SERVER_DATA_DISK_WRITE_LIMIT_RATIO.key(), + // Config options for remote.data.dirs + REMOTE_DATA_DIRS.key(), + REMOTE_DATA_DIRS_STRATEGY.key(), + REMOTE_DATA_DIRS_WEIGHTS.key())); private static final Set ALLOWED_CONFIG_PREFIXES = Collections.singleton("datalake."); private final ReadWriteLock lock = new ReentrantReadWriteLock(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java index 695c770d45..ae3ceef33f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java @@ -29,6 +29,7 @@ import org.apache.fluss.metadata.ResolvedPartitionSpec; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.coordinator.remote.RemoteDirDynamicLoader; import org.apache.fluss.server.metadata.ServerMetadataCache; import org.apache.fluss.server.zk.data.BucketAssignment; import org.apache.fluss.server.zk.data.PartitionAssignment; @@ -85,6 +86,7 @@ public class AutoPartitionManager implements AutoCloseable { private final ServerMetadataCache metadataCache; private final MetadataManager metadataManager; + private final RemoteDirDynamicLoader remoteDirDynamicLoader; private final Clock clock; private final long periodicInterval; @@ -108,10 +110,12 @@ public class AutoPartitionManager implements AutoCloseable {
public AutoPartitionManager( ServerMetadataCache metadataCache, MetadataManager metadataManager, + RemoteDirDynamicLoader remoteDirDynamicLoader, Configuration conf) { this( metadataCache, metadataManager, + remoteDirDynamicLoader, conf, SystemClock.getInstance(), Executors.newScheduledThreadPool( @@ -122,11 +126,13 @@ public AutoPartitionManager( AutoPartitionManager( ServerMetadataCache metadataCache, MetadataManager metadataManager, + RemoteDirDynamicLoader remoteDirDynamicLoader, Configuration conf, Clock clock, ScheduledExecutorService periodicExecutor) { this.metadataCache = metadataCache; this.metadataManager = metadataManager; + this.remoteDirDynamicLoader = remoteDirDynamicLoader; this.clock = clock; this.periodicExecutor = periodicExecutor; this.periodicInterval = conf.get(ConfigOptions.AUTO_PARTITION_CHECK_INTERVAL).toMillis(); @@ -373,8 +379,10 @@ private void createPartitions( PartitionAssignment partitionAssignment = new PartitionAssignment(tableInfo.getTableId(), bucketAssignments); + // select a remote data dir for the partition + String remoteDataDir = remoteDirDynamicLoader.getRemoteDirSelector().nextDataDir(); metadataManager.createPartition( - tablePath, tableId, partitionAssignment, partition, false); + tablePath, tableId, remoteDataDir, partitionAssignment, partition, false); // only single partition key table supports automatic creation of partitions currentPartitions.put(partition.getPartitionName(), null); LOG.info( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index c3094c7ad6..6914e8207e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -34,6 +34,7 @@ import org.apache.fluss.server.authorizer.AuthorizerLoader; import 
org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager; import org.apache.fluss.server.coordinator.rebalance.RebalanceManager; +import org.apache.fluss.server.coordinator.remote.RemoteDirDynamicLoader; import org.apache.fluss.server.metadata.CoordinatorMetadataCache; import org.apache.fluss.server.metadata.ServerMetadataCache; import org.apache.fluss.server.metrics.ServerMetricUtils; @@ -148,6 +149,9 @@ public class CoordinatorServer extends ServerBase { @GuardedBy("lock") private LakeCatalogDynamicLoader lakeCatalogDynamicLoader; + @GuardedBy("lock") + private RemoteDirDynamicLoader remoteDirDynamicLoader; + @GuardedBy("lock") private CoordinatorLeaderElection coordinatorLeaderElection; @@ -225,10 +229,13 @@ protected void initCoordinatorStandby() throws Exception { this.coordinatorLeaderElection = new CoordinatorLeaderElection(zkClient, serverId); this.lakeCatalogDynamicLoader = new LakeCatalogDynamicLoader(conf, pluginManager, true); + this.remoteDirDynamicLoader = new RemoteDirDynamicLoader(conf); + this.dynamicConfigManager = new DynamicConfigManager(zkClient, conf, true); // Register server reconfigurable components dynamicConfigManager.register(lakeCatalogDynamicLoader); + dynamicConfigManager.register(remoteDirDynamicLoader); // Register stateless validators for coordinator-side upfront validation dynamicConfigManager.registerValidator(new DiskWriteLimitRatioValidator()); @@ -274,6 +281,7 @@ protected void initCoordinatorStandby() throws Exception { authorizer, lakeCatalogDynamicLoader, lakeTableTieringManager, + remoteDirDynamicLoader, dynamicConfigManager, ioExecutor, kvSnapshotLeaseManager, @@ -307,7 +315,8 @@ protected void initCoordinatorLeader() throws Exception { this.coordinatorChannelManager = new CoordinatorChannelManager(rpcClient); this.autoPartitionManager = - new AutoPartitionManager(metadataCache, metadataManager, conf); + new AutoPartitionManager( + metadataCache, metadataManager, remoteDirDynamicLoader, conf); 
autoPartitionManager.start(); // start coordinator event processor after we register coordinator leader to zk @@ -620,6 +629,10 @@ CompletableFuture stopServices() { lakeCatalogDynamicLoader.close(); } + if (remoteDirDynamicLoader != null) { + remoteDirDynamicLoader.close(); + } + if (kvSnapshotLeaseManager != null) { kvSnapshotLeaseManager.close(); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index bdc97434ec..e96e36f454 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -154,6 +154,7 @@ import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager; import org.apache.fluss.server.coordinator.producer.ProducerOffsetsManager; import org.apache.fluss.server.coordinator.rebalance.goal.Goal; +import org.apache.fluss.server.coordinator.remote.RemoteDirDynamicLoader; import org.apache.fluss.server.entity.CommitKvSnapshotData; import org.apache.fluss.server.entity.DatabasePropertyChanges; import org.apache.fluss.server.entity.LakeTieringTableInfo; @@ -241,6 +242,7 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina private final ProducerOffsetsManager producerOffsetsManager; private final KvSnapshotLeaseManager kvSnapshotLeaseManager; private final CoordinatorLeaderElection coordinatorLeaderElection; + private final RemoteDirDynamicLoader remoteDirDynamicLoader; public CoordinatorService( Configuration conf, @@ -252,6 +254,7 @@ public CoordinatorService( @Nullable Authorizer authorizer, LakeCatalogDynamicLoader lakeCatalogDynamicLoader, LakeTableTieringManager lakeTableTieringManager, + RemoteDirDynamicLoader remoteDirDynamicLoader, DynamicConfigManager dynamicConfigManager, ExecutorService ioExecutor, KvSnapshotLeaseManager 
kvSnapshotLeaseManager, @@ -278,6 +281,7 @@ public CoordinatorService( this.ioExecutor = ioExecutor; this.lakeTableHelper = new LakeTableHelper(zkClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR)); + this.remoteDirDynamicLoader = remoteDirDynamicLoader; // Initialize and start the producer snapshot manager this.producerOffsetsManager = new ProducerOffsetsManager(conf, zkClient); @@ -493,9 +497,18 @@ public CompletableFuture createTable(CreateTableRequest req } } + // select remote data dir for table. + // remote data dir will be used to store table data for non-partitioned table and metadata + // (such as lake snapshot offset file) for partitioned table + String remoteDataDir = remoteDirDynamicLoader.getRemoteDirSelector().nextDataDir(); + // then create table; metadataManager.createTable( - tablePath, tableDescriptor, tableAssignment, request.isIgnoreIfExists()); + tablePath, + remoteDataDir, + tableDescriptor, + tableAssignment, + request.isIgnoreIfExists()); return CompletableFuture.completedFuture(new CreateTableResponse()); } @@ -708,9 +721,13 @@ public CompletableFuture createPartition( PartitionAssignment partitionAssignment = new PartitionAssignment(table.tableId, bucketAssignments); + // select remote data dir for partition + String remoteDataDir = remoteDirDynamicLoader.getRemoteDirSelector().nextDataDir(); + metadataManager.createPartition( tablePath, table.tableId, + remoteDataDir, partitionAssignment, partitionToCreate, request.isIgnoreIfNotExists()); @@ -759,6 +776,7 @@ public CompletableFuture metadata(MetadataRequest request) { return metadataResponseAccessContextEvent.getResultFuture(); } + @Override public CompletableFuture adjustIsr(AdjustIsrRequest request) { CompletableFuture response = new CompletableFuture<>(); eventManagerSupplier diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 
65ba95d2ab..e1d8179507 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -364,6 +364,7 @@ public void completeDeletePartition(long partitionId) { * Returns -1 if the table already exists and ignoreIfExists is true. * * @param tablePath the table path + * @param remoteDataDir the remote data directory * @param tableToCreate the table descriptor describing the table to create * @param tableAssignment the table assignment, will be null when the table is partitioned table * @param ignoreIfExists whether to ignore if the table already exists @@ -371,6 +372,7 @@ public void completeDeletePartition(long partitionId) { */ public long createTable( TablePath tablePath, + String remoteDataDir, TableDescriptor tableToCreate, @Nullable TableAssignment tableAssignment, boolean ignoreIfExists) @@ -410,10 +412,7 @@ public long createTable( // register the table zookeeperClient.registerTable( tablePath, - TableRegistration.newTable( - tableId, - zookeeperClient.getDefaultRemoteDataDir(), - tableToCreate), + TableRegistration.newTable(tableId, remoteDataDir, tableToCreate), false); return tableId; }, @@ -804,6 +803,7 @@ public Set getPartitions(TablePath tablePath) { public void createPartition( TablePath tablePath, long tableId, + String remoteDataDir, PartitionAssignment partitionAssignment, ResolvedPartitionSpec partition, boolean ignoreIfExists) { @@ -866,7 +866,7 @@ public void createPartition( partitionId, partitionName, partitionAssignment, - zookeeperClient.getDefaultRemoteDataDir(), + remoteDataDir, tablePath, tableId); LOG.info( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoader.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoader.java new file mode 100644 index 0000000000..1caf4b2f6e --- /dev/null +++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoader.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.remote;
+
+import org.apache.fluss.config.ConfigOption;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.cluster.ServerReconfigurable;
+import org.apache.fluss.exception.ConfigException;
+import org.apache.fluss.exception.IllegalConfigurationException;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.fluss.config.FlussConfigUtils.validateRemoteDataDirs;
+
+/**
+ * Dynamic loader for remote data directories that supports runtime reconfiguration.
+ *
+ * <p>This class manages the lifecycle of remote data directories and provides a selector for
+ * selecting remote data directories. It implements {@link ServerReconfigurable} to support dynamic
+ * configuration updates at runtime without requiring a server restart.
+ *
+ * <p>When creating a new table or partition, the coordinator server uses this loader to select an
+ * appropriate remote data directory based on the configured selection strategy (see {@link
+ * org.apache.fluss.config.ConfigOptions#REMOTE_DATA_DIRS_STRATEGY}).
+ */
+public class RemoteDirDynamicLoader implements ServerReconfigurable, AutoCloseable {
+
+    private volatile RemoteDirSelector remoteDirSelector;
+    private Configuration currentConfiguration;
+
+    public RemoteDirDynamicLoader(Configuration configuration) {
+        this.currentConfiguration = configuration;
+        this.remoteDirSelector = createRemoteDirSelector(configuration);
+    }
+
+    public RemoteDirSelector getRemoteDirSelector() {
+        return remoteDirSelector;
+    }
+
+    @Override
+    public void validate(Configuration newConfig) throws ConfigException {
+        // Validate new remote data dirs contain all old remote data dirs
+        Optional<List<String>> newRemoteDataDirsOp =
+                newConfig.getOptional(ConfigOptions.REMOTE_DATA_DIRS);
+        if (newRemoteDataDirsOp.isPresent()) {
+            List<String> oldRemoteDataDirs =
+                    currentConfiguration.get(ConfigOptions.REMOTE_DATA_DIRS);
+            Set<String> newRemoteDataDirs = new HashSet<>(newRemoteDataDirsOp.get());
+            if (!newRemoteDataDirs.containsAll(oldRemoteDataDirs)) {
+                throw new ConfigException(
+                        String.format(
+                                "New %s: %s must contain all old %s: %s. "
+                                        + "If you want the Fluss cluster to stop transferring data to a certain path, "
+                                        + "keep it in %s and set its weight to 0 in %s.",
+                                ConfigOptions.REMOTE_DATA_DIRS.key(),
+                                newRemoteDataDirsOp.get(),
+                                ConfigOptions.REMOTE_DATA_DIRS.key(),
+                                oldRemoteDataDirs,
+                                ConfigOptions.REMOTE_DATA_DIRS.key(),
+                                ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key()));
+            }
+        }
+
+        Configuration mergedConfig = mergeConfigurations(currentConfiguration, newConfig);
+        try {
+            validateRemoteDataDirs(mergedConfig);
+        } catch (IllegalConfigurationException e) {
+            throw new ConfigException(e.getMessage());
+        }
+    }
+
+    @Override
+    public void reconfigure(Configuration newConfig) throws ConfigException {
+        if (strategyChanged(newConfig)
+                || remoteDataDirsChanged(newConfig)
+                || weightsChanged(newConfig)) {
+            // Create a new container with the merged configuration
+            Configuration mergedConfig = mergeConfigurations(currentConfiguration, newConfig);
+            this.remoteDirSelector = createRemoteDirSelector(mergedConfig);
+            this.currentConfiguration = mergedConfig;
+        }
+    }
+
+    private RemoteDirSelector createRemoteDirSelector(Configuration conf) {
+        ConfigOptions.RemoteDataDirStrategy strategy =
+                conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
+        String remoteDataDir = conf.get(ConfigOptions.REMOTE_DATA_DIR);
+        List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
+        List<Integer> weights = conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
+
+        switch (strategy) {
+            case ROUND_ROBIN:
+                return new RoundRobinRemoteDirSelector(remoteDataDir, remoteDataDirs);
+            case WEIGHTED_ROUND_ROBIN:
+                return new WeightedRoundRobinRemoteDirSelector(
+                        remoteDataDir, remoteDataDirs, weights);
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported remote data directory select strategy: " + strategy);
+        }
+    }
+
+    private boolean strategyChanged(Configuration newConfig) {
+        return hasConfigChanged(newConfig, ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
+    }
+
+    private boolean remoteDataDirsChanged(Configuration newConfig) {
+        return hasConfigChanged(newConfig, ConfigOptions.REMOTE_DATA_DIRS);
+    }
+
+    private boolean weightsChanged(Configuration newConfig) {
+        return hasConfigChanged(newConfig, ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
+    }
+
+    /**
+     * Checks if a specific configuration option has changed in the new config.
+     *
+     * @param newConfig the new configuration
+     * @param option the configuration option to check
+     * @param <T> the type of the configuration value
+     * @return true if the configuration has changed
+     */
+    private <T> boolean hasConfigChanged(Configuration newConfig, ConfigOption<T> option) {
+        return newConfig
+                .getOptional(option)
+                .map(newValue -> !Objects.equals(newValue, currentConfiguration.get(option)))
+                .orElse(false);
+    }
+
+    /**
+     * Merges the current configuration with new configuration values.
+     *
+     * @param current the current configuration
+     * @param updates the configuration updates to apply
+     * @return a new merged configuration
+     */
+    private Configuration mergeConfigurations(Configuration current, Configuration updates) {
+        Configuration merged = new Configuration(current);
+        updates.toMap().forEach(merged::setString);
+        return merged;
+    }
+
+    @Override
+    public void close() throws Exception {
+        // do nothing
+    }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RemoteDirSelector.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RemoteDirSelector.java
new file mode 100644
index 0000000000..f98dcaa623
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RemoteDirSelector.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.remote;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * Interface for selecting remote data directories from a list of available directories.
+ *
+ * <p>This interface is used to implement different selection strategies for choosing remote data
+ * directories when creating tables or partitions. The selection strategy can be configured via
+ * {@link org.apache.fluss.config.ConfigOptions#REMOTE_DATA_DIRS_STRATEGY}.
+ *
+ * <p>Implementations of this interface should be thread-safe as they may be accessed concurrently
+ * from multiple threads.
+ *
+ * @see RoundRobinRemoteDirSelector
+ * @see WeightedRoundRobinRemoteDirSelector
+ */
+@ThreadSafe
+public interface RemoteDirSelector {
+
+    /**
+     * Returns the next remote data directory path to use.
+     *
+     * <p>This method should implement the selection strategy (e.g., round-robin, weighted
+     * round-robin) to choose from the available remote data directories.
+     *
+     * @return the next remote data directory path to use. If {@link
+     *     org.apache.fluss.config.ConfigOptions#REMOTE_DATA_DIRS} is empty, should always return
+     *     {@link org.apache.fluss.config.ConfigOptions#REMOTE_DATA_DIR}.
+     */
+    String nextDataDir();
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RoundRobinRemoteDirSelector.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RoundRobinRemoteDirSelector.java
new file mode 100644
index 0000000000..b627ce4fe7
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RoundRobinRemoteDirSelector.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.remote;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Round-robin remote data dir selector.
+ *
+ *

<p>This implementation cycles through the available remote data directories in order, ensuring
+ * each directory is selected once before repeating.
+ *
+ * <p>Example: For directories [A, B, C], the selection sequence would be: A, B, C, A, B, C, ...
+ */
+public class RoundRobinRemoteDirSelector implements RemoteDirSelector {
+
+    private final String remoteDataDir;
+    private final List<String> remoteDataDirs;
+
+    // Current position in the round-robin cycle.
+    private final AtomicInteger position;
+
+    public RoundRobinRemoteDirSelector(String remoteDataDir, List<String> remoteDataDirs) {
+        this.remoteDataDir = remoteDataDir;
+        this.remoteDataDirs = Collections.unmodifiableList(remoteDataDirs);
+        this.position = new AtomicInteger(0);
+    }
+
+    @Override
+    public String nextDataDir() {
+        if (remoteDataDirs.isEmpty()) {
+            return remoteDataDir;
+        }
+
+        int index = position.getAndUpdate(i -> (i + 1) % remoteDataDirs.size());
+        return remoteDataDirs.get(index);
+    }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/WeightedRoundRobinRemoteDirSelector.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/WeightedRoundRobinRemoteDirSelector.java
new file mode 100644
index 0000000000..a255d78408
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/WeightedRoundRobinRemoteDirSelector.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.remote; + +import java.util.Collections; +import java.util.List; + +import static org.apache.fluss.utils.Preconditions.checkArgument; + +/** + * Weighted Round-robin remote data dir selector using Interleaved (Smooth) Weighted Round-Robin + * algorithm. + * + *

This implementation uses the smooth weighted round-robin algorithm (also known as interleaved + * weighted round-robin), which distributes selections more evenly compared to traditional weighted + * round-robin. Instead of selecting the same node consecutively based on its weight, it interleaves + * selections to achieve a smoother distribution. + * + *

Algorithm: + * + *

    + *
  1. Each node maintains a currentWeight initialized to 0 + *
  2. On each selection: add every node's configured weight to its own currentWeight + *
  3. Select the node with the highest currentWeight + *
  4. Subtract the total weight sum from the selected node's currentWeight + *
+ * + *

Example: For nodes A, B, C with weights 5, 1, 1 (total=7), the selection sequence would be: A, + * A, B, A, C, A, A (instead of A, A, A, A, A, B, C in traditional WRR). + */ +public class WeightedRoundRobinRemoteDirSelector implements RemoteDirSelector { + + private final String remoteDataDir; + private final List<String> remoteDataDirs; + private final int[] weights; + private final int totalWeight; + + // Running per-directory weights of smooth weighted round-robin, updated on every selection + private final int[] currentWeights; + + // Guards currentWeights so concurrent nextDataDir() calls cannot interleave their updates + private final Object lock = new Object(); + + public WeightedRoundRobinRemoteDirSelector( + String remoteDataDir, List<String> remoteDataDirs, List<Integer> weights) { + checkArgument( + remoteDataDirs.size() == weights.size(), + "remoteDataDirs size (%s) must equal weights size (%s)", + remoteDataDirs.size(), + weights.size()); + + this.remoteDataDir = remoteDataDir; + this.remoteDataDirs = Collections.unmodifiableList(remoteDataDirs); + + // Copy weights into an int[]; addExact fails fast if the total overflows int + this.weights = new int[weights.size()]; + int sum = 0; + for (int i = 0; i < weights.size(); i++) { + this.weights[i] = weights.get(i); + sum = Math.addExact(sum, this.weights[i]); + } + this.totalWeight = sum; + + // Step 1: currentWeights start at 0 (Java zero-initializes int arrays) + this.currentWeights = new int[remoteDataDirs.size()]; + } + + @Override + public String nextDataDir() { + if (remoteDataDirs.isEmpty()) { + return remoteDataDir; + } + + synchronized (lock) { + int selectedIndex = -1; + int maxCurrentWeight = Integer.MIN_VALUE; + + // Steps 2 & 3: add each weight to its currentWeight and track the first maximum + for (int i = 0; i < remoteDataDirs.size(); i++) { + currentWeights[i] += weights[i]; + if (currentWeights[i] > maxCurrentWeight) { + maxCurrentWeight = currentWeights[i]; + selectedIndex = i; + } + } + + // Step 4: subtract the total weight from the selected node's currentWeight + currentWeights[selectedIndex] -= totalWeight; + + return remoteDataDirs.get(selectedIndex); + } + } +} diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java index 39db82daf3..e65b7e7044 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java @@ -25,6 +25,9 @@ import org.apache.fluss.exception.ConfigException; import org.apache.fluss.server.coordinator.LakeCatalogDynamicLoader; +import org.apache.fluss.server.coordinator.remote.RemoteDirDynamicLoader; +import org.apache.fluss.server.coordinator.remote.RoundRobinRemoteDirSelector; +import org.apache.fluss.server.coordinator.remote.WeightedRoundRobinRemoteDirSelector; import org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; @@ -462,6 +465,58 @@ public void reconfigure(Configuration newConfig) { assertThat(reconfiguredInterval.get()).isEqualTo(Duration.ofMinutes(5)); } + @Test + void testDynamicReconfigurationOfRemoteDataDirs() throws Exception { + Configuration configuration = new Configuration(); + configuration.set(ConfigOptions.REMOTE_DATA_DIR, "hdfs://default-dir"); + configuration.set( + ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("hdfs://dir1", "hdfs://dir2")); + + try (RemoteDirDynamicLoader remoteDirDynamicLoader = + new RemoteDirDynamicLoader(configuration)) { + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, configuration, true); + dynamicConfigManager.register(remoteDirDynamicLoader); + dynamicConfigManager.startup(); + + // Verify initial selector is RoundRobin (default strategy) + assertThat(remoteDirDynamicLoader.getRemoteDirSelector()) + .isInstanceOf(RoundRobinRemoteDirSelector.class); + + // Change multiple configs - generic validation applies to all + 
dynamicConfigManager.alterConfigs( + Arrays.asList( + new AlterConfig( + ConfigOptions.REMOTE_DATA_DIRS.key(), + "hdfs://dir1,hdfs://dir2,hdfs://dir3", + AlterConfigOpType.SET), + new AlterConfig( + ConfigOptions.REMOTE_DATA_DIRS_STRATEGY.key(), + ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN.name(), + AlterConfigOpType.SET), + new AlterConfig( + ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(), + "1,2,3", + AlterConfigOpType.SET))); + + // Verify all three configs were persisted to ZooKeeper + Map zkConfig = zookeeperClient.fetchEntityConfig(); + assertThat(zkConfig.get(ConfigOptions.REMOTE_DATA_DIRS.key())) + .isEqualTo("hdfs://dir1,hdfs://dir2,hdfs://dir3"); + assertThat(zkConfig.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY.key())) + .isEqualTo(ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN.name()); + assertThat(zkConfig.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key())) + .isEqualTo("1,2,3"); + + // Wait for config change to propagate via ZK watcher + retry( + Duration.ofMinutes(1), + () -> + assertThat(remoteDirDynamicLoader.getRemoteDirSelector()) + .isInstanceOf(WeightedRoundRobinRemoteDirSelector.class)); + } + } + @Test void testPreventInvalidMinInSyncReplicas() throws Exception { Configuration configuration = new Configuration(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java index 5114ee1156..9151ae1562 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java @@ -25,6 +25,7 @@ import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.coordinator.remote.RemoteDirDynamicLoader; import org.apache.fluss.server.testutils.TestingServerMetadataCache; import 
org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; @@ -50,8 +51,10 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -73,6 +76,8 @@ class AutoPartitionManagerTest { protected static ZooKeeperClient zookeeperClient; private static MetadataManager metadataManager; private static String remoteDataDir; + private static List remoteDataDirs; + private static RemoteDirDynamicLoader remoteDirDynamicLoader; @BeforeAll static void beforeAll() { @@ -85,7 +90,17 @@ static void beforeAll() { zookeeperClient, new Configuration(), new LakeCatalogDynamicLoader(new Configuration(), null, true)); - remoteDataDir = zookeeperClient.getDefaultRemoteDataDir(); + + remoteDataDir = "/dir"; + remoteDataDirs = Arrays.asList("/dir1", "/dir2", "/dir3", "/dir4"); + Configuration conf = new Configuration(); + conf.set(ConfigOptions.REMOTE_DATA_DIR, remoteDataDir); + conf.set(ConfigOptions.REMOTE_DATA_DIRS, remoteDataDirs); + conf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(1, 1, 1, 1)); + conf.set( + ConfigOptions.REMOTE_DATA_DIRS_STRATEGY, + ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN); + remoteDirDynamicLoader = new RemoteDirDynamicLoader(conf); } @AfterEach @@ -304,6 +319,7 @@ void testAddPartitionedTable(TestParams params) throws Exception { zookeeperClient, new Configuration(), new LakeCatalogDynamicLoader(new Configuration(), null, true)), + remoteDirDynamicLoader, new Configuration(), clock, periodicExecutor); @@ -320,6 +336,7 @@ void testAddPartitionedTable(TestParams params) throws Exception { zookeeperClient.getPartitionRegistrations(tablePath); // pre-create 4 partitions including current partition assertThat(partitions.keySet()).containsExactlyInAnyOrder(params.expectedPartitions); + 
verifyPartitionsRemoteDataDir(tablePath, partitions.keySet()); int replicaFactor = table.getTableConfig().getReplicationFactor(); Map bucketAssignments = @@ -341,6 +358,7 @@ void testAddPartitionedTable(TestParams params) throws Exception { metadataManager.createPartition( tablePath, tableId, + remoteDataDir, partitionAssignment, fromPartitionName(table.getPartitionKeys(), partitionName), false); @@ -397,6 +415,7 @@ void testMaxPartitions() throws Exception { new AutoPartitionManager( new TestingServerMetadataCache(3), metadataManager, + remoteDirDynamicLoader, new Configuration(), clock, periodicExecutor); @@ -435,6 +454,7 @@ void testMaxPartitions() throws Exception { metadataManager.createPartition( tablePath, tableId, + remoteDataDir, partitionAssignment, fromPartitionName(table.getPartitionKeys(), i + ""), false); @@ -474,6 +494,7 @@ void testAutoCreateDayPartitionShouldJitter() throws Exception { new AutoPartitionManager( new TestingServerMetadataCache(3), metadataManager, + remoteDirDynamicLoader, new Configuration(), clock, periodicExecutor); @@ -539,6 +560,7 @@ void testMaxBucketNum() throws Exception { new AutoPartitionManager( new TestingServerMetadataCache(3), metadataManager, + remoteDirDynamicLoader, config, clock, periodicExecutor); @@ -580,6 +602,7 @@ void testUpdateAutoPartitionNumRetention() throws Exception { new AutoPartitionManager( new TestingServerMetadataCache(3), metadataManager, + remoteDirDynamicLoader, new Configuration(), clock, periodicExecutor); @@ -759,7 +782,7 @@ public TestParams build() { private void verifyPartitionsRemoteDataDir( TablePath tablePath, Collection partitionNames) throws Exception { - Set allRemoteDataDirs = new HashSet<>(); + Set allRemoteDataDirs = new HashSet<>(remoteDataDirs); allRemoteDataDirs.add(remoteDataDir); for (String partitionName : partitionNames) { Optional partition = diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index 0694e9787a..e1226156e1 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -52,6 +52,7 @@ import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent; import org.apache.fluss.server.coordinator.event.CoordinatorEventManager; import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager; +import org.apache.fluss.server.coordinator.remote.RemoteDirDynamicLoader; import org.apache.fluss.server.coordinator.statemachine.BucketState; import org.apache.fluss.server.coordinator.statemachine.ReplicaState; import org.apache.fluss.server.entity.AdjustIsrResultForBucket; @@ -94,7 +95,6 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; -import java.io.IOException; import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; @@ -196,17 +196,21 @@ static void baseBeforeAll() throws Exception { } @BeforeEach - void beforeEach() throws IOException { + void beforeEach() { serverMetadataCache = new CoordinatorMetadataCache(); // set a test channel manager for the context testCoordinatorChannelManager = new TestCoordinatorChannelManager(); - autoPartitionManager = - new AutoPartitionManager(serverMetadataCache, metadataManager, new Configuration()); lakeTableTieringManager = new LakeTableTieringManager(TestingMetricGroups.LAKE_TIERING_METRICS); - Configuration conf = new Configuration(); remoteDataDir = zookeeperClient.getDefaultRemoteDataDir(); + Configuration conf = new Configuration(); conf.setString(ConfigOptions.REMOTE_DATA_DIR, remoteDataDir); + autoPartitionManager = + new AutoPartitionManager( + serverMetadataCache, + metadataManager, + new RemoteDirDynamicLoader(conf), + new Configuration()); 
kvSnapshotLeaseManager = new KvSnapshotLeaseManager( Duration.ofMinutes(10).toMillis(), @@ -250,10 +254,14 @@ void testCreateAndDropTable() throws Exception { new TabletServerInfo(1, "rack1"), new TabletServerInfo(2, "rack2") }); - long t1Id = metadataManager.createTable(t1, tableDescriptor, tableAssignment, false); + long t1Id = + metadataManager.createTable( + t1, remoteDataDir, tableDescriptor, tableAssignment, false); TablePath t2 = TablePath.of(defaultDatabase, "create_drop_t2"); - long t2Id = metadataManager.createTable(t2, tableDescriptor, tableAssignment, false); + long t2Id = + metadataManager.createTable( + t2, remoteDataDir, tableDescriptor, tableAssignment, false); verifyTableCreated(t2Id, tableAssignment, nBuckets, replicationFactor); @@ -383,13 +391,15 @@ void testServerBecomeOnlineAndOfflineLine() throws Exception { TablePath table1Path = TablePath.of(defaultDatabase, "t1"); long table1Id = - metadataManager.createTable(table1Path, TEST_TABLE, table1Assignment, false); + metadataManager.createTable( + table1Path, remoteDataDir, TEST_TABLE, table1Assignment, false); TableAssignment table2Assignment = TableAssignment.builder().add(0, BucketAssignment.of(3)).build(); TablePath table2Path = TablePath.of(defaultDatabase, "t2"); long table2Id = - metadataManager.createTable(table2Path, TEST_TABLE, table2Assignment, false); + metadataManager.createTable( + table2Path, remoteDataDir, TEST_TABLE, table2Assignment, false); // retry until the table2 been created retryVerifyContext( @@ -494,7 +504,9 @@ void testRestartTriggerReplicaToOffline() throws Exception { .add(1, BucketAssignment.of(1, 2, 0)) .build(); TablePath tablePath = TablePath.of(defaultDatabase, "t_restart"); - long table1Id = metadataManager.createTable(tablePath, TEST_TABLE, tableAssignment, false); + long table1Id = + metadataManager.createTable( + tablePath, remoteDataDir, TEST_TABLE, tableAssignment, false); // let's restart initCoordinatorChannel(); @@ -618,7 +630,8 @@ void 
testCreateAndDropPartition() throws Exception { // create a partitioned table TableDescriptor tablePartitionTableDescriptor = getPartitionedTable(); long tableId = - metadataManager.createTable(tablePath, tablePartitionTableDescriptor, null, false); + metadataManager.createTable( + tablePath, remoteDataDir, tablePartitionTableDescriptor, null, false); int nBuckets = 3; int replicationFactor = 3; @@ -690,7 +703,8 @@ void testRestartResumeDropPartition() throws Exception { // create a partitioned table TableDescriptor tablePartitionTableDescriptor = getPartitionedTable(); long tableId = - metadataManager.createTable(tablePath, tablePartitionTableDescriptor, null, false); + metadataManager.createTable( + tablePath, remoteDataDir, tablePartitionTableDescriptor, null, false); int nBuckets = 3; int replicationFactor = 3; @@ -837,7 +851,8 @@ void testProcessAdjustIsr() throws Exception { new TabletServerInfo(1, "rack1"), new TabletServerInfo(2, "rack2") }); - long t1Id = metadataManager.createTable(t1, TEST_TABLE, tableAssignment, false); + long t1Id = + metadataManager.createTable(t1, remoteDataDir, TEST_TABLE, tableAssignment, false); verifyTableCreated(t1Id, tableAssignment, nBuckets, replicationFactor); // get the origin bucket leaderAndIsr @@ -891,7 +906,7 @@ void testSchemaChange() throws Exception { }); // create table List replicas = tableAssignment.getBucketAssignment(0).getReplicas(); - metadataManager.createTable(t1, TEST_TABLE, tableAssignment, false); + metadataManager.createTable(t1, remoteDataDir, TEST_TABLE, tableAssignment, false); TableInfo tableInfo = metadataManager.getTable(t1); retry( @@ -950,7 +965,7 @@ void testTableRegistrationChange() throws Exception { }); // create table List replicas = tableAssignment.getBucketAssignment(0).getReplicas(); - metadataManager.createTable(t1, TEST_TABLE, tableAssignment, false); + metadataManager.createTable(t1, remoteDataDir, TEST_TABLE, tableAssignment, false); TableInfo tableInfo = metadataManager.getTable(t1); 
retry( @@ -1012,7 +1027,11 @@ void testDoBucketReassignment() throws Exception { TableAssignment tableAssignment = new TableAssignment(bucketAssignments); long t1Id = metadataManager.createTable( - t1, CoordinatorEventProcessorTest.TEST_TABLE, tableAssignment, false); + t1, + remoteDataDir, + CoordinatorEventProcessorTest.TEST_TABLE, + tableAssignment, + false); TableBucket tb0 = new TableBucket(t1Id, 0); verifyIsr(tb0, 0, Arrays.asList(0, 1, 3)); @@ -1073,7 +1092,8 @@ void testLeaderOnlyRebalanceExecutesSequentially() throws Exception { bucketAssignments.put(1, BucketAssignment.of(0, 1, 2)); bucketAssignments.put(2, BucketAssignment.of(0, 1, 2)); TableAssignment tableAssignment = new TableAssignment(bucketAssignments); - long t1Id = metadataManager.createTable(t1, TEST_TABLE, tableAssignment, false); + long t1Id = + metadataManager.createTable(t1, remoteDataDir, TEST_TABLE, tableAssignment, false); TableBucket tb0 = new TableBucket(t1Id, 0); TableBucket tb1 = new TableBucket(t1Id, 1); @@ -1172,6 +1192,8 @@ private void verifyIsr(TableBucket tb, int expectedLeader, List expecte } private CoordinatorEventProcessor buildCoordinatorEventProcessor() { + Configuration conf = new Configuration(); + conf.set(ConfigOptions.REMOTE_DATA_DIR, remoteDataDir); return new CoordinatorEventProcessor( zookeeperClient, serverMetadataCache, @@ -1180,7 +1202,7 @@ private CoordinatorEventProcessor buildCoordinatorEventProcessor() { autoPartitionManager, lakeTableTieringManager, TestingMetricGroups.COORDINATOR_METRICS, - new Configuration(), + conf, Executors.newFixedThreadPool(1, new ExecutorThreadFactory("test-coordinator-io")), metadataManager, kvSnapshotLeaseManager); @@ -1508,7 +1530,11 @@ private long createTable(TablePath tablePath, TabletServerInfo[] servers) { TableAssignment tableAssignment = generateAssignment(N_BUCKETS, REPLICATION_FACTOR, servers); return metadataManager.createTable( - tablePath, CoordinatorEventProcessorTest.TEST_TABLE, tableAssignment, false); + 
tablePath, + remoteDataDir, + CoordinatorEventProcessorTest.TEST_TABLE, + tableAssignment, + false); } private void alterTable(TablePath tablePath, List schemaChanges) { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java index 72697b7a91..9baa3ec49d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java @@ -136,7 +136,8 @@ void testTableChanges() { new TabletServerInfo(2, "rack2") }); long tableId = - metadataManager.createTable(tablePath, TEST_TABLE, tableAssignment, false); + metadataManager.createTable( + tablePath, remoteDataDir, TEST_TABLE, tableAssignment, false); SchemaInfo schemaInfo = metadataManager.getLatestSchema(tablePath); long currentMillis = System.currentTimeMillis(); expectedEvents.add( @@ -197,7 +198,9 @@ void testPartitionedTable() throws Exception { .property(ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT.key(), "DAY") .build() .withReplicationFactor(3); - long tableId = metadataManager.createTable(tablePath, partitionedTable, null, false); + long tableId = + metadataManager.createTable( + tablePath, remoteDataDir, partitionedTable, null, false); List expectedEvents = new ArrayList<>(); SchemaInfo schemaInfo = metadataManager.getLatestSchema(tablePath); // create table event @@ -277,7 +280,8 @@ void testSchemaChanges() { new TabletServerInfo(2, "rack2") }); long tableId = - metadataManager.createTable(tablePath, TEST_TABLE, tableAssignment, false); + metadataManager.createTable( + tablePath, remoteDataDir, TEST_TABLE, tableAssignment, false); SchemaInfo schemaInfo = metadataManager.getLatestSchema(tablePath); long currentMillis = System.currentTimeMillis(); expectedEvents.add( @@ -350,7 +354,9 @@ void 
testTableRegistrationChange() { new TabletServerInfo(1, "rack1"), new TabletServerInfo(2, "rack2") }); - long tableId = metadataManager.createTable(tablePath, TEST_TABLE, tableAssignment, false); + long tableId = + metadataManager.createTable( + tablePath, remoteDataDir, TEST_TABLE, tableAssignment, false); SchemaInfo schemaInfo = metadataManager.getLatestSchema(tablePath); long currentMillis = System.currentTimeMillis(); @@ -430,7 +436,8 @@ void testTableCreationDetectedViaNodeCreatedEvent() { new TabletServerInfo(2, "rack2") }); long tableId = - metadataManager.createTable(tablePath, TEST_TABLE, tableAssignment, false); + metadataManager.createTable( + tablePath, remoteDataDir, TEST_TABLE, tableAssignment, false); SchemaInfo schemaInfo = metadataManager.getLatestSchema(tablePath); long currentMillis = System.currentTimeMillis(); expectedEvents.add( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java index 6ae36b8dee..725430ede2 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java @@ -18,6 +18,7 @@ package org.apache.fluss.server.coordinator.rebalance; import org.apache.fluss.cluster.rebalance.RebalanceStatus; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.server.coordinator.AutoPartitionManager; import org.apache.fluss.server.coordinator.CoordinatorContext; @@ -27,6 +28,7 @@ import org.apache.fluss.server.coordinator.MetadataManager; import org.apache.fluss.server.coordinator.TestCoordinatorChannelManager; import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager; +import org.apache.fluss.server.coordinator.remote.RemoteDirDynamicLoader; import 
org.apache.fluss.server.metadata.CoordinatorMetadataCache; import org.apache.fluss.server.metrics.group.TestingMetricGroups; import org.apache.fluss.server.zk.NOPErrorHandler; @@ -83,8 +85,10 @@ static void baseBeforeAll() throws Exception { void beforeEach() { serverMetadataCache = new CoordinatorMetadataCache(); testCoordinatorChannelManager = new TestCoordinatorChannelManager(); - String remoteDataDir = "/tmp/fluss/remote-data"; + Configuration conf = new Configuration(); + conf.set(ConfigOptions.REMOTE_DATA_DIR, remoteDataDir); + kvSnapshotLeaseManager = new KvSnapshotLeaseManager( Duration.ofMinutes(10).toMillis(), @@ -95,10 +99,14 @@ void beforeEach() { kvSnapshotLeaseManager.start(); autoPartitionManager = - new AutoPartitionManager(serverMetadataCache, metadataManager, new Configuration()); + new AutoPartitionManager( + serverMetadataCache, + metadataManager, + new RemoteDirDynamicLoader(conf), + conf); lakeTableTieringManager = new LakeTableTieringManager(TestingMetricGroups.LAKE_TIERING_METRICS); - CoordinatorEventProcessor eventProcessor = buildCoordinatorEventProcessor(); + CoordinatorEventProcessor eventProcessor = buildCoordinatorEventProcessor(conf); rebalanceManager = new RebalanceManager(eventProcessor, zookeeperClient); rebalanceManager.startup(); } @@ -134,7 +142,7 @@ void testRebalanceWithoutTask() throws Exception { .hasValue(new RebalanceTask(rebalanceId, COMPLETED, new HashMap<>())); } - private CoordinatorEventProcessor buildCoordinatorEventProcessor() { + private CoordinatorEventProcessor buildCoordinatorEventProcessor(Configuration conf) { return new CoordinatorEventProcessor( zookeeperClient, serverMetadataCache, @@ -143,7 +151,7 @@ private CoordinatorEventProcessor buildCoordinatorEventProcessor() { autoPartitionManager, lakeTableTieringManager, TestingMetricGroups.COORDINATOR_METRICS, - new Configuration(), + conf, Executors.newFixedThreadPool(1, new ExecutorThreadFactory("test-coordinator-io")), metadataManager, 
kvSnapshotLeaseManager); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoaderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoaderTest.java new file mode 100644 index 0000000000..f51ad0788c --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoaderTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.remote; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.ConfigException; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link RemoteDirDynamicLoader}. 
*/ +class RemoteDirDynamicLoaderTest { + + @Test + void testReconfigureWithStrategyChange() throws Exception { + Configuration conf = new Configuration(); + conf.set(ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("hdfs://dir1", "hdfs://dir2")); + try (RemoteDirDynamicLoader loader = new RemoteDirDynamicLoader(conf)) { + RemoteDirSelector selector = loader.getRemoteDirSelector(); + assertThat(selector).isInstanceOf(RoundRobinRemoteDirSelector.class); + + // 1. Reconfigure with WEIGHTED_ROUND_ROBIN strategy + Configuration newConfig = new Configuration(); + newConfig.set( + ConfigOptions.REMOTE_DATA_DIRS_STRATEGY, + ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN); + newConfig.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(1, 2)); + loader.reconfigure(newConfig); + + // Selector should be replaced + assertThat(loader.getRemoteDirSelector()).isNotSameAs(selector); + assertThat(loader.getRemoteDirSelector()) + .isInstanceOf(WeightedRoundRobinRemoteDirSelector.class); + + selector = loader.getRemoteDirSelector(); + + // 2. 
Reconfigure back to ROUND_ROBIN strategy + newConfig.set( + ConfigOptions.REMOTE_DATA_DIRS_STRATEGY, + ConfigOptions.RemoteDataDirStrategy.ROUND_ROBIN); + loader.reconfigure(newConfig); + + // Selector should be replaced + assertThat(loader.getRemoteDirSelector()).isNotSameAs(selector); + assertThat(loader.getRemoteDirSelector()) + .isInstanceOf(RoundRobinRemoteDirSelector.class); + } + } + + @Test + void testReconfigureWithWeightsChange() throws Exception { + Configuration conf = new Configuration(); + conf.set( + ConfigOptions.REMOTE_DATA_DIRS_STRATEGY, + ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN); + conf.set(ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("hdfs://dir1", "hdfs://dir2")); + conf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(1, 2)); + + try (RemoteDirDynamicLoader loader = new RemoteDirDynamicLoader(conf)) { + RemoteDirSelector originalSelector = loader.getRemoteDirSelector(); + assertThat(originalSelector).isInstanceOf(WeightedRoundRobinRemoteDirSelector.class); + + // Reconfigure with weights change + Configuration newConfig = new Configuration(); + newConfig.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(3, 4)); + loader.reconfigure(newConfig); + + // Selector should be replaced + assertThat(loader.getRemoteDirSelector()).isNotSameAs(originalSelector); + assertThat(loader.getRemoteDirSelector()) + .isInstanceOf(WeightedRoundRobinRemoteDirSelector.class); + } + } + + @Test + void testReconfigureWithRemoteDataDirsChange() throws Exception { + // Test new dirs must contain all old dirs + Configuration conf1 = new Configuration(); + conf1.set(ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("hdfs://dir1", "hdfs://dir2")); + try (RemoteDirDynamicLoader loader = new RemoteDirDynamicLoader(conf1)) { + Configuration newConfig = new Configuration(); + newConfig.set( + ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("hdfs://dir2", "hdfs://dir3")); + + assertThatThrownBy(() -> loader.validate(newConfig)) + 
.isInstanceOf(ConfigException.class) + .hasMessageContaining("must contain all old remote.data.dirs"); + } + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirsITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirsITCase.java new file mode 100644 index 0000000000..6cbc2544c3 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirsITCase.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.remote; + +import org.apache.fluss.config.AutoPartitionTimeUnit; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.PartitionSpec; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.server.testutils.RpcMessageTestUtils; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.PartitionRegistration; +import org.apache.fluss.server.zk.data.TableRegistration; +import org.apache.fluss.types.DataTypes; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR; +import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR; +import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK; +import static org.assertj.core.api.Assertions.assertThat; + +/** ITCase for multi remote data directories functionality. 
*/ +class RemoteDirsITCase { + + private static final TableDescriptor DATA1_PARTITIONED_TABLE_DESCRIPTOR_PK = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("a", DataTypes.INT()) + .withComment("a is first column") + .column("b", DataTypes.STRING()) + .withComment("b is second column") + .primaryKey("a", "b") + .build()) + .distributedBy(3) + .partitionedBy("b") + .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true) + .property( + ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, + AutoPartitionTimeUnit.YEAR) + .build(); + + private static final List REMOTE_DIR_NAMES = Arrays.asList("dir1", "dir2", "dir3"); + + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = + FlussClusterExtension.builder() + .setNumOfTabletServers(3) + .setClusterConf(initConfig()) + .setRemoteDirNames(REMOTE_DIR_NAMES) + .build(); + + private ZooKeeperClient zkClient; + + private static Configuration initConfig() { + Configuration conf = new Configuration(); + conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); + conf.set(ConfigOptions.AUTO_PARTITION_CHECK_INTERVAL, Duration.ofSeconds(1)); + + return conf; + } + + @BeforeEach + void setup() { + zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCreateMultipleTablesWithRoundRobin(boolean isPrimaryKeyTable) throws Exception { + // Create multiple tables and verify they use different remote dirs via round-robin + TableDescriptor tableDescriptor = + isPrimaryKeyTable ? DATA1_TABLE_DESCRIPTOR_PK : DATA1_TABLE_DESCRIPTOR; + String tablePrefix = isPrimaryKeyTable ? 
"pk_table_" : "non_pk_table_"; + + List remoteDirsUsed = new ArrayList<>(); + int tableCount = 6; // Create more tables than dirs to see round-robin in action + + for (int i = 0; i < tableCount; i++) { + TablePath tablePath = TablePath.of("test_db", tablePrefix + i); + RpcMessageTestUtils.createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor); + + // Get the table registration to check remoteDataDir + Optional tableOpt = zkClient.getTable(tablePath); + assertThat(tableOpt).isPresent(); + TableRegistration table = tableOpt.get(); + + assertThat(table.remoteDataDir).isNotNull(); + remoteDirsUsed.add(table.remoteDataDir); + } + + // Verify round-robin distribution: each dir should be used at least once + Map dirUsageCount = new HashMap<>(); + for (String dir : remoteDirsUsed) { + dirUsageCount.merge(dir, 1, Integer::sum); + } + + // With round-robin, all configured dirs should be used + assertThat(dirUsageCount.keySet()).hasSize(REMOTE_DIR_NAMES.size()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCreatePartitionsWithRoundRobin(boolean isPrimaryKeyTable) throws Exception { + // Create a partitioned table and add multiple partitions + // Partitions should be spread across all remote data dirs via round-robin + String tablePrefix = isPrimaryKeyTable ? "pk_partitioned_" : "partitioned_"; + TablePath tablePath = TablePath.of("test_db", tablePrefix + "table_2"); + + TableDescriptor tableDescriptor = + isPrimaryKeyTable + ? 
DATA1_PARTITIONED_TABLE_DESCRIPTOR_PK + : DATA1_PARTITIONED_TABLE_DESCRIPTOR; + RpcMessageTestUtils.createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor); + + Optional tableOpt = zkClient.getTable(tablePath); + assertThat(tableOpt).isPresent(); + TableRegistration table = tableOpt.get(); + // Partitioned table should have remoteDataDir set at table level + assertThat(table.remoteDataDir).isNotNull(); + + // Create multiple partitions using partition column "b" + int partitionCount = 6; + List partitionNames = new ArrayList<>(); + for (int i = 0; i < partitionCount; i++) { + String partitionName = "202" + i; + partitionNames.add(partitionName); + PartitionSpec partitionSpec = + new PartitionSpec(Collections.singletonMap("b", partitionName)); + RpcMessageTestUtils.createPartition( + FLUSS_CLUSTER_EXTENSION, tablePath, partitionSpec, false); + } + + // Verify each partition has remoteDataDir set and round-robin is applied + Set usedRemoteDirs = new HashSet<>(); + for (String partitionName : partitionNames) { + Optional partitionOpt = + zkClient.getPartition(tablePath, partitionName); + assertThat(partitionOpt).isPresent(); + PartitionRegistration partition = partitionOpt.get(); + + assertThat(partition.getRemoteDataDir()).isNotNull(); + usedRemoteDirs.add(partition.getRemoteDataDir()); + } + + // All configured dirs should be used + assertThat(usedRemoteDirs).hasSize(REMOTE_DIR_NAMES.size()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testAutoPartitionWithMultipleRemoteDirs(boolean isPrimaryKeyTable) throws Exception { + // Create an auto-partitioned table and verify each partition gets a remote dir assigned + String tablePrefix = isPrimaryKeyTable ? "auto_pk_partitioned_" : "auto_partitioned_"; + TablePath tablePath = TablePath.of("test_db", tablePrefix + "table"); + TableDescriptor tableDescriptor = + isPrimaryKeyTable + ? 
DATA1_PARTITIONED_TABLE_DESCRIPTOR_PK + : DATA1_PARTITIONED_TABLE_DESCRIPTOR; + + RpcMessageTestUtils.createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor); + + // Wait for auto partitions to be created + Map partitions = + FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath); + assertThat(partitions).isNotEmpty(); + + // Verify partitions use remote dirs + Set usedRemoteDirs = new HashSet<>(); + for (String partitionName : partitions.keySet()) { + Optional partitionOpt = + zkClient.getPartition(tablePath, partitionName); + assertThat(partitionOpt).isPresent(); + PartitionRegistration partition = partitionOpt.get(); + + assertThat(partition.getRemoteDataDir()).isNotNull(); + usedRemoteDirs.add(partition.getRemoteDataDir()); + } + + // At least one remote dir should be used + assertThat(usedRemoteDirs).isNotEmpty(); + } + + @Test + void testMixedTableAndPartitionCreation() throws Exception { + // Create a mix of non-partitioned tables and partitioned table partitions + // to verify round-robin works correctly across both types + + // Create 2 non-partitioned tables (PK descriptor is used despite the "non_pk" name) + for (int i = 0; i < 2; i++) { + TablePath tablePath = TablePath.of("test_db", "mixed_non_pk_" + i); + RpcMessageTestUtils.createTable( + FLUSS_CLUSTER_EXTENSION, tablePath, DATA1_TABLE_DESCRIPTOR_PK); + } + + // Create a partitioned table + TablePath partitionedTablePath = TablePath.of("test_db", "mixed_partitioned"); + RpcMessageTestUtils.createTable( + FLUSS_CLUSTER_EXTENSION, partitionedTablePath, DATA1_PARTITIONED_TABLE_DESCRIPTOR); + + // Create partitions using partition column "b" + RpcMessageTestUtils.createPartition( + FLUSS_CLUSTER_EXTENSION, + partitionedTablePath, + new PartitionSpec(Collections.singletonMap("b", "2024")), + false); + RpcMessageTestUtils.createPartition( + FLUSS_CLUSTER_EXTENSION, + partitionedTablePath, + new PartitionSpec(Collections.singletonMap("b", "2025")), + false); + + // Create 2 more non-partitioned 
tables + for (int i = 2; i < 4; i++) { + 
TablePath tablePath = TablePath.of("test_db", "mixed_non_pk_" + i); + RpcMessageTestUtils.createTable( + FLUSS_CLUSTER_EXTENSION, tablePath, DATA1_TABLE_DESCRIPTOR_PK); + } + + // Collect all remote dirs used + Set allUsedDirs = new HashSet<>(); + + // Check non-partitioned tables + for (int i = 0; i < 4; i++) { + TablePath tablePath = TablePath.of("test_db", "mixed_non_pk_" + i); + Optional tableOpt = zkClient.getTable(tablePath); + assertThat(tableOpt).isPresent(); + assertThat(tableOpt.get().remoteDataDir).isNotNull(); + allUsedDirs.add(tableOpt.get().remoteDataDir); + } + + // Check partitions + for (String p : Arrays.asList("2024", "2025")) { + Optional partitionOpt = + zkClient.getPartition(partitionedTablePath, p); + assertThat(partitionOpt).isPresent(); + assertThat(partitionOpt.get().getRemoteDataDir()).isNotNull(); + allUsedDirs.add(partitionOpt.get().getRemoteDataDir()); + } + + // All remote dirs should have been used (6 items, 3 dirs, round-robin) + assertThat(allUsedDirs).hasSize(REMOTE_DIR_NAMES.size()); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RoundRobinRemoteDirSelectorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RoundRobinRemoteDirSelectorTest.java new file mode 100644 index 0000000000..b94d3831a8 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RoundRobinRemoteDirSelectorTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.remote; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link RoundRobinRemoteDirSelector}. */ +class RoundRobinRemoteDirSelectorTest { + + private static final String DEFAULT_DIR = "hdfs://default/data"; + + @Test + void testEmptyRemoteDirsShouldReturnDefault() { + RoundRobinRemoteDirSelector selector = + new RoundRobinRemoteDirSelector(DEFAULT_DIR, Collections.emptyList()); + + // Should always fall back to the default dir when remoteDataDirs is empty + for (int i = 0; i < 10; i++) { + assertThat(selector.nextDataDir()).isEqualTo(DEFAULT_DIR); + } + } + + @Test + void testSingleDirShouldAlwaysReturnSame() { + String dir = "hdfs://cluster/data1"; + RoundRobinRemoteDirSelector selector = + new RoundRobinRemoteDirSelector(DEFAULT_DIR, Collections.singletonList(dir)); + + // Should always return the single directory + for (int i = 0; i < 10; i++) { + assertThat(selector.nextDataDir()).isEqualTo(dir); + } + } + + @Test + void testRoundRobinOrder() { + List dirs = + Arrays.asList( + "hdfs://cluster/data1", "hdfs://cluster/data2", "hdfs://cluster/data3"); + + RoundRobinRemoteDirSelector selector = new RoundRobinRemoteDirSelector(DEFAULT_DIR, dirs); + + // Collect selections for three full cycles (3 dirs x 3) + List selections = new ArrayList<>(); + for (int i 
= 0; i < 9; i++) { + selections.add(selector.nextDataDir()); + } + + // Verify round-robin pattern: each cycle should contain all dirs in order + // First cycle + assertThat(selections.subList(0, 3)).containsExactlyElementsOf(dirs); + // Second cycle + assertThat(selections.subList(3, 6)).containsExactlyElementsOf(dirs); + // Third cycle + assertThat(selections.subList(6, 9)).containsExactlyElementsOf(dirs); + } + + @Test + void testEvenDistribution() { + List dirs = + Arrays.asList( + "hdfs://cluster/data1", "hdfs://cluster/data2", "hdfs://cluster/data3"); + + RoundRobinRemoteDirSelector selector = new RoundRobinRemoteDirSelector(DEFAULT_DIR, dirs); + + Map counts = new HashMap<>(); + int totalCalls = 30; + + for (int i = 0; i < totalCalls; i++) { + String selected = selector.nextDataDir(); + counts.merge(selected, 1, Integer::sum); + } + + // Each directory should be selected equally often (30 calls / 3 dirs = 10) + assertThat(counts.get(dirs.get(0))).isEqualTo(10); + assertThat(counts.get(dirs.get(1))).isEqualTo(10); + assertThat(counts.get(dirs.get(2))).isEqualTo(10); + } + + @Test + void testTwoDirs() { + List dirs = Arrays.asList("hdfs://cluster/data1", "hdfs://cluster/data2"); + + RoundRobinRemoteDirSelector selector = new RoundRobinRemoteDirSelector(DEFAULT_DIR, dirs); + + // Verify alternating pattern + assertThat(selector.nextDataDir()).isEqualTo(dirs.get(0)); + assertThat(selector.nextDataDir()).isEqualTo(dirs.get(1)); + assertThat(selector.nextDataDir()).isEqualTo(dirs.get(0)); + assertThat(selector.nextDataDir()).isEqualTo(dirs.get(1)); + } + + @Test + void testCycleWrapsCorrectly() { + List dirs = + Arrays.asList( + "hdfs://cluster/data1", "hdfs://cluster/data2", "hdfs://cluster/data3"); + + RoundRobinRemoteDirSelector selector = new RoundRobinRemoteDirSelector(DEFAULT_DIR, dirs); + + // Collect first cycle + List firstCycle = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + firstCycle.add(selector.nextDataDir()); + } + + // Collect second cycle + 
List secondCycle = new 
ArrayList<>(); + for (int i = 0; i < 3; i++) { + secondCycle.add(selector.nextDataDir()); + } + + // Both cycles should have the same sequence + assertThat(secondCycle).isEqualTo(firstCycle); + } + + @Test + void testAllDirsSelectedInOneCycle() { + List dirs = + Arrays.asList( + "hdfs://cluster/data1", + "hdfs://cluster/data2", + "hdfs://cluster/data3", + "hdfs://cluster/data4", + "hdfs://cluster/data5"); + + RoundRobinRemoteDirSelector selector = new RoundRobinRemoteDirSelector(DEFAULT_DIR, dirs); + + Set selectedInCycle = new HashSet<>(); + for (int i = 0; i < dirs.size(); i++) { + selectedInCycle.add(selector.nextDataDir()); + } + + // All directories should be selected exactly once in one cycle + assertThat(selectedInCycle).containsExactlyInAnyOrderElementsOf(dirs); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/WeightedRoundRobinRemoteDirSelectorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/WeightedRoundRobinRemoteDirSelectorTest.java new file mode 100644 index 0000000000..44fe690a7d --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/WeightedRoundRobinRemoteDirSelectorTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.remote; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link WeightedRoundRobinRemoteDirSelector}. */ +class WeightedRoundRobinRemoteDirSelectorTest { + + private static final String DEFAULT_DIR = "hdfs://default/data"; + + @Test + void testEmptyRemoteDirsShouldReturnDefault() { + WeightedRoundRobinRemoteDirSelector selector = + new WeightedRoundRobinRemoteDirSelector( + DEFAULT_DIR, Collections.emptyList(), Collections.emptyList()); + + // Should always return default when remoteDataDirs is empty + for (int i = 0; i < 10; i++) { + assertThat(selector.nextDataDir()).isEqualTo(DEFAULT_DIR); + } + } + + @Test + void testSingleDirShouldAlwaysReturnSame() { + String dir = "hdfs://cluster/data1"; + WeightedRoundRobinRemoteDirSelector selector = + new WeightedRoundRobinRemoteDirSelector( + DEFAULT_DIR, Collections.singletonList(dir), Collections.singletonList(5)); + + // Should always return the single directory, regardless of its weight + for (int i = 0; i < 10; i++) { + assertThat(selector.nextDataDir()).isEqualTo(dir); + } + } + + @Test + void testEqualWeightsShouldDistributeEvenly() { + List dirs = + Arrays.asList( + "hdfs://cluster/data1", "hdfs://cluster/data2", "hdfs://cluster/data3"); + List weights = Arrays.asList(1, 1, 1); + + WeightedRoundRobinRemoteDirSelector selector = + new WeightedRoundRobinRemoteDirSelector(DEFAULT_DIR, dirs, weights); + + Map counts = new HashMap<>(); + int totalCalls = 30; + + for (int i = 0; i < totalCalls; i++) { + String selected = selector.nextDataDir(); + counts.merge(selected, 1, Integer::sum); + } + + // Each directory should be selected equally + 
assertThat(counts.get(dirs.get(0))).isEqualTo(10); + assertThat(counts.get(dirs.get(1))).isEqualTo(10); + assertThat(counts.get(dirs.get(2))).isEqualTo(10); + } + + @Test + void testWeightedDistribution() { + List dirs = + Arrays.asList( + "hdfs://cluster/data1", "hdfs://cluster/data2", "hdfs://cluster/data3"); + // weights: 5, 1, 1 -> total = 7 + List weights = Arrays.asList(5, 1, 1); + + WeightedRoundRobinRemoteDirSelector selector = + new WeightedRoundRobinRemoteDirSelector(DEFAULT_DIR, dirs, weights); + + Map counts = new HashMap<>(); + int totalCalls = 70; // 10 complete cycles + + for (int i = 0; i < totalCalls; i++) { + String selected = selector.nextDataDir(); + counts.merge(selected, 1, Integer::sum); + } + + // Distribution should match weights ratio: 5:1:1 + assertThat(counts.get(dirs.get(0))).isEqualTo(50); // 5/7 * 70 = 50 + assertThat(counts.get(dirs.get(1))).isEqualTo(10); // 1/7 * 70 = 10 + assertThat(counts.get(dirs.get(2))).isEqualTo(10); // 1/7 * 70 = 10 + } + + @Test + void testInterleavedDistribution() { + // Verify that selections are interleaved, not consecutive + List dirs = + Arrays.asList("hdfs://cluster/A", "hdfs://cluster/B", "hdfs://cluster/C"); + // weights: 5, 1, 1 -> total = 7 + List weights = Arrays.asList(5, 1, 1); + + WeightedRoundRobinRemoteDirSelector selector = + new WeightedRoundRobinRemoteDirSelector(DEFAULT_DIR, dirs, weights); + + List sequence = new ArrayList<>(); + for (int i = 0; i < 7; i++) { + sequence.add(selector.nextDataDir()); + } + + // Expected interleaved sequence for weights 5,1,1: + // The smooth WRR should produce: A, A, B, A, C, A, A (or similar interleaved pattern) + // Instead of traditional WRR: A, A, A, A, A, B, C + + // Track the longest run of identical consecutive selections + int maxConsecutive = 0; + int currentConsecutive = 1; + for (int i = 1; i < sequence.size(); i++) { + if (sequence.get(i).equals(sequence.get(i - 1))) { + currentConsecutive++; + } else { + maxConsecutive = 
Math.max(maxConsecutive, currentConsecutive); + currentConsecutive = 1; + } + } + maxConsecutive = Math.max(maxConsecutive, currentConsecutive); + + // With smooth WRR, max consecutive selections should be <= 2 for this weight distribution + // (In traditional WRR, A would be selected 5 times consecutively) + assertThat(maxConsecutive).isLessThanOrEqualTo(2); + + // Verify all directories are selected at least once within one cycle + assertThat(sequence).contains(dirs.get(0), dirs.get(1), dirs.get(2)); + } + + @Test + void testTwoDirsWithDifferentWeights() { + List dirs = Arrays.asList("hdfs://cluster/data1", "hdfs://cluster/data2"); + // weights: 3, 1 -> total = 4 + List weights = Arrays.asList(3, 1); + + WeightedRoundRobinRemoteDirSelector selector = + new WeightedRoundRobinRemoteDirSelector(DEFAULT_DIR, dirs, weights); + + List sequence = new ArrayList<>(); + for (int i = 0; i < 8; i++) { + sequence.add(selector.nextDataDir()); + } + + // Count selections + long dir1Count = sequence.stream().filter(d -> d.equals(dirs.get(0))).count(); + long dir2Count = sequence.stream().filter(d -> d.equals(dirs.get(1))).count(); + + // Should follow 3:1 ratio + assertThat(dir1Count).isEqualTo(6); // 3/4 * 8 = 6 + assertThat(dir2Count).isEqualTo(2); // 1/4 * 8 = 2 + } + + @Test + void testCycleRepeatsCorrectly() { + List dirs = Arrays.asList("hdfs://cluster/data1", "hdfs://cluster/data2"); + List weights = Arrays.asList(2, 1); + + WeightedRoundRobinRemoteDirSelector selector = + new WeightedRoundRobinRemoteDirSelector(DEFAULT_DIR, dirs, weights); + + // Collect first cycle (3 selections) + List firstCycle = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + firstCycle.add(selector.nextDataDir()); + } + + // Collect second cycle (3 selections) + List secondCycle = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + secondCycle.add(selector.nextDataDir()); + } + + // Both cycles should have the same sequence + assertThat(secondCycle).isEqualTo(firstCycle); + } + + @Test + void 
testLargeWeights() { + List dirs = Arrays.asList("hdfs://cluster/data1", "hdfs://cluster/data2"); + List weights = Arrays.asList(100, 1); + + WeightedRoundRobinRemoteDirSelector selector = + new WeightedRoundRobinRemoteDirSelector(DEFAULT_DIR, dirs, weights); + + Map counts = new HashMap<>(); + int totalCalls = 101; // One complete cycle + + for (int i = 0; i < totalCalls; i++) { + String selected = selector.nextDataDir(); + counts.merge(selected, 1, Integer::sum); + } + + // Should follow 100:1 ratio + assertThat(counts.get(dirs.get(0))).isEqualTo(100); + assertThat(counts.get(dirs.get(1))).isEqualTo(1); + } + + @Test + void testZeroWeights() { + // Directories with zero weight should never be selected + List dirs = + Arrays.asList( + "hdfs://cluster/data1", "hdfs://cluster/data2", "hdfs://cluster/data3"); + // weight 0 for data2 means it should never be selected + List weights = Arrays.asList(2, 0, 1); + + WeightedRoundRobinRemoteDirSelector selector = + new WeightedRoundRobinRemoteDirSelector(DEFAULT_DIR, dirs, weights); + + Map counts = new HashMap<>(); + int totalCalls = 30; // 10 complete cycles (total weight = 3) + + for (int i = 0; i < totalCalls; i++) { + String selected = selector.nextDataDir(); + counts.merge(selected, 1, Integer::sum); + } + + // data1 should be selected 20 times (2/3 * 30) + assertThat(counts.get(dirs.get(0))).isEqualTo(20); + // data2 should never be selected (weight = 0) + assertThat(counts.get(dirs.get(1))).isNull(); + // data3 should be selected 10 times (1/3 * 30) + assertThat(counts.get(dirs.get(2))).isEqualTo(10); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java index 7d80fe41ce..4dca20f567 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java +++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java @@ -36,6 +36,7 @@ import org.apache.fluss.server.coordinator.TestCoordinatorChannelManager; import org.apache.fluss.server.coordinator.event.CoordinatorEventManager; import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager; +import org.apache.fluss.server.coordinator.remote.RemoteDirDynamicLoader; import org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElection.ControlledShutdownLeaderElection; import org.apache.fluss.server.coordinator.statemachine.TableBucketStateMachine.ElectionResult; import org.apache.fluss.server.metadata.CoordinatorMetadataCache; @@ -126,6 +127,7 @@ void beforeEach() { zookeeperClient, new Configuration(), new LakeCatalogDynamicLoader(new Configuration(), null, true)), + new RemoteDirDynamicLoader(conf), new Configuration()); lakeTableTieringManager = new LakeTableTieringManager(TestingMetricGroups.LAKE_TIERING_METRICS); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java index 6ef53b62ad..a2f4cbedb9 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java @@ -102,7 +102,9 @@ void testGetTableMetadataFromZk() throws Exception { .add(1, BucketAssignment.of(2, 3, 4)) .build(); metadataManager.createDatabase("test_db", DatabaseDescriptor.EMPTY, true); - long tableId = metadataManager.createTable(tablePath, desc, tableAssignment, false); + long tableId = + metadataManager.createTable( + tablePath, DEFAULT_REMOTE_DATA_DIR, desc, tableAssignment, false); // Create leader and isr for buckets TableBucket tableBucket0 = new TableBucket(tableId, 0); diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index 00a2344a68..48777a6f2e 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -138,6 +138,7 @@ public final class FlussClusterExtension private final Configuration clusterConf; private final Clock clock; private final String[] racks; + private final List remoteDirNames; /** Creates a new {@link Builder} for {@link FlussClusterExtension}. */ public static Builder builder() { @@ -150,7 +151,8 @@ private FlussClusterExtension( String tabletServerListeners, Configuration clusterConf, Clock clock, - String[] racks) { + String[] racks, + List remoteDirNames) { this.initialNumOfTabletServers = numOfTabletServers; this.tabletServers = new HashMap<>(numOfTabletServers); this.coordinatorServerListeners = coordinatorServerListeners; @@ -162,6 +164,7 @@ private FlussClusterExtension( racks != null && racks.length == numOfTabletServers, "racks must be not null and have the same length as numOfTabletServers"); this.racks = racks; + this.remoteDirNames = remoteDirNames; } @Override @@ -206,6 +209,7 @@ public void start() throws Exception { tempDir = Files.createTempDirectory("fluss-testing-cluster").toFile(); Configuration conf = new Configuration(); setRemoteDataDir(conf); + setRemoteDataDirs(conf); zooKeeperServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer(); zooKeeperClient = createZooKeeperClient( @@ -215,6 +219,7 @@ public void start() throws Exception { zooKeeperClient, clusterConf, new LakeCatalogDynamicLoader(clusterConf, null, true)); + rpcClient = RpcClient.create( conf, @@ -263,6 +268,7 @@ public void startCoordinatorServer() throws Exception { conf.setString(ConfigOptions.ZOOKEEPER_ADDRESS, zooKeeperServer.getConnectString()); 
conf.setString(ConfigOptions.BIND_LISTENERS, coordinatorServerListeners); setRemoteDataDir(conf); + setRemoteDataDirs(conf); coordinatorServer = new CoordinatorServer(conf, clock); coordinatorServer.start(); waitUntilCoordinatorServerElected(); @@ -375,12 +381,26 @@ private void setRemoteDataDir(Configuration conf) { conf.set(ConfigOptions.REMOTE_DATA_DIR, getRemoteDataDir()); } + private void setRemoteDataDirs(Configuration conf) { + if (!remoteDirNames.isEmpty()) { + List remoteDataDirs = + remoteDirNames.stream() + .map(this::getRemoteDataDir) + .collect(Collectors.toList()); + conf.set(ConfigOptions.REMOTE_DATA_DIRS, remoteDataDirs); + } + } + public String getRemoteDataDir() { + return getRemoteDataDir("remote-data-dir"); + } + + public String getRemoteDataDir(String dirName) { return LocalFileSystem.getLocalFsURI().getScheme() + "://" + tempDir.getAbsolutePath() + File.separator - + "remote-data-dir"; + + dirName; } /** Stop a tablet server. */ @@ -780,7 +800,7 @@ private Long triggerSnapshot(TableBucket tableBucket) { } } - private CompletedSnapshot waitUntilSnapshotFinished(TableBucket tableBucket, long snapshotId) { + public CompletedSnapshot waitUntilSnapshotFinished(TableBucket tableBucket, long snapshotId) { ZooKeeperClient zkClient = getZooKeeperClient(); return waitValue( () -> { @@ -969,6 +989,7 @@ public static class Builder { private String coordinatorServerListeners = DEFAULT_LISTENERS; private Clock clock = SystemClock.getInstance(); private String[] racks = new String[] {"rack-0"}; + private List remoteDirNames = Collections.emptyList(); private final Configuration clusterConf = new Configuration(); @@ -1014,6 +1035,11 @@ public Builder setRacks(String[] racks) { return this; } + public Builder setRemoteDirNames(List remoteDirNames) { + this.remoteDirNames = remoteDirNames; + return this; + } + public FlussClusterExtension build() { if (numOfTabletServers > 1 && racks.length == 1) { String[] racks = new String[numOfTabletServers]; @@ -1029,7 
+1055,8 @@ public FlussClusterExtension build() { tabletServerListeners, clusterConf, clock, - racks); + racks, + remoteDirNames); } } } From 7d1a28a4eacf4df20ee82fda88f20f07960e9237 Mon Sep 17 00:00:00 2001 From: Liebing Date: Mon, 25 May 2026 09:27:04 +0800 Subject: [PATCH 2/2] address jark's comments and resolve conflict --- .../fluss/client/admin/FlussAdminITCase.java | 37 +++++++++++++++++++ .../fluss/server/DynamicServerConfig.java | 7 ++-- .../apache/fluss/server/RpcServiceBase.java | 1 + .../remote/RemoteDirDynamicLoader.java | 12 +++++- .../remote/RoundRobinRemoteDirSelector.java | 4 +- .../fluss/server/DynamicConfigChangeTest.java | 2 +- .../remote/RemoteDirDynamicLoaderTest.java | 27 ++++++++++++-- .../coordinator/remote/RemoteDirsITCase.java | 3 +- 8 files changed, 80 insertions(+), 13 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index 2cda18398c..da737dd07c 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -1619,6 +1619,43 @@ void testDynamicDiskWriteLimitRatio() throws Exception { .get(); } + @Test + void testDynamicRemoteDataDirsWithRoundRobin() throws Exception { + String originalDir = FLUSS_CLUSTER_EXTENSION.getRemoteDataDir(); + String newDir1 = FLUSS_CLUSTER_EXTENSION.getRemoteDataDir("remote-dir-1"); + String newDir2 = FLUSS_CLUSTER_EXTENSION.getRemoteDataDir("remote-dir-2"); + + // Dynamically switch from single remote.data.dir to multiple remote.data.dirs + // with round-robin strategy; original dir must be included + admin.alterClusterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions.REMOTE_DATA_DIRS.key(), + String.join(",", originalDir, newDir1, newDir2), + AlterConfigOpType.SET))) + .get(); + + // Create 6 tables and verify round-robin distribution 
across 3 directories + int tableCount = 6; + Map dirUsageCount = new HashMap<>(); + for (int i = 0; i < tableCount; i++) { + TablePath tablePath = TablePath.of("test_db", "dynamic_rr_table_" + i); + createTable( + tablePath, + TableDescriptor.builder().schema(DEFAULT_SCHEMA).distributedBy(1, "id").build(), + true); + + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + String remoteDataDir = tableInfo.getRemoteDataDir(); + assertThat(remoteDataDir).isNotNull(); + dirUsageCount.merge(remoteDataDir, 1, Integer::sum); + } + + // Each of the 3 directories should be used exactly twice (6 / 3 = 2) + assertThat(dirUsageCount).hasSize(3); + assertThat(dirUsageCount.values()).allMatch(count -> count == 2); + } + private void assertConfigEntry( String key, @Nullable String value, ConfigEntry.ConfigSource source) throws ExecutionException, InterruptedException { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java index 4a0617c72b..dee0b1fade 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java @@ -45,10 +45,10 @@ import static org.apache.fluss.config.ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC; import static org.apache.fluss.config.ConfigOptions.KV_SNAPSHOT_INTERVAL; import static org.apache.fluss.config.ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER; -import static org.apache.fluss.config.ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO; import static org.apache.fluss.config.ConfigOptions.REMOTE_DATA_DIRS; import static org.apache.fluss.config.ConfigOptions.REMOTE_DATA_DIRS_STRATEGY; import static org.apache.fluss.config.ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS; +import static org.apache.fluss.config.ConfigOptions.SERVER_DATA_DISK_WRITE_LIMIT_RATIO; import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; import 
static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; @@ -69,12 +69,11 @@ class DynamicServerConfig { LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER.key(), KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), KV_SNAPSHOT_INTERVAL.key(), - SERVER_DATA_DISK_WRITE_LIMIT_RATIO.key()), - KV_SNAPSHOT_INTERVAL.key(), + SERVER_DATA_DISK_WRITE_LIMIT_RATIO.key(), // Config options for remote.data.dirs REMOTE_DATA_DIRS.key(), REMOTE_DATA_DIRS_STRATEGY.key(), - REMOTE_DATA_DIRS_WEIGHTS.key()); + REMOTE_DATA_DIRS_WEIGHTS.key())); private static final Set ALLOWED_CONFIG_PREFIXES = Collections.singleton("datalake."); private final ReadWriteLock lock = new ReentrantReadWriteLock(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index dedef6ea8b..f7b6de9eb8 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -302,6 +302,7 @@ public CompletableFuture getTableInfo(GetTableInfoRequest response.setTableJson(tableInfo.toTableDescriptor().toJsonBytes()) .setSchemaId(tableInfo.getSchemaId()) .setTableId(tableInfo.getTableId()) + .setRemoteDataDir(tableInfo.getRemoteDataDir()) .setCreatedTime(tableInfo.getCreatedTime()) .setModifiedTime(tableInfo.getModifiedTime()); return CompletableFuture.completedFuture(response); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoader.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoader.java index 1caf4b2f6e..3c63bd2b19 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoader.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoader.java @@ -24,6 +24,7 @@ import org.apache.fluss.exception.ConfigException; import 
org.apache.fluss.exception.IllegalConfigurationException; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -65,16 +66,23 @@ public void validate(Configuration newConfig) throws ConfigException { if (newRemoteDataDirsOp.isPresent()) { List oldRemoteDataDirs = currentConfiguration.get(ConfigOptions.REMOTE_DATA_DIRS); + // When old remote.data.dirs is empty, the cluster was using remote.data.dir + // as the sole directory. Ensure it is included in new remote.data.dirs. + if (oldRemoteDataDirs.isEmpty()) { + String oldRemoteDataDir = currentConfiguration.get(ConfigOptions.REMOTE_DATA_DIR); + if (oldRemoteDataDir != null) { + oldRemoteDataDirs = Collections.singletonList(oldRemoteDataDir); + } + } Set newRemoteDataDirs = new HashSet<>(newRemoteDataDirsOp.get()); if (!newRemoteDataDirs.containsAll(oldRemoteDataDirs)) { throw new ConfigException( String.format( - "New %s: %s must contain all old %s: %s. " + "New %s: %s must contain all existing remote data directories: %s.
" + "If you want the Fluss cluster to stop transferring data to a certain path, " + "keep it in %s and set its weight to 0 in %s.", ConfigOptions.REMOTE_DATA_DIRS.key(), newRemoteDataDirsOp.get(), - ConfigOptions.REMOTE_DATA_DIRS.key(), oldRemoteDataDirs, ConfigOptions.REMOTE_DATA_DIRS.key(), ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key())); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RoundRobinRemoteDirSelector.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RoundRobinRemoteDirSelector.java index b627ce4fe7..b785a261a5 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RoundRobinRemoteDirSelector.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/remote/RoundRobinRemoteDirSelector.java @@ -49,7 +49,8 @@ public String nextDataDir() { return remoteDataDir; } - int index = position.getAndUpdate(i -> (i + 1) % remoteDataDirs.size()); - return remoteDataDirs.get(index); + int index = position.getAndIncrement(); + // floorMod keeps the slot non-negative once the counter wraps past Integer.MAX_VALUE. + return remoteDataDirs.get(Math.floorMod(index, remoteDataDirs.size())); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java index e65b7e7044..c41afa0f6d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java @@ -24,10 +24,10 @@ import org.apache.fluss.config.cluster.ServerReconfigurable; import org.apache.fluss.exception.ConfigException; import org.apache.fluss.server.coordinator.LakeCatalogDynamicLoader; -import org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.coordinator.remote.RemoteDirDynamicLoader; import org.apache.fluss.server.coordinator.remote.RoundRobinRemoteDirSelector; import org.apache.fluss.server.coordinator.remote.WeightedRoundRobinRemoteDirSelector; +import
org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoaderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoaderTest.java index f51ad0788c..c8596f9a1c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoaderTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirDynamicLoaderTest.java @@ -93,8 +93,8 @@ void testReconfigureWithWeightsChange() throws Exception { } @Test - void testReconfigureWithRemoteDataDirsChange() throws Exception { - // Test new dirs must contain all old dirs + void testValidateRemoteDataDirsChange() throws Exception { + // 1. New dirs must contain all old remote.data.dirs Configuration conf1 = new Configuration(); conf1.set(ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("hdfs://dir1", "hdfs://dir2")); try (RemoteDirDynamicLoader loader = new RemoteDirDynamicLoader(conf1)) { @@ -104,7 +104,28 @@ void testReconfigureWithRemoteDataDirsChange() throws Exception { assertThatThrownBy(() -> loader.validate(newConfig)) .isInstanceOf(ConfigException.class) - .hasMessageContaining("must contain all old remote.data.dirs"); + .hasMessageContaining("must contain all existing remote data directories"); + } + + // 2. 
New dirs must also contain old remote.data.dir (singular) + Configuration conf2 = new Configuration(); + conf2.setString(ConfigOptions.REMOTE_DATA_DIR, "hdfs://original-dir"); + try (RemoteDirDynamicLoader loader = new RemoteDirDynamicLoader(conf2)) { + Configuration newConfig = new Configuration(); + newConfig.set( + ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("hdfs://dir1", "hdfs://dir2")); + + assertThatThrownBy(() -> loader.validate(newConfig)) + .isInstanceOf(ConfigException.class) + .hasMessageContaining("must contain all existing remote data directories") + .hasMessageContaining("hdfs://original-dir"); + + // 3. Including old remote.data.dir should pass + Configuration validConfig = new Configuration(); + validConfig.set( + ConfigOptions.REMOTE_DATA_DIRS, + Arrays.asList("hdfs://original-dir", "hdfs://dir2")); + loader.validate(validConfig); } } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirsITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirsITCase.java index 6cbc2544c3..808c10b19e 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirsITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/remote/RemoteDirsITCase.java @@ -129,8 +129,9 @@ void testCreateMultipleTablesWithRoundRobin(boolean isPrimaryKeyTable) throws Ex dirUsageCount.merge(dir, 1, Integer::sum); } - // With round-robin, all configured dirs should be used + // With round-robin, all configured dirs should be used exactly twice (6 / 3) assertThat(dirUsageCount.keySet()).hasSize(REMOTE_DIR_NAMES.size()); + assertThat(dirUsageCount.values()).allMatch(count -> count == 2); } @ParameterizedTest