Skip to content

Commit

Permalink
Fixed bugs in BinarySearcherOverPlainSource where the record offset a…
Browse files Browse the repository at this point in the history
…nd record header cache could go out of sync
  • Loading branch information
Aklakan committed Sep 9, 2024
1 parent 717c2f9 commit ecd0c3d
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.aksw.commons.io.input;

import java.io.IOException;

public class ReadableChannelTracker<A, X extends ReadableChannel<A>>
extends ReadableChannelDecoratorBase<A, X>
{
protected long totalReadBytes;
protected long totalReadDuration;

public ReadableChannelTracker(X delegate) {
super(delegate);
}

// TODO Track number of bytes read + time spent
// On close add the amount to the source - so stats are best accessed on the source.
// @Override
// public int read(A array, int position, int length) throws IOException {
// }
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package org.aksw.commons.io.hadoop.binseach.v2;

public class BinSearchLevelCache {
protected int flowLevel;
protected int fluidLevel;
protected BinSearchCache fixedCache;
protected BinSearchCache fluidCache;

public BinSearchLevelCache() {
this(16, new BinSearchCacheFixed(), new BinSearchCacheFluid());
}

public BinSearchLevelCache(int flowLevel, BinSearchCache fixedCache, BinSearchCache flowCache) {
public BinSearchLevelCache(int fluidLevel, BinSearchCache fixedCache, BinSearchCache flowCache) {
super();
this.flowLevel = flowLevel;
this.fluidLevel = fluidLevel;
this.fixedCache = fixedCache;
this.fluidCache = flowCache;
}
Expand All @@ -30,7 +30,7 @@ public HeaderRecord getHeader(long position) {
// }

public void setHeader(int depth, HeaderRecord headerRecord) {
BinSearchCache target = depth < flowLevel ? fixedCache : fluidCache;
BinSearchCache target = depth < fluidLevel ? fixedCache : fluidCache;
target.setHeader(headerRecord);
}

Expand All @@ -43,7 +43,7 @@ public long getDisposition(long position) {
}

public void setDisposition(int depth, long from, long to) {
BinSearchCache target = depth < flowLevel ? fixedCache : fluidCache;
BinSearchCache target = depth < fluidLevel ? fixedCache : fluidCache;
target.setDisposition(from, to);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ record CacheEntry(BinSearchLevelCache levelCache, Cache<Long, Block> blockCache)
public BinSearchResourceCache(int maxCacheSize) {
this(Caffeine.newBuilder().maximumSize(maxCacheSize).build(), () -> {
return new CacheEntry(BinSearchLevelCache.dftCache(), Caffeine.newBuilder().maximumSize(16).build());
// return new CacheEntry(BinSearchLevelCache.noCache(), Caffeine.newBuilder().maximumSize(16).build());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ public static long adjustStart(BlockSource blockSource, long start, int depth, B
/**
* When this method returns the input stream's position is unspecified.
*
* <p>
* Note on cache semantics:
* <ol>
* <li>Disposition is the mapping from the current offset to that of the next block (NOT the record).</li>
* <li>The header map maps the block id to the first record</li>
* </ol>
*
* @param in The seekable input stream on which to perform binary search for the given prefix.
* @param searchMode Whether we are searching the initial match, or the start or end of a run of matches.
* @param start
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,51 +125,70 @@ public static Match binarySearch(

long mid = (start + end) >> 1; // division by 2

if (false) {
System.out.println(String.format("%d <= %d < %d)", start, mid, end));
}

if (logger.isDebugEnabled()) {
logger.debug(String.format("%d <= %d < %d)", start, mid, end));
}
in.position(mid);

int cmp = 0;
// Find the next record start

long nextDelimPos = cache.getDisposition(mid);
long bytesToNextDelimiter = -1;
if (nextDelimPos == -1) {
long bytesToNextDelimiter = 0;
in.position(mid);

if (mid > 0) {
long allowedSearchBytes = end - mid;
bytesToNextDelimiter = BinSearchUtils.readUntilDelimiter(in, delimiter, allowedSearchBytes);
}

// -1 means that no delimiter was found
// if there is no more record starting from mid then continue searching on the left half
if (bytesToNextDelimiter < 0) {
nextDelimPos = mid;
cmp = -1;
nextDelimPos = mid - 1; // Slightly hacky using mid-1 to signal no more delimiter
} else {
nextDelimPos = mid + bytesToNextDelimiter;
}
cache.setDisposition(depth, mid, nextDelimPos);
}

if (nextDelimPos < mid) {
cmp = -1;
nextDelimPos = mid;
}

if (cmp == 0) {
HeaderRecord headerRecord = cache.getHeader(nextDelimPos);
int l = prefix.length;
if (headerRecord == null || (headerRecord.data().length < prefix.length && !headerRecord.isDataConsumed())) {
// If bytesToNextDelimiter is -1 it means that the next delimiter position
// was taken from cache - we then need to position 'in' to the delimiter position
if (bytesToNextDelimiter == -1) {
in.position(nextDelimPos);
}

boolean isDataConsumed = false;
// byte[] header = headerRecord.data();
int blockSize = Math.max(prefix.length, 256);
byte[] header = new byte[blockSize];
// XXX resort to a readFully without extra wrapping
// TODO Input stream may need to be reinitialized
int n = ReadableChannels.readFully(ReadableChannels.wrap(in), header, 0, blockSize);
if (n < blockSize) {
isDataConsumed = true;
header = Arrays.copyOf(header, n);
}
if (header.length < prefix.length) {
cmp = -1;
}
headerRecord = new HeaderRecord(nextDelimPos, 0, header, isDataConsumed);
cache.setHeader(depth, headerRecord);
}
// FIXME This condition seems wrong
// We need to compare the available bytes and if all match then ...?
if (headerRecord.data().length < prefix.length) {
cmp = -1;
}
if (cmp == 0) {
cmp = Arrays.compare(prefix, 0, l, headerRecord.data(), 0, l);
}
Expand Down

0 comments on commit ecd0c3d

Please sign in to comment.