- Support Hierarchical Data Format, HDF5/NetCDF4 and Rich Parallel I/O Interface in Spark
- Optimize I/O Performance on HPC with Lustre Filesystems Tuning
- Input is HDF5 file(s)
- Output is a RDD object
git clone https://github.com/valiantljk/h5spark.git
Python version:
- export PYTHONPATH=$PYTHONPATH:path_to_h5spark/src/main/python/h5spark
- sbatch spark-python.sh
Scala version:
- export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:path_to_h5spark/lib
- module load sbt
- sbt assembly
- sbatch spark-scala.sh
Add the h5spark path to your python path:
export PYTHONPATH=$PYTHONPATH:path_to_h5spark/src/main/python/h5spark
Then your python codes will be like so:
from pyspark import SparkContext
import os,sys
import h5py
import read
def test_h5sparkReadsingle():
sc = SparkContext(appName="h5sparktest")
rdd=read.h5read(sc,('oceanTemps.h5','temperatures'),mode='single',partitions=100)
rdd.cache()
print "rdd count:",rdd.count()
sc.stop()
if __name__ == '__main__':
test_h5sparkReadsingle()
Current h5spark python read API:
Read single file:
h5read(sc,(file,dataset),mode='single', partitions)
Read multiple files:
Takes in a list of (file, dataset) tuples, one such tuple or the name of a file that contains a list of files and returns rdd with each row as a record
h5read(sc,file_list_or_txt_file,mode='multi', partitions)
Besides, we have the functions to return indexedrow and indexedrowmatrix
h5read_irow
h5read_imat
- export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:your_project_dir/lib
- cp h5spark/target/scala-2.10/h5spark_2.10-1.0.jar your_project_dir/lib/
- cp h5spark/lib/* your_project_dir/lib/
- cp project/assembly.sbt your_project_dir/project/
- sbt assembly
- Then in your codes, you can use it like:
import org.nersc.io._
object readtest {
def main(args: Array[String]): Unit = {
var logger = LoggerFactory.getLogger(getClass)
val sc = new SparkContext()
val rdd = read.h5read_array (sc,"oceanTemps.h5","temperatures", 3000)
rdd.cache()
val count= rdd.count()
logger.info("\nRDD_Count: "+count+" , Total number of rows of all hdf5 files\n")
sc.stop()
}
}
Current h5spark scala read API supports:
val rdd = read.h5read_point (sc, inputpath, variablename, partition) //load n-D data into RDD[(value:Double,key:Long)]
val rdd = read.h5read_array (sc, inputpath, variablename, partition) //load n-D data into RDD[Array[Double]]
val rdd = read.h5read_vec (sc,inputpath, variablename, partition) //Load n-D data into RDD[DenseVector]
val rdd = read.h5read_irow (sc,inputpath, variablename, partition) //Load n-D data into RDD[IndexedRow]
val rdd = read.h5read_imat (sc,inputpath, variablename, partition) //Load n-D data into IndexedRowMatrix
- If you are using NERSC's machine, please feel free to email [email protected]
- If not, you can send your questions to [email protected]
J.L. Liu, E. Racah, Q. Koziol, R. S. Canon, A. Gittens, L. Gerhardt, S. Byna, M. F. Ringenburg, Prabhat. "H5Spark: Bridging the I/O Gap between Spark and Scientific Data Formats on HPC Systems", Cray User Group, 2016, (Paper, Slides, Bib)
Alex Gittens, Aditya Devarakonda, Evan Racah, Michael Ringenburg, Lisa Gerhardt, Jey Kottalam, Jialin Liu, Kristyn Maschhoff, Shane Canon, Jatin Chhugani, Pramod Sharma, Jiyan Yang, James Demmel, Jim Harrell, Venkat Krishnamurthy, Michael W Mahoney. "Matrix Factorizations at Scale: A Comparison of Scientific Data Analytics in Spark and C+ MPI using Three Case Studies". IEEE BigData 2016. Paper.
- Tested at full scale on Cori phase 1, with 1600 nodes, 51200 cores. H5Spark took 2 minutes to load 16 TBs HDF5 2D data
- H5Spark takes 35 seconds in loading 2 TB data, while MPI uses 15 seconds.
- LLNL: Spark-HDF5
- NASA: SciSpark