Skip to content

Add filebeat registry to beat receiver diagnostics #9029

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions internal/pkg/otel/manager/diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,32 @@
package manager

import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"regexp"
"strings"

"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
"github.com/elastic/elastic-agent/pkg/core/logger"

"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
)

// fileBeatRegistryPathRegExps holds the compiled patterns produced by
// getRegexpsForRegistryFiles. tarFolder checks every walked path against
// these patterns and only archives the entries that match one of them.
var fileBeatRegistryPathRegExps = getRegexpsForRegistryFiles()

// PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then
// it performs diagnostics for all current units. If a given unit does not exist in the manager, then a warning
// is logged.
Expand Down Expand Up @@ -98,5 +117,191 @@ func (m *OTelManager) PerformComponentDiagnostics(
}
}

for idx, diag := range diagnostics {
var results []*proto.ActionDiagnosticUnitResult
if translate.GetBeatNameForComponent(&diag.Component) == "filebeat" {
// include filebeat registry, reimplementation of a filebeat diagnostic hook
registryTarGzBytes, err := FileBeatRegistryTarGz(m.logger, diag.Component.ID)
if err != nil {
m.logger.Warnf("error creating registry tar.gz: %v", err)
continue
}
m.logger.Debugf("created registry tar.gz, size %d", len(registryTarGzBytes))
results = append(results, &proto.ActionDiagnosticUnitResult{
Name: "registry",
Description: "Filebeat's registry",
Filename: "registry.tar.gz",
ContentType: "application/octet-stream",
Content: registryTarGzBytes,
Generated: timestamppb.Now(),
})
}
diagnostics[idx].Results = results
}

return diagnostics, nil
}

// FileBeatRegistryPath returns the location of the "registry" directory
// inside the data path that translate.BeatDataPath reports for componentID.
func FileBeatRegistryPath(componentID string) string {
	return filepath.Join(translate.BeatDataPath(componentID), "registry")
}

// FileBeatRegistryTarGz creates a tar.gz archive containing the filebeat
// registry of the given component and returns its contents as bytes.
//
// The archive is first written to a temporary file, which is always closed and
// removed before the function returns. An error is returned when the registry
// cannot be archived or when the compressed archive is larger than 20 MB
// (20,000,000 bytes).
func FileBeatRegistryTarGz(logger *logger.Logger, componentID string) ([]byte, error) {
	// Upper bound, in bytes, on the compressed archive accepted for diagnostics.
	const maxArchiveSize = 20_000_000

	registryPath := FileBeatRegistryPath(componentID)

	tempFile, err := os.CreateTemp("", "temp-registry.tar.gz")
	if err != nil {
		return nil, fmt.Errorf("cannot create temporary registry archive: %w", err)
	}
	defer func() {
		if closeErr := tempFile.Close(); closeErr != nil {
			logger.Warnf("error closing temporary registry archive: %v", closeErr)
		}
		if removeErr := os.Remove(tempFile.Name()); removeErr != nil {
			logger.Warnf("cannot remove temporary registry archive '%s': '%s'", tempFile.Name(), removeErr)
		}
	}()

	gzWriter := gzip.NewWriter(tempFile)
	if err := tarFolder(logger, gzWriter, registryPath); err != nil {
		// Best-effort close: the archive is discarded anyway, so a failure
		// here is only worth a warning.
		if closeErr := gzWriter.Close(); closeErr != nil {
			logger.Warnf("error closing gzip writer: %v", closeErr)
		}
		return nil, err
	}
	// Closing flushes the remaining compressed data; it must happen before the
	// file size is checked and the content is read back.
	if err := gzWriter.Close(); err != nil {
		return nil, fmt.Errorf("cannot finalize registry archive: %w", err)
	}

	stat, err := tempFile.Stat()
	if err != nil {
		return nil, fmt.Errorf("cannot stat temporary registry archive: %w", err)
	}
	if stat.Size() > maxArchiveSize {
		return nil, fmt.Errorf("registry is too large for diagnostics, %d bytes > %d bytes", stat.Size(), maxArchiveSize)
	}

	// Rewind so the copy below starts at the beginning of the archive.
	if _, err := tempFile.Seek(0, io.SeekStart); err != nil {
		return nil, fmt.Errorf("cannot rewind temporary registry archive: %w", err)
	}

	var output bytes.Buffer
	// The size is known and bounded, so reserve it up front (plus the slack
	// bytes.Buffer needs to observe EOF) to avoid repeated growth.
	output.Grow(int(stat.Size()) + bytes.MinRead)
	if _, err := io.Copy(&output, tempFile); err != nil {
		return nil, fmt.Errorf("cannot read temporary registry archive: %w", err)
	}

	return output.Bytes(), nil
}

