Skip to content

Commit

Permalink
Make linearly executing workers start at different positions (#286)
Browse files Browse the repository at this point in the history
* Update LinearQuerySelector to also accept starting index

* Fix minor bug concerning failed results

* Update QueryHandler and Stresstest to give each worker a starting index when executing queries linearly

* Fix the initialization of the query handlers

* Add simple test

* Remove unused variable

* Add comment back about LinearQuerySelector indexing

Closes #193
  • Loading branch information
nck-mlcnv authored Oct 11, 2024
1 parent a4f755f commit 3b0633b
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public record QueryStreamWrapper(int index, boolean cached, Supplier<InputStream
final protected QueryList queryList;

private int workerCount = 0; // give every worker inside the same worker config an offset seed
private int totalWorkerCount = 0;

final protected int hashCode;

Expand Down Expand Up @@ -186,6 +187,10 @@ public QueryHandler(Config config) throws IOException {
this.hashCode = queryList.hashCode();
}

public void setTotalWorkerCount(int workers) {
this.totalWorkerCount = workers;
}

private QueryList initializeTemplateQueryHandler(QuerySource templateSource) throws IOException {
QuerySource querySource = templateSource;
final var originalPath = templateSource.getPath();
Expand Down Expand Up @@ -238,7 +243,7 @@ private QuerySource createQuerySource(Path path) throws IOException {

public QuerySelector getQuerySelectorInstance() {
switch (config.order()) {
case LINEAR -> { return new LinearQuerySelector(queryList.size()); }
case LINEAR -> { return new LinearQuerySelector(queryList.size(), totalWorkerCount != 0 ? (queryList.size() * workerCount++) / totalWorkerCount : 0); }
case RANDOM -> { return new RandomQuerySelector(queryList.size(), config.seed() + workerCount++); }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,30 @@ public class LinearQuerySelector extends QuerySelector {

public LinearQuerySelector(int size) {
super(size);
index = -1;
index = 0;
}

public LinearQuerySelector(int size, int startIndex) {
super(size);
index = startIndex;
}

@Override
public int getNextIndex() {
index++;
if (index >= this.size) {
index = 0;
}
return index;
return index++;
}

/**
* Return the current index. This is the index of the last returned query. If no query was returned yet, it returns
* -1.
* @return
* Return the current index. This is the index of the last returned query.
* If no query was returned yet, the method will return -1.
*
* @return the current index
*/
@Override
public int getCurrentIndex() {
return index;
return index - 1;
}
}
22 changes: 20 additions & 2 deletions src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import org.aksw.iguana.cc.metrics.Metric;
import org.aksw.iguana.cc.query.handler.QueryHandler;
import org.aksw.iguana.cc.storage.Storage;
import org.aksw.iguana.cc.tasks.Task;
import org.aksw.iguana.cc.worker.HttpWorker;
Expand Down Expand Up @@ -44,6 +45,15 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody

// initialize workers
if (config.warmupWorkers() != null) {
// initialize query handlers
// count the number of workers for each query handler
final var queryHandlers = config.warmupWorkers.stream().map(HttpWorker.Config::queries).distinct().toList();
queryHandlers.stream().map(qh1 ->
List.of(qh1, config.warmupWorkers.stream()
.map(HttpWorker.Config::queries)
.filter(qh1::equals)
.count()))
.forEach(list -> ((QueryHandler) list.get(0)).setTotalWorkerCount((int) (long) list.get(1)));
long workerId = 0;
for (HttpWorker.Config workerConfig : config.warmupWorkers()) {
for (int i = 0; i < workerConfig.number(); i++) {
Expand All @@ -54,6 +64,15 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody
}

for (HttpWorker.Config workerConfig : config.workers()) {
// initialize query handlers
// count the number of workers for each query handler
final var queryHandlers = config.workers.stream().map(HttpWorker.Config::queries).distinct().toList();
queryHandlers.stream().map(qh1 ->
List.of(qh1, config.workers.stream()
.filter(w -> w.queries().equals(qh1))
.mapToInt(HttpWorker.Config::number)
.sum()))
.forEach(list -> ((QueryHandler) list.get(0)).setTotalWorkerCount((int) list.get(1)));
long workerId = 0;
for (int i = 0; i < workerConfig.number(); i++) {
var responseBodyProcessor = (workerConfig.parseResults()) ? responseBodyProcessorInstances.getProcessor(workerConfig.acceptHeader()) : null;
Expand Down Expand Up @@ -83,10 +102,9 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody
public void run() {
if (!warmupWorkers.isEmpty()) {
SPARQLProtocolWorker.initHttpClient(warmupWorkers.size());
var warmupResults = executeWorkers(warmupWorkers); // warmup results will be dismissed
executeWorkers(warmupWorkers); // warmup results will be dismissed
SPARQLProtocolWorker.closeHttpClient();
}

SPARQLProtocolWorker.initHttpClient(workers.size());
var results = executeWorkers(workers);
SPARQLProtocolWorker.closeHttpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ private HttpExecutionResult executeHttpRequest(Duration timeout) {
try {
request = requestFactory.buildHttpRequest(queryHandle);
} catch (IOException | URISyntaxException e) {
return createFailedResultBeforeRequest(config.queries().getQuerySelectorInstance().getCurrentIndex(), e);
return createFailedResultBeforeRequest(querySelector.getCurrentIndex(), e);
}

// execute the request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,18 @@ public void ThrowOnLinearQuerySelectorSizeZero() {
final var size = 0;
assertThrows(IllegalArgumentException.class, () -> new LinearQuerySelector(size));
}

@Test
public void testStartingIndex() {
final var size = 5;
final var startIndex = 3;
final var linearQuerySelector = new LinearQuerySelector(size, startIndex);
// -1, because the next index hasn't been requested yet
assertEquals(startIndex - 1, linearQuerySelector.getCurrentIndex());
for (int i = 0; i < 10; i++) {
int currentIndex = linearQuerySelector.getNextIndex();
assertEquals((i + startIndex) % size, currentIndex);
assertEquals(currentIndex, linearQuerySelector.getCurrentIndex());
}
}
}

0 comments on commit 3b0633b

Please sign in to comment.