WIP: more tmp changes to avoid out-of-memory
dtebbs committed May 10, 2021
1 parent b2d76f0 commit 743a2e7
Showing 2 changed files with 10 additions and 6 deletions.
10 changes: 7 additions & 3 deletions src/main/java/common/PairRDDAggregator.java
@@ -3,6 +3,8 @@
 import java.util.ArrayList;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
+
 import scala.Tuple2;
 import scala.collection.JavaConverters;
 
@@ -57,10 +59,12 @@ void processBatch() {
     final var newBatchRDD = sc.parallelizePairs(currentBatch, numPartitions);
 
     // To avoid running out of memory, 'checkpoint' the RDD. (The goal is to
-    // force it to be fully evaluated and then stored on disk, removing any need
-    // to recompute it, since receomputing requires that the original array of
-    // batch data must be present in memory somewhere).
+    // force it to be fully evaluated (and potentially evicted to disk),
+    // removing any need to recompute it, since recomputing requires that the
+    // original array of batch data must be present in memory somewhere).
+    newBatchRDD.cache();
     newBatchRDD.checkpoint();
+    // newBatchRDD.persist(StorageLevel.MEMORY_AND_DISK());
 
     batches.add(newBatchRDD);
     currentBatch = null;
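
For context, a minimal standalone sketch of the cache-then-checkpoint pattern applied in the hunk above, assuming a Spark context whose checkpoint directory has already been configured. The class, method, and parameter names below are illustrative only and do not appear in the repository; the real logic lives in PairRDDAggregator.processBatch().

// Illustrative sketch only: BatchCheckpointSketch, materializeBatch and the
// parameter names are hypothetical; PairRDDAggregator applies the same calls
// to its own fields.
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

class BatchCheckpointSketch {
    static <K, V> JavaPairRDD<K, V> materializeBatch(
            final JavaSparkContext sc,
            final List<Tuple2<K, V>> batch,
            final int numPartitions) {
        final JavaPairRDD<K, V> rdd = sc.parallelizePairs(batch, numPartitions);
        // Keep the computed partitions around so that writing the checkpoint
        // does not trigger a second evaluation of the lineage.
        rdd.cache();
        // Mark the RDD for checkpointing: on the first action it is written to
        // the configured checkpoint directory and its lineage is truncated, so
        // the driver-side batch list is no longer needed for recomputation.
        rdd.checkpoint();
        // Alternative left commented out in the diff: spill to disk under
        // memory pressure instead of checkpointing.
        // rdd.persist(StorageLevel.MEMORY_AND_DISK());
        return rdd;
    }
}

Note that checkpoint() only marks the RDD; the actual write happens on the first action, and it fails unless setCheckpointDir() has been called on the SparkContext, which is what the Prover.java change below configures.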
6 changes: 3 additions & 3 deletions src/main/java/prover/Prover.java
@@ -104,9 +104,9 @@ static JavaSparkContext createSparkContext(boolean local) {
     spark.sparkContext().conf().set("spark.files.overwrite", "true");
 
     // checkpoint directory
-    spark.sparkContext().setCheckpointDir("checkpoint");
+    spark.sparkContext().setCheckpointDir("hdfs://ip-172-31-42-216:9000/checkpoints/");
     // clean checkpoint files if the reference is out of scope
-    spark.sparkContext().conf().set("spark.cleaner.referenceTracking.cleanCheckpoints", "true");
+    // spark.sparkContext().conf().set("spark.cleaner.referenceTracking.cleanCheckpoints", "true");
 
     // TODO: reinstate this when it can be made to work
     // spark.sparkContext().conf().set(
@@ -226,7 +226,7 @@ void run(
     final int numPartitions = 8;
     final StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
 
-    final int batchSize = 32*1024;
+    final int batchSize = 8*1024;
     final var provingKeyRDD =
         provingKeyReader.readProvingKeyRDD(primaryInputSize, sc, numPartitions, batchSize);
     final var primFullRDD =
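
The checkpoint-related context setup changed above looks roughly like the following sketch. The application name, master URL, and HDFS URI here are placeholders rather than values taken from the repository (the commit itself points at hdfs://ip-172-31-42-216:9000/checkpoints/); only setCheckpointDir and the commented-out cleaner setting correspond to the diff.

// Illustrative sketch only: appName, master URL and the HDFS path are
// placeholders; Prover.createSparkContext() holds the real configuration.
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

class CheckpointConfigSketch {
    static JavaSparkContext createSparkContext(final boolean local) {
        final SparkSession spark = SparkSession.builder()
            .appName("prover-sketch")
            .master(local ? "local[*]" : "yarn")
            .getOrCreate();
        // Checkpointed RDDs are written under this directory. On a cluster the
        // path should live on storage visible to all executors (e.g. HDFS)
        // rather than a node-local directory such as "checkpoint".
        spark.sparkContext().setCheckpointDir("hdfs://namenode:9000/checkpoints/");
        // Optionally let the ContextCleaner delete checkpoint files once the
        // corresponding RDD reference goes out of scope. The diff leaves this
        // commented out, so checkpoint data outlives the RDD references.
        // spark.sparkContext().conf().set(
        //     "spark.cleaner.referenceTracking.cleanCheckpoints", "true");
        return new JavaSparkContext(spark.sparkContext());
    }
}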