// getRegexpsForRegistryFiles compiles the patterns that identify filebeat
// registry files and directories. Each pattern is built by joining a list of
// path segments with the (regexp-quoted) OS path separator.
func getRegexpsForRegistryFiles() []*regexp.Regexp {
	segmentLists := [][]string{
		{"^registry$"},
		{"^registry", "filebeat$"},
		{"^registry", "filebeat", "meta\\.json$"},
		{"^registry", "filebeat", "log\\.json$"},
		{"^registry", "filebeat", "active\\.dat$"},
		{"^registry", "filebeat", "[[:digit:]]*\\.json$"},
	}

	// The separator has to be quoted: on Windows it is a backslash, which has
	// a special meaning inside a regular expression.
	quotedSeparator := regexp.QuoteMeta(string(filepath.Separator))

	var compiled []*regexp.Regexp
	for i := 0; i < len(segmentLists); i++ {
		pattern := strings.Join(segmentLists[i], quotedSeparator)
		compiled = append(compiled, regexp.MustCompile(pattern))
	}

	return compiled
}

// tarFolder writes a tar archive of the directory srcPath to dst.
//
// Only entries whose path inside the archive matches one of
// fileBeatRegistryPathRegExps are written. Paths inside the archive are rooted
// at the base name of srcPath, e.g. "registry/filebeat/meta.json" when srcPath
// ends in "registry".
//
// An error is returned if srcPath cannot be resolved or is not a directory, if
// any entry cannot be read or written, or if the archive cannot be finalized.
func tarFolder(logger *logger.Logger, dst io.Writer, srcPath string) (retErr error) {
	fullPath, err := filepath.Abs(srcPath)
	if err != nil {
		return fmt.Errorf("cannot get full path from '%s': '%w'", srcPath, err)
	}

	tarWriter := tar.NewWriter(dst)
	defer func() {
		closeErr := tarWriter.Close()
		switch {
		case closeErr == nil:
			// Archive finalized successfully.
		case retErr == nil:
			// Everything else succeeded, so an unfinished archive must not be
			// reported as success.
			retErr = fmt.Errorf("cannot close tar writer: '%w'", closeErr)
		default:
			// An earlier error is already being returned; just record this one.
			logger.Warnf("cannot close tar writer: '%s'", closeErr)
		}
	}()

	info, err := os.Stat(fullPath)
	if err != nil {
		return fmt.Errorf("cannot stat '%s': '%w'", fullPath, err)
	}

	if !info.IsDir() {
		return fmt.Errorf("'%s' is not a directory", fullPath)
	}
	baseDir := filepath.Base(fullPath)

	logger.Debugf("starting to walk '%s'", fullPath)

	return filepath.Walk(fullPath, func(path string, info fs.FileInfo, prevErr error) error {
		// Stop if there is any errors
		if prevErr != nil {
			return prevErr
		}

		// The walk yields paths below the absolute fullPath, so the name in
		// the archive must be computed relative to fullPath; srcPath may be
		// relative or carry a trailing separator and would not be a prefix.
		relPath, err := filepath.Rel(fullPath, path)
		if err != nil {
			return fmt.Errorf("cannot get path of '%s' relative to '%s': '%w'", path, fullPath, err)
		}
		pathInTar := filepath.Join(baseDir, relPath)
		if !matchRegistryFiles(fileBeatRegistryPathRegExps, pathInTar) {
			return nil
		}

		header, err := tar.FileInfoHeader(info, info.Name())
		if err != nil {
			return fmt.Errorf("cannot create tar info header: '%w'", err)
		}
		header.Name = pathInTar

		if err := tarWriter.WriteHeader(header); err != nil {
			return fmt.Errorf("cannot write tar header for '%s': '%w'", path, err)
		}

		// Directories only need their header; there is no content to copy.
		if info.IsDir() {
			return nil
		}

		file, err := os.Open(path)
		if err != nil {
			return fmt.Errorf("cannot open '%s' for reading: '%w'", path, err)
		}
		defer func() {
			if closeErr := file.Close(); closeErr != nil {
				logger.Warnf("cannot close file '%s': '%s'", path, closeErr)
			}
		}()

		logger.Debugf("adding '%s' to the tar archive", file.Name())
		if _, err := io.Copy(tarWriter, file); err != nil {
			return fmt.Errorf("cannot read '%s': '%w'", path, err)
		}

		return nil
	})
}

