diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 39337d41f..a80215ed5 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -239,20 +239,26 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "results must be greater than zero"} } // primary profile is used to set destination - // TODO should use multiple destinations according to epp protocol. current code assumes a single target - targetPod := result.ProfileResults[result.PrimaryProfileName].TargetPods[0].GetPod() - pool, err := d.datastore.PoolGet() if err != nil { return reqCtx, err } + targetPods := []*backend.Pod{} targetPort := int(pool.Spec.TargetPortNumber) + targetEndpoints := []string{} + + for _, pod := range result.ProfileResults[result.PrimaryProfileName].TargetPods { + curPod := pod.GetPod() + curEndpoint := net.JoinHostPort(curPod.Address, strconv.Itoa(targetPort)) + targetPods = append(targetPods, curPod) + targetEndpoints = append(targetEndpoints, curEndpoint) + } - endpoint := net.JoinHostPort(targetPod.Address, strconv.Itoa(targetPort)) - logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", targetPod) + multiEndpointString := strings.Join(targetEndpoints, ",") + logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", multiEndpointString) - reqCtx.TargetPod = targetPod - reqCtx.TargetEndpoint = endpoint + reqCtx.TargetPod = targetPods[0] + reqCtx.TargetEndpoint = multiEndpointString d.runPreRequestPlugins(ctx, reqCtx.SchedulingRequest, result, targetPort) @@ -274,6 +280,8 @@ func (d *Director) HandleResponse(ctx context.Context, reqCtx *handlers.RequestC Headers: reqCtx.Response.Headers, } + // TODO: to extend fallback functionality, handle cases where target pod is unavailable + // https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1224 d.runPostResponsePlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod) return reqCtx, nil diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 814af12af..da9eaa4f6 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -19,6 +19,7 @@ package requestcontrol import ( "context" "errors" + "fmt" "testing" "time" @@ -29,7 +30,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - k8stypes "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -109,26 +109,29 @@ func TestDirector_HandleRequest(t *testing.T) { }, } - // Pod setup - testPod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: "default", - Labels: map[string]string{"app": "inference"}, - }, - Status: corev1.PodStatus{ - PodIP: "192.168.1.100", - Phase: corev1.PodRunning, - Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}, - }, - } scheme := runtime.NewScheme() _ = clientgoscheme.AddToScheme(scheme) fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() if err := ds.PoolSet(ctx, fakeClient, pool); err != nil { t.Fatalf("Error while setting inference pool: %v", err) } - ds.PodUpdateOrAddIfNotExist(testPod) + + for i := range 5 { + // Pod setup + testPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod%v", i+1), + Namespace: "default", + Labels: map[string]string{"app": "inference"}, + }, + Status: corev1.PodStatus{ + PodIP: fmt.Sprintf("192.168.%v.100", i+1), + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}, + }, + } + ds.PodUpdateOrAddIfNotExist(testPod) + } defaultSuccessfulScheduleResults := &schedulingtypes.SchedulingResult{ ProfileResults: map[string]*schedulingtypes.ProfileRunResult{ @@ -138,7 +141,23 @@ func TestDirector_HandleRequest(t *testing.T) { Pod: &schedulingtypes.PodMetrics{ Pod: &backend.Pod{ Address: "192.168.1.100", - NamespacedName: k8stypes.NamespacedName{Name: "pod1", Namespace: "default"}, + NamespacedName: types.NamespacedName{Name: "pod1", Namespace: "default"}, + }, + }, + }, + &schedulingtypes.ScoredPod{ + Pod: &schedulingtypes.PodMetrics{ + Pod: &backend.Pod{ + Address: "192.168.2.100", + NamespacedName: types.NamespacedName{Name: "pod2", Namespace: "default"}, + }, + }, + }, + &schedulingtypes.ScoredPod{ + Pod: &schedulingtypes.PodMetrics{ + Pod: &backend.Pod{ + Address: "192.168.4.100", + NamespacedName: types.NamespacedName{Name: "pod4", Namespace: "default"}, }, }, }, @@ -174,7 +193,7 @@ func TestDirector_HandleRequest(t *testing.T) { NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"}, Address: "192.168.1.100", }, - TargetEndpoint: "192.168.1.100:8000", + TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000", }, wantMutatedBodyModel: model, }, @@ -199,7 +218,7 @@ func TestDirector_HandleRequest(t *testing.T) { NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"}, Address: "192.168.1.100", }, - TargetEndpoint: "192.168.1.100:8000", + TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000", }, wantMutatedBodyModel: model, }, @@ -228,7 +247,7 @@ func TestDirector_HandleRequest(t *testing.T) { NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"}, Address: "192.168.1.100", }, - TargetEndpoint: "192.168.1.100:8000", + TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000", }, wantMutatedBodyModel: model, }, @@ -249,7 +268,7 @@ func TestDirector_HandleRequest(t *testing.T) { NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"}, Address: "192.168.1.100", }, - TargetEndpoint: "192.168.1.100:8000", + TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000", }, wantMutatedBodyModel: modelSheddable, }, @@ -270,7 +289,7 @@ func TestDirector_HandleRequest(t *testing.T) { NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"}, Address: "192.168.1.100", }, - TargetEndpoint: "192.168.1.100:8000", + TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000", }, wantMutatedBodyModel: "resolved-target-model-A", }, @@ -286,7 +305,7 @@ func TestDirector_HandleRequest(t *testing.T) { NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"}, Address: "192.168.1.100", }, - TargetEndpoint: "192.168.1.100:8000", + TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000", }, wantMutatedBodyModel: "food-review-1", reqBodyMap: map[string]any{