Skip to content

Commit 9f92986

Browse files
wildumsimonswinekorniltsev
authored
Pyroscope fixes (#3697)
* pyroscope.ebpf: Retry after eBPF errors and buffer config updates. (#3381) * fix: Keep ebpf component loop running If the eBPF component encounter an error, it stops the main processing loops, which then caused issues because any config reload for alloy would block the whole agent. Fixes #3332 Also for long running scrapes we were blocking the component evaluation as we are waiting for the config to be updated. I switched this to be a buffered channel, so the config updated are unblocked. In case of multiple updated buffered it will skip the in between updates and go straight to the latest version. Fixes #1371 * Fix data racec in test * Limit tries to start an ebpf session * Add info logs at session start/end * Pass context around to shutdown properly * fix(pyroscope.scrape): godeltaprof scraping (#3542) * fix(pyroscope.scrape): godeltaprof scraping * cl * update changelog --------- Co-authored-by: Christian Simon <[email protected]> Co-authored-by: Tolya Korniltsev <[email protected]>
1 parent d1e3524 commit 9f92986

File tree

6 files changed

+259
-63
lines changed

6 files changed

+259
-63
lines changed

CHANGELOG.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,14 @@ This document contains a historical list of changes between releases. Only
77
changes that impact end-user behavior are listed; changes to documentation or
88
internal API changes are not present.
99

10-
v1.9.0-rc.0
10+
v1.9.0-rc.2
11+
-----------------
12+
13+
### Bugfixes
14+
15+
- Fix `pyroscope.scrape` scraping godeltaprof profiles. (@korniltsev)
16+
17+
v1.9.0-rc.1
1118
-----------------
1219

1320
### Breaking changes

internal/component/pyroscope/ebpf/ebpf_linux.go

Lines changed: 109 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/grafana/pyroscope/ebpf/pprof"
1616
"github.com/grafana/pyroscope/ebpf/sd"
1717
"github.com/grafana/pyroscope/ebpf/symtab"
18-
"github.com/oklog/run"
1918

2019
"github.com/grafana/alloy/internal/component"
2120
"github.com/grafana/alloy/internal/component/pyroscope"
@@ -61,7 +60,7 @@ func New(opts component.Options, args Arguments) (component.Component, error) {
6160
args: args,
6261
targetFinder: targetFinder,
6362
session: session,
64-
argsUpdate: make(chan Arguments),
63+
argsUpdate: make(chan Arguments, 4),
6564
}
6665
res.metrics.targetsActive.Set(float64(len(res.targetFinder.DebugInfo())))
6766
return res, nil
@@ -104,48 +103,89 @@ type Component struct {
104103
debugInfo DebugInfo
105104
debugInfoLock sync.Mutex
106105
metrics *metrics
106+
107+
healthMut sync.RWMutex
108+
health component.Health
107109
}
108110

109111
func (c *Component) Run(ctx context.Context) error {
110-
err := c.session.Start()
111-
if err != nil {
112-
return fmt.Errorf("ebpf profiling session start: %w", err)
113-
}
114-
defer c.session.Stop()
115-
116-
var g run.Group
117-
g.Add(func() error {
118-
collectInterval := c.args.CollectInterval
119-
t := time.NewTicker(collectInterval)
120-
defer t.Stop()
121-
for {
122-
select {
123-
case <-ctx.Done():
124-
return nil
125-
case newArgs := <-c.argsUpdate:
126-
c.args = newArgs
127-
c.session.UpdateTargets(targetsOptionFromArgs(c.args))
128-
c.metrics.targetsActive.Set(float64(len(c.targetFinder.DebugInfo())))
129-
err := c.session.Update(convertSessionOptions(c.args, c.metrics))
130-
if err != nil {
131-
return nil
132-
}
133-
c.appendable.UpdateChildren(newArgs.ForwardTo)
134-
if c.args.CollectInterval != collectInterval {
135-
t.Reset(c.args.CollectInterval)
136-
collectInterval = c.args.CollectInterval
137-
}
138-
case <-t.C:
139-
err := c.collectProfiles()
112+
var (
113+
sessionStarted = false
114+
sessionErrors = 0
115+
sessionMaxErrors = 3
116+
)
117+
118+
collectInterval := c.args.CollectInterval
119+
t := time.NewTicker(collectInterval)
120+
defer t.Stop()
121+
for {
122+
select {
123+
case <-ctx.Done():
124+
return nil
125+
case newArgs := <-c.argsUpdate:
126+
// ensure there are no other updates queued. this might happen if the collection takes a very long time
127+
newArgs = getLatestArgsFromChannel(c.argsUpdate, newArgs)
128+
129+
// update targets
130+
c.args = newArgs
131+
c.session.UpdateTargets(targetsOptionFromArgs(c.args))
132+
c.metrics.targetsActive.Set(float64(len(c.targetFinder.DebugInfo())))
133+
err := c.session.Update(convertSessionOptions(c.args, c.metrics))
134+
if err != nil {
135+
level.Error(c.options.Logger).Log("msg", "failed to update profiling session", "err", err)
136+
c.reportUnhealthy(err)
137+
continue
138+
}
139+
c.appendable.UpdateChildren(newArgs.ForwardTo)
140+
if c.args.CollectInterval != collectInterval {
141+
t.Reset(c.args.CollectInterval)
142+
collectInterval = c.args.CollectInterval
143+
}
144+
case <-t.C:
145+
if !sessionStarted {
146+
err := c.session.Start()
140147
if err != nil {
141-
c.metrics.profilingSessionsFailingTotal.Inc()
142-
return err
148+
sessionErrors++
149+
if sessionErrors > sessionMaxErrors {
150+
level.Error(c.options.Logger).Log("msg", "too many errors starting profiling session, giving up", "tries", sessionErrors, "last_error", err)
151+
t.Stop()
152+
continue
153+
}
154+
level.Error(c.options.Logger).Log("msg", "failed to start profiling session", "err", err)
155+
c.reportUnhealthy(err)
156+
continue
143157
}
144-
c.updateDebugInfo()
158+
sessionErrors = 0
159+
defer func() {
160+
c.session.Stop()
161+
level.Info(c.options.Logger).Log("msg", "ebpf profiling session stopped")
162+
}()
163+
sessionStarted = true
164+
level.Info(c.options.Logger).Log("msg", "ebpf profiling session started")
145165
}
166+
167+
err := c.collectProfiles(ctx)
168+
if err != nil {
169+
level.Error(c.options.Logger).Log("msg", "failed to collect profiles", "err", err)
170+
c.reportUnhealthy(err)
171+
c.metrics.profilingSessionsFailingTotal.Inc()
172+
continue
173+
}
174+
c.reportHealthy()
175+
c.updateDebugInfo()
176+
}
177+
}
178+
}
179+
180+
func getLatestArgsFromChannel[A any](ch chan A, current A) A {
181+
for {
182+
select {
183+
case x := <-ch:
184+
current = x
185+
default:
186+
return current
146187
}
147-
}, func(error) {})
148-
return g.Run()
188+
}
149189
}
150190

151191
func (c *Component) Update(args component.Arguments) error {
@@ -154,13 +194,38 @@ func (c *Component) Update(args component.Arguments) error {
154194
return nil
155195
}
156196

197+
func (c *Component) reportUnhealthy(err error) {
198+
c.healthMut.Lock()
199+
defer c.healthMut.Unlock()
200+
c.health = component.Health{
201+
Health: component.HealthTypeUnhealthy,
202+
Message: err.Error(),
203+
UpdateTime: time.Now(),
204+
}
205+
}
206+
207+
func (c *Component) reportHealthy() {
208+
c.healthMut.Lock()
209+
defer c.healthMut.Unlock()
210+
c.health = component.Health{
211+
Health: component.HealthTypeHealthy,
212+
UpdateTime: time.Now(),
213+
}
214+
}
215+
216+
func (c *Component) CurrentHealth() component.Health {
217+
c.healthMut.RLock()
218+
defer c.healthMut.RUnlock()
219+
return c.health
220+
}
221+
157222
func (c *Component) DebugInfo() interface{} {
158223
c.debugInfoLock.Lock()
159224
defer c.debugInfoLock.Unlock()
160225
return c.debugInfo
161226
}
162227

163-
func (c *Component) collectProfiles() error {
228+
func (c *Component) collectProfiles(ctx context.Context) error {
164229
c.metrics.profilingSessionsTotal.Inc()
165230
level.Debug(c.options.Logger).Log("msg", "ebpf collectProfiles")
166231
args := c.args
@@ -176,6 +241,11 @@ func (c *Component) collectProfiles() error {
176241
level.Debug(c.options.Logger).Log("msg", "ebpf collectProfiles done", "profiles", len(builders.Builders))
177242
bytesSent := 0
178243
for _, builder := range builders.Builders {
244+
// check if the context is done
245+
if ctx.Err() != nil {
246+
return ctx.Err()
247+
}
248+
179249
serviceName := builder.Labels.Get("service_name")
180250
c.metrics.pprofsTotal.WithLabelValues(serviceName).Inc()
181251
c.metrics.pprofSamplesTotal.WithLabelValues(serviceName).Add(float64(len(builder.Profile.Sample)))
@@ -192,7 +262,7 @@ func (c *Component) collectProfiles() error {
192262
c.metrics.pprofBytesTotal.WithLabelValues(serviceName).Add(float64(len(rawProfile)))
193263

194264
samples := []*pyroscope.RawSample{{RawProfile: rawProfile}}
195-
err = appender.Append(context.Background(), builder.Labels, samples)
265+
err = appender.Append(ctx, builder.Labels, samples)
196266
if err != nil {
197267
level.Error(c.options.Logger).Log("msg", "ebpf pprof write", "err", err)
198268
continue

internal/component/pyroscope/ebpf/ebpf_linux_test.go

Lines changed: 98 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"fmt"
88
"os"
9+
"sync"
910
"testing"
1011
"time"
1112

@@ -26,11 +27,12 @@ import (
2627
)
2728

2829
type mockSession struct {
29-
options ebpfspy.SessionOptions
30-
collectError error
31-
collected int
32-
data [][]string
33-
dataTarget *sd.Target
30+
options ebpfspy.SessionOptions
31+
collectCallback func() error
32+
collected int
33+
data [][]string
34+
dataTarget *sd.Target
35+
mtx sync.Mutex
3436
}
3537

3638
func (m *mockSession) Start() error {
@@ -42,6 +44,8 @@ func (m *mockSession) Stop() {
4244
}
4345

4446
func (m *mockSession) Update(options ebpfspy.SessionOptions) error {
47+
m.mtx.Lock()
48+
defer m.mtx.Unlock()
4549
m.options = options
4650
return nil
4751
}
@@ -52,8 +56,8 @@ func (m *mockSession) UpdateTargets(_ sd.TargetsOptions) {
5256

5357
func (m *mockSession) CollectProfiles(f pprof.CollectProfilesCallback) error {
5458
m.collected++
55-
if m.collectError != nil {
56-
return m.collectError
59+
if m.collectCallback != nil {
60+
return m.collectCallback()
5761
}
5862
for _, stack := range m.data {
5963
f(
@@ -104,7 +108,7 @@ func (m *mockSession) DebugInfo() interface{} {
104108
}
105109
}
106110

107-
func TestShutdownOnError(t *testing.T) {
111+
func TestTargetUpdatesWithLongCollection(t *testing.T) {
108112
logger := util.TestAlloyLogger(t)
109113
ms := newMetrics(nil)
110114
targetFinder, err := sd.NewTargetFinder(os.DirFS("/foo"), logger, sd.TargetsOptions{
@@ -126,9 +130,92 @@ func TestShutdownOnError(t *testing.T) {
126130
ms,
127131
)
128132

129-
session.collectError = fmt.Errorf("mocked error collecting profiles")
130-
err = c.Run(t.Context())
131-
require.Error(t, err)
133+
collection := make(chan struct{})
134+
135+
// simulate a long collection
136+
session.collectCallback = func() error {
137+
<-collection
138+
return nil
139+
}
140+
141+
var wg sync.WaitGroup
142+
wg.Add(1)
143+
ctx, cancel := context.WithCancel(t.Context())
144+
go func() {
145+
err = c.Run(ctx)
146+
require.NoError(t, err)
147+
wg.Done()
148+
}()
149+
150+
// now schedule 3 updates
151+
c.Update(arguments)
152+
c.Update(arguments)
153+
c.Update(arguments)
154+
argX := NewDefaultArguments()
155+
argX.SampleRate = 1234
156+
c.Update(argX)
157+
158+
// unblock the collection
159+
close(collection)
160+
161+
// wait for the session to be updated
162+
require.Eventually(t, func() bool {
163+
session.mtx.Lock()
164+
defer session.mtx.Unlock()
165+
return session.options.SampleRate == 1234
166+
}, time.Second*1, time.Millisecond*10)
167+
168+
cancel()
169+
wg.Wait()
170+
}
171+
172+
func TestReportingCollectError(t *testing.T) {
173+
logger := util.TestAlloyLogger(t)
174+
ms := newMetrics(nil)
175+
targetFinder, err := sd.NewTargetFinder(os.DirFS("/foo"), logger, sd.TargetsOptions{
176+
ContainerCacheSize: 1024,
177+
})
178+
require.NoError(t, err)
179+
session := &mockSession{}
180+
arguments := NewDefaultArguments()
181+
arguments.CollectInterval = time.Millisecond * 100
182+
c := newTestComponent(
183+
component.Options{
184+
Logger: logger,
185+
Registerer: prometheus.NewRegistry(),
186+
OnStateChange: func(e component.Exports) {},
187+
},
188+
arguments,
189+
session,
190+
targetFinder,
191+
ms,
192+
)
193+
194+
session.collectCallback = func() error { return fmt.Errorf("mocked error collecting profiles") }
195+
var wg sync.WaitGroup
196+
wg.Add(1)
197+
ctx, cancel := context.WithCancel(t.Context())
198+
go func() {
199+
err = c.Run(ctx)
200+
require.NoError(t, err)
201+
wg.Done()
202+
}()
203+
204+
// expect the component to be unhealthy
205+
require.Eventually(t, func() bool {
206+
if c.CurrentHealth().Health == component.HealthTypeUnhealthy {
207+
require.Equal(t, c.CurrentHealth().Message, "ebpf session collectProfiles mocked error collecting profiles")
208+
return true
209+
}
210+
return false
211+
}, time.Second*1, time.Millisecond*10)
212+
213+
// the component should still be handling update requests
214+
err = c.Update(arguments)
215+
require.NoError(t, err)
216+
217+
cancel()
218+
wg.Wait()
132219
}
133220

134221
func TestContextShutdown(t *testing.T) {

internal/component/pyroscope/scrape/scrape_loop.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,11 +168,15 @@ func newScrapeLoop(t *Target, scrapeClient *http.Client, appendable pyroscope.Ap
168168
timeout += interval - time.Second
169169
}
170170

171+
appender := appendable.Appender()
172+
if !t.godeltaprof {
173+
appender = NewDeltaAppender(appender, t.allLabels)
174+
}
171175
return &scrapeLoop{
172176
Target: t,
173177
logger: logger,
174178
scrapeClient: scrapeClient,
175-
appender: NewDeltaAppender(appendable.Appender(), t.allLabels),
179+
appender: appender,
176180
interval: interval,
177181
timeout: timeout,
178182
}

0 commit comments

Comments
 (0)