diff --git a/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationFsck.java b/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationFsck.java index 282d7abb55..68b3ba2645 100644 --- a/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationFsck.java +++ b/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationFsck.java @@ -41,11 +41,10 @@ public void close() { int fsck(List tables, boolean fix) throws IOException { try (HBaseFsck hbaseFsck = new HBaseFsck(this.configuration)) { + hbaseFsck.connect(); hbaseFsck.setFixReplication(fix); hbaseFsck.checkAndFixReplication(); if (tables != null && !tables.isEmpty()) { - // Below needs connection to be up; uses admin. - hbaseFsck.connect(); hbaseFsck.setCleanReplicationBarrier(fix); for (String table : tables) { hbaseFsck.setCleanReplicationBarrierTable(table); diff --git a/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationStorageFactoryHelper.java b/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationStorageFactoryHelper.java new file mode 100644 index 0000000000..e54a2d0e67 --- /dev/null +++ b/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationStorageFactoryHelper.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hbase; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class for supporting different versions of HBase for creating + * {@link ReplicationQueueStorage} and {@link ReplicationPeerStorage}. + */ +public final class ReplicationStorageFactoryHelper { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationStorageFactoryHelper.class); + + private ReplicationStorageFactoryHelper() { + } + + public static ReplicationPeerStorage getReplicationPeerStorage(Configuration conf, ZKWatcher zkw, + FileSystem fs) { + // Case HBase >= 2.6.0: Invoke the method that requires three parameters + try { + Method method = ReplicationStorageFactory.class.getMethod("getReplicationPeerStorage", + FileSystem.class, ZKWatcher.class, Configuration.class); + return (ReplicationPeerStorage) method.invoke(null, fs, zkw, conf); + } catch (NoSuchMethodException e) { + LOG.debug("No getReplicationPeerStorage method with FileSystem as a parameter, " + + "should be HBase 2.6-", e); + } catch (IllegalAccessException | InvocationTargetException e) { + // getReplicationPeerStorage method does not throw any exceptions, so should not arrive here + throw new RuntimeException(e); + } + // Case HBase < 2.6.0: Fall back to the method that requires only two parameters + try { + Method method = ReplicationStorageFactory.class.getMethod("getReplicationPeerStorage", + ZKWatcher.class, Configuration.class); + return (ReplicationPeerStorage) method.invoke(null, zkw, conf); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + public static ReplicationQueueStorage getReplicationQueueStorage(Configuration conf, + ZKWatcher zkw, Connection conn) { + try { + Method method = ReplicationStorageFactory.class.getMethod("getReplicationQueueStorage", + Connection.class, Configuration.class); + return (ReplicationQueueStorage) method.invoke(null, conn, conf); + } catch (NoSuchMethodException e) { + LOG.debug("No getReplicationQueueStorage method with Connection as a parameter, " + + "should be HBase 2.x", e); + } catch (IllegalAccessException | InvocationTargetException e) { + // getReplicationQueueStorage method does not throw any exceptions, so should not arrive here + throw new RuntimeException(e); + } + try { + Method method = ReplicationStorageFactory.class.getMethod("getReplicationQueueStorage", + ZKWatcher.class, Configuration.class); + return (ReplicationQueueStorage) method.invoke(null, zkw, conf); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + // getReplicationQueueStorage method does not throw any exceptions, so should not arrive here + throw new RuntimeException(e); + } + } +} diff --git a/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/HBaseFsck.java b/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/HBaseFsck.java index 2448dd4094..601e913636 100644 --- a/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/HBaseFsck.java +++ b/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/HBaseFsck.java @@ -126,7 +126,6 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; @@ -155,6 +154,7 @@ import org.apache.hadoop.util.Tool; import org.apache.hbase.HBCKFsUtils; import org.apache.hbase.HBCKMetaTableAccessor; +import org.apache.hbase.ReplicationStorageFactoryHelper; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.apache.zookeeper.KeeperException; @@ -1050,9 +1050,9 @@ private void adoptHdfsOrphan(HbckInfo hi) throws IOException { HFile.Reader hf = null; try { hf = HFile.createReader(fs, hfile.getPath(), CacheConfig.DISABLED, true, getConf()); - Optional startKv = hf.getFirstKey(); + Optional startKv = hf.getFirstKey(); start = CellUtil.cloneRow(startKv.get()); - Optional endKv = hf.getLastKey(); + Optional endKv = hf.getLastKey(); end = CellUtil.cloneRow(endKv.get()); } catch (IOException ioe) { LOG.warn("Problem reading orphan file " + hfile + ", skipping"); @@ -3841,7 +3841,7 @@ private synchronized HbckInfo getOrCreateInfo(String name) { } public void checkAndFixReplication() throws ReplicationException { - ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, errors); + ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, rootFs, connection, errors); checker.checkUnDeletedQueues(); if (checker.hasUnDeletedQueues() && this.fixReplication) { @@ -5475,7 +5475,7 @@ public void cleanReplicationBarrier() throws IOException { return; } ReplicationQueueStorage queueStorage = - ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); + ReplicationStorageFactoryHelper.getReplicationQueueStorage(getConf(), zkw, connection); List peerDescriptions = admin.listReplicationPeers(); if (peerDescriptions != null && peerDescriptions.size() > 0) { List peers = peerDescriptions.stream() diff --git a/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/ReplicationChecker.java b/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/ReplicationChecker.java index 14359f99cf..5f1a2a9d68 100644 --- a/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/ReplicationChecker.java +++ b/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/ReplicationChecker.java @@ -17,10 +17,10 @@ */ package org.apache.hbase.hbck1; -import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -29,16 +29,19 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hbase.ReplicationStorageFactoryHelper; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; + /** * Check and fix undeleted replication queues for removed peerId. Copied over wholesale from hbase. * Unaltered except for package and imports. @@ -49,68 +52,199 @@ public class ReplicationChecker { private static final Logger LOG = LoggerFactory.getLogger(ReplicationChecker.class); private final HBaseFsck.ErrorReporter errorReporter; - // replicator with its queueIds for removed peers - private Map> undeletedQueueIds = new HashMap<>(); + + private final UnDeletedQueueChecker unDeletedQueueChecker; + // replicator with its undeleted queueIds for removed peers in hfile-refs queue - private Set undeletedHFileRefsPeerIds = new HashSet<>(); + private Set undeletedHFileRefsPeerIds = Collections.emptySet(); private final ReplicationPeerStorage peerStorage; private final ReplicationQueueStorage queueStorage; - public ReplicationChecker(Configuration conf, ZKWatcher zkw, + private UnDeletedQueueChecker initUnDeletedQueueChecker() { + try { + ReplicationQueueStorage.class.getMethod("listAllPeerIds"); + return new UnDeletedQueueChecker3(); + } catch (NoSuchMethodException e) { + LOG.debug("No listAllPeerIds method, should be hbase 2", e); + return new UnDeletedQueueChecker2(); + } + } + + public ReplicationChecker(Configuration conf, ZKWatcher zkw, FileSystem fs, Connection conn, HBaseFsck.ErrorReporter errorReporter) { - this.peerStorage = getReplicationPeerStorage(conf, zkw); - this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); + this.peerStorage = ReplicationStorageFactoryHelper.getReplicationPeerStorage(conf, zkw, fs); + this.queueStorage = ReplicationStorageFactoryHelper.getReplicationQueueStorage(conf, zkw, conn); this.errorReporter = errorReporter; + this.unDeletedQueueChecker = initUnDeletedQueueChecker(); } - private ReplicationPeerStorage getReplicationPeerStorage(Configuration conf, ZKWatcher zkw) - throws AssertionError { - ReplicationPeerStorage peerStorage; - try { - // Case HBase >= 2.6.0: Invoke the method that requires three parameters - Method method = ReplicationStorageFactory.class.getMethod("getReplicationPeerStorage", - FileSystem.class, ZKWatcher.class, Configuration.class); - FileSystem fileSystem = FileSystem.get(conf); - peerStorage = (ReplicationPeerStorage) method.invoke(null, fileSystem, zkw, conf); - } catch (IOException | NoSuchMethodException | IllegalAccessException - | InvocationTargetException e1) { - // Case HBase < 2.6.0: Fall back to the method that requires only two parameters + public boolean hasUnDeletedQueues() { + return errorReporter.getErrorList() + .contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE); + } + + private interface UnDeletedQueueChecker { + + void check() throws ReplicationException; + + void fix() throws ReplicationException; + } + + private final class UnDeletedQueueChecker2 implements UnDeletedQueueChecker { + + private final Method getListOfReplicators; + + private final Method getAllQueues; + + private final Method removeQueue; + + private final Method removeReplicatorIfQueueIsEmpty; + + // replicator with its queueIds for removed peers + private Map> undeletedQueueIds = Collections.emptyMap(); + + UnDeletedQueueChecker2() { try { - Method method = ReplicationStorageFactory.class.getMethod("getReplicationPeerStorage", - ZKWatcher.class, Configuration.class); - peerStorage = (ReplicationPeerStorage) method.invoke(null, zkw, conf); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e2) { - throw new AssertionError("should not happen", e2); + getListOfReplicators = ReplicationQueueStorage.class.getMethod("getListOfReplicators"); + getAllQueues = ReplicationQueueStorage.class.getMethod("getAllQueues", ServerName.class); + removeQueue = + ReplicationQueueStorage.class.getMethod("removeQueue", ServerName.class, String.class); + removeReplicatorIfQueueIsEmpty = ReplicationQueueStorage.class + .getMethod("removeReplicatorIfQueueIsEmpty", ServerName.class); + } catch (NoSuchMethodException e) { + throw new RuntimeException("method unavailable", e); } } - return peerStorage; - } - public boolean hasUnDeletedQueues() { - return errorReporter.getErrorList() - .contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE); + private Map> getUnDeletedQueues() throws ReplicationException { + Map> undeletedQueues = new HashMap<>(); + Set peerIds = new HashSet<>(peerStorage.listPeerIds()); + try { + for (ServerName replicator : (List) getListOfReplicators.invoke(queueStorage)) { + for (String queueId : (List) getAllQueues.invoke(queueStorage, replicator)) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + if (!peerIds.contains(queueInfo.getPeerId())) { + undeletedQueues.computeIfAbsent(replicator, key -> new ArrayList<>()).add(queueId); + LOG.debug( + "Undeleted replication queue for removed peer found: " + + "[removedPeerId={}, replicator={}, queueId={}]", + queueInfo.getPeerId(), replicator, queueId); + } + } + } + } catch (InvocationTargetException e) { + Throwables.throwIfInstanceOf(e.getCause(), ReplicationException.class); + Throwables.throwIfUnchecked(e.getCause()); + throw new RuntimeException(e.getCause()); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + return undeletedQueues; + } + + @Override + public void check() throws ReplicationException { + undeletedQueueIds = getUnDeletedQueues(); + undeletedQueueIds.forEach((replicator, queueIds) -> { + queueIds.forEach(queueId -> { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + String msg = "Undeleted replication queue for removed peer found: " + + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(), + replicator, queueId); + errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, + msg); + }); + }); + } + + @Override + public void fix() throws ReplicationException { + try { + for (Map.Entry> replicatorAndQueueIds : undeletedQueueIds + .entrySet()) { + ServerName replicator = replicatorAndQueueIds.getKey(); + for (String queueId : replicatorAndQueueIds.getValue()) { + removeQueue.invoke(queueStorage, replicator, queueId); + } + removeReplicatorIfQueueIsEmpty.invoke(queueStorage, replicator); + } + } catch (InvocationTargetException e) { + Throwables.throwIfInstanceOf(e.getCause(), ReplicationException.class); + Throwables.throwIfUnchecked(e.getCause()); + throw new RuntimeException(e.getCause()); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } } - private Map> getUnDeletedQueues() throws ReplicationException { - Map> undeletedQueues = new HashMap<>(); - Set peerIds = new HashSet<>(peerStorage.listPeerIds()); - for (ServerName replicator : queueStorage.getListOfReplicators()) { - for (String queueId : queueStorage.getAllQueues(replicator)) { - ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); - if (!peerIds.contains(queueInfo.getPeerId())) { - undeletedQueues.computeIfAbsent(replicator, key -> new ArrayList<>()).add(queueId); - LOG.debug( - "Undeleted replication queue for removed peer found: " - + "[removedPeerId={}, replicator={}, queueId={}]", - queueInfo.getPeerId(), replicator, queueId); + private final class UnDeletedQueueChecker3 implements UnDeletedQueueChecker { + + private final Method listAllPeerIds; + + private final Method removeAllQueues; + + private List unDeletedPeerIds = Collections.emptyList(); + + UnDeletedQueueChecker3() { + try { + listAllPeerIds = ReplicationQueueStorage.class.getMethod("listAllPeerIds"); + removeAllQueues = ReplicationQueueStorage.class.getMethod("removeAllQueues", String.class); + } catch (NoSuchMethodException e) { + throw new RuntimeException("method unavailable", e); + } + } + + private List getUnDeletedPeerIds() throws ReplicationException { + List unDeletedPeerIds = new ArrayList<>(); + try { + Set peerIds = new HashSet<>(peerStorage.listPeerIds()); + for (String peerId : (List) listAllPeerIds.invoke(queueStorage)) { + if (!peerIds.contains(peerId)) { + unDeletedPeerIds.add(peerId); + } } + } catch (InvocationTargetException e) { + Throwables.throwIfInstanceOf(e.getCause(), ReplicationException.class); + Throwables.throwIfUnchecked(e.getCause()); + throw new RuntimeException(e.getCause()); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); } + return unDeletedPeerIds; } - return undeletedQueues; + + @Override + public void check() throws ReplicationException { + unDeletedPeerIds = getUnDeletedPeerIds(); + unDeletedPeerIds.forEach(peerId -> { + String msg = "Undeleted replication queue for removed peer found: " + + String.format("[removedPeerId=%s]", peerId); + errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, + msg); + }); + } + + @Override + public void fix() throws ReplicationException { + try { + for (String peerId : unDeletedPeerIds) { + removeAllQueues.invoke(queueStorage, peerId); + } + } catch (InvocationTargetException e) { + Throwables.throwIfInstanceOf(e.getCause(), ReplicationException.class); + Throwables.throwIfUnchecked(e.getCause()); + throw new RuntimeException(e.getCause()); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + } private Set getUndeletedHFileRefsPeers() throws ReplicationException { + Set undeletedHFileRefsPeerIds = new HashSet<>(queueStorage.getAllPeersFromHFileRefsQueue()); Set peerIds = new HashSet<>(peerStorage.listPeerIds()); @@ -123,18 +257,30 @@ private Set getUndeletedHFileRefsPeers() throws ReplicationException { return undeletedHFileRefsPeerIds; } + private boolean hasData() throws ReplicationException { + Method hasDataMethod; + try { + hasDataMethod = ReplicationQueueStorage.class.getMethod("hasData"); + } catch (NoSuchMethodException e) { + LOG.debug("No hasData method, should be hbase 2", e); + return true; + } + try { + return (boolean) hasDataMethod.invoke(queueStorage); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + Throwables.throwIfInstanceOf(e.getCause(), ReplicationException.class); + Throwables.throwIfUnchecked(e.getCause()); + throw new RuntimeException(e.getCause()); + } + } + public void checkUnDeletedQueues() throws ReplicationException { - undeletedQueueIds = getUnDeletedQueues(); - undeletedQueueIds.forEach((replicator, queueIds) -> { - queueIds.forEach(queueId -> { - ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); - String msg = "Undeleted replication queue for removed peer found: " - + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(), - replicator, queueId); - errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, - msg); - }); - }); + if (!hasData()) { + return; + } + unDeletedQueueChecker.check(); undeletedHFileRefsPeerIds = getUndeletedHFileRefsPeers(); undeletedHFileRefsPeerIds.stream() .map(peerId -> "Undeleted replication hfile-refs queue for removed peer " + peerId + " found") @@ -143,13 +289,7 @@ public void checkUnDeletedQueues() throws ReplicationException { } public void fixUnDeletedQueues() throws ReplicationException { - for (Map.Entry> replicatorAndQueueIds : undeletedQueueIds.entrySet()) { - ServerName replicator = replicatorAndQueueIds.getKey(); - for (String queueId : replicatorAndQueueIds.getValue()) { - queueStorage.removeQueue(replicator, queueId); - } - queueStorage.removeReplicatorIfQueueIsEmpty(replicator); - } + unDeletedQueueChecker.fix(); for (String peerId : undeletedHFileRefsPeerIds) { queueStorage.removePeerFromHFileRefs(peerId); }