diff --git a/README.md b/README.md index 6536672f..8cb84de5 100644 --- a/README.md +++ b/README.md @@ -115,8 +115,9 @@ This image is configurable using different flags | sasl.handshake | true | Only set this to false if using a non-Kafka SASL proxy | | sasl.username | | SASL user name | | sasl.password | | SASL user password | -| sasl.mechanism | plain | SASL SCRAM SHA algorithm: sha256 or sha512 or SASL mechanism: gssapi or awsiam | +| sasl.mechanism | plain | SASL SCRAM SHA algorithm: sha256 or sha512 or SASL mechanism: gssapi, awsiam or oauthbearer | | sasl.aws-region | AWS_REGION env | The AWS region for IAM SASL authentication | +| sasl.oauthbearer-token-url | | The url to retrieve OAuthBearer tokens from, for OAuthBearer SASL authentication | | sasl.service-name | | Service name when using Kerberos Auth | | sasl.kerberos-config-path | | Kerberos config path | | sasl.realm | | Kerberos realm | diff --git a/charts/kafka-exporter/templates/deployment.yaml b/charts/kafka-exporter/templates/deployment.yaml index e2449a2d..d75bccc6 100644 --- a/charts/kafka-exporter/templates/deployment.yaml +++ b/charts/kafka-exporter/templates/deployment.yaml @@ -61,6 +61,7 @@ spec: - --sasl.username={{ .Values.kafkaExporter.sasl.username }} - --sasl.password={{ .Values.kafkaExporter.sasl.password }} - --sasl.mechanism={{ .Values.kafkaExporter.sasl.mechanism }} + - --sasl.oauthbearer-token-url={{ .Values.kafkaExporter.sasl.oauthbearerTokenUrl }} {{- end }} {{- if .Values.kafkaExporter.tls.enabled}} - --tls.enabled diff --git a/charts/kafka-exporter/values.yaml b/charts/kafka-exporter/values.yaml index 2fc97147..edd443f7 100644 --- a/charts/kafka-exporter/values.yaml +++ b/charts/kafka-exporter/values.yaml @@ -28,6 +28,7 @@ kafkaExporter: username: "" password: "" mechanism: "" + oauthbearerTokenUrl: "" tls: enabled: false diff --git a/go.mod b/go.mod index b2088a9d..8ec4aa5f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/danielqsj/kafka_exporter -go 1.24 +go 1.24.0 require ( github.com/IBM/sarama v1.45.2 @@ -11,6 +11,7 @@ require ( github.com/prometheus/common v0.55.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/xdg-go/scram v1.1.2 + golang.org/x/oauth2 v0.32.0 k8s.io/klog/v2 v2.130.1 ) diff --git a/go.sum b/go.sum index 4fa70a6e..792731a7 100644 --- a/go.sum +++ b/go.sum @@ -139,6 +139,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= +golang.org/x/oauth2 v0.32.0 h1:jsCblLleRMDrxMN29H3z/k1KliIvpLgCkE6R8FXXNgY= +golang.org/x/oauth2 v0.32.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= diff --git a/kafka_exporter.go b/kafka_exporter.go index 20a581e9..9a8c7904 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -23,6 +23,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" plog "github.com/prometheus/common/promlog" plogflag "github.com/prometheus/common/promlog/flag" + "golang.org/x/oauth2" + "golang.org/x/oauth2/clientcredentials" versionCollector "github.com/prometheus/client_golang/prometheus/collectors/version" "github.com/prometheus/common/version" @@ -91,6 +93,7 @@ type kafkaOpts struct { saslMechanism string saslDisablePAFXFast bool saslAwsRegion string + saslOAuthBearerTokenUrl string useTLS bool tlsServerName string tlsCAFile string @@ -128,6 +131,43 @@ func (m *MSKAccessTokenProvider) Token() (*sarama.AccessToken, error) { return &sarama.AccessToken{Token: token}, err } +type OAuth2Config interface { + Token(ctx context.Context) (*oauth2.Token, error) +} + +type oauthbearerTokenProvider struct { + tokenExpiration time.Time + token string + oauth2Config OAuth2Config +} + +func newOauthbearerTokenProvider(oauth2Config OAuth2Config) *oauthbearerTokenProvider { + return &oauthbearerTokenProvider{ + tokenExpiration: time.Time{}, + token: "", + oauth2Config: oauth2Config, + } +} + +func (o *oauthbearerTokenProvider) Token() (*sarama.AccessToken, error) { + var accessToken string + var err error + + if o.token != "" && time.Now().Before(o.tokenExpiration.Add(time.Duration(-2)*time.Second)) { + accessToken = o.token + err = nil + } else { + token, err := o.oauth2Config.Token(context.Background()) + if err == nil { + accessToken = token.AccessToken + o.token = token.AccessToken + o.tokenExpiration = token.Expiry + } + } + + return &sarama.AccessToken{Token: accessToken}, err +} + // CanReadCertAndKey returns true if the certificate and key files already exists, // otherwise returns false. If lost one of cert and key, returns error. func CanReadCertAndKey(certPath, keyPath string) (bool, error) { @@ -208,6 +248,25 @@ func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupF case "awsiam": config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeOAuth) config.Net.SASL.TokenProvider = &MSKAccessTokenProvider{region: opts.saslAwsRegion} + case "oauthbearer": + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeOAuth) + tokenUrl := opts.saslOAuthBearerTokenUrl + if tokenUrl == "" { + tokenUrl = os.Getenv("SASL_OAUTHBEARER_TOKEN_URL") + } + if tokenUrl == "" { + log.Fatalf("[ERROR] sasl.oauthbearer-token-url must be configured or SASL_OAUTHBEARER_TOKEN_URL environment variable must be set when using the OAuthBearer SASL mechanism") + } + saslUsername := opts.saslUsername + if saslUsername == "" { + log.Fatalf("[ERROR] sasl.username must be configured when using the OAuthBearer SASL mechanism") + } + oauth2Config := clientcredentials.Config{ + TokenURL: tokenUrl, + ClientID: saslUsername, + ClientSecret: saslPassword, + } + config.Net.SASL.TokenProvider = newOauthbearerTokenProvider(&oauth2Config) case "plain": default: return nil, fmt.Errorf( @@ -773,7 +832,8 @@ func main() { toFlagStringVar("sasl.username", "SASL user name.", "", &opts.saslUsername) toFlagStringVar("sasl.password", "SASL user password.", "", &opts.saslPassword) toFlagStringVar("sasl.aws-region", "The AWS region for IAM SASL authentication", os.Getenv("AWS_REGION"), &opts.saslAwsRegion) - toFlagStringVar("sasl.mechanism", "SASL SCRAM SHA algorithm: sha256 or sha512 or SASL mechanism: gssapi or awsiam", "", &opts.saslMechanism) + toFlagStringVar("sasl.oauthbearer-token-url", "The url to retrieve OAuthBearer tokens from, for OAuthBearer SASL authentication", "", &opts.saslOAuthBearerTokenUrl) + toFlagStringVar("sasl.mechanism", "SASL SCRAM SHA algorithm: sha256 or sha512 or SASL mechanism: gssapi, awsiam or oauthbearer", "", &opts.saslMechanism) toFlagStringVar("sasl.service-name", "Service name when using kerberos Auth", "", &opts.serviceName) toFlagStringVar("sasl.kerberos-config-path", "Kerberos config path", "", &opts.kerberosConfigPath) toFlagStringVar("sasl.realm", "Kerberos realm", "", &opts.realm)