Skip to content

Commit e922a85

Browse files
committed
fetch SAs from apiserver
1 parent feac6cc commit e922a85

File tree

4 files changed

+203
-29
lines changed

4 files changed

+203
-29
lines changed

main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ func main() {
181181
saInformer,
182182
cmInformer,
183183
composeRoleArnCache,
184+
clientset.CoreV1(),
184185
)
185186
stop := make(chan struct{})
186187
informerFactory.Start(stop)

pkg/cache/cache.go

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,23 @@
1616
package cache
1717

1818
import (
19+
"context"
1920
"encoding/json"
2021
"fmt"
2122
"regexp"
2223
"strconv"
2324
"strings"
2425
"sync"
26+
"time"
2527

2628
"github.com/aws/amazon-eks-pod-identity-webhook/pkg"
2729
"github.com/prometheus/client_golang/prometheus"
2830
v1 "k8s.io/api/core/v1"
31+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2932
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3033
coreinformers "k8s.io/client-go/informers/core/v1"
3134
"k8s.io/client-go/kubernetes"
35+
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
3236
"k8s.io/client-go/tools/cache"
3337
"k8s.io/klog/v2"
3438
)
@@ -80,8 +84,7 @@ type serviceAccountCache struct {
8084
composeRoleArn ComposeRoleArn
8185
defaultTokenExpiration int64
8286
webhookUsage prometheus.Gauge
83-
notificationHandlers map[string]chan struct{}
84-
handlerMu sync.Mutex
87+
notifications *notifications
8588
}
8689

