Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ public static class Reader implements Closeable {
private final Long dataSize;
private final long fileHeaderEnd;

private final DataFileReader<GenericRecord> mFileReader;
private final Options options;
private DataFileReader<GenericRecord> mFileReader;

private final Schema mKeySchema;
private final Schema mValueSchema;
private final RecordProjection projection;

public Schema getKeySchema() {
return mKeySchema;
Expand Down Expand Up @@ -96,22 +98,61 @@ public Path getPath() {
}

public Reader(Options options) throws IOException {
// Open the data file.
Path dataFilePath = options.getPath();
logger.debug("Loading the data file " + dataFilePath);
DatumReader<GenericRecord> datumReader = GenericData.get().createDatumReader(null);
mFileReader = new DataFileReader<>(new FsInput(dataFilePath, options.getConfiguration()), datumReader);
this.options = options;
initFileReader();
String[] split = mFileReader.getMetaString(KEY_VALUE_HEADER_NAME).split("\\|");
mKeySchema = projectSchema(mFileReader.getSchema(), split[0].split(","));
mValueSchema = projectSchema(mFileReader.getSchema(), split[1].split(","));
projection = new RecordProjection(mKeySchema, mValueSchema);

fileHeaderEnd = mFileReader.previousSync();
dataSize = mFileReader.getMetaLong(DATA_SIZE_KEY);
}

// TODO: do we need this sync?
public void sync(Long syncPosition) throws IOException {
mFileReader.sync(syncPosition);
private void initFileReader() throws IOException {
// Open the data file.
Path dataFilePath = options.getPath();
logger.debug("Loading the data file " + dataFilePath);
DatumReader<GenericRecord> datumReader = GenericData.get().createDatumReader(null);
mFileReader = new DataFileReader<>(new FsInput(dataFilePath, options.getConfiguration()), datumReader);
}

GenericRecord nxt;
GenericRecord lastKey;
Node curNode;

private GenericRecord getNext(GenericRecord key) {
int blockCount = curNode.records.size();
while ((curNode.curRecord < blockCount)) {
int comparison = GenericData.get().compare(projection.getKey(curNode.getCurGenericRecord()), key, mKeySchema);
logger.debug("comparison was: {} with: {} and {}", comparison, projection.getKey(curNode.getCurGenericRecord()), key);
if (0 == comparison) {
// We've found it!
logger.debug("Found record for key {}", key);
// return projection.getValue(node.getCurGenericRecord());
GenericRecord ret = curNode.getCurGenericRecord();
curNode.curRecord++;
return ret;
} else if (comparison > 0) {
// We've passed it.
if (!curNode.prevHasUnvisitedChild()) {
logger.debug("key does not appear in the file: {}", key);
return null;
} else {
return getNextFromPrevChild(key);
}
}
curNode.curRecord += 1;
}
if (curNode.prevHasUnvisitedChild()) {
return getNextFromPrevChild(key);
} else if (curNode.parent != null) {
curNode = curNode.parent;
return getNext(key);
}

logger.debug("reached end of road. key does not appear in the file: {}", key);
return null;
}

/**
Expand All @@ -123,16 +164,16 @@ public void sync(Long syncPosition) throws IOException {
*/
public Iterator<GenericRecord> get(GenericRecord key) {
logger.debug("searching for key: {}", key);
if (lastKey!=null && GenericData.get().compare(lastKey, key, lastKey.getSchema())>=0) {
throw new RuntimeException("Only allowed to search for same or larger keys. current key: " + key
+ " previous key: " + lastKey);
}
if (curNode == null)
curNode = new Node();
nxt = getNext(key);
lastKey = key;
return new Iterator<GenericRecord>() {

long curOffset;
GenericRecord lastRecord = null;
int counter;
long blockCount;
private RecordProjection projection = new RecordProjection(mKeySchema, mValueSchema);

GenericRecord nxt = getNextFromOffset(0);

@Override
public boolean hasNext() {
return nxt!=null;
Expand All @@ -144,83 +185,32 @@ public GenericRecord next() {
throw new NoSuchElementException();
}
GenericRecord ret = nxt;
nxt = getNext();
nxt = getNext(key);
return ret;
}

private GenericRecord getNextFromOffset(long offset) {
curOffset = offset;
init();
return getNext();
}

private void init() {
curOffset += fileHeaderEnd;
logger.debug("seeking to position: " + curOffset);
counter = 0;
blockCount = -1;
lastRecord = null;
try {
mFileReader.seek(curOffset);
} catch (IOException e) {
throw new RuntimeException(e);
}
mFileReader.hasNext();
}

private GenericRecord getNext() {
while (mFileReader.hasNext() && (counter < blockCount || blockCount < 0)) {
GenericRecord record = mFileReader.next();
if (blockCount < 0) blockCount = mFileReader.getBlockCount();

counter += 1;
int comparison = GenericData.get().compare(projection.getKey(record), key, mKeySchema);
logger.debug("comparison was: {} with: {} and {}", comparison, projection.getKey(record), key);
if (0 == comparison) {
// We've found it!
logger.debug("Found record for key {}", key);
lastRecord = record;
return projection.getValue(record);
} else if (comparison > 0) {
// We've passed it.
if (lastRecord == null || projection.getMetadata(lastRecord) == null) {
logger.debug("key does not appear in the file: {}", key);
curOffset -= fileHeaderEnd;
return null;
} else {
return getNextFromOffset(getRealOffset(lastRecord));
}
}
lastRecord = record;
}
if (lastRecord != null && projection.getMetadata(lastRecord) != null) {
return getNextFromOffset(getRealOffset(lastRecord));
}

logger.debug("reached end of road. key does not appear in the file: {}", key);
return null;
}

};
}

private Long getRealOffset(GenericRecord record) {
Long offset = dataSize;
Long reversedOffset = (Long) record.get(METADATA_COL_NAME);
if (reversedOffset != null)
offset -= reversedOffset;
return offset;
private GenericRecord getNextFromPrevChild(GenericRecord key) {
curNode = curNode.getChildNode(curNode.curRecord-1);
return getNext(key);
}

/**
* this iterator runs on the records in sorted order, and not in the "b-tree" order the records are
* saved in the file
*/
public Iterator<GenericRecord> getIterator() {
if (mFileReader.previousSync()>fileHeaderEnd) {
try {
initFileReader();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return new Iterator<GenericRecord>() {

private final RecordProjection projection = new RecordProjection(mKeySchema, mValueSchema);
private Node next = new Node(0);
private Node next = new Node();

@Override
public boolean hasNext() {
Expand All @@ -246,48 +236,71 @@ public GenericRecord next() {
return ret;
}

class Node {
Node(long offset) {
try {
mFileReader.seek(fileHeaderEnd + offset);
GenericRecord firstRecord = mFileReader.next();
// we only know the block count after the first next()
int blockCount = (int) mFileReader.getBlockCount();
records = new ArrayList<>(blockCount);
records.add(firstRecord);
for (int i=1; i<blockCount; i++) {
records.add(mFileReader.next());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};
}

boolean curHasChild() {
return projection.getMetadata(records.get(curRecord)) != null;
}
@Override
public void close() throws IOException {
mFileReader.close();
}

Node getChildNode() {
Node childNode = new Node(getRealOffset(records.get(curRecord)));
childNode.parent = this;
return childNode;
}
class Node {
Node() {
GenericRecord firstRecord = mFileReader.next();
// we only know the block count after the first next()
int blockCount = (int) mFileReader.getBlockCount();
records = new ArrayList<>(blockCount);
records.add(firstRecord);
for (int i=1; i<blockCount; i++) {
records.add(mFileReader.next());
}
}

GenericRecord getCurGenericRecord() {
return records.get(curRecord);
}
boolean prevHasUnvisitedChild() {
return curRecord>0 && hasChild(curRecord-1) && lastVisited != curRecord-1;
}

boolean curHasChild() {
return hasChild(curRecord);
}

private boolean hasChild(int i) {
return projection.getMetadata(records.get(i)) != null;
}

Node getChildNode() {
return getChildNode(curRecord);
}

int curRecord = 0;
final List<GenericRecord> records;
Node parent;
Node getChildNode(int i) {
long offset = dataSize - ((Long) records.get(i).get(METADATA_COL_NAME)) + fileHeaderEnd;

if (offset > mFileReader.previousSync()) {
try {
logger.debug("seeking to position: " + offset);
mFileReader.seek(offset);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else if (offset < mFileReader.previousSync()) {
throw new RuntimeException("Only allowed forward seek, was requested to seek from: " +
mFileReader.previousSync() + " to: " + offset);
}

};
}
Node childNode = new Node();
childNode.parent = this;
lastVisited = i;
return childNode;
}

@Override
public void close() throws IOException {
mFileReader.close();
GenericRecord getCurGenericRecord() {
return records.get(curRecord);
}

int curRecord = 0;
private int lastVisited = -1;
final List<GenericRecord> records;
Node parent;
}
}

Expand Down
Loading