Skip to content

munet: add Commander.async_spawn() #58

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 173 additions & 23 deletions munet/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,41 @@
p = pexpect.spawn(actual_cmd[0], actual_cmd[1:], echo=echo, **defaults)
return p, actual_cmd

def _spawn_with_logging(
    self,
    cmd,
    use_pty=False,
    logfile=None,
    logfile_read=None,
    logfile_send=None,
    **kwargs,
):
    """Create a spawned process with logging to files configured.

    Shared setup for both the sync and async spawn paths: creates the
    child (fdspawn for an already-open socket/file, otherwise a pty or
    pipe based spawn), attaches the optional log files, and normalizes
    the resulting pexpect object for non-pty use.

    Args:
        cmd: list of args to exec/popen with, or an already open
            file-like object (e.g., a socket).
        use_pty: true for pty based expect, otherwise popen (pipes).
            Must be False when `cmd` is file-like.
        logfile: file to receive both sent and read data.
        logfile_read: file to receive data read from the child.
        logfile_send: file to receive data sent to the child.
        **kwargs: passed on to the underlying spawn call.

    Returns:
        (p, ac): the pexpect process and the actual command used
        ("*socket*" when `cmd` was file-like).
    """
    if is_file_like(cmd):
        # pty handling makes no sense for an existing fd/socket
        assert not use_pty
        ac = "*socket*"
        p = self._fdspawn(cmd, **kwargs)
    else:
        p, ac = self._spawn(cmd, use_pty=use_pty, **kwargs)

    if logfile:
        p.logfile = logfile
    if logfile_read:
        p.logfile_read = logfile_read
    if logfile_send:
        p.logfile_send = logfile_send

    # for spawned shells (i.e., a direct command and not a console)
    # this is wrong and will cause 2 prompts
    if not use_pty:
        # This isn't very nice looking
        p.echo = False
        if not is_file_like(cmd):
            # PopenSpawn lacks isalive/close; patch in equivalents so
            # callers can treat all spawn flavors uniformly.
            p.isalive = lambda: p.proc.poll() is None
            if not hasattr(p, "close"):
                p.close = p.wait
    return p, ac

