From 99ad3e298237176158f23413f05f49b6d5462a04 Mon Sep 17 00:00:00 2001 From: Jacob Ioffe Date: Wed, 19 Nov 2025 22:34:06 +0000 Subject: [PATCH 1/6] Add trace-aware stage summaries + plotting helper for profiling --- scripts/tests/README.md | 92 ++++++++++++++++++ scripts/tests/cases/e2e.py | 170 ++++++++++++++++++++++++++++++++- scripts/tests/config.py | 6 ++ scripts/tests/run.py | 2 + src/nv_ingest/api/v2/ingest.py | 2 +- 5 files changed, 270 insertions(+), 2 deletions(-) diff --git a/scripts/tests/README.md b/scripts/tests/README.md index ecd062efb..62345dcf7 100644 --- a/scripts/tests/README.md +++ b/scripts/tests/README.md @@ -198,6 +198,43 @@ EXTRACT_IMAGES=true API_VERSION=v2 python run.py --case=e2e --dataset=bo767 - `artifacts_dir` (string): Test output directory (auto-generated if null) - `collection_name` (string): Milvus collection name (auto-generated as `{test_name}_multimodal` if null, deterministic - no timestamp) +#### Trace Capture Options +- `enable_traces` (boolean): When `true`, the `e2e` case requests parent-level trace payloads from the V2 API (see `docs/docs/extraction/v2-api-guide.md`) and emits per-stage summaries. +- `trace_output_dir` (string or null): Optional override for where raw trace JSON files are written. Defaults to `/traces` when null. + +When traces are enabled: + +1. Each processed document gets a raw payload written to `trace_output_dir/_.json` that contains the complete `"trace::"` dictionary plus a small stage summary. These files match the structure shown in `scripts/private_local/trace.json`. +2. `results.json` gains a `trace_summary` block that aggregates resident-time seconds per stage and lists total compute seconds per document. 
Example: + +```json +"trace_summary": { + "documents": 20, + "output_dir": "/raid/.../scripts/tests/artifacts/bo20_20251119_183733_UTC/traces", + "stage_totals": { + "pdf_extractor": { + "documents": 20, + "total_resident_s": 32.18, + "avg_resident_s": 1.61, + "max_resident_s": 2.04, + "min_resident_s": 1.42 + } + }, + "document_totals": [ + {"document_index": 0, "source_id": "doc001.pdf", "total_resident_s": 1.58}, + {"document_index": 1, "source_id": "doc002.pdf", "total_resident_s": 1.64} + // ... + ] +} +``` + +**Enable via YAML or environment variables:** + +- YAML: set `enable_traces: true` (and optionally `trace_output_dir`) in the `active` section. +- Environment: `ENABLE_TRACES=true TRACE_OUTPUT_DIR=/raid/jioffe/traces python run.py --case=e2e --dataset=bo20` + +> ℹ️ Tracing is most valuable with V2 + PDF splitting enabled. Follow the guidance in `docs/docs/extraction/v2-api-guide.md` to ensure `api_version=v2` and `pdf_split_page_count` are configured. + ### Valid Configuration Values **text_depth**: `block`, `body`, `document`, `header`, `line`, `nearby_block`, `other`, `page`, `span` @@ -820,6 +857,61 @@ EXTRACT_IMAGES=true python run.py --case=e2e --dataset=bo767 - **Configuration**: See `config.py` for complete field list and validation logic - **Test utilities**: See `interact.py` for shared helper functions +## Profiling & Trace Capture Workflow + +### Baseline vs RC Trace Runs + +1. **Prep environment** + ```bash + source ~/setup_env.sh + nv-restart # baseline (local build) + # or + nv-stop && nv-start-release # RC image + ``` +2. **Enable traces + V2 split settings (env vars or YAML)** + ```bash + export API_VERSION=v2 + export PDF_SPLIT_PAGE_COUNT=32 # use 16/64/etc. as needed + export ENABLE_TRACES=true + # Optional: export TRACE_OUTPUT_DIR=/raid/jioffe/traces/baseline + ``` +3. 
**Run datasets (repeat for bo20 + bo767, baseline + RC)** + ```bash + python scripts/tests/run.py --case=e2e --dataset=bo20 + python scripts/tests/run.py --case=e2e --dataset=bo767 + ``` +4. **Collect artifacts** + - `stdout.txt` → console logs + - `results.json` → includes `trace_summary` with per-stage resident seconds + - `traces/*.json` → raw payloads identical to `scripts/private_local/trace.json` + - Use descriptive folder names (e.g., copy artifacts to `artifacts/baseline_bo20/`) to keep baseline vs RC side-by-side. +5. **Visualize stage timings** + ```bash + # Generates results.stage_time.png + textual summary + python scripts/tests/tools/plot_stage_totals.py \ + scripts/tests/artifacts//results.json \ + --sort total \ + --keep-nested \ + --exclude-network + ``` + - `--keep-nested` preserves nested entries such as chart/table OCR workloads + - `--exclude-network` hides broker/Redis wait time so the chart focuses on Ray stages + +### Future Trace Visualization Case (roadmap) + +With `trace_summary` + raw dumps in place, a follow-on `cases/e2e_profile.py` can: +- Accept one or two artifact directories (baseline vs RC) as input. +- Load each run’s `trace_summary` and optional raw trace JSON to compute deltas. +- Emit per-stage bar/violin charts (resident vs wall clock) plus CSV summaries. +- Store outputs under `trace_profile/` inside each artifact directory. + +Open questions before implementing: +- Preferred plotting backend (`matplotlib`, `plotly`, or Altair) and whether CI should emit PNG, HTML, or both. +- Artifact naming for comparisons (e.g., `trace_profile/baseline_vs_rc/bo20.png`). +- Regression thresholds and alerting expectations (what % delta constitutes a failure?). +- Should the visualization case automatically diff against the most recent baseline or just emit standalone assets? + +Capturing these answers (plus a few representative trace samples) will let a future sprint land the visualization workflow quickly. 
- **Docker setup**: See project root README for service management commands - **API documentation**: See `docs/` for API version differences diff --git a/scripts/tests/cases/e2e.py b/scripts/tests/cases/e2e.py index c4aa3f91b..f00ff85c5 100644 --- a/scripts/tests/cases/e2e.py +++ b/scripts/tests/cases/e2e.py @@ -1,9 +1,11 @@ import json import logging import os +import re import shutil import sys import time +from collections import defaultdict from nv_ingest_client.client import Ingestor from nv_ingest_client.util.document_analysis import analyze_document_chunks @@ -25,6 +27,138 @@ MilvusClient = None # Optional; stats logging will be skipped if unavailable +NS_IN_SECOND = 1_000_000_000 + + +def _ns_to_seconds(value): + if value is None: + return None + return value / NS_IN_SECOND + + +def _safe_trace_filename(source_id: str | None, index: int) -> str: + base = source_id or f"document_{index}" + base = os.path.basename(str(base)) + sanitized = re.sub(r"[^A-Za-z0-9._-]", "_", base)[:80] + if not sanitized: + sanitized = f"document_{index}" + return f"{index:03d}_{sanitized}" + + +def _extract_source_id(doc_results) -> str | None: + if doc_results is None: + return None + try: + first_entry = doc_results[0] + except (IndexError, KeyError, TypeError): + return None + except Exception: + try: + iterator = iter(doc_results) + first_entry = next(iterator) + except Exception: + return None + metadata = first_entry.get("metadata", {}) if isinstance(first_entry, dict) else {} + source_meta = metadata.get("source_metadata", {}) + return source_meta.get("source_id") + + +def _summarize_traces(traces_list, results, trace_dir: str | None): + if not traces_list: + return None + + valid_traces = [(idx, trace) for idx, trace in enumerate(traces_list) if trace] + if not valid_traces: + return None + + stage_totals = defaultdict(list) + document_totals = [] + + if trace_dir: + os.makedirs(trace_dir, exist_ok=True) + + for doc_index, trace_payload in valid_traces: + if not 
isinstance(trace_payload, dict): + continue + + source_id = None + if results and doc_index < len(results): + source_id = _extract_source_id(results[doc_index]) + if not source_id: + source_id = f"document_{doc_index}" + + stage_breakdown = {} + total_resident = 0.0 + total_wall = 0.0 + wall_seen = False + + for key, value in trace_payload.items(): + if not key.startswith("trace::resident_time::"): + continue + stage = key.replace("trace::resident_time::", "") + resident_s = _ns_to_seconds(value) or 0.0 + entry = trace_payload.get(f"trace::entry::{stage}") + exit_value = trace_payload.get(f"trace::exit::{stage}") + wall_s = None + if entry is not None and exit_value is not None: + wall_s = _ns_to_seconds(exit_value - entry) + + stage_entry = {"resident_s": round(resident_s, 6)} + if wall_s is not None: + stage_entry["wall_s"] = round(wall_s, 6) + wall_seen = True + total_wall += wall_s + + stage_breakdown[stage] = stage_entry + total_resident += resident_s + stage_totals[stage].append(resident_s) + + document_totals.append( + { + "document_index": doc_index, + "source_id": source_id, + "total_resident_s": round(total_resident, 6), + **({"total_wall_s": round(total_wall, 6)} if wall_seen else {}), + } + ) + + if trace_dir: + trace_payload_path = os.path.join(trace_dir, f"{_safe_trace_filename(source_id, doc_index)}.json") + trace_record = { + "document_index": doc_index, + "source_id": source_id, + "trace": trace_payload, + "stage_summary": stage_breakdown, + } + try: + with open(trace_payload_path, "w") as fp: + json.dump(trace_record, fp, indent=2) + except OSError as err: + print(f"Failed to write trace file {trace_payload_path}: {err}") + + if not stage_totals: + return None + + stage_summary = {} + for stage, values in stage_totals.items(): + total = sum(values) + count = len(values) + stage_summary[stage] = { + "documents": count, + "total_resident_s": round(total, 6), + "avg_resident_s": round(total / count, 6), + "max_resident_s": round(max(values), 6), + 
"min_resident_s": round(min(values), 6), + } + + return { + "documents": len(document_totals), + "output_dir": trace_dir, + "stage_totals": stage_summary, + "document_totals": document_totals, + } + + def main(config=None, log_path: str = "test_results") -> int: """ Main test entry point. @@ -77,6 +211,18 @@ def main(config=None, log_path: str = "test_results") -> int: enable_caption = config.enable_caption enable_split = config.enable_split + # Trace capture + enable_traces = getattr(config, "enable_traces", False) + trace_output_dir = getattr(config, "trace_output_dir", None) + trace_dir = None + if enable_traces: + trace_dir = ( + os.path.abspath(os.path.expanduser(trace_output_dir)) + if trace_output_dir + else os.path.join(log_path, "traces") + ) + print(f"Trace capture enabled. Raw traces will be written to: {trace_dir}") + # Text splitting configuration split_chunk_size = config.split_chunk_size split_chunk_overlap = config.split_chunk_overlap @@ -179,7 +325,20 @@ def main(config=None, log_path: str = "test_results") -> int: .save_to_disk(output_directory=spill_dir) ) - results, failures = ingestor.ingest(show_progress=True, return_failures=True, save_to_disk=True) + ingest_kwargs = { + "show_progress": True, + "return_failures": True, + "save_to_disk": True, + } + if enable_traces: + ingest_kwargs["return_traces"] = True + + ingest_result = ingestor.ingest(**ingest_kwargs) + if enable_traces: + results, failures, traces_list = ingest_result + else: + results, failures = ingest_result + traces_list = [] ingestion_time = time.time() - ingestion_start kv_event_log("result_count", len(results), log_path) kv_event_log("failure_count", len(failures), log_path) @@ -236,6 +395,12 @@ def main(config=None, log_path: str = "test_results") -> int: retrieval_time = time.time() - querying_start kv_event_log("retrieval_time_s", retrieval_time, log_path) + trace_summary = None + if enable_traces: + trace_summary = _summarize_traces(traces_list, results, trace_dir) + if 
trace_summary: + kv_event_log("trace_documents", trace_summary["documents"], log_path) + # Summarize - Build comprehensive results dict dataset_name = os.path.basename(data_dir.rstrip("/")) if data_dir else "unknown" test_name = config.test_name or dataset_name @@ -280,6 +445,9 @@ def main(config=None, log_path: str = "test_results") -> int: test_results["test_config"]["split_chunk_size"] = split_chunk_size test_results["test_config"]["split_chunk_overlap"] = split_chunk_overlap + if trace_summary: + test_results["trace_summary"] = trace_summary + print(f"\n{test_name}_e2e summary:") print(json.dumps(test_results, indent=2)) diff --git a/scripts/tests/config.py b/scripts/tests/config.py index 5a85d3afd..9294fac9c 100644 --- a/scripts/tests/config.py +++ b/scripts/tests/config.py @@ -56,6 +56,10 @@ class TestConfig: split_chunk_size: int = 1024 split_chunk_overlap: int = 150 + # Trace configuration + enable_traces: bool = False + trace_output_dir: Optional[str] = None + # Storage configuration spill_dir: str = "/tmp/spill" artifacts_dir: Optional[str] = None @@ -265,6 +269,8 @@ def parse_list(value: str) -> List[str]: "ENABLE_SPLIT": ("enable_split", parse_bool), "SPLIT_CHUNK_SIZE": ("split_chunk_size", parse_int), "SPLIT_CHUNK_OVERLAP": ("split_chunk_overlap", parse_int), + "ENABLE_TRACES": ("enable_traces", parse_bool), + "TRACE_OUTPUT_DIR": ("trace_output_dir", str), "SPILL_DIR": ("spill_dir", str), "ARTIFACTS_DIR": ("artifacts_dir", str), "COLLECTION_NAME": ("collection_name", str), diff --git a/scripts/tests/run.py b/scripts/tests/run.py index dc53082a4..c01fcad19 100644 --- a/scripts/tests/run.py +++ b/scripts/tests/run.py @@ -179,6 +179,8 @@ def run_datasets( "infrastructure": "managed" if managed else "attach", "api_version": config.api_version, "pdf_split_page_count": config.pdf_split_page_count, + "enable_traces": getattr(config, "enable_traces", False), + "trace_output_dir": getattr(config, "trace_output_dir", None), "return_code": rc, } diff --git 
a/src/nv_ingest/api/v2/ingest.py b/src/nv_ingest/api/v2/ingest.py index d060ec543..6a5878b9e 100644 --- a/src/nv_ingest/api/v2/ingest.py +++ b/src/nv_ingest/api/v2/ingest.py @@ -212,7 +212,7 @@ def _create_subjob_dict( tracing_options = dict(base_tracing_options) tracing_options.setdefault("trace", True) tracing_options["trace_id"] = str(current_trace_id) - tracing_options["ts_send"] = int(time.time() * 1000) + tracing_options["ts_send"] = time.time_ns() tracing_options["parent_job_id"] = parent_job_id for key, value in start_key.items(): tracing_options[key] = value From 19315a15161e0074171999d3230a1874eee9a87f Mon Sep 17 00:00:00 2001 From: Jacob Ioffe Date: Thu, 20 Nov 2025 22:16:01 +0000 Subject: [PATCH 2/6] posthoc util for plotting --- scripts/tests/tools/plot_stage_totals.py | 246 +++++++++++++++++++++++ 1 file changed, 246 insertions(+) create mode 100644 scripts/tests/tools/plot_stage_totals.py diff --git a/scripts/tests/tools/plot_stage_totals.py b/scripts/tests/tools/plot_stage_totals.py new file mode 100644 index 000000000..67a3394d1 --- /dev/null +++ b/scripts/tests/tools/plot_stage_totals.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python3 +""" +Visualize per-stage resident times from scripts/tests artifacts. 
+ +Usage: + python scripts/tests/tools/plot_stage_totals.py \ + scripts/tests/artifacts//results.json \ + --pipeline config/default_pipeline.yaml \ + --top-n 30 +""" + +from __future__ import annotations + +import argparse +import json +from pathlib import Path +from typing import Dict, List, Tuple + +import matplotlib.pyplot as plt +import yaml + + +FRIENDLY_STAGE_NAMES: Dict[str, str] = { + "source_stage": "Source", + "metadata_injector": "Metadata Injector", + "metadata_injector_channel_in": "Metadata Queue", + "pdf_extractor": "PDF Extractor", + "pdf_extractor_channel_in": "PDF Queue", + "audio_extractor": "Audio Extractor", + "audio_extractor_channel_in": "Audio Queue", + "docx_extractor": "DOCX Extractor", + "docx_extractor_channel_in": "DOCX Queue", + "pptx_extractor": "PPTX Extractor", + "pptx_extractor_channel_in": "PPTX Queue", + "image_extractor": "Image Extractor", + "image_extractor_channel_in": "Image Queue", + "html_extractor": "HTML Extractor", + "html_extractor_channel_in": "HTML Queue", + "infographic_extractor": "Infographic Extractor", + "infographic_extractor_channel_in": "Infographic Queue", + "table_extractor": "Table Extractor", + "table_extractor_channel_in": "Table Queue", + "chart_extractor": "Chart Extractor", + "chart_extractor_channel_in": "Chart Queue", + "image_filter": "Image Filter", + "image_filter_channel_in": "Image Filter Queue", + "image_dedup": "Image Dedup", + "image_dedup_channel_in": "Image Dedup Queue", + "text_splitter": "Text Splitter", + "text_splitter_channel_in": "Text Splitter Queue", + "image_caption": "Image Caption", + "image_caption_channel_in": "Image Caption Queue", + "text_embedder": "Text Embedder", + "text_embedder_channel_in": "Text Embed Queue", + "image_storage": "Image Storage", + "image_storage_channel_in": "Image Storage Queue", + "embedding_storage": "Embedding Storage", + "embedding_storage_channel_in": "Embedding Storage Queue", + "broker_response": "Broker Response", + "broker_source_network_in": 
"Broker Network", + "message_broker_task_source": "Message Broker Source", +} + + +def load_stage_order(pipeline_yaml: Path) -> List[str]: + pipeline = yaml.safe_load(pipeline_yaml.read_text()) + return [stage["name"] for stage in pipeline.get("stages", [])] + + +def friendly_name(stage: str) -> str: + if stage in FRIENDLY_STAGE_NAMES: + return FRIENDLY_STAGE_NAMES[stage] + if stage.endswith("_channel_in"): + base = stage.removesuffix("_channel_in") + return f"{FRIENDLY_STAGE_NAMES.get(base, base)} Queue" + # Collapse nested trace stages + if "::" in stage: + parts = stage.split("::") + if parts[0] in FRIENDLY_STAGE_NAMES: + return f"{FRIENDLY_STAGE_NAMES[parts[0]]} :: {'::'.join(parts[2:])}" + return stage + + +def stage_sort_key(stage: str, order_map: Dict[str, int]) -> Tuple[int, str]: + base = stage.split("::")[0] + base = base.replace("_channel_in", "") + return order_map.get(base, len(order_map)), stage + + +def should_keep_stage(stage: str, keep_nested: bool, exclude_network: bool) -> bool: + if exclude_network and ("broker_source_network_in" in stage or "network_in" in stage): + return False + if "::pdfium_pages_to_numpy" in stage: + return False + if keep_nested: + return True + return "::" not in stage + + +def build_plot( + results_path: Path, + pipeline_yaml: Path, + top_n: int | None, + log_scale: bool, + width: int, + keep_nested: bool, + sort_mode: str, + summary_rows: int, + exclude_network: bool, +): + data = json.loads(results_path.read_text()) + stage_totals = data["trace_summary"]["stage_totals"] + + stage_order = load_stage_order(pipeline_yaml) + order_map = {stage: idx for idx, stage in enumerate(stage_order)} + + merged: Dict[str, Dict[str, float]] = {} + for stage, stats in stage_totals.items(): + if not should_keep_stage(stage, keep_nested=keep_nested, exclude_network=exclude_network): + continue + merged[stage] = stats + + entries = [] + for stage, stats in merged.items(): + sort_key = stage_sort_key(stage, order_map) + 
entries.append((sort_key, stage, stats)) + + if sort_mode == "total": + entries.sort(key=lambda item: item[2]["total_resident_s"], reverse=True) + else: + entries.sort() + + if top_n: + entries = entries[:top_n] + + labeled_entries = [ + { + "raw_stage": stage, + "label": friendly_name(stage), + "total": stats["total_resident_s"], + "avg": stats["avg_resident_s"], + "docs": stats["documents"], + } + for _, stage, stats in entries + ] + + stages = [item["label"] for item in labeled_entries] + totals = [item["total"] for item in labeled_entries] + avgs = [item["avg"] for item in labeled_entries] + docs = [item["docs"] for item in labeled_entries] + + fig_height = max(6, len(stages) * 0.35) + fig, ax = plt.subplots(figsize=(width, fig_height)) + bars = ax.barh(range(len(stages) - 1, -1, -1), totals, color="#4C72B0") + ax.set_xlabel("Total resident seconds") + ax.set_yticks(range(len(stages) - 1, -1, -1)) + ax.set_yticklabels(stages) + if log_scale: + ax.set_xscale("log") + ax.grid(axis="x", linestyle="--", alpha=0.3) + + for bar, avg, doc_cnt in zip(bars, avgs, docs): + width_val = bar.get_width() + ax.text( + width_val, + bar.get_y() + bar.get_height() / 2, + f"{width_val:.1f}s (avg {avg:.2f}s, docs {doc_cnt})", + va="center", + ha="left", + fontsize=8, + color="#333333", + ) + + ax.set_title(f"Stage resident time totals – {data['test_config']['test_name']}") + plt.tight_layout() + + out_path = results_path.with_suffix(".stage_time.png") + plt.savefig(out_path, dpi=200) + print(f"Wrote {out_path}") + if summary_rows > 0: + print("\nTop stages by resident time") + print("-" * 90) + print(f"{'Stage':<40} {'Total (s)':>12} {'Avg (s)':>10} {'Docs':>6}") + print("-" * 90) + for item in labeled_entries[:summary_rows]: + stage = item["label"] + total = item["total"] + avg = item["avg"] + doc_cnt = item["docs"] + print(f"{stage:<40} {total:>12.2f} {avg:>10.2f} {doc_cnt:>6}") + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Plot 
stage resident times from results.json") + parser.add_argument("results", type=Path, help="Path to results.json artifact") + parser.add_argument( + "--pipeline", + type=Path, + default=Path("config/default_pipeline.yaml"), + help="Pipeline YAML to derive stage ordering", + ) + parser.add_argument("--top-n", type=int, help="Limit to top N stages") + parser.add_argument("--log-scale", action="store_true", help="Use log scale on x-axis") + parser.add_argument("--width", type=int, default=14, help="Plot width in inches") + parser.add_argument( + "--keep-nested", + action="store_true", + help="Keep nested stage names (default: drop entries containing '::')", + ) + parser.add_argument( + "--sort", + choices=["total", "pipeline"], + default="total", + help="Sort bars by total resident seconds or pipeline order", + ) + parser.add_argument( + "--summary-rows", + type=int, + default=10, + help="Print textual summary for top N stages (0 disables)", + ) + parser.add_argument( + "--exclude-network", + action="store_true", + help="Exclude broker/network-in stages from the visualization", + ) + return parser.parse_args() + + +def main(): + args = parse_args() + build_plot( + results_path=args.results, + pipeline_yaml=args.pipeline, + top_n=args.top_n, + log_scale=args.log_scale, + width=args.width, + keep_nested=args.keep_nested, + sort_mode=args.sort, + summary_rows=args.summary_rows, + exclude_network=args.exclude_network, + ) + + +if __name__ == "__main__": + main() From 59318d9ccb3e124d0f311dad6fa133e5e2b8ecf0 Mon Sep 17 00:00:00 2001 From: Jacob Ioffe Date: Thu, 20 Nov 2025 23:36:42 +0000 Subject: [PATCH 3/6] adding wall time plot + better tracing --- scripts/tests/README.md | 6 +- scripts/tests/cases/e2e.py | 26 ++-- scripts/tests/test_configs.yaml | 4 + scripts/tests/tools/plot_stage_totals.py | 181 ++++++++++++++++++++++- 4 files changed, 198 insertions(+), 19 deletions(-) diff --git a/scripts/tests/README.md b/scripts/tests/README.md index 62345dcf7..9d7886b53 100644 --- 
a/scripts/tests/README.md +++ b/scripts/tests/README.md @@ -885,9 +885,9 @@ EXTRACT_IMAGES=true python run.py --case=e2e --dataset=bo767 - `results.json` → includes `trace_summary` with per-stage resident seconds - `traces/*.json` → raw payloads identical to `scripts/private_local/trace.json` - Use descriptive folder names (e.g., copy artifacts to `artifacts/baseline_bo20/`) to keep baseline vs RC side-by-side. -5. **Visualize stage timings** +5. **Visualize stage + wall timings** ```bash - # Generates results.stage_time.png + textual summary + # Generates results.stage_time.png (per-stage resident) and results.wall_time.png (doc wall vs resident) python scripts/tests/tools/plot_stage_totals.py \ scripts/tests/artifacts//results.json \ --sort total \ @@ -896,6 +896,8 @@ EXTRACT_IMAGES=true python run.py --case=e2e --dataset=bo767 ``` - `--keep-nested` preserves nested entries such as chart/table OCR workloads - `--exclude-network` hides broker/Redis wait time so the chart focuses on Ray stages + - `--doc-top-n` controls how many slowest documents appear on the wall-time plot (set `0` for all) + - `--skip-wall-plot` emits only the resident-time chart if you want the legacy behavior ### Future Trace Visualization Case (roadmap) diff --git a/scripts/tests/cases/e2e.py b/scripts/tests/cases/e2e.py index f00ff85c5..4d9996383 100644 --- a/scripts/tests/cases/e2e.py +++ b/scripts/tests/cases/e2e.py @@ -89,8 +89,8 @@ def _summarize_traces(traces_list, results, trace_dir: str | None): stage_breakdown = {} total_resident = 0.0 - total_wall = 0.0 - wall_seen = False + doc_first_entry_ns = None + doc_last_exit_ns = None for key, value in trace_payload.items(): if not key.startswith("trace::resident_time::"): @@ -102,25 +102,27 @@ def _summarize_traces(traces_list, results, trace_dir: str | None): wall_s = None if entry is not None and exit_value is not None: wall_s = _ns_to_seconds(exit_value - entry) + if doc_first_entry_ns is None or entry < doc_first_entry_ns: + 
doc_first_entry_ns = entry + if doc_last_exit_ns is None or exit_value > doc_last_exit_ns: + doc_last_exit_ns = exit_value stage_entry = {"resident_s": round(resident_s, 6)} if wall_s is not None: stage_entry["wall_s"] = round(wall_s, 6) - wall_seen = True - total_wall += wall_s stage_breakdown[stage] = stage_entry total_resident += resident_s stage_totals[stage].append(resident_s) - document_totals.append( - { - "document_index": doc_index, - "source_id": source_id, - "total_resident_s": round(total_resident, 6), - **({"total_wall_s": round(total_wall, 6)} if wall_seen else {}), - } - ) + doc_record = { + "document_index": doc_index, + "source_id": source_id, + "total_resident_s": round(total_resident, 6), + } + if doc_first_entry_ns is not None and doc_last_exit_ns is not None and doc_last_exit_ns >= doc_first_entry_ns: + doc_record["total_wall_s"] = round(_ns_to_seconds(doc_last_exit_ns - doc_first_entry_ns), 6) + document_totals.append(doc_record) if trace_dir: trace_payload_path = os.path.join(trace_dir, f"{_safe_trace_filename(source_id, doc_index)}.json") diff --git a/scripts/tests/test_configs.yaml b/scripts/tests/test_configs.yaml index 36f87b0c3..82a280b62 100644 --- a/scripts/tests/test_configs.yaml +++ b/scripts/tests/test_configs.yaml @@ -38,6 +38,10 @@ active: split_chunk_size: 1024 # Chunk size for text splitting split_chunk_overlap: 150 # Overlap for text splitting + # Trace configuration + enable_traces: false + trace_output_dir: null # Null => artifacts//traces + # Storage configuration spill_dir: /tmp/spill artifacts_dir: null # null = use default (scripts/tests/artifacts) diff --git a/scripts/tests/tools/plot_stage_totals.py b/scripts/tests/tools/plot_stage_totals.py index 67a3394d1..1d8d1f521 100644 --- a/scripts/tests/tools/plot_stage_totals.py +++ b/scripts/tests/tools/plot_stage_totals.py @@ -14,7 +14,9 @@ import argparse import json from pathlib import Path -from typing import Dict, List, Tuple +from typing import Any, Dict, List, Tuple + 
+import math import matplotlib.pyplot as plt import yaml @@ -97,7 +99,8 @@ def should_keep_stage(stage: str, keep_nested: bool, exclude_network: bool) -> b return "::" not in stage -def build_plot( +def build_stage_plot( + data: Dict[str, Any], results_path: Path, pipeline_yaml: Path, top_n: int | None, @@ -108,8 +111,12 @@ def build_plot( summary_rows: int, exclude_network: bool, ): - data = json.loads(results_path.read_text()) - stage_totals = data["trace_summary"]["stage_totals"] + trace_summary = data.get("trace_summary") + if not trace_summary or "stage_totals" not in trace_summary: + print("No stage_totals found in trace_summary; skipping stage plot.") + return + + stage_totals = trace_summary["stage_totals"] stage_order = load_stage_order(pipeline_yaml) order_map = {stage: idx for idx, stage in enumerate(stage_order)} @@ -190,6 +197,141 @@ def build_plot( print(f"{stage:<40} {total:>12.2f} {avg:>10.2f} {doc_cnt:>6}") +def build_wall_plot( + data: Dict[str, Any], + results_path: Path, + doc_top_n: int | None, + width: int, + summary_rows: int, + title_suffix: str | None = None, +): + trace_summary = data.get("trace_summary") + if not trace_summary: + print("No trace_summary present; skipping wall-time plot.") + return + document_totals = trace_summary.get("document_totals") + if not document_totals: + print("No document_totals present; skipping wall-time plot.") + return + + run_results = data.get("results", {}) + ingestion_time = run_results.get("ingestion_time_s") + result_count = run_results.get("result_count") or len(document_totals) + + documents = sorted( + document_totals, + key=lambda item: item.get("total_wall_s", 0.0), + reverse=True, + ) + if doc_top_n: + documents = documents[:doc_top_n] + + if not documents: + print("No document entries available after filtering; skipping wall-time plot.") + return + + labels = [] + wall_vals = [] + resident_vals = [] + ratios = [] + for doc in documents: + source = doc.get("source_id") or 
f"doc_{doc.get('document_index', '?')}" + source_name = Path(source).name + doc_idx = doc.get("document_index") + label = f"{doc_idx}: {source_name}" if doc_idx is not None else source_name + labels.append(label) + wall = float(doc.get("total_wall_s", 0.0)) + resident = float(doc.get("total_resident_s", 0.0)) + wall_vals.append(wall) + resident_vals.append(resident) + ratios.append(resident / wall if wall > 0 else math.inf) + + labels = labels[::-1] + wall_vals = wall_vals[::-1] + resident_vals = resident_vals[::-1] + ratios = ratios[::-1] + + base_positions = list(range(len(labels))) + wall_positions = [pos + 0.2 for pos in base_positions] + resident_positions = [pos - 0.2 for pos in base_positions] + + fig_height = max(6, len(labels) * 0.35) + fig, ax = plt.subplots(figsize=(width, fig_height)) + + ax.barh( + wall_positions, + wall_vals, + height=0.35, + color="#55A868", + label="Wall seconds", + ) + ax.barh( + resident_positions, + resident_vals, + height=0.35, + color="#C44E52", + label="Resident seconds", + ) + + ax.set_yticks(base_positions) + ax.set_yticklabels(labels) + ax.invert_yaxis() + ax.set_xlabel("Seconds per document") + title = "Document wall vs resident time" + if title_suffix: + title = f"{title} – {title_suffix}" + ax.set_title(title) + ax.legend(loc="upper right") + + max_val = max(wall_vals + resident_vals) + text_x = max_val * 1.02 if max_val > 0 else 0.5 + + for pos, wall, resident, ratio in zip(base_positions, wall_vals, resident_vals, ratios): + if wall <= 0 and resident <= 0: + continue + ratio_str = "∞" if math.isinf(ratio) else f"{ratio:.1f}×" + ax.text( + text_x, + pos, + f"resident/wall {ratio_str}", + va="center", + fontsize=8, + color="#333333", + ) + + info_lines = [] + if ingestion_time is not None: + info_lines.append(f"Run wall time: {ingestion_time:.1f}s") + if result_count: + info_lines.append(f"Documents: {result_count}") + if info_lines: + ax.text( + 0.01, + 1.02, + " • ".join(info_lines), + transform=ax.transAxes, + 
ha="left", + va="bottom", + fontsize=9, + color="#333333", + ) + + plt.tight_layout() + out_path = results_path.with_suffix(".wall_time.png") + plt.savefig(out_path, dpi=200) + print(f"Wrote {out_path}") + + if summary_rows > 0: + print("\nTop documents by wall time") + print("-" * 90) + print(f"{'Document':<40} {'Wall (s)':>12} {'Resident (s)':>14} {'Ratio':>8}") + print("-" * 90) + truncated = list(zip(labels[::-1], wall_vals[::-1], resident_vals[::-1], ratios[::-1]))[:summary_rows] + for label, wall, resident, ratio in truncated: + ratio_str = "∞" if math.isinf(ratio) else f"{ratio:.2f}" + print(f"{label:<40} {wall:>12.2f} {resident:>14.2f} {ratio_str:>8}") + + def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Plot stage resident times from results.json") parser.add_argument("results", type=Path, help="Path to results.json artifact") @@ -224,12 +366,31 @@ def parse_args() -> argparse.Namespace: action="store_true", help="Exclude broker/network-in stages from the visualization", ) + parser.add_argument( + "--doc-top-n", + type=int, + default=30, + help="Limit wall-time plot to top N documents by wall seconds (0 disables limit)", + ) + parser.add_argument( + "--doc-summary-rows", + type=int, + default=10, + help="Print textual summary for document wall times (0 disables)", + ) + parser.add_argument( + "--skip-wall-plot", + action="store_true", + help="Only emit the stage resident-time plot", + ) return parser.parse_args() def main(): args = parse_args() - build_plot( + data = json.loads(args.results.read_text()) + build_stage_plot( + data=data, results_path=args.results, pipeline_yaml=args.pipeline, top_n=args.top_n, @@ -240,6 +401,16 @@ def main(): summary_rows=args.summary_rows, exclude_network=args.exclude_network, ) + if not args.skip_wall_plot: + test_name = data.get("test_config", {}).get("test_name") + build_wall_plot( + data=data, + results_path=args.results, + doc_top_n=args.doc_top_n or None, + width=args.width, + 
summary_rows=args.doc_summary_rows, + title_suffix=test_name, + ) if __name__ == "__main__": From 9bca01e6c590da84ac50789411a9e8533bbbbd70 Mon Sep 17 00:00:00 2001 From: Jacob Ioffe Date: Tue, 25 Nov 2025 16:05:38 +0000 Subject: [PATCH 4/6] Add submission timestamp and wait/queue time profiling - Track submission_ts_ns throughout V2 ingest pipeline - Extract ray_wait_s, in_ray_queue_s, ray_start_ts_s, ray_end_ts_s metrics - Enhance wall-time visualization with wait and queue time bars - Add wait/queue time summaries and percentile statistics - Update documentation with new profiling metrics --- scripts/tests/README.md | 19 ++- scripts/tests/cases/e2e.py | 23 +++ scripts/tests/tools/plot_stage_totals.py | 207 +++++++++++++++++++++-- src/nv_ingest/api/v2/ingest.py | 15 +- 4 files changed, 248 insertions(+), 16 deletions(-) diff --git a/scripts/tests/README.md b/scripts/tests/README.md index 9d7886b53..19eed8799 100644 --- a/scripts/tests/README.md +++ b/scripts/tests/README.md @@ -221,8 +221,22 @@ When traces are enabled: } }, "document_totals": [ - {"document_index": 0, "source_id": "doc001.pdf", "total_resident_s": 1.58}, - {"document_index": 1, "source_id": "doc002.pdf", "total_resident_s": 1.64} + { + "document_index": 0, + "source_id": "doc001.pdf", + "total_resident_s": 1.58, + "ray_start_ts_s": 1732194384.123456, + "ray_end_ts_s": 1732194399.901234, + "total_wall_s": 15.78 + }, + { + "document_index": 1, + "source_id": "doc002.pdf", + "total_resident_s": 1.64, + "ray_start_ts_s": 1732194385.012345, + "ray_end_ts_s": 1732194400.456789, + "total_wall_s": 15.44 + } // ... 
] } @@ -898,6 +912,7 @@ EXTRACT_IMAGES=true python run.py --case=e2e --dataset=bo767 - `--exclude-network` hides broker/Redis wait time so the chart focuses on Ray stages - `--doc-top-n` controls how many slowest documents appear on the wall-time plot (set `0` for all) - `--skip-wall-plot` emits only the resident-time chart if you want the legacy behavior +- When traces include `ray_start_ts_s`/`ray_end_ts_s` (scripts/tests/cases/e2e.py now records them), the wall-time textual table shows the Ray window timestamps alongside wall/resident totals. ### Future Trace Visualization Case (roadmap) diff --git a/scripts/tests/cases/e2e.py b/scripts/tests/cases/e2e.py index 4d9996383..87ce6ecbf 100644 --- a/scripts/tests/cases/e2e.py +++ b/scripts/tests/cases/e2e.py @@ -6,6 +6,7 @@ import sys import time from collections import defaultdict +from typing import Dict from nv_ingest_client.client import Ingestor from nv_ingest_client.util.document_analysis import analyze_document_chunks @@ -92,6 +93,18 @@ def _summarize_traces(traces_list, results, trace_dir: str | None): doc_first_entry_ns = None doc_last_exit_ns = None + submission_ts_ns = trace_payload.get("submission_ts_ns") + doc_submission_ts_ns = None + if isinstance(submission_ts_ns, (int, float)): + doc_submission_ts_ns = int(submission_ts_ns) + elif isinstance(submission_ts_ns, str): + try: + doc_submission_ts_ns = int(submission_ts_ns) + except ValueError: + doc_submission_ts_ns = None + + queue_wait_totals: Dict[str, float] = defaultdict(float) + for key, value in trace_payload.items(): if not key.startswith("trace::resident_time::"): continue @@ -114,14 +127,24 @@ def _summarize_traces(traces_list, results, trace_dir: str | None): stage_breakdown[stage] = stage_entry total_resident += resident_s stage_totals[stage].append(resident_s) + if stage.endswith("_channel_in"): + queue_wait_totals[stage] += resident_s doc_record = { "document_index": doc_index, "source_id": source_id, "total_resident_s": 
round(total_resident, 6), } + if queue_wait_totals: + doc_record["in_ray_queue_s"] = round(sum(queue_wait_totals.values()), 6) + if doc_submission_ts_ns is not None: + doc_record["submission_ts_s"] = round(_ns_to_seconds(doc_submission_ts_ns), 6) if doc_first_entry_ns is not None and doc_last_exit_ns is not None and doc_last_exit_ns >= doc_first_entry_ns: + doc_record["ray_start_ts_s"] = round(_ns_to_seconds(doc_first_entry_ns), 6) + doc_record["ray_end_ts_s"] = round(_ns_to_seconds(doc_last_exit_ns), 6) doc_record["total_wall_s"] = round(_ns_to_seconds(doc_last_exit_ns - doc_first_entry_ns), 6) + if doc_submission_ts_ns is not None and doc_first_entry_ns >= doc_submission_ts_ns: + doc_record["ray_wait_s"] = round(_ns_to_seconds(doc_first_entry_ns - doc_submission_ts_ns), 6) document_totals.append(doc_record) if trace_dir: diff --git a/scripts/tests/tools/plot_stage_totals.py b/scripts/tests/tools/plot_stage_totals.py index 1d8d1f521..1e00322dc 100644 --- a/scripts/tests/tools/plot_stage_totals.py +++ b/scripts/tests/tools/plot_stage_totals.py @@ -197,6 +197,71 @@ def build_stage_plot( print(f"{stage:<40} {total:>12.2f} {avg:>10.2f} {doc_cnt:>6}") +def _doc_wait_seconds(doc: Dict[str, Any]) -> float | None: + wait = doc.get("ray_wait_s") + if wait is not None: + try: + return float(wait) + except (TypeError, ValueError): + return None + start = doc.get("ray_start_ts_s") + submitted = doc.get("submission_ts_s") + if start is None or submitted is None: + return None + try: + wait_val = float(start) - float(submitted) + except (TypeError, ValueError): + return None + return max(0.0, wait_val) + + +def _percentile(values: List[float], pct: float) -> float | None: + if not values: + return None + if pct <= 0: + return values[0] + if pct >= 100: + return values[-1] + k = (len(values) - 1) * (pct / 100.0) + f = math.floor(k) + c = math.ceil(k) + if f == c: + return values[int(k)] + return values[f] + (values[c] - values[f]) * (k - f) + + +def 
_print_wait_summary(wait_values: List[float], wall_values: List[float]): + if not wait_values or not wall_values: + return + sorted_waits = sorted(wait_values) + p50 = _percentile(sorted_waits, 50) + p90 = _percentile(sorted_waits, 90) + p99 = _percentile(sorted_waits, 99) + max_wait = sorted_waits[-1] + ratio_samples = [w / w_tot for w, w_tot in zip(wait_values, wall_values) if w_tot > 0] + avg_ratio = sum(ratio_samples) / len(ratio_samples) if ratio_samples else 0.0 + print("\nWait time summary (all documents)") + print("-" * 90) + print(f"Median wait: {p50:.2f}s | p90: {p90:.2f}s | p99: {p99:.2f}s | max: {max_wait:.2f}s") + print(f"Average wait / wall fraction: {avg_ratio * 100:.2f}%") + + +def _print_queue_summary(queue_values: List[float], wall_values: List[float]): + if not queue_values or not wall_values: + return + sorted_queues = sorted(queue_values) + p50 = _percentile(sorted_queues, 50) + p90 = _percentile(sorted_queues, 90) + p99 = _percentile(sorted_queues, 99) + max_queue = sorted_queues[-1] + ratios = [queue / wall for queue, wall in zip(queue_values, wall_values) if wall > 0] + avg_ratio = sum(ratios) / len(ratios) if ratios else 0.0 + print("\nIn-Ray queue time summary (all documents)") + print("-" * 90) + print(f"Median queue: {p50:.2f}s | p90: {p90:.2f}s | p99: {p99:.2f}s | max: {max_queue:.2f}s") + print(f"Average queue / wall fraction: {avg_ratio * 100:.2f}%") + + def build_wall_plot( data: Dict[str, Any], results_path: Path, @@ -204,6 +269,7 @@ def build_wall_plot( width: int, summary_rows: int, title_suffix: str | None = None, + doc_sort: str = "wall", ): trace_summary = data.get("trace_summary") if not trace_summary: @@ -218,11 +284,18 @@ def build_wall_plot( ingestion_time = run_results.get("ingestion_time_s") result_count = run_results.get("result_count") or len(document_totals) - documents = sorted( - document_totals, - key=lambda item: item.get("total_wall_s", 0.0), - reverse=True, - ) + def wait_sort_key(doc: Dict[str, Any]) -> float: + 
wait_val = _doc_wait_seconds(doc) + return wait_val if wait_val is not None else 0.0 + + if doc_sort == "wait": + documents = sorted(document_totals, key=wait_sort_key, reverse=True) + else: + documents = sorted( + document_totals, + key=lambda item: item.get("total_wall_s", 0.0), + reverse=True, + ) if doc_top_n: documents = documents[:doc_top_n] @@ -230,10 +303,16 @@ def build_wall_plot( print("No document entries available after filtering; skipping wall-time plot.") return - labels = [] - wall_vals = [] - resident_vals = [] - ratios = [] + labels: List[str] = [] + wall_vals: List[float] = [] + resident_vals: List[float] = [] + ratios: List[float] = [] + ray_start_vals: List[float | None] = [] + ray_end_vals: List[float | None] = [] + submission_vals: List[float | None] = [] + wait_vals: List[float | None] = [] + queue_vals: List[float | None] = [] + for doc in documents: source = doc.get("source_id") or f"doc_{doc.get('document_index', '?')}" source_name = Path(source).name @@ -245,15 +324,33 @@ def build_wall_plot( wall_vals.append(wall) resident_vals.append(resident) ratios.append(resident / wall if wall > 0 else math.inf) + ray_start_vals.append(doc.get("ray_start_ts_s")) + ray_end_vals.append(doc.get("ray_end_ts_s")) + submission_vals.append(doc.get("submission_ts_s")) + wait_vals.append(_doc_wait_seconds(doc)) + queue_val = doc.get("in_ray_queue_s") + if queue_val is None: + queue_vals.append(None) + else: + try: + queue_vals.append(float(queue_val)) + except (TypeError, ValueError): + queue_vals.append(None) labels = labels[::-1] wall_vals = wall_vals[::-1] resident_vals = resident_vals[::-1] ratios = ratios[::-1] + wait_vals = wait_vals[::-1] + ray_start_vals = ray_start_vals[::-1] + ray_end_vals = ray_end_vals[::-1] + submission_vals = submission_vals[::-1] + queue_vals = queue_vals[::-1] base_positions = list(range(len(labels))) wall_positions = [pos + 0.2 for pos in base_positions] resident_positions = [pos - 0.2 for pos in base_positions] + 
queue_positions = [pos + 0.05 for pos in base_positions] fig_height = max(6, len(labels) * 0.35) fig, ax = plt.subplots(figsize=(width, fig_height)) @@ -272,6 +369,22 @@ def build_wall_plot( color="#C44E52", label="Resident seconds", ) + wait_display_vals = [val if val is not None else 0.0 for val in wait_vals] + ax.barh( + base_positions, + wait_display_vals, + height=0.15, + color="#FFA600", + label="Wait before Ray", + ) + queue_display_vals = [val if val is not None else 0.0 for val in queue_vals] + ax.barh( + queue_positions, + queue_display_vals, + height=0.12, + color="#AA65D2", + label="In-Ray queue", + ) ax.set_yticks(base_positions) ax.set_yticklabels(labels) @@ -321,15 +434,76 @@ def build_wall_plot( plt.savefig(out_path, dpi=200) print(f"Wrote {out_path}") + all_wait_values: List[float] = [] + all_queue_values: List[float] = [] + all_walls: List[float] = [] + for doc in document_totals: + wait_value = _doc_wait_seconds(doc) + if wait_value is None: + continue + all_wait_values.append(wait_value) + wall_value = doc.get("total_wall_s") + if wall_value is None: + all_walls.append(wait_value) # Dummy placeholder to keep lengths equal + else: + try: + all_walls.append(float(wall_value)) + except (TypeError, ValueError): + all_walls.append(wait_value) + queue_val = doc.get("in_ray_queue_s") + if queue_val is not None: + try: + all_queue_values.append(float(queue_val)) + except (TypeError, ValueError): + pass + _print_wait_summary(all_wait_values, all_walls) + _print_queue_summary(all_queue_values, all_walls) + + has_ray = any(value is not None for value in ray_start_vals) + has_submission = any(value is not None for value in submission_vals) + has_wait = any(value is not None for value in wait_vals) + has_queue = any(value is not None for value in queue_vals) + if summary_rows > 0: print("\nTop documents by wall time") print("-" * 90) - print(f"{'Document':<40} {'Wall (s)':>12} {'Resident (s)':>14} {'Ratio':>8}") + header = f"{'Document':<40} {'Wall (s)':>12} 
{'Resident (s)':>14} {'Ratio':>8}" + if has_wait: + header += f" {'Wait (s)':>10}" + if has_queue: + header += f" {'Ray queue (s)':>14}" + if has_submission: + header += f" {'Submitted (s)':>15}" + if has_ray: + header += f" {'Ray start (s)':>15} {'Ray end (s)':>15}" + print(header) print("-" * 90) - truncated = list(zip(labels[::-1], wall_vals[::-1], resident_vals[::-1], ratios[::-1]))[:summary_rows] - for label, wall, resident, ratio in truncated: + truncated = list( + zip( + labels[::-1], + wall_vals[::-1], + resident_vals[::-1], + ratios[::-1], + wait_vals[::-1], + queue_vals[::-1], + submission_vals[::-1], + ray_start_vals[::-1], + ray_end_vals[::-1], + ) + )[:summary_rows] + for label, wall, resident, ratio, wait, queue, submitted, ray_start, ray_end in truncated: ratio_str = "∞" if math.isinf(ratio) else f"{ratio:.2f}" - print(f"{label:<40} {wall:>12.2f} {resident:>14.2f} {ratio_str:>8}") + row = f"{label:<40} {wall:>12.2f} {resident:>14.2f} {ratio_str:>8}" + if has_wait: + row += f" {wait if wait is not None else '-':>10}" + if has_queue: + row += f" {queue if queue is not None else '-':>10}" + if has_submission: + row += f" {submitted if submitted is not None else '-':>15}" + if has_ray: + row += f" {ray_start if ray_start is not None else '-':>15}" + row += f" {ray_end if ray_end is not None else '-':>15}" + print(row) def parse_args() -> argparse.Namespace: @@ -378,6 +552,12 @@ def parse_args() -> argparse.Namespace: default=10, help="Print textual summary for document wall times (0 disables)", ) + parser.add_argument( + "--doc-sort", + choices=["wall", "wait"], + default="wall", + help="Sort document chart by wall time or wait time", + ) parser.add_argument( "--skip-wall-plot", action="store_true", @@ -410,6 +590,7 @@ def main(): width=args.width, summary_rows=args.doc_summary_rows, title_suffix=test_name, + doc_sort=args.doc_sort, ) diff --git a/src/nv_ingest/api/v2/ingest.py b/src/nv_ingest/api/v2/ingest.py index 6a5878b9e..3e0efb209 100644 --- 
a/src/nv_ingest/api/v2/ingest.py +++ b/src/nv_ingest/api/v2/ingest.py @@ -722,6 +722,10 @@ def _build_aggregated_response( end_page = descriptor.get("end_page") if trace_data: + submission_ts_ns = metadata.get("submission_ts_ns") + if submission_ts_ns is not None: + trace_data = dict(trace_data) + trace_data["submission_ts_ns"] = submission_ts_ns # Add to trace_segments (detailed, per-chunk view) aggregated_result["metadata"]["trace_segments"].append( { @@ -752,6 +756,11 @@ def _build_aggregated_response( logger.warning(f"Page {page_num} failed or missing") # Compute parent-level trace aggregations from trace_segments + submission_ts_ns = metadata.get("submission_ts_ns") + if submission_ts_ns is not None: + aggregated_result["metadata"]["submission_ts_ns"] = submission_ts_ns + aggregated_result["trace"]["submission_ts_ns"] = submission_ts_ns + trace_segments = aggregated_result["metadata"]["trace_segments"] if trace_segments: # Build a temporary chunk trace dict for aggregation @@ -765,6 +774,8 @@ def _build_aggregated_response( # Aggregate and set as top-level trace (only parent traces, no chunk traces) parent_level_traces = _aggregate_parent_traces(temp_chunk_traces) + if submission_ts_ns is not None: + parent_level_traces["submission_ts_ns"] = submission_ts_ns aggregated_result["trace"] = parent_level_traces return aggregated_result @@ -836,6 +847,7 @@ async def submit_job_v2( document_type = None try: span.add_event("Submitting file for processing (V2)") + submission_ts_ns = time.time_ns() current_trace_id = span.get_span_context().trace_id parent_job_id = trace_id_to_uuid(current_trace_id) @@ -863,7 +875,7 @@ async def submit_job_v2( submission_items: List[Tuple[str, MessageWrapper]] = [] subjob_ids: List[str] = [] subjob_descriptors: List[Dict[str, Any]] = [] - parent_metadata: Dict[str, Any] = {} + parent_metadata: Dict[str, Any] = {"submission_ts_ns": submission_ts_ns} submission_items: List[Tuple[str, MessageWrapper]] = [] try: parent_uuid = 
uuid.UUID(parent_job_id) @@ -1072,6 +1084,7 @@ async def submit_job_v2( "original_source_name": original_source_name, "document_type": document_types[0], "subjob_order": [], # No subjobs for non-split PDFs + "submission_ts_ns": submission_ts_ns, } # Store as parent job metadata with empty subjob list for consistency From 57c0dcde1b7b41b60a8c3da3e22012bc810f54cb Mon Sep 17 00:00:00 2001 From: Jacob Ioffe Date: Tue, 25 Nov 2025 21:07:23 +0000 Subject: [PATCH 5/6] reorganizing code and simplifying --- scripts/__init__.py | 1 + scripts/tests/README.md | 38 ++++- scripts/tests/__init__.py | 1 + scripts/tests/cases/e2e.py | 173 ++------------------ scripts/tests/run.py | 9 +- scripts/tests/tools/plot_stage_totals.py | 97 +++++++++++- scripts/tests/utils/__init__.py | 1 + scripts/tests/utils/trace_summary.py | 191 +++++++++++++++++++++++ 8 files changed, 335 insertions(+), 176 deletions(-) create mode 100644 scripts/__init__.py create mode 100644 scripts/tests/__init__.py create mode 100644 scripts/tests/utils/__init__.py create mode 100644 scripts/tests/utils/trace_summary.py diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 000000000..0ed70382e --- /dev/null +++ b/scripts/__init__.py @@ -0,0 +1 @@ +"""Project-level scripts package.""" diff --git a/scripts/tests/README.md b/scripts/tests/README.md index 19eed8799..9bb3c7662 100644 --- a/scripts/tests/README.md +++ b/scripts/tests/README.md @@ -205,7 +205,16 @@ EXTRACT_IMAGES=true API_VERSION=v2 python run.py --case=e2e --dataset=bo767 When traces are enabled: 1. Each processed document gets a raw payload written to `trace_output_dir/_.json` that contains the complete `"trace::"` dictionary plus a small stage summary. These files match the structure shown in `scripts/private_local/trace.json`. -2. `results.json` gains a `trace_summary` block that aggregates resident-time seconds per stage and lists total compute seconds per document. Example: +2. 
`results.json` automatically gains a `trace_summary` block every time `run.py` executes a traced case—no manual helper calls needed. +3. The per-document section now includes the full timing context needed for wait-time analysis: + - `submission_ts_s`: FastAPI submission timestamp (wall-clock seconds) + - `ray_start_ts_s` / `ray_end_ts_s`: first stage entry and last stage exit inside Ray + - `ray_wait_s`: latency between submission and Ray start (pre-Ray wait) + - `in_ray_queue_s`: cumulative resident seconds spent in `*_channel_in` actors (Ray queue wait) + - `total_wall_s`: elapsed Ray processing window (`ray_end - ray_start`) + - `total_resident_s`: summed resident seconds across all compute stages + +Example: ```json "trace_summary": { @@ -224,18 +233,24 @@ When traces are enabled: { "document_index": 0, "source_id": "doc001.pdf", + "submission_ts_s": 1732194383.812345, "total_resident_s": 1.58, "ray_start_ts_s": 1732194384.123456, "ray_end_ts_s": 1732194399.901234, - "total_wall_s": 15.78 + "total_wall_s": 15.78, + "ray_wait_s": 0.311111, + "in_ray_queue_s": 0.042381 }, { "document_index": 1, "source_id": "doc002.pdf", + "submission_ts_s": 1732194384.034567, "total_resident_s": 1.64, "ray_start_ts_s": 1732194385.012345, "ray_end_ts_s": 1732194400.456789, - "total_wall_s": 15.44 + "total_wall_s": 15.44, + "ray_wait_s": 0.977778, + "in_ray_queue_s": 0.063255 } // ... ] @@ -791,6 +806,8 @@ All test cases receive a validated `TestConfig` object with typed fields, elimin CASES = ["e2e", "e2e_with_llm_summary", "your_new_case"] ``` +> ℹ️ Import hygiene: `run.py` now prepends the `cases/`, `tests/`, `scripts/`, and repo-root directories to `sys.path` before loading a case, so new modules can directly import shared helpers (`from scripts.interact import ...`, `from scripts.tests.utils import ...`) without mutating `sys.path` locally. 
+ ### Extending Configuration To add new configurable parameters: @@ -899,20 +916,27 @@ EXTRACT_IMAGES=true python run.py --case=e2e --dataset=bo767 - `results.json` → includes `trace_summary` with per-stage resident seconds - `traces/*.json` → raw payloads identical to `scripts/private_local/trace.json` - Use descriptive folder names (e.g., copy artifacts to `artifacts/baseline_bo20/`) to keep baseline vs RC side-by-side. -5. **Visualize stage + wall timings** +5. **Visualize stage + wall/wait timings** ```bash - # Generates results.stage_time.png (per-stage resident) and results.wall_time.png (doc wall vs resident) + # Positional argument = results.json path; emits stage + wall PNGs python scripts/tests/tools/plot_stage_totals.py \ scripts/tests/artifacts//results.json \ + --doc-top-n 25 \ + --doc-sort wait \ --sort total \ --keep-nested \ --exclude-network ``` - `--keep-nested` preserves nested entries such as chart/table OCR workloads - `--exclude-network` hides broker/Redis wait time so the chart focuses on Ray stages - - `--doc-top-n` controls how many slowest documents appear on the wall-time plot (set `0` for all) + - `--doc-top-n` controls how many documents appear on the wall-time plot (set `0` for all) + - `--doc-sort wait` sorts documents by Ray wait time instead of wall time (useful for large jobs) - `--skip-wall-plot` emits only the resident-time chart if you want the legacy behavior -- When traces include `ray_start_ts_s`/`ray_end_ts_s` (scripts/tests/cases/e2e.py now records them), the wall-time textual table shows the Ray window timestamps alongside wall/resident totals. + - Wall-time output now includes: + - Pre-Ray wait (submission → Ray start) + - In-Ray queue totals (sum of `*_channel_in` resident time) + - Execution window (Ray start/stop timestamps + wall vs resident bars) + - Percentile summaries for wait and queue time are printed in the CLI output for the entire dataset. 
### Future Trace Visualization Case (roadmap) diff --git a/scripts/tests/__init__.py b/scripts/tests/__init__.py new file mode 100644 index 000000000..41e41434c --- /dev/null +++ b/scripts/tests/__init__.py @@ -0,0 +1 @@ +"""scripts.tests package initialization.""" diff --git a/scripts/tests/cases/e2e.py b/scripts/tests/cases/e2e.py index 87ce6ecbf..8978b850d 100644 --- a/scripts/tests/cases/e2e.py +++ b/scripts/tests/cases/e2e.py @@ -1,20 +1,21 @@ import json import logging import os -import re import shutil -import sys import time -from collections import defaultdict -from typing import Dict from nv_ingest_client.client import Ingestor from nv_ingest_client.util.document_analysis import analyze_document_chunks from nv_ingest_client.util.milvus import nvingest_retrieval -# Import from interact module (now properly structured) -sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..")) -from interact import embed_info, kv_event_log, milvus_chunks, segment_results, pdf_page_count +from scripts.interact import ( + embed_info, + kv_event_log, + milvus_chunks, + pdf_page_count, + segment_results, +) +from scripts.tests.utils.trace_summary import summarize_traces # Future: Will integrate with modular ingest_documents.py when VDB upload is separated @@ -28,162 +29,6 @@ MilvusClient = None # Optional; stats logging will be skipped if unavailable -NS_IN_SECOND = 1_000_000_000 - - -def _ns_to_seconds(value): - if value is None: - return None - return value / NS_IN_SECOND - - -def _safe_trace_filename(source_id: str | None, index: int) -> str: - base = source_id or f"document_{index}" - base = os.path.basename(str(base)) - sanitized = re.sub(r"[^A-Za-z0-9._-]", "_", base)[:80] - if not sanitized: - sanitized = f"document_{index}" - return f"{index:03d}_{sanitized}" - - -def _extract_source_id(doc_results) -> str | None: - if doc_results is None: - return None - try: - first_entry = doc_results[0] - except (IndexError, KeyError, TypeError): - return None - except 
Exception: - try: - iterator = iter(doc_results) - first_entry = next(iterator) - except Exception: - return None - metadata = first_entry.get("metadata", {}) if isinstance(first_entry, dict) else {} - source_meta = metadata.get("source_metadata", {}) - return source_meta.get("source_id") - - -def _summarize_traces(traces_list, results, trace_dir: str | None): - if not traces_list: - return None - - valid_traces = [(idx, trace) for idx, trace in enumerate(traces_list) if trace] - if not valid_traces: - return None - - stage_totals = defaultdict(list) - document_totals = [] - - if trace_dir: - os.makedirs(trace_dir, exist_ok=True) - - for doc_index, trace_payload in valid_traces: - if not isinstance(trace_payload, dict): - continue - - source_id = None - if results and doc_index < len(results): - source_id = _extract_source_id(results[doc_index]) - if not source_id: - source_id = f"document_{doc_index}" - - stage_breakdown = {} - total_resident = 0.0 - doc_first_entry_ns = None - doc_last_exit_ns = None - - submission_ts_ns = trace_payload.get("submission_ts_ns") - doc_submission_ts_ns = None - if isinstance(submission_ts_ns, (int, float)): - doc_submission_ts_ns = int(submission_ts_ns) - elif isinstance(submission_ts_ns, str): - try: - doc_submission_ts_ns = int(submission_ts_ns) - except ValueError: - doc_submission_ts_ns = None - - queue_wait_totals: Dict[str, float] = defaultdict(float) - - for key, value in trace_payload.items(): - if not key.startswith("trace::resident_time::"): - continue - stage = key.replace("trace::resident_time::", "") - resident_s = _ns_to_seconds(value) or 0.0 - entry = trace_payload.get(f"trace::entry::{stage}") - exit_value = trace_payload.get(f"trace::exit::{stage}") - wall_s = None - if entry is not None and exit_value is not None: - wall_s = _ns_to_seconds(exit_value - entry) - if doc_first_entry_ns is None or entry < doc_first_entry_ns: - doc_first_entry_ns = entry - if doc_last_exit_ns is None or exit_value > doc_last_exit_ns: - 
doc_last_exit_ns = exit_value - - stage_entry = {"resident_s": round(resident_s, 6)} - if wall_s is not None: - stage_entry["wall_s"] = round(wall_s, 6) - - stage_breakdown[stage] = stage_entry - total_resident += resident_s - stage_totals[stage].append(resident_s) - if stage.endswith("_channel_in"): - queue_wait_totals[stage] += resident_s - - doc_record = { - "document_index": doc_index, - "source_id": source_id, - "total_resident_s": round(total_resident, 6), - } - if queue_wait_totals: - doc_record["in_ray_queue_s"] = round(sum(queue_wait_totals.values()), 6) - if doc_submission_ts_ns is not None: - doc_record["submission_ts_s"] = round(_ns_to_seconds(doc_submission_ts_ns), 6) - if doc_first_entry_ns is not None and doc_last_exit_ns is not None and doc_last_exit_ns >= doc_first_entry_ns: - doc_record["ray_start_ts_s"] = round(_ns_to_seconds(doc_first_entry_ns), 6) - doc_record["ray_end_ts_s"] = round(_ns_to_seconds(doc_last_exit_ns), 6) - doc_record["total_wall_s"] = round(_ns_to_seconds(doc_last_exit_ns - doc_first_entry_ns), 6) - if doc_submission_ts_ns is not None and doc_first_entry_ns >= doc_submission_ts_ns: - doc_record["ray_wait_s"] = round(_ns_to_seconds(doc_first_entry_ns - doc_submission_ts_ns), 6) - document_totals.append(doc_record) - - if trace_dir: - trace_payload_path = os.path.join(trace_dir, f"{_safe_trace_filename(source_id, doc_index)}.json") - trace_record = { - "document_index": doc_index, - "source_id": source_id, - "trace": trace_payload, - "stage_summary": stage_breakdown, - } - try: - with open(trace_payload_path, "w") as fp: - json.dump(trace_record, fp, indent=2) - except OSError as err: - print(f"Failed to write trace file {trace_payload_path}: {err}") - - if not stage_totals: - return None - - stage_summary = {} - for stage, values in stage_totals.items(): - total = sum(values) - count = len(values) - stage_summary[stage] = { - "documents": count, - "total_resident_s": round(total, 6), - "avg_resident_s": round(total / count, 6), - 
"max_resident_s": round(max(values), 6), - "min_resident_s": round(min(values), 6), - } - - return { - "documents": len(document_totals), - "output_dir": trace_dir, - "stage_totals": stage_summary, - "document_totals": document_totals, - } - - def main(config=None, log_path: str = "test_results") -> int: """ Main test entry point. @@ -422,7 +267,7 @@ def main(config=None, log_path: str = "test_results") -> int: trace_summary = None if enable_traces: - trace_summary = _summarize_traces(traces_list, results, trace_dir) + trace_summary = summarize_traces(traces_list, results, trace_dir) if trace_summary: kv_event_log("trace_documents", trace_summary["documents"], log_path) diff --git a/scripts/tests/run.py b/scripts/tests/run.py index c01fcad19..3679d8243 100644 --- a/scripts/tests/run.py +++ b/scripts/tests/run.py @@ -271,8 +271,13 @@ def close(self): # Add cases directory to sys.path so modules can import from utils cases_dir = os.path.dirname(case_path) - if cases_dir not in sys.path: - sys.path.insert(0, cases_dir) + tests_dir = os.path.dirname(cases_dir) + scripts_dir = os.path.dirname(tests_dir) + repo_root = os.path.dirname(scripts_dir) + + for path in (cases_dir, tests_dir, scripts_dir, repo_root): + if path and path not in sys.path: + sys.path.insert(0, path) # Load and execute the test case module spec = importlib.util.spec_from_file_location(case_name, case_path) diff --git a/scripts/tests/tools/plot_stage_totals.py b/scripts/tests/tools/plot_stage_totals.py index 1e00322dc..f1a3e1e1a 100644 --- a/scripts/tests/tools/plot_stage_totals.py +++ b/scripts/tests/tools/plot_stage_totals.py @@ -13,14 +13,29 @@ import argparse import json -from pathlib import Path -from typing import Any, Dict, List, Tuple - import math +import sys +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple import matplotlib.pyplot as plt import yaml +# Ensure repo root is importable so we can reuse shared trace helpers +CURRENT_DIR = Path(__file__).resolve() 
+try: + REPO_ROOT = CURRENT_DIR.parents[3] +except IndexError: + REPO_ROOT = CURRENT_DIR +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +try: + from scripts.tests.utils.trace_summary import summarize_traces +except ImportError as err: # pragma: no cover - defensive fallback + summarize_traces = None # type: ignore[assignment] + print(f"Warning: failed to import trace_summary helpers ({err}); trace rebuild disabled.") + FRIENDLY_STAGE_NAMES: Dict[str, str] = { "source_stage": "Source", @@ -64,6 +79,76 @@ } +def _load_traces_from_dir(trace_dir: Path) -> Optional[Tuple[List[Dict[str, Any]], List[List[Dict[str, Any]]]]]: + trace_files = sorted(trace_dir.glob("*.json")) + if not trace_files: + return None + traces: List[Optional[Dict[str, Any]]] = [] + results_stub: List[List[Dict[str, Any]]] = [] + + for trace_file in trace_files: + payload = json.loads(trace_file.read_text()) + trace_payload = payload.get("trace") or payload + doc_index = payload.get("document_index") + if doc_index is None: + doc_index = len(traces) + while len(traces) <= doc_index: + traces.append(None) + results_stub.append([]) + traces[doc_index] = trace_payload + source_id = payload.get("source_id") + results_stub[doc_index] = [ + { + "metadata": { + "source_metadata": { + "source_id": source_id, + } + } + } + ] + + # Replace any remaining None entries with empty dict so summarize_traces can skip them + clean_traces: List[Dict[str, Any]] = [trace or {} for trace in traces] + return clean_traces, results_stub + + +def _ensure_trace_summary(data: Dict[str, Any], results_path: Path, trace_dir_arg: Optional[Path]) -> None: + trace_summary = data.get("trace_summary") + if isinstance(trace_summary, dict) and trace_summary.get("document_totals"): + return + if summarize_traces is None: + print("trace_summary helpers unavailable; cannot rebuild trace summary.") + return + + candidate_dirs: List[Path] = [] + if trace_dir_arg: + candidate_dirs.append(trace_dir_arg) + output_dir 
= trace_summary.get("output_dir") if isinstance(trace_summary, dict) else None + if output_dir: + candidate_dirs.append(Path(output_dir)) + candidate_dirs.append(results_path.parent / "traces") + + visited = set() + for candidate in candidate_dirs: + if not candidate: + continue + candidate = candidate.expanduser().resolve() + if candidate in visited or not candidate.is_dir(): + continue + visited.add(candidate) + loaded = _load_traces_from_dir(candidate) + if not loaded: + continue + traces_list, results_stub = loaded + summary = summarize_traces(traces_list, results_stub, trace_dir=None) + if summary: + summary.setdefault("output_dir", str(candidate)) + data["trace_summary"] = summary + print(f"Rebuilt trace_summary from {candidate}") + return + print("Unable to locate trace_summary data; stage/wall plots may be skipped.") + + def load_stage_order(pipeline_yaml: Path) -> List[str]: pipeline = yaml.safe_load(pipeline_yaml.read_text()) return [stage["name"] for stage in pipeline.get("stages", [])] @@ -509,6 +594,11 @@ def wait_sort_key(doc: Dict[str, Any]) -> float: def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Plot stage resident times from results.json") parser.add_argument("results", type=Path, help="Path to results.json artifact") + parser.add_argument( + "--trace-dir", + type=Path, + help="Optional directory containing raw trace *.json files (auto-detected if omitted)", + ) parser.add_argument( "--pipeline", type=Path, @@ -569,6 +659,7 @@ def parse_args() -> argparse.Namespace: def main(): args = parse_args() data = json.loads(args.results.read_text()) + _ensure_trace_summary(data, args.results, args.trace_dir) build_stage_plot( data=data, results_path=args.results, diff --git a/scripts/tests/utils/__init__.py b/scripts/tests/utils/__init__.py new file mode 100644 index 000000000..fe9e55ad7 --- /dev/null +++ b/scripts/tests/utils/__init__.py @@ -0,0 +1 @@ +"""Shared utilities for scripts/tests cases and tools.""" diff 
"""Helpers that turn raw ``trace::`` payloads into per-stage summaries.

``summarize_traces`` is the public entry point.  Each trace payload is a
flat dictionary whose keys follow the pattern
``trace::resident_time::<stage>`` / ``trace::entry::<stage>`` /
``trace::exit::<stage>``; values are integer nanoseconds.
"""

import json
import os
import re
from collections import defaultdict
from typing import Dict, List, Optional

# All trace durations and timestamps are expressed in integer nanoseconds.
NS_IN_SECOND = 1_000_000_000


def _ns_to_seconds(value: Optional[int]) -> Optional[float]:
    """Convert integer nanoseconds to float seconds; pass ``None`` through."""
    if value is None:
        return None
    return value / NS_IN_SECOND


def _safe_trace_filename(source_id: str | None, index: int) -> str:
    """Build a filesystem-safe, zero-padded-index-prefixed stem for a trace file.

    Falls back to ``document_<index>`` when *source_id* is missing or
    sanitizes to an empty string; the sanitized basename is capped at 80
    characters to keep paths short.
    """
    base = source_id or f"document_{index}"
    base = os.path.basename(str(base))
    sanitized = re.sub(r"[^A-Za-z0-9._-]", "_", base)[:80]
    if not sanitized:
        sanitized = f"document_{index}"
    return f"{index:03d}_{sanitized}"


def summarize_traces(
    traces_list: List[dict],
    results: List,
    trace_dir: str | None,
) -> Optional[dict]:
    """Aggregate per-document trace payloads into a summary dictionary.

    Parameters
    ----------
    traces_list:
        One trace dictionary per document; falsy entries are skipped but
        their positions are preserved as ``document_index``.
    results:
        Per-document result lists, used only to recover each source id.
    trace_dir:
        When set, the directory is created and one raw trace JSON file is
        written per document; recorded verbatim as ``output_dir``.

    Returns
    -------
    Optional[dict]
        ``{"documents", "output_dir", "stage_totals", "document_totals"}``,
        or ``None`` when there is nothing to summarize.
    """
    if not traces_list:
        return None

    # Keep original indices so document_index survives the falsy filter.
    valid_traces = [(idx, trace) for idx, trace in enumerate(traces_list) if trace]
    if not valid_traces:
        return None

    stage_totals = defaultdict(list)
    document_totals = []

    if trace_dir:
        os.makedirs(trace_dir, exist_ok=True)

    for doc_index, trace_payload in valid_traces:
        if not isinstance(trace_payload, dict):
            continue

        source_id = _extract_source_id(results, doc_index)
        stage_summary = {}  # per-stage breakdown for this one document
        total_resident = 0.0
        doc_first_entry_ns = None  # earliest stage entry seen for this doc
        doc_last_exit_ns = None  # latest stage exit seen for this doc
        doc_submission_ts_ns = _parse_submission_ts(trace_payload)
        queue_wait_totals: Dict[str, float] = defaultdict(float)

        for key, value in trace_payload.items():
            # Only resident-time keys drive the summary; matching entry/exit
            # keys for the same stage are looked up explicitly below.
            if not key.startswith("trace::resident_time::"):
                continue
            stage = key.replace("trace::resident_time::", "")
            resident_s = _ns_to_seconds(value) or 0.0
            entry = trace_payload.get(f"trace::entry::{stage}")
            exit_value = trace_payload.get(f"trace::exit::{stage}")
            wall_s = None
            if entry is not None and exit_value is not None:
                wall_s = _ns_to_seconds(exit_value - entry)
                doc_first_entry_ns = _min_val(doc_first_entry_ns, entry)
                doc_last_exit_ns = _max_val(doc_last_exit_ns, exit_value)

            stage_entry = {"resident_s": round(resident_s, 6)}
            if wall_s is not None:
                stage_entry["wall_s"] = round(wall_s, 6)

            stage_summary[stage] = stage_entry
            total_resident += resident_s
            stage_totals[stage].append(resident_s)
            # "*_channel_in" stages are treated as queue-wait time
            # (NOTE(review): inferred from the key suffix and the
            # "in_ray_queue_s" field below — confirm the naming convention).
            if stage.endswith("_channel_in"):
                queue_wait_totals[stage] += resident_s

        doc_record = {
            "document_index": doc_index,
            "source_id": source_id,
            "total_resident_s": round(total_resident, 6),
        }
        if queue_wait_totals:
            doc_record["in_ray_queue_s"] = round(sum(queue_wait_totals.values()), 6)
        if doc_submission_ts_ns is not None:
            doc_record["submission_ts_s"] = round(_ns_to_seconds(doc_submission_ts_ns), 6)
        if doc_first_entry_ns is not None and doc_last_exit_ns is not None and doc_last_exit_ns >= doc_first_entry_ns:
            doc_record["ray_start_ts_s"] = round(_ns_to_seconds(doc_first_entry_ns), 6)
            doc_record["ray_end_ts_s"] = round(_ns_to_seconds(doc_last_exit_ns), 6)
            doc_record["total_wall_s"] = round(_ns_to_seconds(doc_last_exit_ns - doc_first_entry_ns), 6)
            # Wait between submission and first stage entry, when both known.
            if doc_submission_ts_ns is not None and doc_first_entry_ns >= doc_submission_ts_ns:
                doc_record["ray_wait_s"] = round(
                    _ns_to_seconds(doc_first_entry_ns - doc_submission_ts_ns),
                    6,
                )

        document_totals.append(doc_record)
        if trace_dir:
            _write_trace_payload(trace_dir, source_id, doc_index, trace_payload, stage_summary)

    if not stage_totals:
        return None

    # NOTE: deliberately rebinds the per-document name to the aggregate
    # across all documents before building the return value.
    stage_summary = _build_stage_summary(stage_totals)
    return {
        "documents": len(document_totals),
        "output_dir": trace_dir,
        "stage_totals": stage_summary,
        "document_totals": document_totals,
    }


def _extract_source_id(results, doc_index: int) -> str:
    """Best-effort source id for *doc_index*; never returns an empty value."""
    source_id = None
    if results and doc_index < len(results):
        source_id = _extract_source_from_results(results[doc_index])
    if not source_id:
        source_id = f"document_{doc_index}"
    return source_id


def _extract_source_from_results(doc_results) -> Optional[str]:
    """Pull ``metadata.source_metadata.source_id`` from the first result entry.

    Accepts anything indexable or iterable; returns ``None`` when the shape
    does not match.
    """
    if doc_results is None:
        return None
    try:
        first_entry = doc_results[0]
    except (IndexError, KeyError, TypeError):
        return None
    except Exception:
        # Non-indexable containers: fall back to plain iteration.
        try:
            iterator = iter(doc_results)
            first_entry = next(iterator)
        except Exception:
            return None
    metadata = first_entry.get("metadata", {}) if isinstance(first_entry, dict) else {}
    source_meta = metadata.get("source_metadata", {})
    return source_meta.get("source_id")


def _parse_submission_ts(payload: dict) -> Optional[int]:
    """Read ``submission_ts_ns`` as an int; tolerate numeric strings."""
    submission_ts_ns = payload.get("submission_ts_ns")
    if isinstance(submission_ts_ns, (int, float)):
        return int(submission_ts_ns)
    if isinstance(submission_ts_ns, str):
        try:
            return int(submission_ts_ns)
        except ValueError:
            return None
    return None


def _write_trace_payload(trace_dir: str, source_id: str, doc_index: int, payload: dict, stage_summary: dict) -> None:
    """Write one raw trace record to ``<trace_dir>/<idx>_<source>.json``.

    Write failures are reported and swallowed so a bad disk never aborts
    the summarization run.
    """
    trace_payload_path = os.path.join(trace_dir, f"{_safe_trace_filename(source_id, doc_index)}.json")
    trace_record = {
        "document_index": doc_index,
        "source_id": source_id,
        "trace": payload,
        "stage_summary": stage_summary,
    }
    try:
        with open(trace_payload_path, "w") as fp:
            json.dump(trace_record, fp, indent=2)
    except OSError as err:
        print(f"Failed to write trace file {trace_payload_path}: {err}")


def _build_stage_summary(stage_totals: Dict[str, List[float]]) -> dict:
    """Reduce per-stage resident-time samples to count/total/avg/max/min."""
    summary = {}
    for stage, values in stage_totals.items():
        total = sum(values)
        count = len(values)
        summary[stage] = {
            "documents": count,
            "total_resident_s": round(total, 6),
            "avg_resident_s": round(total / count, 6),
            "max_resident_s": round(max(values), 6),
            "min_resident_s": round(min(values), 6),
        }
    return summary


def _min_val(current: Optional[int], candidate: int) -> int:
    """Return the smaller of *current* and *candidate*; None acts as +infinity."""
    if current is None or candidate < current:
        return candidate
    return current


def _max_val(current: Optional[int], candidate: int) -> int:
    """Return the larger of *current* and *candidate*; None acts as -infinity."""
    if current is None or candidate > current:
        return candidate
    return current
payload.get("document_index") - if doc_index is None: - doc_index = len(traces) - while len(traces) <= doc_index: - traces.append(None) - results_stub.append([]) - traces[doc_index] = trace_payload - source_id = payload.get("source_id") - results_stub[doc_index] = [ - { - "metadata": { - "source_metadata": { - "source_id": source_id, - } - } - } - ] - - # Replace any remaining None entries with empty dict so summarize_traces can skip them - clean_traces: List[Dict[str, Any]] = [trace or {} for trace in traces] - return clean_traces, results_stub - - -def _ensure_trace_summary(data: Dict[str, Any], results_path: Path, trace_dir_arg: Optional[Path]) -> None: - trace_summary = data.get("trace_summary") - if isinstance(trace_summary, dict) and trace_summary.get("document_totals"): - return - if summarize_traces is None: - print("trace_summary helpers unavailable; cannot rebuild trace summary.") - return - - candidate_dirs: List[Path] = [] - if trace_dir_arg: - candidate_dirs.append(trace_dir_arg) - output_dir = trace_summary.get("output_dir") if isinstance(trace_summary, dict) else None - if output_dir: - candidate_dirs.append(Path(output_dir)) - candidate_dirs.append(results_path.parent / "traces") - - visited = set() - for candidate in candidate_dirs: - if not candidate: - continue - candidate = candidate.expanduser().resolve() - if candidate in visited or not candidate.is_dir(): - continue - visited.add(candidate) - loaded = _load_traces_from_dir(candidate) - if not loaded: - continue - traces_list, results_stub = loaded - summary = summarize_traces(traces_list, results_stub, trace_dir=None) - if summary: - summary.setdefault("output_dir", str(candidate)) - data["trace_summary"] = summary - print(f"Rebuilt trace_summary from {candidate}") - return - print("Unable to locate trace_summary data; stage/wall plots may be skipped.") - - def load_stage_order(pipeline_yaml: Path) -> List[str]: pipeline = yaml.safe_load(pipeline_yaml.read_text()) return [stage["name"] for 
def _load_traces_from_dir(
    trace_dir: Path,
) -> Optional[Tuple[List[Dict[str, Any]], List[List[Dict[str, Any]]]]]:
    """
    Read every *.json trace payload, normalizing indices and source ids.

    Records written by summarize_traces wrap the raw trace under a "trace"
    key; bare trace dictionaries are accepted as-is and indexed positionally.
    Returns the (traces_list, results_stub) pair that summarize_traces()
    expects, or None when the directory contains no JSON files.
    """
    trace_files = sorted(trace_dir.glob("*.json"))
    if not trace_files:
        return None

    traces: List[Optional[Dict[str, Any]]] = []
    results_stub: List[List[Dict[str, Any]]] = []

    for trace_file in trace_files:
        payload = json.loads(trace_file.read_text())
        trace_payload = payload.get("trace") or payload
        doc_index = payload.get("document_index")
        if doc_index is None:
            doc_index = len(traces)  # positional fallback for bare payloads
        # Grow both parallel lists so doc_index is a valid slot.
        while len(traces) <= doc_index:
            traces.append(None)
            results_stub.append([])
        traces[doc_index] = trace_payload
        source_id = payload.get("source_id")
        results_stub[doc_index] = [
            {
                "metadata": {
                    "source_metadata": {
                        "source_id": source_id,
                    }
                }
            }
        ]

    # summarize_traces skips falsy entries, so fill index gaps with {}.
    clean_traces: List[Dict[str, Any]] = [trace or {} for trace in traces]
    return clean_traces, results_stub


def ensure_trace_summary(
    data: Dict[str, Any], results_path: Path, trace_dir_override: Optional[Path] = None
) -> Optional[Dict[str, Any]]:
    """
    Ensure data["trace_summary"] exists and contains per-document totals.

    When the summary is missing (or lacks "document_totals"), attempt a
    rebuild from raw trace files, probing in order: *trace_dir_override*,
    the "output_dir" recorded in a partial summary, and
    ``<results dir>/traces``.  Mutates *data* in place.

    Returns the existing or rebuilt summary, or None when nothing usable
    was found (a diagnostic is printed in that case).
    """
    trace_summary = data.get("trace_summary")
    if isinstance(trace_summary, dict) and trace_summary.get("document_totals"):
        return trace_summary

    candidate_dirs: List[Path] = []
    if trace_dir_override:
        candidate_dirs.append(trace_dir_override)
    if isinstance(trace_summary, dict):
        output_dir = trace_summary.get("output_dir")
        if output_dir:
            candidate_dirs.append(Path(output_dir))
    candidate_dirs.append(results_path.parent / "traces")

    visited: set[Path] = set()
    for candidate in candidate_dirs:
        # NOTE: Path objects are always truthy, so no falsiness guard needed.
        candidate = candidate.expanduser().resolve()
        if candidate in visited or not candidate.is_dir():
            continue
        visited.add(candidate)
        loaded = _load_traces_from_dir(candidate)
        if not loaded:
            continue
        traces_list, results_stub = loaded
        rebuilt = summarize_traces(traces_list, results_stub, trace_dir=None)
        if rebuilt:
            # Bug fix: summarize_traces(..., trace_dir=None) stores
            # output_dir=None, and dict.setdefault() keeps a present-but-None
            # key.  Overwrite any falsy value so the rebuilt summary records
            # where the traces actually came from.
            if not rebuilt.get("output_dir"):
                rebuilt["output_dir"] = str(candidate)
            data["trace_summary"] = rebuilt
            return rebuilt

    print("Unable to locate trace_summary data; stage/wall plots may be skipped.")
    return None