Skip to content

Commit 478e50f

Browse files
committed
PROTON-2287 Improve Symbol decoding cache
1 parent a72aff3 commit 478e50f

File tree

8 files changed

+333
-33
lines changed

8 files changed

+333
-33
lines changed

proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBuffer.java

Lines changed: 83 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,13 @@ public byte get(int index) {
145145
throw new IndexOutOfBoundsException("The given index is not valid: " + index);
146146
}
147147

148+
return _get(index);
149+
}
150+
151+
/**
152+
* Unchecked ie no bound-checks get
153+
*/
154+
private byte _get(int index) {
148155
byte result = 0;
149156

150157
if (index == position) {
@@ -813,8 +820,8 @@ public int hashCode() {
813820
int remaining = remaining();
814821

815822
if (currentArrayIndex < 0 || remaining <= currentArray.length - currentOffset) {
816-
while (remaining > 0) {
817-
hash = 31 * hash + currentArray[currentOffset + --remaining];
823+
if (remaining > 0) {
824+
hash = Hashing.byteBufferCompatibleHashCode(currentArray, currentOffset, currentOffset + remaining);
818825
}
819826
} else {
820827
hash = hashCodeFromComponents();
@@ -875,7 +882,7 @@ public boolean equals(Object other) {
875882
return true;
876883
}
877884

878-
if (hasArray() || remaining <= currentArray.length - currentOffset) {
885+
if (remaining <= currentArray.length - currentOffset || hasArray()) {
879886
// Either there is only one array, or the span to compare is within a single chunk of this buffer,
880887
// allowing the compare to directly access the underlying array instead of using slower get methods.
881888
return equals(currentArray, currentOffset, remaining, buffer);
@@ -885,6 +892,40 @@ public boolean equals(Object other) {
885892
}
886893

887894
private static boolean equals(byte[] buffer, int start, int length, ReadableBuffer other) {
895+
if (other.hasArray()) {
896+
// fast-path: jdk 11 has a vectorized Arrays::equals for ranged comparisons, but
897+
// sadly JDK 8 nope so let's try to save at least bound checks
898+
final int otherStart = other.arrayOffset() + other.position();
899+
return equals(buffer, start, other.array(), otherStart, length);
900+
} else if (other instanceof ByteBufferReader) {
901+
return rawEquals(buffer, start, length, other.byteBuffer());
902+
}
903+
return rawEquals(buffer, start, length, other);
904+
}
905+
906+
private static boolean uncheckedEquals(byte[] buffer, int start, int length, CompositeReadableBuffer other) {
907+
final int position = other.position();
908+
for (int i = 0; i < length; i++) {
909+
if (buffer[start + i] != other._get(position + i)) {
910+
return false;
911+
}
912+
}
913+
return true;
914+
}
915+
916+
private static boolean uncheckedEquals(CompositeReadableBuffer buffer, ByteBuffer other, int length) {
917+
assert buffer.remaining() >= length;
918+
final int otherPosition = other.position();
919+
final int bufferPosition = buffer.position();
920+
for (int i = 0; i < length; i++) {
921+
if (buffer._get(bufferPosition + i) != other.get(otherPosition + i)) {
922+
return false;
923+
}
924+
}
925+
return true;
926+
}
927+
928+
private static boolean rawEquals(byte[] buffer, int start, int length, ByteBuffer other) {
888929
final int position = other.position();
889930
for (int i = 0; i < length; i++) {
890931
if (buffer[start + i] != other.get(position + i)) {
@@ -894,18 +935,47 @@ private static boolean equals(byte[] buffer, int start, int length, ReadableBuff
894935
return true;
895936
}
896937

897-
private static boolean equals(ReadableBuffer buffer, ReadableBuffer other) {
898-
final int origPos = buffer.position();
899-
try {
900-
for (int i = other.position(); buffer.hasRemaining(); i++) {
901-
if (!equals(buffer.get(), other.get(i))) {
902-
return false;
903-
}
938+
private static boolean rawEquals(byte[] buffer, int start, int length, ReadableBuffer other) {
939+
final int position = other.position();
940+
for (int i = 0; i < length; i++) {
941+
if (buffer[start + i] != other.get(position + i)) {
942+
return false;
943+
}
944+
}
945+
return true;
946+
}
947+
948+
private static boolean equals(byte[] a, int aStart, byte[] b, int bStart, int length) {
949+
for (int i = 0; i < length; i++) {
950+
if (a[aStart + i] != b[bStart + i]) {
951+
return false;
904952
}
905-
return true;
906-
} finally {
907-
buffer.position(origPos);
908953
}
954+
return true;
955+
}
956+
957+
private static boolean equals(CompositeReadableBuffer buffer, ReadableBuffer other) {
958+
final int bufferRemaining = buffer.remaining();
959+
if (other.hasArray()) {
960+
final int otherStart = other.arrayOffset() + other.position();
961+
// check if otherEnd is beyond other limits, because the underline array is just limited by the capacity
962+
if (other.limit() < otherStart + bufferRemaining) {
963+
throw new BufferUnderflowException();
964+
}
965+
return uncheckedEquals(other.array(), otherStart, bufferRemaining, buffer);
966+
}
967+
if (other instanceof ByteBufferReader) {
968+
return uncheckedEquals(buffer, other.byteBuffer(), bufferRemaining);
969+
}
970+
// slow path
971+
final int bufferPosition = buffer.position();
972+
final int otherPosition = other.position();
973+
for (int i = 0; i < bufferRemaining; i++) {
974+
if (buffer._get(bufferPosition + i) != other.get(otherPosition + i)) {
975+
return false;
976+
}
977+
}
978+
return true;
909979
}
910980

911981
@Override
@@ -923,10 +993,6 @@ public String toString() {
923993
return builder.toString();
924994
}
925995

926-
private static boolean equals(byte x, byte y) {
927-
return x == y;
928-
}
929-
930996
private void maybeMoveToNextArray() {
931997
if (currentArray.length == currentOffset) {
932998
if (currentArrayIndex >= 0 && currentArrayIndex < (contents.size() - 1)) {

proton-j/src/main/java/org/apache/qpid/proton/codec/DecoderImpl.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,8 +1075,18 @@ void readRaw(final byte[] data, final int offset, final int length)
10751075

10761076
<V> V readRaw(TypeDecoder<V> decoder, int size)
10771077
{
1078-
V decode = decoder.decode(this, _buffer.slice().limit(size));
1079-
_buffer.position(_buffer.position()+size);
1078+
final int originalLimit = _buffer.limit();
1079+
final int originalPosition = _buffer.position();
1080+
final V decode;
1081+
try {
1082+
decode = decoder.decode(this, _buffer.limit(originalPosition + size));
1083+
} catch (Throwable t) {
1084+
_buffer.position(originalPosition);
1085+
throw t;
1086+
} finally {
1087+
_buffer.limit(originalLimit);
1088+
}
1089+
_buffer.position(originalPosition + size);
10801090
return decode;
10811091
}
10821092

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
package org.apache.qpid.proton.codec;
22+
23+
import java.nio.ByteBuffer;
24+
25+
public class Hashing {
26+
27+
private Hashing() {
28+
29+
}
30+
31+
// this const propagation should be already handled by the JIT
32+
// but we do this to make it more readable
33+
private static final int PRIME_1 = 31;
34+
private static final int PRIME_2 = PRIME_1 * PRIME_1;
35+
private static final int PRIME_3 = PRIME_2 * PRIME_1;
36+
private static final int PRIME_4 = PRIME_3 * PRIME_1;
37+
private static final int PRIME_5 = PRIME_4 * PRIME_1;
38+
private static final int PRIME_6 = PRIME_5 * PRIME_1;
39+
private static final int PRIME_7 = PRIME_6 * PRIME_1;
40+
private static final int PRIME_8 = PRIME_7 * PRIME_1;
41+
42+
public static int byteBufferCompatibleHashCode(ByteBuffer byteBuffer) {
43+
if (byteBuffer.hasArray()) {
44+
final int arrayOffset = byteBuffer.arrayOffset();
45+
final int arrayPosition = arrayOffset + byteBuffer.position();
46+
final int arrayLimit = arrayOffset + byteBuffer.limit();
47+
return byteBufferCompatibleHashCode(byteBuffer.array(), arrayPosition, arrayLimit);
48+
}
49+
// direct ByteBuffers does have some heavy-weight bound checks and memory barriers that
50+
// we just hope JIT to be better then us!
51+
return byteBuffer.hashCode();
52+
}
53+
54+
public static int byteBufferCompatibleHashCode(byte[] bytes, int position, int limit) {
55+
int h = 1;
56+
int remaining = limit - position;
57+
if (remaining == 0) {
58+
return h;
59+
}
60+
int index = limit - 1;
61+
// unrolled version
62+
final int bytesCount = remaining & 7;
63+
if (bytesCount > 0) {
64+
assert h == 1;
65+
h = unrolledHashCode(bytes, index, bytesCount, 1);
66+
index -= bytesCount;
67+
}
68+
final long longsCount = remaining >>> 3;
69+
// let's break the data dependency of each per element hash code
70+
// and save bound checks by manual unrolling 8 ops at time
71+
for (int i = 0; i < longsCount; i++) {
72+
final byte b7 = bytes[index];
73+
final byte b6 = bytes[index - 1];
74+
final byte b5 = bytes[index - 2];
75+
final byte b4 = bytes[index - 3];
76+
final byte b3 = bytes[index - 4];
77+
final byte b2 = bytes[index - 5];
78+
final byte b1 = bytes[index - 6];
79+
final byte b0 = bytes[index - 7];
80+
h = PRIME_8 * h +
81+
PRIME_7 * b7 +
82+
PRIME_6 * b6 +
83+
PRIME_5 * b5 +
84+
PRIME_4 * b4 +
85+
PRIME_3 * b3 +
86+
PRIME_2 * b2 +
87+
PRIME_1 * b1 +
88+
b0;
89+
index -= Long.BYTES;
90+
}
91+
return h;
92+
}
93+
94+
private static int unrolledHashCode(byte[] bytes, int index, int bytesCount, int h) {
95+
// there is still the hash data dependency but is more friendly
96+
// then a plain loop, given that we know no loop is needed here
97+
assert bytesCount > 0 && bytesCount < 8;
98+
h = PRIME_1 * h + bytes[index];
99+
if (bytesCount == 1) {
100+
return h;
101+
}
102+
h = PRIME_1 * h + bytes[index - 1];
103+
if (bytesCount == 2) {
104+
return h;
105+
}
106+
h = PRIME_1 * h + bytes[index - 2];
107+
if (bytesCount == 3) {
108+
return h;
109+
}
110+
h = PRIME_1 * h + bytes[index - 3];
111+
if (bytesCount == 4) {
112+
return h;
113+
}
114+
h = PRIME_1 * h + bytes[index - 4];
115+
if (bytesCount == 5) {
116+
return h;
117+
}
118+
h = PRIME_1 * h + bytes[index - 5];
119+
if (bytesCount == 6) {
120+
return h;
121+
}
122+
h = PRIME_1 * h + bytes[index - 6];
123+
return h;
124+
}
125+
}

proton-j/src/main/java/org/apache/qpid/proton/codec/ReadableBuffer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ public interface ReadableBuffer {
330330

331331
final class ByteBufferReader implements ReadableBuffer {
332332

333-
private ByteBuffer buffer;
333+
private final ByteBuffer buffer;
334334

335335
public static ByteBufferReader allocate(int size) {
336336
ByteBuffer allocated = ByteBuffer.allocate(size);
@@ -522,7 +522,7 @@ public String toString() {
522522

523523
@Override
524524
public int hashCode() {
525-
return buffer.hashCode();
525+
return Hashing.byteBufferCompatibleHashCode(buffer);
526526
}
527527

528528
@Override

proton-j/src/main/java/org/apache/qpid/proton/codec/SymbolType.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class SymbolType extends AbstractPrimitiveType<Symbol>
3535
private final SymbolEncoding _shortSymbolEncoding;
3636

3737
private final Map<ReadableBuffer, Symbol> _symbolCache = new HashMap<ReadableBuffer, Symbol>();
38-
private DecoderImpl.TypeDecoder<Symbol> _symbolCreator =
38+
private final DecoderImpl.TypeDecoder<Symbol> _symbolCreator =
3939
new DecoderImpl.TypeDecoder<Symbol>()
4040
{
4141
@Override
@@ -44,7 +44,7 @@ public Symbol decode(DecoderImpl decoder, ReadableBuffer buffer)
4444
Symbol symbol = _symbolCache.get(buffer);
4545
if (symbol == null)
4646
{
47-
byte[] bytes = new byte[buffer.limit()];
47+
byte[] bytes = new byte[buffer.remaining()];
4848
buffer.get(bytes);
4949

5050
String str = new String(bytes, ASCII_CHARSET);

tests/performance-jmh/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
<name>Proton-J JMH Performance Tests</name>
3131

3232
<properties>
33-
<jmh-version>1.19</jmh-version>
33+
<jmh-version>1.25.2</jmh-version>
3434
</properties>
3535

3636
<dependencies>

tests/performance-jmh/src/main/java/org/apache/qpid/proton/codec/CompositeReadableBufferEqualsBenchmark.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.qpid.proton.codec.ReadableBuffer.ByteBufferReader;
2424
import org.openjdk.jmh.annotations.Benchmark;
2525
import org.openjdk.jmh.annotations.BenchmarkMode;
26+
import org.openjdk.jmh.annotations.Fork;
2627
import org.openjdk.jmh.annotations.Measurement;
2728
import org.openjdk.jmh.annotations.Mode;
2829
import org.openjdk.jmh.annotations.OutputTimeUnit;
@@ -43,15 +44,16 @@
4344
@State(Scope.Benchmark)
4445
@BenchmarkMode(Mode.AverageTime)
4546
@OutputTimeUnit(TimeUnit.NANOSECONDS)
46-
@Warmup(iterations = 5, time = 1)
47-
@Measurement(iterations = 5, time = 1)
47+
@Warmup(iterations = 5, time = 400, timeUnit = TimeUnit.MILLISECONDS)
48+
@Measurement(iterations = 5, time = 400, timeUnit = TimeUnit.MILLISECONDS)
49+
@Fork(2)
4850
public class CompositeReadableBufferEqualsBenchmark {
4951

5052
private CompositeReadableBuffer composite;
51-
@Param({"8", "64", "1024"})
53+
@Param({"8", "16", "64"})
5254
private int size;
5355
private ReadableBuffer.ByteBufferReader bufferReader;
54-
@Param({"false", "true"})
56+
@Param({ "false", "true" })
5557
private boolean direct;
5658
@Param({"1", "2"})
5759
private int chunks;
@@ -97,11 +99,6 @@ public static void main(String[] args) throws RunnerException {
9799
public static void runBenchmark(Class<?> benchmarkClass) throws RunnerException {
98100
final Options opt = new OptionsBuilder()
99101
.include(benchmarkClass.getSimpleName())
100-
.addProfiler(GCProfiler.class)
101-
.shouldDoGC(true)
102-
.warmupIterations(5)
103-
.measurementIterations(5)
104-
.forks(1)
105102
.build();
106103
new Runner(opt).run();
107104
}

0 commit comments

Comments
 (0)