Some messages can be dropped #7

Open
mixazya opened this issue Dec 26, 2017 · 2 comments

mixazya commented Dec 26, 2017

The issue occurs when incoming data is compressed.
The plugin uses class UdpHandler from lib/fluent/plugin/socket_util.rb,
which chomps the incoming data:

      def on_readable
        msg, addr = @io.recvfrom_nonblock(@body_size_limit)
        msg.chomp!
        @callback.call(msg, addr)
      rescue => e
        @log.error "unexpected error", error: e, error_class: e.class
      end

So when a compressed packet ends with 0x0d (CR), that byte is removed and the compressed stream is damaged. The same applies to a trailing 0x0a (LF), since chomp! strips both.
I tested with zlib compression: after the byte was removed, zlib failed to decode the data and the following message appeared in the logs: 2017-12-26 13:28:54 +1000 [warn]: Gelfd failed to parse a message error="Failed to decode data: buffer error "
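
The failure can be reproduced outside of fluentd with a minimal sketch like the one below. It is only an illustration, not code from the plugin: the helper names `find_fragile_payload` and `inflate_ok?` and the JSON payload shape are made up for this example. It pads a message until its zlib stream happens to end in CR or LF, then shows that the stream no longer inflates after `chomp`.

```ruby
require 'zlib'

# Hypothetical helper (not part of fluentd or the plugin): pad a message
# until its zlib-compressed form ends in CR or LF. The last byte of a zlib
# stream is the low byte of the Adler-32 checksum, which changes with every
# added 'a', so a match is expected within the first 256 attempts.
def find_fragile_payload(limit = 256)
  limit.times do |i|
    payload = %({"short_message":"#{'a' * i}"})
    compressed = Zlib::Deflate.deflate(payload)
    return [payload, compressed] if compressed.end_with?("\r", "\n")
  end
  raise "no payload found within #{limit} attempts"
end

# Returns true if the data inflates cleanly, false on any zlib error.
def inflate_ok?(data)
  Zlib::Inflate.inflate(data)
  true
rescue Zlib::Error
  false
end

payload, compressed = find_fragile_payload
damaged = compressed.chomp # same effect as msg.chomp! in on_readable

puts "payload size:     #{payload.bytesize} bytes"
puts "bytes stripped:   #{compressed.bytesize - damaged.bytesize}"
puts "intact inflates:  #{inflate_ok?(compressed)}"
puts "chomped inflates: #{inflate_ok?(damaged)}"
```

Because only a fraction of compressed packets end in one of these bytes, the drops look sporadic in practice.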

One way of resolving:

--- /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-input-gelf-0.2.0/lib/fluent/plugin/in_gelf.rb.bkp	2017-12-26 13:16:37.532566124 +1000
+++ /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-input-gelf-0.2.0/lib/fluent/plugin/in_gelf.rb	2017-12-26 13:29:18.422364466 +1000
@@ -7,6 +7,21 @@
 require 'fluent/input'
 
 module Fluent
+#mixaz patch
+  class UdpHandler < SocketUtil::UdpHandler
+      def initialize(io, log, body_size_limit, callback)
+       super
+      end
+      def on_readable
+        msg, addr = @io.recvfrom_nonblock(@body_size_limit)
+        @callback.call(msg, addr)
+      rescue => e
+        @log.error "unexpected error", error: e, error_class: e.class
+      end
+  end
+#mixaz patch end
+
   class GelfInput < Fluent::Input
     Fluent::Plugin.register_input('gelf', self)
 
@@ -105,7 +120,10 @@
       else
         @usock = SocketUtil.create_udp_socket(@bind)
         @usock.bind(@bind, @port)
-        SocketUtil::UdpHandler.new(@usock, log, 8192, callback)
+#mixaz patch
+        #SocketUtil::UdpHandler.new(@usock, log, 8192, callback)
+        UdpHandler.new(@usock, log, 8192, callback)
+#mixaz patch end
       end
     end

wdoekes commented Apr 12, 2022


(A bug fix that is necessary, though, is #16. That bug also causes messages to get dropped.)
