diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
index 6bf1edc7bf03..263191df8049 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.backup.master;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -32,7 +33,8 @@
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupBoundaries;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.master.HMaster;
@@ -41,7 +43,6 @@
 import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
 import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +57,8 @@
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class BackupLogCleaner extends BaseLogCleanerDelegate {
   private static final Logger LOG = LoggerFactory.getLogger(BackupLogCleaner.class);
+  private static final long TS_BUFFER_DEFAULT = Duration.ofHours(1).toMillis();
+  static final String TS_BUFFER_KEY = "hbase.backup.log.cleaner.timestamp.buffer.ms";
 
   private boolean stopped = false;
   private Connection conn;
@@ -86,8 +89,9 @@ public void init(Map<String, Object> params) {
    * I.e. WALs with a lower (= older) or equal timestamp are no longer needed for future incremental
    * backups.
    */
-  private Map<Address, Long> serverToPreservationBoundaryTs(List<BackupInfo> backups)
+  private BackupBoundaries serverToPreservationBoundaryTs(BackupSystemTable sysTable)
     throws IOException {
+    List<BackupInfo> backups = sysTable.getBackupHistory(true);
     if (LOG.isDebugEnabled()) {
       LOG.debug(
         "Cleaning WALs if they are older than the WAL cleanup time-boundary. "
@@ -112,27 +116,25 @@ private Map<Address, Long> serverToPreservationBoundaryTs(List<BackupInfo> backu
           .collect(Collectors.joining(", ")));
     }
 
-    // This map tracks, for every RegionServer, the least recent (= oldest / lowest timestamp)
-    // inclusion in any backup. In other words, it is the timestamp boundary up to which all backup
-    // roots have included the WAL in their backup.
-    Map<Address, Long> boundaries = new HashMap<>();
+    BackupBoundaries.BackupBoundariesBuilder builder =
+      BackupBoundaries.builder(getConf().getLong(TS_BUFFER_KEY, TS_BUFFER_DEFAULT));
     for (BackupInfo backupInfo : newestBackupPerRootDir.values()) {
+      long startCode = Long.parseLong(sysTable.readBackupStartCode(backupInfo.getBackupRootDir()));
       // Iterate over all tables in the timestamp map, which contains all tables covered in the
       // backup root, not just the tables included in that specific backup (which could be a subset)
       for (TableName table : backupInfo.getTableSetTimestampMap().keySet()) {
         for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
           .entrySet()) {
-          Address address = Address.fromString(entry.getKey());
-          Long storedTs = boundaries.get(address);
-          if (storedTs == null || entry.getValue() < storedTs) {
-            boundaries.put(address, entry.getValue());
-          }
+          builder.addBackupTimestamps(entry.getKey(), entry.getValue(), startCode);
         }
       }
     }
+    BackupBoundaries boundaries = builder.build();
+
     if (LOG.isDebugEnabled()) {
-      for (Map.Entry<Address, Long> entry : boundaries.entrySet()) {
+      LOG.debug("Boundaries oldestStartCode: {}", boundaries.getOldestStartCode());
+      for (Map.Entry<Address, Long> entry : boundaries.getBoundaries().entrySet()) {
         LOG.debug("Server: {}, WAL cleanup boundary: {}", entry.getKey().getHostName(),
           entry.getValue());
       }
@@ -153,11 +155,10 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
       return files;
     }
 
-    Map<Address, Long> serverToPreservationBoundaryTs;
+    BackupBoundaries boundaries;
     try {
-      try (BackupManager backupManager = new BackupManager(conn, getConf())) {
-        serverToPreservationBoundaryTs =
-          serverToPreservationBoundaryTs(backupManager.getBackupHistory(true));
+      try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
+        boundaries = serverToPreservationBoundaryTs(sysTable);
       }
     } catch (IOException ex) {
       LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
@@ -165,7 +166,7 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
       return Collections.emptyList();
     }
     for (FileStatus file : files) {
-      if (canDeleteFile(serverToPreservationBoundaryTs, file.getPath())) {
+      if (canDeleteFile(boundaries, file.getPath())) {
         filteredFiles.add(file);
       }
     }
@@ -200,54 +201,17 @@ public boolean isStopped() {
     return this.stopped;
   }
 
-  protected static boolean canDeleteFile(Map<Address, Long> addressToBoundaryTs, Path path) {
+  protected static boolean canDeleteFile(BackupBoundaries boundaries, Path path) {
     if (isHMasterWAL(path)) {
       return true;
     }
-
-    try {
-      String hostname = BackupUtils.parseHostNameFromLogFile(path);
-      if (hostname == null) {
-        LOG.warn(
-          "Cannot parse hostname from RegionServer WAL file: {}. Ignoring cleanup of this log",
-          path);
-        return false;
-      }
-      Address walServerAddress = Address.fromString(hostname);
-      long walTimestamp = WAL.getTimestamp(path.getName());
-
-      if (!addressToBoundaryTs.containsKey(walServerAddress)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("No cleanup WAL time-boundary found for server: {}. Ok to delete file: {}",
-            walServerAddress.getHostName(), path);
-        }
-        return true;
-      }
-
-      Long backupBoundary = addressToBoundaryTs.get(walServerAddress);
-      if (backupBoundary >= walTimestamp) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(
-            "WAL cleanup time-boundary found for server {}: {}. Ok to delete older file: {}",
-            walServerAddress.getHostName(), backupBoundary, path);
-        }
-        return true;
-      }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("WAL cleanup time-boundary found for server {}: {}. Keeping younger file: {}",
-          walServerAddress.getHostName(), backupBoundary, path);
-      }
-    } catch (Exception ex) {
-      LOG.warn("Error occurred while filtering file: {}. Ignoring cleanup of this log", path, ex);
-      return false;
-    }
-    return false;
+    return boundaries.isDeletable(path);
   }
 
   private static boolean isHMasterWAL(Path path) {
     String fn = path.getName();
     return fn.startsWith(WALProcedureStore.LOG_PREFIX)
-      || fn.endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX);
+      || fn.endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX)
+      || path.toString().contains("/" + MasterRegionFactory.MASTER_STORE_DIR + "/");
   }
 }
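Note on the behavioural change in BackupLogCleaner above: the removed canDeleteFile body treated a RegionServer with no recorded boundary as freely deletable, which is what allowed WALs of servers that joined after the last backup to be cleaned up prematurely. With BackupBoundaries, such WALs are only deletable when they were rolled at or before the oldest backup start code across all backup roots, minus the buffer configured via hbase.backup.log.cleaner.timestamp.buffer.ms (one hour by default). A minimal, self-contained sketch of that fallback rule, with all timestamps made up:

    // Sketch only: mirrors what BackupBoundaries#build() and #isDeletable() do for a server
    // that has no per-server boundary recorded. All values are hypothetical.
    public class BoundaryFallbackSketch {
      public static void main(String[] args) {
        long oldestStartCode = 1_700_000_000_000L;        // oldest start code across backup roots
        long bufferMs = 60L * 60L * 1000L;                // hbase.backup.log.cleaner.timestamp.buffer.ms
        long boundary = oldestStartCode - bufferMs;       // subtraction happens once, in build()

        long walRolledBeforeBackups = boundary - 1;       // predates every backup -> deletable
        long walRolledAfterBackups = oldestStartCode + 1; // rolled after backups began -> kept

        System.out.println(walRolledBeforeBackups <= boundary); // true
        System.out.println(walRolledAfterBackups <= boundary);  // false
      }
    }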
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupBoundaries.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupBoundaries.java
new file mode 100644
index 000000000000..b38c1bdb68d7
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupBoundaries.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks time boundaries for WAL file cleanup during backup operations. Maintains the oldest
+ * timestamp per RegionServer included in any backup, enabling safe determination of which WAL
+ * files can be deleted without compromising backup integrity.
+ */
+@InterfaceAudience.Private
+public class BackupBoundaries {
+  private static final Logger LOG = LoggerFactory.getLogger(BackupBoundaries.class);
+  private static final BackupBoundaries EMPTY_BOUNDARIES =
+    new BackupBoundaries(Collections.emptyMap(), Long.MAX_VALUE);
+
+  // This map tracks, for every RegionServer, the least recent (= oldest / lowest timestamp)
+  // inclusion in any backup. In other words, it is the timestamp boundary up to which all backup
+  // roots have included the WAL in their backup.
+  private final Map<Address, Long> boundaries;
+
+  // The minimum WAL roll timestamp from the most recent backup of each backup root, used as a
+  // fallback cleanup boundary for RegionServers without explicit backup boundaries (e.g., servers
+  // that joined after backups began)
+  private final long oldestStartCode;
+
+  private BackupBoundaries(Map<Address, Long> boundaries, long oldestStartCode) {
+    this.boundaries = boundaries;
+    this.oldestStartCode = oldestStartCode;
+  }
+
+  public boolean isDeletable(Path walLogPath) {
+    try {
+      String hostname = BackupUtils.parseHostNameFromLogFile(walLogPath);
+
+      if (hostname == null) {
+        LOG.warn(
+          "Cannot parse hostname from RegionServer WAL file: {}. Ignoring cleanup of this log",
+          walLogPath);
+        return false;
+      }
+
+      Address address = Address.fromString(hostname);
+      long pathTs = WAL.getTimestamp(walLogPath.getName());
+
+      if (!boundaries.containsKey(address)) {
+        boolean isDeletable = pathTs <= oldestStartCode;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "Boundary for {} not found. isDeletable = {} based on oldestStartCode = {} and WAL ts of {}",
+            walLogPath, isDeletable, oldestStartCode, pathTs);
+        }
+        return isDeletable;
+      }
+
+      long backupTs = boundaries.get(address);
+      if (pathTs <= backupTs) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "WAL cleanup time-boundary found for server {}: {}. Ok to delete older file: {}",
+            address.getHostName(), backupTs, walLogPath);
+        }
+        return true;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("WAL cleanup time-boundary found for server {}: {}. Keeping younger file: {}",
+          address.getHostName(), backupTs, walLogPath);
+      }
+
+      return false;
+    } catch (Exception e) {
+      LOG.warn("Error occurred while filtering file: {}. Ignoring cleanup of this log", walLogPath,
+        e);
+      return false;
+    }
+  }
+
+  public Map<Address, Long> getBoundaries() {
+    return boundaries;
+  }
+
+  public long getOldestStartCode() {
+    return oldestStartCode;
+  }
+
+  public static BackupBoundariesBuilder builder(long tsCleanupBuffer) {
+    return new BackupBoundariesBuilder(tsCleanupBuffer);
+  }
+
+  public static class BackupBoundariesBuilder {
+    private final Map<Address, Long> boundaries = new HashMap<>();
+    private final long tsCleanupBuffer;
+
+    private long oldestStartCode = Long.MAX_VALUE;
+
+    private BackupBoundariesBuilder(long tsCleanupBuffer) {
+      this.tsCleanupBuffer = tsCleanupBuffer;
+    }
+
+    public BackupBoundariesBuilder addBackupTimestamps(String host, long hostLogRollTs,
+      long backupStartCode) {
+      Address address = Address.fromString(host);
+      Long storedTs = boundaries.get(address);
+      if (storedTs == null || hostLogRollTs < storedTs) {
+        boundaries.put(address, hostLogRollTs);
+      }
+
+      if (oldestStartCode > backupStartCode) {
+        oldestStartCode = backupStartCode;
+      }
+
+      return this;
+    }
+
+    public BackupBoundaries build() {
+      if (boundaries.isEmpty()) {
+        return EMPTY_BOUNDARIES;
+      }
+
+      oldestStartCode -= tsCleanupBuffer;
+      return new BackupBoundaries(boundaries, oldestStartCode);
+    }
+  }
+}
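For orientation, a hedged usage sketch of the API introduced by this file. The WAL names follow the URL-encoded host%2Cport%2Cstartcode.timestamp convention exercised by the tests below; host names, ports and timestamps here are made up:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.backup.util.BackupBoundaries;

    public class BackupBoundariesSketch {
      public static void main(String[] args) {
        // rs1 was last covered by a backup whose log-roll timestamp and start code are both
        // 1_000_000; a zero buffer keeps the arithmetic exact for the example.
        BackupBoundaries boundaries = BackupBoundaries.builder(0L)
          .addBackupTimestamps("rs1.example.com:16020", 1_000_000L, 1_000_000L).build();

        // Known server, WAL rolled before its boundary: deletable.
        Path coveredWal = new Path("/hbase/oldWALs/rs1.example.com%2C16020%2C12345.900000");
        // Server never seen by any backup, WAL rolled after the oldest start code: kept.
        Path unknownServerWal = new Path("/hbase/oldWALs/rs2.example.com%2C16020%2C67890.1500000");

        System.out.println(boundaries.isDeletable(coveredWal));       // true
        System.out.println(boundaries.isDeletable(unknownServerWal)); // false
      }
    }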
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
index 0602327f9bd3..57e067148f30 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
@@ -21,9 +21,10 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -32,16 +33,23 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.TestBackupBase;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupBoundaries;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -60,6 +68,11 @@ public class TestBackupLogCleaner extends TestBackupBase {
   // implements all test cases in 1 test since incremental full backup/
   // incremental backup has dependencies
 
+  @BeforeClass
+  public static void before() {
+    TEST_UTIL.getConfiguration().setLong(BackupLogCleaner.TS_BUFFER_KEY, 0);
+  }
+
   @Test
   public void testBackupLogCleaner() throws Exception {
     Path backupRoot1 = new Path(BACKUP_ROOT_DIR, "root1");
@@ -74,9 +87,9 @@ public void testBackupLogCleaner() throws Exception {
       assertFalse(systemTable.hasBackupSessions());
 
       BackupLogCleaner cleaner = new BackupLogCleaner();
-      cleaner.setConf(TEST_UTIL.getConfiguration());
       Map<String, Object> params = new HashMap<>(1);
       params.put(HMaster.MASTER, TEST_UTIL.getHBaseCluster().getMaster());
+      cleaner.setConf(TEST_UTIL.getConfiguration());
      cleaner.init(params);
 
       // All WAL files can be deleted because we do not have backups
@@ -197,35 +210,145 @@ public void testBackupLogCleaner() throws Exception {
       // Taking the minimum timestamp (= 2), this means all WALs preceding B3 can be deleted.
       deletable = cleaner.getDeletableFiles(walFilesAfterB5);
       assertEquals(toSet(walFilesAfterB2), toSet(deletable));
+    } finally {
+      TEST_UTIL.truncateTable(BackupSystemTable.getTableName(TEST_UTIL.getConfiguration())).close();
     }
   }
 
-  private Set<FileStatus> mergeAsSet(Collection<FileStatus> toCopy, Collection<FileStatus> toAdd) {
-    Set<FileStatus> result = new LinkedHashSet<>(toCopy);
-    result.addAll(toAdd);
-    return result;
+  @Test
+  public void testDoesNotDeleteWALsFromNewServers() throws Exception {
+    Path backupRoot1 = new Path(BACKUP_ROOT_DIR, "backup1");
+    List<TableName> tableSetFull = Arrays.asList(table1, table2, table3, table4);
+
+    JVMClusterUtil.RegionServerThread rsThread = null;
+    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      LOG.info("Creating initial backup B1");
+      String backupIdB1 = backupTables(BackupType.FULL, tableSetFull, backupRoot1.toString());
+      assertTrue(checkSucceeded(backupIdB1));
+
+      List<FileStatus> walsAfterB1 = getListOfWALFiles(TEST_UTIL.getConfiguration());
+      LOG.info("WALs after B1: {}", walsAfterB1.size());
+
+      String startCodeStr = systemTable.readBackupStartCode(backupRoot1.toString());
+      long b1StartCode = Long.parseLong(startCodeStr);
+      LOG.info("B1 startCode: {}", b1StartCode);
+
+      // Add a new RegionServer to the cluster
+      LOG.info("Adding new RegionServer to cluster");
+      rsThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
+      ServerName newServerName = rsThread.getRegionServer().getServerName();
+      LOG.info("New RegionServer started: {}", newServerName);
+
+      // Move a region to the new server to ensure it creates a WAL
+      List<RegionInfo> regions = TEST_UTIL.getAdmin().getRegions(table1);
+      RegionInfo regionToMove = regions.get(0);
+
+      LOG.info("Moving region {} to new server {}", regionToMove.getEncodedName(), newServerName);
+      TEST_UTIL.getAdmin().move(regionToMove.getEncodedNameAsBytes(), newServerName);
+
+      TEST_UTIL.waitFor(30000, () -> {
+        try {
+          HRegionLocation location = TEST_UTIL.getConnection().getRegionLocator(table1)
+            .getRegionLocation(regionToMove.getStartKey());
+          return location.getServerName().equals(newServerName);
+        } catch (IOException e) {
+          return false;
+        }
+      });
+
+      // Write some data to trigger WAL creation on the new server
+      try (Table t1 = TEST_UTIL.getConnection().getTable(table1)) {
+        for (int i = 0; i < 100; i++) {
+          Put p = new Put(Bytes.toBytes("newserver-row-" + i));
+          p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+          t1.put(p);
+        }
+      }
+      TEST_UTIL.getAdmin().flushRegion(regionToMove.getEncodedNameAsBytes());
+
+      List<FileStatus> walsAfterNewServer = getListOfWALFiles(TEST_UTIL.getConfiguration());
+      LOG.info("WALs after adding new server: {}", walsAfterNewServer.size());
+      assertTrue("Should have more WALs after new server",
+        walsAfterNewServer.size() > walsAfterB1.size());
+
+      List<FileStatus> newServerWALs = new ArrayList<>(walsAfterNewServer);
+      newServerWALs.removeAll(walsAfterB1);
+      assertFalse("Should have WALs from new server", newServerWALs.isEmpty());
+
+      BackupLogCleaner cleaner = new BackupLogCleaner();
+      cleaner.setConf(TEST_UTIL.getConfiguration());
+      Map<String, Object> params = new HashMap<>(1);
+      params.put(HMaster.MASTER, TEST_UTIL.getHBaseCluster().getMaster());
+      cleaner.init(params);
+
+      Set<FileStatus> deletable = toSet(cleaner.getDeletableFiles(walsAfterNewServer));
+      for (FileStatus newWAL : newServerWALs) {
+        assertFalse("WAL from new server should NOT be deletable: " + newWAL.getPath(),
          deletable.contains(newWAL));
+      }
+    } finally {
+      TEST_UTIL.truncateTable(BackupSystemTable.getTableName(TEST_UTIL.getConfiguration())).close();
+      // Clean up the RegionServer we added
+      if (rsThread != null) {
+        LOG.info("Stopping the RegionServer added for test");
+        TEST_UTIL.getMiniHBaseCluster()
+          .stopRegionServer(rsThread.getRegionServer().getServerName());
+        TEST_UTIL.getMiniHBaseCluster()
+          .waitForRegionServerToStop(rsThread.getRegionServer().getServerName(), 30000);
+      }
+    }
   }
 
-  private Set<FileStatus> toSet(Iterable<FileStatus> iterable) {
-    Set<FileStatus> result = new LinkedHashSet<>();
-    iterable.forEach(result::add);
-    return result;
+  @Test
+  public void testCanDeleteFileWithNewServerWALs() {
+    long backupStartCode = 1000000L;
+    // Old WAL from before the backup
+    Path oldWAL = new Path("/hbase/oldWALs/server1%2C60020%2C12345.500000");
+    String host = BackupUtils.parseHostNameFromLogFile(oldWAL);
+    BackupBoundaries boundaries = BackupBoundaries.builder(0L)
+      .addBackupTimestamps(host, backupStartCode, backupStartCode).build();
+
+    assertTrue("WAL older than backup should be deletable",
+      BackupLogCleaner.canDeleteFile(boundaries, oldWAL));
+
+    // WAL from exactly at the backup boundary
+    Path boundaryWAL = new Path("/hbase/oldWALs/server1%2C60020%2C12345.1000000");
+    assertTrue("WAL at boundary should be deletable",
+      BackupLogCleaner.canDeleteFile(boundaries, boundaryWAL));
+
+    // WAL from a server that joined AFTER the backup
+    Path newServerWAL = new Path("/hbase/oldWALs/newserver%2C60020%2C99999.1500000");
+    assertFalse("WAL from new server (after backup) should NOT be deletable",
+      BackupLogCleaner.canDeleteFile(boundaries, newServerWAL));
   }
 
   @Test
   public void testCleansUpHMasterWal() {
     Path path = new Path("/hbase/MasterData/WALs/hmaster,60000,1718808578163");
-    assertTrue(BackupLogCleaner.canDeleteFile(Collections.emptyMap(), path));
+    assertTrue(BackupLogCleaner.canDeleteFile(BackupBoundaries.builder(0L).build(), path));
   }
 
   @Test
   public void testCleansUpArchivedHMasterWal() {
+    BackupBoundaries empty = BackupBoundaries.builder(0L).build();
     Path normalPath =
       new Path("/hbase/oldWALs/hmaster%2C60000%2C1716224062663.1716247552189$masterlocalwal$");
-    assertTrue(BackupLogCleaner.canDeleteFile(Collections.emptyMap(), normalPath));
+    assertTrue(BackupLogCleaner.canDeleteFile(empty, normalPath));
 
     Path masterPath = new Path(
       "/hbase/MasterData/oldWALs/hmaster%2C60000%2C1716224062663.1716247552189$masterlocalwal$");
-    assertTrue(BackupLogCleaner.canDeleteFile(Collections.emptyMap(), masterPath));
+    assertTrue(BackupLogCleaner.canDeleteFile(empty, masterPath));
+  }
+
+  private Set<FileStatus> mergeAsSet(Collection<FileStatus> toCopy, Collection<FileStatus> toAdd) {
+    Set<FileStatus> result = new LinkedHashSet<>(toCopy);
+    result.addAll(toAdd);
+    return result;
+  }
+
+  private Set<FileStatus> toSet(Iterable<FileStatus> iterable) {
+    Set<FileStatus> result = new LinkedHashSet<>();
+    iterable.forEach(result::add);
+    return result;
   }
 }
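As a closing illustration, a small hypothetical sketch of the aggregation that serverToPreservationBoundaryTs feeds into the builder: one addBackupTimestamps call per server entry of the newest backup in each backup root, so the per-server boundary ends up being the minimum across roots, and the fallback start code is likewise the minimum across roots:

    import org.apache.hadoop.hbase.backup.util.BackupBoundaries;
    import org.apache.hadoop.hbase.net.Address;

    public class MultiRootBoundarySketch {
      public static void main(String[] args) {
        // Two backup roots report different log-roll timestamps for the same server
        // (numbers are made up). The builder keeps the per-server minimum, so WALs are
        // only released once every root has covered them.
        BackupBoundaries boundaries = BackupBoundaries.builder(0L)
          .addBackupTimestamps("rs1.example.com:16020", 2_000_000L, 2_000_000L) // root1
          .addBackupTimestamps("rs1.example.com:16020", 1_500_000L, 1_500_000L) // root2, older
          .build();

        Address rs1 = Address.fromString("rs1.example.com:16020");
        System.out.println(boundaries.getBoundaries().get(rs1)); // 1500000
        System.out.println(boundaries.getOldestStartCode());     // 1500000
      }
    }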