Skip to content
This repository was archived by the owner on Nov 17, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ _Name Story: the inspiration of the name `Manta` is coming from Dota2, called [M

## Features Overview

- **Model Hub Support**: Models could be downloaded directly from model hubs (Huggingface etc.) or object storages, no other efforts.
- **Model Hub Support**: Models could be downloaded directly from model hubs (Huggingface etc.) or object storages, no other effort.
- **Model Preheat**: Models could be preloaded to clusters, or specified nodes to accelerate the model serving.
- **Model Cache**: Models will be cached as chunks after downloading for faster model loading.
- **Model Lifecycle Management**: Model lifecycle is managed automatically with different strategies, like `Retain` or `Delete`.
Expand Down Expand Up @@ -97,9 +97,11 @@ More details refer to the [APIs](https://github.com/InftyAI/Manta/blob/main/api/

## Roadmap

In the long term, we hope to make Manta **an unified cache system within MLOps**.

- Preloading datasets from model hubs
- RDMA support for faster model loading
- More integrations with serving projects
- More integrations with MLOps system, including training and serving

## Community

Expand Down
2 changes: 2 additions & 0 deletions api/v1alpha1/replication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Target struct {
// - oss://<bucket>.<endpoint>/<path-to-your-file>
// - localhost://<path-to-your-file>
// - remote://<node-name>@<path-to-your-file>
// Localhost means the local host path, remote means the host path of the provided node.
// Note: if it's a folder, all the files under the folder will be considered,
// otherwise, only one file will be replicated.
URI *string `json:"uri,omitempty"`
Expand Down Expand Up @@ -80,6 +81,7 @@ type ReplicationStatus struct {
//+kubebuilder:resource:scope=Cluster
//+kubebuilder:printcolumn:name="node",type=string,JSONPath=".spec.nodeName"
//+kubebuilder:printcolumn:name="phase",type=string,JSONPath=".status.phase"
//+kubebuilder:printcolumn:name="Age",type=date,JSONPath=".metadata.creationTimestamp"

// Replication is the Schema for the replications API
type Replication struct {
Expand Down
13 changes: 13 additions & 0 deletions api/v1alpha1/torrent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package v1alpha1

import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -65,6 +67,12 @@ const (

// TorrentSpec defines the desired state of Torrent
type TorrentSpec struct {
// Preheat represents whether we should preload the model.
// Preheat can only be transitioned from false to true, not the other way around.
// +kubebuilder:default=true
// +optional
Preheat *bool `json:"preheat,omitempty"`

// Hub represents the model registry for model downloads.
// Hub and URI are exclusive.
// +optional
Expand All @@ -88,6 +96,11 @@ type TorrentSpec struct {
// +kubebuilder:validation:Enum={Retain,Delete}
// +optional
ReclaimPolicy *ReclaimPolicy `json:"reclaimPolicy,omitempty"`
// TTLSecondsAfterReady represents the waiting time to delete the Torrent once Ready.
// Default to nil indicates Torrent will not be deleted.
// TODO: We only support nil and 0 right now.
// +optional
TTLSecondsAfterReady *time.Duration `json:"ttlSecondsAfterReady,omitempty"`
// NodeSelector represents the node constraints to download the chunks.
// It can be used to download the model to a specified node for preheating.
// +optional
Expand Down
11 changes: 11 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 10 additions & 6 deletions config/crd/bases/manta.io_replications.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ spec:
uri:
description: "URI represents the file address with different storages,
e.g.:\n\t - oss://<bucket>.<endpoint>/<path-to-your-file>\n\t
- localhost://<path-to-your-file>\n\t - remote://<node-name>@<path-to-your-file>\nNote:
if it's a folder, all the files under the folder will be considered,\notherwise,
only one file will be replicated."
- localhost://<path-to-your-file>\n\t - remote://<node-name>@<path-to-your-file>\nLocalhost
means the local host path, remote means the host path of the
provided node.\nNote: if it's a folder, all the files under
the folder will be considered,\notherwise, only one file will
be replicated."
type: string
type: object
nodeName:
Expand Down Expand Up @@ -139,9 +141,11 @@ spec:
uri:
description: "URI represents the file address with different storages,
e.g.:\n\t - oss://<bucket>.<endpoint>/<path-to-your-file>\n\t
- localhost://<path-to-your-file>\n\t - remote://<node-name>@<path-to-your-file>\nNote:
if it's a folder, all the files under the folder will be considered,\notherwise,
only one file will be replicated."
- localhost://<path-to-your-file>\n\t - remote://<node-name>@<path-to-your-file>\nLocalhost
means the local host path, remote means the host path of the
provided node.\nNote: if it's a folder, all the files under
the folder will be considered,\notherwise, only one file will
be replicated."
type: string
type: object
required:
Expand Down
12 changes: 12 additions & 0 deletions config/crd/bases/manta.io_torrents.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ spec:
NodeSelector represents the node constraints to download the chunks.
It can be used to download the model to a specified node for preheating.
type: object
preheat:
default: true
description: |-
Preheat represents whether we should preload the model.
Preheat can only be transitioned from false to true, not the other way around.
type: boolean
reclaimPolicy:
default: Retain
description: |-
Expand All @@ -99,6 +105,12 @@ spec:
description: Replicas represents the replication number of each object.
format: int32
type: integer
ttlSecondsAfterReady:
description: |-
TTLSecondsAfterReady represents the waiting time to delete the Torrent once Ready.
Default to nil indicates Torrent will not be deleted.
format: int64
type: integer
type: object
status:
description: TorrentStatus defines the observed state of Torrent
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)

logger.Info("reconcile Replication", "Replication", klog.KObj(replication))

// Leave the left reconciliation to agent controller.
if setReplicationCondition(replication) {
return ctrl.Result{}, r.Status().Update(ctx, replication)
}
Expand All @@ -70,7 +71,6 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// Only watch for create events.

func (r *ReplicationReconciler) Create(e event.CreateEvent) bool {
return true
}
Expand Down
21 changes: 21 additions & 0 deletions pkg/controller/torrent_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -74,6 +75,12 @@ func (r *TorrentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

logger.Info("reconcile Torrent")

// Noe need to handle Torrent at this point just because we don't want to
// download the files.
if torrent.Spec.Preheat != nil && !*torrent.Spec.Preheat {
return ctrl.Result{}, nil
}

// TODO: delete torrent at anytime.
if torrentReady(torrent) && torrentDeleting(torrent) {
logger.Info("start to handle torrent deletion")
Expand Down Expand Up @@ -102,6 +109,7 @@ func (r *TorrentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
logger.Error(err, "failed to handle creation")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

nodeTrackers := &api.NodeTrackerList{}
Expand Down Expand Up @@ -196,6 +204,15 @@ func (r *TorrentReconciler) handleDeletion(ctx context.Context, torrent *api.Tor
}

func (r *TorrentReconciler) handleReady(ctx context.Context, torrent *api.Torrent) error {
// TODO: once ttl supports other values than 0, we need to refactor here.
if torrent.Spec.TTLSecondsAfterReady != nil && *torrent.Spec.TTLSecondsAfterReady == time.Duration(0) {
// Corresponding Replications will be deleted as well.
if err := r.Client.Delete(ctx, torrent); err != nil {
return err
}
return nil
}

replications, err := r.replications(ctx, torrent)
if err != nil {
return err
Expand Down Expand Up @@ -299,6 +316,7 @@ func (r *TorrentReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

func setTorrentCondition(torrent *api.Torrent, replications []api.Replication) (changed bool) {
// Set to Pending condition.
if torrent.Status.Repo == nil {
condition := metav1.Condition{
Type: api.PendingConditionType,
Expand All @@ -309,6 +327,7 @@ func setTorrentCondition(torrent *api.Torrent, replications []api.Replication) (
return setTorrentConditionTo(torrent, condition)
}

// Set to Reclaiming condition.
if torrentReady(torrent) && torrentDeleting(torrent) {
condition := metav1.Condition{
Type: api.ReclaimingConditionType,
Expand All @@ -323,6 +342,7 @@ func setTorrentCondition(torrent *api.Torrent, replications []api.Replication) (
return false
}

// Set to Ready condition.
if apimeta.IsStatusConditionTrue(torrent.Status.Conditions, api.ReplicateConditionType) && replicationsReady(replications) {
condition := metav1.Condition{
Type: api.ReadyConditionType,
Expand All @@ -333,6 +353,7 @@ func setTorrentCondition(torrent *api.Torrent, replications []api.Replication) (
return setTorrentConditionTo(torrent, condition)
}

// Set to Replicating condition.
if torrentDownloading(replications) {
condition := metav1.Condition{
Type: api.ReplicateConditionType,
Expand Down
2 changes: 1 addition & 1 deletion pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func buildSyncReplication(torrent *api.Torrent, chunk framework.ChunkInfo, sourc
func buildDeletionReplication(torrent *api.Torrent, chunk framework.ChunkInfo, nodeName string) *api.Replication {
repoName := hubRepoName(torrent.Spec.Hub)
generatedName := util.GenerateName(nodeName)
name := chunk.Name + "--" + generatedName
name := chunk.Name + "--" + generatedName + "--" + "d"

return &api.Replication{
TypeMeta: v1.TypeMeta{
Expand Down
15 changes: 14 additions & 1 deletion pkg/webhook/torrent_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package webhook

import (
"context"
"time"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
Expand Down Expand Up @@ -60,7 +61,15 @@ func (w *TorrentWebhook) ValidateCreate(ctx context.Context, obj runtime.Object)

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (w *TorrentWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) {
allErrs := w.generateValidate(newObj)
old := oldObj.(*api.Torrent)
new := newObj.(*api.Torrent)

var allErrs field.ErrorList
specPath := field.NewPath("spec")
if *old.Spec.Preheat && !*new.Spec.Preheat {
allErrs = append(allErrs, field.Forbidden(specPath.Child("preheat"), "preheat can only be transitioned from false to true"))
}
allErrs = append(allErrs, w.generateValidate(newObj)...)
return nil, allErrs.ToAggregate()
}

Expand All @@ -78,5 +87,9 @@ func (w *TorrentWebhook) generateValidate(obj runtime.Object) field.ErrorList {
allErrs = append(allErrs, field.Forbidden(specPath.Child("hub"), "hub can't be null"))
}

if !(torrent.Spec.TTLSecondsAfterReady == nil || *torrent.Spec.TTLSecondsAfterReady == time.Duration(0)) {
allErrs = append(allErrs, field.Forbidden(specPath.Child("ttlSecondsAfterReady"), "only support nil and 0 right now"))
}

return allErrs
}
22 changes: 14 additions & 8 deletions test/e2e/torrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var _ = ginkgo.Describe("torrent e2e test", func() {
defer func() {
gomega.Expect(k8sClient.Delete(ctx, torrent)).To(gomega.Succeed())
validation.ValidateTorrentNotExist(ctx, k8sClient, torrent)
validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, 0, "kind-worker", "kind-worker2", "kind-worker3")
}()

validation.ValidateTorrentStatusEqualTo(ctx, k8sClient, torrent, api.ReadyConditionType, "Ready", metav1.ConditionTrue, &validation.ValidateOptions{Timeout: 5 * time.Minute})
Expand All @@ -63,14 +64,14 @@ var _ = ginkgo.Describe("torrent e2e test", func() {
defer func() {
gomega.Expect(k8sClient.Delete(ctx, torrent)).To(gomega.Succeed())
validation.ValidateTorrentNotExist(ctx, k8sClient, torrent)
validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker2", 0)
validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, 0, "kind-worker", "kind-worker2", "kind-worker3")
}()

validation.ValidateTorrentStatusEqualTo(ctx, k8sClient, torrent, api.ReadyConditionType, "Ready", metav1.ConditionTrue, &validation.ValidateOptions{Timeout: 5 * time.Minute})
validation.ValidateAllReplicationsNodeNameEqualTo(ctx, k8sClient, torrent, "kind-worker2")
validation.ValidateReplicationsNumberEqualTo(ctx, k8sClient, torrent, 0)
// From https://huggingface.co/facebook/opt-125m/tree/main, opt-125m has 12 files.
validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker2", 12)
validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, 12, "kind-worker2")
})

ginkgo.It("Sync the models successfully", func() {
Expand All @@ -79,9 +80,7 @@ var _ = ginkgo.Describe("torrent e2e test", func() {
defer func() {
gomega.Expect(k8sClient.Delete(ctx, torrent)).To(gomega.Succeed())
validation.ValidateTorrentNotExist(ctx, k8sClient, torrent)
validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker", 0)
validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker2", 0)
validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker3", 0)
validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, 0, "kind-worker", "kind-worker2", "kind-worker3")
}()

validation.ValidateTorrentStatusEqualTo(ctx, k8sClient, torrent, api.ReadyConditionType, "Ready", metav1.ConditionTrue, &validation.ValidateOptions{Timeout: 5 * time.Minute})
Expand All @@ -97,8 +96,15 @@ var _ = ginkgo.Describe("torrent e2e test", func() {

// We have three nodes.
// From https://huggingface.co/facebook/opt-125m/tree/main, opt-125m has 12 files.
validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker", 12)
validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker2", 12)
validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, "kind-worker3", 12)
validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, 12, "kind-worker", "kind-worker2", "kind-worker3")
})

ginkgo.It("Torrent will be auto GCed with TTLSecondsAfterReady", func() {
torrent := wrapper.MakeTorrent("facebook-opt-125m").TTL(0).Hub("Huggingface", "facebook/opt-125m", "").ReclaimPolicy(api.DeleteReclaimPolicy).Obj()
gomega.Expect(k8sClient.Create(ctx, torrent)).To(gomega.Succeed())

validation.ValidateTorrentNotExist(ctx, k8sClient, torrent)
validation.ValidateReplicationsNumberEqualTo(ctx, k8sClient, torrent, 0)
validation.ValidateNodeTrackerChunkNumberEqualTo(ctx, k8sClient, 0, "kind-worker", "kind-worker2", "kind-worker3")
})
})
Loading
Loading