From e1755af9aabbd1ee751c60c6b1fb35d11263f42d Mon Sep 17 00:00:00 2001 From: kural-akto Date: Wed, 17 Dec 2025 19:10:44 +0530 Subject: [PATCH 01/13] feat: implemented http2 support for ebpf --- ebpf/connections/factory.go | 72 ++++++- ebpf/connections/parser.go | 4 +- ebpf/connections/tracker.go | 15 +- trafficUtil/kafkaUtil/parser.go | 325 +++++++++++++++++++++++++++++++- 4 files changed, 408 insertions(+), 8 deletions(-) diff --git a/ebpf/connections/factory.go b/ebpf/connections/factory.go index ac762cd8..c3f74da7 100644 --- a/ebpf/connections/factory.go +++ b/ebpf/connections/factory.go @@ -82,6 +82,12 @@ var ( trackerDataProcessInterval = 100 ) +const ( + protocolUnknown = "unknown" + protocolhttp1 = "http1" + protocolhttp2 = "http2" +) + func init() { utils.InitVar("TRAFFIC_DISABLE_EGRESS", &disableEgress) utils.InitVar("TRAFFIC_MAX_ACTIVE_CONN", &maxActiveConnections) @@ -120,13 +126,15 @@ func ProcessTrackerData(connID structs.ConnID, tracker *Tracker, isComplete bool hostName = kafkaUtil.PodInformerInstance.GetPodNameByProcessId(int32(connID.Id >> 32)) } + protocol := tracker.GetProtocol() + if len(sentBuffer) >= len(httpBytes) && (bytes.Equal(sentBuffer[:len(httpBytes)], httpBytes)) { - tryReadFromBD(destIpStr, srcIpStr, receiveBuffer, sentBuffer, isComplete, 1, connID.Id, connID.Fd, uniqueDaemonsetId, hostName) + tryReadFromBD(destIpStr, srcIpStr, receiveBuffer, sentBuffer, isComplete, 1, connID.Id, connID.Fd, uniqueDaemonsetId, hostName, protocol) } if !disableEgress { // attempt to parse the egress as well by switching the recv and sent buffers. 
if len(receiveBuffer) >= len(httpBytes) && (bytes.Equal(receiveBuffer[:len(httpBytes)], httpBytes)) { - tryReadFromBD(srcIpStr, destIpStr, sentBuffer, receiveBuffer, isComplete, 2, connID.Id, connID.Fd, uniqueDaemonsetId, hostName) + tryReadFromBD(srcIpStr, destIpStr, sentBuffer, receiveBuffer, isComplete, 2, connID.Id, connID.Fd, uniqueDaemonsetId, hostName, protocol) } } } @@ -215,7 +223,12 @@ func (factory *Factory) StartWorker(connectionID structs.ConnID, tracker *Tracke switch e := event.(type) { case *structs.SocketDataEvent: utils.LogProcessing("Received data event", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) - tracker.AddDataEvent(*e) + + protocol := tracker.GetProtocol() + if protocol == protocolUnknown { + protocol = detectProtocol(e.Msg[:]) + } + tracker.AddDataEvent(*e, protocol) case *structs.SocketOpenEvent: utils.LogProcessing("Received open event", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) tracker.AddOpenEvent(*e) @@ -329,3 +342,56 @@ func (factory *Factory) SendEvent(connectionID structs.ConnID, event interface{} utils.LogProcessing("No worker found for", "connectionId", connectionID) } } + +func detectProtocol(data []byte) string { + if len(data) < 16 { + return protocolUnknown + } + + http2Preface := []byte("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") + if len(data) >= len(http2Preface) && bytes.Equal(data[:len(http2Preface)], http2Preface) { + return protocolhttp2 + } + + // Frame format: 3 bytes length + 1 byte type + 1 byte flags + 4 bytes stream ID + if len(data) >= 9 { + frameType := data[3] + + if frameType <= 9 { + + frameLen := int(data[0])<<16 | int(data[1])<<8 | int(data[2]) + if frameLen > 0 && frameLen <= 16384 { // Max frame size is typically 16KB + // Additional heuristic: check if it's not starting with HTTP/1 methods + if !bytes.HasPrefix(data, []byte("GET ")) && + !bytes.HasPrefix(data, []byte("POST ")) && + 
!bytes.HasPrefix(data, []byte("PUT ")) && + !bytes.HasPrefix(data, []byte("DELETE ")) && + !bytes.HasPrefix(data, []byte("PATCH ")) && + !bytes.HasPrefix(data, []byte("HEAD ")) && + !bytes.HasPrefix(data, []byte("OPTIONS ")) && + !bytes.HasPrefix(data, []byte("CONNECT ")) && + !bytes.HasPrefix(data, []byte("TRACE ")) && + !bytes.HasPrefix(data, httpBytes) { + return protocolhttp2 + } + } + } + } + + // HTTP/1.x detection - starts with HTTP method or "HTTP/" + if bytes.HasPrefix(data, []byte("GET ")) || + bytes.HasPrefix(data, []byte("POST ")) || + bytes.HasPrefix(data, []byte("PUT ")) || + bytes.HasPrefix(data, []byte("DELETE ")) || + bytes.HasPrefix(data, []byte("PATCH ")) || + bytes.HasPrefix(data, []byte("HEAD ")) || + bytes.HasPrefix(data, []byte("OPTIONS ")) || + bytes.HasPrefix(data, []byte("CONNECT ")) || + bytes.HasPrefix(data, []byte("TRACE ")) || + bytes.HasPrefix(data, []byte("HTTP/1.")) || + bytes.HasPrefix(data, []byte("HTTP/0.")) { + return protocolhttp1 + } + + return protocolUnknown +} diff --git a/ebpf/connections/parser.go b/ebpf/connections/parser.go index b8046d1a..e3a43053 100644 --- a/ebpf/connections/parser.go +++ b/ebpf/connections/parser.go @@ -4,6 +4,6 @@ import ( "github.com/akto-api-security/mirroring-api-logging/trafficUtil/kafkaUtil" ) -func tryReadFromBD(ip string, destIp string, receiveBuffer []byte, sentBuffer []byte, isComplete bool, direction int, id uint64, fd uint32, daemonsetIdentifier, hostName string) { - kafkaUtil.ParseAndProduce(receiveBuffer, sentBuffer, ip, destIp, 0, false, "MIRRORING", isComplete, direction, id, fd, daemonsetIdentifier, hostName) +func tryReadFromBD(ip string, destIp string, receiveBuffer []byte, sentBuffer []byte, isComplete bool, direction int, id uint64, fd uint32, daemonsetIdentifier, hostName string, protocol string) { + kafkaUtil.ParseAndProduce(receiveBuffer, sentBuffer, ip, destIp, 0, false, "MIRRORING", isComplete, direction, id, fd, daemonsetIdentifier, hostName, protocol) } diff --git 
a/ebpf/connections/tracker.go b/ebpf/connections/tracker.go index 0d8d8acb..e6cebb1d 100644 --- a/ebpf/connections/tracker.go +++ b/ebpf/connections/tracker.go @@ -30,6 +30,7 @@ type Tracker struct { srcPort uint16 foundHTTP bool + protocol string // "http1", "http2", or "unknown" } func NewTracker(connID structs.ConnID) *Tracker { @@ -40,6 +41,7 @@ func NewTracker(connID structs.ConnID) *Tracker { mutex: sync.RWMutex{}, ssl: false, foundHTTP: false, + protocol: protocolUnknown, } } @@ -54,10 +56,15 @@ func (conn *Tracker) IsComplete() bool { return complete } -func (conn *Tracker) AddDataEvent(event structs.SocketDataEvent) { +func (conn *Tracker) AddDataEvent(event structs.SocketDataEvent, protocol string) { conn.mutex.Lock() defer conn.mutex.Unlock() + if conn.protocol == protocolUnknown && protocol != protocolUnknown { + conn.protocol = protocol + metaUtils.LogIngest("Protocol detected", "fd", conn.connID.Fd, "id", conn.connID.Id, "protocol", protocol) + } + if !conn.ssl && event.Attr.Ssl { for k := range conn.sentBuf { conn.sentBuf[k] = []byte{} @@ -87,6 +94,12 @@ func (conn *Tracker) AddDataEvent(event structs.SocketDataEvent) { conn.lastAccessTimestamp = uint64(time.Now().UnixNano()) } +func (conn *Tracker) GetProtocol() string { + conn.mutex.RLock() + defer conn.mutex.RUnlock() + return conn.protocol +} + func (conn *Tracker) AddOpenEvent(event structs.SocketOpenEvent) { conn.mutex.Lock() defer conn.mutex.Unlock() diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index c348f5de..47acab7b 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -19,6 +19,8 @@ import ( trafficpb "github.com/akto-api-security/mirroring-api-logging/trafficUtil/protobuf/traffic_payload" "github.com/akto-api-security/mirroring-api-logging/trafficUtil/trafficMetrics" "github.com/akto-api-security/mirroring-api-logging/trafficUtil/utils" + "golang.org/x/net/http2" + "golang.org/x/net/http2/hpack" ) var ( @@ -47,6 +49,10 @@ 
var ( const ONE_MINUTE = 60 +const ( + protocolhttp2 = "http2" +) + func init() { utils.InitVar("DEBUG_MODE", &debugMode) utils.InitVar("OUTPUT_BANDWIDTH_LIMIT", &outputBandwidthLimitPerMin) @@ -170,15 +176,21 @@ func IsValidMethod(method string) bool { } func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, destIp string, vxlanID int, isPending bool, - trafficSource string, isComplete bool, direction int, idfd uint64, fd uint32, daemonsetIdentifier string, hostName string) { + trafficSource string, isComplete bool, direction int, idfd uint64, fd uint32, daemonsetIdentifier string, hostName string, protocol string) { if checkAndUpdateBandwidthProcessed(0) { return } + // Route to HTTP/2 parser if protocol is detected as HTTP/2 + if protocol == protocolhttp2 { + slog.Debug("Routing to HTTP/2 parser", "sourceIp", sourceIp, "destIp", destIp, "protocol", protocol) + ParseHTTP2AndProduce(receiveBuffer, sentBuffer, sourceIp, destIp, vxlanID, isPending, trafficSource, isComplete, direction, idfd, fd, daemonsetIdentifier, hostName) + return + } shouldPrint := debugMode && strings.Contains(string(receiveBuffer), "x-debug-token") if shouldPrint { - slog.Debug("ParseAndProduce", "receiveBuffer", string(receiveBuffer), "sentBuffer", string(sentBuffer)) + slog.Debug("ParseAndProduce", "receiveBuffer", string(receiveBuffer), "sentBuffer", string(sentBuffer), "protocol", protocol) } reader := bufio.NewReader(bytes.NewReader(receiveBuffer)) @@ -507,3 +519,312 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d i++ } } + +type http2Stream struct { + streamID uint32 + requestHeaders map[string]string + requestBody []byte + responseHeaders map[string]string + responseBody []byte + method string + path string + statusCode int + status string + requestComplete bool + responseComplete bool +} + +func ParseHTTP2AndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, destIp string, vxlanID int, isPending bool, + 
trafficSource string, isComplete bool, direction int, idfd uint64, fd uint32, daemonsetIdentifier string, hostName string) { + + if checkAndUpdateBandwidthProcessed(0) { + return + } + + shouldPrint := debugMode && strings.Contains(string(receiveBuffer), "x-debug-token") + if shouldPrint { + slog.Debug("ParseHTTP2AndProduce", "receiveBufferLen", len(receiveBuffer), "sentBufferLen", len(sentBuffer)) + } + + http2Preface := []byte("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") + if len(receiveBuffer) >= len(http2Preface) && bytes.Equal(receiveBuffer[:len(http2Preface)], http2Preface) { + receiveBuffer = receiveBuffer[len(http2Preface):] + slog.Debug("Skipped HTTP/2 connection preface") + } + if len(sentBuffer) >= len(http2Preface) && bytes.Equal(sentBuffer[:len(http2Preface)], sentBuffer) { + sentBuffer = sentBuffer[len(http2Preface):] + } + + streams := make(map[uint32]*http2Stream) + + parseHTTP2Frames(receiveBuffer, streams, true, shouldPrint) + parseHTTP2Frames(sentBuffer, streams, false, shouldPrint) + + // Process complete request/response pairs + for streamID, stream := range streams { + if !stream.requestComplete || !stream.responseComplete { + if shouldPrint { + slog.Debug("Incomplete stream", "streamID", streamID, "requestComplete", stream.requestComplete, "responseComplete", stream.responseComplete) + } + // Skip incomplete streams unless connection is complete + if !isComplete { + continue + } + } + + reqHeader := make(map[string]*trafficpb.StringList) + for name, value := range stream.requestHeaders { + reqHeader[strings.ToLower(name)] = &trafficpb.StringList{ + Values: []string{value}, + } + } + + ip := GetSourceIp(reqHeader, sourceIp) + + if stream.method != "" { + reqHeader[":method"] = &trafficpb.StringList{Values: []string{stream.method}} + } + if stream.path != "" { + reqHeader[":path"] = &trafficpb.StringList{Values: []string{stream.path}} + } + + host := stream.requestHeaders[":authority"] + if host == "" { + host = stream.requestHeaders["host"] + } + if 
host != "" { + reqHeader["host"] = &trafficpb.StringList{Values: []string{host}} + } + + respHeader := make(map[string]*trafficpb.StringList) + for name, value := range stream.responseHeaders { + respHeader[strings.ToLower(name)] = &trafficpb.StringList{ + Values: []string{value}, + } + } + + reqHeaderStr := make(map[string]string) + for name, value := range stream.requestHeaders { + reqHeaderStr[name] = value + } + if host != "" { + reqHeaderStr["host"] = host + } + + passes := utils.PassesFilter(trafficMetrics.FilterHeaderValueMap, reqHeaderStr) + if !passes { + continue + } + + if utils.IgnoreIpTraffic && utils.CheckIfIp(host) { + continue + } + + if utils.IgnoreCloudMetadataCalls && host == "169.254.169.254" { + continue + } + + if utils.IgnoreEnvoyProxycalls && sourceIp == utils.EnvoyProxyIp && direction == utils.DirectionOutbound { + slog.Debug("Ignoring outbound envoy proxy call", "sourceIp", sourceIp, "path", stream.path, "host", host) + continue + } + + var skipPacket = utils.FilterPacket(reqHeaderStr) + if skipPacket { + continue + } + + url := stream.path + checkDebugUrlAndPrint(url, host, "HTTP/2 URL,host found in ParseHTTP2AndProduce") + + payload := &trafficpb.HttpResponseParam{ + Method: stream.method, + Path: stream.path, + RequestHeaders: reqHeader, + ResponseHeaders: respHeader, + RequestPayload: string(stream.requestBody), + ResponsePayload: string(stream.responseBody), + Ip: ip, + Time: int32(time.Now().Unix()), + StatusCode: int32(stream.statusCode), + Type: "HTTP/2.0", + Status: stream.status, + AktoAccountId: fmt.Sprint(1000000), + AktoVxlanId: fmt.Sprint(vxlanID), + IsPending: isPending, + Source: trafficSource, + } + + respHeaderStr := make(map[string]string) + for name, value := range stream.responseHeaders { + respHeaderStr[name] = value + } + + reqHeaderString, _ := json.Marshal(reqHeaderStr) + respHeaderString, _ := json.Marshal(respHeaderStr) + + value := map[string]string{ + "path": stream.path, + "requestHeaders": 
string(reqHeaderString), + "responseHeaders": string(respHeaderString), + "method": stream.method, + "requestPayload": string(stream.requestBody), + "responsePayload": string(stream.responseBody), + "ip": sourceIp, + "destIp": destIp, + "time": fmt.Sprint(time.Now().Unix()), + "statusCode": fmt.Sprint(stream.statusCode), + "type": "HTTP/2.0", + "status": stream.status, + "akto_account_id": fmt.Sprint(1000000), + "akto_vxlan_id": fmt.Sprint(vxlanID), + "is_pending": fmt.Sprint(isPending), + "source": trafficSource, + "direction": fmt.Sprint(direction), + "process_id": fmt.Sprint(idfd >> 32), + "socket_id": fmt.Sprint(fd), + "daemonset_id": fmt.Sprint(daemonsetIdentifier), + "enable_graph": fmt.Sprint(utils.EnableGraph), + } + + var pid = idfd >> 32 + log := fmt.Sprintf("HTTP/2 pod direction log: direction=%v, host=%v, path=%v, sourceIp=%v, destIp=%v, socketId=%v, processId=%v, hostName=%v, streamId=%v", + direction, host, stream.path, sourceIp, destIp, fd, pid, hostName, streamID) + checkDebugUrlAndPrint(url, host, log) + + if PodInformerInstance != nil && direction == utils.DirectionInbound { + if hostName == "" { + checkDebugUrlAndPrint(url, host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid)) + slog.Error("Failed to resolve pod name, hostName is empty for ", "processId", pid, "hostName", hostName) + } else { + podLabels, err := PodInformerInstance.ResolvePodLabels(hostName, url, host) + if err != nil { + slog.Error("Failed to resolve pod labels", "hostName", hostName, "error", err) + checkDebugUrlAndPrint(url, host, "Error resolving pod labels "+hostName) + } else { + value["tag"] = podLabels + checkDebugUrlAndPrint(url, host, "Pod labels found in ParseHTTP2AndProduce, podLabels found "+fmt.Sprint(podLabels)+" for hostName "+hostName) + slog.Debug("Pod labels", "podName", hostName, "labels", podLabels) + } + } + } else { + checkDebugUrlAndPrint(url, host, "Pod labels not resolved, PodInformerInstance is nil or direction is not 
inbound, direction: "+fmt.Sprint(direction)) + } + + out, _ := json.Marshal(value) + ctx := context.Background() + + outgoingBytes := len(out) + if checkAndUpdateBandwidthProcessed(outgoingBytes) { + return + } + + hostString := host + if utils.CheckIfIpHost(hostString) { + hostString = "ip-host" + } + oc := utils.GenerateOutgoingCounter(vxlanID, sourceIp, hostString) + trafficMetrics.SubmitOutgoingTrafficMetrics(oc, outgoingBytes) + + if apiProcessor.CloudProcessorInstance != nil { + apiProcessor.CloudProcessorInstance.Produce(value) + } else { + go ProduceStr(ctx, string(out), url, host) + go Produce(ctx, payload) + } + } +} + +func parseHTTP2Frames(buffer []byte, streams map[uint32]*http2Stream, isRequest bool, shouldPrint bool) { + framer := http2.NewFramer(nil, bytes.NewReader(buffer)) + framer.SetMaxReadFrameSize(1 << 20) // 1MB max frame size + + decoder := hpack.NewDecoder(4096, nil) + + for { + frame, err := framer.ReadFrame() + if err != nil { + if err != io.EOF { + if shouldPrint { + slog.Debug("Error reading HTTP/2 frame", "error", err, "isRequest", isRequest) + } + } + break + } + + streamID := frame.Header().StreamID + + // Skip stream 0 (connection-level frames like SETTINGS, WINDOW_UPDATE, PING) + if streamID == 0 { + continue + } + + // Get or create stream + stream, exists := streams[streamID] + if !exists { + stream = &http2Stream{ + streamID: streamID, + requestHeaders: make(map[string]string), + responseHeaders: make(map[string]string), + } + streams[streamID] = stream + } + + switch f := frame.(type) { + case *http2.HeadersFrame: + headerBlock := f.HeaderBlockFragment() + headers, err := decoder.DecodeFull(headerBlock) + if err != nil { + slog.Error("Failed to decode HPACK headers", "error", err, "streamID", streamID) + continue + } + + if isRequest { + for _, hf := range headers { + stream.requestHeaders[hf.Name] = hf.Value + // Extract pseudo-headers + if hf.Name == ":method" { + stream.method = hf.Value + } else if hf.Name == ":path" { + 
stream.path = hf.Value + } + } + if f.StreamEnded() { + stream.requestComplete = true + } + } else { + for _, hf := range headers { + stream.responseHeaders[hf.Name] = hf.Value + // Extract status code + if hf.Name == ":status" { + stream.status = hf.Value + fmt.Sscanf(hf.Value, "%d", &stream.statusCode) + } + } + if f.StreamEnded() { + stream.responseComplete = true + } + } + + case *http2.DataFrame: + data := f.Data() + if isRequest { + stream.requestBody = append(stream.requestBody, data...) + if f.StreamEnded() { + stream.requestComplete = true + } + } else { + stream.responseBody = append(stream.responseBody, data...) + if f.StreamEnded() { + stream.responseComplete = true + } + } + + case *http2.RSTStreamFrame: + // Stream was reset + if shouldPrint { + slog.Debug("Stream reset", "streamID", streamID, "errorCode", f.ErrCode) + } + } + } +} From 85547cd78998588dd84bfc7f143c0b2f0cd2a9c6 Mon Sep 17 00:00:00 2001 From: kural-akto Date: Sun, 21 Dec 2025 11:06:06 +0530 Subject: [PATCH 02/13] fix: remove dead lock and allow http2 --- ebpf/connections/factory.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ebpf/connections/factory.go b/ebpf/connections/factory.go index c3f74da7..6b958704 100644 --- a/ebpf/connections/factory.go +++ b/ebpf/connections/factory.go @@ -126,14 +126,14 @@ func ProcessTrackerData(connID structs.ConnID, tracker *Tracker, isComplete bool hostName = kafkaUtil.PodInformerInstance.GetPodNameByProcessId(int32(connID.Id >> 32)) } - protocol := tracker.GetProtocol() + protocol := tracker.protocol - if len(sentBuffer) >= len(httpBytes) && (bytes.Equal(sentBuffer[:len(httpBytes)], httpBytes)) { + if (len(sentBuffer) >= len(httpBytes) && (bytes.Equal(sentBuffer[:len(httpBytes)], httpBytes))) || protocol == protocolhttp2 { tryReadFromBD(destIpStr, srcIpStr, receiveBuffer, sentBuffer, isComplete, 1, connID.Id, connID.Fd, uniqueDaemonsetId, hostName, protocol) } if !disableEgress { // attempt to parse the egress as well by 
switching the recv and sent buffers. - if len(receiveBuffer) >= len(httpBytes) && (bytes.Equal(receiveBuffer[:len(httpBytes)], httpBytes)) { + if (len(receiveBuffer) >= len(httpBytes) && (bytes.Equal(receiveBuffer[:len(httpBytes)], httpBytes))) || protocol == protocolhttp2 { tryReadFromBD(srcIpStr, destIpStr, sentBuffer, receiveBuffer, isComplete, 2, connID.Id, connID.Fd, uniqueDaemonsetId, hostName, protocol) } } From 8da4b1e938490a293062f8acc996cd9df95e97f9 Mon Sep 17 00:00:00 2001 From: kural-akto Date: Mon, 22 Dec 2025 23:00:28 +0530 Subject: [PATCH 03/13] fix: deduct the protocol directly from when we get the packets. --- ebpf/connections/factory.go | 6 +++--- ebpf/connections/tracker.go | 21 +++++++++++++++++++ ebpf/kernel/module.cc | 42 ++++++++++++++++++++++++++++++++++++- ebpf/structs/structs.go | 1 + 4 files changed, 66 insertions(+), 4 deletions(-) diff --git a/ebpf/connections/factory.go b/ebpf/connections/factory.go index 6b958704..993051d1 100644 --- a/ebpf/connections/factory.go +++ b/ebpf/connections/factory.go @@ -83,9 +83,9 @@ var ( ) const ( - protocolUnknown = "unknown" - protocolhttp1 = "http1" - protocolhttp2 = "http2" + protocolUnknown = "UNKN" + protocolhttp1 = "HTTP1" + protocolhttp2 = "HTTP2" ) func init() { diff --git a/ebpf/connections/tracker.go b/ebpf/connections/tracker.go index e6cebb1d..aee02947 100644 --- a/ebpf/connections/tracker.go +++ b/ebpf/connections/tracker.go @@ -112,6 +112,27 @@ func (conn *Tracker) AddOpenEvent(event structs.SocketOpenEvent) { conn.lastAccessTimestamp = now conn.srcIp = event.SrcIp conn.srcPort = event.SrcPort + + protocolBytes := event.Protocol[:] + nullIndex := -1 + for i, b := range protocolBytes { + if b == 0 { + nullIndex = i + break + } + } + if nullIndex > 0 { + protocolStr := string(protocolBytes[:nullIndex]) + switch protocolStr { + case protocolhttp1: + conn.protocol = protocolhttp1 + case protocolhttp2: + conn.protocol = protocolhttp2 + default: + conn.protocol = protocolUnknown + } + 
metaUtils.LogIngest("Protocol set from eBPF", "fd", conn.connID.Fd, "id", conn.connID.Id, "protocol", conn.protocol, "raw", protocolStr) + } } func (conn *Tracker) AddCloseEvent(event structs.SocketCloseEvent) { diff --git a/ebpf/kernel/module.cc b/ebpf/kernel/module.cc index a17a1251..2fbcb5f6 100644 --- a/ebpf/kernel/module.cc +++ b/ebpf/kernel/module.cc @@ -49,6 +49,7 @@ struct conn_info_t { bool ssl; u32 readEventsCount; u32 writeEventsCount; + char protocol[8]; }; union sockaddr_t { @@ -87,6 +88,7 @@ struct socket_open_event_t { u32 src_ip; unsigned short src_port; u64 socket_open_ns; + char protocol[8]; // Protocol detected from payload: "HTTP1", "HTTP2", "UNKN" }; struct socket_close_event_t { @@ -233,6 +235,9 @@ static __inline void process_syscall_accept(struct pt_regs* ret, const struct ac conn_info.readEventsCount = 0; conn_info.writeEventsCount = 0; + // Initialize protocol as unknown - will be detected from first data packet + __builtin_memcpy(conn_info.protocol, "UNKN", 5); + u32 tgid = id >> 32; u64 tgid_fd = 0; if(isConnect){ @@ -280,6 +285,7 @@ static __inline void process_syscall_accept(struct pt_regs* ret, const struct ac socket_open_event.ip = conn_info.ip; socket_open_event.src_ip = srcIp; socket_open_event.src_port = lport; + __builtin_memcpy(socket_open_event.protocol, conn_info.protocol, 8); if (PRINT_BPF_LOGS){ bpf_trace_printk("accept call: %llu %d %d", socket_open_event.id, socket_open_event.fd, isConnect); @@ -401,6 +407,9 @@ static __inline void process_syscall_data(struct pt_regs* ret, const struct data size_to_save = MAX_MSG_SIZE; } + // Detect protocol from first packet payload + detect_protocol_from_data(conn_info, socket_data_event->msg, size_to_save); + if (is_send){ conn_info->writeEventsCount = (conn_info->writeEventsCount) + 1u; } else { @@ -446,7 +455,38 @@ static __inline void process_syscall_data_vecs(struct pt_regs* ret, struct data_ } } -// Hooks + +static __inline void detect_protocol_from_data(struct conn_info_t 
*conn_info, const char *buf, size_t count) { + // Only detect if protocol is still unknown + if (conn_info->protocol[0] != 'U') { + return; + } + + if (count < 6) { + return; + } + + if (buf[0] == 'P' && buf[1] == 'R' && buf[2] == 'I' && buf[3] == ' ' && buf[4] == '*') { + __builtin_memcpy(conn_info->protocol, "HTTP2", 6); + return; + } + + if ((buf[0] == 'G' && buf[1] == 'E' && buf[2] == 'T' && buf[3] == ' ') || + (buf[0] == 'P' && buf[1] == 'O' && buf[2] == 'S' && buf[3] == 'T') || + (buf[0] == 'P' && buf[1] == 'U' && buf[2] == 'T' && buf[3] == ' ') || + (buf[0] == 'D' && buf[1] == 'E' && buf[2] == 'L' && buf[3] == 'E') || + (buf[0] == 'H' && buf[1] == 'E' && buf[2] == 'A' && buf[3] == 'D') || + (buf[0] == 'P' && buf[1] == 'A' && buf[2] == 'T' && buf[3] == 'C')) { + __builtin_memcpy(conn_info->protocol, "HTTP1", 6); + return; + } + + if (buf[0] == 'H' && buf[1] == 'T' && buf[2] == 'T' && buf[3] == 'P' && buf[4] == '/') { + __builtin_memcpy(conn_info->protocol, "HTTP1", 6); + return; + } +} + int syscall__probe_entry_accept(struct pt_regs* ctx, int sockfd, struct sockaddr* addr, socklen_t* addrlen) { u64 id = bpf_get_current_pid_tgid(); diff --git a/ebpf/structs/structs.go b/ebpf/structs/structs.go index 3a88bd09..f392f5d2 100644 --- a/ebpf/structs/structs.go +++ b/ebpf/structs/structs.go @@ -46,6 +46,7 @@ type SocketOpenEvent struct { SrcPort uint16 Padding [2]byte Socket_open_ns uint64 + Protocol [8]byte } type SocketCloseEvent struct { From bfae1dc529754d1c8335776eaa82cc427a0d71aa Mon Sep 17 00:00:00 2001 From: kural-akto Date: Tue, 23 Dec 2025 01:08:26 +0530 Subject: [PATCH 04/13] fix: detect protocol in data event not on socket open --- ebpf/connections/factory.go | 60 +------------------------------- ebpf/connections/tracker.go | 52 ++++++++++++++-------------- ebpf/kernel/module.cc | 69 +++++++++++++++++++------------------ ebpf/structs/structs.go | 1 + 4 files changed, 64 insertions(+), 118 deletions(-) diff --git a/ebpf/connections/factory.go 
b/ebpf/connections/factory.go index 993051d1..38194cec 100644 --- a/ebpf/connections/factory.go +++ b/ebpf/connections/factory.go @@ -223,12 +223,7 @@ func (factory *Factory) StartWorker(connectionID structs.ConnID, tracker *Tracke switch e := event.(type) { case *structs.SocketDataEvent: utils.LogProcessing("Received data event", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) - - protocol := tracker.GetProtocol() - if protocol == protocolUnknown { - protocol = detectProtocol(e.Msg[:]) - } - tracker.AddDataEvent(*e, protocol) + tracker.AddDataEvent(*e) case *structs.SocketOpenEvent: utils.LogProcessing("Received open event", "fd", connID.Fd, "id", connID.Id, "timestamp", connID.Conn_start_ns, "ip", connID.Ip, "port", connID.Port) tracker.AddOpenEvent(*e) @@ -342,56 +337,3 @@ func (factory *Factory) SendEvent(connectionID structs.ConnID, event interface{} utils.LogProcessing("No worker found for", "connectionId", connectionID) } } - -func detectProtocol(data []byte) string { - if len(data) < 16 { - return protocolUnknown - } - - http2Preface := []byte("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") - if len(data) >= len(http2Preface) && bytes.Equal(data[:len(http2Preface)], http2Preface) { - return protocolhttp2 - } - - // Frame format: 3 bytes length + 1 byte type + 1 byte flags + 4 bytes stream ID - if len(data) >= 9 { - frameType := data[3] - - if frameType <= 9 { - - frameLen := int(data[0])<<16 | int(data[1])<<8 | int(data[2]) - if frameLen > 0 && frameLen <= 16384 { // Max frame size is typically 16KB - // Additional heuristic: check if it's not starting with HTTP/1 methods - if !bytes.HasPrefix(data, []byte("GET ")) && - !bytes.HasPrefix(data, []byte("POST ")) && - !bytes.HasPrefix(data, []byte("PUT ")) && - !bytes.HasPrefix(data, []byte("DELETE ")) && - !bytes.HasPrefix(data, []byte("PATCH ")) && - !bytes.HasPrefix(data, []byte("HEAD ")) && - !bytes.HasPrefix(data, []byte("OPTIONS ")) && - 
!bytes.HasPrefix(data, []byte("CONNECT ")) && - !bytes.HasPrefix(data, []byte("TRACE ")) && - !bytes.HasPrefix(data, httpBytes) { - return protocolhttp2 - } - } - } - } - - // HTTP/1.x detection - starts with HTTP method or "HTTP/" - if bytes.HasPrefix(data, []byte("GET ")) || - bytes.HasPrefix(data, []byte("POST ")) || - bytes.HasPrefix(data, []byte("PUT ")) || - bytes.HasPrefix(data, []byte("DELETE ")) || - bytes.HasPrefix(data, []byte("PATCH ")) || - bytes.HasPrefix(data, []byte("HEAD ")) || - bytes.HasPrefix(data, []byte("OPTIONS ")) || - bytes.HasPrefix(data, []byte("CONNECT ")) || - bytes.HasPrefix(data, []byte("TRACE ")) || - bytes.HasPrefix(data, []byte("HTTP/1.")) || - bytes.HasPrefix(data, []byte("HTTP/0.")) { - return protocolhttp1 - } - - return protocolUnknown -} diff --git a/ebpf/connections/tracker.go b/ebpf/connections/tracker.go index aee02947..a0b1459a 100644 --- a/ebpf/connections/tracker.go +++ b/ebpf/connections/tracker.go @@ -56,13 +56,35 @@ func (conn *Tracker) IsComplete() bool { return complete } -func (conn *Tracker) AddDataEvent(event structs.SocketDataEvent, protocol string) { +func (conn *Tracker) AddDataEvent(event structs.SocketDataEvent) { conn.mutex.Lock() defer conn.mutex.Unlock() - if conn.protocol == protocolUnknown && protocol != protocolUnknown { - conn.protocol = protocol - metaUtils.LogIngest("Protocol detected", "fd", conn.connID.Fd, "id", conn.connID.Id, "protocol", protocol) + // Update protocol from eBPF if it has changed from UNKN + protocolBytes := event.Attr.Protocol[:] + nullIndex := -1 + for i, b := range protocolBytes { + if b == 0 { + nullIndex = i + break + } + } + if nullIndex > 0 { + protocolStr := string(protocolBytes[:nullIndex]) + if protocolStr != protocolUnknown && conn.protocol != protocolStr { + oldProtocol := conn.protocol + switch protocolStr { + case protocolhttp1: + conn.protocol = protocolhttp1 + case protocolhttp2: + conn.protocol = protocolhttp2 + default: + conn.protocol = protocolUnknown + } + if 
oldProtocol != conn.protocol { + metaUtils.LogProcessing("Protocol updated from data event", "fd", conn.connID.Fd, "id", conn.connID.Id, "old", oldProtocol, "new", conn.protocol) + } + } } if !conn.ssl && event.Attr.Ssl { @@ -112,27 +134,7 @@ func (conn *Tracker) AddOpenEvent(event structs.SocketOpenEvent) { conn.lastAccessTimestamp = now conn.srcIp = event.SrcIp conn.srcPort = event.SrcPort - - protocolBytes := event.Protocol[:] - nullIndex := -1 - for i, b := range protocolBytes { - if b == 0 { - nullIndex = i - break - } - } - if nullIndex > 0 { - protocolStr := string(protocolBytes[:nullIndex]) - switch protocolStr { - case protocolhttp1: - conn.protocol = protocolhttp1 - case protocolhttp2: - conn.protocol = protocolhttp2 - default: - conn.protocol = protocolUnknown - } - metaUtils.LogIngest("Protocol set from eBPF", "fd", conn.connID.Fd, "id", conn.connID.Id, "protocol", conn.protocol, "raw", protocolStr) - } + // Protocol will be set from SocketDataEvent, not from SocketOpenEvent } func (conn *Tracker) AddCloseEvent(event structs.SocketCloseEvent) { diff --git a/ebpf/kernel/module.cc b/ebpf/kernel/module.cc index 2fbcb5f6..e25be939 100644 --- a/ebpf/kernel/module.cc +++ b/ebpf/kernel/module.cc @@ -110,6 +110,7 @@ struct socket_data_event_t { u32 readEventsCount; u32 writeEventsCount; bool ssl; + char protocol[8]; // Protocol detected: "HTTP1", "HTTP2", "UNKN" char msg[MAX_MSG_SIZE]; }; @@ -324,7 +325,38 @@ static __inline void process_syscall_close(struct pt_regs* ret, const struct clo socket_close_event.socket_close_ns = bpf_ktime_get_ns(); socket_close_events.perf_submit(ret, &socket_close_event, sizeof(struct socket_close_event_t)); - conn_info_map.delete(&tgid_fd); + conn_info_map.delete(&tgid_fd); +} + +static __inline void detect_protocol_from_data(struct conn_info_t *conn_info, const char *buf, size_t count) { + // Only detect if protocol is still unknown + if (conn_info->protocol[0] != 'U') { + return; + } + + if (count < 6) { + return; + } + + if 
(buf[0] == 'P' && buf[1] == 'R' && buf[2] == 'I' && buf[3] == ' ' && buf[4] == '*') { + __builtin_memcpy(conn_info->protocol, "HTTP2", 6); + return; + } + + if ((buf[0] == 'G' && buf[1] == 'E' && buf[2] == 'T' && buf[3] == ' ') || + (buf[0] == 'P' && buf[1] == 'O' && buf[2] == 'S' && buf[3] == 'T') || + (buf[0] == 'P' && buf[1] == 'U' && buf[2] == 'T' && buf[3] == ' ') || + (buf[0] == 'D' && buf[1] == 'E' && buf[2] == 'L' && buf[3] == 'E') || + (buf[0] == 'H' && buf[1] == 'E' && buf[2] == 'A' && buf[3] == 'D') || + (buf[0] == 'P' && buf[1] == 'A' && buf[2] == 'T' && buf[3] == 'C')) { + __builtin_memcpy(conn_info->protocol, "HTTP1", 6); + return; + } + + if (buf[0] == 'H' && buf[1] == 'T' && buf[2] == 'T' && buf[3] == 'P' && buf[4] == '/') { + __builtin_memcpy(conn_info->protocol, "HTTP1", 6); + return; + } } static __inline void process_syscall_data(struct pt_regs* ret, const struct data_args_t* args, u64 id, bool is_send, bool ssl) { @@ -376,8 +408,9 @@ static __inline void process_syscall_data(struct pt_regs* ret, const struct data socket_data_event->fd = conn_info->fd; socket_data_event->conn_start_ns = conn_info->conn_start_ns; socket_data_event->port = conn_info->port; - socket_data_event->ip = conn_info->ip; + socket_data_event->ip = conn_info->ip; socket_data_event->ssl = conn_info->ssl; + __builtin_memcpy(socket_data_event->protocol, conn_info->protocol, 8); int bytes_sent = 0; size_t size_to_save = 0; @@ -455,38 +488,6 @@ static __inline void process_syscall_data_vecs(struct pt_regs* ret, struct data_ } } - -static __inline void detect_protocol_from_data(struct conn_info_t *conn_info, const char *buf, size_t count) { - // Only detect if protocol is still unknown - if (conn_info->protocol[0] != 'U') { - return; - } - - if (count < 6) { - return; - } - - if (buf[0] == 'P' && buf[1] == 'R' && buf[2] == 'I' && buf[3] == ' ' && buf[4] == '*') { - __builtin_memcpy(conn_info->protocol, "HTTP2", 6); - return; - } - - if ((buf[0] == 'G' && buf[1] == 'E' && buf[2] 
== 'T' && buf[3] == ' ') || - (buf[0] == 'P' && buf[1] == 'O' && buf[2] == 'S' && buf[3] == 'T') || - (buf[0] == 'P' && buf[1] == 'U' && buf[2] == 'T' && buf[3] == ' ') || - (buf[0] == 'D' && buf[1] == 'E' && buf[2] == 'L' && buf[3] == 'E') || - (buf[0] == 'H' && buf[1] == 'E' && buf[2] == 'A' && buf[3] == 'D') || - (buf[0] == 'P' && buf[1] == 'A' && buf[2] == 'T' && buf[3] == 'C')) { - __builtin_memcpy(conn_info->protocol, "HTTP1", 6); - return; - } - - if (buf[0] == 'H' && buf[1] == 'T' && buf[2] == 'T' && buf[3] == 'P' && buf[4] == '/') { - __builtin_memcpy(conn_info->protocol, "HTTP1", 6); - return; - } -} - int syscall__probe_entry_accept(struct pt_regs* ctx, int sockfd, struct sockaddr* addr, socklen_t* addrlen) { u64 id = bpf_get_current_pid_tgid(); diff --git a/ebpf/structs/structs.go b/ebpf/structs/structs.go index f392f5d2..342f0d36 100644 --- a/ebpf/structs/structs.go +++ b/ebpf/structs/structs.go @@ -18,6 +18,7 @@ type SocketDataEventAttr struct { ReadEventsCount uint32 WriteEventsCount uint32 Ssl bool + Protocol [8]byte } /* From 5d7e3a7d8313f98145eb291c5faac5277c634d64 Mon Sep 17 00:00:00 2001 From: kural-akto Date: Tue, 23 Dec 2025 11:49:39 +0530 Subject: [PATCH 05/13] fix: added proper comments and common function for filters and kafka production. 
--- ebpf/kernel/module.cc | 4 + trafficUtil/kafkaUtil/parser.go | 531 ++++++++++++++------------------ 2 files changed, 228 insertions(+), 307 deletions(-) diff --git a/ebpf/kernel/module.cc b/ebpf/kernel/module.cc index e25be939..834fd7d5 100644 --- a/ebpf/kernel/module.cc +++ b/ebpf/kernel/module.cc @@ -338,11 +338,14 @@ static __inline void detect_protocol_from_data(struct conn_info_t *conn_info, co return; } + // HTTP/2 connection preface: "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" if (buf[0] == 'P' && buf[1] == 'R' && buf[2] == 'I' && buf[3] == ' ' && buf[4] == '*') { __builtin_memcpy(conn_info->protocol, "HTTP2", 6); return; } + // HTTP/1.x request methods (verbs): GET, POST, PUT, DELETE, HEAD, PATCH + // GET /path HTTP/1.1 if ((buf[0] == 'G' && buf[1] == 'E' && buf[2] == 'T' && buf[3] == ' ') || (buf[0] == 'P' && buf[1] == 'O' && buf[2] == 'S' && buf[3] == 'T') || (buf[0] == 'P' && buf[1] == 'U' && buf[2] == 'T' && buf[3] == ' ') || @@ -353,6 +356,7 @@ static __inline void detect_protocol_from_data(struct conn_info_t *conn_info, co return; } + // HTTP/1.x response: "HTTP/1.0 200 OK" or "HTTP/1.1 200 OK" if (buf[0] == 'H' && buf[1] == 'T' && buf[2] == 'T' && buf[3] == 'P' && buf[4] == '/') { __builtin_memcpy(conn_info->protocol, "HTTP1", 6); return; diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index 47acab7b..9347c186 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -308,22 +308,7 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d id := "" - // build req headers for threat client - reqHeader := make(map[string]*trafficpb.StringList) - for name, values := range req.Header { - // Loop over all values for the name. 
- for _, value := range values { - reqHeader[strings.ToLower(name)] = &trafficpb.StringList{ - Values: []string{value}, - } - } - } - ip := GetSourceIp(reqHeader, sourceIp) - - reqHeader["host"] = &trafficpb.StringList{ - Values: []string{req.Host}, - } - + // build req headers for threat client (no longer needed, keeping for x-debug-token detection) reqHeaderStr := make(map[string]string) for name, values := range req.Header { // Loop over all values for the name. @@ -338,166 +323,48 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d reqHeaderStr["host"] = req.Host - passes := utils.PassesFilter(trafficMetrics.FilterHeaderValueMap, reqHeaderStr) - //printLog("Req header: " + mapToString(reqHeaderStr)) - //printLog(fmt.Sprintf("passes %t", passes)) - - if !passes { - i++ - continue - } - - if utils.IgnoreIpTraffic && utils.CheckIfIp(req.Host) { - i++ - continue - } - - if utils.IgnoreCloudMetadataCalls && req.Host == "169.254.169.254" { + // Apply common filters + if !applyFiltersAndChecks(reqHeaderStr, req.Host, sourceIp, direction) { i++ continue } - if utils.IgnoreEnvoyProxycalls && sourceIp == utils.EnvoyProxyIp && direction == utils.DirectionOutbound { - slog.Debug("Ignoring outbound envoy proxy call", "sourceIp", sourceIp, "url", req.URL.String(), "host", req.Host) - i++ - continue - } - - var skipPacket = utils.FilterPacket(reqHeaderStr) - - if skipPacket { - i++ - continue - } - - // build resp headers for threat client - respHeader := make(map[string]*trafficpb.StringList) - for name, values := range resp.Header { - // Loop over all values for the name. - for _, value := range values { - respHeader[strings.ToLower(name)] = &trafficpb.StringList{ - Values: []string{value}, - } - } - } - - // TODO: remove and use protobuf instead + // Build response headers map respHeaderStr := make(map[string]string) for name, values := range resp.Header { - // Loop over all values for the name. 
for _, value := range values { respHeaderStr[name] = value } } - url := req.URL.String() - checkDebugUrlAndPrint(url, req.Host, "URL,host found in ParseAndProduce") - - // build kafka payload for threat client - payload := &trafficpb.HttpResponseParam{ - Method: req.Method, - Path: req.URL.String(), - RequestHeaders: reqHeader, - ResponseHeaders: respHeader, - RequestPayload: requestsContent[i], - ResponsePayload: responsesContent[i], - Ip: ip, - Time: int32(time.Now().Unix()), - StatusCode: int32(resp.StatusCode), - Type: string(req.Proto), - Status: resp.Status, - AktoAccountId: fmt.Sprint(1000000), - AktoVxlanId: fmt.Sprint(vxlanID), - IsPending: isPending, - Source: trafficSource, - } - - reqHeaderString, _ := json.Marshal(reqHeaderStr) - respHeaderString, _ := json.Marshal(respHeaderStr) - - // TODO: remove and use protobuf instead - value := map[string]string{ - "path": req.URL.String(), - "requestHeaders": string(reqHeaderString), - "responseHeaders": string(respHeaderString), - "method": req.Method, - "requestPayload": requestsContent[i], - "responsePayload": responsesContent[i], - "ip": sourceIp, - "destIp": destIp, - "time": fmt.Sprint(time.Now().Unix()), - "statusCode": fmt.Sprint(resp.StatusCode), - "type": string(req.Proto), - "status": resp.Status, - "akto_account_id": fmt.Sprint(1000000), - "akto_vxlan_id": fmt.Sprint(vxlanID), - "is_pending": fmt.Sprint(isPending), - "source": trafficSource, - "direction": fmt.Sprint(direction), - "process_id": fmt.Sprint(idfd >> 32), - "socket_id": fmt.Sprint(fd), - "daemonset_id": fmt.Sprint(daemonsetIdentifier), - "enable_graph": fmt.Sprint(utils.EnableGraph), - } - - // Process id was captured from the eBPF program using bpf_get_current_pid_tgid() - // Shifting by 32 gives us the process id on host machine. 
- var pid = idfd >> 32 - log := fmt.Sprintf("pod direction log: direction=%v, host=%v, path=%v, sourceIp=%v, destIp=%v, socketId=%v, processId=%v, hostName=%v", - direction, - reqHeaderStr["host"], - value["path"], - sourceIp, - destIp, - value["socket_id"], - pid, - hostName, - ) - checkDebugUrlAndPrint(url, req.Host, log) - - if PodInformerInstance != nil && direction == utils.DirectionInbound { - - if hostName == "" { - checkDebugUrlAndPrint(url, req.Host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid)) - slog.Error("Failed to resolve pod name, hostName is empty for ", "processId", pid, "hostName", hostName) - } else { - podLabels, err := PodInformerInstance.ResolvePodLabels(hostName, url, req.Host) - if err != nil { - slog.Error("Failed to resolve pod labels", "hostName", hostName, "error", err) - checkDebugUrlAndPrint(url, req.Host, "Error resolving pod labels "+hostName) - } else { - value["tag"] = podLabels - checkDebugUrlAndPrint(url, req.Host, "Pod labels found in ParseAndProduce, podLabels found "+fmt.Sprint(podLabels)+" for hostName "+hostName) - slog.Debug("Pod labels", "podName", hostName, "labels", podLabels) - } - } - } else { - checkDebugUrlAndPrint(url, req.Host, "Pod labels not resolved, PodInformerInstance is nil or direction is not inbound, direction: "+fmt.Sprint(direction)) - } - - out, _ := json.Marshal(value) - ctx := context.Background() - - // calculating the size of outgoing bytes and requests (1) and saving it in outgoingCounterMap - // this number is the closest (slightly higher) to the actual connection transfer bytes. 
- outgoingBytes := len(out) - - if checkAndUpdateBandwidthProcessed(outgoingBytes) { - return + // Use common production logic + params := trafficParams{ + method: req.Method, + path: req.URL.String(), + requestHeaders: reqHeaderStr, + responseHeaders: respHeaderStr, + requestPayload: requestsContent[i], + responsePayload: responsesContent[i], + statusCode: resp.StatusCode, + status: resp.Status, + protocolType: string(req.Proto), + sourceIp: sourceIp, + destIp: destIp, + vxlanID: vxlanID, + isPending: isPending, + trafficSource: trafficSource, + direction: direction, + idfd: idfd, + fd: fd, + daemonsetID: daemonsetIdentifier, + hostName: hostName, } - hostString := reqHeaderStr["host"] - if utils.CheckIfIpHost(hostString) { - hostString = "ip-host" - } - oc := utils.GenerateOutgoingCounter(vxlanID, sourceIp, hostString) - trafficMetrics.SubmitOutgoingTrafficMetrics(oc, outgoingBytes) - if shouldPrint { if strings.Contains(responsesContent[i], id) { goodRequests++ } else { - slog.Debug("req-resp.String()", "out", string(out)) + slog.Debug("req-resp mismatch", "path", req.URL.String()) badRequests++ } @@ -506,15 +373,7 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d } } - if apiProcessor.CloudProcessorInstance != nil { - apiProcessor.CloudProcessorInstance.Produce(value) - - } else { - // Produce to kafka - // TODO : remove and use protobuf instead - go ProduceStr(ctx, string(out), url, req.Host) - go Produce(ctx, payload) - } + produceTrafficData(params) i++ } @@ -534,6 +393,176 @@ type http2Stream struct { responseComplete bool } +type trafficParams struct { + method string + path string + requestHeaders map[string]string + responseHeaders map[string]string + requestPayload string + responsePayload string + statusCode int + status string + protocolType string + sourceIp string + destIp string + vxlanID int + isPending bool + trafficSource string + direction int + idfd uint64 + fd uint32 + daemonsetID string + hostName string +} 
+ +// applyFiltersAndChecks performs common filtering logic for both HTTP/1 and HTTP/2 +func applyFiltersAndChecks(reqHeaderStr map[string]string, host string, sourceIp string, direction int) bool { + passes := utils.PassesFilter(trafficMetrics.FilterHeaderValueMap, reqHeaderStr) + if !passes { + return false + } + + if utils.IgnoreIpTraffic && utils.CheckIfIp(host) { + return false + } + + if utils.IgnoreCloudMetadataCalls && host == "169.254.169.254" { + return false + } + + if utils.IgnoreEnvoyProxycalls && sourceIp == utils.EnvoyProxyIp && direction == utils.DirectionOutbound { + slog.Debug("Ignoring outbound envoy proxy call", "sourceIp", sourceIp, "host", host) + return false + } + + return !utils.FilterPacket(reqHeaderStr) +} + +func produceTrafficData(params trafficParams) { + + reqHeader := make(map[string]*trafficpb.StringList) + for name, value := range params.requestHeaders { + reqHeader[strings.ToLower(name)] = &trafficpb.StringList{ + Values: []string{value}, + } + } + + // Add HTTP/2 pseudo-headers if present + if params.method != "" { + reqHeader[":method"] = &trafficpb.StringList{Values: []string{params.method}} + } + if params.path != "" { + reqHeader[":path"] = &trafficpb.StringList{Values: []string{params.path}} + } + + ip := GetSourceIp(reqHeader, params.sourceIp) + + respHeader := make(map[string]*trafficpb.StringList) + for name, value := range params.responseHeaders { + respHeader[strings.ToLower(name)] = &trafficpb.StringList{ + Values: []string{value}, + } + } + + host := params.requestHeaders["host"] + url := params.path + checkDebugUrlAndPrint(url, host, fmt.Sprintf("%s URL,host found", params.protocolType)) + + payload := &trafficpb.HttpResponseParam{ + Method: params.method, + Path: params.path, + RequestHeaders: reqHeader, + ResponseHeaders: respHeader, + RequestPayload: params.requestPayload, + ResponsePayload: params.responsePayload, + Ip: ip, + Time: int32(time.Now().Unix()), + StatusCode: int32(params.statusCode), + Type: 
params.protocolType, + Status: params.status, + AktoAccountId: fmt.Sprint(1000000), + AktoVxlanId: fmt.Sprint(params.vxlanID), + IsPending: params.isPending, + Source: params.trafficSource, + } + + // Build JSON value map + reqHeaderString, _ := json.Marshal(params.requestHeaders) + respHeaderString, _ := json.Marshal(params.responseHeaders) + + // TODO: remove and use protobuf instead + value := map[string]string{ + "path": params.path, + "requestHeaders": string(reqHeaderString), + "responseHeaders": string(respHeaderString), + "method": params.method, + "requestPayload": params.requestPayload, + "responsePayload": params.responsePayload, + "ip": params.sourceIp, + "destIp": params.destIp, + "time": fmt.Sprint(time.Now().Unix()), + "statusCode": fmt.Sprint(params.statusCode), + "type": params.protocolType, + "status": params.status, + "akto_account_id": fmt.Sprint(1000000), + "akto_vxlan_id": fmt.Sprint(params.vxlanID), + "is_pending": fmt.Sprint(params.isPending), + "source": params.trafficSource, + "direction": fmt.Sprint(params.direction), + "process_id": fmt.Sprint(params.idfd >> 32), + "socket_id": fmt.Sprint(params.fd), + "daemonset_id": params.daemonsetID, + "enable_graph": fmt.Sprint(utils.EnableGraph), + } + + var pid = params.idfd >> 32 + log := fmt.Sprintf("%s pod direction log: direction=%v, host=%v, path=%v, sourceIp=%v, destIp=%v, socketId=%v, processId=%v, hostName=%v", + params.protocolType, params.direction, host, params.path, params.sourceIp, params.destIp, params.fd, pid, params.hostName) + checkDebugUrlAndPrint(url, host, log) + + // Resolve pod labels if applicable + if PodInformerInstance != nil && params.direction == utils.DirectionInbound { + if params.hostName == "" { + checkDebugUrlAndPrint(url, host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid)) + slog.Error("Failed to resolve pod name, hostName is empty for ", "processId", pid, "hostName", params.hostName) + } else { + podLabels, err := 
PodInformerInstance.ResolvePodLabels(params.hostName, url, host) + if err != nil { + slog.Error("Failed to resolve pod labels", "hostName", params.hostName, "error", err) + checkDebugUrlAndPrint(url, host, "Error resolving pod labels "+params.hostName) + } else { + value["tag"] = podLabels + checkDebugUrlAndPrint(url, host, fmt.Sprintf("Pod labels found in %s, podLabels found %v for hostName %s", params.protocolType, podLabels, params.hostName)) + slog.Debug("Pod labels", "podName", params.hostName, "labels", podLabels) + } + } + } else { + checkDebugUrlAndPrint(url, host, fmt.Sprintf("Pod labels not resolved, PodInformerInstance is nil or direction is not inbound, direction: %d", params.direction)) + } + + out, _ := json.Marshal(value) + ctx := context.Background() + + outgoingBytes := len(out) + if checkAndUpdateBandwidthProcessed(outgoingBytes) { + return + } + + hostString := host + if utils.CheckIfIpHost(hostString) { + hostString = "ip-host" + } + oc := utils.GenerateOutgoingCounter(params.vxlanID, params.sourceIp, hostString) + trafficMetrics.SubmitOutgoingTrafficMetrics(oc, outgoingBytes) + + if apiProcessor.CloudProcessorInstance != nil { + apiProcessor.CloudProcessorInstance.Produce(value) + } else { + go ProduceStr(ctx, string(out), url, host) + go Produce(ctx, payload) + } +} + func ParseHTTP2AndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, destIp string, vxlanID int, isPending bool, trafficSource string, isComplete bool, direction int, idfd uint64, fd uint32, daemonsetIdentifier string, hostName string) { @@ -572,36 +601,11 @@ func ParseHTTP2AndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp stri } } - reqHeader := make(map[string]*trafficpb.StringList) - for name, value := range stream.requestHeaders { - reqHeader[strings.ToLower(name)] = &trafficpb.StringList{ - Values: []string{value}, - } - } - - ip := GetSourceIp(reqHeader, sourceIp) - - if stream.method != "" { - reqHeader[":method"] = &trafficpb.StringList{Values: 
[]string{stream.method}} - } - if stream.path != "" { - reqHeader[":path"] = &trafficpb.StringList{Values: []string{stream.path}} - } - + // Extract host for filtering host := stream.requestHeaders[":authority"] if host == "" { host = stream.requestHeaders["host"] } - if host != "" { - reqHeader["host"] = &trafficpb.StringList{Values: []string{host}} - } - - respHeader := make(map[string]*trafficpb.StringList) - for name, value := range stream.responseHeaders { - respHeader[strings.ToLower(name)] = &trafficpb.StringList{ - Values: []string{value}, - } - } reqHeaderStr := make(map[string]string) for name, value := range stream.requestHeaders { @@ -611,127 +615,40 @@ func ParseHTTP2AndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp stri reqHeaderStr["host"] = host } - passes := utils.PassesFilter(trafficMetrics.FilterHeaderValueMap, reqHeaderStr) - if !passes { - continue - } - - if utils.IgnoreIpTraffic && utils.CheckIfIp(host) { - continue - } - - if utils.IgnoreCloudMetadataCalls && host == "169.254.169.254" { - continue - } - - if utils.IgnoreEnvoyProxycalls && sourceIp == utils.EnvoyProxyIp && direction == utils.DirectionOutbound { - slog.Debug("Ignoring outbound envoy proxy call", "sourceIp", sourceIp, "path", stream.path, "host", host) - continue - } - - var skipPacket = utils.FilterPacket(reqHeaderStr) - if skipPacket { + // Apply common filters + if !applyFiltersAndChecks(reqHeaderStr, host, sourceIp, direction) { continue } - url := stream.path - checkDebugUrlAndPrint(url, host, "HTTP/2 URL,host found in ParseHTTP2AndProduce") - - payload := &trafficpb.HttpResponseParam{ - Method: stream.method, - Path: stream.path, - RequestHeaders: reqHeader, - ResponseHeaders: respHeader, - RequestPayload: string(stream.requestBody), - ResponsePayload: string(stream.responseBody), - Ip: ip, - Time: int32(time.Now().Unix()), - StatusCode: int32(stream.statusCode), - Type: "HTTP/2.0", - Status: stream.status, - AktoAccountId: fmt.Sprint(1000000), - AktoVxlanId: 
fmt.Sprint(vxlanID), - IsPending: isPending, - Source: trafficSource, - } - respHeaderStr := make(map[string]string) for name, value := range stream.responseHeaders { respHeaderStr[name] = value } - reqHeaderString, _ := json.Marshal(reqHeaderStr) - respHeaderString, _ := json.Marshal(respHeaderStr) - - value := map[string]string{ - "path": stream.path, - "requestHeaders": string(reqHeaderString), - "responseHeaders": string(respHeaderString), - "method": stream.method, - "requestPayload": string(stream.requestBody), - "responsePayload": string(stream.responseBody), - "ip": sourceIp, - "destIp": destIp, - "time": fmt.Sprint(time.Now().Unix()), - "statusCode": fmt.Sprint(stream.statusCode), - "type": "HTTP/2.0", - "status": stream.status, - "akto_account_id": fmt.Sprint(1000000), - "akto_vxlan_id": fmt.Sprint(vxlanID), - "is_pending": fmt.Sprint(isPending), - "source": trafficSource, - "direction": fmt.Sprint(direction), - "process_id": fmt.Sprint(idfd >> 32), - "socket_id": fmt.Sprint(fd), - "daemonset_id": fmt.Sprint(daemonsetIdentifier), - "enable_graph": fmt.Sprint(utils.EnableGraph), - } - - var pid = idfd >> 32 - log := fmt.Sprintf("HTTP/2 pod direction log: direction=%v, host=%v, path=%v, sourceIp=%v, destIp=%v, socketId=%v, processId=%v, hostName=%v, streamId=%v", - direction, host, stream.path, sourceIp, destIp, fd, pid, hostName, streamID) - checkDebugUrlAndPrint(url, host, log) - - if PodInformerInstance != nil && direction == utils.DirectionInbound { - if hostName == "" { - checkDebugUrlAndPrint(url, host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid)) - slog.Error("Failed to resolve pod name, hostName is empty for ", "processId", pid, "hostName", hostName) - } else { - podLabels, err := PodInformerInstance.ResolvePodLabels(hostName, url, host) - if err != nil { - slog.Error("Failed to resolve pod labels", "hostName", hostName, "error", err) - checkDebugUrlAndPrint(url, host, "Error resolving pod labels "+hostName) - } 
else { - value["tag"] = podLabels - checkDebugUrlAndPrint(url, host, "Pod labels found in ParseHTTP2AndProduce, podLabels found "+fmt.Sprint(podLabels)+" for hostName "+hostName) - slog.Debug("Pod labels", "podName", hostName, "labels", podLabels) - } - } - } else { - checkDebugUrlAndPrint(url, host, "Pod labels not resolved, PodInformerInstance is nil or direction is not inbound, direction: "+fmt.Sprint(direction)) - } - - out, _ := json.Marshal(value) - ctx := context.Background() - - outgoingBytes := len(out) - if checkAndUpdateBandwidthProcessed(outgoingBytes) { - return - } - - hostString := host - if utils.CheckIfIpHost(hostString) { - hostString = "ip-host" - } - oc := utils.GenerateOutgoingCounter(vxlanID, sourceIp, hostString) - trafficMetrics.SubmitOutgoingTrafficMetrics(oc, outgoingBytes) - - if apiProcessor.CloudProcessorInstance != nil { - apiProcessor.CloudProcessorInstance.Produce(value) - } else { - go ProduceStr(ctx, string(out), url, host) - go Produce(ctx, payload) - } + // Use common production logic + params := trafficParams{ + method: stream.method, + path: stream.path, + requestHeaders: reqHeaderStr, + responseHeaders: respHeaderStr, + requestPayload: string(stream.requestBody), + responsePayload: string(stream.responseBody), + statusCode: stream.statusCode, + status: stream.status, + protocolType: "HTTP/2.0", + sourceIp: sourceIp, + destIp: destIp, + vxlanID: vxlanID, + isPending: isPending, + trafficSource: trafficSource, + direction: direction, + idfd: idfd, + fd: fd, + daemonsetID: daemonsetIdentifier, + hostName: hostName, + } + + produceTrafficData(params) } } From 3340b833c035e8de109b7bab645f7a9c5bd393fc Mon Sep 17 00:00:00 2001 From: kural-akto Date: Tue, 23 Dec 2025 16:11:24 +0530 Subject: [PATCH 06/13] fix: byte count update --- ebpf/bpfwrapper/eventCallbacks.go | 2 +- trafficUtil/kafkaUtil/parser.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ebpf/bpfwrapper/eventCallbacks.go 
b/ebpf/bpfwrapper/eventCallbacks.go index 22082b28..355e1249 100644 --- a/ebpf/bpfwrapper/eventCallbacks.go +++ b/ebpf/bpfwrapper/eventCallbacks.go @@ -140,7 +140,7 @@ func SocketDataEventCallback(inputChan chan []byte, connectionFactory *connectio bytesSent := event.Attr.Bytes_sent // The 4 bytes are being lost in padding, thus, not taking them into consideration. - eventAttributesLogicalSize := 45 + eventAttributesLogicalSize := 53 if len(data) > eventAttributesLogicalSize { copy(event.Msg[:], data[eventAttributesLogicalSize:eventAttributesLogicalSize+int(utils.Abs(bytesSent))]) diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index 9347c186..259a23ab 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -50,7 +50,7 @@ var ( const ONE_MINUTE = 60 const ( - protocolhttp2 = "http2" + protocolhttp2 = "HTTP2" ) func init() { From 7cdc7815e9762baa7ef6cb95f667c74b96818e76 Mon Sep 17 00:00:00 2001 From: kural-akto Date: Tue, 23 Dec 2025 22:33:20 +0530 Subject: [PATCH 07/13] fix: check for complete streams --- trafficUtil/kafkaUtil/parser.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index 259a23ab..81666172 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -595,10 +595,8 @@ func ParseHTTP2AndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp stri if shouldPrint { slog.Debug("Incomplete stream", "streamID", streamID, "requestComplete", stream.requestComplete, "responseComplete", stream.responseComplete) } - // Skip incomplete streams unless connection is complete - if !isComplete { - continue - } + // Skip incomplete streams + continue } // Extract host for filtering From bf4a3a19446c65cb73a56b623a1ef25b59f92577 Mon Sep 17 00:00:00 2001 From: kural-akto Date: Wed, 24 Dec 2025 14:00:48 +0530 Subject: [PATCH 08/13] remove not necessary code --- ebpf/connections/tracker.go | 11 
----------- 1 file changed, 11 deletions(-) diff --git a/ebpf/connections/tracker.go b/ebpf/connections/tracker.go index a0b1459a..9cfb46fe 100644 --- a/ebpf/connections/tracker.go +++ b/ebpf/connections/tracker.go @@ -72,7 +72,6 @@ func (conn *Tracker) AddDataEvent(event structs.SocketDataEvent) { if nullIndex > 0 { protocolStr := string(protocolBytes[:nullIndex]) if protocolStr != protocolUnknown && conn.protocol != protocolStr { - oldProtocol := conn.protocol switch protocolStr { case protocolhttp1: conn.protocol = protocolhttp1 @@ -81,9 +80,6 @@ func (conn *Tracker) AddDataEvent(event structs.SocketDataEvent) { default: conn.protocol = protocolUnknown } - if oldProtocol != conn.protocol { - metaUtils.LogProcessing("Protocol updated from data event", "fd", conn.connID.Fd, "id", conn.connID.Id, "old", oldProtocol, "new", conn.protocol) - } } } @@ -116,12 +112,6 @@ func (conn *Tracker) AddDataEvent(event structs.SocketDataEvent) { conn.lastAccessTimestamp = uint64(time.Now().UnixNano()) } -func (conn *Tracker) GetProtocol() string { - conn.mutex.RLock() - defer conn.mutex.RUnlock() - return conn.protocol -} - func (conn *Tracker) AddOpenEvent(event structs.SocketOpenEvent) { conn.mutex.Lock() defer conn.mutex.Unlock() @@ -134,7 +124,6 @@ func (conn *Tracker) AddOpenEvent(event structs.SocketOpenEvent) { conn.lastAccessTimestamp = now conn.srcIp = event.SrcIp conn.srcPort = event.SrcPort - // Protocol will be set from SocketDataEvent, not from SocketOpenEvent } func (conn *Tracker) AddCloseEvent(event structs.SocketCloseEvent) { From 8f7de082567b280c73168637f774bd1f292b1644 Mon Sep 17 00:00:00 2001 From: kural-akto Date: Wed, 24 Dec 2025 21:35:04 +0530 Subject: [PATCH 09/13] feat: implemented grpc support in ebpf --- trafficUtil/kafkaUtil/parser.go | 72 ++++++++++++++++++++++++++++++--- 1 file changed, 66 insertions(+), 6 deletions(-) diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index 81666172..5d84ce04 100644 --- 
a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -5,6 +5,7 @@ import ( "bytes" "compress/gzip" "context" + "encoding/binary" "encoding/json" "fmt" "io" @@ -391,6 +392,9 @@ type http2Stream struct { status string requestComplete bool responseComplete bool + isGRPC bool + grpcStatus string + grpcMessage string } type trafficParams struct { @@ -623,6 +627,11 @@ func ParseHTTP2AndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp stri respHeaderStr[name] = value } + protocolType := "HTTP/2.0" + if stream.isGRPC { + protocolType = "gRPC" + } + // Use common production logic params := trafficParams{ method: stream.method, @@ -633,7 +642,7 @@ func ParseHTTP2AndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp stri responsePayload: string(stream.responseBody), statusCode: stream.statusCode, status: stream.status, - protocolType: "HTTP/2.0", + protocolType: protocolType, sourceIp: sourceIp, destIp: destIp, vxlanID: vxlanID, @@ -697,10 +706,14 @@ func parseHTTP2Frames(buffer []byte, streams map[uint32]*http2Stream, isRequest if isRequest { for _, hf := range headers { stream.requestHeaders[hf.Name] = hf.Value + if hf.Name == "content-type" && strings.HasPrefix(hf.Value, "application/grpc") { + stream.isGRPC = true + } // Extract pseudo-headers - if hf.Name == ":method" { + switch hf.Name { + case ":method": stream.method = hf.Value - } else if hf.Name == ":path" { + case ":path": stream.path = hf.Value } } @@ -711,9 +724,14 @@ func parseHTTP2Frames(buffer []byte, streams map[uint32]*http2Stream, isRequest for _, hf := range headers { stream.responseHeaders[hf.Name] = hf.Value // Extract status code - if hf.Name == ":status" { + switch hf.Name { + case ":status": stream.status = hf.Value fmt.Sscanf(hf.Value, "%d", &stream.statusCode) + case "grpc-status": + stream.grpcStatus = hf.Value + case "grpc-message": + stream.grpcMessage = hf.Value } } if f.StreamEnded() { @@ -724,12 +742,22 @@ func parseHTTP2Frames(buffer []byte, streams 
map[uint32]*http2Stream, isRequest case *http2.DataFrame: data := f.Data() if isRequest { - stream.requestBody = append(stream.requestBody, data...) + if stream.isGRPC { + parsedData := parseGRPCFrames(data) + stream.requestBody = append(stream.requestBody, parsedData...) + } else { + stream.requestBody = append(stream.requestBody, data...) + } if f.StreamEnded() { stream.requestComplete = true } } else { - stream.responseBody = append(stream.responseBody, data...) + if stream.isGRPC { + parsedData := parseGRPCFrames(data) + stream.responseBody = append(stream.responseBody, parsedData...) + } else { + stream.responseBody = append(stream.responseBody, data...) + } if f.StreamEnded() { stream.responseComplete = true } @@ -743,3 +771,35 @@ func parseHTTP2Frames(buffer []byte, streams map[uint32]*http2Stream, isRequest } } } + +func parseGRPCFrames(data []byte) []byte { + var finalData []byte + offset := 0 + + for offset < len(data) { + // Need at least 5 bytes for gRPC + if offset+5 > len(data) { + break + } + compressed := data[offset] + messageLength := binary.BigEndian.Uint32(data[offset+1 : offset+5]) + + if offset+5+int(messageLength) > len(data) { + break + } + + message := data[offset+5 : offset+5+int(messageLength)] + + if compressed == 1 { + // Message is compressed + // TODO, if required we can get the compression algo and uncompress it. + slog.Debug("gRPC message is compressed", "length", messageLength) + } + + finalData = append(finalData, message...) 
+ + offset += (5 + int(messageLength)) + } + + return finalData +} From 90f8f83f47065fb46a23bd6876ba11cbc492f9d6 Mon Sep 17 00:00:00 2001 From: kural-akto Date: Thu, 25 Dec 2025 17:12:09 +0530 Subject: [PATCH 10/13] feat: added common library for http2 frame parsing --- ebpf/go.mod | 1 + ebpf/go.sum | 2 + trafficUtil/kafkaUtil/parser.go | 195 +++++--------------------------- 3 files changed, 34 insertions(+), 164 deletions(-) diff --git a/ebpf/go.mod b/ebpf/go.mod index f3162c1e..62c7e126 100644 --- a/ebpf/go.mod +++ b/ebpf/go.mod @@ -12,6 +12,7 @@ require ( ) require ( + github.com/akto-api-security/gomiddleware v0.1.4 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect diff --git a/ebpf/go.sum b/ebpf/go.sum index b48ff900..54c4c45b 100644 --- a/ebpf/go.sum +++ b/ebpf/go.sum @@ -1,3 +1,5 @@ +github.com/akto-api-security/gomiddleware v0.1.4 h1:jz3Umei5ItlyCBwHROdh9oRrLPhr0V6mwLT9vfvspc8= +github.com/akto-api-security/gomiddleware v0.1.4/go.mod h1:zDsxe1UTr+rGvHt6r1h+c8RkBBzy/A7iTMGoXiTZ5oI= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index 5d84ce04..2ec59dd0 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -5,10 +5,10 @@ import ( "bytes" "compress/gzip" "context" - "encoding/binary" "encoding/json" "fmt" "io" + "log" "log/slog" "net/http" "os" @@ -16,12 +16,11 @@ import ( "sync" "time" + http2parser "github.com/akto-api-security/gomiddleware/http2parser" "github.com/akto-api-security/mirroring-api-logging/trafficUtil/apiProcessor" trafficpb "github.com/akto-api-security/mirroring-api-logging/trafficUtil/protobuf/traffic_payload" 
"github.com/akto-api-security/mirroring-api-logging/trafficUtil/trafficMetrics" "github.com/akto-api-security/mirroring-api-logging/trafficUtil/utils" - "golang.org/x/net/http2" - "golang.org/x/net/http2/hpack" ) var ( @@ -588,29 +587,42 @@ func ParseHTTP2AndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp stri sentBuffer = sentBuffer[len(http2Preface):] } - streams := make(map[uint32]*http2Stream) + // streams := make(map[uint32]*http2Stream) + opts := http2parser.NewParseOptions( + http2parser.WithBase64Encoding(true), + http2parser.WithWaitForEndStream(true), + http2parser.WithGRPCTrailers(true), + ) + streams := make(map[uint32]*http2parser.HTTP2Stream) - parseHTTP2Frames(receiveBuffer, streams, true, shouldPrint) - parseHTTP2Frames(sentBuffer, streams, false, shouldPrint) + err := http2parser.ParseHTTP2Frames(receiveBuffer, streams, true, opts) + if err != nil { + log.Printf("Error parsing requests: %v", err) + } + // Parse responses + err = http2parser.ParseHTTP2Frames(sentBuffer, streams, false, opts) + if err != nil { + log.Printf("Error parsing responses: %v", err) + } // Process complete request/response pairs for streamID, stream := range streams { - if !stream.requestComplete || !stream.responseComplete { + if !stream.RequestComplete || !stream.ResponseComplete { if shouldPrint { - slog.Debug("Incomplete stream", "streamID", streamID, "requestComplete", stream.requestComplete, "responseComplete", stream.responseComplete) + slog.Debug("Incomplete stream", "streamID", streamID, "requestComplete", stream.RequestComplete, "responseComplete", stream.ResponseComplete) } // Skip incomplete streams continue } // Extract host for filtering - host := stream.requestHeaders[":authority"] + host := stream.RequestHeaders[":authority"] if host == "" { - host = stream.requestHeaders["host"] + host = stream.RequestHeaders["host"] } reqHeaderStr := make(map[string]string) - for name, value := range stream.requestHeaders { + for name, value := range 
stream.RequestHeaders { reqHeaderStr[name] = value } if host != "" { @@ -623,25 +635,25 @@ func ParseHTTP2AndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp stri } respHeaderStr := make(map[string]string) - for name, value := range stream.responseHeaders { + for name, value := range stream.ResponseHeaders { respHeaderStr[name] = value } protocolType := "HTTP/2.0" - if stream.isGRPC { + if stream.IsGRPC { protocolType = "gRPC" } // Use common production logic params := trafficParams{ - method: stream.method, - path: stream.path, + method: stream.Method, + path: stream.Path, requestHeaders: reqHeaderStr, responseHeaders: respHeaderStr, - requestPayload: string(stream.requestBody), - responsePayload: string(stream.responseBody), - statusCode: stream.statusCode, - status: stream.status, + requestPayload: string(stream.RequestBody), + responsePayload: string(stream.ResponseBody), + statusCode: stream.StatusCode, + status: stream.Status, protocolType: protocolType, sourceIp: sourceIp, destIp: destIp, @@ -658,148 +670,3 @@ func ParseHTTP2AndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp stri produceTrafficData(params) } } - -func parseHTTP2Frames(buffer []byte, streams map[uint32]*http2Stream, isRequest bool, shouldPrint bool) { - framer := http2.NewFramer(nil, bytes.NewReader(buffer)) - framer.SetMaxReadFrameSize(1 << 20) // 1MB max frame size - - decoder := hpack.NewDecoder(4096, nil) - - for { - frame, err := framer.ReadFrame() - if err != nil { - if err != io.EOF { - if shouldPrint { - slog.Debug("Error reading HTTP/2 frame", "error", err, "isRequest", isRequest) - } - } - break - } - - streamID := frame.Header().StreamID - - // Skip stream 0 (connection-level frames like SETTINGS, WINDOW_UPDATE, PING) - if streamID == 0 { - continue - } - - // Get or create stream - stream, exists := streams[streamID] - if !exists { - stream = &http2Stream{ - streamID: streamID, - requestHeaders: make(map[string]string), - responseHeaders: 
make(map[string]string), - } - streams[streamID] = stream - } - - switch f := frame.(type) { - case *http2.HeadersFrame: - headerBlock := f.HeaderBlockFragment() - headers, err := decoder.DecodeFull(headerBlock) - if err != nil { - slog.Error("Failed to decode HPACK headers", "error", err, "streamID", streamID) - continue - } - - if isRequest { - for _, hf := range headers { - stream.requestHeaders[hf.Name] = hf.Value - if hf.Name == "content-type" && strings.HasPrefix(hf.Value, "application/grpc") { - stream.isGRPC = true - } - // Extract pseudo-headers - switch hf.Name { - case ":method": - stream.method = hf.Value - case ":path": - stream.path = hf.Value - } - } - if f.StreamEnded() { - stream.requestComplete = true - } - } else { - for _, hf := range headers { - stream.responseHeaders[hf.Name] = hf.Value - // Extract status code - switch hf.Name { - case ":status": - stream.status = hf.Value - fmt.Sscanf(hf.Value, "%d", &stream.statusCode) - case "grpc-status": - stream.grpcStatus = hf.Value - case "grpc-message": - stream.grpcMessage = hf.Value - } - } - if f.StreamEnded() { - stream.responseComplete = true - } - } - - case *http2.DataFrame: - data := f.Data() - if isRequest { - if stream.isGRPC { - parsedData := parseGRPCFrames(data) - stream.requestBody = append(stream.requestBody, parsedData...) - } else { - stream.requestBody = append(stream.requestBody, data...) - } - if f.StreamEnded() { - stream.requestComplete = true - } - } else { - if stream.isGRPC { - parsedData := parseGRPCFrames(data) - stream.responseBody = append(stream.responseBody, parsedData...) - } else { - stream.responseBody = append(stream.responseBody, data...) 
- } - if f.StreamEnded() { - stream.responseComplete = true - } - } - - case *http2.RSTStreamFrame: - // Stream was reset - if shouldPrint { - slog.Debug("Stream reset", "streamID", streamID, "errorCode", f.ErrCode) - } - } - } -} - -func parseGRPCFrames(data []byte) []byte { - var finalData []byte - offset := 0 - - for offset < len(data) { - // Need atleast 5 bytes for gRPC - if offset+5 > len(data) { - break - } - compressed := data[offset] - messageLength := binary.BigEndian.Uint32(data[offset+1 : offset+5]) - - if offset+5+int(messageLength) > len(data) { - break - } - - message := data[offset+5 : offset+5+int(messageLength)] - - if compressed == 1 { - // Message is compressed - // TODO, if required we can get the compression algo and uncompress it. - slog.Debug("gRPC message is compressed", "length", messageLength) - } - - finalData = append(finalData, message...) - - offset += (5 + int(messageLength)) - } - - return finalData -} From ca781ed168587c2297af2d5440f522a754fab29e Mon Sep 17 00:00:00 2001 From: kural-akto Date: Tue, 30 Dec 2025 10:25:44 +0530 Subject: [PATCH 11/13] handled checks for allowing http2 protocol --- ebpf/connections/factory.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/ebpf/connections/factory.go b/ebpf/connections/factory.go index 38194cec..07bdafd4 100644 --- a/ebpf/connections/factory.go +++ b/ebpf/connections/factory.go @@ -17,6 +17,7 @@ import ( ) var httpBytes = []byte("HTTP") +var http2Preface = []byte("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") // Factory is a routine-safe container that holds a trackers with unique ID, and able to create new tracker. 
type Factory struct { @@ -97,6 +98,14 @@ func init() { utils.InitVar("TRACKER_DATA_PROCESS_INTERVAL", &trackerDataProcessInterval) } +func hasHTTPResponse(buffer []byte) bool { + return len(buffer) >= len(httpBytes) && bytes.Equal(buffer[:len(httpBytes)], httpBytes) +} + +func hasHTTP2Preface(buffer []byte) bool { + return len(buffer) >= len(http2Preface) && bytes.Equal(buffer[:len(http2Preface)], http2Preface) +} + func ProcessTrackerData(connID structs.ConnID, tracker *Tracker, isComplete bool) { tracker.mutex.Lock() defer tracker.mutex.Unlock() @@ -128,14 +137,12 @@ func ProcessTrackerData(connID structs.ConnID, tracker *Tracker, isComplete bool protocol := tracker.protocol - if (len(sentBuffer) >= len(httpBytes) && (bytes.Equal(sentBuffer[:len(httpBytes)], httpBytes))) || protocol == protocolhttp2 { + if hasHTTPResponse(sentBuffer) || hasHTTP2Preface(receiveBuffer) { tryReadFromBD(destIpStr, srcIpStr, receiveBuffer, sentBuffer, isComplete, 1, connID.Id, connID.Fd, uniqueDaemonsetId, hostName, protocol) } - if !disableEgress { - // attempt to parse the egress as well by switching the recv and sent buffers. 
- if (len(receiveBuffer) >= len(httpBytes) && (bytes.Equal(receiveBuffer[:len(httpBytes)], httpBytes))) || protocol == protocolhttp2 { - tryReadFromBD(srcIpStr, destIpStr, sentBuffer, receiveBuffer, isComplete, 2, connID.Id, connID.Fd, uniqueDaemonsetId, hostName, protocol) - } + + if !disableEgress && (hasHTTPResponse(receiveBuffer) || hasHTTP2Preface(sentBuffer)) { + tryReadFromBD(srcIpStr, destIpStr, sentBuffer, receiveBuffer, isComplete, 2, connID.Id, connID.Fd, uniqueDaemonsetId, hostName, protocol) } } From b32213227c651acb23c52abc7a4df84eaedcb240 Mon Sep 17 00:00:00 2001 From: kural-akto Date: Tue, 30 Dec 2025 11:46:13 +0530 Subject: [PATCH 12/13] code cleanup --- ebpf/kernel/module.cc | 1 + trafficUtil/kafkaUtil/parser.go | 30 +----------------------------- 2 files changed, 2 insertions(+), 29 deletions(-) diff --git a/ebpf/kernel/module.cc b/ebpf/kernel/module.cc index 834fd7d5..8be803ae 100644 --- a/ebpf/kernel/module.cc +++ b/ebpf/kernel/module.cc @@ -447,6 +447,7 @@ static __inline void process_syscall_data(struct pt_regs* ret, const struct data // Detect protocol from first packet payload detect_protocol_from_data(conn_info, socket_data_event->msg, size_to_save); + // Hooks if (is_send){ conn_info->writeEventsCount = (conn_info->writeEventsCount) + 1u; } else { diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index 73a68bb5..b2d9183b 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -478,7 +478,7 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, ctx TrafficContext // Parse based on protocol var parsed *ParsedTraffic if ctx.Protocol == protocolhttp2 { - slog.Debug("Using HTTP/2 parser", "sourceIp", ctx.SourceIP, "destIp", ctx.DestIP) + // slog.Debug("Using HTTP/2 parser", "sourceIp", ctx.SourceIP, "destIp", ctx.DestIP) parsed = parseHTTP2Traffic(receiveBuffer, sentBuffer, ctx, shouldPrint) } else { parsed = parseHTTPTraffic(receiveBuffer, sentBuffer, shouldPrint) @@ -605,26 
+605,7 @@ func sendMetrics(headers ConvertedHeaders, ctx TrafficContext, outgoingBytes int } } -type http2Stream struct { - streamID uint32 - requestHeaders map[string]string - requestBody []byte - responseHeaders map[string]string - responseBody []byte - method string - path string - statusCode int - status string - requestComplete bool - responseComplete bool - isGRPC bool - grpcStatus string - grpcMessage string -} - -// parseHTTP2Traffic parses HTTP/2 frames and returns ParsedTraffic structure func parseHTTP2Traffic(receiveBuffer []byte, sentBuffer []byte, ctx TrafficContext, shouldPrint bool) *ParsedTraffic { - // Strip HTTP/2 connection preface http2Preface := []byte("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") if len(receiveBuffer) >= len(http2Preface) && bytes.Equal(receiveBuffer[:len(http2Preface)], http2Preface) { receiveBuffer = receiveBuffer[len(http2Preface):] @@ -639,7 +620,6 @@ func parseHTTP2Traffic(receiveBuffer []byte, sentBuffer []byte, ctx TrafficConte } } - // Parse HTTP/2 frames opts := http2parser.NewParseOptions( http2parser.WithBase64Encoding(true), http2parser.WithWaitForEndStream(true), @@ -673,7 +653,6 @@ func parseHTTP2Traffic(receiveBuffer []byte, sentBuffer []byte, ctx TrafficConte continue } - // Build http.Request from HTTP/2 stream req, err := convertHTTP2ToRequest(stream) if err != nil { if shouldPrint { @@ -682,7 +661,6 @@ func parseHTTP2Traffic(receiveBuffer []byte, sentBuffer []byte, ctx TrafficConte continue } - // Build http.Response from HTTP/2 stream resp := convertHTTP2ToResponse(stream) parsed.Requests = append(parsed.Requests, *req) @@ -698,7 +676,6 @@ func parseHTTP2Traffic(receiveBuffer []byte, sentBuffer []byte, ctx TrafficConte return parsed } -// convertHTTP2ToRequest converts HTTP/2 stream to http.Request func convertHTTP2ToRequest(stream *http2parser.HTTP2Stream) (*http.Request, error) { method := stream.Method if method == "" { @@ -710,7 +687,6 @@ func convertHTTP2ToRequest(stream *http2parser.HTTP2Stream) (*http.Request, 
erro path = "/" } - // Build URL from pseudo-headers scheme := stream.RequestHeaders[":scheme"] if scheme == "" { scheme = "https" @@ -727,7 +703,6 @@ func convertHTTP2ToRequest(stream *http2parser.HTTP2Stream) (*http.Request, erro return nil, fmt.Errorf("failed to parse URL %s: %w", urlStr, err) } - // Build HTTP headers (skip pseudo-headers) header := make(http.Header) for name, value := range stream.RequestHeaders { if !strings.HasPrefix(name, ":") { @@ -738,7 +713,6 @@ func convertHTTP2ToRequest(stream *http2parser.HTTP2Stream) (*http.Request, erro header.Set("Host", authority) } - // Determine protocol proto := "HTTP/2.0" if stream.IsGRPC { proto = "gRPC" @@ -753,9 +727,7 @@ func convertHTTP2ToRequest(stream *http2parser.HTTP2Stream) (*http.Request, erro }, nil } -// convertHTTP2ToResponse converts HTTP/2 stream to http.Response func convertHTTP2ToResponse(stream *http2parser.HTTP2Stream) *http.Response { - // Build HTTP headers (skip pseudo-headers) header := make(http.Header) for name, value := range stream.ResponseHeaders { if !strings.HasPrefix(name, ":") { From a309b9eb3db54a6ec928f95fc4ca212458d1ff63 Mon Sep 17 00:00:00 2001 From: kural-akto Date: Tue, 30 Dec 2025 11:47:05 +0530 Subject: [PATCH 13/13] code cleanup --- ebpf/kernel/module.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ebpf/kernel/module.cc b/ebpf/kernel/module.cc index 8be803ae..eaa9cbd7 100644 --- a/ebpf/kernel/module.cc +++ b/ebpf/kernel/module.cc @@ -447,7 +447,6 @@ static __inline void process_syscall_data(struct pt_regs* ret, const struct data // Detect protocol from first packet payload detect_protocol_from_data(conn_info, socket_data_event->msg, size_to_save); - // Hooks if (is_send){ conn_info->writeEventsCount = (conn_info->writeEventsCount) + 1u; } else { @@ -493,6 +492,7 @@ static __inline void process_syscall_data_vecs(struct pt_regs* ret, struct data_ } } +// Hooks int syscall__probe_entry_accept(struct pt_regs* ctx, int sockfd, struct sockaddr* addr, 
socklen_t* addrlen) { u64 id = bpf_get_current_pid_tgid();