From f78a485199a6f7516bc271fffa092fb307ae44a0 Mon Sep 17 00:00:00 2001
From: dsebban
Date: Tue, 14 Aug 2018 15:45:53 +0300
Subject: [PATCH] Add LZ4 kafka support

---
 build.sbt                                          |   1 +
 .../record/KafkaLZ4BlockInputStream.java           | 264 +++++++++++
 .../record/KafkaLZ4BlockOutputStream.java          | 423 ++++++++++++++++++
 .../apache/kafka/common/utils/ByteUtils.java       | 334 ++++++++++++++
 .../kafka/codec/MessageSetCodec.scala              |   3 +-
 .../protocol/kafka/codec/compression.scala         |  11 +
 .../kafka/codec/MessageSetCodecSpec.scala          |  49 +-
 7 files changed, 1083 insertions(+), 2 deletions(-)
 create mode 100644 kafka/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
 create mode 100644 kafka/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
 create mode 100644 kafka/src/main/java/org/apache/kafka/common/utils/ByteUtils.java

diff --git a/build.sbt b/build.sbt
index 156bd6d..950c570 100644
--- a/build.sbt
+++ b/build.sbt
@@ -189,6 +189,7 @@ lazy val kafka =
     name := "protocol-kafka"
     , libraryDependencies ++= Seq(
       "org.xerial.snappy" % "snappy-java" % "1.1.2.1" // for supporting a Snappy compression of message sets
+      , "org.lz4" % "lz4-java" % "1.4.1" // for supporting a LZ4 compression of message sets
       , "org.apache.kafka" %% "kafka" % "0.10.2.0" % "test"
     )
   ).dependsOn(
diff --git a/kafka/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/kafka/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
new file mode 100644
index 0000000..048551f
--- /dev/null
+++ b/kafka/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.5.1 LZ4 Frame format.
+ *
+ * @see <a href="https://github.com/lz4/lz4/wiki/lz4_Frame_format.md">LZ4 Frame Format</a>
+ */
+public final class KafkaLZ4BlockInputStream extends FilterInputStream {
+
+    public static final String PREMATURE_EOS = "Stream ended prematurely";
+    public static final String NOT_SUPPORTED = "Stream unsupported (invalid magic bytes)";
+    public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
+    public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
+
+    private final LZ4SafeDecompressor decompressor;
+    private final XXHash32 checksum;
+    private final byte[] buffer;
+    private final byte[] compressedBuffer;
+    private final int maxBlockSize;
+    private final boolean ignoreFlagDescriptorChecksum;
+    private FLG flg;
+    private BD bd;
+    private int bufferOffset;
+    private int bufferSize;
+    private boolean finished;
+
+    /**
+     * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
+     *
+     * @param in The stream to decompress
+     * @param ignoreFlagDescriptorChecksum for compatibility with old kafka clients, ignore incorrect HC byte
+     * @throws IOException
+     */
+    public KafkaLZ4BlockInputStream(InputStream in, boolean ignoreFlagDescriptorChecksum) throws IOException {
+        super(in);
+        decompressor = LZ4Factory.fastestInstance().safeDecompressor();
+        checksum = XXHashFactory.fastestInstance().hash32();
+        this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
+        readHeader();
+        maxBlockSize = bd.getBlockMaximumSize();
+        buffer = new byte[maxBlockSize];
+        compressedBuffer = new byte[maxBlockSize];
+        bufferOffset = 0;
+        bufferSize = 0;
+        finished = false;
+    }
+
+    /**
+     * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
+     *
+     * @param in The stream to decompress
+     * @throws IOException
+     */
+    public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
+        this(in, false);
+    }
+
+    /**
+     * Check whether KafkaLZ4BlockInputStream is configured to ignore the
+     * Frame Descriptor checksum, which is useful for compatibility with
+     * old client implementations that use incorrect checksum calculations.
+     */
+    public boolean ignoreFlagDescriptorChecksum() {
+        return this.ignoreFlagDescriptorChecksum;
+    }
+
+    /**
+     * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
+     *
+     * @throws IOException
+     */
+    private void readHeader() throws IOException {
+        byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
+
+        // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
+        int headerOffset = 6;
+        if (in.read(header, 0, headerOffset) != headerOffset) {
+            throw new IOException(PREMATURE_EOS);
+        }
+
+        if (MAGIC != ByteUtils.readUnsignedIntLE(header, headerOffset - 6)) {
+            throw new IOException(NOT_SUPPORTED);
+        }
+        flg = FLG.fromByte(header[headerOffset - 2]);
+        bd = BD.fromByte(header[headerOffset - 1]);
+
+        if (flg.isContentSizeSet()) {
+            if (in.read(header, headerOffset, 8) != 8)
+                throw new IOException(PREMATURE_EOS);
+            headerOffset += 8;
+        }
+
+        // Final byte of Frame Descriptor is HC checksum
+        header[headerOffset++] = (byte) in.read();
+
+        // Old implementations produced incorrect HC checksums
+        if (ignoreFlagDescriptorChecksum)
+            return;
+
+        int offset = 4;
+        int len = headerOffset - offset - 1; // dont include magic bytes or HC
+        byte hash = (byte) ((checksum.hash(header, offset, len, 0) >> 8) & 0xFF);
+        if (hash != header[headerOffset - 1])
+            throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+    }
+
+    /**
+     * Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32 checksum, and writes the
+     * result to a buffer.
+     *
+     * @throws IOException
+     */
+    private void readBlock() throws IOException {
+        int blockSize = ByteUtils.readUnsignedIntLE(in);
+
+        // Check for EndMark
+        if (blockSize == 0) {
+            finished = true;
+            if (flg.isContentChecksumSet())
+                ByteUtils.readUnsignedIntLE(in); // TODO: verify this content checksum
+            return;
+        } else if (blockSize > maxBlockSize) {
+            throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
+        }
+
+        boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
+        byte[] bufferToRead;
+        if (compressed) {
+            bufferToRead = compressedBuffer;
+        } else {
+            blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
+            bufferToRead = buffer;
+            bufferSize = blockSize;
+        }
+
+        if (in.read(bufferToRead, 0, blockSize) != blockSize) {
+            throw new IOException(PREMATURE_EOS);
+        }
+
+        // verify checksum
+        if (flg.isBlockChecksumSet() && ByteUtils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
+            throw new IOException(BLOCK_HASH_MISMATCH);
+        }
+
+        if (compressed) {
+            try {
+                bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
+            } catch (LZ4Exception e) {
+                throw new IOException(e);
+            }
+        }
+
+        bufferOffset = 0;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (finished) {
+            return -1;
+        }
+        if (available() == 0) {
+            readBlock();
+        }
+        if (finished) {
+            return -1;
+        }
+
+        return buffer[bufferOffset++] & 0xFF;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        net.jpountz.util.SafeUtils.checkRange(b, off, len);
+        if (finished) {
+            return -1;
+        }
+        if (available() == 0) {
+            readBlock();
+        }
+        if (finished) {
+            return -1;
+        }
+        len = Math.min(len, available());
+        System.arraycopy(buffer, bufferOffset, b, off, len);
+        bufferOffset += len;
+        return len;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        if (finished) {
+            return 0;
+        }
+        if (available() == 0) {
+            readBlock();
+        }
+        if (finished) {
+            return 0;
+        }
+        n = Math.min(n, available());
+        bufferOffset += n;
+        return n;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return bufferSize - bufferOffset;
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        throw new RuntimeException("mark not supported");
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        throw new RuntimeException("reset not supported");
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+}
\ No newline at end of file
diff --git a/kafka/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/kafka/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
new file mode 100644
index 0000000..591ab16
--- /dev/null
+++ b/kafka/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.5.1 LZ4 Frame format.
+ *
+ * @see <a href="https://github.com/lz4/lz4/wiki/lz4_Frame_format.md">LZ4 Frame Format</a>
+ *
+ * This class is not thread-safe.
+ */
+public final class KafkaLZ4BlockOutputStream extends OutputStream {
+
+    public static final int MAGIC = 0x184D2204;
+    public static final int LZ4_MAX_HEADER_LENGTH = 19;
+    public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000;
+
+    public static final String CLOSED_STREAM = "The stream is already closed";
+
+    public static final int BLOCKSIZE_64KB = 4;
+    public static final int BLOCKSIZE_256KB = 5;
+    public static final int BLOCKSIZE_1MB = 6;
+    public static final int BLOCKSIZE_4MB = 7;
+
+    private final LZ4Compressor compressor;
+    private final XXHash32 checksum;
+    private final boolean useBrokenFlagDescriptorChecksum;
+    private final FLG flg;
+    private final BD bd;
+    private final int maxBlockSize;
+    private OutputStream out;
+    private byte[] buffer;
+    private byte[] compressedBuffer;
+    private int bufferOffset;
+    private boolean finished;
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     *
+     * @param out The output stream to compress
+     * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
+     *            values will generate an exception
+     * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for
+     *            every block of data
+     * @param useBrokenFlagDescriptorChecksum Default: false. When true, writes an incorrect FrameDescriptor checksum
+     *            compatible with older kafka clients.
+     * @throws IOException
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException {
+        this.out = out;
+        compressor = LZ4Factory.fastestInstance().fastCompressor();
+        checksum = XXHashFactory.fastestInstance().hash32();
+        this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
+        bd = new BD(blockSize);
+        flg = new FLG(blockChecksum);
+        bufferOffset = 0;
+        maxBlockSize = bd.getBlockMaximumSize();
+        buffer = new byte[maxBlockSize];
+        compressedBuffer = new byte[compressor.maxCompressedLength(maxBlockSize)];
+        finished = false;
+        writeHeader();
+    }
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     *
+     * @param out The output stream to compress
+     * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
+     *            values will generate an exception
+     * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for
+     *            every block of data
+     * @throws IOException
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
+        this(out, blockSize, blockChecksum, false);
+    }
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     *
+     * @param out The stream to compress
+     * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
+     *            values will generate an exception
+     * @throws IOException
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException {
+        this(out, blockSize, false, false);
+    }
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     *
+     * @param out The output stream to compress
+     * @throws IOException
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException {
+        this(out, BLOCKSIZE_64KB);
+    }
+
+    public KafkaLZ4BlockOutputStream(OutputStream out, boolean useBrokenHC) throws IOException {
+        this(out, BLOCKSIZE_64KB, false, useBrokenHC);
+    }
+
+    /**
+     * Check whether KafkaLZ4BlockOutputStream is configured to write an
+     * incorrect Frame Descriptor checksum, which is useful for
+     * compatibility with old client implementations.
+     */
+    public boolean useBrokenFlagDescriptorChecksum() {
+        return this.useBrokenFlagDescriptorChecksum;
+    }
+
+    /**
+     * Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
+     *
+     * @throws IOException
+     */
+    private void writeHeader() throws IOException {
+        ByteUtils.writeUnsignedIntLE(buffer, 0, MAGIC);
+        bufferOffset = 4;
+        buffer[bufferOffset++] = flg.toByte();
+        buffer[bufferOffset++] = bd.toByte();
+        // TODO write uncompressed content size, update flg.validate()
+
+        // compute checksum on all descriptor fields
+        int offset = 4;
+        int len = bufferOffset - offset;
+        if (this.useBrokenFlagDescriptorChecksum) {
+            len += offset;
+            offset = 0;
+        }
+        byte hash = (byte) ((checksum.hash(buffer, offset, len, 0) >> 8) & 0xFF);
+        buffer[bufferOffset++] = hash;
+
+        // write out frame descriptor
+        out.write(buffer, 0, bufferOffset);
+        bufferOffset = 0;
+    }
+
+    /**
+     * Compresses buffered data, optionally computes an XXHash32 checksum, and writes the result to the underlying
+     * {@link OutputStream}.
+     *
+     * @throws IOException
+     */
+    private void writeBlock() throws IOException {
+        if (bufferOffset == 0) {
+            return;
+        }
+
+        int compressedLength = compressor.compress(buffer, 0, bufferOffset, compressedBuffer, 0);
+        byte[] bufferToWrite = compressedBuffer;
+        int compressMethod = 0;
+
+        // Store block uncompressed if compressed length is greater (incompressible)
+        if (compressedLength >= bufferOffset) {
+            bufferToWrite = buffer;
+            compressedLength = bufferOffset;
+            compressMethod = LZ4_FRAME_INCOMPRESSIBLE_MASK;
+        }
+
+        // Write content
+        ByteUtils.writeUnsignedIntLE(out, compressedLength | compressMethod);
+        out.write(bufferToWrite, 0, compressedLength);
+
+        // Calculate and write block checksum
+        if (flg.isBlockChecksumSet()) {
+            int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0);
+            ByteUtils.writeUnsignedIntLE(out, hash);
+        }
+        bufferOffset = 0;
+    }
+
+    /**
+     * Similar to the {@link #writeBlock()} method. Writes a 0-length block (without block checksum) to signal the end
+     * of the block stream.
+     *
+     * @throws IOException
+     */
+    private void writeEndMark() throws IOException {
+        ByteUtils.writeUnsignedIntLE(out, 0);
+        // TODO implement content checksum, update flg.validate()
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        ensureNotFinished();
+        if (bufferOffset == maxBlockSize) {
+            writeBlock();
+        }
+        buffer[bufferOffset++] = (byte) b;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        net.jpountz.util.SafeUtils.checkRange(b, off, len);
+        ensureNotFinished();
+
+        int bufferRemainingLength = maxBlockSize - bufferOffset;
+        // while b will fill the buffer
+        while (len > bufferRemainingLength) {
+            // fill remaining space in buffer
+            System.arraycopy(b, off, buffer, bufferOffset, bufferRemainingLength);
+            bufferOffset = maxBlockSize;
+            writeBlock();
+            // compute new offset and length
+            off += bufferRemainingLength;
+            len -= bufferRemainingLength;
+            bufferRemainingLength = maxBlockSize;
+        }
+
+        System.arraycopy(b, off, buffer, bufferOffset, len);
+        bufferOffset += len;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (!finished) {
+            writeBlock();
+        }
+        if (out != null) {
+            out.flush();
+        }
+    }
+
+    /**
+     * A simple state check to ensure the stream is still open.
+     */
+    private void ensureNotFinished() {
+        if (finished) {
+            throw new IllegalStateException(CLOSED_STREAM);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            if (!finished) {
+                // basically flush the buffer writing the last block
+                writeBlock();
+                // write the end block
+                writeEndMark();
+            }
+        } finally {
+            try {
+                if (out != null) {
+                    try (OutputStream outStream = out) {
+                        outStream.flush();
+                    }
+                }
+            } finally {
+                out = null;
+                buffer = null;
+                compressedBuffer = null;
+                finished = true;
+            }
+        }
+    }
+
+    public static class FLG {
+
+        private static final int VERSION = 1;
+
+        private final int reserved;
+        private final int contentChecksum;
+        private final int contentSize;
+        private final int blockChecksum;
+        private final int blockIndependence;
+        private final int version;
+
+        public FLG() {
+            this(false);
+        }
+
+        public FLG(boolean blockChecksum) {
+            this(0, 0, 0, blockChecksum ?
+                1 : 0, 1, VERSION);
+        }
+
+        private FLG(int reserved,
+                    int contentChecksum,
+                    int contentSize,
+                    int blockChecksum,
+                    int blockIndependence,
+                    int version) {
+            this.reserved = reserved;
+            this.contentChecksum = contentChecksum;
+            this.contentSize = contentSize;
+            this.blockChecksum = blockChecksum;
+            this.blockIndependence = blockIndependence;
+            this.version = version;
+            validate();
+        }
+
+        public static FLG fromByte(byte flg) {
+            int reserved = (flg >>> 0) & 3;
+            int contentChecksum = (flg >>> 2) & 1;
+            int contentSize = (flg >>> 3) & 1;
+            int blockChecksum = (flg >>> 4) & 1;
+            int blockIndependence = (flg >>> 5) & 1;
+            int version = (flg >>> 6) & 3;
+
+            return new FLG(reserved,
+                           contentChecksum,
+                           contentSize,
+                           blockChecksum,
+                           blockIndependence,
+                           version);
+        }
+
+        public byte toByte() {
+            return (byte) (((reserved & 3) << 0) | ((contentChecksum & 1) << 2)
+                | ((contentSize & 1) << 3) | ((blockChecksum & 1) << 4) | ((blockIndependence & 1) << 5) | ((version & 3) << 6));
+        }
+
+        private void validate() {
+            if (reserved != 0) {
+                throw new RuntimeException("Reserved bits must be 0");
+            }
+            if (blockIndependence != 1) {
+                throw new RuntimeException("Dependent block stream is unsupported");
+            }
+            if (version != VERSION) {
+                throw new RuntimeException(String.format("Version %d is unsupported", version));
+            }
+        }
+
+        public boolean isContentChecksumSet() {
+            return contentChecksum == 1;
+        }
+
+        public boolean isContentSizeSet() {
+            return contentSize == 1;
+        }
+
+        public boolean isBlockChecksumSet() {
+            return blockChecksum == 1;
+        }
+
+        public boolean isBlockIndependenceSet() {
+            return blockIndependence == 1;
+        }
+
+        public int getVersion() {
+            return version;
+        }
+    }
+
+    public static class BD {
+
+        private final int reserved2;
+        private final int blockSizeValue;
+        private final int reserved3;
+
+        public BD() {
+            this(0, BLOCKSIZE_64KB, 0);
+        }
+
+        public BD(int blockSizeValue) {
+            this(0, blockSizeValue, 0);
+        }
+
+        private BD(int reserved2, int blockSizeValue, int reserved3) {
+            this.reserved2 = reserved2;
+            this.blockSizeValue = blockSizeValue;
+            this.reserved3 = reserved3;
+            validate();
+        }
+
+        public static BD fromByte(byte bd) {
+            int reserved2 = (bd >>> 0) & 15;
+            int blockMaximumSize = (bd >>> 4) & 7;
+            int reserved3 = (bd >>> 7) & 1;
+
+            return new BD(reserved2, blockMaximumSize, reserved3);
+        }
+
+        private void validate() {
+            if (reserved2 != 0) {
+                throw new RuntimeException("Reserved2 field must be 0");
+            }
+            if (blockSizeValue < 4 || blockSizeValue > 7) {
+                throw new RuntimeException("Block size value must be between 4 and 7");
+            }
+            if (reserved3 != 0) {
+                throw new RuntimeException("Reserved3 field must be 0");
+            }
+        }
+
+        // 2^(2n+8)
+        public int getBlockMaximumSize() {
+            return 1 << ((2 * blockSizeValue) + 8);
+        }
+
+        public byte toByte() {
+            return (byte) (((reserved2 & 15) << 0) | ((blockSizeValue & 7) << 4) | ((reserved3 & 1) << 7));
+        }
+    }
+
+}
diff --git a/kafka/src/main/java/org/apache/kafka/common/utils/ByteUtils.java b/kafka/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
new file mode 100644
index 0000000..6efe311
--- /dev/null
+++ b/kafka/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * This class exposes low-level methods for reading/writing from byte streams or buffers.
+ */
+public final class ByteUtils {
+
+    private ByteUtils() {}
+
+    /**
+     * Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes
+     *
+     * @param buffer The buffer to read from
+     * @return The integer read, as a long to avoid signedness
+     */
+    public static long readUnsignedInt(ByteBuffer buffer) {
+        return buffer.getInt() & 0xffffffffL;
+    }
+
+    /**
+     * Read an unsigned integer from the given position without modifying the buffer's position
+     *
+     * @param buffer the buffer to read from
+     * @param index the index from which to read the integer
+     * @return The integer read, as a long to avoid signedness
+     */
+    public static long readUnsignedInt(ByteBuffer buffer, int index) {
+        return buffer.getInt(index) & 0xffffffffL;
+    }
+
+    /**
+     * Read an unsigned integer stored in little-endian format from the {@link InputStream}.
+     *
+     * @param in The stream to read from
+     * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
+     */
+    public static int readUnsignedIntLE(InputStream in) throws IOException {
+        return in.read()
+            | (in.read() << 8)
+            | (in.read() << 16)
+            | (in.read() << 24);
+    }
+
+    /**
+     * Read an unsigned integer stored in little-endian format from a byte array
+     * at a given offset.
+     *
+     * @param buffer The byte array to read from
+     * @param offset The position in buffer to read from
+     * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
+     */
+    public static int readUnsignedIntLE(byte[] buffer, int offset) {
+        return (buffer[offset] << 0 & 0xff)
+            | ((buffer[offset + 1] & 0xff) << 8)
+            | ((buffer[offset + 2] & 0xff) << 16)
+            | ((buffer[offset + 3] & 0xff) << 24);
+    }
+
+    /**
+     * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+     *
+     * @param buffer The buffer to write to
+     * @param index The position in the buffer at which to begin writing
+     * @param value The value to write
+     */
+    public static void writeUnsignedInt(ByteBuffer buffer, int index, long value) {
+        buffer.putInt(index, (int) (value & 0xffffffffL));
+    }
+
+    /**
+     * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+     *
+     * @param buffer The buffer to write to
+     * @param value The value to write
+     */
+    public static void writeUnsignedInt(ByteBuffer buffer, long value) {
+        buffer.putInt((int) (value & 0xffffffffL));
+    }
+
+    /**
+     * Write an unsigned integer in little-endian format to the {@link OutputStream}.
+     *
+     * @param out The stream to write to
+     * @param value The value to write
+     */
+    public static void writeUnsignedIntLE(OutputStream out, int value) throws IOException {
+        out.write(value);
+        out.write(value >>> 8);
+        out.write(value >>> 16);
+        out.write(value >>> 24);
+    }
+
+    /**
+     * Write an unsigned integer in little-endian format to a byte array
+     * at a given offset.
+     *
+     * @param buffer The byte array to write to
+     * @param offset The position in buffer to write to
+     * @param value The value to write
+     */
+    public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
+        buffer[offset] = (byte) value;
+        buffer[offset + 1] = (byte) (value >>> 8);
+        buffer[offset + 2] = (byte) (value >>> 16);
+        buffer[offset + 3] = (byte) (value >>> 24);
+    }
+
+    /**
+     * Read an integer stored in variable-length format using zig-zag decoding from
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">Google Protocol Buffers</a>.
+     *
+     * @param buffer The buffer to read from
+     * @return The integer read
+     *
+     * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
+     */
+    public static int readVarint(ByteBuffer buffer) {
+        int value = 0;
+        int i = 0;
+        int b;
+        while (((b = buffer.get()) & 0x80) != 0) {
+            value |= (b & 0x7f) << i;
+            i += 7;
+            if (i > 28)
+                throw illegalVarintException(value);
+        }
+        value |= b << i;
+        return (value >>> 1) ^ -(value & 1);
+    }
+
+    /**
+     * Read an integer stored in variable-length format using zig-zag decoding from
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">Google Protocol Buffers</a>.
+     *
+     * @param in The input to read from
+     * @return The integer read
+     *
+     * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
+     * @throws IOException if {@link DataInput} throws {@link IOException}
+     */
+    public static int readVarint(DataInput in) throws IOException {
+        int value = 0;
+        int i = 0;
+        int b;
+        while (((b = in.readByte()) & 0x80) != 0) {
+            value |= (b & 0x7f) << i;
+            i += 7;
+            if (i > 28)
+                throw illegalVarintException(value);
+        }
+        value |= b << i;
+        return (value >>> 1) ^ -(value & 1);
+    }
+
+    /**
+     * Read a long stored in variable-length format using zig-zag decoding from
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">Google Protocol Buffers</a>.
+     *
+     * @param in The input to read from
+     * @return The long value read
+     *
+     * @throws IllegalArgumentException if variable-length value does not terminate after 10 bytes have been read
+     * @throws IOException if {@link DataInput} throws {@link IOException}
+     */
+    public static long readVarlong(DataInput in) throws IOException {
+        long value = 0L;
+        int i = 0;
+        long b;
+        while (((b = in.readByte()) & 0x80) != 0) {
+            value |= (b & 0x7f) << i;
+            i += 7;
+            if (i > 63)
+                throw illegalVarlongException(value);
+        }
+        value |= b << i;
+        return (value >>> 1) ^ -(value & 1);
+    }
+
+    /**
+     * Read a long stored in variable-length format using zig-zag decoding from
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">Google Protocol Buffers</a>.
+     *
+     * @param buffer The buffer to read from
+     * @return The long value read
+     *
+     * @throws IllegalArgumentException if variable-length value does not terminate after 10 bytes have been read
+     */
+    public static long readVarlong(ByteBuffer buffer) {
+        long value = 0L;
+        int i = 0;
+        long b;
+        while (((b = buffer.get()) & 0x80) != 0) {
+            value |= (b & 0x7f) << i;
+            i += 7;
+            if (i > 63)
+                throw illegalVarlongException(value);
+        }
+        value |= b << i;
+        return (value >>> 1) ^ -(value & 1);
+    }
+
+    /**
+     * Write the given integer following the variable-length zig-zag encoding from
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">Google Protocol Buffers</a>
+     * into the output.
+     *
+     * @param value The value to write
+     * @param out The output to write to
+     */
+    public static void writeVarint(int value, DataOutput out) throws IOException {
+        int v = (value << 1) ^ (value >> 31);
+        while ((v & 0xffffff80) != 0L) {
+            out.writeByte((v & 0x7f) | 0x80);
+            v >>>= 7;
+        }
+        out.writeByte((byte) v);
+    }
+
+    /**
+     * Write the given integer following the variable-length zig-zag encoding from
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">Google Protocol Buffers</a>
+     * into the buffer.
+     *
+     * @param value The value to write
+     * @param buffer The output to write to
+     */
+    public static void writeVarint(int value, ByteBuffer buffer) {
+        int v = (value << 1) ^ (value >> 31);
+        while ((v & 0xffffff80) != 0L) {
+            byte b = (byte) ((v & 0x7f) | 0x80);
+            buffer.put(b);
+            v >>>= 7;
+        }
+        buffer.put((byte) v);
+    }
+
+    /**
+     * Write the given long following the variable-length zig-zag encoding from
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">Google Protocol Buffers</a>
+     * into the output.
+     *
+     * @param value The value to write
+     * @param out The output to write to
+     */
+    public static void writeVarlong(long value, DataOutput out) throws IOException {
+        long v = (value << 1) ^ (value >> 63);
+        while ((v & 0xffffffffffffff80L) != 0L) {
+            out.writeByte(((int) v & 0x7f) | 0x80);
+            v >>>= 7;
+        }
+        out.writeByte((byte) v);
+    }
+
+    /**
+     * Write the given long following the variable-length zig-zag encoding from
+     * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">Google Protocol Buffers</a>
+     * into the buffer.
+     *
+     * @param value The value to write
+     * @param buffer The buffer to write to
+     */
+    public static void writeVarlong(long value, ByteBuffer buffer) {
+        long v = (value << 1) ^ (value >> 63);
+        while ((v & 0xffffffffffffff80L) != 0L) {
+            byte b = (byte) ((v & 0x7f) | 0x80);
+            buffer.put(b);
+            v >>>= 7;
+        }
+        buffer.put((byte) v);
+    }
+
+    /**
+     * Number of bytes needed to encode an integer in variable-length format.
+     *
+     * @param value The signed value
+     */
+    public static int sizeOfVarint(int value) {
+        int v = (value << 1) ^ (value >> 31);
+        int bytes = 1;
+        while ((v & 0xffffff80) != 0L) {
+            bytes += 1;
+            v >>>= 7;
+        }
+        return bytes;
+    }
+
+    /**
+     * Number of bytes needed to encode a long in variable-length format.
+     *
+     * @param value The signed value
+     */
+    public static int sizeOfVarlong(long value) {
+        long v = (value << 1) ^ (value >> 63);
+        int bytes = 1;
+        while ((v & 0xffffffffffffff80L) != 0L) {
+            bytes += 1;
+            v >>>= 7;
+        }
+        return bytes;
+    }
+
+    private static IllegalArgumentException illegalVarintException(int value) {
+        throw new IllegalArgumentException("Varint is too long, the most significant bit in the 5th byte is set, " +
+                "converted value: " + Integer.toHexString(value));
+    }
+
+    private static IllegalArgumentException illegalVarlongException(long value) {
+        throw new IllegalArgumentException("Varlong is too long, most significant bit in the 10th byte is set, " +
+                "converted value: " + Long.toHexString(value));
+    }
+}
diff --git a/kafka/src/main/scala/spinoco/protocol/kafka/codec/MessageSetCodec.scala b/kafka/src/main/scala/spinoco/protocol/kafka/codec/MessageSetCodec.scala
index d691ac6..8275703 100644
--- a/kafka/src/main/scala/spinoco/protocol/kafka/codec/MessageSetCodec.scala
+++ b/kafka/src/main/scala/spinoco/protocol/kafka/codec/MessageSetCodec.scala
@@ -87,6 +87,7 @@ object MessageSetCodec {
       compressionType match {
         case Compression.GZIP => GZipCompression.inflate(v) flatMap decodeCompressed(Compression.GZIP)
         case Compression.Snappy => SnappyCompression.inflate(v) flatMap decodeCompressed(Compression.Snappy)
+        case Compression.LZ4 if version == MessageVersion.V1 => LZ4Compression.inflate(v) flatMap decodeCompressed(Compression.LZ4)
         case Compression.LZ4 => Attempt.failure(Err("LZ4 Compression not yet supported"))
       }
     }
@@ -117,7 +118,7 @@ object MessageSetCodec {
         cm.compression match {
           case Compression.GZIP => encodeCompressed(cm.messages).flatMap(GZipCompression.deflate)
           case Compression.Snappy => encodeCompressed(cm.messages).flatMap(SnappyCompression.deflate)
-          case Compression.LZ4 => Attempt.failure(Err("LZ4 Compression not yet supported"))
+          case Compression.LZ4 => encodeCompressed(cm.messages).flatMap(LZ4Compression.deflate)
         }
 
       val (timeFlag, time) = mkTime(cm.timeStamp)
diff --git a/kafka/src/main/scala/spinoco/protocol/kafka/codec/compression.scala b/kafka/src/main/scala/spinoco/protocol/kafka/codec/compression.scala
index 68972c9..1f96121 100644
--- a/kafka/src/main/scala/spinoco/protocol/kafka/codec/compression.scala
+++ b/kafka/src/main/scala/spinoco/protocol/kafka/codec/compression.scala
@@ -5,6 +5,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
 import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+import org.apache.kafka.common.record.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}
 import scodec.Attempt
 import scodec.bits.ByteVector
 
@@ -36,6 +37,16 @@
 
 }
 
+object LZ4Compression {
+
+  /** deflates uncompressed bytes **/
+  def deflate(bv:ByteVector):Attempt[ByteVector] =
+    StreamCompression.deflate(bv)(new KafkaLZ4BlockOutputStream(_))
+
+  def inflate(bv:ByteVector):Attempt[ByteVector] =
+    StreamCompression.inflate(bv)(new KafkaLZ4BlockInputStream(_, false))
+
+}
 
 object StreamCompression {
 
diff --git a/kafka/src/test/scala/spinoco/protocol/kafka/codec/MessageSetCodecSpec.scala b/kafka/src/test/scala/spinoco/protocol/kafka/codec/MessageSetCodecSpec.scala
index 712af94..6be0bb9 100644
--- a/kafka/src/test/scala/spinoco/protocol/kafka/codec/MessageSetCodecSpec.scala
+++ b/kafka/src/test/scala/spinoco/protocol/kafka/codec/MessageSetCodecSpec.scala
@@ -3,7 +3,7 @@ package spinoco.protocol.kafka.codec
 
 import java.util.Date
 
-import kafka.message.{ByteBufferMessageSet, GZIPCompressionCodec, MessageAndOffset, SnappyCompressionCodec, Message => KMessage}
+import kafka.message.{ByteBufferMessageSet, GZIPCompressionCodec, MessageAndOffset, SnappyCompressionCodec, LZ4CompressionCodec, Message => KMessage}
 import spinoco.protocol.common.ProtocolSpec
 import scodec.{Attempt, DecodeResult}
 import scodec.bits.{BitVector, ByteVector}
@@ -94,6 +94,15 @@ class MessageSetCodecSpec extends ProtocolSpec {
 
     }
 
+    // "and fails when messages are compressed (LZ4) " in {
+    //   val buff = new ByteBufferMessageSet(LZ4CompressionCodec, kMessages0: _*)
+
+    //   val r = MessageSetCodec.messageSetCodec.decode(ByteVector(buff.getBuffer).bits)
+
+    //   r shouldBe Attempt.failure(Err("0/Message Entry/Message/V0: LZ4 Compression not yet supported"))
+
+    // }
+
   }
 
   "deserializes V1" - {
@@ -153,6 +162,28 @@ class MessageSetCodecSpec extends ProtocolSpec {
 
     }
 
+    "when messages are compressed (LZ4)" in {
+      val buff = new ByteBufferMessageSet(LZ4CompressionCodec, kMessages1: _*)
+
+      val r = MessageSetCodec.messageSetCodec.decode(ByteVector(buff.getBuffer).bits)
+
+      r shouldBe Attempt.successful {
+        DecodeResult(
+          Vector(
+            CompressedMessages(
+              offset = 2
+              , version = MessageVersion.V1
+              , compression = Compression.LZ4
+              , timeStamp = Some(CreateTime(now))
+              , messages = sMessages1
+            )
+          )
+          , BitVector.empty
+        )
+      }
+
+    }
+
   }
 
   "serializes V0" - {
@@ -181,6 +212,14 @@ class MessageSetCodecSpec extends ProtocolSpec {
       kafkaSet.iterator.toVector.map { case MessageAndOffset(m,o) => kafka2Spinoco(m,o) } shouldBe sMessages0
     }
 
+    "when messages are compressed (LZ4)" in {
+      val compressed = CompressedMessages(10,MessageVersion.V0,Compression.LZ4, None, sMessages0)
+      val encoded = MessageSetCodec.messageSetCodec.encode(Vector(compressed))
+      val kafkaSet = new ByteBufferMessageSet(encoded.map(_.toByteBuffer).getOrElse(fail("Failed to encode")))
+
+      kafkaSet.iterator.toVector.map { case MessageAndOffset(m,o) => kafka2Spinoco(m,o) } shouldBe sMessages0
+    }
+
   }
 
 
@@ -211,6 +250,14 @@ class MessageSetCodecSpec extends ProtocolSpec {
       kafkaSet.iterator.toVector.map { case MessageAndOffset(m,o) => kafka2Spinoco(m,o-8) } shouldBe sMessages1
     }
 
+    "when messages are compressed (LZ4)" in {
+      val compressed = CompressedMessages(10,MessageVersion.V1,Compression.LZ4, Some(CreateTime(now)), sMessages1)
+      val encoded = MessageSetCodec.messageSetCodec.encode(Vector(compressed))
+      val kafkaSet = new ByteBufferMessageSet(encoded.map(_.toByteBuffer).getOrElse(fail("Failed to encode")))
+
+      kafkaSet.iterator.toVector.map { case MessageAndOffset(m,o) => kafka2Spinoco(m,o-8) } shouldBe sMessages1
+    }
+
   }
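
Reviewer note, not part of the patch: a minimal sketch of how the two new stream classes round-trip bytes, assuming only the classes added above and the lz4-java dependency from build.sbt; the object name and payload are illustrative.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import org.apache.kafka.common.record.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}

    object Lz4FrameRoundTrip extends App {
      val raw = "hello, lz4 frame".getBytes("UTF-8")

      // compress: 64 KB blocks (blockSize = 4), no per-block checksum,
      // and a correct (non-broken) frame descriptor checksum
      val sink = new ByteArrayOutputStream()
      val lz4Out = new KafkaLZ4BlockOutputStream(sink, KafkaLZ4BlockOutputStream.BLOCKSIZE_64KB, false, false)
      lz4Out.write(raw)
      lz4Out.close() // flushes the last block and writes the end mark

      // decompress: `false` means the frame descriptor (HC) checksum is verified
      val lz4In = new KafkaLZ4BlockInputStream(new ByteArrayInputStream(sink.toByteArray), false)
      val restored = Iterator.continually(lz4In.read()).takeWhile(_ != -1).map(_.toByte).toArray

      assert(java.util.Arrays.equals(raw, restored))
    }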
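
In the same spirit, a sketch of the Scala entry points that MessageSetCodec now calls; it assumes only the `LZ4Compression` object added above and scodec's `ByteVector`/`Attempt`, which the module already depends on.

    import scodec.bits.ByteVector
    import spinoco.protocol.kafka.codec.LZ4Compression

    object Lz4CodecRoundTrip extends App {
      val payload = ByteVector.view("protocol-kafka lz4".getBytes("UTF-8"))

      // deflate produces an LZ4 frame, inflate parses it back; both return scodec's Attempt
      val roundTrip = LZ4Compression.deflate(payload).flatMap(LZ4Compression.inflate)

      assert(roundTrip.toOption.contains(payload))
    }

Passing `false` for `ignoreFlagDescriptorChecksum` in `LZ4Compression.inflate` matches the decode path above: only message-format V1 LZ4 sets are decoded, and those are written with the correct frame descriptor checksum, while V0 sets (broken checksum) still fail with "LZ4 Compression not yet supported".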