Skip to content

Commit 84263e3

Browse files
Fix execution context queue stress tests failures (#16472)
Refactors and abstracts the stress test specs of `Fiber::ExecutionContext::Runnables` and `Fiber::ExecutionContext::GlobalQueue` so that no loop can run forever: the "thread setup" part has been removed, the main fiber won't block waiting for the threads to be ready, and each thread's loop will eventually time out and the thread will return, so the main fiber won't block while joining the threads. I abstracted a helper because the different tests used the same structure, and it was painful & noisy to duplicate the logic. This fixes the CI failures that regularly occurred on Darwin, and that I just reproduced on Linux by running these specs in tight loops multiple times in parallel to overload the CPU cores. Co-authored-by: Johannes Müller <[email protected]>
1 parent a8e85a6 commit 84263e3

File tree

5 files changed

+196
-160
lines changed

5 files changed

+196
-160
lines changed

spec/std/fiber/execution_context/global_queue_spec.cr

Lines changed: 87 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -91,131 +91,112 @@ describe Fiber::ExecutionContext::GlobalQueue do
9191
# interpreter doesn't support threads yet (#14287)
9292
pending_interpreted describe: "thread safety" do
9393
it "one by one" do
94+
{% if flag?(:win32) && flag?(:aarch64) %}
95+
pending! "CI/WIN32/CLANGARM64 always fails"
96+
{% end %}
97+
9498
fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 763).new do |i|
9599
Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}"))
96100
end
97101

98102
n = 7
99103
increments = 15
104+
100105
queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
101-
ready = Thread::WaitGroup.new(n)
102-
103-
threads = Array.new(n) do |i|
104-
new_thread("ONE-#{i}") do
105-
slept = 0
106-
ready.done
107-
108-
loop do
109-
if fiber = queue.pop?
110-
fc = fibers.find! { |x| x.@fiber == fiber }
111-
queue.push(fiber) if fc.increment < increments
112-
slept = 0
113-
elsif slept < 100
114-
slept += 1
115-
Thread.sleep(1.nanosecond) # don't burn CPU
116-
else
117-
break
118-
end
119-
end
120-
end
121-
end
122-
ready.wait
123106

124-
fibers.each_with_index do |fc, i|
125-
queue.push(fc.@fiber)
126-
Thread.sleep(10.nanoseconds) if i % 10 == 9
127-
end
107+
Fiber::ExecutionContext.stress_test(
108+
n,
109+
iteration: ->(i : Int32) {
110+
if fiber = queue.pop?
111+
fc = fibers.find! { |x| x.@fiber == fiber }
112+
queue.push(fiber) if fc.increment < increments
113+
return :next
114+
end
128115

129-
threads.each(&.join)
116+
# done?
117+
if fibers.all? { |fc| fc.counter >= increments }
118+
return :break
119+
end
120+
},
121+
publish: -> {
122+
fibers.each_with_index do |fc, i|
123+
queue.push(fc.@fiber)
124+
Thread.sleep(10.nanoseconds) if i % 10 == 9
125+
end
126+
},
127+
)
130128

131129
# must have dequeued each fiber exactly X times
132130
fibers.each { |fc| fc.counter.should eq(increments) }
133131
end
134132

135-
{% if flag?(:darwin) %}
136-
# FIXME: the spec regularly fails on macOS with "expected 15 got 0"
137-
pending "bulk operations"
138-
{% else %}
139-
it "bulk operations" do
140-
n = 7
141-
increments = 15
133+
it "bulk operations" do
134+
n = 7
135+
increments = 15
142136

143-
fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5
144-
Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}"))
145-
end
137+
fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5
138+
Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}"))
139+
end
146140

