Skip to content

Conversation

@AngersZhuuuu
Copy link
Contributor

@AngersZhuuuu AngersZhuuuu commented Nov 27, 2025

What changes were proposed in this pull request?

When ShuffleBlockFetcherIterator fetch data, two shuffle cost not calculated.

  1. Network resource congestion and waiting between fetchUpToMaxBytes and fetchAllHostLocalBlocks ;
  2. Connection establishment congestion. When fetchUpToMaxBytes and fetchAllHostLocalBlocks send request, create client may be congestion

Why are the changes needed?

Make shuffle fetch wait time request time more accurate.

Does this PR introduce any user-facing change?

No

How was this patch tested?

For open block request add a Thread.sleep(3000) latency, shuffle read metrics like below

截屏2025-11-27 17 38 26

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Nov 27, 2025
@AngersZhuuuu
Copy link
Contributor Author

gentle ping @cloud-fan @holdenk Could you take a look?

fetchAllHostLocalBlocks(hostLocalBlocksByExecutor)
pushBasedFetchHelper.fetchAllPushMergedLocalBlocks(pushMergedLocalBlocks)
val fetchWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait)
shuffleMetrics.incFetchWaitTime(fetchWaitTime)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this metric is never updated before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discuss, change to like this.

@cloud-fan
Copy link
Contributor

cloud-fan commented Nov 28, 2025

Please clearly describe the problem. How is shuffle fetch wait time calculated today and why it's inaccurate.

@AngersZhuuuu
Copy link
Contributor Author

Please clearly describe the problem. How is shuffle fetch wait time calculated today and why it's inaccurate.

Add more explain in the pr desc, pls take a look


initialize()

private def incFetchWaitTime[T](f: () => T): Unit = {
Copy link
Contributor

@cloud-fan cloud-fan Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private def incFetchWaitTime[T](f: () => T): Unit = {
private def withFetchWaitTimeTracked[T](f: => T): Unit = {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

val t = f()
val fetchWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait)
shuffleMetrics.incFetchWaitTime(fetchWaitTime)
return t
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return t
t

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


private def incFetchWaitTime[T](f: () => T): Unit = {
val startFetchWait = System.nanoTime()
val t = f()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val t = f()
val res = f()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@AngersZhuuuu AngersZhuuuu changed the title [SPARK-54536[CORE] Shuffle FetchWaitTime missing first call fetchUpToMaxBytes() open block rpc response time [SPARK-54536][CORE] Shuffle FetchWaitTime missing collect create client/wait cost Dec 2, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants