Skip to content

Commit ebac929

Browse files
authored
Merge pull request #236 from c445/roehrijn/fix-with-notification
Fix race condition between service account availability and webhook invocation
2 parents d1673c1 + 1e24598 commit ebac929

File tree

8 files changed

+381
-160
lines changed

8 files changed

+381
-160
lines changed

README.md

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -143,37 +143,38 @@ When running a container with a non-root user, you need to give the container ac
143143

144144
```
145145
Usage of amazon-eks-pod-identity-webhook:
146-
--add_dir_header If true, adds the file directory to the header
147-
--alsologtostderr log to standard error as well as files
148-
--annotation-prefix string The Service Account annotation to look for (default "eks.amazonaws.com")
149-
--aws-default-region string If set, AWS_DEFAULT_REGION and AWS_REGION will be set to this value in mutated containers
150-
--enable-debugging-handlers Enable debugging handlers. Currently /debug/alpha/cache is supported
151-
--in-cluster Use in-cluster authentication and certificate request API (default true)
152-
--kube-api string (out-of-cluster) The url to the API server
153-
--kubeconfig string (out-of-cluster) Absolute path to the API server kubeconfig file
154-
--log_backtrace_at traceLocation when logging hits line file:N, emit a stack trace (default :0)
155-
--log_dir string If non-empty, write log files in this directory
156-
--log_file string If non-empty, use this log file
157-
--log_file_max_size uint Defines the maximum size a log file can grow to. Unit is megabytes. If the value is 0, the maximum file size is unlimited. (default 1800)
158-
--logtostderr log to standard error instead of files (default true)
159-
--metrics-port int Port to listen on for metrics (http) (default 9999)
160-
--namespace string (in-cluster) The namespace name this webhook, the TLS secret, and configmap resides in (default "eks")
161-
--port int Port to listen on (default 443)
162-
--service-name string (in-cluster) The service name fronting this webhook (default "pod-identity-webhook")
163-
--skip_headers If true, avoid header prefixes in the log messages
164-
--skip_log_headers If true, avoid headers when opening log files
165-
--stderrthreshold severity logs at or above this threshold go to stderr (default 2)
166-
--sts-regional-endpoint false Whether to inject the AWS_STS_REGIONAL_ENDPOINTS=regional env var in mutated pods. Defaults to false.
167-
--tls-cert string (out-of-cluster) TLS certificate file path (default "/etc/webhook/certs/tls.crt")
168-
--tls-key string (out-of-cluster) TLS key file path (default "/etc/webhook/certs/tls.key")
169-
--tls-secret string (in-cluster) The secret name for storing the TLS serving cert (default "pod-identity-webhook")
170-
--token-audience string The default audience for tokens. Can be overridden by annotation (default "sts.amazonaws.com")
171-
--token-expiration int The token expiration (default 86400)
172-
--token-mount-path string The path to mount tokens (default "/var/run/secrets/eks.amazonaws.com/serviceaccount")
173-
-v, --v Level number for the log level verbosity
174-
--version Display the version and exit
175-
--vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging
176-
--watch-config-map Enables watching serviceaccounts that are configured through the pod-identity-webhook configmap instead of using annotations
146+
--add_dir_header If true, adds the file directory to the header
147+
--alsologtostderr log to standard error as well as files
148+
--annotation-prefix string The Service Account annotation to look for (default "eks.amazonaws.com")
149+
--aws-default-region string If set, AWS_DEFAULT_REGION and AWS_REGION will be set to this value in mutated containers
150+
--enable-debugging-handlers Enable debugging handlers. Currently /debug/alpha/cache is supported
151+
--in-cluster Use in-cluster authentication and certificate request API (default true)
152+
--kube-api string (out-of-cluster) The url to the API server
153+
--kubeconfig string (out-of-cluster) Absolute path to the API server kubeconfig file
154+
--log_backtrace_at traceLocation when logging hits line file:N, emit a stack trace (default :0)
155+
--log_dir string If non-empty, write log files in this directory
156+
--log_file string If non-empty, use this log file
157+
--log_file_max_size uint Defines the maximum size a log file can grow to. Unit is megabytes. If the value is 0, the maximum file size is unlimited. (default 1800)
158+
--logtostderr log to standard error instead of files (default true)
159+
--metrics-port int Port to listen on for metrics (http) (default 9999)
160+
--namespace string (in-cluster) The namespace name this webhook, the TLS secret, and configmap resides in (default "eks")
161+
--port int Port to listen on (default 443)
162+
--service-name string (in-cluster) The service name fronting this webhook (default "pod-identity-webhook")
163+
--service-account-lookup-grace-period duration The grace period for service account to be available in cache before not mutating a pod. Defaults to 0, which deactivates waiting. Carefully use values higher than a few milliseconds as it may have significant impact on Kubernetes' pod scheduling performance.
164+
--skip_headers If true, avoid header prefixes in the log messages
165+
--skip_log_headers If true, avoid headers when opening log files
166+
--stderrthreshold severity logs at or above this threshold go to stderr (default 2)
167+
--sts-regional-endpoint false Whether to inject the AWS_STS_REGIONAL_ENDPOINTS=regional env var in mutated pods. Defaults to false.
168+
--tls-cert string (out-of-cluster) TLS certificate file path (default "/etc/webhook/certs/tls.crt")
169+
--tls-key string (out-of-cluster) TLS key file path (default "/etc/webhook/certs/tls.key")
170+
--tls-secret string (in-cluster) The secret name for storing the TLS serving cert (default "pod-identity-webhook")
171+
--token-audience string The default audience for tokens. Can be overridden by annotation (default "sts.amazonaws.com")
172+
--token-expiration int The token expiration (default 86400)
173+
--token-mount-path string The path to mount tokens (default "/var/run/secrets/eks.amazonaws.com/serviceaccount")
174+
-v, --v Level number for the log level verbosity
175+
--version Display the version and exit
176+
--vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging
177+
--watch-config-map Enables watching serviceaccounts that are configured through the pod-identity-webhook configmap instead of using annotations
177178
```
178179
179180
### AWS_DEFAULT_REGION Injection

