diff --git a/api/v1/container_types.go b/api/v1/container_types.go index 8c47b423..3ada710f 100644 --- a/api/v1/container_types.go +++ b/api/v1/container_types.go @@ -682,6 +682,15 @@ type ContainerSpec struct { // PEM formatted public certificates to be created in the container // +optional PemCertificates *ContainerPemCertificates `json:"pemCertificates,omitempty"` + + // Optional terminal/PTY configuration. When set and Enabled is true, the + // container's primary process is started under a host pseudo-terminal + // and its stdin/stdout/stderr are bridged to the configured UDS via + // HMP v1, instead of the container being run detached with separate log + // capture. Terminal mode requires Windows on the host (ConPTY) for the + // initial slice; non-Windows hosts return ErrTerminalNotSupported at + // startup. + Terminal *TerminalSpec `json:"terminal,omitempty"` } func (cs *ContainerSpec) Equal(other *ContainerSpec) bool { @@ -791,6 +800,10 @@ func (cs *ContainerSpec) Equal(other *ContainerSpec) bool { return false } + if !cs.Terminal.Equal(other.Terminal) { + return false + } + return true } diff --git a/api/v1/executable_types.go b/api/v1/executable_types.go index 765935d8..8a0cda61 100644 --- a/api/v1/executable_types.go +++ b/api/v1/executable_types.go @@ -261,6 +261,13 @@ type ExecutableSpec struct { // PEM formatted certificates to be written for the Executable // +optional PemCertificates *ExecutablePemCertificates `json:"pemCertificates,omitempty"` + + // Terminal, when non-nil and Enabled, allocates a pseudo-terminal for the + // Executable's process and exposes an HMP v1 producer endpoint that the + // Aspire terminal host connects to as a client. See TerminalSpec for + // details. + // +optional + Terminal *TerminalSpec `json:"terminal,omitempty"` } func (es ExecutableSpec) Equal(other ExecutableSpec) bool { @@ -314,6 +321,10 @@ func (es ExecutableSpec) Equal(other ExecutableSpec) bool { return false } + if !es.Terminal.Equal(other.Terminal) { + return false + } + return true } @@ -355,6 +366,8 @@ func (es ExecutableSpec) Validate(specPath *field.Path) field.ErrorList { errorList = append(errorList, es.PemCertificates.Validate(specPath.Child("pemCertificates"))...) + errorList = append(errorList, es.Terminal.Validate(specPath.Child("terminal"))...) + return errorList } diff --git a/api/v1/terminal_types.go b/api/v1/terminal_types.go new file mode 100644 index 00000000..8ad2eecc --- /dev/null +++ b/api/v1/terminal_types.go @@ -0,0 +1,94 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package v1 + +import ( + "k8s.io/apimachinery/pkg/util/validation/field" +) + +// TerminalSpec configures pseudo-terminal allocation for an Executable or +// Container replica and the HMP v1 producer endpoint that the Aspire terminal +// host connects to as a client. +// +// When Enabled is true, DCP allocates a PTY for the underlying process and +// listens on UDSPath (a Unix Domain Socket path on Linux/macOS, or a named pipe +// path on Windows in a follow-up). When the terminal host opens an HMP v1 +// connection, DCP starts an HMP v1 server on the connection and bridges: +// +// - PTY output (from the process's tty) -> HMP v1 Output frames +// - HMP v1 Input frames -> PTY input (process stdin) +// - HMP v1 Resize frames -> PTY resize (TIOCSWINSZ / ResizePseudoConsole) +// - Process exit -> HMP v1 Exit frame, then close +// +// The HMP v1 wire format is defined by the Aspire dashboard's terminal host +// (see Hex1b's Hmp1Protocol). DCP's responsibility is limited to PTY +// allocation, the listener, and frame translation. +type TerminalSpec struct { + // Enabled controls whether DCP allocates a PTY for the process and exposes + // an HMP v1 producer endpoint at UDSPath. + Enabled bool `json:"enabled,omitempty"` + + // UDSPath is the Unix Domain Socket path that DCP listens on for the + // terminal host's HMP v1 client connection. Required when Enabled is true. + UDSPath string `json:"udsPath,omitempty"` + + // Cols is the initial width of the pseudo-terminal in character columns. + // If zero, a sensible default (80) is used. + // +kubebuilder:default:=0 + Cols int32 `json:"cols,omitempty"` + + // Rows is the initial height of the pseudo-terminal in character rows. + // If zero, a sensible default (24) is used. + // +kubebuilder:default:=0 + Rows int32 `json:"rows,omitempty"` +} + +// Equal reports whether two TerminalSpec values are equal. +func (ts *TerminalSpec) Equal(other *TerminalSpec) bool { + if ts == other { + return true + } + if ts == nil || other == nil { + return false + } + return ts.Enabled == other.Enabled && + ts.UDSPath == other.UDSPath && + ts.Cols == other.Cols && + ts.Rows == other.Rows +} + +// Validate verifies the TerminalSpec content. +func (ts *TerminalSpec) Validate(specPath *field.Path) field.ErrorList { + errorList := field.ErrorList{} + if ts == nil { + return errorList + } + if ts.Enabled && ts.UDSPath == "" { + errorList = append(errorList, field.Invalid(specPath.Child("udsPath"), ts.UDSPath, "udsPath is required when Enabled is true.")) + } + if ts.Cols < 0 { + errorList = append(errorList, field.Invalid(specPath.Child("cols"), ts.Cols, "cols must be non-negative.")) + } + if ts.Rows < 0 { + errorList = append(errorList, field.Invalid(specPath.Child("rows"), ts.Rows, "rows must be non-negative.")) + } + return errorList +} + +// DeepCopyInto copies the receiver, writing into out. +func (in *TerminalSpec) DeepCopyInto(out *TerminalSpec) { + *out = *in +} + +// DeepCopy returns a deep copy of the TerminalSpec. +func (in *TerminalSpec) DeepCopy() *TerminalSpec { + if in == nil { + return nil + } + out := new(TerminalSpec) + in.DeepCopyInto(out) + return out +} diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index b516e432..d08bcab0 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -739,6 +739,11 @@ func (in *ContainerSpec) DeepCopyInto(out *ContainerSpec) { *out = new(ContainerPemCertificates) (*in).DeepCopyInto(*out) } + if in.Terminal != nil { + in, out := &in.Terminal, &out.Terminal + *out = new(TerminalSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContainerSpec. @@ -1257,6 +1262,11 @@ func (in *ExecutableSpec) DeepCopyInto(out *ExecutableSpec) { *out = new(ExecutablePemCertificates) (*in).DeepCopyInto(*out) } + if in.Terminal != nil { + in, out := &in.Terminal, &out.Terminal + *out = new(TerminalSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExecutableSpec. diff --git a/controllers/container_controller.go b/controllers/container_controller.go index b0429fbc..488c5ad7 100644 --- a/controllers/container_controller.go +++ b/controllers/container_controller.go @@ -546,6 +546,10 @@ func ensureContainerStartingState( rcd.containerState = apiv1.ContainerStateRunning rcd.startAttemptFinishedAt = metav1.NowMicro() change |= statusChanged + + if termErr := r.ensureContainerTerminalSession(ctx, rcd, log); termErr != nil { + log.Error(termErr, "Failed to attach terminal session to running container; container is running but interactive terminal will be unavailable") + } case inspected != nil && inspected.Status == containers.ContainerStatusExited: rcd.containerState = apiv1.ContainerStateExited rcd.startAttemptFinishedAt = metav1.NowMicro() @@ -1481,6 +1485,13 @@ func (r *ContainerReconciler) startContainerWithOrchestrator(container *apiv1.Co if inspected.Status == containers.ContainerStatusRunning { log.V(1).Info("Container started") rcd.containerState = apiv1.ContainerStateRunning + + if err := r.ensureContainerTerminalSession(startupCtx, rcd, log); err != nil { + // Terminal attach failure is non-fatal: the container is + // running and observable via the orchestrator, but no + // interactive PTY is available. We log and continue. + log.Error(err, "Failed to attach terminal session to running container; container is running but interactive terminal will be unavailable") + } } else { log.V(1).Info("Container started and exited shortly after", "ContainerStatus", inspected.Status) rcd.containerState = apiv1.ContainerStateExited @@ -1557,6 +1568,7 @@ func (r *ContainerReconciler) deleteContainer(ctx context.Context, container *ap // or if the container has already finished starting/stopping and we know the outcome of either. defer rcd.deleteStartupLogFiles(log) + defer rcd.closeTerminalSession(log) if container.Spec.Persistent { log.V(1).Info("Container is not using Managed mode, leaving underlying resources") @@ -1588,6 +1600,53 @@ func (r *ContainerReconciler) cleanupDcpContainerResources(ctx context.Context, r.ReleaseContainerWatchForResource(container.UID, log) } +// ensureContainerTerminalSession attaches a host-side PTY to the running +// container and starts the HMP v1 listener at the configured UDS path, +// storing the resulting session on rcd. No-op if the container does not +// have terminal enabled or a session is already active. +// +// The container must have been created with `-t -i` for the attach to +// deliver a usable terminal; that is handled by applyCreateContainerOptions +// in the docker/podman orchestrator when ContainerSpec.Terminal is set. +// +// Errors here are non-fatal to the container lifecycle: the container is +// already running by the time this is called. The caller is expected to +// log the error and move on. +func (r *ContainerReconciler) ensureContainerTerminalSession( + ctx context.Context, + rcd *runningContainerData, + log logr.Logger, +) error { + if rcd == nil || rcd.runSpec == nil { + return nil + } + + terminal := rcd.runSpec.Terminal + if terminal == nil || !terminal.Enabled { + return nil + } + + if rcd.terminalSession != nil { + return nil + } + + if !rcd.hasValidContainerID() { + return errors.New("ensureContainerTerminalSession called without a valid container ID") + } + + runner, ok := r.orchestrator.(containers.CLICommandRunner) + if !ok { + return fmt.Errorf("container orchestrator %T does not implement containers.CLICommandRunner; terminal attach not supported", r.orchestrator) + } + + session, err := startContainerTerminalSession(ctx, runner, string(rcd.containerID), terminal, log) + if err != nil { + return err + } + rcd.terminalSession = session + return nil +} + func (r *ContainerReconciler) startContainerWithTimeout( parentCtx context.Context, containerName string, diff --git a/controllers/container_terminal.go b/controllers/container_terminal.go new file mode 100644 index 00000000..155e03f1 --- /dev/null +++ b/controllers/container_terminal.go @@ -0,0 +1,95 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package controllers + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + + apiv1 "github.com/microsoft/dcp/api/v1" + "github.com/microsoft/dcp/internal/containers" + "github.com/microsoft/dcp/internal/termpty" +) + +// startContainerTerminalSession runs the container runtime CLI's `attach` +// command against an already-running container under a host PTY, then +// stands up an HMP v1 listener at spec.UDSPath that bridges viewer +// connections to that PTY. The returned Session owns the lifetime of both +// the CLI process and the listener; callers must Close it during teardown. +// +// We use ` attach` (not ` start --attach --interactive`) +// because the container is already started by the time this is called via +// the reconciler's normal `docker container start ` path. Running +// `docker start --attach --interactive` against a running container is a +// no-op and would leave the container's primary process with no host-side +// stdin/stdout connection. +// +// `--sig-proxy=false` prevents the attach process from forwarding signals +// (e.g. SIGINT from the dashboard) to the container; signals are delivered +// in-band via the HMP v1 input channel as keystrokes (Ctrl-C → 0x03 byte). +// +// The container must have been created with `-t -i` (allocate TTY + keep +// stdin open) for the attach to deliver a usable terminal; this is handled +// automatically when ContainerSpec.Terminal != nil && Enabled by the docker +// and podman orchestrators' applyCreateContainerOptions helper. +// +// On hosts where DCP does not yet implement PTY allocation (currently +// non-Windows) this returns termpty.ErrTerminalNotSupported. +func startContainerTerminalSession( + ctx context.Context, + runner containers.CLICommandRunner, + containerID string, + spec *apiv1.TerminalSpec, + log logr.Logger, +) (*termpty.Session, error) { + if spec == nil || !spec.Enabled { + return nil, fmt.Errorf("startContainerTerminalSession: spec must be non-nil and Enabled") + } + + // Use MakeCommand to extract the configured CLI path (e.g. "docker" or + // "podman", possibly resolved against PATH); we don't actually start the + // command via the orchestrator's process.Executor because we need direct + // ConPTY semantics. + // + // `--detach-keys=""` disables the default Ctrl-P,Ctrl-Q detach sequence + // so those keystrokes are forwarded into the container as plain bytes + // (matching an interactive terminal's expectation that all keys reach + // the application). HMP v1 viewers manage detach themselves. + cmd := runner.MakeCommand("attach", "--sig-proxy=false", "--detach-keys=", containerID) + + commandLine := termpty.BuildWindowsCommandLine(cmd.Path, cmd.Args[1:]) + + startLog := log.WithValues( + "Cmd", cmd.Path, + "ContainerID", containerID, + "Terminal", true, + "UDSPath", spec.UDSPath, + ) + startLog.Info("Attaching container under PTY...") + + tp, err := termpty.StartProcess(ctx, termpty.CommandSpec{ + CommandLine: commandLine, + Cols: int(spec.Cols), + Rows: int(spec.Rows), + }) + if err != nil { + return nil, fmt.Errorf("starting container attach under PTY: %w", err) + } + + session, err := termpty.StartSession(ctx, termpty.SessionConfig{ + UDSPath: spec.UDSPath, + Cols: int(spec.Cols), + Rows: int(spec.Rows), + }, tp, startLog) + if err != nil { + // StartSession is responsible for closing tp.PTY when it fails. + return nil, fmt.Errorf("starting container terminal session listener: %w", err) + } + + return session, nil +} diff --git a/controllers/running_container_data.go b/controllers/running_container_data.go index c652f061..90b96dfc 100644 --- a/controllers/running_container_data.go +++ b/controllers/running_container_data.go @@ -18,6 +18,7 @@ import ( apiv1 "github.com/microsoft/dcp/api/v1" ct "github.com/microsoft/dcp/internal/containers" "github.com/microsoft/dcp/internal/health" + "github.com/microsoft/dcp/internal/termpty" usvc_io "github.com/microsoft/dcp/pkg/io" "github.com/microsoft/dcp/pkg/maps" "github.com/microsoft/dcp/pkg/osutil" @@ -87,6 +88,14 @@ type runningContainerData struct { // Whether health probes are enabled for the Container healthProbesEnabled *bool + + // terminalSession owns the host-side PTY + HMP v1 listener bridging the + // container's primary process to the Aspire terminal host. Non-nil only + // when the container was created with Spec.Terminal.Enabled and the + // attach to the running container succeeded. Closing the session tears + // down both the listener and the underlying CLI attach process (which + // in turn closes the container's stdin and detaches). + terminalSession *termpty.Session } const placeholderContainerIdPrefix = "__placeholder-" @@ -134,6 +143,10 @@ func (rcd *runningContainerData) Clone() *runningContainerData { networkConnections: stdmaps.Clone(rcd.networkConnections), runSpec: rcd.runSpec.DeepCopy(), healthProbeResults: stdmaps.Clone(rcd.healthProbeResults), + // terminalSession is shared (one runtime resource per container). + // Clones must reference the same Session; otherwise UpdateFrom + // would lose track of the in-flight attach process. + terminalSession: rcd.terminalSession, } pointers.SetValueFrom(&clone.exitCode, rcd.exitCode) @@ -228,6 +241,14 @@ func (rcd *runningContainerData) UpdateFrom(other *runningContainerData) bool { updated = true } + // terminalSession is a one-shot pointer assignment: once attached, it + // must not be cleared by stale clones replayed via deferred ops, so + // only propagate non-nil values forward. + if other.terminalSession != nil && rcd.terminalSession != other.terminalSession { + rcd.terminalSession = other.terminalSession + updated = true + } + return updated } @@ -278,6 +299,19 @@ func (rcd *runningContainerData) deleteStartupLogFiles(log logr.Logger) { } } +// closeTerminalSession tears down any host-side PTY + HMP v1 listener +// previously stood up for this container. Safe to call multiple times. +func (rcd *runningContainerData) closeTerminalSession(log logr.Logger) { + session := rcd.terminalSession + rcd.terminalSession = nil + if session == nil { + return + } + if err := session.Close(); err != nil { + log.V(1).Info("Error closing container terminal session", "Error", err.Error()) + } +} + func (rcd *runningContainerData) getStartupLogWriters() (usvc_io.ParagraphWriter, usvc_io.ParagraphWriter) { var stdoutWriter, stderrWriter usvc_io.ParagraphWriter diff --git a/go.mod b/go.mod index a3f9f626..90ea4cd6 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/microsoft/dcp go 1.26.1 require ( + github.com/UserExistsError/conpty v0.1.4 github.com/cenkalti/backoff/v4 v4.3.0 github.com/davidwartell/go-onecontext v1.0.2 github.com/emirpasic/gods v1.18.1 diff --git a/go.sum b/go.sum index efe3747c..0b432f7a 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1 github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= +github.com/UserExistsError/conpty v0.1.4 h1:+3FhJhiqhyEJa+K5qaK3/w6w+sN3Nh9O9VbJyBS02to= +github.com/UserExistsError/conpty v0.1.4/go.mod h1:PDglKIkX3O/2xVk0MV9a6bCWxRmPVfxqZoTG/5sSd9I= github.com/akavel/rsrc v0.10.2 h1:Zxm8V5eI1hW4gGaYsJQUhxpjkENuG91ki8B4zCrvEsw= github.com/akavel/rsrc v0.10.2/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= github.com/akutz/memconn v0.1.0 h1:NawI0TORU4hcOMsMr11g7vwlCdkYeLKXBcxWu2W/P8A= @@ -305,6 +307,7 @@ golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.42.0 h1:UiKe+zDFmJobeJ5ggPwOshJIVt6/Ft0rcfrXZDLWAWY= diff --git a/internal/docker/cli_orchestrator.go b/internal/docker/cli_orchestrator.go index 6640055c..41cacd71 100644 --- a/internal/docker/cli_orchestrator.go +++ b/internal/docker/cli_orchestrator.go @@ -574,6 +574,13 @@ func applyCreateContainerOptions(args []string, options containers.CreateContain } } + if options.Terminal != nil && options.Terminal.Enabled { + // Allocate a TTY in the container and keep stdin open. Required so a + // later "docker start --attach --interactive" can bridge a host PTY + // to the container's primary process. + args = append(args, "-t", "-i") + } + args = append(args, options.RunArgs...) return args diff --git a/internal/exerunners/process_executable_runner.go b/internal/exerunners/process_executable_runner.go index d37982c5..1694f4a0 100644 --- a/internal/exerunners/process_executable_runner.go +++ b/internal/exerunners/process_executable_runner.go @@ -22,6 +22,7 @@ import ( "github.com/microsoft/dcp/controllers" "github.com/microsoft/dcp/internal/dcpproc" "github.com/microsoft/dcp/internal/logs" + "github.com/microsoft/dcp/internal/termpty" usvc_io "github.com/microsoft/dcp/pkg/io" "github.com/microsoft/dcp/pkg/osutil" "github.com/microsoft/dcp/pkg/pointers" @@ -35,6 +36,11 @@ type processRunState struct { stdOutFile *os.File stdErrFile *os.File cmdInfo string // Command line used to start the process, for logging purposes + + // terminalSession is non-nil when the run was started under a PTY via + // startTerminalRun. It owns the PTY and the HMP v1 listener; closing it + // terminates both. + terminalSession *termpty.Session } type ProcessExecutableRunner struct { @@ -55,6 +61,10 @@ func (r *ProcessExecutableRunner) StartRun( runChangeHandler controllers.RunChangeHandler, log logr.Logger, ) *controllers.ExecutableStartResult { + if exe.Spec.Terminal != nil && exe.Spec.Terminal.Enabled { + return r.startTerminalRun(ctx, exe, runChangeHandler, log) + } + cmd := makeCommand(exe) if osutil.IsWindows() { // On Windows we have seen some apps (e.g. Python uvicorn runner) sending Ctrl-C to the whole console group @@ -158,6 +168,19 @@ func (r *ProcessExecutableRunner) StopRun(ctx context.Context, runID controllers stopLog := log.WithValues("RunID", runID, "Command", runState.cmdInfo) stopLog.V(1).Info("Stopping run...") + // Terminal-attached runs don't go through the OSExecutor and don't have + // an entry in the underlying waiter table. Closing the terminal session + // closes the PTY master, which delivers SIGHUP / detaches the ConPTY and + // causes the child process to exit on its own. We then return without + // touching the OSExecutor stop machinery. + if runState.terminalSession != nil { + closeErr := runState.terminalSession.Close() + if closeErr != nil { + stopLog.Error(closeErr, "Failed to close terminal session cleanly") + } + return closeErr + } + // We want to make progress eventually, so we don't want to wait indefinitely for the process to stop. const ProcessStopTimeout = 15 * time.Second diff --git a/internal/exerunners/process_executable_runner_terminal.go b/internal/exerunners/process_executable_runner_terminal.go new file mode 100644 index 00000000..1a91e4d3 --- /dev/null +++ b/internal/exerunners/process_executable_runner_terminal.go @@ -0,0 +1,147 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package exerunners + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + "time" + + "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + apiv1 "github.com/microsoft/dcp/api/v1" + "github.com/microsoft/dcp/controllers" + "github.com/microsoft/dcp/internal/termpty" + "github.com/microsoft/dcp/pkg/pointers" + "github.com/microsoft/dcp/pkg/process" +) + +// startTerminalRun is the PTY-attached counterpart to the regular StartRun +// flow. It allocates a pseudo-terminal, starts the executable inside it, and +// stands up an HMP v1 listener at exe.Spec.Terminal.UDSPath. +// +// On success the returned result has ExeState=Running, Pid set, and a +// StartWaitForRunCompletion function that, when invoked, fires the eventual +// OnRunCompleted callback once the underlying process exits. +func (r *ProcessExecutableRunner) startTerminalRun( + ctx context.Context, + exe *apiv1.Executable, + runChangeHandler controllers.RunChangeHandler, + log logr.Logger, +) *controllers.ExecutableStartResult { + startLog := log.WithValues("Cmd", exe.Spec.ExecutablePath, "Args", exe.Status.EffectiveArgs, "Terminal", true, "UDSPath", exe.Spec.Terminal.UDSPath) + startLog.Info("Starting process under PTY...") + + result := controllers.NewExecutableStartResult() + + cmdSpec := executableTerminalCommandSpec(exe) + tp, err := termpty.StartProcess(ctx, cmdSpec) + if err != nil { + startLog.Error(err, "Failed to start process under PTY") + result.CompletionTimestamp = metav1.NowMicro() + result.ExeState = apiv1.ExecutableStateFailedToStart + result.StartupError = err + runChangeHandler.OnStartupCompleted(exe.NamespacedName(), result) + return result + } + + sessionCfg := termpty.SessionConfig{ + UDSPath: exe.Spec.Terminal.UDSPath, + Cols: int(exe.Spec.Terminal.Cols), + Rows: int(exe.Spec.Terminal.Rows), + } + session, err := termpty.StartSession(ctx, sessionCfg, tp, startLog) + if err != nil { + startLog.Error(err, "Failed to start terminal session listener") + result.CompletionTimestamp = metav1.NowMicro() + result.ExeState = apiv1.ExecutableStateFailedToStart + result.StartupError = err + runChangeHandler.OnStartupCompleted(exe.NamespacedName(), result) + return result + } + + pid := process.Pid_t(tp.PID) + identityTime := time.Now() + runID := pidToRunID(pid) + + r.runningProcesses.Store(runID, &processRunState{ + identityTime: identityTime, + cmdInfo: exe.Spec.ExecutablePath, + terminalSession: session, + }) + + result.RunID = runID + pointers.SetValue(&result.Pid, int64(pid)) + result.ExeState = apiv1.ExecutableStateRunning + result.CompletionTimestamp = metav1.NowMicro() + + // We arm the run-completion watcher here, but it must not fire until the + // caller has invoked StartWaitForRunCompletion (see the contract on + // RunChangeHandler.OnStartupCompleted). + var armed atomic.Bool + armCh := make(chan struct{}) + result.StartWaitForRunCompletion = func() { + if armed.CompareAndSwap(false, true) { + close(armCh) + } + } + + go func() { + // Wait until the controller is ready to accept OnRunCompleted, or + // until the parent context is cancelled. + select { + case <-armCh: + case <-ctx.Done(): + // Even if the controller never armed us, drain the session so we + // don't leak goroutines. + session.Close() + <-session.Done() + return + } + + <-session.Done() + + // Pull the real exit code from the session, if we have one. + var exitCode *int32 + if state, found := r.runningProcesses.LoadAndDelete(runID); found && state.terminalSession != nil { + if code, ok := state.terminalSession.ExitCode(); ok { + ec := code + exitCode = &ec + } + } + + var runErr error + if errors.Is(ctx.Err(), context.Canceled) { + runErr = ctx.Err() + } + runChangeHandler.OnRunCompleted(runID, exitCode, runErr) + }() + + runChangeHandler.OnStartupCompleted(exe.NamespacedName(), result) + return result +} + +// executableTerminalCommandSpec builds a termpty.CommandSpec from the +// Executable's effective configuration. Currently this is Windows-shaped +// (single command-line string for CreateProcessW); cross-platform support +// is tracked by ErrTerminalNotSupported on non-Windows hosts. +func executableTerminalCommandSpec(exe *apiv1.Executable) termpty.CommandSpec { + envBlock := make([]string, 0, len(exe.Status.EffectiveEnv)) + for _, e := range exe.Status.EffectiveEnv { + envBlock = append(envBlock, fmt.Sprintf("%s=%s", e.Name, e.Value)) + } + + return termpty.CommandSpec{ + CommandLine: termpty.BuildWindowsCommandLine(exe.Spec.ExecutablePath, exe.Status.EffectiveArgs), + Env: envBlock, + WorkingDirectory: exe.Spec.WorkingDirectory, + Cols: int(exe.Spec.Terminal.Cols), + Rows: int(exe.Spec.Terminal.Rows), + } +} diff --git a/internal/hmp1/server.go b/internal/hmp1/server.go new file mode 100644 index 00000000..99b90510 --- /dev/null +++ b/internal/hmp1/server.go @@ -0,0 +1,295 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +// Package hmp1 implements the server side of the Hex1b Multiplex Protocol +// version 1 (HMP v1) that DCP uses to expose pseudo-terminal output to the +// Aspire terminal host. The wire format is intentionally minimal: +// +// [type:1B][length:4B little-endian][payload:length bytes] +// +// Frame types (the producer / "server" side that DCP plays): +// +// 0x01 Hello JSON: {"version":1,"width":W,"height":H} S->C +// 0x02 StateSync raw ANSI bytes (current screen) S->C +// 0x03 Output raw ANSI bytes from PTY S->C +// 0x04 Input raw bytes to deliver to PTY stdin C->S +// 0x05 Resize [width:4B LE][height:4B LE] bidirectional +// 0x06 Exit [exitCode:4B LE] S->C (then close) +// +// The protocol is a strict superset of what the Hex1b client needs to render +// a terminal session and forward keystrokes/resize. Any frames the server +// receives outside of {Input, Resize} are logged at debug level and dropped. +package hmp1 + +import ( + "context" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "os" + "sync" + "time" + + "github.com/go-logr/logr" +) + +// Maximum payload length (16 MiB). Matches the Hex1b implementation. +const MaxPayloadLength = 16 * 1024 * 1024 + +// FrameType is the 1-byte frame discriminator at the head of every HMP v1 frame. +type FrameType byte + +const ( + FrameHello FrameType = 0x01 + FrameStateSync FrameType = 0x02 + FrameOutput FrameType = 0x03 + FrameInput FrameType = 0x04 + FrameResize FrameType = 0x05 + FrameExit FrameType = 0x06 +) + +// HelloPayload is the JSON payload of a Hello frame. +type HelloPayload struct { + Version int `json:"version"` + Width int `json:"width"` + Height int `json:"height"` +} + +// PTY is the abstraction the server needs over the underlying pseudo-terminal. +// Implementations must be safe for concurrent use by separate goroutines for +// Read, Write, Resize, and Wait (each call site uses a single goroutine). +type PTY interface { + // Read reads bytes from the PTY's master side (i.e. process stdout/stderr). + // Should return io.EOF when the process exits and the PTY is closed. + Read(p []byte) (int, error) + + // Write writes bytes to the PTY's master side (i.e. process stdin). + Write(p []byte) (int, error) + + // Resize updates the PTY window size. + Resize(cols, rows int) error + + // Close closes the PTY master. + Close() error +} + +// ServerOptions configures a Server invocation. +type ServerOptions struct { + // InitialCols / InitialRows are the dimensions sent in the Hello frame. + // Reasonable defaults are applied if either is zero. + InitialCols int + InitialRows int + + // Log receives diagnostic messages from the server. May be the zero + // logr.Logger to discard. + Log logr.Logger +} + +// Serve runs an HMP v1 server on the given connection, bridging conn <-> pty +// until either the PTY exits, the connection closes, or the context is +// cancelled. The exitCode reported back to the client (via the Exit frame) is +// taken from waitExit; it is invoked exactly once, after the PTY's read loop +// returns, and may block until the underlying process exits. If waitExit is +// nil the Exit frame is sent with exit code 0. +// +// Serve takes ownership of conn for the duration of the call: it will close +// conn before returning, both to release any goroutine blocked on conn.Read +// and to signal the viewer that the session is over. It does NOT close pty; +// the caller owns the PTY lifetime so it can be reused across multiple Serve +// invocations (reconnects). +func Serve(ctx context.Context, conn net.Conn, pty PTY, waitExit func() int32, opts ServerOptions) error { + defer conn.Close() + cols := opts.InitialCols + if cols <= 0 { + cols = 80 + } + rows := opts.InitialRows + if rows <= 0 { + rows = 24 + } + + helloBytes, err := json.Marshal(HelloPayload{Version: 1, Width: cols, Height: rows}) + if err != nil { + return fmt.Errorf("marshal Hello payload: %w", err) + } + + // Serialize frame writes; both Output and Exit are written from different + // goroutines (the read pump and the exit watcher). + var writeMu sync.Mutex + writeFrame := func(t FrameType, payload []byte) error { + writeMu.Lock() + defer writeMu.Unlock() + return writeFrameLocked(conn, t, payload) + } + + if err := writeFrame(FrameHello, helloBytes); err != nil { + return fmt.Errorf("write Hello: %w", err) + } + // Empty StateSync: the headless terminal in the Aspire terminal host + // rebuilds state from subsequent Output frames. + if err := writeFrame(FrameStateSync, nil); err != nil { + return fmt.Errorf("write StateSync: %w", err) + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // When the context is cancelled (either externally or by one of our pumps), + // nudge the connection's blocked Read to return immediately. This is the + // only way to unstick the conn->pty pump that is blocked inside + // readFrame's io.ReadFull. We do not close conn (the caller owns its + // lifetime), we just expire its read deadline. + cancelWatcher := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + if d, ok := conn.(interface{ SetReadDeadline(time.Time) error }); ok { + _ = d.SetReadDeadline(time.Unix(1, 0)) + } + case <-cancelWatcher: + } + }() + defer close(cancelWatcher) + + var wg sync.WaitGroup + errCh := make(chan error, 2) + + // Pump 1: PTY -> conn (Output frames). Exits when PTY EOFs / errors. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + buf := make([]byte, 32*1024) + for { + n, readErr := pty.Read(buf) + if n > 0 { + if writeErr := writeFrame(FrameOutput, buf[:n]); writeErr != nil { + errCh <- fmt.Errorf("write Output: %w", writeErr) + return + } + } + if readErr != nil { + if errors.Is(readErr, io.EOF) { + return + } + errCh <- fmt.Errorf("read PTY: %w", readErr) + return + } + } + }() + + // Pump 2: conn -> PTY (Input/Resize). Exits when connection closes / errors + // or the context is cancelled. + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + for { + select { + case <-ctx.Done(): + return + default: + } + + frameType, payload, readErr := readFrame(conn) + if readErr != nil { + // Shutdown signals (EOF, closed conn, our own deadline nudge) + // are silent. Other errors are real protocol failures and + // must propagate even if the context happens to be cancelled + // concurrently (e.g. because the PTY exited at the same time + // as we received an invalid frame). + if errors.Is(readErr, io.EOF) || errors.Is(readErr, net.ErrClosed) || errors.Is(readErr, os.ErrDeadlineExceeded) { + return + } + errCh <- fmt.Errorf("read frame: %w", readErr) + return + } + + switch frameType { + case FrameInput: + if _, writeErr := pty.Write(payload); writeErr != nil { + errCh <- fmt.Errorf("write PTY: %w", writeErr) + return + } + case FrameResize: + if len(payload) != 8 { + opts.Log.V(1).Info("dropping malformed Resize frame", "length", len(payload)) + continue + } + w := int(binary.LittleEndian.Uint32(payload[0:4])) + h := int(binary.LittleEndian.Uint32(payload[4:8])) + if resizeErr := pty.Resize(w, h); resizeErr != nil { + opts.Log.V(1).Info("PTY resize failed", "err", resizeErr.Error(), "cols", w, "rows", h) + } + default: + opts.Log.V(1).Info("ignoring unexpected frame from client", "type", byte(frameType), "length", len(payload)) + } + } + }() + + wg.Wait() + + // Send a best-effort Exit frame. If the remote side has already gone away + // (protocol error, dropped TCP, etc.) the write will block until the + // deadline fires; either way we move on. + var exitCode int32 + if waitExit != nil { + exitCode = waitExit() + } + exitPayload := make([]byte, 4) + binary.LittleEndian.PutUint32(exitPayload, uint32(exitCode)) + if d, ok := conn.(interface{ SetWriteDeadline(time.Time) error }); ok { + _ = d.SetWriteDeadline(time.Now().Add(500 * time.Millisecond)) + } + _ = writeFrame(FrameExit, exitPayload) + + select { + case err := <-errCh: + return err + default: + return nil + } +} + +func writeFrameLocked(w io.Writer, t FrameType, payload []byte) error { + if len(payload) > MaxPayloadLength { + return fmt.Errorf("hmp1: payload exceeds maximum (%d > %d)", len(payload), MaxPayloadLength) + } + header := [5]byte{} + header[0] = byte(t) + binary.LittleEndian.PutUint32(header[1:5], uint32(len(payload))) + if _, err := w.Write(header[:]); err != nil { + return err + } + if len(payload) > 0 { + if _, err := w.Write(payload); err != nil { + return err + } + } + return nil +} + +func readFrame(r io.Reader) (FrameType, []byte, error) { + var header [5]byte + if _, err := io.ReadFull(r, header[:]); err != nil { + return 0, nil, err + } + t := FrameType(header[0]) + length := binary.LittleEndian.Uint32(header[1:5]) + if length > MaxPayloadLength { + return 0, nil, fmt.Errorf("hmp1: payload length %d exceeds maximum %d", length, MaxPayloadLength) + } + if length == 0 { + return t, nil, nil + } + payload := make([]byte, length) + if _, err := io.ReadFull(r, payload); err != nil { + return 0, nil, err + } + return t, payload, nil +} diff --git a/internal/hmp1/server_test.go b/internal/hmp1/server_test.go new file mode 100644 index 00000000..775e8330 --- /dev/null +++ b/internal/hmp1/server_test.go @@ -0,0 +1,355 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package hmp1 + +import ( + "context" + "encoding/binary" + "encoding/json" + "errors" + "io" + "net" + "sync" + "testing" + "time" +) + +// fakePty is an in-memory PTY for tests. Reads pull from inbound (data +// "produced" by the user process); writes push into outbound (data going +// "to" the process). Resize calls are recorded for assertions. +type fakePty struct { + inbound chan []byte + outbound chan []byte + + mu sync.Mutex + closed bool + resizes []struct{ Cols, Rows int } +} + +func newFakePty() *fakePty { + return &fakePty{ + inbound: make(chan []byte, 16), + outbound: make(chan []byte, 16), + } +} + +func (f *fakePty) Read(p []byte) (int, error) { + chunk, ok := <-f.inbound + if !ok { + return 0, io.EOF + } + n := copy(p, chunk) + return n, nil +} + +func (f *fakePty) Write(p []byte) (int, error) { + out := make([]byte, len(p)) + copy(out, p) + f.outbound <- out + return len(p), nil +} + +func (f *fakePty) Resize(cols, rows int) error { + f.mu.Lock() + defer f.mu.Unlock() + f.resizes = append(f.resizes, struct{ Cols, Rows int }{cols, rows}) + return nil +} + +func (f *fakePty) Close() error { + f.mu.Lock() + defer f.mu.Unlock() + if !f.closed { + f.closed = true + close(f.inbound) + } + return nil +} + +func TestServeSendsHelloAndStateSyncOnConnect(t *testing.T) { + t.Parallel() + a, b := net.Pipe() + defer a.Close() + defer b.Close() + pty := newFakePty() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- Serve(ctx, b, pty, nil, ServerOptions{InitialCols: 100, InitialRows: 30}) + }() + + // Read Hello. + frameType, payload := readFrameHelper(t, a) + if frameType != FrameHello { + t.Fatalf("expected Hello frame, got 0x%02x", frameType) + } + var hello HelloPayload + if err := json.Unmarshal(payload, &hello); err != nil { + t.Fatalf("unmarshal Hello: %v", err) + } + if hello.Version != 1 || hello.Width != 100 || hello.Height != 30 { + t.Errorf("unexpected Hello payload: %+v", hello) + } + + // Read StateSync. + frameType, payload = readFrameHelper(t, a) + if frameType != FrameStateSync { + t.Fatalf("expected StateSync frame, got 0x%02x", frameType) + } + if len(payload) != 0 { + t.Errorf("expected empty StateSync, got %d bytes", len(payload)) + } + + // Trigger PTY exit so Serve returns. + pty.Close() + + // Read terminal Exit frame. + frameType, _ = readFrameHelper(t, a) + if frameType != FrameExit { + t.Errorf("expected Exit frame after PTY EOF, got 0x%02x", frameType) + } + + if err := <-done; err != nil { + t.Errorf("Serve returned error: %v", err) + } +} + +func TestServeForwardsOutputFromPty(t *testing.T) { + t.Parallel() + a, b := net.Pipe() + defer a.Close() + defer b.Close() + pty := newFakePty() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- Serve(ctx, b, pty, nil, ServerOptions{InitialCols: 80, InitialRows: 24}) + }() + + // Drain Hello + StateSync. + readFrameHelper(t, a) + readFrameHelper(t, a) + + // Push some PTY output. + pty.inbound <- []byte("hello world") + + // Should arrive as an Output frame. + frameType, payload := readFrameHelper(t, a) + if frameType != FrameOutput { + t.Fatalf("expected Output frame, got 0x%02x", frameType) + } + if string(payload) != "hello world" { + t.Errorf("unexpected Output payload: %q", string(payload)) + } + + pty.Close() + readFrameHelper(t, a) // Exit + <-done +} + +func TestServeForwardsInputToPty(t *testing.T) { + t.Parallel() + a, b := net.Pipe() + defer a.Close() + defer b.Close() + pty := newFakePty() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- Serve(ctx, b, pty, nil, ServerOptions{}) + }() + + // Drain Hello + StateSync. + readFrameHelper(t, a) + readFrameHelper(t, a) + + // Send an Input frame. + writeFrameHelper(t, a, FrameInput, []byte("ls\r\n")) + + select { + case got := <-pty.outbound: + if string(got) != "ls\r\n" { + t.Errorf("unexpected PTY input: %q", string(got)) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for PTY input") + } + + pty.Close() + readFrameHelper(t, a) + <-done +} + +func TestServeForwardsResizeToPty(t *testing.T) { + t.Parallel() + a, b := net.Pipe() + defer a.Close() + defer b.Close() + pty := newFakePty() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- Serve(ctx, b, pty, nil, ServerOptions{}) + }() + + readFrameHelper(t, a) + readFrameHelper(t, a) + + resizePayload := make([]byte, 8) + binary.LittleEndian.PutUint32(resizePayload[0:4], 132) + binary.LittleEndian.PutUint32(resizePayload[4:8], 50) + writeFrameHelper(t, a, FrameResize, resizePayload) + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + pty.mu.Lock() + count := len(pty.resizes) + pty.mu.Unlock() + if count > 0 { + break + } + time.Sleep(10 * time.Millisecond) + } + + pty.mu.Lock() + if len(pty.resizes) != 1 || pty.resizes[0].Cols != 132 || pty.resizes[0].Rows != 50 { + t.Errorf("unexpected PTY resize history: %+v", pty.resizes) + } + pty.mu.Unlock() + + pty.Close() + readFrameHelper(t, a) + <-done +} + +func TestServeSendsExitFrameWithExitCode(t *testing.T) { + t.Parallel() + a, b := net.Pipe() + defer a.Close() + defer b.Close() + pty := newFakePty() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- Serve(ctx, b, pty, func() int32 { return 42 }, ServerOptions{}) + }() + + readFrameHelper(t, a) + readFrameHelper(t, a) + + pty.Close() + + frameType, payload := readFrameHelper(t, a) + if frameType != FrameExit { + t.Fatalf("expected Exit frame, got 0x%02x", frameType) + } + if len(payload) != 4 { + t.Fatalf("expected 4-byte Exit payload, got %d", len(payload)) + } + exitCode := int32(binary.LittleEndian.Uint32(payload)) + if exitCode != 42 { + t.Errorf("expected exit code 42, got %d", exitCode) + } + + <-done +} + +func TestServeRejectsOversizedFrame(t *testing.T) { + t.Parallel() + a, b := net.Pipe() + defer a.Close() + defer b.Close() + pty := newFakePty() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- Serve(ctx, b, pty, nil, ServerOptions{}) + }() + + readFrameHelper(t, a) // Hello + readFrameHelper(t, a) // StateSync + + // Craft a header that claims an oversize length without sending the body. + header := [5]byte{byte(FrameInput), 0, 0, 0, 0} + binary.LittleEndian.PutUint32(header[1:5], uint32(MaxPayloadLength+1)) + if _, err := a.Write(header[:]); err != nil { + t.Fatalf("write header: %v", err) + } + + // In production the PTY persists across viewer reconnects, so Serve does + // not close it on abort. Close it explicitly here to let the read pump + // exit, which is what production would do on shutdown anyway. + pty.Close() + + // Server should treat the frame as a fatal protocol error and exit; we + // observe that by Serve completing. + select { + case err := <-done: + if err == nil { + t.Errorf("expected Serve to return an error on oversize frame") + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for Serve to exit on oversize frame") + } +} + +// readFrameHelper performs a blocking read of one HMP v1 frame from r, +// failing the test on error. +func readFrameHelper(t *testing.T, r net.Conn) (FrameType, []byte) { + t.Helper() + _ = r.SetReadDeadline(time.Now().Add(3 * time.Second)) + defer r.SetReadDeadline(time.Time{}) + var header [5]byte + if _, err := io.ReadFull(r, header[:]); err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) { + t.Fatalf("connection closed while expecting frame") + } + t.Fatalf("read header: %v", err) + } + t.Logf("got frame type=0x%02x len=%d", header[0], binary.LittleEndian.Uint32(header[1:5])) + length := binary.LittleEndian.Uint32(header[1:5]) + if length == 0 { + return FrameType(header[0]), nil + } + payload := make([]byte, length) + if _, err := io.ReadFull(r, payload); err != nil { + t.Fatalf("read payload: %v", err) + } + return FrameType(header[0]), payload +} + +func writeFrameHelper(t *testing.T, w net.Conn, ft FrameType, payload []byte) { + t.Helper() + header := [5]byte{} + header[0] = byte(ft) + binary.LittleEndian.PutUint32(header[1:5], uint32(len(payload))) + if _, err := w.Write(header[:]); err != nil { + t.Fatalf("write header: %v", err) + } + if len(payload) > 0 { + if _, err := w.Write(payload); err != nil { + t.Fatalf("write payload: %v", err) + } + } +} diff --git a/internal/podman/cli_orchestrator.go b/internal/podman/cli_orchestrator.go index fa3b281d..9c90e223 100644 --- a/internal/podman/cli_orchestrator.go +++ b/internal/podman/cli_orchestrator.go @@ -559,6 +559,13 @@ func applyCreateContainerOptions(args []string, options containers.CreateContain } } + if options.Terminal != nil && options.Terminal.Enabled { + // Allocate a TTY in the container and keep stdin open. Required so a + // later "podman start --attach --interactive" can bridge a host PTY + // to the container's primary process. + args = append(args, "-t", "-i") + } + args = append(args, options.RunArgs...) return args diff --git a/internal/termpty/config.go b/internal/termpty/config.go new file mode 100644 index 00000000..db1bde31 --- /dev/null +++ b/internal/termpty/config.go @@ -0,0 +1,35 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package termpty + +// SessionConfig captures the parameters the HMP v1 listener needs to accept +// terminal viewer connections. It mirrors the relevant subset of +// apiv1.TerminalSpec but lives in this package so it can be constructed +// from any caller (executable runner, container reconciler, ...). +type SessionConfig struct { + // UDSPath is the Unix domain socket path the listener binds to. + UDSPath string + + // Cols/Rows are the initial terminal dimensions the HMP v1 server + // reports to a connecting viewer if it doesn't supply its own. If + // either is <= 0 the defaults from normalizeDimensions are used. + Cols int + Rows int +} + +// normalizeDimensions returns the effective dimensions to use for an +// allocated PTY or for HMP1's initial Hello frame, applying the +// 80x24 fallback when either input is non-positive. +func normalizeDimensions(cols, rows int) (int, int) { + c, r := cols, rows + if c <= 0 { + c = 80 + } + if r <= 0 { + r = 24 + } + return c, r +} diff --git a/internal/termpty/pty.go b/internal/termpty/pty.go new file mode 100644 index 00000000..1e51db25 --- /dev/null +++ b/internal/termpty/pty.go @@ -0,0 +1,77 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +// Package termpty provides the terminal/PTY primitives used by both the +// executable runner (for executables started under WithTerminal) and the +// container reconciler (for containers started under WithTerminal). It +// abstracts over the platform-specific PTY implementation (ConPTY on +// Windows; not yet implemented on other platforms) and the HMP v1 listener +// that bridges the host PTY to the Aspire terminal host. +package termpty + +import ( + "context" + "errors" + + "github.com/microsoft/dcp/internal/hmp1" +) + +// ErrTerminalNotSupported is returned when terminal/PTY support is requested +// on a platform where DCP does not yet implement it. The initial slice +// (Aspire 13.4) supports Windows only via ConPTY; Linux/macOS support +// follows in a subsequent change. +var ErrTerminalNotSupported = errors.New("terminal/PTY support is not implemented on this platform") + +// Process represents a process running attached to a pseudo-terminal, +// plus the PTY interface used by the HMP v1 server to bridge it to the +// Aspire terminal host. +type Process struct { + // PTY is the ReadWriteCloser exposing the PTY master end. Implements + // hmp1.PTY (Read, Write, Resize, Close). + PTY hmp1.PTY + + // PID is the OS process id of the spawned process. + PID int + + // WaitExit blocks until the process exits and returns the exit code. + // Called at most once. + WaitExit func() int32 +} + +// CommandSpec captures everything needed to spawn a process attached to a +// freshly allocated pseudo-terminal. It deliberately mirrors only the inputs +// the underlying ConPTY API needs and is independent of any specific resource +// kind (Executable, Container, ...). +type CommandSpec struct { + // CommandLine is the full command line in CreateProcessW form (a single + // string with arguments embedded and quoted as appropriate). Callers can + // build this via BuildWindowsCommandLine. + CommandLine string + + // Env is the environment block passed to the child process (each entry + // is "KEY=VALUE"). May be nil to inherit the parent's environment. + Env []string + + // WorkingDirectory is the cwd for the child process. Empty inherits the + // parent's cwd. + WorkingDirectory string + + // Cols/Rows are the initial dimensions of the allocated PTY. If either + // is <= 0 the implementation falls back to a sensible default + // (currently 80x24). + Cols int + Rows int +} + +// StartProcess allocates a pseudo-terminal and starts a process attached to +// it according to the supplied CommandSpec. The returned Process must be +// closed (via Process.PTY.Close) by the caller when the process exits or the +// terminal is being torn down. +// +// The implementation is platform-specific. See pty_windows.go and +// pty_other.go. +func StartProcess(ctx context.Context, spec CommandSpec) (*Process, error) { + return startProcessImpl(ctx, spec) +} diff --git a/internal/termpty/pty_other.go b/internal/termpty/pty_other.go new file mode 100644 index 00000000..49791d1a --- /dev/null +++ b/internal/termpty/pty_other.go @@ -0,0 +1,26 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +//go:build !windows + +package termpty + +import ( + "context" +) + +// startProcessImpl is the non-Windows fallback. The initial slice +// (Aspire 13.4) implements PTY support on Windows via ConPTY only; Linux and +// macOS support is tracked as follow-up work. +func startProcessImpl(_ context.Context, _ CommandSpec) (*Process, error) { + return nil, ErrTerminalNotSupported +} + +// BuildWindowsCommandLine is a no-op stub on non-Windows platforms; it lets +// callers reference it unconditionally without a build tag. Returns the +// empty string so accidental use surfaces obviously. +func BuildWindowsCommandLine(_ string, _ []string) string { + return "" +} diff --git a/internal/termpty/pty_windows.go b/internal/termpty/pty_windows.go new file mode 100644 index 00000000..27deb179 --- /dev/null +++ b/internal/termpty/pty_windows.go @@ -0,0 +1,121 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +//go:build windows + +package termpty + +import ( + "context" + "fmt" + "strings" + + "github.com/UserExistsError/conpty" +) + +// conPtyAdapter wraps a *conpty.ConPty so it satisfies hmp1.PTY (specifically +// the Resize signature: cols, rows int). +type conPtyAdapter struct { + cp *conpty.ConPty +} + +func (a *conPtyAdapter) Read(p []byte) (int, error) { return a.cp.Read(p) } +func (a *conPtyAdapter) Write(p []byte) (int, error) { return a.cp.Write(p) } +func (a *conPtyAdapter) Close() error { return a.cp.Close() } +func (a *conPtyAdapter) Resize(cols, rows int) error { return a.cp.Resize(cols, rows) } + +// startProcessImpl allocates a Windows ConPTY, spawns a process attached to +// it, and returns a Process. The caller is responsible for calling +// PTY.Close() on shutdown. +func startProcessImpl(_ context.Context, spec CommandSpec) (*Process, error) { + if !conpty.IsConPtyAvailable() { + return nil, fmt.Errorf("ConPTY is not available on this Windows version: %w", conpty.ErrConPtyUnsupported) + } + + cols, rows := normalizeDimensions(spec.Cols, spec.Rows) + + options := []conpty.ConPtyOption{ + conpty.ConPtyDimensions(cols, rows), + } + if spec.Env != nil { + options = append(options, conpty.ConPtyEnv(spec.Env)) + } + if spec.WorkingDirectory != "" { + options = append(options, conpty.ConPtyWorkDir(spec.WorkingDirectory)) + } + + cp, err := conpty.Start(spec.CommandLine, options...) + if err != nil { + return nil, fmt.Errorf("failed to start process under ConPTY: %w", err) + } + + return &Process{ + PTY: &conPtyAdapter{cp: cp}, + PID: cp.Pid(), + WaitExit: func() int32 { + exitCode, waitErr := cp.Wait(context.Background()) + if waitErr != nil { + // Wait failures are best-effort: the connection's about to + // close anyway. Surface UnknownExitCode to make the situation + // visible in the terminal host. + return -1 + } + return int32(exitCode) + }, + }, nil +} + +// BuildWindowsCommandLine constructs a single command-line string suitable +// for CreateProcessW from a path + argv. Each token is wrapped in quotes +// and embedded quotes are escaped per the documented Windows argv parsing +// rules. +func BuildWindowsCommandLine(executablePath string, args []string) string { + var sb strings.Builder + sb.WriteString(quoteWindowsArg(executablePath)) + for _, a := range args { + sb.WriteByte(' ') + sb.WriteString(quoteWindowsArg(a)) + } + return sb.String() +} + +// quoteWindowsArg quotes a single argument per the rules CommandLineToArgvW +// uses. Empty strings, strings containing whitespace, or strings containing +// quotes get wrapped in double quotes; backslashes preceding a quote are +// doubled. +func quoteWindowsArg(arg string) string { + if arg == "" { + return `""` + } + if !strings.ContainsAny(arg, " \t\n\v\"") { + return arg + } + var sb strings.Builder + sb.WriteByte('"') + backslashes := 0 + for _, r := range arg { + switch r { + case '\\': + backslashes++ + case '"': + for i := 0; i < backslashes*2; i++ { + sb.WriteByte('\\') + } + backslashes = 0 + sb.WriteString(`\"`) + default: + for i := 0; i < backslashes; i++ { + sb.WriteByte('\\') + } + backslashes = 0 + sb.WriteRune(r) + } + } + for i := 0; i < backslashes*2; i++ { + sb.WriteByte('\\') + } + sb.WriteByte('"') + return sb.String() +} diff --git a/internal/termpty/session.go b/internal/termpty/session.go new file mode 100644 index 00000000..fddb459d --- /dev/null +++ b/internal/termpty/session.go @@ -0,0 +1,331 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package termpty + +import ( + "context" + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/go-logr/logr" + + "github.com/microsoft/dcp/internal/hmp1" +) + +// Session owns the lifecycle of a single PTY-attached process plus a single +// HMP v1 connection that bridges that PTY to the Aspire terminal host. It is +// shared between the executable runner (one Session per executable replica +// started under WithTerminal) and the container reconciler (one Session per +// container started under WithTerminal). +// +// Connection direction: DCP dials into the terminal host's listener. The +// terminal host owns the UDS file and is started before any Session is +// created (the resource model wires a WaitAnnotation that holds the target +// resource until the terminal host reports running). This direction means +// the terminal host is guaranteed to be receiving from the very first byte +// the PTY emits, so a long-running shell's initial prompt is captured even +// for late dashboard viewers (the terminal host preserves its own +// scrollback). +// +// When the underlying process exits the session sends a final Exit frame to +// the connected viewer (via hmp1.Serve's waitExit hook), then closes the +// connection. Calling Close() forces the same teardown. +type Session struct { + log logr.Logger + udsPath string + cols int + rows int + + tp *Process + + mu sync.Mutex + conn net.Conn + + // Tracks the in-flight handleConn goroutine so graceful shutdown can + // wait for the HMP1 session to flush its Exit frame before closing the + // connection. + connWg sync.WaitGroup + + // exitDone is closed once the underlying PTY-attached process has exited; + // exitCode then holds the observed exit code. This is the signal used by + // handleConn's waitExit hook so that hmp1.Serve sends the real exit code + // on the Exit frame. + exitOnce sync.Once + exitCode atomic.Int32 + exitDone chan struct{} + + // stopCh is closed by Close() to request immediate teardown. watchExit + // is the sole owner of the PTY/conn close sequence and reads stopCh to + // distinguish a natural process exit from an explicit stop request. + // stopOnce guards the close(stopCh) call so Close() is idempotent. + stopOnce sync.Once + stopCh chan struct{} + + // doneCh is closed by watchExit's defer when the session has finished + // tearing itself down. Both Close() and Done() observe it. + doneCh chan struct{} +} + +// dialUDSWithRetry dials the given UDS path, retrying briefly to absorb the +// race where the terminal host has reported running but its listener has not +// yet bound. The WaitAnnotation in the resource model already sequences the +// terminal host before any resource that uses it, so this retry budget is +// only meant to cover the small window between the host process declaring +// itself ready and its listener actually existing on disk. +func dialUDSWithRetry(ctx context.Context, udsPath string) (net.Conn, error) { + const ( + attempts = 50 + every = 100 * time.Millisecond + ) + var lastErr error + for i := 0; i < attempts; i++ { + if err := ctx.Err(); err != nil { + return nil, err + } + var d net.Dialer + dialCtx, cancel := context.WithTimeout(ctx, every) + conn, err := d.DialContext(dialCtx, "unix", udsPath) + cancel() + if err == nil { + return conn, nil + } + lastErr = err + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(every): + } + } + return nil, fmt.Errorf("dial %q after %d attempts: %w", udsPath, attempts, lastErr) +} + +// StartSession dials cfg.UDSPath, then spawns an HMP v1 server on that +// connection. The PTY-attached process must already be running (passed via +// tp); the session owns tp from this point forward and will close it on +// shutdown. +func StartSession(ctx context.Context, cfg SessionConfig, tp *Process, log logr.Logger) (*Session, error) { + udsPath := cfg.UDSPath + + conn, err := dialUDSWithRetry(ctx, udsPath) + if err != nil { + _ = tp.PTY.Close() + return nil, fmt.Errorf("failed to dial terminal UDS %q: %w", udsPath, err) + } + + cols, rows := normalizeDimensions(cfg.Cols, cfg.Rows) + + s := &Session{ + log: log.WithValues("UDSPath", udsPath, "PID", tp.PID), + udsPath: udsPath, + cols: cols, + rows: rows, + tp: tp, + conn: conn, + exitDone: make(chan struct{}), + stopCh: make(chan struct{}), + doneCh: make(chan struct{}), + } + + s.connWg.Add(1) + go s.handleConn(ctx, conn) + go s.watchExit() + + s.log.Info("Terminal session connected to host") + return s, nil +} + +func (s *Session) handleConn(ctx context.Context, conn net.Conn) { + defer s.connWg.Done() + defer func() { + _ = conn.Close() + s.mu.Lock() + if s.conn == conn { + s.conn = nil + } + s.mu.Unlock() + }() + + opts := hmp1.ServerOptions{ + InitialCols: s.cols, + InitialRows: s.rows, + Log: s.log, + } + + // waitExit is invoked by hmp1.Serve after its PTY read pump returns + // (i.e. when pty.Close has been called or an upstream EOF was observed). + // It blocks until the underlying process exit has been signalled and + // returns the observed exit code, so the Exit frame carries the real + // code rather than a synthetic 0. + if serveErr := hmp1.Serve(ctx, conn, s.tp.PTY, s.waitProcessExit, opts); serveErr != nil { + s.log.V(1).Info("HMP v1 serve exited with error", "err", serveErr.Error()) + } +} + +// signalProcessExit publishes the process exit code; subsequent calls to +// waitProcessExit return immediately. Safe to call multiple times; only the +// first call wins. +func (s *Session) signalProcessExit(code int32) { + s.exitOnce.Do(func() { + s.exitCode.Store(code) + close(s.exitDone) + }) +} + +// waitProcessExit blocks until the underlying process has exited, then +// returns the observed exit code. Used as the waitExit callback for hmp1.Serve. +func (s *Session) waitProcessExit() int32 { + <-s.exitDone + return s.exitCode.Load() +} + +// watchExit is the sole owner of session teardown. It runs concurrently with +// the in-flight HMP v1 handler and is the only goroutine that ever closes the +// PTY handles or the connection. It blocks until either: +// +// - the underlying PTY-attached process exits naturally (the inner wait +// goroutine produces an exit code on waitDone), or +// - Close() requests an explicit teardown by closing stopCh. +// +// In both cases watchExit calls PTY.Close exactly once, drains the inner wait +// goroutine, gives the in-flight HMP v1 handler a bounded window to flush its +// Exit frame, then closes the connection. The single-owner pattern eliminates +// the historical race where Close() and watchExit() both closed the conpty +// handles concurrently with cp.Wait's WaitForSingleObject(pi.Process) loop — +// that race could close a stale handle that Windows had already recycled for +// another component in the same process (e.g. DCP's process cleanup job +// handle), cascade-killing every job-assigned sibling resource. +// +// The UDS file itself is owned by the terminal host (the listener side) and +// is not removed here. +func (s *Session) watchExit() { + defer close(s.doneCh) + + // Isolate the WaitForSingleObject call to a single goroutine so no other + // code path observes pi.Process. It will return when the process exits + // naturally or when PTY.Close (called by us, below) closes pi.Process. + waitDone := make(chan int32, 1) + go func() { + waitDone <- s.tp.WaitExit() + }() + + var ( + exitCode int32 + waitDrained bool + ) + select { + case exitCode = <-waitDone: + waitDrained = true + s.log.Info("Terminal-attached process exited", "exitCode", exitCode) + case <-s.stopCh: + s.log.V(1).Info("Terminal session received explicit stop request") + } + + // Single PTY.Close call. This kills the conhost (and the attached child + // if it was still running) and closes all conpty handles. After this, + // pty.Read in hmp1.Serve's read pump will fail and the in-flight handler + // will unwind so we can close the connection. + if s.tp != nil && s.tp.PTY != nil { + _ = s.tp.PTY.Close() + } + + // Drain the inner wait goroutine if we took the stopCh path. Closing the + // PTY caused pi.Process to be closed; the wait will return with + // WAIT_FAILED / ERROR_INVALID_HANDLE and the closure in pty_windows.go + // will report -1. + if !waitDrained { + select { + case <-waitDone: + case <-time.After(2 * time.Second): + s.log.V(1).Info("Timed out waiting for PTY wait goroutine to drain") + } + } + + s.signalProcessExit(exitCode) + + // Give the in-flight HMP v1 handler a moment to flush its Exit frame + // before we slam the connection shut. + s.waitConnsOrTimeout(2 * time.Second) + + s.mu.Lock() + conn := s.conn + s.conn = nil + s.mu.Unlock() + if conn != nil { + _ = conn.Close() + } +} + +// waitConnsOrTimeout blocks until the in-flight handleConn goroutine has +// finished, or until the timeout elapses, whichever comes first. +func (s *Session) waitConnsOrTimeout(timeout time.Duration) { + done := make(chan struct{}) + go func() { + s.connWg.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(timeout): + s.log.V(1).Info("Timed out waiting for HMP v1 handler to drain; forcing teardown") + } +} + +// ExitCode returns the observed process exit code if the underlying process +// has exited; the second return value is true if the exit has been observed, +// false otherwise. Safe to call concurrently with watchExit. +func (s *Session) ExitCode() (int32, bool) { + select { + case <-s.exitDone: + return s.exitCode.Load(), true + default: + return 0, false + } +} + +// Done returns a channel that is closed once the underlying process exits and +// the session has finished cleaning up. +func (s *Session) Done() <-chan struct{} { + return s.doneCh +} + +// Close requests an immediate teardown of the session and blocks until +// watchExit has finished its single-owner cleanup of the PTY and connection. +// Safe to call multiple times. +// +// Unlike watchExit's natural-exit path, Close interrupts the in-flight HMP v1 +// handler immediately. Use it when an external caller (e.g. +// controller-runtime reconciliation tearing down the run) wants the session +// gone NOW. The UDS file is owned by the terminal host and is not removed +// here. +// +// The bounded wait protects the caller against a stuck teardown (e.g. the +// inner WaitForSingleObject hanging on a kernel handle that never signals); +// in practice teardown completes well within the timeout. +func (s *Session) Close() error { + // Best-effort: publish a 0 exit code if we don't have a real one yet, so + // any in-flight Serve invocation doesn't block forever inside waitExit. + s.signalProcessExit(0) + + // Trigger watchExit's stop branch. Idempotent. + s.stopOnce.Do(func() { + close(s.stopCh) + }) + + // Wait for watchExit to complete teardown. We do not touch any conpty + // handles or the connection ourselves — that is watchExit's exclusive + // responsibility. + const closeTimeout = 5 * time.Second + select { + case <-s.doneCh: + case <-time.After(closeTimeout): + s.log.Error(nil, "Timed out waiting for terminal session teardown", "timeout", closeTimeout.String()) + } + + return nil +} diff --git a/internal/termpty/session_windows_test.go b/internal/termpty/session_windows_test.go new file mode 100644 index 00000000..251106f5 --- /dev/null +++ b/internal/termpty/session_windows_test.go @@ -0,0 +1,182 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See LICENSE in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +//go:build windows + +package termpty + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/go-logr/logr/testr" + + "github.com/microsoft/dcp/internal/hmp1" +) + +// TestTerminalSessionEndToEndWindows is the slice's headline integration test: +// it spawns a real cmd.exe process under ConPTY, listens for an HMP v1 client +// on a UDS, connects from the test as a fake terminal-host client, and asserts +// the full Hello -> StateSync -> Output -> Exit frame sequence is observed +// with sensible content. +func TestTerminalSessionEndToEndWindows(t *testing.T) { + if testing.Short() { + t.Skip("skipping end-to-end ConPTY test in short mode") + } + + log := testr.New(t) + + udsPath := filepath.Join(t.TempDir(), "term.sock") + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Test plays the role of the terminal host: listen on the UDS first, + // then start the session (which dials in), then accept the dialed + // connection. + lis, err := net.Listen("unix", udsPath) + if err != nil { + t.Fatalf("listen UDS: %v", err) + } + defer lis.Close() + + tp, err := StartProcess(ctx, CommandSpec{ + CommandLine: BuildWindowsCommandLine(`C:\Windows\System32\cmd.exe`, []string{"/c", "echo hello-world-from-conpty && exit 0"}), + Cols: 100, + Rows: 30, + }) + if err != nil { + t.Fatalf("StartProcess: %v", err) + } + + session, err := StartSession(ctx, SessionConfig{ + UDSPath: udsPath, + Cols: 100, + Rows: 30, + }, tp, log) + if err != nil { + _ = tp.PTY.Close() + t.Fatalf("StartSession: %v", err) + } + defer session.Close() + + conn, err := lis.Accept() + if err != nil { + t.Fatalf("accept UDS: %v", err) + } + defer conn.Close() + + // Read frames until we see Exit (or hit a timeout). Collect all Output + // payloads so we can assert on their concatenation. + _ = conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + + var ( + sawHello bool + sawStateSync bool + sawExit bool + exitCode int32 + outputBuf bytes.Buffer + ) + +readLoop: + for { + frameType, payload, readErr := readClientFrame(conn) + if readErr != nil { + if errors.Is(readErr, io.EOF) { + break readLoop + } + t.Fatalf("read frame: %v", readErr) + } + switch frameType { + case hmp1.FrameHello: + sawHello = true + var p hmp1.HelloPayload + if err := json.Unmarshal(payload, &p); err != nil { + t.Fatalf("Hello payload was not valid JSON (%v): %q", err, string(payload)) + } + if p.Version != 1 { + t.Errorf("Hello.Version = %d, want 1", p.Version) + } + if p.Width != 100 || p.Height != 30 { + t.Errorf("Hello dimensions = %dx%d, want 100x30", p.Width, p.Height) + } + case hmp1.FrameStateSync: + sawStateSync = true + case hmp1.FrameOutput: + outputBuf.Write(payload) + case hmp1.FrameExit: + sawExit = true + if len(payload) != 4 { + t.Errorf("Exit payload length = %d, want 4", len(payload)) + } else { + exitCode = int32(binary.LittleEndian.Uint32(payload)) + } + break readLoop + default: + t.Logf("ignoring unexpected frame type 0x%02x len=%d", frameType, len(payload)) + } + } + + if !sawHello { + t.Errorf("did not receive Hello frame") + } + if !sawStateSync { + t.Errorf("did not receive StateSync frame") + } + if !sawExit { + t.Errorf("did not receive Exit frame") + } + if exitCode != 0 { + t.Errorf("exit code = %d, want 0", exitCode) + } + + // ConPTY output includes ANSI escape sequences plus the echoed command + // line (because ConPTY runs interactively). What we assert is that the + // literal payload string appears somewhere in the rendered output. + out := outputBuf.String() + if !strings.Contains(out, "hello-world-from-conpty") { + t.Errorf("expected Output to contain %q, got %q", "hello-world-from-conpty", out) + } + + // Wait for the session to finish tearing itself down. + select { + case <-session.Done(): + case <-time.After(5 * time.Second): + t.Errorf("timed out waiting for terminal session to finish") + } +} + +// readClientFrame reads a single HMP v1 frame from the client side of the +// connection (mirroring the server's readFrame helper but kept local to the +// test to avoid exporting internal helpers). +func readClientFrame(r net.Conn) (hmp1.FrameType, []byte, error) { + var header [5]byte + if _, err := io.ReadFull(r, header[:]); err != nil { + return 0, nil, err + } + t := hmp1.FrameType(header[0]) + length := binary.LittleEndian.Uint32(header[1:5]) + if length > hmp1.MaxPayloadLength { + return 0, nil, fmt.Errorf("frame length %d exceeds max %d", length, hmp1.MaxPayloadLength) + } + if length == 0 { + return t, nil, nil + } + payload := make([]byte, length) + if _, err := io.ReadFull(r, payload); err != nil { + return 0, nil, err + } + return t, payload, nil +}