Skip to content

Commit a838261

Browse files
authored
Merge pull request #2 from ykulazhenkov/impl
Add initial implementation
2 parents fbf7cff + f43fa15 commit a838261

File tree

12 files changed

+801
-56
lines changed

12 files changed

+801
-56
lines changed

README.md

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,73 @@
11
# network-operator-init-container
22
Init container for NVIDIA Network Operator
3+
4+
The network-operator-init-container container has two required command line arguments:
5+
6+
- `--config` path to the configuration file
7+
- `--node-name` name of the k8s node on which this app runs
8+
9+
The configuration file should be in JSON format:
10+
11+
```
12+
{
13+
"safeDriverLoad": {
14+
"enable": true,
15+
"annotation": "some-annotation"
16+
}
17+
}
18+
```
19+
20+
- `safeDriverLoad` - contains settings related to safeDriverLoad feature
21+
- `safeDriverLoad.enable` - enable safeDriveLoad feature
22+
- `safeDriverLoad.annotation` - annotation to use for safeDriverLoad feature
23+
24+
25+
If `safeDriverLoad` feature is enabled then the network-operator-init-container container will set annotation
26+
provided in `safeDriverLoad.annotation` on the Kubernetes Node object identified by `--node-name`.
27+
The container exits with code 0 when the annotation is removed from the Node object.
28+
29+
If `safeDriverLoad` feature is disabled then the container will immediately exit with code 0.
30+
31+
```
32+
NVIDIA Network Operator init container
33+
34+
Usage:
35+
network-operator-init-container [flags]
36+
37+
Config flags:
38+
39+
--config string
40+
path to the configuration file
41+
--node-name string
42+
name of the k8s node on which this app runs
43+
44+
Logging flags:
45+
46+
--log-flush-frequency duration
47+
Maximum number of seconds between log flushes (default 5s)
48+
--log-json-info-buffer-size quantity
49+
[Alpha] In JSON format with split output streams, the info messages can be buffered for a while to increase performance. The default value of zero bytes disables buffering. The size can
50+
be specified as number of bytes (512), multiples of 1000 (1K), multiples of 1024 (2Ki), or powers of those (3M, 4G, 5Mi, 6Gi). Enable the LoggingAlphaOptions feature gate to use this.
51+
--log-json-split-stream
52+
[Alpha] In JSON format, write error messages to stderr and info messages to stdout. The default is to write a single stream to stdout. Enable the LoggingAlphaOptions feature gate to use
53+
this.
54+
--logging-format string
55+
Sets the log format. Permitted formats: "json" (gated by LoggingBetaOptions), "text". (default "text")
56+
-v, --v Level
57+
number for the log level verbosity
58+
--vmodule pattern=N,...
59+
comma-separated list of pattern=N settings for file-filtered logging (only works for text log format)
60+
61+
General flags:
62+
63+
-h, --help
64+
print help and exit
65+
--version
66+
print version and exit
67+
68+
Kubernetes flags:
69+
70+
--kubeconfig string
71+
Paths to a kubeconfig. Only required if out-of-cluster.
72+
73+
```

Taskfile.dist.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ vars:
2121
IMAGE_NAME_FULL: "{{.IMAGE_REGISTRY}}/{{.IMAGE_REPOSITORY}}:{{.IMAGE_TAG}}"
2222
# Coverage related vars
2323
COVER_PROFILE: "{{.PROJECT_DIR}}/network-operator-init-container.cover"
24+
# Test related vars
25+
ENVTEST_K8S_VERSION: 1.27.1
2426

2527
includes:
2628
version: ./taskfiles/Version.yaml
@@ -65,11 +67,14 @@ tasks:
6567

6668
test:
6769
desc: run unit tests
70+
deps:
71+
- install:setup-envtest
6872
vars:
6973
COVER_MODE: atomic
7074
GO_PKGS:
7175
sh: go list ./... | grep -v ".*/mocks"
7276
cmd: |
77+
export KUBEBUILDER_ASSETS=$({{.LOCAL_BIN}}/setup-envtest use {{.ENVTEST_K8S_VERSION}} -p path);
7378
go test -covermode={{.COVER_MODE}} -coverprofile={{.COVER_PROFILE}} {{.GO_PKGS | catLines}}
7479
7580
lint:

cmd/network-operator-init-container/app/app.go

