Skip to content

Commit

Permalink
Merge pull request #8 from hobbit-project/prl-delete_spbv2.0
Browse files Browse the repository at this point in the history
parallelly produce delete-set
  • Loading branch information
vpapako authored Dec 13, 2017
2 parents dae2e07 + 62f252c commit f051ac3
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 67 deletions.
24 changes: 4 additions & 20 deletions required_files/export_cws_tbd.sh
Original file line number Diff line number Diff line change
@@ -1,25 +1,9 @@
#!/bin/bash

CURR_VERSION=$1
CURR_VERSION_FILE=$1
CW_TBD=$2
DESTINATION_PATH=$3

export_deleted() {
filename=$(basename "$1")
filename="${filename%added.*}"
tbd=$(grep -F -f $CW_TBD $1)
lines=$(echo "${tbd}" | wc -l)
if [[ "$lines" > "1" ]]; then
printf "%s\n" "${tbd}" >> $DESTINATION_PATH/$filename"deleted.nt"
echo $lines
fi
}

# do it parallely in the background for improving performance
for ((i=0; i<$CURR_VERSION; i++)); do
SOURCE_PATH=$([ "$i" = "0" ] && echo $4"/v"$i || echo $4"/c"$i)
for f in $SOURCE_PATH/generatedCreativeWorks*.added.nt; do
export_deleted $f &
done
done
wait
filename=$(basename "$CURR_VERSION_FILE")
filename="${filename%added.*}"
grep -F -f $CW_TBD $CURR_VERSION_FILE >> $DESTINATION_PATH/$filename"deleted.nt"
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.RegexFileFilter;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.jena.query.Query;
import org.apache.jena.query.QueryExecution;
Expand All @@ -37,8 +42,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.io.Files;

