Skip to content

Commit

Permalink
Merge pull request #46 from scalableminds/close-all-iterators
Browse files Browse the repository at this point in the history
Close all RocksIterators
  • Loading branch information
fm3 authored Aug 20, 2024
2 parents 475c34b + f8efd58 commit d851fd3
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 45 deletions.
15 changes: 7 additions & 8 deletions src/main/scala/com/scalableminds/fossildb/FossilDBGrpcImpl.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.scalableminds.fossildb

import java.io.{PrintWriter, StringWriter}

import com.google.protobuf.ByteString
import com.scalableminds.fossildb.db.StoreManager
import com.scalableminds.fossildb.proto.fossildbapi._
Expand All @@ -20,7 +19,7 @@ class FossilDBGrpcImpl(storeManager: StoreManager)

override def get(req: GetRequest): Future[GetReply] = withExceptionHandler(req) {
val store = storeManager.getStore(req.collection)
val versionedKeyValuePairOpt = store.get(req.key, req.version)
val versionedKeyValuePairOpt = store.withRawRocksIterator{rocksIt => store.get(rocksIt, req.key, req.version)}
versionedKeyValuePairOpt match {
case Some(pair) => GetReply(success = true, None, ByteString.copyFrom(pair.value), pair.version)
case None =>
Expand All @@ -31,7 +30,7 @@ class FossilDBGrpcImpl(storeManager: StoreManager)

override def put(req: PutRequest): Future[PutReply] = withExceptionHandler(req) {
val store = storeManager.getStore(req.collection)
val version = req.version.getOrElse(store.get(req.key, None).map(_.version + 1).getOrElse(0L))
val version = store.withRawRocksIterator{rocksIt => req.version.getOrElse(store.get(rocksIt, req.key, None).map(_.version + 1).getOrElse(0L))}
require(version >= 0, "Version numbers must be non-negative")
store.put(req.key, version, req.value.toByteArray)
PutReply(success = true)
Expand All @@ -45,31 +44,31 @@ class FossilDBGrpcImpl(storeManager: StoreManager)

override def getMultipleVersions(req: GetMultipleVersionsRequest): Future[GetMultipleVersionsReply] = withExceptionHandler(req) {
val store = storeManager.getStore(req.collection)
val (values, versions) = store.getMultipleVersions(req.key, req.oldestVersion, req.newestVersion)
val (values, versions) = store.withRawRocksIterator{rocksIt => store.getMultipleVersions(rocksIt, req.key, req.oldestVersion, req.newestVersion)}
GetMultipleVersionsReply(success = true, None, values.map(ByteString.copyFrom), versions)
} { errorMsg => GetMultipleVersionsReply(success = false, errorMsg) }

override def getMultipleKeys(req: GetMultipleKeysRequest): Future[GetMultipleKeysReply] = withExceptionHandler(req) {
val store = storeManager.getStore(req.collection)
val (keys, values, versions) = store.getMultipleKeys(req.startAfterKey, req.prefix, req.version, req.limit)
val (keys, values, versions) = store.withRawRocksIterator{rocksIt => store.getMultipleKeys(rocksIt, req.startAfterKey, req.prefix, req.version, req.limit)}
GetMultipleKeysReply(success = true, None, keys, values.map(ByteString.copyFrom), versions)
} { errorMsg => GetMultipleKeysReply(success = false, errorMsg) }

override def deleteMultipleVersions(req: DeleteMultipleVersionsRequest): Future[DeleteMultipleVersionsReply] = withExceptionHandler(req) {
val store = storeManager.getStore(req.collection)
store.deleteMultipleVersions(req.key, req.oldestVersion, req.newestVersion)
store.withRawRocksIterator{rocksIt => store.deleteMultipleVersions(rocksIt, req.key, req.oldestVersion, req.newestVersion)}
DeleteMultipleVersionsReply(success = true)
} { errorMsg => DeleteMultipleVersionsReply(success = false, errorMsg) }

override def listKeys(req: ListKeysRequest): Future[ListKeysReply] = withExceptionHandler(req) {
val store = storeManager.getStore(req.collection)
val keys = store.listKeys(req.limit, req.startAfterKey)
val keys = store.withRawRocksIterator{rocksIt => store.listKeys(rocksIt, req.limit, req.startAfterKey)}
ListKeysReply(success = true, None, keys)
} { errorMsg => ListKeysReply(success = false, errorMsg) }

override def listVersions(req: ListVersionsRequest): Future[ListVersionsReply] = withExceptionHandler(req) {
val store = storeManager.getStore(req.collection)
val versions = store.listVersions(req.key, req.limit, req.offset)
val versions = store.withRawRocksIterator{rocksIt => store.listVersions(rocksIt, req.key, req.limit, req.offset)}
ListVersionsReply(success = true, None, versions)
} { errorMsg => ListVersionsReply(success = false, errorMsg) }

Expand Down
47 changes: 31 additions & 16 deletions src/main/scala/com/scalableminds/fossildb/db/RocksDBStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import java.nio.file.{Files, Path}
import java.util
import scala.collection.mutable
import scala.concurrent.Future
import scala.jdk.CollectionConverters.{ListHasAsScala, BufferHasAsJava, SeqHasAsJava}
import scala.jdk.CollectionConverters.{BufferHasAsJava, ListHasAsScala, SeqHasAsJava}
import scala.language.postfixOps

case class BackupInfo(id: Int, timestamp: Long, size: Long)
Expand Down Expand Up @@ -35,7 +35,8 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat
}
}
options.setCreateIfMissing(true).setCreateMissingColumnFamilies(true)
val defaultColumnFamilyOptions = cfListRef.find(_.getName sameElements RocksDB.DEFAULT_COLUMN_FAMILY).map(_.getOptions).getOrElse(columnOptions)
val defaultColumnFamilyOptions: ColumnFamilyOptions = cfListRef.find(_.getName sameElements RocksDB.DEFAULT_COLUMN_FAMILY).map(_.getOptions).getOrElse(columnOptions)
println(defaultColumnFamilyOptions)
val newColumnFamilyDescriptors = (columnFamilies.map(_.getBytes) :+ RocksDB.DEFAULT_COLUMN_FAMILY).diff(cfListRef.toList.map(_.getName)).map(new ColumnFamilyDescriptor(_, defaultColumnFamilyOptions))
val columnFamilyDescriptors = cfListRef.toList ::: newColumnFamilyDescriptors
logger.info("Opening RocksDB at " + dataDir.toAbsolutePath)
Expand Down Expand Up @@ -84,8 +85,11 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePat
logger.info(s"Exporting to new DB at ${newDataDir.toString} with options file $newOptionsFilePathOpt")
val newManager = new RocksDBManager(newDataDir, columnFamilies, newOptionsFilePathOpt)
newManager.columnFamilyHandles.foreach { case (name, handle) =>
val dataIterator = getStoreForColumnFamily(name).get.scan("", None)
dataIterator.foreach(el => newManager.db.put(handle, el.key.getBytes, el.value))
val store = getStoreForColumnFamily(name).get
store.withRawRocksIterator { rocksIt =>
val dataIterator = RocksDBStore.scan(rocksIt, "", None)
dataIterator.foreach(el => newManager.db.put(handle, el.key.getBytes, el.value))
}
}
logger.info("Writing data completed. Start compaction")
newManager.db.compactRange()
Expand Down Expand Up @@ -128,20 +132,17 @@ class RocksDBIterator(it: RocksIterator, prefix: Option[String]) extends Iterato

class RocksDBStore(db: RocksDB, handle: ColumnFamilyHandle) extends LazyLogging {

def get(key: String): Array[Byte] = {
db.get(handle, key.getBytes())
}

def scan(key: String, prefix: Option[String]): RocksDBIterator = {
val it = db.newIterator(handle)
it.seek(key.getBytes())
new RocksDBIterator(it, prefix)
def withRawRocksIterator[T](block: RocksIterator => T): T = {
val rocksIt = db.newIterator(handle)
try {
block(rocksIt)
} finally {
rocksIt.close()
}
}

def scanKeysOnly(key: String, prefix: Option[String]): RocksDBKeyIterator = {
val it = db.newIterator(handle)
it.seek(key.getBytes())
new RocksDBKeyIterator(it, prefix)
def get(key: String): Array[Byte] = {
db.get(handle, key.getBytes())
}

def put(key: String, value: Array[Byte]): Unit = {
Expand All @@ -153,3 +154,17 @@ class RocksDBStore(db: RocksDB, handle: ColumnFamilyHandle) extends LazyLogging
}

}

object RocksDBStore {

def scan(rocksIt: RocksIterator, key: String, prefix: Option[String]): RocksDBIterator = {
rocksIt.seek(key.getBytes())
new RocksDBIterator(rocksIt, prefix)
}

def scanKeysOnly(rocksIt: RocksIterator, key: String, prefix: Option[String]): RocksDBKeyIterator = {
rocksIt.seek(key.getBytes())
new RocksDBKeyIterator(rocksIt, prefix)
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.scalableminds.fossildb.db

import org.rocksdb.RocksIterator

import scala.annotation.tailrec
import scala.util.Try

Expand Down Expand Up @@ -56,7 +58,7 @@ class VersionFilterIterator(it: RocksDBIterator, version: Option[Long]) extends

}

class KeyOnlyIterator[T](underlying: RocksDBStore, startAfterKey: Option[String]) extends Iterator[String] {
class KeyOnlyIterator[T](rocksIt: RocksIterator, startAfterKey: Option[String]) extends Iterator[String] {

/*
Note that seek in the underlying iterators either hits precisely or goes to the
Expand All @@ -72,13 +74,13 @@ class KeyOnlyIterator[T](underlying: RocksDBStore, startAfterKey: Option[String]
}

override def hasNext: Boolean = {
val it = underlying.scanKeysOnly(compositeKeyFor(currentKey), None)
val it = RocksDBStore.scanKeysOnly(rocksIt, compositeKeyFor(currentKey), None)
if (it.hasNext && currentKey.isDefined && currentKey.contains(VersionedKey(it.peek).get.key)) it.next()
it.hasNext
}

override def next(): String = {
val it = underlying.scanKeysOnly(compositeKeyFor(currentKey), None)
val it = RocksDBStore.scanKeysOnly(rocksIt, compositeKeyFor(currentKey), None)
if (it.hasNext && currentKey.isDefined && currentKey.contains(VersionedKey(it.peek).get.key)) it.next()
val nextKey = VersionedKey(it.next()).get.key
currentKey = Some(nextKey)
Expand All @@ -90,10 +92,12 @@ class KeyOnlyIterator[T](underlying: RocksDBStore, startAfterKey: Option[String]

class VersionedKeyValueStore(underlying: RocksDBStore) {

def get(key: String, version: Option[Long] = None): Option[VersionedKeyValuePair[Array[Byte]]] =
scanVersionValuePairs(key, version).nextOption()
def withRawRocksIterator[T](block: RocksIterator => T): T = underlying.withRawRocksIterator(block)

def get(rocksIt: RocksIterator, key: String, version: Option[Long] = None): Option[VersionedKeyValuePair[Array[Byte]]] =
scanVersionValuePairs(rocksIt, key, version).nextOption()

def getMultipleVersions(key: String, oldestVersion: Option[Long] = None, newestVersion: Option[Long] = None): (List[Array[Byte]], List[Long]) = {
def getMultipleVersions(rocksIt: RocksIterator, key: String, oldestVersion: Option[Long] = None, newestVersion: Option[Long] = None): (List[Array[Byte]], List[Long]) = {

@tailrec
def toListIter(versionIterator: Iterator[VersionedKeyValuePair[Array[Byte]]],
Expand All @@ -106,31 +110,31 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
}
}

val iterator = scanVersionValuePairs(key, newestVersion)
val iterator = scanVersionValuePairs(rocksIt, key, newestVersion)
val (versions, keys) = toListIter(iterator, List(), List())
(versions.reverse, keys.reverse)
}

private def scanVersionValuePairs(key: String, version: Option[Long] = None): Iterator[VersionedKeyValuePair[Array[Byte]]] = {
private def scanVersionValuePairs(rocksIt: RocksIterator, key: String, version: Option[Long] = None): Iterator[VersionedKeyValuePair[Array[Byte]]] = {
requireValidKey(key)
val prefix = s"$key${VersionedKey.versionSeparator}"
underlying.scan(version.map(VersionedKey(key, _).toString).getOrElse(prefix), Some(prefix)).flatMap { pair =>
RocksDBStore.scan(rocksIt, version.map(VersionedKey(key, _).toString).getOrElse(prefix), Some(prefix)).flatMap { pair =>
VersionedKey(pair.key).map(VersionedKeyValuePair(_, pair.value))
}
}

private def scanVersionsOnly(key: String, version: Option[Long] = None): Iterator[VersionedKey] = {
private def scanVersionsOnly(rocksIt: RocksIterator, key: String, version: Option[Long] = None): Iterator[VersionedKey] = {
requireValidKey(key)
val prefix = s"$key${VersionedKey.versionSeparator}"
underlying.scanKeysOnly(version.map(VersionedKey(key, _).toString).getOrElse(prefix), Some(prefix)).flatMap { key =>
RocksDBStore.scanKeysOnly(rocksIt, version.map(VersionedKey(key, _).toString).getOrElse(prefix), Some(prefix)).flatMap { key =>
VersionedKey(key)
}
}

def getMultipleKeys(startAfterKey: Option[String], prefix: Option[String] = None, version: Option[Long] = None, limit: Option[Int]): (Seq[String], Seq[Array[Byte]], Seq[Long]) = {
def getMultipleKeys(rocksIt: RocksIterator, startAfterKey: Option[String], prefix: Option[String] = None, version: Option[Long] = None, limit: Option[Int]): (Seq[String], Seq[Array[Byte]], Seq[Long]) = {
startAfterKey.foreach(requireValidKey)
prefix.foreach{ p => requireValidKey(p)}
val iterator: VersionFilterIterator = scanKeys(startAfterKey, prefix, version)
val iterator: VersionFilterIterator = scanKeys(rocksIt, startAfterKey, prefix, version)

/*
Note that seek in the underlying iterators either hits precisely or goes to the
Expand All @@ -155,12 +159,12 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
(keys, values, versions)
}

private def scanKeys(startAfterKey: Option[String], prefix: Option[String] = None, version: Option[Long] = None): VersionFilterIterator = {
private def scanKeys(rocksIt: RocksIterator, startAfterKey: Option[String], prefix: Option[String] = None, version: Option[Long] = None): VersionFilterIterator = {
val fullKey = startAfterKey.map(key => s"$key${VersionedKey.versionSeparator}").orElse(prefix).getOrElse("")
new VersionFilterIterator(underlying.scan(fullKey, prefix), version)
new VersionFilterIterator(RocksDBStore.scan(rocksIt, fullKey, prefix), version)
}

def deleteMultipleVersions(key: String, oldestVersion: Option[Long] = None, newestVersion: Option[Long] = None): Unit = {
def deleteMultipleVersions(rocksIt: RocksIterator, key: String, oldestVersion: Option[Long] = None, newestVersion: Option[Long] = None): Unit = {
@tailrec
def deleteIter(versionIterator: Iterator[VersionedKey]): Unit = {
if (versionIterator.hasNext) {
Expand All @@ -172,7 +176,7 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
}
}

val versionsIterator = scanVersionsOnly(key, newestVersion)
val versionsIterator = scanVersionsOnly(rocksIt, key, newestVersion)
deleteIter(versionsIterator)
}

Expand All @@ -186,13 +190,13 @@ class VersionedKeyValueStore(underlying: RocksDBStore) {
underlying.delete(VersionedKey(key, version).toString)
}

def listKeys(limit: Option[Int], startAfterKey: Option[String]): Seq[String] = {
val iterator = new KeyOnlyIterator(underlying, startAfterKey)
def listKeys(rocksIt: RocksIterator, limit: Option[Int], startAfterKey: Option[String]): Seq[String] = {
val iterator = new KeyOnlyIterator(rocksIt, startAfterKey)
iterator.take(limit.getOrElse(Int.MaxValue)).toSeq
}

def listVersions(key: String, limit: Option[Int], offset: Option[Int]): Seq[Long] = {
val iterator = scanVersionsOnly(key)
def listVersions(rocksIt: RocksIterator, key: String, limit: Option[Int], offset: Option[Int]): Seq[Long] = {
val iterator = scanVersionsOnly(rocksIt, key)
iterator.map(_.version).drop(offset.getOrElse(0)).take(limit.getOrElse(Int.MaxValue)).toSeq
}

Expand Down

0 comments on commit d851fd3

Please sign in to comment.