Commit f78a485

Author: dsebban

Add LZ4 kafka support

1 parent: ed2f3e1

File tree: 7 files changed, +1083 / -2 lines

build.sbt

Lines changed: 1 addition & 0 deletions
@@ -189,6 +189,7 @@ lazy val kafka =
     name := "protocol-kafka"
   , libraryDependencies ++= Seq(
       "org.xerial.snappy" % "snappy-java" % "1.1.2.1"   // for supporting Snappy compression of message sets
+    , "org.lz4" % "lz4-java" % "1.4.1"                  // for supporting LZ4 compression of message sets
   , "org.apache.kafka" %% "kafka" % "0.10.2.0" % "test"
   )
 ).dependsOn(
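The new lz4-java dependency provides the net.jpountz block codec and the XXHash32 implementation that the frame code below builds on. A minimal sketch of the raw block-level API (no Kafka framing; the class name is just for illustration):

import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;

public class Lz4Smoke {
    public static void main(String[] args) {
        LZ4Factory factory = LZ4Factory.fastestInstance();
        byte[] data = "some bytes worth compressing".getBytes();

        // Block-level compression: no header or framing, the caller tracks lengths.
        LZ4Compressor compressor = factory.fastCompressor();
        byte[] compressed = compressor.compress(data);

        // The "safe" decompressor only needs an upper bound on the output size.
        LZ4SafeDecompressor decompressor = factory.safeDecompressor();
        byte[] restored = new byte[data.length];
        int len = decompressor.decompress(compressed, 0, compressed.length, restored, 0);
        System.out.println(new String(restored, 0, len));
    }
}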
KafkaLZ4BlockInputStream.java

Lines changed: 264 additions & 0 deletions

@@ -0,0 +1,264 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.record;

import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
import org.apache.kafka.common.utils.ByteUtils;

import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;
import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;

/**
 * A partial implementation of the v1.5.1 LZ4 Frame format.
 *
 * @see <a href="http://cyan4973.github.io/lz4/lz4_Frame_format.html">LZ4 Frame Format</a>
 */
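// A sketch of the frame layout this class consumes, per the LZ4 Frame spec
// linked above (orientation note, not part of the original diff):
//
//   magic number   4 bytes, 0x184D2204, little-endian
//   FLG            1 byte  (version, block/content checksum flags, content size flag)
//   BD             1 byte  (encodes the block maximum size)
//   content size   0 or 8 bytes, present only when the FLG content-size bit is set
//   HC             1 byte  (descriptor checksum: second byte of XXHash32 over FLG..content size)
//   data blocks    each prefixed by a 4-byte little-endian size word; bit 31 set
//                  means the block payload is stored uncompressed
//   EndMark        a size word of 0, optionally followed by a 4-byte content checksum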
public final class KafkaLZ4BlockInputStream extends FilterInputStream {

    public static final String PREMATURE_EOS = "Stream ended prematurely";
    public static final String NOT_SUPPORTED = "Stream unsupported (invalid magic bytes)";
    public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
    public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";

    private final LZ4SafeDecompressor decompressor;
    private final XXHash32 checksum;
    private final byte[] buffer;
    private final byte[] compressedBuffer;
    private final int maxBlockSize;
    private final boolean ignoreFlagDescriptorChecksum;
    private FLG flg;
    private BD bd;
    private int bufferOffset;
    private int bufferSize;
    private boolean finished;

    /**
     * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
     *
     * @param in The stream to decompress
     * @param ignoreFlagDescriptorChecksum for compatibility with old kafka clients, ignore incorrect HC byte
     * @throws IOException
     */
    public KafkaLZ4BlockInputStream(InputStream in, boolean ignoreFlagDescriptorChecksum) throws IOException {
        super(in);
        decompressor = LZ4Factory.fastestInstance().safeDecompressor();
        checksum = XXHashFactory.fastestInstance().hash32();
        this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
        readHeader();
        maxBlockSize = bd.getBlockMaximumSize();
        buffer = new byte[maxBlockSize];
        compressedBuffer = new byte[maxBlockSize];
        bufferOffset = 0;
        bufferSize = 0;
        finished = false;
    }

    /**
     * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
     *
     * @param in The stream to decompress
     * @throws IOException
     */
    public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
        this(in, false);
    }

    /**
     * Check whether KafkaLZ4BlockInputStream is configured to ignore the
     * Frame Descriptor checksum, which is useful for compatibility with
     * old client implementations that use incorrect checksum calculations.
     */
    public boolean ignoreFlagDescriptorChecksum() {
        return this.ignoreFlagDescriptorChecksum;
    }

    /**
     * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
     *
     * @throws IOException
     */
    private void readHeader() throws IOException {
        byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];

        // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
        int headerOffset = 6;
        if (in.read(header, 0, headerOffset) != headerOffset) {
            throw new IOException(PREMATURE_EOS);
        }

        if (MAGIC != ByteUtils.readUnsignedIntLE(header, headerOffset - 6)) {
            throw new IOException(NOT_SUPPORTED);
        }
        flg = FLG.fromByte(header[headerOffset - 2]);
        bd = BD.fromByte(header[headerOffset - 1]);

        if (flg.isContentSizeSet()) {
            if (in.read(header, headerOffset, 8) != 8)
                throw new IOException(PREMATURE_EOS);
            headerOffset += 8;
        }

        // Final byte of Frame Descriptor is HC checksum
        header[headerOffset++] = (byte) in.read();

        // Old implementations produced incorrect HC checksums
        if (ignoreFlagDescriptorChecksum)
            return;

        int offset = 4;
        int len = headerOffset - offset - 1; // don't include magic bytes or HC
        byte hash = (byte) ((checksum.hash(header, offset, len, 0) >> 8) & 0xFF);
        if (hash != header[headerOffset - 1])
            throw new IOException(DESCRIPTOR_HASH_MISMATCH);
    }

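    // Note (illustrative, not from the diff): the HC byte verified in
    // readHeader() is, per the frame spec, the second byte of XXHash32
    // (seed 0) over the descriptor bytes between the magic and HC itself --
    // exactly the (hash >> 8) & 0xFF expression above. A spec-compliant
    // writer derives it the same way before appending it to the header.
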
    /**
     * Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32 checksum, and writes the
     * result to a buffer.
     *
     * @throws IOException
     */
    private void readBlock() throws IOException {
        int blockSize = ByteUtils.readUnsignedIntLE(in);

        // Check for EndMark
        if (blockSize == 0) {
            finished = true;
            if (flg.isContentChecksumSet())
                ByteUtils.readUnsignedIntLE(in); // TODO: verify this content checksum
            return;
        } else if (blockSize > maxBlockSize) {
            throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
        }

        boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
        byte[] bufferToRead;
        if (compressed) {
            bufferToRead = compressedBuffer;
        } else {
            blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
            bufferToRead = buffer;
            bufferSize = blockSize;
        }

        if (in.read(bufferToRead, 0, blockSize) != blockSize) {
            throw new IOException(PREMATURE_EOS);
        }

        // verify checksum
        if (flg.isBlockChecksumSet() && ByteUtils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
            throw new IOException(BLOCK_HASH_MISMATCH);
        }

        if (compressed) {
            try {
                bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
            } catch (LZ4Exception e) {
                throw new IOException(e);
            }
        }

        bufferOffset = 0;
    }
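
    // Size-word examples for the block framing handled above (values
    // illustrative, not from the diff):
    //   0x00000400          -> LZ4-compressed block, 1024 bytes on the wire
    //   0x80000400 (bit 31) -> uncompressed block, 1024 bytes copied verbatim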

    @Override
    public int read() throws IOException {
        if (finished) {
            return -1;
        }
        if (available() == 0) {
            readBlock();
        }
        if (finished) {
            return -1;
        }

        return buffer[bufferOffset++] & 0xFF;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        net.jpountz.util.SafeUtils.checkRange(b, off, len);
        if (finished) {
            return -1;
        }
        if (available() == 0) {
            readBlock();
        }
        if (finished) {
            return -1;
        }
        len = Math.min(len, available());
        System.arraycopy(buffer, bufferOffset, b, off, len);
        bufferOffset += len;
        return len;
    }

    @Override
    public long skip(long n) throws IOException {
        if (finished) {
            return 0;
        }
        if (available() == 0) {
            readBlock();
        }
        if (finished) {
            return 0;
        }
        n = Math.min(n, available());
        bufferOffset += n;
        return n;
    }

    @Override
    public int available() throws IOException {
        return bufferSize - bufferOffset;
    }

    @Override
    public void close() throws IOException {
        in.close();
    }

    @Override
    public synchronized void mark(int readlimit) {
        throw new RuntimeException("mark not supported");
    }

    @Override
    public synchronized void reset() throws IOException {
        throw new RuntimeException("reset not supported");
    }

    @Override
    public boolean markSupported() {
        return false;
    }

}
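
A minimal round-trip sketch of the new stream in use. It assumes the companion KafkaLZ4BlockOutputStream added elsewhere in this commit, and in particular that its single-argument constructor wraps a plain OutputStream; treat it as an illustration rather than part of the diff above.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.record.KafkaLZ4BlockInputStream;
import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream;

public class Lz4FrameRoundTrip {
    public static void main(String[] args) throws Exception {
        byte[] payload = "hello, LZ4 frame".getBytes(StandardCharsets.UTF_8);

        // Compress into an in-memory LZ4 frame.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        KafkaLZ4BlockOutputStream lz4Out = new KafkaLZ4BlockOutputStream(sink);
        lz4Out.write(payload);
        lz4Out.close(); // flushes the final block and writes the EndMark

        // Read it back; 'false' means the frame descriptor HC byte is verified.
        KafkaLZ4BlockInputStream lz4In =
            new KafkaLZ4BlockInputStream(new ByteArrayInputStream(sink.toByteArray()), false);
        byte[] restored = new byte[payload.length];
        int n = lz4In.read(restored, 0, restored.length);
        System.out.println(new String(restored, 0, n, StandardCharsets.UTF_8));
    }
}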

0 commit comments