Skip to content

Commit 726b142

Browse files
start inputs only when all WorkerLoop are fully initialized (#11492)
1 parent d179293 commit 726b142

File tree

1 file changed

+35
-5
lines changed

1 file changed

+35
-5
lines changed

logstash-core/lib/logstash/java_pipeline.rb

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -244,18 +244,28 @@ def start_workers
244244

245245
filter_queue_client.set_batch_dimensions(batch_size, batch_delay)
246246

247-
pipeline_workers.times do |t|
247+
# First launch WorkerLoop initialization in separate threads which concurrently
248+
# compiles and initializes the worker pipelines
249+
250+
worker_loops = pipeline_workers.times
251+
.map { Thread.new { init_worker_loop } }
252+
.map(&:value)
253+
254+
fail("Some worker(s) were not correctly initialized") if worker_loops.any?{|v| v.nil?}
255+
256+
# Once all WorkerLoop have been initialized run them in separate threads
257+
258+
worker_loops.each_with_index do |worker_loop, t|
248259
thread = Thread.new do
249260
Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
250261
ThreadContext.put("pipeline.id", pipeline_id)
251-
org.logstash.execution.WorkerLoop.new(
252-
lir_execution, filter_queue_client, @events_filtered, @events_consumed,
253-
@flushRequested, @flushing, @shutdownRequested, @drain_queue).run
262+
worker_loop.run
254263
end
255264
@worker_threads << thread
256265
end
257266

258-
# inputs should be started last, after all workers
267+
# Finally inputs should be started last, after all workers have been initialized and started
268+
259269
begin
260270
start_inputs
261271
rescue => e
@@ -470,6 +480,26 @@ def inspect
470480

471481
private
472482

483+
# @return [WorkerLoop] a new WorkerLoop instance or nil upon construction exception
484+
def init_worker_loop
485+
begin
486+
org.logstash.execution.WorkerLoop.new(
487+
lir_execution,
488+
filter_queue_client,
489+
@events_filtered,
490+
@events_consumed,
491+
@flushRequested,
492+
@flushing,
493+
@shutdownRequested,
494+
@drain_queue)
495+
rescue => e
496+
@logger.error(
497+
"Worker loop initialization error",
498+
default_logging_keys(:error => e.message, :exception => e.class, :stacktrace => e.backtrace.join("\n")))
499+
nil
500+
end
501+
end
502+
473503
def maybe_setup_out_plugins
474504
if @outputs_registered.make_true
475505
register_plugins(outputs)

0 commit comments

Comments
 (0)