Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 6 additions & 33 deletions nless/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

from __future__ import annotations

import csv
import json
import re
import signal
import sys

from .dataprocessing import coerce_sort_key, matches_all_filters, strip_markup
from .operations import write_rows_to_fd
from .delimiter import (
detect_space_splitting_strategy,
find_header_index,
Expand Down Expand Up @@ -215,35 +215,8 @@ def col_fn(name, _render=False):
visible_headers = [strip_markup(columns[i].name) for i in visible_indices]

# Write output
_write_output(cli_args.output_format, visible_headers, data_rows, visible_indices)


def _write_output(
fmt: str,
headers: list[str],
rows: list[list[str]],
visible_indices: list[int],
) -> None:
"""Write rows to stdout in the requested format."""
if fmt == "json":
for row in rows:
obj = {}
for i, hi in enumerate(visible_indices):
val = strip_markup(row[hi]) if hi < len(row) else ""
obj[headers[i]] = val
sys.stdout.write(json.dumps(obj) + "\n")
elif fmt == "tsv":
writer = csv.writer(sys.stdout, delimiter="\t")
writer.writerow(headers)
for row in rows:
writer.writerow(
[strip_markup(row[i]) if i < len(row) else "" for i in visible_indices]
)
else:
# csv (default)
writer = csv.writer(sys.stdout)
writer.writerow(headers)
for row in rows:
writer.writerow(
[strip_markup(row[i]) if i < len(row) else "" for i in visible_indices]
)
cleaned_rows = [
[strip_markup(row[i]) if i < len(row) else "" for i in visible_indices]
for row in data_rows
]
write_rows_to_fd(visible_headers, cleaned_rows, sys.stdout, cli_args.output_format)
2 changes: 1 addition & 1 deletion nless/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def parse_args(argv=None) -> CliArgs:
parser.add_argument(
"--output-format",
"-o",
choices=["csv", "tsv", "json", "raw"],
choices=["csv", "tsv", "json", "raw", "markdown", "html"],
help="Output format for pipe/batch output (default: csv)",
default="csv",
)
Expand Down
79 changes: 50 additions & 29 deletions nless/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,51 @@ def handle_mark_unique(new_buffer: NlessBuffer, new_unique_column_name: str) ->
new_buffer._pivot_hidden_columns.clear()


def write_rows_to_fd(
headers: list[str], rows: list[list[str]], fd: IO[str], output_format: str = "csv"
) -> None:
"""Write pre-cleaned headers and rows to fd in the requested format."""
if output_format == "json":
for row in rows:
fd.write(json.dumps(dict(zip(headers, row))) + "\n")
elif output_format == "raw":
for row in rows:
fd.write("\t".join(row) + "\n")
elif output_format == "markdown":
col_widths = [len(h) for h in headers]
for row in rows:
for i, cell in enumerate(row):
col_widths[i] = max(col_widths[i], len(cell))
sep = "| " + " | ".join("-" * w for w in col_widths) + " |\n"
fd.write(
"| " + " | ".join(h.ljust(w) for h, w in zip(headers, col_widths)) + " |\n"
)
fd.write(sep)
for row in rows:
fd.write(
"| "
+ " | ".join(cell.ljust(col_widths[i]) for i, cell in enumerate(row))
+ " |\n"
)
elif output_format == "html":
fd.write("<table>\n")
fd.write(
"<thead>\n<tr>"
+ "".join(f"<th>{h}</th>" for h in headers)
+ "</tr>\n</thead>\n"
)
fd.write("<tbody>\n")
for row in rows:
fd.write("<tr>" + "".join(f"<td>{cell}</td>" for cell in row) + "</tr>\n")
fd.write("</tbody>\n</table>\n")
else:
delim = "\t" if output_format == "tsv" else ","
writer = csv.writer(fd, delimiter=delim)
writer.writerow(headers)
for row in rows:
writer.writerow(row)


def write_buffer_to_fd(
current_buffer: NlessBuffer, fd: IO[str], output_format: str = "csv"
) -> None:
Expand All @@ -137,36 +182,12 @@ def write_buffer_to_fd(
immediately, intended for use during app exit (pipe output).
"""
headers = current_buffer._get_visible_column_labels()
rows = current_buffer.displayed_rows

rows = [
[strip_markup(str(cell)) for cell in row]
for row in current_buffer.displayed_rows
]
try:
if output_format == "json":
for row in rows:
obj = {h: strip_markup(str(cell)) for h, cell in zip(headers, row)}
fd.write(json.dumps(obj) + "\n")
elif output_format == "raw":
for row in rows:
fd.write("\t".join(strip_markup(str(cell)) for cell in row) + "\n")
elif output_format == "markdown":
col_widths = [max(len(strip_markup(str(cell))) for cell in [h] + [r[i] for r in rows]) for i, h in enumerate(headers)]
sep = "| " + "| ".join("-" * w for w in col_widths) + "|\n"
fd.write("| " + "| ".join(strip_markup(str(h)).ljust(col_widths[i]) for i, h in enumerate(headers)) + "|\n")
fd.write(sep)
for row in rows:
fd.write("| " + "| ".join(strip_markup(str(cell)).ljust(col_widths[i]) for i, cell in enumerate(row)) + "|\n")
elif output_format == "html":
fd.write("<table>\n")
fd.write("<thead>\n<tr>" + "".join(f"<th>{strip_markup(str(h))}</th>" for h in headers) + "</tr>\n</thead>\n")
fd.write("<tbody>\n")
for row in rows:
fd.write("<tr>" + "".join(f"<td>{strip_markup(str(cell))}</td>" for cell in row) + "</tr>\n")
fd.write("</tbody>\n</table>\n")
else:
delim = "\t" if output_format == "tsv" else ","
writer = csv.writer(fd, delimiter=delim)
writer.writerow(headers)
for row in rows:
writer.writerow([strip_markup(str(cell)) for cell in row])
write_rows_to_fd(headers, rows, fd, output_format)
except BrokenPipeError:
pass

Expand Down
8 changes: 8 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,14 @@ def test_output_format_default_csv(self):
cli_args = parse_args([])
assert cli_args.output_format == "csv"

def test_output_format_markdown(self):
cli_args = parse_args(["-o", "markdown"])
assert cli_args.output_format == "markdown"

def test_output_format_html(self):
cli_args = parse_args(["-o", "html"])
assert cli_args.output_format == "html"

def test_output_format_invalid_exits(self):
with pytest.raises(SystemExit):
parse_args(["-o", "xml"])
Expand Down
53 changes: 53 additions & 0 deletions tests/test_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2511,6 +2511,9 @@ def test_infer_output_format(self):
assert _infer_output_format("out.csv") == "csv"
assert _infer_output_format("out.txt") == "raw"
assert _infer_output_format("out.log") == "raw"
assert _infer_output_format("out.md") == "markdown"
assert _infer_output_format("out.markdown") == "markdown"
assert _infer_output_format("out.html") == "html"
assert _infer_output_format("out.xyz") == "csv"
assert _infer_output_format("out") == "csv"
assert _infer_output_format("/dev/stdout") == "csv"
Expand Down Expand Up @@ -2614,6 +2617,56 @@ async def test_write_jsonl_extension(self, cli_args, tmp_path):
obj = json_mod.loads(lines[0])
assert obj["name"] == "Alice"

@pytest.mark.asyncio
async def test_write_markdown_output(self, cli_args, tmp_path):
from nless.operations import write_buffer

app = NlessApp(cli_args=cli_args, starting_stream=None)
async with app.run_test(size=(120, 40)) as pilot:
buf = app.buffers[0]
_load(buf, ["name,age,city", "Alice,30,NYC", "Bob,25,SF"])
await _wait(pilot, app)

output_path = str(tmp_path / "output.md")
write_buffer(buf, output_path)

with open(output_path) as f:
lines = f.readlines()

# header row, separator row, 2 data rows
assert len(lines) == 4
assert lines[0].startswith("| ")
assert "name" in lines[0]
assert "age" in lines[0]
assert (
set(lines[1].strip(" |\n-")) == set()
) # separator has only dashes, pipes, spaces
assert "Alice" in lines[2]
assert "Bob" in lines[3]

@pytest.mark.asyncio
async def test_write_html_output(self, cli_args, tmp_path):
from nless.operations import write_buffer

app = NlessApp(cli_args=cli_args, starting_stream=None)
async with app.run_test(size=(120, 40)) as pilot:
buf = app.buffers[0]
_load(buf, ["name,age,city", "Alice,30,NYC", "Bob,25,SF"])
await _wait(pilot, app)

output_path = str(tmp_path / "output.html")
write_buffer(buf, output_path)

with open(output_path) as f:
content = f.read()

assert "<table>" in content
assert "<thead>" in content and "</thead>" in content
assert "<tbody>" in content and "</tbody>" in content
assert "<th>name</th>" in content
assert "<td>Alice</td>" in content
assert "<td>Bob</td>" in content


class TestColumnAggregations:
def test_numeric_aggregations(self):
Expand Down
Loading