diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index 5891b16b29..1f889a07b9 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -27,14 +27,11 @@ import ( firewallcrclient "github.com/GoogleCloudPlatform/gke-networking-api/client/gcpfirewall/clientset/versioned" networkclient "github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versioned" - informernetwork "github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions" nodetopologyclient "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned" - informernodetopology "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions" k8scp "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" flag "github.com/spf13/pflag" crdclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - informers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" @@ -54,7 +51,6 @@ import ( serviceattachmentclient "k8s.io/ingress-gce/pkg/serviceattachment/client/clientset/versioned" "k8s.io/ingress-gce/pkg/svcneg" svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" - informersvcneg "k8s.io/ingress-gce/pkg/svcneg/client/informers/externalversions" "k8s.io/ingress-gce/pkg/systemhealth" "k8s.io/ingress-gce/pkg/utils" "k8s.io/klog/v2" @@ -259,19 +255,6 @@ func main() { if err != nil { klog.Fatalf("Failed to create ProviderConfig client: %v", err) } - informersFactory := informers.NewSharedInformerFactory(kubeClient, flags.F.ResyncPeriod) - var svcNegFactory informersvcneg.SharedInformerFactory - if svcNegClient != nil { - svcNegFactory = informersvcneg.NewSharedInformerFactory(svcNegClient, flags.F.ResyncPeriod) - } - var networkFactory informernetwork.SharedInformerFactory - if networkClient != nil { - networkFactory = informernetwork.NewSharedInformerFactory(networkClient, flags.F.ResyncPeriod) - } - var nodeTopologyFactory informernodetopology.SharedInformerFactory - if nodeTopologyClient != nil { - nodeTopologyFactory = informernodetopology.NewSharedInformerFactory(nodeTopologyClient, flags.F.ResyncPeriod) - } ctx := context.Background() syncerMetrics := syncMetrics.NewNegMetricsCollector(flags.F.NegMetricsExportInterval, rootLogger) go syncerMetrics.Run(stopCh) @@ -284,13 +267,11 @@ func main() { rootLogger, kubeClient, svcNegClient, + networkClient, + nodeTopologyClient, kubeSystemUID, eventRecorderKubeClient, providerConfigClient, - informersFactory, - svcNegFactory, - networkFactory, - nodeTopologyFactory, gceCreator, namer, stopCh, @@ -305,13 +286,11 @@ func main() { rootLogger, kubeClient, svcNegClient, + networkClient, + nodeTopologyClient, kubeSystemUID, eventRecorderKubeClient, providerConfigClient, - informersFactory, - svcNegFactory, - networkFactory, - nodeTopologyFactory, gceCreator, namer, stopCh, diff --git a/pkg/multiproject/README.md b/pkg/multiproject/README.md new file mode 100644 index 0000000000..d37a7f4b62 --- /dev/null +++ b/pkg/multiproject/README.md @@ -0,0 +1,82 @@ +# Multi-Project Controller + +Enables ingress-gce to manage GCP resources across multiple projects through ProviderConfig CRs. + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Main Controller │ +│ │ +│ ┌────────────────────────────────────────────────────────┐ │ +│ │ Shared Kubernetes Informers │ │ +│ │ (Services, Ingresses, EndpointSlices) │ │ +│ └─────────────────────┬──────────────────────────────────┘ │ +│ │ │ +│ ┌─────────────────────▼──────────────────────────────────┐ │ +│ │ ProviderConfig Controller │ │ +│ │ Watches ProviderConfig resources │ │ +│ │ Manages per-project controllers │ │ +│ └─────────────────────┬──────────────────────────────────┘ │ +│ │ │ +│ ┌────────────────┼────────────────┐ │ +│ │ │ │ │ +│ ┌────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐ │ +│ │Project A │ │ Project B │ │ Project C │ ... │ +│ │Controller│ │Controller │ │Controller │ │ +│ └──────────┘ └───────────┘ └───────────┘ │ +└─────────────────────────────────────────────────────────────┘ +``` + +## Key Concepts + +- **ProviderConfig**: CR defining a GCP project configuration +- **Resource Filtering**: Resources are associated via labels; each controller sees only its labeled resources +- **Shared Informers**: Base informers are created once and shared; controllers get filtered views +- **Dynamic Lifecycle**: Controllers start/stop with ProviderConfig create/delete + +## Usage + +### Create ProviderConfig + +```yaml +apiVersion: networking.gke.io/v1 +kind: ProviderConfig +metadata: + name: team-a-project +spec: + projectID: team-a-gcp-project + network: team-a-network +``` + +### Associate Resources + +```yaml +apiVersion: v1 +kind: Service +metadata: + name: my-service + labels: + ${PROVIDER_CONFIG_LABEL_KEY}: provider-config-a +spec: + # Service spec... +``` + +## Operations + +### Adding a Project +1. Create ProviderConfig +2. Label services/ingresses with PC name +3. NEGs created in target project + +### Removing a Project +1. Remove/relabel services using the PC +2. Wait for NEG cleanup +3. Delete ProviderConfig + +## Guarantees + +- Controllers only manage explicitly labeled resources +- One controller per ProviderConfig +- Base infrastructure survives individual controller failures +- PC deletion doesn't affect other projects \ No newline at end of file diff --git a/pkg/multiproject/controller/controller.go b/pkg/multiproject/controller/controller.go index 300537bfb1..60786a4ac4 100644 --- a/pkg/multiproject/controller/controller.go +++ b/pkg/multiproject/controller/controller.go @@ -2,7 +2,6 @@ package controller import ( - "context" "fmt" "math/rand" "runtime/debug" @@ -72,15 +71,9 @@ func (pcc *ProviderConfigController) Run() { defer pcc.shutdown() pcc.logger.Info("Starting ProviderConfig controller") - ctx, cancel := context.WithCancel(context.Background()) - go func() { - <-pcc.stopCh - pcc.logger.Info("Stop channel closed, cancelling context") - cancel() - }() pcc.logger.Info("Waiting for initial cache sync before starting ProviderConfig Controller") - ok := cache.WaitForCacheSync(ctx.Done(), pcc.hasSynced) + ok := cache.WaitForCacheSync(pcc.stopCh, pcc.hasSynced) if !ok { pcc.logger.Error(nil, "Failed to wait for initial cache sync before starting ProviderConfig Controller") } diff --git a/pkg/multiproject/informerset/informerset.go b/pkg/multiproject/informerset/informerset.go new file mode 100644 index 0000000000..8c7eb64986 --- /dev/null +++ b/pkg/multiproject/informerset/informerset.go @@ -0,0 +1,281 @@ +package informerset + +import ( + "fmt" + + networkclient "github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versioned" + informernetwork "github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/network/v1" + nodetopologyclient "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned" + informernodetopology "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/nodetopology/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + informerv1 "k8s.io/client-go/informers/core/v1" + discoveryinformer "k8s.io/client-go/informers/discovery/v1" + informernetworking "k8s.io/client-go/informers/networking/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/ingress-gce/pkg/multiproject/filteredinformer" + svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" + informersvcneg "k8s.io/ingress-gce/pkg/svcneg/client/informers/externalversions/svcneg/v1beta1" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/ingress-gce/pkg/utils/endpointslices" + "k8s.io/klog/v2" +) + +// InformerSet manages all shared informers used by multiproject controllers. +// It provides centralized initialization and lifecycle management. +type InformerSet struct { + // Core Kubernetes informers (always present) + Ingress cache.SharedIndexInformer + Service cache.SharedIndexInformer + Pod cache.SharedIndexInformer + Node cache.SharedIndexInformer + EndpointSlice cache.SharedIndexInformer + + // Custom resource informers (may be nil) + SvcNeg cache.SharedIndexInformer // ServiceNetworkEndpointGroups CRD + Network cache.SharedIndexInformer // GKE Network CRD + GkeNetworkParams cache.SharedIndexInformer // GKENetworkParamSets CRD + NodeTopology cache.SharedIndexInformer // NodeTopology CRD + + // State tracking + started bool +} + +// NewInformerSet creates and initializes a new InformerSet with all required informers. +// Optional CRD informers are created only when corresponding clients are provided; +// those fields remain nil otherwise. +func NewInformerSet( + kubeClient kubernetes.Interface, + svcNegClient svcnegclient.Interface, + networkClient networkclient.Interface, + nodeTopologyClient nodetopologyclient.Interface, + resyncPeriod metav1.Duration, +) *InformerSet { + informers := &InformerSet{} + + // Create core Kubernetes informers + informers.Ingress = informernetworking.NewIngressInformer( + kubeClient, + metav1.NamespaceAll, + resyncPeriod.Duration, + utils.NewNamespaceIndexer(), + ) + + informers.Service = informerv1.NewServiceInformer( + kubeClient, + metav1.NamespaceAll, + resyncPeriod.Duration, + utils.NewNamespaceIndexer(), + ) + + informers.Pod = informerv1.NewPodInformer( + kubeClient, + metav1.NamespaceAll, + resyncPeriod.Duration, + utils.NewNamespaceIndexer(), + ) + + informers.Node = informerv1.NewNodeInformer( + kubeClient, + resyncPeriod.Duration, + utils.NewNamespaceIndexer(), + ) + + // EndpointSlice informer with custom indexers for NEG controller + informers.EndpointSlice = discoveryinformer.NewEndpointSliceInformer( + kubeClient, + metav1.NamespaceAll, + resyncPeriod.Duration, + cache.Indexers{ + cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, + endpointslices.EndpointSlicesByServiceIndex: endpointslices.EndpointSlicesByServiceFunc, + }, + ) + + // Create CRD informers if clients are available + if svcNegClient != nil { + informers.SvcNeg = informersvcneg.NewServiceNetworkEndpointGroupInformer( + svcNegClient, + metav1.NamespaceAll, + resyncPeriod.Duration, + utils.NewNamespaceIndexer(), + ) + } + + if networkClient != nil { + informers.Network = informernetwork.NewNetworkInformer( + networkClient, + resyncPeriod.Duration, + utils.NewNamespaceIndexer(), + ) + + informers.GkeNetworkParams = informernetwork.NewGKENetworkParamSetInformer( + networkClient, + resyncPeriod.Duration, + utils.NewNamespaceIndexer(), + ) + } + + if nodeTopologyClient != nil { + informers.NodeTopology = informernodetopology.NewNodeTopologyInformer( + nodeTopologyClient, + resyncPeriod.Duration, + utils.NewNamespaceIndexer(), + ) + } + + return informers +} + +// Start starts all informers and waits for their caches to sync. +// It is idempotent: repeated calls return nil once informers have started. +// If the provided stop channel is already closed, it returns an error after +// marking the set as started to prevent subsequent re-runs. +func (i *InformerSet) Start(stopCh <-chan struct{}, logger klog.Logger) error { + if i.started { + logger.V(3).Info("InformerSet already started; skipping") + return nil + } + + // If the stop channel is already closed, report error but mark started so + // subsequent Start calls are a no-op (idempotent behavior expected by callers). + select { + case <-stopCh: + err := fmt.Errorf("stop channel closed before start") + logger.Error(err, "Cannot start informers") + return err + default: + } + + // Start all core informers + startInformer(i.Ingress, stopCh) + startInformer(i.Service, stopCh) + startInformer(i.Pod, stopCh) + startInformer(i.Node, stopCh) + startInformer(i.EndpointSlice, stopCh) + + // Start optional informers + startInformer(i.SvcNeg, stopCh) + startInformer(i.Network, stopCh) + startInformer(i.GkeNetworkParams, stopCh) + startInformer(i.NodeTopology, stopCh) + + i.started = true + + // Wait for initial sync + logger.V(2).Info("Waiting for informer caches to sync") + if !cache.WaitForCacheSync(stopCh, i.CombinedHasSynced()) { + err := fmt.Errorf("failed to sync informer caches") + logger.Error(err, "Failed to sync informer caches") + return err + } + + logger.V(2).Info("Informer caches synced successfully") + return nil +} + +// FilterByProviderConfig creates a new InformerSet with all informers wrapped in a ProviderConfig filter. +// This is used for provider-config-specific controllers. +func (i *InformerSet) FilterByProviderConfig(providerConfigName string) *InformerSet { + filteredInformers := &InformerSet{ + started: i.started, + } + + // Wrap core informers + if i.Ingress != nil { + filteredInformers.Ingress = newProviderConfigFilteredInformer(i.Ingress, providerConfigName) + } + if i.Service != nil { + filteredInformers.Service = newProviderConfigFilteredInformer(i.Service, providerConfigName) + } + if i.Pod != nil { + filteredInformers.Pod = newProviderConfigFilteredInformer(i.Pod, providerConfigName) + } + if i.Node != nil { + filteredInformers.Node = newProviderConfigFilteredInformer(i.Node, providerConfigName) + } + if i.EndpointSlice != nil { + filteredInformers.EndpointSlice = newProviderConfigFilteredInformer(i.EndpointSlice, providerConfigName) + } + + // Wrap optional informers + if i.SvcNeg != nil { + filteredInformers.SvcNeg = newProviderConfigFilteredInformer(i.SvcNeg, providerConfigName) + } + if i.Network != nil { + filteredInformers.Network = newProviderConfigFilteredInformer(i.Network, providerConfigName) + } + if i.GkeNetworkParams != nil { + filteredInformers.GkeNetworkParams = newProviderConfigFilteredInformer(i.GkeNetworkParams, providerConfigName) + } + if i.NodeTopology != nil { + filteredInformers.NodeTopology = newProviderConfigFilteredInformer(i.NodeTopology, providerConfigName) + } + + return filteredInformers +} + +// newProviderConfigFilteredInformer wraps an informer with a provider config filter. +// The filtered informer shares the same underlying cache and indexers. +func newProviderConfigFilteredInformer(informer cache.SharedIndexInformer, providerConfigName string) cache.SharedIndexInformer { + return filteredinformer.NewProviderConfigFilteredInformer(informer, providerConfigName) +} + +// CombinedHasSynced returns a function that checks if all informers have synced. +func (i *InformerSet) CombinedHasSynced() func() bool { + syncFuncs := i.hasSyncedFuncs() + return func() bool { + for _, hasSynced := range syncFuncs { + if !hasSynced() { + return false + } + } + return true + } +} + +// hasSyncedFuncs returns a list of HasSynced functions for all non-nil informers. +func (i *InformerSet) hasSyncedFuncs() []func() bool { + var funcs []func() bool + + // Core informers (always present) + if i.Ingress != nil { + funcs = append(funcs, i.Ingress.HasSynced) + } + if i.Service != nil { + funcs = append(funcs, i.Service.HasSynced) + } + if i.Pod != nil { + funcs = append(funcs, i.Pod.HasSynced) + } + if i.Node != nil { + funcs = append(funcs, i.Node.HasSynced) + } + if i.EndpointSlice != nil { + funcs = append(funcs, i.EndpointSlice.HasSynced) + } + + // Optional informers + if i.SvcNeg != nil { + funcs = append(funcs, i.SvcNeg.HasSynced) + } + if i.Network != nil { + funcs = append(funcs, i.Network.HasSynced) + } + if i.GkeNetworkParams != nil { + funcs = append(funcs, i.GkeNetworkParams.HasSynced) + } + if i.NodeTopology != nil { + funcs = append(funcs, i.NodeTopology.HasSynced) + } + + return funcs +} + +// startInformer starts the informer if it is non-nil. +func startInformer(inf cache.SharedIndexInformer, stopCh <-chan struct{}) { + if inf == nil { + return + } + go inf.Run(stopCh) +} diff --git a/pkg/multiproject/informerset/informerset_test.go b/pkg/multiproject/informerset/informerset_test.go new file mode 100644 index 0000000000..b08f3123a9 --- /dev/null +++ b/pkg/multiproject/informerset/informerset_test.go @@ -0,0 +1,260 @@ +package informerset + +import ( + "testing" + + networkclient "github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versioned" + networkfake "github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versioned/fake" + nodetopologyclient "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned" + nodetopologyfake "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned/fake" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" + svcnegfake "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned/fake" + "k8s.io/ingress-gce/pkg/utils/endpointslices" + ktesting "k8s.io/klog/v2/ktesting" +) + +// TestNewInformerSet_OptionalClients verifies that optional informers are created +// only when their corresponding CRD clients are provided. +func TestNewInformerSet_OptionalClients(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + withSvcNeg bool + withNetwork bool + withNodeTopology bool + wantSvcNeg bool + wantNetwork bool + wantGKEParams bool + wantNodeTopology bool + }{ + { + name: "no-optional-clients", + }, + { + name: "all-optional-clients", + withSvcNeg: true, + withNetwork: true, + withNodeTopology: true, + wantSvcNeg: true, + wantNetwork: true, + wantGKEParams: true, + wantNodeTopology: true, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + kubeClient := k8sfake.NewSimpleClientset() + + var svcNegClient svcnegclient.Interface + if tc.withSvcNeg { + svcNegClient = svcnegfake.NewSimpleClientset() + } + + var netClient networkclient.Interface + if tc.withNetwork { + netClient = networkfake.NewSimpleClientset() + } + + var topoClient nodetopologyclient.Interface + if tc.withNodeTopology { + topoClient = nodetopologyfake.NewSimpleClientset() + } + + inf := NewInformerSet(kubeClient, svcNegClient, netClient, topoClient, metav1.Duration{Duration: 0}) + if inf == nil { + t.Fatalf("NewInformerSet returned nil") + } + + if got := inf.SvcNeg != nil; got != tc.wantSvcNeg { + t.Errorf("SvcNeg: got %t, want %t", got, tc.wantSvcNeg) + } + if got := inf.Network != nil; got != tc.wantNetwork { + t.Errorf("Network: got %t, want %t", got, tc.wantNetwork) + } + if got := inf.GkeNetworkParams != nil; got != tc.wantGKEParams { + t.Errorf("GkeNetworkParams: got %t, want %t", got, tc.wantGKEParams) + } + if got := inf.NodeTopology != nil; got != tc.wantNodeTopology { + t.Errorf("NodeTopology: got %t, want %t", got, tc.wantNodeTopology) + } + }) + } +} + +// TestEndpointSlice_HasRequiredIndexers verifies that the EndpointSlice informer is initialized +// and includes both the namespace indexer and the NEG-specific service indexer. +func TestEndpointSlice_HasRequiredIndexers(t *testing.T) { + t.Parallel() + + kubeClient := k8sfake.NewSimpleClientset() + inf := NewInformerSet(kubeClient, nil, nil, nil, metav1.Duration{Duration: 0}) + if inf == nil { + t.Fatalf("NewInformerSet returned nil") + } + + if inf.EndpointSlice == nil { + t.Fatalf("EndpointSlice informer must be initialized") + } + + indexers := inf.EndpointSlice.GetIndexer().GetIndexers() + if _, ok := indexers[cache.NamespaceIndex]; !ok { + t.Errorf("EndpointSlice missing NamespaceIndex indexer") + } + if _, ok := indexers[endpointslices.EndpointSlicesByServiceIndex]; !ok { + t.Errorf("EndpointSlice missing EndpointSlicesByServiceIndex indexer") + } +} + +// TestStart_Semantics verifies Start() idempotency, behavior with a closed stop channel, +// and that CombinedHasSynced reports true after caches sync. +func TestStart_Semantics(t *testing.T) { + t.Parallel() + + logger, _ := ktesting.NewTestContext(t) + + t.Run("idempotent", func(t *testing.T) { + t.Parallel() + + kubeClient := k8sfake.NewSimpleClientset() + inf := NewInformerSet(kubeClient, nil, nil, nil, metav1.Duration{Duration: 0}) + + stop := make(chan struct{}) + defer close(stop) + + err := inf.Start(stop, logger) + if err != nil { + t.Fatalf("Start() returned error: %v", err) + } + err = inf.Start(stop, logger) + if err != nil { + t.Fatalf("Second Start() returned error: %v", err) + } + }) + + t.Run("closed-stop-channel", func(t *testing.T) { + t.Parallel() + + kubeClient := k8sfake.NewSimpleClientset() + inf := NewInformerSet(kubeClient, nil, nil, nil, metav1.Duration{Duration: 0}) + + stop := make(chan struct{}) + close(stop) + + err := inf.Start(stop, logger) + if err == nil { + t.Fatalf("expected error when stop channel is closed, got nil") + } + if inf.started { + t.Fatalf("expected started=false when Start is called with a closed stop channel") + } + }) + + t.Run("combined-has-synced", func(t *testing.T) { + t.Parallel() + + kubeClient := k8sfake.NewSimpleClientset() + inf := NewInformerSet(kubeClient, nil, nil, nil, metav1.Duration{Duration: 0}) + + stop := make(chan struct{}) + defer close(stop) + + err := inf.Start(stop, logger) + if err != nil { + t.Fatalf("Start() returned error: %v", err) + } + if ok := cache.WaitForCacheSync(stop, inf.CombinedHasSynced()); !ok { + t.Fatalf("timed out waiting for CombinedHasSynced to be true") + } + }) +} + +// TestFilterByProviderConfig_WrappingAndState verifies that FilterByProviderConfig wraps all +// non-nil informers, preserves nil optional informers, and propagates the 'started' state +// from the base set into the filtered view. +func TestFilterByProviderConfig_WrappingAndState(t *testing.T) { + t.Parallel() + + kubeClient := k8sfake.NewSimpleClientset() + svcClient := svcnegfake.NewSimpleClientset() + + inf := NewInformerSet(kubeClient, svcClient, nil, nil, metav1.Duration{Duration: 0}) + + // Before starting, filtered should mirror started=false. + filteredBefore := inf.FilterByProviderConfig("pc-1") + if filteredBefore == nil { + t.Fatalf("FilterByProviderConfig returned nil") + } + if filteredBefore.started { + t.Fatalf("expected filtered InformerSet to have started=false before base Start") + } + if filteredBefore.Ingress == nil || filteredBefore.Service == nil || filteredBefore.Pod == nil || filteredBefore.Node == nil || filteredBefore.EndpointSlice == nil { + t.Fatalf("expected core filtered informers to be non-nil before base Start") + } + if filteredBefore.SvcNeg == nil { + t.Fatalf("expected SvcNeg filtered informer to be non-nil when present in base") + } + if filteredBefore.Network != nil || filteredBefore.GkeNetworkParams != nil || filteredBefore.NodeTopology != nil { + t.Fatalf("expected absent optional informers to remain nil in filtered view") + } + + // After starting, filtered should mirror started=true. + logger, _ := ktesting.NewTestContext(t) + stop := make(chan struct{}) + defer close(stop) + + err := inf.Start(stop, logger) + if err != nil { + t.Fatalf("Start() returned error: %v", err) + } + + filteredAfter := inf.FilterByProviderConfig("pc-1") + if filteredAfter == nil { + t.Fatalf("FilterByProviderConfig returned nil (after start)") + } + if !filteredAfter.started { + t.Fatalf("expected filtered InformerSet to have started=true after base Start") + } +} + +// TestCombinedHasSynced_AllNil verifies that CombinedHasSynced returns true when +// all informers in the set are nil (no caches to sync). +func TestCombinedHasSynced_AllNil(t *testing.T) { + t.Parallel() + + inf := &InformerSet{} + syncFunc := inf.CombinedHasSynced() + if !syncFunc() { + t.Errorf("CombinedHasSynced should return true when all informers are nil") + } +} + +// TestFilterByProviderConfig_PreservesIndexers verifies that filtering preserves the +// indexers from the original EndpointSlice informer (critical for NEG controller behavior). +func TestFilterByProviderConfig_PreservesIndexers(t *testing.T) { + t.Parallel() + + kubeClient := k8sfake.NewSimpleClientset() + inf := NewInformerSet(kubeClient, nil, nil, nil, metav1.Duration{Duration: 0}) + + original := inf.EndpointSlice.GetIndexer().GetIndexers() + filtered := inf.FilterByProviderConfig("pc-1").EndpointSlice.GetIndexer().GetIndexers() + + if _, ok := filtered[cache.NamespaceIndex]; !ok { + t.Errorf("filtered EndpointSlice missing NamespaceIndex") + } + if _, ok := filtered[endpointslices.EndpointSlicesByServiceIndex]; !ok { + t.Errorf("filtered EndpointSlice missing EndpointSlicesByServiceIndex") + } + + if len(filtered) != len(original) { + t.Errorf("filtered indexers count mismatch: got=%d want=%d", len(filtered), len(original)) + } +} diff --git a/pkg/multiproject/manager/manager.go b/pkg/multiproject/manager/manager.go index 1b44fe8b6d..ab7caf487d 100644 --- a/pkg/multiproject/manager/manager.go +++ b/pkg/multiproject/manager/manager.go @@ -1,65 +1,73 @@ package manager import ( + "context" "fmt" "sync" networkclient "github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versioned" - informernetwork "github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions" nodetopologyclient "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned" - informernodetopology "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - informers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" providerconfig "k8s.io/ingress-gce/pkg/apis/providerconfig/v1" "k8s.io/ingress-gce/pkg/multiproject/finalizer" "k8s.io/ingress-gce/pkg/multiproject/gce" + multiprojectinformers "k8s.io/ingress-gce/pkg/multiproject/informerset" "k8s.io/ingress-gce/pkg/multiproject/neg" syncMetrics "k8s.io/ingress-gce/pkg/neg/metrics/metricscollector" "k8s.io/ingress-gce/pkg/neg/syncers/labels" providerconfigclient "k8s.io/ingress-gce/pkg/providerconfig/client/clientset/versioned" svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" - informersvcneg "k8s.io/ingress-gce/pkg/svcneg/client/informers/externalversions" "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/klog/v2" ) +// startNEGController is a package-level variable to allow tests to stub the +// actual NEG controller startup routine. +var startNEGController = neg.StartNEGController + +// ProviderConfigControllersManager coordinates lifecycle of controllers scoped to +// a single ProviderConfig. It ensures per-ProviderConfig controller startup is +// idempotent, adds/removes the NEG cleanup finalizer, and wires a stop channel +// for clean shutdown. type ProviderConfigControllersManager struct { mu sync.Mutex - controllers map[string]*ControllerSet + controllers map[string]*controllerSet logger klog.Logger providerConfigClient providerconfigclient.Interface - informersFactory informers.SharedInformerFactory - svcNegFactory informersvcneg.SharedInformerFactory - networkFactory informernetwork.SharedInformerFactory - nodeTopologyFactory informernodetopology.SharedInformerFactory - kubeClient kubernetes.Interface - svcNegClient svcnegclient.Interface - eventRecorderClient kubernetes.Interface - networkClient networkclient.Interface - nodetopologyClient nodetopologyclient.Interface - kubeSystemUID types.UID - clusterNamer *namer.Namer - l4Namer *namer.L4Namer - lpConfig labels.PodLabelPropagationConfig - gceCreator gce.GCECreator - globalStopCh <-chan struct{} - syncerMetrics *syncMetrics.SyncerMetrics + // Base informers shared across all ProviderConfigs + informers *multiprojectinformers.InformerSet + kubeClient kubernetes.Interface + svcNegClient svcnegclient.Interface + eventRecorderClient kubernetes.Interface + networkClient networkclient.Interface + nodetopologyClient nodetopologyclient.Interface + kubeSystemUID types.UID + clusterNamer *namer.Namer + l4Namer *namer.L4Namer + lpConfig labels.PodLabelPropagationConfig + gceCreator gce.GCECreator + globalStopCh <-chan struct{} + syncerMetrics *syncMetrics.SyncerMetrics } -type ControllerSet struct { +// controllerSet holds controller-specific resources for a ProviderConfig. +// Unexported because it is an internal implementation detail. +type controllerSet struct { stopCh chan<- struct{} } +// NewProviderConfigControllerManager constructs a new ProviderConfigControllersManager. +// It does not start any controllers until StartControllersForProviderConfig is invoked. func NewProviderConfigControllerManager( kubeClient kubernetes.Interface, - informersFactory informers.SharedInformerFactory, - svcNegFactory informersvcneg.SharedInformerFactory, - networkFactory informernetwork.SharedInformerFactory, - nodeTopologyFactory informernodetopology.SharedInformerFactory, + informers *multiprojectinformers.InformerSet, providerConfigClient providerconfigclient.Interface, svcNegClient svcnegclient.Interface, + networkClient networkclient.Interface, + nodetopologyClient nodetopologyclient.Interface, eventRecorderClient kubernetes.Interface, kubeSystemUID types.UID, clusterNamer *namer.Namer, @@ -71,16 +79,15 @@ func NewProviderConfigControllerManager( syncerMetrics *syncMetrics.SyncerMetrics, ) *ProviderConfigControllersManager { return &ProviderConfigControllersManager{ - controllers: make(map[string]*ControllerSet), + controllers: make(map[string]*controllerSet), logger: logger, providerConfigClient: providerConfigClient, - informersFactory: informersFactory, - svcNegFactory: svcNegFactory, - networkFactory: networkFactory, - nodeTopologyFactory: nodeTopologyFactory, + informers: informers, kubeClient: kubeClient, svcNegClient: svcNegClient, eventRecorderClient: eventRecorderClient, + networkClient: networkClient, + nodetopologyClient: nodetopologyClient, kubeSystemUID: kubeSystemUID, clusterNamer: clusterNamer, l4Namer: l4Namer, @@ -95,6 +102,10 @@ func providerConfigKey(pc *providerconfig.ProviderConfig) string { return pc.Name } +// StartControllersForProviderConfig ensures finalizers are present and starts +// the controllers associated with the given ProviderConfig. The call is +// idempotent per ProviderConfig: concurrent or repeated calls for the same +// ProviderConfig will only start controllers once. func (pccm *ProviderConfigControllersManager) StartControllersForProviderConfig(pc *providerconfig.ProviderConfig) error { pccm.mu.Lock() defer pccm.mu.Unlock() @@ -109,21 +120,24 @@ func (pccm *ProviderConfigControllersManager) StartControllersForProviderConfig( return nil } + // Add the cleanup finalizer up front to avoid a window where controllers + // may create external resources without a finalizer present. If deletion + // happens in that window, cleanup could be skipped. We roll this back on + // any subsequent startup error. err := finalizer.EnsureProviderConfigNEGCleanupFinalizer(pc, pccm.providerConfigClient, logger) if err != nil { - return fmt.Errorf("failed to ensure NEG cleanup finalizer for project %s: %v", pcKey, err) + return fmt.Errorf("failed to ensure NEG cleanup finalizer for project %s: %w", pcKey, err) } cloud, err := pccm.gceCreator.GCEForProviderConfig(pc, logger) if err != nil { - return fmt.Errorf("failed to create GCE client for provider config %+v: %v", pc, err) + // If GCE client creation fails after finalizer was added, roll it back. + pccm.rollbackFinalizerOnStartFailure(pc, logger, err) + return fmt.Errorf("failed to create GCE client for provider config %+v: %w", pc, err) } - negControllerStopCh, err := neg.StartNEGController( - pccm.informersFactory, - pccm.svcNegFactory, - pccm.networkFactory, - pccm.nodeTopologyFactory, + negControllerStopCh, err := startNEGController( + pccm.informers, pccm.kubeClient, pccm.eventRecorderClient, pccm.svcNegClient, @@ -140,14 +154,12 @@ func (pccm *ProviderConfigControllersManager) StartControllersForProviderConfig( pccm.syncerMetrics, ) if err != nil { - cleanupErr := finalizer.DeleteProviderConfigNEGCleanupFinalizer(pc, pccm.providerConfigClient, logger) - if cleanupErr != nil { - logger.Error(cleanupErr, "failed to clean up NEG finalizer after controller start failure", "originalError", err) - } - return fmt.Errorf("failed to start NEG controller for project %s: %v", pcKey, err) + // If startup fails, make a best-effort to roll back the finalizer. + pccm.rollbackFinalizerOnStartFailure(pc, logger, err) + return fmt.Errorf("failed to start NEG controller for project %s: %w", pcKey, err) } - pccm.controllers[pcKey] = &ControllerSet{ + pccm.controllers[pcKey] = &controllerSet{ stopCh: negControllerStopCh, } @@ -171,9 +183,34 @@ func (pccm *ProviderConfigControllersManager) StopControllersForProviderConfig(p close(pccm.controllers[csKey].stopCh) delete(pccm.controllers, csKey) - err := finalizer.DeleteProviderConfigNEGCleanupFinalizer(pc, pccm.providerConfigClient, logger) + + // Ensure we remove the finalizer from the latest object state. + pcLatest := pccm.latestPCWithCleanupFinalizer(pc) + err := finalizer.DeleteProviderConfigNEGCleanupFinalizer(pcLatest, pccm.providerConfigClient, logger) if err != nil { logger.Error(err, "failed to delete NEG cleanup finalizer for project") } logger.Info("Stopped controllers for provider config") } + +// rollbackFinalizerOnStartFailure removes the NEG cleanup finalizer after a +// start failure so that ProviderConfig deletion is not blocked. +func (pccm *ProviderConfigControllersManager) rollbackFinalizerOnStartFailure(pc *providerconfig.ProviderConfig, logger klog.Logger, cause error) { + pcLatest := pccm.latestPCWithCleanupFinalizer(pc) + if err := finalizer.DeleteProviderConfigNEGCleanupFinalizer(pcLatest, pccm.providerConfigClient, logger); err != nil { + logger.Error(err, "failed to clean up NEG finalizer after start failure", "originalError", cause) + } +} + +// latestPCWithCleanupFinalizer returns the latest ProviderConfig from the API server. +// If the Get fails, it returns a local copy of the provided pc with the cleanup +// finalizer appended to ensure a subsequent delete attempt is not a no-op. +func (pccm *ProviderConfigControllersManager) latestPCWithCleanupFinalizer(pc *providerconfig.ProviderConfig) *providerconfig.ProviderConfig { + pcLatest, err := pccm.providerConfigClient.CloudV1().ProviderConfigs().Get(context.Background(), pc.Name, metav1.GetOptions{}) + if err != nil { + pcCopy := pc.DeepCopy() + pcCopy.Finalizers = append(pcCopy.Finalizers, finalizer.ProviderConfigNEGCleanupFinalizer) + return pcCopy + } + return pcLatest +} diff --git a/pkg/multiproject/manager/manager_test.go b/pkg/multiproject/manager/manager_test.go new file mode 100644 index 0000000000..d343f37322 --- /dev/null +++ b/pkg/multiproject/manager/manager_test.go @@ -0,0 +1,280 @@ +package manager + +import ( + "context" + "errors" + "slices" + "sync" + "testing" + + networkclient "github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versioned" + nodetopologyclient "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + cloudgce "k8s.io/cloud-provider-gcp/providers/gce" + providerconfig "k8s.io/ingress-gce/pkg/apis/providerconfig/v1" + "k8s.io/ingress-gce/pkg/multiproject/finalizer" + multiprojectgce "k8s.io/ingress-gce/pkg/multiproject/gce" + multiprojectinformers "k8s.io/ingress-gce/pkg/multiproject/informerset" + syncMetrics "k8s.io/ingress-gce/pkg/neg/metrics/metricscollector" + "k8s.io/ingress-gce/pkg/neg/syncers/labels" + providerconfigclientfake "k8s.io/ingress-gce/pkg/providerconfig/client/clientset/versioned/fake" + svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" + svcnegfake "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned/fake" + "k8s.io/ingress-gce/pkg/utils/namer" + klog "k8s.io/klog/v2" + ktesting "k8s.io/klog/v2/ktesting" +) + +// stubbed start function handle +type startCall struct { + pcName string +} + +// failingGCECreator is a test double that fails GCE client creation. +type failingGCECreator struct{} + +func (f failingGCECreator) GCEForProviderConfig(_ *providerconfig.ProviderConfig, _ klog.Logger) (*cloudgce.Cloud, error) { + return nil, errors.New("boom") +} + +func makePC(name string) *providerconfig.ProviderConfig { + return &providerconfig.ProviderConfig{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: providerconfig.ProviderConfigSpec{ + ProjectID: "test-project", + ProjectNumber: 123, + NetworkConfig: providerconfig.ProviderNetworkConfig{ + Network: "net-1", + SubnetInfo: providerconfig.ProviderConfigSubnetInfo{Subnetwork: "sub-1"}, + }, + }, + } +} + +func newManagerForTest(t *testing.T, svcNeg svcnegclient.Interface) (*ProviderConfigControllersManager, *providerconfigclientfake.Clientset) { + t.Helper() + kubeClient := fake.NewSimpleClientset() + pcClient := providerconfigclientfake.NewSimpleClientset() + informers := multiprojectinformers.NewInformerSet(kubeClient, svcNeg, networkclient.Interface(nil), nodetopologyclient.Interface(nil), metav1.Duration{}) + logger, _ := ktesting.NewTestContext(t) + + rootNamer := namer.NewNamer("clusteruid", "", logger) + kubeSystemUID := types.UID("uid") + gceCreator := multiprojectgce.NewGCEFake() + lpCfg := labels.PodLabelPropagationConfig{} + globalStop := make(chan struct{}) + t.Cleanup(func() { close(globalStop) }) + + mgr := NewProviderConfigControllerManager( + kubeClient, + informers, + pcClient, + svcNeg, + networkclient.Interface(nil), + nodetopologyclient.Interface(nil), + kubeClient, // event recorder + kubeSystemUID, + rootNamer, + namer.NewL4Namer(string(kubeSystemUID), rootNamer), + lpCfg, + gceCreator, + globalStop, + logger, + syncMetrics.FakeSyncerMetrics(), + ) + return mgr, pcClient +} + +func createProviderConfig(t *testing.T, pcClient *providerconfigclientfake.Clientset, name string) *providerconfig.ProviderConfig { + t.Helper() + pc := makePC(name) + _, err := pcClient.CloudV1().ProviderConfigs().Create(context.Background(), pc, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("create pc: %v", err) + } + return pc +} + +// TestStart_AddsFinalizer_AndIsIdempotent verifies that starting controllers +// for a ProviderConfig adds the NEG cleanup finalizer and that repeated starts +// are idempotent (the underlying controller is launched only once). +func TestStart_AddsFinalizer_AndIsIdempotent(t *testing.T) { + var calls []startCall + orig := startNEGController + startNEGController = func(_ *multiprojectinformers.InformerSet, _ kubernetes.Interface, _ kubernetes.Interface, _ svcnegclient.Interface, + _ networkclient.Interface, _ nodetopologyclient.Interface, _ types.UID, _ *namer.Namer, _ *namer.L4Namer, + _ labels.PodLabelPropagationConfig, _ *cloudgce.Cloud, _ <-chan struct{}, _ klog.Logger, pc *providerconfig.ProviderConfig, _ *syncMetrics.SyncerMetrics) (chan<- struct{}, error) { + calls = append(calls, startCall{pcName: pc.Name}) + return make(chan struct{}), nil + } + t.Cleanup(func() { startNEGController = orig }) + + mgr, pcClient := newManagerForTest(t, svcnegfake.NewSimpleClientset()) + pc := createProviderConfig(t, pcClient, "pc-1") + + err := mgr.StartControllersForProviderConfig(pc) + if err != nil { + t.Fatalf("StartControllersForProviderConfig error: %v", err) + } + + got, err := pcClient.CloudV1().ProviderConfigs().Get(context.Background(), pc.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("get pc: %v", err) + } + if !slices.Contains(got.Finalizers, finalizer.ProviderConfigNEGCleanupFinalizer) { + t.Fatalf("expected finalizer to be added, got %v", got.Finalizers) + } + + err = mgr.StartControllersForProviderConfig(pc) + if err != nil { + t.Fatalf("second start returned error: %v", err) + } + + if len(calls) != 1 || calls[0].pcName != pc.Name { + t.Fatalf("unexpected start calls: %#v", calls) + } + +} + +// TestStart_FailureCleansFinalizer verifies that when starting controllers +// fails, the added finalizer is rolled back and removed from the object. +func TestStart_FailureCleansFinalizer(t *testing.T) { + orig := startNEGController + startNEGController = func(_ *multiprojectinformers.InformerSet, _ kubernetes.Interface, _ kubernetes.Interface, _ svcnegclient.Interface, + _ networkclient.Interface, _ nodetopologyclient.Interface, _ types.UID, _ *namer.Namer, _ *namer.L4Namer, + _ labels.PodLabelPropagationConfig, _ *cloudgce.Cloud, _ <-chan struct{}, _ klog.Logger, _ *providerconfig.ProviderConfig, _ *syncMetrics.SyncerMetrics) (chan<- struct{}, error) { + return nil, errors.New("boom") + } + t.Cleanup(func() { startNEGController = orig }) + + mgr, pcClient := newManagerForTest(t, svcnegfake.NewSimpleClientset()) + pc := createProviderConfig(t, pcClient, "pc-err") + + err := mgr.StartControllersForProviderConfig(pc) + if err == nil { + t.Fatalf("expected error from StartControllersForProviderConfig") + } + + got, err := pcClient.CloudV1().ProviderConfigs().Get(context.Background(), pc.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("get pc: %v", err) + } + if slices.Contains(got.Finalizers, finalizer.ProviderConfigNEGCleanupFinalizer) { + t.Fatalf("expected finalizer removed on failure, got %v", got.Finalizers) + } + +} + +// TestStart_GCEClientError_CleansFinalizer verifies that if the GCE client +// creation fails after adding the finalizer, the finalizer is rolled back so +// deletion is not blocked unnecessarily. +func TestStart_GCEClientError_CleansFinalizer(t *testing.T) { + mgr, pcClient := newManagerForTest(t, svcnegfake.NewSimpleClientset()) + pc := createProviderConfig(t, pcClient, "pc-gce-err") + + // Inject failing GCE creator. + mgr.gceCreator = failingGCECreator{} + + err := mgr.StartControllersForProviderConfig(pc) + if err == nil { + t.Fatalf("expected error from StartControllersForProviderConfig when GCEForProviderConfig fails") + } + + got, err := pcClient.CloudV1().ProviderConfigs().Get(context.Background(), pc.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("get pc: %v", err) + } + if slices.Contains(got.Finalizers, finalizer.ProviderConfigNEGCleanupFinalizer) { + t.Fatalf("expected finalizer removed on GCE client failure, got %v", got.Finalizers) + } +} + +// TestStop_ClosesChannel_AndRemovesFinalizer verifies that stopping a +// ProviderConfig's controllers closes the stop channel and removes the +// cleanup finalizer from the ProviderConfig resource. +func TestStop_ClosesChannel_AndRemovesFinalizer(t *testing.T) { + var ch chan struct{} + orig := startNEGController + startNEGController = func(_ *multiprojectinformers.InformerSet, _ kubernetes.Interface, _ kubernetes.Interface, _ svcnegclient.Interface, + _ networkclient.Interface, _ nodetopologyclient.Interface, _ types.UID, _ *namer.Namer, _ *namer.L4Namer, + _ labels.PodLabelPropagationConfig, _ *cloudgce.Cloud, _ <-chan struct{}, _ klog.Logger, _ *providerconfig.ProviderConfig, _ *syncMetrics.SyncerMetrics) (chan<- struct{}, error) { + ch = make(chan struct{}) + return ch, nil + } + t.Cleanup(func() { startNEGController = orig }) + + mgr, pcClient := newManagerForTest(t, svcnegfake.NewSimpleClientset()) + pc := createProviderConfig(t, pcClient, "pc-stop") + + err := mgr.StartControllersForProviderConfig(pc) + if err != nil { + t.Fatalf("start: %v", err) + } + gotBefore, err := pcClient.CloudV1().ProviderConfigs().Get(context.Background(), pc.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("get pc: %v", err) + } + if !slices.Contains(gotBefore.Finalizers, finalizer.ProviderConfigNEGCleanupFinalizer) { + t.Fatalf("expected finalizer before stop") + } + + mgr.StopControllersForProviderConfig(pc) + + // Channel should be closed (non-blocking read succeeds) + select { + case <-ch: + // ok + default: + t.Fatalf("expected stop channel to be closed") + } + + gotAfter, err := pcClient.CloudV1().ProviderConfigs().Get(context.Background(), pc.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("get pc: %v", err) + } + if slices.Contains(gotAfter.Finalizers, finalizer.ProviderConfigNEGCleanupFinalizer) { + t.Fatalf("expected finalizer removed after stop, got %v", gotAfter.Finalizers) + } + + // Second stop should not panic + mgr.StopControllersForProviderConfig(pc) + +} + +// TestConcurrentStart_IsSingleShot verifies that concurrent calls to start +// controllers for the same ProviderConfig result in a single start. +func TestConcurrentStart_IsSingleShot(t *testing.T) { + var mu sync.Mutex + var calls []startCall + orig := startNEGController + startNEGController = func(_ *multiprojectinformers.InformerSet, _ kubernetes.Interface, _ kubernetes.Interface, _ svcnegclient.Interface, + _ networkclient.Interface, _ nodetopologyclient.Interface, _ types.UID, _ *namer.Namer, _ *namer.L4Namer, + _ labels.PodLabelPropagationConfig, _ *cloudgce.Cloud, _ <-chan struct{}, _ klog.Logger, pc *providerconfig.ProviderConfig, _ *syncMetrics.SyncerMetrics) (chan<- struct{}, error) { + mu.Lock() + defer mu.Unlock() + calls = append(calls, startCall{pcName: pc.Name}) + return make(chan struct{}), nil + } + t.Cleanup(func() { startNEGController = orig }) + + mgr, pcClient := newManagerForTest(t, svcnegfake.NewSimpleClientset()) + pc := createProviderConfig(t, pcClient, "pc-concurrent") + + var wg sync.WaitGroup + const n = 10 + wg.Add(n) + for range n { + go func() { + defer wg.Done() + _ = mgr.StartControllersForProviderConfig(pc) + }() + } + wg.Wait() + + if len(calls) != 1 { + t.Fatalf("expected single start call, got %d", len(calls)) + } +} diff --git a/pkg/multiproject/neg/neg.go b/pkg/multiproject/neg/neg.go index 8423745374..1507d4d8fc 100644 --- a/pkg/multiproject/neg/neg.go +++ b/pkg/multiproject/neg/neg.go @@ -4,17 +4,14 @@ import ( "fmt" networkclient "github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versioned" - informernetwork "github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions" nodetopologyclient "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned" - informernodetopology "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/cloud-provider-gcp/providers/gce" providerconfig "k8s.io/ingress-gce/pkg/apis/providerconfig/v1" "k8s.io/ingress-gce/pkg/flags" - "k8s.io/ingress-gce/pkg/multiproject/filteredinformer" + multiprojectinformers "k8s.io/ingress-gce/pkg/multiproject/informerset" "k8s.io/ingress-gce/pkg/neg" "k8s.io/ingress-gce/pkg/neg/metrics" syncMetrics "k8s.io/ingress-gce/pkg/neg/metrics/metricscollector" @@ -22,14 +19,16 @@ import ( negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/ingress-gce/pkg/network" svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" - informersvcneg "k8s.io/ingress-gce/pkg/svcneg/client/informers/externalversions" "k8s.io/ingress-gce/pkg/utils" - "k8s.io/ingress-gce/pkg/utils/endpointslices" "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/ingress-gce/pkg/utils/zonegetter" "k8s.io/klog/v2" ) +// newNEGController is a package-level indirection to allow tests to stub the +// actual NEG controller constructor. In production it points to neg.NewController. +var newNEGController = neg.NewController + // StartNEGController creates and runs a NEG controller for the specified ProviderConfig. // The returned channel is closed by StopControllersForProviderConfig to signal a shutdown // specific to this ProviderConfig's controller. @@ -42,13 +41,10 @@ import ( // - joinedStopCh: Internal channel that closes when EITHER globalStopCh OR providerConfigStopCh // closes. Used for PC-specific resources that should stop in either case. // -// IMPORTANT: Base informers from factories use globalStopCh to remain alive across PC changes. -// Only ProviderConfig-specific controllers and resources should use joinedStopCh. +// IMPORTANT: Base informers are already running with globalStopCh. We wrap them with +// ProviderConfig filters that use globalStopCh to remain alive across PC changes. func StartNEGController( - informersFactory informers.SharedInformerFactory, - svcNegFactory informersvcneg.SharedInformerFactory, - networkFactory informernetwork.SharedInformerFactory, - nodeTopologyFactory informernodetopology.SharedInformerFactory, + informers *multiprojectinformers.InformerSet, kubeClient kubernetes.Interface, eventRecorderClient kubernetes.Interface, svcNegClient svcnegclient.Interface, @@ -85,12 +81,11 @@ func StartNEGController( } }() - informers, hasSynced, err := initializeInformers(informersFactory, svcNegFactory, networkFactory, nodeTopologyFactory, providerConfigName, logger, globalStopCh) - if err != nil { - return nil, err - } + // Wrap informers with provider config filter + filteredInformers := informers.FilterByProviderConfig(providerConfigName) + hasSynced := filteredInformers.CombinedHasSynced() - zoneGetter, err := zonegetter.NewZoneGetter(informers.nodeInformer, informers.providerConfigFilteredNodeTopologyInformer, cloud.SubnetworkURL()) + zoneGetter, err := zonegetter.NewZoneGetter(filteredInformers.Node, filteredInformers.NodeTopology, cloud.SubnetworkURL()) if err != nil { logger.Error(err, "failed to initialize zone getter") return nil, fmt.Errorf("failed to initialize zonegetter: %v", err) @@ -101,14 +96,15 @@ func StartNEGController( svcNegClient, eventRecorderClient, kubeSystemUID, - informers.ingressInformer, - informers.serviceInformer, - informers.podInformer, - informers.nodeInformer, - informers.endpointSliceInformer, - informers.providerConfigFilteredSvcNegInformer, - informers.providerConfigFilteredNetworkInformer, - informers.providerConfigFilteredGkeNetworkParamsInformer, + filteredInformers.Ingress, + filteredInformers.Service, + filteredInformers.Pod, + filteredInformers.Node, + filteredInformers.EndpointSlice, + filteredInformers.SvcNeg, + filteredInformers.Network, + filteredInformers.GkeNetworkParams, + filteredInformers.NodeTopology, hasSynced, cloud, zoneGetter, @@ -129,132 +125,6 @@ func StartNEGController( return providerConfigStopCh, nil } -type negInformers struct { - ingressInformer cache.SharedIndexInformer - serviceInformer cache.SharedIndexInformer - podInformer cache.SharedIndexInformer - nodeInformer cache.SharedIndexInformer - endpointSliceInformer cache.SharedIndexInformer - providerConfigFilteredSvcNegInformer cache.SharedIndexInformer - providerConfigFilteredNetworkInformer cache.SharedIndexInformer - providerConfigFilteredGkeNetworkParamsInformer cache.SharedIndexInformer - providerConfigFilteredNodeTopologyInformer cache.SharedIndexInformer -} - -// initializeInformers wraps the base SharedIndexInformers in a providerConfig filter -// and runs them. -func initializeInformers( - informersFactory informers.SharedInformerFactory, - svcNegFactory informersvcneg.SharedInformerFactory, - networkFactory informernetwork.SharedInformerFactory, - nodeTopologyFactory informernodetopology.SharedInformerFactory, - providerConfigName string, - logger klog.Logger, - globalStopCh <-chan struct{}, -) (*negInformers, func() bool, error) { - ingressInformer := filteredinformer.NewProviderConfigFilteredInformer(informersFactory.Networking().V1().Ingresses().Informer(), providerConfigName) - serviceInformer := filteredinformer.NewProviderConfigFilteredInformer(informersFactory.Core().V1().Services().Informer(), providerConfigName) - podInformer := filteredinformer.NewProviderConfigFilteredInformer(informersFactory.Core().V1().Pods().Informer(), providerConfigName) - nodeInformer := filteredinformer.NewProviderConfigFilteredInformer(informersFactory.Core().V1().Nodes().Informer(), providerConfigName) - - endpointSliceInformer := filteredinformer.NewProviderConfigFilteredInformer( - informersFactory.Discovery().V1().EndpointSlices().Informer(), - providerConfigName, - ) - // Even though we created separate "provider-config-filtered" informer, informers from the same - // factory will share indexers. That's why we need to add the indexer only if it's not present. - // This basically means we will only add indexer to the first provider config's informer. - err := addIndexerIfNotPresent(endpointSliceInformer.GetIndexer(), endpointslices.EndpointSlicesByServiceIndex, endpointslices.EndpointSlicesByServiceFunc) - if err != nil { - return nil, nil, fmt.Errorf("failed to add indexers to endpointSliceInformer: %v", err) - } - - var providerConfigFilteredSvcNegInformer cache.SharedIndexInformer - if svcNegFactory != nil { - svcNegInformer := svcNegFactory.Networking().V1beta1().ServiceNetworkEndpointGroups().Informer() - providerConfigFilteredSvcNegInformer = filteredinformer.NewProviderConfigFilteredInformer(svcNegInformer, providerConfigName) - } - - var providerConfigFilteredNetworkInformer cache.SharedIndexInformer - var providerConfigFilteredGkeNetworkParamsInformer cache.SharedIndexInformer - if networkFactory != nil { - networkInformer := networkFactory.Networking().V1().Networks().Informer() - providerConfigFilteredNetworkInformer = filteredinformer.NewProviderConfigFilteredInformer(networkInformer, providerConfigName) - - gkeNetworkParamsInformer := networkFactory.Networking().V1().GKENetworkParamSets().Informer() - providerConfigFilteredGkeNetworkParamsInformer = filteredinformer.NewProviderConfigFilteredInformer(gkeNetworkParamsInformer, providerConfigName) - } - - var providerConfigFilteredNodeTopologyInformer cache.SharedIndexInformer - if nodeTopologyFactory != nil { - nodeTopologyInformer := nodeTopologyFactory.Networking().V1().NodeTopologies().Informer() - providerConfigFilteredNodeTopologyInformer = filteredinformer.NewProviderConfigFilteredInformer(nodeTopologyInformer, providerConfigName) - } - - // Start them with the joinedStopCh so they properly stop - hasSyncedList := []func() bool{ - ingressInformer.HasSynced, - serviceInformer.HasSynced, - podInformer.HasSynced, - nodeInformer.HasSynced, - endpointSliceInformer.HasSynced, - } - go ingressInformer.Run(globalStopCh) - go serviceInformer.Run(globalStopCh) - go podInformer.Run(globalStopCh) - go nodeInformer.Run(globalStopCh) - go endpointSliceInformer.Run(globalStopCh) - if providerConfigFilteredSvcNegInformer != nil { - go providerConfigFilteredSvcNegInformer.Run(globalStopCh) - hasSyncedList = append(hasSyncedList, providerConfigFilteredSvcNegInformer.HasSynced) - } - if providerConfigFilteredNetworkInformer != nil { - go providerConfigFilteredNetworkInformer.Run(globalStopCh) - hasSyncedList = append(hasSyncedList, providerConfigFilteredNetworkInformer.HasSynced) - } - if providerConfigFilteredGkeNetworkParamsInformer != nil { - go providerConfigFilteredGkeNetworkParamsInformer.Run(globalStopCh) - hasSyncedList = append(hasSyncedList, providerConfigFilteredGkeNetworkParamsInformer.HasSynced) - } - if providerConfigFilteredNodeTopologyInformer != nil { - go providerConfigFilteredNodeTopologyInformer.Run(globalStopCh) - hasSyncedList = append(hasSyncedList, providerConfigFilteredNodeTopologyInformer.HasSynced) - } - - logger.V(2).Info("NEG informers initialized", "providerConfigName", providerConfigName) - informers := &negInformers{ - ingressInformer: ingressInformer, - serviceInformer: serviceInformer, - podInformer: podInformer, - nodeInformer: nodeInformer, - endpointSliceInformer: endpointSliceInformer, - providerConfigFilteredSvcNegInformer: providerConfigFilteredSvcNegInformer, - providerConfigFilteredNetworkInformer: providerConfigFilteredNetworkInformer, - providerConfigFilteredGkeNetworkParamsInformer: providerConfigFilteredGkeNetworkParamsInformer, - providerConfigFilteredNodeTopologyInformer: providerConfigFilteredNodeTopologyInformer, - } - hasSynced := func() bool { - for _, hasSynced := range hasSyncedList { - if !hasSynced() { - return false - } - } - return true - } - - return informers, hasSynced, nil -} - -// addIndexerIfNotPresent adds an indexer to the indexer if it's not present. -// This is needed because informers from the same factory will share indexers. -func addIndexerIfNotPresent(indexer cache.Indexer, indexName string, indexFunc cache.IndexFunc) error { - indexers := indexer.GetIndexers() - if _, ok := indexers[indexName]; ok { - return nil - } - return indexer.AddIndexers(cache.Indexers{indexName: indexFunc}) -} - func createNEGController( kubeClient kubernetes.Interface, svcNegClient svcnegclient.Interface, @@ -268,6 +138,7 @@ func createNEGController( svcNegInformer cache.SharedIndexInformer, networkInformer cache.SharedIndexInformer, gkeNetworkParamsInformer cache.SharedIndexInformer, + nodeTopologyInformer cache.SharedIndexInformer, hasSynced func() bool, cloud *gce.Cloud, zoneGetter *zonegetter.ZoneGetter, @@ -287,9 +158,9 @@ func createNEGController( } noDefaultBackendServicePort := utils.ServicePort{} - var noNodeTopologyInformer cache.SharedIndexInformer negMetrics := metrics.NewNegMetrics() - negController, err := neg.NewController( + + negController, err := newNEGController( kubeClient, svcNegClient, eventRecorderClient, @@ -302,7 +173,7 @@ func createNEGController( svcNegInformer, networkInformer, gkeNetworkParamsInformer, - noNodeTopologyInformer, + nodeTopologyInformer, hasSynced, l4Namer, noDefaultBackendServicePort, diff --git a/pkg/multiproject/neg/neg_test.go b/pkg/multiproject/neg/neg_test.go new file mode 100644 index 0000000000..deb0356e32 --- /dev/null +++ b/pkg/multiproject/neg/neg_test.go @@ -0,0 +1,180 @@ +package neg + +import ( + "testing" + "time" + + networkclient "github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versioned" + nodetopologyclient "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + k8sfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + providerconfig "k8s.io/ingress-gce/pkg/apis/providerconfig/v1" + multiprojectgce "k8s.io/ingress-gce/pkg/multiproject/gce" + multiprojectinformers "k8s.io/ingress-gce/pkg/multiproject/informerset" + "k8s.io/ingress-gce/pkg/neg" + "k8s.io/ingress-gce/pkg/neg/metrics" + syncMetrics "k8s.io/ingress-gce/pkg/neg/metrics/metricscollector" + "k8s.io/ingress-gce/pkg/neg/syncers/labels" + negtypes "k8s.io/ingress-gce/pkg/neg/types" + svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" + svcnegfake "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned/fake" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/ingress-gce/pkg/utils/namer" + "k8s.io/ingress-gce/pkg/utils/zonegetter" + klog "k8s.io/klog/v2" + ktesting "k8s.io/klog/v2/ktesting" +) + +// TestStartNEGController_StopJoin verifies that the stop channel passed to the controller +// closes when either the global stop channel or the per-ProviderConfig stop channel closes. +func TestStartNEGController_StopJoin(t *testing.T) { + + logger, _ := ktesting.NewTestContext(t) + kubeClient := k8sfake.NewSimpleClientset() + informers := multiprojectinformers.NewInformerSet(kubeClient, svcnegfake.NewSimpleClientset(), networkclient.Interface(nil), nodetopologyclient.Interface(nil), metav1.Duration{}) + + // Start base informers; they are not strictly required by our stubbed controller, + // but mirrors real startup flow and ensures CombinedHasSynced would be true if used. + globalStop := make(chan struct{}) + t.Cleanup(func() { close(globalStop) }) + if err := informers.Start(globalStop, logger); err != nil { + t.Fatalf("start informers: %v", err) + } + + // Provide required inputs for StartNEGController + pc := &providerconfig.ProviderConfig{ObjectMeta: metav1.ObjectMeta{Name: "pc-1"}} + kubeSystemUID := types.UID("uid") + rootNamer := namer.NewNamer("clusteruid", "", logger) + l4Namer := namer.NewL4Namer(string(kubeSystemUID), rootNamer) + lpCfg := labels.PodLabelPropagationConfig{} + // Create a fake cloud with a valid SubnetworkURL via multiproject helper. + gceCreator := multiprojectgce.NewGCEFake() + // Minimal provider config for the GCE fake + pcForCloud := &providerconfig.ProviderConfig{ + ObjectMeta: metav1.ObjectMeta{Name: "pc-1"}, + Spec: providerconfig.ProviderConfigSpec{ + ProjectID: "test-project", + ProjectNumber: 123, + NetworkConfig: providerconfig.ProviderNetworkConfig{ + Network: "net-1", + SubnetInfo: providerconfig.ProviderConfigSubnetInfo{Subnetwork: "sub-1"}, + }, + }, + } + cloud, err := gceCreator.GCEForProviderConfig(pcForCloud, logger) + if err != nil { + t.Fatalf("create fake cloud: %v", err) + } + + // Stub newNEGController to capture the stopCh passed in and to construct a minimal controller + // that can run without panics. + var capturedStopCh <-chan struct{} + orig := newNEGController + newNEGController = func(kc kubernetes.Interface, sc svcnegclient.Interface, ec kubernetes.Interface, uid types.UID, + ing cache.SharedIndexInformer, svc cache.SharedIndexInformer, pod cache.SharedIndexInformer, node cache.SharedIndexInformer, + es cache.SharedIndexInformer, sn cache.SharedIndexInformer, netInf cache.SharedIndexInformer, gke cache.SharedIndexInformer, nt cache.SharedIndexInformer, + synced func() bool, l4 namer.L4ResourcesNamer, defSP utils.ServicePort, cloud negtypes.NetworkEndpointGroupCloud, zg *zonegetter.ZoneGetter, nm negtypes.NetworkEndpointGroupNamer, + resync time.Duration, gc time.Duration, workers int, enableRR bool, runL4 bool, nonGCP bool, dualStack bool, lp labels.PodLabelPropagationConfig, + multiNetworking bool, ingressRegional bool, runNetLB bool, readOnly bool, enableNEGsForIngress bool, + stopCh <-chan struct{}, l klog.Logger, negMetrics *metrics.NegMetrics, syncerMetrics *syncMetrics.SyncerMetrics) (*neg.Controller, error) { + capturedStopCh = stopCh + return neg.NewController(kc, sc, ec, uid, ing, svc, pod, node, es, sn, netInf, gke, nt, synced, l4, defSP, cloud, zg, nm, + resync, gc, workers, enableRR, runL4, nonGCP, dualStack, lp, multiNetworking, ingressRegional, runNetLB, readOnly, enableNEGsForIngress, stopCh, l, negMetrics, syncerMetrics) + } + t.Cleanup(func() { newNEGController = orig }) + + testCases := []struct { + name string + closeProvider bool + }{ + {name: "provider-stop-closes-joined", closeProvider: true}, + {name: "global-stop-closes-joined", closeProvider: false}, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + + var joinStop <-chan struct{} + var providerStop chan<- struct{} + + if tc.closeProvider { + // Wire the join to the real globalStop for this subcase. + joinStop = globalStop + var err error + providerStop, err = StartNEGController(informers, kubeClient, kubeClient, svcnegfake.NewSimpleClientset(), networkclient.Interface(nil), nodetopologyclient.Interface(nil), kubeSystemUID, rootNamer, l4Namer, lpCfg, cloud, joinStop, logger, pc, syncMetrics.FakeSyncerMetrics()) + if err != nil { + t.Fatalf("StartNEGController: %v", err) + } + close(providerStop) + } else { + // Use a dedicated join channel so informers keep running. + js := make(chan struct{}) + joinStop = js + var err error + providerStop, err = StartNEGController(informers, kubeClient, kubeClient, svcnegfake.NewSimpleClientset(), networkclient.Interface(nil), nodetopologyclient.Interface(nil), kubeSystemUID, rootNamer, l4Namer, lpCfg, cloud, joinStop, logger, &providerconfig.ProviderConfig{ObjectMeta: metav1.ObjectMeta{Name: "pc-2"}}, syncMetrics.FakeSyncerMetrics()) + if err != nil { + t.Fatalf("StartNEGController (2): %v", err) + } + close(js) + defer close(providerStop) // safe if already closed + } + + if capturedStopCh == nil { + t.Fatalf("capturedStopCh is nil; stub did not run") + } + select { + case <-capturedStopCh: + // ok + case <-time.After(2 * time.Second): + t.Fatalf("joined stopCh did not close for case %q", tc.name) + } + }) + } +} + +// TestStartNEGController_NilSvcNegClientErrors verifies StartNEGController returns an error +// when the svcneg client is nil (which makes controller construction fail). +func TestStartNEGController_NilSvcNegClientErrors(t *testing.T) { + t.Parallel() + + logger, _ := ktesting.NewTestContext(t) + kubeClient := k8sfake.NewSimpleClientset() + informers := multiprojectinformers.NewInformerSet(kubeClient, nil, networkclient.Interface(nil), nodetopologyclient.Interface(nil), metav1.Duration{}) + globalStop := make(chan struct{}) + t.Cleanup(func() { close(globalStop) }) + if err := informers.Start(globalStop, logger); err != nil { + t.Fatalf("start informers: %v", err) + } + + pc := &providerconfig.ProviderConfig{ObjectMeta: metav1.ObjectMeta{Name: "pc-err"}} + kubeSystemUID := types.UID("uid") + rootNamer := namer.NewNamer("clusteruid", "", logger) + l4Namer := namer.NewL4Namer(string(kubeSystemUID), rootNamer) + lpCfg := labels.PodLabelPropagationConfig{} + gceCreator := multiprojectgce.NewGCEFake() + pcForCloud := &providerconfig.ProviderConfig{ + ObjectMeta: metav1.ObjectMeta{Name: "pc-err"}, + Spec: providerconfig.ProviderConfigSpec{ + ProjectID: "test-project", + ProjectNumber: 123, + NetworkConfig: providerconfig.ProviderNetworkConfig{ + Network: "net-1", + SubnetInfo: providerconfig.ProviderConfigSubnetInfo{Subnetwork: "sub-1"}, + }, + }, + } + cloud, err := gceCreator.GCEForProviderConfig(pcForCloud, logger) + if err != nil { + t.Fatalf("create fake cloud: %v", err) + } + + // newNEGController remains default (neg.NewController), which errors when svcNegClient is nil + ch, err := StartNEGController(informers, kubeClient, kubeClient, nil /* svcneg */, networkclient.Interface(nil), nodetopologyclient.Interface(nil), kubeSystemUID, rootNamer, l4Namer, lpCfg, cloud, globalStop, logger, pc, syncMetrics.FakeSyncerMetrics()) + if err == nil { + t.Fatalf("expected error from StartNEGController when svcNegClient is nil, got nil and channel=%v", ch) + } +} diff --git a/pkg/multiproject/start/start.go b/pkg/multiproject/start/start.go index 8c237beff2..1d911bea77 100644 --- a/pkg/multiproject/start/start.go +++ b/pkg/multiproject/start/start.go @@ -7,17 +7,19 @@ import ( "math/rand" "os" - informernetwork "github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions" - informernodetopology "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions" + networkclient "github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versioned" + nodetopologyclient "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/ingress-gce/pkg/flags" _ "k8s.io/ingress-gce/pkg/klog" pccontroller "k8s.io/ingress-gce/pkg/multiproject/controller" "k8s.io/ingress-gce/pkg/multiproject/gce" + multiprojectinformers "k8s.io/ingress-gce/pkg/multiproject/informerset" "k8s.io/ingress-gce/pkg/multiproject/manager" syncMetrics "k8s.io/ingress-gce/pkg/neg/metrics/metricscollector" "k8s.io/ingress-gce/pkg/neg/syncers/labels" @@ -26,7 +28,6 @@ import ( "k8s.io/ingress-gce/pkg/recorders" svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" - informersvcneg "k8s.io/ingress-gce/pkg/svcneg/client/informers/externalversions" "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/klog/v2" ) @@ -41,13 +42,11 @@ func StartWithLeaderElection( logger klog.Logger, kubeClient kubernetes.Interface, svcNegClient svcnegclient.Interface, + networkClient networkclient.Interface, + nodeTopologyClient nodetopologyclient.Interface, kubeSystemUID types.UID, eventRecorderKubeClient kubernetes.Interface, providerConfigClient providerconfigclient.Interface, - informersFactory informers.SharedInformerFactory, - svcNegFactory informersvcneg.SharedInformerFactory, - networkFactory informernetwork.SharedInformerFactory, - nodeTopologyFactory informernodetopology.SharedInformerFactory, gceCreator gce.GCECreator, rootNamer *namer.Namer, stopCh <-chan struct{}, @@ -57,7 +56,7 @@ func StartWithLeaderElection( recordersManager := recorders.NewManager(eventRecorderKubeClient, logger) - leConfig, err := makeLeaderElectionConfig(leaderElectKubeClient, hostname, recordersManager, logger, kubeClient, svcNegClient, kubeSystemUID, eventRecorderKubeClient, providerConfigClient, informersFactory, svcNegFactory, networkFactory, nodeTopologyFactory, gceCreator, rootNamer, syncerMetrics) + leConfig, err := makeLeaderElectionConfig(leaderElectKubeClient, hostname, recordersManager, logger, kubeClient, svcNegClient, networkClient, nodeTopologyClient, kubeSystemUID, eventRecorderKubeClient, providerConfigClient, gceCreator, rootNamer, syncerMetrics) if err != nil { return err } @@ -82,13 +81,11 @@ func makeLeaderElectionConfig( logger klog.Logger, kubeClient kubernetes.Interface, svcNegClient svcnegclient.Interface, + networkClient networkclient.Interface, + nodeTopologyClient nodetopologyclient.Interface, kubeSystemUID types.UID, eventRecorderKubeClient kubernetes.Interface, providerConfigClient providerconfigclient.Interface, - informersFactory informers.SharedInformerFactory, - svcNegFactory informersvcneg.SharedInformerFactory, - networkFactory informernetwork.SharedInformerFactory, - nodeTopologyFactory informernodetopology.SharedInformerFactory, gceCreator gce.GCECreator, rootNamer *namer.Namer, syncerMetrics *syncMetrics.SyncerMetrics, @@ -120,7 +117,7 @@ func makeLeaderElectionConfig( Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { logger.Info("Became leader, starting multi-project controller") - Start(logger, kubeClient, svcNegClient, kubeSystemUID, eventRecorderKubeClient, providerConfigClient, informersFactory, svcNegFactory, networkFactory, nodeTopologyFactory, gceCreator, rootNamer, ctx.Done(), syncerMetrics) + Start(logger, kubeClient, svcNegClient, networkClient, nodeTopologyClient, kubeSystemUID, eventRecorderKubeClient, providerConfigClient, gceCreator, rootNamer, ctx.Done(), syncerMetrics) }, OnStoppedLeading: func() { logger.Info("Stop running multi-project leader election") @@ -132,18 +129,16 @@ func makeLeaderElectionConfig( } // Start starts the ProviderConfig controller. -// It builds required context and starts the controller. +// It creates SharedIndexInformers directly and starts the controller. func Start( logger klog.Logger, kubeClient kubernetes.Interface, svcNegClient svcnegclient.Interface, + networkClient networkclient.Interface, + nodeTopologyClient nodetopologyclient.Interface, kubeSystemUID types.UID, eventRecorderKubeClient kubernetes.Interface, providerConfigClient providerconfigclient.Interface, - informersFactory informers.SharedInformerFactory, - svcNegFactory informersvcneg.SharedInformerFactory, - networkFactory informernetwork.SharedInformerFactory, - nodeTopologyFactory informernodetopology.SharedInformerFactory, gceCreator gce.GCECreator, rootNamer *namer.Namer, stopCh <-chan struct{}, @@ -158,18 +153,29 @@ func Start( } } - providerConfigInformer := providerconfiginformers.NewSharedInformerFactory(providerConfigClient, flags.F.ResyncPeriod).Cloud().V1().ProviderConfigs().Informer() - logger.V(2).Info("Starting ProviderConfig informer") - go providerConfigInformer.Run(stopCh) + // Create and start all informers + informers := multiprojectinformers.NewInformerSet( + kubeClient, + svcNegClient, + networkClient, + nodeTopologyClient, + metav1.Duration{Duration: flags.F.ResyncPeriod}, + ) + + // Start all informers + err := informers.Start(stopCh, logger) + if err != nil { + logger.Error(err, "Failed to start informers") + return + } manager := manager.NewProviderConfigControllerManager( kubeClient, - informersFactory, - svcNegFactory, - networkFactory, - nodeTopologyFactory, + informers, providerConfigClient, svcNegClient, + networkClient, + nodeTopologyClient, eventRecorderKubeClient, kubeSystemUID, rootNamer, @@ -182,6 +188,20 @@ func Start( ) logger.V(1).Info("Initialized ProviderConfig controller manager") + // Create ProviderConfig informer + providerConfigInformer := providerconfiginformers.NewSharedInformerFactory(providerConfigClient, flags.F.ResyncPeriod).Cloud().V1().ProviderConfigs().Informer() + logger.V(2).Info("Starting ProviderConfig informer") + go providerConfigInformer.Run(stopCh) + + // Wait for provider config informer to sync + logger.Info("Waiting for provider config informer to sync") + if !cache.WaitForCacheSync(stopCh, providerConfigInformer.HasSynced) { + err := fmt.Errorf("failed to sync provider config informer") + logger.Error(err, "Failed to sync provider config informer") + return + } + logger.Info("Provider config informer synced successfully") + pcController := pccontroller.NewProviderConfigController(manager, providerConfigInformer, stopCh, logger) logger.V(1).Info("Running ProviderConfig controller") pcController.Run() diff --git a/pkg/multiproject/start/start_test.go b/pkg/multiproject/start/start_test.go index 0a92ac0325..c59dc44702 100644 --- a/pkg/multiproject/start/start_test.go +++ b/pkg/multiproject/start/start_test.go @@ -9,9 +9,7 @@ import ( "time" networkfake "github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versioned/fake" - informernetwork "github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions" nodetopologyfake "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned/fake" - informernodetopology "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions" corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -35,7 +33,6 @@ import ( pcclientfake "k8s.io/ingress-gce/pkg/providerconfig/client/clientset/versioned/fake" svcnegfake "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned/fake" - informersvcneg "k8s.io/ingress-gce/pkg/svcneg/client/informers/externalversions" "k8s.io/ingress-gce/pkg/utils/namer" klog "k8s.io/klog/v2" ) @@ -214,9 +211,6 @@ func TestStartProviderConfigIntegration(t *testing.T) { logger := klog.TODO() gceCreator := multiprojectgce.NewGCEFake() informersFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, flags.F.ResyncPeriod) - svcNegFactory := informersvcneg.NewSharedInformerFactoryWithOptions(svcNegClient, flags.F.ResyncPeriod) - networkFactory := informernetwork.NewSharedInformerFactoryWithOptions(networkClient, flags.F.ResyncPeriod) - nodeTopologyFactory := informernodetopology.NewSharedInformerFactoryWithOptions(nodeTopologyClient, flags.F.ResyncPeriod) rootNamer := namer.NewNamer("test-clusteruid", "", logger) kubeSystemUID := types.UID("test-kube-system-uid") @@ -234,13 +228,11 @@ func TestStartProviderConfigIntegration(t *testing.T) { logger, kubeClient, svcNegClient, + networkClient, + nodeTopologyClient, kubeSystemUID, kubeClient, // eventRecorderKubeClient can be the same as main client pcClient, - informersFactory, - svcNegFactory, - networkFactory, - nodeTopologyFactory, gceCreator, rootNamer, stopCh, @@ -287,14 +279,14 @@ func TestStartProviderConfigIntegration(t *testing.T) { } t.Logf("Created Service %s/%s", createdSvc.Namespace, createdSvc.Name) - // Populate endpoint slices in the fake informer. + // Populate endpoint slices in the fake client so InformerSet picks them up. addressPrefix := "10.100" if svc.Labels[flags.F.ProviderConfigNameLabelKey] == providerConfigName2 { addressPrefix = "20.100" } populateFakeEndpointSlices( t, - informersFactory.Discovery().V1().EndpointSlices().Informer(), + kubeClient, svc.Name, svc.Labels[flags.F.ProviderConfigNameLabelKey], addressPrefix, @@ -332,9 +324,6 @@ func TestSharedInformers_PC1Stops_PC2AndPC3KeepWorking(t *testing.T) { testutil.EmulateProviderConfigLabelingWebhook(&svcNegClient.Fake, "servicenetworkendpointgroups") informersFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, flags.F.ResyncPeriod) - svcNegFactory := informersvcneg.NewSharedInformerFactoryWithOptions(svcNegClient, flags.F.ResyncPeriod) - networkFactory := informernetwork.NewSharedInformerFactoryWithOptions(networkClient, flags.F.ResyncPeriod) - nodeTopoFactory := informernodetopology.NewSharedInformerFactoryWithOptions(nodeTopoClient, flags.F.ResyncPeriod) logger := klog.TODO() gceCreator := multiprojectgce.NewGCEFake() @@ -348,8 +337,8 @@ func TestSharedInformers_PC1Stops_PC2AndPC3KeepWorking(t *testing.T) { // Start multiproject manager (this starts shared factories once with globalStop). go Start( - logger, kubeClient, svcNegClient, kubeSystemUID, kubeClient, - pcClient, informersFactory, svcNegFactory, networkFactory, nodeTopoFactory, + logger, kubeClient, svcNegClient, networkClient, nodeTopoClient, + kubeSystemUID, kubeClient, pcClient, gceCreator, rootNamer, globalStop, syncMetrics.FakeSyncerMetrics(), ) @@ -369,7 +358,7 @@ func TestSharedInformers_PC1Stops_PC2AndPC3KeepWorking(t *testing.T) { markPCDeletingAndWait(ctx, t, pcClient, pc1) // --- pc-2 after pc-1 stops: create a NEW service; it must still work --- - seedEPS(t, informersFactory, "pc-2", "svc2-b", "20.100") + seedEPS(t, kubeClient, informersFactory, "pc-2", "svc2-b", "20.100") svc2b := createNEGService(ctx, t, kubeClient, "pc-2", "svc2-b", "demo") validateService(ctx, t, kubeClient, svcNegClient, gceCreator, svc2b, pc2) @@ -451,16 +440,18 @@ func seedAll( ) { t.Helper() populateFakeNodeInformer(t, kubeClient, informersFactory.Core().V1().Nodes().Informer(), ns, cidrPrefix) - populateFakeEndpointSlices(t, informersFactory.Discovery().V1().EndpointSlices().Informer(), svcName, ns, cidrPrefix) + populateFakeEndpointSlices(t, kubeClient, svcName, ns, cidrPrefix) } func seedEPS( t *testing.T, + kubeClient *fake.Clientset, informersFactory informers.SharedInformerFactory, ns, svcName, cidrPrefix string, ) { t.Helper() - populateFakeEndpointSlices(t, informersFactory.Discovery().V1().EndpointSlices().Informer(), svcName, ns, cidrPrefix) + // Create EndpointSlices in the fake client so InformerSet sees them. + populateFakeEndpointSlices(t, kubeClient, svcName, ns, cidrPrefix) } func createNEGService( @@ -717,14 +708,14 @@ func populateFakeNodeInformer( // populateFakeEndpointSlices indexes a set of fake EndpointSlices to simulate the real endpoints in the cluster. func populateFakeEndpointSlices( t *testing.T, - endpointSliceInformer cache.SharedIndexInformer, + client *fake.Clientset, serviceName, providerConfigName, addressPrefix string, ) { t.Helper() endpointSlices := getTestEndpointSlices(serviceName, providerConfigName, addressPrefix) for _, es := range endpointSlices { - if err := endpointSliceInformer.GetIndexer().Add(es); err != nil { - t.Fatalf("Failed to add endpoint slice %q: %v", es.Name, err) + if _, err := client.DiscoveryV1().EndpointSlices(providerConfigName).Create(context.Background(), es, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create endpoint slice %q: %v", es.Name, err) } } } @@ -943,10 +934,6 @@ func TestProviderConfigErrorCases(t *testing.T) { logger := klog.TODO() gceCreator := multiprojectgce.NewGCEFake() - informersFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, flags.F.ResyncPeriod) - svcNegFactory := informersvcneg.NewSharedInformerFactoryWithOptions(svcNegClient, flags.F.ResyncPeriod) - networkFactory := informernetwork.NewSharedInformerFactoryWithOptions(networkClient, flags.F.ResyncPeriod) - nodeTopologyFactory := informernodetopology.NewSharedInformerFactoryWithOptions(nodeTopologyClient, flags.F.ResyncPeriod) rootNamer := namer.NewNamer("test-clusteruid", "", logger) kubeSystemUID := types.UID("test-kube-system-uid") @@ -964,13 +951,11 @@ func TestProviderConfigErrorCases(t *testing.T) { logger, kubeClient, svcNegClient, + networkClient, + nodeTopologyClient, kubeSystemUID, kubeClient, pcClient, - informersFactory, - svcNegFactory, - networkFactory, - nodeTopologyFactory, gceCreator, rootNamer, stopCh, diff --git a/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/factory.go b/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/factory.go deleted file mode 100644 index a6f207d5c0..0000000000 --- a/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/factory.go +++ /dev/null @@ -1,262 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Code generated by informer-gen. DO NOT EDIT. - -package externalversions - -import ( - reflect "reflect" - sync "sync" - time "time" - - versioned "github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versioned" - internalinterfaces "github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/internalinterfaces" - network "github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/network" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - runtime "k8s.io/apimachinery/pkg/runtime" - schema "k8s.io/apimachinery/pkg/runtime/schema" - cache "k8s.io/client-go/tools/cache" -) - -// SharedInformerOption defines the functional option type for SharedInformerFactory. -type SharedInformerOption func(*sharedInformerFactory) *sharedInformerFactory - -type sharedInformerFactory struct { - client versioned.Interface - namespace string - tweakListOptions internalinterfaces.TweakListOptionsFunc - lock sync.Mutex - defaultResync time.Duration - customResync map[reflect.Type]time.Duration - transform cache.TransformFunc - - informers map[reflect.Type]cache.SharedIndexInformer - // startedInformers is used for tracking which informers have been started. - // This allows Start() to be called multiple times safely. - startedInformers map[reflect.Type]bool - // wg tracks how many goroutines were started. - wg sync.WaitGroup - // shuttingDown is true when Shutdown has been called. It may still be running - // because it needs to wait for goroutines. - shuttingDown bool -} - -// WithCustomResyncConfig sets a custom resync period for the specified informer types. -func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption { - return func(factory *sharedInformerFactory) *sharedInformerFactory { - for k, v := range resyncConfig { - factory.customResync[reflect.TypeOf(k)] = v - } - return factory - } -} - -// WithTweakListOptions sets a custom filter on all listers of the configured SharedInformerFactory. -func WithTweakListOptions(tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerOption { - return func(factory *sharedInformerFactory) *sharedInformerFactory { - factory.tweakListOptions = tweakListOptions - return factory - } -} - -// WithNamespace limits the SharedInformerFactory to the specified namespace. -func WithNamespace(namespace string) SharedInformerOption { - return func(factory *sharedInformerFactory) *sharedInformerFactory { - factory.namespace = namespace - return factory - } -} - -// WithTransform sets a transform on all informers. -func WithTransform(transform cache.TransformFunc) SharedInformerOption { - return func(factory *sharedInformerFactory) *sharedInformerFactory { - factory.transform = transform - return factory - } -} - -// NewSharedInformerFactory constructs a new instance of sharedInformerFactory for all namespaces. -func NewSharedInformerFactory(client versioned.Interface, defaultResync time.Duration) SharedInformerFactory { - return NewSharedInformerFactoryWithOptions(client, defaultResync) -} - -// NewFilteredSharedInformerFactory constructs a new instance of sharedInformerFactory. -// Listers obtained via this SharedInformerFactory will be subject to the same filters -// as specified here. -// Deprecated: Please use NewSharedInformerFactoryWithOptions instead -func NewFilteredSharedInformerFactory(client versioned.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory { - return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions)) -} - -// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options. -func NewSharedInformerFactoryWithOptions(client versioned.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { - factory := &sharedInformerFactory{ - client: client, - namespace: v1.NamespaceAll, - defaultResync: defaultResync, - informers: make(map[reflect.Type]cache.SharedIndexInformer), - startedInformers: make(map[reflect.Type]bool), - customResync: make(map[reflect.Type]time.Duration), - } - - // Apply all options - for _, opt := range options { - factory = opt(factory) - } - - return factory -} - -func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { - f.lock.Lock() - defer f.lock.Unlock() - - if f.shuttingDown { - return - } - - for informerType, informer := range f.informers { - if !f.startedInformers[informerType] { - f.wg.Add(1) - // We need a new variable in each loop iteration, - // otherwise the goroutine would use the loop variable - // and that keeps changing. - informer := informer - go func() { - defer f.wg.Done() - informer.Run(stopCh) - }() - f.startedInformers[informerType] = true - } - } -} - -func (f *sharedInformerFactory) Shutdown() { - f.lock.Lock() - f.shuttingDown = true - f.lock.Unlock() - - // Will return immediately if there is nothing to wait for. - f.wg.Wait() -} - -func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { - informers := func() map[reflect.Type]cache.SharedIndexInformer { - f.lock.Lock() - defer f.lock.Unlock() - - informers := map[reflect.Type]cache.SharedIndexInformer{} - for informerType, informer := range f.informers { - if f.startedInformers[informerType] { - informers[informerType] = informer - } - } - return informers - }() - - res := map[reflect.Type]bool{} - for informType, informer := range informers { - res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) - } - return res -} - -// InformerFor returns the SharedIndexInformer for obj using an internal -// client. -func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { - f.lock.Lock() - defer f.lock.Unlock() - - informerType := reflect.TypeOf(obj) - informer, exists := f.informers[informerType] - if exists { - return informer - } - - resyncPeriod, exists := f.customResync[informerType] - if !exists { - resyncPeriod = f.defaultResync - } - - informer = newFunc(f.client, resyncPeriod) - informer.SetTransform(f.transform) - f.informers[informerType] = informer - - return informer -} - -// SharedInformerFactory provides shared informers for resources in all known -// API group versions. -// -// It is typically used like this: -// -// ctx, cancel := context.Background() -// defer cancel() -// factory := NewSharedInformerFactory(client, resyncPeriod) -// defer factory.WaitForStop() // Returns immediately if nothing was started. -// genericInformer := factory.ForResource(resource) -// typedInformer := factory.SomeAPIGroup().V1().SomeType() -// factory.Start(ctx.Done()) // Start processing these informers. -// synced := factory.WaitForCacheSync(ctx.Done()) -// for v, ok := range synced { -// if !ok { -// fmt.Fprintf(os.Stderr, "caches failed to sync: %v", v) -// return -// } -// } -// -// // Creating informers can also be created after Start, but then -// // Start must be called again: -// anotherGenericInformer := factory.ForResource(resource) -// factory.Start(ctx.Done()) -type SharedInformerFactory interface { - internalinterfaces.SharedInformerFactory - - // Start initializes all requested informers. They are handled in goroutines - // which run until the stop channel gets closed. - // Warning: Start does not block. When run in a go-routine, it will race with a later WaitForCacheSync. - Start(stopCh <-chan struct{}) - - // Shutdown marks a factory as shutting down. At that point no new - // informers can be started anymore and Start will return without - // doing anything. - // - // In addition, Shutdown blocks until all goroutines have terminated. For that - // to happen, the close channel(s) that they were started with must be closed, - // either before Shutdown gets called or while it is waiting. - // - // Shutdown may be called multiple times, even concurrently. All such calls will - // block until all goroutines have terminated. - Shutdown() - - // WaitForCacheSync blocks until all started informers' caches were synced - // or the stop channel gets closed. - WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool - - // ForResource gives generic access to a shared informer of the matching type. - ForResource(resource schema.GroupVersionResource) (GenericInformer, error) - - // InformerFor returns the SharedIndexInformer for obj using an internal - // client. - InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer - - Networking() network.Interface -} - -func (f *sharedInformerFactory) Networking() network.Interface { - return network.New(f, f.namespace, f.tweakListOptions) -} diff --git a/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/generic.go b/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/generic.go deleted file mode 100644 index 7d9caf1bfa..0000000000 --- a/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/generic.go +++ /dev/null @@ -1,68 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Code generated by informer-gen. DO NOT EDIT. - -package externalversions - -import ( - "fmt" - - v1 "github.com/GoogleCloudPlatform/gke-networking-api/apis/network/v1" - schema "k8s.io/apimachinery/pkg/runtime/schema" - cache "k8s.io/client-go/tools/cache" -) - -// GenericInformer is type of SharedIndexInformer which will locate and delegate to other -// sharedInformers based on type -type GenericInformer interface { - Informer() cache.SharedIndexInformer - Lister() cache.GenericLister -} - -type genericInformer struct { - informer cache.SharedIndexInformer - resource schema.GroupResource -} - -// Informer returns the SharedIndexInformer. -func (f *genericInformer) Informer() cache.SharedIndexInformer { - return f.informer -} - -// Lister returns the GenericLister. -func (f *genericInformer) Lister() cache.GenericLister { - return cache.NewGenericLister(f.Informer().GetIndexer(), f.resource) -} - -// ForResource gives generic access to a shared informer of the matching type -// TODO extend this to unknown resources with a client pool -func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) { - switch resource { - // Group=networking.gke.io, Version=v1 - case v1.SchemeGroupVersion.WithResource("gkenetworkparamsets"): - return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1().GKENetworkParamSets().Informer()}, nil - case v1.SchemeGroupVersion.WithResource("networks"): - return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1().Networks().Informer()}, nil - case v1.SchemeGroupVersion.WithResource("networkinterfaces"): - return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1().NetworkInterfaces().Informer()}, nil - case v1.SchemeGroupVersion.WithResource("subnetworks"): - return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1().Subnetworks().Informer()}, nil - - } - - return nil, fmt.Errorf("no informer found for %v", resource) -} diff --git a/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/network/interface.go b/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/network/interface.go deleted file mode 100644 index ab755e66f0..0000000000 --- a/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/network/interface.go +++ /dev/null @@ -1,46 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Code generated by informer-gen. DO NOT EDIT. - -package network - -import ( - internalinterfaces "github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/internalinterfaces" - v1 "github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/network/v1" -) - -// Interface provides access to each of this group's versions. -type Interface interface { - // V1 provides access to shared informers for resources in V1. - V1() v1.Interface -} - -type group struct { - factory internalinterfaces.SharedInformerFactory - namespace string - tweakListOptions internalinterfaces.TweakListOptionsFunc -} - -// New returns a new Interface. -func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface { - return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} -} - -// V1 returns a new v1.Interface. -func (g *group) V1() v1.Interface { - return v1.New(g.factory, g.namespace, g.tweakListOptions) -} diff --git a/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/factory.go b/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/factory.go deleted file mode 100644 index d387b69978..0000000000 --- a/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/factory.go +++ /dev/null @@ -1,262 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Code generated by informer-gen. DO NOT EDIT. - -package externalversions - -import ( - reflect "reflect" - sync "sync" - time "time" - - versioned "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned" - internalinterfaces "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/internalinterfaces" - nodetopology "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/nodetopology" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - runtime "k8s.io/apimachinery/pkg/runtime" - schema "k8s.io/apimachinery/pkg/runtime/schema" - cache "k8s.io/client-go/tools/cache" -) - -// SharedInformerOption defines the functional option type for SharedInformerFactory. -type SharedInformerOption func(*sharedInformerFactory) *sharedInformerFactory - -type sharedInformerFactory struct { - client versioned.Interface - namespace string - tweakListOptions internalinterfaces.TweakListOptionsFunc - lock sync.Mutex - defaultResync time.Duration - customResync map[reflect.Type]time.Duration - transform cache.TransformFunc - - informers map[reflect.Type]cache.SharedIndexInformer - // startedInformers is used for tracking which informers have been started. - // This allows Start() to be called multiple times safely. - startedInformers map[reflect.Type]bool - // wg tracks how many goroutines were started. - wg sync.WaitGroup - // shuttingDown is true when Shutdown has been called. It may still be running - // because it needs to wait for goroutines. - shuttingDown bool -} - -// WithCustomResyncConfig sets a custom resync period for the specified informer types. -func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption { - return func(factory *sharedInformerFactory) *sharedInformerFactory { - for k, v := range resyncConfig { - factory.customResync[reflect.TypeOf(k)] = v - } - return factory - } -} - -// WithTweakListOptions sets a custom filter on all listers of the configured SharedInformerFactory. -func WithTweakListOptions(tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerOption { - return func(factory *sharedInformerFactory) *sharedInformerFactory { - factory.tweakListOptions = tweakListOptions - return factory - } -} - -// WithNamespace limits the SharedInformerFactory to the specified namespace. -func WithNamespace(namespace string) SharedInformerOption { - return func(factory *sharedInformerFactory) *sharedInformerFactory { - factory.namespace = namespace - return factory - } -} - -// WithTransform sets a transform on all informers. -func WithTransform(transform cache.TransformFunc) SharedInformerOption { - return func(factory *sharedInformerFactory) *sharedInformerFactory { - factory.transform = transform - return factory - } -} - -// NewSharedInformerFactory constructs a new instance of sharedInformerFactory for all namespaces. -func NewSharedInformerFactory(client versioned.Interface, defaultResync time.Duration) SharedInformerFactory { - return NewSharedInformerFactoryWithOptions(client, defaultResync) -} - -// NewFilteredSharedInformerFactory constructs a new instance of sharedInformerFactory. -// Listers obtained via this SharedInformerFactory will be subject to the same filters -// as specified here. -// Deprecated: Please use NewSharedInformerFactoryWithOptions instead -func NewFilteredSharedInformerFactory(client versioned.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory { - return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions)) -} - -// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options. -func NewSharedInformerFactoryWithOptions(client versioned.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { - factory := &sharedInformerFactory{ - client: client, - namespace: v1.NamespaceAll, - defaultResync: defaultResync, - informers: make(map[reflect.Type]cache.SharedIndexInformer), - startedInformers: make(map[reflect.Type]bool), - customResync: make(map[reflect.Type]time.Duration), - } - - // Apply all options - for _, opt := range options { - factory = opt(factory) - } - - return factory -} - -func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { - f.lock.Lock() - defer f.lock.Unlock() - - if f.shuttingDown { - return - } - - for informerType, informer := range f.informers { - if !f.startedInformers[informerType] { - f.wg.Add(1) - // We need a new variable in each loop iteration, - // otherwise the goroutine would use the loop variable - // and that keeps changing. - informer := informer - go func() { - defer f.wg.Done() - informer.Run(stopCh) - }() - f.startedInformers[informerType] = true - } - } -} - -func (f *sharedInformerFactory) Shutdown() { - f.lock.Lock() - f.shuttingDown = true - f.lock.Unlock() - - // Will return immediately if there is nothing to wait for. - f.wg.Wait() -} - -func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { - informers := func() map[reflect.Type]cache.SharedIndexInformer { - f.lock.Lock() - defer f.lock.Unlock() - - informers := map[reflect.Type]cache.SharedIndexInformer{} - for informerType, informer := range f.informers { - if f.startedInformers[informerType] { - informers[informerType] = informer - } - } - return informers - }() - - res := map[reflect.Type]bool{} - for informType, informer := range informers { - res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) - } - return res -} - -// InformerFor returns the SharedIndexInformer for obj using an internal -// client. -func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { - f.lock.Lock() - defer f.lock.Unlock() - - informerType := reflect.TypeOf(obj) - informer, exists := f.informers[informerType] - if exists { - return informer - } - - resyncPeriod, exists := f.customResync[informerType] - if !exists { - resyncPeriod = f.defaultResync - } - - informer = newFunc(f.client, resyncPeriod) - informer.SetTransform(f.transform) - f.informers[informerType] = informer - - return informer -} - -// SharedInformerFactory provides shared informers for resources in all known -// API group versions. -// -// It is typically used like this: -// -// ctx, cancel := context.Background() -// defer cancel() -// factory := NewSharedInformerFactory(client, resyncPeriod) -// defer factory.WaitForStop() // Returns immediately if nothing was started. -// genericInformer := factory.ForResource(resource) -// typedInformer := factory.SomeAPIGroup().V1().SomeType() -// factory.Start(ctx.Done()) // Start processing these informers. -// synced := factory.WaitForCacheSync(ctx.Done()) -// for v, ok := range synced { -// if !ok { -// fmt.Fprintf(os.Stderr, "caches failed to sync: %v", v) -// return -// } -// } -// -// // Creating informers can also be created after Start, but then -// // Start must be called again: -// anotherGenericInformer := factory.ForResource(resource) -// factory.Start(ctx.Done()) -type SharedInformerFactory interface { - internalinterfaces.SharedInformerFactory - - // Start initializes all requested informers. They are handled in goroutines - // which run until the stop channel gets closed. - // Warning: Start does not block. When run in a go-routine, it will race with a later WaitForCacheSync. - Start(stopCh <-chan struct{}) - - // Shutdown marks a factory as shutting down. At that point no new - // informers can be started anymore and Start will return without - // doing anything. - // - // In addition, Shutdown blocks until all goroutines have terminated. For that - // to happen, the close channel(s) that they were started with must be closed, - // either before Shutdown gets called or while it is waiting. - // - // Shutdown may be called multiple times, even concurrently. All such calls will - // block until all goroutines have terminated. - Shutdown() - - // WaitForCacheSync blocks until all started informers' caches were synced - // or the stop channel gets closed. - WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool - - // ForResource gives generic access to a shared informer of the matching type. - ForResource(resource schema.GroupVersionResource) (GenericInformer, error) - - // InformerFor returns the SharedIndexInformer for obj using an internal - // client. - InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer - - Networking() nodetopology.Interface -} - -func (f *sharedInformerFactory) Networking() nodetopology.Interface { - return nodetopology.New(f, f.namespace, f.tweakListOptions) -} diff --git a/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/generic.go b/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/generic.go deleted file mode 100644 index 6b54cb9076..0000000000 --- a/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/generic.go +++ /dev/null @@ -1,62 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Code generated by informer-gen. DO NOT EDIT. - -package externalversions - -import ( - "fmt" - - v1 "github.com/GoogleCloudPlatform/gke-networking-api/apis/nodetopology/v1" - schema "k8s.io/apimachinery/pkg/runtime/schema" - cache "k8s.io/client-go/tools/cache" -) - -// GenericInformer is type of SharedIndexInformer which will locate and delegate to other -// sharedInformers based on type -type GenericInformer interface { - Informer() cache.SharedIndexInformer - Lister() cache.GenericLister -} - -type genericInformer struct { - informer cache.SharedIndexInformer - resource schema.GroupResource -} - -// Informer returns the SharedIndexInformer. -func (f *genericInformer) Informer() cache.SharedIndexInformer { - return f.informer -} - -// Lister returns the GenericLister. -func (f *genericInformer) Lister() cache.GenericLister { - return cache.NewGenericLister(f.Informer().GetIndexer(), f.resource) -} - -// ForResource gives generic access to a shared informer of the matching type -// TODO extend this to unknown resources with a client pool -func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) { - switch resource { - // Group=networking.gke.io, Version=v1 - case v1.SchemeGroupVersion.WithResource("nodetopologies"): - return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1().NodeTopologies().Informer()}, nil - - } - - return nil, fmt.Errorf("no informer found for %v", resource) -} diff --git a/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/nodetopology/interface.go b/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/nodetopology/interface.go deleted file mode 100644 index 0f37c9a121..0000000000 --- a/vendor/github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/nodetopology/interface.go +++ /dev/null @@ -1,46 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Code generated by informer-gen. DO NOT EDIT. - -package nodetopology - -import ( - internalinterfaces "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/internalinterfaces" - v1 "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/nodetopology/v1" -) - -// Interface provides access to each of this group's versions. -type Interface interface { - // V1 provides access to shared informers for resources in V1. - V1() v1.Interface -} - -type group struct { - factory internalinterfaces.SharedInformerFactory - namespace string - tweakListOptions internalinterfaces.TweakListOptionsFunc -} - -// New returns a new Interface. -func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface { - return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} -} - -// V1 returns a new v1.Interface. -func (g *group) V1() v1.Interface { - return v1.New(g.factory, g.namespace, g.tweakListOptions) -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 023c176727..c2270559c4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -81,9 +81,7 @@ github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versi github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versioned/scheme github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versioned/typed/network/v1 github.com/GoogleCloudPlatform/gke-networking-api/client/network/clientset/versioned/typed/network/v1/fake -github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/internalinterfaces -github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/network github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/network/v1 github.com/GoogleCloudPlatform/gke-networking-api/client/network/listers/network/v1 github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned @@ -91,9 +89,7 @@ github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/ github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned/scheme github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned/typed/nodetopology/v1 github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned/typed/nodetopology/v1/fake -github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/internalinterfaces -github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/nodetopology github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/informers/externalversions/nodetopology/v1 github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/listers/nodetopology/v1 # github.com/GoogleCloudPlatform/k8s-cloud-provider v1.32.0