Skip to content

Commit

Permalink
added the bench client
Browse files Browse the repository at this point in the history
  • Loading branch information
lubux committed Jul 17, 2020
1 parent a74f177 commit 8a759bc
Show file tree
Hide file tree
Showing 18 changed files with 523 additions and 26 deletions.
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ RUN apt-get update \

ENV CLIENT_JAR_NAME "/timecrypt-client-jar-with-dependencies.jar"
ENV TESTBED_JAR_NAME "/timecrypt-testbed-jar-with-dependencies.jar"
ENV BENCH_JAR_NAME "/timecrypt-bench-client-jar-with-dependencies.jar"
ENV SERVER_JAR_NAME "/timecrypt-server-jar-with-dependencies.jar"
ENV EXAMPLE1_JAR_NAME "/timecrypt-example-usage-jar-with-dependencies.jar"

Expand All @@ -31,7 +32,9 @@ RUN chmod u+x /docker-start.sh && dos2unix /docker-start.sh && apt-get --purge r

COPY --from=build /build/timecrypt-client/target/$TESTBED_JAR_NAME $TESTBED_JAR_NAME
COPY --from=build /build/timecrypt-client/target/$CLIENT_JAR_NAME $CLIENT_JAR_NAME
COPY --from=build /build/timecrypt-client/target/$BENCH_JAR_NAME $BENCH_JAR_NAME
COPY --from=build /build/timecrypt-server/target/$SERVER_JAR_NAME $SERVER_JAR_NAME
COPY --from=build /build/timecrypt-examples/target/$EXAMPLE1_JAR_NAME $EXAMPLE1_JAR_NAME


ENTRYPOINT ["/docker-start.sh"]
7 changes: 7 additions & 0 deletions docker-start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ case "$ACTION" in

java -jar $CLIENT_JAR_NAME "${@:2}"
;;
benchclient)
echo "Starting client - waiting for server"
wait-for-it -t $TIMECRYPT_TIMEOUT $TIMECRYPT_HOST:$TIMECRYPT_PORT
echo "Server up"

java -jar $BENCH_JAR_NAME "${@:2}"
;;
*)
echo "Action '$ACTION' undefined."
echo ""
Expand Down
18 changes: 18 additions & 0 deletions timecrypt-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,24 @@
</archive>
</configuration>
</execution>
<execution>
<id>build-bench-client</id>
<goals>
<goal>single</goal>
</goals>
<phase>package</phase>
<configuration>
<finalName>timecrypt-bench-client</finalName>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>ch.ethz.dsg.timecrypt.BenchClients</mainClass>
</manifest>
</archive>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
Expand Down
9 changes: 9 additions & 0 deletions timecrypt-client/scripts/run_bench_client.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash

LOCAL_PATH=$(cd -P -- "$(dirname -- "$0")" && pwd -P)
CUR_PATH=$(pwd)
cd ${LOCAL_PATH}/../

java -jar target/timecrypt-bench-client-jar-with-dependencies.jar "$@"

cd ${CUR_PATH}
263 changes: 263 additions & 0 deletions timecrypt-client/src/main/java/ch/ethz/dsg/timecrypt/BenchClients.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
/*
* Copyright (c) 2020 by ETH Zurich, see AUTHORS file for more
* Licensed under the Apache License, Version 2.0, see LICENSE file for more details.
*/

package ch.ethz.dsg.timecrypt;

import ch.ethz.dsg.timecrypt.client.exceptions.CouldNotReceiveException;
import ch.ethz.dsg.timecrypt.client.exceptions.CouldNotStoreException;
import ch.ethz.dsg.timecrypt.client.exceptions.InvalidQueryException;
import ch.ethz.dsg.timecrypt.client.exceptions.TCWriteException;
import ch.ethz.dsg.timecrypt.client.queryInterface.Interval;
import ch.ethz.dsg.timecrypt.client.queryInterface.Query;
import ch.ethz.dsg.timecrypt.client.state.LocalTimeCryptKeystore;
import ch.ethz.dsg.timecrypt.client.state.LocalTimeCryptProfile;
import ch.ethz.dsg.timecrypt.client.state.TimeCryptKeystore;
import ch.ethz.dsg.timecrypt.client.state.TimeCryptProfile;
import ch.ethz.dsg.timecrypt.client.streamHandling.DataPoint;
import ch.ethz.dsg.timecrypt.client.streamHandling.InsertHandler;
import ch.ethz.dsg.timecrypt.client.streamHandling.TimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

public class BenchClients {

public static final String TYPE_LOG_QUERY = "Q";
public static final String TYPE_LOG_INSERT = "I";
private static int logGranularity = 100;
private static int queryStart = 100;
private static int numElementsPerChunk = 500;
private static long DEFAULT_START_DATE_MS = 1000;
private static TimeUtil.Precision DEFAULT_CHUNK_WINDOW = TimeUtil.Precision.TEN_SECONDS;
private static Date DEFAULT_START_DATE = new Date(DEFAULT_START_DATE_MS);
private static int[] granularities = new int[]{4, 8, 16, 32, 64, 128, 256, 512, 1024};
private static Random rangeSelector = new Random(460003178);
private static Logger logger = LoggerFactory.getLogger(BenchClients.class);
private static List<Query.SupportedOperation> operationsToQuery = Arrays.asList(Query.SupportedOperation.AVG);

private static int[][] currentState;

public static String usernameFromID(int id) {
return String.format("user%d", id);
}

public static void createState(int numUsers, int numStreams) {
currentState = new int[numUsers][];
for (int i = 0; i < numUsers; i++) {
currentState[i] = new int[numStreams];
}
}

public static QueryRequest selectRequest(int userid) {
int buff = 2;
QueryRequest req = new QueryRequest();
req.userID = userid;
req.queryStream = rangeSelector.nextInt(currentState[req.userID].length);
int len = currentState[req.userID][req.queryStream] - buff;
if (len < granularities[0])
return null;

do {
req.granularity = granularities[rangeSelector.nextInt(granularities.length)];
} while (req.granularity > len);
int range = len - req.granularity;
req.from = (range > 0) ? rangeSelector.nextInt(range) : 0;
req.to = req.from + req.granularity;
return req;
}

public static void main(String[] args) throws Exception {
int exitCode = new CommandLine(new BenchCommandLine()).execute(args);
System.exit(exitCode);
}

private static class QueryRequest {
int from;
int to;
int granularity;
int queryStream;
int userID;
}

private static class Client extends Thread {

public AtomicBoolean ok = new AtomicBoolean(true);
String username;
int clientId;
int numStreams;
int readWriteRatio;
private int maxRounds;
private TimeCryptClient client;
private int insertCounter = 0;
private int queryCounter = 0;
private long[] streamIDs;
private InsertHandler[] insertHandlers;

public Client(int clientId, int numStreams, TimeCryptClient client, int readWriteRatio, int maxRounds) {
this.clientId = clientId;
this.numStreams = numStreams;
this.client = client;
this.readWriteRatio = readWriteRatio;
this.username = usernameFromID(clientId);
this.maxRounds = maxRounds;
}

private void createStreams() throws CouldNotStoreException, IOException, CouldNotReceiveException, InvalidQueryException {
streamIDs = new long[numStreams];
insertHandlers = new InsertHandler[numStreams];
for (int streamId = 0; streamId < numStreams; streamId++) {
streamIDs[streamId] = client.createStream(
String.format("User:%sStream:%d", username, streamId),
"",
DEFAULT_CHUNK_WINDOW,
Collections.singletonList(TimeUtil.Precision.TEN_SECONDS),
DefaultConfigs.getDefaultMetaDataConfig(),
DefaultConfigs.getDefaultEncryptionScheme(),
null,
DEFAULT_START_DATE);
insertHandlers[streamId] = client.getHandlerForInsertBench(streamIDs[streamId], DEFAULT_START_DATE);
}
}

private void insertToStream(int counter) throws IOException, TCWriteException {
long gap = DEFAULT_CHUNK_WINDOW.getMillis() / numElementsPerChunk;
for (int streamId = 0; streamId < numStreams; streamId++) {
for (int i = 0; i < numElementsPerChunk; i++) {
insertHandlers[streamId].writeDataPointToStream(
new DataPoint(
new Date(DEFAULT_START_DATE_MS + DEFAULT_CHUNK_WINDOW.getMillis() * counter + i * gap)
, 1));
}
long timeBefore = System.nanoTime();
insertHandlers[streamId].flush();
if (++insertCounter % logGranularity == 0)
logger.info("{},{},{}", this.clientId, TYPE_LOG_INSERT, System.nanoTime() - timeBefore);

currentState[clientId][streamId]++;
}
}

private void queryStreamsStatistics(int numQueries) throws Exception {
for (int count = 0; count < numQueries; count++) {
QueryRequest request = selectRequest(this.clientId);
if (request == null)
continue;
int from = request.from;
int to = request.to;
int granularity = request.granularity;
long queryStream = request.queryStream;
long timeBefore = System.nanoTime();
List<Interval> x = client.performRangeQuery(streamIDs[(int) queryStream],
new Date(DEFAULT_START_DATE_MS + from * DEFAULT_CHUNK_WINDOW.getMillis()),
new Date(DEFAULT_START_DATE_MS + to * DEFAULT_CHUNK_WINDOW.getMillis()),
operationsToQuery,
false,
granularity * DEFAULT_CHUNK_WINDOW.getMillis());
if (++queryCounter % logGranularity == 0)
logger.info("{},{},{}", this.clientId, TYPE_LOG_QUERY, System.nanoTime() - timeBefore);

}
}

@Override
public void run() {
try {
createStreams();
} catch (Exception e) {
e.printStackTrace();
return;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
int counter = 0;
while (ok.get() && counter < maxRounds) {
try {
insertToStream(counter++);
if (counter > queryStart)
queryStreamsStatistics(numStreams * readWriteRatio);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

@CommandLine.Command(name = "timecrypt-bench-client", mixinStandardHelpOptions = true, description = "Basic Benchmark client for the TC server ",
version = "1.0")
private static class BenchCommandLine implements Callable<Integer> {

@CommandLine.Option(names = {"-c", "--clients"},
description = "Number of clients ")
int numberOfClients = 10;

@CommandLine.Option(names = {"-s", "--streams"},
description = "Number of streams per client")
int numStreamsPerClients = 4;

@CommandLine.Option(names = {"-r", "--rwratio"},
description = "The chunk insert to Query Ratio")
int readWriteRatio = 4;

@CommandLine.Option(names = {"-i", "--ip"},
description = "The ip address of the server.")
String connectTo = "127.0.0.1";

@CommandLine.Option(names = {"-p", "--port"},
description = "The chunk insert to Query Ratio")
int connectToPort = 15000;

@CommandLine.Option(names = {"-mr", "--maxrounds"},
description = "The maximal number of rounds")
int maxRounds = 10000;

@CommandLine.Option(names = {"-md", "--maxduration"},
description = "The maximal duration of the benchmark")
int awaitTime = 120000;

@CommandLine.Option(names = {"-nr", "--recordsperchunk"},
description = "The number of record per chunk")
int numberOfRecords = 500;

@CommandLine.Option(names = {"-lg", "--loggranularity"},
description = "How many reults to log. eg 100 = every 100th event")
int logGran = 100;

@Override
public Integer call() throws Exception {
Client[] clients = new Client[numberOfClients];
createState(numberOfClients, numStreamsPerClients);
String DUMMY_PASSWORD = "1234";
numElementsPerChunk = numberOfRecords;
logGranularity = logGran;
TimeCryptKeystore keystore = LocalTimeCryptKeystore.createLocalKeystore(null, DUMMY_PASSWORD.toCharArray());
for (int i = 0; i < numberOfClients; i++) {
TimeCryptProfile profile = new LocalTimeCryptProfile(null, usernameFromID(i),
"Profile" + usernameFromID(i), connectTo, connectToPort);
TimeCryptClient tcClient = new TimeCryptClient(keystore, profile);
clients[i] = new Client(i, numStreamsPerClients, tcClient,
readWriteRatio, maxRounds);
clients[i].start();
}
Thread.sleep(awaitTime);
for (int i = 0; i < numberOfClients; i++) {
clients[i].ok.set(false);
}

for (int i = 0; i < numberOfClients; i++) {
clients[i].join();
}
return 0;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public class TimeCryptClient {

private final List<InsertHandler> openInsertHandlers = Collections.synchronizedList(new ArrayList<InsertHandler>());
private final ServerInterface serverInterface;

public ServerInterface getServerInterface() {
return serverInterface;
}
Expand Down Expand Up @@ -334,6 +333,17 @@ public InsertHandler getHandlerForBackupInsert(long streamId, Date backupStartTi
streamId).getEncoded(), CHUNK_KEY_STREAM_DEPTH), this.serverInterface, backupStartTime, openInsertHandlers);
}

/**
* Creates a insert stream handler for bench inserts.
* @param streamId the stream id the handler should insert to.
* @param backupStartTime the start time of the data that is to be inserted.
* @return a TC insert handler for backup inserts.
*/
public InsertHandler getHandlerForInsertBench(long streamId, Date backupStartTime) throws CouldNotReceiveException, InvalidQueryException, IOException {
return new BenchInsertHandler(profile.getStream(streamId), new StreamKeyManager(keyStore.receiveStreamKey(profile.getProfileName() +
streamId).getEncoded(), CHUNK_KEY_STREAM_DEPTH), this.serverInterface, backupStartTime, openInsertHandlers);
}

/**
* Terminates all open insertHandlers created by this client
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,7 @@ private static Query checkEndDate(Stream stream, Date to, boolean allowChunkScan
);
}

// we have to allow last chunk ID + 1 because the interval is EXCLUSIVE the last item
if (chunkId > stream.getLastWrittenChunkId() + 1) {
if (chunkId - 1 > stream.getLastWrittenChunkId()) {
throw new InvalidQueryIntervalException("Interval ends after the last inserted chunk.", false,
new Date(TimeUtil.getChunkStartTime(stream, stream.getLastWrittenChunkId())),
new Date(TimeUtil.getChunkStartTime(stream, stream.getLastWrittenChunkId())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ public class GrpcServerClient implements ServerInterface {
private TimecryptGrpc.TimecryptBlockingStub stub;


// Create a communication channel to the server, known as a Channel. Channels are thread-safe
// and reusable. It is common to create channels at the beginning of your application and reuse
// them until the application shuts down.
// See: https://github.com/grpc/grpc-java/blob/master/examples/src/main/java/io/grpc/examples/helloworld/HelloWorldClient.java


public GrpcServerClient(String serverAddress, int serverPort) {

// create a custom header
Expand All @@ -58,7 +52,6 @@ public GrpcServerClient(String serverAddress, int serverPort) {
// create client stub
ManagedChannel channel = ManagedChannelBuilder.forAddress(serverAddress, serverPort)
.usePlaintext()
.intercept()
.build();
stub = TimecryptGrpc.newBlockingStub(channel);
stub = MetadataUtils.attachHeaders(stub, header);
Expand Down
Loading

0 comments on commit 8a759bc

Please sign in to comment.