This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit f9942ff

Merge pull request #12 from intenthq/spark-2

Update to spark 2.0

2 parents 9dba929 + b8bd475

File tree

7 files changed (+28 / -33 lines)

README.md

Lines changed: 6 additions & 6 deletions
````diff
@@ -12,29 +12,29 @@ The following top level dependencies are published in Maven central:
 
 **Thrift support**:
 ```
-"com.intenthq.pucket" %% "pucket-thrift" % "1.2.0"
+"com.intenthq.pucket" %% "pucket-thrift" % "1.3.0"
 ```
 
 **Avro support**:
 ```
-"com.intenthq.pucket" %% "pucket-avro" % "1.2.0"
+"com.intenthq.pucket" %% "pucket-avro" % "1.3.0"
 ```
 
 **Spark connectors**:
 ```
-"com.intenthq.pucket" %% "pucket-spark" % "1.2.0"
+"com.intenthq.pucket" %% "pucket-spark" % "1.3.0"
 ```
 
 **MapReduce integration**:
 ```
-"com.intenthq.pucket" %% "pucket-mapreduce" % "1.2.0"
+"com.intenthq.pucket" %% "pucket-mapreduce" % "1.3.0"
 ```
 
 These dependencies should be combined depending on your usages; for example if you use Thrift and Spark then use the following:
 
 ```
-"com.intenthq.pucket" %% "pucket-thrift" % "1.2.0"
-"com.intenthq.pucket" %% "pucket-spark" % "1.2.0"
+"com.intenthq.pucket" %% "pucket-thrift" % "1.3.0"
+"com.intenthq.pucket" %% "pucket-spark" % "1.3.0"
 ```
````
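For context, the bumped coordinates drop into an sbt build as ordinary dependencies. A minimal sketch combining the Thrift and Spark modules at the new version (the coordinates come from the README above; the `scalaVersion` line is inferred from build.sbt below, everything else is illustrative):

```scala
// Hypothetical build.sbt fragment using the 1.3.0 coordinates from this commit;
// this release targets Scala 2.11 and Spark 2.0.0 (see the build.sbt diff below).
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.intenthq.pucket" %% "pucket-thrift" % "1.3.0",
  "com.intenthq.pucket" %% "pucket-spark"  % "1.3.0"
)
```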

avro/src/main/scala/com/intenthq/pucket/avro/AvroPucketDescriptor.scala

Lines changed: 5 additions & 7 deletions
```diff
@@ -42,26 +42,24 @@ object AvroPucketDescriptor extends PucketDescriptorCompanion {
 
   val avroSchemaKey = "avroSchema"
 
-  private def validate[T <: HigherType](descriptorString: String): Throwable \/ (Map[String, String], Schema, CompressionCodecName, Option[PucketPartitioner[T]]) =
+  private def validate[T <: HigherType](descriptorString: String): Throwable \/ (Schema, CompressionCodecName, Option[PucketPartitioner[T]]) =
     for {
       underlying <- parseDescriptor[T](descriptorString)
       schema <- extractValue(underlying._1, avroSchemaKey)
-      parsedSchema <- parse(schema)
       avroSchema <- \/.fromTryCatchNonFatal(new Schema.Parser().parse(schema))
-    } yield (parsedSchema, avroSchema, underlying._2, underlying._3)
+    } yield (avroSchema, underlying._2, underlying._3)
 
   /** @inheritdoc */
   override def apply[T <: IndexedRecord](expectedSchema: Schema, descriptorString: String): Throwable \/ AvroPucketDescriptor[T] =
     for {
       underlying <- validate[T](descriptorString)
-      expectedSchemaJson <- parse(expectedSchema.toString)
-      _ <- if (underlying._2 == expectedSchema) ().right
+      _ <- if (underlying._1 == expectedSchema) ().right
            else new RuntimeException("Found schema does not match expected").left
-    } yield AvroPucketDescriptor[T](underlying._2, underlying._3, underlying._4)
+    } yield AvroPucketDescriptor[T](underlying._1, underlying._2, underlying._3)
 
   /** @inheritdoc */
   override def apply[T <: HigherType](descriptorString: String): Throwable \/ AvroPucketDescriptor[T] =
-    validate[T](descriptorString).map(underlying => AvroPucketDescriptor[T](underlying._2, underlying._3, underlying._4))
+    validate[T](descriptorString).map(underlying => AvroPucketDescriptor[T](underlying._1, underlying._2, underlying._3))
 }
```
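The dropped `parse(schema)` step means the schema string is no longer round-tripped through the JSON map; only Avro's own parser validates it. A standalone sketch of the wrapping pattern the new code relies on (not the project's API):

```scala
import org.apache.avro.Schema
import scalaz.\/

// \/.fromTryCatchNonFatal turns Avro's throwing parser into a
// Throwable \/ Schema, so a malformed schema string surfaces as -\/.
def parseAvroSchema(raw: String): Throwable \/ Schema =
  \/.fromTryCatchNonFatal(new Schema.Parser().parse(raw))
```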

avro/src/test/scala/com/intenthq/pucket/avro/AvroPucketDescriptorSpec.scala

Lines changed: 2 additions & 6 deletions
```diff
@@ -31,15 +31,11 @@ class AvroPucketDescriptorSpec extends Specification with DisjunctionMatchers wi
 
   def badString =
     Prop.forAll(randomString) { s =>
-      AvroPucketDescriptor[AvroTest](AvroTest.getClassSchema, s) must be_-\/[Throwable].like {
-        case a => a must beAnInstanceOf[jodd.json.JsonException]
-      }
+      AvroPucketDescriptor[AvroTest](AvroTest.getClassSchema, s) must be_-\/[Throwable]
     }
 
   def badSchema =
-    AvroPucketDescriptor[AvroTest](AvroTest.getClassSchema, badSchemaString) must be_-\/.like {
-      case a => a must beAnInstanceOf[jodd.json.JsonException]
-    }
+    AvroPucketDescriptor[AvroTest](AvroTest.getClassSchema, badSchemaString) must be_-\/[Throwable]
 
   def schemaClassIncorrect =
     AvroPucketDescriptor[AvroTest](AvroTest.getClassSchema, incorrectSchema) must be_-\/.like {
```
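With jodd-json removed, the specs can no longer pin failures to `jodd.json.JsonException`; they now assert only that parsing a bad descriptor yields a left disjunction. A minimal standalone sketch of the shape being asserted (hypothetical values, not from the project):

```scala
import scalaz.\/
import scalaz.syntax.either._

// A failed parse is a left disjunction carrying the error; specs2's
// be_-\/[Throwable] matches any such failure regardless of concrete type.
val failed: Throwable \/ String = new RuntimeException("bad descriptor").left
```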

build.sbt

Lines changed: 10 additions & 4 deletions
```diff
@@ -4,8 +4,9 @@ import com.typesafe.sbt.SbtGit.{GitKeys => git}
 
 val specs2Ver = "3.6.4"
 val parquetVer = "1.8.1"
-val hadoopVer = "2.7.1"
-val sparkVer = "1.6.1"
+val hadoopVer = "2.7.3"
+val sparkVer = "2.0.0"
+val circeVersion = "0.5.1"
 
 val pomInfo = (
   <url>https://github.com/intenthq/pucket</url>
@@ -32,7 +33,7 @@ def excludeServlet(deps: Seq[ModuleID]) = deps.map(_.exclude("javax.servlet", "s
 
 lazy val commonSettings = Seq(
   organization := "com.intenthq.pucket",
-  version := "1.2.1",
+  version := "1.3.0",
   scalaVersion := "2.11.8",
   publishArtifact in Test := false,
   pomIncludeRepository := { _ => false },
@@ -48,13 +49,17 @@ lazy val commonSettings = Seq(
   autoAPIMappings := true,
   libraryDependencies ++= excludeServlet(Seq(
     "org.scalaz" %% "scalaz-core" % "7.1.3",
-    "org.jodd" % "jodd-json" % "3.7",
     "org.mortbay.jetty" % "servlet-api" % "3.0.20100224" % "provided",
     "org.apache.hadoop" % "hadoop-common" % hadoopVer % "provided",
     "org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVer % "provided",
     "org.apache.parquet" % "parquet-column" % parquetVer,
     "org.apache.parquet" % "parquet-hadoop" % parquetVer
   )),
+  libraryDependencies ++= Seq(
+    "io.circe" %% "circe-core",
+    "io.circe" %% "circe-generic",
+    "io.circe" %% "circe-parser"
+  ).map(_ % circeVersion),
   dependencyOverrides += "org.slf4j" % "slf4j-log4j12" % "1.7.12",
   dependencyOverrides += "org.slf4j" % "slf4j-api" % "1.7.12",
   resolvers ++= Seq(
@@ -145,6 +150,7 @@ lazy val spark = (project in file("spark")).
       ExclusionRule(organization = "org.slf4j"),
       ExclusionRule(organization = "log4j"),
       ExclusionRule(organization = "org.scala-lang"),
+      ExclusionRule(organization = "org.scalatest"),
       ExclusionRule(organization = "javax.servlet", name = "servlet-api")
     )
   ))
```
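The three circe modules share one version, so the new block appends it once via `map`. It is equivalent to spelling out each `ModuleID` in full:

```scala
// Expanded form of the Seq(...).map(_ % circeVersion) idiom added above.
libraryDependencies ++= Seq(
  "io.circe" %% "circe-core"    % "0.5.1",
  "io.circe" %% "circe-generic" % "0.5.1",
  "io.circe" %% "circe-parser"  % "0.5.1"
)
```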

core/src/main/scala/com/intenthq/pucket/PucketDescriptor.scala

Lines changed: 4 additions & 6 deletions
```diff
@@ -1,12 +1,12 @@
 package com.intenthq.pucket
 
 import com.intenthq.pucket.util.PucketPartitioner
-import jodd.json.{JsonParser, JsonSerializer}
+import io.circe.parser._
+import io.circe.syntax._
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.api.ReadSupport
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 
-import scala.collection.JavaConverters._
 import scalaz.\/
 import scalaz.syntax.either._
 
@@ -16,8 +16,6 @@ import scalaz.syntax.either._
  * @tparam T the type of data being stored in the pucket
  */
 trait PucketDescriptor[T] {
-  val jsonSerializer = new JsonSerializer()
-
   import PucketDescriptor._
   /** Parquet compression codec */
   def compression: CompressionCodecName
@@ -42,7 +40,7 @@ trait PucketDescriptor[T] {
    *
    * @return JSON string of descriptor
    */
-  override def toString: String = jsonSerializer.serialize(json.asJava)
+  override def toString: String = json.asJson.spaces2
 }
 
 /** Trait for implementations of the pucket descriptor companion object */
@@ -108,5 +106,5 @@ object PucketDescriptor {
     fold[Throwable \/ String](new RuntimeException(s"Could not find $key in descriptor").left)(_.right)
 
   def parse(json: String): Throwable \/ Map[String, String] =
-    \/.fromTryCatchNonFatal(new JsonParser().parse(json).asInstanceOf[java.util.Map[String, String]].asScala.toMap)
+    decode[Map[String, String]](json).fold(_.getCause.left, _.right)
 }
```
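This swap replaces jodd-json's untyped `java.util.Map` cast with circe's typed decoding. A standalone sketch of the round-trip the new code performs (not the project's API; the map keys here are hypothetical, and the diff unwraps circe's error type with `.getCause`):

```scala
import io.circe.parser._
import io.circe.syntax._

// Serialise a String-to-String descriptor map to pretty-printed JSON
// and decode it back into a typed Map, failing on malformed input.
val descriptor = Map("compression" -> "SNAPPY", "avroSchema" -> "{}")
val rendered: String = descriptor.asJson.spaces2
val roundTripped = decode[Map[String, String]](rendered) // success or a circe error
```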

spark/src/main/scala/com/intenthq/pucket/spark/PucketSparkAdapter.scala

Lines changed: 0 additions & 1 deletion
```diff
@@ -10,7 +10,6 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 
 import scala.collection.JavaConversions._
-
 import scala.reflect.ClassTag
 /** Provides classes to extend Spark's RDD so that puckets can be read and written with spark */
 object PucketSparkAdapter {
```

thrift/src/test/scala/com/intenthq/pucket/thrift/ThriftPucketDescriptorSpec.scala

Lines changed: 1 addition & 3 deletions
```diff
@@ -30,9 +30,7 @@ class ThriftPucketDescriptorSpec extends Specification with DisjunctionMatchers
 
   def badString =
     Prop.forAll(randomString) { s =>
-      ThriftPucketDescriptor[ThriftTest](classOf[ThriftTest], s) must be_-\/[Throwable].like {
-        case a => a must beAnInstanceOf[jodd.json.JsonException]
-      }
+      ThriftPucketDescriptor[ThriftTest](classOf[ThriftTest], s) must be_-\/[Throwable]
     }
 
   def schemaClassNotFound =
```

0 commit comments
