From fc277731f9b5233e5ade6cf7f7892a3c36ca51c1 Mon Sep 17 00:00:00 2001 From: Adam Cecile Date: Tue, 21 Aug 2018 15:30:12 +0200 Subject: [PATCH 1/2] Wrap Cassandra CQL calls into separate try/catch for easier debugging --- .../scheduler/jobs/stats/JobStats.scala | 70 ++++++++++++++----- 1 file changed, 54 insertions(+), 16 deletions(-) diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala index dfb83fbf4..efd8c2aec 100644 --- a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala @@ -164,15 +164,30 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan case Some(c) => try { val session = c.build.connect() - session.execute(new SimpleStatement( - s"CREATE KEYSPACE IF NOT EXISTS ${config.cassandraKeyspace()} WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" - )) - session.execute(new SimpleStatement( - s"USE ${config.cassandraKeyspace()};" - )) - - session.execute(new SimpleStatement( - s"CREATE TABLE IF NOT EXISTS ${config.cassandraTable()}" + + + val createKeyspaceStatement = s"CREATE KEYSPACE IF NOT EXISTS ${config.cassandraKeyspace()} WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" + try { + session.execute(new SimpleStatement( + createKeyspaceStatement + )) + } catch { + case e: Exception => + log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(createKeyspaceStatement)) + throw e + } + + val useKeyspaceStatement = s"USE ${config.cassandraKeyspace()};" + try { + session.execute(new SimpleStatement( + useKeyspaceStatement + )) + } catch { + case e: Exception => + log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(useKeyspaceStatement)) + throw e + } + + val createTableStatement = s"CREATE TABLE IF NOT EXISTS ${config.cassandraTable()}" + """ |( | id VARCHAR, @@ -190,17 +205,32 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan | WITH bloom_filter_fp_chance=0.100000 AND | compaction = {'class':'LeveledCompactionStrategy'} """.stripMargin - )) - session.execute(new SimpleStatement( - s"CREATE INDEX IF NOT EXISTS ON ${config.cassandraTable()} ($JOB_NAME);" - )) + try { + session.execute(new SimpleStatement( + createTableStatement + )) + } catch { + case e: Exception => + log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(createTableStatement)) + throw e + } + + val createIndexStatement = s"CREATE INDEX IF NOT EXISTS ON ${config.cassandraTable()} ($JOB_NAME);" + try { + session.execute(new SimpleStatement( + createIndexStatement + )) + } catch { + case e: Exception => + log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(createIndexStatement)) + throw e + } /* * highest bloom filter to reduce memory consumption and reducing * false positives */ - session.execute(new SimpleStatement( - s"CREATE TABLE IF NOT EXISTS ${config.cassandraStatCountTable()}" + + val createStatTableStatement = s"CREATE TABLE IF NOT EXISTS ${config.cassandraStatCountTable()}" + """ |( | task_id VARCHAR, @@ -210,7 +240,15 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan | WITH bloom_filter_fp_chance=0.100000 AND | compaction = {'class':'LeveledCompactionStrategy'} """.stripMargin - )) + try { + session.execute(new SimpleStatement( + createStatTableStatement + )) + } catch { + case e: Exception => + log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(createStatTableStatement)) + throw e + } _session = Some(session) _session From 2ce2ed2563bb75993c22068e8320589f11ee8dfa Mon Sep 17 00:00:00 2001 From: Adam Cecile Date: Tue, 21 Aug 2018 15:30:57 +0200 Subject: [PATCH 2/2] Do not re-throw InvalidQueryException when CQL CREATE INDEXES statement fails --- .../apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala index efd8c2aec..dd691ccb3 100644 --- a/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala +++ b/src/main/scala/org/apache/mesos/chronos/scheduler/jobs/stats/JobStats.scala @@ -5,7 +5,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.logging.{Level, Logger} import com.datastax.driver.core._ -import com.datastax.driver.core.exceptions.{DriverException, NoHostAvailableException, QueryExecutionException, QueryValidationException} +import com.datastax.driver.core.exceptions.{DriverException, NoHostAvailableException, QueryExecutionException, QueryValidationException, InvalidQueryException} import com.datastax.driver.core.querybuilder.{Insert, QueryBuilder} import com.google.inject.Inject import org.apache.mesos.Protos.{TaskState, TaskStatus} @@ -221,6 +221,8 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan createIndexStatement )) } catch { + case e: InvalidQueryException => + log.log(Level.WARNING, "Caught InvalidQueryException when creating Cassandra JobStats session (%s), probably not fatal".format(createIndexStatement), e) case e: Exception => log.log(Level.WARNING, "Caught exception when creating Cassandra JobStats session (%s)".format(createIndexStatement)) throw e