Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,10 @@ public void testRecoveryAfterCorruptionMetadataLocation() throws Exception {
DataFile dataFile = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().get(Integer.valueOf(location.getDataFileId()));
RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile();
randomAccessFile.seek(location.getOffset());
randomAccessFile.writeInt(Integer.MAX_VALUE);
// Use an invalid size well past the end of the data file to trigger corruption handling without large allocation.
int bogusSize = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal()
.getFileMap().get(location.getDataFileId()).getLength() * 10;
randomAccessFile.writeInt(bogusSize);
randomAccessFile.getChannel().force(true);

((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().close();
Expand All @@ -246,15 +249,26 @@ public void append(LogEvent event) {
* throw new EOFException();
*/
if (event != null
&& event.getLevel() == Level.WARN
&& event.getMessage() != null
&& event.getMessage().getFormattedMessage() != null
&& event.getMessage().getFormattedMessage().contains("Cannot recover message audit")
&& event.getLevel() == Level.WARN
&& event.getMessage() != null
&& event.getMessage().getFormattedMessage() != null) {

final String msg = event.getMessage().getFormattedMessage();

boolean auditCorruption =
msg.contains("Cannot recover message audit")
&& event.getThrown() != null
&& event.getThrown() instanceof EOFException
&& event.getThrown().getMessage() == null) {
&& (event.getThrown() instanceof EOFException
|| (event.getThrown() instanceof IOException
&& event.getThrown().getMessage() != null
&& event.getThrown().getMessage().contains("Invalid location size")));

trappedExpectedLogMessage.set(true);
boolean dataFileCorruption =
msg.contains("DataFile:") && (msg.contains("Invalid location size") || msg.contains("larger than expected"));

if (auditCorruption || dataFileCorruption) {
trappedExpectedLogMessage.set(true);
}
}
}
};
Expand All @@ -272,7 +286,7 @@ public void append(LogEvent event) {
assertEquals("no missing message", 50, broker.getAdminView().getTotalMessageCount());
assertEquals("Drain", 50, drainQueue(50));
assertEquals("no problem draining messages", 0, broker.getAdminView().getTotalMessageCount());
assertTrue("Did replay records on invalid location size", trappedExpectedLogMessage.get());
assertTrue("Did not detect corruption via warning", trappedExpectedLogMessage.get());
}

@Test
Expand Down Expand Up @@ -419,7 +433,8 @@ private void corruptBatchEndEof(int id) throws Exception{
int pos = batchPositions.get(batchPositions.size() - 3);
LOG.info("corrupting checksum and size (to push it past eof) of batch record at:" + id + "-" + pos);
randomAccessFile.seek(pos + Journal.BATCH_CONTROL_RECORD_HEADER.length);
randomAccessFile.writeInt(31 * 1024 * 1024);
// Use a bounded bogus size to trigger the corruption path without exhausting heap on read.
randomAccessFile.writeInt(4 * 1024 * 1024);
randomAccessFile.writeLong(0l);
randomAccessFile.getChannel().force(true);
}
Expand Down