Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop #39

Merged
merged 9 commits into from
Jul 25, 2018
15 changes: 13 additions & 2 deletions src/main/java/org/hobbit/benchmark/versioning/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ public class Task implements Serializable {
private String query;
private int queryType;
private int querySubType;
private int querySubstitutionParam;
private byte[] expectedAnswers;

public Task(int queryType, int querySubType, String id, String query, byte[] expectedAnswers) {
public Task(int queryType, int querySubType, int querySubParam, String id, String query, byte[] expectedAnswers) {
this.queryType = queryType;
this.querySubType = querySubType;
this.querySubstitutionParam = querySubParam;
this.taskId = id;
this.query = query;
this.expectedAnswers = expectedAnswers;
Expand Down Expand Up @@ -53,6 +55,14 @@ public int getQueryType() {
return this.queryType;
}

public void setQuerySubstitutionParam(int querySubParam) {
this.querySubstitutionParam = querySubParam;
}

public int getQuerySubstitutionParam() {
return this.querySubstitutionParam;
}

public void setQuerySubType(int queryType) {
this.querySubType = queryType;
}
Expand All @@ -64,9 +74,10 @@ public int getQuerySubType() {
// the results are preceded by the query type as this information required
// by the evaluation module.
public void setExpectedAnswers(byte[] res) {
ByteBuffer buffer = ByteBuffer.allocate(8);
ByteBuffer buffer = ByteBuffer.allocate(12);
buffer.putInt(queryType);
buffer.putInt(querySubType);
buffer.putInt(querySubstitutionParam);
this.expectedAnswers = RabbitMQUtils.writeByteArrays(buffer.array(), new byte[][]{res}, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.commons.lang3.SerializationUtils;
import org.apache.jena.query.QueryExecution;
import org.apache.jena.query.QueryExecutionFactory;
import org.apache.jena.query.ResultSet;
import org.apache.jena.query.ResultSetFactory;
import org.apache.jena.query.ResultSetFormatter;
import org.apache.jena.query.ResultSetRewindable;
Expand Down Expand Up @@ -134,10 +133,6 @@ public void init() throws Exception {
// Evenly distribute the 5 dbpedia versions to the total number of versions that were generated
distributeDBpediaVersions();

LOGGER.info("triplesExpectedToBeLoaded after dbpedia: " + Arrays.toString(this.triplesExpectedToBeLoaded));
LOGGER.info("triplesExpectedToBeAdded after dbpedia: " + Arrays.toString(this.triplesExpectedToBeAdded));
LOGGER.info("triplesExpectedToBeDeleted after dbpedia: " + Arrays.toString(this.triplesExpectedToBeDeleted));

String configurationFile = System.getProperty("user.dir") + File.separator + "test.properties";
String definitionsFile = System.getProperty("user.dir") + File.separator + "definitions.properties";
String dictionaryFile = System.getProperty("user.dir") + File.separator + "WordsDictionary.txt";
Expand Down Expand Up @@ -305,7 +300,8 @@ public void init() throws Exception {
LOGGER.info("Building SPRQL tasks...");
buildSPRQLQueries();
LOGGER.info("All SPRQL tasks built successfully.");

// if(!allQueriesDisabled) {
sendAllToFTP(true);
// load generated data in order to compute the expected answers
loadFirstNVersions(numberOfVersions);

Expand All @@ -314,7 +310,11 @@ public void init() throws Exception {
computeExpectedAnswers();

LOGGER.info("Expected answers have computed successfully for all generated SPRQL tasks.");
}
}

LOGGER.info("Sending generated data, queries and expected answers to FTP server...");
sendAllToFTP(true);

LOGGER.info("Data Generator initialized successfully.");
}

Expand Down Expand Up @@ -593,7 +593,7 @@ public void computeExpectedAnswers() {
ResultSetFormatter.outputAsJSON(outputStream, results);
//debug
LOGGER.info("Expected answers for task " + task.getTaskId() + " computed"
+ ". Type: " + task.getQueryType() + "." + task.getQuerySubType()
+ ". Type: " + task.getQueryType() + "." + task.getQuerySubType() + "." + task.getQuerySubstitutionParam()
+ ", ResultsNum: " + results.size()
+ ", Time: " + (queryEnd - queryStart) + " ms.");

Expand All @@ -612,7 +612,7 @@ public void buildSPRQLQueries() {
// QT1
if(enabledQueryTypes.getProperty("QT1", "0").equals("1")) {
queryString = compileMustacheTemplate(1, queryIndex, 0);
tasks.add(new Task(1, 1, Integer.toString(taskId++), queryString, null));
tasks.add(new Task(1, 1, 1, Integer.toString(taskId++), queryString, null));
try {
FileUtils.writeStringToFile(new File(queriesPath + "versioningQuery1.1.1.sparql"), queryString);
} catch (IOException e) {
Expand All @@ -627,7 +627,7 @@ public void buildSPRQLQueries() {
for (int querySubType = 1; querySubType <= Statistics.VERSIONING_SUB_QUERIES_COUNT; querySubType++) {
for(int querySubstParam = 1; querySubstParam <= querySubstParamCount; querySubstParam++) {
queryString = compileMustacheTemplate(2, queryIndex, querySubstParam);
tasks.add(new Task(2, querySubType, Integer.toString(taskId++), queryString, null));
tasks.add(new Task(2, querySubType, querySubstParam, Integer.toString(taskId++), queryString, null));
try {
FileUtils.writeStringToFile(new File(queriesPath + "versioningQuery2." + querySubType + "." + querySubstParam + ".sparql"), queryString);
} catch (IOException e) {
Expand All @@ -650,7 +650,7 @@ public void buildSPRQLQueries() {
querySubstParamCount = 3;
for(int querySubstParam = 1; querySubstParam <= querySubstParamCount && querySubstParam < numberOfVersions ; querySubstParam++) {
queryString = compileMustacheTemplate(3, queryIndex, querySubstParam);
tasks.add(new Task(3, 1, Integer.toString(taskId++), queryString, null));
tasks.add(new Task(3, 1, querySubstParam, Integer.toString(taskId++), queryString, null));
try {
FileUtils.writeStringToFile(new File(queriesPath + "versioningQuery3.1." + querySubstParam + ".sparql"), queryString);
} catch (IOException e) {
Expand All @@ -668,7 +668,7 @@ public void buildSPRQLQueries() {
// if there are less than four versions take all the historical ones
for(int querySubstParam = 1; querySubstParam <= querySubstParamCount && querySubstParam < numberOfVersions; querySubstParam++) {
queryString = compileMustacheTemplate(4, queryIndex, querySubstParam);
tasks.add(new Task(4, querySubType, Integer.toString(taskId++), queryString, null));
tasks.add(new Task(4, querySubType, querySubstParam, Integer.toString(taskId++), queryString, null));
try {
FileUtils.writeStringToFile(new File(queriesPath + "versioningQuery4." + querySubType + "." + querySubstParam + ".sparql"), queryString);
} catch (IOException e) {
Expand All @@ -692,7 +692,7 @@ public void buildSPRQLQueries() {
if(enabledQueryTypes.getProperty("QT5", "0").equals("1")) {
for(int querySubstParam = 1; querySubstParam <= querySubstParamCount; querySubstParam++) {
queryString = compileMustacheTemplate(5, queryIndex, querySubstParam);
tasks.add(new Task(5, 1, Integer.toString(taskId++), queryString, null));
tasks.add(new Task(5, 1, querySubstParam, Integer.toString(taskId++), queryString, null));
try {
FileUtils.writeStringToFile(new File(queriesPath + "versioningQuery5.1." + querySubstParam + ".sparql"), queryString);
} catch (IOException e) {
Expand All @@ -707,7 +707,7 @@ public void buildSPRQLQueries() {
if(enabledQueryTypes.getProperty("QT6", "0").equals("1")) {
for(int querySubstParam = 1; querySubstParam <= querySubstParamCount; querySubstParam++) {
queryString = compileMustacheTemplate(6, queryIndex, querySubstParam);
tasks.add(new Task(6, 1, Integer.toString(taskId++), queryString, null));
tasks.add(new Task(6, 1, querySubstParam, Integer.toString(taskId++), queryString, null));
try {
FileUtils.writeStringToFile(new File(queriesPath + "versioningQuery6.1." + querySubstParam + ".sparql"), queryString);
} catch (IOException e) {
Expand All @@ -723,7 +723,7 @@ public void buildSPRQLQueries() {
querySubstParamCount = 3;
for(int querySubstParam = 1; querySubstParam <= querySubstParamCount && querySubstParam < numberOfVersions - 1; querySubstParam++) {
queryString = compileMustacheTemplate(7, queryIndex, querySubstParam);
tasks.add(new Task(7, 1, Integer.toString(taskId++), queryString, null));
tasks.add(new Task(7, 1, querySubstParam, Integer.toString(taskId++), queryString, null));
try {
FileUtils.writeStringToFile(new File(queriesPath + "versioningQuery7.1." + querySubstParam + ".sparql"), queryString);
} catch (IOException e) {
Expand All @@ -747,7 +747,7 @@ public void buildSPRQLQueries() {
for (int querySubType = 1; querySubType <= Statistics.VERSIONING_SUB_QUERIES_COUNT; querySubType++) {
for(int querySubstParam = 1; querySubstParam <= querySubstParamCount; querySubstParam++) {
queryString = compileMustacheTemplate(8, queryIndex, querySubstParam);
tasks.add(new Task(8, querySubType, Integer.toString(taskId++), queryString, null));
tasks.add(new Task(8, querySubType, querySubstParam, Integer.toString(taskId++), queryString, null));
try {
FileUtils.writeStringToFile(new File(queriesPath + "versioningQuery8." + querySubType + "." + querySubstParam + ".sparql"), queryString);
} catch (IOException e) {
Expand Down Expand Up @@ -1193,58 +1193,36 @@ public void writeResults() {
String resultsPath = System.getProperty("user.dir") + File.separator + "results";
File resultsDir = new File(resultsPath);
resultsDir.mkdirs();
// skip current version materialization query
int taskId = 0;

// mind the non zero-based numbering of query types
for (int queryType = 0; queryType < Statistics.VERSIONING_QUERIES_COUNT; queryType++) {
if (Arrays.asList(1,3,7).contains(queryType)) {
for (int querySubType = 0; querySubType < Statistics.VERSIONING_SUB_QUERIES_COUNT; querySubType++) {
for (int querySubstParam = 0; querySubstParam < subsParametersAmount; querySubstParam++) {
ByteBuffer expectedResultsBuffer = ByteBuffer.wrap(tasks.get(taskId++).getExpectedAnswers());
expectedResultsBuffer.getInt();
expectedResultsBuffer.getInt();
byte expectedDataBytes[] = RabbitMQUtils.readByteArray(expectedResultsBuffer);
try {
FileUtils.writeByteArrayToFile(new File(resultsDir + File.separator + "versionigQuery" + (queryType + 1) + "." + (querySubType + 1) + "." + (querySubstParam + 1) + "_results.json"), expectedDataBytes);
} catch (IOException e) {
LOGGER.error("Exception caught during saving of expected results: ", e);
}
}
}
continue;
}
for (int querySubstParam = 0; querySubstParam < subsParametersAmount; querySubstParam++) {
ByteBuffer expectedResultsBuffer = ByteBuffer.wrap(tasks.get(taskId++).getExpectedAnswers());
expectedResultsBuffer.getInt();
expectedResultsBuffer.getInt();
byte expectedDataBytes[] = RabbitMQUtils.readByteArray(expectedResultsBuffer);
try {
FileUtils.writeByteArrayToFile(new File(resultsDir + File.separator + "versionigQuery" + (queryType + 1) + ".1." + (querySubstParam + 1) + "_results.json"), expectedDataBytes);
} catch (IOException e) {
LOGGER.error("Exception caught during saving of expected results : ", e);
}
if (queryType == 0) break;
for (Task task : tasks) {
ByteBuffer expectedResultsBuffer = ByteBuffer.wrap(task.getExpectedAnswers());
int queryType = expectedResultsBuffer.getInt();
int querySubType = expectedResultsBuffer.getInt();
int querySubstParam = expectedResultsBuffer.getInt();
byte expectedDataBytes[] = RabbitMQUtils.readByteArray(expectedResultsBuffer);
try {
FileUtils.writeByteArrayToFile(new File(resultsDir + File.separator + "versionigQuery" + queryType + "." + querySubType + "." + querySubstParam + "_results.json"), expectedDataBytes);
} catch (IOException e) {
LOGGER.error("Exception caught during saving of expected results: ", e);
}
}
}

public void sendAllToFTP() {
writeResults();
FTPUtils.sendToFtp("/versioning/data/v0/", "public/SPVB-LS/test/data/changesets/c0", "nt");
FTPUtils.sendToFtp("/versioning/data/c1/", "public/SPVB-LS/test/data/changesets/c1", "nt");
FTPUtils.sendToFtp("/versioning/data/c2/", "public/SPVB-LS/test/data/changesets/c2", "nt");
FTPUtils.sendToFtp("/versioning/data/c3/", "public/SPVB-LS/test/data/changesets/c3", "nt");
FTPUtils.sendToFtp("/versioning/data/c4/", "public/SPVB-LS/test/data/changesets/c4", "nt");
FTPUtils.sendToFtp("/versioning/data/final/v0/", "public/SPVB-LS/test/data/independentcopies/v0", "nt");
FTPUtils.sendToFtp("/versioning/data/final/v1/", "public/SPVB-LS/test/data/independentcopies/v1", "nt");
FTPUtils.sendToFtp("/versioning/data/final/v2/", "public/SPVB-LS/test/data/independentcopies/v2", "nt");
FTPUtils.sendToFtp("/versioning/data/final/v3/", "public/SPVB-LS/test/data/independentcopies/v3", "nt");
FTPUtils.sendToFtp("/versioning/data/final/v4/", "public/SPVB-LS/test/data/independentcopies/v4", "nt");
FTPUtils.sendToFtp("/versioning/queries/", "public/SPVB-LS/test/queries", "sparql");
FTPUtils.sendToFtp("/versioning/query_templates/", "public/SPVB-LS/test/query_templates", "txt");
FTPUtils.sendToFtp("/versioning/substitution_parameters/", "public/SPVB-LS/test/substitution_parameters", "txt");
FTPUtils.sendToFtp("/versioning/results/", "public/SPVB-LS/test/expected_results", "json");
public void sendAllToFTP(boolean proceed) {
if (!proceed) {
return;
}
// writeResults();
String datasetType = "1m" + numberOfVersions + "v";
for(int versionNum = 0; versionNum < numberOfVersions; versionNum++) {
FTPUtils.sendToFtp("/versioning/data/" + (versionNum == 0 ? "v" : "c") + versionNum + "/", "public/SPVB-LS/" + datasetType + "/data/changesets/c" + versionNum, "nt");
FTPUtils.sendToFtp("/versioning/data/final/v" + versionNum + "/", "public/SPVB-LS/" + datasetType + "/data/independentcopies/v" + versionNum, "nt");

}
FTPUtils.sendToFtp("/versioning/queries/", "public/SPVB-LS/" + datasetType + "/queries", "sparql");
// FTPUtils.sendToFtp("/versioning/query_templates/", "public/SPVB-LS/" + datasetType + "/query_templates", "txt");
// FTPUtils.sendToFtp("/versioning/substitution_parameters/", "public/SPVB-LS/" + datasetType + "/substitution_parameters", "txt");
// FTPUtils.sendToFtp("/versioning/results/", "public/SPVB-LS/" + datasetType + "/results", "json");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ protected void evaluateResponse(byte[] expectedData, byte[] receivedData, long t
// get the query type
int queryType = expectedBuffer.getInt();
int querySubType = expectedBuffer.getInt();
int querySustitutionParam = expectedBuffer.getInt();


// get the expected results
Expand Down Expand Up @@ -199,7 +200,7 @@ protected void evaluateResponse(byte[] expectedData, byte[] receivedData, long t
}
break;
}
LOGGER.info((resultSetsEqual ? "[SUCCESS]" : "[FAIL]") + " - Task type: " + queryType + "." + querySubType + " executed in " + (responseReceivedTimestamp - taskSentTimestamp) + " ms and returned " + received.size() + "/" + expected.size() + " results.");
LOGGER.info((resultSetsEqual ? "[SUCCESS]" : "[FAIL]") + " - Task type: " + queryType + "." + querySubType + "." + querySustitutionParam + " executed in " + (responseReceivedTimestamp - taskSentTimestamp) + " ms and returned " + received.size() + "/" + expected.size() + " results.");
}

private void computeTotalFailures() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ protected void generateTask(byte[] data) {
sendTaskToEvalStorage(taskId, timestamp, task.getExpectedAnswers());

LOGGER.info("Expected answers of task " + taskId + " sent to Evaluation Storage.");

} catch (Exception e) {
LOGGER.error("Exception caught while reading the tasks and their expected answers", e);
}
Expand Down