
Conversation

@nirandaperera
Contributor

@nirandaperera nirandaperera commented Nov 22, 2025

This PR introduces the following changes to the Shuffler's PostBox.

  • Currently, PostBox has an internal mutex, and to prevent extractions during spilling it is also protected by an external mutex in the Shuffler. IINM, the idea there is to block only extraction while still allowing insertions during spilling.
  • This PR introduces finer-grained locking in the PostBox: it adds a mutex per key and removes the external mutex in the Shuffler.
  • The PostBox now requires the keys at construction time, so they can be populated during initialization, which lets us remove the class-level mutex in the PostBox. A consequence is that the emptiness test has become more complicated; it is now handled by an atomic counter.
  • Additionally, we were stashing values in an unordered_map<ChunkID, Chunk>. This PR changes that to a vector<Chunk>, because we no longer query by ChunkID. (A rough sketch of the resulting structure follows after this list.)
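For illustration, below is a minimal, unofficial C++ sketch of what that structure could look like. The Key/Chunk types and method names are placeholders, not the actual rapidsmpf PostBox API; the point is only the shape described above: keys fixed at construction, a mutex per key, an atomic counter for emptiness, and vector<Chunk> storage.

    #include <atomic>
    #include <cstddef>
    #include <mutex>
    #include <unordered_map>
    #include <vector>

    struct Chunk {};   // stand-in for the real chunk type
    using Key = int;   // stand-in for the real key type

    class PostBox {
      public:
        // Keys are required at construction, so the map layout never changes
        // afterwards and no class-level mutex is needed.
        explicit PostBox(std::vector<Key> const& keys) {
            for (auto key : keys) {
                slots_.try_emplace(key);
            }
        }

        void insert(Key key, Chunk chunk) {
            auto& slot = slots_.at(key);                   // the map itself is never mutated
            std::lock_guard<std::mutex> lock(slot.mutex);  // per-key lock only
            slot.chunks.push_back(std::move(chunk));
            size_.fetch_add(1, std::memory_order_relaxed);
        }

        std::vector<Chunk> extract(Key key) {
            auto& slot = slots_.at(key);
            std::lock_guard<std::mutex> lock(slot.mutex);
            std::vector<Chunk> out;
            out.swap(slot.chunks);
            size_.fetch_sub(out.size(), std::memory_order_relaxed);
            return out;
        }

        // With no class-level mutex, emptiness is answered by an atomic
        // counter of the total number of stored chunks.
        bool empty() const { return size_.load(std::memory_order_relaxed) == 0; }

      private:
        struct Slot {
            std::mutex mutex;
            std::vector<Chunk> chunks;  // vector<Chunk> instead of unordered_map<ChunkID, Chunk>
        };
        std::unordered_map<Key, Slot> slots_;
        std::atomic<std::size_t> size_{0};
    };

Because the set of keys never changes after construction, concurrent lookups into the map are safe without a class-level lock; only the per-key slot contents need protection.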

Todo:

  • Add multithreaded spilling test
  • Do some nsys profiling


Closes #674

Signed-off-by: niranda perera <[email protected]>
@nirandaperera nirandaperera added the breaking (Introduces a breaking change) and improvement (Improves an existing functionality) labels on Nov 22, 2025
@nirandaperera nirandaperera requested a review from a team as a code owner November 22, 2025 00:28
@nirandaperera nirandaperera marked this pull request as draft November 22, 2025 00:29
@copy-pr-bot

copy-pr-bot bot commented Nov 22, 2025

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.


@nirandaperera nirandaperera marked this pull request as ready for review November 22, 2025 00:46
Signed-off-by: niranda perera <[email protected]>
@TomAugspurger
Contributor

I'll look at the implementation in a bit, but one comment on the strategy here: As you say, the goal of our lock here is to

prevent extractions during spilling

There are a few ways to accomplish this:

  1. A single lock around the Postbox: anybody wishing to spill or extract any buffer must acquire that lock. Spilling requires acquiring the lock, spilling the buffer, and releasing the lock. This is our strategy on main.
  2. Many locks in the Postbox, one per key. Anybody wishing to spill or extract a specific buffer must acquire the lock for that buffer. This is the strategy here, IIUC.

When I wrote up the issue, I imagined a variant of 1: What if we have just the single lock, but release it while doing the actual spill operation?

# thread 1: spilling
with lock:
    chunk_ids = postbox.search(...)

# release the lock to perform the spill
# This is currently unsafe, because someone could extract
# the buffer mid-spill
for chunk_id in chunk_ids:
    postbox.spill(chunk_id)

with lock:
    postbox.update_state(buffers)

As written, my pseudocode isn't safe: We can't have some other thread come along and try to extract one of the buffers being spilled in that for loop.

# thread 2: extract
with lock:
    buffer = postbox.extract(chunk_id)

Because the lock isn't held during my spill loop, acquiring the lock in extract doesn't ensure that the buffer isn't moving out from underneath us.

If we wanted to do something like this, we'd need to make the Postbox state a bit more complicated. I think it'd need some kind of map of {ChunkID: SpillState} where SpillState can be something like "Device", "DeviceToHost" (i.e. in the process of being spilled), "Host", and maybe "HostToDevice". The critical sections, which must have a lock to access and update, are around checking and updating the state of each buffer. While, or after, identifying a buffer to spill we'd also need to update its state from Device to DeviceToHost. Likewise for the extract side: when we identify a buffer to extract we'd need to remove it from the postbox's set of keys (while holding the lock).
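A rough sketch of that state-map idea, with hypothetical names (SpillTracker, claim_for_spill, and so on) that are not part of any existing API, might look like the following: the critical sections only check and flip states, while the slow device-to-host copy happens outside the lock.

    #include <mutex>
    #include <optional>
    #include <unordered_map>
    #include <vector>

    using ChunkID = int;  // stand-in for the real chunk ID type

    enum class SpillState { Device, DeviceToHost, Host, HostToDevice };

    class SpillTracker {
      public:
        // Pick device-resident chunks and mark them DeviceToHost, all under
        // the lock; the actual copy happens afterwards without the lock held.
        std::vector<ChunkID> claim_for_spill() {
            std::lock_guard<std::mutex> lock(mutex_);
            std::vector<ChunkID> claimed;
            for (auto& [id, state] : states_) {
                if (state == SpillState::Device) {
                    state = SpillState::DeviceToHost;
                    claimed.push_back(id);
                }
            }
            return claimed;
        }

        // Called once a chunk's device-to-host copy has finished.
        void finish_spill(ChunkID id) {
            std::lock_guard<std::mutex> lock(mutex_);
            states_.at(id) = SpillState::Host;
        }

        // Extraction removes the chunk from the tracked set while holding the
        // lock; returns nothing if the chunk is mid-spill and cannot be taken.
        std::optional<SpillState> try_extract(ChunkID id) {
            std::lock_guard<std::mutex> lock(mutex_);
            auto it = states_.find(id);
            if (it == states_.end() || it->second == SpillState::DeviceToHost) {
                return std::nullopt;  // caller must wait, retry, or pick another chunk
            }
            auto state = it->second;
            states_.erase(it);
            return state;
        }

      private:
        std::mutex mutex_;
        std::unordered_map<ChunkID, SpillState> states_;
    };

What the extract side should do on an empty result is exactly the policy question discussed next.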

Then we'd need to figure out what we want the extract thread to do when it encounters a buffer that's in the process of being spilled: perhaps a spin loop, waiting for it to be spilled (which will happen when all of the spill threads are done with all of their spill buffers). And that gets into larger architectural questions, like why are we extracting this specific chunk ID? Do we have other chunks we can work on? etc.

@nirandaperera
Contributor Author

nirandaperera commented Nov 24, 2025

@TomAugspurger I think I agree with your take. Initially I was thinking about an atomic<State> for each key, which could have the following states:

  • Empty
  • Inserting (transitioning from Empty/Available to Available)
  • Available
  • Extracting (transitioning from Available to Empty/Available)
  • Spilling (Available to Available)

This way, we could make PostBoxes truly lock-free. But the implementation felt a bit involved, since we'd have to do a bunch of CAS operations on the atomic.
But the mutex-per-key approach is more or less equivalent, because if a key is not in an "available" state, the lock either waits, or we can choose to skip that key.
If the mutexes truly are a bottleneck, let's replace them with the atomics (I think I have a fair idea of how to do that even now).
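For what it's worth, a minimal sketch of that atomic-based alternative (purely illustrative, not proposed code) could look like this, where a failed CAS simply means the key is busy and can be skipped:

    #include <atomic>

    enum class KeyState { Empty, Inserting, Available, Extracting, Spilling };

    // Try to move a key from Available to Spilling. If the CAS fails, the key
    // is busy (being inserted into, extracted, or already spilling), and a
    // spill thread can simply skip it and move on to the next key.
    inline bool try_begin_spill(std::atomic<KeyState>& state) {
        KeyState expected = KeyState::Available;
        return state.compare_exchange_strong(expected, KeyState::Spilling);
    }

    // After the spill finishes, the key goes back to Available.
    inline void end_spill(std::atomic<KeyState>& state) {
        state.store(KeyState::Available);
    }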

Then we'd need to figure out what we want the extract thread to do when it encounters a buffer that's in the process of being spilled:

This depends on the extract method. When we want to extract outputs, we call extract with an exact key, and in that case the caller threads can wait. But when the progress thread extracts data or the spill thread spills, it doesn't really care about the key, so we can skip a key if it's busy (inserting/spilling/extracting).
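As a rough illustration of those two behaviours (with hypothetical helper names, not the real PostBox interface): an exact-key extraction can block on the key's mutex, while the progress/spill paths can use a try-lock and move on when the key is busy.

    #include <mutex>

    // Output extraction targets one exact key, so the caller can afford to
    // block until the key becomes available.
    template <typename F>
    void extract_exact(std::mutex& key_mutex, F&& take_chunks) {
        std::lock_guard<std::mutex> lock(key_mutex);
        take_chunks();
    }

    // The progress/spill paths don't care which key they service, so a busy
    // key (being inserted into, spilled, or extracted) can simply be skipped.
    template <typename F>
    bool extract_if_idle(std::mutex& key_mutex, F&& take_chunks) {
        std::unique_lock<std::mutex> lock(key_mutex, std::try_to_lock);
        if (!lock.owns_lock()) {
            return false;  // key is busy; try the next one
        }
        take_chunks();
        return true;
    }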

And that gets into larger architectural questions like why are we extracting this specific chunk ID?

This was used only for spilling earlier, and this PR removes it. The reasoning was that we first query the postbox for the chunks that have device data, and then come back and spill each of those.

Signed-off-by: niranda perera <[email protected]>
@TomAugspurger
Contributor

Thanks for the overview.

LMK if you need any help with profiling / benchmarking. The setup in #674 used query 4 from the cudf-polars tpch benchmarks at SF 1K on an H100: python -m cudf_polars.experimental.benchmarks.pdsh 4 --iterations 1 --path=/data/tpch-rs/scale-1000 --suffix --spill-device 0.55 --blocksize 1_000_000_000 --rmm-async --stream-policy pool --runtime rapidsmpf.

@nirandaperera
Contributor Author

@TomAugspurger Can you run a profile with this PR?

@TomAugspurger
Contributor

Sure.
rapidsmpf.q4-lock-contention.1k.nsys-rep.zip (a zip archive with just the nsys report) is from running query 4 with this branch for rapidsmpf.

@TomAugspurger
Contributor

TomAugspurger commented Nov 24, 2025

A couple things I notice:

  1. the runtime is approximately the same as I've been seeing: ~20-25 seconds.
  2. I do still see two threads that both seem to want the same lock (is this an insertion thread and a background spilling thread?):
    [screenshot of the nsys timeline]
  3. Once we get to the actual computation, we do spend a bit of time trading locks between threads:
    [screenshot of the nsys timeline]

I'm not sure at this point whether any of these is a problem. Maybe it'd be worth writing up a simpler reproducer. I guess https://gist.github.com/TomAugspurger/d5b0d3b0e5765e448aa07a4fcc706171#file-slow_spill-py might cover this a bit, but I'm not sure.

Signed-off-by: niranda perera <[email protected]>
@nirandaperera
Contributor Author

@TomAugspurger Looking at the profile, it does seem like there are mutex lock regions corresponding to spill regions. I suspect these could be insert/extract calls. I think one problem with this implementation is that insert/extract lock each key per chunk, while spill holds the key's lock across all of its chunks. So it's quite possible that spilling ends up holding the lock much longer.
e.g. pid 10 has 100 chunks and insert is trying to append another, while spill holds the lock and keeps spilling all 100 chunks. Since spilling takes a long time, this unnecessarily delays the insert. Hmm. I need to verify this; let me check again tomorrow.
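To make that suspicion concrete, here is a hypothetical sketch (placeholder types, not actual rapidsmpf code) contrasting a spill that holds the key's lock across all of its chunks with one that re-acquires the lock per chunk, so a pending insert can slip in between chunks:

    #include <cstddef>
    #include <mutex>
    #include <vector>

    struct Chunk {
        void spill_to_host() {}  // placeholder for the (slow) device-to-host copy
    };

    // Holding the key's lock across the whole loop delays inserts on that key
    // for as long as the entire spill takes.
    inline void spill_all_holding_lock(std::mutex& key_mutex, std::vector<Chunk>& chunks) {
        std::lock_guard<std::mutex> lock(key_mutex);
        for (auto& chunk : chunks) {
            chunk.spill_to_host();
        }
    }

    // Re-acquiring the lock per chunk lets a pending insert slip in between
    // chunks, at the cost of more lock traffic.
    inline void spill_per_chunk(std::mutex& key_mutex, std::vector<Chunk>& chunks) {
        std::size_t i = 0;
        while (true) {
            std::lock_guard<std::mutex> lock(key_mutex);
            if (i >= chunks.size()) {
                break;
            }
            chunks[i].spill_to_host();  // one chunk per lock acquisition
            ++i;
        }
    }

Whether the extra lock traffic of the per-chunk variant is worth it would need to be confirmed against the profiles above.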
