From 8bd73043cd1a92ab33af5183d387e62104bf1dd7 Mon Sep 17 00:00:00 2001 From: Joe Adams Date: Tue, 17 Jun 2025 23:01:02 -0400 Subject: [PATCH 1/2] Create a new cluster info retriever This is an alternative implementation of the previous retriever in pkg/clusterinfo. The previous implementation required registering collectors and receiving updates over a channel. This InfoProvider instead provides a public method to get the cluster info with a cache to reduce network calls. The Collector.Update signature was also updated to take an UpdateContext. This UpdateContext provides an easy way to extend the context available during the Update calls on Collectors. Signed-off-by: Joe Adams --- cluster/provider.go | 117 ++++++++++++++++++++++++++++++++++ cluster/provider_test.go | 107 +++++++++++++++++++++++++++++++ collector/cluster_info.go | 2 +- collector/cluster_settings.go | 2 +- collector/collector.go | 22 +++++-- collector/collector_test.go | 9 ++- collector/data_stream.go | 2 +- collector/health_report.go | 2 +- collector/ilm.go | 2 +- collector/slm.go | 2 +- collector/snapshots.go | 2 +- collector/tasks.go | 2 +- collector/update_context.go | 45 +++++++++++++ main.go | 5 ++ 14 files changed, 308 insertions(+), 13 deletions(-) create mode 100644 cluster/provider.go create mode 100644 cluster/provider_test.go create mode 100644 collector/update_context.go diff --git a/cluster/provider.go b/cluster/provider.go new file mode 100644 index 00000000..a3e0d8f3 --- /dev/null +++ b/cluster/provider.go @@ -0,0 +1,117 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "net/url" + "path" + "sync" + "time" +) + +type Info struct { + ClusterName string `json:"cluster_name"` +} + +type InfoProvider struct { + logger *slog.Logger + client *http.Client + url *url.URL + interval time.Duration + lastClusterInfo Info + lastError error + cachedAt time.Time // Time when the last cluster info was fetched + mu sync.RWMutex // Protects lastClusterInfo, lastError, and cachedAt +} + +// NewInfoProvider creates a new InfoProvider. +func NewInfoProvider(logger *slog.Logger, client *http.Client, u *url.URL, interval time.Duration) *InfoProvider { + return &InfoProvider{ + logger: logger, + client: client, + url: u, + interval: interval, + } +} + +func (i *InfoProvider) GetInfo(ctx context.Context) (Info, error) { + i.mu.RLock() + info := i.lastClusterInfo + err := i.lastError + cachedAt := i.cachedAt + + i.mu.RUnlock() + + // If the cached info is recent enough, return it. + if !cachedAt.IsZero() && time.Since(cachedAt) < i.interval { + + if err != nil { + return Info{}, err + } + + if info.ClusterName != "" { + return info, nil + } + } + + i.mu.Lock() + defer i.mu.Unlock() + + // If we reach here, we need to fetch the cluster info. The cache is either empty or stale. 
+ u := *i.url + u.Path = path.Join(u.Path, "/") + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + if err != nil { + return Info{}, err + } + + resp, err := i.client.Do(req) + if err != nil { + i.logger.Error("failed to fetch cluster info", "err", err) + return Info{}, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("unexpected status code: %d", resp.StatusCode) + i.lastError = err + return Info{}, err + } + + var infoResponse Info + body, err := io.ReadAll(resp.Body) + if err != nil { + i.lastError = err + return Info{}, err + } + + if err := json.Unmarshal(body, &infoResponse); err != nil { + i.lastError = err + return Info{}, fmt.Errorf("failed to unmarshal cluster info: %w", err) + } + + info = Info{ClusterName: infoResponse.ClusterName} + i.lastClusterInfo = info + i.lastError = nil + i.cachedAt = time.Now() + + return info, nil +} diff --git a/cluster/provider_test.go b/cluster/provider_test.go new file mode 100644 index 00000000..ce6ffa19 --- /dev/null +++ b/cluster/provider_test.go @@ -0,0 +1,107 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package cluster + +import ( + "context" + "net/http" + "net/http/httptest" + "net/url" + "os" + "reflect" + "testing" + "time" + + "github.com/prometheus/common/promslog" +) + +func TestInfoProvider_GetInfo(t *testing.T) { + timesURLCalled := 0 + expectedInfo := Info{ + ClusterName: "test-cluster-1", + } + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + http.NotFound(w, r) + return + } + w.Header().Set("Content-Type", "application/json") + timesURLCalled++ + _, _ = w.Write([]byte(`{ + "name": "test-node-abcd", + "cluster_name": "test-cluster-1", + "cluster_uuid": "r1bT9sBrR7S9-CamE41Qqg", + "version": { + "number": "5.6.9", + "build_hash": "877a590", + "build_date": "2018-04-12T16:25:14.838Z", + "build_snapshot": false, + "lucene_version": "6.6.1" + } + }`)) + })) + tsURL, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("failed to parse test server URL: %v", err) + } + defer ts.Close() + + i := NewInfoProvider(promslog.New(&promslog.Config{Writer: os.Stdout}), http.DefaultClient, tsURL, time.Second) + + if timesURLCalled != 0 { + t.Errorf("expected no initial URL calls, got %d", timesURLCalled) + } + + got, err := i.GetInfo(context.Background()) + if err != nil { + t.Errorf("InfoProvider.GetInfo() error = %v, wantErr %v", err, false) + return + } + + if !reflect.DeepEqual(got, expectedInfo) { + t.Errorf("InfoProvider.GetInfo() = %v, want %v", got, expectedInfo) + } + + if timesURLCalled != 1 { + t.Errorf("expected URL to be called once, got %d", timesURLCalled) + } + + // Call again to ensure cached value is returned + got, err = i.GetInfo(context.Background()) + if err != nil { + t.Errorf("InfoProvider.GetInfo() error on second call = %v, wantErr %v", err, false) + return + } + if !reflect.DeepEqual(got, expectedInfo) { + t.Errorf("InfoProvider.GetInfo() on second call = %v, want %v", got, expectedInfo) + } + if timesURLCalled != 1 { + t.Errorf("expected URL to be called only once, got 
%d", timesURLCalled) + } + + // Call again after delay to ensure we refresh the cache + time.Sleep(2 * time.Second) + got, err = i.GetInfo(context.Background()) + if err != nil { + t.Errorf("InfoProvider.GetInfo() error on second call = %v, wantErr %v", err, false) + return + } + if !reflect.DeepEqual(got, expectedInfo) { + t.Errorf("InfoProvider.GetInfo() on second call = %v, want %v", got, expectedInfo) + } + if timesURLCalled != 2 { + t.Errorf("expected URL to be called only once, got %d", timesURLCalled) + } + +} diff --git a/collector/cluster_info.go b/collector/cluster_info.go index 6e489a84..7117dba2 100644 --- a/collector/cluster_info.go +++ b/collector/cluster_info.go @@ -77,7 +77,7 @@ type VersionInfo struct { LuceneVersion semver.Version `json:"lucene_version"` } -func (c *ClusterInfoCollector) Update(_ context.Context, ch chan<- prometheus.Metric) error { +func (c *ClusterInfoCollector) Update(_ context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { resp, err := c.hc.Get(c.u.String()) if err != nil { return err diff --git a/collector/cluster_settings.go b/collector/cluster_settings.go index 10e7a180..b919b058 100644 --- a/collector/cluster_settings.go +++ b/collector/cluster_settings.go @@ -145,7 +145,7 @@ type clusterSettingsWatermark struct { Low string `json:"low"` } -func (c *ClusterSettingsCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error { +func (c *ClusterSettingsCollector) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { u := c.u.ResolveReference(&url.URL{Path: "_cluster/settings"}) q := u.Query() q.Set("include_defaults", "true") diff --git a/collector/collector.go b/collector/collector.go index 86e4d70c..acdb1c23 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -25,6 +25,7 @@ import ( "time" "github.com/alecthomas/kingpin/v2" + "github.com/prometheus-community/elasticsearch_exporter/cluster" "github.com/prometheus/client_golang/prometheus" ) @@ -64,7 
+65,7 @@ var ( // Collector is the interface a collector has to implement. type Collector interface { // Get new metrics and expose them via prometheus registry. - Update(context.Context, chan<- prometheus.Metric) error + Update(context.Context, UpdateContext, chan<- prometheus.Metric) error } func registerCollector(name string, isDefaultEnabled bool, createFunc factoryFunc) { @@ -92,6 +93,7 @@ type ElasticsearchCollector struct { logger *slog.Logger esURL *url.URL httpClient *http.Client + cluserInfo *cluster.InfoProvider } type Option func(*ElasticsearchCollector) error @@ -106,6 +108,10 @@ func NewElasticsearchCollector(logger *slog.Logger, filters []string, options .. } } + if e.cluserInfo == nil { + return nil, fmt.Errorf("cluster info provider is not set") + } + f := make(map[string]bool) for _, filter := range filters { enabled, exist := collectorState[filter] @@ -155,6 +161,13 @@ func WithHTTPClient(hc *http.Client) Option { } } +func WithClusterInfoProvider(cl *cluster.InfoProvider) Option { + return func(e *ElasticsearchCollector) error { + e.cluserInfo = cl + return nil + } +} + // Describe implements the prometheus.Collector interface. func (e ElasticsearchCollector) Describe(ch chan<- *prometheus.Desc) { ch <- scrapeDurationDesc @@ -163,21 +176,22 @@ func (e ElasticsearchCollector) Describe(ch chan<- *prometheus.Desc) { // Collect implements the prometheus.Collector interface. 
func (e ElasticsearchCollector) Collect(ch chan<- prometheus.Metric) { + uc := NewDefaultUpdateContext(e.cluserInfo) wg := sync.WaitGroup{} ctx := context.TODO() wg.Add(len(e.Collectors)) for name, c := range e.Collectors { go func(name string, c Collector) { - execute(ctx, name, c, ch, e.logger) + execute(ctx, name, c, ch, e.logger, uc) wg.Done() }(name, c) } wg.Wait() } -func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus.Metric, logger *slog.Logger) { +func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus.Metric, logger *slog.Logger, uc UpdateContext) { begin := time.Now() - err := c.Update(ctx, ch) + err := c.Update(ctx, uc, ch) duration := time.Since(begin) var success float64 diff --git a/collector/collector_test.go b/collector/collector_test.go index 84c6d03e..a09d52ab 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -16,6 +16,7 @@ package collector import ( "context" + "github.com/prometheus-community/elasticsearch_exporter/cluster" "github.com/prometheus/client_golang/prometheus" ) @@ -32,5 +33,11 @@ func (w wrapCollector) Describe(_ chan<- *prometheus.Desc) { } func (w wrapCollector) Collect(ch chan<- prometheus.Metric) { - w.c.Update(context.Background(), ch) + w.c.Update(context.Background(), &mockUpdateContext{}, ch) +} + +type mockUpdateContext struct{} + +func (m *mockUpdateContext) GetClusterInfo(_ context.Context) (cluster.Info, error) { + return cluster.Info{}, nil } diff --git a/collector/data_stream.go b/collector/data_stream.go index c7f1c4c9..a13cc90e 100644 --- a/collector/data_stream.go +++ b/collector/data_stream.go @@ -83,7 +83,7 @@ type DataStreamStatsDataStream struct { MaximumTimestamp int64 `json:"maximum_timestamp"` } -func (ds *DataStream) Update(ctx context.Context, ch chan<- prometheus.Metric) error { +func (ds *DataStream) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { var dsr DataStreamStatsResponse u := 
ds.u.ResolveReference(&url.URL{Path: "/_data_stream/*/_stats"}) diff --git a/collector/health_report.go b/collector/health_report.go index 4933d98c..9c0ceb2d 100644 --- a/collector/health_report.go +++ b/collector/health_report.go @@ -300,7 +300,7 @@ func statusValue(value string, color string) float64 { return 0 } -func (c *HealthReport) Update(ctx context.Context, ch chan<- prometheus.Metric) error { +func (c *HealthReport) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { u := c.url.ResolveReference(&url.URL{Path: "/_health_report"}) var healthReportResponse HealthReportResponse diff --git a/collector/ilm.go b/collector/ilm.go index 79560687..398e6d42 100644 --- a/collector/ilm.go +++ b/collector/ilm.go @@ -74,7 +74,7 @@ type IlmStatusResponse struct { OperationMode string `json:"operation_mode"` } -func (i *ILM) Update(ctx context.Context, ch chan<- prometheus.Metric) error { +func (i *ILM) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { var ir IlmResponse indexURL := i.u.ResolveReference(&url.URL{Path: "/_all/_ilm/explain"}) diff --git a/collector/slm.go b/collector/slm.go index 01834f03..0dd8fdeb 100644 --- a/collector/slm.go +++ b/collector/slm.go @@ -143,7 +143,7 @@ type SLMStatusResponse struct { OperationMode string `json:"operation_mode"` } -func (s *SLM) Update(ctx context.Context, ch chan<- prometheus.Metric) error { +func (s *SLM) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { u := s.u.ResolveReference(&url.URL{Path: "/_slm/status"}) var slmStatusResp SLMStatusResponse diff --git a/collector/snapshots.go b/collector/snapshots.go index 95bfadfa..65a90e68 100644 --- a/collector/snapshots.go +++ b/collector/snapshots.go @@ -110,7 +110,7 @@ func NewSnapshots(logger *slog.Logger, u *url.URL, hc *http.Client) (Collector, }, nil } -func (c *Snapshots) Update(ctx context.Context, ch chan<- prometheus.Metric) error { +func (c *Snapshots) Update(ctx 
context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { // indices snapshotsStatsResp := make(map[string]SnapshotStatsResponse) u := c.u.ResolveReference(&url.URL{Path: "/_snapshot"}) diff --git a/collector/tasks.go b/collector/tasks.go index ca751616..6cf38a82 100644 --- a/collector/tasks.go +++ b/collector/tasks.go @@ -62,7 +62,7 @@ func NewTaskCollector(logger *slog.Logger, u *url.URL, hc *http.Client) (Collect }, nil } -func (t *TaskCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error { +func (t *TaskCollector) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { tasks, err := t.fetchTasks(ctx) if err != nil { return fmt.Errorf("failed to fetch and decode task stats: %w", err) diff --git a/collector/update_context.go b/collector/update_context.go new file mode 100644 index 00000000..050ad9f8 --- /dev/null +++ b/collector/update_context.go @@ -0,0 +1,45 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package collector includes all individual collectors to gather and export elasticsearch metrics. +package collector + +import ( + "context" + + "github.com/prometheus-community/elasticsearch_exporter/cluster" +) + +type UpdateContext interface { + // GetClusterInfo returns the current cluster info. + GetClusterInfo(context.Context) (cluster.Info, error) +} + +// DefaultUpdateContext is the default implementation of UpdateContext. 
+type DefaultUpdateContext struct { + clusterInfo *cluster.InfoProvider +} + +// NewDefaultUpdateContext creates a new DefaultUpdateContext. +func NewDefaultUpdateContext(clusterInfo *cluster.InfoProvider) *DefaultUpdateContext { + return &DefaultUpdateContext{clusterInfo: clusterInfo} +} + +// GetClusterInfo returns the current cluster info from the underlying InfoProvider. +func (c *DefaultUpdateContext) GetClusterInfo(ctx context.Context) (cluster.Info, error) { + info, err := c.clusterInfo.GetInfo(ctx) + if err != nil { + return cluster.Info{}, err + } + return info, nil +} diff --git a/main.go b/main.go index a8405f45..251dd2ee 100644 --- a/main.go +++ b/main.go @@ -35,6 +35,7 @@ import ( "github.com/prometheus/exporter-toolkit/web" webflag "github.com/prometheus/exporter-toolkit/web/kingpinflag" + "github.com/prometheus-community/elasticsearch_exporter/cluster" "github.com/prometheus-community/elasticsearch_exporter/collector" "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo" "github.com/prometheus-community/elasticsearch_exporter/pkg/roundtripper" @@ -177,12 +178,16 @@ func main() { // version metric prometheus.MustRegister(versioncollector.NewCollector(name)) + // This should replace the below cluster info retriever in the future. 
+ infoRetriever := cluster.NewInfoProvider(logger, httpClient, esURL, *esClusterInfoInterval) + // create the exporter exporter, err := collector.NewElasticsearchCollector( logger, []string{}, collector.WithElasticsearchURL(esURL), collector.WithHTTPClient(httpClient), + collector.WithClusterInfoProvider(infoRetriever), ) if err != nil { logger.Error("failed to create Elasticsearch collector", "err", err) From ba5c567aabcaa34143358f5531e70020ce358f29 Mon Sep 17 00:00:00 2001 From: Joe Adams Date: Tue, 17 Jun 2025 23:37:14 -0400 Subject: [PATCH 2/2] CI linting fixup Signed-off-by: Joe Adams --- cluster/provider.go | 1 - cluster/provider_test.go | 1 - collector/collector.go | 3 ++- collector/collector_test.go | 3 ++- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cluster/provider.go b/cluster/provider.go index a3e0d8f3..6d38f53e 100644 --- a/cluster/provider.go +++ b/cluster/provider.go @@ -61,7 +61,6 @@ func (i *InfoProvider) GetInfo(ctx context.Context) (Info, error) { // If the cached info is recent enough, return it. 
if !cachedAt.IsZero() && time.Since(cachedAt) < i.interval { - if err != nil { return Info{}, err } diff --git a/cluster/provider_test.go b/cluster/provider_test.go index ce6ffa19..8b4946de 100644 --- a/cluster/provider_test.go +++ b/cluster/provider_test.go @@ -103,5 +103,4 @@ func TestInfoProvider_GetInfo(t *testing.T) { if timesURLCalled != 2 { t.Errorf("expected URL to be called only once, got %d", timesURLCalled) } - } diff --git a/collector/collector.go b/collector/collector.go index acdb1c23..9040125c 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -25,8 +25,9 @@ import ( "time" "github.com/alecthomas/kingpin/v2" - "github.com/prometheus-community/elasticsearch_exporter/cluster" "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus-community/elasticsearch_exporter/cluster" ) const ( diff --git a/collector/collector_test.go b/collector/collector_test.go index a09d52ab..e298c635 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -16,8 +16,9 @@ package collector import ( "context" - "github.com/prometheus-community/elasticsearch_exporter/cluster" "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus-community/elasticsearch_exporter/cluster" ) // wrapCollector is a util to let you test your Collector implementation.