diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index bf4df63e..99a8de03 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -36,7 +36,16 @@ on: options: - legacy - ebpf - default: legacy + default: legacy + Architecture: + description: "The target architecture(s) for the Docker image." + required: true + type: choice + options: + - both + - arm64 + - amd64 + default: both # A workflow run is made up of one or more jobs that can run sequentially or in parallel jobs: @@ -82,11 +91,19 @@ jobs: ECR_REPOSITORY: akto-api-security REGISTRY_ALIAS: p7q3h0z2 IMAGE_TAG: ${{ github.event.inputs.Tag }} + ARCH_INPUT: ${{ github.event.inputs.Architecture }} run: | # Build a docker container and push it to DockerHub docker buildx create --use - echo "Building and Pushing image to ECR..." - docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG . --push + if [ "$ARCH_INPUT" == "arm64" ]; then + PLATFORM="linux/arm64/v8" + elif [ "$ARCH_INPUT" == "amd64" ]; then + PLATFORM="linux/amd64" + else + PLATFORM="linux/arm64/v8,linux/amd64" + fi + echo "Building and Pushing image to ECR with platform: $PLATFORM" + docker buildx build --platform $PLATFORM -t $ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG . --push echo "::set-output name=image::$ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG" - name: Build, tag, and push the image to Amazon ECR -ebpf @@ -97,11 +114,19 @@ jobs: ECR_REPOSITORY: akto-api-security REGISTRY_ALIAS: p7q3h0z2 IMAGE_TAG: ${{ github.event.inputs.EbpfTag }} + ARCH_INPUT: ${{ github.event.inputs.Architecture }} run: | # Build a docker container and push it to DockerHub docker buildx create --use - echo "Building and Pushing image to ECR..." - docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG -f Dockerfile.eBPF . --push + if [ "$ARCH_INPUT" == "arm64" ]; then + PLATFORM="linux/arm64/v8" + elif [ "$ARCH_INPUT" == "amd64" ]; then + PLATFORM="linux/amd64" + else + PLATFORM="linux/arm64/v8,linux/amd64" + fi + echo "Building and Pushing image to ECR with platform: $PLATFORM" + docker buildx build --platform $PLATFORM -t $ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG -f Dockerfile.eBPF . --push echo "::set-output name=image::$ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG" build-docker: @@ -136,11 +161,19 @@ jobs: env: ECR_REGISTRY: aktosecurity IMAGE_TAG: ${{ github.event.inputs.Tag }} + ARCH_INPUT: ${{ github.event.inputs.Architecture }} run: | # Build a docker container and push it to DockerHub docker buildx create --use - echo "Building and Pushing image to DockerHub..." - docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG . --push + if [ "$ARCH_INPUT" == "arm64" ]; then + PLATFORM="linux/arm64/v8" + elif [ "$ARCH_INPUT" == "amd64" ]; then + PLATFORM="linux/amd64" + else + PLATFORM="linux/arm64/v8,linux/amd64" + fi + echo "Building and Pushing image to DockerHub with platform: $PLATFORM" + docker buildx build --platform $PLATFORM -t $ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG . --push echo "::set-output name=image::$ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG" - name: Build, tag, and push the image to DockerHub - ebpf @@ -149,9 +182,17 @@ jobs: env: ECR_REGISTRY: aktosecurity IMAGE_TAG: ${{ github.event.inputs.EbpfTag }} + ARCH_INPUT: ${{ github.event.inputs.Architecture }} run: | # Build a docker container and push it to DockerHub docker buildx create --use - echo "Building and Pushing image to DockerHub..." - docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG -f Dockerfile.eBPF . --push + if [ "$ARCH_INPUT" == "arm64" ]; then + PLATFORM="linux/arm64/v8" + elif [ "$ARCH_INPUT" == "amd64" ]; then + PLATFORM="linux/amd64" + else + PLATFORM="linux/arm64/v8,linux/amd64" + fi + echo "Building and Pushing image to DockerHub with platform: $PLATFORM" + docker buildx build --platform $PLATFORM -t $ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG -f Dockerfile.eBPF . --push echo "::set-output name=image::$ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG" diff --git a/.gitignore b/.gitignore index 7c1c098f..b872b0c3 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ mirroring-api-logging .idea/ **/.vscode/ temp -**temp \ No newline at end of file +**temp +data-* \ No newline at end of file diff --git a/Dockerfile.eBPF b/Dockerfile.eBPF index 96884c74..aae6b73e 100644 --- a/Dockerfile.eBPF +++ b/Dockerfile.eBPF @@ -1,4 +1,4 @@ -FROM alpine:3.21 AS base +FROM alpine:3.22 AS base USER root RUN apk add bcc-tools bcc-dev bcc-doc linux-headers build-base diff --git a/ebpf-run.sh b/ebpf-run.sh index a85566c8..2042dc46 100644 --- a/ebpf-run.sh +++ b/ebpf-run.sh @@ -2,7 +2,10 @@ LOG_FILE="/tmp/dump.log" MAX_LOG_SIZE=${MAX_LOG_SIZE:-10485760} # Default to 10 MB if not set (10 MB = 10 * 1024 * 1024 bytes) -CHECK_INTERVAL=60 # Check interval in seconds +CHECK_INTERVAL=${CHECK_INTERVAL:-60} +CHECK_INTERVAL_MEM=${CHECK_INTERVAL_MEM:-10} # Check interval in seconds (configurable via env) +MEMORY_THRESHOLD=${MEMORY_THRESHOLD:-80} # Kill process at this % memory usage (configurable via env) +GOMEMLIMIT_PERCENT=${GOMEMLIMIT_PERCENT:-60} # GOMEMLIMIT as % of container memory limit (configurable via env) # Function to rotate the log file rotate_log() { @@ -14,6 +17,30 @@ rotate_log() { fi } +# Function to check memory usage and kill process if threshold exceeded +check_memory_and_kill() { + # Get current memory usage in bytes + if [ -f /sys/fs/cgroup/memory.current ]; then + # cgroup v2 + CURRENT_MEM=$(cat /sys/fs/cgroup/memory.current) + elif [ -f /sys/fs/cgroup/memory/memory.usage_in_bytes ]; then + # cgroup v1 + CURRENT_MEM=$(cat /sys/fs/cgroup/memory/memory.usage_in_bytes) + else + return + fi + + # Calculate percentage used + PERCENT_USED=$((CURRENT_MEM * 100 / MEM_LIMIT_BYTES)) + + echo "Memory usage: ${PERCENT_USED}% (${CURRENT_MEM} / ${MEM_LIMIT_BYTES} bytes)" + + if [ "$PERCENT_USED" -ge "$MEMORY_THRESHOLD" ]; then + echo "Memory threshold ${MEMORY_THRESHOLD}% exceeded (${PERCENT_USED}%), killing ebpf-logging process" + pkill -9 ebpf-logging + fi +} + # Start monitoring in the background if [[ "${ENABLE_LOGS}" == "false" ]]; then while true; do @@ -22,10 +49,52 @@ if [[ "${ENABLE_LOGS}" == "false" ]]; then done & fi +# 1. Check if MEM_LIMIT is provided as env variable +if [ -z "$MEM_LIMIT" ]; then + # Not provided, detect and read cgroup memory limits + if [ -f /sys/fs/cgroup/memory.max ]; then + # cgroup v2 + MEM_LIMIT_BYTES=$(cat /sys/fs/cgroup/memory.max) + elif [ -f /sys/fs/cgroup/memory/memory.limit_in_bytes ]; then + # cgroup v1 + MEM_LIMIT_BYTES=$(cat /sys/fs/cgroup/memory/memory.limit_in_bytes) + else + # Fallback to free -b (bytes) if cgroup file not found + echo "Neither cgroup v2 nor v1 memory file found, defaulting to free -m" + # Convert from kB to bytes + MEM_LIMIT_BYTES=$(free -b | awk '/Mem:/ {print $2}') + fi + + # 2. Handle edge cases: "max" means no strict limit or a very large limit + if [ "$MEM_LIMIT_BYTES" = "max" ]; then + # Arbitrary fallback (1 GiB in bytes here, but adjust as needed) + echo "Cgroup memory limit set to 'max', defaulting to free memory" + MEM_LIMIT_BYTES=$(free -b | awk '/Mem:/ {print $2}') + fi + + # 3. Convert the memory limit from bytes to MB (integer division) + MEM_LIMIT_MB=$((MEM_LIMIT_BYTES / 1024 / 1024)) +else + # MEM_LIMIT provided as env variable, treat as MB + echo "Using MEM_LIMIT from environment variable: ${MEM_LIMIT} MB" + MEM_LIMIT_MB=$MEM_LIMIT + # Convert MB to bytes for calculations + MEM_LIMIT_BYTES=$((MEM_LIMIT * 1024 * 1024)) +fi + +echo "Using container memory limit: ${MEM_LIMIT_MB} MB" + +# Set GOMEMLIMIT for the Go process +GOMEMLIMIT_MB=$((MEM_LIMIT_MB * GOMEMLIMIT_PERCENT / 100)) +export GOMEMLIMIT="${GOMEMLIMIT_MB}MiB" +echo "Setting GOMEMLIMIT to: ${GOMEMLIMIT} (${GOMEMLIMIT_PERCENT}% of ${MEM_LIMIT_MB} MB)" + +# Start memory monitoring in the background + while : do if [[ "${ENABLE_LOGS}" == "false" ]]; then - ./ebpf-logging >> "$LOG_FILE" 2>&1 + ./ebpf-logging >> "$LOG_FILE" 2>&1 else ./ebpf-logging fi diff --git a/ebpf/bpfwrapper/eventCallbacks.go b/ebpf/bpfwrapper/eventCallbacks.go index 67b04dfb..22082b28 100644 --- a/ebpf/bpfwrapper/eventCallbacks.go +++ b/ebpf/bpfwrapper/eventCallbacks.go @@ -178,6 +178,7 @@ func SocketDataEventCallback(inputChan chan []byte, connectionFactory *connectio "data", dataStr, "rc", event.Attr.ReadEventsCount, "wc", event.Attr.WriteEventsCount, - "ssl", event.Attr.Ssl) + "ssl", event.Attr.Ssl, + "bytesSent", bytesSent) } } diff --git a/ebpf/connections/factory.go b/ebpf/connections/factory.go index ac762cd8..e0136918 100644 --- a/ebpf/connections/factory.go +++ b/ebpf/connections/factory.go @@ -73,13 +73,15 @@ func convertToSingleByteArr(bufMap map[int][]byte) []byte { var ( disableEgress = false maxActiveConnections = 4096 - inactivityThreshold = 3 * time.Second + inactivityThreshold = 7 * time.Second // Value in MB bufferMemThreshold = 400 // unique id of daemonset uniqueDaemonsetId = uuid.New().String() trackerDataProcessInterval = 100 + + socketDataEventBytesThreshold = 10 * 1024 * 1024 ) func init() { @@ -89,6 +91,7 @@ func init() { utils.InitVar("TRAFFIC_BUFFER_THRESHOLD", &bufferMemThreshold) utils.InitVar("AKTO_MEM_SOFT_LIMIT", &bufferMemThreshold) utils.InitVar("TRACKER_DATA_PROCESS_INTERVAL", &trackerDataProcessInterval) + utils.InitVar("SOCKET_DATA_EVENT_BYTES_THRESHOLD", &socketDataEventBytesThreshold) } func ProcessTrackerData(connID structs.ConnID, tracker *Tracker, isComplete bool) { @@ -201,6 +204,25 @@ func (factory *Factory) CreateIfNotExists(connectionID structs.ConnID) { } } +// resetTimer stops, drains, and resets the timer to the given duration. +func resetTimer(t *time.Timer, d time.Duration) { + if !t.Stop() { + select { + case <-t.C: + default: + } + } + t.Reset(d) +} + +// Worker lifecycle: +// ACTIVE: +// - socket data/open -> reset inactivity timer on each event +// - socket close -> schedule delayed termination +// - inactivity timer -> terminate immediately +// +// TERMINATION is final and happens exactly once. +// either due to inactivityThreshold or due to socker close event func (factory *Factory) StartWorker(connectionID structs.ConnID, tracker *Tracker, ch chan interface{}) { go func(connID structs.ConnID, tracker *Tracker, ch chan interface{}) { @@ -216,9 +238,17 @@ func (factory *Factory) StartWorker(connectionID structs.ConnID, tracker *Tracke case *structs.SocketDataEvent: utils.LogProcessing("Received data event", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) tracker.AddDataEvent(*e) + if tracker.GetSentBytes() + tracker.GetRecvBytes() > uint64(socketDataEventBytesThreshold) { + utils.LogProcessing("Socket Data threshold data breached, processing current data", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) + factory.StopProcessing(connID) + return + }else{ + resetTimer(inactivityTimer, inactivityThreshold) + } case *structs.SocketOpenEvent: utils.LogProcessing("Received open event", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) tracker.AddOpenEvent(*e) + resetTimer(inactivityTimer, inactivityThreshold) case *structs.SocketCloseEvent: utils.LogProcessing("Received close event", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) tracker.AddCloseEvent(*e) @@ -230,15 +260,13 @@ func (factory *Factory) StartWorker(connectionID structs.ConnID, tracker *Tracke case <-delayedDeleteChan: utils.LogProcessing("Stopping go routine (delayed close)", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) - factory.ProcessAndStopWorker(connID) - factory.DeleteWorker(connID) + factory.StopProcessing(connID) return case <-inactivityTimer.C: // Eat the go routine after inactive threshold, process the tracker and stop the worker utils.LogProcessing("Inactivity threshold reached, marking connection as inactive and processing", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) - factory.ProcessAndStopWorker(connID) - factory.DeleteWorker(connID) + factory.StopProcessing(connID) utils.LogProcessing("Stopping go routine", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) return } @@ -246,6 +274,11 @@ func (factory *Factory) StartWorker(connectionID structs.ConnID, tracker *Tracke }(connectionID, tracker, ch) } +func (factory *Factory) StopProcessing(connID structs.ConnID){ + factory.ProcessAndStopWorker(connID) + factory.DeleteWorker(connID) +} + func (factory *Factory) ProcessAndStopWorker(connectionID structs.ConnID) { tracker, connExists := factory.getTracker(connectionID) if connExists { diff --git a/ebpf/connections/parser.go b/ebpf/connections/parser.go index b8046d1a..65b4a69d 100644 --- a/ebpf/connections/parser.go +++ b/ebpf/connections/parser.go @@ -5,5 +5,18 @@ import ( ) func tryReadFromBD(ip string, destIp string, receiveBuffer []byte, sentBuffer []byte, isComplete bool, direction int, id uint64, fd uint32, daemonsetIdentifier, hostName string) { - kafkaUtil.ParseAndProduce(receiveBuffer, sentBuffer, ip, destIp, 0, false, "MIRRORING", isComplete, direction, id, fd, daemonsetIdentifier, hostName) + ctx := kafkaUtil.TrafficContext{ + SourceIP: ip, + DestIP: destIp, + VxlanID: 0, + IsPending: false, + TrafficSource: "MIRRORING", + IsComplete: isComplete, + Direction: direction, + ProcessID: uint32(id >> 32), + SocketFD: fd, + DaemonsetIdentifier: daemonsetIdentifier, + HostName: hostName, + } + kafkaUtil.ParseAndProduce(receiveBuffer, sentBuffer, ctx) } diff --git a/ebpf/connections/tracker.go b/ebpf/connections/tracker.go index 0d8d8acb..3056d0be 100644 --- a/ebpf/connections/tracker.go +++ b/ebpf/connections/tracker.go @@ -108,3 +108,11 @@ func (conn *Tracker) AddCloseEvent(event structs.SocketCloseEvent) { conn.closeTimestamp = uint64(time.Now().UnixNano()) conn.lastAccessTimestamp = uint64(time.Now().UnixNano()) } + +func (conn *Tracker) GetSentBytes() uint64 { + return conn.sentBytes +} + +func (conn *Tracker) GetRecvBytes() uint64 { + return conn.recvBytes +} \ No newline at end of file diff --git a/ebpf/kernel/module.cc b/ebpf/kernel/module.cc index 4cee7779..a17a1251 100644 --- a/ebpf/kernel/module.cc +++ b/ebpf/kernel/module.cc @@ -7,6 +7,7 @@ #define socklen_t size_t #define MAX_MSG_SIZE 30720 +#define CHUNK_LIMIT CHUNK_SIZE_LIMIT #define LOOP_LIMIT 42 #define ARCH_TYPE 1 @@ -370,8 +371,35 @@ static __inline void process_syscall_data(struct pt_regs* ret, const struct data socket_data_event->conn_start_ns = conn_info->conn_start_ns; socket_data_event->port = conn_info->port; socket_data_event->ip = conn_info->ip; - socket_data_event->bytes_sent = is_send ? 1 : -1; socket_data_event->ssl = conn_info->ssl; + + int bytes_sent = 0; + size_t size_to_save = 0; + int i =0; + #pragma unroll + for (i = 0; i < CHUNK_LIMIT; ++i) { + const int bytes_remaining = bytes_exchanged - bytes_sent; + + if (bytes_remaining <= 0) { + break; + } + size_t current_size = (bytes_remaining > MAX_MSG_SIZE && (i != CHUNK_LIMIT - 1)) ? MAX_MSG_SIZE : bytes_remaining; + + size_t current_size_minus_1 = current_size - 1; + asm volatile("" : "+r"(current_size_minus_1) :); + current_size = current_size_minus_1 + 1; + + if (current_size > MAX_MSG_SIZE) { + current_size = MAX_MSG_SIZE; + } + + if (current_size_minus_1 < MAX_MSG_SIZE) { + bpf_probe_read(&socket_data_event->msg, current_size, args->buf + bytes_sent); + size_to_save = current_size; + } else if (current_size_minus_1 < 0x7fffffff) { + bpf_probe_read(&socket_data_event->msg, MAX_MSG_SIZE, args->buf + bytes_sent); + size_to_save = MAX_MSG_SIZE; + } if (is_send){ conn_info->writeEventsCount = (conn_info->writeEventsCount) + 1u; @@ -385,29 +413,18 @@ static __inline void process_syscall_data(struct pt_regs* ret, const struct data if(PRINT_BPF_LOGS){ bpf_trace_printk("pid: %d conn-id:%d, fd: %d", id, conn_info->id, conn_info->fd); + bpf_trace_printk("current_size: %d i:%d, bytes_exchanged: %d", current_size, i, bytes_exchanged); unsigned long tdfd = ((id & 0xffff) << 32) + conn_info->fd; bpf_trace_printk("rwc: %d tdfd: %llu data: %s", (socket_data_event->readEventsCount*10000 + socket_data_event->writeEventsCount%10000),tgid_fd, socket_data_event->msg); } - size_t bytes_exchanged_minus_1 = bytes_exchanged - 1; - asm volatile("" : "+r"(bytes_exchanged_minus_1) :); - bytes_exchanged = bytes_exchanged_minus_1 + 1; - - size_t size_to_save = 0; - if (bytes_exchanged_minus_1 < MAX_MSG_SIZE) { - bpf_probe_read(&socket_data_event->msg, bytes_exchanged, args->buf); - size_to_save = bytes_exchanged; - socket_data_event->msg[size_to_save] = '\\0'; - } else if (bytes_exchanged_minus_1 < 0x7fffffff) { - bpf_probe_read(&socket_data_event->msg, MAX_MSG_SIZE, args->buf); - size_to_save = MAX_MSG_SIZE; - } - - + socket_data_event->bytes_sent = is_send ? 1 : -1; socket_data_event->bytes_sent *= size_to_save; - socket_data_events.perf_submit(ret, socket_data_event, sizeof(struct socket_data_event_t) - MAX_MSG_SIZE + size_to_save); + bytes_sent += current_size; + } + } static __inline void process_syscall_data_vecs(struct pt_regs* ret, struct data_args_t* args, u64 id, bool is_send){ diff --git a/ebpf/main.go b/ebpf/main.go index 7fbf6d67..8a2715cf 100644 --- a/ebpf/main.go +++ b/ebpf/main.go @@ -32,6 +32,12 @@ import ( var source string = "" +func replaceBpfChunkSizeMacros() { + chunkSizeLimit := 4 + trafficUtils.InitVar("BPF_CHUNK_SIZE_LIMIT", &chunkSizeLimit) + source = strings.Replace(source, "CHUNK_SIZE_LIMIT", strconv.Itoa(chunkSizeLimit), -1) +} + func replaceBpfLogsMacros() { printBpfLogsEnv := os.Getenv("PRINT_BPF_LOGS") @@ -92,6 +98,7 @@ func run() { source = string(byteString) replaceBpfLogsMacros() + replaceBpfChunkSizeMacros() replaceMaxConnectionMapSize() replaceArchType() @@ -226,7 +233,7 @@ func run() { slog.Info("Stopping pod watcher") close(stopCh) } - + slog.Info("signaled to terminate") } diff --git a/ebpf/uprobeBuilder/process/processFactory.go b/ebpf/uprobeBuilder/process/processFactory.go index c83d73bb..39b26286 100644 --- a/ebpf/uprobeBuilder/process/processFactory.go +++ b/ebpf/uprobeBuilder/process/processFactory.go @@ -118,6 +118,10 @@ func (processFactory *ProcessFactory) AddNewProcessesToProbe(bpfModule *bcc.Modu // openssl probes here are being attached on dynamically linked SSL libraries only. attached, err := ssl.TryOpensslProbes(libraries, bpfModule) + if len(containers) == 0 { + containers = append(containers, "unknown") + } + if attached { p := Process{ pid: pid, @@ -158,9 +162,7 @@ func (processFactory *ProcessFactory) AddNewProcessesToProbe(bpfModule *bcc.Modu } else if err != nil { slog.Error("Node probing error", "pid", pid, "error", err) } - processFactory.unattachedProcess[pid] = true - } } } diff --git a/main.go b/main.go index aff7d276..ebc68d17 100644 --- a/main.go +++ b/main.go @@ -158,10 +158,20 @@ func (s *myStream) ReassemblyComplete() { } func tryReadFromBD(bd *bidi, isPending bool) { - - kafkaUtil.ParseAndProduce(bd.a.bytes, bd.b.bytes, - bd.key.net.Src().String(), bd.key.net.Dst().String(), bd.vxlanID, isPending, bd.source, true, 1, 0, 0, "0") - + ctx := kafkaUtil.TrafficContext{ + SourceIP: bd.key.net.Src().String(), + DestIP: bd.key.net.Dst().String(), + VxlanID: bd.vxlanID, + IsPending: isPending, + TrafficSource: bd.source, + IsComplete: true, + Direction: 1, + ProcessID: 0, + SocketFD: 0, + DaemonsetIdentifier: "0", + HostName: "", + } + kafkaUtil.ParseAndProduce(bd.a.bytes, bd.b.bytes, ctx) } // maybeFinish will wait until both directions are complete, then print out diff --git a/trafficUtil/kafkaUtil/kafka.go b/trafficUtil/kafkaUtil/kafka.go index fdecb1dd..0dac65f5 100644 --- a/trafficUtil/kafkaUtil/kafka.go +++ b/trafficUtil/kafkaUtil/kafka.go @@ -7,20 +7,25 @@ import ( "encoding/json" "fmt" "log/slog" + "math/rand" "os" "strconv" "strings" + "sync" "time" "github.com/akto-api-security/mirroring-api-logging/trafficUtil/apiProcessor" trafficpb "github.com/akto-api-security/mirroring-api-logging/trafficUtil/protobuf/traffic_payload" "github.com/akto-api-security/mirroring-api-logging/trafficUtil/utils" + "github.com/google/uuid" "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" "google.golang.org/protobuf/proto" ) var kafkaWriter *kafka.Writer +var kafkaWriterMutex sync.RWMutex var KafkaErrMsgCount = 0 var KafkaErrMsgEpoch = time.Now() var BytesInThreshold = 500 * 1024 * 1024 @@ -29,12 +34,29 @@ var useTLS = false var InsecureSkipVerify = true var tlsCACertPath = "./ca.crt" +var isAuthImplemented = false +var kafkaUsername = "" +var kafkaPassword = "" + +var kafkaErrorThreshold = 500 +var kafkaReconnectIntervalMinutes = -1 +var heartbeatIntervalSeconds = 60 +var uniqueDaemonsetId = uuid.New().String() +var moduleType = "TRAFFIC_COLLECTOR" + func init() { utils.InitVar("USE_TLS", &useTLS) utils.InitVar("INSECURE_SKIP_VERIFY", &InsecureSkipVerify) utils.InitVar("TLS_CA_CERT_PATH", &tlsCACertPath) + utils.InitVar("IS_AUTH_IMPLEMENTED", &isAuthImplemented) + utils.InitVar("KAFKA_USERNAME", &kafkaUsername) + utils.InitVar("KAFKA_PASSWORD", &kafkaPassword) + + utils.InitVar("KAFKA_ERROR_THRESHOLD", &kafkaErrorThreshold) + utils.InitVar("KAFKA_RECONNECT_INTERVAL_MINUTES", &kafkaReconnectIntervalMinutes) + utils.InitVar("KAFKA_HEARTBEAT_INTERVAL_SECONDS", &heartbeatIntervalSeconds) } func InitKafka() { @@ -63,19 +85,20 @@ func InitKafka() { kafka_batch_size, e := strconv.Atoi(os.Getenv("AKTO_TRAFFIC_BATCH_SIZE")) if e != nil { - utils.PrintLog("AKTO_TRAFFIC_BATCH_SIZE should be valid integer") - return + kafka_batch_size = 100 } kafka_batch_time_secs, e := strconv.Atoi(os.Getenv("AKTO_TRAFFIC_BATCH_TIME_SECS")) if e != nil { - utils.PrintLog("AKTO_TRAFFIC_BATCH_TIME_SECS should be valid integer") - return + kafka_batch_time_secs = 10 } kafka_batch_time_secs_duration := time.Duration(kafka_batch_time_secs) for { + kafkaWriterMutex.Lock() kafkaWriter = getKafkaWriter(kafka_url, kafka_batch_size, kafka_batch_time_secs_duration*time.Second) + kafkaWriterMutex.Unlock() + utils.LogMemoryStats() utils.PrintLog("logging kafka stats before pushing message") LogKafkaStats() @@ -85,16 +108,28 @@ func InitKafka() { out, _ := json.Marshal(value) ctx := context.Background() - err := ProduceStr(ctx, string(out), "testKafkaConnection", "testKafkaConnectionHost") + err := ProduceStr(ctx, string(out), "testKafkaConnection", "testKafkaConnectionHost", "") utils.PrintLog("logging kafka stats post pushing message") LogKafkaStats() if err != nil { slog.Error("error establishing connection with kafka, sending message failed, retrying in 2 seconds", "error", err) + kafkaWriterMutex.Lock() kafkaWriter.Close() + kafkaWriterMutex.Unlock() time.Sleep(time.Second * 2) } else { utils.PrintLog("connection establishing with kafka successfully") + kafkaWriterMutex.Lock() kafkaWriter.Completion = kafkaCompletion() + kafkaWriterMutex.Unlock() + + // Start periodic reconnection routine + go periodicKafkaReconnect(kafka_url, kafka_batch_size, kafka_batch_time_secs_duration*time.Second) + slog.Info("Started Kafka periodic reconnection routine", "interval_minutes", kafkaReconnectIntervalMinutes) + + // Start heartbeat routine + go sendKafkaHeartbeat() + slog.Info("Started Kafka heartbeat routine", "interval_seconds", heartbeatIntervalSeconds) break } } @@ -105,17 +140,138 @@ func kafkaCompletion() func(messages []kafka.Message, err error) { if err != nil { KafkaErrMsgCount += len(messages) slog.Error("kafka error message", "err", err, "count", KafkaErrMsgCount, "messagesCount", len(messages)) + + if KafkaErrMsgCount > kafkaErrorThreshold { + slog.Error("kafka error count exceeded threshold, restarting module", "count", KafkaErrMsgCount, "threshold", kafkaErrorThreshold) + os.Exit(1) + } } else { utils.PrintLog("kafka messages sent successfully", "messagesCount", len(messages)) } } } -func Close() { - kafkaWriter.Close() +func periodicKafkaReconnect(kafka_url string, kafka_batch_size int, kafka_batch_time_secs_duration time.Duration) { + if kafkaReconnectIntervalMinutes <= 0 { + slog.Info("Kafka reconnection disabled", "interval", kafkaReconnectIntervalMinutes) + return + } + + ticker := time.NewTicker(time.Duration(kafkaReconnectIntervalMinutes) * time.Minute) + defer ticker.Stop() + + for range ticker.C { + slog.Info("Starting periodic Kafka reconnection", "interval_minutes", kafkaReconnectIntervalMinutes) + + // Create new writer + newWriter := getKafkaWriter(kafka_url, kafka_batch_size, kafka_batch_time_secs_duration) + newWriter.Completion = kafkaCompletion() + + // Test the new connection + ctx := context.Background() + value := map[string]string{ + "testConnectionString": "periodicReconnect", + } + out, _ := json.Marshal(value) + testMsg := kafka.Message{ + Topic: "akto.api.logs", + Value: out, + } + + err := newWriter.WriteMessages(ctx, testMsg) + if err != nil { + slog.Error("Failed to test new Kafka connection during periodic reconnect, keeping old connection", "error", err) + newWriter.Close() + continue + } + + // Replace old writer with new one + kafkaWriterMutex.Lock() + oldWriter := kafkaWriter + kafkaWriter = newWriter + kafkaWriterMutex.Unlock() + + // Close old writer + if oldWriter != nil { + slog.Info("Closing old Kafka writer") + oldWriter.Close() + } + + slog.Info("Kafka reconnection completed successfully") + } +} + +func getDaemonPodName() string { + aktoAgentName := os.Getenv("AKTO_AGENT_NAME") + podName := os.Getenv("POD_NAME") + nodeName := os.Getenv("NODE_NAME") + + if aktoAgentName != "" { + return fmt.Sprintf("akto-tc:%s", aktoAgentName) + } + + if podName != "" && nodeName != "" { + return fmt.Sprintf("akto-tc:%s:%s", podName, nodeName) + } + + hostname := os.Getenv("HOSTNAME") + if hostname == "" { + hostname = fmt.Sprintf("daemon-%s", uniqueDaemonsetId[:8]) + } + return fmt.Sprintf("akto-tc:%s", hostname) +} + +func getImageVersion() string { + imageVersion := os.Getenv("AKTO_IMAGE_VERSION") + if imageVersion == "" { + imageVersion = "aktosecurity/mirror-api-logging:k8s-ebpf" + } + return imageVersion +} + +func sendKafkaHeartbeat() { + if heartbeatIntervalSeconds <= 0 { + slog.Info("Kafka heartbeat disabled", "interval", heartbeatIntervalSeconds) + return + } + + daemonPodName := getDaemonPodName() + imageVersion := getImageVersion() + + slog.Debug("Starting Kafka heartbeat routine", "interval_seconds", heartbeatIntervalSeconds, "daemonPod", daemonPodName, "daemonId", uniqueDaemonsetId) + ctx := context.Background() + + for { + jitter := time.Duration(1+rand.Intn(5)) * time.Second + sleepDuration := time.Duration(heartbeatIntervalSeconds)*time.Second + jitter + + slog.Debug("Sleeping before next heartbeat", "base_interval", heartbeatIntervalSeconds, "jitter_seconds", jitter.Seconds(), "total_sleep", sleepDuration.Seconds()) + time.Sleep(sleepDuration) + + // Send single heartbeat for this daemon + heartbeatMessage := map[string]string{ + "type": "heartbeat", + "daemonId": uniqueDaemonsetId, + "daemonPodName": daemonPodName, + "timestamp": fmt.Sprint(time.Now().Unix()), + "moduleType": moduleType, + "imageVersion": imageVersion, + } + + slog.Debug("Sending Kafka heartbeat", "daemonPod", daemonPodName, "imageVersion", imageVersion, "heartbeatMessage", heartbeatMessage) + err := ProduceHeartbeat(ctx, heartbeatMessage) + if err != nil { + slog.Error("Failed to send heartbeat to Kafka", "error", err) + } + } } func LogKafkaStats() { + kafkaWriterMutex.RLock() + defer kafkaWriterMutex.RUnlock() + if kafkaWriter == nil { + return + } stats := kafkaWriter.Stats() slog.Debug("Kafka Stats", "dials", stats.Dials, @@ -200,7 +356,11 @@ func Produce(ctx context.Context, value *trafficpb.HttpResponseParam) error { Value: protoBytes, } - err = kafkaWriter.WriteMessages(ctx, msg) + kafkaWriterMutex.RLock() + writer := kafkaWriter + kafkaWriterMutex.RUnlock() + + err = writer.WriteMessages(ctx, msg) if err != nil { slog.Error("Kafka write for threat failed", "topic", topic, "error", err) return err @@ -234,6 +394,31 @@ const ( LogTypeDebug = "DEBUG" ) +func ProduceHeartbeat(ctx context.Context, heartbeatData map[string]string) error { + out, err := json.Marshal(heartbeatData) + if err != nil { + return err + } + + topic := "akto.daemonset.producer.heartbeats" + msg := kafka.Message{ + Topic: topic, + Value: []byte(string(out)), + } + + kafkaWriterMutex.RLock() + writer := kafkaWriter + kafkaWriterMutex.RUnlock() + + err = writer.WriteMessages(ctx, msg) + + if err != nil { + slog.Error("ERROR while writing heartbeat messages", "topic", topic, "error", err) + return err + } + return nil +} + func ProduceLogs(ctx context.Context, message string, logType string) error { value := map[string]string{ "message": message, @@ -250,7 +435,11 @@ func ProduceLogs(ctx context.Context, message string, logType string) error { Value: []byte(string(out)), } - err := kafkaWriter.WriteMessages(ctx, msg) + kafkaWriterMutex.RLock() + writer := kafkaWriter + kafkaWriterMutex.RUnlock() + + err := writer.WriteMessages(ctx, msg) if err != nil { slog.Error("ERROR while writing messages", "topic", topic, "error", err) @@ -259,21 +448,43 @@ func ProduceLogs(ctx context.Context, message string, logType string) error { return nil } -func ProduceStr(ctx context.Context, message string, url, reqHost string) error { - // initialize the writer with the broker addresses, and the topic +// buildCollectionDetailsHeader creates the collection_details Kafka header +// Format: "host|method|url" +// Returns nil if any parameter is empty (skip header for incomplete messages) +func buildCollectionDetailsHeader(host, method, url string) []kafka.Header { + if host == "" || method == "" || url == "" { + return nil + } + + headerValue := fmt.Sprintf("%s|%s|%s", host, method, url) + return []kafka.Header{ + { + Key: "collection_details", + Value: []byte(headerValue), + }, + } +} + +func ProduceStr(ctx context.Context, message string, url, reqHost, method string) error { topic := "akto.api.logs" + msg := kafka.Message{ - Topic: topic, - Value: []byte(message), + Topic: topic, + Value: []byte(message), + Headers: buildCollectionDetailsHeader(reqHost, method, url), } - err := kafkaWriter.WriteMessages(ctx, msg) + kafkaWriterMutex.RLock() + writer := kafkaWriter + kafkaWriterMutex.RUnlock() + + err := writer.WriteMessages(ctx, msg) if err != nil { slog.Error("ERROR while writing messages", "topic", topic, "error", err) return err } - checkDebugUrlAndPrint(url, reqHost, "Kafka write successful: "+message) + checkDebugUrlAndPrint(url, reqHost, "Kafka write successful: ") return nil } @@ -308,11 +519,22 @@ func getKafkaWriter(kafkaURL string, batchSize int, batchTimeout time.Duration) Compression: kafka.Lz4, } + transport := &kafka.Transport{} + if useTLS { tlsConfig, _ := NewTLSConfig(tlsCACertPath) - kafkaWriter.Transport = &kafka.Transport{ - TLS: tlsConfig, + transport.TLS = tlsConfig + } + + // Add SASL authentication if enabled + if isAuthImplemented && kafkaUsername != "" && kafkaPassword != "" { + slog.Info("Configuring SASL plain authentication", "username", kafkaUsername) + transport.SASL = plain.Mechanism{ + Username: kafkaUsername, + Password: kafkaPassword, } } + + kafkaWriter.Transport = transport return &kafkaWriter } diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index b5acdfd0..45e198d0 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -21,6 +21,213 @@ import ( "github.com/akto-api-security/mirroring-api-logging/trafficUtil/utils" ) +// TrafficContext holds metadata about the captured traffic. +// This consolidates the many parameters previously passed to ParseAndProduce. +type TrafficContext struct { + SourceIP string + DestIP string + VxlanID int + IsPending bool + TrafficSource string + IsComplete bool + Direction int + ProcessID uint32 // Extracted from idfd >> 32 + SocketFD uint32 + DaemonsetIdentifier string + HostName string +} + +// ParsedTraffic holds the parsed HTTP requests and responses with their bodies. +type ParsedTraffic struct { + Requests []http.Request + RequestBodies []string + Responses []http.Response + ResponseBodies []string +} + +// HeaderSet holds HTTP headers in both protobuf and string map formats. +type HeaderSet struct { + Protobuf map[string]*trafficpb.StringList + StringMap map[string]string +} + +// ConvertedHeaders holds converted headers for both request and response. +type ConvertedHeaders struct { + Request HeaderSet + Response HeaderSet + DebugID string // x-debug-token value if present +} + +// PayloadInput contains the data needed to build traffic payloads. +type PayloadInput struct { + Request *http.Request + Response *http.Response + Headers ConvertedHeaders + RequestBody string + ResponseBody string + SourceIP string // IP after GetSourceIp processing + Context TrafficContext +} + +// buildProtobufPayload creates the protobuf payload for the threat client. +func buildProtobufPayload(input PayloadInput) *trafficpb.HttpResponseParam { + return &trafficpb.HttpResponseParam{ + Method: input.Request.Method, + Path: input.Request.URL.String(), + RequestHeaders: input.Headers.Request.Protobuf, + ResponseHeaders: input.Headers.Response.Protobuf, + RequestPayload: input.RequestBody, + ResponsePayload: input.ResponseBody, + Ip: input.SourceIP, + Time: int32(time.Now().Unix()), + StatusCode: int32(input.Response.StatusCode), + Type: string(input.Request.Proto), + Status: input.Response.Status, + AktoAccountId: fmt.Sprint(1000000), + AktoVxlanId: fmt.Sprint(input.Context.VxlanID), + IsPending: input.Context.IsPending, + Source: input.Context.TrafficSource, + } +} + +// buildJSONPayload creates the JSON map payload (legacy format, TODO: remove). +func buildJSONPayload(input PayloadInput) map[string]string { + reqHeaderString, _ := json.Marshal(input.Headers.Request.StringMap) + respHeaderString, _ := json.Marshal(input.Headers.Response.StringMap) + + return map[string]string{ + "path": input.Request.URL.String(), + "requestHeaders": string(reqHeaderString), + "responseHeaders": string(respHeaderString), + "method": input.Request.Method, + "requestPayload": input.RequestBody, + "responsePayload": input.ResponseBody, + "ip": input.Context.SourceIP, + "destIp": input.Context.DestIP, + "time": fmt.Sprint(time.Now().Unix()), + "statusCode": fmt.Sprint(input.Response.StatusCode), + "type": string(input.Request.Proto), + "status": input.Response.Status, + "akto_account_id": fmt.Sprint(1000000), + "akto_vxlan_id": fmt.Sprint(input.Context.VxlanID), + "is_pending": fmt.Sprint(input.Context.IsPending), + "source": input.Context.TrafficSource, + "direction": fmt.Sprint(input.Context.Direction), + "process_id": fmt.Sprint(input.Context.ProcessID), + "socket_id": fmt.Sprint(input.Context.SocketFD), + "daemonset_id": fmt.Sprint(input.Context.DaemonsetIdentifier), + "enable_graph": fmt.Sprint(utils.EnableGraph), + } +} + +// resolvePodLabels resolves pod labels for inbound traffic and adds them to the value map. +func resolvePodLabels(value map[string]string, ctx TrafficContext, url, host string) { + + if PodInformerInstance == nil { + checkDebugUrlAndPrint(url, host, "Pod labels not resolved, PodInformerInstance is nil") + return + } + + if ctx.Direction == utils.DirectionOutbound { + checkDebugUrlAndPrint(url, host, fmt.Sprintf("Pod labels not resolved for outbound request, podName: %s, direction: %v", ctx.HostName, ctx.Direction)) + return + } + + processName := PodInformerInstance.GetProcessNameByProcessId(int32(ctx.ProcessID)) + if strings.Contains(processName, "envoy") { + checkDebugUrlAndPrint(url, host, fmt.Sprintf("Pod labels not resolved for envoy request, podName: %s, direction: %v", ctx.HostName, ctx.Direction)) + return + } + + if ctx.HostName == "" { + checkDebugUrlAndPrint(url, host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(ctx.ProcessID)) + slog.Debug("Failed to resolve pod name, hostName is empty for ", "processId", ctx.ProcessID, "hostName", ctx.HostName) + return + } + + podLabels, err := PodInformerInstance.ResolvePodLabels(ctx.HostName, url, host) + if err != nil { + slog.Error("Failed to resolve pod labels", "hostName", ctx.HostName, "error", err) + checkDebugUrlAndPrint(url, host, "Error resolving pod labels "+ctx.HostName) + return + } + + value["tag"] = podLabels + checkDebugUrlAndPrint(url, host, "Pod labels found in ParseAndProduce, podLabels found "+fmt.Sprint(podLabels)+" for hostName "+ctx.HostName) + slog.Debug("Pod labels", "podName", ctx.HostName, "labels", podLabels) +} + +// convertHeaders converts HTTP headers to both protobuf and string map formats in a single pass. +func convertHeaders(req *http.Request, resp *http.Response, shouldPrint bool) ConvertedHeaders { + result := ConvertedHeaders{ + Request: HeaderSet{ + Protobuf: make(map[string]*trafficpb.StringList), + StringMap: make(map[string]string), + }, + Response: HeaderSet{ + Protobuf: make(map[string]*trafficpb.StringList), + StringMap: make(map[string]string), + }, + } + + // Convert request headers + for name, values := range req.Header { + for _, value := range values { + result.Request.Protobuf[strings.ToLower(name)] = &trafficpb.StringList{ + Values: []string{value}, + } + if shouldPrint && strings.EqualFold(name, "x-debug-token") { + result.DebugID = value + } + result.Request.StringMap[name] = value + } + } + result.Request.Protobuf["host"] = &trafficpb.StringList{Values: []string{req.Host}} + result.Request.StringMap["host"] = req.Host + + // Convert response headers + for name, values := range resp.Header { + for _, value := range values { + result.Response.Protobuf[strings.ToLower(name)] = &trafficpb.StringList{ + Values: []string{value}, + } + result.Response.StringMap[name] = value + } + } + + return result +} + +// shouldProcessRequest checks all filter conditions and returns true if the request should be processed. +func shouldProcessRequest(req *http.Request, reqHeaders map[string]string, ctx TrafficContext) bool { + if !IsValidMethod(req.Method) { + return false + } + + if !utils.PassesFilter(trafficMetrics.FilterHeaderValueMap, reqHeaders) { + return false + } + + if utils.IgnoreIpTraffic && utils.CheckIfIp(req.Host) { + return false + } + + if utils.IgnoreCloudMetadataCalls && req.Host == "169.254.169.254" { + return false + } + + if utils.IgnoreEnvoyProxycalls && ctx.SourceIP == utils.EnvoyProxyIp && ctx.Direction == utils.DirectionOutbound { + slog.Debug("Ignoring outbound envoy proxy call", "sourceIp", ctx.SourceIP, "url", req.URL.String(), "host", req.Host) + return false + } + + if utils.FilterPacket(reqHeaders) { + return false + } + + return true +} + var ( goodRequests = 0 badRequests = 0 @@ -64,10 +271,6 @@ func init() { // Start ticker to read debug URLs from file every 30 seconds go func() { - if !utils.FileLoggingEnabled { - slog.Info("File logging is not enabled, skipping debug URL file watcher") - return - } ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { @@ -129,13 +332,13 @@ func checkDebugUrlAndPrint(url string, host string, message string) { for _, debugString := range DebugStrings { if strings.Contains(url, debugString) { ctx := context.Background() - logMsg := fmt.Sprintf("%s : %s", message, url) + logMsg := fmt.Sprintf("url: %s, host: %s, message: %s", url, host, message) utils.PrintLogDebug(logMsg) go ProduceLogs(ctx, logMsg, LogTypeInfo) break } else if strings.Contains(host, debugString) { ctx := context.Background() - logMsg := fmt.Sprintf("%s : %s", message, host) + logMsg := fmt.Sprintf("url: %s, host: %s, message: %s", url, host, message) utils.PrintLogDebug(logMsg) go ProduceLogs(ctx, logMsg, LogTypeInfo) break @@ -173,22 +376,13 @@ func IsValidMethod(method string) bool { return ok } -func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, destIp string, vxlanID int, isPending bool, - trafficSource string, isComplete bool, direction int, idfd uint64, fd uint32, daemonsetIdentifier string, hostName string) { - - if checkAndUpdateBandwidthProcessed(0) { - return - } - - shouldPrint := debugMode && strings.Contains(string(receiveBuffer), "x-debug-token") - if shouldPrint { - slog.Debug("ParseAndProduce", "receiveBuffer", string(receiveBuffer), "sentBuffer", string(sentBuffer)) - } - - reader := bufio.NewReader(bytes.NewReader(receiveBuffer)) - i := 0 +// parseHTTPTraffic parses HTTP requests and responses from raw byte buffers. +// Returns nil if parsing fails (errors are logged). +func parseHTTPTraffic(reqBuffer, respBuffer []byte, shouldPrint bool) *ParsedTraffic { + // Parse requests + reader := bufio.NewReader(bytes.NewReader(reqBuffer)) requests := []http.Request{} - requestsContent := []string{} + requestBodies := []string{} for { req, err := http.ReadRequest(reader) @@ -196,48 +390,48 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d break } else if err != nil { utils.PrintLog(fmt.Sprintf("HTTP-request error: %s \n", err)) - return + return nil } body, err := io.ReadAll(req.Body) req.Body.Close() if err != nil { utils.PrintLog(fmt.Sprintf("Got body err: %s\n", err)) - return + body = []byte{} } requests = append(requests, *req) - requestsContent = append(requestsContent, string(body)) - i++ + requestBodies = append(requestBodies, string(body)) } if shouldPrint { - slog.Debug("ParseAndProduce", "count", i) + slog.Debug("parseHTTPTraffic", "requestCount", len(requests)) } + if len(requests) == 0 { - return + return nil } - reader = bufio.NewReader(bytes.NewReader(sentBuffer)) - i = 0 - + // Parse responses + reader = bufio.NewReader(bytes.NewReader(respBuffer)) responses := []http.Response{} - responsesContent := []string{} + responseBodies := []string{} for { - resp, err := http.ReadResponse(reader, nil) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { utils.PrintLog(fmt.Sprintf("HTTP-Response error: %s\n", err)) - return + return nil } body, err := io.ReadAll(resp.Body) if err != nil { utils.PrintLog(fmt.Sprintf("Got err reading resp body: %s\n", err)) - return + body = []byte{} } + + // Handle gzip/deflate decompression encoding := resp.Header["Content-Encoding"] var r io.Reader r = bytes.NewBuffer(body) @@ -245,14 +439,14 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d r, err = gzip.NewReader(r) if err != nil { utils.PrintLog(fmt.Sprintf("HTTP-gunzip "+"Failed to gzip decode: %s", err)) - return + body = []byte{} } } if err == nil { body, err = io.ReadAll(r) if err != nil { utils.PrintLog(fmt.Sprintf("Failed to read decompressed body: %s\n", err)) - return + body = []byte{} } if _, ok := r.(*gzip.Reader); ok { r.(*gzip.Reader).Close() @@ -260,20 +454,47 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d } responses = append(responses, *resp) - responsesContent = append(responsesContent, string(body)) + responseBodies = append(responseBodies, string(body)) + } - i++ + if shouldPrint { + slog.Debug("parseHTTPTraffic", "responseCount", len(responses)) } + return &ParsedTraffic{ + Requests: requests, + RequestBodies: requestBodies, + Responses: responses, + ResponseBodies: responseBodies, + } +} + +func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, ctx TrafficContext) { + + if checkAndUpdateBandwidthProcessed(0) { + return + } + + shouldPrint := debugMode && strings.Contains(string(receiveBuffer), "x-debug-token") if shouldPrint { + slog.Debug("ParseAndProduce", "receiveBuffer", string(receiveBuffer), "sentBuffer", string(sentBuffer)) + } - slog.Debug("ParseAndProduce", "count", i) + parsed := parseHTTPTraffic(receiveBuffer, sentBuffer, shouldPrint) + if parsed == nil { + return } + + requests := parsed.Requests + requestsContent := parsed.RequestBodies + responses := parsed.Responses + responsesContent := parsed.ResponseBodies + if len(requests) != len(responses) { if shouldPrint { - slog.Debug("Len req-res mismatch", "lenRequests", len(requests), "lenResponses", len(responses), "lenReceiveBuffer", len(receiveBuffer), "lenSentBuffer", len(sentBuffer), "isComplete", isComplete) + slog.Debug("Len req-res mismatch", "lenRequests", len(requests), "lenResponses", len(responses), "lenReceiveBuffer", len(receiveBuffer), "lenSentBuffer", len(sentBuffer), "isComplete", ctx.IsComplete) } - if isComplete { + if ctx.IsComplete { return } correctLen := len(requests) @@ -283,192 +504,60 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d responses = responses[:correctLen] requests = requests[:correctLen] + responsesContent = responsesContent[:correctLen] + requestsContent = requestsContent[:correctLen] } - i = 0 - for { - if len(requests) < i+1 { - break - } + bgCtx := context.Background() + for i := 0; i < len(requests); i++ { req := &requests[i] resp := &responses[i] - - if !IsValidMethod(req.Method) { - continue - } - - id := "" - - // build req headers for threat client - reqHeader := make(map[string]*trafficpb.StringList) - for name, values := range req.Header { - // Loop over all values for the name. - for _, value := range values { - reqHeader[strings.ToLower(name)] = &trafficpb.StringList{ - Values: []string{value}, - } - } - } - ip := GetSourceIp(reqHeader, sourceIp) - - reqHeader["host"] = &trafficpb.StringList{ - Values: []string{req.Host}, - } - - reqHeaderStr := make(map[string]string) - for name, values := range req.Header { - // Loop over all values for the name. - for _, value := range values { - if shouldPrint && - strings.EqualFold(name, "x-debug-token") { - id = value - } - reqHeaderStr[name] = value - } - } - - reqHeaderStr["host"] = req.Host - - passes := utils.PassesFilter(trafficMetrics.FilterHeaderValueMap, reqHeaderStr) - //printLog("Req header: " + mapToString(reqHeaderStr)) - //printLog(fmt.Sprintf("passes %t", passes)) - - if !passes { - i++ - continue - } - - if utils.IgnoreIpTraffic && utils.CheckIfIp(req.Host) { - i++ - continue - } - - if utils.IgnoreCloudMetadataCalls && req.Host == "169.254.169.254" { - i++ - continue - } - - if utils.IgnoreEnvoyProxycalls && sourceIp == utils.EnvoyProxyIp && direction == utils.DirectionOutbound { - slog.Debug("Ignoring outbound envoy proxy call", "sourceIp", sourceIp, "url", req.URL.String(), "host", req.Host) - i++ - continue - } - - var skipPacket = utils.FilterPacket(reqHeaderStr) - - if skipPacket { - i++ - continue - } - - // build resp headers for threat client - respHeader := make(map[string]*trafficpb.StringList) - for name, values := range resp.Header { - // Loop over all values for the name. - for _, value := range values { - respHeader[strings.ToLower(name)] = &trafficpb.StringList{ - Values: []string{value}, - } - } - } - - // TODO: remove and use protobuf instead - respHeaderStr := make(map[string]string) - for name, values := range resp.Header { - // Loop over all values for the name. - for _, value := range values { - respHeaderStr[name] = value - } - } - + url := req.URL.String() checkDebugUrlAndPrint(url, req.Host, "URL,host found in ParseAndProduce") + + // Convert headers in a single pass (both protobuf and string map formats) + headers := convertHeaders(req, resp, shouldPrint) - // build kafka payload for threat client - payload := &trafficpb.HttpResponseParam{ - Method: req.Method, - Path: req.URL.String(), - RequestHeaders: reqHeader, - ResponseHeaders: respHeader, - RequestPayload: requestsContent[i], - ResponsePayload: responsesContent[i], - Ip: ip, - Time: int32(time.Now().Unix()), - StatusCode: int32(resp.StatusCode), - Type: string(req.Proto), - Status: resp.Status, - AktoAccountId: fmt.Sprint(1000000), - AktoVxlanId: fmt.Sprint(vxlanID), - IsPending: isPending, + // Check all filter conditions + if !shouldProcessRequest(req, headers.Request.StringMap, ctx) { + continue } - reqHeaderString, _ := json.Marshal(reqHeaderStr) - respHeaderString, _ := json.Marshal(respHeaderStr) - - // TODO: remove and use protobuf instead - value := map[string]string{ - "path": req.URL.String(), - "requestHeaders": string(reqHeaderString), - "responseHeaders": string(respHeaderString), - "method": req.Method, - "requestPayload": requestsContent[i], - "responsePayload": responsesContent[i], - "ip": sourceIp, - "destIp": destIp, - "time": fmt.Sprint(time.Now().Unix()), - "statusCode": fmt.Sprint(resp.StatusCode), - "type": string(req.Proto), - "status": resp.Status, - "akto_account_id": fmt.Sprint(1000000), - "akto_vxlan_id": fmt.Sprint(vxlanID), - "is_pending": fmt.Sprint(isPending), - "source": trafficSource, - "direction": fmt.Sprint(direction), - "process_id": fmt.Sprint(idfd >> 32), - "socket_id": fmt.Sprint(fd), - "daemonset_id": fmt.Sprint(daemonsetIdentifier), - "enable_graph": fmt.Sprint(utils.EnableGraph), + // Get source IP from headers + ip := GetSourceIp(headers.Request.Protobuf, ctx.SourceIP) + + // Build payloads + input := PayloadInput{ + Request: req, + Response: resp, + Headers: headers, + RequestBody: requestsContent[i], + ResponseBody: responsesContent[i], + SourceIP: ip, + Context: ctx, } + payload := buildProtobufPayload(input) + value := buildJSONPayload(input) - // Process id was captured from the eBPF program using bpf_get_current_pid_tgid() - // Shifting by 32 gives us the process id on host machine. - var pid = idfd >> 32 - log := fmt.Sprintf("pod direction log: direction=%v, host=%v, path=%v, sourceIp=%v, destIp=%v, socketId=%v, processId=%v, hostName=%v", - direction, - reqHeaderStr["host"], + // Debug logging + log := fmt.Sprintf("before resolving pod labels direction log: direction=%v, host=%v, path=%v, sourceIp=%v, destIp=%v, socketId=%v, processId=%v, hostName=%v", + ctx.Direction, + headers.Request.StringMap["host"], value["path"], - sourceIp, - destIp, + ctx.SourceIP, + ctx.DestIP, value["socket_id"], - pid, - hostName, + ctx.ProcessID, + ctx.HostName, ) - utils.PrintLog(log) checkDebugUrlAndPrint(url, req.Host, log) - if PodInformerInstance != nil && direction == utils.DirectionInbound { - - if hostName == "" { - checkDebugUrlAndPrint(url, req.Host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid)) - slog.Error("Failed to resolve pod name, hostName is empty for ", "processId", pid, "hostName", hostName) - } else { - podLabels, err := PodInformerInstance.ResolvePodLabels(hostName, url, req.Host) - if err != nil { - slog.Error("Failed to resolve pod labels", "hostName", hostName, "error", err) - checkDebugUrlAndPrint(url, req.Host, "Error resolving pod labels "+hostName) - } else { - value["tag"] = podLabels - checkDebugUrlAndPrint(url, req.Host, "Pod labels found in ParseAndProduce, podLabels found "+fmt.Sprint(podLabels)+" for hostName "+hostName) - slog.Debug("Pod labels", "podName", hostName, "labels", podLabels) - } - } - } else { - checkDebugUrlAndPrint(url, req.Host, "Pod labels not resolved, PodInformerInstance is nil or direction is not inbound, direction: "+fmt.Sprint(direction)) - } + // Resolve pod labels for inbound traffic + resolvePodLabels(value, ctx, url, req.Host) out, _ := json.Marshal(value) - ctx := context.Background() // calculating the size of outgoing bytes and requests (1) and saving it in outgoingCounterMap // this number is the closest (slightly higher) to the actual connection transfer bytes. @@ -478,36 +567,37 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d return } - hostString := reqHeaderStr["host"] - if utils.CheckIfIpHost(hostString) { - hostString = "ip-host" - } - oc := utils.GenerateOutgoingCounter(vxlanID, sourceIp, hostString) - trafficMetrics.SubmitOutgoingTrafficMetrics(oc, outgoingBytes) + sendMetrics(headers, ctx, outgoingBytes, shouldPrint, responsesContent, i, out) - if shouldPrint { - if strings.Contains(responsesContent[i], id) { - goodRequests++ - } else { - slog.Debug("req-resp.String()", "out", string(out)) - badRequests++ - } + if apiProcessor.CloudProcessorInstance != nil { + apiProcessor.CloudProcessorInstance.Produce(value) - if goodRequests%100 == 0 || badRequests%100 == 0 { - slog.Debug("Good requests", "count", goodRequests, "badRequests", badRequests) - } + } else { + // Produce to kafka with collection_details header + go ProduceStr(bgCtx, string(out), url, req.Host, req.Method) + go Produce(bgCtx, payload) } + } +} - if apiProcessor.CloudProcessorInstance != nil { - apiProcessor.CloudProcessorInstance.Produce(value) +func sendMetrics(headers ConvertedHeaders, ctx TrafficContext, outgoingBytes int, shouldPrint bool, responsesContent []string, i int, out []byte) { + hostString := headers.Request.StringMap["host"] + if utils.CheckIfIpHost(hostString) { + hostString = "ip-host" + } + oc := utils.GenerateOutgoingCounter(ctx.VxlanID, ctx.SourceIP, hostString) + trafficMetrics.SubmitOutgoingTrafficMetrics(oc, outgoingBytes) + if shouldPrint { + if strings.Contains(responsesContent[i], headers.DebugID) { + goodRequests++ } else { - // Produce to kafka - // TODO : remove and use protobuf instead - go ProduceStr(ctx, string(out), url, req.Host) - go Produce(ctx, payload) + slog.Debug("req-resp.String()", "out", string(out)) + badRequests++ } - i++ + if goodRequests%100 == 0 || badRequests%100 == 0 { + slog.Debug("Good requests", "count", goodRequests, "badRequests", badRequests) + } } } diff --git a/trafficUtil/kafkaUtil/parser_test.go b/trafficUtil/kafkaUtil/parser_test.go new file mode 100644 index 00000000..b673916f --- /dev/null +++ b/trafficUtil/kafkaUtil/parser_test.go @@ -0,0 +1,156 @@ +package kafkaUtil + +import ( + "compress/gzip" + "bytes" + "testing" +) + +func TestParseHTTPTraffic_ValidRequest(t *testing.T) { + reqBody := `{"cardId":12,"amount":9100.50,"bookingId":4123}` + req := []byte("POST /credit-cards/charge HTTP/1.1\r\nHost: credit-card.default.svc.cluster.local\r\nContent-Type: application/json\r\nContent-Length: 47\r\n\r\n" + reqBody) + + respBody := `{"status":"success"}` + resp := []byte("HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 20\r\n\r\n" + respBody) + + result := parseHTTPTraffic(req, resp, true) + + if result == nil { + t.Fatal("expected non-nil result for valid request") + } + if len(result.Requests) != 1 { + t.Errorf("expected 1 request, got %d", len(result.Requests)) + } + if len(result.Responses) != 1 { + t.Errorf("expected 1 response, got %d", len(result.Responses)) + } + if result.RequestBodies[0] != reqBody { + t.Errorf("unexpected request body: %s", result.RequestBodies[0]) + } + if result.ResponseBodies[0] != respBody { + t.Errorf("unexpected response body: %s", result.ResponseBodies[0]) + } +} + +func TestParseHTTPTraffic_BadGzip(t *testing.T) { + reqBody := `{"cardId":12}` + req := []byte("POST /credit-cards/charge HTTP/1.1\r\nHost: credit-card.default.svc.cluster.local\r\nContent-Type: application/json\r\nContent-Length: 13\r\n\r\n" + reqBody) + + // Response claims gzip but body is not gzip encoded + resp := []byte("HTTP/1.1 200 OK\r\nContent-Encoding: gzip\r\nContent-Type: application/json\r\nContent-Length: 13\r\n\r\nnot-gzip-data") + + result := parseHTTPTraffic(req, resp, true) + + if result == nil { + t.Fatal("should not return nil on gzip failure") + } + if len(result.Requests) != 1 { + t.Errorf("expected 1 request, got %d", len(result.Requests)) + } + if len(result.Responses) != 1 { + t.Errorf("expected 1 response, got %d", len(result.Responses)) + } + if result.ResponseBodies[0] != "" { + t.Errorf("expected empty body on gzip failure, got: %s", result.ResponseBodies[0]) + } +} + +func TestParseHTTPTraffic_TruncatedGzip(t *testing.T) { + req := []byte("GET /test HTTP/1.1\r\nHost: example.com\r\n\r\n") + + // Create valid gzip but truncate it + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + gz.Write([]byte("hello world this is a longer message")) + gz.Close() + truncatedGzip := buf.Bytes()[:10] // Truncate to first 10 bytes + + resp := append([]byte("HTTP/1.1 200 OK\r\nContent-Encoding: gzip\r\n\r\n"), truncatedGzip...) + + result := parseHTTPTraffic(req, resp, true) + + if result == nil { + t.Fatal("should not return nil on truncated gzip") + } + if result.ResponseBodies[0] != "" { + t.Errorf("expected empty body on truncated gzip, got: %s", result.ResponseBodies[0]) + } +} + +func TestParseHTTPTraffic_ValidGzip(t *testing.T) { + req := []byte("GET /test HTTP/1.1\r\nHost: example.com\r\n\r\n") + + // Create valid gzip response + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + gz.Write([]byte(`{"status":"ok"}`)) + gz.Close() + + resp := append([]byte("HTTP/1.1 200 OK\r\nContent-Encoding: gzip\r\nContent-Type: application/json\r\n\r\n"), buf.Bytes()...) + + result := parseHTTPTraffic(req, resp, true) + + if result == nil { + t.Fatal("expected non-nil result for valid gzip") + } + if result.ResponseBodies[0] != `{"status":"ok"}` { + t.Errorf("expected decompressed body, got: %s", result.ResponseBodies[0]) + } +} + +func TestParseHTTPTraffic_EmptyRequestBody(t *testing.T) { + req := []byte("GET /health HTTP/1.1\r\nHost: example.com\r\n\r\n") + resp := []byte("HTTP/1.1 200 OK\r\n\r\nOK") + + result := parseHTTPTraffic(req, resp, false) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.RequestBodies[0] != "" { + t.Errorf("expected empty request body for GET, got: %s", result.RequestBodies[0]) + } + if result.ResponseBodies[0] != "OK" { + t.Errorf("expected 'OK' response body, got: %s", result.ResponseBodies[0]) + } +} + +func TestParseHTTPTraffic_MultipleRequests(t *testing.T) { + req := []byte("GET /first HTTP/1.1\r\nHost: example.com\r\nContent-Length: 0\r\n\r\nGET /second HTTP/1.1\r\nHost: example.com\r\nContent-Length: 0\r\n\r\n") + resp := []byte("HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nfirstHTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nsecond") + + result := parseHTTPTraffic(req, resp, false) + + if result == nil { + t.Fatal("expected non-nil result") + } + if len(result.Requests) != 2 { + t.Errorf("expected 2 requests, got %d", len(result.Requests)) + } + if len(result.Responses) != 2 { + t.Errorf("expected 2 responses, got %d", len(result.Responses)) + } +} + +func TestParseHTTPTraffic_InvalidRequest(t *testing.T) { + req := []byte("not a valid http request") + resp := []byte("HTTP/1.1 200 OK\r\n\r\nOK") + + result := parseHTTPTraffic(req, resp, false) + + // Should return nil because request parsing fails completely + if result != nil { + t.Error("expected nil result for invalid request") + } +} + +func TestParseHTTPTraffic_NoRequests(t *testing.T) { + req := []byte("") + resp := []byte("HTTP/1.1 200 OK\r\n\r\nOK") + + result := parseHTTPTraffic(req, resp, false) + + if result != nil { + t.Error("expected nil result for empty request buffer") + } +} diff --git a/trafficUtil/kafkaUtil/podinformer.go b/trafficUtil/kafkaUtil/podinformer.go index 14a54c23..8341d125 100644 --- a/trafficUtil/kafkaUtil/podinformer.go +++ b/trafficUtil/kafkaUtil/podinformer.go @@ -42,14 +42,18 @@ func init() { utils.InitVar("AKTO_K8_METADATA_CAPTURE", &KubeInjectEnabled) } +type PidInfo struct { + HostName string + ProcessName string +} + type PodInformer struct { clientset *kubernetes.Clientset nodeName string podNameLabelsMap sync.Map // Maps pod names to their labels directly - pidHostNameMap map[int32]string + pidHostNameMap map[int32]PidInfo } - func SetupPodInformer() (chan struct{}, error) { if !KubeInjectEnabled { slog.Warn("AKTO_K8_METADATA_CAPTURE is not true, skipping PodInformer setup") @@ -116,21 +120,29 @@ func NewPodInformer() (*PodInformer, error) { clientset: clientset, nodeName: nodeName, podNameLabelsMap: sync.Map{}, - pidHostNameMap: make(map[int32]string), + pidHostNameMap: make(map[int32]PidInfo), }, nil } func (w *PodInformer) GetPodNameByProcessId(pid int32) string { - if hostName, ok := w.pidHostNameMap[pid]; ok { - return hostName + if info, ok := w.pidHostNameMap[pid]; ok { + return info.HostName + } + + return "" +} + +func (w *PodInformer) GetProcessNameByProcessId(pid int32) string { + if info, ok := w.pidHostNameMap[pid]; ok { + return info.ProcessName } - slog.Warn("Hostname not found for", "processId", pid) + slog.Debug("Process name not found for", "processId", pid) return "" } func (w *PodInformer) BuildPidHostNameMap() { - cmd := exec.Command("sh", "-c", "for dir in /host/proc/[0-9]*; do pid=$(echo \"$dir\" | cut -d'/' -f4); if [ -f \"$dir/environ\" ]; then hostname=$(strings \"$dir/environ\" | grep '^HOSTNAME=' | cut -d'=' -f2); if [ -n \"$hostname\" ]; then echo \"$pid $hostname\"; fi; fi; done") + cmd := exec.Command("sh", "-c", "for dir in /host/proc/[0-9]*; do pid=$(basename \"$dir\"); if [ -f $dir/environ ]; then hostname=$(strings $dir/environ | grep '^HOSTNAME=' | cut -d'=' -f2); if [ -n \"$hostname\" ]; then comm=$(cat $dir/comm 2>/dev/null); echo \"$pid $comm $hostname\"; fi; fi; done | sort -k3") output, err := cmd.Output() if err != nil { slog.Error("Failed to execute shell command", "error", err) @@ -140,10 +152,13 @@ func (w *PodInformer) BuildPidHostNameMap() { lines := strings.Split(string(output), "\n") for _, line := range lines { parts := strings.Fields(line) - if len(parts) == 2 { + if len(parts) == 3 { pid, err := strconv.Atoi(parts[0]) if err == nil { - w.pidHostNameMap[int32(pid)] = parts[1] + w.pidHostNameMap[int32(pid)] = PidInfo{ + ProcessName: parts[1], + HostName: parts[2], + } } } } @@ -152,7 +167,6 @@ func (w *PodInformer) BuildPidHostNameMap() { } func (w *PodInformer) ResolvePodLabels(podName string, url, reqHost string) (string, error) { - slog.Debug("Resolving Pod Name to labels", "podName", podName) checkDebugUrlAndPrint(url, reqHost, "Resolving Pod Name to labels for "+podName) // Step 1: Use the pod name as the key to find labels in podNameLabelsMap @@ -186,10 +200,10 @@ func (w *PodInformer) ResolvePodLabels(podName string, url, reqHost string) (str func (w *PodInformer) logPidHostNameMap() { slog.Warn("Logging PID to Hostname Map to file", "file", utils.GoPidLogFile) var builder strings.Builder - fmt.Fprintf(&builder, "PID\tHostname:\n") + fmt.Fprintf(&builder, "PID\tProcessName\tHostname:\n") - for pid, hostName := range w.pidHostNameMap { - fmt.Fprintf(&builder, "%d\t%s\n", pid, hostName) + for pid, info := range w.pidHostNameMap { + fmt.Fprintf(&builder, "%d\t%s\t%s\n", pid, info.ProcessName, info.HostName) } fmt.Fprintf(&builder, "-------Total PIDs tracked: %d----------\n", len(w.pidHostNameMap)) utils.LogToSpecificFile(utils.GoPidLogFile, builder.String()) @@ -332,6 +346,7 @@ func (w *PodInformer) handlePodUpdate(oldObj, newObj interface{}) { slog.Debug("Pod update:", "namespace", newPod.Namespace, "podName", newPod.Name) w.podNameLabelsMap.Delete(oldPod.Name) w.podNameLabelsMap.Store(newPod.Name, newPod.Labels) + w.BuildPidHostNameMap() } func (w *PodInformer) handlePodDelete(obj interface{}) { @@ -345,4 +360,4 @@ func (w *PodInformer) handlePodDelete(obj interface{}) { // Build the PID to Hostname map again to ensure it is up-to-date // TODO: Optimize this ? What's the rate of pod add events? w.BuildPidHostNameMap() -} \ No newline at end of file +} diff --git a/trafficUtil/utils/common.go b/trafficUtil/utils/common.go index 7d49d8a0..e69197da 100644 --- a/trafficUtil/utils/common.go +++ b/trafficUtil/utils/common.go @@ -37,7 +37,7 @@ var IgnoreIpTraffic = false var IgnoreCloudMetadataCalls = false var IgnoreEnvoyProxycalls = false var EnableGraph = true -var ThreatEnabled = false +var ThreatEnabled = true const EnvoyProxyIp = "127.0.0.6" @@ -76,6 +76,5 @@ func InitVar(envVarName string, targetVar interface{}) { slog.Warn("Unsupported type for targetVar", "type", v) } } else { - slog.Warn("Missing env value, using default value", "name", envVarName) } }