Skip to content

Commit f343359

Browse files
CloudWatch Agent integration for nvidia-training (#649)
* Added CW Agent for nvidia-training * Scrape interval changes + CW Agent cleanup * Update cloudwatch-agent.yaml * cw agent code changes * cw agent suggested changes * Fixing nits, modifying pod name, changing IRSA * nit. formatting * seperate cw manifest template function * log nit. fix * new flag system * fmt * Removed Region and added templating * Convert to gpflag update flag format * clean up for gpflag * e2e flag issue * removed manifest complexity * moving around ft. fixes * Add MetricOps struct and reflection-based flag parsing * Moved flags.go back + deploy cwagent based on metricDimensions flag * Removing flag.go file * anonymous function for optional daemonsets
1 parent bb3d7a9 commit f343359

File tree

9 files changed

+257
-51
lines changed

9 files changed

+257
-51
lines changed

test/cases/nvidia-training/bert_training_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@ var (
2828
)
2929

3030
func TestBertTraining(t *testing.T) {
31-
if *bertTrainingImage == "" {
31+
if testConfig.BertTrainingImage == "" {
3232
t.Fatal(fmt.Errorf("bertTrainingImage must be set to run the test"))
3333
}
3434

3535
slotsPerWorker := gpuPerNode
3636
workerReplicas := nodeCount
3737
np := slotsPerWorker * workerReplicas
3838
efaRequested := 0
39-
if *efaEnabled && efaPerNode > 0 {
39+
if testConfig.EfaEnabled && efaPerNode > 0 {
4040
efaRequested = 1
4141
}
4242

@@ -45,7 +45,7 @@ func TestBertTraining(t *testing.T) {
4545
WithLabel("hardware", "gpu").
4646
Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
4747
renderVars := map[string]string{
48-
"BertTrainingImage": *bertTrainingImage,
48+
"BertTrainingImage": testConfig.BertTrainingImage,
4949
"SlotsPerWorker": fmt.Sprintf("%d", slotsPerWorker),
5050
"NP": fmt.Sprintf("%d", np),
5151
"WorkerReplicas": fmt.Sprintf("%d", workerReplicas),

test/cases/nvidia-training/main_test.go

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
_ "embed"
88
"fmt"
9+
"github.com/aws/aws-k8s-tester/test/common"
910
"log"
1011
"os"
1112
"os/signal"
@@ -26,6 +27,10 @@ import (
2627
)
2728

2829
func TestMain(m *testing.M) {
30+
_, err := common.ParseFlags(&testConfig)
31+
if err != nil {
32+
log.Fatalf("failed to parse flags: %v", err)
33+
}
2934
cfg, err := envconf.NewFromFlags()
3035
if err != nil {
3136
log.Fatalf("failed to initialize test environment: %v", err)
@@ -35,22 +40,32 @@ func TestMain(m *testing.M) {
3540
defer cancel()
3641
testenv = env.NewWithConfig(cfg).WithContext(ctx)
3742

38-
manifests := [][]byte{
43+
// Render CloudWatch Agent manifest with dynamic dimensions
44+
renderedCloudWatchAgentManifest, err := manifests.RenderCloudWatchAgentManifest(testConfig.MetricDimensions)
45+
if err != nil {
46+
log.Printf("Warning: failed to render CloudWatch Agent manifest: %v", err)
47+
}
48+
49+
manifestsList := [][]byte{
3950
manifests.NvidiaDevicePluginManifest,
4051
manifests.MpiOperatorManifest,
4152
manifests.EfaDevicePluginManifest,
42-
manifests.DCGMExporterManifest,
53+
}
54+
55+
if len(testConfig.MetricDimensions) > 0 {
56+
manifestsList = append(manifestsList, manifests.DCGMExporterManifest, renderedCloudWatchAgentManifest)
4357
}
4458

4559
testenv.Setup(
4660
// Apply all manifests
4761
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
48-
log.Println("Applying NVIDIA device plugin, MPI operator, EFA device plugin and DCGM Exporter manifests.")
49-
err := fwext.ApplyManifests(config.Client().RESTConfig(), manifests...)
62+
log.Println("Applying manifests.")
63+
64+
err := fwext.ApplyManifests(config.Client().RESTConfig(), manifestsList...)
5065
if err != nil {
5166
return ctx, fmt.Errorf("failed to apply manifests: %w", err)
5267
}
53-
log.Println("Successfully applied NVIDIA device plugin, MPI operator, EFA device plugin and DCGM Exporter manifests.")
68+
log.Println("Successfully applied manifests.")
5469
return ctx, nil
5570
},
5671

@@ -73,23 +88,34 @@ func TestMain(m *testing.M) {
7388
return ctx, nil
7489
},
7590

76-
// Wait for DaemonSets using helper
77-
deployDaemonSet("nvidia-device-plugin-daemonset", "kube-system"),
78-
deployDaemonSet("aws-efa-k8s-device-plugin-daemonset", "kube-system"),
79-
deployDaemonSet("dcgm-exporter", "kube-system"),
91+
// Wait for required DaemonSets
92+
common.DeployDaemonSet("nvidia-device-plugin-daemonset", "kube-system"),
93+
common.DeployDaemonSet("aws-efa-k8s-device-plugin-daemonset", "kube-system"),
94+
95+
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
96+
if len(testConfig.MetricDimensions) > 0 {
97+
if ctx, err := common.DeployDaemonSet("dcgm-exporter", "kube-system")(ctx, config); err != nil {
98+
return ctx, err
99+
}
100+
if ctx, err := common.DeployDaemonSet("cwagent", "amazon-cloudwatch")(ctx, config); err != nil {
101+
return ctx, err
102+
}
103+
}
104+
return ctx, nil
105+
}, // Deploy CloudWatch Agent + DCGM only if MetricDimensions are set
80106

81107
checkNodeTypes, // Dynamically check node types and capacities after device plugins are ready
82108
)
83109

84110
testenv.Finish(
85111
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
86-
log.Println("Deleting NVIDIA device plugin, MPI operator, EFA device plugin and DCGM Exporter manifests.")
87-
slices.Reverse(manifests)
88-
err := fwext.DeleteManifests(config.Client().RESTConfig(), manifests...)
112+
log.Println("Deleting NVIDIA device plugin, MPI operator, EFA device plugin DCGM Exporter and CloudWatch Agent manifests.")
113+
slices.Reverse(manifestsList)
114+
err := fwext.DeleteManifests(config.Client().RESTConfig(), manifestsList...)
89115
if err != nil {
90116
return ctx, fmt.Errorf("failed to delete manifests: %w", err)
91117
}
92-
log.Println("Successfully deleted NVIDIA device plugin, MPI operator, EFA device plugin and DCGM Exporter manifests.")
118+
log.Println("Successfully deleted NVIDIA device plugin, MPI operator, EFA device plugin, DCGM Exporter and CloudWatch Agent manifests.")
93119
return ctx, nil
94120
},
95121
)
@@ -121,10 +147,10 @@ func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Contex
121147
}
122148
}
123149

124-
if *nodeType != "" {
150+
if testConfig.NodeType != "" {
125151
count := 0
126152
for _, v := range nodes.Items {
127-
if v.Labels["node.kubernetes.io/instance-type"] == *nodeType {
153+
if v.Labels["node.kubernetes.io/instance-type"] == testConfig.NodeType {
128154
count++
129155
if gpuCap, ok := v.Status.Capacity["nvidia.com/gpu"]; ok {
130156
gpuPerNode = int(gpuCap.Value())
@@ -135,11 +161,11 @@ func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Contex
135161
}
136162
}
137163
if count == 0 {
138-
return ctx, fmt.Errorf("no nodes match the specified nodeType: %s", *nodeType)
164+
return ctx, fmt.Errorf("no nodes match the specified nodeType: %s", testConfig.NodeType)
139165
}
140166
nodeCount = count
141167
} else {
142-
*nodeType = nodes.Items[0].Labels["node.kubernetes.io/instance-type"]
168+
testConfig.NodeType = nodes.Items[0].Labels["node.kubernetes.io/instance-type"]
143169
nodeCount = len(nodes.Items)
144170
if gpuCap, ok := nodes.Items[0].Status.Capacity["nvidia.com/gpu"]; ok {
145171
gpuPerNode = int(gpuCap.Value())
@@ -149,28 +175,10 @@ func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Contex
149175
}
150176
}
151177

152-
log.Printf("[INFO] Node Type: %s", *nodeType)
178+
log.Printf("[INFO] Node Type: %s", testConfig.NodeType)
153179
log.Printf("[INFO] Node Count: %d", nodeCount)
154180
log.Printf("[INFO] GPU Per Node: %d", gpuPerNode)
155181
log.Printf("[INFO] EFA Per Node: %d", efaPerNode)
156182

157183
return ctx, nil
158184
}
159-
// Helper function to deploy DaemonSet + Wait for Ready
160-
func deployDaemonSet(name, namespace string) env.Func {
161-
return func(ctx context.Context, config *envconf.Config) (context.Context, error) {
162-
log.Printf("Waiting for %s daemonset to be ready.", name)
163-
daemonset := appsv1.DaemonSet{
164-
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
165-
}
166-
err := wait.For(
167-
fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&daemonset),
168-
wait.WithTimeout(5*time.Minute),
169-
)
170-
if err != nil {
171-
return ctx, fmt.Errorf("%s daemonset is not ready: %w", name, err)
172-
}
173-
log.Printf("%s daemonset is ready.", name)
174-
return ctx, nil
175-
}
176-
}

test/cases/nvidia-training/vars.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,23 @@
33
package training
44

55
import (
6-
"flag"
7-
6+
"github.com/aws/aws-k8s-tester/test/common"
87
"sigs.k8s.io/e2e-framework/pkg/env"
98
)
109

10+
type Config struct {
11+
common.MetricOps
12+
BertTrainingImage string `flag:"bertTrainingImage" desc:"Docker image used for BERT training workload"`
13+
EfaEnabled bool `flag:"efaEnabled" desc:"Enable Elastic Fabric Adapter (EFA)"`
14+
NodeType string `flag:"nodeType" desc:"Instance type for cluster nodes"`
15+
}
16+
1117
// Shared global variables
1218
var (
13-
testenv env.Environment
14-
bertTrainingImage *string
15-
efaEnabled *bool
16-
nodeType *string
19+
testenv env.Environment
20+
testConfig Config
1721

1822
nodeCount int
1923
gpuPerNode int
2024
efaPerNode int
2125
)
22-
23-
func init() {
24-
bertTrainingImage = flag.String("bertTrainingImage", "", "Docker image used for BERT training workload")
25-
efaEnabled = flag.Bool("efaEnabled", false, "Enable Elastic Fabric Adapter (EFA)")
26-
nodeType = flag.String("nodeType", "", "Instance type for cluster nodes")
27-
}

test/common/flags.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
//go:build e2e
2+
3+
package common
4+
5+
import (
6+
"flag"
7+
"fmt"
8+
"github.com/octago/sflags/gen/gpflag"
9+
"github.com/spf13/pflag"
10+
"reflect"
11+
)
12+
13+
// For CloudWatch metric dimension flag
14+
type MetricOps struct {
15+
MetricDimensions map[string]string `flag:"metricDimensions" desc:"CloudWatch metric dimensions as comma-separated key=value pairs"`
16+
}
17+
18+
func ParseFlags(config interface{}) (*pflag.FlagSet, error) {
19+
flags, err := gpflag.Parse(config)
20+
if err != nil {
21+
return nil, fmt.Errorf("failed to parse flags: %w", err)
22+
}
23+
24+
// Handle MetricDimensions map that gpflag doesn't support
25+
if _, hasField := reflect.TypeOf(config).Elem().FieldByName("MetricDimensions"); hasField {
26+
field := reflect.ValueOf(config).Elem().FieldByName("MetricDimensions")
27+
metricDims := field.Addr().Interface().(*map[string]string)
28+
flags.StringToStringVar(metricDims, "metricDimensions", nil, "CloudWatch metric dimensions as comma-separated key=value pairs")
29+
}
30+
31+
flags.VisitAll(func(pf *pflag.Flag) {
32+
flag.CommandLine.Var(pf.Value, pf.Name, pf.Usage)
33+
})
34+
35+
return flags, nil
36+
}

test/common/resources.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
//go:build e2e
2+
3+
package common
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"log"
9+
"time"
10+
11+
fwext "github.com/aws/aws-k8s-tester/internal/e2e"
12+
appsv1 "k8s.io/api/apps/v1"
13+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14+
"sigs.k8s.io/e2e-framework/klient/wait"
15+
"sigs.k8s.io/e2e-framework/pkg/env"
16+
"sigs.k8s.io/e2e-framework/pkg/envconf"
17+
)
18+
19+
// DeployDaemonSet returns a function to deploy and wait for a DaemonSet to be ready
20+
func DeployDaemonSet(name, namespace string) env.Func {
21+
return func(ctx context.Context, config *envconf.Config) (context.Context, error) {
22+
log.Printf("Waiting for %s daemonset to be ready.", name)
23+
daemonset := appsv1.DaemonSet{
24+
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
25+
}
26+
err := wait.For(
27+
fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&daemonset),
28+
wait.WithTimeout(5*time.Minute),
29+
wait.WithContext(ctx),
30+
)
31+
if err != nil {
32+
return ctx, fmt.Errorf("%s daemonset is not ready: %w", name, err)
33+
}
34+
log.Printf("%s daemonset is ready.", name)
35+
return ctx, nil
36+
}
37+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
apiVersion: v1
2+
kind: ConfigMap
3+
metadata:
4+
name: prometheus-cwagentconfig
5+
namespace: amazon-cloudwatch
6+
data:
7+
cwagentconfig.json: |
8+
{
9+
"agent": {
10+
"debug": true
11+
},
12+
"logs": {
13+
"metrics_collected": {
14+
"prometheus": {
15+
"prometheus_config_path": "/etc/prometheusconfig/prometheus.yaml",
16+
"emf_processor": {
17+
"metric_declaration": [
18+
{
19+
"source_labels": ["job"],
20+
"label_matcher": "dcgm-exporter",
21+
"dimensions": [[{{.DimensionKeys}}]],
22+
"metric_selectors": [
23+
"^DCGM_FI_DEV_GPU_UTIL$",
24+
"^DCGM_FI_DEV_MEM_COPY_UTIL$",
25+
"^DCGM_FI_DEV_FB_USED$",
26+
"^DCGM_FI_DEV_FB_FREE$",
27+
"^DCGM_FI_DEV_POWER_USAGE$"
28+
]
29+
}
30+
]
31+
}
32+
}
33+
},
34+
"force_flush_interval": 5
35+
}
36+
}
37+
38+
---
39+
apiVersion: v1
40+
kind: ConfigMap
41+
metadata:
42+
name: prometheus-config
43+
namespace: amazon-cloudwatch
44+
data:
45+
prometheus.yaml: |
46+
global:
47+
scrape_interval: 1s
48+
scrape_timeout: 1s
49+
scrape_configs:
50+
- job_name: dcgm-exporter
51+
static_configs:
52+
- targets:
53+
- dcgm-exporter.kube-system.svc.cluster.local:9400
54+
metrics_path: /metrics
55+
metric_relabel_configs:
56+
{{- range $key, $value := .MetricDimensions}}
57+
- {action: replace, target_label: {{$key}}, replacement: '{{$value}}'}
58+
{{- end}}
59+
---
60+
apiVersion: apps/v1
61+
kind: DaemonSet
62+
metadata:
63+
name: cwagent
64+
namespace: amazon-cloudwatch
65+
spec:
66+
selector:
67+
matchLabels:
68+
app: cwagent
69+
template:
70+
metadata:
71+
labels:
72+
app: cwagent
73+
spec:
74+
serviceAccountName: cwagent
75+
dnsPolicy: ClusterFirst
76+
containers:
77+
- name: cloudwatch-agent
78+
image: public.ecr.aws/cloudwatch-agent/cloudwatch-agent:latest
79+
imagePullPolicy: Always
80+
resources:
81+
limits:
82+
cpu: 1000m
83+
memory: 1000Mi
84+
requests:
85+
cpu: 200m
86+
memory: 200Mi
87+
volumeMounts:
88+
- name: prometheus-cwagentconfig
89+
mountPath: /etc/cwagentconfig
90+
- name: prometheus-config
91+
mountPath: /etc/prometheusconfig
92+
volumes:
93+
- name: prometheus-cwagentconfig
94+
configMap:
95+
name: prometheus-cwagentconfig
96+
- name: prometheus-config
97+
configMap:
98+
name: prometheus-config
99+
terminationGracePeriodSeconds: 60
100+
---

0 commit comments

Comments
 (0)