Skip to content

Commit

Permalink
updates with comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Unmesh Joshi authored and Unmesh Joshi committed Nov 22, 2024
1 parent 7a22940 commit 9401717
Show file tree
Hide file tree
Showing 23 changed files with 236 additions and 46 deletions.
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
4 changes: 3 additions & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.5-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.2-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
38 changes: 25 additions & 13 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#

##############################################################################
#
Expand Down Expand Up @@ -55,7 +57,7 @@
# Darwin, MinGW, and NonStop.
#
# (3) This script is generated from the Groovy template
# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# within the Gradle project.
#
# You can find Gradle at https://github.com/gradle/gradle/.
Expand All @@ -80,13 +82,12 @@ do
esac
done

APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit

APP_NAME="Gradle"
# This is normally unused
# shellcheck disable=SC2034
APP_BASE_NAME=${0##*/}

# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s
' "$PWD" ) || exit

# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD=maximum
Expand Down Expand Up @@ -133,22 +134,29 @@ location of your Java installation."
fi
else
JAVACMD=java
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
if ! command -v java >/dev/null 2>&1
then
die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
fi

# Increase the maximum file descriptors if we can.
if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
case $MAX_FD in #(
max*)
# In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC2039,SC3045
MAX_FD=$( ulimit -H -n ) ||
warn "Could not query maximum file descriptor limit"
esac
case $MAX_FD in #(
'' | soft) :;; #(
*)
# In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC2039,SC3045
ulimit -n "$MAX_FD" ||
warn "Could not set maximum file descriptor limit to $MAX_FD"
esac
Expand Down Expand Up @@ -193,11 +201,15 @@ if "$cygwin" || "$msys" ; then
done
fi

# Collect all arguments for the java command;
# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of
# shell script including quotes and variable substitutions, so put them in
# double quotes to make sure that they get re-expanded; and
# * put everything else in single quotes, so that it's not re-expanded.

# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'

# Collect all arguments for the java command:
# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
# and any embedded shellness will be escaped.
# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
# treated as '${Hostname}' itself on the command line.

set -- \
"-Dorg.gradle.appname=$APP_BASE_NAME" \
Expand Down
23 changes: 13 additions & 10 deletions gradlew.bat
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@rem SPDX-License-Identifier: Apache-2.0
@rem

@if "%DEBUG%"=="" @echo off
@rem ##########################################################################
Expand All @@ -26,6 +28,7 @@ if "%OS%"=="Windows_NT" setlocal

set DIRNAME=%~dp0
if "%DIRNAME%"=="" set DIRNAME=.
@rem This is normally unused
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%

Expand All @@ -42,11 +45,11 @@ set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if %ERRORLEVEL% equ 0 goto execute

echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
echo. 1>&2
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2
echo. 1>&2
echo Please set the JAVA_HOME variable in your environment to match the 1>&2
echo location of your Java installation. 1>&2

goto fail

Expand All @@ -56,11 +59,11 @@ set JAVA_EXE=%JAVA_HOME%/bin/java.exe

if exist "%JAVA_EXE%" goto execute

echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
echo. 1>&2
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2
echo. 1>&2
echo Please set the JAVA_HOME variable in your environment to match the 1>&2
echo location of your Java installation. 1>&2

goto fail

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/replicate/common/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ public void addDelayForMessagesOfType(Replica n, MessageId messageId) {
network.addDelayForMessagesOfType(n.getPeerConnectionAddress(), messageId);
}

public int quorum() {
public int majorityQuorum() {
return getNoOfReplicas() / 2 + 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class GenerationVoting extends Replica {
//epoch/term/generation
//this is durable.
//DurableKVStore
int generation = 0;
int generation = 0; //this needs to be durable.

DurableKVStore ballotStore;

Expand Down Expand Up @@ -76,8 +76,9 @@ private void handlePrepareRequest(Message<PrepareRequest> message) {
//no synchronized in here..
var prepareRequest = message.messagePayload();
boolean promised = false;
if (prepareRequest.proposedBallot > generation) { //accept only if 'strictly greater'
generation = prepareRequest.proposedBallot;
if (prepareRequest.proposedGeneration > generation) { //accept only if 'strictly
// greater'
generation = prepareRequest.proposedGeneration;
logger.info(getName() + " accepting " + generation + " in " + getName());
promised = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import replicate.common.MessageId;

public class PrepareRequest extends MessagePayload {
public final int proposedBallot;
public final int proposedGeneration;

public PrepareRequest(int proposedBallot) {
super(MessageId.Prepare);
this.proposedBallot = proposedBallot;
this.proposedGeneration = proposedBallot;
}
}
1 change: 1 addition & 0 deletions src/main/java/replicate/paxos/SingleValuePaxos.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ private void handlePromise(Message<PrepareResponse> promise) {
handleResponse(promise);
}

//TODO: handle execute request.
private CompletableFuture<SetValueResponse> handleSetValueRequest(SetValueRequest setValueRequest) {
return doPaxos(new SetValueCommand(setValueRequest.getKey(), setValueRequest.getValue()).serialize()).thenApply(value -> new SetValueResponse(value.orElse("")));
}
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/replicate/paxoslog/PaxosLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ public class PaxosLog extends Replica {
//Paxos State
//THIS HAS TO BE DURABLE.. Homework. Make paxosLog durable.
//Solution: Use DurableKVStore instead of a TreeMap
Map<Integer, PaxosState> paxosLog = new TreeMap<>();
Map<Integer, PaxosState> paxosLog = new TreeMap<>(); //RocksDB

//This is the final state for Key-value store.
Map<String, String> kv = new HashMap<>();

private final SetValueCommand NO_OP_COMMAND = new SetValueCommand("", "");
Expand Down Expand Up @@ -105,12 +106,12 @@ private CompletableFuture<GetValueResponse> handleClientGetValueRequest(GetValue
AtomicInteger maxKnownPaxosRoundId = new AtomicInteger(1);
AtomicInteger logIndex = new AtomicInteger(0);

public CompletableFuture<PaxosResult> append(int index, byte[] initialValue, CompletionCallback<ExecuteCommandResponse> callback) {
CompletableFuture<PaxosResult> appendFuture = doPaxos(index, initialValue, callback);
public CompletableFuture<PaxosResult> append(int index, byte[] clientRequest, CompletionCallback<ExecuteCommandResponse> callback) {
CompletableFuture<PaxosResult> appendFuture = doPaxos(index, clientRequest, callback);
return appendFuture.thenCompose((result)->{
if (result.value.stream().allMatch(v -> v != initialValue)) {
if (result.value.stream().allMatch(v -> v != clientRequest)) {
logger.info("Could not append proposed value to " + logIndex + ". Trying next index");
return append(logIndex.incrementAndGet(), initialValue, callback);
return append(logIndex.incrementAndGet(), clientRequest, callback);
}
return CompletableFuture.completedFuture(result);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ private void handleCommit(Message<CommitCommandRequest> message) {
CommitCommandRequest t = message.messagePayload();
Command command = getCommand(t.getCommand());
acceptedCommand = command;
// //if command instanceof TransferFunds
if (command instanceof CompareAndSwap) {
CompareAndSwap cas = (CompareAndSwap) command;
Optional<String> existingValue = Optional.ofNullable(kvStore.get(cas.getKey()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ CompletableFuture<ExecuteCommandResponse> handleExecute(ExecuteCommandRequest ne
CompletionCallback<ExecuteCommandResponse> callback = new CompletionCallback();
requestWaitingList.add(requestIdentifier(newCommand.command), callback);
//phase 1
prepare().
prepare(). //check if there are any pending requests..
thenCompose(r -> {
//phase 2
byte[] command = pickCommandToExecute(r.values().stream().toList(), newCommand.command);
Expand Down
9 changes: 5 additions & 4 deletions src/main/java/replicate/vsr/ViewStampedReplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private void handleDoViewChange(Message<DoViewChange> message) {
logger.info(getName() + " Received DoViewChange from " + message.getFromAddress() + " for view " + doViewChange.viewNumber);
doViewChangeCounter++;
doViewChangeMessages.add(doViewChange);
if (doViewChangeCounter == quorum()) {
if (doViewChangeCounter == majorityQuorum()) {
logger.info("");
DoViewChange selectedViewChange = pickViewChangeMessageWithHighestNormalViewnumber(doViewChangeMessages);
this.log = selectedViewChange.log;
Expand All @@ -146,7 +146,6 @@ private void handleDoViewChange(Message<DoViewChange> message) {
heartbeatChecker.stop();
heartBeatScheduler.start();
logger.info(getName() + " DoViewChange quorum reached. Starting view " + this.viewNumber);

sendOnewayMessageToOtherReplicas(new StartView(this.log, this.opNumber, this.commitNumber));
}
}
Expand All @@ -166,7 +165,7 @@ private void handleStartViewChange(Message<StartViewChange> message) {
transitionToViewChange();
}
startViewChangeCounter++;
if (startViewChangeCounter == quorum()) {
if (startViewChangeCounter == majorityQuorum()) {
InetAddressAndPort primaryForView = configuration.getPrimaryForView(viewNumber);
logger.info(getName() + " StartViewChange quorum reached." + primaryForView + " is the new primary." + " Sending DoViewChange");
sendOneway(primaryForView, new DoViewChange(viewNumber, log, normalStatusViewNumber, opNumber, commitNumber), message.getCorrelationId());
Expand Down Expand Up @@ -197,7 +196,7 @@ void maybeIncrementCommitNumberAndApply() {
//from last commit number to the entry which is quorum accepted.
for (int i = commitNumber + 1; i <= log.size(); i++) {
LogEntry logEntry = log.get(i);
if (logEntry == null || !logEntry.isQuorumAccepted(quorum())) {
if (logEntry == null || !logEntry.isQuorumAccepted(majorityQuorum())) {
break;
}
commitNumber = i;
Expand Down Expand Up @@ -261,6 +260,8 @@ public CompletableFuture<ExecuteCommandResponse> handleClientRequest(ExecuteComm
return callback.getFuture();
}

//if viewchange is higher than the node.
// Then
public void handlePrepare(Message<Prepare> message) {
Prepare prepare = message.messagePayload();
if (this.viewNumber == prepare.viewNumber) {
Expand Down
1 change: 1 addition & 0 deletions src/main/java/replicate/wal/DurableKVStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ private Command deserialize(WALEntry walEntry) {
return Command.deserialize(new ByteArrayInputStream(walEntry.getData()));
}

//simulates crash.
public void close() {
wal.close();
kv.clear();
Expand Down
98 changes: 98 additions & 0 deletions src/test/java/replicate/common/DiskWritePerformanceTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package replicate.common;

import java.io.*;
import java.time.Duration;
import java.time.Instant;

public class DiskWritePerformanceTest {

private static final String FILE_NAME = "testfile.bin";
private static final int WRITE_SIZE = 1024; // Size of each write in bytes
// (1 KB)
private static final int DURATION_IN_SECONDS = 20; // Duration of the test in seconds

public static void main(String[] args) throws IOException {
byte[] data = createData(WRITE_SIZE);
File perfFile = createFile(FILE_NAME);

System.out.println("Writing data to = " + perfFile);

PerformanceMetrics metrics = performWriteTest(perfFile, data, DURATION_IN_SECONDS);

printMetrics(metrics);
}

private static byte[] createData(int size) {
byte[] data = new byte[size];
for (int i = 0; i < size; i++) {
data[i] = 'A';
}
return data;
}

private static File createFile(String fileName) {
return new File(TestUtils.tempDir("perf"), fileName);
}

private static PerformanceMetrics performWriteTest(File file, byte[] data
, int durationInSeconds) throws IOException {
Instant startTime = Instant.now();
Instant endTime = startTime.plus(Duration.ofSeconds(durationInSeconds));


long numberOfWrites = writeUntil(endTime, file, data);

Instant actualEndTime = Instant.now();
Duration duration = Duration.between(startTime, actualEndTime);

return new PerformanceMetrics(numberOfWrites, duration.getSeconds(), WRITE_SIZE);
}

private static long writeUntil(Instant endTime, File file, byte[] data) throws IOException {
long numberOfWrites = 0;
FileOutputStream os =
new FileOutputStream(file);
while (Instant.now().isBefore(endTime)) {
os.write(data);
// os.getFD().sync();
numberOfWrites++;
}
return numberOfWrites;
}

private static void printMetrics(PerformanceMetrics metrics) {
double writesPerSecond = metrics.getNumberOfWrites() / metrics.getSeconds();
double mbWritten = (metrics.getNumberOfWrites() * metrics.getWriteSize()) / (1024.0 * 1024.0);
double mbPerSecond = mbWritten / metrics.getSeconds();

System.out.println("Total writes: " + metrics.getNumberOfWrites());
System.out.println("Total time: " + metrics.getSeconds() + " seconds");
System.out.println("Writes per second: " + writesPerSecond);
System.out.println("MB written: " + mbWritten);
System.out.println("MB per second: " + mbPerSecond);
}

private static class PerformanceMetrics {
private final long numberOfWrites;
private final double seconds;
private final int writeSize;

public PerformanceMetrics(long numberOfWrites, double seconds, int writeSize) {
this.numberOfWrites = numberOfWrites;
this.seconds = seconds;
this.writeSize = writeSize;
}

public long getNumberOfWrites() {
return numberOfWrites;
}

public double getSeconds() {
return seconds;
}

public int getWriteSize() {
return writeSize;
}
}
}
Loading

0 comments on commit 9401717

Please sign in to comment.