Lines changed: 154 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,37 @@ package app
1616
import (
1717
"context"
1818
"fmt"
19+
"sync"
20+
"time"
1921

2022
"github.com/go-logr/logr"
2123
"github.com/spf13/cobra"
24+
corev1 "k8s.io/api/core/v1"
25+
apiErrors "k8s.io/apimachinery/pkg/api/errors"
26+
"k8s.io/apimachinery/pkg/fields"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/types"
29+
"k8s.io/client-go/rest"
2230
cliflag "k8s.io/component-base/cli/flag"
2331
"k8s.io/component-base/term"
2432
"k8s.io/klog/v2"
25-
33+
ctrl "sigs.k8s.io/controller-runtime"
34+
"sigs.k8s.io/controller-runtime/pkg/cache"
35+
"sigs.k8s.io/controller-runtime/pkg/client"
36+
"sigs.k8s.io/controller-runtime/pkg/log"
37+
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
2638
// register json format for logger
2739
_ "k8s.io/component-base/logs/json/register"
2840

29-
"github.com/Mellanox/network-operator-init-container/pkg/utils/signals"
30-
"github.com/Mellanox/network-operator-init-container/pkg/utils/version"
31-
3241
"github.com/Mellanox/network-operator-init-container/cmd/network-operator-init-container/app/options"
42+
configPgk "github.com/Mellanox/network-operator-init-container/pkg/config"
43+
"github.com/Mellanox/network-operator-init-container/pkg/utils/version"
3344
)
3445