147-
queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
148-
ready = Thread::WaitGroup.new(n)
149-
150-
threads = Array.new(n) do |i|
151-
new_thread("BULK-#{i}") do
152-
slept = 0
153-
154-
r = Fiber::ExecutionContext::Runnables(3).new(queue)
155-
156-
batch = Fiber::List.new
157-
size = 0
158-
159-
reenqueue = -> {
160-
if size > 0
161-
queue.bulk_push(pointerof(batch))
162-
names = [] of String?
163-
batch.each { |f| names << f.name }
164-
batch.clear
165-
size = 0
166-
end
167-
}
168-
169-
execute = ->(fiber : Fiber) {
170-
fc = fibers.find! { |x| x.@fiber == fiber }
171-
172-
if fc.increment < increments
173-
batch.push(fc.@fiber)
174-
size += 1
175-
end
176-
}
177-
178-
ready.done
179-
180-
loop do
181-
if fiber = r.shift?
182-
execute.call(fiber)
183-
slept = 0
184-
next
185-
end
186-
187-
if fiber = queue.grab?(r, 1)
188-
reenqueue.call
189-
execute.call(fiber)
190-
slept = 0
191-
next
192-
end
193-
194-
if slept >= 100
195-
break
196-
end
197-
198-
reenqueue.call
199-
slept += 1
200-
Thread.sleep(1.nanosecond) # don't burn CPU
201-
end
202-
end
141+
queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
142+
143+
runnables = Array.new(n) { Fiber::ExecutionContext::Runnables(3).new(queue) }
144+
batches = Array.new(n) { Fiber::List.new }
145+
146+
reenqueue = ->(batch : Pointer(Fiber::List)) {
147+
if batch.value.size > 0
148+
queue.bulk_push(batch)
149+
names = [] of String?
150+
batch.value.each { |f| names << f.name }
151+
batch.value.clear
203152
end
204-
ready.wait
205-
206-
# enqueue in batches of 5
207-
0.step(to: fibers.size - 1, by: 5) do |i|
208-
list = Fiber::List.new
209-
5.times { |j| list.push(fibers[i + j].@fiber) }
210-
queue.bulk_push(pointerof(list))
211-
Thread.sleep(10.nanoseconds) if i % 4 == 3
153+
}
154+
155+
execute = ->(fiber : Fiber, batch : Pointer(Fiber::List)) {
156+
fc = fibers.find! { |x| x.@fiber == fiber }
157+
158+
if fc.increment < increments
159+
batch.value.push(fc.@fiber)
212160
end
161+
}
213162

214-
threads.each(&.join)
163+
Fiber::ExecutionContext.stress_test(
164+
n,
165+
iteration: ->(i : Int32) {
166+
r = runnables[i]
167+
batch = batches.to_unsafe + i
215168

216-
# must have dequeued each fiber exactly X times (no less, no more)
217-
fibers.each { |fc| fc.counter.should eq(increments) }
218-
end
219-
{% end %}
169+
if fiber = r.shift?
170+
execute.call(fiber, batch)
171+
return :next
172+
end
173+
174+
if fiber = queue.grab?(r, 1)
175+
reenqueue.call(batch)
176+
execute.call(fiber, batch)
177+
return :next
178+
end
179+
180+
# done?
181+
if fibers.all? { |fc| fc.counter >= increments }
182+
return :break
183+
end
184+
185+
reenqueue.call(batch)
186+
},
187+
publish: -> {
188+
# enqueue in batches of 5
189+
0.step(to: fibers.size - 1, by: 5) do |i|
190+
list = Fiber::List.new
191+
5.times { |j| list.push(fibers[i + j].@fiber) }
192+
queue.bulk_push(pointerof(list))
193+
Thread.sleep(10.nanoseconds) if i % 4 == 3
194+
end
195+
}
196+
)
197+
198+
# must have dequeued each fiber exactly X times (no less, no more)
199+
fibers.each { |fc| fc.counter.should eq(increments) }
200+
end
220201
end
221202
end

spec/std/fiber/execution_context/runnables_spec.cr

Lines changed: 48 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -216,68 +216,62 @@ describe Fiber::ExecutionContext::Runnables do
216216
end
217217

218218
global_queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
219-
ready = Thread::WaitGroup.new(n)
220219

221220
all_runnables = Array(Fiber::ExecutionContext::Runnables(16)).new(n) do
222221
Fiber::ExecutionContext::Runnables(16).new(global_queue)
223222
end
224223