import eu.ldbc.semanticpublishing.generators.data.DataGenerator;
import eu.ldbc.semanticpublishing.properties.Configuration;
import eu.ldbc.semanticpublishing.properties.Definitions;
Expand Down Expand Up @@ -164,55 +167,92 @@ public void init() throws Exception {
dataGenerator.produceData();
cwsToBeLoaded[0] = v0SizeInTriples;


// Generate the change sets. Only additions/deletions are supported.
// TODO: support changes
int preVersionDeletedCWs = 0;
long changeSetStart = System.currentTimeMillis();
for(int i = 1; i < numberOfVersions; i++) {
String destinationPath = generatedDatasetPath + File.separator + "c" + i;
File theDir = new File(destinationPath);
theDir.mkdir();

int triplesToBeAdded = Math.round(versionInsertionRatio / 100f * cwsToBeLoaded[i-1]);
int triplesToBeDeleted = Math.round(versionDeletionRatio / 100f * cwsToBeLoaded[i-1]);
cwsToBeLoaded[i] = cwsToBeLoaded[i-1] + triplesToBeAdded - triplesToBeDeleted;
LOGGER.info("Generating version " + i + " changeset. Target: " + "[+" + triplesToBeAdded + ", -" + triplesToBeDeleted + "]");
String destinationPath = generatedDatasetPath + File.separator + "c" + i;

// produce the add set
LOGGER.info("Generating version " + i + " add-set.");
dataGenerator.produceAdded(destinationPath, triplesToBeAdded);

LOGGER.info("Generating version " + i + " changeset. Target: " + "[+" + String.format(Locale.US, "%,d", triplesToBeAdded).replace(',', '.') + " , -" + String.format(Locale.US, "%,d", triplesToBeDeleted).replace(',', '.') + "]");

// produce the delete set
LOGGER.info("Generating version " + i + " delete-set.");
long deleteSetStart = System.currentTimeMillis();
int currVersionDeletedCreativeWorks = 0;
int currVersionDeletedTriples = 0;
int creativeWorkAvgTriples = DataManager.randomTriples.intValue() / DataManager.randomCreativeWorkIdsList.size();

// Estimate the total number of creative works that have to be deleted, using
// creative work average triples that have been generated so far.
int creativeWorksToBeDeleted = triplesToBeDeleted / creativeWorkAvgTriples;
LOGGER.info(creativeWorksToBeDeleted + " cworks estimated that have to be deleted from v" + (i - 1));
while (currVersionDeletedTriples < triplesToBeDeleted) {
ArrayList<String> cwToBeDeleted = new ArrayList<String>();
for(int c=0; c<creativeWorksToBeDeleted; c++) {
int deletedCWIndex = randomGenerator.nextInt(DataManager.remainingRandomCreativeWorkIdsList.size() - 1);
long creativeWorkToBeDeleted = DataManager.remainingRandomCreativeWorkIdsList.get(deletedCWIndex);
DataManager.remainingRandomCreativeWorkIdsList.remove(deletedCWIndex);
cwToBeDeleted.add("http://www.bbc.co.uk/things/" + getGeneratorId() + "-" + creativeWorkToBeDeleted + "#id");
int totalRandomTriplesSoFar = DataManager.randomCreativeWorkTriples.intValue();

ArrayList<String> cwToBeDeleted = new ArrayList<String>();

// if the number of triples that have to be deleted is larger than the already existing
// random-model ones, take all the random and choose from other data-models (correlations,
// major/minor events) as well
List<Long> randomCreativeWorkIds = new ArrayList<Long>(DataManager.randomCreativeWorkIdsList.keySet());
if(triplesToBeDeleted > totalRandomTriplesSoFar) {
LOGGER.info("Target of " + String.format(Locale.US, "%,d", triplesToBeDeleted).replace(',', '.') + " triples exceedes the already (random-model) existing ones (" + String.format(Locale.US, "%,d", totalRandomTriplesSoFar).replace(',', '.') + "). Will choose from clustering and correlation models as well.");
// take all the random
for (long creativeWorkId : randomCreativeWorkIds) {
cwToBeDeleted.add("http://www.bbc.co.uk/things/" + getGeneratorId() + "-" + creativeWorkId + "#id");
}
DataManager.randomCreativeWorkIdsList.clear();
DataManager.randomCreativeWorkTriples.set(0);
currVersionDeletedTriples = totalRandomTriplesSoFar;

// as delete-set target have not reached yet, choose the rest from correlations or major/minor
List<Long> corrExpCreativeWorkIds = new ArrayList<Long>(DataManager.corrExpCreativeWorkIdsList.keySet());
int corrExpTotalTriples = 0;
while (currVersionDeletedTriples < triplesToBeDeleted) {
int creativeWorkToBeDeletedIdx = randomGenerator.nextInt(corrExpCreativeWorkIds.size());
long creativeWorkToBeDeleted = corrExpCreativeWorkIds.get(creativeWorkToBeDeletedIdx);
currVersionDeletedTriples += DataManager.corrExpCreativeWorkIdsList.get(creativeWorkToBeDeleted);
corrExpTotalTriples += DataManager.corrExpCreativeWorkIdsList.get(creativeWorkToBeDeleted);
corrExpCreativeWorkIds.remove(creativeWorkToBeDeletedIdx);
DataManager.corrExpCreativeWorkIdsList.remove(creativeWorkToBeDeleted);
cwToBeDeleted.add("http://www.bbc.co.uk/things/" + getGeneratorId() + "-" + creativeWorkToBeDeleted + "#id");
}
// write down the creative work uris that are going to be deleted
FileUtils.writeLines(new File("/versioning/creativeWorksToBeDeleted.txt") , cwToBeDeleted, false);
// extract all triples that have to be deleted
currVersionDeletedTriples += extractDeleted(i, "/versioning/creativeWorksToBeDeleted.txt", destinationPath, generatedDatasetPath);
currVersionDeletedCreativeWorks += creativeWorksToBeDeleted;
// estimation of the remaining creative works that have to be extracted
creativeWorksToBeDeleted = (int) Math.ceil((double) (triplesToBeDeleted - currVersionDeletedTriples) / creativeWorkAvgTriples);
if(creativeWorksToBeDeleted > 0) {
LOGGER.info(creativeWorksToBeDeleted + " more cwork" + (creativeWorksToBeDeleted > 1 ? "s" : "") +" estimated that have to be deleted from v" + (i - 1));

// extract all triples that have to be deleted using multiple threads
long start = System.currentTimeMillis();
parallelyExtract(i, destinationPath);
long end = System.currentTimeMillis();
DataManager.corrExpCreativeWorkTriples.addAndGet(-corrExpTotalTriples);

currVersionDeletedCreativeWorks += cwToBeDeleted.size();
} else {
while (currVersionDeletedTriples < triplesToBeDeleted) {
int creativeWorkToBeDeletedIdx = randomGenerator.nextInt(randomCreativeWorkIds.size());
long creativeWorkToBeDeleted = randomCreativeWorkIds.get(creativeWorkToBeDeletedIdx);
currVersionDeletedTriples += DataManager.randomCreativeWorkIdsList.get(creativeWorkToBeDeleted);
randomCreativeWorkIds.remove(creativeWorkToBeDeletedIdx);
DataManager.randomCreativeWorkIdsList.remove(creativeWorkToBeDeleted);
cwToBeDeleted.add("http://www.bbc.co.uk/things/" + getGeneratorId() + "-" + creativeWorkToBeDeleted + "#id");
}
// write down the creative work uris that are going to be deleted
// in order to use it in grep -F -f
FileUtils.writeLines(new File("/versioning/creativeWorksToBeDeleted.txt") , cwToBeDeleted, false);

// extract all triples that have to be deleted using multiple threads
long start = System.currentTimeMillis();
parallelyExtract(i, destinationPath);
long end = System.currentTimeMillis();
DataManager.randomCreativeWorkTriples.addAndGet(-currVersionDeletedTriples);
currVersionDeletedCreativeWorks += cwToBeDeleted.size();
}
preVersionDeletedCWs = currVersionDeletedCreativeWorks;
long deleteSetEnd = System.currentTimeMillis();
LOGGER.info("Deleteset of total " + preVersionDeletedCWs + " Creative Works generated successfully. Triples: " + currVersionDeletedTriples + ". Target: " + triplesToBeDeleted + " triples. Time: " + (deleteSetEnd - deleteSetStart) + " ms.");
LOGGER.info("Deleteset of total " + String.format(Locale.US, "%,d", preVersionDeletedCWs).replace(',', '.') + " Creative Works generated successfully. Triples: " + String.format(Locale.US, "%,d", currVersionDeletedTriples).replace(',', '.') + " . Target: " + String.format(Locale.US, "%,d", triplesToBeDeleted).replace(',', '.') + " triples. Time: " + (deleteSetEnd - deleteSetStart) + " ms.");

// produce the add set
LOGGER.info("Generating version " + i + " add-set.");
dataGenerator.produceAdded(destinationPath, triplesToBeAdded);
}
long changeSetEnd = System.currentTimeMillis();
LOGGER.info("All changesets generated successfully. Time: " + (changeSetEnd - changeSetStart) + " ms.");
Expand Down Expand Up @@ -249,6 +289,45 @@ public void init() throws Exception {
LOGGER.info("Expected answers have computed successfully for all generated SPRQL tasks.");
}

/**
 * Extracts the triples that have to be deleted from all add-set files of
 * versions 0..currVersion-1, running one {@link ExtractDeleted} task per file
 * on a fixed pool sized to the number of available processors. Blocks until
 * every task has finished.
 *
 * @param currVersion     the version whose delete-set is being produced;
 *                        all earlier versions' add-sets are scanned
 * @param destinationPath directory receiving the ".deleted.nt" output files
 */
public void parallelyExtract(int currVersion, String destinationPath) {
    ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    for (int j = 0; j < currVersion; j++) {
        // version 0 lives in directory "v0"; later change-sets in "c1", "c2", ...
        String sourcePath = generatedDatasetPath + File.separator + (j == 0 ? "v" : "c") + j + File.separator;
        File sourcePathFile = new File(sourcePath);
        // dots escaped so only literal ".added.nt" files match (unescaped "." matched any char)
        List<File> previousVersionAddedFiles = (List<File>) FileUtils.listFiles(sourcePathFile,
                new RegexFileFilter("generatedCreativeWorks-[0-9]+-[0-9]+\\.added\\.nt"), null);
        for (File f : previousVersionAddedFiles) {
            executor.execute(new ExtractDeleted(f, "/versioning/creativeWorksToBeDeleted.txt", destinationPath));
        }
    }
    executor.shutdown();
    try {
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); // effectively no timeout
    } catch (InterruptedException e) {
        LOGGER.error("Exception caught while awaiting termination...", e);
        Thread.currentThread().interrupt(); // restore the interrupt status for callers
    }
}

/**
 * Runnable that extracts, from a single add-set file, the triples whose
 * creative-work URIs are listed in the "to be deleted" file, delegating to
 * {@code extractDeleted(...)}. Submitted by {@code parallelyExtract} so that
 * many add-set files can be scanned concurrently.
 */
public static class ExtractDeleted implements Runnable {
    private final File file;              // add-set file to scan
    private final String cwTBD;           // path of the file listing creative-work URIs to delete
    private final String destinationPath; // directory receiving the ".deleted.nt" output

    ExtractDeleted(File file, String cwTBD, String destinationPath) {
        this.file = file;
        this.cwTBD = cwTBD;
        this.destinationPath = destinationPath;
    }

    @Override
    public void run() {
        try {
            extractDeleted(file.getAbsolutePath(), cwTBD, destinationPath);
        } catch (Exception e) {
            // log-and-continue: one failed file must not kill the whole extraction pool
            LOGGER.error("Exception caught during the extraction of deleted triples from " + file, e);
        }
    }
}

public void initFromEnv() {
LOGGER.info("Getting Data Generator's properites from the environment...");

Expand Down Expand Up @@ -919,34 +998,25 @@ public void loadVersion(int version) {
LOGGER.error("Exception while executing script for loading data.", e);
}
}

private int extractDeleted(int currentVersion, String cwIdsFile, String destPath, String sourcePath) {
int deletedTriples = 0;

private static void extractDeleted(String currentFile, String cwIdsFile, String destPath) {
try {
String scriptFilePath = System.getProperty("user.dir") + File.separator + "export_cws_tbd.sh";
String[] command = {"/bin/bash", scriptFilePath,
Integer.toString(currentVersion), cwIdsFile, destPath, sourcePath };
String[] command = {"/bin/bash", scriptFilePath, currentFile, cwIdsFile, destPath };
Process p = new ProcessBuilder(command).start();
BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()));
BufferedReader stdError = new BufferedReader(new InputStreamReader(p.getErrorStream()));
String line;
while ((line = in.readLine()) != null) {
deletedTriples += Integer.parseInt(line);
}
String line = null;
while ((line = stdError.readLine()) != null) {
LOGGER.info(line);
}
p.waitFor();
in.close();
stdError.close();
} catch (IOException e) {
LOGGER.error("Exception while executing script for extracting creative works that have to be deleted.", e);
} catch (InterruptedException e) {
LOGGER.error("Exception while executing script for extracting creative works that have to be deleted.", e);
}
return deletedTriples;
}

}
@Override
public void receiveCommand(byte command, byte[] data) {
if (command == VirtuosoSystemAdapterConstants.BULK_LOADING_DATA_FINISHED) {
Expand Down
4 changes: 2 additions & 2 deletions system/virtuoso.ini.template
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ DisableTcpSocket = 0
;X509ClientVerify = 0
;X509ClientVerifyDepth = 0
;X509ClientVerifyCAFile = ca.pem
MaxClientConnections = 25
MaxClientConnections = 10
CheckpointInterval = 60
O_DIRECT = 0
CaseMode = 2
Expand Down Expand Up @@ -126,7 +126,7 @@ EnabledDavVSP = 0
HTTPProxyEnabled = 0
TempASPXDir = 0
DefaultMailServer = localhost:25
ServerThreads = 20
ServerThreads = 10
MaxKeepAlives = 10
KeepAliveTimeout = 10
MaxCachedProxyConnections = 10
Expand Down

0 comments on commit f051ac3

Please sign in to comment.