From a8488148b0970392db46f5ce78049c3802a64ebd Mon Sep 17 00:00:00 2001 From: Liam Brady Date: Mon, 26 May 2025 17:03:52 -0400 Subject: [PATCH 1/2] munet: add Commander.async_spawn() spawn(), which conducts a send/expect process, is blocking due to the absence of async_=True in the call to pexpect.expect() within the method's main send/expect loop. Some async methods call spawn(), however, which leads to such methods being blocked. This is particularly problematic in cases where multiple VMs with custom console expect/send prompts are required (e.g. routers). Since each QEMU VM both sets up console logging and completes an expect/send loop without yielding control to the coroutine of the other QEMU VMs, only one VM can ever have logging configured (and thus advance in an expect/send loop) at any given time. Not only is this inefficient given that all QEMU VMs are running and waiting for input, but this can be fatal if console logging is not set up on each VM in time. If some mandatory console output is missed, then there is no way to recover and the setup of the QEMU VM will time out. However, setting async_=True in pexpect.expect() leads to errors when mixed with PopenSpawn. To bypass this issue, we do not use pexpect.expect() with async_=True, but repeatedly call pexpect.expect() with a timeout set at 0.1 (until a timeout that we track expires). The end result of this change is both that logging is set up on all QEMU nodes immediately and that independent progress can be made in each VM's expect/send loop simultaneously. 
Signed-off-by: Liam Brady --- munet/base.py | 147 +++++++++++++++++++++++++++++++++++++++++++++++- munet/native.py | 7 ++- 2 files changed, 152 insertions(+), 2 deletions(-) diff --git a/munet/base.py b/munet/base.py index e9410d4..44744fe 100644 --- a/munet/base.py +++ b/munet/base.py @@ -727,6 +727,151 @@ def spawn( p.close() raise error from eoferr + async def async_spawn( + self, + cmd, + spawned_re, + expects=(), + sends=(), + use_pty=False, + logfile=None, + logfile_read=None, + logfile_send=None, + trace=None, + **kwargs, + ): + """Create an async spawned send/expect process. + + Args: + cmd: list of args to exec/popen with, or an already open socket + spawned_re: what to look for to know when done, `spawn` returns when seen + expects: a list of regex other than `spawned_re` to look for. Commonly, + "ogin:" or "[Pp]assword:"r. + sends: what to send when an element of `expects` matches. So e.g., the + username or password if thats what corresponding expect matched. Can + be the empty string to send nothing. + use_pty: true for pty based expect, otherwise uses popen (pipes/files) + trace: if true then log send/expects + **kwargs - kwargs passed on the _spawn. + + Returns: + A pexpect process. + + Raises: + pexpect.TIMEOUT, pexpect.EOF as documented in `pexpect` + CalledProcessError if EOF is seen and `cmd` exited then + raises a CalledProcessError to indicate the failure. 
+ """ + if is_file_like(cmd): + assert not use_pty + ac = "*socket*" + p = self._fdspawn(cmd, **kwargs) + else: + p, ac = self._spawn(cmd, use_pty=use_pty, **kwargs) + + if logfile: + p.logfile = logfile + if logfile_read: + p.logfile_read = logfile_read + if logfile_send: + p.logfile_send = logfile_send + + # for spawned shells (i.e., a direct command an not a console) + # this is wrong and will cause 2 prompts + if not use_pty: + # This isn't very nice looking + p.echo = False + if not is_file_like(cmd): + p.isalive = lambda: p.proc.poll() is None + if not hasattr(p, "close"): + p.close = p.wait + + # Do a quick check to see if we got the prompt right away, otherwise we may be + # at a console so we send a \n to re-issue the prompt + index = p.expect([spawned_re, pexpect.TIMEOUT, pexpect.EOF], timeout=0.1) + if index == 0: + assert p.match is not None + self.logger.debug( + "%s: got spawned_re quick: '%s' matching '%s'", + self, + p.match.group(0), + spawned_re, + ) + return p + + # Now send a CRLF to cause the prompt (or whatever else) to re-issue + p.send("\n") + try: + patterns = [spawned_re, *expects] + + self.logger.debug("%s: expecting: %s", self, patterns) + + # The timestamp is only used for the case of use_pty != True + timeout = kwargs.get("timeout", 120) + timeout_ts = datetime.datetime.now() + datetime.timedelta(seconds=timeout) + index = None + while True: + if use_pty is True: + index = await p.expect(patterns, async_=True) + else: + # Due to an upstream issue, async_=True cannot be mixed with + # pipes (pexpect.popen_spawn.PopenSpawn). This hack is used + # to bypass that problem. 
+ await asyncio.sleep(0) # Avoid blocking other coroutines + try: + index = p.expect(patterns, timeout=0.1) + except pexpect.TIMEOUT: + # We must declare when a timeout occurs instead of pexpect + if timeout_ts < datetime.datetime.now(): + raise + continue + if index == 0: + break + + if trace: + assert p.match is not None + self.logger.debug( + "%s: got expect: '%s' matching %d '%s', sending '%s'", + self, + p.match.group(0), + index, + patterns[index], + sends[index - 1], + ) + if sends[index - 1]: + p.send(sends[index - 1]) + + self.logger.debug("%s: expecting again: %s", self, patterns) + self.logger.debug( + "%s: got spawned_re: '%s' matching '%s'", + self, + p.match.group(0), + spawned_re, + ) + return p + except pexpect.TIMEOUT: + self.logger.error( + "%s: TIMEOUT looking for spawned_re '%s' expect buffer so far:\n%s", + self, + spawned_re, + indent(p.buffer), + ) + raise + except pexpect.EOF as eoferr: + if p.isalive(): + raise + rc = p.status + before = indent(p.before) + error = CalledProcessError(rc, ac, output=before) + self.logger.error( + "%s: EOF looking for spawned_re '%s' before EOF:\n%s", + self, + spawned_re, + before, + ) + p.close() + raise error from eoferr + async def shell_spawn( self, cmd, @@ -761,7 +906,7 @@ async def shell_spawn( combined_prompt = r"({}|{})".format(re.escape(PEXPECT_PROMPT), prompt) assert not is_file_like(cmd) or not use_pty - p = self.spawn( + p = await self.async_spawn( cmd, combined_prompt, expects=expects, diff --git a/munet/native.py b/munet/native.py index 43246fe..04c47e7 100644 --- a/munet/native.py +++ b/munet/native.py @@ -911,7 +911,12 @@ async def monitor( logfile_read = open(lfname, "a+", encoding="utf-8") logfile_read.write("-- start read logging for: '{}' --\n".format(sock)) - p = self.spawn(sock, prompt, logfile=logfile, logfile_read=logfile_read) + p = await self.async_spawn( + sock, + prompt, + logfile=logfile, + logfile_read=logfile_read, + ) from .base import ShellWrapper # pylint: disable=C0415 
 p.send("\n") From fc1a5609720d3b17faa5be4ab27f00212ab4fca3 Mon Sep 17 00:00:00 2001 From: Liam Brady Date: Tue, 24 Jun 2025 12:06:19 -0400 Subject: [PATCH 2/2] munet: minor code duplication improvements tests: cover error cases, both sync/async This commit introduces a minor refactor that collects sync code used by both sync/async spawn() into a single callable method. The majority of sync/async spawn() is untouched due to the fundamental difference that asyncio brings with it to the send/expect loop. New tests are introduced such that both sync/async spawn() methods are tested separately. Moreover, the error cases are also now tested. Signed-off-by: Liam Brady --- munet/base.py | 97 +++++++++++++++++----------------- tests/control/test_spawn.py | 100 +++++++++++++++++++++++++++++++++++- 2 files changed, 150 insertions(+), 47 deletions(-) diff --git a/munet/base.py b/munet/base.py index 44744fe..673e2d9 100644 --- a/munet/base.py +++ b/munet/base.py @@ -603,6 +603,41 @@ def _spawn(self, cmd, skip_pre_cmd=False, use_pty=False, echo=False, **kwargs): p = pexpect.spawn(actual_cmd[0], actual_cmd[1:], echo=echo, **defaults) return p, actual_cmd + def _spawn_with_logging( + self, + cmd, + use_pty=False, + logfile=None, + logfile_read=None, + logfile_send=None, + **kwargs, + ): + """Create a spawned process with logging to files configured.""" + if is_file_like(cmd): + assert not use_pty + ac = "*socket*" + p = self._fdspawn(cmd, **kwargs) + else: + p, ac = self._spawn(cmd, use_pty=use_pty, **kwargs) + + if logfile: + p.logfile = logfile + if logfile_read: + p.logfile_read = logfile_read + if logfile_send: + p.logfile_send = logfile_send + + # for spawned shells (i.e., a direct command an not a console) + # this is wrong and will cause 2 prompts + if not use_pty: + # This isn't very nice looking + p.echo = False + if not is_file_like(cmd): + p.isalive = lambda: p.proc.poll() is None + if not hasattr(p, "close"): + p.close = p.wait + return p, ac + def spawn( self, cmd, @@ 
-638,29 +673,14 @@ def spawn( CalledProcessError if EOF is seen and `cmd` exited then raises a CalledProcessError to indicate the failure. """ - if is_file_like(cmd): - assert not use_pty - ac = "*socket*" - p = self._fdspawn(cmd, **kwargs) - else: - p, ac = self._spawn(cmd, use_pty=use_pty, **kwargs) - - if logfile: - p.logfile = logfile - if logfile_read: - p.logfile_read = logfile_read - if logfile_send: - p.logfile_send = logfile_send - - # for spawned shells (i.e., a direct command an not a console) - # this is wrong and will cause 2 prompts - if not use_pty: - # This isn't very nice looking - p.echo = False - if not is_file_like(cmd): - p.isalive = lambda: p.proc.poll() is None - if not hasattr(p, "close"): - p.close = p.wait + p, ac = self._spawn_with_logging( + cmd, + use_pty, + logfile, + logfile_read, + logfile_send, + **kwargs, + ) # Do a quick check to see if we got the prompt right away, otherwise we may be # at a console so we send a \n to re-issue the prompt @@ -762,29 +782,14 @@ async def async_spawn( CalledProcessError if EOF is seen and `cmd` exited then raises a CalledProcessError to indicate the failure. 
""" - if is_file_like(cmd): - assert not use_pty - ac = "*socket*" - p = self._fdspawn(cmd, **kwargs) - else: - p, ac = self._spawn(cmd, use_pty=use_pty, **kwargs) - - if logfile: - p.logfile = logfile - if logfile_read: - p.logfile_read = logfile_read - if logfile_send: - p.logfile_send = logfile_send - - # for spawned shells (i.e., a direct command an not a console) - # this is wrong and will cause 2 prompts - if not use_pty: - # This isn't very nice looking - p.echo = False - if not is_file_like(cmd): - p.isalive = lambda: p.proc.poll() is None - if not hasattr(p, "close"): - p.close = p.wait + p, ac = self._spawn_with_logging( + cmd, + use_pty, + logfile, + logfile_read, + logfile_send, + **kwargs, + ) # Do a quick check to see if we got the prompt right away, otherwise we may be # at a console so we send a \n to re-issue the prompt diff --git a/tests/control/test_spawn.py b/tests/control/test_spawn.py index 4a53c07..bc0de36 100644 --- a/tests/control/test_spawn.py +++ b/tests/control/test_spawn.py @@ -9,6 +9,10 @@ import logging import os import time +import pexpect +import logging +import munet +import re import pytest @@ -53,7 +57,7 @@ async def _test_repl(unet, hostname, cmd, use_pty, will_echo=False): @pytest.mark.parametrize("host", ["host1", "container1", "remote1", "hn1"]) @pytest.mark.parametrize("mode", ["pty", "piped"]) @pytest.mark.parametrize("shellcmd", ["/bin/bash", "/bin/dash", "/usr/bin/ksh"]) -async def test_spawn(unet_share, host, mode, shellcmd): +async def test_shell_expect(unet_share, host, mode, shellcmd): unet = unet_share if not os.path.exists(shellcmd): pytest.skip(f"{shellcmd} not installed skipping") @@ -98,3 +102,97 @@ async def test_spawn(unet_share, host, mode, shellcmd): # this is required for setns() restoration to work for non-pty (piped) bash if mode != "pty": repl.child.proc.kill() + + +@pytest.mark.parametrize("host_name", ["host1", "container1", "remote1", "hn1"]) +@pytest.mark.parametrize("mode", ["pty", "piped"]) 
+@pytest.mark.parametrize("shellcmd", ["/bin/bash", "/bin/dash", "/usr/bin/ksh"]) +# As of 6-24-25, spawn is unused in the munet API. It is still maintained for the future +async def test_spawn(unet_share, host_name, mode, shellcmd): + unet = unet_share + if not os.path.exists(shellcmd): + pytest.skip(f"{shellcmd} not installed skipping") + + use_pty = mode == "pty" + prompt = r"(^|\r?\n)[^#\$]*[#\$] " + + if use_pty: + cmd = [shellcmd] + else: + cmd = [shellcmd, "-si"] + + host = unet.hosts[host_name] + time.sleep(1) + + p = host.spawn( + cmd, + prompt, + use_pty=use_pty, + timeout=1, + ) + + p.send("\n") + + # Sanity check, create a value in p's STDOUT that doesn't appear within p's STDIN + p.send("echo $(( 3 * 7 )) \n") + + try: + index = p.expect([r"21"], timeout=1) + assert index == 0 + except pexpect.TIMEOUT: + host.logger.critical("test_spawn: Expect failed with the process returned by spawn()") + assert False + finally: + p.kill(9) + + +@pytest.mark.parametrize("mode", ['async', 'sync']) +@pytest.mark.parametrize("catch_err", ["timeout", "eof"]) +async def test_spawn_err(unet_share, mode, catch_err): + unet = unet_share + hostname = "host1" + prompt = r"foo" + expects = ["bar"] + sends = ["baz"] + + if catch_err == "timeout": + expected_error = pexpect.TIMEOUT + cmd = ["/bin/bash"] + elif catch_err == "eof": + expected_error = munet.base.CalledProcessError + cmd = ["/bin/sleep", "1"] # A command that exits instantly results in a broken pipe + + host = unet.hosts[hostname] + time.sleep(1) + + saved_level = host.logger.getEffectiveLevel() + host.logger.setLevel(logging.CRITICAL) # Hide expected error messages for duration of test + + try: + if mode == 'async': + p = await host.async_spawn( + cmd, + prompt, + expects=expects, + sends=sends, + use_pty=False, + echo=False, + timeout=1, + ) + host.logger.critical("test_spawn_err: async_spawn unexpectedly succeeded") + else: + p = host.spawn( + cmd, + prompt, + expects=expects, + sends=sends, + use_pty=False, + 
echo=False, + timeout=1, + ) + host.logger.critical("test_spawn_err: spawn unexpectedly succeeded") + assert False + except expected_error: + pass + finally: + host.logger.setLevel(saved_level) # Revert to initial logging level