Skip to content

Commit

Permalink
Revision of io package for improved binsearch
Browse files Browse the repository at this point in the history
  • Loading branch information
Aklakan committed Sep 5, 2024
1 parent c7fc1e7 commit bf1924c
Show file tree
Hide file tree
Showing 87 changed files with 3,789 additions and 519 deletions.
2 changes: 1 addition & 1 deletion aksw-commons-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
<!-- tell the compiler which Java release to target -->
<!-- <maven.compiler.source>1.8</maven.compiler.source> -->
<!-- <maven.compiler.target>1.8</maven.compiler.target> -->
<maven.compiler.release>11</maven.compiler.release>
<maven.compiler.release>17</maven.compiler.release>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<gpg.keyname>AKSW</gpg.keyname>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
*
*/
public class BinSearchScanState {
public long firstDelimPos; // The match position found by scanning backwards with Pattern.match
public long matchDelimPos; // The match position found by binary search
public byte[] prefixBytes; // Generalize using lambda with a compatible signature to Pattern.match
public long size; // Absolute end of the data region on which the match was run
}
public long firstDelimPos; // The first match position of a run.
public long matchDelimPos; // A match position within a run (found by binary search).
public byte[] prefixBytes; // The prefix used for matching. XXX Generalize using lambda with a compatible signature to Pattern.match.
public long size; // Absolute end of the data region on which the match was run.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.aksw.commons.io.binseach;

import java.io.InputStream;
import java.util.stream.Stream;

import org.aksw.commons.io.input.ReadableChannelSupplier;

/**
 * Fluent builder that collects the parameters of a binary search request:
 * the prefix to match and an optional split configuration.
 *
 * <p>Split size and split count are mutually exclusive: setting one resets
 * the other to {@code -1}.
 *
 * <p>NOTE(review): {@link #find()} and {@link #findParallel()} are stubs that
 * currently return {@code null}; callers must not rely on a result yet.
 */
public class BinarySearchRequestBuilder {
    /** The prefix bytes to search for; may be {@code null}. */
    protected byte[] prefix;

    /** Requested split size; set to {@code -1} when a split count is configured instead. */
    protected long splitSize;

    /** Requested number of splits; set to {@code -1} when a split size is configured instead. */
    protected int splitCount;

    /**
     * Sets the prefix to search for.
     *
     * @param prefix the prefix bytes (stored by reference, not copied); may be {@code null}
     * @return this builder
     */
    public BinarySearchRequestBuilder setPrefix(byte[] prefix) {
        this.prefix = prefix;
        return this;
    }

    /**
     * Sets the prefix to search for from a string, encoded as UTF-8.
     *
     * @param prefix the prefix string; {@code null} clears the prefix
     * @return this builder
     */
    public BinarySearchRequestBuilder setPrefix(String prefix) {
        // Use an explicit charset so the produced bytes do not depend on the platform default encoding.
        this.prefix = prefix == null
                ? null
                : prefix.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        return this;
    }

    /**
     * Configures the split size and invalidates any previously set split count.
     *
     * @param splitSize the split size
     * @return this builder
     */
    public BinarySearchRequestBuilder setSplitSize(long splitSize) {
        this.splitSize = splitSize;
        this.splitCount = -1;
        return this;
    }

    /**
     * Configures the number of splits and invalidates any previously set split size.
     *
     * @param splitCount the number of splits
     * @return this builder
     */
    public BinarySearchRequestBuilder setSplitCount(int splitCount) {
        this.splitCount = splitCount;
        this.splitSize = -1;
        return this;
    }

    /**
     * Placeholder for running the search as a single stream.
     *
     * @return currently always {@code null} (not yet implemented)
     */
    public InputStream find() {
        return null;
    }

