BackupLogCleaner.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.backup.master;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -32,7 +33,8 @@
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupBoundaries;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.master.HMaster;
@@ -41,7 +43,6 @@
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +57,8 @@
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class BackupLogCleaner extends BaseLogCleanerDelegate {
private static final Logger LOG = LoggerFactory.getLogger(BackupLogCleaner.class);
private static final long TS_BUFFER_DEFAULT = Duration.ofHours(1).toMillis();
static final String TS_BUFFER_KEY = "hbase.backup.log.cleaner.timestamp.buffer.ms";

private boolean stopped = false;
private Connection conn;
@@ -86,8 +89,9 @@ public void init(Map<String, Object> params) {
* I.e. WALs with a lower (= older) or equal timestamp are no longer needed for future incremental
* backups.
*/
private Map<Address, Long> serverToPreservationBoundaryTs(List<BackupInfo> backups)
private BackupBoundaries serverToPreservationBoundaryTs(BackupSystemTable sysTable)
throws IOException {
List<BackupInfo> backups = sysTable.getBackupHistory(true);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Cleaning WALs if they are older than the WAL cleanup time-boundary. "
@@ -112,27 +116,25 @@ private Map<Address, Long> serverToPreservationBoundaryTs(List<BackupInfo> backups)
.collect(Collectors.joining(", ")));
}

// This map tracks, for every RegionServer, the least recent (= oldest / lowest timestamp)
// inclusion in any backup. In other words, it is the timestamp boundary up to which all backup
// roots have included the WAL in their backup.
Map<Address, Long> boundaries = new HashMap<>();
BackupBoundaries.BackupBoundariesBuilder builder =
BackupBoundaries.builder(getConf().getLong(TS_BUFFER_KEY, TS_BUFFER_DEFAULT));
for (BackupInfo backupInfo : newestBackupPerRootDir.values()) {
long startCode = Long.parseLong(sysTable.readBackupStartCode(backupInfo.getBackupRootDir()));
// Iterate over all tables in the timestamp map, which contains all tables covered in the
// backup root, not just the tables included in that specific backup (which could be a subset)
for (TableName table : backupInfo.getTableSetTimestampMap().keySet()) {
for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
.entrySet()) {
Address address = Address.fromString(entry.getKey());
Long storedTs = boundaries.get(address);
if (storedTs == null || entry.getValue() < storedTs) {
boundaries.put(address, entry.getValue());
}
builder.addBackupTimestamps(entry.getKey(), entry.getValue(), startCode);
}
}
}

BackupBoundaries boundaries = builder.build();

if (LOG.isDebugEnabled()) {
for (Map.Entry<Address, Long> entry : boundaries.entrySet()) {
LOG.debug("Boundaries oldestStartCode: {}", boundaries.getOldestStartCode());
for (Map.Entry<Address, Long> entry : boundaries.getBoundaries().entrySet()) {
LOG.debug("Server: {}, WAL cleanup boundary: {}", entry.getKey().getHostName(),
entry.getValue());
}
@@ -153,19 +155,18 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
return files;
}

Map<Address, Long> serverToPreservationBoundaryTs;
BackupBoundaries boundaries;
try {
try (BackupManager backupManager = new BackupManager(conn, getConf())) {
serverToPreservationBoundaryTs =
serverToPreservationBoundaryTs(backupManager.getBackupHistory(true));
try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
boundaries = serverToPreservationBoundaryTs(sysTable);
}
} catch (IOException ex) {
LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
ex.getMessage(), ex);
return Collections.emptyList();
}
for (FileStatus file : files) {
if (canDeleteFile(serverToPreservationBoundaryTs, file.getPath())) {
if (canDeleteFile(boundaries, file.getPath())) {
filteredFiles.add(file);
}
}
@@ -200,54 +201,17 @@ public boolean isStopped() {
return this.stopped;
}

protected static boolean canDeleteFile(Map<Address, Long> addressToBoundaryTs, Path path) {
protected static boolean canDeleteFile(BackupBoundaries boundaries, Path path) {
if (isHMasterWAL(path)) {
return true;
}

try {
String hostname = BackupUtils.parseHostNameFromLogFile(path);
if (hostname == null) {
LOG.warn(
"Cannot parse hostname from RegionServer WAL file: {}. Ignoring cleanup of this log",
path);
return false;
}
Address walServerAddress = Address.fromString(hostname);
long walTimestamp = WAL.getTimestamp(path.getName());

if (!addressToBoundaryTs.containsKey(walServerAddress)) {
if (LOG.isDebugEnabled()) {
LOG.debug("No cleanup WAL time-boundary found for server: {}. Ok to delete file: {}",
walServerAddress.getHostName(), path);
}
return true;
}

Long backupBoundary = addressToBoundaryTs.get(walServerAddress);
if (backupBoundary >= walTimestamp) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"WAL cleanup time-boundary found for server {}: {}. Ok to delete older file: {}",
walServerAddress.getHostName(), backupBoundary, path);
}
return true;
}

if (LOG.isDebugEnabled()) {
LOG.debug("WAL cleanup time-boundary found for server {}: {}. Keeping younger file: {}",
walServerAddress.getHostName(), backupBoundary, path);
}
} catch (Exception ex) {
LOG.warn("Error occurred while filtering file: {}. Ignoring cleanup of this log", path, ex);
return false;
}
return false;
return boundaries.isDeletable(path);
}

private static boolean isHMasterWAL(Path path) {
String fn = path.getName();
return fn.startsWith(WALProcedureStore.LOG_PREFIX)
|| fn.endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX);
|| fn.endsWith(MasterRegionFactory.ARCHIVED_WAL_SUFFIX)
|| path.toString().contains("/" + MasterRegionFactory.MASTER_STORE_DIR + "/");
}
}
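
For context, a hedged sketch of how the new buffer could be tuned. The configuration key and its one-hour default are taken from the diff above; the surrounding setup (class name, use of HBaseConfiguration) is illustrative only:

import java.time.Duration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class BackupLogCleanerTuning {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Key and default (one hour) as introduced in BackupLogCleaner above. A larger
    // buffer widens the safety margin subtracted from the oldest backup start code,
    // making the fallback boundary for servers without an explicit boundary more
    // conservative (fewer WALs deleted).
    conf.setLong("hbase.backup.log.cleaner.timestamp.buffer.ms",
      Duration.ofHours(2).toMillis());
  }
}

In a real deployment this key would normally be set in hbase-site.xml on the master, since BackupLogCleaner runs there as a log-cleaner delegate.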
BackupBoundaries.java (new file)
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup.util;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Tracks time boundaries for WAL file cleanup during backup operations. Maintains the oldest
* timestamp per RegionServer included in any backup, enabling safe determination of which WAL files
* can be deleted without compromising backup integrity.
*/
@InterfaceAudience.Private
public class BackupBoundaries {
private static final Logger LOG = LoggerFactory.getLogger(BackupBoundaries.class);
private static final BackupBoundaries EMPTY_BOUNDARIES =
new BackupBoundaries(Collections.emptyMap(), Long.MAX_VALUE);

// This map tracks, for every RegionServer, the least recent (= oldest / lowest timestamp)
// inclusion in any backup. In other words, it is the timestamp boundary up to which all backup
// roots have included the WAL in their backup.
private final Map<Address, Long> boundaries;

// The minimum WAL roll timestamp from the most recent backup of each backup root, used as a
// fallback cleanup boundary for RegionServers without explicit backup boundaries (e.g., servers
// that joined after backups began)
private final long oldestStartCode;

private BackupBoundaries(Map<Address, Long> boundaries, long oldestStartCode) {
this.boundaries = boundaries;
this.oldestStartCode = oldestStartCode;
}

public boolean isDeletable(Path walLogPath) {
try {
String hostname = BackupUtils.parseHostNameFromLogFile(walLogPath);

if (hostname == null) {
LOG.warn(
"Cannot parse hostname from RegionServer WAL file: {}. Ignoring cleanup of this log",
walLogPath);
return false;
}

Address address = Address.fromString(hostname);
long pathTs = WAL.getTimestamp(walLogPath.getName());

if (!boundaries.containsKey(address)) {
boolean isDeletable = pathTs <= oldestStartCode;
if (LOG.isDebugEnabled()) {
LOG.debug(
"Boundary for {} not found. isDeletable = {} based on oldestStartCode = {} and WAL ts of {}",
walLogPath, isDeletable, oldestStartCode, pathTs);
}
return isDeletable;
}

long backupTs = boundaries.get(address);
if (pathTs <= backupTs) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"WAL cleanup time-boundary found for server {}: {}. Ok to delete older file: {}",
address.getHostName(), pathTs, walLogPath);
}
return true;
}

if (LOG.isDebugEnabled()) {
LOG.debug("WAL cleanup time-boundary found for server {}: {}. Keeping younger file: {}",
address.getHostName(), backupTs, walLogPath);
}

return false;
} catch (Exception e) {
LOG.warn("Error occurred while filtering file: {}. Ignoring cleanup of this log", walLogPath,
e);
return false;
}
}

public Map<Address, Long> getBoundaries() {
return boundaries;
}

public long getOldestStartCode() {
return oldestStartCode;
}

public static BackupBoundariesBuilder builder(long tsCleanupBuffer) {
return new BackupBoundariesBuilder(tsCleanupBuffer);
}

public static class BackupBoundariesBuilder {
private final Map<Address, Long> boundaries = new HashMap<>();
private final long tsCleanupBuffer;

private long oldestStartCode = Long.MAX_VALUE;

private BackupBoundariesBuilder(long tsCleanupBuffer) {
this.tsCleanupBuffer = tsCleanupBuffer;
}

public BackupBoundariesBuilder addBackupTimestamps(String host, long hostLogRollTs,
long backupStartCode) {
Address address = Address.fromString(host);
Long storedTs = boundaries.get(address);
if (storedTs == null || hostLogRollTs < storedTs) {
boundaries.put(address, hostLogRollTs);
}

if (oldestStartCode > backupStartCode) {
oldestStartCode = backupStartCode;
}

return this;
}

public BackupBoundaries build() {
if (boundaries.isEmpty()) {
return EMPTY_BOUNDARIES;
}

oldestStartCode -= tsCleanupBuffer;
return new BackupBoundaries(boundaries, oldestStartCode);
}
}
}
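
A hedged, self-contained sketch of the builder semantics above; the hostname, port, and timestamps are invented for illustration:

import java.time.Duration;
import org.apache.hadoop.hbase.backup.util.BackupBoundaries;

public class BackupBoundariesExample {
  public static void main(String[] args) {
    long base = 1_700_000_000_000L; // arbitrary epoch-millis base for the example
    BackupBoundaries boundaries = BackupBoundaries.builder(Duration.ofHours(1).toMillis())
      // rs1 is covered by two backup roots; the older log-roll timestamp wins.
      .addBackupTimestamps("rs1.example.com:16020", base + 2_000, base + 5_000)
      .addBackupTimestamps("rs1.example.com:16020", base + 1_000, base + 4_000)
      .build();

    // Per-server boundary: WALs from rs1 with a roll timestamp at or below
    // base + 1_000 are deletable.
    System.out.println(boundaries.getBoundaries());
    // Fallback for servers with no explicit boundary: the oldest start code
    // minus the buffer, i.e. (base + 4_000) - Duration.ofHours(1).toMillis().
    System.out.println(boundaries.getOldestStartCode());
  }
}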