Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion cmd/yarn-copilot-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package main

import (
"flag"
"k8s.io/client-go/rest"
"os"
"sigs.k8s.io/controller-runtime/pkg/client/config"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import格式需要调整一下

"time"

statesinformer "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer/impl"
Expand Down Expand Up @@ -53,7 +55,11 @@ func main() {
klog.Infof("args: %s = %s", f.Name, f.Value)
})
stopCtx := signals.SetupSignalHandler()
kubelet, _ := statesinformer.NewKubeletStub("127.0.0.1", 10255, "http", time.Second*5, nil)
restConfig, err := initRestConfig()
if err != nil {
klog.Fatal(err)
}
kubelet, _ := statesinformer.NewKubeletStub("127.0.0.1", 10250, "https", time.Second*5, restConfig)
operator, err := nm.NewNodeMangerOperator(conf.CgroupRootDir, conf.YarnContainerCgroupPath, conf.SyncMemoryCgroup, conf.NodeMangerEndpoint, conf.SyncCgroupPeriod, kubelet)
if err != nil {
klog.Fatal(err)
Expand All @@ -68,3 +74,14 @@ func main() {
klog.Fatal(err)
}
}

func initRestConfig() (*rest.Config, error) {
restConfig, err := config.GetConfig()
if err != nil {
return nil, err
}
restConfig.TLSClientConfig.Insecure = true
restConfig.TLSClientConfig.CAData = nil
restConfig.TLSClientConfig.CAFile = ""
return restConfig, nil
}
48 changes: 29 additions & 19 deletions pkg/copilot-agent/nm/nm.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ const (
MemoryMoveChargeAtImmigrateName = "memory.move_charge_at_immigrate"
)

