Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 0 additions & 37 deletions common.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,2 @@
package main

import (
"log/slog"
"os"
"strconv"
"strings"
"time"
)

func InitVar(envVarName string, targetVar interface{}) {
envVar := os.Getenv(envVarName)
if len(envVar) > 0 {
switch v := targetVar.(type) {
case *bool:
*v = strings.ToLower(envVar) == "true"
slog.Warn("Setting env value", "name", envVarName, "value", *v)
case *string:
*v = envVar
slog.Warn("Setting env value", "name", envVarName, "value", *v)
case *time.Duration:
temp, err := time.ParseDuration(envVar + "s")
if err == nil {
*v = temp
slog.Warn("Setting env value", "name", envVarName, "value", *v)
}
case *int:
temp, err := strconv.Atoi(envVar)
if err == nil {
*v = temp
slog.Warn("Setting env value", "name", envVarName, "value", *v)
}
default:
slog.Warn("Unsupported type for targetVar", "type", v)
}
} else {
slog.Warn("Missing env value, using default value", "name", envVarName)
}
}
19 changes: 12 additions & 7 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"log/slog"
"os"
"strings"
"time"

trafficpb "github.com/akto-api-security/mirroring-api-logging/protobuf/traffic_payload"
"github.com/akto-api-security/mirroring-api-logging/utils"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
"google.golang.org/protobuf/proto"
Expand All @@ -36,14 +38,14 @@ var kafkaPassword = ""

func init() {

InitVar("USE_TLS", &useTLS)
InitVar("INSECURE_SKIP_VERIFY", &InsecureSkipVerify)
InitVar("TLS_CA_CERT_PATH", &tlsCACertPath)
utils.InitVar("USE_TLS", &useTLS)
utils.InitVar("INSECURE_SKIP_VERIFY", &InsecureSkipVerify)
utils.InitVar("TLS_CA_CERT_PATH", &tlsCACertPath)

// Initialize SASL authentication variables
InitVar("IS_AUTH_IMPLEMENTED", &isAuthImplemented)
InitVar("KAFKA_USERNAME", &kafkaUsername)
InitVar("KAFKA_PASSWORD", &kafkaPassword)
utils.InitVar("IS_AUTH_IMPLEMENTED", &isAuthImplemented)
utils.InitVar("KAFKA_USERNAME", &kafkaUsername)
utils.InitVar("KAFKA_PASSWORD", &kafkaPassword)

}

Expand Down Expand Up @@ -95,9 +97,10 @@ func GetSourceIp(reqHeaders map[string]*trafficpb.StringList, packetIp string) s
return packetIp
}

func ProduceStr(kafkaWriter *kafka.Writer, ctx context.Context, message string) error {
func ProduceStr(kafkaWriter *kafka.Writer, ctx context.Context, message string, url, reqHost string) error {
// intialize the writer with the broker addresses, and the topic
topic := "akto.api.logs"
utils.CheckDebugUrlAndPrint(url, reqHost, "begin kafka write to akto.api.logs topic")
msg := kafka.Message{
Topic: topic,
Value: []byte(message),
Expand All @@ -106,9 +109,11 @@ func ProduceStr(kafkaWriter *kafka.Writer, ctx context.Context, message string)

if err != nil {
slog.Error("Kafka write for runtime failed", "topic", topic, "error", err)
utils.CheckDebugUrlAndPrint(url, reqHost, fmt.Sprintf("Kafka write failed: %v", err))
return err
}

utils.CheckDebugUrlAndPrint(url, reqHost, "Kafka write successful: ")
return nil

}
Expand Down
15 changes: 12 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) {
printCounter--
log.Println("req-resp.String()", string(out))
}
go ProduceStr(kafkaWriter, ctx, string(out))
go ProduceStr(kafkaWriter, ctx, string(out), value["path"], http2Request.headersMap[":authority"])
}

}
Expand Down Expand Up @@ -437,10 +437,12 @@ func tryReadFromBD(bd *bidi, isPending bool) {

requests = append(requests, *req)
requestsContent = append(requestsContent, string(body))
utils.CheckDebugUrlAndPrint(req.URL.String(), req.Host, "request parsed in tryReadFromBD")
i++
}

if len(requests) == 0 {
utils.CheckDebugUrlAndPrint("", "", "no requests parsed, returning early")
return
}

Expand Down Expand Up @@ -490,6 +492,9 @@ func tryReadFromBD(bd *bidi, isPending bool) {
}

if len(requests) != len(responses) {
if len(requests) > 0 {
utils.CheckDebugUrlAndPrint(requests[0].URL.String(), requests[0].Host, "req/resp count mismatch, dropping all")
}
return
}

