Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for ahead-of-time compilation #248

Merged
merged 111 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
111 commits
Select commit Hold shift + click to select a range
734b261
Add more logging messages
nck-mlcnv Dec 5, 2023
8a62c90
Fix log4j2 configuration
nck-mlcnv Dec 5, 2023
07b3052
Implement apache HTTP client
nck-mlcnv Jan 12, 2024
78408b9
Implement apache HTTP async client 5
nck-mlcnv Jan 12, 2024
476f528
Fix timeout
nck-mlcnv Jan 12, 2024
9587d60
Fixes
nck-mlcnv Feb 6, 2024
1831b60
Fix hashing bug
nck-mlcnv Feb 7, 2024
a9ca2a5
Fix conversion of byte stream to string
nck-mlcnv Feb 8, 2024
48be6c1
Implement POST request streaming
nck-mlcnv Feb 8, 2024
744b09e
Disable the storing and hashing of responses when the parseResults pa…
nck-mlcnv Feb 13, 2024
d454f6b
Move utility classes
nck-mlcnv Feb 14, 2024
d517e9c
StreamEntityProducer can send fixed-sized data and is reproducible now
nck-mlcnv Feb 16, 2024
c238a40
Make QueryHandler return stream supplier and info about query being c…
nck-mlcnv Feb 16, 2024
d9190cb
Change RequestFactory behavior
nck-mlcnv Feb 16, 2024
c25b560
Cleanup
nck-mlcnv Feb 16, 2024
94bcb3c
Preload requests
nck-mlcnv Feb 16, 2024
52ce0bb
Fix IDE warnings
nck-mlcnv Feb 16, 2024
e2f8199
Fix tests
nck-mlcnv Feb 16, 2024
fc523f0
Remove unneeded test class
nck-mlcnv Feb 16, 2024
bac8488
Add Javadocs
nck-mlcnv Feb 16, 2024
5480342
Add the GraalVM native-maven-plugin for ahead-of-time compilation
nck-mlcnv Feb 14, 2024
4823a8b
Switch to Logback implementation of SLF4J, as Log4j2 is not supported…
nck-mlcnv Feb 14, 2024
986423b
Update native-maven-plugin version
nck-mlcnv Feb 20, 2024
10abf6c
Native-image builder optimizations
nck-mlcnv Feb 20, 2024
57a5217
Merge branch 'develop' into feature/ahead-of-time-compiler
nck-mlcnv Mar 21, 2024
75810ce
Remove pre-made graalvm config
nck-mlcnv Mar 26, 2024
9cc4d7c
Update native profile
nck-mlcnv Mar 26, 2024
f970759
Catch exceptions inside TriplestoreStorage
nck-mlcnv Mar 27, 2024
4611e11
Reset workerId after warmup
nck-mlcnv Mar 27, 2024
eddb49f
Update native image plugin configuration
nck-mlcnv Mar 27, 2024
69538a9
Add scripts for working with native images
nck-mlcnv Mar 27, 2024
5654ee5
Remove spring
nck-mlcnv Apr 24, 2024
f10c504
Rename directory
nck-mlcnv May 22, 2024
3860a3b
Add test workflow
nck-mlcnv May 22, 2024
ad9d99b
Fix permissions
nck-mlcnv May 22, 2024
c3f4606
Remove periods
nck-mlcnv May 22, 2024
180e725
Fix script
nck-mlcnv May 22, 2024
1e4e328
Fix workflow
nck-mlcnv May 22, 2024
caa8805
Update workflow
nck-mlcnv May 22, 2024
a25ec5e
Test directory upload
nck-mlcnv May 22, 2024
fc97825
Update workflows
nck-mlcnv May 22, 2024
d5cc274
Update Test Workflow
nck-mlcnv May 22, 2024
2c88c0e
Fix workflow
nck-mlcnv May 22, 2024
a80beb6
Another fix
nck-mlcnv May 22, 2024
677ccdd
Rename job
nck-mlcnv May 22, 2024
069c7ae
Remove test workflow
nck-mlcnv May 23, 2024
749dfa7
Make workerID go out of scope
nck-mlcnv Jun 11, 2024
151af88
Add comment for registering LanguageProcessors
nck-mlcnv Jun 11, 2024
9b876b1
Clean up logging config
nck-mlcnv Jun 11, 2024
7d84ac0
Fix deploy workflow
nck-mlcnv Jun 11, 2024
286529e
Merge branch 'develop' into feature/ahead-of-time-compiler
nck-mlcnv Jun 11, 2024
8c721d1
Disable non supported tests
nck-mlcnv Jun 13, 2024
8d0adfd
Update pom.xml to automatically generate configuration files for nati…
nck-mlcnv Jun 13, 2024
5d38131
Update workflows
nck-mlcnv Jun 13, 2024
f3a641c
Update documentation
nck-mlcnv Jun 13, 2024
c99582a
Fix symlink
nck-mlcnv Jun 13, 2024
bcd0f9c
Add cpu micro architectures
nck-mlcnv Jun 13, 2024
8628f86
Add cpu micro architectures 2
nck-mlcnv Jun 13, 2024
fdcb710
Update generate-config.sh
nck-mlcnv Jun 17, 2024
e80516e
Fix unstable tests
nck-mlcnv Jun 28, 2024
70f845f
Fix regex cleanup
nck-mlcnv Jun 28, 2024
9fbf565
Enable long running tests on environment variable
nck-mlcnv Jun 28, 2024
70d0a2b
Increase the thread count for the apache http client
nck-mlcnv Jul 4, 2024
3b5c344
Disable re-usage of bbaos and create bbaos of optimal size when possible
nck-mlcnv Jul 4, 2024
871a874
Try to fix something
nck-mlcnv Jul 4, 2024
5daf544
Debug logging
nck-mlcnv Jul 4, 2024
89cb31b
Debug logging 2
nck-mlcnv Jul 4, 2024
a03ac9f
Attempt to fix something
nck-mlcnv Jul 4, 2024
1975ac4
Attempt to fix something 2
nck-mlcnv Jul 4, 2024
f910cec
Attempt to fix something 3
nck-mlcnv Jul 4, 2024
858fc80
Attempt to fix something 4
nck-mlcnv Jul 5, 2024
f35f9aa
Attempt to fix something 5
nck-mlcnv Jul 5, 2024
eeaf789
Attempt to fix something 6
nck-mlcnv Jul 5, 2024
e23b02a
Make thread dump
nck-mlcnv Jul 5, 2024
bd4669f
Make thread dump 2
nck-mlcnv Jul 5, 2024
26867ad
Attempt to fix something 7
nck-mlcnv Jul 5, 2024
1c062ba
Attempt to fix something 8
nck-mlcnv Jul 5, 2024
7ff7acd
Attempt to fix something 9
nck-mlcnv Jul 5, 2024
b613a47
Attempt to fix something 10
nck-mlcnv Jul 5, 2024
ac82a6d
Attempt to fix something 11
nck-mlcnv Jul 5, 2024
2650bfc
Finetuning test
nck-mlcnv Jul 5, 2024
64d7445
Finetuning test 2
nck-mlcnv Jul 5, 2024
ec4749c
Cleanup httpclient configuration
nck-mlcnv Jul 5, 2024
632cce9
Cleanup tests
nck-mlcnv Jul 5, 2024
3bb2e7e
Disable compressed references by default
nck-mlcnv Jul 16, 2024
04b85fe
Remove test configurations
nck-mlcnv Jul 17, 2024
04158d0
Re-enable configurations and decrease timeout in tests
nck-mlcnv Jul 17, 2024
a9f3115
Add workaround for failing tests
nck-mlcnv Jul 17, 2024
6bedf3c
Adjust test configurations
nck-mlcnv Jul 18, 2024
305bc59
Adjust test configurations 2
nck-mlcnv Jul 18, 2024
5488327
Adjust test configurations 3
nck-mlcnv Jul 18, 2024
9bf8cc8
Adjust test configurations 4
nck-mlcnv Jul 18, 2024
25a7ca6
Revert "Adjust test configurations 4"
nck-mlcnv Jul 18, 2024
69b0c0b
Shorten http client configuration
nck-mlcnv Jul 19, 2024
b6928e5
Add ByteArrayList output and inputstream
nck-mlcnv Jul 19, 2024
feb8a18
Update SPARQLProtocolWorker to use ByteArrayListOutputStream when res…
nck-mlcnv Jul 19, 2024
24c5b55
Merge branch 'develop' into feature/ahead-of-time-compiler
nck-mlcnv Jul 19, 2024
2f12e49
Fix bad merge conflict resolve
nck-mlcnv Jul 19, 2024
deea528
Fix size calculation in ByteArrayListOutputStream
nck-mlcnv Jul 19, 2024
09e73a8
Add test + fix for ByteArrayListInputStream
nck-mlcnv Jul 22, 2024
19d0282
Add test for ByteArrayListOutputStream
nck-mlcnv Jul 24, 2024
97cfa4c
Change single log message
nck-mlcnv Jul 24, 2024
bedf55a
Update exception handling in TriplestoreStorage
nck-mlcnv Jul 24, 2024
2214fd8
Add execution parameter to configuration generation
nck-mlcnv Jul 24, 2024
ff6d771
Fix dry-run parameter
nck-mlcnv Jul 24, 2024
edf2a26
Add comment in TriplestoreStorage
nck-mlcnv Jul 24, 2024
d842fa1
Change behavior of ByteArrayListInputStream
nck-mlcnv Jul 24, 2024
38344ab
Add comments and access modifiers
nck-mlcnv Jul 24, 2024
1d28690
Update src/main/java/org/aksw/iguana/cc/storage/impl/TriplestoreStora…
nck-mlcnv Jul 25, 2024
cd25e5c
Update github workflow
nck-mlcnv Jul 25, 2024
5ee69a1
Merge remote-tracking branch 'origin/feature/ahead-of-time-compiler' …
nck-mlcnv Jul 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions graalvm/generate-config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ if [ -f "$TARGET_DIR"/native/agent-output/test/resource-config.json ]; then
fi

