Skip to content

Commit f374144

Browse files
committed
Add filebeat registry to filebeat receiver diagnostic
1 parent 644ae1e commit f374144

File tree

5 files changed

+356
-12
lines changed

5 files changed

+356
-12
lines changed

internal/pkg/otel/manager/common_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@ package manager
66

77
import (
88
"errors"
9-
"github.com/elastic/elastic-agent-client/v7/pkg/client"
10-
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
11-
"github.com/elastic/elastic-agent/pkg/component"
129
"net"
1310
"path/filepath"
1411
"testing"

internal/pkg/otel/manager/diagnostics.go

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,32 @@
55
package manager
66

77
import (
8+
"archive/tar"
9+
"bytes"
10+
"compress/gzip"
811
"context"
12+
"fmt"
13+
"io"
14+
"io/fs"
15+
"os"
16+
"path/filepath"
17+
"regexp"
18+
"strings"
19+
20+
"google.golang.org/protobuf/types/known/timestamppb"
21+
22+
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
23+
24+
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
25+
"github.com/elastic/elastic-agent/pkg/core/logger"
926

1027
"github.com/elastic/elastic-agent/pkg/component"
1128
"github.com/elastic/elastic-agent/pkg/component/runtime"
1229
"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
1330
)
1431

32+
var fileBeatRegistryPathRegExps = getRegexpsForRegistryFiles()
33+
1534
// PerformDiagnostics executes the diagnostic action for the provided units. If no units are provided then
1635
// it performs diagnostics for all current units. If a given unit does not exist in the manager, then a warning
1736
// is logged.
@@ -98,5 +117,189 @@ func (m *OTelManager) PerformComponentDiagnostics(
98117
}
99118
}
100119

