From 5e4a15ea2fb3987a3f6324bc4d6450d3a8970597 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Wed, 25 Jun 2025 17:46:46 +0200 Subject: [PATCH 1/5] Add diagnostics to otel manager # Conflicts: # internal/pkg/agent/application/coordinator/coordinator_test.go # internal/pkg/otel/manager/manager.go --- internal/pkg/otel/manager/diagnostics.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/internal/pkg/otel/manager/diagnostics.go b/internal/pkg/otel/manager/diagnostics.go index 513d784b4e9..ed5c2e12c7c 100644 --- a/internal/pkg/otel/manager/diagnostics.go +++ b/internal/pkg/otel/manager/diagnostics.go @@ -63,6 +63,23 @@ func (m *OTelManager) PerformDiagnostics(ctx context.Context, req ...runtime.Com } } + // we don't do per-unit diagnostics for beats receivers, instead we just return component diagnostics + componentDiagnostics, err := m.PerformComponentDiagnostics(ctx, nil, m.components...) + if err != nil { + m.logger.Errorf("error performing component diagnostics: %v", err) + return nil + } + compIdToDiag := make(map[string]*runtime.ComponentDiagnostic) + for _, diag := range componentDiagnostics { + compIdToDiag[diag.Component.ID] = &diag + } + + for i := range diagnostics { + if compDiag, ok := compIdToDiag[diagnostics[i].Component.ID]; ok { + diagnostics[i].Results = compDiag.Results + } + } + return diagnostics } From fac30863dcc4bb889fb7c5f7f4baf42e20279e50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Mon, 14 Jul 2025 16:50:17 +0200 Subject: [PATCH 2/5] Move helper function to allow diagnostic tests to run on Windows --- internal/pkg/otel/manager/diagnostics.go | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/internal/pkg/otel/manager/diagnostics.go b/internal/pkg/otel/manager/diagnostics.go index ed5c2e12c7c..513d784b4e9 100644 --- a/internal/pkg/otel/manager/diagnostics.go +++ b/internal/pkg/otel/manager/diagnostics.go @@ -63,23 +63,6 @@ func (m *OTelManager) PerformDiagnostics(ctx context.Context, req ...runtime.Com } } - // we don't do per-unit diagnostics for beats receivers, instead we just return component diagnostics - componentDiagnostics, err := m.PerformComponentDiagnostics(ctx, nil, m.components...) - if err != nil { - m.logger.Errorf("error performing component diagnostics: %v", err) - return nil - } - compIdToDiag := make(map[string]*runtime.ComponentDiagnostic) - for _, diag := range componentDiagnostics { - compIdToDiag[diag.Component.ID] = &diag - } - - for i := range diagnostics { - if compDiag, ok := compIdToDiag[diagnostics[i].Component.ID]; ok { - diagnostics[i].Results = compDiag.Results - } - } - return diagnostics } From 534ec995e87d1f91ac0ebd7aa639e626573bf38b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Mon, 14 Jul 2025 16:50:17 +0200 Subject: [PATCH 3/5] Move helper function to allow diagnostic tests to run on Windows --- internal/pkg/otel/manager/common_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/pkg/otel/manager/common_test.go b/internal/pkg/otel/manager/common_test.go index f728cff8f52..d5852ee9d3b 100644 --- a/internal/pkg/otel/manager/common_test.go +++ b/internal/pkg/otel/manager/common_test.go @@ -6,6 +6,9 @@ package manager import ( "errors" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" + "github.com/elastic/elastic-agent/pkg/component" "net" "path/filepath" "testing" From 2c4ddc8f6f694517c20b8fc39facc3349c5bb098 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Mon, 14 Jul 2025 16:15:27 +0200 Subject: [PATCH 4/5] Add filebeat registry to filebeat receiver diagnostic --- internal/pkg/otel/manager/common_test.go | 3 - internal/pkg/otel/manager/diagnostics.go | 203 ++++++++++++++++++ internal/pkg/otel/manager/diagnostics_test.go | 141 ++++++++++++ internal/pkg/otel/translate/otelconfig.go | 19 +- testing/integration/ess/diagnostics_test.go | 2 +- 5 files changed, 356 insertions(+), 12 deletions(-) diff --git a/internal/pkg/otel/manager/common_test.go b/internal/pkg/otel/manager/common_test.go index d5852ee9d3b..f728cff8f52 100644 --- a/internal/pkg/otel/manager/common_test.go +++ b/internal/pkg/otel/manager/common_test.go @@ -6,9 +6,6 @@ package manager import ( "errors" - "github.com/elastic/elastic-agent-client/v7/pkg/client" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" - "github.com/elastic/elastic-agent/pkg/component" "net" "path/filepath" "testing" diff --git a/internal/pkg/otel/manager/diagnostics.go b/internal/pkg/otel/manager/diagnostics.go index 513d784b4e9..042e0d3ea2c 100644 --- a/internal/pkg/otel/manager/diagnostics.go +++ b/internal/pkg/otel/manager/diagnostics.go @@ -5,13 +5,32 @@ package manager import ( + "archive/tar" + "bytes" + "compress/gzip" "context" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "regexp" + "strings" + + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "github.com/elastic/elastic-agent/internal/pkg/otel/translate" + "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/component/runtime" "github.com/elastic/elastic-agent/pkg/control/v2/cproto" ) +var fileBeatRegistryPathRegExps = getRegexpsForRegistryFiles() + // PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then // it performs diagnostics for all current units. If a given unit does not exist in the manager, then a warning // is logged. @@ -98,5 +117,189 @@ func (m *OTelManager) PerformComponentDiagnostics( } } + for idx, diag := range diagnostics { + var results []*proto.ActionDiagnosticUnitResult + if translate.GetBeatNameForComponent(&diag.Component) == "filebeat" { + // include filebeat registry, reimplementation of a filebeat diagnostic hook + registryTarGzBytes, err := FileBeatRegistryTarGz(m.logger, diag.Component.ID) + if err != nil { + m.logger.Warnf("error creating registry tar.gz: %v", err) + continue + } + m.logger.Debugf("created registry tar.gz, size %d", len(registryTarGzBytes)) + results = append(results, &proto.ActionDiagnosticUnitResult{ + Name: "registry", + Description: "Filebeat's registry", + Filename: "registry.tar.gz", + ContentType: "application/octet-stream", + Content: registryTarGzBytes, + Generated: timestamppb.Now(), + }) + } + diagnostics[idx].Results = results + } + return diagnostics, nil } + +func FileBeatRegistryPath(componentID string) string { + dataPath := translate.BeatDataPath(componentID) + return filepath.Join(dataPath, "registry") +} + +// FileBeatRegistryTarGz creates a tar.gz file containing the filebeat registry and returns its contents as bytes. +func FileBeatRegistryTarGz(logger *logger.Logger, componentID string) ([]byte, error) { + registryPath := FileBeatRegistryPath(componentID) + + tempFile, err := os.CreateTemp("", "temp-registry.tar.gz") + if err != nil { + return nil, err + } + + defer func() { + if closeErr := tempFile.Close(); closeErr != nil { + logger.Warn("error closing temporary registry archive", "error", closeErr) + } + if removeErr := os.Remove(tempFile.Name()); removeErr != nil { + logger.Warnf("cannot remove temporary registry archive '%s': '%s'", tempFile.Name(), removeErr) + } + }() + + gzWriter := gzip.NewWriter(tempFile) + defer func() { + if closeErr := gzWriter.Close(); closeErr != nil { + logger.Warnf("error closing gzip writer: %v", closeErr) + } + }() + + err = tarFolder(logger, gzWriter, registryPath) + if err != nil { + return nil, err + } + if closeErr := gzWriter.Close(); closeErr != nil { + return nil, closeErr + } + + stat, err := tempFile.Stat() + if err != nil { + return nil, err + } + + if stat.Size() > 20_000_000 { + return nil, fmt.Errorf("registry is too large for diagnostics, %d > 20mb", stat.Size()/1_000_000) + } + + var output bytes.Buffer + _, err = tempFile.Seek(0, 0) + if err != nil { + return nil, err + } + _, err = io.Copy(&output, tempFile) + if err != nil { + return nil, err + } + + return output.Bytes(), nil +} + +// getRegexpsForRegistryFiles returns a list of regexps to match filebeat registry files. +func getRegexpsForRegistryFiles() []*regexp.Regexp { + var registryFileRegExps []*regexp.Regexp + preFilesList := [][]string{ + {"^registry$"}, + {"^registry", "filebeat$"}, + {"^registry", "filebeat", "meta\\.json$"}, + {"^registry", "filebeat", "log\\.json$"}, + {"^registry", "filebeat", "active\\.dat$"}, + {"^registry", "filebeat", "[[:digit:]]*\\.json$"}, + } + + for _, lst := range preFilesList { + pathRe := filepath.Join(lst...) + re := regexp.MustCompile(pathRe) + registryFileRegExps = append(registryFileRegExps, re) + } + + return registryFileRegExps +} + +// tarFolder creates a tar archive from the folder src and stores it at dst. +// +// dst must be the full path with extension, e.g: /tmp/foo.tar +// If src is not a folder an error is returned +func tarFolder(logger *logger.Logger, dst io.Writer, srcPath string) error { + fullPath, err := filepath.Abs(srcPath) + if err != nil { + return fmt.Errorf("cannot get full path from '%s': '%w'", srcPath, err) + } + + tarWriter := tar.NewWriter(dst) + defer func() { + if err := tarWriter.Close(); err != nil { + logger.Warnf("cannot close tar writer: '%s'", err) + } + }() + + info, err := os.Stat(fullPath) + if err != nil { + return fmt.Errorf("cannot stat '%s': '%w'", fullPath, err) + } + + if !info.IsDir() { + return fmt.Errorf("'%s' is not a directory", fullPath) + } + baseDir := filepath.Base(srcPath) + + logger.Debugf("starting to walk '%s'", fullPath) + + return filepath.Walk(fullPath, func(path string, info fs.FileInfo, prevErr error) error { + // Stop if there is any errors + if prevErr != nil { + return prevErr + } + + pathInTar := filepath.Join(baseDir, strings.TrimPrefix(path, srcPath)) + if !matchRegistryFiles(fileBeatRegistryPathRegExps, pathInTar) { + return nil + } + header, err := tar.FileInfoHeader(info, info.Name()) + if err != nil { + return fmt.Errorf("cannot create tar info header: '%w'", err) + } + header.Name = pathInTar + + if err := tarWriter.WriteHeader(header); err != nil { + return fmt.Errorf("cannot write tar header for '%s': '%w'", path, err) + } + + if info.IsDir() { + return nil + } + + file, err := os.Open(path) + if err != nil { + return fmt.Errorf("cannot open '%s' for reading: '%w", path, err) + } + defer func() { + if closeErr := file.Close(); closeErr != nil { + logger.Warnf("cannot close file '%s': '%s'", path, closeErr) + } + }() + + logger.Debugf("adding '%s' to the tar archive", file.Name()) + if _, err := io.Copy(tarWriter, file); err != nil { + return fmt.Errorf("cannot read '%s': '%w'", path, err) + } + + return nil + }) +} + +func matchRegistryFiles(registryFileRegExps []*regexp.Regexp, path string) bool { + for _, regExp := range registryFileRegExps { + if regExp.MatchString(path) { + return true + } + } + return false +} diff --git a/internal/pkg/otel/manager/diagnostics_test.go b/internal/pkg/otel/manager/diagnostics_test.go index 2dbb52e8432..93b26f2e31f 100644 --- a/internal/pkg/otel/manager/diagnostics_test.go +++ b/internal/pkg/otel/manager/diagnostics_test.go @@ -5,9 +5,19 @@ package manager import ( + "archive/tar" + "bytes" + "compress/gzip" "context" + "crypto/rand" + "io" + "os" + "path/filepath" + "regexp" "testing" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" + "github.com/elastic/elastic-agent/pkg/component/runtime" "github.com/elastic/elastic-agent/pkg/core/logger/loggertest" @@ -99,3 +109,134 @@ func TestPerformDiagnostics(t *testing.T) { assert.Equal(t, expectedDiags, diags) }) } + +func TestMatchRegistryFiles(t *testing.T) { + regexps := getRegexpsForRegistryFiles() + testCases := []struct { + path string + expected bool + }{ + {"registry", true}, + {filepath.Join("registry", "filebeat"), true}, + {filepath.Join("registry", "filebeat", "meta.json"), true}, + {filepath.Join("registry", "filebeat", "log.json"), true}, + {filepath.Join("registry", "filebeat", "active.dat"), true}, + {filepath.Join("registry", "filebeat", "12345.json"), true}, + {filepath.Join("registry", "filebeat", "other.txt"), false}, + {"not_registry", false}, + } + + for _, tc := range testCases { + t.Run(tc.path, func(t *testing.T) { + assert.Equal(t, tc.expected, matchRegistryFiles(regexps, tc.path)) + }) + } +} + +func TestTarFolder(t *testing.T) { + logger, _ := loggertest.New("test") + + // Create a temporary source directory + srcDir, err := os.MkdirTemp("", "src") + require.NoError(t, err) + defer os.RemoveAll(srcDir) + + // Create registry structure + registryDir := filepath.Join(srcDir, "registry") + filebeatDir := filepath.Join(registryDir, "filebeat") + require.NoError(t, os.MkdirAll(filebeatDir, 0755)) + + // Create files + filesToCreate := []string{ + filepath.Join(filebeatDir, "meta.json"), + filepath.Join(filebeatDir, "log.json"), + filepath.Join(filebeatDir, "123.json"), + filepath.Join(filebeatDir, "should_be_ignored.txt"), + } + for _, f := range filesToCreate { + require.NoError(t, os.WriteFile(f, []byte("test data"), 0644)) + } + + // Tar the folder + var buf bytes.Buffer + err = tarFolder(logger, &buf, registryDir) + require.NoError(t, err) + + // Verify the tar contents + tarReader := tar.NewReader(&buf) + foundFiles := make(map[string]bool) + for { + hdr, err := tarReader.Next() + if err == io.EOF { + break + } + require.NoError(t, err) + foundFiles[hdr.Name] = true + } + + assert.True(t, foundFiles[filepath.Join("registry", "filebeat", "meta.json")]) + assert.True(t, foundFiles[filepath.Join("registry", "filebeat", "log.json")]) + assert.True(t, foundFiles[filepath.Join("registry", "filebeat", "123.json")]) + assert.False(t, foundFiles[filepath.Join("registry", "filebeat", "should_be_ignored.txt")]) +} + +func TestFileBeatRegistryPath(t *testing.T) { + compID := "test-component" + expectedPath := filepath.Join(paths.Run(), compID, "registry") + assert.Equal(t, expectedPath, FileBeatRegistryPath(compID)) +} + +func TestFileBeatRegistryTarGz(t *testing.T) { + logger, _ := loggertest.New("test") + compID := "filebeat-comp-1" + + setTemporaryAgentPath(t) + registryPath := FileBeatRegistryPath(compID) + require.NoError(t, os.MkdirAll(filepath.Join(registryPath, "filebeat"), 0755)) + require.NoError(t, os.WriteFile(filepath.Join(registryPath, "filebeat", "meta.json"), []byte("test"), 0644)) + + t.Run("creates a valid tar.gz", func(t *testing.T) { + data, err := FileBeatRegistryTarGz(logger, compID) + require.NoError(t, err) + + gzReader, err := gzip.NewReader(bytes.NewReader(data)) + require.NoError(t, err) + tarReader := tar.NewReader(gzReader) + hdr, err := tarReader.Next() + require.NoError(t, err) + assert.Equal(t, "registry", hdr.Name) + hdr, err = tarReader.Next() + require.NoError(t, err) + assert.Equal(t, filepath.Join("registry", "filebeat"), hdr.Name) + hdr, err = tarReader.Next() + require.NoError(t, err) + assert.Equal(t, filepath.Join("registry", "filebeat", "meta.json"), hdr.Name) + }) + + t.Run("returns error if registry is too large", func(t *testing.T) { + // Temporarily change the regex to include a large file + originalRegexps := fileBeatRegistryPathRegExps + fileBeatRegistryPathRegExps = []*regexp.Regexp{regexp.MustCompile(".*")} + defer func() { fileBeatRegistryPathRegExps = originalRegexps }() + + largeFilePath := filepath.Join(registryPath, "largefile.log") + largeData := make([]byte, 21*1024*1024) // 21MB + _, err := rand.Read(largeData) + require.NoError(t, err) + require.NoError(t, os.WriteFile(largeFilePath, largeData, 0644)) + defer os.Remove(largeFilePath) + + _, err = FileBeatRegistryTarGz(logger, compID) + require.Error(t, err) + assert.Contains(t, err.Error(), "registry is too large for diagnostics") + }) +} + +func setTemporaryAgentPath(t *testing.T) { + topPath := paths.Top() + tempTopPath := t.TempDir() + paths.SetTop(tempTopPath) + t.Cleanup(func() { + paths.SetTop(topPath) + }) +} diff --git a/internal/pkg/otel/translate/otelconfig.go b/internal/pkg/otel/translate/otelconfig.go index bada4abc252..83c4f0a60a2 100644 --- a/internal/pkg/otel/translate/otelconfig.go +++ b/internal/pkg/otel/translate/otelconfig.go @@ -188,8 +188,7 @@ func getReceiversConfigForComponent( // Beat config inside a beat receiver is nested under an additional key. Not sure if this simple translation is // always safe. We should either ensure this is always the case, or have an explicit mapping. beatName := strings.TrimSuffix(receiverType.String(), "receiver") - beatDataPath := filepath.Join(paths.Run(), comp.ID) - binaryName := getBeatNameForComponent(comp) + binaryName := GetBeatNameForComponent(comp) dataset := fmt.Sprintf("elastic_agent.%s", strings.ReplaceAll(strings.ReplaceAll(binaryName, "-", "_"), "/", "_")) receiverConfig := map[string]any{ @@ -199,7 +198,7 @@ func getReceiversConfigForComponent( }, // just like we do for beats processes, each receiver needs its own data path "path": map[string]any{ - "data": beatDataPath, + "data": BeatDataPath(comp.ID), }, // adds additional context on logs emitted by beatreceivers to uniquely identify per component logs "logging": map[string]any{ @@ -264,8 +263,8 @@ func getExportersConfigForComponent(comp *component.Component) (exporterCfg map[ return exportersConfig, queueSettings, nil } -// getBeatNameForComponent returns the beat binary name that would be used to run this component. -func getBeatNameForComponent(comp *component.Component) string { +// GetBeatNameForComponent returns the beat binary name that would be used to run this component. +func GetBeatNameForComponent(comp *component.Component) string { // TODO: Add this information directly to the spec? if comp.InputSpec == nil || comp.InputSpec.BinaryName != "agentbeat" { return "" @@ -276,7 +275,7 @@ func getBeatNameForComponent(comp *component.Component) string { // getSignalForComponent returns the otel signal for the given component. Currently, this is always logs, even for // metricbeat. func getSignalForComponent(comp *component.Component) (pipeline.Signal, error) { - beatName := getBeatNameForComponent(comp) + beatName := GetBeatNameForComponent(comp) switch beatName { case "filebeat", "metricbeat": return pipeline.SignalLogs, nil @@ -287,7 +286,7 @@ func getSignalForComponent(comp *component.Component) (pipeline.Signal, error) { // getReceiverTypeForComponent returns the receiver type for the given component. func getReceiverTypeForComponent(comp *component.Component) (otelcomponent.Type, error) { - beatName := getBeatNameForComponent(comp) + beatName := GetBeatNameForComponent(comp) switch beatName { case "filebeat": return otelcomponent.MustNewType(fbreceiver.Name), nil @@ -384,7 +383,7 @@ func getInputsForUnit(unit component.Unit, info info.Agent, defaultDataStreamTyp // getDefaultDatastreamTypeForComponent returns the default datastream type for a given component. // This is needed to translate from the agent policy config format to the beats config format. func getDefaultDatastreamTypeForComponent(comp *component.Component) (string, error) { - beatName := getBeatNameForComponent(comp) + beatName := GetBeatNameForComponent(comp) switch beatName { case "filebeat": return "logs", nil @@ -410,3 +409,7 @@ func translateEsOutputToExporter(cfg *config.C) (map[string]any, error) { esConfig["mapping"] = map[string]any{"mode": "bodymap"} return esConfig, nil } + +func BeatDataPath(componentId string) string { + return filepath.Join(paths.Run(), componentId) +} diff --git a/testing/integration/ess/diagnostics_test.go b/testing/integration/ess/diagnostics_test.go index a72b092184c..2ca7e66dbcb 100644 --- a/testing/integration/ess/diagnostics_test.go +++ b/testing/integration/ess/diagnostics_test.go @@ -372,7 +372,7 @@ agent.monitoring.enabled: false "Runtime": "otel", })) // currently we don't expect any diagnostics files for beats receivers - var expectedCompDiagnosticsFiles []string + expectedCompDiagnosticsFiles := []string{"registry.tar.gz"} err = f.Run(ctx, integrationtest.State{ Configure: configBuffer.String(), AgentState: integrationtest.NewClientState(client.Healthy), From 2e48fdf22c0964ba6cec997b54824aa169df11fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Wed, 16 Jul 2025 18:11:30 +0200 Subject: [PATCH 5/5] Fix file path regex on Windows --- internal/pkg/otel/manager/diagnostics.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/pkg/otel/manager/diagnostics.go b/internal/pkg/otel/manager/diagnostics.go index 042e0d3ea2c..1251f41988d 100644 --- a/internal/pkg/otel/manager/diagnostics.go +++ b/internal/pkg/otel/manager/diagnostics.go @@ -215,7 +215,9 @@ func getRegexpsForRegistryFiles() []*regexp.Regexp { } for _, lst := range preFilesList { - pathRe := filepath.Join(lst...) + // On windows, we need to ensure we escape the path separator, because backslash has a special meaning + separator := regexp.QuoteMeta(string(filepath.Separator)) + pathRe := strings.Join(lst, separator) re := regexp.MustCompile(pathRe) registryFileRegExps = append(registryFileRegExps, re) }