Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for uncompressed files, unit test added. #381

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions app/com/linkedin/drelephant/util/SparkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ trait SparkUtils {
val SPARK_EVENT_LOG_DIR_KEY = "spark.eventLog.dir"
val SPARK_EVENT_LOG_COMPRESS_KEY = "spark.eventLog.compress"
val DFS_HTTP_PORT = 50070
val NO_COMPRESSION_CODEC = "no-compression"

/**
* Returns the webhdfs FileSystem and Path for the configured Spark event log directory and optionally the
Expand Down Expand Up @@ -118,9 +119,13 @@ trait SparkUtils {
(path, codec)
}
case None => {
val (logPath, codecName) = getLogPathAndCodecName(fs, fs.getUri.resolve(basePath.toUri), appId)
val (logPath, codecName) = getLogPathAndCodecName(sparkConf, fs, fs.getUri.resolve(basePath.toUri), appId)

(logPath, Some(compressionCodecMap.getOrElseUpdate(codecName, loadCompressionCodec(sparkConf, codecName))))
if (codecName.equals(NO_COMPRESSION_CODEC)) {
(logPath, None)
} else {
(logPath, Some(compressionCodecMap.getOrElseUpdate(codecName, loadCompressionCodec(sparkConf, codecName))))
}
}
}

Expand Down Expand Up @@ -182,6 +187,7 @@ trait SparkUtils {
private val IN_PROGRESS = ".inprogress"
private val DEFAULT_COMPRESSION_CODEC = "lz4"


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Remove the unnecessary blank lines (applies elsewhere too).

private val compressionCodecClassNamesByShortName = Map(
"lz4" -> classOf[LZ4CompressionCodec].getName,
"lzf" -> classOf[LZFCompressionCodec].getName,
Expand Down Expand Up @@ -239,10 +245,12 @@ trait SparkUtils {
(appId, attempt, extension)
}
private def getLogPathAndCodecName(
sparkConf: SparkConf,
fs: FileSystem,
logBaseDir: URI,
appId: String
): (Path, String) = {
val shouldUseCompression = sparkConf.getBoolean(SPARK_EVENT_LOG_COMPRESS_KEY, defaultValue = false)
val base = logBaseDir.toString.stripSuffix("/");
val filter = new PathFilter() {
override def accept(file: Path): Boolean = {
Expand Down Expand Up @@ -274,6 +282,9 @@ trait SparkUtils {
// if codec is not available, but we found a file match with appId, use the actual file Path from the first match
case nocodec if nocodec._1 != None & nocodec._3 == None => (attemptsList(0).getPath(), DEFAULT_COMPRESSION_CODEC)

case nocompression if nocompression._1 == None & nocompression._2 == None & nocompression._3 == None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please write a comment for this case condition, as has been done for the other case conditions.

& shouldUseCompression == false =>
(new Path(base + "/" + appId), NO_COMPRESSION_CODEC)
// This should be reached only if we can't parse the filename in the path.
// Try to construct a general path in that case.
case _ => (new Path(base + "/" + appId + "." + DEFAULT_COMPRESSION_CODEC), DEFAULT_COMPRESSION_CODEC)
Expand Down
6 changes: 4 additions & 2 deletions app/org/apache/spark/deploy/history/SparkFSFetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,12 @@ class SparkFSFetcher(fetcherConfData: FetcherConfigurationData) extends Elephant
logger.info("The event log of Spark application: " + appId + " is over the limit size of "
+ eventLogSizeLimitMb + " MB, the parsing process gets throttled.")
} else {
logger.info("Replaying Spark logs for application: " + appId +
if (eventLogCodec.nonEmpty) {
logger.info("Replaying Spark logs for application: " + appId +
" withlogPath: " + eventLogPath +
" with codec:" + eventLogCodec)
" with codec:" + eventLogCodec)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit : Extra space

Copy link
Contributor

@pralabhkumar pralabhkumar Dec 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also log when eventLogCodec is empty (i.e., the no-compression case).


}
sparkUtils.withEventLog(eventLogFileSystem, eventLogPath, eventLogCodec) { in =>
dataCollection.load(in, eventLogPath.toString())
}
Expand Down
45 changes: 45 additions & 0 deletions test/com/linkedin/drelephant/util/SparkUtilsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,51 @@ class SparkUtilsTest extends FunSpec with org.scalatest.Matchers with OptionValu
}
}
}

describe(".withEventLogAndNoCompression") {
  it("loans the input stream for the event log with no compression") {
    // Raw (uncompressed) event log content; we expect to read it back verbatim.
    val expectedLog =
      """{"Event":"SparkListenerApplicationStart","App Name":"app","App ID":"application_1","Timestamp":1,"User":"foo"}"""

    // Materialize the log as the raw bytes that the fake filesystem will serve.
    val eventLogBytes = {
      val bout = new ByteArrayOutputStream()
      for {
        in <- resource.managed(new ByteArrayInputStream(expectedLog.getBytes("UTF-8")))
        out <- resource.managed(bout)
      } {
        IOUtils.copy(in, out)
      }
      bout.toByteArray
    }

    val hadoopConfiguration = new Configuration(false)

    // Compression is explicitly disabled, so pathAndCodecforEventLog below
    // should resolve a codec of None (the no-compression path).
    val sparkConf =
      new SparkConf()
        .set("spark.eventLog.dir", "/logs/spark")
        .set("spark.eventLog.compress", "false")

    val sparkUtils = SparkUtilsTest.newFakeSparkUtilsForEventLog(
      new URI("webhdfs://nn1.grid.example.com:50070"),
      new Path("/logs/spark"),
      new Path("application_1"),
      eventLogBytes
    )

    val (fs, basePath) = sparkUtils.fileSystemAndPathForEventLogDir(hadoopConfiguration, sparkConf, None)

    // No type ascriptions on the arguments: the method signature already
    // fixes the parameter types, so ascribing them here is just noise.
    val (path, codec) =
      sparkUtils.pathAndCodecforEventLog(sparkConf, fs, basePath, "application_1", None)

    // With compression off, reading the loaned stream must yield the
    // original log bytes unchanged.
    sparkUtils.withEventLog(fs, path, codec) { in =>
      val bout = new ByteArrayOutputStream()
      IOUtils.copy(in, bout)

      val actualLog = new String(bout.toByteArray, "UTF-8")
      actualLog should be(expectedLog)
    }
  }
}
}
}

Expand Down