From 8f927c37d846c581e3f3750aaedfdcce36a25c11 Mon Sep 17 00:00:00 2001 From: notshivansh Date: Fri, 25 Jul 2025 11:56:20 +0530 Subject: [PATCH 01/44] add unknown container --- ebpf/uprobeBuilder/process/processFactory.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ebpf/uprobeBuilder/process/processFactory.go b/ebpf/uprobeBuilder/process/processFactory.go index c83d73bb..39b26286 100644 --- a/ebpf/uprobeBuilder/process/processFactory.go +++ b/ebpf/uprobeBuilder/process/processFactory.go @@ -118,6 +118,10 @@ func (processFactory *ProcessFactory) AddNewProcessesToProbe(bpfModule *bcc.Modu // openssl probes here are being attached on dynamically linked SSL libraries only. attached, err := ssl.TryOpensslProbes(libraries, bpfModule) + if len(containers) == 0 { + containers = append(containers, "unknown") + } + if attached { p := Process{ pid: pid, @@ -158,9 +162,7 @@ func (processFactory *ProcessFactory) AddNewProcessesToProbe(bpfModule *bcc.Modu } else if err != nil { slog.Error("Node probing error", "pid", pid, "error", err) } - processFactory.unattachedProcess[pid] = true - } } } From a6816ff2cb21868f4f999bf9f8f84f73d5aa8283 Mon Sep 17 00:00:00 2001 From: notshivansh Date: Wed, 13 Aug 2025 15:12:20 +0530 Subject: [PATCH 02/44] update version --- Dockerfile.eBPF | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile.eBPF b/Dockerfile.eBPF index 96884c74..aae6b73e 100644 --- a/Dockerfile.eBPF +++ b/Dockerfile.eBPF @@ -1,4 +1,4 @@ -FROM alpine:3.21 AS base +FROM alpine:3.22 AS base USER root RUN apk add bcc-tools bcc-dev bcc-doc linux-headers build-base From a2b09682dda93426d8d4e5cb7109e919f6b93971 Mon Sep 17 00:00:00 2001 From: notshivansh Date: Tue, 19 Aug 2025 00:50:15 +0530 Subject: [PATCH 03/44] add chunk encoding and debug logs --- .gitignore | 3 +- ebpf/kernel/module.cc | 50 ++++++++++++++++++++++----------- trafficUtil/kafkaUtil/kafka.go | 6 ++-- trafficUtil/kafkaUtil/parser.go | 15 +++++----- 
trafficUtil/utils/common.go | 1 - 5 files changed, 45 insertions(+), 30 deletions(-) diff --git a/.gitignore b/.gitignore index 7c1c098f..b872b0c3 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ mirroring-api-logging .idea/ **/.vscode/ temp -**temp \ No newline at end of file +**temp +data-* \ No newline at end of file diff --git a/ebpf/kernel/module.cc b/ebpf/kernel/module.cc index 4cee7779..cac8da8a 100644 --- a/ebpf/kernel/module.cc +++ b/ebpf/kernel/module.cc @@ -7,6 +7,7 @@ #define socklen_t size_t #define MAX_MSG_SIZE 30720 +#define CHUNK_LIMIT 4 #define LOOP_LIMIT 42 #define ARCH_TYPE 1 @@ -370,8 +371,35 @@ static __inline void process_syscall_data(struct pt_regs* ret, const struct data socket_data_event->conn_start_ns = conn_info->conn_start_ns; socket_data_event->port = conn_info->port; socket_data_event->ip = conn_info->ip; - socket_data_event->bytes_sent = is_send ? 1 : -1; socket_data_event->ssl = conn_info->ssl; + + int bytes_sent = 0; + size_t size_to_save = 0; + int i =0; + #pragma unroll + for (i = 0; i < CHUNK_LIMIT; ++i) { + const int bytes_remaining = bytes_exchanged - bytes_sent; + + if (bytes_remaining <= 0) { + break; + } + size_t current_size = (bytes_remaining > MAX_MSG_SIZE && (i != CHUNK_LIMIT - 1)) ? 
MAX_MSG_SIZE : bytes_remaining; + + if (current_size > MAX_MSG_SIZE) { + current_size = MAX_MSG_SIZE; + } + + size_t current_size_minus_1 = current_size - 1; + asm volatile("" : "+r"(current_size_minus_1) :); + current_size = current_size_minus_1 + 1; + + if (current_size_minus_1 < MAX_MSG_SIZE) { + bpf_probe_read(&socket_data_event->msg, current_size, args->buf + bytes_sent); + size_to_save = current_size; + } else if (current_size_minus_1 < 0x7fffffff) { + bpf_probe_read(&socket_data_event->msg, MAX_MSG_SIZE, args->buf + bytes_sent); + size_to_save = MAX_MSG_SIZE; + } if (is_send){ conn_info->writeEventsCount = (conn_info->writeEventsCount) + 1u; @@ -389,25 +417,13 @@ static __inline void process_syscall_data(struct pt_regs* ret, const struct data bpf_trace_printk("rwc: %d tdfd: %llu data: %s", (socket_data_event->readEventsCount*10000 + socket_data_event->writeEventsCount%10000),tgid_fd, socket_data_event->msg); } - size_t bytes_exchanged_minus_1 = bytes_exchanged - 1; - asm volatile("" : "+r"(bytes_exchanged_minus_1) :); - bytes_exchanged = bytes_exchanged_minus_1 + 1; - - size_t size_to_save = 0; - if (bytes_exchanged_minus_1 < MAX_MSG_SIZE) { - bpf_probe_read(&socket_data_event->msg, bytes_exchanged, args->buf); - size_to_save = bytes_exchanged; - socket_data_event->msg[size_to_save] = '\\0'; - } else if (bytes_exchanged_minus_1 < 0x7fffffff) { - bpf_probe_read(&socket_data_event->msg, MAX_MSG_SIZE, args->buf); - size_to_save = MAX_MSG_SIZE; - } - - + socket_data_event->bytes_sent = is_send ? 
1 : -1; socket_data_event->bytes_sent *= size_to_save; - socket_data_events.perf_submit(ret, socket_data_event, sizeof(struct socket_data_event_t) - MAX_MSG_SIZE + size_to_save); + bytes_sent += current_size; + } + } static __inline void process_syscall_data_vecs(struct pt_regs* ret, struct data_args_t* args, u64 id, bool is_send){ diff --git a/trafficUtil/kafkaUtil/kafka.go b/trafficUtil/kafkaUtil/kafka.go index fdecb1dd..93e872d7 100644 --- a/trafficUtil/kafkaUtil/kafka.go +++ b/trafficUtil/kafkaUtil/kafka.go @@ -63,14 +63,12 @@ func InitKafka() { kafka_batch_size, e := strconv.Atoi(os.Getenv("AKTO_TRAFFIC_BATCH_SIZE")) if e != nil { - utils.PrintLog("AKTO_TRAFFIC_BATCH_SIZE should be valid integer") - return + kafka_batch_size = 100 } kafka_batch_time_secs, e := strconv.Atoi(os.Getenv("AKTO_TRAFFIC_BATCH_TIME_SECS")) if e != nil { - utils.PrintLog("AKTO_TRAFFIC_BATCH_TIME_SECS should be valid integer") - return + kafka_batch_time_secs = 10 } kafka_batch_time_secs_duration := time.Duration(kafka_batch_time_secs) diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index b5acdfd0..f84be670 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -180,6 +180,8 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d return } + slog.Warn("ParseAndProduce", "receiveBuffer", string(receiveBuffer), "sentBuffer", string(sentBuffer)) + shouldPrint := debugMode && strings.Contains(string(receiveBuffer), "x-debug-token") if shouldPrint { slog.Debug("ParseAndProduce", "receiveBuffer", string(receiveBuffer), "sentBuffer", string(sentBuffer)) @@ -195,13 +197,13 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { - utils.PrintLog(fmt.Sprintf("HTTP-request error: %s \n", err)) + fmt.Sprintf("HTTP-request error: %s \n", err) return } body, err := io.ReadAll(req.Body) req.Body.Close() if err 
!= nil { - utils.PrintLog(fmt.Sprintf("Got body err: %s\n", err)) + fmt.Sprintf("Got body err: %s\n", err) return } @@ -229,13 +231,13 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { - utils.PrintLog(fmt.Sprintf("HTTP-Response error: %s\n", err)) + fmt.Sprintf("HTTP-Response error: %s\n", err) return } body, err := io.ReadAll(resp.Body) if err != nil { - utils.PrintLog(fmt.Sprintf("Got err reading resp body: %s\n", err)) + fmt.Sprintf("Got err reading resp body: %s\n", err) return } encoding := resp.Header["Content-Encoding"] @@ -244,14 +246,14 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d if len(encoding) > 0 && (encoding[0] == "gzip" || encoding[0] == "deflate") { r, err = gzip.NewReader(r) if err != nil { - utils.PrintLog(fmt.Sprintf("HTTP-gunzip "+"Failed to gzip decode: %s", err)) + fmt.Sprintf("HTTP-gunzip "+"Failed to gzip decode: %s", err) return } } if err == nil { body, err = io.ReadAll(r) if err != nil { - utils.PrintLog(fmt.Sprintf("Failed to read decompressed body: %s\n", err)) + fmt.Sprintf("Failed to read decompressed body: %s\n", err) return } if _, ok := r.(*gzip.Reader); ok { @@ -444,7 +446,6 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d pid, hostName, ) - utils.PrintLog(log) checkDebugUrlAndPrint(url, req.Host, log) if PodInformerInstance != nil && direction == utils.DirectionInbound { diff --git a/trafficUtil/utils/common.go b/trafficUtil/utils/common.go index 7d49d8a0..2d6af51a 100644 --- a/trafficUtil/utils/common.go +++ b/trafficUtil/utils/common.go @@ -76,6 +76,5 @@ func InitVar(envVarName string, targetVar interface{}) { slog.Warn("Unsupported type for targetVar", "type", v) } } else { - slog.Warn("Missing env value, using default value", "name", envVarName) } } From 99ae18af81a03f38d396776f1d6867145380eff3 Mon Sep 17 00:00:00 2001 From: notshivansh 
Date: Tue, 19 Aug 2025 01:10:52 +0530 Subject: [PATCH 04/44] attempt --- ebpf/kernel/module.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ebpf/kernel/module.cc b/ebpf/kernel/module.cc index cac8da8a..cb89fe69 100644 --- a/ebpf/kernel/module.cc +++ b/ebpf/kernel/module.cc @@ -384,15 +384,15 @@ static __inline void process_syscall_data(struct pt_regs* ret, const struct data break; } size_t current_size = (bytes_remaining > MAX_MSG_SIZE && (i != CHUNK_LIMIT - 1)) ? MAX_MSG_SIZE : bytes_remaining; - - if (current_size > MAX_MSG_SIZE) { - current_size = MAX_MSG_SIZE; - } size_t current_size_minus_1 = current_size - 1; asm volatile("" : "+r"(current_size_minus_1) :); current_size = current_size_minus_1 + 1; + if (current_size > MAX_MSG_SIZE) { + current_size = MAX_MSG_SIZE; + } + if (current_size_minus_1 < MAX_MSG_SIZE) { bpf_probe_read(&socket_data_event->msg, current_size, args->buf + bytes_sent); size_to_save = current_size; From 5c5892b41b304b7dd6c695d32f09911dbf4b7168 Mon Sep 17 00:00:00 2001 From: notshivansh Date: Wed, 20 Aug 2025 14:02:54 +0530 Subject: [PATCH 05/44] remove debug logs --- trafficUtil/kafkaUtil/parser.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index f84be670..7bd1753d 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -180,8 +180,6 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d return } - slog.Warn("ParseAndProduce", "receiveBuffer", string(receiveBuffer), "sentBuffer", string(sentBuffer)) - shouldPrint := debugMode && strings.Contains(string(receiveBuffer), "x-debug-token") if shouldPrint { slog.Debug("ParseAndProduce", "receiveBuffer", string(receiveBuffer), "sentBuffer", string(sentBuffer)) @@ -197,13 +195,13 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d if err == io.EOF || err == io.ErrUnexpectedEOF 
{ break } else if err != nil { - fmt.Sprintf("HTTP-request error: %s \n", err) + utils.PrintLog(fmt.Sprintf("HTTP-request error: %s \n", err)) return } body, err := io.ReadAll(req.Body) req.Body.Close() if err != nil { - fmt.Sprintf("Got body err: %s\n", err) + utils.PrintLog(fmt.Sprintf("Got body err: %s\n", err)) return } @@ -231,13 +229,13 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { - fmt.Sprintf("HTTP-Response error: %s\n", err) + utils.PrintLog(fmt.Sprintf("HTTP-Response error: %s\n", err)) return } body, err := io.ReadAll(resp.Body) if err != nil { - fmt.Sprintf("Got err reading resp body: %s\n", err) + utils.PrintLog(fmt.Sprintf("Got err reading resp body: %s\n", err)) return } encoding := resp.Header["Content-Encoding"] @@ -246,14 +244,14 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d if len(encoding) > 0 && (encoding[0] == "gzip" || encoding[0] == "deflate") { r, err = gzip.NewReader(r) if err != nil { - fmt.Sprintf("HTTP-gunzip "+"Failed to gzip decode: %s", err) + utils.PrintLog(fmt.Sprintf("HTTP-gunzip "+"Failed to gzip decode: %s", err)) return } } if err == nil { body, err = io.ReadAll(r) if err != nil { - fmt.Sprintf("Failed to read decompressed body: %s\n", err) + utils.PrintLog(fmt.Sprintf("Failed to read decompressed body: %s\n", err)) return } if _, ok := r.(*gzip.Reader); ok { From d853d48eeae1aa6a388541645093a167276d4735 Mon Sep 17 00:00:00 2001 From: notshivansh Date: Wed, 20 Aug 2025 15:21:08 +0530 Subject: [PATCH 06/44] make limit configurable --- ebpf/bpfwrapper/eventCallbacks.go | 3 ++- ebpf/kernel/module.cc | 3 ++- ebpf/main.go | 9 ++++++++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/ebpf/bpfwrapper/eventCallbacks.go b/ebpf/bpfwrapper/eventCallbacks.go index 67b04dfb..22082b28 100644 --- a/ebpf/bpfwrapper/eventCallbacks.go +++ b/ebpf/bpfwrapper/eventCallbacks.go @@ 
-178,6 +178,7 @@ func SocketDataEventCallback(inputChan chan []byte, connectionFactory *connectio "data", dataStr, "rc", event.Attr.ReadEventsCount, "wc", event.Attr.WriteEventsCount, - "ssl", event.Attr.Ssl) + "ssl", event.Attr.Ssl, + "bytesSent", bytesSent) } } diff --git a/ebpf/kernel/module.cc b/ebpf/kernel/module.cc index cb89fe69..a17a1251 100644 --- a/ebpf/kernel/module.cc +++ b/ebpf/kernel/module.cc @@ -7,7 +7,7 @@ #define socklen_t size_t #define MAX_MSG_SIZE 30720 -#define CHUNK_LIMIT 4 +#define CHUNK_LIMIT CHUNK_SIZE_LIMIT #define LOOP_LIMIT 42 #define ARCH_TYPE 1 @@ -413,6 +413,7 @@ static __inline void process_syscall_data(struct pt_regs* ret, const struct data if(PRINT_BPF_LOGS){ bpf_trace_printk("pid: %d conn-id:%d, fd: %d", id, conn_info->id, conn_info->fd); + bpf_trace_printk("current_size: %d i:%d, bytes_exchanged: %d", current_size, i, bytes_exchanged); unsigned long tdfd = ((id & 0xffff) << 32) + conn_info->fd; bpf_trace_printk("rwc: %d tdfd: %llu data: %s", (socket_data_event->readEventsCount*10000 + socket_data_event->writeEventsCount%10000),tgid_fd, socket_data_event->msg); } diff --git a/ebpf/main.go b/ebpf/main.go index 7fbf6d67..8a2715cf 100644 --- a/ebpf/main.go +++ b/ebpf/main.go @@ -32,6 +32,12 @@ import ( var source string = "" +func replaceBpfChunkSizeMacros() { + chunkSizeLimit := 4 + trafficUtils.InitVar("BPF_CHUNK_SIZE_LIMIT", &chunkSizeLimit) + source = strings.Replace(source, "CHUNK_SIZE_LIMIT", strconv.Itoa(chunkSizeLimit), -1) +} + func replaceBpfLogsMacros() { printBpfLogsEnv := os.Getenv("PRINT_BPF_LOGS") @@ -92,6 +98,7 @@ func run() { source = string(byteString) replaceBpfLogsMacros() + replaceBpfChunkSizeMacros() replaceMaxConnectionMapSize() replaceArchType() @@ -226,7 +233,7 @@ func run() { slog.Info("Stopping pod watcher") close(stopCh) } - + slog.Info("signaled to terminate") } From 99bd902c83092c7ecd4672af7c6ae2e365699051 Mon Sep 17 00:00:00 2001 From: notshivansh Date: Wed, 20 Aug 2025 15:25:37 +0530 Subject: [PATCH 
07/44] increase chunk limit --- ebpf/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ebpf/main.go b/ebpf/main.go index 8a2715cf..384e7187 100644 --- a/ebpf/main.go +++ b/ebpf/main.go @@ -33,7 +33,7 @@ import ( var source string = "" func replaceBpfChunkSizeMacros() { - chunkSizeLimit := 4 + chunkSizeLimit := 10 trafficUtils.InitVar("BPF_CHUNK_SIZE_LIMIT", &chunkSizeLimit) source = strings.Replace(source, "CHUNK_SIZE_LIMIT", strconv.Itoa(chunkSizeLimit), -1) } From 8535a4f21fc45307c78f233fcf9005e51f854dd7 Mon Sep 17 00:00:00 2001 From: notshivansh Date: Wed, 20 Aug 2025 17:56:37 +0530 Subject: [PATCH 08/44] reduce default limit --- ebpf/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ebpf/main.go b/ebpf/main.go index 384e7187..8a2715cf 100644 --- a/ebpf/main.go +++ b/ebpf/main.go @@ -33,7 +33,7 @@ import ( var source string = "" func replaceBpfChunkSizeMacros() { - chunkSizeLimit := 10 + chunkSizeLimit := 4 trafficUtils.InitVar("BPF_CHUNK_SIZE_LIMIT", &chunkSizeLimit) source = strings.Replace(source, "CHUNK_SIZE_LIMIT", strconv.Itoa(chunkSizeLimit), -1) } From b6629c4595333ae04e653502d1128e97c36a9ff9 Mon Sep 17 00:00:00 2001 From: Ark2307 Date: Mon, 1 Sep 2025 09:07:58 +0530 Subject: [PATCH 09/44] Adding sasl auth in kafka for agent --- trafficUtil/kafkaUtil/kafka.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/trafficUtil/kafkaUtil/kafka.go b/trafficUtil/kafkaUtil/kafka.go index 93e872d7..01092e77 100644 --- a/trafficUtil/kafkaUtil/kafka.go +++ b/trafficUtil/kafkaUtil/kafka.go @@ -29,12 +29,20 @@ var useTLS = false var InsecureSkipVerify = true var tlsCACertPath = "./ca.crt" +var isAuthImplemented = false +var kafkaUsername = "" +var kafkaPassword = "" + func init() { utils.InitVar("USE_TLS", &useTLS) utils.InitVar("INSECURE_SKIP_VERIFY", &InsecureSkipVerify) utils.InitVar("TLS_CA_CERT_PATH", &tlsCACertPath) + InitVar("IS_AUTH_IMPLEMENTED", &isAuthImplemented) + InitVar("KAFKA_USERNAME", 
&kafkaUsername) + InitVar("KAFKA_PASSWORD", &kafkaPassword) + } func InitKafka() { @@ -312,5 +320,12 @@ func getKafkaWriter(kafkaURL string, batchSize int, batchTimeout time.Duration) TLS: tlsConfig, } } + if isAuthImplemented && kafkaUsername != "" && kafkaPassword != "" { + slog.Info("Configuring SASL plain authentication", "username", kafkaUsername) + transport.SASL = plain.Mechanism{ + Username: kafkaUsername, + Password: kafkaPassword, + } + } return &kafkaWriter } From 176250e2c68e7ec63e6542585410a6f497855362 Mon Sep 17 00:00:00 2001 From: Ark2307 Date: Tue, 2 Sep 2025 17:38:14 +0530 Subject: [PATCH 10/44] Fixing compilation errors --- trafficUtil/kafkaUtil/kafka.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/trafficUtil/kafkaUtil/kafka.go b/trafficUtil/kafkaUtil/kafka.go index 01092e77..8014bbc4 100644 --- a/trafficUtil/kafkaUtil/kafka.go +++ b/trafficUtil/kafkaUtil/kafka.go @@ -39,9 +39,9 @@ func init() { utils.InitVar("INSECURE_SKIP_VERIFY", &InsecureSkipVerify) utils.InitVar("TLS_CA_CERT_PATH", &tlsCACertPath) - InitVar("IS_AUTH_IMPLEMENTED", &isAuthImplemented) - InitVar("KAFKA_USERNAME", &kafkaUsername) - InitVar("KAFKA_PASSWORD", &kafkaPassword) + utils.InitVar("IS_AUTH_IMPLEMENTED", &isAuthImplemented) + utils.InitVar("KAFKA_USERNAME", &kafkaUsername) + utils.InitVar("KAFKA_PASSWORD", &kafkaPassword) } @@ -314,12 +314,14 @@ func getKafkaWriter(kafkaURL string, batchSize int, batchTimeout time.Duration) Compression: kafka.Lz4, } + transport := &kafka.Transport{} + if useTLS { tlsConfig, _ := NewTLSConfig(tlsCACertPath) - kafkaWriter.Transport = &kafka.Transport{ - TLS: tlsConfig, - } + transport.TLS = tlsConfig } + + // Add SASL authentication if enabled if isAuthImplemented && kafkaUsername != "" && kafkaPassword != "" { slog.Info("Configuring SASL plain authentication", "username", kafkaUsername) transport.SASL = plain.Mechanism{ @@ -327,5 +329,7 @@ func getKafkaWriter(kafkaURL string, batchSize int, 
batchTimeout time.Duration) Password: kafkaPassword, } } + + kafkaWriter.Transport = transport return &kafkaWriter } From 2383fe09382c8af6abada622610328002b32a91c Mon Sep 17 00:00:00 2001 From: Ark2307 Date: Tue, 2 Sep 2025 20:32:13 +0530 Subject: [PATCH 11/44] adding missing import --- trafficUtil/kafkaUtil/kafka.go | 1 + 1 file changed, 1 insertion(+) diff --git a/trafficUtil/kafkaUtil/kafka.go b/trafficUtil/kafkaUtil/kafka.go index 8014bbc4..acc6e77b 100644 --- a/trafficUtil/kafkaUtil/kafka.go +++ b/trafficUtil/kafkaUtil/kafka.go @@ -17,6 +17,7 @@ import ( "github.com/akto-api-security/mirroring-api-logging/trafficUtil/utils" "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" "google.golang.org/protobuf/proto" ) From c53c2e34507a45ea778d7b25e6c076f4f14907aa Mon Sep 17 00:00:00 2001 From: ayushaga14 Date: Tue, 30 Sep 2025 13:03:43 +0530 Subject: [PATCH 12/44] enable threat events push by default --- trafficUtil/utils/common.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trafficUtil/utils/common.go b/trafficUtil/utils/common.go index 2d6af51a..e69197da 100644 --- a/trafficUtil/utils/common.go +++ b/trafficUtil/utils/common.go @@ -37,7 +37,7 @@ var IgnoreIpTraffic = false var IgnoreCloudMetadataCalls = false var IgnoreEnvoyProxycalls = false var EnableGraph = true -var ThreatEnabled = false +var ThreatEnabled = true const EnvoyProxyIp = "127.0.0.6" From b0eb594bccf8dfb746fa5d4f382d0a5c3e6d3354 Mon Sep 17 00:00:00 2001 From: ayushaga14 Date: Tue, 30 Sep 2025 15:46:32 +0530 Subject: [PATCH 13/44] remove file logging enabled check on debug urls --- trafficUtil/kafkaUtil/parser.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index 7bd1753d..9b02795b 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -64,10 +64,6 @@ func init() { // Start ticker to read debug URLs from file every 30 seconds go func() { - if 
!utils.FileLoggingEnabled { - slog.Info("File logging is not enabled, skipping debug URL file watcher") - return - } ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { From 66d67b387b41c6e84099f73c86059c3e0ec0373c Mon Sep 17 00:00:00 2001 From: notshivansh Date: Fri, 3 Oct 2025 13:56:33 +0530 Subject: [PATCH 14/44] update script --- ebpf-run.sh | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/ebpf-run.sh b/ebpf-run.sh index a85566c8..0c44e77f 100644 --- a/ebpf-run.sh +++ b/ebpf-run.sh @@ -3,6 +3,7 @@ LOG_FILE="/tmp/dump.log" MAX_LOG_SIZE=${MAX_LOG_SIZE:-10485760} # Default to 10 MB if not set (10 MB = 10 * 1024 * 1024 bytes) CHECK_INTERVAL=60 # Check interval in seconds +MEMORY_THRESHOLD=85 # Kill process at 85% memory usage # Function to rotate the log file rotate_log() { @@ -14,6 +15,30 @@ rotate_log() { fi } +# Function to check memory usage and kill process if threshold exceeded +check_memory_and_kill() { + # Get current memory usage in bytes + if [ -f /sys/fs/cgroup/memory.current ]; then + # cgroup v2 + CURRENT_MEM=$(cat /sys/fs/cgroup/memory.current) + elif [ -f /sys/fs/cgroup/memory/memory.usage_in_bytes ]; then + # cgroup v1 + CURRENT_MEM=$(cat /sys/fs/cgroup/memory/memory.usage_in_bytes) + else + return + fi + + # Calculate percentage used + PERCENT_USED=$((CURRENT_MEM * 100 / MEM_LIMIT_BYTES)) + + echo "Memory usage: ${PERCENT_USED}% (${CURRENT_MEM} / ${MEM_LIMIT_BYTES} bytes)" + + if [ "$PERCENT_USED" -ge "$MEMORY_THRESHOLD" ]; then + echo "Memory threshold ${MEMORY_THRESHOLD}% exceeded (${PERCENT_USED}%), killing ebpf-logging process" + pkill -9 ebpf-logging + fi +} + # Start monitoring in the background if [[ "${ENABLE_LOGS}" == "false" ]]; then while true; do @@ -22,10 +47,41 @@ if [[ "${ENABLE_LOGS}" == "false" ]]; then done & fi +# 1. 
Detect and read cgroup memory limits +if [ -f /sys/fs/cgroup/memory.max ]; then + # cgroup v2 + MEM_LIMIT_BYTES=$(cat /sys/fs/cgroup/memory.max) +elif [ -f /sys/fs/cgroup/memory/memory.limit_in_bytes ]; then + # cgroup v1 + MEM_LIMIT_BYTES=$(cat /sys/fs/cgroup/memory/memory.limit_in_bytes) +else + # Fallback to free -b (bytes) if cgroup file not found + echo "Neither cgroup v2 nor v1 memory file found, defaulting to free -m" + # Convert from kB to bytes + MEM_LIMIT_BYTES=$(free -b | awk '/Mem:/ {print $2}') +fi + +# 2. Handle edge cases: "max" means no strict limit or a very large limit +if [ "$MEM_LIMIT_BYTES" = "max" ]; then + # Arbitrary fallback (1 GiB in bytes here, but adjust as needed) + echo "Cgroup memory limit set to 'max', defaulting to free memory" + MEM_LIMIT_BYTES=$(free -b | awk '/Mem:/ {print $2}') +fi + +# 3. Convert the memory limit from bytes to MB (integer division) +MEM_LIMIT_MB=$((MEM_LIMIT_BYTES / 1024 / 1024)) +echo "Detected container memory limit: ${MEM_LIMIT_MB} MB" + +# Start memory monitoring in the background +while true; do + check_memory_and_kill + sleep "$CHECK_INTERVAL" +done & + while : do if [[ "${ENABLE_LOGS}" == "false" ]]; then - ./ebpf-logging >> "$LOG_FILE" 2>&1 + ./ebpf-logging >> "$LOG_FILE" 2>&1 else ./ebpf-logging fi From bdee8006bf13b43e36758e548adaf2b294501b60 Mon Sep 17 00:00:00 2001 From: notshivansh Date: Fri, 3 Oct 2025 14:57:39 +0530 Subject: [PATCH 15/44] configurable --- ebpf-run.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ebpf-run.sh b/ebpf-run.sh index 0c44e77f..5eb14381 100644 --- a/ebpf-run.sh +++ b/ebpf-run.sh @@ -2,8 +2,8 @@ LOG_FILE="/tmp/dump.log" MAX_LOG_SIZE=${MAX_LOG_SIZE:-10485760} # Default to 10 MB if not set (10 MB = 10 * 1024 * 1024 bytes) -CHECK_INTERVAL=60 # Check interval in seconds -MEMORY_THRESHOLD=85 # Kill process at 85% memory usage +CHECK_INTERVAL=${CHECK_INTERVAL:-10} # Check interval in seconds (configurable via env) 
+MEMORY_THRESHOLD=${MEMORY_THRESHOLD:-80} # Kill process at this % memory usage (configurable via env) # Function to rotate the log file rotate_log() { From 40c453d41fcffceb559a2db4d29523457977e7ed Mon Sep 17 00:00:00 2001 From: notshivansh Date: Fri, 3 Oct 2025 15:29:41 +0530 Subject: [PATCH 16/44] add override --- ebpf-run.sh | 46 ++++++++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/ebpf-run.sh b/ebpf-run.sh index 5eb14381..76da5c5f 100644 --- a/ebpf-run.sh +++ b/ebpf-run.sh @@ -2,7 +2,8 @@ LOG_FILE="/tmp/dump.log" MAX_LOG_SIZE=${MAX_LOG_SIZE:-10485760} # Default to 10 MB if not set (10 MB = 10 * 1024 * 1024 bytes) -CHECK_INTERVAL=${CHECK_INTERVAL:-10} # Check interval in seconds (configurable via env) +CHECK_INTERVAL=${CHECK_INTERVAL:-60} +CHECK_INTERVAL_MEM=${CHECK_INTERVAL_MEM:-10} # Check interval in seconds (configurable via env) MEMORY_THRESHOLD=${MEMORY_THRESHOLD:-80} # Kill process at this % memory usage (configurable via env) # Function to rotate the log file @@ -47,25 +48,30 @@ if [[ "${ENABLE_LOGS}" == "false" ]]; then done & fi -# 1. Detect and read cgroup memory limits -if [ -f /sys/fs/cgroup/memory.max ]; then - # cgroup v2 - MEM_LIMIT_BYTES=$(cat /sys/fs/cgroup/memory.max) -elif [ -f /sys/fs/cgroup/memory/memory.limit_in_bytes ]; then - # cgroup v1 - MEM_LIMIT_BYTES=$(cat /sys/fs/cgroup/memory/memory.limit_in_bytes) -else - # Fallback to free -b (bytes) if cgroup file not found - echo "Neither cgroup v2 nor v1 memory file found, defaulting to free -m" - # Convert from kB to bytes - MEM_LIMIT_BYTES=$(free -b | awk '/Mem:/ {print $2}') -fi +# 1. 
Check if MEM_LIMIT_BYTES is provided as env variable +if [ -z "$MEM_LIMIT_BYTES" ]; then + # Not provided, detect and read cgroup memory limits + if [ -f /sys/fs/cgroup/memory.max ]; then + # cgroup v2 + MEM_LIMIT_BYTES=$(cat /sys/fs/cgroup/memory.max) + elif [ -f /sys/fs/cgroup/memory/memory.limit_in_bytes ]; then + # cgroup v1 + MEM_LIMIT_BYTES=$(cat /sys/fs/cgroup/memory/memory.limit_in_bytes) + else + # Fallback to free -b (bytes) if cgroup file not found + echo "Neither cgroup v2 nor v1 memory file found, defaulting to free -m" + # Convert from kB to bytes + MEM_LIMIT_BYTES=$(free -b | awk '/Mem:/ {print $2}') + fi -# 2. Handle edge cases: "max" means no strict limit or a very large limit -if [ "$MEM_LIMIT_BYTES" = "max" ]; then - # Arbitrary fallback (1 GiB in bytes here, but adjust as needed) - echo "Cgroup memory limit set to 'max', defaulting to free memory" - MEM_LIMIT_BYTES=$(free -b | awk '/Mem:/ {print $2}') + # 2. Handle edge cases: "max" means no strict limit or a very large limit + if [ "$MEM_LIMIT_BYTES" = "max" ]; then + # Arbitrary fallback (1 GiB in bytes here, but adjust as needed) + echo "Cgroup memory limit set to 'max', defaulting to free memory" + MEM_LIMIT_BYTES=$(free -b | awk '/Mem:/ {print $2}') + fi +else + echo "Using MEM_LIMIT_BYTES from environment variable: ${MEM_LIMIT_BYTES}" fi # 3. 
Convert the memory limit from bytes to MB (integer division) @@ -75,7 +81,7 @@ echo "Detected container memory limit: ${MEM_LIMIT_MB} MB" # Start memory monitoring in the background while true; do check_memory_and_kill - sleep "$CHECK_INTERVAL" + sleep "$CHECK_INTERVAL_MEM" done & while : From 9ab2ed505819ab28579bfd23e0018e650183a74e Mon Sep 17 00:00:00 2001 From: notshivansh Date: Fri, 3 Oct 2025 15:47:53 +0530 Subject: [PATCH 17/44] take in mb --- ebpf-run.sh | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/ebpf-run.sh b/ebpf-run.sh index 76da5c5f..f98e2558 100644 --- a/ebpf-run.sh +++ b/ebpf-run.sh @@ -48,8 +48,8 @@ if [[ "${ENABLE_LOGS}" == "false" ]]; then done & fi -# 1. Check if MEM_LIMIT_BYTES is provided as env variable -if [ -z "$MEM_LIMIT_BYTES" ]; then +# 1. Check if MEM_LIMIT is provided as env variable +if [ -z "$MEM_LIMIT" ]; then # Not provided, detect and read cgroup memory limits if [ -f /sys/fs/cgroup/memory.max ]; then # cgroup v2 @@ -70,13 +70,18 @@ if [ -z "$MEM_LIMIT_BYTES" ]; then echo "Cgroup memory limit set to 'max', defaulting to free memory" MEM_LIMIT_BYTES=$(free -b | awk '/Mem:/ {print $2}') fi + + # 3. Convert the memory limit from bytes to MB (integer division) + MEM_LIMIT_MB=$((MEM_LIMIT_BYTES / 1024 / 1024)) else - echo "Using MEM_LIMIT_BYTES from environment variable: ${MEM_LIMIT_BYTES}" + # MEM_LIMIT provided as env variable, treat as MB + echo "Using MEM_LIMIT from environment variable: ${MEM_LIMIT} MB" + MEM_LIMIT_MB=$MEM_LIMIT + # Convert MB to bytes for calculations + MEM_LIMIT_BYTES=$((MEM_LIMIT * 1024 * 1024)) fi -# 3. 
Convert the memory limit from bytes to MB (integer division) -MEM_LIMIT_MB=$((MEM_LIMIT_BYTES / 1024 / 1024)) -echo "Detected container memory limit: ${MEM_LIMIT_MB} MB" +echo "Using container memory limit: ${MEM_LIMIT_MB} MB" # Start memory monitoring in the background while true; do From 8d4d02a9316413ed43bf4ce3b4597f657ef36f8b Mon Sep 17 00:00:00 2001 From: notshivansh Date: Sat, 4 Oct 2025 03:43:19 +0530 Subject: [PATCH 18/44] add go mem limit. --- ebpf-run.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ebpf-run.sh b/ebpf-run.sh index f98e2558..e72e4be4 100644 --- a/ebpf-run.sh +++ b/ebpf-run.sh @@ -83,6 +83,10 @@ fi echo "Using container memory limit: ${MEM_LIMIT_MB} MB" +# Set GOMEMLIMIT for the Go process +export GOMEMLIMIT="${MEM_LIMIT_MB}MiB" +echo "Setting GOMEMLIMIT to: ${GOMEMLIMIT}" + # Start memory monitoring in the background while true; do check_memory_and_kill From 9c7b15cff9d02b25de70f75112dec4b5c131f9c9 Mon Sep 17 00:00:00 2001 From: ayushaga14 Date: Sun, 5 Oct 2025 12:49:49 +0530 Subject: [PATCH 19/44] set go mem limit as 50% of mem threshold --- ebpf-run.sh | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/ebpf-run.sh b/ebpf-run.sh index e72e4be4..750ab642 100644 --- a/ebpf-run.sh +++ b/ebpf-run.sh @@ -83,15 +83,12 @@ fi echo "Using container memory limit: ${MEM_LIMIT_MB} MB" -# Set GOMEMLIMIT for the Go process -export GOMEMLIMIT="${MEM_LIMIT_MB}MiB" -echo "Setting GOMEMLIMIT to: ${GOMEMLIMIT}" +# Set GOMEMLIMIT for the Go process (60% of container limit) +GOMEMLIMIT_MB=$((MEM_LIMIT_MB * 50 / 100)) +export GOMEMLIMIT="${GOMEMLIMIT_MB}MiB" +echo "Setting GOMEMLIMIT to: ${GOMEMLIMIT} (60% of ${MEM_LIMIT_MB} MB)" # Start memory monitoring in the background -while true; do - check_memory_and_kill - sleep "$CHECK_INTERVAL_MEM" -done & while : do From e63a15ccb4b488be38e42eb975611172ff002899 Mon Sep 17 00:00:00 2001 From: ayushaga14 Date: Sun, 5 Oct 2025 12:50:57 +0530 Subject: [PATCH 20/44] set go mem limit as 
50% of mem threshold --- ebpf-run.sh | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ebpf-run.sh b/ebpf-run.sh index 750ab642..2042dc46 100644 --- a/ebpf-run.sh +++ b/ebpf-run.sh @@ -5,6 +5,7 @@ MAX_LOG_SIZE=${MAX_LOG_SIZE:-10485760} # Default to 10 MB if not set (10 MB = 1 CHECK_INTERVAL=${CHECK_INTERVAL:-60} CHECK_INTERVAL_MEM=${CHECK_INTERVAL_MEM:-10} # Check interval in seconds (configurable via env) MEMORY_THRESHOLD=${MEMORY_THRESHOLD:-80} # Kill process at this % memory usage (configurable via env) +GOMEMLIMIT_PERCENT=${GOMEMLIMIT_PERCENT:-60} # GOMEMLIMIT as % of container memory limit (configurable via env) # Function to rotate the log file rotate_log() { @@ -83,10 +84,10 @@ fi echo "Using container memory limit: ${MEM_LIMIT_MB} MB" -# Set GOMEMLIMIT for the Go process (60% of container limit) -GOMEMLIMIT_MB=$((MEM_LIMIT_MB * 50 / 100)) +# Set GOMEMLIMIT for the Go process +GOMEMLIMIT_MB=$((MEM_LIMIT_MB * GOMEMLIMIT_PERCENT / 100)) export GOMEMLIMIT="${GOMEMLIMIT_MB}MiB" -echo "Setting GOMEMLIMIT to: ${GOMEMLIMIT} (60% of ${MEM_LIMIT_MB} MB)" +echo "Setting GOMEMLIMIT to: ${GOMEMLIMIT} (${GOMEMLIMIT_PERCENT}% of ${MEM_LIMIT_MB} MB)" # Start memory monitoring in the background From eed4401d0d5717585531fb6cea92ab39526f466a Mon Sep 17 00:00:00 2001 From: gauravmann Date: Wed, 12 Nov 2025 12:18:04 +0530 Subject: [PATCH 21/44] dbeug direction --- trafficUtil/kafkaUtil/parser.go | 19 +++++++++++-------- trafficUtil/kafkaUtil/podinformer.go | 11 ++++++++--- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index b5acdfd0..3fe09eb6 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -125,6 +125,7 @@ func UpdateDebugStringsFromFile() { func checkDebugUrlAndPrint(url string, host string, message string) { // url or host. 
[array string] + message = "mannakto" + message if len(DebugStrings) > 0 { for _, debugString := range DebugStrings { if strings.Contains(url, debugString) { @@ -434,7 +435,7 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d // Process id was captured from the eBPF program using bpf_get_current_pid_tgid() // Shifting by 32 gives us the process id on host machine. var pid = idfd >> 32 - log := fmt.Sprintf("pod direction log: direction=%v, host=%v, path=%v, sourceIp=%v, destIp=%v, socketId=%v, processId=%v, hostName=%v", + log := fmt.Sprintf("direction=%v, host=%v, path=%v, sourceIp=%v, destIp=%v, socketId=%v, processId=%v, hostName=%v", direction, reqHeaderStr["host"], value["path"], @@ -444,27 +445,29 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d pid, hostName, ) - utils.PrintLog(log) - checkDebugUrlAndPrint(url, req.Host, log) + slog.Warn("before resolving labels pod direction log" + log) + checkDebugUrlAndPrint(url, req.Host, "before resolving labels pod direction log" + log) if PodInformerInstance != nil && direction == utils.DirectionInbound { if hostName == "" { - checkDebugUrlAndPrint(url, req.Host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid)) - slog.Error("Failed to resolve pod name, hostName is empty for ", "processId", pid, "hostName", hostName) + checkDebugUrlAndPrint(url, req.Host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid) + log) + slog.Error("Failed to resolve pod name, hostName is empty for ", "processId", pid, "hostName", hostName, "log", log) } else { + slog.Warn("Resolving Pod Name to labels", "podName", hostName, "log", log) podLabels, err := PodInformerInstance.ResolvePodLabels(hostName, url, req.Host) if err != nil { slog.Error("Failed to resolve pod labels", "hostName", hostName, "error", err) - checkDebugUrlAndPrint(url, req.Host, "Error resolving pod labels "+hostName) + checkDebugUrlAndPrint(url, 
req.Host, "Error resolving pod labels "+hostName + "log: " + log) } else { value["tag"] = podLabels - checkDebugUrlAndPrint(url, req.Host, "Pod labels found in ParseAndProduce, podLabels found "+fmt.Sprint(podLabels)+" for hostName "+hostName) + checkDebugUrlAndPrint(url, req.Host, "Pod labels found in ParseAndProduce, podLabels found "+fmt.Sprint(podLabels)+" for hostName "+hostName + "log: " + log) slog.Debug("Pod labels", "podName", hostName, "labels", podLabels) } } } else { - checkDebugUrlAndPrint(url, req.Host, "Pod labels not resolved, PodInformerInstance is nil or direction is not inbound, direction: "+fmt.Sprint(direction)) + slog.Warn("Pod labels not resolved, PodInformerInstance is nil or direction is not inbound", "log", log) + checkDebugUrlAndPrint(url, req.Host, "Pod labels not resolved, PodInformerInstance is nil or direction is not inbound" + "log: " + log) } out, _ := json.Marshal(value) diff --git a/trafficUtil/kafkaUtil/podinformer.go b/trafficUtil/kafkaUtil/podinformer.go index 14a54c23..83baf4bb 100644 --- a/trafficUtil/kafkaUtil/podinformer.go +++ b/trafficUtil/kafkaUtil/podinformer.go @@ -124,7 +124,13 @@ func (w *PodInformer) GetPodNameByProcessId(pid int32) string { if hostName, ok := w.pidHostNameMap[pid]; ok { return hostName } - slog.Warn("Hostname not found for", "processId", pid) + cmd := exec.Command("sh", "-c", fmt.Sprintf("ps -p %d -o comm=", pid)) + output, err := cmd.Output() + if err != nil { + slog.Error("Failed to get process name", "pid", pid) + } + slog.Warn("Hostname not found for", "processId", pid, "commandName", output) + return "" } @@ -152,7 +158,6 @@ func (w *PodInformer) BuildPidHostNameMap() { } func (w *PodInformer) ResolvePodLabels(podName string, url, reqHost string) (string, error) { - slog.Debug("Resolving Pod Name to labels", "podName", podName) checkDebugUrlAndPrint(url, reqHost, "Resolving Pod Name to labels for "+podName) // Step 1: Use the pod name as the key to find labels in podNameLabelsMap @@ -345,4 
+350,4 @@ func (w *PodInformer) handlePodDelete(obj interface{}) { // Build the PID to Hostname map again to ensure it is up-to-date // TODO: Optimize this ? What's the rate of pod add events? w.BuildPidHostNameMap() -} \ No newline at end of file +} From 2a4977457bffb2293586991d66f8f25fbe6972cd Mon Sep 17 00:00:00 2001 From: gauravmann Date: Wed, 12 Nov 2025 14:35:15 +0530 Subject: [PATCH 22/44] label resolving add --- trafficUtil/kafkaUtil/parser.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index 3fe09eb6..7a5168e4 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -125,7 +125,7 @@ func UpdateDebugStringsFromFile() { func checkDebugUrlAndPrint(url string, host string, message string) { // url or host. [array string] - message = "mannakto" + message + message = "mannakto: " + message if len(DebugStrings) > 0 { for _, debugString := range DebugStrings { if strings.Contains(url, debugString) { @@ -454,7 +454,7 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d checkDebugUrlAndPrint(url, req.Host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid) + log) slog.Error("Failed to resolve pod name, hostName is empty for ", "processId", pid, "hostName", hostName, "log", log) } else { - slog.Warn("Resolving Pod Name to labels", "podName", hostName, "log", log) + checkDebugUrlAndPrint(url, req.Host, "Resolving Pod Name to labels podName: " + hostName + "log: " + log) podLabels, err := PodInformerInstance.ResolvePodLabels(hostName, url, req.Host) if err != nil { slog.Error("Failed to resolve pod labels", "hostName", hostName, "error", err) From f74686153eadad745c87d5326dccb6366264db9c Mon Sep 17 00:00:00 2001 From: ayushaga14 Date: Wed, 19 Nov 2025 16:04:59 +0530 Subject: [PATCH 23/44] push source in threat payload --- trafficUtil/kafkaUtil/parser.go | 1 + 1 file changed, 1 insertion(+) 
diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index 9b02795b..c348f5de 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -397,6 +397,7 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d AktoAccountId: fmt.Sprint(1000000), AktoVxlanId: fmt.Sprint(vxlanID), IsPending: isPending, + Source: trafficSource, } reqHeaderString, _ := json.Marshal(reqHeaderStr) From 6e846659698ad7d000cc9c8b128440c0bdb35c47 Mon Sep 17 00:00:00 2001 From: gauravmann Date: Thu, 27 Nov 2025 16:43:25 +0530 Subject: [PATCH 24/44] remove the payload printing --- trafficUtil/kafkaUtil/kafka.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trafficUtil/kafkaUtil/kafka.go b/trafficUtil/kafkaUtil/kafka.go index acc6e77b..58bee14c 100644 --- a/trafficUtil/kafkaUtil/kafka.go +++ b/trafficUtil/kafkaUtil/kafka.go @@ -280,7 +280,7 @@ func ProduceStr(ctx context.Context, message string, url, reqHost string) error slog.Error("ERROR while writing messages", "topic", topic, "error", err) return err } - checkDebugUrlAndPrint(url, reqHost, "Kafka write successful: "+message) + checkDebugUrlAndPrint(url, reqHost, "Kafka write successful: ") return nil } From 751dcf3bcadecdc9bdda225a695ad6ae4d49bf19 Mon Sep 17 00:00:00 2001 From: gauravmann Date: Thu, 4 Dec 2025 19:47:13 +0530 Subject: [PATCH 25/44] fix the ignore urls bug --- trafficUtil/kafkaUtil/parser.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index 7a1ef74e..ab7c1f0a 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -387,12 +387,17 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d url := req.URL.String() + skipUrl := false for _, ignoreUrl := range IgnoreUrls { if strings.Contains(url, ignoreUrl) { - i++ - continue + skipUrl = true + break } } + if skipUrl { + i++ + 
continue + } checkDebugUrlAndPrint(url, req.Host, "URL,host found in ParseAndProduce") From 19df05c851c3d2971a163b2affe535770e4954e7 Mon Sep 17 00:00:00 2001 From: gauravmann Date: Wed, 10 Dec 2025 23:47:35 +0530 Subject: [PATCH 26/44] envoy dont tag --- trafficUtil/kafkaUtil/parser.go | 3 +- trafficUtil/kafkaUtil/podinformer.go | 46 ++++++++++++++++++---------- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index ab7c1f0a..65656ee0 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -464,7 +464,8 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d slog.Warn("before resolving labels pod direction log" + log) checkDebugUrlAndPrint(url, req.Host, "before resolving labels pod direction log" + log) - if PodInformerInstance != nil && direction == utils.DirectionInbound { + processName := PodInformerInstance.GetProcessNameByProcessId(int32(pid)) + if PodInformerInstance != nil && direction == utils.DirectionInbound && !strings.Contains(processName, "envoy") { if hostName == "" { checkDebugUrlAndPrint(url, req.Host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid) + log) diff --git a/trafficUtil/kafkaUtil/podinformer.go b/trafficUtil/kafkaUtil/podinformer.go index 83baf4bb..891c426f 100644 --- a/trafficUtil/kafkaUtil/podinformer.go +++ b/trafficUtil/kafkaUtil/podinformer.go @@ -42,14 +42,18 @@ func init() { utils.InitVar("AKTO_K8_METADATA_CAPTURE", &KubeInjectEnabled) } +type PidInfo struct { + HostName string + ProcessName string +} + type PodInformer struct { clientset *kubernetes.Clientset nodeName string podNameLabelsMap sync.Map // Maps pod names to their labels directly - pidHostNameMap map[int32]string + pidHostNameMap map[int32]PidInfo } - func SetupPodInformer() (chan struct{}, error) { if !KubeInjectEnabled { slog.Warn("AKTO_K8_METADATA_CAPTURE is not true, skipping PodInformer setup") 
@@ -116,27 +120,31 @@ func NewPodInformer() (*PodInformer, error) { clientset: clientset, nodeName: nodeName, podNameLabelsMap: sync.Map{}, - pidHostNameMap: make(map[int32]string), + pidHostNameMap: make(map[int32]PidInfo), }, nil } func (w *PodInformer) GetPodNameByProcessId(pid int32) string { - if hostName, ok := w.pidHostNameMap[pid]; ok { - return hostName - } - cmd := exec.Command("sh", "-c", fmt.Sprintf("ps -p %d -o comm=", pid)) - output, err := cmd.Output() - if err != nil { - slog.Error("Failed to get process name", "pid", pid) + if info, ok := w.pidHostNameMap[pid]; ok { + return info.HostName } - slog.Warn("Hostname not found for", "processId", pid, "commandName", output) + slog.Debug("Hostname not found for", "processId", pid) + + return "" +} + +func (w *PodInformer) GetProcessNameByProcessId(pid int32) string { + if info, ok := w.pidHostNameMap[pid]; ok { + return info.ProcessName + } + slog.Debug("Process name not found for", "processId", pid) return "" } func (w *PodInformer) BuildPidHostNameMap() { - cmd := exec.Command("sh", "-c", "for dir in /host/proc/[0-9]*; do pid=$(echo \"$dir\" | cut -d'/' -f4); if [ -f \"$dir/environ\" ]; then hostname=$(strings \"$dir/environ\" | grep '^HOSTNAME=' | cut -d'=' -f2); if [ -n \"$hostname\" ]; then echo \"$pid $hostname\"; fi; fi; done") + cmd := exec.Command("sh", "-c", "for dir in /host/proc/[0-9]*; do pid=$(basename \"$dir\"); if [ -f $dir/environ ]; then hostname=$(strings $dir/environ | grep '^HOSTNAME=' | cut -d'=' -f2); if [ -n \"$hostname\" ]; then comm=$(cat $dir/comm 2>/dev/null); echo \"$pid $comm $hostname\"; fi; fi; done | sort -k3") output, err := cmd.Output() if err != nil { slog.Error("Failed to execute shell command", "error", err) @@ -146,10 +154,13 @@ func (w *PodInformer) BuildPidHostNameMap() { lines := strings.Split(string(output), "\n") for _, line := range lines { parts := strings.Fields(line) - if len(parts) == 2 { + if len(parts) == 3 { pid, err := strconv.Atoi(parts[0]) if err == 
nil { - w.pidHostNameMap[int32(pid)] = parts[1] + w.pidHostNameMap[int32(pid)] = PidInfo{ + ProcessName: parts[1], + HostName: parts[2], + } } } } @@ -191,10 +202,10 @@ func (w *PodInformer) ResolvePodLabels(podName string, url, reqHost string) (str func (w *PodInformer) logPidHostNameMap() { slog.Warn("Logging PID to Hostname Map to file", "file", utils.GoPidLogFile) var builder strings.Builder - fmt.Fprintf(&builder, "PID\tHostname:\n") + fmt.Fprintf(&builder, "PID\tProcessName\tHostname:\n") - for pid, hostName := range w.pidHostNameMap { - fmt.Fprintf(&builder, "%d\t%s\n", pid, hostName) + for pid, info := range w.pidHostNameMap { + fmt.Fprintf(&builder, "%d\t%s\t%s\n", pid, info.ProcessName, info.HostName) } fmt.Fprintf(&builder, "-------Total PIDs tracked: %d----------\n", len(w.pidHostNameMap)) utils.LogToSpecificFile(utils.GoPidLogFile, builder.String()) @@ -337,6 +348,7 @@ func (w *PodInformer) handlePodUpdate(oldObj, newObj interface{}) { slog.Debug("Pod update:", "namespace", newPod.Namespace, "podName", newPod.Name) w.podNameLabelsMap.Delete(oldPod.Name) w.podNameLabelsMap.Store(newPod.Name, newPod.Labels) + w.BuildPidHostNameMap() } func (w *PodInformer) handlePodDelete(obj interface{}) { From 7fa33f7854957d756e6bf2f9a799385e964e46fe Mon Sep 17 00:00:00 2001 From: gauravmann Date: Thu, 11 Dec 2025 03:12:23 +0530 Subject: [PATCH 27/44] remove the ignore urls change --- trafficUtil/kafkaUtil/parser.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index 65656ee0..c6c2d19c 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -41,7 +41,6 @@ var ( "TRACK": true, "PATCH": true} DebugStrings = []string{} - IgnoreUrls = []string{} EventChanBuffSize = 100000 ) @@ -62,12 +61,6 @@ func init() { DebugStrings = strings.Split(debugStringsEnv, ",") } - ignoreUrlsEnv := "" - utils.InitVar("IGNORE_URLS", &ignoreUrlsEnv) - if len(ignoreUrlsEnv) > 0 { 
- IgnoreUrls = strings.Split(ignoreUrlsEnv, ",") - } - slog.Info("ignoreUrls", "IgnoreUrls", IgnoreUrls) // Start ticker to read debug URLs from file every 30 seconds go func() { ticker := time.NewTicker(30 * time.Second) @@ -387,18 +380,6 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d url := req.URL.String() - skipUrl := false - for _, ignoreUrl := range IgnoreUrls { - if strings.Contains(url, ignoreUrl) { - skipUrl = true - break - } - } - if skipUrl { - i++ - continue - } - checkDebugUrlAndPrint(url, req.Host, "URL,host found in ParseAndProduce") // build kafka payload for threat client From b2e23eae2473835d261cb1447519b2ae8488f1ab Mon Sep 17 00:00:00 2001 From: gauravmann Date: Thu, 11 Dec 2025 03:26:31 +0530 Subject: [PATCH 28/44] add back info --- trafficUtil/kafkaUtil/parser.go | 1 + 1 file changed, 1 insertion(+) diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index c6c2d19c..8aae5258 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -60,6 +60,7 @@ func init() { if len(debugStringsEnv) > 0 { DebugStrings = strings.Split(debugStringsEnv, ",") } + slog.Info("debugStrings", "DebugStrings", DebugStrings) // Start ticker to read debug URLs from file every 30 seconds go func() { From 87ac8158f8158807f0433eebc414b6391b187076 Mon Sep 17 00:00:00 2001 From: gauravmann Date: Thu, 11 Dec 2025 03:27:52 +0530 Subject: [PATCH 29/44] change parser to master --- trafficUtil/kafkaUtil/parser.go | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index 8aae5258..c348f5de 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -121,7 +121,6 @@ func UpdateDebugStringsFromFile() { func checkDebugUrlAndPrint(url string, host string, message string) { // url or host. 
[array string] - message = "mannakto: " + message if len(DebugStrings) > 0 { for _, debugString := range DebugStrings { if strings.Contains(url, debugString) { @@ -380,7 +379,6 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d } url := req.URL.String() - checkDebugUrlAndPrint(url, req.Host, "URL,host found in ParseAndProduce") // build kafka payload for threat client @@ -433,7 +431,7 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d // Process id was captured from the eBPF program using bpf_get_current_pid_tgid() // Shifting by 32 gives us the process id on host machine. var pid = idfd >> 32 - log := fmt.Sprintf("direction=%v, host=%v, path=%v, sourceIp=%v, destIp=%v, socketId=%v, processId=%v, hostName=%v", + log := fmt.Sprintf("pod direction log: direction=%v, host=%v, path=%v, sourceIp=%v, destIp=%v, socketId=%v, processId=%v, hostName=%v", direction, reqHeaderStr["host"], value["path"], @@ -443,30 +441,26 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d pid, hostName, ) - slog.Warn("before resolving labels pod direction log" + log) - checkDebugUrlAndPrint(url, req.Host, "before resolving labels pod direction log" + log) + checkDebugUrlAndPrint(url, req.Host, log) - processName := PodInformerInstance.GetProcessNameByProcessId(int32(pid)) - if PodInformerInstance != nil && direction == utils.DirectionInbound && !strings.Contains(processName, "envoy") { + if PodInformerInstance != nil && direction == utils.DirectionInbound { if hostName == "" { - checkDebugUrlAndPrint(url, req.Host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid) + log) - slog.Error("Failed to resolve pod name, hostName is empty for ", "processId", pid, "hostName", hostName, "log", log) + checkDebugUrlAndPrint(url, req.Host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid)) + slog.Error("Failed to resolve pod name, hostName is empty for 
", "processId", pid, "hostName", hostName) } else { - checkDebugUrlAndPrint(url, req.Host, "Resolving Pod Name to labels podName: " + hostName + "log: " + log) podLabels, err := PodInformerInstance.ResolvePodLabels(hostName, url, req.Host) if err != nil { slog.Error("Failed to resolve pod labels", "hostName", hostName, "error", err) - checkDebugUrlAndPrint(url, req.Host, "Error resolving pod labels "+hostName + "log: " + log) + checkDebugUrlAndPrint(url, req.Host, "Error resolving pod labels "+hostName) } else { value["tag"] = podLabels - checkDebugUrlAndPrint(url, req.Host, "Pod labels found in ParseAndProduce, podLabels found "+fmt.Sprint(podLabels)+" for hostName "+hostName + "log: " + log) + checkDebugUrlAndPrint(url, req.Host, "Pod labels found in ParseAndProduce, podLabels found "+fmt.Sprint(podLabels)+" for hostName "+hostName) slog.Debug("Pod labels", "podName", hostName, "labels", podLabels) } } } else { - slog.Warn("Pod labels not resolved, PodInformerInstance is nil or direction is not inbound", "log", log) - checkDebugUrlAndPrint(url, req.Host, "Pod labels not resolved, PodInformerInstance is nil or direction is not inbound" + "log: " + log) + checkDebugUrlAndPrint(url, req.Host, "Pod labels not resolved, PodInformerInstance is nil or direction is not inbound, direction: "+fmt.Sprint(direction)) } out, _ := json.Marshal(value) From b5b6f076df2760d8a19b652f002584ada589a2e5 Mon Sep 17 00:00:00 2001 From: gauravmann Date: Thu, 11 Dec 2025 03:29:26 +0530 Subject: [PATCH 30/44] don't tag envoy process calls --- trafficUtil/kafkaUtil/parser.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index c348f5de..bf4f1316 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -442,8 +442,8 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d hostName, ) checkDebugUrlAndPrint(url, req.Host, log) - - if 
PodInformerInstance != nil && direction == utils.DirectionInbound { + processName := PodInformerInstance.GetProcessNameByProcessId(int32(pid)) + if PodInformerInstance != nil && direction == utils.DirectionInbound && !strings.Contains(processName, "envoy") { if hostName == "" { checkDebugUrlAndPrint(url, req.Host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid)) From c4a70ced53e8c11d9c1d681ea9502c7594bed806 Mon Sep 17 00:00:00 2001 From: notshivansh Date: Tue, 16 Dec 2025 15:34:08 +0530 Subject: [PATCH 31/44] exit on kafka error threshold --- trafficUtil/kafkaUtil/kafka.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/trafficUtil/kafkaUtil/kafka.go b/trafficUtil/kafkaUtil/kafka.go index 58bee14c..86d448e7 100644 --- a/trafficUtil/kafkaUtil/kafka.go +++ b/trafficUtil/kafkaUtil/kafka.go @@ -34,6 +34,8 @@ var isAuthImplemented = false var kafkaUsername = "" var kafkaPassword = "" +var kafkaErrorThreshold = 10 + func init() { utils.InitVar("USE_TLS", &useTLS) @@ -44,6 +46,7 @@ func init() { utils.InitVar("KAFKA_USERNAME", &kafkaUsername) utils.InitVar("KAFKA_PASSWORD", &kafkaPassword) + utils.InitVar("KAFKA_ERROR_THRESHOLD", &kafkaErrorThreshold) } func InitKafka() { @@ -112,6 +115,11 @@ func kafkaCompletion() func(messages []kafka.Message, err error) { if err != nil { KafkaErrMsgCount += len(messages) slog.Error("kafka error message", "err", err, "count", KafkaErrMsgCount, "messagesCount", len(messages)) + + if KafkaErrMsgCount > kafkaErrorThreshold { + slog.Error("kafka error count exceeded threshold, restarting module", "count", KafkaErrMsgCount, "threshold", kafkaErrorThreshold) + os.Exit(1) + } } else { utils.PrintLog("kafka messages sent successfully", "messagesCount", len(messages)) } From 9895ea7c3814f53e4832d867ce1e20412a863150 Mon Sep 17 00:00:00 2001 From: notshivansh Date: Tue, 16 Dec 2025 15:42:52 +0530 Subject: [PATCH 32/44] make optional config changes --- .github/workflows/main.yml | 59 
++++++++++++++++++++++++++++++++------ 1 file changed, 50 insertions(+), 9 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index bf4df63e..99a8de03 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -36,7 +36,16 @@ on: options: - legacy - ebpf - default: legacy + default: legacy + Architecture: + description: "The target architecture(s) for the Docker image." + required: true + type: choice + options: + - both + - arm64 + - amd64 + default: both # A workflow run is made up of one or more jobs that can run sequentially or in parallel jobs: @@ -82,11 +91,19 @@ jobs: ECR_REPOSITORY: akto-api-security REGISTRY_ALIAS: p7q3h0z2 IMAGE_TAG: ${{ github.event.inputs.Tag }} + ARCH_INPUT: ${{ github.event.inputs.Architecture }} run: | # Build a docker container and push it to DockerHub docker buildx create --use - echo "Building and Pushing image to ECR..." - docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG . --push + if [ "$ARCH_INPUT" == "arm64" ]; then + PLATFORM="linux/arm64/v8" + elif [ "$ARCH_INPUT" == "amd64" ]; then + PLATFORM="linux/amd64" + else + PLATFORM="linux/arm64/v8,linux/amd64" + fi + echo "Building and Pushing image to ECR with platform: $PLATFORM" + docker buildx build --platform $PLATFORM -t $ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG . --push echo "::set-output name=image::$ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG" - name: Build, tag, and push the image to Amazon ECR -ebpf @@ -97,11 +114,19 @@ jobs: ECR_REPOSITORY: akto-api-security REGISTRY_ALIAS: p7q3h0z2 IMAGE_TAG: ${{ github.event.inputs.EbpfTag }} + ARCH_INPUT: ${{ github.event.inputs.Architecture }} run: | # Build a docker container and push it to DockerHub docker buildx create --use - echo "Building and Pushing image to ECR..." 
- docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG -f Dockerfile.eBPF . --push + if [ "$ARCH_INPUT" == "arm64" ]; then + PLATFORM="linux/arm64/v8" + elif [ "$ARCH_INPUT" == "amd64" ]; then + PLATFORM="linux/amd64" + else + PLATFORM="linux/arm64/v8,linux/amd64" + fi + echo "Building and Pushing image to ECR with platform: $PLATFORM" + docker buildx build --platform $PLATFORM -t $ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG -f Dockerfile.eBPF . --push echo "::set-output name=image::$ECR_REGISTRY/$REGISTRY_ALIAS/mirror-api-logging:$IMAGE_TAG" build-docker: @@ -136,11 +161,19 @@ jobs: env: ECR_REGISTRY: aktosecurity IMAGE_TAG: ${{ github.event.inputs.Tag }} + ARCH_INPUT: ${{ github.event.inputs.Architecture }} run: | # Build a docker container and push it to DockerHub docker buildx create --use - echo "Building and Pushing image to DockerHub..." - docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG . --push + if [ "$ARCH_INPUT" == "arm64" ]; then + PLATFORM="linux/arm64/v8" + elif [ "$ARCH_INPUT" == "amd64" ]; then + PLATFORM="linux/amd64" + else + PLATFORM="linux/arm64/v8,linux/amd64" + fi + echo "Building and Pushing image to DockerHub with platform: $PLATFORM" + docker buildx build --platform $PLATFORM -t $ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG . --push echo "::set-output name=image::$ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG" - name: Build, tag, and push the image to DockerHub - ebpf @@ -149,9 +182,17 @@ jobs: env: ECR_REGISTRY: aktosecurity IMAGE_TAG: ${{ github.event.inputs.EbpfTag }} + ARCH_INPUT: ${{ github.event.inputs.Architecture }} run: | # Build a docker container and push it to DockerHub docker buildx create --use - echo "Building and Pushing image to DockerHub..." - docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG -f Dockerfile.eBPF . 
--push + if [ "$ARCH_INPUT" == "arm64" ]; then + PLATFORM="linux/arm64/v8" + elif [ "$ARCH_INPUT" == "amd64" ]; then + PLATFORM="linux/amd64" + else + PLATFORM="linux/arm64/v8,linux/amd64" + fi + echo "Building and Pushing image to DockerHub with platform: $PLATFORM" + docker buildx build --platform $PLATFORM -t $ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG -f Dockerfile.eBPF . --push echo "::set-output name=image::$ECR_REGISTRY/mirror-api-logging:$IMAGE_TAG" From 5f93245f9dc8660dbcb4fb793a1c44e425fd95d3 Mon Sep 17 00:00:00 2001 From: notshivansh Date: Tue, 16 Dec 2025 15:56:48 +0530 Subject: [PATCH 33/44] increase default threshold --- trafficUtil/kafkaUtil/kafka.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trafficUtil/kafkaUtil/kafka.go b/trafficUtil/kafkaUtil/kafka.go index 86d448e7..cb30e21c 100644 --- a/trafficUtil/kafkaUtil/kafka.go +++ b/trafficUtil/kafkaUtil/kafka.go @@ -34,7 +34,7 @@ var isAuthImplemented = false var kafkaUsername = "" var kafkaPassword = "" -var kafkaErrorThreshold = 10 +var kafkaErrorThreshold = 100 func init() { From 9a03ba8dad3aead53d93957689c2653e9d8fbb99 Mon Sep 17 00:00:00 2001 From: notshivansh Date: Tue, 16 Dec 2025 16:26:30 +0530 Subject: [PATCH 34/44] increase threshold --- trafficUtil/kafkaUtil/kafka.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trafficUtil/kafkaUtil/kafka.go b/trafficUtil/kafkaUtil/kafka.go index cb30e21c..4dd12b8d 100644 --- a/trafficUtil/kafkaUtil/kafka.go +++ b/trafficUtil/kafkaUtil/kafka.go @@ -34,7 +34,7 @@ var isAuthImplemented = false var kafkaUsername = "" var kafkaPassword = "" -var kafkaErrorThreshold = 100 +var kafkaErrorThreshold = 500 func init() { From d73dfb578a1274c2ec5b04b5b0ff66eeb6e85ca9 Mon Sep 17 00:00:00 2001 From: notshivansh Date: Wed, 24 Dec 2025 17:05:58 +0530 Subject: [PATCH 35/44] kafka reconnect --- trafficUtil/kafkaUtil/kafka.go | 88 ++++++++++++++++++++++++++++++++-- 1 file changed, 83 insertions(+), 5 deletions(-) diff --git 
a/trafficUtil/kafkaUtil/kafka.go b/trafficUtil/kafkaUtil/kafka.go index 4dd12b8d..37e46dbc 100644 --- a/trafficUtil/kafkaUtil/kafka.go +++ b/trafficUtil/kafkaUtil/kafka.go @@ -10,6 +10,7 @@ import ( "os" "strconv" "strings" + "sync" "time" "github.com/akto-api-security/mirroring-api-logging/trafficUtil/apiProcessor" @@ -22,6 +23,7 @@ import ( ) var kafkaWriter *kafka.Writer +var kafkaWriterMutex sync.RWMutex var KafkaErrMsgCount = 0 var KafkaErrMsgEpoch = time.Now() var BytesInThreshold = 500 * 1024 * 1024 @@ -35,6 +37,7 @@ var kafkaUsername = "" var kafkaPassword = "" var kafkaErrorThreshold = 500 +var kafkaReconnectIntervalMinutes = 5 func init() { @@ -47,6 +50,7 @@ func init() { utils.InitVar("KAFKA_PASSWORD", &kafkaPassword) utils.InitVar("KAFKA_ERROR_THRESHOLD", &kafkaErrorThreshold) + utils.InitVar("KAFKA_RECONNECT_INTERVAL_MINUTES", &kafkaReconnectIntervalMinutes) } func InitKafka() { @@ -85,7 +89,10 @@ func InitKafka() { kafka_batch_time_secs_duration := time.Duration(kafka_batch_time_secs) for { + kafkaWriterMutex.Lock() kafkaWriter = getKafkaWriter(kafka_url, kafka_batch_size, kafka_batch_time_secs_duration*time.Second) + kafkaWriterMutex.Unlock() + utils.LogMemoryStats() utils.PrintLog("logging kafka stats before pushing message") LogKafkaStats() @@ -100,11 +107,19 @@ func InitKafka() { LogKafkaStats() if err != nil { slog.Error("error establishing connection with kafka, sending message failed, retrying in 2 seconds", "error", err) + kafkaWriterMutex.Lock() kafkaWriter.Close() + kafkaWriterMutex.Unlock() time.Sleep(time.Second * 2) } else { utils.PrintLog("connection establishing with kafka successfully") + kafkaWriterMutex.Lock() kafkaWriter.Completion = kafkaCompletion() + kafkaWriterMutex.Unlock() + + // Start periodic reconnection routine + go periodicKafkaReconnect(kafka_url, kafka_batch_size, kafka_batch_time_secs_duration*time.Second) + slog.Info("Started Kafka periodic reconnection routine", "interval_minutes", kafkaReconnectIntervalMinutes) 
break } } @@ -126,11 +141,62 @@ func kafkaCompletion() func(messages []kafka.Message, err error) { } } -func Close() { - kafkaWriter.Close() +func periodicKafkaReconnect(kafka_url string, kafka_batch_size int, kafka_batch_time_secs_duration time.Duration) { + if kafkaReconnectIntervalMinutes <= 0 { + slog.Info("Kafka reconnection disabled", "interval", kafkaReconnectIntervalMinutes) + return + } + + ticker := time.NewTicker(time.Duration(kafkaReconnectIntervalMinutes) * time.Minute) + defer ticker.Stop() + + for range ticker.C { + slog.Info("Starting periodic Kafka reconnection", "interval_minutes", kafkaReconnectIntervalMinutes) + + // Create new writer + newWriter := getKafkaWriter(kafka_url, kafka_batch_size, kafka_batch_time_secs_duration) + newWriter.Completion = kafkaCompletion() + + // Test the new connection + ctx := context.Background() + value := map[string]string{ + "testConnectionString": "periodicReconnect", + } + out, _ := json.Marshal(value) + testMsg := kafka.Message{ + Topic: "akto.api.logs", + Value: out, + } + + err := newWriter.WriteMessages(ctx, testMsg) + if err != nil { + slog.Error("Failed to test new Kafka connection during periodic reconnect, keeping old connection", "error", err) + newWriter.Close() + continue + } + + // Replace old writer with new one + kafkaWriterMutex.Lock() + oldWriter := kafkaWriter + kafkaWriter = newWriter + kafkaWriterMutex.Unlock() + + // Close old writer + if oldWriter != nil { + slog.Info("Closing old Kafka writer") + oldWriter.Close() + } + + slog.Info("Kafka reconnection completed successfully") + } } func LogKafkaStats() { + kafkaWriterMutex.RLock() + defer kafkaWriterMutex.RUnlock() + if kafkaWriter == nil { + return + } stats := kafkaWriter.Stats() slog.Debug("Kafka Stats", "dials", stats.Dials, @@ -215,7 +281,11 @@ func Produce(ctx context.Context, value *trafficpb.HttpResponseParam) error { Value: protoBytes, } - err = kafkaWriter.WriteMessages(ctx, msg) + kafkaWriterMutex.RLock() + writer := kafkaWriter 
+ kafkaWriterMutex.RUnlock() + + err = writer.WriteMessages(ctx, msg) if err != nil { slog.Error("Kafka write for threat failed", "topic", topic, "error", err) return err @@ -265,7 +335,11 @@ func ProduceLogs(ctx context.Context, message string, logType string) error { Value: []byte(string(out)), } - err := kafkaWriter.WriteMessages(ctx, msg) + kafkaWriterMutex.RLock() + writer := kafkaWriter + kafkaWriterMutex.RUnlock() + + err := writer.WriteMessages(ctx, msg) if err != nil { slog.Error("ERROR while writing messages", "topic", topic, "error", err) @@ -282,7 +356,11 @@ func ProduceStr(ctx context.Context, message string, url, reqHost string) error Value: []byte(message), } - err := kafkaWriter.WriteMessages(ctx, msg) + kafkaWriterMutex.RLock() + writer := kafkaWriter + kafkaWriterMutex.RUnlock() + + err := writer.WriteMessages(ctx, msg) if err != nil { slog.Error("ERROR while writing messages", "topic", topic, "error", err) From e4ada9c513648421158b9187913f621c43ab9b25 Mon Sep 17 00:00:00 2001 From: gauravmann Date: Thu, 25 Dec 2025 10:35:24 +0530 Subject: [PATCH 36/44] refactor parseAndProduce --- ebpf/connections/parser.go | 15 +- main.go | 18 +- trafficUtil/kafkaUtil/parser.go | 529 ++++++++++++++++++-------------- 3 files changed, 334 insertions(+), 228 deletions(-) diff --git a/ebpf/connections/parser.go b/ebpf/connections/parser.go index b8046d1a..65b4a69d 100644 --- a/ebpf/connections/parser.go +++ b/ebpf/connections/parser.go @@ -5,5 +5,18 @@ import ( ) func tryReadFromBD(ip string, destIp string, receiveBuffer []byte, sentBuffer []byte, isComplete bool, direction int, id uint64, fd uint32, daemonsetIdentifier, hostName string) { - kafkaUtil.ParseAndProduce(receiveBuffer, sentBuffer, ip, destIp, 0, false, "MIRRORING", isComplete, direction, id, fd, daemonsetIdentifier, hostName) + ctx := kafkaUtil.TrafficContext{ + SourceIP: ip, + DestIP: destIp, + VxlanID: 0, + IsPending: false, + TrafficSource: "MIRRORING", + IsComplete: isComplete, + Direction: 
direction, + ProcessID: uint32(id >> 32), + SocketFD: fd, + DaemonsetIdentifier: daemonsetIdentifier, + HostName: hostName, + } + kafkaUtil.ParseAndProduce(receiveBuffer, sentBuffer, ctx) } diff --git a/main.go b/main.go index aff7d276..ebc68d17 100644 --- a/main.go +++ b/main.go @@ -158,10 +158,20 @@ func (s *myStream) ReassemblyComplete() { } func tryReadFromBD(bd *bidi, isPending bool) { - - kafkaUtil.ParseAndProduce(bd.a.bytes, bd.b.bytes, - bd.key.net.Src().String(), bd.key.net.Dst().String(), bd.vxlanID, isPending, bd.source, true, 1, 0, 0, "0") - + ctx := kafkaUtil.TrafficContext{ + SourceIP: bd.key.net.Src().String(), + DestIP: bd.key.net.Dst().String(), + VxlanID: bd.vxlanID, + IsPending: isPending, + TrafficSource: bd.source, + IsComplete: true, + Direction: 1, + ProcessID: 0, + SocketFD: 0, + DaemonsetIdentifier: "0", + HostName: "", + } + kafkaUtil.ParseAndProduce(bd.a.bytes, bd.b.bytes, ctx) } // maybeFinish will wait until both directions are complete, then print out diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index c348f5de..2885ac74 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -21,6 +21,201 @@ import ( "github.com/akto-api-security/mirroring-api-logging/trafficUtil/utils" ) +// TrafficContext holds metadata about the captured traffic. +// This consolidates the many parameters previously passed to ParseAndProduce. +type TrafficContext struct { + SourceIP string + DestIP string + VxlanID int + IsPending bool + TrafficSource string + IsComplete bool + Direction int + ProcessID uint32 // Extracted from idfd >> 32 + SocketFD uint32 + DaemonsetIdentifier string + HostName string +} + +// ParsedTraffic holds the parsed HTTP requests and responses with their bodies. +type ParsedTraffic struct { + Requests []http.Request + RequestBodies []string + Responses []http.Response + ResponseBodies []string +} + +// HeaderSet holds HTTP headers in both protobuf and string map formats. 
+type HeaderSet struct { + Protobuf map[string]*trafficpb.StringList + StringMap map[string]string +} + +// ConvertedHeaders holds converted headers for both request and response. +type ConvertedHeaders struct { + Request HeaderSet + Response HeaderSet + DebugID string // x-debug-token value if present +} + +// PayloadInput contains the data needed to build traffic payloads. +type PayloadInput struct { + Request *http.Request + Response *http.Response + Headers ConvertedHeaders + RequestBody string + ResponseBody string + SourceIP string // IP after GetSourceIp processing + Context TrafficContext +} + +// buildProtobufPayload creates the protobuf payload for the threat client. +func buildProtobufPayload(input PayloadInput) *trafficpb.HttpResponseParam { + return &trafficpb.HttpResponseParam{ + Method: input.Request.Method, + Path: input.Request.URL.String(), + RequestHeaders: input.Headers.Request.Protobuf, + ResponseHeaders: input.Headers.Response.Protobuf, + RequestPayload: input.RequestBody, + ResponsePayload: input.ResponseBody, + Ip: input.SourceIP, + Time: int32(time.Now().Unix()), + StatusCode: int32(input.Response.StatusCode), + Type: string(input.Request.Proto), + Status: input.Response.Status, + AktoAccountId: fmt.Sprint(1000000), + AktoVxlanId: fmt.Sprint(input.Context.VxlanID), + IsPending: input.Context.IsPending, + Source: input.Context.TrafficSource, + } +} + +// buildJSONPayload creates the JSON map payload (legacy format, TODO: remove). 
+func buildJSONPayload(input PayloadInput) map[string]string { + reqHeaderString, _ := json.Marshal(input.Headers.Request.StringMap) + respHeaderString, _ := json.Marshal(input.Headers.Response.StringMap) + + return map[string]string{ + "path": input.Request.URL.String(), + "requestHeaders": string(reqHeaderString), + "responseHeaders": string(respHeaderString), + "method": input.Request.Method, + "requestPayload": input.RequestBody, + "responsePayload": input.ResponseBody, + "ip": input.Context.SourceIP, + "destIp": input.Context.DestIP, + "time": fmt.Sprint(time.Now().Unix()), + "statusCode": fmt.Sprint(input.Response.StatusCode), + "type": string(input.Request.Proto), + "status": input.Response.Status, + "akto_account_id": fmt.Sprint(1000000), + "akto_vxlan_id": fmt.Sprint(input.Context.VxlanID), + "is_pending": fmt.Sprint(input.Context.IsPending), + "source": input.Context.TrafficSource, + "direction": fmt.Sprint(input.Context.Direction), + "process_id": fmt.Sprint(input.Context.ProcessID), + "socket_id": fmt.Sprint(input.Context.SocketFD), + "daemonset_id": fmt.Sprint(input.Context.DaemonsetIdentifier), + "enable_graph": fmt.Sprint(utils.EnableGraph), + } +} + +// resolvePodLabels resolves pod labels for inbound traffic and adds them to the value map. 
+func resolvePodLabels(value map[string]string, ctx TrafficContext, url, host string) { + if PodInformerInstance == nil || ctx.Direction != utils.DirectionInbound { + checkDebugUrlAndPrint(url, host, "Pod labels not resolved, PodInformerInstance is nil or direction is not inbound, direction: "+fmt.Sprint(ctx.Direction)) + return + } + + if ctx.HostName == "" { + checkDebugUrlAndPrint(url, host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(ctx.ProcessID)) + slog.Debug("Failed to resolve pod name, hostName is empty for ", "processId", ctx.ProcessID, "hostName", ctx.HostName) + return + } + + podLabels, err := PodInformerInstance.ResolvePodLabels(ctx.HostName, url, host) + if err != nil { + slog.Error("Failed to resolve pod labels", "hostName", ctx.HostName, "error", err) + checkDebugUrlAndPrint(url, host, "Error resolving pod labels "+ctx.HostName) + return + } + + value["tag"] = podLabels + checkDebugUrlAndPrint(url, host, "Pod labels found in ParseAndProduce, podLabels found "+fmt.Sprint(podLabels)+" for hostName "+ctx.HostName) + slog.Debug("Pod labels", "podName", ctx.HostName, "labels", podLabels) +} + +// convertHeaders converts HTTP headers to both protobuf and string map formats in a single pass. 
+func convertHeaders(req *http.Request, resp *http.Response, shouldPrint bool) ConvertedHeaders { + result := ConvertedHeaders{ + Request: HeaderSet{ + Protobuf: make(map[string]*trafficpb.StringList), + StringMap: make(map[string]string), + }, + Response: HeaderSet{ + Protobuf: make(map[string]*trafficpb.StringList), + StringMap: make(map[string]string), + }, + } + + // Convert request headers + for name, values := range req.Header { + for _, value := range values { + result.Request.Protobuf[strings.ToLower(name)] = &trafficpb.StringList{ + Values: []string{value}, + } + if shouldPrint && strings.EqualFold(name, "x-debug-token") { + result.DebugID = value + } + result.Request.StringMap[name] = value + } + } + result.Request.Protobuf["host"] = &trafficpb.StringList{Values: []string{req.Host}} + result.Request.StringMap["host"] = req.Host + + // Convert response headers + for name, values := range resp.Header { + for _, value := range values { + result.Response.Protobuf[strings.ToLower(name)] = &trafficpb.StringList{ + Values: []string{value}, + } + result.Response.StringMap[name] = value + } + } + + return result +} + +// shouldProcessRequest checks all filter conditions and returns true if the request should be processed. 
+func shouldProcessRequest(req *http.Request, reqHeaders map[string]string, ctx TrafficContext) bool { + if !IsValidMethod(req.Method) { + return false + } + + if !utils.PassesFilter(trafficMetrics.FilterHeaderValueMap, reqHeaders) { + return false + } + + if utils.IgnoreIpTraffic && utils.CheckIfIp(req.Host) { + return false + } + + if utils.IgnoreCloudMetadataCalls && req.Host == "169.254.169.254" { + return false + } + + if utils.IgnoreEnvoyProxycalls && ctx.SourceIP == utils.EnvoyProxyIp && ctx.Direction == utils.DirectionOutbound { + slog.Debug("Ignoring outbound envoy proxy call", "sourceIp", ctx.SourceIP, "url", req.URL.String(), "host", req.Host) + return false + } + + if utils.FilterPacket(reqHeaders) { + return false + } + + return true +} + var ( goodRequests = 0 badRequests = 0 @@ -169,22 +364,13 @@ func IsValidMethod(method string) bool { return ok } -func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, destIp string, vxlanID int, isPending bool, - trafficSource string, isComplete bool, direction int, idfd uint64, fd uint32, daemonsetIdentifier string, hostName string) { - - if checkAndUpdateBandwidthProcessed(0) { - return - } - - shouldPrint := debugMode && strings.Contains(string(receiveBuffer), "x-debug-token") - if shouldPrint { - slog.Debug("ParseAndProduce", "receiveBuffer", string(receiveBuffer), "sentBuffer", string(sentBuffer)) - } - - reader := bufio.NewReader(bytes.NewReader(receiveBuffer)) - i := 0 +// parseHTTPTraffic parses HTTP requests and responses from raw byte buffers. +// Returns nil if parsing fails (errors are logged). 
+func parseHTTPTraffic(reqBuffer, respBuffer []byte, shouldPrint bool) *ParsedTraffic { + // Parse requests + reader := bufio.NewReader(bytes.NewReader(reqBuffer)) requests := []http.Request{} - requestsContent := []string{} + requestBodies := []string{} for { req, err := http.ReadRequest(reader) @@ -192,48 +378,48 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d break } else if err != nil { utils.PrintLog(fmt.Sprintf("HTTP-request error: %s \n", err)) - return + return nil } body, err := io.ReadAll(req.Body) req.Body.Close() if err != nil { utils.PrintLog(fmt.Sprintf("Got body err: %s\n", err)) - return + return nil } requests = append(requests, *req) - requestsContent = append(requestsContent, string(body)) - i++ + requestBodies = append(requestBodies, string(body)) } if shouldPrint { - slog.Debug("ParseAndProduce", "count", i) + slog.Debug("parseHTTPTraffic", "requestCount", len(requests)) } + if len(requests) == 0 { - return + return nil } - reader = bufio.NewReader(bytes.NewReader(sentBuffer)) - i = 0 - + // Parse responses + reader = bufio.NewReader(bytes.NewReader(respBuffer)) responses := []http.Response{} - responsesContent := []string{} + responseBodies := []string{} for { - resp, err := http.ReadResponse(reader, nil) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { utils.PrintLog(fmt.Sprintf("HTTP-Response error: %s\n", err)) - return + return nil } body, err := io.ReadAll(resp.Body) if err != nil { utils.PrintLog(fmt.Sprintf("Got err reading resp body: %s\n", err)) - return + return nil } + + // Handle gzip/deflate decompression encoding := resp.Header["Content-Encoding"] var r io.Reader r = bytes.NewBuffer(body) @@ -241,14 +427,14 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d r, err = gzip.NewReader(r) if err != nil { utils.PrintLog(fmt.Sprintf("HTTP-gunzip "+"Failed to gzip decode: %s", err)) - return + return nil } } if err == nil { body, err = 
io.ReadAll(r) if err != nil { utils.PrintLog(fmt.Sprintf("Failed to read decompressed body: %s\n", err)) - return + return nil } if _, ok := r.(*gzip.Reader); ok { r.(*gzip.Reader).Close() @@ -256,20 +442,47 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d } responses = append(responses, *resp) - responsesContent = append(responsesContent, string(body)) + responseBodies = append(responseBodies, string(body)) + } + + if shouldPrint { + slog.Debug("parseHTTPTraffic", "responseCount", len(responses)) + } - i++ + return &ParsedTraffic{ + Requests: requests, + RequestBodies: requestBodies, + Responses: responses, + ResponseBodies: responseBodies, + } +} + +func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, ctx TrafficContext) { + + if checkAndUpdateBandwidthProcessed(0) { + return } + shouldPrint := debugMode && strings.Contains(string(receiveBuffer), "x-debug-token") if shouldPrint { + slog.Debug("ParseAndProduce", "receiveBuffer", string(receiveBuffer), "sentBuffer", string(sentBuffer)) + } - slog.Debug("ParseAndProduce", "count", i) + parsed := parseHTTPTraffic(receiveBuffer, sentBuffer, shouldPrint) + if parsed == nil { + return } + + requests := parsed.Requests + requestsContent := parsed.RequestBodies + responses := parsed.Responses + responsesContent := parsed.ResponseBodies + if len(requests) != len(responses) { if shouldPrint { - slog.Debug("Len req-res mismatch", "lenRequests", len(requests), "lenResponses", len(responses), "lenReceiveBuffer", len(receiveBuffer), "lenSentBuffer", len(sentBuffer), "isComplete", isComplete) + slog.Debug("Len req-res mismatch", "lenRequests", len(requests), "lenResponses", len(responses), "lenReceiveBuffer", len(receiveBuffer), "lenSentBuffer", len(sentBuffer), "isComplete", ctx.IsComplete) } - if isComplete { + if ctx.IsComplete { return } correctLen := len(requests) @@ -279,192 +492,60 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d responses = 
responses[:correctLen] requests = requests[:correctLen] + responsesContent = responsesContent[:correctLen] + requestsContent = requestsContent[:correctLen] } - i = 0 - for { - if len(requests) < i+1 { - break - } + bgCtx := context.Background() + for i := 0; i < len(requests); i++ { req := &requests[i] resp := &responses[i] - - if !IsValidMethod(req.Method) { - continue - } - - id := "" - - // build req headers for threat client - reqHeader := make(map[string]*trafficpb.StringList) - for name, values := range req.Header { - // Loop over all values for the name. - for _, value := range values { - reqHeader[strings.ToLower(name)] = &trafficpb.StringList{ - Values: []string{value}, - } - } - } - ip := GetSourceIp(reqHeader, sourceIp) - - reqHeader["host"] = &trafficpb.StringList{ - Values: []string{req.Host}, - } - - reqHeaderStr := make(map[string]string) - for name, values := range req.Header { - // Loop over all values for the name. - for _, value := range values { - if shouldPrint && - strings.EqualFold(name, "x-debug-token") { - id = value - } - reqHeaderStr[name] = value - } - } - - reqHeaderStr["host"] = req.Host - - passes := utils.PassesFilter(trafficMetrics.FilterHeaderValueMap, reqHeaderStr) - //printLog("Req header: " + mapToString(reqHeaderStr)) - //printLog(fmt.Sprintf("passes %t", passes)) - - if !passes { - i++ - continue - } - - if utils.IgnoreIpTraffic && utils.CheckIfIp(req.Host) { - i++ - continue - } - - if utils.IgnoreCloudMetadataCalls && req.Host == "169.254.169.254" { - i++ - continue - } - - if utils.IgnoreEnvoyProxycalls && sourceIp == utils.EnvoyProxyIp && direction == utils.DirectionOutbound { - slog.Debug("Ignoring outbound envoy proxy call", "sourceIp", sourceIp, "url", req.URL.String(), "host", req.Host) - i++ - continue - } - - var skipPacket = utils.FilterPacket(reqHeaderStr) - - if skipPacket { - i++ - continue - } - - // build resp headers for threat client - respHeader := make(map[string]*trafficpb.StringList) - for name, values := 
range resp.Header { - // Loop over all values for the name. - for _, value := range values { - respHeader[strings.ToLower(name)] = &trafficpb.StringList{ - Values: []string{value}, - } - } - } - - // TODO: remove and use protobuf instead - respHeaderStr := make(map[string]string) - for name, values := range resp.Header { - // Loop over all values for the name. - for _, value := range values { - respHeaderStr[name] = value - } - } - + url := req.URL.String() checkDebugUrlAndPrint(url, req.Host, "URL,host found in ParseAndProduce") + + // Convert headers in a single pass (both protobuf and string map formats) + headers := convertHeaders(req, resp, shouldPrint) - // build kafka payload for threat client - payload := &trafficpb.HttpResponseParam{ - Method: req.Method, - Path: req.URL.String(), - RequestHeaders: reqHeader, - ResponseHeaders: respHeader, - RequestPayload: requestsContent[i], - ResponsePayload: responsesContent[i], - Ip: ip, - Time: int32(time.Now().Unix()), - StatusCode: int32(resp.StatusCode), - Type: string(req.Proto), - Status: resp.Status, - AktoAccountId: fmt.Sprint(1000000), - AktoVxlanId: fmt.Sprint(vxlanID), - IsPending: isPending, - Source: trafficSource, + // Check all filter conditions + if !shouldProcessRequest(req, headers.Request.StringMap, ctx) { + continue } - reqHeaderString, _ := json.Marshal(reqHeaderStr) - respHeaderString, _ := json.Marshal(respHeaderStr) - - // TODO: remove and use protobuf instead - value := map[string]string{ - "path": req.URL.String(), - "requestHeaders": string(reqHeaderString), - "responseHeaders": string(respHeaderString), - "method": req.Method, - "requestPayload": requestsContent[i], - "responsePayload": responsesContent[i], - "ip": sourceIp, - "destIp": destIp, - "time": fmt.Sprint(time.Now().Unix()), - "statusCode": fmt.Sprint(resp.StatusCode), - "type": string(req.Proto), - "status": resp.Status, - "akto_account_id": fmt.Sprint(1000000), - "akto_vxlan_id": fmt.Sprint(vxlanID), - "is_pending": 
fmt.Sprint(isPending), - "source": trafficSource, - "direction": fmt.Sprint(direction), - "process_id": fmt.Sprint(idfd >> 32), - "socket_id": fmt.Sprint(fd), - "daemonset_id": fmt.Sprint(daemonsetIdentifier), - "enable_graph": fmt.Sprint(utils.EnableGraph), + // Get source IP from headers + ip := GetSourceIp(headers.Request.Protobuf, ctx.SourceIP) + + // Build payloads + input := PayloadInput{ + Request: req, + Response: resp, + Headers: headers, + RequestBody: requestsContent[i], + ResponseBody: responsesContent[i], + SourceIP: ip, + Context: ctx, } + payload := buildProtobufPayload(input) + value := buildJSONPayload(input) - // Process id was captured from the eBPF program using bpf_get_current_pid_tgid() - // Shifting by 32 gives us the process id on host machine. - var pid = idfd >> 32 + // Debug logging log := fmt.Sprintf("pod direction log: direction=%v, host=%v, path=%v, sourceIp=%v, destIp=%v, socketId=%v, processId=%v, hostName=%v", - direction, - reqHeaderStr["host"], + ctx.Direction, + headers.Request.StringMap["host"], value["path"], - sourceIp, - destIp, + ctx.SourceIP, + ctx.DestIP, value["socket_id"], - pid, - hostName, + ctx.ProcessID, + ctx.HostName, ) checkDebugUrlAndPrint(url, req.Host, log) - if PodInformerInstance != nil && direction == utils.DirectionInbound { - - if hostName == "" { - checkDebugUrlAndPrint(url, req.Host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid)) - slog.Error("Failed to resolve pod name, hostName is empty for ", "processId", pid, "hostName", hostName) - } else { - podLabels, err := PodInformerInstance.ResolvePodLabels(hostName, url, req.Host) - if err != nil { - slog.Error("Failed to resolve pod labels", "hostName", hostName, "error", err) - checkDebugUrlAndPrint(url, req.Host, "Error resolving pod labels "+hostName) - } else { - value["tag"] = podLabels - checkDebugUrlAndPrint(url, req.Host, "Pod labels found in ParseAndProduce, podLabels found "+fmt.Sprint(podLabels)+" for hostName 
"+hostName) - slog.Debug("Pod labels", "podName", hostName, "labels", podLabels) - } - } - } else { - checkDebugUrlAndPrint(url, req.Host, "Pod labels not resolved, PodInformerInstance is nil or direction is not inbound, direction: "+fmt.Sprint(direction)) - } + // Resolve pod labels for inbound traffic + resolvePodLabels(value, ctx, url, req.Host) out, _ := json.Marshal(value) - ctx := context.Background() // calculating the size of outgoing bytes and requests (1) and saving it in outgoingCounterMap // this number is the closest (slightly higher) to the actual connection transfer bytes. @@ -474,25 +555,7 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d return } - hostString := reqHeaderStr["host"] - if utils.CheckIfIpHost(hostString) { - hostString = "ip-host" - } - oc := utils.GenerateOutgoingCounter(vxlanID, sourceIp, hostString) - trafficMetrics.SubmitOutgoingTrafficMetrics(oc, outgoingBytes) - - if shouldPrint { - if strings.Contains(responsesContent[i], id) { - goodRequests++ - } else { - slog.Debug("req-resp.String()", "out", string(out)) - badRequests++ - } - - if goodRequests%100 == 0 || badRequests%100 == 0 { - slog.Debug("Good requests", "count", goodRequests, "badRequests", badRequests) - } - } + sendMetrics(headers, ctx, outgoingBytes, shouldPrint, responsesContent, i, out) if apiProcessor.CloudProcessorInstance != nil { apiProcessor.CloudProcessorInstance.Produce(value) @@ -500,10 +563,30 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d } else { // Produce to kafka // TODO : remove and use protobuf instead - go ProduceStr(ctx, string(out), url, req.Host) - go Produce(ctx, payload) + go ProduceStr(bgCtx, string(out), url, req.Host) + go Produce(bgCtx, payload) } + } +} - i++ +func sendMetrics(headers ConvertedHeaders, ctx TrafficContext, outgoingBytes int, shouldPrint bool, responsesContent []string, i int, out []byte) { + hostString := headers.Request.StringMap["host"] + if 
utils.CheckIfIpHost(hostString) { + hostString = "ip-host" + } + oc := utils.GenerateOutgoingCounter(ctx.VxlanID, ctx.SourceIP, hostString) + trafficMetrics.SubmitOutgoingTrafficMetrics(oc, outgoingBytes) + + if shouldPrint { + if strings.Contains(responsesContent[i], headers.DebugID) { + goodRequests++ + } else { + slog.Debug("req-resp.String()", "out", string(out)) + badRequests++ + } + + if goodRequests%100 == 0 || badRequests%100 == 0 { + slog.Debug("Good requests", "count", goodRequests, "badRequests", badRequests) + } } } From d784ad18fb4bc6079cd26b15791a9a64a12dfbd4 Mon Sep 17 00:00:00 2001 From: gauravmann Date: Thu, 25 Dec 2025 18:53:18 +0530 Subject: [PATCH 37/44] use empty bodies on failures --- trafficUtil/kafkaUtil/parser.go | 8 +- trafficUtil/kafkaUtil/parser_test.go | 156 +++++++++++++++++++++++++++ trafficUtil/kafkaUtil/podinformer.go | 2 +- 3 files changed, 161 insertions(+), 5 deletions(-) create mode 100644 trafficUtil/kafkaUtil/parser_test.go diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index 2885ac74..811baead 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -384,7 +384,7 @@ func parseHTTPTraffic(reqBuffer, respBuffer []byte, shouldPrint bool) *ParsedTra req.Body.Close() if err != nil { utils.PrintLog(fmt.Sprintf("Got body err: %s\n", err)) - return nil + body = []byte{} } requests = append(requests, *req) @@ -416,7 +416,7 @@ func parseHTTPTraffic(reqBuffer, respBuffer []byte, shouldPrint bool) *ParsedTra body, err := io.ReadAll(resp.Body) if err != nil { utils.PrintLog(fmt.Sprintf("Got err reading resp body: %s\n", err)) - return nil + body = []byte{} } // Handle gzip/deflate decompression @@ -427,14 +427,14 @@ func parseHTTPTraffic(reqBuffer, respBuffer []byte, shouldPrint bool) *ParsedTra r, err = gzip.NewReader(r) if err != nil { utils.PrintLog(fmt.Sprintf("HTTP-gunzip "+"Failed to gzip decode: %s", err)) - return nil + body = []byte{} } } if err == nil { body, err = 
io.ReadAll(r) if err != nil { utils.PrintLog(fmt.Sprintf("Failed to read decompressed body: %s\n", err)) - return nil + body = []byte{} } if _, ok := r.(*gzip.Reader); ok { r.(*gzip.Reader).Close() diff --git a/trafficUtil/kafkaUtil/parser_test.go b/trafficUtil/kafkaUtil/parser_test.go new file mode 100644 index 00000000..b673916f --- /dev/null +++ b/trafficUtil/kafkaUtil/parser_test.go @@ -0,0 +1,156 @@ +package kafkaUtil + +import ( + "compress/gzip" + "bytes" + "testing" +) + +func TestParseHTTPTraffic_ValidRequest(t *testing.T) { + reqBody := `{"cardId":12,"amount":9100.50,"bookingId":4123}` + req := []byte("POST /credit-cards/charge HTTP/1.1\r\nHost: credit-card.default.svc.cluster.local\r\nContent-Type: application/json\r\nContent-Length: 47\r\n\r\n" + reqBody) + + respBody := `{"status":"success"}` + resp := []byte("HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 20\r\n\r\n" + respBody) + + result := parseHTTPTraffic(req, resp, true) + + if result == nil { + t.Fatal("expected non-nil result for valid request") + } + if len(result.Requests) != 1 { + t.Errorf("expected 1 request, got %d", len(result.Requests)) + } + if len(result.Responses) != 1 { + t.Errorf("expected 1 response, got %d", len(result.Responses)) + } + if result.RequestBodies[0] != reqBody { + t.Errorf("unexpected request body: %s", result.RequestBodies[0]) + } + if result.ResponseBodies[0] != respBody { + t.Errorf("unexpected response body: %s", result.ResponseBodies[0]) + } +} + +func TestParseHTTPTraffic_BadGzip(t *testing.T) { + reqBody := `{"cardId":12}` + req := []byte("POST /credit-cards/charge HTTP/1.1\r\nHost: credit-card.default.svc.cluster.local\r\nContent-Type: application/json\r\nContent-Length: 13\r\n\r\n" + reqBody) + + // Response claims gzip but body is not gzip encoded + resp := []byte("HTTP/1.1 200 OK\r\nContent-Encoding: gzip\r\nContent-Type: application/json\r\nContent-Length: 13\r\n\r\nnot-gzip-data") + + result := parseHTTPTraffic(req, resp, true) + + 
if result == nil { + t.Fatal("should not return nil on gzip failure") + } + if len(result.Requests) != 1 { + t.Errorf("expected 1 request, got %d", len(result.Requests)) + } + if len(result.Responses) != 1 { + t.Errorf("expected 1 response, got %d", len(result.Responses)) + } + if result.ResponseBodies[0] != "" { + t.Errorf("expected empty body on gzip failure, got: %s", result.ResponseBodies[0]) + } +} + +func TestParseHTTPTraffic_TruncatedGzip(t *testing.T) { + req := []byte("GET /test HTTP/1.1\r\nHost: example.com\r\n\r\n") + + // Create valid gzip but truncate it + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + gz.Write([]byte("hello world this is a longer message")) + gz.Close() + truncatedGzip := buf.Bytes()[:10] // Truncate to first 10 bytes + + resp := append([]byte("HTTP/1.1 200 OK\r\nContent-Encoding: gzip\r\n\r\n"), truncatedGzip...) + + result := parseHTTPTraffic(req, resp, true) + + if result == nil { + t.Fatal("should not return nil on truncated gzip") + } + if result.ResponseBodies[0] != "" { + t.Errorf("expected empty body on truncated gzip, got: %s", result.ResponseBodies[0]) + } +} + +func TestParseHTTPTraffic_ValidGzip(t *testing.T) { + req := []byte("GET /test HTTP/1.1\r\nHost: example.com\r\n\r\n") + + // Create valid gzip response + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + gz.Write([]byte(`{"status":"ok"}`)) + gz.Close() + + resp := append([]byte("HTTP/1.1 200 OK\r\nContent-Encoding: gzip\r\nContent-Type: application/json\r\n\r\n"), buf.Bytes()...) 
+ + result := parseHTTPTraffic(req, resp, true) + + if result == nil { + t.Fatal("expected non-nil result for valid gzip") + } + if result.ResponseBodies[0] != `{"status":"ok"}` { + t.Errorf("expected decompressed body, got: %s", result.ResponseBodies[0]) + } +} + +func TestParseHTTPTraffic_EmptyRequestBody(t *testing.T) { + req := []byte("GET /health HTTP/1.1\r\nHost: example.com\r\n\r\n") + resp := []byte("HTTP/1.1 200 OK\r\n\r\nOK") + + result := parseHTTPTraffic(req, resp, false) + + if result == nil { + t.Fatal("expected non-nil result") + } + if result.RequestBodies[0] != "" { + t.Errorf("expected empty request body for GET, got: %s", result.RequestBodies[0]) + } + if result.ResponseBodies[0] != "OK" { + t.Errorf("expected 'OK' response body, got: %s", result.ResponseBodies[0]) + } +} + +func TestParseHTTPTraffic_MultipleRequests(t *testing.T) { + req := []byte("GET /first HTTP/1.1\r\nHost: example.com\r\nContent-Length: 0\r\n\r\nGET /second HTTP/1.1\r\nHost: example.com\r\nContent-Length: 0\r\n\r\n") + resp := []byte("HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nfirstHTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nsecond") + + result := parseHTTPTraffic(req, resp, false) + + if result == nil { + t.Fatal("expected non-nil result") + } + if len(result.Requests) != 2 { + t.Errorf("expected 2 requests, got %d", len(result.Requests)) + } + if len(result.Responses) != 2 { + t.Errorf("expected 2 responses, got %d", len(result.Responses)) + } +} + +func TestParseHTTPTraffic_InvalidRequest(t *testing.T) { + req := []byte("not a valid http request") + resp := []byte("HTTP/1.1 200 OK\r\n\r\nOK") + + result := parseHTTPTraffic(req, resp, false) + + // Should return nil because request parsing fails completely + if result != nil { + t.Error("expected nil result for invalid request") + } +} + +func TestParseHTTPTraffic_NoRequests(t *testing.T) { + req := []byte("") + resp := []byte("HTTP/1.1 200 OK\r\n\r\nOK") + + result := parseHTTPTraffic(req, resp, false) + + if result != 
nil { + t.Error("expected nil result for empty request buffer") + } +} diff --git a/trafficUtil/kafkaUtil/podinformer.go b/trafficUtil/kafkaUtil/podinformer.go index 14a54c23..c932f631 100644 --- a/trafficUtil/kafkaUtil/podinformer.go +++ b/trafficUtil/kafkaUtil/podinformer.go @@ -124,7 +124,7 @@ func (w *PodInformer) GetPodNameByProcessId(pid int32) string { if hostName, ok := w.pidHostNameMap[pid]; ok { return hostName } - slog.Warn("Hostname not found for", "processId", pid) + slog.Debug("Hostname not found for", "processId", pid) return "" } From cf4a711aba1bde0bba06f7d974a1143c1d2a3067 Mon Sep 17 00:00:00 2001 From: gauravmann Date: Thu, 25 Dec 2025 21:16:50 +0530 Subject: [PATCH 38/44] increase and reset timer --- ebpf/connections/factory.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/ebpf/connections/factory.go b/ebpf/connections/factory.go index ac762cd8..4487ca85 100644 --- a/ebpf/connections/factory.go +++ b/ebpf/connections/factory.go @@ -73,7 +73,7 @@ func convertToSingleByteArr(bufMap map[int][]byte) []byte { var ( disableEgress = false maxActiveConnections = 4096 - inactivityThreshold = 3 * time.Second + inactivityThreshold = 7 * time.Second // Value in MB bufferMemThreshold = 400 @@ -201,6 +201,25 @@ func (factory *Factory) CreateIfNotExists(connectionID structs.ConnID) { } } +// resetTimer stops, drains, and resets the timer to the given duration. +func resetTimer(t *time.Timer, d time.Duration) { + if !t.Stop() { + select { + case <-t.C: + default: + } + } + t.Reset(d) +} + +// Worker lifecycle: +// ACTIVE: +// - socket data/open -> reset inactivity timer on each event +// - socket close -> schedule delayed termination +// - inactivity timer -> terminate immediately +// +// TERMINATION is final and happens exactly once. 
+// either due to inactivityThreshold or due to socket close event
 func (factory *Factory) StartWorker(connectionID structs.ConnID, tracker *Tracker, ch chan interface{}) {
 	go func(connID structs.ConnID, tracker *Tracker, ch chan interface{}) {
@@ -216,9 +235,11 @@ func (factory *Factory) StartWorker(connectionID structs.ConnID, tracker *Tracke
 			case *structs.SocketDataEvent:
 				utils.LogProcessing("Received data event", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port)
 				tracker.AddDataEvent(*e)
+				resetTimer(inactivityTimer, inactivityThreshold)
 			case *structs.SocketOpenEvent:
 				utils.LogProcessing("Received open event", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port)
 				tracker.AddOpenEvent(*e)
+				resetTimer(inactivityTimer, inactivityThreshold)
 			case *structs.SocketCloseEvent:
 				utils.LogProcessing("Received close event", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port)
 				tracker.AddCloseEvent(*e)

From c5a88e385325337a06f43d6f942903d855806a5b Mon Sep 17 00:00:00 2001
From: notshivansh
Date: Fri, 26 Dec 2025 17:29:20 +0530
Subject: [PATCH 39/44] default don't reconnect

---
 trafficUtil/kafkaUtil/kafka.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/trafficUtil/kafkaUtil/kafka.go b/trafficUtil/kafkaUtil/kafka.go
index 37e46dbc..3e2d2d7f 100644
--- a/trafficUtil/kafkaUtil/kafka.go
+++ b/trafficUtil/kafkaUtil/kafka.go
@@ -37,7 +37,7 @@ var kafkaUsername = ""
 var kafkaPassword = ""
 
 var kafkaErrorThreshold = 500
-var kafkaReconnectIntervalMinutes = 5
+var kafkaReconnectIntervalMinutes = -1
 
 func init() {
 

From bb536cf52cff67042ba5b595311146207112d16a Mon Sep 17 00:00:00 2001
From: gauravmann
Date: Fri, 26 Dec 2025 22:28:13 +0530
Subject: [PATCH 40/44] add max 10MB limit

---
 ebpf/connections/factory.go | 22 +++++++++++++++++-----
 ebpf/connections/tracker.go |  8 ++++++++
 2 files changed, 25 
insertions(+), 5 deletions(-) diff --git a/ebpf/connections/factory.go b/ebpf/connections/factory.go index 4487ca85..e0136918 100644 --- a/ebpf/connections/factory.go +++ b/ebpf/connections/factory.go @@ -80,6 +80,8 @@ var ( // unique id of daemonset uniqueDaemonsetId = uuid.New().String() trackerDataProcessInterval = 100 + + socketDataEventBytesThreshold = 10 * 1024 * 1024 ) func init() { @@ -89,6 +91,7 @@ func init() { utils.InitVar("TRAFFIC_BUFFER_THRESHOLD", &bufferMemThreshold) utils.InitVar("AKTO_MEM_SOFT_LIMIT", &bufferMemThreshold) utils.InitVar("TRACKER_DATA_PROCESS_INTERVAL", &trackerDataProcessInterval) + utils.InitVar("SOCKET_DATA_EVENT_BYTES_THRESHOLD", &socketDataEventBytesThreshold) } func ProcessTrackerData(connID structs.ConnID, tracker *Tracker, isComplete bool) { @@ -235,7 +238,13 @@ func (factory *Factory) StartWorker(connectionID structs.ConnID, tracker *Tracke case *structs.SocketDataEvent: utils.LogProcessing("Received data event", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) tracker.AddDataEvent(*e) - resetTimer(inactivityTimer, inactivityThreshold) + if tracker.GetSentBytes() + tracker.GetRecvBytes() > uint64(socketDataEventBytesThreshold) { + utils.LogProcessing("Socket Data threshold data breached, processing current data", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) + factory.StopProcessing(connID) + return + }else{ + resetTimer(inactivityTimer, inactivityThreshold) + } case *structs.SocketOpenEvent: utils.LogProcessing("Received open event", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) tracker.AddOpenEvent(*e) @@ -251,15 +260,13 @@ func (factory *Factory) StartWorker(connectionID structs.ConnID, tracker *Tracke case <-delayedDeleteChan: utils.LogProcessing("Stopping go routine (delayed close)", "fd", connID.Fd, "id", connID.Id, "timestamp", 
connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) - factory.ProcessAndStopWorker(connID) - factory.DeleteWorker(connID) + factory.StopProcessing(connID) return case <-inactivityTimer.C: // Eat the go routine after inactive threshold, process the tracker and stop the worker utils.LogProcessing("Inactivity threshold reached, marking connection as inactive and processing", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) - factory.ProcessAndStopWorker(connID) - factory.DeleteWorker(connID) + factory.StopProcessing(connID) utils.LogProcessing("Stopping go routine", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) return } @@ -267,6 +274,11 @@ func (factory *Factory) StartWorker(connectionID structs.ConnID, tracker *Tracke }(connectionID, tracker, ch) } +func (factory *Factory) StopProcessing(connID structs.ConnID){ + factory.ProcessAndStopWorker(connID) + factory.DeleteWorker(connID) +} + func (factory *Factory) ProcessAndStopWorker(connectionID structs.ConnID) { tracker, connExists := factory.getTracker(connectionID) if connExists { diff --git a/ebpf/connections/tracker.go b/ebpf/connections/tracker.go index 0d8d8acb..3056d0be 100644 --- a/ebpf/connections/tracker.go +++ b/ebpf/connections/tracker.go @@ -108,3 +108,11 @@ func (conn *Tracker) AddCloseEvent(event structs.SocketCloseEvent) { conn.closeTimestamp = uint64(time.Now().UnixNano()) conn.lastAccessTimestamp = uint64(time.Now().UnixNano()) } + +func (conn *Tracker) GetSentBytes() uint64 { + return conn.sentBytes +} + +func (conn *Tracker) GetRecvBytes() uint64 { + return conn.recvBytes +} \ No newline at end of file From 1f9ee08c33c7937f0324d2a656d65dcdc50c9521 Mon Sep 17 00:00:00 2001 From: kural-akto Date: Mon, 12 Jan 2026 20:02:16 +0530 Subject: [PATCH 41/44] Feature/communicate kafka (#144) * added kafka heartbeats with pod name * added module type as well * added module type changes 
and organise paylaod * change in module type and added jitter to stop spike * change in moduleType value * removing the pod name and node name * added akto agent name * added changes to get the docker image version * code cleanup * added pod name to handle docker deployments * added logs for debug * proper logging enable * chaning log level --- trafficUtil/kafkaUtil/kafka.go | 100 +++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/trafficUtil/kafkaUtil/kafka.go b/trafficUtil/kafkaUtil/kafka.go index 3e2d2d7f..9122715e 100644 --- a/trafficUtil/kafkaUtil/kafka.go +++ b/trafficUtil/kafkaUtil/kafka.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "log/slog" + "math/rand" "os" "strconv" "strings" @@ -17,6 +18,7 @@ import ( trafficpb "github.com/akto-api-security/mirroring-api-logging/trafficUtil/protobuf/traffic_payload" "github.com/akto-api-security/mirroring-api-logging/trafficUtil/utils" + "github.com/google/uuid" "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl/plain" "google.golang.org/protobuf/proto" @@ -38,6 +40,9 @@ var kafkaPassword = "" var kafkaErrorThreshold = 500 var kafkaReconnectIntervalMinutes = -1 +var heartbeatIntervalSeconds = 60 +var uniqueDaemonsetId = uuid.New().String() +var moduleType = "TRAFFIC_COLLECTOR" func init() { @@ -51,6 +56,7 @@ func init() { utils.InitVar("KAFKA_ERROR_THRESHOLD", &kafkaErrorThreshold) utils.InitVar("KAFKA_RECONNECT_INTERVAL_MINUTES", &kafkaReconnectIntervalMinutes) + utils.InitVar("KAFKA_HEARTBEAT_INTERVAL_SECONDS", &heartbeatIntervalSeconds) } func InitKafka() { @@ -120,6 +126,10 @@ func InitKafka() { // Start periodic reconnection routine go periodicKafkaReconnect(kafka_url, kafka_batch_size, kafka_batch_time_secs_duration*time.Second) slog.Info("Started Kafka periodic reconnection routine", "interval_minutes", kafkaReconnectIntervalMinutes) + + // Start heartbeat routine + go sendKafkaHeartbeat() + slog.Info("Started Kafka heartbeat routine", "interval_seconds", 
heartbeatIntervalSeconds) break } } @@ -191,6 +201,71 @@ func periodicKafkaReconnect(kafka_url string, kafka_batch_size int, kafka_batch_ } } +func getDaemonPodName() string { + aktoAgentName := os.Getenv("AKTO_AGENT_NAME") + podName := os.Getenv("POD_NAME") + nodeName := os.Getenv("NODE_NAME") + + if aktoAgentName != "" { + return fmt.Sprintf("akto-tc:%s", aktoAgentName) + } + + if podName != "" && nodeName != "" { + return fmt.Sprintf("akto-tc:%s:%s", podName, nodeName) + } + + hostname := os.Getenv("HOSTNAME") + if hostname == "" { + hostname = fmt.Sprintf("daemon-%s", uniqueDaemonsetId[:8]) + } + return fmt.Sprintf("akto-tc:%s", hostname) +} + +func getImageVersion() string { + imageVersion := os.Getenv("AKTO_IMAGE_VERSION") + if imageVersion == "" { + imageVersion = "aktosecurity/mirror-api-logging:k8s-ebpf" + } + return imageVersion +} + +func sendKafkaHeartbeat() { + if heartbeatIntervalSeconds <= 0 { + slog.Info("Kafka heartbeat disabled", "interval", heartbeatIntervalSeconds) + return + } + + daemonPodName := getDaemonPodName() + imageVersion := getImageVersion() + + slog.Debug("Starting Kafka heartbeat routine", "interval_seconds", heartbeatIntervalSeconds, "daemonPod", daemonPodName, "daemonId", uniqueDaemonsetId) + ctx := context.Background() + + for { + jitter := time.Duration(1+rand.Intn(5)) * time.Second + sleepDuration := time.Duration(heartbeatIntervalSeconds)*time.Second + jitter + + slog.Debug("Sleeping before next heartbeat", "base_interval", heartbeatIntervalSeconds, "jitter_seconds", jitter.Seconds(), "total_sleep", sleepDuration.Seconds()) + time.Sleep(sleepDuration) + + // Send single heartbeat for this daemon + heartbeatMessage := map[string]string{ + "type": "heartbeat", + "daemonId": uniqueDaemonsetId, + "daemonPodName": daemonPodName, + "timestamp": fmt.Sprint(time.Now().Unix()), + "moduleType": moduleType, + "imageVersion": imageVersion, + } + + slog.Debug("Sending Kafka heartbeat", "daemonPod", daemonPodName, "imageVersion", 
imageVersion, "heartbeatMessage", heartbeatMessage) + err := ProduceHeartbeat(ctx, heartbeatMessage) + if err != nil { + slog.Error("Failed to send heartbeat to Kafka", "error", err) + } + } +} + func LogKafkaStats() { kafkaWriterMutex.RLock() defer kafkaWriterMutex.RUnlock() @@ -319,6 +394,31 @@ const ( LogTypeDebug = "DEBUG" ) +func ProduceHeartbeat(ctx context.Context, heartbeatData map[string]string) error { + out, err := json.Marshal(heartbeatData) + if err != nil { + return err + } + + topic := "akto.daemonset.producer.heartbeats" + msg := kafka.Message{ + Topic: topic, + Value: []byte(string(out)), + } + + kafkaWriterMutex.RLock() + writer := kafkaWriter + kafkaWriterMutex.RUnlock() + + err = writer.WriteMessages(ctx, msg) + + if err != nil { + slog.Error("ERROR while writing heartbeat messages", "topic", topic, "error", err) + return err + } + return nil +} + func ProduceLogs(ctx context.Context, message string, logType string) error { value := map[string]string{ "message": message, From dd96fbaf2563eda1bdf98f29e079f24787f1775b Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Wed, 14 Jan 2026 15:07:47 +0530 Subject: [PATCH 42/44] feat: add kafka header --- trafficUtil/kafkaUtil/kafka.go | 28 +++++++++++++++++++++++----- trafficUtil/kafkaUtil/parser.go | 5 ++--- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/trafficUtil/kafkaUtil/kafka.go b/trafficUtil/kafkaUtil/kafka.go index 9122715e..de73bc37 100644 --- a/trafficUtil/kafkaUtil/kafka.go +++ b/trafficUtil/kafkaUtil/kafka.go @@ -108,7 +108,7 @@ func InitKafka() { out, _ := json.Marshal(value) ctx := context.Background() - err := ProduceStr(ctx, string(out), "testKafkaConnection", "testKafkaConnectionHost") + err := ProduceStr(ctx, string(out), "testKafkaConnection", "testKafkaConnectionHost", "") utils.PrintLog("logging kafka stats post pushing message") LogKafkaStats() if err != nil { @@ -448,12 +448,30 @@ func ProduceLogs(ctx context.Context, message string, logType string) error { return nil 
} -func ProduceStr(ctx context.Context, message string, url, reqHost string) error { - // initialize the writer with the broker addresses, and the topic +// buildCollectionDetailsHeader creates the collection_details Kafka header +// Format: "host|method|url" +// Returns nil if method is empty (skip header for non-HTTP messages) +func buildCollectionDetailsHeader(host, method, url string) []kafka.Header { + if method == "" { + return nil + } + + headerValue := fmt.Sprintf("%s|%s|%s", host, method, url) + return []kafka.Header{ + { + Key: "collection_details", + Value: []byte(headerValue), + }, + } +} + +func ProduceStr(ctx context.Context, message string, url, reqHost, method string) error { topic := "akto.api.logs" + msg := kafka.Message{ - Topic: topic, - Value: []byte(message), + Topic: topic, + Value: []byte(message), + Headers: buildCollectionDetailsHeader(reqHost, method, url), } kafkaWriterMutex.RLock() diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index 811baead..7baea85f 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -561,9 +561,8 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, ctx TrafficContext apiProcessor.CloudProcessorInstance.Produce(value) } else { - // Produce to kafka - // TODO : remove and use protobuf instead - go ProduceStr(bgCtx, string(out), url, req.Host) + // Produce to kafka with collection_details header + go ProduceStr(bgCtx, string(out), url, req.Host, req.Method) go Produce(bgCtx, payload) } } From 148495503ea10af8e494d0ea46ac8a0a3a5f10b2 Mon Sep 17 00:00:00 2001 From: Abhijeet Date: Wed, 14 Jan 2026 15:37:17 +0530 Subject: [PATCH 43/44] chore: add check --- trafficUtil/kafkaUtil/kafka.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trafficUtil/kafkaUtil/kafka.go b/trafficUtil/kafkaUtil/kafka.go index de73bc37..0dac65f5 100644 --- a/trafficUtil/kafkaUtil/kafka.go +++ b/trafficUtil/kafkaUtil/kafka.go @@ -450,9 +450,9 @@ func 
ProduceLogs(ctx context.Context, message string, logType string) error { // buildCollectionDetailsHeader creates the collection_details Kafka header // Format: "host|method|url" -// Returns nil if method is empty (skip header for non-HTTP messages) +// Returns nil if any parameter is empty (skip header for incomplete messages) func buildCollectionDetailsHeader(host, method, url string) []kafka.Header { - if method == "" { + if host == "" || method == "" || url == "" { return nil } From edde56caf28abaceb171c48fb4e447378b67f3ea Mon Sep 17 00:00:00 2001 From: gauravmann Date: Mon, 19 Jan 2026 11:02:14 +0530 Subject: [PATCH 44/44] add better logs --- trafficUtil/kafkaUtil/parser.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index de8003ef..45e198d0 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -129,13 +129,13 @@ func resolvePodLabels(value map[string]string, ctx TrafficContext, url, host str } if ctx.Direction == utils.DirectionOutbound { - checkDebugUrlAndPrint(url, host, "Pod labels not resolved for outbound direction request: "+fmt.Sprint(ctx.Direction)) + checkDebugUrlAndPrint(url, host, fmt.Sprintf("Pod labels not resolved for outbound request, podName: %s, direction: %v", ctx.HostName, ctx.Direction)) return } processName := PodInformerInstance.GetProcessNameByProcessId(int32(ctx.ProcessID)) if strings.Contains(processName, "envoy") { - checkDebugUrlAndPrint(url, host, "Pod labels not resolved for envoy request: "+fmt.Sprint(ctx.Direction)) + checkDebugUrlAndPrint(url, host, fmt.Sprintf("Pod labels not resolved for envoy request, podName: %s, direction: %v", ctx.HostName, ctx.Direction)) return } @@ -332,13 +332,13 @@ func checkDebugUrlAndPrint(url string, host string, message string) { for _, debugString := range DebugStrings { if strings.Contains(url, debugString) { ctx := context.Background() - logMsg := fmt.Sprintf("%s : 
%s", message, url) + logMsg := fmt.Sprintf("url: %s, host: %s, message: %s", url, host, message) utils.PrintLogDebug(logMsg) go ProduceLogs(ctx, logMsg, LogTypeInfo) break } else if strings.Contains(host, debugString) { ctx := context.Background() - logMsg := fmt.Sprintf("%s : %s", message, host) + logMsg := fmt.Sprintf("url: %s, host: %s, message: %s", url, host, message) utils.PrintLogDebug(logMsg) go ProduceLogs(ctx, logMsg, LogTypeInfo) break