From 05735cd7b4941b865f15c969d41feea8514bb286 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Mon, 5 May 2025 14:18:02 +0530 Subject: [PATCH 01/11] HDDS-12554. Support callback on completed reconfiguration --- .../hadoop/ozone/HddsDatanodeService.java | 8 + .../hadoop/hdds/conf/ReconfigurableBase.java | 207 ++++++++++++++++++ .../hdds/conf/ReconfigurationHandler.java | 1 - .../scm/server/StorageContainerManager.java | 8 + .../hadoop/ozone/shell/TestReconfigShell.java | 2 +- .../apache/hadoop/ozone/om/OzoneManager.java | 8 + 6 files changed, 232 insertions(+), 2 deletions(-) create mode 100644 hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java index 585cab9d38ab..1da61c21a4bf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java @@ -287,6 +287,14 @@ public String getNamespace() { .register(REPLICATION_STREAMS_LIMIT_KEY, this::reconfigReplicationStreamsLimit); + reconfigurationHandler.setReconfigurationCompleteCallback(status -> { + if (status.getStatus() != null && !status.getStatus().isEmpty()) { + LOG.info("Reconfiguration completed. Properties are updated."); + } else { + LOG.info("Reconfiguration complete. No properties were changed."); + } + }); + datanodeStateMachine = new DatanodeStateMachine(this, datanodeDetails, conf, dnCertClient, secretKeyClient, this::terminateDatanode, reconfigurationHandler); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java new file mode 100644 index 000000000000..838252931b1e --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.conf; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.ConfigRedactor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.conf.Reconfigurable; +import org.apache.hadoop.conf.ReconfigurationException; +import org.apache.hadoop.conf.ReconfigurationTaskStatus; +import org.apache.hadoop.conf.ReconfigurationUtil; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class to support dynamic reconfiguration of configuration properties at runtime. + */ +public abstract class ReconfigurableBase extends Configured implements Reconfigurable { + private static final Logger LOG = LoggerFactory.getLogger(ReconfigurableBase.class); + private final ReconfigurationUtil reconfigurationUtil = new ReconfigurationUtil(); + private Thread reconfigThread = null; + private volatile boolean shouldRun = true; + private final Object reconfigLock = new Object(); + private long startTime = 0L; + private long endTime = 0L; + private Map> status = null; + private Consumer reconfigurationCompleteCallback; + + public ReconfigurableBase(Configuration conf) { + super(conf == null ? new Configuration() : conf); + } + + protected abstract Configuration getNewConf(); + + @VisibleForTesting + public Collection getChangedProperties(Configuration newConf, + Configuration oldConf) { + return this.reconfigurationUtil.parseChangedProperties(newConf, oldConf); + } + + public void startReconfigurationTask() throws IOException { + synchronized (this.reconfigLock) { + String errorMessage; + if (!this.shouldRun) { + errorMessage = "The server is stopped."; + LOG.warn(errorMessage); + throw new IOException(errorMessage); + } else if (this.reconfigThread != null) { + errorMessage = "Another reconfiguration task is running."; + LOG.warn(errorMessage); + throw new IOException(errorMessage); + } else { + this.reconfigThread = new ReconfigurationThread(this); + this.reconfigThread.setDaemon(true); + this.reconfigThread.setName("Reconfiguration Task"); + this.reconfigThread.start(); + this.startTime = Time.now(); + } + } + } + + public ReconfigurationTaskStatus getReconfigurationTaskStatus() { + synchronized (this.reconfigLock) { + return this.reconfigThread != null ? new ReconfigurationTaskStatus(this.startTime, 0L, null) : + new ReconfigurationTaskStatus(this.startTime, this.endTime, this.status); + } + } + + public void shutdownReconfigurationTask() { + Thread tempThread; + synchronized (this.reconfigLock) { + this.shouldRun = false; + if (this.reconfigThread == null) { + return; + } + + tempThread = this.reconfigThread; + this.reconfigThread = null; + } + + try { + tempThread.join(); + } catch (InterruptedException ignored) { + } + + } + + public final void reconfigureProperty(String property, String newVal) throws ReconfigurationException { + if (this.isPropertyReconfigurable(property)) { + LOG.info("changing property " + property + " to " + newVal); + synchronized (this.getConf()) { + this.getConf().get(property); + String effectiveValue = this.reconfigurePropertyImpl(property, newVal); + if (newVal != null) { + this.getConf().set(property, effectiveValue); + } else { + this.getConf().unset(property); + } + + } + } else { + throw new ReconfigurationException(property, newVal, this.getConf().get(property)); + } + } + + public abstract Collection getReconfigurableProperties(); + + public boolean isPropertyReconfigurable(String property) { + return this.getReconfigurableProperties().contains(property); + } + + protected abstract String reconfigurePropertyImpl(String var1, String var2) throws ReconfigurationException; + + private static class ReconfigurationThread extends Thread { + private final ReconfigurableBase parent; + + ReconfigurationThread(ReconfigurableBase base) { + this.parent = base; + } + + public void run() { + LOG.info("Starting reconfiguration task."); + Configuration oldConf = this.parent.getConf(); + Configuration newConf = this.parent.getNewConf(); + Collection changes = this.parent.getChangedProperties(newConf, oldConf); + Map> results = Maps.newHashMap(); + ConfigRedactor oldRedactor = new ConfigRedactor(oldConf); + ConfigRedactor newRedactor = new ConfigRedactor(newConf); + + for (ReconfigurationUtil.PropertyChange change : changes) { + String errorMessage = null; + String oldValRedacted = oldRedactor.redact(change.prop, change.oldVal); + String newValRedacted = newRedactor.redact(change.prop, change.newVal); + if (!this.parent.isPropertyReconfigurable(change.prop)) { + LOG.info(String.format("Property %s is not configurable: old value: %s, new value: %s", + change.prop, oldValRedacted, newValRedacted)); + } else { + LOG.info("Change property: " + change.prop + " from \"" + + (change.oldVal == null ? "" : oldValRedacted) + "\" to \"" + + (change.newVal == null ? "" : newValRedacted) + "\"."); + + try { + String effectiveValue = this.parent.reconfigurePropertyImpl(change.prop, change.newVal); + if (change.newVal != null) { + oldConf.set(change.prop, effectiveValue); + } else { + oldConf.unset(change.prop); + } + } catch (ReconfigurationException var16) { + Throwable cause = var16.getCause(); + errorMessage = cause == null ? var16.getMessage() : cause.getMessage(); + } + + results.put(change, Optional.ofNullable(errorMessage)); + } + } + + synchronized (this.parent.reconfigLock) { + this.parent.endTime = Time.now(); + this.parent.status = Collections.unmodifiableMap(results); + this.parent.reconfigThread = null; + + LOG.info("Reconfiguration completed. {} properties were updated.", results.size()); + + if (parent.reconfigurationCompleteCallback != null) { + try { + parent.reconfigurationCompleteCallback.accept( + parent.getReconfigurationTaskStatus()); + } catch (Exception e) { + LOG.warn("Reconfiguration complete callback threw exception", e); + } + } + } + } + } + + public void setReconfigurationCompleteCallback(Consumer callback) { + synchronized (reconfigLock) { + this.reconfigurationCompleteCallback = callback; + } + } + +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java index a594bfa27605..1ac3b66ca365 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java @@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.UnaryOperator; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.ReconfigurableBase; import org.apache.hadoop.conf.ReconfigurationException; import org.apache.hadoop.conf.ReconfigurationTaskStatus; import org.apache.hadoop.hdds.protocol.ReconfigureProtocol; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 04bf80366ed2..5911bf08215d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -403,6 +403,14 @@ private StorageContainerManager(OzoneConfiguration conf, .register(OZONE_READONLY_ADMINISTRATORS, this::reconfOzoneReadOnlyAdmins); + reconfigurationHandler.setReconfigurationCompleteCallback(status -> { + if (status.getStatus() != null && !status.getStatus().isEmpty()) { + LOG.info("Reconfiguration completed. Properties are updated."); + } else { + LOG.info("Reconfiguration complete. No properties were changed."); + } + }); + initializeSystemManagers(conf, configurator); if (isSecretKeyEnable(securityConfig)) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java index d6001a2fd819..d133084c09b2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java @@ -24,7 +24,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import org.apache.hadoop.conf.ReconfigurableBase; +import org.apache.hadoop.hdds.conf.ReconfigurableBase; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.node.NodeManager; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index d265e6dfd8eb..e028bde4bd6f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -523,6 +523,14 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) .register(OZONE_KEY_DELETING_LIMIT_PER_TASK, this::reconfOzoneKeyDeletingLimitPerTask); + reconfigurationHandler.setReconfigurationCompleteCallback(status -> { + if (status.getStatus() != null && !status.getStatus().isEmpty()) { + LOG.info("Reconfiguration completed. Properties are updated."); + } else { + LOG.info("Reconfiguration complete. No properties were changed."); + } + }); + versionManager = new OMLayoutVersionManager(omStorage.getLayoutVersion()); upgradeFinalizer = new OMUpgradeFinalizer(versionManager); replicationConfigValidator = From c63cde063cb3d575ed6292cbab693182c980065b Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Mon, 5 May 2025 14:35:48 +0530 Subject: [PATCH 02/11] Fixed PMD failures --- .../java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java index 838252931b1e..27e30b606b2c 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java @@ -109,6 +109,7 @@ public void shutdownReconfigurationTask() { } + @Override public final void reconfigureProperty(String property, String newVal) throws ReconfigurationException { if (this.isPropertyReconfigurable(property)) { LOG.info("changing property " + property + " to " + newVal); @@ -127,8 +128,10 @@ public final void reconfigureProperty(String property, String newVal) throws Rec } } + @Override public abstract Collection getReconfigurableProperties(); + @Override public boolean isPropertyReconfigurable(String property) { return this.getReconfigurableProperties().contains(property); } @@ -142,6 +145,7 @@ private static class ReconfigurationThread extends Thread { this.parent = base; } + @Override public void run() { LOG.info("Starting reconfiguration task."); Configuration oldConf = this.parent.getConf(); From 1a2a13cbec1d10b0dc990ec455918dfff35c6a01 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Fri, 16 May 2025 17:09:27 +0530 Subject: [PATCH 03/11] Register the config in BackgroundService --- .../hadoop/hdds/utils/BackgroundService.java | 23 ++++++-- .../hadoop/ozone/HddsDatanodeService.java | 2 +- .../hadoop/hdds/conf/ReconfigurableBase.java | 12 ++-- .../conf/ReconfigurationChangeCallback.java | 29 +++++++++ .../hdds/conf/ReconfigurationHandler.java | 39 ++++++++++++ .../scm/server/StorageContainerManager.java | 2 +- .../apache/hadoop/ozone/om/OzoneManager.java | 59 ++++++++++++++++++- .../om/service/DirectoryDeletingService.java | 16 +++++ 8 files changed, 168 insertions(+), 14 deletions(-) create mode 100644 hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java index 959bee8d8c5f..442d437e7cd0 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -42,11 +43,12 @@ public abstract class BackgroundService { // Executor to launch child tasks private final ScheduledThreadPoolExecutor exec; + private volatile ScheduledFuture scheduledHandle; private final ThreadGroup threadGroup; private final String serviceName; - private final long interval; + private long interval; private final long serviceTimeoutInNanos; - private final TimeUnit unit; + private TimeUnit unit; private final PeriodicalTask service; public BackgroundService(String serviceName, long interval, @@ -103,8 +105,21 @@ public void runPeriodicalTaskNow() throws Exception { } // start service - public void start() { - exec.scheduleWithFixedDelay(service, 0, interval, unit); + public synchronized void start() { + scheduledHandle = exec.scheduleWithFixedDelay(service, 0, interval, unit); + } + + protected void setInterval(long newInterval, TimeUnit newUnit) { + this.interval = newInterval; + this.unit = newUnit; + } + + public synchronized void stop() { + LOG.info("Stopping {}", serviceName); + if (scheduledHandle != null) { + scheduledHandle.cancel(false); // don't interrupt running tasks + scheduledHandle = null; + } } public abstract BackgroundTaskQueue getTasks(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java index 1da61c21a4bf..8c3005ce1301 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java @@ -287,7 +287,7 @@ public String getNamespace() { .register(REPLICATION_STREAMS_LIMIT_KEY, this::reconfigReplicationStreamsLimit); - reconfigurationHandler.setReconfigurationCompleteCallback(status -> { + reconfigurationHandler.setReconfigurationCompleteCallback((status, newConf) -> { if (status.getStatus() != null && !status.getStatus().isEmpty()) { LOG.info("Reconfiguration completed. Properties are updated."); } else { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java index 27e30b606b2c..093d209bf572 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java @@ -19,6 +19,7 @@ import com.google.common.collect.Maps; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -48,7 +49,7 @@ public abstract class ReconfigurableBase extends Configured implements Reconfigu private long startTime = 0L; private long endTime = 0L; private Map> status = null; - private Consumer reconfigurationCompleteCallback; + private final Collection> reconfigurationCompleteCallbacks = new ArrayList<>(); public ReconfigurableBase(Configuration conf) { super(conf == null ? new Configuration() : conf); @@ -190,10 +191,9 @@ public void run() { LOG.info("Reconfiguration completed. {} properties were updated.", results.size()); - if (parent.reconfigurationCompleteCallback != null) { + for (Consumer callback : parent.reconfigurationCompleteCallbacks) { try { - parent.reconfigurationCompleteCallback.accept( - parent.getReconfigurationTaskStatus()); + callback.accept(parent.getReconfigurationTaskStatus()); } catch (Exception e) { LOG.warn("Reconfiguration complete callback threw exception", e); } @@ -202,9 +202,9 @@ public void run() { } } - public void setReconfigurationCompleteCallback(Consumer callback) { + public void addReconfigurationCompleteCallback(Consumer callback) { synchronized (reconfigLock) { - this.reconfigurationCompleteCallback = callback; + this.reconfigurationCompleteCallbacks.add(callback); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java new file mode 100644 index 000000000000..ed5cd913643a --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.conf; + +import org.apache.hadoop.conf.Configuration; +import java.util.Map; + +/** + * Define a Callback Interface + */ +@FunctionalInterface +public interface ReconfigurationChangeCallback { + void onPropertiesChanged(Map changedKeys, Configuration newConf); +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java index 1ac3b66ca365..51b5dea22638 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java @@ -22,15 +22,18 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; import java.util.function.UnaryOperator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.ReconfigurationException; import org.apache.hadoop.conf.ReconfigurationTaskStatus; +import org.apache.hadoop.conf.ReconfigurationUtil; import org.apache.hadoop.hdds.protocol.ReconfigureProtocol; import org.apache.ratis.util.function.CheckedConsumer; @@ -46,11 +49,47 @@ public class ReconfigurationHandler extends ReconfigurableBase private final Map> properties = new ConcurrentHashMap<>(); + private final List completeCallbacks = new ArrayList<>(); + private BiConsumer reconfigurationStatusListener; + + public void registerCompleteCallback(ReconfigurationChangeCallback callback) { + completeCallbacks.add(callback); + } + + public void setReconfigurationCompleteCallback(BiConsumer + reconfigurationStatusListener) { + this.reconfigurationStatusListener = reconfigurationStatusListener; + } + + private void triggerCompleteCallbacks(ReconfigurationTaskStatus status, Configuration newConf) { + if (status.getStatus() != null && !status.getStatus().isEmpty()) { + Map changedKeys = new HashMap<>(); + for (ReconfigurationUtil.PropertyChange change : status.getStatus().keySet()) { + boolean deleted = change.newVal == null; + changedKeys.put(change.prop, !deleted); + } + for (ReconfigurationChangeCallback callback : completeCallbacks) { + callback.onPropertiesChanged(changedKeys, newConf); + } + } + + if (reconfigurationStatusListener != null) { + reconfigurationStatusListener.accept(status, newConf); + } + } + public ReconfigurationHandler(String name, OzoneConfiguration config, CheckedConsumer requireAdminPrivilege) { super(config); this.name = name; this.requireAdminPrivilege = requireAdminPrivilege; + + // Register callback on reconfiguration complete + addReconfigurationCompleteCallback(status -> { + Configuration newConf = getNewConf(); + triggerCompleteCallbacks(status, newConf); + }); + } public ReconfigurationHandler register( diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 5911bf08215d..05fbde044de9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -403,7 +403,7 @@ private StorageContainerManager(OzoneConfiguration conf, .register(OZONE_READONLY_ADMINISTRATORS, this::reconfOzoneReadOnlyAdmins); - reconfigurationHandler.setReconfigurationCompleteCallback(status -> { + reconfigurationHandler.setReconfigurationCompleteCallback((status, newConf) -> { if (status.getStatus() != null && !status.getStatus().isEmpty()) { LOG.info("Reconfiguration completed. Properties are updated."); } else { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index e028bde4bd6f..056ce40f03c3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -32,6 +32,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FLEXIBLE_FQDN_RESOLUTION_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FLEXIBLE_FQDN_RESOLUTION_ENABLED_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX; @@ -52,6 +54,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT; @@ -81,6 +84,8 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DETECTED_LOOP_IN_BUCKET_LINKS; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FEATURE_NOT_ENABLED; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INTERNAL_ERROR; @@ -281,6 +286,7 @@ import org.apache.hadoop.ozone.om.s3.S3SecretCacheProvider; import org.apache.hadoop.ozone.om.s3.S3SecretStoreProvider; import org.apache.hadoop.ozone.om.service.CompactDBService; +import org.apache.hadoop.ozone.om.service.DirectoryDeletingService; import org.apache.hadoop.ozone.om.service.OMRangerBGSyncService; import org.apache.hadoop.ozone.om.service.QuotaRepairTask; import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; @@ -496,6 +502,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl // instance creation every single time. private ReferenceCounted rcOmMetadataReader; private OmSnapshotManager omSnapshotManager; + private volatile DirectoryDeletingService dirDeletingService; @SuppressWarnings("methodlength") private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) @@ -521,9 +528,10 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) this::reconfOzoneReadOnlyAdmins) .register(OZONE_OM_VOLUME_LISTALL_ALLOWED, this::reconfigureAllowListAllVolumes) .register(OZONE_KEY_DELETING_LIMIT_PER_TASK, - this::reconfOzoneKeyDeletingLimitPerTask); + this::reconfOzoneKeyDeletingLimitPerTask) + .register(OZONE_DIR_DELETING_SERVICE_INTERVAL, this::reconfOzoneDirDeletingServiceInterval); - reconfigurationHandler.setReconfigurationCompleteCallback(status -> { + reconfigurationHandler.setReconfigurationCompleteCallback((status, newConf) -> { if (status.getStatus() != null && !status.getStatus().isEmpty()) { LOG.info("Reconfiguration completed. Properties are updated."); } else { @@ -531,6 +539,16 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) } }); + reconfigurationHandler.registerCompleteCallback((changedKeys, newConf) -> { + Set relevantKeys = new HashSet<>(); + relevantKeys.add(OZONE_DIR_DELETING_SERVICE_INTERVAL); + + boolean shouldRestart = changedKeys.keySet().stream().anyMatch(relevantKeys::contains); + if (shouldRestart) { + initOrUpdateDirDeletingService((OzoneConfiguration) newConf); + } + }); + versionManager = new OMLayoutVersionManager(omStorage.getLayoutVersion()); upgradeFinalizer = new OMUpgradeFinalizer(versionManager); replicationConfigValidator = @@ -5105,6 +5123,12 @@ private String reconfigureAllowListAllVolumes(String newVal) { return String.valueOf(allowListAllVolumes); } + private String reconfOzoneDirDeletingServiceInterval(String newVal) { + getConfiguration().set(OZONE_DIR_DELETING_SERVICE_INTERVAL, newVal); + getKeyManager().getDirDeletingService().setDirDeletingServiceInterval(newVal); + return newVal; + } + public void validateReplicationConfig(ReplicationConfig replicationConfig) throws OMException { try { @@ -5115,6 +5139,37 @@ public void validateReplicationConfig(ReplicationConfig replicationConfig) } } + private void initOrUpdateDirDeletingService(OzoneConfiguration conf) { + long newInterval = conf.getTimeDuration( + OZONE_DIR_DELETING_SERVICE_INTERVAL, 60, TimeUnit.SECONDS); + + if (dirDeletingService == null) { + long serviceTimeout = conf.getTimeDuration( + OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, + OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + + int dirDeletingServiceCorePoolSize = conf.getInt( + OZONE_THREAD_NUMBER_DIR_DELETION, + OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT); + if (dirDeletingServiceCorePoolSize <= 0) { + dirDeletingServiceCorePoolSize = 1; + } + + LOG.info("Initializing DirectoryDeletingService with interval: {}s", newInterval); + + dirDeletingService = new DirectoryDeletingService( + newInterval, TimeUnit.SECONDS, + serviceTimeout, + this, + conf, + dirDeletingServiceCorePoolSize); + } else { + LOG.info("Restarting DirectoryDeletingService with new interval: {}s", newInterval); + dirDeletingService.updateAndRestart(newInterval, TimeUnit.SECONDS); + } + } + @VisibleForTesting public ReplicationConfigValidator getReplicationConfigValidator() { return replicationConfigValidator; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index 3790bb286b50..ffc84f63cb54 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -84,6 +84,8 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService { private final DeletedDirSupplier deletedDirSupplier; private AtomicInteger taskCount = new AtomicInteger(0); + private String dirDeletingServiceInterval; + public DirectoryDeletingService(long interval, TimeUnit unit, long serviceTimeout, OzoneManager ozoneManager, @@ -99,10 +101,20 @@ public DirectoryDeletingService(long interval, TimeUnit unit, this.suspended = new AtomicBoolean(false); this.isRunningOnAOS = new AtomicBoolean(false); this.dirDeletingCorePoolSize = dirDeletingServiceCorePoolSize; + this.dirDeletingServiceInterval = configuration.get( + OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL, + OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT); deletedDirSupplier = new DeletedDirSupplier(); taskCount.set(0); } + public synchronized void updateAndRestart(long newInterval, TimeUnit newUnit) { + LOG.info("Updating and restarting DirectoryDeletingService with interval: {} {}", newInterval, newUnit); + stop(); + setInterval(newInterval, newUnit); + start(); + } + private boolean shouldRun() { if (getOzoneManager() == null) { // OzoneManager can be null for testing @@ -346,4 +358,8 @@ public KeyValue getPendingDeletedDirInfo() return deletedDirSupplier.get(); } + public void setDirDeletingServiceInterval(String dirDeletingServiceInterval) { + this.dirDeletingServiceInterval = dirDeletingServiceInterval; + } + } From 9e283a7733afc367c8c8be299aac68748aca9ffa Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Fri, 16 May 2025 20:26:56 +0530 Subject: [PATCH 04/11] Basic CI fixes --- .../apache/hadoop/hdds/utils/BackgroundService.java | 2 +- .../hdds/conf/ReconfigurationChangeCallback.java | 4 ++-- .../hadoop/hdds/conf/ReconfigurationHandler.java | 4 ++-- .../hadoop/ozone/reconfig/TestOmReconfiguration.java | 11 +++++++++++ .../ozone/om/service/DirectoryDeletingService.java | 5 ++++- 5 files changed, 20 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java index 442d437e7cd0..ba3470ce79ed 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java @@ -109,7 +109,7 @@ public synchronized void start() { scheduledHandle = exec.scheduleWithFixedDelay(service, 0, interval, unit); } - protected void setInterval(long newInterval, TimeUnit newUnit) { + protected synchronized void setInterval(long newInterval, TimeUnit newUnit) { this.interval = newInterval; this.unit = newUnit; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java index ed5cd913643a..810df7870d1d 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationChangeCallback.java @@ -17,11 +17,11 @@ package org.apache.hadoop.hdds.conf; -import org.apache.hadoop.conf.Configuration; import java.util.Map; +import org.apache.hadoop.conf.Configuration; /** - * Define a Callback Interface + * Callback interface to handle configuration changes after a reconfiguration task completes. */ @FunctionalInterface public interface ReconfigurationChangeCallback { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java index 51b5dea22638..650bd8b9941a 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java @@ -57,8 +57,8 @@ public void registerCompleteCallback(ReconfigurationChangeCallback callback) { } public void setReconfigurationCompleteCallback(BiConsumer - reconfigurationStatusListener) { - this.reconfigurationStatusListener = reconfigurationStatusListener; + statusListener) { + this.reconfigurationStatusListener = statusListener; } private void triggerCompleteCallbacks(ReconfigurationTaskStatus status, Configuration newConf) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java index 05e7e2f0f3e3..f96d3c5b76ca 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_READONLY_ADMINISTRATORS; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT; @@ -51,6 +52,7 @@ void reconfigurableProperties() { .add(OZONE_KEY_DELETING_LIMIT_PER_TASK) .add(OZONE_OM_VOLUME_LISTALL_ALLOWED) .add(OZONE_READONLY_ADMINISTRATORS) + .add(OZONE_DIR_DELETING_SERVICE_INTERVAL) .addAll(new OmConfig().reconfigurableProperties()) .build(); @@ -121,4 +123,13 @@ void unsetAllowListAllVolumes(String newValue) throws ReconfigurationException { assertEquals(OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT, cluster().getOzoneManager().getAllowListAllVolumes()); } + @Test + void dirDeletingServiceInterval() throws ReconfigurationException { + //Initial string is 1m + getSubject().reconfigurePropertyImpl(OZONE_DIR_DELETING_SERVICE_INTERVAL, "2m"); + + assertEquals("2m", cluster().getOzoneManager(). + getKeyManager().getDirDeletingService().getDirDeletingServiceInterval()); + } + } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index ffc84f63cb54..e2789dccc15d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -86,7 +86,6 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService { private AtomicInteger taskCount = new AtomicInteger(0); private String dirDeletingServiceInterval; - public DirectoryDeletingService(long interval, TimeUnit unit, long serviceTimeout, OzoneManager ozoneManager, OzoneConfiguration configuration, int dirDeletingServiceCorePoolSize) { @@ -358,6 +357,10 @@ public KeyValue getPendingDeletedDirInfo() return deletedDirSupplier.get(); } + public String getDirDeletingServiceInterval() { + return dirDeletingServiceInterval; + } + public void setDirDeletingServiceInterval(String dirDeletingServiceInterval) { this.dirDeletingServiceInterval = dirDeletingServiceInterval; } From 80fad80dfc11a06eeac7239c001613a9d7214a3c Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Mon, 19 May 2025 12:23:12 +0530 Subject: [PATCH 05/11] Improved error handling --- .../hadoop/hdds/utils/BackgroundService.java | 4 ++++ .../hadoop/ozone/HddsDatanodeService.java | 2 +- .../hadoop/hdds/conf/ReconfigurableBase.java | 7 ++++--- .../scm/server/StorageContainerManager.java | 2 +- .../apache/hadoop/ozone/om/OzoneManager.java | 18 ++++++++++-------- .../om/service/DirectoryDeletingService.java | 4 ---- 6 files changed, 20 insertions(+), 17 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java index ba3470ce79ed..0883ecf4d0da 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java @@ -106,6 +106,10 @@ public void runPeriodicalTaskNow() throws Exception { // start service public synchronized void start() { + if (scheduledHandle != null && !scheduledHandle.isCancelled()) { + LOG.warn("Background service {} is already running", serviceName); + return; + } scheduledHandle = exec.scheduleWithFixedDelay(service, 0, interval, unit); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java index 8c3005ce1301..57f66d20305a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java @@ -289,7 +289,7 @@ public String getNamespace() { reconfigurationHandler.setReconfigurationCompleteCallback((status, newConf) -> { if (status.getStatus() != null && !status.getStatus().isEmpty()) { - LOG.info("Reconfiguration completed. Properties are updated."); + LOG.info("Reconfiguration completed with {} updated properties.", status.getStatus().size()); } else { LOG.info("Reconfiguration complete. No properties were changed."); } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java index 093d209bf572..ac1a46fa7a44 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java @@ -175,9 +175,10 @@ public void run() { } else { oldConf.unset(change.prop); } - } catch (ReconfigurationException var16) { - Throwable cause = var16.getCause(); - errorMessage = cause == null ? var16.getMessage() : cause.getMessage(); + } catch (ReconfigurationException reconfException) { + Throwable cause = reconfException.getCause(); + errorMessage = cause == null ? reconfException.getMessage() : cause.getMessage(); + LOG.error("Failed to reconfigure property {}: {}", change.prop, errorMessage, reconfException); } results.put(change, Optional.ofNullable(errorMessage)); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 05fbde044de9..d8cb37897728 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -405,7 +405,7 @@ private StorageContainerManager(OzoneConfiguration conf, reconfigurationHandler.setReconfigurationCompleteCallback((status, newConf) -> { if (status.getStatus() != null && !status.getStatus().isEmpty()) { - LOG.info("Reconfiguration completed. Properties are updated."); + LOG.info("Reconfiguration completed with {} updated properties.", status.getStatus().size()); } else { LOG.info("Reconfiguration complete. No properties were changed."); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 056ce40f03c3..6ba63f0027c5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -152,6 +152,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.management.ObjectName; import org.apache.commons.lang3.StringUtils; @@ -533,19 +534,15 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) reconfigurationHandler.setReconfigurationCompleteCallback((status, newConf) -> { if (status.getStatus() != null && !status.getStatus().isEmpty()) { - LOG.info("Reconfiguration completed. Properties are updated."); + LOG.info("Reconfiguration completed with {} updated properties.", status.getStatus().size()); } else { LOG.info("Reconfiguration complete. No properties were changed."); } }); reconfigurationHandler.registerCompleteCallback((changedKeys, newConf) -> { - Set relevantKeys = new HashSet<>(); - relevantKeys.add(OZONE_DIR_DELETING_SERVICE_INTERVAL); - - boolean shouldRestart = changedKeys.keySet().stream().anyMatch(relevantKeys::contains); - if (shouldRestart) { - initOrUpdateDirDeletingService((OzoneConfiguration) newConf); + if (changedKeys.containsKey(OZONE_DIR_DELETING_SERVICE_INTERVAL)) { + initOrUpdateDirDeletingService(getConfiguration()); } }); @@ -5123,9 +5120,14 @@ private String reconfigureAllowListAllVolumes(String newVal) { return String.valueOf(allowListAllVolumes); } + private static final Pattern TIME_DURATION_PATTERN = + Pattern.compile("^\\s*\\d+\\s*(ms|s|m|h|d)\\s*$", Pattern.CASE_INSENSITIVE); + private String reconfOzoneDirDeletingServiceInterval(String newVal) { + if (!TIME_DURATION_PATTERN.matcher(newVal).matches()) { + throw new IllegalArgumentException("Invalid time duration format: " + newVal); + } getConfiguration().set(OZONE_DIR_DELETING_SERVICE_INTERVAL, newVal); - getKeyManager().getDirDeletingService().setDirDeletingServiceInterval(newVal); return newVal; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index e2789dccc15d..6ab0fa7184ea 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -361,8 +361,4 @@ public String getDirDeletingServiceInterval() { return dirDeletingServiceInterval; } - public void setDirDeletingServiceInterval(String dirDeletingServiceInterval) { - this.dirDeletingServiceInterval = dirDeletingServiceInterval; - } - } From ba8e4635d1207060752383d2d30d3fb7b0883ec2 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Mon, 19 May 2025 12:39:54 +0530 Subject: [PATCH 06/11] Addressed pmd failure --- .../main/java/org/apache/hadoop/ozone/om/OzoneManager.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 6ba63f0027c5..94904d53f49b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -504,6 +504,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private ReferenceCounted rcOmMetadataReader; private OmSnapshotManager omSnapshotManager; private volatile DirectoryDeletingService dirDeletingService; + private static final Pattern TIME_DURATION_PATTERN = + Pattern.compile("^\\s*\\d+\\s*(ms|s|m|h|d)\\s*$", Pattern.CASE_INSENSITIVE); @SuppressWarnings("methodlength") private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) @@ -5120,9 +5122,6 @@ private String reconfigureAllowListAllVolumes(String newVal) { return String.valueOf(allowListAllVolumes); } - private static final Pattern TIME_DURATION_PATTERN = - Pattern.compile("^\\s*\\d+\\s*(ms|s|m|h|d)\\s*$", Pattern.CASE_INSENSITIVE); - private String reconfOzoneDirDeletingServiceInterval(String newVal) { if (!TIME_DURATION_PATTERN.matcher(newVal).matches()) { throw new IllegalArgumentException("Invalid time duration format: " + newVal); From 7461586986639bdf560342d3af113584e7fbcace Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Mon, 19 May 2025 13:11:59 +0530 Subject: [PATCH 07/11] Removed dirDeletingServiceInterval from constructor and its getter function --- .../hadoop/ozone/reconfig/TestOmReconfiguration.java | 9 --------- .../ozone/om/service/DirectoryDeletingService.java | 8 -------- 2 files changed, 17 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java index f96d3c5b76ca..55172f78f000 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestOmReconfiguration.java @@ -123,13 +123,4 @@ void unsetAllowListAllVolumes(String newValue) throws ReconfigurationException { assertEquals(OZONE_OM_VOLUME_LISTALL_ALLOWED_DEFAULT, cluster().getOzoneManager().getAllowListAllVolumes()); } - @Test - void dirDeletingServiceInterval() throws ReconfigurationException { - //Initial string is 1m - getSubject().reconfigurePropertyImpl(OZONE_DIR_DELETING_SERVICE_INTERVAL, "2m"); - - assertEquals("2m", cluster().getOzoneManager(). - getKeyManager().getDirDeletingService().getDirDeletingServiceInterval()); - } - } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index 6ab0fa7184ea..dde35e7fb799 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -84,7 +84,6 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService { private final DeletedDirSupplier deletedDirSupplier; private AtomicInteger taskCount = new AtomicInteger(0); - private String dirDeletingServiceInterval; public DirectoryDeletingService(long interval, TimeUnit unit, long serviceTimeout, OzoneManager ozoneManager, @@ -100,9 +99,6 @@ public DirectoryDeletingService(long interval, TimeUnit unit, this.suspended = new AtomicBoolean(false); this.isRunningOnAOS = new AtomicBoolean(false); this.dirDeletingCorePoolSize = dirDeletingServiceCorePoolSize; - this.dirDeletingServiceInterval = configuration.get( - OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL, - OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT); deletedDirSupplier = new DeletedDirSupplier(); taskCount.set(0); } @@ -357,8 +353,4 @@ public KeyValue getPendingDeletedDirInfo() return deletedDirSupplier.get(); } - public String getDirDeletingServiceInterval() { - return dirDeletingServiceInterval; - } - } From ad04d20bed6743712ced3f963fa27967ae94a292 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Fri, 30 May 2025 12:13:08 +0530 Subject: [PATCH 08/11] Use shutdown to interrupt service and validate using getTimeDuration() --- .../hadoop/hdds/utils/BackgroundService.java | 44 +++++++++---------- .../apache/hadoop/ozone/om/OzoneManager.java | 43 +----------------- .../om/service/DirectoryDeletingService.java | 13 ++++-- 3 files changed, 30 insertions(+), 70 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java index 0883ecf4d0da..450c2a3945fc 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java @@ -22,7 +22,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -42,13 +41,14 @@ public abstract class BackgroundService { LoggerFactory.getLogger(BackgroundService.class); // Executor to launch child tasks - private final ScheduledThreadPoolExecutor exec; - private volatile ScheduledFuture scheduledHandle; - private final ThreadGroup threadGroup; + private ScheduledThreadPoolExecutor exec; + private ThreadGroup threadGroup; private final String serviceName; private long interval; private final long serviceTimeoutInNanos; private TimeUnit unit; + private final int threadPoolSize; + private final String threadNamePrefix; private final PeriodicalTask service; public BackgroundService(String serviceName, long interval, @@ -64,14 +64,9 @@ public BackgroundService(String serviceName, long interval, this.serviceName = serviceName; this.serviceTimeoutInNanos = TimeDuration.valueOf(serviceTimeout, unit) .toLong(TimeUnit.NANOSECONDS); - threadGroup = new ThreadGroup(serviceName); - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setThreadFactory(r -> new Thread(threadGroup, r)) - .setDaemon(true) - .setNameFormat(threadNamePrefix + serviceName + "#%d") - .build(); - exec = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool( - threadPoolSize, threadFactory); + this.threadPoolSize = threadPoolSize; + this.threadNamePrefix = threadNamePrefix; + initExecutorAndThreadGroup(); service = new PeriodicalTask(); } @@ -106,11 +101,10 @@ public void runPeriodicalTaskNow() throws Exception { // start service public synchronized void start() { - if (scheduledHandle != null && !scheduledHandle.isCancelled()) { - LOG.warn("Background service {} is already running", serviceName); - return; + if (exec == null || exec.isShutdown() || exec.isTerminated()) { + initExecutorAndThreadGroup(); } - scheduledHandle = exec.scheduleWithFixedDelay(service, 0, interval, unit); + exec.scheduleWithFixedDelay(service, 0, interval, unit); } protected synchronized void setInterval(long newInterval, TimeUnit newUnit) { @@ -118,14 +112,6 @@ protected synchronized void setInterval(long newInterval, TimeUnit newUnit) { this.unit = newUnit; } - public synchronized void stop() { - LOG.info("Stopping {}", serviceName); - if (scheduledHandle != null) { - scheduledHandle.cancel(false); // don't interrupt running tasks - scheduledHandle = null; - } - } - public abstract BackgroundTaskQueue getTasks(); /** @@ -191,4 +177,14 @@ public void shutdown() { threadGroup.destroy(); } } + + private void initExecutorAndThreadGroup() { + threadGroup = new ThreadGroup(serviceName); + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setThreadFactory(r -> new Thread(threadGroup, r)) + .setDaemon(true) + .setNameFormat(threadNamePrefix + serviceName + "#%d") + .build(); + exec = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(threadPoolSize, threadFactory); + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 94904d53f49b..e7b31e73ad8f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -32,8 +32,6 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FLEXIBLE_FQDN_RESOLUTION_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FLEXIBLE_FQDN_RESOLUTION_ENABLED_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX; @@ -84,8 +82,6 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DETECTED_LOOP_IN_BUCKET_LINKS; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FEATURE_NOT_ENABLED; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INTERNAL_ERROR; @@ -152,7 +148,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.management.ObjectName; import org.apache.commons.lang3.StringUtils; @@ -504,8 +499,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private ReferenceCounted rcOmMetadataReader; private OmSnapshotManager omSnapshotManager; private volatile DirectoryDeletingService dirDeletingService; - private static final Pattern TIME_DURATION_PATTERN = - Pattern.compile("^\\s*\\d+\\s*(ms|s|m|h|d)\\s*$", Pattern.CASE_INSENSITIVE); @SuppressWarnings("methodlength") private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) @@ -544,7 +537,7 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) reconfigurationHandler.registerCompleteCallback((changedKeys, newConf) -> { if (changedKeys.containsKey(OZONE_DIR_DELETING_SERVICE_INTERVAL)) { - initOrUpdateDirDeletingService(getConfiguration()); + getKeyManager().getDirDeletingService().updateAndRestart(getConfiguration()); } }); @@ -5123,9 +5116,6 @@ private String reconfigureAllowListAllVolumes(String newVal) { } private String reconfOzoneDirDeletingServiceInterval(String newVal) { - if (!TIME_DURATION_PATTERN.matcher(newVal).matches()) { - throw new IllegalArgumentException("Invalid time duration format: " + newVal); - } getConfiguration().set(OZONE_DIR_DELETING_SERVICE_INTERVAL, newVal); return newVal; } @@ -5140,37 +5130,6 @@ public void validateReplicationConfig(ReplicationConfig replicationConfig) } } - private void initOrUpdateDirDeletingService(OzoneConfiguration conf) { - long newInterval = conf.getTimeDuration( - OZONE_DIR_DELETING_SERVICE_INTERVAL, 60, TimeUnit.SECONDS); - - if (dirDeletingService == null) { - long serviceTimeout = conf.getTimeDuration( - OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, - OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - - int dirDeletingServiceCorePoolSize = conf.getInt( - OZONE_THREAD_NUMBER_DIR_DELETION, - OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT); - if (dirDeletingServiceCorePoolSize <= 0) { - dirDeletingServiceCorePoolSize = 1; - } - - LOG.info("Initializing DirectoryDeletingService with interval: {}s", newInterval); - - dirDeletingService = new DirectoryDeletingService( - newInterval, TimeUnit.SECONDS, - serviceTimeout, - this, - conf, - dirDeletingServiceCorePoolSize); - } else { - LOG.info("Restarting DirectoryDeletingService with new interval: {}s", newInterval); - dirDeletingService.updateAndRestart(newInterval, TimeUnit.SECONDS); - } - } - @VisibleForTesting public ReplicationConfigValidator getReplicationConfigValidator() { return replicationConfigValidator; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index dde35e7fb799..9a8987fe9445 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -17,6 +17,9 @@ package org.apache.hadoop.ozone.om.service; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT; + import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; @@ -103,10 +106,12 @@ public DirectoryDeletingService(long interval, TimeUnit unit, taskCount.set(0); } - public synchronized void updateAndRestart(long newInterval, TimeUnit newUnit) { - LOG.info("Updating and restarting DirectoryDeletingService with interval: {} {}", newInterval, newUnit); - stop(); - setInterval(newInterval, newUnit); + public synchronized void updateAndRestart(OzoneConfiguration conf) { + long newInterval = conf.getTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL, + OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT, TimeUnit.SECONDS); + LOG.info("Updating and restarting DirectoryDeletingService with interval: {}", newInterval); + shutdown(); + setInterval(newInterval, TimeUnit.SECONDS); start(); } From 329f92003d30e86f1271f7a8f47e0e02c8dffb53 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Fri, 30 May 2025 20:10:37 +0530 Subject: [PATCH 09/11] Added test to verify interval set --- .../hadoop/ozone/shell/TestReconfigShell.java | 35 +++++++++++++++++++ .../src/test/resources/ozone-site.xml | 12 +++++++ .../om/service/DirectoryDeletingService.java | 6 ++++ 3 files changed, 53 insertions(+) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java index d133084c09b2..afd1574d464a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java @@ -18,12 +18,15 @@ package org.apache.hadoop.ozone.shell; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL; import static org.assertj.core.api.Assertions.assertThat; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; import java.util.List; +import org.apache.hadoop.conf.ReconfigurationException; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.ReconfigurableBase; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -33,6 +36,7 @@ import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.admin.OzoneAdmin; import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.service.DirectoryDeletingService; import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.NonHATests; import org.junit.jupiter.api.AfterEach; @@ -49,6 +53,8 @@ public abstract class TestReconfigShell implements NonHATests.TestCase { private OzoneAdmin ozoneAdmin; + private OzoneConfiguration conf; + private DirectoryDeletingService directoryDeletingService; private GenericTestUtils.PrintStreamCapturer out; private GenericTestUtils.PrintStreamCapturer err; @@ -57,6 +63,8 @@ void capture() { out = GenericTestUtils.captureOut(); err = GenericTestUtils.captureErr(); ozoneAdmin = new OzoneAdmin(); + conf = new OzoneConfiguration(); + directoryDeletingService = cluster().getOzoneManager().getKeyManager().getDirDeletingService(); } @AfterEach @@ -79,6 +87,21 @@ void testOzoneManagerGetReconfigurationProperties() { executeAndAssertProperties(om.getReconfigurationHandler(), "OM", socket); } + @Test + void testDirectoryDeletingServiceIntervalReconfiguration() throws ReconfigurationException { + OzoneManager om = cluster().getOzoneManager(); + InetSocketAddress socket = om.getOmRpcServerAddr(); + + cluster().getOzoneManager().getReconfigurationHandler() + .reconfigurePropertyImpl(OZONE_DIR_DELETING_SERVICE_INTERVAL, "1m"); + + executeAndAssertStart("OM", socket); + //If config value is set in ozone-site.xml then it is picked up during reconfiguration + assertThat(directoryDeletingService.getDirectoryDeletingServiceInterval(conf)).isEqualTo(120L); //2m + executeStatus("OM", socket); + assertThat(out.get()).contains("SUCCESS: Changed property ozone.directory.deleting.service.interval"); + } + @Test void testStorageContainerManagerGetReconfigurationProperties() { StorageContainerManager scm = cluster().getStorageContainerManager(); @@ -132,4 +155,16 @@ private void executeForInServiceDatanodes(int expectedCount) { private String getAddress(InetSocketAddress socket) { return socket.getHostString() + ":" + socket.getPort(); } + + private void executeAndAssertStart(String service, InetSocketAddress socket) { + String address = socket.getHostString() + ":" + socket.getPort(); + ozoneAdmin.getCmd().execute("reconfig", "--service", service, "--address", address, "start"); + assertThat(out.get()).contains(service + ": Started reconfiguration task on node"); + } + + private void executeStatus(String service, InetSocketAddress socket) { + String address = socket.getHostString() + ":" + socket.getPort(); + ozoneAdmin.getCmd().execute("reconfig", "--service", service, "--address", address, "status"); + } + } diff --git a/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml b/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml index 5ea2eb89dfa3..2b07b1d060dc 100644 --- a/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml +++ b/hadoop-ozone/integration-test/src/test/resources/ozone-site.xml @@ -127,5 +127,17 @@ ozone.client.datastream.window.size 8MB + + ozone.readonly.administrators + admin + + + ozone.administrators + admin + + + ozone.directory.deleting.service.interval + 2m + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index 9a8987fe9445..c9a8c0ef5bc2 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -115,6 +115,12 @@ public synchronized void updateAndRestart(OzoneConfiguration conf) { start(); } + @VisibleForTesting + public long getDirectoryDeletingServiceInterval(OzoneConfiguration conf) { + return conf.getTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL, + OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT, TimeUnit.SECONDS); + } + private boolean shouldRun() { if (getOzoneManager() == null) { // OzoneManager can be null for testing From 70fa4e5ce1b7419c06ca737aa4a16f15a5ffe088 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Mon, 2 Jun 2025 18:09:59 +0530 Subject: [PATCH 10/11] Improved the test --- .../hadoop/hdds/conf/ReconfigurableBase.java | 1 - .../hadoop/ozone/shell/TestReconfigShell.java | 36 ++++++++++++++----- .../om/service/DirectoryDeletingService.java | 9 ++--- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java index ac1a46fa7a44..977492fcdf4b 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java @@ -122,7 +122,6 @@ public final void reconfigureProperty(String property, String newVal) throws Rec } else { this.getConf().unset(property); } - } } else { throw new ReconfigurationException(property, newVal, this.getConf().get(property)); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java index 1f8f4fb2c55d..289044f89db0 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestReconfigShell.java @@ -25,9 +25,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.ReconfigurationException; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.ReconfigurableBase; +import org.apache.hadoop.hdds.conf.ReconfigurationHandler; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.node.NodeManager; @@ -38,6 +40,7 @@ import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.service.DirectoryDeletingService; import org.apache.ozone.test.GenericTestUtils; +import org.apache.ozone.test.GenericTestUtils.LogCapturer; import org.apache.ozone.test.NonHATests; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -52,7 +55,7 @@ public abstract class TestReconfigShell implements NonHATests.TestCase { private OzoneAdmin ozoneAdmin; private OzoneConfiguration conf; - private DirectoryDeletingService directoryDeletingService; + private ReconfigurationHandler reconfigurationHandler; private GenericTestUtils.PrintStreamCapturer out; private GenericTestUtils.PrintStreamCapturer err; @@ -62,7 +65,7 @@ void capture() { err = GenericTestUtils.captureErr(); ozoneAdmin = new OzoneAdmin(); conf = new OzoneConfiguration(); - directoryDeletingService = cluster().getOzoneManager().getKeyManager().getDirDeletingService(); + reconfigurationHandler = cluster().getOzoneManager().getReconfigurationHandler(); } @AfterEach @@ -89,15 +92,29 @@ void testOzoneManagerGetReconfigurationProperties() { void testDirectoryDeletingServiceIntervalReconfiguration() throws ReconfigurationException { OzoneManager om = cluster().getOzoneManager(); InetSocketAddress socket = om.getOmRpcServerAddr(); + LogCapturer logCapturer = LogCapturer.captureLogs(DirectoryDeletingService.class); - cluster().getOzoneManager().getReconfigurationHandler() - .reconfigurePropertyImpl(OZONE_DIR_DELETING_SERVICE_INTERVAL, "1m"); + String initialInterval = "1m"; + String intervalFromXML = "2m"; //config value set in ozone-site.xml + long intervalFromXMLInSeconds = TimeUnit.MINUTES.toSeconds(2); //120 seconds + reconfigurationHandler.reconfigurePropertyImpl(OZONE_DIR_DELETING_SERVICE_INTERVAL, initialInterval); + assertThat(reconfigurationHandler.getConf().get(OZONE_DIR_DELETING_SERVICE_INTERVAL)).isEqualTo(initialInterval); + + //Start the reconfiguration task executeAndAssertStart("OM", socket); //If config value is set in ozone-site.xml then it is picked up during reconfiguration - assertThat(directoryDeletingService.getDirectoryDeletingServiceInterval(conf)).isEqualTo(120L); //2m - executeStatus("OM", socket); - assertThat(out.get()).contains("SUCCESS: Changed property ozone.directory.deleting.service.interval"); + assertThat(conf.get(OZONE_DIR_DELETING_SERVICE_INTERVAL)).isEqualTo(intervalFromXML); + + executeAndAssertStatus("OM", socket); + assertThat(reconfigurationHandler.getConf().get(OZONE_DIR_DELETING_SERVICE_INTERVAL)).isEqualTo(intervalFromXML); + assertThat(out.get()).contains( + String.format("SUCCESS: Changed property %s", OZONE_DIR_DELETING_SERVICE_INTERVAL) + ); + assertThat(logCapturer.getOutput()).contains( + String.format("Updating and restarting DirectoryDeletingService with interval: %d %s", + intervalFromXMLInSeconds, TimeUnit.SECONDS.name().toLowerCase()) + ); } @Test @@ -157,12 +174,13 @@ private String getAddress(InetSocketAddress socket) { private void executeAndAssertStart(String service, InetSocketAddress socket) { String address = socket.getHostString() + ":" + socket.getPort(); ozoneAdmin.getCmd().execute("reconfig", "--service", service, "--address", address, "start"); - assertThat(out.get()).contains(service + ": Started reconfiguration task on node"); + assertThat(out.get()).contains(service + ": Started reconfiguration task on node [" + address + "]"); } - private void executeStatus(String service, InetSocketAddress socket) { + private void executeAndAssertStatus(String service, InetSocketAddress socket) { String address = socket.getHostString() + ":" + socket.getPort(); ozoneAdmin.getCmd().execute("reconfig", "--service", service, "--address", address, "status"); + assertThat(out.get()).contains(service + ": Reconfiguring status for node [" + address + "]: started"); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index a48cd7498899..0a51cc4ce20f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -109,18 +109,13 @@ public DirectoryDeletingService(long interval, TimeUnit unit, public synchronized void updateAndRestart(OzoneConfiguration conf) { long newInterval = conf.getTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL, OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT, TimeUnit.SECONDS); - LOG.info("Updating and restarting DirectoryDeletingService with interval: {}", newInterval); + LOG.info("Updating and restarting DirectoryDeletingService with interval: {} {}", + newInterval, TimeUnit.SECONDS.name().toLowerCase()); shutdown(); setInterval(newInterval, TimeUnit.SECONDS); start(); } - @VisibleForTesting - public long getDirectoryDeletingServiceInterval(OzoneConfiguration conf) { - return conf.getTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL, - OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT, TimeUnit.SECONDS); - } - private boolean shouldRun() { if (getOzoneManager() == null) { // OzoneManager can be null for testing From 0078bda42f97baf452a100ddd5201b226f0dddc6 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Thu, 5 Jun 2025 19:50:32 +0530 Subject: [PATCH 11/11] Move register call back to DDS --- .../hadoop/hdds/utils/BackgroundService.java | 2 ++ .../apache/hadoop/ozone/HddsDatanodeService.java | 8 +------- .../hadoop/hdds/conf/ReconfigurableBase.java | 2 -- .../hadoop/hdds/conf/ReconfigurationHandler.java | 14 ++++++++++++++ .../hdds/scm/server/StorageContainerManager.java | 8 +------- .../org/apache/hadoop/ozone/om/OzoneManager.java | 14 +------------- .../ozone/om/service/DirectoryDeletingService.java | 12 +++++++++++- 7 files changed, 30 insertions(+), 30 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java index 450c2a3945fc..a5df9a1776e7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java @@ -104,6 +104,8 @@ public synchronized void start() { if (exec == null || exec.isShutdown() || exec.isTerminated()) { initExecutorAndThreadGroup(); } + LOG.info("Starting service {} with interval {} {}", serviceName, + interval, unit.name().toLowerCase()); exec.scheduleWithFixedDelay(service, 0, interval, unit); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java index 57f66d20305a..31dab87935e6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java @@ -287,13 +287,7 @@ public String getNamespace() { .register(REPLICATION_STREAMS_LIMIT_KEY, this::reconfigReplicationStreamsLimit); - reconfigurationHandler.setReconfigurationCompleteCallback((status, newConf) -> { - if (status.getStatus() != null && !status.getStatus().isEmpty()) { - LOG.info("Reconfiguration completed with {} updated properties.", status.getStatus().size()); - } else { - LOG.info("Reconfiguration complete. No properties were changed."); - } - }); + reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback()); datanodeStateMachine = new DatanodeStateMachine(this, datanodeDetails, conf, dnCertClient, secretKeyClient, this::terminateDatanode, diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java index 977492fcdf4b..3d3f7e29662d 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurableBase.java @@ -189,8 +189,6 @@ public void run() { this.parent.status = Collections.unmodifiableMap(results); this.parent.reconfigThread = null; - LOG.info("Reconfiguration completed. {} properties were updated.", results.size()); - for (Consumer callback : parent.reconfigurationCompleteCallbacks) { try { callback.accept(parent.getReconfigurationTaskStatus()); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java index 650bd8b9941a..979525f7a1a7 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/ReconfigurationHandler.java @@ -36,6 +36,8 @@ import org.apache.hadoop.conf.ReconfigurationUtil; import org.apache.hadoop.hdds.protocol.ReconfigureProtocol; import org.apache.ratis.util.function.CheckedConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Keeps track of reconfigurable properties and the corresponding functions @@ -44,6 +46,7 @@ public class ReconfigurationHandler extends ReconfigurableBase implements ReconfigureProtocol { + private static final Logger LOG = LoggerFactory.getLogger(ReconfigurationHandler.class); private final String name; private final CheckedConsumer requireAdminPrivilege; private final Map> properties = @@ -61,6 +64,17 @@ public void setReconfigurationCompleteCallback(BiConsumer defaultLoggingCallback() { + return (status, conf) -> { + if (status.getStatus() != null && !status.getStatus().isEmpty()) { + LOG.info("Reconfiguration completed with {} updated properties.", + status.getStatus().size()); + } else { + LOG.info("Reconfiguration complete. No properties were changed."); + } + }; + } + private void triggerCompleteCallbacks(ReconfigurationTaskStatus status, Configuration newConf) { if (status.getStatus() != null && !status.getStatus().isEmpty()) { Map changedKeys = new HashMap<>(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 1481038f2fcd..36b29c6a0792 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -403,13 +403,7 @@ private StorageContainerManager(OzoneConfiguration conf, .register(OZONE_READONLY_ADMINISTRATORS, this::reconfOzoneReadOnlyAdmins); - reconfigurationHandler.setReconfigurationCompleteCallback((status, newConf) -> { - if (status.getStatus() != null && !status.getStatus().isEmpty()) { - LOG.info("Reconfiguration completed with {} updated properties.", status.getStatus().size()); - } else { - LOG.info("Reconfiguration complete. No properties were changed."); - } - }); + reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback()); initializeSystemManagers(conf, configurator); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index aee7bff7d354..ec2b9964f53d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -525,19 +525,7 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) this::reconfOzoneKeyDeletingLimitPerTask) .register(OZONE_DIR_DELETING_SERVICE_INTERVAL, this::reconfOzoneDirDeletingServiceInterval); - reconfigurationHandler.setReconfigurationCompleteCallback((status, newConf) -> { - if (status.getStatus() != null && !status.getStatus().isEmpty()) { - LOG.info("Reconfiguration completed with {} updated properties.", status.getStatus().size()); - } else { - LOG.info("Reconfiguration complete. No properties were changed."); - } - }); - - reconfigurationHandler.registerCompleteCallback((changedKeys, newConf) -> { - if (changedKeys.containsKey(OZONE_DIR_DELETING_SERVICE_INTERVAL)) { - getKeyManager().getDirDeletingService().updateAndRestart(getConfiguration()); - } - }); + reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback()); versionManager = new OMLayoutVersionManager(omStorage.getLayoutVersion()); upgradeFinalizer = new OMUpgradeFinalizer(versionManager); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index 0a51cc4ce20f..ad90490101c4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.ReconfigurationHandler; import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; @@ -103,10 +104,19 @@ public DirectoryDeletingService(long interval, TimeUnit unit, this.isRunningOnAOS = new AtomicBoolean(false); this.dirDeletingCorePoolSize = dirDeletingServiceCorePoolSize; deletedDirSupplier = new DeletedDirSupplier(); + registerReconfigCallbacks(ozoneManager.getReconfigurationHandler(), configuration); taskCount.set(0); } - public synchronized void updateAndRestart(OzoneConfiguration conf) { + public void registerReconfigCallbacks(ReconfigurationHandler handler, OzoneConfiguration conf) { + handler.registerCompleteCallback((changedKeys, newConf) -> { + if (changedKeys.containsKey(OZONE_DIR_DELETING_SERVICE_INTERVAL)) { + updateAndRestart(conf); + } + }); + } + + private synchronized void updateAndRestart(OzoneConfiguration conf) { long newInterval = conf.getTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL, OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT, TimeUnit.SECONDS); LOG.info("Updating and restarting DirectoryDeletingService with interval: {} {}",