def spawn(
self,
cmd,
Expand Down Expand Up @@ -638,29 +673,123 @@
CalledProcessError if EOF is seen and `cmd` exited then
raises a CalledProcessError to indicate the failure.
"""
if is_file_like(cmd):
assert not use_pty
ac = "*socket*"
p = self._fdspawn(cmd, **kwargs)
else:
p, ac = self._spawn(cmd, use_pty=use_pty, **kwargs)
p, ac = self._spawn_with_logging(
cmd,
use_pty,
logfile,
logfile_read,
logfile_send,
**kwargs,
)

if logfile:
p.logfile = logfile
if logfile_read:
p.logfile_read = logfile_read
if logfile_send:
p.logfile_send = logfile_send
# Do a quick check to see if we got the prompt right away, otherwise we may be
# at a console so we send a \n to re-issue the prompt
index = p.expect([spawned_re, pexpect.TIMEOUT, pexpect.EOF], timeout=0.1)
if index == 0:
assert p.match is not None
self.logger.debug(
"%s: got spawned_re quick: '%s' matching '%s'",
self,
p.match.group(0),
spawned_re,
)
return p

# for spawned shells (i.e., a direct command and not a console)
# this is wrong and will cause 2 prompts
if not use_pty:
# This isn't very nice looking
p.echo = False
if not is_file_like(cmd):
p.isalive = lambda: p.proc.poll() is None
if not hasattr(p, "close"):
p.close = p.wait
# Now send a CRLF to cause the prompt (or whatever else) to re-issue
p.send("\n")
try:
patterns = [spawned_re, *expects]

self.logger.debug("%s: expecting: %s", self, patterns)

while index := p.expect(patterns):
if trace:
assert p.match is not None
self.logger.debug(

Check warning on line 708 in munet/base.py

View check run for this annotation

Codecov / codecov/patch

munet/base.py#L706-L708

Added lines #L706 - L708 were not covered by tests
"%s: got expect: '%s' matching %d '%s', sending '%s'",
self,
p.match.group(0),
index,
patterns[index],
sends[index - 1],
)
if sends[index - 1]:
p.send(sends[index - 1])

Check warning on line 717 in munet/base.py

View check run for this annotation

Codecov / codecov/patch

munet/base.py#L716-L717

Added lines #L716 - L717 were not covered by tests
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The majority of synchronous spawn() is tested, but not all of the duplicate code. I was unable to determine a solution that allowed the send/expect logic to be tested while ensuring that we also test functionality across all the different shells.


self.logger.debug("%s: expecting again: %s", self, patterns)

Check warning on line 719 in munet/base.py

View check run for this annotation

Codecov / codecov/patch

munet/base.py#L719

Added line #L719 was not covered by tests
self.logger.debug(
"%s: got spawned_re: '%s' matching '%s'",
self,
p.match.group(0),
spawned_re,
)
return p
except pexpect.TIMEOUT:
self.logger.error(
"%s: TIMEOUT looking for spawned_re '%s' expect buffer so far:\n%s",
self,
spawned_re,
indent(p.buffer),
)
raise
except pexpect.EOF as eoferr:
if p.isalive():
raise

Check warning on line 737 in munet/base.py

View check run for this annotation

Codecov / codecov/patch

munet/base.py#L737

Added line #L737 was not covered by tests
rc = p.status
before = indent(p.before)
error = CalledProcessError(rc, ac, output=before)
self.logger.error(
"%s: EOF looking for spawned_re '%s' before EOF:\n%s",
self,
spawned_re,
before,
)
p.close()
raise error from eoferr

async def async_spawn(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to create this new function you did a large copy and paste. That's a good indication we need to refactor the function so that more code is shared. :)

That is hard to do sometimes with async/await. I feel like there should be some way to share code better between asyncio and non-asyncio, but I haven't figured it out yet. This is why you see other places in the code where I run the async function inside an asyncio.run() call for the non-asyncio variant. I'm not sure how safe this is though b/c sometimes people aren't using asyncio b/c it's broken for them in which case running it with asyncio.run() will be broken too.

A good place to test this would be in an FRR topotest which uses spawn.

Copy link
Contributor Author

@liambrady liambrady Jun 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite like the asyncio.run() solution for two reasons:

  1. The synchronous version of spawn() cannot be used from within an asynchronous method since asyncio does not support nested event loops to the best of my knowledge. I am not confident enough to claim that there will never be a use case where spawn() inside an async method is appropriate (especially since spawn() is located so deep within the API). As such, an asyncio.run() solution does not feel safe.
  2. Even if we followed the asyncio.run() solution, then we have to work around the async and use_pty=False hack (i.e. to avoid the exception with p.expect(..., async_=True) and pipes). Specifically, we only want to use the hack when absolutely necessary. However, using asyncio.run() as the solution would imply that we end up using the hack during the synchronous variant of spawn(), which is not desired.

The only other real option then in refactoring is to try and split off as much synchronous code as possible and place it into helper methods. I don't find such a solution satisfying, since that would leave a large portion of the methods untouched, but don't see any other real alternatives.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the latest commit for a minor refactor that collects code that is synchronous in either version (sync/async) into a single method to reduce duplicate code.

I am struggling to determine if there are any other changes that I could make to reduce code duplication. The presence of the await statement within the try/except block + while block make it difficult to reduce the logic any further imo.

self,
cmd,
spawned_re,
expects=(),
sends=(),
use_pty=False,
logfile=None,
logfile_read=None,
logfile_send=None,
trace=None,
**kwargs,
):
"""Create an async spawned send/expect process.

Args:
cmd: list of args to exec/popen with, or an already open socket
spawned_re: what to look for to know when done, `spawn` returns when seen
expects: a list of regex other than `spawned_re` to look for. Commonly,
"ogin:" or "[Pp]assword:".
sends: what to send when an element of `expects` matches. So e.g., the
username or password if that's what the corresponding expect matched. Can
be the empty string to send nothing.
use_pty: true for pty based expect, otherwise uses popen (pipes/files)
trace: if true then log send/expects
**kwargs - kwargs passed on the _spawn.

Returns:
A pexpect process.

Raises:
pexpect.TIMEOUT, pexpect.EOF as documented in `pexpect`
CalledProcessError if EOF is seen and `cmd` exited then
raises a CalledProcessError to indicate the failure.
"""
p, ac = self._spawn_with_logging(
cmd,
use_pty,
logfile,
logfile_read,
logfile_send,
**kwargs,
)

# Do a quick check to see if we got the prompt right away, otherwise we may be
# at a console so we send a \n to re-issue the prompt
Expand All @@ -682,7 +811,28 @@

self.logger.debug("%s: expecting: %s", self, patterns)

while index := p.expect(patterns):
# The timestamp is only used for the case of use_pty != True
timeout = kwargs.get("timeout", 120)
timeout_ts = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
index = None
while True:
if use_pty is True:
index = await p.expect(patterns, async_=True)
else:
# Due to an upstream issue, async_=True cannot be mixed with
# pipes (pexpect.popen_spawn.PopenSpawn). This hack is used
# to bypass that problem.
await asyncio.sleep(0) # Avoid blocking other coroutines
try:
index = p.expect(patterns, timeout=0.1)
except pexpect.TIMEOUT:
# We must declare when a timeout occurs instead of pexpect
if timeout_ts < datetime.datetime.now():
raise
continue
if index == 0:
break

if trace:
assert p.match is not None
self.logger.debug(
Expand Down Expand Up @@ -761,7 +911,7 @@
combined_prompt = r"({}|{})".format(re.escape(PEXPECT_PROMPT), prompt)

assert not is_file_like(cmd) or not use_pty
p = self.spawn(
p = await self.async_spawn(
cmd,
combined_prompt,
expects=expects,
Expand Down
7 changes: 6 additions & 1 deletion munet/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,12 @@ async def monitor(
logfile_read = open(lfname, "a+", encoding="utf-8")
logfile_read.write("-- start read logging for: '{}' --\n".format(sock))

p = self.spawn(sock, prompt, logfile=logfile, logfile_read=logfile_read)
p = await self.async_spawn(
sock,
prompt,
logfile=logfile,
logfile_read=logfile_read,
)
from .base import ShellWrapper # pylint: disable=C0415

p.send("\n")
Expand Down
100 changes: 99 additions & 1 deletion tests/control/test_spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
import logging
import os
import time
import pexpect
import logging
import munet
import re

import pytest

Expand Down Expand Up @@ -53,7 +57,7 @@ async def _test_repl(unet, hostname, cmd, use_pty, will_echo=False):
@pytest.mark.parametrize("host", ["host1", "container1", "remote1", "hn1"])
@pytest.mark.parametrize("mode", ["pty", "piped"])
@pytest.mark.parametrize("shellcmd", ["/bin/bash", "/bin/dash", "/usr/bin/ksh"])
async def test_spawn(unet_share, host, mode, shellcmd):
async def test_shell_expect(unet_share, host, mode, shellcmd):
unet = unet_share
if not os.path.exists(shellcmd):
pytest.skip(f"{shellcmd} not installed skipping")
Expand Down Expand Up @@ -98,3 +102,97 @@ async def test_spawn(unet_share, host, mode, shellcmd):
# this is required for setns() restoration to work for non-pty (piped) bash
if mode != "pty":
repl.child.proc.kill()


@pytest.mark.parametrize("host_name", ["host1", "container1", "remote1", "hn1"])
@pytest.mark.parametrize("mode", ["pty", "piped"])
@pytest.mark.parametrize("shellcmd", ["/bin/bash", "/bin/dash", "/usr/bin/ksh"])
# As of 6-24-25, spawn is unused in the munet API. It is still maintained for the future
async def test_spawn(unet_share, host_name, mode, shellcmd):
    """Verify the synchronous spawn() returns a usable interactive shell process."""
    unet = unet_share
    if not os.path.exists(shellcmd):
        pytest.skip(f"{shellcmd} not installed skipping")

    use_pty = mode == "pty"
    prompt = r"(^|\r?\n)[^#\$]*[#\$] "

    # Interactive shells need "-si" when driven over pipes instead of a pty
    if use_pty:
        cmd = [shellcmd]
    else:
        cmd = [shellcmd, "-si"]

    host = unet.hosts[host_name]
    time.sleep(1)

    p = host.spawn(
        cmd,
        prompt,
        use_pty=use_pty,
        timeout=1,
    )

    p.send("\n")

    # Sanity check, create a value in p's STDOUT that doesn't appear within p's STDIN
    p.send("echo $(( 3 * 7 )) \n")

    try:
        index = p.expect([r"21"], timeout=1)
        assert index == 0
    except pexpect.TIMEOUT:
        # pytest.fail (unlike a bare `assert False`) still fires under -O
        # and reports the reason directly in the test summary.
        pytest.fail(
            "test_spawn: Expect failed with the process returned by spawn()"
        )
    finally:
        # always reap the shell so parametrized runs don't leak children
        p.kill(9)


@pytest.mark.parametrize("mode", ["async", "sync"])
@pytest.mark.parametrize("catch_err", ["timeout", "eof"])
async def test_spawn_err(unet_share, mode, catch_err):
    """Verify spawn()/async_spawn() raise the documented errors on failure.

    For "timeout" the prompt regex never matches an interactive bash, so
    pexpect.TIMEOUT is expected; for "eof" the short-lived command exits
    and the resulting EOF must surface as CalledProcessError.
    """
    unet = unet_share
    hostname = "host1"
    prompt = r"foo"
    expects = ["bar"]
    sends = ["baz"]

    if catch_err == "timeout":
        expected_error = pexpect.TIMEOUT
        cmd = ["/bin/bash"]
    else:  # "eof"
        expected_error = munet.base.CalledProcessError
        # A command that exits instantly results in a broken pipe
        cmd = ["/bin/sleep", "1"]

    host = unet.hosts[hostname]
    time.sleep(1)

    saved_level = host.logger.getEffectiveLevel()
    # Hide expected error messages for duration of test
    host.logger.setLevel(logging.CRITICAL)

    spawn_kwargs = dict(
        expects=expects,
        sends=sends,
        use_pty=False,
        echo=False,
        timeout=1,
    )
    try:
        # pytest.raises replaces the try/except + assert-False pattern and
        # guarantees BOTH branches fail the test if no error is raised.
        with pytest.raises(expected_error):
            if mode == "async":
                await host.async_spawn(cmd, prompt, **spawn_kwargs)
            else:
                host.spawn(cmd, prompt, **spawn_kwargs)
    finally:
        host.logger.setLevel(saved_level)  # Revert to initial logging level