From ecd0c3d316eaa028ab670494c79214aac119ad29 Mon Sep 17 00:00:00 2001 From: Claus Stadler Date: Mon, 9 Sep 2024 21:21:13 +0200 Subject: [PATCH] Fixed bugs in BinarySearcherOverPlainSource where the record offset and record header cache could go out of sync --- .../io/input/ReadableChannelTracker.java | 20 +++++++++++ .../binseach/v2/BinSearchLevelCache.java | 10 +++--- .../binseach/v2/BinSearchResourceCache.java | 1 + .../v2/BinarySearcherOverBlockSource.java | 7 ++++ .../v2/BinarySearcherOverPlainSource.java | 35 ++++++++++++++----- 5 files changed, 60 insertions(+), 13 deletions(-) create mode 100644 aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/input/ReadableChannelTracker.java diff --git a/aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/input/ReadableChannelTracker.java b/aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/input/ReadableChannelTracker.java new file mode 100644 index 00000000..e5a40032 --- /dev/null +++ b/aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/input/ReadableChannelTracker.java @@ -0,0 +1,20 @@ +package org.aksw.commons.io.input; + +import java.io.IOException; + +public class ReadableChannelTracker> + extends ReadableChannelDecoratorBase +{ + protected long totalReadBytes; + protected long totalReadDuration; + + public ReadableChannelTracker(X delegate) { + super(delegate); + } + + // TODO Track number of bytes read + time spent + // On close add the amount to the source - so stats are best accessed on the source. 
+// @Override +// public int read(A array, int position, int length) throws IOException { +// } +} diff --git a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchLevelCache.java b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchLevelCache.java index fcd5d993..d313ce48 100644 --- a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchLevelCache.java +++ b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchLevelCache.java @@ -1,7 +1,7 @@ package org.aksw.commons.io.hadoop.binseach.v2; public class BinSearchLevelCache { - protected int flowLevel; + protected int fluidLevel; protected BinSearchCache fixedCache; protected BinSearchCache fluidCache; @@ -9,9 +9,9 @@ public BinSearchLevelCache() { this(16, new BinSearchCacheFixed(), new BinSearchCacheFluid()); } - public BinSearchLevelCache(int flowLevel, BinSearchCache fixedCache, BinSearchCache flowCache) { + public BinSearchLevelCache(int fluidLevel, BinSearchCache fixedCache, BinSearchCache flowCache) { super(); - this.flowLevel = flowLevel; + this.fluidLevel = fluidLevel; this.fixedCache = fixedCache; this.fluidCache = flowCache; } @@ -30,7 +30,7 @@ public HeaderRecord getHeader(long position) { // } public void setHeader(int depth, HeaderRecord headerRecord) { - BinSearchCache target = depth < flowLevel ? fixedCache : fluidCache; + BinSearchCache target = depth < fluidLevel ? fixedCache : fluidCache; target.setHeader(headerRecord); } @@ -43,7 +43,7 @@ public long getDisposition(long position) { } public void setDisposition(int depth, long from, long to) { - BinSearchCache target = depth < flowLevel ? fixedCache : fluidCache; + BinSearchCache target = depth < fluidLevel ? 
fixedCache : fluidCache; target.setDisposition(from, to); } diff --git a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchResourceCache.java b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchResourceCache.java index e3c96977..2a06b373 100644 --- a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchResourceCache.java +++ b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchResourceCache.java @@ -16,6 +16,7 @@ record CacheEntry(BinSearchLevelCache levelCache, Cache blockCache) public BinSearchResourceCache(int maxCacheSize) { this(Caffeine.newBuilder().maximumSize(maxCacheSize).build(), () -> { return new CacheEntry(BinSearchLevelCache.dftCache(), Caffeine.newBuilder().maximumSize(16).build()); + // return new CacheEntry(BinSearchLevelCache.noCache(), Caffeine.newBuilder().maximumSize(16).build()); }); } diff --git a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverBlockSource.java b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverBlockSource.java index 94357392..6ea6fb16 100644 --- a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverBlockSource.java +++ b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverBlockSource.java @@ -160,6 +160,13 @@ public static long adjustStart(BlockSource blockSource, long start, int depth, B /** * When this method returns the input stream's position is unspecified. * + *

+ * Note on cache semantics:
+ * <ul>
+ *   <li>Disposition is the mapping from the current offset to that of the next block (NOT the record).</li>
+ *   <li>The header map maps the block id to the first record.</li>
+ * </ul>
+ * * @param in The seekable input stream on which to perform binary search for the given prefix. * @param searchMode Whether we are searching the initial match, or the start or end of a run of matches. * @param start diff --git a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverPlainSource.java b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverPlainSource.java index c2b6e1c1..c36fba4d 100644 --- a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverPlainSource.java +++ b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverPlainSource.java @@ -125,51 +125,70 @@ public static Match binarySearch( long mid = (start + end) >> 1; // division by 2 + if (false) { + System.out.println(String.format("%d <= %d < %d)", start, mid, end)); + } + if (logger.isDebugEnabled()) { logger.debug(String.format("%d <= %d < %d)", start, mid, end)); } - in.position(mid); int cmp = 0; // Find the next record start long nextDelimPos = cache.getDisposition(mid); + long bytesToNextDelimiter = -1; if (nextDelimPos == -1) { - long bytesToNextDelimiter = 0; + in.position(mid); + if (mid > 0) { long allowedSearchBytes = end - mid; bytesToNextDelimiter = BinSearchUtils.readUntilDelimiter(in, delimiter, allowedSearchBytes); } + // -1 means that no delimiter was found + // if there is no more record starting from mid then continue searching on the left half if (bytesToNextDelimiter < 0) { - nextDelimPos = mid; - cmp = -1; + nextDelimPos = mid - 1; // Slightly hacky using mid-1 to signal no more delimiter } else { nextDelimPos = mid + bytesToNextDelimiter; } cache.setDisposition(depth, mid, nextDelimPos); } + if (nextDelimPos < mid) { + cmp = -1; + nextDelimPos = mid; + } + if (cmp == 0) { HeaderRecord headerRecord = 
cache.getHeader(nextDelimPos); int l = prefix.length; if (headerRecord == null || (headerRecord.data().length < prefix.length && !headerRecord.isDataConsumed())) { + // If bytesToNextDelimiter is -1 it means that the next delimiter position + // was taken from cache - we then need to position 'in' to the delimiter position + if (bytesToNextDelimiter == -1) { + in.position(nextDelimPos); + } + boolean isDataConsumed = false; - // byte[] header = headerRecord.data(); int blockSize = Math.max(prefix.length, 256); byte[] header = new byte[blockSize]; // XXX resort to a readFully without extra wrapping + // TODO Input stream may need to be reinitialized int n = ReadableChannels.readFully(ReadableChannels.wrap(in), header, 0, blockSize); if (n < blockSize) { isDataConsumed = true; header = Arrays.copyOf(header, n); } - if (header.length < prefix.length) { - cmp = -1; - } headerRecord = new HeaderRecord(nextDelimPos, 0, header, isDataConsumed); cache.setHeader(depth, headerRecord); } + // FIXME This condition seems wrong + // We need to compare the available bytes and if all match then ...? + if (headerRecord.data().length < prefix.length) { + cmp = -1; + } if (cmp == 0) { cmp = Arrays.compare(prefix, 0, l, headerRecord.data(), 0, l); }