Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ This image is configurable using different flags
| sasl.handshake | true | Only set this to false if using a non-Kafka SASL proxy |
| sasl.username | | SASL user name |
| sasl.password | | SASL user password |
| sasl.mechanism | plain | SASL SCRAM SHA algorithm: sha256 or sha512 or SASL mechanism: gssapi or awsiam |
| sasl.mechanism | plain | SASL SCRAM SHA algorithm: sha256 or sha512 or SASL mechanism: gssapi, awsiam or oauthbearer |
| sasl.aws-region | AWS_REGION env | The AWS region for IAM SASL authentication |
| sasl.oauthbearer-token-url | | The url to retrieve OAuthBearer tokens from, for OAuthBearer SASL authentication |
| sasl.service-name | | Service name when using Kerberos Auth |
| sasl.kerberos-config-path | | Kerberos config path |
| sasl.realm | | Kerberos realm |
Expand Down
1 change: 1 addition & 0 deletions charts/kafka-exporter/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ spec:
- --sasl.username={{ .Values.kafkaExporter.sasl.username }}
- --sasl.password={{ .Values.kafkaExporter.sasl.password }}
- --sasl.mechanism={{ .Values.kafkaExporter.sasl.mechanism }}
- --sasl.oauthbearer-token-url={{ .Values.kafkaExporter.sasl.oauthbearerTokenUrl }}
{{- end }}
{{- if .Values.kafkaExporter.tls.enabled}}
- --tls.enabled
Expand Down
1 change: 1 addition & 0 deletions charts/kafka-exporter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ kafkaExporter:
username: ""
password: ""
mechanism: ""
oauthbearerTokenUrl: ""

tls:
enabled: false
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/danielqsj/kafka_exporter

go 1.24
go 1.24.0

require (
github.com/IBM/sarama v1.45.2
Expand All @@ -11,6 +11,7 @@ require (
github.com/prometheus/common v0.55.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/xdg-go/scram v1.1.2
golang.org/x/oauth2 v0.32.0
k8s.io/klog/v2 v2.130.1
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY=
golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds=
golang.org/x/oauth2 v0.32.0 h1:jsCblLleRMDrxMN29H3z/k1KliIvpLgCkE6R8FXXNgY=
golang.org/x/oauth2 v0.32.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
Expand Down
62 changes: 61 additions & 1 deletion kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
plog "github.com/prometheus/common/promlog"
plogflag "github.com/prometheus/common/promlog/flag"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"

versionCollector "github.com/prometheus/client_golang/prometheus/collectors/version"
"github.com/prometheus/common/version"
Expand Down Expand Up @@ -91,6 +93,7 @@ type kafkaOpts struct {
saslMechanism string
saslDisablePAFXFast bool
saslAwsRegion string
saslOAuthBearerTokenUrl string
useTLS bool
tlsServerName string
tlsCAFile string
Expand Down Expand Up @@ -128,6 +131,43 @@ func (m *MSKAccessTokenProvider) Token() (*sarama.AccessToken, error) {
return &sarama.AccessToken{Token: token}, err
}

type OAuth2Config interface {
Token(ctx context.Context) (*oauth2.Token, error)
}

type oauthbearerTokenProvider struct {
tokenExpiration time.Time
token string
oauth2Config OAuth2Config
}

func newOauthbearerTokenProvider(oauth2Config OAuth2Config) *oauthbearerTokenProvider {
return &oauthbearerTokenProvider{
tokenExpiration: time.Time{},
token: "",
oauth2Config: oauth2Config,
}
}

func (o *oauthbearerTokenProvider) Token() (*sarama.AccessToken, error) {
var accessToken string
var err error

if o.token != "" && time.Now().Before(o.tokenExpiration.Add(time.Duration(-2)*time.Second)) {
accessToken = o.token
err = nil
} else {
token, err := o.oauth2Config.Token(context.Background())
if err == nil {
accessToken = token.AccessToken
o.token = token.AccessToken
o.tokenExpiration = token.Expiry
}
}

return &sarama.AccessToken{Token: accessToken}, err
}

// CanReadCertAndKey returns true if the certificate and key files already exists,
// otherwise returns false. If lost one of cert and key, returns error.
func CanReadCertAndKey(certPath, keyPath string) (bool, error) {
Expand Down Expand Up @@ -208,6 +248,25 @@ func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupF
case "awsiam":
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeOAuth)
config.Net.SASL.TokenProvider = &MSKAccessTokenProvider{region: opts.saslAwsRegion}
case "oauthbearer":
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeOAuth)
tokenUrl := opts.saslOAuthBearerTokenUrl
if tokenUrl == "" {
tokenUrl = os.Getenv("SASL_OAUTHBEARER_TOKEN_URL")
}
if tokenUrl == "" {
log.Fatalf("[ERROR] sasl.oauthbearer-token-url must be configured or SASL_OAUTHBEARER_TOKEN_URL environment variable must be set when using the OAuthBearer SASL mechanism")
}
saslUsername := opts.saslUsername
if saslUsername == "" {
log.Fatalf("[ERROR] sasl.username must be configured when using the OAuthBearer SASL mechanism")
}
oauth2Config := clientcredentials.Config{
TokenURL: tokenUrl,
ClientID: saslUsername,
ClientSecret: saslPassword,
}
config.Net.SASL.TokenProvider = newOauthbearerTokenProvider(&oauth2Config)
case "plain":
default:
return nil, fmt.Errorf(
Expand Down Expand Up @@ -773,7 +832,8 @@ func main() {
toFlagStringVar("sasl.username", "SASL user name.", "", &opts.saslUsername)
toFlagStringVar("sasl.password", "SASL user password.", "", &opts.saslPassword)
toFlagStringVar("sasl.aws-region", "The AWS region for IAM SASL authentication", os.Getenv("AWS_REGION"), &opts.saslAwsRegion)
toFlagStringVar("sasl.mechanism", "SASL SCRAM SHA algorithm: sha256 or sha512 or SASL mechanism: gssapi or awsiam", "", &opts.saslMechanism)
toFlagStringVar("sasl.oauthbearer-token-url", "The url to retrieve OAuthBearer tokens from, for OAuthBearer SASL authentication", "", &opts.saslOAuthBearerTokenUrl)
toFlagStringVar("sasl.mechanism", "SASL SCRAM SHA algorithm: sha256 or sha512 or SASL mechanism: gssapi, awsiam or oauthbearer", "", &opts.saslMechanism)
toFlagStringVar("sasl.service-name", "Service name when using kerberos Auth", "", &opts.serviceName)
toFlagStringVar("sasl.kerberos-config-path", "Kerberos config path", "", &opts.kerberosConfigPath)
toFlagStringVar("sasl.realm", "Kerberos realm", "", &opts.realm)
Expand Down