From ae09792fd2976a1457e13f79ac3945b2bccbc2ec Mon Sep 17 00:00:00 2001 From: zhaolong Date: Mon, 24 Jun 2019 14:08:32 +0200 Subject: [PATCH 1/2] add instance type tag make autothrottle configurable --- cmd/autothrottle/api.go | 2 +- cmd/autothrottle/events.go | 2 +- cmd/autothrottle/helpers.go | 2 +- cmd/autothrottle/limits.go | 2 +- cmd/autothrottle/limits_test.go | 2 +- cmd/autothrottle/main.go | 19 +++++++++------- cmd/autothrottle/throttles.go | 6 ++--- cmd/autothrottle/throttles_test.go | 4 ++-- kafkametrics/datadog/datadog.go | 34 ++++++++++++++++------------ kafkametrics/datadog/datadog_test.go | 6 ++--- kafkametrics/datadog/helpers.go | 11 ++++----- 11 files changed, 47 insertions(+), 43 deletions(-) diff --git a/cmd/autothrottle/api.go b/cmd/autothrottle/api.go index 5f7be56a..7c16bfb2 100644 --- a/cmd/autothrottle/api.go +++ b/cmd/autothrottle/api.go @@ -7,7 +7,7 @@ import ( "net/http" "strconv" - "github.com/DataDog/kafka-kit/kafkazk" + "github.com/mrmuggymuggy/kafka-kit/kafkazk" ) // APIConfig holds configuration diff --git a/cmd/autothrottle/events.go b/cmd/autothrottle/events.go index ac1a3e1e..ece3a52b 100644 --- a/cmd/autothrottle/events.go +++ b/cmd/autothrottle/events.go @@ -4,7 +4,7 @@ import ( "fmt" "log" - "github.com/DataDog/kafka-kit/kafkametrics" + "github.com/mrmuggymuggy/kafka-kit/kafkametrics" ) // Events configs. 
diff --git a/cmd/autothrottle/helpers.go b/cmd/autothrottle/helpers.go index 30156647..2e904ce2 100644 --- a/cmd/autothrottle/helpers.go +++ b/cmd/autothrottle/helpers.go @@ -4,7 +4,7 @@ import ( "bytes" "sort" - "github.com/DataDog/kafka-kit/kafkametrics" + "github.com/mrmuggymuggy/kafka-kit/kafkametrics" ) // bmapBundle holds several maps diff --git a/cmd/autothrottle/limits.go b/cmd/autothrottle/limits.go index 1db7434f..17d5b3fa 100644 --- a/cmd/autothrottle/limits.go +++ b/cmd/autothrottle/limits.go @@ -4,7 +4,7 @@ import ( "errors" "math" - "github.com/DataDog/kafka-kit/kafkametrics" + "github.com/mrmuggymuggy/kafka-kit/kafkametrics" ) // Limits is a map of instance-type diff --git a/cmd/autothrottle/limits_test.go b/cmd/autothrottle/limits_test.go index 94093133..97086d58 100644 --- a/cmd/autothrottle/limits_test.go +++ b/cmd/autothrottle/limits_test.go @@ -3,7 +3,7 @@ package main import ( "testing" - "github.com/DataDog/kafka-kit/kafkametrics" + "github.com/mrmuggymuggy/kafka-kit/kafkametrics" ) func TestNewLimits(t *testing.T) { diff --git a/cmd/autothrottle/main.go b/cmd/autothrottle/main.go index 7399e807..3ebca72a 100644 --- a/cmd/autothrottle/main.go +++ b/cmd/autothrottle/main.go @@ -10,9 +10,9 @@ import ( "strings" "time" - "github.com/DataDog/kafka-kit/kafkametrics" - "github.com/DataDog/kafka-kit/kafkametrics/datadog" - "github.com/DataDog/kafka-kit/kafkazk" + "github.com/mrmuggymuggy/kafka-kit/kafkametrics" + "github.com/mrmuggymuggy/kafka-kit/kafkametrics/datadog" + "github.com/mrmuggymuggy/kafka-kit/kafkazk" "github.com/jamiealquiza/envy" ) @@ -25,6 +25,7 @@ var ( AppKey string NetworkTXQuery string BrokerIDTag string + InstanceTypeTag string MetricsWindow int ZKAddr string ZKPrefix string @@ -51,6 +52,7 @@ func init() { flag.StringVar(&Config.AppKey, "app-key", "", "Datadog app key") flag.StringVar(&Config.NetworkTXQuery, "net-tx-query", "avg:system.net.bytes_sent{service:kafka} by {host}", "Datadog query for broker outbound bandwidth by 
host") flag.StringVar(&Config.BrokerIDTag, "broker-id-tag", "broker_id", "Datadog host tag for broker ID") + flag.StringVar(&Config.InstanceTypeTag, "instance-type-tag", "instance-type", "Datadog instance-type tag for broker") flag.IntVar(&Config.MetricsWindow, "metrics-window", 120, "Time span of metrics required (seconds)") flag.StringVar(&Config.ZKAddr, "zk-addr", "localhost:2181", "ZooKeeper connect string (for broker metadata or rebuild-topic lookups)") flag.StringVar(&Config.ZKPrefix, "zk-prefix", "", "ZooKeeper namespace prefix") @@ -106,11 +108,12 @@ func main() { // Init a Kafka metrics fetcher. km, err := datadog.NewHandler(&datadog.Config{ - APIKey: Config.APIKey, - AppKey: Config.AppKey, - NetworkTXQuery: Config.NetworkTXQuery, - BrokerIDTag: Config.BrokerIDTag, - MetricsWindow: Config.MetricsWindow, + APIKey: Config.APIKey, + AppKey: Config.AppKey, + NetworkTXQuery: Config.NetworkTXQuery, + BrokerIDTag: Config.BrokerIDTag, + InstanceTypeTag: Config.InstanceTypeTag, + MetricsWindow: Config.MetricsWindow, }) if err != nil { log.Fatal(err) diff --git a/cmd/autothrottle/throttles.go b/cmd/autothrottle/throttles.go index 2f829bcf..1861fcba 100644 --- a/cmd/autothrottle/throttles.go +++ b/cmd/autothrottle/throttles.go @@ -9,8 +9,8 @@ import ( "strconv" "time" - "github.com/DataDog/kafka-kit/kafkametrics" - "github.com/DataDog/kafka-kit/kafkazk" + "github.com/mrmuggymuggy/kafka-kit/kafkametrics" + "github.com/mrmuggymuggy/kafka-kit/kafkazk" ) // ReplicationThrottleMeta holds all types @@ -127,7 +127,6 @@ func updateReplicationThrottle(params *ReplicationThrottleMeta) error { inFailureMode = true } } - // If we cannot proceed normally due to missing/partial // metrics data, check what failure iteration we're in. // If we're above the threshold, revert to the minimum @@ -290,7 +289,6 @@ func repCapacityByMetrics(rtm *ReplicationThrottleMeta, bmb bmapBundle, bm kafka participatingBrokers := &ReassigningBrokers{} var event string - // Source brokers. 
for b := range bmb.src { if broker, exists := bm[b]; exists { diff --git a/cmd/autothrottle/throttles_test.go b/cmd/autothrottle/throttles_test.go index f1aa1576..236c0c8d 100644 --- a/cmd/autothrottle/throttles_test.go +++ b/cmd/autothrottle/throttles_test.go @@ -5,8 +5,8 @@ import ( "sort" "testing" - "github.com/DataDog/kafka-kit/kafkametrics" - "github.com/DataDog/kafka-kit/kafkazk" + "github.com/mrmuggymuggy/kafka-kit/kafkametrics" + "github.com/mrmuggymuggy/kafka-kit/kafkazk" ) func TestHighestSrcNetTX(t *testing.T) { diff --git a/kafkametrics/datadog/datadog.go b/kafkametrics/datadog/datadog.go index 95ed21d8..390c07e1 100644 --- a/kafkametrics/datadog/datadog.go +++ b/kafkametrics/datadog/datadog.go @@ -7,7 +7,7 @@ import ( "regexp" "time" - "github.com/DataDog/kafka-kit/kafkametrics" + "github.com/mrmuggymuggy/kafka-kit/kafkametrics" dd "github.com/zorkian/go-datadog-api" ) @@ -27,6 +27,9 @@ type Config struct { // BrokerIDTag is the host tag name // for Kafka broker IDs. BrokerIDTag string + // InstanceTypeTag is the instance type tag name + // for Kafka broker Instance. + InstanceTypeTag string // MetricsWindow specifies the window size of // timeseries data to evaluate in seconds. // All values for the window are averaged. 
@@ -34,13 +37,14 @@ type Config struct { } type ddHandler struct { - c *dd.Client - netTXQuery string - brokerIDTag string - metricsWindow int - tagCache map[string][]string - keysRegex *regexp.Regexp - redactionSub []byte + c *dd.Client + netTXQuery string + brokerIDTag string + InstanceTypeTag string + metricsWindow int + tagCache map[string][]string + keysRegex *regexp.Regexp + redactionSub []byte } // NewHandler takes a *Config and @@ -57,12 +61,13 @@ func NewHandler(c *Config) (kafkametrics.Handler, error) { keysRegex := regexp.MustCompile(fmt.Sprintf("%s|%s", c.APIKey, c.AppKey)) h := &ddHandler{ - netTXQuery: createNetTXQuery(c), - metricsWindow: c.MetricsWindow, - brokerIDTag: c.BrokerIDTag, - tagCache: make(map[string][]string), - keysRegex: keysRegex, - redactionSub: []byte("xxx"), + netTXQuery: createNetTXQuery(c), + metricsWindow: c.MetricsWindow, + brokerIDTag: c.BrokerIDTag, + InstanceTypeTag: c.InstanceTypeTag, + tagCache: make(map[string][]string), + keysRegex: keysRegex, + redactionSub: []byte("xxx"), } client := dd.NewClient(c.APIKey, c.AppKey) @@ -132,7 +137,6 @@ func (h *ddHandler) GetMetrics() (kafkametrics.BrokerMetrics, []error) { if errs != nil { errors = append(errors, errs...) } - // The []*kafkametrics.Broker only contains hostnames // and the network tx metric. Fetch the rest // of the required metadata and construct diff --git a/kafkametrics/datadog/datadog_test.go b/kafkametrics/datadog/datadog_test.go index d5e1637d..d31d2a45 100644 --- a/kafkametrics/datadog/datadog_test.go +++ b/kafkametrics/datadog/datadog_test.go @@ -6,7 +6,7 @@ import ( "fmt" "testing" - "github.com/DataDog/kafka-kit/kafkametrics" + "github.com/mrmuggymuggy/kafka-kit/kafkametrics" dd "github.com/zorkian/go-datadog-api" ) @@ -98,7 +98,7 @@ func TestPopulateFromTagMap(t *testing.T) { // Test with complete input. 
tagMap := mockTagMap() - err := populateFromTagMap(b, map[string][]string{}, tagMap, "broker_id") + err := populateFromTagMap(b, map[string][]string{}, tagMap, "broker_id", "instance-type") if err != nil { t.Errorf("Unexpected error: %s\n", err) } @@ -119,7 +119,7 @@ func TestPopulateFromTagMap(t *testing.T) { // Test with incomplete input. tagMap[rndBroker] = tagMap[rndBroker][1:] - err = populateFromTagMap(b, map[string][]string{}, tagMap, "broker_id") + err = populateFromTagMap(b, map[string][]string{}, tagMap, "broker_id", "instance-type") if err == nil { t.Errorf("Expected error, got nil") } diff --git a/kafkametrics/datadog/helpers.go b/kafkametrics/datadog/helpers.go index b3e1edc5..10687004 100644 --- a/kafkametrics/datadog/helpers.go +++ b/kafkametrics/datadog/helpers.go @@ -6,7 +6,7 @@ import ( "strconv" "strings" - "github.com/DataDog/kafka-kit/kafkametrics" + "github.com/mrmuggymuggy/kafka-kit/kafkametrics" dd "github.com/zorkian/go-datadog-api" ) @@ -65,11 +65,10 @@ func (h *ddHandler) brokerMetricsFromList(l []*kafkametrics.Broker) (kafkametric } brokers := kafkametrics.BrokerMetrics{} - errs = populateFromTagMap(brokers, h.tagCache, tags, h.brokerIDTag) + errs = populateFromTagMap(brokers, h.tagCache, tags, h.brokerIDTag, h.InstanceTypeTag) if errs != nil { errs = append(errors, errs...) } - return brokers, errors } @@ -103,7 +102,6 @@ func (h *ddHandler) getHostTagMap(l []*kafkametrics.Broker) (map[*kafkametrics.B brokers[b] = ht } } - return brokers, errors } @@ -112,7 +110,7 @@ func (h *ddHandler) getHostTagMap(l []*kafkametrics.Broker) (map[*kafkametrics.B // to []string unparsed host tag key:value pairs, and a broker ID tag key // populates the kafkametrics.BrokerMetrics with tags of interest. // An error describing any missing tags is returned. 
-func populateFromTagMap(bm kafkametrics.BrokerMetrics, c map[string][]string, t map[*kafkametrics.Broker][]string, btag string) []error { +func populateFromTagMap(bm kafkametrics.BrokerMetrics, c map[string][]string, t map[*kafkametrics.Broker][]string, btag string, itag string) []error { var missingTags bytes.Buffer for b, ht := range t { @@ -133,7 +131,7 @@ func populateFromTagMap(bm kafkametrics.BrokerMetrics, c map[string][]string, t } // Get instance type. - it = valFromTags(ht, "instance-type") + it = valFromTags(ht, itag) if it != "" { // Cache this broker's tags. In case additional tags are populated // in the future, we should only cache brokers that have @@ -153,6 +151,7 @@ func populateFromTagMap(bm kafkametrics.BrokerMetrics, c map[string][]string, t b.ID = id b.InstanceType = it bm[id] = b + } if missingTags.String() != "" { From ea78c3362bb70615c964ba45bc2c33bd1b06d40e Mon Sep 17 00:00:00 2001 From: zhaolong Date: Mon, 24 Jun 2019 17:13:06 +0200 Subject: [PATCH 2/2] add instance type tag on autothrottle configuration --- cmd/autothrottle/api.go | 2 +- cmd/autothrottle/events.go | 2 +- cmd/autothrottle/helpers.go | 2 +- cmd/autothrottle/limits.go | 2 +- cmd/autothrottle/limits_test.go | 2 +- cmd/autothrottle/main.go | 6 +++--- cmd/autothrottle/throttles.go | 6 ++++-- cmd/autothrottle/throttles_test.go | 4 ++-- kafkametrics/datadog/datadog.go | 7 ++++--- kafkametrics/datadog/datadog_test.go | 2 +- kafkametrics/datadog/helpers.go | 6 ++++-- 11 files changed, 23 insertions(+), 18 deletions(-) diff --git a/cmd/autothrottle/api.go b/cmd/autothrottle/api.go index 7c16bfb2..5f7be56a 100644 --- a/cmd/autothrottle/api.go +++ b/cmd/autothrottle/api.go @@ -7,7 +7,7 @@ import ( "net/http" "strconv" - "github.com/mrmuggymuggy/kafka-kit/kafkazk" + "github.com/DataDog/kafka-kit/kafkazk" ) // APIConfig holds configuration diff --git a/cmd/autothrottle/events.go b/cmd/autothrottle/events.go index ece3a52b..ac1a3e1e 100644 --- a/cmd/autothrottle/events.go +++ 
b/cmd/autothrottle/events.go @@ -4,7 +4,7 @@ import ( "fmt" "log" - "github.com/mrmuggymuggy/kafka-kit/kafkametrics" + "github.com/DataDog/kafka-kit/kafkametrics" ) // Events configs. diff --git a/cmd/autothrottle/helpers.go b/cmd/autothrottle/helpers.go index 2e904ce2..30156647 100644 --- a/cmd/autothrottle/helpers.go +++ b/cmd/autothrottle/helpers.go @@ -4,7 +4,7 @@ import ( "bytes" "sort" - "github.com/mrmuggymuggy/kafka-kit/kafkametrics" + "github.com/DataDog/kafka-kit/kafkametrics" ) // bmapBundle holds several maps diff --git a/cmd/autothrottle/limits.go b/cmd/autothrottle/limits.go index 17d5b3fa..1db7434f 100644 --- a/cmd/autothrottle/limits.go +++ b/cmd/autothrottle/limits.go @@ -4,7 +4,7 @@ import ( "errors" "math" - "github.com/mrmuggymuggy/kafka-kit/kafkametrics" + "github.com/DataDog/kafka-kit/kafkametrics" ) // Limits is a map of instance-type diff --git a/cmd/autothrottle/limits_test.go b/cmd/autothrottle/limits_test.go index 97086d58..94093133 100644 --- a/cmd/autothrottle/limits_test.go +++ b/cmd/autothrottle/limits_test.go @@ -3,7 +3,7 @@ package main import ( "testing" - "github.com/mrmuggymuggy/kafka-kit/kafkametrics" + "github.com/DataDog/kafka-kit/kafkametrics" ) func TestNewLimits(t *testing.T) { diff --git a/cmd/autothrottle/main.go b/cmd/autothrottle/main.go index 3ebca72a..bd5e28e2 100644 --- a/cmd/autothrottle/main.go +++ b/cmd/autothrottle/main.go @@ -10,9 +10,9 @@ import ( "strings" "time" - "github.com/mrmuggymuggy/kafka-kit/kafkametrics" - "github.com/mrmuggymuggy/kafka-kit/kafkametrics/datadog" - "github.com/mrmuggymuggy/kafka-kit/kafkazk" + "github.com/DataDog/kafka-kit/kafkametrics" + "github.com/DataDog/kafka-kit/kafkametrics/datadog" + "github.com/DataDog/kafka-kit/kafkazk" "github.com/jamiealquiza/envy" ) diff --git a/cmd/autothrottle/throttles.go b/cmd/autothrottle/throttles.go index 1861fcba..2f829bcf 100644 --- a/cmd/autothrottle/throttles.go +++ b/cmd/autothrottle/throttles.go @@ -9,8 +9,8 @@ import ( "strconv" "time" - 
"github.com/mrmuggymuggy/kafka-kit/kafkametrics" - "github.com/mrmuggymuggy/kafka-kit/kafkazk" + "github.com/DataDog/kafka-kit/kafkametrics" + "github.com/DataDog/kafka-kit/kafkazk" ) // ReplicationThrottleMeta holds all types @@ -127,6 +127,7 @@ func updateReplicationThrottle(params *ReplicationThrottleMeta) error { inFailureMode = true } } + // If we cannot proceed normally due to missing/partial // metrics data, check what failure iteration we're in. // If we're above the threshold, revert to the minimum @@ -289,6 +290,7 @@ func repCapacityByMetrics(rtm *ReplicationThrottleMeta, bmb bmapBundle, bm kafka participatingBrokers := &ReassigningBrokers{} var event string + // Source brokers. for b := range bmb.src { if broker, exists := bm[b]; exists { diff --git a/cmd/autothrottle/throttles_test.go b/cmd/autothrottle/throttles_test.go index 236c0c8d..f1aa1576 100644 --- a/cmd/autothrottle/throttles_test.go +++ b/cmd/autothrottle/throttles_test.go @@ -5,8 +5,8 @@ import ( "sort" "testing" - "github.com/mrmuggymuggy/kafka-kit/kafkametrics" - "github.com/mrmuggymuggy/kafka-kit/kafkazk" + "github.com/DataDog/kafka-kit/kafkametrics" + "github.com/DataDog/kafka-kit/kafkazk" ) func TestHighestSrcNetTX(t *testing.T) { diff --git a/kafkametrics/datadog/datadog.go b/kafkametrics/datadog/datadog.go index 390c07e1..002c0b57 100644 --- a/kafkametrics/datadog/datadog.go +++ b/kafkametrics/datadog/datadog.go @@ -7,7 +7,7 @@ import ( "regexp" "time" - "github.com/mrmuggymuggy/kafka-kit/kafkametrics" + "github.com/DataDog/kafka-kit/kafkametrics" dd "github.com/zorkian/go-datadog-api" ) @@ -40,7 +40,7 @@ type ddHandler struct { c *dd.Client netTXQuery string brokerIDTag string - InstanceTypeTag string + instanceTypeTag string metricsWindow int tagCache map[string][]string keysRegex *regexp.Regexp @@ -64,7 +64,7 @@ func NewHandler(c *Config) (kafkametrics.Handler, error) { netTXQuery: createNetTXQuery(c), metricsWindow: c.MetricsWindow, brokerIDTag: c.BrokerIDTag, - InstanceTypeTag: 
c.InstanceTypeTag, + instanceTypeTag: c.InstanceTypeTag, tagCache: make(map[string][]string), keysRegex: keysRegex, redactionSub: []byte("xxx"), @@ -137,6 +137,7 @@ func (h *ddHandler) GetMetrics() (kafkametrics.BrokerMetrics, []error) { if errs != nil { errors = append(errors, errs...) } + // The []*kafkametrics.Broker only contains hostnames // and the network tx metric. Fetch the rest // of the required metadata and construct diff --git a/kafkametrics/datadog/datadog_test.go b/kafkametrics/datadog/datadog_test.go index d31d2a45..bf0921b4 100644 --- a/kafkametrics/datadog/datadog_test.go +++ b/kafkametrics/datadog/datadog_test.go @@ -6,7 +6,7 @@ import ( "fmt" "testing" - "github.com/mrmuggymuggy/kafka-kit/kafkametrics" + "github.com/DataDog/kafka-kit/kafkametrics" dd "github.com/zorkian/go-datadog-api" ) diff --git a/kafkametrics/datadog/helpers.go b/kafkametrics/datadog/helpers.go index 10687004..64159d15 100644 --- a/kafkametrics/datadog/helpers.go +++ b/kafkametrics/datadog/helpers.go @@ -6,7 +6,7 @@ import ( "strconv" "strings" - "github.com/mrmuggymuggy/kafka-kit/kafkametrics" + "github.com/DataDog/kafka-kit/kafkametrics" dd "github.com/zorkian/go-datadog-api" ) @@ -65,10 +65,11 @@ func (h *ddHandler) brokerMetricsFromList(l []*kafkametrics.Broker) (kafkametric } brokers := kafkametrics.BrokerMetrics{} - errs = populateFromTagMap(brokers, h.tagCache, tags, h.brokerIDTag, h.InstanceTypeTag) + errs = populateFromTagMap(brokers, h.tagCache, tags, h.brokerIDTag, h.instanceTypeTag) if errs != nil { errs = append(errors, errs...) } + return brokers, errors } @@ -102,6 +103,7 @@ func (h *ddHandler) getHostTagMap(l []*kafkametrics.Broker) (map[*kafkametrics.B brokers[b] = ht } } + return brokers, errors }