Skip to content

Commit c8a5c69

Browse files
committed
Merge pull request resmoio#229
2 parents cd481dd + 7520d5a commit c8a5c69

File tree

2 files changed

+88
-3
lines changed

2 files changed

+88
-3
lines changed

pkg/sinks/loki.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9-
"io/ioutil"
9+
"io"
1010
"net/http"
1111
"strconv"
1212
"time"
@@ -52,15 +52,32 @@ func generateTimestamp() string {
5252
return strconv.FormatInt(time.Now().Unix(), 10) + "000000000"
5353
}
5454

55+
func convertStreamTemplate(layout map[string]string, ev *kube.EnhancedEvent) (map[string]string, error) {
56+
result := make(map[string]string)
57+
for key, value := range layout {
58+
rendered, err := GetString(ev, value)
59+
if err != nil {
60+
return nil, err
61+
}
62+
63+
result[key] = rendered
64+
}
65+
return result, nil
66+
}
67+
5568
func (l *Loki) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
5669
eventBody, err := serializeEventWithLayout(l.cfg.Layout, ev)
5770
if err != nil {
5871
return err
5972
}
73+
streamLabels, err := convertStreamTemplate(l.cfg.StreamLabels, ev)
74+
if err != nil {
75+
return err
76+
}
6077
timestamp := generateTimestamp()
6178
a := LokiMsg{
6279
Streams: []promtailStream{{
63-
Stream: l.cfg.StreamLabels,
80+
Stream: streamLabels,
6481
Values: [][]string{{timestamp, string(eventBody)}},
6582
}},
6683
}
@@ -95,7 +112,7 @@ func (l *Loki) Send(ctx context.Context, ev *kube.EnhancedEvent) error {
95112

96113
defer resp.Body.Close()
97114

98-
body, err := ioutil.ReadAll(resp.Body)
115+
body, err := io.ReadAll(resp.Body)
99116
if err != nil {
100117
return err
101118
}

pkg/sinks/loki_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package sinks
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"io"
7+
"net/http"
8+
"net/http/httptest"
9+
"time"
10+
11+
"testing"
12+
13+
"github.com/resmoio/kubernetes-event-exporter/pkg/kube"
14+
"github.com/stretchr/testify/assert"
15+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16+
)
17+
18+
func TestLoki_Send(t *testing.T) {
19+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
20+
w.WriteHeader(http.StatusOK)
21+
}))
22+
defer ts.Close()
23+
client := Loki{cfg: &LokiConfig{URL: ts.URL}}
24+
25+
err := client.Send(context.Background(), &kube.EnhancedEvent{})
26+
27+
assert.NoError(t, err)
28+
}
29+
30+
func TestLoki_Send_StreamLabelsTemplated(t *testing.T) {
31+
rr := httptest.NewRecorder()
32+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
33+
result, err := io.ReadAll(r.Body)
34+
if err != nil {
35+
w.WriteHeader(http.StatusInternalServerError)
36+
return
37+
}
38+
rr.Write(result)
39+
w.WriteHeader(http.StatusOK)
40+
}))
41+
defer ts.Close()
42+
client := Loki{cfg: &LokiConfig{
43+
URL: ts.URL,
44+
StreamLabels: map[string]string{
45+
"app": "kube-events",
46+
"object_namespace": "{{ .InvolvedObject.Namespace }}",
47+
}}}
48+
49+
ev := &kube.EnhancedEvent{}
50+
ev.Namespace = "default"
51+
ev.Reason = "my reason"
52+
ev.Type = "Warning"
53+
ev.InvolvedObject.Kind = "Pod"
54+
ev.InvolvedObject.Name = "nginx-server-123abc-456def"
55+
ev.InvolvedObject.Namespace = "prod"
56+
ev.Message = "Successfully pulled image \"nginx:latest\""
57+
ev.FirstTimestamp = v1.Time{Time: time.Now()}
58+
59+
err := client.Send(context.Background(), ev)
60+
assert.NoError(t, err)
61+
62+
var res LokiMsg
63+
err = json.Unmarshal(rr.Body.Bytes(), &res)
64+
assert.NoError(t, err)
65+
66+
assert.Equal(t, res.Streams[0].Stream["app"], "kube-events", "Non template labels should remain the same")
67+
assert.Equal(t, res.Streams[0].Stream["object_namespace"], "prod", "Template labels should be templated")
68+
}

0 commit comments

Comments
 (0)