Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (ctrl *VolumeConfigController) manageStandardVolumes(ctx context.Context, r
}{
// /var/log
{
Path: "/var/log",
Path: constants.LogMountPoint,
Mode: 0o755,
SELinuxLabel: "system_u:object_r:var_log_t:s0",
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (suite *VolumeConfigSuite) TestReconcileDefaults() {
})

ctest.AssertResources(suite, []resource.ID{
"/var/log",
constants.LogMountPoint,
"/var/log/audit",
"/var/log/containers",
"/var/log/pods",
Expand Down
129 changes: 129 additions & 0 deletions internal/app/machined/pkg/controllers/runtime/kmsg_log_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package runtime

import (
"context"
"fmt"
"io"
"time"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/siderolabs/go-kmsg"
"go.uber.org/zap"

"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
)

// KmsgLogStorageController watches events and forwards them to the system logger.
type KmsgLogStorageController struct {
Drainer *runtime.Drainer
V1Alpha1Logging runtime.LoggingManager

drainSub *runtime.DrainSubscription
logWriter io.WriteCloser
}

// Name implements controller.Controller interface.
func (ctrl *KmsgLogStorageController) Name() string {
return "runtime.KmsgLogStorageController"
}

// Inputs implements controller.Controller interface.
func (ctrl *KmsgLogStorageController) Inputs() []controller.Input {
return nil
}

// Outputs implements controller.Controller interface.
func (ctrl *KmsgLogStorageController) Outputs() []controller.Output {
return nil
}

// Run implements controller.Controller interface.
func (ctrl *KmsgLogStorageController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
var err error

ctrl.logWriter, err = ctrl.V1Alpha1Logging.ServiceLog("kernel").Writer()
if err != nil {
return fmt.Errorf("error opening logger: %w", err)
}

// initilalize kmsg reader early, so that we don't lose position on config changes
reader, err := kmsg.NewReader(kmsg.Follow())
if err != nil {
return fmt.Errorf("error reading kernel messages: %w", err)
}

defer reader.Close() //nolint:errcheck

kmsgCh := reader.Scan(ctx)

for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
}

if err = ctrl.deliverLogs(ctx, r, kmsgCh); err != nil {
return fmt.Errorf("error delivering logs: %w", err)
}

r.ResetRestartBackoff()
}
}

//nolint:gocyclo
func (ctrl *KmsgLogStorageController) deliverLogs(ctx context.Context, r controller.Runtime, kmsgCh <-chan kmsg.Packet) error {
if ctrl.drainSub == nil {
ctrl.drainSub = ctrl.Drainer.Subscribe()
}

var (
drainTimer *time.Timer
drainTimerCh <-chan time.Time
)

for {
var msg kmsg.Packet

select {
case <-ctx.Done():
ctrl.drainSub.Cancel()

return nil
case <-r.EventCh():
// config changed, restart the loop
return nil
case <-ctrl.drainSub.EventCh():
// drain started, assume that ksmg is drained if there're no new messages in drainTimeout
drainTimer = time.NewTimer(drainTimeout)
drainTimerCh = drainTimer.C

continue
case <-drainTimerCh:
ctrl.drainSub.Cancel()

return nil
case msg = <-kmsgCh:
if drainTimer != nil {
// if draining, reset the timer as there's a new message
if !drainTimer.Stop() {
<-drainTimer.C
}

drainTimer.Reset(drainTimeout)
}
}

if msg.Err != nil {
return fmt.Errorf("error receiving kernel logs: %w", msg.Err)
}

ctrl.logWriter.Write([]byte(
msg.Message.Timestamp.String() + ": " + msg.Message.Facility.String() + ": " + msg.Message.Message,
))
}
}
239 changes: 239 additions & 0 deletions internal/app/machined/pkg/controllers/runtime/log_persistence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package runtime

import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"time"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"go.uber.org/zap"

"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
"github.com/siderolabs/talos/pkg/machinery/constants"
"github.com/siderolabs/talos/pkg/machinery/resources/block"
)

// LogPersistenceController is a controller that persists logs in files.
type LogPersistenceController struct {
V1Alpha1Logging runtime.LoggingManager

// RLocked by the log writers, Locked by volume handlers
canLog sync.RWMutex
filesMutex sync.Mutex
files map[string]*os.File
}

// Name implements controller.Controller interface.
func (ctrl *LogPersistenceController) Name() string {
return "runtime.LogPersistenceController"
}

// Inputs implements controller.Controller interface.
func (ctrl *LogPersistenceController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: block.NamespaceName,
Type: block.VolumeMountStatusType,
Kind: controller.InputStrong,
},
{
Namespace: block.NamespaceName,
Type: block.VolumeMountRequestType,
Kind: controller.InputDestroyReady,
},
}
}

// Outputs implements controller.Controller interface.
func (ctrl *LogPersistenceController) Outputs() []controller.Output {
return []controller.Output{
{
Type: block.VolumeMountRequestType,
Kind: controller.OutputShared,
},
}
}

