diff --git a/dione-hadoop/src/main/java/com/paypal/dione/avro/hadoop/file/AvroBtreeFile.java b/dione-hadoop/src/main/java/com/paypal/dione/avro/hadoop/file/AvroBtreeFile.java
index 1bb40efd..26864704 100644
--- a/dione-hadoop/src/main/java/com/paypal/dione/avro/hadoop/file/AvroBtreeFile.java
+++ b/dione-hadoop/src/main/java/com/paypal/dione/avro/hadoop/file/AvroBtreeFile.java
@@ -46,10 +46,12 @@ public static class Reader implements Closeable {
         private final Long dataSize;
         private final long fileHeaderEnd;
 
-        private final DataFileReader mFileReader;
+        private final Options options;
+        private DataFileReader mFileReader;
         private final Schema mKeySchema;
         private final Schema mValueSchema;
+        private final RecordProjection projection;
 
         public Schema getKeySchema() {
             return mKeySchema;
         }
@@ -96,22 +98,61 @@ public Path getPath() {
         }
 
         public Reader(Options options) throws IOException {
-            // Open the data file.
-            Path dataFilePath = options.getPath();
-            logger.debug("Loading the data file " + dataFilePath);
-            DatumReader datumReader = GenericData.get().createDatumReader(null);
-            mFileReader = new DataFileReader<>(new FsInput(dataFilePath, options.getConfiguration()), datumReader);
+            this.options = options;
+            initFileReader();
 
             String[] split = mFileReader.getMetaString(KEY_VALUE_HEADER_NAME).split("\\|");
             mKeySchema = projectSchema(mFileReader.getSchema(), split[0].split(","));
             mValueSchema = projectSchema(mFileReader.getSchema(), split[1].split(","));
+            projection = new RecordProjection(mKeySchema, mValueSchema);
             fileHeaderEnd = mFileReader.previousSync();
             dataSize = mFileReader.getMetaLong(DATA_SIZE_KEY);
         }
 
-        // TODO: do we need this sync?
-        public void sync(Long syncPosition) throws IOException {
-            mFileReader.sync(syncPosition);
+        private void initFileReader() throws IOException {
+            // Open the data file.
+            Path dataFilePath = options.getPath();
+            logger.debug("Loading the data file " + dataFilePath);
+            DatumReader datumReader = GenericData.get().createDatumReader(null);
+            mFileReader = new DataFileReader<>(new FsInput(dataFilePath, options.getConfiguration()), datumReader);
+        }
+
+        GenericRecord nxt;
+        GenericRecord lastKey;
+        Node curNode;
+
+        private GenericRecord getNext(GenericRecord key) {
+            int blockCount = curNode.records.size();
+            while ((curNode.curRecord < blockCount)) {
+                int comparison = GenericData.get().compare(projection.getKey(curNode.getCurGenericRecord()), key, mKeySchema);
+                logger.debug("comparison was: {} with: {} and {}", comparison, projection.getKey(curNode.getCurGenericRecord()), key);
+                if (0 == comparison) {
+                    // We've found it!
+                    logger.debug("Found record for key {}", key);
+//                    return projection.getValue(node.getCurGenericRecord());
+                    GenericRecord ret = curNode.getCurGenericRecord();
+                    curNode.curRecord++;
+                    return ret;
+                } else if (comparison > 0) {
+                    // We've passed it.
+                    if (!curNode.prevHasUnvisitedChild()) {
+                        logger.debug("key does not appear in the file: {}", key);
+                        return null;
+                    } else {
+                        return getNextFromPrevChild(key);
+                    }
+                }
+                curNode.curRecord += 1;
+            }
+            if (curNode.prevHasUnvisitedChild()) {
+                return getNextFromPrevChild(key);
+            } else if (curNode.parent != null) {
+                curNode = curNode.parent;
+                return getNext(key);
+            }
+
+            logger.debug("reached end of road. key does not appear in the file: {}", key);
+            return null;
         }
 
         /**
@@ -123,16 +164,16 @@ public void sync(Long syncPosition) throws IOException {
          */
         public Iterator get(GenericRecord key) {
             logger.debug("searching for key: {}", key);
+            if (lastKey!=null && GenericData.get().compare(lastKey, key, lastKey.getSchema())>=0) {
+                throw new RuntimeException("Only allowed to search for same or larger keys. current key: " + key +
+                        " previous key: " + lastKey);
+            }
+            if (curNode == null)
+                curNode = new Node();
+            nxt = getNext(key);
+            lastKey = key;
             return new Iterator() {
-                long curOffset;
-                GenericRecord lastRecord = null;
-                int counter;
-                long blockCount;
-                private RecordProjection projection = new RecordProjection(mKeySchema, mValueSchema);
-
-                GenericRecord nxt = getNextFromOffset(0);
-
                 @Override
                 public boolean hasNext() {
                     return nxt!=null;
                 }
@@ -144,72 +185,15 @@ public GenericRecord next() {
                        throw new NoSuchElementException();
                    }
                    GenericRecord ret = nxt;
-                    nxt = getNext();
+                    nxt = getNext(key);
                    return ret;
                }
-
-                private GenericRecord getNextFromOffset(long offset) {
-                    curOffset = offset;
-                    init();
-                    return getNext();
-                }
-
-                private void init() {
-                    curOffset += fileHeaderEnd;
-                    logger.debug("seeking to position: " + curOffset);
-                    counter = 0;
-                    blockCount = -1;
-                    lastRecord = null;
-                    try {
-                        mFileReader.seek(curOffset);
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
-                    mFileReader.hasNext();
-                }
-
-                private GenericRecord getNext() {
-                    while (mFileReader.hasNext() && (counter < blockCount || blockCount < 0)) {
-                        GenericRecord record = mFileReader.next();
-                        if (blockCount < 0) blockCount = mFileReader.getBlockCount();
-
-                        counter += 1;
-                        int comparison = GenericData.get().compare(projection.getKey(record), key, mKeySchema);
-                        logger.debug("comparison was: {} with: {} and {}", comparison, projection.getKey(record), key);
-                        if (0 == comparison) {
-                            // We've found it!
-                            logger.debug("Found record for key {}", key);
-                            lastRecord = record;
-                            return projection.getValue(record);
-                        } else if (comparison > 0) {
-                            // We've passed it.
-                            if (lastRecord == null || projection.getMetadata(lastRecord) == null) {
-                                logger.debug("key does not appear in the file: {}", key);
-                                curOffset -= fileHeaderEnd;
-                                return null;
-                            } else {
-                                return getNextFromOffset(getRealOffset(lastRecord));
-                            }
-                        }
-                        lastRecord = record;
-                    }
-                    if (lastRecord != null && projection.getMetadata(lastRecord) != null) {
-                        return getNextFromOffset(getRealOffset(lastRecord));
-                    }
-
-                    logger.debug("reached end of road. key does not appear in the file: {}", key);
-                    return null;
-                }
-
            };
        }
 
-        private Long getRealOffset(GenericRecord record) {
-            Long offset = dataSize;
-            Long reversedOffset = (Long) record.get(METADATA_COL_NAME);
-            if (reversedOffset != null)
-                offset -= reversedOffset;
-            return offset;
+        private GenericRecord getNextFromPrevChild(GenericRecord key) {
+            curNode = curNode.getChildNode(curNode.curRecord-1);
+            return getNext(key);
        }
 
        /**
@@ -217,10 +201,16 @@ private Long getRealOffset(GenericRecord record) {
         * saved in the file
         */
        public Iterator getIterator() {
+            if (mFileReader.previousSync()>fileHeaderEnd) {
+                try {
+                    initFileReader();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
            return new Iterator() {
-                private final RecordProjection projection = new RecordProjection(mKeySchema, mValueSchema);
-
-                private Node next = new Node(0);
+                private Node next = new Node();
 
                @Override
                public boolean hasNext() {
@@ -246,48 +236,71 @@ public GenericRecord next() {
                    return ret;
                }
 
-                class Node {
-                    Node(long offset) {
-                        try {
-                            mFileReader.seek(fileHeaderEnd + offset);
-                            GenericRecord firstRecord = mFileReader.next();
-                            // we only know the block count after the first next()
-                            int blockCount = (int) mFileReader.getBlockCount();
-                            records = new ArrayList<>(blockCount);
-                            records.add(firstRecord);
-                            for (int i=1; i<blockCount; i++)
-                                records.add(mFileReader.next());
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
+        class Node {
+            Node() {
+                try {
+                    GenericRecord firstRecord = mFileReader.next();
+                    // we only know the block count after the first next()
+                    int blockCount = (int) mFileReader.getBlockCount();
+                    records = new ArrayList<>(blockCount);
+                    records.add(firstRecord);
+                    for (int i=1; i<blockCount; i++)
+                        records.add(mFileReader.next());
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            boolean prevHasUnvisitedChild() {
+                return curRecord>0 && hasChild(curRecord-1) && lastVisited != curRecord-1;
+            }
+
+            boolean curHasChild() {
+                return hasChild(curRecord);
+            }
+
+            private boolean hasChild(int i) {
+                return projection.getMetadata(records.get(i)) != null;
+            }
+
+            Node getChildNode() {
+                return getChildNode(curRecord);
+            }
 
-                    int curRecord = 0;
-                    final List records;
-                    Node parent;
+            Node getChildNode(int i) {
+                long offset = dataSize - ((Long) records.get(i).get(METADATA_COL_NAME)) + fileHeaderEnd;
+
+                if (offset > mFileReader.previousSync()) {
+                    try {
+                        logger.debug("seeking to position: " + offset);
+                        mFileReader.seek(offset);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                } else if (offset < mFileReader.previousSync()) {
+                    throw new RuntimeException("Only allowed forward seek, was requested to seek from: "
+                            + mFileReader.previousSync() + " to: " + offset);
                }
-            };
-        }
+                Node childNode = new Node();
+                childNode.parent = this;
+                lastVisited = i;
+                return childNode;
+            }
 
-        @Override
-        public void close() throws IOException {
-            mFileReader.close();
+            GenericRecord getCurGenericRecord() {
+                return records.get(curRecord);
+            }
+
+            int curRecord = 0;
+            private int lastVisited = -1;
+            final List records;
+            Node parent;
        }
    }
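A note on the new contract in AvroBtreeFile.Reader: get() is now stateful, keeping nxt/lastKey/curNode between calls and walking the B-tree in a single forward pass, so one Reader instance must be probed with keys in ascending order. An out-of-order probe throws RuntimeException("Only allowed to search for same or larger keys. ..."), and a backward block jump fails with the "Only allowed forward seek" error. A minimal usage sketch under these assumptions (sortedKeys and process are hypothetical, caller-provided placeholders):

    // Sketch: probes must arrive sorted by the reader's key schema order,
    // so a whole sorted batch is served by one forward pass over the file.
    AvroBtreeFile.Reader reader = new AvroBtreeFile.Reader(options);
    for (GenericRecord key : sortedKeys) {   // ascending key order is required
        Iterator hits = reader.get(key);     // stateful, forward-only lookup
        while (hits.hasNext()) {
            process(hits.next());            // hypothetical consumer of matches
        }
    }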
("002", ("2", 2)), ("003", ("3", 3))).map(tuple2records).map(_.toString()) - kvStorageFileReader.fileReader.sync(0) val actual = kvStorageFileReader.getIterator().toList.take(3).map(_.toString()) Assertions.assertEquals(expected, actual) } @@ -47,13 +47,12 @@ class TestAvroBtreeStorageFile extends AvroExtensions { .map(i => (simpleSchema.createRecord(i), simpleSchema2.createRecord(i))) kvStorageFileWriter.write(entries, filename) + printBtreeAvroFile(simpleStorage.reader(filename)) val kvStorageFileReader = simpleStorage.reader(filename) - printBtreeAvroFile(kvStorageFileReader) + Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("000"))) Assertions.assertEquals("002", kvStorageFileReader.get(simpleSchema.createRecord("002")).get.get("val2").toString) Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("100"))) - Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("000"))) - kvStorageFileReader.fileReader.sync(0) Assertions.assertEquals(10, kvStorageFileReader.getIterator().size) } @@ -64,10 +63,9 @@ class TestAvroBtreeStorageFile extends AvroExtensions { kvStorageFileWriter.write(entries, filename) val kvStorageFileReader = simpleStorage.reader(filename) - Assertions.assertEquals("1234", kvStorageFileReader.get(simpleSchema.createRecord("1234")).get.get("val2").toString) - Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("100a"))) Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("000"))) - kvStorageFileReader.fileReader.sync(0) + Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("100a"))) + Assertions.assertEquals("1234", kvStorageFileReader.get(simpleSchema.createRecord("1234")).get.get("val2").toString) Assertions.assertEquals(100000, kvStorageFileReader.getIterator().size) } @@ -75,16 +73,18 @@ class TestAvroBtreeStorageFile extends AvroExtensions { def testMany(): Unit = { val kvStorageFileWriter = simpleStorage.writer(2, 3) val N = 100 - val entries = (1 to N).iterator.map(i => (simpleSchema.createRecord((i * 2).toString), simpleSchema2.createRecord((i * 2).toString))) + val entries = (1 to N).iterator.map(i => (simpleSchema.createRecord(padKey(i * 2)), simpleSchema2.createRecord((i * 2).toString))) kvStorageFileWriter.write(entries, filename) + printBtreeAvroFile(simpleStorage.reader(filename)) + val kvStorageFileReader = simpleStorage.reader(filename) - (-1 to 2 * N + 2).foreach(i => { + (1 to 2 * N + 2).foreach(i => { // println("Trying to get: " + i) - if (i <= 0 || i > 2 * N || i % 2 == 1) - Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema2.createRecord(i.toString))) + if (i > 2 * N || i % 2 == 1) + Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema2.createRecord(padKey(i)))) else - Assertions.assertEquals(i.toString, kvStorageFileReader.get(simpleSchema2.createRecord(i.toString)).get.get("val2").toString) + Assertions.assertEquals(i.toString, kvStorageFileReader.get(simpleSchema2.createRecord(padKey(i))).get.get("val2").toString) }) } @@ -97,27 +97,29 @@ class TestAvroBtreeStorageFile extends AvroExtensions { kvStorageFileWriter.write(entries, filename) val kvStorageFileReader = simpleStorage.reader(filename) - Assertions.assertEquals("1", kvStorageFileReader.get(simpleSchema.createRecord("a")).get.get("val2").toString) - Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("aa"))) - kvStorageFileReader.fileReader.sync(0) + 
Assertions.assertEquals("1", kvStorageFileReader.getIterator(simpleSchema.createRecord("a")).next().get("val2").toString) + Assertions.assertFalse(kvStorageFileReader.getIterator(simpleSchema.createRecord("aa")).hasNext) + Assertions.assertEquals("2", kvStorageFileReader.getIterator(simpleSchema.createRecord("b")).next().get("val2").toString) Assertions.assertEquals(3, kvStorageFileReader.getIterator().size) } @Test def testBugLast(): Unit = { val kvStorageFileWriter = simpleStorage.writer(1000, 2) - val entries = (1 to 19).iterator.map(_.toString).map{ i=> - (simpleSchema.createRecord(i), simpleSchema2.createRecord(i)) + val entries = (1 to 19).iterator.map{ i=> + (simpleSchema.createRecord(padKey(i)), simpleSchema2.createRecord(i.toString)) } kvStorageFileWriter.write(entries, filename) val kvStorageFileReader = simpleStorage.reader(filename) + Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("0000"))) + (1 to 19).foreach(i => { - Assertions.assertEquals("" + i, kvStorageFileReader.get(simpleSchema2.createRecord("" + i)).get.get("val2").toString) + Assertions.assertEquals("" + i, kvStorageFileReader.get(simpleSchema2.createRecord(padKey(i))).get.get("val2").toString) }) - Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("0"))) - Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("20"))) - Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("100"))) + + Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("0020"))) + Assertions.assertEquals(None, kvStorageFileReader.get(simpleSchema.createRecord("0100"))) } def btreeProps(n: Int, interval: Int, height: Int): Unit = { @@ -139,7 +141,8 @@ class TestAvroBtreeStorageFile extends AvroExtensions { val it = fileS.iterator().asScala var block = 0L val headerPos = kvStorageFileReader.fileReader.getFileHeaderEnd - printDebug("datasize: " + kvStorageFileReader.fileReader.getmFileReader.getMetaLong("data_bytes")) + val dataSize = kvStorageFileReader.fileReader.getmFileReader.getMetaLong("data_bytes") + printDebug("datasize: " + dataSize) printDebug("header end at: " + headerPos) strList.clear() @@ -148,7 +151,7 @@ class TestAvroBtreeStorageFile extends AvroExtensions { if (lastSync!=block) { printDebug(r.toString) if (block>0) - printDebug("block ^^ at:" + (block-headerPos)) + printDebug("block ^^ at:" + (block-headerPos) + " (" + (dataSize-block+headerPos) + ")") block = lastSync } else { printDebug(r.toString) @@ -181,33 +184,33 @@ class TestAvroBtreeStorageFile extends AvroExtensions { """{val1: 001, val2: 001, metadata: 295} |{val1: 008, val2: 008, metadata: 195} |{val1: 015, val2: 015, metadata: 93} - |block ^^ at:0 + |block ^^ at:0 (340) |{val1: 002, val2: 002, metadata: 259} |{val1: 005, val2: 005, metadata: 227} - |block ^^ at:45 + |block ^^ at:45 (295) |{val1: 003, val2: 003, metadata: null} |{val1: 004, val2: 004, metadata: null} - |block ^^ at:81 + |block ^^ at:81 (259) |{val1: 006, val2: 006, metadata: null} |{val1: 007, val2: 007, metadata: null} - |block ^^ at:113 + |block ^^ at:113 (227) |{val1: 009, val2: 009, metadata: 157} |{val1: 012, val2: 012, metadata: 125} - |block ^^ at:145 + |block ^^ at:145 (195) |{val1: 010, val2: 010, metadata: null} |{val1: 011, val2: 011, metadata: null} - |block ^^ at:183 + |block ^^ at:183 (157) |{val1: 013, val2: 013, metadata: null} |{val1: 014, val2: 014, metadata: null} - |block ^^ at:215 + |block ^^ at:215 (125) |{val1: 016, val2: 016, metadata: 59} 
         |{val1: 019, val2: 019, metadata: 27}
-        |block ^^ at:247
+        |block ^^ at:247 (93)
         |{val1: 017, val2: 017, metadata: null}
         |{val1: 018, val2: 018, metadata: null}
-        |block ^^ at:281
+        |block ^^ at:281 (59)
         |{val1: 020, val2: 020, metadata: null}
-        |block ^^ at:313""".stripMargin,
+        |block ^^ at:313 (27)""".stripMargin,
       strList.mkString("\n").replaceAll("\"", ""))
   }
 
@@ -221,14 +224,16 @@ class TestAvroBtreeStorageFile extends AvroExtensions {
 
     val kvStorageFileReader = simpleStorage.reader(filename)
 
+    Assertions.assertFalse(kvStorageFileReader.getIterator(simpleSchema.createRecord("a0")).hasNext)
+    Assertions.assertEquals(List("1", "3"), kvStorageFileReader.getIterator(simpleSchema.createRecord("a1")).map(_.get("val2").toString).toList)
+    Assertions.assertFalse(kvStorageFileReader.getIterator(simpleSchema.createRecord("a2")).hasNext)
+    Assertions.assertEquals(List("2"), kvStorageFileReader.getIterator(simpleSchema.createRecord("b1")).map(_.get("val2").toString).toList)
 
-    Assertions.assertFalse(kvStorageFileReader.getIterator(simpleSchema.createRecord("a0")).hasNext)
-    Assertions.assertFalse(kvStorageFileReader.getIterator(simpleSchema.createRecord("a2")).hasNext)
     Assertions.assertFalse(kvStorageFileReader.getIterator(simpleSchema.createRecord("c1")).hasNext)
   }
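Why the tests move to padKey: keys are compared with GenericData.compare over the key schema, which orders strings lexicographically, so unpadded numeric keys would reach the reader out of ascending order and trip the same-or-larger-keys check described above. A small sketch of the difference, mirroring padKey's s"%0${d}d".format(i):

    String k9  = String.format("%04d", 9);    // "0009"
    String k13 = String.format("%04d", 13);   // "0013"
    // Unpadded, "13" < "9" lexicographically, so probing 9 after 13 goes backwards:
    assert "13".compareTo("9") < 0;
    assert k9.compareTo(k13) < 0;             // padded keys sort in numeric order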
diff --git a/dione-spark/src/test/scala/com/paypal/dione/hdfs/index/avro/TestAvroHdfsIndexerWithSpark.scala b/dione-spark/src/test/scala/com/paypal/dione/hdfs/index/avro/TestAvroHdfsIndexerWithSpark.scala
index c8aadc78..f2cb2f59 100644
--- a/dione-spark/src/test/scala/com/paypal/dione/hdfs/index/avro/TestAvroHdfsIndexerWithSpark.scala
+++ b/dione-spark/src/test/scala/com/paypal/dione/hdfs/index/avro/TestAvroHdfsIndexerWithSpark.scala
@@ -77,13 +77,14 @@ class TestAvroHdfsIndexerWithSpark extends AvroExtensions {
   @Order(2)
   @Test
   def testIndexKeyValue(): Unit = {
-    val avroBtreeStorageFileReader = AvroBtreeStorageFileReader(baseTestPath + "avro_hdfs_btree/index_part-00001")
+    val path = baseTestPath + "avro_hdfs_btree/index_part-00001"
 
-    Assertions.assertEquals(10, avroBtreeStorageFileReader.getIterator().size)
+    Assertions.assertEquals(10, AvroBtreeStorageFileReader(path).getIterator().size)
 
+    val avroBtreeStorageFileReader = AvroBtreeStorageFileReader(path)
+    Assertions.assertEquals(None, avroBtreeStorageFileReader.get(createRecord(avroKeySchema, "foo")))
     val filename = avroBtreeStorageFileReader.get(createRecord(avroKeySchema, "msg_13")).get.get(FILE_NAME_COLUMN).toString
     Assertions.assertEquals("part-00001", filename.substring(filename.lastIndexOf("/") + 1).substring(0, 10))
-    Assertions.assertEquals(None, avroBtreeStorageFileReader.get(createRecord(avroKeySchema, "foo")))
   }
 
   @Order(3)
@@ -128,14 +129,14 @@ class TestAvroHdfsIndexerWithSpark extends AvroExtensions {
   def testFolderKeyValue(): Unit = {
     val avroBtreeStorageFileReader = AvroBtreeStorageFileReader(baseTestPath + "avro_folder_btree/idx_file")
 
+    Assertions.assertEquals(None, avroBtreeStorageFileReader.get(createRecord(avroKeySchema, "m3")))
+
     val gr = avroBtreeStorageFileReader.get(createRecord(avroKeySchema, "msg_20")).get
     val filename = gr.get(FILE_NAME_COLUMN).toString
     Assertions.assertEquals("part-00002", filename.substring(filename.lastIndexOf("/")+1).substring(0,10))
     Assertions.assertEquals("2018-10-04 12:34:20", gr.get("time_result_created").toString)
 
-    avroBtreeStorageFileReader.fileReader.sync(0)
     Assertions.assertEquals(30, avroBtreeStorageFileReader.getIterator().size)
-    Assertions.assertEquals(None, avroBtreeStorageFileReader.get(createRecord(avroKeySchema, "m3")))
   }
 
   @Order(6)
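A worked example of the child-pointer encoding, taken from the expected output in testBtreeStructure (where data_bytes = 340): the metadata column stores the child block's distance back from the end of the data section, and getChildNode(int i) converts it to an absolute position. A sketch of the arithmetic for the first root record, using illustrative locals:

    long dataSize = 340;                  // mFileReader.getMetaLong("data_bytes")
    long reversed = 295;                  // metadata value on {val1: 001}
    long block    = dataSize - reversed;  // = 45, matching "block ^^ at:45 (295)"
    // absolute seek position = fileHeaderEnd + block, exactly as getChildNode(int i) computes

In the structure dump above, every child block sits later in the file than the record that points to it, and the blocks visited along a key-ordered traversal have strictly increasing offsets, which appears to be what makes the reader's forward-only seek restriction safe.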