main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ func main() {
8686

8787
debug := flag.Bool("enable-debugging-handlers", false, "Enable debugging handlers. Currently /debug/alpha/cache is supported")
8888

89+
saLookupGracePeriod := flag.Duration("service-account-lookup-grace-period", 0, "The grace period for service account to be available in cache before not mutating a pod. Defaults to 0, what deactivates waiting. Carefully use values higher than a bunch of milliseconds as it may have significant impact on Kubernetes' pod scheduling performance.")
90+
8991
klog.InitFlags(goflag.CommandLine)
9092
// Add klog CommandLine flags to pflag CommandLine
9193
goflag.CommandLine.VisitAll(func(f *goflag.Flag) {
@@ -208,6 +210,7 @@ func main() {
208210
handler.WithServiceAccountCache(saCache),
209211
handler.WithContainerCredentialsConfig(containerCredentialsConfig),
210212
handler.WithRegion(*region),
213+
handler.WithSALookupGraceTime(*saLookupGracePeriod),
211214
)
212215

213216
addr := fmt.Sprintf(":%d", *port)

pkg/cache/cache.go

Lines changed: 109 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -33,25 +33,44 @@ import (
3333
"k8s.io/klog/v2"
3434
)
3535

36-
type CacheResponse struct {
36+
type Entry struct {
3737
RoleARN string
3838
Audience string
3939
UseRegionalSTS bool
4040
TokenExpiration int64
4141
}
4242

43+
type Request struct {
44+
Name string
45+
Namespace string
46+
RequestNotification bool
47+
}
48+
49+
func (r Request) CacheKey() string {
50+
return r.Namespace + "/" + r.Name
51+
}
52+
53+
type Response struct {
54+
RoleARN string
55+
Audience string
56+
UseRegionalSTS bool
57+
TokenExpiration int64
58+
FoundInCache bool
59+
Notifier <-chan struct{}
60+
}
61+
4362
type ServiceAccountCache interface {
4463
Start(stop chan struct{})
45-
Get(name, namespace string) (role, aud string, useRegionalSTS bool, tokenExpiration int64)
64+
Get(request Request) Response
4665
GetCommonConfigurations(name, namespace string) (useRegionalSTS bool, tokenExpiration int64)
4766
// ToJSON returns cache contents as JSON string
4867
ToJSON() string
4968
}
5069

5170
type serviceAccountCache struct {
5271
mu sync.RWMutex // guards cache
53-
saCache map[string]*CacheResponse
54-
cmCache map[string]*CacheResponse
72+
saCache map[string]*Entry
73+
cmCache map[string]*Entry
5574
hasSynced cache.InformerSynced
5675
clientset kubernetes.Interface
5776
annotationPrefix string
@@ -60,6 +79,8 @@ type serviceAccountCache struct {
6079
composeRoleArn ComposeRoleArn
6180
defaultTokenExpiration int64
6281
webhookUsage prometheus.Gauge
82+
notificationHandlers map[string]chan struct{}
83+
handlerMu sync.Mutex
6384
}
6485

6586
type ComposeRoleArn struct {
@@ -85,56 +106,81 @@ func init() {
85106
}
86107

87108
// Get will return the cached configuration of the given ServiceAccount.
88-
// It will first look at the set of ServiceAccounts configured using annotations. If none are found, it will look for any
89-
// ServiceAccount configured through the pod-identity-webhook ConfigMap.
90-
func (c *serviceAccountCache) Get(name, namespace string) (role, aud string, useRegionalSTS bool, tokenExpiration int64) {
91-
klog.V(5).Infof("Fetching sa %s/%s from cache", namespace, name)
109+
// It will first look at the set of ServiceAccounts configured using annotations. If none is found and a notifier is
110+
// requested, it will register a handler to be notified as soon as a ServiceAccount with the given key is added to the
111+
// cache. Afterward it will check for a ServiceAccount configured through the pod-identity-webhook ConfigMap.
112+
func (c *serviceAccountCache) Get(req Request) Response {
113+
result := Response{
114+
TokenExpiration: pkg.DefaultTokenExpiration,
115+
}
116+
klog.V(5).Infof("Fetching sa %s from cache", req.CacheKey())
92117
{
93-
resp := c.getSA(name, namespace)
94-
if resp != nil && resp.RoleARN != "" {
95-
return resp.RoleARN, resp.Audience, resp.UseRegionalSTS, resp.TokenExpiration
118+
var entry *Entry
119+
entry, result.Notifier = c.getSA(req)
120+
if entry != nil {
121+
result.FoundInCache = true
122+
}
123+
if entry != nil && entry.RoleARN != "" {
124+
result.RoleARN = entry.RoleARN
125+
result.Audience = entry.Audience
126+
result.UseRegionalSTS = entry.UseRegionalSTS
127+
result.TokenExpiration = entry.TokenExpiration
128+
return result
96129
}
97130
}
98131
{
99-
resp := c.getCM(name, namespace)
100-
if resp != nil {
101-
return resp.RoleARN, resp.Audience, resp.UseRegionalSTS, resp.TokenExpiration
132+
entry := c.getCM(req.Name, req.Namespace)
133+
if entry != nil {
134+
result.FoundInCache = true
135+
result.RoleARN = entry.RoleARN
136+
result.Audience = entry.Audience
137+
result.UseRegionalSTS = entry.UseRegionalSTS
138+
result.TokenExpiration = entry.TokenExpiration
139+
return result
102140
}
103141
}
104-
klog.V(5).Infof("Service account %s/%s not found in cache", namespace, name)
105-
return "", "", false, pkg.DefaultTokenExpiration
142+
klog.V(5).Infof("Service account %s not found in cache", req.CacheKey())
143+
return result
106144
}
107145

108146
// GetCommonConfigurations returns the common configurations that also apply to the new mutation method (i.e. Container Credentials).
109147
// The config file for the container credentials does not contain "TokenExpiration" or "UseRegionalSTS". For backward compatibility,
110148
// use these fields if they are set in the sa annotations or config map.
111149
func (c *serviceAccountCache) GetCommonConfigurations(name, namespace string) (useRegionalSTS bool, tokenExpiration int64) {
112-
if resp := c.getSA(name, namespace); resp != nil {
113-
return resp.UseRegionalSTS, resp.TokenExpiration
114-
} else if resp := c.getCM(name, namespace); resp != nil {
115-
return resp.UseRegionalSTS, resp.TokenExpiration
150+
if entry, _ := c.getSA(Request{Name: name, Namespace: namespace, RequestNotification: false}); entry != nil {
151+
return entry.UseRegionalSTS, entry.TokenExpiration
152+
} else if entry := c.getCM(name, namespace); entry != nil {
153+
return entry.UseRegionalSTS, entry.TokenExpiration
116154
}
117155
return false, pkg.DefaultTokenExpiration
118156
}
119157

120-
func (c *serviceAccountCache) getSA(name, namespace string) *CacheResponse {
158+
func (c *serviceAccountCache) getSA(req Request) (*Entry, chan struct{}) {
121159
c.mu.RLock()
122160
defer c.mu.RUnlock()
123-
resp, ok := c.saCache[namespace+"/"+name]
124-
if !ok {
125-
return nil
161+
entry, ok := c.saCache[req.CacheKey()]
162+
if !ok && req.RequestNotification {
163+
klog.V(5).Infof("Service Account %s not found in cache, adding notification handler", req.CacheKey())
164+
c.handlerMu.Lock()
165+
defer c.handlerMu.Unlock()
166+
notifier, found := c.notificationHandlers[req.CacheKey()]
167+
if !found {
168+
notifier = make(chan struct{})
169+
c.notificationHandlers[req.CacheKey()] = notifier
170+
}
171+
return nil, notifier
126172
}
127-
return resp
173+
return entry, nil
128174
}
129175

130-
func (c *serviceAccountCache) getCM(name, namespace string) *CacheResponse {
176+
func (c *serviceAccountCache) getCM(name, namespace string) *Entry {
131177
c.mu.RLock()
132178
defer c.mu.RUnlock()
133-
resp, ok := c.cmCache[namespace+"/"+name]
179+
entry, ok := c.cmCache[namespace+"/"+name]
134180
if !ok {
135181
return nil
136182
}
137-
return resp
183+
return entry
138184
}
139185

140186
func (c *serviceAccountCache) popSA(name, namespace string) {
@@ -164,7 +210,7 @@ func (c *serviceAccountCache) ToJSON() string {
164210
}
165211

166212
func (c *serviceAccountCache) addSA(sa *v1.ServiceAccount) {
167-
resp := &CacheResponse{}
213+
entry := &Entry{}
168214

169215
arn, ok := sa.Annotations[c.annotationPrefix+"/"+pkg.RoleARNAnnotation]
170216
if ok {
@@ -178,49 +224,59 @@ func (c *serviceAccountCache) addSA(sa *v1.ServiceAccount) {
178224
} else if !matched {
179225
klog.Warningf("arn is invalid: %s", arn)
180226
}
181-
resp.RoleARN = arn
227+
entry.RoleARN = arn
182228
}
183229

184-
resp.Audience = c.defaultAudience
230+
entry.Audience = c.defaultAudience
185231
if audience, ok := sa.Annotations[c.annotationPrefix+"/"+pkg.AudienceAnnotation]; ok {
186-
resp.Audience = audience
232+
entry.Audience = audience
187233
}
188234

189-
resp.UseRegionalSTS = c.defaultRegionalSTS
235+
entry.UseRegionalSTS = c.defaultRegionalSTS
190236
if useRegionalStr, ok := sa.Annotations[c.annotationPrefix+"/"+pkg.UseRegionalSTSAnnotation]; ok {
191237
useRegional, err := strconv.ParseBool(useRegionalStr)
192238
if err != nil {
193239
klog.V(4).Infof("Ignoring service account %s/%s invalid value for disable-regional-sts annotation", sa.Namespace, sa.Name)
194240
} else {
195-
resp.UseRegionalSTS = useRegional
241+
entry.UseRegionalSTS = useRegional
196242
}
197243
}
198244

199-
resp.TokenExpiration = c.defaultTokenExpiration
245+
entry.TokenExpiration = c.defaultTokenExpiration
200246
if tokenExpirationStr, ok := sa.Annotations[c.annotationPrefix+"/"+pkg.TokenExpirationAnnotation]; ok {
201247
if tokenExpiration, err := strconv.ParseInt(tokenExpirationStr, 10, 64); err != nil {
202-
klog.V(4).Infof("Found invalid value for token expiration, using %d seconds as default: %v", resp.TokenExpiration, err)
248+
klog.V(4).Infof("Found invalid value for token expiration, using %d seconds as default: %v", entry.TokenExpiration, err)
203249
} else {
204-
resp.TokenExpiration = pkg.ValidateMinTokenExpiration(tokenExpiration)
250+
entry.TokenExpiration = pkg.ValidateMinTokenExpiration(tokenExpiration)
205251
}
206252
}
207253
c.webhookUsage.Set(1)
208254

209-
c.setSA(sa.Name, sa.Namespace, resp)
255+
c.setSA(sa.Name, sa.Namespace, entry)
210256
}
211257

212-
func (c *serviceAccountCache) setSA(name, namespace string, resp *CacheResponse) {
258+
func (c *serviceAccountCache) setSA(name, namespace string, entry *Entry) {
213259
c.mu.Lock()
214260
defer c.mu.Unlock()
215-
klog.V(5).Infof("Adding SA %s/%s to SA cache: %+v", namespace, name, resp)
216-
c.saCache[namespace+"/"+name] = resp
261+
262+
key := namespace + "/" + name
263+
klog.V(5).Infof("Adding SA %q to SA cache: %+v", key, entry)
264+
c.saCache[key] = entry
265+
266+
c.handlerMu.Lock()
267+
defer c.handlerMu.Unlock()
268+
if handler, found := c.notificationHandlers[key]; found {
269+
klog.V(5).Infof("Notifying handlers for %q", key)
270+
close(handler)
271+
delete(c.notificationHandlers, key)
272+
}
217273
}
218274

219-
func (c *serviceAccountCache) setCM(name, namespace string, resp *CacheResponse) {
275+
func (c *serviceAccountCache) setCM(name, namespace string, entry *Entry) {
220276
c.mu.Lock()
221277
defer c.mu.Unlock()
222-
klog.V(5).Infof("Adding SA %s/%s to CM cache: %+v", namespace, name, resp)
223-
c.cmCache[namespace+"/"+name] = resp
278+
klog.V(5).Infof("Adding SA %s/%s to CM cache: %+v", namespace, name, entry)
279+
c.cmCache[namespace+"/"+name] = entry
224280
}
225281

226282
func New(defaultAudience, prefix string, defaultRegionalSTS bool, defaultTokenExpiration int64, saInformer coreinformers.ServiceAccountInformer, cmInformer coreinformers.ConfigMapInformer, composeRoleArn ComposeRoleArn) ServiceAccountCache {
@@ -233,15 +289,16 @@ func New(defaultAudience, prefix string, defaultRegionalSTS bool, defaultTokenEx
233289
}
234290

235291
c := &serviceAccountCache{
236-
saCache: map[string]*CacheResponse{},
237-
cmCache: map[string]*CacheResponse{},
292+
saCache: map[string]*Entry{},
293+
cmCache: map[string]*Entry{},
238294
defaultAudience: defaultAudience,
239295
annotationPrefix: prefix,
240296
defaultRegionalSTS: defaultRegionalSTS,
241297
composeRoleArn: composeRoleArn,
242298
defaultTokenExpiration: defaultTokenExpiration,
243299
hasSynced: hasSynced,
244300
webhookUsage: webhookUsage,
301+
notificationHandlers: map[string]chan struct{}{},
245302
}
246303

247304
saInformer.Informer().AddEventHandler(
@@ -298,22 +355,22 @@ func (c *serviceAccountCache) populateCacheFromCM(oldCM, newCM *v1.ConfigMap) er
298355
return nil
299356
}
300357
newConfig := newCM.Data["config"]
301-
sas := make(map[string]*CacheResponse)
358+
sas := make(map[string]*Entry)
302359
err := json.Unmarshal([]byte(newConfig), &sas)
303360
if err != nil {
304361
return fmt.Errorf("failed to unmarshal new config %q: %v", newConfig, err)
305362
}
306-
for key, resp := range sas {
363+
for key, entry := range sas {
307364
parts := strings.Split(key, "/")
308-
if resp.TokenExpiration == 0 {
309-
resp.TokenExpiration = c.defaultTokenExpiration
365+
if entry.TokenExpiration == 0 {
366+
entry.TokenExpiration = c.defaultTokenExpiration
310367
}
311-
c.setCM(parts[1], parts[0], resp)
368+
c.setCM(parts[1], parts[0], entry)
312369
}
313370

314371
if oldCM != nil {
315372
oldConfig := oldCM.Data["config"]
316-
oldCache := make(map[string]*CacheResponse)
373+
oldCache := make(map[string]*Entry)
317374
err := json.Unmarshal([]byte(oldConfig), &oldCache)
318375
if err != nil {
319376
return fmt.Errorf("failed to unmarshal old config %q: %v", oldConfig, err)

0 commit comments

Comments
 (0)