Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 96 additions & 79 deletions test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,102 +38,119 @@
import org.apache.accumulo.core.clientImpl.ThriftScanner;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class TimeoutIT extends AccumuloClusterHarness {
public class TimeoutIT extends SharedMiniClusterBase {

@Override
protected Duration defaultTimeout() {
return Duration.ofSeconds(75);
}

@Test
public void run() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
String[] tableNames = getUniqueNames(3);
testBatchWriterTimeout(client, tableNames[0]);
testBatchScannerTimeout(client, tableNames[1]);
testScannerTimeout(client, tableNames[2]);
}
@BeforeAll
public static void setup() throws Exception {
SharedMiniClusterBase.startMiniCluster();
}

public void testBatchWriterTimeout(AccumuloClient client, String tableName) throws Exception {
client.tableOperations().create(tableName);
client.tableOperations().addConstraint(tableName, SlowConstraint.class.getName());

// give constraint time to propagate through zookeeper
sleepUninterruptibly(1, TimeUnit.SECONDS);

BatchWriter bw = client.createBatchWriter(tableName,
new BatchWriterConfig().setTimeout(3, TimeUnit.SECONDS));

Mutation mut = new Mutation("r1");
mut.put("cf1", "cq1", "v1");

bw.addMutation(mut);
var mre =
assertThrows(MutationsRejectedException.class, bw::close, "batch writer did not timeout");
if (mre.getCause() instanceof TimedOutException) {
return;
}
throw mre;
@AfterAll
public static void tearDown() throws Exception {
SharedMiniClusterBase.stopMiniCluster();
}

public void testBatchScannerTimeout(AccumuloClient client, String tableName) throws Exception {
client.tableOperations().create(tableName);

try (BatchWriter bw = client.createBatchWriter(tableName)) {
Mutation m = new Mutation("r1");
m.put("cf1", "cq1", "v1");
m.put("cf1", "cq2", "v2");
m.put("cf1", "cq3", "v3");
m.put("cf1", "cq4", "v4");
bw.addMutation(m);
}

try (BatchScanner bs = client.createBatchScanner(tableName)) {
bs.setRanges(Collections.singletonList(new Range()));

// should not timeout
bs.setTimeout(5, TimeUnit.SECONDS);
bs.forEach((k, v) -> {});

IteratorSetting iterSetting = new IteratorSetting(100, SlowIterator.class);
iterSetting.addOption("sleepTime", 2000 + "");
bs.addScanIterator(iterSetting);

assertThrows(TimedOutException.class, () -> bs.iterator().next(),
"batch scanner did not time out");
@Test
public void testBatchWriterTimeout() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
String[] tableNames = getUniqueNames(1);
String tableName = tableNames[0];
client.tableOperations().create(tableName);
client.tableOperations().addConstraint(tableName, SlowConstraint.class.getName());

// give constraint time to propagate through zookeeper
sleepUninterruptibly(1, TimeUnit.SECONDS);

BatchWriter bw = client.createBatchWriter(tableName,
new BatchWriterConfig().setTimeout(3, TimeUnit.SECONDS));

Mutation mut = new Mutation("r1");
mut.put("cf1", "cq1", "v1");

bw.addMutation(mut);
var mre =
assertThrows(MutationsRejectedException.class, bw::close, "batch writer did not timeout");
if (mre.getCause() instanceof TimedOutException) {
return;
}
throw mre;
}
}

public void testScannerTimeout(AccumuloClient client, String tableName) throws Exception {
client.tableOperations().create(tableName);

try (BatchWriter bw = client.createBatchWriter(tableName)) {
Mutation m = new Mutation("r1");
m.put("cf1", "cq1", "v1");
m.put("cf1", "cq2", "v2");
m.put("cf1", "cq3", "v3");
m.put("cf1", "cq4", "v4");
bw.addMutation(m);
@Test
public void testBatchScannerTimeout() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
String[] tableNames = getUniqueNames(1);
String tableName = tableNames[0];
client.tableOperations().create(tableName);

try (BatchWriter bw = client.createBatchWriter(tableName)) {
Mutation m = new Mutation("r1");
m.put("cf1", "cq1", "v1");
m.put("cf1", "cq2", "v2");
m.put("cf1", "cq3", "v3");
m.put("cf1", "cq4", "v4");
bw.addMutation(m);
}

try (BatchScanner bs = client.createBatchScanner(tableName)) {
bs.setRanges(Collections.singletonList(new Range()));

// should not timeout
bs.setTimeout(5, TimeUnit.SECONDS);
bs.forEach((k, v) -> {});

IteratorSetting iterSetting = new IteratorSetting(100, SlowIterator.class);
iterSetting.addOption("sleepTime", 2000 + "");
bs.addScanIterator(iterSetting);

assertThrows(TimedOutException.class, () -> bs.iterator().next(),
"batch scanner did not time out");
}
}
}

try (Scanner scanner = client.createScanner(tableName)) {
scanner.setRange(new Range());

// should not timeout
scanner.setTimeout(5, TimeUnit.SECONDS);
scanner.forEach((k, v) -> {});

IteratorSetting iterSetting = new IteratorSetting(100, SlowIterator.class);
iterSetting.addOption("sleepTime", 6000 + "");
scanner.addScanIterator(iterSetting);

var exception = assertThrows(RuntimeException.class, () -> scanner.iterator().next(),
"scanner did not time out");
assertEquals(ThriftScanner.ScanTimedOutException.class, exception.getCause().getClass());
@Test
public void testScannerTimeout() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
String[] tableNames = getUniqueNames(1);
String tableName = tableNames[0];
client.tableOperations().create(tableName);

try (BatchWriter bw = client.createBatchWriter(tableName)) {
Mutation m = new Mutation("r1");
m.put("cf1", "cq1", "v1");
m.put("cf1", "cq2", "v2");
m.put("cf1", "cq3", "v3");
m.put("cf1", "cq4", "v4");
bw.addMutation(m);
}

try (Scanner scanner = client.createScanner(tableName)) {
scanner.setRange(new Range());

// should not timeout
scanner.setTimeout(5, TimeUnit.SECONDS);
scanner.forEach((k, v) -> {});

IteratorSetting iterSetting = new IteratorSetting(100, SlowIterator.class);
iterSetting.addOption("sleepTime", 6000 + "");
scanner.addScanIterator(iterSetting);

var exception = assertThrows(RuntimeException.class, () -> scanner.iterator().next(),
"scanner did not time out");
assertEquals(ThriftScanner.ScanTimedOutException.class, exception.getCause().getClass());
}
}
}
}