225-
threads = Array.new(n) do |i|
226-
new_thread("RUN-#{i}") do
224+
all_randoms = Array.new(n) { Random.split }
225+
226+
execute = ->(fiber : Fiber, runnables : Fiber::ExecutionContext::Runnables(16)) {
227+
fc = fibers.find! { |x| x.@fiber == fiber }
228+
runnables.push(fiber) if fc.increment < increments
229+
}
230+
231+
Fiber::ExecutionContext.stress_test(
232+
n,
233+
iteration: ->(i : Int32) {
227234
runnables = all_runnables[i]
228-
slept = 0
229-
230-
execute = ->(fiber : Fiber) {
231-
fc = fibers.find! { |x| x.@fiber == fiber }
232-
runnables.push(fiber) if fc.increment < increments
233-
}
234-
235-
ready.done
236-
237-
loop do
238-
# dequeue from local queue
239-
if fiber = runnables.shift?
240-
execute.call(fiber)
241-
slept = 0
242-
next
243-
end
244-
245-
# steal from another queue
246-
while (r = all_runnables.sample) == runnables
247-
end
248-
if fiber = runnables.steal_from(r)
249-
execute.call(fiber)
250-
slept = 0
251-
next
252-
end
253-
254-
# dequeue from global queue
255-
if fiber = global_queue.grab?(runnables, n)
256-
execute.call(fiber)
257-
slept = 0
258-
next
259-
end
260-
261-
if slept >= 100
262-
break
263-
end
264-
265-
slept += 1
266-
Thread.sleep(1.nanosecond) # don't burn CPU
235+
random = all_randoms[i]
236+
237+
# dequeue from local queue
238+
if fiber = runnables.shift?
239+
execute.call(fiber, runnables)
240+
return :next
241+
end
242+
243+
# steal from another queue
244+
j = 0
245+
while (r = all_runnables.sample(random)) == runnables
246+
next if (j += 1) < 1000
247+
raise "FATAL: all_runnables.sample returned the local queue 1000 times!"
248+
end
249+
if fiber = runnables.steal_from(r)
250+
execute.call(fiber, runnables)
251+
return :next
252+
end
253+
254+
# dequeue from global queue
255+
if fiber = global_queue.grab?(runnables, n)
256+
execute.call(fiber, runnables)
257+
return :next
267258
end
268-
end
269-
end
270-
ready.wait
271-
272-
# enqueue in batches
273-
0.step(to: fibers.size - 1, by: 9) do |i|
274-
list = Fiber::List.new
275-
9.times { |j| list.push(fibers[i + j].@fiber) }
276-
global_queue.bulk_push(pointerof(list))
277-
Thread.sleep(10.nanoseconds) if i % 2 == 1
278-
end
279259

280-
threads.each(&.join)
260+
# done?
261+
if fibers.all? { |fc| fc.counter >= increments }
262+
return :break
263+
end
264+
},
265+
publish: -> {
266+
# enqueue in batches of 9
267+
0.step(to: fibers.size - 1, by: 9) do |i|
268+
list = Fiber::List.new
269+
9.times { |j| list.push(fibers[i + j].@fiber) }
270+
global_queue.bulk_push(pointerof(list))
271+
Thread.sleep(10.nanoseconds) if i % 2 == 1
272+
end
273+
},
274+
)
281275

282276
# must have dequeued each fiber exactly X times (no less, no more)
283277
fibers.each { |fc| fc.counter.should eq(increments) }

spec/std/fiber/execution_context/spec_helper.cr

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,52 @@ module Fiber::ExecutionContext
1919
@counter.get(:relaxed)
2020
end
2121
end
22+
23+
# Runs a multithreaded test by starting *n* threads, waiting for all the
24+
# threads to have been started, then runs the *publish* proc.
25+
#
26+
# Each thread calls *iteration* until the timeout is reached or the proc
27+
# returns `:break`; if the proc returns `:next` the thread goes immediately to
28+
# the next iteration, otherwise it will ease the CPU before the next iteration.
29+
#
30+
# Returns after every thread has been joined.
31+
def self.stress_test(n, *, iteration, publish, name = "STRESS", timeout = 1.second)
32+
ready = Thread::WaitGroup.new(n)
33+
34+
threads = Array.new(n) do |i|
35+
new_thread("#{name}-#{i}") do
36+
ready.done
37+
38+
started = Time.monotonic
39+
attempts = 0
40+
41+
iter = 0
42+
while iter += 1
43+
if iter % 100 == 99 && (Time.monotonic - started) >= timeout
44+
# reached timeout: abort
45+
break
46+
end
47+
48+
case iteration.call(i)
49+
when :next
50+
attempts = 0
51+
next
52+
when :break
53+
break
54+
else
55+
# don't burn CPU
56+
attempts = Thread.delay(attempts)
57+
end
58+
end
59+
end
60+
end
61+
62+
ready.wait(timeout * 2) do
63+
raise "timeout while waiting for threads to be ready"
64+
end
65+
66+
publish.call
67+
68+
threads.each(&.join)
69+
end
2270
end

0 commit comments

Comments
 (0)