diff --git a/internal/pkg/otel/manager/diagnostics.go b/internal/pkg/otel/manager/diagnostics.go index 513d784b4e9..1251f41988d 100644 --- a/internal/pkg/otel/manager/diagnostics.go +++ b/internal/pkg/otel/manager/diagnostics.go @@ -5,13 +5,32 @@ package manager import ( + "archive/tar" + "bytes" + "compress/gzip" "context" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "regexp" + "strings" + + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "github.com/elastic/elastic-agent/internal/pkg/otel/translate" + "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/component/runtime" "github.com/elastic/elastic-agent/pkg/control/v2/cproto" ) +var fileBeatRegistryPathRegExps = getRegexpsForRegistryFiles() + // PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then // it performs diagnostics for all current units. If a given unit does not exist in the manager, then a warning // is logged. @@ -98,5 +117,191 @@ func (m *OTelManager) PerformComponentDiagnostics( } } + for idx, diag := range diagnostics { + var results []*proto.ActionDiagnosticUnitResult + if translate.GetBeatNameForComponent(&diag.Component) == "filebeat" { + // include filebeat registry, reimplementation of a filebeat diagnostic hook + registryTarGzBytes, err := FileBeatRegistryTarGz(m.logger, diag.Component.ID) + if err != nil { + m.logger.Warnf("error creating registry tar.gz: %v", err) + continue + } + m.logger.Debugf("created registry tar.gz, size %d", len(registryTarGzBytes)) + results = append(results, &proto.ActionDiagnosticUnitResult{ + Name: "registry", + Description: "Filebeat's registry", + Filename: "registry.tar.gz", + ContentType: "application/octet-stream", + Content: registryTarGzBytes, + Generated: timestamppb.Now(), + }) + } + diagnostics[idx].Results = results + } + return diagnostics, nil } + +func FileBeatRegistryPath(componentID string) string { + dataPath := translate.BeatDataPath(componentID) + return filepath.Join(dataPath, "registry") +} + +// FileBeatRegistryTarGz creates a tar.gz file containing the filebeat registry and returns its contents as bytes. +func FileBeatRegistryTarGz(logger *logger.Logger, componentID string) ([]byte, error) { + registryPath := FileBeatRegistryPath(componentID) + + tempFile, err := os.CreateTemp("", "temp-registry.tar.gz") + if err != nil { + return nil, err + } + + defer func() { + if closeErr := tempFile.Close(); closeErr != nil { + logger.Warn("error closing temporary registry archive", "error", closeErr) + } + if removeErr := os.Remove(tempFile.Name()); removeErr != nil { + logger.Warnf("cannot remove temporary registry archive '%s': '%s'", tempFile.Name(), removeErr) + } + }() + + gzWriter := gzip.NewWriter(tempFile) + defer func() { + if closeErr := gzWriter.Close(); closeErr != nil { + logger.Warnf("error closing gzip writer: %v", closeErr) + } + }() + + err = tarFolder(logger, gzWriter, registryPath) + if err != nil { + return nil, err + } + if closeErr := gzWriter.Close(); closeErr != nil { + return nil, closeErr + } + + stat, err := tempFile.Stat() + if err != nil { + return nil, err + } + + if stat.Size() > 20_000_000 { + return nil, fmt.Errorf("registry is too large for diagnostics, %d > 20mb", stat.Size()/1_000_000) + } + + var output bytes.Buffer + _, err = tempFile.Seek(0, 0) + if err != nil { + return nil, err + } + _, err = io.Copy(&output, tempFile) + if err != nil { + return nil, err + } + + return output.Bytes(), nil +} + +// getRegexpsForRegistryFiles returns a list of regexps to match filebeat registry files. +func getRegexpsForRegistryFiles() []*regexp.Regexp { + var registryFileRegExps []*regexp.Regexp + preFilesList := [][]string{ + {"^registry$"}, + {"^registry", "filebeat$"}, + {"^registry", "filebeat", "meta\\.json$"}, + {"^registry", "filebeat", "log\\.json$"}, + {"^registry", "filebeat", "active\\.dat$"}, + {"^registry", "filebeat", "[[:digit:]]*\\.json$"}, + } + + for _, lst := range preFilesList { + // On windows, we need to ensure we escape the path separator, because backslash has a special meaning + separator := regexp.QuoteMeta(string(filepath.Separator)) + pathRe := strings.Join(lst, separator) + re := regexp.MustCompile(pathRe) + registryFileRegExps = append(registryFileRegExps, re) + } + + return registryFileRegExps +} + +// tarFolder creates a tar archive from the folder src and stores it at dst. +// +// dst must be the full path with extension, e.g: /tmp/foo.tar +// If src is not a folder an error is returned +func tarFolder(logger *logger.Logger, dst io.Writer, srcPath string) error { + fullPath, err := filepath.Abs(srcPath) + if err != nil { + return fmt.Errorf("cannot get full path from '%s': '%w'", srcPath, err) + } + + tarWriter := tar.NewWriter(dst) + defer func() { + if err := tarWriter.Close(); err != nil { + logger.Warnf("cannot close tar writer: '%s'", err) + } + }() + + info, err := os.Stat(fullPath) + if err != nil { + return fmt.Errorf("cannot stat '%s': '%w'", fullPath, err) + } + + if !info.IsDir() { + return fmt.Errorf("'%s' is not a directory", fullPath) + } + baseDir := filepath.Base(srcPath) + + logger.Debugf("starting to walk '%s'", fullPath) + + return filepath.Walk(fullPath, func(path string, info fs.FileInfo, prevErr error) error { + // Stop if there is any errors + if prevErr != nil { + return prevErr + } + + pathInTar := filepath.Join(baseDir, strings.TrimPrefix(path, srcPath)) + if !matchRegistryFiles(fileBeatRegistryPathRegExps, pathInTar) { + return nil + } + header, err := tar.FileInfoHeader(info, info.Name()) + if err != nil { + return fmt.Errorf("cannot create tar info header: '%w'", err) + } + header.Name = pathInTar + + if err := tarWriter.WriteHeader(header); err != nil { + return fmt.Errorf("cannot write tar header for '%s': '%w'", path, err) + } + + if info.IsDir() { + return nil + } + + file, err := os.Open(path) + if err != nil { + return fmt.Errorf("cannot open '%s' for reading: '%w", path, err) + } + defer func() { + if closeErr := file.Close(); closeErr != nil { + logger.Warnf("cannot close file '%s': '%s'", path, closeErr) + } + }() + + logger.Debugf("adding '%s' to the tar archive", file.Name()) + if _, err := io.Copy(tarWriter, file); err != nil { + return fmt.Errorf("cannot read '%s': '%w'", path, err) + } + + return nil + }) +} + +func matchRegistryFiles(registryFileRegExps []*regexp.Regexp, path string) bool { + for _, regExp := range registryFileRegExps { + if regExp.MatchString(path) { + return true + } + } + return false +} diff --git a/internal/pkg/otel/manager/diagnostics_test.go b/internal/pkg/otel/manager/diagnostics_test.go index 2dbb52e8432..93b26f2e31f 100644 --- a/internal/pkg/otel/manager/diagnostics_test.go +++ b/internal/pkg/otel/manager/diagnostics_test.go @@ -5,9 +5,19 @@ package manager import ( + "archive/tar" + "bytes" + "compress/gzip" "context" + "crypto/rand" + "io" + "os" + "path/filepath" + "regexp" "testing" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" + "github.com/elastic/elastic-agent/pkg/component/runtime" "github.com/elastic/elastic-agent/pkg/core/logger/loggertest" @@ -99,3 +109,134 @@ func TestPerformDiagnostics(t *testing.T) { assert.Equal(t, expectedDiags, diags) }) } + +func TestMatchRegistryFiles(t *testing.T) { + regexps := getRegexpsForRegistryFiles() + testCases := []struct { + path string + expected bool + }{ + {"registry", true}, + {filepath.Join("registry", "filebeat"), true}, + {filepath.Join("registry", "filebeat", "meta.json"), true}, + {filepath.Join("registry", "filebeat", "log.json"), true}, + {filepath.Join("registry", "filebeat", "active.dat"), true}, + {filepath.Join("registry", "filebeat", "12345.json"), true}, + {filepath.Join("registry", "filebeat", "other.txt"), false}, + {"not_registry", false}, + } + + for _, tc := range testCases { + t.Run(tc.path, func(t *testing.T) { + assert.Equal(t, tc.expected, matchRegistryFiles(regexps, tc.path)) + }) + } +} + +func TestTarFolder(t *testing.T) { + logger, _ := loggertest.New("test") + + // Create a temporary source directory + srcDir, err := os.MkdirTemp("", "src") + require.NoError(t, err) + defer os.RemoveAll(srcDir) + + // Create registry structure + registryDir := filepath.Join(srcDir, "registry") + filebeatDir := filepath.Join(registryDir, "filebeat") + require.NoError(t, os.MkdirAll(filebeatDir, 0755)) + + // Create files + filesToCreate := []string{ + filepath.Join(filebeatDir, "meta.json"), + filepath.Join(filebeatDir, "log.json"), + filepath.Join(filebeatDir, "123.json"), + filepath.Join(filebeatDir, "should_be_ignored.txt"), + } + for _, f := range filesToCreate { + require.NoError(t, os.WriteFile(f, []byte("test data"), 0644)) + } + + // Tar the folder + var buf bytes.Buffer + err = tarFolder(logger, &buf, registryDir) + require.NoError(t, err) + + // Verify the tar contents + tarReader := tar.NewReader(&buf) + foundFiles := make(map[string]bool) + for { + hdr, err := tarReader.Next() + if err == io.EOF { + break + } + require.NoError(t, err) + foundFiles[hdr.Name] = true + } + + assert.True(t, foundFiles[filepath.Join("registry", "filebeat", "meta.json")]) + assert.True(t, foundFiles[filepath.Join("registry", "filebeat", "log.json")]) + assert.True(t, foundFiles[filepath.Join("registry", "filebeat", "123.json")]) + assert.False(t, foundFiles[filepath.Join("registry", "filebeat", "should_be_ignored.txt")]) +} + +func TestFileBeatRegistryPath(t *testing.T) { + compID := "test-component" + expectedPath := filepath.Join(paths.Run(), compID, "registry") + assert.Equal(t, expectedPath, FileBeatRegistryPath(compID)) +} + +func TestFileBeatRegistryTarGz(t *testing.T) { + logger, _ := loggertest.New("test") + compID := "filebeat-comp-1" + + setTemporaryAgentPath(t) + registryPath := FileBeatRegistryPath(compID) + require.NoError(t, os.MkdirAll(filepath.Join(registryPath, "filebeat"), 0755)) + require.NoError(t, os.WriteFile(filepath.Join(registryPath, "filebeat", "meta.json"), []byte("test"), 0644)) + + t.Run("creates a valid tar.gz", func(t *testing.T) { + data, err := FileBeatRegistryTarGz(logger, compID) + require.NoError(t, err) + + gzReader, err := gzip.NewReader(bytes.NewReader(data)) + require.NoError(t, err) + tarReader := tar.NewReader(gzReader) + hdr, err := tarReader.Next() + require.NoError(t, err) + assert.Equal(t, "registry", hdr.Name) + hdr, err = tarReader.Next() + require.NoError(t, err) + assert.Equal(t, filepath.Join("registry", "filebeat"), hdr.Name) + hdr, err = tarReader.Next() + require.NoError(t, err) + assert.Equal(t, filepath.Join("registry", "filebeat", "meta.json"), hdr.Name) + }) + + t.Run("returns error if registry is too large", func(t *testing.T) { + // Temporarily change the regex to include a large file + originalRegexps := fileBeatRegistryPathRegExps + fileBeatRegistryPathRegExps = []*regexp.Regexp{regexp.MustCompile(".*")} + defer func() { fileBeatRegistryPathRegExps = originalRegexps }() + + largeFilePath := filepath.Join(registryPath, "largefile.log") + largeData := make([]byte, 21*1024*1024) // 21MB + _, err := rand.Read(largeData) + require.NoError(t, err) + require.NoError(t, os.WriteFile(largeFilePath, largeData, 0644)) + defer os.Remove(largeFilePath) + + _, err = FileBeatRegistryTarGz(logger, compID) + require.Error(t, err) + assert.Contains(t, err.Error(), "registry is too large for diagnostics") + }) +} + +func setTemporaryAgentPath(t *testing.T) { + topPath := paths.Top() + tempTopPath := t.TempDir() + paths.SetTop(tempTopPath) + t.Cleanup(func() { + paths.SetTop(topPath) + }) +} diff --git a/internal/pkg/otel/translate/otelconfig.go b/internal/pkg/otel/translate/otelconfig.go index b0a1b46dcc2..d9ceec48bc0 100644 --- a/internal/pkg/otel/translate/otelconfig.go +++ b/internal/pkg/otel/translate/otelconfig.go @@ -192,8 +192,7 @@ func getReceiversConfigForComponent( // Beat config inside a beat receiver is nested under an additional key. Not sure if this simple translation is // always safe. We should either ensure this is always the case, or have an explicit mapping. beatName := strings.TrimSuffix(receiverType.String(), "receiver") - beatDataPath := filepath.Join(paths.Run(), comp.ID) - binaryName := getBeatNameForComponent(comp) + binaryName := GetBeatNameForComponent(comp) dataset := fmt.Sprintf("elastic_agent.%s", strings.ReplaceAll(strings.ReplaceAll(binaryName, "-", "_"), "/", "_")) receiverConfig := map[string]any{ @@ -203,7 +202,7 @@ func getReceiversConfigForComponent( }, // just like we do for beats processes, each receiver needs its own data path "path": map[string]any{ - "data": beatDataPath, + "data": BeatDataPath(comp.ID), }, // adds additional context on logs emitted by beatreceivers to uniquely identify per component logs "logging": map[string]any{ @@ -279,8 +278,8 @@ func getExportersConfigForComponent(comp *component.Component) (exporterCfg map[ return exportersConfig, queueSettings, nil } -// getBeatNameForComponent returns the beat binary name that would be used to run this component. -func getBeatNameForComponent(comp *component.Component) string { +// GetBeatNameForComponent returns the beat binary name that would be used to run this component. +func GetBeatNameForComponent(comp *component.Component) string { // TODO: Add this information directly to the spec? if comp.InputSpec == nil || comp.InputSpec.BinaryName != "agentbeat" { return "" @@ -291,7 +290,7 @@ func getBeatNameForComponent(comp *component.Component) string { // getSignalForComponent returns the otel signal for the given component. Currently, this is always logs, even for // metricbeat. func getSignalForComponent(comp *component.Component) (pipeline.Signal, error) { - beatName := getBeatNameForComponent(comp) + beatName := GetBeatNameForComponent(comp) switch beatName { case "filebeat", "metricbeat": return pipeline.SignalLogs, nil @@ -302,7 +301,7 @@ func getSignalForComponent(comp *component.Component) (pipeline.Signal, error) { // getReceiverTypeForComponent returns the receiver type for the given component. func getReceiverTypeForComponent(comp *component.Component) (otelcomponent.Type, error) { - beatName := getBeatNameForComponent(comp) + beatName := GetBeatNameForComponent(comp) switch beatName { case "filebeat": return otelcomponent.MustNewType(fbreceiver.Name), nil @@ -399,7 +398,7 @@ func getInputsForUnit(unit component.Unit, info info.Agent, defaultDataStreamTyp // getDefaultDatastreamTypeForComponent returns the default datastream type for a given component. // This is needed to translate from the agent policy config format to the beats config format. func getDefaultDatastreamTypeForComponent(comp *component.Component) (string, error) { - beatName := getBeatNameForComponent(comp) + beatName := GetBeatNameForComponent(comp) switch beatName { case "filebeat": return "logs", nil @@ -426,3 +425,7 @@ func translateEsOutputToExporter(cfg *config.C) (map[string]any, error) { esConfig["mapping"] = map[string]any{"mode": "bodymap"} return esConfig, nil } + +func BeatDataPath(componentId string) string { + return filepath.Join(paths.Run(), componentId) +} diff --git a/testing/integration/ess/diagnostics_test.go b/testing/integration/ess/diagnostics_test.go index eab26e2207b..87188a556d9 100644 --- a/testing/integration/ess/diagnostics_test.go +++ b/testing/integration/ess/diagnostics_test.go @@ -308,7 +308,7 @@ inputs: - id: filestream-filebeat type: filestream paths: - - /var/log/system.log + - {{ .InputFile }} prospector.scanner.fingerprint.enabled: false file_identity.native: ~ use_output: default @@ -324,62 +324,72 @@ agent.monitoring.enabled: false var filebeatSetup = map[string]integrationtest.ComponentState{ "filestream-default": { State: integrationtest.NewClientState(client.Healthy), - Units: map[integrationtest.ComponentUnitKey]integrationtest.ComponentUnitState{ - integrationtest.ComponentUnitKey{UnitType: client.UnitTypeOutput, UnitID: "filestream-default"}: { - State: integrationtest.NewClientState(client.Healthy), - }, - integrationtest.ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "filestream-filebeat"}: { - State: integrationtest.NewClientState(client.Healthy), - }, - }, }, } - f, err := define.NewFixtureFromLocalBuild(t, define.Version(), integrationtest.WithAllowErrors()) - require.NoError(t, err) ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(10*time.Minute)) defer cancel() - err = f.Prepare(ctx) - require.NoError(t, err) - t.Run("filebeat process", func(t *testing.T) { - var configBuffer bytes.Buffer - require.NoError(t, - template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, map[string]any{ - "Runtime": "process", - })) - expectedCompDiagnosticsFiles := append(compDiagnosticsFiles, - "registry.tar.gz", - "input_metrics.json", - "beat_metrics.json", - "beat-rendered-config.yml", - "global_processors.txt", - "filestream-filebeat/error.txt", - "filestream-default/error.txt", - ) - err = f.Run(ctx, integrationtest.State{ - Configure: configBuffer.String(), - AgentState: integrationtest.NewClientState(client.Healthy), - After: testDiagnosticsFactory(t, filebeatSetup, diagnosticsFiles, expectedCompDiagnosticsFiles, f, []string{"diagnostics", "collect"}), - }) - assert.NoError(t, err) - }) + testCases := []struct { + name string + runtime string + expectedCompDiagnosticsFiles []string + expectedAgentState *client.State + }{ + { + name: "filebeat process", + runtime: "process", + expectedCompDiagnosticsFiles: append(compDiagnosticsFiles, + "registry.tar.gz", + "input_metrics.json", + "beat_metrics.json", + "beat-rendered-config.yml", + "global_processors.txt", + "filestream-filebeat/error.txt", + "filestream-default/error.txt", + ), + expectedAgentState: integrationtest.NewClientState(client.Healthy), + }, + { + name: "filebeat container", + runtime: "otel", + expectedCompDiagnosticsFiles: []string{"registry.tar.gz"}, + expectedAgentState: integrationtest.NewClientState(client.Degraded), + }, + } - t.Run("filebeat receiver", func(t *testing.T) { - var configBuffer bytes.Buffer - require.NoError(t, - template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, map[string]any{ - "Runtime": "otel", - })) - // currently we don't expect any diagnostics files for beats receivers - var expectedCompDiagnosticsFiles []string - err = f.Run(ctx, integrationtest.State{ - Configure: configBuffer.String(), - AgentState: integrationtest.NewClientState(client.Healthy), - After: testDiagnosticsFactory(t, filebeatSetup, diagnosticsFiles, expectedCompDiagnosticsFiles, f, []string{"diagnostics", "collect"}), + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create the fixture + f, err := define.NewFixtureFromLocalBuild(t, define.Version(), integrationtest.WithAllowErrors()) + require.NoError(t, err) + err = f.Prepare(ctx) + require.NoError(t, err) + + // Create the data file to ingest + inputFile, err := os.CreateTemp(t.TempDir(), "input.txt") + require.NoError(t, err, "failed to create temp file to hold data to ingest") + t.Cleanup(func() { + cErr := inputFile.Close() + assert.NoError(t, cErr) + }) + _, err = inputFile.WriteString("hello world\n") + require.NoError(t, err, "failed to write data to temp file") + + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, map[string]any{ + "Runtime": tc.runtime, + "InputFile": inputFile.Name(), + })) + err = f.Run(ctx, integrationtest.State{ + Configure: configBuffer.String(), + AgentState: tc.expectedAgentState, + After: testDiagnosticsFactory(t, filebeatSetup, diagnosticsFiles, tc.expectedCompDiagnosticsFiles, f, []string{"diagnostics", "collect"}), + }) + assert.NoError(t, err) }) - assert.NoError(t, err) - }) + } } func testDiagnosticsFactory(t *testing.T, compSetup map[string]integrationtest.ComponentState, diagFiles []string, diagCompFiles []string, fix *integrationtest.Fixture, cmd []string) func(ctx context.Context) error { @@ -392,6 +402,11 @@ func testDiagnosticsFactory(t *testing.T, compSetup map[string]integrationtest.C verifyDiagnosticArchive(t, compSetup, diagZip, diagFiles, diagCompFiles, avi) + // preserve the diagnostic archive if the test failed + if t.Failed() { + fix.MoveToDiagnosticsDir(diagZip) + } + return nil } }