Skip to content

Commit 4341b88

Browse files
authored
Add option to save pod container logs (#232)
* add logic to get pod container logs * split into files * save files in controller * refactor method
1 parent 52fddbf commit 4341b88

File tree

11 files changed

+179
-64
lines changed

11 files changed

+179
-64
lines changed

pkg/config/types.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"net/http"
77
"os"
8+
"path/filepath"
89
"strings"
910
"time"
1011

@@ -39,6 +40,8 @@ type Config struct {
3940
CallbackServicePort int `json:"callbackServicePort"`
4041
// LeaderElectionResourceLock resource lock type. If empty, the default (resourcelock.ConfigMapsLeasesResourceLock) is used
4142
LeaderElectionResourceLock string `json:"leaderElectionResourceLock,omitempty"`
43+
// SavePodLog if enabled, pod logs are saved along with other job files
44+
SavePodLog bool `json:"savePodLog"`
4245

4346
Namespace string `json:"-"`
4447
JobPodTemplate string `json:"-"`
@@ -69,6 +72,14 @@ func (cfg *Config) ReportDirExistsChecker() healthz.Checker {
6972
}
7073
}
7174

75+
func (cfg Config) MkReportDir(executionID string) error {
76+
return os.MkdirAll(filepath.Join(cfg.ReportDirectory, executionID), 0o755)
77+
}
78+
79+
func (cfg Config) ReportFileName(executionID string, name string) string {
80+
return filepath.Join(cfg.ReportDirectory, executionID, name)
81+
}
82+
7283
// Metrics config
7384
type Metrics struct {
7485
Port int `json:"port"`

pkg/controller/controller.go

Lines changed: 57 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
11
package controller
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
7+
"fmt"
8+
"io"
9+
"io/ioutil"
610

711
"github.com/bakito/batch-job-controller/pkg/lifecycle"
12+
"github.com/go-logr/logr"
813
corev1 "k8s.io/api/core/v1"
914
k8serrors "k8s.io/apimachinery/pkg/api/errors"
10-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/client-go/kubernetes"
16+
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
1117
ctrl "sigs.k8s.io/controller-runtime"
1218
"sigs.k8s.io/controller-runtime/pkg/client"
13-
"sigs.k8s.io/controller-runtime/pkg/event"
1419
"sigs.k8s.io/controller-runtime/pkg/handler"
1520
"sigs.k8s.io/controller-runtime/pkg/log"
1621
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -27,11 +32,17 @@ const (
2732
// PodReconciler reconciler
2833
type PodReconciler struct {
2934
client.Client
35+
coreClient corev1client.CoreV1Interface
3036
Controller lifecycle.Controller
3137
}
3238

3339
// SetupWithManager setup
3440
func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
41+
clientset, err := kubernetes.NewForConfig(mgr.GetConfig())
42+
if err != nil {
43+
return err
44+
}
45+
r.coreClient = clientset.CoreV1()
3546
return ctrl.NewControllerManagedBy(mgr).
3647
For(&corev1.Pod{}).
3748
Watches(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}).
@@ -59,40 +70,60 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
5970
executionID := pod.GetLabels()[LabelExecutionID]
6071
node := pod.Spec.NodeName
6172

62-
switch pod.Status.Phase {
63-
case corev1.PodSucceeded:
64-
err = r.Controller.PodTerminated(executionID, node, pod.Status.Phase)
65-
case corev1.PodFailed:
66-
err = r.Controller.PodTerminated(executionID, node, pod.Status.Phase)
67-
}
68-
if err != nil {
69-
if !errors.Is(err, &lifecycle.ExecutionIDNotFound{}) {
70-
podLog.Error(err, "unexpected error")
71-
return reconcile.Result{}, err
73+
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
74+
if r.Controller.Config().SavePodLog {
75+
r.savePodLogs(ctx, pod, podLog, executionID)
76+
}
77+
if err := r.Controller.PodTerminated(executionID, node, pod.Status.Phase); err != nil {
78+
if !errors.Is(err, &lifecycle.ExecutionIDNotFound{}) {
79+
podLog.Error(err, "unexpected error")
80+
return reconcile.Result{}, err
81+
}
7282
}
7383
}
7484

7585
return reconcile.Result{}, nil
7686
}
7787

78-
type podPredicate struct{}
79-
80-
func (podPredicate) Create(e event.CreateEvent) bool {
81-
return matches(e.Object)
88+
func (r *PodReconciler) savePodLogs(ctx context.Context, pod *corev1.Pod, podLog logr.Logger, executionID string) {
89+
for _, c := range pod.Spec.Containers {
90+
clog := podLog.WithValues("container", c.Name)
91+
if l, err := r.getPodLog(ctx, pod.Namespace, pod.Name, c.Name); err != nil {
92+
clog.Info("could not get log of container")
93+
} else {
94+
if fileName, err := r.savePodLog(executionID, c.Name, l); err != nil {
95+
clog.Error(err, "error saving container log file")
96+
} else {
97+
clog.WithValues("name", fileName).Info("saved container log file")
98+
}
99+
}
100+
}
82101
}
83102

84-
func (podPredicate) Update(e event.UpdateEvent) bool {
85-
return matches(e.ObjectNew)
86-
}
103+
func (r *PodReconciler) getPodLog(ctx context.Context, namespace string, name string, containerName string) (string, error) {
104+
podLogOpts := corev1.PodLogOptions{
105+
Container: containerName,
106+
}
107+
req := r.coreClient.Pods(namespace).GetLogs(name, &podLogOpts)
108+
podLogs, err := req.Stream(ctx)
109+
if err != nil {
110+
return "", err
111+
}
112+
defer func() { _ = podLogs.Close() }()
87113

88-
func (podPredicate) Delete(e event.DeleteEvent) bool {
89-
return matches(e.Object)
90-
}
114+
buf := new(bytes.Buffer)
115+
if _, err = io.Copy(buf, podLogs); err != nil {
116+
return "", err
117+
}
118+
str := buf.String()
91119

92-
func (podPredicate) Generic(e event.GenericEvent) bool {
93-
return matches(e.Object)
120+
return str, nil
94121
}
95122

96-
func matches(m metav1.Object) bool {
97-
return m.GetLabels()[LabelExecutionID] != "" && m.GetLabels()[LabelOwner] != ""
123+
func (r *PodReconciler) savePodLog(executionID string, name string, data string) (string, error) {
124+
if err := r.Controller.Config().MkReportDir(executionID); err != nil {
125+
return "", err
126+
}
127+
fileName := r.Controller.Config().ReportFileName(executionID, fmt.Sprintf("container-%s.log", name))
128+
return fileName, ioutil.WriteFile(fileName, []byte(data), 0o600)
98129
}

pkg/controller/controller_test.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,24 @@ package controller
33
import (
44
"context"
55
"fmt"
6+
"os"
67

8+
"github.com/bakito/batch-job-controller/pkg/config"
79
mock_client "github.com/bakito/batch-job-controller/pkg/mocks/client"
810
mock_lifecycle "github.com/bakito/batch-job-controller/pkg/mocks/lifecycle"
911
mock_logr "github.com/bakito/batch-job-controller/pkg/mocks/logr"
12+
"github.com/bakito/batch-job-controller/pkg/test"
1013
"github.com/go-logr/logr"
1114
gm "github.com/golang/mock/gomock"
15+
"github.com/google/uuid"
1216
. "github.com/onsi/ginkgo/v2"
1317
. "github.com/onsi/gomega"
1418
corev1 "k8s.io/api/core/v1"
1519
k8serrors "k8s.io/apimachinery/pkg/api/errors"
1620
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1721
"k8s.io/apimachinery/pkg/runtime/schema"
22+
"k8s.io/client-go/kubernetes/fake"
23+
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
1824
ctrl "sigs.k8s.io/controller-runtime"
1925
"sigs.k8s.io/controller-runtime/pkg/client"
2026
"sigs.k8s.io/controller-runtime/pkg/event"
@@ -62,8 +68,12 @@ var _ = Describe("Controller", func() {
6268
mockClient *mock_client.MockClient
6369
mockSink *mock_logr.MockLogSink
6470
ctx context.Context
71+
coreClient corev1client.CoreV1Interface
72+
cfg config.Config
73+
executionID string
6574
)
6675
BeforeEach(func() {
76+
executionID = uuid.NewString()
6777
mockCtrl = gm.NewController(GinkgoT())
6878
mockController = mock_lifecycle.NewMockController(mockCtrl)
6979
mockClient = mock_client.NewMockClient(mockCtrl)
@@ -72,9 +82,23 @@ var _ = Describe("Controller", func() {
7282
mockSink.EXPECT().Init(gm.Any())
7383
mockSink.EXPECT().Enabled(gm.Any()).AnyTimes().Return(true)
7484
ctx = log.IntoContext(context.TODO(), logr.New(mockSink))
85+
86+
coreClient = fake.NewSimpleClientset().CoreV1()
87+
88+
tmp, err := test.TempDir(executionID)
89+
Ω(err).ShouldNot(HaveOccurred())
90+
cfg = config.Config{
91+
ReportDirectory: tmp,
92+
}
93+
7594
r = &PodReconciler{}
7695
r.Controller = mockController
7796
r.Client = mockClient
97+
r.coreClient = coreClient
98+
99+
DeferCleanup(func() error {
100+
return os.RemoveAll(cfg.ReportDirectory)
101+
})
78102
})
79103
It("should not find an entry", func() {
80104
mockSink.EXPECT().WithValues(gm.Any()).Return(mockSink)
@@ -95,13 +119,22 @@ var _ = Describe("Controller", func() {
95119
Ω(result).ShouldNot(BeNil())
96120
Ω(result.Requeue).Should(BeFalse())
97121
})
98-
It("should update controller on pod succeeded", func() {
99-
mockSink.EXPECT().WithValues(gm.Any()).Return(mockSink)
122+
It("should update controller on pod succeeded with logs of 2 containers", func() {
123+
cfg.SavePodLog = true
124+
mockController.EXPECT().Config().Return(cfg).AnyTimes()
125+
mockSink.EXPECT().WithValues(gm.Any()).Return(mockSink).AnyTimes()
126+
mockSink.EXPECT().Info(gm.Any(), gm.Any()).AnyTimes()
100127
mockClient.EXPECT().Get(gm.Any(), gm.Any(), gm.AssignableToTypeOf(&corev1.Pod{})).
101128
Do(func(ctx context.Context, key client.ObjectKey, pod *corev1.Pod) error {
102129
pod.Status = corev1.PodStatus{
103130
Phase: corev1.PodSucceeded,
104131
}
132+
pod.Spec = corev1.PodSpec{
133+
Containers: []corev1.Container{
134+
{Name: "container-a"},
135+
{Name: "container-b"},
136+
},
137+
}
105138
return nil
106139
})
107140
mockController.EXPECT().PodTerminated(gm.Any(), gm.Any(), corev1.PodSucceeded)
@@ -112,6 +145,7 @@ var _ = Describe("Controller", func() {
112145
Ω(result.Requeue).Should(BeFalse())
113146
})
114147
It("should update controller on pod failed", func() {
148+
mockController.EXPECT().Config().Return(cfg)
115149
mockSink.EXPECT().WithValues(gm.Any()).Return(mockSink)
116150
mockClient.EXPECT().Get(gm.Any(), gm.Any(), gm.AssignableToTypeOf(&corev1.Pod{})).
117151
Do(func(ctx context.Context, key client.ObjectKey, pod *corev1.Pod) error {
@@ -128,6 +162,7 @@ var _ = Describe("Controller", func() {
128162
Ω(result.Requeue).Should(BeFalse())
129163
})
130164
It("should return error on update controller error", func() {
165+
mockController.EXPECT().Config().Return(cfg)
131166
mockSink.EXPECT().WithValues(gm.Any()).Return(mockSink)
132167
mockSink.EXPECT().Error(gm.Any(), gm.Any())
133168
mockClient.EXPECT().Get(gm.Any(), gm.Any(), gm.AssignableToTypeOf(&corev1.Pod{})).

pkg/controller/predicate.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package controller
2+
3+
import (
4+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
5+
"sigs.k8s.io/controller-runtime/pkg/event"
6+
)
7+
8+
type podPredicate struct{}
9+
10+
func (podPredicate) Create(e event.CreateEvent) bool {
11+
return matches(e.Object)
12+
}
13+
14+
func (podPredicate) Update(e event.UpdateEvent) bool {
15+
return matches(e.ObjectNew)
16+
}
17+
18+
func (podPredicate) Delete(e event.DeleteEvent) bool {
19+
return matches(e.Object)
20+
}
21+
22+
func (podPredicate) Generic(e event.GenericEvent) bool {
23+
return matches(e.Object)
24+
}
25+
26+
func matches(m metav1.Object) bool {
27+
return m.GetLabels()[LabelExecutionID] != "" && m.GetLabels()[LabelOwner] != ""
28+
}

pkg/http/middleware.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const (
1212

1313
func (s *PostServer) middleware(ctx *gin.Context) {
1414
if s.Controller != nil {
15-
if !s.Controller.Has(nodeAndID(ctx)) && !s.DevMode {
15+
if !s.Controller.Has(nodeAndID(ctx)) && !s.Config.DevMode {
1616
ctx.String(http.StatusNotAcceptable, errorMiddlewareNotAcceptable)
1717
ctx.Abort()
1818
return

pkg/http/postserver-file.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ func (s *PostServer) postFile(ctx *gin.Context) {
1818
}
1919

2020
func (s *PostServer) saveFormFilesCallback(ctx *gin.Context, postLog logr.Logger, executionID string, node string, file *multipart.FileHeader) error {
21-
if err := s.mkdir(executionID); err != nil {
21+
if err := s.Config.MkReportDir(executionID); err != nil {
2222
ctx.String(http.StatusInternalServerError, err.Error())
2323
postLog.Error(err, "error creating upload directory")
2424
return err
2525
}
2626

27-
err := ctx.SaveUploadedFile(file, filepath.Join(s.ReportPath, executionID, fmt.Sprintf("%s-%s", node, file.Filename)))
27+
err := ctx.SaveUploadedFile(file, s.Config.ReportFileName(executionID, fmt.Sprintf("%s-%s", node, file.Filename)))
2828
if err != nil {
2929
ctx.String(http.StatusInternalServerError, err.Error())
3030
postLog.Error(err, "error saving file")

pkg/http/postserver.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"fmt"
55
"io/ioutil"
66
"net/http/pprof"
7-
"os"
8-
"path/filepath"
97

108
"github.com/bakito/batch-job-controller/pkg/config"
119
"github.com/bakito/batch-job-controller/pkg/lifecycle"
@@ -41,9 +39,9 @@ func GenericAPIServer(port int, cfg *config.Config) manager.Runnable {
4139
Kind: "internal",
4240
Handler: r,
4341
Log: ctrl.Log.WithName("api-server"),
42+
Config: cfg,
4443
},
45-
ReportPath: cfg.ReportDirectory,
46-
DevMode: cfg.DevMode,
44+
Config: cfg,
4745
}
4846

4947
rep := r.Group(CallbackBasePath)
@@ -84,8 +82,7 @@ func SetupProfiling(r *gin.Engine) {
8482
type PostServer struct {
8583
*Server
8684
Controller lifecycle.Controller
87-
ReportPath string
88-
DevMode bool
85+
Config *config.Config
8986
EventRecorder record.EventRecorder
9087
Client client.Reader
9188
}
@@ -118,17 +115,13 @@ func nodeAndID(ctx *gin.Context) (string, string) {
118115

119116
// SaveFile save a received file
120117
func (s *PostServer) SaveFile(executionID, name string, data []byte) (string, error) {
121-
if err := s.mkdir(executionID); err != nil {
118+
if err := s.Config.MkReportDir(executionID); err != nil {
122119
return "", err
123120
}
124-
fileName := filepath.Join(s.ReportPath, executionID, name)
121+
fileName := s.Config.ReportFileName(executionID, name)
125122
return fileName, ioutil.WriteFile(fileName, data, 0o600)
126123
}
127124

128-
func (s *PostServer) mkdir(executionID string) error {
129-
return os.MkdirAll(filepath.Join(s.ReportPath, executionID), 0o755)
130-
}
131-
132125
// Name the name of the server
133126
func (s *PostServer) Name() string {
134127
return "api-server"

0 commit comments

Comments
 (0)