diff --git a/logprocesser/logparser.go b/logprocesser/logparser.go
index a41bd60..42b39e9 100644
--- a/logprocesser/logparser.go
+++ b/logprocesser/logparser.go
@@ -441,6 +441,7 @@ func ParseAndProduce(entry LogEntry) {
 		"is_pending": fmt.Sprint(false),
 		"source":     "MIRRORING",
 		"direction":  fmt.Sprint(1),
+		"tag":        "{\n \"service\": \"aws-api-gateway\"\n}",
 	}
 
 	// Debug: Print the Kafka message being sent
diff --git a/logprocesser/logreader.go b/logprocesser/logreader.go
index ddc081b..c0789a8 100644
--- a/logprocesser/logreader.go
+++ b/logprocesser/logreader.go
@@ -15,12 +15,34 @@ import (
 
 var globalTimestampTracker = NewTimestampTracker()
 
+// lookbackWindowMs is the time window (ms) used for stream/event fetch and tracker cleanup. Set in SetLookbackWindow.
+var lookbackWindowMs int64 = LOG_STREAM_FETCH_TIME
+
+// maxStreamMapSize is set in SetLookbackWindow: 50000 for ACCOUNT_ID_FOR_6_DAYS, else defaultMaxStreamMapSize.
+var maxStreamMapSize int = defaultMaxStreamMapSize
+
 const (
-	POLL_DURATION         = 180000  // 3 minute in milliseconds
-	LOG_STREAM_FETCH_TIME = 3600000 // 1 hour in milliseconds
-	MAX_STREAM_MAP_SIZE   = 10000   // max entries in lastReadTimestamps map
+	POLL_DURATION            = 180000               // 3 minutes in milliseconds
+	LOG_STREAM_FETCH_TIME    = 3600000              // 1 hour in milliseconds
+	LOOKBACK_6_DAYS_MS       = 6 * 24 * 3600 * 1000 // 6 days in milliseconds
+	defaultMaxStreamMapSize  = 10000                // max entries when not the 6-day account
+	maxStreamMapSizeFor6Days = 50000                // max entries for ACCOUNT_ID_FOR_6_DAYS
+	ACCOUNT_ID_FOR_6_DAYS    = "1729478227"
 )
 
+// SetLookbackWindow parses the accountId from the JWT token and sets the lookback window and max stream map size: ACCOUNT_ID_FOR_6_DAYS -> 6 days, 50k streams; otherwise 1 hour, 10k. Call before starting monitors.
+func SetLookbackWindow(token string) {
+	accountID := utils.AccountIDFromToken(token)
+	if accountID == ACCOUNT_ID_FOR_6_DAYS {
+		lookbackWindowMs = LOOKBACK_6_DAYS_MS
+		maxStreamMapSize = maxStreamMapSizeFor6Days
+	} else {
+		lookbackWindowMs = LOG_STREAM_FETCH_TIME
+		maxStreamMapSize = defaultMaxStreamMapSize
+	}
+	utils.LogToCyborg("info", fmt.Sprintf("Lookback window: %d ms, maxStreamMapSize: %d (accountId: %q)", lookbackWindowMs, maxStreamMapSize, accountID))
+}
+
 // TimestampTracker tracks last read timestamp per log group + stream (same structure as temp_cred).
 type TimestampTracker struct {
 	mu                 sync.RWMutex
@@ -43,14 +65,14 @@ func (t *TimestampTracker) UpdateLastReadTimestamp(logGroupName, streamName stri
 	t.mu.Lock()
 	defer t.mu.Unlock()
 	t.lastReadTimestamps[logGroupName+"|"+streamName] = timestamp
-	if len(t.lastReadTimestamps) > MAX_STREAM_MAP_SIZE {
+	if len(t.lastReadTimestamps) > maxStreamMapSize {
 		t.cleanupStaleEntries()
 	}
 }
 
 func (t *TimestampTracker) cleanupStaleEntries() {
-	cutoffTime := time.Now().UnixMilli() - LOG_STREAM_FETCH_TIME
-	utils.LogToCyborg("info", fmt.Sprintf("TimestampTracker cleanup started at: %d and cutoffTime: %d", time.Now().UnixMilli(), cutoffTime))
+	cutoffTime := time.Now().UnixMilli() - lookbackWindowMs
+	utils.LogToCyborg("info", fmt.Sprintf("TimestampTracker cleanup started at: %d and cutoffTime: %d. Total entries: %d", time.Now().UnixMilli(), cutoffTime, len(t.lastReadTimestamps)))
 	cleanedEntries := 0
 	for key, ts := range t.lastReadTimestamps {
 		if ts < cutoffTime {
@@ -69,7 +91,7 @@ func MonitorLogGroup(ctx context.Context, client *cloudwatchlogs.Client, logGrou
 		cycleStartTime := time.Now().UnixMilli()
 		utils.DebugLog("MonitorLogGroup() - Starting new monitoring cycle at: %d for logGroup: %s", cycleStartTime, logGroupName)
 
-		lookBackTime := cycleStartTime - LOG_STREAM_FETCH_TIME
+		lookBackTime := cycleStartTime - lookbackWindowMs
 		logStreams, err := FetchLogStreams(ctx, client, logGroupName, lookBackTime)
 		if err != nil {
 			utils.LogToCyborg("error", "Error fetching log streams: "+err.Error())
@@ -88,8 +110,8 @@ func MonitorLogGroup(ctx context.Context, client *cloudwatchlogs.Client, logGrou
 				lastEventTs = fmt.Sprint(*stream.LastEventTimestamp)
 			}
 			utils.LogToCyborg("info", fmt.Sprintf("Discovered new log stream: %s (lastEventTimestamp: %s); logGroup: %s", streamName, lastEventTs, logGroupName))
-			// Limit to last 1 hour for new streams to avoid processing months of old data.
-			lastReadTime = cycleStartTime - 1*time.Hour.Milliseconds()
+			// Limit new streams to the lookback window to avoid processing months of old data.
+			lastReadTime = cycleStartTime - lookbackWindowMs
 		}
 
 		utils.DebugLog("MonitorLogGroup() - Processing stream: %s, reading from: %d for logGroup: %s", streamName, lastReadTime, logGroupName)
diff --git a/main.go b/main.go
index e198dde..3086ee5 100644
--- a/main.go
+++ b/main.go
@@ -75,6 +75,9 @@ func main() {
 		discoverOpenAPISpec = false
 	}
 
+	// Configurable lookback window based on accountId (parsed from the JWT in SetLookbackWindow).
+	logprocesser.SetLookbackWindow(databaseAbstractorToken)
+
 	for _, lgName := range logGroupNames {
 		go func(name string) {
 			utils.LogToCyborg("info", "Starting CloudWatch monitor for log group: "+name)
diff --git a/trafficUtil/utils/jwt.go b/trafficUtil/utils/jwt.go
new file mode 100644
index 0000000..df5a898
--- /dev/null
+++ b/trafficUtil/utils/jwt.go
@@ -0,0 +1,47 @@
+package utils
+
+import (
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"strings"
+)
+
+// AccountIDFromToken parses the JWT (e.g. DATABASE_ABSTRACTOR_TOKEN), decodes the payload
+// without verification, and returns the accountId claim. Tries "accountId" and "account_id".
+// Returns an empty string if the token is empty, malformed, or the claim is missing.
+func AccountIDFromToken(token string) string {
+	if token == "" {
+		return ""
+	}
+	parts := strings.Split(token, ".")
+	if len(parts) < 2 {
+		return ""
+	}
+	payload, err := base64.RawURLEncoding.DecodeString(parts[1])
+	if err != nil {
+		return ""
+	}
+	var claims map[string]interface{}
+	if err := json.Unmarshal(payload, &claims); err != nil {
+		return ""
+	}
+	if id, ok := claims["accountId"]; ok {
+		return accountIDClaim(id)
+	}
+	if id, ok := claims["account_id"]; ok {
+		return accountIDClaim(id)
+	}
+	return ""
+}
+
+// accountIDClaim normalizes a claim value to a string; numeric claims decode as float64.
+func accountIDClaim(v interface{}) string {
+	switch s := v.(type) {
+	case string:
+		return s
+	case float64:
+		return fmt.Sprintf("%.0f", s)
+	default:
+		return ""
+	}
+}