diff --git a/.gitignore b/.gitignore
index f0b438d..cb8cec8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,4 +2,6 @@ target/
node_modules/
out/
*.class
+.idea
+*.iml
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..dff5f3a
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1 @@
+language: java
diff --git a/README.md b/README.md
index daa8537..9cc38f2 100644
--- a/README.md
+++ b/README.md
@@ -1,36 +1,89 @@
-Under Siege
-===============
-
Cassandra Statsd Reporting Tool
---------------------------------
+===============================
+
+Thanks to UnderSiege for getting this project off the ground.
REQUIREMENTS
----------------
* Maven
-* Java
+* Java 7
BUILD
----------------
+Check which metrics library cassandra is using by looking in
+cassandra/lib/metrics-core*, verify that pom.xml points to the
+same exact version. For example, if you have metrics-core-2.2.0.jar,
+make sure pom.xml has 2.2.0.
+
`mvn package`
-It may be necessary to build the jar against the same version of the metrics library being used in Cassandra.
+Alternatively, grab the binary from bintray:
-INSTAL
+`curl -L http://dl.bintray.com/lookout/systems/com/github/lookout/metrics/agent/1.2/agent-1.2.jar -o agent-1.2.jar`
+
+INSTALL
----------------
-Toss the appropriate version of statsd library (hopefully in your .m2 folder by now) in your cassandra/lib/ directory.
+Copy the statsd library from the .m2 folder to cassandra/lib.
Add the following to your cassandra startup script:
-`JVM_OPTS="$JVM_OPTS -javaagent:/path/to/built.jar=localhost"`
-
-Or whatever path you've decided to put your agent.
-
-Note the '=localhost' at the end. You should change this to your statsd instance.
-
+Copy the agent-1.2.jar to a new directory cassandra/plugins
+Change cassandra startup to add this agent. This can be done in
+a stock install by adding the following to /etc/default/cassandra:
+`export JVM_OPTS="-javaagent:/usr/share/cassandra/plugins/agent-1.2.jar=localhost"`
+Note the '=localhost' at the end. This supports the following syntaxes:
+`hostname:port@interval`
+For example:
+`your.statsd.host.com:9999@60`
+The default port is 8125 and the default interval is 10 (seconds); these
+can be omitted. IPV6 is also supported with the following syntax:
+`[2001:db8::1]:8888@300`
+which means connect to 2001:db8::1 on port 8888 every 300 seconds.
+REPORTING
+----------------
+A log message will be added to the system.log at startup to
+confirm that everything is running, it looks like this:
+
+`INFO [metrics-statsd-thread-1] 2014-12-19 19:05:37,120 StatsdReporter.java:65 - Statsd reporting to host localhost port 8125 every 10 seconds`
+
+WHAT GETS REPORTED
+------------------
+Lots of stuff:
+
+* Gossip statistics:
+ gossip.score., which help decide who is closer/faster for queries
+ gossip.severity, which indicates how busy this node is self-reporting to others
+* Per table statistics:
+ cfstats...ReadCount
+ cfstats...WriteCount
+ cfstats...RecentReadLatencyMicros
+ cfstats...RecentWriteLatencyMicros
+ cfstats...TombstonesPerSlice
+ cfstats...estimatedKeys
+ The last one is great for monitoring general trends, but of course don't
+ rely on that number to be very accurate.
+* PHI reporter
+ Also supported is the currently-experimental PHI reporter, in PHI.,
+ coming to a Cassandra cluster near you soon.
+* JVM GC metrics
+* Anything else registered with yammer-metrics
+
+DEBUGGING
+----------------
+Not working? There's a lot of tracing and debugging available. Change the
+log4j-server.properties and add something like this to get extremely detailed
+traces of what it's doing in the server.log.
+`log4j.logger.com.github.lookout.metrics.agent.generators=TRACE`
+TODO
+----------------
+Errors that happen during startup are not reported as well as they should
+be, mostly because the logging system is not active during startup. The log
+message is only generated when the actual metrics collector has run for the
+first time.
diff --git a/pom.xml b/pom.xml
index 7771122..a70585d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,58 +1,76 @@
- 4.0.0
-
- undersiege
- undersiege
-
- 0.1
- jar
-
- cassandra-statsd-reporter-smm
- http://maven.apache.org
-
-
- UTF-8
- 1.6
- 1.6
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
-
- src/main/java/META-INF/MANIFEST.MF
-
-
-
-
-
-
-
-
-
- junit
- junit
- 3.8.1
- test
-
-
-
- com.yammer.metrics
- metrics-core
- 2.2.0
-
-
-
- com.timgroup
- java-statsd-client
- 2.0.0
-
-
-
-
-
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ 4.0.0
+
+ com.github.lookout.metrics
+ agent
+
+ 1.3
+ jar
+
+ cassandra-statsd-reporter
+
+
+ UTF-8
+ 1.7
+ 1.7
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 2.4
+
+
+ src/main/java/META-INF/MANIFEST.MF
+
+
+
+
+
+
+
+
+
+ com.yammer.metrics
+ metrics-core
+ 2.2.0
+ provided
+
+
+
+ com.timgroup
+ java-statsd-client
+ 3.0.2
+ provided
+
+
+ org.testng
+ testng
+ 6.8.8
+ test
+
+
+ org.mockito
+ mockito-core
+ 1.10.17
+ test
+
+
+ org.slf4j
+ slf4j-simple
+ 1.7.9
+ test
+
+
+
+
+
+ bintray-lookout-systems-cassandra-statsd-agent
+ lookout-systems-cassandra-statsd-agent
+ https://api.bintray.com/maven/lookout/systems/cassandra-statsd-agent
+
+
diff --git a/src/main/java/META-INF/MANIFEST.MF b/src/main/java/META-INF/MANIFEST.MF
index d629a3f..c3b27a0 100644
--- a/src/main/java/META-INF/MANIFEST.MF
+++ b/src/main/java/META-INF/MANIFEST.MF
@@ -1,2 +1,2 @@
-Manifest-Version: 1.0
-Premain-Class: com.shift.undersiege.ReportAgent
+Manifest-Version: 1.0
+Premain-Class: com.github.lookout.metrics.agent.ReportAgent
diff --git a/src/main/java/com/github/lookout/metrics/agent/HostPortInterval.java b/src/main/java/com/github/lookout/metrics/agent/HostPortInterval.java
new file mode 100644
index 0000000..859af5d
--- /dev/null
+++ b/src/main/java/com/github/lookout/metrics/agent/HostPortInterval.java
@@ -0,0 +1,68 @@
+package com.github.lookout.metrics.agent;
+
+/**
+ * Created by rkuris on 12/19/14.
+ */
+public class HostPortInterval {
+ public static final String DEFAULT_HOST = "localhost";
+ public static final int DEFAULT_PORT = 8125;
+ public static final int DEFAULT_INTERVAL = 10;
+
+ private final String host;
+ private final int port;
+ private final int interval;
+
+ public HostPortInterval(final String hostPortInterval) {
+ if (hostPortInterval == null || hostPortInterval.isEmpty()) {
+ this.host = DEFAULT_HOST;
+ this.port = DEFAULT_PORT;
+ this.interval = DEFAULT_INTERVAL;
+ return;
+ }
+ int intervalOffset = hostPortInterval.lastIndexOf('@');
+ final String hostPort;
+ if (intervalOffset == -1) {
+ this.interval = DEFAULT_INTERVAL;
+ hostPort = hostPortInterval;
+ } else {
+ this.interval = Integer.parseInt(hostPortInterval.substring(intervalOffset + 1));
+ hostPort = hostPortInterval.substring(0, intervalOffset);
+ }
+ int colonOffset = hostPort.lastIndexOf(':');
+ if (colonOffset == -1 || hostPort.endsWith("]")) {
+ this.host = stripBrackets(hostPort);
+ this.port = DEFAULT_PORT;
+ } else {
+ final String hostPart = hostPort.substring(0, colonOffset);
+ final String portPart = hostPort.substring(colonOffset + 1);
+ this.host = stripBrackets(hostPart);
+ this.port = Integer.parseInt(portPart);
+ }
+
+ }
+
+ private String stripBrackets(final String source) {
+ int sourceLength = source.length();
+ if (sourceLength > 2 && source.charAt(0) == '[' && source.charAt(sourceLength - 1) == ']') {
+ return source.substring(1, sourceLength - 1);
+ }
+ return source;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public int getInterval() {
+ return interval;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("host %s port %d every %d seconds", host, port, interval);
+ }
+}
diff --git a/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java
new file mode 100644
index 0000000..1be5a42
--- /dev/null
+++ b/src/main/java/com/github/lookout/metrics/agent/ReportAgent.java
@@ -0,0 +1,26 @@
+package com.github.lookout.metrics.agent;
+
+import com.timgroup.statsd.NonBlockingStatsDClient;
+import com.timgroup.statsd.StatsDClient;
+
+import java.io.IOException;
+import java.lang.instrument.Instrumentation;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.TimeUnit;
+
+public class ReportAgent {
+
+ public static void premain(final String agentArgs, final Instrumentation inst) {
+ final String prefix = "cassandra";
+
+ final String[] reportingHostPorts = (agentArgs != null) ? agentArgs.split(",") : new String[]{null};
+ for (final String reportingHostPort : reportingHostPorts) {
+ final HostPortInterval hostPortInterval = new HostPortInterval(reportingHostPort);
+ final StatsDClient client = new NonBlockingStatsDClient(prefix, hostPortInterval.getHost(), hostPortInterval.getPort());
+ final StatsdReporter reporter = new StatsdReporter(hostPortInterval, client);
+ reporter.start(hostPortInterval.getInterval(), TimeUnit.SECONDS);
+ }
+ }
+}
+
diff --git a/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java b/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java
new file mode 100644
index 0000000..6099c3b
--- /dev/null
+++ b/src/main/java/com/github/lookout/metrics/agent/StatsdReporter.java
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2012-2013 Sean Laurent
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ Edited by Jon Haddad at SHIFT to work with
+ */
+
+package com.github.lookout.metrics.agent;
+
+import com.github.lookout.metrics.agent.generators.CassandraJMXGenerator;
+import com.github.lookout.metrics.agent.generators.JavaVMGenerator;
+import com.github.lookout.metrics.agent.generators.MetricGenerator;
+import com.github.lookout.metrics.agent.generators.YammerMetricsGenerator;
+import com.timgroup.statsd.StatsDClient;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.*;
+import com.yammer.metrics.reporting.AbstractPollingReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+public class StatsdReporter extends AbstractPollingReporter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StatsdReporter.class);
+
+
+ private final StatsDClient statsd;
+
+ private boolean reportedStartup = false;
+ private final HostPortInterval hostPortInterval;
+
+ private final Set generators = new HashSet<>();
+
+ public StatsdReporter(final HostPortInterval hostPortInterval, final StatsDClient statsd) {
+ super(Metrics.defaultRegistry(), "statsd");
+ this.hostPortInterval = hostPortInterval;
+ this.statsd = statsd;
+
+ // This really should be done with an injection framework, but that's too heavy for this
+ generators.add(new CassandraJMXGenerator());
+ generators.add(new JavaVMGenerator());
+ generators.add(new YammerMetricsGenerator());
+ }
+
+ @Override
+ public void run() {
+ if (!reportedStartup || LOG.isDebugEnabled()) {
+ LOG.info("Statsd reporting to {}", hostPortInterval);
+ reportedStartup = true;
+ }
+ for (MetricGenerator generator : generators) {
+ try {
+ generator.generate(statsd);
+ } catch (RuntimeException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Error writing to statsd", e);
+ } else {
+ LOG.warn("Error writing to statsd: {}", e.getMessage());
+ }
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/github/lookout/metrics/agent/generators/CassandraJMXGenerator.java b/src/main/java/com/github/lookout/metrics/agent/generators/CassandraJMXGenerator.java
new file mode 100644
index 0000000..7e57ed3
--- /dev/null
+++ b/src/main/java/com/github/lookout/metrics/agent/generators/CassandraJMXGenerator.java
@@ -0,0 +1,148 @@
+package com.github.lookout.metrics.agent.generators;
+
+import com.timgroup.statsd.StatsDClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.*;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Extracts the relevant JMX values from a running cassandra instance.
+ * This code doesn't report anything when it can't find the JMX stuff
+ * we care about.
+ */
+public class CassandraJMXGenerator implements MetricGenerator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraJMXGenerator.class);
+ private final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+ private boolean phiAvailable = true;
+
+ public CassandraJMXGenerator() {
+ }
+
+ private void gaugeAndLog(StatsDClient statsDClient, String gaugeName, Double value) {
+ statsDClient.gauge(gaugeName, value);
+ LOG.debug("Reporting {} as {}", gaugeName, value);
+ }
+ private void gaugeAndLog(StatsDClient statsDClient, String gaugeName, Long value) {
+ statsDClient.gauge(gaugeName, value);
+ LOG.debug("Reporting {} as {}", gaugeName, value);
+ }
+
+ /**
+ * Construct the base gauge name from the JMX object name
+ * The gauge is reported as cfstats.KEYSPACE.COLUMNFAMILY.
+ * There is a trailing dot, so the actual gauge name can be
+ * constructed by appending the item name to the end
+ *
+ * @param name The JMX ObjectName
+ * @return A string in the format cfstats.KEYSPACE.COLUMNFAMILY
+ */
+ private String gaugeName(ObjectName name) {
+ String ks = name.getKeyProperty("keyspace");
+ String cf = name.getKeyProperty("columnfamily");
+ return String.format("cfstats.%s.%s.", ks, cf);
+ }
+
+ /**
+ * Some generated gauge names have things like slashes and dots in them.
+ * This will replace all non-alnum characters with an underscore.
+ * @param name
+ * @return
+ */
+ private String quoteGaugeName(String name) {
+ return name.replaceAll("\\W", "_");
+ }
+
+ /**
+ * These are the per-column-family attributes that we will report to statsd
+ */
+ public String[] cfAttributeList = new String[]
+ { "ReadCount",
+ "WriteCount",
+ "RecentReadLatencyMicros",
+ "RecentWriteLatencyMicros",
+ "TombstonesPerSlice"
+ };
+
+
+ @Override
+ public void generate(StatsDClient statsDClient) {
+ // Step 1: report PHI values from FailureDetector if available
+ if (phiAvailable) {
+ try {
+ TabularData table = (TabularData) mBeanServer.getAttribute(new ObjectName("org.apache.cassandra.net:type=FailureDetector"), "PhiValues");
+ for (Object rawData : table.values()) {
+ final CompositeData data = (CompositeData) rawData;
+ final String gaugeName = "phi." + quoteGaugeName(data.get("Endpoint").toString());
+ gaugeAndLog(statsDClient, gaugeName, (Double) data.get("PHI"));
+ }
+
+ } catch (Exception e) {
+ LOG.trace("Exception reading PhiValues, skipping", e);
+ phiAvailable = false;
+ }
+ }
+
+ // Step 2: Gossiper information
+ try {
+ ObjectName snitchName = new ObjectName("org.apache.cassandra.db:type=DynamicEndpointSnitch");
+ Map gossipScores = (Map) mBeanServer.getAttribute(snitchName, "Scores");
+ for (Map.Entry entry : gossipScores.entrySet()) {
+ final String gaugeName = "gossip.score." + quoteGaugeName(entry.getKey().toString());
+ gaugeAndLog(statsDClient, gaugeName, entry.getValue());
+ }
+ Double severity = (Double) mBeanServer.getAttribute(snitchName, "Severity");
+ gaugeAndLog(statsDClient, "gossip.severity", severity);
+ } catch (Exception e) {
+ LOG.debug("Skipping gossip info", e);
+ }
+
+ // Step 3: per cf statistics
+ Set names;
+ try {
+ names = mBeanServer.queryNames(new ObjectName("org.apache.cassandra.db:columnfamily=*,keyspace=*,type=ColumnFamilies"), null);
+ } catch (final MalformedObjectNameException e) {
+ throw new RuntimeException(e); // impossible
+ }
+ for (ObjectName name : names) {
+ Long keys = null;
+ String gaugeName = gaugeName(name);
+ try {
+ keys = (Long) mBeanServer.invoke(name, "estimateKeys", new Object[]{}, new String[]{});
+ gaugeAndLog(statsDClient, gaugeName + "estimatedKeys", keys);
+ } catch (final InstanceNotFoundException|MBeanException|ReflectionException e) {
+ LOG.debug("Failed to invoke estimateKeys method on {} (ignored)", name, e);
+ }
+ for (String attr : cfAttributeList) {
+ try {
+ Object value = null;
+ try {
+ value = mBeanServer.getAttribute(name, attr);
+ } catch (final MBeanException|InstanceNotFoundException|ReflectionException e) {
+ LOG.debug("Couldn't fetch attribute {} from {} (ignored)", new Object[]{attr, name, e});
+ }
+ final String fullyQualifiedGaugeName = gaugeName + attr;
+ if (value instanceof Long) {
+ statsDClient.gauge(fullyQualifiedGaugeName, (Long) value);
+ } else if (value instanceof Double) {
+ statsDClient.gauge(fullyQualifiedGaugeName, (Double) value);
+ } else if (LOG.isWarnEnabled()){
+ LOG.warn("Type {} for attribute {} of {} is not supported", new Object[]{value.getClass(), attr, name});
+ continue;
+ }
+ LOG.trace("Reporting {} value {}", fullyQualifiedGaugeName, value);
+ } catch (final AttributeNotFoundException e) {
+ // don't report missing attributes
+ LOG.debug("Missing attribute {} for {}", attr, name);
+ }
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/github/lookout/metrics/agent/generators/JavaVMGenerator.java b/src/main/java/com/github/lookout/metrics/agent/generators/JavaVMGenerator.java
new file mode 100644
index 0000000..344a1d3
--- /dev/null
+++ b/src/main/java/com/github/lookout/metrics/agent/generators/JavaVMGenerator.java
@@ -0,0 +1,76 @@
+package com.github.lookout.metrics.agent.generators;
+
+import com.timgroup.statsd.StatsDClient;
+import com.yammer.metrics.core.VirtualMachineMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by rkuris on 5/28/15.
+ */
+public class JavaVMGenerator implements MetricGenerator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraJMXGenerator.class);
+
+ protected final VirtualMachineMetrics vm = VirtualMachineMetrics.getInstance();
+ private final HashMap previous_run_times = new HashMap();
+ private final HashMap previous_run_counts = new HashMap();
+
+ private int bytesToMB(double bytes) {
+ return (int)(bytes/(1024*1024));
+ }
+ private int doubleToPct(double pct) {
+ return (int) Math.round(100 * pct);
+ }
+
+ @Override
+ public void generate(StatsDClient statsDClient) {
+ statsDClient.gauge("jvm.memory.totalInitInMB", bytesToMB(vm.totalInit()));
+ statsDClient.gauge("jvm.memory.totalUsedInMB", bytesToMB(vm.totalUsed()));
+ statsDClient.gauge("jvm.memory.heapUsedInMB", bytesToMB(vm.heapUsed()));
+
+ statsDClient.gauge("jvm.memory.heapUsagePercent", doubleToPct(vm.heapUsage()));
+
+ for (Map.Entry pool : vm.memoryPoolUsage().entrySet()) {
+ statsDClient.gauge("jvm.memory.memory_pool_usages." + pool.getKey() + "Percent", doubleToPct(pool.getValue()));
+ }
+
+ statsDClient.gauge("jvm.fdUsagePercent", doubleToPct(vm.fileDescriptorUsage()));
+
+ for (Map.Entry entry : vm.garbageCollectors().entrySet()) {
+ // we only care about the delta times for the GC time and GC runs
+
+ final String name = "jvm.gc." + entry.getKey();
+ String stat_name_time = name + ".timeInMS";
+
+ int total_run_time = (int) entry.getValue().getTime(TimeUnit.MILLISECONDS);
+ Integer previous_total_run_time = previous_run_times.get(stat_name_time);
+
+ if (previous_total_run_time == null) {
+ previous_total_run_time = 0;
+ }
+ int delta_run_time = total_run_time - previous_total_run_time;
+ previous_run_times.put(stat_name_time, total_run_time);
+
+ statsDClient.gauge(stat_name_time, delta_run_time);
+ String stat_run_count = name + ".runs";
+
+ int total_runs = (int) entry.getValue().getRuns();
+
+ Integer previous_total_runs = previous_run_counts.get(stat_run_count);
+
+ if (previous_total_runs == null) {
+ previous_total_runs = 0;
+ }
+
+ statsDClient.gauge(stat_run_count, total_runs - previous_total_runs);
+ LOG.debug("Reporting {} as {}", stat_run_count, total_runs - previous_total_runs);
+ previous_run_counts.put(stat_run_count, total_runs);
+
+ }
+ }
+}
diff --git a/src/main/java/com/github/lookout/metrics/agent/generators/MetricGenerator.java b/src/main/java/com/github/lookout/metrics/agent/generators/MetricGenerator.java
new file mode 100644
index 0000000..b1c952d
--- /dev/null
+++ b/src/main/java/com/github/lookout/metrics/agent/generators/MetricGenerator.java
@@ -0,0 +1,10 @@
+package com.github.lookout.metrics.agent.generators;
+
+import com.timgroup.statsd.StatsDClient;
+
+/**
+ * Created by rkuris on 5/28/15.
+ */
+public interface MetricGenerator {
+ void generate(StatsDClient statsDClient);
+}
diff --git a/src/main/java/com/github/lookout/metrics/agent/generators/YammerMetricsGenerator.java b/src/main/java/com/github/lookout/metrics/agent/generators/YammerMetricsGenerator.java
new file mode 100644
index 0000000..db7f245
--- /dev/null
+++ b/src/main/java/com/github/lookout/metrics/agent/generators/YammerMetricsGenerator.java
@@ -0,0 +1,87 @@
+package com.github.lookout.metrics.agent.generators;
+
+import com.timgroup.statsd.StatsDClient;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by rkuris on 5/28/15.
+ */
+public class YammerMetricsGenerator implements MetricProcessor, MetricGenerator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(YammerMetricsGenerator.class);
+ private MetricsRegistry registry = Metrics.defaultRegistry();
+ private StatsDClient client;
+ private static final MetricPredicate predicate = MetricPredicate.ALL;
+ protected final Clock clock = Clock.defaultClock();
+
+ @Override
+ public void generate(StatsDClient statsDClient) {
+ // get all the registered metrics via dropwizard
+ this.client = statsDClient;
+ final long epoch = TimeUnit.MILLISECONDS.toSeconds(clock.time());
+ final Set>> entries = registry.groupedMetrics(predicate).entrySet();
+
+ for (final Map.Entry> entry : entries) {
+ for (final Map.Entry subEntry : entry.getValue().entrySet()) {
+
+ final Metric metric = subEntry.getValue();
+
+ if (metric != null) {
+ try {
+ metric.processWith(this, subEntry.getKey(), epoch);
+ } catch (final Exception exception) {
+ LOG.error("Error processing key {}", subEntry.getKey(), exception);
+ }
+ }
+ }
+ }
+
+ }
+ @Override
+ public void processMeter(MetricName name, Metered meter, Long epoch) throws Exception {
+ LOG.debug("Meter {} {} skipped", name.getName(), meter.count());
+ }
+
+ @Override
+ public void processCounter(MetricName name, Counter counter, Long epoch) throws Exception {
+ client.gauge(name.getName(), counter.count());
+ }
+
+ @Override
+ public void processHistogram(MetricName name, Histogram histogram, Long epoch) throws Exception {
+ LOG.debug("Histogram {} mean {} skipped", name.getName(), histogram.mean());
+ }
+
+ @Override
+ public void processTimer(MetricName name, Timer timer, Long context) throws Exception {
+ LOG.debug("Timer {} skipped", name.getName());
+ }
+
+ @Override
+ public void processGauge(MetricName name, Gauge> gauge, Long context) throws Exception {
+ reportGaugeValue(name.getName(), gauge.value());
+ }
+ private void reportGaugeValue(String name, Object gaugeValue) {
+ if (gaugeValue instanceof Long) {
+ long value = ((Long) gaugeValue).longValue();
+ LOG.debug("Reporting {} as {}", name, value);
+ client.gauge(name, value);
+ } else if (gaugeValue instanceof Double) {
+ double value = ((Double)gaugeValue).doubleValue();
+ LOG.debug("Reporting {} as {}", name, value);
+ client.gauge(name, value);
+ } else if (gaugeValue instanceof Map) {
+ for (Map.Entry, ?> entry: ((Map,?>)gaugeValue).entrySet()) {
+ reportGaugeValue(name + "." + entry.getKey().toString(), entry.getValue());
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/shift/undersiege/ReportAgent.java b/src/main/java/com/shift/undersiege/ReportAgent.java
deleted file mode 100644
index 9b7684a..0000000
--- a/src/main/java/com/shift/undersiege/ReportAgent.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package com.shift.undersiege;
-
-import java.io.IOException;
-import java.lang.instrument.Instrumentation;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.TimeUnit;
-
-import com.yammer.metrics.reporting.ConsoleReporter;
-//import com.yammer.com.shift.undersiege.reporting.GraphiteReporter;
-//
-
-public class ReportAgent
-{
- public static void premain(String agentArgs, Instrumentation inst) throws IOException
- {
- // comma separated list of
- String host;
- try {
- host = InetAddress.getLocalHost().getHostName();
- }
- catch (UnknownHostException e) {
- host = "unknown-host";
- }
-
- StatsdReporter reporter = new StatsdReporter(agentArgs, 8125, host);
- reporter.start(10, TimeUnit.SECONDS);
-
- System.out.println("STATSD STARTING sending to " + agentArgs + " with host prefix " + host);
-
- }
- public static void main(String[] args)
- {
- System.out.println("Main running.");
- try {
- Thread.sleep(60000);
- } catch (InterruptedException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- System.out.println("Done.");
- }
-}
-
diff --git a/src/main/java/com/shift/undersiege/StatsdReporter.java b/src/main/java/com/shift/undersiege/StatsdReporter.java
deleted file mode 100644
index 2b56654..0000000
--- a/src/main/java/com/shift/undersiege/StatsdReporter.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Copyright (C) 2012-2013 Sean Laurent
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- Edited by Jon Haddad at SHIFT to work with
- */
-
-package com.shift.undersiege;
-
-
-import com.timgroup.statsd.NonBlockingStatsDClient;
-import com.timgroup.statsd.StatsDClient;
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.*;
-import com.yammer.metrics.reporting.AbstractPollingReporter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.concurrent.TimeUnit;
-
-public class StatsdReporter extends AbstractPollingReporter implements MetricProcessor {
-
- private static final Logger LOG = LoggerFactory.getLogger(StatsdReporter.class);
-
- protected final VirtualMachineMetrics vm;
- private StatsDClient statsd;
- private boolean printVMMetrics = false;
- protected final Clock clock;
- private MetricPredicate predicate = MetricPredicate.ALL;
-
- private HashMap previous_run_times;
- private HashMap previous_run_counts;
-
- public StatsdReporter(String host, int port, String prefix) throws IOException {
- super(Metrics.defaultRegistry(), "statsd");
- statsd = new NonBlockingStatsDClient(prefix, host, port);
- vm = VirtualMachineMetrics.getInstance();
- clock = Clock.defaultClock();
- previous_run_times = new HashMap();
- previous_run_counts = new HashMap();
- }
-
-
- @Override
- public void run() {
- try {
- final long epoch = clock.time() / 1000;
- if (this.printVMMetrics) {
- printVmMetrics(epoch);
- }
- printRegularMetrics(epoch);
-
- // Send UDP data
-
- } catch (Exception e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Error writing to statsd", e);
- } else {
- LOG.warn("Error writing to statsd: {}", e.getMessage());
- }
- }
- }
-
- protected void printVmMetrics(long epoch) {
- // Memory
- int div = 1048576;
- statsd.gauge("jvm.memory.totalInitInMB", (int) vm.totalInit() / div);
- statsd.gauge("jvm.memory.totalUsedInMB", (int) vm.totalUsed() / div);
- statsd.gauge("jvm.memory.heapUsedInMB", (int) vm.heapUsed() / div);
-
- statsd.gauge("jvm.memory.heapUsageInMB", (int) vm.heapUsage() / div);
-
- for (Map.Entry pool : vm.memoryPoolUsage().entrySet()) {
- statsd.gauge("jvm.memory.memory_pool_usages." + pool.getKey(), pool.getValue().intValue() / div);
- }
-
- statsd.gauge("jvm.fd_usage", (int) vm.fileDescriptorUsage());
-
- for (Map.Entry entry : vm.garbageCollectors().entrySet()) {
- // we only care about the delta times for the GC time and GC runs
-
- final String name = "jvm.gc." + entry.getKey();
- String stat_name_time = name + ".timeInMS";
-
- int total_run_time = (int) entry.getValue().getTime(TimeUnit.MILLISECONDS);
- Integer previous_total_run_time = previous_run_times.get(stat_name_time);
-
- if (previous_total_run_time == null) {
- previous_total_run_time = 0;
- }
- int delta_run_time = total_run_time - previous_total_run_time;
- previous_run_times.put(stat_name_time, total_run_time);
-
- statsd.gauge(stat_name_time, delta_run_time);
- String stat_run_count = name + ".runs";
-
- int total_runs = (int) entry.getValue().getRuns();
-
- Integer previous_total_runs = previous_run_counts.get(stat_run_count);
-
- if(previous_total_runs == null) {
- previous_total_runs = 0;
- }
-
- statsd.gauge(stat_run_count, total_runs - previous_total_runs);
- previous_run_counts.put(stat_run_count, total_runs);
- }
- }
-
- protected void printRegularMetrics(long epoch) {
- printVmMetrics(epoch);
-
- Set>> entries = getMetricsRegistry().groupedMetrics(predicate).entrySet();
-
- for (Map.Entry> entry : entries) {
- for (Map.Entry subEntry : entry.getValue().entrySet()) {
-
- final Metric metric = subEntry.getValue();
-
- if (metric != null) {
- try {
- metric.processWith(this, subEntry.getKey(), epoch);
- //statsd.gauge(subEntry.getKey(), subEntry.getValue().);
- } catch (Exception ignored) {
- LOG.error("Error printing regular com.shift.undersiege:", ignored);
- }
- }
- }
- }
- }
-
- @Override
- public void processMeter(MetricName name, Metered meter, Long epoch) throws Exception {
-// System.out.printf("Printing process meter %s %d\n", name.getName(), meter.count());
-
-//
- }
-
- @Override
- public void processCounter(MetricName name, Counter counter, Long epoch) throws Exception {
- statsd.gauge(name.getName(), (int) counter.count());
- }
-
- @Override
- public void processHistogram(MetricName name, Histogram histogram, Long epoch) throws Exception {
-// System.out.printf("process histogram %s %f\n", name.getName(), histogram.mean());
- }
-
- @Override
- public void processTimer(MetricName name, Timer timer, Long context) throws Exception {
-// System.out.printf("timer %s\n", name.getName());
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public void processGauge(MetricName name, Gauge> gauge, Long context) throws Exception {
- //To change body of implemented methods use File | Settings | File Templates.
-// System.out.printf("gauge %s %s\n", name.getName(), gauge.toString());
- statsd.gauge(name.getName(), gauge.hashCode());
- }
-
-
-}
diff --git a/src/test/java/com/github/lookout/metrics/agent/test/HostPortIntervalParserTest.java b/src/test/java/com/github/lookout/metrics/agent/test/HostPortIntervalParserTest.java
new file mode 100644
index 0000000..6028765
--- /dev/null
+++ b/src/test/java/com/github/lookout/metrics/agent/test/HostPortIntervalParserTest.java
@@ -0,0 +1,69 @@
+package com.github.lookout.metrics.agent.test;
+
+import com.github.lookout.metrics.agent.HostPortInterval;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Created by rkuris on 12/19/14.
+ */
+public class HostPortIntervalParserTest {
+
+ @Test
+ public void testDefault() {
+ HostPortInterval result = new HostPortInterval(null);
+ assertEquals(result.getHost(), HostPortInterval.DEFAULT_HOST);
+ assertEquals(result.getPort(), HostPortInterval.DEFAULT_PORT);
+ assertEquals(result.getInterval(), HostPortInterval.DEFAULT_INTERVAL);
+ }
+
+ @Test
+ public void testDefaultPort() {
+ HostPortInterval result = new HostPortInterval("host");
+ assertEquals(result.getHost(), "host");
+ assertEquals(result.getPort(), HostPortInterval.DEFAULT_PORT);
+ assertEquals(result.getInterval(), HostPortInterval.DEFAULT_INTERVAL);
+ }
+
+ @Test
+ public void testIPV4() {
+ HostPortInterval result = new HostPortInterval("1.2.3.4:56");
+ assertEquals(result.getHost(), "1.2.3.4");
+ assertEquals(result.getPort(), 56);
+ assertEquals(result.getInterval(), HostPortInterval.DEFAULT_INTERVAL);
+ }
+
+ @Test
+ public void testIPV6DefaultPort() {
+ HostPortInterval result = new HostPortInterval("[::1]");
+ assertEquals(result.getHost(), "::1");
+ assertEquals(result.getPort(), HostPortInterval.DEFAULT_PORT);
+ assertEquals(result.getInterval(), HostPortInterval.DEFAULT_INTERVAL);
+ }
+
+ @Test
+ public void testIPV6() {
+ HostPortInterval result = new HostPortInterval("[::1]:9123");
+ assertEquals(result.getHost(), "::1");
+ assertEquals(result.getPort(), 9123);
+ assertEquals(result.getInterval(), HostPortInterval.DEFAULT_INTERVAL);
+ }
+
+ @Test
+ public void testIntervalDefaultPort() {
+ HostPortInterval result = new HostPortInterval("1.2.3.4@1");
+ assertEquals(result.getHost(), "1.2.3.4");
+ assertEquals(result.getPort(), HostPortInterval.DEFAULT_PORT);
+ assertEquals(result.getInterval(), 1);
+ }
+
+ @Test
+ public void testWholeEnchilada() {
+ HostPortInterval result = new HostPortInterval("[2001:db8::1]:210@321");
+ assertEquals(result.getHost(), "2001:db8::1");
+ assertEquals(result.getPort(), 210);
+ assertEquals(result.getInterval(), 321);
+ assertEquals(result.toString(), "host 2001:db8::1 port 210 every 321 seconds");
+ }
+}
diff --git a/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java b/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java
new file mode 100644
index 0000000..8635d24
--- /dev/null
+++ b/src/test/java/com/github/lookout/metrics/agent/test/StatsTest.java
@@ -0,0 +1,90 @@
+package com.github.lookout.metrics.agent.test;
+
+import com.github.lookout.metrics.agent.HostPortInterval;
+import com.github.lookout.metrics.agent.StatsdReporter;
+import com.timgroup.statsd.StatsDClient;
+import org.mockito.*;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import javax.management.*;
+import java.lang.management.ManagementFactory;
+
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+
+/**
+ * Created by rkuris on 12/23/14.
+ */
+public class StatsTest {
+
+ @Mock
+ public StatsDClient client;
+
+ @Mock
+ public DynamicMBean mockMBean;
+
+ private final HostPortInterval hostPortInterval = new HostPortInterval("localhost");
+
+ private StatsdReporter sut;
+
+ @BeforeMethod
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ public void testStatsNotZero() {
+ sut = new StatsdReporter(hostPortInterval, client);
+ byte[] bigMemoryChunk = new byte[102400000]; // 100mb should get us to 1% of heaps up to 10gb
+ bigMemoryChunk[4444] = 1;
+ sut.run();
+ verify(client).gauge(Matchers.eq("jvm.memory.totalInitInMB"), AdditionalMatchers.gt(0L));
+ verify(client).gauge(Matchers.eq("jvm.memory.totalUsedInMB"), AdditionalMatchers.gt(0L));
+ verify(client).gauge(Matchers.eq("jvm.memory.heapUsedInMB"), AdditionalMatchers.gt(0L));
+ verify(client).gauge(Matchers.eq("jvm.memory.heapUsagePercent"), AdditionalMatchers.gt(0L));
+ verify(client, atLeastOnce()).gauge(Matchers.matches("jvm.memory\\.memory_pool_usages\\..*Percent"), AdditionalMatchers.and(AdditionalMatchers.geq(0L), AdditionalMatchers.lt(100L)));
+ verify(client).gauge(Matchers.eq("jvm.fdUsagePercent"), AdditionalMatchers.geq(0L)); // gt failing on CI
+ verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), Matchers.anyLong());
+ verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.runs"), Matchers.anyLong());
+ assertEquals(bigMemoryChunk[4444], 1);
+
+ verifyNoMoreInteractions(client);
+ }
+
+ @Test
+ public void testGCCounts() {
+ sut = new StatsdReporter(hostPortInterval, client);
+ sut.run();
+ ArgumentCaptor countCaptor1 = ArgumentCaptor.forClass(Long.class);
+ verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), Matchers.anyLong());
+ verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.runs"), countCaptor1.capture());
+ reset(client);
+ System.gc();
+ sut.run();
+ ArgumentCaptor countCaptor2 = ArgumentCaptor.forClass(Long.class);
+ verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.timeInMS"), AdditionalMatchers.gt(0L));
+ verify(client, atLeastOnce()).gauge(Matchers.matches("jvm\\.gc\\..*\\.runs"), countCaptor2.capture());
+ assertNotEquals(countCaptor1.getAllValues(), countCaptor2.getAllValues());
+ }
+ @AfterClass
+ public void testMBean() throws MalformedObjectNameException, NotCompliantMBeanException, InstanceAlreadyExistsException, MBeanException, ReflectionException, AttributeNotFoundException {
+ final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+ when(mockMBean.getMBeanInfo()).thenReturn(new MBeanInfo("test", "", new MBeanAttributeInfo[]{ new MBeanAttributeInfo("ReadCount", "Long", "Read count", true, false, false)}, new MBeanConstructorInfo[]{}, new MBeanOperationInfo[]{
+
+ }, new MBeanNotificationInfo[]{}));
+ when(mockMBean.invoke(Matchers.eq("estimateKeys"), Matchers.any(Object[].class), Matchers.any(String[].class))).thenReturn(1L);
+ when(mockMBean.getAttribute(Matchers.eq("ReadCount"))).thenReturn(88888L);
+ reset(client);
+ mBeanServer.registerMBean(mockMBean, new ObjectName("org.apache.cassandra.db:columnfamily=testcf,keyspace=testks,type=ColumnFamilies"));
+ sut = new StatsdReporter(hostPortInterval, client);
+ sut.run();
+ verify(client,atLeastOnce()).gauge(Matchers.matches("jvm\\..*"), Matchers.anyLong());
+ verify(client,times(1)).gauge(Matchers.eq("cfstats.testks.testcf.estimatedKeys"), Matchers.eq(1L));
+ verify(client,times(1)).gauge(Matchers.eq("cfstats.testks.testcf.ReadCount"), Matchers.eq(88888L));
+ verifyNoMoreInteractions(client);
+ }
+}