
Conversation

@wence-
Contributor

@wence- wence- commented Nov 4, 2025

No description provided.

@wence- wence- requested a review from a team as a code owner November 4, 2025 15:27
@wence- wence- added improvement Improves an existing functionality non-breaking Introduces a non-breaking change labels Nov 4, 2025
@quasiben
Member

quasiben commented Nov 5, 2025

When running this example I was getting errors like:

TypeError: unhashable type: 'rmm.pylibrmm.stream.Stream'

Changing chunk_streams from a set to a list resolved the issue for me.

diff --git a/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/q09.py b/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/q09.py
index c5dac75..39401f3 100644
--- a/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/q09.py
+++ b/python/rapidsmpf/rapidsmpf/examples/streaming/ndsh/q09.py
@@ -233,10 +233,10 @@ async def broadcast_join(
     keep_keys: bool,
 ) -> None:
     left_tables: list[TableChunk] = []
-    chunk_streams = set()
+    chunk_streams = []
     while (msg := await left_ch.recv(ctx)) is not None:
         left_tables.append(TableChunk.from_message(msg))
-        chunk_streams.add(left_tables[-1].stream)
+        chunk_streams.append(left_tables[-1].stream)
     build_stream = ctx.get_stream_from_pool()
     join_streams(list(chunk_streams), build_stream)
     if len(left_tables) == 1:
@@ -257,7 +257,7 @@ async def broadcast_join(
     chunk_streams.clear()
     while (msg := await right_ch.recv(ctx)) is not None:
         chunk = TableChunk.from_message(msg)
-        chunk_streams.add(chunk.stream)
+        chunk_streams.append(chunk.stream)
         join_streams([build_stream], chunk.stream)
         # Safe to access left_carrier on chunk.stream
         right_columns = chunk.table_view().columns()
@@ -343,11 +343,11 @@ async def concatenate(
 ) -> None:
     chunks = []
     build_stream = ctx.get_stream_from_pool()
-    chunk_streams = set()
+    chunk_streams = []
     while (msg := await ch_in.recv(ctx)) is not None:
         chunk = TableChunk.from_message(msg)
         chunks.append(chunk)
-        chunk_streams.add(chunk.stream)
+        chunk_streams.append(chunk.stream)
     join_streams(list(chunk_streams), build_stream)
     table = plc.concatenate.concatenate(
         [chunk.table_view() for chunk in chunks], build_stream
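
For reference, a hedged, standalone sketch of an alternative that keeps deduplication even though the Stream objects are unhashable, by keying on object identity; unique_by_identity is a hypothetical helper, not part of rapidsmpf:

from typing import Any

def unique_by_identity(items: list[Any]) -> list[Any]:
    """Return items with duplicate Python objects removed, preserving order."""
    seen: dict[int, Any] = {}
    for item in items:
        # id() works even for unhashable objects such as Stream instances,
        # and is stable while the objects are kept alive by `items`.
        seen.setdefault(id(item), item)
    return list(seen.values())

streams = [object(), object()]
assert unique_by_identity(streams + streams) == streams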

@beckernick
Member

beckernick commented Nov 5, 2025

For what it's worth, I didn't run into this issue.

When running this example I was getting errors like:

TypeError: unhashable type: 'rmm.pylibrmm.stream.Stream'

Changing chunk_streams from a set to a list resolved the issue for me.

EDIT:

Just kidding. Once I built for CUDA 13 and switched to a machine with rapidsmpf at a more recent commit (501d72c) (both on cuDF commit 60edcc2f6ac8a325b4a7e340693f7d83c34d0377), I started seeing this issue.

@beckernick
Member

beckernick commented Nov 5, 2025

When I run this q9 implementation at SF1K (parquet, floats not decimals, partitioned tables) on 1x H100 of an internal DGX H100 system, I get the following performance:

$ CUDA_VISIBLE_DEVICES=4 python q09.py
Iteration 0: Pipeline construction 0.0121s
Iteration 0: Pipeline execution 41.03s
Iteration 1: Pipeline construction 0.009996s
Iteration 1: Pipeline execution 35.29s

@wence- , is this in line with expectations from your local testing or unexpected?

@TomAugspurger
Contributor

The stream changes are probably from https://github.com/rapidsai/rmm/pull/2110/files#diff-83f78faf97018ac3ad1e8b43d42eb56d0a6acc442b499a75fb04a7838301174eR127 (cc @nirandaperera). https://docs.python.org/3/reference/datamodel.html#object.__hash__ notes that a class which overrides __eq__ without also defining __hash__ has its __hash__ set to None, so its instances can't be used in hashed collections like set().

I'm not sure what the best way to uniquely identify a stream is. I see https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__STREAM.html#group__CUDART__STREAM_1g5799ae8dd744e561dfdeda02c53e82df, but I don't know if that's an option for RMM.

If we only deal with owning streams at the Python level, then I think id(self) is fine, since the memory address of the Python object will uniquely identify the CUDA stream. If we have a view on some other stream (Python or C++), we'd need a way to get at the original stream.
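
As a minimal, self-contained illustration of that rule (a toy Stream class, not the rmm one): defining __eq__ without __hash__ makes instances unhashable, which is exactly why building a set() of streams fails.

class Stream:
    def __init__(self, handle: int) -> None:
        self.handle = handle

    def __eq__(self, other: object) -> bool:
        return isinstance(other, Stream) and other.handle == self.handle

    # Without a __hash__, Python sets __hash__ to None for this class.
    # One option discussed above is hashing on identity, e.g.
    #     def __hash__(self) -> int: return id(self)
    # which is fine as long as we only deal with owning streams.

s = Stream(1)
try:
    {s}
except TypeError as exc:
    print(exc)  # unhashable type: 'Stream'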

orders_files = get_files(orders, parquet_suffix)
nation_files = get_files(nation, parquet_suffix)
nodes: list[CppNode | PyNode] = []
lineitem_ch = Channel[TableChunk]()
Member


You're probably aware, but these Channel() calls need to change to ctx.create_channel() now that #631 is in. Just leaving a comment for visibility; a rough before/after sketch follows.
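
A hedged sketch of that migration, assuming the post-#631 API is exactly the ctx.create_channel() call named above (type parameters and exact signature not shown in this thread):

# Hypothetical before/after for the Channel() -> ctx.create_channel() change.
# `ctx` and `TableChunk` come from the example code above.

# before
# lineitem_ch = Channel[TableChunk]()

# after
# lineitem_ch = ctx.create_channel()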

@wence-
Contributor Author

wence- commented Nov 6, 2025

When I run this q9 implementation at SF1K (parquet, floats not decimals, partitioned tables) on 1x H100 of an internal DGX H100 system, I get the following performance:

$ CUDA_VISIBLE_DEVICES=4 python q09.py
Iteration 0: Pipeline construction 0.0121s
Iteration 0: Pipeline execution 41.03s
Iteration 1: Pipeline construction 0.009996s
Iteration 1: Pipeline execution 35.29s

@wence- , is this in line with expectations from your local testing or unexpected?

This seems very slow. I will try on an H100 as well...