// matchRegistryFiles reports whether path is matched by at least one of the
// given regular expressions. It returns false when the list is empty.
func matchRegistryFiles(registryFileRegExps []*regexp.Regexp, path string) bool {
	for i := range registryFileRegExps {
		if !registryFileRegExps[i].MatchString(path) {
			continue
		}
		return true
	}
	return false
}
141 changes: 141 additions & 0 deletions internal/pkg/otel/manager/diagnostics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,19 @@
package manager

import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"crypto/rand"
"io"
"os"
"path/filepath"
"regexp"
"testing"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"

"github.com/elastic/elastic-agent/pkg/component/runtime"
"github.com/elastic/elastic-agent/pkg/core/logger/loggertest"

Expand Down Expand Up @@ -99,3 +109,134 @@ func TestPerformDiagnostics(t *testing.T) {
assert.Equal(t, expectedDiags, diags)
})
}

// TestMatchRegistryFiles checks that the patterns returned by
// getRegexpsForRegistryFiles accept the known registry paths and reject
// unrelated ones.
func TestMatchRegistryFiles(t *testing.T) {
	type matchCase struct {
		path     string
		expected bool
	}

	filebeatDir := filepath.Join("registry", "filebeat")
	cases := []matchCase{
		{path: "registry", expected: true},
		{path: filebeatDir, expected: true},
		{path: filepath.Join("registry", "filebeat", "meta.json"), expected: true},
		{path: filepath.Join("registry", "filebeat", "log.json"), expected: true},
		{path: filepath.Join("registry", "filebeat", "active.dat"), expected: true},
		{path: filepath.Join("registry", "filebeat", "12345.json"), expected: true},
		{path: filepath.Join("registry", "filebeat", "other.txt"), expected: false},
		{path: "not_registry", expected: false},
	}

	patterns := getRegexpsForRegistryFiles()
	for i := range cases {
		current := cases[i]
		t.Run(current.path, func(t *testing.T) {
			got := matchRegistryFiles(patterns, current.path)
			assert.Equal(t, current.expected, got)
		})
	}
}

// TestTarFolder archives a registry directory containing both registry files
// and an unrelated file, then verifies that only the registry entries (the
// directories and the matching files) end up in the tar archive.
func TestTarFolder(t *testing.T) {
	logger, _ := loggertest.New("test")

	// t.TempDir is removed automatically when the test finishes, so no manual
	// cleanup is needed.
	srcDir := t.TempDir()

	// Create registry structure
	registryDir := filepath.Join(srcDir, "registry")
	filebeatDir := filepath.Join(registryDir, "filebeat")
	require.NoError(t, os.MkdirAll(filebeatDir, 0755))

	// Create files
	filesToCreate := []string{
		filepath.Join(filebeatDir, "meta.json"),
		filepath.Join(filebeatDir, "log.json"),
		filepath.Join(filebeatDir, "123.json"),
		filepath.Join(filebeatDir, "should_be_ignored.txt"),
	}
	for _, f := range filesToCreate {
		require.NoError(t, os.WriteFile(f, []byte("test data"), 0644))
	}

	// Tar the folder
	var buf bytes.Buffer
	err := tarFolder(logger, &buf, registryDir)
	require.NoError(t, err)

	// Verify the tar contents
	tarReader := tar.NewReader(&buf)
	foundFiles := make(map[string]bool)
	for {
		hdr, err := tarReader.Next()
		if err == io.EOF {
			break
		}
		require.NoError(t, err)
		foundFiles[hdr.Name] = true
	}

	// The directory entries themselves are part of the archive.
	assert.True(t, foundFiles["registry"])
	assert.True(t, foundFiles[filepath.Join("registry", "filebeat")])

	assert.True(t, foundFiles[filepath.Join("registry", "filebeat", "meta.json")])
	assert.True(t, foundFiles[filepath.Join("registry", "filebeat", "log.json")])
	assert.True(t, foundFiles[filepath.Join("registry", "filebeat", "123.json")])
	assert.False(t, foundFiles[filepath.Join("registry", "filebeat", "should_be_ignored.txt")])
}

