# BigSolr
BigSolr aims to provide a comprehensive set of Solr connectors for Apache Hadoop, Apache Spark, and other big data technologies.
## Features
- Provides custom Hadoop APIs to access Solr servers
- Allows Apache Spark to read and write data with Solr servers through the Hadoop APIs
- Integration with Cascading/Scalding, Pig, Hive, etc. (planned; not yet supported)
## Requirements

- Apache Spark 1.1 or higher (1.2 is recommended)
- Apache Solr 4.10.x
## Build

The following Maven command creates `bigsolr-0.1.jar` in the `target` directory.

```
$ mvn clean compile package
```

## Running Solr

Before running the Spark examples, Solr has to be up and running in either standalone or SolrCloud mode.

In standalone/HttpServer mode:

```
$ bin/solr start
```

In SolrCloud (cloud) mode:

```
$ bin/solr -e cloud
```
For the following examples, create a collection/core (an index) named "collection1" once the Solr server is up and running.
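The exact creation step depends on how Solr is set up. As a rough sketch (assuming the default ports from the commands above), the collection can be created with Solr's Collections API in SolrCloud mode or with the Core Admin API in standalone mode; note that the stock Solr 4.10 examples typically start with a `collection1` core already defined, in which case this step can be skipped.

```
# SolrCloud mode (Collections API); assumes a configuration set is already in ZooKeeper
$ curl "http://localhost:8983/solr/admin/collections?action=CREATE&name=collection1&numShards=1&replicationFactor=1"

# Standalone mode (Core Admin API); assumes an instance directory with a conf/ already exists
$ curl "http://localhost:8983/solr/admin/cores?action=CREATE&name=collection1&instanceDir=collection1"
```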
## Using Spark Shell
*Note: please check which Hadoop version your Spark distribution was built for. For Hadoop 2.3/2.4 or higher, follow the instructions for the New Hadoop API; if your Spark was built for Hadoop 1.x, follow the instructions for the Old Hadoop API below.*
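If you are not sure which Hadoop version your Spark build targets, one quick check for the pre-built Spark 1.x binary distributions (assuming `SPARK_HOME` points at your Spark installation) is the name of the bundled assembly jar, which encodes the Hadoop version; the file name below is only illustrative:

```
$ ls $SPARK_HOME/lib/spark-assembly-*.jar
spark-assembly-1.2.0-hadoop2.4.0.jar
```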
```
$ spark-shell --jars target/bigsolr-0.1.jar
```
### Reading with New Hadoop API (mapreduce)
```
scala> import org.apache.hadoop.conf.Configuration
scala> import org.apache.hadoop.io.NullWritable
scala> import org.bigsolr.hadoop.SolrInputFormat
scala> import org.bigsolr.hadoop.SolrRecord
```

For SolrCloud mode:

```
scala> val serverMode: String = "cloud"
scala> val serverUrl: String = "localhost:9983"
```

For standalone/HttpServer mode:

```
scala> val serverMode: String = "standalone"
scala> val serverUrl: String = "http://localhost:8983/solr"
```

```
scala> val collection: String = "collection1"
scala> val fields: String = "id,description"
scala> val queryStr: String = "description:*"

scala> var conf = new Configuration()
scala> conf.set("solr.query", queryStr)
scala> conf.set("solr.server.url", serverUrl)
scala> conf.set("solr.server.mode", serverMode)
scala> conf.set("solr.server.collection", collection)
scala> conf.set("solr.server.fields", fields)

scala> val rdds = sc.newAPIHadoopRDD(conf, classOf[SolrInputFormat], classOf[NullWritable], classOf[SolrRecord]).map { case (key, value) => value.getSolrDocument() }

scala> rdds.count
scala> rdds.first
scala> rdds.first.getFieldValue("id")
scala> rdds.first.getFieldValue("description")
scala> rdds.first.getFieldValuesMap()
```
### Indexing with New Hadoop API (mapreduce)
```
scala> import org.bigsolr.hadoop.SolrOutputFormat
scala> import org.bigsolr.hadoop.SolrInputRecord
scala> import org.apache.hadoop.io.MapWritable
scala> import org.apache.hadoop.io.NullWritable
scala> import org.apache.hadoop.mapreduce.Job  // New Hadoop API
scala> import org.apache.hadoop.conf.Configuration
scala> import org.apache.hadoop.io.Text

scala> var conf = new Configuration()
scala> conf.set("solr.server.url", serverUrl)
scala> conf.set("solr.server.mode", serverMode)
scala> conf.set("solr.server.collection", collection)
scala> conf.set("solr.server.fields", fields)
```

Example with MapWritable:

```
scala> val m1 = Map("id" -> "1", "description" -> "apple orange New York", "author" -> "John")
scala> val m2 = Map("id" -> "2", "description" -> "apple peach San Diego", "author" -> "Kevin")
scala> val m3 = Map("id" -> "3", "description" -> "Apple tomato San Francisco", "author" -> "Nick")
scala> val l1 = List(m1, m2, m3)
scala> val rdds1 = sc.parallelize(l1)

scala> val rdds1a = rdds1.map(e => {
     |   val record = new MapWritable()
     |   val id = e.getOrElse("id", "")
     |   val description = e.getOrElse("description", "")
     |   val author = e.getOrElse("author", "")
     |   record.put(new Text("id"), new Text(id))
     |   record.put(new Text("description"), new Text(description))
     |   record.put(new Text("author"), new Text(author))
     |   (NullWritable.get, record)
     | })

// Index with MapWritable
scala> rdds1a.saveAsNewAPIHadoopFile(
     |   "-",  // this path parameter will be ignored
     |   classOf[NullWritable],
     |   classOf[MapWritable],
     |   classOf[SolrOutputFormat],
     |   conf
     | )
```

Example with SolrInputRecord (a wrapper for SolrInputDocument):

```
scala> val m4 = Map("id" -> "4", "description" -> "orange lake Florida", "author" -> "Emily")
scala> val m5 = Map("id" -> "5", "description" -> "cherry forest Vermont", "author" -> "Kate")
scala> val m6 = Map("id" -> "6", "description" -> "strawberry beach California", "author" -> "Monica")
scala> val l2 = List(m4, m5, m6)
scala> val rdds2 = sc.parallelize(l2)

scala> val rdds2a = rdds2.map(e => {
     |   val record = new SolrInputRecord()
     |   val id = e.getOrElse("id", "")
     |   val description = e.getOrElse("description", "")
     |   val author = e.getOrElse("author", "")
     |   record.setField("id", id)
     |   record.setField("description", description)
     |   record.setField("author", author)
     |   (NullWritable.get, record)
     | })

// Index with SolrInputRecord
scala> rdds2a.saveAsNewAPIHadoopFile(
     |   "-",  // this path parameter will be ignored
     |   classOf[NullWritable],
     |   classOf[SolrInputRecord],
     |   classOf[SolrOutputFormat],
     |   conf
     | )
```
### Reading with Old Hadoop API (mapred)
```
scala> import org.apache.hadoop.mapred.JobConf
scala> import org.apache.hadoop.io.NullWritable
scala> import org.bigsolr.hadoop.SolrInputFormat
scala> import org.bigsolr.hadoop.SolrRecord
```

For SolrCloud mode:

```
scala> val serverMode: String = "cloud"
scala> val serverUrl: String = "localhost:9983"
```

For standalone/HttpServer mode:

```
scala> val serverMode: String = "standalone"
scala> val serverUrl: String = "http://localhost:8983/solr"
```

```
scala> val collection: String = "collection1"
scala> val fields: String = "id,description"
scala> val queryStr: String = "description:*"

scala> var conf = new JobConf(sc.hadoopConfiguration)
scala> conf.set("solr.query", queryStr)
scala> conf.set("solr.server.url", serverUrl)
scala> conf.set("solr.server.mode", serverMode)
scala> conf.set("solr.server.collection", collection)
scala> conf.set("solr.server.fields", fields)

scala> val rdds = sc.hadoopRDD(conf, classOf[SolrInputFormat], classOf[NullWritable], classOf[SolrRecord]).map { case (key, value) => value.getSolrDocument() }

scala> rdds.count
scala> rdds.first
scala> rdds.first.getFieldValue("id")
scala> rdds.first.getFieldValue("description")
scala> rdds.first.getFieldValuesMap()
```
### Indexing with Old Hadoop API (mapred)
```
scala> import org.bigsolr.hadoop.SolrOutputFormat
scala> import org.bigsolr.hadoop.SolrInputRecord
scala> import org.apache.hadoop.io.MapWritable
scala> import org.apache.hadoop.io.NullWritable
scala> import org.apache.hadoop.mapred.JobConf  // Old Hadoop API
scala> import org.apache.hadoop.conf.Configuration
scala> import org.apache.hadoop.io.Text

scala> var conf = new JobConf(sc.hadoopConfiguration)
scala> conf.set("solr.server.url", serverUrl)
scala> conf.set("solr.server.mode", serverMode)
scala> conf.set("solr.server.collection", collection)
scala> conf.set("solr.server.fields", fields)
```

Example with MapWritable:

```
scala> val m1 = Map("id" -> "1", "description" -> "apple orange New York", "author" -> "John")
scala> val m2 = Map("id" -> "2", "description" -> "apple peach San Diego", "author" -> "Kevin")
scala> val m3 = Map("id" -> "3", "description" -> "Apple tomato San Francisco", "author" -> "Nick")
scala> val l1 = List(m1, m2, m3)
scala> val rdds1 = sc.parallelize(l1)

scala> val rdds1a = rdds1.map(e => {
     |   val record = new MapWritable()
     |   val id = e.getOrElse("id", "")
     |   val description = e.getOrElse("description", "")
     |   val author = e.getOrElse("author", "")
     |   record.put(new Text("id"), new Text(id))
     |   record.put(new Text("description"), new Text(description))
     |   record.put(new Text("author"), new Text(author))
     |   (NullWritable.get, record)
     | })

// Index with MapWritable
scala> rdds1a.saveAsHadoopFile(
     |   "-",  // this path parameter will be ignored
     |   classOf[NullWritable],
     |   classOf[MapWritable],
     |   classOf[SolrOutputFormat],
     |   conf,
     |   None
     | )
```

Example with SolrInputRecord (a wrapper for SolrInputDocument):

```
scala> val m4 = Map("id" -> "4", "description" -> "orange lake Florida", "author" -> "Emily")
scala> val m5 = Map("id" -> "5", "description" -> "cherry forest Vermont", "author" -> "Kate")
scala> val m6 = Map("id" -> "6", "description" -> "strawberry beach California", "author" -> "Monica")
scala> val l2 = List(m4, m5, m6)
scala> val rdds2 = sc.parallelize(l2)

scala> val rdds2a = rdds2.map(e => {
     |   val record = new SolrInputRecord()
     |   val id = e.getOrElse("id", "")
     |   val description = e.getOrElse("description", "")
     |   val author = e.getOrElse("author", "")
     |   record.setField("id", id)
     |   record.setField("description", description)
     |   record.setField("author", author)
     |   (NullWritable.get, record)
     | })

// Index with SolrInputRecord
scala> rdds2a.saveAsHadoopFile(
     |   "-",  // this path parameter will be ignored
     |   classOf[NullWritable],
     |   classOf[SolrInputRecord],
     |   classOf[SolrOutputFormat],
     |   conf,
     |   None
     | )
```
## SparkSQL
### Scala API
```
$ spark-shell --jars target/bigsolr-0.1.jar
```
```
scala> import org.apache.spark.sql.SQLContext
scala> val sqlContext = new SQLContext(sc)
scala> import org.bigsolr.spark.solr._

scala> val rdds = sqlContext.query("id:*", "http://localhost:8983/solr", "standalone", "collection1", "id,description")
scala> rdds.count
scala> rdds.first
```
### SQL API
```
$ spark-sql --jars target/bigsolr-0.1.jar
```

```
spark-sql> CREATE TEMPORARY TABLE solr
         > USING org.bigsolr.spark
         > OPTIONS (query "id:*", serverUrl "http://localhost:8983/solr", serverMode "standalone", collection "collection1", fields "id,description");

spark-sql> select description from solr;
```
## Using PySpark Shell
*Note: please check which Hadoop version your Spark distribution was built for. For Hadoop 2.3/2.4 or higher, follow the instructions for the New Hadoop API; if your Spark was built for Hadoop 1.x, follow the instructions for the Old Hadoop API.*
```
$ pyspark --jars target/bigsolr-0.1.jar
```
### Reading from Solr with PySpark
For SolrCloud mode:

```
conf = {"solr.server.url": "localhost:9983", "solr.server.mode": "cloud",
        "solr.server.collection": "collection1", "solr.query": "id:*",
        "solr.server.fields": "id,description"}
```

For standalone/HttpServer mode:

```
conf = {"solr.server.url": "http://localhost:8983/solr", "solr.server.mode": "standalone",
        "solr.server.collection": "collection1", "solr.query": "id:*",
        "solr.server.fields": "id,description"}
```

```
rdds = sc.hadoopRDD("org.bigsolr.hadoop.SolrInputFormat",
                    "org.apache.hadoop.io.NullWritable",
                    "org.bigsolr.hadoop.SolrRecord",
                    conf=conf)
rdds.count()

import json

results = rdds.collect()
for r in results:
    print json.loads(r[1])["id"]
    print json.loads(r[1])["description"]
```
### Indexing (Saving) RDDs in Solr with PySpark
For SolrCloud mode:

```
conf = {"solr.server.url": "localhost:9983", "solr.server.mode": "cloud",
        "solr.server.collection": "collection1", "solr.server.fields": "id,description"}
```

For standalone/HttpServer mode:

```
conf = {"solr.server.url": "http://localhost:8983/solr", "solr.server.mode": "standalone",
        "solr.server.collection": "collection1", "solr.server.fields": "id,description"}
```

```
m1 = (None, {"id": "1", "description": "apple orange New York", "author": "John"})
m2 = (None, {"id": "2", "description": "apple peach San Diego", "author": "Kevin"})
data = [m1, m2]
rdds = sc.parallelize(data)

rdds.saveAsHadoopFile("-",
                      "org.bigsolr.hadoop.SolrOutputFormat",
                      "org.apache.hadoop.io.NullWritable",
                      "org.apache.hadoop.io.MapWritable",
                      conf=conf)
```
## License
This software is available under the [Apache License, Version 2.0](LICENSE.txt).
## Reporting Bugs
Please use GitHub issues to report bugs or request features.