Skip to content

Commit 7520d5a

Browse files
committed
revive PR 200: feat(loki): support go template in stream labels
1 parent 907ce91 commit 7520d5a

File tree

2 files changed

+88
-3
lines changed

2 files changed

+88
-3
lines changed

pkg/sinks/loki.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"errors"
88
"fmt"
99
"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
10-
"io/ioutil"
10+
"io"
1111
"net/http"
1212
"strconv"
1313
"time"
@@ -51,15 +51,32 @@ func generateTimestamp() string {
5151
return strconv.FormatInt(time.Now().Unix(), 10) + "000000000"
5252
}
5353

54+
func convertStreamTemplate(layout map[string]string, ev *kube.EnhancedEvent) (map[string]string, error) {
55+
result := make(map[string]string)
56+
for key, value := range layout {
57+
rendered, err := GetString(ev, value)
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
result[key] = rendered
63+
}
64+
return result, nil
65+
}
66+
5467
func (l *Loki) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
5568
eventBody, err := serializeEventWithLayout(l.cfg.Layout, ev)
5669
if err != nil {
5770
return err
5871
}
72+
streamLabels, err := convertStreamTemplate(l.cfg.StreamLabels, ev)
73+
if err != nil {
74+
return err
75+
}
5976
timestamp := generateTimestamp()
6077
a := LokiMsg{
6178
Streams: []promtailStream{{
62-
Stream: l.cfg.StreamLabels,
79+
Stream: streamLabels,
6380
Values: [][]string{{timestamp, string(eventBody)}},
6481
}},
6582
}
@@ -93,7 +110,7 @@ func (l *Loki) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
93110

94111
defer resp.Body.Close()
95112

96-
body, err := ioutil.ReadAll(resp.Body)
113+
body, err := io.ReadAll(resp.Body)
97114
if err != nil {
98115
return err
99116
}

pkg/sinks/loki_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package sinks
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"io"
7+
"net/http"
8+
"net/http/httptest"
9+
"time"
10+
11+
"testing"
12+
13+
"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
14+
"github.com/stretchr/testify/assert"
15+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16+
)
17+
18+
func TestLoki_Send(t *testing.T) {
19+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
20+
w.WriteHeader(http.StatusOK)
21+
}))
22+
defer ts.Close()
23+
client := Loki{cfg: &LokiConfig{URL: ts.URL}}
24+
25+
err := client.Send(context.Background(), &kube.EnhancedEvent{})
26+
27+
assert.NoError(t, err)
28+
}
29+
30+
func TestLoki_Send_StreamLabelsTemplated(t *testing.T) {
31+
rr := httptest.NewRecorder()
32+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
33+
result, err := io.ReadAll(r.Body)
34+
if err != nil {
35+
w.WriteHeader(http.StatusInternalServerError)
36+
return
37+
}
38+
rr.Write(result)
39+
w.WriteHeader(http.StatusOK)
40+
}))
41+
defer ts.Close()
42+
client := Loki{cfg: &LokiConfig{
43+
URL: ts.URL,
44+
StreamLabels: map[string]string{
45+
"app": "kube-events",
46+
"object_namespace": "{{ .InvolvedObject.Namespace }}",
47+
}}}
48+
49+
ev := &kube.EnhancedEvent{}
50+
ev.Namespace = "default"
51+
ev.Reason = "my reason"
52+
ev.Type = "Warning"
53+
ev.InvolvedObject.Kind = "Pod"
54+
ev.InvolvedObject.Name = "nginx-server-123abc-456def"
55+
ev.InvolvedObject.Namespace = "prod"
56+
ev.Message = "Successfully pulled image \"nginx:latest\""
57+
ev.FirstTimestamp = v1.Time{Time: time.Now()}
58+
59+
err := client.Send(context.Background(), ev)
60+
assert.NoError(t, err)
61+
62+
var res LokiMsg
63+
err = json.Unmarshal(rr.Body.Bytes(), &res)
64+
assert.NoError(t, err)
65+
66+
assert.Equal(t, res.Streams[0].Stream["app"], "kube-events", "Non template labels should remain the same")
67+
assert.Equal(t, res.Streams[0].Stream["object_namespace"], "prod", "Template labels should be templated")
68+
}

0 commit comments

Comments
 (0)