Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INTERNAL: Refactor transcoder compress logic. #691

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,33 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import net.spy.memcached.CachedData;
import net.spy.memcached.compat.CloseUtil;
import net.spy.memcached.compat.SpyObject;

/**
* Base class for any transcoders that may want to work with serialized or
* compressed data.
*/
public abstract class BaseSerializingTranscoder extends SpyObject {

/**
* Default compression threshold value.
*/
public static final int DEFAULT_COMPRESSION_THRESHOLD = 16384;

private static final String DEFAULT_CHARSET = "UTF-8";

protected int compressionThreshold = DEFAULT_COMPRESSION_THRESHOLD;
protected String charset = DEFAULT_CHARSET;

private final Compressor compressor;
private final int maxSize;

/**
* Initialize a serializing transcoder with the given maximum data size.
*/
public BaseSerializingTranscoder(int max) {
this(max, new GzipCompressor());
}

/**
* Initialize a serializing transcoder with the given maximum data size and compressor.
*/
public BaseSerializingTranscoder(int max, Compressor compressor) {
super();
this.compressor = compressor;
brido4125 marked this conversation as resolved.
Show resolved Hide resolved
maxSize = max;
}

Expand All @@ -67,7 +64,9 @@ public boolean asyncDecode(CachedData d) {
* @param to the number of bytes
*/
public void setCompressionThreshold(int to) {
compressionThreshold = to;
if (compressor != null) {
compressor.setCompressionThreshold(to);
}
}

/**
Expand All @@ -83,6 +82,35 @@ public void setCharset(String to) {
charset = to;
}

protected CachedData doCompress(byte[] before, int flags, Class<?> type) {
if (compressor == null) {
return new CachedData(flags, before, getMaxSize());
}
if (before.length > compressor.getCompressionThreshold()) {
byte[] compressed = compressor.compress(before);
if (compressed.length < before.length) {
getLogger().debug("Compressed %s from %d to %d", type.getName(),
before.length, compressed.length);
before = compressed;
flags |= SerializingTranscoder.COMPRESSED;
} else if (compressed.length > before.length) {
getLogger().info("Compression increased the size of %s from %d to %d",
type.getName(), before.length, compressed.length);
} else {
getLogger().info("Compression makes same length of %s : %d",
type.getName(), before.length);
}
}
return new CachedData(flags, before, getMaxSize());
}

protected byte[] doDecompress(CachedData cachedData) {
if (compressor == null) {
return cachedData.getData();
}
return compressor.decompress(cachedData.getData());
}

/**
* Get the bytes representing the given serialized object.
*/
Expand Down Expand Up @@ -127,56 +155,6 @@ protected Object deserialize(byte[] in) {
return rv;
}

/**
* Compress the given array of bytes.
*/
protected byte[] compress(byte[] in) {
if (in == null) {
throw new NullPointerException("Can't compress null");
}
ByteArrayOutputStream bos = new ByteArrayOutputStream();
GZIPOutputStream gz = null;
try {
gz = new GZIPOutputStream(bos);
gz.write(in);
} catch (IOException e) {
throw new RuntimeException("IO exception compressing data", e);
} finally {
CloseUtil.close(gz);
CloseUtil.close(bos);
}
byte[] rv = bos.toByteArray();
getLogger().debug("Compressed %d bytes to %d", in.length, rv.length);
return rv;
}

/**
* Decompress the given array of bytes.
*
* @return null if the bytes cannot be decompressed
*/
protected byte[] decompress(byte[] in) {
ByteArrayOutputStream bos = null;
if (in != null) {
ByteArrayInputStream bis = new ByteArrayInputStream(in);
bos = new ByteArrayOutputStream();
GZIPInputStream gis;
try {
gis = new GZIPInputStream(bis);

byte[] buf = new byte[8192];
int r = -1;
while ((r = gis.read(buf)) > 0) {
bos.write(buf, 0, r);
}
} catch (IOException e) {
getLogger().warn("Failed to decompress data", e);
bos = null;
}
}
return bos == null ? null : bos.toByteArray();
}

/**
* Decode the string with the current character set.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public CollectionTranscoder() {
this(MAX_ELEMENT_BYTES);
}

public CollectionTranscoder(Compressor compressor) {
super(CachedData.MAX_SIZE, compressor);
}

/**
* Get a serializing transcoder that specifies the max data size.
*/
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/net/spy/memcached/transcoders/Compressor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package net.spy.memcached.transcoders;

public interface Compressor {
/**
* Default compression threshold value.
*/
int DEFAULT_COMPRESSION_THRESHOLD = 16384;

/**
* Compress the given array of bytes.
*/
public byte[] compress(byte[] in);

/**
* Decompress the given array of bytes.
*
* @return null if the bytes cannot be decompressed
*/
public byte[] decompress(byte[] in);

public int getCompressionThreshold();

public void setCompressionThreshold(int to);
}
74 changes: 74 additions & 0 deletions src/main/java/net/spy/memcached/transcoders/GzipCompressor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package net.spy.memcached.transcoders;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import net.spy.memcached.compat.CloseUtil;
import net.spy.memcached.compat.SpyObject;

public class GzipCompressor extends SpyObject implements Compressor {
private int compressionThreshold;

public GzipCompressor() {
this.compressionThreshold = DEFAULT_COMPRESSION_THRESHOLD;
}

public GzipCompressor(int compressionThreshold) {
this.compressionThreshold = compressionThreshold;
}

@Override
public byte[] compress(byte[] in) {
if (in == null) {
throw new NullPointerException("Can't compress null");
}
ByteArrayOutputStream bos = new ByteArrayOutputStream();
GZIPOutputStream gz = null;
try {
gz = new GZIPOutputStream(bos);
gz.write(in);
} catch (IOException e) {
throw new RuntimeException("IO exception compressing data", e);
} finally {
CloseUtil.close(gz);
CloseUtil.close(bos);
}
return bos.toByteArray();
}

@Override
public byte[] decompress(byte[] in) {
ByteArrayOutputStream bos = null;
if (in != null) {
ByteArrayInputStream bis = new ByteArrayInputStream(in);
bos = new ByteArrayOutputStream();
GZIPInputStream gis;
try {
gis = new GZIPInputStream(bis);
byte[] buf = new byte[8192];
int r = -1;
while ((r = gis.read(buf)) > 0) {
bos.write(buf, 0, r);
}
} catch (IOException e) {
getLogger().warn("Failed to decompress data", e);
bos = null;
}
}
return bos == null ? null : bos.toByteArray();
}

@Override
public int getCompressionThreshold() {
return compressionThreshold;
}

@Override
public void setCompressionThreshold(int compressionThreshold) {
this.compressionThreshold = compressionThreshold;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public InspectObjectSizeTranscoder(LoggingObjectSize objSizeLogger) {
this.objSizeLogger = objSizeLogger;
}

public InspectObjectSizeTranscoder(LoggingObjectSize objSizeLogger, Compressor compressor) {
super(compressor);
this.objSizeLogger = objSizeLogger;
}

@Override
public CachedData encode(Object o) {
CachedData encoded = super.encode(o);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ public SerializingTranscoder(int max) {
super(max);
}

/**
* Get a serializing transcoder customized compressor.
*/
public SerializingTranscoder(Compressor compressor) {
brido4125 marked this conversation as resolved.
Show resolved Hide resolved
super(CachedData.MAX_SIZE, compressor);
}

/**
* Get a serializing transcoder that specifies the max data size and customized compressor.
*/
public SerializingTranscoder(int max, Compressor compressor) {
super(max, compressor);
}

@Override
public boolean asyncDecode(CachedData d) {
if ((d.getFlags() & COMPRESSED) != 0
Expand All @@ -72,7 +86,7 @@ public Object decode(CachedData d) {
byte[] data = d.getData();
Object rv = null;
if ((d.getFlags() & COMPRESSED) != 0) {
data = decompress(d.getData());
data = doDecompress(d);
}
int flags = d.getFlags() & SPECIAL_MASK;
if ((d.getFlags() & SERIALIZED) != 0 && data != null) {
Expand Down Expand Up @@ -146,20 +160,6 @@ public CachedData encode(Object o) {
flags |= SERIALIZED;
}
assert b != null;
if (b.length > compressionThreshold) {
byte[] compressed = compress(b);
if (compressed.length < b.length) {
getLogger().debug("Compressed %s from %d to %d",
brido4125 marked this conversation as resolved.
Show resolved Hide resolved
o.getClass().getName(), b.length, compressed.length);
b = compressed;
flags |= COMPRESSED;
} else {
getLogger().info(
"Compression increased the size of %s from %d to %d",
o.getClass().getName(), b.length, compressed.length);
}
}
return new CachedData(flags, b, getMaxSize());
return doCompress(b, flags, o.getClass());
}

}
21 changes: 6 additions & 15 deletions src/main/java/net/spy/memcached/transcoders/WhalinTranscoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,15 @@ public WhalinTranscoder() {
super(CachedData.MAX_SIZE);
}

public WhalinTranscoder(Compressor compressor) {
super(CachedData.MAX_SIZE, compressor);
}

public Object decode(CachedData d) {
byte[] data = d.getData();
Object rv = null;
if ((d.getFlags() & COMPRESSED) != 0) {
data = decompress(d.getData());
data = doDecompress(d);
}
if ((d.getFlags() & SERIALIZED) != 0) {
rv = deserialize(data);
Expand Down Expand Up @@ -152,20 +156,7 @@ public CachedData encode(Object o) {
flags |= SERIALIZED;
}
assert b != null;
if (b.length > compressionThreshold) {
byte[] compressed = compress(b);
if (compressed.length < b.length) {
getLogger().debug("Compressed %s from %d to %d",
o.getClass().getName(), b.length, compressed.length);
b = compressed;
flags |= COMPRESSED;
} else {
getLogger().info(
"Compression increased the size of %s from %d to %d",
o.getClass().getName(), b.length, compressed.length);
}
}
return new CachedData(flags, b, getMaxSize());
return doCompress(b, flags, o.getClass());
}

protected Character decodeCharacter(byte[] b) {
Expand Down
Loading
Loading