diff --git a/CHANGELOG.md b/CHANGELOG.md index 532108424..420f69262 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ### Fixed - Fixed a bug where the scheduler would not re-try updating podgroup status after failure - Fixed a bug where ray workloads gang scheduling would ignore `minReplicas` if autoscaling was not set +- KAI Config wrong statuses when prometheus operand is enabled ## [v0.9.1] - 20250-09-15 diff --git a/pkg/apis/kai/v1/config_types.go b/pkg/apis/kai/v1/config_types.go index fec320927..d4abcae4b 100644 --- a/pkg/apis/kai/v1/config_types.go +++ b/pkg/apis/kai/v1/config_types.go @@ -37,6 +37,7 @@ const ( Available ConditionReason = "available" Reconciled ConditionReason = "reconciled" DependenciesFulfilled ConditionReason = "dependencies_fulfilled" + DependenciesMissing ConditionReason = "dependencies_missing" PrometheusConnected ConditionReason = "prometheus_connected" PrometheusConnectionFailed ConditionReason = "prometheus_connection_failed" ) diff --git a/pkg/operator/controller/status_reconciler/status_reconciler.go b/pkg/operator/controller/status_reconciler/status_reconciler.go index 8de703d69..696b7615c 100644 --- a/pkg/operator/controller/status_reconciler/status_reconciler.go +++ b/pkg/operator/controller/status_reconciler/status_reconciler.go @@ -48,7 +48,7 @@ func (r *StatusReconciler) ReconcileStatus(ctx context.Context, object objectWit if err := r.reconcileCondition(ctx, object, r.getAvailableCondition(ctx, object.GetGeneration())); err != nil { return err } - return r.reconcileCondition(ctx, object, r.getDependenciesFulfilledCondition(object.GetGeneration())) + return r.reconcileCondition(ctx, object, r.getDependenciesFulfilledCondition(ctx, object)) } func (r *StatusReconciler) reconcileCondition(ctx context.Context, object objectWithConditions, condition metav1.Condition) error { @@ -155,13 +155,36 @@ func (r *StatusReconciler) getAvailableCondition(ctx context.Context, gen int64) } } -func (r *StatusReconciler) getDependenciesFulfilledCondition(gen int64) metav1.Condition { +func (r *StatusReconciler) getDependenciesFulfilledCondition(ctx context.Context, object objectWithConditions) metav1.Condition { + missingDependencies, err := r.deployable.HasMissingDependencies(ctx, r.Client, object.GetInternalObject()) + if err != nil { + return metav1.Condition{ + Type: string(kaiv1.ConditionDependenciesFulfilled), + Status: metav1.ConditionFalse, + Reason: string(kaiv1.DependenciesMissing), + Message: err.Error(), + ObservedGeneration: object.GetGeneration(), + LastTransitionTime: metav1.Now(), + } + } + + if len(missingDependencies) > 0 { + return metav1.Condition{ + Type: string(kaiv1.ConditionDependenciesFulfilled), + Status: metav1.ConditionFalse, + Reason: string(kaiv1.DependenciesMissing), + Message: missingDependencies, + ObservedGeneration: object.GetGeneration(), + LastTransitionTime: metav1.Now(), + } + } + return metav1.Condition{ Type: string(kaiv1.ConditionDependenciesFulfilled), Status: metav1.ConditionTrue, Reason: string(kaiv1.DependenciesFulfilled), Message: "Dependencies are fulfilled", - ObservedGeneration: gen, + ObservedGeneration: object.GetGeneration(), LastTransitionTime: metav1.Now(), } } diff --git a/pkg/operator/controller/status_reconciler/status_reconciler_test.go b/pkg/operator/controller/status_reconciler/status_reconciler_test.go index c48836b46..405ca7303 100644 --- a/pkg/operator/controller/status_reconciler/status_reconciler_test.go +++ b/pkg/operator/controller/status_reconciler/status_reconciler_test.go @@ -188,6 +188,14 @@ func (f *fakeDeployable) IsAvailable(ctx context.Context, runtimeClient client.R return f.isAvailable, nil } +func (f *fakeDeployable) Monitor(ctx context.Context, runtimeReader client.Reader, kaiConfig *kaiv1.Config) error { + return nil +} + +func (f *fakeDeployable) HasMissingDependencies(ctx context.Context, readerClient client.Reader, obj client.Object) (string, error) { + return "", nil +} + func (f *fakeDeployable) Name() string { return "fakeDeployable" } diff --git a/pkg/operator/operands/admission/admission.go b/pkg/operator/operands/admission/admission.go index 698a1648d..c3694d6af 100644 --- a/pkg/operator/operands/admission/admission.go +++ b/pkg/operator/operands/admission/admission.go @@ -81,3 +81,7 @@ func (a *Admission) Name() string { func (a *Admission) Monitor(ctx context.Context, runtimeReader client.Reader, kaiConfig *kaiv1.Config) error { return nil } + +func (a *Admission) HasMissingDependencies(context.Context, client.Reader, *kaiv1.Config) (string, error) { + return "", nil +} diff --git a/pkg/operator/operands/binder/binder.go b/pkg/operator/operands/binder/binder.go index 9b14bf505..0d9898dd0 100644 --- a/pkg/operator/operands/binder/binder.go +++ b/pkg/operator/operands/binder/binder.go @@ -66,3 +66,7 @@ func (b *Binder) Name() string { func (b *Binder) Monitor(ctx context.Context, runtimeReader client.Reader, kaiConfig *kaiv1.Config) error { return nil } + +func (b *Binder) HasMissingDependencies(context.Context, client.Reader, *kaiv1.Config) (string, error) { + return "", nil +} diff --git a/pkg/operator/operands/deployable/deployable.go b/pkg/operator/operands/deployable/deployable.go index f7022ef2f..572e255d4 100644 --- a/pkg/operator/operands/deployable/deployable.go +++ b/pkg/operator/operands/deployable/deployable.go @@ -66,15 +66,15 @@ func (d *DeployableOperands) Deploy( Controller: ptr.To(true), } - if createObjectsInCluster(ctx, runtimeClient, reconcilerAsOwnerReference, objectsToCreate) != nil { + if err := createObjectsInCluster(ctx, runtimeClient, reconcilerAsOwnerReference, objectsToCreate); err != nil { return err } - if deleteObjectsInCluster(ctx, runtimeClient, objectsToDelete) != nil { + if err := deleteObjectsInCluster(ctx, runtimeClient, objectsToDelete); err != nil { return err } - if updateObjectsInCluster(ctx, runtimeClient, reconcilerAsOwnerReference, objectsToUpdate) != nil { + if err := updateObjectsInCluster(ctx, runtimeClient, reconcilerAsOwnerReference, objectsToUpdate); err != nil { return err } return nil @@ -301,3 +301,26 @@ func (d *DeployableOperands) Monitor(ctx context.Context, runtimeReader client.R } return nil } + +func (d *DeployableOperands) HasMissingDependencies(ctx context.Context, readerClient client.Reader, object client.Object) (string, error) { + var missingDependencies string + var err error + + kaiConfig, checkForKAIConfig := object.(*kaiv1.Config) + if !checkForKAIConfig { + return "", nil + } + + for _, operand := range d.operands { + // Assuming each operand has a HasDependencies method + missing, e := operand.HasMissingDependencies(ctx, readerClient, kaiConfig) + if e != nil { + err = errors.Join(err, e) + } + if len(missing) > 0 { + missingDependencies = missingDependencies + fmt.Sprintf("\n%s is missing %s", operand.Name(), missing) + } + } + + return missingDependencies, err +} diff --git a/pkg/operator/operands/deployable/deployable_test.go b/pkg/operator/operands/deployable/deployable_test.go index 35550856d..b48589ae8 100644 --- a/pkg/operator/operands/deployable/deployable_test.go +++ b/pkg/operator/operands/deployable/deployable_test.go @@ -5,6 +5,7 @@ package deployable import ( "context" + "errors" "testing" kaiv1 "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1" @@ -24,6 +25,7 @@ import ( "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" ) func TestDeployable(t *testing.T) { @@ -62,92 +64,154 @@ var _ = Describe("Deployable", func() { deployable *DeployableOperands fakeClient client.Client ) - BeforeEach(func() { - operand := &fakeOperand{} + Context("object creation successfull", func() { - deployable = New([]operands.Operand{operand}, known_types.KAIConfigRegisteredCollectible) - fakeClient = getFakeClient(fakeClientBuilder, known_types.KAIConfigRegisteredCollectible) - }) - It("should deploy operands desired state", func() { - Expect(deployable.Deploy(context.TODO(), fakeClient, kaiConfig, kaiConfig)).To(Succeed()) - podsList := &v1.PodList{} - err := fakeClient.List(context.TODO(), podsList) - Expect(err).ToNot(HaveOccurred()) - Expect(len(podsList.Items)).To(Equal(1)) + BeforeEach(func() { + operand := &fakeOperand{} - configMapList := &v1.ConfigMapList{} - err = fakeClient.List(context.TODO(), configMapList) - Expect(err).ToNot(HaveOccurred()) - Expect(len(configMapList.Items)).To(Equal(1)) - }) - It("should not deploy operands on custom field inheritor", func() { - desiredWithAnnotation := &v1.ConfigMap{ - TypeMeta: metav1.TypeMeta{ - Kind: "ConfigMap", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: "bar", - Annotations: map[string]string{"A": "a"}, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: kaiConfig.GetObjectKind().GroupVersionKind().GroupVersion().String(), - Kind: kaiConfig.GetObjectKind().GroupVersionKind().Kind, - Name: kaiConfig.GetName(), - UID: kaiConfig.GetUID(), - Controller: ptr.To(true), + deployable = New([]operands.Operand{operand}, known_types.KAIConfigRegisteredCollectible) + fakeClient = getFakeClient(fakeClientBuilder, known_types.KAIConfigRegisteredCollectible) + }) + It("should deploy operands desired state", func() { + Expect(deployable.Deploy(context.TODO(), fakeClient, kaiConfig, kaiConfig)).To(Succeed()) + podsList := &v1.PodList{} + err := fakeClient.List(context.TODO(), podsList) + Expect(err).ToNot(HaveOccurred()) + Expect(len(podsList.Items)).To(Equal(1)) + + configMapList := &v1.ConfigMapList{} + err = fakeClient.List(context.TODO(), configMapList) + Expect(err).ToNot(HaveOccurred()) + Expect(len(configMapList.Items)).To(Equal(1)) + }) + It("should not deploy operands on custom field inheritor", func() { + desiredWithAnnotation := &v1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "bar", + Annotations: map[string]string{"A": "a"}, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: kaiConfig.GetObjectKind().GroupVersionKind().GroupVersion().String(), + Kind: kaiConfig.GetObjectKind().GroupVersionKind().Kind, + Name: kaiConfig.GetName(), + UID: kaiConfig.GetUID(), + Controller: ptr.To(true), + }, }, }, - }, - } - Expect(fakeClient.Create(context.TODO(), desiredWithAnnotation)).To(Succeed()) - - deployable.RegisterFieldsInheritFromClusterObjects(&v1.ConfigMap{}, func(current, desired client.Object) { - currentCm := current.(*v1.ConfigMap) - desiredCm := desired.(*v1.ConfigMap) - if desiredCm.Annotations == nil { - desiredCm.Annotations = map[string]string{} } - maps.Copy(desiredCm.Annotations, currentCm.Annotations) + Expect(fakeClient.Create(context.TODO(), desiredWithAnnotation)).To(Succeed()) + + deployable.RegisterFieldsInheritFromClusterObjects(&v1.ConfigMap{}, func(current, desired client.Object) { + currentCm := current.(*v1.ConfigMap) + desiredCm := desired.(*v1.ConfigMap) + if desiredCm.Annotations == nil { + desiredCm.Annotations = map[string]string{} + } + maps.Copy(desiredCm.Annotations, currentCm.Annotations) + }) + Expect(deployable.Deploy(context.TODO(), fakeClient, kaiConfig, kaiConfig)).To(Succeed()) + + cmList := &v1.ConfigMapList{} + err := fakeClient.List(context.TODO(), cmList) + Expect(err).ToNot(HaveOccurred()) + Expect(len(cmList.Items)).To(Equal(1)) + Expect(len(cmList.Items[0].Annotations)).To(Equal(1)) }) - Expect(deployable.Deploy(context.TODO(), fakeClient, kaiConfig, kaiConfig)).To(Succeed()) - cmList := &v1.ConfigMapList{} - err := fakeClient.List(context.TODO(), cmList) - Expect(err).ToNot(HaveOccurred()) - Expect(len(cmList.Items)).To(Equal(1)) - Expect(len(cmList.Items[0].Annotations)).To(Equal(1)) + It("should delete other resources", func() { + otherConfigMap := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo2", + Namespace: "bar2", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: kaiConfig.GetObjectKind().GroupVersionKind().GroupVersion().String(), + Kind: kaiConfig.GetObjectKind().GroupVersionKind().Kind, + Name: kaiConfig.GetName(), + UID: kaiConfig.GetUID(), + Controller: ptr.To(true), + }, + }, + }, + } + Expect(fakeClient.Create(context.TODO(), otherConfigMap)).To(Succeed()) + + Expect(deployable.Deploy(context.TODO(), fakeClient, kaiConfig, kaiConfig)).To(Succeed()) + + configMapList := &v1.ConfigMapList{} + err := fakeClient.List(context.TODO(), configMapList) + Expect(err).ToNot(HaveOccurred()) + Expect(len(configMapList.Items)).To(Equal(1)) + + for _, item := range configMapList.Items { + Expect(item.Name).ToNot(Equal(otherConfigMap.Name)) + } + }) }) - It("should delete other resources", func() { - otherConfigMap := &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo2", - Namespace: "bar2", - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: kaiConfig.GetObjectKind().GroupVersionKind().GroupVersion().String(), - Kind: kaiConfig.GetObjectKind().GroupVersionKind().Kind, - Name: kaiConfig.GetName(), - UID: kaiConfig.GetUID(), - Controller: ptr.To(true), + Context("Object creation fails", func() { + var ( + fakeClient client.Client + createCalls int + updateCalls int + createError error + updateError error + ) + BeforeEach(func() { + createCalls = 0 + updateCalls = 0 + createError = nil + updateError = nil + + operand := &fakeOperand{} + deployable = New([]operands.Operand{operand}, known_types.KAIConfigRegisteredCollectible) + + fakeClient = getFakeClient(fakeClientBuilder. + WithInterceptorFuncs(interceptor.Funcs{ + Create: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.CreateOption) error { + createCalls++ + return createError }, - }, - }, - } - Expect(fakeClient.Create(context.TODO(), otherConfigMap)).To(Succeed()) + Update: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.UpdateOption) error { + updateCalls++ + return updateError + }, + }), known_types.KAIConfigRegisteredCollectible) + }) - Expect(deployable.Deploy(context.TODO(), fakeClient, kaiConfig, kaiConfig)).To(Succeed()) + It("should create object successfully", func() { + Expect(deployable.Deploy(context.TODO(), fakeClient, kaiConfig, kaiConfig)).To(Succeed()) + Expect(createCalls).To(Equal(2)) // Pod and ConfigMap creates + Expect(updateCalls).To(Equal(0)) + }) - configMapList := &v1.ConfigMapList{} - err := fakeClient.List(context.TODO(), configMapList) - Expect(err).ToNot(HaveOccurred()) - Expect(len(configMapList.Items)).To(Equal(1)) + It("should update object if create fails due to existing resource", func() { + createError = errors.New("already exists") + defer func() { createError = nil }() + Expect(deployable.Deploy(context.TODO(), fakeClient, kaiConfig, kaiConfig)).To(Succeed()) + Expect(createCalls).To(Equal(2)) + Expect(updateCalls).To(Equal(2)) + }) - for _, item := range configMapList.Items { - Expect(item.Name).ToNot(Equal(otherConfigMap.Name)) - } + It("should fail if both create and update fail", func() { + createError = errors.New("already exists") + updateError = errors.New("update failed") + defer func() { + createError = nil + updateError = nil + }() + + err := deployable.Deploy(context.TODO(), fakeClient, kaiConfig, kaiConfig) + Expect(err).To(HaveOccurred()) + Expect(createCalls).To(Equal(1)) + Expect(updateCalls).To(Equal(1)) + }) }) }) @@ -198,6 +262,64 @@ var _ = Describe("Deployable", func() { }) }) + Describe("Monitor", func() { + var ( + deployable *DeployableOperands + fakeClient client.Client + ) + BeforeEach(func() { + operand := &fakeOperand{} + deployable = New([]operands.Operand{operand}, known_types.KAIConfigRegisteredCollectible) + fakeClient = getFakeClient(fakeClientBuilder, known_types.KAIConfigRegisteredCollectible) + }) + + It("should not return error when monitoring", func() { + Expect(deployable.Monitor(context.TODO(), fakeClient, kaiConfig)).To(Succeed()) + }) + + It("should return error if operand's Monitor fails", func() { + errOperand := &fakeOperandWithMonitorError{monitorErr: errors.New("monitor failed")} + deployable = New([]operands.Operand{errOperand}, known_types.KAIConfigRegisteredCollectible) + Expect(deployable.Monitor(context.TODO(), fakeClient, kaiConfig)).To(MatchError(ContainSubstring("monitor failed"))) + Expect(errOperand.monitorErr).To(HaveOccurred()) + }) + }) + + Describe("HasMissingDependencies", func() { + var ( + deployable *DeployableOperands + fakeClient client.Reader + ) + BeforeEach(func() { + operand := &fakeOperand{} + deployable = New([]operands.Operand{operand}, known_types.KAIConfigRegisteredCollectible) + fakeClient = getFakeClient(fakeClientBuilder, known_types.KAIConfigRegisteredCollectible) + }) + + It("should return no missing dependencies if all operands report none", func() { + missing, err := deployable.HasMissingDependencies(context.TODO(), fakeClient, kaiConfig) + Expect(missing).To(BeEmpty()) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should aggregate missing dependencies from operands", func() { + operand1 := &fakeOperandWithDeps{missingDeps: "dep1", name: "operand1"} + operand2 := &fakeOperandWithDeps{missingDeps: "dep2", name: "operand2"} + deployable = New([]operands.Operand{operand1, operand2}, known_types.KAIConfigRegisteredCollectible) + missing, err := deployable.HasMissingDependencies(context.TODO(), fakeClient, kaiConfig) + Expect(missing).To(ContainSubstring("operand1 is missing dep1")) + Expect(missing).To(ContainSubstring("operand2 is missing dep2")) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should handle error from operand's HasMissingDependencies", func() { + errOperand := &fakeOperandWithDeps{hasDepErr: errors.New("dependency check failed"), name: "errOperand"} + deployable = New([]operands.Operand{errOperand}, known_types.KAIConfigRegisteredCollectible) + _, err := deployable.HasMissingDependencies(context.TODO(), fakeClient, kaiConfig) + Expect(err).To(MatchError(ContainSubstring("dependency check failed"))) + }) + }) + Describe("SortObjectByCreationOrder", func() { var ( orderDefinition []string @@ -355,6 +477,10 @@ func (f *fakeOperand) Monitor(_ context.Context, _ client.Reader, _ *kaiv1.Confi return nil } +func (f *fakeOperand) HasMissingDependencies(context.Context, client.Reader, *kaiv1.Config) (string, error) { + return "", nil +} + func (f *fakeOperand) Name() string { if f.name == "" { return "fakeOperand" @@ -371,3 +497,48 @@ func getFakeClient(builder *fake.ClientBuilder, collectables []*known_types.Coll } return builder.Build() } + +type fakeOperandWithMonitorError struct { + monitorErr error +} + +func (f *fakeOperandWithMonitorError) Monitor(_ context.Context, _ client.Reader, _ *kaiv1.Config) error { + return f.monitorErr +} + +func (f *fakeOperandWithMonitorError) Name() string { return "fakeOperandWithMonitorError" } +func (f *fakeOperandWithMonitorError) DesiredState(_ context.Context, _ client.Reader, _ *kaiv1.Config) ([]client.Object, error) { + return nil, nil +} +func (f *fakeOperandWithMonitorError) IsDeployed(_ context.Context, _ client.Reader) (bool, error) { + return true, nil +} +func (f *fakeOperandWithMonitorError) IsAvailable(_ context.Context, _ client.Reader) (bool, error) { + return true, nil +} +func (f *fakeOperandWithMonitorError) HasMissingDependencies(context.Context, client.Reader, *kaiv1.Config) (string, error) { + return "", nil +} + +type fakeOperandWithDeps struct { + missingDeps string + hasDepErr error + name string +} + +func (f *fakeOperandWithDeps) HasMissingDependencies(_ context.Context, _ client.Reader, _ *kaiv1.Config) (string, error) { + return f.missingDeps, f.hasDepErr +} +func (f *fakeOperandWithDeps) Name() string { return f.name } +func (f *fakeOperandWithDeps) DesiredState(_ context.Context, _ client.Reader, _ *kaiv1.Config) ([]client.Object, error) { + return nil, nil +} +func (f *fakeOperandWithDeps) IsDeployed(_ context.Context, _ client.Reader) (bool, error) { + return true, nil +} +func (f *fakeOperandWithDeps) IsAvailable(_ context.Context, _ client.Reader) (bool, error) { + return true, nil +} +func (f *fakeOperandWithDeps) Monitor(_ context.Context, _ client.Reader, _ *kaiv1.Config) error { + return nil +} diff --git a/pkg/operator/operands/deployable/interface.go b/pkg/operator/operands/deployable/interface.go index 6d76ff4cc..e9d078fe6 100644 --- a/pkg/operator/operands/deployable/interface.go +++ b/pkg/operator/operands/deployable/interface.go @@ -14,4 +14,6 @@ type Deployable interface { Deploy(ctx context.Context, runtimeClient client.Client, kaiConfig *kaiConfig.Config, reconciler client.Object) error IsDeployed(ctx context.Context, readerClient client.Reader) (bool, error) IsAvailable(ctx context.Context, readerClient client.Reader) (bool, error) + Monitor(ctx context.Context, runtimeReader client.Reader, kaiConfig *kaiConfig.Config) error + HasMissingDependencies(ctx context.Context, readerClient client.Reader, obj client.Object) (string, error) } diff --git a/pkg/operator/operands/interface.go b/pkg/operator/operands/interface.go index 3472b7269..4c227db9c 100644 --- a/pkg/operator/operands/interface.go +++ b/pkg/operator/operands/interface.go @@ -6,14 +6,15 @@ package operands import ( "context" - enginev1alpha1 "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1" + kaiv1 "github.com/NVIDIA/KAI-scheduler/pkg/apis/kai/v1" "sigs.k8s.io/controller-runtime/pkg/client" ) type Operand interface { - DesiredState(context.Context, client.Reader, *enginev1alpha1.Config) ([]client.Object, error) + DesiredState(context.Context, client.Reader, *kaiv1.Config) ([]client.Object, error) IsDeployed(context.Context, client.Reader) (bool, error) IsAvailable(context.Context, client.Reader) (bool, error) - Monitor(context.Context, client.Reader, *enginev1alpha1.Config) error + Monitor(context.Context, client.Reader, *kaiv1.Config) error + HasMissingDependencies(context.Context, client.Reader, *kaiv1.Config) (string, error) Name() string } diff --git a/pkg/operator/operands/node_scale_adjuster/node_scale_adjuster.go b/pkg/operator/operands/node_scale_adjuster/node_scale_adjuster.go index 3df719cba..9726ceb50 100644 --- a/pkg/operator/operands/node_scale_adjuster/node_scale_adjuster.go +++ b/pkg/operator/operands/node_scale_adjuster/node_scale_adjuster.go @@ -61,3 +61,7 @@ func (nsa *NodeScaleAdjuster) Name() string { func (nsa *NodeScaleAdjuster) Monitor(ctx context.Context, runtimeReader client.Reader, kaiConfig *kaiv1.Config) error { return nil } + +func (nsa *NodeScaleAdjuster) HasMissingDependencies(context.Context, client.Reader, *kaiv1.Config) (string, error) { + return "", nil +} diff --git a/pkg/operator/operands/pod_group_controller/pod_group_controller.go b/pkg/operator/operands/pod_group_controller/pod_group_controller.go index 5ac3ecd6f..1f24e066c 100644 --- a/pkg/operator/operands/pod_group_controller/pod_group_controller.go +++ b/pkg/operator/operands/pod_group_controller/pod_group_controller.go @@ -81,3 +81,7 @@ func (p *PodGroupController) Name() string { func (p *PodGroupController) Monitor(ctx context.Context, runtimeReader client.Reader, kaiConfig *kaiv1.Config) error { return nil } + +func (p *PodGroupController) HasMissingDependencies(context.Context, client.Reader, *kaiv1.Config) (string, error) { + return "", nil +} diff --git a/pkg/operator/operands/pod_grouper/pod_grouper.go b/pkg/operator/operands/pod_grouper/pod_grouper.go index 3df386317..5c677fa59 100644 --- a/pkg/operator/operands/pod_grouper/pod_grouper.go +++ b/pkg/operator/operands/pod_grouper/pod_grouper.go @@ -64,3 +64,7 @@ func (p *PodGrouper) Name() string { func (p *PodGrouper) Monitor(ctx context.Context, runtimeReader client.Reader, kaiConfig *kaiv1.Config) error { return nil } + +func (p *PodGrouper) HasMissingDependencies(context.Context, client.Reader, *kaiv1.Config) (string, error) { + return "", nil +} diff --git a/pkg/operator/operands/prometheus/prometheus.go b/pkg/operator/operands/prometheus/prometheus.go index 7dce981d5..b99ae9cee 100644 --- a/pkg/operator/operands/prometheus/prometheus.go +++ b/pkg/operator/operands/prometheus/prometheus.go @@ -36,6 +36,10 @@ func (p *Prometheus) DesiredState( p.namespace = kaiConfig.Spec.Namespace p.client = runtimeClient.(client.Client) + if kaiConfig.Spec.Prometheus == nil || kaiConfig.Spec.Prometheus.Enabled == nil || !*kaiConfig.Spec.Prometheus.Enabled { + return []client.Object{}, nil + } + var objects []client.Object for _, resourceFunc := range []promethuesResourceForKAIConfig{ prometheusForKAIConfig, @@ -55,7 +59,6 @@ func (p *Prometheus) DesiredState( } func (b *Prometheus) IsDeployed(ctx context.Context, readerClient client.Reader) (bool, error) { - // If there are no objects to check, consider it deployed if len(b.lastDesiredState) == 0 { return true, nil } @@ -63,21 +66,26 @@ func (b *Prometheus) IsDeployed(ctx context.Context, readerClient client.Reader) } func (b *Prometheus) IsAvailable(ctx context.Context, readerClient client.Reader) (bool, error) { - // If there are no objects to check, consider it available if len(b.lastDesiredState) == 0 { return true, nil } - prometheus := &monitoringv1.Prometheus{} - err := readerClient.Get(ctx, client.ObjectKey{ - Name: mainResourceName, - Namespace: b.namespace, - }, prometheus) + var prometheus *monitoringv1.Prometheus = nil + for _, obj := range b.lastDesiredState { + var ok bool + if prometheus, ok = obj.(*monitoringv1.Prometheus); ok { + break + } + } + if prometheus == nil { + return true, nil + } + + err := readerClient.Get(ctx, client.ObjectKeyFromObject(prometheus), prometheus) if err != nil { return false, err } - // Check if there are any conditions and if the first one is Available if len(prometheus.Status.Conditions) > 0 { for _, condition := range prometheus.Status.Conditions { if condition.Type == monitoringv1.ConditionType("Available") { @@ -92,14 +100,32 @@ func (b *Prometheus) Name() string { return "KAI-prometheus" } +func (p *Prometheus) HasMissingDependencies(ctx context.Context, runtimeReader client.Reader, kaiConfig *kaiv1.Config) (string, error) { + if kaiConfig.Spec.Prometheus == nil || kaiConfig.Spec.Prometheus.Enabled == nil || !*kaiConfig.Spec.Prometheus.Enabled { + return "", nil + } + if kaiConfig.Spec.Prometheus.ExternalPrometheusUrl != nil && *kaiConfig.Spec.Prometheus.ExternalPrometheusUrl != "" { + return "", nil + } + + hasPrometheusOperator, err := common.CheckPrometheusCRDsAvailable(ctx, runtimeReader, "prometheus") + if err != nil { + return "", err + } + + if !hasPrometheusOperator { + return "prometheus operator", nil + } + + return "", nil +} + func (p *Prometheus) Monitor(ctx context.Context, runtimeReader client.Reader, kaiConfig *kaiv1.Config) error { - // Check if external Prometheus is configured hasExternalPrometheus := kaiConfig.Spec.Prometheus != nil && kaiConfig.Spec.Prometheus.ExternalPrometheusUrl != nil && *kaiConfig.Spec.Prometheus.ExternalPrometheusUrl != "" if !hasExternalPrometheus { - // Stop monitoring if not already running if p.monitoringCancel != nil { p.monitoringCancel() p.monitoringCtx = nil @@ -108,18 +134,12 @@ func (p *Prometheus) Monitor(ctx context.Context, runtimeReader client.Reader, k return nil } - // do nothing if already running if p.monitoringCtx != nil && p.monitoringCtx.Err() == nil { return nil } - // Start monitoring if not already running p.monitoringCtx, p.monitoringCancel = context.WithCancel(ctx) - - // Create status updater function statusUpdater := p.createStatusUpdaterFunction(kaiConfig) - - // Start the monitoring goroutine StartMonitoring(p.monitoringCtx, kaiConfig.Spec.Prometheus, statusUpdater) return nil @@ -128,7 +148,6 @@ func (p *Prometheus) Monitor(ctx context.Context, runtimeReader client.Reader, k // createStatusUpdaterFunction creates a function that updates the KAI Config status func (p *Prometheus) createStatusUpdaterFunction(kaiConfig *kaiv1.Config) func(ctx context.Context, condition metav1.Condition) error { return func(ctx context.Context, condition metav1.Condition) error { - // Get fresh kaiConfig from cluster currentConfig := &kaiv1.Config{} if err := p.client.Get(ctx, client.ObjectKey{ Name: kaiConfig.Name, @@ -137,10 +156,8 @@ func (p *Prometheus) createStatusUpdaterFunction(kaiConfig *kaiv1.Config) func(c return fmt.Errorf("failed to get current config: %w", err) } - // Set the observed generation to match the current config generation condition.ObservedGeneration = currentConfig.Generation - // Get the current config to update configToUpdate := currentConfig.DeepCopy() // Find and update the Prometheus connectivity condition @@ -226,25 +243,20 @@ func StartMonitoring(ctx context.Context, prometheusConfig *kaiprometheus.Promet func pingExternalPrometheus(ctx context.Context, prometheusURL string, timeout int, maxRetries int) error { logger := log.FromContext(ctx) - // Ensure the URL has a scheme if !strings.Contains(prometheusURL, "://") { prometheusURL = "http://" + prometheusURL } - // Parse the URL to ensure it's valid _, err := url.Parse(prometheusURL) if err != nil { return fmt.Errorf("invalid Prometheus URL: %w, prometheusURL: %s", err, prometheusURL) } - // Create HTTP client with timeout client := &http.Client{ Timeout: time.Duration(timeout) * time.Second, } - // Try to connect to the Prometheus /api/v1/status/config endpoint statusURL := prometheusURL + "/api/v1/status/config" - logger.Info("Validating external Prometheus connection", "url", statusURL) var lastErr error for attempt := 1; attempt <= maxRetries; attempt++ { @@ -282,18 +294,3 @@ func pingExternalPrometheus(ctx context.Context, prometheusURL string, timeout i return fmt.Errorf("failed to connect to external Prometheus after %d attempts: %w", maxRetries, lastErr) } - -// ValidateExternalPrometheusConnection validates connectivity to an external Prometheus instance -func ValidateExternalPrometheusConnection(ctx context.Context, prometheusConfig *kaiprometheus.Prometheus) error { - // Check if external Prometheus URL is configured - if prometheusConfig == nil || prometheusConfig.ExternalPrometheusUrl == nil || *prometheusConfig.ExternalPrometheusUrl == "" { - return nil - } - - // Validate the connection once - err := pingExternalPrometheus(ctx, *prometheusConfig.ExternalPrometheusUrl, *prometheusConfig.ExternalPrometheusHealthProbe.Timeout, *prometheusConfig.ExternalPrometheusHealthProbe.MaxRetries) - if err != nil { - return fmt.Errorf("failed to ping external Prometheus: %w", err) - } - return nil -} diff --git a/pkg/operator/operands/prometheus/prometheus_test.go b/pkg/operator/operands/prometheus/prometheus_test.go index 202e90f98..db9825d19 100644 --- a/pkg/operator/operands/prometheus/prometheus_test.go +++ b/pkg/operator/operands/prometheus/prometheus_test.go @@ -59,18 +59,11 @@ var _ = Describe("Prometheus", func() { }) Context("when Prometheus is disabled", func() { - It("should return ServiceAccount only", func(ctx context.Context) { + It("should return no objects", func(ctx context.Context) { kaiConfig.Spec.Prometheus.Enabled = ptr.To(false) objects, err := prometheus.DesiredState(ctx, fakeKubeClient, kaiConfig) Expect(err).To(BeNil()) - Expect(len(objects)).To(Equal(1)) // ServiceAccount only - }) - - It("should return ServiceAccount only when config is nil", func(ctx context.Context) { - kaiConfig.Spec.Prometheus = nil - objects, err := prometheus.DesiredState(ctx, fakeKubeClient, kaiConfig) - Expect(err).To(BeNil()) - Expect(len(objects)).To(Equal(1)) // ServiceAccount only + Expect(len(objects)).To(Equal(0)) }) }) @@ -358,20 +351,6 @@ var _ = Describe("prometheusForKAIConfig", func() { Expect(err).To(BeNil()) Expect(len(objects)).To(Equal(0)) }) - - It("should return empty objects list when config is nil", func(ctx context.Context) { - kaiConfig.Spec.Prometheus = nil - objects, err := prometheusForKAIConfig(ctx, fakeKubeClient, kaiConfig) - Expect(err).To(BeNil()) - Expect(len(objects)).To(Equal(0)) - }) - - It("should return empty objects list when enabled is nil", func(ctx context.Context) { - kaiConfig.Spec.Prometheus.Enabled = nil - objects, err := prometheusForKAIConfig(ctx, fakeKubeClient, kaiConfig) - Expect(err).To(BeNil()) - Expect(len(objects)).To(Equal(0)) - }) }) Context("when Prometheus is enabled", func() { diff --git a/pkg/operator/operands/prometheus/resources.go b/pkg/operator/operands/prometheus/resources.go index 7f1a09b1c..c402b8052 100644 --- a/pkg/operator/operands/prometheus/resources.go +++ b/pkg/operator/operands/prometheus/resources.go @@ -24,7 +24,7 @@ import ( ) const ( - mainResourceName = "kai" + mainResourceName = "prometheus" ) func prometheusForKAIConfig( @@ -33,23 +33,12 @@ func prometheusForKAIConfig( logger := log.FromContext(ctx) config := kaiConfig.Spec.Prometheus - // Check if Prometheus is enabled - if config == nil || config.Enabled == nil || !*config.Enabled { - logger.Info("Prometheus is disabled in configuration") - return []client.Object{}, nil - } - - // Check if external Prometheus URL is provided if config.ExternalPrometheusUrl != nil && *config.ExternalPrometheusUrl != "" { logger.Info("External Prometheus URL provided, skipping Prometheus CR creation", "url", *config.ExternalPrometheusUrl) - // For external Prometheus, we only create ServiceMonitors, not the Prometheus CR - // Note: Connectivity validation happens in the background monitoring goroutine, not here return createServiceMonitorsForExternalPrometheus(ctx, runtimeClient, kaiConfig) } - logger.Info("Prometheus is enabled, checking for Prometheus Operator installation") - // Check if Prometheus Operator is installed by looking for the Prometheus CRD // This is a simple check - in production you might want to check for the operator deployment hasPrometheusOperator, err := common.CheckPrometheusCRDsAvailable(ctx, runtimeClient, "prometheus") @@ -58,11 +47,11 @@ func prometheusForKAIConfig( return []client.Object{}, err } - // If Prometheus Operator is not installed, we can't create a Prometheus CR if !hasPrometheusOperator { logger.Info("Prometheus Operator not found - Prometheus CRD is not available") return []client.Object{}, nil } + prometheus, err := common.ObjectForKAIConfig(ctx, runtimeClient, &monitoringv1.Prometheus{}, mainResourceName, kaiConfig.Spec.Namespace) if err != nil { if !errors.IsNotFound(err) { @@ -70,6 +59,7 @@ func prometheusForKAIConfig( return nil, err } } + var ok bool prometheus, ok = prometheus.(*monitoringv1.Prometheus) if !ok { @@ -77,11 +67,7 @@ func prometheusForKAIConfig( return nil, fmt.Errorf("failed to cast object to Prometheus type") } - // Set the Prometheus spec from configuration - prometheusSpec := monitoringv1.PrometheusSpec{ - // Basic configuration required for Prometheus Operator to create pods - // Using minimal spec to avoid field name issues - } + prometheusSpec := monitoringv1.PrometheusSpec{} // Configure TSDB storage storageSize, err := config.CalculateStorageSize(ctx, runtimeClient) @@ -103,12 +89,10 @@ func prometheusForKAIConfig( }, } - // Set retention period if specified if config.RetentionPeriod != nil { prometheusSpec.Retention = monitoringv1.Duration(*config.RetentionPeriod) } - // Configure ServiceMonitor selector to match KAI ServiceMonitors if config.ServiceMonitor != nil && *config.ServiceMonitor.Enabled { prometheusSpec.ServiceMonitorSelector = &metav1.LabelSelector{ MatchLabels: map[string]string{ @@ -118,8 +102,7 @@ func prometheusForKAIConfig( prometheusSpec.ServiceMonitorNamespaceSelector = &metav1.LabelSelector{} } - // Set the service account name in the Prometheus spec - prometheusSpec.ServiceAccountName = mainResourceName + "-prometheus" + prometheusSpec.ServiceAccountName = mainResourceName prometheus.(*monitoringv1.Prometheus).Spec = prometheusSpec return []client.Object{prometheus}, nil @@ -225,9 +208,8 @@ func serviceMonitorsForKAIConfig( func prometheusServiceAccountForKAIConfig( ctx context.Context, runtimeClient client.Reader, kaiConfig *kaiv1.Config, ) ([]client.Object, error) { - serviceAccountName := mainResourceName + "-prometheus" + serviceAccountName := mainResourceName - // Check if ServiceAccount already exists saObj, err := common.ObjectForKAIConfig(ctx, runtimeClient, &v1.ServiceAccount{}, serviceAccountName, kaiConfig.Spec.Namespace) if err != nil { return []client.Object{}, err diff --git a/pkg/operator/operands/queue_controller/queue_controller.go b/pkg/operator/operands/queue_controller/queue_controller.go index 2550aec15..93753262d 100644 --- a/pkg/operator/operands/queue_controller/queue_controller.go +++ b/pkg/operator/operands/queue_controller/queue_controller.go @@ -82,3 +82,7 @@ func (q *QueueController) Name() string { func (q *QueueController) Monitor(ctx context.Context, runtimeReader client.Reader, kaiConfig *kaiv1.Config) error { return nil } + +func (q *QueueController) HasMissingDependencies(context.Context, client.Reader, *kaiv1.Config) (string, error) { + return "", nil +} diff --git a/pkg/operator/operands/scheduler/scheduler.go b/pkg/operator/operands/scheduler/scheduler.go index f577916dd..2f916f903 100644 --- a/pkg/operator/operands/scheduler/scheduler.go +++ b/pkg/operator/operands/scheduler/scheduler.go @@ -85,6 +85,10 @@ func (s *SchedulerForShard) Monitor(ctx context.Context, runtimeReader client.Re return nil } +func (s *SchedulerForShard) HasMissingDependencies(context.Context, client.Reader, *kaiv1.Config) (string, error) { + return "", nil +} + func (s *SchedulerForShard) Name() string { return "SchedulerForShard" } @@ -129,3 +133,7 @@ func (s *SchedulerForConfig) Name() string { func (s *SchedulerForConfig) Monitor(ctx context.Context, runtimeReader client.Reader, kaiConfig *kaiv1.Config) error { return nil } + +func (s *SchedulerForConfig) HasMissingDependencies(context.Context, client.Reader, *kaiv1.Config) (string, error) { + return "", nil +}