diff --git a/api/v1alpha1/alluxioruntime_types.go b/api/v1alpha1/alluxioruntime_types.go index 4ee782859ba..fd2a9b933e4 100644 --- a/api/v1alpha1/alluxioruntime_types.go +++ b/api/v1alpha1/alluxioruntime_types.go @@ -148,6 +148,13 @@ type AlluxioFuseSpec struct { // +kubebuilder:validation:Enum=HostNetwork;"";ContainerNetwork // +optional NetworkMode NetworkMode `json:"networkMode,omitempty"` + + // LaunchMode specifies the launch mode of fuse pod, Lazy/Eager, default to Lazy + // +kubebuilder:validation:Enum="";Lazy;Eager + // +kubebuilder:default:=Lazy + // +optional + LaunchMode FuseLaunchMode `json:"launchMode,omitempty"` + // VolumeMounts specifies the volumes listed in ".spec.volumes" to mount into the alluxio runtime component's filesystem. // +optional VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` diff --git a/api/v1alpha1/constant.go b/api/v1alpha1/constant.go index 7cd5c64dd6f..7fb9a726613 100644 --- a/api/v1alpha1/constant.go +++ b/api/v1alpha1/constant.go @@ -53,3 +53,13 @@ const ( // OnRuntimeDeletedCleanPolicy cleans fuse pod only when the cache runtime is deleted OnRuntimeDeletedCleanPolicy FuseCleanPolicy = "OnRuntimeDeleted" ) + +type FuseLaunchMode string + +const ( + // LazyMode is the default launch mode, which will launch fuse pods by the application pods + LazyMode FuseLaunchMode = "Lazy" + + // EagerMode is the optional launch mode, which will launch fuse pods by deploy fuse statefulset actively + EagerMode FuseLaunchMode = "Eager" +) diff --git a/api/v1alpha1/efcruntime_types.go b/api/v1alpha1/efcruntime_types.go index 02fa4b3de5b..a86168f9af4 100644 --- a/api/v1alpha1/efcruntime_types.go +++ b/api/v1alpha1/efcruntime_types.go @@ -124,6 +124,12 @@ type EFCFuseSpec struct { // +optional NetworkMode NetworkMode `json:"networkMode,omitempty"` + // LaunchMode specifies the launch mode of fuse pod, Lazy/Eager, default to Lazy + // +kubebuilder:validation:Enum="";Lazy;Eager + // +kubebuilder:default:=Lazy + // +optional + LaunchMode FuseLaunchMode `json:"launchMode,omitempty"` + // PodMetadata defines labels and annotations that will be propagated to EFC's fuse pods // +optional PodMetadata PodMetadata `json:"podMetadata,omitempty"` diff --git a/api/v1alpha1/goosefsruntime_types.go b/api/v1alpha1/goosefsruntime_types.go index ff65e124018..806d863adff 100644 --- a/api/v1alpha1/goosefsruntime_types.go +++ b/api/v1alpha1/goosefsruntime_types.go @@ -113,6 +113,12 @@ type GooseFSFuseSpec struct { // +optional CleanPolicy FuseCleanPolicy `json:"cleanPolicy,omitempty"` + // LaunchMode specifies the launch mode of fuse pod, Lazy/Eager, default to Lazy + // +kubebuilder:validation:Enum="";Lazy;Eager + // +kubebuilder:default:=Lazy + // +optional + LaunchMode FuseLaunchMode `json:"launchMode,omitempty"` + // Annotations is an unstructured key value map stored with a resource that may be // set by external tools to store and retrieve arbitrary metadata. They are not // queryable and should be preserved when modifying objects. diff --git a/api/v1alpha1/jindoruntime_types.go b/api/v1alpha1/jindoruntime_types.go index 8d5a305bd14..71f9e419d65 100644 --- a/api/v1alpha1/jindoruntime_types.go +++ b/api/v1alpha1/jindoruntime_types.go @@ -141,6 +141,12 @@ type JindoFuseSpec struct { // +optional CleanPolicy FuseCleanPolicy `json:"cleanPolicy,omitempty"` + // LaunchMode specifies the launch mode of fuse pod, Lazy/Eager, default to Lazy + // +kubebuilder:validation:Enum="";Lazy;Eager + // +kubebuilder:default:=Lazy + // +optional + LaunchMode FuseLaunchMode `json:"launchMode,omitempty"` + // If disable JindoFS fuse // +optional Disabled bool `json:"disabled,omitempty"` diff --git a/api/v1alpha1/juicefsruntime_types.go b/api/v1alpha1/juicefsruntime_types.go index 917789369b3..49faed75690 100644 --- a/api/v1alpha1/juicefsruntime_types.go +++ b/api/v1alpha1/juicefsruntime_types.go @@ -172,6 +172,12 @@ type JuiceFSFuseSpec struct { // +kubebuilder:validation:Enum=HostNetwork;"";ContainerNetwork // +optional NetworkMode NetworkMode `json:"networkMode,omitempty"` + + // LaunchMode specifies the launch mode of fuse pod, Lazy/Eager, default to Lazy + // +kubebuilder:validation:Enum="";Lazy;Eager + // +kubebuilder:default:=Lazy + // +optional + LaunchMode FuseLaunchMode `json:"launchMode,omitempty"` } //+kubebuilder:object:root=true diff --git a/api/v1alpha1/openapi_generated.go b/api/v1alpha1/openapi_generated.go index 394a2a3e0cb..a4a3e0ac52d 100644 --- a/api/v1alpha1/openapi_generated.go +++ b/api/v1alpha1/openapi_generated.go @@ -503,6 +503,13 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_AlluxioFuseSpec(ref common.Refe Format: "", }, }, + "launchMode": { + SchemaProps: spec.SchemaProps{ + Description: "LaunchMode specifies the launch mode of fuse pod, Lazy/Eager, default to Lazy", + Type: []string{"string"}, + Format: "", + }, + }, "volumeMounts": { SchemaProps: spec.SchemaProps{ Description: "VolumeMounts specifies the volumes listed in \".spec.volumes\" to mount into the alluxio runtime component's filesystem.", @@ -2468,6 +2475,13 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_EFCFuseSpec(ref common.Referenc Format: "", }, }, + "launchMode": { + SchemaProps: spec.SchemaProps{ + Description: "LaunchMode specifies the launch mode of fuse pod, Lazy/Eager, default to Lazy", + Type: []string{"string"}, + Format: "", + }, + }, "podMetadata": { SchemaProps: spec.SchemaProps{ Description: "PodMetadata defines labels and annotations that will be propagated to EFC's fuse pods", @@ -3049,6 +3063,13 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_GooseFSFuseSpec(ref common.Refe Format: "", }, }, + "launchMode": { + SchemaProps: spec.SchemaProps{ + Description: "LaunchMode specifies the launch mode of fuse pod, Lazy/Eager, default to Lazy", + Type: []string{"string"}, + Format: "", + }, + }, "annotations": { SchemaProps: spec.SchemaProps{ Description: "Annotations is an unstructured key value map stored with a resource that may be set by external tools to store and retrieve arbitrary metadata. They are not queryable and should be preserved when modifying objects. More info: http://kubernetes.io/docs/user-guide/annotations", @@ -3747,6 +3768,13 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_JindoFuseSpec(ref common.Refere Format: "", }, }, + "launchMode": { + SchemaProps: spec.SchemaProps{ + Description: "LaunchMode specifies the launch mode of fuse pod, Lazy/Eager, default to Lazy", + Type: []string{"string"}, + Format: "", + }, + }, "disabled": { SchemaProps: spec.SchemaProps{ Description: "If disable JindoFS fuse", @@ -4317,6 +4345,13 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_JuiceFSFuseSpec(ref common.Refe Format: "", }, }, + "launchMode": { + SchemaProps: spec.SchemaProps{ + Description: "LaunchMode specifies the launch mode of fuse pod, Lazy/Eager, default to Lazy", + Type: []string{"string"}, + Format: "", + }, + }, }, }, }, @@ -6200,6 +6235,13 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_ThinFuseSpec(ref common.Referen Format: "", }, }, + "launchMode": { + SchemaProps: spec.SchemaProps{ + Description: "LaunchMode specifies the launch mode of fuse pod, Lazy/Eager, default to Lazy", + Type: []string{"string"}, + Format: "", + }, + }, "livenessProbe": { SchemaProps: spec.SchemaProps{ Description: "livenessProbe of thin fuse pod", @@ -6780,6 +6822,13 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_VineyardClientSocketSpec(ref co Format: "", }, }, + "launchMode": { + SchemaProps: spec.SchemaProps{ + Description: "LaunchMode specifies the launch mode of fuse pod, Lazy/Eager, default to Lazy", + Type: []string{"string"}, + Format: "", + }, + }, "podMetadata": { SchemaProps: spec.SchemaProps{ Description: "PodMetadata defines labels and annotations that will be propagated to Vineyard's pods.", diff --git a/api/v1alpha1/thinruntime_types.go b/api/v1alpha1/thinruntime_types.go index 8b68ed18eb0..63ed63a6d3e 100644 --- a/api/v1alpha1/thinruntime_types.go +++ b/api/v1alpha1/thinruntime_types.go @@ -180,6 +180,12 @@ type ThinFuseSpec struct { // +optional NetworkMode NetworkMode `json:"networkMode,omitempty"` + // LaunchMode specifies the launch mode of fuse pod, Lazy/Eager, default to Lazy + // +kubebuilder:validation:Enum="";Lazy;Eager + // +kubebuilder:default:=Lazy + // +optional + LaunchMode FuseLaunchMode `json:"launchMode,omitempty"` + // livenessProbe of thin fuse pod // +optional LivenessProbe *corev1.Probe `json:"livenessProbe,omitempty"` diff --git a/api/v1alpha1/vineyardruntime_types.go b/api/v1alpha1/vineyardruntime_types.go index 26c17cc5f80..ad33e284807 100644 --- a/api/v1alpha1/vineyardruntime_types.go +++ b/api/v1alpha1/vineyardruntime_types.go @@ -193,6 +193,12 @@ type VineyardClientSocketSpec struct { // +optional NetworkMode NetworkMode `json:"networkMode,omitempty"` + // LaunchMode specifies the launch mode of fuse pod, Lazy/Eager, default to Lazy + // +kubebuilder:validation:Enum="";Lazy;Eager + // +kubebuilder:default:=Lazy + // +optional + LaunchMode FuseLaunchMode `json:"launchMode,omitempty"` + // PodMetadata defines labels and annotations that will be propagated to Vineyard's pods. // +optional PodMetadata PodMetadata `json:"podMetadata,omitempty"` diff --git a/charts/fluid/fluid/crds/data.fluid.io_alluxioruntimes.yaml b/charts/fluid/fluid/crds/data.fluid.io_alluxioruntimes.yaml index f5fcd1175f6..c8702e37d83 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_alluxioruntimes.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_alluxioruntimes.yaml @@ -356,6 +356,15 @@ spec: items: type: string type: array + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string networkMode: description: Whether to use hostnetwork or not enum: diff --git a/charts/fluid/fluid/crds/data.fluid.io_efcruntimes.yaml b/charts/fluid/fluid/crds/data.fluid.io_efcruntimes.yaml index 524c737a8cc..eacd7ceff06 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_efcruntimes.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_efcruntimes.yaml @@ -115,6 +115,15 @@ spec: OnRuntimeDeleted cleans fuse pod only when the cache runtime is deleted Defaults to OnRuntimeDeleted type: string + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string networkMode: description: Whether to use hostnetwork or not enum: diff --git a/charts/fluid/fluid/crds/data.fluid.io_goosefsruntimes.yaml b/charts/fluid/fluid/crds/data.fluid.io_goosefsruntimes.yaml index 715b6138754..9746d6c84cd 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_goosefsruntimes.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_goosefsruntimes.yaml @@ -285,6 +285,15 @@ spec: items: type: string type: array + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string nodeSelector: additionalProperties: type: string diff --git a/charts/fluid/fluid/crds/data.fluid.io_jindoruntimes.yaml b/charts/fluid/fluid/crds/data.fluid.io_jindoruntimes.yaml index 5791649ec62..38f9dab9cb3 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_jindoruntimes.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_jindoruntimes.yaml @@ -163,6 +163,15 @@ spec: DEPRECATED: this is a deprecated field. Please use PodMetadata.Labels instead. Note: this field is set to be exclusive with PodMetadata.Labels type: object + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string logConfig: additionalProperties: type: string diff --git a/charts/fluid/fluid/crds/data.fluid.io_juicefsruntimes.yaml b/charts/fluid/fluid/crds/data.fluid.io_juicefsruntimes.yaml index 88c1d001914..453c6f584e0 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_juicefsruntimes.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_juicefsruntimes.yaml @@ -239,6 +239,15 @@ spec: imageTag: description: Image for JuiceFS fuse type: string + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string networkMode: description: Whether to use hostnetwork or not enum: diff --git a/charts/fluid/fluid/crds/data.fluid.io_thinruntimeprofiles.yaml b/charts/fluid/fluid/crds/data.fluid.io_thinruntimeprofiles.yaml index ddbdfe82f74..1936bd65874 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_thinruntimeprofiles.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_thinruntimeprofiles.yaml @@ -205,6 +205,15 @@ spec: imageTag: description: Image for thinRuntime fuse type: string + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string livenessProbe: description: livenessProbe of thin fuse pod properties: diff --git a/charts/fluid/fluid/crds/data.fluid.io_thinruntimes.yaml b/charts/fluid/fluid/crds/data.fluid.io_thinruntimes.yaml index 90d234d6261..b4aac967707 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_thinruntimes.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_thinruntimes.yaml @@ -206,6 +206,15 @@ spec: imageTag: description: Image for thinRuntime fuse type: string + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string livenessProbe: description: livenessProbe of thin fuse pod properties: diff --git a/charts/fluid/fluid/crds/data.fluid.io_vineyardruntimes.yaml b/charts/fluid/fluid/crds/data.fluid.io_vineyardruntimes.yaml index 4cc9bcf62a6..fe4b66928a7 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_vineyardruntimes.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_vineyardruntimes.yaml @@ -128,6 +128,15 @@ spec: Image Tag for Vineyard Fuse Default is `v0.22.2` type: string + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string networkMode: description: |- Whether to use hostnetwork or not diff --git a/config/crd/bases/data.fluid.io_alluxioruntimes.yaml b/config/crd/bases/data.fluid.io_alluxioruntimes.yaml index f5fcd1175f6..c8702e37d83 100644 --- a/config/crd/bases/data.fluid.io_alluxioruntimes.yaml +++ b/config/crd/bases/data.fluid.io_alluxioruntimes.yaml @@ -356,6 +356,15 @@ spec: items: type: string type: array + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string networkMode: description: Whether to use hostnetwork or not enum: diff --git a/config/crd/bases/data.fluid.io_efcruntimes.yaml b/config/crd/bases/data.fluid.io_efcruntimes.yaml index 524c737a8cc..eacd7ceff06 100644 --- a/config/crd/bases/data.fluid.io_efcruntimes.yaml +++ b/config/crd/bases/data.fluid.io_efcruntimes.yaml @@ -115,6 +115,15 @@ spec: OnRuntimeDeleted cleans fuse pod only when the cache runtime is deleted Defaults to OnRuntimeDeleted type: string + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string networkMode: description: Whether to use hostnetwork or not enum: diff --git a/config/crd/bases/data.fluid.io_goosefsruntimes.yaml b/config/crd/bases/data.fluid.io_goosefsruntimes.yaml index 715b6138754..9746d6c84cd 100644 --- a/config/crd/bases/data.fluid.io_goosefsruntimes.yaml +++ b/config/crd/bases/data.fluid.io_goosefsruntimes.yaml @@ -285,6 +285,15 @@ spec: items: type: string type: array + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string nodeSelector: additionalProperties: type: string diff --git a/config/crd/bases/data.fluid.io_jindoruntimes.yaml b/config/crd/bases/data.fluid.io_jindoruntimes.yaml index 5791649ec62..38f9dab9cb3 100644 --- a/config/crd/bases/data.fluid.io_jindoruntimes.yaml +++ b/config/crd/bases/data.fluid.io_jindoruntimes.yaml @@ -163,6 +163,15 @@ spec: DEPRECATED: this is a deprecated field. Please use PodMetadata.Labels instead. Note: this field is set to be exclusive with PodMetadata.Labels type: object + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string logConfig: additionalProperties: type: string diff --git a/config/crd/bases/data.fluid.io_juicefsruntimes.yaml b/config/crd/bases/data.fluid.io_juicefsruntimes.yaml index 88c1d001914..453c6f584e0 100644 --- a/config/crd/bases/data.fluid.io_juicefsruntimes.yaml +++ b/config/crd/bases/data.fluid.io_juicefsruntimes.yaml @@ -239,6 +239,15 @@ spec: imageTag: description: Image for JuiceFS fuse type: string + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string networkMode: description: Whether to use hostnetwork or not enum: diff --git a/config/crd/bases/data.fluid.io_thinruntimeprofiles.yaml b/config/crd/bases/data.fluid.io_thinruntimeprofiles.yaml index ddbdfe82f74..1936bd65874 100644 --- a/config/crd/bases/data.fluid.io_thinruntimeprofiles.yaml +++ b/config/crd/bases/data.fluid.io_thinruntimeprofiles.yaml @@ -205,6 +205,15 @@ spec: imageTag: description: Image for thinRuntime fuse type: string + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string livenessProbe: description: livenessProbe of thin fuse pod properties: diff --git a/config/crd/bases/data.fluid.io_thinruntimes.yaml b/config/crd/bases/data.fluid.io_thinruntimes.yaml index 90d234d6261..b4aac967707 100644 --- a/config/crd/bases/data.fluid.io_thinruntimes.yaml +++ b/config/crd/bases/data.fluid.io_thinruntimes.yaml @@ -206,6 +206,15 @@ spec: imageTag: description: Image for thinRuntime fuse type: string + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string livenessProbe: description: livenessProbe of thin fuse pod properties: diff --git a/config/crd/bases/data.fluid.io_vineyardruntimes.yaml b/config/crd/bases/data.fluid.io_vineyardruntimes.yaml index 4cc9bcf62a6..fe4b66928a7 100644 --- a/config/crd/bases/data.fluid.io_vineyardruntimes.yaml +++ b/config/crd/bases/data.fluid.io_vineyardruntimes.yaml @@ -128,6 +128,15 @@ spec: Image Tag for Vineyard Fuse Default is `v0.22.2` type: string + launchMode: + default: Lazy + description: LaunchMode specifies the launch mode of fuse pod, + Lazy/Eager, default to Lazy + enum: + - "" + - Lazy + - Eager + type: string networkMode: description: |- Whether to use hostnetwork or not diff --git a/pkg/csi/plugins/nodeserver.go b/pkg/csi/plugins/nodeserver.go index ee86b59777a..e27bbc0cd78 100644 --- a/pkg/csi/plugins/nodeserver.go +++ b/pkg/csi/plugins/nodeserver.go @@ -343,6 +343,12 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag } // 4. remove label on node + // if fuse launch mode is eager, there is no need to remove label on node + if runtimeInfo.GetFuseLaunchMode() == v1alpha1.EagerMode { + glog.Infof("NodeUnstageVolume: NodeUnstage succeeded with VolumeId: %s without removing NodeLabel", volumeId) + return &csi.NodeUnstageVolumeResponse{}, nil + } + // Once the label is removed, fuse pod on corresponding node will be terminated // since node selector in the fuse daemonSet no longer matches. fuseLabelKey := utils.GetFuseLabelName(namespace, name, runtimeInfo.GetOwnerDatasetUID()) @@ -405,6 +411,13 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol if err != nil { return nil, errors.Wrapf(err, "NodeStageVolume: failed to get runtime info for %s/%s", namespace, name) } + + // if fuse launch mode is eager, there is no need to add label on node + if runtimeInfo.GetFuseLaunchMode() == v1alpha1.EagerMode { + glog.Infof("NodeStageVolume: NodeStage succeeded with VolumeId: %s without adding NodeLabel", volumeId) + return &csi.NodeStageVolumeResponse{}, nil + } + fuseLabelKey := utils.GetFuseLabelName(namespace, name, runtimeInfo.GetOwnerDatasetUID()) var labelsToModify common.LabelsToModify labelsToModify.Add(fuseLabelKey, "true") diff --git a/pkg/ctrl/affinity.go b/pkg/ctrl/affinity.go index 0d5d72d7452..824ad899ab9 100644 --- a/pkg/ctrl/affinity.go +++ b/pkg/ctrl/affinity.go @@ -157,6 +157,26 @@ func (e *Helper) BuildWorkersAffinity(workers *appsv1.StatefulSet) (workersToUpd }, }) + // if fuse launch mode is eager, set worker node affinity by fuse node selector + if e.runtimeInfo.GetFuseLaunchMode() == datav1alpha1.EagerMode { + var requirements []corev1.NodeSelectorRequirement + for key, val := range e.runtimeInfo.GetFuseNodeSelector() { + requirements = append(requirements, corev1.NodeSelectorRequirement{ + Key: key, + Operator: corev1.NodeSelectorOpIn, + Values: []string{val}, + }) + } + workersToUpdate.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = + append(workersToUpdate.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution, + corev1.PreferredSchedulingTerm{ + Weight: 100, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: requirements, + }, + }) + } + // 3. set node affinity if possible if dataset.Spec.NodeAffinity != nil { if dataset.Spec.NodeAffinity.Required != nil { diff --git a/pkg/ctrl/affinity_test.go b/pkg/ctrl/affinity_test.go index 6944e1b2f09..ba592cdffbf 100644 --- a/pkg/ctrl/affinity_test.go +++ b/pkg/ctrl/affinity_test.go @@ -94,6 +94,18 @@ func TestBuildWorkersAffinity(t *testing.T) { }, }, }, + { + Weight: 100, + Preference: v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "nodeA", + Operator: v1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, }, }, }, @@ -166,6 +178,18 @@ func TestBuildWorkersAffinity(t *testing.T) { }, }, }, + { + Weight: 100, + Preference: v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "nodeA", + Operator: v1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, }, }, }, @@ -248,6 +272,83 @@ func TestBuildWorkersAffinity(t *testing.T) { }, }, }, + { + Weight: 100, + Preference: v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "nodeA", + Operator: v1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, { + name: "fuse-eager-mode-with-fuse-nodeSelector", + fields: fields{ + dataset: &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test4", + Namespace: "big-data", + }, + Spec: datav1alpha1.DatasetSpec{}, + }, + worker: &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test4-thin-worker", + Namespace: "big-data", + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: ptr.To[int32](1), + }, + }, + want: &v1.Affinity{ + PodAntiAffinity: &v1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "fluid.io/dataset", + Operator: metav1.LabelSelectorOpExists, + }, + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + NodeAffinity: &v1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "fluid.io/f-big-data-test4", + Operator: v1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + { + Weight: 100, + Preference: v1.NodeSelectorTerm{ + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "nodeA", + Operator: v1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, }, }, }, @@ -265,10 +366,12 @@ func TestBuildWorkersAffinity(t *testing.T) { runtimeObjs = append(runtimeObjs, tt.fields.dataset) runtimeObjs = append(runtimeObjs, tt.fields.worker) mockClient := fake.NewFakeClientWithScheme(s, runtimeObjs...) - runtimeInfo, err := base.BuildRuntimeInfo(tt.fields.dataset.Name, tt.fields.dataset.Namespace, common.JindoRuntime) + runtimeInfo, err := base.BuildRuntimeInfo(tt.fields.dataset.Name, tt.fields.dataset.Namespace, common.ThinRuntime) if err != nil { t.Errorf("testcase %s failed due to %v", tt.name, err) } + runtimeInfo.SetFuseLaunchMode(datav1alpha1.EagerMode) + runtimeInfo.SetFuseNodeSelector(map[string]string{"nodeA": "true"}) h := BuildHelper(runtimeInfo, mockClient, fake.NullLogger()) want := tt.fields.want diff --git a/pkg/ctrl/fuse.go b/pkg/ctrl/fuse.go index 6cb310de8d3..14829f31376 100644 --- a/pkg/ctrl/fuse.go +++ b/pkg/ctrl/fuse.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "reflect" + "strings" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" @@ -120,6 +121,11 @@ func (e *Helper) CheckFuseHealthy(recorder record.EventRecorder, runtime base.Ru // CleanUpFuse will cleanup node label for Fuse. func (e *Helper) CleanUpFuse() (count int, err error) { + if e.runtimeInfo.GetFuseLaunchMode() == datav1alpha1.EagerMode { + e.log.Info("No need to clean up fuse node label in eager mode") + return count, nil + } + var ( nodeList = &corev1.NodeList{} fuseLabelKey = utils.GetFuseLabelName(e.runtimeInfo.GetNamespace(), e.runtimeInfo.GetName(), e.runtimeInfo.GetOwnerDatasetUID()) @@ -176,6 +182,20 @@ func (e *Helper) GetFuseNodes() (nodes []corev1.Node, err error) { return } + // replace the fuseLabelSelector by customized fuse nodeSelectors in Eager mode + if e.runtimeInfo.GetFuseLaunchMode() == datav1alpha1.EagerMode { + fuseNodeSelectors := e.runtimeInfo.GetFuseNodeSelector() + var fuseLabelSelectorStrs []string + for k, v := range fuseNodeSelectors { + fuseLabelSelectorStrs = append(fuseLabelSelectorStrs, fmt.Sprintf("%s=%s", k, v)) + } + + fuseLabelSelector, err = labels.Parse(strings.Join(fuseLabelSelectorStrs, ",")) + if err != nil { + return + } + } + err = e.client.List(context.TODO(), nodeList, &client.ListOptions{ LabelSelector: fuseLabelSelector, }) diff --git a/pkg/ctrl/fuse_test.go b/pkg/ctrl/fuse_test.go index 7f784862b05..b8f9178de93 100644 --- a/pkg/ctrl/fuse_test.go +++ b/pkg/ctrl/fuse_test.go @@ -303,6 +303,7 @@ func TestCleanUpFuse(t *testing.T) { log logr.Logger runtimeType string nodeInputs []*corev1.Node + fuseLaunchMode datav1alpha1.FuseLaunchMode }{ { wantedCount: 1, @@ -324,8 +325,9 @@ func TestCleanUpFuse(t *testing.T) { "node-select": "true", }, }, - log: fake.NullLogger(), - runtimeType: "jindo", + log: fake.NullLogger(), + runtimeType: "jindo", + fuseLaunchMode: datav1alpha1.LazyMode, nodeInputs: []*corev1.Node{ { ObjectMeta: metav1.ObjectMeta{ @@ -378,8 +380,9 @@ func TestCleanUpFuse(t *testing.T) { "node-select": "true", }, }, - log: fake.NullLogger(), - runtimeType: "alluxio", + log: fake.NullLogger(), + runtimeType: "alluxio", + fuseLaunchMode: datav1alpha1.LazyMode, nodeInputs: []*corev1.Node{ { ObjectMeta: metav1.ObjectMeta{ @@ -434,8 +437,9 @@ func TestCleanUpFuse(t *testing.T) { "node-select": "true", }, }, - log: fake.NullLogger(), - runtimeType: "goosefs", + log: fake.NullLogger(), + runtimeType: "goosefs", + fuseLaunchMode: datav1alpha1.LazyMode, nodeInputs: []*corev1.Node{ { ObjectMeta: metav1.ObjectMeta{ @@ -469,6 +473,63 @@ func TestCleanUpFuse(t *testing.T) { }, }, }, + { + wantedCount: 0, + name: "hadoop", + namespace: "fluid", + wantedNodeLabels: map[string]map[string]string{ + "no-fuse": {}, + "multiple-fuse": { + "fluid.io/f-fluid-spark": "true", + "node-select": "true", + "fluid.io/f-fluid-hadoop": "true", + "fluid.io/s-fluid-hadoop": "true", + "fluid.io/s-h-juicefs-d-fluid-hadoop": "5B", + "fluid.io/s-h-juicefs-m-fluid-hadoop": "1B", + "fluid.io/s-h-juicefs-t-fluid-hadoop": "6B", + }, + "fuse": { + "fluid.io/dataset-num": "1", + "fluid.io/f-fluid-spark": "true", + "node-select": "true", + }, + }, + log: fake.NullLogger(), + runtimeType: "juicefs", + fuseLaunchMode: datav1alpha1.EagerMode, + nodeInputs: []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "no-fuse", + Labels: map[string]string{}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "multiple-fuse", + Labels: map[string]string{ + "fluid.io/f-fluid-spark": "true", + "node-select": "true", + "fluid.io/f-fluid-hadoop": "true", + "fluid.io/s-fluid-hadoop": "true", + "fluid.io/s-h-juicefs-d-fluid-hadoop": "5B", + "fluid.io/s-h-juicefs-m-fluid-hadoop": "1B", + "fluid.io/s-h-juicefs-t-fluid-hadoop": "6B", + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "fuse", + Labels: map[string]string{ + "fluid.io/dataset-num": "1", + "fluid.io/f-fluid-spark": "true", + "node-select": "true", + }, + }, + }, + }, + }, } for _, test := range testCase { @@ -488,6 +549,7 @@ func TestCleanUpFuse(t *testing.T) { if err != nil { t.Errorf("build runtime info error %v", err) } + runtimeInfo.SetFuseLaunchMode(test.fuseLaunchMode) h := &Helper{ runtimeInfo: runtimeInfo, client: fakeClient, diff --git a/pkg/ddc/alluxio/runtime_info.go b/pkg/ddc/alluxio/runtime_info.go index b970a6e608d..a5e77055de1 100644 --- a/pkg/ddc/alluxio/runtime_info.go +++ b/pkg/ddc/alluxio/runtime_info.go @@ -42,6 +42,9 @@ func (e *AlluxioEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) { // Setup Fuse Deploy Mode e.runtimeInfo.SetFuseNodeSelector(runtime.Spec.Fuse.NodeSelector) + // Setup Fuse Launch Mode + e.runtimeInfo.SetFuseLaunchMode(runtime.Spec.Fuse.LaunchMode) + if !e.UnitTest { // Setup with Dataset Info dataset, err := utils.GetDataset(e.Client, e.name, e.namespace) diff --git a/pkg/ddc/alluxio/transform_fuse.go b/pkg/ddc/alluxio/transform_fuse.go index 7d6c5bc3b13..1611db09e0f 100644 --- a/pkg/ddc/alluxio/transform_fuse.go +++ b/pkg/ddc/alluxio/transform_fuse.go @@ -100,7 +100,10 @@ func (e *AlluxioEngine) transformFuse(runtime *datav1alpha1.AlluxioRuntime, data } else { value.Fuse.NodeSelector = map[string]string{} } - value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetOwnerDatasetUID())] = "true" + if runtime.Spec.Fuse.LaunchMode != datav1alpha1.EagerMode { + // The label will be added by CSI Plugin when any workload pod is scheduled on the node. + value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetOwnerDatasetUID())] = "true" + } // parse fuse container network mode value.Fuse.HostNetwork = datav1alpha1.IsHostNetwork(runtime.Spec.Fuse.NetworkMode) diff --git a/pkg/ddc/alluxio/transform_fuse_test.go b/pkg/ddc/alluxio/transform_fuse_test.go index 0c7358818a4..b0be84fd50e 100644 --- a/pkg/ddc/alluxio/transform_fuse_test.go +++ b/pkg/ddc/alluxio/transform_fuse_test.go @@ -24,6 +24,7 @@ import ( datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/ddc/base" + "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" ) @@ -270,3 +271,112 @@ func TestTransformFuseWithNetwork(t *testing.T) { } } + +func TestTransformFuseWithLaunchMode(t *testing.T) { + testCases := map[string]struct { + runtime *datav1alpha1.AlluxioRuntime + wantValue *Alluxio + }{ + "test fuse launch mode case 1": { + runtime: &datav1alpha1.AlluxioRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.AlluxioRuntimeSpec{ + Fuse: datav1alpha1.AlluxioFuseSpec{ + ImageTag: "2.8.0", + Image: "fluid/alluixo-fuse", + ImagePullPolicy: "always", + LaunchMode: datav1alpha1.EagerMode, + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + wantValue: &Alluxio{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + "test fuse launch mode case 2": { + runtime: &datav1alpha1.AlluxioRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.AlluxioRuntimeSpec{ + Fuse: datav1alpha1.AlluxioFuseSpec{ + ImageTag: "2.8.0", + Image: "fluid/alluixo-fuse", + ImagePullPolicy: "always", + LaunchMode: datav1alpha1.LazyMode, + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + wantValue: &Alluxio{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + "fuse_node": "true", + }, + }, + }, + }, + "test fuse launch mode case 3": { + runtime: &datav1alpha1.AlluxioRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.AlluxioRuntimeSpec{ + Fuse: datav1alpha1.AlluxioFuseSpec{ + ImageTag: "2.8.0", + Image: "fluid/alluixo-fuse", + ImagePullPolicy: "always", + LaunchMode: "", + }, + }, + }, + wantValue: &Alluxio{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + }, + }, + }, + }, + } + + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio") + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } + + engine := &AlluxioEngine{ + Log: fake.NullLogger(), + runtimeInfo: runtimeInfo, + Client: fake.NewFakeClientWithScheme(testScheme), + } + ds := &datav1alpha1.Dataset{} + for k, v := range testCases { + gotValue := &Alluxio{} + if err := engine.transformFuse(v.runtime, ds, gotValue); err == nil { + if !reflect.DeepEqual(gotValue.Fuse.NodeSelector, v.wantValue.Fuse.NodeSelector) { + t.Errorf("check %s failure, got:%+v,want:%+v", + k, + gotValue.Fuse.NodeSelector, + v.wantValue.Fuse.NodeSelector, + ) + } + } + + } +} diff --git a/pkg/ddc/base/runtime.go b/pkg/ddc/base/runtime.go index b2ffce867b6..a4c8333086c 100644 --- a/pkg/ddc/base/runtime.go +++ b/pkg/ddc/base/runtime.go @@ -104,6 +104,10 @@ type RuntimeInfoInterface interface { GetAnnotations() map[string]string GetFuseMetricsScrapeTarget() mountModeSelector + + SetFuseLaunchMode(mode datav1alpha1.FuseLaunchMode) + + GetFuseLaunchMode() datav1alpha1.FuseLaunchMode } var _ RuntimeInfoInterface = &RuntimeInfo{} @@ -149,6 +153,9 @@ type Fuse struct { // Metrics MetricsScrapeTarget mountModeSelector + + // LaunchMode decides how to launch fuse pods. + LaunchMode datav1alpha1.FuseLaunchMode } type TieredStoreInfo struct { @@ -333,6 +340,19 @@ func (info *RuntimeInfo) GetFuseCleanPolicy() datav1alpha1.FuseCleanPolicy { return info.fuse.CleanPolicy } +func (info *RuntimeInfo) SetFuseLaunchMode(mode datav1alpha1.FuseLaunchMode) { + if mode == "" { + // Default to set the fuse launch mode to EagerMode + info.fuse.LaunchMode = datav1alpha1.LazyMode + return + } + info.fuse.LaunchMode = mode +} + +func (info *RuntimeInfo) GetFuseLaunchMode() datav1alpha1.FuseLaunchMode { + return info.fuse.LaunchMode +} + // SetDeprecatedNodeLabel set the DeprecatedNodeLabel func (info *RuntimeInfo) SetDeprecatedNodeLabel(deprecated bool) { info.deprecatedNodeLabel = deprecated @@ -443,6 +463,7 @@ func GetRuntimeInfo(client client.Client, name, namespace string) (runtimeInfo R } runtimeInfo.SetFuseNodeSelector(alluxioRuntime.Spec.Fuse.NodeSelector) runtimeInfo.SetupFuseCleanPolicy(alluxioRuntime.Spec.Fuse.CleanPolicy) + runtimeInfo.SetFuseLaunchMode(alluxioRuntime.Spec.Fuse.LaunchMode) case common.JindoRuntime: jindoRuntime, err := utils.GetJindoRuntime(client, name, namespace) if err != nil { @@ -460,6 +481,7 @@ func GetRuntimeInfo(client client.Client, name, namespace string) (runtimeInfo R } runtimeInfo.SetFuseNodeSelector(jindoRuntime.Spec.Fuse.NodeSelector) runtimeInfo.SetupFuseCleanPolicy(jindoRuntime.Spec.Fuse.CleanPolicy) + runtimeInfo.SetFuseLaunchMode(jindoRuntime.Spec.Fuse.LaunchMode) case common.GooseFSRuntime: goosefsRuntime, err := utils.GetGooseFSRuntime(client, name, namespace) if err != nil { @@ -476,6 +498,7 @@ func GetRuntimeInfo(client client.Client, name, namespace string) (runtimeInfo R } runtimeInfo.SetFuseNodeSelector(goosefsRuntime.Spec.Fuse.NodeSelector) runtimeInfo.SetupFuseCleanPolicy(goosefsRuntime.Spec.Fuse.CleanPolicy) + runtimeInfo.SetFuseLaunchMode(goosefsRuntime.Spec.Fuse.LaunchMode) case common.JuiceFSRuntime: juicefsRuntime, err := utils.GetJuiceFSRuntime(client, name, namespace) if err != nil { @@ -492,6 +515,7 @@ func GetRuntimeInfo(client client.Client, name, namespace string) (runtimeInfo R } runtimeInfo.SetFuseNodeSelector(juicefsRuntime.Spec.Fuse.NodeSelector) runtimeInfo.SetupFuseCleanPolicy(juicefsRuntime.Spec.Fuse.CleanPolicy) + runtimeInfo.SetFuseLaunchMode(juicefsRuntime.Spec.Fuse.LaunchMode) case common.ThinRuntime: thinRuntime, err := utils.GetThinRuntime(client, name, namespace) if err != nil { @@ -508,6 +532,7 @@ func GetRuntimeInfo(client client.Client, name, namespace string) (runtimeInfo R } runtimeInfo.SetFuseNodeSelector(thinRuntime.Spec.Fuse.NodeSelector) runtimeInfo.SetupFuseCleanPolicy(thinRuntime.Spec.Fuse.CleanPolicy) + runtimeInfo.SetFuseLaunchMode(thinRuntime.Spec.Fuse.LaunchMode) case common.EFCRuntime: efcRuntime, err := utils.GetEFCRuntime(client, name, namespace) if err != nil { @@ -524,6 +549,7 @@ func GetRuntimeInfo(client client.Client, name, namespace string) (runtimeInfo R } runtimeInfo.SetFuseNodeSelector(efcRuntime.Spec.Fuse.NodeSelector) runtimeInfo.SetupFuseCleanPolicy(efcRuntime.Spec.Fuse.CleanPolicy) + runtimeInfo.SetFuseLaunchMode(efcRuntime.Spec.Fuse.LaunchMode) case common.VineyardRuntime: vineyardRuntime, err := utils.GetVineyardRuntime(client, name, namespace) if err != nil { @@ -540,6 +566,7 @@ func GetRuntimeInfo(client client.Client, name, namespace string) (runtimeInfo R } runtimeInfo.SetFuseNodeSelector(common.VineyardFuseNodeSelector) runtimeInfo.SetupFuseCleanPolicy(vineyardRuntime.Spec.Fuse.CleanPolicy) + runtimeInfo.SetFuseLaunchMode(vineyardRuntime.Spec.Fuse.LaunchMode) default: err = fmt.Errorf("fail to get runtimeInfo for runtime type: %s", runtimeType) return diff --git a/pkg/ddc/base/runtime_test.go b/pkg/ddc/base/runtime_test.go index 469da39f82e..dd0c0fe0f5f 100644 --- a/pkg/ddc/base/runtime_test.go +++ b/pkg/ddc/base/runtime_test.go @@ -214,7 +214,7 @@ func TestBuildRuntimeInfo(t *testing.T) { } } -func TestCleanPolicy(t *testing.T) { +func TestCleanPolicyAndLaunchMode(t *testing.T) { s := runtime.NewScheme() s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.AlluxioRuntime{}) @@ -258,6 +258,7 @@ func TestCleanPolicy(t *testing.T) { Spec: v1alpha1.AlluxioRuntimeSpec{ Fuse: v1alpha1.AlluxioFuseSpec{ CleanPolicy: v1alpha1.OnDemandCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, } @@ -286,6 +287,7 @@ func TestCleanPolicy(t *testing.T) { Spec: v1alpha1.AlluxioRuntimeSpec{ Fuse: v1alpha1.AlluxioFuseSpec{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.EagerMode, }, }, } @@ -346,6 +348,7 @@ func TestCleanPolicy(t *testing.T) { Spec: v1alpha1.JindoRuntimeSpec{ Fuse: v1alpha1.JindoFuseSpec{ CleanPolicy: v1alpha1.OnDemandCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, } @@ -374,6 +377,7 @@ func TestCleanPolicy(t *testing.T) { Spec: v1alpha1.JindoRuntimeSpec{ Fuse: v1alpha1.JindoFuseSpec{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.EagerMode, }, }, } @@ -434,6 +438,7 @@ func TestCleanPolicy(t *testing.T) { Spec: v1alpha1.JuiceFSRuntimeSpec{ Fuse: v1alpha1.JuiceFSFuseSpec{ CleanPolicy: v1alpha1.OnDemandCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, } @@ -462,6 +467,7 @@ func TestCleanPolicy(t *testing.T) { Spec: v1alpha1.JuiceFSRuntimeSpec{ Fuse: v1alpha1.JuiceFSFuseSpec{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.EagerMode, }, }, } @@ -522,6 +528,7 @@ func TestCleanPolicy(t *testing.T) { Spec: v1alpha1.GooseFSRuntimeSpec{ Fuse: v1alpha1.GooseFSFuseSpec{ CleanPolicy: v1alpha1.OnDemandCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, } @@ -550,6 +557,7 @@ func TestCleanPolicy(t *testing.T) { Spec: v1alpha1.GooseFSRuntimeSpec{ Fuse: v1alpha1.GooseFSFuseSpec{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.EagerMode, }, }, } @@ -599,6 +607,7 @@ func TestCleanPolicy(t *testing.T) { runtimeType: common.AlluxioRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, wantErr: false, @@ -616,6 +625,7 @@ func TestCleanPolicy(t *testing.T) { runtimeType: common.AlluxioRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnDemandCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, wantErr: false, @@ -633,6 +643,7 @@ func TestCleanPolicy(t *testing.T) { runtimeType: common.AlluxioRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.EagerMode, }, }, wantErr: false, @@ -650,6 +661,7 @@ func TestCleanPolicy(t *testing.T) { runtimeType: common.JindoRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, wantErr: false, @@ -667,6 +679,7 @@ func TestCleanPolicy(t *testing.T) { runtimeType: common.JindoRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnDemandCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, wantErr: false, @@ -684,6 +697,7 @@ func TestCleanPolicy(t *testing.T) { runtimeType: common.JindoRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.EagerMode, }, }, wantErr: false, @@ -701,6 +715,7 @@ func TestCleanPolicy(t *testing.T) { runtimeType: common.JuiceFSRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, wantErr: false, @@ -718,6 +733,7 @@ func TestCleanPolicy(t *testing.T) { runtimeType: common.JuiceFSRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnDemandCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, wantErr: false, @@ -735,6 +751,7 @@ func TestCleanPolicy(t *testing.T) { runtimeType: common.JuiceFSRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.EagerMode, }, }, wantErr: false, @@ -752,6 +769,7 @@ func TestCleanPolicy(t *testing.T) { runtimeType: common.GooseFSRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, wantErr: false, @@ -769,6 +787,7 @@ func TestCleanPolicy(t *testing.T) { runtimeType: common.GooseFSRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnDemandCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, wantErr: false, @@ -786,6 +805,7 @@ func TestCleanPolicy(t *testing.T) { runtimeType: common.GooseFSRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.EagerMode, }, }, wantErr: false, @@ -803,6 +823,9 @@ func TestCleanPolicy(t *testing.T) { if !tt.wantErr && !reflect.DeepEqual(got.GetFuseCleanPolicy(), tt.want.GetFuseCleanPolicy()) { t.Errorf("GetRuntimeInfo() = %#v, want %#v", got, tt.want) } + if !tt.wantErr && !reflect.DeepEqual(got.GetFuseLaunchMode(), tt.want.GetFuseLaunchMode()) { + t.Errorf("GetFuseLaunchMode() = %#v, want %#v", got.GetFuseLaunchMode(), tt.want.GetFuseLaunchMode()) + } }) } } @@ -964,6 +987,7 @@ func TestGetRuntimeInfo(t *testing.T) { runtimeType: common.AlluxioRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, wantErr: false, @@ -981,6 +1005,7 @@ func TestGetRuntimeInfo(t *testing.T) { runtimeType: common.GooseFSRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, wantErr: false, @@ -998,6 +1023,7 @@ func TestGetRuntimeInfo(t *testing.T) { runtimeType: common.GooseFSRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnDemandCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, wantErr: true, @@ -1015,6 +1041,7 @@ func TestGetRuntimeInfo(t *testing.T) { runtimeType: common.JindoRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.LazyMode, MetricsScrapeTarget: mountModeSelector{}, }, }, @@ -1033,6 +1060,7 @@ func TestGetRuntimeInfo(t *testing.T) { runtimeType: common.JuiceFSRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, wantErr: false, @@ -1050,6 +1078,7 @@ func TestGetRuntimeInfo(t *testing.T) { runtimeType: common.JuiceFSRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnDemandCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, wantErr: true, @@ -1067,6 +1096,7 @@ func TestGetRuntimeInfo(t *testing.T) { runtimeType: common.EFCRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnRuntimeDeletedCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, wantErr: false, @@ -1084,6 +1114,7 @@ func TestGetRuntimeInfo(t *testing.T) { runtimeType: common.EFCRuntime, fuse: Fuse{ CleanPolicy: v1alpha1.OnDemandCleanPolicy, + LaunchMode: v1alpha1.LazyMode, }, }, wantErr: true, diff --git a/pkg/ddc/efc/runtime_info.go b/pkg/ddc/efc/runtime_info.go index 2b94c93d9c3..5870002dc04 100644 --- a/pkg/ddc/efc/runtime_info.go +++ b/pkg/ddc/efc/runtime_info.go @@ -43,6 +43,9 @@ func (e *EFCEngine) getRuntimeInfo() (info base.RuntimeInfoInterface, err error) // Setup Fuse Deploy Mode e.runtimeInfo.SetFuseNodeSelector(runtime.Spec.Fuse.NodeSelector) + // Setup Fuse Launch Mode + e.runtimeInfo.SetFuseLaunchMode(runtime.Spec.Fuse.LaunchMode) + if !e.UnitTest { e.runtimeInfo.SetDeprecatedNodeLabel(false) e.runtimeInfo.SetDeprecatedPVName(false) diff --git a/pkg/ddc/efc/transform.go b/pkg/ddc/efc/transform.go index a8e48eecb26..688efe2af0e 100644 --- a/pkg/ddc/efc/transform.go +++ b/pkg/ddc/efc/transform.go @@ -216,8 +216,11 @@ func (e *EFCEngine) transformFuse(runtime *datav1alpha1.EFCRuntime, if len(runtime.Spec.Fuse.NodeSelector) > 0 { value.Fuse.NodeSelector = runtime.Spec.Fuse.NodeSelector } - // The label will be added by CSI Plugin when any workload pod is scheduled on the node. - value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetOwnerDatasetUID())] = "true" + if runtime.Spec.Fuse.LaunchMode != datav1alpha1.EagerMode { + // The label will be added by CSI Plugin when any workload pod is scheduled on the node. + value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetOwnerDatasetUID())] = "true" + + } // tiered store err = e.transformFuseTieredStore(runtime, value) diff --git a/pkg/ddc/efc/transform_test.go b/pkg/ddc/efc/transform_test.go index 1587a388703..4205a90a44a 100644 --- a/pkg/ddc/efc/transform_test.go +++ b/pkg/ddc/efc/transform_test.go @@ -17,6 +17,7 @@ limitations under the License. package efc import ( + "reflect" "testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,6 +29,7 @@ import ( datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator" + "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" ) @@ -93,3 +95,103 @@ func TestEFCEngine_transform(t *testing.T) { } } } + +func TestTransformFuseWithLaunchMode(t *testing.T) { + testCases := map[string]struct { + runtime *datav1alpha1.EFCRuntime + wantValue *EFC + }{ + "test fuse launch mode case 1": { + runtime: &datav1alpha1.EFCRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.EFCRuntimeSpec{ + Fuse: datav1alpha1.EFCFuseSpec{ + LaunchMode: datav1alpha1.EagerMode, + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + wantValue: &EFC{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + "test fuse launch mode case 2": { + runtime: &datav1alpha1.EFCRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.EFCRuntimeSpec{ + Fuse: datav1alpha1.EFCFuseSpec{ + LaunchMode: datav1alpha1.LazyMode, + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + wantValue: &EFC{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + "fuse_node": "true", + }, + }, + }, + }, + "test fuse launch mode case 3": { + runtime: &datav1alpha1.EFCRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.EFCRuntimeSpec{ + Fuse: datav1alpha1.EFCFuseSpec{ + LaunchMode: "", + }, + }, + }, + wantValue: &EFC{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + }, + }, + }, + }, + } + + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "efc") + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } + + engine := &EFCEngine{ + Log: fake.NullLogger(), + runtimeInfo: runtimeInfo, + Client: fake.NewFakeClientWithScheme(testScheme), + } + ds := &datav1alpha1.Dataset{} + for k, v := range testCases { + gotValue := &EFC{} + if err := engine.transformFuse(v.runtime, ds, gotValue); err == nil { + if !reflect.DeepEqual(gotValue.Fuse.NodeSelector, v.wantValue.Fuse.NodeSelector) { + t.Errorf("check %s failure, got:%+v,want:%+v", + k, + gotValue.Fuse.NodeSelector, + v.wantValue.Fuse.NodeSelector, + ) + } + } + + } +} diff --git a/pkg/ddc/goosefs/runtime_info.go b/pkg/ddc/goosefs/runtime_info.go index 1ae55b7320c..b421dbafbea 100644 --- a/pkg/ddc/goosefs/runtime_info.go +++ b/pkg/ddc/goosefs/runtime_info.go @@ -44,6 +44,9 @@ func (e *GooseFSEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) { // Setup Fuse Deploy Mode e.runtimeInfo.SetFuseNodeSelector(runtime.Spec.Fuse.NodeSelector) + // Setup Fuse Launch Mode + e.runtimeInfo.SetFuseLaunchMode(runtime.Spec.Fuse.LaunchMode) + if !e.UnitTest { // Setup with Dataset Info dataset, err := utils.GetDataset(e.Client, e.name, e.namespace) diff --git a/pkg/ddc/goosefs/transform_fuse.go b/pkg/ddc/goosefs/transform_fuse.go index 4994848bd73..b792498d75b 100644 --- a/pkg/ddc/goosefs/transform_fuse.go +++ b/pkg/ddc/goosefs/transform_fuse.go @@ -86,8 +86,10 @@ func (e *GooseFSEngine) transformFuse(runtime *datav1alpha1.GooseFSRuntime, data } else { value.Fuse.NodeSelector = map[string]string{} } - - value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetOwnerDatasetUID())] = "true" + if runtime.Spec.Fuse.LaunchMode != datav1alpha1.EagerMode { + // The label will be added by CSI Plugin when any workload pod is scheduled on the node. + value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetOwnerDatasetUID())] = "true" + } value.Fuse.HostNetwork = true value.Fuse.HostPID = common.HostPIDEnabled(runtime.Annotations) value.Fuse.Enabled = true diff --git a/pkg/ddc/goosefs/transform_fuse_test.go b/pkg/ddc/goosefs/transform_fuse_test.go index 66cfc5a5585..2d913bc0ae5 100644 --- a/pkg/ddc/goosefs/transform_fuse_test.go +++ b/pkg/ddc/goosefs/transform_fuse_test.go @@ -17,11 +17,14 @@ limitations under the License. package goosefs import ( + "reflect" "testing" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/ddc/base" + "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestTransformFuseWithNoArgs(t *testing.T) { @@ -104,3 +107,103 @@ func TestTransformFuseWithArgs(t *testing.T) { } } } + +func TestTransformFuseWithLaunchMode(t *testing.T) { + testCases := map[string]struct { + runtime *datav1alpha1.GooseFSRuntime + wantValue *GooseFS + }{ + "test fuse launch mode case 1": { + runtime: &datav1alpha1.GooseFSRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.GooseFSRuntimeSpec{ + Fuse: datav1alpha1.GooseFSFuseSpec{ + LaunchMode: datav1alpha1.EagerMode, + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + wantValue: &GooseFS{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + "test fuse launch mode case 2": { + runtime: &datav1alpha1.GooseFSRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.GooseFSRuntimeSpec{ + Fuse: datav1alpha1.GooseFSFuseSpec{ + LaunchMode: datav1alpha1.LazyMode, + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + wantValue: &GooseFS{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + "fuse_node": "true", + }, + }, + }, + }, + "test fuse launch mode case 3": { + runtime: &datav1alpha1.GooseFSRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.GooseFSRuntimeSpec{ + Fuse: datav1alpha1.GooseFSFuseSpec{ + LaunchMode: "", + }, + }, + }, + wantValue: &GooseFS{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + }, + }, + }, + }, + } + + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "GooseFS") + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } + + engine := &GooseFSEngine{ + Log: fake.NullLogger(), + runtimeInfo: runtimeInfo, + Client: fake.NewFakeClientWithScheme(testScheme), + } + ds := &datav1alpha1.Dataset{} + for k, v := range testCases { + gotValue := &GooseFS{} + if err := engine.transformFuse(v.runtime, ds, gotValue); err == nil { + if !reflect.DeepEqual(gotValue.Fuse.NodeSelector, v.wantValue.Fuse.NodeSelector) { + t.Errorf("check %s failure, got:%+v,want:%+v", + k, + gotValue.Fuse.NodeSelector, + v.wantValue.Fuse.NodeSelector, + ) + } + } + + } +} diff --git a/pkg/ddc/jindo/runtime_info.go b/pkg/ddc/jindo/runtime_info.go index 6180775a007..d263ff70688 100644 --- a/pkg/ddc/jindo/runtime_info.go +++ b/pkg/ddc/jindo/runtime_info.go @@ -45,6 +45,9 @@ func (e *JindoEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) { // Setup Fuse Deploy Mode e.runtimeInfo.SetFuseNodeSelector(runtime.Spec.Fuse.NodeSelector) + // Setup Fuse Launch Mode + e.runtimeInfo.SetFuseLaunchMode(runtime.Spec.Fuse.LaunchMode) + // Setup with Dataset Info dataset, err := utils.GetDataset(e.Client, e.name, e.namespace) if err != nil { diff --git a/pkg/ddc/jindo/transform.go b/pkg/ddc/jindo/transform.go index bc3f086055f..fb656d0b863 100644 --- a/pkg/ddc/jindo/transform.go +++ b/pkg/ddc/jindo/transform.go @@ -463,9 +463,10 @@ func (e *JindoEngine) transformFuseNodeSelector(runtime *datav1alpha1.JindoRunti value.Fuse.NodeSelector = map[string]string{} } - // The label will be added by CSI Plugin when any workload pod is scheduled on the node. - value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetOwnerDatasetUID())] = "true" - + if runtime.Spec.Fuse.LaunchMode != datav1alpha1.EagerMode { + // The label will be added by CSI Plugin when any workload pod is scheduled on the node. + value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetOwnerDatasetUID())] = "true" + } return nil } diff --git a/pkg/ddc/jindo/transform_fuse_test.go b/pkg/ddc/jindo/transform_fuse_test.go index 288c3517950..c3906678241 100644 --- a/pkg/ddc/jindo/transform_fuse_test.go +++ b/pkg/ddc/jindo/transform_fuse_test.go @@ -17,10 +17,14 @@ limitations under the License. package jindo import ( + "reflect" "testing" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" + "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestTransformFuseWithNoArgs(t *testing.T) { @@ -52,6 +56,106 @@ func TestTransformFuseWithNoArgs(t *testing.T) { } } +func TestTransformFuseNodeSelectorWithLaunchMode(t *testing.T) { + testCases := map[string]struct { + runtime *datav1alpha1.JindoRuntime + wantValue *Jindo + }{ + "test fuse launch mode case 1": { + runtime: &datav1alpha1.JindoRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.JindoRuntimeSpec{ + Fuse: datav1alpha1.JindoFuseSpec{ + LaunchMode: datav1alpha1.EagerMode, + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + wantValue: &Jindo{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + "test fuse launch mode case 2": { + runtime: &datav1alpha1.JindoRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.JindoRuntimeSpec{ + Fuse: datav1alpha1.JindoFuseSpec{ + LaunchMode: datav1alpha1.LazyMode, + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + wantValue: &Jindo{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + "fuse_node": "true", + }, + }, + }, + }, + "test fuse launch mode case 3": { + runtime: &datav1alpha1.JindoRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.JindoRuntimeSpec{ + Fuse: datav1alpha1.JindoFuseSpec{ + LaunchMode: "", + }, + }, + }, + wantValue: &Jindo{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + }, + }, + }, + }, + } + + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "Jindo") + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } + + engine := &JindoEngine{ + Log: fake.NullLogger(), + runtimeInfo: runtimeInfo, + Client: fake.NewFakeClientWithScheme(testScheme), + } + + for k, v := range testCases { + gotValue := &Jindo{} + if err := engine.transformFuseNodeSelector(v.runtime, gotValue); err == nil { + if !reflect.DeepEqual(gotValue.Fuse.NodeSelector, v.wantValue.Fuse.NodeSelector) { + t.Errorf("check %s failure, got:%+v,want:%+v", + k, + gotValue.Fuse.NodeSelector, + v.wantValue.Fuse.NodeSelector, + ) + } + } + + } +} + func TestTransformRunAsUser(t *testing.T) { var tests = []struct { runtime *datav1alpha1.JindoRuntime diff --git a/pkg/ddc/jindocache/runtime_info.go b/pkg/ddc/jindocache/runtime_info.go index 2182089687b..230dc519c7d 100644 --- a/pkg/ddc/jindocache/runtime_info.go +++ b/pkg/ddc/jindocache/runtime_info.go @@ -44,6 +44,9 @@ func (e *JindoCacheEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) { // Setup Fuse Deploy Mode e.runtimeInfo.SetFuseNodeSelector(runtime.Spec.Fuse.NodeSelector) + // Setup Fuse Launch Mode + e.runtimeInfo.SetFuseLaunchMode(runtime.Spec.Fuse.LaunchMode) + // Setup with Dataset Info dataset, err := utils.GetDataset(e.Client, e.name, e.namespace) if err != nil { diff --git a/pkg/ddc/jindocache/transform.go b/pkg/ddc/jindocache/transform.go index cedf58cd8cc..5f15957b1f6 100644 --- a/pkg/ddc/jindocache/transform.go +++ b/pkg/ddc/jindocache/transform.go @@ -907,8 +907,10 @@ func (e *JindoCacheEngine) transformFuseNodeSelector(runtime *datav1alpha1.Jindo value.Fuse.NodeSelector = map[string]string{} } - // The label will be added by CSI Plugin when any workload pod is scheduled on the node. - value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetOwnerDatasetUID())] = "true" + if runtime.Spec.Fuse.LaunchMode != datav1alpha1.EagerMode { + // The label will be added by CSI Plugin when any workload pod is scheduled on the node. + value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetOwnerDatasetUID())] = "true" + } } func (e *JindoCacheEngine) transformNodeSelector(runtime *datav1alpha1.JindoRuntime) map[string]string { diff --git a/pkg/ddc/jindocache/transform_fuse_test.go b/pkg/ddc/jindocache/transform_fuse_test.go index bc0bf4429ed..60d28866fdc 100644 --- a/pkg/ddc/jindocache/transform_fuse_test.go +++ b/pkg/ddc/jindocache/transform_fuse_test.go @@ -17,6 +17,10 @@ limitations under the License. package jindocache import ( + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" + "github.com/fluid-cloudnative/fluid/pkg/utils" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "reflect" "testing" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" @@ -49,6 +53,104 @@ func TestTransformFuseWithNoArgs(t *testing.T) { } } +func TestTransformFuseNodeSelectorWithLaunchMode(t *testing.T) { + testCases := map[string]struct { + runtime *datav1alpha1.JindoRuntime + wantValue *Jindo + }{ + "test fuse launch mode case 1": { + runtime: &datav1alpha1.JindoRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.JindoRuntimeSpec{ + Fuse: datav1alpha1.JindoFuseSpec{ + LaunchMode: datav1alpha1.EagerMode, + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + wantValue: &Jindo{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + "test fuse launch mode case 2": { + runtime: &datav1alpha1.JindoRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.JindoRuntimeSpec{ + Fuse: datav1alpha1.JindoFuseSpec{ + LaunchMode: datav1alpha1.LazyMode, + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + wantValue: &Jindo{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + "fuse_node": "true", + }, + }, + }, + }, + "test fuse launch mode case 3": { + runtime: &datav1alpha1.JindoRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.JindoRuntimeSpec{ + Fuse: datav1alpha1.JindoFuseSpec{ + LaunchMode: "", + }, + }, + }, + wantValue: &Jindo{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + }, + }, + }, + }, + } + + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "JindoCache") + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } + + engine := &JindoCacheEngine{ + Log: fake.NullLogger(), + runtimeInfo: runtimeInfo, + Client: fake.NewFakeClientWithScheme(testScheme), + } + + for k, v := range testCases { + gotValue := &Jindo{} + engine.transformFuseNodeSelector(v.runtime, gotValue) + if !reflect.DeepEqual(gotValue.Fuse.NodeSelector, v.wantValue.Fuse.NodeSelector) { + t.Errorf("check %s failure, got:%+v,want:%+v", + k, + gotValue.Fuse.NodeSelector, + v.wantValue.Fuse.NodeSelector, + ) + } + } +} + func TestTransformFuseWithSecret(t *testing.T) { var tests = []struct { runtime *datav1alpha1.JindoRuntime diff --git a/pkg/ddc/jindofsx/runtime_info.go b/pkg/ddc/jindofsx/runtime_info.go index 557a4bf7c65..51356f8d086 100644 --- a/pkg/ddc/jindofsx/runtime_info.go +++ b/pkg/ddc/jindofsx/runtime_info.go @@ -44,6 +44,9 @@ func (e *JindoFSxEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) { // Setup Fuse Deploy Mode e.runtimeInfo.SetFuseNodeSelector(runtime.Spec.Fuse.NodeSelector) + // Setup Fuse Launch Mode + e.runtimeInfo.SetFuseLaunchMode(runtime.Spec.Fuse.LaunchMode) + // Setup with Dataset Info dataset, err := utils.GetDataset(e.Client, e.name, e.namespace) if err != nil { diff --git a/pkg/ddc/jindofsx/transform.go b/pkg/ddc/jindofsx/transform.go index 0fe108f509e..36a37f95ddf 100644 --- a/pkg/ddc/jindofsx/transform.go +++ b/pkg/ddc/jindofsx/transform.go @@ -790,8 +790,10 @@ func (e *JindoFSxEngine) transformFuseNodeSelector(runtime *datav1alpha1.JindoRu value.Fuse.NodeSelector = map[string]string{} } - // The label will be added by CSI Plugin when any workload pod is scheduled on the node. - value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetOwnerDatasetUID())] = "true" + if runtime.Spec.Fuse.LaunchMode != datav1alpha1.EagerMode { + // The label will be added by CSI Plugin when any workload pod is scheduled on the node. + value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetOwnerDatasetUID())] = "true" + } } func (e *JindoFSxEngine) transformNodeSelector(runtime *datav1alpha1.JindoRuntime) map[string]string { diff --git a/pkg/ddc/jindofsx/transform_fuse_test.go b/pkg/ddc/jindofsx/transform_fuse_test.go index 3751ab30c39..35bcbe15258 100644 --- a/pkg/ddc/jindofsx/transform_fuse_test.go +++ b/pkg/ddc/jindofsx/transform_fuse_test.go @@ -17,6 +17,10 @@ limitations under the License. package jindofsx import ( + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" + "github.com/fluid-cloudnative/fluid/pkg/utils" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "reflect" "testing" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" @@ -49,6 +53,104 @@ func TestTransformFuseWithNoArgs(t *testing.T) { } } +func TestTransformFuseNodeSelectorWithLaunchMode(t *testing.T) { + testCases := map[string]struct { + runtime *datav1alpha1.JindoRuntime + wantValue *Jindo + }{ + "test fuse launch mode case 1": { + runtime: &datav1alpha1.JindoRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.JindoRuntimeSpec{ + Fuse: datav1alpha1.JindoFuseSpec{ + LaunchMode: datav1alpha1.EagerMode, + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + wantValue: &Jindo{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + "test fuse launch mode case 2": { + runtime: &datav1alpha1.JindoRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.JindoRuntimeSpec{ + Fuse: datav1alpha1.JindoFuseSpec{ + LaunchMode: datav1alpha1.LazyMode, + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + wantValue: &Jindo{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + "fuse_node": "true", + }, + }, + }, + }, + "test fuse launch mode case 3": { + runtime: &datav1alpha1.JindoRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.JindoRuntimeSpec{ + Fuse: datav1alpha1.JindoFuseSpec{ + LaunchMode: "", + }, + }, + }, + wantValue: &Jindo{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + }, + }, + }, + }, + } + + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "Jindofsx") + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } + + engine := &JindoFSxEngine{ + Log: fake.NullLogger(), + runtimeInfo: runtimeInfo, + Client: fake.NewFakeClientWithScheme(testScheme), + } + + for k, v := range testCases { + gotValue := &Jindo{} + engine.transformFuseNodeSelector(v.runtime, gotValue) + if !reflect.DeepEqual(gotValue.Fuse.NodeSelector, v.wantValue.Fuse.NodeSelector) { + t.Errorf("check %s failure, got:%+v,want:%+v", + k, + gotValue.Fuse.NodeSelector, + v.wantValue.Fuse.NodeSelector, + ) + } + } +} + func TestTransformFuseWithSecret(t *testing.T) { var tests = []struct { runtime *datav1alpha1.JindoRuntime diff --git a/pkg/ddc/juicefs/runtime_info.go b/pkg/ddc/juicefs/runtime_info.go index 43ea895cb21..13856a7d994 100644 --- a/pkg/ddc/juicefs/runtime_info.go +++ b/pkg/ddc/juicefs/runtime_info.go @@ -44,6 +44,9 @@ func (j *JuiceFSEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) { // Setup Fuse Deploy Mode j.runtimeInfo.SetFuseNodeSelector(runtime.Spec.Fuse.NodeSelector) + // Setup Fuse Launch Mode + j.runtimeInfo.SetFuseLaunchMode(runtime.Spec.Fuse.LaunchMode) + if !j.UnitTest { // Setup with Dataset Info dataset, err := utils.GetDataset(j.Client, j.name, j.namespace) diff --git a/pkg/ddc/juicefs/transform_fuse.go b/pkg/ddc/juicefs/transform_fuse.go index 9349b7492d4..0527f647b07 100644 --- a/pkg/ddc/juicefs/transform_fuse.go +++ b/pkg/ddc/juicefs/transform_fuse.go @@ -127,8 +127,10 @@ func (j *JuiceFSEngine) transformFuseNodeSelector(runtime *datav1alpha1.JuiceFSR value.Fuse.NodeSelector = runtime.Spec.Fuse.NodeSelector } - // The label will be added by CSI Plugin when any workload pod is scheduled on the node. - value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, j.runtimeInfo.GetOwnerDatasetUID())] = "true" + if runtime.Spec.Fuse.LaunchMode != datav1alpha1.EagerMode { + // The label will be added by CSI Plugin when any workload pod is scheduled on the node. + value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, j.runtimeInfo.GetOwnerDatasetUID())] = "true" + } } // genValue: generate the value of juicefs diff --git a/pkg/ddc/juicefs/transform_fuse_test.go b/pkg/ddc/juicefs/transform_fuse_test.go index d941ceb8c4d..cd56052bc57 100644 --- a/pkg/ddc/juicefs/transform_fuse_test.go +++ b/pkg/ddc/juicefs/transform_fuse_test.go @@ -30,6 +30,7 @@ import ( datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/base" + "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" ) @@ -1454,3 +1455,103 @@ func TestJuiceFSEngine_genMountOptions(t *testing.T) { }) } } + +func TestTransformFuseWithLaunchMode(t *testing.T) { + testCases := map[string]struct { + runtime *datav1alpha1.JuiceFSRuntime + wantValue *JuiceFS + }{ + "test fuse launch mode case 1": { + runtime: &datav1alpha1.JuiceFSRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.JuiceFSRuntimeSpec{ + Fuse: datav1alpha1.JuiceFSFuseSpec{ + LaunchMode: datav1alpha1.EagerMode, + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + wantValue: &JuiceFS{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + "test fuse launch mode case 2": { + runtime: &datav1alpha1.JuiceFSRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.JuiceFSRuntimeSpec{ + Fuse: datav1alpha1.JuiceFSFuseSpec{ + LaunchMode: datav1alpha1.LazyMode, + NodeSelector: map[string]string{ + "fuse_node": "true", + }, + }, + }, + }, + wantValue: &JuiceFS{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + "fuse_node": "true", + }, + }, + }, + }, + "test fuse launch mode case 3": { + runtime: &datav1alpha1.JuiceFSRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.JuiceFSRuntimeSpec{ + Fuse: datav1alpha1.JuiceFSFuseSpec{ + LaunchMode: "", + }, + }, + }, + wantValue: &JuiceFS{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + }, + }, + }, + }, + } + + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "JuiceFS") + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } + + engine := &JuiceFSEngine{ + Log: fake.NullLogger(), + runtimeInfo: runtimeInfo, + Client: fake.NewFakeClientWithScheme(testScheme), + } + ds := &datav1alpha1.Dataset{} + for k, v := range testCases { + gotValue := &JuiceFS{} + if err := engine.transformFuse(v.runtime, ds, gotValue); err == nil { + if !reflect.DeepEqual(gotValue.Fuse.NodeSelector, v.wantValue.Fuse.NodeSelector) { + t.Errorf("check %s failure, got:%+v,want:%+v", + k, + gotValue.Fuse.NodeSelector, + v.wantValue.Fuse.NodeSelector, + ) + } + } + + } +} diff --git a/pkg/ddc/thin/referencedataset/runtime.go b/pkg/ddc/thin/referencedataset/runtime.go index 249c39676ca..2a2604f4acc 100644 --- a/pkg/ddc/thin/referencedataset/runtime.go +++ b/pkg/ddc/thin/referencedataset/runtime.go @@ -85,6 +85,9 @@ func (e *ReferenceDatasetEngine) getRuntimeInfo() (base.RuntimeInfoInterface, er // Setup Fuse Deploy Mode e.runtimeInfo.SetFuseNodeSelector(runtime.Spec.Fuse.NodeSelector) + // Setup Fuse Launch Mode + e.runtimeInfo.SetFuseLaunchMode(runtime.Spec.Fuse.LaunchMode) + // Ignore the deprecated common labels and PersistentVolumes, use physical runtime // Setup with Dataset Info diff --git a/pkg/ddc/thin/runtime_info.go b/pkg/ddc/thin/runtime_info.go index f2e2649be61..98591f713c7 100644 --- a/pkg/ddc/thin/runtime_info.go +++ b/pkg/ddc/thin/runtime_info.go @@ -49,6 +49,9 @@ func (t *ThinEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) { // Setup Fuse Deploy Mode t.runtimeInfo.SetFuseNodeSelector(runtime.Spec.Fuse.NodeSelector) + // Setup Fuse Launch Mode + t.runtimeInfo.SetFuseLaunchMode(runtime.Spec.Fuse.LaunchMode) + if !t.UnitTest { // Check if the runtime is using deprecated naming style for PersistentVolumes isPVNameDeprecated, err := volume.HasDeprecatedPersistentVolumeName(t.Client, t.runtimeInfo, t.Log) diff --git a/pkg/ddc/thin/status_test.go b/pkg/ddc/thin/status_test.go index 50bb5d9cb97..5c7624aa511 100644 --- a/pkg/ddc/thin/status_test.go +++ b/pkg/ddc/thin/status_test.go @@ -281,12 +281,13 @@ func TestThinEngine_CheckAndUpdateRuntimeStatus(t *testing.T) { func TestThinEngine_UpdateRuntimeSetConfigIfNeeded(t *testing.T) { type fields struct { - worker *appsv1.StatefulSet - pods []*corev1.Pod - ds *appsv1.DaemonSet - nodes []*corev1.Node - name string - namespace string + worker *appsv1.StatefulSet + pods []*corev1.Pod + ds *appsv1.DaemonSet + nodes []*corev1.Node + fuseLaunchMode datav1alpha1.FuseLaunchMode + name string + namespace string } testcases := []struct { name string @@ -495,7 +496,7 @@ func TestThinEngine_UpdateRuntimeSetConfigIfNeeded(t *testing.T) { Addresses: []corev1.NodeAddress{ { Type: corev1.NodeInternalIP, - Address: "10.0.0.2", + Address: "10.0.0.6", }, }, }, @@ -511,6 +512,83 @@ func TestThinEngine_UpdateRuntimeSetConfigIfNeeded(t *testing.T) { }, want: "{\"workers\":[],\"fuses\":[]}", wantUpdated: false, }, + { + name: "launch_fuse_eager_mode", + fields: fields{ + name: "flink", + namespace: "big-data", + fuseLaunchMode: datav1alpha1.EagerMode, + worker: &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "flink-worker", + Namespace: "big-data", + UID: "uid2", + }, + Spec: appsv1.StatefulSetSpec{}, + }, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "flink-worker-0", + Namespace: "big-data", + OwnerReferences: []metav1.OwnerReference{{ + Kind: "StatefulSet", + APIVersion: "apps/v1", + Name: "flink-worker", + UID: "uid2", + Controller: ptr.To(true), + }}, + Labels: map[string]string{ + "app": "thin", + "role": "thin-worker", + "fluid.io/dataset": "big-data-flink", + }, + }, + Spec: corev1.PodSpec{NodeName: "node7"}, + }, + }, + nodes: []*corev1.Node{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node7", + Labels: map[string]string{ + "fluid.io/f-big-data-flink": "true", + "fluid.io/s-big-data-flink": "true", + "fluid.io/s-thin-big-data-flink": "true", + }, + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + { + Type: corev1.NodeInternalIP, + Address: "10.0.0.7", + }, + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "node8", + Labels: map[string]string{"fluid.io/s-default-flink": "true", + "fluid.io/s-thin-big-data-flink": "true"}, + }, Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + { + Type: corev1.NodeInternalIP, + Address: "172.17.0.10", + }, + }, + }, + }}, + }, + configMap: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "flink-runtimeset", + Namespace: "big-data", + }, Data: map[string]string{ + "runtime.json": "{\"workers\":[\"10.0.0.7\",\"172.17.0.10\"],\"fuses\":[\"10.0.0.1\",\"10.0.0.2\",\"10.0.0.6\",\"10.0.0.7\",\"172.17.0.9\",\"172.17.0.10\",\"192.168.0.1\",\"192.168.0.2\"]}", + }, + }, want: "{\"workers\":[\"10.0.0.7\",\"172.17.0.10\"],\"fuses\":[\"10.0.0.1\",\"10.0.0.2\",\"10.0.0.6\",\"10.0.0.7\",\"172.17.0.9\",\"172.17.0.10\",\"192.168.0.1\",\"192.168.0.2\"]}", + wantUpdated: false, + }, } runtimeObjs := []runtime.Object{} @@ -543,6 +621,7 @@ func TestThinEngine_UpdateRuntimeSetConfigIfNeeded(t *testing.T) { if err != nil { t.Errorf("BuildRuntimeInfo() error = %v", err) } + runtimeInfo.SetFuseLaunchMode(testcase.fields.fuseLaunchMode) engine.Helper = ctrlhelper.BuildHelper(runtimeInfo, c, engine.Log) updated, err := engine.UpdateRuntimeSetConfigIfNeeded() diff --git a/pkg/ddc/thin/transform_fuse.go b/pkg/ddc/thin/transform_fuse.go index 5c66e3192b6..a9c9de8fe5e 100644 --- a/pkg/ddc/thin/transform_fuse.go +++ b/pkg/ddc/thin/transform_fuse.go @@ -62,8 +62,10 @@ func (t *ThinEngine) transformFuse(runtime *datav1alpha1.ThinRuntime, profile *d if len(runtime.Spec.Fuse.NodeSelector) > 0 { value.Fuse.NodeSelector = runtime.Spec.Fuse.NodeSelector } - value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, t.runtimeInfo.GetOwnerDatasetUID())] = "true" - + if runtime.Spec.Fuse.LaunchMode != datav1alpha1.EagerMode && profile.Spec.Fuse.LaunchMode != datav1alpha1.EagerMode { + // The label will be added by CSI Plugin when any workload pod is scheduled on the node. + value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, t.runtimeInfo.GetOwnerDatasetUID())] = "true" + } // 5. ports if len(runtime.Spec.Fuse.Ports) != 0 { value.Fuse.Ports = append(value.Fuse.Ports, runtime.Spec.Fuse.Ports...) diff --git a/pkg/ddc/thin/transform_fuse_test.go b/pkg/ddc/thin/transform_fuse_test.go index f9995ad3fd5..13f340f1fe6 100644 --- a/pkg/ddc/thin/transform_fuse_test.go +++ b/pkg/ddc/thin/transform_fuse_test.go @@ -519,6 +519,244 @@ func TestThinEngine_transformFuse(t1 *testing.T) { }) } +func TestThinEngine_transformFuseWithEagerMode(t1 *testing.T) { + profile := &datav1alpha1.ThinRuntimeProfile{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: datav1alpha1.ThinRuntimeProfileSpec{ + FileSystemType: "test", + Fuse: datav1alpha1.ThinFuseSpec{ + Image: "test", + ImageTag: "v1", + ImagePullPolicy: "Always", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + // Should be inherited + corev1.ResourceCPU: resource.MustParse("100m"), + // Should be overridden + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + Env: []corev1.EnvVar{{ + Name: "a", + Value: "b", + }}, + NodeSelector: map[string]string{"a": "b"}, + Ports: []corev1.ContainerPort{{ + Name: "port", + ContainerPort: 8080, + }}, + NetworkMode: datav1alpha1.HostNetworkMode, + VolumeMounts: []corev1.VolumeMount{{ + Name: "a", + MountPath: "/test", + }}, + }, + Volumes: []corev1.Volume{{ + Name: "a", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{Path: "/test"}, + }, + }}, + }, + } + runtime := &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "fluid", + }, + Spec: datav1alpha1.ThinRuntimeSpec{ + ThinRuntimeProfileName: "test", + Fuse: datav1alpha1.ThinFuseSpec{ + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("200m"), + corev1.ResourceMemory: resource.MustParse("4Gi"), + }, + }, + Env: []corev1.EnvVar{{ + Name: "b", + ValueFrom: &corev1.EnvVarSource{ + ConfigMapKeyRef: &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "test-cm"}, + }, + }, + }}, + Options: map[string]string{ + "fuse-opt": "foo", + }, + LaunchMode: datav1alpha1.EagerMode, + NodeSelector: map[string]string{"b": "c"}, + VolumeMounts: []corev1.VolumeMount{{ + Name: "b", + MountPath: "/b", + }}, + LivenessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + }, + }, + InitialDelaySeconds: 1, + TimeoutSeconds: 1, + PeriodSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 1, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + }, + }, + InitialDelaySeconds: 1, + TimeoutSeconds: 1, + PeriodSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 1, + }, + }, + Volumes: []corev1.Volume{{ + Name: "b", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{Path: "/b"}, + }, + }}, + }, + } + dataset := &datav1alpha1.Dataset{ + Spec: datav1alpha1.DatasetSpec{ + SharedOptions: map[string]string{ + "c": "d", + }, + AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany}, + Mounts: []datav1alpha1.Mount{{ + MountPoint: "abc", + Options: map[string]string{"a": "b"}, + }}, + }, + } + wantValue := &ThinValue{ + Fuse: Fuse{ + Enabled: true, + Image: "test", + ImageTag: "v1", + ImagePullPolicy: "Always", + TargetPath: "/thin/fluid/test/thin-fuse", + Resources: common.Resources{ + Requests: map[corev1.ResourceName]string{ + corev1.ResourceCPU: "100m", + corev1.ResourceMemory: "1Gi", + }, + Limits: map[corev1.ResourceName]string{ + corev1.ResourceCPU: "200m", + corev1.ResourceMemory: "4Gi", + }, + }, + HostNetwork: true, + Envs: []corev1.EnvVar{{ + Name: "a", + Value: "b", + }, { + Name: "b", + ValueFrom: &corev1.EnvVarSource{ + ConfigMapKeyRef: &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "test-cm", + }, + }, + }, + }, { + Name: common.ThinFuseOptionEnvKey, + Value: "fuse-opt=foo", + }, { + Name: common.ThinFusePointEnvKey, + Value: "/thin/fluid/test/thin-fuse", + }}, + NodeSelector: map[string]string{"b": "c"}, + Ports: []corev1.ContainerPort{{ + Name: "port", + ContainerPort: 8080, + }}, + LivenessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + }, + }, + InitialDelaySeconds: 1, + TimeoutSeconds: 1, + PeriodSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 1, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + }, + }, + InitialDelaySeconds: 1, + TimeoutSeconds: 1, + PeriodSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 1, + }, + Volumes: []corev1.Volume{{ + Name: "a", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{Path: "/test"}, + }, + }, { + Name: "b", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{Path: "/b"}, + }, + }}, + VolumeMounts: []corev1.VolumeMount{{ + Name: "a", + MountPath: "/test", + }, { + Name: "b", + MountPath: "/b", + }}, + // ConfigValue: "{\"/thin/fluid/test/thin-fuse\":\"a=b\"}", + // MountPath: "/thin/fluid/test/thin-fuse", + ConfigValue: "{\"mounts\":[{\"mountPoint\":\"abc\",\"options\":{\"a\":\"b\",\"c\":\"d\"}}],\"targetPath\":\"/thin/fluid/test/thin-fuse\",\"runtimeOptions\":{\"fuse-opt\":\"foo\"},\"accessModes\":[\"ReadWriteMany\"]}", + ConfigStorage: "configmap", + }, + } + value := &ThinValue{} + runtimeInfo, err := base.BuildRuntimeInfo("test", "fluid", "thin") + if err != nil { + t1.Errorf("fail to create the runtimeInfo with error %v", err) + } + t1.Run("test", func(t1 *testing.T) { + t := &ThinEngine{ + Log: fake.NullLogger(), + namespace: "fluid", + name: "test", + runtime: runtime, + runtimeInfo: runtimeInfo, + Client: fake.NewFakeClientWithScheme(testScheme), + } + if err := t.transformFuse(runtime, profile, dataset, value); err != nil { + t1.Errorf("transformFuse() error = %v", err) + } + + value.Fuse.Envs = testutil.SortEnvVarByName(value.Fuse.Envs, common.ThinFuseOptionEnvKey) + if !testutil.DeepEqualIgnoringSliceOrder(t1, value.Fuse, wantValue.Fuse) { + valueYaml, _ := yaml.Marshal(value.Fuse) + wantYaml, _ := yaml.Marshal(wantValue.Fuse) + t1.Errorf("transformFuse() \ngot = %v, \nwant = %v", string(valueYaml), string(wantYaml)) + } + }) +} + func TestThinEngine_transformFuseWithDuplicateOptionKey(t1 *testing.T) { profile := &datav1alpha1.ThinRuntimeProfile{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/ddc/vineyard/transform.go b/pkg/ddc/vineyard/transform.go index cde06208df6..cd7fd94eae3 100644 --- a/pkg/ddc/vineyard/transform.go +++ b/pkg/ddc/vineyard/transform.go @@ -271,7 +271,11 @@ func (e *VineyardEngine) transformWorkerPorts(runtime *datav1alpha1.VineyardRunt func (e *VineyardEngine) transformFuseNodeSelector(runtime *datav1alpha1.VineyardRuntime) map[string]string { nodeSelector := map[string]string{} - nodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetOwnerDatasetUID())] = "true" + + if runtime.Spec.Fuse.LaunchMode != datav1alpha1.EagerMode { + // The label will be added by CSI Plugin when any workload pod is scheduled on the node. + nodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetOwnerDatasetUID())] = "true" + } return nodeSelector } diff --git a/pkg/ddc/vineyard/transform_test.go b/pkg/ddc/vineyard/transform_test.go index 926e9c6a4b4..1e2916da7de 100755 --- a/pkg/ddc/vineyard/transform_test.go +++ b/pkg/ddc/vineyard/transform_test.go @@ -22,6 +22,7 @@ import ( datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator" + "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -646,6 +647,95 @@ func TestTransformFuseNodeSelector(t *testing.T) { } } +func TestTransformFuseWithLaunchMode(t *testing.T) { + testCases := map[string]struct { + runtime *datav1alpha1.VineyardRuntime + wantValue *Vineyard + }{ + "test fuse launch mode case 1": { + runtime: &datav1alpha1.VineyardRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.VineyardRuntimeSpec{ + Fuse: datav1alpha1.VineyardClientSocketSpec{ + LaunchMode: datav1alpha1.EagerMode, + }, + }, + }, + wantValue: &Vineyard{ + Fuse: Fuse{ + NodeSelector: map[string]string{}, + }, + }, + }, + "test fuse launch mode case 2": { + runtime: &datav1alpha1.VineyardRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.VineyardRuntimeSpec{ + Fuse: datav1alpha1.VineyardClientSocketSpec{ + LaunchMode: datav1alpha1.LazyMode, + }, + }, + }, + wantValue: &Vineyard{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + }, + }, + }, + }, + "test fuse launch mode case 3": { + runtime: &datav1alpha1.VineyardRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: "hbase", + Namespace: "fluid", + }, + Spec: datav1alpha1.VineyardRuntimeSpec{ + Fuse: datav1alpha1.VineyardClientSocketSpec{ + LaunchMode: "", + }, + }, + }, + wantValue: &Vineyard{ + Fuse: Fuse{ + NodeSelector: map[string]string{ + utils.GetFuseLabelName("fluid", "hbase", ""): "true", + }, + }, + }, + }, + } + + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "Vineyard") + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } + + engine := &VineyardEngine{ + Log: fake.NullLogger(), + runtimeInfo: runtimeInfo, + Client: fake.NewFakeClientWithScheme(testScheme), + } + + for k, v := range testCases { + gotValue := &Vineyard{} + engine.transformFuse(v.runtime, gotValue) + if !reflect.DeepEqual(gotValue.Fuse.NodeSelector, v.wantValue.Fuse.NodeSelector) { + t.Errorf("check %s failure, got:%+v,want:%+v", + k, + gotValue.Fuse.NodeSelector, + v.wantValue.Fuse.NodeSelector, + ) + } + } +} + func TestTransformTieredStore(t *testing.T) { defaultQuota := resource.MustParse("4Gi") quota := resource.MustParse("20Gi")