
WIP: more tmp changes to avoid out-of-memory
dtebbs committed May 17, 2021
1 parent d8fb489 commit 7aedb51
Showing 2 changed files with 9 additions and 5 deletions.
src/main/java/common/PairRDDAggregator.java (10 changes: 7 additions & 3 deletions)
@@ -3,6 +3,8 @@
 import java.util.ArrayList;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
+
 import scala.Tuple2;
 import scala.collection.JavaConverters;

@@ -57,10 +59,12 @@ void processBatch() {
 final var newBatchRDD = sc.parallelizePairs(currentBatch, numPartitions);

 // To avoid running out of memory, 'checkpoint' the RDD. (The goal is to
-// force it to be fully evaluated and then stored on disk, removing any need
-// to recompute it, since recomputing requires that the original array of
-// batch data must be present in memory somewhere).
+// force it to be fully evaluated (and potentially evicted to disk),
+// removing any need to recompute it, since recomputing requires that the
+// original array of batch data must be present in memory somewhere).
+newBatchRDD.cache();
 newBatchRDD.checkpoint();
+// newBatchRDD.persist(StorageLevel.MEMORY_AND_DISK());

 batches.add(newBatchRDD);
 currentBatch = null;
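The cache-then-checkpoint pattern above hinges on a detail of Spark's checkpointing: checkpoint() runs as a separate job after the first action on the RDD, and unless the RDD is already persisted, that job recomputes it from its lineage, which is exactly the recomputation (and the memory pressure) this commit is trying to avoid. A minimal, self-contained sketch of the pattern follows; the class and variable names are illustrative, not from this repository.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class CheckpointSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("checkpoint-sketch")
            .setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // checkpoint() fails unless a checkpoint directory is set first.
            sc.setCheckpointDir("/tmp/checkpoints");

            JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(
                Arrays.asList(new Tuple2<>(1, "a"), new Tuple2<>(2, "b")));

            // Persist before checkpointing: the checkpoint job can then write
            // the cached partitions instead of recomputing the lineage, which
            // would require the original batch data to still be in memory.
            rdd.cache();
            rdd.checkpoint();

            // The first action materializes and caches the RDD; the checkpoint
            // write happens at the end of that job and truncates the lineage.
            System.out.println(rdd.count());
        }
    }
}

The commented-out persist(StorageLevel.MEMORY_AND_DISK()) line in the diff is the more explicit variant of the same idea: cache() is shorthand for persist(StorageLevel.MEMORY_ONLY()), whereas MEMORY_AND_DISK lets partitions that do not fit in memory spill to local disk instead of being dropped and recomputed.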
src/main/java/prover/Prover.java (4 changes: 2 additions & 2 deletions)
@@ -127,9 +127,9 @@ static JavaSparkContext createSparkContext(boolean local) {
spark.sparkContext().conf().set("spark.files.overwrite", "true");

// checkpoint directory
spark.sparkContext().setCheckpointDir("checkpoint");
spark.sparkContext().setCheckpointDir("hdfs://ip-172-31-42-216:9000/checkpoints/");
// clean checkpoint files if the reference is out of scope
spark.sparkContext().conf().set("spark.cleaner.referenceTracking.cleanCheckpoints", "true");
// spark.sparkContext().conf().set("spark.cleaner.referenceTracking.cleanCheckpoints", "true");

// TODO: reinstate this when it can be made to work
// spark.sparkContext().conf().set(
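A relative checkpoint directory such as "checkpoint" resolves on the driver's local filesystem, which only works when everything runs on one machine; on a cluster the directory must be reachable from every executor, hence the switch to an HDFS URI. The sketch below shows one way the two cases could be distinguished; the HDFS address is the one from the diff, and the method body is a simplified stand-in for the real createSparkContext, not a copy of it.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkContextSketch {
    static JavaSparkContext createSparkContext(boolean local) {
        SparkConf conf = new SparkConf().setAppName("prover-sketch");
        if (local) {
            conf.setMaster("local[*]");
        }
        JavaSparkContext sc = new JavaSparkContext(conf);

        // A cluster-visible checkpoint directory; a bare relative path is
        // only usable in single-machine (local) mode.
        sc.setCheckpointDir(local
            ? "checkpoint"
            : "hdfs://ip-172-31-42-216:9000/checkpoints/");

        // Left disabled, matching the commit: when "true", Spark's
        // ContextCleaner deletes checkpoint files once the RDD reference is
        // garbage-collected; disabling it keeps the checkpointed data on
        // HDFS for the lifetime of the job.
        // sc.getConf().set("spark.cleaner.referenceTracking.cleanCheckpoints", "true");

        return sc;
    }
}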
