@@ -221,8 +221,12 @@ def register
221
221
222
222
public
223
223
def run ( logstash_queue )
224
- @runner_consumers = consumer_threads . times . map { |i | create_consumer ( "#{ client_id } -#{ i } " ) }
225
- @runner_threads = @runner_consumers . map { |consumer | thread_runner ( logstash_queue , consumer ) }
224
+ @runner_consumers = consumer_threads . times . map do |index |
225
+ create_consumer ( "#{ client_id } -#{ index } " )
226
+ end
227
+ @runner_threads = @runner_consumers . map . with_index do |consumer , index |
228
+ thread_runner ( logstash_queue , consumer , index )
229
+ end
226
230
@runner_threads . each { |t | t . join }
227
231
end # def run
228
232
@@ -237,9 +241,11 @@ def kafka_consumers
237
241
end
238
242
239
243
private
240
- def thread_runner ( logstash_queue , consumer )
244
+ def thread_runner ( logstash_queue , consumer , consumer_index )
245
+ consumer_identifier = "#{ client_id } -#{ consumer_index } "
241
246
Thread . new do
242
247
begin
248
+ logger . info ( "opening consumer #{ consumer_identifier } " )
243
249
unless @topics_pattern . nil?
244
250
nooplistener = org . apache . kafka . clients . consumer . internals . NoOpConsumerRebalanceListener . new
245
251
pattern = java . util . regex . Pattern . compile ( @topics_pattern )
@@ -271,8 +277,15 @@ def thread_runner(logstash_queue, consumer)
271
277
end
272
278
end
273
279
rescue org . apache . kafka . common . errors . WakeupException => e
274
- raise e if !stop?
280
+ unless stop?
281
+ logger . error ( "wakeup exception in consumer #{ consumer_identifier } : #{ e } " )
282
+ raise e
283
+ end
284
+ rescue => e
285
+ logger . error ( "uncaught exception in consumer #{ consumer_identifier } : #{ e } " )
286
+ raise e
275
287
ensure
288
+ logger . info ( "closing consumer #{ consumer_identifier } " )
276
289
consumer . close
277
290
end
278
291
end
0 commit comments