Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions logprocesser/logparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ func ParseAndProduce(entry LogEntry) {
"is_pending": fmt.Sprint(false),
"source": "MIRRORING",
"direction": fmt.Sprint(1),
"tag": "{\n \"service\": \"aws-api-gateway\"\n}",
}

// Debug: Print the Kafka message being sent
Expand Down
40 changes: 31 additions & 9 deletions logprocesser/logreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,34 @@ import (

var globalTimestampTracker = NewTimestampTracker()

// lookbackWindowMs is the time window (ms) for stream/event fetch and tracker cleanup. Set in SetLookbackWindow.
var lookbackWindowMs int64 = LOG_STREAM_FETCH_TIME

// maxStreamMapSize is set in SetLookbackWindow: 50000 for ACCOUNT_ID_FOR_6_DAYS, else defaultMaxStreamMapSize.
var maxStreamMapSize int = defaultMaxStreamMapSize

const (
POLL_DURATION = 180000 // 3 minute in milliseconds
LOG_STREAM_FETCH_TIME = 3600000 // 1 hour in milliseconds
MAX_STREAM_MAP_SIZE = 10000 // max entries in lastReadTimestamps map
POLL_DURATION = 180000 // 3 minute in milliseconds
LOG_STREAM_FETCH_TIME = 3600000 // 1 hour in milliseconds
LOOKBACK_6_DAYS_MS = 6 * 24 * 3600 * 1000 // 6 days in ms
defaultMaxStreamMapSize = 10000 // max entries when not 6-day account
maxStreamMapSizeFor6Days = 50000 // max entries for ACCOUNT_ID_FOR_6_DAYS
ACCOUNT_ID_FOR_6_DAYS = "1729478227"
)

// SetLookbackWindow configures the package-level lookback window and stream-map
// cap from the accountId embedded in the given JWT token. The special account
// ACCOUNT_ID_FOR_6_DAYS gets a 6-day window and a 50k-entry map; every other
// account gets the 1-hour window and the 10k default. Must be called before
// the log-group monitors start, since they read these globals.
func SetLookbackWindow(token string) {
	accountID := utils.AccountIDFromToken(token)

	// Start from the standard configuration, then widen for the special account.
	lookbackWindowMs = LOG_STREAM_FETCH_TIME
	maxStreamMapSize = defaultMaxStreamMapSize
	if accountID == ACCOUNT_ID_FOR_6_DAYS {
		lookbackWindowMs = LOOKBACK_6_DAYS_MS
		maxStreamMapSize = maxStreamMapSizeFor6Days
	}

	utils.LogToCyborg("info", fmt.Sprintf("Lookback window: %d ms, maxStreamMapSize: %d (accountId: %q)", lookbackWindowMs, maxStreamMapSize, accountID))
}

// TimestampTracker tracks last read timestamp per log group + stream (same structure as temp_cred).
type TimestampTracker struct {
mu sync.RWMutex
Expand All @@ -43,14 +65,14 @@ func (t *TimestampTracker) UpdateLastReadTimestamp(logGroupName, streamName stri
t.mu.Lock()
defer t.mu.Unlock()
t.lastReadTimestamps[logGroupName+"|"+streamName] = timestamp
if len(t.lastReadTimestamps) > MAX_STREAM_MAP_SIZE {
if len(t.lastReadTimestamps) > maxStreamMapSize {
t.cleanupStaleEntries()
}
}

func (t *TimestampTracker) cleanupStaleEntries() {
cutoffTime := time.Now().UnixMilli() - LOG_STREAM_FETCH_TIME
utils.LogToCyborg("info", fmt.Sprintf("TimestampTracker cleanup started at: %d and cutoffTime: %d", time.Now().UnixMilli(), cutoffTime))
cutoffTime := time.Now().UnixMilli() - lookbackWindowMs
utils.LogToCyborg("info", fmt.Sprintf("TimestampTracker cleanup started at: %d and cutoffTime: %d. Total entries: %d", time.Now().UnixMilli(), cutoffTime, len(t.lastReadTimestamps)))
cleanedEntries := 0
for key, ts := range t.lastReadTimestamps {
if ts < cutoffTime {
Expand All @@ -69,7 +91,7 @@ func MonitorLogGroup(ctx context.Context, client *cloudwatchlogs.Client, logGrou
cycleStartTime := time.Now().UnixMilli()
utils.DebugLog("MonitorLogGroup() - Starting new monitoring cycle at: %d for logGroup: %s", cycleStartTime, logGroupName)

lookBackTime := cycleStartTime - LOG_STREAM_FETCH_TIME
lookBackTime := cycleStartTime - lookbackWindowMs
logStreams, err := FetchLogStreams(ctx, client, logGroupName, lookBackTime)
if err != nil {
utils.LogToCyborg("error", "Error fetching log streams: "+err.Error())
Expand All @@ -88,8 +110,8 @@ func MonitorLogGroup(ctx context.Context, client *cloudwatchlogs.Client, logGrou
lastEventTs = fmt.Sprint(*stream.LastEventTimestamp)
}
utils.LogToCyborg("info", fmt.Sprintf("Discovered new log stream: %s (lastEventTimestamp: %s); logGroup: %s", streamName, lastEventTs, logGroupName))
// Limit to last 1 hour for new streams to avoid processing months of old data.
lastReadTime = cycleStartTime - 1*time.Hour.Milliseconds()
// Limit new streams to lookback window to avoid processing months of old data.
lastReadTime = cycleStartTime - lookbackWindowMs
}

utils.DebugLog("MonitorLogGroup() - Processing stream: %s, reading from: %d for logGroup: %s", streamName, lastReadTime, logGroupName)
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func main() {
discoverOpenAPISpec = false
}

// Configurable lookback window from accountId (parsed from JWT in SetLookbackWindow)
logprocesser.SetLookbackWindow(databaseAbstractorToken)

for _, lgName := range logGroupNames {
go func(name string) {
utils.LogToCyborg("info", "Starting CloudWatch monitor for log group: "+name)
Expand Down
44 changes: 44 additions & 0 deletions trafficUtil/utils/jwt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package utils

import (
"encoding/base64"
"encoding/json"
"fmt"
"strings"
)

// AccountIDFromToken parses the JWT (e.g. DATABASE_ABSTRACTOR_TOKEN), decodes the payload
// without verification, and returns the accountId claim. Tries "accountId" and "account_id".
// Returns empty string if token is empty, malformed, or claim is missing.
func AccountIDFromToken(token string) string {
	if token == "" {
		return ""
	}
	// A JWT is header.payload.signature; only the payload (claims) is needed.
	parts := strings.Split(token, ".")
	if len(parts) < 2 {
		return ""
	}
	payload, err := base64.RawURLEncoding.DecodeString(parts[1])
	if err != nil {
		// Some issuers emit padded base64url segments; retry with padding
		// before giving up (RawURLEncoding rejects '=' padding).
		payload, err = base64.URLEncoding.DecodeString(parts[1])
		if err != nil {
			return ""
		}
	}
	var claims map[string]interface{}
	if err := json.Unmarshal(payload, &claims); err != nil {
		return ""
	}
	// Bug fix: the contract above promises both spellings, but previously
	// only "accountId" was checked. Try each key in documented order.
	for _, key := range []string{"accountId", "account_id"} {
		if id, ok := claims[key]; ok {
			return accountIDClaim(id)
		}
	}
	return ""
}

// accountIDClaim normalizes a decoded JWT accountId claim value to a string.
// String claims pass through unchanged; numeric claims (JSON numbers decode
// to float64) are rendered with no fractional digits; any other type yields
// the empty string.
func accountIDClaim(v interface{}) string {
	if str, ok := v.(string); ok {
		return str
	}
	if num, ok := v.(float64); ok {
		return fmt.Sprintf("%.0f", num)
	}
	return ""
}