Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZOOKEEPER-4306: Avoid CloseSessionTxns larger than jute.maxbuffer, branch-3.9 #2201

Open
wants to merge 11 commits into
base: branch-3.9
Choose a base branch
from
Open
3 changes: 2 additions & 1 deletion zookeeper-client/zookeeper-client-c/include/zookeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ enum ZOO_ERRORS {
ZNOWATCHER = -121, /*!< The watcher couldn't be found */
ZRECONFIGDISABLED = -123, /*!< Attempts to perform a reconfiguration operation when reconfiguration feature is disabled */
ZSESSIONCLOSEDREQUIRESASLAUTH = -124, /*!< The session has been closed by server because server requires client to do authentication via configured authentication scheme at server, but client is not configured with required authentication scheme or configured but failed (i.e. wrong credential used.). */
ZTHROTTLEDOP = -127 /*!< Operation was throttled and not executed at all. please, retry! */
ZTHROTTLEDOP = -127, /*!< Operation was throttled and not executed at all. please, retry! */
ZTOOMANYEPHEMERALS = -128 /*!< Adding an ephemeral with the requested path could overflow transaction size */

/* when adding/changing values here also update zerror(int) to return correct error message */
};
Expand Down
2 changes: 2 additions & 0 deletions zookeeper-client/zookeeper-client-c/src/zookeeper.c
Original file line number Diff line number Diff line change
Expand Up @@ -5051,6 +5051,8 @@ const char* zerror(int c)
return "session closed by server because client is required to do SASL authentication";
case ZTHROTTLEDOP:
return "Operation was throttled due to high load";
case ZTOOMANYEPHEMERALS:
return "Adding an ephemeral with the requested path could overflow transaction size";
}
if (c > 0) {
return strerror(c);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,16 @@ public void writeDouble(double d, String tag) throws IOException {
}

/**
* create our own char encoder to utf8. This is faster
* then string.getbytes(UTF8).
* Encodes the characters of {@code s} into an UTF-8-like byte
* sequence. This method reuses a local {@code ByteBuffer} and
* should seldom allocate.
*
* @param s the string to encode into utf8
* @return utf8 byte sequence.
* <p>Note that this is not a full-blown UTF-8 implementation;
* notably, it does not decode UTF-16 surrogate pairs, and rather
* encodes each {@code char} individually.
*
* @param s the string to encode
* @return the resulting byte sequence
*/
private ByteBuffer stringToByteBuffer(CharSequence s) {
bb.clear();
Expand Down Expand Up @@ -110,6 +115,35 @@ private ByteBuffer stringToByteBuffer(CharSequence s) {
return bb;
}

/**
* Computes the exact payload size of a character sequence as
* encoded by the {@link #stringToByteBuffer(CharSequence)
* stringToByteBuffer} method, without any "length descriptor".
*
* <p>Note that the algorithm used by {@code stringToByteBuffer}
* does not match {@code StandardCharsets.UTF_8}; this method
* "emulates" the former.
*
* @param s the string to encode
* @return the serialized payload size in bytes
* @throws ArithmeticException if the result overflows an int
*/
private static int serializedStringPayloadSize(CharSequence s) {
int size = 0;
final int len = s.length();
for (int i = 0; i < len; i++) {
char c = s.charAt(i);
if (c < 0x80) {
size = Math.addExact(size, 1);
} else if (c < 0x800) {
size = Math.addExact(size, 2);
} else {
size = Math.addExact(size, 3);
}
}
return size;
}

public void writeString(String s, String tag) throws IOException {
if (s == null) {
writeInt(-1, "len");
Expand All @@ -122,6 +156,22 @@ public void writeString(String s, String tag) throws IOException {
dataSize += strLen;
}

/**
* Computes the exact serialization size of a string.
*
* @param s the string to encode, potentially {@code null}
* @return the serialization size in bytes
* @throws ArithmeticException if the result overflows an int
*
* @see #serializedStringPayloadSize(CharSequence)
*/
public static int serializedStringSize(String s) {
int payloadSize = s == null ? 0 : serializedStringPayloadSize(s);

// length descriptor + payload.
return Math.addExact(4, payloadSize);
}

public void writeBuffer(byte[] barr, String tag)
throws IOException {
if (barr == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ public static KeeperException create(Code code) {
return new QuotaExceededException();
case THROTTLEDOP:
return new ThrottledOpException();
case TOOMANYEPHEMERALS:
return new TooManyEphemeralsException();
case OK:
default:
throw new IllegalArgumentException("Invalid exception code:" + code.code);
Expand Down Expand Up @@ -415,7 +417,9 @@ public enum Code implements CodeDeprecated {
/** Operation was throttled and not executed at all. This error code indicates that zookeeper server
* is under heavy load and can't process incoming requests at full speed; please retry with back off.
*/
THROTTLEDOP (-127);
THROTTLEDOP (-127),
/** Adding an ephemeral with the requested path could overflow transaction size. */
TOOMANYEPHEMERALS(-128);

private static final Map<Integer, Code> lookup = new HashMap<>();

Expand Down Expand Up @@ -514,6 +518,8 @@ static String getCodeMessage(Code code) {
return "Quota has exceeded";
case THROTTLEDOP:
return "Op throttled due to high load";
case TOOMANYEPHEMERALS:
return "Adding an ephemeral with the requested path could overflow transaction size";
default:
return "Unknown error " + code;
}
Expand Down Expand Up @@ -980,4 +986,17 @@ public ThrottledOpException() {
super(Code.THROTTLEDOP);
}
}

/**
* @see Code#TOOMANYEPHEMERALS
*/
@InterfaceAudience.Public
public static class TooManyEphemeralsException extends KeeperException {
public TooManyEphemeralsException() {
super(Code.TOOMANYEPHEMERALS);
}
public TooManyEphemeralsException(String path) {
super(Code.TOOMANYEPHEMERALS, path);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ private static String getMessage(Throwable cause) {
+ "new servers are connected and synced";
} else if (keeperException instanceof KeeperException.QuotaExceededException) {
return "Quota has exceeded : " + keeperException.getPath();
} else if (keeperException instanceof KeeperException.TooManyEphemeralsException) {
return "Adding ephemeral could overflow transaction size : " + keeperException.getPath();
}
}
return cause.getMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.zookeeper.common;

import org.apache.jute.BinaryOutputArchive;

/**
* Path related utilities
*/
Expand Down Expand Up @@ -123,4 +125,16 @@ public static String getTopNamespace(final String path) {
final String[] parts = path.split("/");
return parts.length > 1 ? parts[1] : null;
}

/**
* Computes the byte size of a path {@code path} as serialized
* into a transaction.
*
* @param path the path
* @return the size in bytes
* @throws ArithmeticException if the result overflows an int
*/
public static int serializedSize(String path) {
return BinaryOutputArchive.serializedStringSize(path);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public class DataTree {
/**
* This hashtable lists the paths of the ephemeral nodes of a session.
*/
private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<>();
private final Map<Long, OwnedEphemerals> ephemerals = new ConcurrentHashMap<>();

/**
* This set contains the paths of all container nodes
Expand Down Expand Up @@ -189,15 +189,20 @@ public class DataTree {

private final DigestCalculator digestCalculator;

@SuppressWarnings("unchecked")
public Set<String> getEphemerals(long sessionId) {
HashSet<String> ret = ephemerals.get(sessionId);
if (ret == null) {
OwnedEphemerals ownedEphemerals = ephemerals.get(sessionId);
if (ownedEphemerals == null) {
return new HashSet<>();
}
synchronized (ret) {
return (HashSet<String>) ret.clone();
return ownedEphemerals.clonePaths();
}

public int getEphemeralsSerializedSize(long sessionId) {
OwnedEphemerals ownedEphemerals = ephemerals.get(sessionId);
if (ownedEphemerals == null) {
return OwnedEphemerals.MIN_SERIALIZED_SIZE;
}
return ownedEphemerals.getSerializedSize();
}

public Set<String> getContainers() {
Expand Down Expand Up @@ -226,8 +231,8 @@ public int getWatchCount() {

public int getEphemeralsCount() {
int result = 0;
for (HashSet<String> set : ephemerals.values()) {
result += set.size();
for (OwnedEphemerals ownedEphemerals : ephemerals.values()) {
result += ownedEphemerals.count();
}
return result;
}
Expand Down Expand Up @@ -492,10 +497,8 @@ public void createNode(final String path, byte[] data, List<ACL> acl, long ephem
} else if (ephemeralType == EphemeralType.TTL) {
ttls.add(path);
} else if (ephemeralOwner != 0) {
HashSet<String> list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>());
synchronized (list) {
list.add(path);
}
OwnedEphemerals ownedEphemerals = ephemerals.computeIfAbsent(ephemeralOwner, k -> new OwnedEphemerals());
ownedEphemerals.add(path);
}
if (outputStat != null) {
child.copyStat(outputStat);
Expand Down Expand Up @@ -584,11 +587,9 @@ public void deleteNode(String path, long zxid) throws NoNodeException {
} else if (ephemeralType == EphemeralType.TTL) {
ttls.remove(path);
} else if (owner != 0) {
Set<String> nodes = ephemerals.get(owner);
if (nodes != null) {
synchronized (nodes) {
nodes.remove(path);
}
OwnedEphemerals ownedEphemerals = ephemerals.get(owner);
if (ownedEphemerals != null) {
ownedEphemerals.remove(path);
}
}
}
Expand Down Expand Up @@ -951,8 +952,9 @@ public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTx
case OpCode.closeSession:
long sessionId = header.getClientId();
if (txn != null) {
OwnedEphemerals ownedEphemerals = ephemerals.remove(sessionId);
killSession(sessionId, header.getZxid(),
ephemerals.remove(sessionId),
ownedEphemerals != null ? ownedEphemerals.clonePaths() : null,
((CloseSessionTxn) txn).getPaths2Delete());
} else {
killSession(sessionId, header.getZxid());
Expand Down Expand Up @@ -1130,7 +1132,10 @@ void killSession(long session, long zxid) {
// so there is no need for synchronization. The list is not
// changed here. Only create and delete change the list which
// are again called from FinalRequestProcessor in sequence.
killSession(session, zxid, ephemerals.remove(session), null);
OwnedEphemerals ownedEphemerals = ephemerals.remove(session);
killSession(session, zxid,
ownedEphemerals != null ? ownedEphemerals.clonePaths() : null,
null);
}

void killSession(long session, long zxid, Set<String> paths2DeleteLocal,
Expand Down Expand Up @@ -1378,8 +1383,8 @@ public void deserialize(InputArchive ia, String tag) throws IOException {
} else if (ephemeralType == EphemeralType.TTL) {
ttls.add(path);
} else if (owner != 0) {
HashSet<String> list = ephemerals.computeIfAbsent(owner, k -> new HashSet<>());
list.add(path);
OwnedEphemerals ownedEphemerals = ephemerals.computeIfAbsent(owner, k -> new OwnedEphemerals());
ownedEphemerals.add(path);
}
}
path = ia.readString("path");
Expand Down Expand Up @@ -1451,13 +1456,13 @@ public synchronized WatchesSummary getWatchesSummary() {
*/
public void dumpEphemerals(PrintWriter writer) {
writer.println("Sessions with Ephemerals (" + ephemerals.keySet().size() + "):");
for (Entry<Long, HashSet<String>> entry : ephemerals.entrySet()) {
for (Entry<Long, OwnedEphemerals> entry : ephemerals.entrySet()) {
writer.print("0x" + Long.toHexString(entry.getKey()));
writer.println(":");
Set<String> tmp = entry.getValue();
OwnedEphemerals tmp = entry.getValue();
if (tmp != null) {
synchronized (tmp) {
for (String path : tmp) {
for (String path : tmp.clonePaths()) {
writer.println("\t" + path);
}
}
Expand All @@ -1477,10 +1482,8 @@ public void shutdownWatcher() {
*/
public Map<Long, Set<String>> getEphemerals() {
Map<Long, Set<String>> ephemeralsCopy = new HashMap<>();
for (Entry<Long, HashSet<String>> e : ephemerals.entrySet()) {
synchronized (e.getValue()) {
ephemeralsCopy.put(e.getKey(), new HashSet<>(e.getValue()));
}
for (Entry<Long, OwnedEphemerals> e : ephemerals.entrySet()) {
ephemeralsCopy.put(e.getKey(), e.getValue().clonePaths());
}
return ephemeralsCopy;
}
Expand Down Expand Up @@ -1953,6 +1956,59 @@ public long getDigest() {

}

/**
* Holds information about the ephemeral paths associated with a
* session. Currently just a simple wrapper around {@code
* HashSet<String>}.
*/
private static class OwnedEphemerals {
// Serialization starts with a vector length descriptor.
public static final int MIN_SERIALIZED_SIZE = 4;

private HashSet<String> paths = new HashSet<>();

private int serializedSize = MIN_SERIALIZED_SIZE;

@SuppressWarnings("unchecked")
public synchronized Set<String> clonePaths() {
return (Set<String>) paths.clone();
}

public synchronized int count() {
return paths.size();
}

public boolean add(String path) {
int pathSerSize = PathUtils.serializedSize(path);

synchronized (this) {
int newSerSize = Math.addExact(serializedSize, pathSerSize);
boolean result = paths.add(path);
if (result) {
serializedSize = newSerSize;
}
return result;
}
}

public boolean remove(String path) {
int pathSerSize = PathUtils.serializedSize(path);

synchronized (this) {
int newSerSize = Math.subtractExact(serializedSize, pathSerSize);
boolean result = paths.remove(path);
if (result) {
serializedSize = newSerSize;
}
return result;
}
}

public synchronized int getSerializedSize() {
return serializedSize;
}
};

/**
* Create a node stat from the given params.
*
Expand Down
Loading
Loading