Skip to content

Commit b37c610

Browse files
authored
Merge pull request #436 from lhsoft/recover_log_from_corrupt
recover log from data corrupt
2 parents ae887a0 + 7fd7b80 commit b37c610

File tree

2 files changed

+103
-1
lines changed

2 files changed

+103
-1
lines changed

src/braft/log.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ BRPC_VALIDATE_GFLAG(raft_max_segment_size, brpc::PositiveInteger);
5252
DEFINE_bool(raft_sync_segments, false, "call fsync when a segment is closed");
5353
BRPC_VALIDATE_GFLAG(raft_sync_segments, ::brpc::PassValidate);
5454

55+
DEFINE_bool(raft_recover_log_from_corrupt, false, "recover log by truncating corrupted log");
56+
BRPC_VALIDATE_GFLAG(raft_recover_log_from_corrupt, ::brpc::PassValidate);
57+
5558
static bvar::LatencyRecorder g_open_segment_latency("raft_open_segment");
5659
static bvar::LatencyRecorder g_segment_append_entry_latency("raft_segment_append_entry");
5760
static bvar::LatencyRecorder g_sync_segment_latency("raft_sync_segment");
@@ -284,6 +287,7 @@ int Segment::load(ConfigurationManager* configuration_manager) {
284287
int64_t file_size = st_buf.st_size;
285288
int64_t entry_off = 0;
286289
int64_t actual_last_index = _first_index - 1;
290+
bool is_entry_corrupted = false;
287291
for (int64_t i = _first_index; entry_off < file_size; i++) {
288292
EntryHeader header;
289293
const int rc = _load_entry(entry_off, &header, NULL, ENTRY_HEADER_SIZE);
@@ -292,6 +296,7 @@ int Segment::load(ConfigurationManager* configuration_manager) {
292296
break;
293297
}
294298
if (rc < 0) {
299+
is_entry_corrupted = true;
295300
ret = rc;
296301
break;
297302
}
@@ -345,7 +350,12 @@ int Segment::load(ConfigurationManager* configuration_manager) {
345350
}
346351

347352
if (ret != 0) {
348-
return ret;
353+
if (!_is_open || !FLAGS_raft_recover_log_from_corrupt || !is_entry_corrupted) {
354+
return ret;
355+
// Truncate the log to the last normal index
356+
} else {
357+
ret = 0;
358+
}
349359
}
350360

351361
if (_is_open) {

test/test_log.cpp

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
namespace braft {
2323
DECLARE_bool(raft_trace_append_entry_latency);
24+
DECLARE_bool(raft_recover_log_from_corrupt);
2425
}
2526

2627
class LogStorageTest : public testing::Test {
@@ -503,6 +504,18 @@ int truncate_uninterrupted(const char* filename, off_t length) {
503504
return rc;
504505
}
505506

507+
int append_corrupted_data(const char* filename) {
508+
char header_buf[200];
509+
memset(header_buf, 1, 200);
510+
FILE* fp = std::fopen(filename, "a");
511+
if (fp == NULL) {
512+
return -1;
513+
}
514+
int ret = std::fputs(header_buf, fp);
515+
std::fclose(fp);
516+
return ret;
517+
}
518+
506519
TEST_F(LogStorageTest, data_lost) {
507520
::system("rm -rf data");
508521
braft::LogStorage* storage = new braft::SegmentLogStorage("./data");
@@ -1281,3 +1294,82 @@ TEST_F(LogStorageTest, append_close_load_append_with_io_metric) {
12811294
delete storage;
12821295
delete configuration_manager;
12831296
}
1297+
1298+
TEST_F(LogStorageTest, data_corrupt) {
1299+
::system("rm -rf data");
1300+
braft::LogStorage* storage = new braft::SegmentLogStorage("./data");
1301+
braft::ConfigurationManager* configuration_manager = new braft::ConfigurationManager;
1302+
ASSERT_EQ(0, storage->init(configuration_manager));
1303+
1304+
// append entry
1305+
for (int i = 0; i < 100000; i++) {
1306+
std::vector<braft::LogEntry*> entries;
1307+
for (int j = 0; j < 5; j++) {
1308+
int64_t index = 5*i + j + 1;
1309+
braft::LogEntry* entry = new braft::LogEntry();
1310+
entry->type = braft::ENTRY_TYPE_DATA;
1311+
entry->id.term = 1;
1312+
entry->id.index = index;
1313+
1314+
char data_buf[128];
1315+
snprintf(data_buf, sizeof(data_buf), "hello, world: %ld", index);
1316+
entry->data.append(data_buf);
1317+
entries.push_back(entry);
1318+
}
1319+
1320+
ASSERT_EQ(5, storage->append_entries(entries, NULL));
1321+
1322+
for (size_t j = 0; j < entries.size(); j++) {
1323+
delete entries[j];
1324+
}
1325+
}
1326+
1327+
delete storage;
1328+
delete configuration_manager;
1329+
1330+
// reinit
1331+
storage = new braft::SegmentLogStorage("./data");
1332+
configuration_manager = new braft::ConfigurationManager;
1333+
ASSERT_EQ(0, storage->init(configuration_manager));
1334+
1335+
ASSERT_EQ(storage->first_log_index(), 1);
1336+
ASSERT_EQ(storage->last_log_index(), 100000*5);
1337+
1338+
delete storage;
1339+
delete configuration_manager;
1340+
1341+
// last segment data corrupt
1342+
butil::DirReaderPosix dir_reader1("./data");
1343+
ASSERT_TRUE(dir_reader1.IsValid());
1344+
while (dir_reader1.Next()) {
1345+
int64_t first_index = 0;
1346+
int match = sscanf(dir_reader1.name(), "log_inprogress_%020ld",
1347+
&first_index);
1348+
std::string path;
1349+
butil::string_appendf(&path, "./data/%s", dir_reader1.name());
1350+
if (match == 1) {
1351+
ASSERT_LE(0, append_corrupted_data(path.c_str()));
1352+
}
1353+
}
1354+
1355+
1356+
storage = new braft::SegmentLogStorage("./data");
1357+
configuration_manager = new braft::ConfigurationManager;
1358+
ASSERT_NE(0, storage->init(configuration_manager));
1359+
1360+
delete storage;
1361+
delete configuration_manager;
1362+
1363+
braft::FLAGS_raft_recover_log_from_corrupt = true;
1364+
1365+
storage = new braft::SegmentLogStorage("./data");
1366+
configuration_manager = new braft::ConfigurationManager;
1367+
ASSERT_EQ(0, storage->init(configuration_manager));
1368+
1369+
ASSERT_EQ(storage->first_log_index(), 1);
1370+
ASSERT_EQ(storage->last_log_index(), 100000*5);
1371+
1372+
delete storage;
1373+
delete configuration_manager;
1374+
}
1375+

0 commit comments

Comments
 (0)