Add TTL for retention policy #1699
Open · wants to merge 4 commits into main
2 changes: 2 additions & 0 deletions internal/logscommon/const.go
@@ -34,4 +34,6 @@ const (
LogType = "log_type"

LogBackpressureModeKey = "backpressure_mode"

RetentionPolicyTTLFileName = "Amazon_CloudWatch_RetentionPolicyTTL"
)
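
For reference, the state file stored under this name is plain text with one entry per line, in the form <escapedGroup>:<unixMillis> (see saveTTLState and escapeLogGroup in the new pusher file below). A minimal Go sketch of producing one such line:

package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

func main() {
	// Mirror escapeLogGroup: slashes, spaces, and colons become underscores,
	// which keeps the ":" separator in each line unambiguous.
	group := strings.NewReplacer("/", "_", " ", "_", ":", "_").Replace("/aws/lambda/function")
	line := group + ":" + strconv.FormatInt(time.Now().UnixMilli(), 10)
	fmt.Println(line) // e.g. _aws_lambda_function:1712345678901
}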
2 changes: 1 addition & 1 deletion plugins/inputs/logfile/logfile.go
@@ -373,7 +373,7 @@ func (t *LogFile) cleanupStateFolder() {
continue
}

- if strings.Contains(file, logscommon.WindowsEventLogPrefix) {
+ if strings.Contains(file, logscommon.WindowsEventLogPrefix) || strings.Contains(file, logscommon.RetentionPolicyTTLFileName) {
continue
}

10 changes: 7 additions & 3 deletions plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
@@ -63,8 +63,9 @@ type CloudWatchLogs struct {
Token string `toml:"token"`

//log group and stream names
- LogStreamName string `toml:"log_stream_name"`
- LogGroupName string `toml:"log_group_name"`
+ LogStreamName   string `toml:"log_stream_name"`
+ LogGroupName    string `toml:"log_group_name"`
+ FileStateFolder string `toml:"file_state_folder"`

// Retention for log group
RetentionInDays int `toml:"retention_in_days"`
@@ -97,6 +98,9 @@ func (c *CloudWatchLogs) Close() error {
if c.workerPool != nil {
c.workerPool.Stop()
}
if c.targetManager != nil {
c.targetManager.Stop()
}

return nil
}
@@ -144,7 +148,7 @@ func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest {
if c.Concurrency > 0 {
c.workerPool = pusher.NewWorkerPool(c.Concurrency)
}
- c.targetManager = pusher.NewTargetManager(c.Log, client)
+ c.targetManager = pusher.NewTargetManager(c.Log, client, c.FileStateFolder)
})
p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, c.pusherStopChan, &c.pusherWaitGroup)
cwd := &cwDest{pusher: p, retryer: logThrottleRetryer}
@@ -6,6 +6,8 @@ package pusher
import (
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
@@ -17,6 +19,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/stretchr/testify/require"

"github.com/aws/amazon-cloudwatch-agent/internal/logscommon"
"github.com/aws/amazon-cloudwatch-agent/logs"
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
"github.com/aws/amazon-cloudwatch-agent/tool/testutil"
@@ -674,7 +677,10 @@ func testPreparationWithLogger(
) (chan struct{}, *queue) {
t.Helper()
stop := make(chan struct{})
- tm := NewTargetManager(logger, service)
+ tempDir := t.TempDir()
+ file, _ := os.Create(filepath.Join(tempDir, logscommon.RetentionPolicyTTLFileName))
+ file.Close()
+ tm := NewTargetManager(logger, service, tempDir)
s := newSender(logger, service, tm, retryDuration, stop)
q := newQueue(
logger,
@@ -0,0 +1,172 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package pusher

import (
"bufio"
"bytes"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"

"github.com/aws/amazon-cloudwatch-agent/internal/logscommon"
)

const (
ttlTime = 5 * time.Minute
)

type payload struct {
group string
timestamp time.Time
}

type RetentionPolicyTTL struct {
logger telegraf.Logger
stateFilePath string
// oldTimestamps come from the TTL file on agent start. Key is escaped group name
oldTimestamps map[string]time.Time
// newTimestamps are the new TTLs that will be saved periodically and when the agent is done. Key is escaped group name
newTimestamps map[string]time.Time
mu sync.RWMutex
ch chan payload
done chan struct{}
}

func NewRetentionPolicyTTL(logger telegraf.Logger, fileStatePath string) *RetentionPolicyTTL {
r := &RetentionPolicyTTL{
logger: logger,
stateFilePath: filepath.Join(fileStatePath, logscommon.RetentionPolicyTTLFileName),
oldTimestamps: make(map[string]time.Time),
newTimestamps: make(map[string]time.Time),
ch: make(chan payload, retentionChannelSize),
done: make(chan struct{}),
}

r.loadTTLState()
go r.process()
return r
}

// Update records the current time for the given group in newTimestamps; the process goroutine later persists it to disk.
func (r *RetentionPolicyTTL) Update(group string) {
r.ch <- payload{
group: group,
timestamp: time.Now(),
}
}

func (r *RetentionPolicyTTL) Done() {
close(r.done)
}

// IsExpired checks against the timestamps loaded from the state file at agent start.
func (r *RetentionPolicyTTL) IsExpired(group string) bool {
if ts, ok := r.oldTimestamps[escapeLogGroup(group)]; ok {
return ts.Add(ttlTime).Before(time.Now())
}
// Log group was not in state file -- default to expired
return true
}

// UpdateFromFile updates the newTimestamps cache using the timestamp from the loaded state file.
func (r *RetentionPolicyTTL) UpdateFromFile(group string) {
if oldTs, ok := r.oldTimestamps[escapeLogGroup(group)]; ok {
r.ch <- payload{
group: group,
timestamp: oldTs,
}
}
}

func (r *RetentionPolicyTTL) loadTTLState() {
if _, err := os.Stat(r.stateFilePath); err != nil {
r.logger.Debug("retention policy ttl state file does not exist")
return
}

file, err := os.Open(r.stateFilePath)
if err != nil {
r.logger.Errorf("unable to open retention policy ttl state file: %v", err)
return
}
defer file.Close()

scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
if len(line) == 0 {
continue
}
split := strings.Split(line, ":")
if len(split) < 2 {
r.logger.Errorf("invalid format in retention policy ttl state file: %s", line)
continue
}

group := split[0]
timestamp, err := strconv.ParseInt(split[1], 10, 64)
if err != nil {
r.logger.Errorf("unable to parse timestamp in retention policy ttl for group %s: %v", group, err)
continue
}
r.oldTimestamps[group] = time.UnixMilli(timestamp)
}

if err := scanner.Err(); err != nil {
r.logger.Errorf("error when parsing retention policy ttl state file: %v", err)
return
}
}

func (r *RetentionPolicyTTL) process() {
t := time.NewTicker(time.Minute)
defer t.Stop()

for {
select {
case payload := <-r.ch:
r.updateTimestamp(payload.group, payload.timestamp)
case <-t.C:
r.saveTTLState()
case <-r.done:
r.saveTTLState()
return
}
}
}

func (r *RetentionPolicyTTL) updateTimestamp(group string, timestamp time.Time) {
r.mu.Lock()
defer r.mu.Unlock()
r.newTimestamps[escapeLogGroup(group)] = timestamp
}

func (r *RetentionPolicyTTL) saveTTLState() {
r.mu.RLock()
defer r.mu.RUnlock()

var buf bytes.Buffer
for group, timestamp := range r.newTimestamps {
buf.Write([]byte(group + ":" + strconv.FormatInt(timestamp.UnixMilli(), 10) + "\n"))
}

err := os.WriteFile(r.stateFilePath, buf.Bytes(), 0644) // nolint:gosec
if err != nil {
r.logger.Errorf("unable to write retention policy ttl state file: %v", err)
}
}

func escapeLogGroup(group string) string {
escapedLogGroup := filepath.ToSlash(group)
escapedLogGroup = strings.Replace(escapedLogGroup, "/", "_", -1)
escapedLogGroup = strings.Replace(escapedLogGroup, " ", "_", -1)
escapedLogGroup = strings.Replace(escapedLogGroup, ":", "_", -1)
return escapedLogGroup
}
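
Taken together, a minimal usage sketch of the lifecycle this file implements (the state-folder path below is hypothetical; logger is any telegraf.Logger):

ttl := NewRetentionPolicyTTL(logger, "/tmp/agent-state") // hypothetical folder
defer ttl.Done() // closes done; process() writes one final state file

if ttl.IsExpired("my-log-group") {
	// Last check was over ttlTime (5 minutes) ago, or the group was never
	// recorded: perform the real retention check, then record success.
	ttl.Update("my-log-group")
} else {
	// Recently checked: carry the previous run's timestamp forward so the
	// next periodic save does not drop it.
	ttl.UpdateFromFile("my-log-group")
}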
@@ -0,0 +1,223 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package pusher

import (
"os"
"path/filepath"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/aws/amazon-cloudwatch-agent/internal/logscommon"
"github.com/aws/amazon-cloudwatch-agent/tool/testutil"
)

func TestRetentionPolicyTTL(t *testing.T) {
logger := testutil.NewNopLogger()

t.Run("NewRetentionPolicyTTL", func(t *testing.T) {
tempDir := t.TempDir()
ttl := NewRetentionPolicyTTL(logger, tempDir)
defer ttl.Done()
defer cleanupTTLFile(tempDir)

assert.NotNil(t, ttl)
assert.Equal(t, filepath.Join(tempDir, logscommon.RetentionPolicyTTLFileName), ttl.stateFilePath)
assert.NotNil(t, ttl.oldTimestamps)
assert.NotNil(t, ttl.newTimestamps)
assert.NotNil(t, ttl.ch)
assert.NotNil(t, ttl.done)
})

t.Run("IsExpired_NoExistingTimestamp", func(t *testing.T) {
tempDir := t.TempDir()
ttl := NewRetentionPolicyTTL(logger, tempDir)
defer cleanupTTLFile(tempDir)
defer ttl.Done()

// When no timestamp exists for a group, it should be considered expired
assert.True(t, ttl.IsExpired("TestGroup"))
})

t.Run("IsExpired_WithExpiredTimestamp", func(t *testing.T) {
tempDir := t.TempDir()
ttl := NewRetentionPolicyTTL(logger, tempDir)
defer cleanupTTLFile(tempDir)
defer ttl.Done()

// Set an expired timestamp (more than ttlTime in the past)
expiredTime := time.Now().Add(-10 * time.Minute)
ttl.oldTimestamps["TestGroup"] = expiredTime

assert.True(t, ttl.IsExpired("TestGroup"))
})

t.Run("IsExpired_WithValidTimestamp", func(t *testing.T) {
tempDir := t.TempDir()
ttl := NewRetentionPolicyTTL(logger, tempDir)
defer cleanupTTLFile(tempDir)
defer ttl.Done()

// Set a valid timestamp (less than ttlTime in the past)
validTime := time.Now().Add(-1 * time.Minute)
ttl.oldTimestamps["TestGroup"] = validTime

assert.False(t, ttl.IsExpired("TestGroup"))
})

t.Run("Update_SavesTimestamp", func(t *testing.T) {
tempDir := t.TempDir()
ttl := NewRetentionPolicyTTL(logger, tempDir)
defer cleanupTTLFile(tempDir)
defer ttl.Done()

ttl.Update("TestGroup")

time.Sleep(100 * time.Millisecond)

ttl.mu.RLock()
_, exists := ttl.newTimestamps["TestGroup"]
ttl.mu.RUnlock()

assert.True(t, exists)
})

t.Run("UpdateFromFile_CopiesOldTimestamp", func(t *testing.T) {
tempDir := t.TempDir()
ttl := NewRetentionPolicyTTL(logger, tempDir)
defer cleanupTTLFile(tempDir)
defer ttl.Done()

// Set an old timestamp
oldTime := time.Now().Add(-1 * time.Minute)
ttl.oldTimestamps["TestGroup"] = oldTime

// Persist old timestamp to new timestamp
ttl.UpdateFromFile("TestGroup")

time.Sleep(200 * time.Millisecond)

ttl.mu.RLock()
newTime, exists := ttl.newTimestamps["TestGroup"]
ttl.mu.RUnlock()

assert.True(t, exists)
assert.Equal(t, oldTime.UnixMilli(), newTime.UnixMilli())
})

t.Run("saveTTLState_WritesFile", func(t *testing.T) {
tempDir := t.TempDir()
ttl := NewRetentionPolicyTTL(logger, tempDir)
defer cleanupTTLFile(tempDir)
defer ttl.Done()

now := time.Now()
ttl.mu.Lock()
ttl.newTimestamps["group1"] = now
ttl.newTimestamps["group2"] = now.Add(1 * time.Minute)
ttl.mu.Unlock()

ttl.saveTTLState()

_, err := os.Stat(ttl.stateFilePath)
assert.NoError(t, err)

content, err := os.ReadFile(ttl.stateFilePath)
assert.NoError(t, err)

contentStr := string(content)
assert.Contains(t, contentStr, "group1:")
assert.Contains(t, contentStr, "group2:")
})

t.Run("loadTTLState_ReadsFile", func(t *testing.T) {
tempDir := t.TempDir()
stateFilePath := filepath.Join(tempDir, logscommon.RetentionPolicyTTLFileName)

// Create a state file
now := time.Now()
nowMillis := now.UnixMilli()
content := "group1:" + strconv.FormatInt(nowMillis, 10) + "\n" +
"group2:" + strconv.FormatInt(nowMillis+60000, 10) + "\n"

err := os.WriteFile(stateFilePath, []byte(content), 0644) // nolint:gosec
assert.NoError(t, err)

// Create a new TTL instance that will load the file
ttl := NewRetentionPolicyTTL(logger, tempDir)
defer cleanupTTLFile(tempDir)
defer ttl.Done()

time.Sleep(200 * time.Millisecond)

assert.Len(t, ttl.oldTimestamps, 2)
assert.Contains(t, ttl.oldTimestamps, "group1")
assert.Contains(t, ttl.oldTimestamps, "group2")

assert.Equal(t, nowMillis, ttl.oldTimestamps["group1"].UnixMilli())
assert.Equal(t, nowMillis+60000, ttl.oldTimestamps["group2"].UnixMilli())
})

t.Run("loadTTLState_HandlesInvalidFile", func(t *testing.T) {
tempDir := t.TempDir()
stateFilePath := filepath.Join(tempDir, logscommon.RetentionPolicyTTLFileName)

// Create an invalid state file
content := "group1:invalid_timestamp\n" +
"group2:123456789\n" +
"\n" + // Empty line should be skipped
"invalid_line_no_separator\n"

err := os.WriteFile(stateFilePath, []byte(content), 0644) // nolint:gosec
assert.NoError(t, err)

ttl := NewRetentionPolicyTTL(logger, tempDir)
defer cleanupTTLFile(tempDir)
defer ttl.Done()

assert.Len(t, ttl.oldTimestamps, 1)
assert.Contains(t, ttl.oldTimestamps, "group2")
assert.NotContains(t, ttl.oldTimestamps, "group1")
assert.NotContains(t, ttl.oldTimestamps, "invalid_line_no_separator")
})

t.Run("Done_ClosesChannel", func(t *testing.T) {
tempDir := t.TempDir()
ttl := NewRetentionPolicyTTL(logger, tempDir)

ttl.Update("TestGroup")
time.Sleep(100 * time.Millisecond)
ttl.Done()
time.Sleep(100 * time.Millisecond)

_, err := os.Stat(ttl.stateFilePath)
assert.NoError(t, err)
})

t.Run("escapeLogGroup", func(t *testing.T) {
testCases := []struct {
input string
expected string
}{
{"/aws/lambda/function", "_aws_lambda_function"},
{"my log group", "my_log_group"},
{"group:with:colons", "group_with_colons"},
{"/path/with/slashes", "_path_with_slashes"},
{"normal-group", "normal-group"},
}

for _, tc := range testCases {
result := escapeLogGroup(tc.input)
assert.Equal(t, tc.expected, result)
}
})
}

func cleanupTTLFile(dir string) {
time.Sleep(100 * time.Millisecond)
os.Remove(filepath.Join(dir, logscommon.RetentionPolicyTTLFileName))
}
@@ -58,6 +58,10 @@ func (m *mockTargetManager) PutRetentionPolicy(target Target) {
m.Called(target)
}

func (m *mockTargetManager) Stop() {
m.Called()
}

func TestSender(t *testing.T) {
logger := testutil.NewNopLogger()

46 changes: 38 additions & 8 deletions plugins/outputs/cloudwatchlogs/internal/pusher/target.go
@@ -33,6 +33,7 @@ type Target struct {
type TargetManager interface {
InitTarget(target Target) error
PutRetentionPolicy(target Target)
Stop()
}

type targetManager struct {
@@ -44,16 +45,19 @@ type targetManager struct {
mu sync.Mutex
dlg chan Target
prp chan Target
// cache of most recent retention policy changes
retentionPolicyTTL *RetentionPolicyTTL
}

- func NewTargetManager(logger telegraf.Logger, service cloudWatchLogsService) TargetManager {
+ func NewTargetManager(logger telegraf.Logger, service cloudWatchLogsService, fileStateFolder string) TargetManager {
tm := &targetManager{
- logger:   logger,
- service:  service,
- cache:    make(map[Target]time.Time),
- cacheTTL: cacheTTL,
- dlg:      make(chan Target, retentionChannelSize),
- prp:      make(chan Target, retentionChannelSize),
+ logger:             logger,
+ service:            service,
+ cache:              make(map[Target]time.Time),
+ cacheTTL:           cacheTTL,
+ dlg:                make(chan Target, retentionChannelSize),
+ prp:                make(chan Target, retentionChannelSize),
+ retentionPolicyTTL: NewRetentionPolicyTTL(logger, fileStateFolder),
}

go tm.processDescribeLogGroup()
@@ -95,6 +99,12 @@ func (m *targetManager) PutRetentionPolicy(target Target) {
}
}

func (m *targetManager) Stop() {
if m.retentionPolicyTTL != nil {
m.retentionPolicyTTL.Done()
}
}

func (m *targetManager) createLogGroupAndStream(t Target) (bool, error) {
err := m.createLogStream(t)
if m.isLogStreamCreated(err, t.Stream) {
@@ -175,6 +185,9 @@ func (m *targetManager) createLogStream(t Target) error {

func (m *targetManager) processDescribeLogGroup() {
for target := range m.dlg {
if !m.isTTLExpired(target) {
continue
}
for attempt := 0; attempt < numBackoffRetries; attempt++ {
currentRetention, err := m.getRetention(target)
if err != nil {
@@ -187,7 +200,9 @@ func (m *targetManager) processDescribeLogGroup() {
m.logger.Debugf("queueing log group %v to update retention policy", target.Group)
m.prp <- target
}
- break // no change in retention
+ // no change in retention
+ m.retentionPolicyTTL.Update(target.Group)
+ break
}
}
}
@@ -216,6 +231,9 @@ func (m *targetManager) getRetention(target Target) (int, error) {

func (m *targetManager) processPutRetentionPolicy() {
for target := range m.prp {
if !m.isTTLExpired(target) {
continue
}
var updated bool
for attempt := 0; attempt < numBackoffRetries; attempt++ {
err := m.updateRetentionPolicy(target)
@@ -230,6 +248,8 @@ func (m *targetManager) processPutRetentionPolicy() {

if !updated {
m.logger.Errorf("failed to update retention policy for target %v after %d attempts", target, numBackoffRetries)
} else {
m.retentionPolicyTTL.Update(target.Group)
}
}
}
@@ -258,3 +278,13 @@ func (m *targetManager) calculateBackoff(retryCount int) time.Duration {
}
return withJitter(delay)
}

func (m *targetManager) isTTLExpired(target Target) bool {
expired := m.retentionPolicyTTL.IsExpired(target.Group)
if !expired {
// Persist the old timestamp from the state file so that the TTL
// from the last agent run is not lost
m.retentionPolicyTTL.UpdateFromFile(target.Group)
}
return expired
}
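
The net effect of the two gates above: once a group's retention has been confirmed or updated, both worker loops skip the DescribeLogGroups and PutRetentionPolicy calls for ttlTime (5 minutes), including across agent restarts. A condensed sketch of the describe loop's decision flow (retries and the exact retention comparison are simplified here):

for target := range m.dlg {
	if !m.isTTLExpired(target) {
		continue // checked within ttlTime; state-file timestamp carried forward
	}
	currentRetention, err := m.getRetention(target)
	if err != nil {
		continue // the real loop retries with backoff
	}
	if currentRetention != target.Retention {
		m.prp <- target // PutRetentionPolicy path calls Update on success
	} else {
		m.retentionPolicyTTL.Update(target.Group) // no change in retention
	}
}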
149 changes: 133 additions & 16 deletions plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go
@@ -4,6 +4,9 @@
package pusher

import (
"os"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
"testing"
@@ -14,6 +17,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/aws/amazon-cloudwatch-agent/internal/logscommon"
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
"github.com/aws/amazon-cloudwatch-agent/tool/testutil"
)
@@ -27,7 +31,8 @@ func TestTargetManager(t *testing.T) {
mockService := new(mockLogsService)
mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once()

- manager := NewTargetManager(logger, mockService)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, mockService, tempDir)
err := manager.InitTarget(target)

assert.NoError(t, err)
@@ -44,7 +49,8 @@ func TestTargetManager(t *testing.T) {
mockService.On("CreateLogGroup", mock.Anything).Return(&cloudwatchlogs.CreateLogGroupOutput{}, nil).Once()
mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, &cloudwatchlogs.ResourceAlreadyExistsException{}).Once()

- manager := NewTargetManager(logger, mockService)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, mockService, tempDir)
err := manager.InitTarget(target)

assert.NoError(t, err)
@@ -61,7 +67,8 @@ func TestTargetManager(t *testing.T) {
mockService.On("CreateLogGroup", mock.Anything).Return(&cloudwatchlogs.CreateLogGroupOutput{}, &cloudwatchlogs.ResourceAlreadyExistsException{}).Once()
mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once()

- manager := NewTargetManager(logger, mockService)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, mockService, tempDir)
err := manager.InitTarget(target)

assert.NoError(t, err)
@@ -78,7 +85,8 @@ func TestTargetManager(t *testing.T) {
mockService.On("CreateLogGroup", mock.Anything).Return(&cloudwatchlogs.CreateLogGroupOutput{}, &cloudwatchlogs.ResourceAlreadyExistsException{}).Once()
mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, &cloudwatchlogs.AccessDeniedException{}).Once()

- manager := NewTargetManager(logger, mockService)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, mockService, tempDir)
err := manager.InitTarget(target)

assert.Error(t, err)
@@ -95,7 +103,8 @@ func TestTargetManager(t *testing.T) {
mockService.On("CreateLogGroup", mock.Anything).Return(&cloudwatchlogs.CreateLogGroupOutput{}, nil).Once()
mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, &cloudwatchlogs.ResourceAlreadyExistsException{}).Once()

- manager := NewTargetManager(logger, mockService)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, mockService, tempDir)
err := manager.InitTarget(target)

assert.NoError(t, err)
@@ -112,7 +121,8 @@ func TestTargetManager(t *testing.T) {
mockService.On("CreateLogGroup", mock.Anything).
Return(&cloudwatchlogs.CreateLogGroupOutput{}, awserr.New("SomeAWSError", "Failed to create log group", nil)).Once()

- manager := NewTargetManager(logger, mockService)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, mockService, tempDir)
err := manager.InitTarget(target)

assert.Error(t, err)
@@ -135,7 +145,8 @@ func TestTargetManager(t *testing.T) {
}, nil).Once()
mockService.On("PutRetentionPolicy", mock.Anything).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil).Once()

- manager := NewTargetManager(logger, mockService)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, mockService, tempDir)
err := manager.InitTarget(target)
assert.NoError(t, err)
// Wait for async operations to complete
@@ -158,7 +169,8 @@ func TestTargetManager(t *testing.T) {
},
}, nil).Once()

- manager := NewTargetManager(logger, mockService)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, mockService, tempDir)
err := manager.InitTarget(target)
assert.NoError(t, err)
time.Sleep(100 * time.Millisecond)
@@ -176,7 +188,8 @@ func TestTargetManager(t *testing.T) {
mockService.On("DescribeLogGroups", mock.Anything).
Return(&cloudwatchlogs.DescribeLogGroupsOutput{}, &cloudwatchlogs.ResourceNotFoundException{}).Times(numBackoffRetries)

- manager := NewTargetManager(logger, mockService)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, mockService, tempDir)
err := manager.InitTarget(target)
assert.NoError(t, err)
time.Sleep(30 * time.Second)
@@ -203,7 +216,8 @@ func TestTargetManager(t *testing.T) {
Return(&cloudwatchlogs.PutRetentionPolicyOutput{},
awserr.New("SomeAWSError", "Failed to set retention policy", nil)).Times(numBackoffRetries)

- manager := NewTargetManager(logger, mockService)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, mockService, tempDir)
err := manager.InitTarget(target)
assert.NoError(t, err)
time.Sleep(30 * time.Second)
@@ -216,7 +230,8 @@ func TestTargetManager(t *testing.T) {

mockService := new(mockLogsService)

- manager := NewTargetManager(logger, mockService)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, mockService, tempDir)
manager.PutRetentionPolicy(target)

mockService.AssertNotCalled(t, "PutRetentionPolicy", mock.Anything)
@@ -237,7 +252,8 @@ func TestTargetManager(t *testing.T) {
return &cloudwatchlogs.CreateLogStreamOutput{}, nil
}

- manager := NewTargetManager(logger, service)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, service, tempDir)
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
@@ -263,7 +279,8 @@ func TestTargetManager(t *testing.T) {
return &cloudwatchlogs.CreateLogStreamOutput{}, nil
}

- manager := NewTargetManager(logger, service)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, service, tempDir)
manager.(*targetManager).cacheTTL = 50 * time.Millisecond
for i := 0; i < 10; i++ {
err := manager.InitTarget(target)
@@ -288,7 +305,8 @@ func TestTargetManager(t *testing.T) {
mockService := new(mockLogsService)
mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once()

- manager := NewTargetManager(logger, mockService)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, mockService, tempDir)
err := manager.InitTarget(target)
assert.NoError(t, err)

@@ -311,7 +329,8 @@ func TestTargetManager(t *testing.T) {
return *input.LogGroupName == target.Group && *input.RetentionInDays == int64(target.Retention)
})).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil).Once()

- manager := NewTargetManager(logger, mockService)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, mockService, tempDir)
err := manager.InitTarget(target)
assert.NoError(t, err)

@@ -332,7 +351,8 @@ func TestTargetManager(t *testing.T) {
// fails but should retry
mockService.On("PutRetentionPolicy", mock.Anything).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, awserr.New("InternalError", "Internal error", nil)).Times(numBackoffRetries)

- manager := NewTargetManager(logger, mockService)
+ tempDir := t.TempDir()
+ manager := NewTargetManager(logger, mockService, tempDir)
err := manager.InitTarget(target)
assert.NoError(t, err)

@@ -344,6 +364,103 @@ func TestTargetManager(t *testing.T) {
})
}

func TestTargetManagerWithTTL(t *testing.T) {
logger := testutil.NewNopLogger()

t.Run("TTLExpired_CallsAPI", func(t *testing.T) {
tempDir := t.TempDir()
target := Target{Group: "TestGroup", Stream: "TestStream", Retention: 7}

mockService := new(mockLogsService)
mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once()
mockService.On("DescribeLogGroups", mock.Anything).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
LogGroups: []*cloudwatchlogs.LogGroup{
{
LogGroupName: aws.String(target.Group),
RetentionInDays: aws.Int64(0),
},
},
}, nil).Once()
mockService.On("PutRetentionPolicy", mock.Anything).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil).Once()

manager := NewTargetManager(logger, mockService, tempDir)
err := manager.InitTarget(target)
assert.NoError(t, err)
time.Sleep(200 * time.Millisecond)

mockService.AssertExpectations(t)

manager.Stop()
time.Sleep(200 * time.Millisecond)

ttlFilePath := filepath.Join(tempDir, logscommon.RetentionPolicyTTLFileName)
content, err := os.ReadFile(ttlFilePath)
assert.NoError(t, err)
assert.Contains(t, string(content), escapeLogGroup(target.Group))
})

t.Run("TTLNotExpired_SkipsAPI", func(t *testing.T) {
tempDir := t.TempDir()
target := Target{Group: "TestGroup", Stream: "TestStream", Retention: 7}

// Create a TTL file with a recent timestamp
ttlFilePath := filepath.Join(tempDir, logscommon.RetentionPolicyTTLFileName)
now := time.Now()
content := escapeLogGroup(target.Group) + ":" + strconv.FormatInt(now.UnixMilli(), 10) + "\n"
err := os.WriteFile(ttlFilePath, []byte(content), 0644) // nolint:gosec
assert.NoError(t, err)

mockService := new(mockLogsService)
mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once()
// DescribeLogGroups should not be called because TTL is not expired

manager := NewTargetManager(logger, mockService, tempDir)
err = manager.InitTarget(target)
assert.NoError(t, err)
time.Sleep(100 * time.Millisecond)

manager.Stop()
time.Sleep(100 * time.Millisecond)

mockService.AssertExpectations(t)
mockService.AssertNotCalled(t, "DescribeLogGroups")
mockService.AssertNotCalled(t, "PutRetentionPolicy")

_, err = os.Stat(ttlFilePath)
assert.NoError(t, err)
})

t.Run("Stop_SavesTTLState", func(t *testing.T) {
tempDir := t.TempDir()
target := Target{Group: "TestGroup", Stream: "TestStream", Retention: 7}

mockService := new(mockLogsService)
mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once()
mockService.On("DescribeLogGroups", mock.Anything).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
LogGroups: []*cloudwatchlogs.LogGroup{
{
LogGroupName: aws.String(target.Group),
RetentionInDays: aws.Int64(0),
},
},
}, nil).Once()
mockService.On("PutRetentionPolicy", mock.Anything).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil).Once()

manager := NewTargetManager(logger, mockService, tempDir)
err := manager.InitTarget(target)
assert.NoError(t, err)
time.Sleep(200 * time.Millisecond)

manager.Stop()
time.Sleep(200 * time.Millisecond)

ttlFilePath := filepath.Join(tempDir, logscommon.RetentionPolicyTTLFileName)
content, err := os.ReadFile(ttlFilePath)
assert.NoError(t, err)
assert.Contains(t, string(content), escapeLogGroup(target.Group))
})
}

func TestCalculateBackoff(t *testing.T) {
manager := &targetManager{}
// should never exceed 30sec of total wait time
7 changes: 7 additions & 0 deletions translator/translate/logs/logs_test.go
@@ -13,6 +13,7 @@ import (
"github.com/aws/amazon-cloudwatch-agent/translator/config"
"github.com/aws/amazon-cloudwatch-agent/translator/context"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/agent"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/util"
"github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil"
)

@@ -36,6 +37,7 @@ func TestLogs(t *testing.T) {
"region_type": "any",
"mode": "",
"log_stream_name": "LOG_STREAM_NAME",
"file_state_folder": util.GetFileStateFolder(),
"force_flush_interval": "5s",
},
},
@@ -68,6 +70,7 @@ func TestLogs_LogStreamName(t *testing.T) {
"region_type": "any",
"mode": "OP",
"log_stream_name": hostname,
"file_state_folder": util.GetFileStateFolder(),
"force_flush_interval": "5s",
},
},
@@ -95,6 +98,7 @@ func TestLogs_LogStreamName(t *testing.T) {
"region_type": "any",
"mode": "",
"log_stream_name": "arn_aws_ecs_us-east-2_012345678910_task/cluster-name/9781c248-0edd-4cdb-9a93-f63cb662a5d3",
"file_state_folder": util.GetFileStateFolder(),
"force_flush_interval": "5s",
},
},
@@ -119,6 +123,7 @@ func TestLogs_LogStreamName(t *testing.T) {
"region_type": "any",
"mode": "",
"log_stream_name": "demo-app-5ffc89b95c-jgnf6",
"file_state_folder": util.GetFileStateFolder(),
"force_flush_interval": "5s",
},
},
@@ -154,6 +159,7 @@ func TestLogs_ForceFlushInterval(t *testing.T) {
"region_type": "any",
"mode": "OP",
"log_stream_name": hostname,
"file_state_folder": util.GetFileStateFolder(),
"force_flush_interval": "10s",
},
},
@@ -190,6 +196,7 @@ func TestLogs_EndpointOverride(t *testing.T) {
"mode": "OP",
"endpoint_override": "https://logs-fips.us-east-1.amazonaws.com",
"log_stream_name": hostname,
"file_state_folder": util.GetFileStateFolder(),
"force_flush_interval": "5s",
},
},
22 changes: 22 additions & 0 deletions translator/translate/logs/ruleFileStateFolder.go
@@ -0,0 +1,22 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package logs

import (
"github.com/aws/amazon-cloudwatch-agent/translator/translate/logs/util"
)

type FileStateFolder struct {
}

// FileStateFolder is an internal value that is not exposed to customers
func (f *FileStateFolder) ApplyRule(_ interface{}) (string, interface{}) {
res := map[string]interface{}{}
res["file_state_folder"] = util.GetFileStateFolder()
return Output_Cloudwatch_Logs, res
}
func init() {
f := new(FileStateFolder)
RegisterRule("file_state_folder", f)
}
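
For context, a rough sketch of what this rule contributes when the translator runs it (names per the file above):

f := new(FileStateFolder)
key, value := f.ApplyRule(nil)
// key is Output_Cloudwatch_Logs, i.e. the cloudwatch_logs output section,
// and value is map[string]interface{}{"file_state_folder": util.GetFileStateFolder()},
// which is how file_state_folder reaches the plugin's TOML without user input.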