diff --git a/munet/base.py b/munet/base.py index e9410d4..673e2d9 100644 --- a/munet/base.py +++ b/munet/base.py @@ -603,6 +603,41 @@ def _spawn(self, cmd, skip_pre_cmd=False, use_pty=False, echo=False, **kwargs): p = pexpect.spawn(actual_cmd[0], actual_cmd[1:], echo=echo, **defaults) return p, actual_cmd + def _spawn_with_logging( + self, + cmd, + use_pty=False, + logfile=None, + logfile_read=None, + logfile_send=None, + **kwargs, + ): + """Create a spawned process with logging to files configured.""" + if is_file_like(cmd): + assert not use_pty + ac = "*socket*" + p = self._fdspawn(cmd, **kwargs) + else: + p, ac = self._spawn(cmd, use_pty=use_pty, **kwargs) + + if logfile: + p.logfile = logfile + if logfile_read: + p.logfile_read = logfile_read + if logfile_send: + p.logfile_send = logfile_send + + # for spawned shells (i.e., a direct command an not a console) + # this is wrong and will cause 2 prompts + if not use_pty: + # This isn't very nice looking + p.echo = False + if not is_file_like(cmd): + p.isalive = lambda: p.proc.poll() is None + if not hasattr(p, "close"): + p.close = p.wait + return p, ac + def spawn( self, cmd, @@ -638,29 +673,123 @@ def spawn( CalledProcessError if EOF is seen and `cmd` exited then raises a CalledProcessError to indicate the failure. """ - if is_file_like(cmd): - assert not use_pty - ac = "*socket*" - p = self._fdspawn(cmd, **kwargs) - else: - p, ac = self._spawn(cmd, use_pty=use_pty, **kwargs) + p, ac = self._spawn_with_logging( + cmd, + use_pty, + logfile, + logfile_read, + logfile_send, + **kwargs, + ) - if logfile: - p.logfile = logfile - if logfile_read: - p.logfile_read = logfile_read - if logfile_send: - p.logfile_send = logfile_send + # Do a quick check to see if we got the prompt right away, otherwise we may be + # at a console so we send a \n to re-issue the prompt + index = p.expect([spawned_re, pexpect.TIMEOUT, pexpect.EOF], timeout=0.1) + if index == 0: + assert p.match is not None + self.logger.debug( + "%s: got spawned_re quick: '%s' matching '%s'", + self, + p.match.group(0), + spawned_re, + ) + return p - # for spawned shells (i.e., a direct command an not a console) - # this is wrong and will cause 2 prompts - if not use_pty: - # This isn't very nice looking - p.echo = False - if not is_file_like(cmd): - p.isalive = lambda: p.proc.poll() is None - if not hasattr(p, "close"): - p.close = p.wait + # Now send a CRLF to cause the prompt (or whatever else) to re-issue + p.send("\n") + try: + patterns = [spawned_re, *expects] + + self.logger.debug("%s: expecting: %s", self, patterns) + + while index := p.expect(patterns): + if trace: + assert p.match is not None + self.logger.debug( + "%s: got expect: '%s' matching %d '%s', sending '%s'", + self, + p.match.group(0), + index, + patterns[index], + sends[index - 1], + ) + if sends[index - 1]: + p.send(sends[index - 1]) + + self.logger.debug("%s: expecting again: %s", self, patterns) + self.logger.debug( + "%s: got spawned_re: '%s' matching '%s'", + self, + p.match.group(0), + spawned_re, + ) + return p + except pexpect.TIMEOUT: + self.logger.error( + "%s: TIMEOUT looking for spawned_re '%s' expect buffer so far:\n%s", + self, + spawned_re, + indent(p.buffer), + ) + raise + except pexpect.EOF as eoferr: + if p.isalive(): + raise + rc = p.status + before = indent(p.before) + error = CalledProcessError(rc, ac, output=before) + self.logger.error( + "%s: EOF looking for spawned_re '%s' before EOF:\n%s", + self, + spawned_re, + before, + ) + p.close() + raise error from eoferr + + async def async_spawn( + self, + cmd, + spawned_re, + expects=(), + sends=(), + use_pty=False, + logfile=None, + logfile_read=None, + logfile_send=None, + trace=None, + **kwargs, + ): + """Create an async spawned send/expect process. + + Args: + cmd: list of args to exec/popen with, or an already open socket + spawned_re: what to look for to know when done, `spawn` returns when seen + expects: a list of regex other than `spawned_re` to look for. Commonly, + "ogin:" or "[Pp]assword:"r. + sends: what to send when an element of `expects` matches. So e.g., the + username or password if thats what corresponding expect matched. Can + be the empty string to send nothing. + use_pty: true for pty based expect, otherwise uses popen (pipes/files) + trace: if true then log send/expects + **kwargs - kwargs passed on the _spawn. + + Returns: + A pexpect process. + + Raises: + pexpect.TIMEOUT, pexpect.EOF as documented in `pexpect` + CalledProcessError if EOF is seen and `cmd` exited then + raises a CalledProcessError to indicate the failure. + """ + p, ac = self._spawn_with_logging( + cmd, + use_pty, + logfile, + logfile_read, + logfile_send, + **kwargs, + ) # Do a quick check to see if we got the prompt right away, otherwise we may be # at a console so we send a \n to re-issue the prompt @@ -682,7 +811,28 @@ def spawn( self.logger.debug("%s: expecting: %s", self, patterns) - while index := p.expect(patterns): + # The timestamp is only used for the case of use_pty != True + timeout = kwargs.get("timeout", 120) + timeout_ts = datetime.datetime.now() + datetime.timedelta(seconds=timeout) + index = None + while True: + if use_pty is True: + index = await p.expect(patterns, async_=True) + else: + # Due to an upstream issue, async_=True cannot be mixed with + # pipes (pexpect.popen_spawn.PopenSpawn). This hack is used + # to bypass that problem. + await asyncio.sleep(0) # Avoid blocking other coroutines + try: + index = p.expect(patterns, timeout=0.1) + except pexpect.TIMEOUT: + # We must declare when a timeout occurs instead of pexpect + if timeout_ts < datetime.datetime.now(): + raise + continue + if index == 0: + break + if trace: assert p.match is not None self.logger.debug( @@ -761,7 +911,7 @@ async def shell_spawn( combined_prompt = r"({}|{})".format(re.escape(PEXPECT_PROMPT), prompt) assert not is_file_like(cmd) or not use_pty - p = self.spawn( + p = await self.async_spawn( cmd, combined_prompt, expects=expects, diff --git a/munet/native.py b/munet/native.py index 43246fe..04c47e7 100644 --- a/munet/native.py +++ b/munet/native.py @@ -911,7 +911,12 @@ async def monitor( logfile_read = open(lfname, "a+", encoding="utf-8") logfile_read.write("-- start read logging for: '{}' --\n".format(sock)) - p = self.spawn(sock, prompt, logfile=logfile, logfile_read=logfile_read) + p = await self.async_spawn( + sock, + prompt, + logfile=logfile, + logfile_read=logfile_read, + ) from .base import ShellWrapper # pylint: disable=C0415 p.send("\n") diff --git a/tests/control/test_spawn.py b/tests/control/test_spawn.py index 4a53c07..bc0de36 100644 --- a/tests/control/test_spawn.py +++ b/tests/control/test_spawn.py @@ -9,6 +9,10 @@ import logging import os import time +import pexpect +import logging +import munet +import re import pytest @@ -53,7 +57,7 @@ async def _test_repl(unet, hostname, cmd, use_pty, will_echo=False): @pytest.mark.parametrize("host", ["host1", "container1", "remote1", "hn1"]) @pytest.mark.parametrize("mode", ["pty", "piped"]) @pytest.mark.parametrize("shellcmd", ["/bin/bash", "/bin/dash", "/usr/bin/ksh"]) -async def test_spawn(unet_share, host, mode, shellcmd): +async def test_shell_expect(unet_share, host, mode, shellcmd): unet = unet_share if not os.path.exists(shellcmd): pytest.skip(f"{shellcmd} not installed skipping") @@ -98,3 +102,97 @@ async def test_spawn(unet_share, host, mode, shellcmd): # this is required for setns() restoration to work for non-pty (piped) bash if mode != "pty": repl.child.proc.kill() + + +@pytest.mark.parametrize("host_name", ["host1", "container1", "remote1", "hn1"]) +@pytest.mark.parametrize("mode", ["pty", "piped"]) +@pytest.mark.parametrize("shellcmd", ["/bin/bash", "/bin/dash", "/usr/bin/ksh"]) +# As of 6-24-25, spawn is unused in the munet API. It is still maintained for the future +async def test_spawn(unet_share, host_name, mode, shellcmd): + unet = unet_share + if not os.path.exists(shellcmd): + pytest.skip(f"{shellcmd} not installed skipping") + + use_pty = mode == "pty" + prompt = r"(^|\r?\n)[^#\$]*[#\$] " + + if use_pty: + cmd = [shellcmd] + else: + cmd = [shellcmd, "-si"] + + host = unet.hosts[host_name] + time.sleep(1) + + p = host.spawn( + cmd, + prompt, + use_pty=use_pty, + timeout=1, + ) + + p.send("\n") + + # Sanity check, create a value in p's STDOUT that doesn't appear within p's STDIN + p.send("echo $(( 3 * 7 )) \n") + + try: + index = p.expect([r"21"], timeout=1) + assert index == 0 + except pexpect.TIMEOUT: + host.logger.critical("test_spawn: Expect failed with the process returned by spawn()") + assert False + finally: + p.kill(9) + + +@pytest.mark.parametrize("mode", ['async', 'sync']) +@pytest.mark.parametrize("catch_err", ["timeout", "eof"]) +async def test_spawn_err(unet_share, mode, catch_err): + unet = unet_share + hostname = "host1" + prompt = r"foo" + expects = ["bar"] + sends = ["baz"] + + if catch_err == "timeout": + expected_error = pexpect.TIMEOUT + cmd = ["/bin/bash"] + elif catch_err == "eof": + expected_error = munet.base.CalledProcessError + cmd = ["/bin/sleep", "1"] # A command that exits instantly results in a broken pipe + + host = unet.hosts[hostname] + time.sleep(1) + + saved_level = host.logger.getEffectiveLevel() + host.logger.setLevel(logging.CRITICAL) # Hide expected error messages for duration of test + + try: + if mode == 'async': + p = await host.async_spawn( + cmd, + prompt, + expects=expects, + sends=sends, + use_pty=False, + echo=False, + timeout=1, + ) + host.logger.critical("test_spawn_err: async_spawn unexpectedly succeeded") + else: + p = host.spawn( + cmd, + prompt, + expects=expects, + sends=sends, + use_pty=False, + echo=False, + timeout=1, + ) + host.logger.critical("test_spawn_err: spawn unexpectedly succeeded") + assert False + except expected_error: + pass + finally: + host.logger.setLevel(saved_level) # Revert to initial logging level