diff --git a/python/grass/CMakeLists.txt b/python/grass/CMakeLists.txt index 58976d8601d..5d92b4433ff 100644 --- a/python/grass/CMakeLists.txt +++ b/python/grass/CMakeLists.txt @@ -21,6 +21,7 @@ set(PYDIRS semantic_label temporal temporal/ply + tools utils) set(PYDIR_GRASS ${GRASS_INSTALL_PYDIR}/grass) diff --git a/python/grass/Makefile b/python/grass/Makefile index cc54ca583b6..cc04b04b496 100644 --- a/python/grass/Makefile +++ b/python/grass/Makefile @@ -19,6 +19,7 @@ SUBDIRS = \ script \ semantic_label \ temporal \ + tools \ utils default: $(PYDIR)/__init__.py diff --git a/python/grass/app/cli.py b/python/grass/app/cli.py index 227c3a53168..c29df4fdde2 100644 --- a/python/grass/app/cli.py +++ b/python/grass/app/cli.py @@ -20,10 +20,48 @@ import tempfile import os import sys +import subprocess from pathlib import Path + import grass.script as gs from grass.app.data import lock_mapset, unlock_mapset, MapsetLockingException +from grass.tools import Tools + +# Special flags supported besides help and --json which does not need special handling: +SPECIAL_FLAGS = [ + "--interface-description", + "--md-description", + "--wps-process-description", + "--script", +] +# To make this list shorter, we don't support outdated special flags: +# --help-text --html-description --rst-description + + +def subcommand_run_tool(args, tool_args: list, print_help: bool): + command = [args.tool, *tool_args] + with tempfile.TemporaryDirectory() as tmp_dir_name: + project_name = "project" + project_path = Path(tmp_dir_name) / project_name + gs.create_project(project_path) + with gs.setup.init(project_path) as session: + tools = Tools(session=session, capture_output=False) + try: + # From here, we return the subprocess return code regardless of its + # value. Error states are handled through exceptions. + if print_help: + # We consumed the help flag, so we need to add it explicitly. + return tools.call_cmd([*command, "--help"]).returncode + if any(item in command for item in SPECIAL_FLAGS): + # This is here basically because of how --json behaves, + # two JSON flags are accepted, but --json currently overridden by + # other special flags, so later use of --json in tools will fail + # with the other flags active. + return tools.call_cmd(command).returncode + return tools.run_cmd(command).returncode + except subprocess.CalledProcessError as error: + return error.returncode def subcommand_lock_mapset(args): @@ -73,10 +111,22 @@ def main(args=None, program=None): description="Experimental low-level CLI interface to GRASS. Consult developers before using it.", prog=program, ) - subparsers = parser.add_subparsers(title="subcommands", required=True) + subparsers = parser.add_subparsers( + title="subcommands", dest="subcommand", required=True + ) # Subcommand parsers + run_subparser = subparsers.add_parser( + "run", + help="run a tool", + add_help=False, + epilog="Tool name is followed by its parameters.", + ) + run_subparser.add_argument("tool", type=str, nargs="?", help="name of a tool") + run_subparser.add_argument("--help", action="store_true") + run_subparser.set_defaults(func=subcommand_run_tool) + subparser = subparsers.add_parser("lock", help="lock a mapset") subparser.add_argument("mapset_path", type=str) subparser.add_argument( @@ -120,5 +170,16 @@ def main(args=None, program=None): subparser.add_argument("page", type=str) subparser.set_defaults(func=subcommand_show_man) - parsed_args = parser.parse_args(args) + # Parsing + parsed_args, other_args = parser.parse_known_args(args) + # Standard help already exited, but we need to handle tools separately. + if parsed_args.subcommand == "run": + if parsed_args.tool is None and parsed_args.help: + run_subparser.print_help() + return 0 + if parsed_args.tool is None: + run_subparser.print_usage() + # argparse gives 2 without parameters, so we behave the same. + return 2 + return parsed_args.func(parsed_args, other_args, print_help=parsed_args.help) return parsed_args.func(parsed_args) diff --git a/python/grass/app/tests/grass_app_cli_test.py b/python/grass/app/tests/grass_app_cli_test.py index fa6cf59bd78..7e7472ac534 100644 --- a/python/grass/app/tests/grass_app_cli_test.py +++ b/python/grass/app/tests/grass_app_cli_test.py @@ -6,6 +6,7 @@ def test_cli_help_runs(): + """Check help of the main command""" with pytest.raises(SystemExit) as exception: main(["--help"]) assert exception.value.code == 0 @@ -13,4 +14,42 @@ def test_cli_help_runs(): @pytest.mark.skipif(sys.platform.startswith("win"), reason="No man on Windows") def test_man_subcommand_runs(): + """Check that man subcommand runs without an error""" assert main(["man", "g.region"]) == 0 + + +def test_subcommand_man_no_page(): + """argparse gives 2 without parameters""" + with pytest.raises(SystemExit) as exception: + main(["man"]) + assert exception.value.code == 2 + + +def test_subcommand_run_help(): + """Check help of a subcommand""" + assert main(["run", "--help"]) == 0 + + +def test_subcommand_run_no_tool(): + """argparse gives 2 without parameters""" + assert main(["run"]) == 2 + + +def test_subcommand_run_tool_help(): + """Check help of a tool""" + assert main(["run", "g.region", "--help"]) == 0 + + +def test_subcommand_run_tool_special_flag(): + """Check that a special flag is supported""" + assert main(["run", "g.region", "--interface-description"]) == 0 + + +def test_subcommand_run_tool_regular_run(): + """Check that a tool runs without error""" + assert main(["run", "g.region", "-p"]) == 0 + + +def test_subcommand_run_tool_failure_run(): + """Check that a tool produces non-zero return code""" + assert main(["run", "g.region", "raster=does_not_exist"]) == 1 diff --git a/python/grass/experimental/Makefile b/python/grass/experimental/Makefile index 46525e3e91a..e28f932a0a8 100644 --- a/python/grass/experimental/Makefile +++ b/python/grass/experimental/Makefile @@ -1,7 +1,7 @@ MODULE_TOPDIR = ../../.. -include $(MODULE_TOPDIR)/include/Make/Other.make include $(MODULE_TOPDIR)/include/Make/Python.make +include $(MODULE_TOPDIR)/include/Make/Dir.make DSTDIR = $(ETC)/python/grass/experimental @@ -12,7 +12,7 @@ MODULES = \ PYFILES := $(patsubst %,$(DSTDIR)/%.py,$(MODULES) __init__) PYCFILES := $(patsubst %,$(DSTDIR)/%.pyc,$(MODULES) __init__) -default: $(PYFILES) $(PYCFILES) +default: subdirs $(PYFILES) $(PYCFILES) $(DSTDIR): $(MKDIR) $@ diff --git a/python/grass/experimental/tests/conftest.py b/python/grass/experimental/tests/conftest.py index d749bd09c51..5bd44da1de0 100644 --- a/python/grass/experimental/tests/conftest.py +++ b/python/grass/experimental/tests/conftest.py @@ -1,4 +1,4 @@ -"""Fixtures for grass.script""" +"""Fixtures for grass.experimental""" import uuid import os diff --git a/python/grass/tools/Makefile b/python/grass/tools/Makefile new file mode 100644 index 00000000000..8820c884347 --- /dev/null +++ b/python/grass/tools/Makefile @@ -0,0 +1,21 @@ +MODULE_TOPDIR = ../../.. + +include $(MODULE_TOPDIR)/include/Make/Other.make +include $(MODULE_TOPDIR)/include/Make/Python.make + +DSTDIR = $(ETC)/python/grass/tools + +MODULES = \ + session_tools \ + support + +PYFILES := $(patsubst %,$(DSTDIR)/%.py,$(MODULES) __init__) +PYCFILES := $(patsubst %,$(DSTDIR)/%.pyc,$(MODULES) __init__) + +default: $(PYFILES) $(PYCFILES) + +$(DSTDIR): + $(MKDIR) $@ + +$(DSTDIR)/%: % | $(DSTDIR) + $(INSTALL_DATA) $< $@ diff --git a/python/grass/tools/__init__.py b/python/grass/tools/__init__.py new file mode 100644 index 00000000000..86826c6dd50 --- /dev/null +++ b/python/grass/tools/__init__.py @@ -0,0 +1,10 @@ +def __getattr__(name): + if name == "Tools": + from .session_tools import Tools + + return Tools + msg = f"module {__name__} has no attribute {name}" + raise AttributeError(msg) + + +__all__ = ["Tools"] # pylint: disable=undefined-all-variable diff --git a/python/grass/tools/session_tools.py b/python/grass/tools/session_tools.py new file mode 100644 index 00000000000..410fc8256ea --- /dev/null +++ b/python/grass/tools/session_tools.py @@ -0,0 +1,308 @@ +############################################################################## +# AUTHOR(S): Vaclav Petras <wenzeslaus gmail com> +# +# PURPOSE: API to call GRASS tools (modules) as Python functions +# +# COPYRIGHT: (C) 2023-2025 Vaclav Petras and the GRASS Development Team +# +# This program is free software under the GNU General Public +# License (>=v2). Read the file COPYING that comes with GRASS +# for details. +############################################################################## + +"""The module provides an API to use GRASS tools (modules) as Python functions""" + +from __future__ import annotations + +import os + +import grass.script as gs + +from .support import ParameterConverter, ToolFunctionResolver, ToolResult + + +class Tools: + """Use GRASS tools through function calls (experimental) + + GRASS tools (modules) can be executed as methods of this class. + This API is experimental in version 8.5 and is expected to be stable in version 8.6. + + The tools can be used in an active GRASS session (this skipped when writing + a GRASS tool): + + >>> import grass.script as gs + >>> gs.create_project("xy_project") + >>> session = gs.setup.init("xy_project") + + Multiple tools can be accessed through a single *Tools* object: + + >>> from grass.tools import Tools + >>> tools = Tools(session=session) + >>> tools.g_region(rows=100, cols=100) # doctest: +ELLIPSIS + ToolResult(...) + >>> tools.r_random_surface(output="surface", seed=42) + ToolResult(...) + + For tools outputting JSON, the results can be accessed directly: + + >>> print("cells:", tools.g_region(flags="p", format="json")["cells"]) + cells: 10000 + + Resulting text or other output can be accessed through attributes of + the *ToolResult* object: + + >>> tools.g_region(flags="p").text # doctest: +SKIP + """ + + def __init__( + self, + *, + session=None, + env=None, + overwrite=None, + verbose=None, + quiet=None, + superquiet=None, + errors=None, + capture_output=True, + capture_stderr=None, + ): + """ + If session is provided and has an env attribute, it is used to execute tools. + If env is provided, it is used to execute tools. If both session and env are + provided, env is used to execute tools and session is ignored. + However, session and env interaction may change in the future. + + If overwrite is provided, a an overwrite is set for all the tools. + When overwrite is set to False, individual tool calls can set overwrite + to True. If overwrite is set in the session or env, it is used. + Note that once overwrite is set to True globally, an individual tool call + cannot set it back to False. + + If verbose, quiet, superquiet is set to True, the corresponding verbosity level + is set for all the tools. If one of them is set to False and the environment + has the corresponding variable set, it is unset. + The values cannot be combined. If multiple are set to True, the most verbose + one wins. + """ + if env: + self._original_env = env + elif session and hasattr(session, "env"): + self._original_env = session.env + else: + self._original_env = os.environ + self._modified_env = None + self._overwrite = overwrite + self._verbose = verbose + self._quiet = quiet + self._superquiet = superquiet + self._errors = errors + self._capture_output = capture_output + if capture_stderr is None: + self._capture_stderr = capture_output + else: + self._capture_stderr = capture_stderr + self._name_resolver = None + + def _modified_env_if_needed(self): + """Get the environment for subprocesses + + Creates a modified copy if needed based on the parameters, + but returns the original environment otherwise. + """ + env = None + if self._overwrite is not None: + env = env or self._original_env.copy() + if self._overwrite: + env["GRASS_OVERWRITE"] = "1" + else: + env["GRASS_OVERWRITE"] = "0" + + if ( + self._verbose is not None + or self._quiet is not None + or self._superquiet is not None + ): + env = env or self._original_env.copy() + + def set_or_unset(env, variable_value, state): + """ + Set the variable the corresponding value if state is True. If it is + False and the variable is set to the corresponding value, unset it. + """ + if state: + env["GRASS_VERBOSE"] = variable_value + elif ( + state is False + and "GRASS_VERBOSE" in env + and env["GRASS_VERBOSE"] == variable_value + ): + del env["GRASS_VERBOSE"] + + # This does not check for multiple ones set at the same time, + # but the most verbose one wins for safety. + set_or_unset(env, "0", self._superquiet) + set_or_unset(env, "1", self._quiet) + set_or_unset(env, "3", self._verbose) + + return env or self._original_env + + def run(self, name: str, /, **kwargs): + """Run a tool by specifying its name as a string and parameters. + + The parameters tool are tool name as a string and parameters as keyword + arguments. The keyword arguments may include an argument *flags* which is a + string of one-character tool flags. + + The function may perform additional processing on the parameters. + + :param name: name of a GRASS tool + :param kwargs: tool parameters + """ + # Object parameters are handled first before the conversion of the call to a + # list of strings happens. + object_parameter_handler = ParameterConverter() + object_parameter_handler.process_parameters(kwargs) + + # Get a fixed env parameter at at the beginning of each execution, + # but repeat it every time in case the referenced environment is modified. + args, popen_options = gs.popen_args_command(name, **kwargs) + # We approximate original kwargs with the possibly-modified kwargs. + return self.run_cmd( + args, + tool_kwargs=kwargs, + input=object_parameter_handler.stdin, + **popen_options, + ) + + def run_cmd( + self, + command: list[str], + *, + input: str | bytes | None = None, + tool_kwargs: dict | None = None, + **popen_options, + ): + """Run a tool by passing its name and parameters a list of strings. + + The function may perform additional processing on the parameters. + + :param command: list of strings to execute as the command + :param input: text input for the standard input of the tool + :param tool_kwargs: named tool arguments used for error reporting (experimental) + :param **popen_options: additional options for :py:func:`subprocess.Popen` + """ + return self.call_cmd( + command, + tool_kwargs=tool_kwargs, + input=input, + **popen_options, + ) + + def call(self, name: str, /, **kwargs): + """Run a tool by specifying its name as a string and parameters. + + The parameters tool are tool name as a string and parameters as keyword + arguments. The keyword arguments may include an argument *flags* which is a + string of one-character tool flags. + + The function will directly execute the tool without any major processing of + the parameters, but numbers, lists, and tuples will still be translated to + strings for execution. + + :param name: name of a GRASS tool + :param **kwargs: tool parameters + """ + args, popen_options = gs.popen_args_command(name, **kwargs) + return self.call_cmd(args, **popen_options) + + def call_cmd(self, command, tool_kwargs=None, input=None, **popen_options): + """Run a tool by passing its name and parameters as a list of strings. + + The function is similar to :py:func:`subprocess.run` but with different + defaults and return value. + + :param command: list of strings to execute as the command + :param tool_kwargs: named tool arguments used for error reporting (experimental) + :param input: text input for the standard input of the tool + :param **popen_options: additional options for :py:func:`subprocess.Popen` + """ + # We allow the user to overwrite env, which allows for maximum flexibility + # with some potential for confusion when the user uses a broken environment. + if "env" not in popen_options: + popen_options["env"] = self._modified_env_if_needed() + if self._capture_output: + if "stdout" not in popen_options: + popen_options["stdout"] = gs.PIPE + if self._capture_stderr: + if "stderr" not in popen_options: + popen_options["stderr"] = gs.PIPE + if input is not None: + popen_options["stdin"] = gs.PIPE + else: + popen_options["stdin"] = None + process = gs.Popen( + command, + **popen_options, + ) + stdout, stderr = process.communicate(input=input) + returncode = process.poll() + # We don't have the keyword arguments to pass to the resulting object. + result = ToolResult( + name=command[0], + command=command, + kwargs=tool_kwargs, + returncode=returncode, + stdout=stdout, + stderr=stderr, + ) + if returncode != 0: + # This is only for the error states. + # The handle_errors function handles also the run_command functions + # and may use some overall review to make the handling of the tool name + # and parameters more clear, but currently, the first item in args is a + # list if it is a whole command. + args = [command[0]] if tool_kwargs else [command] + return gs.handle_errors( + returncode, + result=result, + args=args, + kwargs=tool_kwargs or {}, + stderr=stderr, + handler=self._errors, + ) + return result + + def __getattr__(self, name): + """Get a function representing a GRASS tool. + + Attribute should be in the form 'r_example_name'. For example, 'r.slope.aspect' + is used trough attribute 'r_slope_aspect'. + """ + if not self._name_resolver: + self._name_resolver = ToolFunctionResolver( + run_function=self.run, + env=self._original_env, + ) + return self._name_resolver.get_function(name, exception_type=AttributeError) + + def __dir__(self): + """List available tools and standard attributes.""" + if not self._name_resolver: + self._name_resolver = ToolFunctionResolver( + run_function=self.run, + env=self._original_env, + ) + # Collect instance and class attributes + static_attrs = set(dir(type(self))) | set(self.__dict__.keys()) + return list(static_attrs) + self._name_resolver.names() + + def __enter__(self): + """Enter the context manager context. + + :returns: reference to the object (self) + """ + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Exit the context manager context.""" diff --git a/python/grass/tools/support.py b/python/grass/tools/support.py new file mode 100644 index 00000000000..00844871de8 --- /dev/null +++ b/python/grass/tools/support.py @@ -0,0 +1,276 @@ +############################################################################## +# AUTHOR(S): Vaclav Petras <wenzeslaus gmail com> +# +# PURPOSE: API to call GRASS tools (modules) as Python functions +# +# COPYRIGHT: (C) 2025 Vaclav Petras and the GRASS Development Team +# +# This program is free software under the GNU General Public +# License (>=v2). Read the file COPYING that comes with GRASS +# for details. +############################################################################## + +""" +The module provides support functionality for calling GRASS tools (modules) +as Python functions. + +Do not use this module directly unless you are developing GRASS or its wrappers. +In that case, be prepared to update your code if this module changes. This module may +change between releases. +""" + +from __future__ import annotations + +import json +import shutil +from io import StringIO + +import grass.script as gs + + +class ParameterConverter: + def __init__(self): + self._numpy_inputs = {} + self._numpy_outputs = {} + self._numpy_inputs_ordered = [] + self.stdin = None + + def process_parameters(self, kwargs): + for key, value in kwargs.items(): + if isinstance(value, StringIO): + kwargs[key] = "-" + self.stdin = value.getvalue() + + +class ToolFunctionResolver: + def __init__(self, *, run_function, env): + self._run_function = run_function + self._env = env + self._names = None + + def get_tool_name(self, name, exception_type): + """Parse attribute to GRASS display module. Attribute should be in + the form 'tool_name'. For example, 'r.info' is called with 'r_info'. + """ + # Convert snake case attribute name to dotted tool name. + tool_name = name.replace("_", ".") + # We first try to find the tool on path which is much faster than getting + # and checking the names, but if the tool is not found, likely because runtime + # is not set up, we check the names. + if ( + not shutil.which(tool_name, path=self._env["PATH"]) + and name not in self.names() + ): + suggestions = self.suggest_tools(tool_name) + if suggestions: + # While Python may automatically suggest the closest match, + # we show more matches. We also show single match more often + # (this may change in the future). + msg = ( + f"Tool {name} ({tool_name}) not found" + f" (but found {', '.join(suggestions)})" + ) + raise exception_type(msg) + msg = ( + f"Tool or attribute {name} ({tool_name}) not found" + " (check session setup and documentation for tool and attribute names)" + ) + raise exception_type(msg) + return tool_name + + def get_function(self, name, exception_type): + tool_name = self.get_tool_name(name, exception_type) + + def wrapper(**kwargs): + return self._run_function(tool_name, **kwargs) + + wrapper.__doc__ = f"Run {tool_name} as function" + + return wrapper + + def __getattr__(self, name): + return self.get_function(name, exception_type=AttributeError) + + @staticmethod + def levenshtein_distance(text1: str, text2: str) -> int: + if len(text1) < len(text2): + return ToolFunctionResolver.levenshtein_distance(text2, text1) + + if len(text2) == 0: + return len(text1) + + previous_row = list(range(len(text2) + 1)) + for i, char1 in enumerate(text1): + current_row = [i + 1] + for j, char2 in enumerate(text2): + insertions = previous_row[j + 1] + 1 + deletions = current_row[j] + 1 + substitutions = previous_row[j] + (char1 != char2) + current_row.append(min(insertions, deletions, substitutions)) + previous_row = current_row + + return previous_row[-1] + + def suggest_tools(self, text): + """Suggest matching tool names based on provided text. + + The first five tools matching the condition are returned, + so this always returns at most five names, but it may not be the best matches. + The returned names are sorted alphabetically (not by priority). + This specific behavior may change in the future versions. + """ + result = [] + max_suggestions = 5 + for name in self.names(): + if ToolFunctionResolver.levenshtein_distance(text, name) < len(text) / 2: + result.append(name) + if len(result) >= max_suggestions: + break + result.sort() + return result + + def names(self): + if self._names: + return self._names + self._names = [name.replace(".", "_") for name in gs.get_commands()[0]] + return self._names + + +class ToolResult: + """Result returned after executing a tool""" + + def __init__(self, *, name, command, kwargs, returncode, stdout, stderr): + self._name = name + self._command = command + self._kwargs = kwargs + self.returncode = returncode + self._stdout = stdout + self._stderr = stderr + self._text = None + self._cached_json = None + + @property + def text(self) -> str | None: + """Text output as decoded string""" + if self._text is not None: + return self._text + if self._stdout is None: + return None + if isinstance(self._stdout, bytes): + decoded_stdout = gs.decode(self._stdout) + else: + decoded_stdout = self._stdout + self._text = decoded_stdout.strip() + return self._text + + @property + def keyval(self) -> dict: + """Text output read as key-value pairs separated by equal signs + + If possible, values are converted to int or float. Empty values result in + an empty string value (there is no recognized null value). + Empty or no output results in an empty dictionary. + """ + + def conversion(value): + """Convert text to int or float if possible, otherwise return it as is""" + try: + return int(value) + except ValueError: + pass + try: + return float(value) + except ValueError: + pass + return value + + return gs.parse_key_val(self._stdout, val_type=conversion) + + @property + def comma_items(self) -> list: + """Text output read as comma-separated list + + Empty or no output results in an empty list. + """ + return self.text_split(",") + + @property + def space_items(self) -> list: + """Text output read as whitespace-separated list + + Empty or no output results in an empty list. + """ + return self.text_split(separator=None) + + def text_split(self, separator=None) -> list: + """Parse text output read as list separated by separators + + Any leading or trailing newlines are removed prior to parsing. + Empty or no output results in an empty list. + """ + if not self.text: + # This provides consitent behavior with explicit separator including + # a single space and no separator which triggers general whitespace + # splitting which results in an empty list for an empty string. + return [] + # The use of strip is assuming that the output is one line which + # ends with a newline character which is for display only. + return self.text.split(separator) + + @property + def stdout(self) -> str | bytes | None: + return self._stdout + + @property + def stderr(self) -> str | bytes | None: + return self._stderr + + def _json_or_error(self) -> dict: + if self._cached_json is not None: + return self._cached_json + if self._stdout: + # We are testing just std out and letting rest to the parse and the user. + # This makes no assumption about how JSON is produced by the tool. + try: + self._cached_json = json.loads(self._stdout) + return self._cached_json + except json.JSONDecodeError as error: + if self._kwargs and self._kwargs.get("format") == "json": + raise + if self._command and "format=json" in self._command: + raise + msg = ( + f"Output of {self._name} cannot be parsed as JSON. " + 'Did you use format="json"?' + ) + # We don't raise JSONDecodeError ourselves, but use more general + # ValueError when format is likely not set properly or when there is + # no output (see below). + # JSONDecodeError is ValueError, so users may just catch ValueError. + raise ValueError(msg) from error + msg = f"No text output for {self._name} to be parsed as JSON" + raise ValueError(msg) + + @property + def json(self) -> dict: + """Text output read as JSON + + This returns the nested structure of dictionaries and lists or fails when + the output is not JSON. + """ + return self._json_or_error() + + def __getitem__(self, name): + return self._json_or_error()[name] + + def __len__(self): + return len(self._json_or_error()) + + def __repr__(self): + parameters = [] + parameters.append(f"returncode={self.returncode}") + if self._stdout is not None: + parameters.append(f"stdout='{self._stdout}'") + if self._stderr is not None: + parameters.append(f"stderr='{self._stderr}'") + return f"{self.__class__.__name__}({', '.join(parameters)})" diff --git a/python/grass/tools/tests/conftest.py b/python/grass/tools/tests/conftest.py new file mode 100644 index 00000000000..f4b9145af1c --- /dev/null +++ b/python/grass/tools/tests/conftest.py @@ -0,0 +1,35 @@ +"""Fixtures for grass.tools""" + +import os + +import pytest + +import grass.script as gs +from grass.tools.support import ToolResult, ToolFunctionResolver + + +@pytest.fixture +def xy_dataset_session(tmp_path): + """Creates a session with XY project""" + gs.create_project(tmp_path / "test") + with gs.setup.init(tmp_path / "test", env=os.environ.copy()) as session: + yield session + + +@pytest.fixture +def empty_result(): + return ToolResult( + name=None, command=None, kwargs=None, returncode=None, stdout=None, stderr=None + ) + + +@pytest.fixture +def empty_string_result(): + return ToolResult( + name=None, command=None, kwargs=None, returncode=None, stdout="", stderr="" + ) + + +@pytest.fixture +def echoing_resolver(): + return ToolFunctionResolver(run_function=lambda x: x, env=os.environ.copy()) diff --git a/python/grass/tools/tests/grass_tools_result_test.py b/python/grass/tools/tests/grass_tools_result_test.py new file mode 100644 index 00000000000..2c76330aefa --- /dev/null +++ b/python/grass/tools/tests/grass_tools_result_test.py @@ -0,0 +1,218 @@ +"""Tests of grass.tools.support.ToolResult""" + +import json + +import pytest + +from grass.tools.support import ToolResult + + +def test_no_text_stdout(empty_result): + assert not empty_result.stdout + assert empty_result.stdout is None + + +def test_no_text_stderr(empty_result): + assert not empty_result.stderr + assert empty_result.stderr is None + + +def test_no_text(empty_result): + assert not empty_result.text + assert empty_result.text is None + + +def test_no_text_split(empty_result): + assert empty_result.text_split() == [] + + +def test_no_text_split_pipe(empty_result): + assert empty_result.text_split("|") == [] + + +def test_no_text_comma_items(empty_result): + assert empty_result.comma_items == [] + + +def test_no_text_space_items(empty_result): + assert empty_result.space_items == [] + + +def test_no_text_keyval(empty_result): + assert empty_result.keyval == {} + + +def test_no_text_json(empty_result): + with pytest.raises(ValueError, match="No text output"): + assert empty_result.json + + +def test_empty_text_stdout(empty_string_result): + assert not empty_string_result.stdout + assert empty_string_result.stdout is not None + + +def test_empty_text_stderr(empty_string_result): + assert not empty_string_result.stderr + assert empty_string_result.stderr is not None + + +def test_empty_text(empty_string_result): + assert not empty_string_result.text + assert empty_string_result.text is not None + + +def test_empty_text_split(empty_string_result): + assert empty_string_result.text_split() == [] + + +def test_empty_text_split_pipe(empty_string_result): + assert empty_string_result.text_split(" ") == [] + + +def test_empty_text_comma_items(empty_string_result): + assert empty_string_result.comma_items == [] + + +def test_empty_text_space_items(empty_string_result): + assert empty_string_result.space_items == [] + + +def test_empty_text_keyval(empty_string_result): + assert empty_string_result.keyval == {} + + +def test_empty_text_json(empty_string_result): + with pytest.raises(ValueError, match="No text output"): + assert empty_string_result.json + + +def test_empty_text_keyval_empty_value(): + text = "a=\nb=xyz" + result = ToolResult( + name=None, command=None, kwargs=None, returncode=None, stdout=text, stderr=None + ) + assert result.keyval == {"a": "", "b": "xyz"} + + +def test_empty_text_keyval_numbers(): + text = "a=1\nb=1.0" + result = ToolResult( + name=None, command=None, kwargs=None, returncode=None, stdout=text, stderr=None + ) + assert result.keyval == {"a": 1, "b": 1.0} + + +def test_json_format_set_but_text_invalid_with_command(): + """Check invalid format, but the format is set in command""" + text = "invalid format" + result = ToolResult( + name=None, + command=["test_tool", "format=json"], + kwargs=None, + returncode=None, + stdout=text, + stderr=None, + ) + with pytest.raises(json.JSONDecodeError): + assert result.json + + +def test_json_format_set_but_text_invalid_with_kwargs(): + """Check invalid format, but the format is set in kwargs""" + text = "invalid format" + result = ToolResult( + name="test_tool", + command=None, + kwargs={"format": "json"}, + returncode=None, + stdout=text, + stderr=None, + ) + with pytest.raises(json.JSONDecodeError): + assert result.json + + +def test_json_decode_error_exception_is_value_error(): + """Check that ValueError is raised when JSON decoding fails + + In the ocde, we assume that JSONDecodeError is a subclass of ValueError + which allows users to catch both JSONDecodeError and ValueError as ValueError, + so we test this behavior here by catching ValueError. + """ + text = "invalid format" + result = ToolResult( + name="test_tool", + command=None, + kwargs={"format": "json"}, + returncode=None, + stdout=text, + stderr=None, + ) + with pytest.raises(ValueError, match="Expecting"): + assert result.json + + +def test_json_format_not_set_and_text_invalid(): + """Check format set to something else than JSON""" + text = "invalid format" + result = ToolResult( + name="test_tool", + command=None, + kwargs={"format": "csv"}, + returncode=None, + stdout=text, + stderr=None, + ) + with pytest.raises(ValueError, match=r"format.*json"): + assert result.json + + +def test_json_format_correct_with_wrong_parameter(): + """Check that parameter does not influence JSON parsing""" + text = '{"a": 1, "b": 1.0}' + result = ToolResult( + name="test_tool", + command=None, + kwargs={"format": "csv"}, + returncode=None, + stdout=text, + stderr=None, + ) + assert result.json == {"a": 1, "b": 1.0} + + +def test_text_as_bytes(): + stdout = b"a=1\nb=1.0" + result = ToolResult( + name=None, + command=None, + kwargs=None, + returncode=None, + stdout=stdout, + stderr=None, + ) + # The stdout attribute should be untouched. + assert result.stdout == stdout + # The text attribute should be decoded. + assert result.text == stdout.decode() + + +def test_text_strip(): + stdout = " a=1\nb=1.0 \n" + result = ToolResult( + name=None, + command=None, + kwargs=None, + returncode=None, + stdout=stdout, + stderr=None, + ) + # The stdout attribute should be untouched. + assert result.stdout == stdout + # Different ways of asking the same thing. + # The repeated access should also trigger caching, but we don't test for + # that explicitly. + assert result.text == "a=1\nb=1.0" + assert result.text == result.text.strip() + assert result.text == stdout.strip() diff --git a/python/grass/tools/tests/grass_tools_session_tools_test.py b/python/grass/tools/tests/grass_tools_session_tools_test.py new file mode 100644 index 00000000000..ffa5b1657ff --- /dev/null +++ b/python/grass/tools/tests/grass_tools_session_tools_test.py @@ -0,0 +1,812 @@ +"""Test grass.tools.Tools class""" + +import io +import os + +import pytest + +import grass.script as gs +from grass.exceptions import CalledModuleError +from grass.experimental.mapset import TemporaryMapsetSession +from grass.tools import Tools + + +def test_key_value_parser_number(xy_dataset_session): + """Check that numbers are parsed as numbers from key-value (shell) output""" + tools = Tools(session=xy_dataset_session) + assert tools.g_region(flags="p", format="shell").keyval["nsres"] == 1 + + +def test_key_value_parser_multiple_values(xy_dataset_session): + """Check that strings and floats are parsed""" + tools = Tools(session=xy_dataset_session) + name = "surface" + tools.r_surf_gauss(output=name) # needs seed + result = tools.r_info(map=name, format="shell").keyval + assert result["datatype"] == "DCELL" + assert result["nsres"] == 1 + result = tools.r_univar(map=name, format="shell").keyval + # We don't worry about the specific output value, and just test the type. + assert isinstance(result["mean"], float) + + +def test_json_parser(xy_dataset_session): + """Check that JSON is parsed""" + tools = Tools(session=xy_dataset_session) + assert tools.g_region(flags="p", format="json").json["cols"] == 1 + + +def test_json_with_name_and_parameter_call(xy_dataset_session): + """Check that JSON is parsed with a name-and-parameters style call""" + tools = Tools(session=xy_dataset_session) + assert tools.run("g.region", flags="p", format="json").json["cols"] == 1 + + +def test_json_with_direct_name_and_parameter_call(xy_dataset_session): + """Check that JSON is parsed with a name-and-parameters style call""" + tools = Tools(session=xy_dataset_session) + assert tools.call("g.region", flags="p", format="json").json["cols"] == 1 + + +def test_json_with_subprocess_run_like_call(xy_dataset_session): + """Check that JSON is parsed with a name-and-parameters style call""" + tools = Tools(session=xy_dataset_session) + assert tools.run_cmd(["g.region", "format=json", "-p"]).json["cols"] == 1 + + +def test_json_with_direct_subprocess_run_like_call(xy_dataset_session): + """Check that JSON is parsed with a name-and-parameters style call""" + tools = Tools(session=xy_dataset_session) + assert tools.call_cmd(["g.region", "format=json", "-p"]).json["cols"] == 1 + + +def test_json_as_list(xy_dataset_session): + """Check that a JSON result behaves as a list""" + tools = Tools(session=xy_dataset_session) + # This also tests JSON parsing with a format option. + result = tools.g_search_modules(keyword="random", flags="j") + for item in result: + assert "name" in item + assert len(result) + + +def test_help_call_no_parameters(xy_dataset_session): + """Check that help text is generated without any one parameters provided""" + tools = Tools(session=xy_dataset_session) + assert "r.slope.aspect" in tools.call_cmd(["r.slope.aspect", "--help"]).stderr + + +def test_help_call_with_parameters(xy_dataset_session): + """Check that help text is generated with parameters provided""" + tools = Tools(session=xy_dataset_session) + assert ( + "r.slope.aspect" + in tools.call_cmd( + ["r.slope.aspect", "elevation=dem", "slope=slope", "--help"] + ).stderr + ) + + +def test_json_call_with_low_level_call(xy_dataset_session): + """Check that --json call works including JSON data parsing""" + tools = Tools(session=xy_dataset_session) + # This also tests JSON parsing with a format option. + data = tools.call_cmd( + ["r.slope.aspect", "elevation=dem", "slope=slope", "--json"] + ).json + assert "inputs" in data + assert data["inputs"][0]["value"] == "dem" + + +def test_json_call_with_high_level_call(xy_dataset_session): + """Check that --json call works including JSON data parsing""" + tools = Tools(session=xy_dataset_session) + data = tools.run_cmd( + ["r.slope.aspect", "elevation=dem", "slope=slope", "--json"] + ).json + assert "inputs" in data + assert data["inputs"][0]["value"] == "dem" + + +def test_json_direct_access(xy_dataset_session): + """Check that JSON data can be directly accessed""" + tools = Tools(session=xy_dataset_session) + assert tools.g_region(flags="p", format="json")["cols"] == 1 + + +def test_json_direct_access_bad_key_type(xy_dataset_session): + """Check that JSON indexing fails when using a wrong type""" + tools = Tools(session=xy_dataset_session) + with pytest.raises(TypeError): + tools.g_search_modules(keyword="random", flags="j")["name"] + + +def test_json_direct_access_bad_key_value(xy_dataset_session): + """Check that JSON indexing fails when using a wrong value""" + tools = Tools(session=xy_dataset_session) + high_number = 100_000_000 + with pytest.raises(IndexError): + tools.g_search_modules(keyword="random", flags="j")[high_number] + + +def test_json_direct_access_not_json(xy_dataset_session): + """Check that JSON parsing creates an ValueError + + Specifically, this tests the case when format="json" is not set. + """ + tools = Tools(session=xy_dataset_session) + with pytest.raises(ValueError, match=r"format.*json"): + tools.g_search_modules(keyword="random")[0]["name"] + + +def test_stdout_as_text(xy_dataset_session): + """Check that simple text is parsed and has no whitespace""" + tools = Tools(session=xy_dataset_session) + assert tools.g_mapset(flags="p").text == "PERMANENT" + + +def test_stdout_as_space_items(xy_dataset_session): + """Check that whitespace-separated items are parsed""" + tools = Tools(session=xy_dataset_session) + assert tools.g_mapset(flags="l").space_items == ["PERMANENT"] + + +def test_stdout_split_whitespace(xy_dataset_session): + """Check that whitespace-based split function works""" + tools = Tools(session=xy_dataset_session) + assert tools.g_mapset(flags="l").text_split() == ["PERMANENT"] + + +def test_stdout_split_space(xy_dataset_session): + """Check that the split function works with space""" + tools = Tools(session=xy_dataset_session) + # Not a good example usage, but it tests the functionality. + assert tools.g_mapset(flags="l").text_split(" ") == ["PERMANENT"] + + +def test_stdout_comma_items(xy_dataset_session): + """Check that the comma-separated split works""" + tools = Tools(session=xy_dataset_session) + result = tools.g_gisenv( + get="GISDBASE,LOCATION_NAME,MAPSET", sep="comma" + ).comma_items + assert len(result) == 3 + for item in result: + assert item + + +def test_stdout_without_capturing(xy_dataset_session): + """Check that stdout and stderr are not present when not capturing it""" + tools = Tools(session=xy_dataset_session, capture_output=False) + result = tools.g_mapset(flags="p") + assert not result.stdout + assert result.stdout is None + assert not result.stderr + assert result.stderr is None + + +def test_attributes_without_stdout(xy_dataset_session): + """Check that attributes behave as expected when not capturing stdout""" + tools = Tools(session=xy_dataset_session, capture_output=False) + result = tools.run_cmd(["g.region", "format=json", "-p"]) + assert not result.text + assert result.text is None + # We pretend like we want the value. That makes for more natural syntax + # for the test and also makes code analysis happy. + value = None + with pytest.raises(ValueError, match="No text output"): + value = result.json + with pytest.raises(ValueError, match="No text output"): + value = result["cols"] + assert not result.keyval + with pytest.raises(KeyError): + value = result.keyval["cols"] + assert value is None + + +def test_capturing_stdout_and_stderr(xy_dataset_session): + """Check that both stdout and stdin are present when capturing them""" + tools = Tools(session=xy_dataset_session, capture_output=True, errors="ignore") + result = tools.g_mapset(flags="l") + assert result.stdout + assert result.stdout is not None + assert "PERMANENT" in result.stdout + result = tools.g_mapset(mapset="does_not_exist") + assert result.stderr + assert result.stderr is not None + assert "does_not_exist" in result.stderr + + +def test_capturing_stdout_without_stderr(xy_dataset_session): + """Check that only stdout is present when stderr is not captured""" + tools = Tools( + session=xy_dataset_session, + capture_output=True, + capture_stderr=False, + errors="ignore", + ) + result = tools.g_mapset(flags="l") + assert result.stdout + assert result.stdout is not None + assert "PERMANENT" in result.stdout + result = tools.g_mapset(mapset="does_not_exist") + assert not result.stderr + assert result.stderr is None + + +def test_capturing_stderr_without_stdout(xy_dataset_session): + """Check that only stderr is present when stdout is not captured""" + tools = Tools( + session=xy_dataset_session, + capture_output=False, + capture_stderr=True, + errors="ignore", + ) + result = tools.g_mapset(flags="p") + assert not result.stdout + assert result.stdout is None + result = tools.g_mapset(mapset="does_not_exist") + assert result.stderr + assert result.stderr is not None + assert "does_not_exist" in result.stderr + + +def test_capturing_stderr_for_exception_without_stdout(xy_dataset_session): + """Check that stderr is part of the exception message when not capturing stdout""" + tools = Tools(session=xy_dataset_session, capture_output=False, capture_stderr=True) + # Testing the actual English message here because that is really generated by the + # tool itself rather than the message containing the tool parameters. + with pytest.raises(CalledModuleError, match="does not exist"): + tools.g_mapset(mapset="does_not_exist") + + +def test_raises(xy_dataset_session): + """Test that exception is raised for wrong parameter value""" + tools = Tools(session=xy_dataset_session) + wrong_name = "does_not_exist" + with pytest.raises(CalledModuleError, match=wrong_name): + tools.r_slope_aspect( + elevation=wrong_name, + slope="slope", + ) + + +def test_error_handler_default(xy_dataset_session): + """Check that default error handler works as expected""" + tools = Tools(session=xy_dataset_session) + with pytest.raises(CalledModuleError, match="does_not_exist"): + tools.g_mapset(mapset="does_not_exist") + # Works after errors as usual. + assert tools.g_mapset(flags="p").text == "PERMANENT" + + +def test_error_handler_none(xy_dataset_session): + """Check that explicit None error handler works as expected""" + tools = Tools(session=xy_dataset_session, errors=None) + with pytest.raises(CalledModuleError, match="does_not_exist"): + tools.g_mapset(mapset="does_not_exist") + # Works after errors as usual. + assert tools.g_mapset(flags="p").text == "PERMANENT" + + +def test_error_handler_raise(xy_dataset_session): + """Check that raise error handler raises an exception""" + tools = Tools(session=xy_dataset_session, errors="raise") + with pytest.raises(CalledModuleError, match="does_not_exist"): + tools.g_mapset(mapset="does_not_exist") + # Works after errors as usual. + assert tools.g_mapset(flags="p").text == "PERMANENT" + + +def test_error_handler_ignore(xy_dataset_session): + """Check that ignore error handler does not raise and terminate""" + tools = Tools(session=xy_dataset_session, errors="ignore") + tools.g_mapset(mapset="does_not_exist") + tools.g_region(raster="does_not_exist") + # Works after errors as usual. + assert tools.g_mapset(flags="p").text == "PERMANENT" + + +def test_error_handler_check_returncode(xy_dataset_session): + """Check that ignore error handler allows to check return code""" + tools = Tools(session=xy_dataset_session, errors="ignore") + assert tools.g_mapset(mapset="does_not_exist").returncode == 1 + assert tools.g_region(raster="does_not_exist").returncode == 1 + # Works after errors as usual with return code. + assert tools.g_mapset(flags="p").returncode == 0 + + +def test_error_handler_fatal(xy_dataset_session): + """Check that fatal error handler exits""" + tools = Tools(session=xy_dataset_session, errors="fatal") + with pytest.raises(SystemExit): + assert tools.g_mapset(mapset="does_not_exist") + + +def test_error_handler_exit(xy_dataset_session): + """Check that exit error handler exits""" + tools = Tools(session=xy_dataset_session, errors="exit") + with pytest.raises(SystemExit): + assert tools.g_mapset(mapset="does_not_exist") + + +def test_verbose(xy_dataset_session): + """Check that verbose message is generated""" + tools = Tools(session=xy_dataset_session, verbose=True) + tools.g_mapset(mapset="test1", flags="c") + tools.g_mapset(mapset="test2", flags="c") + result = tools.g_mapsets(mapset="test1") + # This depends on the specific user messages being produced. + # (Assumes name of the mapset is in a verbose message.) + assert "test1" in result.stderr + + +def test_quiet(xy_dataset_session): + """Check that quiet configuration works""" + tools = Tools(session=xy_dataset_session, quiet=True) + result = tools.g_mapsets(mapset="PERMANENT") + # Expecting an important message about no modification. + assert result.stderr + + +def test_superquiet(xy_dataset_session): + """Check that superquiet configuration does not generate any messages""" + tools = Tools(session=xy_dataset_session, superquiet=True) + result = tools.g_mapsets(mapset="PERMANENT") + assert not result.stderr + + +def test_superquiet_verbose(xy_dataset_session): + """Check that text function-level verbose overrides object-level superquiet""" + tools = Tools(session=xy_dataset_session, superquiet=True) + result = tools.g_mapsets(mapset="PERMANENT", verbose=True) + assert result.stderr + + +def test_verbose_superquiet(xy_dataset_session): + """Check that text function-level superquiet overrides object-level verbose""" + tools = Tools(session=xy_dataset_session, verbose=True) + result = tools.g_mapsets(mapset="PERMANENT", superquiet=True) + assert not result.stderr + + +def test_superquiet_false(xy_dataset_session): + """Check that text object-level superquiet False overrides global superquiet""" + xy_dataset_session.env["GRASS_VERBOSE"] = "0" + tools = Tools(session=xy_dataset_session, superquiet=False) + result = tools.g_mapsets(mapset="PERMANENT") + assert result.stderr + assert "GRASS_VERBOSE" in xy_dataset_session.env + assert xy_dataset_session.env["GRASS_VERBOSE"] == "0" + assert "GRASS_VERBOSE" not in tools._modified_env_if_needed(), ( + f"GRASS_VERBOSE has value {tools._modified_env_if_needed()['GRASS_VERBOSE']}" + ) # pylint: disable=protected-access + + +def test_verbose_false(xy_dataset_session): + """Check that text object-level verbose False overrides global verbose""" + xy_dataset_session.env["GRASS_VERBOSE"] = "3" + tools = Tools(session=xy_dataset_session, verbose=False) + tools.g_mapset(mapset="test1", flags="c") + tools.g_mapset(mapset="test2", flags="c") + result = tools.g_mapsets(mapset="test1") + assert not result.stderr + assert "GRASS_VERBOSE" in xy_dataset_session.env + assert xy_dataset_session.env["GRASS_VERBOSE"] == "3" + assert "GRASS_VERBOSE" not in tools._modified_env_if_needed(), ( + f"GRASS_VERBOSE has value {tools._modified_env_if_needed()['GRASS_VERBOSE']}" + ) # pylint: disable=protected-access + + +def test_direct_overwrite(xy_dataset_session): + """Check overwrite as a function parameter""" + assert "GRASS_OVERWRITE" not in xy_dataset_session.env + tools = Tools(session=xy_dataset_session) + tools.r_random_surface(output="surface", seed=42) + tools.r_random_surface(output="surface", seed=42, overwrite=True) + + +def test_object_overwrite(xy_dataset_session): + """Check overwrite as parameter of the tools object""" + assert "GRASS_OVERWRITE" not in xy_dataset_session.env + tools = Tools(session=xy_dataset_session, overwrite=True) + tools.r_random_surface(output="surface", seed=42) + tools.r_random_surface(output="surface", seed=42) + + +def test_no_overwrite(xy_dataset_session): + """Check that it fails without overwrite""" + assert "GRASS_OVERWRITE" not in xy_dataset_session.env + tools = Tools(session=xy_dataset_session) + tools.r_random_surface(output="surface", seed=42) + with pytest.raises(CalledModuleError, match="overwrite"): + tools.r_random_surface(output="surface", seed=42) + + +def test_overwrite_true_in_call(xy_dataset_session): + """Check explicitly disabled overwrite, but working tool-level overwrite""" + assert "GRASS_OVERWRITE" not in xy_dataset_session.env + tools = Tools(session=xy_dataset_session, overwrite=False) + tools.r_random_surface(output="surface", seed=42) + tools.r_random_surface(output="surface", seed=42, overwrite=True) + with pytest.raises(CalledModuleError, match="overwrite"): + tools.r_random_surface(output="surface", seed=42) + + +def test_overwrite_false_in_call(xy_dataset_session): + """Individual overwrite is not used when global is True and individual is False + + Not ideal behavior, but we simply test for the current behavior. + """ + assert "GRASS_OVERWRITE" not in xy_dataset_session.env + tools = Tools(session=xy_dataset_session, overwrite=True) + tools.r_random_surface(output="surface", seed=42) + tools.r_random_surface(output="surface", seed=42, overwrite=False) + + +def test_env_overwrite(xy_dataset_session): + """Check that overwrite from env parameter is used""" + assert "GRASS_OVERWRITE" not in xy_dataset_session.env + env = xy_dataset_session.env.copy() + env["GRASS_OVERWRITE"] = "1" + tools = Tools(env=env) + tools.r_random_surface(output="surface", seed=42) + tools.r_random_surface(output="surface", seed=42) + + +def test_session_overwrite(xy_dataset_session): + """Check that overwrite from session env attribute is used""" + assert "GRASS_OVERWRITE" not in xy_dataset_session.env + xy_dataset_session.env["GRASS_OVERWRITE"] = "1" + tools = Tools(session=xy_dataset_session) + tools.r_random_surface(output="surface", seed=42) + tools.r_random_surface(output="surface", seed=42) + + +def test_parent_overwrite_vs_env(xy_dataset_session): + """Check that parent overwrite is not used when separate env is used""" + assert "GRASS_OVERWRITE" not in xy_dataset_session.env + # Set up everything head of time. + env = xy_dataset_session.env.copy() + xy_dataset_session.env["GRASS_OVERWRITE"] = "1" + # Only after the setup, create tools. + tools = Tools(session=xy_dataset_session, env=env) + tools.r_random_surface(output="surface", seed=42) + with pytest.raises(CalledModuleError, match="overwrite"): + tools.r_random_surface(output="surface", seed=42) + + +def test_parent_overwrite_vs_object_init(xy_dataset_session): + """Check that parent overwrite is not used when parameter is used""" + assert "GRASS_OVERWRITE" not in xy_dataset_session.env + tools = Tools(session=xy_dataset_session, overwrite=False) + xy_dataset_session.env["GRASS_OVERWRITE"] = "1" + tools.r_random_surface(output="surface", seed=42) + with pytest.raises(CalledModuleError, match="overwrite"): + tools.r_random_surface(output="surface", seed=42) + + +def test_parent_overwrite_vs_call(xy_dataset_session): + """Check that parent overwrite is not used when function parameter is used""" + assert "GRASS_OVERWRITE" not in xy_dataset_session.env + tools = Tools(session=xy_dataset_session) + xy_dataset_session.env["GRASS_OVERWRITE"] = "0" + tools.r_random_surface(output="surface", seed=42) + tools.r_random_surface(output="surface", seed=42, overwrite=True) + + +def test_parent_overwrite_after_call(xy_dataset_session): + """Check that parent overwrite is used when using the same env""" + assert "GRASS_OVERWRITE" not in xy_dataset_session.env + tools = Tools(session=xy_dataset_session) + tools.r_random_surface(output="surface", seed=42) + xy_dataset_session.env["GRASS_OVERWRITE"] = "1" + tools.r_random_surface(output="surface", seed=42) + + +def test_migration_from_run_command_family(xy_dataset_session): + """Check example usages corresponding to run_command family calls + + We also check the run_command family calls just to be sure they are + actually correct. + """ + # run_command + # original: + gs.run_command( + "r.random.surface", output="surface", seed=42, env=xy_dataset_session.env + ) + # replacement: + tools = Tools(env=xy_dataset_session.env) # same for one or multiple calls + tools.run("r.random.surface", output="surface2", seed=42) # name as a string + tools.r_random_surface(output="surface3", seed=42) # name as a function + + # write_command + # original: + gs.write_command( + "v.in.ascii", + input="-", + output="point1", + separator=",", + env=xy_dataset_session.env, + stdin="13.45,29.96,200\n", + ) + # replacement: + tools.run( + "v.in.ascii", + input=io.StringIO("13.45,29.96,200\n"), + output="point2", + separator=",", + ) + tools.v_in_ascii( + input=io.StringIO("13.45,29.96,200\n"), + output="point3", + separator=",", + ) + + # read_command + # original: + assert ( + gs.read_command("g.region", flags="c", env=xy_dataset_session.env) + == "center easting: 0.500000\ncenter northing: 0.500000\n" + ) + # replacement: + assert ( + tools.run("g.region", flags="c").stdout + == "center easting: 0.500000\ncenter northing: 0.500000\n" + ) + assert ( + tools.g_region(flags="c").text + == "center easting: 0.500000\ncenter northing: 0.500000" + ) + + # parse_command + # original (numbers are strings): + assert gs.parse_command( + "g.region", flags="c", format="shell", env=xy_dataset_session.env + ) == { + "center_easting": "0.500000", + "center_northing": "0.500000", + } + assert gs.parse_command( + "g.region", flags="c", format="json", env=xy_dataset_session.env + ) == { + "center_easting": 0.5, + "center_northing": 0.5, + } + # replacement with format=shell (numbers are not strings, but actual numbers as in JSON + # if they convert to Python int or float): + assert tools.run("g.region", flags="c", format="shell").keyval == { + "center_easting": 0.5, + "center_northing": 0.5, + } + # parse_command with JSON and the function call syntax: + assert tools.g_region(flags="c", format="json").json == { + "center_easting": 0.5, + "center_northing": 0.5, + } + + # parse_command storing JSON output in a variable and accessing individual values + # original: + data = gs.parse_command( + "g.region", flags="c", format="json", env=xy_dataset_session.env + ) + assert data["center_easting"] == 0.5 + assert data["center_northing"] == 0.5 + # replacement: + data = tools.g_region(flags="c", format="json") + assert data["center_easting"] == 0.5 + assert data["center_northing"] == 0.5 + + # mapcalc wrapper of r.mapcalc + # original: + gs.mapcalc("a = 1", env=xy_dataset_session.env) + # replacement for short expressions: + tools.r_mapcalc(expression="b = 1") + # replacement for long expressions: + tools.r_mapcalc(file=io.StringIO("c = 1")) + + # g.list wrappers + # test data preparation (for comparison of the results): + names = ["a", "b", "c", "surface", "surface2", "surface3"] + # original: + assert gs.list_grouped("raster", env=xy_dataset_session.env)["PERMANENT"] == names + # replacement (using the JSON output of g.list): + assert [ + item["name"] + for item in tools.g_list(type="raster", format="json") + if item["mapset"] == "PERMANENT" + ] == names + # original and replacement (directly comparing the results): + assert gs.list_strings("raster", env=xy_dataset_session.env) == [ + item["fullname"] for item in tools.g_list(type="raster", format="json") + ] + # original and replacement (directly comparing the results): + assert gs.list_pairs("raster", env=xy_dataset_session.env) == [ + (item["name"], item["mapset"]) + for item in tools.g_list(type="raster", format="json") + ] + + # all other wrappers + # Wrappers in grass.script usually parse shell-script style key-value pairs, + # and convert values from strings to numbers, e.g. g.region: + assert gs.region(env=xy_dataset_session.env)["rows"] == 1 + # Conversion is done automatically in Tools and/or with JSON, and the basic tool + # call syntax is more lightweight, so the direct tool call is not that different + # from a wrapper. Direct tool calling also benefits from better defaults (e.g., + # printing more in JSON) and more consistent tool behavior (e.g., tools accepting + # format="json"). So, direct call of g.region to obtain the number of rows: + assert tools.g_region(flags="p", format="json")["rows"] == 1 + + # run_command with returncode + # original: + assert ( + gs.run_command( + "r.mask.status", flags="t", env=xy_dataset_session.env, errors="status" + ) + == 1 + ) + # replacement: + tools_with_returncode = Tools(errors="ignore", env=xy_dataset_session.env) + assert tools_with_returncode.run("r.mask.status", flags="t").returncode == 1 + assert tools_with_returncode.r_mask_status(flags="t").returncode == 1 + + # run_command with overwrite + # original: + gs.run_command( + "r.random.surface", + output="surface", + seed=42, + overwrite=True, + env=xy_dataset_session.env, + ) + # replacement: + tools = Tools(env=xy_dataset_session.env) + tools.r_random_surface(output="surface", seed=42, overwrite=True) + # or with global overwrite: + tools = Tools(overwrite=True, env=xy_dataset_session.env) + tools.r_random_surface(output="surface", seed=42) + + +def test_as_context_manager(xy_dataset_session): + """Test usage of as a context manager""" + with Tools(session=xy_dataset_session) as tools: + tools.r_random_surface(output="surface1", seed=42) + # There is no limit on entering the context manager multiple times. + with tools: + tools.r_random_surface(output="surface2", seed=42) + # Neither there is limit on using it after exiting the context. + tools.r_random_surface(output="surface3", seed=42) + assert [item["name"] for item in tools.g_list(type="raster", format="json")] == [ + "surface1", + "surface2", + "surface3", + ] + + +def test_with_context_managers_explicit_env(tmpdir): + """Test with other context managers while passing the environment to functions""" + project = tmpdir / "project" + gs.create_project(project) + with gs.setup.init(project, env=os.environ.copy()) as session: + tools = Tools() + tools.r_random_surface(output="surface", seed=42, env=session.env) + with TemporaryMapsetSession(env=session.env) as mapset: + tools.r_random_surface(output="surface", seed=42, env=mapset.env) + with gs.MaskManager(env=mapset.env.copy()) as mask: + tools.r_mask(raster="surface", env=mask.env) + assert tools.r_mask_status(format="json", env=mask.env)["present"] + with gs.RegionManager( + rows=100, cols=100, env=mask.env.copy() + ) as region: + assert ( + tools.g_region(flags="p", format="json", env=region.env)["rows"] + == 100 + ) + + +def test_with_context_managers_session_env(tmpdir): + """Test with other context managers while using environment from a session""" + project = tmpdir / "project" + gs.create_project(project) + with ( + gs.setup.init(project, env=os.environ.copy()) as main_session, + TemporaryMapsetSession(env=main_session.env) as mapset_session, + ): + # TemporaryMapsetSession does not modify like the others, but creates its own env. + tools = Tools(session=mapset_session) + tools.r_random_surface(output="surface", seed=42) + with gs.RegionManager(rows=100, cols=100, env=mapset_session.env): + assert tools.g_region(flags="p", format="json")["rows"] == 100 + with gs.MaskManager(env=mapset_session.env): + tools.r_mask(raster="surface") + assert tools.r_mask_status(format="json")["present"] + + +def test_with_context_managers_session_env_one_block(tmpdir): + """Check behavior in a single with statement with other context managers""" + project = tmpdir / "project" + gs.create_project(project) + with ( + gs.setup.init(project, env=os.environ.copy()) as main_session, + TemporaryMapsetSession(env=main_session.env) as mapset_session, + gs.RegionManager(rows=100, cols=100, env=mapset_session.env), + gs.MaskManager(env=mapset_session.env), + Tools(session=mapset_session) as tools, + ): + tools.r_random_surface(output="surface", seed=42) + tools.r_mask(raster="surface") + assert tools.r_mask_status(format="json")["present"] + assert tools.g_region(flags="p", format="json")["rows"] == 100 + + +def test_misspelling(xy_dataset_session): + """Check a misspelled tool name as a function name results in a right suggestion""" + tools = Tools(session=xy_dataset_session) + with pytest.raises(AttributeError, match=r"r_slope_aspect"): + tools.r_sloppy_respect() + + +def test_multiple_suggestions(xy_dataset_session): + """Check a confused tool name results in multiple suggestions""" + tools = Tools(session=xy_dataset_session) + with pytest.raises( + AttributeError, + match=r".*".join(sorted(["v_univar", "v_db_univar", "db_univar"])), + ): + tools.db_v_univar() + + +def test_tool_group_vs_model_name(xy_dataset_session): + """Check a confused tool group name as a function name results in a right suggestion""" + tools = Tools(session=xy_dataset_session) + with pytest.raises(AttributeError, match=r"r_sim_water"): + tools.rSIMWEwater() + + +def test_wrong_attribute(xy_dataset_session): + """Check a wrong attribute results in an exception with its name""" + tools = Tools(session=xy_dataset_session) + with pytest.raises(AttributeError, match="execute_big_command"): + tools.execute_big_command() + + +def test_stdin_as_stringio_object(xy_dataset_session): + """Check that StringIO interface for stdin works""" + tools = Tools(session=xy_dataset_session) + tools.v_edit(map="points", type="point", tool="create") + tools.v_edit( + map="points", + type="point", + tool="add", + input=io.StringIO("P 1 0\n 10 20"), + flags="n", + ) + assert tools.v_info(map="points", format="json")["points"] == 1 + + +def test_tool_attribute_access_c_tools(): + """Check C tool names can be listed without a session""" + tools = Tools() + assert "g_region" in dir(tools) + assert "r_slope_aspect" in dir(tools) + + +def test_tool_attribute_access_python_tools(): + """Check Python tool names can be listed without a session""" + tools = Tools() + assert "g_search_modules" in dir(tools) + assert "r_mask" in dir(tools) + + +def test_tool_doc_access_c_tools(): + """Check C tools have doc strings without a session""" + tools = Tools() + assert tools.g_region.__doc__ + assert tools.r_slope_aspect.__doc__ + + +def test_tool_doc_access_python_tools(): + """Check Python tools have doc strings without a session""" + tools = Tools() + assert tools.g_search_modules.__doc__ + assert tools.r_mask.__doc__ diff --git a/python/grass/tools/tests/grass_tools_tool_function_wrap_test.py b/python/grass/tools/tests/grass_tools_tool_function_wrap_test.py new file mode 100644 index 00000000000..e851c83b13d --- /dev/null +++ b/python/grass/tools/tests/grass_tools_tool_function_wrap_test.py @@ -0,0 +1,71 @@ +"""Tests of grass.tools.support.ToolFunctionResolver""" + +import pytest + +from grass.tools.support import ToolFunctionResolver + + +class CustomException(Exception): + pass + + +def test_get_tool_name(echoing_resolver): + """Check that dotted tool name is resolved""" + assert ( + echoing_resolver.get_tool_name("r_info", exception_type=CustomException) + == "r.info" + ) + + +def test_get_tool_name_exception(echoing_resolver): + """Check that custom exception is raised when tool does not exist""" + with pytest.raises(CustomException, match="does_not_exist"): + assert echoing_resolver.get_tool_name( + "does_not_exist", exception_type=CustomException + ) + + +def test_get_function(echoing_resolver): + """Check that function is called (this makes use of echoing resolver)""" + assert ( + echoing_resolver.get_function("r_info", exception_type=CustomException)() + == "r.info" + ) + + +def test_get_function_exception(echoing_resolver): + """Check that custom exception is raised when tool does not exist""" + with pytest.raises(CustomException, match="does_not_exist"): + assert echoing_resolver.get_function( + "does_not_exist", exception_type=CustomException + ) + + +def test_attribute(echoing_resolver): + """Check that function is called (this makes use of echoing resolver)""" + assert echoing_resolver.r_info() == "r.info" + + +def test_attribute_exception(echoing_resolver): + """Check that attribute error is raised with attribute access""" + with pytest.raises(AttributeError, match="does_not_exist"): + assert echoing_resolver.does_not_exist + + +def test_names(echoing_resolver): + """Check that tool names are present with underscores, not dots""" + assert "r_info" in echoing_resolver.names() + assert "v_info" in echoing_resolver.names() + assert "r.info" not in echoing_resolver.names() + assert "v.info" not in echoing_resolver.names() + + +def test_levenshtein_distance_empty_text(): + empty_text = "" + non_empty_text = "abc" + ToolFunctionResolver.levenshtein_distance(empty_text, non_empty_text) == len( + non_empty_text + ) + ToolFunctionResolver.levenshtein_distance(non_empty_text, empty_text) == len( + non_empty_text + )