From a7214d7f160a40dfbc147276e59747f9d9727cc5 Mon Sep 17 00:00:00 2001
From: Radhika Lakhtakia
Date: Thu, 26 Jun 2025 19:16:09 +0000
Subject: [PATCH] Update lora affinity to be a scorer.

---
 cmd/epp/runner/runner.go                        |   4 +-
 .../framework/plugins/scorer/lora_affinity.go   |  90 +++++++++
 .../plugins/scorer/lora_affinity_test.go        | 173 ++++++++++++++++++
 3 files changed, 266 insertions(+), 1 deletion(-)
 create mode 100644 pkg/epp/scheduling/framework/plugins/scorer/lora_affinity.go
 create mode 100644 pkg/epp/scheduling/framework/plugins/scorer/lora_affinity_test.go

diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go
index fee047ffd..b7e1fed38 100644
--- a/cmd/epp/runner/runner.go
+++ b/cmd/epp/runner/runner.go
@@ -311,10 +311,12 @@ func (r *Runner) initializeScheduler() (*scheduling.Scheduler, error) {
 	if schedulerV2 {
 		queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
 		kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
+		loraAffinityScorerWeight := envutil.GetEnvInt("LORA_AFFINITY_SCORE_WEIGHT", scorer.DefaultLoraAffinityScorerWeight, setupLog)
 
 		schedulerProfile := framework.NewSchedulerProfile().
 			WithScorers(framework.NewWeightedScorer(scorer.NewQueueScorer(), queueScorerWeight),
-				framework.NewWeightedScorer(scorer.NewKVCacheScorer(), kvCacheScorerWeight)).
+				framework.NewWeightedScorer(scorer.NewKVCacheScorer(), kvCacheScorerWeight),
+				framework.NewWeightedScorer(scorer.NewLoraAffinityScorer(), loraAffinityScorerWeight)).
 			WithPicker(picker.NewMaxScorePicker(picker.DefaultMaxNumOfEndpoints))
 
 		if prefixCacheScheduling {
diff --git a/pkg/epp/scheduling/framework/plugins/scorer/lora_affinity.go b/pkg/epp/scheduling/framework/plugins/scorer/lora_affinity.go
new file mode 100644
index 000000000..ed8822a59
--- /dev/null
+++ b/pkg/epp/scheduling/framework/plugins/scorer/lora_affinity.go
@@ -0,0 +1,90 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scorer
+
+import (
+	"context"
+	"encoding/json"
+
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
+)
+
+const (
+	DefaultLoraAffinityScorerWeight = 1
+	LoraAffinityScorerType          = "lora-affinity"
+)
+
+// compile-time type assertion
+var _ framework.Scorer = &LoraAffinityScorer{}
+
+// LoraAffinityScorerFactory defines the factory function for LoraAffinityScorer.
+func LoraAffinityScorerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
+	return NewLoraAffinityScorer().WithName(name), nil
+}
+
+// NewLoraAffinityScorer initializes a new LoraAffinityScorer and returns its pointer.
+func NewLoraAffinityScorer() *LoraAffinityScorer {
+	return &LoraAffinityScorer{
+		tn: plugins.TypedName{Type: LoraAffinityScorerType, Name: LoraAffinityScorerType},
+	}
+}
+
+// LoraAffinityScorer scores a list of candidate pods based on LoRA affinity and availability.
+type LoraAffinityScorer struct {
+	tn plugins.TypedName
+}
+
+// TypedName returns the type and name tuple of this plugin instance.
+func (s *LoraAffinityScorer) TypedName() plugins.TypedName {
+	return s.tn
+}
+
+// WithName sets the name of the scorer.
+func (s *LoraAffinityScorer) WithName(name string) *LoraAffinityScorer {
+	s.tn.Name = name
+	return s
+}
+
+// Score scores each candidate pod by how cheaply it can serve the requested adapter.
+func (s *LoraAffinityScorer) Score(_ context.Context, _ *types.CycleState, request *types.LLMRequest, pods []types.Pod) map[types.Pod]float64 {
+	scores := make(map[types.Pod]float64, len(pods))
+
+	// Assign a score to each pod for loading the target adapter.
+	for _, pod := range pods {
+		_, active := pod.GetMetrics().ActiveModels[request.TargetModel]
+		_, waiting := pod.GetMetrics().WaitingModels[request.TargetModel]
+
+		// Determine the model server's suitability score based on adapter load status and capacity.
+		switch {
+		// Ideal: The adapter is already active on this model server.
+		case active:
+			scores[pod] = 1.0
+		// Good: The model server has capacity to load at least one more adapter.
+		case len(pod.GetMetrics().ActiveModels)+len(pod.GetMetrics().WaitingModels) < pod.GetMetrics().MaxActiveModels:
+			scores[pod] = 0.8
+		// Moderate: The adapter is already in the queue to be loaded on this model server.
+		case waiting:
+			scores[pod] = 0.6
+		// Unsuitable: The model server has reached its maximum capacity and cannot load the adapter.
+		default:
+			scores[pod] = 0.0
+		}
+	}
+
+	return scores
+}
diff --git a/pkg/epp/scheduling/framework/plugins/scorer/lora_affinity_test.go b/pkg/epp/scheduling/framework/plugins/scorer/lora_affinity_test.go
new file mode 100644
index 000000000..f184388ba
--- /dev/null
+++ b/pkg/epp/scheduling/framework/plugins/scorer/lora_affinity_test.go
@@ -0,0 +1,173 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scorer
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	k8stypes "k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
+	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
+)
+
+func TestLoraAffinityScorer(t *testing.T) {
+	tests := []struct {
+		name              string
+		request           *types.LLMRequest
+		pods              []types.Pod
+		expectedScoresPod map[string]float64 // Map of pod name to expected score
+	}{
+		{
+			name:    "Target model is active",
+			request: &types.LLMRequest{TargetModel: "active-model-1"},
+			pods: []types.Pod{
+				&types.PodMetrics{
+					Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}},
+					MetricsState: &backendmetrics.MetricsState{
+						ActiveModels:    map[string]int{"active-model-1": 1},
+						WaitingModels:   map[string]int{},
+						MaxActiveModels: 5,
+					},
+				},
+			},
+			expectedScoresPod: map[string]float64{
+				"pod1": 1.0,
+			},
+		},
+		{
+			name:    "Target model is waiting",
+			request: &types.LLMRequest{TargetModel: "active-model-1"},
+			pods: []types.Pod{
+				&types.PodMetrics{
+					Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}},
+					MetricsState: &backendmetrics.MetricsState{
+						ActiveModels:    map[string]int{"active-model-2": 2},
+						WaitingModels:   map[string]int{"active-model-1": 1},
+						MaxActiveModels: 2,
+					},
+				},
+			},
+			expectedScoresPod: map[string]float64{
+				"pod1": 0.6,
+			},
+		},
+		{
+			name:    "Pods have no space for new model",
+			request: &types.LLMRequest{TargetModel: "active-model-1"},
+			pods: []types.Pod{
+				&types.PodMetrics{
+					Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}},
+					MetricsState: &backendmetrics.MetricsState{
+						ActiveModels:    map[string]int{"active-model-2": 2},
+						WaitingModels:   map[string]int{"active-model-3": 1},
+						MaxActiveModels: 2,
+					},
+				},
+				&types.PodMetrics{
+					Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}},
+					MetricsState: &backendmetrics.MetricsState{
+						ActiveModels:    map[string]int{},
+						WaitingModels:   map[string]int{},
+						MaxActiveModels: 0,
+					},
+				},
+			},
+			expectedScoresPod: map[string]float64{
+				"pod1": 0.0,
+				"pod2": 0.0,
+			},
+		},
+		{
+			name:    "Multiple pods with mixed active and waiting models",
+			request: &types.LLMRequest{TargetModel: "active-model-1"},
+			pods: []types.Pod{
+				&types.PodMetrics{
+					Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}},
+					MetricsState: &backendmetrics.MetricsState{
+						ActiveModels:    map[string]int{"active-model-1": 1},
+						WaitingModels:   map[string]int{},
+						MaxActiveModels: 5,
+					},
+				},
+				&types.PodMetrics{
+					Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}},
+					MetricsState: &backendmetrics.MetricsState{
+						ActiveModels:    map[string]int{"active-model-2": 4},
+						WaitingModels:   map[string]int{"active-model-1": 1},
+						MaxActiveModels: 5,
+					},
+				},
+				&types.PodMetrics{
+					Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}},
+					MetricsState: &backendmetrics.MetricsState{
+						ActiveModels:    map[string]int{"active-model-2": 1},
+						WaitingModels:   map[string]int{},
+						MaxActiveModels: 2,
+					},
+				},
+				&types.PodMetrics{
+					Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod4"}},
+					MetricsState: &backendmetrics.MetricsState{
+						ActiveModels:    map[string]int{"active-model-3": 1},
+						WaitingModels:   map[string]int{"active-model-1": 1},
+						MaxActiveModels: 2,
+					},
+				},
+				&types.PodMetrics{
k8stypes.NamespacedName{Name: "pod5"}}, + MetricsState: &backendmetrics.MetricsState{ + ActiveModels: map[string]int{"active-model-4": 1, "active-model-5": 1}, + WaitingModels: map[string]int{}, + MaxActiveModels: 2, + }, + }, + }, + expectedScoresPod: map[string]float64{ + "pod1": 1.0, + "pod2": 0.8, + "pod3": 0.8, + "pod4": 0.6, + "pod5": 0.0, + }, + }, + { + name: "Empty pods slice", + request: &types.LLMRequest{TargetModel: "modelA"}, + pods: []types.Pod{}, + expectedScoresPod: map[string]float64{}, // No pods, no scores + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + scorer := &LoraAffinityScorer{} + scores := scorer.Score(context.Background(), types.NewCycleState(), test.request, test.pods) + + for _, pod := range test.pods { + expectedScore, ok := test.expectedScoresPod[pod.GetPod().NamespacedName.Name] + if !ok { + t.Fatalf("Expected score not found for pod %s in test %s", pod.GetPod().NamespacedName, test.name) + } + assert.InDelta(t, expectedScore, scores[pod], 0.0001, "Pod %s should have score %f", pod.GetPod().NamespacedName.Name, expectedScore) + } + assert.Len(t, scores, len(test.expectedScoresPod), "Number of scored pods should match expected") + }) + } +}