diff --git a/README.md b/README.md index c7652af..b14ed5f 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ _Name Story: the inspiration of the name `Manta` is coming from Dota2, called [M ## Features Overview -- **Model Hub Support**: Models could be downloaded directly from model hubs (Huggingface etc.) or object storages, no other efforts. +- **Model Hub Support**: Models could be downloaded directly from model hubs (Huggingface etc.) or object storages, no other effort. - **Model Preheat**: Models could be preloaded to clusters, or specified nodes to accelerate the model serving. - **Model Cache**: Models will be cached as chunks after downloading for faster model loading. - **Model Lifecycle Management**: Model lifecycle is managed automatically with different strategies, like `Retain` or `Delete`. @@ -97,9 +97,11 @@ More details refer to the [APIs](https://github.com/InftyAI/Manta/blob/main/api/ ## Roadmap +In the long term, we hope to make Manta **an unified cache system within MLOps**. + - Preloading datasets from model hubs - RDMA support for faster model loading -- More integrations with serving projects +- More integrations with MLOps system, including training and serving ## Community diff --git a/api/v1alpha1/replication_types.go b/api/v1alpha1/replication_types.go index 35128ce..6bacc9f 100644 --- a/api/v1alpha1/replication_types.go +++ b/api/v1alpha1/replication_types.go @@ -33,6 +33,7 @@ type Target struct { // - oss://./ // - localhost:// // - remote://@ + // Localhost means the local host path, remote means the host path of the provided node. // Note: if it's a folder, all the files under the folder will be considered, // otherwise, only one file will be replicated. URI *string `json:"uri,omitempty"` @@ -80,6 +81,7 @@ type ReplicationStatus struct { //+kubebuilder:resource:scope=Cluster //+kubebuilder:printcolumn:name="node",type=string,JSONPath=".spec.nodeName" //+kubebuilder:printcolumn:name="phase",type=string,JSONPath=".status.phase" +//+kubebuilder:printcolumn:name="Age",type=date,JSONPath=".metadata.creationTimestamp" // Replication is the Schema for the replications API type Replication struct { diff --git a/api/v1alpha1/torrent_types.go b/api/v1alpha1/torrent_types.go index c0fd53f..24f7fc3 100644 --- a/api/v1alpha1/torrent_types.go +++ b/api/v1alpha1/torrent_types.go @@ -17,6 +17,8 @@ limitations under the License. package v1alpha1 import ( + "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -65,6 +67,12 @@ const ( // TorrentSpec defines the desired state of Torrent type TorrentSpec struct { + // Preheat represents whether we should preload the model. + // Preheat can only be transitioned from false to true, not the other way around. + // +kubebuilder:default=true + // +optional + Preheat *bool `json:"preheat,omitempty"` + // Hub represents the model registry for model downloads. // Hub and URI are exclusive. // +optional @@ -88,6 +96,11 @@ type TorrentSpec struct { // +kubebuilder:validation:Enum={Retain,Delete} // +optional ReclaimPolicy *ReclaimPolicy `json:"reclaimPolicy,omitempty"` + // TTLSecondsAfterReady represents the waiting time to delete the Torrent once Ready. + // Default to nil indicates Torrent will not be deleted. + // TODO: We only support nil and 0 right now. + // +optional + TTLSecondsAfterReady *time.Duration `json:"ttlSecondsAfterReady,omitempty"` // NodeSelector represents the node constraints to download the chunks. // It can be used to download the model to a specified node for preheating. // +optional diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 6bffc73..8de0c41 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -23,6 +23,7 @@ package v1alpha1 import ( "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" + timex "time" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -420,6 +421,11 @@ func (in *TorrentList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TorrentSpec) DeepCopyInto(out *TorrentSpec) { *out = *in + if in.Preheat != nil { + in, out := &in.Preheat, &out.Preheat + *out = new(bool) + **out = **in + } if in.Hub != nil { in, out := &in.Hub, &out.Hub *out = new(Hub) @@ -435,6 +441,11 @@ func (in *TorrentSpec) DeepCopyInto(out *TorrentSpec) { *out = new(ReclaimPolicy) **out = **in } + if in.TTLSecondsAfterReady != nil { + in, out := &in.TTLSecondsAfterReady, &out.TTLSecondsAfterReady + *out = new(timex.Duration) + **out = **in + } if in.NodeSelector != nil { in, out := &in.NodeSelector, &out.NodeSelector *out = make(map[string]string, len(*in)) diff --git a/config/crd/bases/manta.io_replications.yaml b/config/crd/bases/manta.io_replications.yaml index d44dddc..77e1380 100644 --- a/config/crd/bases/manta.io_replications.yaml +++ b/config/crd/bases/manta.io_replications.yaml @@ -88,9 +88,11 @@ spec: uri: description: "URI represents the file address with different storages, e.g.:\n\t - oss://./\n\t - - localhost://\n\t - remote://@\nNote: - if it's a folder, all the files under the folder will be considered,\notherwise, - only one file will be replicated." + - localhost://\n\t - remote://@\nLocalhost + means the local host path, remote means the host path of the + provided node.\nNote: if it's a folder, all the files under + the folder will be considered,\notherwise, only one file will + be replicated." type: string type: object nodeName: @@ -139,9 +141,11 @@ spec: uri: description: "URI represents the file address with different storages, e.g.:\n\t - oss://./\n\t - - localhost://\n\t - remote://@\nNote: - if it's a folder, all the files under the folder will be considered,\notherwise, - only one file will be replicated." + - localhost://\n\t - remote://@\nLocalhost + means the local host path, remote means the host path of the + provided node.\nNote: if it's a folder, all the files under + the folder will be considered,\notherwise, only one file will + be replicated." type: string type: object required: diff --git a/config/crd/bases/manta.io_torrents.yaml b/config/crd/bases/manta.io_torrents.yaml index 59ff253..77b337e 100644 --- a/config/crd/bases/manta.io_torrents.yaml +++ b/config/crd/bases/manta.io_torrents.yaml @@ -84,6 +84,12 @@ spec: NodeSelector represents the node constraints to download the chunks. It can be used to download the model to a specified node for preheating. type: object + preheat: + default: true + description: |- + Preheat represents whether we should preload the model. + Preheat can only be transitioned from false to true, not the other way around. + type: boolean reclaimPolicy: default: Retain description: |- @@ -99,6 +105,12 @@ spec: description: Replicas represents the replication number of each object. format: int32 type: integer + ttlSecondsAfterReady: + description: |- + TTLSecondsAfterReady represents the waiting time to delete the Torrent once Ready. + Default to nil indicates Torrent will not be deleted. + format: int64 + type: integer type: object status: description: TorrentStatus defines the observed state of Torrent diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index 197e53e..8739a20 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -62,6 +62,7 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) logger.Info("reconcile Replication", "Replication", klog.KObj(replication)) + // Leave the left reconciliation to agent controller. if setReplicationCondition(replication) { return ctrl.Result{}, r.Status().Update(ctx, replication) } @@ -70,7 +71,6 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) } // Only watch for create events. - func (r *ReplicationReconciler) Create(e event.CreateEvent) bool { return true } diff --git a/pkg/controller/torrent_controller.go b/pkg/controller/torrent_controller.go index 3c0b0f2..c2c0c83 100644 --- a/pkg/controller/torrent_controller.go +++ b/pkg/controller/torrent_controller.go @@ -19,6 +19,7 @@ package controller import ( "context" "fmt" + "time" "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" @@ -74,6 +75,12 @@ func (r *TorrentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct logger.Info("reconcile Torrent") + // Noe need to handle Torrent at this point just because we don't want to + // download the files. + if torrent.Spec.Preheat != nil && !*torrent.Spec.Preheat { + return ctrl.Result{}, nil + } + // TODO: delete torrent at anytime. if torrentReady(torrent) && torrentDeleting(torrent) { logger.Info("start to handle torrent deletion") @@ -102,6 +109,7 @@ func (r *TorrentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct logger.Error(err, "failed to handle creation") return ctrl.Result{}, err } + return ctrl.Result{}, nil } nodeTrackers := &api.NodeTrackerList{} @@ -196,6 +204,15 @@ func (r *TorrentReconciler) handleDeletion(ctx context.Context, torrent *api.Tor } func (r *TorrentReconciler) handleReady(ctx context.Context, torrent *api.Torrent) error { + // TODO: once ttl supports other values than 0, we need to refactor here. + if torrent.Spec.TTLSecondsAfterReady != nil && *torrent.Spec.TTLSecondsAfterReady == time.Duration(0) { + // Corresponding Replications will be deleted as well. + if err := r.Client.Delete(ctx, torrent); err != nil { + return err + } + return nil + } + replications, err := r.replications(ctx, torrent) if err != nil { return err @@ -299,6 +316,7 @@ func (r *TorrentReconciler) SetupWithManager(mgr ctrl.Manager) error { } func setTorrentCondition(torrent *api.Torrent, replications []api.Replication) (changed bool) { + // Set to Pending condition. if torrent.Status.Repo == nil { condition := metav1.Condition{ Type: api.PendingConditionType, @@ -309,6 +327,7 @@ func setTorrentCondition(torrent *api.Torrent, replications []api.Replication) ( return setTorrentConditionTo(torrent, condition) } + // Set to Reclaiming condition. if torrentReady(torrent) && torrentDeleting(torrent) { condition := metav1.Condition{ Type: api.ReclaimingConditionType, @@ -323,6 +342,7 @@ func setTorrentCondition(torrent *api.Torrent, replications []api.Replication) ( return false } + // Set to Ready condition. if apimeta.IsStatusConditionTrue(torrent.Status.Conditions, api.ReplicateConditionType) && replicationsReady(replications) { condition := metav1.Condition{ Type: api.ReadyConditionType, @@ -333,6 +353,7 @@ func setTorrentCondition(torrent *api.Torrent, replications []api.Replication) ( return setTorrentConditionTo(torrent, condition) } + // Set to Replicating condition. if torrentDownloading(replications) { condition := metav1.Condition{ Type: api.ReplicateConditionType, diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index 23bad99..5815749 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -379,7 +379,7 @@ func buildSyncReplication(torrent *api.Torrent, chunk framework.ChunkInfo, sourc func buildDeletionReplication(torrent *api.Torrent, chunk framework.ChunkInfo, nodeName string) *api.Replication { repoName := hubRepoName(torrent.Spec.Hub) generatedName := util.GenerateName(nodeName) - name := chunk.Name + "--" + generatedName + name := chunk.Name + "--" + generatedName + "--" + "d" return &api.Replication{ TypeMeta: v1.TypeMeta{ diff --git a/pkg/webhook/torrent_webhook.go b/pkg/webhook/torrent_webhook.go index 77f0c52..5250a32 100644 --- a/pkg/webhook/torrent_webhook.go +++ b/pkg/webhook/torrent_webhook.go @@ -18,6 +18,7 @@ package webhook import ( "context" + "time" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/validation/field" @@ -60,7 +61,15 @@ func (w *TorrentWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) // ValidateUpdate implements webhook.Validator so a webhook will be registered for the type func (w *TorrentWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) { - allErrs := w.generateValidate(newObj) + old := oldObj.(*api.Torrent) + new := newObj.(*api.Torrent) + + var allErrs field.ErrorList + specPath := field.NewPath("spec") + if *old.Spec.Preheat && !*new.Spec.Preheat { + allErrs = append(allErrs, field.Forbidden(specPath.Child("preheat"), "preheat can only be transitioned from false to true")) + } + allErrs = append(allErrs, w.generateValidate(newObj)...) return nil, allErrs.ToAggregate() } @@ -78,5 +87,9 @@ func (w *TorrentWebhook) generateValidate(obj runtime.Object) field.ErrorList { allErrs = append(allErrs, field.Forbidden(specPath.Child("hub"), "hub can't be null")) } + if !(torrent.Spec.TTLSecondsAfterReady == nil || *torrent.Spec.TTLSecondsAfterReady == time.Duration(0)) { + allErrs = append(allErrs, field.Forbidden(specPath.Child("ttlSecondsAfterReady"), "only support nil and 0 right now")) + } + return allErrs } diff --git a/test/e2e/torrent_test.go b/test/e2e/torrent_test.go index 34f7133..6d4c4fd 100644 --- a/test/e2e/torrent_test.go +++ b/test/e2e/torrent_test.go @@ -51,6 +51,7 @@ var _ = ginkgo.Describe("torrent e2e test", func() { defer func() { gomega.Expect(k8sClient.Delete(ctx, torrent)).To(gomega.Succeed()) validation.ValidateTorrentNotExist(ctx, k8sClient, torrent) + validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, 0, "kind-worker", "kind-worker2", "kind-worker3") }() validation.ValidateTorrentStatusEqualTo(ctx, k8sClient, torrent, api.ReadyConditionType, "Ready", metav1.ConditionTrue, &validation.ValidateOptions{Timeout: 5 * time.Minute}) @@ -63,14 +64,14 @@ var _ = ginkgo.Describe("torrent e2e test", func() { defer func() { gomega.Expect(k8sClient.Delete(ctx, torrent)).To(gomega.Succeed()) validation.ValidateTorrentNotExist(ctx, k8sClient, torrent) - validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker2", 0) + validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, 0, "kind-worker", "kind-worker2", "kind-worker3") }() validation.ValidateTorrentStatusEqualTo(ctx, k8sClient, torrent, api.ReadyConditionType, "Ready", metav1.ConditionTrue, &validation.ValidateOptions{Timeout: 5 * time.Minute}) validation.ValidateAllReplicationsNodeNameEqualTo(ctx, k8sClient, torrent, "kind-worker2") validation.ValidateReplicationsNumberEqualTo(ctx, k8sClient, torrent, 0) // From https://huggingface.co/facebook/opt-125m/tree/main, opt-125m has 12 files. - validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker2", 12) + validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, 12, "kind-worker2") }) ginkgo.It("Sync the models successfully", func() { @@ -79,9 +80,7 @@ var _ = ginkgo.Describe("torrent e2e test", func() { defer func() { gomega.Expect(k8sClient.Delete(ctx, torrent)).To(gomega.Succeed()) validation.ValidateTorrentNotExist(ctx, k8sClient, torrent) - validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker", 0) - validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker2", 0) - validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker3", 0) + validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, 0, "kind-worker", "kind-worker2", "kind-worker3") }() validation.ValidateTorrentStatusEqualTo(ctx, k8sClient, torrent, api.ReadyConditionType, "Ready", metav1.ConditionTrue, &validation.ValidateOptions{Timeout: 5 * time.Minute}) @@ -97,8 +96,15 @@ var _ = ginkgo.Describe("torrent e2e test", func() { // We have three nodes. // From https://huggingface.co/facebook/opt-125m/tree/main, opt-125m has 12 files. - validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker", 12) - validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker2", 12) - validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker3", 12) + validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, 12, "kind-worker", "kind-worker2", "kind-worker3") + }) + + ginkgo.It("Torrent will be auto GCed with TTLSecondsAfterReady", func() { + torrent := wrapper.MakeTorrent("facebook-opt-125m").TTL(0).Hub("Huggingface", "facebook/opt-125m", "").ReclaimPolicy(api.DeleteReclaimPolicy).Obj() + gomega.Expect(k8sClient.Create(ctx, torrent)).To(gomega.Succeed()) + + validation.ValidateTorrentNotExist(ctx, k8sClient, torrent) + validation.ValidateReplicationsNumberEqualTo(ctx, k8sClient, torrent, 0) + validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, 0, "kind-worker", "kind-worker2", "kind-worker3") }) }) diff --git a/test/integration/controller/torrent_test.go b/test/integration/controller/torrent_test.go index c321f2b..58a4e0a 100644 --- a/test/integration/controller/torrent_test.go +++ b/test/integration/controller/torrent_test.go @@ -24,6 +24,7 @@ import ( "github.com/onsi/gomega" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" api "github.com/inftyai/manta/api/v1alpha1" @@ -71,8 +72,8 @@ var _ = ginkgo.Describe("Torrent controller test", func() { update.updateFunc(obj) } newObj := &api.Torrent{} - gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: obj.Name}, newObj)).To(gomega.Succeed()) if update.checkFunc != nil { + gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: obj.Name}, newObj)).To(gomega.Succeed()) update.checkFunc(ctx, k8sClient, newObj) } } @@ -83,13 +84,24 @@ var _ = ginkgo.Describe("Torrent controller test", func() { return k8sClient.Create(ctx, nodeTracker) }, makeTorrent: func() *api.Torrent { - return wrapper.MakeTorrent("qwen2-7b").Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() + return wrapper.MakeTorrent("qwen2-7b").Preheat(false).Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() }, updates: []*update{ { updateFunc: func(torrent *api.Torrent) { + // Preheat: false gomega.Expect(k8sClient.Create(ctx, torrent)).To(gomega.Succeed()) }, + checkFunc: func(ctx context.Context, k8sClient client.Client, torrent *api.Torrent) { + gomega.Expect(len(torrent.Status.Conditions)).To(gomega.Equal(0)) + validation.ValidateReplicationsNumberEqualTo(ctx, k8sClient, torrent, 0) + }, + }, + { + updateFunc: func(torrent *api.Torrent) { + torrent.Spec.Preheat = ptr.To[bool](true) + gomega.Expect(k8sClient.Update(ctx, torrent)).To(gomega.Succeed()) + }, checkFunc: func(ctx context.Context, k8sClient client.Client, torrent *api.Torrent) { validation.ValidateTorrentStatusEqualTo(ctx, k8sClient, torrent, api.PendingConditionType, "Pending", metav1.ConditionTrue, nil) validation.ValidateReplicationsNumberEqualTo(ctx, k8sClient, torrent, util.TorrentChunkNumber(torrent)) @@ -121,7 +133,7 @@ var _ = ginkgo.Describe("Torrent controller test", func() { return k8sClient.Create(ctx, nodeTracker) }, makeTorrent: func() *api.Torrent { - return wrapper.MakeTorrent("qwen2-7b-gguf").Hub("Huggingface", "Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-q5_k_m.gguf").Obj() + return wrapper.MakeTorrent("qwen2-7b-gguf").Preheat(true).Hub("Huggingface", "Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-q5_k_m.gguf").Obj() }, updates: []*update{ { @@ -159,7 +171,7 @@ var _ = ginkgo.Describe("Torrent controller test", func() { return k8sClient.Create(ctx, nodeTracker) }, makeTorrent: func() *api.Torrent { - return wrapper.MakeTorrent("qwen2-7b").Replicas(3).Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() + return wrapper.MakeTorrent("qwen2-7b").Preheat(true).Replicas(3).Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() }, updates: []*update{ { @@ -203,7 +215,7 @@ var _ = ginkgo.Describe("Torrent controller test", func() { return nil }, makeTorrent: func() *api.Torrent { - return wrapper.MakeTorrent("qwen2-7b").Replicas(3).Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() + return wrapper.MakeTorrent("qwen2-7b").Preheat(true).Replicas(3).Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() }, updates: []*update{ { @@ -247,7 +259,7 @@ var _ = ginkgo.Describe("Torrent controller test", func() { return nil }, makeTorrent: func() *api.Torrent { - return wrapper.MakeTorrent("qwen2-7b").Replicas(1).NodeSelector("zone", "zone1").Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() + return wrapper.MakeTorrent("qwen2-7b").Preheat(true).Replicas(1).NodeSelector("zone", "zone1").Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() }, updates: []*update{ { @@ -291,7 +303,7 @@ var _ = ginkgo.Describe("Torrent controller test", func() { return nil }, makeTorrent: func() *api.Torrent { - return wrapper.MakeTorrent("qwen2-7b").Replicas(1).Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() + return wrapper.MakeTorrent("qwen2-7b").Preheat(true).Replicas(1).Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() }, updates: []*update{ { @@ -335,7 +347,7 @@ var _ = ginkgo.Describe("Torrent controller test", func() { return nil }, makeTorrent: func() *api.Torrent { - return wrapper.MakeTorrent("qwen2-7b-gguf").Hub("Huggingface", "Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-q5_k_m.gguf").Obj() + return wrapper.MakeTorrent("qwen2-7b-gguf").Preheat(true).Hub("Huggingface", "Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-q5_k_m.gguf").Obj() }, updates: []*update{ { @@ -375,6 +387,7 @@ var _ = ginkgo.Describe("Torrent controller test", func() { }, makeTorrent: func() *api.Torrent { return wrapper.MakeTorrent("qwen2-7b"). + Preheat(true). Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", ""). ReclaimPolicy(api.DeleteReclaimPolicy). Obj() @@ -433,7 +446,7 @@ var _ = ginkgo.Describe("Torrent controller test", func() { return k8sClient.Create(ctx, nodeTracker) }, makeTorrent: func() *api.Torrent { - return wrapper.MakeTorrent("qwen2-7b-gguf").Hub("Huggingface", "Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-q5_k_m.gguf").Obj() + return wrapper.MakeTorrent("qwen2-7b-gguf").Preheat(true).Hub("Huggingface", "Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-q5_k_m.gguf").Obj() }, updates: []*update{ { @@ -466,7 +479,7 @@ var _ = ginkgo.Describe("Torrent controller test", func() { { updateFunc: func(torrent *api.Torrent) { // Recreate the same model torrent. - newTorrent := wrapper.MakeTorrent("qwen2-7b-gguf-2").Hub("Huggingface", "Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-q5_k_m.gguf").Obj() + newTorrent := wrapper.MakeTorrent("qwen2-7b-gguf-2").Preheat(true).Hub("Huggingface", "Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-q5_k_m.gguf").Obj() gomega.Expect(k8sClient.Create(ctx, newTorrent)).To(gomega.Succeed()) }, checkFunc: func(ctx context.Context, k8sClient client.Client, torrent *api.Torrent) { @@ -483,7 +496,7 @@ var _ = ginkgo.Describe("Torrent controller test", func() { return k8sClient.Create(ctx, nodeTracker) }, makeTorrent: func() *api.Torrent { - return wrapper.MakeTorrent("qwen2-7b-gguf").Hub("Huggingface", "Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-q5_k_m.gguf").NodeSelector("zone", "zone1").Obj() + return wrapper.MakeTorrent("qwen2-7b-gguf").Preheat(true).Hub("Huggingface", "Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-q5_k_m.gguf").NodeSelector("zone", "zone1").Obj() }, updates: []*update{ { @@ -499,5 +512,36 @@ var _ = ginkgo.Describe("Torrent controller test", func() { }, }, }), + ginkgo.Entry("Torrent with ttl is zero", &testValidatingCase{ + precondition: func() error { + nodeTracker := wrapper.MakeNodeTracker("node1").Obj() + return k8sClient.Create(ctx, nodeTracker) + }, + makeTorrent: func() *api.Torrent { + return wrapper.MakeTorrent("qwen2-7b").Preheat(true).TTL(0).Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() + }, + updates: []*update{ + { + updateFunc: func(torrent *api.Torrent) { + gomega.Expect(k8sClient.Create(ctx, torrent)).To(gomega.Succeed()) + }, + }, + { + updateFunc: func(torrent *api.Torrent) { + util.UpdateReplicationsCondition(ctx, k8sClient, torrent, api.ReplicateConditionType) + }, + checkFunc: func(ctx context.Context, k8sClient client.Client, torrent *api.Torrent) { + validation.ValidateTorrentStatusEqualTo(ctx, k8sClient, torrent, api.ReplicateConditionType, "Replicating", metav1.ConditionTrue, nil) + }, + }, + { + updateFunc: func(torrent *api.Torrent) { + util.UpdateReplicationsCondition(ctx, k8sClient, torrent, api.ReadyConditionType) + // Once ready, Torrent will be deleted immediately. + validation.ValidateTorrentNotExist(ctx, k8sClient, torrent) + }, + }, + }, + }), ) }) diff --git a/test/integration/webhook/torrent_test.go b/test/integration/webhook/torrent_test.go index e0d4ccd..7bdf8c6 100644 --- a/test/integration/webhook/torrent_test.go +++ b/test/integration/webhook/torrent_test.go @@ -19,6 +19,8 @@ package webhook import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" api "github.com/inftyai/manta/api/v1alpha1" "github.com/inftyai/manta/test/util/wrapper" @@ -37,34 +39,85 @@ var _ = ginkgo.Describe("Torrent default and validation", func() { }) type testValidatingCase struct { - torrent func() *api.Torrent - failed bool + creationFunc func() *api.Torrent + createFailed bool + updateFunc func(*api.Torrent) *api.Torrent + updateFiled bool } ginkgo.DescribeTable("test validating", func(tc *testValidatingCase) { - if tc.failed { - gomega.Expect(k8sClient.Create(ctx, tc.torrent())).Should(gomega.HaveOccurred()) + torrent := tc.creationFunc() + err := k8sClient.Create(ctx, torrent) + + if tc.createFailed { + gomega.Expect(err).To(gomega.HaveOccurred()) + return } else { - gomega.Expect(k8sClient.Create(ctx, tc.torrent())).To(gomega.Succeed()) + gomega.Expect(err).To(gomega.Succeed()) + } + + gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: torrent.Name, Namespace: torrent.Namespace}, torrent)).Should(gomega.Succeed()) + + if tc.updateFunc != nil { + err = k8sClient.Update(ctx, tc.updateFunc(torrent)) + if tc.updateFiled { + gomega.Expect(err).To(gomega.HaveOccurred()) + } else { + gomega.Expect(err).To(gomega.Succeed()) + } } }, ginkgo.Entry("torrent hub set", &testValidatingCase{ - torrent: func() *api.Torrent { + creationFunc: func() *api.Torrent { return wrapper.MakeTorrent("download-qwen").Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() }, - failed: false, + createFailed: false, }), ginkgo.Entry("torrent hub not set", &testValidatingCase{ - torrent: func() *api.Torrent { + creationFunc: func() *api.Torrent { return wrapper.MakeTorrent("download-qwen").Obj() }, - failed: true, + createFailed: true, }), ginkgo.Entry("unknown hub not supported", &testValidatingCase{ - torrent: func() *api.Torrent { + creationFunc: func() *api.Torrent { return wrapper.MakeTorrent("download-qwen").Hub("ModelScope", "Qwen/Qwen2-7B-Instruct", "").Obj() }, - failed: true, + createFailed: true, + }), + ginkgo.Entry("preheat from false to true should be succeeded", &testValidatingCase{ + creationFunc: func() *api.Torrent { + return wrapper.MakeTorrent("download-qwen").Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() + }, + createFailed: false, + updateFunc: func(torrent *api.Torrent) *api.Torrent { + torrent.Spec.Preheat = ptr.To[bool](true) + return torrent + }, + updateFiled: false, + }), + ginkgo.Entry("preheat from true to false should be failed", &testValidatingCase{ + creationFunc: func() *api.Torrent { + return wrapper.MakeTorrent("download-qwen").Preheat(true).Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() + }, + createFailed: false, + updateFunc: func(torrent *api.Torrent) *api.Torrent { + torrent.Spec.Preheat = ptr.To[bool](false) + return torrent + }, + updateFiled: true, + }), + ginkgo.Entry("ttlSecondsAfterReady is 0", &testValidatingCase{ + creationFunc: func() *api.Torrent { + return wrapper.MakeTorrent("download-qwen").Preheat(true).TTL(0).Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() + }, + createFailed: false, + }), + ginkgo.Entry("ttlSecondsAfterReady not nil or 0", &testValidatingCase{ + creationFunc: func() *api.Torrent { + return wrapper.MakeTorrent("download-qwen").Preheat(true).TTL(1).Hub("Huggingface", "Qwen/Qwen2-7B-Instruct", "").Obj() + }, + createFailed: true, }), ) }) diff --git a/test/util/validation/validate_torrent.go b/test/util/validation/validate_torrent.go index c98e104..ee5082c 100644 --- a/test/util/validation/validate_torrent.go +++ b/test/util/validation/validate_torrent.go @@ -149,15 +149,17 @@ func ValidateReplicationsNumberEqualTo(ctx context.Context, k8sClient client.Cli }, util.Timeout, util.Interval).Should(gomega.BeTrue()) } -func ValidateNodeTrackerChunkNumberEqualTo(ctx context.Context, k8sClient client.Client, nodeTrackerName string, number int) { +func ValidateNodeTrackerChunkNumberEqualTo(ctx context.Context, k8sClient client.Client, number int, nodeTrackerNames ...string) { gomega.Eventually(func() error { - nt := &api.NodeTracker{} - if err := k8sClient.Get(ctx, types.NamespacedName{Name: nodeTrackerName}, nt); err != nil { - return err - } - if len(nt.Spec.Chunks) != number { - return fmt.Errorf("unexpected chunk number, want %d, got %d", number, len(nt.Spec.Chunks)) + for _, name := range nodeTrackerNames { + nt := &api.NodeTracker{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: name}, nt); err != nil { + return err + } + if len(nt.Spec.Chunks) != number { + return fmt.Errorf("unexpected chunk number, want %d, got %d", number, len(nt.Spec.Chunks)) + } } return nil - }, util.Timeout*3, util.Interval).Should(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) } diff --git a/test/util/wrapper/torrent.go b/test/util/wrapper/torrent.go index d77a5de..9a97851 100644 --- a/test/util/wrapper/torrent.go +++ b/test/util/wrapper/torrent.go @@ -17,6 +17,8 @@ limitations under the License. package wrapper import ( + "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" api "github.com/inftyai/manta/api/v1alpha1" @@ -71,3 +73,14 @@ func (w *TorrentWrapper) NodeSelector(k, v string) *TorrentWrapper { w.Spec.NodeSelector[k] = v return w } + +func (w *TorrentWrapper) Preheat(yesOrNo bool) *TorrentWrapper { + w.Spec.Preheat = &yesOrNo + return w +} + +func (w *TorrentWrapper) TTL(number int32) *TorrentWrapper { + ttl := time.Duration(number) * time.Second + w.Spec.TTLSecondsAfterReady = &ttl + return w +}