120+
for idx, diag := range diagnostics {
121+
var results []*proto.ActionDiagnosticUnitResult
122+
if translate.GetBeatNameForComponent(&diag.Component) == "filebeat" {
123+
// include filebeat registry, reimplementation of a filebeat diagnostic hook
124+
registryTarGzBytes, err := FileBeatRegistryTarGz(m.logger, diag.Component.ID)
125+
if err != nil {
126+
m.logger.Warnf("error creating registry tar.gz: %v", err)
127+
continue
128+
}
129+
m.logger.Debugf("created registry tar.gz, size %d", len(registryTarGzBytes))
130+
results = append(results, &proto.ActionDiagnosticUnitResult{
131+
Name: "registry",
132+
Description: "Filebeat's registry",
133+
Filename: "registry.tar.gz",
134+
ContentType: "application/octet-stream",
135+
Content: registryTarGzBytes,
136+
Generated: timestamppb.Now(),
137+
})
138+
}
139+
diagnostics[idx].Results = results
140+
}
141+
101142
return diagnostics, nil
102143
}
144+
145+
func FileBeatRegistryPath(componentID string) string {
146+
dataPath := translate.BeatDataPath(componentID)
147+
return filepath.Join(dataPath, "registry")
148+
}
149+
150+
// FileBeatRegistryTarGz creates a tar.gz file containing the filebeat registry and returns its contents as bytes.
151+
func FileBeatRegistryTarGz(logger *logger.Logger, componentID string) ([]byte, error) {
152+
registryPath := FileBeatRegistryPath(componentID)
153+
154+
tempFile, err := os.CreateTemp("", "temp-registry.tar.gz")
155+
if err != nil {
156+
return nil, err
157+
}
158+
159+
defer func() {
160+
if closeErr := tempFile.Close(); closeErr != nil {
161+
logger.Warn("error closing temporary registry archive", "error", closeErr)
162+
}
163+
if removeErr := os.Remove(tempFile.Name()); removeErr != nil {
164+
logger.Warnf("cannot remove temporary registry archive '%s': '%s'", tempFile.Name(), removeErr)
165+
}
166+
}()
167+
168+
gzWriter := gzip.NewWriter(tempFile)
169+
defer func() {
170+
if closeErr := gzWriter.Close(); closeErr != nil {
171+
logger.Warnf("error closing gzip writer: %v", closeErr)
172+
}
173+
}()
174+
175+
err = tarFolder(logger, gzWriter, registryPath)
176+
if err != nil {
177+
return nil, err
178+
}
179+
if closeErr := gzWriter.Close(); closeErr != nil {
180+
return nil, closeErr
181+
}
182+
183+
stat, err := tempFile.Stat()
184+
if err != nil {
185+
return nil, err
186+
}
187+
188+
if stat.Size() > 20_000_000 {
189+
return nil, fmt.Errorf("registry is too large for diagnostics, %d > 20mb", stat.Size()/1_000_000)
190+
}
191+
192+
var output bytes.Buffer
193+
_, err = tempFile.Seek(0, 0)
194+
if err != nil {
195+
return nil, err
196+
}
197+
_, err = io.Copy(&output, tempFile)
198+
if err != nil {
199+
return nil, err
200+
}
201+
202+
return output.Bytes(), nil
203+
}
204+
205+
// getRegexpsForRegistryFiles returns a list of regexps to match filebeat registry files.
206+
func getRegexpsForRegistryFiles() []*regexp.Regexp {
207+
var registryFileRegExps []*regexp.Regexp
208+
preFilesList := [][]string{
209+
{"^registry$"},
210+
{"^registry", "filebeat$"},
211+
{"^registry", "filebeat", "meta\\.json$"},
212+
{"^registry", "filebeat", "log\\.json$"},
213+
{"^registry", "filebeat", "active\\.dat$"},
214+
{"^registry", "filebeat", "[[:digit:]]*\\.json$"},
215+
}
216+
217+
for _, lst := range preFilesList {
218+
pathRe := filepath.Join(lst...)
219+
re := regexp.MustCompile(pathRe)
220+
registryFileRegExps = append(registryFileRegExps, re)
221+
}
222+
223+
return registryFileRegExps
224+
}
225+
226+
// tarFolder creates a tar archive from the folder src and stores it at dst.
227+
//
228+
// dst must be the full path with extension, e.g: /tmp/foo.tar
229+
// If src is not a folder an error is returned
230+
func tarFolder(logger *logger.Logger, dst io.Writer, srcPath string) error {
231+
fullPath, err := filepath.Abs(srcPath)
232+
if err != nil {
233+
return fmt.Errorf("cannot get full path from '%s': '%w'", srcPath, err)
234+
}
235+
236+
tarWriter := tar.NewWriter(dst)
237+
defer func() {
238+
if err := tarWriter.Close(); err != nil {
239+
logger.Warnf("cannot close tar writer: '%s'", err)
240+
}
241+
}()
242+
243+
info, err := os.Stat(fullPath)
244+
if err != nil {
245+
return fmt.Errorf("cannot stat '%s': '%w'", fullPath, err)
246+
}
247+
248+
if !info.IsDir() {
249+
return fmt.Errorf("'%s' is not a directory", fullPath)
250+
}
251+
baseDir := filepath.Base(srcPath)
252+
253+
logger.Debugf("starting to walk '%s'", fullPath)
254+
255+
return filepath.Walk(fullPath, func(path string, info fs.FileInfo, prevErr error) error {
256+
// Stop if there is any errors
257+
if prevErr != nil {
258+
return prevErr
259+
}
260+
261+
pathInTar := filepath.Join(baseDir, strings.TrimPrefix(path, srcPath))
262+
if !matchRegistryFiles(fileBeatRegistryPathRegExps, pathInTar) {
263+
return nil
264+
}
265+
header, err := tar.FileInfoHeader(info, info.Name())
266+
if err != nil {
267+
return fmt.Errorf("cannot create tar info header: '%w'", err)
268+
}
269+
header.Name = pathInTar
270+
271+
if err := tarWriter.WriteHeader(header); err != nil {
272+
return fmt.Errorf("cannot write tar header for '%s': '%w'", path, err)
273+
}
274+
275+
if info.IsDir() {
276+
return nil
277+
}
278+
279+
file, err := os.Open(path)
280+
if err != nil {
281+
return fmt.Errorf("cannot open '%s' for reading: '%w", path, err)
282+
}
283+
defer func() {
284+
if closeErr := file.Close(); closeErr != nil {
285+
logger.Warnf("cannot close file '%s': '%s'", path, closeErr)
286+
}
287+
}()
288+
289+
logger.Debugf("adding '%s' to the tar archive", file.Name())
290+
if _, err := io.Copy(tarWriter, file); err != nil {
291+
return fmt.Errorf("cannot read '%s': '%w'", path, err)
292+
}
293+
294+
return nil
295+
})
296+
}
297+
298+
func matchRegistryFiles(registryFileRegExps []*regexp.Regexp, path string) bool {
299+
for _, regExp := range registryFileRegExps {
300+
if regExp.MatchString(path) {
301+
return true
302+
}
303+
}
304+
return false
305+
}

