Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

paralelly produce deleteset #8

Merged
merged 8 commits into from
Dec 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 4 additions & 20 deletions required_files/export_cws_tbd.sh
Original file line number Diff line number Diff line change
@@ -1,25 +1,9 @@
#!/bin/bash

CURR_VERSION=$1
CURR_VERSION_FILE=$1
CW_TBD=$2
DESTINATION_PATH=$3

export_deleted() {
filename=$(basename "$1")
filename="${filename%added.*}"
tbd=$(grep -F -f $CW_TBD $1)
lines=$(echo "${tbd}" | wc -l)
if [[ "$lines" > "1" ]]; then
printf "%s\n" "${tbd}" >> $DESTINATION_PATH/$filename"deleted.nt"
echo $lines
fi
}

# do it parallely in the background for improving performance
for ((i=0; i<$CURR_VERSION; i++)); do
SOURCE_PATH=$([ "$i" = "0" ] && echo $4"/v"$i || echo $4"/c"$i)
for f in $SOURCE_PATH/generatedCreativeWorks*.added.nt; do
export_deleted $f &
done
done
wait
filename=$(basename "$CURR_VERSION_FILE")
filename="${filename%added.*}"
grep -F -f $CW_TBD $CURR_VERSION_FILE >> $DESTINATION_PATH/$filename"deleted.nt"
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.RegexFileFilter;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.jena.query.Query;
import org.apache.jena.query.QueryExecution;
Expand All @@ -37,8 +42,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.io.Files;