type NodeMangerOperator struct {
type NodeMangerOperator interface {
Run(stop <-chan struct{}) error
KillContainer(containerID string) error
ListContainers() (*Containers, error)
GetContainer(containerID string) (*YarnContainer, error)
GenerateCgroupPath(containerID string) string
GenerateCgroupFullPath(cgroupSubSystem string) string
}

type nodeMangerOperator struct {
CgroupRoot string
CgroupPath string

Expand All @@ -52,15 +61,15 @@ type NodeMangerOperator struct {
nmTicker *time.Ticker
}

func NewNodeMangerOperator(cgroupRoot string, cgroupPath string, syncMemoryCgroup bool, endpoint string, syncPeriod time.Duration, kubelet statesinformer.KubeletStub) (*NodeMangerOperator, error) {
func NewNodeMangerOperator(cgroupRoot string, cgroupPath string, syncMemoryCgroup bool, endpoint string, syncPeriod time.Duration, kubelet statesinformer.KubeletStub) (NodeMangerOperator, error) {
watcher, err := pleg.NewWatcher()
if err != nil {
return nil, err
}
cli := resty.New()
cli.SetBaseURL(fmt.Sprintf("http://%s", endpoint))
w := NewNMPodWater(kubelet)
return &NodeMangerOperator{
operator := &nodeMangerOperator{
CgroupRoot: cgroupRoot,
CgroupPath: cgroupPath,
SyncMemoryCgroup: syncMemoryCgroup,
Expand All @@ -70,18 +79,19 @@ func NewNodeMangerOperator(cgroupRoot string, cgroupPath string, syncMemoryCgrou
nmPodWatcher: w,
ticker: time.NewTicker(syncPeriod),
nmTicker: time.NewTicker(time.Second),
}, nil
}
return operator, nil
}

func (n *NodeMangerOperator) Run(stop <-chan struct{}) error {
func (n *nodeMangerOperator) Run(stop <-chan struct{}) error {
klog.Infof("Run node manager operator")
if n.SyncMemoryCgroup {
return n.syncMemoryCgroup(stop)
}
return nil
}

func (n *NodeMangerOperator) syncMemoryCgroup(stop <-chan struct{}) error {
func (n *nodeMangerOperator) syncMemoryCgroup(stop <-chan struct{}) error {
cpuDir := filepath.Join(n.CgroupRoot, system.CgroupCPUDir, n.CgroupPath)
if err := n.ensureCgroupDir(cpuDir); err != nil {
klog.Error(err)
Expand Down Expand Up @@ -119,7 +129,7 @@ func (n *NodeMangerOperator) syncMemoryCgroup(stop <-chan struct{}) error {
}
}

func (n *NodeMangerOperator) syncNoneProcCgroup() {
func (n *nodeMangerOperator) syncNoneProcCgroup() {
klog.V(5).Info("syncNoneProcCgroup")
cpuPath := n.GenerateCgroupFullPath(system.CgroupCPUDir)
_ = filepath.Walk(cpuPath, func(path string, info os.FileInfo, err error) error {
Expand Down Expand Up @@ -147,7 +157,7 @@ func (n *NodeMangerOperator) syncNoneProcCgroup() {
})
}

func (n *NodeMangerOperator) syncNMEndpoint() {
func (n *nodeMangerOperator) syncNMEndpoint() {
endpoint, exist, err := n.nmPodWatcher.GetNMPodEndpoint()
if err != nil {
klog.Error(err)
Expand All @@ -163,7 +173,7 @@ func (n *NodeMangerOperator) syncNMEndpoint() {
}
}

func (n *NodeMangerOperator) syncAllCgroup() {
func (n *nodeMangerOperator) syncAllCgroup() {
subDirFunc := func(dir string) map[string]struct{} {
res := map[string]struct{}{}
_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
Expand All @@ -190,7 +200,7 @@ func (n *NodeMangerOperator) syncAllCgroup() {
}
}

func (n *NodeMangerOperator) syncParentCgroup() error {
func (n *nodeMangerOperator) syncParentCgroup() error {
containers, err := n.ListContainers()
if err != nil {
klog.Error(err)
Expand All @@ -214,7 +224,7 @@ func (n *NodeMangerOperator) syncParentCgroup() error {
return nil
}

func (n *NodeMangerOperator) removeMemoryCgroup(fileName string) {
func (n *nodeMangerOperator) removeMemoryCgroup(fileName string) {
klog.V(5).Infof("receive file delete event %s", fileName)
basename := filepath.Base(fileName)
if !strings.HasPrefix(basename, "container_") {
Expand All @@ -229,7 +239,7 @@ func (n *NodeMangerOperator) removeMemoryCgroup(fileName string) {
klog.V(5).Infof("yarn container dir %v removed", basename)
}

func (n *NodeMangerOperator) createMemoryCgroup(fileName string) {
func (n *nodeMangerOperator) createMemoryCgroup(fileName string) {
klog.V(5).Infof("receive file create event %s", fileName)
basename := filepath.Base(fileName)
if !strings.HasPrefix(basename, "container_") {
Expand Down Expand Up @@ -277,7 +287,7 @@ func (n *NodeMangerOperator) createMemoryCgroup(fileName string) {
klog.V(5).Infof("set memory %s limit_in_bytes as %d", memCgroupPath, memLimit)
}

func (n *NodeMangerOperator) ensureCgroupDir(dir string) error {
func (n *nodeMangerOperator) ensureCgroupDir(dir string) error {
klog.V(5).Infof("ensure cgroup dir %s", dir)
f, err := os.Stat(dir)
if err != nil && !os.IsNotExist(err) {
Expand All @@ -296,15 +306,15 @@ func (n *NodeMangerOperator) ensureCgroupDir(dir string) error {
}

// KillContainer kill process group for target container
func (n *NodeMangerOperator) KillContainer(containerID string) error {
func (n *nodeMangerOperator) KillContainer(containerID string) error {
processGroupID := n.getProcessGroupID(containerID)
if processGroupID <= 1 {
return fmt.Errorf("invalid process group pid(%d) for container %s", processGroupID, containerID)
}
return syscall.Kill(-processGroupID, syscall.SIGKILL)
}

func (n *NodeMangerOperator) getProcessGroupID(containerID string) int {
func (n *nodeMangerOperator) getProcessGroupID(containerID string) int {
containerCgroupPath := filepath.Join(n.CgroupRoot, "cpu", n.CgroupPath, containerID)
pids, err := utils.GetPids(containerCgroupPath)
if err != nil {
Expand All @@ -323,7 +333,7 @@ type Containers struct {
} `json:"containers"`
}

func (n *NodeMangerOperator) ListContainers() (*Containers, error) {
func (n *nodeMangerOperator) ListContainers() (*Containers, error) {
var res Containers
resp, err := n.client.R().SetResult(&res).Get("/ws/v1/node/containers")
if err != nil {
Expand All @@ -335,7 +345,7 @@ func (n *NodeMangerOperator) ListContainers() (*Containers, error) {
return &res, nil
}

func (n *NodeMangerOperator) GetContainer(containerID string) (*YarnContainer, error) {
func (n *nodeMangerOperator) GetContainer(containerID string) (*YarnContainer, error) {
listContainers, err := n.ListContainers()
if err != nil {
return nil, err
Expand All @@ -348,10 +358,10 @@ func (n *NodeMangerOperator) GetContainer(containerID string) (*YarnContainer, e
return nil, fmt.Errorf("container Not Found")
}

func (n *NodeMangerOperator) GenerateCgroupPath(containerID string) string {
func (n *nodeMangerOperator) GenerateCgroupPath(containerID string) string {
return filepath.Join(n.CgroupPath, containerID)
}

func (n *NodeMangerOperator) GenerateCgroupFullPath(cgroupSubSystem string) string {
func (n *nodeMangerOperator) GenerateCgroupFullPath(cgroupSubSystem string) string {
return filepath.Join(n.CgroupRoot, cgroupSubSystem, n.CgroupPath)
}
Loading
Loading