diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 9f3f32d..851bcf6 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -212,7 +212,7 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base # `key`: A ByteBuffer containing the message key # `timestamp`: The timestamp of this message config :decorate_events, :validate => :boolean, :default => false - + config :manual_commit_interval_ms, :validate => :string public def register @@ -221,6 +221,7 @@ def register public def run(logstash_queue) + @manual_commit_interval_ms = manual_commit_interval_ms.to_i @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}") } @runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) } @runner_threads.each { |t| t.join } @@ -247,6 +248,7 @@ def thread_runner(logstash_queue, consumer) else consumer.subscribe(topics); end + last_commit_time = timestamp_ms codec_instance = @codec.clone while !stop? records = consumer.poll(poll_timeout_ms) @@ -266,8 +268,9 @@ def thread_runner(logstash_queue, consumer) end end # Manual offset commit - if @enable_auto_commit == "false" + if has_to_commit?(last_commit_time) consumer.commitSync + last_commit_time = timestamp_ms end end rescue org.apache.kafka.common.errors.WakeupException => e @@ -354,4 +357,16 @@ def set_sasl_config(props) props.put("sasl.kerberos.service.name",sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil? end + + def timestamp_ms + (Time.now.to_f * 1000).to_i + end + + def has_to_commit?(last_commit_time) + # If auto_commit is enable we just leave the commit to the client library on poll and close actions + return false if @enable_auto_commit == "false" + + # If auto_commit is disable, we need to commit, we will do it depending on the manual_commit_interval option + @manual_commit_interval_ms <= 0 || (last_commit_time + @manual_commit_interval_ms) < timestamp_ms + end end #class LogStash::Inputs::Kafka