diff --git a/python/grass/app/cli.py b/python/grass/app/cli.py index 227c3a53168..4c5e40edfe3 100644 --- a/python/grass/app/cli.py +++ b/python/grass/app/cli.py @@ -20,10 +20,30 @@ import tempfile import os import sys +import subprocess from pathlib import Path + import grass.script as gs from grass.app.data import lock_mapset, unlock_mapset, MapsetLockingException +from grass.experimental.tools import Tools + + +def subcommand_run_tool(args, tool_args: list, help: bool): + command = [args.tool_name, *tool_args] + with tempfile.TemporaryDirectory() as tmp_dir_name: + project_name = "project" + project_path = Path(tmp_dir_name) / project_name + gs.create_project(project_path) + with gs.setup.init(project_path) as session: + if help: + result = subprocess.run(command, env=session.env) + return result.returncode + tools = Tools(capture_output=False) + try: + tools.run_from_list(command) + except subprocess.CalledProcessError as error: + return error.returncode def subcommand_lock_mapset(args): @@ -77,6 +97,10 @@ def main(args=None, program=None): # Subcommand parsers + subparser = subparsers.add_parser("run", help="run a tool") + subparser.add_argument("tool_name", type=str) + subparser.set_defaults(func=subcommand_run_tool) + subparser = subparsers.add_parser("lock", help="lock a mapset") subparser.add_argument("mapset_path", type=str) subparser.add_argument( @@ -120,5 +144,27 @@ def main(args=None, program=None): subparser.add_argument("page", type=str) subparser.set_defaults(func=subcommand_show_man) - parsed_args = parser.parse_args(args) + # Parsing + + if not args: + args = sys.argv[1:] + raw_args = args.copy() + add_back = None + if len(raw_args) > 2 and raw_args[0] == "run": + # Getting the --help of tools needs to work around the standard help mechanism + # of argparse. + # Maybe a better workaround is to use custom --help, action="help", print_help, + # and dedicated tool help function complimentary with g.manual subcommand + # interface. + if "--help" in raw_args[2:]: + raw_args.remove("--help") + add_back = "--help" + elif "--h" in raw_args[2:]: + raw_args.remove("--h") + add_back = "--h" + parsed_args, other_args = parser.parse_known_args(raw_args) + if parsed_args.subcommand == "run": + if add_back: + other_args.append(add_back) + return parsed_args.func(parsed_args, other_args, help=bool(add_back)) return parsed_args.func(parsed_args) diff --git a/python/grass/experimental/Makefile b/python/grass/experimental/Makefile index 46525e3e91a..7b2c0e7d0b1 100644 --- a/python/grass/experimental/Makefile +++ b/python/grass/experimental/Makefile @@ -7,7 +7,8 @@ DSTDIR = $(ETC)/python/grass/experimental MODULES = \ create \ - mapset + mapset \ + tools PYFILES := $(patsubst %,$(DSTDIR)/%.py,$(MODULES) __init__) PYCFILES := $(patsubst %,$(DSTDIR)/%.pyc,$(MODULES) __init__) diff --git a/python/grass/experimental/tests/conftest.py b/python/grass/experimental/tests/conftest.py index 1dbbc95fcd7..b33ae757f8e 100644 --- a/python/grass/experimental/tests/conftest.py +++ b/python/grass/experimental/tests/conftest.py @@ -33,6 +33,14 @@ def xy_session_for_module(tmp_path_factory): yield session +@pytest.fixture +def xy_dataset_session(tmp_path): + """Creates a session with a mapset which has vector with a float column""" + gs.core._create_location_xy(tmp_path, "test") # pylint: disable=protected-access + with gs.setup.init(tmp_path / "test") as session: + yield session + + @pytest.fixture def unique_id(): """A unique alphanumeric identifier""" diff --git a/python/grass/experimental/tests/grass_tools_test.py b/python/grass/experimental/tests/grass_tools_test.py new file mode 100644 index 00000000000..367f160ca26 --- /dev/null +++ b/python/grass/experimental/tests/grass_tools_test.py @@ -0,0 +1,411 @@ +"""Test grass.experimental.Tools class""" + +import os +import io + +import numpy as np +import pytest + +import grass.script as gs +from grass.experimental.mapset import TemporaryMapsetSession +from grass.experimental.tools import Tools +from grass.exceptions import CalledModuleError + + +def test_key_value_parser_number(xy_dataset_session): + """Check that numbers are parsed as numbers""" + tools = Tools(session=xy_dataset_session) + assert tools.g_region(flags="g").keyval["nsres"] == 1 + + +@pytest.mark.xfail +def test_key_value_parser_multiple_values(xy_dataset_session): + """Check that strings and floats are parsed""" + tools = Tools(session=xy_dataset_session) + name = "surface" + tools.r_surf_gauss(output=name) # needs seed + result = tools.r_info(map=name, flags="g").keyval + assert result["datatype"] == "DCELL" + assert result["nsres"] == 1 + result = tools.r_univar(map=name, flags="g").keyval + assert result["mean"] == pytest.approx(-0.756762744552762) + + +def test_json_parser(xy_dataset_session): + """Check that JSON is parsed""" + tools = Tools(session=xy_dataset_session) + assert ( + tools.g_search_modules(keyword="random", flags="j").json[0]["name"] + == "r.random" + ) + + +def test_json_with_name_and_parameter_call(xy_dataset_session): + """Check that JSON is parsed with a name-and-parameters style call""" + tools = Tools(session=xy_dataset_session) + assert ( + tools.run("g.search.modules", keyword="random", flags="j")[0]["name"] + == "r.random" + ) + + +def test_json_with_subprocess_run_like_call(xy_dataset_session): + """Check that JSON is parsed with a name-and-parameters style call""" + tools = Tools(session=xy_dataset_session) + assert ( + tools.run_from_list(["g.search.modules", "keyword=random", "-j"])[0]["name"] + == "r.random" + ) + + +def test_json_direct_access(xy_dataset_session): + """Check that JSON is parsed""" + tools = Tools(session=xy_dataset_session) + assert tools.g_search_modules(keyword="random", flags="j")[0]["name"] == "r.random" + + +def test_json_direct_access_bad_key_type(xy_dataset_session): + """Check that JSON is parsed""" + tools = Tools(session=xy_dataset_session) + with pytest.raises(TypeError): + tools.g_search_modules(keyword="random", flags="j")["name"] + + +def test_json_direct_access_bad_key_value(xy_dataset_session): + """Check that JSON is parsed""" + tools = Tools(session=xy_dataset_session) + high_number = 100_000_000 + with pytest.raises(IndexError): + tools.g_search_modules(keyword="random", flags="j")[high_number] + + +def test_json_direct_access_not_json(xy_dataset_session): + """Check that JSON parsing creates an ValueError + + Specifically, this tests the case when format="json" is not set. + """ + tools = Tools(session=xy_dataset_session) + with pytest.raises(ValueError, match=r"format.*json"): + tools.g_search_modules(keyword="random")[0]["name"] + + +def test_stdout_as_text(xy_dataset_session): + """Check that simple text is parsed and has no whitespace""" + tools = Tools(session=xy_dataset_session) + assert tools.g_mapset(flags="p").text == "PERMANENT" + + +def test_stdout_as_space_items(xy_dataset_session): + """Check that whitespace-separated items are parsed""" + tools = Tools(session=xy_dataset_session) + assert tools.g_mapset(flags="l").space_items == ["PERMANENT"] + + +def test_stdout_split_whitespace(xy_dataset_session): + """Check that whitespace-based split function works""" + tools = Tools(session=xy_dataset_session) + assert tools.g_mapset(flags="l").text_split() == ["PERMANENT"] + + +def test_stdout_split_space(xy_dataset_session): + """Check that the split function works with space""" + tools = Tools(session=xy_dataset_session) + # Not a good example usage, but it tests the functionality. + assert tools.g_mapset(flags="l").text_split(" ") == ["PERMANENT", ""] + + +def test_stdout_without_capturing(xy_dataset_session): + """Check that text is not present when not capturing it""" + tools = Tools(session=xy_dataset_session, capture_output=False) + assert not tools.g_mapset(flags="p").text + assert tools.g_mapset(flags="p").text is None + + +def test_direct_overwrite(xy_dataset_session): + """Check overwrite as a parameter""" + tools = Tools(session=xy_dataset_session) + tools.r_random_surface(output="surface", seed=42) + tools.r_random_surface(output="surface", seed=42, overwrite=True) + + +def test_object_overwrite(xy_dataset_session): + """Check overwrite as parameter of the tools object""" + tools = Tools(session=xy_dataset_session, overwrite=True) + tools.r_random_surface(output="surface", seed=42) + tools.r_random_surface(output="surface", seed=42) + + +def test_no_overwrite(xy_dataset_session): + """Check that it fails without overwrite""" + tools = Tools(session=xy_dataset_session) + tools.r_random_surface(output="surface", seed=42) + with pytest.raises(CalledModuleError, match="overwrite"): + tools.r_random_surface(output="surface", seed=42) + + +def test_env_overwrite(xy_dataset_session): + """Check that overwrite from env parameter is used""" + # env = xy_dataset_session.env.copy() # ideally + env = os.environ.copy() # for now + env["GRASS_OVERWRITE"] = "1" + tools = Tools(session=xy_dataset_session, env=env) + tools.r_random_surface(output="surface", seed=42) + tools.r_random_surface(output="surface", seed=42) + + +def test_global_overwrite_vs_env(xy_dataset_session): + """Check that global overwrite is not used when separate env is used""" + # env = xy_dataset_session.env.copy() # ideally + env = os.environ.copy() # for now + os.environ["GRASS_OVERWRITE"] = "1" # change to xy_dataset_session.env + tools = Tools(session=xy_dataset_session, env=env) + tools.r_random_surface(output="surface", seed=42) + with pytest.raises(CalledModuleError, match="overwrite"): + tools.r_random_surface(output="surface", seed=42) + del os.environ["GRASS_OVERWRITE"] # check or ideally remove this + + +def test_global_overwrite_vs_init(xy_dataset_session): + """Check that global overwrite is not used when separate env is used""" + tools = Tools(session=xy_dataset_session) + os.environ["GRASS_OVERWRITE"] = "1" # change to xy_dataset_session.env + tools.r_random_surface(output="surface", seed=42) + with pytest.raises(CalledModuleError, match="overwrite"): + tools.r_random_surface(output="surface", seed=42) + del os.environ["GRASS_OVERWRITE"] # check or ideally remove this + + +def test_stdin(xy_dataset_session): + """Test that stdin is accepted""" + tools = Tools(session=xy_dataset_session) + tools.feed_input_to("13.45,29.96,200").v_in_ascii( + input="-", output="point", separator="," + ) + + +def test_raises(xy_dataset_session): + """Test that exception is raised for wrong parameter value""" + tools = Tools(session=xy_dataset_session) + wrong_name = "wrong_standard" + with pytest.raises(CalledModuleError, match=wrong_name): + tools.feed_input_to("13.45,29.96,200").v_in_ascii( + input="-", + output="point", + format=wrong_name, + ) + + +def test_run_command(xy_dataset_session): + """Check run_command and its overwrite parameter""" + tools = Tools(session=xy_dataset_session) + tools.run_command("r.random.surface", output="surface", seed=42) + tools.run_command("r.random.surface", output="surface", seed=42, overwrite=True) + + +def test_parse_command_key_value(xy_dataset_session): + tools = Tools(session=xy_dataset_session) + assert tools.parse_command("g.region", flags="g")["nsres"] == "1" + + +def test_parse_command_json(xy_dataset_session): + tools = Tools(session=xy_dataset_session) + assert ( + tools.parse_command("g.region", flags="g", format="json")["region"]["ns-res"] + == 1 + ) + + +def test_with_context_managers(tmpdir): + project = tmpdir / "project" + gs.create_project(project) + with gs.setup.init(project) as session: + tools = Tools(session=session) + tools.r_random_surface(output="surface", seed=42) + with TemporaryMapsetSession(env=tools.env) as mapset: + tools.r_random_surface(output="surface", seed=42, env=mapset.env) + with gs.MaskManager(env=mapset.env) as mask: + # TODO: Do actual test + tools.r_univar(map="surface", env=mask.env, format="json")["mean"] + + +def test_misspelling(xy_dataset_session): + tools = Tools(session=xy_dataset_session) + with pytest.raises(AttributeError, match=r"r\.slope\.aspect"): + tools.r_sloppy_respect() + + +def test_multiple_suggestions(xy_dataset_session): + tools = Tools(session=xy_dataset_session) + with pytest.raises(AttributeError, match=r"v\.db\.univar|db\.univar"): + tools.db_v_uni_var() + + +def test_tool_group_vs_model_name(xy_dataset_session): + tools = Tools(session=xy_dataset_session) + with pytest.raises(AttributeError, match=r"r\.sim\.water"): + tools.rSIMWEwater() + + +def test_wrong_attribute(xy_dataset_session): + tools = Tools(session=xy_dataset_session) + with pytest.raises(AttributeError, match="execute_big_command"): + tools.execute_big_command() + + +def test_numpy_one_input(xy_dataset_session): + """Check that global overwrite is not used when separate env is used""" + tools = Tools(session=xy_dataset_session) + tools.r_slope_aspect(elevation=np.ones((1, 1)), slope="slope") + assert tools.r_info(map="slope", format="json")["datatype"] == "FCELL" + + +# NumPy syntax for outputs +# While inputs are straightforward, there is several possible ways how to handle +# syntax for outputs. +# Output is the type of function for creating NumPy arrays, return value is now the arrays: +# tools.r_slope_aspect(elevation=np.ones((1, 1)), slope=np.ndarray, aspect=np.array) +# Output is explicitly requested: +# tools.r_slope_aspect(elevation=np.ones((1, 1)), slope="slope", aspect="aspect", force_numpy_for_output=True) +# Output is explicitly requested at the object level: +# Tools(force_numpy_for_output=True).r_slope_aspect(elevation=np.ones((1, 1)), slope="slope", aspect="aspect") +# Output is always array or arrays when at least on input is an array: +# tools.r_slope_aspect(elevation=np.ones((1, 1)), slope="slope", aspect="aspect") +# An empty array is passed to signal the desired output: +# tools.r_slope_aspect(elevation=np.ones((1, 1)), slope=np.nulls((0, 0))) +# An array to be filled with data is passed, the return value is kept as is: +# tools.r_slope_aspect(elevation=np.ones((1, 1)), slope=np.nulls((1, 1))) +# NumPy universal function concept can be used explicitly to indicate, +# possibly more easily allowing for nameless args as opposed to keyword arguments, +# but outputs still need to be explicitly requested: +# Returns by value (tuple: (np.array, np.array)): +# tools.r_slope_aspect.ufunc(np.ones((1, 1)), slope=True, aspect=True) +# Modifies its arguments in-place: +# tools.r_slope_aspect.ufunc(np.ones((1, 1)), slope=True, aspect=True, out=(np.array((1, 1)), np.array((1, 1)))) +# Custom signaling classes or objects are passed (assuming empty classes AsNumpy and AsInput): +# tools.r_slope_aspect(elevation=np.ones((1, 1)), slope=ToNumpy(), aspect=ToNumpy()) +# tools.r_slope_aspect(elevation=np.ones((1, 1)), slope=AsInput, aspect=AsInput) +# NumPy functions usually return a tuple, for multiple outputs. Universal function does +# unless the output is written to out parameter which is also provided as a tuple. We +# have names, so generally, we can return a dictionary: +# {"slope": np.array(...), "aspect": np.array(...) }. + + +def test_numpy_one_input_one_output(xy_dataset_session): + """Check that a NumPy array works as input and for signaling output + + It tests that the np.ndarray class is supported to signal output. + Return type is not strictly defined, so we are not testing for it explicitly + (only by actually using it as an NumPy array). + """ + tools = Tools(session=xy_dataset_session) + tools.g_region(rows=2, cols=3) + slope = tools.r_slope_aspect(elevation=np.ones((2, 3)), slope=np.ndarray) + assert slope.shape == (2, 3) + assert np.all(slope == np.full((2, 3), 0)) + + +def test_numpy_with_name_and_parameter(xy_dataset_session): + """Check that a NumPy array works as input and for signaling output + + It tests that the np.ndarray class is supported to signal output. + Return type is not strictly defined, so we are not testing for it explicitly + (only by actually using it as an NumPy array). + """ + tools = Tools(session=xy_dataset_session) + tools.g_region(rows=2, cols=3) + slope = tools.run("r.slope.aspect", elevation=np.ones((2, 3)), slope=np.ndarray) + assert slope.shape == (2, 3) + assert np.all(slope == np.full((2, 3), 0)) + + +def test_numpy_one_input_multiple_outputs(xy_dataset_session): + """Check that a NumPy array function works for signaling multiple outputs + + Besides multiple outputs it tests that np.array is supported to signal output. + """ + tools = Tools(session=xy_dataset_session) + tools.g_region(rows=2, cols=3) + (slope, aspect) = tools.r_slope_aspect( + elevation=np.ones((2, 3)), slope=np.array, aspect=np.array + ) + assert slope.shape == (2, 3) + assert np.all(slope == np.full((2, 3), 0)) + assert aspect.shape == (2, 3) + assert np.all(aspect == np.full((2, 3), 0)) + + +def test_numpy_multiple_inputs_one_output(xy_dataset_session): + """Check that a NumPy array works for multiple inputs""" + tools = Tools(session=xy_dataset_session) + tools.g_region(rows=2, cols=3) + result = tools.r_mapcalc_simple( + expression="A + B", a=np.full((2, 3), 2), b=np.full((2, 3), 5), output=np.array + ) + assert result.shape == (2, 3) + assert np.all(result == np.full((2, 3), 7)) + + +def test_numpy_grass_array_input_output(xy_dataset_session): + """Check that global overwrite is not used when separate env is used + + When grass array output is requested, we explicitly test the return value type. + """ + tools = Tools(session=xy_dataset_session) + rows = 2 + cols = 3 + tools.g_region(rows=rows, cols=cols) + tools.r_mapcalc_simple(expression="5", output="const_5") + const_5 = gs.array.array("const_5") + result = tools.r_mapcalc_simple( + expression="2 * A", a=const_5, output=gs.array.array + ) + assert result.shape == (rows, cols) + assert np.all(result == np.full((rows, cols), 10)) + assert isinstance(result, gs.array.array) + + +def test_tool_groups_raster(xy_dataset_session): + """Check that global overwrite is not used when separate env is used""" + raster = Tools(session=xy_dataset_session, prefix="r") + raster.mapcalc(expression="streams = if(row() > 1, 1, null())") + raster.buffer(input="streams", output="buffer", distance=1) + assert raster.info(map="streams", format="json")["datatype"] == "CELL" + + +def test_tool_groups_vector(xy_dataset_session): + """Check that global overwrite is not used when separate env is used""" + vector = Tools(prefix="v") + vector.edit(map="points", type="point", tool="create", env=xy_dataset_session.env) + # Here, the feed_input_to style does not make sense, but we are not using StringIO + # here to test the feed_input_to functionality and avoid dependence on the StringIO + # functionality. + # The ASCII format is for one point with no categories. + vector.feed_input_to("P 1 0\n 10 20").edit( + map="points", + type="point", + tool="add", + input="-", + flags="n", + env=xy_dataset_session.env, + ) + vector.buffer( + input="points", output="buffer", distance=1, env=xy_dataset_session.env + ) + assert ( + vector.info(map="buffer", format="json", env=xy_dataset_session.env)["areas"] + == 1 + ) + + +def test_stdin_as_stringio_object(xy_dataset_session): + """Check that global overwrite is not used when separate env is used""" + tools = Tools(session=xy_dataset_session) + tools.v_edit(map="points", type="point", tool="create") + tools.v_edit( + map="points", + type="point", + tool="add", + input=io.StringIO("P 1 0\n 10 20"), + flags="n", + ) + assert tools.v_info(map="points", format="json")["points"] == 1 diff --git a/python/grass/experimental/tools.py b/python/grass/experimental/tools.py new file mode 100644 index 00000000000..a59abb00a03 --- /dev/null +++ b/python/grass/experimental/tools.py @@ -0,0 +1,533 @@ +#!/usr/bin/env python + +############################################################################## +# AUTHOR(S): Vaclav Petras +# +# PURPOSE: API to call GRASS tools (modules) as Python functions +# +# COPYRIGHT: (C) 2023-2025 Vaclav Petras and the GRASS Development Team +# +# This program is free software under the GNU General Public +# License (>=v2). Read the file COPYING that comes with GRASS +# for details. +############################################################################## + +"""API to call GRASS tools (modules) as Python functions""" + +import json +import os +import shutil +import subprocess +from io import StringIO + +import numpy as np + +import grass.script as gs +import grass.script.array as garray +from grass.exceptions import CalledModuleError + + +class ObjectParameterHandler: + def __init__(self): + self._numpy_inputs = {} + self._numpy_outputs = {} + self._numpy_inputs_ordered = [] + self.stdin = None + + def process_parameters(self, kwargs): + for key, value in kwargs.items(): + if isinstance(value, np.ndarray): + kwargs[key] = gs.append_uuid("tmp_serialized_input_array") + self._numpy_inputs[key] = value + self._numpy_inputs_ordered.append(value) + elif value in (np.ndarray, np.array, garray.array): + # We test for class or the function. + kwargs[key] = gs.append_uuid("tmp_serialized_output_array") + self._numpy_outputs[key] = value + elif isinstance(value, StringIO): + kwargs[key] = "-" + self.stdin = value.getvalue() + + def translate_objects_to_data(self, kwargs, parameters, env): + if "inputs" in parameters: + for param in parameters["inputs"]: + if param["param"] in self._numpy_inputs: + map2d = garray.array(env=env) + map2d[:] = self._numpy_inputs[param["param"]] + map2d.write(kwargs[param["param"]]) + + def input_rows_columns(self): + if not len(self._numpy_inputs_ordered): + return None + return self._numpy_inputs_ordered[0].shape + + def translate_data_to_objects(self, kwargs, parameters, env): + output_arrays = [] + if "outputs" in parameters: + for param in parameters["outputs"]: + if param["param"] not in self._numpy_outputs: + continue + output_array = garray.array(kwargs[param["param"]], env=env) + output_arrays.append(output_array) + if len(output_arrays) == 1: + self.result = output_arrays[0] + return True + if len(output_arrays) > 1: + self.result = tuple(output_arrays) + return True + self.result = None + return False + + +class ToolFunctionNameHelper: + def __init__(self, *, run_function, env, prefix=None): + self._run_function = run_function + self._env = env + self._prefix = prefix + + # def __getattr__(self, name): + # self.get_function(name, exception_type=AttributeError) + + def get_function(self, name, exception_type): + """Parse attribute to GRASS display module. Attribute should be in + the form 'd_module_name'. For example, 'd.rast' is called with 'd_rast'. + """ + if self._prefix: + name = f"{self._prefix}.{name}" + # Reformat string + tool_name = name.replace("_", ".") + # Assert module exists + if not shutil.which(tool_name, path=self._env["PATH"]): + suggestions = self.suggest_tools(tool_name) + if suggestions: + msg = ( + f"Tool {tool_name} not found. " + f"Did you mean: {', '.join(suggestions)}?" + ) + raise AttributeError(msg) + msg = ( + f"Tool or attribute {name} not found. " + "If you are executing a tool, is the session set up and the tool on path? " + "If you are looking for an attribute, is it in the documentation?" + ) + raise AttributeError(msg) + + def wrapper(**kwargs): + # Run module + return self._run_function(tool_name, **kwargs) + + return wrapper + + @staticmethod + def levenshtein_distance(text1: str, text2: str) -> int: + if len(text1) < len(text2): + return ToolFunctionNameHelper.levenshtein_distance(text2, text1) + + if len(text2) == 0: + return len(text1) + + previous_row = list(range(len(text2) + 1)) + for i, char1 in enumerate(text1): + current_row = [i + 1] + for j, char2 in enumerate(text2): + insertions = previous_row[j + 1] + 1 + deletions = current_row[j] + 1 + substitutions = previous_row[j] + (char1 != char2) + current_row.append(min(insertions, deletions, substitutions)) + previous_row = current_row + + return previous_row[-1] + + @staticmethod + def suggest_tools(tool): + # TODO: cache commands also for dir + all_names = list(gs.get_commands()[0]) + result = [] + max_suggestions = 10 + for name in all_names: + if ToolFunctionNameHelper.levenshtein_distance(tool, name) < len(tool) / 2: + result.append(name) + if len(result) >= max_suggestions: + break + return result + + +class ExecutedTool: + """Result returned after executing a tool""" + + def __init__(self, name, kwargs, stdout, stderr): + self._name = name + self._kwargs = kwargs + self._stdout = stdout + self._stderr = stderr + if self._stdout is not None: + self._decoded_stdout = gs.decode(self._stdout) + else: + self._decoded_stdout = None + self._cached_json = None + + @property + def text(self) -> str: + """Text output as decoded string""" + if self._decoded_stdout is None: + return None + return self._decoded_stdout.strip() + + @property + def json(self): + """Text output read as JSON + + This returns the nested structure of dictionaries and lists or fails when + the output is not JSON. + """ + if self._cached_json is None: + self._cached_json = json.loads(self._stdout) + return self._cached_json + + @property + def keyval(self): + """Text output read as key-value pairs separated by equal signs""" + + def conversion(value): + """Convert text to int or float if possible, otherwise return it as is""" + try: + return int(value) + except ValueError: + pass + try: + return float(value) + except ValueError: + pass + return value + + return gs.parse_key_val(self._stdout, val_type=conversion) + + @property + def comma_items(self): + """Text output read as comma-separated list""" + return self.text_split(",") + + @property + def space_items(self): + """Text output read as whitespace-separated list""" + return self.text_split(None) + + def text_split(self, separator=None): + """Parse text output read as list separated by separators + + Any leading or trailing newlines are removed prior to parsing. + """ + # The use of strip is assuming that the output is one line which + # ends with a newline character which is for display only. + return self._decoded_stdout.strip("\n").split(separator) + + def __getitem__(self, name): + if self._stdout: + # We are testing just std out and letting rest to the parse and the user. + # This makes no assumption about how JSON is produced by the tool. + try: + return self.json[name] + except json.JSONDecodeError as error: + if self._kwargs.get("format") == "json": + raise + msg = ( + f"Output of {self._name} cannot be parsed as JSON. " + 'Did you use format="json"?' + ) + raise ValueError(msg) from error + msg = f"No text output for {self._name} to be parsed as JSON" + raise ValueError(msg) + + +class Tools: + """Call GRASS tools as methods + + GRASS tools (modules) can be executed as methods of this class. + """ + + def __init__( + self, + *, + session=None, + env=None, + overwrite=False, + quiet=False, + verbose=False, + superquiet=False, + freeze_region=False, + stdin=None, + errors=None, + capture_output=True, + prefix=None, + ): + if env: + self._env = env.copy() + elif session and hasattr(session, "env"): + self._env = session.env.copy() + else: + self._env = os.environ.copy() + self._region_is_frozen = False + if freeze_region: + self._freeze_region() + if overwrite: + self._overwrite() + # This hopefully sets the numbers directly. An alternative implementation would + # be to pass the parameter every time. + # Does not check for multiple set at the same time, but the most verbose wins + # for safety. + if superquiet: + self._env["GRASS_VERBOSE"] = "0" + if quiet: + self._env["GRASS_VERBOSE"] = "1" + if verbose: + self._env["GRASS_VERBOSE"] = "3" + self._set_stdin(stdin) + self._errors = errors + self._capture_output = capture_output + self._prefix = prefix + self._name_helper = None + + # These could be public, not protected. + def _freeze_region(self): + self._env["GRASS_REGION"] = gs.region_env(env=self._env) + self._region_is_frozen = True + + def _overwrite(self): + self._env["GRASS_OVERWRITE"] = "1" + + def _set_stdin(self, stdin, /): + self._stdin = stdin + + @property + def env(self): + """Internally used environment (reference to it, not a copy)""" + return self._env + + def _process_parameters(self, command, popen_options): + env = popen_options.get("env", self._env) + + return subprocess.run( + [*command, "--json"], text=True, capture_output=True, env=env + ) + + def run(self, name, /, **kwargs): + """Run modules from the GRASS display family (modules starting with "d."). + + This function passes arguments directly to grass.script.run_command() + so the syntax is the same. + + :param str module: name of GRASS module + :param `**kwargs`: named arguments passed to run_command()""" + + object_parameter_handler = ObjectParameterHandler() + object_parameter_handler.process_parameters(kwargs) + + args, popen_options = gs.popen_args_command(name, **kwargs) + + interface_result = self._process_parameters(args, popen_options) + if interface_result.returncode != 0: + # This is only for the error states. + return gs.handle_errors( + interface_result.returncode, + result=None, + args=[name], + kwargs=kwargs, + stderr=interface_result.stderr, + handler="raise", + ) + parameters = json.loads(interface_result.stdout) + object_parameter_handler.translate_objects_to_data( + kwargs, parameters, env=self._env + ) + + # We approximate tool_kwargs as original kwargs. + result = self.run_from_list( + args, + tool_kwargs=kwargs, + processed_parameters=parameters, + stdin=object_parameter_handler.stdin, + **popen_options, + ) + use_objects = object_parameter_handler.translate_data_to_objects( + kwargs, parameters, env=self._env + ) + if use_objects: + result = object_parameter_handler.result + return result + + def run_from_list( + self, + command, + tool_kwargs=None, + stdin=None, + processed_parameters=None, + **popen_options, + ): + if not processed_parameters: + interface_result = self._process_parameters(command, popen_options) + if interface_result.returncode != 0: + # This is only for the error states. + return gs.handle_errors( + interface_result.returncode, + result=None, + args=[command], + kwargs=tool_kwargs, + stderr=interface_result.stderr, + handler="raise", + ) + processed_parameters = json.loads(interface_result.stdout) + + # We approximate tool_kwargs as original kwargs. + return self.no_nonsense_run_from_list( + command, + tool_kwargs=tool_kwargs, + stdin=stdin, + **popen_options, + ) + + def run_command(self, name, /, **kwargs): + # TODO: Provide custom implementation for full control + return gs.run_command(name, **kwargs, env=self._env) + + def parse_command(self, name, /, **kwargs): + # TODO: Provide custom implementation for full control + return gs.parse_command(name, **kwargs, env=self._env) + + def no_nonsense_run(self, name, /, *, tool_kwargs=None, stdin=None, **kwargs): + args, popen_options = gs.popen_args_command(name, **kwargs) + return self.no_nonsense_run_from_list( + args, tool_kwargs=tool_kwargs, stdin=stdin, **popen_options + ) + + # Make this an overload of run. + def no_nonsense_run_from_list( + self, command, tool_kwargs=None, stdin=None, **popen_options + ): + # alternatively use dev null as default or provide it as convenient settings + if self._capture_output: + stdout_pipe = gs.PIPE + stderr_pipe = gs.PIPE + else: + stdout_pipe = None + stderr_pipe = None + if self._stdin: + stdin_pipe = gs.PIPE + stdin = gs.utils.encode(self._stdin) + elif stdin: + stdin_pipe = gs.PIPE + stdin = gs.utils.encode(stdin) + else: + stdin_pipe = None + stdin = None + # Allowing to overwrite env, but that's just to have maximum flexibility when + # the session is actually set up, but it may be confusing. + if "env" not in popen_options: + popen_options["env"] = self._env + process = gs.Popen( + command, + stdin=stdin_pipe, + stdout=stdout_pipe, + stderr=stderr_pipe, + **popen_options, + ) + stdout, stderr = process.communicate(input=stdin) + if stderr: + stderr = gs.utils.decode(stderr) + returncode = process.poll() + if returncode and self._errors != "ignore": + raise CalledModuleError( + command[0], + code=" ".join(command), + returncode=returncode, + errors=stderr, + ) + # TODO: solve tool_kwargs is None + # We don't have the keyword arguments to pass to the resulting object. + return ExecutedTool( + name=command[0], kwargs=tool_kwargs, stdout=stdout, stderr=stderr + ) + + def feed_input_to(self, stdin, /): + """Get a new object which will feed text input to a tool or tools""" + return Tools( + env=self._env, + stdin=stdin, + freeze_region=self._region_is_frozen, + errors=self._errors, + capture_output=self._capture_output, + prefix=self._prefix, + ) + + def ignore_errors_of(self): + """Get a new object which will ignore errors of the called tools""" + return Tools(env=self._env, errors="ignore") + + def __getattr__(self, name): + """Parse attribute to GRASS display module. Attribute should be in + the form 'd_module_name'. For example, 'd.rast' is called with 'd_rast'. + """ + if not self._name_helper: + self._name_helper = ToolFunctionNameHelper( + run_function=self.run, + env=self.env, + prefix=self._prefix, + ) + return self._name_helper.get_function(name, exception_type=AttributeError) + + +def _test(): + """Ad-hoc tests and examples of the Tools class""" + session = gs.setup.init("~/grassdata/nc_spm_08_grass7/user1") + + tools = Tools() + tools.g_region(raster="elevation") + tools.r_slope_aspect(elevation="elevation", slope="slope", overwrite=True) + print(tools.r_univar(map="slope", flags="g").keyval) + + print(tools.v_info(map="bridges", flags="c").text) + print( + tools.v_db_univar(map="bridges", column="YEAR_BUILT", format="json").json[ + "statistics" + ]["mean"] + ) + + print(tools.g_mapset(flags="p").text) + print(tools.g_mapsets(flags="l").text_split()) + print(tools.g_mapsets(flags="l").space_items) + print(tools.g_gisenv(get="GISDBASE,LOCATION_NAME,MAPSET", sep="comma").comma_items) + + print(tools.g_region(flags="g").keyval) + + env = os.environ.copy() + env["GRASS_REGION"] = gs.region_env(res=250) + coarse_computation = Tools(env=env) + current_region = coarse_computation.g_region(flags="g").keyval + print(current_region["ewres"], current_region["nsres"]) + coarse_computation.r_slope_aspect( + elevation="elevation", slope="slope", flags="a", overwrite=True + ) + print(coarse_computation.r_info(map="slope", flags="g").keyval) + + independent_computation = Tools(session=session, freeze_region=True) + tools.g_region(res=500) # we would do this for another computation elsewhere + print(independent_computation.g_region(flags="g").keyval["ewres"]) + + tools_pro = Tools( + session=session, freeze_region=True, overwrite=True, superquiet=True + ) + tools_pro.r_slope_aspect(elevation="elevation", slope="slope") + tools_pro.feed_input_to("13.45,29.96,200").v_in_ascii( + input="-", output="point", separator="," + ) + print(tools_pro.v_info(map="point", flags="t").keyval["points"]) + + print(tools_pro.ignore_errors_of().g_version(flags="rge").keyval) + + elevation = "elevation" + exaggerated = "exaggerated" + tools_pro.r_mapcalc(expression=f"{exaggerated} = 5 * {elevation}") + tools_pro.feed_input_to(f"{exaggerated} = 5 * {elevation}").r_mapcalc(file="-") + + +if __name__ == "__main__": + _test() diff --git a/python/grass/script/core.py b/python/grass/script/core.py index d114dbeeefc..bd08865c510 100644 --- a/python/grass/script/core.py +++ b/python/grass/script/core.py @@ -313,7 +313,7 @@ def make_command( return args -def handle_errors(returncode, result, args, kwargs): +def handle_errors(returncode, result, args, kwargs, handler=None, stderr=None): """Error handler for :func:`run_command()` and similar functions The functions which are using this function to handle errors, @@ -358,7 +358,8 @@ def get_module_and_code(args, kwargs): code = " ".join(args) return module, code - handler = kwargs.get("errors", "raise") + if handler is None: + handler = kwargs.get("errors", "raise") if handler.lower() == "status": return returncode if returncode == 0: @@ -376,7 +377,9 @@ def get_module_and_code(args, kwargs): sys.exit(returncode) else: module, code = get_module_and_code(args, kwargs) - raise CalledModuleError(module=module, code=code, returncode=returncode) + raise CalledModuleError( + module=module, code=code, returncode=returncode, errors=stderr + ) def popen_args_command(