Skip to content

Commit

Permalink
better timeout handling in the SPARQLProtocolWorker
Browse files Browse the repository at this point in the history
* SPARQLProtocolWorker now shuts down the httpClients executor if the task is still running after a timeout, for this the httpclient receives its own ExecutorService now
* the responseBodybbaos will now be reset when reused
* implements parseResults variable
  • Loading branch information
nck-mlcnv committed Sep 4, 2023
1 parent f68ba3f commit d745dc9
Showing 1 changed file with 27 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ public boolean successful() {
}


private final HttpClient httpClient;
private final ExecutorService executor;
private HttpClient httpClient;
private final ThreadPoolExecutor executor;

private final XXHashFactory hasherFactory = XXHashFactory.fastestJavaInstance();
private final RequestFactory requestFactory;
Expand All @@ -178,22 +178,18 @@ public boolean successful() {
// used to read the http response body
private byte[] buffer = new byte[4096];


@Override
public Config config() {
return (Config) config;
return (SPARQLProtocolWorker.Config) config;
}


public SPARQLProtocolWorker(long workerId, ResponseBodyProcessor responseBodyProcessor, Config config) {
super(workerId, responseBodyProcessor, config);
this.responseBodyProcessor = responseBodyProcessor;
this.executor = Executors.newFixedThreadPool(2);
this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
this.requestFactory = new RequestFactory(config().requestType());
this.httpClient = HttpClient.newBuilder()
.executor(this.executor)
.followRedirects(HttpClient.Redirect.ALWAYS)
.connectTimeout(config().timeout())
.build();
this.httpClient = buildHttpClient();
}


Expand Down Expand Up @@ -224,10 +220,10 @@ public CompletableFuture<Result> start() {
}
}
} catch (IOException | URISyntaxException e) {
throw new RuntimeException(e);
throw new RuntimeException(e); // TODO: better error handling
}

return new Result(this.workerId, executionStats);
return new Result(this.workerID, executionStats);
}, executor);
}

Expand All @@ -237,14 +233,15 @@ private ExecutionStats executeQuery(Duration timeout, boolean discardOnFailure)
if (result.response().isPresent())
statuscode = Optional.of(result.response().get().statusCode());

if (result.successful()) { // 2xx
if (result.successful() && this.config.parseResults()) { // 2xx
// process result
if (!responseBodyProcessor.add(result.actualContentLength().getAsLong(), result.hash().getAsLong(), result.outputStream().get())) {
this.responseBodybbaos = result.outputStream().get();
} else {
this.responseBodybbaos = new BigByteArrayOutputStream();
}
}
this.responseBodybbaos.reset();

if (!result.completed() && discardOnFailure) {
return null;
Expand All @@ -271,6 +268,11 @@ private HttpExecutionResult executeHttpRequest(Duration timeout) throws IOExcept
config().acceptHeader()
);

if (((ThreadPoolExecutor) this.httpClient.executor().get()).getActiveCount() != 0) {
((ThreadPoolExecutor) this.httpClient.executor().get()).shutdownNow();
this.httpClient = buildHttpClient();
}

final Instant requestStart = Instant.now();
BiFunction<HttpResponse<InputStream>, Exception, HttpExecutionResult> createFailedResult = (response, e) -> {
final Duration requestDuration = Duration.between(requestStart, Instant.now());
Expand All @@ -295,14 +297,17 @@ private HttpExecutionResult executeHttpRequest(Duration timeout) throws IOExcept
try (var hasher = hasherFactory.newStreamingHash64(0)) {
int readBytes;
while ((readBytes = bodyStream.readNBytes(this.buffer, 0, this.buffer.length)) != 0) {
if (Duration.between(requestStart, requestStart.plus(timeout)).isNegative()) {
return createFailedResult.apply(httpResponse, new TimeoutException());
}
hasher.update(this.buffer, 0, readBytes);
this.responseBodybbaos.write(this.buffer, 0, readBytes);
}

if (contentLength.isPresent() &&
(this.responseBodybbaos.size() < contentLength.getAsLong() ||
this.responseBodybbaos.size() > contentLength.getAsLong())) {
return createFailedResult.apply(httpResponse, null); // TODO: custom exception maybe?
return createFailedResult.apply(httpResponse, new ProtocolException("Content-Length header value doesn't match actual content length."));
}

return new HttpExecutionResult(
Expand All @@ -328,4 +333,12 @@ private HttpExecutionResult executeHttpRequest(Duration timeout) throws IOExcept
return createFailedResult.apply(null, e);
}
}

private HttpClient buildHttpClient() {
return HttpClient.newBuilder()
.executor(Executors.newFixedThreadPool(1))
.followRedirects(HttpClient.Redirect.ALWAYS)
.connectTimeout(config().timeout())
.build();
}
}

0 comments on commit d745dc9

Please sign in to comment.