Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 167 additions & 113 deletions controllers/lxd_initializer_ds.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"sigs.k8s.io/yaml"

"github.com/spectrocloud/cluster-api-provider-maas/pkg/maas/scope"
"github.com/spectrocloud/cluster-api-provider-maas/pkg/util/trust"
"github.com/spectrocloud/cluster-api-provider-maas/pkg/util"
"github.com/spectrocloud/cluster-api-provider-maas/pkg/util/trust"

// embed template
_ "embed"
Expand All @@ -43,9 +43,9 @@ func (r *MaasClusterReconciler) ensureLXDInitializerDS(ctx context.Context, clus
dsNamespace := cluster.Namespace

// Always operate on the TARGET cluster client
remoteClient, err := clusterScope.GetWorkloadClusterClient(ctx)
remoteClient, err := r.getTargetClient(ctx, clusterScope)
if err != nil {
return fmt.Errorf("failed to get target cluster client: %v", err)
return err
}

// If feature is off or cluster is being deleted, we're done
Expand All @@ -54,65 +54,150 @@ func (r *MaasClusterReconciler) ensureLXDInitializerDS(ctx context.Context, clus
}

// Gate: ensure pivot completed. Require mgmt namespace to have clusterEnv=target
mgmtNS := &corev1.Namespace{}
if err := r.Client.Get(ctx, client.ObjectKey{Name: dsNamespace}, mgmtNS); err != nil {
// Namespace not readable yet on management cluster; skip for now
isTarget, clusterEnv := r.namespaceIsTarget(ctx, dsNamespace)
if !isTarget {
r.Log.Info("Namespace not marked as target; deferring LXD initializer", "namespace", dsNamespace, "clusterEnv", clusterEnv)
return nil
}
if v := strings.TrimSpace(mgmtNS.Annotations["clusterEnv"]); v != "target" {
r.Log.Info("Namespace not marked as target; deferring LXD initializer", "namespace", dsNamespace, "clusterEnv", v)

// Gate: derive desired CP count from MaasCloudConfig; fallback to KCP
desiredCP, readyByKCP := r.computeDesiredControlPlane(ctx, dsNamespace, cluster.Name)

if ok := r.enoughCPNodesReady(ctx, remoteClient, desiredCP, readyByKCP); !ok {
return nil
}

// Gate: derive desired CP count from MaasCloudConfig; fallback to KCP
if err := r.deleteExistingInitializerDS(ctx, remoteClient, dsNamespace); err != nil {
return err
}

// Ensure RBAC resources are created on target cluster
if err := r.ensureLXDInitializerRBACOnTarget(ctx, remoteClient, dsNamespace); err != nil {
return fmt.Errorf("failed to ensure LXD initializer RBAC: %v", err)
}

if done, err := r.maybeShortCircuitDelete(ctx, remoteClient, dsNamespace, desiredCP, dsName); err != nil {
return err
} else if done {
return nil
}

ds, err := r.renderDaemonSetForCluster(clusterScope, dsName, dsNamespace)
if err != nil {
return err
}

// Do not set owner refs across clusters; just create/patch on target cluster
_, err = controllerutil.CreateOrPatch(ctx, remoteClient, ds, func() error { return nil })
return err
}

// ensureLXDInitializerRBACOnTarget applies the lxd-initializer RBAC manifests
// to the target cluster through remoteClient.
//
// The embedded lxdInitRBACTemplate is rewritten so that every
// "namespace: default" occurrence points at namespace, split into individual
// YAML documents on "---", and each non-empty document is decoded into an
// unstructured object and passed to controllerutil.CreateOrPatch.
//
// It returns nil when every document was applied, or the first decode or
// create/patch error. Errors are wrapped with %w so callers can still inspect
// the underlying cause with errors.Is / errors.As.
func (r *MaasClusterReconciler) ensureLXDInitializerRBACOnTarget(ctx context.Context, remoteClient client.Client, namespace string) error {
	// Point the template's hard-coded "default" namespace at the requested one.
	rbacYaml := strings.ReplaceAll(lxdInitRBACTemplate, "namespace: default", fmt.Sprintf("namespace: %s", namespace))

	// Each "---"-separated chunk is treated as one resource; empty chunks
	// (leading or trailing separators) are ignored.
	for _, doc := range strings.Split(rbacYaml, "---") {
		doc = strings.TrimSpace(doc)
		if doc == "" {
			continue
		}

		// Decode as unstructured so that different resource kinds can share
		// one code path.
		obj := &unstructured.Unstructured{}
		if err := yaml.Unmarshal([]byte(doc), obj); err != nil {
			return fmt.Errorf("failed to unmarshal RBAC resource: %w", err)
		}

		// Only ServiceAccount objects get their namespace forced here; other
		// kinds keep whatever the rewritten template declares.
		if obj.GetKind() == "ServiceAccount" {
			obj.SetNamespace(namespace)
		}

		// NOTE(review): the mutate callback is a no-op. Per the
		// controllerutil.CreateOrPatch documentation an existing object is
		// fetched into obj before mutate runs, so template changes are
		// presumably never patched onto resources that already exist —
		// confirm whether that is intended.
		if _, err := controllerutil.CreateOrPatch(ctx, remoteClient, obj, func() error { return nil }); err != nil {
			return fmt.Errorf("failed to create/patch %s %s: %w", obj.GetKind(), obj.GetName(), err)
		}
	}

	return nil
}

// getTargetClient returns the client obtained from
// clusterScope.GetWorkloadClusterClient.
//
// On failure it returns a nil client and an error that wraps the original
// cause with %w, so callers can still match it with errors.Is / errors.As.
func (r *MaasClusterReconciler) getTargetClient(ctx context.Context, clusterScope *scope.ClusterScope) (client.Client, error) {
	remoteClient, err := clusterScope.GetWorkloadClusterClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get target cluster client: %w", err)
	}
	return remoteClient, nil
}

// namespaceIsTarget reports whether the Namespace called namespace, read
// through r.Client, carries the annotation clusterEnv=target.
//
// The second return value is the whitespace-trimmed annotation value. When
// the Namespace cannot be read, or the annotation is absent, the result is
// (false, "").
func (r *MaasClusterReconciler) namespaceIsTarget(ctx context.Context, namespace string) (bool, string) {
	var ns corev1.Namespace
	if getErr := r.Client.Get(ctx, client.ObjectKey{Name: namespace}, &ns); getErr != nil {
		// Read failures are reported the same way as "not a target".
		return false, ""
	}

	// Indexing a nil map yields the zero value, so a Namespace without any
	// annotations falls through to (false, "") without a separate nil check.
	env := strings.TrimSpace(ns.Annotations["clusterEnv"])
	return env == "target", env
}

// computeDesiredControlPlane determines desired control-plane replicas and ready count
func (r *MaasClusterReconciler) computeDesiredControlPlane(ctx context.Context, namespace, clusterName string) (int32, int32) {
desiredCP := int32(1)
readyByKCP := int32(0)

// Prefer MaasCloudConfig.spec.machinePoolConfig[].size where isControlPlane=true (sum)
{
mccList := &unstructured.UnstructuredList{}
mccList.SetGroupVersionKind(schema.GroupVersionKind{Group: "cluster.spectrocloud.com", Version: "v1alpha1", Kind: "MaasCloudConfigList"})
if err := r.Client.List(ctx, mccList, client.InNamespace(dsNamespace)); err == nil {
var sum int64
for _, it := range mccList.Items {
owned := false
for _, or := range it.GetOwnerReferences() {
if or.Name == cluster.Name {
owned = true
break
}
}
if !owned && !strings.HasSuffix(it.GetName(), "-maas-config") {
continue
}
pools, found, _ := unstructured.NestedSlice(it.Object, "spec", "machinePoolConfig")
if !found {
continue
mccList := &unstructured.UnstructuredList{}
mccList.SetGroupVersionKind(schema.GroupVersionKind{Group: "cluster.spectrocloud.com", Version: "v1alpha1", Kind: "MaasCloudConfigList"})
if err := r.Client.List(ctx, mccList, client.InNamespace(namespace)); err == nil {
var sum int64
for _, it := range mccList.Items {
owned := false
for _, or := range it.GetOwnerReferences() {
if or.Name == clusterName {
owned = true
break
}
for _, p := range pools {
if mp, ok := p.(map[string]interface{}); ok {
isCP, _, _ := unstructured.NestedBool(mp, "isControlPlane")
if !isCP {
continue
}
if v, foundSz, _ := unstructured.NestedInt64(mp, "size"); foundSz && v > 0 {
sum += v
}
}
if !owned && !strings.HasSuffix(it.GetName(), "-maas-config") {
continue
}
pools, found, _ := unstructured.NestedSlice(it.Object, "spec", "machinePoolConfig")
if !found {
continue
}
for _, p := range pools {
if mp, ok := p.(map[string]interface{}); ok {
isCP, _, _ := unstructured.NestedBool(mp, "isControlPlane")
if !isCP {
continue
}
if v, foundSz, _ := unstructured.NestedInt64(mp, "size"); foundSz && v > 0 {
sum += v
}
}
if sum > 0 {
desiredCP = util.SafeInt64ToInt32(sum)
break
}
}
if sum > 0 {
desiredCP = util.SafeInt64ToInt32(sum)
break
}
}
}

// Fallback: use KCP if MCC not found
if desiredCP == 1 {
kcpList := &unstructured.UnstructuredList{}
kcpList.SetGroupVersionKind(schema.GroupVersionKind{Group: "controlplane.cluster.x-k8s.io", Version: "v1beta1", Kind: "KubeadmControlPlaneList"})
if err := r.Client.List(ctx, kcpList, client.InNamespace(dsNamespace), client.MatchingLabels{
"cluster.x-k8s.io/cluster-name": cluster.Name,
if err := r.Client.List(ctx, kcpList, client.InNamespace(namespace), client.MatchingLabels{
"cluster.x-k8s.io/cluster-name": clusterName,
}); err == nil {
if len(kcpList.Items) > 0 {
item := kcpList.Items[0]
Expand All @@ -126,6 +211,11 @@ func (r *MaasClusterReconciler) ensureLXDInitializerDS(ctx context.Context, clus
}
}

return desiredCP, readyByKCP
}

// enoughCPNodesReady checks the target cluster for Ready control-plane nodes
func (r *MaasClusterReconciler) enoughCPNodesReady(ctx context.Context, remoteClient client.Client, desiredCP, readyByKCP int32) bool {
nodeList := &corev1.NodeList{}
cpSelector := labels.SelectorFromSet(labels.Set{
"node-role.kubernetes.io/control-plane": "",
Expand All @@ -140,63 +230,64 @@ func (r *MaasClusterReconciler) ensureLXDInitializerDS(ctx context.Context, clus
}
}
}
// Proceed when CP nodes are present and Ready, regardless of KCP readyReplicas
if int64(len(nodeList.Items)) < int64(desiredCP) || int64(ready) < int64(desiredCP) {
r.Log.Info("Not enough control-plane nodes present/ready yet; skipping DS for now", "desiredCP", desiredCP, "readyByKCP", readyByKCP, "nodeList", len(nodeList.Items), "ready", ready)
// Not enough control-plane nodes present/ready yet; skip DS for now
return nil
return false
}
}
return true
}

// Clean up any existing DaemonSets in this namespace (old naming/labels)
// deleteExistingInitializerDS removes any DaemonSets with old labeling in the namespace.
//
// It lists DaemonSets through remoteClient that carry the label
// app=lxd-initializer and deletes each one in turn. A list failure or the
// first delete failure is returned as a formatted error; nil is returned when
// every listed DaemonSet was deleted (or none were found).
//
// NOTE(review): the two consecutive List lines below differ only in
// dsNamespace vs. namespace and look like the before/after pair of a diff
// rendering; only `namespace` is a parameter of this function — confirm
// which line is live.
func (r *MaasClusterReconciler) deleteExistingInitializerDS(ctx context.Context, remoteClient client.Client, namespace string) error {
dsList := &appsv1.DaemonSetList{}
if err := remoteClient.List(ctx, dsList, client.InNamespace(dsNamespace), client.MatchingLabels{
if err := remoteClient.List(ctx, dsList, client.InNamespace(namespace), client.MatchingLabels{
"app": "lxd-initializer",
}); err != nil {
return fmt.Errorf("failed to list DaemonSets: %v", err)
}

// Delete all existing LXD initializer DaemonSets
for _, ds := range dsList.Items {
if err := remoteClient.Delete(ctx, &ds); err != nil {
return fmt.Errorf("failed to delete DaemonSet %s: %v", ds.Name, err)
}
}
return nil
}

// Ensure RBAC resources are created on target cluster
if err := r.ensureLXDInitializerRBACOnTarget(ctx, remoteClient, dsNamespace); err != nil {
return fmt.Errorf("failed to ensure LXD initializer RBAC: %v", err)
}

// Short-circuit deletion: if all control-plane nodes are labeled initialized, delete DS
// maybeShortCircuitDelete deletes the DS if all CP nodes are already initialized.
//
// It lists nodes labeled node-role.kubernetes.io/control-plane through
// remoteClient and counts those whose label lxdhost.cluster.com/initialized
// equals "true". When both the node count and the initialized count are at
// least desiredCP, it lists DaemonSets labeled app=dsName and deletes them,
// ignoring individual delete errors, and reports (true, nil). If the node
// list fails or is empty, or the threshold is not met, it reports
// (false, nil).
//
// NOTE(review): this span appears to interleave pre- and post-refactor lines
// from a diff rendering (e.g. the `err == nil && len(...) > 0` form next to
// the early-return form, and dsNamespace next to namespace). The description
// above follows the lines that use this function's own parameters and its
// (bool, error) return — confirm against the actual file.
func (r *MaasClusterReconciler) maybeShortCircuitDelete(ctx context.Context, remoteClient client.Client, namespace string, desiredCP int32, dsName string) (bool, error) {
shortCircuitNodes := &corev1.NodeList{}
shortCircuitSelector := labels.SelectorFromSet(labels.Set{
"node-role.kubernetes.io/control-plane": "",
})
if err := remoteClient.List(ctx, shortCircuitNodes, &client.ListOptions{LabelSelector: shortCircuitSelector}); err == nil && len(shortCircuitNodes.Items) > 0 {
// Count how many CP nodes are initialized
initCount := 0
for _, n := range shortCircuitNodes.Items {
if n.Labels != nil && n.Labels["lxdhost.cluster.com/initialized"] == "true" {
initCount++
}
if err := remoteClient.List(ctx, shortCircuitNodes, &client.ListOptions{LabelSelector: shortCircuitSelector}); err != nil || len(shortCircuitNodes.Items) == 0 {
return false, nil
}

initCount := 0
for _, n := range shortCircuitNodes.Items {
if n.Labels != nil && n.Labels["lxdhost.cluster.com/initialized"] == "true" {
initCount++
}
// Delete DS only when we see at least desiredCP control-plane nodes
// AND desiredCP of them are initialized
if int64(len(shortCircuitNodes.Items)) >= int64(desiredCP) && int64(initCount) >= int64(desiredCP) {
// Delete existing DSs and return
shortCircuitDSList := &appsv1.DaemonSetList{}
if err := remoteClient.List(ctx, shortCircuitDSList, client.InNamespace(dsNamespace), client.MatchingLabels{"app": dsName}); err == nil {
for _, ds := range shortCircuitDSList.Items {
_ = remoteClient.Delete(ctx, &ds)
}
}
if int64(len(shortCircuitNodes.Items)) >= int64(desiredCP) && int64(initCount) >= int64(desiredCP) {
shortCircuitDSList := &appsv1.DaemonSetList{}
if err := remoteClient.List(ctx, shortCircuitDSList, client.InNamespace(namespace), client.MatchingLabels{"app": dsName}); err == nil {
for _, ds := range shortCircuitDSList.Items {
_ = remoteClient.Delete(ctx, &ds)
}
return nil
}
return true, nil
}
return false, nil
}

// pull LXD config
// renderDaemonSetForCluster renders the DS YAML from template using cluster config and returns a DaemonSet object
func (r *MaasClusterReconciler) renderDaemonSetForCluster(clusterScope *scope.ClusterScope, dsName, namespace string) (*appsv1.DaemonSet, error) {
cluster := clusterScope.MaasCluster
cfg := clusterScope.GetLXDConfig()

sb := cfg.StorageBackend
if sb == "" {
sb = "zfs"
Expand Down Expand Up @@ -233,7 +324,7 @@ func (r *MaasClusterReconciler) ensureLXDInitializerDS(ctx context.Context, clus

ds := &appsv1.DaemonSet{}
if err := yaml.Unmarshal([]byte(dsYaml), ds); err != nil {
return err
return nil, err
}

// ensure names/labels are cluster-specific without touching the image name
Expand All @@ -244,44 +335,7 @@ func (r *MaasClusterReconciler) ensureLXDInitializerDS(ctx context.Context, clus
ds.Labels["app"] = dsName
ds.Spec.Selector.MatchLabels["app"] = dsName
ds.Spec.Template.Labels["app"] = dsName
ds.Namespace = dsNamespace

// Do not set owner refs across clusters; just create/patch on target cluster
_, err = controllerutil.CreateOrPatch(ctx, remoteClient, ds, func() error { return nil })
return err
}

// ensureLXDInitializerRBACOnTarget creates the RBAC resources for lxd-initializer on the target cluster
func (r *MaasClusterReconciler) ensureLXDInitializerRBACOnTarget(ctx context.Context, remoteClient client.Client, namespace string) error {
// Parse RBAC template into separate resources
rbacYaml := strings.ReplaceAll(lxdInitRBACTemplate, "namespace: default", fmt.Sprintf("namespace: %s", namespace))

// Split the YAML into separate documents
docs := strings.Split(rbacYaml, "---")

for _, doc := range docs {
doc = strings.TrimSpace(doc)
if doc == "" {
continue
}

// Parse as unstructured object to handle different resource types
obj := &unstructured.Unstructured{}
if err := yaml.Unmarshal([]byte(doc), obj); err != nil {
return fmt.Errorf("failed to unmarshal RBAC resource: %v", err)
}

// Set namespace for namespaced resources
if obj.GetKind() == "ServiceAccount" {
obj.SetNamespace(namespace)
}
ds.Namespace = namespace

// Create or update the resource on target cluster
_, err := controllerutil.CreateOrPatch(ctx, remoteClient, obj, func() error { return nil })
if err != nil {
return fmt.Errorf("failed to create/patch %s %s: %v", obj.GetKind(), obj.GetName(), err)
}
}

return nil
return ds, nil
}
Loading