From 163da484239b40868da762c228d3f2c2db29cbdb Mon Sep 17 00:00:00 2001 From: Claudio Fahey Date: Wed, 4 Sep 2019 20:11:03 +0000 Subject: [PATCH 1/5] Issue 56: Number of unacked events and bytes are now periodically printed Signed-off-by: Claudio Fahey --- src/main/java/io/pravega/perf/PerfStats.java | 44 +++++++++++++------ .../io/pravega/perf/PravegaWriterWorker.java | 1 + .../java/io/pravega/perf/TriConsumer.java | 2 +- .../java/io/pravega/perf/WriterWorker.java | 4 +- 4 files changed, 35 insertions(+), 16 deletions(-) diff --git a/src/main/java/io/pravega/perf/PerfStats.java b/src/main/java/io/pravega/perf/PerfStats.java index 30008c43a..608ff57fb 100644 --- a/src/main/java/io/pravega/perf/PerfStats.java +++ b/src/main/java/io/pravega/perf/PerfStats.java @@ -64,6 +64,10 @@ private TimeStamp(long endTime) { private boolean isEnd() { return this.bytes == -1 && this.startTime == -1; } + + private boolean isAcked() { + return this.endTime != -1; + } } public PerfStats(String action, int reportingInterval, int messageSize, String csvFile) { @@ -100,21 +104,35 @@ public Void call() throws IOException { long time = startTime; long idleCount = 0; TimeStamp t; + long sentEvents = 0; + long sentBytes = 0; + long ackedEvents = 0; + long ackedBytes = 0; while (doWork) { t = queue.poll(); if (t != null) { - if (t.isEnd()) { - doWork = false; + if (t.isAcked()) { + if (t.isEnd()) { + doWork = false; + } else { + ackedEvents++; + ackedBytes += t.bytes; + final int latency = (int) (t.endTime - t.startTime); + window.record(t.bytes, latency); + latencyRecorder.record(t.startTime, t.bytes, latency); + } + time = t.endTime; + if (window.windowTimeMS(time) > windowInterval) { + window.print(time); + window.reset(time); + double unAckedMiB = (sentBytes - ackedBytes) / 1024.0 / 1024.0; + long unAckedEvents = sentEvents - ackedEvents; + System.out.printf("%.3f MiB in %d events unacked", unAckedMiB, unAckedEvents); + } } else { - final int latency = (int) (t.endTime - t.startTime); - window.record(t.bytes, latency); - latencyRecorder.record(t.startTime, t.bytes, latency); - } - time = t.endTime; - if (window.windowTimeMS(time) > windowInterval) { - window.print(time); - window.reset(time); + sentEvents++; + sentBytes += t.bytes; } } else { LockSupport.parkNanos(PARK_NS); @@ -184,7 +202,7 @@ private void print(long time) { final double recsPerSec = count / elapsed; final double mbPerSec = (this.bytes / (1024.0 * 1024.0)) / elapsed; - System.out.printf("%8d records %s, %9.1f records/sec, %6.2f MB/sec, %7.1f ms avg latency, %7.1f ms max latency\n", + System.out.printf("%8d records %s, %9.1f records/sec, %6.2f MiB/sec, %7.1f ms avg latency, %7.1f ms max latency\n", count, action, recsPerSec, mbPerSec, totalLatency / (double) count, (double) maxLatency); } @@ -361,10 +379,10 @@ public synchronized void shutdown(long endTime) throws ExecutionException, Inter * Record the data write/read time of data. * * @param startTime starting time - * @param endTime End time + * @param endTime End time (-1 indicates that ack has not been received) * @param bytes number of bytes written or read **/ public void recordTime(long startTime, long endTime, int bytes) { queue.add(new TimeStamp(startTime, endTime, bytes)); } -} \ No newline at end of file +} diff --git a/src/main/java/io/pravega/perf/PravegaWriterWorker.java b/src/main/java/io/pravega/perf/PravegaWriterWorker.java index 9ad293daf..ea75784bf 100644 --- a/src/main/java/io/pravega/perf/PravegaWriterWorker.java +++ b/src/main/java/io/pravega/perf/PravegaWriterWorker.java @@ -41,6 +41,7 @@ public class PravegaWriterWorker extends WriterWorker { public long recordWrite(byte[] data, TriConsumer record) { CompletableFuture ret; final long time = System.currentTimeMillis(); + record.accept(time, -1, data.length); ret = producer.writeEvent(data); ret.thenAccept(d -> { record.accept(time, System.currentTimeMillis(), data.length); diff --git a/src/main/java/io/pravega/perf/TriConsumer.java b/src/main/java/io/pravega/perf/TriConsumer.java index 6d0b1c23b..41b52cd2e 100644 --- a/src/main/java/io/pravega/perf/TriConsumer.java +++ b/src/main/java/io/pravega/perf/TriConsumer.java @@ -11,5 +11,5 @@ package io.pravega.perf; public interface TriConsumer { - void accept(long a, long b, int c); + void accept(long startTime, long endTime, int bytes); } diff --git a/src/main/java/io/pravega/perf/WriterWorker.java b/src/main/java/io/pravega/perf/WriterWorker.java index 7a87f5b9c..81ea36a18 100644 --- a/src/main/java/io/pravega/perf/WriterWorker.java +++ b/src/main/java/io/pravega/perf/WriterWorker.java @@ -79,7 +79,7 @@ private Performance createBenchmark() { /** - * Writes the data and benchmark. + * Writes the data and records statistics. * * @param data data to write * @param record to call for benchmarking @@ -88,7 +88,7 @@ private Performance createBenchmark() { public abstract long recordWrite(byte[] data, TriConsumer record); /** - * Writes the data and benchmark. + * Writes the data. Does not record statistics. * * @param data data to write */ From 2fa333e17994d536a99e7d2edd0d6291101d353a Mon Sep 17 00:00:00 2001 From: Claudio Fahey Date: Wed, 4 Sep 2019 20:13:12 +0000 Subject: [PATCH 2/5] Added Dockerfile and Kubernetes job Signed-off-by: Claudio Fahey --- .dockerignore | 5 +++ .gitignore | 4 ++ Dockerfile | 48 ++++++++++++++++++++++ build.gradle | 32 +++++++-------- gradle.properties | 16 ++++++++ gradle/wrapper/gradle-wrapper.properties | 3 +- scripts/build-docker.sh | 11 +++++ scripts/pravega-benchmark-k8s.sh | 5 +++ scripts/pravega-benchmark.yaml | 39 ++++++++++++++++++ src/main/resources/simplelogger.properties | 34 +++++++++++++++ 10 files changed, 179 insertions(+), 18 deletions(-) create mode 100644 .dockerignore create mode 100644 Dockerfile create mode 100644 gradle.properties create mode 100755 scripts/build-docker.sh create mode 100755 scripts/pravega-benchmark-k8s.sh create mode 100644 scripts/pravega-benchmark.yaml create mode 100644 src/main/resources/simplelogger.properties diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 000000000..ffb02389b --- /dev/null +++ b/.dockerignore @@ -0,0 +1,5 @@ +.git +.gradle +.idea +build +out diff --git a/.gitignore b/.gitignore index 8b40a188e..035d4156d 100644 --- a/.gitignore +++ b/.gitignore @@ -7,12 +7,16 @@ build .classpath .DS_Store bin/ +ca-certificates/ +!ca-certificates/README.txt classes/ .settings .gradle output .idea target +lib/ +!lib/README.txt log/ out/ .metadata/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..303e99c11 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,48 @@ +# Building Container + +FROM gradle:4.10-jdk8 as GradleBuilder + +USER 0 + +COPY ca-certificates/* /usr/local/share/ca-certificates/ +RUN update-ca-certificates + +RUN apt-get update \ + && apt-get install -y \ + maven \ + && rm -rf /var/lib/apt/lists/* + +USER gradle + +COPY --chown=gradle:gradle build.gradle /home/gradle/src/build.gradle +COPY --chown=gradle:gradle gradle /home/gradle/src/gradle +COPY --chown=gradle:gradle gradle.properties /home/gradle/src/gradle.properties +COPY --chown=gradle:gradle settings.gradle /home/gradle/src/settings.gradle +COPY --chown=gradle:gradle lib /home/gradle/src/lib +COPY --chown=gradle:gradle src /home/gradle/src/src + +WORKDIR /home/gradle/src + +ENV GRADLE_USER_HOME=/home/gradle +ENV CREDENTIALS_VERSION=0.5.0-2306.a5a5cdf-0.11.10-002.985e705 + +RUN mvn install:install-file \ +-Dfile=lib/pravega-keycloak-credentials-${CREDENTIALS_VERSION}-shadow.jar \ +-DgroupId=io.pravega \ +-DartifactId=pravega-keycloak-credentials \ +-Dversion=${CREDENTIALS_VERSION} -Dpackaging=jar + +RUN gradle installDist \ +--no-daemon --info --stacktrace \ +-PincludePravegaCredentials=true \ +-PpravegaCredentialsVersion=${CREDENTIALS_VERSION} + +# Runtime Container + +FROM openjdk:8-jre + +ENV APP_NAME=pravega-benchmark + +COPY --from=GradleBuilder /home/gradle/src/build/install/${APP_NAME} /opt/${APP_NAME} + +ENTRYPOINT ["/opt/pravega-benchmark/bin/pravega-benchmark"] diff --git a/build.gradle b/build.gradle index 101678b48..72a9913a3 100644 --- a/build.gradle +++ b/build.gradle @@ -13,29 +13,27 @@ apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'application' +mainClassName = "io.pravega.perf.PravegaPerfTest" -buildscript { - repositories { - jcenter() - } +repositories { + jcenter() + mavenLocal() + mavenCentral() } - repositories { - mavenLocal() - jcenter() - mavenCentral() - } +dependencies { + compile "io.pravega:pravega-client:${pravegaVersion}", + "io.pravega:pravega-common:${pravegaVersion}", + "commons-cli:commons-cli:${commonsCLIVersion}", + "org.apache.commons:commons-csv:1.5" - dependencies { + if (includePravegaCredentials.toBoolean()) { + compile "io.pravega:pravega-keycloak-credentials:${pravegaCredentialsVersion}" + } - compile "io.pravega:pravega-client:0.5.0", - "io.pravega:pravega-common:0.5.0", - "commons-cli:commons-cli:1.3.1", - "org.apache.commons:commons-csv:1.5" + runtime "org.slf4j:slf4j-simple:1.7.14" +} - runtime "org.slf4j:slf4j-simple:1.7.14" - } -mainClassName = "io.pravega.perf.PravegaPerfTest" startScripts { doLast { unixScript.text = unixScript.text.replace('SERVER_APP_HOME', '\$APP_HOME') diff --git a/gradle.properties b/gradle.properties new file mode 100644 index 000000000..e0761ca45 --- /dev/null +++ b/gradle.properties @@ -0,0 +1,16 @@ +# +# Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# + +commonsCLIVersion=1.3.1 +pravegaCredentialsVersion=0.5.0-2306.a5a5cdf-0.11.10-002.985e705 +pravegaVersion=0.5.0 + +# Set below to true when using Pravega in Nautilus. +includePravegaCredentials=true diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index fb7ef980f..3ee08cf6c 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,6 @@ +#Wed Sep 04 14:34:44 UTC 2019 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-all.zip diff --git a/scripts/build-docker.sh b/scripts/build-docker.sh new file mode 100755 index 000000000..878c4c14c --- /dev/null +++ b/scripts/build-docker.sh @@ -0,0 +1,11 @@ +#! /bin/bash +set -ex + +: ${DOCKER_REPOSITORY?"You must export DOCKER_REPOSITORY"} +: ${IMAGE_TAG?"You must export IMAGE_TAG"} + +ROOT_DIR=$(dirname $0)/.. + +docker build -f ${ROOT_DIR}/Dockerfile ${ROOT_DIR} --tag ${DOCKER_REPOSITORY}/pravega-benchmark:${IMAGE_TAG} + +docker push ${DOCKER_REPOSITORY}/pravega-benchmark:${IMAGE_TAG} diff --git a/scripts/pravega-benchmark-k8s.sh b/scripts/pravega-benchmark-k8s.sh new file mode 100755 index 000000000..da9ff4e43 --- /dev/null +++ b/scripts/pravega-benchmark-k8s.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash +kubectl delete -f pravega-benchmark.yaml -n examples +kubectl apply -f pravega-benchmark.yaml -n examples +sleep 5s +kubectl logs -f jobs/pravega-benchmark -n examples diff --git a/scripts/pravega-benchmark.yaml b/scripts/pravega-benchmark.yaml new file mode 100644 index 000000000..d09e23843 --- /dev/null +++ b/scripts/pravega-benchmark.yaml @@ -0,0 +1,39 @@ +kind: Job +apiVersion: batch/v1 +metadata: + name: pravega-benchmark +spec: + parallelism: 3 + template: + spec: + serviceAccount: examples-pravega + restartPolicy: Never + containers: + - name: benchmark + image: claudiofahey/pravega-benchmark:0.5.0 + imagePullPolicy: Always + resources: + limits: + cpu: "2" + memory: "4Gi" + requests: + cpu: "1" + memory: "4Gi" + args: [ + "-controller", "tcp://nautilus-pravega-controller.nautilus-pravega.svc.cluster.local:9090", + "-scope", "examples", + "-stream", "benchmark5", + #"-recreate", "1", + "-segments", "96", + "-producers", "1", + "-time", "3600", # Number of seconds to run + "-size", "524288", +# "-throughput", "30", + ] + env: + - name: JAVA_OPTS + value: "-Xmx2g -Xms1g" + - name: pravega_client_auth_method + value: Bearer + - name: pravega_client_auth_loadDynamic + value: "true" diff --git a/src/main/resources/simplelogger.properties b/src/main/resources/simplelogger.properties new file mode 100644 index 000000000..eb7843b22 --- /dev/null +++ b/src/main/resources/simplelogger.properties @@ -0,0 +1,34 @@ +# SLF4J's SimpleLogger configuration file +# Simple implementation of Logger that sends all enabled log messages, for all defined loggers, to System.err. + +# Default logging detail level for all instances of SimpleLogger. +# Must be one of ("trace", "debug", "info", "warn", or "error"). +# If not specified, defaults to "info". +org.slf4j.simpleLogger.defaultLogLevel=info + +# Logging detail level for a SimpleLogger instance named "xxxxx". +# Must be one of ("trace", "debug", "info", "warn", or "error"). +# If not specified, the default logging detail level is used. +#org.slf4j.simpleLogger.log.xxxxx= + +# Set to true if you want the current date and time to be included in output messages. +# Default is false, and will output the number of milliseconds elapsed since startup. +#org.slf4j.simpleLogger.showDateTime=false + +# The date and time format to be used in the output messages. +# The pattern describing the date and time format is the same that is used in java.text.SimpleDateFormat. +# If the format is not specified or is invalid, the default format is used. +# The default format is yyyy-MM-dd HH:mm:ss:SSS Z. +#org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS Z + +# Set to true if you want to output the current thread name. +# Defaults to true. +#org.slf4j.simpleLogger.showThreadName=true + +# Set to true if you want the Logger instance name to be included in output messages. +# Defaults to true. +#org.slf4j.simpleLogger.showLogName=true + +# Set to true if you want the last component of the name to be included in output messages. +# Defaults to false. +#org.slf4j.simpleLogger.showShortLogName=false \ No newline at end of file From 863ac8cd37a1b6ace07941be27c75a82f15c58be Mon Sep 17 00:00:00 2001 From: Claudio Fahey Date: Wed, 4 Sep 2019 20:13:40 +0000 Subject: [PATCH 3/5] Removed createScope to allow this to run in Nautilus Signed-off-by: Claudio Fahey --- src/main/java/io/pravega/perf/PravegaStreamHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/io/pravega/perf/PravegaStreamHandler.java b/src/main/java/io/pravega/perf/PravegaStreamHandler.java index 83e430cd3..2596c042c 100644 --- a/src/main/java/io/pravega/perf/PravegaStreamHandler.java +++ b/src/main/java/io/pravega/perf/PravegaStreamHandler.java @@ -66,7 +66,6 @@ public class PravegaStreamHandler { this.timeout = timeout; this.bgexecutor = bgexecutor; streamManager = StreamManager.create(new URI(uri)); - streamManager.createScope(scope); streamconfig = StreamConfiguration.builder().scope(scope).streamName(stream) .scalingPolicy(ScalingPolicy.fixed(segCount)) .build(); From e1d65995f0b60eba8e482ddc1fb4a76e9a2e2893 Mon Sep 17 00:00:00 2001 From: Claudio Fahey Date: Thu, 5 Sep 2019 20:40:13 +0000 Subject: [PATCH 4/5] Added option to disable connection pooling for producers Signed-off-by: Claudio Fahey --- scripts/pravega-benchmark-k8s.sh | 7 ++++--- scripts/pravega-benchmark.yaml | 7 ++++--- src/main/java/io/pravega/perf/PravegaPerfTest.java | 8 ++++++-- .../perf/PravegaTransactionWriterWorker.java | 4 ++-- .../java/io/pravega/perf/PravegaWriterWorker.java | 13 +++++++++++-- 5 files changed, 27 insertions(+), 12 deletions(-) diff --git a/scripts/pravega-benchmark-k8s.sh b/scripts/pravega-benchmark-k8s.sh index da9ff4e43..5458688c9 100755 --- a/scripts/pravega-benchmark-k8s.sh +++ b/scripts/pravega-benchmark-k8s.sh @@ -1,5 +1,6 @@ #!/usr/bin/env bash -kubectl delete -f pravega-benchmark.yaml -n examples -kubectl apply -f pravega-benchmark.yaml -n examples +NAMESPACE=${NAMESPACE:-examples} +kubectl delete -f pravega-benchmark.yaml -n ${NAMESPACE} +kubectl apply -f pravega-benchmark.yaml -n ${NAMESPACE} sleep 5s -kubectl logs -f jobs/pravega-benchmark -n examples +kubectl logs -f jobs/pravega-benchmark -n ${NAMESPACE} diff --git a/scripts/pravega-benchmark.yaml b/scripts/pravega-benchmark.yaml index d09e23843..e54408c12 100644 --- a/scripts/pravega-benchmark.yaml +++ b/scripts/pravega-benchmark.yaml @@ -6,7 +6,7 @@ spec: parallelism: 3 template: spec: - serviceAccount: examples-pravega + serviceAccount: examples3-pravega restartPolicy: Never containers: - name: benchmark @@ -21,14 +21,15 @@ spec: memory: "4Gi" args: [ "-controller", "tcp://nautilus-pravega-controller.nautilus-pravega.svc.cluster.local:9090", - "-scope", "examples", + "-scope", "examples3", "-stream", "benchmark5", #"-recreate", "1", "-segments", "96", "-producers", "1", "-time", "3600", # Number of seconds to run "-size", "524288", -# "-throughput", "30", + "-throughput", "30", + "-enableConnectionPooling", "false", ] env: - name: JAVA_OPTS diff --git a/src/main/java/io/pravega/perf/PravegaPerfTest.java b/src/main/java/io/pravega/perf/PravegaPerfTest.java index e90cc9638..a0609030c 100644 --- a/src/main/java/io/pravega/perf/PravegaPerfTest.java +++ b/src/main/java/io/pravega/perf/PravegaPerfTest.java @@ -80,6 +80,7 @@ public static void main(String[] args) { "if -1, get the maximum throughput"); options.addOption("writecsv", true, "CSV file to record write latencies"); options.addOption("readcsv", true, "CSV file to record read latencies"); + options.addOption("enableConnectionPooling", true, "enable connection pooling"); options.addOption("help", false, "Help message"); @@ -194,6 +195,7 @@ static private abstract class Test { final PerfStats produceStats; final PerfStats consumeStats; final long startTime; + final boolean enableConnectionPooling; Test(long startTime, CommandLine commandline) throws IllegalArgumentException { this.startTime = startTime; @@ -293,6 +295,8 @@ static private abstract class Test { readFile = null; } + enableConnectionPooling = Boolean.parseBoolean(commandline.getOptionValue("enableConnectionPooling", "true")); + if (controllerUri == null) { throw new IllegalArgumentException("Error: Must specify Controller IP address"); } @@ -431,7 +435,7 @@ public List getProducers() { messageSize, startTime, produceStats, streamName, eventsPerSec, writeAndRead, factory, - transactionPerCommit)) + transactionPerCommit, enableConnectionPooling)) .collect(Collectors.toList()); } else { writers = IntStream.range(0, producerCount) @@ -439,7 +443,7 @@ public List getProducers() { .map(i -> new PravegaWriterWorker(i, eventsPerProducer, EventsPerFlush, runtimeSec, false, messageSize, startTime, produceStats, - streamName, eventsPerSec, writeAndRead, factory)) + streamName, eventsPerSec, writeAndRead, factory, enableConnectionPooling)) .collect(Collectors.toList()); } } else { diff --git a/src/main/java/io/pravega/perf/PravegaTransactionWriterWorker.java b/src/main/java/io/pravega/perf/PravegaTransactionWriterWorker.java index 7bb93a13f..c0805cf68 100644 --- a/src/main/java/io/pravega/perf/PravegaTransactionWriterWorker.java +++ b/src/main/java/io/pravega/perf/PravegaTransactionWriterWorker.java @@ -29,10 +29,10 @@ public class PravegaTransactionWriterWorker extends PravegaWriterWorker { int secondsToRun, boolean isRandomKey, int messageSize, long start, PerfStats stats, String streamName, int eventsPerSec, boolean writeAndRead, - ClientFactory factory, int transactionsPerCommit) { + ClientFactory factory, int transactionsPerCommit, boolean enableConnectionPooling) { super(sensorId, events, Integer.MAX_VALUE, secondsToRun, isRandomKey, - messageSize, start, stats, streamName, eventsPerSec, writeAndRead, factory); + messageSize, start, stats, streamName, eventsPerSec, writeAndRead, factory, enableConnectionPooling); this.transactionsPerCommit = transactionsPerCommit; eventCount = 0; diff --git a/src/main/java/io/pravega/perf/PravegaWriterWorker.java b/src/main/java/io/pravega/perf/PravegaWriterWorker.java index ea75784bf..3230cf93b 100644 --- a/src/main/java/io/pravega/perf/PravegaWriterWorker.java +++ b/src/main/java/io/pravega/perf/PravegaWriterWorker.java @@ -16,25 +16,34 @@ import io.pravega.client.ClientFactory; import io.pravega.client.stream.impl.ByteArraySerializer; import io.pravega.client.stream.EventWriterConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class for Pravega writer/producer. */ public class PravegaWriterWorker extends WriterWorker { + private static Logger log = LoggerFactory.getLogger(PravegaWriterWorker.class); + final EventStreamWriter producer; PravegaWriterWorker(int sensorId, int events, int EventsPerFlush, int secondsToRun, boolean isRandomKey, int messageSize, long start, PerfStats stats, String streamName, int eventsPerSec, - boolean writeAndRead, ClientFactory factory) { + boolean writeAndRead, ClientFactory factory, + boolean enableConnectionPooling) { super(sensorId, events, EventsPerFlush, secondsToRun, isRandomKey, messageSize, start, stats, streamName, eventsPerSec, writeAndRead); + log.info("enableConnectionPooling={}", enableConnectionPooling); + this.producer = factory.createEventWriter(streamName, new ByteArraySerializer(), - EventWriterConfig.builder().build()); + EventWriterConfig.builder() + .enableConnectionPooling(enableConnectionPooling) + .build()); } @Override From 263fb2638c378944e94602eb2a4c6fccfb3c8981 Mon Sep 17 00:00:00 2001 From: Claudio Fahey Date: Fri, 6 Sep 2019 03:42:42 +0000 Subject: [PATCH 5/5] Statistics are now logged using the slf4j logger so that timestamps are printed Signed-off-by: Claudio Fahey --- src/main/java/io/pravega/perf/PerfStats.java | 23 ++++++++++++-------- src/main/resources/simplelogger.properties | 4 ++-- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/main/java/io/pravega/perf/PerfStats.java b/src/main/java/io/pravega/perf/PerfStats.java index 608ff57fb..aa0cd0eb2 100644 --- a/src/main/java/io/pravega/perf/PerfStats.java +++ b/src/main/java/io/pravega/perf/PerfStats.java @@ -24,6 +24,8 @@ import org.apache.commons.csv.CSVPrinter; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; @@ -33,6 +35,8 @@ * Class for Performance statistics. */ public class PerfStats { + private static Logger log = LoggerFactory.getLogger(PerfStats.class); + final private String action; final private String csvFile; final private int messageSize; @@ -124,11 +128,10 @@ public Void call() throws IOException { } time = t.endTime; if (window.windowTimeMS(time) > windowInterval) { - window.print(time); - window.reset(time); double unAckedMiB = (sentBytes - ackedBytes) / 1024.0 / 1024.0; long unAckedEvents = sentEvents - ackedEvents; - System.out.printf("%.3f MiB in %d events unacked", unAckedMiB, unAckedEvents); + log.info(String.format("%s, %.3f MiB in %d events unacked", window.toString(time), unAckedMiB, unAckedEvents)); + window.reset(time); } } else { sentEvents++; @@ -141,7 +144,7 @@ public Void call() throws IOException { time = System.currentTimeMillis(); idleCount = 0; if (window.windowTimeMS(time) > windowInterval) { - window.print(time); + log.info(window.toString(time)); window.reset(time); } } @@ -193,16 +196,18 @@ private void record(long bytes, int latency) { } /** - * Print the window statistics + * Get the window statistics as a string + * + * Note that record and byte counters are for acked events. */ - private void print(long time) { + private String toString(long time) { this.lastTime = time; assert this.lastTime > this.startTime : "Invalid Start and EndTime"; final double elapsed = (this.lastTime - this.startTime) / 1000.0; final double recsPerSec = count / elapsed; final double mbPerSec = (this.bytes / (1024.0 * 1024.0)) / elapsed; - System.out.printf("%8d records %s, %9.1f records/sec, %6.2f MiB/sec, %7.1f ms avg latency, %7.1f ms max latency\n", + return String.format("%8d records %s, %9.1f records/sec, %6.2f MiB/sec, %7.1f ms avg latency, %7.1f ms max latency", count, action, recsPerSec, mbPerSec, totalLatency / (double) count, (double) maxLatency); } @@ -292,11 +297,11 @@ public void printTotal(long endTime) { final double mbPerSec = (this.totalBytes / (1024.0 * 1024.0)) / elapsed; int[] percs = getPercentiles(); - System.out.printf( + log.info(String.format( "%d records %s, %.3f records/sec, %d bytes record size, %.2f MB/sec, %.1f ms avg latency, %.1f ms max latency" + ", %d ms 50th, %d ms 75th, %d ms 95th, %d ms 99th, %d ms 99.9th, %d ms 99.99th.\n", count, action, recsPerSec, messageSize, mbPerSec, totalLatency / ((double) count), (double) maxLatency, - percs[0], percs[1], percs[2], percs[3], percs[4], percs[5]); + percs[0], percs[1], percs[2], percs[3], percs[4], percs[5])); } } diff --git a/src/main/resources/simplelogger.properties b/src/main/resources/simplelogger.properties index eb7843b22..3387c234f 100644 --- a/src/main/resources/simplelogger.properties +++ b/src/main/resources/simplelogger.properties @@ -13,13 +13,13 @@ org.slf4j.simpleLogger.defaultLogLevel=info # Set to true if you want the current date and time to be included in output messages. # Default is false, and will output the number of milliseconds elapsed since startup. -#org.slf4j.simpleLogger.showDateTime=false +org.slf4j.simpleLogger.showDateTime=true # The date and time format to be used in the output messages. # The pattern describing the date and time format is the same that is used in java.text.SimpleDateFormat. # If the format is not specified or is invalid, the default format is used. # The default format is yyyy-MM-dd HH:mm:ss:SSS Z. -#org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS Z +org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS Z # Set to true if you want to output the current thread name. # Defaults to true.