diff --git a/common.go b/common.go index 49cebdbe..c9ecbf5e 100644 --- a/common.go +++ b/common.go @@ -1,39 +1,2 @@ package main -import ( - "log/slog" - "os" - "strconv" - "strings" - "time" -) - -func InitVar(envVarName string, targetVar interface{}) { - envVar := os.Getenv(envVarName) - if len(envVar) > 0 { - switch v := targetVar.(type) { - case *bool: - *v = strings.ToLower(envVar) == "true" - slog.Warn("Setting env value", "name", envVarName, "value", *v) - case *string: - *v = envVar - slog.Warn("Setting env value", "name", envVarName, "value", *v) - case *time.Duration: - temp, err := time.ParseDuration(envVar + "s") - if err == nil { - *v = temp - slog.Warn("Setting env value", "name", envVarName, "value", *v) - } - case *int: - temp, err := strconv.Atoi(envVar) - if err == nil { - *v = temp - slog.Warn("Setting env value", "name", envVarName, "value", *v) - } - default: - slog.Warn("Unsupported type for targetVar", "type", v) - } - } else { - slog.Warn("Missing env value, using default value", "name", envVarName) - } -} diff --git a/kafka.go b/kafka.go index 20fb744b..ed2eeb64 100644 --- a/kafka.go +++ b/kafka.go @@ -5,12 +5,14 @@ import ( "crypto/tls" "crypto/x509" "encoding/json" + "fmt" "log/slog" "os" "strings" "time" trafficpb "github.com/akto-api-security/mirroring-api-logging/protobuf/traffic_payload" + "github.com/akto-api-security/mirroring-api-logging/utils" "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl/plain" "google.golang.org/protobuf/proto" @@ -36,14 +38,14 @@ var kafkaPassword = "" func init() { - InitVar("USE_TLS", &useTLS) - InitVar("INSECURE_SKIP_VERIFY", &InsecureSkipVerify) - InitVar("TLS_CA_CERT_PATH", &tlsCACertPath) + utils.InitVar("USE_TLS", &useTLS) + utils.InitVar("INSECURE_SKIP_VERIFY", &InsecureSkipVerify) + utils.InitVar("TLS_CA_CERT_PATH", &tlsCACertPath) // Initialize SASL authentication variables - InitVar("IS_AUTH_IMPLEMENTED", &isAuthImplemented) - InitVar("KAFKA_USERNAME", &kafkaUsername) - InitVar("KAFKA_PASSWORD", &kafkaPassword) + utils.InitVar("IS_AUTH_IMPLEMENTED", &isAuthImplemented) + utils.InitVar("KAFKA_USERNAME", &kafkaUsername) + utils.InitVar("KAFKA_PASSWORD", &kafkaPassword) } @@ -95,9 +97,10 @@ func GetSourceIp(reqHeaders map[string]*trafficpb.StringList, packetIp string) s return packetIp } -func ProduceStr(kafkaWriter *kafka.Writer, ctx context.Context, message string) error { +func ProduceStr(kafkaWriter *kafka.Writer, ctx context.Context, message string, url, reqHost string) error { // intialize the writer with the broker addresses, and the topic topic := "akto.api.logs" + utils.CheckDebugUrlAndPrint(url, reqHost, "begin kafka write to akto.api.logs topic") msg := kafka.Message{ Topic: topic, Value: []byte(message), @@ -106,9 +109,11 @@ func ProduceStr(kafkaWriter *kafka.Writer, ctx context.Context, message string) if err != nil { slog.Error("Kafka write for runtime failed", "topic", topic, "error", err) + utils.CheckDebugUrlAndPrint(url, reqHost, fmt.Sprintf("Kafka write failed: %v", err)) return err } + utils.CheckDebugUrlAndPrint(url, reqHost, "Kafka write successful: ") return nil } diff --git a/main.go b/main.go index 87ad8bd7..4b7dd9c5 100644 --- a/main.go +++ b/main.go @@ -401,7 +401,7 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { printCounter-- log.Println("req-resp.String()", string(out)) } - go ProduceStr(kafkaWriter, ctx, string(out)) + go ProduceStr(kafkaWriter, ctx, string(out), value["path"], http2Request.headersMap[":authority"]) } } @@ -437,10 +437,12 @@ func tryReadFromBD(bd *bidi, isPending bool) { requests = append(requests, *req) requestsContent = append(requestsContent, string(body)) + utils.CheckDebugUrlAndPrint(req.URL.String(), req.Host, "request parsed in tryReadFromBD") i++ } if len(requests) == 0 { + utils.CheckDebugUrlAndPrint("", "", "no requests parsed, returning early") return } @@ -490,6 +492,9 @@ func tryReadFromBD(bd *bidi, isPending bool) { } if len(requests) != len(responses) { + if len(requests) > 0 { + utils.CheckDebugUrlAndPrint(requests[0].URL.String(), requests[0].Host, "req/resp count mismatch, dropping all") + } return } @@ -502,6 +507,8 @@ func tryReadFromBD(bd *bidi, isPending bool) { req := &requests[i] resp := &responses[i] + utils.CheckDebugUrlAndPrint(req.URL.String(), req.Host, "URL,host found in ParseAndProduce") + // build req headers for threat client reqHeader := make(map[string]*trafficpb.StringList) for name, values := range req.Header { @@ -533,6 +540,7 @@ func tryReadFromBD(bd *bidi, isPending bool) { //printLog(fmt.Sprintf("passes %t", passes)) if !passes { + utils.CheckDebugUrlAndPrint(req.URL.String(), req.Host, "request dropped by filter in tryReadFromBD") i++ continue } @@ -638,7 +646,8 @@ func tryReadFromBD(bd *bidi, isPending bool) { } // Todo convert to protobuf - go ProduceStr(kafkaWriter, ctx, string(out)) + utils.CheckDebugUrlAndPrint(req.URL.String(), req.Host, "producing to kafka in tryReadFromBD") + go ProduceStr(kafkaWriter, ctx, string(out), req.URL.String(), req.Host) i++ } } @@ -927,7 +936,7 @@ func initKafka() { ctx := context.Background() out, _ := json.Marshal(value) - err := ProduceStr(kafkaWriter, ctx, string(out)) + err := ProduceStr(kafkaWriter, ctx, string(out), "", "") err = Produce(kafkaWriter, ctx, payload) log.Println("logging kafka stats post pushing message") logKafkaStats() diff --git a/utils/util_functions.go b/utils/util_functions.go index c136f645..e311b451 100644 --- a/utils/util_functions.go +++ b/utils/util_functions.go @@ -1,7 +1,129 @@ package utils -import "strings" +import ( + "bufio" + "fmt" + "log/slog" + "os" + "strconv" + "strings" + "time" +) + +var DebugStrings = []string{} + +func init() { + debugStringsEnv := "" + InitVar("DEBUG_URLS", &debugStringsEnv) + if len(debugStringsEnv) > 0 { + DebugStrings = strings.Split(debugStringsEnv, ",") + } + slog.Info("debugStrings", "DebugStrings", DebugStrings) + + go func() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + UpdateDebugStringsFromFile() + <-ticker.C + } + }() +} + +func InitVar(envVarName string, targetVar interface{}) { + envVar := os.Getenv(envVarName) + if len(envVar) > 0 { + switch v := targetVar.(type) { + case *bool: + *v = strings.ToLower(envVar) == "true" + slog.Warn("Setting env value", "name", envVarName, "value", *v) + case *string: + *v = envVar + slog.Warn("Setting env value", "name", envVarName, "value", *v) + case *time.Duration: + temp, err := time.ParseDuration(envVar + "s") + if err == nil { + *v = temp + slog.Warn("Setting env value", "name", envVarName, "value", *v) + } + case *int: + temp, err := strconv.Atoi(envVar) + if err == nil { + *v = temp + slog.Warn("Setting env value", "name", envVarName, "value", *v) + } + default: + slog.Warn("Unsupported type for targetVar", "type", v) + } + } else { + slog.Warn("Missing env value, using default value", "name", envVarName) + } +} + +// Reads /ebpf/debug-urls.txt and updates DebugStrings with any new URLs found in the file (one per line) +func UpdateDebugStringsFromFile() { + filePath := "/app/debug-urls.txt" + f, err := os.Open(filePath) + if err != nil { + // File may not exist, that's fine + return + } + defer f.Close() + + scanner := bufio.NewScanner(f) + fileUrls := []string{} + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line != "" { + fileUrls = append(fileUrls, line) + } + } + if err := scanner.Err(); err != nil { + return + } + + if len(fileUrls) > 0 { + // Merge with env DebugStrings, avoid duplicates + urlSet := make(map[string]struct{}) + for _, u := range DebugStrings { + urlSet[u] = struct{}{} + } + newUrls := []string{} + for _, u := range fileUrls { + if _, exists := urlSet[u]; !exists { + newUrls = append(newUrls, u) + } + urlSet[u] = struct{}{} + } + if len(newUrls) > 0 { + merged := make([]string, 0, len(urlSet)) + for u := range urlSet { + merged = append(merged, u) + } + DebugStrings = merged + slog.Info("New debugStrings found in file", "newUrls", newUrls, "DebugStrings", DebugStrings) + } + } +} func CheckIfIpHost(host string) bool { return strings.ToLower(host) == strings.ToUpper(host) } + + +func CheckDebugUrlAndPrint(url string, host string, message string) { + // url or host. [array string] + if len(DebugStrings) > 0 { + for _, debugString := range DebugStrings { + if strings.Contains(url, debugString) { + logMsg := fmt.Sprintf("url: %s, host: %s, message: %s", url, host, message) + slog.Warn(logMsg) + break + } else if strings.Contains(host, debugString) { + logMsg := fmt.Sprintf("url: %s, host: %s, message: %s", url, host, message) + slog.Warn(logMsg) + break + } + } + } +} \ No newline at end of file