-
Notifications
You must be signed in to change notification settings - Fork 201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix classpath hashing #1832
base: main
Are you sure you want to change the base?
fix classpath hashing #1832
Conversation
kpodsiad
commented
Oct 5, 2022
•
edited
Loading
edited
- remove previous mechanism which tried to share work between concurrent requests - it was causing too much mental overhead and was error prone. I'd say that gains from it were pretty negligibly small - if the same jar was being hashed at the same time workload was shared - I'd say that it's very rare situation and it doesn't cost that much. After computation results are cached so subsequent requests will use cache.
- uncomment already existing tests & fix them
- remove unused code
def testAsyncT(name: String, maxDuration: Duration)( | ||
run: => Task[Unit] | ||
): Unit = { | ||
test(name) { | ||
Await.result(run.runAsync(ExecutionContext.scheduler), maxDuration) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new utility method which accepts task - no more Thread.sleep and Await.Result in tests. Logic can be composed via Task's map/flatMap and we can have Await.Result only at the end.
/** Creates an empty workspace where operations described as Tasks can happen. */ | ||
def withinWorkspaceT[T](fun: AbsolutePath => Task[T]): Task[T] = for { | ||
temp <- Task(Files.createTempDirectory("bloop-test-workspace").toRealPath()) | ||
tempDirectory = AbsolutePath(temp) | ||
result <- fun(tempDirectory).doOnFinish(_ => Task(delete(tempDirectory))) | ||
} yield result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar to the withinWorkspaceT
but creating and deleting directory is bounded to Task's lifetime
456bf89
to
54160c6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work! In general looks good, I have only one question.
Also, we need to ask @tgodzik to run performance tests.
It's the first occurrence of Task.parSequnceN
on a large amount of task - I have some doubts about it's implementation 😸
Running them right now! |
Performance test finished successfully. Benchmarks is based on merging with master |
Gimme Gimme Gimme results from perf tests after midnight. |
It seems to have run correctly but I don't see any results yet, I think there is some delay there :/ |
Ok, I don't see any regressions, so we should be good. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some very minor comments, but I am wondering how much was the deduplication of hashing needed.
@Duhemm do you have any idea what kind of gains this might offer? Would you be able to test the difference on a larger codebase?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I brought back workload sharing. Implementation is almost pure (updating concurrent hashmap isn't) and should be more straightforward to understand.
|
||
Task.gatherUnordered(classpath.map(readJar(_))) | ||
object ClasspathHasher { | ||
final val global: ClasspathHasher = new ClasspathHasher |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was impossible to test ClasspathHasher
because tests were sharing global cache. I turned it into class and create global
instance because:
- I didn't want to touch already existing code which was referring to
ClasspathHasher
as an object. Now it's usingClasspathHasher.global
which is almost the same - in tests, each testcase is creating a fresh instance of
ClasspathHasher
- no pollution
private sealed trait FileHashingResult { | ||
def task: Task[FileHash] | ||
} | ||
private final case class Computed(task: Task[FileHash]) extends FileHashingResult | ||
private final case class FromPromise(task: Task[FileHash]) extends FileHashingResult |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is scaladoc good enough to explain why this is needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the description you wrote:
remove previous mechanism which tried to share work between concurrent requests - it was causing too much mental overhead and was error prone.
should we not have only computed since we don't want to depend on another task?
I must have messed up cancelation somehow :/ |
Looks like the tests are not finishing now 🤔 |
|
@tgodzik @dos65 you guys probably want to unsubscribe from his PR for while, I can push a lot of trash commits here to debug what's going on.
This is actually ok I think, every hash request for file is adding/removing or just waiting for promise to complete. Logging is placed at the very end of task, when all task were resolved so it means that all promises should be completed & removed too. I'm a bit lost here currently as locally there is no such deadlock like there is on CI. |
Hi! Thanks a lot for looking into this @kpodsiad ! I did some testing on a small subset of our build, and it looks like there's a small performance regression for us:
Some info about the Bloop projects that are being built:
|
* remove previous mechanism which tried to share work between concurrent requests - it was causing too much mental overhead and was error prone * uncomment already existing tests & fix them * remove unused code
def parSequenceN[A](n: Int)(in: Iterable[Task[A]]): Task[Vector[A]] = { | ||
if (in.isEmpty) { | ||
Task.now(Vector.empty) | ||
} else { | ||
// val isCancelled = new AtomicBoolean(false) | ||
Task.defer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
previously, parSequenceN
impl was fragile to big tasks halting other tasks in chunk from progressing:
val chunks = in.grouped(n).toList.map(group => Task.parSequence(group))
Task.sequence(chunks).map(_.flatten)
I ported Monix implementation of parSequenceN
which starts N workers that are constantly polling for the next job whenever they finish its task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for coming back to this! It looks good, though I have some questions and comments.
@@ -593,6 +593,7 @@ abstract class BspBaseSuite extends BaseSuite with BspClientTest { | |||
// https://github.com/scalacenter/bloop/issues/281 | |||
super.ignore(name, "DISABLED")(fun) | |||
} else { | |||
pprint.log(name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove?
@@ -159,7 +160,10 @@ object ResultsCache { | |||
logger: Logger | |||
): ResultsCache = { | |||
val handle = loadAsync(build, cwd, cleanOrphanedInternalDirs, logger) | |||
Await.result(handle.runAsync(ExecutionContext.ioScheduler), Duration.Inf) | |||
Await.result( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this is causing timeouts for DAP tests, any reason for the change?
} | ||
} | ||
|
||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the commented out method?
@@ -533,6 +533,16 @@ abstract class BaseSuite extends TestSuite with BloopHelpers { | |||
) | |||
} | |||
|
|||
@nowarn("msg=parameter value run|maxDuration in method ignore is never used") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this not removed to avoid too many changes?
private sealed trait FileHashingResult { | ||
def task: Task[FileHash] | ||
} | ||
private final case class Computed(task: Task[FileHash]) extends FileHashingResult | ||
private final case class FromPromise(task: Task[FileHash]) extends FileHashingResult |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the description you wrote:
remove previous mechanism which tried to share work between concurrent requests - it was causing too much mental overhead and was error prone.
should we not have only computed since we don't want to depend on another task?