Skip to content

[8.19] (backport #8991) Add component and unit diagnostics for beats receivers #9051

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ type OTelManager interface {
// MergedOtelConfig returns the merged Otel collector configuration, containing both the plain config and the
// component config.
MergedOtelConfig() *confmap.Conf

// PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then
// it performs diagnostics for all current units.
PerformDiagnostics(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic

// PerformComponentDiagnostics executes the diagnostic action for the provided components. If no components are provided,
// then it performs the diagnostics for all current components.
PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error)
}

// ConfigChange provides an interface for receiving a new configuration.
Expand Down Expand Up @@ -649,12 +657,29 @@ func (c *Coordinator) PerformAction(ctx context.Context, comp component.Componen
// it performs diagnostics for all current units.
// Called from external goroutines.
func (c *Coordinator) PerformDiagnostics(ctx context.Context, req ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic {
// NOTE(review): this listing is a rendered diff without +/- markers; the
// single-line return below appears to be the pre-change body and the
// statements after it its replacement — confirm against the repository.
return c.runtimeMgr.PerformDiagnostics(ctx, req...)
// Gather unit diagnostics from the runtime manager first, then from the OTel
// manager, forwarding the same ctx and req to both, and return the
// concatenated results in that order.
var diags []runtime.ComponentUnitDiagnostic
runtimeDiags := c.runtimeMgr.PerformDiagnostics(ctx, req...)
diags = append(diags, runtimeDiags...)
otelDiags := c.otelMgr.PerformDiagnostics(ctx, req...)
diags = append(diags, otelDiags...)
return diags
}

// PerformComponentDiagnostics executes the diagnostic action for the provided components.
// It queries the runtime manager and then the OTel manager with the same
// arguments, returns the runtime diagnostics followed by the OTel diagnostics,
// and reports the failures of both managers as one joined error.
func (c *Coordinator) PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error) {
// NOTE(review): this listing is a rendered diff without +/- markers; the
// single-line return below appears to be the pre-change body and the
// statements after it its replacement — confirm against the repository.
return c.runtimeMgr.PerformComponentDiagnostics(ctx, additionalMetrics, req...)
var diags []runtime.ComponentDiagnostic
runtimeDiags, runtimeErr := c.runtimeMgr.PerformComponentDiagnostics(ctx, additionalMetrics, req...)
if runtimeErr != nil {
// Prefix the error so the caller can tell which manager failed.
runtimeErr = fmt.Errorf("runtime diagnostics failed: %w", runtimeErr)
}
// Results are appended whether or not an error was returned, and a runtime
// failure does not stop the OTel manager from being queried.
diags = append(diags, runtimeDiags...)
otelDiags, otelErr := c.otelMgr.PerformComponentDiagnostics(ctx, additionalMetrics, req...)
if otelErr != nil {
otelErr = fmt.Errorf("otel diagnostics failed: %w", otelErr)
}
diags = append(diags, otelDiags...)
// errors.Join discards nil values and returns nil when both errors are nil.
err := errors.Join(runtimeErr, otelErr)
return diags, err
}

// SetLogLevel changes the entire log level for the running Elastic Agent.
Expand Down
46 changes: 35 additions & 11 deletions internal/pkg/agent/application/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1341,11 +1341,13 @@ func (f *fakeVarsManager) DefaultProvider() string {
// Compile-time assertion that *fakeOTelManager implements OTelManager.
var _ OTelManager = (*fakeOTelManager)(nil)

// fakeOTelManager is a test implementation of OTelManager whose diagnostics
// methods are driven by optional callback fields.
// NOTE(review): this listing is a rendered diff without +/- markers; the first
// five fields below appear to be the pre-change field list and the remaining
// seven the post-change list — confirm against the repository.
type fakeOTelManager struct {
updateCollectorCallback func(*confmap.Conf) error
updateComponentCallback func([]component.Component) error
errChan chan error
collectorStatusChan chan *status.AggregateStatus
componentStateChan chan []runtime.ComponentComponentState
updateCollectorCallback func(*confmap.Conf) error
updateComponentCallback func([]component.Component) error
// performDiagnosticsCallback, when non-nil, supplies the result of PerformDiagnostics.
performDiagnosticsCallback func(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic
// performComponentDiagnosticsCallback, when non-nil, supplies the result of PerformComponentDiagnostics.
performComponentDiagnosticsCallback func(context.Context, []cproto.AdditionalDiagnosticRequest, ...component.Component) ([]runtime.ComponentDiagnostic, error)
errChan chan error
collectorStatusChan chan *status.AggregateStatus
componentStateChan chan []runtime.ComponentComponentState
}

func (f *fakeOTelManager) Run(ctx context.Context) error {
Expand Down Expand Up @@ -1385,12 +1387,28 @@ func (f *fakeOTelManager) WatchComponents() <-chan []runtime.ComponentComponentS

// MergedOtelConfig always reports a nil configuration in this fake.
func (f *fakeOTelManager) MergedOtelConfig() *confmap.Conf {
	return nil
}

// PerformDiagnostics hands the request to performDiagnosticsCallback when the
// test has installed one; with no callback set it returns nil.
func (f *fakeOTelManager) PerformDiagnostics(ctx context.Context, reqs ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic {
	cb := f.performDiagnosticsCallback
	if cb == nil {
		return nil
	}
	return cb(ctx, reqs...)
}

// PerformComponentDiagnostics hands the request to
// performComponentDiagnosticsCallback when the test has installed one; with no
// callback set it returns a nil slice and a nil error.
func (f *fakeOTelManager) PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error) {
	cb := f.performComponentDiagnosticsCallback
	if cb == nil {
		return nil, nil
	}
	return cb(ctx, additionalMetrics, req...)
}

// An implementation of the RuntimeManager interface for use in testing.
// Its diagnostics methods are driven by optional callback fields.
// NOTE(review): this listing is a rendered diff without +/- markers; the first
// four fields below appear to be the pre-change field list and the remaining
// six the post-change list — confirm against the repository.
type fakeRuntimeManager struct {
state []runtime.ComponentComponentState
updateCallback func([]component.Component) error
result error
errChan chan error
state []runtime.ComponentComponentState
updateCallback func([]component.Component) error
// performDiagnosticsCallback, when non-nil, supplies the result of PerformDiagnostics.
performDiagnosticsCallback func(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic
// performComponentDiagnosticsCallback, when non-nil, supplies the result of PerformComponentDiagnostics.
performComponentDiagnosticsCallback func(context.Context, []cproto.AdditionalDiagnosticRequest, ...component.Component) ([]runtime.ComponentDiagnostic, error)
result error
errChan chan error
}

func (r *fakeRuntimeManager) Run(ctx context.Context) error {
Expand Down Expand Up @@ -1428,12 +1446,18 @@ func (r *fakeRuntimeManager) SubscribeAll(context.Context) *runtime.Subscription

// PerformDiagnostics is the fake's stand-in for unit diagnostics: it delegates
// to performDiagnosticsCallback when one is set and returns nil otherwise.
// NOTE(review): this listing is a rendered diff without +/- markers; the first
// signature line below (unnamed parameters) appears to be the pre-change
// version and the second its replacement — confirm against the repository.
func (r *fakeRuntimeManager) PerformDiagnostics(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic {
func (r *fakeRuntimeManager) PerformDiagnostics(ctx context.Context, req ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic {
if r.performDiagnosticsCallback != nil {
return r.performDiagnosticsCallback(ctx, req...)
}
return nil
}

// PerformComponentDiagnostics is the fake's stand-in for component
// diagnostics: it delegates to performComponentDiagnosticsCallback when one is
// set and returns a nil slice and a nil error otherwise.
// NOTE(review): this listing is a rendered diff without +/- markers; the first
// signature line below (blank-identifier parameters) appears to be the
// pre-change version and the second its replacement — confirm against the
// repository.
func (r *fakeRuntimeManager) PerformComponentDiagnostics(_ context.Context, _ []cproto.AdditionalDiagnosticRequest, _ ...component.Component) ([]runtime.ComponentDiagnostic, error) {
func (r *fakeRuntimeManager) PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error) {
if r.performComponentDiagnosticsCallback != nil {
return r.performComponentDiagnosticsCallback(ctx, additionalMetrics, req...)
}
return nil, nil
}

Expand Down
Loading
Loading