Skip to content

Commit

Permalink
Merge pull request #20 from scalableminds/rocksdb-configfile
Browse files Browse the repository at this point in the history
optionally read rocksdb configfile
  • Loading branch information
fm3 authored Jul 9, 2018
2 parents 7a6a995 + dc19df0 commit 8c50a96
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 18 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ libraryDependencies ++= Seq(
"io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion,
"io.grpc" % "grpc-services" % scalapb.compiler.Version.grpcJavaVersion,
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
"org.rocksdb" % "rocksdbjni" % "5.1.2",
"org.rocksdb" % "rocksdbjni" % "5.11.3",
"com.github.scopt" %% "scopt" % "3.7.0"
)

Expand Down
10 changes: 7 additions & 3 deletions src/main/scala/com/scalableminds/fossildb/FossilDB.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import fossildb.BuildInfo

import scala.concurrent.ExecutionContext

object ConfigDefaults {val port = 7155; val dataDir = "data"; val backupDir = "backup"; val columnFamilies = List()}
object ConfigDefaults {val port = 7155; val dataDir = "data"; val backupDir = "backup"; val columnFamilies = List(); val rocksOptionsFile = None}
case class Config(port: Int = ConfigDefaults.port, dataDir: String = ConfigDefaults.dataDir,
backupDir: String = ConfigDefaults.backupDir, columnFamilies: List[String] = ConfigDefaults.columnFamilies)
backupDir: String = ConfigDefaults.backupDir, columnFamilies: List[String] = ConfigDefaults.columnFamilies,
rocksOptionsFile: Option[String] = ConfigDefaults.rocksOptionsFile)