3546
// NewNetworkOperatorInitContainerCommand creates a new command
3647
func NewNetworkOperatorInitContainerCommand() *cobra.Command {
3748
opts := options.New()
38-
ctx := signals.SetupShutdownSignals()
49+
ctx := ctrl.SetupSignalHandler()
3950

4051
cmd := &cobra.Command{
4152
Use: "network-operator-init-container",
@@ -46,9 +57,11 @@ func NewNetworkOperatorInitContainerCommand() *cobra.Command {
4657
if err := opts.Validate(); err != nil {
4758
return fmt.Errorf("invalid config: %w", err)
4859
}
49-
klog.EnableContextualLogging(true)
50-
51-
return RunNetworkOperatorInitContainer(klog.NewContext(ctx, klog.NewKlogr()), opts)
60+
conf, err := ctrl.GetConfig()
61+
if err != nil {
62+
return fmt.Errorf("failed to read config for k8s client: %v", err)
63+
}
64+
return RunNetworkOperatorInitContainer(logr.NewContext(ctx, klog.NewKlogr()), conf, opts)
5265
},
5366
Args: func(cmd *cobra.Command, args []string) error {
5467
for _, arg := range args {
@@ -76,9 +89,139 @@ func NewNetworkOperatorInitContainerCommand() *cobra.Command {
7689
}
7790

7891
// RunNetworkOperatorInitContainer runs init container main loop
79-
func RunNetworkOperatorInitContainer(ctx context.Context, opts *options.Options) error {
92+
func RunNetworkOperatorInitContainer(ctx context.Context, config *rest.Config, opts *options.Options) error {
8093
logger := logr.FromContextOrDiscard(ctx)
81-
logger.Info("start network-operator-init-container", "Options", opts)
94+
ctx, cFunc := context.WithCancel(ctx)
95+
defer cFunc()
96+
logger.Info("start network-operator-init-container",
97+
"Options", opts, "Version", version.GetVersionString())
98+
ctrl.SetLogger(logger)
99+
100+
initContCfg, err := configPgk.FromFile(opts.ConfigPath)
101+
if err != nil {
102+
logger.Error(err, "failed to read configuration")
103+
return err
104+
}
105+
logger.Info("network-operator-init-container configuration", "config", initContCfg.String())
106+
107+
if !initContCfg.SafeDriverLoad.Enable {
108+
logger.Info("safe driver loading is disabled, exit")
109+
return nil
110+
}
111+
112+
mgr, err := ctrl.NewManager(config, ctrl.Options{
113+
Metrics: metricsserver.Options{BindAddress: "0"},
114+
Cache: cache.Options{
115+
ByObject: map[client.Object]cache.ByObject{
116+
&corev1.Node{}: {Field: fields.ParseSelectorOrDie(
117+
fmt.Sprintf("metadata.name=%s", opts.NodeName))}}},
118+
})
119+
if err != nil {
120+
logger.Error(err, "unable to start manager")
121+
return err
122+
}
123+
124+
k8sClient, err := client.New(config,
125+
client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
126+
if err != nil {
127+
logger.Error(err, "failed to create k8sClient client")
128+
return err
129+
}
130+
131+
errCh := make(chan error, 1)
132+
133+
if err = (&NodeReconciler{
134+
ErrCh: errCh,
135+
SafeLoadAnnotation: initContCfg.SafeDriverLoad.Annotation,
136+
Client: mgr.GetClient(),
137+
Scheme: mgr.GetScheme(),
138+
}).SetupWithManager(mgr); err != nil {
139+
logger.Error(err, "unable to create controller", "controller", "Node")
140+
return err
141+
}
142+
143+
node := &corev1.Node{}
144+
err = k8sClient.Get(ctx, types.NamespacedName{Name: opts.NodeName}, node)
145+
if err != nil {
146+
logger.Error(err, "failed to read node object from the API", "node", opts.NodeName)
147+
return err
148+
}
149+
err = k8sClient.Patch(ctx, node, client.RawPatch(
150+
types.MergePatchType, []byte(
151+
fmt.Sprintf(`{"metadata":{"annotations":{%q: %q}}}`,
152+
initContCfg.SafeDriverLoad.Annotation, "true"))))
153+
if err != nil {
154+
logger.Error(err, "unable to set annotation for node", "node", opts.NodeName)
155+
return err
156+
}
157+
158+
logger.Info("wait for annotation to be removed",
159+
"annotation", initContCfg.SafeDriverLoad.Annotation, "node", opts.NodeName)
160+
161+
wg := sync.WaitGroup{}
162+
wg.Add(1)
163+
go func() {
164+
defer wg.Done()
165+
if err := mgr.Start(ctx); err != nil {
166+
logger.Error(err, "problem running manager")
167+
writeCh(errCh, err)
168+
}
169+
}()
170+
defer wg.Wait()
171+
select {
172+
case <-ctx.Done():
173+
return fmt.Errorf("waiting canceled")
174+
case err = <-errCh:
175+
cFunc()
176+
return err
177+
}
178+
}
179+
180+
// NodeReconciler reconciles Node object
181+
type NodeReconciler struct {
182+
ErrCh chan error
183+
SafeLoadAnnotation string
184+
client.Client
185+
Scheme *runtime.Scheme
186+
}
187+
188+
// Reconcile contains logic to sync Node object
189+
func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
190+
reqLog := log.FromContext(ctx).WithValues("annotation", r.SafeLoadAnnotation)
191+
192+
node := &corev1.Node{}
193+
err := r.Client.Get(ctx, req.NamespacedName, node)
194+
if err != nil {
195+
if apiErrors.IsNotFound(err) {
196+
reqLog.Info("Node object not found, exit")
197+
writeCh(r.ErrCh, err)
198+
return ctrl.Result{}, err
199+
}
200+
reqLog.Error(err, "failed to get Node object from the cache")
201+
writeCh(r.ErrCh, err)
202+
return ctrl.Result{}, err
203+
}
204+
205+
if node.GetAnnotations()[r.SafeLoadAnnotation] == "" {
206+
reqLog.Info("annotation removed, unblock loading")
207+
writeCh(r.ErrCh, nil)
208+
return ctrl.Result{}, nil
209+
}
210+
reqLog.Info("annotation still present, waiting")
211+
212+
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
213+
}
214+
215+
func writeCh(ch chan error, err error) {
216+
select {
217+
case ch <- err:
218+
default:
219+
}
220+
}
82221

83-
return nil
222+
// SetupWithManager sets up the controller with the Manager.
223+
func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
224+
return ctrl.NewControllerManagedBy(mgr).
225+
For(&corev1.Node{}).
226+
Complete(r)
84227
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
Copyright 2023, NVIDIA CORPORATION & AFFILIATES
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package app_test
15+
16+
import (
17+
"context"
18+
"testing"
19+
20+
. "github.com/onsi/ginkgo/v2"
21+
. "github.com/onsi/gomega"
22+
"k8s.io/client-go/kubernetes/scheme"
23+
"k8s.io/client-go/rest"
24+
"sigs.k8s.io/controller-runtime/pkg/client"
25+
"sigs.k8s.io/controller-runtime/pkg/envtest"
26+
)
27+
28+
var (
29+
cfg *rest.Config
30+
k8sClient client.Client
31+
testEnv *envtest.Environment
32+
cFunc context.CancelFunc
33+
ctx context.Context
34+
)
35+
36+
func TestApp(t *testing.T) {
37+
RegisterFailHandler(Fail)
38+
RunSpecs(t, "Network Operator Init Container Suite")
39+
}
40+
41+
var _ = BeforeSuite(func() {
42+
By("bootstrapping test environment")
43+
testEnv = &envtest.Environment{}
44+
ctx, cFunc = context.WithCancel(context.Background())
45+
46+
var err error
47+
// cfg is defined in this file globally.
48+
cfg, err = testEnv.Start()
49+
Expect(err).NotTo(HaveOccurred())
50+
Expect(cfg).NotTo(BeNil())
51+
52+
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
53+
Expect(err).NotTo(HaveOccurred())
54+
Expect(k8sClient).NotTo(BeNil())
55+
56+
createNode(testNodeName)
57+
})
58+
59+
var _ = AfterSuite(func() {
60+
cFunc()
61+
By("tearing down the test environment")
62+
err := testEnv.Stop()
63+
Expect(err).NotTo(HaveOccurred())
64+
})

0 commit comments

Comments
 (0)