From 9ce13972c1f696943163a78d85dbdbac4492fcfc Mon Sep 17 00:00:00 2001 From: Damien Diederen <ddiederen@apache.org> Date: Thu, 17 Jun 2021 15:55:18 +0200 Subject: [PATCH 01/11] ZOOKEEPER-4306: BinaryOutputArchive: Add note about UTF-8-like encoding --- .../java/org/apache/jute/BinaryOutputArchive.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java b/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java index 52db1bc3c45..d5edecb90d4 100644 --- a/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java +++ b/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java @@ -78,11 +78,16 @@ public void writeDouble(double d, String tag) throws IOException { } /** - * create our own char encoder to utf8. This is faster - * then string.getbytes(UTF8). + * Encodes the characters of {@code s} into an UTF-8-like byte + * sequence. This method reuses a local {@code ByteBuffer} and + * should seldom allocate. * - * @param s the string to encode into utf8 - * @return utf8 byte sequence. + * <p>Note that this is not a full-blown UTF-8 implementation; + * notably, it does not decode UTF-16 surrogate pairs, and rather + * encodes each {@code char} individually. + * + * @param s the string to encode + * @return the resulting byte sequence */ private ByteBuffer stringToByteBuffer(CharSequence s) { bb.clear(); From 9b9f41f941cfe422ea569a4d1c6eb99fe392e0ba Mon Sep 17 00:00:00 2001 From: Damien Diederen <ddiederen@apache.org> Date: Thu, 17 Jun 2021 11:19:59 +0200 Subject: [PATCH 02/11] ZOOKEEPER-4306: BinaryOutputArchive: Add serialized String measurement API --- .../org/apache/jute/BinaryOutputArchive.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java b/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java index d5edecb90d4..e46013f337c 100644 --- a/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java +++ b/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java @@ -115,6 +115,35 @@ private ByteBuffer stringToByteBuffer(CharSequence s) { return bb; } + /** + * Computes the exact payload size of a character sequence as + * encoded by the {@link #stringToByteBuffer(CharSequence) + * stringToByteBuffer} method, without any "length descriptor". + * + * <p>Note that the algorithm used by {@code stringToByteBuffer} + * does not match {@code StandardCharsets.UTF_8}; this method + * "emulates" the former. + * + * @param s the string to encode + * @return the serialized payload size in bytes + * @throws ArithmeticException if the result overflows an int + */ + private static int serializedStringPayloadSize(CharSequence s) { + int size = 0; + final int len = s.length(); + for (int i = 0; i < len; i++) { + char c = s.charAt(i); + if (c < 0x80) { + size = Math.addExact(size, 1); + } else if (c < 0x800) { + size = Math.addExact(size, 2); + } else { + size = Math.addExact(size, 3); + } + } + return size; + } + public void writeString(String s, String tag) throws IOException { if (s == null) { writeInt(-1, "len"); @@ -127,6 +156,22 @@ public void writeString(String s, String tag) throws IOException { dataSize += strLen; } + /** + * Computes the exact serialization size of a string. + * + * @param s the string to encode, potentially {@code null} + * @return the serialization size in bytes + * @throws ArithmeticException if the result overflows an int + * + * @see #serializedStringPayloadSize(CharSequence) + */ + public static int serializedStringSize(String s) { + int payloadSize = s == null ? 0 : serializedStringPayloadSize(s); + + // length descriptor + payload. + return Math.addExact(4, payloadSize); + } + public void writeBuffer(byte[] barr, String tag) throws IOException { if (barr == null) { From 7711816f738495687b61476ad7378f93c4b378be Mon Sep 17 00:00:00 2001 From: Damien Diederen <ddiederen@apache.org> Date: Thu, 17 Jun 2021 15:40:34 +0200 Subject: [PATCH 03/11] ZOOKEEPER-4306: Test serialized String measurement --- .../server/util/SerializeUtilsTest.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/SerializeUtilsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/SerializeUtilsTest.java index 4e2421d13de..54b113d05bd 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/SerializeUtilsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/SerializeUtilsTest.java @@ -19,6 +19,7 @@ package org.apache.zookeeper.server.util; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; @@ -30,6 +31,9 @@ import static org.mockito.Mockito.verify; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.OutputArchive; import org.apache.jute.Record; @@ -118,4 +122,31 @@ public Object answer(InvocationOnMock invocation) throws Throwable { assertArrayEquals(baos.toByteArray(), data); } + private void testSerializeStringSizeWith(String s, int expectedLength) throws IOException { + assertEquals(expectedLength, BinaryOutputArchive.serializedStringSize(s)); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); + boa.writeString(s, "test"); + baos.close(); + + assertEquals(expectedLength, baos.size()); + } + + @Test + public void testSerializeStringSize() throws IOException { + testSerializeStringSizeWith("", 4); + testSerializeStringSizeWith("test", 8); + + byte[] bytes = new byte[BinaryInputArchive.maxBuffer - 4]; + Arrays.fill(bytes, (byte) 'x'); + testSerializeStringSizeWith(new String(bytes, StandardCharsets.US_ASCII), BinaryInputArchive.maxBuffer); + + testSerializeStringSizeWith("-\u00f4-", 8); + testSerializeStringSizeWith("-\u0939-", 9); + + // Note: 12, not 10, because BinaryOutputArchive's + // stringToByteBuffer encodes each 'char' individually. + testSerializeStringSizeWith("-\ud800\udf48-", 12); + } } From 8edaf1e928e7d90943b95bf8653798ef81379c81 Mon Sep 17 00:00:00 2001 From: Damien Diederen <ddiederen@apache.org> Date: Thu, 17 Jun 2021 11:35:02 +0200 Subject: [PATCH 04/11] ZOOKEEPER-4306: PathUtils: Add serialized path measurement API --- .../org/apache/zookeeper/common/PathUtils.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/PathUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/PathUtils.java index 7ddc5576db6..c8a1ced60d3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/common/PathUtils.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/PathUtils.java @@ -18,6 +18,8 @@ package org.apache.zookeeper.common; +import org.apache.jute.BinaryOutputArchive; + /** * Path related utilities */ @@ -123,4 +125,16 @@ public static String getTopNamespace(final String path) { final String[] parts = path.split("/"); return parts.length > 1 ? parts[1] : null; } + + /** + * Computes the byte size of a path {@code path} as serialized + * into a transaction. + * + * @param path the path + * @return the size in bytes + * @throws ArithmeticException if the result overflows an int + */ + public static int serializedSize(String path) { + return BinaryOutputArchive.serializedStringSize(path); + } } From 63bdded927432c9751b423fff37b0da9e54d37c8 Mon Sep 17 00:00:00 2001 From: Damien Diederen <ddiederen@apache.org> Date: Thu, 10 Jun 2021 15:27:42 +0200 Subject: [PATCH 05/11] ZOOKEEPER-4306: DataTree: Wrap per-session ephemeral path sets This commit does not introduce anything new; it simply refactors the ephemeral path management logic in preparation for storing additional bookkeeping information. --- .../org/apache/zookeeper/server/DataTree.java | 79 ++++++++++++------- 1 file changed, 50 insertions(+), 29 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index 3b61c80d822..0090faf81a8 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -151,7 +151,7 @@ public class DataTree { /** * This hashtable lists the paths of the ephemeral nodes of a session. */ - private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<>(); + private final Map<Long, OwnedEphemerals> ephemerals = new ConcurrentHashMap<>(); /** * This set contains the paths of all container nodes @@ -189,15 +189,12 @@ public class DataTree { private final DigestCalculator digestCalculator; - @SuppressWarnings("unchecked") public Set<String> getEphemerals(long sessionId) { - HashSet<String> ret = ephemerals.get(sessionId); - if (ret == null) { + OwnedEphemerals ownedEphemerals = ephemerals.get(sessionId); + if (ownedEphemerals == null) { return new HashSet<>(); } - synchronized (ret) { - return (HashSet<String>) ret.clone(); - } + return ownedEphemerals.clonePaths(); } public Set<String> getContainers() { @@ -226,8 +223,8 @@ public int getWatchCount() { public int getEphemeralsCount() { int result = 0; - for (HashSet<String> set : ephemerals.values()) { - result += set.size(); + for (OwnedEphemerals ownedEphemerals : ephemerals.values()) { + result += ownedEphemerals.count(); } return result; } @@ -492,10 +489,8 @@ public void createNode(final String path, byte[] data, List<ACL> acl, long ephem } else if (ephemeralType == EphemeralType.TTL) { ttls.add(path); } else if (ephemeralOwner != 0) { - HashSet<String> list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>()); - synchronized (list) { - list.add(path); - } + OwnedEphemerals ownedEphemerals = ephemerals.computeIfAbsent(ephemeralOwner, k -> new OwnedEphemerals()); + ownedEphemerals.add(path); } if (outputStat != null) { child.copyStat(outputStat); @@ -584,11 +579,9 @@ public void deleteNode(String path, long zxid) throws NoNodeException { } else if (ephemeralType == EphemeralType.TTL) { ttls.remove(path); } else if (owner != 0) { - Set<String> nodes = ephemerals.get(owner); - if (nodes != null) { - synchronized (nodes) { - nodes.remove(path); - } + OwnedEphemerals ownedEphemerals = ephemerals.get(owner); + if (ownedEphemerals != null) { + ownedEphemerals.remove(path); } } } @@ -951,8 +944,9 @@ public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTx case OpCode.closeSession: long sessionId = header.getClientId(); if (txn != null) { + OwnedEphemerals ownedEphemerals = ephemerals.remove(sessionId); killSession(sessionId, header.getZxid(), - ephemerals.remove(sessionId), + ownedEphemerals != null ? ownedEphemerals.clonePaths() : null, ((CloseSessionTxn) txn).getPaths2Delete()); } else { killSession(sessionId, header.getZxid()); @@ -1130,7 +1124,10 @@ void killSession(long session, long zxid) { // so there is no need for synchronization. The list is not // changed here. Only create and delete change the list which // are again called from FinalRequestProcessor in sequence. - killSession(session, zxid, ephemerals.remove(session), null); + OwnedEphemerals ownedEphemerals = ephemerals.remove(session); + killSession(session, zxid, + ownedEphemerals != null ? ownedEphemerals.clonePaths() : null, + null); } void killSession(long session, long zxid, Set<String> paths2DeleteLocal, @@ -1378,8 +1375,8 @@ public void deserialize(InputArchive ia, String tag) throws IOException { } else if (ephemeralType == EphemeralType.TTL) { ttls.add(path); } else if (owner != 0) { - HashSet<String> list = ephemerals.computeIfAbsent(owner, k -> new HashSet<>()); - list.add(path); + OwnedEphemerals ownedEphemerals = ephemerals.computeIfAbsent(owner, k -> new OwnedEphemerals()); + ownedEphemerals.add(path); } } path = ia.readString("path"); @@ -1451,13 +1448,13 @@ public synchronized WatchesSummary getWatchesSummary() { */ public void dumpEphemerals(PrintWriter writer) { writer.println("Sessions with Ephemerals (" + ephemerals.keySet().size() + "):"); - for (Entry<Long, HashSet<String>> entry : ephemerals.entrySet()) { + for (Entry<Long, OwnedEphemerals> entry : ephemerals.entrySet()) { writer.print("0x" + Long.toHexString(entry.getKey())); writer.println(":"); - Set<String> tmp = entry.getValue(); + OwnedEphemerals tmp = entry.getValue(); if (tmp != null) { synchronized (tmp) { - for (String path : tmp) { + for (String path : tmp.clonePaths()) { writer.println("\t" + path); } } @@ -1477,10 +1474,8 @@ public void shutdownWatcher() { */ public Map<Long, Set<String>> getEphemerals() { Map<Long, Set<String>> ephemeralsCopy = new HashMap<>(); - for (Entry<Long, HashSet<String>> e : ephemerals.entrySet()) { - synchronized (e.getValue()) { - ephemeralsCopy.put(e.getKey(), new HashSet<>(e.getValue())); - } + for (Entry<Long, OwnedEphemerals> e : ephemerals.entrySet()) { + ephemeralsCopy.put(e.getKey(), e.getValue().clonePaths()); } return ephemeralsCopy; } @@ -1953,6 +1948,32 @@ public long getDigest() { } + /** + * Holds information about the ephemeral paths associated with a + * session. Currently just a simple wrapper around {@code + * HashSet<String>}. + */ + private static class OwnedEphemerals { + private HashSet<String> paths = new HashSet<>(); + + @SuppressWarnings("unchecked") + public synchronized Set<String> clonePaths() { + return (Set<String>) paths.clone(); + } + + public synchronized int count() { + return paths.size(); + } + + public synchronized boolean add(String path) { + return paths.add(path); + } + + public synchronized boolean remove(String path) { + return paths.remove(path); + } + }; + /** * Create a node stat from the given params. * From ab513741282dac1aeac333aa339c284bbb9ad630 Mon Sep 17 00:00:00 2001 From: Damien Diederen <ddiederen@apache.org> Date: Thu, 10 Jun 2021 16:15:44 +0200 Subject: [PATCH 06/11] ZOOKEEPER-4306: DataTree: Track serialized size of ephemeral paths --- .../org/apache/zookeeper/server/DataTree.java | 43 +++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index 0090faf81a8..23ca66ac9a1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -197,6 +197,14 @@ public Set<String> getEphemerals(long sessionId) { return ownedEphemerals.clonePaths(); } + public int getEphemeralsSerializedSize(long sessionId) { + OwnedEphemerals ownedEphemerals = ephemerals.get(sessionId); + if (ownedEphemerals == null) { + return OwnedEphemerals.MIN_SERIALIZED_SIZE; + } + return ownedEphemerals.getSerializedSize(); + } + public Set<String> getContainers() { return new HashSet<>(containers); } @@ -1954,8 +1962,13 @@ public long getDigest() { * HashSet<String>}. */ private static class OwnedEphemerals { + // Serialization starts with a vector length descriptor. + public static final int MIN_SERIALIZED_SIZE = 4; + private HashSet<String> paths = new HashSet<>(); + private int serializedSize = MIN_SERIALIZED_SIZE; + @SuppressWarnings("unchecked") public synchronized Set<String> clonePaths() { return (Set<String>) paths.clone(); @@ -1965,12 +1978,34 @@ public synchronized int count() { return paths.size(); } - public synchronized boolean add(String path) { - return paths.add(path); + public boolean add(String path) { + int pathSerSize = PathUtils.serializedSize(path); + + synchronized (this) { + int newSerSize = Math.addExact(serializedSize, pathSerSize); + boolean result = paths.add(path); + if (result) { + serializedSize = newSerSize; + } + return result; + } + } + + public boolean remove(String path) { + int pathSerSize = PathUtils.serializedSize(path); + + synchronized (this) { + int newSerSize = Math.subtractExact(serializedSize, pathSerSize); + boolean result = paths.remove(path); + if (result) { + serializedSize = newSerSize; + } + return result; + } } - public synchronized boolean remove(String path) { - return paths.remove(path); + public synchronized int getSerializedSize() { + return serializedSize; } }; From aae6dbd42d00c1d5169391d956cd1409077eaa58 Mon Sep 17 00:00:00 2001 From: Damien Diederen <ddiederen@apache.org> Date: Thu, 17 Jun 2021 16:33:09 +0200 Subject: [PATCH 07/11] ZOOKEEPER-4306: Test DataTree serialization size --- .../apache/zookeeper/server/DataTreeTest.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java index 07a69f14f01..0dce399e2fe 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java @@ -32,7 +32,9 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -49,6 +51,7 @@ import org.apache.zookeeper.common.PathTrie; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.metrics.MetricsUtils; +import org.apache.zookeeper.txn.CloseSessionTxn; import org.apache.zookeeper.txn.CreateTxn; import org.apache.zookeeper.txn.TxnHeader; import org.junit.jupiter.api.Test; @@ -671,4 +674,37 @@ private void testSerializeLastProcessedZxid(boolean enableForSerialize, boolean } } + private void testEphemeralsSerializedSizeWith(String[] paths) throws Exception { + DataTree dataTree = new DataTree(); + long ephemeralOwner = 42; + + for (String path : paths) { + int lastSlash = path.lastIndexOf('/'); + String parentPath = path.substring(0, lastSlash); + DataNode parent = dataTree.getNode(parentPath); + + dataTree.createNode(path, null, null, ephemeralOwner, parent.stat.getCversion() + 1, 1, 1); + } + + int size = dataTree.getEphemeralsSerializedSize(ephemeralOwner); + Set<String> ephemerals = dataTree.getEphemerals(ephemeralOwner); + + CloseSessionTxn txn = new CloseSessionTxn(new ArrayList<String>(ephemerals)); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); + txn.serialize(boa, "txn"); + baos.close(); + + assertEquals(size, baos.size()); + } + + @Test + public void testEphemeralsSerializedSize() throws Exception { + testEphemeralsSerializedSizeWith(new String[] { "/test" }); + + testEphemeralsSerializedSizeWith(new String[] { "/a", "/b", "/c" }); + + testEphemeralsSerializedSizeWith(new String[] { "/a", "/\u00f4", "/\u0939", "/\ud800\udf48" }); + } } From a7825fc84d1d64225c7eb5a3399739d1ba4e5212 Mon Sep 17 00:00:00 2001 From: Damien Diederen <ddiederen@apache.org> Date: Thu, 17 Jun 2021 10:00:41 +0200 Subject: [PATCH 08/11] ZOOKEEPER-4306: PrepRequestProcessor: Prevent CloseSessionTxn "overflow" --- .../server/PrepRequestProcessor.java | 45 +++++++++++++++---- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java index 8404ed9b82e..6a2746d452f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -32,6 +32,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.CreateMode; @@ -683,6 +684,15 @@ private void pRequest2TxnCreate(int type, Request request, Record record) throws } int newCversion = parentRecord.stat.getCversion() + 1; zks.checkQuota(path, null, data, OpCode.create); + long ephemeralOwner = 0; + if (createMode.isContainer()) { + ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER; + } else if (createMode.isTTL()) { + ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl); + } else if (createMode.isEphemeral()) { + checkCloseSessionTxnSize(path, request.sessionId); + ephemeralOwner = request.sessionId; + } if (type == OpCode.createContainer) { request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion)); } else if (type == OpCode.createTTL) { @@ -692,14 +702,6 @@ private void pRequest2TxnCreate(int type, Request request, Record record) throws } TxnHeader hdr = request.getHdr(); - long ephemeralOwner = 0; - if (createMode.isContainer()) { - ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER; - } else if (createMode.isTTL()) { - ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl); - } else if (createMode.isEphemeral()) { - ephemeralOwner = request.sessionId; - } StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner); parentRecord = parentRecord.duplicate(request.getHdr().getZxid()); parentRecord.childCount++; @@ -741,6 +743,33 @@ private static int checkAndIncVersion(int currentVersion, int expectedVersion, S return currentVersion + 1; } + private void checkCloseSessionTxnSize(String path, long sessionId) throws KeeperException.MarshallingErrorException { + int size = PathUtils.serializedSize(path); + + List<String> outstandingPaths = new ArrayList<>(); + DataTree dataTree = zks.getZKDatabase().getDataTree(); + synchronized (zks.outstandingChanges) { + size = Math.addExact(size, dataTree.getEphemeralsSerializedSize(sessionId)); + for (ChangeRecord c : zks.outstandingChanges) { + // Ignoring deleted nodes and existing ephemerals means that we might be + // overcounting. + if (c.stat != null && c.stat.getEphemeralOwner() == sessionId) { + outstandingPaths.add(c.path); + } + } + } + + for (String outstandingPath : outstandingPaths) { + size = Math.addExact(size, PathUtils.serializedSize(outstandingPath)); + } + + if (size > BinaryInputArchive.maxBuffer) { + LOG.info("Rejecting ephemeral path {} as it would overflow session 0x{}", + path, Long.toHexString(sessionId)); + throw new KeeperException.MarshallingErrorException(); + } + } + /** * This method will be called inside the ProcessRequestThread, which is a * singleton, so there will be a single thread calling this code. From bc382388c3128b7f924c3d89c740638b3d0e599c Mon Sep 17 00:00:00 2001 From: Damien Diederen <ddiederen@apache.org> Date: Thu, 17 Jun 2021 14:52:25 +0200 Subject: [PATCH 09/11] ZOOKEEPER-4306: Introduce TooManyEphemeralsException --- .../org/apache/zookeeper/KeeperException.java | 21 ++++++++++++++++++- .../zookeeper/cli/CliWrapperException.java | 2 ++ .../server/PrepRequestProcessor.java | 4 ++-- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java index 06826a672ed..1461ffc7a26 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java @@ -151,6 +151,8 @@ public static KeeperException create(Code code) { return new QuotaExceededException(); case THROTTLEDOP: return new ThrottledOpException(); + case TOOMANYEPHEMERALS: + return new TooManyEphemeralsException(); case OK: default: throw new IllegalArgumentException("Invalid exception code:" + code.code); @@ -415,7 +417,9 @@ public enum Code implements CodeDeprecated { /** Operation was throttled and not executed at all. This error code indicates that zookeeper server * is under heavy load and can't process incoming requests at full speed; please retry with back off. */ - THROTTLEDOP (-127); + THROTTLEDOP (-127), + /** Adding an ephemeral with the requested path could overflow transaction size. */ + TOOMANYEPHEMERALS(-128); private static final Map<Integer, Code> lookup = new HashMap<>(); @@ -514,6 +518,8 @@ static String getCodeMessage(Code code) { return "Quota has exceeded"; case THROTTLEDOP: return "Op throttled due to high load"; + case TOOMANYEPHEMERALS: + return "Adding an ephemeral with the requested path could overflow transaction size"; default: return "Unknown error " + code; } @@ -980,4 +986,17 @@ public ThrottledOpException() { super(Code.THROTTLEDOP); } } + + /** + * @see Code#TOOMANYEPHEMERALS + */ + @InterfaceAudience.Public + public static class TooManyEphemeralsException extends KeeperException { + public TooManyEphemeralsException() { + super(Code.TOOMANYEPHEMERALS); + } + public TooManyEphemeralsException(String path) { + super(Code.TOOMANYEPHEMERALS, path); + } + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/CliWrapperException.java b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/CliWrapperException.java index 8095a40c2e0..8318a4f236c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/CliWrapperException.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/CliWrapperException.java @@ -56,6 +56,8 @@ private static String getMessage(Throwable cause) { + "new servers are connected and synced"; } else if (keeperException instanceof KeeperException.QuotaExceededException) { return "Quota has exceeded : " + keeperException.getPath(); + } else if (keeperException instanceof KeeperException.TooManyEphemeralsException) { + return "Adding ephemeral could overflow transaction size : " + keeperException.getPath(); } } return cause.getMessage(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java index 6a2746d452f..ec8cba56de7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -743,7 +743,7 @@ private static int checkAndIncVersion(int currentVersion, int expectedVersion, S return currentVersion + 1; } - private void checkCloseSessionTxnSize(String path, long sessionId) throws KeeperException.MarshallingErrorException { + private void checkCloseSessionTxnSize(String path, long sessionId) throws KeeperException.TooManyEphemeralsException { int size = PathUtils.serializedSize(path); List<String> outstandingPaths = new ArrayList<>(); @@ -766,7 +766,7 @@ private void checkCloseSessionTxnSize(String path, long sessionId) throws Keeper if (size > BinaryInputArchive.maxBuffer) { LOG.info("Rejecting ephemeral path {} as it would overflow session 0x{}", path, Long.toHexString(sessionId)); - throw new KeeperException.MarshallingErrorException(); + throw new KeeperException.TooManyEphemeralsException(path); } } From 0fd0a9846a8703c179bea7eeca0310b182b2b8e8 Mon Sep 17 00:00:00 2001 From: Damien Diederen <ddiederen@apache.org> Date: Thu, 17 Jun 2021 17:58:39 +0200 Subject: [PATCH 10/11] ZOOKEEPER-4306: Test CloseSessionTxn "overflow" protection --- .../server/CloseSessionTxnSizeTest.java | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/CloseSessionTxnSizeTest.java diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/CloseSessionTxnSizeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CloseSessionTxnSizeTest.java new file mode 100644 index 00000000000..04d8134bc24 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CloseSessionTxnSizeTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Set; +import org.apache.jute.BinaryInputArchive; +import org.apache.jute.BinaryOutputArchive; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.txn.CloseSessionTxn; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class CloseSessionTxnSizeTest extends ClientBase { + + private static final String JUTE_MAXBUFFER = "jute.maxbuffer"; + + private static final int JUTE_MAXBUFFER_VALUE = 256; + + private String previousMaxBuffer; + + private ZooKeeper zk; + + @BeforeEach + @Override + public void setUp() throws Exception { + previousMaxBuffer = System.setProperty(JUTE_MAXBUFFER, Integer.toString(JUTE_MAXBUFFER_VALUE)); + assertEquals(JUTE_MAXBUFFER_VALUE, BinaryInputArchive.maxBuffer, "Couldn't set jute.maxbuffer!"); + super.setUp(); + zk = createClient(); + } + + @AfterEach + @Override + public void tearDown() throws Exception { + super.tearDown(); + zk.close(); + if (previousMaxBuffer == null) { + System.clearProperty(JUTE_MAXBUFFER); + } else { + System.setProperty(JUTE_MAXBUFFER, previousMaxBuffer); + } + } + + private static String makePath(int length) { + byte[] bytes = new byte[length]; + Arrays.fill(bytes, (byte) 'x'); + bytes[0] = (byte) '/'; + + return new String(bytes, StandardCharsets.US_ASCII); + } + + @Test + public void testCloseSessionTxnSizeFit() throws InterruptedException, IOException, KeeperException { + // 4 bytes for vector length, 4 bytes for string length + String path = makePath((JUTE_MAXBUFFER_VALUE - 4) / 2 - 4 - 1); + + zk.create(path + "x", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + zk.create(path + "y", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + + Set<String> paths = serverFactory.getZooKeeperServer() + .getZKDatabase().getDataTree().getEphemerals(zk.getSessionId()); + + // Ensure we are looking at the right session. + assertEquals(2, paths.size(), "Ephemerals count"); + + CloseSessionTxn txn = new CloseSessionTxn(new ArrayList<String>(paths)); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos); + oa.writeRecord(txn, "CloseSessionTxn"); + + // Check that our encoding assumptions hold. + assertEquals(baos.size(), JUTE_MAXBUFFER_VALUE, "CloseSessionTxn size"); + } + + @Test + public void testCloseSessionTxnSizeOverflow() throws KeeperException, InterruptedException { + String path = makePath((JUTE_MAXBUFFER_VALUE - 4) / 2 - 4 - 1); + + zk.create(path + "x", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + + try { + zk.create(path + "yz", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + fail(); + } catch (KeeperException.TooManyEphemeralsException e) { + // Expected + } + } + + @Test + public void testCloseSessionTxnSizeSequential() throws KeeperException, InterruptedException { + String prefix = "/test-"; + String specimen = prefix + "0123456789"; + + int nOk = (JUTE_MAXBUFFER_VALUE - 4) / (specimen.length() + 4); + for (int i = 0; i < nOk; i++) { + zk.create(prefix, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + } + + try { + zk.create(prefix, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + fail(); + } catch (KeeperException.TooManyEphemeralsException e) { + // Expected + } + } +} From 97c3b3c03eba3322084aec8647190176c93fed6e Mon Sep 17 00:00:00 2001 From: Damien Diederen <ddiederen@apache.org> Date: Thu, 17 Jun 2021 15:05:11 +0200 Subject: [PATCH 11/11] ZOOKEEPER-4306: C client: Add ZTOOMANYEPHEMERALS code & message --- zookeeper-client/zookeeper-client-c/include/zookeeper.h | 3 ++- zookeeper-client/zookeeper-client-c/src/zookeeper.c | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/zookeeper-client/zookeeper-client-c/include/zookeeper.h b/zookeeper-client/zookeeper-client-c/include/zookeeper.h index 7d889f60557..b15129e0a33 100644 --- a/zookeeper-client/zookeeper-client-c/include/zookeeper.h +++ b/zookeeper-client/zookeeper-client-c/include/zookeeper.h @@ -141,7 +141,8 @@ enum ZOO_ERRORS { ZNOWATCHER = -121, /*!< The watcher couldn't be found */ ZRECONFIGDISABLED = -123, /*!< Attempts to perform a reconfiguration operation when reconfiguration feature is disabled */ ZSESSIONCLOSEDREQUIRESASLAUTH = -124, /*!< The session has been closed by server because server requires client to do authentication via configured authentication scheme at server, but client is not configured with required authentication scheme or configured but failed (i.e. wrong credential used.). */ - ZTHROTTLEDOP = -127 /*!< Operation was throttled and not executed at all. please, retry! */ + ZTHROTTLEDOP = -127, /*!< Operation was throttled and not executed at all. please, retry! */ + ZTOOMANYEPHEMERALS = -128 /*!< Adding an ephemeral with the requested path could overflow transaction size */ /* when adding/changing values here also update zerror(int) to return correct error message */ }; diff --git a/zookeeper-client/zookeeper-client-c/src/zookeeper.c b/zookeeper-client/zookeeper-client-c/src/zookeeper.c index 74b04717195..5594e6d40b6 100644 --- a/zookeeper-client/zookeeper-client-c/src/zookeeper.c +++ b/zookeeper-client/zookeeper-client-c/src/zookeeper.c @@ -5051,6 +5051,8 @@ const char* zerror(int c) return "session closed by server because client is required to do SASL authentication"; case ZTHROTTLEDOP: return "Operation was throttled due to high load"; + case ZTOOMANYEPHEMERALS: + return "Adding an ephemeral with the requested path could overflow transaction size"; } if (c > 0) { return strerror(c);