// TestFileBeatRegistryPath verifies that the registry path of a component is
// the "registry" directory below paths.Run()/<component ID>.
func TestFileBeatRegistryPath(t *testing.T) {
	const componentID = "test-component"

	want := filepath.Join(paths.Run(), componentID, "registry")
	got := FileBeatRegistryPath(componentID)

	assert.Equal(t, want, got)
}

// TestFileBeatRegistryTarGz exercises FileBeatRegistryTarGz against a registry
// created below a temporary agent path: once for a small registry that must
// produce a readable tar.gz, and once for an oversized one that must be
// rejected.
func TestFileBeatRegistryTarGz(t *testing.T) {
	const compID = "filebeat-comp-1"
	testLogger, _ := loggertest.New("test")

	setTemporaryAgentPath(t)
	registryPath := FileBeatRegistryPath(compID)
	filebeatPath := filepath.Join(registryPath, "filebeat")
	require.NoError(t, os.MkdirAll(filebeatPath, 0755))
	require.NoError(t, os.WriteFile(filepath.Join(registryPath, "filebeat", "meta.json"), []byte("test"), 0644))

	t.Run("creates a valid tar.gz", func(t *testing.T) {
		archive, err := FileBeatRegistryTarGz(testLogger, compID)
		require.NoError(t, err)

		gzReader, err := gzip.NewReader(bytes.NewReader(archive))
		require.NoError(t, err)
		tarReader := tar.NewReader(gzReader)

		// Entries are expected in walk order: the registry directory, the
		// filebeat directory, then the meta file.
		wantNames := []string{
			"registry",
			filepath.Join("registry", "filebeat"),
			filepath.Join("registry", "filebeat", "meta.json"),
		}
		for _, want := range wantNames {
			hdr, err := tarReader.Next()
			require.NoError(t, err)
			assert.Equal(t, want, hdr.Name)
		}
	})

	t.Run("returns error if registry is too large", func(t *testing.T) {
		// Temporarily accept every path so the large file below is archived.
		savedRegexps := fileBeatRegistryPathRegExps
		fileBeatRegistryPathRegExps = []*regexp.Regexp{regexp.MustCompile(".*")}
		defer func() { fileBeatRegistryPathRegExps = savedRegexps }()

		// 21MB of random data
		payload := make([]byte, 21*1024*1024)
		_, err := rand.Read(payload)
		require.NoError(t, err)

		largeFilePath := filepath.Join(registryPath, "largefile.log")
		require.NoError(t, os.WriteFile(largeFilePath, payload, 0644))
		defer os.Remove(largeFilePath)

		_, err = FileBeatRegistryTarGz(testLogger, compID)
		require.Error(t, err)
		assert.Contains(t, err.Error(), "registry is too large for diagnostics")
	})
}

// setTemporaryAgentPath points the agent's top path at a fresh temporary
// directory for the duration of the test and restores the previous value when
// the test completes.
func setTemporaryAgentPath(t *testing.T) {
	previousTop := paths.Top()
	paths.SetTop(t.TempDir())
	t.Cleanup(func() { paths.SetTop(previousTop) })
}
Loading
Loading