diff --git a/internal/logscommon/const.go b/internal/logscommon/const.go
index fec0aac015..926db0f255 100644
--- a/internal/logscommon/const.go
+++ b/internal/logscommon/const.go
@@ -34,4 +34,6 @@ const (
 	LogType                = "log_type"
 	LogBackpressureModeKey = "backpressure_mode"
+
+	RetentionPolicyTTLFileName = "Amazon_CloudWatch_RetentionPolicyTTL"
 )
diff --git a/plugins/inputs/logfile/logfile.go b/plugins/inputs/logfile/logfile.go
index b1f2a7bb1e..656d8929b1 100644
--- a/plugins/inputs/logfile/logfile.go
+++ b/plugins/inputs/logfile/logfile.go
@@ -373,7 +373,7 @@ func (t *LogFile) cleanupStateFolder() {
 			continue
 		}
 
-		if strings.Contains(file, logscommon.WindowsEventLogPrefix) {
+		if strings.Contains(file, logscommon.WindowsEventLogPrefix) || strings.Contains(file, logscommon.RetentionPolicyTTLFileName) {
 			continue
 		}
diff --git a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
index 59c03d3c7a..6ac9c66b7c 100644
--- a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
+++ b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
@@ -63,8 +63,9 @@ type CloudWatchLogs struct {
 	Token string `toml:"token"`
 
 	//log group and stream names
-	LogStreamName string `toml:"log_stream_name"`
-	LogGroupName  string `toml:"log_group_name"`
+	LogStreamName   string `toml:"log_stream_name"`
+	LogGroupName    string `toml:"log_group_name"`
+	FileStateFolder string `toml:"file_state_folder"`
 
 	// Retention for log group
 	RetentionInDays int `toml:"retention_in_days"`
@@ -97,6 +98,9 @@ func (c *CloudWatchLogs) Close() error {
 	if c.workerPool != nil {
 		c.workerPool.Stop()
 	}
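+	// Stopping the target manager persists the retention policy TTL state to disk.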
+	if c.targetManager != nil {
+		c.targetManager.Stop()
+	}
 	return nil
 }
 
@@ -144,7 +148,7 @@ func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest {
 		if c.Concurrency > 0 {
 			c.workerPool = pusher.NewWorkerPool(c.Concurrency)
 		}
-		c.targetManager = pusher.NewTargetManager(c.Log, client)
+		c.targetManager = pusher.NewTargetManager(c.Log, client, c.FileStateFolder)
 	})
 	p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, c.pusherStopChan, &c.pusherWaitGroup)
 	cwd := &cwDest{pusher: p, retryer: logThrottleRetryer}
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go
index d12fe0f48d..3e4dabc036 100644
--- a/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go
@@ -6,6 +6,8 @@ package pusher
 import (
 	"errors"
 	"fmt"
+	"os"
+	"path/filepath"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -17,6 +19,7 @@ import (
 	"github.com/influxdata/telegraf"
 	"github.com/stretchr/testify/require"
 
+	"github.com/aws/amazon-cloudwatch-agent/internal/logscommon"
 	"github.com/aws/amazon-cloudwatch-agent/logs"
 	"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
 	"github.com/aws/amazon-cloudwatch-agent/tool/testutil"
@@ -674,7 +677,10 @@ func testPreparationWithLogger(
 ) (chan struct{}, *queue) {
 	t.Helper()
 	stop := make(chan struct{})
-	tm := NewTargetManager(logger, service)
+	tempDir := t.TempDir()
+	file, _ := os.Create(filepath.Join(tempDir, logscommon.RetentionPolicyTTLFileName))
+	file.Close()
+	tm := NewTargetManager(logger, service, tempDir)
 	s := newSender(logger, service, tm, retryDuration, stop)
 	q := newQueue(
 		logger,
diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/retention_policy_ttl.go b/plugins/outputs/cloudwatchlogs/internal/pusher/retention_policy_ttl.go
new file mode 100644
index 0000000000..3cc4aa6dd0
--- /dev/null
+++ b/plugins/outputs/cloudwatchlogs/internal/pusher/retention_policy_ttl.go
@@ -0,0 +1,172 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package pusher
+
+import (
+	"bufio"
+	"bytes"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/influxdata/telegraf"
+
+	"github.com/aws/amazon-cloudwatch-agent/internal/logscommon"
+)
+
+const (
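+	// ttlTime is how long a confirmed retention policy is trusted before the
+	// agent checks it against the CloudWatch Logs API again.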
+	ttlTime = 5 * time.Minute
+)
+
+type payload struct {
+	group     string
+	timestamp time.Time
+}
+
+type RetentionPolicyTTL struct {
+	logger        telegraf.Logger
+	stateFilePath string
+	// oldTimestamps come from the TTL file on agent start. Key is the escaped group name.
+	oldTimestamps map[string]time.Time
+	// newTimestamps are the new TTLs that are saved periodically and when the agent shuts down. Key is the escaped group name.
+	newTimestamps map[string]time.Time
+	mu            sync.RWMutex
+	ch            chan payload
+	done          chan struct{}
+}
+
+func NewRetentionPolicyTTL(logger telegraf.Logger, fileStatePath string) *RetentionPolicyTTL {
+	r := &RetentionPolicyTTL{
+		logger:        logger,
+		stateFilePath: filepath.Join(fileStatePath, logscommon.RetentionPolicyTTLFileName),
+		oldTimestamps: make(map[string]time.Time),
+		newTimestamps: make(map[string]time.Time),
+		ch:            make(chan payload, retentionChannelSize),
+		done:          make(chan struct{}),
+	}
+
+	r.loadTTLState()
+	go r.process()
+	return r
+}
+
+// Update sets the group's entry in newTimestamps to the current time; the timestamps are later persisted to disk.
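+// Note: Update must not be called after Done; once the processing goroutine
+// has exited, a send on a full channel would block forever.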
+func (r *RetentionPolicyTTL) Update(group string) {
+	r.ch <- payload{
+		group:     group,
+		timestamp: time.Now(),
+	}
+}
+
+func (r *RetentionPolicyTTL) Done() {
+	close(r.done)
+}
+
+// IsExpired checks against the timestamps read from the state file at agent start.
+func (r *RetentionPolicyTTL) IsExpired(group string) bool {
+	if ts, ok := r.oldTimestamps[escapeLogGroup(group)]; ok {
+		return ts.Add(ttlTime).Before(time.Now())
+	}
+	// Log group was not in state file -- default to expired
+	return true
+}
+
+// UpdateFromFile updates the newTimestamps cache using the timestamp from the loaded state file.
+func (r *RetentionPolicyTTL) UpdateFromFile(group string) {
+	if oldTs, ok := r.oldTimestamps[escapeLogGroup(group)]; ok {
+		r.ch <- payload{
+			group:     group,
+			timestamp: oldTs,
+		}
+	}
+}
+
+func (r *RetentionPolicyTTL) loadTTLState() {
+	if _, err := os.Stat(r.stateFilePath); err != nil {
+		r.logger.Debug("retention policy ttl state file does not exist")
+		return
+	}
+
+	file, err := os.Open(r.stateFilePath)
+	if err != nil {
+		r.logger.Errorf("unable to open retention policy ttl state file: %v", err)
+		return
+	}
+	defer file.Close()
+
+	scanner := bufio.NewScanner(file)
+	for scanner.Scan() {
+		line := scanner.Text()
+		if len(line) == 0 {
+			continue
+		}
+		split := strings.Split(line, ":")
+		if len(split) < 2 {
+			r.logger.Errorf("invalid format in retention policy ttl state file: %s", line)
+			continue
+		}
+
+		group := split[0]
+		timestamp, err := strconv.ParseInt(split[1], 10, 64)
+		if err != nil {
+			r.logger.Errorf("unable to parse timestamp in retention policy ttl for group %s: %v", group, err)
+			continue
+		}
+		r.oldTimestamps[group] = time.UnixMilli(timestamp)
+	}
+
+	if err := scanner.Err(); err != nil {
+		r.logger.Errorf("error when parsing retention policy ttl state file: %v", err)
+		return
+	}
+}
+
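+// process is the single goroutine that owns the TTL state: it applies
+// timestamp updates from the channel, flushes the state file once a minute,
+// and writes a final snapshot when Done is called.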
t.Run("loadTTLState_ReadsFile", func(t *testing.T) { + tempDir := t.TempDir() + stateFilePath := filepath.Join(tempDir, logscommon.RetentionPolicyTTLFileName) + + // Create a state file + now := time.Now() + nowMillis := now.UnixMilli() + content := "group1:" + strconv.FormatInt(nowMillis, 10) + "\n" + + "group2:" + strconv.FormatInt(nowMillis+60000, 10) + "\n" + + err := os.WriteFile(stateFilePath, []byte(content), 0644) // nolint:gosec + assert.NoError(t, err) + + // Create a new TTL instance that will load the file + ttl := NewRetentionPolicyTTL(logger, tempDir) + defer cleanupTTLFile(tempDir) + defer ttl.Done() + + time.Sleep(200 * time.Millisecond) + + assert.Len(t, ttl.oldTimestamps, 2) + assert.Contains(t, ttl.oldTimestamps, "group1") + assert.Contains(t, ttl.oldTimestamps, "group2") + + assert.Equal(t, nowMillis, ttl.oldTimestamps["group1"].UnixMilli()) + assert.Equal(t, nowMillis+60000, ttl.oldTimestamps["group2"].UnixMilli()) + }) + + t.Run("loadTTLState_HandlesInvalidFile", func(t *testing.T) { + tempDir := t.TempDir() + stateFilePath := filepath.Join(tempDir, logscommon.RetentionPolicyTTLFileName) + + // Create an invalid state file + content := "group1:invalid_timestamp\n" + + "group2:123456789\n" + + "\n" + // Empty line should be skipped + "invalid_line_no_separator\n" + + err := os.WriteFile(stateFilePath, []byte(content), 0644) // nolint:gosec + assert.NoError(t, err) + + ttl := NewRetentionPolicyTTL(logger, tempDir) + defer cleanupTTLFile(tempDir) + defer ttl.Done() + + assert.Len(t, ttl.oldTimestamps, 1) + assert.Contains(t, ttl.oldTimestamps, "group2") + assert.NotContains(t, ttl.oldTimestamps, "group1") + assert.NotContains(t, ttl.oldTimestamps, "invalid_line_no_separator") + }) + + t.Run("Done_ClosesChannel", func(t *testing.T) { + tempDir := t.TempDir() + ttl := NewRetentionPolicyTTL(logger, tempDir) + + ttl.Update("TestGroup") + time.Sleep(100 * time.Millisecond) + ttl.Done() + time.Sleep(100 * time.Millisecond) + + _, err := os.Stat(ttl.stateFilePath) + assert.NoError(t, err) + }) + + t.Run("escapeLogGroup", func(t *testing.T) { + testCases := []struct { + input string + expected string + }{ + {"/aws/lambda/function", "_aws_lambda_function"}, + {"my log group", "my_log_group"}, + {"group:with:colons", "group_with_colons"}, + {"/path/with/slashes", "_path_with_slashes"}, + {"normal-group", "normal-group"}, + } + + for _, tc := range testCases { + result := escapeLogGroup(tc.input) + assert.Equal(t, tc.expected, result) + } + }) +} + +func cleanupTTLFile(dir string) { + time.Sleep(100 * time.Millisecond) + os.Remove(filepath.Join(dir, logscommon.RetentionPolicyTTLFileName)) +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go index 2c8716fcf2..cc061c7f42 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go @@ -58,6 +58,10 @@ func (m *mockTargetManager) PutRetentionPolicy(target Target) { m.Called(target) } +func (m *mockTargetManager) Stop() { + m.Called() +} + func TestSender(t *testing.T) { logger := testutil.NewNopLogger() diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/target.go b/plugins/outputs/cloudwatchlogs/internal/pusher/target.go index aa78ba85e1..cd8d93726b 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/target.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/target.go @@ -33,6 +33,7 @@ type Target struct { type TargetManager interface { 
+func (r *RetentionPolicyTTL) saveTTLState() {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+
+	var buf bytes.Buffer
+	for group, timestamp := range r.newTimestamps {
+		buf.WriteString(group + ":" + strconv.FormatInt(timestamp.UnixMilli(), 10) + "\n")
+	}
+
+	err := os.WriteFile(r.stateFilePath, buf.Bytes(), 0644) // nolint:gosec
+	if err != nil {
+		r.logger.Errorf("unable to write retention policy ttl state file: %v", err)
+	}
+}
+
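+// escapeLogGroup makes a log group name safe for the colon-delimited state
+// file: path separators, spaces, and colons are all replaced with underscores.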
"github.com/aws/amazon-cloudwatch-agent/internal/logscommon" "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" "github.com/aws/amazon-cloudwatch-agent/tool/testutil" ) @@ -27,7 +31,8 @@ func TestTargetManager(t *testing.T) { mockService := new(mockLogsService) mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once() - manager := NewTargetManager(logger, mockService) + tempDir := t.TempDir() + manager := NewTargetManager(logger, mockService, tempDir) err := manager.InitTarget(target) assert.NoError(t, err) @@ -44,7 +49,8 @@ func TestTargetManager(t *testing.T) { mockService.On("CreateLogGroup", mock.Anything).Return(&cloudwatchlogs.CreateLogGroupOutput{}, nil).Once() mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, &cloudwatchlogs.ResourceAlreadyExistsException{}).Once() - manager := NewTargetManager(logger, mockService) + tempDir := t.TempDir() + manager := NewTargetManager(logger, mockService, tempDir) err := manager.InitTarget(target) assert.NoError(t, err) @@ -61,7 +67,8 @@ func TestTargetManager(t *testing.T) { mockService.On("CreateLogGroup", mock.Anything).Return(&cloudwatchlogs.CreateLogGroupOutput{}, &cloudwatchlogs.ResourceAlreadyExistsException{}).Once() mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once() - manager := NewTargetManager(logger, mockService) + tempDir := t.TempDir() + manager := NewTargetManager(logger, mockService, tempDir) err := manager.InitTarget(target) assert.NoError(t, err) @@ -78,7 +85,8 @@ func TestTargetManager(t *testing.T) { mockService.On("CreateLogGroup", mock.Anything).Return(&cloudwatchlogs.CreateLogGroupOutput{}, &cloudwatchlogs.ResourceAlreadyExistsException{}).Once() mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, &cloudwatchlogs.AccessDeniedException{}).Once() - manager := NewTargetManager(logger, mockService) + tempDir := t.TempDir() + manager := NewTargetManager(logger, mockService, tempDir) err := manager.InitTarget(target) assert.Error(t, err) @@ -95,7 +103,8 @@ func TestTargetManager(t *testing.T) { mockService.On("CreateLogGroup", mock.Anything).Return(&cloudwatchlogs.CreateLogGroupOutput{}, nil).Once() mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, &cloudwatchlogs.ResourceAlreadyExistsException{}).Once() - manager := NewTargetManager(logger, mockService) + tempDir := t.TempDir() + manager := NewTargetManager(logger, mockService, tempDir) err := manager.InitTarget(target) assert.NoError(t, err) @@ -112,7 +121,8 @@ func TestTargetManager(t *testing.T) { mockService.On("CreateLogGroup", mock.Anything). 
Return(&cloudwatchlogs.CreateLogGroupOutput{}, awserr.New("SomeAWSError", "Failed to create log group", nil)).Once() - manager := NewTargetManager(logger, mockService) + tempDir := t.TempDir() + manager := NewTargetManager(logger, mockService, tempDir) err := manager.InitTarget(target) assert.Error(t, err) @@ -135,7 +145,8 @@ func TestTargetManager(t *testing.T) { }, nil).Once() mockService.On("PutRetentionPolicy", mock.Anything).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil).Once() - manager := NewTargetManager(logger, mockService) + tempDir := t.TempDir() + manager := NewTargetManager(logger, mockService, tempDir) err := manager.InitTarget(target) assert.NoError(t, err) // Wait for async operations to complete @@ -158,7 +169,8 @@ func TestTargetManager(t *testing.T) { }, }, nil).Once() - manager := NewTargetManager(logger, mockService) + tempDir := t.TempDir() + manager := NewTargetManager(logger, mockService, tempDir) err := manager.InitTarget(target) assert.NoError(t, err) time.Sleep(100 * time.Millisecond) @@ -176,7 +188,8 @@ func TestTargetManager(t *testing.T) { mockService.On("DescribeLogGroups", mock.Anything). Return(&cloudwatchlogs.DescribeLogGroupsOutput{}, &cloudwatchlogs.ResourceNotFoundException{}).Times(numBackoffRetries) - manager := NewTargetManager(logger, mockService) + tempDir := t.TempDir() + manager := NewTargetManager(logger, mockService, tempDir) err := manager.InitTarget(target) assert.NoError(t, err) time.Sleep(30 * time.Second) @@ -203,7 +216,8 @@ func TestTargetManager(t *testing.T) { Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, awserr.New("SomeAWSError", "Failed to set retention policy", nil)).Times(numBackoffRetries) - manager := NewTargetManager(logger, mockService) + tempDir := t.TempDir() + manager := NewTargetManager(logger, mockService, tempDir) err := manager.InitTarget(target) assert.NoError(t, err) time.Sleep(30 * time.Second) @@ -216,7 +230,8 @@ func TestTargetManager(t *testing.T) { mockService := new(mockLogsService) - manager := NewTargetManager(logger, mockService) + tempDir := t.TempDir() + manager := NewTargetManager(logger, mockService, tempDir) manager.PutRetentionPolicy(target) mockService.AssertNotCalled(t, "PutRetentionPolicy", mock.Anything) @@ -237,7 +252,8 @@ func TestTargetManager(t *testing.T) { return &cloudwatchlogs.CreateLogStreamOutput{}, nil } - manager := NewTargetManager(logger, service) + tempDir := t.TempDir() + manager := NewTargetManager(logger, service, tempDir) var wg sync.WaitGroup for i := 0; i < 50; i++ { wg.Add(1) @@ -263,7 +279,8 @@ func TestTargetManager(t *testing.T) { return &cloudwatchlogs.CreateLogStreamOutput{}, nil } - manager := NewTargetManager(logger, service) + tempDir := t.TempDir() + manager := NewTargetManager(logger, service, tempDir) manager.(*targetManager).cacheTTL = 50 * time.Millisecond for i := 0; i < 10; i++ { err := manager.InitTarget(target) @@ -288,7 +305,8 @@ func TestTargetManager(t *testing.T) { mockService := new(mockLogsService) mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once() - manager := NewTargetManager(logger, mockService) + tempDir := t.TempDir() + manager := NewTargetManager(logger, mockService, tempDir) err := manager.InitTarget(target) assert.NoError(t, err) @@ -311,7 +329,8 @@ func TestTargetManager(t *testing.T) { return *input.LogGroupName == target.Group && *input.RetentionInDays == int64(target.Retention) })).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil).Once() - manager := 
@@ -216,7 +230,8 @@
 		mockService := new(mockLogsService)
 
-		manager := NewTargetManager(logger, mockService)
+		tempDir := t.TempDir()
+		manager := NewTargetManager(logger, mockService, tempDir)
 		manager.PutRetentionPolicy(target)
 
 		mockService.AssertNotCalled(t, "PutRetentionPolicy", mock.Anything)
@@ -237,7 +252,8 @@
 			return &cloudwatchlogs.CreateLogStreamOutput{}, nil
 		}
 
-		manager := NewTargetManager(logger, service)
+		tempDir := t.TempDir()
+		manager := NewTargetManager(logger, service, tempDir)
 		var wg sync.WaitGroup
 		for i := 0; i < 50; i++ {
 			wg.Add(1)
@@ -263,7 +279,8 @@
 			return &cloudwatchlogs.CreateLogStreamOutput{}, nil
 		}
 
-		manager := NewTargetManager(logger, service)
+		tempDir := t.TempDir()
+		manager := NewTargetManager(logger, service, tempDir)
 		manager.(*targetManager).cacheTTL = 50 * time.Millisecond
 		for i := 0; i < 10; i++ {
 			err := manager.InitTarget(target)
@@ -288,7 +305,8 @@
 		mockService := new(mockLogsService)
 		mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once()
 
-		manager := NewTargetManager(logger, mockService)
+		tempDir := t.TempDir()
+		manager := NewTargetManager(logger, mockService, tempDir)
 		err := manager.InitTarget(target)
 
 		assert.NoError(t, err)
@@ -311,7 +329,8 @@
 			return *input.LogGroupName == target.Group && *input.RetentionInDays == int64(target.Retention)
 		})).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil).Once()
 
-		manager := NewTargetManager(logger, mockService)
+		tempDir := t.TempDir()
+		manager := NewTargetManager(logger, mockService, tempDir)
 		err := manager.InitTarget(target)
 
 		assert.NoError(t, err)
@@ -332,7 +351,8 @@
 		// fails but should retry
 		mockService.On("PutRetentionPolicy", mock.Anything).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, awserr.New("InternalError", "Internal error", nil)).Times(numBackoffRetries)
 
-		manager := NewTargetManager(logger, mockService)
+		tempDir := t.TempDir()
+		manager := NewTargetManager(logger, mockService, tempDir)
 		err := manager.InitTarget(target)
 
 		assert.NoError(t, err)
@@ -344,6 +364,103 @@ func TestTargetManager(t *testing.T) {
 	})
 }
 
+func TestTargetManagerWithTTL(t *testing.T) {
+	logger := testutil.NewNopLogger()
+
+	t.Run("TTLExpired_CallsAPI", func(t *testing.T) {
+		tempDir := t.TempDir()
+		target := Target{Group: "TestGroup", Stream: "TestStream", Retention: 7}
+
+		mockService := new(mockLogsService)
+		mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once()
+		mockService.On("DescribeLogGroups", mock.Anything).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
+			LogGroups: []*cloudwatchlogs.LogGroup{
+				{
+					LogGroupName:    aws.String(target.Group),
+					RetentionInDays: aws.Int64(0),
+				},
+			},
+		}, nil).Once()
+		mockService.On("PutRetentionPolicy", mock.Anything).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil).Once()
+
+		manager := NewTargetManager(logger, mockService, tempDir)
+		err := manager.InitTarget(target)
+		assert.NoError(t, err)
+		time.Sleep(200 * time.Millisecond)
+
+		mockService.AssertExpectations(t)
+
+		manager.Stop()
+		time.Sleep(200 * time.Millisecond)
+
+		ttlFilePath := filepath.Join(tempDir, logscommon.RetentionPolicyTTLFileName)
+		content, err := os.ReadFile(ttlFilePath)
+		assert.NoError(t, err)
+		assert.Contains(t, string(content), escapeLogGroup(target.Group))
+	})
+
+	t.Run("TTLNotExpired_SkipsAPI", func(t *testing.T) {
+		tempDir := t.TempDir()
+		target := Target{Group: "TestGroup", Stream: "TestStream", Retention: 7}
+
+		// Create a TTL file with a recent timestamp
+		ttlFilePath := filepath.Join(tempDir, logscommon.RetentionPolicyTTLFileName)
+		now := time.Now()
+		content := escapeLogGroup(target.Group) + ":" + strconv.FormatInt(now.UnixMilli(), 10) + "\n"
+		err := os.WriteFile(ttlFilePath, []byte(content), 0644) // nolint:gosec
+		assert.NoError(t, err)
+
+		mockService := new(mockLogsService)
+		mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once()
+		// DescribeLogGroups should not be called because TTL is not expired
+
+		manager := NewTargetManager(logger, mockService, tempDir)
+		err = manager.InitTarget(target)
+		assert.NoError(t, err)
+		time.Sleep(100 * time.Millisecond)
+
+		manager.Stop()
+		time.Sleep(100 * time.Millisecond)
+
+		mockService.AssertExpectations(t)
+		mockService.AssertNotCalled(t, "DescribeLogGroups")
+		mockService.AssertNotCalled(t, "PutRetentionPolicy")
+
+		_, err = os.Stat(ttlFilePath)
+		assert.NoError(t, err)
+	})
+
+	t.Run("Stop_SavesTTLState", func(t *testing.T) {
+		tempDir := t.TempDir()
+		target := Target{Group: "TestGroup", Stream: "TestStream", Retention: 7}
+
+		mockService := new(mockLogsService)
+		mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once()
+		mockService.On("DescribeLogGroups", mock.Anything).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
+			LogGroups: []*cloudwatchlogs.LogGroup{
+				{
+					LogGroupName:    aws.String(target.Group),
+					RetentionInDays: aws.Int64(0),
+				},
+			},
+		}, nil).Once()
+		mockService.On("PutRetentionPolicy", mock.Anything).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil).Once()
+
+		manager := NewTargetManager(logger, mockService, tempDir)
+		err := manager.InitTarget(target)
+		assert.NoError(t, err)
+		time.Sleep(200 * time.Millisecond)
+
+		manager.Stop()
+		time.Sleep(200 * time.Millisecond)
+
+		ttlFilePath := filepath.Join(tempDir, logscommon.RetentionPolicyTTLFileName)
+		content, err := os.ReadFile(ttlFilePath)
+		assert.NoError(t, err)
+		assert.Contains(t, string(content), escapeLogGroup(target.Group))
+	})
+}
+
 func TestCalculateBackoff(t *testing.T) {
 	manager := &targetManager{}
 	// should never exceed 30sec of total wait time
diff --git a/translator/translate/logs/logs_test.go b/translator/translate/logs/logs_test.go
index fe1dc0583d..59c8bb5ff1 100644
--- a/translator/translate/logs/logs_test.go
+++ b/translator/translate/logs/logs_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/aws/amazon-cloudwatch-agent/translator/config"
 	"github.com/aws/amazon-cloudwatch-agent/translator/context"
 	"github.com/aws/amazon-cloudwatch-agent/translator/translate/agent"
+	"github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/util"
 	"github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil"
 )
@@ -36,6 +37,7 @@ func TestLogs(t *testing.T) {
 				"region_type":     "any",
 				"mode":            "",
 				"log_stream_name": "LOG_STREAM_NAME",
+				"file_state_folder": util.GetFileStateFolder(),
 				"force_flush_interval": "5s",
 			},
 		},
@@ -68,6 +70,7 @@
 				"region_type":     "any",
 				"mode":            "OP",
 				"log_stream_name": hostname,
+				"file_state_folder": util.GetFileStateFolder(),
 				"force_flush_interval": "5s",
 			},
 		},
@@ -95,6 +98,7 @@
 				"region_type":     "any",
 				"mode":            "",
 				"log_stream_name": "arn_aws_ecs_us-east-2_012345678910_task/cluster-name/9781c248-0edd-4cdb-9a93-f63cb662a5d3",
+				"file_state_folder": util.GetFileStateFolder(),
 				"force_flush_interval": "5s",
 			},
 		},
@@ -119,6 +123,7 @@
 				"region_type":     "any",
 				"mode":            "",
 				"log_stream_name": "demo-app-5ffc89b95c-jgnf6",
+				"file_state_folder": util.GetFileStateFolder(),
 				"force_flush_interval": "5s",
 			},
 		},
@@ -154,6 +159,7 @@
 				"region_type":     "any",
 				"mode":            "OP",
 				"log_stream_name": hostname,
+				"file_state_folder": util.GetFileStateFolder(),
 				"force_flush_interval": "10s",
 			},
 		},
@@ -190,6 +196,7 @@
 				"mode":              "OP",
 				"endpoint_override": "https://logs-fips.us-east-1.amazonaws.com",
 				"log_stream_name":   hostname,
+				"file_state_folder": util.GetFileStateFolder(),
 				"force_flush_interval": "5s",
 			},
 		},
diff --git a/translator/translate/logs/ruleFileStateFolder.go b/translator/translate/logs/ruleFileStateFolder.go
new file mode 100644
index 0000000000..08ddae23b3
--- /dev/null
+++ b/translator/translate/logs/ruleFileStateFolder.go
@@ -0,0 +1,22 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package logs
+
+import (
+	"github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/util"
+)
+
+type FileStateFolder struct {
+}
+
+// FileStateFolder is an internal value and is not exposed to customers.
+func (f *FileStateFolder) ApplyRule(_ interface{}) (string, interface{}) {
+	res := map[string]interface{}{}
+	res["file_state_folder"] = util.GetFileStateFolder()
+	return Output_Cloudwatch_Logs, res
+}
+func init() {
+	f := new(FileStateFolder)
+	RegisterRule("file_state_folder", f)
+}