    /**
     * Placeholder for running the search as a stream of channel suppliers.
     *
     * @return currently always {@code null} (not yet implemented)
     */
    public Stream<ReadableChannelSupplier<byte[]>> findParallel() {
        return null;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.stream.Stream;

import org.aksw.commons.io.input.ReadableChannelSupplier;
import org.aksw.commons.io.input.ReadableChannels;

public interface BinarySearcher
extends AutoCloseable
Expand All @@ -10,11 +14,22 @@ public interface BinarySearcher

// Add default method for CharSequence?

default InputStream search(String str) throws IOException {
InputStream result = search(str == null ? null : str.getBytes());
default InputStream search(String prefixStr) throws IOException {
InputStream result = search(prefixStr == null ? null : prefixStr.getBytes());
return result;
}

// XXX Not ideal mixing InputStream and Channel
/**
 * Default parallel search that offers the result of {@link #search(byte[])}
 * as a single channel supplier.
 *
 * <p>The returned stream contains exactly one supplier. The underlying search
 * is deferred: it runs when the supplier is invoked, not when this method is
 * called. An {@link IOException} raised by the deferred search is rethrown as
 * a {@link java.io.UncheckedIOException}.
 *
 * @param prefix the prefix bytes, passed unchanged to {@code search(byte[])}
 * @return a one-element stream of channel suppliers
 * @throws IOException declared for implementations that search eagerly;
 *         this default implementation does not throw it itself
 */
default Stream<ReadableChannelSupplier<byte[]>> parallelSearch(byte[] prefix) throws IOException {
    return Stream.of(() -> {
        try {
            return ReadableChannels.wrap(search(prefix));
        } catch (IOException e) {
            // Keep the I/O nature of the failure visible while staying unchecked inside the lambda.
            throw new java.io.UncheckedIOException(e);
        }
    });
}

@Override
void close() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,26 @@
import org.aksw.commons.util.ref.Ref;

/**
* A helper iterator that automatically closes
* the previous item when next() is called.
*
* Actually, we do not need next to return an object, instead it could
* just set properties directly:
*
* A helper that holds a reference to the current block
* and automatically closes it when advancing to the next one.
* *
* IterState.advance();
* IterState.closeCurrent();
* IterState.current();
*
* @author raven
*
*/
public class BlockIterState {
// implements Iterator<OpenBlock> {

// protected OpenBlock current;
public class BlockEnumerator {
public Ref<? extends Block> blockRef;
public Block block;
public Seekable seekable;


protected boolean yieldSelf;
protected boolean skipFirstClose;
protected boolean isFwd;

public BlockIterState(boolean yieldSelf, Ref<? extends Block> blockRef, Seekable seekable, boolean isFwd, boolean skipFirstClose) {
// this.current = new OpenBlock(blockRef, seekable);
protected BlockEnumerator(boolean yieldSelf, Ref<? extends Block> blockRef, Seekable seekable, boolean isFwd, boolean skipFirstClose) {
Objects.requireNonNull(blockRef);

this.blockRef = blockRef;
Expand All @@ -47,23 +39,18 @@ public BlockIterState(boolean yieldSelf, Ref<? extends Block> blockRef, Seekable
this.isFwd = isFwd;
}

public static BlockIterState fwd(boolean yieldSelf, Ref<? extends Block> blockRef, Seekable seekable) {
return new BlockIterState(yieldSelf, blockRef, seekable, true, true);
public Ref<? extends Block> getCurrentBlockRef() {
return blockRef;
}

public static BlockIterState fwd(boolean yieldSelf, Ref<? extends Block> blockRef, Seekable seekable, boolean skipFirstClose) {
return new BlockIterState(yieldSelf, blockRef, seekable, true, skipFirstClose);
public Block getCurrentBlock() {
return block;
}

public static BlockIterState fwd(boolean yieldSelf, Ref<? extends Block> blockRef, boolean skipFirstClose) {
return new BlockIterState(yieldSelf, blockRef, blockRef.get().newChannel(), true, skipFirstClose);
public Seekable getCurrentSeekable() {
return seekable;
}

public static BlockIterState bwd(boolean yieldSelf, Ref<? extends Block> blockRef, Seekable seekable) {
return new BlockIterState(yieldSelf, blockRef, seekable, false, true);
}

//@Override
public boolean hasNext() {
boolean result;
try {
Expand All @@ -74,12 +61,11 @@ public boolean hasNext() {
: block.hasPrev();

} catch (IOException e) {
throw new RuntimeException();
throw new RuntimeException(e);
}
return result;
}


public void closeCurrent() {
if(!skipFirstClose) {
try {
Expand All @@ -96,6 +82,7 @@ public void closeCurrent() {
}
}

/** Move to the next or previous block based on the configured direction. */
public void advance() {
try {
if(yieldSelf) {
Expand All @@ -114,16 +101,26 @@ public void advance() {
blockRef = next;
block = next.get();
seekable = block.newChannel();

//current = new OpenBlock(next, next.get().newChannel());
//result = current;
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static BlockEnumerator fwd(boolean yieldSelf, Ref<? extends Block> blockRef, Seekable seekable) {
return new BlockEnumerator(yieldSelf, blockRef, seekable, true, true);
}

// return result;
public static BlockEnumerator fwd(boolean yieldSelf, Ref<? extends Block> blockRef, Seekable seekable, boolean skipFirstClose) {
return new BlockEnumerator(yieldSelf, blockRef, seekable, true, skipFirstClose);
}

}
public static BlockEnumerator fwd(boolean yieldSelf, Ref<? extends Block> blockRef, boolean skipFirstClose) {
return new BlockEnumerator(yieldSelf, blockRef, blockRef.get().newChannel(), true, skipFirstClose);
}

public static BlockEnumerator bwd(boolean yieldSelf, Ref<? extends Block> blockRef, Seekable seekable) {
return new BlockEnumerator(yieldSelf, blockRef, seekable, false, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import java.io.IOException;

import org.aksw.commons.io.input.ReadableChannels;
import org.aksw.commons.io.input.SeekableReadableChannel;
import org.aksw.commons.io.input.SeekableReadableChannelSource;
import org.aksw.commons.io.input.SeekableReadableChannels;

public interface ArrayReadable<A>
extends SeekableReadableChannelSource<A>
Expand All @@ -15,7 +15,7 @@ public interface ArrayReadable<A>
@Override
default SeekableReadableChannel<A> newReadableChannel() throws IOException {
// Preconditions.checkArgument(start <= end, String.format("Start (%d) must be <= end (%d)", start, end));
SeekableReadableChannel<A> result = ReadableChannels.newChannel(this, 0);
SeekableReadableChannel<A> result = SeekableReadableChannels.newChannel(this, 0);
// if (Long.MAX_VALUE != end) {
// long length = end - start;
// result = ReadableChannels.ra(result, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.aksw.commons.io.input.ReadableChannelSwitchable;
import org.aksw.commons.io.input.ReadableChannels;
import org.aksw.commons.io.input.SeekableReadableChannel;
import org.aksw.commons.io.input.SeekableReadableChannelSwitchable;
import org.aksw.commons.io.input.SeekableReadableChannels;
import org.aksw.commons.io.shared.ChannelBase;
import org.apache.commons.io.input.BoundedInputStream;

Expand All @@ -31,7 +31,6 @@
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.ByteStreams;
import com.google.common.math.IntMath;
import com.google.common.primitives.Ints;

/**
Expand Down Expand Up @@ -574,7 +573,7 @@ public void write(long offsetInBuffer, ReadableChannel<A> source, int amount) th

@Override
public void write(long offsetInBuffer, A arrayWithItemsOfTypeT, int arrOffset, int arrLength) throws IOException {
write(offsetInBuffer, ReadableChannels.of(arrayOps, arrayWithItemsOfTypeT, arrOffset), arrLength);
write(offsetInBuffer, SeekableReadableChannels.of(arrayOps, arrayWithItemsOfTypeT, arrOffset), arrLength);
}

@Override
Expand Down Expand Up @@ -719,7 +718,7 @@ public static void main(String[] args) throws IOException {
// bc.position(pos);
//bc.read(b, 0, 1);
// actual = b[0];
CharSequence cs = ReadableChannels.asCharSequence(bc2, Ints.saturatedCast(size));
CharSequence cs = SeekableReadableChannels.asCharSequence(bc2, Ints.saturatedCast(size));
actual = (byte)cs.charAt(Ints.checkedCast(pos));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.aksw.commons.io.buffer.plain;

import java.io.IOException;

import org.aksw.commons.io.buffer.array.ArrayOps;

import com.google.common.primitives.Ints;
Expand Down Expand Up @@ -34,14 +36,19 @@ public void write(long offsetInBuffer, A arrayWithItemsOfTypeT, int arrOffset, i
}

public A getArray() {
return array;
}
return array;
}

@Override
public long getCapacity() {
return arrayOps.length(array);
}

@Override
public long size() throws IOException {
return arrayOps.length(array);
}

@Override
public ArrayOps<A> getArrayOps() {
return arrayOps;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
import java.io.IOException;
import java.util.AbstractList;

import com.google.common.primitives.Ints;

/**
* List view over a buffer.
*
* @param <T> The item type. Casts to this type are be unchecked.
* @param <T> The item type. Casts to this type are unchecked.
*/
public class ListOverBuffer<T>
extends AbstractList<T>
Expand Down
Loading

0 comments on commit bf1924c

Please sign in to comment.