@@ -12,7 +12,7 @@ import com.workday.warp.common.utils.StackTraceFilter
12
12
import com .workday .warp .persistence .CorePersistenceAware
13
13
import com .workday .warp .persistence .TablesLike ._
14
14
import com .workday .warp .persistence .Tables ._
15
- import org .influxdb .dto .{BatchPoints , Point , Pong }
15
+ import org .influxdb .dto .{BatchPoints , Point , Pong , Query , QueryResult }
16
16
import org .influxdb .{InfluxDB , InfluxDBFactory }
17
17
import org .pmw .tinylog .Logger
18
18
@@ -34,12 +34,12 @@ trait InfluxDBClient extends StackTraceFilter with CorePersistenceAware {
34
34
* @param seriesName the series (also referred to as a measurement) to use for persistence.
35
35
*/
36
36
def persistHeapHistogram (histo : HeapHistogram , dbName : String , seriesName : String , warpTestName : String ): Try [Unit ] = {
37
- InfluxDBClient .client match {
38
- case None => Failure (new RuntimeException ( " unable to connect to influxdb " ))
39
- case Some (client) =>
37
+ InfluxDBClient .maybeClient match {
38
+ case Left (error) => Failure (new WarpConfigurationException (error ))
39
+ case Right (client) =>
40
40
// create the database if necessary
41
41
if (! this .databaseExists(dbName).getOrElse(false )) {
42
- client .createDatabase(dbName)
42
+ this .createDatabase(dbName)
43
43
}
44
44
45
45
// timestamp to use for all points in this batch
@@ -120,12 +120,12 @@ trait InfluxDBClient extends StackTraceFilter with CorePersistenceAware {
120
120
testExecutions : Iterable [T ],
121
121
threshold : Option [Duration ] = None ): Try [Unit ] = {
122
122
123
- InfluxDBClient .client match {
124
- case None => Failure (new RuntimeException ( " unable to connect to influxdb " ))
125
- case Some (client) =>
123
+ InfluxDBClient .maybeClient match {
124
+ case Left (error) => Failure (new WarpConfigurationException (error ))
125
+ case Right (client) =>
126
126
// create the database if necessary
127
127
if (! this .databaseExists(dbName).getOrElse(false )) {
128
- client .createDatabase(dbName)
128
+ this .createDatabase(dbName)
129
129
}
130
130
131
131
val points = addPoints(dbName, seriesName, testExecutions, threshold)
@@ -157,9 +157,20 @@ trait InfluxDBClient extends StackTraceFilter with CorePersistenceAware {
157
157
* @return true iff databaseName exists as a database in InfluxDB and we have a successful connection.
158
158
*/
159
159
def databaseExists (databaseName : String ): Try [Boolean ] = {
160
- InfluxDBClient .client match {
161
- case None => Failure (new WarpConfigurationException (InfluxDBClient .error))
162
- case Some (client) => Try (client.describeDatabases.asScala.exists(_.equals(databaseName)))
160
+ val showQuery : Query = new Query (" SHOW DATABASES" , databaseName)
161
+ InfluxDBClient .maybeClient match {
162
+ case Left (error) => Failure (new WarpConfigurationException (error))
163
+ case Right (client) => Try {
164
+ val results : Seq [QueryResult .Result ] = client.query(showQuery).getResults.asScala
165
+ val databaseNames : Seq [String ] = for {
166
+ res <- results
167
+ serie <- res.getSeries.asScala
168
+ value <- serie.getValues.asScala
169
+ name <- value.asScala
170
+ } yield name.toString
171
+
172
+ databaseNames.exists(_.equals(databaseName))
173
+ }
163
174
}
164
175
}
165
176
@@ -169,10 +180,27 @@ trait InfluxDBClient extends StackTraceFilter with CorePersistenceAware {
169
180
*
170
181
* @param database name of the database to delete
171
182
*/
172
- def deleteDatabase (database : String ): Try [Unit ] = {
173
- InfluxDBClient .client match {
174
- case None => Failure (new WarpConfigurationException (InfluxDBClient .error))
175
- case Some (client) => Try (client.deleteDatabase(database))
183
+ def dropDatabase (database : String ): Try [Unit ] = {
184
+ val dropQuery : Query = new Query (s """ DROP DATABASE " $database" """ , database)
185
+
186
+ InfluxDBClient .maybeClient match {
187
+ case Left (error) => Failure (new WarpConfigurationException (error))
188
+ case Right (client) => Try (client.query(dropQuery))
189
+ }
190
+ }
191
+
192
+
193
+ /**
194
+ *
195
+ * @param database
196
+ * @return
197
+ */
198
+ def createDatabase (database : String ): Try [Unit ] = {
199
+ val createQuery : Query = new Query (s """ CREATE DATABASE " $database" """ , database)
200
+
201
+ InfluxDBClient .maybeClient match {
202
+ case Left (error) => Failure (new WarpConfigurationException (error))
203
+ case Right (client) => Try (client.query(createQuery))
176
204
}
177
205
}
178
206
@@ -181,9 +209,9 @@ trait InfluxDBClient extends StackTraceFilter with CorePersistenceAware {
181
209
* @return a Pong object describing the deployed influxdb server
182
210
*/
183
211
def ping : Try [Pong ] = {
184
- InfluxDBClient .client match {
185
- case None => Failure (new WarpConfigurationException (InfluxDBClient . error))
186
- case Some (client) => Try (client.ping)
212
+ InfluxDBClient .maybeClient match {
213
+ case Left (error) => Failure (new WarpConfigurationException (error))
214
+ case Right (client) => Try (client.ping)
187
215
}
188
216
}
189
217
}
@@ -196,15 +224,8 @@ object InfluxDBClient {
196
224
private val user : String = WARP_INFLUXDB_USER .value
197
225
private val password : String = WARP_INFLUXDB_PASSWORD .value
198
226
199
- /** [[Option ]] containing an [[InfluxDB ]]. Use this to write datapoints to influxdb. */
200
- val client : Option [InfluxDB ] = this .connect
201
-
202
-
203
- /** a simple error message containing url, user, password we attempted to connect with. */
204
- lazy val error : String = {
205
- s " unable to connect to influxdb at ${this .url} using credentials (user = ${this .user}, password = ${this .password}) "
206
- }
207
-
227
+ /** [[Either ]] containing an error message, or an [[InfluxDB ]]. Use this to write datapoints to influxdb. */
228
+ val maybeClient : Either [String , InfluxDB ] = this .connect(this .url, this .user, this .password)
208
229
209
230
/**
210
231
* Constructs a [[BatchPoints ]].
@@ -222,14 +243,17 @@ object InfluxDBClient {
222
243
223
244
224
245
/** @return an InfluxDB connection based on the values set in WarpProperty. */
225
- private [ this ] def connect : Option [ InfluxDB ] = {
226
- val influx : InfluxDB = InfluxDBFactory .connect(this . url, this . user, this . password)
246
+ protected [influxdb ] def connect ( url : String , user : String , password : String ) : Either [ String , InfluxDB ] = {
247
+ val influx : InfluxDB = InfluxDBFactory .connect(url, user, password)
227
248
228
249
Try (influx.ping) match {
229
250
case Failure (exception) =>
230
- Logger .warn(this .error, exception.getMessage)
231
- None
232
- case Success (_) => Option (influx)
251
+ val error : String =
252
+ s " unable to connect to influxdb at $url using credentials (user = $user, password = $password) "
253
+ Logger .warn(error, exception.getMessage)
254
+ Left (error)
255
+ case Success (_) =>
256
+ Right (influx)
233
257
}
234
258
}
235
259
}
0 commit comments