Skip to content

Create a new cluster info retriever #1052

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions cluster/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"path"
"sync"
"time"
)

type Info struct {
ClusterName string `json:"cluster_name"`
}

type InfoProvider struct {
logger *slog.Logger
client *http.Client
url *url.URL
interval time.Duration
lastClusterInfo Info
lastError error
cachedAt time.Time // Time when the last cluster info was fetched
mu sync.RWMutex // Protects lastClusterInfo, lastError, and cachedAt
}

// New creates a new Retriever.
func NewInfoProvider(logger *slog.Logger, client *http.Client, u *url.URL, interval time.Duration) *InfoProvider {
return &InfoProvider{
logger: logger,
client: client,
url: u,
interval: interval,
}
}

func (i *InfoProvider) GetInfo(ctx context.Context) (Info, error) {
i.mu.RLock()
info := i.lastClusterInfo
err := i.lastError
cachedAt := i.cachedAt

i.mu.RUnlock()

// If the cached info is recent enough, return it.
if !cachedAt.IsZero() && time.Since(cachedAt) < i.interval {
if err != nil {
return Info{}, err
}

if info.ClusterName != "" {
return info, nil
}
}

i.mu.Lock()
defer i.mu.Unlock()

// If we reach here, we need to fetch the cluster info. The cache is either empty or stale.
u := *i.url
u.Path = path.Join(u.Path, "/")

req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return Info{}, err
}

resp, err := i.client.Do(req)
if err != nil {
i.logger.Error("failed to fetch cluster info", "err", err)
return Info{}, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("unexpected status code: %d", resp.StatusCode)
i.lastError = err
return Info{}, err
}

var infoResponse Info
body, err := io.ReadAll(resp.Body)
if err != nil {
i.lastError = err
return Info{}, err
}

if err := json.Unmarshal(body, &infoResponse); err != nil {
i.lastError = err
return Info{}, fmt.Errorf("failed to unmarshal cluster info: %w", err)
}

info = Info{ClusterName: infoResponse.ClusterName}
i.lastClusterInfo = info
i.lastError = nil
i.cachedAt = time.Now()

return info, nil
}
106 changes: 106 additions & 0 deletions cluster/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
"context"
"net/http"
"net/http/httptest"
"net/url"
"os"
"reflect"
"testing"
"time"

"github.com/prometheus/common/promslog"
)

func TestInfoProvider_GetInfo(t *testing.T) {
timesURLCalled := 0
expectedInfo := Info{
ClusterName: "test-cluster-1",
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.NotFound(w, r)
return
}
w.Header().Set("Content-Type", "application/json")
timesURLCalled++
_, _ = w.Write([]byte(`{
"name": "test-node-abcd",
"cluster_name": "test-cluster-1",
"cluster_uuid": "r1bT9sBrR7S9-CamE41Qqg",
"version": {
"number": "5.6.9",
"build_hash": "877a590",
"build_date": "2018-04-12T16:25:14.838Z",
"build_snapshot": false,
"lucene_version": "6.6.1"
}
}`))
}))
tsURL, err := url.Parse(ts.URL)
if err != nil {
t.Fatalf("failed to parse test server URL: %v", err)
}
defer ts.Close()

i := NewInfoProvider(promslog.New(&promslog.Config{Writer: os.Stdout}), http.DefaultClient, tsURL, time.Second)

if timesURLCalled != 0 {
t.Errorf("expected no initial URL calls, got %d", timesURLCalled)
}

got, err := i.GetInfo(context.Background())
if err != nil {
t.Errorf("InfoProvider.GetInfo() error = %v, wantErr %v", err, false)
return
}

if !reflect.DeepEqual(got, expectedInfo) {
t.Errorf("InfoProvider.GetInfo() = %v, want %v", got, expectedInfo)
}

if timesURLCalled != 1 {
t.Errorf("expected URL to be called once, got %d", timesURLCalled)
}

// Call again to ensure cached value is returned
got, err = i.GetInfo(context.Background())
if err != nil {
t.Errorf("InfoProvider.GetInfo() error on second call = %v, wantErr %v", err, false)
return
}
if !reflect.DeepEqual(got, expectedInfo) {
t.Errorf("InfoProvider.GetInfo() on second call = %v, want %v", got, expectedInfo)
}
if timesURLCalled != 1 {
t.Errorf("expected URL to be called only once, got %d", timesURLCalled)
}

// Call again after delay to ensure we refresh the cache
time.Sleep(2 * time.Second)
got, err = i.GetInfo(context.Background())
if err != nil {
t.Errorf("InfoProvider.GetInfo() error on second call = %v, wantErr %v", err, false)
return
}
if !reflect.DeepEqual(got, expectedInfo) {
t.Errorf("InfoProvider.GetInfo() on second call = %v, want %v", got, expectedInfo)
}
if timesURLCalled != 2 {
t.Errorf("expected URL to be called only once, got %d", timesURLCalled)
}
}
2 changes: 1 addition & 1 deletion collector/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type VersionInfo struct {
LuceneVersion semver.Version `json:"lucene_version"`
}

