Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ This image is configurable using different flags
| sasl.handshake | true | Only set this to false if using a non-Kafka SASL proxy |
| sasl.username | | SASL user name |
| sasl.password | | SASL user password |
| sasl.mechanism | plain | SASL SCRAM SHA algorithm: sha256 or sha512 or SASL mechanism: gssapi or awsiam |
| sasl.mechanism | plain | SASL SCRAM SHA algorithm: sha256 or sha512 or SASL mechanism: gssapi, awsiam or oauthbearer |
| sasl.aws-region | AWS_REGION env | The AWS region for IAM SASL authentication |
| sasl.oauthbearer-token-url | | The url to retrieve OAuthBearer tokens from, for OAuthBearer SASL authentication |
| sasl.service-name | | Service name when using Kerberos Auth |
| sasl.kerberos-config-path | | Kerberos config path |
| sasl.realm | | Kerberos realm |
Expand Down
1 change: 1 addition & 0 deletions charts/kafka-exporter/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ spec:
- --sasl.username={{ .Values.kafkaExporter.sasl.username }}
- --sasl.password={{ .Values.kafkaExporter.sasl.password }}
- --sasl.mechanism={{ .Values.kafkaExporter.sasl.mechanism }}
- --sasl.oauthbearer-token-url={{ .Values.kafkaExporter.sasl.oauthbearerTokenUrl }}
{{- end }}
{{- if .Values.kafkaExporter.tls.enabled}}
- --tls.enabled
Expand Down
1 change: 1 addition & 0 deletions charts/kafka-exporter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ kafkaExporter:
username: ""
password: ""
mechanism: ""
oauthbearerTokenUrl: ""

tls:
enabled: false
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/danielqsj/kafka_exporter

go 1.24
go 1.24.0

require (
github.com/IBM/sarama v1.45.2
Expand All @@ -11,6 +11,7 @@ require (
github.com/prometheus/common v0.55.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/xdg-go/scram v1.1.2
golang.org/x/oauth2 v0.32.0
k8s.io/klog/v2 v2.130.1
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY=
golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds=
golang.org/x/oauth2 v0.32.0 h1:jsCblLleRMDrxMN29H3z/k1KliIvpLgCkE6R8FXXNgY=
golang.org/x/oauth2 v0.32.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
Expand Down
65 changes: 64 additions & 1 deletion kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
plog "github.com/prometheus/common/promlog"
plogflag "github.com/prometheus/common/promlog/flag"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"

versionCollector "github.com/prometheus/client_golang/prometheus/collectors/version"
"github.com/prometheus/common/version"
Expand Down Expand Up @@ -91,6 +93,7 @@ type kafkaOpts struct {
saslMechanism string
saslDisablePAFXFast bool
saslAwsRegion string
saslOAuthBearerTokenUrl string
useTLS bool
tlsServerName string
tlsCAFile string
Expand Down Expand Up @@ -128,6 +131,46 @@ func (m *MSKAccessTokenProvider) Token() (*sarama.AccessToken, error) {
return &sarama.AccessToken{Token: token}, err
}

type OAuth2Config interface {
Token(ctx context.Context) (*oauth2.Token, error)
}

type oauthbearerTokenProvider struct {
tokenExpiration time.Time
token string
oauth2Config OAuth2Config
}

func newOauthbearerTokenProvider(oauth2Config OAuth2Config) *oauthbearerTokenProvider {
return &oauthbearerTokenProvider{
tokenExpiration: time.Time{},
token: "",
oauth2Config: oauth2Config,
}
}

func (o *oauthbearerTokenProvider) Token() (*sarama.AccessToken, error) {
var accessToken string
var err error
currentTime := time.Now()
ctx := context.Background()

if o.token != "" && currentTime.Before(o.tokenExpiration.Add(time.Duration(-2)*time.Second)) {
accessToken = o.token
err = nil
} else {
token, _err := o.oauth2Config.Token(ctx)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. currenttime and ctx only be used once, so I suggest that no need to define them before
  2. is it need to use _err here? Can we just use err ?

Copy link
Author

@LuxTheDude LuxTheDude Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both points are correct. I am new to GO and had some false assumptions.

I have updated the code.

err = _err
if err == nil {
accessToken = token.AccessToken
o.token = token.AccessToken
o.tokenExpiration = token.Expiry
}
}

return &sarama.AccessToken{Token: accessToken}, err
}

// CanReadCertAndKey returns true if the certificate and key files already exists,
// otherwise returns false. If lost one of cert and key, returns error.
func CanReadCertAndKey(certPath, keyPath string) (bool, error) {
Expand Down Expand Up @@ -208,6 +251,25 @@ func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupF
case "awsiam":
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeOAuth)
config.Net.SASL.TokenProvider = &MSKAccessTokenProvider{region: opts.saslAwsRegion}
case "oauthbearer":
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeOAuth)
tokenUrl := opts.saslOAuthBearerTokenUrl
if tokenUrl == "" {
tokenUrl = os.Getenv("SASL_OAUTHBEARER_TOKEN_URL")
}
if tokenUrl == "" {
log.Fatalf("[ERROR] sasl.oauthbearer-token-url must be configured or SASL_OAUTHBEARER_TOKEN_URL environment variable must be set when using the OAuthBearer SASL mechanism")
}
saslUsername := opts.saslUsername
if saslUsername == "" {
log.Fatalf("[ERROR] sasl.username must be configured when using the OAuthBearer SASL mechanism")
}
oauth2Config := clientcredentials.Config{
TokenURL: tokenUrl,
ClientID: saslUsername,
ClientSecret: saslPassword,
}
config.Net.SASL.TokenProvider = newOauthbearerTokenProvider(&oauth2Config)
case "plain":
default:
return nil, fmt.Errorf(
Expand Down Expand Up @@ -773,7 +835,8 @@ func main() {
toFlagStringVar("sasl.username", "SASL user name.", "", &opts.saslUsername)
toFlagStringVar("sasl.password", "SASL user password.", "", &opts.saslPassword)
toFlagStringVar("sasl.aws-region", "The AWS region for IAM SASL authentication", os.Getenv("AWS_REGION"), &opts.saslAwsRegion)
toFlagStringVar("sasl.mechanism", "SASL SCRAM SHA algorithm: sha256 or sha512 or SASL mechanism: gssapi or awsiam", "", &opts.saslMechanism)
toFlagStringVar("sasl.oauthbearer-token-url", "The url to retrieve OAuthBearer tokens from, for OAuthBearer SASL authentication", "", &opts.saslOAuthBearerTokenUrl)
toFlagStringVar("sasl.mechanism", "SASL SCRAM SHA algorithm: sha256 or sha512 or SASL mechanism: gssapi, awsiam or oauthbearer", "", &opts.saslMechanism)
toFlagStringVar("sasl.service-name", "Service name when using kerberos Auth", "", &opts.serviceName)
toFlagStringVar("sasl.kerberos-config-path", "Kerberos config path", "", &opts.kerberosConfigPath)
toFlagStringVar("sasl.realm", "Kerberos realm", "", &opts.realm)
Expand Down