# Run through multiple different execution paths, so that the tracing agent can generate complete configuration files.
"$GRAALVM_HOME"/bin/java -agentlib:native-image-agent=config-merge-dir=src/main/resources/META-INF/native-image/ -jar "$TARGET_DIR"/iguana.jar --help > /dev/null
"$GRAALVM_HOME"/bin/java -agentlib:native-image-agent=config-merge-dir=src/main/resources/META-INF/native-image/ -jar "$TARGET_DIR"/iguana.jar -is "$SUITE" > /dev/null
"$GRAALVM_HOME"/bin/java -agentlib:native-image-agent=config-merge-dir=src/main/resources/META-INF/native-image/ -jar "$TARGET_DIR"/iguana.jar "$SUITE" > /dev/null
"$GRAALVM_HOME"/bin/java -agentlib:native-image-agent=config-merge-dir=src/main/resources/META-INF/native-image/ -jar "$TARGET_DIR"/iguana.jar --help > /dev/null
"$GRAALVM_HOME"/bin/java -agentlib:native-image-agent=config-merge-dir=src/main/resources/META-INF/native-image/ -jar "$TARGET_DIR"/iguana.jar --dry-run -is "$SUITE" > /dev/null
"$GRAALVM_HOME"/bin/java -agentlib:native-image-agent=config-merge-dir=src/main/resources/META-INF/native-image/ -jar "$TARGET_DIR"/iguana.jar --dry-run "$SUITE" > /dev/null

