diff --git a/examples/sandbox_14_pty_test.py b/examples/sandbox_14_pty_test.py index efefc355..aec9b798 100644 --- a/examples/sandbox_14_pty_test.py +++ b/examples/sandbox_14_pty_test.py @@ -20,6 +20,8 @@ load_dotenv() EXPECTED_OUTPUT = "PTY_OK" +READY_OUTPUT = "PTY_READY" +DONE_OUTPUT = "PTY_DONE" PROMPT_MARKER = "$ " @@ -28,20 +30,44 @@ async def collect_output_until( marker: str, *, timeout: float = 30.0, + require_full_line: bool = False, ) -> bytes: """Read PTY output until the expected marker appears.""" async def _collect() -> bytes: output = b"" + marker_bytes = marker.encode() async for data in session.stream: output += data - if marker.encode() in output: + if require_full_line: + if any(line.rstrip(b"\r") == marker_bytes for line in output.splitlines()): + return output + elif marker_bytes in output: return output return output return await asyncio.wait_for(_collect(), timeout=timeout) +async def run_and_collect( + session: AsyncPTYSession, + command: str, + marker: str, + *, + timeout: float = 30.0, + require_full_line: bool = False, +) -> bytes: + """Send a command to the PTY and collect output until a marker appears.""" + + await session.stream.send(command.encode()) + return await collect_output_until( + session, + marker, + timeout=timeout, + require_full_line=require_full_line, + ) + + async def main() -> int: print("=" * 60) print("Low-level AsyncPTYSession Example") @@ -83,11 +109,34 @@ async def main() -> int: print(initial_output_text.rstrip()) print("-" * 60) + print("Verifying PTY input/output round-trip...") + try: + ready_output = await run_and_collect( + session, + f"printf '{READY_OUTPUT}\\n'\n", + READY_OUTPUT, + require_full_line=True, + ) + except asyncio.TimeoutError: + print("Timed out waiting for PTY round-trip confirmation.") + return 1 + + ready_output_text = ready_output.decode("utf-8", errors="replace") + print() + print("Received PTY round-trip output:") + print("-" * 60) + print(ready_output_text.rstrip()) + print("-" * 60) + print("Writing a simple command through the PTY stream...") - await session.stream.send(f"printf '{EXPECTED_OUTPUT}\\n'; pwd; exit\n".encode()) try: - output = await collect_output_until(session, EXPECTED_OUTPUT) + output = await run_and_collect( + session, + f"printf '{EXPECTED_OUTPUT}\\n'; pwd; printf '{DONE_OUTPUT}\\n'; exit\n", + DONE_OUTPUT, + require_full_line=True, + ) except asyncio.TimeoutError: print("Timed out waiting for PTY output.") return 1 @@ -102,6 +151,9 @@ async def main() -> int: if EXPECTED_OUTPUT not in output_text: print("Expected PTY output marker was not observed.") return 1 + if DONE_OUTPUT not in output_text: + print("PTY command completion marker was not observed.") + return 1 print() print("Low-level PTY session flow completed successfully.")