Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion src/main/java/io/pravega/perf/PravegaPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public static void main(String[] args) {
options.addOption("controller", true, "Controller URI");
options.addOption("scope", true, "Scope name");
options.addOption("stream", true, "Stream name");
options.addOption("streamRetentionType", true, "Retention type for stream: TIME/SIZE");
options.addOption("streamRetentionValue", true, "Retention value for stream. If retention" +
"type is by TIME then give value as no of minutes else if retention type is by SIZE then give value " +
"as size in bytes.");
options.addOption("producers", true, "Number of producers");
options.addOption("consumers", true, "Number of consumers");
options.addOption("events", true,
Expand Down Expand Up @@ -195,6 +199,8 @@ static private abstract class Test {
final String controllerUri;
final int messageSize;
final String streamName;
final String retentionType;
final int retentionValue;
final String rdGrpName;
final String scopeName;
final boolean recreate;
Expand Down Expand Up @@ -257,6 +263,8 @@ static private abstract class Test {
messageSize = parseIntOption(commandline, "size", 0);
streamName = parseStringOption(commandline, "stream", null);
scopeName = parseStringOption(commandline, "scope", SCOPE);
retentionType = parseStringOption(commandline, "streamRetentionType", null);
retentionValue = parseIntOption(commandline, "streamRetentionValue", 0);
transactionPerCommit = parseIntOption(commandline, "transactionspercommit", 0);
segmentCount = parseIntOption(commandline, "segments", producerCount);
segmentScaleKBps = parseIntOption(commandline, "segmentScaleKBps", 0);
Expand Down Expand Up @@ -311,6 +319,16 @@ static private abstract class Test {
throw new IllegalArgumentException("Error: Must specify stream Name");
}

if (retentionType != null && !retentionType.equalsIgnoreCase("TIME")
&& !retentionType.equalsIgnoreCase("SIZE")) {
throw new IllegalArgumentException("Error: Retention type can be TIME and SIZE only");
}

if (retentionType != null && retentionValue == 0) {
throw new IllegalArgumentException("Error: Must specify the retention value (for time it is no of " +
"minutes and for size it is size in byte)");
}

if (producerCount == 0 && consumerCount == 0) {
throw new IllegalArgumentException("Error: Must specify the number of producers or Consumers");
}
Expand Down Expand Up @@ -434,7 +452,8 @@ static private class PravegaTest extends Test {
bgExecutor);

streamHandle = new PravegaStreamHandler(scopeName, streamName, rdGrpName, controllerUri, segmentCount,
segmentScaleKBps, segmentScaleEventsPerSecond, scaleFactor, TIMEOUT, controller, bgExecutor, createScope);
segmentScaleKBps, segmentScaleEventsPerSecond, scaleFactor, TIMEOUT, controller, bgExecutor,
createScope, retentionType, retentionValue);

if (producerCount > 0 && segmentCount > 0 && !streamHandle.create()) {
if (recreate) {
Expand Down Expand Up @@ -514,6 +533,8 @@ public String toString() {
return "streamName='" + streamName + '\'' +
", rdGrpName='" + rdGrpName + '\'' +
", scopeName='" + scopeName + '\'' +
", retentionType=" + retentionType +
", retentionValue=" + retentionValue +
", recreate=" + recreate +
", writeAndRead=" + writeAndRead +
", producerCount=" + producerCount +
Expand Down
29 changes: 25 additions & 4 deletions src/main/java/io/pravega/perf/PravegaStreamHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

package io.pravega.perf;

import io.pravega.client.stream.RetentionPolicy;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -55,13 +57,16 @@ public class PravegaStreamHandler {
final int segmentScaleKBps;
final int segmentScaleEventsPerSecond;
final int scaleFactor;
final String retentionType;
final int retentionValue;

ReaderGroupManager readerGroupManager;
ReaderGroupConfig rdGrpConfig;

PravegaStreamHandler(String scope, String stream, String rdGrpName, String uri, int segments, int segmentScaleKBps,
int segmentScaleEventsPerSecond, int scaleFactor, int timeout, ControllerImpl controller,
ScheduledExecutorService bgexecutor, boolean createScope) throws Exception {
ScheduledExecutorService bgexecutor, boolean createScope, String retentionType,
int retentionValue) throws Exception {
this.scope = scope;
this.stream = stream;
this.rdGrpName = rdGrpName;
Expand All @@ -74,6 +79,8 @@ public class PravegaStreamHandler {
this.segmentScaleKBps = segmentScaleKBps;
this.segmentScaleEventsPerSecond = segmentScaleEventsPerSecond;
this.streamManager = StreamManager.create(new URI(uri));
this.retentionType = retentionType;
this.retentionValue = retentionValue;

if (createScope) {
this.streamManager.createScope(scope);
Expand Down Expand Up @@ -179,9 +186,23 @@ private StreamConfiguration getStreamConfig() {
} else if (segmentScaleEventsPerSecond > 0) {
scalingPolicy = ScalingPolicy.byEventRate(segmentScaleEventsPerSecond, scaleFactor, segCount);
}
if (retentionType == null) {
return StreamConfiguration.builder()
.scalingPolicy(scalingPolicy)
.build();
} else {
RetentionPolicy retentionPolicy = null;
if (retentionType.equalsIgnoreCase("TIME")) {
retentionPolicy = RetentionPolicy.byTime(Duration.ofMinutes(retentionValue));
} else {
retentionPolicy = RetentionPolicy.bySizeBytes(retentionValue);
}
return StreamConfiguration.builder()
.scalingPolicy(scalingPolicy)
.retentionPolicy(retentionPolicy)
.build();
}


return StreamConfiguration.builder()
.scalingPolicy(scalingPolicy)
.build();
}
}