Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.logging.{Level, Logger}

import com.datastax.driver.core._
import com.datastax.driver.core.exceptions.{DriverException, NoHostAvailableException, QueryExecutionException, QueryValidationException}
import com.datastax.driver.core.exceptions.{DriverException, NoHostAvailableException, QueryExecutionException, QueryValidationException, InvalidQueryException}
import com.datastax.driver.core.querybuilder.{Insert, QueryBuilder}
import com.google.inject.Inject
import org.apache.mesos.Protos.{TaskState, TaskStatus}
Expand Down Expand Up @@ -164,15 +164,30 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan
case Some(c) =>
try {
val session = c.build.connect()
session.execute(new SimpleStatement(
s"CREATE KEYSPACE IF NOT EXISTS ${config.cassandraKeyspace()} WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"
))
session.execute(new SimpleStatement(
s"USE ${config.cassandraKeyspace()};"
))

session.execute(new SimpleStatement(
s"CREATE TABLE IF NOT EXISTS ${config.cassandraTable()}" +

val createKeyspaceStatement = s"CREATE KEYSPACE IF NOT EXISTS ${config.cassandraKeyspace()} WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"
try {
session.execute(new SimpleStatement(
createKeyspaceStatement
))
} catch {
case e: Exception =>
log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(createKeyspaceStatement))
throw e
}

val useKeyspaceStatement = s"USE ${config.cassandraKeyspace()};"
try {
session.execute(new SimpleStatement(
useKeyspaceStatement
))
} catch {
case e: Exception =>
log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(useKeyspaceStatement))
throw e
}

val createTableStatement = s"CREATE TABLE IF NOT EXISTS ${config.cassandraTable()}" +
"""
|(
| id VARCHAR,
Expand All @@ -190,17 +205,34 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan
| WITH bloom_filter_fp_chance=0.100000 AND
| compaction = {'class':'LeveledCompactionStrategy'}
""".stripMargin
))
session.execute(new SimpleStatement(
s"CREATE INDEX IF NOT EXISTS ON ${config.cassandraTable()} ($JOB_NAME);"
))
try {
session.execute(new SimpleStatement(
createTableStatement
))
} catch {
case e: Exception =>
log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(createTableStatement))
throw e
}

val createIndexStatement = s"CREATE INDEX IF NOT EXISTS ON ${config.cassandraTable()} ($JOB_NAME);"
try {
session.execute(new SimpleStatement(
createIndexStatement
))
} catch {
case e: InvalidQueryException =>
log.log(Level.WARNING, "Caught InvalidQueryException when creating Cassandra JobStats session (%s), probably not fatal".format(createIndexStatement), e)
case e: Exception =>
log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(createIndexStatement))
throw e
}

/*
* highest bloom filter to reduce memory consumption and reducing
* false positives
*/
session.execute(new SimpleStatement(
s"CREATE TABLE IF NOT EXISTS ${config.cassandraStatCountTable()}" +
val createStatTableStatement = s"CREATE TABLE IF NOT EXISTS ${config.cassandraStatCountTable()}" +
"""
|(
| task_id VARCHAR,
Expand All @@ -210,7 +242,15 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan
| WITH bloom_filter_fp_chance=0.100000 AND
| compaction = {'class':'LeveledCompactionStrategy'}
""".stripMargin
))
try {
session.execute(new SimpleStatement(
createStatTableStatement
))
} catch {
case e: Exception =>
log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(createStatTableStatement))
throw e
}

_session = Some(session)
_session
Expand Down