Skip to content

Commit

Permalink
Merge pull request #20 from hobbit-project/feature/storage-space-gath…
Browse files Browse the repository at this point in the history
…ering

Feature/storage space gathering
  • Loading branch information
vpapako authored Apr 25, 2018
2 parents 2f8f425 + c853436 commit 5fe6139
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 129 deletions.
83 changes: 0 additions & 83 deletions dependency-reduced-pom.xml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.hobbit.core.Commands;
import org.hobbit.core.Constants;
import org.hobbit.core.components.AbstractBenchmarkController;
import org.hobbit.core.components.utils.SystemResourceUsageRequester;
import org.hobbit.core.data.usage.ResourceUsageInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,7 +33,6 @@ public class VersioningBenchmarkController extends AbstractBenchmarkController {
private String[] dataGenEnvVariables = null;
private String[] evalStorageEnvVariables = null;


private Semaphore versionSentMutex = new Semaphore(0);
private Semaphore versionLoadedMutex = new Semaphore(0);

Expand All @@ -44,19 +45,23 @@ public class VersioningBenchmarkController extends AbstractBenchmarkController {
private AtomicIntegerArray triplesToBeDeleted;
private AtomicIntegerArray triplesToBeLoaded;
private AtomicInteger numberOfMessages = new AtomicInteger(0);


private SystemResourceUsageRequester resUsageRequester = null;
private long systemInitialUsableSpace = 0;
private long systemStorageSpaceCost = 0;

@Override
public void init() throws Exception {
LOGGER.info("Initilalizing Benchmark Controller...");
super.init();

numberOfDataGenerators = (Integer) getProperty(PREFIX + "hasNumberOfGenerators", 1);
int v0Size = (Integer) getProperty(PREFIX + "v0SizeInTriples", 1000000);
int generatorSeed = (Integer) getProperty(PREFIX + "generatorSeed", 0);
numOfVersions = (Integer) getProperty(PREFIX + "numberOfVersions", 12);
int insRatio = (Integer) getProperty(PREFIX + "versionInsertionRatio", 5);
int delRatio = (Integer) getProperty(PREFIX + "versionDeletionRatio", 3);
String dataForm = (String) getProperty(PREFIX + "generatedDataForm", "ic");
numberOfDataGenerators = (Integer) getPropertyOrDefault(PREFIX + "hasNumberOfGenerators", 1);
int v0Size = (Integer) getPropertyOrDefault(PREFIX + "v0SizeInTriples", 1000000);
int generatorSeed = (Integer) getPropertyOrDefault(PREFIX + "generatorSeed", 0);
numOfVersions = (Integer) getPropertyOrDefault(PREFIX + "numberOfVersions", 12);
int insRatio = (Integer) getPropertyOrDefault(PREFIX + "versionInsertionRatio", 5);
int delRatio = (Integer) getPropertyOrDefault(PREFIX + "versionDeletionRatio", 3);
String dataForm = (String) getPropertyOrDefault(PREFIX + "generatedDataForm", "ic");

loadingTimes = new long[numOfVersions];
triplesToBeAdded = new AtomicIntegerArray(numOfVersions);
Expand Down Expand Up @@ -119,7 +124,7 @@ public void init() throws Exception {
* @return the value of requested parameter
*/
@SuppressWarnings("unchecked")
private <T> T getProperty(String property, T defaultValue) {
private <T> T getPropertyOrDefault(String property, T defaultValue) {
T propertyValue = null;
NodeIterator iterator = benchmarkParamModel
.listObjectsOfProperty(benchmarkParamModel
Expand Down Expand Up @@ -172,17 +177,23 @@ public void receiveCommand(byte command, byte[] data) {
numberOfMessages.addAndGet(dataGenNumOfMessages);

// signal sent from data generator that all its data generated successfully
LOGGER.info("Recieved signal from Data Generator " + dataGeneratorId + " that all data (#" + dataGenNumOfMessages + ") of version " + loadedVersion + " successfully sent to System Adapter.");
LOGGER.info("Recieved signal from Data Generator " + dataGeneratorId + " that all data (#" + dataGenNumOfMessages + ") of version " + loadedVersion + " successfully sent to the system.");
versionSentMutex.release();
} else if (command == SystemAdapterConstants.BULK_LOADING_DATA_FINISHED) {
// signal sent from system adapter that a version loaded successfully
LOGGER.info("Recieved signal that all data of version " + loadedVersion + " successfully loaded from system.");
// signal sent from the system that a version loaded successfully
LOGGER.info("Recieved signal that all data of version " + loadedVersion + " successfully loaded by the system.");
long currTimeMillis = System.currentTimeMillis();
long versionLoadingTime = currTimeMillis - prevLoadingStartedTime;
loadingTimes[loadedVersion++] = versionLoadingTime;
prevLoadingStartedTime = currTimeMillis;

LOGGER.info("Waiting 15 seconds...");
try {
Thread.sleep(1000 * 15);
} catch (InterruptedException e) {
LOGGER.error("An error occured while waiting.", e);
}
versionLoadedMutex.release();
}
}
super.receiveCommand(command, data);
}

Expand All @@ -199,46 +210,76 @@ protected void executeBenchmark() throws Exception {
sendToCmdQueue(Commands.DATA_GENERATOR_START_SIGNAL);
LOGGER.info("Start signals sent to Data and Task Generators");

LOGGER.info("Creating requester for system resource usage information.");
resUsageRequester = SystemResourceUsageRequester.create(this, getHobbitSessionId());

LOGGER.info("Measuring system's usable space before data loading");
ResourceUsageInformation infoBefore = resUsageRequester.getSystemResourceUsage();
if (infoBefore.getDiskStats() != null) {
systemInitialUsableSpace = infoBefore.getDiskStats().getFsSizeSum();
LOGGER.info("System's usable space before data loading: " + systemInitialUsableSpace);
} else {
LOGGER.info(infoBefore.toString());
LOGGER.info("Got null as response.");
}

// iterate through different versions starting from version 0
for (int v=0; v<numOfVersions; v++) {
for (int v = 0; v < numOfVersions; v++) {
// wait for all data generators to sent data of version v to system adapter
LOGGER.info("Waiting for all data generators to send data of version " + v + " to system adapter.");
LOGGER.info("Waiting for all data generators to send data of version " + v + " to the system.");
versionSentMutex.acquire(numberOfDataGenerators);
LOGGER.info("Signal from all data generators received.");

// Send signal that all data, generated and sent to system adapter successfully.
// Send signal that all data, generated and sent to the system successfully.
// The number of messages along with a flag is also sent
LOGGER.info("Send signal to System Adapter that the sending of all data of version " + v + " from Data Generators have finished.");
LOGGER.info("Send signal to the system that the sending of all data of version " + v + " from Data Generators have finished.");
ByteBuffer buffer = ByteBuffer.allocate(5);
buffer.putInt(numberOfMessages.get());
buffer.put(v == numOfVersions - 1 ? (byte) 1 : (byte) 0);
prevLoadingStartedTime = System.currentTimeMillis();
sendToCmdQueue(SystemAdapterConstants.BULK_LOAD_DATA_GEN_FINISHED, buffer.array());
numberOfMessages.set(0);

LOGGER.info("Waiting for the system to load data of version " + v);
versionLoadedMutex.acquire();
}

// wait for the data generators to finish their work
LOGGER.info("Waiting for the data generators to finish their work.");
waitForDataGenToFinish();
LOGGER.info("Data generators finished.");
LOGGER.info("Data generators finished.");

// wait for the task generators to finish their work
LOGGER.info("Waiting for the task generators to finish their work.");
waitForTaskGenToFinish();
LOGGER.info("Task generators finished.");

LOGGER.info("Computing system's storage space overhead after data loading");
ResourceUsageInformation infoAfter = resUsageRequester.getSystemResourceUsage();
if (infoAfter.getDiskStats() != null) {
systemStorageSpaceCost = infoAfter.getDiskStats().getFsSizeSum() - systemInitialUsableSpace;
LOGGER.info("System's storage space overhead after data loading: " + systemStorageSpaceCost);
} else {
LOGGER.info(infoAfter.toString());
LOGGER.info("Got null as response.");
}

// wait for the system to terminate
LOGGER.info("Waiting for the system to terminate.");
waitForSystemToFinish(1000 * 60 * 25);
LOGGER.info("System terminated.");

if (resUsageRequester != null) {
try {
resUsageRequester.close();
LOGGER.info("Resource Usage Requester closed successfully.");
} catch (IOException e) {
LOGGER.error("An error occured while closing SystemResourceUsageRequester");
}
}

// pass the number of versions composing the dataset to the environment
// variables of the evaluation module
evalModuleEnvVariables = ArrayUtils.add(evalModuleEnvVariables, VersioningConstants.TOTAL_VERSIONS + "=" + numOfVersions);

// pass the loading times and the number of triples that have to be loaded to
// the environment variables of the evaluation module, so that it can compute
// the ingestion and applied changes speeds
Expand All @@ -252,6 +293,10 @@ protected void executeBenchmark() throws Exception {
evalModuleEnvVariables = ArrayUtils.add(evalModuleEnvVariables,
String.format(VersioningConstants.LOADING_TIMES, version) + "=" + loadingTimes[version]);
}

// pass the storage space cost as an environment variable
evalModuleEnvVariables = ArrayUtils.add(evalModuleEnvVariables,VersioningConstants.STORAGE_COST_VALUE + "=" + systemStorageSpaceCost);

// create the evaluation module
createEvaluationModule(EVALUATION_MODULE_CONTAINER_IMAGE, evalModuleEnvVariables);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class VersioningEvaluationModule extends AbstractEvaluationModule {

private static final Logger LOGGER = LoggerFactory.getLogger(VersioningEvaluationModule.class);

private Model finalModel = ModelFactory.createDefaultModel();

private Property INITIAL_VERSION_INGESTION_SPEED = null;
Expand Down Expand Up @@ -81,10 +81,11 @@ public void init() throws Exception {
is.reportSuccess(version, triplesToBeAdded, triplesToBeDeleted, triplesToBeLoaded, loadingTime);
}

storageCost = Integer.parseInt(env.get(String.format(VersioningConstants.STORAGE_COST_VALUE))) / (1024f * 1024f);

INITIAL_VERSION_INGESTION_SPEED = initFinalModelFromEnv(env, VersioningConstants.INITIAL_VERSION_INGESTION_SPEED);
AVG_APPLIED_CHANGES_PS = initFinalModelFromEnv(env, VersioningConstants.AVG_APPLIED_CHANGES_PS);
STORAGE_COST = initFinalModelFromEnv(env, VersioningConstants.STORAGE_COST);

QT_1_AVG_EXEC_TIME = initFinalModelFromEnv(env, VersioningConstants.QT_1_AVG_EXEC_TIME);
QT_2_AVG_EXEC_TIME = initFinalModelFromEnv(env, VersioningConstants.QT_2_AVG_EXEC_TIME);
QT_3_AVG_EXEC_TIME = initFinalModelFromEnv(env, VersioningConstants.QT_3_AVG_EXEC_TIME);
Expand All @@ -94,7 +95,7 @@ public void init() throws Exception {
QT_7_AVG_EXEC_TIME = initFinalModelFromEnv(env, VersioningConstants.QT_7_AVG_EXEC_TIME);
QT_8_AVG_EXEC_TIME = initFinalModelFromEnv(env, VersioningConstants.QT_8_AVG_EXEC_TIME);
QUERY_FAILURES = initFinalModelFromEnv(env, VersioningConstants.QUERY_FAILURES);
QUERIES_PER_SECOND = initFinalModelFromEnv(env, VersioningConstants.QUERIES_PER_SECOND);
QUERIES_PER_SECOND = initFinalModelFromEnv(env, VersioningConstants.QUERIES_PER_SECOND);

LOGGER.info("Evaluation Module initialized successfully.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ public final class VersioningConstants {
public static final String AVG_APPLIED_CHANGES_PS = "avg_applied_changes_ps";

public static final String STORAGE_COST = "storage_cost";


public static final String STORAGE_COST_VALUE = "storage_cost_value";

public static final String QT_1_AVG_EXEC_TIME = "query_type_1_avgerage_execution_time";

public static final String QT_2_AVG_EXEC_TIME = "query_type_2_avgerage_execution_time";
Expand Down
Loading

0 comments on commit 5fe6139

Please sign in to comment.