From f4a8d377476d052cc1798797125336ac835d6db7 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 11 Sep 2025 17:31:14 +0900 Subject: [PATCH] compressable: Handle zstd frames correctly Without this change, we received zstd payloads that start with an unknown frame header, like: forward: len=12984, head=54 d1 00 da cc 74 2e 4e 20 d6 2a ce 01 be f2 c2 | compressed(opt)=2 However, the zstd specification requires every frame to begin with the magic bytes, like: forward: len=19835, head=28 b5 2f fd a0 c2 14 03 00 a4 d0 00 4a f7 30 39 | compressed(opt)=2 So, we need to emit the magic bytes `28 b5 2f fd` at the head of zstd-compressed payloads. The head bytes of a payload can be dumped with the following debug patch: ```diff diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 4c323bb0..f192d6e3 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -672,6 +672,9 @@ module Fluent::Plugin sock.write @sender.forward_header # array, size=3 sock.write tag.to_msgpack # 1. tag: String (str) chunk.open(compressed: @compress) do |chunk_io| + head = chunk_io.read(8) || ''.b + @log.info "debug: forward entries head", hex: head.bytes.map { |b| "%02x" % b }.join(' ') + chunk_io.rewind entries = [0xdb, chunk_io.size].pack('CN') sock.write entries.force_encoding(Encoding::UTF_8) # 2. 
entries: String (str32) IO.copy_stream(chunk_io, sock) # writeRawBody(packed_es) ``` Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/compressable.rb | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/compressable.rb b/lib/fluent/plugin/compressable.rb index 2ec2122931..94c595b1de 100644 --- a/lib/fluent/plugin/compressable.rb +++ b/lib/fluent/plugin/compressable.rb @@ -26,14 +26,29 @@ def compress(data, type: :gzip, **kwargs) io = output_io || StringIO.new if type == :gzip writer = Zlib::GzipWriter.new(io) + writer.write(data) + writer.finish + io.string elsif type == :zstd - writer = Zstd::StreamWriter.new(io) + sc = Zstd::StreamingCompress.new(**kwargs) + chunk_size = kwargs[:chunk_size] || (256 * 1024) + + if data.is_a?(String) + i = 0 + while i < data.bytesize + io << sc.compress(data.byteslice(i, chunk_size)) + i += chunk_size + end + else + data.each { |d| io << sc.compress(d) } + end + + # Need to close frame to prevent unknown frame descriptor errors + io << sc.finish + io.string else raise ArgumentError, "Unknown compression type: #{type}" end - writer.write(data) - writer.finish - output_io || io.string end # compressed_data is String like `compress(data1) + compress(data2) + ... + compress(dataN)`