Skip to content

Commit

Permalink
Add concurrent page cache warming to reduce IO latency
Browse files Browse the repository at this point in the history
This is a relatively hacky way to address the issues raised in #64.

This PR adds a utility class to concurrently "warm" the OS page cache
with files that will be used for highlighting.

This should significantly reduce the I/O latency during the sequential
highlighting process, especially when using a network storage layer or a
RAID system.

The idea is that a lot of storage layers can benefit from parallel I/O.
Unfortunately, snippet generation with the current UHighlighter approach
is strongly sequential, which means we give away a lot of potential
performance, since we're limited by the I/O latency of the underlying
storage layer. By pre-reading the data we might need in a concurrent
way, we pre-populate the operating system's page cache, so any I/O
performed by the snippet generation process further down the line should
only hit the page cache and not incur as much of a latency hit.

The class also provides a way to cancel the pre-loading of a given
source pointer. This is called at the beginning of the snippet
generation process, since at that point any background I/O on the target
files will only add to the latency we might experience anyway.
  • Loading branch information
jbaiter authored and bitzl committed Feb 20, 2020
1 parent 25bacc8 commit 7d66243
Show file tree
Hide file tree
Showing 11 changed files with 151 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import de.digitalcollections.solrocr.formats.OcrPassageFormatter;
import de.digitalcollections.solrocr.formats.OcrSnippet;
import de.digitalcollections.solrocr.util.IterableCharSequence;
import de.digitalcollections.solrocr.util.PageCacheWarmer;
import java.io.IOException;
import java.text.BreakIterator;
import java.util.Arrays;
Expand Down Expand Up @@ -41,6 +42,9 @@ public OcrSnippet[] highlightFieldForDoc(LeafReader reader, int docId, BreakIter
int snippetLimit)
throws IOException {
// note: it'd be nice to accept a CharSequence for content, but we need a CharacterIterator impl for it.

// Stop page cache pre-warming, we're doing the IO ourselves now
PageCacheWarmer.cancelPreload(content.getPointer());
if (content.length() == 0) {
return null; // nothing to do
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import de.digitalcollections.solrocr.util.IterableCharSequence;
import de.digitalcollections.solrocr.util.MultiFileBytesCharIterator;
import de.digitalcollections.solrocr.util.OcrHighlightResult;
import de.digitalcollections.solrocr.util.PageCacheWarmer;
import de.digitalcollections.solrocr.util.SourcePointer;
import java.io.IOException;
import java.lang.reflect.Constructor;
Expand Down Expand Up @@ -340,13 +341,14 @@ protected List<IterableCharSequence[]> loadOcrFieldValues(String[] fields, DocId
ocrVals[fieldIdx] = null;
continue;
}
PageCacheWarmer.preload(sourcePointer);
if (sourcePointer.sources.size() == 1) {
ocrVals[fieldIdx] = new FileBytesCharIterator(
sourcePointer.sources.get(0).path, StandardCharsets.UTF_8);
sourcePointer.sources.get(0).path, StandardCharsets.UTF_8, sourcePointer);
} else {
ocrVals[fieldIdx] = new MultiFileBytesCharIterator(
sourcePointer.sources.stream().map(s -> s.path).collect(Collectors.toList()),
StandardCharsets.UTF_8);
StandardCharsets.UTF_8, sourcePointer);
}
}
fieldValues.add(ocrVals);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ public class FileBytesCharIterator implements IterableCharSequence {
private final MappedByteBuffer buf;
private final int numBytes;
private final Charset charset;
private final SourcePointer ptr;

private int current;

public FileBytesCharIterator(Path path) throws IOException {
this(path, StandardCharsets.UTF_8);
public FileBytesCharIterator(Path path, SourcePointer ptr) throws IOException {
this(path, StandardCharsets.UTF_8, ptr);
}

public FileBytesCharIterator(Path path, Charset charset) throws IOException {
public FileBytesCharIterator(Path path, Charset charset, SourcePointer ptr) throws IOException {
this.ptr = ptr;
this.charset = charset;
this.filePath = path;
FileChannel channel = (FileChannel) Files.newByteChannel(path, StandardOpenOption.READ);
Expand All @@ -54,7 +56,7 @@ public FileBytesCharIterator(Path path, Charset charset) throws IOException {
}

public FileBytesCharIterator(FileBytesCharIterator other) throws IOException {
this(other.filePath, other.charset);
this(other.filePath, other.charset, other.ptr);
this.current = other.current;
}

Expand Down Expand Up @@ -239,4 +241,9 @@ public OffsetType getOffsetType() {
public Charset getCharset() {
return this.charset;
}

@Override
public SourcePointer getPointer() {
// Source pointer this iterator was constructed from; callers use it to
// cancel any in-flight page-cache pre-warming for the backing file(s).
return ptr;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ enum OffsetType {

Charset getCharset();

SourcePointer getPointer();

static IterableCharSequence fromString(String string) {
return new IterableStringCharSequence(string);
}
Expand Down Expand Up @@ -137,5 +139,10 @@ public OffsetType getOffsetType() {
public Charset getCharset() {
return StandardCharsets.UTF_16;
}

@Override
public SourcePointer getPointer() {
// Backed by an in-memory string, so there is no file source to point to.
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ public class MultiFileBytesCharIterator implements IterableCharSequence {
private final Map<Path, Integer> pathToOffset;
private final Charset charset;
private final int numBytes;
private final SourcePointer ptr;
private int current;

private final LoadingCache<Path, FileBytesCharIterator> subiters = CacheBuilder.newBuilder()
.maximumSize(1000)
.build(new CacheLoader<Path, FileBytesCharIterator>() {
@Override
public FileBytesCharIterator load(Path p) throws Exception {
return new FileBytesCharIterator(p, charset);
return new FileBytesCharIterator(p, charset, ptr);
}
});

public MultiFileBytesCharIterator(List<Path> filePaths, Charset charset) throws IOException {
public MultiFileBytesCharIterator(List<Path> filePaths, Charset charset, SourcePointer ptr) throws IOException {
this.ptr = ptr;
this.paths = filePaths;
this.charset = charset;
this.offsetMap = new TreeMap<>();
Expand All @@ -46,7 +48,7 @@ public MultiFileBytesCharIterator(List<Path> filePaths, Charset charset) throws
}

public MultiFileBytesCharIterator(MultiFileBytesCharIterator other) throws IOException {
this(other.paths, other.charset);
this(other.paths, other.charset, other.ptr);
this.current = other.current;
}

Expand Down Expand Up @@ -78,6 +80,11 @@ public Charset getCharset() {
return charset;
}

@Override
public SourcePointer getPointer() {
// Source pointer shared by all per-file sub-iterators; used to cancel
// page-cache pre-warming once highlighting takes over the I/O.
return ptr;
}

@Override
public int length() {
return numBytes;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package de.digitalcollections.solrocr.util;

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import de.digitalcollections.solrocr.util.SourcePointer.FileSource;
import de.digitalcollections.solrocr.util.SourcePointer.Region;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Utility to concurrently "warm" the OS page cache with files that will be used for highlighting.
 *
 * Should significantly reduce the I/O latency during the sequential highlighting process, especially when using
 * a network storage layer or a RAID system.
 *
 * The idea is that a lot of storage layers can benefit from parallel I/O. Unfortunately, snippet generation with the
 * current UHighlighter approach is strongly sequential, which means we give away a lot of potential performance, since
 * we're limited by the I/O latency of the underlying storage layer. By pre-reading the data we might need in a
 * concurrent way, we pre-populate the operating system's page cache, so any I/O performed by the snippet generation
 * process further down the line should only hit the page cache and not incur as much of a latency hit.
 *
 * The class also provides a way to cancel the pre-loading of a given source pointer. This is called at the beginning
 * of the snippet generation process, since at that point any background I/O on the target files will only add to the
 * latency we might experience anyway.
 */
public class PageCacheWarmer {
  private static final int BUF_SIZE = 32 * 1024;
  private static final int NUM_THREADS = 8;
  private static final int MAX_PENDING_JOBS = 128;

  // Per-worker-thread read buffer, so we only do as many allocations as there are threads
  private static final ThreadLocal<ByteBuffer> BUF =
      ThreadLocal.withInitial(() -> ByteBuffer.allocate(BUF_SIZE));

  // Set of pending preload operations for file sources, used to allow the cancelling of preloading tasks
  private static final Set<FileSource> pendingPreloads = ConcurrentHashMap.newKeySet(MAX_PENDING_JOBS);

  private static final ExecutorService service = MoreExecutors.getExitingExecutorService(
      new ThreadPoolExecutor(
          NUM_THREADS, NUM_THREADS, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(MAX_PENDING_JOBS),
          new ThreadFactoryBuilder().setNameFormat("solr-ocrhighlighting-cache-warmer-%d").build(),
          new ThreadPoolExecutor.DiscardOldestPolicy()),
      0, TimeUnit.MILLISECONDS);

  private PageCacheWarmer() {
    // Static utility class, not meant to be instantiated.
  }

  /**
   * Reads all regions of the file source in {@code BUF_SIZE} chunks to pull them into the page cache.
   *
   * Aborts early if the preload for this source has been cancelled via {@link #cancelPreload(SourcePointer)}.
   *
   * @param src file source whose regions should be pre-read
   */
  private static void preload(FileSource src) {
    pendingPreloads.add(src);
    ByteBuffer buf = BUF.get();
    try (SeekableByteChannel channel = Files.newByteChannel(src.path, StandardOpenOption.READ)) {
      for (Region region : src.regions) {
        channel.position(region.start);
        int remainingSize = region.end - region.start;
        while (remainingSize > 0 && pendingPreloads.contains(src)) {
          // Reset the buffer before every read: without this the buffer fills up after the first
          // BUF_SIZE bytes and read() returns 0 from then on, spinning in an infinite loop.
          buf.clear();
          int numRead = channel.read(buf);
          if (numRead < 0) {
            // Premature EOF (region extends past the end of the file); stop instead of looping,
            // since -1 would otherwise *increase* remainingSize.
            break;
          }
          remainingSize -= numRead;
        }
      }
    } catch (IOException e) {
      // NOP, this method only serves to populate the page cache, so we don't care about I/O errors.
    } finally {
      pendingPreloads.remove(src);
    }
  }

  /**
   * Populate the OS page cache with the targets of the source pointer.
   *
   * @param ptr source pointer whose file sources should be pre-read; {@code null} is a no-op
   */
  public static void preload(SourcePointer ptr) {
    if (ptr == null) {
      return;
    }
    for (FileSource source : ptr.sources) {
      if (pendingPreloads.contains(source)) {
        // Already being warmed by a worker thread, don't queue a duplicate job
        continue;
      }
      service.submit(() -> preload(source));
    }
  }

  /**
   * Cancel all running and pending preloading tasks for the given source pointer.
   *
   * @param ptr source pointer whose preload tasks should be cancelled; {@code null} is a no-op
   */
  public static void cancelPreload(SourcePointer ptr) {
    if (ptr == null) {
      return;
    }
    ptr.sources.forEach(pendingPreloads::remove);
  }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package de.digitalcollections.solrocr.formats.hocr;

import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.collect.ImmutableSet;
import de.digitalcollections.solrocr.util.FileBytesCharIterator;
import de.digitalcollections.solrocr.util.IterableCharSequence;
Expand All @@ -13,8 +15,6 @@
import org.apache.lucene.analysis.charfilter.HTMLStripCharFilter;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

class HocrClassBreakIteratorTest {
private static final Path utf8Path = Paths.get("src/test/resources/data/hocr.html");

Expand All @@ -25,7 +25,7 @@ private String stripTags(String val) throws IOException {

@Test
void firstNext() throws IOException {
IterableCharSequence seq = new FileBytesCharIterator(utf8Path, StandardCharsets.UTF_8);
IterableCharSequence seq = new FileBytesCharIterator(utf8Path, StandardCharsets.UTF_8, null);
HocrClassBreakIterator it = new HocrClassBreakIterator("ocrx_word");
it.setText(seq);
int start = it.next();
Expand All @@ -37,7 +37,7 @@ void firstNext() throws IOException {

@Test
void next() throws IOException {
IterableCharSequence seq = new FileBytesCharIterator(utf8Path, StandardCharsets.UTF_8);
IterableCharSequence seq = new FileBytesCharIterator(utf8Path, StandardCharsets.UTF_8, null);
HocrClassBreakIterator it = new HocrClassBreakIterator("ocrx_word");
it.setText(seq);
seq.setIndex(670861);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ private String stripTags(String val) throws IOException {

@Test
void testContext() throws IOException {
IterableCharSequence seq = new FileBytesCharIterator(utf8Path, StandardCharsets.UTF_8);
IterableCharSequence seq = new FileBytesCharIterator(utf8Path, StandardCharsets.UTF_8, null);
BreakIterator baseIter = new TagBreakIterator("w");
BreakIterator limitIter = new TagBreakIterator("b");
ContextBreakIterator it = new ContextBreakIterator(baseIter, limitIter, 5);
Expand All @@ -43,7 +43,7 @@ void testContext() throws IOException {
@Test
void testContextHonorsLimits() throws IOException {
IterableCharSequence seq = new FileBytesCharIterator(Paths.get("src/test/resources/data/hocr.html"),
StandardCharsets.UTF_8);
StandardCharsets.UTF_8, null);
BreakIterator baseIter = new HocrClassBreakIterator("ocr_line");
BreakIterator limitIter = new HocrClassBreakIterator("ocrx_block");
ContextBreakIterator it = new ContextBreakIterator(baseIter, limitIter, 5);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package de.digitalcollections.solrocr.util;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand All @@ -14,8 +16,6 @@
import java.util.regex.Pattern;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class FileBytesCharIteratorTest {
private static final Pattern OFFSET_PAT = Pattern.compile("\\s(.+?)⚑(\\d+)");

Expand All @@ -24,7 +24,7 @@ class FileBytesCharIteratorTest {
private FileBytesCharIterator it;

public FileBytesCharIteratorTest() throws IOException {
it = new FileBytesCharIterator(ocrPath);
it = new FileBytesCharIterator(ocrPath, null);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
MiniOcrByteOffsetsParser.parse(Files.readAllBytes(ocrPath), bos);
String text = bos.toString(StandardCharsets.UTF_8.toString());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package de.digitalcollections.solrocr.util;

import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand All @@ -15,8 +17,6 @@
import java.util.regex.Pattern;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class MultiFileBytesCharIteratorTest {
private static final Pattern OFFSET_PAT = Pattern.compile("\\s(.+?)⚑(\\d+)");

Expand All @@ -34,7 +34,7 @@ class MultiFileBytesCharIteratorTest {
private String asciiText;

public MultiFileBytesCharIteratorTest() throws IOException {
utf8It = new MultiFileBytesCharIterator(utfPaths, StandardCharsets.UTF_8);
utf8It = new MultiFileBytesCharIterator(utfPaths, StandardCharsets.UTF_8, null);
utf8CompletePath = Paths.get("src/test/resources/data/multi_txt/complete.txt");
utf8Text = new String(Files.readAllBytes(utf8CompletePath), StandardCharsets.UTF_8);
Matcher m = OFFSET_PAT.matcher(new String(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package de.digitalcollections.solrocr.util;

import static org.assertj.core.api.Java6Assertions.assertThat;

import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.StringReader;
Expand All @@ -11,8 +13,6 @@
import org.apache.lucene.analysis.charfilter.HTMLStripCharFilter;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Java6Assertions.assertThat;

class TagBreakIteratorTest {

private static final Path utf8Path = Paths.get("src/test/resources/data/miniocr.xml");
Expand All @@ -24,7 +24,7 @@ private String stripTags(String val) throws IOException {

@Test
void firstNext() throws IOException {
IterableCharSequence seq = new FileBytesCharIterator(utf8Path, StandardCharsets.UTF_8);
IterableCharSequence seq = new FileBytesCharIterator(utf8Path, StandardCharsets.UTF_8, null);
TagBreakIterator it = new TagBreakIterator("w");
it.setText(seq);
int start = it.next();
Expand All @@ -37,7 +37,7 @@ void firstNext() throws IOException {

@Test
void next() throws IOException {
IterableCharSequence seq = new FileBytesCharIterator(utf8Path, StandardCharsets.UTF_8);
IterableCharSequence seq = new FileBytesCharIterator(utf8Path, StandardCharsets.UTF_8, null);
TagBreakIterator it = new TagBreakIterator("w");
it.setText(seq);
seq.setIndex(8267);
Expand Down

0 comments on commit 7d66243

Please sign in to comment.