Skip to content

Commit 951d666

Browse files
committed
Improve load generator testability and re-useability
1 parent 7bea803 commit 951d666

File tree

1 file changed

+29
-14
lines changed

1 file changed

+29
-14
lines changed

load-generator/src/main/java/com/dynatrace/research/shufflebench/KafkaLoadGenerator.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,14 @@ public class KafkaLoadGenerator {
1616

1717
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLoadGenerator.class);
1818

19-
private static final long SEED = 0xd966c7902a8716f9L;
19+
private final int executionTimeMs;
2020

2121
private final String seedString;
2222

2323
private final String kafkaBootstrapServers;
2424

2525
private final String kafkaTopic;
2626

27-
private final int executionTimeMs;
28-
2927
private final int numSources;
3028
private final int recordsPerSecondAndSource;
3129

@@ -36,16 +34,23 @@ public class KafkaLoadGenerator {
3634
private final List<KafkaSender> openKafkaSenders = new ArrayList<>();
3735
private final List<RecordSource> openRecordSources = new ArrayList<>();
3836

39-
public KafkaLoadGenerator() {
40-
final Config config = ConfigProvider.getConfig();
41-
executionTimeMs = config.getValue("execution.time.ms", Integer.class);
42-
seedString = config.getValue("seed.string", String.class);
43-
kafkaBootstrapServers = config.getValue("kafka.bootstrap.servers", String.class);
44-
kafkaTopic = config.getValue("kafka.topic", String.class);
45-
numSources = config.getValue("num.sources", Integer.class);
46-
recordsPerSecondAndSource = config.getValue("num.records.per.source.second", Integer.class);
47-
recordSizeInBytes = config.getValue("record.size.bytes", Integer.class);
48-
final int threadPoolSize = config.getValue("thread.pool.size", Integer.class);
37+
public KafkaLoadGenerator(
38+
int executionTimeMs,
39+
String seedString,
40+
String kafkaBootstrapServers,
41+
String kafkaTopic,
42+
int numSources,
43+
int recordsPerSecondAndSource,
44+
int recordSizeInBytes,
45+
int threadPoolSize
46+
) {
47+
this.executionTimeMs = executionTimeMs;
48+
this.seedString = seedString;
49+
this.kafkaBootstrapServers = kafkaBootstrapServers;
50+
this.kafkaTopic = kafkaTopic;
51+
this.numSources = numSources;
52+
this.recordsPerSecondAndSource = recordsPerSecondAndSource;
53+
this.recordSizeInBytes = recordSizeInBytes;
4954
this.executor = new ScheduledThreadPoolExecutor(threadPoolSize);
5055
}
5156

@@ -80,7 +85,17 @@ public void stop() throws InterruptedException, IOException {
8085
}
8186

8287
public static void main(String[] args) throws InterruptedException, IOException {
83-
final KafkaLoadGenerator kafkaLoadGenerator = new KafkaLoadGenerator();
88+
final Config config = ConfigProvider.getConfig();
89+
final KafkaLoadGenerator kafkaLoadGenerator = new KafkaLoadGenerator(
90+
config.getValue("execution.time.ms", Integer.class),
91+
config.getValue("seed.string", String.class),
92+
config.getValue("kafka.bootstrap.servers", String.class),
93+
config.getValue("kafka.topic", String.class),
94+
config.getValue("num.sources", Integer.class),
95+
config.getValue("num.records.per.source.second", Integer.class),
96+
config.getValue("record.size.bytes", Integer.class),
97+
config.getValue("thread.pool.size", Integer.class)
98+
);
8499
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
85100
LOGGER.info("Shut down load generator.");
86101
try {

0 commit comments

Comments
 (0)