-
Notifications
You must be signed in to change notification settings - Fork 856
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for un compressed files, unit test added. #381
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,6 +42,7 @@ trait SparkUtils { | |
val SPARK_EVENT_LOG_DIR_KEY = "spark.eventLog.dir" | ||
val SPARK_EVENT_LOG_COMPRESS_KEY = "spark.eventLog.compress" | ||
val DFS_HTTP_PORT = 50070 | ||
val NO_COMPRESSION_CODEC = "no-compression" | ||
|
||
/** | ||
* Returns the webhdfs FileSystem and Path for the configured Spark event log directory and optionally the | ||
|
@@ -118,9 +119,13 @@ trait SparkUtils { | |
(path, codec) | ||
} | ||
case None => { | ||
val (logPath, codecName) = getLogPathAndCodecName(fs, fs.getUri.resolve(basePath.toUri), appId) | ||
val (logPath, codecName) = getLogPathAndCodecName(sparkConf, fs, fs.getUri.resolve(basePath.toUri), appId) | ||
|
||
(logPath, Some(compressionCodecMap.getOrElseUpdate(codecName, loadCompressionCodec(sparkConf, codecName)))) | ||
if (codecName.equals(NO_COMPRESSION_CODEC)) { | ||
(logPath, None) | ||
} else { | ||
(logPath, Some(compressionCodecMap.getOrElseUpdate(codecName, loadCompressionCodec(sparkConf, codecName)))) | ||
} | ||
} | ||
} | ||
|
||
|
@@ -182,6 +187,7 @@ trait SparkUtils { | |
private val IN_PROGRESS = ".inprogress" | ||
private val DEFAULT_COMPRESSION_CODEC = "lz4" | ||
|
||
|
||
private val compressionCodecClassNamesByShortName = Map( | ||
"lz4" -> classOf[LZ4CompressionCodec].getName, | ||
"lzf" -> classOf[LZFCompressionCodec].getName, | ||
|
@@ -239,10 +245,12 @@ trait SparkUtils { | |
(appId, attempt, extension) | ||
} | ||
private def getLogPathAndCodecName( | ||
sparkConf: SparkConf, | ||
fs: FileSystem, | ||
logBaseDir: URI, | ||
appId: String | ||
): (Path, String) = { | ||
val shouldUseCompression = sparkConf.getBoolean(SPARK_EVENT_LOG_COMPRESS_KEY, defaultValue = false) | ||
val base = logBaseDir.toString.stripSuffix("/"); | ||
val filter = new PathFilter() { | ||
override def accept(file: Path): Boolean = { | ||
|
@@ -274,6 +282,9 @@ trait SparkUtils { | |
// if codec is not available, but we found a file match with appId, use the actual file Path from the first match | ||
case nocodec if nocodec._1 != None & nocodec._3 == None => (attemptsList(0).getPath(), DEFAULT_COMPRESSION_CODEC) | ||
|
||
case nocompression if nocompression._1 == None & nocompression._2 == None & nocompression._3 == None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please write the coment for this case condition , as have written in another case conditions |
||
& shouldUseCompression == false => | ||
(new Path(base + "/" + appId), NO_COMPRESSION_CODEC) | ||
// This should be reached only if we can't parse the filename in the path. | ||
// Try to construct a general path in that case. | ||
case _ => (new Path(base + "/" + appId + "." + DEFAULT_COMPRESSION_CODEC), DEFAULT_COMPRESSION_CODEC) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -97,10 +97,12 @@ class SparkFSFetcher(fetcherConfData: FetcherConfigurationData) extends Elephant | |
logger.info("The event log of Spark application: " + appId + " is over the limit size of " | ||
+ eventLogSizeLimitMb + " MB, the parsing process gets throttled.") | ||
} else { | ||
logger.info("Replaying Spark logs for application: " + appId + | ||
if (eventLogCodec.nonEmpty) { | ||
logger.info("Replaying Spark logs for application: " + appId + | ||
" withlogPath: " + eventLogPath + | ||
" with codec:" + eventLogCodec) | ||
" with codec:" + eventLogCodec) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit : Extra space There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please log also in case eventLogCodec empty (as with no-compression case) |
||
|
||
} | ||
sparkUtils.withEventLog(eventLogFileSystem, eventLogPath, eventLogCodec) { in => | ||
dataCollection.load(in, eventLogPath.toString()) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit : Remove unnecessary lines (apply elsewhere)