From 831f2b9759fec3cc5ed1c9c40de893908bb3a6b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20=C5=A0mucr?= Date: Tue, 28 Nov 2017 10:39:02 +0100 Subject: [PATCH 1/2] Do not silently discard undelivered messages The plugin no longer discards messages which it fails to send if the server disconnects and sends some content prior to the disconnection. Example situation before the fix: 1. A connection is established between a server and the plugin. 2. The plugin delivers some messages to the server. 2.1) It may or may not read something that it received from the server. 2.2) It sends the payload to the server. 3. The server sends some content back and disconnects. 4. The plugin tries to deliver some other messages to the server. 4.1) It tries to read (if r.any?) and expects to get an EOFError in case the server just died. 4.2) No error is thrown as some content just arrived from the server. It is discarded. 4.3) The first payload is written to the socket, no error being thrown. 4.4) The next payload finally fails with EOFError upon reading from the socket and an retry follows. This commit fixes the 4.1 and 4.2 phases. The reading is now performed repeatedly while there's still something to read left so that we don't miss the EOFError exception. --- lib/logstash/outputs/tcp.rb | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index edf7d26..fa23d72 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -150,10 +150,14 @@ def register begin client_socket = connect unless client_socket r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil) - # don't expect any reads, but a readable socket might - # mean the remote end closed, so read it and throw it away. - # we'll get an EOFError if it happens. - client_socket.sysread(16384) if r.any? + loop do + break if !r.any? + # don't expect any reads, but a readable socket might + # mean the remote end closed, so read it and throw it away. + # we'll get an EOFError if it happens. + client_socket.sysread(16384) + r = IO.select([client_socket]) + end # Now send the payload client_socket.syswrite(payload) if w.any? From 1dc9a3d6d25b47fa582cb326973d4780957196ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20=C5=A0mucr?= Date: Thu, 30 Nov 2017 14:53:21 +0100 Subject: [PATCH 2/2] Updated the file to be compatible with 6.0.0 --- lib/logstash/outputs/tcp.rb | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index fa23d72..dc7adaf 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -51,8 +51,6 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base # SSL key passphrase config :ssl_key_passphrase, :validate => :password, :default => nil - config :message_format, :validate => :string, :obsolete => "This setting is obsolete. The event will be formatted according to the codec used" - class Client public def initialize(socket, logger) @@ -85,8 +83,12 @@ def setup_ssl require "openssl" @ssl_context = OpenSSL::SSL::SSLContext.new - @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert)) - @ssl_context.key = OpenSSL::PKey::RSA.new(File.read(@ssl_key),@ssl_key_passphrase) + if @ssl_cert + @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert)) + if @ssl_key + @ssl_context.key = OpenSSL::PKey::RSA.new(File.read(@ssl_key),@ssl_key_passphrase) + end + end if @ssl_verify @cert_store = OpenSSL::X509::Store.new # Load the system default certificate path to the store @@ -150,8 +152,8 @@ def register begin client_socket = connect unless client_socket r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil) - loop do - break if !r.any? + # Read everything first + while r.any? do # don't expect any reads, but a readable socket might # mean the remote end closed, so read it and throw it away. # we'll get an EOFError if it happens. @@ -162,7 +164,7 @@ def register # Now send the payload client_socket.syswrite(payload) if w.any? rescue => e - @logger.warn("tcp output exception", :host => @host, :port => @port, + @logger.warn("tcp output exception, will retry", :host => @host, :port => @port, :exception => e, :backtrace => e.backtrace) client_socket.close rescue nil client_socket = nil @@ -204,4 +206,4 @@ def server? def receive(event) @codec.encode(event) end # def receive -end # class LogStash::Outputs::Tcp +end # class LogStash::Outputs::Tcp \ No newline at end of file