Skip to content

Commit 3c0770c

Browse files
01377880jimmysenior
authored andcommitted
Add:support-evict-yarn-container
Signed-off-by: jimmysenior <[email protected]>
1 parent 658fb98 commit 3c0770c

File tree

7 files changed

+1226
-33
lines changed

7 files changed

+1226
-33
lines changed

cmd/yarn-copilot-agent/main.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ package main
1818

1919
import (
2020
"flag"
21+
"k8s.io/client-go/rest"
2122
"os"
23+
"sigs.k8s.io/controller-runtime/pkg/client/config"
2224
"time"
2325

2426
statesinformer "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer/impl"
@@ -53,7 +55,11 @@ func main() {
5355
klog.Infof("args: %s = %s", f.Name, f.Value)
5456
})
5557
stopCtx := signals.SetupSignalHandler()
56-
kubelet, _ := statesinformer.NewKubeletStub("127.0.0.1", 10255, "http", time.Second*5, nil)
58+
restConfig, err := initRestConfig()
59+
if err != nil {
60+
klog.Fatal(err)
61+
}
62+
kubelet, _ := statesinformer.NewKubeletStub("127.0.0.1", 10250, "https", time.Second*5, restConfig)
5763
operator, err := nm.NewNodeMangerOperator(conf.CgroupRootDir, conf.YarnContainerCgroupPath, conf.SyncMemoryCgroup, conf.NodeMangerEndpoint, conf.SyncCgroupPeriod, kubelet)
5864
if err != nil {
5965
klog.Fatal(err)
@@ -68,3 +74,14 @@ func main() {
6874
klog.Fatal(err)
6975
}
7076
}
77+
78+
func initRestConfig() (*rest.Config, error) {
79+
restConfig, err := config.GetConfig()
80+
if err != nil {
81+
return nil, err
82+
}
83+
restConfig.TLSClientConfig.Insecure = true
84+
restConfig.TLSClientConfig.CAData = nil
85+
restConfig.TLSClientConfig.CAFile = ""
86+
return restConfig, nil
87+
}

pkg/copilot-agent/nm/nm.go

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,16 @@ const (
3838
MemoryMoveChargeAtImmigrateName = "memory.move_charge_at_immigrate"
3939
)
4040

