diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java index 0ca6734a07..2a1b2257b2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java @@ -82,7 +82,7 @@ public void format() throws IOException { // TODO } - static List getSingleFileSnapshotInfos(Path dir) throws IOException { + static List getSingleFileSnapshotInfos(Path dir, boolean requireMd5) throws IOException { final List infos = new ArrayList<>(); try (DirectoryStream stream = Files.newDirectoryStream(dir)) { for (Path path : stream) { @@ -90,10 +90,15 @@ static List getSingleFileSnapshotInfos(Path dir) throws if (filename != null) { final Matcher matcher = SNAPSHOT_REGEX.matcher(filename.toString()); if (matcher.matches()) { + final boolean hasMd5 = MD5FileUtil.getDigestFileForFile(path.toFile()).exists(); + if (requireMd5 && !hasMd5) { + continue; + } + final long term = Long.parseLong(matcher.group(1)); final long index = Long.parseLong(matcher.group(2)); final FileInfo fileInfo = new FileInfo(path, null); //No FileDigest here. - infos.add(new SingleFileSnapshotInfo(fileInfo, term, index)); + infos.add(new SingleFileSnapshotInfo(fileInfo, term, index, hasMd5)); } } } @@ -114,11 +119,27 @@ public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) return; } - final List allSnapshotFiles = getSingleFileSnapshotInfos(stateMachineDir.toPath()); + // Fetch all the snapshot files irrespective of whether they have an MD5 file or not + final List allSnapshotFiles = getSingleFileSnapshotInfos(stateMachineDir.toPath(), false); + allSnapshotFiles.sort(Comparator.comparing(SingleFileSnapshotInfo::getIndex).reversed()); + int numSnapshotsWithMd5 = 0; + int deleteIdx = -1; + + for (int i = 0; i < allSnapshotFiles.size(); i++) { + final SingleFileSnapshotInfo snapshot = allSnapshotFiles.get(i); + if (snapshot.hasMd5()) { + if (++numSnapshotsWithMd5 == numSnapshotsRetained) { + // We have found the last snapshot with an MD5 file that needs to be retained + deleteIdx = i + 1; + break; + } + } else { + LOG.warn("Snapshot file {} has missing MD5 file.", snapshot); + } + } - if (allSnapshotFiles.size() > numSnapshotsRetained) { - allSnapshotFiles.sort(Comparator.comparing(SingleFileSnapshotInfo::getIndex).reversed()); - allSnapshotFiles.subList(numSnapshotsRetained, allSnapshotFiles.size()) + if (deleteIdx > 0) { + allSnapshotFiles.subList(deleteIdx, allSnapshotFiles.size()) .stream() .map(SingleFileSnapshotInfo::getFile) .map(FileInfo::getPath) @@ -126,20 +147,21 @@ public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) LOG.info("Deleting old snapshot at {}", snapshotPath.toAbsolutePath()); FileUtils.deletePathQuietly(snapshotPath); }); - // clean up the md5 files if the corresponding snapshot file does not exist - try (DirectoryStream stream = Files.newDirectoryStream(stateMachineDir.toPath(), - SNAPSHOT_MD5_FILTER)) { - for (Path md5path : stream) { - Path md5FileNamePath = md5path.getFileName(); - if (md5FileNamePath == null) { - continue; - } - final String md5FileName = md5FileNamePath.toString(); - final File snapshotFile = new File(stateMachineDir, - md5FileName.substring(0, md5FileName.length() - MD5_SUFFIX.length())); - if (!snapshotFile.exists()) { - FileUtils.deletePathQuietly(md5path); - } + } + + // clean up the md5 files if the corresponding snapshot file does not exist + try (DirectoryStream stream = Files.newDirectoryStream(stateMachineDir.toPath(), + SNAPSHOT_MD5_FILTER)) { + for (Path md5path : stream) { + Path md5FileNamePath = md5path.getFileName(); + if (md5FileNamePath == null) { + continue; + } + final String md5FileName = md5FileNamePath.toString(); + final File snapshotFile = new File(stateMachineDir, + md5FileName.substring(0, md5FileName.length() - MD5_SUFFIX.length())); + if (!snapshotFile.exists()) { + FileUtils.deletePathQuietly(md5path); } } } @@ -182,7 +204,7 @@ protected File getCorruptSnapshotFile(long term, long endIndex) { } static SingleFileSnapshotInfo findLatestSnapshot(Path dir) throws IOException { - final Iterator i = getSingleFileSnapshotInfos(dir).iterator(); + final Iterator i = getSingleFileSnapshotInfos(dir, true).iterator(); if (!i.hasNext()) { return null; } @@ -199,7 +221,7 @@ static SingleFileSnapshotInfo findLatestSnapshot(Path dir) throws IOException { final Path path = latest.getFile().getPath(); final MD5Hash md5 = MD5FileUtil.readStoredMd5ForFile(path.toFile()); final FileInfo info = new FileInfo(path, md5); - return new SingleFileSnapshotInfo(info, latest.getTerm(), latest.getIndex()); + return new SingleFileSnapshotInfo(info, latest.getTerm(), latest.getIndex(), true); } public SingleFileSnapshotInfo updateLatestSnapshot(SingleFileSnapshotInfo info) { diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java index 14d501a4af..922fa2ff52 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SingleFileSnapshotInfo.java @@ -28,12 +28,24 @@ * The objects of this class are immutable. */ public class SingleFileSnapshotInfo extends FileListSnapshotInfo { + private final Boolean hasMd5; // Whether the snapshot file has a corresponding MD5 file + public SingleFileSnapshotInfo(FileInfo fileInfo, TermIndex termIndex) { + this(fileInfo, termIndex, null); + } + + public SingleFileSnapshotInfo(FileInfo fileInfo, TermIndex termIndex, Boolean hasMd5) { super(Collections.singletonList(fileInfo), termIndex); + this.hasMd5 = hasMd5; + } + + public SingleFileSnapshotInfo(FileInfo fileInfo, long term, long endIndex, boolean hasMd5) { + this(fileInfo, TermIndex.valueOf(term, endIndex), hasMd5); } - public SingleFileSnapshotInfo(FileInfo fileInfo, long term, long endIndex) { - this(fileInfo, TermIndex.valueOf(term, endIndex)); + /** @return the md5 file exists for the snapshot file */ + public boolean hasMd5() { + return hasMd5 != null && hasMd5; } /** @return the file associated with the snapshot. */ diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java index 12cd771315..a86f6ff1a1 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java @@ -18,6 +18,7 @@ package org.apache.ratis.server.storage; import static java.util.stream.Collectors.toList; +import static org.apache.ratis.statemachine.impl.SimpleStateMachineStorage.SNAPSHOT_MD5_REGEX; import static org.apache.ratis.statemachine.impl.SimpleStateMachineStorage.SNAPSHOT_REGEX; import static org.apache.ratis.util.MD5FileUtil.MD5_SUFFIX; @@ -29,7 +30,9 @@ import org.apache.ratis.server.storage.RaftStorageDirectoryImpl.StorageState; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.SnapshotRetentionPolicy; +import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.MD5FileUtil; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.SizeInBytes; import org.junit.jupiter.api.AfterEach; @@ -44,6 +47,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; @@ -228,7 +232,7 @@ public void testSnapshotCleanup() throws IOException { SnapshotRetentionPolicy snapshotRetentionPolicy = new SnapshotRetentionPolicy() { @Override public int getNumSnapshotsRetained() { - return 3; + return 2; } }; @@ -239,15 +243,24 @@ public int getNumSnapshotsRetained() { Set termIndexSet = new HashSet<>(); - //Create 5 snapshot files in storage dir. - while (termIndexSet.size() < 5) { + //Create 3 snapshot files in storage dir. + while (termIndexSet.size() < 3) { final long term = ThreadLocalRandom.current().nextLong(1, 10L); - final long index = ThreadLocalRandom.current().nextLong(100, 1000L); + final long index = ThreadLocalRandom.current().nextLong(100, 500L); if (termIndexSet.add(TermIndex.valueOf(term, index))) { - File file = simpleStateMachineStorage.getSnapshotFile(term, index); - Assertions.assertTrue(file.createNewFile()); + createSnapshot(simpleStateMachineStorage, term, index, true); } } + + // Create 2 more snapshot files in storage dir without MD5 files + while (termIndexSet.size() < 5) { + final long term = ThreadLocalRandom.current().nextLong(11, 20L); + final long index = ThreadLocalRandom.current().nextLong(501, 1000L); + if (termIndexSet.add(TermIndex.valueOf(term, index))) { + createSnapshot(simpleStateMachineStorage, term, index, false); + } + } + // create MD5 files that will not be deleted in older version while (termIndexSet.size() < 7) { final long term = 1; @@ -260,16 +273,18 @@ public int getNumSnapshotsRetained() { } File stateMachineDir = storage.getStorageDir().getStateMachineDir(); - assertFileCount(stateMachineDir, 7); + assertFileCount(stateMachineDir, 10); simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy); - File[] remainingFiles = assertFileCount(stateMachineDir, 3); + // Since the MD5 files are not matching the snapshot files they are cleaned up. + // So we still have 6 files - 4 snapshots and 2 MD5 files. + File[] remainingFiles = assertFileCount(stateMachineDir, 6); List remainingIndices = termIndexSet.stream() .map(TermIndex::getIndex) .sorted(Collections.reverseOrder()) - .limit(3) + .limit(4) .collect(toList()); for (File file : remainingFiles) { System.out.println(file.getName()); @@ -281,21 +296,218 @@ public int getNumSnapshotsRetained() { // Attempt to clean up again should not delete any more files. simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy); - assertFileCount(stateMachineDir, 3); + assertFileCount(stateMachineDir, 6); //Test with Retention disabled. //Create 2 snapshot files in storage dir. for (int i = 0; i < 2; i++) { - final long term = ThreadLocalRandom.current().nextLong(1, 10L); + final long term = ThreadLocalRandom.current().nextLong(21, 30L); final long index = ThreadLocalRandom.current().nextLong(1000L); - File file = simpleStateMachineStorage.getSnapshotFile(term, index); - Assertions.assertTrue(file.createNewFile()); + createSnapshot(simpleStateMachineStorage, term, index, false); } simpleStateMachineStorage.cleanupOldSnapshots(new SnapshotRetentionPolicy() { }); + assertFileCount(stateMachineDir, 8); + } + + @Test + public void testSnapshotCleanupWithMissingMd5File() throws IOException { + + SnapshotRetentionPolicy snapshotRetentionPolicy = new SnapshotRetentionPolicy() { + @Override + public int getNumSnapshotsRetained() { + return 2; + } + }; + + + SimpleStateMachineStorage simpleStateMachineStorage = new SimpleStateMachineStorage(); + final RaftStorage storage = newRaftStorage(storageDir); + simpleStateMachineStorage.init(storage); + + Set termIndexSet = new HashSet<>(); + + // Create one snapshot file without MD5 file + if (termIndexSet.add(TermIndex.valueOf(1, 100))) { + createSnapshot(simpleStateMachineStorage, 1, 100, false); + } + + //Create 4 snapshot files in storage dir + while (termIndexSet.size() < 5) { + final long term = ThreadLocalRandom.current().nextLong(2, 10L); + final long index = ThreadLocalRandom.current().nextLong(100, 1000L); + if (termIndexSet.add(TermIndex.valueOf(term, index))) { + createSnapshot(simpleStateMachineStorage, term, index, true); + } + } + + // 1 snapshot file without MD5 hash, 4 snapshots + 4 md5 hash files = 9 files + File stateMachineDir = storage.getStorageDir().getStateMachineDir(); + assertFileCount(stateMachineDir, 9); + + simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy); + + // We should have 4 files remaining, and 2 snapshots with MD5 hash + assertFileCount(stateMachineDir, 4); + } + + @Test + public void testSnapshotCleanupWithLatestSnapshotMissingMd5File() throws IOException { + + SnapshotRetentionPolicy snapshotRetentionPolicy = new SnapshotRetentionPolicy() { + @Override + public int getNumSnapshotsRetained() { + return 2; + } + }; + + + SimpleStateMachineStorage simpleStateMachineStorage = new SimpleStateMachineStorage(); + final RaftStorage storage = newRaftStorage(storageDir); + simpleStateMachineStorage.init(storage); + + Set termIndexSet = new HashSet<>(); + + //Create 4 snapshot files in storage dir + while (termIndexSet.size() < 4) { + final long term = ThreadLocalRandom.current().nextLong(1, 10L); + final long index = ThreadLocalRandom.current().nextLong(100, 1000L); + if (termIndexSet.add(TermIndex.valueOf(term, index))) { + createSnapshot(simpleStateMachineStorage, term, index, true); + } + } + + // Create a snapshot file with a missing MD5 file and having the highest term index + if (termIndexSet.add(TermIndex.valueOf(99, 1001))) { + createSnapshot(simpleStateMachineStorage, 99, 1001, false); + } + + // 1 snapshot file without MD5 hash, 4 snapshots + 4 md5 hash files = 9 files + File stateMachineDir = storage.getStorageDir().getStateMachineDir(); + assertFileCount(stateMachineDir, 9); + + simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy); + + // We should have 5 files remaining, and 2 snapshots with MD5 hash and 1 snapshot file without MD5 hash assertFileCount(stateMachineDir, 5); } + @Test + public void testCleanupOldSnapshotsDeletesOlderSnapshotsWithMd5() throws Exception { + SnapshotRetentionPolicy snapshotRetentionPolicy = new SnapshotRetentionPolicy() { + @Override + public int getNumSnapshotsRetained() { + return 2; + } + }; + + SimpleStateMachineStorage simpleStateMachineStorage = new SimpleStateMachineStorage(); + final RaftStorage storage = newRaftStorage(storageDir); + simpleStateMachineStorage.init(storage); + try { + createSnapshot(simpleStateMachineStorage, 1, 100, true); + createSnapshot(simpleStateMachineStorage, 1, 200, true); + createSnapshot(simpleStateMachineStorage, 1, 300, true); + createSnapshot(simpleStateMachineStorage, 1, 400, true); + + File stateMachineDir = storage.getStorageDir().getStateMachineDir(); + simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy); + + List snapshotNames = listMatchingFileNames(stateMachineDir, SNAPSHOT_REGEX); + Assertions.assertEquals(2, snapshotNames.size()); + Assertions.assertTrue(snapshotNames.contains(SimpleStateMachineStorage.getSnapshotFileName(1, 400))); + Assertions.assertTrue(snapshotNames.contains(SimpleStateMachineStorage.getSnapshotFileName(1, 300))); + Assertions.assertFalse(snapshotNames.contains(SimpleStateMachineStorage.getSnapshotFileName(1, 200))); + Assertions.assertFalse(snapshotNames.contains(SimpleStateMachineStorage.getSnapshotFileName(1, 100))); + + List md5Names = listMatchingFileNames(stateMachineDir, SNAPSHOT_MD5_REGEX); + Assertions.assertEquals(2, md5Names.size()); + Assertions.assertTrue(md5Names.contains(SimpleStateMachineStorage.getSnapshotFileName(1, 400) + MD5_SUFFIX)); + Assertions.assertTrue(md5Names.contains(SimpleStateMachineStorage.getSnapshotFileName(1, 300) + MD5_SUFFIX)); + Assertions.assertFalse(md5Names.contains(SimpleStateMachineStorage.getSnapshotFileName(1, 200) + MD5_SUFFIX)); + Assertions.assertFalse(md5Names.contains(SimpleStateMachineStorage.getSnapshotFileName(1, 100) + MD5_SUFFIX)); + } finally { + storage.close(); + } + } + + @Test + public void testCleanupOldSnapshotsWithoutAnyMd5() throws Exception { + SnapshotRetentionPolicy snapshotRetentionPolicy = new SnapshotRetentionPolicy() { + @Override + public int getNumSnapshotsRetained() { + return 2; + } + }; + + SimpleStateMachineStorage simpleStateMachineStorage = new SimpleStateMachineStorage(); + final RaftStorage storage = newRaftStorage(storageDir); + simpleStateMachineStorage.init(storage); + try { + createSnapshot(simpleStateMachineStorage, 1, 100, false); + createSnapshot(simpleStateMachineStorage, 1, 200, false); + createSnapshot(simpleStateMachineStorage, 1, 300, false); + + File stateMachineDir = storage.getStorageDir().getStateMachineDir(); + simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy); + + List snapshotNames = listMatchingFileNames(stateMachineDir, SNAPSHOT_REGEX); + Assertions.assertEquals(3, snapshotNames.size()); + Assertions.assertTrue(listMatchingFileNames(stateMachineDir, SNAPSHOT_MD5_REGEX).isEmpty()); + } finally { + storage.close(); + } + } + + @Test + public void testGetLatestSnapshotReturnsNewest() throws Exception { + SimpleStateMachineStorage simpleStateMachineStorage = new SimpleStateMachineStorage(); + final RaftStorage storage = newRaftStorage(storageDir); + simpleStateMachineStorage.init(storage); + try { + Assertions.assertNull(simpleStateMachineStorage.getLatestSnapshot()); + + createSnapshot(simpleStateMachineStorage, 1, 100, true); + simpleStateMachineStorage.loadLatestSnapshot(); + SingleFileSnapshotInfo first = simpleStateMachineStorage.getLatestSnapshot(); + Assertions.assertNotNull(first); + Assertions.assertEquals(1, first.getTerm()); + Assertions.assertEquals(100, first.getIndex()); + Assertions.assertNotNull(first.getFile().getFileDigest()); + + createSnapshot(simpleStateMachineStorage, 1, 200, true); + simpleStateMachineStorage.loadLatestSnapshot(); + SingleFileSnapshotInfo second = simpleStateMachineStorage.getLatestSnapshot(); + Assertions.assertNotNull(second); + Assertions.assertEquals(1, second.getTerm()); + Assertions.assertEquals(200, second.getIndex()); + Assertions.assertNotNull(second.getFile().getFileDigest()); + } finally { + storage.close(); + } + } + + @Test + public void testGetLatestSnapshotIgnoresSnapshotsWithoutMd5() throws Exception { + SimpleStateMachineStorage simpleStateMachineStorage = new SimpleStateMachineStorage(); + final RaftStorage storage = newRaftStorage(storageDir); + simpleStateMachineStorage.init(storage); + try { + createSnapshot(simpleStateMachineStorage, 1, 100, true); + simpleStateMachineStorage.loadLatestSnapshot(); + + createSnapshot(simpleStateMachineStorage, 1, 200, false); + simpleStateMachineStorage.loadLatestSnapshot(); + + SingleFileSnapshotInfo latest = simpleStateMachineStorage.getLatestSnapshot(); + Assertions.assertNotNull(latest); + Assertions.assertEquals(100, latest.getIndex()); + Assertions.assertEquals(1, latest.getTerm()); + } finally { + storage.close(); + } + } + private static File[] assertFileCount(File dir, int expected) { File[] files = dir.listFiles(); Assertions.assertNotNull(files); @@ -303,6 +515,25 @@ private static File[] assertFileCount(File dir, int expected) { return files; } + private File createSnapshot(SimpleStateMachineStorage storage, + long term, long endIndex, + boolean withMd5) throws IOException { + File snapshotFile = storage.getSnapshotFile(term, endIndex); + Assertions.assertTrue(snapshotFile.createNewFile()); + + if (withMd5) { + MD5FileUtil.computeAndSaveMd5ForFile(snapshotFile); + } + + return snapshotFile; + } + + private static List listMatchingFileNames(File dir, java.util.regex.Pattern pattern) { + return Arrays.stream(Objects.requireNonNull(dir.list())) + .filter(name -> pattern.matcher(name).matches()) + .collect(toList()); + } + @Test public void testNotEnoughSpace() throws IOException { File mockStorageDir = Mockito.spy(storageDir);