object FossilDB extends LazyLogging {
def main(args: Array[String]) = {
Expand All @@ -24,7 +25,7 @@ object FossilDB extends LazyLogging {
logger.info("BuildInfo: (" + BuildInfo + ")")
logger.info("Config: " + config)

val storeManager = new StoreManager(Paths.get(config.dataDir), Paths.get(config.backupDir), config.columnFamilies)
val storeManager = new StoreManager(Paths.get(config.dataDir), Paths.get(config.backupDir), config.columnFamilies, config.rocksOptionsFile)

val server = new FossilDBServer(storeManager, config.port, ExecutionContext.global)

Expand All @@ -51,6 +52,9 @@ object FossilDB extends LazyLogging {

opt[Seq[String]]('c', "columnFamilies").required.valueName("<cf1>,<cf2>...").action( (x, c) =>
c.copy(columnFamilies = x.toList) ).text("column families of the database (created if there is no db yet)")

opt[String]('r', "rocksOptionsFile").valueName("<filepath>").action( (x, c) =>
c.copy(rocksOptionsFile = Some(x)) ).text("rocksdb options file. Default: " + ConfigDefaults.rocksOptionsFile)
}

parser.parse(args, Config())
Expand Down
17 changes: 15 additions & 2 deletions src/main/scala/com/scalableminds/fossildb/db/RocksDBStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import com.typesafe.scalalogging.LazyLogging
import org.rocksdb._

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Future

case class BackupInfo(id: Int, timestamp: Long, size: Long)

case class KeyValuePair[T](key: String, value: T)

class RocksDBManager(dataDir: Path, columnFamilies: List[String]) extends LazyLogging {
class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePathOpt: Option[String]) extends LazyLogging {

val (db: RocksDB, columnFamilyHandles) = {
RocksDB.loadLibrary()
Expand All @@ -28,7 +29,19 @@ class RocksDBManager(dataDir: Path, columnFamilies: List[String]) extends LazyLo
new ColumnFamilyDescriptor(columnFamily, columnOptions)
}
val columnFamilyHandles = new util.ArrayList[ColumnFamilyHandle]
val options = new DBOptions()
var options = new DBOptions()
var cfListRef: mutable.Buffer[ColumnFamilyDescriptor] = mutable.Buffer()
optionsFilePathOpt.map { optionsFilePath =>
try {
org.rocksdb.OptionsUtil.loadOptionsFromFile(optionsFilePath, Env.getDefault, options, cfListRef.asJava)
logger.info("successfully loaded rocksdb options from " + optionsFilePath)
} catch {
case e: Exception => {
throw new Exception("Failed to load rocksdb options from file " + optionsFilePath, e)
}
}
}
options = options
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true)
logger.info("Opening RocksDB at " + dataDir.toAbsolutePath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package com.scalableminds.fossildb.db
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicBoolean

class StoreManager(dataDir: Path, backupDir: Path, columnFamilies: List[String]) {
class StoreManager(dataDir: Path, backupDir: Path, columnFamilies: List[String], rocksdbOptions: Option[String]) {

var rocksDBManager: Option[RocksDBManager] = None
var stores: Option[Map[String, VersionedKeyValueStore]] = None
Expand All @@ -15,7 +15,7 @@ class StoreManager(dataDir: Path, backupDir: Path, columnFamilies: List[String])

def reInitialize = {
rocksDBManager.map(_.close)
rocksDBManager = Some(new RocksDBManager(dataDir, columnFamilies))
rocksDBManager = Some(new RocksDBManager(dataDir, columnFamilies, rocksdbOptions))
stores = Some(columnFamilies.map { cf =>
val store: VersionedKeyValueStore = new VersionedKeyValueStore(rocksDBManager.get.getStoreForColumnFamily(cf).get)
(cf -> store)
Expand Down
13 changes: 3 additions & 10 deletions src/test/scala/com/scalableminds/fossildb/FossilDBSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import org.scalatest.{BeforeAndAfterEach, FlatSpec}

import scala.concurrent.ExecutionContext

class FossilDBSuite extends FlatSpec with BeforeAndAfterEach {
val testTempDir = "testData"
class FossilDBSuite extends FlatSpec with BeforeAndAfterEach with TestHelpers {
val testTempDir = "testData1"
val dataDir = Paths.get(testTempDir, "data")
val backupDir = Paths.get(testTempDir, "backup")

Expand All @@ -38,20 +38,13 @@ class FossilDBSuite extends FlatSpec with BeforeAndAfterEach {
val anotherKey = "anotherKey"
val aThirdKey = "aThirdKey"

private def deleteRecursively(file: File): Unit = {
if (file.isDirectory)
file.listFiles.foreach(deleteRecursively)
if (file.exists && !file.delete)
throw new Exception(s"Unable to delete ${file.getAbsolutePath}")
}

override def beforeEach = {
deleteRecursively(new File(testTempDir))
new File(testTempDir).mkdir()

val columnFamilies = List(collectionA, collectionB)

val storeManager = new StoreManager(dataDir, backupDir, columnFamilies)
val storeManager = new StoreManager(dataDir, backupDir, columnFamilies, None)

serverOpt.map(_.stop())
serverOpt = Some(new FossilDBServer(storeManager, port, ExecutionContext.global))
Expand Down
74 changes: 74 additions & 0 deletions src/test/scala/com/scalableminds/fossildb/RocksOptionsSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (C) 2011-2018 scalable minds UG (haftungsbeschränkt) & Co. KG. <http://scm.io>
*/
package com.scalableminds.fossildb

import java.io.File
import java.nio.file.Paths

import com.scalableminds.fossildb.db.StoreManager
import org.rocksdb.{ColumnFamilyDescriptor, DBOptions, Env}
import org.scalatest.{BeforeAndAfterEach, FlatSpec}

import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.io.Source

class RocksOptionsSuite extends FlatSpec with BeforeAndAfterEach with TestHelpers {

val testTempDir = "testData2"
val dataDir = Paths.get(testTempDir, "data")
val backupDir = Paths.get(testTempDir, "backup")

val collectionA = "collectionA"
val collectionB = "collectionB"

val columnFamilies = List(collectionA, collectionB)



override def beforeEach = {
deleteRecursively(new File(testTempDir))
new File(testTempDir).mkdir()
}

override def afterEach = {
deleteRecursively(new File(testTempDir))
}



"Initializing the StoreManager" should "load and use a specified config file" in {
val file = new File(testTempDir, "testConfig.ini")
writeToFile(file, "[Version]\n rocksdb_version=5.11.3\n options_file_version=1.1\n\n[DBOptions]\n stats_dump_period_sec=700\n\n[CFOptions \"default\"]\n\n")

val storeManager = new StoreManager(dataDir, backupDir, columnFamilies, Some(file.getPath))

var options = new DBOptions()
.setStatsDumpPeriodSec(100)
var cfListRef: mutable.Buffer[ColumnFamilyDescriptor] = mutable.Buffer()
// if successful, the rocksdb writes the loaded options to a file that can then be retreived with loadLatestOptions
// we test that that one now includes the value 700 from the file above, rather than the 100 specified as a default
org.rocksdb.OptionsUtil.loadLatestOptions(dataDir.toString, Env.getDefault, options, cfListRef.asJava)
assert(options.statsDumpPeriodSec() == 700)
storeManager.close
}

it should "fail if specified config file does not exist" in {
assertThrows[Exception] {
new StoreManager(dataDir, backupDir, columnFamilies, Some("nonExistingPath.ini"))
}
}

it should "fail if specified config file is invalid" in {
val file = new File(testTempDir, "testConfig.ini")
writeToFile(file, "[Version]\n rocksdb_version=5.11.3\n options_file_version=1.1\n\n[DBOptions]\n stats_dump_period_sec=700")

assertThrows[Exception] {
new StoreManager(dataDir, backupDir, columnFamilies, Some(file.getPath))
}
}



}
23 changes: 23 additions & 0 deletions src/test/scala/com/scalableminds/fossildb/TestHelpers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (C) 2011-2018 scalable minds UG (haftungsbeschränkt) & Co. KG. <http://scm.io>
*/
package com.scalableminds.fossildb

import java.io.{BufferedWriter, File, FileWriter}

trait TestHelpers {

protected def deleteRecursively(file: File): Unit = {
if (file.isDirectory)
file.listFiles.foreach(deleteRecursively)
if (file.exists && !file.delete)
throw new Exception(s"Unable to delete ${file.getAbsolutePath}")
}

protected def writeToFile(file: File, content: String): Unit = {
val bw = new BufferedWriter(new FileWriter(file))
bw.write(content)
bw.close()
}

}

0 comments on commit 8c50a96

Please sign in to comment.