41-
type NodeMangerOperator struct {
41+
type NodeMangerOperator interface {
42+
Run(stop <-chan struct{}) error
43+
KillContainer(containerID string) error
44+
ListContainers() (*Containers, error)
45+
GetContainer(containerID string) (*YarnContainer, error)
46+
GenerateCgroupPath(containerID string) string
47+
GenerateCgroupFullPath(cgroupSubSystem string) string
48+
}
49+
50+
type nodeMangerOperator struct {
4251
CgroupRoot string
4352
CgroupPath string
4453

@@ -52,15 +61,15 @@ type NodeMangerOperator struct {
5261
nmTicker *time.Ticker
5362
}
5463

55-
func NewNodeMangerOperator(cgroupRoot string, cgroupPath string, syncMemoryCgroup bool, endpoint string, syncPeriod time.Duration, kubelet statesinformer.KubeletStub) (*NodeMangerOperator, error) {
64+
func NewNodeMangerOperator(cgroupRoot string, cgroupPath string, syncMemoryCgroup bool, endpoint string, syncPeriod time.Duration, kubelet statesinformer.KubeletStub) (NodeMangerOperator, error) {
5665
watcher, err := pleg.NewWatcher()
5766
if err != nil {
5867
return nil, err
5968
}
6069
cli := resty.New()
6170
cli.SetBaseURL(fmt.Sprintf("http://%s", endpoint))
6271
w := NewNMPodWater(kubelet)
63-
return &NodeMangerOperator{
72+
operator := &nodeMangerOperator{
6473
CgroupRoot: cgroupRoot,
6574
CgroupPath: cgroupPath,
6675
SyncMemoryCgroup: syncMemoryCgroup,
@@ -70,18 +79,19 @@ func NewNodeMangerOperator(cgroupRoot string, cgroupPath string, syncMemoryCgrou
7079
nmPodWatcher: w,
7180
ticker: time.NewTicker(syncPeriod),
7281
nmTicker: time.NewTicker(time.Second),
73-
}, nil
82+
}
83+
return operator, nil
7484
}
7585

76-
func (n *NodeMangerOperator) Run(stop <-chan struct{}) error {
86+
func (n *nodeMangerOperator) Run(stop <-chan struct{}) error {
7787
klog.Infof("Run node manager operator")
7888
if n.SyncMemoryCgroup {
7989
return n.syncMemoryCgroup(stop)
8090
}
8191
return nil
8292
}
8393

84-
func (n *NodeMangerOperator) syncMemoryCgroup(stop <-chan struct{}) error {
94+
func (n *nodeMangerOperator) syncMemoryCgroup(stop <-chan struct{}) error {
8595
cpuDir := filepath.Join(n.CgroupRoot, system.CgroupCPUDir, n.CgroupPath)
8696
if err := n.ensureCgroupDir(cpuDir); err != nil {
8797
klog.Error(err)
@@ -119,7 +129,7 @@ func (n *NodeMangerOperator) syncMemoryCgroup(stop <-chan struct{}) error {
119129
}
120130
}
121131

122-
func (n *NodeMangerOperator) syncNoneProcCgroup() {
132+
func (n *nodeMangerOperator) syncNoneProcCgroup() {
123133
klog.V(5).Info("syncNoneProcCgroup")
124134
cpuPath := n.GenerateCgroupFullPath(system.CgroupCPUDir)
125135
_ = filepath.Walk(cpuPath, func(path string, info os.FileInfo, err error) error {
@@ -147,7 +157,7 @@ func (n *NodeMangerOperator) syncNoneProcCgroup() {
147157
})
148158
}
149159

150-
func (n *NodeMangerOperator) syncNMEndpoint() {
160+
func (n *nodeMangerOperator) syncNMEndpoint() {
151161
endpoint, exist, err := n.nmPodWatcher.GetNMPodEndpoint()
152162
if err != nil {
153163
klog.Error(err)
@@ -163,7 +173,7 @@ func (n *NodeMangerOperator) syncNMEndpoint() {
163173
}
164174
}
165175

166-
func (n *NodeMangerOperator) syncAllCgroup() {
176+
func (n *nodeMangerOperator) syncAllCgroup() {
167177
subDirFunc := func(dir string) map[string]struct{} {
168178
res := map[string]struct{}{}
169179
_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
@@ -190,7 +200,7 @@ func (n *NodeMangerOperator) syncAllCgroup() {
190200
}
191201
}
192202

193-
func (n *NodeMangerOperator) syncParentCgroup() error {
203+
func (n *nodeMangerOperator) syncParentCgroup() error {
194204
containers, err := n.ListContainers()
195205
if err != nil {
196206
klog.Error(err)
@@ -214,7 +224,7 @@ func (n *NodeMangerOperator) syncParentCgroup() error {
214224
return nil
215225
}
216226

217-
func (n *NodeMangerOperator) removeMemoryCgroup(fileName string) {
227+
func (n *nodeMangerOperator) removeMemoryCgroup(fileName string) {
218228
klog.V(5).Infof("receive file delete event %s", fileName)
219229
basename := filepath.Base(fileName)
220230
if !strings.HasPrefix(basename, "container_") {
@@ -229,7 +239,7 @@ func (n *NodeMangerOperator) removeMemoryCgroup(fileName string) {
229239
klog.V(5).Infof("yarn container dir %v removed", basename)
230240
}
231241

232-
func (n *NodeMangerOperator) createMemoryCgroup(fileName string) {
242+
func (n *nodeMangerOperator) createMemoryCgroup(fileName string) {
233243
klog.V(5).Infof("receive file create event %s", fileName)
234244
basename := filepath.Base(fileName)
235245
if !strings.HasPrefix(basename, "container_") {
@@ -277,7 +287,7 @@ func (n *NodeMangerOperator) createMemoryCgroup(fileName string) {
277287
klog.V(5).Infof("set memory %s limit_in_bytes as %d", memCgroupPath, memLimit)
278288
}
279289

280-
func (n *NodeMangerOperator) ensureCgroupDir(dir string) error {
290+
func (n *nodeMangerOperator) ensureCgroupDir(dir string) error {
281291
klog.V(5).Infof("ensure cgroup dir %s", dir)
282292
f, err := os.Stat(dir)
283293
if err != nil && !os.IsNotExist(err) {
@@ -296,15 +306,15 @@ func (n *NodeMangerOperator) ensureCgroupDir(dir string) error {
296306
}
297307

298308
// KillContainer kill process group for target container
299-
func (n *NodeMangerOperator) KillContainer(containerID string) error {
309+
func (n *nodeMangerOperator) KillContainer(containerID string) error {
300310
processGroupID := n.getProcessGroupID(containerID)
301311
if processGroupID <= 1 {
302312
return fmt.Errorf("invalid process group pid(%d) for container %s", processGroupID, containerID)
303313
}
304314
return syscall.Kill(-processGroupID, syscall.SIGKILL)
305315
}
306316

307-
func (n *NodeMangerOperator) getProcessGroupID(containerID string) int {
317+
func (n *nodeMangerOperator) getProcessGroupID(containerID string) int {
308318
containerCgroupPath := filepath.Join(n.CgroupRoot, "cpu", n.CgroupPath, containerID)
309319
pids, err := utils.GetPids(containerCgroupPath)
310320
if err != nil {
@@ -323,7 +333,7 @@ type Containers struct {
323333
} `json:"containers"`
324334
}
325335

326-
func (n *NodeMangerOperator) ListContainers() (*Containers, error) {
336+
func (n *nodeMangerOperator) ListContainers() (*Containers, error) {
327337
var res Containers
328338
resp, err := n.client.R().SetResult(&res).Get("/ws/v1/node/containers")
329339
if err != nil {
@@ -335,7 +345,7 @@ func (n *NodeMangerOperator) ListContainers() (*Containers, error) {
335345
return &res, nil
336346
}
337347

338-
func (n *NodeMangerOperator) GetContainer(containerID string) (*YarnContainer, error) {
348+
func (n *nodeMangerOperator) GetContainer(containerID string) (*YarnContainer, error) {
339349
listContainers, err := n.ListContainers()
340350
if err != nil {
341351
return nil, err
@@ -348,10 +358,10 @@ func (n *NodeMangerOperator) GetContainer(containerID string) (*YarnContainer, e
348358
return nil, fmt.Errorf("container Not Found")
349359
}
350360

351-
func (n *NodeMangerOperator) GenerateCgroupPath(containerID string) string {
361+
func (n *nodeMangerOperator) GenerateCgroupPath(containerID string) string {
352362
return filepath.Join(n.CgroupPath, containerID)
353363
}
354364

355-
func (n *NodeMangerOperator) GenerateCgroupFullPath(cgroupSubSystem string) string {
365+
func (n *nodeMangerOperator) GenerateCgroupFullPath(cgroupSubSystem string) string {
356366
return filepath.Join(n.CgroupRoot, cgroupSubSystem, n.CgroupPath)
357367
}

0 commit comments

Comments
 (0)