Skip to content

Commit ca19411

Browse files
swiatekmmergify[bot]
authored andcommitted
Add component and unit diagnostics for beats receivers (#8991)
* Add diagnostics to otel manager # Conflicts: # internal/pkg/agent/application/coordinator/coordinator_test.go # internal/pkg/otel/manager/manager.go * add integration test * Move helper function to allow diagnostic tests to run on Windows * Handle diagnostics errors separately * Centralize locking for config updates * Make tests more explicit * Early exit from diagnostics generation function * Refactor config generation for easier locking (cherry picked from commit ce0519f)
1 parent 40d0025 commit ca19411

File tree

9 files changed

+721
-207
lines changed

9 files changed

+721
-207
lines changed

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,14 @@ type OTelManager interface {
148148
// MergedOtelConfig returns the merged Otel collector configuration, containing both the plain config and the
149149
// component config.
150150
MergedOtelConfig() *confmap.Conf
151+
152+
// PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then
153+
// it performs diagnostics for all current units.
154+
PerformDiagnostics(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic
155+
156+
// PerformComponentDiagnostics executes the diagnostic action for the provided components. If no components are provided,
157+
// then it performs the diagnostics for all current units.
158+
PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error)
151159
}
152160

153161
// ConfigChange provides an interface for receiving a new configuration.
@@ -649,12 +657,29 @@ func (c *Coordinator) PerformAction(ctx context.Context, comp component.Componen
649657
// it performs diagnostics for all current units.
650658
// Called from external goroutines.
651659
func (c *Coordinator) PerformDiagnostics(ctx context.Context, req ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic {
652-
return c.runtimeMgr.PerformDiagnostics(ctx, req...)
660+
var diags []runtime.ComponentUnitDiagnostic
661+
runtimeDiags := c.runtimeMgr.PerformDiagnostics(ctx, req...)
662+
diags = append(diags, runtimeDiags...)
663+
otelDiags := c.otelMgr.PerformDiagnostics(ctx, req...)
664+
diags = append(diags, otelDiags...)
665+
return diags
653666
}
654667

655668
// PerformComponentDiagnostics executes the diagnostic action for the provided components.
656669
func (c *Coordinator) PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error) {
657-
return c.runtimeMgr.PerformComponentDiagnostics(ctx, additionalMetrics, req...)
670+
var diags []runtime.ComponentDiagnostic
671+
runtimeDiags, runtimeErr := c.runtimeMgr.PerformComponentDiagnostics(ctx, additionalMetrics, req...)
672+
if runtimeErr != nil {
673+
runtimeErr = fmt.Errorf("runtime diagnostics failed: %w", runtimeErr)
674+
}
675+
diags = append(diags, runtimeDiags...)
676+
otelDiags, otelErr := c.otelMgr.PerformComponentDiagnostics(ctx, additionalMetrics, req...)
677+
if otelErr != nil {
678+
otelErr = fmt.Errorf("otel diagnostics failed: %w", otelErr)
679+
}
680+
diags = append(diags, otelDiags...)
681+
err := errors.Join(runtimeErr, otelErr)
682+
return diags, err
658683
}
659684

660685
// SetLogLevel changes the entire log level for the running Elastic Agent.

internal/pkg/agent/application/coordinator/coordinator_test.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1341,11 +1341,13 @@ func (f *fakeVarsManager) DefaultProvider() string {
13411341
var _ OTelManager = (*fakeOTelManager)(nil)
13421342

13431343
type fakeOTelManager struct {
1344-
updateCollectorCallback func(*confmap.Conf) error
1345-
updateComponentCallback func([]component.Component) error
1346-
errChan chan error
1347-
collectorStatusChan chan *status.AggregateStatus
1348-
componentStateChan chan []runtime.ComponentComponentState
1344+
updateCollectorCallback func(*confmap.Conf) error
1345+
updateComponentCallback func([]component.Component) error
1346+
performDiagnosticsCallback func(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic
1347+
performComponentDiagnosticsCallback func(context.Context, []cproto.AdditionalDiagnosticRequest, ...component.Component) ([]runtime.ComponentDiagnostic, error)
1348+
errChan chan error
1349+
collectorStatusChan chan *status.AggregateStatus
1350+
componentStateChan chan []runtime.ComponentComponentState
13491351
}
13501352

13511353
func (f *fakeOTelManager) Run(ctx context.Context) error {
@@ -1385,12 +1387,28 @@ func (f *fakeOTelManager) WatchComponents() <-chan []runtime.ComponentComponentS
13851387

13861388
func (f *fakeOTelManager) MergedOtelConfig() *confmap.Conf { return nil }
13871389

1390+
func (f *fakeOTelManager) PerformDiagnostics(ctx context.Context, reqs ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic {
1391+
if f.performDiagnosticsCallback != nil {
1392+
return f.performDiagnosticsCallback(ctx, reqs...)
1393+
}
1394+
return nil
1395+
}
1396+
1397+
func (f *fakeOTelManager) PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error) {
1398+
if f.performComponentDiagnosticsCallback != nil {
1399+
return f.performComponentDiagnosticsCallback(ctx, additionalMetrics, req...)
1400+
}
1401+
return nil, nil
1402+
}
1403+
13881404
// An implementation of the RuntimeManager interface for use in testing.
13891405
type fakeRuntimeManager struct {
1390-
state []runtime.ComponentComponentState
1391-
updateCallback func([]component.Component) error
1392-
result error
1393-
errChan chan error
1406+
state []runtime.ComponentComponentState
1407+
updateCallback func([]component.Component) error
1408+
performDiagnosticsCallback func(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic
1409+
performComponentDiagnosticsCallback func(context.Context, []cproto.AdditionalDiagnosticRequest, ...component.Component) ([]runtime.ComponentDiagnostic, error)
1410+
result error
1411+
errChan chan error
13941412
}
13951413

13961414
func (r *fakeRuntimeManager) Run(ctx context.Context) error {
@@ -1428,12 +1446,18 @@ func (r *fakeRuntimeManager) SubscribeAll(context.Context) *runtime.Subscription
14281446

14291447
// PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then
14301448
// it performs diagnostics for all current units.
1431-
func (r *fakeRuntimeManager) PerformDiagnostics(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic {
1449+
func (r *fakeRuntimeManager) PerformDiagnostics(ctx context.Context, req ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic {
1450+
if r.performDiagnosticsCallback != nil {
1451+
return r.performDiagnosticsCallback(ctx, req...)
1452+
}
14321453
return nil
14331454
}
14341455

14351456
// PerformComponentDiagnostics executes the diagnostic action for the provided components.
1436-
func (r *fakeRuntimeManager) PerformComponentDiagnostics(_ context.Context, _ []cproto.AdditionalDiagnosticRequest, _ ...component.Component) ([]runtime.ComponentDiagnostic, error) {
1457+
func (r *fakeRuntimeManager) PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error) {
1458+
if r.performComponentDiagnosticsCallback != nil {
1459+
return r.performComponentDiagnosticsCallback(ctx, additionalMetrics, req...)
1460+
}
14371461
return nil, nil
14381462
}
14391463

0 commit comments

Comments
 (0)