diff --git a/nebula-algorithm/src/main/resources/application.conf b/nebula-algorithm/src/main/resources/application.conf index e01c0fe..0fec307 100644 --- a/nebula-algorithm/src/main/resources/application.conf +++ b/nebula-algorithm/src/main/resources/application.conf @@ -11,14 +11,47 @@ } data: { - # data source. optional of nebula,nebula-ngql,csv,json + # data source. optional of nebula,nebula-ngql,csv,json,hive source: csv - # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text + # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text,hive sink: csv # if your algorithm needs weight hasWeight: false } + # Hive related config + hive: { + #[Optional] spark and hive require configuration on different clusters. Read and write connect hive with this metastore + metaStoreUris: "thrift://hive-metastore-server:9083" + # algo's data source from hive + read: { + #spark sql + sql: "select column_1,column_2,column_3 from database_01.table_01 " + #[Optional] graph source vid mapping with column of sql result. + srcId: "column_1" + #[Optional] graph dest vid mapping with column of sql result + dstId: "column_2" + #[Optional] graph weight mapping with column of sql result + weight: "column_3" + } + + # algo result sink into hive + write: { + #save result to hive table + dbTableName: "database_02.table_02" + #[Optional] spark dataframe save mode,optional of Append,Overwrite,ErrorIfExists,Ignore. Default is Overwrite + saveMode: "Overwrite" + #[Optional] if auto create hive table. Default is true + autoCreateTable: true + #[Optional] algorithm result mapping with hive table column name. Default same with column name of algo result dataframe + resultTableColumnMapping: { + # Note: Different algorithms have different output fields, so let's take the pagerank algorithm for example: + _id: "column_1" + pagerank: "pagerank_value" + } + } + } + # NebulaGraph related config nebula: { # algo's data source from Nebula. If data.source is nebula, then this nebula.read config can be valid. @@ -78,7 +111,7 @@ # the algorithm that you are going to execute,pick one from [pagerank, louvain, connectedcomponent, # labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount, # betweenness, graphtriangleCount, clusteringcoefficient, bfs, hanp, closeness, jaccard, node2vec] - executeAlgo: graphtrianglecount + executeAlgo: pagerank # PageRank parameter pagerank: { diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala index 0b2fc54..974f88b 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala @@ -55,7 +55,7 @@ object Main { val algoTime = System.currentTimeMillis() // writer - saveAlgoResult(algoResult, configs) + saveAlgoResult(sparkConfig.spark, algoResult, configs) val endTime = System.currentTimeMillis() sparkConfig.spark.stop() @@ -149,8 +149,8 @@ object Main { } } - private[this] def saveAlgoResult(algoResult: DataFrame, configs: Configs): Unit = { + private[this] def saveAlgoResult(spark: SparkSession, algoResult: DataFrame, configs: Configs): Unit = { val writer = AlgoWriter.make(configs) - writer.write(algoResult, configs) + writer.write(spark, algoResult, configs) } } diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala index dc906d6..6222bc6 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala @@ -12,6 +12,7 @@ import org.apache.log4j.Logger import scala.collection.JavaConverters._ import com.typesafe.config.{Config, ConfigFactory} import com.vesoft.nebula.algorithm.config.Configs.readConfig +import com.vesoft.nebula.algorithm.config.Configs.getOrElse import scala.collection.mutable @@ -129,6 +130,51 @@ object LocalConfigEntry { } } + +object HiveConfigEntry { + def apply(config: Config): HiveConfigEntry = { + //uri of hive metastore. eg: thrift://127.0.0.1:9083 + val hiveMetaStoreUris: String = getOrElse(config, "hive.metaStoreUris", "") + val readConfigEntry = buildReadConfig(config) + val writeConfigEntry = buildWriteConfig(config) + HiveConfigEntry(hiveMetaStoreUris,readConfigEntry, writeConfigEntry) + } + + def buildReadConfig(config: Config): HiveReadConfigEntry = { + //source data of spark sql + val sql: String = getOrElse(config, "hive.read.sql", "") + //the source vertex ID is mapped with the SQL result column name + val srcIdCol: String = getOrElse(config, "hive.read.srcId", "") + //the dest vertex ID is mapped with the SQL result column name + val dstIdCol: String = getOrElse(config, "hive.read.dstId", "") + //the weight is mapped with the SQL result column name + val weightCol: String = getOrElse(config, "hive.read.weight", "") + HiveReadConfigEntry(sql, srcIdCol, dstIdCol, weightCol) + } + + def buildWriteConfig(config: Config): HiveWriteConfigEntry = { + //algo result save to hive table + val dbTableName: String = getOrElse(config, "hive.write.dbTableName", "") + //save mode of spark + val saveMode: String = getOrElse(config, "hive.write.saveMode", "") + //Whether the table is automatically created + val autoCreateTable: Boolean = getOrElse(config, "hive.write.autoCreateTable", true) + //algo results dataframe column and hive table column mapping relationships + val resultColumnMapping = mutable.Map[String, String]() + val mappingKey = "hive.write.resultTableColumnMapping" + if (config.hasPath(mappingKey)) { + val mappingConfig = config.getObject(mappingKey) + for (subkey <- mappingConfig.unwrapped().keySet().asScala) { + val key = s"${mappingKey}.${subkey}" + val value = config.getString(key) + resultColumnMapping += subkey -> value + } + } + HiveWriteConfigEntry(dbTableName, saveMode, autoCreateTable, resultColumnMapping) + } + +} + /** * SparkConfigEntry support key-value pairs for spark session. * @@ -173,6 +219,34 @@ case class LocalConfigEntry(filePath: String, } } +case class HiveConfigEntry(hiveMetaStoreUris: String, + hiveReadConfigEntry: HiveReadConfigEntry, + hiveWriteConfigEntry: HiveWriteConfigEntry) { + override def toString: String = { + s"HiveConfigEntry: {hiveMetaStoreUris:$hiveMetaStoreUris, read: $hiveReadConfigEntry, write: $hiveWriteConfigEntry}" + } +} + +case class HiveReadConfigEntry(sql: String, + srcIdCol: String = "srcId", + dstIdCol: String = "dstId", + weightCol: String) { + override def toString: String = { + s"HiveReadConfigEntry: {sql: $sql, srcIdCol: $srcIdCol, dstIdCol: $dstIdCol, " + + s"weightCol:$weightCol}" + } +} + +case class HiveWriteConfigEntry(dbTableName: String, + saveMode: String, + autoCreateTable: Boolean, + resultColumnMapping: mutable.Map[String, String]) { + override def toString: String = { + s"HiveWriteConfigEntry: {dbTableName: $dbTableName, saveMode=$saveMode, " + + s"autoCreateTable=$autoCreateTable, resultColumnMapping=$resultColumnMapping}" + } +} + /** * NebulaConfigEntry * @param readConfigEntry config for nebula-spark-connector reader @@ -218,6 +292,7 @@ case class Configs(sparkConfig: SparkConfigEntry, dataSourceSinkEntry: DataSourceSinkEntry, nebulaConfig: NebulaConfigEntry, localConfigEntry: LocalConfigEntry, + hiveConfigEntry: HiveConfigEntry, algorithmConfig: AlgorithmConfigEntry) object Configs { @@ -237,10 +312,11 @@ object Configs { val dataSourceEntry = DataSourceSinkEntry(config) val localConfigEntry = LocalConfigEntry(config) val nebulaConfigEntry = NebulaConfigEntry(config) - val sparkEntry = SparkConfigEntry(config) - val algorithmEntry = AlgorithmConfigEntry(config) + val hiveConfigEntry = HiveConfigEntry(config) + val sparkEntry = SparkConfigEntry(config) + val algorithmEntry = AlgorithmConfigEntry(config) - Configs(sparkEntry, dataSourceEntry, nebulaConfigEntry, localConfigEntry, algorithmEntry) + Configs(sparkEntry, dataSourceEntry, nebulaConfigEntry, localConfigEntry, hiveConfigEntry, algorithmEntry) } /** @@ -277,15 +353,15 @@ object Configs { } /** - * Get the value from config by the path. If the path not exist, return the default value. - * - * @param config The config. - * @param path The path of the config. - * @param defaultValue The default value for the path. - * - * @return - */ - private[this] def getOrElse[T](config: Config, path: String, defaultValue: T): T = { + * Get the value from config by the path. If the path not exist, return the default value. + * + * @param config The config. + * @param path The path of the config. + * @param defaultValue The default value for the path. + * + * @return + */ + def getOrElse[T](config: Config, path: String, defaultValue: T): T = { if (config.hasPath(path)) { config.getAnyRef(path).asInstanceOf[T] } else { diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala index 7c863be..86c68b4 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala @@ -5,6 +5,8 @@ package com.vesoft.nebula.algorithm.config +import com.vesoft.nebula.algorithm.reader.ReaderType +import com.vesoft.nebula.algorithm.writer.WriterType import org.apache.spark.sql.SparkSession case class SparkConfig(spark: SparkSession, partitionNum: Int) @@ -20,12 +22,29 @@ object SparkConfig { sparkConfigs.foreach { case (key, value) => session.config(key, value) } + + // set hive config + setHiveConfig(session, configs) + val partitionNum = sparkConfigs.getOrElse("spark.app.partitionNum", "0") val spark = session.getOrCreate() validate(spark.version, "2.4.*") SparkConfig(spark, partitionNum.toInt) } + private def setHiveConfig(session: org.apache.spark.sql.SparkSession.Builder, configs: Configs): Unit = { + val dataSource = configs.dataSourceSinkEntry + if (dataSource.source.equals(ReaderType.hive.stringify) + || dataSource.sink.equals(WriterType.hive.stringify)) { + session.enableHiveSupport() + val uris = configs.hiveConfigEntry.hiveMetaStoreUris + if (uris != null && uris.trim.nonEmpty) { + session.config("hive.metastore.schema.verification", false) + session.config("hive.metastore.uris", uris) + } + } + } + private def validate(sparkVersion: String, supportedVersions: String*): Unit = { if (sparkVersion != "UNKNOWN" && !supportedVersions.exists(sparkVersion.matches)) { throw new RuntimeException( diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala index e11d868..872e834 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala @@ -25,6 +25,7 @@ object DataReader { case ReaderType.nebulaNgql => new NebulaNgqlReader case ReaderType.nebula => new NebulaReader case ReaderType.csv => new CsvReader + case ReaderType.hive => new HiveReader } .getOrElse(throw new UnsupportedOperationException("unsupported reader")) } @@ -179,3 +180,30 @@ final class JsonReader extends DataReader { data } } +final class HiveReader extends DataReader { + + override val tpe: ReaderType = ReaderType.hive + override def read(spark: SparkSession, configs: Configs, partitionNum: Int): DataFrame = { + val readConfig = configs.hiveConfigEntry.hiveReadConfigEntry + val sql = readConfig.sql + val srcIdCol = readConfig.srcIdCol + val dstIdCol = readConfig.dstIdCol + val weightCol = readConfig.weightCol + + var data = spark.sql(sql) + + if (srcIdCol != null && dstIdCol != null && srcIdCol.trim.nonEmpty && dstIdCol.trim.nonEmpty) { + if (configs.dataSourceSinkEntry.hasWeight && weightCol != null && weightCol.trim.nonEmpty) { + data = data.select(srcIdCol, dstIdCol, weightCol) + } else { + data = data.select(srcIdCol, dstIdCol) + } + } + + if (partitionNum != 0) { + data.repartition(partitionNum) + } + + data + } +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/ReaderType.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/ReaderType.scala index ca1d101..12fc054 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/ReaderType.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/ReaderType.scala @@ -17,6 +17,7 @@ sealed trait ReaderType { case ReaderType.nebulaNgql => "nebula-ngql" case ReaderType.nebula => "nebula" case ReaderType.csv => "csv" + case ReaderType.hive => "hive" } } object ReaderType { @@ -24,10 +25,12 @@ object ReaderType { json.stringify -> json, nebulaNgql.stringify -> nebulaNgql, nebula.stringify -> nebula, - csv.stringify -> csv + csv.stringify -> csv, + hive.stringify -> hive ) object json extends ReaderType object nebulaNgql extends ReaderType object nebula extends ReaderType object csv extends ReaderType + object hive extends ReaderType } diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala index e4da34d..3fcb8ce 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/AlgoWriter.scala @@ -8,11 +8,11 @@ package com.vesoft.nebula.algorithm.writer import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteMode, WriteNebulaVertexConfig} import com.vesoft.nebula.algorithm.config.{AlgoConstants, Configs} -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} abstract class AlgoWriter { val tpe:WriterType - def write(data: DataFrame, configs: Configs): Unit + def write(spark: SparkSession, data: DataFrame, configs: Configs): Unit } object AlgoWriter { def make(configs: Configs): AlgoWriter = { @@ -20,6 +20,7 @@ object AlgoWriter { case WriterType.text => new TextWriter case WriterType.nebula => new NebulaWriter case WriterType.csv => new CsvWriter + case WriterType.hive => new HiveWriter }.getOrElse(throw new UnsupportedOperationException("unsupported writer")) } @@ -27,7 +28,7 @@ object AlgoWriter { final class NebulaWriter extends AlgoWriter { override val tpe: WriterType = WriterType.nebula - override def write(data: DataFrame, configs: Configs): Unit = { + override def write(spark: SparkSession, data: DataFrame, configs: Configs): Unit = { val graphAddress = configs.nebulaConfig.writeConfigEntry.graphAddress val metaAddress = configs.nebulaConfig.writeConfigEntry.metaAddress val space = configs.nebulaConfig.writeConfigEntry.space @@ -61,7 +62,7 @@ final class NebulaWriter extends AlgoWriter { final class CsvWriter extends AlgoWriter { override val tpe: WriterType = WriterType.csv - override def write(data: DataFrame, configs: Configs): Unit = { + override def write(spark: SparkSession, data: DataFrame, configs: Configs): Unit = { val resultPath = configs.localConfigEntry.resultPath data.write.option("header", true).csv(resultPath) } @@ -69,8 +70,40 @@ final class CsvWriter extends AlgoWriter { final class TextWriter extends AlgoWriter { override val tpe: WriterType = WriterType.text - override def write(data: DataFrame, configs: Configs): Unit = { + override def write(spark: SparkSession, data: DataFrame, configs: Configs): Unit = { val resultPath = configs.localConfigEntry.resultPath data.write.option("header", true).text(resultPath) } } + +final class HiveWriter extends AlgoWriter { + override val tpe: WriterType = WriterType.hive + override def write(spark: SparkSession, data: DataFrame, configs: Configs): Unit = { + val config = configs.hiveConfigEntry.hiveWriteConfigEntry + val saveMode = SaveMode.values().find(_.name.equalsIgnoreCase(config.saveMode)).getOrElse(SaveMode.Append) + val columnMapping = config.resultColumnMapping + + var _data = data + columnMapping.map{ + case (from, to) => + _data = _data.withColumnRenamed(from, to) + } + + if(config.autoCreateTable){ + val createTableStatement = generateCreateTableStatement(_data, config.dbTableName) + println(s"execute create hive table statement:${createTableStatement}") + spark.sql(createTableStatement) + } + + _data.write.mode(saveMode).insertInto(config.dbTableName) + } + + def generateCreateTableStatement(df: DataFrame, tableName: String): String = { + val columns = df.schema.fields + val columnDefinitions = columns.map { field => + s"${field.name} ${field.dataType.typeName}" + }.mkString(",\n ") + s"CREATE TABLE IF NOT EXISTS $tableName (\n $columnDefinitions\n)" + } + +} diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/WriterType.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/WriterType.scala index 84a7839..1a81497 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/WriterType.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/writer/WriterType.scala @@ -16,15 +16,18 @@ sealed trait WriterType { case WriterType.text => "text" case WriterType.nebula => "nebula" case WriterType.csv => "csv" + case WriterType.hive => "hive" } } object WriterType { lazy val mapping: Map[String, WriterType] = Map( text.stringify -> text, nebula.stringify -> nebula, - csv.stringify -> csv + csv.stringify -> csv, + hive.stringify -> hive ) object text extends WriterType object nebula extends WriterType object csv extends WriterType + object hive extends WriterType }