diff --git a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverBlockSource.java b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverBlockSource.java index 70ae44bf..a5964670 100644 --- a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverBlockSource.java +++ b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverBlockSource.java @@ -125,29 +125,59 @@ public Stream> parallelSearch(byte[] prefix) thr public static Match binarySearch(BlockSource blockSource, byte[] prefix, BinSearchLevelCache cache) throws IOException { long end = blockSource.size(); - Match result = binarySearch(blockSource, SearchMode.BOTH, 0, 0, end, (byte)'\n', prefix, cache); + + long startAfter = adjustStart(blockSource, 1, 0, cache); + // System.out.println("StartAfter:" + startAfter); + +// long startAfter2 = adjustStart(blockSource, startAfter, 0, cache); +// // adjustStart(adjustedStart) -> adjustedStart +// System.out.println("StartAfter2:" + startAfter2 + 1); + + Match result = binarySearch(blockSource, SearchMode.BOTH, 0, 0, startAfter, end, (byte)'\n', prefix, cache); return result; } + /** Adjust a position to the next block. It must hold that this function is idempotent: adjustStart(adjustStart(offset)) = adjustStart(offset) */ + public static long adjustStart(BlockSource blockSource, long start, int depth, BinSearchLevelCache cache) throws IOException { + long currentBlockId = cache.getDisposition(start); + if (currentBlockId == -1) { + try (BlockSourceChannelAdapter channel = blockSource.newReadableChannel(start)) { + currentBlockId = channel.getStartingBlockId(); + if (currentBlockId == -1) { + throw new IllegalStateException("Should not happen: Block id not set after read."); + } + cache.setDisposition(depth, start, currentBlockId); + // System.out.println("Adjusted: " + start + " -> " + currentBlockId); + } + } + return currentBlockId; + } + /** * When this method returns the input stream's position is unspecified. * * @param in The seekable input stream on which to perform binary search for the given prefix. * @param searchMode Whether we are searching the initial match, or the start or end of a run of matches. * @param start + * @param startAfter The adjusted start for the offset (start + 1) * @param end * @param delimiter * @param prefix * @return * @throws IOException */ - public static Match binarySearch(BlockSource blockSource, SearchMode searchMode, int depth, long start, long end, byte delimiter, byte[] prefix, BinSearchLevelCache cache) throws IOException { + public static Match binarySearch(BlockSource blockSource, SearchMode searchMode, int depth, long start, long startAfter, long end, byte delimiter, byte[] prefix, BinSearchLevelCache cache) throws IOException { if (start > end) { return null; } long mid = (start + end) >> 1; // division by 2 + // If mid lies within the starting block then adjust it to start + if (mid > start && mid <= startAfter) { + mid = startAfter; + } + if (false) { System.out.println(String.format("%d <= %d < %d)", start, mid, end)); } @@ -158,7 +188,7 @@ public static Match binarySearch(BlockSource blockSource, SearchMode searchMode, long currentBlockId; - InputStream in = null; // Created on demand + InputStream in = null; // Initialized on demand try { if (mid > 0) { nextBlockId = cache.getDisposition(mid); @@ -166,34 +196,24 @@ public static Match binarySearch(BlockSource blockSource, SearchMode searchMode, BlockSourceChannelAdapter channel = blockSource.newReadableChannel(mid); in = new InputStreamWithZeroOffsetRead(SeekableInputStreams.create(channel)); + // The start blockId is the position such that + // blockSource.newReadableChannel(startBlockId + 1) would return the next block startBlockId = channel.getStartingBlockId(); currentBlockId = channel.getCurrentBlockId(); - long effectiveBlockId = startBlockId; - // System.out.println(String.format("start %s - current %s", startBlockId, currentBlockId)); - if (effectiveBlockId == -1) { + if (startBlockId == -1) { throw new IllegalStateException("Should not happen: Block id not set after read."); } -// if (effectiveBlockId > end) { -// // nextBlockId = mid; -// nextBlockId = effectiveBlockId; -// cmp = -1; -// } -// else { -// nextBlockId = effectiveBlockId; -// } - - nextBlockId = effectiveBlockId; - cache.setDisposition(depth, mid, effectiveBlockId); + nextBlockId = startBlockId; + cache.setDisposition(depth, mid, startBlockId); } } else { nextBlockId = 0; } -// if (cmp == 0) { HeaderRecord headerRecord = cache.getHeader(nextBlockId); int l = prefix.length; if (headerRecord == null || (headerRecord.data().length < prefix.length && !headerRecord.isDataConsumed())) { @@ -251,16 +271,21 @@ public static Match binarySearch(BlockSource blockSource, SearchMode searchMode, // Find the start of a run: // Continue searching left - if there is no match then return the candidate result if (nextBlockId != 0) { - Match expandLeft = binarySearch(blockSource, SearchMode.LEFT, depth, start, mid, delimiter, prefix, cache); - if (expandLeft != null) { - left = expandLeft.start(); + if (mid != start) { // We can't expand if we are already at start + long nextEnd = mid <= startAfter ? start : mid; + Match expandLeft = binarySearch(blockSource, SearchMode.LEFT, depth, start, startAfter, nextEnd, delimiter, prefix, cache); + if (expandLeft != null) { + left = expandLeft.start(); + } } } } boolean findEndOfRun = false; if (findEndOfRun) { if (SearchMode.RIGHT.equals(searchMode) || SearchMode.BOTH.equals(searchMode)) { - Match expandRight = binarySearch(blockSource, SearchMode.RIGHT, depth, nextBlockId + 1, end, delimiter, prefix, cache); + long nextStart = nextBlockId + 1; + long nextStartAfter = adjustStart(blockSource, nextStart + 1 , depth, cache); + Match expandRight = binarySearch(blockSource, SearchMode.RIGHT, depth, nextStart, nextStartAfter, end, delimiter, prefix, cache); if (expandRight != null) { right = expandRight.end(); } @@ -268,20 +293,28 @@ public static Match binarySearch(BlockSource blockSource, SearchMode searchMode, } result = new Match(left, right); } else if(cmp < 0) { - long nextEnd = mid; //nextBlockId - 1; - if (start >= nextEnd) { + // long nextEnd = mid; //nextBlockId - 1; + if (mid == start) { result = new Match(start, start); + } else if (mid <= startAfter) { + // set end to start + result = binarySearch(blockSource, searchMode, depth, start, startAfter, start, delimiter, prefix, cache); } else { - result = binarySearch(blockSource, searchMode, depth, start, nextEnd, delimiter, prefix, cache); + result = binarySearch(blockSource, searchMode, depth, start, startAfter, mid, delimiter, prefix, cache); } } else { // If the prefix compares greater than the header it may still be in the starting page // so do not increment the start long nextStart = nextBlockId; - if (nextStart + 1>= end) { + + // if (nextStart + 1 >= end) { + // if (startAfter + 1 >= end) { + if (end <= startAfter) { + // If the end marker is within the starting block then return the starting block result = new Match(start, start); } else { - result = binarySearch(blockSource, searchMode, depth, nextStart, end, delimiter, prefix, cache); + long nextStartAfter = adjustStart(blockSource, nextStart + 1 , depth, cache); + result = binarySearch(blockSource, searchMode, depth, nextStart, nextStartAfter, end, delimiter, prefix, cache); } }