diff --git a/conda/environments/all_cuda-129_arch-aarch64.yaml b/conda/environments/all_cuda-129_arch-aarch64.yaml
index cbbd871477e..87b11f10479 100644
--- a/conda/environments/all_cuda-129_arch-aarch64.yaml
+++ b/conda/environments/all_cuda-129_arch-aarch64.yaml
@@ -96,6 +96,7 @@ dependencies:
 - sphinx>=8.1.0
 - sphinxcontrib-websupport
 - streamz
+- structlog
 - sysroot_linux-aarch64==2.28
 - typing_extensions>=4.0.0
 - zlib>=1.2.13
diff --git a/conda/environments/all_cuda-129_arch-x86_64.yaml b/conda/environments/all_cuda-129_arch-x86_64.yaml
index 129809438cf..793e9b1e856 100644
--- a/conda/environments/all_cuda-129_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-129_arch-x86_64.yaml
@@ -97,6 +97,7 @@ dependencies:
 - sphinx>=8.1.0
 - sphinxcontrib-websupport
 - streamz
+- structlog
 - sysroot_linux-64==2.28
 - typing_extensions>=4.0.0
 - zlib>=1.2.13
diff --git a/conda/environments/all_cuda-130_arch-aarch64.yaml b/conda/environments/all_cuda-130_arch-aarch64.yaml
index c33712a1f09..978ebd59a34 100644
--- a/conda/environments/all_cuda-130_arch-aarch64.yaml
+++ b/conda/environments/all_cuda-130_arch-aarch64.yaml
@@ -95,6 +95,7 @@ dependencies:
 - sphinx>=8.1.0
 - sphinxcontrib-websupport
 - streamz
+- structlog
 - sysroot_linux-aarch64==2.28
 - typing_extensions>=4.0.0
 - zlib>=1.2.13
diff --git a/conda/environments/all_cuda-130_arch-x86_64.yaml b/conda/environments/all_cuda-130_arch-x86_64.yaml
index 4fea6669f35..472be62bee9 100644
--- a/conda/environments/all_cuda-130_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-130_arch-x86_64.yaml
@@ -96,6 +96,7 @@ dependencies:
 - sphinx>=8.1.0
 - sphinxcontrib-websupport
 - streamz
+- structlog
 - sysroot_linux-64==2.28
 - typing_extensions>=4.0.0
 - zlib>=1.2.13
diff --git a/dependencies.yaml b/dependencies.yaml
index b21fdb0dfe2..fc3b26e4963 100644
--- a/dependencies.yaml
+++ b/dependencies.yaml
@@ -13,6 +13,7 @@ files:
       - clang
       - cuda
       - cuda_version
+      - cudf_polars_trace
       - depends_on_cupy
       - depends_on_dask_cuda
       - depends_on_libkvikio
@@ -337,6 +338,15 @@ files:
       - numpy_run
       - test_python_common
      - test_python_cudf_polars
+      - cudf_polars_trace
+  py_trace_cudf_polars:
+    output: pyproject
+    pyproject_dir: python/cudf_polars
+    extras:
+      table: project.optional-dependencies
+      key: trace
+    includes:
+      - cudf_polars_trace
   py_build_dask_cudf:
     output: pyproject
     pyproject_dir: python/dask_cudf
@@ -1214,3 +1224,8 @@ dependencies:
       - output_types: [conda, requirements, pyproject]
         packages:
           - narwhals==2.0.1
+  cudf_polars_trace:
+    common:
+      - output_types: [conda, requirements, pyproject]
+        packages:
+          - structlog
diff --git a/docs/cudf/source/cudf_polars/usage.md b/docs/cudf/source/cudf_polars/usage.md
index 2708d94c4aa..030e31029db 100644
--- a/docs/cudf/source/cudf_polars/usage.md
+++ b/docs/cudf/source/cudf_polars/usage.md
@@ -83,3 +83,42 @@ shape: (3, 3)
 │ Scan               ┆ 813   ┆ 233993 │
 └────────────────────┴───────┴────────┘
 ```
+
+## Tracing
+
+cudf-polars can optionally trace execution of each node in the query plan.
+To enable tracing, set the environment variable ``CUDF_POLARS_LOG_TRACES`` to a
+true value ("1", "true", "y", "yes") before starting your process. This will
+capture and log information about each node before and after it executes, including:
+
+- The type of the node being executed (e.g. `Scan`, `Select`, `Join`, `GroupBy`, etc.)
+- The shape and size (in memory) of each input and output DataFrame
+- The GPU memory usage, as reported by [nvml], before and after executing the node
+- A start and stop clock, which can be used to measure the duration of the node's execution
+
+The implementation uses [structlog] to build log records. You can configure the
+output using structlog's [configuration][structlog-configure] and enrich the
+records with [context variables][structlog-context].
+
+
+```
+>>> df = pl.DataFrame({"a": ["a", "a", "b"], "b": [1, 2, 3]}).lazy()
+>>> df.group_by("a").agg(pl.col("b").min().alias("min"), pl.col("b").max().alias("max")).collect(engine="gpu")
+2025-09-10 07:44:01 [info     ] Execute IR                     count_frames_input=0 count_frames_output=1 ... type=DataFrameScan
+2025-09-10 07:44:01 [info     ] Execute IR                     count_frames_input=1 count_frames_output=1 ... type=GroupBy
+shape: (2, 3)
+┌─────┬─────┬─────┐
+│ a   ┆ min ┆ max │
+│ --- ┆ --- ┆ --- │
+│ str ┆ i64 ┆ i64 │
+╞═════╪═════╪═════╡
+│ b   ┆ 3   ┆ 3   │
+│ a   ┆ 1   ┆ 2   │
+└─────┴─────┴─────┘
+```
+
+[nvml]: https://developer.nvidia.com/management-library-nvml
+[rmm-stats]: https://docs.rapids.ai/api/rmm/stable/guide/#memory-statistics-and-profiling
+[structlog]: https://www.structlog.org/
+[structlog-configure]: https://www.structlog.org/en/stable/configuration.html
+[structlog-context]: https://www.structlog.org/en/stable/contextvars.html
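For readers who want to direct these trace records somewhere specific, the sketch below shows one possible structlog configuration that writes each record as a JSON line. It is illustrative only and not part of this patch; it uses only public structlog and stdlib logging APIs, and `traces.jsonl` is just an example path.

```python
# Illustrative sketch (not part of this change): route cudf-polars trace records
# to a JSON-lines file via structlog's stdlib integration.
import logging

import structlog

handler = logging.FileHandler("traces.jsonl")  # example output path
handler.setFormatter(
    structlog.stdlib.ProcessorFormatter(processor=structlog.processors.JSONRenderer())
)
root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.INFO)

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
)
# With CUDF_POLARS_LOG_TRACES=1 set, each "Execute IR" event is written to traces.jsonl.
```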
diff --git a/python/cudf_polars/cudf_polars/callback.py b/python/cudf_polars/cudf_polars/callback.py
index a3418311630..b61e7db670c 100644
--- a/python/cudf_polars/cudf_polars/callback.py
+++ b/python/cudf_polars/cudf_polars/callback.py
@@ -22,6 +22,7 @@
 import rmm
 from rmm._cuda import gpu
 
+import cudf_polars.dsl.tracing
 from cudf_polars.dsl.tracing import CUDF_POLARS_NVTX_DOMAIN
 from cudf_polars.dsl.translate import Translator
 from cudf_polars.utils.config import _env_get_int, get_total_device_memory
@@ -133,6 +134,12 @@ def set_memory_resource(
                 != 0
             ),
         )
+
+    if (
+        cudf_polars.dsl.tracing.LOG_TRACES
+    ):  # pragma: no cover; requires CUDF_POLARS_LOG_TRACES=1
+        mr = rmm.mr.StatisticsResourceAdaptor(mr)
+
     rmm.mr.set_current_device_resource(mr)
     try:
         yield mr
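As context for the `StatisticsResourceAdaptor` wrapping above: the `rmm_*` fields in the trace records come from `rmm.statistics.get_statistics()`, which only reports data when the current device resource is a statistics adaptor. A rough, illustrative sketch of that mechanism (not part of this patch; requires a GPU and an installed `rmm`, and the buffer size is arbitrary):

```python
# Illustrative sketch: how the rmm_* trace fields become available.
import rmm
import rmm.statistics

# Wrap the active resource, mirroring what set_memory_resource does when
# CUDF_POLARS_LOG_TRACES is enabled...
mr = rmm.mr.StatisticsResourceAdaptor(rmm.mr.CudaMemoryResource())
rmm.mr.set_current_device_resource(mr)
# ...or, for ad-hoc use, call rmm.statistics.enable_statistics() instead.

buf = rmm.DeviceBuffer(size=1024)  # any allocation is now tracked
stats = rmm.statistics.get_statistics()
if stats is not None:
    print(stats.current_bytes, stats.peak_bytes, stats.total_count)
```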
diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py
index ab2b3055ede..c440a8f8577 100644
--- a/python/cudf_polars/cudf_polars/dsl/ir.py
+++ b/python/cudf_polars/cudf_polars/dsl/ir.py
@@ -33,7 +33,7 @@
 from cudf_polars.dsl.expressions.base import ExecutionContext
 from cudf_polars.dsl.nodebase import Node
 from cudf_polars.dsl.to_ast import to_ast, to_parquet_filter
-from cudf_polars.dsl.tracing import nvtx_annotate_cudf_polars
+from cudf_polars.dsl.tracing import log_do_evaluate, nvtx_annotate_cudf_polars
 from cudf_polars.dsl.utils.reshape import broadcast
 from cudf_polars.dsl.utils.windows import range_window_bounds
 from cudf_polars.utils import dtypes
@@ -479,6 +479,7 @@ def fast_count(self) -> int:  # pragma: no cover
         return max(total_rows, 0)
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="Scan")
     def do_evaluate(
         cls,
@@ -975,6 +976,7 @@ def _write_parquet(
         plc.io.parquet.write_parquet(writer_options)
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="Sink")
     def do_evaluate(
         cls,
@@ -1034,6 +1036,7 @@ def is_equal(self, other: Self) -> bool:  # noqa: D102
         return False
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="Cache")
     def do_evaluate(
         cls, key: int, refcount: int | None, df: DataFrame
@@ -1114,6 +1117,7 @@ def get_hashable(self) -> Hashable:
         )
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="DataFrameScan")
     def do_evaluate(
         cls,
@@ -1173,6 +1177,7 @@ def _is_len_expr(exprs: tuple[expr.NamedExpr, ...]) -> bool:  # pragma: no cover
         return False
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="Select")
     def do_evaluate(
         cls,
@@ -1256,6 +1261,7 @@ def __init__(
         self._non_child_args = (self.exprs,)
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="Reduce")
     def do_evaluate(
         cls,
@@ -1352,6 +1358,7 @@ def __init__(
         )
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="Rolling")
     def do_evaluate(
         cls,
@@ -1476,6 +1483,7 @@ def __init__(
         )
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="GroupBy")
     def do_evaluate(
         cls,
@@ -1603,7 +1611,8 @@ def __reduce__(self) -> tuple[Any, ...]:
         tuple[
             str,
             pl_expr.Operator | Iterable[pl_expr.Operator],
-        ],
+        ]
+        | None,
         bool,
         Zlice | None,
         str,
@@ -1625,6 +1634,12 @@ def __init__(
     ) -> None:
         self.schema = schema
         self.predicate = predicate
+        # options[0] is a tuple[str, Operator, ...]
+        # The Operator class can't be pickled, but we don't use it anyway so
+        # just throw that away
+        if options[0] is not None:
+            options = (None, *options[1:])
+
         self.options = options
         self.children = (left, right)
         predicate_wrapper = self.Predicate(predicate)
@@ -1637,20 +1652,21 @@
             raise NotImplementedError(
                 f"Conditional join with predicate {predicate}"
             )  # pragma: no cover; polars never delivers expressions we can't handle
-        self._non_child_args = (predicate_wrapper, zlice, suffix, maintain_order)
+        self._non_child_args = (predicate_wrapper, options)
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="ConditionalJoin")
     def do_evaluate(
         cls,
         predicate_wrapper: Predicate,
-        zlice: Zlice | None,
-        suffix: str,
-        maintain_order: Literal["none", "left", "right", "left_right", "right_left"],
+        options: tuple,
         left: DataFrame,
         right: DataFrame,
     ) -> DataFrame:
         """Evaluate and return a dataframe."""
+        _, _, zlice, suffix, _, _ = options
+
         lg, rg = plc.join.conditional_inner_join(
             left.table,
             right.table,
@@ -1875,6 +1891,7 @@ def _build_columns(
         return columns
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="Join")
     def do_evaluate(
         cls,
@@ -2027,6 +2044,7 @@ def __init__(
         self.children = (df,)
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="HStack")
     def do_evaluate(
         cls,
@@ -2092,6 +2110,7 @@ def __init__(
         }
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="Distinct")
     def do_evaluate(
         cls,
@@ -2182,6 +2201,7 @@ def __init__(
         self.children = (df,)
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="Sort")
     def do_evaluate(
         cls,
@@ -2232,6 +2252,7 @@ def __init__(self, schema: Schema, offset: int, length: int | None, df: IR):
         self.children = (df,)
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="Slice")
     def do_evaluate(cls, offset: int, length: int, df: DataFrame) -> DataFrame:
         """Evaluate and return a dataframe."""
@@ -2253,6 +2274,7 @@ def __init__(self, schema: Schema, mask: expr.NamedExpr, df: IR):
         self.children = (df,)
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="Filter")
     def do_evaluate(cls, mask_expr: expr.NamedExpr, df: DataFrame) -> DataFrame:
         """Evaluate and return a dataframe."""
@@ -2272,6 +2294,7 @@ def __init__(self, schema: Schema, df: IR):
         self.children = (df,)
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="Projection")
     def do_evaluate(cls, schema: Schema, df: DataFrame) -> DataFrame:
         """Evaluate and return a dataframe."""
@@ -2305,6 +2328,7 @@ def __init__(self, schema: Schema, key: str, left: IR, right: IR):
         self._non_child_args = (key,)
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="MergeSorted")
     def do_evaluate(cls, key: str, *dfs: DataFrame) -> DataFrame:
         """Evaluate and return a dataframe."""
@@ -2425,6 +2449,7 @@ def get_hashable(self) -> Hashable:
         )
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="MapFunction")
     def do_evaluate(
         cls, schema: Schema, name: str, options: Any, df: DataFrame
@@ -2525,6 +2550,7 @@ def __init__(self, schema: Schema, zlice: Zlice | None, *children: IR):
             schema = self.children[0].schema
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="Union")
     def do_evaluate(cls, zlice: Zlice | None, *dfs: DataFrame) -> DataFrame:
         """Evaluate and return a dataframe."""
@@ -2582,6 +2608,7 @@ def _extend_with_nulls(table: plc.Table, *, nrows: int) -> plc.Table:
         )
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="HConcat")
     def do_evaluate(
         cls,
@@ -2627,6 +2654,7 @@ def __init__(self, schema: Schema):
         self.children = ()
 
     @classmethod
+    @log_do_evaluate
     @nvtx_annotate_cudf_polars(message="Empty")
     def do_evaluate(cls, schema: Schema) -> DataFrame:  # pragma: no cover
         """Evaluate and return a dataframe."""
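A small note on the decorator stacking used throughout `ir.py` above: `@log_do_evaluate` sits below `@classmethod`, so it wraps the raw function (receiving `cls` explicitly) before the classmethod binding is applied. The toy example below, which is not cudf-polars code, shows why that ordering works:

```python
# Toy illustration of the @classmethod / wrapper ordering used in ir.py.
import functools


def log_calls(func):
    @functools.wraps(func)
    def wrapper(cls, *args, **kwargs):
        print(f"before {cls.__name__}.{func.__name__}")
        result = func(cls, *args, **kwargs)
        print(f"after {cls.__name__}.{func.__name__}")
        return result

    return wrapper


class Node:
    @classmethod  # applied last: binds cls and calls the wrapper
    @log_calls    # applied first: wraps the plain function
    def do_evaluate(cls, value):
        return value * 2


print(Node.do_evaluate(21))  # prints the before/after lines, then 42
```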
diff --git a/python/cudf_polars/cudf_polars/dsl/tracing.py b/python/cudf_polars/cudf_polars/dsl/tracing.py
index 2d0fad91f98..222c14df0ca 100644
--- a/python/cudf_polars/cudf_polars/dsl/tracing.py
+++ b/python/cudf_polars/cudf_polars/dsl/tracing.py
@@ -6,11 +6,180 @@
 from __future__ import annotations
 
 import functools
+import os
+import time
+from typing import TYPE_CHECKING, Any, Concatenate, Literal
 
 import nvtx
+import pynvml
+from typing_extensions import ParamSpec
+
+import rmm
+import rmm.statistics
+
+from cudf_polars.utils.config import _bool_converter, get_device_handle
+
+try:
+    import structlog
+except ImportError:
+    _HAS_STRUCTLOG = False
+else:
+    _HAS_STRUCTLOG = True
+
+
+LOG_TRACES = _HAS_STRUCTLOG and _bool_converter(
+    os.environ.get("CUDF_POLARS_LOG_TRACES", "0")
+)
 
 CUDF_POLARS_NVTX_DOMAIN = "cudf_polars"
 
 nvtx_annotate_cudf_polars = functools.partial(
     nvtx.annotate, domain=CUDF_POLARS_NVTX_DOMAIN
 )
+
+if TYPE_CHECKING:
+    from collections.abc import Callable, Sequence
+
+    import cudf_polars.containers
+    from cudf_polars.dsl import ir
+
+
+@functools.cache
+def _getpid() -> int:  # pragma: no cover
+    # Gets called for each IR.do_evaluate node, so we'll cache it.
+    return os.getpid()
+
+
+def make_snapshot(
+    node_type: type[ir.IR],
+    frames: Sequence[cudf_polars.containers.DataFrame],
+    extra: dict[str, Any] | None = None,
+    *,
+    pid: int,
+    device_handle: Any | None = None,
+    phase: Literal["input", "output"] = "input",
+) -> dict:  # pragma: no cover; requires CUDF_POLARS_LOG_TRACES=1
+    """
+    Collect statistics about the evaluation of an IR node.
+
+    Parameters
+    ----------
+    node_type
+        The type of the IR node.
+    frames
+        The list of DataFrames to capture information for. For ``phase="input"``,
+        this is typically the dataframes passed to ``IR.do_evaluate``. For
+        ``phase="output"``, this is typically the DataFrame returned from
+        ``IR.do_evaluate``.
+    extra
+        Extra information to log.
+    pid
+        The ID of the current process. Used for NVML memory usage.
+    device_handle
+        The pynvml device handle. Used for NVML memory usage.
+    phase
+        The phase of the evaluation. Either "input" or "output".
+    """
+    ir_name = node_type.__name__
+
+    d: dict[str, Any] = {
+        "type": ir_name,
+        f"count_frames_{phase}": len(frames),
+        f"frames_{phase}": [
+            {
+                "shape": frame.table.shape(),
+                "size": sum(col.device_buffer_size() for col in frame.table.columns()),
+            }
+            for frame in frames
+        ],
+    }
+    d[f"total_bytes_{phase}"] = sum(x["size"] for x in d[f"frames_{phase}"])
+
+    stats = rmm.statistics.get_statistics()
+    if stats:
+        d.update(
+            {
+                f"rmm_current_bytes_{phase}": stats.current_bytes,
+                f"rmm_current_count_{phase}": stats.current_count,
+                f"rmm_peak_bytes_{phase}": stats.peak_bytes,
+                f"rmm_peak_count_{phase}": stats.peak_count,
+                f"rmm_total_bytes_{phase}": stats.total_bytes,
+                f"rmm_total_count_{phase}": stats.total_count,
+            }
+        )
+
+    if extra:
+        d.update(extra)
+
+    if device_handle is not None:
+        processes = pynvml.nvmlDeviceGetComputeRunningProcesses(device_handle)
+        for proc in processes:
+            if proc.pid == pid:
+                d[f"nvml_current_bytes_{phase}"] = proc.usedGpuMemory
+                break
+
+    return d
+
+
+P = ParamSpec("P")
+
+
+def log_do_evaluate(
+    func: Callable[Concatenate[type[ir.IR], P], cudf_polars.containers.DataFrame],
+) -> Callable[Concatenate[type[ir.IR], P], cudf_polars.containers.DataFrame]:
+    """
+    Decorator for an ``IR.do_evaluate`` method that logs information before and after evaluation.
+
+    Parameters
+    ----------
+    func
+        The ``IR.do_evaluate`` method to wrap.
+    """
+    if not LOG_TRACES:
+        return func
+    else:  # pragma: no cover; requires CUDF_POLARS_LOG_TRACES=1
+
+        @functools.wraps(func)
+        def wrapper(
+            cls: type[ir.IR],
+            *args: P.args,
+            **kwargs: P.kwargs,
+        ) -> cudf_polars.containers.DataFrame:
+            # do this just once
+            pynvml.nvmlInit()
+            maybe_handle = get_device_handle()
+            pid = _getpid()
+            log = structlog.get_logger()
+
+            # By convention, all non-dataframe arguments (non_child) come first.
+            # Anything remaining is a dataframe.
+            frames: list[cudf_polars.containers.DataFrame] = (
+                list(args) + list(kwargs.values())
+            )[len(cls._non_child) :]  # type: ignore[assignment]
+
+            before = make_snapshot(
+                cls, frames, phase="input", device_handle=maybe_handle, pid=pid
+            )
+
+            # The decorator preserves the exact signature of the original do_evaluate method.
+            # Each IR.do_evaluate method is a classmethod that takes the IR class as first
+            # argument, followed by the method-specific arguments, and returns a DataFrame.
+
+            start = time.monotonic_ns()
+            result = func(cls, *args, **kwargs)
+            stop = time.monotonic_ns()
+
+            after = make_snapshot(
+                cls,
+                [result],
+                phase="output",
+                extra={"start": start, "stop": stop},
+                device_handle=maybe_handle,
+                pid=pid,
+            )
+            record = before | after
+            log.info("Execute IR", **record)
+
+            return result
+
+        return wrapper
diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds.py
index 75b56991a55..6041d9a9d79 100644
--- a/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds.py
+++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/pdsds.py
@@ -124,7 +124,7 @@ def run_duckdb(benchmark: Any, options: Sequence[str] | None = None) -> None:
             result = execute_duckdb_query(duckdb_query, run_config.dataset_path)
             t1 = time.time()
 
-            record = Record(query=q_id, duration=t1 - t0)
+            record = Record(query=q_id, iteration=i, duration=t1 - t0)
 
             if args.print_results:
                 print(result)
diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
index 3161dac0fbd..58fa86f65f8 100644
--- a/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
+++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/utils.py
@@ -8,7 +8,10 @@
 import argparse
 import dataclasses
 import importlib
+import io
+import itertools
 import json
+import logging
 import os
 import statistics
 import sys
@@ -23,6 +26,8 @@
 
 import polars as pl
 
+import rmm.statistics
+
 try:
     import pynvml
 except ImportError:
@@ -44,6 +49,17 @@
     from pathlib import Path
 
 
+try:
+    import structlog
+    import structlog.contextvars
+    import structlog.processors
+    import structlog.stdlib
+except ImportError:
+    _HAS_STRUCTLOG = False
+else:
+    _HAS_STRUCTLOG = True
+
+
 ExecutorType = Literal["in-memory", "streaming", "cpu"]
 
 
@@ -52,8 +68,28 @@ class Record:
     """Results for a single run of a single PDS-H query."""
 
     query: int
+    iteration: int
     duration: float
     shuffle_stats: dict[str, dict[str, int | float]] | None = None
+    traces: list[dict[str, Any]] | None = None
+
+    @classmethod
+    def new(
+        cls,
+        query: int,
+        iteration: int,
+        duration: float,
+        shuffle_stats: dict[str, dict[str, int | float]] | None = None,
+        traces: list[dict[str, Any]] | None = None,
+    ) -> Record:
+        """Create a Record from plain data."""
+        return cls(
+            query=query,
+            iteration=iteration,
+            duration=duration,
+            shuffle_stats=shuffle_stats,
+            traces=traces,
+        )
 
 
 @dataclasses.dataclass
@@ -205,6 +241,7 @@ class RunConfig:
     rapidsmpf_spill: bool
     spill_device: float
     query_set: str
     stats_planning: bool
+    collect_traces: bool = False
 
     def __post_init__(self) -> None:  # noqa: D105
@@ -275,6 +312,7 @@ def from_args(cls, args: argparse.Namespace) -> RunConfig:
             rapidsmpf_spill=args.rapidsmpf_spill,
             max_rows_per_partition=args.max_rows_per_partition,
             query_set=args.query_set,
+            collect_traces=args.collect_traces,
             stats_planning=args.stats_planning,
         )
 
@@ -437,6 +475,10 @@ def initialize_dask_cluster(run_config: RunConfig, args: argparse.Namespace):  #
                 }
             ),
         )
+        # Setting this globally makes the peak statistics not meaningful
+        # across queries / iterations. But doing it per query isn't worth
+        # the effort right now.
+        client.run(rmm.statistics.enable_statistics)
     except ImportError as err:
         if run_config.shuffle == "rapidsmpf":
             raise ImportError(
@@ -721,6 +763,14 @@ def parse_args(
         default="duckdb",
         help="Which engine to use as the baseline for validation.",
     )
+
+    parser.add_argument(
+        "--collect-traces",
+        action=argparse.BooleanOptionalAction,
+        default=False,
+        help="Collect data tracing cudf-polars execution.",
+    )
+
     parser.add_argument(
         "--stats-planning",
         action=argparse.BooleanOptionalAction,
@@ -764,8 +814,13 @@ def run_polars(
         print_query_plan(q_id, q, args, run_config, engine)
 
         records[q_id] = []
 
         for i in range(args.iterations):
+            if _HAS_STRUCTLOG and run_config.collect_traces:
+                setup_logging(q_id, i)
+                if client is not None:
+                    client.run(setup_logging, q_id, i)
+
             t0 = time.monotonic()
 
             try:
@@ -800,7 +854,9 @@
                     print(f"❌ Query {q_id} failed validation!\n{e}")
 
             t1 = time.monotonic()
-            record = Record(query=q_id, duration=t1 - t0, shuffle_stats=shuffle_stats)
+            record = Record(
+                query=q_id, iteration=i, duration=t1 - t0, shuffle_stats=shuffle_stats
+            )
 
             if args.print_results:
                 print(result)
@@ -809,6 +865,51 @@
 
     run_config = dataclasses.replace(run_config, records=dict(records))
 
+    # consolidate logs
+    if _HAS_STRUCTLOG and run_config.collect_traces:
+
+        def gather_logs() -> str:
+            logger = logging.getLogger()
+            return logger.handlers[0].stream.getvalue()  # type: ignore[attr-defined]
+
+        if client is not None:
+            all_logs = "\n".join(client.run(gather_logs).values())
+        else:
+            all_logs = gather_logs()
+
+        parsed_logs = [json.loads(log) for log in all_logs.splitlines() if log]
+        # Some other log records can end up in here. Filter those out.
+        parsed_logs = [log for log in parsed_logs if log["event"] == "Execute IR"]
+        # Now we want to augment the existing Records with the trace data.
+
+        def group_key(x: dict) -> int:
+            return x["query_id"]
+
+        def sort_key(x: dict) -> tuple[int, int]:
+            return x["query_id"], x["iteration"]
+
+        grouped = itertools.groupby(
+            sorted(parsed_logs, key=sort_key),
+            key=group_key,
+        )
+
+        for query_id, run_logs_group in grouped:
+            run_logs = list(run_logs_group)
+            by_iteration = [
+                list(x)
+                for _, x in itertools.groupby(run_logs, key=lambda x: x["iteration"])
+            ]
+            run_records = run_config.records[query_id]
+            assert len(by_iteration) == len(run_records)  # same number of iterations
+            all_traces = [list(iteration) for iteration in by_iteration]
+
+            new_records = [
+                dataclasses.replace(record, traces=traces)
+                for record, traces in zip(run_records, all_traces, strict=True)
+            ]
+
+            run_config.records[query_id] = new_records
+
     if args.summarize:
         run_config.summarize()
 
@@ -830,3 +931,78 @@
 
     if query_failures or validation_failures:
         sys.exit(1)
+
+
+def setup_logging(query_id: int, iteration: int) -> None:  # noqa: D103
+    import cudf_polars.dsl.tracing
+
+    if not cudf_polars.dsl.tracing.LOG_TRACES:
+        msg = (
+            "Tracing requested via --collect-traces, but tracing is not enabled. "
+            "Verify that 'CUDF_POLARS_LOG_TRACES' is set and structlog is installed."
+        )
+        raise RuntimeError(msg)
+
+    if _HAS_STRUCTLOG:
+        # structlog uses contextvars to propagate context down to where log records
+        # are emitted. Ideally, we'd just set the contextvars here using
+        # structlog.bind_contextvars; for the distributed scheduler we would need
+        # to use something like client.run to set the contextvars on the worker.
+        # However, there's an unfortunate conflict between structlog's use of
+        # context vars and how Dask Workers actually execute tasks, such that
+        # the contextvars set via `client.run` aren't visible to the actual
+        # tasks.
+        #
+        # So instead we make a new logger each time we need a new context,
+        # i.e. for each query/iteration pair.
+
+        def make_injector(
+            query_id: int, iteration: int
+        ) -> Callable[[logging.Logger, str, dict[str, Any]], dict[str, Any]]:
+            def inject(
+                logger: Any, method_name: Any, event_dict: Any
+            ) -> dict[str, Any]:
+                event_dict["query_id"] = query_id
+                event_dict["iteration"] = iteration
+                return event_dict
+
+            return inject
+
+        shared_processors = [
+            structlog.contextvars.merge_contextvars,
+            make_injector(query_id, iteration),
+            structlog.processors.add_log_level,
+            structlog.processors.CallsiteParameterAdder(
+                parameters=[
+                    structlog.processors.CallsiteParameter.PROCESS,
+                    structlog.processors.CallsiteParameter.THREAD,
+                ],
+            ),
+            structlog.processors.StackInfoRenderer(),
+            structlog.dev.set_exc_info,
+            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S.%f", utc=False),
+        ]
+
+        # For logging to a file
+        json_renderer = structlog.processors.JSONRenderer()
+
+        stream = io.StringIO()
+        json_file_handler = logging.StreamHandler(stream)
+        json_file_handler.setFormatter(
+            structlog.stdlib.ProcessorFormatter(
+                processor=json_renderer,
+                foreign_pre_chain=shared_processors,
+            )
+        )
+
+        logging.basicConfig(level=logging.INFO, handlers=[json_file_handler])
+
+        structlog.configure(
+            processors=[
+                *shared_processors,
+                structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
+            ],
+            logger_factory=structlog.stdlib.LoggerFactory(),
+            wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
+            cache_logger_on_first_use=True,
+        )
diff --git a/python/cudf_polars/cudf_polars/experimental/shuffle.py b/python/cudf_polars/cudf_polars/experimental/shuffle.py
index 9093de133b7..f9baa066b29 100644
--- a/python/cudf_polars/cudf_polars/experimental/shuffle.py
+++ b/python/cudf_polars/cudf_polars/experimental/shuffle.py
@@ -13,7 +13,7 @@
 from cudf_polars.containers import DataFrame
 from cudf_polars.dsl.expr import Col
 from cudf_polars.dsl.ir import IR
-from cudf_polars.dsl.tracing import nvtx_annotate_cudf_polars
+from cudf_polars.dsl.tracing import log_do_evaluate, nvtx_annotate_cudf_polars
 from cudf_polars.experimental.base import get_key_name
 from cudf_polars.experimental.dispatch import generate_ir_tasks, lower_ir_node
 from cudf_polars.experimental.utils import _concat
@@ -149,7 +149,12 @@ def __init__(
         self._non_child_args = (schema, keys, shuffle_method)
         self.children = (df,)
 
-    @classmethod
+    # the type-ignore is for
+    # Argument 1 to "log_do_evaluate" has incompatible type "Callable[[type[Shuffle], ]"
+    # expected Callable[[type[IR], ]
+    # But Shuffle is a subclass of IR, so this is fine.
+    @classmethod  # type: ignore[arg-type]
+    @log_do_evaluate
     def do_evaluate(
         cls,
         schema: Schema,
diff --git a/python/cudf_polars/cudf_polars/utils/config.py b/python/cudf_polars/cudf_polars/utils/config.py
index 32376a721aa..1af1f9c7a98 100644
--- a/python/cudf_polars/cudf_polars/utils/config.py
+++ b/python/cudf_polars/cudf_polars/utils/config.py
@@ -28,7 +28,7 @@
 import json
 import os
 import warnings
-from typing import TYPE_CHECKING, Literal, TypeVar
+from typing import TYPE_CHECKING, Any, Literal, TypeVar
 
 if TYPE_CHECKING:
     from collections.abc import Callable
@@ -57,8 +57,9 @@ def _env_get_int(name: str, default: int) -> int:
         return default  # pragma: no cover
 
 
-def get_total_device_memory() -> int | None:
-    """Return the total memory of the current device."""
+@functools.cache
+def get_device_handle() -> Any:
+    # Gets called for each IR.do_evaluate node, so we'll cache it.
     import pynvml
 
     try:
@@ -74,12 +75,23 @@
             handle = pynvml.nvmlDeviceGetDeviceHandleFromMigDeviceHandle(handle)
         else:
             handle = pynvml.nvmlDeviceGetHandleByIndex(int(index))
-
-        return pynvml.nvmlDeviceGetMemoryInfo(handle).total
-
     except pynvml.NVMLError_NotSupported:  # pragma: no cover
         # System doesn't have proper "GPU memory".
         return None
+    else:
+        return handle
+
+
+def get_total_device_memory() -> int | None:
+    """Return the total memory of the current device."""
+    import pynvml
+
+    maybe_handle = get_device_handle()
+
+    if maybe_handle is not None:
+        return pynvml.nvmlDeviceGetMemoryInfo(maybe_handle).total
+    else:  # pragma: no cover
+        return None
 
 
 @functools.cache
diff --git a/python/cudf_polars/pyproject.toml b/python/cudf_polars/pyproject.toml
index d12701b1de4..43fc6f56d68 100644
--- a/python/cudf_polars/pyproject.toml
+++ b/python/cudf_polars/pyproject.toml
@@ -46,11 +46,15 @@ test = [
     "pytest-httpserver",
     "pytest-xdist",
     "rich",
+    "structlog",
 ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.
 experimental = [
     "nvidia-ml-py>=12",
     "rapids-dask-dependency==25.12.*,>=0.0.0a0",
 ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.
+trace = [
+    "structlog",
+] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.
 
 [project.urls]
 Homepage = "https://github.com/rapidsai/cudf"
diff --git a/python/cudf_polars/tests/test_tracing.py b/python/cudf_polars/tests/test_tracing.py
new file mode 100644
index 00000000000..9a163b9e095
--- /dev/null
+++ b/python/cudf_polars/tests/test_tracing.py
@@ -0,0 +1,76 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import subprocess
+import sys
+import textwrap
+from typing import Any
+
+import pytest
+
+import polars as pl
+
+import cudf_polars.testing.asserts
+
+structlog = pytest.importorskip("structlog")
+
+
+@pytest.fixture(name="log_output")
+def fixture_log_output():
+    return structlog.testing.LogCapture()
+
+
+@pytest.fixture(autouse=True)
+def fixture_configure_structlog(log_output):
+    structlog.configure(processors=[log_output])
+
+
+def test_trace_basic(
+    log_output: Any,
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    # Whether tracing is enabled is determined when cudf_polars is imported.
+    # So our best way of testing this is to run things in a subprocess
+    # to control the environment and isolate it from the rest of the test suite.
+    code = textwrap.dedent("""\
+        import polars as pl
+        import rmm
+
+        q = pl.DataFrame({"a": [1, 2, 3]}).lazy().select(pl.col("a").sum())
+        q.collect(engine=pl.GPUEngine(memory_resource=rmm.mr.ManagedMemoryResource()))
+    """)
+
+    env = {
+        "CUDF_POLARS__EXECUTOR": cudf_polars.testing.asserts.DEFAULT_EXECUTOR,
+        "CUDF_POLARS_LOG_TRACES": "1",
+    }
+
+    result = subprocess.check_output([sys.executable, "-c", code], env=env)
+    # Just ensure that the default structlog output is in the result
+    assert b"Execute IR" in result
+    assert b"frames_output" in result
+    assert b"frames_input" in result
+    assert b"total_bytes_output" in result
+    assert b"total_bytes_input" in result
+    assert b"rmm_total_bytes_output" in result
+    assert b"rmm_total_bytes_input" in result
+    assert b"rmm_current_bytes_output" in result
+
+
+def test_import_without_structlog(monkeypatch: pytest.MonkeyPatch) -> None:
+    modules = list(sys.modules)
+
+    for module in modules:
+        if module.startswith("cudf_polars"):
+            monkeypatch.delitem(sys.modules, module)
+    monkeypatch.setitem(sys.modules, "structlog", None)
+
+    import cudf_polars.dsl.tracing
+
+    assert not cudf_polars.dsl.tracing._HAS_STRUCTLOG
+
+    # And we can run a query without error
+    q = pl.DataFrame({"a": [1, 2, 3]}).lazy().select(pl.col("a").sum())
+    q.collect(engine="gpu")
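Once traces have been collected, either from the benchmark's `Record.traces` or from a JSON-lines log, a small amount of post-processing goes a long way. The sketch below is illustrative only and not part of this patch; it assumes the field names produced by `make_snapshot` above (`type`, `start`, `stop`, `total_bytes_output`) and a hypothetical `traces.jsonl` file.

```python
# Illustrative sketch: aggregate "Execute IR" trace records per IR node type.
from collections import defaultdict


def summarize(traces: list[dict]) -> dict[str, dict[str, float]]:
    summary: dict[str, dict[str, float]] = defaultdict(
        lambda: {"count": 0, "seconds": 0.0, "output_bytes": 0}
    )
    for rec in traces:
        agg = summary[rec["type"]]
        agg["count"] += 1
        # start/stop come from time.monotonic_ns(), so convert to seconds.
        agg["seconds"] += (rec["stop"] - rec["start"]) / 1e9
        agg["output_bytes"] += rec.get("total_bytes_output", 0)
    return dict(summary)


# Example usage with records parsed from a JSON-lines trace file:
# import json
# traces = [json.loads(line) for line in open("traces.jsonl") if line.strip()]
# for name, agg in sorted(summarize(traces).items(), key=lambda kv: -kv[1]["seconds"]):
#     print(name, int(agg["count"]), f"{agg['seconds']:.3f}s", int(agg["output_bytes"]), "B")
```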