# there is a bug in the tracing agent that outputs wrong formatted lines in the resource-config.json file (https://github.com/oracle/graal/issues/7985)
sed 's/\\\\E//g' src/main/resources/META-INF/native-image/resource-config.json | sed 's/\\\\Q//g' > src/main/resources/META-INF/native-image/resource-config.json.tmp
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@
-march=x86-64-v3
nck-mlcnv marked this conversation as resolved.
Show resolved Hide resolved
--no-fallback
-O3
-H:-UseCompressedReferences
</buildArgs>
<metadataRepository>
<enabled>true</enabled>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ public Path convert(String value) {
@Parameter(names = {"--ignore-schema", "-is"}, description = "Do not check the schema before parsing the suite file.")
private boolean ignoreShema = false;

@Parameter(names = {"--dry-run", "-d"}, hidden = true)
public static boolean dryRun = false;

bigerl marked this conversation as resolved.
Show resolved Hide resolved
@Parameter(names = "--help", help = true)
private boolean help;

@Parameter(description = "suite file {yml,yaml,json}", arity = 1, required = true, converter = PathConverter.class)
private Path suitePath;
}


private static final Logger LOGGER = LoggerFactory.getLogger(MainController.class);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import org.aksw.iguana.cc.config.elements.StorageConfig;
import org.aksw.iguana.cc.controller.MainController;
import org.aksw.iguana.cc.storage.Storage;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
Expand All @@ -17,6 +18,7 @@
import org.apache.jena.update.UpdateFactory;
import org.apache.jena.update.UpdateProcessor;
import org.apache.jena.update.UpdateRequest;
import org.mortbay.jetty.Main;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -79,10 +81,14 @@ public void storeResult(Model data) {
//submit Block to Triple Store
UpdateProcessor processor = UpdateExecutionFactory
.createRemote(blockRequest, endpoint, createHttpClient());
try {
if (MainController.Args.dryRun) {
nck-mlcnv marked this conversation as resolved.
Show resolved Hide resolved
try {
processor.execute();
} catch (Exception e) {
logger.error("Error while storing data in triplestore: " + e.getMessage());
}
} else {
processor.execute();
} catch (Exception e) {
logger.error("Error while storing data in triplestore: " + e.getMessage());
}
blockRequest = new UpdateRequest();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package org.aksw.iguana.cc.worker;

import org.aksw.iguana.cc.lang.LanguageProcessor;
import org.aksw.iguana.commons.io.BigByteArrayInputStream;
import org.aksw.iguana.commons.io.BigByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -46,18 +45,18 @@ public ResponseBodyProcessor(String contentType) {
private final ThreadPoolExecutor executor;
private final ScheduledExecutorService executorHandler = Executors.newScheduledThreadPool(1);

public boolean add(long contentLength, long xxh64, BigByteArrayOutputStream bbaos) {
public boolean add(long contentLength, long xxh64, InputStream responseBodyStream) {
final var key = new Key(contentLength, xxh64);
if (seenResponseBodies.add(key)) {
submit(key, bbaos);
submit(key, responseBodyStream);
return true;
}
return false;
}

private void submit(Key key, BigByteArrayOutputStream bigByteArrayOutputStream) {
private void submit(Key key, InputStream responseBodyStream) {
final var future = executor.submit(() -> {
var processingResult = languageProcessor.process(new BigByteArrayInputStream(bigByteArrayOutputStream), key.xxh64);
var processingResult = languageProcessor.process(responseBodyStream, key.xxh64);
responseDataMetrics.add(processingResult);
});
executorHandler.schedule(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
import org.aksw.iguana.cc.worker.ResponseBodyProcessor;
import org.aksw.iguana.cc.worker.HttpWorker;
import org.aksw.iguana.commons.io.BigByteArrayOutputStream;
import org.aksw.iguana.commons.io.ByteArrayListOutputStream;
import org.aksw.iguana.commons.io.ReversibleOutputStream;
import org.apache.hc.client5.http.async.methods.AbstractBinResponseConsumer;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.DefaultConnectionKeepAliveStrategy;
import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
Expand All @@ -22,11 +23,7 @@
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
import org.apache.hc.core5.pool.PoolReusePolicy;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;
Expand Down Expand Up @@ -77,7 +74,7 @@ record HttpExecutionResult(
Optional<HttpResponse> response,
Instant requestStart,
Duration duration,
Optional<BigByteArrayOutputStream> outputStream,
Optional<ReversibleOutputStream> outputStream,
OptionalLong actualContentLength,
OptionalLong hash,
Optional<Exception> exception
Expand Down Expand Up @@ -131,28 +128,19 @@ public static void initHttpClient(int threadCount) {
connectionManager = PoolingAsyncClientConnectionManagerBuilder.create()
.setMaxConnTotal(threadCount * 1000)
.setMaxConnPerRoute(threadCount * 1000)
.setPoolConcurrencyPolicy(PoolConcurrencyPolicy.LAX)
.setConnPoolPolicy(PoolReusePolicy.FIFO)
.setDefaultConnectionConfig(org.apache.hc.client5.http.config.ConnectionConfig.custom()
.setConnectTimeout(Timeout.ofSeconds(5))
.build())
.build();
final var ioReactorConfig = IOReactorConfig.custom()
.setTcpNoDelay(true)
.setIoThreadCount(threadCount + 1)
.setSelectInterval(TimeValue.ofMilliseconds(100))
.setSoKeepAlive(true)
.build();
httpClient = HttpAsyncClients.custom()
.setConnectionManager(connectionManager)
.setIOReactorConfig(ioReactorConfig)
.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy())
.setConnectionManagerShared(false)
.setDefaultRequestConfig(RequestConfig.custom()
.setContentCompressionEnabled(false)
.setHardCancellationEnabled(true)
.build())
.setRetryStrategy(DefaultHttpRequestRetryStrategy.INSTANCE)
.build();
httpClient.start();
}
Expand Down Expand Up @@ -264,7 +252,7 @@ private ExecutionStats executeQuery(Duration timeout, boolean discardOnFailure)
}

// process result
responseBodyProcessor.add(result.actualContentLength().orElse(-1), result.hash().orElse(-1), result.outputStream().orElse(new BigByteArrayOutputStream()));
responseBodyProcessor.add(result.actualContentLength().getAsLong(), result.hash().getAsLong(), result.outputStream.get().toInputStream());
}

if (!result.successful() && discardOnFailure) {
Expand Down Expand Up @@ -328,7 +316,7 @@ private HttpExecutionResult executeHttpRequest(Duration timeout) {
private final StreamingXXHash64 hasher = hasherFactory.newStreamingHash64(0);
private long responseSize = 0; // will be used if parseResults is false
private long responseEnd = 0; // time in nanos
private BigByteArrayOutputStream responseBodybbaos = null;
private ReversibleOutputStream responseBody = null;

@Override
public void releaseResources() {} // nothing to release
Expand All @@ -349,22 +337,22 @@ protected void data(ByteBuffer src, boolean endOfStream) throws IOException {
if (endOfStream)
responseEnd = System.nanoTime();

if (responseBodybbaos == null)
responseBodybbaos = new BigByteArrayOutputStream();
if (responseBody == null)
responseBody = new ByteArrayListOutputStream();

responseSize += src.remaining();
if (config.parseResults()) {
// if the buffer uses an array, use the array directly
if (src.hasArray()) {
hasher.update(src.array(), src.position() + src.arrayOffset(), src.remaining());
responseBodybbaos.write(src.array(), src.position() + src.arrayOffset(), src.remaining());
responseBody.write(src.array(), src.position() + src.arrayOffset(), src.remaining());
} else { // otherwise, copy the buffer to an array
int readCount;
while (src.hasRemaining()) {
readCount = Math.min(BUFFER_SIZE, src.remaining());
src.get(buffer, 0, readCount);
hasher.update(buffer, 0, readCount);
responseBodybbaos.write(buffer, 0, readCount);
responseBody.write(buffer, 0, readCount);
}
}
}
Expand All @@ -383,8 +371,8 @@ protected void start(HttpResponse response, ContentType contentType) {
final var contentLengthHeader = response.getFirstHeader("Content-Length");
Long contentLength = contentLengthHeader != null ? Long.parseLong(contentLengthHeader.getValue()) : null;
// if the content length is known, create a BigByteArrayOutputStream with the known length
if (contentLength != null && responseBodybbaos == null && config.parseResults()) {
responseBodybbaos = new BigByteArrayOutputStream(contentLength);
if (contentLength != null && responseBody == null && config.parseResults()) {
responseBody = new BigByteArrayOutputStream(contentLength);
}
}

Expand Down Expand Up @@ -412,10 +400,10 @@ protected HttpExecutionResult buildResult() {
Long contentLength = contentLengthHeader != null ? Long.parseLong(contentLengthHeader.getValue()) : null;
if (contentLength != null) {
if ((!config.parseResults() && responseSize != contentLength) // if parseResults is false, the responseSize will be used
|| (config.parseResults() && responseBodybbaos.size() != contentLength)) { // if parseResults is true, the size of the bbaos will be used
if (responseSize != responseBodybbaos.size())
LOGGER.error("Error during copying the response data. (expected written data size = {}, actual written data size = {}, Content-Length-Header = {})", responseSize, responseBodybbaos.size(), contentLengthHeader.getValue());
final var exception = new HttpException(String.format("Content-Length header value doesn't match actual content length. (Content-Length-Header = %s, written data size = %s)", contentLength, config.parseResults() ? responseBodybbaos.size() : responseSize));
|| (config.parseResults() && responseBody.size() != contentLength)) { // if parseResults is true, the size of the bbaos will be used
if (responseSize != responseBody.size())
LOGGER.error("Error during copying the response data. (expected written data size = {}, actual written data size = {}, Content-Length-Header = {})", responseSize, responseBody.size(), contentLengthHeader.getValue());
final var exception = new HttpException(String.format("Content-Length header value doesn't match actual content length. (Content-Length-Header = %s, written data size = %s)", contentLength, config.parseResults() ? responseBody.size() : responseSize));
return createFailedResultDuringResponse(queryIndex, response, timeStamp, duration, exception);
}
}
Expand All @@ -431,8 +419,8 @@ protected HttpExecutionResult buildResult() {
Optional.of(response),
timeStamp,
Duration.ofNanos(responseEnd - requestStart),
Optional.of(responseBodybbaos),
OptionalLong.of(config.parseResults() ? responseBodybbaos.size() : responseSize),
Optional.of(responseBody),
OptionalLong.of(config.parseResults() ? responseBody.size() : responseSize),
OptionalLong.of(config.parseResults() ? hasher.getValue() : 0),
Optional.empty()
);
Expand All @@ -451,7 +439,7 @@ protected HttpExecutionResult buildResult() {
return createFailedResultBeforeRequest(queryIndex, e);
} catch (TimeoutException e) {
if (future.isDone()) {
LOGGER.warn("Request was already done after timeout.");
LOGGER.warn("Request finished immediately after timeout but will still be counted as timed out.");
try {
return future.get();
} catch (InterruptedException | ExecutionException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;

import java.io.IOException;
import java.io.OutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand All @@ -22,7 +22,7 @@
* the stream is cleared, all the internal ByteArrayOutputStreams are cleared and a new one is
* added to the list.
*/
public class BigByteArrayOutputStream extends OutputStream {
public class BigByteArrayOutputStream extends ReversibleOutputStream {

/**
* The maximum size limit for an array. This is no limit to the amount of bytes {@code BigByteArrayOutputStream} can consume.
Expand Down Expand Up @@ -102,6 +102,7 @@ public void write(BigByteArrayOutputStream bbaos) throws IOException {
write(bbaos.toByteArray());
}

@Override
public long size() {
return baosList.stream().mapToLong(ByteArrayOutputStream::size).sum();
}
Expand Down Expand Up @@ -201,4 +202,9 @@ public void clear() throws IOException {
public void close() throws IOException {
this.closed = true;
}

@Override
public InputStream toInputStream() {
return new BigByteArrayInputStream(this);
}
}
Loading
Loading