Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support uncompressed event logs #357

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions app/com/linkedin/drelephant/util/SparkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,11 @@ trait SparkUtils {
}
case None => {
val (logPath, codecName) = getLogPathAndCodecName(fs, fs.getUri.resolve(basePath.toUri), appId)

(logPath, Some(compressionCodecMap.getOrElseUpdate(codecName, loadCompressionCodec(sparkConf, codecName))))
if(codecName == UNCOMPRESSED){
(logPath, None)
} else {
(logPath, Some(compressionCodecMap.getOrElseUpdate(codecName, loadCompressionCodec(sparkConf, codecName))))
}
}
}

Expand Down Expand Up @@ -181,6 +184,7 @@ trait SparkUtils {

// Filename suffix constant; its use is not visible in this excerpt —
// presumably it marks event logs of applications that are still running (TODO confirm).
private val IN_PROGRESS = ".inprogress"
// Short name of the default compression codec; "lz4" is also a key of
// compressionCodecClassNamesByShortName below.
private val DEFAULT_COMPRESSION_CODEC = "lz4"
// Sentinel codec name for event logs that carry no compression codec: when the
// resolved codec name equals this value, no codec is loaded and None is returned
// in place of a codec.
private val UNCOMPRESSED = "uncompressed"

private val compressionCodecClassNamesByShortName = Map(
"lz4" -> classOf[LZ4CompressionCodec].getName,
Expand Down Expand Up @@ -227,14 +231,14 @@ trait SparkUtils {
val nameAndExtension = logPath.split('.')
if( nameAndExtension.length == 2 ) {
extension = Some(nameAndExtension(1))
val name = nameAndExtension(0)
val appIdAndAttempt = name.split('_')
if( appIdAndAttempt.length == 4 ) {
attempt = Some(appIdAndAttempt(3))
appId = Some(appIdAndAttempt.dropRight(1).mkString("_"))
} else {
appId = Some(name)
}
}
val name = nameAndExtension(0)
val appIdAndAttempt = name.split('_')
if (appIdAndAttempt.length == 4) {
attempt = Some(appIdAndAttempt(3))
appId = Some(appIdAndAttempt.dropRight(1).mkString("_"))
} else {
appId = Some(name)
}
(appId, attempt, extension)
}
Expand Down Expand Up @@ -272,7 +276,7 @@ trait SparkUtils {
"_" + sanitize(finalAttempt._2.get) +
"." + finalAttempt._3.get), finalAttempt._3.get)
// If no codec is available but we found a file matching the appId, use the actual file path from the first match.
case nocodec if nocodec._1 != None & nocodec._3 == None => (attemptsList(0).getPath(), DEFAULT_COMPRESSION_CODEC)
case nocodec if nocodec._1 != None & nocodec._3 == None => (attemptsList(0).getPath(), UNCOMPRESSED)

// This should be reached only if we can't parse the filename in the path.
// Try to construct a general path in that case.
Expand Down