diff --git a/kubernetes/cmd/task-executor/main.go b/kubernetes/cmd/task-executor/main.go index 6000af05..1872b5bc 100644 --- a/kubernetes/cmd/task-executor/main.go +++ b/kubernetes/cmd/task-executor/main.go @@ -16,6 +16,7 @@ package main import ( "context" + "fmt" "net/http" "os" "os/signal" @@ -36,7 +37,10 @@ func main() { cfg := config.NewConfig() cfg.LoadFromEnv() cfg.LoadFromFlags() - + if err := cfg.InitKlog(); err != nil { + fmt.Println("failed to init klog") + os.Exit(1) + } klog.InfoS("task-executor starting", "dataDir", cfg.DataDir, "listenAddr", cfg.ListenAddr, "sidecarMode", cfg.EnableSidecarMode) // Initialize TaskStore diff --git a/kubernetes/go.mod b/kubernetes/go.mod index 4b20f831..249b61a4 100644 --- a/kubernetes/go.mod +++ b/kubernetes/go.mod @@ -88,6 +88,7 @@ require ( google.golang.org/protobuf v1.36.5 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.33.0 // indirect k8s.io/apiserver v0.33.0 // indirect diff --git a/kubernetes/go.sum b/kubernetes/go.sum index b1d7d9ae..3c5ca7ac 100644 --- a/kubernetes/go.sum +++ b/kubernetes/go.sum @@ -229,6 +229,8 @@ gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSP gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kubernetes/internal/task-executor/config/config.go b/kubernetes/internal/task-executor/config/config.go index 2ba05ee5..c70e7195 100644 --- a/kubernetes/internal/task-executor/config/config.go +++ b/kubernetes/internal/task-executor/config/config.go @@ -16,8 +16,13 @@ package config import ( "flag" + "fmt" "os" + "path" "time" + + "gopkg.in/natefinch/lumberjack.v2" + "k8s.io/klog/v2" ) type Config struct { @@ -29,6 +34,10 @@ type Config struct { ReconcileInterval time.Duration EnableSidecarMode bool MainContainerName string + LogMaxSize int + LogMaxBackups int + LogMaxAge int + LogDir string } func NewConfig() *Config { @@ -41,6 +50,10 @@ func NewConfig() *Config { ReconcileInterval: 500 * time.Millisecond, EnableSidecarMode: false, MainContainerName: "main", + LogMaxSize: 100, + LogMaxBackups: 10, + LogMaxAge: 7, + LogDir: "logs", } } @@ -68,5 +81,31 @@ func (c *Config) LoadFromFlags() { flag.StringVar(&c.CRISocket, "cri-socket", c.CRISocket, "CRI socket path for container runner mode") flag.BoolVar(&c.EnableSidecarMode, "enable-sidecar-mode", c.EnableSidecarMode, "enable sidecar runner mode") flag.StringVar(&c.MainContainerName, "main-container-name", c.MainContainerName, "main container name") + // set log flags + flag.IntVar(&c.LogMaxSize, "log-max-size", c.LogMaxSize, "maximum log file size in MB") + flag.IntVar(&c.LogMaxBackups, "log-max-backups", c.LogMaxBackups, "maximum number of log backup files") + flag.IntVar(&c.LogMaxAge, "log-max-age", c.LogMaxAge, "maximum number of days to keep log files") + flag.StringVar(&c.LogDir, "log-dir", c.LogDir, "log file directory") flag.Parse() } + +func (c *Config) InitKlog() error { + if err := os.MkdirAll(c.LogDir, 0755); err != nil { + return fmt.Errorf("failed to create log directory: %w", err) + } + logFile := path.Join(c.LogDir, "task-executor.log") + fs := flag.NewFlagSet("klog", flag.ContinueOnError) + klog.InitFlags(fs) + fs.Set("logtostderr", "false") 
+ fs.Set("alsologtostderr", "false") + fs.Set("stderrthreshold", "FATAL") + fs.Set("one_output", "true") + klog.SetOutput(&lumberjack.Logger{ + Filename: logFile, + MaxSize: c.LogMaxSize, + MaxBackups: c.LogMaxBackups, + MaxAge: c.LogMaxAge, + Compress: true, + }) + return nil +} diff --git a/kubernetes/internal/task-executor/manager/task_manager.go b/kubernetes/internal/task-executor/manager/task_manager.go index 0201f5c8..9a00a772 100644 --- a/kubernetes/internal/task-executor/manager/task_manager.go +++ b/kubernetes/internal/task-executor/manager/task_manager.go @@ -31,7 +31,6 @@ import ( ) const ( - // Maximum number of concurrent tasks (enforcing single task limitation) maxConcurrentTasks = 1 ) @@ -43,11 +42,8 @@ type taskManager struct { executor runtime.Executor config *config.Config - // stopping tracks tasks that are currently being stopped. - // This prevents duplicate Stop calls and status rollback during async stop. stopping map[string]bool - // Reconcile loop control stopCh chan struct{} doneCh chan struct{} } @@ -75,8 +71,7 @@ func NewTaskManager(cfg *config.Config, taskStore store.TaskStore, exec runtime. }, nil } -// isTaskActive checks if the task is counting towards the concurrency limit. -// A task is active if it is NOT marked for deletion AND NOT in a terminated state. +// isTaskActive checks if the task is counting towards the concurrency limit func (m *taskManager) isTaskActive(task *types.Task) bool { if task == nil { return false @@ -88,8 +83,7 @@ func (m *taskManager) isTaskActive(task *types.Task) bool { return state == types.TaskStatePending || state == types.TaskStateRunning } -// countActiveTasks counts tasks that are active (not deleted AND not terminated). -// Must be called with lock held. 
+// countActiveTasks counts tasks that are active func (m *taskManager) countActiveTasks() int { count := 0 for _, task := range m.tasks { @@ -100,7 +94,6 @@ func (m *taskManager) countActiveTasks() int { return count } -// Create creates a new task and starts execution. func (m *taskManager) Create(ctx context.Context, task *types.Task) (*types.Task, error) { if task == nil { return nil, fmt.Errorf("task cannot be nil") @@ -112,31 +105,25 @@ func (m *taskManager) Create(ctx context.Context, task *types.Task) (*types.Task m.mu.Lock() defer m.mu.Unlock() - // Check if task already exists if _, exists := m.tasks[task.Name]; exists { return nil, fmt.Errorf("task %s already exists", task.Name) } - // Enforce single task limitation using real-time count if m.countActiveTasks() >= maxConcurrentTasks { return nil, fmt.Errorf("maximum concurrent tasks (%d) reached, cannot create new task", maxConcurrentTasks) } - // Persist task to store if err := m.store.Create(ctx, task); err != nil { return nil, fmt.Errorf("failed to persist task: %w", err) } - // Start task execution if err := m.executor.Start(ctx, task); err != nil { - // Rollback - delete from store if delErr := m.store.Delete(ctx, task.Name); delErr != nil { klog.ErrorS(delErr, "failed to rollback task creation", "name", task.Name) } return nil, fmt.Errorf("failed to start task: %w", err) } - // Inspect immediately to populate status (Running/Waiting) so API response is not empty if status, err := m.executor.Inspect(ctx, task); err == nil { task.Status = *status // Persist the PID and initial status @@ -147,21 +134,17 @@ func (m *taskManager) Create(ctx context.Context, task *types.Task) (*types.Task klog.ErrorS(err, "failed to inspect task after start", "name", task.Name) } - // Safety fallback: Ensure task has a state if task.Status.State == "" { task.Status.State = types.TaskStatePending } - // Add to memory m.tasks[task.Name] = task klog.InfoS("task created successfully", "name", task.Name) return task, nil } 
-// Sync synchronizes the current task list with the desired state. -// It deletes tasks not in the desired list and creates new ones. -// Returns the current task list and any errors encountered during sync. +// Sync synchronizes the current task list with the desired state func (m *taskManager) Sync(ctx context.Context, desired []*types.Task) ([]*types.Task, error) { if desired == nil { return nil, fmt.Errorf("desired task list cannot be nil") @@ -170,7 +153,6 @@ func (m *taskManager) Sync(ctx context.Context, desired []*types.Task) ([]*types m.mu.Lock() defer m.mu.Unlock() - // Build desired task map desiredMap := make(map[string]*types.Task) for _, task := range desired { if task != nil && task.Name != "" { @@ -178,10 +160,8 @@ func (m *taskManager) Sync(ctx context.Context, desired []*types.Task) ([]*types } } - // Collect errors during sync var syncErrors []error - // Delete tasks not in desired list for name, task := range m.tasks { if _, ok := desiredMap[name]; !ok { if err := m.softDeleteLocked(ctx, task); err != nil { @@ -191,7 +171,6 @@ func (m *taskManager) Sync(ctx context.Context, desired []*types.Task) ([]*types } } - // Create new tasks for name, task := range desiredMap { if _, exists := m.tasks[name]; !exists { if err := m.createTaskLocked(ctx, task); err != nil { @@ -201,14 +180,12 @@ func (m *taskManager) Sync(ctx context.Context, desired []*types.Task) ([]*types } } - // Return current task list with aggregated errors if len(syncErrors) > 0 { return m.listTasksLocked(), errors.Join(syncErrors...) } return m.listTasksLocked(), nil } -// Get retrieves a task by name. func (m *taskManager) Get(ctx context.Context, name string) (*types.Task, error) { if name == "" { return nil, fmt.Errorf("task name cannot be empty") @@ -225,7 +202,6 @@ func (m *taskManager) Get(ctx context.Context, name string) (*types.Task, error) return task, nil } -// List returns all tasks. 
func (m *taskManager) List(ctx context.Context) ([]*types.Task, error) { m.mu.RLock() defer m.mu.RUnlock() @@ -233,8 +209,7 @@ func (m *taskManager) List(ctx context.Context) ([]*types.Task, error) { return m.listTasksLocked(), nil } -// Delete removes a task by marking it for deletion (soft delete). -// The reconcile loop will handle the actual stopping and removal. +// Delete removes a task by marking it for deletion func (m *taskManager) Delete(ctx context.Context, name string) error { if name == "" { return fmt.Errorf("task name cannot be empty") @@ -251,10 +226,10 @@ func (m *taskManager) Delete(ctx context.Context, name string) error { return m.softDeleteLocked(ctx, task) } -// softDeleteLocked marks a task for deletion without acquiring the lock. +// softDeleteLocked marks a task for deletion func (m *taskManager) softDeleteLocked(ctx context.Context, task *types.Task) error { if task.DeletionTimestamp != nil { - return nil // Already marked + return nil } now := time.Now() @@ -268,22 +243,19 @@ func (m *taskManager) softDeleteLocked(ctx context.Context, task *types.Task) er return nil } -// Start initializes the manager, loads tasks from store, and starts the reconcile loop. +// Start initializes the manager, loads tasks from store, and starts the reconcile loop func (m *taskManager) Start(ctx context.Context) { klog.InfoS("starting task manager") - // Recover tasks from store if err := m.recoverTasks(ctx); err != nil { klog.ErrorS(err, "failed to recover tasks from store") } - // Start reconcile loop go m.reconcileLoop(ctx) klog.InfoS("task manager started") } -// Stop stops the reconcile loop and cleans up resources. func (m *taskManager) Stop() { klog.InfoS("stopping task manager") close(m.stopCh) @@ -291,35 +263,29 @@ func (m *taskManager) Stop() { klog.InfoS("task manager stopped") } -// createTaskLocked creates a task without acquiring the lock (must be called with lock held). 
+// createTaskLocked creates a task without acquiring the lock func (m *taskManager) createTaskLocked(ctx context.Context, task *types.Task) error { if task == nil || task.Name == "" { return fmt.Errorf("invalid task") } - // Check if already exists if _, exists := m.tasks[task.Name]; exists { return fmt.Errorf("task %s already exists", task.Name) } - // Enforce single task limitation using real-time count if m.countActiveTasks() >= maxConcurrentTasks { return fmt.Errorf("maximum concurrent tasks (%d) reached, cannot create new task", maxConcurrentTasks) } - // Persist to store if err := m.store.Create(ctx, task); err != nil { return fmt.Errorf("failed to persist task: %w", err) } - // Start execution if err := m.executor.Start(ctx, task); err != nil { - // Rollback m.store.Delete(ctx, task.Name) return fmt.Errorf("failed to start task: %w", err) } - // Inspect immediately to populate status (Running/Waiting) so API response is not empty if status, err := m.executor.Inspect(ctx, task); err == nil { task.Status = *status // Persist the PID and initial status @@ -330,12 +296,11 @@ func (m *taskManager) createTaskLocked(ctx context.Context, task *types.Task) er klog.ErrorS(err, "failed to inspect task after start", "name", task.Name) } - // Add to memory m.tasks[task.Name] = task return nil } -// listTasksLocked returns all tasks without acquiring the lock (must be called with lock held). +// listTasksLocked returns all tasks without acquiring the lock func (m *taskManager) listTasksLocked() []*types.Task { tasks := make([]*types.Task, 0, len(m.tasks)) for _, task := range m.tasks { @@ -346,7 +311,6 @@ func (m *taskManager) listTasksLocked() []*types.Task { return tasks } -// recoverTasks loads tasks from store and recovers their state. 
func (m *taskManager) recoverTasks(ctx context.Context) error { klog.InfoS("recovering tasks from store") @@ -363,17 +327,14 @@ func (m *taskManager) recoverTasks(ctx context.Context) error { continue } - // Inspect task to get current status status, err := m.executor.Inspect(ctx, task) if err != nil { klog.ErrorS(err, "failed to inspect task during recovery", "name", task.Name) continue } - // Update task status task.Status = *status - // Add to memory m.tasks[task.Name] = task klog.InfoS("recovered task", "name", task.Name, "state", task.Status.State, "deleting", task.DeletionTimestamp != nil) @@ -383,7 +344,6 @@ func (m *taskManager) recoverTasks(ctx context.Context) error { return nil } -// reconcileLoop periodically synchronizes task states. func (m *taskManager) reconcileLoop(ctx context.Context) { ticker := time.NewTicker(m.config.ReconcileInterval) defer ticker.Stop() @@ -403,7 +363,6 @@ func (m *taskManager) reconcileLoop(ctx context.Context) { } } -// reconcileTasks updates the status of all tasks and handles deletion. 
func (m *taskManager) reconcileTasks(ctx context.Context) { m.mu.Lock() defer m.mu.Unlock() @@ -421,7 +380,6 @@ func (m *taskManager) reconcileTasks(ctx context.Context) { } state := status.State - // Determine if we should stop the task shouldStop := false stopReason := "" @@ -436,10 +394,9 @@ func (m *taskManager) reconcileTasks(ctx context.Context) { } if shouldStop { - klog.InfoS("stopping task", "name", name, "reason", stopReason) + klog.InfoS("stopping task", "name", name, "reason", stopReason, "current_state", state) m.stopping[name] = true - // Async stop to avoid blocking the reconcile loop go func(t *types.Task, taskName string) { defer func() { m.mu.Lock() @@ -447,6 +404,7 @@ func (m *taskManager) reconcileTasks(ctx context.Context) { m.mu.Unlock() }() + klog.V(1).InfoS("task stop initiated", "name", taskName, "reason", stopReason) if err := m.executor.Stop(ctx, t); err != nil { klog.ErrorS(err, "failed to stop task", "name", taskName) } @@ -454,16 +412,19 @@ func (m *taskManager) reconcileTasks(ctx context.Context) { }(task, name) } - // Determine if we can finalize deletion if task.DeletionTimestamp != nil && isTerminalState(state) { klog.InfoS("task terminated, finalizing deletion", "name", name) tasksToDelete = append(tasksToDelete, name) } - // Update status only if not stopping (prevent status rollback during async stop) if !m.stopping[name] { if !reflect.DeepEqual(task.Status, *status) { + oldState := task.Status.State task.Status = *status + // Log state changes only + if oldState != status.State { + klog.InfoS("task state changed", "name", name, "oldState", oldState, "newState", status.State) + } if err := m.store.Update(ctx, task); err != nil { klog.ErrorS(err, "failed to update task status in store", "name", name) } @@ -471,7 +432,6 @@ func (m *taskManager) reconcileTasks(ctx context.Context) { } } - // Finalize deletions for _, name := range tasksToDelete { if _, exists := m.tasks[name]; !exists { continue @@ -488,7 +448,7 @@ func (m 
*taskManager) reconcileTasks(ctx context.Context) { } } -// isTerminalState returns true if the task will not transition to another state. +// isTerminalState returns true if the task will not transition to another state func isTerminalState(state types.TaskState) bool { return state == types.TaskStateSucceeded || state == types.TaskStateFailed || diff --git a/kubernetes/internal/task-executor/runtime/composite.go b/kubernetes/internal/task-executor/runtime/composite.go index 31c6349e..7cceed16 100644 --- a/kubernetes/internal/task-executor/runtime/composite.go +++ b/kubernetes/internal/task-executor/runtime/composite.go @@ -29,19 +29,17 @@ func NewExecutor(cfg *config.Config) (Executor, error) { return nil, fmt.Errorf("config cannot be nil") } - // 1. Initialize ProcessExecutor (Always available for Host/Sidecar modes) procExec, err := NewProcessExecutor(cfg) if err != nil { return nil, fmt.Errorf("failed to create process executor: %w", err) } - klog.InfoS("process executor initialized.", "enableSidecar", cfg.EnableSidecarMode, "mainContainer", cfg.MainContainerName) + klog.InfoS("process executor initialized", "enableSidecar", cfg.EnableSidecarMode, "mainContainer", cfg.MainContainerName) - // 2. Initialize ContainerExecutor containerExec, err := newContainerExecutor(cfg) if err != nil { return nil, fmt.Errorf("failed to create container executor: %w", err) } - // 3. Return Composite + return &compositeExecutor{ processExec: procExec, containerExec: containerExec, diff --git a/kubernetes/internal/task-executor/runtime/process.go b/kubernetes/internal/task-executor/runtime/process.go index 340bc781..607854f5 100644 --- a/kubernetes/internal/task-executor/runtime/process.go +++ b/kubernetes/internal/task-executor/runtime/process.go @@ -61,7 +61,6 @@ func (e *processExecutor) Start(ctx context.Context, task *types.Task) error { pidPath := filepath.Join(taskDir, PidFile) exitPath := filepath.Join(taskDir, ExitFile) - // 1. 
Construct the user command securely var cmdList []string if task.Process != nil { cmdList = append(task.Process.Command, task.Process.Args...) @@ -73,21 +72,17 @@ func (e *processExecutor) Start(ctx context.Context, task *types.Task) error { return fmt.Errorf("no command specified in process spec (task name: %s)", task.Name) } - // Use shell escaping to prevent command injection safeCmdStr := shellEscape(cmdList) shimScript := e.buildShimScript(exitPath, safeCmdStr) - // 2. Prepare the execution command based on mode var cmd *exec.Cmd if e.config.EnableSidecarMode { - // Sidecar Logic: Find target PID and use nsenter targetPID, err := e.findPidByEnvVar("SANDBOX_MAIN_CONTAINER", e.config.MainContainerName) if err != nil { return fmt.Errorf("failed to resolve target PID: %w", err) } - // Inherit environment variables from the target process (Main Container) targetEnv, err := getProcEnviron(targetPID) if err != nil { return fmt.Errorf("failed to read target process environment: %w", err) @@ -104,22 +99,16 @@ func (e *processExecutor) Start(ctx context.Context, task *types.Task) error { klog.InfoS("Starting sidecar task", "id", task.Name, "targetPID", targetPID) } else { - // Host Logic: Direct execution - // Use exec.Command instead of CommandContext to ensure the process survives - // after the HTTP request context is canceled. cmd = exec.Command("/bin/sh", "-c", shimScript) cmd.Env = os.Environ() klog.InfoS("Starting host task", "name", task.Name, "cmd", safeCmdStr, "exitPath", exitPath) } - // Set process group ID to isolate from parent process lifecycle - // This applies to both Host and Sidecar (nsenter) processes cmd.SysProcAttr = &syscall.SysProcAttr{ - Setpgid: true, // Create new process group - Pgid: 0, // Use PID as PGID + Setpgid: true, + Pgid: 0, } - // 3. 
Execute common logic (logs, shim start) return e.executeCommand(task, cmd, pidPath) } @@ -151,16 +140,13 @@ func (e *processExecutor) executeCommand(task *types.Task, cmd *exec.Cmd, pidPat cmd.Stdout = stdoutFile cmd.Stderr = stderrFile - // Apply environment variables from ProcessTask spec if task.Process != nil { - // Add task-specific environment variables for _, env := range task.Process.Env { if env.Name != "" { cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", env.Name, env.Value)) } } - // Apply working directory if task.Process.WorkingDir != "" { cmd.Dir = task.Process.WorkingDir klog.InfoS("Set working directory", "name", task.Name, "workingDir", task.Process.WorkingDir) @@ -179,7 +165,6 @@ func (e *processExecutor) executeCommand(task *types.Task, cmd *exec.Cmd, pidPat pid := cmd.Process.Pid if err := os.WriteFile(pidPath, []byte(strconv.Itoa(pid)), 0644); err != nil { klog.ErrorS(err, "failed to write pid file", "name", task.Name) - // Try to kill the process since we failed to track it _ = cmd.Process.Kill() stdoutFile.Close() stderrFile.Close() @@ -188,11 +173,9 @@ func (e *processExecutor) executeCommand(task *types.Task, cmd *exec.Cmd, pidPat klog.InfoS("Task command started successfully", "name", task.Name, "pid", pid) - // Close file descriptors in parent; child process has inherited them stdoutFile.Close() stderrFile.Close() - // Wait for process in background go func() { if err := cmd.Wait(); err != nil { klog.ErrorS(err, "task process exited with error", "name", task.Name) @@ -240,10 +223,8 @@ func (e *processExecutor) Inspect(ctx context.Context, task *types.Task) (*types status := &types.Status{ State: types.TaskStateUnknown, } - // Prepare a single sub-status for the process subStatus := types.SubStatus{} var pid int - // 1. 
Check Exit File (Completed) if exitData, err := os.ReadFile(exitPath); err == nil { fileInfo, _ := os.Stat(exitPath) exitCode, _ := strconv.Atoi(string(exitData)) @@ -260,7 +241,6 @@ func (e *processExecutor) Inspect(ctx context.Context, task *types.Task) (*types subStatus.Reason = "Failed" } - // Try to read start time from PID file if pidFileInfo, err := os.Stat(pidPath); err == nil { startedAt := pidFileInfo.ModTime() subStatus.StartedAt = &startedAt @@ -270,7 +250,6 @@ func (e *processExecutor) Inspect(ctx context.Context, task *types.Task) (*types return status, nil } - // 2. Check PID File (Running) if pidData, err := os.ReadFile(pidPath); err == nil { pid, _ = strconv.Atoi(strings.TrimSpace(string(pidData))) fileInfo, _ := os.Stat(pidPath) @@ -289,19 +268,16 @@ func (e *processExecutor) Inspect(ctx context.Context, task *types.Task) (*types } } } else { - // Process crashed status.State = types.TaskStateFailed - subStatus.ExitCode = 137 // Assume kill/crash + subStatus.ExitCode = 137 subStatus.Reason = "ProcessCrashed" subStatus.Message = "Process exited without writing exit code" - // Use ModTime as FinishedAt for crash approximation subStatus.FinishedAt = &startedAt } status.SubStatuses = []types.SubStatus{subStatus} return status, nil } - // 3. 
Pending status.State = types.TaskStatePending subStatus.Reason = "Pending" status.SubStatuses = []types.SubStatus{subStatus} @@ -310,7 +286,6 @@ func (e *processExecutor) Inspect(ctx context.Context, task *types.Task) (*types } func (e *processExecutor) Stop(ctx context.Context, task *types.Task) error { - // Read from pid file (Root PID: nsenter or sh) taskDir, err := utils.SafeJoin(e.rootDir, task.Name) if err != nil { return fmt.Errorf("invalid task name: %w", err) @@ -318,7 +293,7 @@ func (e *processExecutor) Stop(ctx context.Context, task *types.Task) error { pidPath := filepath.Join(taskDir, PidFile) pidData, err := os.ReadFile(pidPath) if err != nil { - return nil // pid file does not exist, process might not be started + return nil } var pid int pid, err = strconv.Atoi(strings.TrimSpace(string(pidData))) @@ -327,29 +302,21 @@ func (e *processExecutor) Stop(ctx context.Context, task *types.Task) error { } klog.InfoS("Read PID from pid file", "name", task.Name, "pid", pid) - // Target the process group (negative PID) pgid := -pid - // Determine target PID to signal targetPID := 0 if e.config.EnableSidecarMode { - // In Sidecar mode, pid is nsenter. We need to signal its child (Shim). - // We use /proc//task//children which is O(1) compared to scanning /proc. children, err := getChildrenPIDs(pid) if err == nil && len(children) > 0 { - targetPID = children[0] // Assume first child is Shim + targetPID = children[0] klog.InfoS("Sidecar mode: targeted Shim process via /proc/children", "nsenterPID", pid, "shimPID", targetPID) } else { klog.Warning("Sidecar mode: failed to find child process via /proc/children, falling back to PGID", "pid", pid, "err", err) } } else { - // In Host mode, pid is the Shim itself. targetPID = pid } - // 1. Send SIGTERM - // If we found a specific target (Shim), signal it. It will trap and forward to child. - // If not (or if signal fails), fallback to signaling the group. 
killedShim := false if targetPID > 0 { if err := syscall.Kill(targetPID, syscall.SIGTERM); err == nil { @@ -360,13 +327,9 @@ func (e *processExecutor) Stop(ctx context.Context, task *types.Task) error { } if !killedShim { - // Fallback: kill the group. - // Note: In Sidecar mode, this might kill nsenter before Shim exits, risking zombies. _ = syscall.Kill(pgid, syscall.SIGTERM) } - // 2. Wait for process to exit (Graceful shutdown) - // Poll every 500ms for up to 10 seconds timeout := 10 * time.Second deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { @@ -376,7 +339,6 @@ func (e *processExecutor) Stop(ctx context.Context, task *types.Task) error { time.Sleep(500 * time.Millisecond) } - // 3. Force Kill (SIGKILL) klog.InfoS("Process did not exit after timeout, sending SIGKILL", "pgid", pgid) if targetPID > 0 { _ = syscall.Kill(targetPID, syscall.SIGKILL) @@ -386,8 +348,7 @@ func (e *processExecutor) Stop(ctx context.Context, task *types.Task) error { return nil } -// getChildrenPIDs reads /proc//task//children to find direct children. -// This requires kernel 3.5+ and CONFIG_PROC_CHILDREN. +// getChildrenPIDs reads /proc//task//children to find direct children func getChildrenPIDs(pid int) ([]int, error) { path := fmt.Sprintf("/proc/%d/task/%d/children", pid, pid) data, err := os.ReadFile(path) @@ -404,7 +365,6 @@ func getChildrenPIDs(pid int) ([]int, error) { return pids, nil } -// Helpers func isProcessRunning(pid int) bool { process, err := os.FindProcess(pid) if err != nil { @@ -429,8 +389,7 @@ func shellEscapePath(s string) string { return "'" + strings.ReplaceAll(s, "'", "'\\''") + "'" } -// findPidByEnvVar finds a process by checking for a specific environment variable. -// It looks for processes with SANDBOX_MAIN_CONTAINER= in their environment. 
+// findPidByEnvVar finds a process by checking for a specific environment variable func (e *processExecutor) findPidByEnvVar(envName, expectedValue string) (int, error) { procDir, err := os.Open("/proc") if err != nil { @@ -475,8 +434,7 @@ func (e *processExecutor) findPidByEnvVar(envName, expectedValue string) (int, e return 0, fmt.Errorf("no process found with environment variable %s=%s", envName, expectedValue) } -// getProcEnviron reads the environment variables of a process from /proc//environ. -// It returns a list of "KEY=VALUE" strings. +// getProcEnviron reads environment variables from /proc//environ func getProcEnviron(pid int) ([]string, error) { envPath := filepath.Join("/proc", strconv.Itoa(pid), "environ") data, err := os.ReadFile(envPath) diff --git a/kubernetes/internal/task-executor/server/handler.go b/kubernetes/internal/task-executor/server/handler.go index 7dddfe42..522b94b3 100644 --- a/kubernetes/internal/task-executor/server/handler.go +++ b/kubernetes/internal/task-executor/server/handler.go @@ -30,7 +30,7 @@ import ( api "github.com/alibaba/OpenSandbox/sandbox-k8s/pkg/task-executor" ) -// ErrorResponse represents a standard error response. 
+// ErrorResponse represents a standard error response type ErrorResponse struct { Code string `json:"code"` Message string `json:"message"` @@ -60,27 +60,23 @@ func (h *Handler) CreateTask(w http.ResponseWriter, r *http.Request) { return } - // Parse request body var apiTask api.Task if err := json.NewDecoder(r.Body).Decode(&apiTask); err != nil { writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid request body: %v", err)) return } - // Validate task if apiTask.Name == "" { writeError(w, http.StatusBadRequest, "task name is required") return } - // Convert to internal model task := h.convertAPIToInternalTask(&apiTask) if task == nil { writeError(w, http.StatusBadRequest, "failed to convert task") return } - // Create task created, err := h.manager.Create(r.Context(), task) if err != nil { klog.ErrorS(err, "failed to create task", "name", apiTask.Name) @@ -88,7 +84,6 @@ func (h *Handler) CreateTask(w http.ResponseWriter, r *http.Request) { return } - // Convert back to API model response := convertInternalToAPITask(created) w.Header().Set("Content-Type", "application/json") @@ -104,18 +99,16 @@ func (h *Handler) SyncTasks(w http.ResponseWriter, r *http.Request) { return } - // Parse request body - array of tasks var apiTasks []api.Task if err := json.NewDecoder(r.Body).Decode(&apiTasks); err != nil { writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid request body: %v", err)) return } - // Convert to internal model desired := make([]*types.Task, 0, len(apiTasks)) for i := range apiTasks { if apiTasks[i].Name == "" { - continue // Skip invalid tasks + continue } task := h.convertAPIToInternalTask(&apiTasks[i]) if task != nil { @@ -123,7 +116,6 @@ func (h *Handler) SyncTasks(w http.ResponseWriter, r *http.Request) { } } - // Sync tasks current, err := h.manager.Sync(r.Context(), desired) if err != nil { klog.ErrorS(err, "failed to sync tasks") @@ -131,7 +123,6 @@ func (h *Handler) SyncTasks(w http.ResponseWriter, r *http.Request) { return } - // Convert 
back to API model response := make([]api.Task, 0, len(current)) for _, task := range current { if task != nil { @@ -142,7 +133,7 @@ func (h *Handler) SyncTasks(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(response) - klog.InfoS("tasks synced via API", "count", len(response)) + klog.V(1).InfoS("tasks synced via API", "count", len(response)) } func (h *Handler) GetTask(w http.ResponseWriter, r *http.Request) { @@ -158,7 +149,6 @@ func (h *Handler) GetTask(w http.ResponseWriter, r *http.Request) { return } - // Get task task, err := h.manager.Get(r.Context(), taskID) if err != nil { klog.ErrorS(err, "failed to get task", "id", taskID) @@ -166,7 +156,6 @@ func (h *Handler) GetTask(w http.ResponseWriter, r *http.Request) { return } - // Convert to API model response := convertInternalToAPITask(task) w.Header().Set("Content-Type", "application/json") @@ -179,7 +168,6 @@ func (h *Handler) ListTasks(w http.ResponseWriter, r *http.Request) { return } - // List all tasks tasks, err := h.manager.List(r.Context()) if err != nil { klog.ErrorS(err, "failed to list tasks") @@ -187,7 +175,6 @@ func (h *Handler) ListTasks(w http.ResponseWriter, r *http.Request) { return } - // Convert to API model response := make([]api.Task, 0, len(tasks)) for _, task := range tasks { if task != nil { @@ -199,7 +186,6 @@ func (h *Handler) ListTasks(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(response) } -// Health returns the health status of the task executor func (h *Handler) Health(w http.ResponseWriter, r *http.Request) { response := map[string]string{ "status": "healthy", @@ -221,7 +207,6 @@ func (h *Handler) DeleteTask(w http.ResponseWriter, r *http.Request) { return } - // Delete task err := h.manager.Delete(r.Context(), taskID) if err != nil { klog.ErrorS(err, "failed to delete task", "id", taskID) @@ -233,7 +218,6 @@ func (h *Handler) DeleteTask(w http.ResponseWriter, r *http.Request) { 
klog.InfoS("task deleted via API", "id", taskID) } -// writeError writes an error response in JSON format. func writeError(w http.ResponseWriter, code int, message string) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(code) @@ -243,7 +227,6 @@ func writeError(w http.ResponseWriter, code int, message string) { }) } -// convertAPIToInternalTask converts api.Task to types.Task. func (h *Handler) convertAPIToInternalTask(apiTask *api.Task) *types.Task { if apiTask == nil { return nil @@ -253,7 +236,6 @@ func (h *Handler) convertAPIToInternalTask(apiTask *api.Task) *types.Task { Process: apiTask.Process, PodTemplateSpec: apiTask.PodTemplateSpec, } - // Initialize default status task.Status = types.Status{ State: types.TaskStatePending, } @@ -261,7 +243,6 @@ func (h *Handler) convertAPIToInternalTask(apiTask *api.Task) *types.Task { return task } -// convertInternalToAPITask converts types.Task to api.Task. func convertInternalToAPITask(task *types.Task) *api.Task { if task == nil { return nil @@ -273,17 +254,15 @@ func convertInternalToAPITask(task *types.Task) *api.Task { PodTemplateSpec: task.PodTemplateSpec, } - // 1. 
Process Status Conversion if task.Process != nil && len(task.Status.SubStatuses) > 0 { sub := task.Status.SubStatuses[0] apiStatus := &api.ProcessStatus{} - // Handle Timeout state - map to Terminated with exitCode 137 if task.Status.State == types.TaskStateTimeout { term := &api.Terminated{ ExitCode: 137, - Reason: sub.Reason, // "TaskTimeout" - Message: sub.Message, // "Task exceeded timeout of X seconds" + Reason: sub.Reason, + Message: sub.Message, } if sub.StartedAt != nil { term.StartedAt = metav1.NewTime(*sub.StartedAt) @@ -291,7 +270,6 @@ func convertInternalToAPITask(task *types.Task) *api.Task { term.FinishedAt = metav1.Now() apiStatus.Terminated = term } else if sub.FinishedAt != nil { - // Terminated term := &api.Terminated{ ExitCode: int32(sub.ExitCode), Reason: sub.Reason, @@ -303,12 +281,10 @@ func convertInternalToAPITask(task *types.Task) *api.Task { } apiStatus.Terminated = term } else if sub.StartedAt != nil { - // Running apiStatus.Running = &api.Running{ StartedAt: metav1.NewTime(*sub.StartedAt), } } else { - // Waiting apiStatus.Waiting = &api.Waiting{ Reason: sub.Reason, Message: sub.Message, @@ -317,10 +293,8 @@ func convertInternalToAPITask(task *types.Task) *api.Task { apiTask.ProcessStatus = apiStatus } - // 2. Pod Status Conversion if task.PodTemplateSpec != nil { podStatus := &corev1.PodStatus{ - // Default phase mapping Phase: corev1.PodUnknown, } diff --git a/kubernetes/internal/task-executor/storage/file_store.go b/kubernetes/internal/task-executor/storage/file_store.go index 936830b2..b1c0bc45 100644 --- a/kubernetes/internal/task-executor/storage/file_store.go +++ b/kubernetes/internal/task-executor/storage/file_store.go @@ -33,7 +33,6 @@ type fileStore struct { locks sync.Map // key: taskName, value: *sync.RWMutex } -// NewFileStore creates a new file-based task store. 
func NewFileStore(dataDir string) (TaskStore, error) { if dataDir == "" { return nil, fmt.Errorf("dataDir cannot be empty") @@ -56,13 +55,11 @@ func NewFileStore(dataDir string) (TaskStore, error) { }, nil } -// getTaskLock retrieves or creates a lock for a specific task. func (s *fileStore) getTaskLock(name string) *sync.RWMutex { val, _ := s.locks.LoadOrStore(name, &sync.RWMutex{}) return val.(*sync.RWMutex) } -// Create persists a new task to disk. func (s *fileStore) Create(ctx context.Context, task *types.Task) error { if task == nil { return fmt.Errorf("task cannot be nil") @@ -97,7 +94,6 @@ func (s *fileStore) Create(ctx context.Context, task *types.Task) error { return nil } -// Update updates an existing task's runtime information. func (s *fileStore) Update(ctx context.Context, task *types.Task) error { if task == nil { return fmt.Errorf("task cannot be nil") @@ -120,16 +116,14 @@ func (s *fileStore) Update(ctx context.Context, task *types.Task) error { return fmt.Errorf("task %s does not exist", task.Name) } - // Write task data if err := s.writeTaskFile(taskDir, task); err != nil { return err } - klog.InfoS("updated task", "name", task.Name) + klog.V(2).InfoS("updated task", "name", task.Name, "state", task.Status.State) return nil } -// Get retrieves a task by name. func (s *fileStore) Get(ctx context.Context, name string) (*types.Task, error) { if name == "" { return nil, fmt.Errorf("task name cannot be empty") @@ -152,11 +146,7 @@ func (s *fileStore) Get(ctx context.Context, name string) (*types.Task, error) { return s.readTaskFile(taskDir, name) } -// List returns all tasks in the store. func (s *fileStore) List(ctx context.Context) ([]*types.Task, error) { - // Read all task directories - // Note: We don't have a global lock, so the list of tasks might change during iteration. - // This is acceptable for a file-based store. 
entries, err := os.ReadDir(s.dataDir) if err != nil { return nil, fmt.Errorf("failed to read data directory: %w", err) @@ -175,7 +165,6 @@ func (s *fileStore) List(ctx context.Context) ([]*types.Task, error) { continue } - // Acquire read lock for this specific task mu := s.getTaskLock(taskName) mu.RLock() task, err := s.readTaskFile(taskDir, taskName) @@ -192,7 +181,6 @@ func (s *fileStore) List(ctx context.Context) ([]*types.Task, error) { return tasks, nil } -// Delete removes a task from the store. func (s *fileStore) Delete(ctx context.Context, name string) error { if name == "" { return fmt.Errorf("task name cannot be empty") @@ -213,7 +201,6 @@ func (s *fileStore) Delete(ctx context.Context, name string) error { return nil } - // Remove task directory if err := os.RemoveAll(taskDir); err != nil { return fmt.Errorf("failed to delete task %s: %w", name, err) } @@ -222,14 +209,12 @@ func (s *fileStore) Delete(ctx context.Context, name string) error { return nil } -// getTaskFilePath returns the file path for a task's JSON file. func (s *fileStore) getTaskFilePath(taskDir string) string { return filepath.Join(taskDir, "task.json") } -// writeTaskFile writes task data to disk atomically using temp file + rename. 
+// writeTaskFile writes task data to disk atomically using temp file + rename. func (s *fileStore) writeTaskFile(taskDir string, task *types.Task) error { - // Marshal to JSON data, err := json.MarshalIndent(task, "", " ") if err != nil { return fmt.Errorf("failed to marshal task: %w", err) } @@ -238,12 +223,10 @@ func (s *fileStore) writeTaskFile(taskDir string, task *types.Task) error { taskFile := s.getTaskFilePath(taskDir) tmpFile := taskFile + ".tmp" - // Write to temporary file if err := os.WriteFile(tmpFile, data, 0644); err != nil { return fmt.Errorf("failed to write temp file: %w", err) } - // Sync to ensure data is written to disk f, err := os.Open(tmpFile) if err != nil { os.Remove(tmpFile) @@ -256,7 +239,6 @@ } f.Close() - // Atomically rename temp file to final file if err := os.Rename(tmpFile, taskFile); err != nil { os.Remove(tmpFile) return fmt.Errorf("failed to rename temp file: %w", err) @@ -265,17 +247,14 @@ return nil } -// readTaskFile reads task data from disk. func (s *fileStore) readTaskFile(taskDir, taskName string) (*types.Task, error) { taskFile := s.getTaskFilePath(taskDir) - // Read file data, err := os.ReadFile(taskFile) if err != nil { return nil, fmt.Errorf("failed to read task file: %w", err) } - // Unmarshal JSON var task types.Task if err := json.Unmarshal(data, &task); err != nil { return nil, fmt.Errorf("failed to unmarshal task file: %w", err)