diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 3deec46..bed0129 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -109,6 +109,10 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base # `session.timeout.ms`, but typically should be set no higher than 1/3 of that value. # It can be adjusted even lower to control the expected time for normal rebalances. config :heartbeat_interval_ms, :validate => :string + # If the producer uses transactions, we can make the client transaction aware by setting the isolation + # level to "read_committed": this value causes the client to wait to read transactional messages until + # the associated transaction has been committed. + config :isolation_level, :validate => ["read_committed", "read_uncommitted"], :default => "read_uncommitted" # Java Class used to deserialize the record's key config :key_deserializer_class, :validate => :string, :default => "org.apache.kafka.common.serialization.StringDeserializer" # The maximum delay between invocations of poll() when using consumer group management. This places @@ -300,6 +304,7 @@ def create_consumer(client_id) props.put(kafka::FETCH_MIN_BYTES_CONFIG, fetch_min_bytes) unless fetch_min_bytes.nil? props.put(kafka::GROUP_ID_CONFIG, group_id) props.put(kafka::HEARTBEAT_INTERVAL_MS_CONFIG, heartbeat_interval_ms) unless heartbeat_interval_ms.nil? + props.put(kafka::ISOLATION_LEVEL_CONFIG, isolation_level) props.put(kafka::KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class) props.put(kafka::MAX_PARTITION_FETCH_BYTES_CONFIG, max_partition_fetch_bytes) unless max_partition_fetch_bytes.nil? props.put(kafka::MAX_POLL_RECORDS_CONFIG, max_poll_records) unless max_poll_records.nil?