diff --git a/ebpf/bpfwrapper/eventCallbacks.go b/ebpf/bpfwrapper/eventCallbacks.go index 22082b28..355e1249 100644 --- a/ebpf/bpfwrapper/eventCallbacks.go +++ b/ebpf/bpfwrapper/eventCallbacks.go @@ -140,7 +140,7 @@ func SocketDataEventCallback(inputChan chan []byte, connectionFactory *connectio bytesSent := event.Attr.Bytes_sent // The 4 bytes are being lost in padding, thus, not taking them into consideration. - eventAttributesLogicalSize := 45 + eventAttributesLogicalSize := 53 if len(data) > eventAttributesLogicalSize { copy(event.Msg[:], data[eventAttributesLogicalSize:eventAttributesLogicalSize+int(utils.Abs(bytesSent))]) diff --git a/ebpf/connections/factory.go b/ebpf/connections/factory.go index ac762cd8..38194cec 100644 --- a/ebpf/connections/factory.go +++ b/ebpf/connections/factory.go @@ -82,6 +82,12 @@ var ( trackerDataProcessInterval = 100 ) +const ( + protocolUnknown = "UNKN" + protocolhttp1 = "HTTP1" + protocolhttp2 = "HTTP2" +) + func init() { utils.InitVar("TRAFFIC_DISABLE_EGRESS", &disableEgress) utils.InitVar("TRAFFIC_MAX_ACTIVE_CONN", &maxActiveConnections) @@ -120,13 +126,15 @@ func ProcessTrackerData(connID structs.ConnID, tracker *Tracker, isComplete bool hostName = kafkaUtil.PodInformerInstance.GetPodNameByProcessId(int32(connID.Id >> 32)) } - if len(sentBuffer) >= len(httpBytes) && (bytes.Equal(sentBuffer[:len(httpBytes)], httpBytes)) { - tryReadFromBD(destIpStr, srcIpStr, receiveBuffer, sentBuffer, isComplete, 1, connID.Id, connID.Fd, uniqueDaemonsetId, hostName) + protocol := tracker.protocol + + if (len(sentBuffer) >= len(httpBytes) && (bytes.Equal(sentBuffer[:len(httpBytes)], httpBytes))) || protocol == protocolhttp2 { + tryReadFromBD(destIpStr, srcIpStr, receiveBuffer, sentBuffer, isComplete, 1, connID.Id, connID.Fd, uniqueDaemonsetId, hostName, protocol) } if !disableEgress { // attempt to parse the egress as well by switching the recv and sent buffers. 
- if len(receiveBuffer) >= len(httpBytes) && (bytes.Equal(receiveBuffer[:len(httpBytes)], httpBytes)) { - tryReadFromBD(srcIpStr, destIpStr, sentBuffer, receiveBuffer, isComplete, 2, connID.Id, connID.Fd, uniqueDaemonsetId, hostName) + if (len(receiveBuffer) >= len(httpBytes) && (bytes.Equal(receiveBuffer[:len(httpBytes)], httpBytes))) || protocol == protocolhttp2 { + tryReadFromBD(srcIpStr, destIpStr, sentBuffer, receiveBuffer, isComplete, 2, connID.Id, connID.Fd, uniqueDaemonsetId, hostName, protocol) + } } } diff --git a/ebpf/connections/parser.go b/ebpf/connections/parser.go index b8046d1a..e3a43053 100644 --- a/ebpf/connections/parser.go +++ b/ebpf/connections/parser.go @@ -4,6 +4,6 @@ import ( "github.com/akto-api-security/mirroring-api-logging/trafficUtil/kafkaUtil" ) -func tryReadFromBD(ip string, destIp string, receiveBuffer []byte, sentBuffer []byte, isComplete bool, direction int, id uint64, fd uint32, daemonsetIdentifier, hostName string) { - kafkaUtil.ParseAndProduce(receiveBuffer, sentBuffer, ip, destIp, 0, false, "MIRRORING", isComplete, direction, id, fd, daemonsetIdentifier, hostName) +func tryReadFromBD(ip string, destIp string, receiveBuffer []byte, sentBuffer []byte, isComplete bool, direction int, id uint64, fd uint32, daemonsetIdentifier, hostName string, protocol string) { + kafkaUtil.ParseAndProduce(receiveBuffer, sentBuffer, ip, destIp, 0, false, "MIRRORING", isComplete, direction, id, fd, daemonsetIdentifier, hostName, protocol) } diff --git a/ebpf/connections/tracker.go b/ebpf/connections/tracker.go index 0d8d8acb..9cfb46fe 100644 --- a/ebpf/connections/tracker.go +++ b/ebpf/connections/tracker.go @@ -30,6 +30,7 @@ type Tracker struct { srcPort uint16 foundHTTP bool + protocol string // "HTTP1", "HTTP2", or "UNKN" (matches protocol* constants in factory.go) } func NewTracker(connID structs.ConnID) *Tracker { @@ -40,6 +41,7 @@ func NewTracker(connID structs.ConnID) *Tracker { mutex: sync.RWMutex{}, ssl: false, foundHTTP: false, + protocol: protocolUnknown, } } @@ -58,6 +60,29 @@
func (conn *Tracker) AddDataEvent(event structs.SocketDataEvent) { conn.mutex.Lock() defer conn.mutex.Unlock() + // Update protocol from eBPF if it has changed from UNKN + protocolBytes := event.Attr.Protocol[:] + nullIndex := -1 + for i, b := range protocolBytes { + if b == 0 { + nullIndex = i + break + } + } + if nullIndex > 0 { + protocolStr := string(protocolBytes[:nullIndex]) + if protocolStr != protocolUnknown && conn.protocol != protocolStr { + switch protocolStr { + case protocolhttp1: + conn.protocol = protocolhttp1 + case protocolhttp2: + conn.protocol = protocolhttp2 + default: + conn.protocol = protocolUnknown + } + } + } + if !conn.ssl && event.Attr.Ssl { for k := range conn.sentBuf { conn.sentBuf[k] = []byte{} diff --git a/ebpf/kernel/module.cc b/ebpf/kernel/module.cc index a17a1251..834fd7d5 100644 --- a/ebpf/kernel/module.cc +++ b/ebpf/kernel/module.cc @@ -49,6 +49,7 @@ struct conn_info_t { bool ssl; u32 readEventsCount; u32 writeEventsCount; + char protocol[8]; }; union sockaddr_t { @@ -87,6 +88,7 @@ struct socket_open_event_t { u32 src_ip; unsigned short src_port; u64 socket_open_ns; + char protocol[8]; // Protocol detected from payload: "HTTP1", "HTTP2", "UNKN" }; struct socket_close_event_t { @@ -108,6 +110,7 @@ struct socket_data_event_t { u32 readEventsCount; u32 writeEventsCount; bool ssl; + char protocol[8]; // Protocol detected: "HTTP1", "HTTP2", "UNKN" char msg[MAX_MSG_SIZE]; }; @@ -233,6 +236,9 @@ static __inline void process_syscall_accept(struct pt_regs* ret, const struct ac conn_info.readEventsCount = 0; conn_info.writeEventsCount = 0; + // Initialize protocol as unknown - will be detected from first data packet + __builtin_memcpy(conn_info.protocol, "UNKN", 5); + u32 tgid = id >> 32; u64 tgid_fd = 0; if(isConnect){ @@ -280,6 +286,7 @@ static __inline void process_syscall_accept(struct pt_regs* ret, const struct ac socket_open_event.ip = conn_info.ip; socket_open_event.src_ip = srcIp; socket_open_event.src_port = lport; + 
__builtin_memcpy(socket_open_event.protocol, conn_info.protocol, 8); if (PRINT_BPF_LOGS){ bpf_trace_printk("accept call: %llu %d %d", socket_open_event.id, socket_open_event.fd, isConnect); @@ -318,7 +325,42 @@ static __inline void process_syscall_close(struct pt_regs* ret, const struct clo socket_close_event.socket_close_ns = bpf_ktime_get_ns(); socket_close_events.perf_submit(ret, &socket_close_event, sizeof(struct socket_close_event_t)); - conn_info_map.delete(&tgid_fd); + conn_info_map.delete(&tgid_fd); +} + +static __inline void detect_protocol_from_data(struct conn_info_t *conn_info, const char *buf, size_t count) { + // Only detect if protocol is still unknown + if (conn_info->protocol[0] != 'U') { + return; + } + + if (count < 6) { + return; + } + + // HTTP/2 connection preface: "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" + if (buf[0] == 'P' && buf[1] == 'R' && buf[2] == 'I' && buf[3] == ' ' && buf[4] == '*') { + __builtin_memcpy(conn_info->protocol, "HTTP2", 6); + return; + } + + // HTTP/1.x request methods (verbs): GET, POST, PUT, DELETE, HEAD, PATCH + // GET /path HTTP/1.1 + if ((buf[0] == 'G' && buf[1] == 'E' && buf[2] == 'T' && buf[3] == ' ') || + (buf[0] == 'P' && buf[1] == 'O' && buf[2] == 'S' && buf[3] == 'T') || + (buf[0] == 'P' && buf[1] == 'U' && buf[2] == 'T' && buf[3] == ' ') || + (buf[0] == 'D' && buf[1] == 'E' && buf[2] == 'L' && buf[3] == 'E') || + (buf[0] == 'H' && buf[1] == 'E' && buf[2] == 'A' && buf[3] == 'D') || + (buf[0] == 'P' && buf[1] == 'A' && buf[2] == 'T' && buf[3] == 'C')) { + __builtin_memcpy(conn_info->protocol, "HTTP1", 6); + return; + } + + // HTTP/1.x response: "HTTP/1.0 200 OK" or "HTTP/1.1 200 OK" + if (buf[0] == 'H' && buf[1] == 'T' && buf[2] == 'T' && buf[3] == 'P' && buf[4] == '/') { + __builtin_memcpy(conn_info->protocol, "HTTP1", 6); + return; + } } static __inline void process_syscall_data(struct pt_regs* ret, const struct data_args_t* args, u64 id, bool is_send, bool ssl) { @@ -370,8 +412,9 @@ static __inline void 
process_syscall_data(struct pt_regs* ret, const struct data socket_data_event->fd = conn_info->fd; socket_data_event->conn_start_ns = conn_info->conn_start_ns; socket_data_event->port = conn_info->port; - socket_data_event->ip = conn_info->ip; + socket_data_event->ip = conn_info->ip; socket_data_event->ssl = conn_info->ssl; + __builtin_memcpy(socket_data_event->protocol, conn_info->protocol, 8); int bytes_sent = 0; size_t size_to_save = 0; @@ -401,6 +444,9 @@ static __inline void process_syscall_data(struct pt_regs* ret, const struct data size_to_save = MAX_MSG_SIZE; } + // Detect protocol from first packet payload + detect_protocol_from_data(conn_info, socket_data_event->msg, size_to_save); + if (is_send){ conn_info->writeEventsCount = (conn_info->writeEventsCount) + 1u; } else { @@ -446,7 +492,6 @@ static __inline void process_syscall_data_vecs(struct pt_regs* ret, struct data_ } } -// Hooks int syscall__probe_entry_accept(struct pt_regs* ctx, int sockfd, struct sockaddr* addr, socklen_t* addrlen) { u64 id = bpf_get_current_pid_tgid(); diff --git a/ebpf/structs/structs.go b/ebpf/structs/structs.go index 3a88bd09..342f0d36 100644 --- a/ebpf/structs/structs.go +++ b/ebpf/structs/structs.go @@ -18,6 +18,7 @@ type SocketDataEventAttr struct { ReadEventsCount uint32 WriteEventsCount uint32 Ssl bool + Protocol [8]byte } /* @@ -46,6 +47,7 @@ type SocketOpenEvent struct { SrcPort uint16 Padding [2]byte Socket_open_ns uint64 + Protocol [8]byte } type SocketCloseEvent struct { diff --git a/trafficUtil/kafkaUtil/parser.go b/trafficUtil/kafkaUtil/parser.go index c348f5de..81666172 100644 --- a/trafficUtil/kafkaUtil/parser.go +++ b/trafficUtil/kafkaUtil/parser.go @@ -19,6 +19,8 @@ import ( trafficpb "github.com/akto-api-security/mirroring-api-logging/trafficUtil/protobuf/traffic_payload" "github.com/akto-api-security/mirroring-api-logging/trafficUtil/trafficMetrics" "github.com/akto-api-security/mirroring-api-logging/trafficUtil/utils" + "golang.org/x/net/http2" + 
"golang.org/x/net/http2/hpack" ) var ( @@ -47,6 +49,10 @@ var ( const ONE_MINUTE = 60 +const ( + protocolhttp2 = "HTTP2" +) + func init() { utils.InitVar("DEBUG_MODE", &debugMode) utils.InitVar("OUTPUT_BANDWIDTH_LIMIT", &outputBandwidthLimitPerMin) @@ -170,15 +176,21 @@ func IsValidMethod(method string) bool { } func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, destIp string, vxlanID int, isPending bool, - trafficSource string, isComplete bool, direction int, idfd uint64, fd uint32, daemonsetIdentifier string, hostName string) { + trafficSource string, isComplete bool, direction int, idfd uint64, fd uint32, daemonsetIdentifier string, hostName string, protocol string) { if checkAndUpdateBandwidthProcessed(0) { return } + // Route to HTTP/2 parser if protocol is detected as HTTP/2 + if protocol == protocolhttp2 { + slog.Debug("Routing to HTTP/2 parser", "sourceIp", sourceIp, "destIp", destIp, "protocol", protocol) + ParseHTTP2AndProduce(receiveBuffer, sentBuffer, sourceIp, destIp, vxlanID, isPending, trafficSource, isComplete, direction, idfd, fd, daemonsetIdentifier, hostName) + return + } shouldPrint := debugMode && strings.Contains(string(receiveBuffer), "x-debug-token") if shouldPrint { - slog.Debug("ParseAndProduce", "receiveBuffer", string(receiveBuffer), "sentBuffer", string(sentBuffer)) + slog.Debug("ParseAndProduce", "receiveBuffer", string(receiveBuffer), "sentBuffer", string(sentBuffer), "protocol", protocol) } reader := bufio.NewReader(bytes.NewReader(receiveBuffer)) @@ -296,22 +308,7 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d id := "" - // build req headers for threat client - reqHeader := make(map[string]*trafficpb.StringList) - for name, values := range req.Header { - // Loop over all values for the name. 
- for _, value := range values { - reqHeader[strings.ToLower(name)] = &trafficpb.StringList{ - Values: []string{value}, - } - } - } - ip := GetSourceIp(reqHeader, sourceIp) - - reqHeader["host"] = &trafficpb.StringList{ - Values: []string{req.Host}, - } - + // build req headers for threat client (no longer needed, keeping for x-debug-token detection) reqHeaderStr := make(map[string]string) for name, values := range req.Header { // Loop over all values for the name. @@ -326,184 +323,423 @@ func ParseAndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, d reqHeaderStr["host"] = req.Host - passes := utils.PassesFilter(trafficMetrics.FilterHeaderValueMap, reqHeaderStr) - //printLog("Req header: " + mapToString(reqHeaderStr)) - //printLog(fmt.Sprintf("passes %t", passes)) - - if !passes { + // Apply common filters + if !applyFiltersAndChecks(reqHeaderStr, req.Host, sourceIp, direction) { i++ continue } - if utils.IgnoreIpTraffic && utils.CheckIfIp(req.Host) { - i++ - continue + // Build response headers map + respHeaderStr := make(map[string]string) + for name, values := range resp.Header { + for _, value := range values { + respHeaderStr[name] = value + } } - if utils.IgnoreCloudMetadataCalls && req.Host == "169.254.169.254" { - i++ - continue + // Use common production logic + params := trafficParams{ + method: req.Method, + path: req.URL.String(), + requestHeaders: reqHeaderStr, + responseHeaders: respHeaderStr, + requestPayload: requestsContent[i], + responsePayload: responsesContent[i], + statusCode: resp.StatusCode, + status: resp.Status, + protocolType: string(req.Proto), + sourceIp: sourceIp, + destIp: destIp, + vxlanID: vxlanID, + isPending: isPending, + trafficSource: trafficSource, + direction: direction, + idfd: idfd, + fd: fd, + daemonsetID: daemonsetIdentifier, + hostName: hostName, } - if utils.IgnoreEnvoyProxycalls && sourceIp == utils.EnvoyProxyIp && direction == utils.DirectionOutbound { - slog.Debug("Ignoring outbound envoy proxy call", 
"sourceIp", sourceIp, "url", req.URL.String(), "host", req.Host) - i++ - continue + if shouldPrint { + if strings.Contains(responsesContent[i], id) { + goodRequests++ + } else { + slog.Debug("req-resp mismatch", "path", req.URL.String()) + badRequests++ + } + + if goodRequests%100 == 0 || badRequests%100 == 0 { + slog.Debug("Good requests", "count", goodRequests, "badRequests", badRequests) + } } - var skipPacket = utils.FilterPacket(reqHeaderStr) + produceTrafficData(params) - if skipPacket { - i++ - continue + i++ + } +} + +type http2Stream struct { + streamID uint32 + requestHeaders map[string]string + requestBody []byte + responseHeaders map[string]string + responseBody []byte + method string + path string + statusCode int + status string + requestComplete bool + responseComplete bool +} + +type trafficParams struct { + method string + path string + requestHeaders map[string]string + responseHeaders map[string]string + requestPayload string + responsePayload string + statusCode int + status string + protocolType string + sourceIp string + destIp string + vxlanID int + isPending bool + trafficSource string + direction int + idfd uint64 + fd uint32 + daemonsetID string + hostName string +} + +// applyFiltersAndChecks performs common filtering logic for both HTTP/1 and HTTP/2 +func applyFiltersAndChecks(reqHeaderStr map[string]string, host string, sourceIp string, direction int) bool { + passes := utils.PassesFilter(trafficMetrics.FilterHeaderValueMap, reqHeaderStr) + if !passes { + return false + } + + if utils.IgnoreIpTraffic && utils.CheckIfIp(host) { + return false + } + + if utils.IgnoreCloudMetadataCalls && host == "169.254.169.254" { + return false + } + + if utils.IgnoreEnvoyProxycalls && sourceIp == utils.EnvoyProxyIp && direction == utils.DirectionOutbound { + slog.Debug("Ignoring outbound envoy proxy call", "sourceIp", sourceIp, "host", host) + return false + } + + return !utils.FilterPacket(reqHeaderStr) +} + +func produceTrafficData(params 
trafficParams) { + + reqHeader := make(map[string]*trafficpb.StringList) + for name, value := range params.requestHeaders { + reqHeader[strings.ToLower(name)] = &trafficpb.StringList{ + Values: []string{value}, } + } - // build resp headers for threat client - respHeader := make(map[string]*trafficpb.StringList) - for name, values := range resp.Header { - // Loop over all values for the name. - for _, value := range values { - respHeader[strings.ToLower(name)] = &trafficpb.StringList{ - Values: []string{value}, - } + // Add HTTP/2 pseudo-headers if present + if params.method != "" { + reqHeader[":method"] = &trafficpb.StringList{Values: []string{params.method}} + } + if params.path != "" { + reqHeader[":path"] = &trafficpb.StringList{Values: []string{params.path}} + } + + ip := GetSourceIp(reqHeader, params.sourceIp) + + respHeader := make(map[string]*trafficpb.StringList) + for name, value := range params.responseHeaders { + respHeader[strings.ToLower(name)] = &trafficpb.StringList{ + Values: []string{value}, + } + } + + host := params.requestHeaders["host"] + url := params.path + checkDebugUrlAndPrint(url, host, fmt.Sprintf("%s URL,host found", params.protocolType)) + + payload := &trafficpb.HttpResponseParam{ + Method: params.method, + Path: params.path, + RequestHeaders: reqHeader, + ResponseHeaders: respHeader, + RequestPayload: params.requestPayload, + ResponsePayload: params.responsePayload, + Ip: ip, + Time: int32(time.Now().Unix()), + StatusCode: int32(params.statusCode), + Type: params.protocolType, + Status: params.status, + AktoAccountId: fmt.Sprint(1000000), + AktoVxlanId: fmt.Sprint(params.vxlanID), + IsPending: params.isPending, + Source: params.trafficSource, + } + + // Build JSON value map + reqHeaderString, _ := json.Marshal(params.requestHeaders) + respHeaderString, _ := json.Marshal(params.responseHeaders) + + // TODO: remove and use protobuf instead + value := map[string]string{ + "path": params.path, + "requestHeaders": 
string(reqHeaderString), + "responseHeaders": string(respHeaderString), + "method": params.method, + "requestPayload": params.requestPayload, + "responsePayload": params.responsePayload, + "ip": params.sourceIp, + "destIp": params.destIp, + "time": fmt.Sprint(time.Now().Unix()), + "statusCode": fmt.Sprint(params.statusCode), + "type": params.protocolType, + "status": params.status, + "akto_account_id": fmt.Sprint(1000000), + "akto_vxlan_id": fmt.Sprint(params.vxlanID), + "is_pending": fmt.Sprint(params.isPending), + "source": params.trafficSource, + "direction": fmt.Sprint(params.direction), + "process_id": fmt.Sprint(params.idfd >> 32), + "socket_id": fmt.Sprint(params.fd), + "daemonset_id": params.daemonsetID, + "enable_graph": fmt.Sprint(utils.EnableGraph), + } + + var pid = params.idfd >> 32 + log := fmt.Sprintf("%s pod direction log: direction=%v, host=%v, path=%v, sourceIp=%v, destIp=%v, socketId=%v, processId=%v, hostName=%v", + params.protocolType, params.direction, host, params.path, params.sourceIp, params.destIp, params.fd, pid, params.hostName) + checkDebugUrlAndPrint(url, host, log) + + // Resolve pod labels if applicable + if PodInformerInstance != nil && params.direction == utils.DirectionInbound { + if params.hostName == "" { + checkDebugUrlAndPrint(url, host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid)) + slog.Error("Failed to resolve pod name, hostName is empty for ", "processId", pid, "hostName", params.hostName) + } else { + podLabels, err := PodInformerInstance.ResolvePodLabels(params.hostName, url, host) + if err != nil { + slog.Error("Failed to resolve pod labels", "hostName", params.hostName, "error", err) + checkDebugUrlAndPrint(url, host, "Error resolving pod labels "+params.hostName) + } else { + value["tag"] = podLabels + checkDebugUrlAndPrint(url, host, fmt.Sprintf("Pod labels found in %s, podLabels found %v for hostName %s", params.protocolType, podLabels, params.hostName)) + slog.Debug("Pod labels", 
"podName", params.hostName, "labels", podLabels) } } + } else { + checkDebugUrlAndPrint(url, host, fmt.Sprintf("Pod labels not resolved, PodInformerInstance is nil or direction is not inbound, direction: %d", params.direction)) + } - // TODO: remove and use protobuf instead - respHeaderStr := make(map[string]string) - for name, values := range resp.Header { - // Loop over all values for the name. - for _, value := range values { - respHeaderStr[name] = value + out, _ := json.Marshal(value) + ctx := context.Background() + + outgoingBytes := len(out) + if checkAndUpdateBandwidthProcessed(outgoingBytes) { + return + } + + hostString := host + if utils.CheckIfIpHost(hostString) { + hostString = "ip-host" + } + oc := utils.GenerateOutgoingCounter(params.vxlanID, params.sourceIp, hostString) + trafficMetrics.SubmitOutgoingTrafficMetrics(oc, outgoingBytes) + + if apiProcessor.CloudProcessorInstance != nil { + apiProcessor.CloudProcessorInstance.Produce(value) + } else { + go ProduceStr(ctx, string(out), url, host) + go Produce(ctx, payload) + } +} + +func ParseHTTP2AndProduce(receiveBuffer []byte, sentBuffer []byte, sourceIp string, destIp string, vxlanID int, isPending bool, + trafficSource string, isComplete bool, direction int, idfd uint64, fd uint32, daemonsetIdentifier string, hostName string) { + + if checkAndUpdateBandwidthProcessed(0) { + return + } + + shouldPrint := debugMode && strings.Contains(string(receiveBuffer), "x-debug-token") + if shouldPrint { + slog.Debug("ParseHTTP2AndProduce", "receiveBufferLen", len(receiveBuffer), "sentBufferLen", len(sentBuffer)) + } + + http2Preface := []byte("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") + if len(receiveBuffer) >= len(http2Preface) && bytes.Equal(receiveBuffer[:len(http2Preface)], http2Preface) { + receiveBuffer = receiveBuffer[len(http2Preface):] + slog.Debug("Skipped HTTP/2 connection preface") + } + if len(sentBuffer) >= len(http2Preface) && bytes.Equal(sentBuffer[:len(http2Preface)], sentBuffer) { + sentBuffer = 
sentBuffer[len(http2Preface):] + } + + streams := make(map[uint32]*http2Stream) + + parseHTTP2Frames(receiveBuffer, streams, true, shouldPrint) + parseHTTP2Frames(sentBuffer, streams, false, shouldPrint) + + // Process complete request/response pairs + for streamID, stream := range streams { + if !stream.requestComplete || !stream.responseComplete { + if shouldPrint { + slog.Debug("Incomplete stream", "streamID", streamID, "requestComplete", stream.requestComplete, "responseComplete", stream.responseComplete) } + // Skip incomplete streams + continue } - url := req.URL.String() - checkDebugUrlAndPrint(url, req.Host, "URL,host found in ParseAndProduce") - - // build kafka payload for threat client - payload := &trafficpb.HttpResponseParam{ - Method: req.Method, - Path: req.URL.String(), - RequestHeaders: reqHeader, - ResponseHeaders: respHeader, - RequestPayload: requestsContent[i], - ResponsePayload: responsesContent[i], - Ip: ip, - Time: int32(time.Now().Unix()), - StatusCode: int32(resp.StatusCode), - Type: string(req.Proto), - Status: resp.Status, - AktoAccountId: fmt.Sprint(1000000), - AktoVxlanId: fmt.Sprint(vxlanID), - IsPending: isPending, - Source: trafficSource, - } - - reqHeaderString, _ := json.Marshal(reqHeaderStr) - respHeaderString, _ := json.Marshal(respHeaderStr) - - // TODO: remove and use protobuf instead - value := map[string]string{ - "path": req.URL.String(), - "requestHeaders": string(reqHeaderString), - "responseHeaders": string(respHeaderString), - "method": req.Method, - "requestPayload": requestsContent[i], - "responsePayload": responsesContent[i], - "ip": sourceIp, - "destIp": destIp, - "time": fmt.Sprint(time.Now().Unix()), - "statusCode": fmt.Sprint(resp.StatusCode), - "type": string(req.Proto), - "status": resp.Status, - "akto_account_id": fmt.Sprint(1000000), - "akto_vxlan_id": fmt.Sprint(vxlanID), - "is_pending": fmt.Sprint(isPending), - "source": trafficSource, - "direction": fmt.Sprint(direction), - "process_id": fmt.Sprint(idfd >> 
32), - "socket_id": fmt.Sprint(fd), - "daemonset_id": fmt.Sprint(daemonsetIdentifier), - "enable_graph": fmt.Sprint(utils.EnableGraph), - } - - // Process id was captured from the eBPF program using bpf_get_current_pid_tgid() - // Shifting by 32 gives us the process id on host machine. - var pid = idfd >> 32 - log := fmt.Sprintf("pod direction log: direction=%v, host=%v, path=%v, sourceIp=%v, destIp=%v, socketId=%v, processId=%v, hostName=%v", - direction, - reqHeaderStr["host"], - value["path"], - sourceIp, - destIp, - value["socket_id"], - pid, - hostName, - ) - checkDebugUrlAndPrint(url, req.Host, log) - - if PodInformerInstance != nil && direction == utils.DirectionInbound { - - if hostName == "" { - checkDebugUrlAndPrint(url, req.Host, "Failed to resolve pod name, hostName is empty for processId "+fmt.Sprint(pid)) - slog.Error("Failed to resolve pod name, hostName is empty for ", "processId", pid, "hostName", hostName) - } else { - podLabels, err := PodInformerInstance.ResolvePodLabels(hostName, url, req.Host) - if err != nil { - slog.Error("Failed to resolve pod labels", "hostName", hostName, "error", err) - checkDebugUrlAndPrint(url, req.Host, "Error resolving pod labels "+hostName) - } else { - value["tag"] = podLabels - checkDebugUrlAndPrint(url, req.Host, "Pod labels found in ParseAndProduce, podLabels found "+fmt.Sprint(podLabels)+" for hostName "+hostName) - slog.Debug("Pod labels", "podName", hostName, "labels", podLabels) + // Extract host for filtering + host := stream.requestHeaders[":authority"] + if host == "" { + host = stream.requestHeaders["host"] + } + + reqHeaderStr := make(map[string]string) + for name, value := range stream.requestHeaders { + reqHeaderStr[name] = value + } + if host != "" { + reqHeaderStr["host"] = host + } + + // Apply common filters + if !applyFiltersAndChecks(reqHeaderStr, host, sourceIp, direction) { + continue + } + + respHeaderStr := make(map[string]string) + for name, value := range stream.responseHeaders { + 
respHeaderStr[name] = value + } + + // Use common production logic + params := trafficParams{ + method: stream.method, + path: stream.path, + requestHeaders: reqHeaderStr, + responseHeaders: respHeaderStr, + requestPayload: string(stream.requestBody), + responsePayload: string(stream.responseBody), + statusCode: stream.statusCode, + status: stream.status, + protocolType: "HTTP/2.0", + sourceIp: sourceIp, + destIp: destIp, + vxlanID: vxlanID, + isPending: isPending, + trafficSource: trafficSource, + direction: direction, + idfd: idfd, + fd: fd, + daemonsetID: daemonsetIdentifier, + hostName: hostName, + } + + produceTrafficData(params) + } +} + +func parseHTTP2Frames(buffer []byte, streams map[uint32]*http2Stream, isRequest bool, shouldPrint bool) { + framer := http2.NewFramer(nil, bytes.NewReader(buffer)) + framer.SetMaxReadFrameSize(1 << 20) // 1MB max frame size + + decoder := hpack.NewDecoder(4096, nil) + + for { + frame, err := framer.ReadFrame() + if err != nil { + if err != io.EOF { + if shouldPrint { + slog.Debug("Error reading HTTP/2 frame", "error", err, "isRequest", isRequest) } } - } else { - checkDebugUrlAndPrint(url, req.Host, "Pod labels not resolved, PodInformerInstance is nil or direction is not inbound, direction: "+fmt.Sprint(direction)) + break } - out, _ := json.Marshal(value) - ctx := context.Background() + streamID := frame.Header().StreamID - // calculating the size of outgoing bytes and requests (1) and saving it in outgoingCounterMap - // this number is the closest (slightly higher) to the actual connection transfer bytes. 
- outgoingBytes := len(out) - - if checkAndUpdateBandwidthProcessed(outgoingBytes) { - return + // Skip stream 0 (connection-level frames like SETTINGS, WINDOW_UPDATE, PING) + if streamID == 0 { + continue } - hostString := reqHeaderStr["host"] - if utils.CheckIfIpHost(hostString) { - hostString = "ip-host" + // Get or create stream + stream, exists := streams[streamID] + if !exists { + stream = &http2Stream{ + streamID: streamID, + requestHeaders: make(map[string]string), + responseHeaders: make(map[string]string), + } + streams[streamID] = stream } - oc := utils.GenerateOutgoingCounter(vxlanID, sourceIp, hostString) - trafficMetrics.SubmitOutgoingTrafficMetrics(oc, outgoingBytes) - if shouldPrint { - if strings.Contains(responsesContent[i], id) { - goodRequests++ - } else { - slog.Debug("req-resp.String()", "out", string(out)) - badRequests++ + switch f := frame.(type) { + case *http2.HeadersFrame: + headerBlock := f.HeaderBlockFragment() + headers, err := decoder.DecodeFull(headerBlock) + if err != nil { + slog.Error("Failed to decode HPACK headers", "error", err, "streamID", streamID) + continue } - if goodRequests%100 == 0 || badRequests%100 == 0 { - slog.Debug("Good requests", "count", goodRequests, "badRequests", badRequests) + if isRequest { + for _, hf := range headers { + stream.requestHeaders[hf.Name] = hf.Value + // Extract pseudo-headers + if hf.Name == ":method" { + stream.method = hf.Value + } else if hf.Name == ":path" { + stream.path = hf.Value + } + } + if f.StreamEnded() { + stream.requestComplete = true + } + } else { + for _, hf := range headers { + stream.responseHeaders[hf.Name] = hf.Value + // Extract status code + if hf.Name == ":status" { + stream.status = hf.Value + fmt.Sscanf(hf.Value, "%d", &stream.statusCode) + } + } + if f.StreamEnded() { + stream.responseComplete = true + } } - } - if apiProcessor.CloudProcessorInstance != nil { - apiProcessor.CloudProcessorInstance.Produce(value) + case *http2.DataFrame: + data := f.Data() + if 
isRequest { + stream.requestBody = append(stream.requestBody, data...) + if f.StreamEnded() { + stream.requestComplete = true + } + } else { + stream.responseBody = append(stream.responseBody, data...) + if f.StreamEnded() { + stream.responseComplete = true + } + } - } else { - // Produce to kafka - // TODO : remove and use protobuf instead - go ProduceStr(ctx, string(out), url, req.Host) - go Produce(ctx, payload) + case *http2.RSTStreamFrame: + // Stream was reset + if shouldPrint { + slog.Debug("Stream reset", "streamID", streamID, "errorCode", f.ErrCode) + } } - - i++ } }