Skip to content
Merged
63 changes: 32 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,37 +143,38 @@ When running a container with a non-root user, you need to give the container ac

```
Usage of amazon-eks-pod-identity-webhook:
--add_dir_header If true, adds the file directory to the header
--alsologtostderr log to standard error as well as files
--annotation-prefix string The Service Account annotation to look for (default "eks.amazonaws.com")
--aws-default-region string If set, AWS_DEFAULT_REGION and AWS_REGION will be set to this value in mutated containers
--enable-debugging-handlers Enable debugging handlers. Currently /debug/alpha/cache is supported
--in-cluster Use in-cluster authentication and certificate request API (default true)
--kube-api string (out-of-cluster) The url to the API server
--kubeconfig string (out-of-cluster) Absolute path to the API server kubeconfig file
--log_backtrace_at traceLocation when logging hits line file:N, emit a stack trace (default :0)
--log_dir string If non-empty, write log files in this directory
--log_file string If non-empty, use this log file
--log_file_max_size uint Defines the maximum size a log file can grow to. Unit is megabytes. If the value is 0, the maximum file size is unlimited. (default 1800)
--logtostderr log to standard error instead of files (default true)
--metrics-port int Port to listen on for metrics (http) (default 9999)
--namespace string (in-cluster) The namespace name this webhook, the TLS secret, and configmap resides in (default "eks")
--port int Port to listen on (default 443)
--service-name string (in-cluster) The service name fronting this webhook (default "pod-identity-webhook")
--skip_headers If true, avoid header prefixes in the log messages
--skip_log_headers If true, avoid headers when opening log files
--stderrthreshold severity logs at or above this threshold go to stderr (default 2)
--sts-regional-endpoint false Whether to inject the AWS_STS_REGIONAL_ENDPOINTS=regional env var in mutated pods. Defaults to false.
--tls-cert string (out-of-cluster) TLS certificate file path (default "/etc/webhook/certs/tls.crt")
--tls-key string (out-of-cluster) TLS key file path (default "/etc/webhook/certs/tls.key")
--tls-secret string (in-cluster) The secret name for storing the TLS serving cert (default "pod-identity-webhook")
--token-audience string The default audience for tokens. Can be overridden by annotation (default "sts.amazonaws.com")
--token-expiration int The token expiration (default 86400)
--token-mount-path string The path to mount tokens (default "/var/run/secrets/eks.amazonaws.com/serviceaccount")
-v, --v Level number for the log level verbosity
--version Display the version and exit
--vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging
--watch-config-map Enables watching serviceaccounts that are configured through the pod-identity-webhook configmap instead of using annotations
--add_dir_header If true, adds the file directory to the header
--alsologtostderr log to standard error as well as files
--annotation-prefix string The Service Account annotation to look for (default "eks.amazonaws.com")
--aws-default-region string If set, AWS_DEFAULT_REGION and AWS_REGION will be set to this value in mutated containers
--enable-debugging-handlers Enable debugging handlers. Currently /debug/alpha/cache is supported
--in-cluster Use in-cluster authentication and certificate request API (default true)
--kube-api string (out-of-cluster) The url to the API server
--kubeconfig string (out-of-cluster) Absolute path to the API server kubeconfig file
--log_backtrace_at traceLocation when logging hits line file:N, emit a stack trace (default :0)
--log_dir string If non-empty, write log files in this directory
--log_file string If non-empty, use this log file
--log_file_max_size uint Defines the maximum size a log file can grow to. Unit is megabytes. If the value is 0, the maximum file size is unlimited. (default 1800)
--logtostderr log to standard error instead of files (default true)
--metrics-port int Port to listen on for metrics (http) (default 9999)
--namespace string (in-cluster) The namespace name this webhook, the TLS secret, and configmap resides in (default "eks")
--port int Port to listen on (default 443)
--service-name string (in-cluster) The service name fronting this webhook (default "pod-identity-webhook")
--service-account-lookup-grace-period The grace period for service account to be available in cache before not mutating a pod. Set to 0 to deactivate waiting. Carefully use higher values as it may have significant impact on Kubernetes' pod scheduling performance. (default 100ms)
--skip_headers If true, avoid header prefixes in the log messages
--skip_log_headers If true, avoid headers when opening log files
--stderrthreshold severity logs at or above this threshold go to stderr (default 2)
--sts-regional-endpoint false Whether to inject the AWS_STS_REGIONAL_ENDPOINTS=regional env var in mutated pods. Defaults to false.
--tls-cert string (out-of-cluster) TLS certificate file path (default "/etc/webhook/certs/tls.crt")
--tls-key string (out-of-cluster) TLS key file path (default "/etc/webhook/certs/tls.key")
--tls-secret string (in-cluster) The secret name for storing the TLS serving cert (default "pod-identity-webhook")
--token-audience string The default audience for tokens. Can be overridden by annotation (default "sts.amazonaws.com")
--token-expiration int The token expiration (default 86400)
--token-mount-path string The path to mount tokens (default "/var/run/secrets/eks.amazonaws.com/serviceaccount")
-v, --v Level number for the log level verbosity
--version Display the version and exit
--vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging
--watch-config-map Enables watching serviceaccounts that are configured through the pod-identity-webhook configmap instead of using annotations
```