func (c *ClusterInfoCollector) Update(_ context.Context, ch chan<- prometheus.Metric) error {
func (c *ClusterInfoCollector) Update(_ context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error {
resp, err := c.hc.Get(c.u.String())
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion collector/cluster_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type clusterSettingsWatermark struct {
Low string `json:"low"`
}

func (c *ClusterSettingsCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
func (c *ClusterSettingsCollector) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error {
u := c.u.ResolveReference(&url.URL{Path: "_cluster/settings"})
q := u.Query()
q.Set("include_defaults", "true")
Expand Down
23 changes: 19 additions & 4 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

"github.com/alecthomas/kingpin/v2"
"github.com/prometheus/client_golang/prometheus"

"github.com/prometheus-community/elasticsearch_exporter/cluster"
)

const (
Expand Down Expand Up @@ -64,7 +66,7 @@ var (
// Collector is the interface a collector has to implement.
type Collector interface {
// Get new metrics and expose them via prometheus registry.
Update(context.Context, chan<- prometheus.Metric) error
Update(context.Context, UpdateContext, chan<- prometheus.Metric) error
}

func registerCollector(name string, isDefaultEnabled bool, createFunc factoryFunc) {
Expand Down Expand Up @@ -92,6 +94,7 @@ type ElasticsearchCollector struct {
logger *slog.Logger
esURL *url.URL
httpClient *http.Client
cluserInfo *cluster.InfoProvider
}

type Option func(*ElasticsearchCollector) error
Expand All @@ -106,6 +109,10 @@ func NewElasticsearchCollector(logger *slog.Logger, filters []string, options ..
}
}

if e.cluserInfo == nil {
return nil, fmt.Errorf("cluster info provider is not set")
}

f := make(map[string]bool)
for _, filter := range filters {
enabled, exist := collectorState[filter]
Expand Down Expand Up @@ -155,6 +162,13 @@ func WithHTTPClient(hc *http.Client) Option {
}
}

func WithClusterInfoProvider(cl *cluster.InfoProvider) Option {
return func(e *ElasticsearchCollector) error {
e.cluserInfo = cl
return nil
}
}

// Describe implements the prometheus.Collector interface.
func (e ElasticsearchCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- scrapeDurationDesc
Expand All @@ -163,21 +177,22 @@ func (e ElasticsearchCollector) Describe(ch chan<- *prometheus.Desc) {

// Collect implements the prometheus.Collector interface.
func (e ElasticsearchCollector) Collect(ch chan<- prometheus.Metric) {
uc := NewDefaultUpdateContext(e.cluserInfo)
wg := sync.WaitGroup{}
ctx := context.TODO()
wg.Add(len(e.Collectors))
for name, c := range e.Collectors {
go func(name string, c Collector) {
execute(ctx, name, c, ch, e.logger)
execute(ctx, name, c, ch, e.logger, uc)
wg.Done()
}(name, c)
}
wg.Wait()
}

func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus.Metric, logger *slog.Logger) {
func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus.Metric, logger *slog.Logger, uc UpdateContext) {
begin := time.Now()
err := c.Update(ctx, ch)
err := c.Update(ctx, uc, ch)
duration := time.Since(begin)
var success float64

Expand Down
10 changes: 9 additions & 1 deletion collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"context"

"github.com/prometheus/client_golang/prometheus"

"github.com/prometheus-community/elasticsearch_exporter/cluster"
)

// wrapCollector is a util to let you test your Collector implementation.
Expand All @@ -32,5 +34,11 @@ func (w wrapCollector) Describe(_ chan<- *prometheus.Desc) {
}

func (w wrapCollector) Collect(ch chan<- prometheus.Metric) {
w.c.Update(context.Background(), ch)
w.c.Update(context.Background(), &mockUpdateContext{}, ch)
}

type mockUpdateContext struct{}

func (m *mockUpdateContext) GetClusterInfo(_ context.Context) (cluster.Info, error) {
return cluster.Info{}, nil
}
2 changes: 1 addition & 1 deletion collector/data_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type DataStreamStatsDataStream struct {
MaximumTimestamp int64 `json:"maximum_timestamp"`
}

func (ds *DataStream) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
func (ds *DataStream) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error {
var dsr DataStreamStatsResponse

u := ds.u.ResolveReference(&url.URL{Path: "/_data_stream/*/_stats"})
Expand Down
2 changes: 1 addition & 1 deletion collector/health_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func statusValue(value string, color string) float64 {
return 0
}

func (c *HealthReport) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
func (c *HealthReport) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error {
u := c.url.ResolveReference(&url.URL{Path: "/_health_report"})
var healthReportResponse HealthReportResponse

Expand Down
2 changes: 1 addition & 1 deletion collector/ilm.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type IlmStatusResponse struct {
OperationMode string `json:"operation_mode"`
}

func (i *ILM) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
func (i *ILM) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error {
var ir IlmResponse

indexURL := i.u.ResolveReference(&url.URL{Path: "/_all/_ilm/explain"})
Expand Down
2 changes: 1 addition & 1 deletion collector/slm.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ type SLMStatusResponse struct {
OperationMode string `json:"operation_mode"`
}

func (s *SLM) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
func (s *SLM) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error {
u := s.u.ResolveReference(&url.URL{Path: "/_slm/status"})
var slmStatusResp SLMStatusResponse

Expand Down
2 changes: 1 addition & 1 deletion collector/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func NewSnapshots(logger *slog.Logger, u *url.URL, hc *http.Client) (Collector,
}, nil
}

func (c *Snapshots) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
func (c *Snapshots) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error {
// indices
snapshotsStatsResp := make(map[string]SnapshotStatsResponse)
u := c.u.ResolveReference(&url.URL{Path: "/_snapshot"})
Expand Down
2 changes: 1 addition & 1 deletion collector/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewTaskCollector(logger *slog.Logger, u *url.URL, hc *http.Client) (Collect
}, nil
}

func (t *TaskCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
func (t *TaskCollector) Update(ctx context.Context, uc UpdateContext, ch chan<- prometheus.Metric) error {
tasks, err := t.fetchTasks(ctx)
if err != nil {
return fmt.Errorf("failed to fetch and decode task stats: %w", err)
Expand Down
Loading
Loading