Run distributed spatial computations and analytics on your geospatial data using Apache Spark. To instantiate an RDD[SimpleFeature], call GeoMesaSpark.init(sparkConf, ds) on one of your data stores. Then, request an RDD using a CQL filter as follows:

geomesa.compute.spark.GeoMesaSpark.rdd(hadoopConf, sparkContext, ds, query)
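In condensed form (a sketch only; ds and query stand for a data store and GeoTools Query set up as in the full example below), the two calls look like:

// `ds`: an open GeoMesa Accumulo data store; `query`: a GeoTools Query carrying the CQL filter
val sconf = GeoMesaSpark.init(new SparkConf(true), ds)            // GeoMesa-aware Spark configuration
val sc    = new SparkContext(sconf)
val rdd   = GeoMesaSpark.rdd(new Configuration, sc, ds, query)    // yields an RDD[SimpleFeature]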
The following example demonstrates computing a time series across geospatial data within the constraints of a CQL query.
import java.text.SimpleDateFormat

import geomesa.compute.spark.GeoMesaSpark
import geomesa.core.data.AccumuloDataStore        // package path assumes the GeoMesa 1.0.x layout
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.geotools.data.{DataStoreFinder, Query}
import org.geotools.factory.CommonFactoryFinder

import scala.collection.JavaConversions._          // lets DataStoreFinder accept the Scala Map below

object CountByDay {

  def main(args: Array[String]) {
    // Get a handle to the data store
    val params = Map(
      "instanceId" -> "instance",
      "zookeepers" -> "zoo1,zoo2,zoo3",
      "user"       -> "user",
      "password"   -> "*****",
      "auths"      -> "USER,ADMIN",
      "tableName"  -> "geomesa_catalog")
    val ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]

    // Construct a CQL query to filter by bounding box
    val ff = CommonFactoryFinder.getFilterFactory2
    val f  = ff.bbox("geom", -80, 35, -79, 36, "EPSG:4326")
    val q  = new Query("myFeatureType", f)

    // Configure Spark
    val conf  = new Configuration
    val sconf = GeoMesaSpark.init(new SparkConf(true), ds)
    val sc    = new SparkContext(sconf)

    // Create an RDD from the query
    val queryRDD = GeoMesaSpark.rdd(conf, sc, ds, q)

    // Convert RDD[SimpleFeature] to RDD[(String, SimpleFeature)] where the first
    // element of the tuple is the date at day resolution
    val dayAndFeature = queryRDD.mapPartitions { iter =>
      val df  = new SimpleDateFormat("yyyyMMdd")
      val ff  = CommonFactoryFinder.getFilterFactory2
      val exp = ff.property("dtg")
      iter.map { f => (df.format(exp.evaluate(f).asInstanceOf[java.util.Date]), f) }
    }

    // Group the results by day
    val groupedByDay = dayAndFeature.groupBy { case (date, _) => date }

    // Count the number of features in each day
    val countByDay = groupedByDay.map { case (date, iter) => (date, iter.size) }

    // Collect the results and print
    countByDay.collect.foreach(println)
  }
}
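Note that the groupBy above shuffles every matching feature across the cluster only to take the size of each group. As a lighter-weight alternative (a sketch, not part of the original example), the per-day counts can be computed with reduceByKey so that only small count records move in the shuffle:

// Alternative counting sketch; `dayAndFeature` is the keyed RDD built above.
// In Spark 1.x, pair-RDD operations may require: import org.apache.spark.SparkContext._
val countByDayAlt = dayAndFeature
  .mapValues(_ => 1L)     // replace each feature with a unit count
  .reduceByKey(_ + _)     // sum the counts per day across partitions
countByDayAlt.collect.foreach(println)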
To run the example, execute the following command:
$ /opt/spark/bin/spark-submit --class com.mycompany.example.CountByDay --master yarn-client --deploy-mode client --num-executors 40 --executor-cores 4 countbyday.jar
To use the Spark shell (with Spark 1.1.0), build the shaded GeoMesa compute jar and launch the shell with it on the driver classpath:
bin/spark-shell --driver-class-path /path/to/geomesa-compute-accumulo1.5-1.0.0-rc.3-SNAPSHOT-shaded.jar
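Once the shell starts, the same calls can be run interactively against the SparkContext sc that the shell provides. The sketch below assumes the connection parameters and feature type from the example above and the GeoMesa 1.0.x package layout; because the shell's sc is not created via GeoMesaSpark.init, any settings that init applies to the SparkConf would need to be supplied another way (for example via --conf) if they are required.

// Interactive sketch for spark-shell; GeoMesa package paths assume the 1.0.x layout.
import org.apache.hadoop.conf.Configuration
import org.geotools.data.{DataStoreFinder, Query}
import org.geotools.factory.CommonFactoryFinder
import geomesa.compute.spark.GeoMesaSpark
import geomesa.core.data.AccumuloDataStore
import scala.collection.JavaConversions._

val params = Map(
  "instanceId" -> "instance", "zookeepers" -> "zoo1,zoo2,zoo3",
  "user" -> "user", "password" -> "*****",
  "auths" -> "USER,ADMIN", "tableName" -> "geomesa_catalog")
val ds = DataStoreFinder.getDataStore(params).asInstanceOf[AccumuloDataStore]

val ff = CommonFactoryFinder.getFilterFactory2
val q  = new Query("myFeatureType", ff.bbox("geom", -80, 35, -79, 36, "EPSG:4326"))

// `sc` is the SparkContext provided by the shell
val rdd = GeoMesaSpark.rdd(new Configuration, sc, ds, q)
rdd.count()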