From 89a7c6ed41a0e5268c620e15a4ea24ae6ef2f213 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Fri, 19 Dec 2014 11:32:09 -0800 Subject: [PATCH 01/11] Add support for port and reporting interval Lots of cleanup. Added support for hostname:port@interval in the startup, so you can tune how often the reporter runs and the port it communicates on. Renamed classes to be lookout classes. Switched to testng and added a parser test for the hostname parsing. Improved documentation. --- .gitignore | 2 + README.md | 41 +++++-- pom.xml | 111 +++++++++--------- src/main/java/META-INF/MANIFEST.MF | 4 +- .../metrics/agent/HostPortInterval.java | 68 +++++++++++ .../lookout/metrics/agent/ReportAgent.java | 33 ++++++ .../metrics/agent}/StatsdReporter.java | 69 +++++------ .../com/shift/undersiege/ReportAgent.java | 43 ------- .../test/HostPortIntervalParserTest.java | 69 +++++++++++ 9 files changed, 288 insertions(+), 152 deletions(-) create mode 100644 src/main/java/com/github/lookout/metrics/agent/HostPortInterval.java create mode 100644 src/main/java/com/github/lookout/metrics/agent/ReportAgent.java rename src/main/java/com/{shift/undersiege => github/lookout/metrics/agent}/StatsdReporter.java (70%) delete mode 100644 src/main/java/com/shift/undersiege/ReportAgent.java create mode 100644 src/test/java/com/github/lookout/metrics/agent/test/HostPortIntervalParserTest.java diff --git a/.gitignore b/.gitignore index f0b438d..cb8cec8 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,6 @@ target/ node_modules/ out/ *.class +.idea +*.iml diff --git a/README.md b/README.md index daa8537..5b4eb2b 100644 --- a/README.md +++ b/README.md @@ -7,30 +7,51 @@ Cassandra Statsd Reporting Tool REQUIREMENTS ---------------- * Maven -* Java +* Java 7 BUILD ---------------- -`mvn package` +Check which metrics library cassandra is using by looking in +cassandra/lib/metrics-core*, verify that pom.xml points to the +same exact version. For example, if you have metrics-core-2.2.0.jar, +make sure pom.xml has 2.2.0. -It may be necessary to build the jar against the same version of the metrics library being used in Cassandra. +`mvn package` -INSTAL +INSTALL ---------------- -Toss the appropriate version of statsd library (hopefully in your .m2 folder by now) in your cassandra/lib/ directory. +Copy the statsd library from the .m2 folder to cassandra/lib. Add the following to your cassandra startup script: -`JVM_OPTS="$JVM_OPTS -javaagent:/path/to/built.jar=localhost"` - -Or whatever path you've decided to put your agent. - -Note the '=localhost' at the end. You should change this to your statsd instance. +Copy the agent-1.0.jar to a new directory cassandra/plugins +Change cassandra startup to add this agent. This can be done in +a stock install by adding the following to /etc/default/cassandra: +`export JVM_OPTS="-javaagent:/usr/share/cassandra/plugins/agent-1.0.jar=localhost"` +Note the '=localhost' at the end. This supports the following syntaxes: +`hostname:port@interval` +For example: +`your.statsd.host.com:9999@60` +The default port is 8125 and the default interval is 10 (seconds); these +can be omitted. IPV6 is also supported with the following syntax: +`[2001:db8::1]:8888@300` +which means connect to 2001:db8::1 on port 8888 every 300 seconds. +REPORTING +---------------- +A log message will be added to the system.log at startup to +confirm that everything is running, it looks like this: +`INFO [metrics-statsd-thread-1] 2014-12-19 19:05:37,120 StatsdReporter.java:65 - Statsd reporting to host localhost port 8125 every 10 seconds` +TODO +---------------- +Errors that happen during startup are not reported as well as they should +be, mostly because the logging system is not active during startup. The log +message is only generated when the actual metrics collector has run for the +first time. diff --git a/pom.xml b/pom.xml index 7771122..1b3a0c7 100644 --- a/pom.xml +++ b/pom.xml @@ -1,58 +1,57 @@ - 4.0.0 - - undersiege - undersiege - - 0.1 - jar - - cassandra-statsd-reporter-smm - http://maven.apache.org - - - UTF-8 - 1.6 - 1.6 - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - src/main/java/META-INF/MANIFEST.MF - - - - - - - - - - junit - junit - 3.8.1 - test - - - - com.yammer.metrics - metrics-core - 2.2.0 - - - - com.timgroup - java-statsd-client - 2.0.0 - - - - - + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 + + com.github.lookout.metrics + agent + + 1.0 + jar + + cassandra-statsd-reporter + + + UTF-8 + 1.7 + 1.7 + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + src/main/java/META-INF/MANIFEST.MF + + + + + + + + + + com.yammer.metrics + metrics-core + 2.2.0 + provided + + + + com.timgroup + java-statsd-client + 2.0.0 + provided + + + org.testng + testng + 6.8.8 + test + + + diff --git a/src/main/java/META-INF/MANIFEST.MF b/src/main/java/META-INF/MANIFEST.MF index d629a3f..c3b27a0 100644 --- a/src/main/java/META-INF/MANIFEST.MF +++ b/src/main/java/META-INF/MANIFEST.MF @@ -1,2 +1,2 @@ -Manifest-Version: 1.0 -Premain-Class: com.shift.undersiege.ReportAgent +Manifest-Version: 1.0 +Premain-Class: com.github.lookout.metrics.agent.ReportAgent diff --git a/src/main/java/com/github/lookout/metrics/agent/HostPortInterval.java b/src/main/java/com/github/lookout/metrics/agent/HostPortInterval.java new file mode 100644 index 0000000..859af5d --- /dev/null +++ b/src/main/java/com/github/lookout/metrics/agent/HostPortInterval.java @@ -0,0 +1,68 @@ +package com.github.lookout.metrics.agent; + +/** + * Created by rkuris on 12/19/14. + */ +public class HostPortInterval { + public static final String DEFAULT_HOST = "localhost"; + public static final int DEFAULT_PORT = 8125; + public static final int DEFAULT_INTERVAL = 10; + + private final String host; + private final int port; + private final int interval; + + public HostPortInterval(final String hostPortInterval) { + if (hostPortInterval == null || hostPortInterval.isEmpty()) { + this.host = DEFAULT_HOST; + this.port = DEFAULT_PORT; + this.interval = DEFAULT_INTERVAL; + return; + } + int intervalOffset = hostPortInterval.lastIndexOf('@'); + final String hostPort; + if (intervalOffset == -1) { + this.interval = DEFAULT_INTERVAL; + hostPort = hostPortInterval; + } else { + this.interval = Integer.parseInt(hostPortInterval.substring(intervalOffset + 1)); + hostPort = hostPortInterval.substring(0, intervalOffset); + } + int colonOffset = hostPort.lastIndexOf(':'); + if (colonOffset == -1 || hostPort.endsWith("]")) { + this.host = stripBrackets(hostPort); + this.port = DEFAULT_PORT; + } else { + final String hostPart = hostPort.substring(0, colonOffset); + final String portPart = hostPort.substring(colonOffset + 1); + this.host = stripBrackets(hostPart); + this.port = Integer.parseInt(portPart); + } + + } + + private String stripBrackets(final String source) { + int sourceLength = source.length(); + if (sourceLength > 2 && source.charAt(0) == '[' && source.charAt(sourceLength - 1) == ']') { + return source.substring(1, sourceLength - 1); + } + return source; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public int getInterval() { + return interval; + } + + @Override + public String toString() { + return String.format("host %s port %d every %d seconds", host, port, interval); + } +} diff --git a/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java new file mode 100644 index 0000000..31e2d5d --- /dev/null +++ b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java @@ -0,0 +1,33 @@ +package com.github.lookout.metrics.agent; + +import java.io.IOException; +import java.lang.instrument.Instrumentation; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.concurrent.TimeUnit; + +public class ReportAgent { + public static void premain(final String agentArgs, final Instrumentation inst) throws IOException { + String host; + try { + host = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + host = "unknown-host"; + } + + for (final String reportingHostPort : agentArgs.split(",")) { + final HostPortInterval hostPortInterval = new HostPortInterval(reportingHostPort); + final StatsdReporter reporter = new StatsdReporter(hostPortInterval, host); + reporter.start(hostPortInterval.getInterval(), TimeUnit.SECONDS); + } + } + + public static void main(final String[] args) { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + // impossible + } + } +} + diff --git a/src/main/java/com/shift/undersiege/StatsdReporter.java b/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java similarity index 70% rename from src/main/java/com/shift/undersiege/StatsdReporter.java rename to src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java index 2b56654..4d160d0 100644 --- a/src/main/java/com/shift/undersiege/StatsdReporter.java +++ b/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java @@ -14,10 +14,10 @@ * limitations under the License. */ /** - Edited by Jon Haddad at SHIFT to work with + Edited by Jon Haddad at SHIFT to work with */ -package com.shift.undersiege; +package com.github.lookout.metrics.agent; import com.timgroup.statsd.NonBlockingStatsDClient; @@ -28,9 +28,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; -import java.util.Map; import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.concurrent.TimeUnit; @@ -40,17 +39,19 @@ public class StatsdReporter extends AbstractPollingReporter implements MetricPro private static final Logger LOG = LoggerFactory.getLogger(StatsdReporter.class); protected final VirtualMachineMetrics vm; - private StatsDClient statsd; - private boolean printVMMetrics = false; + private final StatsDClient statsd; protected final Clock clock; - private MetricPredicate predicate = MetricPredicate.ALL; + private static final MetricPredicate predicate = MetricPredicate.ALL; - private HashMap previous_run_times; - private HashMap previous_run_counts; + private boolean reportedStartup = false; + private final HostPortInterval hostPortInterval; + private final HashMap previous_run_times; + private final HashMap previous_run_counts; - public StatsdReporter(String host, int port, String prefix) throws IOException { + public StatsdReporter(final HostPortInterval hostPortInterval, final String prefix) { super(Metrics.defaultRegistry(), "statsd"); - statsd = new NonBlockingStatsDClient(prefix, host, port); + this.hostPortInterval = hostPortInterval; + statsd = new NonBlockingStatsDClient(prefix, hostPortInterval.getHost(), hostPortInterval.getPort()); vm = VirtualMachineMetrics.getInstance(); clock = Clock.defaultClock(); previous_run_times = new HashMap(); @@ -60,15 +61,13 @@ public StatsdReporter(String host, int port, String prefix) throws IOException { @Override public void run() { + if (!reportedStartup || LOG.isDebugEnabled()) { + LOG.info("Statsd reporting to {}", hostPortInterval); + reportedStartup = true; + } try { final long epoch = clock.time() / 1000; - if (this.printVMMetrics) { - printVmMetrics(epoch); - } - printRegularMetrics(epoch); - - // Send UDP data - + printMetrics(epoch); } catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug("Error writing to statsd", e); @@ -78,7 +77,7 @@ public void run() { } } - protected void printVmMetrics(long epoch) { + protected void printMetrics(long epoch) { // Memory int div = 1048576; statsd.gauge("jvm.memory.totalInitInMB", (int) vm.totalInit() / div); @@ -93,7 +92,7 @@ protected void printVmMetrics(long epoch) { statsd.gauge("jvm.fd_usage", (int) vm.fileDescriptorUsage()); - for (Map.Entry entry : vm.garbageCollectors().entrySet()) { + for (Map.Entry entry : vm.garbageCollectors().entrySet()) { // we only care about the delta times for the GC time and GC runs final String name = "jvm.gc." + entry.getKey(); @@ -115,31 +114,26 @@ protected void printVmMetrics(long epoch) { Integer previous_total_runs = previous_run_counts.get(stat_run_count); - if(previous_total_runs == null) { + if (previous_total_runs == null) { previous_total_runs = 0; } statsd.gauge(stat_run_count, total_runs - previous_total_runs); previous_run_counts.put(stat_run_count, total_runs); } - } - - protected void printRegularMetrics(long epoch) { - printVmMetrics(epoch); - Set>> entries = getMetricsRegistry().groupedMetrics(predicate).entrySet(); + final Set>> entries = getMetricsRegistry().groupedMetrics(predicate).entrySet(); - for (Map.Entry> entry : entries) { - for (Map.Entry subEntry : entry.getValue().entrySet()) { + for (final Map.Entry> entry : entries) { + for (final Map.Entry subEntry : entry.getValue().entrySet()) { final Metric metric = subEntry.getValue(); if (metric != null) { try { metric.processWith(this, subEntry.getKey(), epoch); - //statsd.gauge(subEntry.getKey(), subEntry.getValue().); - } catch (Exception ignored) { - LOG.error("Error printing regular com.shift.undersiege:", ignored); + } catch (final Exception exception) { + LOG.error("Error processing key {}", subEntry.getKey(), exception); } } } @@ -148,9 +142,7 @@ protected void printRegularMetrics(long epoch) { @Override public void processMeter(MetricName name, Metered meter, Long epoch) throws Exception { -// System.out.printf("Printing process meter %s %d\n", name.getName(), meter.count()); - -// + LOG.debug("Meter {} {} skipped", name.getName(), meter.count()); } @Override @@ -160,21 +152,16 @@ public void processCounter(MetricName name, Counter counter, Long epoch) throws @Override public void processHistogram(MetricName name, Histogram histogram, Long epoch) throws Exception { -// System.out.printf("process histogram %s %f\n", name.getName(), histogram.mean()); + LOG.debug("Histogram {} mean {} skipped", name.getName(), histogram.mean()); } @Override public void processTimer(MetricName name, Timer timer, Long context) throws Exception { -// System.out.printf("timer %s\n", name.getName()); - //To change body of implemented methods use File | Settings | File Templates. + LOG.debug("Timer {} skipped", name.getName()); } @Override public void processGauge(MetricName name, Gauge gauge, Long context) throws Exception { - //To change body of implemented methods use File | Settings | File Templates. -// System.out.printf("gauge %s %s\n", name.getName(), gauge.toString()); statsd.gauge(name.getName(), gauge.hashCode()); } - - } diff --git a/src/main/java/com/shift/undersiege/ReportAgent.java b/src/main/java/com/shift/undersiege/ReportAgent.java deleted file mode 100644 index 9b7684a..0000000 --- a/src/main/java/com/shift/undersiege/ReportAgent.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.shift.undersiege; - -import java.io.IOException; -import java.lang.instrument.Instrumentation; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.concurrent.TimeUnit; - -import com.yammer.metrics.reporting.ConsoleReporter; -//import com.yammer.com.shift.undersiege.reporting.GraphiteReporter; -// - -public class ReportAgent -{ - public static void premain(String agentArgs, Instrumentation inst) throws IOException - { - // comma separated list of - String host; - try { - host = InetAddress.getLocalHost().getHostName(); - } - catch (UnknownHostException e) { - host = "unknown-host"; - } - - StatsdReporter reporter = new StatsdReporter(agentArgs, 8125, host); - reporter.start(10, TimeUnit.SECONDS); - - System.out.println("STATSD STARTING sending to " + agentArgs + " with host prefix " + host); - - } - public static void main(String[] args) - { - System.out.println("Main running."); - try { - Thread.sleep(60000); - } catch (InterruptedException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - System.out.println("Done."); - } -} - diff --git a/src/test/java/com/github/lookout/metrics/agent/test/HostPortIntervalParserTest.java b/src/test/java/com/github/lookout/metrics/agent/test/HostPortIntervalParserTest.java new file mode 100644 index 0000000..6028765 --- /dev/null +++ b/src/test/java/com/github/lookout/metrics/agent/test/HostPortIntervalParserTest.java @@ -0,0 +1,69 @@ +package com.github.lookout.metrics.agent.test; + +import com.github.lookout.metrics.agent.HostPortInterval; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +/** + * Created by rkuris on 12/19/14. + */ +public class HostPortIntervalParserTest { + + @Test + public void testDefault() { + HostPortInterval result = new HostPortInterval(null); + assertEquals(result.getHost(), HostPortInterval.DEFAULT_HOST); + assertEquals(result.getPort(), HostPortInterval.DEFAULT_PORT); + assertEquals(result.getInterval(), HostPortInterval.DEFAULT_INTERVAL); + } + + @Test + public void testDefaultPort() { + HostPortInterval result = new HostPortInterval("host"); + assertEquals(result.getHost(), "host"); + assertEquals(result.getPort(), HostPortInterval.DEFAULT_PORT); + assertEquals(result.getInterval(), HostPortInterval.DEFAULT_INTERVAL); + } + + @Test + public void testIPV4() { + HostPortInterval result = new HostPortInterval("1.2.3.4:56"); + assertEquals(result.getHost(), "1.2.3.4"); + assertEquals(result.getPort(), 56); + assertEquals(result.getInterval(), HostPortInterval.DEFAULT_INTERVAL); + } + + @Test + public void testIPV6DefaultPort() { + HostPortInterval result = new HostPortInterval("[::1]"); + assertEquals(result.getHost(), "::1"); + assertEquals(result.getPort(), HostPortInterval.DEFAULT_PORT); + assertEquals(result.getInterval(), HostPortInterval.DEFAULT_INTERVAL); + } + + @Test + public void testIPV6() { + HostPortInterval result = new HostPortInterval("[::1]:9123"); + assertEquals(result.getHost(), "::1"); + assertEquals(result.getPort(), 9123); + assertEquals(result.getInterval(), HostPortInterval.DEFAULT_INTERVAL); + } + + @Test + public void testIntervalDefaultPort() { + HostPortInterval result = new HostPortInterval("1.2.3.4@1"); + assertEquals(result.getHost(), "1.2.3.4"); + assertEquals(result.getPort(), HostPortInterval.DEFAULT_PORT); + assertEquals(result.getInterval(), 1); + } + + @Test + public void testWholeEnchilada() { + HostPortInterval result = new HostPortInterval("[2001:db8::1]:210@321"); + assertEquals(result.getHost(), "2001:db8::1"); + assertEquals(result.getPort(), 210); + assertEquals(result.getInterval(), 321); + assertEquals(result.toString(), "host 2001:db8::1 port 210 every 321 seconds"); + } +} From 4dcb7f59572bd4e0cb8df32082487bd914cc0c4a Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Tue, 23 Dec 2014 11:24:30 -0800 Subject: [PATCH 02/11] Fixed some stats that were always zero Added tests for statsdReporter, verified that the JVM reports non-zero values for everything. --- pom.xml | 12 ++++ .../lookout/metrics/agent/ReportAgent.java | 6 +- .../lookout/metrics/agent/StatsdReporter.java | 24 ++++--- .../lookout/metrics/agent/test/StatsTest.java | 65 +++++++++++++++++++ 4 files changed, 97 insertions(+), 10 deletions(-) create mode 100644 src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java diff --git a/pom.xml b/pom.xml index 1b3a0c7..1a7a1c7 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,18 @@ 6.8.8 test + + org.mockito + mockito-core + 1.10.17 + test + + + org.slf4j + slf4j-simple + 1.7.9 + test + diff --git a/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java index 31e2d5d..f72a71c 100644 --- a/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java +++ b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java @@ -1,5 +1,8 @@ package com.github.lookout.metrics.agent; +import com.timgroup.statsd.NonBlockingStatsDClient; +import com.timgroup.statsd.StatsDClient; + import java.io.IOException; import java.lang.instrument.Instrumentation; import java.net.InetAddress; @@ -17,7 +20,8 @@ public static void premain(final String agentArgs, final Instrumentation inst) t for (final String reportingHostPort : agentArgs.split(",")) { final HostPortInterval hostPortInterval = new HostPortInterval(reportingHostPort); - final StatsdReporter reporter = new StatsdReporter(hostPortInterval, host); + final StatsDClient client = new NonBlockingStatsDClient(host, hostPortInterval.getHost(), hostPortInterval.getPort()); + final StatsdReporter reporter = new StatsdReporter(hostPortInterval, client); reporter.start(hostPortInterval.getInterval(), TimeUnit.SECONDS); } } diff --git a/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java b/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java index 4d160d0..105ed64 100644 --- a/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java +++ b/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java @@ -48,10 +48,10 @@ public class StatsdReporter extends AbstractPollingReporter implements MetricPro private final HashMap previous_run_times; private final HashMap previous_run_counts; - public StatsdReporter(final HostPortInterval hostPortInterval, final String prefix) { + public StatsdReporter(final HostPortInterval hostPortInterval, final StatsDClient statsd) { super(Metrics.defaultRegistry(), "statsd"); this.hostPortInterval = hostPortInterval; - statsd = new NonBlockingStatsDClient(prefix, hostPortInterval.getHost(), hostPortInterval.getPort()); + this.statsd = statsd; vm = VirtualMachineMetrics.getInstance(); clock = Clock.defaultClock(); previous_run_times = new HashMap(); @@ -77,20 +77,26 @@ public void run() { } } + private int bytesToMB(double bytes) { + return (int)(bytes/(1024*1024)); + } + private int doubleToPct(double pct) { + return (int) Math.round(100 * pct); + } + protected void printMetrics(long epoch) { // Memory - int div = 1048576; - statsd.gauge("jvm.memory.totalInitInMB", (int) vm.totalInit() / div); - statsd.gauge("jvm.memory.totalUsedInMB", (int) vm.totalUsed() / div); - statsd.gauge("jvm.memory.heapUsedInMB", (int) vm.heapUsed() / div); + statsd.gauge("jvm.memory.totalInitInMB", bytesToMB(vm.totalInit())); + statsd.gauge("jvm.memory.totalUsedInMB", bytesToMB(vm.totalUsed())); + statsd.gauge("jvm.memory.heapUsedInMB", bytesToMB(vm.heapUsed())); - statsd.gauge("jvm.memory.heapUsageInMB", (int) vm.heapUsage() / div); + statsd.gauge("jvm.memory.heapUsagePercent", doubleToPct(vm.heapUsage())); for (Map.Entry pool : vm.memoryPoolUsage().entrySet()) { - statsd.gauge("jvm.memory.memory_pool_usages." + pool.getKey(), pool.getValue().intValue() / div); + statsd.gauge("jvm.memory.memory_pool_usages." + pool.getKey() + "Percent", doubleToPct(pool.getValue())); } - statsd.gauge("jvm.fd_usage", (int) vm.fileDescriptorUsage()); + statsd.gauge("jvm.fdUsagePercent", doubleToPct(vm.fileDescriptorUsage())); for (Map.Entry entry : vm.garbageCollectors().entrySet()) { // we only care about the delta times for the GC time and GC runs diff --git a/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java b/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java new file mode 100644 index 0000000..c5fa045 --- /dev/null +++ b/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java @@ -0,0 +1,65 @@ +package com.github.lookout.metrics.agent.test; + +import com.github.lookout.metrics.agent.HostPortInterval; +import com.github.lookout.metrics.agent.StatsdReporter; +import com.timgroup.statsd.StatsDClient; +import org.mockito.*; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; + +/** + * Created by rkuris on 12/23/14. + */ +public class StatsTest { + + @Mock + public StatsDClient client; + private final HostPortInterval hostPortInterval = new HostPortInterval("localhost"); + + private StatsdReporter sut; + + @BeforeMethod + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testStatsNotZero() { + sut = new StatsdReporter(hostPortInterval, client); + byte[] bigMemoryChunk = new byte[102400000]; // 100mb should get us to 1% of heaps up to 10gb + bigMemoryChunk[4444] = 1; + sut.run(); + verify(client).gauge(Matchers.eq("jvm.memory.totalInitInMB"), AdditionalMatchers.gt(0)); + verify(client).gauge(Matchers.eq("jvm.memory.totalUsedInMB"), AdditionalMatchers.gt(0)); + verify(client).gauge(Matchers.eq("jvm.memory.heapUsedInMB"), AdditionalMatchers.gt(0)); + verify(client).gauge(Matchers.eq("jvm.memory.heapUsagePercent"), AdditionalMatchers.gt(0)); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm.memory\\.memory_pool_usages\\..*Percent"), AdditionalMatchers.and(AdditionalMatchers.geq(0), AdditionalMatchers.lt(100))); + verify(client).gauge(Matchers.eq("jvm.fdUsagePercent"), AdditionalMatchers.gt(0)); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), Matchers.anyInt()); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.runs"), Matchers.anyInt()); + assertEquals(bigMemoryChunk[4444], 1); + + verifyNoMoreInteractions(client); + } + + @Test + public void testGCCounts() { + sut = new StatsdReporter(hostPortInterval, client); + sut.run(); + ArgumentCaptor countCaptor1 = ArgumentCaptor.forClass(Integer.class); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), Matchers.anyInt()); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.runs"), countCaptor1.capture()); + reset(client); + System.gc(); + sut.run(); + ArgumentCaptor countCaptor2 = ArgumentCaptor.forClass(Integer.class); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), AdditionalMatchers.gt(0)); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.runs"), countCaptor2.capture()); + assertNotEquals(countCaptor1.getAllValues(), countCaptor2.getAllValues()); + } +} From 293eef09da0a649d93776e2a71e42daef2b1de6d Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Mon, 29 Dec 2014 10:34:36 -0800 Subject: [PATCH 03/11] Store binaries in bintray --- README.md | 12 +++++++----- pom.xml | 7 +++++++ 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 5b4eb2b..72bbc14 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,7 @@ -Under Siege -=============== - Cassandra Statsd Reporting Tool --------------------------------- +=============================== + +Thanks to UnderSiege for getting this project off the ground. REQUIREMENTS ---------------- @@ -19,6 +18,10 @@ make sure pom.xml has 2.2.0. `mvn package` +Alternatively, grab the binary from bintray: + +`curl -L http://dl.bintray.com/lookout/systems/com/github/lookout/metrics/agent/1.0/agent-1.0.jar -o agent-1.0.jar` + INSTALL ---------------- @@ -41,7 +44,6 @@ can be omitted. IPV6 is also supported with the following syntax: `[2001:db8::1]:8888@300` which means connect to 2001:db8::1 on port 8888 every 300 seconds. - REPORTING ---------------- A log message will be added to the system.log at startup to diff --git a/pom.xml b/pom.xml index 1a7a1c7..bfe93dc 100644 --- a/pom.xml +++ b/pom.xml @@ -66,4 +66,11 @@ + + + bintray-lookout-systems-cassandra-statsd-agent + lookout-systems-cassandra-statsd-agent + https://api.bintray.com/maven/lookout/systems/cassandra-statsd-agent + + From 2ae09c23e8b8be05f3237c0222d00cf5c16de9f7 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Tue, 17 Feb 2015 11:10:12 -0800 Subject: [PATCH 04/11] Fix gauges, use longs, handle maps --- README.md | 6 ++--- pom.xml | 4 ++-- .../lookout/metrics/agent/StatsdReporter.java | 17 +++++++++---- .../lookout/metrics/agent/test/StatsTest.java | 24 +++++++++---------- 4 files changed, 30 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 72bbc14..47029ce 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ make sure pom.xml has 2.2.0. Alternatively, grab the binary from bintray: -`curl -L http://dl.bintray.com/lookout/systems/com/github/lookout/metrics/agent/1.0/agent-1.0.jar -o agent-1.0.jar` +`curl -L http://dl.bintray.com/lookout/systems/com/github/lookout/metrics/agent/1.1/agent-1.1.jar -o agent-1.1.jar` INSTALL ---------------- @@ -28,12 +28,12 @@ INSTALL Copy the statsd library from the .m2 folder to cassandra/lib. Add the following to your cassandra startup script: -Copy the agent-1.0.jar to a new directory cassandra/plugins +Copy the agent-1.1.jar to a new directory cassandra/plugins Change cassandra startup to add this agent. This can be done in a stock install by adding the following to /etc/default/cassandra: -`export JVM_OPTS="-javaagent:/usr/share/cassandra/plugins/agent-1.0.jar=localhost"` +`export JVM_OPTS="-javaagent:/usr/share/cassandra/plugins/agent-1.1.jar=localhost"` Note the '=localhost' at the end. This supports the following syntaxes: `hostname:port@interval` diff --git a/pom.xml b/pom.xml index bfe93dc..4593839 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.github.lookout.metrics agent - 1.0 + 1.1 jar cassandra-statsd-reporter @@ -43,7 +43,7 @@ com.timgroup java-statsd-client - 2.0.0 + 3.0.2 provided diff --git a/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java b/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java index 105ed64..1417e54 100644 --- a/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java +++ b/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java @@ -19,8 +19,6 @@ package com.github.lookout.metrics.agent; - -import com.timgroup.statsd.NonBlockingStatsDClient; import com.timgroup.statsd.StatsDClient; import com.yammer.metrics.Metrics; import com.yammer.metrics.core.*; @@ -153,7 +151,7 @@ public void processMeter(MetricName name, Metered meter, Long epoch) throws Exce @Override public void processCounter(MetricName name, Counter counter, Long epoch) throws Exception { - statsd.gauge(name.getName(), (int) counter.count()); + statsd.gauge(name.getName(), counter.count()); } @Override @@ -168,6 +166,17 @@ public void processTimer(MetricName name, Timer timer, Long context) throws Exce @Override public void processGauge(MetricName name, Gauge gauge, Long context) throws Exception { - statsd.gauge(name.getName(), gauge.hashCode()); + reportGaugeValue(name.getName(), gauge.value()); + } + private void reportGaugeValue(String name, Object gaugeValue) { + if (gaugeValue instanceof Long) { + statsd.gauge(name, ((Long) gaugeValue).longValue()); + } else if (gaugeValue instanceof Double) { + statsd.gauge(name, ((Double)gaugeValue).doubleValue()); + } else if (gaugeValue instanceof Map) { + for (Map.Entry entry: ((Map)gaugeValue).entrySet()) { + reportGaugeValue(name + "." + entry.getKey().toString(), entry.getValue()); + } + } } } diff --git a/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java b/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java index c5fa045..eb403c6 100644 --- a/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java +++ b/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java @@ -34,14 +34,14 @@ public void testStatsNotZero() { byte[] bigMemoryChunk = new byte[102400000]; // 100mb should get us to 1% of heaps up to 10gb bigMemoryChunk[4444] = 1; sut.run(); - verify(client).gauge(Matchers.eq("jvm.memory.totalInitInMB"), AdditionalMatchers.gt(0)); - verify(client).gauge(Matchers.eq("jvm.memory.totalUsedInMB"), AdditionalMatchers.gt(0)); - verify(client).gauge(Matchers.eq("jvm.memory.heapUsedInMB"), AdditionalMatchers.gt(0)); - verify(client).gauge(Matchers.eq("jvm.memory.heapUsagePercent"), AdditionalMatchers.gt(0)); - verify(client, atLeastOnce()).gauge(Matchers.matches("jvm.memory\\.memory_pool_usages\\..*Percent"), AdditionalMatchers.and(AdditionalMatchers.geq(0), AdditionalMatchers.lt(100))); - verify(client).gauge(Matchers.eq("jvm.fdUsagePercent"), AdditionalMatchers.gt(0)); - verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), Matchers.anyInt()); - verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.runs"), Matchers.anyInt()); + verify(client).gauge(Matchers.eq("jvm.memory.totalInitInMB"), AdditionalMatchers.gt(0L)); + verify(client).gauge(Matchers.eq("jvm.memory.totalUsedInMB"), AdditionalMatchers.gt(0L)); + verify(client).gauge(Matchers.eq("jvm.memory.heapUsedInMB"), AdditionalMatchers.gt(0L)); + verify(client).gauge(Matchers.eq("jvm.memory.heapUsagePercent"), AdditionalMatchers.gt(0L)); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm.memory\\.memory_pool_usages\\..*Percent"), AdditionalMatchers.and(AdditionalMatchers.geq(0L), AdditionalMatchers.lt(100L))); + verify(client).gauge(Matchers.eq("jvm.fdUsagePercent"), AdditionalMatchers.gt(0L)); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), Matchers.anyLong()); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.runs"), Matchers.anyLong()); assertEquals(bigMemoryChunk[4444], 1); verifyNoMoreInteractions(client); @@ -51,14 +51,14 @@ public void testStatsNotZero() { public void testGCCounts() { sut = new StatsdReporter(hostPortInterval, client); sut.run(); - ArgumentCaptor countCaptor1 = ArgumentCaptor.forClass(Integer.class); - verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), Matchers.anyInt()); + ArgumentCaptor countCaptor1 = ArgumentCaptor.forClass(Long.class); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), Matchers.anyLong()); verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.runs"), countCaptor1.capture()); reset(client); System.gc(); sut.run(); - ArgumentCaptor countCaptor2 = ArgumentCaptor.forClass(Integer.class); - verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), AdditionalMatchers.gt(0)); + ArgumentCaptor countCaptor2 = ArgumentCaptor.forClass(Long.class); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), AdditionalMatchers.gt(0L)); verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.runs"), countCaptor2.capture()); assertNotEquals(countCaptor1.getAllValues(), countCaptor2.getAllValues()); } From 9f5ad5b12c1602bad56c1fe3ee91cb86c02e4449 Mon Sep 17 00:00:00 2001 From: Val Ishida Date: Thu, 16 Apr 2015 16:50:25 -0700 Subject: [PATCH 05/11] Fix the agent not starting if args are not supplied --- .../java/com/github/lookout/metrics/agent/ReportAgent.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java index f72a71c..38fc923 100644 --- a/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java +++ b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java @@ -18,7 +18,8 @@ public static void premain(final String agentArgs, final Instrumentation inst) t host = "unknown-host"; } - for (final String reportingHostPort : agentArgs.split(",")) { + final String[] reportingHostPorts = (agentArgs != null) ? agentArgs.split(",") : new String[]{null}; + for (final String reportingHostPort : reportingHostPorts) { final HostPortInterval hostPortInterval = new HostPortInterval(reportingHostPort); final StatsDClient client = new NonBlockingStatsDClient(host, hostPortInterval.getHost(), hostPortInterval.getPort()); final StatsdReporter reporter = new StatsdReporter(hostPortInterval, client); From a7c11f08347ca991f6e11f4f79b13348f24b98fd Mon Sep 17 00:00:00 2001 From: Val Ishida Date: Fri, 17 Apr 2015 12:53:57 -0700 Subject: [PATCH 06/11] Add config for Travis CI --- .travis.yml | 1 + 1 file changed, 1 insertion(+) create mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..dff5f3a --- /dev/null +++ b/.travis.yml @@ -0,0 +1 @@ +language: java From 0d33bd34e998fd9a1c4d98c93ba37133012640cf Mon Sep 17 00:00:00 2001 From: Val Ishida Date: Fri, 17 Apr 2015 15:39:14 -0700 Subject: [PATCH 07/11] Test for jvm.fdUsagePercent gt 0 doesn't pass on CI --- .../java/com/github/lookout/metrics/agent/test/StatsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java b/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java index eb403c6..2b5d96d 100644 --- a/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java +++ b/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java @@ -39,7 +39,7 @@ public void testStatsNotZero() { verify(client).gauge(Matchers.eq("jvm.memory.heapUsedInMB"), AdditionalMatchers.gt(0L)); verify(client).gauge(Matchers.eq("jvm.memory.heapUsagePercent"), AdditionalMatchers.gt(0L)); verify(client, atLeastOnce()).gauge(Matchers.matches("jvm.memory\\.memory_pool_usages\\..*Percent"), AdditionalMatchers.and(AdditionalMatchers.geq(0L), AdditionalMatchers.lt(100L))); - verify(client).gauge(Matchers.eq("jvm.fdUsagePercent"), AdditionalMatchers.gt(0L)); + verify(client).gauge(Matchers.eq("jvm.fdUsagePercent"), AdditionalMatchers.geq(0L)); // gt failing on CI verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), Matchers.anyLong()); verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.runs"), Matchers.anyLong()); assertEquals(bigMemoryChunk[4444], 1); From 2812875cdf195833ae55b12989de1e4065617bdd Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Tue, 26 May 2015 14:36:06 -0700 Subject: [PATCH 08/11] Upgrade to version 1.2 --- README.md | 6 +++--- pom.xml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 47029ce..e1b7d69 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ make sure pom.xml has 2.2.0. Alternatively, grab the binary from bintray: -`curl -L http://dl.bintray.com/lookout/systems/com/github/lookout/metrics/agent/1.1/agent-1.1.jar -o agent-1.1.jar` +`curl -L http://dl.bintray.com/lookout/systems/com/github/lookout/metrics/agent/1.2/agent-1.2.jar -o agent-1.2.jar` INSTALL ---------------- @@ -28,12 +28,12 @@ INSTALL Copy the statsd library from the .m2 folder to cassandra/lib. Add the following to your cassandra startup script: -Copy the agent-1.1.jar to a new directory cassandra/plugins +Copy the agent-1.2.jar to a new directory cassandra/plugins Change cassandra startup to add this agent. This can be done in a stock install by adding the following to /etc/default/cassandra: -`export JVM_OPTS="-javaagent:/usr/share/cassandra/plugins/agent-1.1.jar=localhost"` +`export JVM_OPTS="-javaagent:/usr/share/cassandra/plugins/agent-1.2.jar=localhost"` Note the '=localhost' at the end. This supports the following syntaxes: `hostname:port@interval` diff --git a/pom.xml b/pom.xml index 4593839..6b80e82 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.github.lookout.metrics agent - 1.1 + 1.2 jar cassandra-statsd-reporter From 6f4cffa085549c4a612e0d52d5f195cf3746f8f4 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Tue, 26 May 2015 14:42:56 -0700 Subject: [PATCH 09/11] Code cleanup --- .../github/lookout/metrics/agent/ReportAgent.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java index 38fc923..b99e917 100644 --- a/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java +++ b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java @@ -10,7 +10,10 @@ import java.util.concurrent.TimeUnit; public class ReportAgent { - public static void premain(final String agentArgs, final Instrumentation inst) throws IOException { + + private static final long STARTUP_DELAY_MS = TimeUnit.SECONDS.toMillis(10); + + public static void premain(final String agentArgs, final Instrumentation inst) { String host; try { host = InetAddress.getLocalHost().getHostName(); @@ -27,12 +30,8 @@ public static void premain(final String agentArgs, final Instrumentation inst) t } } - public static void main(final String[] args) { - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - // impossible - } + public static void main(final String[] args) throws InterruptedException { + Thread.sleep(STARTUP_DELAY_MS); } } From 8459f0ad846aee4fd076d6d8d2d8be69808814c3 Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Thu, 28 May 2015 16:50:55 -0700 Subject: [PATCH 10/11] Fetch some information from JMX This is a pretty big refactor. There are now three sources of data for logging to statsd: the JVM (now in JavaVMGenerator), yammer metrics (now in YammerMetricsGenerator) and JMX (see CassandraJMXGenerator). Unfortunately, the CassandraJMXGenerator has to know some of the internals of how Cassandra reports JMX. It's coded to be somewhat bulletproof, such that missing JMX variables won't stop the other found ones from being reported. Right now, we fetch: gossip.score., which help decide who is closer/faster for queries gossip.severity, which indicates how busy this node is self-reporting to others cfstats...ReadCount cfstats...WriteCount cfstats...RecentReadLatencyMicros cfstats...RecentWriteLatencyMicros cfstats...TombstonesPerSlice cfstats...estimatedKeys The last one is great for monitoring general trends, but of course don't rely on that number to be very accurate. Also supported is the currently-experimental PHI reporter, in PHI., coming to a Cassandra cluster near you soon. --- README.md | 30 ++++ .../lookout/metrics/agent/ReportAgent.java | 6 - .../lookout/metrics/agent/StatsdReporter.java | 149 +++--------------- .../generators/CassandraJMXGenerator.java | 148 +++++++++++++++++ .../agent/generators/JavaVMGenerator.java | 76 +++++++++ .../agent/generators/MetricGenerator.java | 10 ++ .../generators/YammerMetricsGenerator.java | 87 ++++++++++ .../lookout/metrics/agent/test/StatsTest.java | 25 +++ 8 files changed, 398 insertions(+), 133 deletions(-) create mode 100644 src/main/java/com/github/lookout/metrics/agent/generators/CassandraJMXGenerator.java create mode 100644 src/main/java/com/github/lookout/metrics/agent/generators/JavaVMGenerator.java create mode 100644 src/main/java/com/github/lookout/metrics/agent/generators/MetricGenerator.java create mode 100644 src/main/java/com/github/lookout/metrics/agent/generators/YammerMetricsGenerator.java diff --git a/README.md b/README.md index e1b7d69..9cc38f2 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,36 @@ confirm that everything is running, it looks like this: `INFO [metrics-statsd-thread-1] 2014-12-19 19:05:37,120 StatsdReporter.java:65 - Statsd reporting to host localhost port 8125 every 10 seconds` +WHAT GETS REPORTED +------------------ +Lots of stuff: + +* Gossip statistics: + gossip.score., which help decide who is closer/faster for queries + gossip.severity, which indicates how busy this node is self-reporting to others +* Per table statistics: + cfstats...ReadCount + cfstats...WriteCount + cfstats...RecentReadLatencyMicros + cfstats...RecentWriteLatencyMicros + cfstats...TombstonesPerSlice + cfstats...estimatedKeys + The last one is great for monitoring general trends, but of course don't + rely on that number to be very accurate. +* PHI reporter + Also supported is the currently-experimental PHI reporter, in PHI., + coming to a Cassandra cluster near you soon. +* JVM GC metrics +* Anything else registered with yammer-metrics + +DEBUGGING +---------------- +Not working? There's a lot of tracing and debugging available. Change the +log4j-server.properties and add something like this to get extremely detailed +traces of what it's doing in the server.log. + +`log4j.logger.com.github.lookout.metrics.agent.generators=TRACE` + TODO ---------------- Errors that happen during startup are not reported as well as they should diff --git a/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java index b99e917..c2f4a63 100644 --- a/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java +++ b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java @@ -11,8 +11,6 @@ public class ReportAgent { - private static final long STARTUP_DELAY_MS = TimeUnit.SECONDS.toMillis(10); - public static void premain(final String agentArgs, final Instrumentation inst) { String host; try { @@ -29,9 +27,5 @@ public static void premain(final String agentArgs, final Instrumentation inst) { reporter.start(hostPortInterval.getInterval(), TimeUnit.SECONDS); } } - - public static void main(final String[] args) throws InterruptedException { - Thread.sleep(STARTUP_DELAY_MS); - } } diff --git a/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java b/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java index 1417e54..6099c3b 100644 --- a/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java +++ b/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java @@ -19,6 +19,10 @@ package com.github.lookout.metrics.agent; +import com.github.lookout.metrics.agent.generators.CassandraJMXGenerator; +import com.github.lookout.metrics.agent.generators.JavaVMGenerator; +import com.github.lookout.metrics.agent.generators.MetricGenerator; +import com.github.lookout.metrics.agent.generators.YammerMetricsGenerator; import com.timgroup.statsd.StatsDClient; import com.yammer.metrics.Metrics; import com.yammer.metrics.core.*; @@ -26,36 +30,31 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; +import java.util.*; import java.util.concurrent.TimeUnit; -public class StatsdReporter extends AbstractPollingReporter implements MetricProcessor { +public class StatsdReporter extends AbstractPollingReporter { private static final Logger LOG = LoggerFactory.getLogger(StatsdReporter.class); - protected final VirtualMachineMetrics vm; + private final StatsDClient statsd; - protected final Clock clock; - private static final MetricPredicate predicate = MetricPredicate.ALL; private boolean reportedStartup = false; private final HostPortInterval hostPortInterval; - private final HashMap previous_run_times; - private final HashMap previous_run_counts; + + private final Set generators = new HashSet<>(); public StatsdReporter(final HostPortInterval hostPortInterval, final StatsDClient statsd) { super(Metrics.defaultRegistry(), "statsd"); this.hostPortInterval = hostPortInterval; this.statsd = statsd; - vm = VirtualMachineMetrics.getInstance(); - clock = Clock.defaultClock(); - previous_run_times = new HashMap(); - previous_run_counts = new HashMap(); - } + // This really should be done with an injection framework, but that's too heavy for this + generators.add(new CassandraJMXGenerator()); + generators.add(new JavaVMGenerator()); + generators.add(new YammerMetricsGenerator()); + } @Override public void run() { @@ -63,120 +62,16 @@ public void run() { LOG.info("Statsd reporting to {}", hostPortInterval); reportedStartup = true; } - try { - final long epoch = clock.time() / 1000; - printMetrics(epoch); - } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Error writing to statsd", e); - } else { - LOG.warn("Error writing to statsd: {}", e.getMessage()); - } - } - } - - private int bytesToMB(double bytes) { - return (int)(bytes/(1024*1024)); - } - private int doubleToPct(double pct) { - return (int) Math.round(100 * pct); - } - - protected void printMetrics(long epoch) { - // Memory - statsd.gauge("jvm.memory.totalInitInMB", bytesToMB(vm.totalInit())); - statsd.gauge("jvm.memory.totalUsedInMB", bytesToMB(vm.totalUsed())); - statsd.gauge("jvm.memory.heapUsedInMB", bytesToMB(vm.heapUsed())); - - statsd.gauge("jvm.memory.heapUsagePercent", doubleToPct(vm.heapUsage())); - - for (Map.Entry pool : vm.memoryPoolUsage().entrySet()) { - statsd.gauge("jvm.memory.memory_pool_usages." + pool.getKey() + "Percent", doubleToPct(pool.getValue())); - } - - statsd.gauge("jvm.fdUsagePercent", doubleToPct(vm.fileDescriptorUsage())); - - for (Map.Entry entry : vm.garbageCollectors().entrySet()) { - // we only care about the delta times for the GC time and GC runs - - final String name = "jvm.gc." + entry.getKey(); - String stat_name_time = name + ".timeInMS"; - - int total_run_time = (int) entry.getValue().getTime(TimeUnit.MILLISECONDS); - Integer previous_total_run_time = previous_run_times.get(stat_name_time); - - if (previous_total_run_time == null) { - previous_total_run_time = 0; - } - int delta_run_time = total_run_time - previous_total_run_time; - previous_run_times.put(stat_name_time, total_run_time); - - statsd.gauge(stat_name_time, delta_run_time); - String stat_run_count = name + ".runs"; - - int total_runs = (int) entry.getValue().getRuns(); - - Integer previous_total_runs = previous_run_counts.get(stat_run_count); - - if (previous_total_runs == null) { - previous_total_runs = 0; - } - - statsd.gauge(stat_run_count, total_runs - previous_total_runs); - previous_run_counts.put(stat_run_count, total_runs); - } - - final Set>> entries = getMetricsRegistry().groupedMetrics(predicate).entrySet(); - - for (final Map.Entry> entry : entries) { - for (final Map.Entry subEntry : entry.getValue().entrySet()) { - - final Metric metric = subEntry.getValue(); - - if (metric != null) { - try { - metric.processWith(this, subEntry.getKey(), epoch); - } catch (final Exception exception) { - LOG.error("Error processing key {}", subEntry.getKey(), exception); - } + for (MetricGenerator generator : generators) { + try { + generator.generate(statsd); + } catch (RuntimeException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Error writing to statsd", e); + } else { + LOG.warn("Error writing to statsd: {}", e.getMessage()); } } } } - - @Override - public void processMeter(MetricName name, Metered meter, Long epoch) throws Exception { - LOG.debug("Meter {} {} skipped", name.getName(), meter.count()); - } - - @Override - public void processCounter(MetricName name, Counter counter, Long epoch) throws Exception { - statsd.gauge(name.getName(), counter.count()); - } - - @Override - public void processHistogram(MetricName name, Histogram histogram, Long epoch) throws Exception { - LOG.debug("Histogram {} mean {} skipped", name.getName(), histogram.mean()); - } - - @Override - public void processTimer(MetricName name, Timer timer, Long context) throws Exception { - LOG.debug("Timer {} skipped", name.getName()); - } - - @Override - public void processGauge(MetricName name, Gauge gauge, Long context) throws Exception { - reportGaugeValue(name.getName(), gauge.value()); - } - private void reportGaugeValue(String name, Object gaugeValue) { - if (gaugeValue instanceof Long) { - statsd.gauge(name, ((Long) gaugeValue).longValue()); - } else if (gaugeValue instanceof Double) { - statsd.gauge(name, ((Double)gaugeValue).doubleValue()); - } else if (gaugeValue instanceof Map) { - for (Map.Entry entry: ((Map)gaugeValue).entrySet()) { - reportGaugeValue(name + "." + entry.getKey().toString(), entry.getValue()); - } - } - } } diff --git a/src/main/java/com/github/lookout/metrics/agent/generators/CassandraJMXGenerator.java b/src/main/java/com/github/lookout/metrics/agent/generators/CassandraJMXGenerator.java new file mode 100644 index 0000000..7e57ed3 --- /dev/null +++ b/src/main/java/com/github/lookout/metrics/agent/generators/CassandraJMXGenerator.java @@ -0,0 +1,148 @@ +package com.github.lookout.metrics.agent.generators; + +import com.timgroup.statsd.StatsDClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.*; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; +import java.lang.management.ManagementFactory; +import java.net.InetAddress; +import java.util.Map; +import java.util.Set; + +/** + * Extracts the relevant JMX values from a running cassandra instance. + * This code doesn't report anything when it can't find the JMX stuff + * we care about. + */ +public class CassandraJMXGenerator implements MetricGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraJMXGenerator.class); + private final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + private boolean phiAvailable = true; + + public CassandraJMXGenerator() { + } + + private void gaugeAndLog(StatsDClient statsDClient, String gaugeName, Double value) { + statsDClient.gauge(gaugeName, value); + LOG.debug("Reporting {} as {}", gaugeName, value); + } + private void gaugeAndLog(StatsDClient statsDClient, String gaugeName, Long value) { + statsDClient.gauge(gaugeName, value); + LOG.debug("Reporting {} as {}", gaugeName, value); + } + + /** + * Construct the base gauge name from the JMX object name + * The gauge is reported as cfstats.KEYSPACE.COLUMNFAMILY. + * There is a trailing dot, so the actual gauge name can be + * constructed by appending the item name to the end + * + * @param name The JMX ObjectName + * @return A string in the format cfstats.KEYSPACE.COLUMNFAMILY + */ + private String gaugeName(ObjectName name) { + String ks = name.getKeyProperty("keyspace"); + String cf = name.getKeyProperty("columnfamily"); + return String.format("cfstats.%s.%s.", ks, cf); + } + + /** + * Some generated gauge names have things like slashes and dots in them. + * This will replace all non-alnum characters with an underscore. + * @param name + * @return + */ + private String quoteGaugeName(String name) { + return name.replaceAll("\\W", "_"); + } + + /** + * These are the per-column-family attributes that we will report to statsd + */ + public String[] cfAttributeList = new String[] + { "ReadCount", + "WriteCount", + "RecentReadLatencyMicros", + "RecentWriteLatencyMicros", + "TombstonesPerSlice" + }; + + + @Override + public void generate(StatsDClient statsDClient) { + // Step 1: report PHI values from FailureDetector if available + if (phiAvailable) { + try { + TabularData table = (TabularData) mBeanServer.getAttribute(new ObjectName("org.apache.cassandra.net:type=FailureDetector"), "PhiValues"); + for (Object rawData : table.values()) { + final CompositeData data = (CompositeData) rawData; + final String gaugeName = "phi." + quoteGaugeName(data.get("Endpoint").toString()); + gaugeAndLog(statsDClient, gaugeName, (Double) data.get("PHI")); + } + + } catch (Exception e) { + LOG.trace("Exception reading PhiValues, skipping", e); + phiAvailable = false; + } + } + + // Step 2: Gossiper information + try { + ObjectName snitchName = new ObjectName("org.apache.cassandra.db:type=DynamicEndpointSnitch"); + Map gossipScores = (Map) mBeanServer.getAttribute(snitchName, "Scores"); + for (Map.Entry entry : gossipScores.entrySet()) { + final String gaugeName = "gossip.score." + quoteGaugeName(entry.getKey().toString()); + gaugeAndLog(statsDClient, gaugeName, entry.getValue()); + } + Double severity = (Double) mBeanServer.getAttribute(snitchName, "Severity"); + gaugeAndLog(statsDClient, "gossip.severity", severity); + } catch (Exception e) { + LOG.debug("Skipping gossip info", e); + } + + // Step 3: per cf statistics + Set names; + try { + names = mBeanServer.queryNames(new ObjectName("org.apache.cassandra.db:columnfamily=*,keyspace=*,type=ColumnFamilies"), null); + } catch (final MalformedObjectNameException e) { + throw new RuntimeException(e); // impossible + } + for (ObjectName name : names) { + Long keys = null; + String gaugeName = gaugeName(name); + try { + keys = (Long) mBeanServer.invoke(name, "estimateKeys", new Object[]{}, new String[]{}); + gaugeAndLog(statsDClient, gaugeName + "estimatedKeys", keys); + } catch (final InstanceNotFoundException|MBeanException|ReflectionException e) { + LOG.debug("Failed to invoke estimateKeys method on {} (ignored)", name, e); + } + for (String attr : cfAttributeList) { + try { + Object value = null; + try { + value = mBeanServer.getAttribute(name, attr); + } catch (final MBeanException|InstanceNotFoundException|ReflectionException e) { + LOG.debug("Couldn't fetch attribute {} from {} (ignored)", new Object[]{attr, name, e}); + } + final String fullyQualifiedGaugeName = gaugeName + attr; + if (value instanceof Long) { + statsDClient.gauge(fullyQualifiedGaugeName, (Long) value); + } else if (value instanceof Double) { + statsDClient.gauge(fullyQualifiedGaugeName, (Double) value); + } else if (LOG.isWarnEnabled()){ + LOG.warn("Type {} for attribute {} of {} is not supported", new Object[]{value.getClass(), attr, name}); + continue; + } + LOG.trace("Reporting {} value {}", fullyQualifiedGaugeName, value); + } catch (final AttributeNotFoundException e) { + // don't report missing attributes + LOG.debug("Missing attribute {} for {}", attr, name); + } + } + } + } +} diff --git a/src/main/java/com/github/lookout/metrics/agent/generators/JavaVMGenerator.java b/src/main/java/com/github/lookout/metrics/agent/generators/JavaVMGenerator.java new file mode 100644 index 0000000..344a1d3 --- /dev/null +++ b/src/main/java/com/github/lookout/metrics/agent/generators/JavaVMGenerator.java @@ -0,0 +1,76 @@ +package com.github.lookout.metrics.agent.generators; + +import com.timgroup.statsd.StatsDClient; +import com.yammer.metrics.core.VirtualMachineMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Created by rkuris on 5/28/15. + */ +public class JavaVMGenerator implements MetricGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraJMXGenerator.class); + + protected final VirtualMachineMetrics vm = VirtualMachineMetrics.getInstance(); + private final HashMap previous_run_times = new HashMap(); + private final HashMap previous_run_counts = new HashMap(); + + private int bytesToMB(double bytes) { + return (int)(bytes/(1024*1024)); + } + private int doubleToPct(double pct) { + return (int) Math.round(100 * pct); + } + + @Override + public void generate(StatsDClient statsDClient) { + statsDClient.gauge("jvm.memory.totalInitInMB", bytesToMB(vm.totalInit())); + statsDClient.gauge("jvm.memory.totalUsedInMB", bytesToMB(vm.totalUsed())); + statsDClient.gauge("jvm.memory.heapUsedInMB", bytesToMB(vm.heapUsed())); + + statsDClient.gauge("jvm.memory.heapUsagePercent", doubleToPct(vm.heapUsage())); + + for (Map.Entry pool : vm.memoryPoolUsage().entrySet()) { + statsDClient.gauge("jvm.memory.memory_pool_usages." + pool.getKey() + "Percent", doubleToPct(pool.getValue())); + } + + statsDClient.gauge("jvm.fdUsagePercent", doubleToPct(vm.fileDescriptorUsage())); + + for (Map.Entry entry : vm.garbageCollectors().entrySet()) { + // we only care about the delta times for the GC time and GC runs + + final String name = "jvm.gc." + entry.getKey(); + String stat_name_time = name + ".timeInMS"; + + int total_run_time = (int) entry.getValue().getTime(TimeUnit.MILLISECONDS); + Integer previous_total_run_time = previous_run_times.get(stat_name_time); + + if (previous_total_run_time == null) { + previous_total_run_time = 0; + } + int delta_run_time = total_run_time - previous_total_run_time; + previous_run_times.put(stat_name_time, total_run_time); + + statsDClient.gauge(stat_name_time, delta_run_time); + String stat_run_count = name + ".runs"; + + int total_runs = (int) entry.getValue().getRuns(); + + Integer previous_total_runs = previous_run_counts.get(stat_run_count); + + if (previous_total_runs == null) { + previous_total_runs = 0; + } + + statsDClient.gauge(stat_run_count, total_runs - previous_total_runs); + LOG.debug("Reporting {} as {}", stat_run_count, total_runs - previous_total_runs); + previous_run_counts.put(stat_run_count, total_runs); + + } + } +} diff --git a/src/main/java/com/github/lookout/metrics/agent/generators/MetricGenerator.java b/src/main/java/com/github/lookout/metrics/agent/generators/MetricGenerator.java new file mode 100644 index 0000000..b1c952d --- /dev/null +++ b/src/main/java/com/github/lookout/metrics/agent/generators/MetricGenerator.java @@ -0,0 +1,10 @@ +package com.github.lookout.metrics.agent.generators; + +import com.timgroup.statsd.StatsDClient; + +/** + * Created by rkuris on 5/28/15. + */ +public interface MetricGenerator { + void generate(StatsDClient statsDClient); +} diff --git a/src/main/java/com/github/lookout/metrics/agent/generators/YammerMetricsGenerator.java b/src/main/java/com/github/lookout/metrics/agent/generators/YammerMetricsGenerator.java new file mode 100644 index 0000000..db7f245 --- /dev/null +++ b/src/main/java/com/github/lookout/metrics/agent/generators/YammerMetricsGenerator.java @@ -0,0 +1,87 @@ +package com.github.lookout.metrics.agent.generators; + +import com.timgroup.statsd.StatsDClient; +import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.TimeUnit; + +/** + * Created by rkuris on 5/28/15. + */ +public class YammerMetricsGenerator implements MetricProcessor, MetricGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(YammerMetricsGenerator.class); + private MetricsRegistry registry = Metrics.defaultRegistry(); + private StatsDClient client; + private static final MetricPredicate predicate = MetricPredicate.ALL; + protected final Clock clock = Clock.defaultClock(); + + @Override + public void generate(StatsDClient statsDClient) { + // get all the registered metrics via dropwizard + this.client = statsDClient; + final long epoch = TimeUnit.MILLISECONDS.toSeconds(clock.time()); + final Set>> entries = registry.groupedMetrics(predicate).entrySet(); + + for (final Map.Entry> entry : entries) { + for (final Map.Entry subEntry : entry.getValue().entrySet()) { + + final Metric metric = subEntry.getValue(); + + if (metric != null) { + try { + metric.processWith(this, subEntry.getKey(), epoch); + } catch (final Exception exception) { + LOG.error("Error processing key {}", subEntry.getKey(), exception); + } + } + } + } + + } + @Override + public void processMeter(MetricName name, Metered meter, Long epoch) throws Exception { + LOG.debug("Meter {} {} skipped", name.getName(), meter.count()); + } + + @Override + public void processCounter(MetricName name, Counter counter, Long epoch) throws Exception { + client.gauge(name.getName(), counter.count()); + } + + @Override + public void processHistogram(MetricName name, Histogram histogram, Long epoch) throws Exception { + LOG.debug("Histogram {} mean {} skipped", name.getName(), histogram.mean()); + } + + @Override + public void processTimer(MetricName name, Timer timer, Long context) throws Exception { + LOG.debug("Timer {} skipped", name.getName()); + } + + @Override + public void processGauge(MetricName name, Gauge gauge, Long context) throws Exception { + reportGaugeValue(name.getName(), gauge.value()); + } + private void reportGaugeValue(String name, Object gaugeValue) { + if (gaugeValue instanceof Long) { + long value = ((Long) gaugeValue).longValue(); + LOG.debug("Reporting {} as {}", name, value); + client.gauge(name, value); + } else if (gaugeValue instanceof Double) { + double value = ((Double)gaugeValue).doubleValue(); + LOG.debug("Reporting {} as {}", name, value); + client.gauge(name, value); + } else if (gaugeValue instanceof Map) { + for (Map.Entry entry: ((Map)gaugeValue).entrySet()) { + reportGaugeValue(name + "." + entry.getKey().toString(), entry.getValue()); + } + } + } +} diff --git a/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java b/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java index 2b5d96d..8635d24 100644 --- a/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java +++ b/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java @@ -4,10 +4,14 @@ import com.github.lookout.metrics.agent.StatsdReporter; import com.timgroup.statsd.StatsDClient; import org.mockito.*; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; +import javax.management.*; +import java.lang.management.ManagementFactory; + import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; @@ -19,6 +23,10 @@ public class StatsTest { @Mock public StatsDClient client; + + @Mock + public DynamicMBean mockMBean; + private final HostPortInterval hostPortInterval = new HostPortInterval("localhost"); private StatsdReporter sut; @@ -62,4 +70,21 @@ public void testGCCounts() { verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.runs"), countCaptor2.capture()); assertNotEquals(countCaptor1.getAllValues(), countCaptor2.getAllValues()); } + @AfterClass + public void testMBean() throws MalformedObjectNameException, NotCompliantMBeanException, InstanceAlreadyExistsException, MBeanException, ReflectionException, AttributeNotFoundException { + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + when(mockMBean.getMBeanInfo()).thenReturn(new MBeanInfo("test", "", new MBeanAttributeInfo[]{ new MBeanAttributeInfo("ReadCount", "Long", "Read count", true, false, false)}, new MBeanConstructorInfo[]{}, new MBeanOperationInfo[]{ + + }, new MBeanNotificationInfo[]{})); + when(mockMBean.invoke(Matchers.eq("estimateKeys"), Matchers.any(Object[].class), Matchers.any(String[].class))).thenReturn(1L); + when(mockMBean.getAttribute(Matchers.eq("ReadCount"))).thenReturn(88888L); + reset(client); + mBeanServer.registerMBean(mockMBean, new ObjectName("org.apache.cassandra.db:columnfamily=testcf,keyspace=testks,type=ColumnFamilies")); + sut = new StatsdReporter(hostPortInterval, client); + sut.run(); + verify(client,atLeastOnce()).gauge(Matchers.matches("jvm\\..*"), Matchers.anyLong()); + verify(client,times(1)).gauge(Matchers.eq("cfstats.testks.testcf.estimatedKeys"), Matchers.eq(1L)); + verify(client,times(1)).gauge(Matchers.eq("cfstats.testks.testcf.ReadCount"), Matchers.eq(88888L)); + verifyNoMoreInteractions(client); + } } From a9d47cbf3645f7ed474a92986ff4c507dbda879a Mon Sep 17 00:00:00 2001 From: Ron Kuris Date: Mon, 24 Aug 2015 09:45:52 -0700 Subject: [PATCH 11/11] Remove the hostname from the prefix This change just hardcodes the prefix to 'cassandra'. We should make the prefix configurable based on either a passed in configuration string or the cluster name, but that's not as easy or urgent as this change. --- pom.xml | 2 +- .../com/github/lookout/metrics/agent/ReportAgent.java | 9 ++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index 6b80e82..a70585d 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.github.lookout.metrics agent - 1.2 + 1.3 jar cassandra-statsd-reporter diff --git a/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java index c2f4a63..1be5a42 100644 --- a/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java +++ b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java @@ -12,17 +12,12 @@ public class ReportAgent { public static void premain(final String agentArgs, final Instrumentation inst) { - String host; - try { - host = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - host = "unknown-host"; - } + final String prefix = "cassandra"; final String[] reportingHostPorts = (agentArgs != null) ? agentArgs.split(",") : new String[]{null}; for (final String reportingHostPort : reportingHostPorts) { final HostPortInterval hostPortInterval = new HostPortInterval(reportingHostPort); - final StatsDClient client = new NonBlockingStatsDClient(host, hostPortInterval.getHost(), hostPortInterval.getPort()); + final StatsDClient client = new NonBlockingStatsDClient(prefix, hostPortInterval.getHost(), hostPortInterval.getPort()); final StatsdReporter reporter = new StatsdReporter(hostPortInterval, client); reporter.start(hostPortInterval.getInterval(), TimeUnit.SECONDS); }