From 8191d2fc0e7ca3b1cdf7c0b377083c9e0bf27ee8 Mon Sep 17 00:00:00 2001 From: 01377880 Date: Wed, 28 May 2025 10:30:00 +0800 Subject: [PATCH] Add:support-evict-yarn-container Signed-off-by: jimmysenior <29084098@qq.com> Add:support-evict-yarn-container Signed-off-by: jimmysenior <29084098@qq.com> --- cmd/yarn-copilot-agent/main.go | 19 +- pkg/copilot-agent/nm/nm.go | 48 +- pkg/copilot-agent/nm/nm_test.go | 363 ++++++++++++++ pkg/copilot-agent/nm/types.go | 8 + pkg/copilot-agent/server/helper.go | 11 +- pkg/copilot-agent/server/server.go | 153 +++++- pkg/copilot-agent/server/server_test.go | 641 ++++++++++++++++++++++++ 7 files changed, 1212 insertions(+), 31 deletions(-) create mode 100644 pkg/copilot-agent/nm/nm_test.go create mode 100644 pkg/copilot-agent/server/server_test.go diff --git a/cmd/yarn-copilot-agent/main.go b/cmd/yarn-copilot-agent/main.go index 70c819949..f0b17df58 100644 --- a/cmd/yarn-copilot-agent/main.go +++ b/cmd/yarn-copilot-agent/main.go @@ -18,7 +18,9 @@ package main import ( "flag" + "k8s.io/client-go/rest" "os" + "sigs.k8s.io/controller-runtime/pkg/client/config" "time" statesinformer "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer/impl" @@ -53,7 +55,11 @@ func main() { klog.Infof("args: %s = %s", f.Name, f.Value) }) stopCtx := signals.SetupSignalHandler() - kubelet, _ := statesinformer.NewKubeletStub("127.0.0.1", 10255, "http", time.Second*5, nil) + restConfig, err := initRestConfig() + if err != nil { + klog.Fatal(err) + } + kubelet, _ := statesinformer.NewKubeletStub("127.0.0.1", 10250, "https", time.Second*5, restConfig) operator, err := nm.NewNodeMangerOperator(conf.CgroupRootDir, conf.YarnContainerCgroupPath, conf.SyncMemoryCgroup, conf.NodeMangerEndpoint, conf.SyncCgroupPeriod, kubelet) if err != nil { klog.Fatal(err) @@ -68,3 +74,14 @@ func main() { klog.Fatal(err) } } + +func initRestConfig() (*rest.Config, error) { + restConfig, err := config.GetConfig() + if err != nil { + return nil, err + } + restConfig.TLSClientConfig.Insecure = true + restConfig.TLSClientConfig.CAData = nil + restConfig.TLSClientConfig.CAFile = "" + return restConfig, nil +} diff --git a/pkg/copilot-agent/nm/nm.go b/pkg/copilot-agent/nm/nm.go index 434aa9e8a..67b269dc8 100644 --- a/pkg/copilot-agent/nm/nm.go +++ b/pkg/copilot-agent/nm/nm.go @@ -38,7 +38,16 @@ const ( MemoryMoveChargeAtImmigrateName = "memory.move_charge_at_immigrate" ) -type NodeMangerOperator struct { +type NodeMangerOperator interface { + Run(stop <-chan struct{}) error + KillContainer(containerID string) error + ListContainers() (*Containers, error) + GetContainer(containerID string) (*YarnContainer, error) + GenerateCgroupPath(containerID string) string + GenerateCgroupFullPath(cgroupSubSystem string) string +} + +type nodeMangerOperator struct { CgroupRoot string CgroupPath string @@ -52,7 +61,7 @@ type NodeMangerOperator struct { nmTicker *time.Ticker } -func NewNodeMangerOperator(cgroupRoot string, cgroupPath string, syncMemoryCgroup bool, endpoint string, syncPeriod time.Duration, kubelet statesinformer.KubeletStub) (*NodeMangerOperator, error) { +func NewNodeMangerOperator(cgroupRoot string, cgroupPath string, syncMemoryCgroup bool, endpoint string, syncPeriod time.Duration, kubelet statesinformer.KubeletStub) (NodeMangerOperator, error) { watcher, err := pleg.NewWatcher() if err != nil { return nil, err @@ -60,7 +69,7 @@ func NewNodeMangerOperator(cgroupRoot string, cgroupPath string, syncMemoryCgrou cli := resty.New() cli.SetBaseURL(fmt.Sprintf("http://%s", endpoint)) w := 
NewNMPodWater(kubelet) - return &NodeMangerOperator{ + operator := &nodeMangerOperator{ CgroupRoot: cgroupRoot, CgroupPath: cgroupPath, SyncMemoryCgroup: syncMemoryCgroup, @@ -70,10 +79,11 @@ func NewNodeMangerOperator(cgroupRoot string, cgroupPath string, syncMemoryCgrou nmPodWatcher: w, ticker: time.NewTicker(syncPeriod), nmTicker: time.NewTicker(time.Second), - }, nil + } + return operator, nil } -func (n *NodeMangerOperator) Run(stop <-chan struct{}) error { +func (n *nodeMangerOperator) Run(stop <-chan struct{}) error { klog.Infof("Run node manager operator") if n.SyncMemoryCgroup { return n.syncMemoryCgroup(stop) @@ -81,7 +91,7 @@ func (n *NodeMangerOperator) Run(stop <-chan struct{}) error { return nil } -func (n *NodeMangerOperator) syncMemoryCgroup(stop <-chan struct{}) error { +func (n *nodeMangerOperator) syncMemoryCgroup(stop <-chan struct{}) error { cpuDir := filepath.Join(n.CgroupRoot, system.CgroupCPUDir, n.CgroupPath) if err := n.ensureCgroupDir(cpuDir); err != nil { klog.Error(err) @@ -119,7 +129,7 @@ func (n *NodeMangerOperator) syncMemoryCgroup(stop <-chan struct{}) error { } } -func (n *NodeMangerOperator) syncNoneProcCgroup() { +func (n *nodeMangerOperator) syncNoneProcCgroup() { klog.V(5).Info("syncNoneProcCgroup") cpuPath := n.GenerateCgroupFullPath(system.CgroupCPUDir) _ = filepath.Walk(cpuPath, func(path string, info os.FileInfo, err error) error { @@ -147,7 +157,7 @@ func (n *NodeMangerOperator) syncNoneProcCgroup() { }) } -func (n *NodeMangerOperator) syncNMEndpoint() { +func (n *nodeMangerOperator) syncNMEndpoint() { endpoint, exist, err := n.nmPodWatcher.GetNMPodEndpoint() if err != nil { klog.Error(err) @@ -163,7 +173,7 @@ func (n *NodeMangerOperator) syncNMEndpoint() { } } -func (n *NodeMangerOperator) syncAllCgroup() { +func (n *nodeMangerOperator) syncAllCgroup() { subDirFunc := func(dir string) map[string]struct{} { res := map[string]struct{}{} _ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { @@ -190,7 +200,7 @@ func (n *NodeMangerOperator) syncAllCgroup() { } } -func (n *NodeMangerOperator) syncParentCgroup() error { +func (n *nodeMangerOperator) syncParentCgroup() error { containers, err := n.ListContainers() if err != nil { klog.Error(err) @@ -214,7 +224,7 @@ func (n *NodeMangerOperator) syncParentCgroup() error { return nil } -func (n *NodeMangerOperator) removeMemoryCgroup(fileName string) { +func (n *nodeMangerOperator) removeMemoryCgroup(fileName string) { klog.V(5).Infof("receive file delete event %s", fileName) basename := filepath.Base(fileName) if !strings.HasPrefix(basename, "container_") { @@ -229,7 +239,7 @@ func (n *NodeMangerOperator) removeMemoryCgroup(fileName string) { klog.V(5).Infof("yarn container dir %v removed", basename) } -func (n *NodeMangerOperator) createMemoryCgroup(fileName string) { +func (n *nodeMangerOperator) createMemoryCgroup(fileName string) { klog.V(5).Infof("receive file create event %s", fileName) basename := filepath.Base(fileName) if !strings.HasPrefix(basename, "container_") { @@ -277,7 +287,7 @@ func (n *NodeMangerOperator) createMemoryCgroup(fileName string) { klog.V(5).Infof("set memory %s limit_in_bytes as %d", memCgroupPath, memLimit) } -func (n *NodeMangerOperator) ensureCgroupDir(dir string) error { +func (n *nodeMangerOperator) ensureCgroupDir(dir string) error { klog.V(5).Infof("ensure cgroup dir %s", dir) f, err := os.Stat(dir) if err != nil && !os.IsNotExist(err) { @@ -296,7 +306,7 @@ func (n *NodeMangerOperator) ensureCgroupDir(dir string) error { } // KillContainer kill 
process group for target container -func (n *NodeMangerOperator) KillContainer(containerID string) error { +func (n *nodeMangerOperator) KillContainer(containerID string) error { processGroupID := n.getProcessGroupID(containerID) if processGroupID <= 1 { return fmt.Errorf("invalid process group pid(%d) for container %s", processGroupID, containerID) @@ -304,7 +314,7 @@ func (n *NodeMangerOperator) KillContainer(containerID string) error { return syscall.Kill(-processGroupID, syscall.SIGKILL) } -func (n *NodeMangerOperator) getProcessGroupID(containerID string) int { +func (n *nodeMangerOperator) getProcessGroupID(containerID string) int { containerCgroupPath := filepath.Join(n.CgroupRoot, "cpu", n.CgroupPath, containerID) pids, err := utils.GetPids(containerCgroupPath) if err != nil { @@ -323,7 +333,7 @@ type Containers struct { } `json:"containers"` } -func (n *NodeMangerOperator) ListContainers() (*Containers, error) { +func (n *nodeMangerOperator) ListContainers() (*Containers, error) { var res Containers resp, err := n.client.R().SetResult(&res).Get("/ws/v1/node/containers") if err != nil { @@ -335,7 +345,7 @@ func (n *NodeMangerOperator) ListContainers() (*Containers, error) { return &res, nil } -func (n *NodeMangerOperator) GetContainer(containerID string) (*YarnContainer, error) { +func (n *nodeMangerOperator) GetContainer(containerID string) (*YarnContainer, error) { listContainers, err := n.ListContainers() if err != nil { return nil, err @@ -348,10 +358,10 @@ func (n *NodeMangerOperator) GetContainer(containerID string) (*YarnContainer, e return nil, fmt.Errorf("container Not Found") } -func (n *NodeMangerOperator) GenerateCgroupPath(containerID string) string { +func (n *nodeMangerOperator) GenerateCgroupPath(containerID string) string { return filepath.Join(n.CgroupPath, containerID) } -func (n *NodeMangerOperator) GenerateCgroupFullPath(cgroupSubSystem string) string { +func (n *nodeMangerOperator) GenerateCgroupFullPath(cgroupSubSystem string) string { return filepath.Join(n.CgroupRoot, cgroupSubSystem, n.CgroupPath) } diff --git a/pkg/copilot-agent/nm/nm_test.go b/pkg/copilot-agent/nm/nm_test.go new file mode 100644 index 000000000..88293c418 --- /dev/null +++ b/pkg/copilot-agent/nm/nm_test.go @@ -0,0 +1,363 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package nm + +import ( + "context" + "encoding/json" + "fmt" + "github.com/go-resty/resty/v2" + statesinformer "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer/impl" + "github.com/koordinator-sh/yarn-copilot/cmd/yarn-copilot-agent/options" + "github.com/stretchr/testify/assert" + "k8s.io/klog/v2" + "log" + "net/http" + "os" + "path/filepath" + "runtime" + "sync" + "testing" + "time" +) + +func Test_NodeMangerOperator_NewNodeMangerOperator(t *testing.T) { + operator := initNodeManagerOperator() + klog.Infof("operator: %v", &operator) + time.Sleep(3 * time.Second) +} + +func initNodeManagerOperator() NodeMangerOperator { + original := runtime.GOOS + var operator NodeMangerOperator + var err error + if original != "darwin" && original != "windows" { + conf := options.NewConfiguration() + conf.SyncMemoryCgroup = true + conf.CgroupRootDir = "/tmp" + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Ensures cleanup after test + operator, err = NewNodeMangerOperator(conf.CgroupRootDir, conf.YarnContainerCgroupPath, conf.SyncMemoryCgroup, conf.NodeMangerEndpoint, conf.SyncCgroupPeriod, nil) + if err != nil { + klog.Fatal(err) + } + go func() { + if err := operator.Run(ctx.Done()); err != nil { + klog.Error(err) + } + }() + } + return operator +} + +func Test_NodeMangerOperator_KillContainer(t *testing.T) { + containerId := "container_e517_1746198264116_5225_01_000003" + operator := initNodeManagerOperator() + if operator != nil { + err := operator.KillContainer(containerId) + assert.Error(t, err) + } +} + +func Test_NodeMangerOperator_ListContainers(t *testing.T) { + go initHttpServer() + time.Sleep(2 * time.Second) + operator := initNodeManagerOperator() + if operator != nil { + containers, _ := operator.ListContainers() + if containers != nil { + assert.Equal(t, createMockResponse(), *containers) + } + } +} + +func Test_NodeMangerOperator_GetContainer(t *testing.T) { + go initHttpServer() + time.Sleep(2 * time.Second) + containerId := "container_1697376600001_0001_01_000001" + operator := initNodeManagerOperator() + if operator != nil { + container, _ := operator.GetContainer(containerId) + assert.NotNil(t, container) + } +} + +func Test_NodeMangerOperator_TestServer(t *testing.T) { + endpoint := "localhost:8042" + go initHttpServer() + time.Sleep(3 * time.Second) + cli := resty.New() + cli.SetBaseURL(fmt.Sprintf("http://%s", endpoint)) + var res Containers + resp, _ := cli.R().SetResult(&res).Get("/ws/v1/node/containers") + assert.Equal(t, http.StatusOK, resp.StatusCode()) + res2 := testFunc(res) + assert.Equal(t, createMockResponse(), *res2) + klog.Infof("res: %v", *res2) +} + +func testFunc(res Containers) *Containers { + return &res +} + +func Test_NodeMangerOperator_GenerateCgroupPath(t *testing.T) { + tests := []struct { + name string + containerID string + expected string + }{ + { + name: "生成有效的cgroup路径", + containerID: "container_123", + expected: "yarn/container_123", + }, + } + + operator := &nodeMangerOperator{ + CgroupPath: "yarn", + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := operator.GenerateCgroupPath(tt.containerID) + assert.Equal(t, tt.expected, result) + }) + } +} + +func Test_NodeMangerOperator_GenerateCgroupFullPath(t *testing.T) { + tests := []struct { + name string + cgroupSubSystem string + expected string + }{ + { + name: "生成CPU cgroup完整路径", + cgroupSubSystem: "cpu", + expected: "/sys/fs/cgroup/cpu/yarn", + }, + { + name: "生成内存cgroup完整路径", + cgroupSubSystem: "memory", + expected: 
"/sys/fs/cgroup/memory/yarn", + }, + } + + operator := &nodeMangerOperator{ + CgroupRoot: "/sys/fs/cgroup", + CgroupPath: "yarn", + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := operator.GenerateCgroupFullPath(tt.cgroupSubSystem) + assert.Equal(t, tt.expected, result) + }) + } +} + +var ( + httpServerOnce sync.Once + httpServer *http.Server +) + +func initHttpServer() { + httpServerOnce.Do(func() { + mux := http.NewServeMux() + mux.HandleFunc("/ws/v1/node/containers", func(w http.ResponseWriter, r *http.Request) { + log.Printf("收到请求: %s %s", r.Method, r.URL.Path) + + response := createMockResponse() + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-YARN-Version", "3.3.4") + w.Header().Set("X-Request-ID", fmt.Sprintf("yarn-req-%d", time.Now().UnixNano())) + + if err := json.NewEncoder(w).Encode(response); err != nil { + log.Printf("JSON编码错误: %v", err) + http.Error(w, "内部服务器错误", http.StatusInternalServerError) + } + }) + + httpServer = &http.Server{ + Addr: ":8042", + Handler: mux, // 使用自定义路由器 + } + + go func() { + log.Printf("YARN 节点容器API模拟器启动,监听 http://localhost%s", httpServer.Addr) + if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("服务器错误: %v", err) + } + }() + }) +} + +func createMockContainers() []YarnContainer { + return []YarnContainer{ + { + Id: "container_1697376600001_0001_01_000001", + Appid: "application_1697376600001_0001", + State: "RUNNING", + ExitCode: -1000, + Diagnostics: "", + User: "user1", + TotalMemoryNeededMB: 4096, + TotalVCoresNeeded: 2, + ContainerLogsLink: "http://node01:8042/logs/application_1697376600001_0001/container_1697376600001_0001_01_000001", + NodeId: "node01.example.com:8041", + MemUsed: 2456.78, + MemMaxed: 4096, + CpuUsed: 1.75, + CpuMaxed: 2.0, + ContainerLogFiles: []string{"syslog", "stderr", "stdout"}, + }, + { + Id: "container_1697376600001_0001_01_000002", + Appid: "application_1697376600001_0001", + State: "COMPLETED", + ExitCode: 0, + Diagnostics: "Success", + User: "user2", + TotalMemoryNeededMB: 2048, + TotalVCoresNeeded: 1, + ContainerLogsLink: "http://node01:8042/logs/application_1697376600001_0001/container_1697376600001_0001_01_000002", + NodeId: "node01.example.com:8041", + MemUsed: 1984.32, + MemMaxed: 2048, + CpuUsed: 0.92, + CpuMaxed: 1.0, + ContainerLogFiles: []string{"syslog", "stderr"}, + }, + { + Id: "container_1697376600001_0002_01_000001", + Appid: "application_1697376600001_0002", + State: "FAILED", + ExitCode: 137, + Diagnostics: "Container killed on request. 
Exit code is 137", + User: "user3", + TotalMemoryNeededMB: 8192, + TotalVCoresNeeded: 4, + ContainerLogsLink: "http://node01:8042/logs/application_1697376600001_0002/container_1697376600001_0002_01_000001", + NodeId: "node01.example.com:8041", + MemUsed: 7892.45, + MemMaxed: 8192, + CpuUsed: 3.82, + CpuMaxed: 4.0, + ContainerLogFiles: []string{"syslog", "stderr", "stdout", "gc.log"}, + }, + { + Id: "container_1697376600001_0003_01_000001", + Appid: "application_1697376600001_0003", + State: "NEW", + ExitCode: -1000, + Diagnostics: "", + User: "user4", + TotalMemoryNeededMB: 1024, + TotalVCoresNeeded: 1, + ContainerLogsLink: "http://node01:8042/logs/application_1697376600001_0003/container_1697376600001_0003_01_000001", + NodeId: "node01.example.com:8041", + MemUsed: 0, + MemMaxed: 1024, + CpuUsed: 0, + CpuMaxed: 1.0, + ContainerLogFiles: []string{}, + }, + } +} + +// 创建模拟响应 +func createMockResponse() Containers { + containers := createMockContainers() + + var resp Containers + resp.Containers.Items = containers + return resp +} + +func Test_nodeMangerOperator_ensureCgroupDir(t *testing.T) { + var dir = "/tmp/cpu/kubepods/besteffort/hadoop-yarn" + n := &nodeMangerOperator{} + assert.NoError(t, n.ensureCgroupDir(dir)) +} + +func Test_nodeMangerOperator_syncNoneProcCgroup(t *testing.T) { + n := &nodeMangerOperator{} + n.syncNoneProcCgroup() +} + +func Test_nodeMangerOperator_syncAllCgroup(t *testing.T) { + n := &nodeMangerOperator{} + n.syncAllCgroup() +} + +func Test_nodeMangerOperator_syncParentCgroup(t *testing.T) { + go initHttpServer() + time.Sleep(2 * time.Second) + cgroupRoot := "/tmp" + endpoint := "localhost:8042" + filePath := "/tmp/cpu/cpu.shares" + dir := filepath.Dir(filePath) + if err := os.MkdirAll(dir, 0755); err != nil { + fmt.Printf("创建目录失败: %v\n", err) + return + } + file, err := os.Create(filePath) + if err != nil { + fmt.Printf("创建文件失败: %v\n", err) + return + } + defer file.Close() + cli := resty.New() + cli.SetBaseURL(fmt.Sprintf("http://%s", endpoint)) + n := &nodeMangerOperator{ + CgroupRoot: cgroupRoot, + client: cli, + } + assert.NoError(t, n.syncParentCgroup()) +} + +func Test_nodeMangerOperator_syncNMEndpoint(t *testing.T) { + kubelet, _ := statesinformer.NewKubeletStub("127.0.0.1", 10250, "https", time.Second*5, nil) + w := NewNMPodWater(kubelet) + n := &nodeMangerOperator{ + nmPodWatcher: w, + } + n.syncNMEndpoint() +} + +func Test_nodeMangerOperator_removeMemoryCgroup(t *testing.T) { + dir := "/tmp/memory/container_1697376600001_0003_01_000001" + if err := os.MkdirAll(dir, 0755); err != nil { + fmt.Printf("创建目录失败: %v\n", err) + return + } + cgroupRoot := "/tmp" + n := &nodeMangerOperator{ + CgroupRoot: cgroupRoot, + } + n.removeMemoryCgroup(dir) +} + +func Test_nodeMangerOperator_createMemoryCgroup(t *testing.T) { + dir := "/tmp/memory/container_1697376600001_0003_01_000001" + os.Remove(dir) + cgroupRoot := "/tmp" + n := &nodeMangerOperator{ + CgroupRoot: cgroupRoot, + } + n.createMemoryCgroup(dir) +} diff --git a/pkg/copilot-agent/nm/types.go b/pkg/copilot-agent/nm/types.go index 91ae0a84e..c686d7e73 100644 --- a/pkg/copilot-agent/nm/types.go +++ b/pkg/copilot-agent/nm/types.go @@ -39,6 +39,14 @@ type YarnContainer struct { ContainerLogFiles []string `json:"containerLogFiles"` } +type ContainerId struct { + ID string + ClusterTS int64 + AppID int64 + AttemptID int + ContainerID int +} + func (c *YarnContainer) IsFinalState() bool { for _, state := range FinalContainerStates { if c.State == state { diff --git a/pkg/copilot-agent/server/helper.go 
b/pkg/copilot-agent/server/helper.go index aecee3226..b1bc55691 100644 --- a/pkg/copilot-agent/server/helper.go +++ b/pkg/copilot-agent/server/helper.go @@ -17,13 +17,14 @@ limitations under the License. package server import ( + "github.com/koordinator-sh/koordinator/apis/extension" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "github.com/koordinator-sh/yarn-copilot/pkg/copilot-agent/nm" ) -func ParseContainerInfo(yarnContainer *nm.YarnContainer, op *nm.NodeMangerOperator) *ContainerInfo { +func ParseContainerInfo(yarnContainer *nm.YarnContainer, op nm.NodeMangerOperator) *ContainerInfo { return &ContainerInfo{ Name: yarnContainer.Id, Namespace: "yarn", @@ -33,12 +34,12 @@ func ParseContainerInfo(yarnContainer *nm.YarnContainer, op *nm.NodeMangerOperat Priority: 1, Resources: corev1.ResourceRequirements{ Limits: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceCPU: *resource.NewMilliQuantity(int64(yarnContainer.TotalVCoresNeeded*1000), resource.DecimalSI), - corev1.ResourceMemory: *resource.NewMilliQuantity(int64(yarnContainer.TotalMemoryNeededMB*1024*1024*1000), resource.DecimalSI), + extension.BatchCPU: *resource.NewMilliQuantity(int64(yarnContainer.TotalVCoresNeeded*1000), resource.DecimalSI), + extension.BatchMemory: *resource.NewMilliQuantity(int64(yarnContainer.TotalMemoryNeededMB*1024*1024*1000), resource.DecimalSI), }, Requests: map[corev1.ResourceName]resource.Quantity{ - corev1.ResourceCPU: *resource.NewMilliQuantity(int64(yarnContainer.TotalVCoresNeeded*1000), resource.DecimalSI), - corev1.ResourceMemory: *resource.NewMilliQuantity(int64(yarnContainer.TotalMemoryNeededMB*1024*1024*1000), resource.DecimalSI), + extension.BatchCPU: *resource.NewMilliQuantity(int64(yarnContainer.TotalVCoresNeeded*1000), resource.DecimalSI), + extension.BatchMemory: *resource.NewMilliQuantity(int64(yarnContainer.TotalMemoryNeededMB*1024*1024*1000), resource.DecimalSI), }, }, } diff --git a/pkg/copilot-agent/server/server.go b/pkg/copilot-agent/server/server.go index 10f98d461..5b5347d85 100644 --- a/pkg/copilot-agent/server/server.go +++ b/pkg/copilot-agent/server/server.go @@ -18,11 +18,17 @@ package server import ( "context" + "errors" "fmt" + "github.com/koordinator-sh/koordinator/apis/extension" + "k8s.io/apimachinery/pkg/api/resource" "net" "net/http" "os" "path/filepath" + "sort" + "strconv" + "strings" "time" "github.com/gin-gonic/gin" @@ -34,11 +40,11 @@ import ( ) type YarnCopilotServer struct { - mgr *nm.NodeMangerOperator + mgr nm.NodeMangerOperator unixPath string } -func NewYarnCopilotServer(mgr *nm.NodeMangerOperator, unixPath string) *YarnCopilotServer { +func NewYarnCopilotServer(mgr nm.NodeMangerOperator, unixPath string) *YarnCopilotServer { return &YarnCopilotServer{mgr: mgr, unixPath: unixPath} } @@ -49,7 +55,7 @@ func (y *YarnCopilotServer) Run(ctx context.Context) error { e.GET("/v1/container", y.GetContainer) e.GET("/v1/containers", y.ListContainers) e.POST("/v1/killContainer", y.KillContainer) - e.POST("/v1/killContainersByResource", y.KillContainerByResource) + e.POST("/v1/killContainersByResource", y.KillContainersByResource) server := &http.Server{ Handler: e, @@ -160,15 +166,150 @@ func (y *YarnCopilotServer) KillContainer(ctx *gin.Context) { } container, err := y.mgr.GetContainer(kr.ContainerID) if err != nil { - ctx.JSON(http.StatusBadRequest, err) + ctx.JSON(http.StatusInternalServerError, err) return } if err := y.mgr.KillContainer(kr.ContainerID); err != nil { - ctx.JSON(http.StatusBadRequest, err) + 
ctx.JSON(http.StatusInternalServerError, err) return } ctx.JSON(http.StatusOK, KillInfo{Items: []*ContainerInfo{ParseContainerInfo(container, y.mgr)}}) } -func (y *YarnCopilotServer) KillContainerByResource(ctx *gin.Context) { +func (y *YarnCopilotServer) KillContainersByResource(ctx *gin.Context) { + var kr KillRequest + if err := ctx.BindJSON(&kr); err != nil { + ctx.JSON(http.StatusBadRequest, err) + return + } + klog.Info("KillRequest: %s", kr) + res, err := y.mgr.ListContainers() + if err != nil { + ctx.JSON(http.StatusInternalServerError, err) + return + } + needReleasedCpu, _ := kr.Resources.Name(extension.BatchCPU, resource.DecimalSI).AsInt64() + needReleasedMemory, _ := kr.Resources.Name(extension.BatchMemory, resource.BinarySI).AsInt64() + var currentReleasedCpu, currentReleasedMemory int + containers := res.Containers.Items + if len(containers) > 0 { + filteredContainers := filter(containers) + if len(filteredContainers) == 0 { + filteredContainers = containers + } + scoredContainers := score(filteredContainers) + for _, container := range scoredContainers { + if err := y.mgr.KillContainer(container.Id); err != nil { + klog.Errorf("KillContainersByResource error: %s", container) + ctx.JSON(http.StatusInternalServerError, err) + return + } else { + klog.Infof("kill container %s", container) + currentReleasedCpu += container.TotalVCoresNeeded * 1000 + currentReleasedMemory += container.TotalMemoryNeededMB * 1024 * 1024 + if int64(currentReleasedCpu) >= needReleasedCpu && int64(currentReleasedMemory) >= needReleasedMemory { + break + } + } + } + } + releasedResourceList := v1.ResourceList{ + extension.BatchCPU: *resource.NewMilliQuantity(int64(currentReleasedCpu), resource.DecimalSI), + extension.BatchMemory: *resource.NewQuantity(int64(currentReleasedMemory), resource.BinarySI), + } + klog.Infof("release resources: %s", releasedResourceList) + ctx.JSON(http.StatusOK, releasedResourceList) +} + +func filter(containers []nm.YarnContainer) []nm.YarnContainer { + var filtered []nm.YarnContainer + for _, c := range containers { + lastSeg, err := parseLastSegment(c.Id) + if err != nil || lastSeg != "000001" { + filtered = append(filtered, c) + } + } + return filtered +} + +func score(containers []nm.YarnContainer) []nm.YarnContainer { + if len(containers) > 0 { + sort.Sort(ReverseContainerSorter(containers)) + klog.V(4).Infof("Sorted containers by ID in descending order: %+v", containers) + } + return containers +} + +/** + * containerId + * container_e*epoch*_*clusterTimestamp*_*appId*_*attemptId*_*containerId* + * container_*clusterTimestamp*_*appId*_*attemptId*_*containerId* + */ +func parseLastSegment(containerID string) (string, error) { + if containerID == "" { + return "", errors.New("container ID 不能为空") + } + segments := strings.Split(containerID, "_") + if (len(segments) != 5 && len(segments) != 6) || segments[0] != "container" { + return "", fmt.Errorf("无效的容器ID格式: %s", containerID) + } + lastSegment := segments[len(segments)-1] + return lastSegment, nil +} + +type ReverseContainerSorter []nm.YarnContainer + +func (cs ReverseContainerSorter) Len() int { return len(cs) } +func (cs ReverseContainerSorter) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] } +func (cs ReverseContainerSorter) Less(i, j int) bool { + a, b := cs[i], cs[j] + containerA, _ := parseContainerID(a.Id) + containerB, _ := parseContainerID(b.Id) + switch { + case containerA.ClusterTS != containerB.ClusterTS: + return containerA.ClusterTS > containerB.ClusterTS + case containerA.AppID != containerB.AppID: + return 
containerA.AppID > containerB.AppID + case containerA.AttemptID != containerB.AttemptID: + return containerA.AttemptID > containerB.AttemptID + default: + return containerA.ContainerID > containerB.ContainerID + } +} + +func parseContainerID(id string) (nm.ContainerId, error) { + segments := strings.Split(id, "_") + if (len(segments) != 5 && len(segments) != 6) || segments[0] != "container" { + return nm.ContainerId{}, fmt.Errorf("invalid container ID format: %s", id) + } + + clusterTSStr := segments[len(segments)-4] + clusterTS, err := strconv.ParseInt(clusterTSStr, 10, 64) + if 13 != len(clusterTSStr) || err != nil { + return nm.ContainerId{}, fmt.Errorf("invalid cluster timestamp: %s", clusterTSStr) + + } + + appID, err := strconv.ParseInt(segments[len(segments)-3], 10, 64) + if err != nil { + return nm.ContainerId{}, fmt.Errorf("invalid app ID: %s", segments[len(segments)-3]) + } + + attemptID, err := strconv.Atoi(segments[len(segments)-2]) + if err != nil { + return nm.ContainerId{}, fmt.Errorf("invalid attempt ID: %s", segments[len(segments)-2]) + } + + containerID, err := strconv.Atoi(segments[len(segments)-1]) + if err != nil { + return nm.ContainerId{}, fmt.Errorf("invalid container ID: %s", segments[len(segments)-1]) + } + + return nm.ContainerId{ + ID: id, + ClusterTS: clusterTS, + AppID: appID, + AttemptID: attemptID, + ContainerID: containerID, + }, nil } diff --git a/pkg/copilot-agent/server/server_test.go b/pkg/copilot-agent/server/server_test.go new file mode 100644 index 000000000..6d280922a --- /dev/null +++ b/pkg/copilot-agent/server/server_test.go @@ -0,0 +1,641 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package server + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/gin-gonic/gin" + "github.com/koordinator-sh/koordinator/apis/extension" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + + "github.com/koordinator-sh/yarn-copilot/pkg/copilot-agent/nm" +) + +// MockNodeMangerOperator 模拟NodeMangerOperator接口 +type MockNodeMangerOperator struct { + containers *nm.Containers + err1 error + err2 error +} + +func (m *MockNodeMangerOperator) Run(stop <-chan struct{}) error { + return nil +} + +func (m *MockNodeMangerOperator) KillContainer(containerID string) error { + return m.err2 +} + +func (m *MockNodeMangerOperator) ListContainers() (*nm.Containers, error) { + return m.containers, m.err1 +} + +func (m *MockNodeMangerOperator) GetContainer(containerID string) (*nm.YarnContainer, error) { + if m.containers == nil { + return nil, m.err1 + } + for _, container := range m.containers.Containers.Items { + if container.Id == containerID { + return &container, nil + } + } + return nil, m.err1 +} + +func (m *MockNodeMangerOperator) GenerateCgroupPath(containerID string) string { + return "yarn/" + containerID +} + +func (m *MockNodeMangerOperator) GenerateCgroupFullPath(cgroupSubSystem string) string { + return "/sys/fs/cgroup/" + cgroupSubSystem + "/yarn" +} + +func setupTestServer() (*YarnCopilotServer, *MockNodeMangerOperator) { + mockMgr := &MockNodeMangerOperator{} + server := NewYarnCopilotServer(mockMgr, "/tmp/test.sock") + return server, mockMgr +} + +func TestYarnCopilotServer_Health(t *testing.T) { + gin.SetMode(gin.TestMode) + server, _ := setupTestServer() + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/health", nil) + + server.Health(c) + + assert.Equal(t, http.StatusOK, w.Code) + assert.Equal(t, "\"ok\"", w.Body.String()) +} + +func TestYarnCopilotServer_Information(t *testing.T) { + gin.SetMode(gin.TestMode) + server, _ := setupTestServer() + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/information", nil) + + server.Information(c) + + assert.Equal(t, http.StatusOK, w.Code) + var info PluginInfo + err := json.Unmarshal(w.Body.Bytes(), &info) + assert.NoError(t, err) + assert.Equal(t, "yarn", info.Name) + assert.Equal(t, "v1", info.Version) +} + +func TestYarnCopilotServer_ListContainers(t *testing.T) { + gin.SetMode(gin.TestMode) + server, mockMgr := setupTestServer() + + mockMgr.containers = &nm.Containers{ + Containers: struct { + Items []nm.YarnContainer `json:"container"` + }{ + Items: []nm.YarnContainer{ + { + Id: "container_e517_1746198264116_5225_01_000003", + State: "RUNNING", + }, + { + Id: "container_e517_1746198264116_5225_01_000004", + State: "DONE", + }, + }, + }, + } + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/v1/containers", nil) + + server.ListContainers(c) + + assert.Equal(t, http.StatusOK, w.Code) + var containers []*ContainerInfo + err := json.Unmarshal(w.Body.Bytes(), &containers) + assert.NoError(t, err) + assert.Equal(t, 1, len(containers)) // 只返回非终态容器 + assert.Equal(t, "container_e517_1746198264116_5225_01_000003", containers[0].Name) + + // 测试错误情况 + mockMgr.err1 = assert.AnError + w = httptest.NewRecorder() + c, _ = gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/v1/containers", nil) + + server.ListContainers(c) 
+ + assert.Equal(t, http.StatusBadRequest, w.Code) +} + +func TestYarnCopilotServer_GetContainer(t *testing.T) { + gin.SetMode(gin.TestMode) + server, mockMgr := setupTestServer() + + mockMgr.containers = &nm.Containers{ + Containers: struct { + Items []nm.YarnContainer `json:"container"` + }{ + Items: []nm.YarnContainer{ + { + Id: "container_e517_1746198264116_5225_01_000003", + State: "RUNNING", + }, + }, + }, + } + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/v1/container?containerID=container_e517_1746198264116_5225_01_000003", nil) + + server.GetContainer(c) + + assert.Equal(t, http.StatusOK, w.Code) + var container ContainerInfo + err := json.Unmarshal(w.Body.Bytes(), &container) + assert.NoError(t, err) + assert.Equal(t, "container_e517_1746198264116_5225_01_000003", container.Name) + + // 测试容器不存在的情况 + mockMgr.err1 = errors.New("container Not Found") + w = httptest.NewRecorder() + c, _ = gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/v1/container?containerID=container_e517_1746198264116_5225_01_000004", nil) + + server.GetContainer(c) + + assert.Equal(t, http.StatusBadRequest, w.Code) +} + +func TestYarnCopilotServer_KillContainer(t *testing.T) { + gin.SetMode(gin.TestMode) + server, mockMgr := setupTestServer() + + mockMgr.containers = &nm.Containers{ + Containers: struct { + Items []nm.YarnContainer `json:"container"` + }{ + Items: []nm.YarnContainer{ + { + Id: "container_e517_1746198264116_5225_01_000003", + State: "RUNNING", + }, + }, + }, + } + + killRequest := KillRequest{ + ContainerID: "container_e517_1746198264116_5225_01_000003", + } + body, _ := json.Marshal(killRequest) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/v1/killContainer", bytes.NewBuffer(body)) + + server.KillContainer(c) + + assert.Equal(t, http.StatusOK, w.Code) + var killInfo KillInfo + err := json.Unmarshal(w.Body.Bytes(), &killInfo) + assert.NoError(t, err) + assert.Equal(t, 1, len(killInfo.Items)) + assert.Equal(t, "container_e517_1746198264116_5225_01_000003", killInfo.Items[0].Name) + + // 测试错误情况 + mockMgr.err2 = assert.AnError + w = httptest.NewRecorder() + c, _ = gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/v1/killContainer", bytes.NewBuffer(body)) + + server.KillContainer(c) + + assert.Equal(t, http.StatusInternalServerError, w.Code) +} + +func TestYarnCopilotServer_KillContainersByResource(t *testing.T) { + var tests = []struct { + name string + containers []nm.YarnContainer + killRequest KillRequest + releasedResources v1.ResourceList + httpStatus int + err1 error + err2 error + }{ + + { + name: "list container error ", + containers: []nm.YarnContainer{ + { + Id: "container_e517_1746198264116_5225_01_000003", + State: "RUNNING", + TotalVCoresNeeded: 2, + TotalMemoryNeededMB: 1024, + }, + }, + killRequest: KillRequest{ + Resources: v1.ResourceList{ + extension.BatchCPU: resource.MustParse("1"), + extension.BatchMemory: resource.MustParse("1Gi"), + }, + }, + releasedResources: v1.ResourceList{}, + httpStatus: http.StatusInternalServerError, + err1: assert.AnError, + err2: nil, + }, + { + name: "kill container error ", + containers: []nm.YarnContainer{ + { + Id: "container_e517_1746198264116_5225_01_000003", + State: "RUNNING", + TotalVCoresNeeded: 2, + TotalMemoryNeededMB: 1024, + }, + }, + killRequest: KillRequest{ + Resources: v1.ResourceList{ + extension.BatchCPU: resource.MustParse("1"), + extension.BatchMemory: 
resource.MustParse("1Gi"), + }, + }, + releasedResources: v1.ResourceList{}, + httpStatus: http.StatusInternalServerError, + err1: nil, + err2: assert.AnError, + }, + { + name: "kill container with max containerID", + containers: []nm.YarnContainer{ + { + Id: "container_e517_1746198264116_5225_01_000003", + State: "RUNNING", + TotalVCoresNeeded: 2, + TotalMemoryNeededMB: 1024, + }, + { + Id: "container_e517_1746198264116_34108_01_000003", + State: "RUNNING", + TotalVCoresNeeded: 1, + TotalMemoryNeededMB: 2048, + }, + }, + killRequest: KillRequest{ + Resources: v1.ResourceList{ + extension.BatchCPU: resource.MustParse("1"), + extension.BatchMemory: resource.MustParse("1Gi"), + }, + }, + releasedResources: v1.ResourceList{ + extension.BatchCPU: resource.MustParse("1"), + extension.BatchMemory: resource.MustParse("2Gi"), + }, + httpStatus: http.StatusOK, + err1: nil, + err2: nil, + }, + { + name: "kill container which is not ApplicationMaster", + containers: []nm.YarnContainer{ + { + Id: "container_e517_1746198264116_5225_01_000003", + State: "RUNNING", + TotalVCoresNeeded: 2, + TotalMemoryNeededMB: 1024, + }, + { + Id: "container_e517_1746198264116_34108_01_000001", + State: "RUNNING", + TotalVCoresNeeded: 1, + TotalMemoryNeededMB: 2048, + }, + }, + killRequest: KillRequest{ + Resources: v1.ResourceList{ + extension.BatchCPU: resource.MustParse("1"), + extension.BatchMemory: resource.MustParse("1Gi"), + }, + }, + releasedResources: v1.ResourceList{ + extension.BatchCPU: resource.MustParse("2"), + extension.BatchMemory: resource.MustParse("1Gi"), + }, + httpStatus: http.StatusOK, + err1: nil, + err2: nil, + }, + { + name: "when all containers running with ApplicationMaster,kill container with max containerId", + containers: []nm.YarnContainer{ + { + Id: "container_e517_1746198264116_5225_01_000001", + State: "RUNNING", + TotalVCoresNeeded: 2, + TotalMemoryNeededMB: 1024, + }, + { + Id: "container_e517_1746198264116_34108_01_000001", + State: "RUNNING", + TotalVCoresNeeded: 1, + TotalMemoryNeededMB: 2048, + }, + }, + killRequest: KillRequest{ + Resources: v1.ResourceList{ + extension.BatchCPU: resource.MustParse("1"), + extension.BatchMemory: resource.MustParse("1Gi"), + }, + }, + releasedResources: v1.ResourceList{ + extension.BatchCPU: resource.MustParse("1"), + extension.BatchMemory: resource.MustParse("2Gi"), + }, + httpStatus: http.StatusOK, + err1: nil, + err2: nil, + }, + } + for _, tt := range tests { + gin.SetMode(gin.TestMode) + server, mockMgr := setupTestServer() + mockMgr.err1 = tt.err1 + mockMgr.err2 = tt.err2 + mockMgr.containers = &nm.Containers{ + Containers: struct { + Items []nm.YarnContainer `json:"container"` + }{ + Items: tt.containers, + }, + } + body, _ := json.Marshal(tt.killRequest) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/v1/killContainersByResource", bytes.NewBuffer(body)) + server.KillContainersByResource(c) + assert.Equal(t, tt.httpStatus, w.Code) + var releasedResources v1.ResourceList + err := json.Unmarshal(w.Body.Bytes(), &releasedResources) + assert.NoError(t, err) + assert.NotNil(t, releasedResources) + assert.Equal(t, tt.releasedResources, releasedResources) + } +} + +func TestYarnCopilotServer_Run(t *testing.T) { + gin.SetMode(gin.TestMode) + server, _ := setupTestServer() + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + err := server.Run(ctx) + assert.NoError(t, err) +} + +func Test_filter(t *testing.T) { + tests := []struct { + name 
string + containers []nm.YarnContainer + expected int + }{ + { + name: "filter container which containerID ended with 000001", + containers: []nm.YarnContainer{ + {Id: "container_e517_1746198264116_5225_01_000001"}, + {Id: "container_e517_1746198264116_5225_01_000002"}, + {Id: "container_e517_1746198264116_34108_01_000003"}, + }, + expected: 2, + }, + { + name: "filter all containers", + containers: []nm.YarnContainer{ + {Id: "container_e517_1746198264116_5225_01_000001"}, + {Id: "container_e517_1746198264116_5226_01_000001"}, + }, + expected: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := filter(tt.containers) + assert.Equal(t, tt.expected, len(result)) + }) + } +} + +func Test_score(t *testing.T) { + tests := []struct { + name string + containers []nm.YarnContainer + expectedContainers []nm.YarnContainer + }{ + { + name: "sort containerId", + containers: []nm.YarnContainer{ + {Id: "container_e517_1746198264116_5225_01_000003"}, + {Id: "container_e517_1746198264116_5225_01_000004"}, + {Id: "container_e517_1746198264116_34108_01_000003"}, + {Id: "container_e517_1746198264116_34108_01_000003"}, + }, + expectedContainers: []nm.YarnContainer{ + {Id: "container_e517_1746198264116_34108_01_000003"}, + {Id: "container_e517_1746198264116_34108_01_000003"}, + {Id: "container_e517_1746198264116_5225_01_000004"}, + {Id: "container_e517_1746198264116_5225_01_000003"}, + }, + }, + { + // container_*clusterTimestamp*_*appId*_*attemptId*_*containerId* + name: "sort attemptId", + containers: []nm.YarnContainer{ + {Id: "container_e517_1746198264116_5225_01_000004"}, + {Id: "container_e517_1746198264116_5225_02_000003"}, + }, + expectedContainers: []nm.YarnContainer{ + {Id: "container_e517_1746198264116_5225_02_000003"}, + {Id: "container_e517_1746198264116_5225_01_000004"}, + }, + }, + { + // container_*clusterTimestamp*_*appId*_*attemptId*_*containerId* + name: "sort appId", + containers: []nm.YarnContainer{ + {Id: "container_e517_1746198264116_5225_02_000004"}, + {Id: "container_e517_1746198264116_5226_01_000003"}, + }, + expectedContainers: []nm.YarnContainer{ + {Id: "container_e517_1746198264116_5226_01_000003"}, + {Id: "container_e517_1746198264116_5225_02_000004"}, + }, + }, + { + // container_*clusterTimestamp*_*appId*_*attemptId*_*containerId* + name: "sort clusterTimestamp", + containers: []nm.YarnContainer{ + {Id: "container_e517_1746198264116_5225_02_000004"}, + {Id: "container_e517_1746198264117_5225_01_000003"}, + }, + expectedContainers: []nm.YarnContainer{ + {Id: "container_e517_1746198264117_5225_01_000003"}, + {Id: "container_e517_1746198264116_5225_02_000004"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := score(tt.containers) + assert.Equal(t, tt.expectedContainers, result) + }) + } +} + +func Test_parseLastSegment(t *testing.T) { + tests := []struct { + name string + containerID string + expected string + hasError bool + }{ + { + name: "valid containerID", + containerID: "container_e517_1746198264116_5225_01_000001", + expected: "000001", + hasError: false, + }, + { + name: "invalid containerID", + containerID: "container_e517_1746198264116_5225_01_000003_000001", + expected: "", + hasError: true, + }, + { + name: "empty containerID", + containerID: "", + expected: "", + hasError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := parseLastSegment(tt.containerID) + if tt.hasError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + 
assert.Equal(t, tt.expected, result) + } + }) + } +} + +func Test_parseContainerID(t *testing.T) { + tests := []struct { + name string + containerID string + expected string + hasError bool + expectedErr string + }{ + { + name: "invalid container ID, too long", + containerID: "container_e517_1746198264116_5225_01_000003_000001", + hasError: true, + expectedErr: "invalid container ID format", + }, + { + name: "invalid container ID, too short", + containerID: "container_e517_1746198264116_5225", + hasError: true, + expectedErr: "invalid container ID format", + }, + { + name: "invalid container ID, not started with prefix container", + containerID: "c_e517_1746198264116_5225_01", + hasError: true, + expectedErr: "invalid container ID format", + }, + { + name: "invalid cluster timestamp, number too long", + containerID: "container_e517_17461982641167_5225_01_000001", + hasError: true, + expectedErr: "invalid cluster timestamp", + }, + { + name: "invalid cluster timestamp, not number", + containerID: "container_e517_xxx_5225_01_000001", + hasError: true, + expectedErr: "invalid cluster timestamp", + }, + { + name: "invalid app ID, not number", + containerID: "container_e517_1746198264116_xxx_01_000001", + hasError: true, + expectedErr: "invalid app ID", + }, + { + name: "invalid attempt ID, not number", + containerID: "container_e517_1746198264116_5225_xx_000001", + hasError: true, + expectedErr: "invalid attempt ID", + }, + { + name: "invalid container ID, not number", + containerID: "container_e517_1746198264116_5225_01_00000x", + hasError: true, + expectedErr: "invalid container ID", + }, + { + name: "valid container ID", + containerID: "container_e517_1746198264116_5225_01_000001", + hasError: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := parseContainerID(tt.containerID) + if tt.hasError { + assert.Equal(t, nm.ContainerId{}, result) + assert.Error(t, err) + assert.Contains(t, err.Error(), tt.expectedErr) + } else { + assert.NoError(t, err) + } + }) + } +}
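
The eviction policy introduced in server.go is easiest to see outside the diff: KillContainersByResource lists the NodeManager's containers, drops ApplicationMaster containers (IDs whose trailing segment is 000001) unless every container is an AM, sorts the remainder so the most recently launched containers come first (descending clusterTimestamp, then appId, attemptId, containerId), and kills them in that order until the released batch CPU and memory cover the request. Below is a minimal, self-contained sketch of that ordering; the cid type and parse helper are illustrative stand-ins for the patch's nm.ContainerId and parseContainerID, and the AM check is simplified to a suffix test.

package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// cid is an illustrative stand-in for nm.ContainerId in the patch.
type cid struct {
	clusterTS, appID int64
	attemptID, seq   int
}

// parse handles both documented forms:
//   container_e*epoch*_*clusterTimestamp*_*appId*_*attemptId*_*containerId*
//   container_*clusterTimestamp*_*appId*_*attemptId*_*containerId*
func parse(id string) (cid, error) {
	seg := strings.Split(id, "_")
	if (len(seg) != 5 && len(seg) != 6) || seg[0] != "container" {
		return cid{}, fmt.Errorf("invalid container ID format: %s", id)
	}
	n := len(seg)
	ts, err := strconv.ParseInt(seg[n-4], 10, 64)
	if err != nil {
		return cid{}, err
	}
	app, err := strconv.ParseInt(seg[n-3], 10, 64)
	if err != nil {
		return cid{}, err
	}
	att, err := strconv.Atoi(seg[n-2])
	if err != nil {
		return cid{}, err
	}
	sq, err := strconv.Atoi(seg[n-1])
	if err != nil {
		return cid{}, err
	}
	return cid{clusterTS: ts, appID: app, attemptID: att, seq: sq}, nil
}

func main() {
	ids := []string{
		"container_e517_1746198264116_5225_01_000003",
		"container_e517_1746198264116_34108_01_000003",
		"container_e517_1746198264116_5225_01_000001", // AM container, evicted last
	}
	// Drop AM containers (trailing segment 000001) unless that would leave nothing.
	var candidates []string
	for _, id := range ids {
		if !strings.HasSuffix(id, "_000001") {
			candidates = append(candidates, id)
		}
	}
	if len(candidates) == 0 {
		candidates = ids
	}
	// Newest containers first: descending (clusterTS, appID, attemptID, seq),
	// mirroring ReverseContainerSorter.Less in the patch.
	sort.Slice(candidates, func(i, j int) bool {
		a, _ := parse(candidates[i])
		b, _ := parse(candidates[j])
		if a.clusterTS != b.clusterTS {
			return a.clusterTS > b.clusterTS
		}
		if a.appID != b.appID {
			return a.appID > b.appID
		}
		if a.attemptID != b.attemptID {
			return a.attemptID > b.attemptID
		}
		return a.seq > b.seq
	})
	fmt.Println(candidates) // kill order: the 34108 container, then 5225_..._000003
}

Killing newest-first and sparing AM containers is presumably meant to keep evictions cheap: recently launched tasks have done the least work, and killing an AM (container _000001 in YARN) would take down the whole application attempt rather than a single task.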
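
A sketch of how a caller might drive the new POST /v1/killContainersByResource endpoint. It assumes the agent is serving its gin routes on the unix socket passed to NewYarnCopilotServer (the socket path below is a placeholder), and that the request body matches the KillRequest used in server_test.go: a corev1.ResourceList of batch CPU and batch memory to reclaim, with the handler answering the ResourceList it actually released. The local killRequest struct and its JSON tag are assumptions for illustration; in practice the agent's own KillRequest type should be marshaled.

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"

	"github.com/koordinator-sh/koordinator/apis/extension"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// killRequest mirrors the KillRequest body used by the endpoint; only the
// Resources field matters for killContainersByResource. The JSON tag here is
// an assumption, not copied from the agent's type definition.
type killRequest struct {
	Resources corev1.ResourceList `json:"resources"`
}

func main() {
	// Placeholder socket path: use the unixPath the yarn-copilot-agent server
	// was actually started with.
	const sockPath = "/var/run/yarn-copilot-agent.sock"

	httpClient := &http.Client{
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				return (&net.Dialer{}).DialContext(ctx, "unix", sockPath)
			},
		},
	}

	// Ask the agent to reclaim the same quantities the server tests use:
	// 1 batch CPU and 1Gi of batch memory.
	body, _ := json.Marshal(killRequest{Resources: corev1.ResourceList{
		extension.BatchCPU:    resource.MustParse("1"),
		extension.BatchMemory: resource.MustParse("1Gi"),
	}})

	// The host part of the URL is ignored when dialing a unix socket.
	resp, err := httpClient.Post("http://localhost/v1/killContainersByResource",
		"application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// On success the handler returns the released corev1.ResourceList.
	var released corev1.ResourceList
	if err := json.NewDecoder(resp.Body).Decode(&released); err != nil {
		panic(err)
	}
	fmt.Printf("status=%d released=%v\n", resp.StatusCode, released)
}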