Skip to content

Commit

Permalink
Optimized block-based binary search to now abort once search range li…
Browse files Browse the repository at this point in the history
…es within the same block
  • Loading branch information
Aklakan committed Sep 6, 2024
1 parent 1c50aa2 commit 91b9bf8
Showing 1 changed file with 60 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,29 +125,59 @@ public Stream<ReadableChannelSupplier<byte[]>> parallelSearch(byte[] prefix) thr

public static Match binarySearch(BlockSource blockSource, byte[] prefix, BinSearchLevelCache cache) throws IOException {
long end = blockSource.size();
Match result = binarySearch(blockSource, SearchMode.BOTH, 0, 0, end, (byte)'\n', prefix, cache);

long startAfter = adjustStart(blockSource, 1, 0, cache);
// System.out.println("StartAfter:" + startAfter);

// long startAfter2 = adjustStart(blockSource, startAfter, 0, cache);
// // adjustStart(adjustedStart) -> adjustedStart
// System.out.println("StartAfter2:" + startAfter2 + 1);

Match result = binarySearch(blockSource, SearchMode.BOTH, 0, 0, startAfter, end, (byte)'\n', prefix, cache);
return result;
}

/** Adjust a position to the next block. It must hold that this function is idempotent: adjustStart(adjustStart(offset)) = adjustStart(offset) */
public static long adjustStart(BlockSource blockSource, long start, int depth, BinSearchLevelCache cache) throws IOException {
long currentBlockId = cache.getDisposition(start);
if (currentBlockId == -1) {
try (BlockSourceChannelAdapter channel = blockSource.newReadableChannel(start)) {
currentBlockId = channel.getStartingBlockId();
if (currentBlockId == -1) {
throw new IllegalStateException("Should not happen: Block id not set after read.");
}
cache.setDisposition(depth, start, currentBlockId);
// System.out.println("Adjusted: " + start + " -> " + currentBlockId);
}
}
return currentBlockId;
}

/**
* When this method returns the input stream's position is unspecified.
*
* @param in The seekable input stream on which to perform binary search for the given prefix.
* @param searchMode Whether we are searching the initial match, or the start or end of a run of matches.
* @param start
* @param startAfter The adjusted start for the offset (start + 1)
* @param end
* @param delimiter
* @param prefix
* @return
* @throws IOException
*/
public static Match binarySearch(BlockSource blockSource, SearchMode searchMode, int depth, long start, long end, byte delimiter, byte[] prefix, BinSearchLevelCache cache) throws IOException {
public static Match binarySearch(BlockSource blockSource, SearchMode searchMode, int depth, long start, long startAfter, long end, byte delimiter, byte[] prefix, BinSearchLevelCache cache) throws IOException {
if (start > end) {
return null;
}

long mid = (start + end) >> 1; // division by 2

// If mid lies within the starting block then adjust it to start
if (mid > start && mid <= startAfter) {
mid = startAfter;
}

if (false) {
System.out.println(String.format("%d <= %d < %d)", start, mid, end));
}
Expand All @@ -158,42 +188,32 @@ public static Match binarySearch(BlockSource blockSource, SearchMode searchMode,
long currentBlockId;


InputStream in = null; // Created on demand
InputStream in = null; // Initialized on demand
try {
if (mid > 0) {
nextBlockId = cache.getDisposition(mid);
if (nextBlockId == -1) {
BlockSourceChannelAdapter channel = blockSource.newReadableChannel(mid);
in = new InputStreamWithZeroOffsetRead(SeekableInputStreams.create(channel));

// The start blockId is the position such that
// blockSource.newReadableChannel(startBlockId + 1) would return the next block
startBlockId = channel.getStartingBlockId();
currentBlockId = channel.getCurrentBlockId();

long effectiveBlockId = startBlockId;

// System.out.println(String.format("start %s - current %s", startBlockId, currentBlockId));

if (effectiveBlockId == -1) {
if (startBlockId == -1) {
throw new IllegalStateException("Should not happen: Block id not set after read.");
}

// if (effectiveBlockId > end) {
// // nextBlockId = mid;
// nextBlockId = effectiveBlockId;
// cmp = -1;
// }
// else {
// nextBlockId = effectiveBlockId;
// }

nextBlockId = effectiveBlockId;
cache.setDisposition(depth, mid, effectiveBlockId);
nextBlockId = startBlockId;
cache.setDisposition(depth, mid, startBlockId);
}
} else {
nextBlockId = 0;
}

// if (cmp == 0) {
HeaderRecord headerRecord = cache.getHeader(nextBlockId);
int l = prefix.length;
if (headerRecord == null || (headerRecord.data().length < prefix.length && !headerRecord.isDataConsumed())) {
Expand Down Expand Up @@ -251,37 +271,50 @@ public static Match binarySearch(BlockSource blockSource, SearchMode searchMode,
// Find the start of a run:
// Continue searching left - if there is no match then return the candidate result
if (nextBlockId != 0) {
Match expandLeft = binarySearch(blockSource, SearchMode.LEFT, depth, start, mid, delimiter, prefix, cache);
if (expandLeft != null) {
left = expandLeft.start();
if (mid != start) { // We can't expand if we are already at start
long nextEnd = mid <= startAfter ? start : mid;
Match expandLeft = binarySearch(blockSource, SearchMode.LEFT, depth, start, startAfter, nextEnd, delimiter, prefix, cache);
if (expandLeft != null) {
left = expandLeft.start();
}
}
}
}
boolean findEndOfRun = false;
if (findEndOfRun) {
if (SearchMode.RIGHT.equals(searchMode) || SearchMode.BOTH.equals(searchMode)) {
Match expandRight = binarySearch(blockSource, SearchMode.RIGHT, depth, nextBlockId + 1, end, delimiter, prefix, cache);
long nextStart = nextBlockId + 1;
long nextStartAfter = adjustStart(blockSource, nextStart + 1 , depth, cache);
Match expandRight = binarySearch(blockSource, SearchMode.RIGHT, depth, nextStart, nextStartAfter, end, delimiter, prefix, cache);
if (expandRight != null) {
right = expandRight.end();
}
}
}
result = new Match(left, right);
} else if(cmp < 0) {
long nextEnd = mid; //nextBlockId - 1;
if (start >= nextEnd) {
// long nextEnd = mid; //nextBlockId - 1;
if (mid == start) {
result = new Match(start, start);
} else if (mid <= startAfter) {
// set end to start
result = binarySearch(blockSource, searchMode, depth, start, startAfter, start, delimiter, prefix, cache);
} else {
result = binarySearch(blockSource, searchMode, depth, start, nextEnd, delimiter, prefix, cache);
result = binarySearch(blockSource, searchMode, depth, start, startAfter, mid, delimiter, prefix, cache);
}
} else {
// If the prefix compares greater than the header it may still be in the starting page
// so do not increment the start
long nextStart = nextBlockId;
if (nextStart + 1>= end) {

// if (nextStart + 1 >= end) {
// if (startAfter + 1 >= end) {
if (end <= startAfter) {
// If the end marker is within the starting block then return the starting block
result = new Match(start, start);
} else {
result = binarySearch(blockSource, searchMode, depth, nextStart, end, delimiter, prefix, cache);
long nextStartAfter = adjustStart(blockSource, nextStart + 1 , depth, cache);
result = binarySearch(blockSource, searchMode, depth, nextStart, nextStartAfter, end, delimiter, prefix, cache);
}
}

Expand Down

0 comments on commit 91b9bf8

Please sign in to comment.