From 865d5430154dc8aa57ab7e9deece616bfc9b33e1 Mon Sep 17 00:00:00 2001 From: Abdul Haseeb Hussain Date: Thu, 7 Dec 2017 22:52:42 +0000 Subject: [PATCH 1/5] Fixed incorrect millisecond to second conversion for retry_backoff_ms --- lib/logstash/outputs/kafka.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb index e4d29f7..03d14a9 100644 --- a/lib/logstash/outputs/kafka.rb +++ b/lib/logstash/outputs/kafka.rb @@ -285,7 +285,7 @@ def retrying_send(batch) # Otherwise, retry with any failed transmissions batch = failures - delay = 1.0 / @retry_backoff_ms + delay = @retry_backoff_ms / 1000.0 logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size, :failures => failures.size, :sleep => delay); sleep(delay) From b9d852dd64e19ba04b4d2ec49c47346e19cf3abb Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Fri, 8 Feb 2019 12:59:40 -0500 Subject: [PATCH 2/5] Remove unnecessary sleep after exhausted retries Also fixes up some issues with logging, where the batch size was calculated incorrectly. Fixes #216 --- lib/logstash/outputs/kafka.rb | 17 ++++++----- spec/unit/outputs/kafka_spec.rb | 52 ++++++++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb index 03d14a9..c5e1428 100644 --- a/lib/logstash/outputs/kafka.rb +++ b/lib/logstash/outputs/kafka.rb @@ -234,7 +234,7 @@ def multi_receive(events) end def retrying_send(batch) - remaining = @retries; + remaining = @retries while batch.any? if !remaining.nil? @@ -284,13 +284,16 @@ def retrying_send(batch) break if failures.empty? # Otherwise, retry with any failed transmissions - batch = failures - delay = @retry_backoff_ms / 1000.0 - logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size, - :failures => failures.size, :sleep => delay); - sleep(delay) + if remaining != nil && remaining < 0 + logger.info("Sending batch to Kafka failed.", :batch_size => batch.size,:failures => failures.size) + else + delay = @retry_backoff_ms / 1000.0 + logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size, + :failures => failures.size, :sleep => delay) + batch = failures + sleep(delay) + end end - end def close diff --git a/spec/unit/outputs/kafka_spec.rb b/spec/unit/outputs/kafka_spec.rb index 7041a45..2d99a05 100644 --- a/spec/unit/outputs/kafka_spec.rb +++ b/spec/unit/outputs/kafka_spec.rb @@ -49,7 +49,7 @@ kafka.register kafka.multi_receive([event]) end - + it 'should raise config error when truststore location is not set and ssl is enabled' do kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("security_protocol" => "SSL")) expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/) @@ -120,6 +120,41 @@ end end + context 'when retries is 0' do + let(:retries) { 0 } + let(:max_sends) { 1 } + + it "should should only send once" do + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) .once .and_wrap_original do |m, *args| # Always fail. future = java.util.concurrent.FutureTask.new { raise "Failed" } future.run future end + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries)) + kafka.register + kafka.multi_receive([event]) + end + + it 'should not sleep' do + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) .once .and_wrap_original do |m, *args| # Always fail.
+ future = java.util.concurrent.FutureTask.new { raise "Failed" } + future.run + future + end + + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries)) + expect(kafka).not_to receive(:sleep).with(anything) + kafka.register + kafka.multi_receive([event]) + end + end + context "and when retries is set by the user" do let(:retries) { (rand * 10).to_i } let(:max_sends) { retries + 1 } @@ -137,6 +172,21 @@ kafka.register kafka.multi_receive([event]) end + + it 'should only sleep retries number of times' do + expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send) .at_most(max_sends) .and_wrap_original do |m, *args| # Always fail. + future = java.util.concurrent.FutureTask.new { raise "Failed" } + future.run + future + end + kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries)) + expect(kafka).to receive(:sleep).exactly(retries).times + kafka.register + kafka.multi_receive([event]) + end + end end end end From d06346e32a0f13adb2223f4ba501b0ee7c713d6f Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Fri, 8 Feb 2019 14:15:16 -0500 Subject: [PATCH 3/5] Update .travis.yml build targets --- lib/logstash/outputs/kafka.rb | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb index c5e1428..6aa5ef9 100644 --- a/lib/logstash/outputs/kafka.rb +++ b/lib/logstash/outputs/kafka.rb @@ -284,12 +284,11 @@ def retrying_send(batch) break if failures.empty? # Otherwise, retry with any failed transmissions - if remaining != nil && remaining < 0 - logger.info("Sending batch to Kafka failed.", :batch_size => batch.size,:failures => failures.size) - else + if remaining.nil? || remaining >= 0 delay = @retry_backoff_ms / 1000.0 logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size, - :failures => failures.size, :sleep => delay) + :failures => failures.size, + :sleep => delay) batch = failures sleep(delay) end From 6d3dd874b2e5b20f15b91d55b0e4564cb88654eb Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Wed, 7 Feb 2018 10:33:13 -0500 Subject: [PATCH 4/5] Log Kafka send errors as warn Log errors encountered when sending messages to Kafka at warn, rather than debug, to improve the visibility of errors such as 'RecordTooLargeException.' Fixes #177 Fixes #179 --- lib/logstash/outputs/kafka.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb index 6aa5ef9..02dd9a8 100644 --- a/lib/logstash/outputs/kafka.rb +++ b/lib/logstash/outputs/kafka.rb @@ -275,7 +275,7 @@ def retrying_send(batch) result = future.get() rescue => e # TODO(sissel): Add metric to count failures, possibly by exception type. - logger.debug? && logger.debug("KafkaProducer.send() failed: #{e}", :exception => e); + logger.warn("KafkaProducer.send() failed: #{e}", :exception => e) failures << batch[i] end end From 6ac4b9822b5cf6238a9035a46c4b6aef58ecabd2 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Mon, 11 Feb 2019 11:24:29 -0500 Subject: [PATCH 5/5] Backport of fixes from more recent branches: Fixed incorrect millisecond to second conversion for retry_backoff_ms #216 Fixed unnecessary sleep after exhausted retries #166 Changed Kafka send errors to log as warn #179 --- .travis.yml | 6 ++++-- CHANGELOG.md | 6 ++++++ kafka_test_setup.sh | 2 +- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index ce6e007..0592b1e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,8 +6,10 @@ jdk: oraclejdk8 rvm: - jruby-1.7.25 env: - - KAFKA_VERSION=0.10.0.1 -before_install: ./kafka_test_setup.sh + - KAFKA_VERSION=0.10.2.2 +before_install: + - gem install bundler -v '< 2' + - ./kafka_test_setup.sh before_script: - bundle exec rake vendor script: bundle exec rspec && bundle exec rspec --tag integration diff --git a/CHANGELOG.md b/CHANGELOG.md index c23a609..3fcb468 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 5.1.12 + - Backport of fixes from more recent branches: + - Fixed incorrect millisecond to second conversion for retry_backoff_ms [#216](https://github.com/logstash-plugins/logstash-output-kafka/pull/216) + - Fixed unnecessary sleep after exhausted retries [#166](https://github.com/logstash-plugins/logstash-output-kafka/pull/166) + - Changed Kafka send errors to log as warn [#179](https://github.com/logstash-plugins/logstash-output-kafka/pull/179) + ## 5.1.11 - Bugfix: Sends are now retried until successful. Previously, failed transmissions to Kafka could have been lost by the KafkaProducer library. Now we verify transmission explicitly. diff --git a/kafka_test_setup.sh b/kafka_test_setup.sh index 771fb17..b9abdfa 100755 --- a/kafka_test_setup.sh +++ b/kafka_test_setup.sh @@ -5,7 +5,7 @@ set -ex if [ -n "${KAFKA_VERSION+1}" ]; then echo "KAFKA_VERSION is $KAFKA_VERSION" else - KAFKA_VERSION=0.10.1.0 + KAFKA_VERSION=0.10.2.2 fi echo "Downloading Kafka version $KAFKA_VERSION"