diff --git a/cluster/provider.go b/cluster/provider.go new file mode 100644 index 00000000..6d38f53e --- /dev/null +++ b/cluster/provider.go @@ -0,0 +1,116 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "net/url" + "path" + "sync" + "time" +) + +type Info struct { + ClusterName string `json:"cluster_name"` +} + +type InfoProvider struct { + logger *slog.Logger + client *http.Client + url *url.URL + interval time.Duration + lastClusterInfo Info + lastError error + cachedAt time.Time // Time when the last cluster info was fetched + mu sync.RWMutex // Protects lastClusterInfo, lastError, and cachedAt +} + +// New creates a new Retriever. +func NewInfoProvider(logger *slog.Logger, client *http.Client, u *url.URL, interval time.Duration) *InfoProvider { + return &InfoProvider{ + logger: logger, + client: client, + url: u, + interval: interval, + } +} + +func (i *InfoProvider) GetInfo(ctx context.Context) (Info, error) { + i.mu.RLock() + info := i.lastClusterInfo + err := i.lastError + cachedAt := i.cachedAt + + i.mu.RUnlock() + + // If the cached info is recent enough, return it. + if !cachedAt.IsZero() && time.Since(cachedAt) < i.interval { + if err != nil { + return Info{}, err + } + + if info.ClusterName != "" { + return info, nil + } + } + + i.mu.Lock() + defer i.mu.Unlock() + + // If we reach here, we need to fetch the cluster info. The cache is either empty or stale. + u := *i.url + u.Path = path.Join(u.Path, "/") + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + if err != nil { + return Info{}, err + } + + resp, err := i.client.Do(req) + if err != nil { + i.logger.Error("failed to fetch cluster info", "err", err) + return Info{}, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("unexpected status code: %d", resp.StatusCode) + i.lastError = err + return Info{}, err + } + + var infoResponse Info + body, err := io.ReadAll(resp.Body) + if err != nil { + i.lastError = err + return Info{}, err + } + + if err := json.Unmarshal(body, &infoResponse); err != nil { + i.lastError = err + return Info{}, fmt.Errorf("failed to unmarshal cluster info: %w", err) + } + + info = Info{ClusterName: infoResponse.ClusterName} + i.lastClusterInfo = info + i.lastError = nil + i.cachedAt = time.Now() + + return info, nil +} diff --git a/cluster/provider_test.go b/cluster/provider_test.go new file mode 100644 index 00000000..8b4946de --- /dev/null +++ b/cluster/provider_test.go @@ -0,0 +1,106 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "context" + "net/http" + "net/http/httptest" + "net/url" + "os" + "reflect" + "testing" + "time" + + "github.com/prometheus/common/promslog" +) + +func TestInfoProvider_GetInfo(t *testing.T) { + timesURLCalled := 0 + expectedInfo := Info{ + ClusterName: "test-cluster-1", + } + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + http.NotFound(w, r) + return + } + w.Header().Set("Content-Type", "application/json") + timesURLCalled++ + _, _ = w.Write([]byte(`{ + "name": "test-node-abcd", + "cluster_name": "test-cluster-1", + "cluster_uuid": "r1bT9sBrR7S9-CamE41Qqg", + "version": { + "number": "5.6.9", + "build_hash": "877a590", + "build_date": "2018-04-12T16:25:14.838Z", + "build_snapshot": false, + "lucene_version": "6.6.1" + } + }`)) + })) + tsURL, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("failed to parse test server URL: %v", err) + } + defer ts.Close() + + i := NewInfoProvider(promslog.New(&promslog.Config{Writer: os.Stdout}), http.DefaultClient, tsURL, time.Second) + + if timesURLCalled != 0 { + t.Errorf("expected no initial URL calls, got %d", timesURLCalled) + } + + got, err := i.GetInfo(context.Background()) + if err != nil { + t.Errorf("InfoProvider.GetInfo() error = %v, wantErr %v", err, false) + return + } + + if !reflect.DeepEqual(got, expectedInfo) { + t.Errorf("InfoProvider.GetInfo() = %v, want %v", got, expectedInfo) + } + + if timesURLCalled != 1 { + t.Errorf("expected URL to be called once, got %d", timesURLCalled) + } + + // Call again to ensure cached value is returned + got, err = i.GetInfo(context.Background()) + if err != nil { + t.Errorf("InfoProvider.GetInfo() error on second call = %v, wantErr %v", err, false) + return + } + if !reflect.DeepEqual(got, expectedInfo) { + t.Errorf("InfoProvider.GetInfo() on second call = %v, want %v", got, expectedInfo) + } + if timesURLCalled != 1 { + t.Errorf("expected URL to be called only once, got %d", timesURLCalled) + } + + // Call again after delay to ensure we refresh the cache + time.Sleep(2 * time.Second) + got, err = i.GetInfo(context.Background()) + if err != nil { + t.Errorf("InfoProvider.GetInfo() error on second call = %v, wantErr %v", err, false) + return + } + if !reflect.DeepEqual(got, expectedInfo) { + t.Errorf("InfoProvider.GetInfo() on second call = %v, want %v", got, expectedInfo) + } + if timesURLCalled != 2 { + t.Errorf("expected URL to be called only once, got %d", timesURLCalled) + } +} diff --git a/collector/cluster_info.go b/collector/cluster_info.go index 6e489a84..7117dba2 100644 --- a/collector/cluster_info.go +++ b/collector/cluster_info.go @@ -77,7 +77,7 @@ type VersionInfo struct { LuceneVersion semver.Version `json:"lucene_version"` } -func (c *ClusterInfoCollector) Update(_ context.Context, ch chan<- prometheus.Metric) error { +func (c *ClusterInfoCollector) Update(_ context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { resp, err := c.hc.Get(c.u.String()) if err != nil { return err diff --git a/collector/cluster_settings.go b/collector/cluster_settings.go index 10e7a180..b919b058 100644 --- a/collector/cluster_settings.go +++ b/collector/cluster_settings.go @@ -145,7 +145,7 @@ type clusterSettingsWatermark struct { Low string `json:"low"` } -func (c *ClusterSettingsCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error { +func (c *ClusterSettingsCollector) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { u := c.u.ResolveReference(&url.URL{Path: "_cluster/settings"}) q := u.Query() q.Set("include_defaults", "true") diff --git a/collector/collector.go b/collector/collector.go index 86e4d70c..9040125c 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -26,6 +26,8 @@ import ( "github.com/alecthomas/kingpin/v2" "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus-community/elasticsearch_exporter/cluster" ) const ( @@ -64,7 +66,7 @@ var ( // Collector is the interface a collector has to implement. type Collector interface { // Get new metrics and expose them via prometheus registry. - Update(context.Context, chan<- prometheus.Metric) error + Update(context.Context, UpdateContext, chan<- prometheus.Metric) error } func registerCollector(name string, isDefaultEnabled bool, createFunc factoryFunc) { @@ -92,6 +94,7 @@ type ElasticsearchCollector struct { logger *slog.Logger esURL *url.URL httpClient *http.Client + cluserInfo *cluster.InfoProvider } type Option func(*ElasticsearchCollector) error @@ -106,6 +109,10 @@ func NewElasticsearchCollector(logger *slog.Logger, filters []string, options .. } } + if e.cluserInfo == nil { + return nil, fmt.Errorf("cluster info provider is not set") + } + f := make(map[string]bool) for _, filter := range filters { enabled, exist := collectorState[filter] @@ -155,6 +162,13 @@ func WithHTTPClient(hc *http.Client) Option { } } +func WithClusterInfoProvider(cl *cluster.InfoProvider) Option { + return func(e *ElasticsearchCollector) error { + e.cluserInfo = cl + return nil + } +} + // Describe implements the prometheus.Collector interface. func (e ElasticsearchCollector) Describe(ch chan<- *prometheus.Desc) { ch <- scrapeDurationDesc @@ -163,21 +177,22 @@ func (e ElasticsearchCollector) Describe(ch chan<- *prometheus.Desc) { // Collect implements the prometheus.Collector interface. func (e ElasticsearchCollector) Collect(ch chan<- prometheus.Metric) { + uc := NewDefaultUpdateContext(e.cluserInfo) wg := sync.WaitGroup{} ctx := context.TODO() wg.Add(len(e.Collectors)) for name, c := range e.Collectors { go func(name string, c Collector) { - execute(ctx, name, c, ch, e.logger) + execute(ctx, name, c, ch, e.logger, uc) wg.Done() }(name, c) } wg.Wait() } -func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus.Metric, logger *slog.Logger) { +func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus.Metric, logger *slog.Logger, uc UpdateContext) { begin := time.Now() - err := c.Update(ctx, ch) + err := c.Update(ctx, uc, ch) duration := time.Since(begin) var success float64 diff --git a/collector/collector_test.go b/collector/collector_test.go index 84c6d03e..e298c635 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -17,6 +17,8 @@ import ( "context" "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus-community/elasticsearch_exporter/cluster" ) // wrapCollector is a util to let you test your Collector implementation. @@ -32,5 +34,11 @@ func (w wrapCollector) Describe(_ chan<- *prometheus.Desc) { } func (w wrapCollector) Collect(ch chan<- prometheus.Metric) { - w.c.Update(context.Background(), ch) + w.c.Update(context.Background(), &mockUpdateContext{}, ch) +} + +type mockUpdateContext struct{} + +func (m *mockUpdateContext) GetClusterInfo(_ context.Context) (cluster.Info, error) { + return cluster.Info{}, nil } diff --git a/collector/data_stream.go b/collector/data_stream.go index c7f1c4c9..a13cc90e 100644 --- a/collector/data_stream.go +++ b/collector/data_stream.go @@ -83,7 +83,7 @@ type DataStreamStatsDataStream struct { MaximumTimestamp int64 `json:"maximum_timestamp"` } -func (ds *DataStream) Update(ctx context.Context, ch chan<- prometheus.Metric) error { +func (ds *DataStream) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { var dsr DataStreamStatsResponse u := ds.u.ResolveReference(&url.URL{Path: "/_data_stream/*/_stats"}) diff --git a/collector/health_report.go b/collector/health_report.go index 4933d98c..9c0ceb2d 100644 --- a/collector/health_report.go +++ b/collector/health_report.go @@ -300,7 +300,7 @@ func statusValue(value string, color string) float64 { return 0 } -func (c *HealthReport) Update(ctx context.Context, ch chan<- prometheus.Metric) error { +func (c *HealthReport) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { u := c.url.ResolveReference(&url.URL{Path: "/_health_report"}) var healthReportResponse HealthReportResponse diff --git a/collector/ilm.go b/collector/ilm.go index 79560687..398e6d42 100644 --- a/collector/ilm.go +++ b/collector/ilm.go @@ -74,7 +74,7 @@ type IlmStatusResponse struct { OperationMode string `json:"operation_mode"` } -func (i *ILM) Update(ctx context.Context, ch chan<- prometheus.Metric) error { +func (i *ILM) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { var ir IlmResponse indexURL := i.u.ResolveReference(&url.URL{Path: "/_all/_ilm/explain"}) diff --git a/collector/slm.go b/collector/slm.go index 01834f03..0dd8fdeb 100644 --- a/collector/slm.go +++ b/collector/slm.go @@ -143,7 +143,7 @@ type SLMStatusResponse struct { OperationMode string `json:"operation_mode"` } -func (s *SLM) Update(ctx context.Context, ch chan<- prometheus.Metric) error { +func (s *SLM) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { u := s.u.ResolveReference(&url.URL{Path: "/_slm/status"}) var slmStatusResp SLMStatusResponse diff --git a/collector/snapshots.go b/collector/snapshots.go index 95bfadfa..65a90e68 100644 --- a/collector/snapshots.go +++ b/collector/snapshots.go @@ -110,7 +110,7 @@ func NewSnapshots(logger *slog.Logger, u *url.URL, hc *http.Client) (Collector, }, nil } -func (c *Snapshots) Update(ctx context.Context, ch chan<- prometheus.Metric) error { +func (c *Snapshots) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { // indices snapshotsStatsResp := make(map[string]SnapshotStatsResponse) u := c.u.ResolveReference(&url.URL{Path: "/_snapshot"}) diff --git a/collector/tasks.go b/collector/tasks.go index ca751616..6cf38a82 100644 --- a/collector/tasks.go +++ b/collector/tasks.go @@ -62,7 +62,7 @@ func NewTaskCollector(logger *slog.Logger, u *url.URL, hc *http.Client) (Collect }, nil } -func (t *TaskCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error { +func (t *TaskCollector) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error { tasks, err := t.fetchTasks(ctx) if err != nil { return fmt.Errorf("failed to fetch and decode task stats: %w", err) diff --git a/collector/update_context.go b/collector/update_context.go new file mode 100644 index 00000000..050ad9f8 --- /dev/null +++ b/collector/update_context.go @@ -0,0 +1,45 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package collector includes all individual collectors to gather and export elasticsearch metrics. +package collector + +import ( + "context" + + "github.com/prometheus-community/elasticsearch_exporter/cluster" +) + +type UpdateContext interface { + // GetClusterInfo returns the current cluster info. + GetClusterInfo(context.Context) (cluster.Info, error) +} + +// DefaultUpdateContext is the default implementation of UpdateContext. +type DefaultUpdateContext struct { + clusterInfo *cluster.InfoProvider +} + +// NewDefaultUpdateContext creates a new DefaultUpdateContext. +func NewDefaultUpdateContext(clusterInfo *cluster.InfoProvider) *DefaultUpdateContext { + return &DefaultUpdateContext{clusterInfo: clusterInfo} +} + +// Retriever returns the cluster info retriever. +func (c *DefaultUpdateContext) GetClusterInfo(ctx context.Context) (cluster.Info, error) { + info, err := c.clusterInfo.GetInfo(ctx) + if err != nil { + return cluster.Info{}, err + } + return info, nil +} diff --git a/main.go b/main.go index a8405f45..251dd2ee 100644 --- a/main.go +++ b/main.go @@ -35,6 +35,7 @@ import ( "github.com/prometheus/exporter-toolkit/web" webflag "github.com/prometheus/exporter-toolkit/web/kingpinflag" + "github.com/prometheus-community/elasticsearch_exporter/cluster" "github.com/prometheus-community/elasticsearch_exporter/collector" "github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo" "github.com/prometheus-community/elasticsearch_exporter/pkg/roundtripper" @@ -177,12 +178,16 @@ func main() { // version metric prometheus.MustRegister(versioncollector.NewCollector(name)) + // This should replace the below cluster info retriever in the future. + infoRetriever := cluster.NewInfoProvider(logger, httpClient, esURL, *esClusterInfoInterval) + // create the exporter exporter, err := collector.NewElasticsearchCollector( logger, []string{}, collector.WithElasticsearchURL(esURL), collector.WithHTTPClient(httpClient), + collector.WithClusterInfoProvider(infoRetriever), ) if err != nil { logger.Error("failed to create Elasticsearch collector", "err", err)