Expand All @@ -502,6 +507,8 @@ func tryReadFromBD(bd *bidi, isPending bool) {
req := &requests[i]
resp := &responses[i]

utils.CheckDebugUrlAndPrint(req.URL.String(), req.Host, "URL,host found in ParseAndProduce")

// build req headers for threat client
reqHeader := make(map[string]*trafficpb.StringList)
for name, values := range req.Header {
Expand Down Expand Up @@ -533,6 +540,7 @@ func tryReadFromBD(bd *bidi, isPending bool) {
//printLog(fmt.Sprintf("passes %t", passes))

if !passes {
utils.CheckDebugUrlAndPrint(req.URL.String(), req.Host, "request dropped by filter in tryReadFromBD")
i++
continue
}
Expand Down Expand Up @@ -638,7 +646,8 @@ func tryReadFromBD(bd *bidi, isPending bool) {
}

// Todo convert to protobuf
go ProduceStr(kafkaWriter, ctx, string(out))
utils.CheckDebugUrlAndPrint(req.URL.String(), req.Host, "producing to kafka in tryReadFromBD")
go ProduceStr(kafkaWriter, ctx, string(out), req.URL.String(), req.Host)
i++
}
}
Expand Down Expand Up @@ -927,7 +936,7 @@ func initKafka() {

ctx := context.Background()
out, _ := json.Marshal(value)
err := ProduceStr(kafkaWriter, ctx, string(out))
err := ProduceStr(kafkaWriter, ctx, string(out), "", "")
err = Produce(kafkaWriter, ctx, payload)
log.Println("logging kafka stats post pushing message")
logKafkaStats()
Expand Down
124 changes: 123 additions & 1 deletion utils/util_functions.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,129 @@
package utils

import "strings"
import (
"bufio"
"fmt"
"log/slog"
"os"
"strconv"
"strings"
"time"
)

var DebugStrings = []string{}

func init() {
debugStringsEnv := ""
InitVar("DEBUG_URLS", &debugStringsEnv)
if len(debugStringsEnv) > 0 {
DebugStrings = strings.Split(debugStringsEnv, ",")
}
slog.Info("debugStrings", "DebugStrings", DebugStrings)

go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
UpdateDebugStringsFromFile()
<-ticker.C
}
}()
}

func InitVar(envVarName string, targetVar interface{}) {
envVar := os.Getenv(envVarName)
if len(envVar) > 0 {
switch v := targetVar.(type) {
case *bool:
*v = strings.ToLower(envVar) == "true"
slog.Warn("Setting env value", "name", envVarName, "value", *v)
case *string:
*v = envVar
slog.Warn("Setting env value", "name", envVarName, "value", *v)
case *time.Duration:
temp, err := time.ParseDuration(envVar + "s")
if err == nil {
*v = temp
slog.Warn("Setting env value", "name", envVarName, "value", *v)
}
case *int:
temp, err := strconv.Atoi(envVar)
if err == nil {
*v = temp
slog.Warn("Setting env value", "name", envVarName, "value", *v)
}
default:
slog.Warn("Unsupported type for targetVar", "type", v)
}
} else {
slog.Warn("Missing env value, using default value", "name", envVarName)
}
}

// Reads /ebpf/debug-urls.txt and updates DebugStrings with any new URLs found in the file (one per line)
func UpdateDebugStringsFromFile() {
filePath := "/app/debug-urls.txt"
f, err := os.Open(filePath)
if err != nil {
// File may not exist, that's fine
return
}
defer f.Close()

scanner := bufio.NewScanner(f)
fileUrls := []string{}
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line != "" {
fileUrls = append(fileUrls, line)
}
}
if err := scanner.Err(); err != nil {
return
}

if len(fileUrls) > 0 {
// Merge with env DebugStrings, avoid duplicates
urlSet := make(map[string]struct{})
for _, u := range DebugStrings {
urlSet[u] = struct{}{}
}
newUrls := []string{}
for _, u := range fileUrls {
if _, exists := urlSet[u]; !exists {
newUrls = append(newUrls, u)
}
urlSet[u] = struct{}{}
}
if len(newUrls) > 0 {
merged := make([]string, 0, len(urlSet))
for u := range urlSet {
merged = append(merged, u)
}
DebugStrings = merged
slog.Info("New debugStrings found in file", "newUrls", newUrls, "DebugStrings", DebugStrings)
}
}
}

func CheckIfIpHost(host string) bool {
return strings.ToLower(host) == strings.ToUpper(host)
}


func CheckDebugUrlAndPrint(url string, host string, message string) {
// url or host. [array string]
if len(DebugStrings) > 0 {
for _, debugString := range DebugStrings {
if strings.Contains(url, debugString) {
logMsg := fmt.Sprintf("url: %s, host: %s, message: %s", url, host, message)
slog.Warn(logMsg)
break
} else if strings.Contains(host, debugString) {
logMsg := fmt.Sprintf("url: %s, host: %s, message: %s", url, host, message)
slog.Warn(logMsg)
break
}
}
}
}
Loading