diff --git a/.gitignore b/.gitignore index f0b438d..cb8cec8 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,6 @@ target/ node_modules/ out/ *.class +.idea +*.iml diff --git a/README.md b/README.md index daa8537..72bbc14 100644 --- a/README.md +++ b/README.md @@ -1,36 +1,59 @@ -Under Siege -=============== - Cassandra Statsd Reporting Tool --------------------------------- +=============================== + +Thanks to UnderSiege for getting this project off the ground. REQUIREMENTS ---------------- * Maven -* Java +* Java 7 BUILD ---------------- +Check which metrics library cassandra is using by looking in +cassandra/lib/metrics-core*, verify that pom.xml points to the +same exact version. For example, if you have metrics-core-2.2.0.jar, +make sure pom.xml has 2.2.0. + `mvn package` -It may be necessary to build the jar against the same version of the metrics library being used in Cassandra. +Alternatively, grab the binary from bintray: -INSTAL +`curl -L http://dl.bintray.com/lookout/systems/com/github/lookout/metrics/agent/1.0/agent-1.0.jar -o agent-1.0.jar` + +INSTALL ---------------- -Toss the appropriate version of statsd library (hopefully in your .m2 folder by now) in your cassandra/lib/ directory. +Copy the statsd library from the .m2 folder to cassandra/lib. Add the following to your cassandra startup script: -`JVM_OPTS="$JVM_OPTS -javaagent:/path/to/built.jar=localhost"` - -Or whatever path you've decided to put your agent. - -Note the '=localhost' at the end. You should change this to your statsd instance. - +Copy the agent-1.0.jar to a new directory cassandra/plugins +Change cassandra startup to add this agent. This can be done in +a stock install by adding the following to /etc/default/cassandra: +`export JVM_OPTS="-javaagent:/usr/share/cassandra/plugins/agent-1.0.jar=localhost"` +Note the '=localhost' at the end. This supports the following syntaxes: +`hostname:port@interval` +For example: +`your.statsd.host.com:9999@60` +The default port is 8125 and the default interval is 10 (seconds); these +can be omitted. IPV6 is also supported with the following syntax: +`[2001:db8::1]:8888@300` +which means connect to 2001:db8::1 on port 8888 every 300 seconds. +REPORTING +---------------- +A log message will be added to the system.log at startup to +confirm that everything is running, it looks like this: +`INFO [metrics-statsd-thread-1] 2014-12-19 19:05:37,120 StatsdReporter.java:65 - Statsd reporting to host localhost port 8125 every 10 seconds` +TODO +---------------- +Errors that happen during startup are not reported as well as they should +be, mostly because the logging system is not active during startup. The log +message is only generated when the actual metrics collector has run for the +first time. diff --git a/pom.xml b/pom.xml index 7771122..bfe93dc 100644 --- a/pom.xml +++ b/pom.xml @@ -1,58 +1,76 @@ - 4.0.0 - - undersiege - undersiege - - 0.1 - jar - - cassandra-statsd-reporter-smm - http://maven.apache.org - - - UTF-8 - 1.6 - 1.6 - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - src/main/java/META-INF/MANIFEST.MF - - - - - - - - - - junit - junit - 3.8.1 - test - - - - com.yammer.metrics - metrics-core - 2.2.0 - - - - com.timgroup - java-statsd-client - 2.0.0 - - - - - + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 + + com.github.lookout.metrics + agent + + 1.0 + jar + + cassandra-statsd-reporter + + + UTF-8 + 1.7 + 1.7 + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + src/main/java/META-INF/MANIFEST.MF + + + + + + + + + + com.yammer.metrics + metrics-core + 2.2.0 + provided + + + + com.timgroup + java-statsd-client + 2.0.0 + provided + + + org.testng + testng + 6.8.8 + test + + + org.mockito + mockito-core + 1.10.17 + test + + + org.slf4j + slf4j-simple + 1.7.9 + test + + + + + + bintray-lookout-systems-cassandra-statsd-agent + lookout-systems-cassandra-statsd-agent + https://api.bintray.com/maven/lookout/systems/cassandra-statsd-agent + + diff --git a/src/main/java/META-INF/MANIFEST.MF b/src/main/java/META-INF/MANIFEST.MF index d629a3f..c3b27a0 100644 --- a/src/main/java/META-INF/MANIFEST.MF +++ b/src/main/java/META-INF/MANIFEST.MF @@ -1,2 +1,2 @@ -Manifest-Version: 1.0 -Premain-Class: com.shift.undersiege.ReportAgent +Manifest-Version: 1.0 +Premain-Class: com.github.lookout.metrics.agent.ReportAgent diff --git a/src/main/java/com/github/lookout/metrics/agent/HostPortInterval.java b/src/main/java/com/github/lookout/metrics/agent/HostPortInterval.java new file mode 100644 index 0000000..859af5d --- /dev/null +++ b/src/main/java/com/github/lookout/metrics/agent/HostPortInterval.java @@ -0,0 +1,68 @@ +package com.github.lookout.metrics.agent; + +/** + * Created by rkuris on 12/19/14. + */ +public class HostPortInterval { + public static final String DEFAULT_HOST = "localhost"; + public static final int DEFAULT_PORT = 8125; + public static final int DEFAULT_INTERVAL = 10; + + private final String host; + private final int port; + private final int interval; + + public HostPortInterval(final String hostPortInterval) { + if (hostPortInterval == null || hostPortInterval.isEmpty()) { + this.host = DEFAULT_HOST; + this.port = DEFAULT_PORT; + this.interval = DEFAULT_INTERVAL; + return; + } + int intervalOffset = hostPortInterval.lastIndexOf('@'); + final String hostPort; + if (intervalOffset == -1) { + this.interval = DEFAULT_INTERVAL; + hostPort = hostPortInterval; + } else { + this.interval = Integer.parseInt(hostPortInterval.substring(intervalOffset + 1)); + hostPort = hostPortInterval.substring(0, intervalOffset); + } + int colonOffset = hostPort.lastIndexOf(':'); + if (colonOffset == -1 || hostPort.endsWith("]")) { + this.host = stripBrackets(hostPort); + this.port = DEFAULT_PORT; + } else { + final String hostPart = hostPort.substring(0, colonOffset); + final String portPart = hostPort.substring(colonOffset + 1); + this.host = stripBrackets(hostPart); + this.port = Integer.parseInt(portPart); + } + + } + + private String stripBrackets(final String source) { + int sourceLength = source.length(); + if (sourceLength > 2 && source.charAt(0) == '[' && source.charAt(sourceLength - 1) == ']') { + return source.substring(1, sourceLength - 1); + } + return source; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public int getInterval() { + return interval; + } + + @Override + public String toString() { + return String.format("host %s port %d every %d seconds", host, port, interval); + } +} diff --git a/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java new file mode 100644 index 0000000..f72a71c --- /dev/null +++ b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java @@ -0,0 +1,37 @@ +package com.github.lookout.metrics.agent; + +import com.timgroup.statsd.NonBlockingStatsDClient; +import com.timgroup.statsd.StatsDClient; + +import java.io.IOException; +import java.lang.instrument.Instrumentation; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.concurrent.TimeUnit; + +public class ReportAgent { + public static void premain(final String agentArgs, final Instrumentation inst) throws IOException { + String host; + try { + host = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + host = "unknown-host"; + } + + for (final String reportingHostPort : agentArgs.split(",")) { + final HostPortInterval hostPortInterval = new HostPortInterval(reportingHostPort); + final StatsDClient client = new NonBlockingStatsDClient(host, hostPortInterval.getHost(), hostPortInterval.getPort()); + final StatsdReporter reporter = new StatsdReporter(hostPortInterval, client); + reporter.start(hostPortInterval.getInterval(), TimeUnit.SECONDS); + } + } + + public static void main(final String[] args) { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + // impossible + } + } +} + diff --git a/src/main/java/com/shift/undersiege/StatsdReporter.java b/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java similarity index 64% rename from src/main/java/com/shift/undersiege/StatsdReporter.java rename to src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java index 2b56654..105ed64 100644 --- a/src/main/java/com/shift/undersiege/StatsdReporter.java +++ b/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java @@ -14,10 +14,10 @@ * limitations under the License. */ /** - Edited by Jon Haddad at SHIFT to work with + Edited by Jon Haddad at SHIFT to work with */ -package com.shift.undersiege; +package com.github.lookout.metrics.agent; import com.timgroup.statsd.NonBlockingStatsDClient; @@ -28,9 +28,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; -import java.util.Map; import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.concurrent.TimeUnit; @@ -40,17 +39,19 @@ public class StatsdReporter extends AbstractPollingReporter implements MetricPro private static final Logger LOG = LoggerFactory.getLogger(StatsdReporter.class); protected final VirtualMachineMetrics vm; - private StatsDClient statsd; - private boolean printVMMetrics = false; + private final StatsDClient statsd; protected final Clock clock; - private MetricPredicate predicate = MetricPredicate.ALL; + private static final MetricPredicate predicate = MetricPredicate.ALL; - private HashMap previous_run_times; - private HashMap previous_run_counts; + private boolean reportedStartup = false; + private final HostPortInterval hostPortInterval; + private final HashMap previous_run_times; + private final HashMap previous_run_counts; - public StatsdReporter(String host, int port, String prefix) throws IOException { + public StatsdReporter(final HostPortInterval hostPortInterval, final StatsDClient statsd) { super(Metrics.defaultRegistry(), "statsd"); - statsd = new NonBlockingStatsDClient(prefix, host, port); + this.hostPortInterval = hostPortInterval; + this.statsd = statsd; vm = VirtualMachineMetrics.getInstance(); clock = Clock.defaultClock(); previous_run_times = new HashMap(); @@ -60,15 +61,13 @@ public StatsdReporter(String host, int port, String prefix) throws IOException { @Override public void run() { + if (!reportedStartup || LOG.isDebugEnabled()) { + LOG.info("Statsd reporting to {}", hostPortInterval); + reportedStartup = true; + } try { final long epoch = clock.time() / 1000; - if (this.printVMMetrics) { - printVmMetrics(epoch); - } - printRegularMetrics(epoch); - - // Send UDP data - + printMetrics(epoch); } catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug("Error writing to statsd", e); @@ -78,22 +77,28 @@ public void run() { } } - protected void printVmMetrics(long epoch) { + private int bytesToMB(double bytes) { + return (int)(bytes/(1024*1024)); + } + private int doubleToPct(double pct) { + return (int) Math.round(100 * pct); + } + + protected void printMetrics(long epoch) { // Memory - int div = 1048576; - statsd.gauge("jvm.memory.totalInitInMB", (int) vm.totalInit() / div); - statsd.gauge("jvm.memory.totalUsedInMB", (int) vm.totalUsed() / div); - statsd.gauge("jvm.memory.heapUsedInMB", (int) vm.heapUsed() / div); + statsd.gauge("jvm.memory.totalInitInMB", bytesToMB(vm.totalInit())); + statsd.gauge("jvm.memory.totalUsedInMB", bytesToMB(vm.totalUsed())); + statsd.gauge("jvm.memory.heapUsedInMB", bytesToMB(vm.heapUsed())); - statsd.gauge("jvm.memory.heapUsageInMB", (int) vm.heapUsage() / div); + statsd.gauge("jvm.memory.heapUsagePercent", doubleToPct(vm.heapUsage())); for (Map.Entry pool : vm.memoryPoolUsage().entrySet()) { - statsd.gauge("jvm.memory.memory_pool_usages." + pool.getKey(), pool.getValue().intValue() / div); + statsd.gauge("jvm.memory.memory_pool_usages." + pool.getKey() + "Percent", doubleToPct(pool.getValue())); } - statsd.gauge("jvm.fd_usage", (int) vm.fileDescriptorUsage()); + statsd.gauge("jvm.fdUsagePercent", doubleToPct(vm.fileDescriptorUsage())); - for (Map.Entry entry : vm.garbageCollectors().entrySet()) { + for (Map.Entry entry : vm.garbageCollectors().entrySet()) { // we only care about the delta times for the GC time and GC runs final String name = "jvm.gc." + entry.getKey(); @@ -115,31 +120,26 @@ protected void printVmMetrics(long epoch) { Integer previous_total_runs = previous_run_counts.get(stat_run_count); - if(previous_total_runs == null) { + if (previous_total_runs == null) { previous_total_runs = 0; } statsd.gauge(stat_run_count, total_runs - previous_total_runs); previous_run_counts.put(stat_run_count, total_runs); } - } - - protected void printRegularMetrics(long epoch) { - printVmMetrics(epoch); - Set>> entries = getMetricsRegistry().groupedMetrics(predicate).entrySet(); + final Set>> entries = getMetricsRegistry().groupedMetrics(predicate).entrySet(); - for (Map.Entry> entry : entries) { - for (Map.Entry subEntry : entry.getValue().entrySet()) { + for (final Map.Entry> entry : entries) { + for (final Map.Entry subEntry : entry.getValue().entrySet()) { final Metric metric = subEntry.getValue(); if (metric != null) { try { metric.processWith(this, subEntry.getKey(), epoch); - //statsd.gauge(subEntry.getKey(), subEntry.getValue().); - } catch (Exception ignored) { - LOG.error("Error printing regular com.shift.undersiege:", ignored); + } catch (final Exception exception) { + LOG.error("Error processing key {}", subEntry.getKey(), exception); } } } @@ -148,9 +148,7 @@ protected void printRegularMetrics(long epoch) { @Override public void processMeter(MetricName name, Metered meter, Long epoch) throws Exception { -// System.out.printf("Printing process meter %s %d\n", name.getName(), meter.count()); - -// + LOG.debug("Meter {} {} skipped", name.getName(), meter.count()); } @Override @@ -160,21 +158,16 @@ public void processCounter(MetricName name, Counter counter, Long epoch) throws @Override public void processHistogram(MetricName name, Histogram histogram, Long epoch) throws Exception { -// System.out.printf("process histogram %s %f\n", name.getName(), histogram.mean()); + LOG.debug("Histogram {} mean {} skipped", name.getName(), histogram.mean()); } @Override public void processTimer(MetricName name, Timer timer, Long context) throws Exception { -// System.out.printf("timer %s\n", name.getName()); - //To change body of implemented methods use File | Settings | File Templates. + LOG.debug("Timer {} skipped", name.getName()); } @Override public void processGauge(MetricName name, Gauge gauge, Long context) throws Exception { - //To change body of implemented methods use File | Settings | File Templates. -// System.out.printf("gauge %s %s\n", name.getName(), gauge.toString()); statsd.gauge(name.getName(), gauge.hashCode()); } - - } diff --git a/src/main/java/com/shift/undersiege/ReportAgent.java b/src/main/java/com/shift/undersiege/ReportAgent.java deleted file mode 100644 index 9b7684a..0000000 --- a/src/main/java/com/shift/undersiege/ReportAgent.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.shift.undersiege; - -import java.io.IOException; -import java.lang.instrument.Instrumentation; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.concurrent.TimeUnit; - -import com.yammer.metrics.reporting.ConsoleReporter; -//import com.yammer.com.shift.undersiege.reporting.GraphiteReporter; -// - -public class ReportAgent -{ - public static void premain(String agentArgs, Instrumentation inst) throws IOException - { - // comma separated list of - String host; - try { - host = InetAddress.getLocalHost().getHostName(); - } - catch (UnknownHostException e) { - host = "unknown-host"; - } - - StatsdReporter reporter = new StatsdReporter(agentArgs, 8125, host); - reporter.start(10, TimeUnit.SECONDS); - - System.out.println("STATSD STARTING sending to " + agentArgs + " with host prefix " + host); - - } - public static void main(String[] args) - { - System.out.println("Main running."); - try { - Thread.sleep(60000); - } catch (InterruptedException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - System.out.println("Done."); - } -} - diff --git a/src/test/java/com/github/lookout/metrics/agent/test/HostPortIntervalParserTest.java b/src/test/java/com/github/lookout/metrics/agent/test/HostPortIntervalParserTest.java new file mode 100644 index 0000000..6028765 --- /dev/null +++ b/src/test/java/com/github/lookout/metrics/agent/test/HostPortIntervalParserTest.java @@ -0,0 +1,69 @@ +package com.github.lookout.metrics.agent.test; + +import com.github.lookout.metrics.agent.HostPortInterval; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +/** + * Created by rkuris on 12/19/14. + */ +public class HostPortIntervalParserTest { + + @Test + public void testDefault() { + HostPortInterval result = new HostPortInterval(null); + assertEquals(result.getHost(), HostPortInterval.DEFAULT_HOST); + assertEquals(result.getPort(), HostPortInterval.DEFAULT_PORT); + assertEquals(result.getInterval(), HostPortInterval.DEFAULT_INTERVAL); + } + + @Test + public void testDefaultPort() { + HostPortInterval result = new HostPortInterval("host"); + assertEquals(result.getHost(), "host"); + assertEquals(result.getPort(), HostPortInterval.DEFAULT_PORT); + assertEquals(result.getInterval(), HostPortInterval.DEFAULT_INTERVAL); + } + + @Test + public void testIPV4() { + HostPortInterval result = new HostPortInterval("1.2.3.4:56"); + assertEquals(result.getHost(), "1.2.3.4"); + assertEquals(result.getPort(), 56); + assertEquals(result.getInterval(), HostPortInterval.DEFAULT_INTERVAL); + } + + @Test + public void testIPV6DefaultPort() { + HostPortInterval result = new HostPortInterval("[::1]"); + assertEquals(result.getHost(), "::1"); + assertEquals(result.getPort(), HostPortInterval.DEFAULT_PORT); + assertEquals(result.getInterval(), HostPortInterval.DEFAULT_INTERVAL); + } + + @Test + public void testIPV6() { + HostPortInterval result = new HostPortInterval("[::1]:9123"); + assertEquals(result.getHost(), "::1"); + assertEquals(result.getPort(), 9123); + assertEquals(result.getInterval(), HostPortInterval.DEFAULT_INTERVAL); + } + + @Test + public void testIntervalDefaultPort() { + HostPortInterval result = new HostPortInterval("1.2.3.4@1"); + assertEquals(result.getHost(), "1.2.3.4"); + assertEquals(result.getPort(), HostPortInterval.DEFAULT_PORT); + assertEquals(result.getInterval(), 1); + } + + @Test + public void testWholeEnchilada() { + HostPortInterval result = new HostPortInterval("[2001:db8::1]:210@321"); + assertEquals(result.getHost(), "2001:db8::1"); + assertEquals(result.getPort(), 210); + assertEquals(result.getInterval(), 321); + assertEquals(result.toString(), "host 2001:db8::1 port 210 every 321 seconds"); + } +} diff --git a/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java b/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java new file mode 100644 index 0000000..c5fa045 --- /dev/null +++ b/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java @@ -0,0 +1,65 @@ +package com.github.lookout.metrics.agent.test; + +import com.github.lookout.metrics.agent.HostPortInterval; +import com.github.lookout.metrics.agent.StatsdReporter; +import com.timgroup.statsd.StatsDClient; +import org.mockito.*; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; + +/** + * Created by rkuris on 12/23/14. + */ +public class StatsTest { + + @Mock + public StatsDClient client; + private final HostPortInterval hostPortInterval = new HostPortInterval("localhost"); + + private StatsdReporter sut; + + @BeforeMethod + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testStatsNotZero() { + sut = new StatsdReporter(hostPortInterval, client); + byte[] bigMemoryChunk = new byte[102400000]; // 100mb should get us to 1% of heaps up to 10gb + bigMemoryChunk[4444] = 1; + sut.run(); + verify(client).gauge(Matchers.eq("jvm.memory.totalInitInMB"), AdditionalMatchers.gt(0)); + verify(client).gauge(Matchers.eq("jvm.memory.totalUsedInMB"), AdditionalMatchers.gt(0)); + verify(client).gauge(Matchers.eq("jvm.memory.heapUsedInMB"), AdditionalMatchers.gt(0)); + verify(client).gauge(Matchers.eq("jvm.memory.heapUsagePercent"), AdditionalMatchers.gt(0)); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm.memory\\.memory_pool_usages\\..*Percent"), AdditionalMatchers.and(AdditionalMatchers.geq(0), AdditionalMatchers.lt(100))); + verify(client).gauge(Matchers.eq("jvm.fdUsagePercent"), AdditionalMatchers.gt(0)); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), Matchers.anyInt()); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.runs"), Matchers.anyInt()); + assertEquals(bigMemoryChunk[4444], 1); + + verifyNoMoreInteractions(client); + } + + @Test + public void testGCCounts() { + sut = new StatsdReporter(hostPortInterval, client); + sut.run(); + ArgumentCaptor countCaptor1 = ArgumentCaptor.forClass(Integer.class); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), Matchers.anyInt()); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.runs"), countCaptor1.capture()); + reset(client); + System.gc(); + sut.run(); + ArgumentCaptor countCaptor2 = ArgumentCaptor.forClass(Integer.class); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), AdditionalMatchers.gt(0)); + verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.runs"), countCaptor2.capture()); + assertNotEquals(countCaptor1.getAllValues(), countCaptor2.getAllValues()); + } +}