diff --git a/pkg/controller/inference/service_controller.go b/pkg/controller/inference/service_controller.go
index bc98b53e..b3c88217 100644
--- a/pkg/controller/inference/service_controller.go
+++ b/pkg/controller/inference/service_controller.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"reflect"
+	"sync"
 
 	corev1 "k8s.io/api/core/v1"
 	apimeta "k8s.io/apimachinery/pkg/api/meta"
@@ -39,6 +40,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 	lws "sigs.k8s.io/lws/api/leaderworkerset/v1"
 	applyconfigurationv1 "sigs.k8s.io/lws/client-go/applyconfiguration/leaderworkerset/v1"
 
@@ -52,8 +54,10 @@ import (
 // ServiceReconciler reconciles a Service object
 type ServiceReconciler struct {
 	client.Client
-	Scheme *runtime.Scheme
-	Record record.EventRecorder
+	Scheme            *runtime.Scheme
+	Record            record.EventRecorder
+	GlobalConfigMutex sync.RWMutex // guards GlobalConfig
+	GlobalConfig      *helper.GlobalConfig
 }
 
 func NewServiceReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *ServiceReconciler {
@@ -86,24 +90,22 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 
 	logger.V(10).Info("reconcile Service", "Service", klog.KObj(service))
 
-	cm := &corev1.ConfigMap{}
-	if err := r.Get(ctx, types.NamespacedName{Name: "llmaz-global-config", Namespace: "llmaz-system"}, cm); err != nil {
-		if client.IgnoreNotFound(err) != nil {
-			return ctrl.Result{}, fmt.Errorf("failed to get llmaz-global-config configmap: %w", err)
-		}
-	}
-	configs, err := helper.ParseGlobalConfigmap(cm)
-	if err != nil {
-		return ctrl.Result{}, fmt.Errorf("failed to parse global configurations: %w", err)
+	// Use the config cached by the ConfigMap watch instead of fetching it on every reconcile.
+	r.GlobalConfigMutex.RLock()
+	config := r.GlobalConfig
+	r.GlobalConfigMutex.RUnlock()
+
+	if config == nil {
+		return ctrl.Result{}, fmt.Errorf("global config not initialized")
 	}
 
 	// Set the global configurations to the service.
-	if configs.SchedulerName != "" {
+	if config.SchedulerName != "" {
 		if service.Spec.WorkloadTemplate.LeaderTemplate != nil && service.Spec.WorkloadTemplate.LeaderTemplate.Spec.SchedulerName == "" {
-			service.Spec.WorkloadTemplate.LeaderTemplate.Spec.SchedulerName = configs.SchedulerName
+			service.Spec.WorkloadTemplate.LeaderTemplate.Spec.SchedulerName = config.SchedulerName
 		}
 		if service.Spec.WorkloadTemplate.WorkerTemplate.Spec.SchedulerName == "" {
-			service.Spec.WorkloadTemplate.WorkerTemplate.Spec.SchedulerName = configs.SchedulerName
+			service.Spec.WorkloadTemplate.WorkerTemplate.Spec.SchedulerName = config.SchedulerName
 		}
 
 		if err := r.Client.Update(ctx, service); err != nil {
@@ -156,9 +158,45 @@ func (r *ServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
 				return !reflect.DeepEqual(oldBar.Status, newBar.Status)
 			},
 		})).
+		Watches(&corev1.ConfigMap{}, handler.EnqueueRequestsFromMapFunc(r.updateGlobalConfig),
+			builder.WithPredicates(predicate.Funcs{
+				UpdateFunc: func(e event.UpdateEvent) bool {
+					cm := e.ObjectOld.(*corev1.ConfigMap)
+					return cm.Name == helper.GlobalConfigMapName && cm.Namespace == helper.GlobalConfigMapNamespace
+				},
+				CreateFunc: func(e event.CreateEvent) bool {
+					cm := e.Object.(*corev1.ConfigMap)
+					return cm.Name == helper.GlobalConfigMapName && cm.Namespace == helper.GlobalConfigMapNamespace
+				},
+				// Unset predicate.Funcs fields let every event through, so reject
+				// delete and generic events explicitly; otherwise any watched
+				// ConfigMap could reach updateGlobalConfig and overwrite the cached config.
+				DeleteFunc:  func(event.DeleteEvent) bool { return false },
+				GenericFunc: func(event.GenericEvent) bool { return false },
+			})).
 		Complete(r)
 }
 
+// updateGlobalConfig parses the global ConfigMap delivered by the watch and
+// caches the result on the reconciler for Reconcile to read. A ConfigMap that
+// fails to parse is logged and leaves the cached config untouched. It always
+// returns nil, so no Service is enqueued in response to a config change.
+func (r *ServiceReconciler) updateGlobalConfig(ctx context.Context, obj client.Object) []reconcile.Request {
+	logger := log.FromContext(ctx)
+	cm := obj.(*corev1.ConfigMap)
+
+	newConfig, err := helper.ParseGlobalConfigmap(cm)
+	if err != nil {
+		logger.Error(err, "failed to parse global config")
+		return nil
+	}
+	r.GlobalConfigMutex.Lock()
+	defer r.GlobalConfigMutex.Unlock()
+	r.GlobalConfig = newConfig
+	logger.Info("global config updated", "config", newConfig)
+	return nil
+}
+
 func buildWorkloadApplyConfiguration(service *inferenceapi.Service, models []*coreapi.OpenModel) *applyconfigurationv1.LeaderWorkerSetApplyConfiguration {
 	workload := applyconfigurationv1.LeaderWorkerSet(service.Name, service.Namespace)
 
diff --git a/pkg/controller_helper/configmap.go b/pkg/controller_helper/configmap.go
index bd4d9f45..fc9ad587 100644
--- a/pkg/controller_helper/configmap.go
+++ b/pkg/controller_helper/configmap.go
@@ -23,22 +23,32 @@ import (
 	corev1 "k8s.io/api/core/v1"
 )
 
-type GlobalConfigs struct {
+// Name and namespace of the ConfigMap that holds the global configuration.
+const (
+	GlobalConfigMapName      = "llmaz-global-config"
+	GlobalConfigMapNamespace = "llmaz-system"
+)
+
+// GlobalConfig defines the global configuration parameters used across services.
+// These configurations are typically provided via a ConfigMap named "llmaz-global-config".
+type GlobalConfig struct {
 	SchedulerName      string `yaml:"scheduler-name"`
 	InitContainerImage string `yaml:"init-container-image"`
 }
 
-func ParseGlobalConfigmap(cm *corev1.ConfigMap) (*GlobalConfigs, error) {
+// ParseGlobalConfigmap decodes the YAML stored under the "config.data" key of cm
+// into a GlobalConfig. It returns an error if the key is missing or unmarshalling fails.
+func ParseGlobalConfigmap(cm *corev1.ConfigMap) (*GlobalConfig, error) {
 	rawConfig, ok := cm.Data["config.data"]
 	if !ok {
 		return nil, fmt.Errorf("config.data not found in ConfigMap")
 	}
 
-	var configs GlobalConfigs
-	err := yaml.Unmarshal([]byte(rawConfig), &configs)
+	var config GlobalConfig
+	err := yaml.Unmarshal([]byte(rawConfig), &config)
 	if err != nil {
-		return nil, fmt.Errorf("failed to unmarshal config.data: %v", err)
+		return nil, fmt.Errorf("failed to unmarshal config.data: %w", err)
 	}
 
-	return &configs, nil
+	return &config, nil
 }