diff --git a/nless/batch.py b/nless/batch.py
index 80bd2ed..cf73457 100644
--- a/nless/batch.py
+++ b/nless/batch.py
@@ -2,13 +2,13 @@
from __future__ import annotations
-import csv
import json
import re
import signal
import sys
from .dataprocessing import coerce_sort_key, matches_all_filters, strip_markup
+from .operations import write_rows_to_fd
from .delimiter import (
detect_space_splitting_strategy,
find_header_index,
@@ -215,35 +215,8 @@ def col_fn(name, _render=False):
visible_headers = [strip_markup(columns[i].name) for i in visible_indices]
# Write output
- _write_output(cli_args.output_format, visible_headers, data_rows, visible_indices)
-
-
-def _write_output(
- fmt: str,
- headers: list[str],
- rows: list[list[str]],
- visible_indices: list[int],
-) -> None:
- """Write rows to stdout in the requested format."""
- if fmt == "json":
- for row in rows:
- obj = {}
- for i, hi in enumerate(visible_indices):
- val = strip_markup(row[hi]) if hi < len(row) else ""
- obj[headers[i]] = val
- sys.stdout.write(json.dumps(obj) + "\n")
- elif fmt == "tsv":
- writer = csv.writer(sys.stdout, delimiter="\t")
- writer.writerow(headers)
- for row in rows:
- writer.writerow(
- [strip_markup(row[i]) if i < len(row) else "" for i in visible_indices]
- )
- else:
- # csv (default)
- writer = csv.writer(sys.stdout)
- writer.writerow(headers)
- for row in rows:
- writer.writerow(
- [strip_markup(row[i]) if i < len(row) else "" for i in visible_indices]
- )
+ cleaned_rows = [
+ [strip_markup(row[i]) if i < len(row) else "" for i in visible_indices]
+ for row in data_rows
+ ]
+ write_rows_to_fd(visible_headers, cleaned_rows, sys.stdout, cli_args.output_format)
diff --git a/nless/cli.py b/nless/cli.py
index 4833773..fe1d578 100644
--- a/nless/cli.py
+++ b/nless/cli.py
@@ -110,7 +110,7 @@ def parse_args(argv=None) -> CliArgs:
parser.add_argument(
"--output-format",
"-o",
- choices=["csv", "tsv", "json", "raw"],
+ choices=["csv", "tsv", "json", "raw", "markdown", "html"],
help="Output format for pipe/batch output (default: csv)",
default="csv",
)
diff --git a/nless/operations.py b/nless/operations.py
index 8a08c06..b044e02 100644
--- a/nless/operations.py
+++ b/nless/operations.py
@@ -128,6 +128,51 @@ def handle_mark_unique(new_buffer: NlessBuffer, new_unique_column_name: str) ->
new_buffer._pivot_hidden_columns.clear()
+def write_rows_to_fd(
+ headers: list[str], rows: list[list[str]], fd: IO[str], output_format: str = "csv"
+) -> None:
+ """Write pre-cleaned headers and rows to fd in the requested format."""
+ if output_format == "json":
+ for row in rows:
+ fd.write(json.dumps(dict(zip(headers, row))) + "\n")
+ elif output_format == "raw":
+ for row in rows:
+ fd.write("\t".join(row) + "\n")
+ elif output_format == "markdown":
+ col_widths = [len(h) for h in headers]
+ for row in rows:
+ for i, cell in enumerate(row):
+ col_widths[i] = max(col_widths[i], len(cell))
+ sep = "| " + " | ".join("-" * w for w in col_widths) + " |\n"
+ fd.write(
+ "| " + " | ".join(h.ljust(w) for h, w in zip(headers, col_widths)) + " |\n"
+ )
+ fd.write(sep)
+ for row in rows:
+ fd.write(
+ "| "
+ + " | ".join(cell.ljust(col_widths[i]) for i, cell in enumerate(row))
+ + " |\n"
+ )
+ elif output_format == "html":
+ fd.write("
\n")
+ fd.write(
+ "\n"
+ + "".join(f"| {h} | " for h in headers)
+ + "
\n\n"
+ )
+ fd.write("\n")
+ for row in rows:
+ fd.write("" + "".join(f"| {cell} | " for cell in row) + "
\n")
+ fd.write("\n
\n")
+ else:
+ delim = "\t" if output_format == "tsv" else ","
+ writer = csv.writer(fd, delimiter=delim)
+ writer.writerow(headers)
+ for row in rows:
+ writer.writerow(row)
+
+
def write_buffer_to_fd(
current_buffer: NlessBuffer, fd: IO[str], output_format: str = "csv"
) -> None:
@@ -137,36 +182,12 @@ def write_buffer_to_fd(
immediately, intended for use during app exit (pipe output).
"""
headers = current_buffer._get_visible_column_labels()
- rows = current_buffer.displayed_rows
-
+ rows = [
+ [strip_markup(str(cell)) for cell in row]
+ for row in current_buffer.displayed_rows
+ ]
try:
- if output_format == "json":
- for row in rows:
- obj = {h: strip_markup(str(cell)) for h, cell in zip(headers, row)}
- fd.write(json.dumps(obj) + "\n")
- elif output_format == "raw":
- for row in rows:
- fd.write("\t".join(strip_markup(str(cell)) for cell in row) + "\n")
- elif output_format == "markdown":
- col_widths = [max(len(strip_markup(str(cell))) for cell in [h] + [r[i] for r in rows]) for i, h in enumerate(headers)]
- sep = "| " + "| ".join("-" * w for w in col_widths) + "|\n"
- fd.write("| " + "| ".join(strip_markup(str(h)).ljust(col_widths[i]) for i, h in enumerate(headers)) + "|\n")
- fd.write(sep)
- for row in rows:
- fd.write("| " + "| ".join(strip_markup(str(cell)).ljust(col_widths[i]) for i, cell in enumerate(row)) + "|\n")
- elif output_format == "html":
- fd.write("\n")
- fd.write("\n" + "".join(f"| {strip_markup(str(h))} | " for h in headers) + "
\n\n")
- fd.write("\n")
- for row in rows:
- fd.write("" + "".join(f"| {strip_markup(str(cell))} | " for cell in row) + "
\n")
- fd.write("\n
\n")
- else:
- delim = "\t" if output_format == "tsv" else ","
- writer = csv.writer(fd, delimiter=delim)
- writer.writerow(headers)
- for row in rows:
- writer.writerow([strip_markup(str(cell)) for cell in row])
+ write_rows_to_fd(headers, rows, fd, output_format)
except BrokenPipeError:
pass
diff --git a/tests/test_cli.py b/tests/test_cli.py
index a1257bf..f235b41 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -204,6 +204,14 @@ def test_output_format_default_csv(self):
cli_args = parse_args([])
assert cli_args.output_format == "csv"
+ def test_output_format_markdown(self):
+ cli_args = parse_args(["-o", "markdown"])
+ assert cli_args.output_format == "markdown"
+
+ def test_output_format_html(self):
+ cli_args = parse_args(["-o", "html"])
+ assert cli_args.output_format == "html"
+
def test_output_format_invalid_exits(self):
with pytest.raises(SystemExit):
parse_args(["-o", "xml"])
diff --git a/tests/test_operations.py b/tests/test_operations.py
index 4d3dc00..51963c1 100644
--- a/tests/test_operations.py
+++ b/tests/test_operations.py
@@ -2511,6 +2511,9 @@ def test_infer_output_format(self):
assert _infer_output_format("out.csv") == "csv"
assert _infer_output_format("out.txt") == "raw"
assert _infer_output_format("out.log") == "raw"
+ assert _infer_output_format("out.md") == "markdown"
+ assert _infer_output_format("out.markdown") == "markdown"
+ assert _infer_output_format("out.html") == "html"
assert _infer_output_format("out.xyz") == "csv"
assert _infer_output_format("out") == "csv"
assert _infer_output_format("/dev/stdout") == "csv"
@@ -2614,6 +2617,56 @@ async def test_write_jsonl_extension(self, cli_args, tmp_path):
obj = json_mod.loads(lines[0])
assert obj["name"] == "Alice"
+ @pytest.mark.asyncio
+ async def test_write_markdown_output(self, cli_args, tmp_path):
+ from nless.operations import write_buffer
+
+ app = NlessApp(cli_args=cli_args, starting_stream=None)
+ async with app.run_test(size=(120, 40)) as pilot:
+ buf = app.buffers[0]
+ _load(buf, ["name,age,city", "Alice,30,NYC", "Bob,25,SF"])
+ await _wait(pilot, app)
+
+ output_path = str(tmp_path / "output.md")
+ write_buffer(buf, output_path)
+
+ with open(output_path) as f:
+ lines = f.readlines()
+
+ # header row, separator row, 2 data rows
+ assert len(lines) == 4
+ assert lines[0].startswith("| ")
+ assert "name" in lines[0]
+ assert "age" in lines[0]
+ assert (
+ set(lines[1].strip(" |\n-")) == set()
+ ) # separator has only dashes, pipes, spaces
+ assert "Alice" in lines[2]
+ assert "Bob" in lines[3]
+
+ @pytest.mark.asyncio
+ async def test_write_html_output(self, cli_args, tmp_path):
+ from nless.operations import write_buffer
+
+ app = NlessApp(cli_args=cli_args, starting_stream=None)
+ async with app.run_test(size=(120, 40)) as pilot:
+ buf = app.buffers[0]
+ _load(buf, ["name,age,city", "Alice,30,NYC", "Bob,25,SF"])
+ await _wait(pilot, app)
+
+ output_path = str(tmp_path / "output.html")
+ write_buffer(buf, output_path)
+
+ with open(output_path) as f:
+ content = f.read()
+
+ assert "" in content
+ assert "" in content and "" in content
+ assert "" in content and "" in content
+ assert "| name | " in content
+ assert "Alice | " in content
+ assert "Bob | " in content
+
class TestColumnAggregations:
def test_numeric_aggregations(self):