Skip to content

Commit

Permalink
Apache HTTP Client 5 implementation (#243)
Browse files Browse the repository at this point in the history
* Add more logging messages

* Fix log4j2 configuration

* Implement Apache HTTP client

* Implement Apache HTTP async client 5

* Fix timeout

* Fixes

* Fix hashing bug

* Fix conversion of byte stream to string

* Implement POST request streaming

* Disable the storing and hashing of responses when the parseResults parameter in the config is false

* Move utility classes

* StreamEntityProducer can now send fixed-size data and is reproducible

* Make QueryHandler return stream supplier and info about query being cached

* Change RequestFactory behavior

* cached queries will be sent as fixed-size requests
* requests of cached queries will be cached as well (addresses #223)

* Cleanup

* Preload requests

* Fix IDE warnings

* Fix tests

* Remove unneeded test class

* Add Javadocs

* Change requests

* Move the RequestFactory to a separate class and add comments

* Add comments from overridden methods

* Lower maximum capacity while reading response
  • Loading branch information
nck-mlcnv authored Feb 23, 2024
1 parent 9c293d6 commit a92d239
Show file tree
Hide file tree
Showing 20 changed files with 730 additions and 421 deletions.
22 changes: 6 additions & 16 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,11 @@
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
Expand All @@ -123,12 +113,6 @@
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
Expand Down Expand Up @@ -176,6 +160,12 @@
<artifactId>spring-context</artifactId>
<version>6.0.11</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.3</version>
</dependency>

</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import com.beust.jcommander.*;
import org.aksw.iguana.cc.suite.IguanaSuiteParser;
import org.aksw.iguana.cc.suite.Suite;
import org.apache.logging.log4j.core.config.Configurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;


Expand Down Expand Up @@ -44,6 +46,10 @@ public Path convert(String value) {
* @param argc The command line arguments that are passed to the program.
*/
public static void main(String[] argc) {
// Apparently, there is something weird going on, where the apache jena library already configures log4j2 for
// some reason. That's why you have to call reconfigure here.
Configurator.reconfigure(URI.create("log4j2.yml"));

var args = new Args();
JCommander jc = JCommander.newBuilder()
.addObject(args)
Expand Down
21 changes: 18 additions & 3 deletions src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.fasterxml.jackson.annotation.*;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.aksw.iguana.cc.query.selector.QuerySelector;
Expand All @@ -23,6 +22,7 @@
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Objects;
import java.util.function.Supplier;

/**
* The QueryHandler is used by every worker that extends the AbstractWorker.
Expand Down Expand Up @@ -124,7 +124,7 @@ public String value() {
}

public record QueryStringWrapper(int index, String query) {}
public record QueryStreamWrapper(int index, InputStream queryInputStream) {}
public record QueryStreamWrapper(int index, boolean cached, Supplier<InputStream> queryInputStreamSupplier) {}


protected final Logger LOGGER = LoggerFactory.getLogger(QueryHandler.class);
Expand Down Expand Up @@ -180,7 +180,13 @@ public QueryStringWrapper getNextQuery(QuerySelector querySelector) throws IOExc

public QueryStreamWrapper getNextQueryStream(QuerySelector querySelector) throws IOException {
final var queryIndex = querySelector.getNextIndex();
return new QueryStreamWrapper(queryIndex, this.queryList.getQueryStream(queryIndex));
return new QueryStreamWrapper(queryIndex, config.caching(), () -> {
try {
return this.queryList.getQueryStream(queryIndex);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

@Override
Expand Down Expand Up @@ -209,4 +215,13 @@ public String[] getAllQueryIds() {
}
return out;
}

/**
 * Gives callers access to the configuration held by this QueryHandler.
 *
 * @return the configuration object of this QueryHandler
 */
public Config getConfig() {
    return this.config;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package org.aksw.iguana.cc.query.source;

import org.aksw.iguana.cc.utils.FileUtils;
import org.aksw.iguana.cc.utils.files.FileUtils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.aksw.iguana.cc.query.source.impl;

import org.aksw.iguana.cc.utils.FileUtils;
import org.aksw.iguana.cc.utils.files.FileUtils;

import java.io.IOException;
import java.nio.file.Path;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.aksw.iguana.cc.query.source.impl;

import org.aksw.iguana.cc.query.source.QuerySource;
import org.aksw.iguana.cc.utils.IndexedQueryReader;
import org.aksw.iguana.cc.utils.files.IndexedQueryReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.aksw.iguana.cc.query.source.impl;

import org.aksw.iguana.cc.query.source.QuerySource;
import org.aksw.iguana.cc.utils.FileUtils;
import org.aksw.iguana.cc.utils.files.FileUtils;
import org.apache.commons.io.input.AutoCloseInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/aksw/iguana/cc/suite/Suite.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ else if (storageConfig instanceof RDFFileStorage.Config) {

public void run() {
for (int i = 0; i < tasks.size(); i++) {
LOGGER.info("Task/{} {} starting.", tasks.get(i).getTaskName(), i);
tasks.get(i).run();
LOGGER.info("Task/{} {} finished.", tasks.get(i).getTaskName(), i);
}
Expand Down
9 changes: 8 additions & 1 deletion src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,15 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody
}

public void run() {
var warmupResults = executeWorkers(warmupWorkers); // warmup results will be dismissed
if (!warmupWorkers.isEmpty()) {
SPARQLProtocolWorker.initHttpClient(warmupWorkers.size());
var warmupResults = executeWorkers(warmupWorkers); // warmup results will be dismissed
SPARQLProtocolWorker.closeHttpClient();
}

SPARQLProtocolWorker.initHttpClient(workers.size());
var results = executeWorkers(workers);
SPARQLProtocolWorker.closeHttpClient();

srp.process(results.workerResults);
srp.calculateAndSaveMetrics(results.startTime, results.endTime);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.aksw.iguana.cc.utils;
package org.aksw.iguana.cc.utils.files;

import java.io.*;
import java.nio.charset.StandardCharsets;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.aksw.iguana.cc.utils;
package org.aksw.iguana.cc.utils.files;

import org.apache.commons.io.input.AutoCloseInputStream;
import org.apache.commons.io.input.BoundedInputStream;
Expand Down
144 changes: 144 additions & 0 deletions src/main/java/org/aksw/iguana/cc/utils/http/RequestFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package org.aksw.iguana.cc.utils.http;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import org.aksw.iguana.cc.config.elements.ConnectionConfig;
import org.aksw.iguana.cc.query.handler.QueryHandler;
import org.aksw.iguana.cc.worker.HttpWorker;
import org.aksw.iguana.cc.worker.impl.SPARQLProtocolWorker;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
import org.apache.hc.core5.net.URIBuilder;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * A factory for creating HTTP requests.
 * The factory can create requests for different types of HTTP methods and different types of SPARQL queries.
 * The factory can also cache requests to avoid creating the same request multiple times.
 * <p>
 * NOTE(review): the request cache is a plain {@link HashMap} without synchronization; this presumably relies on
 * every factory instance being used by a single worker thread — confirm against the callers.
 */
public final class RequestFactory {
    /**
     * The kinds of requests this factory can build. The string value of each constant is the
     * representation used by Jackson (see {@link #value()}).
     */
    public enum RequestType {
        GET_QUERY("get query"),
        POST_URL_ENC_QUERY("post url-enc query"),
        POST_QUERY("post query"),
        POST_URL_ENC_UPDATE("post url-enc update"),
        POST_UPDATE("post update");

        private final String value;

        @JsonCreator
        RequestType(String value) {
            // falls back to the GET variant when no value is given
            this.value = Objects.requireNonNullElse(value, "get query");
        }

        @JsonValue
        public String value() {
            return value;
        }
    }

    private final RequestType requestType;

    /** Requests of cached queries, keyed by the index of the query. */
    private final Map<Integer, AsyncRequestProducer> cache = new HashMap<>();

    /**
     * Creates a factory that builds requests of the given type.
     *
     * @param requestType the type of request that {@link #buildHttpRequest} will create
     */
    public RequestFactory(RequestType requestType) {
        this.requestType = requestType;
    }

    /**
     * Joins the given name/value pairs into a url-encoded form body.
     * Only the values are encoded, the names are used as they are.
     *
     * @param parameters the pairs to encode, each array holding the name at index 0 and the value at index 1
     * @return the pairs in the form {@code name=value}, joined with {@code &}
     */
    private static String urlEncode(List<String[]> parameters) {
        return parameters.stream()
                .map(pair -> urlEncode(pair[0], pair[1]))
                .collect(Collectors.joining("&"));
    }

    /**
     * Creates a single url-encoded form parameter. Only the value is encoded.
     *
     * @param name  the name of the parameter
     * @param value the raw value of the parameter
     * @return the parameter in the form {@code name=value}
     */
    private static String urlEncode(String name, String value) {
        return name + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    /**
     * Opens the query stream of the given query handle.
     * Runtime exceptions thrown while obtaining the stream are rethrown as {@link IOException}.
     *
     * @param queryHandle the query handle that supplies the stream
     * @return a stream over the query, the caller is responsible for closing it
     * @throws IOException if the stream supplier fails with a runtime exception
     */
    private static InputStream openQueryStream(QueryHandler.QueryStreamWrapper queryHandle) throws IOException {
        try {
            return queryHandle.queryInputStreamSupplier().get();
        } catch (RuntimeException e) {
            throw new IOException(e);
        }
    }

    /**
     * Reads the remaining content of the given stream as a UTF-8 string.
     *
     * @param queryStream the stream to read
     * @return the content of the stream
     * @throws IOException if the stream cannot be read
     */
    private static String readQuery(InputStream queryStream) throws IOException {
        return new String(queryStream.readAllBytes(), StandardCharsets.UTF_8);
    }

    /**
     * Creates the builder of a POST request that carries the query as a url-encoded form parameter.
     *
     * @param connection    the connection to send the request to
     * @param parameterName the name of the form parameter ("query" or "update")
     * @param query         the raw query string
     * @param chunked       whether the entity should be sent chunked
     * @return the request builder
     */
    private static AsyncRequestBuilder urlEncodedPost(ConnectionConfig connection,
                                                      String parameterName,
                                                      String query,
                                                      boolean chunked) {
        return AsyncRequestBuilder.post(connection.endpoint())
                // manually set content type, because otherwise the
                // entity producer would set it to "application/x-www-form-urlencoded; charset=ISO-8859-1"
                .setHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded")
                .setEntity(new BasicAsyncEntityProducer(urlEncode(parameterName, query), null, chunked));
    }

    /**
     * Builds an HTTP request for a given query.
     * If the query has been cached by the query handler, its content will be fully read by the entity producer into a
     * byte buffer, which will then be reused on consecutive request executions.
     * Cached requests will be sent non-chunked.
     * If the query has not been cached by the query handler, the entity producer will use the query stream supplier to
     * send the query in chunks.
     * <p>
     * The request of a cached query is built once, stored, and the very same instance is returned on this and on
     * every later call for that query index.
     *
     * @param queryHandle   the query handle containing the query and its index
     * @param connection    the connection to send the request to
     * @param requestHeader the request header
     * @return the request as an AsyncRequestProducer
     * @throws URISyntaxException if the URI is invalid
     * @throws IOException        if the query stream cannot be read
     */
    public AsyncRequestProducer buildHttpRequest(QueryHandler.QueryStreamWrapper queryHandle,
                                                 ConnectionConfig connection,
                                                 String requestHeader) throws URISyntaxException, IOException {
        if (queryHandle.cached()) {
            final AsyncRequestProducer cachedRequest = cache.get(queryHandle.index());
            if (cachedRequest != null)
                return cachedRequest;
        }

        final boolean chunked = !queryHandle.cached();
        AsyncRequestBuilder asyncRequestBuilder;

        // The stream is opened for every request type, so that an unavailable query fails early with an
        // IOException. It is only consumed by the request types that need the query as a string, the streaming
        // request types pass the supplier on. In either case it is closed when the builder has been created.
        try (InputStream queryStream = openQueryStream(queryHandle)) {
            final Supplier<InputStream> queryStreamSupplier = queryHandle.queryInputStreamSupplier();
            asyncRequestBuilder = switch (this.requestType) {
                case GET_QUERY -> AsyncRequestBuilder.get(new URIBuilder(connection.endpoint())
                        .addParameter("query", readQuery(queryStream))
                        .build());
                case POST_URL_ENC_QUERY -> urlEncodedPost(connection, "query", readQuery(queryStream), chunked);
                case POST_QUERY -> AsyncRequestBuilder.post(connection.endpoint())
                        .setEntity(new StreamEntityProducer(queryStreamSupplier, chunked, "application/sparql-query"));
                case POST_URL_ENC_UPDATE -> urlEncodedPost(connection, "update", readQuery(queryStream), chunked);
                case POST_UPDATE -> AsyncRequestBuilder.post(connection.endpoint())
                        .setEntity(new StreamEntityProducer(queryStreamSupplier, chunked, "application/sparql-update"));
                default -> throw new IllegalStateException("Unexpected value: " + this.requestType);
            };
        }

        if (requestHeader != null)
            asyncRequestBuilder.addHeader("Accept", requestHeader);
        if (connection.authentication() != null && connection.authentication().user() != null)
            asyncRequestBuilder.addHeader("Authorization",
                    HttpWorker.basicAuth(connection.authentication().user(),
                            Optional.ofNullable(connection.authentication().password()).orElse("")));

        // build only once, so that the returned request is the same instance as the cached one
        final AsyncRequestProducer request = asyncRequestBuilder.build();
        if (queryHandle.cached())
            cache.put(queryHandle.index(), request);

        return request;
    }

    /**
     * Get a cached request by the index of the query.
     * If the request is not in the cache, an IllegalArgumentException is thrown.
     *
     * @param index the index of the query
     * @return the request as an AsyncRequestProducer
     * @throws IllegalArgumentException if no request has been cached for the given index
     */
    public AsyncRequestProducer getCachedRequest(int index) {
        final AsyncRequestProducer cachedRequest = cache.get(index);
        if (cachedRequest == null)
            throw new IllegalArgumentException("No request with index " + index + " found in cache.");
        return cachedRequest;
    }
}
Loading

0 comments on commit a92d239

Please sign in to comment.