diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 76a53486d5b..43feca98de6 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -164,6 +164,14 @@ type OTelManager interface { // MergedOtelConfig returns the merged Otel collector configuration, containing both the plain config and the // component config. MergedOtelConfig() *confmap.Conf + + // PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then + // it performs diagnostics for all current units. + PerformDiagnostics(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic + + // PerformComponentDiagnostics executes the diagnostic action for the provided components. If no components are provided, + // then it performs the diagnostics for all current units. + PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error) } // ConfigChange provides an interface for receiving a new configuration. @@ -767,12 +775,29 @@ func (c *Coordinator) PerformAction(ctx context.Context, comp component.Componen // it performs diagnostics for all current units. // Called from external goroutines. func (c *Coordinator) PerformDiagnostics(ctx context.Context, req ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic { - return c.runtimeMgr.PerformDiagnostics(ctx, req...) + var diags []runtime.ComponentUnitDiagnostic + runtimeDiags := c.runtimeMgr.PerformDiagnostics(ctx, req...) + diags = append(diags, runtimeDiags...) + otelDiags := c.otelMgr.PerformDiagnostics(ctx, req...) + diags = append(diags, otelDiags...) + return diags } // PerformComponentDiagnostics executes the diagnostic action for the provided components. func (c *Coordinator) PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error) { - return c.runtimeMgr.PerformComponentDiagnostics(ctx, additionalMetrics, req...) + var diags []runtime.ComponentDiagnostic + runtimeDiags, runtimeErr := c.runtimeMgr.PerformComponentDiagnostics(ctx, additionalMetrics, req...) + if runtimeErr != nil { + runtimeErr = fmt.Errorf("runtime diagnostics failed: %w", runtimeErr) + } + diags = append(diags, runtimeDiags...) + otelDiags, otelErr := c.otelMgr.PerformComponentDiagnostics(ctx, additionalMetrics, req...) + if otelErr != nil { + otelErr = fmt.Errorf("otel diagnostics failed: %w", otelErr) + } + diags = append(diags, otelDiags...) + err := errors.Join(runtimeErr, otelErr) + return diags, err } // SetLogLevel changes the entire log level for the running Elastic Agent. diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 62c9e5e2733..3fc15e394f3 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -1341,11 +1341,13 @@ func (f *fakeVarsManager) DefaultProvider() string { var _ OTelManager = (*fakeOTelManager)(nil) type fakeOTelManager struct { - updateCollectorCallback func(*confmap.Conf) error - updateComponentCallback func([]component.Component) error - errChan chan error - collectorStatusChan chan *status.AggregateStatus - componentStateChan chan []runtime.ComponentComponentState + updateCollectorCallback func(*confmap.Conf) error + updateComponentCallback func([]component.Component) error + performDiagnosticsCallback func(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic + performComponentDiagnosticsCallback func(context.Context, []cproto.AdditionalDiagnosticRequest, ...component.Component) ([]runtime.ComponentDiagnostic, error) + errChan chan error + collectorStatusChan chan *status.AggregateStatus + componentStateChan chan []runtime.ComponentComponentState } func (f *fakeOTelManager) Run(ctx context.Context) error { @@ -1385,12 +1387,28 @@ func (f *fakeOTelManager) WatchComponents() <-chan []runtime.ComponentComponentS func (f *fakeOTelManager) MergedOtelConfig() *confmap.Conf { return nil } +func (f *fakeOTelManager) PerformDiagnostics(ctx context.Context, reqs ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic { + if f.performDiagnosticsCallback != nil { + return f.performDiagnosticsCallback(ctx, reqs...) + } + return nil +} + +func (f *fakeOTelManager) PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error) { + if f.performComponentDiagnosticsCallback != nil { + return f.performComponentDiagnosticsCallback(ctx, additionalMetrics, req...) + } + return nil, nil +} + // An implementation of the RuntimeManager interface for use in testing. type fakeRuntimeManager struct { - state []runtime.ComponentComponentState - updateCallback func([]component.Component) error - result error - errChan chan error + state []runtime.ComponentComponentState + updateCallback func([]component.Component) error + performDiagnosticsCallback func(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic + performComponentDiagnosticsCallback func(context.Context, []cproto.AdditionalDiagnosticRequest, ...component.Component) ([]runtime.ComponentDiagnostic, error) + result error + errChan chan error } func (r *fakeRuntimeManager) Run(ctx context.Context) error { @@ -1428,12 +1446,18 @@ func (r *fakeRuntimeManager) SubscribeAll(context.Context) *runtime.Subscription // PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then // it performs diagnostics for all current units. -func (r *fakeRuntimeManager) PerformDiagnostics(context.Context, ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic { +func (r *fakeRuntimeManager) PerformDiagnostics(ctx context.Context, req ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic { + if r.performDiagnosticsCallback != nil { + return r.performDiagnosticsCallback(ctx, req...) + } return nil } // PerformComponentDiagnostics executes the diagnostic action for the provided components. -func (r *fakeRuntimeManager) PerformComponentDiagnostics(_ context.Context, _ []cproto.AdditionalDiagnosticRequest, _ ...component.Component) ([]runtime.ComponentDiagnostic, error) { +func (r *fakeRuntimeManager) PerformComponentDiagnostics(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component) ([]runtime.ComponentDiagnostic, error) { + if r.performComponentDiagnosticsCallback != nil { + return r.performComponentDiagnosticsCallback(ctx, additionalMetrics, req...) + } return nil, nil } diff --git a/internal/pkg/agent/application/coordinator/diagnostics_test.go b/internal/pkg/agent/application/coordinator/diagnostics_test.go index 93bfb340b23..87761c0a2e7 100644 --- a/internal/pkg/agent/application/coordinator/diagnostics_test.go +++ b/internal/pkg/agent/application/coordinator/diagnostics_test.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/component/runtime" agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client" + "github.com/elastic/elastic-agent/pkg/control/v2/cproto" "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/elastic/elastic-agent/pkg/utils/broadcaster" ) @@ -691,3 +692,284 @@ func (a fakeAgentInfo) ECSMetadata(l *logger.Logger) (*info.ECSMeta, error) { func (a fakeAgentInfo) ReloadID(ctx context.Context) error { panic("implement me") } func (a fakeAgentInfo) SetLogLevel(ctx context.Context, level string) error { panic("implement me") } + +func TestCoordinatorPerformDiagnostics(t *testing.T) { + tests := []struct { + name string + runtimeDiags []runtime.ComponentUnitDiagnostic + otelDiags []runtime.ComponentUnitDiagnostic + expectedRuntimeDiagCount int + expectedOtelDiagCount int + }{ + { + name: "both runtime and otel return diagnostics", + runtimeDiags: []runtime.ComponentUnitDiagnostic{ + { + Component: component.Component{ID: "runtime-comp-1"}, + Unit: component.Unit{ID: "runtime-unit-1", Type: client.UnitTypeInput}, + Results: []*proto.ActionDiagnosticUnitResult{{Name: "runtime-diag"}}, + }, + }, + otelDiags: []runtime.ComponentUnitDiagnostic{ + { + Component: component.Component{ID: "otel-comp-1"}, + Unit: component.Unit{ID: "otel-unit-1", Type: client.UnitTypeOutput}, + Results: []*proto.ActionDiagnosticUnitResult{{Name: "otel-diag"}}, + }, + }, + expectedRuntimeDiagCount: 1, + expectedOtelDiagCount: 1, + }, + { + name: "only runtime returns diagnostics", + runtimeDiags: []runtime.ComponentUnitDiagnostic{ + { + Component: component.Component{ID: "runtime-comp-1"}, + Unit: component.Unit{ID: "runtime-unit-1", Type: client.UnitTypeInput}, + Results: []*proto.ActionDiagnosticUnitResult{{Name: "runtime-diag"}}, + }, + }, + otelDiags: []runtime.ComponentUnitDiagnostic{}, + expectedRuntimeDiagCount: 1, + expectedOtelDiagCount: 0, + }, + { + name: "only otel returns diagnostics", + runtimeDiags: []runtime.ComponentUnitDiagnostic{}, + otelDiags: []runtime.ComponentUnitDiagnostic{ + { + Component: component.Component{ID: "otel-comp-1"}, + Unit: component.Unit{ID: "otel-unit-1", Type: client.UnitTypeOutput}, + Results: []*proto.ActionDiagnosticUnitResult{{Name: "otel-diag"}}, + }, + }, + expectedRuntimeDiagCount: 0, + expectedOtelDiagCount: 1, + }, + { + name: "no diagnostics from either manager", + runtimeDiags: []runtime.ComponentUnitDiagnostic{}, + otelDiags: []runtime.ComponentUnitDiagnostic{}, + expectedRuntimeDiagCount: 0, + expectedOtelDiagCount: 0, + }, + { + name: "multiple diagnostics from both managers", + runtimeDiags: []runtime.ComponentUnitDiagnostic{ + { + Component: component.Component{ID: "runtime-comp-1"}, + Unit: component.Unit{ID: "runtime-unit-1", Type: client.UnitTypeInput}, + Results: []*proto.ActionDiagnosticUnitResult{{Name: "runtime-diag-1"}}, + }, + { + Component: component.Component{ID: "runtime-comp-2"}, + Unit: component.Unit{ID: "runtime-unit-2", Type: client.UnitTypeInput}, + Results: []*proto.ActionDiagnosticUnitResult{{Name: "runtime-diag-2"}}, + }, + }, + otelDiags: []runtime.ComponentUnitDiagnostic{ + { + Component: component.Component{ID: "otel-comp-1"}, + Unit: component.Unit{ID: "otel-unit-1", Type: client.UnitTypeOutput}, + Results: []*proto.ActionDiagnosticUnitResult{{Name: "otel-diag-1"}}, + }, + { + Component: component.Component{ID: "otel-comp-2"}, + Unit: component.Unit{ID: "otel-unit-2", Type: client.UnitTypeOutput}, + Results: []*proto.ActionDiagnosticUnitResult{{Name: "otel-diag-2"}}, + }, + }, + expectedRuntimeDiagCount: 2, + expectedOtelDiagCount: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create mock managers with callbacks + mockRuntimeMgr := &fakeRuntimeManager{ + performDiagnosticsCallback: func( + ctx context.Context, + reqs ...runtime.ComponentUnitDiagnosticRequest, + ) []runtime.ComponentUnitDiagnostic { + return tt.runtimeDiags + }, + } + mockOtelMgr := &fakeOTelManager{ + performDiagnosticsCallback: func( + ctx context.Context, + reqs ...runtime.ComponentUnitDiagnosticRequest, + ) []runtime.ComponentUnitDiagnostic { + return tt.otelDiags + }, + } + + // Create coordinator with mock managers + coord := &Coordinator{ + runtimeMgr: mockRuntimeMgr, + otelMgr: mockOtelMgr, + } + + // Create test requests + req1 := runtime.ComponentUnitDiagnosticRequest{ + Component: component.Component{ID: "test-comp-1"}, + Unit: component.Unit{ID: "test-unit-1", Type: client.UnitTypeInput}, + } + req2 := runtime.ComponentUnitDiagnosticRequest{ + Component: component.Component{ID: "test-comp-2"}, + Unit: component.Unit{ID: "test-unit-2", Type: client.UnitTypeOutput}, + } + + // Execute PerformDiagnostics + ctx := context.Background() + result := coord.PerformDiagnostics(ctx, req1, req2) + + // Verify results + runtimeDiagFound := 0 + otelDiagFound := 0 + for _, diag := range result { + if diag.Component.ID == "runtime-comp-1" || diag.Component.ID == "runtime-comp-2" { + runtimeDiagFound++ + } + if diag.Component.ID == "otel-comp-1" || diag.Component.ID == "otel-comp-2" { + otelDiagFound++ + } + } + assert.Equal(t, tt.expectedRuntimeDiagCount, runtimeDiagFound, "Runtime diagnostic count should match expected") + assert.Equal(t, tt.expectedOtelDiagCount, otelDiagFound, "OTel diagnostic count should match expected") + }) + } +} + +func TestCoordinatorPerformComponentDiagnostics(t *testing.T) { + tests := []struct { + name string + runtimeDiags []runtime.ComponentDiagnostic + runtimeErr error + otelDiags []runtime.ComponentDiagnostic + otelErr error + expectedRuntimeDiagCount int + expectedOtelDiagCount int + }{ + { + name: "both runtime and otel return diagnostics successfully", + runtimeDiags: []runtime.ComponentDiagnostic{ + { + Component: component.Component{ID: "runtime-comp-1"}, + Results: []*proto.ActionDiagnosticUnitResult{{Name: "runtime-diag"}}, + }, + }, + otelDiags: []runtime.ComponentDiagnostic{ + { + Component: component.Component{ID: "otel-comp-1"}, + Results: []*proto.ActionDiagnosticUnitResult{{Name: "otel-diag"}}, + }, + }, + expectedRuntimeDiagCount: 1, + expectedOtelDiagCount: 1, + }, + { + name: "runtime manager returns error", + runtimeDiags: []runtime.ComponentDiagnostic{}, + runtimeErr: errors.New("runtime manager error"), + otelDiags: []runtime.ComponentDiagnostic{ + { + Component: component.Component{ID: "otel-comp-1"}, + Results: []*proto.ActionDiagnosticUnitResult{{Name: "otel-diag"}}, + }, + }, + expectedRuntimeDiagCount: 0, + expectedOtelDiagCount: 1, + }, + { + name: "otel manager returns error", + runtimeDiags: []runtime.ComponentDiagnostic{ + { + Component: component.Component{ID: "runtime-comp-1"}, + Results: []*proto.ActionDiagnosticUnitResult{{Name: "runtime-diag"}}, + }, + }, + otelDiags: []runtime.ComponentDiagnostic{}, + otelErr: errors.New("otel manager error"), + expectedRuntimeDiagCount: 1, + expectedOtelDiagCount: 0, + }, + { + name: "only runtime returns diagnostics", + runtimeDiags: []runtime.ComponentDiagnostic{{Component: component.Component{ID: "runtime-comp-1"}}}, + otelDiags: []runtime.ComponentDiagnostic{}, + expectedRuntimeDiagCount: 1, + expectedOtelDiagCount: 0, + }, + { + name: "only otel returns diagnostics", + runtimeDiags: []runtime.ComponentDiagnostic{}, + otelDiags: []runtime.ComponentDiagnostic{{Component: component.Component{ID: "otel-comp-1"}}}, + expectedRuntimeDiagCount: 0, + expectedOtelDiagCount: 1, + }, + { + name: "no diagnostics from either manager", + runtimeDiags: []runtime.ComponentDiagnostic{}, + otelDiags: []runtime.ComponentDiagnostic{}, + expectedRuntimeDiagCount: 0, + expectedOtelDiagCount: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create mock managers with callbacks + mockRuntimeMgr := &fakeRuntimeManager{ + performComponentDiagnosticsCallback: func(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, comps ...component.Component) ([]runtime.ComponentDiagnostic, error) { + return tt.runtimeDiags, tt.runtimeErr + }, + } + mockOtelMgr := &fakeOTelManager{ + performComponentDiagnosticsCallback: func(ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, comps ...component.Component) ([]runtime.ComponentDiagnostic, error) { + return tt.otelDiags, tt.otelErr + }, + } + + // Create coordinator with mock managers + coord := &Coordinator{ + runtimeMgr: mockRuntimeMgr, + otelMgr: mockOtelMgr, + } + + // Create test components and additional metrics + comp1 := component.Component{ID: "test-comp-1"} + comp2 := component.Component{ID: "test-comp-2"} + additionalMetrics := []cproto.AdditionalDiagnosticRequest{ + cproto.AdditionalDiagnosticRequest_CPU, + cproto.AdditionalDiagnosticRequest_CONN, + } + + // Execute PerformComponentDiagnostics + ctx := context.Background() + result, err := coord.PerformComponentDiagnostics(ctx, additionalMetrics, comp1, comp2) + + // Verify error handling + if tt.otelErr != nil { + assert.ErrorIs(t, err, tt.otelErr, "Returned error should include otel manager error") + } + if tt.runtimeErr != nil { + assert.ErrorIs(t, err, tt.runtimeErr, "Returned error should include runtime manager error") + } + + // Verify results + runtimeDiagFound := 0 + otelDiagFound := 0 + for _, diag := range result { + if diag.Component.ID == "runtime-comp-1" { + runtimeDiagFound++ + } + if diag.Component.ID == "otel-comp-1" { + otelDiagFound++ + } + } + assert.Equal(t, tt.expectedRuntimeDiagCount, runtimeDiagFound, "Runtime diagnostic count should match expected") + assert.Equal(t, tt.expectedOtelDiagCount, otelDiagFound, "OTel diagnostic count should match expected") + }) + } +} diff --git a/internal/pkg/otel/manager/common_test.go b/internal/pkg/otel/manager/common_test.go index d616bbc19e4..f728cff8f52 100644 --- a/internal/pkg/otel/manager/common_test.go +++ b/internal/pkg/otel/manager/common_test.go @@ -7,8 +7,13 @@ package manager import ( "errors" "net" + "path/filepath" "testing" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" + "github.com/elastic/elastic-agent/pkg/component" + "github.com/stretchr/testify/require" ) @@ -27,3 +32,66 @@ func TestFindRandomPort(t *testing.T) { _, err = findRandomTCPPort() require.Error(t, err, "failed to find random port") } + +func testComponent(componentId string) component.Component { + fileStreamConfig := map[string]any{ + "id": "test", + "use_output": "default", + "streams": []any{ + map[string]any{ + "id": "test-1", + "data_stream": map[string]any{ + "dataset": "generic-1", + }, + "paths": []any{ + filepath.Join(paths.TempDir(), "nonexistent.log"), + }, + }, + map[string]any{ + "id": "test-2", + "data_stream": map[string]any{ + "dataset": "generic-2", + }, + "paths": []any{ + filepath.Join(paths.TempDir(), "nonexistent.log"), + }, + }, + }, + } + + esOutputConfig := map[string]any{ + "type": "elasticsearch", + "hosts": []any{"localhost:9200"}, + "username": "elastic", + "password": "password", + "preset": "balanced", + "queue.mem.events": 3200, + } + + return component.Component{ + ID: componentId, + RuntimeManager: component.OtelRuntimeManager, + InputType: "filestream", + OutputType: "elasticsearch", + InputSpec: &component.InputRuntimeSpec{ + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + }, + }, + Units: []component.Unit{ + { + ID: "filestream-unit", + Type: client.UnitTypeInput, + Config: component.MustExpectedConfig(fileStreamConfig), + }, + { + ID: "filestream-default", + Type: client.UnitTypeOutput, + Config: component.MustExpectedConfig(esOutputConfig), + }, + }, + } +} diff --git a/internal/pkg/otel/manager/diagnostics.go b/internal/pkg/otel/manager/diagnostics.go new file mode 100644 index 00000000000..513d784b4e9 --- /dev/null +++ b/internal/pkg/otel/manager/diagnostics.go @@ -0,0 +1,102 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package manager + +import ( + "context" + + "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent/pkg/component/runtime" + "github.com/elastic/elastic-agent/pkg/control/v2/cproto" +) + +// PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then +// it performs diagnostics for all current units. If a given unit does not exist in the manager, then a warning +// is logged. +func (m *OTelManager) PerformDiagnostics(ctx context.Context, req ...runtime.ComponentUnitDiagnosticRequest) []runtime.ComponentUnitDiagnostic { + var diagnostics []runtime.ComponentUnitDiagnostic + m.mx.RLock() + currentComponents := m.components + m.mx.RUnlock() + + // if no request is provided, then perform diagnostics for all units + if len(req) == 0 { + for _, comp := range currentComponents { + for _, unit := range comp.Units { + diagnostics = append(diagnostics, runtime.ComponentUnitDiagnostic{ + Component: comp, + Unit: unit, + }) + } + } + return diagnostics + } + + // create a map of unit by component and unit id, this is used to filter out units that + // do not exist in the manager + unitByID := make(map[string]map[string]*component.Unit) + for _, r := range req { + if unitByID[r.Component.ID] == nil { + unitByID[r.Component.ID] = make(map[string]*component.Unit) + } + unitByID[r.Component.ID][r.Unit.ID] = &r.Unit + } + + // create empty diagnostics for units that exist in the manager + for _, existingComp := range currentComponents { + inputComp, ok := unitByID[existingComp.ID] + if !ok { + m.logger.Warnf("requested diagnostics for component %s, but it does not exist in the manager", existingComp.ID) + continue + } + for _, unit := range existingComp.Units { + if _, ok := inputComp[unit.ID]; ok { + diagnostics = append(diagnostics, runtime.ComponentUnitDiagnostic{ + Component: existingComp, + Unit: unit, + }) + } else { + m.logger.Warnf("requested diagnostics for unit %s, but it does not exist in the manager", unit.ID) + } + } + } + + return diagnostics +} + +// PerformComponentDiagnostics executes the diagnostic action for the provided components. If no components are provided, +// then it performs the diagnostics for all current components. +func (m *OTelManager) PerformComponentDiagnostics( + ctx context.Context, additionalMetrics []cproto.AdditionalDiagnosticRequest, req ...component.Component, +) ([]runtime.ComponentDiagnostic, error) { + var diagnostics []runtime.ComponentDiagnostic + m.mx.RLock() + currentComponents := m.components + m.mx.RUnlock() + + // if no request is provided, then perform diagnostics for all components + if len(req) == 0 { + req = currentComponents + } + + // create a map of component by id, this is used to filter out components that do not exist in the manager + compByID := make(map[string]component.Component) + for _, comp := range req { + compByID[comp.ID] = comp + } + + // create empty diagnostics for components that exist in the manager + for _, existingComp := range currentComponents { + if inputComp, ok := compByID[existingComp.ID]; ok { + diagnostics = append(diagnostics, runtime.ComponentDiagnostic{ + Component: inputComp, + }) + } else { + m.logger.Warnf("requested diagnostics for component %s, but it does not exist in the manager", existingComp.ID) + } + } + + return diagnostics, nil +} diff --git a/internal/pkg/otel/manager/diagnostics_test.go b/internal/pkg/otel/manager/diagnostics_test.go new file mode 100644 index 00000000000..2dbb52e8432 --- /dev/null +++ b/internal/pkg/otel/manager/diagnostics_test.go @@ -0,0 +1,101 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package manager + +import ( + "context" + "testing" + + "github.com/elastic/elastic-agent/pkg/component/runtime" + "github.com/elastic/elastic-agent/pkg/core/logger/loggertest" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent/pkg/component" +) + +func TestPerformComponentDiagnostics(t *testing.T) { + logger, _ := loggertest.New("test") + compID := "filebeat-comp-1" + + filebeatComp := testComponent(compID) + filebeatComp.InputSpec.Spec.Command.Args = []string{"filebeat"} + + otherComp := testComponent("other-comp") + otherComp.InputSpec.Spec.Command.Args = []string{"metricbeat"} + + m := &OTelManager{ + logger: logger, + components: []component.Component{filebeatComp, otherComp}, + } + + expectedDiags := []runtime.ComponentDiagnostic{ + { + Component: filebeatComp, + }, + { + Component: otherComp, + }, + } + + diags, err := m.PerformComponentDiagnostics(context.Background(), nil) + require.NoError(t, err) + assert.Equal(t, expectedDiags, diags) +} + +func TestPerformDiagnostics(t *testing.T) { + logger, _ := loggertest.New("test") + compID := "filebeat-comp-1" + + filebeatComp := testComponent(compID) + filebeatComp.InputSpec.Spec.Command.Args = []string{"filebeat"} + + otherComp := testComponent("other-comp") + otherComp.InputSpec.Spec.Command.Args = []string{"metricbeat"} + + m := &OTelManager{ + logger: logger, + components: []component.Component{filebeatComp, otherComp}, + } + + t.Run("diagnose all units when no request is provided", func(t *testing.T) { + expectedDiags := []runtime.ComponentUnitDiagnostic{ + { + Component: filebeatComp, + Unit: filebeatComp.Units[0], + }, + { + Component: filebeatComp, + Unit: filebeatComp.Units[1], + }, + { + Component: otherComp, + Unit: otherComp.Units[0], + }, + { + Component: otherComp, + Unit: otherComp.Units[1], + }, + } + diags := m.PerformDiagnostics(t.Context()) + assert.Equal(t, expectedDiags, diags) + }) + + t.Run("diagnose specific unit", func(t *testing.T) { + req := runtime.ComponentUnitDiagnosticRequest{ + Component: filebeatComp, + Unit: filebeatComp.Units[0], + } + expectedDiags := []runtime.ComponentUnitDiagnostic{ + { + Component: filebeatComp, + Unit: filebeatComp.Units[0], + }, + } + diags := m.PerformDiagnostics(t.Context(), req) + assert.Equal(t, expectedDiags, diags) + }) +} diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index 46b2d7bf6c5..06aeb1a2d89 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -265,12 +265,19 @@ func (m *OTelManager) Run(ctx context.Context) error { // and reset the retry count m.recoveryTimer.Stop() m.recoveryRetries.Store(0) - err = m.handleConfigUpdate(cfgUpdate.collectorCfg, cfgUpdate.components) + mergedCfg, err := buildMergedConfig(cfgUpdate, m.agentInfo, m.beatMonitoringConfigGetter) if err != nil { reportErr(ctx, m.errCh, err) continue } + // this is the only place where we mutate the internal config attributes, take a write lock for the duration + m.mx.Lock() + m.mergedCollectorCfg = mergedCfg + m.collectorCfg = cfgUpdate.collectorCfg + m.components = cfgUpdate.components + m.mx.Unlock() + err = m.applyMergedConfig(ctx, collectorStatusCh, collectorRunErr) // report the error unconditionally to indicate that the config was applied reportErr(ctx, m.errCh, err) @@ -289,32 +296,23 @@ func (m *OTelManager) Errors() <-chan error { return m.errCh } -// handleConfigUpdate processes collector and component configuration updates received through the updateCh. -// This method updates the internal collector and component configurations and triggers a rebuild of the merged -// configuration that combines them. -func (m *OTelManager) handleConfigUpdate(cfg *confmap.Conf, components []component.Component) error { - m.collectorCfg = cfg - m.components = components - return m.updateMergedConfig() -} - // buildMergedConfig combines collector configuration with component-derived configuration. -func (m *OTelManager) buildMergedConfig() (*confmap.Conf, error) { +func buildMergedConfig(cfgUpdate configUpdate, agentInfo info.Agent, monitoringConfigGetter translate.BeatMonitoringConfigGetter) (*confmap.Conf, error) { mergedOtelCfg := confmap.New() // Generate component otel config if there are components var componentOtelCfg *confmap.Conf - if len(m.components) > 0 { - model := &component.Model{Components: m.components} + if len(cfgUpdate.components) > 0 { + model := &component.Model{Components: cfgUpdate.components} var err error - componentOtelCfg, err = translate.GetOtelConfig(model, m.agentInfo, m.beatMonitoringConfigGetter) + componentOtelCfg, err = translate.GetOtelConfig(model, agentInfo, monitoringConfigGetter) if err != nil { return nil, fmt.Errorf("failed to generate otel config: %w", err) } } // If both configs are nil, return nil so the manager knows to stop the collector - if componentOtelCfg == nil && m.collectorCfg == nil { + if componentOtelCfg == nil && cfgUpdate.collectorCfg == nil { return nil, nil } @@ -327,8 +325,8 @@ func (m *OTelManager) buildMergedConfig() (*confmap.Conf, error) { } // Merge with base collector config if it exists - if m.collectorCfg != nil { - err := mergedOtelCfg.Merge(m.collectorCfg) + if cfgUpdate.collectorCfg != nil { + err := mergedOtelCfg.Merge(cfgUpdate.collectorCfg) if err != nil { return nil, fmt.Errorf("failed to merge collector otel config: %w", err) } @@ -337,20 +335,6 @@ func (m *OTelManager) buildMergedConfig() (*confmap.Conf, error) { return mergedOtelCfg, nil } -// updateMergedConfig builds the merged configuration for the otel manager by merging the base collector configuration -// with the component configuration, and updates the otel manager with the merged configuration. -func (m *OTelManager) updateMergedConfig() error { - mergedCfg, err := m.buildMergedConfig() - if err != nil { - return err - } - - m.mx.Lock() - defer m.mx.Unlock() - m.mergedCollectorCfg = mergedCfg - return nil -} - func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh chan *status.AggregateStatus, collectorRunErr chan error) error { if m.proc != nil { m.proc.Stop(ctx) diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index 4998c849ae1..215173bbfb3 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -17,7 +17,6 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/otel/translate" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/component/runtime" @@ -723,15 +722,11 @@ func TestOTelManager_buildMergedConfig(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - mgr := &OTelManager{ - logger: newTestLogger(), - collectorCfg: tt.collectorCfg, - components: tt.components, - agentInfo: commonAgentInfo, - beatMonitoringConfigGetter: commonBeatMonitoringConfigGetter, + cfgUpdate := configUpdate{ + collectorCfg: tt.collectorCfg, + components: tt.components, } - - result, err := mgr.buildMergedConfig() + result, err := buildMergedConfig(cfgUpdate, commonAgentInfo, commonBeatMonitoringConfigGetter) if tt.expectedErrorString != "" { assert.Error(t, err) @@ -755,97 +750,6 @@ func TestOTelManager_buildMergedConfig(t *testing.T) { } } -func TestOTelManager_handleConfigUpdate(t *testing.T) { - testComp := testComponent("test-component") - t.Run("successful update with empty collector config and components", func(t *testing.T) { - mgr := &OTelManager{ - logger: newTestLogger(), - agentInfo: &info.AgentInfo{}, - beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, - } - - err := mgr.handleConfigUpdate(nil, nil) - - assert.NoError(t, err) - assert.Nil(t, mgr.components) - // Verify that Update was called with nil config (empty components should result in nil config) - assert.Nil(t, mgr.mergedCollectorCfg) - }) - - t.Run("successful update with components and empty collector config", func(t *testing.T) { - mgr := &OTelManager{ - logger: newTestLogger(), - agentInfo: &info.AgentInfo{}, - beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, - } - - components := []component.Component{testComp} - err := mgr.handleConfigUpdate(nil, components) - - assert.NoError(t, err) - assert.Equal(t, components, mgr.components) - // Verify that Update was called with a valid configuration - assert.NotNil(t, mgr.mergedCollectorCfg) - // Verify that the configuration contains expected OpenTelemetry sections - assert.True(t, mgr.mergedCollectorCfg.IsSet("receivers"), "Expected receivers section in config") - assert.True(t, mgr.mergedCollectorCfg.IsSet("exporters"), "Expected exporters section in config") - assert.True(t, mgr.mergedCollectorCfg.IsSet("service"), "Expected service section in config") - }) - - t.Run("successful update with empty components and collector config", func(t *testing.T) { - mgr := &OTelManager{ - logger: newTestLogger(), - agentInfo: &info.AgentInfo{}, - beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, - } - - collectorConfig := confmap.NewFromStringMap(map[string]any{ - "receivers": map[string]any{ - "nop": map[string]any{}, - }, - "processors": map[string]any{ - "batch": map[string]any{}, - }, - }) - - err := mgr.handleConfigUpdate(collectorConfig, nil) - - assert.NoError(t, err) - assert.Equal(t, collectorConfig, mgr.collectorCfg) - assert.Equal(t, collectorConfig, mgr.MergedOtelConfig()) - // Verify that Update was called with the collector configuration - assert.NotNil(t, mgr.mergedCollectorCfg) - // Verify that the configuration contains expected collector sections - assert.True(t, mgr.mergedCollectorCfg.IsSet("receivers"), "Expected receivers section in config") - assert.True(t, mgr.mergedCollectorCfg.IsSet("processors"), "Expected processors section in config") - }) - - t.Run("successful update with both collector config and components", func(t *testing.T) { - mgr := &OTelManager{ - logger: newTestLogger(), - agentInfo: &info.AgentInfo{}, - beatMonitoringConfigGetter: mockBeatMonitoringConfigGetter, - } - components := []component.Component{testComponent("test-component")} - - collectorConfig := confmap.NewFromStringMap(map[string]any{ - "processors": map[string]any{ - "batch": map[string]any{}, - }, - }) - - err := mgr.handleConfigUpdate(collectorConfig, components) - - assert.NoError(t, err) - assert.Equal(t, collectorConfig, mgr.collectorCfg) - // Verify that the configuration contains both collector and component sections - assert.True(t, mgr.mergedCollectorCfg.IsSet("receivers"), "Expected receivers section from components") - assert.True(t, mgr.mergedCollectorCfg.IsSet("exporters"), "Expected exporters section from components") - assert.True(t, mgr.mergedCollectorCfg.IsSet("service"), "Expected service section from components") - assert.True(t, mgr.mergedCollectorCfg.IsSet("processors"), "Expected processors section from collector config") - }) -} - func TestOTelManager_handleOtelStatusUpdate(t *testing.T) { // Common test component used across test cases testComp := testComponent("test-component") @@ -1263,69 +1167,6 @@ func TestOTelManagerEndToEnd(t *testing.T) { }) } -func testComponent(componentId string) component.Component { - fileStreamConfig := map[string]any{ - "id": "test", - "use_output": "default", - "streams": []any{ - map[string]any{ - "id": "test-1", - "data_stream": map[string]any{ - "dataset": "generic-1", - }, - "paths": []any{ - filepath.Join(paths.TempDir(), "nonexistent.log"), - }, - }, - map[string]any{ - "id": "test-2", - "data_stream": map[string]any{ - "dataset": "generic-2", - }, - "paths": []any{ - filepath.Join(paths.TempDir(), "nonexistent.log"), - }, - }, - }, - } - - esOutputConfig := map[string]any{ - "type": "elasticsearch", - "hosts": []any{"localhost:9200"}, - "username": "elastic", - "password": "password", - "preset": "balanced", - "queue.mem.events": 3200, - } - - return component.Component{ - ID: componentId, - RuntimeManager: component.OtelRuntimeManager, - InputType: "filestream", - OutputType: "elasticsearch", - InputSpec: &component.InputRuntimeSpec{ - BinaryName: "agentbeat", - Spec: component.InputSpec{ - Command: &component.CommandSpec{ - Args: []string{"filebeat"}, - }, - }, - }, - Units: []component.Unit{ - { - ID: "filestream-unit", - Type: client.UnitTypeInput, - Config: component.MustExpectedConfig(fileStreamConfig), - }, - { - ID: "filestream-default", - Type: client.UnitTypeOutput, - Config: component.MustExpectedConfig(esOutputConfig), - }, - }, - } -} - func getFromChannelOrErrorWithContext[T any](t *testing.T, ctx context.Context, ch <-chan T, errCh <-chan error) (T, error) { t.Helper() var result T diff --git a/testing/integration/ess/diagnostics_test.go b/testing/integration/ess/diagnostics_test.go index 6f559ec551b..a72b092184c 100644 --- a/testing/integration/ess/diagnostics_test.go +++ b/testing/integration/ess/diagnostics_test.go @@ -8,6 +8,7 @@ package ess import ( "archive/zip" + "bytes" "context" "io" "io/fs" @@ -16,6 +17,7 @@ import ( "path/filepath" "strings" "testing" + "text/template" "time" "github.com/stretchr/testify/assert" @@ -295,6 +297,91 @@ func TestRedactFleetSecretPathsDiagnostics(t *testing.T) { assert.Equal(t, "", yObj.Inputs[0].CustomAttr) } +func TestBeatDiagnostics(t *testing.T) { + define.Require(t, define.Requirements{ + Group: integration.Default, + Local: false, + }) + + configTemplate := ` +inputs: + - id: filestream-filebeat + type: filestream + paths: + - /var/log/system.log + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + use_output: default + _runtime_experimental: {{ .Runtime }} +outputs: + default: + type: elasticsearch + hosts: [http://localhost:9200] + api_key: placeholder +agent.monitoring.enabled: false +` + + var filebeatSetup = map[string]integrationtest.ComponentState{ + "filestream-default": { + State: integrationtest.NewClientState(client.Healthy), + Units: map[integrationtest.ComponentUnitKey]integrationtest.ComponentUnitState{ + integrationtest.ComponentUnitKey{UnitType: client.UnitTypeOutput, UnitID: "filestream-default"}: { + State: integrationtest.NewClientState(client.Healthy), + }, + integrationtest.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "filestream-filebeat"}: { + State: integrationtest.NewClientState(client.Healthy), + }, + }, + }, + } + f, err := define.NewFixtureFromLocalBuild(t, define.Version(), integrationtest.WithAllowErrors()) + require.NoError(t, err) + + ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(10*time.Minute)) + defer cancel() + err = f.Prepare(ctx) + require.NoError(t, err) + + t.Run("filebeat process", func(t *testing.T) { + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, map[string]any{ + "Runtime": "process", + })) + expectedCompDiagnosticsFiles := append(compDiagnosticsFiles, + "registry.tar.gz", + "input_metrics.json", + "beat_metrics.json", + "beat-rendered-config.yml", + "global_processors.txt", + "filestream-filebeat/error.txt", + "filestream-default/error.txt", + ) + err = f.Run(ctx, integrationtest.State{ + Configure: configBuffer.String(), + AgentState: integrationtest.NewClientState(client.Healthy), + After: testDiagnosticsFactory(t, filebeatSetup, diagnosticsFiles, expectedCompDiagnosticsFiles, f, []string{"diagnostics", "collect"}), + }) + assert.NoError(t, err) + }) + + t.Run("filebeat receiver", func(t *testing.T) { + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, map[string]any{ + "Runtime": "otel", + })) + // currently we don't expect any diagnostics files for beats receivers + var expectedCompDiagnosticsFiles []string + err = f.Run(ctx, integrationtest.State{ + Configure: configBuffer.String(), + AgentState: integrationtest.NewClientState(client.Healthy), + After: testDiagnosticsFactory(t, filebeatSetup, diagnosticsFiles, expectedCompDiagnosticsFiles, f, []string{"diagnostics", "collect"}), + }) + assert.NoError(t, err) + }) +} + func testDiagnosticsFactory(t *testing.T, compSetup map[string]integrationtest.ComponentState, diagFiles []string, diagCompFiles []string, fix *integrationtest.Fixture, cmd []string) func(ctx context.Context) error { return func(ctx context.Context) error { diagZip, err := fix.ExecDiagnostics(ctx, cmd...)