
Commit 0a235cf

Try and fix a lot of style issues, we still have a ways to go but it should be slightly nicer to read
1 parent 7c7a91f commit 0a235cf

27 files changed: +446 / -251 lines

scalastyle-config.xml

Lines changed: 2 additions & 2 deletions
@@ -67,8 +67,8 @@
   </check>
   <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"></check>
   <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"></check>
-  <check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check>
-  <check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check>
+  <check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="false"></check>
+  <check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="false"></check>
   <check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check>
   <check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
   <check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check>

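For context, a hypothetical sketch (not part of this commit) of the kind of code the two checks switched off here would flag: ReturnChecker warns on an explicit return and NullChecker warns on any use of null. Later hunks in this same commit still clean up such spots by hand, dropping return keywords and replacing a null check with Option(...).

// Both warnings would fire on this illustrative method.
def firstWord(s: String): String = {
  if (s == null) return ""   // NullChecker and ReturnChecker
  s.split(" ").headOption.getOrElse("")
}

// A rewrite that neither check complains about.
def firstWordSafe(s: String): String =
  Option(s).flatMap(_.split(" ").headOption).getOrElse("")
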
src/main/1.3-only/scala/com/holdenkarau/spark/testing/LocalSparkContext.scala

Lines changed: 9 additions & 6 deletions
@@ -23,8 +23,12 @@ import org.scalatest.BeforeAndAfterAll
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.Suite
 
-/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */
-trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite =>
+/**
+ * Manages a local `sc` {@link SparkContext} variable,
+ * correctly stopping it after each test.
+ */
+trait LocalSparkContext extends BeforeAndAfterEach
+  with BeforeAndAfterAll { self: Suite =>
 
   @transient var sc: SparkContext = _
 
@@ -43,10 +47,9 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
 
 object LocalSparkContext {
   def stop(sc: SparkContext) {
-    if (sc != null) {
-      sc.stop()
-    }
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    Option(sc).foreach(_.stop())
+    // To avoid Akka rebinding to the same port, since it doesn't
+    // unbind immediately on shutdown.
     System.clearProperty("spark.driver.port")
   }

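The Option(sc).foreach(_.stop()) line above is the usual null-safe guard in Scala: Option(x) is None when x is null, so the body only runs for a live context. A minimal stand-alone sketch of the idiom (the Resource class is illustrative, not from this commit):

class Resource { def close(): Unit = println("closed") }

def shutDown(r: Resource): Unit =
  Option(r).foreach(_.close())   // no-op when r is null

shutDown(new Resource)   // prints "closed"
shutDown(null)           // does nothing, no NullPointerException
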
src/main/1.3/scala/com/holdenkarau/spark/testing/DataframeGenerator.scala

Lines changed: 56 additions & 30 deletions
@@ -16,32 +16,42 @@ object DataframeGenerator {
    * @param minPartitions minimum number of partitions, defaults to 1.
    * @return Arbitrary DataFrames generator of the required schema.
    */
-  def arbitraryDataFrame(sqlContext: SQLContext, schema: StructType, minPartitions: Int = 1): Arbitrary[DataFrame] = {
+  def arbitraryDataFrame(
+    sqlContext: SQLContext, schema: StructType, minPartitions: Int = 1):
+    Arbitrary[DataFrame] = {
     arbitraryDataFrameWithCustomFields(sqlContext, schema, minPartitions)()
   }
 
   /**
-   * Creates a DataFrame Generator for the given Schema, and the given custom generators.
-   * custom generators should be in the form of (column index, generator function).
+   * Creates a DataFrame Generator for the given Schema, and the given custom
+   * generators.
+   * Custom generators should be specified as a list of:
+   * (column index, generator function) tuples.
    *
    * Note: The given custom generators should match the required schema,
    * for ex. you can't use Int generator for StringType.
    *
-   * Note 2: The ColumnGenerator* accepted as userGenerators has changed. ColumnGenerator is now the base class of the
-   * accepted generators, users upgrading to 0.6 need to change their calls to use Column. Futher explanation can be
-   * found in the release notes, and in the class descriptions at the bottom of this file.
+   * Note 2: The ColumnGenerator* accepted as userGenerators has changed.
+   * ColumnGenerator is now the base class of the
+   * accepted generators, users upgrading to 0.6 need to change their calls
+   * to use Column. Further explanation can be found in the release notes, and
+   * in the class descriptions at the bottom of this file.
    *
    * @param sqlContext SQL Context.
    * @param schema The required Schema.
    * @param minPartitions minimum number of partitions, defaults to 1.
-   * @param userGenerators custom user generators in the form of (column index, generator function).
-   *                       column index starts from 0 to length - 1
+   * @param userGenerators custom user generators in the form of:
+   *                       (column index, generator function).
+   *                       where column index starts from 0 to length - 1
    * @return Arbitrary DataFrames generator of the required schema.
    */
-  def arbitraryDataFrameWithCustomFields(sqlContext: SQLContext, schema: StructType, minPartitions: Int = 1)
-    (userGenerators: ColumnGenerator*): Arbitrary[DataFrame] = {
+  def arbitraryDataFrameWithCustomFields(
+    sqlContext: SQLContext, schema: StructType, minPartitions: Int = 1)
+    (userGenerators: ColumnGenerator*): Arbitrary[DataFrame] = {
 
-    val arbitraryRDDs = RDDGenerator.genRDD(sqlContext.sparkContext, minPartitions)(getRowGenerator(schema, userGenerators))
+    val arbitraryRDDs = RDDGenerator.genRDD(
+      sqlContext.sparkContext, minPartitions)(
+      getRowGenerator(schema, userGenerators))
     Arbitrary {
       arbitraryRDDs.map(sqlContext.createDataFrame(_, schema))
     }
@@ -60,22 +70,31 @@ object DataframeGenerator {
   /**
    * Creates row generator for the required schema and with user's custom generators.
    *
-   * Note: Custom generators should match the required schema, for ex. you can't use Int generator for StringType.
+   * Note: Custom generators should match the required schema, for example
+   * you can't use Int generator for StringType.
    *
    * @param schema the required Row's schema.
-   * @param customGenerators user custom generator, this is useful if the user want to
-   *                         Control specific columns generation.
+   * @param customGenerators user custom generator, this is useful if the you want
+   *                         to control specific columns generation.
    * @return Gen[Row]
    */
-  def getRowGenerator(schema: StructType, customGenerators: Seq[ColumnGenerator]): Gen[Row] = {
-    val generators: List[Gen[Any]] = createGenerators(schema.fields, customGenerators)
-    val listGen: Gen[List[Any]] = Gen.sequence[List[Any], Any](generators)
-    val generator: Gen[Row] = listGen.map(list => Row.fromSeq(list))
+  def getRowGenerator(
+    schema: StructType, customGenerators: Seq[ColumnGenerator]): Gen[Row] = {
+    val generators: List[Gen[Any]] =
+      createGenerators(schema.fields, customGenerators)
+    val listGen: Gen[List[Any]] =
+      Gen.sequence[List[Any], Any](generators)
+    val generator: Gen[Row] =
+      listGen.map(list => Row.fromSeq(list))
     generator
   }
 
-  private def createGenerators(fields: Array[StructField], userGenerators: Seq[ColumnGenerator]): List[Gen[Any]] = {
-    val generatorMap = userGenerators.map(generator => (generator.columnName -> generator)).toMap
+  private def createGenerators(
+    fields: Array[StructField],
+    userGenerators: Seq[ColumnGenerator]):
+    List[Gen[Any]] = {
+    val generatorMap = userGenerators.map(
+      generator => (generator.columnName -> generator)).toMap
     (0 until fields.length).toList.map(index => {
       if (generatorMap.contains(fields(index).name)) {
         generatorMap.get(fields(index).name).get match {
@@ -87,7 +106,8 @@ object DataframeGenerator {
     })
   }
 
-  private def getGenerator(dataType: DataType, generators: Seq[ColumnGenerator] = Seq()): Gen[Any] = {
+  private def getGenerator(
+    dataType: DataType, generators: Seq[ColumnGenerator] = Seq()): Gen[Any] = {
     dataType match {
       case ByteType => Arbitrary.arbitrary[Byte]
       case ShortType => Arbitrary.arbitrary[Short]
@@ -102,7 +122,7 @@ object DataframeGenerator {
       case DateType => Arbitrary.arbLong.arbitrary.map(new Date(_))
       case arr: ArrayType => {
         val elementGenerator = getGenerator(arr.elementType)
-        return Gen.listOf(elementGenerator)
+        Gen.listOf(elementGenerator)
       }
       case map: MapType => {
         val keyGenerator = getGenerator(map.keyType)
@@ -112,31 +132,37 @@ object DataframeGenerator {
         value <- valueGenerator
       } yield (key, value)
 
-        return Gen.mapOf(keyValueGenerator)
+        Gen.mapOf(keyValueGenerator)
       }
-      case row: StructType => return getRowGenerator(row, generators)
-      case _ => throw new UnsupportedOperationException(s"Type: $dataType not supported")
+      case row: StructType => getRowGenerator(row, generators)
+      case _ => throw new UnsupportedOperationException(
+        s"Type: $dataType not supported")
     }
   }
 
 }
 
 /**
- * Previously ColumnGenerator. Allows the user to specify a generator for a specific column
+ * Previously ColumnGenerator. Allows the user to specify a generator for a
+ * specific column.
  */
-class Column(val columnName: String, generator: => Gen[Any]) extends ColumnGenerator {
+class Column(val columnName: String, generator: => Gen[Any])
+  extends ColumnGenerator {
   lazy val gen = generator
 }
 
 /**
- * ColumnList allows users to specify custom generators for a list of columns inside a StructType column
+ * ColumnList allows users to specify custom generators for a list of
+ * columns inside a StructType column.
  */
-class ColumnList(val columnName: String, generators: => Seq[ColumnGenerator]) extends ColumnGenerator {
+class ColumnList(val columnName: String, generators: => Seq[ColumnGenerator])
+  extends ColumnGenerator {
   lazy val gen = generators
 }
 
 /**
- * ColumnGenerator - prevously Column; it is now the base class for all ColumnGenerators
+ * ColumnGenerator - prevously Column; it is now the base class for all
+ * ColumnGenerators.
  */
 abstract class ColumnGenerator extends java.io.Serializable {
   val columnName: String

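A hedged usage sketch of the generator API reformatted above, based only on the signatures visible in this diff. The schema and the ScalaCheck property are illustrative, and a SQLContext named sqlContext is assumed to be in scope (for example from DataFrameSuiteBase), with ScalaCheck on the classpath:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.scalacheck.Prop.forAll

val schema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType)))

// sqlContext: SQLContext is assumed to be provided by the surrounding test suite.
val dataFrameGen = DataframeGenerator.arbitraryDataFrame(sqlContext, schema)

// Every generated DataFrame should carry the requested schema.
val property = forAll(dataFrameGen.arbitrary) { df =>
  df.schema == schema
}
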
src/main/1.3/scala/com/holdenkarau/spark/testing/JavaDataFrameSuiteBase.scala

Lines changed: 2 additions & 1 deletion
@@ -1,6 +1,7 @@
 package com.holdenkarau.spark.testing
 
-class JavaDataFrameSuiteBase extends SharedJavaSparkContext with DataFrameSuiteBaseLike with JavaTestSuite {
+class JavaDataFrameSuiteBase extends
+  SharedJavaSparkContext with DataFrameSuiteBaseLike with JavaTestSuite {
 
   override def beforeAllTestCasesHook() {
     sqlBeforeAllTestCases()

src/main/1.3/scala/com/holdenkarau/spark/testing/JavaRDDComparisons.scala

Lines changed: 9 additions & 5 deletions
@@ -16,7 +16,8 @@ object JavaRDDComparisons extends RDDComparisonsLike with JavaTestSuite {
    * Compare two RDDs. If they are equal returns None, otherwise
    * returns Some with the first mismatch. Assumes we have the same partitioner.
    */
-  def compareRDDWithOrder[T](expected: JavaRDD[T], result: JavaRDD[T]): Option[(Option[T], Option[T])] = {
+  def compareRDDWithOrder[T](expected: JavaRDD[T], result: JavaRDD[T]):
+    Option[(Option[T], Option[T])] = {
     implicit val ctag = Utils.fakeClassTag[T]
     compareRDDWithOrder(expected.rdd, result.rdd)
   }
@@ -33,13 +34,16 @@ object JavaRDDComparisons extends RDDComparisonsLike with JavaTestSuite {
    * Compare two RDDs where we do not require the order to be equal.
    * If they are equal returns None, otherwise returns Some with the first mismatch.
    *
-   * @return None if the two RDDs are equal, or Some That contains first mismatch information.
-   * Mismatch information will be Tuple3 of: (key, number of times this key occur in expected RDD,
+   * @return None if the two RDDs are equal, or Some that contains the first
+   * mismatch information. Mismatch information will be Tuple3 of:
+   * (key, number of times this key occur in expected RDD,
    * number of times this key occur in result RDD)
    */
-  def compareRDD[T](expected: JavaRDD[T], result: JavaRDD[T]): Option[(T, Integer, Integer)] = {
+  def compareRDD[T](expected: JavaRDD[T], result: JavaRDD[T]):
+    Option[(T, Integer, Integer)] = {
     implicit val ctag = Utils.fakeClassTag[T]
-    compareRDD(expected.rdd, result.rdd).map(x => (x._1, Integer.valueOf(x._2), Integer.valueOf(x._3)))
+    compareRDD(expected.rdd, result.rdd).
+      map(x => (x._1, Integer.valueOf(x._2), Integer.valueOf(x._3)))
   }
 
 }

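Based on the compareRDD signature above, a small hedged sketch of how a mismatch is reported; the data is illustrative and a JavaSparkContext named jsc is assumed to be in scope (for example from SharedJavaSparkContext):

import java.util.Arrays
import org.apache.spark.api.java.JavaRDD

val expected: JavaRDD[String] = jsc.parallelize(Arrays.asList("a", "a", "b"))
val result: JavaRDD[String] = jsc.parallelize(Arrays.asList("a", "b", "b"))

// None means the RDDs match ignoring order; Some((key, expectedCount, resultCount))
// describes the first element whose counts differ.
JavaRDDComparisons.compareRDD(expected, result) match {
  case None => println("RDDs are equal")
  case Some((key, expectedCount, resultCount)) =>
    println(s"$key appears $expectedCount times in expected but $resultCount in result")
}
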
src/main/1.3/scala/com/holdenkarau/spark/testing/JavaStreamingSuitebase.scala

Lines changed: 36 additions & 18 deletions
@@ -18,7 +18,8 @@ package com.holdenkarau.spark.testing
 
 import java.util.{List => JList}
 
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.spark.api.java.function.{
+  Function => JFunction, Function2 => JFunction2}
 import org.apache.spark.streaming.api.java._
 import org.apache.spark.streaming.dstream.DStream
 import org.junit.Assert._
@@ -29,11 +30,13 @@ import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 /**
- * This is the base trait for Spark Streaming testsuite. This provides basic functionality
- * to run user-defined set of input on user-defined stream operations, and verify the output.
+ * This is the base trait for Spark Streaming testsuite. This provides basic
+ * functionality to run user-defined set of input on user-defined stream operations,
+ * and verify the output matches as expected.
+ *
  * This implementation is designed to work with JUnit for java users.
 *
- * Note: this always uses the manual clock
+ * Note: this always uses the manual clock to control Spark Streaming's batches.
 */
 class JavaStreamingSuiteBase extends JavaSuiteBase with StreamingSuiteCommon {

@@ -67,7 +70,9 @@ class JavaStreamingSuiteBase extends JavaSuiteBase with StreamingSuiteCommon {
         compareArrays[V](expectedOutput(i).toArray, output(i).toArray)
       } else {
         implicit val config = Bag.configuration.compact[V]
-        compareArrays[V](Bag(expectedOutput(i): _*).toArray, Bag(output(i): _*).toArray)
+        compareArrays[V](
+          Bag(expectedOutput(i): _*).toArray,
+          Bag(output(i): _*).toArray)
       }
     }

@@ -77,7 +82,8 @@ class JavaStreamingSuiteBase extends JavaSuiteBase with StreamingSuiteCommon {
   /**
    * Test unary DStream operation with a list of inputs, with number of
    * batches to run same as the number of input values.
-   * You can simulate the input batch as a List of values or as null to simulate empty batch.
+   *
+   * Each input micro-batch is a list of values or as null to simulate empty batch.
    *
    * @param input Sequence of input collections
    * @param operation Binary DStream operation to be applied to the 2 inputs
@@ -93,7 +99,8 @@ class JavaStreamingSuiteBase extends JavaSuiteBase with StreamingSuiteCommon {
   /**
    * Test unary DStream operation with a list of inputs, with number of
    * batches to run same as the number of input values.
-   * You can simulate the input batch as a List of values or as null to simulate empty batch.
+   *
+   * Each input micro-batch is a list of values or as null to simulate empty batch.
    *
    * @param input Sequence of input collections
    * @param operation Binary DStream operation to be applied to the 2 inputs
@@ -120,18 +127,23 @@ class JavaStreamingSuiteBase extends JavaSuiteBase with StreamingSuiteCommon {
       operation.call(new JavaDStream[U](input)).dstream
     }
 
-    withOutputAndStreamingContext(setupStreams[U, V](sInput, wrappedOperation)) {
+    withOutputAndStreamingContext(
+      setupStreams[U, V](sInput, wrappedOperation)) {
+
       (outputStream, ssc) =>
-        val output: Seq[Seq[V]] = runStreams[V](outputStream, ssc, numBatches, expectedOutput.size)
-        verifyOutput[V](output, sExpectedOutput, ordered)
+        val output: Seq[Seq[V]] =
+          runStreams[V](outputStream, ssc, numBatches, expectedOutput.size)
+        verifyOutput[V](output, sExpectedOutput, ordered)
     }
   }
 
 
   /**
    * Test binary DStream operation with two lists of inputs, with number of
-   * batches to run same as the number of input values. The size of the two input lists Should be the same.
-   * You can simulate the input batch as a List of values or as null to simulate empty batch.
+   * batches to run same as the number of input values. The size of the two input
+   * lists should be equal.
+   *
+   * Each input micro-batch is a list of values or as null to simulate empty batch.
    *
    * @param input1 First sequence of input collections
    * @param input2 Second sequence of input collections
@@ -148,8 +160,10 @@ class JavaStreamingSuiteBase extends JavaSuiteBase with StreamingSuiteCommon {
 
   /**
    * Test binary DStream operation with two lists of inputs, with number of
-   * batches to run same as the number of input values. The size of the two input lists Should be the same.
-   * You can simulate the input batch as a List of values or as null to simulate empty batch.
+   * batches to run same as the number of input values. The size of the two input
+   * lists should be equal.
+   *
+   * Each input micro-batch is a list of values or as null to simulate empty batch.
    *
    * @param input1 First sequence of input collections
    * @param input2 Second sequence of input collections
@@ -166,7 +180,8 @@ class JavaStreamingSuiteBase extends JavaSuiteBase with StreamingSuiteCommon {
       expectedOutput: JList[JList[W]],
       ordered: Boolean): Unit = {
 
-    assertEquals("Length of the input lists are not equal", input1.length, input2.length)
+    assertEquals("Length of the input lists are not equal",
+      input1.length, input2.length)
     val numBatches = input1.size
 
     implicit val ctagU = Utils.fakeClassTag[U]
@@ -181,10 +196,13 @@ class JavaStreamingSuiteBase extends JavaSuiteBase with StreamingSuiteCommon {
       operation.call(new JavaDStream[U](input1), new JavaDStream[V](input2)).dstream
     }
 
-    withOutputAndStreamingContext(setupStreams[U, V, W](sInput1, sInput2, wrappedOperation)) {
+    withOutputAndStreamingContext(
+      setupStreams[U, V, W](sInput1, sInput2, wrappedOperation)) {
+
       (outputStream, ssc) =>
-        val output = runStreams[W](outputStream, ssc, numBatches, expectedOutput.size)
-        verifyOutput[W](output, sExpectedOutput, ordered)
+        val output = runStreams[W](
+          outputStream, ssc, numBatches, expectedOutput.size)
+        verifyOutput[W](output, sExpectedOutput, ordered)
     }
   }

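The ordered flag threaded through verifyOutput above switches between positional comparison and the Bag (multiset) comparison shown in the first hunk of this file. A stand-alone sketch of the unordered semantics, written without the Bag dependency purely for illustration:

// Two batches are equal ignoring order when every element occurs the same
// number of times in both.
def counts[T](xs: Seq[T]): Map[T, Int] =
  xs.groupBy(identity).map { case (k, v) => k -> v.size }

def sameIgnoringOrder[T](expected: Seq[T], actual: Seq[T]): Boolean =
  counts(expected) == counts(actual)

sameIgnoringOrder(Seq(1, 2, 2), Seq(2, 1, 2))   // true
sameIgnoringOrder(Seq(1, 2), Seq(2, 2))         // false
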
src/main/1.3/scala/com/holdenkarau/spark/testing/PerTestSparkContext.scala

Lines changed: 5 additions & 2 deletions
@@ -23,8 +23,11 @@ import org.scalatest.BeforeAndAfterAll
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.Suite
 
-/** Provides a local `sc` {@link SparkContext} variable, correctly stopping it after each test.
-  * The stopping logic is provided in {@link LocalSparkContext} */
+/**
+ * Provides a local `sc`
+ * {@link SparkContext} variable, correctly stopping it after each test.
+ * The stopping logic is provided in {@link LocalSparkContext}.
+ */
 trait PerTestSparkContext extends LocalSparkContext with BeforeAndAfterEach
   with SparkContextProvider { self: Suite =>

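A hedged usage sketch of the trait documented above: a ScalaTest suite mixing in PerTestSparkContext gets a fresh sc for each test, stopped automatically afterwards. This assumes the mixed-in SparkContextProvider supplies a default local configuration; if your version requires an explicit SparkConf, override conf accordingly.

import org.scalatest.FunSuite

class WordCountTest extends FunSuite with PerTestSparkContext {
  test("counts words with a per-test SparkContext") {
    val counts = sc.parallelize(Seq("a", "b", "a")).countByValue()
    assert(counts("a") == 2L)
  }
}
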