Skip to content

Commit

Permalink
BinSearchCacheFixed is now thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
Aklakan committed Sep 8, 2024
1 parent 1c2e8a6 commit 717c2f9
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.aksw.commons.util.lock.LockUtils;

class BinSearchCacheFixed
implements BinSearchCache
Expand All @@ -13,6 +17,9 @@ class BinSearchCacheFixed
protected NavigableMap<Long, Long> fixedDispositions;
protected Map<Long, HeaderRecord> fixedHeaders;

protected ReadWriteLock dispositionLock = new ReentrantReadWriteLock();
protected ReadWriteLock headerLock = new ReentrantReadWriteLock();

public BinSearchCacheFixed() {
super();
this.fixedDispositions = new TreeMap<>();
Expand All @@ -21,64 +28,72 @@ public BinSearchCacheFixed() {

@Override
public long getDisposition(long position) {
long result = -1;
if (fixedHeaders.containsKey(position)) {
result = position;
} else {
Entry<Long, Long> e = fixedDispositions.floorEntry(position);
if (e != null) {
// long from = e.getKey();
long to = e.getValue();
if (position <= to) {
result = to;
return LockUtils.runWithLock(dispositionLock.readLock(), () -> {
long result = -1;
if (fixedHeaders.containsKey(position)) {
result = position;
} else {
Entry<Long, Long> e = fixedDispositions.floorEntry(position);
if (e != null) {
// long from = e.getKey();
long to = e.getValue();
if (position <= to) {
result = to;
}
}
}
}
return result;
return result;
});
}

@Override
public void setDisposition(long from, long to) {
Entry<Long, Long> e = fixedDispositions.floorEntry(from);
if (e != null) {
long cachedFrom = e.getKey();
long cachedTo = e.getValue();
LockUtils.runWithLock(dispositionLock.writeLock(), () -> {
Entry<Long, Long> e = fixedDispositions.floorEntry(from);
if (e != null) {
long cachedFrom = e.getKey();
long cachedTo = e.getValue();

// issues to check for:
// TODO cached range overlaps with starting point (cachedTo > from)
if (cachedTo > to) {
// new: [ ]
// cached: [ ]
throw new IllegalStateException(String.format("The upper endoint overlaps with an existing entry: [%d, %d] -> [%d, %d]", from, to, cachedFrom, cachedTo));
} else if (cachedTo == to) {
// Update an existing entry with a lower boundary
if (from < cachedFrom) {
fixedDispositions.remove(cachedFrom);
// issues to check for:
// TODO cached range overlaps with starting point (cachedTo > from)
if (cachedTo > to) {
// new: [ ]
// cached: [ ]
throw new IllegalStateException(String.format("The upper endoint overlaps with an existing entry: [%d, %d] -> [%d, %d]", from, to, cachedFrom, cachedTo));
} else if (cachedTo == to) {
// Update an existing entry with a lower boundary
if (from < cachedFrom) {
fixedDispositions.remove(cachedFrom);
fixedDispositions.put(from, to);
}
} else { // to < cachedTo
// Sanity check: New's lower endpoint must not overlap
// new: [ ]
// existing: [ ]
if (from <= cachedTo) {
throw new IllegalStateException(String.format("Overlap with an existing entry: [%d, %d] -> [%d, %d]", from, to, cachedFrom, cachedTo));
}
fixedDispositions.put(from, to);
}
} else { // to < cachedTo
// Sanity check: New's lower endpoint must not overlap
// new: [ ]
// existing: [ ]
if (from <= cachedTo) {
throw new IllegalStateException(String.format("Overlap with an existing entry: [%d, %d] -> [%d, %d]", from, to, cachedFrom, cachedTo));
}
} else {
fixedDispositions.put(from, to);
}
} else {
fixedDispositions.put(from, to);
}
});
}

@Override
public HeaderRecord getHeader(long position) {
HeaderRecord result = fixedHeaders.get(position);
return result;
return LockUtils.runWithLock(headerLock.readLock(), () -> {
HeaderRecord result = fixedHeaders.get(position);
return result;
});
}

@Override
public void setHeader(HeaderRecord headerRecord) {
// HeaderRecord headerRecord = new HeaderRecord(position, disposition, header, isDataConsumed);
fixedHeaders.put(headerRecord.position(), headerRecord);
LockUtils.runWithLock(headerLock.writeLock(), () -> {
// HeaderRecord headerRecord = new HeaderRecord(position, disposition, header, isDataConsumed);
fixedHeaders.put(headerRecord.position(), headerRecord);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static InputStream configureStream(
new ReadableByteChannelForLinesMatchingPrefix(
SeekableInputStreams.wrap(in), scanState));
} else {
System.err.println("NO MATCH for " + new String(prefix));
// System.err.println("NO MATCH for " + new String(prefix));
in.close();
result = InputStream.nullInputStream(); // ReadableChannels.newInputStream(ReadableChannels.limit(in, 0));
}
Expand Down

0 comments on commit 717c2f9

Please sign in to comment.