From 59881221a94b64b83c1214e9a613ce9c3718cc90 Mon Sep 17 00:00:00 2001 From: gnuhpc Date: Thu, 10 Aug 2017 13:35:46 +0800 Subject: [PATCH] Add charset conversion feature. --- lib/logstash/inputs/kafka.rb | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index c235ff0..ec04199 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -1,5 +1,6 @@ require 'logstash/namespace' require 'logstash/inputs/base' +require "logstash/util/charset" require 'stud/interval' require 'java' require 'logstash-input-kafka_jars.rb' @@ -214,9 +215,18 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base config :decorate_events, :validate => :boolean, :default => false + # Option for charset conversion, default is UTF-8, especially useful when after one codec applied. + config :charset, :validate => ::Encoding.name_list, :default => "UTF-8" + + # Option for what fields need to be converted to the charset specified by option charset. Default is an empty array. + # Once a unexist field set, an error log will be writen into the log. + config :charset_field, :validate => :array, :default => [] + public def register @runner_threads = [] + @converter = LogStash::Util::Charset.new(@charset) + @converter.logger = @logger end # def register public @@ -253,6 +263,15 @@ def thread_runner(logstash_queue, consumer) for record in records do codec_instance.decode(record.value.to_s) do |event| decorate(event) + unless @charset_field.empty? + @charset_field.each do |e| + unless event.get(e).nil? + event.set(e,@converter.convert(event.get(e))) + else + @logger.error("No such field:[" + e + "]. Skip it for charset conversion.") + end + end + end if @decorate_events event.set("[@metadata][kafka][topic]", record.topic) event.set("[@metadata][kafka][consumer_group]", @group_id)