Eliminate array copies #31

Open
wants to merge 7 commits into
base: main

wolfchimneyrock

Issue number of the reported bug or feature request: #30

Describe your changes

  1. BrokerSession passes a Collection instead of PutMessageImpl[] to avoid making copies.
  2. ApplicationData calculates crc32c on the fly to avoid making an expensive copy of the whole payload.
  3. ByteBufferOutputStream:
    • add a peek() call that makes an independent view of the underlying data.
    • don't copy and repack ByteBuffers written to the ByteBufferOutputStream.
  4. NettyTcpConnection still makes a copy of the data read, but lets netty make the copy instead of us.
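
Item 2 can be illustrated in isolation. The sketch below is not the ApplicationData code from this PR; it assumes the payload is held as several ByteBuffer chunks, and the class and method names are hypothetical. It feeds each chunk to `java.util.zip.CRC32C` (Java 9+) in turn and compares the result with the copy-then-checksum approach.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32C;

// Hypothetical helper, not part of the SDK.
final class Crc32cOnTheFly {
    // Updates the checksum chunk by chunk. duplicate() leaves the
    // caller's position and limit untouched and copies no payload bytes.
    static long checksum(ByteBuffer... chunks) {
        CRC32C crc = new CRC32C();
        for (ByteBuffer chunk : chunks) {
            crc.update(chunk.duplicate());
        }
        return crc.getValue();
    }

    // Reference version: copies every chunk into one array first.
    static long checksumWithCopy(ByteBuffer... chunks) {
        int total = 0;
        for (ByteBuffer chunk : chunks) {
            total += chunk.remaining();
        }
        byte[] all = new byte[total];
        int offset = 0;
        for (ByteBuffer chunk : chunks) {
            ByteBuffer view = chunk.duplicate();
            int n = view.remaining();
            view.get(all, offset, n);
            offset += n;
        }
        CRC32C crc = new CRC32C();
        crc.update(all, 0, all.length);
        return crc.getValue();
    }

    static ByteBuffer ascii(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.US_ASCII));
    }

    public static void main(String[] args) {
        long streamed = checksum(ascii("1234"), ascii("56789"));
        long copied = checksumWithCopy(ascii("1234"), ascii("56789"));
        System.out.printf("streamed=%08x copied=%08x%n", streamed, copied);
    }
}
```

Both paths produce the same value; the streamed one only skips the intermediate array, which matters most for large payloads.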

Testing performed
Unit and integration tests have been updated to reflect the changes.

Additional context
As discussed with @sgalichkin

Collaborator

sgalichkin left a comment

Decided to provide some feedback while still looking at ByteBufferOutputStream and its usage

brokerSession.post(this, msgs);
return msgs;
public void flush() throws BMQException {
brokerSession.post(this, putMessages.getAndSet(new ArrayList<>(INITIAL_PUTMESSAGES_SIZE)));
Collaborator

Shouldn't we check here whether there are any messages to send? Otherwise we may just replace an empty array with another empty array.

synchronized (lock) {
putMessages.add(message);
}
putMessages.get().add(message);
Collaborator

I suppose .get().add() is not an atomic operation, so is it possible to get the array which is being flushed and add new messages to it afterward, eventually losing them, or to modify the collection while it is being iterated in PutPoster?

Author

.get() itself is atomic, and so is the .getAndSet() in flush, so afaik it isn't possible to get the array that is being flushed

Collaborator

Agree, get() and getAndSet() are atomic, but the following sequence seems to be possible:
[t1] .get() returns an array to flush
[t2] .getAndSet() returns the same array
[t2] is iterating the array and processing the messages
[t1] calls .add() which modifies the array

Correct?
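
The sequence above can be replayed deterministically on a single thread. This is only a sketch: String stands in for PutMessageImpl, and the class, field and method names are hypothetical rather than taken from the SDK. It also shows one possible alternative (an assumption, not what the PR does) that guards both the add and the swap with the same lock and skips the swap when nothing is pending.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the queue's pending-message handling.
final class FlushRaceSketch {
    static final AtomicReference<List<String>> putMessages =
            new AtomicReference<>(new ArrayList<>());

    // Replays the four steps in the problematic order on one thread.
    static List<String> replayInterleaving() {
        putMessages.set(new ArrayList<>());
        putMessages.get().add("m1");
        List<String> seenByT1 = putMessages.get();                        // [t1] get()
        List<String> flushed = putMessages.getAndSet(new ArrayList<>());  // [t2] getAndSet()
        // [t2] would now be iterating 'flushed'
        seenByT1.add("m2");                                               // [t1] add()
        return flushed;
    }

    // One possible alternative: add and swap share a lock.
    static final class LockedBatch {
        private final Object lock = new Object();
        private List<String> pending = new ArrayList<>();

        void add(String msg) {
            synchronized (lock) {
                pending.add(msg);
            }
        }

        List<String> drain() {
            synchronized (lock) {
                if (pending.isEmpty()) {
                    return Collections.emptyList(); // nothing to send, no swap
                }
                List<String> out = pending;
                pending = new ArrayList<>();
                return out;
            }
        }
    }

    public static void main(String[] args) {
        List<String> flushed = replayInterleaving();
        System.out.println("flushed list:   " + flushed);
        System.out.println("fresh list:     " + putMessages.get());
    }
}
```

In the replay, the late add lands in the list that was already handed to the flusher, so with real threads it could mutate a collection that is being iterated, or be missed if iteration has already finished.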

@@ -1249,6 +1253,14 @@ public void post(QueueHandle queueHandle, PutMessageImpl... msgs) throws BMQExce
}
}

public void post(QueueHandle queueHandle, PutMessageImpl msg) throws BMQException {
Collaborator

Do we need these two methods here?

Author

Collections.singletonList() is more efficient for a single element than Arrays.asList()
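
A minimal sketch of the overload pattern being discussed, with String standing in for PutMessageImpl and hypothetical names throughout. A varargs call allocates an array that Arrays.asList() then wraps, whereas Collections.singletonList() allocates one small immutable list; whether that difference is measurable here is not shown by this example.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-in for the post() overloads; returns the batch size.
final class PostOverloadsSketch {
    // Common path that works on any collection without copying it.
    static int post(Collection<String> msgs) {
        return msgs.size();
    }

    // Single-message overload: no varargs array is created by the caller.
    static int post(String msg) {
        return post(Collections.singletonList(msg));
    }

    // Varargs overload: the compiler builds an array, asList() wraps it.
    static int post(String... msgs) {
        return post(Arrays.asList(msgs));
    }

    public static void main(String[] args) {
        System.out.println(post("a"));       // resolves to post(String)
        System.out.println(post("a", "b"));  // resolves to post(String...)
    }
}
```

Java prefers the fixed-arity overload when exactly one argument is passed, so existing single-message callers pick up the cheaper path without changes.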

pack(msgs);
flush();
}

public void post(PutMessageImpl msg) {
Collaborator

Same question here

public void writeBytes(String s) throws IOException {
throw new UnsupportedOperationException();
}

public void writeBytes(ByteBuffer b) throws IOException {
private boolean bufferIsFresh(ByteBuffer b) {
Collaborator

static?

Collaborator

Maybe rename to bufferIsClear()?

.toArray(ByteBuffer[]::new);
}

private ArrayList<ByteBuffer> buffers() {
Collaborator

Is this method used?

Author

yeah maybe I can get rid of it

*/
public ByteBuffer[] peek() {
return bbArray.stream()
.map(bb -> (ByteBuffer) (bb.duplicate().flip()))
Collaborator

Maybe asReadOnlyBuffer() instead of duplicate()?

Author

I tried that first, but a lot of unit tests then fail with:

```
[ERROR] com.bloomberg.bmq.impl.infr.proto.SchemaEventImplBuilderTest.testBuildOpenQueueJson  Time elapsed: 0.001 s  <<< ERROR!
java.nio.ReadOnlyBufferException
        at java.base/java.nio.ByteBuffer.array(ByteBuffer.java:1473)
        at com.bloomberg.bmq.util.TestHelpers.buffersContents(TestHelpers.java:65)
        at com.bloomberg.bmq.util.TestHelpers.compareWithFileContent(TestHelpers.java:91)
        at com.bloomberg.bmq.impl.infr.proto.SchemaEventImplBuilderTest.testBuildOpenQueueJson(SchemaEventImplBuilderTest.java:152)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:345)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
```
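
The difference between the two kinds of view can be reproduced with plain java.nio. This is a sketch with hypothetical class and method names, handling a single heap buffer rather than the stream's whole list: duplicate() shares the backing array but has its own position and limit, while a read-only view refuses array(), which matches the exception in the trace.

```java
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

// Hypothetical helper, not the ByteBufferOutputStream code.
final class PeekViewSketch {
    // Same idea as the duplicate().flip() expression in the diff,
    // written as two statements so it compiles on Java 8 as well.
    static ByteBuffer view(ByteBuffer written) {
        ByteBuffer dup = written.duplicate();
        dup.flip(); // limit = bytes written so far, position = 0
        return dup;
    }

    // True if the backing array can be obtained from this buffer.
    static boolean arrayAccessible(ByteBuffer b) {
        try {
            b.array();
            return true;
        } catch (ReadOnlyBufferException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ByteBuffer src = ByteBuffer.allocate(8);
        src.put((byte) 1).put((byte) 2);

        ByteBuffer dup = view(src);
        ByteBuffer readOnly = view(src).asReadOnlyBuffer();

        System.out.println("source position:   " + src.position());
        System.out.println("view remaining:    " + dup.remaining());
        System.out.println("dup array():       " + arrayAccessible(dup));
        System.out.println("readOnly array():  " + arrayAccessible(readOnly));
    }
}
```

Code that needs the contents of a read-only view can read through get() instead of array(), so a test helper written that way would work with either kind of view.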

Collaborator

It looks like the peek() method is called by gson. Is it possible to mark the method to be ignored when serializing?

public void writeBytes(String s) throws IOException {
throw new UnsupportedOperationException();
}

public void writeBytes(ByteBuffer b) throws IOException {
private boolean bufferIsFresh(ByteBuffer b) {
// a buffer that has never been put() to nor flipped
Collaborator

Not completely sure if this statement is correct. From javadocs:
clear() makes a buffer ready for a new sequence of channel-read or relative put operations: It sets the limit to the capacity and the position to zero.

So it is possible to put some data (up to limit which may be set to capacity value) and then call clear() in order to be able to read it.

Or is my understanding incorrect, and it is a requirement that the ByteBuffer argument be "fresh"?
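
The ambiguity can be shown with three small buffers. The body of bufferIsFresh() is not visible in this excerpt, so the check below (position 0 and limit equal to capacity) is only an assumed stand-in with a hypothetical name; under that assumption, an untouched buffer, one that was written and then cleared, and one that was filled and then flipped all look the same.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the check under review.
final class FreshBufferAmbiguity {
    // Assumed definition: nothing consumed and the whole capacity available.
    static boolean looksFresh(ByteBuffer b) {
        return b.position() == 0 && b.limit() == b.capacity();
    }

    static ByteBuffer untouched() {
        return ByteBuffer.allocate(4);
    }

    // Holds data, but clear() reset position and limit without erasing it.
    static ByteBuffer putThenClear() {
        ByteBuffer b = ByteBuffer.allocate(4);
        b.putInt(42);
        b.clear();
        return b;
    }

    // Filled to capacity and flipped for reading: limit == capacity again.
    static ByteBuffer filledThenFlipped() {
        ByteBuffer b = ByteBuffer.allocate(4);
        b.putInt(42);
        b.flip();
        return b;
    }

    public static void main(String[] args) {
        System.out.println("untouched:         " + looksFresh(untouched()));
        System.out.println("putThenClear:      " + looksFresh(putThenClear()));
        System.out.println("filledThenFlipped: " + looksFresh(filledThenFlipped()));
        System.out.println("data still there:  " + putThenClear().getInt(0));
    }
}
```

Position and limit alone therefore cannot tell the writer whether a buffer carries data, which is why the contract for the argument has to be stated explicitly.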

Author

this is one thing that really bugs me about nio.ByteBuffer: there is no way to distinguish a buffer that has no data from one that has data but has already been flipped for reading. isClear() is probably a better name for this method ...

but in the long term, I think the better thing to do is replace every usage of nio.ByteBuffer with netty.ByteBuf in the entire SDK, which would

  1. allow us to eliminate that initial copy from NettyTcpConnection
  2. allow us to delete ByteBufferOutputStream and ByteBufferInputStream entirely, since netty.ByteBufOutputStream can be used instead

Collaborator

sgalichkin, Sep 19, 2023

> this is one thing that really bugs me about nio.ByteBuffer: there is no way to distinguish a buffer that has no data from one that has data but has already been flipped for reading. isClear() is probably a better name for this method ...

Agree, but maybe we do not need to determine this and just take what is between 0 and limit regardless of current position?

> but in the long term, I think the better thing to do is replace every usage of nio.ByteBuffer with netty.ByteBuf in the entire SDK, which would
>
>   1. allow us to eliminate that initial copy from NettyTcpConnection
>   2. allow us to delete ByteBufferOutputStream and ByteBufferInputStream entirely, since netty.ByteBufOutputStream can be used instead

I would agree, but AFAIK the limited use of netty and its components was intentional. The reason is that some day the SDK may switch to another network library, and if netty components were widely used in the code (e.g. netty.ByteBuf instead of nio.ByteBuffer), that switch would require a lot of work. Having netty.ByteBuf in an API class (Queue) would make it even more difficult.

I suggest we have a quick call tomorrow to discuss this.

continue;
}
getNewBuffer();
if (bufferIsFresh(b)) return;
Collaborator

So taking into account the comment above, is this statement correct?

// TODO - can this copy be eliminated?
ByteBuffer nioBuf = ByteBuffer.allocate(byteBuf.readableBytes());
byteBuf.readBytes(nioBuf);
readBuffer.writeBuffer(nioBuf);
Collaborator

sgalichkin, Sep 18, 2023

So here we make a temporary buffer which gets the bytes from the netty one and is then added to the output stream as is, without any further copies?

Maybe it makes sense to add a comment saying this.
And maybe I would preserve a part of the removed comment in order to explain why we need to make a copy in general

Author

wolfchimneyrock commented Sep 22, 2023

I was able to run the JMH benchmarks with this latest commit, here are the results

Before:

Benchmark                                               Mode  Cnt     Score     Error  Units
ApplicationDataBenchmark.testZlibStreamInOut           thrpt   10     8.664 ±   0.379  ops/s
SessionBenchmark.sendReceive512B                       thrpt    5  1690.186 ±  76.546  ops/s
SessionBenchmark.sendReceive512B_Zlib                  thrpt    5  1686.011 ± 115.191  ops/s
SessionBenchmark.sendReceive512KiB                     thrpt    5     6.356 ±   0.110  ops/s
SessionBenchmark.sendReceive512KiB_Zlib                thrpt    5     6.360 ±   0.184  ops/s
SessionBenchmark.sendReceive5MiB                       thrpt    5    12.180 ±   0.367  ops/s
SessionBenchmark.sendReceive5MiB_Zlib                  thrpt    5    15.087 ±   0.116  ops/s
SessionBenchmark.sendReceive60MiB                      thrpt    5     1.013 ±   0.037  ops/s
SessionBenchmark.sendReceive60MiB_Zlib                 thrpt    5     1.358 ±   0.021  ops/s
SessionBenchmark.sendReceiveBatch1000ZlibConfirmLater  thrpt    5     0.169 ±   0.003  ops/s
SessionBenchmark.sendReceiveBatch1000ZlibConfirmNow    thrpt    5     0.225 ±   0.014  ops/s
SessionBenchmark.sendReceiveBatch100ConfirmLater       thrpt    5     1.539 ±   0.093  ops/s
SessionBenchmark.sendReceiveBatch100ConfirmNow         thrpt    5     2.155 ±   0.056  ops/s
SessionBenchmark.sendReceiveBatch100ZlibConfirmNow     thrpt    5     2.120 ±   0.088  ops/s
SessionBenchmark.sendReceiveBatch800ZlibConfirmLater   thrpt    5     0.197 ±   0.003  ops/s

After:

Benchmark                                               Mode  Cnt     Score     Error  Units
ApplicationDataBenchmark.testZlibStreamInOut           thrpt   10     6.088 ±   0.569  ops/s
SessionBenchmark.sendReceive512B                       thrpt    5  1881.041 ± 178.831  ops/s
SessionBenchmark.sendReceive512B_Zlib                  thrpt    5  1868.423 ± 186.251  ops/s
SessionBenchmark.sendReceive512KiB                     thrpt    5    18.013 ±   0.627  ops/s
SessionBenchmark.sendReceive512KiB_Zlib                thrpt    5    16.877 ±   0.652  ops/s
SessionBenchmark.sendReceive5MiB                       thrpt    5    34.717 ±   0.714  ops/s
SessionBenchmark.sendReceive5MiB_Zlib                  thrpt    5    16.922 ±   0.130  ops/s
SessionBenchmark.sendReceive60MiB                      thrpt    5     3.089 ±   0.121  ops/s
SessionBenchmark.sendReceive60MiB_Zlib                 thrpt    5     1.503 ±   0.031  ops/s
SessionBenchmark.sendReceiveBatch1000ZlibConfirmLater  thrpt    5     0.181 ±   0.003  ops/s
SessionBenchmark.sendReceiveBatch1000ZlibConfirmNow    thrpt    5     0.238 ±   0.009  ops/s
SessionBenchmark.sendReceiveBatch100ConfirmLater       thrpt    5     8.246 ±   0.714  ops/s
SessionBenchmark.sendReceiveBatch100ConfirmNow         thrpt    5     8.279 ±   0.657  ops/s
SessionBenchmark.sendReceiveBatch100ZlibConfirmNow     thrpt    5     2.242 ±   0.074  ops/s
SessionBenchmark.sendReceiveBatch800ZlibConfirmLater   thrpt    5     0.226 ±   0.005  ops/s

Except for the individual 512B messages, it looks like roughly 2x - 4x higher throughput for uncompressed payloads.
For Zlib, I think there is still some bottleneck, as the increase isn't so pronounced.
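
For reference, the per-benchmark ratios can be computed directly from the two tables. The sketch below is just that arithmetic, using a hypothetical helper class and a handful of rows copied from the Before and After scores above; it ignores the reported error margins.

```java
// Hypothetical helper for comparing the Before/After JMH scores.
final class SpeedupSketch {
    // Ratio of the After score to the Before score; above 1.0 means higher throughput.
    static double speedup(double before, double after) {
        return after / before;
    }

    public static void main(String[] args) {
        String[] names = {
            "sendReceive512B", "sendReceive512KiB", "sendReceive5MiB",
            "sendReceive60MiB", "sendReceive5MiB_Zlib", "sendReceive60MiB_Zlib"
        };
        // {before ops/s, after ops/s}, copied from the tables above.
        double[][] scores = {
            {1690.186, 1881.041}, {6.356, 18.013}, {12.180, 34.717},
            {1.013, 3.089}, {15.087, 16.922}, {1.358, 1.503}
        };
        for (int i = 0; i < names.length; i++) {
            System.out.printf("%-24s %.2fx%n", names[i],
                    speedup(scores[i][0], scores[i][1]));
        }
    }
}
```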

@sgalichkin sgalichkin assigned 678098 and unassigned sgalichkin Mar 15, 2024
3 participants