Skip to content

Commit 694c11d

Browse files
adelapenadjatnieks
authored andcommitted
Fix ignored nodetool stop ANTICOMPACTION
(cherry picked from commit fad831e)
1 parent 4ff0fbb commit 694c11d

File tree

2 files changed

+85
-13
lines changed

2 files changed

+85
-13
lines changed

src/java/org/apache/cassandra/db/compaction/CompactionManager.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1837,27 +1837,25 @@ else if (transChecker.test(token))
18371837
pendingRepair);
18381838
return fullSSTables.size() + transSSTables.size() + unrepairedSSTables.size();
18391839
}
1840-
catch (Throwable e)
1840+
catch (CompactionInterruptedException e)
18411841
{
1842-
if (e instanceof CompactionInterruptedException)
1842+
if (isCancelled.getAsBoolean())
18431843
{
1844-
if (isCancelled.getAsBoolean())
1845-
{
1846-
logger.info("Anticompaction has been canceled for session {}", pendingRepair);
1847-
logger.trace(e.getMessage(), e);
1848-
}
1849-
else
1850-
{
1851-
logger.info("Anticompaction for session {} has been stopped by request.", pendingRepair);
1852-
}
1844+
logger.info("Anticompaction has been canceled for session {}", pendingRepair);
1845+
logger.trace(e.getMessage(), e);
18531846
}
18541847
else
18551848
{
1856-
JVMStabilityInspector.inspectThrowable(e);
1857-
logger.error("Error anticompacting " + txn + " for " + pendingRepair, e);
1849+
logger.info("Anticompaction for session {} has been stopped by request.", pendingRepair);
18581850
}
18591851
throw e;
18601852
}
1853+
catch (Throwable e)
1854+
{
1855+
JVMStabilityInspector.inspectThrowable(e);
1856+
logger.error("Error anticompacting " + txn + " for " + pendingRepair, e);
1857+
throw e;
1858+
}
18611859
}
18621860

18631861
@VisibleForTesting
@@ -1889,6 +1887,12 @@ public OperationProgress getProgress()
18891887
return compaction.getProgress();
18901888
}
18911889

1890+
@Override
1891+
public void stop()
1892+
{
1893+
compaction.stop();
1894+
}
1895+
18921896
@Override
18931897
public boolean isStopRequested()
18941898
{

test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionBytemanTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,40 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.Collection;
23+
import java.util.Collections;
2324
import java.util.List;
2425
import java.util.concurrent.ExecutionException;
2526
import java.util.concurrent.ExecutorService;
2627
import java.util.concurrent.Executors;
28+
import java.util.concurrent.Future;
2729

2830
import com.google.common.collect.Lists;
31+
import org.junit.Assert;
2932
import org.junit.Test;
3033
import org.junit.runner.RunWith;
34+
35+
import org.apache.cassandra.config.DatabaseDescriptor;
36+
import org.apache.cassandra.cql3.QueryProcessor;
37+
import org.apache.cassandra.db.ColumnFamilyStore;
3138
import org.apache.cassandra.db.compaction.CompactionInterruptedException;
3239
import org.apache.cassandra.db.compaction.CompactionManager;
40+
import org.apache.cassandra.dht.ByteOrderedPartitioner;
3341
import org.apache.cassandra.dht.Range;
3442
import org.apache.cassandra.dht.Token;
3543
import org.apache.cassandra.io.sstable.format.SSTableReader;
44+
import org.apache.cassandra.locator.InetAddressAndPort;
3645
import org.apache.cassandra.locator.RangesAtEndpoint;
3746
import org.apache.cassandra.locator.Replica;
3847
import org.apache.cassandra.utils.TimeUUID;
48+
import org.apache.cassandra.service.ActiveRepairService;
49+
import org.apache.cassandra.streaming.PreviewKind;
50+
import org.apache.cassandra.utils.ByteBufferUtil;
51+
import org.assertj.core.api.Assertions;
3952
import org.jboss.byteman.contrib.bmunit.BMRule;
4053
import org.jboss.byteman.contrib.bmunit.BMRules;
4154
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
4255

56+
import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
4357
import static org.junit.Assert.assertEquals;
4458
import static org.junit.Assert.assertTrue;
4559
import static org.junit.Assert.fail;
@@ -97,4 +111,58 @@ private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collec
97111

98112
return builder.build();
99113
}
114+
115+
@BMRules(rules = { @BMRule(name = "Abort anti-compaction after first call to onOperationStart",
116+
targetClass = "CompactionManager",
117+
targetMethod = "antiCompactGroup",
118+
condition = "not flagged(\"done\")",
119+
targetLocation = "AFTER INVOKE compactionRateLimiterAcquire",
120+
action = "org.apache.cassandra.db.compaction.CompactionManager.instance.stopCompaction(\"ANTICOMPACTION\");") } )
121+
@Test
122+
public void testStopAntiCompaction()
123+
{
124+
Assert.assertSame(ByteOrderedPartitioner.class, DatabaseDescriptor.getPartitioner().getClass());
125+
cfs.disableAutoCompaction();
126+
127+
// create 2 sstables, one that will be split, and another that will be moved
128+
for (int i = 0; i < 10; i++)
129+
{
130+
QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i);
131+
}
132+
cfs.forceBlockingFlush(UNIT_TESTS);
133+
for (int i = 10; i < 20; i++)
134+
{
135+
QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i);
136+
}
137+
cfs.forceBlockingFlush(UNIT_TESTS);
138+
139+
assertEquals(2, cfs.getLiveSSTables().size());
140+
assertEquals(0, cfs.getLiveSSTables().stream().filter(SSTableReader::isPendingRepair).count());
141+
142+
Token left = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes(5));
143+
Token right = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes(15));
144+
List<Range<Token>> ranges = Collections.singletonList(new Range<>(left, right));
145+
List<ColumnFamilyStore> tables = Collections.singletonList(cfs);
146+
147+
// create a repair session so the anti-compaction job can find it
148+
TimeUUID sessionID = TimeUUID.Generator.nextTimeUUID();
149+
ActiveRepairService.instance().registerParentRepairSession(sessionID, InetAddressAndPort.getLocalHost(), tables, ranges, true, 1, true, PreviewKind.NONE);
150+
151+
ExecutorService executor = Executors.newSingleThreadExecutor();
152+
try
153+
{
154+
PendingAntiCompaction pac = new PendingAntiCompaction(sessionID, tables, atEndpoint(ranges, NO_RANGES), executor, () -> false);
155+
Future<?> future = pac.run();
156+
Assertions.assertThatThrownBy(future::get)
157+
.hasCauseInstanceOf(CompactionInterruptedException.class)
158+
.hasMessageContaining("Compaction interrupted");
159+
}
160+
finally
161+
{
162+
executor.shutdown();
163+
}
164+
165+
assertEquals(2, cfs.getLiveSSTables().size());
166+
assertEquals(0, cfs.getLiveSSTables().stream().filter(SSTableReader::isPendingRepair).count());
167+
}
100168
}

0 commit comments

Comments
 (0)