8790
type ComposeRoleArn struct {
@@ -156,20 +159,13 @@ func (c *serviceAccountCache) GetCommonConfigurations(name, namespace string) (u
156159
return false, pkg.DefaultTokenExpiration
157160
}
158161

159-
func (c *serviceAccountCache) getSA(req Request) (*Entry, chan struct{}) {
162+
func (c *serviceAccountCache) getSA(req Request) (*Entry, <-chan struct{}) {
160163
c.mu.RLock()
161164
defer c.mu.RUnlock()
162165
entry, ok := c.saCache[req.CacheKey()]
163166
if !ok && req.RequestNotification {
164167
klog.V(5).Infof("Service Account %s not found in cache, adding notification handler", req.CacheKey())
165-
c.handlerMu.Lock()
166-
defer c.handlerMu.Unlock()
167-
notifier, found := c.notificationHandlers[req.CacheKey()]
168-
if !found {
169-
notifier = make(chan struct{})
170-
c.notificationHandlers[req.CacheKey()] = notifier
171-
}
172-
return nil, notifier
168+
return nil, c.notifications.create(req)
173169
}
174170
return entry, nil
175171
}
@@ -264,13 +260,7 @@ func (c *serviceAccountCache) setSA(name, namespace string, entry *Entry) {
264260
klog.V(5).Infof("Adding SA %q to SA cache: %+v", key, entry)
265261
c.saCache[key] = entry
266262

267-
c.handlerMu.Lock()
268-
defer c.handlerMu.Unlock()
269-
if handler, found := c.notificationHandlers[key]; found {
270-
klog.V(5).Infof("Notifying handlers for %q", key)
271-
close(handler)
272-
delete(c.notificationHandlers, key)
273-
}
263+
c.notifications.broadcast(key)
274264
}
275265

276266
func (c *serviceAccountCache) setCM(name, namespace string, entry *Entry) {
@@ -280,7 +270,15 @@ func (c *serviceAccountCache) setCM(name, namespace string, entry *Entry) {
280270
c.cmCache[namespace+"/"+name] = entry
281271
}
282272

283-
func New(defaultAudience, prefix string, defaultRegionalSTS bool, defaultTokenExpiration int64, saInformer coreinformers.ServiceAccountInformer, cmInformer coreinformers.ConfigMapInformer, composeRoleArn ComposeRoleArn) ServiceAccountCache {
273+
func New(defaultAudience,
274+
prefix string,
275+
defaultRegionalSTS bool,
276+
defaultTokenExpiration int64,
277+
saInformer coreinformers.ServiceAccountInformer,
278+
cmInformer coreinformers.ConfigMapInformer,
279+
composeRoleArn ComposeRoleArn,
280+
SAGetter corev1.ServiceAccountsGetter,
281+
) ServiceAccountCache {
284282
hasSynced := func() bool {
285283
if cmInformer != nil {
286284
return saInformer.Informer().HasSynced() && cmInformer.Informer().HasSynced()
@@ -289,6 +287,8 @@ func New(defaultAudience, prefix string, defaultRegionalSTS bool, defaultTokenEx
289287
}
290288
}
291289

290+
// Rate limit to 10 concurrent requests against the API server.
291+
saFetchRequests := make(chan *Request, 10)
292292
c := &serviceAccountCache{
293293
saCache: map[string]*Entry{},
294294
cmCache: map[string]*Entry{},
@@ -299,9 +299,20 @@ func New(defaultAudience, prefix string, defaultRegionalSTS bool, defaultTokenEx
299299
defaultTokenExpiration: defaultTokenExpiration,
300300
hasSynced: hasSynced,
301301
webhookUsage: webhookUsage,
302-
notificationHandlers: map[string]chan struct{}{},
302+
notifications: newNotifications(saFetchRequests),
303303
}
304304

305+
go func() {
306+
for req := range saFetchRequests {
307+
sa, err := fetchFromAPI(SAGetter, req)
308+
if err != nil {
309+
klog.Errorf("fetching SA: %s, but got error from API: %v", req.CacheKey(), err)
310+
continue
311+
}
312+
c.addSA(sa)
313+
}
314+
}()
315+
305316
saInformer.Informer().AddEventHandler(
306317
cache.ResourceEventHandlerFuncs{
307318
AddFunc: func(obj interface{}) {
@@ -351,6 +362,29 @@ func New(defaultAudience, prefix string, defaultRegionalSTS bool, defaultTokenEx
351362
return c
352363
}
353364

365+
func fetchFromAPI(getter corev1.ServiceAccountsGetter, req *Request) (*v1.ServiceAccount, error) {
366+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
367+
defer cancel()
368+
369+
klog.V(5).Infof("fetching SA: %s", req.CacheKey())
370+
saList, err := getter.ServiceAccounts(req.Namespace).List(
371+
ctx,
372+
metav1.ListOptions{},
373+
)
374+
if err != nil {
375+
return nil, err
376+
}
377+
378+
// Find the ServiceAccount
379+
for _, sa := range saList.Items {
380+
if sa.Name == req.Name {
381+
return &sa, nil
382+
383+
}
384+
}
385+
return nil, fmt.Errorf("no SA found in namespace: %s", req.CacheKey())
386+
}
387+
354388
func (c *serviceAccountCache) populateCacheFromCM(oldCM, newCM *v1.ConfigMap) error {
355389
if newCM.Name != "pod-identity-webhook" {
356390
return nil

pkg/cache/cache_test.go

Lines changed: 88 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ func TestSaCache(t *testing.T) {
3535
defaultAudience: "sts.amazonaws.com",
3636
annotationPrefix: "eks.amazonaws.com",
3737
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
38+
notifications: newNotifications(make(chan *Request, 10)),
3839
}
3940

4041
resp := cache.Get(Request{Name: "default", Namespace: "default"})
@@ -69,9 +70,9 @@ func TestNotification(t *testing.T) {
6970

7071
t.Run("with one notification handler", func(t *testing.T) {
7172
cache := &serviceAccountCache{
72-
saCache: map[string]*Entry{},
73-
notificationHandlers: map[string]chan struct{}{},
74-
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
73+
saCache: map[string]*Entry{},
74+
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
75+
notifications: newNotifications(make(chan *Request, 10)),
7576
}
7677

7778
// test that the requested SA is not in the cache
@@ -106,9 +107,9 @@ func TestNotification(t *testing.T) {
106107

107108
t.Run("with 10 notification handlers", func(t *testing.T) {
108109
cache := &serviceAccountCache{
109-
saCache: map[string]*Entry{},
110-
notificationHandlers: map[string]chan struct{}{},
111-
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
110+
saCache: map[string]*Entry{},
111+
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
112+
notifications: newNotifications(make(chan *Request, 5)),
112113
}
113114

114115
// test that the requested SA is not in the cache
@@ -153,6 +154,63 @@ func TestNotification(t *testing.T) {
153154
})
154155
}
155156

157+
func TestFetchFromAPIServer(t *testing.T) {
158+
testSA := &v1.ServiceAccount{
159+
ObjectMeta: metav1.ObjectMeta{
160+
Name: "default",
161+
Namespace: "default",
162+
Annotations: map[string]string{
163+
"eks.amazonaws.com/role-arn": "arn:aws:iam::111122223333:role/s3-reader",
164+
"eks.amazonaws.com/token-expiration": "3600",
165+
},
166+
},
167+
}
168+
fakeSAClient := fake.NewSimpleClientset(testSA)
169+
170+
// use an empty informer to simulate the need to fetch SA from api server:
171+
fakeEmptyClient := fake.NewSimpleClientset()
172+
emptyInformerFactory := informers.NewSharedInformerFactory(fakeEmptyClient, 0)
173+
emptyInformer := emptyInformerFactory.Core().V1().ServiceAccounts()
174+
175+
cache := New(
176+
"sts.amazonaws.com",
177+
"eks.amazonaws.com",
178+
true,
179+
86400,
180+
emptyInformer,
181+
nil,
182+
ComposeRoleArn{},
183+
fakeSAClient.CoreV1(),
184+
)
185+
186+
stop := make(chan struct{})
187+
emptyInformerFactory.Start(stop)
188+
emptyInformerFactory.WaitForCacheSync(stop)
189+
cache.Start(stop)
190+
defer close(stop)
191+
192+
err := wait.ExponentialBackoff(wait.Backoff{Duration: 10 * time.Millisecond, Factor: 1.0, Steps: 3}, func() (bool, error) {
193+
return len(fakeEmptyClient.Actions()) != 0, nil
194+
})
195+
if err != nil {
196+
t.Fatalf("informer never called client: %v", err)
197+
}
198+
199+
resp := cache.Get(Request{Name: "default", Namespace: "default", RequestNotification: true})
200+
assert.False(t, resp.FoundInCache, "Expected cache entry to not be found")
201+
202+
// wait for the notification while we fetch the SA from the API server:
203+
select {
204+
case <-resp.Notifier:
205+
// expected
206+
// test that the requested SA is now in the cache
207+
resp := cache.Get(Request{Name: "default", Namespace: "default", RequestNotification: false})
208+
assert.True(t, resp.FoundInCache, "Expected cache entry to be found in cache")
209+
case <-time.After(1 * time.Second):
210+
t.Fatal("timeout waiting for notification")
211+
}
212+
}
213+
156214
func TestNonRegionalSTS(t *testing.T) {
157215
trueStr := "true"
158216
falseStr := "false"
@@ -237,7 +295,16 @@ func TestNonRegionalSTS(t *testing.T) {
237295

238296
testComposeRoleArn := ComposeRoleArn{}
239297

240-
cache := New(audience, "eks.amazonaws.com", tc.defaultRegionalSTS, 86400, informer, nil, testComposeRoleArn)
298+
cache := New(
299+
audience,
300+
"eks.amazonaws.com",
301+
tc.defaultRegionalSTS,
302+
86400,
303+
informer,
304+
nil,
305+
testComposeRoleArn,
306+
fakeClient.CoreV1(),
307+
)
241308
stop := make(chan struct{})
242309
informerFactory.Start(stop)
243310
informerFactory.WaitForCacheSync(stop)
@@ -295,7 +362,8 @@ func TestPopulateCacheFromCM(t *testing.T) {
295362
}
296363

297364
c := serviceAccountCache{
298-
cmCache: make(map[string]*Entry),
365+
cmCache: make(map[string]*Entry),
366+
notifications: newNotifications(make(chan *Request, 10)),
299367
}
300368

301369
{
@@ -353,6 +421,7 @@ func TestSAAnnotationRemoval(t *testing.T) {
353421
saCache: make(map[string]*Entry),
354422
annotationPrefix: "eks.amazonaws.com",
355423
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
424+
notifications: newNotifications(make(chan *Request, 10)),
356425
}
357426

358427
c.addSA(oldSA)
@@ -416,6 +485,7 @@ func TestCachePrecedence(t *testing.T) {
416485
defaultTokenExpiration: pkg.DefaultTokenExpiration,
417486
annotationPrefix: "eks.amazonaws.com",
418487
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
488+
notifications: newNotifications(make(chan *Request, 10)),
419489
}
420490

421491
{
@@ -514,7 +584,15 @@ func TestRoleArnComposition(t *testing.T) {
514584
informerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
515585
informer := informerFactory.Core().V1().ServiceAccounts()
516586

517-
cache := New(audience, "eks.amazonaws.com", true, 86400, informer, nil, testComposeRoleArn)
587+
cache := New(audience,
588+
"eks.amazonaws.com",
589+
true,
590+
86400,
591+
informer,
592+
nil,
593+
testComposeRoleArn,
594+
fakeClient.CoreV1(),
595+
)
518596
stop := make(chan struct{})
519597
informerFactory.Start(stop)
520598
informerFactory.WaitForCacheSync(stop)
@@ -613,6 +691,7 @@ func TestGetCommonConfigurations(t *testing.T) {
613691
defaultAudience: "sts.amazonaws.com",
614692
annotationPrefix: "eks.amazonaws.com",
615693
webhookUsage: prometheus.NewGauge(prometheus.GaugeOpts{}),
694+
notifications: newNotifications(make(chan *Request, 10)),
616695
}
617696

618697
if tc.serviceAccount != nil {

pkg/cache/notifications.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package cache
2+
3+
import (
4+
"sync"
5+
6+
"k8s.io/klog/v2"
7+
8+
"github.com/prometheus/client_golang/prometheus"
9+
)
10+
11+
var notificationUsage = prometheus.NewCounterVec(
12+
prometheus.CounterOpts{
13+
Name: "pod_identity_cache_notifications",
14+
Help: "Counter of SA notifications",
15+
},
16+
[]string{"method"},
17+
)
18+
19+
func init() {
20+
prometheus.MustRegister(notificationUsage)
21+
}
22+
23+
type notifications struct {
24+
handlers map[string]chan struct{}
25+
mu sync.Mutex
26+
fetchRequests chan<- *Request
27+
}
28+
29+
func newNotifications(saFetchRequests chan<- *Request) *notifications {
30+
return &notifications{
31+
handlers: map[string]chan struct{}{},
32+
fetchRequests: saFetchRequests,
33+
}
34+
}
35+
36+
func (n *notifications) create(req Request) <-chan struct{} {
37+
n.mu.Lock()
38+
defer n.mu.Unlock()
39+
40+
notificationUsage.WithLabelValues("used").Inc()
41+
notifier, found := n.handlers[req.CacheKey()]
42+
if !found {
43+
notifier = make(chan struct{})
44+
n.handlers[req.CacheKey()] = notifier
45+
notificationUsage.WithLabelValues("created").Inc()
46+
n.fetchRequests <- &req
47+
}
48+
return notifier
49+
}
50+
51+
func (n *notifications) broadcast(key string) {
52+
n.mu.Lock()
53+
defer n.mu.Unlock()
54+
if handler, found := n.handlers[key]; found {
55+
klog.V(5).Infof("Notifying handlers for %q", key)
56+
notificationUsage.WithLabelValues("broadcast").Inc()
57+
close(handler)
58+
delete(n.handlers, key)
59+
}
60+
}

0 commit comments

Comments
 (0)