func (ctrl *LogPersistenceController) filenameForId(id string) string {
return filepath.Join(constants.LogMountPoint, id+".log")
}

func (ctrl *LogPersistenceController) getLogFile(id string, overwrite bool) (*os.File, error) {
var err error

f, ok := ctrl.files[id]

if ok && !overwrite {
return f, nil
}

f, err = os.OpenFile(ctrl.filenameForId(id), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return nil, fmt.Errorf("error opening log file for %q: %w", id, err)
}

ctrl.filesMutex.Lock()
ctrl.files[id] = f
ctrl.filesMutex.Unlock()

return f, nil
}

func (ctrl *LogPersistenceController) WriteLog(id string, line []byte) error {
var err error

ctrl.canLog.RLock()
defer ctrl.canLog.RUnlock()

f, err := ctrl.getLogFile(id, false)
if err != nil {
return err
}

if _, err = f.Write(append(line, '\n')); err != nil {
return fmt.Errorf("error writing log line for %q: %w", id, err)
}

return nil
}

func (ctrl *LogPersistenceController) startLogging() {
// here we can start logging activities
ctrl.canLog.Unlock()
}

func (ctrl *LogPersistenceController) stopLogging() error {
// Stop all logging activities, close files
// after this call we should not hold /var/log
ctrl.canLog.Lock()
ctrl.filesMutex.Lock()
defer ctrl.filesMutex.Unlock()

for id := range ctrl.files {
if err := ctrl.files[id].Close(); err != nil {
return fmt.Errorf("error closing log file for %q: %w", id, err)
}
delete(ctrl.files, id)
}

return nil
}

func (ctrl *LogPersistenceController) rotate(id string) error {
ctrl.canLog.Lock()
defer ctrl.canLog.Unlock()

_, ok := ctrl.files[id]
if !ok {
return nil
}

if err := ctrl.files[id].Close(); err != nil {
return fmt.Errorf("error closing log file for %q: %w", id, err)
}

filename := ctrl.filenameForId(id)
err := os.Rename(filename, filename+".1")
if err != nil {
return fmt.Errorf("rename: %w", err)
}

_, err = ctrl.getLogFile(id, true)
if err != nil {
return fmt.Errorf("create: %w", err)
}

return nil
}

// Run implements controller.Controller interface.
//
//nolint:gocyclo
func (ctrl *LogPersistenceController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
ctrl.V1Alpha1Logging.SetLineWriter(ctrl)

ctrl.files = make(map[string]*os.File)
// Block writes until /var/log is ready
ctrl.canLog.Lock()

ticker := time.NewTicker(5 * time.Second)
tickerC := ticker.C

for {
select {
case <-ctx.Done():
return nil
case <-tickerC:
for id := range ctrl.files {
st, err := os.Stat(ctrl.filenameForId(id))
if err != nil {
return fmt.Errorf("error stat logfile %s: %w", id, err)
}
if st.Size() >= 512*1024 {
err = ctrl.rotate(id)
if err != nil {
return fmt.Errorf("error rotating logfile %s: %w", id, err)
}
}
}
case <-r.EventCh():
}

requestID := ctrl.Name() + "-" + constants.LogMountPoint

// create a volume mount request for the logs volume mount point
// to keep it alive and prevent it from being torn down
if err := safe.WriterModify(ctx, r,
block.NewVolumeMountRequest(block.NamespaceName, requestID),
func(v *block.VolumeMountRequest) error {
v.TypedSpec().Requester = ctrl.Name()
v.TypedSpec().VolumeID = constants.LogMountPoint

return nil
},
); err != nil {
return fmt.Errorf("error creating volume mount request for user volume mount point: %w", err)
}

vms, err := safe.ReaderGetByID[*block.VolumeMountStatus](ctx, r, requestID)
if err != nil {
if state.IsNotFoundError(err) {
// volume mount not ready yet, wait more
continue
}

return fmt.Errorf("error getting volume mount status for log volume: %w", err)
}

switch vms.Metadata().Phase() {
case resource.PhaseRunning:
if !vms.Metadata().Finalizers().Has(ctrl.Name()) {
if err = r.AddFinalizer(ctx, vms.Metadata(), ctrl.Name()); err != nil {
return fmt.Errorf("error adding finalizer to volume mount status for log volume: %w", err)
}

ctrl.startLogging()
}
case resource.PhaseTearingDown:
if vms.Metadata().Finalizers().Has(ctrl.Name()) {
if err = ctrl.stopLogging(); err != nil {
return fmt.Errorf("error stopping persistent logging: %w", err)
}

if err = r.RemoveFinalizer(ctx, vms.Metadata(), ctrl.Name()); err != nil {
return fmt.Errorf("error removing finalizer from volume mount status for log volume: %w", err)
}
}
}
}
}
Loading