internal/pkg/otel/manager/diagnostics_test.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,19 @@
55
package manager
66

77
import (
8+
"archive/tar"
9+
"bytes"
10+
"compress/gzip"
811
"context"
12+
"crypto/rand"
13+
"io"
14+
"os"
15+
"path/filepath"
16+
"regexp"
917
"testing"
1018

19+
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
20+
1121
"github.com/elastic/elastic-agent/pkg/component/runtime"
1222
"github.com/elastic/elastic-agent/pkg/core/logger/loggertest"
1323

@@ -99,3 +109,134 @@ func TestPerformDiagnostics(t *testing.T) {
99109
assert.Equal(t, expectedDiags, diags)
100110
})
101111
}
112+
113+
func TestMatchRegistryFiles(t *testing.T) {
114+
regexps := getRegexpsForRegistryFiles()
115+
testCases := []struct {
116+
path string
117+
expected bool
118+
}{
119+
{"registry", true},
120+
{filepath.Join("registry", "filebeat"), true},
121+
{filepath.Join("registry", "filebeat", "meta.json"), true},
122+
{filepath.Join("registry", "filebeat", "log.json"), true},
123+
{filepath.Join("registry", "filebeat", "active.dat"), true},
124+
{filepath.Join("registry", "filebeat", "12345.json"), true},
125+
{filepath.Join("registry", "filebeat", "other.txt"), false},
126+
{"not_registry", false},
127+
}
128+
129+
for _, tc := range testCases {
130+
t.Run(tc.path, func(t *testing.T) {
131+
assert.Equal(t, tc.expected, matchRegistryFiles(regexps, tc.path))
132+
})
133+
}
134+
}
135+
136+
func TestTarFolder(t *testing.T) {
137+
logger, _ := loggertest.New("test")
138+
139+
// Create a temporary source directory
140+
srcDir, err := os.MkdirTemp("", "src")
141+
require.NoError(t, err)
142+
defer os.RemoveAll(srcDir)
143+
144+
// Create registry structure
145+
registryDir := filepath.Join(srcDir, "registry")
146+
filebeatDir := filepath.Join(registryDir, "filebeat")
147+
require.NoError(t, os.MkdirAll(filebeatDir, 0755))
148+
149+
// Create files
150+
filesToCreate := []string{
151+
filepath.Join(filebeatDir, "meta.json"),
152+
filepath.Join(filebeatDir, "log.json"),
153+
filepath.Join(filebeatDir, "123.json"),
154+
filepath.Join(filebeatDir, "should_be_ignored.txt"),
155+
}
156+
for _, f := range filesToCreate {
157+
require.NoError(t, os.WriteFile(f, []byte("test data"), 0644))
158+
}
159+
160+
// Tar the folder
161+
var buf bytes.Buffer
162+
err = tarFolder(logger, &buf, registryDir)
163+
require.NoError(t, err)
164+
165+
// Verify the tar contents
166+
tarReader := tar.NewReader(&buf)
167+
foundFiles := make(map[string]bool)
168+
for {
169+
hdr, err := tarReader.Next()
170+
if err == io.EOF {
171+
break
172+
}
173+
require.NoError(t, err)
174+
foundFiles[hdr.Name] = true
175+
}
176+
177+
assert.True(t, foundFiles[filepath.Join("registry", "filebeat", "meta.json")])
178+
assert.True(t, foundFiles[filepath.Join("registry", "filebeat", "log.json")])
179+
assert.True(t, foundFiles[filepath.Join("registry", "filebeat", "123.json")])
180+
assert.False(t, foundFiles[filepath.Join("registry", "filebeat", "should_be_ignored.txt")])
181+
}
182+
183+
func TestFileBeatRegistryPath(t *testing.T) {
184+
compID := "test-component"
185+
expectedPath := filepath.Join(paths.Run(), compID, "registry")
186+
assert.Equal(t, expectedPath, FileBeatRegistryPath(compID))
187+
}
188+
189+
func TestFileBeatRegistryTarGz(t *testing.T) {
190+
logger, _ := loggertest.New("test")
191+
compID := "filebeat-comp-1"
192+
193+
setTemporaryAgentPath(t)
194+
registryPath := FileBeatRegistryPath(compID)
195+
require.NoError(t, os.MkdirAll(filepath.Join(registryPath, "filebeat"), 0755))
196+
require.NoError(t, os.WriteFile(filepath.Join(registryPath, "filebeat", "meta.json"), []byte("test"), 0644))
197+
198+
t.Run("creates a valid tar.gz", func(t *testing.T) {
199+
data, err := FileBeatRegistryTarGz(logger, compID)
200+
require.NoError(t, err)
201+
202+
gzReader, err := gzip.NewReader(bytes.NewReader(data))
203+
require.NoError(t, err)
204+
tarReader := tar.NewReader(gzReader)
205+
hdr, err := tarReader.Next()
206+
require.NoError(t, err)
207+
assert.Equal(t, "registry", hdr.Name)
208+
hdr, err = tarReader.Next()
209+
require.NoError(t, err)
210+
assert.Equal(t, filepath.Join("registry", "filebeat"), hdr.Name)
211+
hdr, err = tarReader.Next()
212+
require.NoError(t, err)
213+
assert.Equal(t, filepath.Join("registry", "filebeat", "meta.json"), hdr.Name)
214+
})
215+
216+
t.Run("returns error if registry is too large", func(t *testing.T) {
217+
// Temporarily change the regex to include a large file
218+
originalRegexps := fileBeatRegistryPathRegExps
219+
fileBeatRegistryPathRegExps = []*regexp.Regexp{regexp.MustCompile(".*")}
220+
defer func() { fileBeatRegistryPathRegExps = originalRegexps }()
221+
222+
largeFilePath := filepath.Join(registryPath, "largefile.log")
223+
largeData := make([]byte, 21*1024*1024) // 21MB
224+
_, err := rand.Read(largeData)
225+
require.NoError(t, err)
226+
require.NoError(t, os.WriteFile(largeFilePath, largeData, 0644))
227+
defer os.Remove(largeFilePath)
228+
229+
_, err = FileBeatRegistryTarGz(logger, compID)
230+
require.Error(t, err)
231+
assert.Contains(t, err.Error(), "registry is too large for diagnostics")
232+
})
233+
}
234+
235+
func setTemporaryAgentPath(t *testing.T) {
236+
topPath := paths.Top()
237+
tempTopPath := t.TempDir()
238+
paths.SetTop(tempTopPath)
239+
t.Cleanup(func() {
240+
paths.SetTop(topPath)
241+
})
242+
}

0 commit comments

Comments
 (0)