Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIVY-359. Cache livy logs as config driven #332

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

praveen-kanamarlapudi
Copy link
Contributor

Changes Made:

  1. Added logs cacheing as config driven.
  2. Fixed adding driver process logs livy logs for local and client mode in interactive mode.

.map(new LineBufferedProcess(_))
// When Livy is running with YARN, SparkYarnApp can provide better YARN integration.
// (e.g. Reflect YARN application state to session state).
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
Copy link
Contributor Author

@praveen-kanamarlapudi praveen-kanamarlapudi May 16, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jerryshao Earlier we were returning None if the master is local (not yarn) so the driver logs didn't go to livy logs.
I think this change should solve the problem.
Let me know if we are ok with this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's what I want.

_lock.lock()
try {
_lines = _lines :+ line
info(s"stdout: $line")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jerryshao In client mode we were seeing spark driver logs in livy log after great delay as the log statement was after the loop. I agree log inside the lock is heavy.

Let me know if there is a better way to handle the scenario.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can move this line out of try finally block.

@codecov-io
Copy link

codecov-io commented May 16, 2017

Codecov Report

Merging #332 into master will increase coverage by 0.06%.
The diff coverage is 76.19%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master     #332      +/-   ##
============================================
+ Coverage     70.42%   70.49%   +0.06%     
- Complexity      726      727       +1     
============================================
  Files            96       96              
  Lines          5123     5124       +1     
  Branches        774      774              
============================================
+ Hits           3608     3612       +4     
+ Misses          997      994       -3     
  Partials        518      518
Impacted Files Coverage Δ Complexity Δ
...scala/com/cloudera/livy/utils/LivySparkUtils.scala 69.86% <100%> (ø) 0 <0> (ø) ⬇️
.../com/cloudera/livy/utils/SparkProcessBuilder.scala 57.14% <100%> (ø) 12 <0> (ø) ⬇️
...n/scala/com/cloudera/livy/utils/SparkProcApp.scala 41.17% <100%> (+2.28%) 5 <1> (ø) ⬇️
...er/src/main/scala/com/cloudera/livy/LivyConf.scala 94.4% <100%> (+0.04%) 16 <0> (ø) ⬇️
...a/livy/server/interactive/InteractiveSession.scala 63.05% <33.33%> (-0.87%) 42 <1> (ø)
.../com/cloudera/livy/utils/LineBufferedProcess.scala 73.33% <66.66%> (ø) 7 <1> (ø) ⬇️
...a/com/cloudera/livy/utils/LineBufferedStream.scala 82.5% <81.81%> (+4.12%) 5 <2> (ø) ⬇️
...la/com/cloudera/livy/scalaapi/ScalaJobHandle.scala 55.88% <0%> (+2.94%) 0% <0%> (ø) ⬇️
...va/com/cloudera/livy/rsc/rpc/KryoMessageCodec.java 98.11% <0%> (+3.77%) 19% <0%> (+1%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 59af39d...2d092ae. Read the comment docs.

@jerryshao
Copy link
Contributor

jerryshao commented May 16, 2017

@praveenkanamarlapudi thanks for bring this out, could you please file a JIRA and update the PR title first.

@jerryshao
Copy link
Contributor

Also please fix the UTs.

.map(new LineBufferedProcess(_))
// When Livy is running with YARN, SparkYarnApp can provide better YARN integration.
// (e.g. Reflect YARN application state to session state).
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's what I want.

@@ -36,17 +38,17 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
private val thread = new Thread {
override def run() = {
val lines = Source.fromInputStream(inputStream).getLines()
for (line <- lines) {
for (line <- lines) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove unnecessary white space.

_lock.lock()
try {
_lines = _lines :+ line
info(s"stdout: $line")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can move this line out of try finally block.

_lock.lock()
try {
_lines = _lines :+ line
info(s"stdout: $line")
if(logSize > 0) _lines add line
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add if check, I assume if logSize is zero then we cannot add line to this queue.

@@ -59,7 +61,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
thread.setDaemon(true)
thread.start()

def lines: IndexedSeq[String] = _lines
def lines: IndexedSeq[String] = IndexedSeq.empty[String] ++ _lines.toArray(Array.empty[String])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will introduce threading issue when copying to new array.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add lock here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so.

@@ -100,7 +100,7 @@ object LivySparkUtils extends Logging {
pb.environment().put("LIVY_TEST_CLASSPATH", sys.props("java.class.path"))
}

val process = new LineBufferedProcess(pb.start())
val process = new LineBufferedProcess(pb.start(), 200)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using configuration instead of hard-coded number.

Copy link
Contributor Author

@praveen-kanamarlapudi praveen-kanamarlapudi May 16, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it runs only when livy server is started. Do we need to make it config driven?
If the number is set too low by mistake, the livy server mayn't start.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I see.

@@ -92,7 +92,8 @@ object SparkApp {
if (livyConf.isRunningOnYarn()) {
new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf)
} else {
require(process.isDefined, "process must not be None when Livy master is not YARN.")
// process is None in recovery mode
// require(process.isDefined, "process must not be None when Livy master is not YARN.")
new SparkProcApp(process.get, listener)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If process can be None, will this process.get work?

override def log(): IndexedSeq[String] = process.inputLines

override def log(): IndexedSeq[String] =
("stdout: " +: process.inputLines) ++ ("\nstderr: " +: process.errorLines)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add empty line after it.

@praveen-kanamarlapudi praveen-kanamarlapudi changed the title Added logs cacheing as config driven. LIVY-359. Cache livy logs as config driven May 16, 2017
@praveen-kanamarlapudi praveen-kanamarlapudi force-pushed the master branch 2 times, most recently from 6c9dba2 to 75d8511 Compare May 16, 2017 22:43
@@ -31,6 +31,9 @@
# If livy should impersonate the requesting users when creating a new session.
# livy.impersonation.enabled = true

# Logs size livy can cache for each session/batch. 0 means don't cache the logs.
# livy.spark.logs.size = 200
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to rename this to "livy.session.cache-log.size".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Let me do that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we name it like livy.cache-log.size or something as it applies to both interactive and batch?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I'm fine with it.

}
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is a bit strange and not so scala-ish. Why not keep the original style.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

driverProcess can be None for session recovery. We need to return None for that case and create SparkApp otherwise. Let me know if there is a better way to do that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Thanks.

Also can we change the style to:

driverProcess.map { _ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)) }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

try {
_lines = _lines :+ line
try {
_lines add line
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the prefix whitespace before try. Also be better to use _lines.add(line) for clearance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad. let me update it.

@praveen-kanamarlapudi
Copy link
Contributor Author

@jerryshao updated PR as per the comments. Please have a look at it.

@@ -90,7 +97,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
}

override def next(): String = {
val line = _lines(index)
val line = _lines.poll()
index += 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This index seems not used any more.

Also I was thinking this poll may has threading issue with add. I'm not sure if EvictingQueue is a thread safe queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. It's not thread safe. Added lock to it.

@jerryshao
Copy link
Contributor

LGTM, thanks for the fix.

@praveen-kanamarlapudi
Copy link
Contributor Author

@jerryshao Are you waiting for any changes from me?

@jerryshao
Copy link
Contributor

No, I don't have further comments, because I don't have merging permission, so it is pending.

CC @zjffdu can you please take a review, thanks!

@praveen-kanamarlapudi
Copy link
Contributor Author

@alex-the-man @zjffdu can you please review the PR.

@jerryshao
Copy link
Contributor

Hi @praveenkanamarlapudi sorry for delay, can you please submit this PR against incubator-livy (https://github.com/apache/incubator-livy/pulls)?

@jerryshao
Copy link
Contributor

Hi @praveenkanamarlapudi would you please submit this PR again in incubator-livy? If you're not working on this, I can help to cherry-pick the changes.

@praveen-kanamarlapudi
Copy link
Contributor Author

Hi @jerryshao I will make the PR tomorrow.

@praveen-kanamarlapudi
Copy link
Contributor Author

Made the PR-14

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants