Skip to content

Commit 552aa96

Browse files
committed
changes, bugfixes and refactoring
1 parent bfdfa95 commit 552aa96

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+679
-482
lines changed

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
<target>8</target>
1818
</configuration>
1919
</plugin>
20+
<plugin>
21+
<groupId>org.apache.maven.plugins</groupId>
22+
<artifactId>maven-surefire-plugin</artifactId>
23+
<version>2.19.1</version>
24+
</plugin>
2025
</plugins>
2126
</build>
2227

src/main/java/certifier/AbstractCertifier.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public abstract class AbstractCertifier implements Certifier<Long>{
1717

1818
final Timestamp<Long> currentCommitTs;
1919
final Timestamp<Long> lowWaterMark;
20+
final Timestamp<Long> lastTombStone;
2021

2122
final LinkedHashMap<Timestamp<Long>, RunningState> runningTransactions;
2223

@@ -28,6 +29,7 @@ public AbstractCertifier(long timestep){
2829
lowWaterMark = new MonotonicTimestamp(-1);
2930
runningTransactions = new LinkedHashMap<>();
3031
history = new HashMap<>();
32+
lastTombStone = new MonotonicTimestamp(timestep);
3133
}
3234

3335
public AbstractCertifier(AbstractCertifier certifier){
@@ -36,12 +38,15 @@ public AbstractCertifier(AbstractCertifier certifier){
3638
lowWaterMark = certifier.lowWaterMark;
3739
runningTransactions = new LinkedHashMap<>(certifier.runningTransactions);
3840
history = new HashMap<>(certifier.history);
41+
lastTombStone = certifier.lastTombStone;
3942
}
4043

4144
public abstract CompletableFuture<Timestamp<Long>> start();
4245

4346
public abstract long truncateStartTS(Timestamp<Long> startTimestamp);
4447

48+
public abstract long truncateForGC(Timestamp<Long> startTimestamp);
49+
4550
public abstract Timestamp<Long> treatCommit(BitWriteSet newBws, Timestamp<Long> ts);
4651

4752
public abstract void update(Timestamp<Long> commitTimestamp);
@@ -67,35 +72,37 @@ public Timestamp<Long> commit(BitWriteSet newBws, Timestamp<Long> ts) {
6772

6873
@Override
6974
public void transactionStarted(Timestamp<Long> startTimestamp) {
70-
Timestamp<Long> truncated = new MonotonicTimestamp(truncateStartTS(startTimestamp));
75+
Timestamp<Long> truncated = new MonotonicTimestamp(truncateForGC(startTimestamp));
7176
LOG.info("transaction started truncated={}", truncated.toPrimitive());
7277
this.runningTransactions.putIfAbsent(truncated, new RunningState());
7378
this.runningTransactions.get(truncated).addTransaction();
7479
}
7580

7681
@Override
77-
public void transactionCommited(Timestamp<Long> startTimestamp){
78-
Timestamp<Long> truncated = new MonotonicTimestamp(truncateStartTS(startTimestamp));
82+
public void transactionEnded(Timestamp<Long> startTimestamp){
83+
Timestamp<Long> truncated = new MonotonicTimestamp(truncateForGC(startTimestamp));
7984
LOG.info("transaction commited={}", truncated.toPrimitive());
8085
this.runningTransactions.get(truncated).removeTransaction();
8186
}
8287

8388
@Override
84-
public void setTombstone(Timestamp<Long> commitTimestamp, LocalDateTime value){
85-
LOG.info("set Tombstone={}", commitTimestamp.toPrimitive());
86-
this.runningTransactions.get(commitTimestamp).setTombstone(value);
89+
public void setTombstone(LocalDateTime value){
90+
LOG.info("set Tombstone={}", currentCommitTs.toPrimitive());
91+
this.runningTransactions.get(currentCommitTs).setTombstone(value);
8792
}
8893

8994
@Override
9095
public Timestamp<Long> getSafeToDeleteTimestamp(){
9196
Timestamp<Long> newLowWaterMark = lowWaterMark;
9297
for(Map.Entry<Timestamp<Long>, RunningState> entry : runningTransactions.entrySet()){
9398
RunningState runningState = entry.getValue();
94-
if(!runningState.isCleared())
99+
if(!runningState.isCleared()) {
95100
return new MonotonicTimestamp(newLowWaterMark);
101+
}
96102
else
97103
newLowWaterMark = entry.getKey();
98104
}
105+
LOG.info("Normal GC newLowWaterMark = {}", newLowWaterMark.toPrimitive());
99106
return new MonotonicTimestamp(newLowWaterMark);
100107
}
101108

@@ -107,25 +114,22 @@ public void evictStoredWriteSets(Long newLowWaterMark){
107114
this.runningTransactions.remove(i);
108115
}
109116
this.lowWaterMark.setPrimitive(newLowWaterMark);
117+
LOG.info("GC performed newLowWaterMark = {}", this.lowWaterMark.toPrimitive());
110118
}
111119

112120

113-
@Override
114-
public Timestamp<Long> forceEvictStoredWriteSets(LocalDateTime eventTime, long intervalSec){
115-
ArrayList<Timestamp<Long>> removed = new ArrayList<>();
116-
this.runningTransactions.forEach((k,v) -> {
117-
LocalDateTime tombstone = v.getTombstone();
121+
public Timestamp<Long> getForceDeleteTimestamp(LocalDateTime eventTime, long intervalSec){
122+
Timestamp<Long> newLowWaterMark = lowWaterMark;
123+
for(Map.Entry<Timestamp<Long>, RunningState> entry : runningTransactions.entrySet()){
124+
LocalDateTime tombstone = entry.getValue().getTombstone();
118125
if(tombstone != null){
119126
long interval = Duration.between(tombstone, eventTime).getSeconds();
120127
if(interval > intervalSec)
121-
removed.add(k);
128+
newLowWaterMark = entry.getKey();
122129
}
123-
});
124-
removed.forEach(v -> {
125-
this.runningTransactions.remove(v);
126-
this.history.remove(v);
127-
});
128-
return removed.get(removed.size() - 1);
130+
}
131+
LOG.info("Force GC newLowWaterMark = {}", newLowWaterMark.toPrimitive());
132+
return newLowWaterMark;
129133
}
130134

131135
public Timestamp<Long> getCurrentCommitTs() {

src/main/java/certifier/Certifier.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ public interface Certifier<V> extends Serializable {
1414
Timestamp<V> getCurrentCommitTs();
1515
Timestamp<V> getSafeToDeleteTimestamp();
1616
void evictStoredWriteSets(V newLowWaterMark);
17-
Timestamp<Long> forceEvictStoredWriteSets(LocalDateTime eventTime, long intervalSec);
18-
void setTombstone(Timestamp<Long> commitTimestamp, LocalDateTime value);
19-
void transactionCommited(Timestamp<Long> startTimestamp);
17+
Timestamp<Long> getForceDeleteTimestamp(LocalDateTime eventTime, long intervalSec);
18+
void setTombstone(LocalDateTime value);
19+
void transactionEnded(Timestamp<Long> startTimestamp);
2020
void transactionStarted(Timestamp<Long> startTimestamp);
2121
}

src/main/java/certifier/CertifierImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import transaction_manager.utils.BitWriteSet;
66

77
import java.io.Serializable;
8+
import java.time.LocalDateTime;
89
import java.util.concurrent.CompletableFuture;
910

1011
public class CertifierImpl extends AbstractCertifier {
@@ -31,6 +32,11 @@ public long truncateStartTS(Timestamp<Long> startTimestamp) {
3132
return startTimestamp.toPrimitive();
3233
}
3334

35+
@Override
36+
public long truncateForGC(Timestamp<Long> startTimestamp) {
37+
return startTimestamp.toPrimitive();
38+
}
39+
3440
@Override
3541
public Timestamp<Long> treatCommit(BitWriteSet newBws, Timestamp<Long> ts){
3642
if (isWritable(newBws, ts, currentCommitTs)) {

src/main/java/certifier/IntervalCertifierImpl.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,7 @@
33
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
55
import transaction_manager.utils.BitWriteSet;
6-
7-
import java.io.Serializable;
86
import java.util.LinkedList;
9-
10-
import java.util.Queue;
117
import java.util.concurrent.CompletableFuture;
128

139

@@ -18,6 +14,7 @@ public class IntervalCertifierImpl extends AbstractCertifier {
1814
private final Timestamp<Long> provisionalCommitTs;
1915
private final LinkedList<CompletableFuture<Void>> startsOnWait;
2016

17+
2118
public IntervalCertifierImpl(long timestep) {
2219
super(timestep);
2320
currentStartTs = new MonotonicTimestamp(0);
@@ -49,7 +46,12 @@ public CompletableFuture<Timestamp<Long>> start() {
4946

5047
@Override
5148
public long truncateStartTS(Timestamp<Long> startTimestamp) {
52-
return startTimestamp.toPrimitive() / timestep * timestep + timestep;
49+
return truncateForGC(startTimestamp) + timestep;
50+
}
51+
52+
@Override
53+
public long truncateForGC(Timestamp<Long> startTimestamp) {
54+
return startTimestamp.toPrimitive() / timestep * timestep;
5355
}
5456

5557
@Override

src/main/java/npvs/AbstractNPVS.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import certifier.MonotonicTimestamp;
44
import certifier.Timestamp;
55
import npvs.messaging.FlushMessage;
6+
import npvs.messaging.NPVSReply;
67
import org.slf4j.Logger;
78
import org.slf4j.LoggerFactory;
89
import transaction_manager.utils.ByteArrayWrapper;
@@ -15,17 +16,26 @@ public abstract class AbstractNPVS implements NPVS<Long>{
1516
private static final Logger LOG = LoggerFactory.getLogger(AbstractNPVS.class);
1617
private final HashSet<Timestamp<Long>> requestsMade;
1718
private final Timestamp<Long> currentCommitTs;
19+
private Timestamp<Long> rebootTs;
1820

1921
public AbstractNPVS(){
2022
this.requestsMade = new HashSet<>();
2123
this.currentCommitTs = new MonotonicTimestamp(0);
24+
this.rebootTs = new MonotonicTimestamp(-1);
2225
}
2326

24-
public abstract void evictVersions(Timestamp<Long> lowWaterMark);
27+
public abstract void evict(Timestamp<Long> lowWaterMark);
2528

2629
public abstract CompletableFuture<Void> putImpl(Map<ByteArrayWrapper, byte[]> writeMap, Timestamp<Long> ts);
2730

28-
public abstract CompletableFuture<NPVSReply> get(ByteArrayWrapper key, Timestamp<Long> ts);
31+
public abstract CompletableFuture<NPVSReply> getImpl(ByteArrayWrapper key, Timestamp<Long> ts);
32+
33+
public CompletableFuture<NPVSReply> get(ByteArrayWrapper key, Timestamp<Long> ts){
34+
if (ts.isBefore(rebootTs))
35+
return CompletableFuture.completedFuture(NPVSReply.FAIL());
36+
else
37+
return getImpl(key, ts);
38+
}
2939

3040
@Override
3141
public CompletableFuture<Void> put(FlushMessage flushMessage) {
@@ -41,4 +51,8 @@ public CompletableFuture<Void> put(FlushMessage flushMessage) {
4151
}
4252
return putImpl(flushMessage.getWriteMap(), flushMessage.getCurrentTimestamp());
4353
}
54+
55+
public void setRebootTs(Timestamp<Long> rebootTs) {
56+
this.rebootTs = rebootTs;
57+
}
4458
}

src/main/java/npvs/NPVS.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import certifier.Timestamp;
44
import npvs.messaging.FlushMessage;
5+
import npvs.messaging.NPVSReply;
56
import transaction_manager.utils.ByteArrayWrapper;
67

78
import java.util.concurrent.CompletableFuture;
@@ -10,4 +11,5 @@ public interface NPVS<V> {
1011

1112
CompletableFuture<Void> put(FlushMessage flushMessage);
1213
CompletableFuture<NPVSReply> get(ByteArrayWrapper key, Timestamp<V> ts);
14+
void evict(Timestamp<Long> lowWaterMark);
1315
}

src/main/java/npvs/RaftMessagingService.java renamed to src/main/java/npvs/NPVSRaftClient.java

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,16 @@
1414
import java.util.Random;
1515
import java.util.concurrent.TimeoutException;
1616

17-
public class RaftMessagingService {
17+
import static transaction_manager.raft.sofa_jraft.rpc.RaftMessagingService.getResponseFromFollower;
18+
import static transaction_manager.raft.sofa_jraft.rpc.RaftMessagingService.refreshLeader;
19+
20+
public class NPVSRaftClient {
1821

1922
private final CliClientServiceImpl cliClientService;
20-
private final String groupId;
2123
private PeerId leader;
2224

2325

24-
public RaftMessagingService(String groupId, String confStr){
25-
this.groupId = groupId;
26+
public NPVSRaftClient(String groupId, String confStr){
2627
Configuration conf = new Configuration();
2728
if (!conf.parse(confStr)) {
2829
throw new IllegalArgumentException("Fail to parse conf:" + confStr);
@@ -31,33 +32,16 @@ public RaftMessagingService(String groupId, String confStr){
3132
cliClientService = new CliClientServiceImpl();
3233
cliClientService.init(new CliOptions());
3334
try {
34-
refreshLeader();
35+
leader = refreshLeader(cliClientService, groupId);
3536
} catch (TimeoutException | InterruptedException e) {
3637
e.printStackTrace();
3738
}
3839
}
3940

40-
// raft servers should be ready first
41-
public void refreshLeader() throws TimeoutException, InterruptedException {
42-
if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) {
43-
int waitMs = 2000;
44-
Thread.sleep(waitMs);
45-
System.out.println("Waiting for raft, retrying in " + waitMs/1000 + " seconds");
46-
refreshLeader();
47-
}
48-
leader = RouteTable.getInstance().selectLeader(groupId);
49-
System.out.println("Leader is " + leader);
50-
}
51-
5241
@SuppressWarnings("unchecked")
5342
public Timestamp<Long> getTimestamp() {
5443
try {
55-
List<PeerId> pps = RouteTable.getInstance().getConfiguration("manager").getPeers();
56-
pps.remove(leader);
57-
int randomIndex = new Random().nextInt(pps.size());
58-
PeerId pp = pps.get(randomIndex);
59-
60-
return ((ValueResponse<Timestamp<Long>>) cliClientService.getRpcClient().invokeSync(pp.getEndpoint(), new GetTimestamp(), 30000)).getValue();
44+
return getResponseFromFollower(cliClientService, new GetTimestamp(), leader);
6145
} catch (InterruptedException | RemotingException e) {
6246
e.printStackTrace();
6347
}

src/main/java/npvs/NPVSReply.java

Lines changed: 0 additions & 29 deletions
This file was deleted.

0 commit comments

Comments
 (0)