Skip to content
2 changes: 1 addition & 1 deletion ebpf/bpfwrapper/eventCallbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func SocketDataEventCallback(inputChan chan []byte, connectionFactory *connectio
bytesSent := event.Attr.Bytes_sent

// The 4 bytes are being lost in padding, thus, not taking them into consideration.
eventAttributesLogicalSize := 45
eventAttributesLogicalSize := 53

if len(data) > eventAttributesLogicalSize {
copy(event.Msg[:], data[eventAttributesLogicalSize:eventAttributesLogicalSize+int(utils.Abs(bytesSent))])
Expand Down
16 changes: 12 additions & 4 deletions ebpf/connections/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ var (
trackerDataProcessInterval = 100
)

const (
protocolUnknown = "UNKN"
protocolhttp1 = "HTTP1"
protocolhttp2 = "HTTP2"
)

func init() {
utils.InitVar("TRAFFIC_DISABLE_EGRESS", &disableEgress)
utils.InitVar("TRAFFIC_MAX_ACTIVE_CONN", &maxActiveConnections)
Expand Down Expand Up @@ -120,13 +126,15 @@ func ProcessTrackerData(connID structs.ConnID, tracker *Tracker, isComplete bool
hostName = kafkaUtil.PodInformerInstance.GetPodNameByProcessId(int32(connID.Id >> 32))
}

if len(sentBuffer) >= len(httpBytes) && (bytes.Equal(sentBuffer[:len(httpBytes)], httpBytes)) {
tryReadFromBD(destIpStr, srcIpStr, receiveBuffer, sentBuffer, isComplete, 1, connID.Id, connID.Fd, uniqueDaemonsetId, hostName)
protocol := tracker.protocol

if (len(sentBuffer) >= len(httpBytes) && (bytes.Equal(sentBuffer[:len(httpBytes)], httpBytes))) || protocol == protocolhttp2 {
tryReadFromBD(destIpStr, srcIpStr, receiveBuffer, sentBuffer, isComplete, 1, connID.Id, connID.Fd, uniqueDaemonsetId, hostName, protocol)
}
if !disableEgress {
// attempt to parse the egress as well by switching the recv and sent buffers.
if len(receiveBuffer) >= len(httpBytes) && (bytes.Equal(receiveBuffer[:len(httpBytes)], httpBytes)) {
tryReadFromBD(srcIpStr, destIpStr, sentBuffer, receiveBuffer, isComplete, 2, connID.Id, connID.Fd, uniqueDaemonsetId, hostName)
if (len(receiveBuffer) >= len(httpBytes) && (bytes.Equal(receiveBuffer[:len(httpBytes)], httpBytes))) || protocol == protocolhttp2 {
tryReadFromBD(srcIpStr, destIpStr, sentBuffer, receiveBuffer, isComplete, 2, connID.Id, connID.Fd, uniqueDaemonsetId, hostName, protocol)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions ebpf/connections/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import (
"github.com/akto-api-security/mirroring-api-logging/trafficUtil/kafkaUtil"
)

func tryReadFromBD(ip string, destIp string, receiveBuffer []byte, sentBuffer []byte, isComplete bool, direction int, id uint64, fd uint32, daemonsetIdentifier, hostName string) {
kafkaUtil.ParseAndProduce(receiveBuffer, sentBuffer, ip, destIp, 0, false, "MIRRORING", isComplete, direction, id, fd, daemonsetIdentifier, hostName)
func tryReadFromBD(ip string, destIp string, receiveBuffer []byte, sentBuffer []byte, isComplete bool, direction int, id uint64, fd uint32, daemonsetIdentifier, hostName string, protocol string) {
kafkaUtil.ParseAndProduce(receiveBuffer, sentBuffer, ip, destIp, 0, false, "MIRRORING", isComplete, direction, id, fd, daemonsetIdentifier, hostName, protocol)
}
25 changes: 25 additions & 0 deletions ebpf/connections/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Tracker struct {
srcPort uint16

foundHTTP bool
protocol string // "http1", "http2", or "unknown"
}

func NewTracker(connID structs.ConnID) *Tracker {
Expand All @@ -40,6 +41,7 @@ func NewTracker(connID structs.ConnID) *Tracker {
mutex: sync.RWMutex{},
ssl: false,
foundHTTP: false,
protocol: protocolUnknown,
}
}

Expand All @@ -58,6 +60,29 @@ func (conn *Tracker) AddDataEvent(event structs.SocketDataEvent) {
conn.mutex.Lock()
defer conn.mutex.Unlock()

// Update protocol from eBPF if it has changed from UNKN
protocolBytes := event.Attr.Protocol[:]
nullIndex := -1
for i, b := range protocolBytes {
if b == 0 {
nullIndex = i
break
}
}
if nullIndex > 0 {
protocolStr := string(protocolBytes[:nullIndex])
if protocolStr != protocolUnknown && conn.protocol != protocolStr {
switch protocolStr {
case protocolhttp1:
conn.protocol = protocolhttp1
case protocolhttp2:
conn.protocol = protocolhttp2
default:
conn.protocol = protocolUnknown
}
}
}

if !conn.ssl && event.Attr.Ssl {
for k := range conn.sentBuf {
conn.sentBuf[k] = []byte{}
Expand Down
51 changes: 48 additions & 3 deletions ebpf/kernel/module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct conn_info_t {
bool ssl;
u32 readEventsCount;
u32 writeEventsCount;
char protocol[8];
};

union sockaddr_t {
Expand Down Expand Up @@ -87,6 +88,7 @@ struct socket_open_event_t {
u32 src_ip;
unsigned short src_port;
u64 socket_open_ns;
char protocol[8]; // Protocol detected from payload: "HTTP1", "HTTP2", "UNKN"
};

struct socket_close_event_t {
Expand All @@ -108,6 +110,7 @@ struct socket_data_event_t {
u32 readEventsCount;
u32 writeEventsCount;
bool ssl;
char protocol[8]; // Protocol detected: "HTTP1", "HTTP2", "UNKN"
char msg[MAX_MSG_SIZE];
};

Expand Down Expand Up @@ -233,6 +236,9 @@ static __inline void process_syscall_accept(struct pt_regs* ret, const struct ac
conn_info.readEventsCount = 0;
conn_info.writeEventsCount = 0;

// Initialize protocol as unknown - will be detected from first data packet
__builtin_memcpy(conn_info.protocol, "UNKN", 5);

u32 tgid = id >> 32;
u64 tgid_fd = 0;
if(isConnect){
Expand Down Expand Up @@ -280,6 +286,7 @@ static __inline void process_syscall_accept(struct pt_regs* ret, const struct ac
socket_open_event.ip = conn_info.ip;
socket_open_event.src_ip = srcIp;
socket_open_event.src_port = lport;
__builtin_memcpy(socket_open_event.protocol, conn_info.protocol, 8);

if (PRINT_BPF_LOGS){
bpf_trace_printk("accept call: %llu %d %d", socket_open_event.id, socket_open_event.fd, isConnect);
Expand Down Expand Up @@ -318,7 +325,42 @@ static __inline void process_syscall_close(struct pt_regs* ret, const struct clo

socket_close_event.socket_close_ns = bpf_ktime_get_ns();
socket_close_events.perf_submit(ret, &socket_close_event, sizeof(struct socket_close_event_t));
conn_info_map.delete(&tgid_fd);
conn_info_map.delete(&tgid_fd);
}

static __inline void detect_protocol_from_data(struct conn_info_t *conn_info, const char *buf, size_t count) {
// Only detect if protocol is still unknown
if (conn_info->protocol[0] != 'U') {
return;
}

if (count < 6) {
return;
}

// HTTP/2 connection preface: "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
if (buf[0] == 'P' && buf[1] == 'R' && buf[2] == 'I' && buf[3] == ' ' && buf[4] == '*') {
__builtin_memcpy(conn_info->protocol, "HTTP2", 6);
return;
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comment for a reference of the magic sequence.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, just write, http verbs

// HTTP/1.x request methods (verbs): GET, POST, PUT, DELETE, HEAD, PATCH
// GET /path HTTP/1.1
if ((buf[0] == 'G' && buf[1] == 'E' && buf[2] == 'T' && buf[3] == ' ') ||
(buf[0] == 'P' && buf[1] == 'O' && buf[2] == 'S' && buf[3] == 'T') ||
(buf[0] == 'P' && buf[1] == 'U' && buf[2] == 'T' && buf[3] == ' ') ||
(buf[0] == 'D' && buf[1] == 'E' && buf[2] == 'L' && buf[3] == 'E') ||
(buf[0] == 'H' && buf[1] == 'E' && buf[2] == 'A' && buf[3] == 'D') ||
(buf[0] == 'P' && buf[1] == 'A' && buf[2] == 'T' && buf[3] == 'C')) {
__builtin_memcpy(conn_info->protocol, "HTTP1", 6);
return;
}

// HTTP/1.x response: "HTTP/1.0 200 OK" or "HTTP/1.1 200 OK"
if (buf[0] == 'H' && buf[1] == 'T' && buf[2] == 'T' && buf[3] == 'P' && buf[4] == '/') {
__builtin_memcpy(conn_info->protocol, "HTTP1", 6);
return;
}
}

static __inline void process_syscall_data(struct pt_regs* ret, const struct data_args_t* args, u64 id, bool is_send, bool ssl) {
Expand Down Expand Up @@ -370,8 +412,9 @@ static __inline void process_syscall_data(struct pt_regs* ret, const struct data
socket_data_event->fd = conn_info->fd;
socket_data_event->conn_start_ns = conn_info->conn_start_ns;
socket_data_event->port = conn_info->port;
socket_data_event->ip = conn_info->ip;
socket_data_event->ip = conn_info->ip;
socket_data_event->ssl = conn_info->ssl;
__builtin_memcpy(socket_data_event->protocol, conn_info->protocol, 8);

int bytes_sent = 0;
size_t size_to_save = 0;
Expand Down Expand Up @@ -401,6 +444,9 @@ static __inline void process_syscall_data(struct pt_regs* ret, const struct data
size_to_save = MAX_MSG_SIZE;
}

// Detect protocol from first packet payload
detect_protocol_from_data(conn_info, socket_data_event->msg, size_to_save);
Comment thread
notshivansh marked this conversation as resolved.

if (is_send){
conn_info->writeEventsCount = (conn_info->writeEventsCount) + 1u;
} else {
Expand Down Expand Up @@ -446,7 +492,6 @@ static __inline void process_syscall_data_vecs(struct pt_regs* ret, struct data_
}
}

// Hooks
int syscall__probe_entry_accept(struct pt_regs* ctx, int sockfd, struct sockaddr* addr, socklen_t* addrlen) {
u64 id = bpf_get_current_pid_tgid();

Expand Down
2 changes: 2 additions & 0 deletions ebpf/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type SocketDataEventAttr struct {
ReadEventsCount uint32
WriteEventsCount uint32
Ssl bool
Protocol [8]byte
}

/*
Expand Down Expand Up @@ -46,6 +47,7 @@ type SocketOpenEvent struct {
SrcPort uint16
Padding [2]byte
Socket_open_ns uint64
Protocol [8]byte
}

type SocketCloseEvent struct {
Expand Down
Loading