From 55964decd7ccb1ed77bfc1cce139540d13831f33 Mon Sep 17 00:00:00 2001 From: Mitch Denny Date: Mon, 4 May 2026 21:15:36 +1000 Subject: [PATCH 1/5] Add Windows PTY support for executables (HMP v1 over UDS) Phase 3 of Aspire WithTerminal end-to-end. Adds DCP-side terminal support for Executable resources on Windows via ConPTY. The PTY-attached process is reachable from the Aspire terminal host through a Hex1b Multiplex Protocol v1 server on a per-replica Unix domain socket. API: - api/v1/terminal_types.go: TerminalSpec (Enabled, UDSPath, Cols, Rows) - api/v1/executable_types.go: ExecutableSpec.Terminal *TerminalSpec Implementation: - internal/hmp1/: HMP v1 server (Hello/StateSync/Output/Input/Resize/Exit) - internal/exerunners/pty_windows.go: ConPTY wrapper via UserExistsError/conpty - internal/exerunners/pty_other.go: non-Windows stub returning ErrTerminalNotSupported - internal/exerunners/terminal_session.go: per-executable UDS listener + accept loop, single-viewer model with graceful shutdown that flushes the Exit frame - internal/exerunners/process_executable_runner.go: branches on Terminal.Enabled in StartRun and tears the session down in StopRun Tests: - internal/hmp1/server_test.go: 6 unit tests covering Hello/StateSync, output forwarding, input/resize delivery, Exit on PTY exit, oversize-frame rejection - internal/exerunners/terminal_session_windows_test.go: end-to-end smoke test that spawns cmd.exe under ConPTY, dials the UDS as an HMP v1 client, and asserts the full Hello -> StateSync -> Output -> Exit frame sequence with the literal echoed text and exit code 0 --- api/v1/executable_types.go | 13 + api/v1/terminal_types.go | 94 +++++ api/v1/zz_generated.deepcopy.go | 5 + go.mod | 1 + go.sum | 3 + .../exerunners/process_executable_runner.go | 22 ++ .../process_executable_runner_terminal.go | 120 ++++++ internal/exerunners/pty.go | 46 +++ internal/exerunners/pty_other.go | 21 ++ internal/exerunners/pty_windows.go | 140 +++++++ internal/exerunners/terminal_session.go | 279 ++++++++++++++ .../terminal_session_windows_test.go | 199 ++++++++++ internal/hmp1/server.go | 295 +++++++++++++++ internal/hmp1/server_test.go | 355 ++++++++++++++++++ 14 files changed, 1593 insertions(+) create mode 100644 api/v1/terminal_types.go create mode 100644 internal/exerunners/process_executable_runner_terminal.go create mode 100644 internal/exerunners/pty.go create mode 100644 internal/exerunners/pty_other.go create mode 100644 internal/exerunners/pty_windows.go create mode 100644 internal/exerunners/terminal_session.go create mode 100644 internal/exerunners/terminal_session_windows_test.go create mode 100644 internal/hmp1/server.go create mode 100644 internal/hmp1/server_test.go diff --git a/api/v1/executable_types.go b/api/v1/executable_types.go index 765935d8..8a0cda61 100644 --- a/api/v1/executable_types.go +++ b/api/v1/executable_types.go @@ -261,6 +261,13 @@ type ExecutableSpec struct { // PEM formatted certificates to be written for the Executable // +optional PemCertificates *ExecutablePemCertificates `json:"pemCertificates,omitempty"` + + // Terminal, when non-nil and Enabled, allocates a pseudo-terminal for the + // Executable's process and exposes an HMP v1 producer endpoint that the + // Aspire terminal host connects to as a client. See TerminalSpec for + // details. + // +optional + Terminal *TerminalSpec `json:"terminal,omitempty"` } func (es ExecutableSpec) Equal(other ExecutableSpec) bool { @@ -314,6 +321,10 @@ func (es ExecutableSpec) Equal(other ExecutableSpec) bool { return false } + if !es.Terminal.Equal(other.Terminal) { + return false + } + return true } @@ -355,6 +366,8 @@ func (es ExecutableSpec) Validate(specPath *field.Path) field.ErrorList { errorList = append(errorList, es.PemCertificates.Validate(specPath.Child("pemCertificates"))...) + errorList = append(errorList, es.Terminal.Validate(specPath.Child("terminal"))...) + return errorList } diff --git a/api/v1/terminal_types.go b/api/v1/terminal_types.go new file mode 100644 index 00000000..8ad2eecc --- /dev/null +++ b/api/v1/terminal_types.go @@ -0,0 +1,94 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package v1 + +import ( + "k8s.io/apimachinery/pkg/util/validation/field" +) + +// TerminalSpec configures pseudo-terminal allocation for an Executable or +// Container replica and the HMP v1 producer endpoint that the Aspire terminal +// host connects to as a client. +// +// When Enabled is true, DCP allocates a PTY for the underlying process and +// listens on UDSPath (a Unix Domain Socket path on Linux/macOS, or a named pipe +// path on Windows in a follow-up). When the terminal host opens an HMP v1 +// connection, DCP starts an HMP v1 server on the connection and bridges: +// +// - PTY output (from the process's tty) -> HMP v1 Output frames +// - HMP v1 Input frames -> PTY input (process stdin) +// - HMP v1 Resize frames -> PTY resize (TIOCSWINSZ / ResizePseudoConsole) +// - Process exit -> HMP v1 Exit frame, then close +// +// The HMP v1 wire format is defined by the Aspire dashboard's terminal host +// (see Hex1b's Hmp1Protocol). DCP's responsibility is limited to PTY +// allocation, the listener, and frame translation. +type TerminalSpec struct { + // Enabled controls whether DCP allocates a PTY for the process and exposes + // an HMP v1 producer endpoint at UDSPath. + Enabled bool `json:"enabled,omitempty"` + + // UDSPath is the Unix Domain Socket path that DCP listens on for the + // terminal host's HMP v1 client connection. Required when Enabled is true. + UDSPath string `json:"udsPath,omitempty"` + + // Cols is the initial width of the pseudo-terminal in character columns. + // If zero, a sensible default (80) is used. + // +kubebuilder:default:=0 + Cols int32 `json:"cols,omitempty"` + + // Rows is the initial height of the pseudo-terminal in character rows. + // If zero, a sensible default (24) is used. + // +kubebuilder:default:=0 + Rows int32 `json:"rows,omitempty"` +} + +// Equal reports whether two TerminalSpec values are equal. +func (ts *TerminalSpec) Equal(other *TerminalSpec) bool { + if ts == other { + return true + } + if ts == nil || other == nil { + return false + } + return ts.Enabled == other.Enabled && + ts.UDSPath == other.UDSPath && + ts.Cols == other.Cols && + ts.Rows == other.Rows +} + +// Validate verifies the TerminalSpec content. +func (ts *TerminalSpec) Validate(specPath *field.Path) field.ErrorList { + errorList := field.ErrorList{} + if ts == nil { + return errorList + } + if ts.Enabled && ts.UDSPath == "" { + errorList = append(errorList, field.Invalid(specPath.Child("udsPath"), ts.UDSPath, "udsPath is required when Enabled is true.")) + } + if ts.Cols < 0 { + errorList = append(errorList, field.Invalid(specPath.Child("cols"), ts.Cols, "cols must be non-negative.")) + } + if ts.Rows < 0 { + errorList = append(errorList, field.Invalid(specPath.Child("rows"), ts.Rows, "rows must be non-negative.")) + } + return errorList +} + +// DeepCopyInto copies the receiver, writing into out. +func (in *TerminalSpec) DeepCopyInto(out *TerminalSpec) { + *out = *in +} + +// DeepCopy returns a deep copy of the TerminalSpec. +func (in *TerminalSpec) DeepCopy() *TerminalSpec { + if in == nil { + return nil + } + out := new(TerminalSpec) + in.DeepCopyInto(out) + return out +} diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index b516e432..6e8fcc36 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -1257,6 +1257,11 @@ func (in *ExecutableSpec) DeepCopyInto(out *ExecutableSpec) { *out = new(ExecutablePemCertificates) (*in).DeepCopyInto(*out) } + if in.Terminal != nil { + in, out := &in.Terminal, &out.Terminal + *out = new(TerminalSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExecutableSpec. diff --git a/go.mod b/go.mod index a3f9f626..90ea4cd6 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/microsoft/dcp go 1.26.1 require ( + github.com/UserExistsError/conpty v0.1.4 github.com/cenkalti/backoff/v4 v4.3.0 github.com/davidwartell/go-onecontext v1.0.2 github.com/emirpasic/gods v1.18.1 diff --git a/go.sum b/go.sum index efe3747c..0b432f7a 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1 github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= +github.com/UserExistsError/conpty v0.1.4 h1:+3FhJhiqhyEJa+K5qaK3/w6w+sN3Nh9O9VbJyBS02to= +github.com/UserExistsError/conpty v0.1.4/go.mod h1:PDglKIkX3O/2xVk0MV9a6bCWxRmPVfxqZoTG/5sSd9I= github.com/akavel/rsrc v0.10.2 h1:Zxm8V5eI1hW4gGaYsJQUhxpjkENuG91ki8B4zCrvEsw= github.com/akavel/rsrc v0.10.2/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= github.com/akutz/memconn v0.1.0 h1:NawI0TORU4hcOMsMr11g7vwlCdkYeLKXBcxWu2W/P8A= @@ -305,6 +307,7 @@ golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.42.0 h1:UiKe+zDFmJobeJ5ggPwOshJIVt6/Ft0rcfrXZDLWAWY= diff --git a/internal/exerunners/process_executable_runner.go b/internal/exerunners/process_executable_runner.go index d37982c5..4876ae46 100644 --- a/internal/exerunners/process_executable_runner.go +++ b/internal/exerunners/process_executable_runner.go @@ -35,6 +35,11 @@ type processRunState struct { stdOutFile *os.File stdErrFile *os.File cmdInfo string // Command line used to start the process, for logging purposes + + // terminalSession is non-nil when the run was started under a PTY via + // startTerminalRun. It owns the PTY and the HMP v1 listener; closing it + // terminates both. + terminalSession *terminalSession } type ProcessExecutableRunner struct { @@ -55,6 +60,10 @@ func (r *ProcessExecutableRunner) StartRun( runChangeHandler controllers.RunChangeHandler, log logr.Logger, ) *controllers.ExecutableStartResult { + if exe.Spec.Terminal != nil && exe.Spec.Terminal.Enabled { + return r.startTerminalRun(ctx, exe, runChangeHandler, log) + } + cmd := makeCommand(exe) if osutil.IsWindows() { // On Windows we have seen some apps (e.g. Python uvicorn runner) sending Ctrl-C to the whole console group @@ -158,6 +167,19 @@ func (r *ProcessExecutableRunner) StopRun(ctx context.Context, runID controllers stopLog := log.WithValues("RunID", runID, "Command", runState.cmdInfo) stopLog.V(1).Info("Stopping run...") + // Terminal-attached runs don't go through the OSExecutor and don't have + // an entry in the underlying waiter table. Closing the terminal session + // closes the PTY master, which delivers SIGHUP / detaches the ConPTY and + // causes the child process to exit on its own. We then return without + // touching the OSExecutor stop machinery. + if runState.terminalSession != nil { + closeErr := runState.terminalSession.Close() + if closeErr != nil { + stopLog.Error(closeErr, "Failed to close terminal session cleanly") + } + return closeErr + } + // We want to make progress eventually, so we don't want to wait indefinitely for the process to stop. const ProcessStopTimeout = 15 * time.Second diff --git a/internal/exerunners/process_executable_runner_terminal.go b/internal/exerunners/process_executable_runner_terminal.go new file mode 100644 index 00000000..7152b29e --- /dev/null +++ b/internal/exerunners/process_executable_runner_terminal.go @@ -0,0 +1,120 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package exerunners + +import ( + "context" + "errors" + "sync/atomic" + "time" + + "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + apiv1 "github.com/microsoft/dcp/api/v1" + "github.com/microsoft/dcp/controllers" + "github.com/microsoft/dcp/pkg/pointers" + "github.com/microsoft/dcp/pkg/process" +) + +// startTerminalRun is the PTY-attached counterpart to the regular StartRun +// flow. It allocates a pseudo-terminal, starts the executable inside it, and +// stands up an HMP v1 listener at exe.Spec.Terminal.UDSPath. +// +// On success the returned result has ExeState=Running, Pid set, and a +// StartWaitForRunCompletion function that, when invoked, fires the eventual +// OnRunCompleted callback once the underlying process exits. +func (r *ProcessExecutableRunner) startTerminalRun( + ctx context.Context, + exe *apiv1.Executable, + runChangeHandler controllers.RunChangeHandler, + log logr.Logger, +) *controllers.ExecutableStartResult { + startLog := log.WithValues("Cmd", exe.Spec.ExecutablePath, "Args", exe.Status.EffectiveArgs, "Terminal", true, "UDSPath", exe.Spec.Terminal.UDSPath) + startLog.Info("Starting process under PTY...") + + result := controllers.NewExecutableStartResult() + + tp, err := startTerminalProcess(ctx, exe) + if err != nil { + startLog.Error(err, "Failed to start process under PTY") + result.CompletionTimestamp = metav1.NowMicro() + result.ExeState = apiv1.ExecutableStateFailedToStart + result.StartupError = err + runChangeHandler.OnStartupCompleted(exe.NamespacedName(), result) + return result + } + + session, err := startTerminalSession(ctx, exe, tp, startLog) + if err != nil { + startLog.Error(err, "Failed to start terminal session listener") + result.CompletionTimestamp = metav1.NowMicro() + result.ExeState = apiv1.ExecutableStateFailedToStart + result.StartupError = err + runChangeHandler.OnStartupCompleted(exe.NamespacedName(), result) + return result + } + + pid := process.Pid_t(tp.pid) + identityTime := time.Now() + runID := pidToRunID(pid) + + r.runningProcesses.Store(runID, &processRunState{ + identityTime: identityTime, + cmdInfo: exe.Spec.ExecutablePath, + terminalSession: session, + }) + + result.RunID = runID + pointers.SetValue(&result.Pid, int64(pid)) + result.ExeState = apiv1.ExecutableStateRunning + result.CompletionTimestamp = metav1.NowMicro() + + // We arm the run-completion watcher here, but it must not fire until the + // caller has invoked StartWaitForRunCompletion (see the contract on + // RunChangeHandler.OnStartupCompleted). + var armed atomic.Bool + armCh := make(chan struct{}) + result.StartWaitForRunCompletion = func() { + if armed.CompareAndSwap(false, true) { + close(armCh) + } + } + + go func() { + // Wait until the controller is ready to accept OnRunCompleted, or + // until the parent context is cancelled. + select { + case <-armCh: + case <-ctx.Done(): + // Even if the controller never armed us, drain the session so we + // don't leak goroutines. + session.Close() + <-session.Done() + return + } + + <-session.Done() + + // Pull the real exit code from the session, if we have one. + var exitCode *int32 + if state, found := r.runningProcesses.LoadAndDelete(runID); found && state.terminalSession != nil { + if code, ok := state.terminalSession.ExitCode(); ok { + ec := code + exitCode = &ec + } + } + + var runErr error + if errors.Is(ctx.Err(), context.Canceled) { + runErr = ctx.Err() + } + runChangeHandler.OnRunCompleted(runID, exitCode, runErr) + }() + + runChangeHandler.OnStartupCompleted(exe.NamespacedName(), result) + return result +} diff --git a/internal/exerunners/pty.go b/internal/exerunners/pty.go new file mode 100644 index 00000000..f3ad83c9 --- /dev/null +++ b/internal/exerunners/pty.go @@ -0,0 +1,46 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package exerunners + +import ( + "context" + "errors" + + apiv1 "github.com/microsoft/dcp/api/v1" + "github.com/microsoft/dcp/internal/hmp1" +) + +// ErrTerminalNotSupported is returned when terminal/PTY support is requested +// for an Executable on a platform where DCP does not yet implement it. The +// initial slice (Aspire 13.4) supports Windows only via ConPTY; Linux/macOS +// support follows in a subsequent change. +var ErrTerminalNotSupported = errors.New("terminal/PTY support is not implemented on this platform") + +// terminalProcess represents a process running attached to a pseudo-terminal, +// plus the PTY interface used by the HMP v1 server to bridge it to the Aspire +// terminal host. +type terminalProcess struct { + // pty is the ReadWriteCloser exposing the PTY master end. Implements + // hmp1.PTY (Read, Write, Resize, Close). + pty hmp1.PTY + + // pid is the OS process id of the spawned process. + pid int + + // waitExit blocks until the process exits and returns the exit code. + // Called at most once. + waitExit func() int32 +} + +// startTerminalProcess allocates a pseudo-terminal and starts the Executable +// attached to it. The returned terminalProcess must be Closed by the caller +// when the process exits or when the run is being stopped. +// +// The implementation is platform-specific. See pty_windows.go and +// pty_other.go. +func startTerminalProcess(ctx context.Context, exe *apiv1.Executable) (*terminalProcess, error) { + return startTerminalProcessImpl(ctx, exe) +} diff --git a/internal/exerunners/pty_other.go b/internal/exerunners/pty_other.go new file mode 100644 index 00000000..60474c18 --- /dev/null +++ b/internal/exerunners/pty_other.go @@ -0,0 +1,21 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +//go:build !windows + +package exerunners + +import ( + "context" + + apiv1 "github.com/microsoft/dcp/api/v1" +) + +// startTerminalProcessImpl is the non-Windows fallback. The initial slice +// (Aspire 13.4) implements PTY support on Windows via ConPTY only; Linux and +// macOS support is tracked as follow-up work. +func startTerminalProcessImpl(_ context.Context, _ *apiv1.Executable) (*terminalProcess, error) { + return nil, ErrTerminalNotSupported +} diff --git a/internal/exerunners/pty_windows.go b/internal/exerunners/pty_windows.go new file mode 100644 index 00000000..cf481a78 --- /dev/null +++ b/internal/exerunners/pty_windows.go @@ -0,0 +1,140 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +//go:build windows + +package exerunners + +import ( + "context" + "fmt" + "strings" + + "github.com/UserExistsError/conpty" + apiv1 "github.com/microsoft/dcp/api/v1" +) + +// conPtyAdapter wraps a *conpty.ConPty so it satisfies hmp1.PTY (specifically +// the Resize signature: cols, rows int). +type conPtyAdapter struct { + cp *conpty.ConPty +} + +func (a *conPtyAdapter) Read(p []byte) (int, error) { return a.cp.Read(p) } +func (a *conPtyAdapter) Write(p []byte) (int, error) { return a.cp.Write(p) } +func (a *conPtyAdapter) Close() error { return a.cp.Close() } +func (a *conPtyAdapter) Resize(cols, rows int) error { return a.cp.Resize(cols, rows) } + +// startTerminalProcessImpl allocates a Windows ConPTY, spawns the executable +// attached to it, and returns a terminalProcess. The caller is responsible for +// calling pty.Close() on shutdown. +func startTerminalProcessImpl(ctx context.Context, exe *apiv1.Executable) (*terminalProcess, error) { + if !conpty.IsConPtyAvailable() { + return nil, fmt.Errorf("ConPTY is not available on this Windows version: %w", conpty.ErrConPtyUnsupported) + } + + commandLine := buildWindowsCommandLine(exe.Spec.ExecutablePath, exe.Status.EffectiveArgs) + + envBlock := make([]string, 0, len(exe.Status.EffectiveEnv)) + for _, e := range exe.Status.EffectiveEnv { + envBlock = append(envBlock, fmt.Sprintf("%s=%s", e.Name, e.Value)) + } + + cols, rows := terminalDimensions(exe.Spec.Terminal) + + options := []conpty.ConPtyOption{ + conpty.ConPtyDimensions(cols, rows), + conpty.ConPtyEnv(envBlock), + } + if exe.Spec.WorkingDirectory != "" { + options = append(options, conpty.ConPtyWorkDir(exe.Spec.WorkingDirectory)) + } + + cp, err := conpty.Start(commandLine, options...) + if err != nil { + return nil, fmt.Errorf("failed to start process under ConPTY: %w", err) + } + + return &terminalProcess{ + pty: &conPtyAdapter{cp: cp}, + pid: cp.Pid(), + waitExit: func() int32 { + exitCode, waitErr := cp.Wait(context.Background()) + if waitErr != nil { + // Wait failures are best-effort: the connection's about to + // close anyway. Surface UnknownExitCode to make the situation + // visible in the terminal host. + return -1 + } + return int32(exitCode) + }, + }, nil +} + +// buildWindowsCommandLine constructs a single command-line string suitable for +// CreateProcessW from a path + argv. Each token is wrapped in quotes and +// embedded quotes are escaped per the documented Windows argv parsing rules. +func buildWindowsCommandLine(executablePath string, args []string) string { + var sb strings.Builder + sb.WriteString(quoteWindowsArg(executablePath)) + for _, a := range args { + sb.WriteByte(' ') + sb.WriteString(quoteWindowsArg(a)) + } + return sb.String() +} + +// quoteWindowsArg quotes a single argument per the rules CommandLineToArgvW +// uses. Empty strings, strings containing whitespace, or strings containing +// quotes get wrapped in double quotes; backslashes preceding a quote are +// doubled. +func quoteWindowsArg(arg string) string { + if arg == "" { + return `""` + } + if !strings.ContainsAny(arg, " \t\n\v\"") { + return arg + } + var sb strings.Builder + sb.WriteByte('"') + backslashes := 0 + for _, r := range arg { + switch r { + case '\\': + backslashes++ + case '"': + for i := 0; i < backslashes*2; i++ { + sb.WriteByte('\\') + } + backslashes = 0 + sb.WriteString(`\"`) + default: + for i := 0; i < backslashes; i++ { + sb.WriteByte('\\') + } + backslashes = 0 + sb.WriteRune(r) + } + } + for i := 0; i < backslashes*2; i++ { + sb.WriteByte('\\') + } + sb.WriteByte('"') + return sb.String() +} + +func terminalDimensions(t *apiv1.TerminalSpec) (cols, rows int) { + cols, rows = 80, 24 + if t == nil { + return + } + if t.Cols > 0 { + cols = int(t.Cols) + } + if t.Rows > 0 { + rows = int(t.Rows) + } + return +} diff --git a/internal/exerunners/terminal_session.go b/internal/exerunners/terminal_session.go new file mode 100644 index 00000000..26bd5593 --- /dev/null +++ b/internal/exerunners/terminal_session.go @@ -0,0 +1,279 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package exerunners + +import ( + "context" + "errors" + "fmt" + "net" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/go-logr/logr" + + apiv1 "github.com/microsoft/dcp/api/v1" + "github.com/microsoft/dcp/internal/hmp1" +) + +// terminalSession owns the lifecycle of a single Executable's pseudo-terminal +// listener. It listens on the configured UDS path, accepts at most one viewer +// connection at a time, and runs an HMP v1 server on each connection, +// bridging the connection to the underlying PTY. +// +// When the underlying process exits the session sends a final Exit frame to +// any connected viewer (via hmp1.Serve's waitExit hook), then tears down the +// listener. Calling Close() forces the same teardown. +type terminalSession struct { + log logr.Logger + listener net.Listener + udsPath string + + tp *terminalProcess + + mu sync.Mutex + currConn net.Conn + closed atomic.Bool + + // Tracks active handleConn goroutines so graceful shutdown can wait for + // in-flight HMP1 sessions to flush their Exit frame before closing the + // listener. + connWg sync.WaitGroup + + // exitDone is closed once the underlying PTY-attached process has exited; + // exitCode then holds the observed exit code. This is the signal used by + // each handleConn's waitExit hook so that hmp1.Serve sends the real exit + // code on the per-connection Exit frame. + exitOnce sync.Once + exitCode atomic.Int32 + exitDone chan struct{} + + doneCh chan struct{} +} + +// startTerminalSession opens a listener at exe.Spec.Terminal.UDSPath, then +// spawns an accept loop that runs hmp1.Serve on each accepted connection. +// The PTY-attached process must already be running (passed via tp); the +// session owns tp from this point forward and will Close it on shutdown. +func startTerminalSession(ctx context.Context, exe *apiv1.Executable, tp *terminalProcess, log logr.Logger) (*terminalSession, error) { + udsPath := exe.Spec.Terminal.UDSPath + // Best-effort cleanup of a stale socket file from a previous run. + if _, statErr := os.Stat(udsPath); statErr == nil { + _ = os.Remove(udsPath) + } + + lc := net.ListenConfig{} + lis, err := lc.Listen(ctx, "unix", udsPath) + if err != nil { + _ = tp.pty.Close() + return nil, fmt.Errorf("failed to listen on terminal UDS %q: %w", udsPath, err) + } + + s := &terminalSession{ + log: log.WithValues("UDSPath", udsPath, "PID", tp.pid), + listener: lis, + udsPath: udsPath, + tp: tp, + exitDone: make(chan struct{}), + doneCh: make(chan struct{}), + } + + go s.acceptLoop(ctx, exe.Spec.Terminal) + go s.watchExit() + + s.log.Info("Terminal session listening") + return s, nil +} + +// acceptLoop accepts incoming connections one at a time. If a new connection +// arrives while a previous one is still active, the previous connection is +// closed (last-writer-wins), matching the single-viewer model of the initial +// slice. +func (s *terminalSession) acceptLoop(ctx context.Context, spec *apiv1.TerminalSpec) { + for { + conn, err := s.listener.Accept() + if err != nil { + if !s.closed.Load() && !errors.Is(err, net.ErrClosed) { + s.log.Error(err, "Terminal session accept loop terminated unexpectedly") + } + return + } + + // Take over the "current connection" slot, closing any prior one. + s.mu.Lock() + if prev := s.currConn; prev != nil { + s.log.V(1).Info("Closing previous terminal viewer connection in favor of new one") + _ = prev.Close() + } + s.currConn = conn + s.mu.Unlock() + + s.connWg.Add(1) + go s.handleConn(ctx, conn, spec) + } +} + +func (s *terminalSession) handleConn(ctx context.Context, conn net.Conn, spec *apiv1.TerminalSpec) { + defer s.connWg.Done() + defer func() { + _ = conn.Close() + s.mu.Lock() + if s.currConn == conn { + s.currConn = nil + } + s.mu.Unlock() + }() + + cols, rows := terminalDimensionsForServe(spec) + opts := hmp1.ServerOptions{ + InitialCols: cols, + InitialRows: rows, + Log: s.log, + } + + // waitExit is invoked by hmp1.Serve after its PTY read pump returns + // (i.e. when pty.Close has been called or an upstream EOF was observed). + // It blocks until the underlying process exit has been signalled and + // returns the observed exit code, so the per-connection Exit frame + // carries the real code rather than a synthetic 0. + if serveErr := hmp1.Serve(ctx, conn, s.tp.pty, s.waitProcessExit, opts); serveErr != nil { + s.log.V(1).Info("HMP v1 serve exited with error", "err", serveErr.Error()) + } +} + +// signalProcessExit publishes the process exit code; subsequent calls to +// waitProcessExit return immediately. Safe to call multiple times; only the +// first call wins. +func (s *terminalSession) signalProcessExit(code int32) { + s.exitOnce.Do(func() { + s.exitCode.Store(code) + close(s.exitDone) + }) +} + +// waitProcessExit blocks until the underlying process has exited, then +// returns the observed exit code. Used as the waitExit callback for hmp1.Serve. +func (s *terminalSession) waitProcessExit() int32 { + <-s.exitDone + return s.exitCode.Load() +} + +// watchExit blocks until the PTY process exits, then performs a graceful +// teardown: it publishes the exit code, closes the PTY (so any in-flight +// hmp1.Serve invocation drains and sends its Exit frame), waits a bounded +// amount of time for those handlers to finish, then closes the listener and +// removes the UDS file. +func (s *terminalSession) watchExit() { + defer close(s.doneCh) + exitCode := s.tp.waitExit() + s.log.Info("Terminal-attached process exited", "exitCode", exitCode) + s.signalProcessExit(exitCode) + + // Close the PTY to wake up any blocked Serve PTY pump. After this, Serve + // will drain, call waitProcessExit (already unblocked), write the Exit + // frame, and return. + if s.tp != nil && s.tp.pty != nil { + _ = s.tp.pty.Close() + } + + // Give in-flight HMP v1 handlers a moment to flush their Exit frame + // before we slam the listener shut. + s.waitConnsOrTimeout(2 * time.Second) + + if !s.closed.CompareAndSwap(false, true) { + return + } + _ = s.listener.Close() + _ = os.Remove(s.udsPath) +} + +// waitConnsOrTimeout blocks until all in-flight handleConn goroutines have +// finished, or until the timeout elapses, whichever comes first. +func (s *terminalSession) waitConnsOrTimeout(timeout time.Duration) { + done := make(chan struct{}) + go func() { + s.connWg.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(timeout): + s.log.V(1).Info("Timed out waiting for HMP v1 handlers to drain; forcing teardown") + } +} + +// ExitCode returns the observed process exit code if the underlying process +// has exited; the second return value is true if the exit has been observed, +// false otherwise. Safe to call concurrently with watchExit. +func (s *terminalSession) ExitCode() (int32, bool) { + select { + case <-s.exitDone: + return s.exitCode.Load(), true + default: + return 0, false + } +} + +// Done returns a channel that is closed once the underlying process exits and +// the session has finished cleaning up. +func (s *terminalSession) Done() <-chan struct{} { + return s.doneCh +} + +// Close stops the listener, closes any active connection, closes the PTY +// master, and removes the UDS file. Safe to call multiple times. +// +// Unlike watchExit's graceful path, Close interrupts in-flight HMP v1 handlers +// immediately. Use it when an external caller (e.g. controller-runtime +// reconciliation tearing down the run) wants the listener gone NOW. +func (s *terminalSession) Close() error { + if !s.closed.CompareAndSwap(false, true) { + return nil + } + + // Best-effort: publish a 0 exit code if we don't have a real one yet, so + // any in-flight Serve invocations don't block forever inside waitExit. + s.signalProcessExit(0) + + var firstErr error + if err := s.listener.Close(); err != nil { + firstErr = err + } + + s.mu.Lock() + conn := s.currConn + s.currConn = nil + s.mu.Unlock() + if conn != nil { + _ = conn.Close() + } + + if s.tp != nil && s.tp.pty != nil { + if err := s.tp.pty.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + + // Best-effort UDS file cleanup. + _ = os.Remove(s.udsPath) + return firstErr +} + +func terminalDimensionsForServe(spec *apiv1.TerminalSpec) (cols, rows int) { + cols, rows = 80, 24 + if spec == nil { + return + } + if spec.Cols > 0 { + cols = int(spec.Cols) + } + if spec.Rows > 0 { + rows = int(spec.Rows) + } + return +} diff --git a/internal/exerunners/terminal_session_windows_test.go b/internal/exerunners/terminal_session_windows_test.go new file mode 100644 index 00000000..d1c70c75 --- /dev/null +++ b/internal/exerunners/terminal_session_windows_test.go @@ -0,0 +1,199 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +//go:build windows + +package exerunners + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/go-logr/logr/testr" + + apiv1 "github.com/microsoft/dcp/api/v1" + "github.com/microsoft/dcp/internal/hmp1" +) + +// TestTerminalSessionEndToEndWindows is the slice's headline integration test: +// it spawns a real cmd.exe process under ConPTY, listens for an HMP v1 client +// on a UDS, connects from the test as a fake terminal-host client, and asserts +// the full Hello -> StateSync -> Output -> Exit frame sequence is observed +// with sensible content. +func TestTerminalSessionEndToEndWindows(t *testing.T) { + if testing.Short() { + t.Skip("skipping end-to-end ConPTY test in short mode") + } + + log := testr.New(t) + + udsPath := filepath.Join(t.TempDir(), "term.sock") + + exe := &apiv1.Executable{ + Spec: apiv1.ExecutableSpec{ + ExecutablePath: `C:\Windows\System32\cmd.exe`, + Terminal: &apiv1.TerminalSpec{ + Enabled: true, + UDSPath: udsPath, + Cols: 100, + Rows: 30, + }, + }, + Status: apiv1.ExecutableStatus{ + EffectiveArgs: []string{"/c", "echo hello-world-from-conpty && exit 0"}, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + tp, err := startTerminalProcess(ctx, exe) + if err != nil { + t.Fatalf("startTerminalProcess: %v", err) + } + + session, err := startTerminalSession(ctx, exe, tp, log) + if err != nil { + _ = tp.pty.Close() + t.Fatalf("startTerminalSession: %v", err) + } + defer session.Close() + + // Give the listener a moment to actually bind. acceptLoop runs in a + // goroutine; on Windows UDS a Dial too soon can race the Listen. + if err := waitForUDS(udsPath, 2*time.Second); err != nil { + t.Fatalf("waiting for UDS to appear: %v", err) + } + + conn, err := net.Dial("unix", udsPath) + if err != nil { + t.Fatalf("dial UDS: %v", err) + } + defer conn.Close() + + // Read frames until we see Exit (or hit a timeout). Collect all Output + // payloads so we can assert on their concatenation. + _ = conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + + var ( + sawHello bool + sawStateSync bool + sawExit bool + exitCode int32 + outputBuf bytes.Buffer + ) + +readLoop: + for { + frameType, payload, readErr := readClientFrame(conn) + if readErr != nil { + if errors.Is(readErr, io.EOF) { + break readLoop + } + t.Fatalf("read frame: %v", readErr) + } + switch frameType { + case hmp1.FrameHello: + sawHello = true + var p hmp1.HelloPayload + if err := json.Unmarshal(payload, &p); err != nil { + t.Fatalf("Hello payload was not valid JSON (%v): %q", err, string(payload)) + } + if p.Version != 1 { + t.Errorf("Hello.Version = %d, want 1", p.Version) + } + if p.Width != 100 || p.Height != 30 { + t.Errorf("Hello dimensions = %dx%d, want 100x30", p.Width, p.Height) + } + case hmp1.FrameStateSync: + sawStateSync = true + case hmp1.FrameOutput: + outputBuf.Write(payload) + case hmp1.FrameExit: + sawExit = true + if len(payload) != 4 { + t.Errorf("Exit payload length = %d, want 4", len(payload)) + } else { + exitCode = int32(binary.LittleEndian.Uint32(payload)) + } + break readLoop + default: + t.Logf("ignoring unexpected frame type 0x%02x len=%d", frameType, len(payload)) + } + } + + if !sawHello { + t.Errorf("did not receive Hello frame") + } + if !sawStateSync { + t.Errorf("did not receive StateSync frame") + } + if !sawExit { + t.Errorf("did not receive Exit frame") + } + if exitCode != 0 { + t.Errorf("exit code = %d, want 0", exitCode) + } + + // ConPTY output includes ANSI escape sequences plus the echoed command + // line (because ConPTY runs interactively). What we assert is that the + // literal payload string appears somewhere in the rendered output. + out := outputBuf.String() + if !strings.Contains(out, "hello-world-from-conpty") { + t.Errorf("expected Output to contain %q, got %q", "hello-world-from-conpty", out) + } + + // Wait for the session to finish tearing itself down. + select { + case <-session.Done(): + case <-time.After(5 * time.Second): + t.Errorf("timed out waiting for terminal session to finish") + } +} + +// readClientFrame reads a single HMP v1 frame from the client side of the +// connection (mirroring the server's readFrame helper but kept local to the +// test to avoid exporting internal helpers). +func readClientFrame(r net.Conn) (hmp1.FrameType, []byte, error) { + var header [5]byte + if _, err := io.ReadFull(r, header[:]); err != nil { + return 0, nil, err + } + t := hmp1.FrameType(header[0]) + length := binary.LittleEndian.Uint32(header[1:5]) + if length > hmp1.MaxPayloadLength { + return 0, nil, fmt.Errorf("frame length %d exceeds max %d", length, hmp1.MaxPayloadLength) + } + if length == 0 { + return t, nil, nil + } + payload := make([]byte, length) + if _, err := io.ReadFull(r, payload); err != nil { + return 0, nil, err + } + return t, payload, nil +} + +func waitForUDS(path string, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if _, err := os.Stat(path); err == nil { + return nil + } + time.Sleep(20 * time.Millisecond) + } + return fmt.Errorf("UDS %q did not appear within %s", path, timeout) +} diff --git a/internal/hmp1/server.go b/internal/hmp1/server.go new file mode 100644 index 00000000..99b90510 --- /dev/null +++ b/internal/hmp1/server.go @@ -0,0 +1,295 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +// Package hmp1 implements the server side of the Hex1b Multiplex Protocol +// version 1 (HMP v1) that DCP uses to expose pseudo-terminal output to the +// Aspire terminal host. The wire format is intentionally minimal: +// +// [type:1B][length:4B little-endian][payload:length bytes] +// +// Frame types (the producer / "server" side that DCP plays): +// +// 0x01 Hello JSON: {"version":1,"width":W,"height":H} S->C +// 0x02 StateSync raw ANSI bytes (current screen) S->C +// 0x03 Output raw ANSI bytes from PTY S->C +// 0x04 Input raw bytes to deliver to PTY stdin C->S +// 0x05 Resize [width:4B LE][height:4B LE] bidirectional +// 0x06 Exit [exitCode:4B LE] S->C (then close) +// +// The protocol is a strict superset of what the Hex1b client needs to render +// a terminal session and forward keystrokes/resize. Any frames the server +// receives outside of {Input, Resize} are logged at debug level and dropped. +package hmp1 + +import ( + "context" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "os" + "sync" + "time" + + "github.com/go-logr/logr" +) + +// Maximum payload length (16 MiB). Matches the Hex1b implementation. +const MaxPayloadLength = 16 * 1024 * 1024 + +// FrameType is the 1-byte frame discriminator at the head of every HMP v1 frame. +type FrameType byte + +const ( + FrameHello FrameType = 0x01 + FrameStateSync FrameType = 0x02 + FrameOutput FrameType = 0x03 + FrameInput FrameType = 0x04 + FrameResize FrameType = 0x05 + FrameExit FrameType = 0x06 +) + +// HelloPayload is the JSON payload of a Hello frame. +type HelloPayload struct { + Version int `json:"version"` + Width int `json:"width"` + Height int `json:"height"` +} + +// PTY is the abstraction the server needs over the underlying pseudo-terminal. +// Implementations must be safe for concurrent use by separate goroutines for +// Read, Write, Resize, and Wait (each call site uses a single goroutine). +type PTY interface { + // Read reads bytes from the PTY's master side (i.e. process stdout/stderr). + // Should return io.EOF when the process exits and the PTY is closed. + Read(p []byte) (int, error) + + // Write writes bytes to the PTY's master side (i.e. process stdin). + Write(p []byte) (int, error) + + // Resize updates the PTY window size. + Resize(cols, rows int) error + + // Close closes the PTY master. + Close() error +} + +// ServerOptions configures a Server invocation. +type ServerOptions struct { + // InitialCols / InitialRows are the dimensions sent in the Hello frame. + // Reasonable defaults are applied if either is zero. + InitialCols int + InitialRows int + + // Log receives diagnostic messages from the server. May be the zero + // logr.Logger to discard. + Log logr.Logger +} + +// Serve runs an HMP v1 server on the given connection, bridging conn <-> pty +// until either the PTY exits, the connection closes, or the context is +// cancelled. The exitCode reported back to the client (via the Exit frame) is +// taken from waitExit; it is invoked exactly once, after the PTY's read loop +// returns, and may block until the underlying process exits. If waitExit is +// nil the Exit frame is sent with exit code 0. +// +// Serve takes ownership of conn for the duration of the call: it will close +// conn before returning, both to release any goroutine blocked on conn.Read +// and to signal the viewer that the session is over. It does NOT close pty; +// the caller owns the PTY lifetime so it can be reused across multiple Serve +// invocations (reconnects). +func Serve(ctx context.Context, conn net.Conn, pty PTY, waitExit func() int32, opts ServerOptions) error { + defer conn.Close() + cols := opts.InitialCols + if cols <= 0 { + cols = 80 + } + rows := opts.InitialRows + if rows <= 0 { + rows = 24 + } + + helloBytes, err := json.Marshal(HelloPayload{Version: 1, Width: cols, Height: rows}) + if err != nil { + return fmt.Errorf("marshal Hello payload: %w", err) + } + + // Serialize frame writes; both Output and Exit are written from different + // goroutines (the read pump and the exit watcher). + var writeMu sync.Mutex + writeFrame := func(t FrameType, payload []byte) error { + writeMu.Lock() + defer writeMu.Unlock() + return writeFrameLocked(conn, t, payload) + } + + if err := writeFrame(FrameHello, helloBytes); err != nil { + return fmt.Errorf("write Hello: %w", err) + } + // Empty StateSync: the headless terminal in the Aspire terminal host + // rebuilds state from subsequent Output frames. + if err := writeFrame(FrameStateSync, nil); err != nil { + return fmt.Errorf("write StateSync: %w", err) + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // When the context is cancelled (either externally or by one of our pumps), + // nudge the connection's blocked Read to return immediately. This is the + // only way to unstick the conn->pty pump that is blocked inside + // readFrame's io.ReadFull. We do not close conn (the caller owns its + // lifetime), we just expire its read deadline. + cancelWatcher := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + if d, ok := conn.(interface{ SetReadDeadline(time.Time) error }); ok { + _ = d.SetReadDeadline(time.Unix(1, 0)) + } + case <-cancelWatcher: + } + }() + defer close(cancelWatcher) + + var wg sync.WaitGroup + errCh := make(chan error, 2) + + // Pump 1: PTY -> conn (Output frames). Exits when PTY EOFs / errors. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + buf := make([]byte, 32*1024) + for { + n, readErr := pty.Read(buf) + if n > 0 { + if writeErr := writeFrame(FrameOutput, buf[:n]); writeErr != nil { + errCh <- fmt.Errorf("write Output: %w", writeErr) + return + } + } + if readErr != nil { + if errors.Is(readErr, io.EOF) { + return + } + errCh <- fmt.Errorf("read PTY: %w", readErr) + return + } + } + }() + + // Pump 2: conn -> PTY (Input/Resize). Exits when connection closes / errors + // or the context is cancelled. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + for { + select { + case <-ctx.Done(): + return + default: + } + + frameType, payload, readErr := readFrame(conn) + if readErr != nil { + // Shutdown signals (EOF, closed conn, our own deadline nudge) + // are silent. Other errors are real protocol failures and + // must propagate even if the context happens to be cancelled + // concurrently (e.g. because the PTY exited at the same time + // as we received an invalid frame). + if errors.Is(readErr, io.EOF) || errors.Is(readErr, net.ErrClosed) || errors.Is(readErr, os.ErrDeadlineExceeded) { + return + } + errCh <- fmt.Errorf("read frame: %w", readErr) + return + } + + switch frameType { + case FrameInput: + if _, writeErr := pty.Write(payload); writeErr != nil { + errCh <- fmt.Errorf("write PTY: %w", writeErr) + return + } + case FrameResize: + if len(payload) != 8 { + opts.Log.V(1).Info("dropping malformed Resize frame", "length", len(payload)) + continue + } + w := int(binary.LittleEndian.Uint32(payload[0:4])) + h := int(binary.LittleEndian.Uint32(payload[4:8])) + if resizeErr := pty.Resize(w, h); resizeErr != nil { + opts.Log.V(1).Info("PTY resize failed", "err", resizeErr.Error(), "cols", w, "rows", h) + } + default: + opts.Log.V(1).Info("ignoring unexpected frame from client", "type", byte(frameType), "length", len(payload)) + } + } + }() + + wg.Wait() + + // Send a best-effort Exit frame. If the remote side has already gone away + // (protocol error, dropped TCP, etc.) the write will block until the + // deadline fires; either way we move on. + var exitCode int32 + if waitExit != nil { + exitCode = waitExit() + } + exitPayload := make([]byte, 4) + binary.LittleEndian.PutUint32(exitPayload, uint32(exitCode)) + if d, ok := conn.(interface{ SetWriteDeadline(time.Time) error }); ok { + _ = d.SetWriteDeadline(time.Now().Add(500 * time.Millisecond)) + } + _ = writeFrame(FrameExit, exitPayload) + + select { + case err := <-errCh: + return err + default: + return nil + } +} + +func writeFrameLocked(w io.Writer, t FrameType, payload []byte) error { + if len(payload) > MaxPayloadLength { + return fmt.Errorf("hmp1: payload exceeds maximum (%d > %d)", len(payload), MaxPayloadLength) + } + header := [5]byte{} + header[0] = byte(t) + binary.LittleEndian.PutUint32(header[1:5], uint32(len(payload))) + if _, err := w.Write(header[:]); err != nil { + return err + } + if len(payload) > 0 { + if _, err := w.Write(payload); err != nil { + return err + } + } + return nil +} + +func readFrame(r io.Reader) (FrameType, []byte, error) { + var header [5]byte + if _, err := io.ReadFull(r, header[:]); err != nil { + return 0, nil, err + } + t := FrameType(header[0]) + length := binary.LittleEndian.Uint32(header[1:5]) + if length > MaxPayloadLength { + return 0, nil, fmt.Errorf("hmp1: payload length %d exceeds maximum %d", length, MaxPayloadLength) + } + if length == 0 { + return t, nil, nil + } + payload := make([]byte, length) + if _, err := io.ReadFull(r, payload); err != nil { + return 0, nil, err + } + return t, payload, nil +} diff --git a/internal/hmp1/server_test.go b/internal/hmp1/server_test.go new file mode 100644 index 00000000..775e8330 --- /dev/null +++ b/internal/hmp1/server_test.go @@ -0,0 +1,355 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package hmp1 + +import ( + "context" + "encoding/binary" + "encoding/json" + "errors" + "io" + "net" + "sync" + "testing" + "time" +) + +// fakePty is an in-memory PTY for tests. Reads pull from inbound (data +// "produced" by the user process); writes push into outbound (data going +// "to" the process). Resize calls are recorded for assertions. +type fakePty struct { + inbound chan []byte + outbound chan []byte + + mu sync.Mutex + closed bool + resizes []struct{ Cols, Rows int } +} + +func newFakePty() *fakePty { + return &fakePty{ + inbound: make(chan []byte, 16), + outbound: make(chan []byte, 16), + } +} + +func (f *fakePty) Read(p []byte) (int, error) { + chunk, ok := <-f.inbound + if !ok { + return 0, io.EOF + } + n := copy(p, chunk) + return n, nil +} + +func (f *fakePty) Write(p []byte) (int, error) { + out := make([]byte, len(p)) + copy(out, p) + f.outbound <- out + return len(p), nil +} + +func (f *fakePty) Resize(cols, rows int) error { + f.mu.Lock() + defer f.mu.Unlock() + f.resizes = append(f.resizes, struct{ Cols, Rows int }{cols, rows}) + return nil +} + +func (f *fakePty) Close() error { + f.mu.Lock() + defer f.mu.Unlock() + if !f.closed { + f.closed = true + close(f.inbound) + } + return nil +} + +func TestServeSendsHelloAndStateSyncOnConnect(t *testing.T) { + t.Parallel() + a, b := net.Pipe() + defer a.Close() + defer b.Close() + pty := newFakePty() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- Serve(ctx, b, pty, nil, ServerOptions{InitialCols: 100, InitialRows: 30}) + }() + + // Read Hello. + frameType, payload := readFrameHelper(t, a) + if frameType != FrameHello { + t.Fatalf("expected Hello frame, got 0x%02x", frameType) + } + var hello HelloPayload + if err := json.Unmarshal(payload, &hello); err != nil { + t.Fatalf("unmarshal Hello: %v", err) + } + if hello.Version != 1 || hello.Width != 100 || hello.Height != 30 { + t.Errorf("unexpected Hello payload: %+v", hello) + } + + // Read StateSync. + frameType, payload = readFrameHelper(t, a) + if frameType != FrameStateSync { + t.Fatalf("expected StateSync frame, got 0x%02x", frameType) + } + if len(payload) != 0 { + t.Errorf("expected empty StateSync, got %d bytes", len(payload)) + } + + // Trigger PTY exit so Serve returns. + pty.Close() + + // Read terminal Exit frame. + frameType, _ = readFrameHelper(t, a) + if frameType != FrameExit { + t.Errorf("expected Exit frame after PTY EOF, got 0x%02x", frameType) + } + + if err := <-done; err != nil { + t.Errorf("Serve returned error: %v", err) + } +} + +func TestServeForwardsOutputFromPty(t *testing.T) { + t.Parallel() + a, b := net.Pipe() + defer a.Close() + defer b.Close() + pty := newFakePty() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- Serve(ctx, b, pty, nil, ServerOptions{InitialCols: 80, InitialRows: 24}) + }() + + // Drain Hello + StateSync. + readFrameHelper(t, a) + readFrameHelper(t, a) + + // Push some PTY output. + pty.inbound <- []byte("hello world") + + // Should arrive as an Output frame. + frameType, payload := readFrameHelper(t, a) + if frameType != FrameOutput { + t.Fatalf("expected Output frame, got 0x%02x", frameType) + } + if string(payload) != "hello world" { + t.Errorf("unexpected Output payload: %q", string(payload)) + } + + pty.Close() + readFrameHelper(t, a) // Exit + <-done +} + +func TestServeForwardsInputToPty(t *testing.T) { + t.Parallel() + a, b := net.Pipe() + defer a.Close() + defer b.Close() + pty := newFakePty() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- Serve(ctx, b, pty, nil, ServerOptions{}) + }() + + // Drain Hello + StateSync. + readFrameHelper(t, a) + readFrameHelper(t, a) + + // Send an Input frame. + writeFrameHelper(t, a, FrameInput, []byte("ls\r\n")) + + select { + case got := <-pty.outbound: + if string(got) != "ls\r\n" { + t.Errorf("unexpected PTY input: %q", string(got)) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for PTY input") + } + + pty.Close() + readFrameHelper(t, a) + <-done +} + +func TestServeForwardsResizeToPty(t *testing.T) { + t.Parallel() + a, b := net.Pipe() + defer a.Close() + defer b.Close() + pty := newFakePty() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- Serve(ctx, b, pty, nil, ServerOptions{}) + }() + + readFrameHelper(t, a) + readFrameHelper(t, a) + + resizePayload := make([]byte, 8) + binary.LittleEndian.PutUint32(resizePayload[0:4], 132) + binary.LittleEndian.PutUint32(resizePayload[4:8], 50) + writeFrameHelper(t, a, FrameResize, resizePayload) + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + pty.mu.Lock() + count := len(pty.resizes) + pty.mu.Unlock() + if count > 0 { + break + } + time.Sleep(10 * time.Millisecond) + } + + pty.mu.Lock() + if len(pty.resizes) != 1 || pty.resizes[0].Cols != 132 || pty.resizes[0].Rows != 50 { + t.Errorf("unexpected PTY resize history: %+v", pty.resizes) + } + pty.mu.Unlock() + + pty.Close() + readFrameHelper(t, a) + <-done +} + +func TestServeSendsExitFrameWithExitCode(t *testing.T) { + t.Parallel() + a, b := net.Pipe() + defer a.Close() + defer b.Close() + pty := newFakePty() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- Serve(ctx, b, pty, func() int32 { return 42 }, ServerOptions{}) + }() + + readFrameHelper(t, a) + readFrameHelper(t, a) + + pty.Close() + + frameType, payload := readFrameHelper(t, a) + if frameType != FrameExit { + t.Fatalf("expected Exit frame, got 0x%02x", frameType) + } + if len(payload) != 4 { + t.Fatalf("expected 4-byte Exit payload, got %d", len(payload)) + } + exitCode := int32(binary.LittleEndian.Uint32(payload)) + if exitCode != 42 { + t.Errorf("expected exit code 42, got %d", exitCode) + } + + <-done +} + +func TestServeRejectsOversizedFrame(t *testing.T) { + t.Parallel() + a, b := net.Pipe() + defer a.Close() + defer b.Close() + pty := newFakePty() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- Serve(ctx, b, pty, nil, ServerOptions{}) + }() + + readFrameHelper(t, a) // Hello + readFrameHelper(t, a) // StateSync + + // Craft a header that claims an oversize length without sending the body. + header := [5]byte{byte(FrameInput), 0, 0, 0, 0} + binary.LittleEndian.PutUint32(header[1:5], uint32(MaxPayloadLength+1)) + if _, err := a.Write(header[:]); err != nil { + t.Fatalf("write header: %v", err) + } + + // In production the PTY persists across viewer reconnects, so Serve does + // not close it on abort. Close it explicitly here to let the read pump + // exit, which is what production would do on shutdown anyway. + pty.Close() + + // Server should treat the frame as a fatal protocol error and exit; we + // observe that by Serve completing. + select { + case err := <-done: + if err == nil { + t.Errorf("expected Serve to return an error on oversize frame") + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for Serve to exit on oversize frame") + } +} + +// readFrameHelper performs a blocking read of one HMP v1 frame from r, +// failing the test on error. +func readFrameHelper(t *testing.T, r net.Conn) (FrameType, []byte) { + t.Helper() + _ = r.SetReadDeadline(time.Now().Add(3 * time.Second)) + defer r.SetReadDeadline(time.Time{}) + var header [5]byte + if _, err := io.ReadFull(r, header[:]); err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) { + t.Fatalf("connection closed while expecting frame") + } + t.Fatalf("read header: %v", err) + } + t.Logf("got frame type=0x%02x len=%d", header[0], binary.LittleEndian.Uint32(header[1:5])) + length := binary.LittleEndian.Uint32(header[1:5]) + if length == 0 { + return FrameType(header[0]), nil + } + payload := make([]byte, length) + if _, err := io.ReadFull(r, payload); err != nil { + t.Fatalf("read payload: %v", err) + } + return FrameType(header[0]), payload +} + +func writeFrameHelper(t *testing.T, w net.Conn, ft FrameType, payload []byte) { + t.Helper() + header := [5]byte{} + header[0] = byte(ft) + binary.LittleEndian.PutUint32(header[1:5], uint32(len(payload))) + if _, err := w.Write(header[:]); err != nil { + t.Fatalf("write header: %v", err) + } + if len(payload) > 0 { + if _, err := w.Write(payload); err != nil { + t.Fatalf("write payload: %v", err) + } + } +} From 8979c9b0a3af73d9d98c92b80d2e7618d6c7dd9d Mon Sep 17 00:00:00 2001 From: Mitch Denny Date: Tue, 5 May 2026 14:08:54 +1000 Subject: [PATCH 2/5] Add container PTY support and shared termpty package Refactors the Windows PTY/HMP v1 plumbing introduced in PR #133 from internal/exerunners into a shared internal/termpty package, then layers container terminal support on top using the same primitives. Refactor (no behavior change for executables): - New internal/termpty package with Process, Session, CommandSpec, SessionConfig and BuildWindowsCommandLine. - internal/exerunners now consumes termpty.StartProcess and termpty.StartSession; the executable terminal session windows test moves into termpty/session_windows_test.go. Container PTY: - ContainerSpec gains a Terminal *TerminalSpec field (mirrors the shape on ExecutableSpec). - Docker and Podman applyCreateContainerOptions append "-t -i" to `docker create` / `podman create` when Terminal is enabled, so the container runtime allocates a real PTY for the container's PID 1. - New controllers/container_terminal.go::startContainerTerminalSession spawns the orchestrator's CLI as ` start --attach --interactive ` under a host ConPTY, exposing the resulting byte stream as an HMP v1 producer at TerminalSpec.UDSPath. - runningContainerData tracks the live *termpty.Session; the new closeTerminalSession helper is invoked from deleteContainer so the attach process is reaped during teardown. - ContainerReconciler.ensureContainerTerminalSession invokes the helper after a successful start (both the simple-network and custom-network code paths). Terminal attach errors are non-fatal: the container keeps running, just without an attachable terminal. The terminal attach process exiting (because the container exited or was removed) signals container exit through the same Session.Done() channel that the executable runner already uses. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/v1/container_types.go | 13 +++ api/v1/zz_generated.deepcopy.go | 5 + controllers/container_controller.go | 59 ++++++++++++ controllers/container_terminal.go | 79 ++++++++++++++++ controllers/running_container_data.go | 34 +++++++ internal/docker/cli_orchestrator.go | 7 ++ .../exerunners/process_executable_runner.go | 3 +- .../process_executable_runner_terminal.go | 33 ++++++- internal/exerunners/pty.go | 46 --------- internal/podman/cli_orchestrator.go | 7 ++ internal/termpty/config.go | 35 +++++++ internal/termpty/pty.go | 77 +++++++++++++++ internal/{exerunners => termpty}/pty_other.go | 15 ++- .../{exerunners => termpty}/pty_windows.go | 61 +++++------- .../session.go} | 93 +++++++++---------- .../session_windows_test.go} | 36 +++---- 16 files changed, 435 insertions(+), 168 deletions(-) create mode 100644 controllers/container_terminal.go delete mode 100644 internal/exerunners/pty.go create mode 100644 internal/termpty/config.go create mode 100644 internal/termpty/pty.go rename internal/{exerunners => termpty}/pty_other.go (56%) rename internal/{exerunners => termpty}/pty_windows.go (63%) rename internal/{exerunners/terminal_session.go => termpty/session.go} (74%) rename internal/{exerunners/terminal_session_windows_test.go => termpty/session_windows_test.go} (88%) diff --git a/api/v1/container_types.go b/api/v1/container_types.go index 8c47b423..3ada710f 100644 --- a/api/v1/container_types.go +++ b/api/v1/container_types.go @@ -682,6 +682,15 @@ type ContainerSpec struct { // PEM formatted public certificates to be created in the container // +optional PemCertificates *ContainerPemCertificates `json:"pemCertificates,omitempty"` + + // Optional terminal/PTY configuration. When set and Enabled is true, the + // container's primary process is started under a host pseudo-terminal + // and its stdin/stdout/stderr are bridged to the configured UDS via + // HMP v1, instead of the container being run detached with separate log + // capture. Terminal mode requires Windows on the host (ConPTY) for the + // initial slice; non-Windows hosts return ErrTerminalNotSupported at + // startup. + Terminal *TerminalSpec `json:"terminal,omitempty"` } func (cs *ContainerSpec) Equal(other *ContainerSpec) bool { @@ -791,6 +800,10 @@ func (cs *ContainerSpec) Equal(other *ContainerSpec) bool { return false } + if !cs.Terminal.Equal(other.Terminal) { + return false + } + return true } diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 6e8fcc36..d08bcab0 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -739,6 +739,11 @@ func (in *ContainerSpec) DeepCopyInto(out *ContainerSpec) { *out = new(ContainerPemCertificates) (*in).DeepCopyInto(*out) } + if in.Terminal != nil { + in, out := &in.Terminal, &out.Terminal + *out = new(TerminalSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContainerSpec. diff --git a/controllers/container_controller.go b/controllers/container_controller.go index b0429fbc..488c5ad7 100644 --- a/controllers/container_controller.go +++ b/controllers/container_controller.go @@ -546,6 +546,10 @@ func ensureContainerStartingState( rcd.containerState = apiv1.ContainerStateRunning rcd.startAttemptFinishedAt = metav1.NowMicro() change |= statusChanged + + if termErr := r.ensureContainerTerminalSession(ctx, rcd, log); termErr != nil { + log.Error(termErr, "Failed to attach terminal session to running container; container is running but interactive terminal will be unavailable") + } case inspected != nil && inspected.Status == containers.ContainerStatusExited: rcd.containerState = apiv1.ContainerStateExited rcd.startAttemptFinishedAt = metav1.NowMicro() @@ -1481,6 +1485,13 @@ func (r *ContainerReconciler) startContainerWithOrchestrator(container *apiv1.Co if inspected.Status == containers.ContainerStatusRunning { log.V(1).Info("Container started") rcd.containerState = apiv1.ContainerStateRunning + + if err := r.ensureContainerTerminalSession(startupCtx, rcd, log); err != nil { + // Terminal attach failure is non-fatal: the container is + // running and observable via the orchestrator, but no + // interactive PTY is available. We log and continue. + log.Error(err, "Failed to attach terminal session to running container; container is running but interactive terminal will be unavailable") + } } else { log.V(1).Info("Container started and exited shortly after", "ContainerStatus", inspected.Status) rcd.containerState = apiv1.ContainerStateExited @@ -1557,6 +1568,7 @@ func (r *ContainerReconciler) deleteContainer(ctx context.Context, container *ap // or if the container has already finished starting/stopping and we know the outcome of either. defer rcd.deleteStartupLogFiles(log) + defer rcd.closeTerminalSession(log) if container.Spec.Persistent { log.V(1).Info("Container is not using Managed mode, leaving underlying resources") @@ -1588,6 +1600,53 @@ func (r *ContainerReconciler) cleanupDcpContainerResources(ctx context.Context, r.ReleaseContainerWatchForResource(container.UID, log) } +// ensureContainerTerminalSession attaches a host-side PTY to the running +// container and starts the HMP v1 listener at the configured UDS path, +// storing the resulting session on rcd. No-op if the container does not +// have terminal enabled or a session is already active. +// +// The container must have been created with `-t -i` for the attach to +// deliver a usable terminal; that is handled by applyCreateContainerOptions +// in the docker/podman orchestrator when ContainerSpec.Terminal is set. +// +// Errors here are non-fatal to the container lifecycle: the container is +// already running by the time this is called. The caller is expected to +// log the error and move on. +func (r *ContainerReconciler) ensureContainerTerminalSession( + ctx context.Context, + rcd *runningContainerData, + log logr.Logger, +) error { + if rcd == nil || rcd.runSpec == nil { + return nil + } + + terminal := rcd.runSpec.Terminal + if terminal == nil || !terminal.Enabled { + return nil + } + + if rcd.terminalSession != nil { + return nil + } + + if !rcd.hasValidContainerID() { + return errors.New("ensureContainerTerminalSession called without a valid container ID") + } + + runner, ok := r.orchestrator.(containers.CLICommandRunner) + if !ok { + return fmt.Errorf("container orchestrator %T does not implement containers.CLICommandRunner; terminal attach not supported", r.orchestrator) + } + + session, err := startContainerTerminalSession(ctx, runner, string(rcd.containerID), terminal, log) + if err != nil { + return err + } + rcd.terminalSession = session + return nil +} + func (r *ContainerReconciler) startContainerWithTimeout( parentCtx context.Context, containerName string, diff --git a/controllers/container_terminal.go b/controllers/container_terminal.go new file mode 100644 index 00000000..b74e1e01 --- /dev/null +++ b/controllers/container_terminal.go @@ -0,0 +1,79 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package controllers + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + + apiv1 "github.com/microsoft/dcp/api/v1" + "github.com/microsoft/dcp/internal/containers" + "github.com/microsoft/dcp/internal/termpty" +) + +// startContainerTerminalSession spawns the container runtime CLI with +// `start --attach --interactive ` under a host PTY, then stands +// up an HMP v1 listener at spec.UDSPath that bridges viewer connections to +// that PTY. The returned Session owns the lifetime of both the CLI process +// and the listener; callers must Close it during teardown. +// +// The container must have been created with `-t -i` (allocate TTY + keep +// stdin open) for the attach to deliver a usable terminal; this is handled +// automatically when ContainerSpec.Terminal != nil && Enabled by the docker +// and podman orchestrators' applyCreateContainerOptions helper. +// +// On hosts where DCP does not yet implement PTY allocation (currently +// non-Windows) this returns termpty.ErrTerminalNotSupported. +func startContainerTerminalSession( + ctx context.Context, + runner containers.CLICommandRunner, + containerID string, + spec *apiv1.TerminalSpec, + log logr.Logger, +) (*termpty.Session, error) { + if spec == nil || !spec.Enabled { + return nil, fmt.Errorf("startContainerTerminalSession: spec must be non-nil and Enabled") + } + + // Use MakeCommand to extract the configured CLI path (e.g. "docker" or + // "podman", possibly resolved against PATH); we don't actually start the + // command via the orchestrator's process.Executor because we need direct + // ConPTY semantics. + cmd := runner.MakeCommand("start", "--attach", "--interactive", containerID) + + commandLine := termpty.BuildWindowsCommandLine(cmd.Path, cmd.Args[1:]) + + startLog := log.WithValues( + "Cmd", cmd.Path, + "ContainerID", containerID, + "Terminal", true, + "UDSPath", spec.UDSPath, + ) + startLog.Info("Starting container attach under PTY...") + + tp, err := termpty.StartProcess(ctx, termpty.CommandSpec{ + CommandLine: commandLine, + Cols: int(spec.Cols), + Rows: int(spec.Rows), + }) + if err != nil { + return nil, fmt.Errorf("starting container attach under PTY: %w", err) + } + + session, err := termpty.StartSession(ctx, termpty.SessionConfig{ + UDSPath: spec.UDSPath, + Cols: int(spec.Cols), + Rows: int(spec.Rows), + }, tp, startLog) + if err != nil { + // StartSession is responsible for closing tp.PTY when it fails. + return nil, fmt.Errorf("starting container terminal session listener: %w", err) + } + + return session, nil +} diff --git a/controllers/running_container_data.go b/controllers/running_container_data.go index c652f061..90b96dfc 100644 --- a/controllers/running_container_data.go +++ b/controllers/running_container_data.go @@ -18,6 +18,7 @@ import ( apiv1 "github.com/microsoft/dcp/api/v1" ct "github.com/microsoft/dcp/internal/containers" "github.com/microsoft/dcp/internal/health" + "github.com/microsoft/dcp/internal/termpty" usvc_io "github.com/microsoft/dcp/pkg/io" "github.com/microsoft/dcp/pkg/maps" "github.com/microsoft/dcp/pkg/osutil" @@ -87,6 +88,14 @@ type runningContainerData struct { // Whether health probes are enabled for the Container healthProbesEnabled *bool + + // terminalSession owns the host-side PTY + HMP v1 listener bridging the + // container's primary process to the Aspire terminal host. Non-nil only + // when the container was created with Spec.Terminal.Enabled and the + // attach to the running container succeeded. Closing the session tears + // down both the listener and the underlying CLI attach process (which + // in turn closes the container's stdin and detaches). + terminalSession *termpty.Session } const placeholderContainerIdPrefix = "__placeholder-" @@ -134,6 +143,10 @@ func (rcd *runningContainerData) Clone() *runningContainerData { networkConnections: stdmaps.Clone(rcd.networkConnections), runSpec: rcd.runSpec.DeepCopy(), healthProbeResults: stdmaps.Clone(rcd.healthProbeResults), + // terminalSession is shared (one runtime resource per container). + // Clones must reference the same Session; otherwise UpdateFrom + // would lose track of the in-flight attach process. + terminalSession: rcd.terminalSession, } pointers.SetValueFrom(&clone.exitCode, rcd.exitCode) @@ -228,6 +241,14 @@ func (rcd *runningContainerData) UpdateFrom(other *runningContainerData) bool { updated = true } + // terminalSession is a one-shot pointer assignment: once attached, it + // must not be cleared by stale clones replayed via deferred ops, so + // only propagate non-nil values forward. + if other.terminalSession != nil && rcd.terminalSession != other.terminalSession { + rcd.terminalSession = other.terminalSession + updated = true + } + return updated } @@ -278,6 +299,19 @@ func (rcd *runningContainerData) deleteStartupLogFiles(log logr.Logger) { } } +// closeTerminalSession tears down any host-side PTY + HMP v1 listener +// previously stood up for this container. Safe to call multiple times. +func (rcd *runningContainerData) closeTerminalSession(log logr.Logger) { + session := rcd.terminalSession + rcd.terminalSession = nil + if session == nil { + return + } + if err := session.Close(); err != nil { + log.V(1).Info("Error closing container terminal session", "Error", err.Error()) + } +} + func (rcd *runningContainerData) getStartupLogWriters() (usvc_io.ParagraphWriter, usvc_io.ParagraphWriter) { var stdoutWriter, stderrWriter usvc_io.ParagraphWriter diff --git a/internal/docker/cli_orchestrator.go b/internal/docker/cli_orchestrator.go index 6640055c..41cacd71 100644 --- a/internal/docker/cli_orchestrator.go +++ b/internal/docker/cli_orchestrator.go @@ -574,6 +574,13 @@ func applyCreateContainerOptions(args []string, options containers.CreateContain } } + if options.Terminal != nil && options.Terminal.Enabled { + // Allocate a TTY in the container and keep stdin open. Required so a + // later "docker start --attach --interactive" can bridge a host PTY + // to the container's primary process. + args = append(args, "-t", "-i") + } + args = append(args, options.RunArgs...) return args diff --git a/internal/exerunners/process_executable_runner.go b/internal/exerunners/process_executable_runner.go index 4876ae46..1694f4a0 100644 --- a/internal/exerunners/process_executable_runner.go +++ b/internal/exerunners/process_executable_runner.go @@ -22,6 +22,7 @@ import ( "github.com/microsoft/dcp/controllers" "github.com/microsoft/dcp/internal/dcpproc" "github.com/microsoft/dcp/internal/logs" + "github.com/microsoft/dcp/internal/termpty" usvc_io "github.com/microsoft/dcp/pkg/io" "github.com/microsoft/dcp/pkg/osutil" "github.com/microsoft/dcp/pkg/pointers" @@ -39,7 +40,7 @@ type processRunState struct { // terminalSession is non-nil when the run was started under a PTY via // startTerminalRun. It owns the PTY and the HMP v1 listener; closing it // terminates both. - terminalSession *terminalSession + terminalSession *termpty.Session } type ProcessExecutableRunner struct { diff --git a/internal/exerunners/process_executable_runner_terminal.go b/internal/exerunners/process_executable_runner_terminal.go index 7152b29e..1a91e4d3 100644 --- a/internal/exerunners/process_executable_runner_terminal.go +++ b/internal/exerunners/process_executable_runner_terminal.go @@ -8,6 +8,7 @@ package exerunners import ( "context" "errors" + "fmt" "sync/atomic" "time" @@ -16,6 +17,7 @@ import ( apiv1 "github.com/microsoft/dcp/api/v1" "github.com/microsoft/dcp/controllers" + "github.com/microsoft/dcp/internal/termpty" "github.com/microsoft/dcp/pkg/pointers" "github.com/microsoft/dcp/pkg/process" ) @@ -38,7 +40,8 @@ func (r *ProcessExecutableRunner) startTerminalRun( result := controllers.NewExecutableStartResult() - tp, err := startTerminalProcess(ctx, exe) + cmdSpec := executableTerminalCommandSpec(exe) + tp, err := termpty.StartProcess(ctx, cmdSpec) if err != nil { startLog.Error(err, "Failed to start process under PTY") result.CompletionTimestamp = metav1.NowMicro() @@ -48,7 +51,12 @@ func (r *ProcessExecutableRunner) startTerminalRun( return result } - session, err := startTerminalSession(ctx, exe, tp, startLog) + sessionCfg := termpty.SessionConfig{ + UDSPath: exe.Spec.Terminal.UDSPath, + Cols: int(exe.Spec.Terminal.Cols), + Rows: int(exe.Spec.Terminal.Rows), + } + session, err := termpty.StartSession(ctx, sessionCfg, tp, startLog) if err != nil { startLog.Error(err, "Failed to start terminal session listener") result.CompletionTimestamp = metav1.NowMicro() @@ -58,7 +66,7 @@ func (r *ProcessExecutableRunner) startTerminalRun( return result } - pid := process.Pid_t(tp.pid) + pid := process.Pid_t(tp.PID) identityTime := time.Now() runID := pidToRunID(pid) @@ -118,3 +126,22 @@ func (r *ProcessExecutableRunner) startTerminalRun( runChangeHandler.OnStartupCompleted(exe.NamespacedName(), result) return result } + +// executableTerminalCommandSpec builds a termpty.CommandSpec from the +// Executable's effective configuration. Currently this is Windows-shaped +// (single command-line string for CreateProcessW); cross-platform support +// is tracked by ErrTerminalNotSupported on non-Windows hosts. +func executableTerminalCommandSpec(exe *apiv1.Executable) termpty.CommandSpec { + envBlock := make([]string, 0, len(exe.Status.EffectiveEnv)) + for _, e := range exe.Status.EffectiveEnv { + envBlock = append(envBlock, fmt.Sprintf("%s=%s", e.Name, e.Value)) + } + + return termpty.CommandSpec{ + CommandLine: termpty.BuildWindowsCommandLine(exe.Spec.ExecutablePath, exe.Status.EffectiveArgs), + Env: envBlock, + WorkingDirectory: exe.Spec.WorkingDirectory, + Cols: int(exe.Spec.Terminal.Cols), + Rows: int(exe.Spec.Terminal.Rows), + } +} diff --git a/internal/exerunners/pty.go b/internal/exerunners/pty.go deleted file mode 100644 index f3ad83c9..00000000 --- a/internal/exerunners/pty.go +++ /dev/null @@ -1,46 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See LICENSE in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package exerunners - -import ( - "context" - "errors" - - apiv1 "github.com/microsoft/dcp/api/v1" - "github.com/microsoft/dcp/internal/hmp1" -) - -// ErrTerminalNotSupported is returned when terminal/PTY support is requested -// for an Executable on a platform where DCP does not yet implement it. The -// initial slice (Aspire 13.4) supports Windows only via ConPTY; Linux/macOS -// support follows in a subsequent change. -var ErrTerminalNotSupported = errors.New("terminal/PTY support is not implemented on this platform") - -// terminalProcess represents a process running attached to a pseudo-terminal, -// plus the PTY interface used by the HMP v1 server to bridge it to the Aspire -// terminal host. -type terminalProcess struct { - // pty is the ReadWriteCloser exposing the PTY master end. Implements - // hmp1.PTY (Read, Write, Resize, Close). - pty hmp1.PTY - - // pid is the OS process id of the spawned process. - pid int - - // waitExit blocks until the process exits and returns the exit code. - // Called at most once. - waitExit func() int32 -} - -// startTerminalProcess allocates a pseudo-terminal and starts the Executable -// attached to it. The returned terminalProcess must be Closed by the caller -// when the process exits or when the run is being stopped. -// -// The implementation is platform-specific. See pty_windows.go and -// pty_other.go. -func startTerminalProcess(ctx context.Context, exe *apiv1.Executable) (*terminalProcess, error) { - return startTerminalProcessImpl(ctx, exe) -} diff --git a/internal/podman/cli_orchestrator.go b/internal/podman/cli_orchestrator.go index fa3b281d..9c90e223 100644 --- a/internal/podman/cli_orchestrator.go +++ b/internal/podman/cli_orchestrator.go @@ -559,6 +559,13 @@ func applyCreateContainerOptions(args []string, options containers.CreateContain } } + if options.Terminal != nil && options.Terminal.Enabled { + // Allocate a TTY in the container and keep stdin open. Required so a + // later "podman start --attach --interactive" can bridge a host PTY + // to the container's primary process. + args = append(args, "-t", "-i") + } + args = append(args, options.RunArgs...) return args diff --git a/internal/termpty/config.go b/internal/termpty/config.go new file mode 100644 index 00000000..db1bde31 --- /dev/null +++ b/internal/termpty/config.go @@ -0,0 +1,35 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package termpty + +// SessionConfig captures the parameters the HMP v1 listener needs to accept +// terminal viewer connections. It mirrors the relevant subset of +// apiv1.TerminalSpec but lives in this package so it can be constructed +// from any caller (executable runner, container reconciler, ...). +type SessionConfig struct { + // UDSPath is the Unix domain socket path the listener binds to. + UDSPath string + + // Cols/Rows are the initial terminal dimensions the HMP v1 server + // reports to a connecting viewer if it doesn't supply its own. If + // either is <= 0 the defaults from normalizeDimensions are used. + Cols int + Rows int +} + +// normalizeDimensions returns the effective dimensions to use for an +// allocated PTY or for HMP1's initial Hello frame, applying the +// 80x24 fallback when either input is non-positive. +func normalizeDimensions(cols, rows int) (int, int) { + c, r := cols, rows + if c <= 0 { + c = 80 + } + if r <= 0 { + r = 24 + } + return c, r +} diff --git a/internal/termpty/pty.go b/internal/termpty/pty.go new file mode 100644 index 00000000..1e51db25 --- /dev/null +++ b/internal/termpty/pty.go @@ -0,0 +1,77 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +// Package termpty provides the terminal/PTY primitives used by both the +// executable runner (for executables started under WithTerminal) and the +// container reconciler (for containers started under WithTerminal). It +// abstracts over the platform-specific PTY implementation (ConPTY on +// Windows; not yet implemented on other platforms) and the HMP v1 listener +// that bridges the host PTY to the Aspire terminal host. +package termpty + +import ( + "context" + "errors" + + "github.com/microsoft/dcp/internal/hmp1" +) + +// ErrTerminalNotSupported is returned when terminal/PTY support is requested +// on a platform where DCP does not yet implement it. The initial slice +// (Aspire 13.4) supports Windows only via ConPTY; Linux/macOS support +// follows in a subsequent change. +var ErrTerminalNotSupported = errors.New("terminal/PTY support is not implemented on this platform") + +// Process represents a process running attached to a pseudo-terminal, +// plus the PTY interface used by the HMP v1 server to bridge it to the +// Aspire terminal host. +type Process struct { + // PTY is the ReadWriteCloser exposing the PTY master end. Implements + // hmp1.PTY (Read, Write, Resize, Close). + PTY hmp1.PTY + + // PID is the OS process id of the spawned process. + PID int + + // WaitExit blocks until the process exits and returns the exit code. + // Called at most once. + WaitExit func() int32 +} + +// CommandSpec captures everything needed to spawn a process attached to a +// freshly allocated pseudo-terminal. It deliberately mirrors only the inputs +// the underlying ConPTY API needs and is independent of any specific resource +// kind (Executable, Container, ...). +type CommandSpec struct { + // CommandLine is the full command line in CreateProcessW form (a single + // string with arguments embedded and quoted as appropriate). Callers can + // build this via BuildWindowsCommandLine. + CommandLine string + + // Env is the environment block passed to the child process (each entry + // is "KEY=VALUE"). May be nil to inherit the parent's environment. + Env []string + + // WorkingDirectory is the cwd for the child process. Empty inherits the + // parent's cwd. + WorkingDirectory string + + // Cols/Rows are the initial dimensions of the allocated PTY. If either + // is <= 0 the implementation falls back to a sensible default + // (currently 80x24). + Cols int + Rows int +} + +// StartProcess allocates a pseudo-terminal and starts a process attached to +// it according to the supplied CommandSpec. The returned Process must be +// closed (via Process.PTY.Close) by the caller when the process exits or the +// terminal is being torn down. +// +// The implementation is platform-specific. See pty_windows.go and +// pty_other.go. +func StartProcess(ctx context.Context, spec CommandSpec) (*Process, error) { + return startProcessImpl(ctx, spec) +} diff --git a/internal/exerunners/pty_other.go b/internal/termpty/pty_other.go similarity index 56% rename from internal/exerunners/pty_other.go rename to internal/termpty/pty_other.go index 60474c18..49791d1a 100644 --- a/internal/exerunners/pty_other.go +++ b/internal/termpty/pty_other.go @@ -5,17 +5,22 @@ //go:build !windows -package exerunners +package termpty import ( "context" - - apiv1 "github.com/microsoft/dcp/api/v1" ) -// startTerminalProcessImpl is the non-Windows fallback. The initial slice +// startProcessImpl is the non-Windows fallback. The initial slice // (Aspire 13.4) implements PTY support on Windows via ConPTY only; Linux and // macOS support is tracked as follow-up work. -func startTerminalProcessImpl(_ context.Context, _ *apiv1.Executable) (*terminalProcess, error) { +func startProcessImpl(_ context.Context, _ CommandSpec) (*Process, error) { return nil, ErrTerminalNotSupported } + +// BuildWindowsCommandLine is a no-op stub on non-Windows platforms; it lets +// callers reference it unconditionally without a build tag. Returns the +// empty string so accidental use surfaces obviously. +func BuildWindowsCommandLine(_ string, _ []string) string { + return "" +} diff --git a/internal/exerunners/pty_windows.go b/internal/termpty/pty_windows.go similarity index 63% rename from internal/exerunners/pty_windows.go rename to internal/termpty/pty_windows.go index cf481a78..27deb179 100644 --- a/internal/exerunners/pty_windows.go +++ b/internal/termpty/pty_windows.go @@ -5,7 +5,7 @@ //go:build windows -package exerunners +package termpty import ( "context" @@ -13,7 +13,6 @@ import ( "strings" "github.com/UserExistsError/conpty" - apiv1 "github.com/microsoft/dcp/api/v1" ) // conPtyAdapter wraps a *conpty.ConPty so it satisfies hmp1.PTY (specifically @@ -27,40 +26,35 @@ func (a *conPtyAdapter) Write(p []byte) (int, error) { return a.cp.Write(p) } func (a *conPtyAdapter) Close() error { return a.cp.Close() } func (a *conPtyAdapter) Resize(cols, rows int) error { return a.cp.Resize(cols, rows) } -// startTerminalProcessImpl allocates a Windows ConPTY, spawns the executable -// attached to it, and returns a terminalProcess. The caller is responsible for -// calling pty.Close() on shutdown. -func startTerminalProcessImpl(ctx context.Context, exe *apiv1.Executable) (*terminalProcess, error) { +// startProcessImpl allocates a Windows ConPTY, spawns a process attached to +// it, and returns a Process. The caller is responsible for calling +// PTY.Close() on shutdown. +func startProcessImpl(_ context.Context, spec CommandSpec) (*Process, error) { if !conpty.IsConPtyAvailable() { return nil, fmt.Errorf("ConPTY is not available on this Windows version: %w", conpty.ErrConPtyUnsupported) } - commandLine := buildWindowsCommandLine(exe.Spec.ExecutablePath, exe.Status.EffectiveArgs) - - envBlock := make([]string, 0, len(exe.Status.EffectiveEnv)) - for _, e := range exe.Status.EffectiveEnv { - envBlock = append(envBlock, fmt.Sprintf("%s=%s", e.Name, e.Value)) - } - - cols, rows := terminalDimensions(exe.Spec.Terminal) + cols, rows := normalizeDimensions(spec.Cols, spec.Rows) options := []conpty.ConPtyOption{ conpty.ConPtyDimensions(cols, rows), - conpty.ConPtyEnv(envBlock), } - if exe.Spec.WorkingDirectory != "" { - options = append(options, conpty.ConPtyWorkDir(exe.Spec.WorkingDirectory)) + if spec.Env != nil { + options = append(options, conpty.ConPtyEnv(spec.Env)) + } + if spec.WorkingDirectory != "" { + options = append(options, conpty.ConPtyWorkDir(spec.WorkingDirectory)) } - cp, err := conpty.Start(commandLine, options...) + cp, err := conpty.Start(spec.CommandLine, options...) if err != nil { return nil, fmt.Errorf("failed to start process under ConPTY: %w", err) } - return &terminalProcess{ - pty: &conPtyAdapter{cp: cp}, - pid: cp.Pid(), - waitExit: func() int32 { + return &Process{ + PTY: &conPtyAdapter{cp: cp}, + PID: cp.Pid(), + WaitExit: func() int32 { exitCode, waitErr := cp.Wait(context.Background()) if waitErr != nil { // Wait failures are best-effort: the connection's about to @@ -73,10 +67,11 @@ func startTerminalProcessImpl(ctx context.Context, exe *apiv1.Executable) (*term }, nil } -// buildWindowsCommandLine constructs a single command-line string suitable for -// CreateProcessW from a path + argv. Each token is wrapped in quotes and -// embedded quotes are escaped per the documented Windows argv parsing rules. -func buildWindowsCommandLine(executablePath string, args []string) string { +// BuildWindowsCommandLine constructs a single command-line string suitable +// for CreateProcessW from a path + argv. Each token is wrapped in quotes +// and embedded quotes are escaped per the documented Windows argv parsing +// rules. +func BuildWindowsCommandLine(executablePath string, args []string) string { var sb strings.Builder sb.WriteString(quoteWindowsArg(executablePath)) for _, a := range args { @@ -124,17 +119,3 @@ func quoteWindowsArg(arg string) string { sb.WriteByte('"') return sb.String() } - -func terminalDimensions(t *apiv1.TerminalSpec) (cols, rows int) { - cols, rows = 80, 24 - if t == nil { - return - } - if t.Cols > 0 { - cols = int(t.Cols) - } - if t.Rows > 0 { - rows = int(t.Rows) - } - return -} diff --git a/internal/exerunners/terminal_session.go b/internal/termpty/session.go similarity index 74% rename from internal/exerunners/terminal_session.go rename to internal/termpty/session.go index 26bd5593..eec96919 100644 --- a/internal/exerunners/terminal_session.go +++ b/internal/termpty/session.go @@ -3,7 +3,7 @@ * Licensed under the MIT License. See LICENSE in the project root for license information. *--------------------------------------------------------------------------------------------*/ -package exerunners +package termpty import ( "context" @@ -17,24 +17,26 @@ import ( "github.com/go-logr/logr" - apiv1 "github.com/microsoft/dcp/api/v1" "github.com/microsoft/dcp/internal/hmp1" ) -// terminalSession owns the lifecycle of a single Executable's pseudo-terminal -// listener. It listens on the configured UDS path, accepts at most one viewer -// connection at a time, and runs an HMP v1 server on each connection, -// bridging the connection to the underlying PTY. +// Session owns the lifecycle of a single PTY-attached process plus the +// HMP v1 listener that bridges viewer connections to that PTY. It is shared +// between the executable runner (one Session per executable replica started +// under WithTerminal) and the container reconciler (one Session per +// container started under WithTerminal). // // When the underlying process exits the session sends a final Exit frame to // any connected viewer (via hmp1.Serve's waitExit hook), then tears down the // listener. Calling Close() forces the same teardown. -type terminalSession struct { +type Session struct { log logr.Logger listener net.Listener udsPath string + cols int + rows int - tp *terminalProcess + tp *Process mu sync.Mutex currConn net.Conn @@ -56,12 +58,12 @@ type terminalSession struct { doneCh chan struct{} } -// startTerminalSession opens a listener at exe.Spec.Terminal.UDSPath, then -// spawns an accept loop that runs hmp1.Serve on each accepted connection. -// The PTY-attached process must already be running (passed via tp); the -// session owns tp from this point forward and will Close it on shutdown. -func startTerminalSession(ctx context.Context, exe *apiv1.Executable, tp *terminalProcess, log logr.Logger) (*terminalSession, error) { - udsPath := exe.Spec.Terminal.UDSPath +// StartSession opens a listener at cfg.UDSPath, then spawns an accept loop +// that runs hmp1.Serve on each accepted connection. The PTY-attached process +// must already be running (passed via tp); the session owns tp from this +// point forward and will close it on shutdown. +func StartSession(ctx context.Context, cfg SessionConfig, tp *Process, log logr.Logger) (*Session, error) { + udsPath := cfg.UDSPath // Best-effort cleanup of a stale socket file from a previous run. if _, statErr := os.Stat(udsPath); statErr == nil { _ = os.Remove(udsPath) @@ -70,20 +72,24 @@ func startTerminalSession(ctx context.Context, exe *apiv1.Executable, tp *termin lc := net.ListenConfig{} lis, err := lc.Listen(ctx, "unix", udsPath) if err != nil { - _ = tp.pty.Close() + _ = tp.PTY.Close() return nil, fmt.Errorf("failed to listen on terminal UDS %q: %w", udsPath, err) } - s := &terminalSession{ - log: log.WithValues("UDSPath", udsPath, "PID", tp.pid), + cols, rows := normalizeDimensions(cfg.Cols, cfg.Rows) + + s := &Session{ + log: log.WithValues("UDSPath", udsPath, "PID", tp.PID), listener: lis, udsPath: udsPath, + cols: cols, + rows: rows, tp: tp, exitDone: make(chan struct{}), doneCh: make(chan struct{}), } - go s.acceptLoop(ctx, exe.Spec.Terminal) + go s.acceptLoop(ctx) go s.watchExit() s.log.Info("Terminal session listening") @@ -94,7 +100,7 @@ func startTerminalSession(ctx context.Context, exe *apiv1.Executable, tp *termin // arrives while a previous one is still active, the previous connection is // closed (last-writer-wins), matching the single-viewer model of the initial // slice. -func (s *terminalSession) acceptLoop(ctx context.Context, spec *apiv1.TerminalSpec) { +func (s *Session) acceptLoop(ctx context.Context) { for { conn, err := s.listener.Accept() if err != nil { @@ -114,11 +120,11 @@ func (s *terminalSession) acceptLoop(ctx context.Context, spec *apiv1.TerminalSp s.mu.Unlock() s.connWg.Add(1) - go s.handleConn(ctx, conn, spec) + go s.handleConn(ctx, conn) } } -func (s *terminalSession) handleConn(ctx context.Context, conn net.Conn, spec *apiv1.TerminalSpec) { +func (s *Session) handleConn(ctx context.Context, conn net.Conn) { defer s.connWg.Done() defer func() { _ = conn.Close() @@ -129,10 +135,9 @@ func (s *terminalSession) handleConn(ctx context.Context, conn net.Conn, spec *a s.mu.Unlock() }() - cols, rows := terminalDimensionsForServe(spec) opts := hmp1.ServerOptions{ - InitialCols: cols, - InitialRows: rows, + InitialCols: s.cols, + InitialRows: s.rows, Log: s.log, } @@ -141,7 +146,7 @@ func (s *terminalSession) handleConn(ctx context.Context, conn net.Conn, spec *a // It blocks until the underlying process exit has been signalled and // returns the observed exit code, so the per-connection Exit frame // carries the real code rather than a synthetic 0. - if serveErr := hmp1.Serve(ctx, conn, s.tp.pty, s.waitProcessExit, opts); serveErr != nil { + if serveErr := hmp1.Serve(ctx, conn, s.tp.PTY, s.waitProcessExit, opts); serveErr != nil { s.log.V(1).Info("HMP v1 serve exited with error", "err", serveErr.Error()) } } @@ -149,7 +154,7 @@ func (s *terminalSession) handleConn(ctx context.Context, conn net.Conn, spec *a // signalProcessExit publishes the process exit code; subsequent calls to // waitProcessExit return immediately. Safe to call multiple times; only the // first call wins. -func (s *terminalSession) signalProcessExit(code int32) { +func (s *Session) signalProcessExit(code int32) { s.exitOnce.Do(func() { s.exitCode.Store(code) close(s.exitDone) @@ -158,7 +163,7 @@ func (s *terminalSession) signalProcessExit(code int32) { // waitProcessExit blocks until the underlying process has exited, then // returns the observed exit code. Used as the waitExit callback for hmp1.Serve. -func (s *terminalSession) waitProcessExit() int32 { +func (s *Session) waitProcessExit() int32 { <-s.exitDone return s.exitCode.Load() } @@ -168,17 +173,17 @@ func (s *terminalSession) waitProcessExit() int32 { // hmp1.Serve invocation drains and sends its Exit frame), waits a bounded // amount of time for those handlers to finish, then closes the listener and // removes the UDS file. -func (s *terminalSession) watchExit() { +func (s *Session) watchExit() { defer close(s.doneCh) - exitCode := s.tp.waitExit() + exitCode := s.tp.WaitExit() s.log.Info("Terminal-attached process exited", "exitCode", exitCode) s.signalProcessExit(exitCode) // Close the PTY to wake up any blocked Serve PTY pump. After this, Serve // will drain, call waitProcessExit (already unblocked), write the Exit // frame, and return. - if s.tp != nil && s.tp.pty != nil { - _ = s.tp.pty.Close() + if s.tp != nil && s.tp.PTY != nil { + _ = s.tp.PTY.Close() } // Give in-flight HMP v1 handlers a moment to flush their Exit frame @@ -194,7 +199,7 @@ func (s *terminalSession) watchExit() { // waitConnsOrTimeout blocks until all in-flight handleConn goroutines have // finished, or until the timeout elapses, whichever comes first. -func (s *terminalSession) waitConnsOrTimeout(timeout time.Duration) { +func (s *Session) waitConnsOrTimeout(timeout time.Duration) { done := make(chan struct{}) go func() { s.connWg.Wait() @@ -210,7 +215,7 @@ func (s *terminalSession) waitConnsOrTimeout(timeout time.Duration) { // ExitCode returns the observed process exit code if the underlying process // has exited; the second return value is true if the exit has been observed, // false otherwise. Safe to call concurrently with watchExit. -func (s *terminalSession) ExitCode() (int32, bool) { +func (s *Session) ExitCode() (int32, bool) { select { case <-s.exitDone: return s.exitCode.Load(), true @@ -221,7 +226,7 @@ func (s *terminalSession) ExitCode() (int32, bool) { // Done returns a channel that is closed once the underlying process exits and // the session has finished cleaning up. -func (s *terminalSession) Done() <-chan struct{} { +func (s *Session) Done() <-chan struct{} { return s.doneCh } @@ -231,7 +236,7 @@ func (s *terminalSession) Done() <-chan struct{} { // Unlike watchExit's graceful path, Close interrupts in-flight HMP v1 handlers // immediately. Use it when an external caller (e.g. controller-runtime // reconciliation tearing down the run) wants the listener gone NOW. -func (s *terminalSession) Close() error { +func (s *Session) Close() error { if !s.closed.CompareAndSwap(false, true) { return nil } @@ -253,8 +258,8 @@ func (s *terminalSession) Close() error { _ = conn.Close() } - if s.tp != nil && s.tp.pty != nil { - if err := s.tp.pty.Close(); err != nil && firstErr == nil { + if s.tp != nil && s.tp.PTY != nil { + if err := s.tp.PTY.Close(); err != nil && firstErr == nil { firstErr = err } } @@ -263,17 +268,3 @@ func (s *terminalSession) Close() error { _ = os.Remove(s.udsPath) return firstErr } - -func terminalDimensionsForServe(spec *apiv1.TerminalSpec) (cols, rows int) { - cols, rows = 80, 24 - if spec == nil { - return - } - if spec.Cols > 0 { - cols = int(spec.Cols) - } - if spec.Rows > 0 { - rows = int(spec.Rows) - } - return -} diff --git a/internal/exerunners/terminal_session_windows_test.go b/internal/termpty/session_windows_test.go similarity index 88% rename from internal/exerunners/terminal_session_windows_test.go rename to internal/termpty/session_windows_test.go index d1c70c75..e9106281 100644 --- a/internal/exerunners/terminal_session_windows_test.go +++ b/internal/termpty/session_windows_test.go @@ -5,7 +5,7 @@ //go:build windows -package exerunners +package termpty import ( "bytes" @@ -24,7 +24,6 @@ import ( "github.com/go-logr/logr/testr" - apiv1 "github.com/microsoft/dcp/api/v1" "github.com/microsoft/dcp/internal/hmp1" ) @@ -42,33 +41,26 @@ func TestTerminalSessionEndToEndWindows(t *testing.T) { udsPath := filepath.Join(t.TempDir(), "term.sock") - exe := &apiv1.Executable{ - Spec: apiv1.ExecutableSpec{ - ExecutablePath: `C:\Windows\System32\cmd.exe`, - Terminal: &apiv1.TerminalSpec{ - Enabled: true, - UDSPath: udsPath, - Cols: 100, - Rows: 30, - }, - }, - Status: apiv1.ExecutableStatus{ - EffectiveArgs: []string{"/c", "echo hello-world-from-conpty && exit 0"}, - }, - } - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - tp, err := startTerminalProcess(ctx, exe) + tp, err := StartProcess(ctx, CommandSpec{ + CommandLine: BuildWindowsCommandLine(`C:\Windows\System32\cmd.exe`, []string{"/c", "echo hello-world-from-conpty && exit 0"}), + Cols: 100, + Rows: 30, + }) if err != nil { - t.Fatalf("startTerminalProcess: %v", err) + t.Fatalf("StartProcess: %v", err) } - session, err := startTerminalSession(ctx, exe, tp, log) + session, err := StartSession(ctx, SessionConfig{ + UDSPath: udsPath, + Cols: 100, + Rows: 30, + }, tp, log) if err != nil { - _ = tp.pty.Close() - t.Fatalf("startTerminalSession: %v", err) + _ = tp.PTY.Close() + t.Fatalf("StartSession: %v", err) } defer session.Close() From 7453b580b04035bb619dbdec78cf4681f4b99129 Mon Sep 17 00:00:00 2001 From: Mitch Denny Date: Tue, 5 May 2026 15:06:48 +1000 Subject: [PATCH 3/5] Use 'attach' instead of 'start --attach --interactive' for container PTY docker container start is documented to start STOPPED containers; on a running container, '--attach --interactive' is a no-op and the host PTY never wires up to PID 1's stdin/stdout. Because the container reconciler already starts the container detached (via StartContainers) before ensureContainerTerminalSession runs, the second 'start' call did nothing and the host saw no terminal traffic. Switch to 'attach' on the running container with: --sig-proxy=false - keep host-process Ctrl-C from killing the container; signals are delivered in-band via HMP v1 input bytes (e.g. 0x03). --detach-keys= - disable the default Ctrl-P,Ctrl-Q detach so those keystrokes pass through as raw bytes. Container creation already opens TTY+stdin via -t -i (in applyCreateContainerOptions), so 'attach' delivers a usable bidirectional PTY stream. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- controllers/container_terminal.go | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/controllers/container_terminal.go b/controllers/container_terminal.go index b74e1e01..155e03f1 100644 --- a/controllers/container_terminal.go +++ b/controllers/container_terminal.go @@ -16,11 +16,22 @@ import ( "github.com/microsoft/dcp/internal/termpty" ) -// startContainerTerminalSession spawns the container runtime CLI with -// `start --attach --interactive ` under a host PTY, then stands -// up an HMP v1 listener at spec.UDSPath that bridges viewer connections to -// that PTY. The returned Session owns the lifetime of both the CLI process -// and the listener; callers must Close it during teardown. +// startContainerTerminalSession runs the container runtime CLI's `attach` +// command against an already-running container under a host PTY, then +// stands up an HMP v1 listener at spec.UDSPath that bridges viewer +// connections to that PTY. The returned Session owns the lifetime of both +// the CLI process and the listener; callers must Close it during teardown. +// +// We use ` attach` (not ` start --attach --interactive`) +// because the container is already started by the time this is called via +// the reconciler's normal `docker container start ` path. Running +// `docker start --attach --interactive` against a running container is a +// no-op and would leave the container's primary process with no host-side +// stdin/stdout connection. +// +// `--sig-proxy=false` prevents the attach process from forwarding signals +// (e.g. SIGINT from the dashboard) to the container; signals are delivered +// in-band via the HMP v1 input channel as keystrokes (Ctrl-C → 0x03 byte). // // The container must have been created with `-t -i` (allocate TTY + keep // stdin open) for the attach to deliver a usable terminal; this is handled @@ -44,7 +55,12 @@ func startContainerTerminalSession( // "podman", possibly resolved against PATH); we don't actually start the // command via the orchestrator's process.Executor because we need direct // ConPTY semantics. - cmd := runner.MakeCommand("start", "--attach", "--interactive", containerID) + // + // `--detach-keys=""` disables the default Ctrl-P,Ctrl-Q detach sequence + // so those keystrokes are forwarded into the container as plain bytes + // (matching an interactive terminal's expectation that all keys reach + // the application). HMP v1 viewers manage detach themselves. + cmd := runner.MakeCommand("attach", "--sig-proxy=false", "--detach-keys=", containerID) commandLine := termpty.BuildWindowsCommandLine(cmd.Path, cmd.Args[1:]) @@ -54,7 +70,7 @@ func startContainerTerminalSession( "Terminal", true, "UDSPath", spec.UDSPath, ) - startLog.Info("Starting container attach under PTY...") + startLog.Info("Attaching container under PTY...") tp, err := termpty.StartProcess(ctx, termpty.CommandSpec{ CommandLine: commandLine, From 72debb27f9c5c1996f1766e8788b73c866515f96 Mon Sep 17 00:00:00 2001 From: Mitch Denny Date: Tue, 5 May 2026 16:05:49 +1000 Subject: [PATCH 4/5] Flip producer-side connection direction: DCP now dials the terminal host Previously DCP listened on the producer UDS and the Aspire terminal host dialed in. That ordering meant the terminal host could only attach AFTER DCP had already started the PTY-attached process AND opened its listener. Anything the underlying shell/process printed in that window (notably bash's first PS1 prompt) was lost, because docker attach (and a fresh PTY read pump in general) does not replay buffered TTY output. This commit reverses just the TCP role on the producer link: * Terminal host listens on the producer UDS (it owns the file now). * DCP dials in via dialUDSWithRetry (50 x 100ms) which absorbs the small window between the host process being marked running and its listener actually being bound. The resource-model WaitAnnotation already sequences the host before any resource that uses it. * The HMP v1 protocol roles are unchanged: DCP still serves the protocol (it owns the PTY data source); the host still consumes it. Only the underlying TCP direction flipped. Session lifecycle simplifications that fall out of the flip: * No listener field, no acceptLoop, no per-session multi-viewer slot. Exactly one connection per session. * DCP no longer owns the UDS file; os.Remove calls in StartSession, watchExit, and Close are gone. * The connection is established eagerly in StartSession, so a dial failure surfaces immediately to the caller. Why this fixes the missed-prompt symptom: with the new direction the terminal host's listener exists before DCP attempts to dial. Once DCP dials, hmp1.Serve immediately starts pumping PTY output. The terminal host's headless presentation captures that output into its own scrollback, and any dashboard/CLI viewer that connects later gets a StateSync replay from the host - even if their attach happens many seconds after the shell's first prompt. The Windows test now plays the listening side: net.Listen first, then StartSession (which dials), then Accept. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- internal/termpty/session.go | 192 ++++++++++++----------- internal/termpty/session_windows_test.go | 31 ++-- 2 files changed, 113 insertions(+), 110 deletions(-) diff --git a/internal/termpty/session.go b/internal/termpty/session.go index eec96919..5ccb9a88 100644 --- a/internal/termpty/session.go +++ b/internal/termpty/session.go @@ -10,7 +10,6 @@ import ( "errors" "fmt" "net" - "os" "sync" "sync/atomic" "time" @@ -20,37 +19,45 @@ import ( "github.com/microsoft/dcp/internal/hmp1" ) -// Session owns the lifecycle of a single PTY-attached process plus the -// HMP v1 listener that bridges viewer connections to that PTY. It is shared -// between the executable runner (one Session per executable replica started -// under WithTerminal) and the container reconciler (one Session per +// Session owns the lifecycle of a single PTY-attached process plus a single +// HMP v1 connection that bridges that PTY to the Aspire terminal host. It is +// shared between the executable runner (one Session per executable replica +// started under WithTerminal) and the container reconciler (one Session per // container started under WithTerminal). // +// Connection direction: DCP dials into the terminal host's listener. The +// terminal host owns the UDS file and is started before any Session is +// created (the resource model wires a WaitAnnotation that holds the target +// resource until the terminal host reports running). This direction means +// the terminal host is guaranteed to be receiving from the very first byte +// the PTY emits, so a long-running shell's initial prompt is captured even +// for late dashboard viewers (the terminal host preserves its own +// scrollback). +// // When the underlying process exits the session sends a final Exit frame to -// any connected viewer (via hmp1.Serve's waitExit hook), then tears down the -// listener. Calling Close() forces the same teardown. +// the connected viewer (via hmp1.Serve's waitExit hook), then closes the +// connection. Calling Close() forces the same teardown. type Session struct { - log logr.Logger - listener net.Listener - udsPath string - cols int - rows int + log logr.Logger + udsPath string + cols int + rows int tp *Process - mu sync.Mutex - currConn net.Conn - closed atomic.Bool + mu sync.Mutex + conn net.Conn + closed atomic.Bool - // Tracks active handleConn goroutines so graceful shutdown can wait for - // in-flight HMP1 sessions to flush their Exit frame before closing the - // listener. + // Tracks the in-flight handleConn goroutine so graceful shutdown can + // wait for the HMP1 session to flush its Exit frame before closing the + // connection. connWg sync.WaitGroup // exitDone is closed once the underlying PTY-attached process has exited; // exitCode then holds the observed exit code. This is the signal used by - // each handleConn's waitExit hook so that hmp1.Serve sends the real exit - // code on the per-connection Exit frame. + // handleConn's waitExit hook so that hmp1.Serve sends the real exit code + // on the Exit frame. exitOnce sync.Once exitCode atomic.Int32 exitDone chan struct{} @@ -58,79 +65,80 @@ type Session struct { doneCh chan struct{} } -// StartSession opens a listener at cfg.UDSPath, then spawns an accept loop -// that runs hmp1.Serve on each accepted connection. The PTY-attached process -// must already be running (passed via tp); the session owns tp from this -// point forward and will close it on shutdown. +// dialUDSWithRetry dials the given UDS path, retrying briefly to absorb the +// race where the terminal host has reported running but its listener has not +// yet bound. The WaitAnnotation in the resource model already sequences the +// terminal host before any resource that uses it, so this retry budget is +// only meant to cover the small window between the host process declaring +// itself ready and its listener actually existing on disk. +func dialUDSWithRetry(ctx context.Context, udsPath string) (net.Conn, error) { + const ( + attempts = 50 + every = 100 * time.Millisecond + ) + var lastErr error + for i := 0; i < attempts; i++ { + if err := ctx.Err(); err != nil { + return nil, err + } + var d net.Dialer + dialCtx, cancel := context.WithTimeout(ctx, every) + conn, err := d.DialContext(dialCtx, "unix", udsPath) + cancel() + if err == nil { + return conn, nil + } + lastErr = err + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(every): + } + } + return nil, fmt.Errorf("dial %q after %d attempts: %w", udsPath, attempts, lastErr) +} + +// StartSession dials cfg.UDSPath, then spawns an HMP v1 server on that +// connection. The PTY-attached process must already be running (passed via +// tp); the session owns tp from this point forward and will close it on +// shutdown. func StartSession(ctx context.Context, cfg SessionConfig, tp *Process, log logr.Logger) (*Session, error) { udsPath := cfg.UDSPath - // Best-effort cleanup of a stale socket file from a previous run. - if _, statErr := os.Stat(udsPath); statErr == nil { - _ = os.Remove(udsPath) - } - lc := net.ListenConfig{} - lis, err := lc.Listen(ctx, "unix", udsPath) + conn, err := dialUDSWithRetry(ctx, udsPath) if err != nil { _ = tp.PTY.Close() - return nil, fmt.Errorf("failed to listen on terminal UDS %q: %w", udsPath, err) + return nil, fmt.Errorf("failed to dial terminal UDS %q: %w", udsPath, err) } cols, rows := normalizeDimensions(cfg.Cols, cfg.Rows) s := &Session{ log: log.WithValues("UDSPath", udsPath, "PID", tp.PID), - listener: lis, udsPath: udsPath, cols: cols, rows: rows, tp: tp, + conn: conn, exitDone: make(chan struct{}), doneCh: make(chan struct{}), } - go s.acceptLoop(ctx) + s.connWg.Add(1) + go s.handleConn(ctx, conn) go s.watchExit() - s.log.Info("Terminal session listening") + s.log.Info("Terminal session connected to host") return s, nil } -// acceptLoop accepts incoming connections one at a time. If a new connection -// arrives while a previous one is still active, the previous connection is -// closed (last-writer-wins), matching the single-viewer model of the initial -// slice. -func (s *Session) acceptLoop(ctx context.Context) { - for { - conn, err := s.listener.Accept() - if err != nil { - if !s.closed.Load() && !errors.Is(err, net.ErrClosed) { - s.log.Error(err, "Terminal session accept loop terminated unexpectedly") - } - return - } - - // Take over the "current connection" slot, closing any prior one. - s.mu.Lock() - if prev := s.currConn; prev != nil { - s.log.V(1).Info("Closing previous terminal viewer connection in favor of new one") - _ = prev.Close() - } - s.currConn = conn - s.mu.Unlock() - - s.connWg.Add(1) - go s.handleConn(ctx, conn) - } -} - func (s *Session) handleConn(ctx context.Context, conn net.Conn) { defer s.connWg.Done() defer func() { _ = conn.Close() s.mu.Lock() - if s.currConn == conn { - s.currConn = nil + if s.conn == conn { + s.conn = nil } s.mu.Unlock() }() @@ -144,8 +152,8 @@ func (s *Session) handleConn(ctx context.Context, conn net.Conn) { // waitExit is invoked by hmp1.Serve after its PTY read pump returns // (i.e. when pty.Close has been called or an upstream EOF was observed). // It blocks until the underlying process exit has been signalled and - // returns the observed exit code, so the per-connection Exit frame - // carries the real code rather than a synthetic 0. + // returns the observed exit code, so the Exit frame carries the real + // code rather than a synthetic 0. if serveErr := hmp1.Serve(ctx, conn, s.tp.PTY, s.waitProcessExit, opts); serveErr != nil { s.log.V(1).Info("HMP v1 serve exited with error", "err", serveErr.Error()) } @@ -169,10 +177,11 @@ func (s *Session) waitProcessExit() int32 { } // watchExit blocks until the PTY process exits, then performs a graceful -// teardown: it publishes the exit code, closes the PTY (so any in-flight +// teardown: it publishes the exit code, closes the PTY (so the in-flight // hmp1.Serve invocation drains and sends its Exit frame), waits a bounded -// amount of time for those handlers to finish, then closes the listener and -// removes the UDS file. +// amount of time for the handler to finish, then closes the connection. The +// UDS file itself is owned by the terminal host (the listener side) and is +// not removed here. func (s *Session) watchExit() { defer close(s.doneCh) exitCode := s.tp.WaitExit() @@ -186,18 +195,23 @@ func (s *Session) watchExit() { _ = s.tp.PTY.Close() } - // Give in-flight HMP v1 handlers a moment to flush their Exit frame - // before we slam the listener shut. + // Give the in-flight HMP v1 handler a moment to flush its Exit frame + // before we slam the connection shut. s.waitConnsOrTimeout(2 * time.Second) if !s.closed.CompareAndSwap(false, true) { return } - _ = s.listener.Close() - _ = os.Remove(s.udsPath) + s.mu.Lock() + conn := s.conn + s.conn = nil + s.mu.Unlock() + if conn != nil { + _ = conn.Close() + } } -// waitConnsOrTimeout blocks until all in-flight handleConn goroutines have +// waitConnsOrTimeout blocks until the in-flight handleConn goroutine has // finished, or until the timeout elapses, whichever comes first. func (s *Session) waitConnsOrTimeout(timeout time.Duration) { done := make(chan struct{}) @@ -208,7 +222,7 @@ func (s *Session) waitConnsOrTimeout(timeout time.Duration) { select { case <-done: case <-time.After(timeout): - s.log.V(1).Info("Timed out waiting for HMP v1 handlers to drain; forcing teardown") + s.log.V(1).Info("Timed out waiting for HMP v1 handler to drain; forcing teardown") } } @@ -230,32 +244,32 @@ func (s *Session) Done() <-chan struct{} { return s.doneCh } -// Close stops the listener, closes any active connection, closes the PTY -// master, and removes the UDS file. Safe to call multiple times. +// Close closes the active connection, closes the PTY master, and signals +// teardown. Safe to call multiple times. // -// Unlike watchExit's graceful path, Close interrupts in-flight HMP v1 handlers -// immediately. Use it when an external caller (e.g. controller-runtime -// reconciliation tearing down the run) wants the listener gone NOW. +// Unlike watchExit's graceful path, Close interrupts the in-flight HMP v1 +// handler immediately. Use it when an external caller (e.g. +// controller-runtime reconciliation tearing down the run) wants the session +// gone NOW. The UDS file is owned by the terminal host and is not removed +// here. func (s *Session) Close() error { if !s.closed.CompareAndSwap(false, true) { return nil } // Best-effort: publish a 0 exit code if we don't have a real one yet, so - // any in-flight Serve invocations don't block forever inside waitExit. + // any in-flight Serve invocation doesn't block forever inside waitExit. s.signalProcessExit(0) var firstErr error - if err := s.listener.Close(); err != nil { - firstErr = err - } - s.mu.Lock() - conn := s.currConn - s.currConn = nil + conn := s.conn + s.conn = nil s.mu.Unlock() if conn != nil { - _ = conn.Close() + if err := conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) { + firstErr = err + } } if s.tp != nil && s.tp.PTY != nil { @@ -264,7 +278,5 @@ func (s *Session) Close() error { } } - // Best-effort UDS file cleanup. - _ = os.Remove(s.udsPath) return firstErr } diff --git a/internal/termpty/session_windows_test.go b/internal/termpty/session_windows_test.go index e9106281..251106f5 100644 --- a/internal/termpty/session_windows_test.go +++ b/internal/termpty/session_windows_test.go @@ -16,7 +16,6 @@ import ( "fmt" "io" "net" - "os" "path/filepath" "strings" "testing" @@ -44,6 +43,15 @@ func TestTerminalSessionEndToEndWindows(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() + // Test plays the role of the terminal host: listen on the UDS first, + // then start the session (which dials in), then accept the dialed + // connection. + lis, err := net.Listen("unix", udsPath) + if err != nil { + t.Fatalf("listen UDS: %v", err) + } + defer lis.Close() + tp, err := StartProcess(ctx, CommandSpec{ CommandLine: BuildWindowsCommandLine(`C:\Windows\System32\cmd.exe`, []string{"/c", "echo hello-world-from-conpty && exit 0"}), Cols: 100, @@ -64,15 +72,9 @@ func TestTerminalSessionEndToEndWindows(t *testing.T) { } defer session.Close() - // Give the listener a moment to actually bind. acceptLoop runs in a - // goroutine; on Windows UDS a Dial too soon can race the Listen. - if err := waitForUDS(udsPath, 2*time.Second); err != nil { - t.Fatalf("waiting for UDS to appear: %v", err) - } - - conn, err := net.Dial("unix", udsPath) + conn, err := lis.Accept() if err != nil { - t.Fatalf("dial UDS: %v", err) + t.Fatalf("accept UDS: %v", err) } defer conn.Close() @@ -178,14 +180,3 @@ func readClientFrame(r net.Conn) (hmp1.FrameType, []byte, error) { } return t, payload, nil } - -func waitForUDS(path string, timeout time.Duration) error { - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - if _, err := os.Stat(path); err == nil { - return nil - } - time.Sleep(20 * time.Millisecond) - } - return fmt.Errorf("UDS %q did not appear within %s", path, timeout) -} From 38f88c931b2f12fbdceb003372df1bef855a9232 Mon Sep 17 00:00:00 2001 From: Mitch Denny Date: Tue, 5 May 2026 21:44:55 +1000 Subject: [PATCH 5/5] Fix Session teardown race that cascade-killed sibling resources MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously Session.Close() and watchExit() both raced to close the same ConPTY handles (PseudoConsole, pi.Process, pi.Thread, pty I/O pipes) while cp.Wait() was still polling WaitForSingleObject(pi.Process) in a 1-second loop. The double-close pattern is dangerous on Windows: when a HANDLE value is closed and the kernel recycles it for an unrelated object in the same process, a second CloseHandle on the original numeric value closes the unrelated object. If the recycled handle happens to be DCP's process cleanup job (JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE in pkg/process/os_executor_windows.go), Windows immediately terminates every process assigned to that job. In the Aspire WithTerminal scenario this means clicking Stop on a single PTY-attached resource cascade-kills the dashboard, every other terminal-host replica, and every sibling executable/container managed by the same DCP — while DCP itself stays alive because its own handle table is fine. Refactor watchExit() to be the *sole* owner of session teardown: * The PTY wait is isolated to a private inner goroutine that is the only code that ever observes pi.Process. It exits when the process exits naturally OR when watchExit closes the PTY. * watchExit selects on (waitDone, stopCh): natural exit takes the first branch; explicit Close() takes the second by closing stopCh. * In both branches watchExit calls PTY.Close exactly once, drains the inner wait goroutine, gives the in-flight HMP v1 handler a bounded window to flush its Exit frame, and finally closes the connection. * Close() no longer touches any conpty handles or the connection. It just signals teardown via stopCh (guarded by sync.Once) and blocks on doneCh with a 5s safety timeout. This eliminates the double-close pattern entirely. PTY handles are closed exactly once and only after the wait goroutine that owns them has been drained. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- internal/termpty/session.go | 131 +++++++++++++++++++++++++----------- 1 file changed, 90 insertions(+), 41 deletions(-) diff --git a/internal/termpty/session.go b/internal/termpty/session.go index 5ccb9a88..fddb459d 100644 --- a/internal/termpty/session.go +++ b/internal/termpty/session.go @@ -7,7 +7,6 @@ package termpty import ( "context" - "errors" "fmt" "net" "sync" @@ -45,9 +44,8 @@ type Session struct { tp *Process - mu sync.Mutex - conn net.Conn - closed atomic.Bool + mu sync.Mutex + conn net.Conn // Tracks the in-flight handleConn goroutine so graceful shutdown can // wait for the HMP1 session to flush its Exit frame before closing the @@ -62,6 +60,15 @@ type Session struct { exitCode atomic.Int32 exitDone chan struct{} + // stopCh is closed by Close() to request immediate teardown. watchExit + // is the sole owner of the PTY/conn close sequence and reads stopCh to + // distinguish a natural process exit from an explicit stop request. + // stopOnce guards the close(stopCh) call so Close() is idempotent. + stopOnce sync.Once + stopCh chan struct{} + + // doneCh is closed by watchExit's defer when the session has finished + // tearing itself down. Both Close() and Done() observe it. doneCh chan struct{} } @@ -121,6 +128,7 @@ func StartSession(ctx context.Context, cfg SessionConfig, tp *Process, log logr. tp: tp, conn: conn, exitDone: make(chan struct{}), + stopCh: make(chan struct{}), doneCh: make(chan struct{}), } @@ -176,32 +184,74 @@ func (s *Session) waitProcessExit() int32 { return s.exitCode.Load() } -// watchExit blocks until the PTY process exits, then performs a graceful -// teardown: it publishes the exit code, closes the PTY (so the in-flight -// hmp1.Serve invocation drains and sends its Exit frame), waits a bounded -// amount of time for the handler to finish, then closes the connection. The -// UDS file itself is owned by the terminal host (the listener side) and is -// not removed here. +// watchExit is the sole owner of session teardown. It runs concurrently with +// the in-flight HMP v1 handler and is the only goroutine that ever closes the +// PTY handles or the connection. It blocks until either: +// +// - the underlying PTY-attached process exits naturally (the inner wait +// goroutine produces an exit code on waitDone), or +// - Close() requests an explicit teardown by closing stopCh. +// +// In both cases watchExit calls PTY.Close exactly once, drains the inner wait +// goroutine, gives the in-flight HMP v1 handler a bounded window to flush its +// Exit frame, then closes the connection. The single-owner pattern eliminates +// the historical race where Close() and watchExit() both closed the conpty +// handles concurrently with cp.Wait's WaitForSingleObject(pi.Process) loop — +// that race could close a stale handle that Windows had already recycled for +// another component in the same process (e.g. DCP's process cleanup job +// handle), cascade-killing every job-assigned sibling resource. +// +// The UDS file itself is owned by the terminal host (the listener side) and +// is not removed here. func (s *Session) watchExit() { defer close(s.doneCh) - exitCode := s.tp.WaitExit() - s.log.Info("Terminal-attached process exited", "exitCode", exitCode) - s.signalProcessExit(exitCode) - // Close the PTY to wake up any blocked Serve PTY pump. After this, Serve - // will drain, call waitProcessExit (already unblocked), write the Exit - // frame, and return. + // Isolate the WaitForSingleObject call to a single goroutine so no other + // code path observes pi.Process. It will return when the process exits + // naturally or when PTY.Close (called by us, below) closes pi.Process. + waitDone := make(chan int32, 1) + go func() { + waitDone <- s.tp.WaitExit() + }() + + var ( + exitCode int32 + waitDrained bool + ) + select { + case exitCode = <-waitDone: + waitDrained = true + s.log.Info("Terminal-attached process exited", "exitCode", exitCode) + case <-s.stopCh: + s.log.V(1).Info("Terminal session received explicit stop request") + } + + // Single PTY.Close call. This kills the conhost (and the attached child + // if it was still running) and closes all conpty handles. After this, + // pty.Read in hmp1.Serve's read pump will fail and the in-flight handler + // will unwind so we can close the connection. if s.tp != nil && s.tp.PTY != nil { _ = s.tp.PTY.Close() } + // Drain the inner wait goroutine if we took the stopCh path. Closing the + // PTY caused pi.Process to be closed; the wait will return with + // WAIT_FAILED / ERROR_INVALID_HANDLE and the closure in pty_windows.go + // will report -1. + if !waitDrained { + select { + case <-waitDone: + case <-time.After(2 * time.Second): + s.log.V(1).Info("Timed out waiting for PTY wait goroutine to drain") + } + } + + s.signalProcessExit(exitCode) + // Give the in-flight HMP v1 handler a moment to flush its Exit frame // before we slam the connection shut. s.waitConnsOrTimeout(2 * time.Second) - if !s.closed.CompareAndSwap(false, true) { - return - } s.mu.Lock() conn := s.conn s.conn = nil @@ -244,39 +294,38 @@ func (s *Session) Done() <-chan struct{} { return s.doneCh } -// Close closes the active connection, closes the PTY master, and signals -// teardown. Safe to call multiple times. +// Close requests an immediate teardown of the session and blocks until +// watchExit has finished its single-owner cleanup of the PTY and connection. +// Safe to call multiple times. // -// Unlike watchExit's graceful path, Close interrupts the in-flight HMP v1 +// Unlike watchExit's natural-exit path, Close interrupts the in-flight HMP v1 // handler immediately. Use it when an external caller (e.g. // controller-runtime reconciliation tearing down the run) wants the session // gone NOW. The UDS file is owned by the terminal host and is not removed // here. +// +// The bounded wait protects the caller against a stuck teardown (e.g. the +// inner WaitForSingleObject hanging on a kernel handle that never signals); +// in practice teardown completes well within the timeout. func (s *Session) Close() error { - if !s.closed.CompareAndSwap(false, true) { - return nil - } - // Best-effort: publish a 0 exit code if we don't have a real one yet, so // any in-flight Serve invocation doesn't block forever inside waitExit. s.signalProcessExit(0) - var firstErr error - s.mu.Lock() - conn := s.conn - s.conn = nil - s.mu.Unlock() - if conn != nil { - if err := conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) { - firstErr = err - } - } + // Trigger watchExit's stop branch. Idempotent. + s.stopOnce.Do(func() { + close(s.stopCh) + }) - if s.tp != nil && s.tp.PTY != nil { - if err := s.tp.PTY.Close(); err != nil && firstErr == nil { - firstErr = err - } + // Wait for watchExit to complete teardown. We do not touch any conpty + // handles or the connection ourselves — that is watchExit's exclusive + // responsibility. + const closeTimeout = 5 * time.Second + select { + case <-s.doneCh: + case <-time.After(closeTimeout): + s.log.Error(nil, "Timed out waiting for terminal session teardown", "timeout", closeTimeout.String()) } - return firstErr + return nil }