### AWS_DEFAULT_REGION Injection
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func main() {

debug := flag.Bool("enable-debugging-handlers", false, "Enable debugging handlers. Currently /debug/alpha/cache is supported")

saLookupGracePeriod := flag.Duration("service-account-lookup-grace-period", 100*time.Millisecond, "The grace period for service account to be available in cache before not mutating a pod. Defaults to 100ms. Set to 0 to deactivate waiting. Carefully use higher values as it may have significant impact on Kubernetes' pod scheduling performance.")

klog.InitFlags(goflag.CommandLine)
// Add klog CommandLine flags to pflag CommandLine
goflag.CommandLine.VisitAll(func(f *goflag.Flag) {
Expand Down Expand Up @@ -208,6 +210,7 @@ func main() {
handler.WithServiceAccountCache(saCache),
handler.WithContainerCredentialsConfig(containerCredentialsConfig),
handler.WithRegion(*region),
handler.WithSALookupGraceTime(*saLookupGracePeriod),
)

addr := fmt.Sprintf(":%d", *port)
Expand Down
165 changes: 113 additions & 52 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,48 @@ import (
"k8s.io/klog/v2"
)

type CacheResponse struct {
type Entry struct {
RoleARN string
Audience string
UseRegionalSTS bool
TokenExpiration int64
}

type Request struct {
Name string
Namespace string
RequestNotification bool
}

func (r Request) CacheKey() string {
return r.Namespace + "/" + r.Name
}
func (r Request) WithNotification() Request {
r.RequestNotification = true
return r
}

type Response struct {
RoleARN string
Audience string
UseRegionalSTS bool
TokenExpiration int64
FoundInCache bool
Notifier <-chan struct{}
}

type ServiceAccountCache interface {
Start(stop chan struct{})
Get(name, namespace string) (role, aud string, useRegionalSTS bool, tokenExpiration int64)
Get(request Request) Response
GetCommonConfigurations(name, namespace string) (useRegionalSTS bool, tokenExpiration int64)
// ToJSON returns cache contents as JSON string
ToJSON() string
}

type serviceAccountCache struct {
mu sync.RWMutex // guards cache
saCache map[string]*CacheResponse
cmCache map[string]*CacheResponse
saCache map[string]*Entry
cmCache map[string]*Entry
hasSynced cache.InformerSynced
clientset kubernetes.Interface
annotationPrefix string
Expand All @@ -60,6 +83,8 @@ type serviceAccountCache struct {
composeRoleArn ComposeRoleArn
defaultTokenExpiration int64
webhookUsage prometheus.Gauge
notificationHandlers map[string]chan struct{}
handlerMu sync.Mutex
}

type ComposeRoleArn struct {
Expand All @@ -85,56 +110,81 @@ func init() {
}

// Get will return the cached configuration of the given ServiceAccount.
// It will first look at the set of ServiceAccounts configured using annotations. If none are found, it will look for any
// ServiceAccount configured through the pod-identity-webhook ConfigMap.
func (c *serviceAccountCache) Get(name, namespace string) (role, aud string, useRegionalSTS bool, tokenExpiration int64) {
klog.V(5).Infof("Fetching sa %s/%s from cache", namespace, name)
// It will first look at the set of ServiceAccounts configured using annotations. If none is found and a notifier is
// requested, it will register a handler to be notified as soon as a ServiceAccount with given key is populated to the
// cache. Afterward it will check for a ServiceAccount configured through the pod-identity-webhook ConfigMap.
func (c *serviceAccountCache) Get(req Request) Response {
result := Response{
TokenExpiration: pkg.DefaultTokenExpiration,
}
klog.V(5).Infof("Fetching sa %s from cache", req.CacheKey())
{
resp := c.getSA(name, namespace)
if resp != nil && resp.RoleARN != "" {
return resp.RoleARN, resp.Audience, resp.UseRegionalSTS, resp.TokenExpiration
var entry *Entry
entry, result.Notifier = c.getSA(req)
if entry != nil {
result.FoundInCache = true
}
if entry != nil && entry.RoleARN != "" {
result.RoleARN = entry.RoleARN
result.Audience = entry.Audience
result.UseRegionalSTS = entry.UseRegionalSTS
result.TokenExpiration = entry.TokenExpiration
return result
}
}
{
resp := c.getCM(name, namespace)
if resp != nil {
return resp.RoleARN, resp.Audience, resp.UseRegionalSTS, resp.TokenExpiration
entry := c.getCM(req.Name, req.Namespace)
if entry != nil {
result.FoundInCache = true
result.RoleARN = entry.RoleARN
result.Audience = entry.Audience
result.UseRegionalSTS = entry.UseRegionalSTS
result.TokenExpiration = entry.TokenExpiration
return result
}
}
klog.V(5).Infof("Service account %s/%s not found in cache", namespace, name)
return "", "", false, pkg.DefaultTokenExpiration
klog.V(5).Infof("Service account %s not found in cache", req.CacheKey())
return result
}

// GetCommonConfigurations returns the common configurations that also applies to the new mutation method(i.e Container Credentials).
// The config file for the container credentials does not contain "TokenExpiration" or "UseRegionalSTS". For backward compatibility,
// Use these fields if they are set in the sa annotations or config map.
func (c *serviceAccountCache) GetCommonConfigurations(name, namespace string) (useRegionalSTS bool, tokenExpiration int64) {
if resp := c.getSA(name, namespace); resp != nil {
return resp.UseRegionalSTS, resp.TokenExpiration
} else if resp := c.getCM(name, namespace); resp != nil {
return resp.UseRegionalSTS, resp.TokenExpiration
if entry, _ := c.getSA(Request{Name: name, Namespace: namespace, RequestNotification: false}); entry != nil {
return entry.UseRegionalSTS, entry.TokenExpiration
} else if entry := c.getCM(name, namespace); entry != nil {
return entry.UseRegionalSTS, entry.TokenExpiration
}
return false, pkg.DefaultTokenExpiration
}

func (c *serviceAccountCache) getSA(name, namespace string) *CacheResponse {
func (c *serviceAccountCache) getSA(req Request) (*Entry, chan struct{}) {
c.mu.RLock()
defer c.mu.RUnlock()
resp, ok := c.saCache[namespace+"/"+name]
if !ok {
return nil
entry, ok := c.saCache[req.CacheKey()]
if !ok && req.RequestNotification {
klog.V(5).Infof("Service Account %s not found in cache, adding notification handler", req.CacheKey())
c.handlerMu.Lock()
defer c.handlerMu.Unlock()
notifier, found := c.notificationHandlers[req.CacheKey()]
if !found {
notifier = make(chan struct{})
c.notificationHandlers[req.CacheKey()] = notifier
}
return nil, notifier
}
return resp
return entry, nil
}

func (c *serviceAccountCache) getCM(name, namespace string) *CacheResponse {
func (c *serviceAccountCache) getCM(name, namespace string) *Entry {
c.mu.RLock()
defer c.mu.RUnlock()
resp, ok := c.cmCache[namespace+"/"+name]
entry, ok := c.cmCache[namespace+"/"+name]
if !ok {
return nil
}
return resp
return entry
}

func (c *serviceAccountCache) popSA(name, namespace string) {
Expand Down Expand Up @@ -164,7 +214,7 @@ func (c *serviceAccountCache) ToJSON() string {
}

func (c *serviceAccountCache) addSA(sa *v1.ServiceAccount) {
resp := &CacheResponse{}
entry := &Entry{}

arn, ok := sa.Annotations[c.annotationPrefix+"/"+pkg.RoleARNAnnotation]
if ok {
Expand All @@ -178,49 +228,59 @@ func (c *serviceAccountCache) addSA(sa *v1.ServiceAccount) {
} else if !matched {
klog.Warningf("arn is invalid: %s", arn)
}
resp.RoleARN = arn
entry.RoleARN = arn
}

resp.Audience = c.defaultAudience
entry.Audience = c.defaultAudience
if audience, ok := sa.Annotations[c.annotationPrefix+"/"+pkg.AudienceAnnotation]; ok {
resp.Audience = audience
entry.Audience = audience
}

resp.UseRegionalSTS = c.defaultRegionalSTS
entry.UseRegionalSTS = c.defaultRegionalSTS
if useRegionalStr, ok := sa.Annotations[c.annotationPrefix+"/"+pkg.UseRegionalSTSAnnotation]; ok {
useRegional, err := strconv.ParseBool(useRegionalStr)
if err != nil {
klog.V(4).Infof("Ignoring service account %s/%s invalid value for disable-regional-sts annotation", sa.Namespace, sa.Name)
} else {
resp.UseRegionalSTS = useRegional
entry.UseRegionalSTS = useRegional
}
}

resp.TokenExpiration = c.defaultTokenExpiration
entry.TokenExpiration = c.defaultTokenExpiration
if tokenExpirationStr, ok := sa.Annotations[c.annotationPrefix+"/"+pkg.TokenExpirationAnnotation]; ok {
if tokenExpiration, err := strconv.ParseInt(tokenExpirationStr, 10, 64); err != nil {
klog.V(4).Infof("Found invalid value for token expiration, using %d seconds as default: %v", resp.TokenExpiration, err)
klog.V(4).Infof("Found invalid value for token expiration, using %d seconds as default: %v", entry.TokenExpiration, err)
} else {
resp.TokenExpiration = pkg.ValidateMinTokenExpiration(tokenExpiration)
entry.TokenExpiration = pkg.ValidateMinTokenExpiration(tokenExpiration)
}
}
c.webhookUsage.Set(1)

c.setSA(sa.Name, sa.Namespace, resp)
c.setSA(sa.Name, sa.Namespace, entry)
}

func (c *serviceAccountCache) setSA(name, namespace string, resp *CacheResponse) {
func (c *serviceAccountCache) setSA(name, namespace string, entry *Entry) {
c.mu.Lock()
defer c.mu.Unlock()
klog.V(5).Infof("Adding SA %s/%s to SA cache: %+v", namespace, name, resp)
c.saCache[namespace+"/"+name] = resp

key := namespace + "/" + name
klog.V(5).Infof("Adding SA %q to SA cache: %+v", key, entry)
c.saCache[key] = entry

c.handlerMu.Lock()
defer c.handlerMu.Unlock()
if handler, found := c.notificationHandlers[key]; found {
klog.V(5).Infof("Notifying handlers for %q", key)
close(handler)
delete(c.notificationHandlers, key)
}
}

func (c *serviceAccountCache) setCM(name, namespace string, resp *CacheResponse) {
func (c *serviceAccountCache) setCM(name, namespace string, entry *Entry) {
c.mu.Lock()
defer c.mu.Unlock()
klog.V(5).Infof("Adding SA %s/%s to CM cache: %+v", namespace, name, resp)
c.cmCache[namespace+"/"+name] = resp
klog.V(5).Infof("Adding SA %s/%s to CM cache: %+v", namespace, name, entry)
c.cmCache[namespace+"/"+name] = entry
}

func New(defaultAudience, prefix string, defaultRegionalSTS bool, defaultTokenExpiration int64, saInformer coreinformers.ServiceAccountInformer, cmInformer coreinformers.ConfigMapInformer, composeRoleArn ComposeRoleArn) ServiceAccountCache {
Expand All @@ -233,15 +293,16 @@ func New(defaultAudience, prefix string, defaultRegionalSTS bool, defaultTokenEx
}

c := &serviceAccountCache{
saCache: map[string]*CacheResponse{},
cmCache: map[string]*CacheResponse{},
saCache: map[string]*Entry{},
cmCache: map[string]*Entry{},
defaultAudience: defaultAudience,
annotationPrefix: prefix,
defaultRegionalSTS: defaultRegionalSTS,
composeRoleArn: composeRoleArn,
defaultTokenExpiration: defaultTokenExpiration,
hasSynced: hasSynced,
webhookUsage: webhookUsage,
notificationHandlers: map[string]chan struct{}{},
}

saInformer.Informer().AddEventHandler(
Expand Down Expand Up @@ -298,22 +359,22 @@ func (c *serviceAccountCache) populateCacheFromCM(oldCM, newCM *v1.ConfigMap) er
return nil
}
newConfig := newCM.Data["config"]
sas := make(map[string]*CacheResponse)
sas := make(map[string]*Entry)
err := json.Unmarshal([]byte(newConfig), &sas)
if err != nil {
return fmt.Errorf("failed to unmarshal new config %q: %v", newConfig, err)
}
for key, resp := range sas {
for key, entry := range sas {
parts := strings.Split(key, "/")
if resp.TokenExpiration == 0 {
resp.TokenExpiration = c.defaultTokenExpiration
if entry.TokenExpiration == 0 {
entry.TokenExpiration = c.defaultTokenExpiration
}
c.setCM(parts[1], parts[0], resp)
c.setCM(parts[1], parts[0], entry)
}

if oldCM != nil {
oldConfig := oldCM.Data["config"]
oldCache := make(map[string]*CacheResponse)
oldCache := make(map[string]*Entry)
err := json.Unmarshal([]byte(oldConfig), &oldCache)
if err != nil {
return fmt.Errorf("failed to unmarshal old config %q: %v", oldConfig, err)
Expand Down
Loading