Commit 576c43f

[SPARK-22087][SPARK-14650][WIP][BUILD][REPL][CORE] Compile Spark REPL for Scala 2.12 + other 2.12 fixes
## What changes were proposed in this pull request?

Enable Scala 2.12 REPL. Fix most remaining issues with 2.12 compilation and warnings, including:

- Selecting Kafka 0.10.1+ for Scala 2.12 and patching over a minor API difference
- Fixing lots of "eta expansion of zero arg method deprecated" warnings
- Resolving the SparkContext.sequenceFile implicits compile problem
- Fixing an odd but valid jetty-server missing dependency in hive-thriftserver

## How was this patch tested?

Existing tests

Author: Sean Owen <[email protected]>

Closes apache#19307 from srowen/Scala212.
1 parent 4943ea5 commit 576c43f

48 files changed, +316 -112 lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 36 additions & 0 deletions
@@ -2826,6 +2826,42 @@ object WritableConverter {
   // them automatically. However, we still keep the old functions in SparkContext for backward
   // compatibility and forward to the following functions directly.
 
+  // The following implicit declarations have been added on top of the very similar ones
+  // below in order to enable compatibility with Scala 2.12. Scala 2.12 deprecates eta
+  // expansion of zero-arg methods and thus won't match a no-arg method where it expects
+  // an implicit that is a function of no args.
+
+  implicit val intWritableConverterFn: () => WritableConverter[Int] =
+    () => simpleWritableConverter[Int, IntWritable](_.get)
+
+  implicit val longWritableConverterFn: () => WritableConverter[Long] =
+    () => simpleWritableConverter[Long, LongWritable](_.get)
+
+  implicit val doubleWritableConverterFn: () => WritableConverter[Double] =
+    () => simpleWritableConverter[Double, DoubleWritable](_.get)
+
+  implicit val floatWritableConverterFn: () => WritableConverter[Float] =
+    () => simpleWritableConverter[Float, FloatWritable](_.get)
+
+  implicit val booleanWritableConverterFn: () => WritableConverter[Boolean] =
+    () => simpleWritableConverter[Boolean, BooleanWritable](_.get)
+
+  implicit val bytesWritableConverterFn: () => WritableConverter[Array[Byte]] = {
+    () => simpleWritableConverter[Array[Byte], BytesWritable] { bw =>
+      // getBytes method returns array which is longer then data to be returned
+      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
+    }
+  }
+
+  implicit val stringWritableConverterFn: () => WritableConverter[String] =
+    () => simpleWritableConverter[String, Text](_.toString)
+
+  implicit def writableWritableConverterFn[T <: Writable : ClassTag]: () => WritableConverter[T] =
+    () => new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
+
+  // These implicits remain included for backwards-compatibility. They fulfill the
+  // same role as those above.
+
   implicit def intWritableConverter(): WritableConverter[Int] =
     simpleWritableConverter[Int, IntWritable](_.get)
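The comment in the hunk above captures the core Scala 2.12 issue this commit works around: a zero-argument implicit method is no longer eta-expanded to satisfy an implicit parameter of a Function0 type. The standalone sketch below reproduces that pattern with made-up names (Converter, parse, intConverterFn); it is not Spark code, only a minimal illustration of the idiom adopted above.

object EtaExpansionSketch {
  final case class Converter[T](convert: String => T)

  // Callers want the converter built lazily, hence the Function0 implicit parameter.
  def parse[T](raw: String)(implicit makeConverter: () => Converter[T]): T =
    makeConverter().convert(raw)

  // Pre-2.12 style relied on eta expansion of a zero-arg implicit method:
  //   implicit def intConverter(): Converter[Int] = Converter(_.toInt)
  // 2.12-friendly style: the implicit is already a function of no args.
  implicit val intConverterFn: () => Converter[Int] = () => Converter(_.toInt)

  def main(args: Array[String]): Unit =
    println(parse[Int]("42"))   // prints 42
}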

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 5 additions & 3 deletions
@@ -218,11 +218,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     if (!conf.contains("spark.testing")) {
       // A task that periodically checks for event log updates on disk.
       logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")
-      pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
+      pool.scheduleWithFixedDelay(
+        getRunner(() => checkForLogs()), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
 
       if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
         // A task that periodically cleans event logs on disk.
-        pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
+        pool.scheduleWithFixedDelay(
+          getRunner(() => cleanLogs()), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
       }
     } else {
       logDebug("Background update thread disabled for testing")
@@ -268,7 +270,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         appListener.adminAclsGroups.getOrElse("")
       ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups)
       ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
-      Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)))
+      Some(LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize)))
     } else {
       None
     }

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 6 additions & 8 deletions
@@ -450,10 +450,9 @@ private[deploy] class Worker(
           }
         }(cleanupThreadExecutor)
 
-        cleanupFuture.onFailure {
-          case e: Throwable =>
-            logError("App dir cleanup failed: " + e.getMessage, e)
-        }(cleanupThreadExecutor)
+        cleanupFuture.failed.foreach(e =>
+          logError("App dir cleanup failed: " + e.getMessage, e)
+        )(cleanupThreadExecutor)
 
     case MasterChanged(masterRef, masterWebUiUrl) =>
       logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
@@ -622,10 +621,9 @@ private[deploy] class Worker(
           dirList.foreach { dir =>
            Utils.deleteRecursively(new File(dir))
          }
-        }(cleanupThreadExecutor).onFailure {
-          case e: Throwable =>
-            logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
-        }(cleanupThreadExecutor)
+        }(cleanupThreadExecutor).failed.foreach(e =>
+          logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
+        )(cleanupThreadExecutor)
       }
       shuffleService.applicationRemoved(id)
   }
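This hunk shows the recurring migration in this commit from Future.onFailure, which takes a PartialFunction and is deprecated in Scala 2.12, to future.failed.foreach. A minimal standalone sketch of the pattern follows; the failing task and the println stand in for Worker's cleanupFuture and logError and are illustrative only, not Spark code.

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object OnFailureSketch {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executor)

    // A cleanup task that is expected to fail, standing in for the Worker's cleanupFuture.
    val cleanupFuture: Future[Unit] = Future(sys.error("disk unavailable"))

    // 2.11-era form, deprecated in 2.12:
    //   cleanupFuture.onFailure { case e: Throwable => ... }(ec)
    // 2.12-friendly equivalent: failed yields a Future[Throwable] handled with foreach.
    cleanupFuture.failed.foreach(e =>
      println("App dir cleanup failed: " + e.getMessage)
    )(ec)

    Thread.sleep(500)   // crude wait so the callback runs before shutdown
    executor.shutdown()
  }
}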

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 2 additions & 2 deletions
@@ -163,9 +163,9 @@ private[spark] class CoarseGrainedExecutorBackend(
     if (notifyDriver && driver.nonEmpty) {
       driver.get.ask[Boolean](
         RemoveExecutor(executorId, new ExecutorLossReason(reason))
-      ).onFailure { case e =>
+      ).failed.foreach(e =>
         logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
-      }(ThreadUtils.sameThread)
+      )(ThreadUtils.sameThread)
     }
 
     System.exit(code)

core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala

Lines changed: 1 addition & 1 deletion
@@ -143,7 +143,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
     }
 
     executionPool.acquireMemory(
-      numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
+      numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize)
   }
 
   override def acquireStorageMemory(
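This is the call-site flavor of the same eta-expansion cleanup: passing a zero-arg method where a () => T parameter is expected triggers the 2.12 deprecation warning about eta expansion of zero-argument methods, so the lambda is spelled out. A sketch with hypothetical names (acquire, computeMaxPoolSize), not the real MemoryManager API:

object ZeroArgEtaSketch {
  // Stand-in for acquireMemory's maxPoolSize parameter.
  def acquire(numBytes: Long, maxPoolSize: () => Long): Long =
    math.min(numBytes, maxPoolSize())

  // Empty-parens method, like computeMaxExecutionPoolSize in the hunk above.
  def computeMaxPoolSize(): Long = 1024L

  def main(args: Array[String]): Unit = {
    // acquire(2048L, computeMaxPoolSize) still compiles but warns under 2.12;
    // the explicit lambda keeps the same behavior without the warning.
    println(acquire(2048L, () => computeMaxPoolSize()))   // prints 1024
  }
}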

core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala

Lines changed: 3 additions & 3 deletions
@@ -128,9 +128,9 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
     }
     // Compute the minimum and the maximum
     val (max: Double, min: Double) = self.mapPartitions { items =>
-      Iterator(items.foldRight(Double.NegativeInfinity,
-        Double.PositiveInfinity)((e: Double, x: (Double, Double)) =>
-        (x._1.max(e), x._2.min(e))))
+      Iterator(
+        items.foldRight((Double.NegativeInfinity, Double.PositiveInfinity)
+        )((e: Double, x: (Double, Double)) => (x._1.max(e), x._2.min(e))))
     }.reduce { (maxmin1, maxmin2) =>
       (maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2))
     }
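The old code passed two values where foldRight expects a single initial accumulator and relied on the compiler adapting the argument list into a tuple, which newer compilers warn about; the fix writes the tuple explicitly. A standalone sketch of the same fold over a plain List (illustrative values, no RDDs):

object FoldRightTupleSketch {
  def main(args: Array[String]): Unit = {
    val items = List(3.0, -1.5, 7.25)

    // Warned form (argument list auto-tupled into the initial value):
    //   items.foldRight(Double.NegativeInfinity, Double.PositiveInfinity)(...)
    // Explicit tuple, clean under 2.12:
    val (max, min) = items.foldRight((Double.NegativeInfinity, Double.PositiveInfinity)) {
      (e, acc) => (acc._1.max(e), acc._2.min(e))
    }
    println(s"max=$max min=$min")   // max=7.25 min=-1.5
  }
}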

core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala

Lines changed: 1 addition & 1 deletion
@@ -232,7 +232,7 @@ private[netty] class NettyRpcEnv(
         onFailure,
         (client, response) => onSuccess(deserialize[Any](client, response)))
       postToOutbox(message.receiver, rpcMessage)
-      promise.future.onFailure {
+      promise.future.failed.foreach {
         case _: TimeoutException => rpcMessage.onTimeout()
         case _ =>
       }(ThreadUtils.sameThread)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 7 additions & 7 deletions
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.annotation.tailrec
 import scala.collection.Map
-import scala.collection.mutable.{HashMap, HashSet, Stack}
+import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
 import scala.concurrent.duration._
 import scala.language.existentials
 import scala.language.postfixOps
@@ -396,12 +396,12 @@ class DAGScheduler(
 
   /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
   private def getMissingAncestorShuffleDependencies(
-      rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
-    val ancestors = new Stack[ShuffleDependency[_, _, _]]
+      rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
+    val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
-    val waitingForVisit = new Stack[RDD[_]]
+    val waitingForVisit = new ArrayStack[RDD[_]]
     waitingForVisit.push(rdd)
     while (waitingForVisit.nonEmpty) {
       val toVisit = waitingForVisit.pop()
@@ -434,7 +434,7 @@ class DAGScheduler(
       rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
     val parents = new HashSet[ShuffleDependency[_, _, _]]
     val visited = new HashSet[RDD[_]]
-    val waitingForVisit = new Stack[RDD[_]]
+    val waitingForVisit = new ArrayStack[RDD[_]]
     waitingForVisit.push(rdd)
     while (waitingForVisit.nonEmpty) {
       val toVisit = waitingForVisit.pop()
@@ -456,7 +456,7 @@ class DAGScheduler(
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
-    val waitingForVisit = new Stack[RDD[_]]
+    val waitingForVisit = new ArrayStack[RDD[_]]
     def visit(rdd: RDD[_]) {
       if (!visited(rdd)) {
         visited += rdd
@@ -1633,7 +1633,7 @@ class DAGScheduler(
     val visitedRdds = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
-    val waitingForVisit = new Stack[RDD[_]]
+    val waitingForVisit = new ArrayStack[RDD[_]]
     def visit(rdd: RDD[_]) {
       if (!visitedRdds(rdd)) {
         visitedRdds += rdd
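ArrayStack supports the same push/pop/nonEmpty calls these DAGScheduler traversals rely on, so the swap is a drop-in change at each call site. A toy sketch of the traversal shape (made-up graph, not Spark's RDD lineage walk):

import scala.collection.mutable.{ArrayStack, HashSet}

object ArrayStackSketch {
  def main(args: Array[String]): Unit = {
    // A small dependency graph standing in for RDD lineage.
    val edges = Map(1 -> Seq(2, 3), 2 -> Seq(4), 3 -> Seq.empty[Int], 4 -> Seq.empty[Int])
    val visited = new HashSet[Int]
    // Explicit stack instead of recursion, mirroring the comment in the hunk above.
    val waitingForVisit = new ArrayStack[Int]

    waitingForVisit.push(1)
    while (waitingForVisit.nonEmpty) {
      val node = waitingForVisit.pop()
      if (!visited(node)) {
        visited += node
        edges.getOrElse(node, Seq.empty).foreach(waitingForVisit.push)
      }
    }
    println(visited.toSeq.sorted.mkString(", "))   // 1, 2, 3, 4
  }
}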

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 4 additions & 6 deletions
@@ -471,15 +471,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    */
   protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
     // Only log the failure since we don't care about the result.
-    driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure {
-      case t => logError(t.getMessage, t)
-    }(ThreadUtils.sameThread)
+    driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).failed.foreach(t =>
+      logError(t.getMessage, t))(ThreadUtils.sameThread)
   }
 
   protected def removeWorker(workerId: String, host: String, message: String): Unit = {
-    driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).onFailure {
-      case t => logError(t.getMessage, t)
-    }(ThreadUtils.sameThread)
+    driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).failed.foreach(t =>
+      logError(t.getMessage, t))(ThreadUtils.sameThread)
   }
 
   def sufficientResourcesRegistered(): Boolean = true

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 2 additions & 2 deletions
@@ -501,8 +501,8 @@ private class JavaIterableWrapperSerializer
 private object JavaIterableWrapperSerializer extends Logging {
   // The class returned by JavaConverters.asJava
   // (scala.collection.convert.Wrappers$IterableWrapper).
-  val wrapperClass =
-    scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1)).getClass
+  import scala.collection.JavaConverters._
+  val wrapperClass = Seq(1).asJava.getClass
 
   // Get the underlying method so we can use it to get the Scala collection for serialization.
   private val underlyingMethodOpt = {
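This hunk swaps the scala.collection.convert.WrapAsJava call, which Scala 2.12 flags as deprecated, for the JavaConverters decorator: asJava wraps the Scala collection in a Java wrapper, and the wrapper's runtime class is what the serializer keys on. A minimal standalone sketch of the idiom; the exact wrapper class name printed depends on the Scala version and collection type.

import scala.collection.JavaConverters._

object AsJavaSketch {
  def main(args: Array[String]): Unit = {
    // Same expression as the new wrapperClass initializer above.
    val wrapperClass = Seq(1).asJava.getClass
    println(wrapperClass.getName)   // a scala.collection.convert.Wrappers$... wrapper class
  }
}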
