Skip to content

Spark: Avoid real-time waits in streaming read timestamp tests#16965

Draft
huan233usc wants to merge 1 commit into
apache:mainfrom
huan233usc:spark-streaming-test-no-busyspin
Draft

Spark: Avoid real-time waits in streaming read timestamp tests#16965
huan233usc wants to merge 1 commit into
apache:mainfrom
huan233usc:spark-streaming-test-no-busyspin

Conversation

@huan233usc

Copy link
Copy Markdown
Contributor

Problem

TestStructuredStreamingRead3 has two timestamp-based streaming read tests that pick a stream start timestamp in the future and then wait for the wall clock to reach it:

  • testReadingStreamFromFutureTimetsamp uses now + 10000 ms
  • testReadingStreamFromTimestampFutureWithExistingSnapshots uses now + 2000 ms

The wait goes through TestBase.waitUntilAfter(), which busy-spins on System.currentTimeMillis(). So the future-timestamp test idles for ~10s on every run (it averaged ~16s/run in CI), and because the wait is a spin it pegs a core for the whole duration, contending with the other parallel Gradle test forks.

Change

Anchor the stream's start timestamp to the committed snapshot's timestampMillis() + 1 instead of a synthetic future instant:

  • snapshots committed before that point are still excluded;
  • snapshots committed after waitUntilAfter(startTimestamp) returns are still included.

The read semantics are unchanged, but no real time has to elapse — waitUntilAfter is now handed a near-current timestamp and returns within ~1ms. This matches the existing testReadingStreamFromTimestamp / testReadingStreamFromTimestampOfExistingSnapshot idiom in the same class.

waitUntilAfter() is also changed to sleep once for the remaining time instead of busy-spinning, so any remaining future wait no longer burns a core.

Applied identically to spark v3.5, v4.0 and v4.1.

Note on coverage

The original testReadingStreamFromFutureTimetsamp asserted emptiness after each insert while the stream was running (the inserts landed before the future timestamp elapsed). The rewrite commits those snapshots first and starts the stream from a timestamp just after them, so it exercises start-timestamp filtering rather than a running stream crossing the boundary. Verifying the latter deterministically would require controlling snapshot commit timestamps, which has no clean test hook; both paths hit the same start-timestamp boundary check.

Testing

  • The two methods pass on spark v3.5, v4.0 and v4.1 (HIVE/async + REST params).
  • Stress-ran the two methods 20x on v3.5 with the Gradle build cache disabled: 0 failures.

TestStructuredStreamingRead3.testReadingStreamFromFutureTimetsamp and
testReadingStreamFromTimestampFutureWithExistingSnapshots picked a start
timestamp of "now + 10s" / "now + 2s" and then waited for the wall clock
to reach it via waitUntilAfter(). That wait is pure idle time on every
run (the future test alone averaged ~16s/run in CI).

Anchor the stream's start timestamp to the committed snapshot's
timestampMillis() + 1 instead. Snapshots committed before that point are
still excluded and those committed after are still included, so the
semantics are unchanged, but no synthetic future instant has to elapse.

waitUntilAfter() also busy-spun on System.currentTimeMillis(), pegging a
core for the entire wait and starving other parallel test forks on a
busy CI box. Sleep once for the remaining time instead; a past timestamp
returns immediately.

Applied identically to spark v3.5, v4.0 and v4.1.
@github-actions github-actions Bot added the spark label Jun 25, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant