Skip to content

Commit 9aecd67

Browse files
committed
public release
1 parent e014a47 commit 9aecd67

File tree

16 files changed

+2282
-1
lines changed

16 files changed

+2282
-1
lines changed

Makefile

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
build:
2+
govendor build +local
3+
4+
install-libs:
5+
govendor install +vendor,^program
6+
7+
example:
8+
go build -o _example/example ./_example
9+
10+
.PHONY: build install-libs example

README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# kcache: kubernetes object cache
2+
3+
Kcache is a replacement for [k8s.io/client-go/tools/cache](https://github.com/kubernetes/client-go/tree/master/tools/cache).
4+
5+
See example usage [here](_example/main.go)
6+
7+
## Key differences
8+
9+
* Uses goprocs and channels instead of mutexes and condition variables (no need to poll when ready)
10+
* Allows multiple subscribers
11+
* Does not emit "add" events on every resync
12+
* Does not (currently) support indexes.
13+
* Kcache does not handle connection/timeout errors well
14+
15+
## Status
16+
17+
WIP; not ready for production.
18+
19+
### TODO
20+
21+
* Tests
22+
* Indexes (will be subscribers) and filters
23+
* Documentation
24+

_example/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/example

_example/main.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"os/signal"
8+
"syscall"
9+
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/fields"
12+
"k8s.io/client-go/kubernetes"
13+
"k8s.io/client-go/pkg/api/v1"
14+
"k8s.io/client-go/rest"
15+
"k8s.io/client-go/tools/clientcmd"
16+
17+
logutil "github.com/boz/go-logutil"
18+
"github.com/boz/kcache"
19+
"github.com/boz/kcache/client"
20+
21+
lr "github.com/boz/go-logutil/logrus"
22+
"github.com/sirupsen/logrus"
23+
)
24+
25+
func main() {
26+
logger := logrus.New()
27+
logger.Level = logrus.DebugLevel
28+
29+
log := lr.New(logger)
30+
ctx := context.Background()
31+
32+
rclient := getRESTClient(log)
33+
34+
client := client.ForResource(rclient, "pods", metav1.NamespaceAll, fields.Everything())
35+
36+
controller, err := kcache.NewController(ctx, log, client)
37+
if err != nil {
38+
log.ErrFatal(err, "kcache.NewController()")
39+
}
40+
41+
go watchSignals(log, controller)
42+
43+
defer controller.Stop()
44+
45+
subscription := controller.Subscribe()
46+
47+
<-subscription.Ready()
48+
49+
list, err := subscription.Cache().List()
50+
if err != nil {
51+
log.ErrFatal(err, "Cache().List()")
52+
}
53+
54+
for _, pod := range list {
55+
if pod, ok := pod.(*v1.Pod); ok {
56+
fmt.Printf("%v/%v: %v\n", pod.GetNamespace(), pod.GetName(), pod.GetResourceVersion())
57+
} else {
58+
log.Infof("invalid type: %T", pod)
59+
}
60+
}
61+
62+
for {
63+
select {
64+
case ev, ok := <-subscription.Events():
65+
if !ok {
66+
return
67+
}
68+
obj := ev.Resource()
69+
fmt.Printf("event: %v: %v/%v[%v]\n", ev.Type(), obj.GetNamespace(), obj.GetName(), obj.GetResourceVersion())
70+
71+
cobj, err := subscription.Cache().GetObject(obj)
72+
if err != nil {
73+
log.ErrWarn(err, "GetObject()")
74+
continue
75+
}
76+
77+
cnobj, err := subscription.Cache().Get(obj.GetNamespace(), obj.GetName())
78+
if err != nil {
79+
log.ErrWarn(err, "Get()")
80+
continue
81+
}
82+
83+
if ev.Type() == kcache.EventTypeDelete {
84+
if cobj != nil {
85+
log.Warnf("GetObject(deleted) != nil")
86+
}
87+
if cnobj != nil {
88+
log.Warnf("Get(deleted) != nil")
89+
}
90+
continue
91+
}
92+
93+
if cobj == nil {
94+
log.Warnf("GetObject() -> nil")
95+
continue
96+
}
97+
if cnobj == nil {
98+
log.Warnf("Get() -> nil")
99+
continue
100+
}
101+
102+
fmt.Printf("GetObject: %v/%v[%v]\n", cobj.GetNamespace(), cobj.GetName(), cobj.GetResourceVersion())
103+
fmt.Printf("Get: %v/%v[%v]\n", cnobj.GetNamespace(), cnobj.GetName(), cnobj.GetResourceVersion())
104+
}
105+
}
106+
}
107+
108+
func getRESTClient(log logutil.Log) rest.Interface {
109+
kconfig, err := getKubeRESTConfig()
110+
if err != nil {
111+
log.ErrFatal(err, "can't get kube client")
112+
}
113+
114+
clientset, err := kubernetes.NewForConfig(kconfig)
115+
if err != nil {
116+
log.ErrFatal(err, "can't get clientset")
117+
}
118+
119+
client := clientset.Core().RESTClient()
120+
return client
121+
}
122+
123+
func getKubeRESTConfig() (*rest.Config, error) {
124+
/*
125+
config, err := rest.InClusterConfig()
126+
if err == nil {
127+
return config, err
128+
}
129+
*/
130+
131+
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
132+
clientcmd.NewDefaultClientConfigLoadingRules(),
133+
&clientcmd.ConfigOverrides{},
134+
).ClientConfig()
135+
}
136+
137+
func watchSignals(log logutil.Log, controller kcache.Controller) {
138+
sigch := make(chan os.Signal, 1)
139+
signal.Notify(sigch, syscall.SIGINT, syscall.SIGQUIT)
140+
141+
select {
142+
case <-controller.Done():
143+
case <-sigch:
144+
controller.Stop()
145+
}
146+
}

_example/pod.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
apiVersion: v1
3+
kind: Pod
4+
metadata:
5+
name: sleeper
6+
spec:
7+
containers:
8+
- name: busybox
9+
image: busybox
10+
args:
11+
- sleep
12+
- "10000"

builder.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package kcache
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
lifecycle "github.com/boz/go-lifecycle"
9+
logutil "github.com/boz/go-logutil"
10+
"github.com/boz/kcache/client"
11+
)
12+
13+
type Builder interface {
14+
Context(context.Context) Builder
15+
Log(logutil.Log) Builder
16+
17+
Client(client.Client) Builder
18+
Lister() ListerBuilder
19+
Watcher() WatcherBuilder
20+
21+
Create() (Controller, error)
22+
}
23+
24+
type ListerBuilder interface {
25+
RefreshPeriod(time.Duration) ListerBuilder
26+
Client(client.ListClient) ListerBuilder
27+
}
28+
29+
type WatcherBuilder interface {
30+
Client(client.WatchClient) WatcherBuilder
31+
}
32+
33+
func NewBuilder() Builder {
34+
return &builder{
35+
ctx: context.Background(),
36+
lb: newListerBuilder(),
37+
wb: newWatcherBuilder(),
38+
}
39+
}
40+
41+
type builder struct {
42+
client client.Client
43+
log logutil.Log
44+
ctx context.Context
45+
46+
lb *listerBuilder
47+
wb *watcherBuilder
48+
}
49+
50+
func (b *builder) Context(ctx context.Context) Builder {
51+
b.ctx = ctx
52+
return b
53+
}
54+
55+
func (b *builder) Log(log logutil.Log) Builder {
56+
b.log = log
57+
return b
58+
}
59+
60+
func (b *builder) Client(client client.Client) Builder {
61+
b.lb.Client(client)
62+
b.wb.Client(client)
63+
return b
64+
}
65+
66+
func (b *builder) Lister() ListerBuilder {
67+
return b.lb
68+
}
69+
70+
func (b *builder) Watcher() WatcherBuilder {
71+
return b.wb
72+
}
73+
74+
func (b *builder) Create() (Controller, error) {
75+
if b.log == nil {
76+
return nil, fmt.Errorf("kcache builder: log required")
77+
}
78+
79+
if b.client == nil {
80+
return nil, fmt.Errorf("kcache builder: client required")
81+
}
82+
83+
log := b.log.WithComponent("controller")
84+
ctx := b.ctx
85+
86+
lc := lifecycle.New()
87+
88+
c := &controller{
89+
readych: make(chan struct{}),
90+
lister: newLister(ctx, log, lc.ShuttingDown(), b.lb.period, b.lb.client),
91+
watcher: newWatcher(ctx, log, lc.ShuttingDown(), b.wb.client),
92+
cache: newCache(ctx, log, lc.ShuttingDown(), b.client),
93+
94+
subscribech: make(chan chan<- Subscription),
95+
unsubscribech: make(chan subscription),
96+
subscriptions: make(map[subscription]struct{}),
97+
98+
log: log,
99+
lc: lc,
100+
ctx: ctx,
101+
}
102+
103+
go c.lc.WatchContext(c.ctx)
104+
105+
go c.run()
106+
107+
return c, nil
108+
}
109+
110+
type listerBuilder struct {
111+
client client.ListClient
112+
period time.Duration
113+
}
114+
115+
func newListerBuilder() *listerBuilder {
116+
return &listerBuilder{period: defaultRefreshPeriod}
117+
}
118+
119+
func (b *listerBuilder) RefreshPeriod(period time.Duration) ListerBuilder {
120+
b.period = period
121+
return b
122+
}
123+
124+
func (b *listerBuilder) Client(client client.ListClient) ListerBuilder {
125+
b.client = client
126+
return b
127+
}
128+
129+
type watcherBuilder struct {
130+
client client.WatchClient
131+
}
132+
133+
func newWatcherBuilder() *watcherBuilder {
134+
return &watcherBuilder{}
135+
}
136+
137+
func (b *watcherBuilder) Client(client client.WatchClient) WatcherBuilder {
138+
b.client = client
139+
return b
140+
}

0 commit comments

Comments
 (0)