Skip to content

Commit 5ea72dd

Browse files
authored
Separate "not terminated" pipeline state into "running" and "loading". (#12716)
This change fix the behavior of considering as "running" also pipelines that are still in "loading", both "loading" and "running" is considered as "not terminated". Fixed a flakyness in tests due to different ways to looks at the same thing: pipeline status. The pipeline status is determined by both `pipeline.running?` and by `agent.pipelines_running`. The first checks for an atomic boolean in pipeline object, the second check for the status in PipelineRegistry. Fixes #12190 (cherry picked from commit 79d8f47)
1 parent 854a58b commit 5ea72dd

File tree

4 files changed

+58
-4
lines changed

4 files changed

+58
-4
lines changed

logstash-core/lib/logstash/agent.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,10 @@ def running_pipelines
288288
@pipelines_registry.running_pipelines
289289
end
290290

291+
def loading_pipelines
292+
@pipelines_registry.loading_pipelines
293+
end
294+
291295
def non_running_pipelines
292296
@pipelines_registry.non_running_pipelines
293297
end

logstash-core/lib/logstash/pipelines_registry.rb

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,19 @@ def terminated?
3535
end
3636
end
3737

38+
def running?
39+
@lock.synchronize do
40+
# not terminated and not loading
41+
@loading.false? && !@pipeline.finished_execution?
42+
end
43+
end
44+
45+
def loading?
46+
@lock.synchronize do
47+
@loading.true?
48+
end
49+
end
50+
3851
def set_loading(is_loading)
3952
@lock.synchronize do
4053
@loading.value = is_loading
@@ -253,7 +266,11 @@ def empty?
253266

254267
# @return [Hash{String=>Pipeline}]
255268
def running_pipelines
256-
select_pipelines { |state| !state.terminated? }
269+
select_pipelines { |state| state.running? }
270+
end
271+
272+
def loading_pipelines
273+
select_pipelines { |state| state.loading? }
257274
end
258275

259276
# @return [Hash{String=>Pipeline}]

logstash-core/spec/logstash/pipelines_registry_spec.rb

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,20 +210,22 @@
210210
end
211211
end
212212

213-
# make sure we entered the block executioin
213+
# make sure we entered the block execution
214214
wait(10).for {in_block.true?}.to be_truthy
215215

216216
# at this point the thread is suspended waiting on queue
217217

218218
# since in reloading state, running_pipelines is not empty
219-
expect(subject.running_pipelines).not_to be_empty
219+
expect(subject.running_pipelines).to be_empty
220+
expect(subject.loading_pipelines).not_to be_empty
220221

221222
# unblock thread
222223
queue.push(:dummy)
223224
thread.join
224225

225226
# 3rd call: finished_execution? is true
226227
expect(subject.running_pipelines).to be_empty
228+
expect(subject.loading_pipelines).to be_empty
227229
end
228230
end
229231
end
@@ -271,6 +273,33 @@
271273
end
272274

273275
context "pipelines collections" do
276+
context "with a reloading pipeline" do
277+
before :each do
278+
subject.create_pipeline(pipeline_id, pipeline) { true }
279+
# expect(pipeline).to receive(:finished_execution?).and_return(false)
280+
in_block = Concurrent::AtomicBoolean.new(false)
281+
queue = Queue.new # threadsafe queue
282+
thread = Thread.new(in_block) do |in_block|
283+
subject.reload_pipeline(pipeline_id) do
284+
in_block.make_true
285+
# sleep(3) # simulate a long loading pipeline
286+
queue.pop
287+
end
288+
end
289+
# make sure we entered the block execution
290+
wait(10).for {in_block.true?}.to be_truthy
291+
end
292+
293+
it "should not find running pipelines" do
294+
expect(subject.running_pipelines).to be_empty
295+
end
296+
297+
it "should not find non_running pipelines" do
298+
# non running pipelines are those terminated
299+
expect(subject.non_running_pipelines).to be_empty
300+
end
301+
end
302+
274303
context "with a non terminated pipelines" do
275304
before :each do
276305
subject.create_pipeline(pipeline_id, pipeline) { true }

logstash-core/spec/support/matchers.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ def all_instance_methods_implemented?
9595
expect(pipeline.running?).to be_truthy
9696
end
9797
expect(pipeline.config_str).to eq(pipeline_config.config_string)
98-
expect(agent.running_pipelines.keys.map(&:to_s)).to include(pipeline_config.pipeline_id.to_s)
98+
expect(agent.running_pipelines.keys.map(&:to_s) + agent.loading_pipelines.keys.map(&:to_s)).to include(pipeline_config.pipeline_id.to_s)
9999
end
100100

101101
failure_message do |agent|
@@ -108,6 +108,10 @@ def all_instance_methods_implemented?
108108
"Found '#{pipeline_config.pipeline_id.to_s}' in the list of pipelines but its not running"
109109
elsif pipeline.config_str != pipeline_config.config_string
110110
"Found '#{pipeline_config.pipeline_id.to_s}' in the list of pipelines and running, but the config_string doesn't match,\nExpected:\n#{pipeline_config.config_string}\n\ngot:\n#{pipeline.config_str}"
111+
elsif agent.running_pipelines.keys.map(&:to_s).include?(pipeline_config.pipeline_id.to_s)
112+
"Found '#{pipeline_config.pipeline_id.to_s}' in running but not included in the list of agent.running_pipelines or agent.loading_pipelines"
113+
else
114+
"Unrecognized error condition, probably you missed to track properly a newly added expect in :have_running_pipeline?"
111115
end
112116
end
113117
end

0 commit comments

Comments
 (0)