diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 31a41fc8f490..02cec5619ad3 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -255,6 +255,7 @@ jobs:
       - uses: ./.github/actions/run-monitored-tmpnet-cmd
         with:
           run: ./scripts/run_task.sh test-load-kube-kind
+          runtime: kube
           artifact_prefix: load-kube
           prometheus_username: ${{ secrets.PROMETHEUS_ID || '' }}
           prometheus_password: ${{ secrets.PROMETHEUS_PASSWORD || '' }}
diff --git a/flake.nix b/flake.nix
index 1bbd86dd45c5..cbd9a812dde7 100644
--- a/flake.nix
+++ b/flake.nix
@@ -48,7 +48,6 @@
           k9s             # Kubernetes TUI
           kind            # Kubernetes-in-Docker
           kubernetes-helm # Helm CLI (Kubernetes package manager)
-          self.packages.${system}.kind-with-registry # Script installing kind configured with a local registry

           # Linters
           shellcheck
@@ -65,32 +64,11 @@
           # macOS-specific frameworks
           darwin.apple_sdk.frameworks.Security
         ];
-      };
-    });
-
-    # Package to install the kind-with-registry script
-    packages = forAllSystems ({ pkgs }: {
-      kind-with-registry = pkgs.stdenv.mkDerivation {
-        pname = "kind-with-registry";
-        version = "1.0.0";
-
-        src = pkgs.fetchurl {
-          url = "https://raw.githubusercontent.com/kubernetes-sigs/kind/7cb9e6be25b48a0e248097eef29d496ab1a044d0/site/static/examples/kind-with-registry.sh";
-          sha256 = "0gri0x0ygcwmz8l4h6zzsvydw8rsh7qa8p5218d4hncm363i81hv";
-        };
-
-        phases = [ "installPhase" ];
-
-        installPhase = ''
-          mkdir -p $out/bin
-          install -m755 $src $out/bin/kind-with-registry.sh
+        # Add scripts/ directory to PATH so kind-with-registry.sh is accessible
+        shellHook = ''
+          export PATH="$PWD/scripts:$PATH"
         '';
-
-        meta = with pkgs.lib; {
-          description = "Script to set up kind with a local registry";
-          license = licenses.mit;
-          maintainers = with maintainers; [ "maru-ava" ];
-        };
       };
     });
   };
diff --git a/scripts/kind-with-registry.sh b/scripts/kind-with-registry.sh
new file mode 100755
index 000000000000..5027432722eb
--- /dev/null
+++ b/scripts/kind-with-registry.sh
@@ -0,0 +1,90 @@
+#!/bin/sh
+# Based on https://raw.githubusercontent.com/kubernetes-sigs/kind/7cb9e6be25b48a0e248097eef29d496ab1a044d0/site/static/examples/kind-with-registry.sh
+# Original work Copyright 2019 The Kubernetes Authors
+# Modifications Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
+# See the file LICENSE for licensing terms.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TODO(marun) Migrate this script to golang
+set -o errexit
+
+# 1. Create registry container unless it already exists
+reg_name='kind-registry'
+reg_port='5001'
+if [ "$(docker inspect -f '{{.State.Running}}' "${reg_name}" 2>/dev/null || true)" != 'true' ]; then
+  docker run \
+    -d --restart=always -p "127.0.0.1:${reg_port}:5000" --network bridge --name "${reg_name}" \
+    registry:2
+fi
+
+# 2. Create kind cluster with containerd registry config dir enabled
+# TODO: kind will eventually enable this by default and this patch will
+# be unnecessary.
+
+#
+# See:
+# https://github.com/kubernetes-sigs/kind/issues/2875
+# https://github.com/containerd/containerd/blob/main/docs/cri/config.md#registry-configuration
+# See: https://github.com/containerd/containerd/blob/main/docs/hosts.md
+cat <<EOF | kind create cluster --config=-
+kind: Cluster
+apiVersion: kind.x-k8s.io/v1alpha4
+containerdConfigPatches:
+- |-
+  [plugins."io.containerd.grpc.v1.cri".registry]
+    config_path = "/etc/containerd/certs.d"
+nodes:
+- role: control-plane
+  extraPortMappings:
+  # Expose the nodePort used by the nginx ingress controller
+  - containerPort: 30791
+    hostPort: 30791
+    protocol: TCP
+EOF
+
+# 3. Add the registry config to the nodes
+#
+# This is necessary because localhost resolves to loopback addresses that are
+# network-namespace local.
+# In other words: localhost in the container is not localhost on the host.
+#
+# We want a consistent name that works from both ends, so we tell containerd to
+# alias localhost:${reg_port} to the registry container when pulling images
+REGISTRY_DIR="/etc/containerd/certs.d/localhost:${reg_port}"
+for node in $(kind get nodes); do
+  docker exec "${node}" mkdir -p "${REGISTRY_DIR}"
+  cat <<EOF | docker exec -i "${node}" cp /dev/stdin "${REGISTRY_DIR}/hosts.toml"
+[host."http://${reg_name}:5000"]
+EOF
+done
+
+# 4. Connect the registry to the cluster network if not already connected
+# This allows kind to bootstrap the network but ensures they're on the same network
+if [ "$(docker inspect -f='{{json .NetworkSettings.Networks.kind}}' "${reg_name}")" = 'null' ]; then
+  docker network connect "kind" "${reg_name}"
+fi
+
+# 5. Document the local registry
+# https://github.com/kubernetes-sigs/kind/issues/1213
+cat <<EOF | kubectl apply -f -
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: local-registry-hosting
+  namespace: kube-public
+data:
+  localRegistryHosting.v1: |
+    host: "localhost:${reg_port}"
+    help: "https://kind.sigs.k8s.io/docs/user/local-registry/"
+EOF
diff --git a/tests/fixture/tmpnet/kube_runtime.go b/tests/fixture/tmpnet/kube_runtime.go
--- a/tests/fixture/tmpnet/kube_runtime.go
+++ b/tests/fixture/tmpnet/kube_runtime.go
@@ ... @@ func (c *KubeRuntimeConfig) ensureDefaults(ctx context.Context, log logging.Logger) error {
-	if len(c.SchedulingLabelKey) == 0 && len(schedulingLabelKey) > 0 {
-		log.Info("setting default value for SchedulingLabelKey",
-			zap.String("schedulingLabelKey", schedulingLabelKey),
+	if requireSchedulingDefaults {
+		var (
+			schedulingLabelKey   = configMap.Data["schedulingLabelKey"]
+			schedulingLabelValue = configMap.Data["schedulingLabelValue"]
 		)
-		c.SchedulingLabelKey = schedulingLabelKey
+		if len(c.SchedulingLabelKey) == 0 && len(schedulingLabelKey) > 0 {
+			log.Info("setting default value for SchedulingLabelKey",
+				zap.String("schedulingLabelKey", schedulingLabelKey),
+			)
+			c.SchedulingLabelKey = schedulingLabelKey
+		}
+		if len(c.SchedulingLabelValue) == 0 && len(schedulingLabelValue) > 0 {
+			log.Info("setting default value for SchedulingLabelValue",
+				zap.String("schedulingLabelValue", schedulingLabelValue),
+			)
+			c.SchedulingLabelValue = schedulingLabelValue
+		}
+		if len(c.SchedulingLabelKey) == 0 || len(c.SchedulingLabelValue) == 0 {
+			return errMissingSchedulingLabels
+		}
 	}
-	if len(c.SchedulingLabelValue) == 0 && len(schedulingLabelValue) > 0 {
-		log.Info("setting default value for SchedulingLabelValue",
-			zap.String("schedulingLabelValue", schedulingLabelValue),
+	if requireIngressDefaults {
+		var (
+			ingressHost   = configMap.Data[ingressHostKey]
+			ingressSecret = configMap.Data["ingressSecret"]
 		)
-		c.SchedulingLabelValue = schedulingLabelValue
-	}
-
-	// Validate that the scheduling labels are now set
-	if len(c.SchedulingLabelKey) == 0 || len(c.SchedulingLabelValue) == 0 {
-		return errMissingSchedulingLabels
+		if len(c.IngressHost) == 0 && len(ingressHost) > 0 {
+			log.Info("setting default value for IngressHost",
+				zap.String("ingressHost", ingressHost),
+			)
+			c.IngressHost = ingressHost
+		}
+		if len(c.IngressSecret) == 0 && len(ingressSecret) > 0 {
+			log.Info("setting default value for IngressSecret",
+				zap.String("ingressSecret", ingressSecret),
+			)
+			c.IngressSecret = ingressSecret
+		}
+		if len(c.IngressHost) == 0 {
+			return errMissingIngressHost
+		}
 	}
 
 	return nil
@@ -127,6 +160,8 @@ func (c *KubeRuntimeConfig) ensureDefaults(ctx context.Context, log logging.Logg
 type KubeRuntime struct {
 	node *Node
+
+	kubeConfig *restclient.Config
 }
 
 // readState reads the URI and staking address for the node if the node is running.
@@ -145,6 +180,11 @@ func (p *KubeRuntime) readState(ctx context.Context) error {
 		zap.String("statefulSet", statefulSetName),
 	)
 
+	// Validate that it will be possible to construct accessible URIs when running external to the kube cluster
+	if !IsRunningInCluster() && len(runtimeConfig.IngressHost) == 0 {
+		return errors.New("IngressHost must be set when running outside of the kubernetes cluster")
+	}
+
 	clientset, err := p.getClientset()
 	if err != nil {
 		return err
@@ -187,31 +227,34 @@ func (p *KubeRuntime) readState(ctx context.Context) error {
 	return nil
 }
 
-// GetLocalURI retrieves a URI for the node intended to be accessible from this
-// process until the provided cancel function is called.
-func (p *KubeRuntime) GetLocalURI(ctx context.Context) (string, func(), error) {
-	if len(p.node.URI) == 0 {
-		// Assume that an empty URI indicates a need to read pod state
-		if err := p.readState(ctx); err != nil {
-			return "", func() {}, fmt.Errorf("failed to read Pod state: %w", err)
-		}
-	}
-
-	// Use direct pod URI if running inside the cluster
+// GetAccessibleURI retrieves a URI for the node accessible from where
+// this process is running. If the process is running inside a kube
+// cluster, the node and the process will be assumed to be running in the
+// same kube cluster and the node's URI will be used. If the process is
+// running outside of a kube cluster, a URI accessible from outside of
+// the cluster will be used.
+func (p *KubeRuntime) GetAccessibleURI() string {
 	if IsRunningInCluster() {
-		return p.node.URI, func() {}, nil
+		return p.node.URI
 	}
 
-	port, stopChan, err := p.forwardPort(ctx, config.DefaultHTTPPort)
-	if err != nil {
-		return "", nil, err
+	var (
+		protocol      = "http"
+		nodeID        = p.node.NodeID.String()
+		networkUUID   = p.node.network.UUID
+		runtimeConfig = p.runtimeConfig()
+	)
+	// Assume TLS is configured for an ingress secret
+	if len(runtimeConfig.IngressSecret) > 0 {
+		protocol = "https"
 	}
-	return fmt.Sprintf("http://127.0.0.1:%d", port), func() { close(stopChan) }, nil
+
+	return fmt.Sprintf("%s://%s/networks/%s/%s", protocol, runtimeConfig.IngressHost, networkUUID, nodeID)
 }
 
-// GetLocalStakingAddress retrieves a StakingAddress for the node intended to be
+// GetAccessibleStakingAddress retrieves a StakingAddress for the node intended to be
 // accessible from this process until the provided cancel function is called.
-func (p *KubeRuntime) GetLocalStakingAddress(ctx context.Context) (netip.AddrPort, func(), error) {
+func (p *KubeRuntime) GetAccessibleStakingAddress(ctx context.Context) (netip.AddrPort, func(), error) {
 	if p.node.StakingAddress == (netip.AddrPort{}) {
 		// Assume that an empty staking address indicates a need to retrieve pod state
 		if err := p.readState(ctx); err != nil {
@@ -364,6 +407,23 @@ func (p *KubeRuntime) Start(ctx context.Context) error {
 		zap.String("statefulSet", statefulSetName),
 	)
 
+	if !IsRunningInCluster() {
+		// If running outside the cluster, ensure the node's API port is accessible via ingress
+
+		serviceName := "s-" + statefulSetName // The 's-' prefix ensures DNS compatibility
+		if err := p.createNodeService(ctx, serviceName); err != nil {
+			return fmt.Errorf("failed to create Service for node: %w", err)
+		}
+
+		if err := p.createNodeIngress(ctx, serviceName); err != nil {
+			return fmt.Errorf("failed to create Ingress for node: %w", err)
+		}
+
+		if err := p.waitForIngressReadiness(ctx, serviceName); err != nil {
+			return fmt.Errorf("failed to wait for Ingress readiness: %w", err)
+		}
+	}
+
 	return p.ensureBootstrapIP(ctx)
 }
 
@@ -624,9 +684,6 @@ func (p *KubeRuntime) Restart(ctx context.Context) error {
 }
 
 // IsHealthy checks if the node is running and healthy.
-//
-// TODO(marun) Add WaitForHealthy as a runtime method to minimize API calls required and
-// enable reuse of forwarded connection when running external to the kubernetes cluster
 func (p *KubeRuntime) IsHealthy(ctx context.Context) (bool, error) {
 	err := p.readState(ctx)
 	if err != nil {
@@ -636,13 +693,7 @@ func (p *KubeRuntime) IsHealthy(ctx context.Context) (bool, error) {
 		return false, errNotRunning
 	}
 
-	uri, cancel, err := p.GetLocalURI(ctx)
-	if err != nil {
-		return false, err
-	}
-	defer cancel()
-
-	healthReply, err := CheckNodeHealth(ctx, uri)
+	healthReply, err := CheckNodeHealth(ctx, p.GetAccessibleURI())
 	if errors.Is(err, ErrUnrecoverableNodeHealthCheck) {
 		return false, err
 	} else if err != nil {
@@ -769,13 +820,23 @@ func (p *KubeRuntime) runtimeConfig() *KubeRuntimeConfig {
 	return p.node.getRuntimeConfig().Kube
 }
 
+// getKubeconfig retrieves the kubeconfig for the target cluster. It
+// will be cached after the first call to avoid unnecessary logging
+// when running in-cluster.
 func (p *KubeRuntime) getKubeconfig() (*restclient.Config, error) {
-	runtimeConfig := p.runtimeConfig()
-	return GetClientConfig(
-		p.node.network.log,
-		runtimeConfig.ConfigPath,
-		runtimeConfig.ConfigContext,
-	)
+	if p.kubeConfig == nil {
+		runtimeConfig := p.runtimeConfig()
+		config, err := GetClientConfig(
+			p.node.network.log,
+			runtimeConfig.ConfigPath,
+			runtimeConfig.ConfigContext,
+		)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get kubeconfig: %w", err)
+		}
+		p.kubeConfig = config
+	}
+	return p.kubeConfig, nil
 }
 
 func (p *KubeRuntime) getClientset() (*kubernetes.Clientset, error) {
@@ -842,6 +903,10 @@ func (p *KubeRuntime) getFlags() (FlagsMap, error) {
 	flags[config.DataDirKey] = volumeMountPath
 	// The node must bind to the Pod IP to enable the kubelet to access the http port for the readiness check
 	flags[config.HTTPHostKey] = "0.0.0.0"
+	// Ensure compatibility with a non-localhost ingress host
+	if !IsRunningInCluster() && !strings.HasPrefix(p.runtimeConfig().IngressHost, "localhost") {
+		flags[config.HTTPAllowedHostsKey] = p.runtimeConfig().IngressHost
+	}
 
 	return flags, nil
 }
@@ -890,6 +955,310 @@ func configureExclusiveScheduling(template *corev1.PodTemplateSpec, labelKey str
 	}
 }
 
+// createNodeService creates a Kubernetes Service for the node to enable ingress routing
+func (p *KubeRuntime) createNodeService(ctx context.Context, serviceName string) error {
+	var (
+		log           = p.node.network.log
+		nodeID        = p.node.NodeID.String()
+		runtimeConfig = p.runtimeConfig()
+		namespace     = runtimeConfig.Namespace
+	)
+
+	log.Debug("creating Service for node",
+		zap.String("nodeID", nodeID),
+		zap.String("namespace", namespace),
+		zap.String("service", serviceName),
+	)
+
+	clientset, err := p.getClientset()
+	if err != nil {
+		return err
+	}
+
+	service := &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      serviceName,
+			Namespace: namespace,
+			Labels: map[string]string{
+				"app":          serviceName,
+				"network-uuid": p.node.network.UUID,
+				"node-id":      nodeID,
+			},
+		},
+		Spec: corev1.ServiceSpec{
+			Selector: map[string]string{
+				"network_uuid": p.node.network.UUID,
+				"node_id":      nodeID,
+			},
+			Ports: []corev1.ServicePort{
+				{
+					Name:       "http",
+					Port:       config.DefaultHTTPPort,
+					TargetPort: intstr.FromInt(config.DefaultHTTPPort),
+					Protocol:   corev1.ProtocolTCP,
+				},
+			},
+			Type: corev1.ServiceTypeClusterIP,
+		},
+	}
+
+	_, err = clientset.CoreV1().Services(namespace).Create(ctx, service, metav1.CreateOptions{})
+	if err != nil {
+		return fmt.Errorf("failed to create Service: %w", err)
+	}
+
log.Debug("created Service", + zap.String("nodeID", nodeID), + zap.String("namespace", namespace), + zap.String("service", serviceName), + ) + + return nil +} + +// createNodeIngress creates a Kubernetes Ingress for the node to enable external access +func (p *KubeRuntime) createNodeIngress(ctx context.Context, serviceName string) error { + var ( + log = p.node.network.log + nodeID = p.node.NodeID.String() + runtimeConfig = p.runtimeConfig() + namespace = runtimeConfig.Namespace + networkUUID = p.node.network.UUID + ) + + log.Debug("creating Ingress for node", + zap.String("nodeID", nodeID), + zap.String("namespace", namespace), + zap.String("service", serviceName), + ) + + clientset, err := p.getClientset() + if err != nil { + return err + } + + var ( + ingressClassName = "nginx" // Assume nginx ingress controller + // Path pattern: /networks//(/|$)(.*) + // Using (/|$)(.*) to properly handle trailing slashes + pathPattern = fmt.Sprintf("/networks/%s/%s", networkUUID, nodeID) + "(/|$)(.*)" + pathType = networkingv1.PathTypeImplementationSpecific + ) + + // Build the ingress rules + ingressRules := []networkingv1.IngressRule{ + { + IngressRuleValue: networkingv1.IngressRuleValue{ + HTTP: &networkingv1.HTTPIngressRuleValue{ + Paths: []networkingv1.HTTPIngressPath{ + { + Path: pathPattern, + PathType: &pathType, + Backend: networkingv1.IngressBackend{ + Service: &networkingv1.IngressServiceBackend{ + Name: serviceName, + Port: networkingv1.ServiceBackendPort{ + Number: config.DefaultHTTPPort, + }, + }, + }, + }, + }, + }, + }, + }, + } + + // Add host if not localhost + if !strings.HasPrefix(runtimeConfig.IngressHost, "localhost") { + ingressRules[0].Host = runtimeConfig.IngressHost + } + + ingress := &networkingv1.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceName, + Namespace: namespace, + Labels: map[string]string{ + "app": serviceName, + "network-uuid": networkUUID, + "node-id": nodeID, + }, + Annotations: map[string]string{ + "nginx.ingress.kubernetes.io/use-regex": "true", + "nginx.ingress.kubernetes.io/rewrite-target": "/$2", + "nginx.ingress.kubernetes.io/proxy-body-size": "0", + "nginx.ingress.kubernetes.io/proxy-read-timeout": "600", + "nginx.ingress.kubernetes.io/proxy-send-timeout": "600", + }, + }, + Spec: networkingv1.IngressSpec{ + IngressClassName: &ingressClassName, + Rules: ingressRules, + }, + } + + // Add TLS configuration if IngressSecret is set + if len(runtimeConfig.IngressSecret) > 0 && !strings.HasPrefix(runtimeConfig.IngressHost, "localhost") { + ingress.Spec.TLS = []networkingv1.IngressTLS{ + { + Hosts: []string{runtimeConfig.IngressHost}, + SecretName: runtimeConfig.IngressSecret, + }, + } + } + + _, err = clientset.NetworkingV1().Ingresses(namespace).Create(ctx, ingress, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create Ingress: %w", err) + } + + log.Debug("created Ingress", + zap.String("nodeID", nodeID), + zap.String("namespace", namespace), + zap.String("ingress", serviceName), + zap.String("path", pathPattern), + ) + + return nil +} + +// waitForIngressReadiness waits for the ingress to be ready and able to route traffic +// This prevents 503 errors when health checks are performed immediately after node start +func (p *KubeRuntime) waitForIngressReadiness(ctx context.Context, serviceName string) error { + var ( + log = p.node.network.log + nodeID = p.node.NodeID.String() + runtimeConfig = p.runtimeConfig() + namespace = runtimeConfig.Namespace + ) + + log.Debug("waiting for Ingress readiness", + zap.String("nodeID", 
+		zap.String("nodeID", nodeID),
+		zap.String("namespace", namespace),
+		zap.String("ingress", serviceName),
+	)
+
+	clientset, err := p.getClientset()
+	if err != nil {
+		return err
+	}
+
+	// Wait for the ingress to exist, be processed by the controller, and service endpoints to be available
+	err = wait.PollUntilContextCancel(
+		ctx,
+		statusCheckInterval,
+		true, // immediate
+		func(ctx context.Context) (bool, error) {
+			// Check if ingress exists and is processed by the controller
+			ingress, err := clientset.NetworkingV1().Ingresses(namespace).Get(ctx, serviceName, metav1.GetOptions{})
+			if apierrors.IsNotFound(err) {
+				log.Verbo("waiting for Ingress to be created",
+					zap.String("nodeID", nodeID),
+					zap.String("namespace", namespace),
+					zap.String("ingress", serviceName),
+				)
+				return false, nil
+			}
+			if err != nil {
+				log.Warn("failed to retrieve Ingress",
+					zap.String("nodeID", nodeID),
+					zap.String("namespace", namespace),
+					zap.String("ingress", serviceName),
+					zap.Error(err),
+				)
+				return false, nil
+			}
+
+			// Check if the ingress controller has processed the ingress.
+			// The ingress controller should populate the Status.LoadBalancer.Ingress field
+			// when it has successfully processed and exposed the ingress.
+			hasIngressIP := len(ingress.Status.LoadBalancer.Ingress) > 0
+			if !hasIngressIP {
+				log.Verbo("waiting for Ingress controller to process and expose the Ingress",
+					zap.String("nodeID", nodeID),
+					zap.String("namespace", namespace),
+					zap.String("ingress", serviceName),
+				)
+				return false, nil
+			}
+
+			// Validate that at least one ingress has an IP or hostname
+			hasValidIngress := false
+			for _, ing := range ingress.Status.LoadBalancer.Ingress {
+				if ing.IP != "" || ing.Hostname != "" {
+					hasValidIngress = true
+					break
+				}
+			}
+
+			if !hasValidIngress {
+				log.Verbo("waiting for Ingress controller to assign IP or hostname",
+					zap.String("nodeID", nodeID),
+					zap.String("namespace", namespace),
+					zap.String("ingress", serviceName),
+				)
+				return false, nil
+			}
+
+			// Check if service endpoints are available
+			endpoints, err := clientset.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{})
+			if apierrors.IsNotFound(err) {
+				log.Verbo("waiting for Service endpoints to be created",
+					zap.String("nodeID", nodeID),
+					zap.String("namespace", namespace),
+					zap.String("service", serviceName),
+				)
+				return false, nil
+			}
+			if err != nil {
+				log.Warn("failed to retrieve Service endpoints",
+					zap.String("nodeID", nodeID),
+					zap.String("namespace", namespace),
+					zap.String("service", serviceName),
+					zap.Error(err),
+				)
+				return false, nil
+			}
+
+			// Check if endpoints have at least one ready address
+			hasReadyEndpoints := false
+			for _, subset := range endpoints.Subsets {
+				if len(subset.Addresses) > 0 {
+					hasReadyEndpoints = true
+					break
+				}
+			}
+
+			if !hasReadyEndpoints {
+				log.Verbo("waiting for Service endpoints to have ready addresses",
+					zap.String("nodeID", nodeID),
+					zap.String("namespace", namespace),
+					zap.String("service", serviceName),
+				)
+				return false, nil
+			}
+
+			log.Debug("Ingress is exposed by controller and Service endpoints are ready",
+				zap.String("nodeID", nodeID),
+				zap.String("namespace", namespace),
+				zap.String("ingress", serviceName),
+			)
+			return true, nil
+		},
+	)
+	if err != nil {
+		return fmt.Errorf("failed to wait for Ingress %s/%s readiness: %w", namespace, serviceName, err)
+	}
+
+	log.Debug("Ingress is ready",
+		zap.String("nodeID", nodeID),
+		zap.String("namespace", namespace),
+		zap.String("ingress", serviceName),
+	)
+
+	return nil
+}
+
 // IsRunningInCluster detects if this code is running inside a Kubernetes cluster
 // by checking for the presence of the service account token that's automatically
 // mounted in every pod.
diff --git a/tests/fixture/tmpnet/monitor_kube.go b/tests/fixture/tmpnet/monitor_kube.go
index 1bd5cc11cbf2..6785195c45d0 100644
--- a/tests/fixture/tmpnet/monitor_kube.go
+++ b/tests/fixture/tmpnet/monitor_kube.go
@@ -36,8 +36,8 @@ type kubeCollectorConfig struct {
 	manifest []byte
 }
 
-// DeployKubeCollectors deploys collectors of logs and metrics to a Kubernetes cluster.
-func DeployKubeCollectors(
+// deployKubeCollectors deploys collectors of logs and metrics to a Kubernetes cluster.
+func deployKubeCollectors(
 	ctx context.Context,
 	log logging.Logger,
 	configPath string,
diff --git a/tests/fixture/tmpnet/network.go b/tests/fixture/tmpnet/network.go
index b990ed114393..ecbe1e18de2b 100644
--- a/tests/fixture/tmpnet/network.go
+++ b/tests/fixture/tmpnet/network.go
@@ -449,11 +449,7 @@ func (n *Network) Bootstrap(ctx context.Context, log logging.Logger) error {
 	}
 
 	// Don't restart the node during subnet creation since it will always be restarted afterwards.
-	uri, cancel, err := bootstrapNode.GetLocalURI(ctx)
-	if err != nil {
-		return err
-	}
-	defer cancel()
+	uri := bootstrapNode.GetAccessibleURI()
 	if err := n.CreateSubnets(ctx, log, uri, false /* restartRequired */); err != nil {
 		return err
 	}
@@ -784,11 +780,9 @@ func (n *Network) GetNode(nodeID ids.NodeID) (*Node, error) {
 	return nil, fmt.Errorf("%s is not known to the network", nodeID)
 }
 
-// GetNodeURIs returns the URIs of nodes in the network that are running and not ephemeral. The URIs
-// returned are guaranteed be reachable by the caller until the cleanup function is called regardless
-// of whether the nodes are running as local processes or in a kube cluster.
-func (n *Network) GetNodeURIs(ctx context.Context, deferCleanupFunc func(func())) ([]NodeURI, error) {
-	return GetNodeURIs(ctx, n.Nodes, deferCleanupFunc)
+// GetNodeURIs returns the accessible URIs of nodes in the network that are running and not ephemeral.
+func (n *Network) GetNodeURIs() []NodeURI {
+	return GetNodeURIs(n.Nodes)
 }
 
 // GetAvailableNodeIDs returns the node IDs of nodes in the network that are running and not ephemeral.
@@ -969,7 +963,7 @@ func waitForHealthy(ctx context.Context, log logging.Logger, nodes []*Node) erro
 			unhealthyNodes.Remove(node)
 			log.Info("node is healthy",
 				zap.Stringer("nodeID", node.NodeID),
-				zap.String("uri", node.URI),
+				zap.String("uri", node.GetAccessibleURI()),
 			)
 		}
diff --git a/tests/fixture/tmpnet/node.go b/tests/fixture/tmpnet/node.go
index a76eb02a7e44..347253356b68 100644
--- a/tests/fixture/tmpnet/node.go
+++ b/tests/fixture/tmpnet/node.go
@@ -41,8 +41,8 @@ var (
 
 // NodeRuntime defines the methods required to support running a node.
 type NodeRuntime interface {
 	readState(ctx context.Context) error
-	GetLocalURI(ctx context.Context) (string, func(), error)
-	GetLocalStakingAddress(ctx context.Context) (netip.AddrPort, func(), error)
+	GetAccessibleURI() string
+	GetAccessibleStakingAddress(ctx context.Context) (netip.AddrPort, func(), error)
 	Start(ctx context.Context) error
 	InitiateStop(ctx context.Context) error
 	WaitForStopped(ctx context.Context) error
@@ -199,12 +199,12 @@ func (n *Node) readState(ctx context.Context) error {
 	return n.getRuntime().readState(ctx)
 }
 
-func (n *Node) GetLocalURI(ctx context.Context) (string, func(), error) {
-	return n.getRuntime().GetLocalURI(ctx)
+func (n *Node) GetAccessibleURI() string {
+	return n.getRuntime().GetAccessibleURI()
 }
 
-func (n *Node) GetLocalStakingAddress(ctx context.Context) (netip.AddrPort, func(), error) {
-	return n.getRuntime().GetLocalStakingAddress(ctx)
+func (n *Node) GetAccessibleStakingAddress(ctx context.Context) (netip.AddrPort, func(), error) {
+	return n.getRuntime().GetAccessibleStakingAddress(ctx)
 }
 
 // Writes the current state of the metrics endpoint to disk
@@ -213,12 +213,7 @@ func (n *Node) SaveMetricsSnapshot(ctx context.Context) error {
 		// No URI to request metrics from
 		return nil
 	}
-	baseURI, cancel, err := n.GetLocalURI(ctx)
-	if err != nil {
-		return nil
-	}
-	defer cancel()
-	uri := baseURI + "/ext/metrics"
+	uri := n.GetAccessibleURI() + "/ext/metrics"
 	req, err := http.NewRequestWithContext(ctx, http.MethodGet, uri, nil)
 	if err != nil {
 		return err
diff --git a/tests/fixture/tmpnet/process_runtime.go b/tests/fixture/tmpnet/process_runtime.go
index 9088f0928210..94d7154c3d59 100644
--- a/tests/fixture/tmpnet/process_runtime.go
+++ b/tests/fixture/tmpnet/process_runtime.go
@@ -417,11 +417,12 @@ func (p *ProcessRuntime) writeMonitoringConfigFile(name string, config []ConfigM
 	return nil
 }
 
-func (p *ProcessRuntime) GetLocalURI(_ context.Context) (string, func(), error) {
-	return p.node.URI, func() {}, nil
+// GetAccessibleURI returns the URI that can be used to access the node's API.
+func (p *ProcessRuntime) GetAccessibleURI() string {
+	return p.node.URI
 }
 
-func (p *ProcessRuntime) GetLocalStakingAddress(_ context.Context) (netip.AddrPort, func(), error) {
+func (p *ProcessRuntime) GetAccessibleStakingAddress(_ context.Context) (netip.AddrPort, func(), error) {
 	return p.node.StakingAddress, func() {}, nil
 }
 
diff --git a/tests/fixture/tmpnet/start_kind_cluster.go b/tests/fixture/tmpnet/start_kind_cluster.go
index 1957005c4d21..f8d4299875c7 100644
--- a/tests/fixture/tmpnet/start_kind_cluster.go
+++ b/tests/fixture/tmpnet/start_kind_cluster.go
@@ -13,6 +13,7 @@ import (
 	"strings"
 
 	"go.uber.org/zap"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/clientcmd"
@@ -37,6 +38,13 @@ const (
 
 	// TODO(marun) Check for the presence of the context rather than string matching on this error
 	missingContextMsg = `context "` + KindKubeconfigContext + `" does not exist`
+
+	// Ingress controller constants
+	ingressNamespace      = "ingress-nginx"
+	ingressReleaseName    = "ingress-nginx"
+	ingressChartRepo      = "https://kubernetes.github.io/ingress-nginx"
+	ingressChartName      = "ingress-nginx/ingress-nginx"
+	ingressControllerName = "ingress-nginx-controller"
 )
 
 //go:embed yaml/tmpnet-rbac.yaml
@@ -96,10 +104,18 @@ func StartKindCluster(
 		return fmt.Errorf("failed to create service account kubeconfig context: %w", err)
 	}
 
-	if err := DeployKubeCollectors(ctx, log, configPath, configContext, startMetricsCollector, startLogsCollector); err != nil {
+	if err := deployKubeCollectors(ctx, log, configPath, configContext, startMetricsCollector, startLogsCollector); err != nil {
 		return fmt.Errorf("failed to deploy kube collectors: %w", err)
 	}
 
+	if err := deployIngressController(ctx, log, configPath, configContext); err != nil {
+		return fmt.Errorf("failed to deploy ingress controller: %w", err)
+	}
+
+	if err := createDefaultsConfigMap(ctx, log, configPath, configContext, DefaultTmpnetNamespace); err != nil {
+		return fmt.Errorf("failed to create defaults ConfigMap: %w", err)
+	}
+
 	return nil
 }
 
@@ -227,13 +243,18 @@ func createServiceAccountKubeconfig(
 		return fmt.Errorf("failed to load kubeconfig: %w", err)
 	}
 
-	// Check if the context already exists
 	if _, exists := config.Contexts[newContextName]; exists {
-		log.Info("service account kubeconfig context already exists",
+		log.Info("service account kubeconfig context exists, recreating to ensure consistency with cluster state",
+			zap.String("kubeconfig", configPath),
+			zap.String("context", newContextName),
+			zap.String("namespace", namespace),
+		)
+	} else {
+		log.Info("creating new service account kubeconfig context",
+			zap.String("kubeconfig", configPath),
 			zap.String("context", newContextName),
 			zap.String("namespace", namespace),
 		)
-		return nil
 	}
 
 	// Get the current context (already verified to exist by StartKindCluster)
@@ -279,9 +300,162 @@ func createServiceAccountKubeconfig(
 	}
 
 	log.Info("created service account kubeconfig context",
+		zap.String("kubeconfig", configPath),
 		zap.String("context", newContextName),
 		zap.String("namespace", namespace),
 	)
 
 	return nil
 }
+
+// deployIngressController deploys the nginx ingress controller using Helm.
+func deployIngressController(ctx context.Context, log logging.Logger, configPath string, configContext string) error {
+	log.Info("checking if nginx ingress controller is already running")
+
+	isRunning, err := isIngressControllerRunning(ctx, log, configPath, configContext)
+	if err != nil {
+		return fmt.Errorf("failed to check nginx ingress controller status: %w", err)
+	}
+	if isRunning {
+		log.Info("nginx ingress controller already running")
+		return nil
+	}
+
+	log.Info("deploying nginx ingress controller using Helm")
+
+	// Add the helm repo for ingress-nginx
+	if err := runHelmCommand(ctx, "repo", "add", "ingress-nginx", ingressChartRepo); err != nil {
+		return fmt.Errorf("failed to add helm repo: %w", err)
+	}
+	if err := runHelmCommand(ctx, "repo", "update"); err != nil {
+		return fmt.Errorf("failed to update helm repos: %w", err)
+	}
+
+	// Install nginx-ingress with values set directly via flags
+	// Using fixed nodePort 30791 for cross-platform compatibility
+	args := []string{
+		"install",
+		ingressReleaseName,
+		ingressChartName,
+		"--namespace", ingressNamespace,
+		"--create-namespace",
+		"--wait",
+		"--set", "controller.service.type=NodePort",
+		// This port value must match the port configured in scripts/kind-with-registry.sh
+		"--set", "controller.service.nodePorts.http=30791",
+		"--set", "controller.admissionWebhooks.enabled=false",
+		"--set", "controller.config.proxy-read-timeout=600",
+		"--set", "controller.config.proxy-send-timeout=600",
+		"--set", "controller.config.proxy-body-size=0",
+		"--set", "controller.config.proxy-http-version=1.1",
+		"--set", "controller.metrics.enabled=true",
+	}
+
+	if err := runHelmCommand(ctx, args...); err != nil {
+		return fmt.Errorf("failed to install nginx-ingress: %w", err)
+	}
+
+	return waitForIngressController(ctx, log, configPath, configContext)
+}
+
+// isIngressControllerRunning checks if the nginx ingress controller is already running.
+func isIngressControllerRunning(ctx context.Context, log logging.Logger, configPath string, configContext string) (bool, error) {
+	clientset, err := GetClientset(log, configPath, configContext)
+	if err != nil {
+		return false, err
+	}
+
+	// TODO(marun) Handle the case of the deployment being in a failed state
+	_, err = clientset.AppsV1().Deployments(ingressNamespace).Get(ctx, ingressControllerName, metav1.GetOptions{})
+	if apierrors.IsNotFound(err) {
+		return false, nil
+	}
+	if err != nil {
+		// Don't treat a transient API error as evidence the controller is running
+		return false, err
+	}
+	return true, nil
+}
+
+// waitForIngressController waits for the nginx ingress controller to be ready.
+func waitForIngressController(ctx context.Context, log logging.Logger, configPath string, configContext string) error {
+	clientset, err := GetClientset(log, configPath, configContext)
+	if err != nil {
+		return fmt.Errorf("failed to get clientset: %w", err)
+	}
+
+	return wait.PollUntilContextCancel(ctx, statusCheckInterval, true /* immediate */, func(ctx context.Context) (bool, error) {
+		deployment, err := clientset.AppsV1().Deployments(ingressNamespace).Get(ctx, ingressControllerName, metav1.GetOptions{})
+		if err != nil {
+			log.Debug("failed to get nginx ingress controller deployment",
+				zap.String("namespace", ingressNamespace),
+				zap.String("deployment", ingressControllerName),
+				zap.Error(err),
+			)
+			return false, nil
+		}
+		if deployment.Status.ReadyReplicas == 0 {
+			log.Debug("waiting for nginx ingress controller to become ready",
+				zap.String("namespace", ingressNamespace),
+				zap.String("deployment", ingressControllerName),
+				zap.Int32("readyReplicas", deployment.Status.ReadyReplicas),
+				zap.Int32("replicas", deployment.Status.Replicas),
+			)
+			return false, nil
+		}
+
+		log.Info("nginx ingress controller is ready",
+			zap.String("namespace", ingressNamespace),
+			zap.String("deployment", ingressControllerName),
+			zap.Int32("readyReplicas", deployment.Status.ReadyReplicas),
+		)
+		return true, nil
+	})
+}
+
+// runHelmCommand runs a Helm command with the given arguments.
+func runHelmCommand(ctx context.Context, args ...string) error {
+	cmd := exec.CommandContext(ctx, "helm", args...)
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	return cmd.Run()
+}
+
+// createDefaultsConfigMap creates a ConfigMap containing defaults for the tmpnet namespace.
+func createDefaultsConfigMap(ctx context.Context, log logging.Logger, configPath string, configContext string, namespace string) error {
+	clientset, err := GetClientset(log, configPath, configContext)
+	if err != nil {
+		return fmt.Errorf("failed to get clientset: %w", err)
+	}
+
+	configMapName := defaultsConfigMapName
+
+	// Check if the configmap already exists
+	_, err = clientset.CoreV1().ConfigMaps(namespace).Get(ctx, configMapName, metav1.GetOptions{})
+	if err == nil {
+		log.Info("defaults ConfigMap already exists",
+			zap.String("namespace", namespace),
+			zap.String("configMap", configMapName),
+		)
+		return nil
+	}
+	if !apierrors.IsNotFound(err) {
+		return fmt.Errorf("failed to check for configmap %s/%s: %w", namespace, configMapName, err)
+	}
+
+	log.Info("creating defaults ConfigMap",
+		zap.String("namespace", namespace),
+		zap.String("configMap", configMapName),
+	)
+
+	configMap := &corev1.ConfigMap{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      configMapName,
+			Namespace: namespace,
+		},
+		Data: map[string]string{
+			ingressHostKey: "localhost:30791",
+		},
+	}
+
+	_, err = clientset.CoreV1().ConfigMaps(namespace).Create(ctx, configMap, metav1.CreateOptions{})
+	if err != nil {
+		return fmt.Errorf("failed to create configmap %s/%s: %w", namespace, configMapName, err)
+	}
+
+	return nil
+}
diff --git a/tests/fixture/tmpnet/utils.go b/tests/fixture/tmpnet/utils.go
index e80070326900..121fe6a062bb 100644
--- a/tests/fixture/tmpnet/utils.go
+++ b/tests/fixture/tmpnet/utils.go
@@ -44,6 +44,14 @@ func CheckNodeHealth(ctx context.Context, uri string) (*health.APIReply, error)
 			return nil, err
 		}
 	}
+
+	// Assume `503 Service Unavailable` is the result of the ingress
+	// for the node not being ready.
+	// TODO(marun) Update Client.Health() to return a typed error
+	if err != nil && err.Error() == "received status code: 503" {
+		return nil, err
+	}
+
+	// Assume all other errors are not recoverable
 	return nil, fmt.Errorf("%w: %w", ErrUnrecoverableNodeHealthCheck, err)
 }
 
@@ -54,25 +62,18 @@ type NodeURI struct {
 	URI string
 }
 
-// GetNodeURIs returns the URIs of the provided nodes that are running and not ephemeral. The URIs returned
-// are guaranteed be reachable by the caller until the cleanup function is called regardless of whether the
-// nodes are running as local processes or in a kube cluster.
-func GetNodeURIs(ctx context.Context, nodes []*Node, deferCleanupFunc func(func())) ([]NodeURI, error) {
+// GetNodeURIs returns the accessible URIs of the provided nodes that are running and not ephemeral.
+func GetNodeURIs(nodes []*Node) []NodeURI {
 	availableNodes := FilterAvailableNodes(nodes)
 	uris := []NodeURI{}
 	for _, node := range availableNodes {
-		uri, cancel, err := node.GetLocalURI(ctx)
-		if err != nil {
-			return nil, err
-		}
-		deferCleanupFunc(cancel)
 		uris = append(uris, NodeURI{
 			NodeID: node.NodeID,
-			URI:    uri,
+			URI:    node.GetAccessibleURI(),
 		})
 	}
-	return uris, nil
+	return uris
 }
 
 // FilteredAvailableNodes filters the provided nodes by whether they are running and not ephemeral.
@@ -96,15 +97,10 @@ func FilterAvailableNodes(nodes []*Node) []*Node {
 // blockchain ID, in the form "ws://<uri>/ext/bc/<blockchainID>/ws".
 // Ephemeral and stopped nodes are ignored.
 func GetNodeWebsocketURIs(
-	ctx context.Context,
 	nodes []*Node,
 	blockchainID string,
-	deferCleanupFunc func(func()),
 ) ([]string, error) {
-	nodeURIs, err := GetNodeURIs(ctx, nodes, deferCleanupFunc)
-	if err != nil {
-		return nil, fmt.Errorf("failed to get node URIs: %w", err)
-	}
+	nodeURIs := GetNodeURIs(nodes)
 	wsURIs := make([]string, len(nodeURIs))
 	for i := range nodeURIs {
 		uri, err := url.Parse(nodeURIs[i].URI)
diff --git a/tests/fixture/tmpnet/yaml/tmpnet-rbac.yaml b/tests/fixture/tmpnet/yaml/tmpnet-rbac.yaml
index 0d3056614a85..21db7582fd0f 100644
--- a/tests/fixture/tmpnet/yaml/tmpnet-rbac.yaml
+++ b/tests/fixture/tmpnet/yaml/tmpnet-rbac.yaml
@@ -11,6 +11,7 @@ metadata:
   name: tmpnet
   namespace: tmpnet
 rules:
+# Regular usage
 - apiGroups: ["apps"]
   resources: ["statefulsets"]
   verbs: ["get", "create", "update", "patch"]
@@ -23,6 +24,19 @@ rules:
 - apiGroups: [""]
   resources: ["pods/portforward"]
   verbs: ["create"]
+# Enable external node access via ingress
+- apiGroups: ["networking.k8s.io"]
+  resources: ["ingresses"]
+  verbs: ["get", "create"]
+- apiGroups: [""]
+  resources: ["configmaps"]
+  verbs: ["get"]
+- apiGroups: [""]
+  resources: ["endpoints"]
+  verbs: ["get"]
+- apiGroups: [""]
+  resources: ["services"]
+  verbs: ["create"]
 ---
 apiVersion: rbac.authorization.k8s.io/v1
 kind: RoleBinding
diff --git a/tests/load/c/main/main.go b/tests/load/c/main/main.go
index 676293d173af..fdee7b7eaa42 100644
--- a/tests/load/c/main/main.go
+++ b/tests/load/c/main/main.go
@@ -88,7 +88,7 @@ func main() {
 		)
 	})
 
-	endpoints, err := tmpnet.GetNodeWebsocketURIs(ctx, network.Nodes, blockchainID, tc.DeferCleanup)
+	endpoints, err := tmpnet.GetNodeWebsocketURIs(network.Nodes, blockchainID)
 	require.NoError(err, "failed to get node websocket URIs")
 
 	w := &workload{
diff --git a/tests/load2/main/main.go b/tests/load2/main/main.go
index 07646d4851c7..1985f8f6902d 100644
--- a/tests/load2/main/main.go
+++ b/tests/load2/main/main.go
@@ -69,7 +69,7 @@ func main() {
 	e2e.NewTestEnvironment(tc, flagVars, network)
 
 	ctx := tests.DefaultNotifyContext(0, tc.DeferCleanup)
-	wsURIs, err := tmpnet.GetNodeWebsocketURIs(ctx, network.Nodes, blockchainID, tc.DeferCleanup)
+	wsURIs, err := tmpnet.GetNodeWebsocketURIs(network.Nodes, blockchainID)
 	require.NoError(err)
 
 	registry := prometheus.NewRegistry()
diff --git a/tests/log.go b/tests/log.go
index a431feb3c1ec..134cc9cd1f78 100644
--- a/tests/log.go
+++ b/tests/log.go
@@ -25,5 +25,6 @@ func LoggerForFormat(prefix string, rawLogFormat string) (logging.Logger, error)
 	if err != nil {
 		return nil, err
 	}
-	return logging.NewLogger(prefix, logging.NewWrappedCore(logging.Verbo, writeCloser, logFormat.ConsoleEncoder())), nil
+	// TODO(marun) Make the log level configurable
+	return logging.NewLogger(prefix, logging.NewWrappedCore(logging.Debug, writeCloser, logFormat.ConsoleEncoder())), nil
 }
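
Reviewer note: the following standalone sketch is not part of the diff, and all names and values in it are illustrative assumptions. It mirrors the URI scheme introduced by KubeRuntime.GetAccessibleURI ({http|https}://{ingressHost}/networks/{networkUUID}/{nodeID}) and the 503-retry tolerance added to CheckNodeHealth above, using only the Go standard library, to show how a process outside the cluster is expected to reach a node:

package main

import (
	"fmt"
	"net/http"
	"time"
)

// accessibleURI mirrors KubeRuntime.GetAccessibleURI: TLS is assumed
// whenever an ingress secret is configured.
func accessibleURI(ingressHost, ingressSecret, networkUUID, nodeID string) string {
	protocol := "http"
	if len(ingressSecret) > 0 {
		protocol = "https"
	}
	return fmt.Sprintf("%s://%s/networks/%s/%s", protocol, ingressHost, networkUUID, nodeID)
}

// waitForHealthy polls the node's health endpoint, retrying on 503 since
// that usually means the nginx ingress is not routing to the node yet.
func waitForHealthy(uri string) error {
	for attempt := 0; attempt < 30; attempt++ {
		resp, err := http.Get(uri + "/ext/health")
		if err == nil {
			resp.Body.Close()
			switch resp.StatusCode {
			case http.StatusOK:
				return nil // node is healthy and reachable through the ingress
			case http.StatusServiceUnavailable:
				// Ingress not ready; fall through and retry
			default:
				return fmt.Errorf("unexpected status: %s", resp.Status)
			}
		}
		time.Sleep(time.Second)
	}
	return fmt.Errorf("node at %s never became healthy", uri)
}

func main() {
	// With the kind defaults ConfigMap above, ingressHost resolves to
	// localhost:30791; the network UUID and node ID are hypothetical placeholders.
	uri := accessibleURI("localhost:30791", "", "example-network-uuid", "NodeID-example")
	fmt.Println("waiting for", uri)
	if err := waitForHealthy(uri); err != nil {
		fmt.Println(err)
	}
}

Since the rewrite-target annotation strips the /networks/{networkUUID}/{nodeID} prefix before forwarding, the node itself still serves its API at the root path, which is why the sketch can append /ext/health directly to the accessible URI.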