diff --git a/config/crd/bases/inference.networking.k8s.io_inferencepools.yaml b/config/crd/bases/inference.networking.k8s.io_inferencepools.yaml new file mode 100644 index 000000000..694c084c9 --- /dev/null +++ b/config/crd/bases/inference.networking.k8s.io_inferencepools.yaml @@ -0,0 +1,285 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.16.1 + name: inferencepools.inference.networking.k8s.io +spec: + group: inference.networking.k8s.io + names: + kind: InferencePool + listKind: InferencePoolList + plural: inferencepools + singular: inferencepool + scope: Namespaced + versions: + - name: v1 + schema: + openAPIV3Schema: + description: InferencePool is the Schema for the InferencePools API. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: InferencePoolSpec defines the desired state of InferencePool + properties: + extensionRef: + description: Extension configures an endpoint picker as an extension + service. + properties: + failureMode: + default: FailClose + description: |- + Configures how the gateway handles the case when the extension is not responsive. + Defaults to failClose. + enum: + - FailOpen + - FailClose + type: string + group: + default: "" + description: |- + Group is the group of the referent. + The default value is "", representing the Core API group. + maxLength: 253 + pattern: ^$|^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$ + type: string + kind: + default: Service + description: |- + Kind is the Kubernetes resource kind of the referent. For example + "Service". + + Defaults to "Service" when not specified. + + ExternalName services can refer to CNAME DNS records that may live + outside of the cluster and as such are difficult to reason about in + terms of conformance. They also may not be safe to forward to (see + CVE-2021-25740 for more information). Implementations MUST NOT + support ExternalName Services. + maxLength: 63 + minLength: 1 + pattern: ^[a-zA-Z]([-a-zA-Z0-9]*[a-zA-Z0-9])?$ + type: string + name: + description: Name is the name of the referent. + maxLength: 253 + minLength: 1 + type: string + portNumber: + description: |- + The port number on the service running the extension. When unspecified, + implementations SHOULD infer a default value of 9002 when the Kind is + Service. + format: int32 + maximum: 65535 + minimum: 1 + type: integer + required: + - name + type: object + selector: + additionalProperties: + description: |- + LabelValue is the value of a label. This is used for validation + of maps. 
This matches the Kubernetes label validation rules: + * must be 63 characters or less (can be empty), + * unless empty, must begin and end with an alphanumeric character ([a-z0-9A-Z]), + * could contain dashes (-), underscores (_), dots (.), and alphanumerics between. + + Valid values include: + + * MyValue + * my.name + * 123-my-value + maxLength: 63 + minLength: 0 + pattern: ^(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?$ + type: string + description: |- + Selector defines a map of labels to watch model server pods + that should be included in the InferencePool. + In some cases, implementations may translate this field to a Service selector, so this matches the simple + map used for Service selectors instead of the full Kubernetes LabelSelector type. + If sepecified, it will be applied to match the model server pods in the same namespace as the InferencePool. + Cross namesoace selector is not supported. + type: object + targetPortNumber: + description: |- + TargetPortNumber defines the port number to access the selected model servers. + The number must be in the range 1 to 65535. + format: int32 + maximum: 65535 + minimum: 1 + type: integer + required: + - extensionRef + - selector + - targetPortNumber + type: object + status: + default: + parent: + - conditions: + - lastTransitionTime: "1970-01-01T00:00:00Z" + message: Waiting for controller + reason: Pending + status: Unknown + type: Accepted + parentRef: + kind: Status + name: default + description: Status defines the observed state of InferencePool. + properties: + parent: + description: |- + Parents is a list of parent resources (usually Gateways) that are + associated with the InferencePool, and the status of the InferencePool with respect to + each parent. + + A maximum of 32 Gateways will be represented in this list. When the list contains + `kind: Status, name: default`, it indicates that the InferencePool is not + associated with any Gateway and a controller must perform the following: + + - Remove the parent when setting the "Accepted" condition. + - Add the parent when the controller will no longer manage the InferencePool + and no other parents exist. + items: + description: PoolStatus defines the observed state of InferencePool + from a Gateway. + properties: + conditions: + default: + - lastTransitionTime: "1970-01-01T00:00:00Z" + message: Waiting for controller + reason: Pending + status: Unknown + type: Accepted + description: |- + Conditions track the state of the InferencePool. + + Known condition types are: + + * "Accepted" + * "ResolvedRefs" + items: + description: Condition contains details for one aspect of + the current state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. 
+ format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, + Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + maxItems: 8 + type: array + x-kubernetes-list-map-keys: + - type + x-kubernetes-list-type: map + parentRef: + description: GatewayRef indicates the gateway that observed + state of InferencePool. + properties: + group: + default: gateway.networking.k8s.io + description: Group is the group of the referent. + maxLength: 253 + pattern: ^$|^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$ + type: string + kind: + default: Gateway + description: Kind is kind of the referent. For example "Gateway". + maxLength: 63 + minLength: 1 + pattern: ^[a-zA-Z]([-a-zA-Z0-9]*[a-zA-Z0-9])?$ + type: string + name: + description: Name is the name of the referent. + maxLength: 253 + minLength: 1 + type: string + namespace: + description: |- + Namespace is the namespace of the referent. If not present, + the namespace of the referent is assumed to be the same as + the namespace of the referring object. + maxLength: 63 + minLength: 1 + pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?$ + type: string + required: + - name + type: object + required: + - parentRef + type: object + maxItems: 32 + type: array + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/config/manifests/inferencepool-resources.yaml b/config/manifests/inferencepool-resources.yaml index 9bb3ea101..e3d32c5da 100644 --- a/config/manifests/inferencepool-resources.yaml +++ b/config/manifests/inferencepool-resources.yaml @@ -1,6 +1,8 @@ -# Note: If you change this file, please also change the file used for e2e tests! 
-# -# https://github.com/kubernetes-sigs/gateway-api-inference-extension/blob/main/test/testdata/inferencepool-e2e.yaml +# Note: If you change this file, please also change: +# - ./test/testdata/inferencepool-e2e.yaml +# - ./conformance/resources/manifests/manifests.yaml +# - ./site-src/guides/inferencepool-rollout.md +--- apiVersion: inference.networking.x-k8s.io/v1alpha2 kind: InferencePool metadata: diff --git a/conformance/resources/manifests/manifests.yaml b/conformance/resources/manifests/manifests.yaml index 5fbcfdc2d..9816a1f4e 100644 --- a/conformance/resources/manifests/manifests.yaml +++ b/conformance/resources/manifests/manifests.yaml @@ -213,9 +213,6 @@ spec: - "9003" - "-configFile" - "/config/conformance-plugins.yaml" - env: - - name: USE_STREAMING - value: "true" ports: - containerPort: 9002 - containerPort: 9003 @@ -310,9 +307,6 @@ spec: - "9003" - "-configFile" - "/config/conformance-plugins.yaml" - env: - - name: USE_STREAMING - value: "true" ports: - containerPort: 9002 - containerPort: 9003 @@ -342,7 +336,7 @@ apiVersion: v1 kind: ConfigMap metadata: name: plugins-config - namespace: default + namespace: gateway-conformance-app-backend data: conformance-plugins.yaml: | apiVersion: inference.networking.x-k8s.io/v1alpha1 diff --git a/go.mod b/go.mod index a0cc2e134..1a5f82446 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/stretchr/testify v1.10.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - golang.org/x/sync v0.15.0 + golang.org/x/sync v0.16.0 google.golang.org/grpc v1.73.0 google.golang.org/protobuf v1.36.6 k8s.io/api v0.33.2 diff --git a/go.sum b/go.sum index 9a7492158..e720e60be 100644 --- a/go.sum +++ b/go.sum @@ -245,8 +245,8 @@ golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKl golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/epp/flowcontrol/framework/doc.go b/pkg/epp/flowcontrol/framework/doc.go new file mode 100644 index 000000000..374c52a29 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/doc.go @@ -0,0 +1,32 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +// Package framework defines the core plugin interfaces for extending the `controller.FlowController`. +// +// It establishes the contracts that custom logic, such as queueing disciplines and dispatching policies, must adhere +// to. By building on these interfaces, the Flow Control system can be extended and customized without modifying the +// core controller logic. +// +// The primary contracts are: +// - `SafeQueue`: An interface for concurrent-safe queue implementations. +// - `IntraFlowDispatchPolicy`: An interface for policies that decide which item to select from within a single flow's +// queue. +// - `ItemComparator`: An interface vended by policies to make their internal item-ordering logic explicit and +// available to other components. +// +// These components are linked by `QueueCapability`, which allows policies to declare their queue requirements (e.g., +// FIFO or priority-based ordering). +package framework diff --git a/pkg/epp/flowcontrol/framework/errors.go b/pkg/epp/flowcontrol/framework/errors.go new file mode 100644 index 000000000..e994aafa8 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/errors.go @@ -0,0 +1,44 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "errors" +) + +// `SafeQueue` Errors +// +// These errors relate to operations directly on a `SafeQueue` implementation. They are returned by `SafeQueue` methods +// and might be handled or wrapped by the `ports.FlowRegistry`'s `ports.ManagedQueue` or the +// `controller.FlowController`. +var ( + // ErrNilQueueItem indicates that a nil `types.QueueItemAccessor` was passed to `SafeQueue.Add()`. + ErrNilQueueItem = errors.New("queue item cannot be nil") + + // ErrQueueEmpty indicates an attempt to perform an operation on an empty `SafeQueue` that requires one or more items + // (e.g., calling `SafeQueue.PeekHead()`). + ErrQueueEmpty = errors.New("queue is empty") + + // ErrInvalidQueueItemHandle indicates that a `types.QueueItemHandle` provided to a `SafeQueue` operation (e.g., + // `SafeQueue.Remove()`) is not valid for that queue, has been invalidated, or does not correspond to an actual item + // in the queue. + ErrInvalidQueueItemHandle = errors.New("invalid queue item handle") + + // ErrQueueItemNotFound indicates that a `SafeQueue.Remove(handle)` operation did not find an item matching the + // provided, valid `types.QueueItemHandle`. This can occur if the item was removed by a concurrent operation. + ErrQueueItemNotFound = errors.New("queue item not found for the given handle") +) diff --git a/pkg/epp/flowcontrol/framework/mocks/mocks.go b/pkg/epp/flowcontrol/framework/mocks/mocks.go new file mode 100644 index 000000000..86a818951 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/mocks/mocks.go @@ -0,0 +1,65 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mocks + +import ( + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" +) + +// MockItemComparator provides a mock implementation of the `framework.ItemComparator` interface. +type MockItemComparator struct { + FuncV framework.ItemComparatorFunc + ScoreTypeV string +} + +func (m *MockItemComparator) Func() framework.ItemComparatorFunc { return m.FuncV } +func (m *MockItemComparator) ScoreType() string { return m.ScoreTypeV } + +var _ framework.ItemComparator = &MockItemComparator{} + +// MockFlowQueueAccessor is a mock implementation of the `framework.FlowQueueAccessor` interface. +type MockFlowQueueAccessor struct { + NameV string + CapabilitiesV []framework.QueueCapability + LenV int + ByteSizeV uint64 + PeekHeadV types.QueueItemAccessor + PeekHeadErrV error + PeekTailV types.QueueItemAccessor + PeekTailErrV error + FlowSpecV types.FlowSpecification + ComparatorV framework.ItemComparator +} + +func (m *MockFlowQueueAccessor) Name() string { return m.NameV } +func (m *MockFlowQueueAccessor) Capabilities() []framework.QueueCapability { return m.CapabilitiesV } +func (m *MockFlowQueueAccessor) Len() int { return m.LenV } +func (m *MockFlowQueueAccessor) ByteSize() uint64 { return m.ByteSizeV } + +func (m *MockFlowQueueAccessor) PeekHead() (types.QueueItemAccessor, error) { + return m.PeekHeadV, m.PeekHeadErrV +} + +func (m *MockFlowQueueAccessor) PeekTail() (types.QueueItemAccessor, error) { + return m.PeekTailV, m.PeekTailErrV +} + +func (m *MockFlowQueueAccessor) Comparator() framework.ItemComparator { return m.ComparatorV } +func (m *MockFlowQueueAccessor) FlowSpec() types.FlowSpecification { return m.FlowSpecV } + +var _ framework.FlowQueueAccessor = &MockFlowQueueAccessor{} diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md new file mode 100644 index 000000000..1432db0b7 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md @@ -0,0 +1,63 @@ +# Flow Controller Intra-Flow Dispatch Policy Plugins + +This directory contains concrete implementations of the [`framework.IntraFlowDispatchPolicy`](../../../policies.go) +interface. These policies are responsible for **temporal scheduling**: determining the order in which requests are +selected for dispatch *from within a single flow's queue*. + +## Overview + +The `controller.FlowController` uses a two-tier policy system to manage requests. `framework.IntraFlowDispatchPolicy` +plugins represent the first tier, making tactical decisions about the ordering of requests *within* a single logical +flow (e.g., for a specific model or tenant). + +This contrasts with the `framework.InterFlowDispatchPolicy` (not yet implemented), which is responsible for +**spatial fairness**: deciding *which flow's queue* gets the next opportunity to dispatch a request. The +`framework.IntraFlowDispatchPolicy` only operates *after* the inter-flow policy has selected a specific queue. 
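Before the responsibilities are enumerated below, the following sketch previews what a policy implementation looks like in practice. It is illustrative only: the package name `smallestjob`, the policy name `"SmallestJobFirst"`, and the byte-size ordering are assumptions, while the method set, the `MustRegisterPolicy` call, and the referenced framework identifiers mirror the FCFS implementation and tests added later in this diff.

```go
package smallestjob

import (
	"errors"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

// SmallestJobFirstPolicyName is the (hypothetical) name under which this example policy registers itself.
const SmallestJobFirstPolicyName = "SmallestJobFirst"

func init() {
	dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(SmallestJobFirstPolicyName),
		func() (framework.IntraFlowDispatchPolicy, error) {
			return &smallestJobFirst{}, nil
		})
}

// smallestJobFirst dispatches the queued request with the smallest byte size first.
type smallestJobFirst struct{}

func (p *smallestJobFirst) Name() string { return SmallestJobFirstPolicyName }

// SelectItem peeks the head of the queue. The queue keeps items ordered by the comparator vended
// below, because the policy declares the priority-configurable capability.
func (p *smallestJobFirst) SelectItem(queue framework.FlowQueueAccessor) (types.QueueItemAccessor, error) {
	if queue == nil {
		return nil, nil
	}
	item, err := queue.PeekHead()
	if errors.Is(err, framework.ErrQueueEmpty) {
		return nil, nil
	}
	return item, err
}

// Comparator makes the "smaller requests first" ordering explicit to other components.
func (p *smallestJobFirst) Comparator() framework.ItemComparator { return byteSizeComparator{} }

// RequiredQueueCapabilities asks the registry to pair this policy with a queue that can honor an
// arbitrary comparator (e.g., a heap), rather than a plain FIFO queue.
func (p *smallestJobFirst) RequiredQueueCapabilities() []framework.QueueCapability {
	return []framework.QueueCapability{framework.CapabilityPriorityConfigurable}
}

// byteSizeComparator orders items by request byte size, smallest first.
type byteSizeComparator struct{}

func (byteSizeComparator) Func() framework.ItemComparatorFunc {
	return func(a, b types.QueueItemAccessor) bool {
		return a.OriginalRequest().ByteSize() < b.OriginalRequest().ByteSize()
	}
}

// ScoreType is a free-form label; "byte_size_asc" follows the naming convention used in the tests.
func (byteSizeComparator) ScoreType() string { return "byte_size_asc" }
```

A policy like this would be paired by the registry with a priority-configurable queue (such as the max-min heap), since it declares `framework.CapabilityPriorityConfigurable` rather than `framework.CapabilityFIFO`.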
+ +Key responsibilities and characteristics of a `framework.IntraFlowDispatchPolicy`: + +1. **Request Selection (`SelectItem`)**: The primary method, `SelectItem(queue framework.FlowQueueAccessor)`, inspects + the given flow's queue (via a read-only accessor) and decides which item, if any, should be dispatched next from + *that specific queue*. + +2. **Priority Definition (`ItemComparator`)**: + - This policy type is unique because it defines the nature of priority for items *within its specific managed + queue*. It makes this logic explicit by vending a [`framework.ItemComparator`](../../../policies.go). + - The vended comparator defines the "less than" relationship between two items and exposes a `ScoreType()` string + (e.g., `"enqueue_time_ns_asc"`, `"slo_deadline_urgency"`) that gives a semantic meaning to the comparison. + +3. **Queue Compatibility (`RequiredQueueCapabilities`)**: The policy specifies the capabilities its associated + [`framework.SafeQueue`](../../../queue.go) must support for it to function correctly. For example, a simple FCFS + policy would require `framework.CapabilityFIFO`, while a more complex, priority-based policy would require + `framework.CapabilityPriorityConfigurable`. The `ports.FlowRegistry` uses this information to pair policies with + compatible queues. + +The `framework.IntraFlowDispatchPolicy` allows for fine-grained control over how individual requests within a single flow are +serviced, enabling strategies like basic FCFS or more advanced schemes based on SLOs or deadlines. + +## Contributing a New `framework.IntraFlowDispatchPolicy` Implementation + +To contribute a new dispatch policy implementation, follow these steps: + +1. **Define Your Implementation** + - Create a new Go package in a subdirectory (e.g., `mycustompolicy/`). + - Implement the `framework.IntraFlowDispatchPolicy` interface. + - Ensure all methods are goroutine-safe if your policy maintains any internal state. + +2. **Register Your Policy** + - In an `init()` function within your policy's Go file, call [`MustRegisterPolicy()`](./factory.go) with a + unique name and a constructor function that matches the `PolicyConstructor` signature. + +3. **Add to the Functional Test** + - Add a blank import for your new package to [`functional_test.go`](./functional_test.go). Your policy will then + be automatically included in the functional test suite, which validates the basic + `framework.IntraFlowDispatchPolicy` contract (e.g., correct initialization, handling of nil/empty queues). + +4. **Add Policy-Specific Tests** + - The functional test suite only validates the universal contract. You MUST add a separate `_test.go` file within + your package to test the specific logic of your policy. + - For example, your tests should validate that your `Comparator()` works as expected and that `SelectItem()` + correctly implements your desired selection logic for a non-empty queue. + +5. **Documentation** + - Add a package-level GoDoc comment to your new policy's Go file, explaining its behavior and any trade-offs. diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/factory.go b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/factory.go new file mode 100644 index 000000000..b55740cbc --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/factory.go @@ -0,0 +1,63 @@ +/* +Copyright 2025 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package dispatch provides the factory and registration mechanism for all `framework.IntraFlowDispatchPolicy` +// implementations. +// It allows new policies to be added to the system and instantiated by name. +package dispatch + +import ( + "fmt" + "sync" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" +) + +// RegisteredPolicyName is the unique name under which a policy is registered. +type RegisteredPolicyName string + +// PolicyConstructor defines the function signature for creating a `framework.IntraFlowDispatchPolicy`. +type PolicyConstructor func() (framework.IntraFlowDispatchPolicy, error) + +var ( + // mu guards the registration map. + mu sync.RWMutex + // RegisteredPolicies stores the constructors for all registered policies. + RegisteredPolicies = make(map[RegisteredPolicyName]PolicyConstructor) +) + +// MustRegisterPolicy registers a policy constructor, and panics if the name is already registered. +// This is intended to be called from the `init()` function of a policy implementation. +func MustRegisterPolicy(name RegisteredPolicyName, constructor PolicyConstructor) { + mu.Lock() + defer mu.Unlock() + if _, ok := RegisteredPolicies[name]; ok { + panic(fmt.Sprintf("IntraFlowDispatchPolicy already registered with name %q", name)) + } + RegisteredPolicies[name] = constructor +} + +// NewPolicyFromName creates a new `IntraFlowDispatchPolicy` given its registered name. +// This is called by the `registry.FlowRegistry` when configuring a flow. +func NewPolicyFromName(name RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) { + mu.RLock() + defer mu.RUnlock() + constructor, ok := RegisteredPolicies[name] + if !ok { + return nil, fmt.Errorf("no IntraFlowDispatchPolicy registered with name %q", name) + } + return constructor() +} diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go new file mode 100644 index 000000000..edcc02ac6 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go @@ -0,0 +1,104 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package fcfs provides a First-Come, First-Served implementation of the `framework.IntraFlowDispatchPolicy`. 
+package fcfs + +import ( + "errors" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" +) + +// FCFSPolicyName is the name of the FCFS policy implementation. +const FCFSPolicyName = "FCFS" + +func init() { + dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(FCFSPolicyName), + func() (framework.IntraFlowDispatchPolicy, error) { + return newFCFS(), nil + }) +} + +// fcfs (First-Come, First-Served) implements the `framework.IntraFlowDispatchPolicy` interface. +type fcfs struct { + comparator framework.ItemComparator +} + +// newFCFS creates a new `fcfs` policy instance. +func newFCFS() *fcfs { + return &fcfs{ + comparator: &enqueueTimeComparator{}, + } +} + +// Name returns the name of the policy. +func (p *fcfs) Name() string { + return FCFSPolicyName +} + +// SelectItem selects the next item from the queue by peeking its head. This implementation relies on the queue being +// ordered by dispatch preference, as indicated by its `RequiredQueueCapabilities`. +func (p *fcfs) SelectItem(queue framework.FlowQueueAccessor) (types.QueueItemAccessor, error) { + if queue == nil { + return nil, nil + } + item, err := queue.PeekHead() + if errors.Is(err, framework.ErrQueueEmpty) { + return nil, nil + } + return item, err +} + +// Comparator returns a `framework.ItemComparator` based on enqueue time. +func (p *fcfs) Comparator() framework.ItemComparator { + return p.comparator +} + +// RequiredQueueCapabilities specifies that this policy needs a queue that supports FIFO operations. +func (p *fcfs) RequiredQueueCapabilities() []framework.QueueCapability { + return []framework.QueueCapability{framework.CapabilityFIFO} +} + +// --- enqueueTimeComparator --- + +// enqueueTimeComparator implements `framework.ItemComparator` for FCFS logic. +// It prioritizes items with earlier enqueue times. +type enqueueTimeComparator struct{} + +// Func returns the comparison logic. +// It returns true if item 'a' should be dispatched before item 'b'. +func (c *enqueueTimeComparator) Func() framework.ItemComparatorFunc { + return func(a, b types.QueueItemAccessor) bool { + if a == nil && b == nil { + return false + } + if a == nil { // Treat nil as lowest priority + return false + } + if b == nil { // Treat non-nil 'a' as higher priority than nil 'b' + return true + } + return a.EnqueueTime().Before(b.EnqueueTime()) + } +} + +// ScoreType returns a string descriptor for the comparison logic. +func (c *enqueueTimeComparator) ScoreType() string { + return string(framework.EnqueueTimePriorityScoreType) +} diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs_test.go b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs_test.go new file mode 100644 index 000000000..8a13c6c34 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs_test.go @@ -0,0 +1,106 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fcfs + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" + typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks" +) + +func TestFCFS_Name(t *testing.T) { + t.Parallel() + policy := newFCFS() + assert.Equal(t, FCFSPolicyName, policy.Name()) +} + +func TestFCFS_RequiredQueueCapabilities(t *testing.T) { + t.Parallel() + policy := newFCFS() + caps := policy.RequiredQueueCapabilities() + require.Len(t, caps, 1, "RequiredQueueCapabilities should return one capability") + assert.Equal(t, framework.CapabilityFIFO, caps[0], "Required capability should be FIFO") +} + +func TestFCFS_SelectItem(t *testing.T) { + t.Parallel() + // Note: The conformance suite validates the policy's contract for nil and empty queues. + // This unit test focuses on the policy-specific success path. + policy := newFCFS() + + mockItem := typesmocks.NewMockQueueItemAccessor(1, "item1", "flow1") + mockQueue := &frameworkmocks.MockFlowQueueAccessor{ + PeekHeadV: mockItem, + LenV: 1, + } + + item, err := policy.SelectItem(mockQueue) + require.NoError(t, err) + assert.Equal(t, mockItem, item, "Should return the item from the head of the queue") +} + +func TestEnqueueTimeComparator_Func(t *testing.T) { + t.Parallel() + comparator := &enqueueTimeComparator{} // Test the internal comparator directly + compareFunc := comparator.Func() + require.NotNil(t, compareFunc) + + now := time.Now() + itemA := typesmocks.NewMockQueueItemAccessor(10, "itemA", "test-flow") + itemA.EnqueueTimeV = now + + itemB := typesmocks.NewMockQueueItemAccessor(20, "itemB", "test-flow") + itemB.EnqueueTimeV = now.Add(time.Second) // B is later than A + + itemC := typesmocks.NewMockQueueItemAccessor(30, "itemC", "test-flow") + itemC.EnqueueTimeV = now // C is same time as A + + testCases := []struct { + name string + item1 types.QueueItemAccessor + item2 types.QueueItemAccessor + expected bool // true if item1 is higher priority (earlier) than item2 + }{ + {"A before B", itemA, itemB, true}, + {"B after A", itemB, itemA, false}, + {"A same as C (A not strictly before C)", itemA, itemC, false}, + {"C same as A (C not strictly before A)", itemC, itemA, false}, + {"A vs nil B (A is preferred)", itemA, nil, true}, + {"nil A vs B (B is preferred)", nil, itemB, false}, + {"nil A vs nil B (no preference)", nil, nil, false}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tc.expected, compareFunc(tc.item1, tc.item2)) + }) + } +} + +func TestEnqueueTimeComparator_ScoreType(t *testing.T) { + t.Parallel() + comparator := &enqueueTimeComparator{} + assert.Equal(t, string(framework.EnqueueTimePriorityScoreType), comparator.ScoreType()) +} diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go 
b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go new file mode 100644 index 000000000..5a78d3ed5 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go @@ -0,0 +1,79 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dispatch_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" + + _ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs" +) + +// TestIntraFlowDispatchPolicyConformance is the main conformance test suite for `framework.IntraFlowDispatchPolicy` +// implementations. +// It iterates over all policy implementations registered via `dispatch.MustRegisterPolicy` and runs a series of +// sub-tests to ensure they adhere to the `framework.IntraFlowDispatchPolicy` contract. +func TestIntraFlowDispatchPolicyConformance(t *testing.T) { + t.Parallel() + + for policyName, constructor := range dispatch.RegisteredPolicies { + t.Run(string(policyName), func(t *testing.T) { + t.Parallel() + + policy, err := constructor() + require.NoError(t, err, "Policy constructor for %s failed", policyName) + require.NotNil(t, policy, "Constructor for %s should return a non-nil policy instance", policyName) + + t.Run("Initialization", func(t *testing.T) { + t.Parallel() + assert.NotEmpty(t, policy.Name(), "Name() for %s should not be empty", policyName) + + comp := policy.Comparator() + require.NotNil(t, comp, "Comparator() for %s should not return nil", policyName) + assert.NotNil(t, comp.Func(), "Comparator().Func() for %s should not be nil", policyName) + assert.NotEmpty(t, comp.ScoreType(), "Comparator().ScoreType() for %s should not be empty", policyName) + + caps := policy.RequiredQueueCapabilities() + assert.NotNil(t, caps, "RequiredQueueCapabilities() for %s should not return a nil slice", policyName) + }) + + t.Run("SelectItemFromNilQueue", func(t *testing.T) { + t.Parallel() + item, err := policy.SelectItem(nil) + require.NoError(t, err, "SelectItem(nil) for %s should not return an error", policyName) + assert.Nil(t, item, "SelectItem(nil) for %s should return a nil item", policyName) + }) + + t.Run("SelectItemFromEmptyQueue", func(t *testing.T) { + t.Parallel() + mockQueue := &frameworkmocks.MockFlowQueueAccessor{ + PeekHeadErrV: framework.ErrQueueEmpty, + LenV: 0, + } + item, err := policy.SelectItem(mockQueue) + require.NoError(t, err, "SelectItem from an empty queue for %s should not return an error", policyName) + assert.Nil(t, item, "SelectItem from an empty queue for %s should return a nil item", policyName) + }) + }) + } +} diff --git 
a/pkg/epp/flowcontrol/framework/plugins/queue/README.md b/pkg/epp/flowcontrol/framework/plugins/queue/README.md new file mode 100644 index 000000000..3632487fc --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/queue/README.md @@ -0,0 +1,113 @@ +# Flow Controller Queue Plugins (`plugins/queue/`) + +This directory contains concrete implementations of the [`framework.SafeQueue`](../../queue.go) interface. This contract +defines core, self-contained queue data structures used by the `controller.FlowController`. + +## Overview + +The `controller.FlowController` manages requests by organizing them into queues. Each logical "flow" within a given +priority band has its own `ports.ManagedQueue` instance, which wraps a `framework.SafeQueue`. This design allows the +`controller.FlowController` to apply policies at both the inter-flow (across different flows) and intra-flow (within a +single flow's queue) levels. + +The `framework.SafeQueue` interface abstracts the underlying data structure and its ordering logic. This pluggable +design allows for: + +- **Different Queuing Disciplines**: A basic FIFO queue ([`listqueue`](./listqueue/)) is provided, but other disciplines + like priority queues ([`maxminheap`](./maxminheap/)) can be used for more complex ordering requirements. +- **Specialized Capabilities**: Policies can declare `RequiredQueueCapabilities()` (e.g., `framework.CapabilityFIFO` or + `framework.CapabilityPriorityConfigurable`). The `ports.FlowRegistry` pairs the policy with a queue that provides the + necessary capabilities. +- **Performance Optimization**: Different queue implementations offer varying performance characteristics, which can be + compared using the centralized benchmark suite to select the best fit for a given workload. + +## Contributing a New `SafeQueue` Implementation + +To contribute a new queue implementation, follow these steps: + +1. **Define Your Implementation** + - Create a new Go package in a subdirectory (e.g., `mycustomqueue/`). + - Implement the `framework.SafeQueue` and `types.QueueItemHandle` interfaces. + - Ensure all methods of `framework.SafeQueue` are goroutine-safe, typically by using a `sync.Mutex` or + `sync.RWMutex`. + - If your queue declares `framework.CapabilityPriorityConfigurable`, it MUST use the + [`framework.ItemComparator`](../../policies.go) passed to its constructor for all internal ordering logic. + +2. **Register Your Queue** + - In an `init()` function within your queue's Go file, call [`queue.MustRegisterQueue()`](./factory.go) with a + unique name and a constructor function that matches the `queue.QueueConstructor` signature. + +3. **Add to the Functional Test** + - Add a blank import for your new package to [`functional_test.go`](./functional_test.go). Your queue will then be + automatically included in the functional test suite, which validates the `framework.SafeQueue` contract. + +4. **Documentation** + - Add GoDoc comments to your new queue type, explaining its behavior, capabilities, and any trade-offs. + +5. **Benchmarking** + - You do not need to write custom benchmarks. The centralized suite in [`benchmark_test.go`](./benchmark_test.go) + automatically includes any new queue implementation after it is registered. This ensures all queues are compared + fairly under the same conditions. + +## Benchmarking Strategy and Results + +A centralized benchmark suite runs against all registered `framework.SafeQueue` implementations to provide a consistent +performance comparison. 
To run the benchmarks, use the following command:
+
+```sh
+go test -bench=. -benchmem ./pkg/epp/flowcontrol/framework/plugins/queue/...
+```
+
+### Benchmark Scenarios
+
+The suite includes the following scenarios:
+
+- **`AddRemove`**: Measures throughput of tightly coupled `Add` and `Remove` operations under high parallelism. This
+  tests the raw overhead of the data structure and its locking mechanism for simple, transactional workloads.
+- **`AddPeekRemove`**: Measures performance of a sequential `Add` -> `PeekHead` -> `Remove` loop. This simulates a
+  common consumer pattern where a single worker inspects an item before processing it.
+- **`AddPeekTailRemove`**: Measures performance of a sequential `Add` -> `PeekTail` -> `Remove` loop. This exercises
+  access to the lowest-priority item at the tail of the queue.
+- **`BulkAddThenBulkRemove`**: Tests performance of adding a large batch of items and then removing them all. This can
+  reveal how the data structure's performance changes as it grows and shrinks under load.
+- **`HighContention`**: Simulates a realistic workload with multiple concurrent producers (adding items) and consumers
+  (peeking and removing items) operating on the same queue.
+
+### Latest Results
+
+*Last Updated: 2025-07-10*
+*(CPU: AMD EPYC 7B12)*
+
+| Benchmark                   | Implementation | Iterations | ns/op   | B/op  | allocs/op |
+| --------------------------- | -------------- | ---------- | ------- | ----- | --------- |
+| **AddRemove**               | `ListQueue`    | 1,889,844  | 609.0   | 224   | 5         |
+|                             | `MaxMinHeap`   | 1,660,987  | 696.7   | 184   | 4         |
+| **AddPeekRemove**           | `ListQueue`    | 3,884,938  | 298.0   | 224   | 5         |
+|                             | `MaxMinHeap`   | 1,857,448  | 615.9   | 184   | 4         |
+| **AddPeekTailRemove**       | `ListQueue`    | 3,576,487  | 308.4   | 224   | 5         |
+|                             | `MaxMinHeap`   | 2,113,134  | 535.3   | 184   | 4         |
+| **BulkAddThenBulkRemove**   | `ListQueue`    | 24,032     | 49,861  | 24801 | 698       |
+|                             | `MaxMinHeap`   | 10,000     | 108,868 | 20787 | 597       |
+| **HighContention**          | `ListQueue`    | 484,574    | 2,328   | 896   | 20        |
+|                             | `MaxMinHeap`   | 84,806     | 18,679  | 783   | 16        |
+
+### Interpretation of Results
+
+The benchmark results highlight the trade-offs between the different queue implementations based on their underlying
+data structures:
+
+- **`ListQueue`**: As a linked list, additions and removals at either end of the queue (`AddPeekRemove`,
+  `AddPeekTailRemove`) are O(1) operations, and in these runs it posts the lowest ns/op in every scenario, including
+  the high-contention and bulk cases. The trade-off is a slightly higher memory footprint per operation (higher B/op
+  and allocs/op), since each item requires its own list node.
+- **`MaxMinHeap`**: As a slice-based heap, it allocates less per operation, but every add and remove must restore the
+  heap property at an O(log n) cost, so its individual operations are slower than `ListQueue`'s in these benchmarks.
+  Its advantage is support for configurable priority ordering, which a plain FIFO list does not provide.
+
+**Choosing a Queue:**
+
+The data suggests the following guidance:
+- For simple **FIFO** workloads where the primary operations are consuming from the head, `ListQueue` is a strong and
+  simple choice.
+- For workloads requiring **priority-based ordering**, or where minimizing per-operation allocations matters more than
+  raw latency, `MaxMinHeap` is likely the more suitable option.
+
+These benchmarks provide a baseline for performance. The best choice for a specific use case will depend on the expected
+workload patterns.
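To tie the factory, comparator, and queue contracts together, the short program below sketches how a caller might construct and exercise a registered queue by name. It is a sketch under assumptions: the registered name `"ListQueue"` and the use of the mock helpers for items and comparators are illustrative, while the factory signature and the `Add`/`PeekHead`/`Remove` return shapes follow the tests and benchmarks elsewhere in this change.

```go
package main

import (
	"fmt"

	frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
	typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"

	// The blank import registers the listqueue implementation with the factory via its init().
	_ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/listqueue"
)

func main() {
	// Order items by enqueue time (earliest first), mirroring the comparator used by the conformance suite.
	comparator := &frameworkmocks.MockItemComparator{
		ScoreTypeV: "enqueue_time_ns_asc",
		FuncV: func(a, b types.QueueItemAccessor) bool {
			return a.EnqueueTime().Before(b.EnqueueTime())
		},
	}

	// "ListQueue" is assumed to be the plugin's registered name (it matches the benchmark table above).
	q, err := queue.NewQueueFromName("ListQueue", comparator)
	if err != nil {
		panic(err)
	}

	// Add one mock item (64 bytes), then peek the head and remove it by its handle.
	item := typesmocks.NewMockQueueItemAccessor(64, "req-1", "demo-flow")
	if _, _, err := q.Add(item); err != nil {
		panic(err)
	}
	head, err := q.PeekHead()
	if err != nil {
		panic(err)
	}
	if _, _, _, err := q.Remove(head.Handle()); err != nil {
		panic(err)
	}
	fmt.Printf("queue %q drained: len=%d byteSize=%d\n", q.Name(), q.Len(), q.ByteSize())
}
```

In production code the `ports.ManagedQueue` wrapper, not the caller, owns these operations; the mock item and comparator helpers are used here only to keep the example self-contained.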
diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go b/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go new file mode 100644 index 000000000..e1c9354b2 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/queue/benchmark_test.go @@ -0,0 +1,219 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue_test + +import ( + "fmt" + "sync" + "testing" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks" +) + +// BenchmarkQueues runs a series of benchmarks against all registered queue implementations. +func BenchmarkQueues(b *testing.B) { + for queueName, constructor := range queue.RegisteredQueues { + b.Run(string(queueName), func(b *testing.B) { + // All queue implementations must support the default enqueue time comparator. + q, err := constructor(enqueueTimeComparator) + if err != nil { + b.Fatalf("Failed to construct queue '%s': %v", queueName, err) + } + + b.Run("AddRemove", func(b *testing.B) { + benchmarkAddRemove(b, q) + }) + + b.Run("AddPeekRemove", func(b *testing.B) { + benchmarkAddPeekRemove(b, q) + }) + + b.Run("AddPeekTailRemove", func(b *testing.B) { + benchmarkAddPeekTailRemove(b, q) + }) + + b.Run("BulkAddThenBulkRemove", func(b *testing.B) { + benchmarkBulkAddThenBulkRemove(b, q) + }) + + b.Run("HighContention", func(b *testing.B) { + benchmarkHighContention(b, q) + }) + }) + } +} + +// benchmarkAddRemove measures the throughput of tightly coupled Add and Remove operations in parallel. This is a good +// measure of the base overhead of the queue's data structure and locking mechanism. +func benchmarkAddRemove(b *testing.B, q framework.SafeQueue) { + b.ReportAllocs() + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + item := mocks.NewMockQueueItemAccessor(1, "item", "benchmark-flow") + _, _, err := q.Add(item) + if err != nil { + b.Fatalf("Add failed: %v", err) + } + _, _, _, err = q.Remove(item.Handle()) + if err != nil { + b.Fatalf("Remove failed: %v", err) + } + } + }) +} + +// benchmarkAddPeekRemove measures the throughput of a serial Add, PeekHead, and Remove sequence. This simulates a +// common consumer pattern where a single worker peeks at an item before deciding to process and remove it. +func benchmarkAddPeekRemove(b *testing.B, q framework.SafeQueue) { + // Pre-add one item so PeekHead doesn't fail on the first iteration. 
+ initialItem := mocks.NewMockQueueItemAccessor(1, "initial", "benchmark-flow") + if _, _, err := q.Add(initialItem); err != nil { + b.Fatalf("Failed to add initial item: %v", err) + } + + b.ReportAllocs() + + for b.Loop() { + item := mocks.NewMockQueueItemAccessor(1, "item", "benchmark-flow") + _, _, err := q.Add(item) + if err != nil { + b.Fatalf("Add failed: %v", err) + } + + peeked, err := q.PeekHead() + if err != nil { + // In a concurrent benchmark, this could happen if the queue becomes empty. + // In a serial one, it's a fatal error. + b.Fatalf("PeekHead failed: %v", err) + } + + _, _, _, err = q.Remove(peeked.Handle()) + if err != nil { + b.Fatalf("Remove failed: %v", err) + } + } +} + +// benchmarkBulkAddThenBulkRemove measures performance of filling the queue up with a batch of items and then draining +// it. This can reveal performance characteristics related to how the data structure grows and shrinks. +func benchmarkBulkAddThenBulkRemove(b *testing.B, q framework.SafeQueue) { + b.ReportAllocs() + + for i := 0; b.Loop(); i++ { + // Add a batch of items + items := make([]types.QueueItemAccessor, 100) + for j := range items { + item := mocks.NewMockQueueItemAccessor(1, fmt.Sprintf("bulk-%d-%d", i, j), "benchmark-flow") + items[j] = item + if _, _, err := q.Add(item); err != nil { + b.Fatalf("Add failed: %v", err) + } + } + + // Remove the same number of items + for range items { + peeked, err := q.PeekHead() + if err != nil { + b.Fatalf("PeekHead failed: %v", err) + } + if _, _, _, err := q.Remove(peeked.Handle()); err != nil { + b.Fatalf("Remove failed: %v", err) + } + } + } +} + +// benchmarkAddPeekTailRemove measures the throughput of a serial Add, PeekTail, and Remove sequence. This is useful for +// understanding the performance of accessing the lowest-priority item. +func benchmarkAddPeekTailRemove(b *testing.B, q framework.SafeQueue) { + // Pre-add one item so PeekTail doesn't fail on the first iteration. + initialItem := mocks.NewMockQueueItemAccessor(1, "initial", "benchmark-flow") + if _, _, err := q.Add(initialItem); err != nil { + b.Fatalf("Failed to add initial item: %v", err) + } + + b.ReportAllocs() + + for b.Loop() { + item := mocks.NewMockQueueItemAccessor(1, "item", "benchmark-flow") + _, _, err := q.Add(item) + if err != nil { + b.Fatalf("Add failed: %v", err) + } + + peeked, err := q.PeekTail() + if err != nil { + b.Fatalf("PeekTail failed: %v", err) + } + + _, _, _, err = q.Remove(peeked.Handle()) + if err != nil { + b.Fatalf("Remove failed: %v", err) + } + } +} + +// benchmarkHighContention simulates a more realistic workload with multiple producers and consumers operating on the +// queue concurrently. +func benchmarkHighContention(b *testing.B, q framework.SafeQueue) { + // Pre-fill the queue to ensure consumers have work to do immediately. 
+ for i := range 1000 { + item := mocks.NewMockQueueItemAccessor(1, fmt.Sprintf("prefill-%d", i), "benchmark-flow") + if _, _, err := q.Add(item); err != nil { + b.Fatalf("Failed to pre-fill queue: %v", err) + } + } + + b.ReportAllocs() + b.ResetTimer() + + var wg sync.WaitGroup + // Producers + for range 4 { + wg.Add(1) + go func() { + defer wg.Done() + for b.Loop() { + item := mocks.NewMockQueueItemAccessor(1, "item", "benchmark-flow") + _, _, _ = q.Add(item) + } + }() + } + + // Consumers + for range 4 { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < b.N; j++ { + peeked, err := q.PeekHead() + if err == nil { + _, _, _, _ = q.Remove(peeked.Handle()) + } + // Also peek tail to add more read contention + _, _ = q.PeekTail() + } + }() + } + + wg.Wait() +} diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/factory.go b/pkg/epp/flowcontrol/framework/plugins/queue/factory.go new file mode 100644 index 000000000..f0e9befee --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/queue/factory.go @@ -0,0 +1,63 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package queue provides the factory and registration mechanism for all `framework.SafeQueue` implementations. +// It allows new queues to be added to the system and instantiated by name. +package queue + +import ( + "fmt" + "sync" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" +) + +// RegisteredQueueName is the unique name under which a queue is registered. +type RegisteredQueueName string + +// QueueConstructor defines the function signature for creating a `framework.SafeQueue`. +type QueueConstructor func(comparator framework.ItemComparator) (framework.SafeQueue, error) + +var ( + // mu guards the registration map. + mu sync.RWMutex + // RegisteredQueues stores the constructors for all registered queues. + RegisteredQueues = make(map[RegisteredQueueName]QueueConstructor) +) + +// MustRegisterQueue registers a queue constructor, and panics if the name is already registered. +// This is intended to be called from init() functions. +func MustRegisterQueue(name RegisteredQueueName, constructor QueueConstructor) { + mu.Lock() + defer mu.Unlock() + if _, ok := RegisteredQueues[name]; ok { + panic(fmt.Sprintf("framework.SafeQueue already registered with name %q", name)) + } + RegisteredQueues[name] = constructor +} + +// NewQueueFromName creates a new SafeQueue given its registered name and the `framework.ItemComparator` that will be +// optionally used to configure the queue (provided it declares `framework.CapabilityPriorityConfigurable`). +// This is called by the `registry.FlowRegistry` during initialization of a flow's `ports.ManagedQueue`. 
+func NewQueueFromName(name RegisteredQueueName, comparator framework.ItemComparator) (framework.SafeQueue, error) { + mu.RLock() + defer mu.RUnlock() + constructor, ok := RegisteredQueues[name] + if !ok { + return nil, fmt.Errorf("no framework.SafeQueue registered with name %q", name) + } + return constructor(comparator) +} diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go b/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go new file mode 100644 index 000000000..dfdf2864e --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/queue/functional_test.go @@ -0,0 +1,607 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue_test + +import ( + "fmt" + "slices" + "sort" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" + typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks" + + _ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/listqueue" + _ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap" +) + +// enqueueTimeComparator orders items by their enqueue time (earlier first). +// Used as the default comparator for basic FIFO-like ordering tests. +var enqueueTimeComparator = &frameworkmocks.MockItemComparator{ + ScoreTypeV: "enqueue_time_ns_asc", + FuncV: func(a, b types.QueueItemAccessor) bool { + return a.EnqueueTime().Before(b.EnqueueTime()) + }, +} + +// byteSizeComparator orders items by their byte size (smaller first). +var byteSizeComparator = &frameworkmocks.MockItemComparator{ + ScoreTypeV: "byte_size_asc", + FuncV: func(a, b types.QueueItemAccessor) bool { + return a.OriginalRequest().ByteSize() < b.OriginalRequest().ByteSize() + }, +} + +// reverseEnqueueTimeComparator orders items by their enqueue time (later first - LIFO). +// Used to test CapabilityPriorityConfigurable queues with a non-FIFO ordering. +var reverseEnqueueTimeComparator = &frameworkmocks.MockItemComparator{ + ScoreTypeV: "enqueue_time_ns_desc", + FuncV: func(a, b types.QueueItemAccessor) bool { + return a.EnqueueTime().After(b.EnqueueTime()) + }, +} + +// testLifecycleAndOrdering is a helper function to execute a standard sequence of Add, PeekHead, and Remove operations +// on a queue. It verifies the queue's state (length, byte size) and item ordering based on the provided `itemsInOrder` +// slice, which should be pre-sorted according to the `comparatorName` (which is just a string for +// logging/identification). +// This function is crucial for testing different ordering logic (FIFO, custom priority). 
+func testLifecycleAndOrdering( + t *testing.T, + q framework.SafeQueue, + itemsInOrder []*typesmocks.MockQueueItemAccessor, + comparatorName string, +) { + t.Helper() + + // PeekHead/PeekTail on empty queue + peeked, err := q.PeekHead() + assert.ErrorIs(t, err, framework.ErrQueueEmpty, + "[%s] PeekHead on empty queue should return ErrQueueEmpty", comparatorName) + assert.Nil(t, peeked, "[%s] PeekHead on empty queue should return a nil item", comparatorName) + peeked, err = q.PeekTail() + assert.ErrorIs(t, err, framework.ErrQueueEmpty, + "[%s] PeekTail on empty queue should return ErrQueueEmpty", comparatorName) + assert.Nil(t, peeked, "[%s] PeekTail on empty queue should return a nil item", comparatorName) + + // Add items + currentExpectedLen := 0 + var currentExpectedByteSize uint64 + for i, item := range itemsInOrder { + newLen, newByteSize, addErr := q.Add(item) + require.NoError(t, addErr, "[%s] Add should not fail for a valid item (item %d, ID: %s)", + comparatorName, i, item.OriginalRequest().ID()) + require.NotNil(t, item.Handle(), "[%s] Add must assign a non-nil handle to the item (item %d, ID: %s)", + comparatorName, i, item.OriginalRequest().ID()) + require.False(t, item.Handle().IsInvalidated(), + "[%s] A new handle from Add must not be invalidated (item %d, ID: %s)", + comparatorName, i, item.OriginalRequest().ID()) + + currentExpectedLen++ + currentExpectedByteSize += item.OriginalRequest().ByteSize() + assert.Equal(t, uint64(currentExpectedLen), newLen, "[%s] Add must return the correct new length (item %d, ID: %s)", + comparatorName, i, item.OriginalRequest().ID()) + assert.Equal(t, currentExpectedByteSize, newByteSize, + "[%s] Add must return the correct new byte size (item %d, ID: %s)", + comparatorName, i, item.OriginalRequest().ID()) + } + + // Check final state after adds + initialLen := len(itemsInOrder) + var expectedTotalByteSize uint64 + for _, item := range itemsInOrder { + expectedTotalByteSize += item.OriginalRequest().ByteSize() + } + assert.Equal(t, initialLen, q.Len(), "[%s] Len() should return the correct count after all items are added", + comparatorName) + assert.Equal(t, expectedTotalByteSize, q.ByteSize(), + "[%s] ByteSize() should return the correct sum after all items are added", comparatorName) + + // Peek and Remove cycle to verify ordering + expectedLen := initialLen + expectedByteSize := expectedTotalByteSize + for i, expectedItem := range itemsInOrder { + // Verify PeekHead + peeked, err = q.PeekHead() + require.NoError(t, err, "[%s] PeekHead should not error on a non-empty queue (iteration %d)", comparatorName, i) + require.NotNil(t, peeked, "[%s] PeekHead should return a non-nil item (iteration %d)", comparatorName, i) + assert.Equal(t, expectedItem.OriginalRequest().ID(), peeked.OriginalRequest().ID(), + "[%s] PeekHead must return the item (ID: %s) at the head of the queue (iteration %d)", + comparatorName, expectedItem.OriginalRequest().ID(), i) + peekedHandle := peeked.Handle() + require.NotNil(t, peekedHandle, "[%s] Handle from a peeked item must not be nil (iteration %d)", comparatorName, i) + require.False(t, peekedHandle.IsInvalidated(), + "[%s] Handle from a peeked item must not be invalidated (iteration %d)", comparatorName, i) + assert.Equal(t, expectedLen, q.Len(), "[%s] Len() must be unchanged after PeekHead (iteration %d)", + comparatorName, i) + assert.Equal(t, expectedByteSize, q.ByteSize(), + "[%s] ByteSize() must be unchanged after PeekHead (iteration %d)", comparatorName, i) + + // Verify PeekTail + peekedTail, err := 
q.PeekTail() + require.NoError(t, err, "[%s] PeekTail should not error on a non-empty queue (iteration %d)", comparatorName, i) + require.NotNil(t, peekedTail, "[%s] PeekTail should return a non-nil item (iteration %d)", comparatorName, i) + // The tail is the last item in the *remaining* ordered slice. + expectedTailItem := itemsInOrder[len(itemsInOrder)-1] + assert.Equal(t, expectedTailItem.OriginalRequest().ID(), peekedTail.OriginalRequest().ID(), + "[%s] PeekTail must return the item with the lowest priority (iteration %d)", comparatorName, i) + + // Remove the head item + removed, newLen, newByteSize, removeErr := q.Remove(peekedHandle) + require.NoError(t, removeErr, "[%s] Remove with a valid handle should not fail (iteration %d, item ID: %s)", + comparatorName, i, expectedItem.OriginalRequest().ID()) + require.NotNil(t, removed, "[%s] Remove should return the removed item (iteration %d)", comparatorName, i) + assert.Equal(t, expectedItem.OriginalRequest().ID(), removed.OriginalRequest().ID(), + "[%s] Remove should return the correct item (iteration %d)", comparatorName, i) + assert.True(t, peekedHandle.IsInvalidated(), + "[%s] Remove must invalidate the handle of the removed item (iteration %d)", comparatorName, i) + + expectedLen-- + expectedByteSize -= removed.OriginalRequest().ByteSize() + assert.Equal(t, uint64(expectedLen), newLen, "[%s] Remove must return the correct new length (iteration %d)", + comparatorName, i) + assert.Equal(t, expectedByteSize, newByteSize, + "[%s] Remove must return the correct new byte size (iteration %d)", comparatorName, i) + assert.Equal(t, expectedLen, q.Len(), "[%s] Len() should be correctly updated after Remove (iteration %d)", + comparatorName, i) + assert.Equal(t, expectedByteSize, q.ByteSize(), + "[%s] ByteSize() should be correctly updated after Remove (iteration %d)", comparatorName, i) + } + + assert.Zero(t, q.Len(), "[%s] Queue length should be 0 after all items are removed", comparatorName) + assert.Zero(t, q.ByteSize(), "[%s] Queue byte size should be 0 after all items are removed", comparatorName) + + peeked, err = q.PeekHead() + assert.ErrorIs(t, err, framework.ErrQueueEmpty, "[%s] PeekHead on an empty queue should return ErrQueueEmpty again", + comparatorName) + assert.Nil(t, peeked, "[%s] PeekHead on an empty queue should return a nil item again", comparatorName) +} + +// TestQueueConformance is the main conformance test suite for `framework.SafeQueue` implementations. +// It iterates over all queue implementations registered via `queue.MustRegisterQueue` and runs a series of sub-tests to +// ensure they adhere to the `framework.SafeQueue` contract. 
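
For context, the registry contract that this suite exercises can be consumed along the following lines (a minimal sketch, not part of the patch; the mock comparator is reused purely for brevity, and in the real system a policy's `Comparator()` would supply it, while the two registered names come from the plugins imported for side effects above):

```go
package main

import (
	"fmt"

	frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"

	// Blank imports run each plugin's init(), which calls queue.MustRegisterQueue.
	_ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/listqueue"
	_ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap"
)

func main() {
	// The factory always takes a comparator; FIFO implementations simply ignore it.
	fcfs := &frameworkmocks.MockItemComparator{
		ScoreTypeV: "enqueue_time_ns_asc",
		FuncV: func(a, b types.QueueItemAccessor) bool {
			return a.EnqueueTime().Before(b.EnqueueTime())
		},
	}

	for _, name := range []queue.RegisteredQueueName{"ListQueue", "MaxMinHeap"} {
		q, err := queue.NewQueueFromName(name, fcfs)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s capabilities: %v\n", q.Name(), q.Capabilities())
	}
}
```
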
+func TestQueueConformance(t *testing.T) { + t.Parallel() + + for queueName, constructor := range queue.RegisteredQueues { + t.Run(string(queueName), func(t *testing.T) { + t.Parallel() + flowSpec := &types.FlowSpecification{ID: "test-flow-1", Priority: 0} + + t.Run("Initialization", func(t *testing.T) { + t.Parallel() + q, err := constructor(enqueueTimeComparator) + require.NoError(t, err, "Setup: creating queue for test should not fail") + + require.NotNil(t, q, "Constructor should return a non-nil queue instance") + assert.Zero(t, q.Len(), "A new queue should have a length of 0") + assert.Zero(t, q.ByteSize(), "A new queue should have a byte size of 0") + assert.Equal(t, string(queueName), q.Name(), "Name() should return the registered name of the queue") + assert.NotNil(t, q.Capabilities(), "Capabilities() should not return a nil slice") + assert.NotEmpty(t, q.Capabilities(), "Capabilities() should return at least one capability") + }) + + t.Run("LifecycleAndOrdering_DefaultFIFO", func(t *testing.T) { + t.Parallel() + q, err := constructor(enqueueTimeComparator) + require.NoError(t, err, "Setup: creating queue with enqueueTimeComparator should not fail") + + now := time.Now() + + item1 := typesmocks.NewMockQueueItemAccessor(100, "item1_fifo", flowSpec.ID) + item1.EnqueueTimeV = now.Add(-2 * time.Second) // Earliest + item2 := typesmocks.NewMockQueueItemAccessor(50, "item2_fifo", flowSpec.ID) + item2.EnqueueTimeV = now.Add(-1 * time.Second) // Middle + item3 := typesmocks.NewMockQueueItemAccessor(20, "item3_fifo", flowSpec.ID) + item3.EnqueueTimeV = now // Latest + + itemsInFIFOOrder := []*typesmocks.MockQueueItemAccessor{item1, item2, item3} + testLifecycleAndOrdering(t, q, itemsInFIFOOrder, "DefaultFIFO") + }) + + qForCapCheck, err := constructor(enqueueTimeComparator) + if err == nil && slices.Contains(qForCapCheck.Capabilities(), framework.CapabilityPriorityConfigurable) { + t.Run("LifecycleAndOrdering_PriorityConfigurable_ByteSize", func(t *testing.T) { + t.Parallel() + q, err := constructor(byteSizeComparator) + require.NoError(t, err, "Setup: creating queue with byteSizeComparator should not fail") + + itemLarge := typesmocks.NewMockQueueItemAccessor(100, "itemLarge_prio", flowSpec.ID) + itemSmall := typesmocks.NewMockQueueItemAccessor(20, "itemSmall_prio", flowSpec.ID) + itemMedium := typesmocks.NewMockQueueItemAccessor(50, "itemMedium_prio", flowSpec.ID) + + itemsInByteSizeOrder := []*typesmocks.MockQueueItemAccessor{itemSmall, itemMedium, itemLarge} + testLifecycleAndOrdering(t, q, itemsInByteSizeOrder, "PriorityByteSize") + }) + + t.Run("LifecycleAndOrdering_PriorityConfigurable_LIFO", func(t *testing.T) { + t.Parallel() + q, err := constructor(reverseEnqueueTimeComparator) + require.NoError(t, err, "Setup: creating queue with reverseEnqueueTimeComparator should not fail") + + now := time.Now() + item1 := typesmocks.NewMockQueueItemAccessor(100, "item1_lifo", flowSpec.ID) + item1.EnqueueTimeV = now.Add(-2 * time.Second) // Earliest + item2 := typesmocks.NewMockQueueItemAccessor(50, "item2_lifo", flowSpec.ID) + item2.EnqueueTimeV = now.Add(-1 * time.Second) // Middle + item3 := typesmocks.NewMockQueueItemAccessor(20, "item3_lifo", flowSpec.ID) + item3.EnqueueTimeV = now // Latest + + itemsInLIFOOrder := []*typesmocks.MockQueueItemAccessor{item3, item2, item1} + testLifecycleAndOrdering(t, q, itemsInLIFOOrder, "PriorityLIFO") + }) + } + + t.Run("Add_NilItem", func(t *testing.T) { + t.Parallel() + q, err := constructor(enqueueTimeComparator) + require.NoError(t, err, "Setup: 
creating queue for test should not fail") + + currentLen := q.Len() + currentByteSize := q.ByteSize() + newLen, newByteSize, err := q.Add(nil) + assert.ErrorIs(t, err, framework.ErrNilQueueItem, "Add(nil) must return ErrNilQueueItem") + assert.Equal(t, uint64(currentLen), newLen, "Add(nil) must not change the length returned") + assert.Equal(t, currentByteSize, newByteSize, "Add(nil) must not change the byte size returned") + assert.Equal(t, currentLen, q.Len(), "The queue's length must not change after a failed Add") + assert.Equal(t, currentByteSize, q.ByteSize(), "The queue's byte size must not change after a failed Add") + }) + + t.Run("Remove_InvalidHandle", func(t *testing.T) { + t.Parallel() + q, err := constructor(enqueueTimeComparator) + require.NoError(t, err, "Setup: creating queue for test should not fail") + + item := typesmocks.NewMockQueueItemAccessor(100, "item", flowSpec.ID) + _, _, err = q.Add(item) + require.NoError(t, err, "Setup: adding an item should succeed") + + otherQ, err := constructor(enqueueTimeComparator) // A different queue instance + require.NoError(t, err, "Setup: creating otherQ should succeed") + otherItem := typesmocks.NewMockQueueItemAccessor(10, "other_item", "other_flow") + _, _, err = otherQ.Add(otherItem) + require.NoError(t, err, "Setup: adding item to otherQ should succeed") + alienHandle := otherItem.Handle() + require.NotNil(t, alienHandle, "Setup: alien handle should not be nil") + + invalidatedHandle := &typesmocks.MockQueueItemHandle{} + invalidatedHandle.Invalidate() + + foreignHandle := &typesmocks.MockQueueItemHandle{} // Different type + + testCases := []struct { + name string + handle types.QueueItemHandle + expectErr error + }{ + {name: "nil handle", handle: nil, expectErr: framework.ErrInvalidQueueItemHandle}, + {name: "invalidated handle", handle: invalidatedHandle, expectErr: framework.ErrInvalidQueueItemHandle}, + {name: "alien handle from other queue", handle: alienHandle, expectErr: framework.ErrQueueItemNotFound}, + {name: "foreign handle type", handle: foreignHandle, expectErr: framework.ErrInvalidQueueItemHandle}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + currentLen := q.Len() + currentByteSize := q.ByteSize() + + _, newLen, newByteSize, removeErr := q.Remove(tc.handle) + assert.ErrorIs(t, removeErr, tc.expectErr, "Remove with %s should produce %v", tc.name, tc.expectErr) + assert.Equal(t, uint64(currentLen), newLen, "Remove with %s must not change the length returned", tc.name) + assert.Equal(t, currentByteSize, newByteSize, "Remove with %s must not change the byte size returned", + tc.name) + assert.Equal(t, currentLen, q.Len(), "The queue's length must not change after a failed Remove with %s", + tc.name) + assert.Equal(t, currentByteSize, q.ByteSize(), + "The queue's byte size must not change after a failed Remove with %s", tc.name) + }) + } + }) + + t.Run("Remove_NonHead", func(t *testing.T) { + t.Parallel() + q, err := constructor(enqueueTimeComparator) + require.NoError(t, err, "Setup: creating queue for test should not fail") + + now := time.Now() + item1 := typesmocks.NewMockQueueItemAccessor(10, "item1_nonhead", flowSpec.ID) + item1.EnqueueTimeV = now.Add(-3 * time.Second) + item2 := typesmocks.NewMockQueueItemAccessor(20, "item2_nonhead_TARGET", flowSpec.ID) + item2.EnqueueTimeV = now.Add(-2 * time.Second) + item3 := typesmocks.NewMockQueueItemAccessor(30, "item3_nonhead", flowSpec.ID) + item3.EnqueueTimeV = now.Add(-1 * time.Second) + + _, _, _ = q.Add(item1) + _, 
_, _ = q.Add(item2)
+				_, _, _ = q.Add(item3)
+				require.Equal(t, 3, q.Len(), "Queue should have 3 items before removing non-head")
+				handleNonHead := item2.Handle()
+
+				removed, newLen, newByteSize, err := q.Remove(handleNonHead)
+				require.NoError(t, err, "It should be possible to remove an item that is not the head")
+				require.NotNil(t, removed, "Remove should return the removed item")
+				assert.Equal(t, item2.OriginalRequest().ID(), removed.OriginalRequest().ID(),
+					"Remove should return the correct item (item2)")
+				assert.True(t, handleNonHead.IsInvalidated(), "Remove must invalidate the handle of the removed item")
+				assert.Equal(t, uint64(2), newLen, "Remove must return the correct new length (2)")
+				assert.Equal(t, item1.OriginalRequest().ByteSize()+item3.OriginalRequest().ByteSize(), newByteSize,
+					"Remove must return the correct new byte size")
+				assert.Equal(t, 2, q.Len(), "Queue length should be 2 after removing non-head")
+
+				// Attempt to remove again with the now-stale handle
+				_, _, _, errStaleNonHead := q.Remove(handleNonHead)
+				assert.ErrorIs(t, errStaleNonHead, framework.ErrInvalidQueueItemHandle,
+					"Removing with a stale handle must fail with ErrInvalidQueueItemHandle")
+			})
+
+			predicateRemoveOddSizes := func(item types.QueueItemAccessor) bool {
+				return item.OriginalRequest().ByteSize()%2 != 0
+			}
+
+			t.Run("Cleanup_EmptyQueue", func(t *testing.T) {
+				t.Parallel()
+				emptyQ, _ := constructor(enqueueTimeComparator)
+				cleanedItems, err := emptyQ.Cleanup(predicateRemoveOddSizes)
+				require.NoError(t, err, "Cleanup on an empty queue should not return an error")
+				assert.Empty(t, cleanedItems, "Cleanup on an empty queue should return an empty slice")
+				assert.Zero(t, emptyQ.Len(), "Len() should be 0 after Cleanup on an empty queue")
+				assert.Zero(t, emptyQ.ByteSize(), "ByteSize() should be 0 after Cleanup on an empty queue")
+			})
+
+			t.Run("Cleanup_PredicateMatchesNone", func(t *testing.T) {
+				t.Parallel()
+				q, _ := constructor(enqueueTimeComparator)
+				itemK1 := typesmocks.NewMockQueueItemAccessor(10, "k1_matchNone", flowSpec.ID)
+				itemK2 := typesmocks.NewMockQueueItemAccessor(12, "k2_matchNone", flowSpec.ID)
+				_, _, _ = q.Add(itemK1)
+				_, _, _ = q.Add(itemK2)
+				initialLen := q.Len()
+				initialBs := q.ByteSize()
+
+				cleanedItems, err := q.Cleanup(func(item types.QueueItemAccessor) bool { return false })
+				require.NoError(t, err, "Cleanup should not return an error")
+				assert.Empty(t, cleanedItems, "Cleanup should return an empty slice when no items match the predicate")
+				assert.Equal(t, initialLen, q.Len(), "Len() should not change after Cleanup when no items match the predicate")
+				assert.Equal(t, initialBs, q.ByteSize(),
+					"ByteSize() should not change after Cleanup when no items match the predicate")
+				assert.False(t, itemK1.Handle().IsInvalidated(), "Handle for kept item 1 must NOT be invalidated")
+				assert.False(t, itemK2.Handle().IsInvalidated(), "Handle for kept item 2 must NOT be invalidated")
+			})
+
+			t.Run("Cleanup_PredicateMatchesAll", func(t *testing.T) {
+				t.Parallel()
+				q, _ := constructor(enqueueTimeComparator)
+				itemR1 := typesmocks.NewMockQueueItemAccessor(11, "r1_matchAll", flowSpec.ID)
+				itemR2 := typesmocks.NewMockQueueItemAccessor(13, "r2_matchAll", flowSpec.ID)
+				_, _, _ = q.Add(itemR1)
+				_, _, _ = q.Add(itemR2)
+
+				cleanedItems, err := q.Cleanup(func(item types.QueueItemAccessor) bool { return true })
+				require.NoError(t, err, "Cleanup should not return an error")
+				assert.Len(t, cleanedItems, 2, "Cleanup should return all items that matched the predicate")
+ assert.Zero(t, q.Len(), "Len() should be 0 after Cleanup") + assert.Zero(t, q.ByteSize(), "ByteSize() should be 0 after Cleanup") + assert.True(t, itemR1.Handle().IsInvalidated(), "Handle for removed item 1 must be invalidated") + assert.True(t, itemR2.Handle().IsInvalidated(), "Handle for removed item 2 must be invalidated") + }) + + t.Run("Cleanup_PredicateMatchesSubset_VerifyHandles", func(t *testing.T) { + t.Parallel() + q, _ := constructor(enqueueTimeComparator) + iK1 := typesmocks.NewMockQueueItemAccessor(20, "k1_subset", flowSpec.ID) + iR1 := typesmocks.NewMockQueueItemAccessor(11, "r1_subset", flowSpec.ID) + iK2 := typesmocks.NewMockQueueItemAccessor(22, "k2_subset", flowSpec.ID) + iR2 := typesmocks.NewMockQueueItemAccessor(33, "r2_subset", flowSpec.ID) + _, _, _ = q.Add(iK1) + _, _, _ = q.Add(iR1) + _, _, _ = q.Add(iK2) + _, _, _ = q.Add(iR2) + + expectedKeptByteSize := iK1.OriginalRequest().ByteSize() + iK2.OriginalRequest().ByteSize() + + cleanedItems, err := q.Cleanup(predicateRemoveOddSizes) + require.NoError(t, err, "Cleanup should not return an error") + assert.Len(t, cleanedItems, 2, "Cleanup should return 2 items that matched the predicate") + assert.Equal(t, 2, q.Len(), "Len() should be 2 after Cleanup") + assert.Equal(t, expectedKeptByteSize, q.ByteSize(), "ByteSize() should be sum of kept items after Cleanup") + + foundR1, foundR2 := false, false + for _, item := range cleanedItems { + if item.OriginalRequest().ID() == iR1.OriginalRequest().ID() { + foundR1 = true + assert.True(t, iR1.Handle().IsInvalidated(), "Handle for removed item iR1 must be invalidated") + } + if item.OriginalRequest().ID() == iR2.OriginalRequest().ID() { + foundR2 = true + assert.True(t, iR2.Handle().IsInvalidated(), "Handle for removed item iR2 must be invalidated") + } + } + assert.True(t, foundR1, "iR1 should have been returned by Cleanup") + assert.True(t, foundR2, "iR2 should have been returned by Cleanup") + + assert.False(t, iK1.Handle().IsInvalidated(), "Handle for kept item iK1 must NOT be invalidated") + assert.False(t, iK2.Handle().IsInvalidated(), "Handle for kept item iK2 must NOT be invalidated") + + // Verify remaining items are correct + var remainingIDs []string + for q.Len() > 0 { + peeked, _ := q.PeekHead() + item, _, _, _ := q.Remove(peeked.Handle()) + remainingIDs = append(remainingIDs, item.OriginalRequest().ID()) + } + sort.Strings(remainingIDs) // Sort for stable comparison + expectedRemainingIDs := []string{iK1.OriginalRequest().ID(), iK2.OriginalRequest().ID()} + sort.Strings(expectedRemainingIDs) + assert.Equal(t, expectedRemainingIDs, remainingIDs, "Remaining items in queue are not as expected") + }) + + t.Run("Drain_NonEmptyQueue_VerifyHandles", func(t *testing.T) { + t.Parallel() + q, err := constructor(enqueueTimeComparator) + require.NoError(t, err, "Setup: creating queue for drain test should not fail") + + itemD1 := typesmocks.NewMockQueueItemAccessor(10, "ditem1", flowSpec.ID) + itemD2 := typesmocks.NewMockQueueItemAccessor(20, "ditem2", flowSpec.ID) + _, _, _ = q.Add(itemD1) + _, _, _ = q.Add(itemD2) + + drainedItems, err := q.Drain() + require.NoError(t, err, "Drain on a non-empty queue should not fail") + assert.Len(t, drainedItems, 2, "Drain should return all items that were in the queue") + assert.Zero(t, q.Len(), "Queue length must be 0 after Drain") + assert.Zero(t, q.ByteSize(), "Queue byte size must be 0 after Drain") + + assert.True(t, itemD1.Handle().IsInvalidated(), "Handle for drained itemD1 must be invalidated") + assert.True(t, 
itemD2.Handle().IsInvalidated(), "Handle for drained itemD2 must be invalidated") + + var foundD1, foundD2 bool + for _, item := range drainedItems { + if item.OriginalRequest().ID() == itemD1.OriginalRequest().ID() { + foundD1 = true + } + if item.OriginalRequest().ID() == itemD2.OriginalRequest().ID() { + foundD2 = true + } + } + assert.True(t, foundD1, "itemD1 should be in drainedItems") + assert.True(t, foundD2, "itemD2 should be in drainedItems") + }) + + t.Run("Drain_EmptyQueue_DrainTwice", func(t *testing.T) { + t.Parallel() + q, err := constructor(enqueueTimeComparator) + require.NoError(t, err, "Setup: creating queue for empty drain test should not fail") + + drainedItems, err := q.Drain() // First drain on empty + require.NoError(t, err, "Drain on an empty queue should not fail") + assert.Empty(t, drainedItems, "Drain on an empty queue should return an empty slice") + + drainedAgain, err := q.Drain() // Second drain on already empty + require.NoError(t, err, "Second drain on an already empty queue should not fail") + assert.Empty(t, drainedAgain, "Second drain on an already empty queue should return an empty slice") + assert.Zero(t, q.Len()) + assert.Zero(t, q.ByteSize()) + }) + + t.Run("Concurrency", func(t *testing.T) { + t.Parallel() + q, err := constructor(enqueueTimeComparator) + require.NoError(t, err, "Setup: creating queue for concurrency test should not fail") + + const ( + numGoroutines = 10 + initialItems = 200 + opsPerGoroutine = 50 + ) + + // handleChan acts as a concurrent-safe pool of handles that goroutines can pull from to test Remove. + handleChan := make(chan types.QueueItemHandle, initialItems+(numGoroutines*opsPerGoroutine)) + + // Pre-populate the queue with an initial set of items. + for i := 0; i < initialItems; i++ { + item := typesmocks.NewMockQueueItemAccessor(1, fmt.Sprintf("%s_conc_init_%d", flowSpec.ID, i), flowSpec.ID) + _, _, err := q.Add(item) + require.NoError(t, err, "Setup: pre-populating the queue should not fail") + handleChan <- item.Handle() + } + + var wg sync.WaitGroup + wg.Add(numGoroutines) + var successfulAdds, successfulRemoves atomic.Uint64 + + // Start goroutines to perform a mix of concurrent operations. 
+			for i := range numGoroutines {
+				go func(routineID int) {
+					defer wg.Done()
+					for j := 0; j < opsPerGoroutine; j++ {
+						opType := (j + routineID) % 4 // Vary operations more across goroutines
+						switch opType {
+						case 0: // Add
+							item := typesmocks.NewMockQueueItemAccessor(1,
+								fmt.Sprintf("%s_conc_add_%d_%d", flowSpec.ID, routineID, j), flowSpec.ID)
+							_, _, err := q.Add(item)
+							if assert.NoError(t, err, "Add must be goroutine-safe") {
+								successfulAdds.Add(1)
+								handleChan <- item.Handle()
+							}
+						case 1: // Remove
+							select {
+							case handle := <-handleChan:
+								if handle != nil && !handle.IsInvalidated() { // Check before trying to prevent known-to-fail calls
+									_, _, _, removeErr := q.Remove(handle)
+									if removeErr == nil {
+										successfulRemoves.Add(1)
+									} else {
+										// Racing with other goroutines can legitimately surface either sentinel error.
+										assert.True(t,
+											errors.Is(removeErr, framework.ErrInvalidQueueItemHandle) ||
+												errors.Is(removeErr, framework.ErrQueueItemNotFound),
+											"Remove should only fail with ErrInvalidQueueItemHandle or ErrQueueItemNotFound if raced, got: %v",
+											removeErr)
+									}
+								}
+							default:
+								// No handles available to remove
+							}
+						case 2: // Inspect
+							_ = q.Len()
+							_ = q.ByteSize()
+							_, err := q.PeekHead()
+							if q.Len() == 0 { // Only expect ErrQueueEmpty if Len is 0
+								assert.ErrorIs(t, err, framework.ErrQueueEmpty, "Peek on empty queue expected ErrQueueEmpty")
+							}
+							_, err = q.PeekTail()
+							if q.Len() == 0 { // Only expect ErrQueueEmpty if Len is 0
+								assert.ErrorIs(t, err, framework.ErrQueueEmpty, "PeekTail on empty queue expected ErrQueueEmpty")
+							}
+						case 3: // Cleanup
+							_, cleanupErr := q.Cleanup(func(item types.QueueItemAccessor) bool { return false })
+							assert.NoError(t, cleanupErr, "Cleanup (no-op) should be goroutine-safe")
+						}
+					}
+				}(i)
+			}
+
+			wg.Wait()
+			close(handleChan)
+
+			// Drain the queue to verify all handles are invalidated and to count remaining items accurately.
+			drainedItems, drainErr := q.Drain()
+			require.NoError(t, drainErr, "Draining queue at the end of concurrency test should not fail")
+
+			for _, item := range drainedItems {
+				require.True(t, item.Handle().IsInvalidated(), "All handles from final drain must be invalidated")
+			}
+
+			// The number of items successfully added minus those successfully removed should equal the number of items
+			// drained.
+			assert.Equal(t, int(initialItems)+int(successfulAdds.Load())-int(successfulRemoves.Load()), len(drainedItems),
+				"Number of items drained (%d) should match initial (%d) + successful adds (%d) - successful removes (%d).",
+				len(drainedItems), initialItems, successfulAdds.Load(), successfulRemoves.Load())
+
+			assert.Zero(t, q.Len(), "Queue length should be 0 after final drain")
+			assert.Zero(t, q.ByteSize(), "Queue byte size should be 0 after final drain")
+		})
+	})
+	}
+}
diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/listqueue/listqueue.go b/pkg/epp/flowcontrol/framework/plugins/queue/listqueue/listqueue.go
new file mode 100644
index 000000000..6a1e2207f
--- /dev/null
+++ b/pkg/epp/flowcontrol/framework/plugins/queue/listqueue/listqueue.go
@@ -0,0 +1,217 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package listqueue provides a simple, concurrent-safe queue implementation using a standard library +// `container/list.List` as the underlying data structure for FIFO (First-In, First-Out) behavior. +package listqueue + +import ( + "container/list" + "sync" + "sync/atomic" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" +) + +// ListQueueName is the name of the list queue implementation. +const ListQueueName = "ListQueue" + +func init() { + queue.MustRegisterQueue(queue.RegisteredQueueName(ListQueueName), + func(_ framework.ItemComparator) (framework.SafeQueue, error) { + // The list queue is a simple FIFO queue and does not use a comparator. + return newListQueue(), nil + }) +} + +// listQueue implements the `framework.SafeQueue` interface using a standard `container/list.List` for FIFO behavior. +// This implementation is concurrent-safe. +type listQueue struct { + requests *list.List + byteSize atomic.Uint64 + mu sync.RWMutex +} + +// listItemHandle is the concrete type for `types.QueueItemHandle` used by `listQueue`. +// It wraps the `list.Element` and includes a pointer to the owning `listQueue` for validation. +type listItemHandle struct { + element *list.Element + owner *listQueue + isInvalidated bool +} + +// Handle returns the underlying queue-specific raw handle. +func (lh *listItemHandle) Handle() any { + return lh.element +} + +// Invalidate marks this handle instance as no longer valid for future operations. +func (lh *listItemHandle) Invalidate() { + lh.isInvalidated = true +} + +// IsInvalidated returns true if this handle instance has been marked as invalid. +func (lh *listItemHandle) IsInvalidated() bool { + return lh.isInvalidated +} + +var _ types.QueueItemHandle = &listItemHandle{} + +// newListQueue creates a new `listQueue` instance. +func newListQueue() *listQueue { + return &listQueue{ + requests: list.New(), + } +} + +// --- `framework.SafeQueue` Interface Implementation --- + +// Add enqueues an item to the back of the list. +func (lq *listQueue) Add(item types.QueueItemAccessor) (newLen, newByteSize uint64, err error) { + lq.mu.Lock() + defer lq.mu.Unlock() + + if item == nil { + return uint64(lq.requests.Len()), lq.byteSize.Load(), framework.ErrNilQueueItem + } + + element := lq.requests.PushBack(item) + lq.byteSize.Add(item.OriginalRequest().ByteSize()) + item.SetHandle(&listItemHandle{element: element, owner: lq}) + return uint64(lq.requests.Len()), lq.byteSize.Load(), nil +} + +// Remove removes an item identified by the given handle from the queue. 
+func (lq *listQueue) Remove( + handle types.QueueItemHandle, +) (removedItem types.QueueItemAccessor, newLen, newByteSize uint64, err error) { + lq.mu.Lock() + defer lq.mu.Unlock() + + currentLen := uint64(lq.requests.Len()) + currentByteSize := lq.byteSize.Load() + + if handle == nil || handle.IsInvalidated() { + return nil, currentLen, currentByteSize, framework.ErrInvalidQueueItemHandle + } + + lh, ok := handle.(*listItemHandle) + if !ok { + return nil, currentLen, currentByteSize, framework.ErrInvalidQueueItemHandle + } + + if lh.owner != lq { + return nil, currentLen, currentByteSize, framework.ErrQueueItemNotFound + } + + item := lh.element.Value.(types.QueueItemAccessor) + lq.requests.Remove(lh.element) + lq.byteSize.Add(^item.OriginalRequest().ByteSize() + 1) // Atomic subtraction + handle.Invalidate() + return item, uint64(lq.requests.Len()), lq.byteSize.Load(), nil +} + +// Cleanup removes items from the queue that satisfy the predicate. +func (lq *listQueue) Cleanup(predicate framework.PredicateFunc) (cleanedItems []types.QueueItemAccessor, err error) { + lq.mu.Lock() + defer lq.mu.Unlock() + + var removedItems []types.QueueItemAccessor + var next *list.Element + + for e := lq.requests.Front(); e != nil; e = next { + next = e.Next() // Get next before potentially removing e + + item := e.Value.(types.QueueItemAccessor) + if predicate(item) { + lq.requests.Remove(e) + lq.byteSize.Add(^item.OriginalRequest().ByteSize() + 1) // Atomic subtraction + if itemHandle := item.Handle(); itemHandle != nil { + itemHandle.Invalidate() + } + removedItems = append(removedItems, item) + } + } + return removedItems, nil +} + +// Drain removes all items from the queue and returns them. +func (lq *listQueue) Drain() (removedItems []types.QueueItemAccessor, err error) { + lq.mu.Lock() + defer lq.mu.Unlock() + + removedItems = make([]types.QueueItemAccessor, 0, lq.requests.Len()) + + for e := lq.requests.Front(); e != nil; e = e.Next() { + item := e.Value.(types.QueueItemAccessor) + removedItems = append(removedItems, item) + if handle := item.Handle(); handle != nil { + handle.Invalidate() + } + } + + lq.requests.Init() + lq.byteSize.Store(0) + return removedItems, nil +} + +// Name returns the name of the queue. +func (lq *listQueue) Name() string { + return ListQueueName +} + +// Capabilities returns the capabilities of the queue. +func (lq *listQueue) Capabilities() []framework.QueueCapability { + return []framework.QueueCapability{framework.CapabilityFIFO} +} + +// Len returns the number of items in the queue. +func (lq *listQueue) Len() int { + lq.mu.RLock() + defer lq.mu.RUnlock() + return lq.requests.Len() +} + +// ByteSize returns the total byte size of all items in the queue. +func (lq *listQueue) ByteSize() uint64 { + return lq.byteSize.Load() +} + +// PeekHead returns the item at the front of the queue without removing it. +func (lq *listQueue) PeekHead() (head types.QueueItemAccessor, err error) { + lq.mu.RLock() + defer lq.mu.RUnlock() + + if lq.requests.Len() == 0 { + return nil, framework.ErrQueueEmpty + } + element := lq.requests.Front() + return element.Value.(types.QueueItemAccessor), nil +} + +// PeekTail returns the item at the back of the queue without removing it. 
+func (lq *listQueue) PeekTail() (tail types.QueueItemAccessor, err error) { + lq.mu.RLock() + defer lq.mu.RUnlock() + + if lq.requests.Len() == 0 { + return nil, framework.ErrQueueEmpty + } + element := lq.requests.Back() + return element.Value.(types.QueueItemAccessor), nil +} diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap.go b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap.go new file mode 100644 index 000000000..3d3dae727 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap.go @@ -0,0 +1,490 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package maxminheap provides a concurrent-safe, priority queue implementation using a max-min heap. +// +// A max-min heap is a binary tree structure that maintains a specific ordering property: for any node, if it is at an +// even level (e.g., 0, 2, ...), its value is greater than all values in its subtree (max level). If it is at an odd +// level (e.g., 1, 3, ...), its value is smaller than all values in its subtree (min level). This structure allows for +// efficient O(1) retrieval of both the maximum and minimum priority items. +// +// The core heap maintenance logic (up, down, and grandchild finding) is adapted from the public domain implementation +// at https://github.com/esote/minmaxheap, which is licensed under CC0-1.0. +package maxminheap + +import ( + "math" + "sync" + "sync/atomic" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" +) + +// MaxMinHeapName is the name of the max-min heap queue implementation. +const MaxMinHeapName = "MaxMinHeap" + +func init() { + queue.MustRegisterQueue(queue.RegisteredQueueName(MaxMinHeapName), + func(comparator framework.ItemComparator) (framework.SafeQueue, error) { + return newMaxMinHeap(comparator), nil + }) +} + +// maxMinHeap implements the `framework.SafeQueue` interface using a max-min heap. +// The heap is ordered by the provided comparator, with higher values considered higher priority. +// This implementation is concurrent-safe. +type maxMinHeap struct { + items []types.QueueItemAccessor + handles map[types.QueueItemHandle]*heapItem + byteSize atomic.Uint64 + mu sync.RWMutex + comparator framework.ItemComparator +} + +// heapItem is an internal struct to hold an item and its index in the heap. +// This allows for O(log n) removal of items from the queue. +type heapItem struct { + item types.QueueItemAccessor + index int + isInvalidated bool +} + +// Handle returns the heap item itself, which is used as the handle. +func (h *heapItem) Handle() any { + return h +} + +// Invalidate marks the handle as invalid. +func (h *heapItem) Invalidate() { + h.isInvalidated = true +} + +// IsInvalidated returns true if the handle has been invalidated. 
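
To make the max-min ordering described in the package comment above concrete, here is a test-style sketch (hypothetical, not part of this change; the comparator, item byte sizes, and test name are invented for illustration) showing that `PeekHead` yields the comparator-maximum and `PeekTail` the comparator-minimum:

```go
package maxminheap

import (
	"testing"

	"github.com/stretchr/testify/require"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
	typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
)

// TestSketch_PeekExtremes is an illustrative sketch only.
func TestSketch_PeekExtremes(t *testing.T) {
	// For this example, larger requests are treated as higher priority.
	comparator := &mocks.MockItemComparator{
		ScoreTypeV: "byte_size_desc",
		FuncV: func(a, b types.QueueItemAccessor) bool {
			return a.OriginalRequest().ByteSize() > b.OriginalRequest().ByteSize()
		},
	}

	h := newMaxMinHeap(comparator)
	for _, size := range []uint64{30, 10, 50, 20} {
		_, _, err := h.Add(typesmocks.NewMockQueueItemAccessor(size, "req", "flow-a"))
		require.NoError(t, err)
	}

	head, err := h.PeekHead() // the root of a max-min heap is the comparator-maximum (50 bytes)
	require.NoError(t, err)
	require.Equal(t, uint64(50), head.OriginalRequest().ByteSize())

	tail, err := h.PeekTail() // the minimum is found among the root's children (10 bytes)
	require.NoError(t, err)
	require.Equal(t, uint64(10), tail.OriginalRequest().ByteSize())
}
```
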
+func (h *heapItem) IsInvalidated() bool { + return h.isInvalidated +} + +var _ types.QueueItemHandle = &heapItem{} + +// newMaxMinHeap creates a new max-min heap with the given comparator. +func newMaxMinHeap(comparator framework.ItemComparator) *maxMinHeap { + return &maxMinHeap{ + items: make([]types.QueueItemAccessor, 0), + handles: make(map[types.QueueItemHandle]*heapItem), + comparator: comparator, + } +} + +// --- `framework.SafeQueue` Interface Implementation --- + +// Name returns the name of the queue. +func (h *maxMinHeap) Name() string { + return MaxMinHeapName +} + +// Capabilities returns the capabilities of the queue. +func (h *maxMinHeap) Capabilities() []framework.QueueCapability { + return []framework.QueueCapability{framework.CapabilityPriorityConfigurable} +} + +// Len returns the number of items in the queue. +func (h *maxMinHeap) Len() int { + h.mu.RLock() + defer h.mu.RUnlock() + return len(h.items) +} + +// ByteSize returns the total byte size of all items in the queue. +func (h *maxMinHeap) ByteSize() uint64 { + return h.byteSize.Load() +} + +// PeekHead returns the item with the highest priority (max value) without removing it. +// Time complexity: O(1). +func (h *maxMinHeap) PeekHead() (types.QueueItemAccessor, error) { + h.mu.RLock() + defer h.mu.RUnlock() + + if len(h.items) == 0 { + return nil, framework.ErrQueueEmpty + } + // The root of the max-min heap is always the maximum element. + return h.items[0], nil +} + +// PeekTail returns the item with the lowest priority (min value) without removing it. +// Time complexity: O(1). +func (h *maxMinHeap) PeekTail() (types.QueueItemAccessor, error) { + h.mu.RLock() + defer h.mu.RUnlock() + + n := len(h.items) + if n == 0 { + return nil, framework.ErrQueueEmpty + } + if n == 1 { + return h.items[0], nil + } + if n == 2 { + // With two items, the root is max, the second is min. + return h.items[1], nil + } + + // With three or more items, the minimum element is guaranteed to be one of the two children of the root (at indices 1 + // and 2). We must compare them to find the true minimum. + if h.comparator.Func()(h.items[1], h.items[2]) { + return h.items[2], nil + } + return h.items[1], nil +} + +// Add adds an item to the queue. +// Time complexity: O(log n). +func (h *maxMinHeap) Add(item types.QueueItemAccessor) (uint64, uint64, error) { + h.mu.Lock() + defer h.mu.Unlock() + + if item == nil { + return uint64(len(h.items)), h.byteSize.Load(), framework.ErrNilQueueItem + } + + h.push(item) + h.byteSize.Add(item.OriginalRequest().ByteSize()) + return uint64(len(h.items)), h.byteSize.Load(), nil +} + +// push adds an item to the heap and restores the heap property. +func (h *maxMinHeap) push(item types.QueueItemAccessor) { + heapItem := &heapItem{item: item, index: len(h.items)} + h.items = append(h.items, item) + item.SetHandle(heapItem) + h.handles[item.Handle()] = heapItem + h.up(len(h.items) - 1) +} + +// up moves the item at index i up the heap to its correct position. +func (h *maxMinHeap) up(i int) { + if i == 0 { + return + } + + parentIndex := (i - 1) / 2 + if isMinLevel(i) { + // Current node is on a min level, parent is on a max level. + // If the current node is greater than its parent, they are in the wrong order. + if h.comparator.Func()(h.items[i], h.items[parentIndex]) { + h.swap(i, parentIndex) + // After swapping, the new parent (originally at i) might be larger than its ancestors. + h.upMax(parentIndex) + } else { + // The order with the parent is correct, but it might be smaller than a grandparent. 
+			h.upMin(i)
+		}
+	} else { // On a max level
+		// Current node is on a max level, parent is on a min level.
+		// If the current node is smaller than its parent, they are in the wrong order.
+		if h.comparator.Func()(h.items[parentIndex], h.items[i]) {
+			h.swap(i, parentIndex)
+			// After swapping, the new parent (originally at i) might be smaller than its ancestors.
+			h.upMin(parentIndex)
+		} else {
+			// The order with the parent is correct, but it might be larger than a grandparent.
+			h.upMax(i)
+		}
+	}
+}
+
+// upMin moves an item up the min levels of the heap.
+func (h *maxMinHeap) upMin(i int) {
+	// Bubble up on min levels by comparing with grandparents.
+	for {
+		parentIndex := (i - 1) / 2
+		if parentIndex == 0 {
+			break
+		}
+		grandparentIndex := (parentIndex - 1) / 2
+		// If the item is smaller than its grandparent, swap them.
+		if h.comparator.Func()(h.items[grandparentIndex], h.items[i]) {
+			h.swap(i, grandparentIndex)
+			i = grandparentIndex
+		} else {
+			break
+		}
+	}
+}
+
+// upMax moves an item up the max levels of the heap.
+func (h *maxMinHeap) upMax(i int) {
+	// Bubble up on max levels by comparing with grandparents.
+	for {
+		parentIndex := (i - 1) / 2
+		if parentIndex == 0 {
+			break
+		}
+		grandparentIndex := (parentIndex - 1) / 2
+		// If the item is larger than its grandparent, swap them.
+		if h.comparator.Func()(h.items[i], h.items[grandparentIndex]) {
+			h.swap(i, grandparentIndex)
+			i = grandparentIndex
+		} else {
+			break
+		}
+	}
+}
+
+// swap swaps two items in the heap and updates their handles.
+func (h *maxMinHeap) swap(i, j int) {
+	h.items[i], h.items[j] = h.items[j], h.items[i]
+	h.handles[h.items[i].Handle()].index = i
+	h.handles[h.items[j].Handle()].index = j
+}
+
+// Remove removes an item from the queue.
+// Time complexity: O(log n).
+func (h *maxMinHeap) Remove(handle types.QueueItemHandle) (types.QueueItemAccessor, uint64, uint64, error) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	currentLen := uint64(len(h.items))
+	currentByteSize := h.byteSize.Load()
+
+	if handle == nil {
+		return nil, currentLen, currentByteSize, framework.ErrInvalidQueueItemHandle
+	}
+
+	if handle.IsInvalidated() {
+		return nil, currentLen, currentByteSize, framework.ErrInvalidQueueItemHandle
+	}
+
+	heapItem, ok := handle.(*heapItem)
+	if !ok {
+		return nil, currentLen, currentByteSize, framework.ErrInvalidQueueItemHandle
+	}
+
+	// Now we can check if the handle is in the map
+	_, ok = h.handles[handle]
+	if !ok {
+		return nil, currentLen, currentByteSize, framework.ErrQueueItemNotFound
+	}
+
+	i := heapItem.index
+	item := h.items[i]
+	n := len(h.items) - 1
+
+	if i < n {
+		// Swap the item to be removed with the last item.
+		h.swap(i, n)
+		// Remove the last item (which is the one we wanted to remove).
+		h.items = h.items[:n]
+		delete(h.handles, handle)
+		h.byteSize.Add(^item.OriginalRequest().ByteSize() + 1) // Atomic subtraction
+
+		// The item swapped into index i came from an unrelated part of the tree, so it may violate the heap property
+		// in either direction: against its descendants (fixed by down) or against its ancestors (fixed by up).
+		// Calling both restores the invariant; whichever direction is not violated is a no-op.
+		h.down(i)
+		h.up(i)
+	} else {
+		// It's the last item, just remove it.
+		h.items = h.items[:n]
+		delete(h.handles, handle)
+		h.byteSize.Add(^item.OriginalRequest().ByteSize() + 1) // Atomic subtraction
+	}
+
+	handle.Invalidate()
+	return item, uint64(len(h.items)), h.byteSize.Load(), nil
+}
+
+// down moves the item at index i down the heap to its correct position.
+func (h *maxMinHeap) down(i int) {
+	if isMinLevel(i) {
+		h.downMin(i)
+	} else {
+		h.downMax(i)
+	}
+}
+
+// downMin moves an item down the min levels of the heap.
+func (h *maxMinHeap) downMin(i int) { + for { + m := h.findSmallestChildOrGrandchild(i) + if m == -1 { + break + } + + // If the smallest descendant is smaller than the current item, swap them. + if h.comparator.Func()(h.items[i], h.items[m]) { + h.swap(i, m) + parentOfM := (m - 1) / 2 + // If m was a grandchild, it might be larger than its new parent. + if parentOfM != i { + if h.comparator.Func()(h.items[m], h.items[parentOfM]) { + h.swap(m, parentOfM) + } + } + i = m + } else { + break + } + } +} + +// downMax moves an item down the max levels of the heap. +func (h *maxMinHeap) downMax(i int) { + for { + m := h.findLargestChildOrGrandchild(i) + if m == -1 { + break + } + + // If the largest descendant is larger than the current item, swap them. + if h.comparator.Func()(h.items[m], h.items[i]) { + h.swap(i, m) + parentOfM := (m - 1) / 2 + // If m was a grandchild, it might be smaller than its new parent. + if parentOfM != i { + if h.comparator.Func()(h.items[parentOfM], h.items[m]) { + h.swap(m, parentOfM) + } + } + i = m + } else { + break + } + } +} + +// findSmallestChildOrGrandchild finds the index of the smallest child or grandchild of i. +func (h *maxMinHeap) findSmallestChildOrGrandchild(i int) int { + leftChild := 2*i + 1 + if leftChild >= len(h.items) { + return -1 // No descendants + } + + m := leftChild // Start with the left child as the smallest. + + // Compare with right child. + rightChild := 2*i + 2 + if rightChild < len(h.items) && h.comparator.Func()(h.items[m], h.items[rightChild]) { + m = rightChild + } + + // Compare with grandchildren. + grandchildStart := 2*leftChild + 1 + grandchildEnd := grandchildStart + 4 + for j := grandchildStart; j < grandchildEnd && j < len(h.items); j++ { + if h.comparator.Func()(h.items[m], h.items[j]) { + m = j + } + } + return m +} + +// findLargestChildOrGrandchild finds the index of the largest child or grandchild of i. +func (h *maxMinHeap) findLargestChildOrGrandchild(i int) int { + leftChild := 2*i + 1 + if leftChild >= len(h.items) { + return -1 // No descendants + } + + m := leftChild // Start with the left child as the largest. + + // Compare with right child. + rightChild := 2*i + 2 + if rightChild < len(h.items) && h.comparator.Func()(h.items[rightChild], h.items[m]) { + m = rightChild + } + + // Compare with grandchildren. + grandchildStart := 2*leftChild + 1 + grandchildEnd := grandchildStart + 4 + for j := grandchildStart; j < grandchildEnd && j < len(h.items); j++ { + if h.comparator.Func()(h.items[j], h.items[m]) { + m = j + } + } + return m +} + +// isMinLevel checks if the given index is on a min level of the heap. +func isMinLevel(i int) bool { + // The level is the floor of log2(i+1). + // Levels are 0-indexed. 0, 2, 4... are max levels. 1, 3, 5... are min levels. + // An integer is on a min level if its level number is odd. + level := int(math.Log2(float64(i + 1))) + return level%2 != 0 +} + +// Cleanup removes items from the queue that satisfy the predicate. 
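
The navigation helpers above lean on the implicit array layout of a binary tree: the children of index i live at 2i+1 and 2i+2, its grandchildren at 4i+3 through 4i+6, and levels alternate max/min by depth parity. A small standalone program (illustrative only, not part of the patch) that prints these relationships for the first few indices:

```go
package main

import "fmt"

func main() {
	for i := 0; i <= 10; i++ {
		// Integer equivalent of floor(log2(i+1)): the depth of index i in the implicit binary tree.
		level := 0
		for n := i + 1; n > 1; n /= 2 {
			level++
		}
		kind := "max"
		if level%2 != 0 {
			kind = "min"
		}
		fmt.Printf("index %2d: level %d (%s level), children %d..%d, grandchildren %d..%d\n",
			i, level, kind, 2*i+1, 2*i+2, 4*i+3, 4*i+6)
	}
}
```
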
+func (h *maxMinHeap) Cleanup(predicate framework.PredicateFunc) ([]types.QueueItemAccessor, error) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	var removedItems []types.QueueItemAccessor
+	var itemsToKeep []types.QueueItemAccessor
+
+	for _, item := range h.items {
+		if predicate(item) {
+			removedItems = append(removedItems, item)
+			handle := item.Handle()
+			if handle != nil {
+				handle.Invalidate()
+				delete(h.handles, handle)
+			}
+			h.byteSize.Add(^item.OriginalRequest().ByteSize() + 1) // Atomic subtraction
+		} else {
+			itemsToKeep = append(itemsToKeep, item)
+		}
+	}
+
+	if len(removedItems) > 0 {
+		h.items = itemsToKeep
+		// Re-establish the heap property on the remaining items.
+		// First, update all the indices in the handles map.
+		for i, item := range h.items {
+			h.handles[item.Handle()].index = i
+		}
+		// Then, starting from the last non-leaf node, trickle down to fix the heap.
+		for i := len(h.items)/2 - 1; i >= 0; i-- {
+			h.down(i)
+		}
+	}
+
+	return removedItems, nil
+}
+
+// Drain removes all items from the queue.
+func (h *maxMinHeap) Drain() ([]types.QueueItemAccessor, error) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	drainedItems := make([]types.QueueItemAccessor, len(h.items))
+	copy(drainedItems, h.items)
+
+	// Invalidate all handles.
+	for _, item := range h.items {
+		if handle := item.Handle(); handle != nil {
+			handle.Invalidate()
+		}
+	}
+
+	// Clear the internal state.
+	h.items = make([]types.QueueItemAccessor, 0)
+	h.handles = make(map[types.QueueItemHandle]*heapItem)
+	h.byteSize.Store(0)
+
+	return drainedItems, nil
+}
diff --git a/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap_test.go b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap_test.go
new file mode 100644
index 000000000..defbf0f40
--- /dev/null
+++ b/pkg/epp/flowcontrol/framework/plugins/queue/maxminheap/maxminheap_test.go
@@ -0,0 +1,117 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package maxminheap
+
+import (
+	"math"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
+	typesmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
+)
+
+// enqueueTimeComparator orders items by their enqueue time (earlier first).
+var enqueueTimeComparator = &mocks.MockItemComparator{
+	ScoreTypeV: "enqueue_time_ns_asc",
+	FuncV: func(a, b types.QueueItemAccessor) bool {
+		return a.EnqueueTime().Before(b.EnqueueTime())
+	},
+}
+
+// TestMaxMinHeap_InternalProperty validates that the max-min heap property is maintained after a series of `Add` and
+// `Remove` operations. This is a white-box test to ensure the internal data structure is always in a valid state.
+func TestMaxMinHeap_InternalProperty(t *testing.T) {
+	t.Parallel()
+	q := newMaxMinHeap(enqueueTimeComparator)
+
+	items := make([]*typesmocks.MockQueueItemAccessor, 20)
+	now := time.Now()
+	for i := range items {
+		// Add items in a somewhat random order of enqueue times
+		items[i] = typesmocks.NewMockQueueItemAccessor(10, "item", "flow")
+		items[i].EnqueueTimeV = now.Add(time.Duration((i%5-2)*10) * time.Second)
+		_, _, err := q.Add(items[i])
+		require.NoError(t, err, "Add should not fail")
+		assertHeapProperty(t, q, "after adding item %d", i)
+	}
+
+	// Remove a few items from the middle and validate the heap property
+	for _, i := range []int{15, 7, 11} {
+		handle := items[i].Handle()
+		_, _, _, err := q.Remove(handle)
+		require.NoError(t, err, "Remove should not fail for item %d", i)
+		assertHeapProperty(t, q, "after removing item %d", i)
+	}
+
+	// Remove remaining items from the head and validate each time
+	for q.Len() > 0 {
+		head, err := q.PeekHead()
+		require.NoError(t, err)
+		_, _, _, err = q.Remove(head.Handle())
+		require.NoError(t, err)
+		assertHeapProperty(t, q, "after removing head item")
+	}
+}
+
+// assertHeapProperty checks if the slice of items satisfies the max-min heap property.
+func assertHeapProperty(t *testing.T, h *maxMinHeap, msgAndArgs ...any) {
+	t.Helper()
+	if len(h.items) > 0 {
+		verifyNode(t, h, 0, msgAndArgs...)
+	}
+}
+
+// verifyNode recursively checks that the subtree at index `i` satisfies the max-min heap property.
+func verifyNode(t *testing.T, h *maxMinHeap, i int, msgAndArgs ...any) {
+	t.Helper()
+	n := len(h.items)
+	if i >= n {
+		return
+	}
+
+	level := int(math.Floor(math.Log2(float64(i + 1))))
+	isMinLevel := level%2 != 0
+
+	leftChild := 2*i + 1
+	rightChild := 2*i + 2
+
+	// Check children
+	if leftChild < n {
+		if isMinLevel {
+			require.False(t, h.comparator.Func()(h.items[i], h.items[leftChild]),
+				"min-level node %d has child %d with smaller value. %v", i, leftChild, msgAndArgs)
+		} else { // isMaxLevel
+			require.False(t, h.comparator.Func()(h.items[leftChild], h.items[i]),
+				"max-level node %d has child %d with larger value. %v", i, leftChild, msgAndArgs)
+		}
+		verifyNode(t, h, leftChild, msgAndArgs...)
+	}
+
+	if rightChild < n {
+		if isMinLevel {
+			require.False(t, h.comparator.Func()(h.items[i], h.items[rightChild]),
+				"min-level node %d has child %d with smaller value. %v", i, rightChild, msgAndArgs)
+		} else { // isMaxLevel
+			require.False(t, h.comparator.Func()(h.items[rightChild], h.items[i]),
+				"max-level node %d has child %d with larger value. %v", i, rightChild, msgAndArgs)
+		}
+		verifyNode(t, h, rightChild, msgAndArgs...)
+	}
+
+	// Check grandchildren. The max-min property constrains a node against its entire subtree, and parent-child checks
+	// alone do not imply the grandparent-grandchild ordering, so the checks below close that gap (deeper descendants
+	// are then covered transitively, two levels at a time).
+	for _, gc := range []int{2*leftChild + 1, 2*leftChild + 2, 2*rightChild + 1, 2*rightChild + 2} {
+		if gc >= n {
+			continue
+		}
+		if isMinLevel {
+			require.False(t, h.comparator.Func()(h.items[i], h.items[gc]),
+				"min-level node %d has grandchild %d with smaller value. %v", i, gc, msgAndArgs)
+		} else { // isMaxLevel
+			require.False(t, h.comparator.Func()(h.items[gc], h.items[i]),
+				"max-level node %d has grandchild %d with larger value. %v", i, gc, msgAndArgs)
+		}
+	}
+}
diff --git a/pkg/epp/flowcontrol/framework/policies.go b/pkg/epp/flowcontrol/framework/policies.go
new file mode 100644
index 000000000..14cbc5df4
--- /dev/null
+++ b/pkg/epp/flowcontrol/framework/policies.go
@@ -0,0 +1,142 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package framework + +import "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" + +// PriorityScoreType is a descriptor for the domain of a policy's item comparator. +type PriorityScoreType string + +const ( + // EnqueueTimePriorityScoreType indicates that the priority is based on the item's enqueue time, with earlier times + // being higher priority. + EnqueueTimePriorityScoreType PriorityScoreType = "enqueue_time_ns_asc" +) + +// ItemComparatorFunc defines the function signature for comparing two `types.QueueItemAccessor` instances to determine +// their relative dispatch priority. +// +// An implementation of this function determines if item 'a' should be dispatched before item 'b'. It returns true if +// 'a' is of higher priority, and false otherwise. The specific criteria for "higher priority" (e.g., earlier deadline, +// lower enqueue time) are defined by the `IntraFlowDispatchPolicy` that vends this function via an `ItemComparator`. +type ItemComparatorFunc func(a, b types.QueueItemAccessor) bool + +// ItemComparator encapsulates the logic for comparing two `types.QueueItemAccessor` instances to determine their +// relative dispatch priority. It is the definitive source of ordering truth for a flow's queue. +// +// It is vended by an `IntraFlowDispatchPolicy` to make its internal ordering logic explicit and available to other +// components. It is used by `SafeQueue` implementations that support the `CapabilityPriorityConfigurable` capability +// and can also be used by inter-flow policies to compare items from different queues. +// +// Design Justification: This design treats item priority as a relational concept defined by a policy, rather than a +// static attribute on the item itself. This allows for sophisticated, dynamic priority evaluation (e.g., based on +// real-time SLO attainment), as the comparison logic can be stateful. +type ItemComparator interface { + // Func returns the core comparison logic as an `ItemComparatorFunc`. + // + // This function is the single source of truth for determining the relative priority between two items. A `SafeQueue` + // that declares `CapabilityPriorityConfigurable` MUST use this function for its internal ordering. Inter-flow + // policies MAY use this function to compare items from different queues, but only after verifying that their + // `ScoreType()` values are identical, ensuring the comparison is meaningful. + // + // Conformance: MUST NOT return nil. + Func() ItemComparatorFunc + + // ScoreType returns a string descriptor that defines the semantic meaning and domain of the comparison logic. + // + // A non-empty, descriptive string is required for two primary reasons: + // 1. Comparability Check: Inter-flow policies that compare items across different queues (e.g., a "BestHead" policy) + // MUST check for identical `ScoreType` strings before using the comparator functions. A comparison is only + // meaningful if the underlying scoring logic is the same. + // 2. Introspectability: The string makes the priority scheme human-readable for debugging and observability. + // + // Examples: "enqueue_time_ns_asc", "slo_urgency_score_desc". + // + // Future Considerations: While currently a simple string for initial simplicity, a future enhancement could introduce + // a more structured `ScoreType`. Such a structure might explicitly encode ordering (ascending/descending) and value + // semantics (e.g., time, custom_metric), potentially enabling advanced features like cross-`ScoreType` normalization + // plugins. 
+ // + // Conformance: + // - MUST return a non-empty, meaningful string that describes the domain or unit of comparison. + // - For the present, policies MUST NOT assume any implicit cross-`ScoreType` normalization capabilities. + ScoreType() string +} + +// IntraFlowDispatchPolicy selects a specific request to dispatch next from a single flow's queue. +// Implementations define the dispatch ordering of requests within a single flow. +// +// For example, a "First-Come, First-Served" (FCFS) policy would typically inspect a queue and select the item that was +// enqueued the earliest. +type IntraFlowDispatchPolicy interface { + // Name returns a string identifier for the concrete policy implementation type (e.g., "FCFS"). + Name() string + + // SelectItem inspects a flow's queue and returns the `types.QueueItemAccessor` of the item chosen for dispatch. + // + // For queues that inherently order items by dispatch preference, this method will typically just call + // `queue.PeekHead()`. + // + // The `controller.FlowController` uses the handle from the returned item to instruct the `ports.ManagedQueue` to + // remove it. + // + // Returns: + // - `types.QueueItemAccessor`: The selected item, or nil if no item is chosen. + // - error: Non-nil if an unrecoverable error occurs. A nil error is returned if no item is selected (e.g., the + // queue is empty or the policy logic determines a pause is appropriate). + // + // Conformance: Implementations MUST be goroutine-safe if they maintain internal state. + SelectItem(queue FlowQueueAccessor) (selectedItem types.QueueItemAccessor, err error) + + // Comparator returns the `ItemComparator` that defines this policy's item ordering logic. This is the definitive + // source for how items within a flow governed by this policy should be prioritized. + // + // A policy MUST provide a meaningful comparator even if it relies on a queue's inherent ordering (e.g., an FCFS + // policy using a `CapabilityFIFO` queue should return a comparator based on enqueue time). This makes the ordering + // principle explicit and available to other components, like inter-flow policies. + // + // Conformance: MUST NOT return nil. + Comparator() ItemComparator + + // RequiredQueueCapabilities returns a slice of capabilities that the `SafeQueue` used with this policy MUST support. + // This policy is responsible for defining the ordering of items within a flow and so it must require the relevant + // *behavioral* capability (e.g., `CapabilityPriorityConfigurable` or `CapabilityFIFO`). The `ItemComparator` vended + // by this policy then defines that behavior. + RequiredQueueCapabilities() []QueueCapability +} + +// FlowQueueAccessor provides a policy-facing, read-only view of a single flow's queue. +// It combines general queue inspection methods (embedded via `QueueInspectionMethods`) with flow-specific metadata. +// +// Instances of `FlowQueueAccessor` are vended by a `ports.ManagedQueue` and are the primary means by which policies +// inspect individual queue state. +// +// Conformance: Implementations MUST ensure all methods (including those embedded from `QueueInspectionMethods`) are +// goroutine-safe for concurrent access. +type FlowQueueAccessor interface { + QueueInspectionMethods + + // Comparator returns the `ItemComparator` that defines the ordering logic of the items within this queue. + // This is determined by the `IntraFlowDispatchPolicy` associated with this queue's flow. 
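
As a concrete illustration of the `ItemComparator` contract described above, here is a sketch of the comparator an FCFS-style `IntraFlowDispatchPolicy` might vend (the `fcfs` package and `enqueueTimeComparator` type names are hypothetical; only the interfaces and the `EnqueueTimePriorityScoreType` constant come from this patch):

```go
// Package fcfs is a hypothetical policy package, shown only to illustrate the ItemComparator contract.
package fcfs

import (
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

// enqueueTimeComparator orders items so that earlier enqueue times are dispatched first.
type enqueueTimeComparator struct{}

// Func returns the comparison logic: true when `a` should be dispatched before `b`.
func (enqueueTimeComparator) Func() framework.ItemComparatorFunc {
	return func(a, b types.QueueItemAccessor) bool {
		return a.EnqueueTime().Before(b.EnqueueTime())
	}
}

// ScoreType advertises the comparison domain so inter-flow policies can check comparability.
func (enqueueTimeComparator) ScoreType() string {
	return string(framework.EnqueueTimePriorityScoreType)
}

var _ framework.ItemComparator = enqueueTimeComparator{}
```
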
+ Comparator() ItemComparator + + // FlowSpec returns the `types.FlowSpecification` of the flow this queue accessor is associated with. + // This provides essential context (like `FlowID`) to policies. + // + // Conformance: Implementations MUST return a valid `types.FlowSpecification`. + FlowSpec() types.FlowSpecification +} diff --git a/pkg/epp/flowcontrol/framework/queue.go b/pkg/epp/flowcontrol/framework/queue.go new file mode 100644 index 000000000..c4829f027 --- /dev/null +++ b/pkg/epp/flowcontrol/framework/queue.go @@ -0,0 +1,108 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" +) + +// QueueCapability defines a functional capability that a `SafeQueue` implementation can provide. +// These capabilities allow policies (like `IntraFlowDispatchPolicy`) to declare their operational requirements, +// ensuring that a policy is always paired with a compatible queue. +// +// For example, a policy that requires a priority-ordered queue would declare `CapabilityPriorityConfigurable`, and the +// `ports.FlowRegistry` would ensure it is paired with a queue implementation (like a heap) that provides this +// capability. +// +// While a simpler boolean method (e.g., `IsPriorityConfigurable()`) could satisfy current needs, this slice-based +// approach is intentionally chosen for future extensibility. It allows a queue to advertise multiple features (e.g., +// dynamic priority reshuffling, size bounds, etc.) without future breaking changes to the `SafeQueue` interface. +type QueueCapability string + +const ( + // CapabilityFIFO indicates that the queue operates in a First-In, First-Out manner. + // `PeekHead()` will return the oldest item (by logical enqueue time). + CapabilityFIFO QueueCapability = "FIFO" + + // CapabilityPriorityConfigurable indicates that the queue's ordering is determined by an `ItemComparator`. + // `PeekHead()` will return the highest priority item according to this comparator. + CapabilityPriorityConfigurable QueueCapability = "PriorityConfigurable" +) + +// QueueInspectionMethods defines `SafeQueue`'s read-only methods. +type QueueInspectionMethods interface { + // Name returns a string identifier for the concrete queue implementation type (e.g., "ListQueue"). + Name() string + + // Capabilities returns the set of functional capabilities this queue instance provides. + Capabilities() []QueueCapability + + // Len returns the current number of items in the queue. + Len() int + + // ByteSize returns the current total byte size of all items in the queue. + ByteSize() uint64 + + // PeekHead returns the item at the "head" of the queue (the item with the highest priority according to the queue's + // ordering) without removing it. + // + // Returns `ErrQueueEmpty` if the queue is empty. 
+ PeekHead() (peekedItem types.QueueItemAccessor, err error) + + // PeekTail returns the item at the "tail" of the queue (the item with the lowest priority according to the queue's + // ordering) without removing it. + // + // Returns `ErrQueueEmpty` if the queue is empty. + PeekTail() (peekedItem types.QueueItemAccessor, err error) +} + +// PredicateFunc defines a function that returns true if a given item matches a certain condition. +// It is used by `SafeQueue.Cleanup` to filter items. +type PredicateFunc func(item types.QueueItemAccessor) bool + +// SafeQueue defines the contract for a single, concurrent-safe queue implementation. +// +// All implementations MUST be goroutine-safe. +type SafeQueue interface { + QueueInspectionMethods + + // Add attempts to enqueue an item. On success, it must associate a new, unique `types.QueueItemHandle` with the item + // by calling `item.SetHandle()`. + // + // Returns the new length and byte size of the queue. + Add(item types.QueueItemAccessor) (newLen, newByteSize uint64, err error) + + // Remove atomically finds and removes the item identified by the given handle. + // + // On success, implementations MUST invalidate the provided handle by calling `handle.Invalidate()`. + // + // Returns the removed item and the new length and byte size of the queue. + // Returns `ErrInvalidQueueItemHandle` if the handle is invalid (e.g., nil, wrong type, already invalidated). + // Returns `ErrQueueItemNotFound` if the handle is valid but the item is not in the queue. + Remove(handle types.QueueItemHandle) (removedItem types.QueueItemAccessor, newLen, newByteSize uint64, err error) + + // Cleanup iterates through the queue and atomically removes all items for which the predicate returns true, returning + // them in a slice. + // + // The handle for each removed item MUST be invalidated. + Cleanup(predicate PredicateFunc) (cleanedItems []types.QueueItemAccessor, err error) + + // Drain atomically removes all items from the queue and returns them in a slice. + // + // The handle for all removed items MUST be invalidated. The queue MUST be empty after this operation. + Drain() (drainedItems []types.QueueItemAccessor, err error) +} diff --git a/pkg/epp/flowcontrol/types/doc.go b/pkg/epp/flowcontrol/types/doc.go new file mode 100644 index 000000000..eaa8fc004 --- /dev/null +++ b/pkg/epp/flowcontrol/types/doc.go @@ -0,0 +1,22 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package types defines the core data structures and service contracts for the Flow Control system. +// +// It establishes the "vocabulary" of the system, defining the objects that are passed between the main controller, +// policies, and queue plugins. The central data model revolves around the lifecycle of a request, which is +// progressively wrapped in interfaces that provide an enriched, read-only view of its state. 
+package types diff --git a/pkg/epp/flowcontrol/types/flow.go b/pkg/epp/flowcontrol/types/flow.go index 031a83799..6c73f90f0 100644 --- a/pkg/epp/flowcontrol/types/flow.go +++ b/pkg/epp/flowcontrol/types/flow.go @@ -14,20 +14,18 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package types defines the core data structures and service contracts for the Flow Controller system. It establishes -// the "vocabulary" of the system, including the request lifecycle interfaces, final outcomes, and standard error types. package types // FlowSpecification defines the configuration of a logical flow, encapsulating its identity and registered priority. // -// A FlowSpecification acts as the registration key for a flow within the Flow Registry. -type FlowSpecification interface { - // ID returns the unique name or identifier for this flow (e.g., model name, tenant ID), corresponding to the value - // from `FlowControlRequest.FlowID()`. - ID() string +// It acts as the registration key for a flow within the `ports.FlowRegistry`. +type FlowSpecification struct { + // ID returns the unique name or identifier for this logical flow, corresponding to the value from + // `FlowControlRequest.FlowID()`. + ID string - // Priority returns the numerical priority level currently associated with this flow within the Flow Registry. + // Priority returns the numerical priority level currently associated with this flow within the `ports.FlowRegistry`. // // Convention: Lower numerical values indicate higher priority. - Priority() uint + Priority uint } diff --git a/pkg/epp/flowcontrol/types/mocks/mocks.go b/pkg/epp/flowcontrol/types/mocks/mocks.go new file mode 100644 index 000000000..04f40b409 --- /dev/null +++ b/pkg/epp/flowcontrol/types/mocks/mocks.go @@ -0,0 +1,93 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package mocks provides simple, configurable mock implementations of the core flow control types, intended for use in +// unit and integration tests. +package mocks + +import ( + "context" + "time" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" +) + +// MockFlowControlRequest provides a mock implementation of the `types.FlowControlRequest` interface. +type MockFlowControlRequest struct { + Ctx context.Context + FlowIDV string + ByteSizeV uint64 + InitialEffectiveTTLV time.Duration + IDV string +} + +func (m *MockFlowControlRequest) Context() context.Context { return m.Ctx } +func (m *MockFlowControlRequest) FlowID() string { return m.FlowIDV } +func (m *MockFlowControlRequest) ByteSize() uint64 { return m.ByteSizeV } +func (m *MockFlowControlRequest) InitialEffectiveTTL() time.Duration { return m.InitialEffectiveTTLV } +func (m *MockFlowControlRequest) ID() string { return m.IDV } + +var _ types.FlowControlRequest = &MockFlowControlRequest{} + +// MockQueueItemHandle provides a mock implementation of the `types.QueueItemHandle` interface. 
+type MockQueueItemHandle struct { + RawHandle any + IsInvalidatedV bool +} + +func (m *MockQueueItemHandle) Handle() any { return m.RawHandle } +func (m *MockQueueItemHandle) Invalidate() { m.IsInvalidatedV = true } +func (m *MockQueueItemHandle) IsInvalidated() bool { return m.IsInvalidatedV } + +var _ types.QueueItemHandle = &MockQueueItemHandle{} + +// MockQueueItemAccessor provides a mock implementation of the `types.QueueItemAccessor` interface. +type MockQueueItemAccessor struct { + EnqueueTimeV time.Time + EffectiveTTLV time.Duration + OriginalRequestV types.FlowControlRequest + HandleV types.QueueItemHandle +} + +func (m *MockQueueItemAccessor) EnqueueTime() time.Time { return m.EnqueueTimeV } +func (m *MockQueueItemAccessor) EffectiveTTL() time.Duration { return m.EffectiveTTLV } + +func (m *MockQueueItemAccessor) OriginalRequest() types.FlowControlRequest { + if m.OriginalRequestV == nil { + return &MockFlowControlRequest{} + } + return m.OriginalRequestV +} + +func (m *MockQueueItemAccessor) Handle() types.QueueItemHandle { return m.HandleV } +func (m *MockQueueItemAccessor) SetHandle(handle types.QueueItemHandle) { m.HandleV = handle } + +var _ types.QueueItemAccessor = &MockQueueItemAccessor{} + +// NewMockQueueItemAccessor is a constructor for `MockQueueItemAccessor` that initializes the mock with a default +// `MockFlowControlRequest` and `MockQueueItemHandle` to prevent nil pointer dereferences in tests. +func NewMockQueueItemAccessor(byteSize uint64, reqID, flowID string) *MockQueueItemAccessor { + return &MockQueueItemAccessor{ + EnqueueTimeV: time.Now(), + OriginalRequestV: &MockFlowControlRequest{ + IDV: reqID, + FlowIDV: flowID, + ByteSizeV: byteSize, + Ctx: context.Background(), + }, + HandleV: &MockQueueItemHandle{}, + } +} diff --git a/pkg/epp/flowcontrol/types/request.go b/pkg/epp/flowcontrol/types/request.go index bf6960507..4b6ae011b 100644 --- a/pkg/epp/flowcontrol/types/request.go +++ b/pkg/epp/flowcontrol/types/request.go @@ -28,37 +28,39 @@ import ( // wraps this object with its own internal structures (which implement `QueueItemAccessor`) to manage the request's // lifecycle without modifying the original. type FlowControlRequest interface { - // Context returns the request's context. The Flow Controller uses this for monitoring cancellation (e.g., if the - // client disconnects or a request-scoped timeout occurs), which can lead to the request being evicted from a queue. + // Context returns the request's context. The `controller.FlowController` uses this for monitoring cancellation (e.g., + // if the client disconnects or a request-scoped timeout occurs), which can lead to the request being evicted from a + // queue. Context() context.Context // FlowID returns the unique identifier for the flow this request belongs to (e.g., model name, tenant ID). The - // Flow Controller uses this ID, in conjunction with the flow's registered priority, to look up the active - // `ports.ManagedQueue` from the Flow Registry's `ports.RegistryShard`. + // `controller.FlowController` uses this ID, in conjunction with the flow's registered priority, to look up the + // active `ports.ManagedQueue` from the `ports.FlowRegistry`'s `ports.RegistryShard`. FlowID() string - // ByteSize returns the request's size in bytes (e.g., prompt size). This is used by the Flow Controller and for - // managing byte-based capacity limits and for Flow Registry statistics. + // ByteSize returns the request's size in bytes (e.g., prompt size). 
This is used by the `controller.FlowController` + // and for managing byte-based capacity limits and for `ports.FlowRegistry` statistics. ByteSize() uint64 // InitialEffectiveTTL returns the suggested Time-To-Live for this request. - // This value is treated as a hint; the Flow Controller may override it based on its own configuration or policies. - // A zero value indicates the request has no specific TTL preference, and a system-wide default should be applied. + // This value is treated as a hint; the `controller.FlowController` may override it based on its own configuration or + // policies. A zero value indicates the request has no specific TTL preference, and a system-wide default should be + // applied. InitialEffectiveTTL() time.Duration // ID returns an optional, user-facing unique identifier for this specific request. It is intended for logging, - // tracing, and observability. The core flow control logic does not use this ID for dispatching decisions; it uses + // tracing, and observability. The `controller.FlowController` does not use this ID for dispatching decisions; it uses // the internal, opaque `QueueItemHandle`. ID() string } // QueueItemHandle is an opaque handle to an item that has been successfully added to a `framework.SafeQueue`. It acts -// as a key, allowing the Flow Controller to perform targeted operations (like removal) on a specific item without -// needing to know the queue's internal structure. +// as a key, allowing the `controller.FlowController` to perform targeted operations (like removal) on a specific item +// without needing to know the queue's internal structure. // // A handle is created by and bound to the specific `framework.SafeQueue` instance that stores the item. type QueueItemHandle interface { - // Handle returns the underlying, queue-specific raw handle (e.g., *list.Element). + // Handle returns the underlying, queue-specific raw handle (e.g., `*list.Element`). // This method is intended for internal use by the `framework.SafeQueue` implementation that created it. // Callers outside the queue implementation should treat the returned value as opaque. Handle() any @@ -82,29 +84,20 @@ type QueueItemHandle interface { // // The Flow Controller creates an object that implements this interface by wrapping an incoming `FlowControlRequest`. type QueueItemAccessor interface { - // EnqueueTime is the timestamp when the item was logically accepted by the Flow Controller for queuing (i.e., when - // `FlowController.EnqueueAndWait()` was called. - EnqueueTime() time.Time - - // ByteSize returns the byte size of the original request, cached from `FlowControlRequest.ByteSize()`. - ByteSize() uint64 - - // FlowID returns the unique identifier of the flow this item belongs to, cached from `FlowControlRequest.FlowID()`. - FlowID() string - - // EffectiveTTL is the actual Time-To-Live assigned to this item by the Flow Controller, taking into account the - // request's preference (`FlowControlRequest.InitialEffectiveTTL()`) and any Flow Controller or per-flow - // defaults/policies. - EffectiveTTL() time.Duration - - // RequestID is the user-facing ID from the original request (`FlowControlRequest.ID()`). - RequestID() string - // OriginalRequest returns the underlying `FlowControlRequest` that this accessor provides a view of. // This method serves as an escape hatch, allowing policies or components that are aware of specific // `FlowControlRequest` implementations to perform type assertions and access richer, application-specific data. 
OriginalRequest() FlowControlRequest + // EnqueueTime is the timestamp when the item was logically accepted by the `controller.FlowController` for queuing + // (i.e., when `controller.FlowController.EnqueueAndWait()` was called). + EnqueueTime() time.Time + + // EffectiveTTL is the actual Time-To-Live assigned to this item by the `controller.FlowController`, taking into + // account the request's preference (`FlowControlRequest.InitialEffectiveTTL()`) and any `controller.FlowController` + // or per-flow defaults/policies. + EffectiveTTL() time.Duration + // Handle returns the `QueueItemHandle` associated with this item once it has been successfully added to a // `framework.SafeQueue`. It returns nil if the item is not yet in a queue. Handle() QueueItemHandle @@ -115,7 +108,5 @@ type QueueItemAccessor interface { // immediately after a new `QueueItemHandle` is created for the item. This ensures that the item always carries a // valid handle while it is in a queue. This method is not intended for use outside of `framework.SafeQueue` // implementations. - // - //go:doc SetHandle(handle QueueItemHandle) } diff --git a/site-src/guides/inferencepool-rollout.md b/site-src/guides/inferencepool-rollout.md index 89a384ab4..809fb7f80 100644 --- a/site-src/guides/inferencepool-rollout.md +++ b/site-src/guides/inferencepool-rollout.md @@ -177,7 +177,6 @@ spec: terminationGracePeriodSeconds: 130 nodeSelector: cloud.google.com/gke-accelerator: "nvidia-h100-80gb" - volumes: - name: data emptyDir: {} @@ -250,40 +249,133 @@ spec: spec: terminationGracePeriodSeconds: 130 containers: - - name: epp - image: us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/epp:main - imagePullPolicy: Always - args: - - -poolName - - "vllm-llama3-8b-instruct-new" - - "-poolNamespace" - - "default" - - -v - - "4" - - --zap-encoder - - "json" - - -grpcPort - - "9002" - - -grpcHealthPort - - "9003" - ports: - - containerPort: 9002 - - containerPort: 9003 - - name: metrics - containerPort: 9090 - livenessProbe: - grpc: - port: 9003 - service: inference-extension - initialDelaySeconds: 5 - periodSeconds: 10 - readinessProbe: - grpc: - port: 9003 - service: inference-extension - initialDelaySeconds: 5 - periodSeconds: 10 - EOF + - name: epp + image: us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/epp:main + imagePullPolicy: Always + args: + - -poolName + - "vllm-llama3-8b-instruct-new" + - -poolNamespace + - "default" + - -v + - "4" + - --zap-encoder + - "json" + - -grpcPort + - "9002" + - -grpcHealthPort + - "9003" + - -configFile + - "/config/default-plugins.yaml" + ports: + - containerPort: 9002 + name: grpc + - containerPort: 9003 + name: grpc-health + - containerPort: 9090 + name: metrics + livenessProbe: + grpc: + port: 9003 + service: inference-extension + initialDelaySeconds: 5 + periodSeconds: 10 + readinessProbe: + grpc: + port: 9003 + service: inference-extension + initialDelaySeconds: 5 + periodSeconds: 10 + volumeMounts: + - name: plugins-config-volume + mountPath: /config + volumes: + - name: plugins-config-volume + configMap: + name: plugins-config +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: plugins-config + namespace: default +data: + default-plugins.yaml: | + apiVersion: inference.networking.x-k8s.io/v1alpha1 + kind: EndpointPickerConfig + plugins: + - type: low-queue-filter + parameters: + threshold: 128 + - type: lora-affinity-filter + parameters: + threshold: 0.999 + - type: least-queue-filter + - type: least-kv-cache-filter + - type: 
decision-tree-filter + name: low-latency-filter + parameters: + current: + pluginRef: low-queue-filter + nextOnSuccess: + decisionTree: + current: + pluginRef: lora-affinity-filter + nextOnSuccessOrFailure: + decisionTree: + current: + pluginRef: least-queue-filter + nextOnSuccessOrFailure: + decisionTree: + current: + pluginRef: least-kv-cache-filter + nextOnFailure: + decisionTree: + current: + pluginRef: least-queue-filter + nextOnSuccessOrFailure: + decisionTree: + current: + pluginRef: lora-affinity-filter + nextOnSuccessOrFailure: + decisionTree: + current: + pluginRef: least-kv-cache-filter + - type: random-picker + parameters: + maxNumOfEndpoints: 1 + - type: single-profile-handler + schedulingProfiles: + - name: default + plugins: + - pluginRef: low-latency-filter + - pluginRef: random-picker + plugins-v2.yaml: | + apiVersion: inference.networking.x-k8s.io/v1alpha1 + kind: EndpointPickerConfig + plugins: + - type: queue-scorer + - type: kv-cache-scorer + - type: prefix-cache-scorer + parameters: + hashBlockSize: 64 + maxPrefixBlocksToMatch: 256 + lruCapacityPerServer: 31250 + - type: max-score-picker + parameters: + maxNumOfEndpoints: 1 + - type: single-profile-handler + schedulingProfiles: + - name: default + plugins: + - pluginRef: queue-scorer + weight: 1 + - pluginRef: kv-cache-scorer + weight: 1 + - pluginRef: prefix-cache-scorer + weight: 1 + - pluginRef: max-score-picker +EOF ``` ### Direct traffic to the new inference pool diff --git a/test/testdata/inferencepool-e2e.yaml b/test/testdata/inferencepool-e2e.yaml index 7503727e0..ed4e8a475 100644 --- a/test/testdata/inferencepool-e2e.yaml +++ b/test/testdata/inferencepool-e2e.yaml @@ -64,9 +64,6 @@ spec: - "9003" - "-configFile" - "/config/default-plugins.yaml" - env: - - name: USE_STREAMING - value: "true" ports: - containerPort: 9002 - containerPort: 9003 diff --git a/v1api/v1/doc.go b/v1api/v1/doc.go new file mode 100644 index 000000000..b14103e22 --- /dev/null +++ b/v1api/v1/doc.go @@ -0,0 +1,23 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package v1 contains API Schema definitions for the +// inference.networking.k8s.io API group. +// +// +k8s:openapi-gen=true +// +kubebuilder:object:generate=true +// +groupName=inference.networking.k8s.io +package v1 diff --git a/v1api/v1/inferencepool_types.go b/v1api/v1/inferencepool_types.go new file mode 100644 index 000000000..26d453c21 --- /dev/null +++ b/v1api/v1/inferencepool_types.go @@ -0,0 +1,297 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1
+
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// InferencePool is the Schema for the InferencePools API.
+//
+// +kubebuilder:object:root=true
+// +kubebuilder:subresource:status
+// +kubebuilder:storageversion
+// +genclient
+type InferencePool struct {
+	metav1.TypeMeta   `json:",inline"`
+	metav1.ObjectMeta `json:"metadata,omitempty"`
+
+	Spec InferencePoolSpec `json:"spec,omitempty"`
+
+	// Status defines the observed state of InferencePool.
+	//
+	// +kubebuilder:default={parent: {{parentRef: {kind: "Status", name: "default"}, conditions: {{type: "Accepted", status: "Unknown", reason: "Pending", message: "Waiting for controller", lastTransitionTime: "1970-01-01T00:00:00Z"}}}}}
+	Status InferencePoolStatus `json:"status,omitempty"`
+}
+
+// InferencePoolList contains a list of InferencePool.
+//
+// +kubebuilder:object:root=true
+type InferencePoolList struct {
+	metav1.TypeMeta `json:",inline"`
+	metav1.ListMeta `json:"metadata,omitempty"`
+	Items           []InferencePool `json:"items"`
+}
+
+// InferencePoolSpec defines the desired state of InferencePool.
+type InferencePoolSpec struct {
+	// Selector defines a map of labels to watch model server pods
+	// that should be included in the InferencePool.
+	// In some cases, implementations may translate this field to a Service selector, so this matches the simple
+	// map used for Service selectors instead of the full Kubernetes LabelSelector type.
+	// If specified, it will be applied to match the model server pods in the same namespace as the InferencePool.
+	// Cross-namespace selectors are not supported.
+	//
+	// +kubebuilder:validation:Required
+	Selector map[LabelKey]LabelValue `json:"selector"`
+
+	// TargetPortNumber defines the port number to access the selected model servers.
+	// The number must be in the range 1 to 65535.
+	//
+	// +kubebuilder:validation:Minimum=1
+	// +kubebuilder:validation:Maximum=65535
+	// +kubebuilder:validation:Required
+	TargetPortNumber int32 `json:"targetPortNumber"`
+
+	// EndpointPickerConfig specifies the configuration needed by the proxy to discover and connect to the endpoint
+	// picker service that picks endpoints for the requests routed to this pool.
+	EndpointPickerConfig `json:",inline"`
+}
+
+// EndpointPickerConfig specifies the configuration needed by the proxy to discover and connect to the endpoint picker extension.
+// This type is intended to be a union of mutually exclusive configuration options that we may add in the future.
+type EndpointPickerConfig struct {
+	// Extension configures an endpoint picker as an extension service.
+	//
+	// +kubebuilder:validation:Required
+	ExtensionRef *Extension `json:"extensionRef,omitempty"`
+}
+
+// Extension specifies how to configure an extension that runs the endpoint picker.
+type Extension struct {
+	// Reference is a reference to a service extension. When ExtensionReference is invalid,
+	// a 5XX status code MUST be returned for the request that would have otherwise been routed
+	// to the invalid backend.
+	ExtensionReference `json:",inline"`
+
+	// ExtensionConnection configures the connection between the gateway and the extension.
+	ExtensionConnection `json:",inline"`
+}
+
+// ExtensionReference is a reference to the extension.
+//
+// Connections to this extension MUST use TLS by default. Implementations MAY
+// provide a way to customize this connection to use cleartext, a different
+// protocol, or custom TLS configuration.
+//
+// If a reference is invalid, the implementation MUST update the `ResolvedRefs`
+// Condition on the InferencePool's status to `status: False`. A 5XX status code
+// MUST be returned for the request that would have otherwise been routed to the
+// invalid backend.
+type ExtensionReference struct {
+	// Group is the group of the referent.
+	// The default value is "", representing the Core API group.
+	//
+	// +optional
+	// +kubebuilder:default=""
+	Group *Group `json:"group,omitempty"`
+
+	// Kind is the Kubernetes resource kind of the referent. For example
+	// "Service".
+	//
+	// Defaults to "Service" when not specified.
+	//
+	// ExternalName services can refer to CNAME DNS records that may live
+	// outside of the cluster and as such are difficult to reason about in
+	// terms of conformance. They also may not be safe to forward to (see
+	// CVE-2021-25740 for more information). Implementations MUST NOT
+	// support ExternalName Services.
+	//
+	// +optional
+	// +kubebuilder:default=Service
+	Kind *Kind `json:"kind,omitempty"`
+
+	// Name is the name of the referent.
+	//
+	// +kubebuilder:validation:Required
+	Name ObjectName `json:"name"`
+
+	// The port number on the service running the extension. When unspecified,
+	// implementations SHOULD infer a default value of 9002 when the Kind is
+	// Service.
+	//
+	// +optional
+	PortNumber *PortNumber `json:"portNumber,omitempty"`
+}
+
+// ExtensionConnection encapsulates options that configure the connection to the extension.
+type ExtensionConnection struct {
+	// Configures how the gateway handles the case when the extension is not responsive.
+	// Defaults to failClose.
+	//
+	// +optional
+	// +kubebuilder:default="FailClose"
+	FailureMode *ExtensionFailureMode `json:"failureMode"`
+}
+
+// ExtensionFailureMode defines the options for how the gateway handles the case when the extension is not
+// responsive.
+// +kubebuilder:validation:Enum=FailOpen;FailClose
+type ExtensionFailureMode string
+
+const (
+	// FailOpen specifies that the proxy should not drop the request and should forward it to an endpoint of its picking.
+	FailOpen ExtensionFailureMode = "FailOpen"
+	// FailClose specifies that the proxy should drop the request.
+	FailClose ExtensionFailureMode = "FailClose"
+)
+
+// InferencePoolStatus defines the observed state of InferencePool.
+type InferencePoolStatus struct {
+	// Parents is a list of parent resources (usually Gateways) that are
+	// associated with the InferencePool, and the status of the InferencePool with respect to
+	// each parent.
+	//
+	// A maximum of 32 Gateways will be represented in this list. When the list contains
+	// `kind: Status, name: default`, it indicates that the InferencePool is not
+	// associated with any Gateway and a controller must perform the following:
+	//
+	// - Remove the parent when setting the "Accepted" condition.
+	// - Add the parent when the controller will no longer manage the InferencePool
+	//   and no other parents exist.
+	//
+	// +kubebuilder:validation:MaxItems=32
+	Parents []PoolStatus `json:"parent,omitempty"`
+}
+
+// PoolStatus defines the observed state of InferencePool from a Gateway.
+type PoolStatus struct {
+	// GatewayRef indicates the gateway that observed the state of the InferencePool.
+	GatewayRef ParentGatewayReference `json:"parentRef"`
+
+	// Conditions track the state of the InferencePool.
+	//
+	// Known condition types are:
+	//
+	// * "Accepted"
+	// * "ResolvedRefs"
+	//
+	// +optional
+	// +listType=map
+	// +listMapKey=type
+	// +kubebuilder:validation:MaxItems=8
+	// +kubebuilder:default={{type: "Accepted", status: "Unknown", reason:"Pending", message:"Waiting for controller", lastTransitionTime: "1970-01-01T00:00:00Z"}}
+	Conditions []metav1.Condition `json:"conditions,omitempty"`
+}
+
+// InferencePoolConditionType is a type of condition for the InferencePool.
+type InferencePoolConditionType string
+
+// InferencePoolReason is the reason for a given InferencePoolConditionType.
+type InferencePoolReason string
+
+const (
+	// This condition indicates whether the InferencePool has been accepted or rejected
+	// by a Gateway, and why.
+	//
+	// Possible reasons for this condition to be True are:
+	//
+	// * "Accepted"
+	//
+	// Possible reasons for this condition to be False are:
+	//
+	// * "NotSupportedByGateway"
+	// * "HTTPRouteNotAccepted"
+	//
+	// Possible reasons for this condition to be Unknown are:
+	//
+	// * "Pending"
+	//
+	// Controllers MAY raise this condition with other reasons, but should
+	// prefer to use the reasons listed above to improve interoperability.
+	InferencePoolConditionAccepted InferencePoolConditionType = "Accepted"
+
+	// This reason is used with the "Accepted" condition when the InferencePool has been
+	// accepted by the Gateway.
+	InferencePoolReasonAccepted InferencePoolReason = "Accepted"
+
+	// This reason is used with the "Accepted" condition when the InferencePool
+	// has not been accepted by a Gateway because the Gateway does not support
+	// InferencePool as a backend.
+	InferencePoolReasonNotSupportedByGateway InferencePoolReason = "NotSupportedByGateway"
+
+	// This reason is used with the "Accepted" condition when the InferencePool is
+	// referenced by an HTTPRoute that has been rejected by the Gateway. The user
+	// should inspect the status of the referring HTTPRoute for the specific reason.
+	InferencePoolReasonHTTPRouteNotAccepted InferencePoolReason = "HTTPRouteNotAccepted"
+
+	// This reason is used with the "Accepted" condition when a controller has not yet
+	// reconciled the InferencePool.
+	InferencePoolReasonPending InferencePoolReason = "Pending"
+)
+
+const (
+	// This condition indicates whether the controller was able to resolve all
+	// the object references for the InferencePool.
+	//
+	// Possible reasons for this condition to be True are:
+	//
+	// * "ResolvedRefs"
+	//
+	// Possible reasons for this condition to be False are:
+	//
+	// * "InvalidExtensionRef"
+	//
+	// Controllers MAY raise this condition with other reasons, but should
+	// prefer to use the reasons listed above to improve interoperability.
+	InferencePoolConditionResolvedRefs InferencePoolConditionType = "ResolvedRefs"
+
+	// This reason is used with the "ResolvedRefs" condition when the condition
+	// is true.
+	InferencePoolReasonResolvedRefs InferencePoolReason = "ResolvedRefs"
+
+	// This reason is used with the "ResolvedRefs" condition when the
+	// ExtensionRef is invalid in some way. This can include an unsupported kind
+	// or API group, or a reference to a resource that can not be found.
+	InferencePoolReasonInvalidExtensionRef InferencePoolReason = "InvalidExtensionRef"
+)
+
+// ParentGatewayReference identifies an API object including its namespace,
+// defaulting to Gateway.
+type ParentGatewayReference struct {
+	// Group is the group of the referent.
+ // + // +optional + // +kubebuilder:default="gateway.networking.k8s.io" + Group *Group `json:"group"` + + // Kind is kind of the referent. For example "Gateway". + // + // +optional + // +kubebuilder:default=Gateway + Kind *Kind `json:"kind"` + + // Name is the name of the referent. + Name ObjectName `json:"name"` + + // Namespace is the namespace of the referent. If not present, + // the namespace of the referent is assumed to be the same as + // the namespace of the referring object. + // + // +optional + Namespace *Namespace `json:"namespace,omitempty"` +} diff --git a/v1api/v1/shared_types.go b/v1api/v1/shared_types.go new file mode 100644 index 000000000..a6bbf00e3 --- /dev/null +++ b/v1api/v1/shared_types.go @@ -0,0 +1,129 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +// Group refers to a Kubernetes Group. It must either be an empty string or a +// RFC 1123 subdomain. +// +// This validation is based off of the corresponding Kubernetes validation: +// https://github.com/kubernetes/apimachinery/blob/02cfb53916346d085a6c6c7c66f882e3c6b0eca6/pkg/util/validation/validation.go#L208 +// +// Valid values include: +// +// * "" - empty string implies core Kubernetes API group +// * "gateway.networking.k8s.io" +// * "foo.example.com" +// +// Invalid values include: +// +// * "example.com/bar" - "/" is an invalid character +// +// +kubebuilder:validation:MaxLength=253 +// +kubebuilder:validation:Pattern=`^$|^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$` +type Group string + +// Kind refers to a Kubernetes Kind. +// +// Valid values include: +// +// * "Service" +// * "HTTPRoute" +// +// Invalid values include: +// +// * "invalid/kind" - "/" is an invalid character +// +// +kubebuilder:validation:MinLength=1 +// +kubebuilder:validation:MaxLength=63 +// +kubebuilder:validation:Pattern=`^[a-zA-Z]([-a-zA-Z0-9]*[a-zA-Z0-9])?$` +type Kind string + +// ObjectName refers to the name of a Kubernetes object. +// Object names can have a variety of forms, including RFC 1123 subdomains, +// RFC 1123 labels, or RFC 1035 labels. +// +// +kubebuilder:validation:MinLength=1 +// +kubebuilder:validation:MaxLength=253 +type ObjectName string + +// PortNumber defines a network port. +// +// +kubebuilder:validation:Minimum=1 +// +kubebuilder:validation:Maximum=65535 +type PortNumber int32 + +// LabelKey was originally copied from: https://github.com/kubernetes-sigs/gateway-api/blob/99a3934c6bc1ce0874f3a4c5f20cafd8977ffcb4/apis/v1/shared_types.go#L694-L731 +// Duplicated as to not take an unexpected dependency on gw's API. +// +// LabelKey is the key of a label. This is used for validation +// of maps. This matches the Kubernetes "qualified name" validation that is used for labels. +// Labels are case sensitive, so: my-label and My-Label are considered distinct. 
+// +// Valid values include: +// +// * example +// * example.com +// * example.com/path +// * example.com/path.html +// +// Invalid values include: +// +// * example~ - "~" is an invalid character +// * example.com. - can not start or end with "." +// +// +kubebuilder:validation:MinLength=1 +// +kubebuilder:validation:MaxLength=253 +// +kubebuilder:validation:Pattern=`^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?([A-Za-z0-9][-A-Za-z0-9_.]{0,61})?[A-Za-z0-9]$` +type LabelKey string + +// LabelValue is the value of a label. This is used for validation +// of maps. This matches the Kubernetes label validation rules: +// * must be 63 characters or less (can be empty), +// * unless empty, must begin and end with an alphanumeric character ([a-z0-9A-Z]), +// * could contain dashes (-), underscores (_), dots (.), and alphanumerics between. +// +// Valid values include: +// +// * MyValue +// * my.name +// * 123-my-value +// +// +kubebuilder:validation:MinLength=0 +// +kubebuilder:validation:MaxLength=63 +// +kubebuilder:validation:Pattern=`^(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?$` +type LabelValue string + +// Namespace refers to a Kubernetes namespace. It must be a RFC 1123 label. +// +// This validation is based off of the corresponding Kubernetes validation: +// https://github.com/kubernetes/apimachinery/blob/02cfb53916346d085a6c6c7c66f882e3c6b0eca6/pkg/util/validation/validation.go#L187 +// +// This is used for Namespace name validation here: +// https://github.com/kubernetes/apimachinery/blob/02cfb53916346d085a6c6c7c66f882e3c6b0eca6/pkg/api/validation/generic.go#L63 +// +// Valid values include: +// +// * "example" +// +// Invalid values include: +// +// * "example.com" - "." is an invalid character +// +// +kubebuilder:validation:Pattern=`^[a-z0-9]([-a-z0-9]*[a-z0-9])?$` +// +kubebuilder:validation:MinLength=1 +// +kubebuilder:validation:MaxLength=63 +type Namespace string
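The `v1` Go types introduced above back the generated CRD schema registered earlier in this change. As a quick orientation for consumers of the package, the sketch below constructs an `InferencePool` programmatically. This is illustrative only: the import path is assumed from the module name and the new `v1api/v1` directory, and the names, labels, and ports are hypothetical placeholders.

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	// Assumed import path: module root plus the v1api/v1 directory added in this change.
	v1 "sigs.k8s.io/gateway-api-inference-extension/v1api/v1"
)

func main() {
	// Illustrative extension port; implementations default to 9002 when unspecified.
	eppPort := v1.PortNumber(9002)

	pool := &v1.InferencePool{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "vllm-llama3-8b-instruct", // hypothetical pool name
			Namespace: "default",
		},
		Spec: v1.InferencePoolSpec{
			// Selector matches model server pods in the same namespace as the InferencePool.
			Selector: map[v1.LabelKey]v1.LabelValue{
				"app": "vllm-llama3-8b-instruct",
			},
			// Port on which the selected model servers listen.
			TargetPortNumber: 8000,
			EndpointPickerConfig: v1.EndpointPickerConfig{
				ExtensionRef: &v1.Extension{
					ExtensionReference: v1.ExtensionReference{
						Name:       "vllm-llama3-8b-instruct-epp", // hypothetical EPP Service name
						PortNumber: &eppPort,
					},
				},
			},
		},
	}

	fmt.Printf("InferencePool %s/%s targets port %d\n", pool.Namespace, pool.Name, pool.Spec.TargetPortNumber)
}
```

Group, Kind, and FailureMode are left unset here so that, when the object is served through the API, the kubebuilder defaults declared on the types apply ("", "Service", and "FailClose" respectively).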