Trace node execution in cudf-polars #19895
Conversation
This PR introduces a new *low-overhead* tracing tool for cudf-polars. When enabled, we'll capture a record for each `IR.do_evaluate` node executed while running the polars query.
wence-
left a comment
Can you show some benchmark data for the overhead this adds? In both off and on modes?
First, there's the extra time at import (measured with …).

The overall … So tl;dr: I'm not worried about the import time cost. Which leaves the runtime overhead. Let's measure the cost of executing a `DataFrameScan` node:

```python
import os
import sys
import time

import cudf_polars
import cudf_polars.containers
import polars as pl
from cudf_polars.dsl.translate import Translator

q = pl.DataFrame({"a": [1, 2, 3]}).lazy()
ir = Translator(q._ldf.visit(), pl.GPUEngine()).translate_ir()
df = q.collect()
schema = {
    "a": cudf_polars.containers.DataType(pl.Int64)
}
t = type(ir)

t0 = time.monotonic()
t.do_evaluate(schema, df, None)
t1 = time.monotonic()
print("CUDF_POLARS_LOG_TRACES", os.environ.get("CUDF_POLARS_LOG_TRACES", "0"), file=sys.stderr)
print(f"Initial: {t1 - t0:0.2f} seconds", file=sys.stderr)

t0 = time.monotonic()
for i in range(100):
    t.do_evaluate(schema, df, None)
t1 = time.monotonic()
print(f"Subsequent: {t1 - t0:0.2f} seconds", file=sys.stderr)
```

On main, the first call takes between 0.4 and 0.6 seconds, and the subsequent … On this branch without tracing enabled, the first call takes roughly 0.4-0.6 seconds, and the … With tracing, the first call takes between 0.4 and 0.6 seconds (very roughly the same). The …

I think that `DataFrameScan` with a small input should be the worst-case scenario. In general, the amount of overhead is fixed, so the faster the node, the larger the overhead in percentage terms. The only part of the tracing that scales is with the number of input dataframes and the number of columns per dataframe (but not with the number of rows). Overall, I think things look acceptable.
I've run the full pdsh benchmarks for both single-GPU (SF-1K) and multi-GPU (SF-3K) from this branch with 3 iterations each. The …

```python
import pandas as pd
import json
import pyarrow as pa

runs = [json.loads(x) for x in open("single.jsonl").readlines()]

dfs = []
for run in runs:
    for query_id, records in run["records"].items():
        for iteration_id, iteration in enumerate(records):
            traces = iteration["traces"]
            df = pa.Table.from_pylist(traces).to_pandas(types_mapper=pd.ArrowDtype)
            dfs.append(df)

df = pd.concat(dfs)
df.to_parquet("traces.parquet")
```

And the file drops to about 2 MB. I think keeping them in the …
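For downstream analysis, a trace table like this could then be summarized with ordinary pandas operations. This is just an illustrative sketch: the `name` and `duration` column names are assumptions for the example, not necessarily the fields the traces actually contain.

```python
import pandas as pd

df = pd.read_parquet("traces.parquet")

# Total and mean time per IR node type, using hypothetical
# "name" (node type) and "duration" (seconds) columns.
summary = (
    df.groupby("name")["duration"]
    .agg(["count", "sum", "mean"])
    .sort_values("sum", ascending=False)
)
print(summary)
```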
Matt711
left a comment
Thanks @TomAugspurger, I think this is a nice low-overhead improvement. We aren't including a network latency breakdown in this PR, but do you think that's something we could eventually achieve?

At the very least there is a `--nic-metrics` flag in nsys, so we could turn that on (if we haven't already) when profiling to get an idea of network activity.
One clarifying question: do you mean network latency for things like reading parquet files over the network? Or network latency for moving data between nodes in some kind of multi-GPU setup?

The tracing here is fundamentally at the "execute this IR node" level, so it could possibly capture things like the network activity that happened while executing the IR node. That said, any kind of metric based off reading a device counter would be hard to trace back to a specific IR node, since IIUC other stuff could be happening on that device to increment that counter.

Then there's the communication that needs to happen to ensure that the data necessary to execute some node is in place, in a multi-GPU setup. Today, that's essentially outside of cudf-polars' hands. With our dask-based execution, communication happens implicitly thanks to the task framework: some IR node will depend on the output of one or more earlier IR nodes, and Dask will take care of getting the data there for you. That would show up in this tracing only as a gap in time between the conclusion of one IR node and the start of the subsequent nodes. This is a major weakness of doing the tracing at the IR-node-execution level.
/merge

Merged ebe3d8b into rapidsai:branch-25.12
Fixes a regression in #19895, which changed the logic for when we catch this exception and fall back to the default device memory size.

Authors:
- Tom Augspurger (https://github.com/TomAugspurger)

Approvers:
- Matthew Murray (https://github.com/Matt711)

URL: #20179
The so-called "low-overhead" tracing added in #19895 *can* have some measurable overhead in some cases (see below). This PR adds additional configuration options to control which metrics are collected when tracing is enabled. The default is not to collect any traces, which is zero overhead. With `CUDF_POLARS_LOG_TRACES=1`, *all* tracing is enabled, which includes information on memory from RMM and NVML, and information on the input / output dataframes from cudf-polars. Users can disable certain metrics by setting another environment variable. For example, this would disable logging of memory (from RMM and NVML):

```
CUDF_POLARS_LOG_TRACES=1 CUDF_POLARS_LOG_TRACES_MEMORY=0 python ...
```

and this would disable the memory and dataframe-related metrics:

```
CUDF_POLARS_LOG_TRACES=1 CUDF_POLARS_LOG_TRACES_MEMORY=0 CUDF_POLARS_LOG_TRACES_DATAFRAMES=0 python ...
```

This boxplot shows the runtime of our PDSH benchmarks at SF-3K with the distributed scheduler, using 8 workers with an H100 each and 5 iterations per run. There are 3 runs shown:

1. "on": tracing was enabled with `CUDF_POLARS_LOG_TRACES=1`
2. "off": tracing was not enabled
3. "time-only": tracing was enabled, but memory and dataframe metrics were disabled, with `CUDF_POLARS_LOG_TRACES=1 CUDF_POLARS_LOG_TRACES_MEMORY=0 CUDF_POLARS_LOG_TRACES_DATAFRAMES=0`

<img width="1600" height="800" alt="tracing-overhead" src="https://github.com/user-attachments/assets/ccb4b454-233f-45d5-8a4e-36cb586e1ba0" />

The interesting parts are the large gaps between the "on" box and the two "off" / "time-only" boxes, which I've highlighted. These indicate that the tracing overhead is relatively large with all the metrics turned on. But the limited tracing that only measures durations doesn't have that same overhead, since the "off" and "time-only" boxes overlap.

---

A note on the implementation: I wasn't sure whether to make things opt-in or opt-out. Right now we have a mix (opt in to everything with `CUDF_POLARS_LOG_TRACES=1`, and opt out of specific metrics with `CUDF_POLARS_LOG_TRACES_MEMORY=0`). We could easily make it opt-in to specific metrics (e.g. `CUDF_POLARS_LOG_TRACES_MEMORY=1` would enable just memory, `CUDF_POLARS_LOG_TRACES_DATAFRAMES=1` would enable just dataframe tracing). Neither option seemed obviously better to me.

Authors:
- Tom Augspurger (https://github.com/TomAugspurger)

Approvers:
- Matthew Murray (https://github.com/Matt711)

URL: #20223
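A sketch of how opt-in/opt-out environment variables like these could be interpreted; the helper and constant names below are hypothetical, only the environment variable names come from the description above.

```python
import os


def _env_flag(name: str, default: bool) -> bool:
    """Read a boolean flag from the environment: "0" disables, anything else enables."""
    value = os.environ.get(name)
    if value is None:
        return default
    return value != "0"


# Master switch: no tracing at all unless CUDF_POLARS_LOG_TRACES=1.
TRACING = _env_flag("CUDF_POLARS_LOG_TRACES", default=False)

# Individual metrics default to "on" when tracing is enabled, and can be
# opted out of with e.g. CUDF_POLARS_LOG_TRACES_MEMORY=0.
TRACE_MEMORY = TRACING and _env_flag("CUDF_POLARS_LOG_TRACES_MEMORY", default=True)
TRACE_DATAFRAMES = TRACING and _env_flag("CUDF_POLARS_LOG_TRACES_DATAFRAMES", default=True)
```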
Description
This PR introduces a new low-overhead tracing tool for cudf-polars. When enabled, we'll capture a record for each `IR.do_evaluate` node executed while running the polars query. The record will contain …

The overhead of this when not tracing should just be a conditional check against a constant bool, and an extra function call. The overhead when tracing is enabled should be minimal (mostly getting the dataframe sizes, RMM statistics, and emitting the log record).
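To illustrate the kind of guard described above (a single check against a module-level constant when tracing is off), here is a minimal sketch; the names `_TRACING_ENABLED`, `_log_trace`, and `traced` are hypothetical and not the PR's actual implementation.

```python
import os
import time
from functools import wraps

# Hypothetical module-level constant: read once at import time, so the
# per-call cost with tracing off is one boolean check plus a call frame.
_TRACING_ENABLED: bool = os.environ.get("CUDF_POLARS_LOG_TRACES", "0") == "1"


def _log_trace(node_name: str, duration: float) -> None:
    # Placeholder for emitting the structured log record.
    print({"node": node_name, "duration": duration})


def traced(func):
    """Wrap an evaluation function, recording a trace only when enabled."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        if not _TRACING_ENABLED:
            # Off path: no timing, no logging.
            return func(*args, **kwargs)
        start = time.monotonic()
        result = func(*args, **kwargs)
        _log_trace(func.__qualname__, time.monotonic() - start)
        return result

    return wrapper
```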
The implementation is based on structlog. This is helpful for us because we can inject additional context (like the PDSH query ID being run in our benchmarks) into the log records.
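As a rough illustration of how structlog's context injection can work (the logger name and field names here are assumptions for the example, not the PR's actual fields):

```python
import structlog

log = structlog.get_logger("cudf_polars.tracing")  # logger name is an assumption

# Bind benchmark-level context once; every log record emitted afterwards on this
# thread/task carries the bound key-value pairs (structlog's merge_contextvars
# processor, part of the default configuration in recent versions, handles this).
structlog.contextvars.bind_contextvars(pdsh_query=7, iteration=0)

# A per-node record then only needs the node-specific fields.
log.info("do_evaluate", node="DataFrameScan", duration_s=0.012)

# Unbind when the query finishes so the context doesn't leak into later queries.
structlog.contextvars.unbind_contextvars("pdsh_query", "iteration")
```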
As an example of what this can be used for, I've updated our PDSH benchmarks to include a new `--collect-traces` flag. When set, this will include the `traces` on each `record` exported to our `pdsh_results.jsonl` file. The `records` for that run (one per query / iteration pair) will now contain a `list[traces]` field, with the traces from each node executed during each query.

Checklist