From 1c50aa2735c3fbfb0e2a8ef651c63e8cd360bbdd Mon Sep 17 00:00:00 2001 From: Claus Stadler Date: Fri, 6 Sep 2024 05:08:00 +0200 Subject: [PATCH] Towards impoved caching for binary search --- .../io/input/ReadableChannelOverIterator.java | 2 - .../commons/io/input/ReadableChannels.java | 6 ++- .../io/hadoop/binseach/v2/BinCount.java | 10 ++-- .../io/hadoop/binseach/v2/BinSearch.java | 2 +- .../binseach/v2/BinSearchResourceCache.java | 30 +++++++++++ .../io/hadoop/binseach/v2/BinSearchUtils.java | 8 +-- .../binseach/v2/BinarySearchBuilder.java | 54 +++++++++++-------- .../v2/BinarySearcherOverBlockSource.java | 35 ++++++++---- .../v2/BinarySearcherOverPlainSource.java | 24 ++++++--- 9 files changed, 118 insertions(+), 53 deletions(-) create mode 100644 aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchResourceCache.java diff --git a/aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/input/ReadableChannelOverIterator.java b/aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/input/ReadableChannelOverIterator.java index ea3f65c1..cc4bb0f3 100644 --- a/aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/input/ReadableChannelOverIterator.java +++ b/aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/input/ReadableChannelOverIterator.java @@ -22,7 +22,6 @@ public ReadableChannelOverIterator(ArrayOps arrayOps, Iterator it, Runna this.closeAction = closeAction; } - public void setCloseAction(Runnable closeAction) { this.closeAction = closeAction; } @@ -36,7 +35,6 @@ public Iterator getIterator() { return iterator; } - @Override public void closeActual() throws IOException { if (closeAction != null) { diff --git a/aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/input/ReadableChannels.java b/aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/input/ReadableChannels.java index 8bdcf2ff..9a3fb981 100644 --- a/aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/input/ReadableChannels.java +++ b/aksw-commons-io-parent/aksw-commons-io-buffers/src/main/java/org/aksw/commons/io/input/ReadableChannels.java @@ -42,7 +42,11 @@ public static ReadableChannel wrap(InputStream inputStream) { } public static ReadableChannel wrap(Stream stream, ArrayOps arrayOps) { - return new ReadableChannelOverIterator<>(arrayOps, stream.iterator(), stream::close); + return wrap(stream.iterator(), stream::close, arrayOps); + } + + public static ReadableChannel wrap(Iterator iterator, Runnable closeAction, ArrayOps arrayOps) { + return new ReadableChannelOverIterator<>(arrayOps, iterator, closeAction); } public static > ReadableByteChannelAdapter newChannel(T dataStream) { diff --git a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinCount.java b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinCount.java index 3166dcf3..b8224910 100644 --- a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinCount.java +++ b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinCount.java @@ -158,11 +158,11 @@ public static void main2(String[] args) throws IOException { // if (true) { return; } - BinarySearcherOverBlockSource binSearcher = new BinarySearcherOverBlockSource(blockSource, BinSearchLevelCache.noCache(), 10000); - - try (BufferedReader br = new BufferedReader(new InputStreamReader(binSearcher.search(lookup)))) { - br.lines().forEach(x -> System.out.println(x)); - } +// BinarySearcherOverBlockSource binSearcher = new BinarySearcherOverBlockSource(blockSource, BinSearchLevelCache.noCache(), 10000); +// +// try (BufferedReader br = new BufferedReader(new InputStreamReader(binSearcher.search(lookup)))) { +// br.lines().forEach(x -> System.out.println(x)); +// } } // public static void main3(String[] args) throws IOException { diff --git a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearch.java b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearch.java index 57ba0389..36c0b98b 100644 --- a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearch.java +++ b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearch.java @@ -38,7 +38,7 @@ public static void main(String[] args) throws IOException { BinarySearcher bs = BinarySearchBuilder.newBuilder() .setSource(bz2Path) .setCodec(new BZip2Codec()) - .setBinSearchCache(BinSearchLevelCache.dftCache()) + // .setBinSearchCache(BinSearchLevelCache.dftCache()) .build(); // BinarySearcher bs = BinarySearchBuilder.newBuilder() // .setSource(plainPath) diff --git a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchResourceCache.java b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchResourceCache.java new file mode 100644 index 00000000..e3c96977 --- /dev/null +++ b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchResourceCache.java @@ -0,0 +1,30 @@ +package org.aksw.commons.io.hadoop.binseach.v2; + +import java.util.function.Supplier; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + +public class BinSearchResourceCache { + record CacheEntry(BinSearchLevelCache levelCache, Cache blockCache) {} + + protected Cache resourceCache; + + /** Factory for the caches of individual resources. */ + protected Supplier cacheFactory; + + public BinSearchResourceCache(int maxCacheSize) { + this(Caffeine.newBuilder().maximumSize(maxCacheSize).build(), () -> { + return new CacheEntry(BinSearchLevelCache.dftCache(), Caffeine.newBuilder().maximumSize(16).build()); + }); + } + + public BinSearchResourceCache(Cache resourceCache, Supplier cacheFactory) { + this.resourceCache = resourceCache; + this.cacheFactory = cacheFactory; + } + + public CacheEntry getOrCreate(Object key) { + return resourceCache.get(key, k -> cacheFactory.get()); + } +} diff --git a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchUtils.java b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchUtils.java index 2e89088b..4c9068ae 100644 --- a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchUtils.java +++ b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinSearchUtils.java @@ -54,11 +54,11 @@ public static int compareToPrefix(InputStream in, byte[] prefix) throws IOExcept return cmp; } - - public static InputStream configureStream(SeekableReadableChannel channel, long end, byte[] prefix) throws IOException { + public static InputStream configureStream( + SeekableReadableChannel channel, long end, byte[] prefix, BinSearchLevelCache levelCache) throws IOException { InputStream result; SeekableInputStream in = SeekableInputStreams.create(channel); - Match match = BinarySearcherOverPlainSource.binarySearch(in,SearchMode.BOTH, 0, 0, end, (byte)'\n', prefix, BinSearchLevelCache.noCache()); + Match match = BinarySearcherOverPlainSource.binarySearch(in,SearchMode.BOTH, 0, 0, end, (byte)'\n', prefix, levelCache); if (match != null) { in.position(match.start()); @@ -73,7 +73,7 @@ public static InputStream configureStream(SeekableReadableChannel channe new ReadableByteChannelForLinesMatchingPrefix( SeekableInputStreams.wrap(in), scanState)); } else { - System.err.println("NO MATCH"); + System.err.println("NO MATCH for " + new String(prefix)); in.close(); result = InputStream.nullInputStream(); // ReadableChannels.newInputStream(ReadableChannels.limit(in, 0)); } diff --git a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearchBuilder.java b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearchBuilder.java index 276d9b57..3f6fe0a7 100644 --- a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearchBuilder.java +++ b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearchBuilder.java @@ -1,19 +1,21 @@ package org.aksw.commons.io.hadoop.binseach.v2; import java.nio.file.Path; +import java.util.function.Supplier; import org.aksw.commons.io.binseach.BinarySearcher; +import org.aksw.commons.io.hadoop.binseach.v2.BinSearchResourceCache.CacheEntry; import org.aksw.commons.io.input.SeekableReadableChannelSource; import org.apache.hadoop.io.compress.SplittableCompressionCodec; -import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; public class BinarySearchBuilder { protected Path path; protected SplittableCompressionCodec codec; - protected Cache blockCache; - protected BinSearchLevelCache binSearchCache; + // protected Cache blockCache; + //protected BinSearchLevelCache binSearchCache; + protected BinSearchResourceCache resourceCache; public static BinarySearchBuilder newBuilder() { return new BinarySearchBuilder(); @@ -29,39 +31,49 @@ public BinarySearchBuilder setCodec(SplittableCompressionCodec codec) { return this; } - public BinarySearchBuilder setBlockCache(Cache blockCache) { - this.blockCache = blockCache; + public BinarySearchBuilder setResourceCache(BinSearchResourceCache resourceCache) { + this.resourceCache = resourceCache; return this; } - public BinarySearchBuilder setBlockCacheSize(int maxSize) { - this.blockCache = Caffeine.newBuilder().maximumSize(maxSize).build(); - return this; - } - public BinarySearchBuilder setBinSearchCache(BinSearchLevelCache binSearchCache) { - this.binSearchCache = binSearchCache; - return this; - } +// public BinarySearchBuilder setBlockCache(Cache blockCache) { +// this.blockCache = blockCache; +// return this; +// } +// +// public BinarySearchBuilder setBlockCacheSize(int maxSize) { +// this.blockCache = Caffeine.newBuilder().maximumSize(maxSize).build(); +// return this; +// } +// +// public BinarySearchBuilder setBinSearchCache(BinSearchLevelCache binSearchCache) { +// this.binSearchCache = binSearchCache; +// return this; +// } public BinarySearcher build() { BinarySearcher result; - BinSearchLevelCache finalBinSearchCache = binSearchCache != null - ? binSearchCache - : BinSearchLevelCache.dftCache(); + Supplier cacheSupplier = resourceCache != null + ? () -> resourceCache.getOrCreate(path) + : () -> new CacheEntry(BinSearchLevelCache.dftCache(), Caffeine.newBuilder().maximumSize(16).build()); + +// BinSearchLevelCache finalBinSearchCache = binSearchCache != null +// ? binSearchCache +// : BinSearchLevelCache.dftCache(); if (codec == null) { SeekableReadableChannelSource source = new SeekableReadableChannelSourceOverNio(path); - result = new BinarySearcherOverPlainSource(source, finalBinSearchCache); + result = new BinarySearcherOverPlainSource(source, cacheSupplier); } else { BlockSource blockSource = BlockSource.of(path, codec); - Cache finalBlockCache = blockCache != null - ? blockCache - : Caffeine.newBuilder().maximumSize(16).build(); +// Cache finalBlockCache = blockCache != null +// ? blockCache +// : Caffeine.newBuilder().maximumSize(16).build(); - result = new BinarySearcherOverBlockSource(blockSource, finalBinSearchCache, finalBlockCache); + result = new BinarySearcherOverBlockSource(blockSource, cacheSupplier); } return result; } diff --git a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverBlockSource.java b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverBlockSource.java index 609fad5c..70ae44bf 100644 --- a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverBlockSource.java +++ b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverBlockSource.java @@ -4,10 +4,12 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.function.Supplier; import java.util.stream.Stream; import org.aksw.commons.io.binseach.BinarySearcher; import org.aksw.commons.io.hadoop.SeekableInputStreams; +import org.aksw.commons.io.hadoop.binseach.v2.BinSearchResourceCache.CacheEntry; import org.aksw.commons.io.input.ReadableChannel; import org.aksw.commons.io.input.ReadableChannelSources; import org.aksw.commons.io.input.ReadableChannelSupplier; @@ -30,18 +32,18 @@ public class BinarySearcherOverBlockSource private static final Logger logger = LoggerFactory.getLogger(BinarySearcherOverBlockSource.class); protected BlockSource blockSource; - protected BinSearchLevelCache cache; - protected Cache pageCache; + protected Supplier cacheSupplier; + // protected BinSearchLevelCache cache; + // protected Cache pageCache; - public BinarySearcherOverBlockSource(BlockSource blockSource, BinSearchLevelCache cache, int pageCacheSize) { - this(blockSource, cache, Caffeine.newBuilder().maximumSize(pageCacheSize).build()); - } +// public BinarySearcherOverBlockSource(BlockSource blockSource, BinSearchLevelCache cache, int pageCacheSize) { +// this(blockSource, cache, Caffeine.newBuilder().maximumSize(pageCacheSize).build()); +// } - public BinarySearcherOverBlockSource(BlockSource blockSource, BinSearchLevelCache cache, Cache pageCache) { + public BinarySearcherOverBlockSource(BlockSource blockSource, Supplier cacheSupplier) { super(); this.blockSource = blockSource; - this.cache = cache; - this.pageCache = pageCache; + this.cacheSupplier = cacheSupplier; } @Override @@ -50,17 +52,28 @@ public void close() throws Exception { @Override public InputStream search(byte[] prefix) throws IOException { + CacheEntry cacheEntry = cacheSupplier.get(); + BinSearchLevelCache levelCache = cacheEntry == null ? null : cacheEntry.levelCache(); + if (levelCache == null) { + levelCache = BinSearchLevelCache.noCache(); + } + InputStream result; - Match match = binarySearch(blockSource, prefix, cache); + Match match = binarySearch(blockSource, prefix, levelCache); if (match != null) { // System.out.println("Match found: " + match); + Cache blockCache = cacheEntry == null ? null : cacheEntry.blockCache(); + if (blockCache == null) { + blockCache = Caffeine.newBuilder().maximumSize(16).build(); + } + long startBlockId = match.start(); - SeekableReadableChannelOverBlocks channel = new SeekableReadableChannelOverBlocks(blockSource, startBlockId, pageCache); + SeekableReadableChannelOverBlocks channel = new SeekableReadableChannelOverBlocks(blockSource, startBlockId, blockCache); long blockSize = channel.getStartingBlockSize(); blockSize = 900000; - result = BinSearchUtils.configureStream(channel, blockSize * 2, prefix); + result = BinSearchUtils.configureStream(channel, blockSize * 2, prefix, BinSearchLevelCache.noCache()); boolean showKnownBlocks = false; if (showKnownBlocks) { diff --git a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverPlainSource.java b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverPlainSource.java index 9e33e7f8..c2b6e1c1 100644 --- a/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverPlainSource.java +++ b/aksw-commons-io-parent/aksw-commons-io-hadoop/src/main/java/org/aksw/commons/io/hadoop/binseach/v2/BinarySearcherOverPlainSource.java @@ -4,10 +4,12 @@ import java.io.InputStream; import java.nio.file.Path; import java.util.Arrays; +import java.util.function.Supplier; import java.util.stream.Stream; import org.aksw.commons.io.binseach.BinarySearcher; import org.aksw.commons.io.hadoop.SeekableInputStream; +import org.aksw.commons.io.hadoop.binseach.v2.BinSearchResourceCache.CacheEntry; import org.aksw.commons.io.input.ReadableChannel; import org.aksw.commons.io.input.ReadableChannelSources; import org.aksw.commons.io.input.ReadableChannelSupplier; @@ -30,12 +32,12 @@ public class BinarySearcherOverPlainSource private static final Logger logger = LoggerFactory.getLogger(BinarySearcherOverPlainSource.class); protected SeekableReadableChannelSource source; - protected BinSearchLevelCache cache; + protected Supplier cacheSupplier; - protected BinarySearcherOverPlainSource(SeekableReadableChannelSource source, BinSearchLevelCache cache) { + protected BinarySearcherOverPlainSource(SeekableReadableChannelSource source, Supplier cacheSupplier) { super(); this.source = source; - this.cache = cache; + this.cacheSupplier = cacheSupplier; } @Override @@ -46,19 +48,25 @@ public static Match binarySearch(SeekableInputStream channel, long end, byte[] p return binarySearch(channel, SearchMode.BOTH, 0, 0, end, (byte)'\n', prefix, BinSearchLevelCache.noCache()); } - public static BinarySearcherOverPlainSource of(SeekableReadableChannelSource source, BinSearchLevelCache cache) { - return new BinarySearcherOverPlainSource(source, cache); + public static BinarySearcherOverPlainSource of(SeekableReadableChannelSource source, Supplier cacheSupplier) { + return new BinarySearcherOverPlainSource(source, cacheSupplier); } - public static BinarySearcherOverPlainSource of(Path path, BinSearchLevelCache cache) { - return of(new SeekableReadableChannelSourceOverNio(path), cache); + public static BinarySearcherOverPlainSource of(Path path, Supplier cacheSupplier) { + return of(new SeekableReadableChannelSourceOverNio(path), cacheSupplier); } @Override public InputStream search(byte[] prefix) throws IOException { + CacheEntry cacheEntry = cacheSupplier.get(); + BinSearchLevelCache levelCache = cacheEntry == null ? null : cacheEntry.levelCache(); + if (levelCache == null) { + levelCache = BinSearchLevelCache.noCache(); + } + SeekableReadableChannel channel = source.newReadableChannel(); long searchRangeEnd = source.size(); - InputStream result = BinSearchUtils.configureStream(channel, searchRangeEnd, prefix); + InputStream result = BinSearchUtils.configureStream(channel, searchRangeEnd, prefix, levelCache); return result; }