import eu.ldbc.semanticpublishing.generators.data.DataGenerator;
import eu.ldbc.semanticpublishing.properties.Configuration;
import eu.ldbc.semanticpublishing.properties.Definitions;
Expand Down Expand Up @@ -164,55 +167,92 @@ public void init() throws Exception {
dataGenerator.produceData();
cwsToBeLoaded[0] = v0SizeInTriples;


// Generate the change sets. Only additions/deletions are supported.
// TODO: support changes
int preVersionDeletedCWs = 0;
long changeSetStart = System.currentTimeMillis();
for(int i = 1; i < numberOfVersions; i++) {
String destinationPath = generatedDatasetPath + File.separator + "c" + i;
File theDir = new File(destinationPath);
theDir.mkdir();

int triplesToBeAdded = Math.round(versionInsertionRatio / 100f * cwsToBeLoaded[i-1]);
int triplesToBeDeleted = Math.round(versionDeletionRatio / 100f * cwsToBeLoaded[i-1]);
cwsToBeLoaded[i] = cwsToBeLoaded[i-1] + triplesToBeAdded - triplesToBeDeleted;
LOGGER.info("Generating version " + i + " changeset. Target: " + "[+" + triplesToBeAdded + ", -" + triplesToBeDeleted + "]");
String destinationPath = generatedDatasetPath + File.separator + "c" + i;

// produce the add set
LOGGER.info("Generating version " + i + " add-set.");
dataGenerator.produceAdded(destinationPath, triplesToBeAdded);

LOGGER.info("Generating version " + i + " changeset. Target: " + "[+" + String.format(Locale.US, "%,d", triplesToBeAdded).replace(',', '.') + " , -" + String.format(Locale.US, "%,d", triplesToBeDeleted).replace(',', '.') + "]");

// produce the delete set
LOGGER.info("Generating version " + i + " delete-set.");
long deleteSetStart = System.currentTimeMillis();
int currVersionDeletedCreativeWorks = 0;
int currVersionDeletedTriples = 0;
int creativeWorkAvgTriples = DataManager.randomTriples.intValue() / DataManager.randomCreativeWorkIdsList.size();

// Estimate the total number of creative works that have to be deleted, using
// creative work average triples that have been generated so far.
int creativeWorksToBeDeleted = triplesToBeDeleted / creativeWorkAvgTriples;
LOGGER.info(creativeWorksToBeDeleted + " cworks estimated that have to be deleted from v" + (i - 1));
while (currVersionDeletedTriples < triplesToBeDeleted) {
ArrayList<String> cwToBeDeleted = new ArrayList<String>();
for(int c=0; c<creativeWorksToBeDeleted; c++) {
int deletedCWIndex = randomGenerator.nextInt(DataManager.remainingRandomCreativeWorkIdsList.size() - 1);
long creativeWorkToBeDeleted = DataManager.remainingRandomCreativeWorkIdsList.get(deletedCWIndex);
DataManager.remainingRandomCreativeWorkIdsList.remove(deletedCWIndex);
cwToBeDeleted.add("http://www.bbc.co.uk/things/" + getGeneratorId() + "-" + creativeWorkToBeDeleted + "#id");
int totalRandomTriplesSoFar = DataManager.randomCreativeWorkTriples.intValue();

ArrayList<String> cwToBeDeleted = new ArrayList<String>();

// if the number of triples that have to be deleted is larger than the already existing
// random-model ones, take all the random and choose from other data-models (correlations,
// major/minor events) as well
List<Long> randomCreativeWorkIds = new ArrayList<Long>(DataManager.randomCreativeWorkIdsList.keySet());
if(triplesToBeDeleted > totalRandomTriplesSoFar) {
LOGGER.info("Target of " + String.format(Locale.US, "%,d", triplesToBeDeleted).replace(',', '.') + " triples exceedes the already (random-model) existing ones (" + String.format(Locale.US, "%,d", totalRandomTriplesSoFar).replace(',', '.') + "). Will choose from clustering and correlation models as well.");
// take all the random
for (long creativeWorkId : randomCreativeWorkIds) {
cwToBeDeleted.add("http://www.bbc.co.uk/things/" + getGeneratorId() + "-" + creativeWorkId + "#id");
}
DataManager.randomCreativeWorkIdsList.clear();
DataManager.randomCreativeWorkTriples.set(0);
currVersionDeletedTriples = totalRandomTriplesSoFar;

// as delete-set target have not reached yet, choose the rest from correlations or major/minor
List<Long> corrExpCreativeWorkIds = new ArrayList<Long>(DataManager.corrExpCreativeWorkIdsList.keySet());
int corrExpTotalTriples = 0;
while (currVersionDeletedTriples < triplesToBeDeleted) {
int creativeWorkToBeDeletedIdx = randomGenerator.nextInt(corrExpCreativeWorkIds.size());
long creativeWorkToBeDeleted = corrExpCreativeWorkIds.get(creativeWorkToBeDeletedIdx);
currVersionDeletedTriples += DataManager.corrExpCreativeWorkIdsList.get(creativeWorkToBeDeleted);
corrExpTotalTriples += DataManager.corrExpCreativeWorkIdsList.get(creativeWorkToBeDeleted);
corrExpCreativeWorkIds.remove(creativeWorkToBeDeletedIdx);
DataManager.corrExpCreativeWorkIdsList.remove(creativeWorkToBeDeleted);
cwToBeDeleted.add("http://www.bbc.co.uk/things/" + getGeneratorId() + "-" + creativeWorkToBeDeleted + "#id");
}
// write down the creative work uris that are going to be deleted
FileUtils.writeLines(new File("/versioning/creativeWorksToBeDeleted.txt") , cwToBeDeleted, false);
// extract all triples that have to be deleted
currVersionDeletedTriples += extractDeleted(i, "/versioning/creativeWorksToBeDeleted.txt", destinationPath, generatedDatasetPath);
currVersionDeletedCreativeWorks += creativeWorksToBeDeleted;
// estimation of the remaining creative works that have to be extracted
creativeWorksToBeDeleted = (int) Math.ceil((double) (triplesToBeDeleted - currVersionDeletedTriples) / creativeWorkAvgTriples);
if(creativeWorksToBeDeleted > 0) {
LOGGER.info(creativeWorksToBeDeleted + " more cwork" + (creativeWorksToBeDeleted > 1 ? "s" : "") +" estimated that have to be deleted from v" + (i - 1));

// extract all triples that have to be deleted using multiple threads
long start = System.currentTimeMillis();
parallelyExtract(i, destinationPath);
long end = System.currentTimeMillis();
DataManager.corrExpCreativeWorkTriples.addAndGet(-corrExpTotalTriples);

currVersionDeletedCreativeWorks += cwToBeDeleted.size();
} else {
while (currVersionDeletedTriples < triplesToBeDeleted) {
int creativeWorkToBeDeletedIdx = randomGenerator.nextInt(randomCreativeWorkIds.size());
long creativeWorkToBeDeleted = randomCreativeWorkIds.get(creativeWorkToBeDeletedIdx);
currVersionDeletedTriples += DataManager.randomCreativeWorkIdsList.get(creativeWorkToBeDeleted);
randomCreativeWorkIds.remove(creativeWorkToBeDeletedIdx);
DataManager.randomCreativeWorkIdsList.remove(creativeWorkToBeDeleted);
cwToBeDeleted.add("http://www.bbc.co.uk/things/" + getGeneratorId() + "-" + creativeWorkToBeDeleted + "#id");
}
// write down the creative work uris that are going to be deleted
// in order to use it in grep -F -f
FileUtils.writeLines(new File("/versioning/creativeWorksToBeDeleted.txt") , cwToBeDeleted, false);

// extract all triples that have to be deleted using multiple threads
long start = System.currentTimeMillis();
parallelyExtract(i, destinationPath);
long end = System.currentTimeMillis();
DataManager.randomCreativeWorkTriples.addAndGet(-currVersionDeletedTriples);
currVersionDeletedCreativeWorks += cwToBeDeleted.size();
}
preVersionDeletedCWs = currVersionDeletedCreativeWorks;
long deleteSetEnd = System.currentTimeMillis();
LOGGER.info("Deleteset of total " + preVersionDeletedCWs + " Creative Works generated successfully. Triples: " + currVersionDeletedTriples + ". Target: " + triplesToBeDeleted + " triples. Time: " + (deleteSetEnd - deleteSetStart) + " ms.");
LOGGER.info("Deleteset of total " + String.format(Locale.US, "%,d", preVersionDeletedCWs).replace(',', '.') + " Creative Works generated successfully. Triples: " + String.format(Locale.US, "%,d", currVersionDeletedTriples).replace(',', '.') + " . Target: " + String.format(Locale.US, "%,d", triplesToBeDeleted).replace(',', '.') + " triples. Time: " + (deleteSetEnd - deleteSetStart) + " ms.");

// produce the add set
LOGGER.info("Generating version " + i + " add-set.");
dataGenerator.produceAdded(destinationPath, triplesToBeAdded);
}
long changeSetEnd = System.currentTimeMillis();
LOGGER.info("All changesets generated successfully. Time: " + (changeSetEnd - changeSetStart) + " ms.");
Expand Down Expand Up @@ -249,6 +289,45 @@ public void init() throws Exception {
LOGGER.info("Expected answers have computed successfully for all generated SPRQL tasks.");
}

public void parallelyExtract(int currVersion, String destinationPath) {
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for(int j = 0; j < currVersion; j++) {
String sourcePath = generatedDatasetPath + File.separator + (j == 0 ? "v" : "c") + j + File.separator;
File sourcePathFile = new File(sourcePath);
List<File> previousVersionAddedFiles = (List<File>) FileUtils.listFiles(sourcePathFile, new RegexFileFilter("generatedCreativeWorks-[0-9]+-[0-9]+.added.nt"), null);
for (File f : previousVersionAddedFiles) {
executor.execute(new ExtractDeleted(f, "/versioning/creativeWorksToBeDeleted.txt", destinationPath));
}
}
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); // no timeout
} catch (InterruptedException e) {
LOGGER.error("Exception caught while awaiting termination...", e);
}
}

// class for implementing extraction of triples that have to be deleted concurrently
public static class ExtractDeleted implements Runnable {
private File file;
private String cwTBD;
private String destinationPath;

ExtractDeleted(File file, String cwTBD, String destinationPath) {
this.file = file;
this.cwTBD = cwTBD;
this.destinationPath = destinationPath;
}

public void run() {
try {
extractDeleted(file.getAbsolutePath(), cwTBD, destinationPath);
} catch (Exception e) {
LOGGER.error("Exception caught during the extraction of deleted triples from " + file, e);
}
}
}

public void initFromEnv() {
LOGGER.info("Getting Data Generator's properites from the environment...");

Expand Down Expand Up @@ -919,34 +998,25 @@ public void loadVersion(int version) {
LOGGER.error("Exception while executing script for loading data.", e);
}
}

private int extractDeleted(int currentVersion, String cwIdsFile, String destPath, String sourcePath) {
int deletedTriples = 0;

private static void extractDeleted(String currentFile, String cwIdsFile, String destPath) {
try {
String scriptFilePath = System.getProperty("user.dir") + File.separator + "export_cws_tbd.sh";
String[] command = {"/bin/bash", scriptFilePath,
Integer.toString(currentVersion), cwIdsFile, destPath, sourcePath };
String[] command = {"/bin/bash", scriptFilePath, currentFile, cwIdsFile, destPath };
Process p = new ProcessBuilder(command).start();
BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()));
BufferedReader stdError = new BufferedReader(new InputStreamReader(p.getErrorStream()));
String line;
while ((line = in.readLine()) != null) {
deletedTriples += Integer.parseInt(line);
}
String line = null;
while ((line = stdError.readLine()) != null) {
LOGGER.info(line);
}
p.waitFor();
in.close();
stdError.close();
} catch (IOException e) {
LOGGER.error("Exception while executing script for extracting creative works that have to be deleted.", e);
} catch (InterruptedException e) {
LOGGER.error("Exception while executing script for extracting creative works that have to be deleted.", e);
}
return deletedTriples;
}

}
@Override
public void receiveCommand(byte command, byte[] data) {
if (command == VirtuosoSystemAdapterConstants.BULK_LOADING_DATA_FINISHED) {
Expand Down
4 changes: 2 additions & 2 deletions system/virtuoso.ini.template
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ DisableTcpSocket = 0
;X509ClientVerify = 0
;X509ClientVerifyDepth = 0
;X509ClientVerifyCAFile = ca.pem
MaxClientConnections = 25
MaxClientConnections = 10
CheckpointInterval = 60
O_DIRECT = 0
CaseMode = 2
Expand Down Expand Up @@ -126,7 +126,7 @@ EnabledDavVSP = 0
HTTPProxyEnabled = 0
TempASPXDir = 0
DefaultMailServer = localhost:25
ServerThreads = 20
ServerThreads = 10
MaxKeepAlives = 10
KeepAliveTimeout = 10
MaxCachedProxyConnections = 10
Expand Down